1 /* Helloblock module -- An example of blocking command implementation 2 * with threads. 3 * 4 * ----------------------------------------------------------------------------- 5 * 6 * Copyright (c) 2016, Salvatore Sanfilippo <antirez at gmail dot com> 7 * All rights reserved. 8 * 9 * Redistribution and use in source and binary forms, with or without 10 * modification, are permitted provided that the following conditions are met: 11 * 12 * * Redistributions of source code must retain the above copyright notice, 13 * this list of conditions and the following disclaimer. 14 * * Redistributions in binary form must reproduce the above copyright 15 * notice, this list of conditions and the following disclaimer in the 16 * documentation and/or other materials provided with the distribution. 17 * * Neither the name of Redis nor the names of its contributors may be used 18 * to endorse or promote products derived from this software without 19 * specific prior written permission. 20 * 21 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" 22 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE 23 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE 24 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE 25 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR 26 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF 27 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS 28 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN 29 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) 30 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE 31 * POSSIBILITY OF SUCH DAMAGE. 32 */ 33 34 #define REDISMODULE_EXPERIMENTAL_API 35 #include "../redismodule.h" 36 #include <stdio.h> 37 #include <stdlib.h> 38 #include <pthread.h> 39 #include <unistd.h> 40 41 /* Reply callback for blocking command HELLO.BLOCK */ 42 int HelloBlock_Reply(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) { 43 REDISMODULE_NOT_USED(argv); 44 REDISMODULE_NOT_USED(argc); 45 int *myint = RedisModule_GetBlockedClientPrivateData(ctx); 46 return RedisModule_ReplyWithLongLong(ctx,*myint); 47 } 48 49 /* Timeout callback for blocking command HELLO.BLOCK */ 50 int HelloBlock_Timeout(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) { 51 REDISMODULE_NOT_USED(argv); 52 REDISMODULE_NOT_USED(argc); 53 return RedisModule_ReplyWithSimpleString(ctx,"Request timedout"); 54 } 55 56 /* Private data freeing callback for HELLO.BLOCK command. */ 57 void HelloBlock_FreeData(RedisModuleCtx *ctx, void *privdata) { 58 REDISMODULE_NOT_USED(ctx); 59 RedisModule_Free(privdata); 60 } 61 62 /* The thread entry point that actually executes the blocking part 63 * of the command HELLO.BLOCK. */ 64 void *HelloBlock_ThreadMain(void *arg) { 65 void **targ = arg; 66 RedisModuleBlockedClient *bc = targ[0]; 67 long long delay = (unsigned long)targ[1]; 68 RedisModule_Free(targ); 69 70 sleep(delay); 71 int *r = RedisModule_Alloc(sizeof(int)); 72 *r = rand(); 73 RedisModule_UnblockClient(bc,r); 74 return NULL; 75 } 76 77 /* An example blocked client disconnection callback. 78 * 79 * Note that in the case of the HELLO.BLOCK command, the blocked client is now 80 * owned by the thread calling sleep(). In this specific case, there is not 81 * much we can do, however normally we could instead implement a way to 82 * signal the thread that the client disconnected, and sleep the specified 83 * amount of seconds with a while loop calling sleep(1), so that once we 84 * detect the client disconnection, we can terminate the thread ASAP. */ 85 void HelloBlock_Disconnected(RedisModuleCtx *ctx, RedisModuleBlockedClient *bc) { 86 RedisModule_Log(ctx,"warning","Blocked client %p disconnected!", 87 (void*)bc); 88 89 /* Here you should cleanup your state / threads, and if possible 90 * call RedisModule_UnblockClient(), or notify the thread that will 91 * call the function ASAP. */ 92 } 93 94 /* HELLO.BLOCK <delay> <timeout> -- Block for <count> seconds, then reply with 95 * a random number. Timeout is the command timeout, so that you can test 96 * what happens when the delay is greater than the timeout. */ 97 int HelloBlock_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) { 98 if (argc != 3) return RedisModule_WrongArity(ctx); 99 long long delay; 100 long long timeout; 101 102 if (RedisModule_StringToLongLong(argv[1],&delay) != REDISMODULE_OK) { 103 return RedisModule_ReplyWithError(ctx,"ERR invalid count"); 104 } 105 106 if (RedisModule_StringToLongLong(argv[2],&timeout) != REDISMODULE_OK) { 107 return RedisModule_ReplyWithError(ctx,"ERR invalid count"); 108 } 109 110 pthread_t tid; 111 RedisModuleBlockedClient *bc = RedisModule_BlockClient(ctx,HelloBlock_Reply,HelloBlock_Timeout,HelloBlock_FreeData,timeout); 112 113 /* Here we set a disconnection handler, however since this module will 114 * block in sleep() in a thread, there is not much we can do in the 115 * callback, so this is just to show you the API. */ 116 RedisModule_SetDisconnectCallback(bc,HelloBlock_Disconnected); 117 118 /* Now that we setup a blocking client, we need to pass the control 119 * to the thread. However we need to pass arguments to the thread: 120 * the delay and a reference to the blocked client handle. */ 121 void **targ = RedisModule_Alloc(sizeof(void*)*2); 122 targ[0] = bc; 123 targ[1] = (void*)(unsigned long) delay; 124 125 if (pthread_create(&tid,NULL,HelloBlock_ThreadMain,targ) != 0) { 126 RedisModule_AbortBlock(bc); 127 return RedisModule_ReplyWithError(ctx,"-ERR Can't start thread"); 128 } 129 return REDISMODULE_OK; 130 } 131 132 /* The thread entry point that actually executes the blocking part 133 * of the command HELLO.KEYS. 134 * 135 * Note: this implementation is very simple on purpose, so no duplicated 136 * keys (returned by SCAN) are filtered. However adding such a functionality 137 * would be trivial just using any data structure implementing a dictionary 138 * in order to filter the duplicated items. */ 139 void *HelloKeys_ThreadMain(void *arg) { 140 RedisModuleBlockedClient *bc = arg; 141 RedisModuleCtx *ctx = RedisModule_GetThreadSafeContext(bc); 142 long long cursor = 0; 143 size_t replylen = 0; 144 145 RedisModule_ReplyWithArray(ctx,REDISMODULE_POSTPONED_ARRAY_LEN); 146 do { 147 RedisModule_ThreadSafeContextLock(ctx); 148 RedisModuleCallReply *reply = RedisModule_Call(ctx, 149 "SCAN","l",(long long)cursor); 150 RedisModule_ThreadSafeContextUnlock(ctx); 151 152 RedisModuleCallReply *cr_cursor = 153 RedisModule_CallReplyArrayElement(reply,0); 154 RedisModuleCallReply *cr_keys = 155 RedisModule_CallReplyArrayElement(reply,1); 156 157 RedisModuleString *s = RedisModule_CreateStringFromCallReply(cr_cursor); 158 RedisModule_StringToLongLong(s,&cursor); 159 RedisModule_FreeString(ctx,s); 160 161 size_t items = RedisModule_CallReplyLength(cr_keys); 162 for (size_t j = 0; j < items; j++) { 163 RedisModuleCallReply *ele = 164 RedisModule_CallReplyArrayElement(cr_keys,j); 165 RedisModule_ReplyWithCallReply(ctx,ele); 166 replylen++; 167 } 168 RedisModule_FreeCallReply(reply); 169 } while (cursor != 0); 170 RedisModule_ReplySetArrayLength(ctx,replylen); 171 172 RedisModule_FreeThreadSafeContext(ctx); 173 RedisModule_UnblockClient(bc,NULL); 174 return NULL; 175 } 176 177 /* HELLO.KEYS -- Return all the keys in the current database without blocking 178 * the server. The keys do not represent a point-in-time state so only the keys 179 * that were in the database from the start to the end are guaranteed to be 180 * there. */ 181 int HelloKeys_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) { 182 REDISMODULE_NOT_USED(argv); 183 if (argc != 1) return RedisModule_WrongArity(ctx); 184 185 pthread_t tid; 186 187 /* Note that when blocking the client we do not set any callback: no 188 * timeout is possible since we passed '0', nor we need a reply callback 189 * because we'll use the thread safe context to accumulate a reply. */ 190 RedisModuleBlockedClient *bc = RedisModule_BlockClient(ctx,NULL,NULL,NULL,0); 191 192 /* Now that we setup a blocking client, we need to pass the control 193 * to the thread. However we need to pass arguments to the thread: 194 * the reference to the blocked client handle. */ 195 if (pthread_create(&tid,NULL,HelloKeys_ThreadMain,bc) != 0) { 196 RedisModule_AbortBlock(bc); 197 return RedisModule_ReplyWithError(ctx,"-ERR Can't start thread"); 198 } 199 return REDISMODULE_OK; 200 } 201 202 /* This function must be present on each Redis module. It is used in order to 203 * register the commands into the Redis server. */ 204 int RedisModule_OnLoad(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) { 205 REDISMODULE_NOT_USED(argv); 206 REDISMODULE_NOT_USED(argc); 207 208 if (RedisModule_Init(ctx,"helloblock",1,REDISMODULE_APIVER_1) 209 == REDISMODULE_ERR) return REDISMODULE_ERR; 210 211 if (RedisModule_CreateCommand(ctx,"hello.block", 212 HelloBlock_RedisCommand,"",0,0,0) == REDISMODULE_ERR) 213 return REDISMODULE_ERR; 214 if (RedisModule_CreateCommand(ctx,"hello.keys", 215 HelloKeys_RedisCommand,"",0,0,0) == REDISMODULE_ERR) 216 return REDISMODULE_ERR; 217 218 return REDISMODULE_OK; 219 } 220