xref: /f-stack/app/redis-5.0.5/src/blocked.c (revision 572c4311)
1 /* blocked.c - generic support for blocking operations like BLPOP & WAIT.
2  *
3  * Copyright (c) 2009-2012, Salvatore Sanfilippo <antirez at gmail dot com>
4  * All rights reserved.
5  *
6  * Redistribution and use in source and binary forms, with or without
7  * modification, are permitted provided that the following conditions are met:
8  *
9  *   * Redistributions of source code must retain the above copyright notice,
10  *     this list of conditions and the following disclaimer.
11  *   * Redistributions in binary form must reproduce the above copyright
12  *     notice, this list of conditions and the following disclaimer in the
13  *     documentation and/or other materials provided with the distribution.
14  *   * Neither the name of Redis nor the names of its contributors may be used
15  *     to endorse or promote products derived from this software without
16  *     specific prior written permission.
17  *
18  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
19  * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
20  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
21  * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
22  * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
23  * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
24  * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
25  * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
26  * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
27  * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
28  * POSSIBILITY OF SUCH DAMAGE.
29  *
30  * ---------------------------------------------------------------------------
31  *
32  * API:
33  *
34  * getTimeoutFromObjectOrReply() is just an utility function to parse a
35  * timeout argument since blocking operations usually require a timeout.
36  *
 * blockClient() sets the CLIENT_BLOCKED flag in the client, and sets the
 * client's block type field 'btype' to one of the BLOCKED_* macros.
39  *
40  * unblockClient() unblocks the client doing the following:
41  * 1) It calls the btype-specific function to cleanup the state.
42  * 2) It unblocks the client by unsetting the CLIENT_BLOCKED flag.
43  * 3) It puts the client into a list of just unblocked clients that are
44  *    processed ASAP in the beforeSleep() event loop callback, so that
45  *    if there is some query buffer to process, we do it. This is also
46  *    required because otherwise there is no 'readable' event fired, we
47  *    already read the pending commands. We also set the CLIENT_UNBLOCKED
48  *    flag to remember the client is in the unblocked_clients list.
49  *
50  * processUnblockedClients() is called inside the beforeSleep() function
51  * to process the query buffer from unblocked clients and remove the clients
52  * from the blocked_clients queue.
53  *
54  * replyToBlockedClientTimedOut() is called by the cron function when
55  * a client blocked reaches the specified timeout (if the timeout is set
56  * to 0, no timeout is processed).
57  * It usually just needs to send a reply to the client.
58  *
 * When implementing a new type of blocking operation, the implementation
 * should modify unblockClient() and replyToBlockedClientTimedOut() in order
 * to handle the btype-specific behavior of these two functions.
 * If the blocking operation waits for certain keys to change state, the
 * clusterRedirectBlockedClientIfNeeded() function should also be updated.
64  */
65 
#include "server.h"

