xref: /redis-3.2.3/src/t_list.c (revision bf30f5a7)
1 /*
2  * Copyright (c) 2009-2012, Salvatore Sanfilippo <antirez at gmail dot com>
3  * All rights reserved.
4  *
5  * Redistribution and use in source and binary forms, with or without
6  * modification, are permitted provided that the following conditions are met:
7  *
8  *   * Redistributions of source code must retain the above copyright notice,
9  *     this list of conditions and the following disclaimer.
10  *   * Redistributions in binary form must reproduce the above copyright
11  *     notice, this list of conditions and the following disclaimer in the
12  *     documentation and/or other materials provided with the distribution.
13  *   * Neither the name of Redis nor the names of its contributors may be used
14  *     to endorse or promote products derived from this software without
15  *     specific prior written permission.
16  *
17  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
18  * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
19  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
20  * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
21  * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
22  * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
23  * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
24  * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
25  * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
26  * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27  * POSSIBILITY OF SUCH DAMAGE.
28  */
29 
30 #include "server.h"
31 
32 /*-----------------------------------------------------------------------------
33  * List API
34  *----------------------------------------------------------------------------*/
35 
36 /* The function pushes an element to the specified list object 'subject',
37  * at head or tail position as specified by 'where'.
38  *
39  * There is no need for the caller to increment the refcount of 'value' as
40  * the function takes care of it if needed. */
listTypePush(robj * subject,robj * value,int where)41 void listTypePush(robj *subject, robj *value, int where) {
42     if (subject->encoding == OBJ_ENCODING_QUICKLIST) {
43         int pos = (where == LIST_HEAD) ? QUICKLIST_HEAD : QUICKLIST_TAIL;
44         value = getDecodedObject(value);
45         size_t len = sdslen(value->ptr);
46         quicklistPush(subject->ptr, value->ptr, len, pos);
47         decrRefCount(value);
48     } else {
49         serverPanic("Unknown list encoding");
50     }
51 }
52 
listPopSaver(unsigned char * data,unsigned int sz)53 void *listPopSaver(unsigned char *data, unsigned int sz) {
54     return createStringObject((char*)data,sz);
55 }
56 
/* Pop one element from the head (where == LIST_HEAD) or from the tail (any
 * other value) of the list object 'subject'.
 *
 * Returns the popped element as a string object, or NULL when
 * quicklistPopCustom() reports that nothing was popped. */
robj *listTypePop(robj *subject, int where) {
    robj *popped = NULL;
    long long num;
    int ql_where = QUICKLIST_TAIL;

    if (where == LIST_HEAD) ql_where = QUICKLIST_HEAD;

    if (subject->encoding != OBJ_ENCODING_QUICKLIST) {
        serverPanic("Unknown list encoding");
    } else if (quicklistPopCustom(subject->ptr, ql_where,
                                  (unsigned char **)&popped, NULL, &num,
                                  listPopSaver) &&
               popped == NULL) {
        /* The pop succeeded but the saver was not used to build an object:
         * the element is available as an integer in 'num'. */
        popped = createStringObjectFromLongLong(num);
    }
    return popped;
}
73 
/* Return the number of elements stored in the list object 'subject', as
 * reported by quicklistCount(). Panics on any encoding other than
 * OBJ_ENCODING_QUICKLIST. */
unsigned long listTypeLength(robj *subject) {
    if (subject->encoding != OBJ_ENCODING_QUICKLIST) {
        serverPanic("Unknown list encoding");
    } else {
        return quicklistCount(subject->ptr);
    }
}
81 
82 /* Initialize an iterator at the specified index. */
listTypeInitIterator(robj * subject,long index,unsigned char direction)83 listTypeIterator *listTypeInitIterator(robj *subject, long index,
84                                        unsigned char direction) {
85     listTypeIterator *li = zmalloc(sizeof(listTypeIterator));
86     li->subject = subject;
87     li->encoding = subject->encoding;
88     li->direction = direction;
89     li->iter = NULL;
90     /* LIST_HEAD means start at TAIL and move *towards* head.
91      * LIST_TAIL means start at HEAD and move *towards tail. */
92     int iter_direction =
93         direction == LIST_HEAD ? AL_START_TAIL : AL_START_HEAD;
94     if (li->encoding == OBJ_ENCODING_QUICKLIST) {
95         li->iter = quicklistGetIteratorAtIdx(li->subject->ptr,
96                                              iter_direction, index);
97     } else {
98         serverPanic("Unknown list encoding");
99     }
100     return li;
101 }
102 
103 /* Clean up the iterator. */
listTypeReleaseIterator(listTypeIterator * li)104 void listTypeReleaseIterator(listTypeIterator *li) {
105     zfree(li->iter);
106     zfree(li);
107 }
108 
109 /* Stores pointer to current the entry in the provided entry structure
110  * and advances the position of the iterator. Returns 1 when the current
111  * entry is in fact an entry, 0 otherwise. */
listTypeNext(listTypeIterator * li,listTypeEntry * entry)112 int listTypeNext(listTypeIterator *li, listTypeEntry *entry) {
113     /* Protect from converting when iterating */
114     serverAssert(li->subject->encoding == li->encoding);
115 
116     entry->li = li;
117     if (li->encoding == OBJ_ENCODING_QUICKLIST) {
118         return quicklistNext(li->iter, &entry->entry);
119     } else {
120         serverPanic("Unknown list encoding");
121     }
122     return 0;
123 }
124 
125 /* Return entry or NULL at the current position of the iterator. */
listTypeGet(listTypeEntry * entry)126 robj *listTypeGet(listTypeEntry *entry) {
127     robj *value = NULL;
128     if (entry->li->encoding == OBJ_ENCODING_QUICKLIST) {
129         if (entry->entry.value) {
130             value = createStringObject((char *)entry->entry.value,
131                                        entry->entry.sz);
132         } else {
133             value = createStringObjectFromLongLong(entry->entry.longval);
134         }
135     } else {
136         serverPanic("Unknown list encoding");
137     }
138     return value;
139 }
140 
/* Insert 'value' next to the element referenced by 'entry': after it when
 * 'where' is LIST_TAIL, before it when 'where' is LIST_HEAD. Any other
 * 'where' inserts nothing. The reference obtained via getDecodedObject() is
 * released before returning. */
