xref: /f-stack/app/redis-5.0.5/src/t_list.c (revision 572c4311)
1 /*
2  * Copyright (c) 2009-2012, Salvatore Sanfilippo <antirez at gmail dot com>
3  * All rights reserved.
4  *
5  * Redistribution and use in source and binary forms, with or without
6  * modification, are permitted provided that the following conditions are met:
7  *
8  *   * Redistributions of source code must retain the above copyright notice,
9  *     this list of conditions and the following disclaimer.
10  *   * Redistributions in binary form must reproduce the above copyright
11  *     notice, this list of conditions and the following disclaimer in the
12  *     documentation and/or other materials provided with the distribution.
13  *   * Neither the name of Redis nor the names of its contributors may be used
14  *     to endorse or promote products derived from this software without
15  *     specific prior written permission.
16  *
17  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
18  * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
19  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
20  * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
21  * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
22  * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
23  * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
24  * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
25  * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
26  * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27  * POSSIBILITY OF SUCH DAMAGE.
28  */
29 
30 #include "server.h"
31 
32 /*-----------------------------------------------------------------------------
33  * List API
34  *----------------------------------------------------------------------------*/
35 
36 /* The function pushes an element to the specified list object 'subject',
37  * at head or tail position as specified by 'where'.
38  *
39  * There is no need for the caller to increment the refcount of 'value' as
40  * the function takes care of it if needed. */
listTypePush(robj * subject,robj * value,int where)41 void listTypePush(robj *subject, robj *value, int where) {
42     if (subject->encoding == OBJ_ENCODING_QUICKLIST) {
43         int pos = (where == LIST_HEAD) ? QUICKLIST_HEAD : QUICKLIST_TAIL;
44         value = getDecodedObject(value);
45         size_t len = sdslen(value->ptr);
46         quicklistPush(subject->ptr, value->ptr, len, pos);
47         decrRefCount(value);
48     } else {
49         serverPanic("Unknown list encoding");
50     }
51 }
52 
/* Saver callback handed to quicklistPopCustom() by listTypePop(): wraps the
 * 'sz' bytes at 'data' into a new string object and returns it. */
void *listPopSaver(unsigned char *data, unsigned int sz) {
    char *bytes = (char*)data;
    return createStringObject(bytes,sz);
}
56 
/* Pop one element from the list 'subject'. LIST_HEAD pops from the head,
 * any other 'where' pops from the tail.
 *
 * Returns a string object holding the popped element, or NULL when
 * quicklistPopCustom() reports that nothing was popped. */
