1 /*
2 * Copyright (c) 2009-2012, Salvatore Sanfilippo <antirez at gmail dot com>
3 * All rights reserved.
4 *
5 * Redistribution and use in source and binary forms, with or without
6 * modification, are permitted provided that the following conditions are met:
7 *
8 * * Redistributions of source code must retain the above copyright notice,
9 * this list of conditions and the following disclaimer.
10 * * Redistributions in binary form must reproduce the above copyright
11 * notice, this list of conditions and the following disclaimer in the
12 * documentation and/or other materials provided with the distribution.
13 * * Neither the name of Redis nor the names of its contributors may be used
14 * to endorse or promote products derived from this software without
15 * specific prior written permission.
16 *
17 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
18 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
19 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
20 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
21 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
22 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
23 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
24 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
25 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
26 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27 * POSSIBILITY OF SUCH DAMAGE.
28 */
29
30 #include "server.h"
31
32 /*-----------------------------------------------------------------------------
33 * List API
34 *----------------------------------------------------------------------------*/
35
36 /* The function pushes an element to the specified list object 'subject',
37 * at head or tail position as specified by 'where'.
38 *
39 * There is no need for the caller to increment the refcount of 'value' as
40 * the function takes care of it if needed. */
listTypePush(robj * subject,robj * value,int where)41 void listTypePush(robj *subject, robj *value, int where) {
42 if (subject->encoding == OBJ_ENCODING_QUICKLIST) {
43 int pos = (where == LIST_HEAD) ? QUICKLIST_HEAD : QUICKLIST_TAIL;
44 value = getDecodedObject(value);
45 size_t len = sdslen(value->ptr);
46 quicklistPush(subject->ptr, value->ptr, len, pos);
47 decrRefCount(value);
48 } else {
49 serverPanic("Unknown list encoding");
50 }
51 }
52
listPopSaver(unsigned char * data,unsigned int sz)53 void *listPopSaver(unsigned char *data, unsigned int sz) {
54 return createStringObject((char*)data,sz);
55 }
56
/* Pop one element from the list 'subject' and return it as a string object.
 * LIST_HEAD pops from the head, any other 'where' pops from the tail.
 *
 * Returns NULL when quicklistPopCustom() reports that nothing was popped.
 * Callers in this file release the returned object with decrRefCount().
 *
 * Panics when 'subject' is not quicklist encoded. */
robj *listTypePop(robj *subject, int where) {
    robj *popped = NULL;
    long long num;
    int ql_end = QUICKLIST_TAIL;

    if (where == LIST_HEAD) ql_end = QUICKLIST_HEAD;

    if (subject->encoding != OBJ_ENCODING_QUICKLIST) {
        serverPanic("Unknown list encoding");
        return popped;
    }

    int got = quicklistPopCustom(subject->ptr, ql_end,
                                 (unsigned char **)&popped, NULL, &num,
                                 listPopSaver);
    /* A successful pop that left 'popped' NULL delivered its value through
     * 'num' instead (presumably an integer-encoded entry), so build the
     * string object from the number. */
    if (got && popped == NULL)
        popped = createStringObjectFromLongLong(num);
    return popped;
}
73
listTypeLength(const robj * subject)74 unsigned long listTypeLength(const robj *subject) {
75 if (subject->encoding == OBJ_ENCODING_QUICKLIST) {
76 return quicklistCount(subject->ptr);
77 } else {
78 serverPanic("Unknown list encoding");
79 }
80 }
81
82 /* Initialize an iterator at the specified index. */
listTypeInitIterator(robj * subject,long index,unsigned char direction)83 listTypeIterator *listTypeInitIterator(robj *subject, long index,
84 unsigned char direction) {
85 listTypeIterator *li = zmalloc(sizeof(listTypeIterator));
86 li->subject = subject;
87 li->encoding = subject->encoding;
88 li->direction = direction;
89 li->iter = NULL;
90 /* LIST_HEAD means start at TAIL and move *towards* head.
91 * LIST_TAIL means start at HEAD and move *towards tail. */
92 int iter_direction =
93 direction == LIST_HEAD ? AL_START_TAIL : AL_START_HEAD;
94 if (li->encoding == OBJ_ENCODING_QUICKLIST) {
95 li->iter = quicklistGetIteratorAtIdx(li->subject->ptr,
96 iter_direction, index);
97 } else {
98 serverPanic("Unknown list encoding");
99 }
100 return li;
101 }
102
103 /* Clean up the iterator. */
listTypeReleaseIterator(listTypeIterator * li)104 void listTypeReleaseIterator(listTypeIterator *li) {
105 zfree(li->iter);
106 zfree(li);
107 }
108
109 /* Stores pointer to current the entry in the provided entry structure
110 * and advances the position of the iterator. Returns 1 when the current
111 * entry is in fact an entry, 0 otherwise. */
listTypeNext(listTypeIterator * li,listTypeEntry * entry)112 int listTypeNext(listTypeIterator *li, listTypeEntry *entry) {
113 /* Protect from converting when iterating */
114 serverAssert(li->subject->encoding == li->encoding);
115
116 entry->li = li;
117 if (li->encoding == OBJ_ENCODING_QUICKLIST) {
118 return quicklistNext(li->iter, &entry->entry);
119 } else {
120 serverPanic("Unknown list encoding");
121 }
122 return 0;
123 }
124
125 /* Return entry or NULL at the current position of the iterator. */
listTypeGet(listTypeEntry * entry)126 robj *listTypeGet(listTypeEntry *entry) {
127 robj *value = NULL;
128 if (entry->li->encoding == OBJ_ENCODING_QUICKLIST) {
129 if (entry->entry.value) {
130 value = createStringObject((char *)entry->entry.value,
131 entry->entry.sz);
132 } else {
133 value = createStringObjectFromLongLong(entry->entry.longval);
134 }
135 } else {
136 serverPanic("Unknown list encoding");
137 }
138 return value;
139 }
140
/* Insert 'value' next to 'entry': after it when 'where' is LIST_TAIL,
 * before it when 'where' is LIST_HEAD. Any other 'where' inserts nothing.
 *
 * The value goes through getDecodedObject() and that reference is released
 * before returning, so the caller's refcount on 'value' is left alone.
 * Panics on a non-quicklist encoding. */