void listTypeInsert(listTypeEntry *entry, robj *value, int where) {
    if (entry->li->encoding != OBJ_ENCODING_QUICKLIST) {
        serverPanic("Unknown list encoding");
        return;
    }

    robj *decoded = getDecodedObject(value);
    sds payload = decoded->ptr;
    size_t nbytes = sdslen(payload);
    quicklist *ql = (quicklist *)entry->entry.quicklist;

    if (where == LIST_TAIL) {
        quicklistInsertAfter(ql, &entry->entry, payload, nbytes);
    } else if (where == LIST_HEAD) {
        quicklistInsertBefore(ql, &entry->entry, payload, nbytes);
    }
    decrRefCount(decoded);
}
158 
159 /* Compare the given object with the entry at the current position. */
listTypeEqual(listTypeEntry * entry,robj * o)160 int listTypeEqual(listTypeEntry *entry, robj *o) {
161     if (entry->li->encoding == OBJ_ENCODING_QUICKLIST) {
162         serverAssertWithInfo(NULL,o,sdsEncodedObject(o));
163         return quicklistCompare(entry->entry.zi,o->ptr,sdslen(o->ptr));
164     } else {
165         serverPanic("Unknown list encoding");
166     }
167 }
168 
169 /* Delete the element pointed to. */
listTypeDelete(listTypeIterator * iter,listTypeEntry * entry)170 void listTypeDelete(listTypeIterator *iter, listTypeEntry *entry) {
171     if (entry->li->encoding == OBJ_ENCODING_QUICKLIST) {
172         quicklistDelEntry(iter->iter, &entry->entry);
173     } else {
174         serverPanic("Unknown list encoding");
175     }
176 }
177 
178 /* Create a quicklist from a single ziplist */
listTypeConvert(robj * subject,int enc)179 void listTypeConvert(robj *subject, int enc) {
180     serverAssertWithInfo(NULL,subject,subject->type==OBJ_LIST);
181     serverAssertWithInfo(NULL,subject,subject->encoding==OBJ_ENCODING_ZIPLIST);
182 
183     if (enc == OBJ_ENCODING_QUICKLIST) {
184         size_t zlen = server.list_max_ziplist_size;
185         int depth = server.list_compress_depth;
186         subject->ptr = quicklistCreateFromZiplist(zlen, depth, subject->ptr);
187         subject->encoding = OBJ_ENCODING_QUICKLIST;
188     } else {
189         serverPanic("Unsupported list conversion");
190     }
191 }
192 
193 /*-----------------------------------------------------------------------------
194  * List Commands
195  *----------------------------------------------------------------------------*/
196 
/* Shared implementation of LPUSH and RPUSH.
 *
 * Pushes every argument from argv[2] onward to the list stored at argv[1],
 * at the head when 'where' is LIST_HEAD and at the tail otherwise. The list
 * is created (with the configured quicklist options) when the key does not
 * exist and at least one value is supplied. A key holding a non-list value
 * produces a wrong type error and nothing is pushed.
 *
 * The reply is the length of the list after the push. When something was
 * pushed the key is signaled as modified, an "lpush"/"rpush" keyspace event
 * is emitted and server.dirty is incremented by the number of elements. */
void pushGenericCommand(client *c, int where) {
    int j, pushed = 0;
    robj *lobj = lookupKeyWrite(c->db,c->argv[1]);

    if (lobj && lobj->type != OBJ_LIST) {
        addReply(c,shared.wrongtypeerr);
        return;
    }

    for (j = 2; j < c->argc; j++) {
        c->argv[j] = tryObjectEncoding(c->argv[j]);
        if (!lobj) {
            /* Create the list lazily, on the first value to push. */
            lobj = createQuicklistObject();
            quicklistSetOptions(lobj->ptr, server.list_max_ziplist_size,
                                server.list_compress_depth);
            dbAdd(c->db,c->argv[1],lobj);
        }
        listTypePush(lobj,c->argv[j],where);
        pushed++;
    }
    addReplyLongLong(c, lobj ? listTypeLength(lobj) : 0);
    if (pushed) {
        char *event = (where == LIST_HEAD) ? "lpush" : "rpush";

        signalModifiedKey(c->db,c->argv[1]);
        notifyKeyspaceEvent(NOTIFY_LIST,event,c->argv[1],c->db->id);
    }
    server.dirty += pushed;
}
226 
/* LPUSH: push the given values at the head of the list. */
void lpushCommand(client *c) {
    const int where = LIST_HEAD;

    pushGenericCommand(c,where);
}
230 
/* RPUSH: push the given values at the tail of the list. */
void rpushCommand(client *c) {
    const int where = LIST_TAIL;

    pushGenericCommand(c,where);
}
234 
/* Shared implementation of LPUSHX, RPUSHX and LINSERT. All of them operate
 * only on an existing list: a missing key replies with shared.czero.
 *
 * - refval == NULL: 'val' is pushed at the end selected by 'where' and an
 *   "lpush"/"rpush" event is emitted.
 * - refval != NULL: the list is scanned from head to tail for the first
 *   element equal to 'refval'; 'val' is inserted after it (LIST_TAIL) or
 *   before it (LIST_HEAD) and a "linsert" event is emitted. If no element
 *   matches, the reply is shared.cnegone and nothing changes.
 *
 * On success the reply is the new length of the list. */