robj *listTypePop(robj *subject, int where) {
    robj *value = NULL;
    long long num;

    int ql_end = QUICKLIST_TAIL;
    if (where == LIST_HEAD) ql_end = QUICKLIST_HEAD;

    if (subject->encoding != OBJ_ENCODING_QUICKLIST) {
        serverPanic("Unknown list encoding");
        return value;
    }

    int popped = quicklistPopCustom(subject->ptr, ql_end,
                                    (unsigned char **)&value,
                                    NULL, &num, listPopSaver);
    /* When the pop succeeded but the saver produced no object, the element
     * is taken from the integer output instead. */
    if (popped && value == NULL)
        value = createStringObjectFromLongLong(num);
    return value;
}
73 
listTypeLength(const robj * subject)74 unsigned long listTypeLength(const robj *subject) {
75     if (subject->encoding == OBJ_ENCODING_QUICKLIST) {
76         return quicklistCount(subject->ptr);
77     } else {
78         serverPanic("Unknown list encoding");
79     }
80 }
81 
82 /* Initialize an iterator at the specified index. */
listTypeInitIterator(robj * subject,long index,unsigned char direction)83 listTypeIterator *listTypeInitIterator(robj *subject, long index,
84                                        unsigned char direction) {
85     listTypeIterator *li = zmalloc(sizeof(listTypeIterator));
86     li->subject = subject;
87     li->encoding = subject->encoding;
88     li->direction = direction;
89     li->iter = NULL;
90     /* LIST_HEAD means start at TAIL and move *towards* head.
91      * LIST_TAIL means start at HEAD and move *towards tail. */
92     int iter_direction =
93         direction == LIST_HEAD ? AL_START_TAIL : AL_START_HEAD;
94     if (li->encoding == OBJ_ENCODING_QUICKLIST) {
95         li->iter = quicklistGetIteratorAtIdx(li->subject->ptr,
96                                              iter_direction, index);
97     } else {
98         serverPanic("Unknown list encoding");
99     }
100     return li;
101 }
102 
103 /* Clean up the iterator. */
listTypeReleaseIterator(listTypeIterator * li)104 void listTypeReleaseIterator(listTypeIterator *li) {
105     zfree(li->iter);
106     zfree(li);
107 }
108 
109 /* Stores pointer to current the entry in the provided entry structure
110  * and advances the position of the iterator. Returns 1 when the current
111  * entry is in fact an entry, 0 otherwise. */
listTypeNext(listTypeIterator * li,listTypeEntry * entry)112 int listTypeNext(listTypeIterator *li, listTypeEntry *entry) {
113     /* Protect from converting when iterating */
114     serverAssert(li->subject->encoding == li->encoding);
115 
116     entry->li = li;
117     if (li->encoding == OBJ_ENCODING_QUICKLIST) {
118         return quicklistNext(li->iter, &entry->entry);
119     } else {
120         serverPanic("Unknown list encoding");
121     }
122     return 0;
123 }
124 
125 /* Return entry or NULL at the current position of the iterator. */
listTypeGet(listTypeEntry * entry)126 robj *listTypeGet(listTypeEntry *entry) {
127     robj *value = NULL;
128     if (entry->li->encoding == OBJ_ENCODING_QUICKLIST) {
129         if (entry->entry.value) {
130             value = createStringObject((char *)entry->entry.value,
131                                        entry->entry.sz);
132         } else {
133             value = createStringObjectFromLongLong(entry->entry.longval);
134         }
135     } else {
136         serverPanic("Unknown list encoding");
137     }
138     return value;
139 }
140 
/* Insert 'value' next to the element referenced by 'entry': after it when
 * 'where' is LIST_TAIL, before it when 'where' is LIST_HEAD. Any other
 * 'where' inserts nothing. The caller's reference to 'value' is untouched. */
void listTypeInsert(listTypeEntry *entry, robj *value, int where) {
    if (entry->li->encoding != OBJ_ENCODING_QUICKLIST) {
        serverPanic("Unknown list encoding");
        return;
    }

    robj *decoded = getDecodedObject(value);
    sds payload = decoded->ptr;
    size_t nbytes = sdslen(payload);
    quicklist *ql = (quicklist *)entry->entry.quicklist;

    if (where == LIST_TAIL)
        quicklistInsertAfter(ql, &entry->entry, payload, nbytes);
    else if (where == LIST_HEAD)
        quicklistInsertBefore(ql, &entry->entry, payload, nbytes);

    /* Drop the reference obtained from getDecodedObject(). */
    decrRefCount(decoded);
}
158 
159 /* Compare the given object with the entry at the current position. */
listTypeEqual(listTypeEntry * entry,robj * o)160 int listTypeEqual(listTypeEntry *entry, robj *o) {
161     if (entry->li->encoding == OBJ_ENCODING_QUICKLIST) {
162         serverAssertWithInfo(NULL,o,sdsEncodedObject(o));
163         return quicklistCompare(entry->entry.zi,o->ptr,sdslen(o->ptr));
164     } else {
165         serverPanic("Unknown list encoding");
166     }
167 }
168 
169 /* Delete the element pointed to. */
listTypeDelete(listTypeIterator * iter,listTypeEntry * entry)170 void listTypeDelete(listTypeIterator *iter, listTypeEntry *entry) {
171     if (entry->li->encoding == OBJ_ENCODING_QUICKLIST) {
172         quicklistDelEntry(iter->iter, &entry->entry);
173     } else {
174         serverPanic("Unknown list encoding");
175     }
176 }
177 
178 /* Create a quicklist from a single ziplist */
listTypeConvert(robj * subject,int enc)179 void listTypeConvert(robj *subject, int enc) {
180     serverAssertWithInfo(NULL,subject,subject->type==OBJ_LIST);
181     serverAssertWithInfo(NULL,subject,subject->encoding==OBJ_ENCODING_ZIPLIST);
182 
183     if (enc == OBJ_ENCODING_QUICKLIST) {
184         size_t zlen = server.list_max_ziplist_size;
185         int depth = server.list_compress_depth;
186         subject->ptr = quicklistCreateFromZiplist(zlen, depth, subject->ptr);
187         subject->encoding = OBJ_ENCODING_QUICKLIST;
188     } else {
189         serverPanic("Unsupported list conversion");
190     }
191 }
192 
193 /*-----------------------------------------------------------------------------
194  * List Commands
195  *----------------------------------------------------------------------------*/
196 
/* Shared implementation of the push commands: pushes argv[2..argc-1] on
 * the 'where' end of the list at argv[1], creating the list on the first
 * push when the key does not exist. Replies with the resulting length. */