void listTypeInsert(listTypeEntry *entry, robj *value, int where) {
    if (entry->li->encoding != OBJ_ENCODING_QUICKLIST) {
        serverPanic("Unknown list encoding");
        return;
    }

    robj *decoded = getDecodedObject(value);
    sds payload = decoded->ptr;
    size_t payload_len = sdslen(payload);

    switch (where) {
    case LIST_TAIL:
        quicklistInsertAfter((quicklist *)entry->entry.quicklist,
                             &entry->entry, payload, payload_len);
        break;
    case LIST_HEAD:
        quicklistInsertBefore((quicklist *)entry->entry.quicklist,
                              &entry->entry, payload, payload_len);
        break;
    default:
        break;
    }
    decrRefCount(decoded);
}
158
159 /* Compare the given object with the entry at the current position. */
listTypeEqual(listTypeEntry * entry,robj * o)160 int listTypeEqual(listTypeEntry *entry, robj *o) {
161 if (entry->li->encoding == OBJ_ENCODING_QUICKLIST) {
162 serverAssertWithInfo(NULL,o,sdsEncodedObject(o));
163 return quicklistCompare(entry->entry.zi,o->ptr,sdslen(o->ptr));
164 } else {
165 serverPanic("Unknown list encoding");
166 }
167 }
168
169 /* Delete the element pointed to. */
listTypeDelete(listTypeIterator * iter,listTypeEntry * entry)170 void listTypeDelete(listTypeIterator *iter, listTypeEntry *entry) {
171 if (entry->li->encoding == OBJ_ENCODING_QUICKLIST) {
172 quicklistDelEntry(iter->iter, &entry->entry);
173 } else {
174 serverPanic("Unknown list encoding");
175 }
176 }
177
178 /* Create a quicklist from a single ziplist */
listTypeConvert(robj * subject,int enc)179 void listTypeConvert(robj *subject, int enc) {
180 serverAssertWithInfo(NULL,subject,subject->type==OBJ_LIST);
181 serverAssertWithInfo(NULL,subject,subject->encoding==OBJ_ENCODING_ZIPLIST);
182
183 if (enc == OBJ_ENCODING_QUICKLIST) {
184 size_t zlen = server.list_max_ziplist_size;
185 int depth = server.list_compress_depth;
186 subject->ptr = quicklistCreateFromZiplist(zlen, depth, subject->ptr);
187 subject->encoding = OBJ_ENCODING_QUICKLIST;
188 } else {
189 serverPanic("Unsupported list conversion");
190 }
191 }
192
193 /*-----------------------------------------------------------------------------
194 * List Commands
195 *----------------------------------------------------------------------------*/
196
/* Shared implementation of LPUSH / RPUSH.
 *
 * Pushes argv[2..argc-1] onto the list stored at argv[1], at the end
 * selected by 'where'. When the key holds no object, a quicklist object is
 * created and added to the database before the first push. When the key
 * holds a non-list object the command replies with shared.wrongtypeerr and
 * changes nothing.
 *
 * Replies with the resulting list length. If anything was pushed, the key
 * is signalled as modified and an "lpush"/"rpush" keyspace event is fired. */