void pushxGenericCommand(client *c, robj *refval, robj *val, int where) {
    robj *subject = lookupKeyWriteOrReply(c,c->argv[1],shared.czero);

    if (subject == NULL || checkType(c,subject,OBJ_LIST)) return;

    if (refval == NULL) {
        /* Plain push on an existing list. */
        listTypePush(subject,val,where);
        signalModifiedKey(c->db,c->argv[1]);
        notifyKeyspaceEvent(NOTIFY_LIST,
                            (where == LIST_HEAD) ? "lpush" : "rpush",
                            c->argv[1],c->db->id);
        server.dirty++;
    } else {
        listTypeEntry entry;
        int inserted = 0;

        /* Seek refval from head to tail, stopping at the first match. */
        listTypeIterator *iter = listTypeInitIterator(subject,0,LIST_TAIL);
        while (!inserted && listTypeNext(iter,&entry)) {
            if (listTypeEqual(&entry,refval)) {
                listTypeInsert(&entry,val,where);
                inserted = 1;
            }
        }
        listTypeReleaseIterator(iter);

        if (!inserted) {
            /* Notify client of a failed insert */
            addReply(c,shared.cnegone);
            return;
        }
        signalModifiedKey(c->db,c->argv[1]);
        notifyKeyspaceEvent(NOTIFY_LIST,"linsert",
                            c->argv[1],c->db->id);
        server.dirty++;
    }

    addReplyLongLong(c,listTypeLength(subject));
}
277 
/* LPUSHX: push argv[2] at the head of the list, only if the key exists. */
void lpushxCommand(client *c) {
    robj *val = tryObjectEncoding(c->argv[2]);

    c->argv[2] = val;
    pushxGenericCommand(c,NULL,val,LIST_HEAD);
}
282 
/* RPUSHX: push argv[2] at the tail of the list, only if the key exists. */
void rpushxCommand(client *c) {
    robj *val = tryObjectEncoding(c->argv[2]);

    c->argv[2] = val;
    pushxGenericCommand(c,NULL,val,LIST_TAIL);
}
287 
/* LINSERT key BEFORE|AFTER pivot value.
 *
 * argv[2] is matched case insensitively: "after" inserts with LIST_TAIL,
 * "before" with LIST_HEAD, anything else replies with a syntax error. */
void linsertCommand(client *c) {
    int where;

    c->argv[4] = tryObjectEncoding(c->argv[4]);

    if (strcasecmp(c->argv[2]->ptr,"after") == 0) {
        where = LIST_TAIL;
    } else if (strcasecmp(c->argv[2]->ptr,"before") == 0) {
        where = LIST_HEAD;
    } else {
        addReply(c,shared.syntaxerr);
        return;
    }
    pushxGenericCommand(c,c->argv[3],c->argv[4],where);
}
298 
/* LLEN key: reply with the length of the list. A missing key replies with
 * shared.czero; the reply for a non-list value is left to checkType(). */
void llenCommand(client *c) {
    robj *o = lookupKeyReadOrReply(c,c->argv[1],shared.czero);

    if (o != NULL && !checkType(c,o,OBJ_LIST)) {
        addReplyLongLong(c,listTypeLength(o));
    }
}
304 
/* LINDEX key index: reply with the element at 'index', or with a null bulk
 * when the key is missing or quicklistIndex() finds no element there. */
void lindexCommand(client *c) {
    long index;
    robj *o = lookupKeyReadOrReply(c,c->argv[1],shared.nullbulk);

    if (o == NULL || checkType(c,o,OBJ_LIST)) return;
    if (getLongFromObjectOrReply(c, c->argv[2], &index, NULL) != C_OK)
        return;

    if (o->encoding != OBJ_ENCODING_QUICKLIST) {
        serverPanic("Unknown list encoding");
        return;
    }

    quicklistEntry entry;
    if (!quicklistIndex(o->ptr, index, &entry)) {
        addReply(c,shared.nullbulk);
        return;
    }

    /* Build a temporary object for the reply: from the byte buffer when the
     * entry has one, from the integer value otherwise. */
    robj *value;
    if (entry.value) {
        value = createStringObject((char*)entry.value,entry.sz);
    } else {
        value = createStringObjectFromLongLong(entry.longval);
    }
    addReplyBulk(c,value);
    decrRefCount(value);
}
331 
/* LSET key index value: replace the element at 'index' with argv[3].
 *
 * A missing key replies with shared.nokeyerr; an index for which
 * quicklistReplaceAtIndex() fails replies with shared.outofrangeerr. On
 * success the key is signaled as modified, an "lset" event is emitted and
 * server.dirty is incremented. */