void pushGenericCommand(client *c, int where) {
    int pushed = 0;
    robj *list = lookupKeyWrite(c->db,c->argv[1]);

    if (list != NULL && list->type != OBJ_LIST) {
        addReply(c,shared.wrongtypeerr);
        return;
    }

    int idx = 2;
    while (idx < c->argc) {
        if (list == NULL) {
            /* First element for a missing key: create the list lazily. */
            list = createQuicklistObject();
            quicklistSetOptions(list->ptr, server.list_max_ziplist_size,
                                server.list_compress_depth);
            dbAdd(c->db,c->argv[1],list);
        }
        listTypePush(list,c->argv[idx],where);
        idx++;
        pushed++;
    }

    long long newlen = 0;
    if (list != NULL) newlen = listTypeLength(list);
    addReplyLongLong(c, newlen);

    if (pushed != 0) {
        char *event = "rpush";
        if (where == LIST_HEAD) event = "lpush";

        signalModifiedKey(c->db,c->argv[1]);
        notifyKeyspaceEvent(NOTIFY_LIST,event,c->argv[1],c->db->id);
    }
    server.dirty += pushed;
}
225 
/* Command entry point: push at the head of the list. */
void lpushCommand(client *c) {
    pushGenericCommand(c, LIST_HEAD);
}
229 
/* Command entry point: push at the tail of the list. */
void rpushCommand(client *c) {
    pushGenericCommand(c, LIST_TAIL);
}
233 
/* Shared implementation of the "push only if the list exists" commands.
 * When the key is missing the lookup helper replies with shared.czero and
 * nothing is pushed; otherwise argv[2..argc-1] are pushed on the 'where'
 * end and the new length is sent back. */
void pushxGenericCommand(client *c, int where) {
    int pushed = 0;
    robj *subject = lookupKeyWriteOrReply(c,c->argv[1],shared.czero);

    if (subject == NULL) return;
    if (checkType(c,subject,OBJ_LIST)) return;

    int idx = 2;
    while (idx < c->argc) {
        listTypePush(subject,c->argv[idx],where);
        idx++;
        pushed++;
    }

    addReplyLongLong(c,listTypeLength(subject));

    if (pushed != 0) {
        char *event = "rpush";
        if (where == LIST_HEAD) event = "lpush";

        signalModifiedKey(c->db,c->argv[1]);
        notifyKeyspaceEvent(NOTIFY_LIST,event,c->argv[1],c->db->id);
    }
    server.dirty += pushed;
}
255 
/* Command entry point: push at the head, only when the list exists. */
void lpushxCommand(client *c) {
    pushxGenericCommand(c, LIST_HEAD);
}
259 
/* Command entry point: push at the tail, only when the list exists. */
void rpushxCommand(client *c) {
    pushxGenericCommand(c, LIST_TAIL);
}
263 
/* Insert argv[4] next to the first element equal to the pivot argv[3] in
 * the list at argv[1]. argv[2] must be "after" or "before" (compared case
 * insensitively), otherwise a syntax error is returned.
 *
 * Replies with the new list length on success, shared.cnegone when the
 * pivot is not found; a missing key is answered by the lookup helper with
 * shared.czero. */
void linsertCommand(client *c) {
    int where;
    const char *position = c->argv[2]->ptr;

    if (strcasecmp(position,"after") == 0) {
        where = LIST_TAIL;
    } else if (strcasecmp(position,"before") == 0) {
        where = LIST_HEAD;
    } else {
        addReply(c,shared.syntaxerr);
        return;
    }

    robj *subject = lookupKeyWriteOrReply(c,c->argv[1],shared.czero);
    if (subject == NULL) return;
    if (checkType(c,subject,OBJ_LIST)) return;

    /* Walk from head to tail and stop at the first match of the pivot. */
    int inserted = 0;
    listTypeEntry entry;
    listTypeIterator *iter = listTypeInitIterator(subject,0,LIST_TAIL);
    while (!inserted && listTypeNext(iter,&entry)) {
        if (listTypeEqual(&entry,c->argv[3])) {
            listTypeInsert(&entry,c->argv[4],where);
            inserted = 1;
        }
    }
    listTypeReleaseIterator(iter);

    if (!inserted) {
        /* Pivot not found: tell the client nothing was inserted. */
        addReply(c,shared.cnegone);
        return;
    }

    signalModifiedKey(c->db,c->argv[1]);
    notifyKeyspaceEvent(NOTIFY_LIST,"linsert",c->argv[1],c->db->id);
    server.dirty++;

    addReplyLongLong(c,listTypeLength(subject));
}
307 
/* Reply with the length of the list at argv[1]. A missing key is answered
 * by the lookup helper with shared.czero. */