void pushGenericCommand(client *c, int where) {
    int pushed = 0;
    robj *lobj = lookupKeyWrite(c->db,c->argv[1]);

    if (lobj != NULL && lobj->type != OBJ_LIST) {
        addReply(c,shared.wrongtypeerr);
        return;
    }

    for (int j = 2; j < c->argc; j++, pushed++) {
        if (lobj == NULL) {
            /* Create the list lazily, only when there is a value to push. */
            lobj = createQuicklistObject();
            quicklistSetOptions(lobj->ptr, server.list_max_ziplist_size,
                                server.list_compress_depth);
            dbAdd(c->db,c->argv[1],lobj);
        }
        listTypePush(lobj,c->argv[j],where);
    }

    addReplyLongLong(c, (lobj != NULL ? listTypeLength(lobj) : 0));

    if (pushed != 0) {
        char *event = "rpush";
        if (where == LIST_HEAD) event = "lpush";

        signalModifiedKey(c->db,c->argv[1]);
        notifyKeyspaceEvent(NOTIFY_LIST,event,c->argv[1],c->db->id);
    }
    server.dirty += pushed;
}
225
/* LPUSH entry point: pushGenericCommand() at the head of the list. */
void lpushCommand(client *c) {
    pushGenericCommand(c, LIST_HEAD);
}
229
/* RPUSH entry point: pushGenericCommand() at the tail of the list. */
void rpushCommand(client *c) {
    pushGenericCommand(c, LIST_TAIL);
}
233
/* Shared implementation of LPUSHX / RPUSHX.
 *
 * Unlike pushGenericCommand() the list is never created here: the command
 * stops when lookupKeyWriteOrReply() (called with shared.czero as the
 * reply) returns NULL or when checkType() rejects the object. Otherwise
 * argv[2..argc-1] are pushed at the end selected by 'where' and the
 * resulting length is sent as the reply. */
void pushxGenericCommand(client *c, int where) {
    robj *subject = lookupKeyWriteOrReply(c,c->argv[1],shared.czero);
    if (subject == NULL) return;
    if (checkType(c,subject,OBJ_LIST)) return;

    int pushed = 0;
    for (int j = 2; j < c->argc; j++) {
        listTypePush(subject,c->argv[j],where);
        pushed++;
    }

    addReplyLongLong(c,listTypeLength(subject));

    if (pushed != 0) {
        char *event = "rpush";
        if (where == LIST_HEAD) event = "lpush";

        signalModifiedKey(c->db,c->argv[1]);
        notifyKeyspaceEvent(NOTIFY_LIST,event,c->argv[1],c->db->id);
    }
    server.dirty += pushed;
}
255
/* LPUSHX entry point: pushxGenericCommand() at the head of the list. */
void lpushxCommand(client *c) {
    pushxGenericCommand(c, LIST_HEAD);
}
259
/* RPUSHX entry point: pushxGenericCommand() at the tail of the list. */
void rpushxCommand(client *c) {
    pushxGenericCommand(c, LIST_TAIL);
}
263
/* LINSERT key BEFORE|AFTER pivot value
 *
 * Scans the list from head to tail for the first entry equal to the pivot
 * (argv[3]) and inserts argv[4] before or after it.
 *
 * Replies:
 *   - shared.syntaxerr when argv[2] is neither "before" nor "after"
 *     (compared case-insensitively);
 *   - shared.cnegone when no entry matches the pivot;
 *   - the new list length after a successful insert.
 * The command also stops early when lookupKeyWriteOrReply() (called with
 * shared.czero) returns NULL or checkType() rejects the object. */
void linsertCommand(client *c) {
    int where;
    const char *position = c->argv[2]->ptr;

    if (strcasecmp(position,"after") == 0) {
        where = LIST_TAIL;
    } else if (strcasecmp(position,"before") == 0) {
        where = LIST_HEAD;
    } else {
        addReply(c,shared.syntaxerr);
        return;
    }

    robj *subject = lookupKeyWriteOrReply(c,c->argv[1],shared.czero);
    if (subject == NULL) return;
    if (checkType(c,subject,OBJ_LIST)) return;

    /* Seek pivot from head to tail, stopping at the first match. */
    listTypeEntry entry;
    int inserted = 0;
    listTypeIterator *iter = listTypeInitIterator(subject,0,LIST_TAIL);
    while (!inserted && listTypeNext(iter,&entry)) {
        if (listTypeEqual(&entry,c->argv[3])) {
            listTypeInsert(&entry,c->argv[4],where);
            inserted = 1;
        }
    }
    listTypeReleaseIterator(iter);

    if (!inserted) {
        /* Notify client of a failed insert */
        addReply(c,shared.cnegone);
        return;
    }

    signalModifiedKey(c->db,c->argv[1]);
    notifyKeyspaceEvent(NOTIFY_LIST,"linsert",c->argv[1],c->db->id);
    server.dirty++;

    addReplyLongLong(c,listTypeLength(subject));
}
307
/* LLEN key: reply with the length of the list stored at argv[1]. The
 * command stops early when lookupKeyReadOrReply() (called with
 * shared.czero) returns NULL or checkType() rejects the object. */