void lsetCommand(client *c) {
    long index;
    robj *o = lookupKeyWriteOrReply(c,c->argv[1],shared.nokeyerr);

    if (o == NULL || checkType(c,o,OBJ_LIST)) return;

    robj *value = c->argv[3];
    if (getLongFromObjectOrReply(c, c->argv[2], &index, NULL) != C_OK)
        return;

    if (o->encoding != OBJ_ENCODING_QUICKLIST) {
        serverPanic("Unknown list encoding");
        return;
    }

    quicklist *ql = o->ptr;
    int replaced = quicklistReplaceAtIndex(ql, index,
                                           value->ptr, sdslen(value->ptr));
    if (replaced) {
        addReply(c,shared.ok);
        signalModifiedKey(c->db,c->argv[1]);
        notifyKeyspaceEvent(NOTIFY_LIST,"lset",c->argv[1],c->db->id);
        server.dirty++;
    } else {
        addReply(c,shared.outofrangeerr);
    }
}
357 
/* Shared implementation of LPOP and RPOP.
 *
 * Pops one element from the head (LIST_HEAD) or the tail of the list at
 * argv[1] and replies with it; a missing key or an empty pop replies with
 * a null bulk. After a successful pop an "lpop"/"rpop" event is emitted,
 * and when the list became empty a "del" event is emitted and the key is
 * removed from the database. */
void popGenericCommand(client *c, int where) {
    robj *o = lookupKeyWriteOrReply(c,c->argv[1],shared.nullbulk);

    if (o == NULL || checkType(c,o,OBJ_LIST)) return;

    robj *value = listTypePop(o,where);
    if (value == NULL) {
        addReply(c,shared.nullbulk);
        return;
    }

    addReplyBulk(c,value);
    decrRefCount(value);
    notifyKeyspaceEvent(NOTIFY_LIST,
                        (where == LIST_HEAD) ? "lpop" : "rpop",
                        c->argv[1],c->db->id);
    if (listTypeLength(o) == 0) {
        notifyKeyspaceEvent(NOTIFY_GENERIC,"del",
                            c->argv[1],c->db->id);
        dbDelete(c->db,c->argv[1]);
    }
    signalModifiedKey(c->db,c->argv[1]);
    server.dirty++;
}
380 
/* LPOP: pop an element from the head of the list. */
void lpopCommand(client *c) {
    const int where = LIST_HEAD;

    popGenericCommand(c,where);
}
384 
/* RPOP: pop an element from the tail of the list. */
void rpopCommand(client *c) {
    const int where = LIST_TAIL;

    popGenericCommand(c,where);
}
388 
/* LRANGE key start end: reply with the elements between the two inclusive
 * indexes. Negative indexes count from the end of the list; out of range
 * indexes are clamped, and an empty range (or a missing key) replies with
 * an empty multi bulk. */
void lrangeCommand(client *c) {
    long start, end;

    if (getLongFromObjectOrReply(c, c->argv[2], &start, NULL) != C_OK) return;
    if (getLongFromObjectOrReply(c, c->argv[3], &end, NULL) != C_OK) return;

    robj *o = lookupKeyReadOrReply(c,c->argv[1],shared.emptymultibulk);
    if (o == NULL || checkType(c,o,OBJ_LIST)) return;

    long llen = listTypeLength(o);

    /* convert negative indexes */
    if (start < 0) start += llen;
    if (end < 0) end += llen;
    if (start < 0) start = 0;

    /* Invariant: start >= 0, so this test will be true when end < 0.
     * The range is empty when start > end or start >= length. */
    if (start > end || start >= llen) {
        addReply(c,shared.emptymultibulk);
        return;
    }
    if (end >= llen) end = llen-1;

    /* Return the result in form of a multi-bulk reply */
    long rangelen = (end-start)+1;
    addReplyMultiBulkLen(c,rangelen);

    if (o->encoding != OBJ_ENCODING_QUICKLIST) {
        serverPanic("List encoding is not QUICKLIST!");
        return;
    }

    listTypeIterator *iter = listTypeInitIterator(o, start, LIST_TAIL);
    for (long emitted = 0; emitted < rangelen; emitted++) {
        listTypeEntry entry;

        listTypeNext(iter, &entry);
        if (entry.entry.value) {
            addReplyBulkCBuffer(c,entry.entry.value,entry.entry.sz);
        } else {
            addReplyBulkLongLong(c,entry.entry.longval);
        }
    }
    listTypeReleaseIterator(iter);
}
434 
/* LTRIM key start end: keep only the elements between the two inclusive
 * indexes and delete the rest. Negative indexes count from the end of the
 * list. An empty range removes every element, in which case the key is
 * deleted and a "del" event follows the "ltrim" one. A missing key replies
 * with shared.ok. */
