/* Helloblock module -- An example of blocking command implementation
 * with threads.
 *
 * -----------------------------------------------------------------------------
 *
 * Copyright (c) 2016, Salvatore Sanfilippo <antirez at gmail dot com>
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions are met:
 *
 *   * Redistributions of source code must retain the above copyright notice,
 *     this list of conditions and the following disclaimer.
 *   * Redistributions in binary form must reproduce the above copyright
 *     notice, this list of conditions and the following disclaimer in the
 *     documentation and/or other materials provided with the distribution.
 *   * Neither the name of Redis nor the names of its contributors may be used
 *     to endorse or promote products derived from this software without
 *     specific prior written permission.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
 * POSSIBILITY OF SUCH DAMAGE.
 */
33*572c4311Sfengbojiang
34*572c4311Sfengbojiang #define REDISMODULE_EXPERIMENTAL_API
35*572c4311Sfengbojiang #include "../redismodule.h"
36*572c4311Sfengbojiang #include <stdio.h>
37*572c4311Sfengbojiang #include <stdlib.h>
38*572c4311Sfengbojiang #include <pthread.h>
39*572c4311Sfengbojiang #include <unistd.h>
40*572c4311Sfengbojiang
41*572c4311Sfengbojiang /* Reply callback for blocking command HELLO.BLOCK */
HelloBlock_Reply(RedisModuleCtx * ctx,RedisModuleString ** argv,int argc)42*572c4311Sfengbojiang int HelloBlock_Reply(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
43*572c4311Sfengbojiang REDISMODULE_NOT_USED(argv);
44*572c4311Sfengbojiang REDISMODULE_NOT_USED(argc);
45*572c4311Sfengbojiang int *myint = RedisModule_GetBlockedClientPrivateData(ctx);
46*572c4311Sfengbojiang return RedisModule_ReplyWithLongLong(ctx,*myint);
47*572c4311Sfengbojiang }
48*572c4311Sfengbojiang
49*572c4311Sfengbojiang /* Timeout callback for blocking command HELLO.BLOCK */
HelloBlock_Timeout(RedisModuleCtx * ctx,RedisModuleString ** argv,int argc)50*572c4311Sfengbojiang int HelloBlock_Timeout(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
51*572c4311Sfengbojiang REDISMODULE_NOT_USED(argv);
52*572c4311Sfengbojiang REDISMODULE_NOT_USED(argc);
53*572c4311Sfengbojiang return RedisModule_ReplyWithSimpleString(ctx,"Request timedout");
54*572c4311Sfengbojiang }
55*572c4311Sfengbojiang
56*572c4311Sfengbojiang /* Private data freeing callback for HELLO.BLOCK command. */
HelloBlock_FreeData(RedisModuleCtx * ctx,void * privdata)57*572c4311Sfengbojiang void HelloBlock_FreeData(RedisModuleCtx *ctx, void *privdata) {
58*572c4311Sfengbojiang REDISMODULE_NOT_USED(ctx);
59*572c4311Sfengbojiang RedisModule_Free(privdata);
60*572c4311Sfengbojiang }
61*572c4311Sfengbojiang
62*572c4311Sfengbojiang /* The thread entry point that actually executes the blocking part
63*572c4311Sfengbojiang * of the command HELLO.BLOCK. */
HelloBlock_ThreadMain(void * arg)64*572c4311Sfengbojiang void *HelloBlock_ThreadMain(void *arg) {
65*572c4311Sfengbojiang void **targ = arg;
66*572c4311Sfengbojiang RedisModuleBlockedClient *bc = targ[0];
67*572c4311Sfengbojiang long long delay = (unsigned long)targ[1];
68*572c4311Sfengbojiang RedisModule_Free(targ);
69*572c4311Sfengbojiang
70*572c4311Sfengbojiang sleep(delay);
71*572c4311Sfengbojiang int *r = RedisModule_Alloc(sizeof(int));
72*572c4311Sfengbojiang *r = rand();
73*572c4311Sfengbojiang RedisModule_UnblockClient(bc,r);
74*572c4311Sfengbojiang return NULL;
75*572c4311Sfengbojiang }
76*572c4311Sfengbojiang
77*572c4311Sfengbojiang /* An example blocked client disconnection callback.
78*572c4311Sfengbojiang *
79*572c4311Sfengbojiang * Note that in the case of the HELLO.BLOCK command, the blocked client is now
80*572c4311Sfengbojiang * owned by the thread calling sleep(). In this specific case, there is not
81*572c4311Sfengbojiang * much we can do, however normally we could instead implement a way to
82*572c4311Sfengbojiang * signal the thread that the client disconnected, and sleep the specified
83*572c4311Sfengbojiang * amount of seconds with a while loop calling sleep(1), so that once we
84*572c4311Sfengbojiang * detect the client disconnection, we can terminate the thread ASAP. */
HelloBlock_Disconnected(RedisModuleCtx * ctx,RedisModuleBlockedClient * bc)85*572c4311Sfengbojiang void HelloBlock_Disconnected(RedisModuleCtx *ctx, RedisModuleBlockedClient *bc) {
86*572c4311Sfengbojiang RedisModule_Log(ctx,"warning","Blocked client %p disconnected!",
87*572c4311Sfengbojiang (void*)bc);
88*572c4311Sfengbojiang
89*572c4311Sfengbojiang /* Here you should cleanup your state / threads, and if possible
90*572c4311Sfengbojiang * call RedisModule_UnblockClient(), or notify the thread that will
91*572c4311Sfengbojiang * call the function ASAP. */
92*572c4311Sfengbojiang }
93*572c4311Sfengbojiang
94*572c4311Sfengbojiang /* HELLO.BLOCK <delay> <timeout> -- Block for <count> seconds, then reply with
95*572c4311Sfengbojiang * a random number. Timeout is the command timeout, so that you can test
96*572c4311Sfengbojiang * what happens when the delay is greater than the timeout. */
HelloBlock_RedisCommand(RedisModuleCtx * ctx,RedisModuleString ** argv,int argc)97*572c4311Sfengbojiang int HelloBlock_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
98*572c4311Sfengbojiang if (argc != 3) return RedisModule_WrongArity(ctx);
99*572c4311Sfengbojiang long long delay;
100*572c4311Sfengbojiang long long timeout;
101*572c4311Sfengbojiang
102*572c4311Sfengbojiang if (RedisModule_StringToLongLong(argv[1],&delay) != REDISMODULE_OK) {
103*572c4311Sfengbojiang return RedisModule_ReplyWithError(ctx,"ERR invalid count");
104*572c4311Sfengbojiang }
105*572c4311Sfengbojiang
106*572c4311Sfengbojiang if (RedisModule_StringToLongLong(argv[2],&timeout) != REDISMODULE_OK) {
107*572c4311Sfengbojiang return RedisModule_ReplyWithError(ctx,"ERR invalid count");
108*572c4311Sfengbojiang }
109*572c4311Sfengbojiang
110*572c4311Sfengbojiang pthread_t tid;
111*572c4311Sfengbojiang RedisModuleBlockedClient *bc = RedisModule_BlockClient(ctx,HelloBlock_Reply,HelloBlock_Timeout,HelloBlock_FreeData,timeout);
112*572c4311Sfengbojiang
113*572c4311Sfengbojiang /* Here we set a disconnection handler, however since this module will
114*572c4311Sfengbojiang * block in sleep() in a thread, there is not much we can do in the
115*572c4311Sfengbojiang * callback, so this is just to show you the API. */
116*572c4311Sfengbojiang RedisModule_SetDisconnectCallback(bc,HelloBlock_Disconnected);
117*572c4311Sfengbojiang
118*572c4311Sfengbojiang /* Now that we setup a blocking client, we need to pass the control
119*572c4311Sfengbojiang * to the thread. However we need to pass arguments to the thread:
120*572c4311Sfengbojiang * the delay and a reference to the blocked client handle. */
121*572c4311Sfengbojiang void **targ = RedisModule_Alloc(sizeof(void*)*2);
122*572c4311Sfengbojiang targ[0] = bc;
123*572c4311Sfengbojiang targ[1] = (void*)(unsigned long) delay;
124*572c4311Sfengbojiang
125*572c4311Sfengbojiang if (pthread_create(&tid,NULL,HelloBlock_ThreadMain,targ) != 0) {
126*572c4311Sfengbojiang RedisModule_AbortBlock(bc);
127*572c4311Sfengbojiang return RedisModule_ReplyWithError(ctx,"-ERR Can't start thread");
128*572c4311Sfengbojiang }
129*572c4311Sfengbojiang return REDISMODULE_OK;
130*572c4311Sfengbojiang }
131*572c4311Sfengbojiang
132*572c4311Sfengbojiang /* The thread entry point that actually executes the blocking part
133*572c4311Sfengbojiang * of the command HELLO.KEYS.
134*572c4311Sfengbojiang *
135*572c4311Sfengbojiang * Note: this implementation is very simple on purpose, so no duplicated
136*572c4311Sfengbojiang * keys (returned by SCAN) are filtered. However adding such a functionality
137*572c4311Sfengbojiang * would be trivial just using any data structure implementing a dictionary
138*572c4311Sfengbojiang * in order to filter the duplicated items. */
HelloKeys_ThreadMain(void * arg)139*572c4311Sfengbojiang void *HelloKeys_ThreadMain(void *arg) {
140*572c4311Sfengbojiang RedisModuleBlockedClient *bc = arg;
141*572c4311Sfengbojiang RedisModuleCtx *ctx = RedisModule_GetThreadSafeContext(bc);
142*572c4311Sfengbojiang long long cursor = 0;
143*572c4311Sfengbojiang size_t replylen = 0;
144*572c4311Sfengbojiang
145*572c4311Sfengbojiang RedisModule_ReplyWithArray(ctx,REDISMODULE_POSTPONED_ARRAY_LEN);
146*572c4311Sfengbojiang do {
147*572c4311Sfengbojiang RedisModule_ThreadSafeContextLock(ctx);
148*572c4311Sfengbojiang RedisModuleCallReply *reply = RedisModule_Call(ctx,
149*572c4311Sfengbojiang "SCAN","l",(long long)cursor);
150*572c4311Sfengbojiang RedisModule_ThreadSafeContextUnlock(ctx);
151*572c4311Sfengbojiang
152*572c4311Sfengbojiang RedisModuleCallReply *cr_cursor =
153*572c4311Sfengbojiang RedisModule_CallReplyArrayElement(reply,0);
154*572c4311Sfengbojiang RedisModuleCallReply *cr_keys =
155*572c4311Sfengbojiang RedisModule_CallReplyArrayElement(reply,1);
156*572c4311Sfengbojiang
157*572c4311Sfengbojiang RedisModuleString *s = RedisModule_CreateStringFromCallReply(cr_cursor);
158*572c4311Sfengbojiang RedisModule_StringToLongLong(s,&cursor);
159*572c4311Sfengbojiang RedisModule_FreeString(ctx,s);
160*572c4311Sfengbojiang
161*572c4311Sfengbojiang size_t items = RedisModule_CallReplyLength(cr_keys);
162*572c4311Sfengbojiang for (size_t j = 0; j < items; j++) {
163*572c4311Sfengbojiang RedisModuleCallReply *ele =
164*572c4311Sfengbojiang RedisModule_CallReplyArrayElement(cr_keys,j);
165*572c4311Sfengbojiang RedisModule_ReplyWithCallReply(ctx,ele);
166*572c4311Sfengbojiang replylen++;
167*572c4311Sfengbojiang }
168*572c4311Sfengbojiang RedisModule_FreeCallReply(reply);
169*572c4311Sfengbojiang } while (cursor != 0);
170*572c4311Sfengbojiang RedisModule_ReplySetArrayLength(ctx,replylen);
171*572c4311Sfengbojiang
172*572c4311Sfengbojiang RedisModule_FreeThreadSafeContext(ctx);
173*572c4311Sfengbojiang RedisModule_UnblockClient(bc,NULL);
174*572c4311Sfengbojiang return NULL;
175*572c4311Sfengbojiang }
176*572c4311Sfengbojiang
177*572c4311Sfengbojiang /* HELLO.KEYS -- Return all the keys in the current database without blocking
178*572c4311Sfengbojiang * the server. The keys do not represent a point-in-time state so only the keys
179*572c4311Sfengbojiang * that were in the database from the start to the end are guaranteed to be
180*572c4311Sfengbojiang * there. */
HelloKeys_RedisCommand(RedisModuleCtx * ctx,RedisModuleString ** argv,int argc)181*572c4311Sfengbojiang int HelloKeys_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
182*572c4311Sfengbojiang REDISMODULE_NOT_USED(argv);
183*572c4311Sfengbojiang if (argc != 1) return RedisModule_WrongArity(ctx);
184*572c4311Sfengbojiang
185*572c4311Sfengbojiang pthread_t tid;
186*572c4311Sfengbojiang
187*572c4311Sfengbojiang /* Note that when blocking the client we do not set any callback: no
188*572c4311Sfengbojiang * timeout is possible since we passed '0', nor we need a reply callback
189*572c4311Sfengbojiang * because we'll use the thread safe context to accumulate a reply. */
190*572c4311Sfengbojiang RedisModuleBlockedClient *bc = RedisModule_BlockClient(ctx,NULL,NULL,NULL,0);
191*572c4311Sfengbojiang
192*572c4311Sfengbojiang /* Now that we setup a blocking client, we need to pass the control
193*572c4311Sfengbojiang * to the thread. However we need to pass arguments to the thread:
194*572c4311Sfengbojiang * the reference to the blocked client handle. */
195*572c4311Sfengbojiang if (pthread_create(&tid,NULL,HelloKeys_ThreadMain,bc) != 0) {
196*572c4311Sfengbojiang RedisModule_AbortBlock(bc);
197*572c4311Sfengbojiang return RedisModule_ReplyWithError(ctx,"-ERR Can't start thread");
198*572c4311Sfengbojiang }
199*572c4311Sfengbojiang return REDISMODULE_OK;
200*572c4311Sfengbojiang }
201*572c4311Sfengbojiang
202*572c4311Sfengbojiang /* This function must be present on each Redis module. It is used in order to
203*572c4311Sfengbojiang * register the commands into the Redis server. */
RedisModule_OnLoad(RedisModuleCtx * ctx,RedisModuleString ** argv,int argc)204*572c4311Sfengbojiang int RedisModule_OnLoad(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
205*572c4311Sfengbojiang REDISMODULE_NOT_USED(argv);
206*572c4311Sfengbojiang REDISMODULE_NOT_USED(argc);
207*572c4311Sfengbojiang
208*572c4311Sfengbojiang if (RedisModule_Init(ctx,"helloblock",1,REDISMODULE_APIVER_1)
209*572c4311Sfengbojiang == REDISMODULE_ERR) return REDISMODULE_ERR;
210*572c4311Sfengbojiang
211*572c4311Sfengbojiang if (RedisModule_CreateCommand(ctx,"hello.block",
212*572c4311Sfengbojiang HelloBlock_RedisCommand,"",0,0,0) == REDISMODULE_ERR)
213*572c4311Sfengbojiang return REDISMODULE_ERR;
214*572c4311Sfengbojiang if (RedisModule_CreateCommand(ctx,"hello.keys",
215*572c4311Sfengbojiang HelloKeys_RedisCommand,"",0,0,0) == REDISMODULE_ERR)
216*572c4311Sfengbojiang return REDISMODULE_ERR;
217*572c4311Sfengbojiang
218*572c4311Sfengbojiang return REDISMODULE_OK;
219*572c4311Sfengbojiang }
220