void llenCommand(client *c) {
    robj *o = lookupKeyReadOrReply(c,c->argv[1],shared.czero);

    if (o == NULL) return;
    if (checkType(c,o,OBJ_LIST)) return;

    addReplyLongLong(c,listTypeLength(o));
}
313
/* LINDEX key index: reply with the element at 'index' as a bulk string, or
 * with shared.nullbulk when quicklistIndex() finds no entry there.
 *
 * The command stops early when the key lookup (called with shared.nullbulk)
 * returns NULL, when checkType() rejects the object, or when argv[2] cannot
 * be parsed as a long. Panics on a non-quicklist encoding. */
void lindexCommand(client *c) {
    robj *o = lookupKeyReadOrReply(c,c->argv[1],shared.nullbulk);
    if (o == NULL || checkType(c,o,OBJ_LIST)) return;

    long index;
    if (getLongFromObjectOrReply(c, c->argv[2], &index, NULL) != C_OK)
        return;

    if (o->encoding != OBJ_ENCODING_QUICKLIST) {
        serverPanic("Unknown list encoding");
        return;
    }

    quicklistEntry entry;
    if (!quicklistIndex(o->ptr, index, &entry)) {
        addReply(c,shared.nullbulk);
        return;
    }

    /* Entries without a byte buffer carry their value in 'longval'. */
    robj *value = entry.value
        ? createStringObject((char*)entry.value,entry.sz)
        : createStringObjectFromLongLong(entry.longval);
    addReplyBulk(c,value);
    decrRefCount(value);
}
340
/* LSET key index value: replace the element at 'index' with argv[3].
 *
 * Replies with shared.ok on success and shared.outofrangeerr when
 * quicklistReplaceAtIndex() reports that nothing was replaced. On success
 * the key is signalled as modified, an "lset" keyspace event is fired and
 * server.dirty is incremented.
 *
 * The command stops early when the key lookup (called with
 * shared.nokeyerr) returns NULL, when checkType() rejects the object, or
 * when argv[2] cannot be parsed as a long. Panics on a non-quicklist
 * encoding. */
void lsetCommand(client *c) {
    robj *o = lookupKeyWriteOrReply(c,c->argv[1],shared.nokeyerr);
    if (o == NULL || checkType(c,o,OBJ_LIST)) return;
    long index;

    if ((getLongFromObjectOrReply(c, c->argv[2], &index, NULL) != C_OK))
        return;

    if (o->encoding == OBJ_ENCODING_QUICKLIST) {
        quicklist *ql = o->ptr;
        /* Go through getDecodedObject(), as listTypePush() and
         * listTypeInsert() do, so that 'ptr' is only handed to sdslen()
         * once it is known to be a string buffer. The reference obtained
         * here is released right after the replace. */
        robj *value = getDecodedObject(c->argv[3]);
        int replaced = quicklistReplaceAtIndex(ql, index,
                                               value->ptr, sdslen(value->ptr));
        decrRefCount(value);

        if (!replaced) {
            addReply(c,shared.outofrangeerr);
        } else {
            addReply(c,shared.ok);
            signalModifiedKey(c->db,c->argv[1]);
            notifyKeyspaceEvent(NOTIFY_LIST,"lset",c->argv[1],c->db->id);
            server.dirty++;
        }
    } else {
        serverPanic("Unknown list encoding");
    }
}
366
/* Shared implementation of LPOP / RPOP.
 *
 * Pops one element from the end selected by 'where' and sends it as a bulk
 * reply; replies with shared.nullbulk when listTypePop() returns NULL.
 * After a successful pop an "lpop"/"rpop" keyspace event is fired, and when
 * the list became empty a "del" event is fired and the key is removed from
 * the database. */