void ltrimCommand(client *c) {
    long start, end;

    if (getLongFromObjectOrReply(c, c->argv[2], &start, NULL) != C_OK) return;
    if (getLongFromObjectOrReply(c, c->argv[3], &end, NULL) != C_OK) return;

    robj *o = lookupKeyWriteOrReply(c,c->argv[1],shared.ok);
    if (o == NULL || checkType(c,o,OBJ_LIST)) return;

    long llen = listTypeLength(o);

    /* convert negative indexes */
    if (start < 0) start += llen;
    if (end < 0) end += llen;
    if (start < 0) start = 0;

    /* Defaults describe the empty result: out of range start or
     * start > end trims the whole list from the left. */
    long ltrim = llen;
    long rtrim = 0;

    /* Invariant: start >= 0, so the range is non empty only when
     * start <= end and start < length. */
    if (start <= end && start < llen) {
        if (end >= llen) end = llen-1;
        ltrim = start;
        rtrim = llen-end-1;
    }

    /* Remove list elements to perform the trim */
    if (o->encoding != OBJ_ENCODING_QUICKLIST) {
        serverPanic("Unknown list encoding");
    } else {
        quicklistDelRange(o->ptr,0,ltrim);
        quicklistDelRange(o->ptr,-rtrim,rtrim);
    }

    notifyKeyspaceEvent(NOTIFY_LIST,"ltrim",c->argv[1],c->db->id);
    if (listTypeLength(o) == 0) {
        dbDelete(c->db,c->argv[1]);
        notifyKeyspaceEvent(NOTIFY_GENERIC,"del",c->argv[1],c->db->id);
    }
    signalModifiedKey(c->db,c->argv[1]);
    server.dirty++;
    addReply(c,shared.ok);
}
480 
/* LREM key count value: remove elements equal to argv[3] from the list.
 *
 *   count > 0: remove up to 'count' matches scanning from head to tail.
 *   count < 0: remove up to |count| matches scanning from tail to head.
 *   count = 0: remove every match.
 *
 * The reply is the number of removed elements (shared.czero when the key is
 * missing). When something was removed the key is signaled as modified and
 * an "lrem" event is emitted in the list event class, like the other list
 * commands in this file. If the list is left empty the key is deleted and a
 * "del" event is emitted. */
void lremCommand(client *c) {
    robj *subject, *obj;
    obj = c->argv[3];
    long toremove;
    long removed = 0;

    if ((getLongFromObjectOrReply(c, c->argv[2], &toremove, NULL) != C_OK))
        return;

    subject = lookupKeyWriteOrReply(c,c->argv[1],shared.czero);
    if (subject == NULL || checkType(c,subject,OBJ_LIST)) return;

    /* Maximum number of elements to remove, 0 meaning "no limit". The
     * absolute value is computed in unsigned arithmetic because negating
     * LONG_MIN as a signed long is undefined behavior. */
    unsigned long limit;
    listTypeIterator *li;
    if (toremove < 0) {
        limit = 0UL - (unsigned long)toremove;
        li = listTypeInitIterator(subject,-1,LIST_HEAD);
    } else {
        limit = (unsigned long)toremove;
        li = listTypeInitIterator(subject,0,LIST_TAIL);
    }

    listTypeEntry entry;
    while (listTypeNext(li,&entry)) {
        if (listTypeEqual(&entry,obj)) {
            listTypeDelete(li, &entry);
            server.dirty++;
            removed++;
            if (limit && (unsigned long)removed == limit) break;
        }
    }
    listTypeReleaseIterator(li);

    if (removed) {
        signalModifiedKey(c->db,c->argv[1]);
        notifyKeyspaceEvent(NOTIFY_LIST,"lrem",c->argv[1],c->db->id);
    }

    if (listTypeLength(subject) == 0) {
        dbDelete(c->db,c->argv[1]);
        notifyKeyspaceEvent(NOTIFY_GENERIC,"del",c->argv[1],c->db->id);
    }

    addReplyLongLong(c,removed);
}
524 
525 /* This is the semantic of this command:
526  *  RPOPLPUSH srclist dstlist:
527  *    IF LLEN(srclist) > 0
528  *      element = RPOP srclist
529  *      LPUSH dstlist element
530  *      RETURN element
531  *    ELSE
532  *      RETURN nil
533  *    END
534  *  END
535  *
536  * The idea is to be able to get an element from a list in a reliable way
537  * since the element is not just returned but pushed against another list
538  * as well. This command was originally proposed by Ezra Zygmuntowicz.
539  */
540 
/* Push side of RPOPLPUSH: push 'value' at the head of the list stored at
 * 'dstkey', creating the list when 'dstobj' is NULL, then emit an "lpush"
 * event and reply to the client with the pushed value. */
void rpoplpushHandlePush(client *c, robj *dstkey, robj *dstobj, robj *value) {
    robj *target = dstobj;

    /* Create the list if the key does not exist */
    if (target == NULL) {
        target = createQuicklistObject();
        quicklistSetOptions(target->ptr, server.list_max_ziplist_size,
                            server.list_compress_depth);
        dbAdd(c->db,dstkey,target);
    }

    signalModifiedKey(c->db,dstkey);
    listTypePush(target,value,LIST_HEAD);
    notifyKeyspaceEvent(NOTIFY_LIST,"lpush",dstkey,c->db->id);

    /* Always send the pushed value to the client. */
    addReplyBulk(c,value);
}
555 
/* RPOPLPUSH source destination: pop the tail element of the source list,
 * push it at the head of the destination list and reply with it. A missing
 * or empty source replies with a null bulk. A destination holding a
 * non-list value stops the command before anything is popped. */
