1 /* 2 * Copyright (c) 2016, Salvatore Sanfilippo <antirez at gmail dot com> 3 * All rights reserved. 4 * 5 * Redistribution and use in source and binary forms, with or without 6 * modification, are permitted provided that the following conditions are met: 7 * 8 * * Redistributions of source code must retain the above copyright notice, 9 * this list of conditions and the following disclaimer. 10 * * Redistributions in binary form must reproduce the above copyright 11 * notice, this list of conditions and the following disclaimer in the 12 * documentation and/or other materials provided with the distribution. 13 * * Neither the name of Redis nor the names of its contributors may be used 14 * to endorse or promote products derived from this software without 15 * specific prior written permission. 16 * 17 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" 18 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE 19 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE 20 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE 21 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR 22 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF 23 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS 24 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN 25 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) 26 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE 27 * POSSIBILITY OF SUCH DAMAGE. 28 */ 29 30 #include "server.h" 31 #include "cluster.h" 32 #include <dlfcn.h> 33 34 #define REDISMODULE_CORE 1 35 #include "redismodule.h" 36 37 /* -------------------------------------------------------------------------- 38 * Private data structures used by the modules system. Those are data 39 * structures that are never exposed to Redis Modules, if not as void 40 * pointers that have an API the module can call with them) 41 * -------------------------------------------------------------------------- */ 42 43 /* This structure represents a module inside the system. */ 44 struct RedisModule { 45 void *handle; /* Module dlopen() handle. */ 46 char *name; /* Module name. */ 47 int ver; /* Module version. We use just progressive integers. */ 48 int apiver; /* Module API version as requested during initialization.*/ 49 list *types; /* Module data types. */ 50 list *usedby; /* List of modules using APIs from this one. */ 51 list *using; /* List of modules we use some APIs of. */ 52 list *filters; /* List of filters the module has registered. */ 53 int in_call; /* RM_Call() nesting level */ 54 }; 55 typedef struct RedisModule RedisModule; 56 57 /* This represents a shared API. Shared APIs will be used to populate 58 * the server.sharedapi dictionary, mapping names of APIs exported by 59 * modules for other modules to use, to their structure specifying the 60 * function pointer that can be called. */ 61 struct RedisModuleSharedAPI { 62 void *func; 63 RedisModule *module; 64 }; 65 typedef struct RedisModuleSharedAPI RedisModuleSharedAPI; 66 67 static dict *modules; /* Hash table of modules. SDS -> RedisModule ptr.*/ 68 69 /* Entries in the context->amqueue array, representing objects to free 70 * when the callback returns. */ 71 struct AutoMemEntry { 72 void *ptr; 73 int type; 74 }; 75 76 /* AutMemEntry type field values. */ 77 #define REDISMODULE_AM_KEY 0 78 #define REDISMODULE_AM_STRING 1 79 #define REDISMODULE_AM_REPLY 2 80 #define REDISMODULE_AM_FREED 3 /* Explicitly freed by user already. */ 81 #define REDISMODULE_AM_DICT 4 82 83 /* The pool allocator block. Redis Modules can allocate memory via this special 84 * allocator that will automatically release it all once the callback returns. 85 * This means that it can only be used for ephemeral allocations. However 86 * there are two advantages for modules to use this API: 87 * 88 * 1) The memory is automatically released when the callback returns. 89 * 2) This allocator is faster for many small allocations since whole blocks 90 * are allocated, and small pieces returned to the caller just advancing 91 * the index of the allocation. 92 * 93 * Allocations are always rounded to the size of the void pointer in order 94 * to always return aligned memory chunks. */ 95 96 #define REDISMODULE_POOL_ALLOC_MIN_SIZE (1024*8) 97 #define REDISMODULE_POOL_ALLOC_ALIGN (sizeof(void*)) 98 99 typedef struct RedisModulePoolAllocBlock { 100 uint32_t size; 101 uint32_t used; 102 struct RedisModulePoolAllocBlock *next; 103 char memory[]; 104 } RedisModulePoolAllocBlock; 105 106 /* This structure represents the context in which Redis modules operate. 107 * Most APIs module can access, get a pointer to the context, so that the API 108 * implementation can hold state across calls, or remember what to free after 109 * the call and so forth. 110 * 111 * Note that not all the context structure is always filled with actual values 112 * but only the fields needed in a given context. */ 113 114 struct RedisModuleBlockedClient; 115 116 struct RedisModuleCtx { 117 void *getapifuncptr; /* NOTE: Must be the first field. */ 118 struct RedisModule *module; /* Module reference. */ 119 client *client; /* Client calling a command. */ 120 struct RedisModuleBlockedClient *blocked_client; /* Blocked client for 121 thread safe context. */ 122 struct AutoMemEntry *amqueue; /* Auto memory queue of objects to free. */ 123 int amqueue_len; /* Number of slots in amqueue. */ 124 int amqueue_used; /* Number of used slots in amqueue. */ 125 int flags; /* REDISMODULE_CTX_... flags. */ 126 void **postponed_arrays; /* To set with RM_ReplySetArrayLength(). */ 127 int postponed_arrays_count; /* Number of entries in postponed_arrays. */ 128 void *blocked_privdata; /* Privdata set when unblocking a client. */ 129 130 /* Used if there is the REDISMODULE_CTX_KEYS_POS_REQUEST flag set. */ 131 int *keys_pos; 132 int keys_count; 133 134 struct RedisModulePoolAllocBlock *pa_head; 135 }; 136 typedef struct RedisModuleCtx RedisModuleCtx; 137 138 #define REDISMODULE_CTX_INIT {(void*)(unsigned long)&RM_GetApi, NULL, NULL, NULL, NULL, 0, 0, 0, NULL, 0, NULL, NULL, 0, NULL} 139 #define REDISMODULE_CTX_MULTI_EMITTED (1<<0) 140 #define REDISMODULE_CTX_AUTO_MEMORY (1<<1) 141 #define REDISMODULE_CTX_KEYS_POS_REQUEST (1<<2) 142 #define REDISMODULE_CTX_BLOCKED_REPLY (1<<3) 143 #define REDISMODULE_CTX_BLOCKED_TIMEOUT (1<<4) 144 #define REDISMODULE_CTX_THREAD_SAFE (1<<5) 145 #define REDISMODULE_CTX_BLOCKED_DISCONNECTED (1<<6) 146 147 /* This represents a Redis key opened with RM_OpenKey(). */ 148 struct RedisModuleKey { 149 RedisModuleCtx *ctx; 150 redisDb *db; 151 robj *key; /* Key name object. */ 152 robj *value; /* Value object, or NULL if the key was not found. */ 153 void *iter; /* Iterator. */ 154 int mode; /* Opening mode. */ 155 156 /* Zset iterator. */ 157 uint32_t ztype; /* REDISMODULE_ZSET_RANGE_* */ 158 zrangespec zrs; /* Score range. */ 159 zlexrangespec zlrs; /* Lex range. */ 160 uint32_t zstart; /* Start pos for positional ranges. */ 161 uint32_t zend; /* End pos for positional ranges. */ 162 void *zcurrent; /* Zset iterator current node. */ 163 int zer; /* Zset iterator end reached flag 164 (true if end was reached). */ 165 }; 166 typedef struct RedisModuleKey RedisModuleKey; 167 168 /* RedisModuleKey 'ztype' values. */ 169 #define REDISMODULE_ZSET_RANGE_NONE 0 /* This must always be 0. */ 170 #define REDISMODULE_ZSET_RANGE_LEX 1 171 #define REDISMODULE_ZSET_RANGE_SCORE 2 172 #define REDISMODULE_ZSET_RANGE_POS 3 173 174 /* Function pointer type of a function representing a command inside 175 * a Redis module. */ 176 struct RedisModuleBlockedClient; 177 typedef int (*RedisModuleCmdFunc) (RedisModuleCtx *ctx, void **argv, int argc); 178 typedef void (*RedisModuleDisconnectFunc) (RedisModuleCtx *ctx, struct RedisModuleBlockedClient *bc); 179 180 /* This struct holds the information about a command registered by a module.*/ 181 struct RedisModuleCommandProxy { 182 struct RedisModule *module; 183 RedisModuleCmdFunc func; 184 struct redisCommand *rediscmd; 185 }; 186 typedef struct RedisModuleCommandProxy RedisModuleCommandProxy; 187 188 #define REDISMODULE_REPLYFLAG_NONE 0 189 #define REDISMODULE_REPLYFLAG_TOPARSE (1<<0) /* Protocol must be parsed. */ 190 #define REDISMODULE_REPLYFLAG_NESTED (1<<1) /* Nested reply object. No proto 191 or struct free. */ 192 193 /* Reply of RM_Call() function. The function is filled in a lazy 194 * way depending on the function called on the reply structure. By default 195 * only the type, proto and protolen are filled. */ 196 typedef struct RedisModuleCallReply { 197 RedisModuleCtx *ctx; 198 int type; /* REDISMODULE_REPLY_... */ 199 int flags; /* REDISMODULE_REPLYFLAG_... */ 200 size_t len; /* Len of strings or num of elements of arrays. */ 201 char *proto; /* Raw reply protocol. An SDS string at top-level object. */ 202 size_t protolen;/* Length of protocol. */ 203 union { 204 const char *str; /* String pointer for string and error replies. This 205 does not need to be freed, always points inside 206 a reply->proto buffer of the reply object or, in 207 case of array elements, of parent reply objects. */ 208 long long ll; /* Reply value for integer reply. */ 209 struct RedisModuleCallReply *array; /* Array of sub-reply elements. */ 210 } val; 211 } RedisModuleCallReply; 212 213 /* Structure representing a blocked client. We get a pointer to such 214 * an object when blocking from modules. */ 215 typedef struct RedisModuleBlockedClient { 216 client *client; /* Pointer to the blocked client. or NULL if the client 217 was destroyed during the life of this object. */ 218 RedisModule *module; /* Module blocking the client. */ 219 RedisModuleCmdFunc reply_callback; /* Reply callback on normal completion.*/ 220 RedisModuleCmdFunc timeout_callback; /* Reply callback on timeout. */ 221 RedisModuleDisconnectFunc disconnect_callback; /* Called on disconnection.*/ 222 void (*free_privdata)(RedisModuleCtx*,void*);/* privdata cleanup callback.*/ 223 void *privdata; /* Module private data that may be used by the reply 224 or timeout callback. It is set via the 225 RedisModule_UnblockClient() API. */ 226 client *reply_client; /* Fake client used to accumulate replies 227 in thread safe contexts. */ 228 int dbid; /* Database number selected by the original client. */ 229 } RedisModuleBlockedClient; 230 231 static pthread_mutex_t moduleUnblockedClientsMutex = PTHREAD_MUTEX_INITIALIZER; 232 static list *moduleUnblockedClients; 233 234 /* We need a mutex that is unlocked / relocked in beforeSleep() in order to 235 * allow thread safe contexts to execute commands at a safe moment. */ 236 static pthread_mutex_t moduleGIL = PTHREAD_MUTEX_INITIALIZER; 237 238 239 /* Function pointer type for keyspace event notification subscriptions from modules. */ 240 typedef int (*RedisModuleNotificationFunc) (RedisModuleCtx *ctx, int type, const char *event, RedisModuleString *key); 241 242 /* Keyspace notification subscriber information. 243 * See RM_SubscribeToKeyspaceEvents() for more information. */ 244 typedef struct RedisModuleKeyspaceSubscriber { 245 /* The module subscribed to the event */ 246 RedisModule *module; 247 /* Notification callback in the module*/ 248 RedisModuleNotificationFunc notify_callback; 249 /* A bit mask of the events the module is interested in */ 250 int event_mask; 251 /* Active flag set on entry, to avoid reentrant subscribers 252 * calling themselves */ 253 int active; 254 } RedisModuleKeyspaceSubscriber; 255 256 /* The module keyspace notification subscribers list */ 257 static list *moduleKeyspaceSubscribers; 258 259 /* Static client recycled for when we need to provide a context with a client 260 * in a situation where there is no client to provide. This avoidsallocating 261 * a new client per round. For instance this is used in the keyspace 262 * notifications, timers and cluster messages callbacks. */ 263 static client *moduleFreeContextReusedClient; 264 265 /* Data structures related to the exported dictionary data structure. */ 266 typedef struct RedisModuleDict { 267 rax *rax; /* The radix tree. */ 268 } RedisModuleDict; 269 270 typedef struct RedisModuleDictIter { 271 RedisModuleDict *dict; 272 raxIterator ri; 273 } RedisModuleDictIter; 274 275 typedef struct RedisModuleCommandFilterCtx { 276 RedisModuleString **argv; 277 int argc; 278 } RedisModuleCommandFilterCtx; 279 280 typedef void (*RedisModuleCommandFilterFunc) (RedisModuleCommandFilterCtx *filter); 281 282 typedef struct RedisModuleCommandFilter { 283 /* The module that registered the filter */ 284 RedisModule *module; 285 /* Filter callback function */ 286 RedisModuleCommandFilterFunc callback; 287 /* REDISMODULE_CMDFILTER_* flags */ 288 int flags; 289 } RedisModuleCommandFilter; 290 291 /* Registered filters */ 292 static list *moduleCommandFilters; 293 294 /* -------------------------------------------------------------------------- 295 * Prototypes 296 * -------------------------------------------------------------------------- */ 297 298 void RM_FreeCallReply(RedisModuleCallReply *reply); 299 void RM_CloseKey(RedisModuleKey *key); 300 void autoMemoryCollect(RedisModuleCtx *ctx); 301 robj **moduleCreateArgvFromUserFormat(const char *cmdname, const char *fmt, int *argcp, int *flags, va_list ap); 302 void moduleReplicateMultiIfNeeded(RedisModuleCtx *ctx); 303 void RM_ZsetRangeStop(RedisModuleKey *kp); 304 static void zsetKeyReset(RedisModuleKey *key); 305 void RM_FreeDict(RedisModuleCtx *ctx, RedisModuleDict *d); 306 307 /* -------------------------------------------------------------------------- 308 * Heap allocation raw functions 309 * -------------------------------------------------------------------------- */ 310 311 /* Use like malloc(). Memory allocated with this function is reported in 312 * Redis INFO memory, used for keys eviction according to maxmemory settings 313 * and in general is taken into account as memory allocated by Redis. 314 * You should avoid using malloc(). */ 315 void *RM_Alloc(size_t bytes) { 316 return zmalloc(bytes); 317 } 318 319 /* Use like calloc(). Memory allocated with this function is reported in 320 * Redis INFO memory, used for keys eviction according to maxmemory settings 321 * and in general is taken into account as memory allocated by Redis. 322 * You should avoid using calloc() directly. */ 323 void *RM_Calloc(size_t nmemb, size_t size) { 324 return zcalloc(nmemb*size); 325 } 326 327 /* Use like realloc() for memory obtained with RedisModule_Alloc(). */ 328 void* RM_Realloc(void *ptr, size_t bytes) { 329 return zrealloc(ptr,bytes); 330 } 331 332 /* Use like free() for memory obtained by RedisModule_Alloc() and 333 * RedisModule_Realloc(). However you should never try to free with 334 * RedisModule_Free() memory allocated with malloc() inside your module. */ 335 void RM_Free(void *ptr) { 336 zfree(ptr); 337 } 338 339 /* Like strdup() but returns memory allocated with RedisModule_Alloc(). */ 340 char *RM_Strdup(const char *str) { 341 return zstrdup(str); 342 } 343 344 /* -------------------------------------------------------------------------- 345 * Pool allocator 346 * -------------------------------------------------------------------------- */ 347 348 /* Release the chain of blocks used for pool allocations. */ 349 void poolAllocRelease(RedisModuleCtx *ctx) { 350 RedisModulePoolAllocBlock *head = ctx->pa_head, *next; 351 352 while(head != NULL) { 353 next = head->next; 354 zfree(head); 355 head = next; 356 } 357 ctx->pa_head = NULL; 358 } 359 360 /* Return heap allocated memory that will be freed automatically when the 361 * module callback function returns. Mostly suitable for small allocations 362 * that are short living and must be released when the callback returns 363 * anyway. The returned memory is aligned to the architecture word size 364 * if at least word size bytes are requested, otherwise it is just 365 * aligned to the next power of two, so for example a 3 bytes request is 366 * 4 bytes aligned while a 2 bytes request is 2 bytes aligned. 367 * 368 * There is no realloc style function since when this is needed to use the 369 * pool allocator is not a good idea. 370 * 371 * The function returns NULL if `bytes` is 0. */ 372 void *RM_PoolAlloc(RedisModuleCtx *ctx, size_t bytes) { 373 if (bytes == 0) return NULL; 374 RedisModulePoolAllocBlock *b = ctx->pa_head; 375 size_t left = b ? b->size - b->used : 0; 376 377 /* Fix alignment. */ 378 if (left >= bytes) { 379 size_t alignment = REDISMODULE_POOL_ALLOC_ALIGN; 380 while (bytes < alignment && alignment/2 >= bytes) alignment /= 2; 381 if (b->used % alignment) 382 b->used += alignment - (b->used % alignment); 383 left = (b->used > b->size) ? 0 : b->size - b->used; 384 } 385 386 /* Create a new block if needed. */ 387 if (left < bytes) { 388 size_t blocksize = REDISMODULE_POOL_ALLOC_MIN_SIZE; 389 if (blocksize < bytes) blocksize = bytes; 390 b = zmalloc(sizeof(*b) + blocksize); 391 b->size = blocksize; 392 b->used = 0; 393 b->next = ctx->pa_head; 394 ctx->pa_head = b; 395 } 396 397 char *retval = b->memory + b->used; 398 b->used += bytes; 399 return retval; 400 } 401 402 /* -------------------------------------------------------------------------- 403 * Helpers for modules API implementation 404 * -------------------------------------------------------------------------- */ 405 406 /* Create an empty key of the specified type. 'kp' must point to a key object 407 * opened for writing where the .value member is set to NULL because the 408 * key was found to be non existing. 409 * 410 * On success REDISMODULE_OK is returned and the key is populated with 411 * the value of the specified type. The function fails and returns 412 * REDISMODULE_ERR if: 413 * 414 * 1) The key is not open for writing. 415 * 2) The key is not empty. 416 * 3) The specified type is unknown. 417 */ 418 int moduleCreateEmptyKey(RedisModuleKey *key, int type) { 419 robj *obj; 420 421 /* The key must be open for writing and non existing to proceed. */ 422 if (!(key->mode & REDISMODULE_WRITE) || key->value) 423 return REDISMODULE_ERR; 424 425 switch(type) { 426 case REDISMODULE_KEYTYPE_LIST: 427 obj = createQuicklistObject(); 428 quicklistSetOptions(obj->ptr, server.list_max_ziplist_size, 429 server.list_compress_depth); 430 break; 431 case REDISMODULE_KEYTYPE_ZSET: 432 obj = createZsetZiplistObject(); 433 break; 434 case REDISMODULE_KEYTYPE_HASH: 435 obj = createHashObject(); 436 break; 437 default: return REDISMODULE_ERR; 438 } 439 dbAdd(key->db,key->key,obj); 440 key->value = obj; 441 return REDISMODULE_OK; 442 } 443 444 /* This function is called in low-level API implementation functions in order 445 * to check if the value associated with the key remained empty after an 446 * operation that removed elements from an aggregate data type. 447 * 448 * If this happens, the key is deleted from the DB and the key object state 449 * is set to the right one in order to be targeted again by write operations 450 * possibly recreating the key if needed. 451 * 452 * The function returns 1 if the key value object is found empty and is 453 * deleted, otherwise 0 is returned. */ 454 int moduleDelKeyIfEmpty(RedisModuleKey *key) { 455 if (!(key->mode & REDISMODULE_WRITE) || key->value == NULL) return 0; 456 int isempty; 457 robj *o = key->value; 458 459 switch(o->type) { 460 case OBJ_LIST: isempty = listTypeLength(o) == 0; break; 461 case OBJ_SET: isempty = setTypeSize(o) == 0; break; 462 case OBJ_ZSET: isempty = zsetLength(o) == 0; break; 463 case OBJ_HASH : isempty = hashTypeLength(o) == 0; break; 464 default: isempty = 0; 465 } 466 467 if (isempty) { 468 dbDelete(key->db,key->key); 469 key->value = NULL; 470 return 1; 471 } else { 472 return 0; 473 } 474 } 475 476 /* -------------------------------------------------------------------------- 477 * Service API exported to modules 478 * 479 * Note that all the exported APIs are called RM_<funcname> in the core 480 * and RedisModule_<funcname> in the module side (defined as function 481 * pointers in redismodule.h). In this way the dynamic linker does not 482 * mess with our global function pointers, overriding it with the symbols 483 * defined in the main executable having the same names. 484 * -------------------------------------------------------------------------- */ 485 486 /* Lookup the requested module API and store the function pointer into the 487 * target pointer. The function returns REDISMODULE_ERR if there is no such 488 * named API, otherwise REDISMODULE_OK. 489 * 490 * This function is not meant to be used by modules developer, it is only 491 * used implicitly by including redismodule.h. */ 492 int RM_GetApi(const char *funcname, void **targetPtrPtr) { 493 dictEntry *he = dictFind(server.moduleapi, funcname); 494 if (!he) return REDISMODULE_ERR; 495 *targetPtrPtr = dictGetVal(he); 496 return REDISMODULE_OK; 497 } 498 499 /* Free the context after the user function was called. */ 500 void moduleFreeContext(RedisModuleCtx *ctx) { 501 autoMemoryCollect(ctx); 502 poolAllocRelease(ctx); 503 if (ctx->postponed_arrays) { 504 zfree(ctx->postponed_arrays); 505 ctx->postponed_arrays_count = 0; 506 serverLog(LL_WARNING, 507 "API misuse detected in module %s: " 508 "RedisModule_ReplyWithArray(REDISMODULE_POSTPONED_ARRAY_LEN) " 509 "not matched by the same number of RedisModule_SetReplyArrayLen() " 510 "calls.", 511 ctx->module->name); 512 } 513 if (ctx->flags & REDISMODULE_CTX_THREAD_SAFE) freeClient(ctx->client); 514 } 515 516 /* Helper function for when a command callback is called, in order to handle 517 * details needed to correctly replicate commands. */ 518 void moduleHandlePropagationAfterCommandCallback(RedisModuleCtx *ctx) { 519 client *c = ctx->client; 520 521 if (c->flags & CLIENT_LUA) return; 522 523 /* Handle the replication of the final EXEC, since whatever a command 524 * emits is always wrapped around MULTI/EXEC. */ 525 if (ctx->flags & REDISMODULE_CTX_MULTI_EMITTED) { 526 robj *propargv[1]; 527 propargv[0] = createStringObject("EXEC",4); 528 alsoPropagate(server.execCommand,c->db->id,propargv,1, 529 PROPAGATE_AOF|PROPAGATE_REPL); 530 decrRefCount(propargv[0]); 531 } 532 } 533 534 /* This Redis command binds the normal Redis command invocation with commands 535 * exported by modules. */ 536 void RedisModuleCommandDispatcher(client *c) { 537 RedisModuleCommandProxy *cp = (void*)(unsigned long)c->cmd->getkeys_proc; 538 RedisModuleCtx ctx = REDISMODULE_CTX_INIT; 539 540 ctx.module = cp->module; 541 ctx.client = c; 542 cp->func(&ctx,(void**)c->argv,c->argc); 543 moduleHandlePropagationAfterCommandCallback(&ctx); 544 moduleFreeContext(&ctx); 545 546 /* In some cases processMultibulkBuffer uses sdsMakeRoomFor to 547 * expand the query buffer, and in order to avoid a big object copy 548 * the query buffer SDS may be used directly as the SDS string backing 549 * the client argument vectors: sometimes this will result in the SDS 550 * string having unused space at the end. Later if a module takes ownership 551 * of the RedisString, such space will be wasted forever. Inside the 552 * Redis core this is not a problem because tryObjectEncoding() is called 553 * before storing strings in the key space. Here we need to do it 554 * for the module. */ 555 for (int i = 0; i < c->argc; i++) { 556 /* Only do the work if the module took ownership of the object: 557 * in that case the refcount is no longer 1. */ 558 if (c->argv[i]->refcount > 1) 559 trimStringObjectIfNeeded(c->argv[i]); 560 } 561 } 562 563 /* This function returns the list of keys, with the same interface as the 564 * 'getkeys' function of the native commands, for module commands that exported 565 * the "getkeys-api" flag during the registration. This is done when the 566 * list of keys are not at fixed positions, so that first/last/step cannot 567 * be used. 568 * 569 * In order to accomplish its work, the module command is called, flagging 570 * the context in a way that the command can recognize this is a special 571 * "get keys" call by calling RedisModule_IsKeysPositionRequest(ctx). */ 572 int *moduleGetCommandKeysViaAPI(struct redisCommand *cmd, robj **argv, int argc, int *numkeys) { 573 RedisModuleCommandProxy *cp = (void*)(unsigned long)cmd->getkeys_proc; 574 RedisModuleCtx ctx = REDISMODULE_CTX_INIT; 575 576 ctx.module = cp->module; 577 ctx.client = NULL; 578 ctx.flags |= REDISMODULE_CTX_KEYS_POS_REQUEST; 579 cp->func(&ctx,(void**)argv,argc); 580 int *res = ctx.keys_pos; 581 if (numkeys) *numkeys = ctx.keys_count; 582 moduleFreeContext(&ctx); 583 return res; 584 } 585 586 /* Return non-zero if a module command, that was declared with the 587 * flag "getkeys-api", is called in a special way to get the keys positions 588 * and not to get executed. Otherwise zero is returned. */ 589 int RM_IsKeysPositionRequest(RedisModuleCtx *ctx) { 590 return (ctx->flags & REDISMODULE_CTX_KEYS_POS_REQUEST) != 0; 591 } 592 593 /* When a module command is called in order to obtain the position of 594 * keys, since it was flagged as "getkeys-api" during the registration, 595 * the command implementation checks for this special call using the 596 * RedisModule_IsKeysPositionRequest() API and uses this function in 597 * order to report keys, like in the following example: 598 * 599 * if (RedisModule_IsKeysPositionRequest(ctx)) { 600 * RedisModule_KeyAtPos(ctx,1); 601 * RedisModule_KeyAtPos(ctx,2); 602 * } 603 * 604 * Note: in the example below the get keys API would not be needed since 605 * keys are at fixed positions. This interface is only used for commands 606 * with a more complex structure. */ 607 void RM_KeyAtPos(RedisModuleCtx *ctx, int pos) { 608 if (!(ctx->flags & REDISMODULE_CTX_KEYS_POS_REQUEST)) return; 609 if (pos <= 0) return; 610 ctx->keys_pos = zrealloc(ctx->keys_pos,sizeof(int)*(ctx->keys_count+1)); 611 ctx->keys_pos[ctx->keys_count++] = pos; 612 } 613 614 /* Helper for RM_CreateCommand(). Turns a string representing command 615 * flags into the command flags used by the Redis core. 616 * 617 * It returns the set of flags, or -1 if unknown flags are found. */ 618 int commandFlagsFromString(char *s) { 619 int count, j; 620 int flags = 0; 621 sds *tokens = sdssplitlen(s,strlen(s)," ",1,&count); 622 for (j = 0; j < count; j++) { 623 char *t = tokens[j]; 624 if (!strcasecmp(t,"write")) flags |= CMD_WRITE; 625 else if (!strcasecmp(t,"readonly")) flags |= CMD_READONLY; 626 else if (!strcasecmp(t,"admin")) flags |= CMD_ADMIN; 627 else if (!strcasecmp(t,"deny-oom")) flags |= CMD_DENYOOM; 628 else if (!strcasecmp(t,"deny-script")) flags |= CMD_NOSCRIPT; 629 else if (!strcasecmp(t,"allow-loading")) flags |= CMD_LOADING; 630 else if (!strcasecmp(t,"pubsub")) flags |= CMD_PUBSUB; 631 else if (!strcasecmp(t,"random")) flags |= CMD_RANDOM; 632 else if (!strcasecmp(t,"allow-stale")) flags |= CMD_STALE; 633 else if (!strcasecmp(t,"no-monitor")) flags |= CMD_SKIP_MONITOR; 634 else if (!strcasecmp(t,"fast")) flags |= CMD_FAST; 635 else if (!strcasecmp(t,"getkeys-api")) flags |= CMD_MODULE_GETKEYS; 636 else if (!strcasecmp(t,"no-cluster")) flags |= CMD_MODULE_NO_CLUSTER; 637 else break; 638 } 639 sdsfreesplitres(tokens,count); 640 if (j != count) return -1; /* Some token not processed correctly. */ 641 return flags; 642 } 643 644 /* Register a new command in the Redis server, that will be handled by 645 * calling the function pointer 'func' using the RedisModule calling 646 * convention. The function returns REDISMODULE_ERR if the specified command 647 * name is already busy or a set of invalid flags were passed, otherwise 648 * REDISMODULE_OK is returned and the new command is registered. 649 * 650 * This function must be called during the initialization of the module 651 * inside the RedisModule_OnLoad() function. Calling this function outside 652 * of the initialization function is not defined. 653 * 654 * The command function type is the following: 655 * 656 * int MyCommand_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc); 657 * 658 * And is supposed to always return REDISMODULE_OK. 659 * 660 * The set of flags 'strflags' specify the behavior of the command, and should 661 * be passed as a C string composed of space separated words, like for 662 * example "write deny-oom". The set of flags are: 663 * 664 * * **"write"**: The command may modify the data set (it may also read 665 * from it). 666 * * **"readonly"**: The command returns data from keys but never writes. 667 * * **"admin"**: The command is an administrative command (may change 668 * replication or perform similar tasks). 669 * * **"deny-oom"**: The command may use additional memory and should be 670 * denied during out of memory conditions. 671 * * **"deny-script"**: Don't allow this command in Lua scripts. 672 * * **"allow-loading"**: Allow this command while the server is loading data. 673 * Only commands not interacting with the data set 674 * should be allowed to run in this mode. If not sure 675 * don't use this flag. 676 * * **"pubsub"**: The command publishes things on Pub/Sub channels. 677 * * **"random"**: The command may have different outputs even starting 678 * from the same input arguments and key values. 679 * * **"allow-stale"**: The command is allowed to run on slaves that don't 680 * serve stale data. Don't use if you don't know what 681 * this means. 682 * * **"no-monitor"**: Don't propagate the command on monitor. Use this if 683 * the command has sensible data among the arguments. 684 * * **"fast"**: The command time complexity is not greater 685 * than O(log(N)) where N is the size of the collection or 686 * anything else representing the normal scalability 687 * issue with the command. 688 * * **"getkeys-api"**: The command implements the interface to return 689 * the arguments that are keys. Used when start/stop/step 690 * is not enough because of the command syntax. 691 * * **"no-cluster"**: The command should not register in Redis Cluster 692 * since is not designed to work with it because, for 693 * example, is unable to report the position of the 694 * keys, programmatically creates key names, or any 695 * other reason. 696 */ 697 int RM_CreateCommand(RedisModuleCtx *ctx, const char *name, RedisModuleCmdFunc cmdfunc, const char *strflags, int firstkey, int lastkey, int keystep) { 698 int flags = strflags ? commandFlagsFromString((char*)strflags) : 0; 699 if (flags == -1) return REDISMODULE_ERR; 700 if ((flags & CMD_MODULE_NO_CLUSTER) && server.cluster_enabled) 701 return REDISMODULE_ERR; 702 703 struct redisCommand *rediscmd; 704 RedisModuleCommandProxy *cp; 705 sds cmdname = sdsnew(name); 706 707 /* Check if the command name is busy. */ 708 if (lookupCommand(cmdname) != NULL) { 709 sdsfree(cmdname); 710 return REDISMODULE_ERR; 711 } 712 713 /* Create a command "proxy", which is a structure that is referenced 714 * in the command table, so that the generic command that works as 715 * binding between modules and Redis, can know what function to call 716 * and what the module is. 717 * 718 * Note that we use the Redis command table 'getkeys_proc' in order to 719 * pass a reference to the command proxy structure. */ 720 cp = zmalloc(sizeof(*cp)); 721 cp->module = ctx->module; 722 cp->func = cmdfunc; 723 cp->rediscmd = zmalloc(sizeof(*rediscmd)); 724 cp->rediscmd->name = cmdname; 725 cp->rediscmd->proc = RedisModuleCommandDispatcher; 726 cp->rediscmd->arity = -1; 727 cp->rediscmd->flags = flags | CMD_MODULE; 728 cp->rediscmd->getkeys_proc = (redisGetKeysProc*)(unsigned long)cp; 729 cp->rediscmd->firstkey = firstkey; 730 cp->rediscmd->lastkey = lastkey; 731 cp->rediscmd->keystep = keystep; 732 cp->rediscmd->microseconds = 0; 733 cp->rediscmd->calls = 0; 734 dictAdd(server.commands,sdsdup(cmdname),cp->rediscmd); 735 dictAdd(server.orig_commands,sdsdup(cmdname),cp->rediscmd); 736 return REDISMODULE_OK; 737 } 738 739 /* Called by RM_Init() to setup the `ctx->module` structure. 740 * 741 * This is an internal function, Redis modules developers don't need 742 * to use it. */ 743 void RM_SetModuleAttribs(RedisModuleCtx *ctx, const char *name, int ver, int apiver) { 744 RedisModule *module; 745 746 if (ctx->module != NULL) return; 747 module = zmalloc(sizeof(*module)); 748 module->name = sdsnew((char*)name); 749 module->ver = ver; 750 module->apiver = apiver; 751 module->types = listCreate(); 752 module->usedby = listCreate(); 753 module->using = listCreate(); 754 module->filters = listCreate(); 755 module->in_call = 0; 756 ctx->module = module; 757 } 758 759 /* Return non-zero if the module name is busy. 760 * Otherwise zero is returned. */ 761 int RM_IsModuleNameBusy(const char *name) { 762 sds modulename = sdsnew(name); 763 dictEntry *de = dictFind(modules,modulename); 764 sdsfree(modulename); 765 return de != NULL; 766 } 767 768 /* Return the current UNIX time in milliseconds. */ 769 long long RM_Milliseconds(void) { 770 return mstime(); 771 } 772 773 /* -------------------------------------------------------------------------- 774 * Automatic memory management for modules 775 * -------------------------------------------------------------------------- */ 776 777 /* Enable automatic memory management. See API.md for more information. 778 * 779 * The function must be called as the first function of a command implementation 780 * that wants to use automatic memory. */ 781 void RM_AutoMemory(RedisModuleCtx *ctx) { 782 ctx->flags |= REDISMODULE_CTX_AUTO_MEMORY; 783 } 784 785 /* Add a new object to release automatically when the callback returns. */ 786 void autoMemoryAdd(RedisModuleCtx *ctx, int type, void *ptr) { 787 if (!(ctx->flags & REDISMODULE_CTX_AUTO_MEMORY)) return; 788 if (ctx->amqueue_used == ctx->amqueue_len) { 789 ctx->amqueue_len *= 2; 790 if (ctx->amqueue_len < 16) ctx->amqueue_len = 16; 791 ctx->amqueue = zrealloc(ctx->amqueue,sizeof(struct AutoMemEntry)*ctx->amqueue_len); 792 } 793 ctx->amqueue[ctx->amqueue_used].type = type; 794 ctx->amqueue[ctx->amqueue_used].ptr = ptr; 795 ctx->amqueue_used++; 796 } 797 798 /* Mark an object as freed in the auto release queue, so that users can still 799 * free things manually if they want. 800 * 801 * The function returns 1 if the object was actually found in the auto memory 802 * pool, otherwise 0 is returned. */ 803 int autoMemoryFreed(RedisModuleCtx *ctx, int type, void *ptr) { 804 if (!(ctx->flags & REDISMODULE_CTX_AUTO_MEMORY)) return 0; 805 806 int count = (ctx->amqueue_used+1)/2; 807 for (int j = 0; j < count; j++) { 808 for (int side = 0; side < 2; side++) { 809 /* For side = 0 check right side of the array, for 810 * side = 1 check the left side instead (zig-zag scanning). */ 811 int i = (side == 0) ? (ctx->amqueue_used - 1 - j) : j; 812 if (ctx->amqueue[i].type == type && 813 ctx->amqueue[i].ptr == ptr) 814 { 815 ctx->amqueue[i].type = REDISMODULE_AM_FREED; 816 817 /* Switch the freed element and the last element, to avoid growing 818 * the queue unnecessarily if we allocate/free in a loop */ 819 if (i != ctx->amqueue_used-1) { 820 ctx->amqueue[i] = ctx->amqueue[ctx->amqueue_used-1]; 821 } 822 823 /* Reduce the size of the queue because we either moved the top 824 * element elsewhere or freed it */ 825 ctx->amqueue_used--; 826 return 1; 827 } 828 } 829 } 830 return 0; 831 } 832 833 /* Release all the objects in queue. */ 834 void autoMemoryCollect(RedisModuleCtx *ctx) { 835 if (!(ctx->flags & REDISMODULE_CTX_AUTO_MEMORY)) return; 836 /* Clear the AUTO_MEMORY flag from the context, otherwise the functions 837 * we call to free the resources, will try to scan the auto release 838 * queue to mark the entries as freed. */ 839 ctx->flags &= ~REDISMODULE_CTX_AUTO_MEMORY; 840 int j; 841 for (j = 0; j < ctx->amqueue_used; j++) { 842 void *ptr = ctx->amqueue[j].ptr; 843 switch(ctx->amqueue[j].type) { 844 case REDISMODULE_AM_STRING: decrRefCount(ptr); break; 845 case REDISMODULE_AM_REPLY: RM_FreeCallReply(ptr); break; 846 case REDISMODULE_AM_KEY: RM_CloseKey(ptr); break; 847 case REDISMODULE_AM_DICT: RM_FreeDict(NULL,ptr); break; 848 } 849 } 850 ctx->flags |= REDISMODULE_CTX_AUTO_MEMORY; 851 zfree(ctx->amqueue); 852 ctx->amqueue = NULL; 853 ctx->amqueue_len = 0; 854 ctx->amqueue_used = 0; 855 } 856 857 /* -------------------------------------------------------------------------- 858 * String objects APIs 859 * -------------------------------------------------------------------------- */ 860 861 /* Create a new module string object. The returned string must be freed 862 * with RedisModule_FreeString(), unless automatic memory is enabled. 863 * 864 * The string is created by copying the `len` bytes starting 865 * at `ptr`. No reference is retained to the passed buffer. 866 * 867 * The module context 'ctx' is optional and may be NULL if you want to create 868 * a string out of the context scope. However in that case, the automatic 869 * memory management will not be available, and the string memory must be 870 * managed manually. */ 871 RedisModuleString *RM_CreateString(RedisModuleCtx *ctx, const char *ptr, size_t len) { 872 RedisModuleString *o = createStringObject(ptr,len); 873 if (ctx != NULL) autoMemoryAdd(ctx,REDISMODULE_AM_STRING,o); 874 return o; 875 } 876 877 /* Create a new module string object from a printf format and arguments. 878 * The returned string must be freed with RedisModule_FreeString(), unless 879 * automatic memory is enabled. 880 * 881 * The string is created using the sds formatter function sdscatvprintf(). 882 * 883 * The passed context 'ctx' may be NULL if necessary, see the 884 * RedisModule_CreateString() documentation for more info. */ 885 RedisModuleString *RM_CreateStringPrintf(RedisModuleCtx *ctx, const char *fmt, ...) { 886 sds s = sdsempty(); 887 888 va_list ap; 889 va_start(ap, fmt); 890 s = sdscatvprintf(s, fmt, ap); 891 va_end(ap); 892 893 RedisModuleString *o = createObject(OBJ_STRING, s); 894 if (ctx != NULL) autoMemoryAdd(ctx,REDISMODULE_AM_STRING,o); 895 896 return o; 897 } 898 899 900 /* Like RedisModule_CreatString(), but creates a string starting from a long long 901 * integer instead of taking a buffer and its length. 902 * 903 * The returned string must be released with RedisModule_FreeString() or by 904 * enabling automatic memory management. 905 * 906 * The passed context 'ctx' may be NULL if necessary, see the 907 * RedisModule_CreateString() documentation for more info. */ 908 RedisModuleString *RM_CreateStringFromLongLong(RedisModuleCtx *ctx, long long ll) { 909 char buf[LONG_STR_SIZE]; 910 size_t len = ll2string(buf,sizeof(buf),ll); 911 return RM_CreateString(ctx,buf,len); 912 } 913 914 /* Like RedisModule_CreatString(), but creates a string starting from another 915 * RedisModuleString. 916 * 917 * The returned string must be released with RedisModule_FreeString() or by 918 * enabling automatic memory management. 919 * 920 * The passed context 'ctx' may be NULL if necessary, see the 921 * RedisModule_CreateString() documentation for more info. */ 922 RedisModuleString *RM_CreateStringFromString(RedisModuleCtx *ctx, const RedisModuleString *str) { 923 RedisModuleString *o = dupStringObject(str); 924 if (ctx != NULL) autoMemoryAdd(ctx,REDISMODULE_AM_STRING,o); 925 return o; 926 } 927 928 /* Free a module string object obtained with one of the Redis modules API calls 929 * that return new string objects. 930 * 931 * It is possible to call this function even when automatic memory management 932 * is enabled. In that case the string will be released ASAP and removed 933 * from the pool of string to release at the end. 934 * 935 * If the string was created with a NULL context 'ctx', it is also possible to 936 * pass ctx as NULL when releasing the string (but passing a context will not 937 * create any issue). Strings created with a context should be freed also passing 938 * the context, so if you want to free a string out of context later, make sure 939 * to create it using a NULL context. */ 940 void RM_FreeString(RedisModuleCtx *ctx, RedisModuleString *str) { 941 decrRefCount(str); 942 if (ctx != NULL) autoMemoryFreed(ctx,REDISMODULE_AM_STRING,str); 943 } 944 945 /* Every call to this function, will make the string 'str' requiring 946 * an additional call to RedisModule_FreeString() in order to really 947 * free the string. Note that the automatic freeing of the string obtained 948 * enabling modules automatic memory management counts for one 949 * RedisModule_FreeString() call (it is just executed automatically). 950 * 951 * Normally you want to call this function when, at the same time 952 * the following conditions are true: 953 * 954 * 1) You have automatic memory management enabled. 955 * 2) You want to create string objects. 956 * 3) Those string objects you create need to live *after* the callback 957 * function(for example a command implementation) creating them returns. 958 * 959 * Usually you want this in order to store the created string object 960 * into your own data structure, for example when implementing a new data 961 * type. 962 * 963 * Note that when memory management is turned off, you don't need 964 * any call to RetainString() since creating a string will always result 965 * into a string that lives after the callback function returns, if 966 * no FreeString() call is performed. 967 * 968 * It is possible to call this function with a NULL context. */ 969 void RM_RetainString(RedisModuleCtx *ctx, RedisModuleString *str) { 970 if (ctx == NULL || !autoMemoryFreed(ctx,REDISMODULE_AM_STRING,str)) { 971 /* Increment the string reference counting only if we can't 972 * just remove the object from the list of objects that should 973 * be reclaimed. Why we do that, instead of just incrementing 974 * the refcount in any case, and let the automatic FreeString() 975 * call at the end to bring the refcount back at the desired 976 * value? Because this way we ensure that the object refcount 977 * value is 1 (instead of going to 2 to be dropped later to 1) 978 * after the call to this function. This is needed for functions 979 * like RedisModule_StringAppendBuffer() to work. */ 980 incrRefCount(str); 981 } 982 } 983 984 /* Given a string module object, this function returns the string pointer 985 * and length of the string. The returned pointer and length should only 986 * be used for read only accesses and never modified. */ 987 const char *RM_StringPtrLen(const RedisModuleString *str, size_t *len) { 988 if (str == NULL) { 989 const char *errmsg = "(NULL string reply referenced in module)"; 990 if (len) *len = strlen(errmsg); 991 return errmsg; 992 } 993 if (len) *len = sdslen(str->ptr); 994 return str->ptr; 995 } 996 997 /* -------------------------------------------------------------------------- 998 * Higher level string operations 999 * ------------------------------------------------------------------------- */ 1000 1001 /* Convert the string into a long long integer, storing it at `*ll`. 1002 * Returns REDISMODULE_OK on success. If the string can't be parsed 1003 * as a valid, strict long long (no spaces before/after), REDISMODULE_ERR 1004 * is returned. */ 1005 int RM_StringToLongLong(const RedisModuleString *str, long long *ll) { 1006 return string2ll(str->ptr,sdslen(str->ptr),ll) ? REDISMODULE_OK : 1007 REDISMODULE_ERR; 1008 } 1009 1010 /* Convert the string into a double, storing it at `*d`. 1011 * Returns REDISMODULE_OK on success or REDISMODULE_ERR if the string is 1012 * not a valid string representation of a double value. */ 1013 int RM_StringToDouble(const RedisModuleString *str, double *d) { 1014 int retval = getDoubleFromObject(str,d); 1015 return (retval == C_OK) ? REDISMODULE_OK : REDISMODULE_ERR; 1016 } 1017 1018 /* Compare two string objects, returning -1, 0 or 1 respectively if 1019 * a < b, a == b, a > b. Strings are compared byte by byte as two 1020 * binary blobs without any encoding care / collation attempt. */ 1021 int RM_StringCompare(RedisModuleString *a, RedisModuleString *b) { 1022 return compareStringObjects(a,b); 1023 } 1024 1025 /* Return the (possibly modified in encoding) input 'str' object if 1026 * the string is unshared, otherwise NULL is returned. */ 1027 RedisModuleString *moduleAssertUnsharedString(RedisModuleString *str) { 1028 if (str->refcount != 1) { 1029 serverLog(LL_WARNING, 1030 "Module attempted to use an in-place string modify operation " 1031 "with a string referenced multiple times. Please check the code " 1032 "for API usage correctness."); 1033 return NULL; 1034 } 1035 if (str->encoding == OBJ_ENCODING_EMBSTR) { 1036 /* Note: here we "leak" the additional allocation that was 1037 * used in order to store the embedded string in the object. */ 1038 str->ptr = sdsnewlen(str->ptr,sdslen(str->ptr)); 1039 str->encoding = OBJ_ENCODING_RAW; 1040 } else if (str->encoding == OBJ_ENCODING_INT) { 1041 /* Convert the string from integer to raw encoding. */ 1042 str->ptr = sdsfromlonglong((long)str->ptr); 1043 str->encoding = OBJ_ENCODING_RAW; 1044 } 1045 return str; 1046 } 1047 1048 /* Append the specified buffer to the string 'str'. The string must be a 1049 * string created by the user that is referenced only a single time, otherwise 1050 * REDISMODULE_ERR is returned and the operation is not performed. */ 1051 int RM_StringAppendBuffer(RedisModuleCtx *ctx, RedisModuleString *str, const char *buf, size_t len) { 1052 UNUSED(ctx); 1053 str = moduleAssertUnsharedString(str); 1054 if (str == NULL) return REDISMODULE_ERR; 1055 str->ptr = sdscatlen(str->ptr,buf,len); 1056 return REDISMODULE_OK; 1057 } 1058 1059 /* -------------------------------------------------------------------------- 1060 * Reply APIs 1061 * 1062 * Most functions always return REDISMODULE_OK so you can use it with 1063 * 'return' in order to return from the command implementation with: 1064 * 1065 * if (... some condition ...) 1066 * return RM_ReplyWithLongLong(ctx,mycount); 1067 * -------------------------------------------------------------------------- */ 1068 1069 /* Send an error about the number of arguments given to the command, 1070 * citing the command name in the error message. 1071 * 1072 * Example: 1073 * 1074 * if (argc != 3) return RedisModule_WrongArity(ctx); 1075 */ 1076 int RM_WrongArity(RedisModuleCtx *ctx) { 1077 addReplyErrorFormat(ctx->client, 1078 "wrong number of arguments for '%s' command", 1079 (char*)ctx->client->argv[0]->ptr); 1080 return REDISMODULE_OK; 1081 } 1082 1083 /* Return the client object the `RM_Reply*` functions should target. 1084 * Normally this is just `ctx->client`, that is the client that called 1085 * the module command, however in the case of thread safe contexts there 1086 * is no directly associated client (since it would not be safe to access 1087 * the client from a thread), so instead the blocked client object referenced 1088 * in the thread safe context, has a fake client that we just use to accumulate 1089 * the replies. Later, when the client is unblocked, the accumulated replies 1090 * are appended to the actual client. 1091 * 1092 * The function returns the client pointer depending on the context, or 1093 * NULL if there is no potential client. This happens when we are in the 1094 * context of a thread safe context that was not initialized with a blocked 1095 * client object. Other contexts without associated clients are the ones 1096 * initialized to run the timers callbacks. */ 1097 client *moduleGetReplyClient(RedisModuleCtx *ctx) { 1098 if (ctx->flags & REDISMODULE_CTX_THREAD_SAFE) { 1099 if (ctx->blocked_client) 1100 return ctx->blocked_client->reply_client; 1101 else 1102 return NULL; 1103 } else { 1104 /* If this is a non thread safe context, just return the client 1105 * that is running the command if any. This may be NULL as well 1106 * in the case of contexts that are not executed with associated 1107 * clients, like timer contexts. */ 1108 return ctx->client; 1109 } 1110 } 1111 1112 /* Send an integer reply to the client, with the specified long long value. 1113 * The function always returns REDISMODULE_OK. */ 1114 int RM_ReplyWithLongLong(RedisModuleCtx *ctx, long long ll) { 1115 client *c = moduleGetReplyClient(ctx); 1116 if (c == NULL) return REDISMODULE_OK; 1117 addReplyLongLong(c,ll); 1118 return REDISMODULE_OK; 1119 } 1120 1121 /* Reply with an error or simple string (status message). Used to implement 1122 * ReplyWithSimpleString() and ReplyWithError(). 1123 * The function always returns REDISMODULE_OK. */ 1124 int replyWithStatus(RedisModuleCtx *ctx, const char *msg, char *prefix) { 1125 client *c = moduleGetReplyClient(ctx); 1126 if (c == NULL) return REDISMODULE_OK; 1127 sds strmsg = sdsnewlen(prefix,1); 1128 strmsg = sdscat(strmsg,msg); 1129 strmsg = sdscatlen(strmsg,"\r\n",2); 1130 addReplySds(c,strmsg); 1131 return REDISMODULE_OK; 1132 } 1133 1134 /* Reply with the error 'err'. 1135 * 1136 * Note that 'err' must contain all the error, including 1137 * the initial error code. The function only provides the initial "-", so 1138 * the usage is, for example: 1139 * 1140 * RedisModule_ReplyWithError(ctx,"ERR Wrong Type"); 1141 * 1142 * and not just: 1143 * 1144 * RedisModule_ReplyWithError(ctx,"Wrong Type"); 1145 * 1146 * The function always returns REDISMODULE_OK. 1147 */ 1148 int RM_ReplyWithError(RedisModuleCtx *ctx, const char *err) { 1149 return replyWithStatus(ctx,err,"-"); 1150 } 1151 1152 /* Reply with a simple string (+... \r\n in RESP protocol). This replies 1153 * are suitable only when sending a small non-binary string with small 1154 * overhead, like "OK" or similar replies. 1155 * 1156 * The function always returns REDISMODULE_OK. */ 1157 int RM_ReplyWithSimpleString(RedisModuleCtx *ctx, const char *msg) { 1158 return replyWithStatus(ctx,msg,"+"); 1159 } 1160 1161 /* Reply with an array type of 'len' elements. However 'len' other calls 1162 * to `ReplyWith*` style functions must follow in order to emit the elements 1163 * of the array. 1164 * 1165 * When producing arrays with a number of element that is not known beforehand 1166 * the function can be called with the special count 1167 * REDISMODULE_POSTPONED_ARRAY_LEN, and the actual number of elements can be 1168 * later set with RedisModule_ReplySetArrayLength() (which will set the 1169 * latest "open" count if there are multiple ones). 1170 * 1171 * The function always returns REDISMODULE_OK. */ 1172 int RM_ReplyWithArray(RedisModuleCtx *ctx, long len) { 1173 client *c = moduleGetReplyClient(ctx); 1174 if (c == NULL) return REDISMODULE_OK; 1175 if (len == REDISMODULE_POSTPONED_ARRAY_LEN) { 1176 ctx->postponed_arrays = zrealloc(ctx->postponed_arrays,sizeof(void*)* 1177 (ctx->postponed_arrays_count+1)); 1178 ctx->postponed_arrays[ctx->postponed_arrays_count] = 1179 addDeferredMultiBulkLength(c); 1180 ctx->postponed_arrays_count++; 1181 } else { 1182 addReplyMultiBulkLen(c,len); 1183 } 1184 return REDISMODULE_OK; 1185 } 1186 1187 /* When RedisModule_ReplyWithArray() is used with the argument 1188 * REDISMODULE_POSTPONED_ARRAY_LEN, because we don't know beforehand the number 1189 * of items we are going to output as elements of the array, this function 1190 * will take care to set the array length. 1191 * 1192 * Since it is possible to have multiple array replies pending with unknown 1193 * length, this function guarantees to always set the latest array length 1194 * that was created in a postponed way. 1195 * 1196 * For example in order to output an array like [1,[10,20,30]] we 1197 * could write: 1198 * 1199 * RedisModule_ReplyWithArray(ctx,REDISMODULE_POSTPONED_ARRAY_LEN); 1200 * RedisModule_ReplyWithLongLong(ctx,1); 1201 * RedisModule_ReplyWithArray(ctx,REDISMODULE_POSTPONED_ARRAY_LEN); 1202 * RedisModule_ReplyWithLongLong(ctx,10); 1203 * RedisModule_ReplyWithLongLong(ctx,20); 1204 * RedisModule_ReplyWithLongLong(ctx,30); 1205 * RedisModule_ReplySetArrayLength(ctx,3); // Set len of 10,20,30 array. 1206 * RedisModule_ReplySetArrayLength(ctx,2); // Set len of top array 1207 * 1208 * Note that in the above example there is no reason to postpone the array 1209 * length, since we produce a fixed number of elements, but in the practice 1210 * the code may use an iterator or other ways of creating the output so 1211 * that is not easy to calculate in advance the number of elements. 1212 */ 1213 void RM_ReplySetArrayLength(RedisModuleCtx *ctx, long len) { 1214 client *c = moduleGetReplyClient(ctx); 1215 if (c == NULL) return; 1216 if (ctx->postponed_arrays_count == 0) { 1217 serverLog(LL_WARNING, 1218 "API misuse detected in module %s: " 1219 "RedisModule_ReplySetArrayLength() called without previous " 1220 "RedisModule_ReplyWithArray(ctx,REDISMODULE_POSTPONED_ARRAY_LEN) " 1221 "call.", ctx->module->name); 1222 return; 1223 } 1224 ctx->postponed_arrays_count--; 1225 setDeferredMultiBulkLength(c, 1226 ctx->postponed_arrays[ctx->postponed_arrays_count], 1227 len); 1228 if (ctx->postponed_arrays_count == 0) { 1229 zfree(ctx->postponed_arrays); 1230 ctx->postponed_arrays = NULL; 1231 } 1232 } 1233 1234 /* Reply with a bulk string, taking in input a C buffer pointer and length. 1235 * 1236 * The function always returns REDISMODULE_OK. */ 1237 int RM_ReplyWithStringBuffer(RedisModuleCtx *ctx, const char *buf, size_t len) { 1238 client *c = moduleGetReplyClient(ctx); 1239 if (c == NULL) return REDISMODULE_OK; 1240 addReplyBulkCBuffer(c,(char*)buf,len); 1241 return REDISMODULE_OK; 1242 } 1243 1244 /* Reply with a bulk string, taking in input a RedisModuleString object. 1245 * 1246 * The function always returns REDISMODULE_OK. */ 1247 int RM_ReplyWithString(RedisModuleCtx *ctx, RedisModuleString *str) { 1248 client *c = moduleGetReplyClient(ctx); 1249 if (c == NULL) return REDISMODULE_OK; 1250 addReplyBulk(c,str); 1251 return REDISMODULE_OK; 1252 } 1253 1254 /* Reply to the client with a NULL. In the RESP protocol a NULL is encoded 1255 * as the string "$-1\r\n". 1256 * 1257 * The function always returns REDISMODULE_OK. */ 1258 int RM_ReplyWithNull(RedisModuleCtx *ctx) { 1259 client *c = moduleGetReplyClient(ctx); 1260 if (c == NULL) return REDISMODULE_OK; 1261 addReply(c,shared.nullbulk); 1262 return REDISMODULE_OK; 1263 } 1264 1265 /* Reply exactly what a Redis command returned us with RedisModule_Call(). 1266 * This function is useful when we use RedisModule_Call() in order to 1267 * execute some command, as we want to reply to the client exactly the 1268 * same reply we obtained by the command. 1269 * 1270 * The function always returns REDISMODULE_OK. */ 1271 int RM_ReplyWithCallReply(RedisModuleCtx *ctx, RedisModuleCallReply *reply) { 1272 client *c = moduleGetReplyClient(ctx); 1273 if (c == NULL) return REDISMODULE_OK; 1274 sds proto = sdsnewlen(reply->proto, reply->protolen); 1275 addReplySds(c,proto); 1276 return REDISMODULE_OK; 1277 } 1278 1279 /* Send a string reply obtained converting the double 'd' into a bulk string. 1280 * This function is basically equivalent to converting a double into 1281 * a string into a C buffer, and then calling the function 1282 * RedisModule_ReplyWithStringBuffer() with the buffer and length. 1283 * 1284 * The function always returns REDISMODULE_OK. */ 1285 int RM_ReplyWithDouble(RedisModuleCtx *ctx, double d) { 1286 client *c = moduleGetReplyClient(ctx); 1287 if (c == NULL) return REDISMODULE_OK; 1288 addReplyDouble(c,d); 1289 return REDISMODULE_OK; 1290 } 1291 1292 /* -------------------------------------------------------------------------- 1293 * Commands replication API 1294 * -------------------------------------------------------------------------- */ 1295 1296 /* Helper function to replicate MULTI the first time we replicate something 1297 * in the context of a command execution. EXEC will be handled by the 1298 * RedisModuleCommandDispatcher() function. */ 1299 void moduleReplicateMultiIfNeeded(RedisModuleCtx *ctx) { 1300 /* Skip this if client explicitly wrap the command with MULTI, or if 1301 * the module command was called by a script. */ 1302 if (ctx->client->flags & (CLIENT_MULTI|CLIENT_LUA)) return; 1303 /* If we already emitted MULTI return ASAP. */ 1304 if (ctx->flags & REDISMODULE_CTX_MULTI_EMITTED) return; 1305 /* If this is a thread safe context, we do not want to wrap commands 1306 * executed into MUTLI/EXEC, they are executed as single commands 1307 * from an external client in essence. */ 1308 if (ctx->flags & REDISMODULE_CTX_THREAD_SAFE) return; 1309 execCommandPropagateMulti(ctx->client); 1310 ctx->flags |= REDISMODULE_CTX_MULTI_EMITTED; 1311 } 1312 1313 /* Replicate the specified command and arguments to slaves and AOF, as effect 1314 * of execution of the calling command implementation. 1315 * 1316 * The replicated commands are always wrapped into the MULTI/EXEC that 1317 * contains all the commands replicated in a given module command 1318 * execution. However the commands replicated with RedisModule_Call() 1319 * are the first items, the ones replicated with RedisModule_Replicate() 1320 * will all follow before the EXEC. 1321 * 1322 * Modules should try to use one interface or the other. 1323 * 1324 * This command follows exactly the same interface of RedisModule_Call(), 1325 * so a set of format specifiers must be passed, followed by arguments 1326 * matching the provided format specifiers. 1327 * 1328 * Please refer to RedisModule_Call() for more information. 1329 * 1330 * The command returns REDISMODULE_ERR if the format specifiers are invalid 1331 * or the command name does not belong to a known command. */ 1332 int RM_Replicate(RedisModuleCtx *ctx, const char *cmdname, const char *fmt, ...) { 1333 struct redisCommand *cmd; 1334 robj **argv = NULL; 1335 int argc = 0, flags = 0, j; 1336 va_list ap; 1337 1338 cmd = lookupCommandByCString((char*)cmdname); 1339 if (!cmd) return REDISMODULE_ERR; 1340 1341 /* Create the client and dispatch the command. */ 1342 va_start(ap, fmt); 1343 argv = moduleCreateArgvFromUserFormat(cmdname,fmt,&argc,&flags,ap); 1344 va_end(ap); 1345 if (argv == NULL) return REDISMODULE_ERR; 1346 1347 /* Replicate! */ 1348 moduleReplicateMultiIfNeeded(ctx); 1349 alsoPropagate(cmd,ctx->client->db->id,argv,argc, 1350 PROPAGATE_AOF|PROPAGATE_REPL); 1351 1352 /* Release the argv. */ 1353 for (j = 0; j < argc; j++) decrRefCount(argv[j]); 1354 zfree(argv); 1355 server.dirty++; 1356 return REDISMODULE_OK; 1357 } 1358 1359 /* This function will replicate the command exactly as it was invoked 1360 * by the client. Note that this function will not wrap the command into 1361 * a MULTI/EXEC stanza, so it should not be mixed with other replication 1362 * commands. 1363 * 1364 * Basically this form of replication is useful when you want to propagate 1365 * the command to the slaves and AOF file exactly as it was called, since 1366 * the command can just be re-executed to deterministically re-create the 1367 * new state starting from the old one. 1368 * 1369 * The function always returns REDISMODULE_OK. */ 1370 int RM_ReplicateVerbatim(RedisModuleCtx *ctx) { 1371 alsoPropagate(ctx->client->cmd,ctx->client->db->id, 1372 ctx->client->argv,ctx->client->argc, 1373 PROPAGATE_AOF|PROPAGATE_REPL); 1374 server.dirty++; 1375 return REDISMODULE_OK; 1376 } 1377 1378 /* -------------------------------------------------------------------------- 1379 * DB and Key APIs -- Generic API 1380 * -------------------------------------------------------------------------- */ 1381 1382 /* Return the ID of the current client calling the currently active module 1383 * command. The returned ID has a few guarantees: 1384 * 1385 * 1. The ID is different for each different client, so if the same client 1386 * executes a module command multiple times, it can be recognized as 1387 * having the same ID, otherwise the ID will be different. 1388 * 2. The ID increases monotonically. Clients connecting to the server later 1389 * are guaranteed to get IDs greater than any past ID previously seen. 1390 * 1391 * Valid IDs are from 1 to 2^64-1. If 0 is returned it means there is no way 1392 * to fetch the ID in the context the function was currently called. */ 1393 unsigned long long RM_GetClientId(RedisModuleCtx *ctx) { 1394 if (ctx->client == NULL) return 0; 1395 return ctx->client->id; 1396 } 1397 1398 /* Return the currently selected DB. */ 1399 int RM_GetSelectedDb(RedisModuleCtx *ctx) { 1400 return ctx->client->db->id; 1401 } 1402 1403 1404 /* Return the current context's flags. The flags provide information on the 1405 * current request context (whether the client is a Lua script or in a MULTI), 1406 * and about the Redis instance in general, i.e replication and persistence. 1407 * 1408 * The available flags are: 1409 * 1410 * * REDISMODULE_CTX_FLAGS_LUA: The command is running in a Lua script 1411 * 1412 * * REDISMODULE_CTX_FLAGS_MULTI: The command is running inside a transaction 1413 * 1414 * * REDISMODULE_CTX_FLAGS_REPLICATED: The command was sent over the replication 1415 * link by the MASTER 1416 * 1417 * * REDISMODULE_CTX_FLAGS_MASTER: The Redis instance is a master 1418 * 1419 * * REDISMODULE_CTX_FLAGS_SLAVE: The Redis instance is a slave 1420 * 1421 * * REDISMODULE_CTX_FLAGS_READONLY: The Redis instance is read-only 1422 * 1423 * * REDISMODULE_CTX_FLAGS_CLUSTER: The Redis instance is in cluster mode 1424 * 1425 * * REDISMODULE_CTX_FLAGS_AOF: The Redis instance has AOF enabled 1426 * 1427 * * REDISMODULE_CTX_FLAGS_RDB: The instance has RDB enabled 1428 * 1429 * * REDISMODULE_CTX_FLAGS_MAXMEMORY: The instance has Maxmemory set 1430 * 1431 * * REDISMODULE_CTX_FLAGS_EVICT: Maxmemory is set and has an eviction 1432 * policy that may delete keys 1433 * 1434 * * REDISMODULE_CTX_FLAGS_OOM: Redis is out of memory according to the 1435 * maxmemory setting. 1436 * 1437 * * REDISMODULE_CTX_FLAGS_OOM_WARNING: Less than 25% of memory remains before 1438 * reaching the maxmemory level. 1439 */ 1440 int RM_GetContextFlags(RedisModuleCtx *ctx) { 1441 1442 int flags = 0; 1443 /* Client specific flags */ 1444 if (ctx->client) { 1445 if (ctx->client->flags & CLIENT_LUA) 1446 flags |= REDISMODULE_CTX_FLAGS_LUA; 1447 if (ctx->client->flags & CLIENT_MULTI) 1448 flags |= REDISMODULE_CTX_FLAGS_MULTI; 1449 /* Module command recieved from MASTER, is replicated. */ 1450 if (ctx->client->flags & CLIENT_MASTER) 1451 flags |= REDISMODULE_CTX_FLAGS_REPLICATED; 1452 } 1453 1454 if (server.cluster_enabled) 1455 flags |= REDISMODULE_CTX_FLAGS_CLUSTER; 1456 1457 /* Maxmemory and eviction policy */ 1458 if (server.maxmemory > 0) { 1459 flags |= REDISMODULE_CTX_FLAGS_MAXMEMORY; 1460 1461 if (server.maxmemory_policy != MAXMEMORY_NO_EVICTION) 1462 flags |= REDISMODULE_CTX_FLAGS_EVICT; 1463 } 1464 1465 /* Persistence flags */ 1466 if (server.aof_state != AOF_OFF) 1467 flags |= REDISMODULE_CTX_FLAGS_AOF; 1468 if (server.saveparamslen > 0) 1469 flags |= REDISMODULE_CTX_FLAGS_RDB; 1470 1471 /* Replication flags */ 1472 if (server.masterhost == NULL) { 1473 flags |= REDISMODULE_CTX_FLAGS_MASTER; 1474 } else { 1475 flags |= REDISMODULE_CTX_FLAGS_SLAVE; 1476 if (server.repl_slave_ro) 1477 flags |= REDISMODULE_CTX_FLAGS_READONLY; 1478 } 1479 1480 /* OOM flag. */ 1481 float level; 1482 int retval = getMaxmemoryState(NULL,NULL,NULL,&level); 1483 if (retval == C_ERR) flags |= REDISMODULE_CTX_FLAGS_OOM; 1484 if (level > 0.75) flags |= REDISMODULE_CTX_FLAGS_OOM_WARNING; 1485 1486 return flags; 1487 } 1488 1489 /* Change the currently selected DB. Returns an error if the id 1490 * is out of range. 1491 * 1492 * Note that the client will retain the currently selected DB even after 1493 * the Redis command implemented by the module calling this function 1494 * returns. 1495 * 1496 * If the module command wishes to change something in a different DB and 1497 * returns back to the original one, it should call RedisModule_GetSelectedDb() 1498 * before in order to restore the old DB number before returning. */ 1499 int RM_SelectDb(RedisModuleCtx *ctx, int newid) { 1500 int retval = selectDb(ctx->client,newid); 1501 return (retval == C_OK) ? REDISMODULE_OK : REDISMODULE_ERR; 1502 } 1503 1504 /* Return an handle representing a Redis key, so that it is possible 1505 * to call other APIs with the key handle as argument to perform 1506 * operations on the key. 1507 * 1508 * The return value is the handle representing the key, that must be 1509 * closed with RM_CloseKey(). 1510 * 1511 * If the key does not exist and WRITE mode is requested, the handle 1512 * is still returned, since it is possible to perform operations on 1513 * a yet not existing key (that will be created, for example, after 1514 * a list push operation). If the mode is just READ instead, and the 1515 * key does not exist, NULL is returned. However it is still safe to 1516 * call RedisModule_CloseKey() and RedisModule_KeyType() on a NULL 1517 * value. */ 1518 void *RM_OpenKey(RedisModuleCtx *ctx, robj *keyname, int mode) { 1519 RedisModuleKey *kp; 1520 robj *value; 1521 1522 if (mode & REDISMODULE_WRITE) { 1523 value = lookupKeyWrite(ctx->client->db,keyname); 1524 } else { 1525 value = lookupKeyRead(ctx->client->db,keyname); 1526 if (value == NULL) { 1527 return NULL; 1528 } 1529 } 1530 1531 /* Setup the key handle. */ 1532 kp = zmalloc(sizeof(*kp)); 1533 kp->ctx = ctx; 1534 kp->db = ctx->client->db; 1535 kp->key = keyname; 1536 incrRefCount(keyname); 1537 kp->value = value; 1538 kp->iter = NULL; 1539 kp->mode = mode; 1540 zsetKeyReset(kp); 1541 autoMemoryAdd(ctx,REDISMODULE_AM_KEY,kp); 1542 return (void*)kp; 1543 } 1544 1545 /* Close a key handle. */ 1546 void RM_CloseKey(RedisModuleKey *key) { 1547 if (key == NULL) return; 1548 if (key->mode & REDISMODULE_WRITE) signalModifiedKey(key->db,key->key); 1549 /* TODO: if (key->iter) RM_KeyIteratorStop(kp); */ 1550 RM_ZsetRangeStop(key); 1551 decrRefCount(key->key); 1552 autoMemoryFreed(key->ctx,REDISMODULE_AM_KEY,key); 1553 zfree(key); 1554 } 1555 1556 /* Return the type of the key. If the key pointer is NULL then 1557 * REDISMODULE_KEYTYPE_EMPTY is returned. */ 1558 int RM_KeyType(RedisModuleKey *key) { 1559 if (key == NULL || key->value == NULL) return REDISMODULE_KEYTYPE_EMPTY; 1560 /* We map between defines so that we are free to change the internal 1561 * defines as desired. */ 1562 switch(key->value->type) { 1563 case OBJ_STRING: return REDISMODULE_KEYTYPE_STRING; 1564 case OBJ_LIST: return REDISMODULE_KEYTYPE_LIST; 1565 case OBJ_SET: return REDISMODULE_KEYTYPE_SET; 1566 case OBJ_ZSET: return REDISMODULE_KEYTYPE_ZSET; 1567 case OBJ_HASH: return REDISMODULE_KEYTYPE_HASH; 1568 case OBJ_MODULE: return REDISMODULE_KEYTYPE_MODULE; 1569 default: return 0; 1570 } 1571 } 1572 1573 /* Return the length of the value associated with the key. 1574 * For strings this is the length of the string. For all the other types 1575 * is the number of elements (just counting keys for hashes). 1576 * 1577 * If the key pointer is NULL or the key is empty, zero is returned. */ 1578 size_t RM_ValueLength(RedisModuleKey *key) { 1579 if (key == NULL || key->value == NULL) return 0; 1580 switch(key->value->type) { 1581 case OBJ_STRING: return stringObjectLen(key->value); 1582 case OBJ_LIST: return listTypeLength(key->value); 1583 case OBJ_SET: return setTypeSize(key->value); 1584 case OBJ_ZSET: return zsetLength(key->value); 1585 case OBJ_HASH: return hashTypeLength(key->value); 1586 default: return 0; 1587 } 1588 } 1589 1590 /* If the key is open for writing, remove it, and setup the key to 1591 * accept new writes as an empty key (that will be created on demand). 1592 * On success REDISMODULE_OK is returned. If the key is not open for 1593 * writing REDISMODULE_ERR is returned. */ 1594 int RM_DeleteKey(RedisModuleKey *key) { 1595 if (!(key->mode & REDISMODULE_WRITE)) return REDISMODULE_ERR; 1596 if (key->value) { 1597 dbDelete(key->db,key->key); 1598 key->value = NULL; 1599 } 1600 return REDISMODULE_OK; 1601 } 1602 1603 /* If the key is open for writing, unlink it (that is delete it in a 1604 * non-blocking way, not reclaiming memory immediately) and setup the key to 1605 * accept new writes as an empty key (that will be created on demand). 1606 * On success REDISMODULE_OK is returned. If the key is not open for 1607 * writing REDISMODULE_ERR is returned. */ 1608 int RM_UnlinkKey(RedisModuleKey *key) { 1609 if (!(key->mode & REDISMODULE_WRITE)) return REDISMODULE_ERR; 1610 if (key->value) { 1611 dbAsyncDelete(key->db,key->key); 1612 key->value = NULL; 1613 } 1614 return REDISMODULE_OK; 1615 } 1616 1617 /* Return the key expire value, as milliseconds of remaining TTL. 1618 * If no TTL is associated with the key or if the key is empty, 1619 * REDISMODULE_NO_EXPIRE is returned. */ 1620 mstime_t RM_GetExpire(RedisModuleKey *key) { 1621 mstime_t expire = getExpire(key->db,key->key); 1622 if (expire == -1 || key->value == NULL) return -1; 1623 expire -= mstime(); 1624 return expire >= 0 ? expire : 0; 1625 } 1626 1627 /* Set a new expire for the key. If the special expire 1628 * REDISMODULE_NO_EXPIRE is set, the expire is cancelled if there was 1629 * one (the same as the PERSIST command). 1630 * 1631 * Note that the expire must be provided as a positive integer representing 1632 * the number of milliseconds of TTL the key should have. 1633 * 1634 * The function returns REDISMODULE_OK on success or REDISMODULE_ERR if 1635 * the key was not open for writing or is an empty key. */ 1636 int RM_SetExpire(RedisModuleKey *key, mstime_t expire) { 1637 if (!(key->mode & REDISMODULE_WRITE) || key->value == NULL) 1638 return REDISMODULE_ERR; 1639 if (expire != REDISMODULE_NO_EXPIRE) { 1640 expire += mstime(); 1641 setExpire(key->ctx->client,key->db,key->key,expire); 1642 } else { 1643 removeExpire(key->db,key->key); 1644 } 1645 return REDISMODULE_OK; 1646 } 1647 1648 /* -------------------------------------------------------------------------- 1649 * Key API for String type 1650 * -------------------------------------------------------------------------- */ 1651 1652 /* If the key is open for writing, set the specified string 'str' as the 1653 * value of the key, deleting the old value if any. 1654 * On success REDISMODULE_OK is returned. If the key is not open for 1655 * writing or there is an active iterator, REDISMODULE_ERR is returned. */ 1656 int RM_StringSet(RedisModuleKey *key, RedisModuleString *str) { 1657 if (!(key->mode & REDISMODULE_WRITE) || key->iter) return REDISMODULE_ERR; 1658 RM_DeleteKey(key); 1659 setKey(key->db,key->key,str); 1660 key->value = str; 1661 return REDISMODULE_OK; 1662 } 1663 1664 /* Prepare the key associated string value for DMA access, and returns 1665 * a pointer and size (by reference), that the user can use to read or 1666 * modify the string in-place accessing it directly via pointer. 1667 * 1668 * The 'mode' is composed by bitwise OR-ing the following flags: 1669 * 1670 * REDISMODULE_READ -- Read access 1671 * REDISMODULE_WRITE -- Write access 1672 * 1673 * If the DMA is not requested for writing, the pointer returned should 1674 * only be accessed in a read-only fashion. 1675 * 1676 * On error (wrong type) NULL is returned. 1677 * 1678 * DMA access rules: 1679 * 1680 * 1. No other key writing function should be called since the moment 1681 * the pointer is obtained, for all the time we want to use DMA access 1682 * to read or modify the string. 1683 * 1684 * 2. Each time RM_StringTruncate() is called, to continue with the DMA 1685 * access, RM_StringDMA() should be called again to re-obtain 1686 * a new pointer and length. 1687 * 1688 * 3. If the returned pointer is not NULL, but the length is zero, no 1689 * byte can be touched (the string is empty, or the key itself is empty) 1690 * so a RM_StringTruncate() call should be used if there is to enlarge 1691 * the string, and later call StringDMA() again to get the pointer. 1692 */ 1693 char *RM_StringDMA(RedisModuleKey *key, size_t *len, int mode) { 1694 /* We need to return *some* pointer for empty keys, we just return 1695 * a string literal pointer, that is the advantage to be mapped into 1696 * a read only memory page, so the module will segfault if a write 1697 * attempt is performed. */ 1698 char *emptystring = "<dma-empty-string>"; 1699 if (key->value == NULL) { 1700 *len = 0; 1701 return emptystring; 1702 } 1703 1704 if (key->value->type != OBJ_STRING) return NULL; 1705 1706 /* For write access, and even for read access if the object is encoded, 1707 * we unshare the string (that has the side effect of decoding it). */ 1708 if ((mode & REDISMODULE_WRITE) || key->value->encoding != OBJ_ENCODING_RAW) 1709 key->value = dbUnshareStringValue(key->db, key->key, key->value); 1710 1711 *len = sdslen(key->value->ptr); 1712 return key->value->ptr; 1713 } 1714 1715 /* If the string is open for writing and is of string type, resize it, padding 1716 * with zero bytes if the new length is greater than the old one. 1717 * 1718 * After this call, RM_StringDMA() must be called again to continue 1719 * DMA access with the new pointer. 1720 * 1721 * The function returns REDISMODULE_OK on success, and REDISMODULE_ERR on 1722 * error, that is, the key is not open for writing, is not a string 1723 * or resizing for more than 512 MB is requested. 1724 * 1725 * If the key is empty, a string key is created with the new string value 1726 * unless the new length value requested is zero. */ 1727 int RM_StringTruncate(RedisModuleKey *key, size_t newlen) { 1728 if (!(key->mode & REDISMODULE_WRITE)) return REDISMODULE_ERR; 1729 if (key->value && key->value->type != OBJ_STRING) return REDISMODULE_ERR; 1730 if (newlen > 512*1024*1024) return REDISMODULE_ERR; 1731 1732 /* Empty key and new len set to 0. Just return REDISMODULE_OK without 1733 * doing anything. */ 1734 if (key->value == NULL && newlen == 0) return REDISMODULE_OK; 1735 1736 if (key->value == NULL) { 1737 /* Empty key: create it with the new size. */ 1738 robj *o = createObject(OBJ_STRING,sdsnewlen(NULL, newlen)); 1739 setKey(key->db,key->key,o); 1740 key->value = o; 1741 decrRefCount(o); 1742 } else { 1743 /* Unshare and resize. */ 1744 key->value = dbUnshareStringValue(key->db, key->key, key->value); 1745 size_t curlen = sdslen(key->value->ptr); 1746 if (newlen > curlen) { 1747 key->value->ptr = sdsgrowzero(key->value->ptr,newlen); 1748 } else if (newlen < curlen) { 1749 sdsrange(key->value->ptr,0,newlen-1); 1750 /* If the string is too wasteful, reallocate it. */ 1751 if (sdslen(key->value->ptr) < sdsavail(key->value->ptr)) 1752 key->value->ptr = sdsRemoveFreeSpace(key->value->ptr); 1753 } 1754 } 1755 return REDISMODULE_OK; 1756 } 1757 1758 /* -------------------------------------------------------------------------- 1759 * Key API for List type 1760 * -------------------------------------------------------------------------- */ 1761 1762 /* Push an element into a list, on head or tail depending on 'where' argument. 1763 * If the key pointer is about an empty key opened for writing, the key 1764 * is created. On error (key opened for read-only operations or of the wrong 1765 * type) REDISMODULE_ERR is returned, otherwise REDISMODULE_OK is returned. */ 1766 int RM_ListPush(RedisModuleKey *key, int where, RedisModuleString *ele) { 1767 if (!(key->mode & REDISMODULE_WRITE)) return REDISMODULE_ERR; 1768 if (key->value && key->value->type != OBJ_LIST) return REDISMODULE_ERR; 1769 if (key->value == NULL) moduleCreateEmptyKey(key,REDISMODULE_KEYTYPE_LIST); 1770 listTypePush(key->value, ele, 1771 (where == REDISMODULE_LIST_HEAD) ? QUICKLIST_HEAD : QUICKLIST_TAIL); 1772 return REDISMODULE_OK; 1773 } 1774 1775 /* Pop an element from the list, and returns it as a module string object 1776 * that the user should be free with RM_FreeString() or by enabling 1777 * automatic memory. 'where' specifies if the element should be popped from 1778 * head or tail. The command returns NULL if: 1779 * 1) The list is empty. 1780 * 2) The key was not open for writing. 1781 * 3) The key is not a list. */ 1782 RedisModuleString *RM_ListPop(RedisModuleKey *key, int where) { 1783 if (!(key->mode & REDISMODULE_WRITE) || 1784 key->value == NULL || 1785 key->value->type != OBJ_LIST) return NULL; 1786 robj *ele = listTypePop(key->value, 1787 (where == REDISMODULE_LIST_HEAD) ? QUICKLIST_HEAD : QUICKLIST_TAIL); 1788 robj *decoded = getDecodedObject(ele); 1789 decrRefCount(ele); 1790 moduleDelKeyIfEmpty(key); 1791 autoMemoryAdd(key->ctx,REDISMODULE_AM_STRING,decoded); 1792 return decoded; 1793 } 1794 1795 /* -------------------------------------------------------------------------- 1796 * Key API for Sorted Set type 1797 * -------------------------------------------------------------------------- */ 1798 1799 /* Conversion from/to public flags of the Modules API and our private flags, 1800 * so that we have everything decoupled. */ 1801 int RM_ZsetAddFlagsToCoreFlags(int flags) { 1802 int retflags = 0; 1803 if (flags & REDISMODULE_ZADD_XX) retflags |= ZADD_XX; 1804 if (flags & REDISMODULE_ZADD_NX) retflags |= ZADD_NX; 1805 return retflags; 1806 } 1807 1808 /* See previous function comment. */ 1809 int RM_ZsetAddFlagsFromCoreFlags(int flags) { 1810 int retflags = 0; 1811 if (flags & ZADD_ADDED) retflags |= REDISMODULE_ZADD_ADDED; 1812 if (flags & ZADD_UPDATED) retflags |= REDISMODULE_ZADD_UPDATED; 1813 if (flags & ZADD_NOP) retflags |= REDISMODULE_ZADD_NOP; 1814 return retflags; 1815 } 1816 1817 /* Add a new element into a sorted set, with the specified 'score'. 1818 * If the element already exists, the score is updated. 1819 * 1820 * A new sorted set is created at value if the key is an empty open key 1821 * setup for writing. 1822 * 1823 * Additional flags can be passed to the function via a pointer, the flags 1824 * are both used to receive input and to communicate state when the function 1825 * returns. 'flagsptr' can be NULL if no special flags are used. 1826 * 1827 * The input flags are: 1828 * 1829 * REDISMODULE_ZADD_XX: Element must already exist. Do nothing otherwise. 1830 * REDISMODULE_ZADD_NX: Element must not exist. Do nothing otherwise. 1831 * 1832 * The output flags are: 1833 * 1834 * REDISMODULE_ZADD_ADDED: The new element was added to the sorted set. 1835 * REDISMODULE_ZADD_UPDATED: The score of the element was updated. 1836 * REDISMODULE_ZADD_NOP: No operation was performed because XX or NX flags. 1837 * 1838 * On success the function returns REDISMODULE_OK. On the following errors 1839 * REDISMODULE_ERR is returned: 1840 * 1841 * * The key was not opened for writing. 1842 * * The key is of the wrong type. 1843 * * 'score' double value is not a number (NaN). 1844 */ 1845 int RM_ZsetAdd(RedisModuleKey *key, double score, RedisModuleString *ele, int *flagsptr) { 1846 int flags = 0; 1847 if (!(key->mode & REDISMODULE_WRITE)) return REDISMODULE_ERR; 1848 if (key->value && key->value->type != OBJ_ZSET) return REDISMODULE_ERR; 1849 if (key->value == NULL) moduleCreateEmptyKey(key,REDISMODULE_KEYTYPE_ZSET); 1850 if (flagsptr) flags = RM_ZsetAddFlagsToCoreFlags(*flagsptr); 1851 if (zsetAdd(key->value,score,ele->ptr,&flags,NULL) == 0) { 1852 if (flagsptr) *flagsptr = 0; 1853 return REDISMODULE_ERR; 1854 } 1855 if (flagsptr) *flagsptr = RM_ZsetAddFlagsFromCoreFlags(flags); 1856 return REDISMODULE_OK; 1857 } 1858 1859 /* This function works exactly like RM_ZsetAdd(), but instead of setting 1860 * a new score, the score of the existing element is incremented, or if the 1861 * element does not already exist, it is added assuming the old score was 1862 * zero. 1863 * 1864 * The input and output flags, and the return value, have the same exact 1865 * meaning, with the only difference that this function will return 1866 * REDISMODULE_ERR even when 'score' is a valid double number, but adding it 1867 * to the existing score results into a NaN (not a number) condition. 1868 * 1869 * This function has an additional field 'newscore', if not NULL is filled 1870 * with the new score of the element after the increment, if no error 1871 * is returned. */ 1872 int RM_ZsetIncrby(RedisModuleKey *key, double score, RedisModuleString *ele, int *flagsptr, double *newscore) { 1873 int flags = 0; 1874 if (!(key->mode & REDISMODULE_WRITE)) return REDISMODULE_ERR; 1875 if (key->value && key->value->type != OBJ_ZSET) return REDISMODULE_ERR; 1876 if (key->value == NULL) moduleCreateEmptyKey(key,REDISMODULE_KEYTYPE_ZSET); 1877 if (flagsptr) flags = RM_ZsetAddFlagsToCoreFlags(*flagsptr); 1878 flags |= ZADD_INCR; 1879 if (zsetAdd(key->value,score,ele->ptr,&flags,newscore) == 0) { 1880 if (flagsptr) *flagsptr = 0; 1881 return REDISMODULE_ERR; 1882 } 1883 /* zsetAdd() may signal back that the resulting score is not a number. */ 1884 if (flagsptr && (*flagsptr & ZADD_NAN)) { 1885 *flagsptr = 0; 1886 return REDISMODULE_ERR; 1887 } 1888 if (flagsptr) *flagsptr = RM_ZsetAddFlagsFromCoreFlags(flags); 1889 return REDISMODULE_OK; 1890 } 1891 1892 /* Remove the specified element from the sorted set. 1893 * The function returns REDISMODULE_OK on success, and REDISMODULE_ERR 1894 * on one of the following conditions: 1895 * 1896 * * The key was not opened for writing. 1897 * * The key is of the wrong type. 1898 * 1899 * The return value does NOT indicate the fact the element was really 1900 * removed (since it existed) or not, just if the function was executed 1901 * with success. 1902 * 1903 * In order to know if the element was removed, the additional argument 1904 * 'deleted' must be passed, that populates the integer by reference 1905 * setting it to 1 or 0 depending on the outcome of the operation. 1906 * The 'deleted' argument can be NULL if the caller is not interested 1907 * to know if the element was really removed. 1908 * 1909 * Empty keys will be handled correctly by doing nothing. */ 1910 int RM_ZsetRem(RedisModuleKey *key, RedisModuleString *ele, int *deleted) { 1911 if (!(key->mode & REDISMODULE_WRITE)) return REDISMODULE_ERR; 1912 if (key->value && key->value->type != OBJ_ZSET) return REDISMODULE_ERR; 1913 if (key->value != NULL && zsetDel(key->value,ele->ptr)) { 1914 if (deleted) *deleted = 1; 1915 } else { 1916 if (deleted) *deleted = 0; 1917 } 1918 return REDISMODULE_OK; 1919 } 1920 1921 /* On success retrieve the double score associated at the sorted set element 1922 * 'ele' and returns REDISMODULE_OK. Otherwise REDISMODULE_ERR is returned 1923 * to signal one of the following conditions: 1924 * 1925 * * There is no such element 'ele' in the sorted set. 1926 * * The key is not a sorted set. 1927 * * The key is an open empty key. 1928 */ 1929 int RM_ZsetScore(RedisModuleKey *key, RedisModuleString *ele, double *score) { 1930 if (key->value == NULL) return REDISMODULE_ERR; 1931 if (key->value->type != OBJ_ZSET) return REDISMODULE_ERR; 1932 if (zsetScore(key->value,ele->ptr,score) == C_ERR) return REDISMODULE_ERR; 1933 return REDISMODULE_OK; 1934 } 1935 1936 /* -------------------------------------------------------------------------- 1937 * Key API for Sorted Set iterator 1938 * -------------------------------------------------------------------------- */ 1939 1940 void zsetKeyReset(RedisModuleKey *key) { 1941 key->ztype = REDISMODULE_ZSET_RANGE_NONE; 1942 key->zcurrent = NULL; 1943 key->zer = 1; 1944 } 1945 1946 /* Stop a sorted set iteration. */ 1947 void RM_ZsetRangeStop(RedisModuleKey *key) { 1948 /* Free resources if needed. */ 1949 if (key->ztype == REDISMODULE_ZSET_RANGE_LEX) 1950 zslFreeLexRange(&key->zlrs); 1951 /* Setup sensible values so that misused iteration API calls when an 1952 * iterator is not active will result into something more sensible 1953 * than crashing. */ 1954 zsetKeyReset(key); 1955 } 1956 1957 /* Return the "End of range" flag value to signal the end of the iteration. */ 1958 int RM_ZsetRangeEndReached(RedisModuleKey *key) { 1959 return key->zer; 1960 } 1961 1962 /* Helper function for RM_ZsetFirstInScoreRange() and RM_ZsetLastInScoreRange(). 1963 * Setup the sorted set iteration according to the specified score range 1964 * (see the functions calling it for more info). If 'first' is true the 1965 * first element in the range is used as a starting point for the iterator 1966 * otherwise the last. Return REDISMODULE_OK on success otherwise 1967 * REDISMODULE_ERR. */ 1968 int zsetInitScoreRange(RedisModuleKey *key, double min, double max, int minex, int maxex, int first) { 1969 if (!key->value || key->value->type != OBJ_ZSET) return REDISMODULE_ERR; 1970 1971 RM_ZsetRangeStop(key); 1972 key->ztype = REDISMODULE_ZSET_RANGE_SCORE; 1973 key->zer = 0; 1974 1975 /* Setup the range structure used by the sorted set core implementation 1976 * in order to seek at the specified element. */ 1977 zrangespec *zrs = &key->zrs; 1978 zrs->min = min; 1979 zrs->max = max; 1980 zrs->minex = minex; 1981 zrs->maxex = maxex; 1982 1983 if (key->value->encoding == OBJ_ENCODING_ZIPLIST) { 1984 key->zcurrent = first ? zzlFirstInRange(key->value->ptr,zrs) : 1985 zzlLastInRange(key->value->ptr,zrs); 1986 } else if (key->value->encoding == OBJ_ENCODING_SKIPLIST) { 1987 zset *zs = key->value->ptr; 1988 zskiplist *zsl = zs->zsl; 1989 key->zcurrent = first ? zslFirstInRange(zsl,zrs) : 1990 zslLastInRange(zsl,zrs); 1991 } else { 1992 serverPanic("Unsupported zset encoding"); 1993 } 1994 if (key->zcurrent == NULL) key->zer = 1; 1995 return REDISMODULE_OK; 1996 } 1997 1998 /* Setup a sorted set iterator seeking the first element in the specified 1999 * range. Returns REDISMODULE_OK if the iterator was correctly initialized 2000 * otherwise REDISMODULE_ERR is returned in the following conditions: 2001 * 2002 * 1. The value stored at key is not a sorted set or the key is empty. 2003 * 2004 * The range is specified according to the two double values 'min' and 'max'. 2005 * Both can be infinite using the following two macros: 2006 * 2007 * REDISMODULE_POSITIVE_INFINITE for positive infinite value 2008 * REDISMODULE_NEGATIVE_INFINITE for negative infinite value 2009 * 2010 * 'minex' and 'maxex' parameters, if true, respectively setup a range 2011 * where the min and max value are exclusive (not included) instead of 2012 * inclusive. */ 2013 int RM_ZsetFirstInScoreRange(RedisModuleKey *key, double min, double max, int minex, int maxex) { 2014 return zsetInitScoreRange(key,min,max,minex,maxex,1); 2015 } 2016 2017 /* Exactly like RedisModule_ZsetFirstInScoreRange() but the last element of 2018 * the range is selected for the start of the iteration instead. */ 2019 int RM_ZsetLastInScoreRange(RedisModuleKey *key, double min, double max, int minex, int maxex) { 2020 return zsetInitScoreRange(key,min,max,minex,maxex,0); 2021 } 2022 2023 /* Helper function for RM_ZsetFirstInLexRange() and RM_ZsetLastInLexRange(). 2024 * Setup the sorted set iteration according to the specified lexicographical 2025 * range (see the functions calling it for more info). If 'first' is true the 2026 * first element in the range is used as a starting point for the iterator 2027 * otherwise the last. Return REDISMODULE_OK on success otherwise 2028 * REDISMODULE_ERR. 2029 * 2030 * Note that this function takes 'min' and 'max' in the same form of the 2031 * Redis ZRANGEBYLEX command. */ 2032 int zsetInitLexRange(RedisModuleKey *key, RedisModuleString *min, RedisModuleString *max, int first) { 2033 if (!key->value || key->value->type != OBJ_ZSET) return REDISMODULE_ERR; 2034 2035 RM_ZsetRangeStop(key); 2036 key->zer = 0; 2037 2038 /* Setup the range structure used by the sorted set core implementation 2039 * in order to seek at the specified element. */ 2040 zlexrangespec *zlrs = &key->zlrs; 2041 if (zslParseLexRange(min, max, zlrs) == C_ERR) return REDISMODULE_ERR; 2042 2043 /* Set the range type to lex only after successfully parsing the range, 2044 * otherwise we don't want the zlexrangespec to be freed. */ 2045 key->ztype = REDISMODULE_ZSET_RANGE_LEX; 2046 2047 if (key->value->encoding == OBJ_ENCODING_ZIPLIST) { 2048 key->zcurrent = first ? zzlFirstInLexRange(key->value->ptr,zlrs) : 2049 zzlLastInLexRange(key->value->ptr,zlrs); 2050 } else if (key->value->encoding == OBJ_ENCODING_SKIPLIST) { 2051 zset *zs = key->value->ptr; 2052 zskiplist *zsl = zs->zsl; 2053 key->zcurrent = first ? zslFirstInLexRange(zsl,zlrs) : 2054 zslLastInLexRange(zsl,zlrs); 2055 } else { 2056 serverPanic("Unsupported zset encoding"); 2057 } 2058 if (key->zcurrent == NULL) key->zer = 1; 2059 2060 return REDISMODULE_OK; 2061 } 2062 2063 /* Setup a sorted set iterator seeking the first element in the specified 2064 * lexicographical range. Returns REDISMODULE_OK if the iterator was correctly 2065 * initialized otherwise REDISMODULE_ERR is returned in the 2066 * following conditions: 2067 * 2068 * 1. The value stored at key is not a sorted set or the key is empty. 2069 * 2. The lexicographical range 'min' and 'max' format is invalid. 2070 * 2071 * 'min' and 'max' should be provided as two RedisModuleString objects 2072 * in the same format as the parameters passed to the ZRANGEBYLEX command. 2073 * The function does not take ownership of the objects, so they can be released 2074 * ASAP after the iterator is setup. */ 2075 int RM_ZsetFirstInLexRange(RedisModuleKey *key, RedisModuleString *min, RedisModuleString *max) { 2076 return zsetInitLexRange(key,min,max,1); 2077 } 2078 2079 /* Exactly like RedisModule_ZsetFirstInLexRange() but the last element of 2080 * the range is selected for the start of the iteration instead. */ 2081 int RM_ZsetLastInLexRange(RedisModuleKey *key, RedisModuleString *min, RedisModuleString *max) { 2082 return zsetInitLexRange(key,min,max,0); 2083 } 2084 2085 /* Return the current sorted set element of an active sorted set iterator 2086 * or NULL if the range specified in the iterator does not include any 2087 * element. */ 2088 RedisModuleString *RM_ZsetRangeCurrentElement(RedisModuleKey *key, double *score) { 2089 RedisModuleString *str; 2090 2091 if (key->zcurrent == NULL) return NULL; 2092 if (key->value->encoding == OBJ_ENCODING_ZIPLIST) { 2093 unsigned char *eptr, *sptr; 2094 eptr = key->zcurrent; 2095 sds ele = ziplistGetObject(eptr); 2096 if (score) { 2097 sptr = ziplistNext(key->value->ptr,eptr); 2098 *score = zzlGetScore(sptr); 2099 } 2100 str = createObject(OBJ_STRING,ele); 2101 } else if (key->value->encoding == OBJ_ENCODING_SKIPLIST) { 2102 zskiplistNode *ln = key->zcurrent; 2103 if (score) *score = ln->score; 2104 str = createStringObject(ln->ele,sdslen(ln->ele)); 2105 } else { 2106 serverPanic("Unsupported zset encoding"); 2107 } 2108 autoMemoryAdd(key->ctx,REDISMODULE_AM_STRING,str); 2109 return str; 2110 } 2111 2112 /* Go to the next element of the sorted set iterator. Returns 1 if there was 2113 * a next element, 0 if we are already at the latest element or the range 2114 * does not include any item at all. */ 2115 int RM_ZsetRangeNext(RedisModuleKey *key) { 2116 if (!key->ztype || !key->zcurrent) return 0; /* No active iterator. */ 2117 2118 if (key->value->encoding == OBJ_ENCODING_ZIPLIST) { 2119 unsigned char *zl = key->value->ptr; 2120 unsigned char *eptr = key->zcurrent; 2121 unsigned char *next; 2122 next = ziplistNext(zl,eptr); /* Skip element. */ 2123 if (next) next = ziplistNext(zl,next); /* Skip score. */ 2124 if (next == NULL) { 2125 key->zer = 1; 2126 return 0; 2127 } else { 2128 /* Are we still within the range? */ 2129 if (key->ztype == REDISMODULE_ZSET_RANGE_SCORE) { 2130 /* Fetch the next element score for the 2131 * range check. */ 2132 unsigned char *saved_next = next; 2133 next = ziplistNext(zl,next); /* Skip next element. */ 2134 double score = zzlGetScore(next); /* Obtain the next score. */ 2135 if (!zslValueLteMax(score,&key->zrs)) { 2136 key->zer = 1; 2137 return 0; 2138 } 2139 next = saved_next; 2140 } else if (key->ztype == REDISMODULE_ZSET_RANGE_LEX) { 2141 if (!zzlLexValueLteMax(next,&key->zlrs)) { 2142 key->zer = 1; 2143 return 0; 2144 } 2145 } 2146 key->zcurrent = next; 2147 return 1; 2148 } 2149 } else if (key->value->encoding == OBJ_ENCODING_SKIPLIST) { 2150 zskiplistNode *ln = key->zcurrent, *next = ln->level[0].forward; 2151 if (next == NULL) { 2152 key->zer = 1; 2153 return 0; 2154 } else { 2155 /* Are we still within the range? */ 2156 if (key->ztype == REDISMODULE_ZSET_RANGE_SCORE && 2157 !zslValueLteMax(next->score,&key->zrs)) 2158 { 2159 key->zer = 1; 2160 return 0; 2161 } else if (key->ztype == REDISMODULE_ZSET_RANGE_LEX) { 2162 if (!zslLexValueLteMax(next->ele,&key->zlrs)) { 2163 key->zer = 1; 2164 return 0; 2165 } 2166 } 2167 key->zcurrent = next; 2168 return 1; 2169 } 2170 } else { 2171 serverPanic("Unsupported zset encoding"); 2172 } 2173 } 2174 2175 /* Go to the previous element of the sorted set iterator. Returns 1 if there was 2176 * a previous element, 0 if we are already at the first element or the range 2177 * does not include any item at all. */ 2178 int RM_ZsetRangePrev(RedisModuleKey *key) { 2179 if (!key->ztype || !key->zcurrent) return 0; /* No active iterator. */ 2180 2181 if (key->value->encoding == OBJ_ENCODING_ZIPLIST) { 2182 unsigned char *zl = key->value->ptr; 2183 unsigned char *eptr = key->zcurrent; 2184 unsigned char *prev; 2185 prev = ziplistPrev(zl,eptr); /* Go back to previous score. */ 2186 if (prev) prev = ziplistPrev(zl,prev); /* Back to previous ele. */ 2187 if (prev == NULL) { 2188 key->zer = 1; 2189 return 0; 2190 } else { 2191 /* Are we still within the range? */ 2192 if (key->ztype == REDISMODULE_ZSET_RANGE_SCORE) { 2193 /* Fetch the previous element score for the 2194 * range check. */ 2195 unsigned char *saved_prev = prev; 2196 prev = ziplistNext(zl,prev); /* Skip element to get the score.*/ 2197 double score = zzlGetScore(prev); /* Obtain the prev score. */ 2198 if (!zslValueGteMin(score,&key->zrs)) { 2199 key->zer = 1; 2200 return 0; 2201 } 2202 prev = saved_prev; 2203 } else if (key->ztype == REDISMODULE_ZSET_RANGE_LEX) { 2204 if (!zzlLexValueGteMin(prev,&key->zlrs)) { 2205 key->zer = 1; 2206 return 0; 2207 } 2208 } 2209 key->zcurrent = prev; 2210 return 1; 2211 } 2212 } else if (key->value->encoding == OBJ_ENCODING_SKIPLIST) { 2213 zskiplistNode *ln = key->zcurrent, *prev = ln->backward; 2214 if (prev == NULL) { 2215 key->zer = 1; 2216 return 0; 2217 } else { 2218 /* Are we still within the range? */ 2219 if (key->ztype == REDISMODULE_ZSET_RANGE_SCORE && 2220 !zslValueGteMin(prev->score,&key->zrs)) 2221 { 2222 key->zer = 1; 2223 return 0; 2224 } else if (key->ztype == REDISMODULE_ZSET_RANGE_LEX) { 2225 if (!zslLexValueGteMin(prev->ele,&key->zlrs)) { 2226 key->zer = 1; 2227 return 0; 2228 } 2229 } 2230 key->zcurrent = prev; 2231 return 1; 2232 } 2233 } else { 2234 serverPanic("Unsupported zset encoding"); 2235 } 2236 } 2237 2238 /* -------------------------------------------------------------------------- 2239 * Key API for Hash type 2240 * -------------------------------------------------------------------------- */ 2241 2242 /* Set the field of the specified hash field to the specified value. 2243 * If the key is an empty key open for writing, it is created with an empty 2244 * hash value, in order to set the specified field. 2245 * 2246 * The function is variadic and the user must specify pairs of field 2247 * names and values, both as RedisModuleString pointers (unless the 2248 * CFIELD option is set, see later). At the end of the field/value-ptr pairs, 2249 * NULL must be specified as last argument to signal the end of the arguments 2250 * in the variadic function. 2251 * 2252 * Example to set the hash argv[1] to the value argv[2]: 2253 * 2254 * RedisModule_HashSet(key,REDISMODULE_HASH_NONE,argv[1],argv[2],NULL); 2255 * 2256 * The function can also be used in order to delete fields (if they exist) 2257 * by setting them to the specified value of REDISMODULE_HASH_DELETE: 2258 * 2259 * RedisModule_HashSet(key,REDISMODULE_HASH_NONE,argv[1], 2260 * REDISMODULE_HASH_DELETE,NULL); 2261 * 2262 * The behavior of the command changes with the specified flags, that can be 2263 * set to REDISMODULE_HASH_NONE if no special behavior is needed. 2264 * 2265 * REDISMODULE_HASH_NX: The operation is performed only if the field was not 2266 * already existing in the hash. 2267 * REDISMODULE_HASH_XX: The operation is performed only if the field was 2268 * already existing, so that a new value could be 2269 * associated to an existing filed, but no new fields 2270 * are created. 2271 * REDISMODULE_HASH_CFIELDS: The field names passed are null terminated C 2272 * strings instead of RedisModuleString objects. 2273 * 2274 * Unless NX is specified, the command overwrites the old field value with 2275 * the new one. 2276 * 2277 * When using REDISMODULE_HASH_CFIELDS, field names are reported using 2278 * normal C strings, so for example to delete the field "foo" the following 2279 * code can be used: 2280 * 2281 * RedisModule_HashSet(key,REDISMODULE_HASH_CFIELDS,"foo", 2282 * REDISMODULE_HASH_DELETE,NULL); 2283 * 2284 * Return value: 2285 * 2286 * The number of fields updated (that may be less than the number of fields 2287 * specified because of the XX or NX options). 2288 * 2289 * In the following case the return value is always zero: 2290 * 2291 * * The key was not open for writing. 2292 * * The key was associated with a non Hash value. 2293 */ 2294 int RM_HashSet(RedisModuleKey *key, int flags, ...) { 2295 va_list ap; 2296 if (!(key->mode & REDISMODULE_WRITE)) return 0; 2297 if (key->value && key->value->type != OBJ_HASH) return 0; 2298 if (key->value == NULL) moduleCreateEmptyKey(key,REDISMODULE_KEYTYPE_HASH); 2299 2300 int updated = 0; 2301 va_start(ap, flags); 2302 while(1) { 2303 RedisModuleString *field, *value; 2304 /* Get the field and value objects. */ 2305 if (flags & REDISMODULE_HASH_CFIELDS) { 2306 char *cfield = va_arg(ap,char*); 2307 if (cfield == NULL) break; 2308 field = createRawStringObject(cfield,strlen(cfield)); 2309 } else { 2310 field = va_arg(ap,RedisModuleString*); 2311 if (field == NULL) break; 2312 } 2313 value = va_arg(ap,RedisModuleString*); 2314 2315 /* Handle XX and NX */ 2316 if (flags & (REDISMODULE_HASH_XX|REDISMODULE_HASH_NX)) { 2317 int exists = hashTypeExists(key->value, field->ptr); 2318 if (((flags & REDISMODULE_HASH_XX) && !exists) || 2319 ((flags & REDISMODULE_HASH_NX) && exists)) 2320 { 2321 if (flags & REDISMODULE_HASH_CFIELDS) decrRefCount(field); 2322 continue; 2323 } 2324 } 2325 2326 /* Handle deletion if value is REDISMODULE_HASH_DELETE. */ 2327 if (value == REDISMODULE_HASH_DELETE) { 2328 updated += hashTypeDelete(key->value, field->ptr); 2329 if (flags & REDISMODULE_HASH_CFIELDS) decrRefCount(field); 2330 continue; 2331 } 2332 2333 int low_flags = HASH_SET_COPY; 2334 /* If CFIELDS is active, we can pass the ownership of the 2335 * SDS object to the low level function that sets the field 2336 * to avoid a useless copy. */ 2337 if (flags & REDISMODULE_HASH_CFIELDS) 2338 low_flags |= HASH_SET_TAKE_FIELD; 2339 2340 robj *argv[2] = {field,value}; 2341 hashTypeTryConversion(key->value,argv,0,1); 2342 updated += hashTypeSet(key->value, field->ptr, value->ptr, low_flags); 2343 2344 /* If CFIELDS is active, SDS string ownership is now of hashTypeSet(), 2345 * however we still have to release the 'field' object shell. */ 2346 if (flags & REDISMODULE_HASH_CFIELDS) { 2347 field->ptr = NULL; /* Prevent the SDS string from being freed. */ 2348 decrRefCount(field); 2349 } 2350 } 2351 va_end(ap); 2352 moduleDelKeyIfEmpty(key); 2353 return updated; 2354 } 2355 2356 /* Get fields from an hash value. This function is called using a variable 2357 * number of arguments, alternating a field name (as a StringRedisModule 2358 * pointer) with a pointer to a StringRedisModule pointer, that is set to the 2359 * value of the field if the field exist, or NULL if the field did not exist. 2360 * At the end of the field/value-ptr pairs, NULL must be specified as last 2361 * argument to signal the end of the arguments in the variadic function. 2362 * 2363 * This is an example usage: 2364 * 2365 * RedisModuleString *first, *second; 2366 * RedisModule_HashGet(mykey,REDISMODULE_HASH_NONE,argv[1],&first, 2367 * argv[2],&second,NULL); 2368 * 2369 * As with RedisModule_HashSet() the behavior of the command can be specified 2370 * passing flags different than REDISMODULE_HASH_NONE: 2371 * 2372 * REDISMODULE_HASH_CFIELD: field names as null terminated C strings. 2373 * 2374 * REDISMODULE_HASH_EXISTS: instead of setting the value of the field 2375 * expecting a RedisModuleString pointer to pointer, the function just 2376 * reports if the field esists or not and expects an integer pointer 2377 * as the second element of each pair. 2378 * 2379 * Example of REDISMODULE_HASH_CFIELD: 2380 * 2381 * RedisModuleString *username, *hashedpass; 2382 * RedisModule_HashGet(mykey,"username",&username,"hp",&hashedpass, NULL); 2383 * 2384 * Example of REDISMODULE_HASH_EXISTS: 2385 * 2386 * int exists; 2387 * RedisModule_HashGet(mykey,argv[1],&exists,NULL); 2388 * 2389 * The function returns REDISMODULE_OK on success and REDISMODULE_ERR if 2390 * the key is not an hash value. 2391 * 2392 * Memory management: 2393 * 2394 * The returned RedisModuleString objects should be released with 2395 * RedisModule_FreeString(), or by enabling automatic memory management. 2396 */ 2397 int RM_HashGet(RedisModuleKey *key, int flags, ...) { 2398 va_list ap; 2399 if (key->value && key->value->type != OBJ_HASH) return REDISMODULE_ERR; 2400 2401 va_start(ap, flags); 2402 while(1) { 2403 RedisModuleString *field, **valueptr; 2404 int *existsptr; 2405 /* Get the field object and the value pointer to pointer. */ 2406 if (flags & REDISMODULE_HASH_CFIELDS) { 2407 char *cfield = va_arg(ap,char*); 2408 if (cfield == NULL) break; 2409 field = createRawStringObject(cfield,strlen(cfield)); 2410 } else { 2411 field = va_arg(ap,RedisModuleString*); 2412 if (field == NULL) break; 2413 } 2414 2415 /* Query the hash for existence or value object. */ 2416 if (flags & REDISMODULE_HASH_EXISTS) { 2417 existsptr = va_arg(ap,int*); 2418 if (key->value) 2419 *existsptr = hashTypeExists(key->value,field->ptr); 2420 else 2421 *existsptr = 0; 2422 } else { 2423 valueptr = va_arg(ap,RedisModuleString**); 2424 if (key->value) { 2425 *valueptr = hashTypeGetValueObject(key->value,field->ptr); 2426 if (*valueptr) { 2427 robj *decoded = getDecodedObject(*valueptr); 2428 decrRefCount(*valueptr); 2429 *valueptr = decoded; 2430 } 2431 if (*valueptr) 2432 autoMemoryAdd(key->ctx,REDISMODULE_AM_STRING,*valueptr); 2433 } else { 2434 *valueptr = NULL; 2435 } 2436 } 2437 2438 /* Cleanup */ 2439 if (flags & REDISMODULE_HASH_CFIELDS) decrRefCount(field); 2440 } 2441 va_end(ap); 2442 return REDISMODULE_OK; 2443 } 2444 2445 /* -------------------------------------------------------------------------- 2446 * Redis <-> Modules generic Call() API 2447 * -------------------------------------------------------------------------- */ 2448 2449 /* Create a new RedisModuleCallReply object. The processing of the reply 2450 * is lazy, the object is just populated with the raw protocol and later 2451 * is processed as needed. Initially we just make sure to set the right 2452 * reply type, which is extremely cheap to do. */ 2453 RedisModuleCallReply *moduleCreateCallReplyFromProto(RedisModuleCtx *ctx, sds proto) { 2454 RedisModuleCallReply *reply = zmalloc(sizeof(*reply)); 2455 reply->ctx = ctx; 2456 reply->proto = proto; 2457 reply->protolen = sdslen(proto); 2458 reply->flags = REDISMODULE_REPLYFLAG_TOPARSE; /* Lazy parsing. */ 2459 switch(proto[0]) { 2460 case '$': 2461 case '+': reply->type = REDISMODULE_REPLY_STRING; break; 2462 case '-': reply->type = REDISMODULE_REPLY_ERROR; break; 2463 case ':': reply->type = REDISMODULE_REPLY_INTEGER; break; 2464 case '*': reply->type = REDISMODULE_REPLY_ARRAY; break; 2465 default: reply->type = REDISMODULE_REPLY_UNKNOWN; break; 2466 } 2467 if ((proto[0] == '*' || proto[0] == '$') && proto[1] == '-') 2468 reply->type = REDISMODULE_REPLY_NULL; 2469 return reply; 2470 } 2471 2472 void moduleParseCallReply_Int(RedisModuleCallReply *reply); 2473 void moduleParseCallReply_BulkString(RedisModuleCallReply *reply); 2474 void moduleParseCallReply_SimpleString(RedisModuleCallReply *reply); 2475 void moduleParseCallReply_Array(RedisModuleCallReply *reply); 2476 2477 /* Do nothing if REDISMODULE_REPLYFLAG_TOPARSE is false, otherwise 2478 * use the protcol of the reply in reply->proto in order to fill the 2479 * reply with parsed data according to the reply type. */ 2480 void moduleParseCallReply(RedisModuleCallReply *reply) { 2481 if (!(reply->flags & REDISMODULE_REPLYFLAG_TOPARSE)) return; 2482 reply->flags &= ~REDISMODULE_REPLYFLAG_TOPARSE; 2483 2484 switch(reply->proto[0]) { 2485 case ':': moduleParseCallReply_Int(reply); break; 2486 case '$': moduleParseCallReply_BulkString(reply); break; 2487 case '-': /* handled by next item. */ 2488 case '+': moduleParseCallReply_SimpleString(reply); break; 2489 case '*': moduleParseCallReply_Array(reply); break; 2490 } 2491 } 2492 2493 void moduleParseCallReply_Int(RedisModuleCallReply *reply) { 2494 char *proto = reply->proto; 2495 char *p = strchr(proto+1,'\r'); 2496 2497 string2ll(proto+1,p-proto-1,&reply->val.ll); 2498 reply->protolen = p-proto+2; 2499 reply->type = REDISMODULE_REPLY_INTEGER; 2500 } 2501 2502 void moduleParseCallReply_BulkString(RedisModuleCallReply *reply) { 2503 char *proto = reply->proto; 2504 char *p = strchr(proto+1,'\r'); 2505 long long bulklen; 2506 2507 string2ll(proto+1,p-proto-1,&bulklen); 2508 if (bulklen == -1) { 2509 reply->protolen = p-proto+2; 2510 reply->type = REDISMODULE_REPLY_NULL; 2511 } else { 2512 reply->val.str = p+2; 2513 reply->len = bulklen; 2514 reply->protolen = p-proto+2+bulklen+2; 2515 reply->type = REDISMODULE_REPLY_STRING; 2516 } 2517 } 2518 2519 void moduleParseCallReply_SimpleString(RedisModuleCallReply *reply) { 2520 char *proto = reply->proto; 2521 char *p = strchr(proto+1,'\r'); 2522 2523 reply->val.str = proto+1; 2524 reply->len = p-proto-1; 2525 reply->protolen = p-proto+2; 2526 reply->type = proto[0] == '+' ? REDISMODULE_REPLY_STRING : 2527 REDISMODULE_REPLY_ERROR; 2528 } 2529 2530 void moduleParseCallReply_Array(RedisModuleCallReply *reply) { 2531 char *proto = reply->proto; 2532 char *p = strchr(proto+1,'\r'); 2533 long long arraylen, j; 2534 2535 string2ll(proto+1,p-proto-1,&arraylen); 2536 p += 2; 2537 2538 if (arraylen == -1) { 2539 reply->protolen = p-proto; 2540 reply->type = REDISMODULE_REPLY_NULL; 2541 return; 2542 } 2543 2544 reply->val.array = zmalloc(sizeof(RedisModuleCallReply)*arraylen); 2545 reply->len = arraylen; 2546 for (j = 0; j < arraylen; j++) { 2547 RedisModuleCallReply *ele = reply->val.array+j; 2548 ele->flags = REDISMODULE_REPLYFLAG_NESTED | 2549 REDISMODULE_REPLYFLAG_TOPARSE; 2550 ele->proto = p; 2551 ele->ctx = reply->ctx; 2552 moduleParseCallReply(ele); 2553 p += ele->protolen; 2554 } 2555 reply->protolen = p-proto; 2556 reply->type = REDISMODULE_REPLY_ARRAY; 2557 } 2558 2559 /* Free a Call reply and all the nested replies it contains if it's an 2560 * array. */ 2561 void RM_FreeCallReply_Rec(RedisModuleCallReply *reply, int freenested){ 2562 /* Don't free nested replies by default: the user must always free the 2563 * toplevel reply. However be gentle and don't crash if the module 2564 * misuses the API. */ 2565 if (!freenested && reply->flags & REDISMODULE_REPLYFLAG_NESTED) return; 2566 2567 if (!(reply->flags & REDISMODULE_REPLYFLAG_TOPARSE)) { 2568 if (reply->type == REDISMODULE_REPLY_ARRAY) { 2569 size_t j; 2570 for (j = 0; j < reply->len; j++) 2571 RM_FreeCallReply_Rec(reply->val.array+j,1); 2572 zfree(reply->val.array); 2573 } 2574 } 2575 2576 /* For nested replies, we don't free reply->proto (which if not NULL 2577 * references the parent reply->proto buffer), nor the structure 2578 * itself which is allocated as an array of structures, and is freed 2579 * when the array value is released. */ 2580 if (!(reply->flags & REDISMODULE_REPLYFLAG_NESTED)) { 2581 if (reply->proto) sdsfree(reply->proto); 2582 zfree(reply); 2583 } 2584 } 2585 2586 /* Wrapper for the recursive free reply function. This is needed in order 2587 * to have the first level function to return on nested replies, but only 2588 * if called by the module API. */ 2589 void RM_FreeCallReply(RedisModuleCallReply *reply) { 2590 2591 RedisModuleCtx *ctx = reply->ctx; 2592 RM_FreeCallReply_Rec(reply,0); 2593 autoMemoryFreed(ctx,REDISMODULE_AM_REPLY,reply); 2594 } 2595 2596 /* Return the reply type. */ 2597 int RM_CallReplyType(RedisModuleCallReply *reply) { 2598 if (!reply) return REDISMODULE_REPLY_UNKNOWN; 2599 return reply->type; 2600 } 2601 2602 /* Return the reply type length, where applicable. */ 2603 size_t RM_CallReplyLength(RedisModuleCallReply *reply) { 2604 moduleParseCallReply(reply); 2605 switch(reply->type) { 2606 case REDISMODULE_REPLY_STRING: 2607 case REDISMODULE_REPLY_ERROR: 2608 case REDISMODULE_REPLY_ARRAY: 2609 return reply->len; 2610 default: 2611 return 0; 2612 } 2613 } 2614 2615 /* Return the 'idx'-th nested call reply element of an array reply, or NULL 2616 * if the reply type is wrong or the index is out of range. */ 2617 RedisModuleCallReply *RM_CallReplyArrayElement(RedisModuleCallReply *reply, size_t idx) { 2618 moduleParseCallReply(reply); 2619 if (reply->type != REDISMODULE_REPLY_ARRAY) return NULL; 2620 if (idx >= reply->len) return NULL; 2621 return reply->val.array+idx; 2622 } 2623 2624 /* Return the long long of an integer reply. */ 2625 long long RM_CallReplyInteger(RedisModuleCallReply *reply) { 2626 moduleParseCallReply(reply); 2627 if (reply->type != REDISMODULE_REPLY_INTEGER) return LLONG_MIN; 2628 return reply->val.ll; 2629 } 2630 2631 /* Return the pointer and length of a string or error reply. */ 2632 const char *RM_CallReplyStringPtr(RedisModuleCallReply *reply, size_t *len) { 2633 moduleParseCallReply(reply); 2634 if (reply->type != REDISMODULE_REPLY_STRING && 2635 reply->type != REDISMODULE_REPLY_ERROR) return NULL; 2636 if (len) *len = reply->len; 2637 return reply->val.str; 2638 } 2639 2640 /* Return a new string object from a call reply of type string, error or 2641 * integer. Otherwise (wrong reply type) return NULL. */ 2642 RedisModuleString *RM_CreateStringFromCallReply(RedisModuleCallReply *reply) { 2643 moduleParseCallReply(reply); 2644 switch(reply->type) { 2645 case REDISMODULE_REPLY_STRING: 2646 case REDISMODULE_REPLY_ERROR: 2647 return RM_CreateString(reply->ctx,reply->val.str,reply->len); 2648 case REDISMODULE_REPLY_INTEGER: { 2649 char buf[64]; 2650 int len = ll2string(buf,sizeof(buf),reply->val.ll); 2651 return RM_CreateString(reply->ctx,buf,len); 2652 } 2653 default: return NULL; 2654 } 2655 } 2656 2657 /* Returns an array of robj pointers, and populates *argc with the number 2658 * of items, by parsing the format specifier "fmt" as described for 2659 * the RM_Call(), RM_Replicate() and other module APIs. 2660 * 2661 * The integer pointed by 'flags' is populated with flags according 2662 * to special modifiers in "fmt". For now only one exists: 2663 * 2664 * "!" -> REDISMODULE_ARGV_REPLICATE 2665 * 2666 * On error (format specifier error) NULL is returned and nothing is 2667 * allocated. On success the argument vector is returned. */ 2668 2669 #define REDISMODULE_ARGV_REPLICATE (1<<0) 2670 2671 robj **moduleCreateArgvFromUserFormat(const char *cmdname, const char *fmt, int *argcp, int *flags, va_list ap) { 2672 int argc = 0, argv_size, j; 2673 robj **argv = NULL; 2674 2675 /* As a first guess to avoid useless reallocations, size argv to 2676 * hold one argument for each char specifier in 'fmt'. */ 2677 argv_size = strlen(fmt)+1; /* +1 because of the command name. */ 2678 argv = zrealloc(argv,sizeof(robj*)*argv_size); 2679 2680 /* Build the arguments vector based on the format specifier. */ 2681 argv[0] = createStringObject(cmdname,strlen(cmdname)); 2682 argc++; 2683 2684 /* Create the client and dispatch the command. */ 2685 const char *p = fmt; 2686 while(*p) { 2687 if (*p == 'c') { 2688 char *cstr = va_arg(ap,char*); 2689 argv[argc++] = createStringObject(cstr,strlen(cstr)); 2690 } else if (*p == 's') { 2691 robj *obj = va_arg(ap,void*); 2692 argv[argc++] = obj; 2693 incrRefCount(obj); 2694 } else if (*p == 'b') { 2695 char *buf = va_arg(ap,char*); 2696 size_t len = va_arg(ap,size_t); 2697 argv[argc++] = createStringObject(buf,len); 2698 } else if (*p == 'l') { 2699 long ll = va_arg(ap,long long); 2700 argv[argc++] = createObject(OBJ_STRING,sdsfromlonglong(ll)); 2701 } else if (*p == 'v') { 2702 /* A vector of strings */ 2703 robj **v = va_arg(ap, void*); 2704 size_t vlen = va_arg(ap, size_t); 2705 2706 /* We need to grow argv to hold the vector's elements. 2707 * We resize by vector_len-1 elements, because we held 2708 * one element in argv for the vector already */ 2709 argv_size += vlen-1; 2710 argv = zrealloc(argv,sizeof(robj*)*argv_size); 2711 2712 size_t i = 0; 2713 for (i = 0; i < vlen; i++) { 2714 incrRefCount(v[i]); 2715 argv[argc++] = v[i]; 2716 } 2717 } else if (*p == '!') { 2718 if (flags) (*flags) |= REDISMODULE_ARGV_REPLICATE; 2719 } else { 2720 goto fmterr; 2721 } 2722 p++; 2723 } 2724 *argcp = argc; 2725 return argv; 2726 2727 fmterr: 2728 for (j = 0; j < argc; j++) 2729 decrRefCount(argv[j]); 2730 zfree(argv); 2731 return NULL; 2732 } 2733 2734 /* Exported API to call any Redis command from modules. 2735 * On success a RedisModuleCallReply object is returned, otherwise 2736 * NULL is returned and errno is set to the following values: 2737 * 2738 * EINVAL: command non existing, wrong arity, wrong format specifier. 2739 * EPERM: operation in Cluster instance with key in non local slot. */ 2740 RedisModuleCallReply *RM_Call(RedisModuleCtx *ctx, const char *cmdname, const char *fmt, ...) { 2741 struct redisCommand *cmd; 2742 client *c = NULL; 2743 robj **argv = NULL; 2744 int argc = 0, flags = 0; 2745 va_list ap; 2746 RedisModuleCallReply *reply = NULL; 2747 int replicate = 0; /* Replicate this command? */ 2748 2749 /* Create the client and dispatch the command. */ 2750 va_start(ap, fmt); 2751 c = createClient(-1); 2752 argv = moduleCreateArgvFromUserFormat(cmdname,fmt,&argc,&flags,ap); 2753 replicate = flags & REDISMODULE_ARGV_REPLICATE; 2754 va_end(ap); 2755 2756 /* Setup our fake client for command execution. */ 2757 c->flags |= CLIENT_MODULE; 2758 c->db = ctx->client->db; 2759 c->argv = argv; 2760 c->argc = argc; 2761 if (ctx->module) ctx->module->in_call++; 2762 2763 /* We handle the above format error only when the client is setup so that 2764 * we can free it normally. */ 2765 if (argv == NULL) goto cleanup; 2766 2767 /* Call command filters */ 2768 moduleCallCommandFilters(c); 2769 2770 /* Lookup command now, after filters had a chance to make modifications 2771 * if necessary. 2772 */ 2773 cmd = lookupCommand(c->argv[0]->ptr); 2774 if (!cmd) { 2775 errno = EINVAL; 2776 goto cleanup; 2777 } 2778 c->cmd = c->lastcmd = cmd; 2779 2780 /* Basic arity checks. */ 2781 if ((cmd->arity > 0 && cmd->arity != argc) || (argc < -cmd->arity)) { 2782 errno = EINVAL; 2783 goto cleanup; 2784 } 2785 2786 /* If this is a Redis Cluster node, we need to make sure the module is not 2787 * trying to access non-local keys, with the exception of commands 2788 * received from our master. */ 2789 if (server.cluster_enabled && !(ctx->client->flags & CLIENT_MASTER)) { 2790 /* Duplicate relevant flags in the module client. */ 2791 c->flags &= ~(CLIENT_READONLY|CLIENT_ASKING); 2792 c->flags |= ctx->client->flags & (CLIENT_READONLY|CLIENT_ASKING); 2793 if (getNodeByQuery(c,c->cmd,c->argv,c->argc,NULL,NULL) != 2794 server.cluster->myself) 2795 { 2796 errno = EPERM; 2797 goto cleanup; 2798 } 2799 } 2800 2801 /* If we are using single commands replication, we need to wrap what 2802 * we propagate into a MULTI/EXEC block, so that it will be atomic like 2803 * a Lua script in the context of AOF and slaves. */ 2804 if (replicate) moduleReplicateMultiIfNeeded(ctx); 2805 2806 /* Run the command */ 2807 int call_flags = CMD_CALL_SLOWLOG | CMD_CALL_STATS; 2808 if (replicate) { 2809 call_flags |= CMD_CALL_PROPAGATE_AOF; 2810 call_flags |= CMD_CALL_PROPAGATE_REPL; 2811 } 2812 call(c,call_flags); 2813 2814 /* Convert the result of the Redis command into a suitable Lua type. 2815 * The first thing we need is to create a single string from the client 2816 * output buffers. */ 2817 sds proto = sdsnewlen(c->buf,c->bufpos); 2818 c->bufpos = 0; 2819 while(listLength(c->reply)) { 2820 clientReplyBlock *o = listNodeValue(listFirst(c->reply)); 2821 2822 proto = sdscatlen(proto,o->buf,o->used); 2823 listDelNode(c->reply,listFirst(c->reply)); 2824 } 2825 reply = moduleCreateCallReplyFromProto(ctx,proto); 2826 autoMemoryAdd(ctx,REDISMODULE_AM_REPLY,reply); 2827 2828 cleanup: 2829 if (ctx->module) ctx->module->in_call--; 2830 freeClient(c); 2831 return reply; 2832 } 2833 2834 /* Return a pointer, and a length, to the protocol returned by the command 2835 * that returned the reply object. */ 2836 const char *RM_CallReplyProto(RedisModuleCallReply *reply, size_t *len) { 2837 if (reply->proto) *len = sdslen(reply->proto); 2838 return reply->proto; 2839 } 2840 2841 /* -------------------------------------------------------------------------- 2842 * Modules data types 2843 * 2844 * When String DMA or using existing data structures is not enough, it is 2845 * possible to create new data types from scratch and export them to 2846 * Redis. The module must provide a set of callbacks for handling the 2847 * new values exported (for example in order to provide RDB saving/loading, 2848 * AOF rewrite, and so forth). In this section we define this API. 2849 * -------------------------------------------------------------------------- */ 2850 2851 /* Turn a 9 chars name in the specified charset and a 10 bit encver into 2852 * a single 64 bit unsigned integer that represents this exact module name 2853 * and version. This final number is called a "type ID" and is used when 2854 * writing module exported values to RDB files, in order to re-associate the 2855 * value to the right module to load them during RDB loading. 2856 * 2857 * If the string is not of the right length or the charset is wrong, or 2858 * if encver is outside the unsigned 10 bit integer range, 0 is returned, 2859 * otherwise the function returns the right type ID. 2860 * 2861 * The resulting 64 bit integer is composed as follows: 2862 * 2863 * (high order bits) 6|6|6|6|6|6|6|6|6|10 (low order bits) 2864 * 2865 * The first 6 bits value is the first character, name[0], while the last 2866 * 6 bits value, immediately before the 10 bits integer, is name[8]. 2867 * The last 10 bits are the encoding version. 2868 * 2869 * Note that a name and encver combo of "AAAAAAAAA" and 0, will produce 2870 * zero as return value, that is the same we use to signal errors, thus 2871 * this combination is invalid, and also useless since type names should 2872 * try to be vary to avoid collisions. */ 2873 2874 const char *ModuleTypeNameCharSet = 2875 "ABCDEFGHIJKLMNOPQRSTUVWXYZ" 2876 "abcdefghijklmnopqrstuvwxyz" 2877 "0123456789-_"; 2878 2879 uint64_t moduleTypeEncodeId(const char *name, int encver) { 2880 /* We use 64 symbols so that we can map each character into 6 bits 2881 * of the final output. */ 2882 const char *cset = ModuleTypeNameCharSet; 2883 if (strlen(name) != 9) return 0; 2884 if (encver < 0 || encver > 1023) return 0; 2885 2886 uint64_t id = 0; 2887 for (int j = 0; j < 9; j++) { 2888 char *p = strchr(cset,name[j]); 2889 if (!p) return 0; 2890 unsigned long pos = p-cset; 2891 id = (id << 6) | pos; 2892 } 2893 id = (id << 10) | encver; 2894 return id; 2895 } 2896 2897 /* Search, in the list of exported data types of all the modules registered, 2898 * a type with the same name as the one given. Returns the moduleType 2899 * structure pointer if such a module is found, or NULL otherwise. */ 2900 moduleType *moduleTypeLookupModuleByName(const char *name) { 2901 dictIterator *di = dictGetIterator(modules); 2902 dictEntry *de; 2903 2904 while ((de = dictNext(di)) != NULL) { 2905 struct RedisModule *module = dictGetVal(de); 2906 listIter li; 2907 listNode *ln; 2908 2909 listRewind(module->types,&li); 2910 while((ln = listNext(&li))) { 2911 moduleType *mt = ln->value; 2912 if (memcmp(name,mt->name,sizeof(mt->name)) == 0) { 2913 dictReleaseIterator(di); 2914 return mt; 2915 } 2916 } 2917 } 2918 dictReleaseIterator(di); 2919 return NULL; 2920 } 2921 2922 /* Lookup a module by ID, with caching. This function is used during RDB 2923 * loading. Modules exporting data types should never be able to unload, so 2924 * our cache does not need to expire. */ 2925 #define MODULE_LOOKUP_CACHE_SIZE 3 2926 2927 moduleType *moduleTypeLookupModuleByID(uint64_t id) { 2928 static struct { 2929 uint64_t id; 2930 moduleType *mt; 2931 } cache[MODULE_LOOKUP_CACHE_SIZE]; 2932 2933 /* Search in cache to start. */ 2934 int j; 2935 for (j = 0; j < MODULE_LOOKUP_CACHE_SIZE && cache[j].mt != NULL; j++) 2936 if (cache[j].id == id) return cache[j].mt; 2937 2938 /* Slow module by module lookup. */ 2939 moduleType *mt = NULL; 2940 dictIterator *di = dictGetIterator(modules); 2941 dictEntry *de; 2942 2943 while ((de = dictNext(di)) != NULL && mt == NULL) { 2944 struct RedisModule *module = dictGetVal(de); 2945 listIter li; 2946 listNode *ln; 2947 2948 listRewind(module->types,&li); 2949 while((ln = listNext(&li))) { 2950 moduleType *this_mt = ln->value; 2951 /* Compare only the 54 bit module identifier and not the 2952 * encoding version. */ 2953 if (this_mt->id >> 10 == id >> 10) { 2954 mt = this_mt; 2955 break; 2956 } 2957 } 2958 } 2959 dictReleaseIterator(di); 2960 2961 /* Add to cache if possible. */ 2962 if (mt && j < MODULE_LOOKUP_CACHE_SIZE) { 2963 cache[j].id = id; 2964 cache[j].mt = mt; 2965 } 2966 return mt; 2967 } 2968 2969 /* Turn an (unresolved) module ID into a type name, to show the user an 2970 * error when RDB files contain module data we can't load. 2971 * The buffer pointed by 'name' must be 10 bytes at least. The function will 2972 * fill it with a null terminated module name. */ 2973 void moduleTypeNameByID(char *name, uint64_t moduleid) { 2974 const char *cset = ModuleTypeNameCharSet; 2975 2976 name[9] = '\0'; 2977 char *p = name+8; 2978 moduleid >>= 10; 2979 for (int j = 0; j < 9; j++) { 2980 *p-- = cset[moduleid & 63]; 2981 moduleid >>= 6; 2982 } 2983 } 2984 2985 /* Register a new data type exported by the module. The parameters are the 2986 * following. Please for in depth documentation check the modules API 2987 * documentation, especially the TYPES.md file. 2988 * 2989 * * **name**: A 9 characters data type name that MUST be unique in the Redis 2990 * Modules ecosystem. Be creative... and there will be no collisions. Use 2991 * the charset A-Z a-z 9-0, plus the two "-_" characters. A good 2992 * idea is to use, for example `<typename>-<vendor>`. For example 2993 * "tree-AntZ" may mean "Tree data structure by @antirez". To use both 2994 * lower case and upper case letters helps in order to prevent collisions. 2995 * * **encver**: Encoding version, which is, the version of the serialization 2996 * that a module used in order to persist data. As long as the "name" 2997 * matches, the RDB loading will be dispatched to the type callbacks 2998 * whatever 'encver' is used, however the module can understand if 2999 * the encoding it must load are of an older version of the module. 3000 * For example the module "tree-AntZ" initially used encver=0. Later 3001 * after an upgrade, it started to serialize data in a different format 3002 * and to register the type with encver=1. However this module may 3003 * still load old data produced by an older version if the rdb_load 3004 * callback is able to check the encver value and act accordingly. 3005 * The encver must be a positive value between 0 and 1023. 3006 * * **typemethods_ptr** is a pointer to a RedisModuleTypeMethods structure 3007 * that should be populated with the methods callbacks and structure 3008 * version, like in the following example: 3009 * 3010 * RedisModuleTypeMethods tm = { 3011 * .version = REDISMODULE_TYPE_METHOD_VERSION, 3012 * .rdb_load = myType_RDBLoadCallBack, 3013 * .rdb_save = myType_RDBSaveCallBack, 3014 * .aof_rewrite = myType_AOFRewriteCallBack, 3015 * .free = myType_FreeCallBack, 3016 * 3017 * // Optional fields 3018 * .digest = myType_DigestCallBack, 3019 * .mem_usage = myType_MemUsageCallBack, 3020 * } 3021 * 3022 * * **rdb_load**: A callback function pointer that loads data from RDB files. 3023 * * **rdb_save**: A callback function pointer that saves data to RDB files. 3024 * * **aof_rewrite**: A callback function pointer that rewrites data as commands. 3025 * * **digest**: A callback function pointer that is used for `DEBUG DIGEST`. 3026 * * **free**: A callback function pointer that can free a type value. 3027 * 3028 * The **digest* and **mem_usage** methods should currently be omitted since 3029 * they are not yet implemented inside the Redis modules core. 3030 * 3031 * Note: the module name "AAAAAAAAA" is reserved and produces an error, it 3032 * happens to be pretty lame as well. 3033 * 3034 * If there is already a module registering a type with the same name, 3035 * and if the module name or encver is invalid, NULL is returned. 3036 * Otherwise the new type is registered into Redis, and a reference of 3037 * type RedisModuleType is returned: the caller of the function should store 3038 * this reference into a gobal variable to make future use of it in the 3039 * modules type API, since a single module may register multiple types. 3040 * Example code fragment: 3041 * 3042 * static RedisModuleType *BalancedTreeType; 3043 * 3044 * int RedisModule_OnLoad(RedisModuleCtx *ctx) { 3045 * // some code here ... 3046 * BalancedTreeType = RM_CreateDataType(...); 3047 * } 3048 */ 3049 moduleType *RM_CreateDataType(RedisModuleCtx *ctx, const char *name, int encver, void *typemethods_ptr) { 3050 uint64_t id = moduleTypeEncodeId(name,encver); 3051 if (id == 0) return NULL; 3052 if (moduleTypeLookupModuleByName(name) != NULL) return NULL; 3053 3054 long typemethods_version = ((long*)typemethods_ptr)[0]; 3055 if (typemethods_version == 0) return NULL; 3056 3057 struct typemethods { 3058 uint64_t version; 3059 moduleTypeLoadFunc rdb_load; 3060 moduleTypeSaveFunc rdb_save; 3061 moduleTypeRewriteFunc aof_rewrite; 3062 moduleTypeMemUsageFunc mem_usage; 3063 moduleTypeDigestFunc digest; 3064 moduleTypeFreeFunc free; 3065 } *tms = (struct typemethods*) typemethods_ptr; 3066 3067 moduleType *mt = zcalloc(sizeof(*mt)); 3068 mt->id = id; 3069 mt->module = ctx->module; 3070 mt->rdb_load = tms->rdb_load; 3071 mt->rdb_save = tms->rdb_save; 3072 mt->aof_rewrite = tms->aof_rewrite; 3073 mt->mem_usage = tms->mem_usage; 3074 mt->digest = tms->digest; 3075 mt->free = tms->free; 3076 memcpy(mt->name,name,sizeof(mt->name)); 3077 listAddNodeTail(ctx->module->types,mt); 3078 return mt; 3079 } 3080 3081 /* If the key is open for writing, set the specified module type object 3082 * as the value of the key, deleting the old value if any. 3083 * On success REDISMODULE_OK is returned. If the key is not open for 3084 * writing or there is an active iterator, REDISMODULE_ERR is returned. */ 3085 int RM_ModuleTypeSetValue(RedisModuleKey *key, moduleType *mt, void *value) { 3086 if (!(key->mode & REDISMODULE_WRITE) || key->iter) return REDISMODULE_ERR; 3087 RM_DeleteKey(key); 3088 robj *o = createModuleObject(mt,value); 3089 setKey(key->db,key->key,o); 3090 decrRefCount(o); 3091 key->value = o; 3092 return REDISMODULE_OK; 3093 } 3094 3095 /* Assuming RedisModule_KeyType() returned REDISMODULE_KEYTYPE_MODULE on 3096 * the key, returns the module type pointer of the value stored at key. 3097 * 3098 * If the key is NULL, is not associated with a module type, or is empty, 3099 * then NULL is returned instead. */ 3100 moduleType *RM_ModuleTypeGetType(RedisModuleKey *key) { 3101 if (key == NULL || 3102 key->value == NULL || 3103 RM_KeyType(key) != REDISMODULE_KEYTYPE_MODULE) return NULL; 3104 moduleValue *mv = key->value->ptr; 3105 return mv->type; 3106 } 3107 3108 /* Assuming RedisModule_KeyType() returned REDISMODULE_KEYTYPE_MODULE on 3109 * the key, returns the module type low-level value stored at key, as 3110 * it was set by the user via RedisModule_ModuleTypeSet(). 3111 * 3112 * If the key is NULL, is not associated with a module type, or is empty, 3113 * then NULL is returned instead. */ 3114 void *RM_ModuleTypeGetValue(RedisModuleKey *key) { 3115 if (key == NULL || 3116 key->value == NULL || 3117 RM_KeyType(key) != REDISMODULE_KEYTYPE_MODULE) return NULL; 3118 moduleValue *mv = key->value->ptr; 3119 return mv->value; 3120 } 3121 3122 /* -------------------------------------------------------------------------- 3123 * RDB loading and saving functions 3124 * -------------------------------------------------------------------------- */ 3125 3126 /* Called when there is a load error in the context of a module. This cannot 3127 * be recovered like for the built-in types. */ 3128 void moduleRDBLoadError(RedisModuleIO *io) { 3129 serverLog(LL_WARNING, 3130 "Error loading data from RDB (short read or EOF). " 3131 "Read performed by module '%s' about type '%s' " 3132 "after reading '%llu' bytes of a value.", 3133 io->type->module->name, 3134 io->type->name, 3135 (unsigned long long)io->bytes); 3136 exit(1); 3137 } 3138 3139 /* Save an unsigned 64 bit value into the RDB file. This function should only 3140 * be called in the context of the rdb_save method of modules implementing new 3141 * data types. */ 3142 void RM_SaveUnsigned(RedisModuleIO *io, uint64_t value) { 3143 if (io->error) return; 3144 /* Save opcode. */ 3145 int retval = rdbSaveLen(io->rio, RDB_MODULE_OPCODE_UINT); 3146 if (retval == -1) goto saveerr; 3147 io->bytes += retval; 3148 /* Save value. */ 3149 retval = rdbSaveLen(io->rio, value); 3150 if (retval == -1) goto saveerr; 3151 io->bytes += retval; 3152 return; 3153 3154 saveerr: 3155 io->error = 1; 3156 } 3157 3158 /* Load an unsigned 64 bit value from the RDB file. This function should only 3159 * be called in the context of the rdb_load method of modules implementing 3160 * new data types. */ 3161 uint64_t RM_LoadUnsigned(RedisModuleIO *io) { 3162 if (io->ver == 2) { 3163 uint64_t opcode = rdbLoadLen(io->rio,NULL); 3164 if (opcode != RDB_MODULE_OPCODE_UINT) goto loaderr; 3165 } 3166 uint64_t value; 3167 int retval = rdbLoadLenByRef(io->rio, NULL, &value); 3168 if (retval == -1) goto loaderr; 3169 return value; 3170 3171 loaderr: 3172 moduleRDBLoadError(io); 3173 return 0; /* Never reached. */ 3174 } 3175 3176 /* Like RedisModule_SaveUnsigned() but for signed 64 bit values. */ 3177 void RM_SaveSigned(RedisModuleIO *io, int64_t value) { 3178 union {uint64_t u; int64_t i;} conv; 3179 conv.i = value; 3180 RM_SaveUnsigned(io,conv.u); 3181 } 3182 3183 /* Like RedisModule_LoadUnsigned() but for signed 64 bit values. */ 3184 int64_t RM_LoadSigned(RedisModuleIO *io) { 3185 union {uint64_t u; int64_t i;} conv; 3186 conv.u = RM_LoadUnsigned(io); 3187 return conv.i; 3188 } 3189 3190 /* In the context of the rdb_save method of a module type, saves a 3191 * string into the RDB file taking as input a RedisModuleString. 3192 * 3193 * The string can be later loaded with RedisModule_LoadString() or 3194 * other Load family functions expecting a serialized string inside 3195 * the RDB file. */ 3196 void RM_SaveString(RedisModuleIO *io, RedisModuleString *s) { 3197 if (io->error) return; 3198 /* Save opcode. */ 3199 ssize_t retval = rdbSaveLen(io->rio, RDB_MODULE_OPCODE_STRING); 3200 if (retval == -1) goto saveerr; 3201 io->bytes += retval; 3202 /* Save value. */ 3203 retval = rdbSaveStringObject(io->rio, s); 3204 if (retval == -1) goto saveerr; 3205 io->bytes += retval; 3206 return; 3207 3208 saveerr: 3209 io->error = 1; 3210 } 3211 3212 /* Like RedisModule_SaveString() but takes a raw C pointer and length 3213 * as input. */ 3214 void RM_SaveStringBuffer(RedisModuleIO *io, const char *str, size_t len) { 3215 if (io->error) return; 3216 /* Save opcode. */ 3217 ssize_t retval = rdbSaveLen(io->rio, RDB_MODULE_OPCODE_STRING); 3218 if (retval == -1) goto saveerr; 3219 io->bytes += retval; 3220 /* Save value. */ 3221 retval = rdbSaveRawString(io->rio, (unsigned char*)str,len); 3222 if (retval == -1) goto saveerr; 3223 io->bytes += retval; 3224 return; 3225 3226 saveerr: 3227 io->error = 1; 3228 } 3229 3230 /* Implements RM_LoadString() and RM_LoadStringBuffer() */ 3231 void *moduleLoadString(RedisModuleIO *io, int plain, size_t *lenptr) { 3232 if (io->ver == 2) { 3233 uint64_t opcode = rdbLoadLen(io->rio,NULL); 3234 if (opcode != RDB_MODULE_OPCODE_STRING) goto loaderr; 3235 } 3236 void *s = rdbGenericLoadStringObject(io->rio, 3237 plain ? RDB_LOAD_PLAIN : RDB_LOAD_NONE, lenptr); 3238 if (s == NULL) goto loaderr; 3239 return s; 3240 3241 loaderr: 3242 moduleRDBLoadError(io); 3243 return NULL; /* Never reached. */ 3244 } 3245 3246 /* In the context of the rdb_load method of a module data type, loads a string 3247 * from the RDB file, that was previously saved with RedisModule_SaveString() 3248 * functions family. 3249 * 3250 * The returned string is a newly allocated RedisModuleString object, and 3251 * the user should at some point free it with a call to RedisModule_FreeString(). 3252 * 3253 * If the data structure does not store strings as RedisModuleString objects, 3254 * the similar function RedisModule_LoadStringBuffer() could be used instead. */ 3255 RedisModuleString *RM_LoadString(RedisModuleIO *io) { 3256 return moduleLoadString(io,0,NULL); 3257 } 3258 3259 /* Like RedisModule_LoadString() but returns an heap allocated string that 3260 * was allocated with RedisModule_Alloc(), and can be resized or freed with 3261 * RedisModule_Realloc() or RedisModule_Free(). 3262 * 3263 * The size of the string is stored at '*lenptr' if not NULL. 3264 * The returned string is not automatically NULL termianted, it is loaded 3265 * exactly as it was stored inisde the RDB file. */ 3266 char *RM_LoadStringBuffer(RedisModuleIO *io, size_t *lenptr) { 3267 return moduleLoadString(io,1,lenptr); 3268 } 3269 3270 /* In the context of the rdb_save method of a module data type, saves a double 3271 * value to the RDB file. The double can be a valid number, a NaN or infinity. 3272 * It is possible to load back the value with RedisModule_LoadDouble(). */ 3273 void RM_SaveDouble(RedisModuleIO *io, double value) { 3274 if (io->error) return; 3275 /* Save opcode. */ 3276 int retval = rdbSaveLen(io->rio, RDB_MODULE_OPCODE_DOUBLE); 3277 if (retval == -1) goto saveerr; 3278 io->bytes += retval; 3279 /* Save value. */ 3280 retval = rdbSaveBinaryDoubleValue(io->rio, value); 3281 if (retval == -1) goto saveerr; 3282 io->bytes += retval; 3283 return; 3284 3285 saveerr: 3286 io->error = 1; 3287 } 3288 3289 /* In the context of the rdb_save method of a module data type, loads back the 3290 * double value saved by RedisModule_SaveDouble(). */ 3291 double RM_LoadDouble(RedisModuleIO *io) { 3292 if (io->ver == 2) { 3293 uint64_t opcode = rdbLoadLen(io->rio,NULL); 3294 if (opcode != RDB_MODULE_OPCODE_DOUBLE) goto loaderr; 3295 } 3296 double value; 3297 int retval = rdbLoadBinaryDoubleValue(io->rio, &value); 3298 if (retval == -1) goto loaderr; 3299 return value; 3300 3301 loaderr: 3302 moduleRDBLoadError(io); 3303 return 0; /* Never reached. */ 3304 } 3305 3306 /* In the context of the rdb_save method of a module data type, saves a float 3307 * value to the RDB file. The float can be a valid number, a NaN or infinity. 3308 * It is possible to load back the value with RedisModule_LoadFloat(). */ 3309 void RM_SaveFloat(RedisModuleIO *io, float value) { 3310 if (io->error) return; 3311 /* Save opcode. */ 3312 int retval = rdbSaveLen(io->rio, RDB_MODULE_OPCODE_FLOAT); 3313 if (retval == -1) goto saveerr; 3314 io->bytes += retval; 3315 /* Save value. */ 3316 retval = rdbSaveBinaryFloatValue(io->rio, value); 3317 if (retval == -1) goto saveerr; 3318 io->bytes += retval; 3319 return; 3320 3321 saveerr: 3322 io->error = 1; 3323 } 3324 3325 /* In the context of the rdb_save method of a module data type, loads back the 3326 * float value saved by RedisModule_SaveFloat(). */ 3327 float RM_LoadFloat(RedisModuleIO *io) { 3328 if (io->ver == 2) { 3329 uint64_t opcode = rdbLoadLen(io->rio,NULL); 3330 if (opcode != RDB_MODULE_OPCODE_FLOAT) goto loaderr; 3331 } 3332 float value; 3333 int retval = rdbLoadBinaryFloatValue(io->rio, &value); 3334 if (retval == -1) goto loaderr; 3335 return value; 3336 3337 loaderr: 3338 moduleRDBLoadError(io); 3339 return 0; /* Never reached. */ 3340 } 3341 3342 /* -------------------------------------------------------------------------- 3343 * Key digest API (DEBUG DIGEST interface for modules types) 3344 * -------------------------------------------------------------------------- */ 3345 3346 /* Add a new element to the digest. This function can be called multiple times 3347 * one element after the other, for all the elements that constitute a given 3348 * data structure. The function call must be followed by the call to 3349 * `RedisModule_DigestEndSequence` eventually, when all the elements that are 3350 * always in a given order are added. See the Redis Modules data types 3351 * documentation for more info. However this is a quick example that uses Redis 3352 * data types as an example. 3353 * 3354 * To add a sequence of unordered elements (for example in the case of a Redis 3355 * Set), the pattern to use is: 3356 * 3357 * foreach element { 3358 * AddElement(element); 3359 * EndSequence(); 3360 * } 3361 * 3362 * Because Sets are not ordered, so every element added has a position that 3363 * does not depend from the other. However if instead our elements are 3364 * ordered in pairs, like field-value pairs of an Hash, then one should 3365 * use: 3366 * 3367 * foreach key,value { 3368 * AddElement(key); 3369 * AddElement(value); 3370 * EndSquence(); 3371 * } 3372 * 3373 * Because the key and value will be always in the above order, while instead 3374 * the single key-value pairs, can appear in any position into a Redis hash. 3375 * 3376 * A list of ordered elements would be implemented with: 3377 * 3378 * foreach element { 3379 * AddElement(element); 3380 * } 3381 * EndSequence(); 3382 * 3383 */ 3384 void RM_DigestAddStringBuffer(RedisModuleDigest *md, unsigned char *ele, size_t len) { 3385 mixDigest(md->o,ele,len); 3386 } 3387 3388 /* Like `RedisModule_DigestAddStringBuffer()` but takes a long long as input 3389 * that gets converted into a string before adding it to the digest. */ 3390 void RM_DigestAddLongLong(RedisModuleDigest *md, long long ll) { 3391 char buf[LONG_STR_SIZE]; 3392 size_t len = ll2string(buf,sizeof(buf),ll); 3393 mixDigest(md->o,buf,len); 3394 } 3395 3396 /* See the documentation for `RedisModule_DigestAddElement()`. */ 3397 void RM_DigestEndSequence(RedisModuleDigest *md) { 3398 xorDigest(md->x,md->o,sizeof(md->o)); 3399 memset(md->o,0,sizeof(md->o)); 3400 } 3401 3402 /* -------------------------------------------------------------------------- 3403 * AOF API for modules data types 3404 * -------------------------------------------------------------------------- */ 3405 3406 /* Emits a command into the AOF during the AOF rewriting process. This function 3407 * is only called in the context of the aof_rewrite method of data types exported 3408 * by a module. The command works exactly like RedisModule_Call() in the way 3409 * the parameters are passed, but it does not return anything as the error 3410 * handling is performed by Redis itself. */ 3411 void RM_EmitAOF(RedisModuleIO *io, const char *cmdname, const char *fmt, ...) { 3412 if (io->error) return; 3413 struct redisCommand *cmd; 3414 robj **argv = NULL; 3415 int argc = 0, flags = 0, j; 3416 va_list ap; 3417 3418 cmd = lookupCommandByCString((char*)cmdname); 3419 if (!cmd) { 3420 serverLog(LL_WARNING, 3421 "Fatal: AOF method for module data type '%s' tried to " 3422 "emit unknown command '%s'", 3423 io->type->name, cmdname); 3424 io->error = 1; 3425 errno = EINVAL; 3426 return; 3427 } 3428 3429 /* Emit the arguments into the AOF in Redis protocol format. */ 3430 va_start(ap, fmt); 3431 argv = moduleCreateArgvFromUserFormat(cmdname,fmt,&argc,&flags,ap); 3432 va_end(ap); 3433 if (argv == NULL) { 3434 serverLog(LL_WARNING, 3435 "Fatal: AOF method for module data type '%s' tried to " 3436 "call RedisModule_EmitAOF() with wrong format specifiers '%s'", 3437 io->type->name, fmt); 3438 io->error = 1; 3439 errno = EINVAL; 3440 return; 3441 } 3442 3443 /* Bulk count. */ 3444 if (!io->error && rioWriteBulkCount(io->rio,'*',argc) == 0) 3445 io->error = 1; 3446 3447 /* Arguments. */ 3448 for (j = 0; j < argc; j++) { 3449 if (!io->error && rioWriteBulkObject(io->rio,argv[j]) == 0) 3450 io->error = 1; 3451 decrRefCount(argv[j]); 3452 } 3453 zfree(argv); 3454 return; 3455 } 3456 3457 /* -------------------------------------------------------------------------- 3458 * IO context handling 3459 * -------------------------------------------------------------------------- */ 3460 3461 RedisModuleCtx *RM_GetContextFromIO(RedisModuleIO *io) { 3462 if (io->ctx) return io->ctx; /* Can't have more than one... */ 3463 RedisModuleCtx ctxtemplate = REDISMODULE_CTX_INIT; 3464 io->ctx = zmalloc(sizeof(RedisModuleCtx)); 3465 *(io->ctx) = ctxtemplate; 3466 io->ctx->module = io->type->module; 3467 io->ctx->client = NULL; 3468 return io->ctx; 3469 } 3470 3471 /* Returns a RedisModuleString with the name of the key currently saving or 3472 * loading, when an IO data type callback is called. There is no guarantee 3473 * that the key name is always available, so this may return NULL. 3474 */ 3475 const RedisModuleString *RM_GetKeyNameFromIO(RedisModuleIO *io) { 3476 return io->key; 3477 } 3478 3479 /* -------------------------------------------------------------------------- 3480 * Logging 3481 * -------------------------------------------------------------------------- */ 3482 3483 /* This is the low level function implementing both: 3484 * 3485 * RM_Log() 3486 * RM_LogIOError() 3487 * 3488 */ 3489 void RM_LogRaw(RedisModule *module, const char *levelstr, const char *fmt, va_list ap) { 3490 char msg[LOG_MAX_LEN]; 3491 size_t name_len; 3492 int level; 3493 3494 if (!strcasecmp(levelstr,"debug")) level = LL_DEBUG; 3495 else if (!strcasecmp(levelstr,"verbose")) level = LL_VERBOSE; 3496 else if (!strcasecmp(levelstr,"notice")) level = LL_NOTICE; 3497 else if (!strcasecmp(levelstr,"warning")) level = LL_WARNING; 3498 else level = LL_VERBOSE; /* Default. */ 3499 3500 if (level < server.verbosity) return; 3501 3502 name_len = snprintf(msg, sizeof(msg),"<%s> ", module->name); 3503 vsnprintf(msg + name_len, sizeof(msg) - name_len, fmt, ap); 3504 serverLogRaw(level,msg); 3505 } 3506 3507 /* Produces a log message to the standard Redis log, the format accepts 3508 * printf-alike specifiers, while level is a string describing the log 3509 * level to use when emitting the log, and must be one of the following: 3510 * 3511 * * "debug" 3512 * * "verbose" 3513 * * "notice" 3514 * * "warning" 3515 * 3516 * If the specified log level is invalid, verbose is used by default. 3517 * There is a fixed limit to the length of the log line this function is able 3518 * to emit, this limit is not specified but is guaranteed to be more than 3519 * a few lines of text. 3520 */ 3521 void RM_Log(RedisModuleCtx *ctx, const char *levelstr, const char *fmt, ...) { 3522 if (!ctx->module) return; /* Can only log if module is initialized */ 3523 3524 va_list ap; 3525 va_start(ap, fmt); 3526 RM_LogRaw(ctx->module,levelstr,fmt,ap); 3527 va_end(ap); 3528 } 3529 3530 /* Log errors from RDB / AOF serialization callbacks. 3531 * 3532 * This function should be used when a callback is returning a critical 3533 * error to the caller since cannot load or save the data for some 3534 * critical reason. */ 3535 void RM_LogIOError(RedisModuleIO *io, const char *levelstr, const char *fmt, ...) { 3536 va_list ap; 3537 va_start(ap, fmt); 3538 RM_LogRaw(io->type->module,levelstr,fmt,ap); 3539 va_end(ap); 3540 } 3541 3542 /* -------------------------------------------------------------------------- 3543 * Blocking clients from modules 3544 * -------------------------------------------------------------------------- */ 3545 3546 /* Readable handler for the awake pipe. We do nothing here, the awake bytes 3547 * will be actually read in a more appropriate place in the 3548 * moduleHandleBlockedClients() function that is where clients are actually 3549 * served. */ 3550 void moduleBlockedClientPipeReadable(aeEventLoop *el, int fd, void *privdata, int mask) { 3551 UNUSED(el); 3552 UNUSED(fd); 3553 UNUSED(mask); 3554 UNUSED(privdata); 3555 } 3556 3557 /* This is called from blocked.c in order to unblock a client: may be called 3558 * for multiple reasons while the client is in the middle of being blocked 3559 * because the client is terminated, but is also called for cleanup when a 3560 * client is unblocked in a clean way after replaying. 3561 * 3562 * What we do here is just to set the client to NULL in the redis module 3563 * blocked client handle. This way if the client is terminated while there 3564 * is a pending threaded operation involving the blocked client, we'll know 3565 * that the client no longer exists and no reply callback should be called. 3566 * 3567 * The structure RedisModuleBlockedClient will be always deallocated when 3568 * running the list of clients blocked by a module that need to be unblocked. */ 3569 void unblockClientFromModule(client *c) { 3570 RedisModuleBlockedClient *bc = c->bpop.module_blocked_handle; 3571 3572 /* Call the disconnection callback if any. */ 3573 if (bc->disconnect_callback) { 3574 RedisModuleCtx ctx = REDISMODULE_CTX_INIT; 3575 ctx.blocked_privdata = bc->privdata; 3576 ctx.module = bc->module; 3577 ctx.client = bc->client; 3578 bc->disconnect_callback(&ctx,bc); 3579 moduleFreeContext(&ctx); 3580 } 3581 3582 bc->client = NULL; 3583 /* Reset the client for a new query since, for blocking commands implemented 3584 * into modules, we do not it immediately after the command returns (and 3585 * the client blocks) in order to be still able to access the argument 3586 * vector from callbacks. */ 3587 resetClient(c); 3588 } 3589 3590 /* Block a client in the context of a blocking command, returning an handle 3591 * which will be used, later, in order to unblock the client with a call to 3592 * RedisModule_UnblockClient(). The arguments specify callback functions 3593 * and a timeout after which the client is unblocked. 3594 * 3595 * The callbacks are called in the following contexts: 3596 * 3597 * reply_callback: called after a successful RedisModule_UnblockClient() 3598 * call in order to reply to the client and unblock it. 3599 * 3600 * reply_timeout: called when the timeout is reached in order to send an 3601 * error to the client. 3602 * 3603 * free_privdata: called in order to free the private data that is passed 3604 * by RedisModule_UnblockClient() call. 3605 */ 3606 RedisModuleBlockedClient *RM_BlockClient(RedisModuleCtx *ctx, RedisModuleCmdFunc reply_callback, RedisModuleCmdFunc timeout_callback, void (*free_privdata)(RedisModuleCtx*,void*), long long timeout_ms) { 3607 client *c = ctx->client; 3608 int islua = c->flags & CLIENT_LUA; 3609 int ismulti = c->flags & CLIENT_MULTI; 3610 3611 c->bpop.module_blocked_handle = zmalloc(sizeof(RedisModuleBlockedClient)); 3612 RedisModuleBlockedClient *bc = c->bpop.module_blocked_handle; 3613 3614 /* We need to handle the invalid operation of calling modules blocking 3615 * commands from Lua or MULTI. We actually create an already aborted 3616 * (client set to NULL) blocked client handle, and actually reply with 3617 * an error. */ 3618 bc->client = (islua || ismulti) ? NULL : c; 3619 bc->module = ctx->module; 3620 bc->reply_callback = reply_callback; 3621 bc->timeout_callback = timeout_callback; 3622 bc->disconnect_callback = NULL; /* Set by RM_SetDisconnectCallback() */ 3623 bc->free_privdata = free_privdata; 3624 bc->privdata = NULL; 3625 bc->reply_client = createClient(-1); 3626 bc->reply_client->flags |= CLIENT_MODULE; 3627 bc->dbid = c->db->id; 3628 c->bpop.timeout = timeout_ms ? (mstime()+timeout_ms) : 0; 3629 3630 if (islua || ismulti) { 3631 c->bpop.module_blocked_handle = NULL; 3632 addReplyError(c, islua ? 3633 "Blocking module command called from Lua script" : 3634 "Blocking module command called from transaction"); 3635 } else { 3636 blockClient(c,BLOCKED_MODULE); 3637 } 3638 return bc; 3639 } 3640 3641 /* Unblock a client blocked by `RedisModule_BlockedClient`. This will trigger 3642 * the reply callbacks to be called in order to reply to the client. 3643 * The 'privdata' argument will be accessible by the reply callback, so 3644 * the caller of this function can pass any value that is needed in order to 3645 * actually reply to the client. 3646 * 3647 * A common usage for 'privdata' is a thread that computes something that 3648 * needs to be passed to the client, included but not limited some slow 3649 * to compute reply or some reply obtained via networking. 3650 * 3651 * Note: this function can be called from threads spawned by the module. */ 3652 int RM_UnblockClient(RedisModuleBlockedClient *bc, void *privdata) { 3653 pthread_mutex_lock(&moduleUnblockedClientsMutex); 3654 bc->privdata = privdata; 3655 listAddNodeTail(moduleUnblockedClients,bc); 3656 if (write(server.module_blocked_pipe[1],"A",1) != 1) { 3657 /* Ignore the error, this is best-effort. */ 3658 } 3659 pthread_mutex_unlock(&moduleUnblockedClientsMutex); 3660 return REDISMODULE_OK; 3661 } 3662 3663 /* Abort a blocked client blocking operation: the client will be unblocked 3664 * without firing any callback. */ 3665 int RM_AbortBlock(RedisModuleBlockedClient *bc) { 3666 bc->reply_callback = NULL; 3667 bc->disconnect_callback = NULL; 3668 return RM_UnblockClient(bc,NULL); 3669 } 3670 3671 /* Set a callback that will be called if a blocked client disconnects 3672 * before the module has a chance to call RedisModule_UnblockClient() 3673 * 3674 * Usually what you want to do there, is to cleanup your module state 3675 * so that you can call RedisModule_UnblockClient() safely, otherwise 3676 * the client will remain blocked forever if the timeout is large. 3677 * 3678 * Notes: 3679 * 3680 * 1. It is not safe to call Reply* family functions here, it is also 3681 * useless since the client is gone. 3682 * 3683 * 2. This callback is not called if the client disconnects because of 3684 * a timeout. In such a case, the client is unblocked automatically 3685 * and the timeout callback is called. 3686 */ 3687 void RM_SetDisconnectCallback(RedisModuleBlockedClient *bc, RedisModuleDisconnectFunc callback) { 3688 bc->disconnect_callback = callback; 3689 } 3690 3691 /* This function will check the moduleUnblockedClients queue in order to 3692 * call the reply callback and really unblock the client. 3693 * 3694 * Clients end into this list because of calls to RM_UnblockClient(), 3695 * however it is possible that while the module was doing work for the 3696 * blocked client, it was terminated by Redis (for timeout or other reasons). 3697 * When this happens the RedisModuleBlockedClient structure in the queue 3698 * will have the 'client' field set to NULL. */ 3699 void moduleHandleBlockedClients(void) { 3700 listNode *ln; 3701 RedisModuleBlockedClient *bc; 3702 3703 pthread_mutex_lock(&moduleUnblockedClientsMutex); 3704 /* Here we unblock all the pending clients blocked in modules operations 3705 * so we can read every pending "awake byte" in the pipe. */ 3706 char buf[1]; 3707 while (read(server.module_blocked_pipe[0],buf,1) == 1); 3708 while (listLength(moduleUnblockedClients)) { 3709 ln = listFirst(moduleUnblockedClients); 3710 bc = ln->value; 3711 client *c = bc->client; 3712 listDelNode(moduleUnblockedClients,ln); 3713 pthread_mutex_unlock(&moduleUnblockedClientsMutex); 3714 3715 /* Release the lock during the loop, as long as we don't 3716 * touch the shared list. */ 3717 3718 /* Call the reply callback if the client is valid and we have 3719 * any callback. */ 3720 if (c && bc->reply_callback) { 3721 RedisModuleCtx ctx = REDISMODULE_CTX_INIT; 3722 ctx.flags |= REDISMODULE_CTX_BLOCKED_REPLY; 3723 ctx.blocked_privdata = bc->privdata; 3724 ctx.module = bc->module; 3725 ctx.client = bc->client; 3726 ctx.blocked_client = bc; 3727 bc->reply_callback(&ctx,(void**)c->argv,c->argc); 3728 moduleHandlePropagationAfterCommandCallback(&ctx); 3729 moduleFreeContext(&ctx); 3730 } 3731 3732 /* Free privdata if any. */ 3733 if (bc->privdata && bc->free_privdata) { 3734 RedisModuleCtx ctx = REDISMODULE_CTX_INIT; 3735 if (c == NULL) 3736 ctx.flags |= REDISMODULE_CTX_BLOCKED_DISCONNECTED; 3737 ctx.blocked_privdata = bc->privdata; 3738 ctx.module = bc->module; 3739 ctx.client = bc->client; 3740 bc->free_privdata(&ctx,bc->privdata); 3741 moduleFreeContext(&ctx); 3742 } 3743 3744 /* It is possible that this blocked client object accumulated 3745 * replies to send to the client in a thread safe context. 3746 * We need to glue such replies to the client output buffer and 3747 * free the temporary client we just used for the replies. */ 3748 if (c) AddReplyFromClient(c, bc->reply_client); 3749 freeClient(bc->reply_client); 3750 3751 if (c != NULL) { 3752 /* Before unblocking the client, set the disconnect callback 3753 * to NULL, because if we reached this point, the client was 3754 * properly unblocked by the module. */ 3755 bc->disconnect_callback = NULL; 3756 unblockClient(c); 3757 /* Put the client in the list of clients that need to write 3758 * if there are pending replies here. This is needed since 3759 * during a non blocking command the client may receive output. */ 3760 if (clientHasPendingReplies(c) && 3761 !(c->flags & CLIENT_PENDING_WRITE)) 3762 { 3763 c->flags |= CLIENT_PENDING_WRITE; 3764 listAddNodeHead(server.clients_pending_write,c); 3765 } 3766 } 3767 3768 /* Free 'bc' only after unblocking the client, since it is 3769 * referenced in the client blocking context, and must be valid 3770 * when calling unblockClient(). */ 3771 zfree(bc); 3772 3773 /* Lock again before to iterate the loop. */ 3774 pthread_mutex_lock(&moduleUnblockedClientsMutex); 3775 } 3776 pthread_mutex_unlock(&moduleUnblockedClientsMutex); 3777 } 3778 3779 /* Called when our client timed out. After this function unblockClient() 3780 * is called, and it will invalidate the blocked client. So this function 3781 * does not need to do any cleanup. Eventually the module will call the 3782 * API to unblock the client and the memory will be released. */ 3783 void moduleBlockedClientTimedOut(client *c) { 3784 RedisModuleBlockedClient *bc = c->bpop.module_blocked_handle; 3785 RedisModuleCtx ctx = REDISMODULE_CTX_INIT; 3786 ctx.flags |= REDISMODULE_CTX_BLOCKED_TIMEOUT; 3787 ctx.module = bc->module; 3788 ctx.client = bc->client; 3789 ctx.blocked_client = bc; 3790 bc->timeout_callback(&ctx,(void**)c->argv,c->argc); 3791 moduleFreeContext(&ctx); 3792 /* For timeout events, we do not want to call the disconnect callback, 3793 * because the blocked client will be automatically disconnected in 3794 * this case, and the user can still hook using the timeout callback. */ 3795 bc->disconnect_callback = NULL; 3796 } 3797 3798 /* Return non-zero if a module command was called in order to fill the 3799 * reply for a blocked client. */ 3800 int RM_IsBlockedReplyRequest(RedisModuleCtx *ctx) { 3801 return (ctx->flags & REDISMODULE_CTX_BLOCKED_REPLY) != 0; 3802 } 3803 3804 /* Return non-zero if a module command was called in order to fill the 3805 * reply for a blocked client that timed out. */ 3806 int RM_IsBlockedTimeoutRequest(RedisModuleCtx *ctx) { 3807 return (ctx->flags & REDISMODULE_CTX_BLOCKED_TIMEOUT) != 0; 3808 } 3809 3810 /* Get the private data set by RedisModule_UnblockClient() */ 3811 void *RM_GetBlockedClientPrivateData(RedisModuleCtx *ctx) { 3812 return ctx->blocked_privdata; 3813 } 3814 3815 /* Get the blocked client associated with a given context. 3816 * This is useful in the reply and timeout callbacks of blocked clients, 3817 * before sometimes the module has the blocked client handle references 3818 * around, and wants to cleanup it. */ 3819 RedisModuleBlockedClient *RM_GetBlockedClientHandle(RedisModuleCtx *ctx) { 3820 return ctx->blocked_client; 3821 } 3822 3823 /* Return true if when the free callback of a blocked client is called, 3824 * the reason for the client to be unblocked is that it disconnected 3825 * while it was blocked. */ 3826 int RM_BlockedClientDisconnected(RedisModuleCtx *ctx) { 3827 return (ctx->flags & REDISMODULE_CTX_BLOCKED_DISCONNECTED) != 0; 3828 } 3829 3830 /* -------------------------------------------------------------------------- 3831 * Thread Safe Contexts 3832 * -------------------------------------------------------------------------- */ 3833 3834 /* Return a context which can be used inside threads to make Redis context 3835 * calls with certain modules APIs. If 'bc' is not NULL then the module will 3836 * be bound to a blocked client, and it will be possible to use the 3837 * `RedisModule_Reply*` family of functions to accumulate a reply for when the 3838 * client will be unblocked. Otherwise the thread safe context will be 3839 * detached by a specific client. 3840 * 3841 * To call non-reply APIs, the thread safe context must be prepared with: 3842 * 3843 * RedisModule_ThreadSafeCallStart(ctx); 3844 * ... make your call here ... 3845 * RedisModule_ThreadSafeCallStop(ctx); 3846 * 3847 * This is not needed when using `RedisModule_Reply*` functions, assuming 3848 * that a blocked client was used when the context was created, otherwise 3849 * no RedisModule_Reply* call should be made at all. 3850 * 3851 * TODO: thread safe contexts do not inherit the blocked client 3852 * selected database. */ 3853 RedisModuleCtx *RM_GetThreadSafeContext(RedisModuleBlockedClient *bc) { 3854 RedisModuleCtx *ctx = zmalloc(sizeof(*ctx)); 3855 RedisModuleCtx empty = REDISMODULE_CTX_INIT; 3856 memcpy(ctx,&empty,sizeof(empty)); 3857 if (bc) { 3858 ctx->blocked_client = bc; 3859 ctx->module = bc->module; 3860 } 3861 ctx->flags |= REDISMODULE_CTX_THREAD_SAFE; 3862 /* Even when the context is associated with a blocked client, we can't 3863 * access it safely from another thread, so we create a fake client here 3864 * in order to keep things like the currently selected database and similar 3865 * things. */ 3866 ctx->client = createClient(-1); 3867 if (bc) { 3868 selectDb(ctx->client,bc->dbid); 3869 ctx->client->id = bc->client->id; 3870 } 3871 return ctx; 3872 } 3873 3874 /* Release a thread safe context. */ 3875 void RM_FreeThreadSafeContext(RedisModuleCtx *ctx) { 3876 moduleFreeContext(ctx); 3877 zfree(ctx); 3878 } 3879 3880 /* Acquire the server lock before executing a thread safe API call. 3881 * This is not needed for `RedisModule_Reply*` calls when there is 3882 * a blocked client connected to the thread safe context. */ 3883 void RM_ThreadSafeContextLock(RedisModuleCtx *ctx) { 3884 UNUSED(ctx); 3885 moduleAcquireGIL(); 3886 } 3887 3888 /* Release the server lock after a thread safe API call was executed. */ 3889 void RM_ThreadSafeContextUnlock(RedisModuleCtx *ctx) { 3890 UNUSED(ctx); 3891 moduleReleaseGIL(); 3892 } 3893 3894 void moduleAcquireGIL(void) { 3895 pthread_mutex_lock(&moduleGIL); 3896 } 3897 3898 void moduleReleaseGIL(void) { 3899 pthread_mutex_unlock(&moduleGIL); 3900 } 3901 3902 3903 /* -------------------------------------------------------------------------- 3904 * Module Keyspace Notifications API 3905 * -------------------------------------------------------------------------- */ 3906 3907 /* Subscribe to keyspace notifications. This is a low-level version of the 3908 * keyspace-notifications API. A module can register callbacks to be notified 3909 * when keyspce events occur. 3910 * 3911 * Notification events are filtered by their type (string events, set events, 3912 * etc), and the subscriber callback receives only events that match a specific 3913 * mask of event types. 3914 * 3915 * When subscribing to notifications with RedisModule_SubscribeToKeyspaceEvents 3916 * the module must provide an event type-mask, denoting the events the subscriber 3917 * is interested in. This can be an ORed mask of any of the following flags: 3918 * 3919 * - REDISMODULE_NOTIFY_GENERIC: Generic commands like DEL, EXPIRE, RENAME 3920 * - REDISMODULE_NOTIFY_STRING: String events 3921 * - REDISMODULE_NOTIFY_LIST: List events 3922 * - REDISMODULE_NOTIFY_SET: Set events 3923 * - REDISMODULE_NOTIFY_HASH: Hash events 3924 * - REDISMODULE_NOTIFY_ZSET: Sorted Set events 3925 * - REDISMODULE_NOTIFY_EXPIRED: Expiration events 3926 * - REDISMODULE_NOTIFY_EVICTED: Eviction events 3927 * - REDISMODULE_NOTIFY_STREAM: Stream events 3928 * - REDISMODULE_NOTIFY_ALL: All events 3929 * 3930 * We do not distinguish between key events and keyspace events, and it is up 3931 * to the module to filter the actions taken based on the key. 3932 * 3933 * The subscriber signature is: 3934 * 3935 * int (*RedisModuleNotificationFunc) (RedisModuleCtx *ctx, int type, 3936 * const char *event, 3937 * RedisModuleString *key); 3938 * 3939 * `type` is the event type bit, that must match the mask given at registration 3940 * time. The event string is the actual command being executed, and key is the 3941 * relevant Redis key. 3942 * 3943 * Notification callback gets executed with a redis context that can not be 3944 * used to send anything to the client, and has the db number where the event 3945 * occurred as its selected db number. 3946 * 3947 * Notice that it is not necessary to enable notifications in redis.conf for 3948 * module notifications to work. 3949 * 3950 * Warning: the notification callbacks are performed in a synchronous manner, 3951 * so notification callbacks must to be fast, or they would slow Redis down. 3952 * If you need to take long actions, use threads to offload them. 3953 * 3954 * See https://redis.io/topics/notifications for more information. 3955 */ 3956 int RM_SubscribeToKeyspaceEvents(RedisModuleCtx *ctx, int types, RedisModuleNotificationFunc callback) { 3957 RedisModuleKeyspaceSubscriber *sub = zmalloc(sizeof(*sub)); 3958 sub->module = ctx->module; 3959 sub->event_mask = types; 3960 sub->notify_callback = callback; 3961 sub->active = 0; 3962 3963 listAddNodeTail(moduleKeyspaceSubscribers, sub); 3964 return REDISMODULE_OK; 3965 } 3966 3967 /* Dispatcher for keyspace notifications to module subscriber functions. 3968 * This gets called only if at least one module requested to be notified on 3969 * keyspace notifications */ 3970 void moduleNotifyKeyspaceEvent(int type, const char *event, robj *key, int dbid) { 3971 /* Don't do anything if there aren't any subscribers */ 3972 if (listLength(moduleKeyspaceSubscribers) == 0) return; 3973 3974 listIter li; 3975 listNode *ln; 3976 listRewind(moduleKeyspaceSubscribers,&li); 3977 3978 /* Remove irrelevant flags from the type mask */ 3979 type &= ~(NOTIFY_KEYEVENT | NOTIFY_KEYSPACE); 3980 3981 while((ln = listNext(&li))) { 3982 RedisModuleKeyspaceSubscriber *sub = ln->value; 3983 /* Only notify subscribers on events matching they registration, 3984 * and avoid subscribers triggering themselves */ 3985 if ((sub->event_mask & type) && sub->active == 0) { 3986 RedisModuleCtx ctx = REDISMODULE_CTX_INIT; 3987 ctx.module = sub->module; 3988 ctx.client = moduleFreeContextReusedClient; 3989 selectDb(ctx.client, dbid); 3990 3991 /* mark the handler as active to avoid reentrant loops. 3992 * If the subscriber performs an action triggering itself, 3993 * it will not be notified about it. */ 3994 sub->active = 1; 3995 sub->notify_callback(&ctx, type, event, key); 3996 sub->active = 0; 3997 moduleFreeContext(&ctx); 3998 } 3999 } 4000 } 4001 4002 /* Unsubscribe any notification subscribers this module has upon unloading */ 4003 void moduleUnsubscribeNotifications(RedisModule *module) { 4004 listIter li; 4005 listNode *ln; 4006 listRewind(moduleKeyspaceSubscribers,&li); 4007 while((ln = listNext(&li))) { 4008 RedisModuleKeyspaceSubscriber *sub = ln->value; 4009 if (sub->module == module) { 4010 listDelNode(moduleKeyspaceSubscribers, ln); 4011 zfree(sub); 4012 } 4013 } 4014 } 4015 4016 /* -------------------------------------------------------------------------- 4017 * Modules Cluster API 4018 * -------------------------------------------------------------------------- */ 4019 4020 /* The Cluster message callback function pointer type. */ 4021 typedef void (*RedisModuleClusterMessageReceiver)(RedisModuleCtx *ctx, const char *sender_id, uint8_t type, const unsigned char *payload, uint32_t len); 4022 4023 /* This structure identifies a registered caller: it must match a given module 4024 * ID, for a given message type. The callback function is just the function 4025 * that was registered as receiver. */ 4026 typedef struct moduleClusterReceiver { 4027 uint64_t module_id; 4028 RedisModuleClusterMessageReceiver callback; 4029 struct RedisModule *module; 4030 struct moduleClusterReceiver *next; 4031 } moduleClusterReceiver; 4032 4033 typedef struct moduleClusterNodeInfo { 4034 int flags; 4035 char ip[NET_IP_STR_LEN]; 4036 int port; 4037 char master_id[40]; /* Only if flags & REDISMODULE_NODE_MASTER is true. */ 4038 } mdouleClusterNodeInfo; 4039 4040 /* We have an array of message types: each bucket is a linked list of 4041 * configured receivers. */ 4042 static moduleClusterReceiver *clusterReceivers[UINT8_MAX]; 4043 4044 /* Dispatch the message to the right module receiver. */ 4045 void moduleCallClusterReceivers(const char *sender_id, uint64_t module_id, uint8_t type, const unsigned char *payload, uint32_t len) { 4046 moduleClusterReceiver *r = clusterReceivers[type]; 4047 while(r) { 4048 if (r->module_id == module_id) { 4049 RedisModuleCtx ctx = REDISMODULE_CTX_INIT; 4050 ctx.module = r->module; 4051 ctx.client = moduleFreeContextReusedClient; 4052 selectDb(ctx.client, 0); 4053 r->callback(&ctx,sender_id,type,payload,len); 4054 moduleFreeContext(&ctx); 4055 return; 4056 } 4057 r = r->next; 4058 } 4059 } 4060 4061 /* Register a callback receiver for cluster messages of type 'type'. If there 4062 * was already a registered callback, this will replace the callback function 4063 * with the one provided, otherwise if the callback is set to NULL and there 4064 * is already a callback for this function, the callback is unregistered 4065 * (so this API call is also used in order to delete the receiver). */ 4066 void RM_RegisterClusterMessageReceiver(RedisModuleCtx *ctx, uint8_t type, RedisModuleClusterMessageReceiver callback) { 4067 if (!server.cluster_enabled) return; 4068 4069 uint64_t module_id = moduleTypeEncodeId(ctx->module->name,0); 4070 moduleClusterReceiver *r = clusterReceivers[type], *prev = NULL; 4071 while(r) { 4072 if (r->module_id == module_id) { 4073 /* Found! Set or delete. */ 4074 if (callback) { 4075 r->callback = callback; 4076 } else { 4077 /* Delete the receiver entry if the user is setting 4078 * it to NULL. Just unlink the receiver node from the 4079 * linked list. */ 4080 if (prev) 4081 prev->next = r->next; 4082 else 4083 clusterReceivers[type]->next = r->next; 4084 zfree(r); 4085 } 4086 return; 4087 } 4088 prev = r; 4089 r = r->next; 4090 } 4091 4092 /* Not found, let's add it. */ 4093 if (callback) { 4094 r = zmalloc(sizeof(*r)); 4095 r->module_id = module_id; 4096 r->module = ctx->module; 4097 r->callback = callback; 4098 r->next = clusterReceivers[type]; 4099 clusterReceivers[type] = r; 4100 } 4101 } 4102 4103 /* Send a message to all the nodes in the cluster if `target` is NULL, otherwise 4104 * at the specified target, which is a REDISMODULE_NODE_ID_LEN bytes node ID, as 4105 * returned by the receiver callback or by the nodes iteration functions. 4106 * 4107 * The function returns REDISMODULE_OK if the message was successfully sent, 4108 * otherwise if the node is not connected or such node ID does not map to any 4109 * known cluster node, REDISMODULE_ERR is returned. */ 4110 int RM_SendClusterMessage(RedisModuleCtx *ctx, char *target_id, uint8_t type, unsigned char *msg, uint32_t len) { 4111 if (!server.cluster_enabled) return REDISMODULE_ERR; 4112 uint64_t module_id = moduleTypeEncodeId(ctx->module->name,0); 4113 if (clusterSendModuleMessageToTarget(target_id,module_id,type,msg,len) == C_OK) 4114 return REDISMODULE_OK; 4115 else 4116 return REDISMODULE_ERR; 4117 } 4118 4119 /* Return an array of string pointers, each string pointer points to a cluster 4120 * node ID of exactly REDISMODULE_NODE_ID_SIZE bytes (without any null term). 4121 * The number of returned node IDs is stored into `*numnodes`. 4122 * However if this function is called by a module not running an a Redis 4123 * instance with Redis Cluster enabled, NULL is returned instead. 4124 * 4125 * The IDs returned can be used with RedisModule_GetClusterNodeInfo() in order 4126 * to get more information about single nodes. 4127 * 4128 * The array returned by this function must be freed using the function 4129 * RedisModule_FreeClusterNodesList(). 4130 * 4131 * Example: 4132 * 4133 * size_t count, j; 4134 * char **ids = RedisModule_GetClusterNodesList(ctx,&count); 4135 * for (j = 0; j < count; j++) { 4136 * RedisModule_Log("notice","Node %.*s", 4137 * REDISMODULE_NODE_ID_LEN,ids[j]); 4138 * } 4139 * RedisModule_FreeClusterNodesList(ids); 4140 */ 4141 char **RM_GetClusterNodesList(RedisModuleCtx *ctx, size_t *numnodes) { 4142 UNUSED(ctx); 4143 4144 if (!server.cluster_enabled) return NULL; 4145 size_t count = dictSize(server.cluster->nodes); 4146 char **ids = zmalloc((count+1)*REDISMODULE_NODE_ID_LEN); 4147 dictIterator *di = dictGetIterator(server.cluster->nodes); 4148 dictEntry *de; 4149 int j = 0; 4150 while((de = dictNext(di)) != NULL) { 4151 clusterNode *node = dictGetVal(de); 4152 if (node->flags & (CLUSTER_NODE_NOADDR|CLUSTER_NODE_HANDSHAKE)) continue; 4153 ids[j] = zmalloc(REDISMODULE_NODE_ID_LEN); 4154 memcpy(ids[j],node->name,REDISMODULE_NODE_ID_LEN); 4155 j++; 4156 } 4157 *numnodes = j; 4158 ids[j] = NULL; /* Null term so that FreeClusterNodesList does not need 4159 * to also get the count argument. */ 4160 dictReleaseIterator(di); 4161 return ids; 4162 } 4163 4164 /* Free the node list obtained with RedisModule_GetClusterNodesList. */ 4165 void RM_FreeClusterNodesList(char **ids) { 4166 if (ids == NULL) return; 4167 for (int j = 0; ids[j]; j++) zfree(ids[j]); 4168 zfree(ids); 4169 } 4170 4171 /* Return this node ID (REDISMODULE_CLUSTER_ID_LEN bytes) or NULL if the cluster 4172 * is disabled. */ 4173 const char *RM_GetMyClusterID(void) { 4174 if (!server.cluster_enabled) return NULL; 4175 return server.cluster->myself->name; 4176 } 4177 4178 /* Return the number of nodes in the cluster, regardless of their state 4179 * (handshake, noaddress, ...) so that the number of active nodes may actually 4180 * be smaller, but not greater than this number. If the instance is not in 4181 * cluster mode, zero is returned. */ 4182 size_t RM_GetClusterSize(void) { 4183 if (!server.cluster_enabled) return 0; 4184 return dictSize(server.cluster->nodes); 4185 } 4186 4187 /* Populate the specified info for the node having as ID the specified 'id', 4188 * then returns REDISMODULE_OK. Otherwise if the node ID does not exist from 4189 * the POV of this local node, REDISMODULE_ERR is returned. 4190 * 4191 * The arguments ip, master_id, port and flags can be NULL in case we don't 4192 * need to populate back certain info. If an ip and master_id (only populated 4193 * if the instance is a slave) are specified, they point to buffers holding 4194 * at least REDISMODULE_NODE_ID_LEN bytes. The strings written back as ip 4195 * and master_id are not null terminated. 4196 * 4197 * The list of flags reported is the following: 4198 * 4199 * * REDISMODULE_NODE_MYSELF This node 4200 * * REDISMODULE_NODE_MASTER The node is a master 4201 * * REDISMODULE_NODE_SLAVE The node is a replica 4202 * * REDISMODULE_NODE_PFAIL We see the node as failing 4203 * * REDISMODULE_NODE_FAIL The cluster agrees the node is failing 4204 * * REDISMODULE_NODE_NOFAILOVER The slave is configured to never failover 4205 */ 4206 4207 clusterNode *clusterLookupNode(const char *name); /* We need access to internals */ 4208 4209 int RM_GetClusterNodeInfo(RedisModuleCtx *ctx, const char *id, char *ip, char *master_id, int *port, int *flags) { 4210 UNUSED(ctx); 4211 4212 clusterNode *node = clusterLookupNode(id); 4213 if (node->flags & (CLUSTER_NODE_NOADDR|CLUSTER_NODE_HANDSHAKE)) 4214 return REDISMODULE_ERR; 4215 4216 if (ip) memcpy(ip,node->name,REDISMODULE_NODE_ID_LEN); 4217 4218 if (master_id) { 4219 /* If the information is not available, the function will set the 4220 * field to zero bytes, so that when the field can't be populated the 4221 * function kinda remains predictable. */ 4222 if (node->flags & CLUSTER_NODE_MASTER && node->slaveof) 4223 memcpy(master_id,node->slaveof->name,REDISMODULE_NODE_ID_LEN); 4224 else 4225 memset(master_id,0,REDISMODULE_NODE_ID_LEN); 4226 } 4227 if (port) *port = node->port; 4228 4229 /* As usually we have to remap flags for modules, in order to ensure 4230 * we can provide binary compatibility. */ 4231 if (flags) { 4232 *flags = 0; 4233 if (node->flags & CLUSTER_NODE_MYSELF) *flags |= REDISMODULE_NODE_MYSELF; 4234 if (node->flags & CLUSTER_NODE_MASTER) *flags |= REDISMODULE_NODE_MASTER; 4235 if (node->flags & CLUSTER_NODE_SLAVE) *flags |= REDISMODULE_NODE_SLAVE; 4236 if (node->flags & CLUSTER_NODE_PFAIL) *flags |= REDISMODULE_NODE_PFAIL; 4237 if (node->flags & CLUSTER_NODE_FAIL) *flags |= REDISMODULE_NODE_FAIL; 4238 if (node->flags & CLUSTER_NODE_NOFAILOVER) *flags |= REDISMODULE_NODE_NOFAILOVER; 4239 } 4240 return REDISMODULE_OK; 4241 } 4242 4243 /* Set Redis Cluster flags in order to change the normal behavior of 4244 * Redis Cluster, especially with the goal of disabling certain functions. 4245 * This is useful for modules that use the Cluster API in order to create 4246 * a different distributed system, but still want to use the Redis Cluster 4247 * message bus. Flags that can be set: 4248 * 4249 * CLUSTER_MODULE_FLAG_NO_FAILOVER 4250 * CLUSTER_MODULE_FLAG_NO_REDIRECTION 4251 * 4252 * With the following effects: 4253 * 4254 * NO_FAILOVER: prevent Redis Cluster slaves to failover a failing master. 4255 * Also disables the replica migration feature. 4256 * 4257 * NO_REDIRECTION: Every node will accept any key, without trying to perform 4258 * partitioning according to the user Redis Cluster algorithm. 4259 * Slots informations will still be propagated across the 4260 * cluster, but without effects. */ 4261 void RM_SetClusterFlags(RedisModuleCtx *ctx, uint64_t flags) { 4262 UNUSED(ctx); 4263 if (flags & REDISMODULE_CLUSTER_FLAG_NO_FAILOVER) 4264 server.cluster_module_flags |= CLUSTER_MODULE_FLAG_NO_FAILOVER; 4265 if (flags & REDISMODULE_CLUSTER_FLAG_NO_REDIRECTION) 4266 server.cluster_module_flags |= CLUSTER_MODULE_FLAG_NO_REDIRECTION; 4267 } 4268 4269 /* -------------------------------------------------------------------------- 4270 * Modules Timers API 4271 * 4272 * Module timers are an high precision "green timers" abstraction where 4273 * every module can register even millions of timers without problems, even if 4274 * the actual event loop will just have a single timer that is used to awake the 4275 * module timers subsystem in order to process the next event. 4276 * 4277 * All the timers are stored into a radix tree, ordered by expire time, when 4278 * the main Redis event loop timer callback is called, we try to process all 4279 * the timers already expired one after the other. Then we re-enter the event 4280 * loop registering a timer that will expire when the next to process module 4281 * timer will expire. 4282 * 4283 * Every time the list of active timers drops to zero, we unregister the 4284 * main event loop timer, so that there is no overhead when such feature is 4285 * not used. 4286 * -------------------------------------------------------------------------- */ 4287 4288 static rax *Timers; /* The radix tree of all the timers sorted by expire. */ 4289 long long aeTimer = -1; /* Main event loop (ae.c) timer identifier. */ 4290 4291 typedef void (*RedisModuleTimerProc)(RedisModuleCtx *ctx, void *data); 4292 4293 /* The timer descriptor, stored as value in the radix tree. */ 4294 typedef struct RedisModuleTimer { 4295 RedisModule *module; /* Module reference. */ 4296 RedisModuleTimerProc callback; /* The callback to invoke on expire. */ 4297 void *data; /* Private data for the callback. */ 4298 int dbid; /* Database number selected by the original client. */ 4299 } RedisModuleTimer; 4300 4301 /* This is the timer handler that is called by the main event loop. We schedule 4302 * this timer to be called when the nearest of our module timers will expire. */ 4303 int moduleTimerHandler(struct aeEventLoop *eventLoop, long long id, void *clientData) { 4304 UNUSED(eventLoop); 4305 UNUSED(id); 4306 UNUSED(clientData); 4307 4308 /* To start let's try to fire all the timers already expired. */ 4309 raxIterator ri; 4310 raxStart(&ri,Timers); 4311 uint64_t now = ustime(); 4312 long long next_period = 0; 4313 while(1) { 4314 raxSeek(&ri,"^",NULL,0); 4315 if (!raxNext(&ri)) break; 4316 uint64_t expiretime; 4317 memcpy(&expiretime,ri.key,sizeof(expiretime)); 4318 expiretime = ntohu64(expiretime); 4319 if (now >= expiretime) { 4320 RedisModuleTimer *timer = ri.data; 4321 RedisModuleCtx ctx = REDISMODULE_CTX_INIT; 4322 4323 ctx.module = timer->module; 4324 ctx.client = moduleFreeContextReusedClient; 4325 selectDb(ctx.client, timer->dbid); 4326 timer->callback(&ctx,timer->data); 4327 moduleFreeContext(&ctx); 4328 raxRemove(Timers,(unsigned char*)ri.key,ri.key_len,NULL); 4329 zfree(timer); 4330 } else { 4331 next_period = (expiretime-now)/1000; /* Scale to milliseconds. */ 4332 break; 4333 } 4334 } 4335 raxStop(&ri); 4336 4337 /* Reschedule the next timer or cancel it. */ 4338 if (next_period <= 0) next_period = 1; 4339 return (raxSize(Timers) > 0) ? next_period : AE_NOMORE; 4340 } 4341 4342 /* Create a new timer that will fire after `period` milliseconds, and will call 4343 * the specified function using `data` as argument. The returned timer ID can be 4344 * used to get information from the timer or to stop it before it fires. */ 4345 RedisModuleTimerID RM_CreateTimer(RedisModuleCtx *ctx, mstime_t period, RedisModuleTimerProc callback, void *data) { 4346 RedisModuleTimer *timer = zmalloc(sizeof(*timer)); 4347 timer->module = ctx->module; 4348 timer->callback = callback; 4349 timer->data = data; 4350 timer->dbid = ctx->client->db->id; 4351 uint64_t expiretime = ustime()+period*1000; 4352 uint64_t key; 4353 4354 while(1) { 4355 key = htonu64(expiretime); 4356 if (raxFind(Timers, (unsigned char*)&key,sizeof(key)) == raxNotFound) { 4357 raxInsert(Timers,(unsigned char*)&key,sizeof(key),timer,NULL); 4358 break; 4359 } else { 4360 expiretime++; 4361 } 4362 } 4363 4364 /* We need to install the main event loop timer if it's not already 4365 * installed, or we may need to refresh its period if we just installed 4366 * a timer that will expire sooner than any other else. */ 4367 if (aeTimer != -1) { 4368 raxIterator ri; 4369 raxStart(&ri,Timers); 4370 raxSeek(&ri,"^",NULL,0); 4371 raxNext(&ri); 4372 if (memcmp(ri.key,&key,sizeof(key)) == 0) { 4373 /* This is the first key, we need to re-install the timer according 4374 * to the just added event. */ 4375 aeDeleteTimeEvent(server.el,aeTimer); 4376 aeTimer = -1; 4377 } 4378 raxStop(&ri); 4379 } 4380 4381 /* If we have no main timer (the old one was invalidated, or this is the 4382 * first module timer we have), install one. */ 4383 if (aeTimer == -1) 4384 aeTimer = aeCreateTimeEvent(server.el,period,moduleTimerHandler,NULL,NULL); 4385 4386 return key; 4387 } 4388 4389 /* Stop a timer, returns REDISMODULE_OK if the timer was found, belonged to the 4390 * calling module, and was stopped, otherwise REDISMODULE_ERR is returned. 4391 * If not NULL, the data pointer is set to the value of the data argument when 4392 * the timer was created. */ 4393 int RM_StopTimer(RedisModuleCtx *ctx, RedisModuleTimerID id, void **data) { 4394 RedisModuleTimer *timer = raxFind(Timers,(unsigned char*)&id,sizeof(id)); 4395 if (timer == raxNotFound || timer->module != ctx->module) 4396 return REDISMODULE_ERR; 4397 if (data) *data = timer->data; 4398 raxRemove(Timers,(unsigned char*)&id,sizeof(id),NULL); 4399 zfree(timer); 4400 return REDISMODULE_OK; 4401 } 4402 4403 /* Obtain information about a timer: its remaining time before firing 4404 * (in milliseconds), and the private data pointer associated with the timer. 4405 * If the timer specified does not exist or belongs to a different module 4406 * no information is returned and the function returns REDISMODULE_ERR, otherwise 4407 * REDISMODULE_OK is returned. The arguments remaining or data can be NULL if 4408 * the caller does not need certain information. */ 4409 int RM_GetTimerInfo(RedisModuleCtx *ctx, RedisModuleTimerID id, uint64_t *remaining, void **data) { 4410 RedisModuleTimer *timer = raxFind(Timers,(unsigned char*)&id,sizeof(id)); 4411 if (timer == raxNotFound || timer->module != ctx->module) 4412 return REDISMODULE_ERR; 4413 if (remaining) { 4414 int64_t rem = ntohu64(id)-ustime(); 4415 if (rem < 0) rem = 0; 4416 *remaining = rem/1000; /* Scale to milliseconds. */ 4417 } 4418 if (data) *data = timer->data; 4419 return REDISMODULE_OK; 4420 } 4421 4422 /* -------------------------------------------------------------------------- 4423 * Modules Dictionary API 4424 * 4425 * Implements a sorted dictionary (actually backed by a radix tree) with 4426 * the usual get / set / del / num-items API, together with an iterator 4427 * capable of going back and forth. 4428 * -------------------------------------------------------------------------- */ 4429 4430 /* Create a new dictionary. The 'ctx' pointer can be the current module context 4431 * or NULL, depending on what you want. Please follow the following rules: 4432 * 4433 * 1. Use a NULL context if you plan to retain a reference to this dictionary 4434 * that will survive the time of the module callback where you created it. 4435 * 2. Use a NULL context if no context is available at the time you are creating 4436 * the dictionary (of course...). 4437 * 3. However use the current callback context as 'ctx' argument if the 4438 * dictionary time to live is just limited to the callback scope. In this 4439 * case, if enabled, you can enjoy the automatic memory management that will 4440 * reclaim the dictionary memory, as well as the strings returned by the 4441 * Next / Prev dictionary iterator calls. 4442 */ 4443 RedisModuleDict *RM_CreateDict(RedisModuleCtx *ctx) { 4444 struct RedisModuleDict *d = zmalloc(sizeof(*d)); 4445 d->rax = raxNew(); 4446 if (ctx != NULL) autoMemoryAdd(ctx,REDISMODULE_AM_DICT,d); 4447 return d; 4448 } 4449 4450 /* Free a dictionary created with RM_CreateDict(). You need to pass the 4451 * context pointer 'ctx' only if the dictionary was created using the 4452 * context instead of passing NULL. */ 4453 void RM_FreeDict(RedisModuleCtx *ctx, RedisModuleDict *d) { 4454 if (ctx != NULL) autoMemoryFreed(ctx,REDISMODULE_AM_DICT,d); 4455 raxFree(d->rax); 4456 zfree(d); 4457 } 4458 4459 /* Return the size of the dictionary (number of keys). */ 4460 uint64_t RM_DictSize(RedisModuleDict *d) { 4461 return raxSize(d->rax); 4462 } 4463 4464 /* Store the specified key into the dictionary, setting its value to the 4465 * pointer 'ptr'. If the key was added with success, since it did not 4466 * already exist, REDISMODULE_OK is returned. Otherwise if the key already 4467 * exists the function returns REDISMODULE_ERR. */ 4468 int RM_DictSetC(RedisModuleDict *d, void *key, size_t keylen, void *ptr) { 4469 int retval = raxTryInsert(d->rax,key,keylen,ptr,NULL); 4470 return (retval == 1) ? REDISMODULE_OK : REDISMODULE_ERR; 4471 } 4472 4473 /* Like RedisModule_DictSetC() but will replace the key with the new 4474 * value if the key already exists. */ 4475 int RM_DictReplaceC(RedisModuleDict *d, void *key, size_t keylen, void *ptr) { 4476 int retval = raxInsert(d->rax,key,keylen,ptr,NULL); 4477 return (retval == 1) ? REDISMODULE_OK : REDISMODULE_ERR; 4478 } 4479 4480 /* Like RedisModule_DictSetC() but takes the key as a RedisModuleString. */ 4481 int RM_DictSet(RedisModuleDict *d, RedisModuleString *key, void *ptr) { 4482 return RM_DictSetC(d,key->ptr,sdslen(key->ptr),ptr); 4483 } 4484 4485 /* Like RedisModule_DictReplaceC() but takes the key as a RedisModuleString. */ 4486 int RM_DictReplace(RedisModuleDict *d, RedisModuleString *key, void *ptr) { 4487 return RM_DictReplaceC(d,key->ptr,sdslen(key->ptr),ptr); 4488 } 4489 4490 /* Return the value stored at the specified key. The function returns NULL 4491 * both in the case the key does not exist, or if you actually stored 4492 * NULL at key. So, optionally, if the 'nokey' pointer is not NULL, it will 4493 * be set by reference to 1 if the key does not exist, or to 0 if the key 4494 * exists. */ 4495 void *RM_DictGetC(RedisModuleDict *d, void *key, size_t keylen, int *nokey) { 4496 void *res = raxFind(d->rax,key,keylen); 4497 if (nokey) *nokey = (res == raxNotFound); 4498 return (res == raxNotFound) ? NULL : res; 4499 } 4500 4501 /* Like RedisModule_DictGetC() but takes the key as a RedisModuleString. */ 4502 void *RM_DictGet(RedisModuleDict *d, RedisModuleString *key, int *nokey) { 4503 return RM_DictGetC(d,key->ptr,sdslen(key->ptr),nokey); 4504 } 4505 4506 /* Remove the specified key from the dictionary, returning REDISMODULE_OK if 4507 * the key was found and delted, or REDISMODULE_ERR if instead there was 4508 * no such key in the dictionary. When the operation is successful, if 4509 * 'oldval' is not NULL, then '*oldval' is set to the value stored at the 4510 * key before it was deleted. Using this feature it is possible to get 4511 * a pointer to the value (for instance in order to release it), without 4512 * having to call RedisModule_DictGet() before deleting the key. */ 4513 int RM_DictDelC(RedisModuleDict *d, void *key, size_t keylen, void *oldval) { 4514 int retval = raxRemove(d->rax,key,keylen,oldval); 4515 return retval ? REDISMODULE_OK : REDISMODULE_ERR; 4516 } 4517 4518 /* Like RedisModule_DictDelC() but gets the key as a RedisModuleString. */ 4519 int RM_DictDel(RedisModuleDict *d, RedisModuleString *key, void *oldval) { 4520 return RM_DictDelC(d,key->ptr,sdslen(key->ptr),oldval); 4521 } 4522 4523 /* Return an interator, setup in order to start iterating from the specified 4524 * key by applying the operator 'op', which is just a string specifying the 4525 * comparison operator to use in order to seek the first element. The 4526 * operators avalable are: 4527 * 4528 * "^" -- Seek the first (lexicographically smaller) key. 4529 * "$" -- Seek the last (lexicographically biffer) key. 4530 * ">" -- Seek the first element greter than the specified key. 4531 * ">=" -- Seek the first element greater or equal than the specified key. 4532 * "<" -- Seek the first element smaller than the specified key. 4533 * "<=" -- Seek the first element smaller or equal than the specified key. 4534 * "==" -- Seek the first element matching exactly the specified key. 4535 * 4536 * Note that for "^" and "$" the passed key is not used, and the user may 4537 * just pass NULL with a length of 0. 4538 * 4539 * If the element to start the iteration cannot be seeked based on the 4540 * key and operator passed, RedisModule_DictNext() / Prev() will just return 4541 * REDISMODULE_ERR at the first call, otherwise they'll produce elements. 4542 */ 4543 RedisModuleDictIter *RM_DictIteratorStartC(RedisModuleDict *d, const char *op, void *key, size_t keylen) { 4544 RedisModuleDictIter *di = zmalloc(sizeof(*di)); 4545 di->dict = d; 4546 raxStart(&di->ri,d->rax); 4547 raxSeek(&di->ri,op,key,keylen); 4548 return di; 4549 } 4550 4551 /* Exactly like RedisModule_DictIteratorStartC, but the key is passed as a 4552 * RedisModuleString. */ 4553 RedisModuleDictIter *RM_DictIteratorStart(RedisModuleDict *d, const char *op, RedisModuleString *key) { 4554 return RM_DictIteratorStartC(d,op,key->ptr,sdslen(key->ptr)); 4555 } 4556 4557 /* Release the iterator created with RedisModule_DictIteratorStart(). This call 4558 * is mandatory otherwise a memory leak is introduced in the module. */ 4559 void RM_DictIteratorStop(RedisModuleDictIter *di) { 4560 raxStop(&di->ri); 4561 zfree(di); 4562 } 4563 4564 /* After its creation with RedisModule_DictIteratorStart(), it is possible to 4565 * change the currently selected element of the iterator by using this 4566 * API call. The result based on the operator and key is exactly like 4567 * the function RedisModule_DictIteratorStart(), however in this case the 4568 * return value is just REDISMODULE_OK in case the seeked element was found, 4569 * or REDISMODULE_ERR in case it was not possible to seek the specified 4570 * element. It is possible to reseek an iterator as many times as you want. */ 4571 int RM_DictIteratorReseekC(RedisModuleDictIter *di, const char *op, void *key, size_t keylen) { 4572 return raxSeek(&di->ri,op,key,keylen); 4573 } 4574 4575 /* Like RedisModule_DictIteratorReseekC() but takes the key as as a 4576 * RedisModuleString. */ 4577 int RM_DictIteratorReseek(RedisModuleDictIter *di, const char *op, RedisModuleString *key) { 4578 return RM_DictIteratorReseekC(di,op,key->ptr,sdslen(key->ptr)); 4579 } 4580 4581 /* Return the current item of the dictionary iterator 'di' and steps to the 4582 * next element. If the iterator already yield the last element and there 4583 * are no other elements to return, NULL is returned, otherwise a pointer 4584 * to a string representing the key is provided, and the '*keylen' length 4585 * is set by reference (if keylen is not NULL). The '*dataptr', if not NULL 4586 * is set to the value of the pointer stored at the returned key as auxiliary 4587 * data (as set by the RedisModule_DictSet API). 4588 * 4589 * Usage example: 4590 * 4591 * ... create the iterator here ... 4592 * char *key; 4593 * void *data; 4594 * while((key = RedisModule_DictNextC(iter,&keylen,&data)) != NULL) { 4595 * printf("%.*s %p\n", (int)keylen, key, data); 4596 * } 4597 * 4598 * The returned pointer is of type void because sometimes it makes sense 4599 * to cast it to a char* sometimes to an unsigned char* depending on the 4600 * fact it contains or not binary data, so this API ends being more 4601 * comfortable to use. 4602 * 4603 * The validity of the returned pointer is until the next call to the 4604 * next/prev iterator step. Also the pointer is no longer valid once the 4605 * iterator is released. */ 4606 void *RM_DictNextC(RedisModuleDictIter *di, size_t *keylen, void **dataptr) { 4607 if (!raxNext(&di->ri)) return NULL; 4608 if (keylen) *keylen = di->ri.key_len; 4609 if (dataptr) *dataptr = di->ri.data; 4610 return di->ri.key; 4611 } 4612 4613 /* This function is exactly like RedisModule_DictNext() but after returning 4614 * the currently selected element in the iterator, it selects the previous 4615 * element (laxicographically smaller) instead of the next one. */ 4616 void *RM_DictPrevC(RedisModuleDictIter *di, size_t *keylen, void **dataptr) { 4617 if (!raxPrev(&di->ri)) return NULL; 4618 if (keylen) *keylen = di->ri.key_len; 4619 if (dataptr) *dataptr = di->ri.data; 4620 return di->ri.key; 4621 } 4622 4623 /* Like RedisModuleNextC(), but instead of returning an internally allocated 4624 * buffer and key length, it returns directly a module string object allocated 4625 * in the specified context 'ctx' (that may be NULL exactly like for the main 4626 * API RedisModule_CreateString). 4627 * 4628 * The returned string object should be deallocated after use, either manually 4629 * or by using a context that has automatic memory management active. */ 4630 RedisModuleString *RM_DictNext(RedisModuleCtx *ctx, RedisModuleDictIter *di, void **dataptr) { 4631 size_t keylen; 4632 void *key = RM_DictNextC(di,&keylen,dataptr); 4633 if (key == NULL) return NULL; 4634 return RM_CreateString(ctx,key,keylen); 4635 } 4636 4637 /* Like RedisModule_DictNext() but after returning the currently selected 4638 * element in the iterator, it selects the previous element (laxicographically 4639 * smaller) instead of the next one. */ 4640 RedisModuleString *RM_DictPrev(RedisModuleCtx *ctx, RedisModuleDictIter *di, void **dataptr) { 4641 size_t keylen; 4642 void *key = RM_DictPrevC(di,&keylen,dataptr); 4643 if (key == NULL) return NULL; 4644 return RM_CreateString(ctx,key,keylen); 4645 } 4646 4647 /* Compare the element currently pointed by the iterator to the specified 4648 * element given by key/keylen, according to the operator 'op' (the set of 4649 * valid operators are the same valid for RedisModule_DictIteratorStart). 4650 * If the comparision is successful the command returns REDISMODULE_OK 4651 * otherwise REDISMODULE_ERR is returned. 4652 * 4653 * This is useful when we want to just emit a lexicographical range, so 4654 * in the loop, as we iterate elements, we can also check if we are still 4655 * on range. 4656 * 4657 * The function returne REDISMODULE_ERR if the iterator reached the 4658 * end of elements condition as well. */ 4659 int RM_DictCompareC(RedisModuleDictIter *di, const char *op, void *key, size_t keylen) { 4660 if (raxEOF(&di->ri)) return REDISMODULE_ERR; 4661 int res = raxCompare(&di->ri,op,key,keylen); 4662 return res ? REDISMODULE_OK : REDISMODULE_ERR; 4663 } 4664 4665 /* Like RedisModule_DictCompareC but gets the key to compare with the current 4666 * iterator key as a RedisModuleString. */ 4667 int RM_DictCompare(RedisModuleDictIter *di, const char *op, RedisModuleString *key) { 4668 if (raxEOF(&di->ri)) return REDISMODULE_ERR; 4669 int res = raxCompare(&di->ri,op,key->ptr,sdslen(key->ptr)); 4670 return res ? REDISMODULE_OK : REDISMODULE_ERR; 4671 } 4672 4673 /* -------------------------------------------------------------------------- 4674 * Modules utility APIs 4675 * -------------------------------------------------------------------------- */ 4676 4677 /* Return random bytes using SHA1 in counter mode with a /dev/urandom 4678 * initialized seed. This function is fast so can be used to generate 4679 * many bytes without any effect on the operating system entropy pool. 4680 * Currently this function is not thread safe. */ 4681 void RM_GetRandomBytes(unsigned char *dst, size_t len) { 4682 getRandomBytes(dst,len); 4683 } 4684 4685 /* Like RedisModule_GetRandomBytes() but instead of setting the string to 4686 * random bytes the string is set to random characters in the in the 4687 * hex charset [0-9a-f]. */ 4688 void RM_GetRandomHexChars(char *dst, size_t len) { 4689 getRandomHexChars(dst,len); 4690 } 4691 4692 /* -------------------------------------------------------------------------- 4693 * Modules API exporting / importing 4694 * -------------------------------------------------------------------------- */ 4695 4696 /* This function is called by a module in order to export some API with a 4697 * given name. Other modules will be able to use this API by calling the 4698 * symmetrical function RM_GetSharedAPI() and casting the return value to 4699 * the right function pointer. 4700 * 4701 * The function will return REDISMODULE_OK if the name is not already taken, 4702 * otherwise REDISMODULE_ERR will be returned and no operation will be 4703 * performed. 4704 * 4705 * IMPORTANT: the apiname argument should be a string literal with static 4706 * lifetime. The API relies on the fact that it will always be valid in 4707 * the future. */ 4708 int RM_ExportSharedAPI(RedisModuleCtx *ctx, const char *apiname, void *func) { 4709 RedisModuleSharedAPI *sapi = zmalloc(sizeof(*sapi)); 4710 sapi->module = ctx->module; 4711 sapi->func = func; 4712 if (dictAdd(server.sharedapi, (char*)apiname, sapi) != DICT_OK) { 4713 zfree(sapi); 4714 return REDISMODULE_ERR; 4715 } 4716 return REDISMODULE_OK; 4717 } 4718 4719 /* Request an exported API pointer. The return value is just a void pointer 4720 * that the caller of this function will be required to cast to the right 4721 * function pointer, so this is a private contract between modules. 4722 * 4723 * If the requested API is not available then NULL is returned. Because 4724 * modules can be loaded at different times with different order, this 4725 * function calls should be put inside some module generic API registering 4726 * step, that is called every time a module attempts to execute a 4727 * command that requires external APIs: if some API cannot be resolved, the 4728 * command should return an error. 4729 * 4730 * Here is an exmaple: 4731 * 4732 * int ... myCommandImplementation() { 4733 * if (getExternalAPIs() == 0) { 4734 * reply with an error here if we cannot have the APIs 4735 * } 4736 * // Use the API: 4737 * myFunctionPointer(foo); 4738 * } 4739 * 4740 * And the function registerAPI() is: 4741 * 4742 * int getExternalAPIs(void) { 4743 * static int api_loaded = 0; 4744 * if (api_loaded != 0) return 1; // APIs already resolved. 4745 * 4746 * myFunctionPointer = RedisModule_GetOtherModuleAPI("..."); 4747 * if (myFunctionPointer == NULL) return 0; 4748 * 4749 * return 1; 4750 * } 4751 */ 4752 void *RM_GetSharedAPI(RedisModuleCtx *ctx, const char *apiname) { 4753 dictEntry *de = dictFind(server.sharedapi, apiname); 4754 if (de == NULL) return NULL; 4755 RedisModuleSharedAPI *sapi = dictGetVal(de); 4756 if (listSearchKey(sapi->module->usedby,ctx->module) == NULL) { 4757 listAddNodeTail(sapi->module->usedby,ctx->module); 4758 listAddNodeTail(ctx->module->using,sapi->module); 4759 } 4760 return sapi->func; 4761 } 4762 4763 /* Remove all the APIs registered by the specified module. Usually you 4764 * want this when the module is going to be unloaded. This function 4765 * assumes that's caller responsibility to make sure the APIs are not 4766 * used by other modules. 4767 * 4768 * The number of unregistered APIs is returned. */ 4769 int moduleUnregisterSharedAPI(RedisModule *module) { 4770 int count = 0; 4771 dictIterator *di = dictGetSafeIterator(server.sharedapi); 4772 dictEntry *de; 4773 while ((de = dictNext(di)) != NULL) { 4774 const char *apiname = dictGetKey(de); 4775 RedisModuleSharedAPI *sapi = dictGetVal(de); 4776 if (sapi->module == module) { 4777 dictDelete(server.sharedapi,apiname); 4778 zfree(sapi); 4779 count++; 4780 } 4781 } 4782 dictReleaseIterator(di); 4783 return count; 4784 } 4785 4786 /* Remove the specified module as an user of APIs of ever other module. 4787 * This is usually called when a module is unloaded. 4788 * 4789 * Returns the number of modules this module was using APIs from. */ 4790 int moduleUnregisterUsedAPI(RedisModule *module) { 4791 listIter li; 4792 listNode *ln; 4793 int count = 0; 4794 4795 listRewind(module->using,&li); 4796 while((ln = listNext(&li))) { 4797 RedisModule *used = ln->value; 4798 listNode *ln = listSearchKey(used->usedby,module); 4799 if (ln) { 4800 listDelNode(module->using,ln); 4801 count++; 4802 } 4803 } 4804 return count; 4805 } 4806 4807 /* Unregister all filters registered by a module. 4808 * This is called when a module is being unloaded. 4809 * 4810 * Returns the number of filters unregistered. */ 4811 int moduleUnregisterFilters(RedisModule *module) { 4812 listIter li; 4813 listNode *ln; 4814 int count = 0; 4815 4816 listRewind(module->filters,&li); 4817 while((ln = listNext(&li))) { 4818 RedisModuleCommandFilter *filter = ln->value; 4819 listNode *ln = listSearchKey(moduleCommandFilters,filter); 4820 if (ln) { 4821 listDelNode(moduleCommandFilters,ln); 4822 count++; 4823 } 4824 zfree(filter); 4825 } 4826 return count; 4827 } 4828 4829 /* -------------------------------------------------------------------------- 4830 * Module Command Filter API 4831 * -------------------------------------------------------------------------- */ 4832 4833 /* Register a new command filter function. 4834 * 4835 * Command filtering makes it possible for modules to extend Redis by plugging 4836 * into the execution flow of all commands. 4837 * 4838 * A registered filter gets called before Redis executes *any* command. This 4839 * includes both core Redis commands and commands registered by any module. The 4840 * filter applies in all execution paths including: 4841 * 4842 * 1. Invocation by a client. 4843 * 2. Invocation through `RedisModule_Call()` by any module. 4844 * 3. Invocation through Lua 'redis.call()`. 4845 * 4. Replication of a command from a master. 4846 * 4847 * The filter executes in a special filter context, which is different and more 4848 * limited than a RedisModuleCtx. Because the filter affects any command, it 4849 * must be implemented in a very efficient way to reduce the performance impact 4850 * on Redis. All Redis Module API calls that require a valid context (such as 4851 * `RedisModule_Call()`, `RedisModule_OpenKey()`, etc.) are not supported in a 4852 * filter context. 4853 * 4854 * The `RedisModuleCommandFilterCtx` can be used to inspect or modify the 4855 * executed command and its arguments. As the filter executes before Redis 4856 * begins processing the command, any change will affect the way the command is 4857 * processed. For example, a module can override Redis commands this way: 4858 * 4859 * 1. Register a `MODULE.SET` command which implements an extended version of 4860 * the Redis `SET` command. 4861 * 2. Register a command filter which detects invocation of `SET` on a specific 4862 * pattern of keys. Once detected, the filter will replace the first 4863 * argument from `SET` to `MODULE.SET`. 4864 * 3. When filter execution is complete, Redis considers the new command name 4865 * and therefore executes the module's own command. 4866 * 4867 * Note that in the above use case, if `MODULE.SET` itself uses 4868 * `RedisModule_Call()` the filter will be applied on that call as well. If 4869 * that is not desired, the `REDISMODULE_CMDFILTER_NOSELF` flag can be set when 4870 * registering the filter. 4871 * 4872 * The `REDISMODULE_CMDFILTER_NOSELF` flag prevents execution flows that 4873 * originate from the module's own `RM_Call()` from reaching the filter. This 4874 * flag is effective for all execution flows, including nested ones, as long as 4875 * the execution begins from the module's command context or a thread-safe 4876 * context that is associated with a blocking command. 4877 * 4878 * Detached thread-safe contexts are *not* associated with the module and cannot 4879 * be protected by this flag. 4880 * 4881 * If multiple filters are registered (by the same or different modules), they 4882 * are executed in the order of registration. 4883 */ 4884 4885 RedisModuleCommandFilter *RM_RegisterCommandFilter(RedisModuleCtx *ctx, RedisModuleCommandFilterFunc callback, int flags) { 4886 RedisModuleCommandFilter *filter = zmalloc(sizeof(*filter)); 4887 filter->module = ctx->module; 4888 filter->callback = callback; 4889 filter->flags = flags; 4890 4891 listAddNodeTail(moduleCommandFilters, filter); 4892 listAddNodeTail(ctx->module->filters, filter); 4893 return filter; 4894 } 4895 4896 /* Unregister a command filter. 4897 */ 4898 int RM_UnregisterCommandFilter(RedisModuleCtx *ctx, RedisModuleCommandFilter *filter) { 4899 listNode *ln; 4900 4901 /* A module can only remove its own filters */ 4902 if (filter->module != ctx->module) return REDISMODULE_ERR; 4903 4904 ln = listSearchKey(moduleCommandFilters,filter); 4905 if (!ln) return REDISMODULE_ERR; 4906 listDelNode(moduleCommandFilters,ln); 4907 4908 ln = listSearchKey(ctx->module->filters,filter); 4909 if (!ln) return REDISMODULE_ERR; /* Shouldn't happen */ 4910 listDelNode(ctx->module->filters,ln); 4911 4912 return REDISMODULE_OK; 4913 } 4914 4915 void moduleCallCommandFilters(client *c) { 4916 if (listLength(moduleCommandFilters) == 0) return; 4917 4918 listIter li; 4919 listNode *ln; 4920 listRewind(moduleCommandFilters,&li); 4921 4922 RedisModuleCommandFilterCtx filter = { 4923 .argv = c->argv, 4924 .argc = c->argc 4925 }; 4926 4927 while((ln = listNext(&li))) { 4928 RedisModuleCommandFilter *f = ln->value; 4929 4930 /* Skip filter if REDISMODULE_CMDFILTER_NOSELF is set and module is 4931 * currently processing a command. 4932 */ 4933 if ((f->flags & REDISMODULE_CMDFILTER_NOSELF) && f->module->in_call) continue; 4934 4935 /* Call filter */ 4936 f->callback(&filter); 4937 } 4938 4939 c->argv = filter.argv; 4940 c->argc = filter.argc; 4941 } 4942 4943 /* Return the number of arguments a filtered command has. The number of 4944 * arguments include the command itself. 4945 */ 4946 int RM_CommandFilterArgsCount(RedisModuleCommandFilterCtx *fctx) 4947 { 4948 return fctx->argc; 4949 } 4950 4951 /* Return the specified command argument. The first argument (position 0) is 4952 * the command itself, and the rest are user-provided args. 4953 */ 4954 const RedisModuleString *RM_CommandFilterArgGet(RedisModuleCommandFilterCtx *fctx, int pos) 4955 { 4956 if (pos < 0 || pos >= fctx->argc) return NULL; 4957 return fctx->argv[pos]; 4958 } 4959 4960 /* Modify the filtered command by inserting a new argument at the specified 4961 * position. The specified RedisModuleString argument may be used by Redis 4962 * after the filter context is destroyed, so it must not be auto-memory 4963 * allocated, freed or used elsewhere. 4964 */ 4965 4966 int RM_CommandFilterArgInsert(RedisModuleCommandFilterCtx *fctx, int pos, RedisModuleString *arg) 4967 { 4968 int i; 4969 4970 if (pos < 0 || pos > fctx->argc) return REDISMODULE_ERR; 4971 4972 fctx->argv = zrealloc(fctx->argv, (fctx->argc+1)*sizeof(RedisModuleString *)); 4973 for (i = fctx->argc; i > pos; i--) { 4974 fctx->argv[i] = fctx->argv[i-1]; 4975 } 4976 fctx->argv[pos] = arg; 4977 fctx->argc++; 4978 4979 return REDISMODULE_OK; 4980 } 4981 4982 /* Modify the filtered command by replacing an existing argument with a new one. 4983 * The specified RedisModuleString argument may be used by Redis after the 4984 * filter context is destroyed, so it must not be auto-memory allocated, freed 4985 * or used elsewhere. 4986 */ 4987 4988 int RM_CommandFilterArgReplace(RedisModuleCommandFilterCtx *fctx, int pos, RedisModuleString *arg) 4989 { 4990 if (pos < 0 || pos >= fctx->argc) return REDISMODULE_ERR; 4991 4992 decrRefCount(fctx->argv[pos]); 4993 fctx->argv[pos] = arg; 4994 4995 return REDISMODULE_OK; 4996 } 4997 4998 /* Modify the filtered command by deleting an argument at the specified 4999 * position. 5000 */ 5001 int RM_CommandFilterArgDelete(RedisModuleCommandFilterCtx *fctx, int pos) 5002 { 5003 int i; 5004 if (pos < 0 || pos >= fctx->argc) return REDISMODULE_ERR; 5005 5006 decrRefCount(fctx->argv[pos]); 5007 for (i = pos; i < fctx->argc-1; i++) { 5008 fctx->argv[i] = fctx->argv[i+1]; 5009 } 5010 fctx->argc--; 5011 5012 return REDISMODULE_OK; 5013 } 5014 5015 /* -------------------------------------------------------------------------- 5016 * Modules API internals 5017 * -------------------------------------------------------------------------- */ 5018 5019 /* server.moduleapi dictionary type. Only uses plain C strings since 5020 * this gets queries from modules. */ 5021 5022 uint64_t dictCStringKeyHash(const void *key) { 5023 return dictGenHashFunction((unsigned char*)key, strlen((char*)key)); 5024 } 5025 5026 int dictCStringKeyCompare(void *privdata, const void *key1, const void *key2) { 5027 UNUSED(privdata); 5028 return strcmp(key1,key2) == 0; 5029 } 5030 5031 dictType moduleAPIDictType = { 5032 dictCStringKeyHash, /* hash function */ 5033 NULL, /* key dup */ 5034 NULL, /* val dup */ 5035 dictCStringKeyCompare, /* key compare */ 5036 NULL, /* key destructor */ 5037 NULL /* val destructor */ 5038 }; 5039 5040 int moduleRegisterApi(const char *funcname, void *funcptr) { 5041 return dictAdd(server.moduleapi, (char*)funcname, funcptr); 5042 } 5043 5044 #define REGISTER_API(name) \ 5045 moduleRegisterApi("RedisModule_" #name, (void *)(unsigned long)RM_ ## name) 5046 5047 /* Global initialization at Redis startup. */ 5048 void moduleRegisterCoreAPI(void); 5049 5050 void moduleInitModulesSystem(void) { 5051 moduleUnblockedClients = listCreate(); 5052 server.loadmodule_queue = listCreate(); 5053 modules = dictCreate(&modulesDictType,NULL); 5054 5055 /* Set up the keyspace notification susbscriber list and static client */ 5056 moduleKeyspaceSubscribers = listCreate(); 5057 moduleFreeContextReusedClient = createClient(-1); 5058 moduleFreeContextReusedClient->flags |= CLIENT_MODULE; 5059 5060 /* Set up filter list */ 5061 moduleCommandFilters = listCreate(); 5062 5063 moduleRegisterCoreAPI(); 5064 if (pipe(server.module_blocked_pipe) == -1) { 5065 serverLog(LL_WARNING, 5066 "Can't create the pipe for module blocking commands: %s", 5067 strerror(errno)); 5068 exit(1); 5069 } 5070 /* Make the pipe non blocking. This is just a best effort aware mechanism 5071 * and we do not want to block not in the read nor in the write half. */ 5072 anetNonBlock(NULL,server.module_blocked_pipe[0]); 5073 anetNonBlock(NULL,server.module_blocked_pipe[1]); 5074 5075 /* Create the timers radix tree. */ 5076 Timers = raxNew(); 5077 5078 /* Our thread-safe contexts GIL must start with already locked: 5079 * it is just unlocked when it's safe. */ 5080 pthread_mutex_lock(&moduleGIL); 5081 } 5082 5083 /* Load all the modules in the server.loadmodule_queue list, which is 5084 * populated by `loadmodule` directives in the configuration file. 5085 * We can't load modules directly when processing the configuration file 5086 * because the server must be fully initialized before loading modules. 5087 * 5088 * The function aborts the server on errors, since to start with missing 5089 * modules is not considered sane: clients may rely on the existence of 5090 * given commands, loading AOF also may need some modules to exist, and 5091 * if this instance is a slave, it must understand commands from master. */ 5092 void moduleLoadFromQueue(void) { 5093 listIter li; 5094 listNode *ln; 5095 5096 listRewind(server.loadmodule_queue,&li); 5097 while((ln = listNext(&li))) { 5098 struct moduleLoadQueueEntry *loadmod = ln->value; 5099 if (moduleLoad(loadmod->path,(void **)loadmod->argv,loadmod->argc) 5100 == C_ERR) 5101 { 5102 serverLog(LL_WARNING, 5103 "Can't load module from %s: server aborting", 5104 loadmod->path); 5105 exit(1); 5106 } 5107 } 5108 } 5109 5110 void moduleFreeModuleStructure(struct RedisModule *module) { 5111 listRelease(module->types); 5112 listRelease(module->filters); 5113 sdsfree(module->name); 5114 zfree(module); 5115 } 5116 5117 void moduleUnregisterCommands(struct RedisModule *module) { 5118 /* Unregister all the commands registered by this module. */ 5119 dictIterator *di = dictGetSafeIterator(server.commands); 5120 dictEntry *de; 5121 while ((de = dictNext(di)) != NULL) { 5122 struct redisCommand *cmd = dictGetVal(de); 5123 if (cmd->proc == RedisModuleCommandDispatcher) { 5124 RedisModuleCommandProxy *cp = 5125 (void*)(unsigned long)cmd->getkeys_proc; 5126 sds cmdname = cp->rediscmd->name; 5127 if (cp->module == module) { 5128 dictDelete(server.commands,cmdname); 5129 dictDelete(server.orig_commands,cmdname); 5130 sdsfree(cmdname); 5131 zfree(cp->rediscmd); 5132 zfree(cp); 5133 } 5134 } 5135 } 5136 dictReleaseIterator(di); 5137 } 5138 5139 /* Load a module and initialize it. On success C_OK is returned, otherwise 5140 * C_ERR is returned. */ 5141 int moduleLoad(const char *path, void **module_argv, int module_argc) { 5142 int (*onload)(void *, void **, int); 5143 void *handle; 5144 RedisModuleCtx ctx = REDISMODULE_CTX_INIT; 5145 5146 handle = dlopen(path,RTLD_NOW|RTLD_LOCAL); 5147 if (handle == NULL) { 5148 serverLog(LL_WARNING, "Module %s failed to load: %s", path, dlerror()); 5149 return C_ERR; 5150 } 5151 onload = (int (*)(void *, void **, int))(unsigned long) dlsym(handle,"RedisModule_OnLoad"); 5152 if (onload == NULL) { 5153 dlclose(handle); 5154 serverLog(LL_WARNING, 5155 "Module %s does not export RedisModule_OnLoad() " 5156 "symbol. Module not loaded.",path); 5157 return C_ERR; 5158 } 5159 if (onload((void*)&ctx,module_argv,module_argc) == REDISMODULE_ERR) { 5160 if (ctx.module) { 5161 moduleUnregisterCommands(ctx.module); 5162 moduleUnregisterSharedAPI(ctx.module); 5163 moduleUnregisterUsedAPI(ctx.module); 5164 moduleFreeModuleStructure(ctx.module); 5165 } 5166 dlclose(handle); 5167 serverLog(LL_WARNING, 5168 "Module %s initialization failed. Module not loaded",path); 5169 return C_ERR; 5170 } 5171 5172 /* Redis module loaded! Register it. */ 5173 dictAdd(modules,ctx.module->name,ctx.module); 5174 ctx.module->handle = handle; 5175 serverLog(LL_NOTICE,"Module '%s' loaded from %s",ctx.module->name,path); 5176 moduleFreeContext(&ctx); 5177 return C_OK; 5178 } 5179 5180 5181 /* Unload the module registered with the specified name. On success 5182 * C_OK is returned, otherwise C_ERR is returned and errno is set 5183 * to the following values depending on the type of error: 5184 * 5185 * * ENONET: No such module having the specified name. 5186 * * EBUSY: The module exports a new data type and can only be reloaded. */ 5187 int moduleUnload(sds name) { 5188 struct RedisModule *module = dictFetchValue(modules,name); 5189 5190 if (module == NULL) { 5191 errno = ENOENT; 5192 return REDISMODULE_ERR; 5193 } else if (listLength(module->types)) { 5194 errno = EBUSY; 5195 return REDISMODULE_ERR; 5196 } else if (listLength(module->usedby)) { 5197 errno = EPERM; 5198 return REDISMODULE_ERR; 5199 } 5200 5201 moduleUnregisterCommands(module); 5202 moduleUnregisterSharedAPI(module); 5203 moduleUnregisterUsedAPI(module); 5204 moduleUnregisterFilters(module); 5205 5206 /* Remove any notification subscribers this module might have */ 5207 moduleUnsubscribeNotifications(module); 5208 5209 /* Unregister all the hooks. TODO: Yet no hooks support here. */ 5210 5211 /* Unload the dynamic library. */ 5212 if (dlclose(module->handle) == -1) { 5213 char *error = dlerror(); 5214 if (error == NULL) error = "Unknown error"; 5215 serverLog(LL_WARNING,"Error when trying to close the %s module: %s", 5216 module->name, error); 5217 } 5218 5219 /* Remove from list of modules. */ 5220 serverLog(LL_NOTICE,"Module %s unloaded",module->name); 5221 dictDelete(modules,module->name); 5222 module->name = NULL; /* The name was already freed by dictDelete(). */ 5223 moduleFreeModuleStructure(module); 5224 5225 return REDISMODULE_OK; 5226 } 5227 5228 /* Redis MODULE command. 5229 * 5230 * MODULE LOAD <path> [args...] */ 5231 void moduleCommand(client *c) { 5232 char *subcmd = c->argv[1]->ptr; 5233 if (c->argc == 2 && !strcasecmp(subcmd,"help")) { 5234 const char *help[] = { 5235 "LIST -- Return a list of loaded modules.", 5236 "LOAD <path> [arg ...] -- Load a module library from <path>.", 5237 "UNLOAD <name> -- Unload a module.", 5238 NULL 5239 }; 5240 addReplyHelp(c, help); 5241 } else 5242 if (!strcasecmp(subcmd,"load") && c->argc >= 3) { 5243 robj **argv = NULL; 5244 int argc = 0; 5245 5246 if (c->argc > 3) { 5247 argc = c->argc - 3; 5248 argv = &c->argv[3]; 5249 } 5250 5251 if (moduleLoad(c->argv[2]->ptr,(void **)argv,argc) == C_OK) 5252 addReply(c,shared.ok); 5253 else 5254 addReplyError(c, 5255 "Error loading the extension. Please check the server logs."); 5256 } else if (!strcasecmp(subcmd,"unload") && c->argc == 3) { 5257 if (moduleUnload(c->argv[2]->ptr) == C_OK) 5258 addReply(c,shared.ok); 5259 else { 5260 char *errmsg; 5261 switch(errno) { 5262 case ENOENT: 5263 errmsg = "no such module with that name"; 5264 break; 5265 case EBUSY: 5266 errmsg = "the module exports one or more module-side data " 5267 "types, can't unload"; 5268 break; 5269 case EPERM: 5270 errmsg = "the module exports APIs used by other modules. " 5271 "Please unload them first and try again"; 5272 break; 5273 default: 5274 errmsg = "operation not possible."; 5275 break; 5276 } 5277 addReplyErrorFormat(c,"Error unloading module: %s",errmsg); 5278 } 5279 } else if (!strcasecmp(subcmd,"list") && c->argc == 2) { 5280 dictIterator *di = dictGetIterator(modules); 5281 dictEntry *de; 5282 5283 addReplyMultiBulkLen(c,dictSize(modules)); 5284 while ((de = dictNext(di)) != NULL) { 5285 sds name = dictGetKey(de); 5286 struct RedisModule *module = dictGetVal(de); 5287 addReplyMultiBulkLen(c,4); 5288 addReplyBulkCString(c,"name"); 5289 addReplyBulkCBuffer(c,name,sdslen(name)); 5290 addReplyBulkCString(c,"ver"); 5291 addReplyLongLong(c,module->ver); 5292 } 5293 dictReleaseIterator(di); 5294 } else { 5295 addReplySubcommandSyntaxError(c); 5296 return; 5297 } 5298 } 5299 5300 /* Return the number of registered modules. */ 5301 size_t moduleCount(void) { 5302 return dictSize(modules); 5303 } 5304 5305 /* Register all the APIs we export. Keep this function at the end of the 5306 * file so that's easy to seek it to add new entries. */ 5307 void moduleRegisterCoreAPI(void) { 5308 server.moduleapi = dictCreate(&moduleAPIDictType,NULL); 5309 server.sharedapi = dictCreate(&moduleAPIDictType,NULL); 5310 REGISTER_API(Alloc); 5311 REGISTER_API(Calloc); 5312 REGISTER_API(Realloc); 5313 REGISTER_API(Free); 5314 REGISTER_API(Strdup); 5315 REGISTER_API(CreateCommand); 5316 REGISTER_API(SetModuleAttribs); 5317 REGISTER_API(IsModuleNameBusy); 5318 REGISTER_API(WrongArity); 5319 REGISTER_API(ReplyWithLongLong); 5320 REGISTER_API(ReplyWithError); 5321 REGISTER_API(ReplyWithSimpleString); 5322 REGISTER_API(ReplyWithArray); 5323 REGISTER_API(ReplySetArrayLength); 5324 REGISTER_API(ReplyWithString); 5325 REGISTER_API(ReplyWithStringBuffer); 5326 REGISTER_API(ReplyWithNull); 5327 REGISTER_API(ReplyWithCallReply); 5328 REGISTER_API(ReplyWithDouble); 5329 REGISTER_API(GetSelectedDb); 5330 REGISTER_API(SelectDb); 5331 REGISTER_API(OpenKey); 5332 REGISTER_API(CloseKey); 5333 REGISTER_API(KeyType); 5334 REGISTER_API(ValueLength); 5335 REGISTER_API(ListPush); 5336 REGISTER_API(ListPop); 5337 REGISTER_API(StringToLongLong); 5338 REGISTER_API(StringToDouble); 5339 REGISTER_API(Call); 5340 REGISTER_API(CallReplyProto); 5341 REGISTER_API(FreeCallReply); 5342 REGISTER_API(CallReplyInteger); 5343 REGISTER_API(CallReplyType); 5344 REGISTER_API(CallReplyLength); 5345 REGISTER_API(CallReplyArrayElement); 5346 REGISTER_API(CallReplyStringPtr); 5347 REGISTER_API(CreateStringFromCallReply); 5348 REGISTER_API(CreateString); 5349 REGISTER_API(CreateStringFromLongLong); 5350 REGISTER_API(CreateStringFromString); 5351 REGISTER_API(CreateStringPrintf); 5352 REGISTER_API(FreeString); 5353 REGISTER_API(StringPtrLen); 5354 REGISTER_API(AutoMemory); 5355 REGISTER_API(Replicate); 5356 REGISTER_API(ReplicateVerbatim); 5357 REGISTER_API(DeleteKey); 5358 REGISTER_API(UnlinkKey); 5359 REGISTER_API(StringSet); 5360 REGISTER_API(StringDMA); 5361 REGISTER_API(StringTruncate); 5362 REGISTER_API(SetExpire); 5363 REGISTER_API(GetExpire); 5364 REGISTER_API(ZsetAdd); 5365 REGISTER_API(ZsetIncrby); 5366 REGISTER_API(ZsetScore); 5367 REGISTER_API(ZsetRem); 5368 REGISTER_API(ZsetRangeStop); 5369 REGISTER_API(ZsetFirstInScoreRange); 5370 REGISTER_API(ZsetLastInScoreRange); 5371 REGISTER_API(ZsetFirstInLexRange); 5372 REGISTER_API(ZsetLastInLexRange); 5373 REGISTER_API(ZsetRangeCurrentElement); 5374 REGISTER_API(ZsetRangeNext); 5375 REGISTER_API(ZsetRangePrev); 5376 REGISTER_API(ZsetRangeEndReached); 5377 REGISTER_API(HashSet); 5378 REGISTER_API(HashGet); 5379 REGISTER_API(IsKeysPositionRequest); 5380 REGISTER_API(KeyAtPos); 5381 REGISTER_API(GetClientId); 5382 REGISTER_API(GetContextFlags); 5383 REGISTER_API(PoolAlloc); 5384 REGISTER_API(CreateDataType); 5385 REGISTER_API(ModuleTypeSetValue); 5386 REGISTER_API(ModuleTypeGetType); 5387 REGISTER_API(ModuleTypeGetValue); 5388 REGISTER_API(SaveUnsigned); 5389 REGISTER_API(LoadUnsigned); 5390 REGISTER_API(SaveSigned); 5391 REGISTER_API(LoadSigned); 5392 REGISTER_API(SaveString); 5393 REGISTER_API(SaveStringBuffer); 5394 REGISTER_API(LoadString); 5395 REGISTER_API(LoadStringBuffer); 5396 REGISTER_API(SaveDouble); 5397 REGISTER_API(LoadDouble); 5398 REGISTER_API(SaveFloat); 5399 REGISTER_API(LoadFloat); 5400 REGISTER_API(EmitAOF); 5401 REGISTER_API(Log); 5402 REGISTER_API(LogIOError); 5403 REGISTER_API(StringAppendBuffer); 5404 REGISTER_API(RetainString); 5405 REGISTER_API(StringCompare); 5406 REGISTER_API(GetContextFromIO); 5407 REGISTER_API(GetKeyNameFromIO); 5408 REGISTER_API(BlockClient); 5409 REGISTER_API(UnblockClient); 5410 REGISTER_API(IsBlockedReplyRequest); 5411 REGISTER_API(IsBlockedTimeoutRequest); 5412 REGISTER_API(GetBlockedClientPrivateData); 5413 REGISTER_API(AbortBlock); 5414 REGISTER_API(Milliseconds); 5415 REGISTER_API(GetThreadSafeContext); 5416 REGISTER_API(FreeThreadSafeContext); 5417 REGISTER_API(ThreadSafeContextLock); 5418 REGISTER_API(ThreadSafeContextUnlock); 5419 REGISTER_API(DigestAddStringBuffer); 5420 REGISTER_API(DigestAddLongLong); 5421 REGISTER_API(DigestEndSequence); 5422 REGISTER_API(SubscribeToKeyspaceEvents); 5423 REGISTER_API(RegisterClusterMessageReceiver); 5424 REGISTER_API(SendClusterMessage); 5425 REGISTER_API(GetClusterNodeInfo); 5426 REGISTER_API(GetClusterNodesList); 5427 REGISTER_API(FreeClusterNodesList); 5428 REGISTER_API(CreateTimer); 5429 REGISTER_API(StopTimer); 5430 REGISTER_API(GetTimerInfo); 5431 REGISTER_API(GetMyClusterID); 5432 REGISTER_API(GetClusterSize); 5433 REGISTER_API(GetRandomBytes); 5434 REGISTER_API(GetRandomHexChars); 5435 REGISTER_API(BlockedClientDisconnected); 5436 REGISTER_API(SetDisconnectCallback); 5437 REGISTER_API(GetBlockedClientHandle); 5438 REGISTER_API(SetClusterFlags); 5439 REGISTER_API(CreateDict); 5440 REGISTER_API(FreeDict); 5441 REGISTER_API(DictSize); 5442 REGISTER_API(DictSetC); 5443 REGISTER_API(DictReplaceC); 5444 REGISTER_API(DictSet); 5445 REGISTER_API(DictReplace); 5446 REGISTER_API(DictGetC); 5447 REGISTER_API(DictGet); 5448 REGISTER_API(DictDelC); 5449 REGISTER_API(DictDel); 5450 REGISTER_API(DictIteratorStartC); 5451 REGISTER_API(DictIteratorStart); 5452 REGISTER_API(DictIteratorStop); 5453 REGISTER_API(DictIteratorReseekC); 5454 REGISTER_API(DictIteratorReseek); 5455 REGISTER_API(DictNextC); 5456 REGISTER_API(DictPrevC); 5457 REGISTER_API(DictNext); 5458 REGISTER_API(DictPrev); 5459 REGISTER_API(DictCompareC); 5460 REGISTER_API(DictCompare); 5461 REGISTER_API(ExportSharedAPI); 5462 REGISTER_API(GetSharedAPI); 5463 REGISTER_API(RegisterCommandFilter); 5464 REGISTER_API(UnregisterCommandFilter); 5465 REGISTER_API(CommandFilterArgsCount); 5466 REGISTER_API(CommandFilterArgGet); 5467 REGISTER_API(CommandFilterArgInsert); 5468 REGISTER_API(CommandFilterArgReplace); 5469 REGISTER_API(CommandFilterArgDelete); 5470 } 5471