void llenCommand(client *c) {
    robj *list = lookupKeyReadOrReply(c,c->argv[1],shared.czero);

    if (list == NULL) return;
    if (checkType(c,list,OBJ_LIST)) return;

    addReplyLongLong(c,listTypeLength(list));
}
313 
/* Reply with the element at index argv[2] of the list at argv[1], or with
 * shared.nullbulk when quicklistIndex() finds no element at that index. */
void lindexCommand(client *c) {
    robj *list = lookupKeyReadOrReply(c,c->argv[1],shared.nullbulk);
    if (list == NULL) return;
    if (checkType(c,list,OBJ_LIST)) return;

    long index;
    if (getLongFromObjectOrReply(c, c->argv[2], &index, NULL) != C_OK)
        return;

    if (list->encoding != OBJ_ENCODING_QUICKLIST) {
        serverPanic("Unknown list encoding");
        return;
    }

    quicklistEntry entry;
    if (!quicklistIndex(list->ptr, index, &entry)) {
        addReply(c,shared.nullbulk);
        return;
    }

    /* Build a temporary object for the reply and release it right after. */
    robj *value;
    if (entry.value != NULL)
        value = createStringObject((char*)entry.value,entry.sz);
    else
        value = createStringObjectFromLongLong(entry.longval);
    addReplyBulk(c,value);
    decrRefCount(value);
}
340 
/* Replace the element at index argv[2] of the list at argv[1] with argv[3].
 * Replies shared.ok on success and shared.outofrangeerr when
 * quicklistReplaceAtIndex() reports that nothing was replaced; a missing
 * key is answered by the lookup helper with shared.nokeyerr. */
void lsetCommand(client *c) {
    robj *list = lookupKeyWriteOrReply(c,c->argv[1],shared.nokeyerr);
    if (list == NULL) return;
    if (checkType(c,list,OBJ_LIST)) return;

    robj *value = c->argv[3];
    long index;
    if (getLongFromObjectOrReply(c, c->argv[2], &index, NULL) != C_OK)
        return;

    if (list->encoding != OBJ_ENCODING_QUICKLIST) {
        serverPanic("Unknown list encoding");
        return;
    }

    quicklist *ql = list->ptr;
    if (!quicklistReplaceAtIndex(ql, index, value->ptr, sdslen(value->ptr))) {
        addReply(c,shared.outofrangeerr);
        return;
    }

    addReply(c,shared.ok);
    signalModifiedKey(c->db,c->argv[1]);
    notifyKeyspaceEvent(NOTIFY_LIST,"lset",c->argv[1],c->db->id);
    server.dirty++;
}
366 
/* Shared implementation of the non-blocking pop commands: pops one element
 * from the 'where' end of the list at argv[1] and replies with it. The key
 * is deleted when the list becomes empty. */
void popGenericCommand(client *c, int where) {
    robj *list = lookupKeyWriteOrReply(c,c->argv[1],shared.nullbulk);
    if (list == NULL) return;
    if (checkType(c,list,OBJ_LIST)) return;

    robj *popped = listTypePop(list,where);
    if (popped == NULL) {
        addReply(c,shared.nullbulk);
        return;
    }

    char *event = "rpop";
    if (where == LIST_HEAD) event = "lpop";

    addReplyBulk(c,popped);
    decrRefCount(popped);
    notifyKeyspaceEvent(NOTIFY_LIST,event,c->argv[1],c->db->id);

    if (listTypeLength(list) == 0) {
        /* Here the "del" event is emitted before the key is removed. */
        notifyKeyspaceEvent(NOTIFY_GENERIC,"del",c->argv[1],c->db->id);
        dbDelete(c->db,c->argv[1]);
    }
    signalModifiedKey(c->db,c->argv[1]);
    server.dirty++;
}
389 
/* Command entry point: pop from the head of the list. */
void lpopCommand(client *c) {
    popGenericCommand(c, LIST_HEAD);
}
393 
/* Command entry point: pop from the tail of the list. */
void rpopCommand(client *c) {
    popGenericCommand(c, LIST_TAIL);
}
397 
/* Reply with the elements of the list at argv[1] between the indexes
 * argv[2] (start) and argv[3] (end), both inclusive. Negative indexes count
 * from the end of the list. An empty range yields shared.emptymultibulk. */