void rpoplpushCommand(client *c) {
    robj *sobj = lookupKeyWriteOrReply(c,c->argv[1],shared.nullbulk);

    if (sobj == NULL || checkType(c,sobj,OBJ_LIST)) return;

    if (listTypeLength(sobj) == 0) {
        /* This may only happen after loading very old RDB files. Recent
         * versions of Redis delete keys of empty lists. */
        addReply(c,shared.nullbulk);
        return;
    }

    robj *dobj = lookupKeyWrite(c->db,c->argv[2]);
    robj *touchedkey = c->argv[1];

    if (dobj && checkType(c,dobj,OBJ_LIST)) return;
    robj *value = listTypePop(sobj,LIST_TAIL);

    /* Keep our own reference to the source key, since rpoplpushHandlePush
     * may change the client command argument vector (it does not
     * currently). */
    incrRefCount(touchedkey);
    rpoplpushHandlePush(c,c->argv[2],dobj,value);

    /* Drop the reference to the popped object we got from listTypePop. */
    decrRefCount(value);

    /* Delete the source list when it is empty */
    notifyKeyspaceEvent(NOTIFY_LIST,"rpop",touchedkey,c->db->id);
    if (listTypeLength(sobj) == 0) {
        dbDelete(c->db,touchedkey);
        notifyKeyspaceEvent(NOTIFY_GENERIC,"del",
                            touchedkey,c->db->id);
    }
    signalModifiedKey(c->db,touchedkey);
    decrRefCount(touchedkey);
    server.dirty++;
}
592 
593 /*-----------------------------------------------------------------------------
594  * Blocking POP operations
595  *----------------------------------------------------------------------------*/
596 
/* This is how the current blocking POP works, we use BLPOP as example:
 * - If the user calls BLPOP and the key exists and contains a non empty list
 *   then LPOP is called instead. So BLPOP is semantically the same as LPOP
 *   if blocking is not required.
 * - If instead BLPOP is called and the key does not exist or the list is
 *   empty we need to block. In order to do so we remove the notification for
 *   new data to read in the client socket (so that we'll not serve new
 *   requests if the blocking request is not served). Also we put the client
 *   in a dictionary (db->blocking_keys) mapping keys to a list of clients
 *   blocking for these keys.
 * - If a PUSH operation against a key with blocked clients waiting is
 *   performed, we mark this key as "ready", and after the current command,
 *   MULTI/EXEC block, or script, is executed, we serve all the clients waiting
 *   for this list, from the one that blocked first, to the last, according
 *   to the number of elements we have in the ready list.
 */