void popGenericCommand(client *c, int where) {
    robj *o = lookupKeyWriteOrReply(c,c->argv[1],shared.nullbulk);
    if (o == NULL) return;
    if (checkType(c,o,OBJ_LIST)) return;

    robj *value = listTypePop(o,where);
    if (value == NULL) {
        addReply(c,shared.nullbulk);
        return;
    }

    char *event = "rpop";
    if (where == LIST_HEAD) event = "lpop";

    addReplyBulk(c,value);
    decrRefCount(value);
    notifyKeyspaceEvent(NOTIFY_LIST,event,c->argv[1],c->db->id);

    int now_empty = (listTypeLength(o) == 0);
    if (now_empty) {
        notifyKeyspaceEvent(NOTIFY_GENERIC,"del",c->argv[1],c->db->id);
        dbDelete(c->db,c->argv[1]);
    }
    signalModifiedKey(c->db,c->argv[1]);
    server.dirty++;
}
389
/* LPOP entry point: popGenericCommand() from the head of the list. */
void lpopCommand(client *c) {
    popGenericCommand(c, LIST_HEAD);
}
393
/* RPOP entry point: popGenericCommand() from the tail of the list. */
void rpopCommand(client *c) {
    popGenericCommand(c, LIST_TAIL);
}
397
/* LRANGE key start end
 *
 * Replies with the elements between 'start' and 'end' (both inclusive) as
 * a multi-bulk reply. Negative indexes count from the end of the list; an
 * index that is still negative after that adjustment is clamped to 0 for
 * 'start', and 'end' is clamped to the last element. An empty range replies
 * with shared.emptymultibulk. */
void lrangeCommand(client *c) {
    long start, end;

    if (getLongFromObjectOrReply(c, c->argv[2], &start, NULL) != C_OK) return;
    if (getLongFromObjectOrReply(c, c->argv[3], &end, NULL) != C_OK) return;

    robj *o = lookupKeyReadOrReply(c,c->argv[1],shared.emptymultibulk);
    if (o == NULL || checkType(c,o,OBJ_LIST)) return;

    long llen = listTypeLength(o);

    /* convert negative indexes */
    if (start < 0) start += llen;
    if (end < 0) end += llen;
    if (start < 0) start = 0;

    /* Invariant: start >= 0, so this test will be true when end < 0.
     * The range is empty when start > end or start >= length. */
    if (start > end || start >= llen) {
        addReply(c,shared.emptymultibulk);
        return;
    }
    if (end >= llen) end = llen-1;

    /* At this point start <= end, so there is at least one element. */
    long rangelen = (end-start)+1;

    /* Return the result in form of a multi-bulk reply */
    addReplyMultiBulkLen(c,rangelen);
    if (o->encoding != OBJ_ENCODING_QUICKLIST) {
        serverPanic("List encoding is not QUICKLIST!");
        return;
    }

    listTypeIterator *iter = listTypeInitIterator(o, start, LIST_TAIL);
    for (long left = rangelen; left > 0; left--) {
        listTypeEntry entry;
        listTypeNext(iter, &entry);

        quicklistEntry *qe = &entry.entry;
        if (qe->value) {
            addReplyBulkCBuffer(c,qe->value,qe->sz);
        } else {
            addReplyBulkLongLong(c,qe->longval);
        }
    }
    listTypeReleaseIterator(iter);
}
443
/* LTRIM key start end
 *
 * Keeps only the elements between 'start' and 'end' (both inclusive) and
 * deletes the rest. Indexes are normalized the same way LRANGE does it; an
 * empty range removes every element. When the list ends up empty the key
 * is deleted and a "del" keyspace event is fired. Replies with shared.ok. */
void ltrimCommand(client *c) {
    long start, end;

    if (getLongFromObjectOrReply(c, c->argv[2], &start, NULL) != C_OK) return;
    if (getLongFromObjectOrReply(c, c->argv[3], &end, NULL) != C_OK) return;

    robj *o = lookupKeyWriteOrReply(c,c->argv[1],shared.ok);
    if (o == NULL || checkType(c,o,OBJ_LIST)) return;

    long llen = listTypeLength(o);

    /* convert negative indexes */
    if (start < 0) start += llen;
    if (end < 0) end += llen;
    if (start < 0) start = 0;

    /* Default: out of range start or start > end result in empty list,
     * i.e. everything is trimmed from the left. */
    long ltrim = llen;
    long rtrim = 0;

    /* Invariant: start >= 0, so the range is also empty when end < 0. */
    if (start <= end && start < llen) {
        if (end >= llen) end = llen-1;
        ltrim = start;
        rtrim = llen-end-1;
    }

    /* Remove list elements to perform the trim */
    if (o->encoding == OBJ_ENCODING_QUICKLIST) {
        quicklistDelRange(o->ptr,0,ltrim);
        quicklistDelRange(o->ptr,-rtrim,rtrim);
    } else {
        serverPanic("Unknown list encoding");
    }

    notifyKeyspaceEvent(NOTIFY_LIST,"ltrim",c->argv[1],c->db->id);

    int now_empty = (listTypeLength(o) == 0);
    if (now_empty) {
        dbDelete(c->db,c->argv[1]);
        notifyKeyspaceEvent(NOTIFY_GENERIC,"del",c->argv[1],c->db->id);
    }
    signalModifiedKey(c->db,c->argv[1]);
    server.dirty++;
    addReply(c,shared.ok);
}
489
/* LREM key count value
 *
 * Removes entries equal to argv[3] from the list stored at argv[1]:
 *   count > 0: scan from head to tail, remove at most 'count' matches.
 *   count < 0: scan from tail to head, remove at most |count| matches.
 *   count = 0: remove every match.
 *
 * Replies with the number of removed entries. If anything was removed, the
 * key is signalled as modified and an "lrem" keyspace event is fired. When
 * the list ends up empty the key is deleted and a "del" event is fired.
 *
 * The command stops early when argv[2] cannot be parsed as a long, when the
 * key lookup (called with shared.czero) returns NULL, or when checkType()
 * rejects the object. */