void lrangeCommand(client *c) {
    long start, end;

    if (getLongFromObjectOrReply(c, c->argv[2], &start, NULL) != C_OK) return;
    if (getLongFromObjectOrReply(c, c->argv[3], &end, NULL) != C_OK) return;

    robj *o = lookupKeyReadOrReply(c,c->argv[1],shared.emptymultibulk);
    if (o == NULL) return;
    if (checkType(c,o,OBJ_LIST)) return;

    long llen = listTypeLength(o);

    /* Translate negative indexes into offsets from the head. */
    if (start < 0) start = llen+start;
    if (end < 0) end = llen+end;
    if (start < 0) start = 0;

    /* start is now >= 0, so a still-negative end also lands here.
     * Nothing to return when start > end or start is past the last item. */
    if (start > end || start >= llen) {
        addReply(c,shared.emptymultibulk);
        return;
    }
    if (end >= llen) end = llen-1;
    long rangelen = (end-start)+1;

    /* The reply is a multi-bulk of exactly 'rangelen' items. */
    addReplyMultiBulkLen(c,rangelen);

    if (o->encoding != OBJ_ENCODING_QUICKLIST) {
        serverPanic("List encoding is not QUICKLIST!");
        return;
    }

    listTypeIterator *iter = listTypeInitIterator(o, start, LIST_TAIL);
    for (long sent = 0; sent < rangelen; sent++) {
        listTypeEntry entry;
        listTypeNext(iter, &entry);

        quicklistEntry *qe = &entry.entry;
        if (qe->value)
            addReplyBulkCBuffer(c,qe->value,qe->sz);
        else
            addReplyBulkLongLong(c,qe->longval);
    }
    listTypeReleaseIterator(iter);
}
443 
/* Trim the list at argv[1] so that only the elements between the indexes
 * argv[2] (start) and argv[3] (end), both inclusive, are kept. Negative
 * indexes count from the end. The key is deleted when the list ends up
 * empty. A missing key is answered by the lookup helper with shared.ok. */
void ltrimCommand(client *c) {
    long start, end;

    if (getLongFromObjectOrReply(c, c->argv[2], &start, NULL) != C_OK) return;
    if (getLongFromObjectOrReply(c, c->argv[3], &end, NULL) != C_OK) return;

    robj *o = lookupKeyWriteOrReply(c,c->argv[1],shared.ok);
    if (o == NULL) return;
    if (checkType(c,o,OBJ_LIST)) return;

    long llen = listTypeLength(o);

    /* Translate negative indexes into offsets from the head. */
    if (start < 0) start = llen+start;
    if (end < 0) end = llen+end;
    if (start < 0) start = 0;

    /* Default: the range is empty (start > end, which includes a negative
     * end, or start past the last item), so everything is trimmed away. */
    long ltrim = llen;
    long rtrim = 0;
    if (start <= end && start < llen) {
        if (end >= llen) end = llen-1;
        ltrim = start;
        rtrim = llen-end-1;
    }

    /* Drop 'ltrim' items from the head, then 'rtrim' items from the tail. */
    if (o->encoding == OBJ_ENCODING_QUICKLIST) {
        quicklistDelRange(o->ptr,0,ltrim);
        quicklistDelRange(o->ptr,-rtrim,rtrim);
    } else {
        serverPanic("Unknown list encoding");
    }

    notifyKeyspaceEvent(NOTIFY_LIST,"ltrim",c->argv[1],c->db->id);
    if (listTypeLength(o) == 0) {
        dbDelete(c->db,c->argv[1]);
        notifyKeyspaceEvent(NOTIFY_GENERIC,"del",c->argv[1],c->db->id);
    }
    signalModifiedKey(c->db,c->argv[1]);
    server.dirty++;
    addReply(c,shared.ok);
}
489 
/* Remove elements equal to argv[3] from the list at argv[1].
 *
 * argv[2] is the count:
 *   count > 0: remove up to 'count' matches scanning from head to tail.
 *   count < 0: remove up to |count| matches scanning from tail to head.
 *   count = 0: remove every match.
 *
 * Replies with the number of removed elements; a missing key is answered
 * by the lookup helper with shared.czero. The key is deleted when the list
 * ends up empty. */