613 
614 /* Set a client in blocking mode for the specified key, with the specified
615  * timeout */
blockForKeys(client * c,robj ** keys,int numkeys,mstime_t timeout,robj * target)616 void blockForKeys(client *c, robj **keys, int numkeys, mstime_t timeout, robj *target) {
617     dictEntry *de;
618     list *l;
619     int j;
620 
621     c->bpop.timeout = timeout;
622     c->bpop.target = target;
623 
624     if (target != NULL) incrRefCount(target);
625 
626     for (j = 0; j < numkeys; j++) {
627         /* If the key already exists in the dict ignore it. */
628         if (dictAdd(c->bpop.keys,keys[j],NULL) != DICT_OK) continue;
629         incrRefCount(keys[j]);
630 
631         /* And in the other "side", to map keys -> clients */
632         de = dictFind(c->db->blocking_keys,keys[j]);
633         if (de == NULL) {
634             int retval;
635 
636             /* For every key we take a list of clients blocked for it */
637             l = listCreate();
638             retval = dictAdd(c->db->blocking_keys,keys[j],l);
639             incrRefCount(keys[j]);
640             serverAssertWithInfo(c,keys[j],retval == DICT_OK);
641         } else {
642             l = dictGetVal(de);
643         }
644         listAddNodeTail(l,c);
645     }
646     blockClient(c,BLOCKED_LIST);
647 }
648 
649 /* Unblock a client that's waiting in a blocking operation such as BLPOP.
650  * You should never call this function directly, but unblockClient() instead. */
unblockClientWaitingData(client * c)651 void unblockClientWaitingData(client *c) {
652     dictEntry *de;
653     dictIterator *di;
654     list *l;
655 
656     serverAssertWithInfo(c,NULL,dictSize(c->bpop.keys) != 0);
657     di = dictGetIterator(c->bpop.keys);
658     /* The client may wait for multiple keys, so unblock it for every key. */
659     while((de = dictNext(di)) != NULL) {
660         robj *key = dictGetKey(de);
661 
662         /* Remove this client from the list of clients waiting for this key. */
663         l = dictFetchValue(c->db->blocking_keys,key);
664         serverAssertWithInfo(c,key,l != NULL);
665         listDelNode(l,listSearchKey(l,c));
666         /* If the list is empty we need to remove it to avoid wasting memory */
667         if (listLength(l) == 0)
668             dictDelete(c->db->blocking_keys,key);
669     }
670     dictReleaseIterator(di);
671 
672     /* Cleanup the client structure */
673     dictEmpty(c->bpop.keys,NULL);
674     if (c->bpop.target) {
675         decrRefCount(c->bpop.target);
676         c->bpop.target = NULL;
677     }
678 }
679 
680 /* If the specified key has clients blocked waiting for list pushes, this
681  * function will put the key reference into the server.ready_keys list.
682  * Note that db->ready_keys is a hash table that allows us to avoid putting
683  * the same key again and again in the list in case of multiple pushes
684  * made by a script or in the context of MULTI/EXEC.
685  *
686  * The list will be finally processed by handleClientsBlockedOnLists() */
signalListAsReady(redisDb * db,robj * key)687 void signalListAsReady(redisDb *db, robj *key) {
688     readyList *rl;
689 
690     /* No clients blocking for this key? No need to queue it. */
691     if (dictFind(db->blocking_keys,key) == NULL) return;
692 
693     /* Key was already signaled? No need to queue it again. */
694     if (dictFind(db->ready_keys,key) != NULL) return;
695 
696     /* Ok, we need to queue this key into server.ready_keys. */
697     rl = zmalloc(sizeof(*rl));
698     rl->key = key;
699     rl->db = db;
700     incrRefCount(key);
701     listAddNodeTail(server.ready_keys,rl);
702 
703     /* We also add the key in the db->ready_keys dictionary in order
704      * to avoid adding it multiple times into a list with a simple O(1)
705      * check. */
706     incrRefCount(key);
707     serverAssert(dictAdd(db->ready_keys,key,NULL) == DICT_OK);
708 }
709 
710 /* This is a helper function for handleClientsBlockedOnLists(). It's work
711  * is to serve a specific client (receiver) that is blocked on 'key'
712  * in the context of the specified 'db', doing the following:
713  *
714  * 1) Provide the client with the 'value' element.
715  * 2) If the dstkey is not NULL (we are serving a BRPOPLPUSH) also push the
716  *    'value' element on the destination list (the LPUSH side of the command).
717  * 3) Propagate the resulting BRPOP, BLPOP and additional LPUSH if any into
718  *    the AOF and replication channel.
719  *
720  * The argument 'where' is LIST_TAIL or LIST_HEAD, and indicates if the
721  * 'value' element was popped fron the head (BLPOP) or tail (BRPOP) so that
722  * we can propagate the command properly.
723  *
724  * The function returns C_OK if we are able to serve the client, otherwise
725  * C_ERR is returned to signal the caller that the list POP operation
726  * should be undone as the client was not served: This only happens for
727  * BRPOPLPUSH that fails to push the value to the destination key as it is
728  * of the wrong type. */
serveClientBlockedOnList(client * receiver,robj * key,robj * dstkey,redisDb * db,robj * value,int where)729 int serveClientBlockedOnList(client *receiver, robj *key, robj *dstkey, redisDb *db, robj *value, int where)
730 {
731     robj *argv[3];
732 
733     if (dstkey == NULL) {
734         /* Propagate the [LR]POP operation. */
735         argv[0] = (where == LIST_HEAD) ? shared.lpop :
736                                           shared.rpop;
737         argv[1] = key;
738         propagate((where == LIST_HEAD) ?
739             server.lpopCommand : server.rpopCommand,
740             db->id,argv,2,PROPAGATE_AOF|PROPAGATE_REPL);
741 
742         /* BRPOP/BLPOP */
743         addReplyMultiBulkLen(receiver,2);
744         addReplyBulk(receiver,key);
745         addReplyBulk(receiver,value);
746     } else {
747         /* BRPOPLPUSH */
748         robj *dstobj =
749             lookupKeyWrite(receiver->db,dstkey);
750         if (!(dstobj &&
751              checkType(receiver,dstobj,OBJ_LIST)))
752         {
753             /* Propagate the RPOP operation. */
754             argv[0] = shared.rpop;
755             argv[1] = key;
756             propagate(server.rpopCommand,
757                 db->id,argv,2,
758                 PROPAGATE_AOF|
759                 PROPAGATE_REPL);
760             rpoplpushHandlePush(receiver,dstkey,dstobj,
761                 value);
762             /* Propagate the LPUSH operation. */
763             argv[0] = shared.lpush;
764             argv[1] = dstkey;
765             argv[2] = value;
766             propagate(server.lpushCommand,
767                 db->id,argv,3,
768                 PROPAGATE_AOF|
769                 PROPAGATE_REPL);
770         } else {
771             /* BRPOPLPUSH failed because of wrong
772              * destination type. */
773             return C_ERR;
774         }
775     }
776     return C_OK;
777 }
778 
779 /* This function should be called by Redis every time a single command,
780  * a MULTI/EXEC block, or a Lua script, terminated its execution after
781  * being called by a client.
782  *
783  * All the keys with at least one client blocked that received at least
784  * one new element via some PUSH operation are accumulated into
785  * the server.ready_keys list. This function will run the list and will
786  * serve clients accordingly. Note that the function will iterate again and
787  * again as a result of serving BRPOPLPUSH we can have new blocking clients
788  * to serve because of the PUSH side of BRPOPLPUSH. */
handleClientsBlockedOnLists(void)789 void handleClientsBlockedOnLists(void) {
790     while(listLength(server.ready_keys) != 0) {
791         list *l;
792 
793         /* Point server.ready_keys to a fresh list and save the current one
794          * locally. This way as we run the old list we are free to call
795          * signalListAsReady() that may push new elements in server.ready_keys
796          * when handling clients blocked into BRPOPLPUSH. */
797         l = server.ready_keys;
798         server.ready_keys = listCreate();
799 
800         while(listLength(l) != 0) {
801             listNode *ln = listFirst(l);
802             readyList *rl = ln->value;
803 
804             /* First of all remove this key from db->ready_keys so that
805              * we can safely call signalListAsReady() against this key. */
806             dictDelete(rl->db->ready_keys,rl->key);
807 
808             /* If the key exists and it's a list, serve blocked clients
809              * with data. */
810             robj *o = lookupKeyWrite(rl->db,rl->key);
811             if (o != NULL && o->type == OBJ_LIST) {
812                 dictEntry *de;
813 
814                 /* We serve clients in the same order they blocked for
815                  * this key, from the first blocked to the last. */
816                 de = dictFind(rl->db->blocking_keys,rl->key);
817                 if (de) {
818                     list *clients = dictGetVal(de);
819                     int numclients = listLength(clients);
820 
821                     while(numclients--) {
822                         listNode *clientnode = listFirst(clients);
823                         client *receiver = clientnode->value;
824                         robj *dstkey = receiver->bpop.target;
825                         int where = (receiver->lastcmd &&
826                                      receiver->lastcmd->proc == blpopCommand) ?
827                                     LIST_HEAD : LIST_TAIL;
828                         robj *value = listTypePop(o,where);
829 
830                         if (value) {
831                             /* Protect receiver->bpop.target, that will be
832                              * freed by the next unblockClient()
833                              * call. */
834                             if (dstkey) incrRefCount(dstkey);
835                             unblockClient(receiver);
836 
837                             if (serveClientBlockedOnList(receiver,
838                                 rl->key,dstkey,rl->db,value,
839                                 where) == C_ERR)
840                             {
841                                 /* If we failed serving the client we need
842                                  * to also undo the POP operation. */
843                                     listTypePush(o,value,where);
844                             }
845 
846                             if (dstkey) decrRefCount(dstkey);
847                             decrRefCount(value);
848                         } else {
849                             break;
850                         }
851                     }
852                 }
853 
854                 if (listTypeLength(o) == 0) {
855                     dbDelete(rl->db,rl->key);
856                 }
857                 /* We don't call signalModifiedKey() as it was already called
858                  * when an element was pushed on the list. */
859             }
860 
861             /* Free this item. */
862             decrRefCount(rl->key);
863             zfree(rl);
864             listDelNode(l,ln);
865         }
866         listRelease(l); /* We have the new list on place at this point. */
867     }
868 }
869 
870 /* Blocking RPOP/LPOP */
blockingPopGenericCommand(client * c,int where)871 void blockingPopGenericCommand(client *c, int where) {
872     robj *o;
873     mstime_t timeout;
874     int j;
875 
876     if (getTimeoutFromObjectOrReply(c,c->argv[c->argc-1],&timeout,UNIT_SECONDS)
877         != C_OK) return;
878 
879     for (j = 1; j < c->argc-1; j++) {
880         o = lookupKeyWrite(c->db,c->argv[j]);
881         if (o != NULL) {
882             if (o->type != OBJ_LIST) {
883                 addReply(c,shared.wrongtypeerr);
884                 return;
885             } else {
886                 if (listTypeLength(o) != 0) {
887                     /* Non empty list, this is like a non normal [LR]POP. */
888                     char *event = (where == LIST_HEAD) ? "lpop" : "rpop";
889                     robj *value = listTypePop(o,where);
890                     serverAssert(value != NULL);
891 
892                     addReplyMultiBulkLen(c,2);
893                     addReplyBulk(c,c->argv[j]);
894                     addReplyBulk(c,value);
895                     decrRefCount(value);
896                     notifyKeyspaceEvent(NOTIFY_LIST,event,
897                                         c->argv[j],c->db->id);
898                     if (listTypeLength(o) == 0) {
899                         dbDelete(c->db,c->argv[j]);
900                         notifyKeyspaceEvent(NOTIFY_GENERIC,"del",
901                                             c->argv[j],c->db->id);
902                     }
903                     signalModifiedKey(c->db,c->argv[j]);
904                     server.dirty++;
905 
906                     /* Replicate it as an [LR]POP instead of B[LR]POP. */
907                     rewriteClientCommandVector(c,2,
908                         (where == LIST_HEAD) ? shared.lpop : shared.rpop,
909                         c->argv[j]);
910                     return;
911                 }
912             }
913         }
914     }
915 
916     /* If we are inside a MULTI/EXEC and the list is empty the only thing
917      * we can do is treating it as a timeout (even with timeout 0). */
918     if (c->flags & CLIENT_MULTI) {
919         addReply(c,shared.nullmultibulk);
920         return;
921     }
922 
923     /* If the list is empty or the key does not exists we must block */
924     blockForKeys(c, c->argv + 1, c->argc - 2, timeout, NULL);
925 }
926 
/* BLPOP: the blocking pop generic command applied to the list head. */
void blpopCommand(client *c) {
    blockingPopGenericCommand(c, LIST_HEAD);
}
930 
/* BRPOP: the blocking pop generic command applied to the list tail. */
void brpopCommand(client *c) {
    blockingPopGenericCommand(c, LIST_TAIL);
}
934 
/* BRPOPLPUSH: blocking variant of RPOPLPUSH.
 *
 * c->argv[1] is the source key, c->argv[2] is handed to blockForKeys() as
 * the target, and c->argv[3] is the timeout, parsed in seconds.
 *
 * When the source key exists and holds a list, the regular
 * rpoplpushCommand() is executed; a source of any other type yields a
 * wrong-type error. When the source key is missing the client blocks on
 * it, unless it is inside a MULTI/EXEC, in which case it gets a null bulk
 * reply immediately. */
void brpoplpushCommand(client *c) {
    mstime_t timeout;
    robj *key;

    if (getTimeoutFromObjectOrReply(c,c->argv[3],&timeout,UNIT_SECONDS)
        != C_OK)
    {
        return;
    }

    key = lookupKeyWrite(c->db, c->argv[1]);

    if (key != NULL) {
        if (key->type != OBJ_LIST) {
            addReply(c, shared.wrongtypeerr);
            return;
        }

        /* The list exists, so it must hold elements: run the regular
         * rpoplpushCommand. */
        serverAssertWithInfo(c,key,listTypeLength(key) > 0);
        rpoplpushCommand(c);
        return;
    }

    if (c->flags & CLIENT_MULTI) {
        /* Blocking against an empty list in a multi state returns
         * immediately. */
        addReply(c, shared.nullbulk);
        return;
    }

    /* The list is empty and the client blocks. */
    blockForKeys(c, c->argv + 1, 1, timeout, c->argv[2]);
}
963