void lremCommand(client *c) {
    robj *subject, *obj;
    obj = c->argv[3];
    long toremove;
    unsigned long limit;    /* Max entries to remove, 0 means no limit. */
    long removed = 0;

    if ((getLongFromObjectOrReply(c, c->argv[2], &toremove, NULL) != C_OK))
        return;

    subject = lookupKeyWriteOrReply(c,c->argv[1],shared.czero);
    if (subject == NULL || checkType(c,subject,OBJ_LIST)) return;

    listTypeIterator *li;
    if (toremove < 0) {
        /* Take the absolute value in unsigned arithmetic: negating a long
         * that holds LONG_MIN is signed overflow, which is undefined
         * behavior. */
        limit = 0UL - (unsigned long)toremove;
        li = listTypeInitIterator(subject,-1,LIST_HEAD);
    } else {
        limit = (unsigned long)toremove;
        li = listTypeInitIterator(subject,0,LIST_TAIL);
    }

    listTypeEntry entry;
    while (listTypeNext(li,&entry)) {
        if (listTypeEqual(&entry,obj)) {
            listTypeDelete(li, &entry);
            server.dirty++;
            removed++;
            if (limit && (unsigned long)removed == limit) break;
        }
    }
    listTypeReleaseIterator(li);

    if (removed) {
        signalModifiedKey(c->db,c->argv[1]);
        notifyKeyspaceEvent(NOTIFY_LIST,"lrem",c->argv[1],c->db->id);
    }

    if (listTypeLength(subject) == 0) {
        dbDelete(c->db,c->argv[1]);
        notifyKeyspaceEvent(NOTIFY_GENERIC,"del",c->argv[1],c->db->id);
    }

    addReplyLongLong(c,removed);
}
533
/* These are the semantics of this command:
535 * RPOPLPUSH srclist dstlist:
536 * IF LLEN(srclist) > 0
537 * element = RPOP srclist
538 * LPUSH dstlist element
539 * RETURN element
540 * ELSE
541 * RETURN nil
542 * END
543 * END
544 *
545 * The idea is to be able to get an element from a list in a reliable way
546 * since the element is not just returned but pushed against another list
547 * as well. This command was originally proposed by Ezra Zygmuntowicz.
548 */
549
/* Push side of RPOPLPUSH / BRPOPLPUSH.
 *
 * Pushes 'value' at the head of the destination list. 'dstobj' is the
 * object currently stored at 'dstkey', or NULL when there is none, in which
 * case a new quicklist object is created and added to the database first.
 * The destination key is signalled as modified, an "lpush" keyspace event
 * is fired and 'value' is sent to the client as a bulk reply. */
void rpoplpushHandlePush(client *c, robj *dstkey, robj *dstobj, robj *value) {
    redisDb *db = c->db;

    /* Create the list if the key does not exist */
    if (dstobj == NULL) {
        dstobj = createQuicklistObject();
        quicklistSetOptions(dstobj->ptr, server.list_max_ziplist_size,
                            server.list_compress_depth);
        dbAdd(db,dstkey,dstobj);
    }

    signalModifiedKey(db,dstkey);
    listTypePush(dstobj,value,LIST_HEAD);
    notifyKeyspaceEvent(NOTIFY_LIST,"lpush",dstkey,db->id);

    /* Always send the pushed value to the client. */
    addReplyBulk(c,value);
}
564
/* RPOPLPUSH srclist dstlist
 *
 * Pops the tail element of the source list (argv[1]) and pushes it at the
 * head of the destination list (argv[2]) through rpoplpushHandlePush(),
 * which also sends the element to the client. When the source list becomes
 * empty its key is deleted. When this function runs on behalf of
 * BRPOPLPUSH, the client command vector is rewritten to a plain RPOPLPUSH. */