void lremCommand(client *c) {
    robj *subject, *obj;
    obj = c->argv[3];
    long toremove;
    long removed = 0;

    if ((getLongFromObjectOrReply(c, c->argv[2], &toremove, NULL) != C_OK))
        return;

    subject = lookupKeyWriteOrReply(c,c->argv[1],shared.czero);
    if (subject == NULL || checkType(c,subject,OBJ_LIST)) return;

    /* The sign of the count only selects the scan direction. The count is
     * never negated: negating the most negative long is signed overflow,
     * and the count comes straight from the client. */
    int from_tail = (toremove < 0);
    listTypeIterator *li;
    if (from_tail) {
        li = listTypeInitIterator(subject,-1,LIST_HEAD);
    } else {
        li = listTypeInitIterator(subject,0,LIST_TAIL);
    }

    listTypeEntry entry;
    while (listTypeNext(li,&entry)) {
        if (listTypeEqual(&entry,obj)) {
            listTypeDelete(li, &entry);
            server.dirty++;
            removed++;
            /* 'removed' is >= 1 here, so with a zero count neither test can
             * match and the scan continues to the end. For a negative count
             * the comparison is done on -removed, which cannot overflow. */
            if (from_tail) {
                if (-removed == toremove) break;
            } else {
                if (removed == toremove) break;
            }
        }
    }
    listTypeReleaseIterator(li);

    if (removed) {
        signalModifiedKey(c->db,c->argv[1]);
        notifyKeyspaceEvent(NOTIFY_LIST,"lrem",c->argv[1],c->db->id);
    }

    if (listTypeLength(subject) == 0) {
        dbDelete(c->db,c->argv[1]);
        notifyKeyspaceEvent(NOTIFY_GENERIC,"del",c->argv[1],c->db->id);
    }

    addReplyLongLong(c,removed);
}
533 
534 /* This is the semantic of this command:
535  *  RPOPLPUSH srclist dstlist:
536  *    IF LLEN(srclist) > 0
537  *      element = RPOP srclist
538  *      LPUSH dstlist element
539  *      RETURN element
540  *    ELSE
541  *      RETURN nil
542  *    END
543  *  END
544  *
545  * The idea is to be able to get an element from a list in a reliable way
546  * since the element is not just returned but pushed against another list
547  * as well. This command was originally proposed by Ezra Zygmuntowicz.
548  */
549 
/* Push half of RPOPLPUSH: push 'value' on the head of the list 'dstobj'
 * stored at 'dstkey', creating the list first when 'dstobj' is NULL, then
 * send 'value' to the client as the reply. */
void rpoplpushHandlePush(client *c, robj *dstkey, robj *dstobj, robj *value) {
    if (dstobj == NULL) {
        /* Destination key is missing: start a new list there. */
        dstobj = createQuicklistObject();
        quicklistSetOptions(dstobj->ptr, server.list_max_ziplist_size,
                            server.list_compress_depth);
        dbAdd(c->db,dstkey,dstobj);
    }

    signalModifiedKey(c->db,dstkey);
    listTypePush(dstobj,value,LIST_HEAD);
    notifyKeyspaceEvent(NOTIFY_LIST,"lpush",dstkey,c->db->id);

    /* The pushed value is always what the client gets back. */
    addReplyBulk(c,value);
}
564 
/* RPOPLPUSH: pop the tail element of the list at argv[1] and push it on the
 * head of the list at argv[2], replying with the moved element. */