#include <limits.h> /* LLONG_MAX */
67 
68 int serveClientBlockedOnList(client *receiver, robj *key, robj *dstkey, redisDb *db, robj *value, int where);
69 
70 /* Get a timeout value from an object and store it into 'timeout'.
71  * The final timeout is always stored as milliseconds as a time where the
72  * timeout will expire, however the parsing is performed according to
73  * the 'unit' that can be seconds or milliseconds.
74  *
75  * Note that if the timeout is zero (usually from the point of view of
76  * commands API this means no timeout) the value stored into 'timeout'
77  * is zero. */
getTimeoutFromObjectOrReply(client * c,robj * object,mstime_t * timeout,int unit)78 int getTimeoutFromObjectOrReply(client *c, robj *object, mstime_t *timeout, int unit) {
79     long long tval;
80 
81     if (getLongLongFromObjectOrReply(c,object,&tval,
82         "timeout is not an integer or out of range") != C_OK)
83         return C_ERR;
84 
85     if (tval < 0) {
86         addReplyError(c,"timeout is negative");
87         return C_ERR;
88     }
89 
90     if (tval > 0) {
91         if (unit == UNIT_SECONDS) tval *= 1000;
92         tval += mstime();
93     }
94     *timeout = tval;
95 
96     return C_OK;
97 }
98 
99 /* Block a client for the specific operation type. Once the CLIENT_BLOCKED
100  * flag is set client query buffer is not longer processed, but accumulated,
101  * and will be processed when the client is unblocked. */
blockClient(client * c,int btype)102 void blockClient(client *c, int btype) {
103     c->flags |= CLIENT_BLOCKED;
104     c->btype = btype;
105     server.blocked_clients++;
106     server.blocked_clients_by_type[btype]++;
107 }
108 
109 /* This function is called in the beforeSleep() function of the event loop
110  * in order to process the pending input buffer of clients that were
111  * unblocked after a blocking operation. */
processUnblockedClients(void)112 void processUnblockedClients(void) {
113     listNode *ln;
114     client *c;
115 
116     while (listLength(server.unblocked_clients)) {
117         ln = listFirst(server.unblocked_clients);
118         serverAssert(ln != NULL);
119         c = ln->value;
120         listDelNode(server.unblocked_clients,ln);
121         c->flags &= ~CLIENT_UNBLOCKED;
122 
123         /* Process remaining data in the input buffer, unless the client
124          * is blocked again. Actually processInputBuffer() checks that the
125          * client is not blocked before to proceed, but things may change and
126          * the code is conceptually more correct this way. */
127         if (!(c->flags & CLIENT_BLOCKED)) {
128             if (c->querybuf && sdslen(c->querybuf) > 0) {
129                 processInputBufferAndReplicate(c);
130             }
131         }
132     }
133 }
134 
135 /* This function will schedule the client for reprocessing at a safe time.
136  *
137  * This is useful when a client was blocked for some reason (blocking opeation,
138  * CLIENT PAUSE, or whatever), because it may end with some accumulated query
139  * buffer that needs to be processed ASAP:
140  *
141  * 1. When a client is blocked, its readable handler is still active.
142  * 2. However in this case it only gets data into the query buffer, but the
143  *    query is not parsed or executed once there is enough to proceed as
144  *    usually (because the client is blocked... so we can't execute commands).
145  * 3. When the client is unblocked, without this function, the client would
146  *    have to write some query in order for the readable handler to finally
147  *    call processQueryBuffer*() on it.
148  * 4. With this function instead we can put the client in a queue that will
149  *    process it for queries ready to be executed at a safe time.
150  */
queueClientForReprocessing(client * c)151 void queueClientForReprocessing(client *c) {
152     /* The client may already be into the unblocked list because of a previous
153      * blocking operation, don't add back it into the list multiple times. */
154     if (!(c->flags & CLIENT_UNBLOCKED)) {
155         c->flags |= CLIENT_UNBLOCKED;
156         listAddNodeTail(server.unblocked_clients,c);
157     }
158 }
159 
160 /* Unblock a client calling the right function depending on the kind
161  * of operation the client is blocking for. */
unblockClient(client * c)162 void unblockClient(client *c) {
163     if (c->btype == BLOCKED_LIST ||
164         c->btype == BLOCKED_ZSET ||
165         c->btype == BLOCKED_STREAM) {
166         unblockClientWaitingData(c);
167     } else if (c->btype == BLOCKED_WAIT) {
168         unblockClientWaitingReplicas(c);
169     } else if (c->btype == BLOCKED_MODULE) {
170         unblockClientFromModule(c);
171     } else {
172         serverPanic("Unknown btype in unblockClient().");
173     }
174     /* Clear the flags, and put the client in the unblocked list so that
175      * we'll process new commands in its query buffer ASAP. */
176     server.blocked_clients--;
177     server.blocked_clients_by_type[c->btype]--;
178     c->flags &= ~CLIENT_BLOCKED;
179     c->btype = BLOCKED_NONE;
180     queueClientForReprocessing(c);
181 }
182 
183 /* This function gets called when a blocked client timed out in order to
184  * send it a reply of some kind. After this function is called,
185  * unblockClient() will be called with the same client as argument. */
replyToBlockedClientTimedOut(client * c)186 void replyToBlockedClientTimedOut(client *c) {
187     if (c->btype == BLOCKED_LIST ||
188         c->btype == BLOCKED_ZSET ||
189         c->btype == BLOCKED_STREAM) {
190         addReply(c,shared.nullmultibulk);
191     } else if (c->btype == BLOCKED_WAIT) {
192         addReplyLongLong(c,replicationCountAcksByOffset(c->bpop.reploffset));
193     } else if (c->btype == BLOCKED_MODULE) {
194         moduleBlockedClientTimedOut(c);
195     } else {
196         serverPanic("Unknown btype in replyToBlockedClientTimedOut().");
197     }
198 }
199 
200 /* Mass-unblock clients because something changed in the instance that makes
201  * blocking no longer safe. For example clients blocked in list operations
202  * in an instance which turns from master to slave is unsafe, so this function
203  * is called when a master turns into a slave.
204  *
205  * The semantics is to send an -UNBLOCKED error to the client, disconnecting
206  * it at the same time. */
disconnectAllBlockedClients(void)207 void disconnectAllBlockedClients(void) {
208     listNode *ln;
209     listIter li;
210 
211     listRewind(server.clients,&li);
212     while((ln = listNext(&li))) {
213         client *c = listNodeValue(ln);
214 
215         if (c->flags & CLIENT_BLOCKED) {
216             addReplySds(c,sdsnew(
217                 "-UNBLOCKED force unblock from blocking operation, "
218                 "instance state changed (master -> replica?)\r\n"));
219             unblockClient(c);
220             c->flags |= CLIENT_CLOSE_AFTER_REPLY;
221         }
222     }
223 }
224 
225 /* This function should be called by Redis every time a single command,
226  * a MULTI/EXEC block, or a Lua script, terminated its execution after
227  * being called by a client. It handles serving clients blocked in
228  * lists, streams, and sorted sets, via a blocking commands.
229  *
230  * All the keys with at least one client blocked that received at least
231  * one new element via some write operation are accumulated into
232  * the server.ready_keys list. This function will run the list and will
233  * serve clients accordingly. Note that the function will iterate again and
234  * again as a result of serving BRPOPLPUSH we can have new blocking clients
235  * to serve because of the PUSH side of BRPOPLPUSH.
236  *
237  * This function is normally "fair", that is, it will server clients
238  * using a FIFO behavior. However this fairness is violated in certain
239  * edge cases, that is, when we have clients blocked at the same time
240  * in a sorted set and in a list, for the same key (a very odd thing to
241  * do client side, indeed!). Because mismatching clients (blocking for
242  * a different type compared to the current key type) are moved in the
243  * other side of the linked list. However as long as the key starts to
244  * be used only for a single type, like virtually any Redis application will
245  * do, the function is already fair. */
handleClientsBlockedOnKeys(void)246 void handleClientsBlockedOnKeys(void) {
247     while(listLength(server.ready_keys) != 0) {
248         list *l;
249 
250         /* Point server.ready_keys to a fresh list and save the current one
251          * locally. This way as we run the old list we are free to call
252          * signalKeyAsReady() that may push new elements in server.ready_keys
253          * when handling clients blocked into BRPOPLPUSH. */
254         l = server.ready_keys;
255         server.ready_keys = listCreate();
256 
257         while(listLength(l) != 0) {
258             listNode *ln = listFirst(l);
259             readyList *rl = ln->value;
260 
261             /* First of all remove this key from db->ready_keys so that
262              * we can safely call signalKeyAsReady() against this key. */
263             dictDelete(rl->db->ready_keys,rl->key);
264 
265             /* Serve clients blocked on list key. */
266             robj *o = lookupKeyWrite(rl->db,rl->key);
267             if (o != NULL && o->type == OBJ_LIST) {
268                 dictEntry *de;
269 
270                 /* We serve clients in the same order they blocked for
271                  * this key, from the first blocked to the last. */
272                 de = dictFind(rl->db->blocking_keys,rl->key);
273                 if (de) {
274                     list *clients = dictGetVal(de);
275                     int numclients = listLength(clients);
276 
277                     while(numclients--) {
278                         listNode *clientnode = listFirst(clients);
279                         client *receiver = clientnode->value;
280 
281                         if (receiver->btype != BLOCKED_LIST) {
282                             /* Put at the tail, so that at the next call
283                              * we'll not run into it again. */
284                             listDelNode(clients,clientnode);
285                             listAddNodeTail(clients,receiver);
286                             continue;
287                         }
288 
289                         robj *dstkey = receiver->bpop.target;
290                         int where = (receiver->lastcmd &&
291                                      receiver->lastcmd->proc == blpopCommand) ?
292                                      LIST_HEAD : LIST_TAIL;
293                         robj *value = listTypePop(o,where);
294 
295                         if (value) {
296                             /* Protect receiver->bpop.target, that will be
297                              * freed by the next unblockClient()
298                              * call. */
299                             if (dstkey) incrRefCount(dstkey);
300                             unblockClient(receiver);
301 
302                             if (serveClientBlockedOnList(receiver,
303                                 rl->key,dstkey,rl->db,value,
304                                 where) == C_ERR)
305                             {
306                                 /* If we failed serving the client we need
307                                  * to also undo the POP operation. */
308                                 listTypePush(o,value,where);
309                             }
310 
311                             if (dstkey) decrRefCount(dstkey);
312                             decrRefCount(value);
313                         } else {
314                             break;
315                         }
316                     }
317                 }
318 
319                 if (listTypeLength(o) == 0) {
320                     dbDelete(rl->db,rl->key);
321                     notifyKeyspaceEvent(NOTIFY_GENERIC,"del",rl->key,rl->db->id);
322                 }
323                 /* We don't call signalModifiedKey() as it was already called
324                  * when an element was pushed on the list. */
325             }
326 
327             /* Serve clients blocked on sorted set key. */
328             else if (o != NULL && o->type == OBJ_ZSET) {
329                 dictEntry *de;
330 
331                 /* We serve clients in the same order they blocked for
332                  * this key, from the first blocked to the last. */
333                 de = dictFind(rl->db->blocking_keys,rl->key);
334                 if (de) {
335                     list *clients = dictGetVal(de);
336                     int numclients = listLength(clients);
337                     unsigned long zcard = zsetLength(o);
338 
339                     while(numclients-- && zcard) {
340                         listNode *clientnode = listFirst(clients);
341                         client *receiver = clientnode->value;
342 
343                         if (receiver->btype != BLOCKED_ZSET) {
344                             /* Put at the tail, so that at the next call
345                              * we'll not run into it again. */
346                             listDelNode(clients,clientnode);
347                             listAddNodeTail(clients,receiver);
348                             continue;
349                         }
350 
351                         int where = (receiver->lastcmd &&
352                                      receiver->lastcmd->proc == bzpopminCommand)
353                                      ? ZSET_MIN : ZSET_MAX;
354                         unblockClient(receiver);
355                         genericZpopCommand(receiver,&rl->key,1,where,1,NULL);
356                         zcard--;
357 
358                         /* Replicate the command. */
359                         robj *argv[2];
360                         struct redisCommand *cmd = where == ZSET_MIN ?
361                                                    server.zpopminCommand :
362                                                    server.zpopmaxCommand;
363                         argv[0] = createStringObject(cmd->name,strlen(cmd->name));
364                         argv[1] = rl->key;
365                         incrRefCount(rl->key);
366                         propagate(cmd,receiver->db->id,
367                                   argv,2,PROPAGATE_AOF|PROPAGATE_REPL);
368                         decrRefCount(argv[0]);
369                         decrRefCount(argv[1]);
370                     }
371                 }
372             }
373 
374             /* Serve clients blocked on stream key. */
375             else if (o != NULL && o->type == OBJ_STREAM) {
376                 dictEntry *de = dictFind(rl->db->blocking_keys,rl->key);
377                 stream *s = o->ptr;
378 
379                 /* We need to provide the new data arrived on the stream
380                  * to all the clients that are waiting for an offset smaller
381                  * than the current top item. */
382                 if (de) {
383                     list *clients = dictGetVal(de);
384                     listNode *ln;
385                     listIter li;
386                     listRewind(clients,&li);
387 
388                     while((ln = listNext(&li))) {
389                         client *receiver = listNodeValue(ln);
390                         if (receiver->btype != BLOCKED_STREAM) continue;
391                         streamID *gt = dictFetchValue(receiver->bpop.keys,
392                                                       rl->key);
393 
394                         /* If we blocked in the context of a consumer
395                          * group, we need to resolve the group and update the
396                          * last ID the client is blocked for: this is needed
397                          * because serving other clients in the same consumer
398                          * group will alter the "last ID" of the consumer
399                          * group, and clients blocked in a consumer group are
400                          * always blocked for the ">" ID: we need to deliver
401                          * only new messages and avoid unblocking the client
402                          * otherwise. */
403                         streamCG *group = NULL;
404                         if (receiver->bpop.xread_group) {
405                             group = streamLookupCG(s,
406                                     receiver->bpop.xread_group->ptr);
407                             /* If the group was not found, send an error
408                              * to the consumer. */
409                             if (!group) {
410                                 addReplyError(receiver,
411                                     "-NOGROUP the consumer group this client "
412                                     "was blocked on no longer exists");
413                                 unblockClient(receiver);
414                                 continue;
415                             } else {
416                                 *gt = group->last_id;
417                             }
418                         }
419 
420                         if (streamCompareID(&s->last_id, gt) > 0) {
421                             streamID start = *gt;
422                             start.seq++; /* Can't overflow, it's an uint64_t */
423 
424                             /* Lookup the consumer for the group, if any. */
425                             streamConsumer *consumer = NULL;
426                             int noack = 0;
427 
428                             if (group) {
429                                 consumer = streamLookupConsumer(group,
430                                            receiver->bpop.xread_consumer->ptr,
431                                            1);
432                                 noack = receiver->bpop.xread_group_noack;
433                             }
434 
435                             /* Emit the two elements sub-array consisting of
436                              * the name of the stream and the data we
437                              * extracted from it. Wrapped in a single-item
438                              * array, since we have just one key. */
439                             addReplyMultiBulkLen(receiver,1);
440                             addReplyMultiBulkLen(receiver,2);
441                             addReplyBulk(receiver,rl->key);
442 
443                             streamPropInfo pi = {
444                                 rl->key,
445                                 receiver->bpop.xread_group
446                             };
447                             streamReplyWithRange(receiver,s,&start,NULL,
448                                                  receiver->bpop.xread_count,
449                                                  0, group, consumer, noack, &pi);
450 
451                             /* Note that after we unblock the client, 'gt'
452                              * and other receiver->bpop stuff are no longer
453                              * valid, so we must do the setup above before
454                              * this call. */
455                             unblockClient(receiver);
456                         }
457                     }
458                 }
459             }
460 
461             /* Free this item. */
462             decrRefCount(rl->key);
463             zfree(rl);
464             listDelNode(l,ln);
465         }
466         listRelease(l); /* We have the new list on place at this point. */
467     }
468 }
469 
470 /* This is how the current blocking lists/sorted sets/streams work, we use
471  * BLPOP as example, but the concept is the same for other list ops, sorted
472  * sets and XREAD.
473  * - If the user calls BLPOP and the key exists and contains a non empty list
474  *   then LPOP is called instead. So BLPOP is semantically the same as LPOP
475  *   if blocking is not required.
 * - If instead BLPOP is called and the key does not exist or the list is
 *   empty we need to block. In order to do so we remove the notification for
 *   new data to read in the client socket (so that we'll not serve new
 *   requests if the blocking request is not served). Also we put the client
 *   in a dictionary (db->blocking_keys) mapping keys to a list of clients
 *   blocking for these keys.
482  * - If a PUSH operation against a key with blocked clients waiting is
483  *   performed, we mark this key as "ready", and after the current command,
484  *   MULTI/EXEC block, or script, is executed, we serve all the clients waiting
485  *   for this list, from the one that blocked first, to the last, accordingly
486  *   to the number of elements we have in the ready list.
487  */
488 
489 /* Set a client in blocking mode for the specified key (list, zset or stream),
490  * with the specified timeout. The 'type' argument is BLOCKED_LIST,
491  * BLOCKED_ZSET or BLOCKED_STREAM depending on the kind of operation we are
492  * waiting for an empty key in order to awake the client. The client is blocked
493  * for all the 'numkeys' keys as in the 'keys' argument. When we block for
494  * stream keys, we also provide an array of streamID structures: clients will
495  * be unblocked only when items with an ID greater or equal to the specified
496  * one is appended to the stream. */
blockForKeys(client * c,int btype,robj ** keys,int numkeys,mstime_t timeout,robj * target,streamID * ids)497 void blockForKeys(client *c, int btype, robj **keys, int numkeys, mstime_t timeout, robj *target, streamID *ids) {
498     dictEntry *de;
499     list *l;
500     int j;
501 
502     c->bpop.timeout = timeout;
503     c->bpop.target = target;
504 
505     if (target != NULL) incrRefCount(target);
506 
507     for (j = 0; j < numkeys; j++) {
508         /* The value associated with the key name in the bpop.keys dictionary
509          * is NULL for lists and sorted sets, or the stream ID for streams. */
510         void *key_data = NULL;
511         if (btype == BLOCKED_STREAM) {
512             key_data = zmalloc(sizeof(streamID));
513             memcpy(key_data,ids+j,sizeof(streamID));
514         }
515 
516         /* If the key already exists in the dictionary ignore it. */
517         if (dictAdd(c->bpop.keys,keys[j],key_data) != DICT_OK) {
518             zfree(key_data);
519             continue;
520         }
521         incrRefCount(keys[j]);
522 
523         /* And in the other "side", to map keys -> clients */
524         de = dictFind(c->db->blocking_keys,keys[j]);
525         if (de == NULL) {
526             int retval;
527 
528             /* For every key we take a list of clients blocked for it */
529             l = listCreate();
530             retval = dictAdd(c->db->blocking_keys,keys[j],l);
531             incrRefCount(keys[j]);
532             serverAssertWithInfo(c,keys[j],retval == DICT_OK);
533         } else {
534             l = dictGetVal(de);
535         }
536         listAddNodeTail(l,c);
537     }
538     blockClient(c,btype);
539 }
540 
541 /* Unblock a client that's waiting in a blocking operation such as BLPOP.
542  * You should never call this function directly, but unblockClient() instead. */
unblockClientWaitingData(client * c)543 void unblockClientWaitingData(client *c) {
544     dictEntry *de;
545     dictIterator *di;
546     list *l;
547 
548     serverAssertWithInfo(c,NULL,dictSize(c->bpop.keys) != 0);
549     di = dictGetIterator(c->bpop.keys);
550     /* The client may wait for multiple keys, so unblock it for every key. */
551     while((de = dictNext(di)) != NULL) {
552         robj *key = dictGetKey(de);
553 
554         /* Remove this client from the list of clients waiting for this key. */
555         l = dictFetchValue(c->db->blocking_keys,key);
556         serverAssertWithInfo(c,key,l != NULL);
557         listDelNode(l,listSearchKey(l,c));
558         /* If the list is empty we need to remove it to avoid wasting memory */
559         if (listLength(l) == 0)
560             dictDelete(c->db->blocking_keys,key);
561     }
562     dictReleaseIterator(di);
563 
564     /* Cleanup the client structure */
565     dictEmpty(c->bpop.keys,NULL);
566     if (c->bpop.target) {
567         decrRefCount(c->bpop.target);
568         c->bpop.target = NULL;
569     }
570     if (c->bpop.xread_group) {
571         decrRefCount(c->bpop.xread_group);
572         decrRefCount(c->bpop.xread_consumer);
573         c->bpop.xread_group = NULL;
574         c->bpop.xread_consumer = NULL;
575     }
576 }
577 
578 /* If the specified key has clients blocked waiting for list pushes, this
579  * function will put the key reference into the server.ready_keys list.
580  * Note that db->ready_keys is a hash table that allows us to avoid putting
581  * the same key again and again in the list in case of multiple pushes
582  * made by a script or in the context of MULTI/EXEC.
583  *
584  * The list will be finally processed by handleClientsBlockedOnLists() */
signalKeyAsReady(redisDb * db,robj * key)585 void signalKeyAsReady(redisDb *db, robj *key) {
586     readyList *rl;
587 
588     /* No clients blocking for this key? No need to queue it. */
589     if (dictFind(db->blocking_keys,key) == NULL) return;
590 
591     /* Key was already signaled? No need to queue it again. */
592     if (dictFind(db->ready_keys,key) != NULL) return;
593 
594     /* Ok, we need to queue this key into server.ready_keys. */
595     rl = zmalloc(sizeof(*rl));
596     rl->key = key;
597     rl->db = db;
598     incrRefCount(key);
599     listAddNodeTail(server.ready_keys,rl);
600 
601     /* We also add the key in the db->ready_keys dictionary in order
602      * to avoid adding it multiple times into a list with a simple O(1)
603      * check. */
604     incrRefCount(key);
605     serverAssert(dictAdd(db->ready_keys,key,NULL) == DICT_OK);
606 }
607 
608 
609