void rpoplpushCommand(client *c) {
    robj *sobj = lookupKeyWriteOrReply(c,c->argv[1],shared.nullbulk);
    if (sobj == NULL) return;
    if (checkType(c,sobj,OBJ_LIST)) return;

    if (listTypeLength(sobj) == 0) {
        /* This may only happen after loading very old RDB files. Recent
         * versions of Redis delete keys of empty lists. */
        addReply(c,shared.nullbulk);
        return;
    }

    robj *dobj = lookupKeyWrite(c->db,c->argv[2]);
    robj *touchedkey = c->argv[1];

    /* Check the destination type before popping anything. */
    if (dobj != NULL && checkType(c,dobj,OBJ_LIST)) return;

    robj *value = listTypePop(sobj,LIST_TAIL);

    /* We saved touched key, and protect it, since rpoplpushHandlePush
     * may change the client command argument vector (it does not
     * currently). */
    incrRefCount(touchedkey);
    rpoplpushHandlePush(c,c->argv[2],dobj,value);

    /* listTypePop returns an object with its refcount incremented */
    decrRefCount(value);

    notifyKeyspaceEvent(NOTIFY_LIST,"rpop",touchedkey,c->db->id);

    /* Delete the source list when it is empty */
    int src_empty = (listTypeLength(sobj) == 0);
    if (src_empty) {
        dbDelete(c->db,touchedkey);
        notifyKeyspaceEvent(NOTIFY_GENERIC,"del",touchedkey,c->db->id);
    }
    signalModifiedKey(c->db,touchedkey);
    decrRefCount(touchedkey);
    server.dirty++;

    if (c->cmd->proc == brpoplpushCommand) {
        rewriteClientCommandVector(c,3,shared.rpoplpush,c->argv[1],c->argv[2]);
    }
}
604
605 /*-----------------------------------------------------------------------------
606 * Blocking POP operations
607 *----------------------------------------------------------------------------*/
608
609 /* This is a helper function for handleClientsBlockedOnLists(). It's work
610 * is to serve a specific client (receiver) that is blocked on 'key'
611 * in the context of the specified 'db', doing the following:
612 *
613 * 1) Provide the client with the 'value' element.
614 * 2) If the dstkey is not NULL (we are serving a BRPOPLPUSH) also push the
615 * 'value' element on the destination list (the LPUSH side of the command).
616 * 3) Propagate the resulting BRPOP, BLPOP and additional LPUSH if any into
617 * the AOF and replication channel.
618 *
619 * The argument 'where' is LIST_TAIL or LIST_HEAD, and indicates if the
620 * 'value' element was popped from the head (BLPOP) or tail (BRPOP) so that
621 * we can propagate the command properly.
622 *
623 * The function returns C_OK if we are able to serve the client, otherwise
624 * C_ERR is returned to signal the caller that the list POP operation
625 * should be undone as the client was not served: This only happens for
626 * BRPOPLPUSH that fails to push the value to the destination key as it is
627 * of the wrong type. */
serveClientBlockedOnList(client * receiver,robj * key,robj * dstkey,redisDb * db,robj * value,int where)628 int serveClientBlockedOnList(client *receiver, robj *key, robj *dstkey, redisDb *db, robj *value, int where)
629 {
630 robj *argv[3];
631
632 if (dstkey == NULL) {
633 /* Propagate the [LR]POP operation. */
634 argv[0] = (where == LIST_HEAD) ? shared.lpop :
635 shared.rpop;
636 argv[1] = key;
637 propagate((where == LIST_HEAD) ?
638 server.lpopCommand : server.rpopCommand,
639 db->id,argv,2,PROPAGATE_AOF|PROPAGATE_REPL);
640
641 /* BRPOP/BLPOP */
642 addReplyMultiBulkLen(receiver,2);
643 addReplyBulk(receiver,key);
644 addReplyBulk(receiver,value);
645
646 /* Notify event. */
647 char *event = (where == LIST_HEAD) ? "lpop" : "rpop";
648 notifyKeyspaceEvent(NOTIFY_LIST,event,key,receiver->db->id);
649 } else {
650 /* BRPOPLPUSH */
651 robj *dstobj =
652 lookupKeyWrite(receiver->db,dstkey);
653 if (!(dstobj &&
654 checkType(receiver,dstobj,OBJ_LIST)))
655 {
656 /* Propagate the RPOP operation. */
657 argv[0] = shared.rpop;
658 argv[1] = key;
659 propagate(server.rpopCommand,
660 db->id,argv,2,
661 PROPAGATE_AOF|
662 PROPAGATE_REPL);
663 rpoplpushHandlePush(receiver,dstkey,dstobj,
664 value);
665 /* Propagate the LPUSH operation. */
666 argv[0] = shared.lpush;
667 argv[1] = dstkey;
668 argv[2] = value;
669 propagate(server.lpushCommand,
670 db->id,argv,3,
671 PROPAGATE_AOF|
672 PROPAGATE_REPL);
673
674 /* Notify event ("lpush" was notified by rpoplpushHandlePush). */
675 notifyKeyspaceEvent(NOTIFY_LIST,"rpop",key,receiver->db->id);
676 } else {
677 /* BRPOPLPUSH failed because of wrong
678 * destination type. */
679 return C_ERR;
680 }
681 }
682 return C_OK;
683 }
684
685 /* Blocking RPOP/LPOP */
blockingPopGenericCommand(client * c,int where)686 void blockingPopGenericCommand(client *c, int where) {
687 robj *o;
688 mstime_t timeout;
689 int j;
690
691 if (getTimeoutFromObjectOrReply(c,c->argv[c->argc-1],&timeout,UNIT_SECONDS)
692 != C_OK) return;
693
694 for (j = 1; j < c->argc-1; j++) {
695 o = lookupKeyWrite(c->db,c->argv[j]);
696 if (o != NULL) {
697 if (o->type != OBJ_LIST) {
698 addReply(c,shared.wrongtypeerr);
699 return;
700 } else {
701 if (listTypeLength(o) != 0) {
702 /* Non empty list, this is like a non normal [LR]POP. */
703 char *event = (where == LIST_HEAD) ? "lpop" : "rpop";
704 robj *value = listTypePop(o,where);
705 serverAssert(value != NULL);
706
707 addReplyMultiBulkLen(c,2);
708 addReplyBulk(c,c->argv[j]);
709 addReplyBulk(c,value);
710 decrRefCount(value);
711 notifyKeyspaceEvent(NOTIFY_LIST,event,
712 c->argv[j],c->db->id);
713 if (listTypeLength(o) == 0) {
714 dbDelete(c->db,c->argv[j]);
715 notifyKeyspaceEvent(NOTIFY_GENERIC,"del",
716 c->argv[j],c->db->id);
717 }
718 signalModifiedKey(c->db,c->argv[j]);
719 server.dirty++;
720
721 /* Replicate it as an [LR]POP instead of B[LR]POP. */
722 rewriteClientCommandVector(c,2,
723 (where == LIST_HEAD) ? shared.lpop : shared.rpop,
724 c->argv[j]);
725 return;
726 }
727 }
728 }
729 }
730
731 /* If we are inside a MULTI/EXEC and the list is empty the only thing
732 * we can do is treating it as a timeout (even with timeout 0). */
733 if (c->flags & CLIENT_MULTI) {
734 addReply(c,shared.nullmultibulk);
735 return;
736 }
737
738 /* If the list is empty or the key does not exists we must block */
739 blockForKeys(c,BLOCKED_LIST,c->argv + 1,c->argc - 2,timeout,NULL,NULL);
740 }
741
/* BLPOP entry point: blockingPopGenericCommand() popping from the head. */
void blpopCommand(client *c) {
    blockingPopGenericCommand(c, LIST_HEAD);
}
745
/* BRPOP entry point: blockingPopGenericCommand() popping from the tail. */
void brpopCommand(client *c) {
    blockingPopGenericCommand(c, LIST_TAIL);
}
749
/* BRPOPLPUSH srclist dstlist timeout
 *
 * When the source key holds a list, the regular rpoplpushCommand() is
 * executed right away; a non-list object replies with
 * shared.wrongtypeerr. When the source key holds nothing, the client
 * blocks on it with argv[2] as the target, except inside MULTI/EXEC where
 * shared.nullbulk is returned immediately. */
void brpoplpushCommand(client *c) {
    mstime_t timeout;

    if (getTimeoutFromObjectOrReply(c,c->argv[3],&timeout,UNIT_SECONDS)
        != C_OK) return;

    robj *key = lookupKeyWrite(c->db, c->argv[1]);

    if (key != NULL) {
        if (key->type != OBJ_LIST) {
            addReply(c, shared.wrongtypeerr);
            return;
        }
        /* The list exists and has elements, so
         * the regular rpoplpushCommand is executed. */
        serverAssertWithInfo(c,key,listTypeLength(key) > 0);
        rpoplpushCommand(c);
        return;
    }

    if (c->flags & CLIENT_MULTI) {
        /* Blocking against an empty list in a multi state
         * returns immediately. */
        addReply(c, shared.nullbulk);
    } else {
        /* The list is empty and the client blocks. */
        blockForKeys(c,BLOCKED_LIST,c->argv + 1,1,timeout,c->argv[2],NULL);
    }
}
778