void rpoplpushCommand(client *c) {
    robj *sobj = lookupKeyWriteOrReply(c,c->argv[1],shared.nullbulk);
    if (sobj == NULL) return;
    if (checkType(c,sobj,OBJ_LIST)) return;

    if (listTypeLength(sobj) == 0) {
        /* Only possible after loading very old RDB files: recent versions
         * of Redis delete the keys of empty lists. */
        addReply(c,shared.nullbulk);
        return;
    }

    robj *dobj = lookupKeyWrite(c->db,c->argv[2]);
    robj *touchedkey = c->argv[1];

    if (dobj != NULL && checkType(c,dobj,OBJ_LIST)) return;
    robj *value = listTypePop(sobj,LIST_TAIL);

    /* Keep our own reference to the source key: rpoplpushHandlePush may
     * change the client argument vector (it currently does not). */
    incrRefCount(touchedkey);
    rpoplpushHandlePush(c,c->argv[2],dobj,value);

    /* Release the reference we got from listTypePop. */
    decrRefCount(value);

    notifyKeyspaceEvent(NOTIFY_LIST,"rpop",touchedkey,c->db->id);
    if (listTypeLength(sobj) == 0) {
        /* The source list is now empty: remove its key. */
        dbDelete(c->db,touchedkey);
        notifyKeyspaceEvent(NOTIFY_GENERIC,"del",touchedkey,c->db->id);
    }
    signalModifiedKey(c->db,touchedkey);
    decrRefCount(touchedkey);
    server.dirty++;

    /* When reached through BRPOPLPUSH, rewrite the command vector as a
     * plain RPOPLPUSH of the same two keys. */
    if (c->cmd->proc == brpoplpushCommand)
        rewriteClientCommandVector(c,3,shared.rpoplpush,c->argv[1],c->argv[2]);
}
604 
605 /*-----------------------------------------------------------------------------
606  * Blocking POP operations
607  *----------------------------------------------------------------------------*/
608 
609 /* This is a helper function for handleClientsBlockedOnLists(). It's work
610  * is to serve a specific client (receiver) that is blocked on 'key'
611  * in the context of the specified 'db', doing the following:
612  *
613  * 1) Provide the client with the 'value' element.
614  * 2) If the dstkey is not NULL (we are serving a BRPOPLPUSH) also push the
615  *    'value' element on the destination list (the LPUSH side of the command).
616  * 3) Propagate the resulting BRPOP, BLPOP and additional LPUSH if any into
617  *    the AOF and replication channel.
618  *
619  * The argument 'where' is LIST_TAIL or LIST_HEAD, and indicates if the
620  * 'value' element was popped from the head (BLPOP) or tail (BRPOP) so that
621  * we can propagate the command properly.
622  *
623  * The function returns C_OK if we are able to serve the client, otherwise
624  * C_ERR is returned to signal the caller that the list POP operation
625  * should be undone as the client was not served: This only happens for
626  * BRPOPLPUSH that fails to push the value to the destination key as it is
627  * of the wrong type. */
serveClientBlockedOnList(client * receiver,robj * key,robj * dstkey,redisDb * db,robj * value,int where)628 int serveClientBlockedOnList(client *receiver, robj *key, robj *dstkey, redisDb *db, robj *value, int where)
629 {
630     robj *argv[3];
631 
632     if (dstkey == NULL) {
633         /* Propagate the [LR]POP operation. */
634         argv[0] = (where == LIST_HEAD) ? shared.lpop :
635                                           shared.rpop;
636         argv[1] = key;
637         propagate((where == LIST_HEAD) ?
638             server.lpopCommand : server.rpopCommand,
639             db->id,argv,2,PROPAGATE_AOF|PROPAGATE_REPL);
640 
641         /* BRPOP/BLPOP */
642         addReplyMultiBulkLen(receiver,2);
643         addReplyBulk(receiver,key);
644         addReplyBulk(receiver,value);
645 
646         /* Notify event. */
647         char *event = (where == LIST_HEAD) ? "lpop" : "rpop";
648         notifyKeyspaceEvent(NOTIFY_LIST,event,key,receiver->db->id);
649     } else {
650         /* BRPOPLPUSH */
651         robj *dstobj =
652             lookupKeyWrite(receiver->db,dstkey);
653         if (!(dstobj &&
654              checkType(receiver,dstobj,OBJ_LIST)))
655         {
656             /* Propagate the RPOP operation. */
657             argv[0] = shared.rpop;
658             argv[1] = key;
659             propagate(server.rpopCommand,
660                 db->id,argv,2,
661                 PROPAGATE_AOF|
662                 PROPAGATE_REPL);
663             rpoplpushHandlePush(receiver,dstkey,dstobj,
664                 value);
665             /* Propagate the LPUSH operation. */
666             argv[0] = shared.lpush;
667             argv[1] = dstkey;
668             argv[2] = value;
669             propagate(server.lpushCommand,
670                 db->id,argv,3,
671                 PROPAGATE_AOF|
672                 PROPAGATE_REPL);
673 
674             /* Notify event ("lpush" was notified by rpoplpushHandlePush). */
675             notifyKeyspaceEvent(NOTIFY_LIST,"rpop",key,receiver->db->id);
676         } else {
677             /* BRPOPLPUSH failed because of wrong
678              * destination type. */
679             return C_ERR;
680         }
681     }
682     return C_OK;
683 }
684 
685 /* Blocking RPOP/LPOP */
blockingPopGenericCommand(client * c,int where)686 void blockingPopGenericCommand(client *c, int where) {
687     robj *o;
688     mstime_t timeout;
689     int j;
690 
691     if (getTimeoutFromObjectOrReply(c,c->argv[c->argc-1],&timeout,UNIT_SECONDS)
692         != C_OK) return;
693 
694     for (j = 1; j < c->argc-1; j++) {
695         o = lookupKeyWrite(c->db,c->argv[j]);
696         if (o != NULL) {
697             if (o->type != OBJ_LIST) {
698                 addReply(c,shared.wrongtypeerr);
699                 return;
700             } else {
701                 if (listTypeLength(o) != 0) {
702                     /* Non empty list, this is like a non normal [LR]POP. */
703                     char *event = (where == LIST_HEAD) ? "lpop" : "rpop";
704                     robj *value = listTypePop(o,where);
705                     serverAssert(value != NULL);
706 
707                     addReplyMultiBulkLen(c,2);
708                     addReplyBulk(c,c->argv[j]);
709                     addReplyBulk(c,value);
710                     decrRefCount(value);
711                     notifyKeyspaceEvent(NOTIFY_LIST,event,
712                                         c->argv[j],c->db->id);
713                     if (listTypeLength(o) == 0) {
714                         dbDelete(c->db,c->argv[j]);
715                         notifyKeyspaceEvent(NOTIFY_GENERIC,"del",
716                                             c->argv[j],c->db->id);
717                     }
718                     signalModifiedKey(c->db,c->argv[j]);
719                     server.dirty++;
720 
721                     /* Replicate it as an [LR]POP instead of B[LR]POP. */
722                     rewriteClientCommandVector(c,2,
723                         (where == LIST_HEAD) ? shared.lpop : shared.rpop,
724                         c->argv[j]);
725                     return;
726                 }
727             }
728         }
729     }
730 
731     /* If we are inside a MULTI/EXEC and the list is empty the only thing
732      * we can do is treating it as a timeout (even with timeout 0). */
733     if (c->flags & CLIENT_MULTI) {
734         addReply(c,shared.nullmultibulk);
735         return;
736     }
737 
738     /* If the list is empty or the key does not exists we must block */
739     blockForKeys(c,BLOCKED_LIST,c->argv + 1,c->argc - 2,timeout,NULL,NULL);
740 }
741 
/* Command entry point: blocking pop from the head of the list. */
void blpopCommand(client *c) {
    blockingPopGenericCommand(c, LIST_HEAD);
}
745 
/* Command entry point: blocking pop from the tail of the list. */
void brpopCommand(client *c) {
    blockingPopGenericCommand(c, LIST_TAIL);
}
749 
/* BRPOPLPUSH: argv[1] is the source key, argv[2] the destination key and
 * argv[3] the timeout in seconds. When the source key exists and is a list
 * the plain rpoplpushCommand() is run; when it does not exist the client
 * blocks on it (or gets a null reply inside MULTI). */
void brpoplpushCommand(client *c) {
    mstime_t timeout;

    if (getTimeoutFromObjectOrReply(c,c->argv[3],&timeout,UNIT_SECONDS)
        != C_OK) return;

    robj *key = lookupKeyWrite(c->db, c->argv[1]);

    if (key != NULL) {
        if (key->type != OBJ_LIST) {
            addReply(c, shared.wrongtypeerr);
            return;
        }
        /* An existing list is asserted to be non-empty, so the regular
         * RPOPLPUSH can be executed right away. */
        serverAssertWithInfo(c,key,listTypeLength(key) > 0);
        rpoplpushCommand(c);
        return;
    }

    if (c->flags & CLIENT_MULTI) {
        /* Blocking on a missing list inside MULTI returns immediately. */
        addReply(c, shared.nullbulk);
        return;
    }

    /* The source key does not exist: block, remembering the destination. */
    blockForKeys(c,BLOCKED_LIST,c->argv + 1,1,timeout,c->argv[2],NULL);
}
778