1 /*
2 * Copyright (c) 2009-2012, Salvatore Sanfilippo <antirez at gmail dot com>
3 * All rights reserved.
4 *
5 * Redistribution and use in source and binary forms, with or without
6 * modification, are permitted provided that the following conditions are met:
7 *
8 * * Redistributions of source code must retain the above copyright notice,
9 * this list of conditions and the following disclaimer.
10 * * Redistributions in binary form must reproduce the above copyright
11 * notice, this list of conditions and the following disclaimer in the
12 * documentation and/or other materials provided with the distribution.
13 * * Neither the name of Redis nor the names of its contributors may be used
14 * to endorse or promote products derived from this software without
15 * specific prior written permission.
16 *
17 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
18 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
19 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
20 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
21 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
22 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
23 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
24 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
25 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
26 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27 * POSSIBILITY OF SUCH DAMAGE.
28 */
29
30 #include "server.h"
31
32 /*-----------------------------------------------------------------------------
33 * List API
34 *----------------------------------------------------------------------------*/
35
36 /* The function pushes an element to the specified list object 'subject',
37 * at head or tail position as specified by 'where'.
38 *
39 * There is no need for the caller to increment the refcount of 'value' as
40 * the function takes care of it if needed. */
listTypePush(robj * subject,robj * value,int where)41 void listTypePush(robj *subject, robj *value, int where) {
42 if (subject->encoding == OBJ_ENCODING_QUICKLIST) {
43 int pos = (where == LIST_HEAD) ? QUICKLIST_HEAD : QUICKLIST_TAIL;
44 value = getDecodedObject(value);
45 size_t len = sdslen(value->ptr);
46 quicklistPush(subject->ptr, value->ptr, len, pos);
47 decrRefCount(value);
48 } else {
49 serverPanic("Unknown list encoding");
50 }
51 }
52
listPopSaver(unsigned char * data,unsigned int sz)53 void *listPopSaver(unsigned char *data, unsigned int sz) {
54 return createStringObject((char*)data,sz);
55 }
56
/* Pop one element from the head (where == LIST_HEAD) or from the tail (any
 * other value) of 'subject' and return it as a new string object. The
 * result is NULL unless quicklistPopCustom() reports a popped element. */
robj *listTypePop(robj *subject, int where) {
    robj *popped = NULL;
    long long num;
    int ql_end;

    if (where == LIST_HEAD)
        ql_end = QUICKLIST_HEAD;
    else
        ql_end = QUICKLIST_TAIL;

    if (subject->encoding != OBJ_ENCODING_QUICKLIST) {
        serverPanic("Unknown list encoding");
        return popped;
    }

    /* listPopSaver() stores string payloads straight into 'popped'. If the
     * pop succeeded but 'popped' is still NULL, the element is built from
     * the integer reported through 'num' instead. */
    int got = quicklistPopCustom(subject->ptr, ql_end,
                                 (unsigned char **)&popped, NULL, &num,
                                 listPopSaver);
    if (got && popped == NULL)
        popped = createStringObjectFromLongLong(num);
    return popped;
}
73
/* Return the number of elements in the list object 'subject', as reported
 * by quicklistCount(). Panics on any encoding other than quicklist. */
unsigned long listTypeLength(robj *subject) {
    if (subject->encoding != OBJ_ENCODING_QUICKLIST) {
        serverPanic("Unknown list encoding");
    }
    return quicklistCount(subject->ptr);
}
81
82 /* Initialize an iterator at the specified index. */
listTypeInitIterator(robj * subject,long index,unsigned char direction)83 listTypeIterator *listTypeInitIterator(robj *subject, long index,
84 unsigned char direction) {
85 listTypeIterator *li = zmalloc(sizeof(listTypeIterator));
86 li->subject = subject;
87 li->encoding = subject->encoding;
88 li->direction = direction;
89 li->iter = NULL;
90 /* LIST_HEAD means start at TAIL and move *towards* head.
91 * LIST_TAIL means start at HEAD and move *towards tail. */
92 int iter_direction =
93 direction == LIST_HEAD ? AL_START_TAIL : AL_START_HEAD;
94 if (li->encoding == OBJ_ENCODING_QUICKLIST) {
95 li->iter = quicklistGetIteratorAtIdx(li->subject->ptr,
96 iter_direction, index);
97 } else {
98 serverPanic("Unknown list encoding");
99 }
100 return li;
101 }
102
103 /* Clean up the iterator. */
listTypeReleaseIterator(listTypeIterator * li)104 void listTypeReleaseIterator(listTypeIterator *li) {
105 zfree(li->iter);
106 zfree(li);
107 }
108
109 /* Stores pointer to current the entry in the provided entry structure
110 * and advances the position of the iterator. Returns 1 when the current
111 * entry is in fact an entry, 0 otherwise. */
listTypeNext(listTypeIterator * li,listTypeEntry * entry)112 int listTypeNext(listTypeIterator *li, listTypeEntry *entry) {
113 /* Protect from converting when iterating */
114 serverAssert(li->subject->encoding == li->encoding);
115
116 entry->li = li;
117 if (li->encoding == OBJ_ENCODING_QUICKLIST) {
118 return quicklistNext(li->iter, &entry->entry);
119 } else {
120 serverPanic("Unknown list encoding");
121 }
122 return 0;
123 }
124
125 /* Return entry or NULL at the current position of the iterator. */
listTypeGet(listTypeEntry * entry)126 robj *listTypeGet(listTypeEntry *entry) {
127 robj *value = NULL;
128 if (entry->li->encoding == OBJ_ENCODING_QUICKLIST) {
129 if (entry->entry.value) {
130 value = createStringObject((char *)entry->entry.value,
131 entry->entry.sz);
132 } else {
133 value = createStringObjectFromLongLong(entry->entry.longval);
134 }
135 } else {
136 serverPanic("Unknown list encoding");
137 }
138 return value;
139 }
140
/* Insert 'value' next to the element referenced by 'entry': after it when
 * 'where' is LIST_TAIL, before it when 'where' is LIST_HEAD. Any other
 * 'where' inserts nothing. The reference obtained from getDecodedObject()
 * is released before returning. */
void listTypeInsert(listTypeEntry *entry, robj *value, int where) {
    if (entry->li->encoding != OBJ_ENCODING_QUICKLIST) {
        serverPanic("Unknown list encoding");
        return;
    }

    robj *decoded = getDecodedObject(value);
    sds payload = decoded->ptr;
    size_t payload_len = sdslen(payload);
    quicklistEntry *pivot = &entry->entry;

    if (where == LIST_TAIL) {
        quicklistInsertAfter((quicklist *)pivot->quicklist,
                             pivot, payload, payload_len);
    } else if (where == LIST_HEAD) {
        quicklistInsertBefore((quicklist *)pivot->quicklist,
                              pivot, payload, payload_len);
    }
    decrRefCount(decoded);
}
158
159 /* Compare the given object with the entry at the current position. */
listTypeEqual(listTypeEntry * entry,robj * o)160 int listTypeEqual(listTypeEntry *entry, robj *o) {
161 if (entry->li->encoding == OBJ_ENCODING_QUICKLIST) {
162 serverAssertWithInfo(NULL,o,sdsEncodedObject(o));
163 return quicklistCompare(entry->entry.zi,o->ptr,sdslen(o->ptr));
164 } else {
165 serverPanic("Unknown list encoding");
166 }
167 }
168
169 /* Delete the element pointed to. */
listTypeDelete(listTypeIterator * iter,listTypeEntry * entry)170 void listTypeDelete(listTypeIterator *iter, listTypeEntry *entry) {
171 if (entry->li->encoding == OBJ_ENCODING_QUICKLIST) {
172 quicklistDelEntry(iter->iter, &entry->entry);
173 } else {
174 serverPanic("Unknown list encoding");
175 }
176 }
177
178 /* Create a quicklist from a single ziplist */
listTypeConvert(robj * subject,int enc)179 void listTypeConvert(robj *subject, int enc) {
180 serverAssertWithInfo(NULL,subject,subject->type==OBJ_LIST);
181 serverAssertWithInfo(NULL,subject,subject->encoding==OBJ_ENCODING_ZIPLIST);
182
183 if (enc == OBJ_ENCODING_QUICKLIST) {
184 size_t zlen = server.list_max_ziplist_size;
185 int depth = server.list_compress_depth;
186 subject->ptr = quicklistCreateFromZiplist(zlen, depth, subject->ptr);
187 subject->encoding = OBJ_ENCODING_QUICKLIST;
188 } else {
189 serverPanic("Unsupported list conversion");
190 }
191 }
192
193 /*-----------------------------------------------------------------------------
194 * List Commands
195 *----------------------------------------------------------------------------*/
196
/* Shared implementation of LPUSH and RPUSH.
 *
 * Every argument after the key (argv[2] onwards) is pushed onto the list
 * stored at argv[1], at the head when 'where' is LIST_HEAD and at the tail
 * otherwise. The list is created on the first push if the key is missing.
 * A key holding a non-list value gets a wrong-type error and nothing is
 * pushed. The reply is the length of the list after the pushes. */
void pushGenericCommand(client *c, int where) {
    int j, pushed = 0;
    robj *lobj = lookupKeyWrite(c->db,c->argv[1]);

    if (lobj && lobj->type != OBJ_LIST) {
        addReply(c,shared.wrongtypeerr);
        return;
    }

    for (j = 2; j < c->argc; j++) {
        c->argv[j] = tryObjectEncoding(c->argv[j]);
        if (!lobj) {
            /* Create the list lazily, only once there is a value to push. */
            lobj = createQuicklistObject();
            quicklistSetOptions(lobj->ptr, server.list_max_ziplist_size,
                                server.list_compress_depth);
            dbAdd(c->db,c->argv[1],lobj);
        }
        listTypePush(lobj,c->argv[j],where);
        pushed++;
    }

    /* 'lobj' is still NULL only when the loop never ran. */
    addReplyLongLong(c, lobj ? listTypeLength(lobj) : 0);
    if (pushed) {
        char *event = (where == LIST_HEAD) ? "lpush" : "rpush";

        signalModifiedKey(c->db,c->argv[1]);
        notifyKeyspaceEvent(NOTIFY_LIST,event,c->argv[1],c->db->id);
    }
    server.dirty += pushed;
}
226
/* LPUSH: pushGenericCommand() with the head as insertion point. */
void lpushCommand(client *c) {
    const int where = LIST_HEAD;

    pushGenericCommand(c,where);
}
230
/* RPUSH: pushGenericCommand() with the tail as insertion point. */
void rpushCommand(client *c) {
    const int where = LIST_TAIL;

    pushGenericCommand(c,where);
}
234
/* Shared implementation of LPUSHX, RPUSHX and LINSERT. All of them only
 * operate on a list that already exists at argv[1].
 *
 * With 'refval' == NULL, 'val' is pushed at the end selected by 'where'
 * (LPUSHX / RPUSHX). Otherwise the list is scanned from head to tail for
 * the first element equal to 'refval', and 'val' is inserted before it
 * (LIST_HEAD) or after it (LIST_TAIL).
 *
 * Replies: shared.czero when the key is missing, shared.cnegone when
 * 'refval' was not found, otherwise the list length after the change. */
void pushxGenericCommand(client *c, robj *refval, robj *val, int where) {
    robj *subject = lookupKeyWriteOrReply(c,c->argv[1],shared.czero);
    if (subject == NULL) return;
    if (checkType(c,subject,OBJ_LIST)) return;

    char *event;
    if (refval == NULL) {
        event = (where == LIST_HEAD) ? "lpush" : "rpush";
        listTypePush(subject,val,where);
    } else {
        listTypeEntry entry;
        int inserted = 0;

        /* Seek refval from head to tail, stopping at the first match. */
        listTypeIterator *iter = listTypeInitIterator(subject,0,LIST_TAIL);
        while (!inserted && listTypeNext(iter,&entry)) {
            if (listTypeEqual(&entry,refval)) {
                listTypeInsert(&entry,val,where);
                inserted = 1;
            }
        }
        listTypeReleaseIterator(iter);

        if (!inserted) {
            /* Notify client of a failed insert */
            addReply(c,shared.cnegone);
            return;
        }
        event = "linsert";
    }

    signalModifiedKey(c->db,c->argv[1]);
    notifyKeyspaceEvent(NOTIFY_LIST,event,c->argv[1],c->db->id);
    server.dirty++;

    addReplyLongLong(c,listTypeLength(subject));
}
277
/* LPUSHX: push argv[2] at the head, only if the list already exists. */
void lpushxCommand(client *c) {
    robj *val = tryObjectEncoding(c->argv[2]);

    c->argv[2] = val;
    pushxGenericCommand(c,NULL,val,LIST_HEAD);
}
282
/* RPUSHX: push argv[2] at the tail, only if the list already exists. */
void rpushxCommand(client *c) {
    robj *val = tryObjectEncoding(c->argv[2]);

    c->argv[2] = val;
    pushxGenericCommand(c,NULL,val,LIST_TAIL);
}
287
/* LINSERT: insert argv[4] next to the pivot argv[3]. argv[2] selects the
 * side and is matched case-insensitively against "after" and "before";
 * anything else yields a syntax error reply. */
void linsertCommand(client *c) {
    c->argv[4] = tryObjectEncoding(c->argv[4]);

    const char *position = c->argv[2]->ptr;
    int where;

    if (strcasecmp(position,"after") == 0) {
        where = LIST_TAIL;
    } else if (strcasecmp(position,"before") == 0) {
        where = LIST_HEAD;
    } else {
        addReply(c,shared.syntaxerr);
        return;
    }
    pushxGenericCommand(c,c->argv[3],c->argv[4],where);
}
298
/* LLEN: reply with the length of the list at argv[1], or with shared.czero
 * when the key does not exist. */
void llenCommand(client *c) {
    robj *o = lookupKeyReadOrReply(c,c->argv[1],shared.czero);

    if (o == NULL) return;
    if (checkType(c,o,OBJ_LIST)) return;
    addReplyLongLong(c,listTypeLength(o));
}
304
/* LINDEX: reply with the element at index argv[2] of the list at argv[1].
 * The reply is shared.nullbulk when the key is missing or when
 * quicklistIndex() finds no element at that index. */
void lindexCommand(client *c) {
    robj *o = lookupKeyReadOrReply(c,c->argv[1],shared.nullbulk);
    if (o == NULL) return;
    if (checkType(c,o,OBJ_LIST)) return;

    long index;
    if (getLongFromObjectOrReply(c, c->argv[2], &index, NULL) != C_OK)
        return;

    if (o->encoding != OBJ_ENCODING_QUICKLIST) {
        serverPanic("Unknown list encoding");
        return;
    }

    quicklistEntry entry;
    if (!quicklistIndex(o->ptr, index, &entry)) {
        addReply(c,shared.nullbulk);
        return;
    }

    /* Entries without a value pointer are rendered from their integer
     * value. The temporary object is released right after the reply. */
    robj *value;
    if (entry.value)
        value = createStringObject((char*)entry.value,entry.sz);
    else
        value = createStringObjectFromLongLong(entry.longval);
    addReplyBulk(c,value);
    decrRefCount(value);
}
331
/* LSET: replace the element at index argv[2] of the list at argv[1] with
 * argv[3]. Replies shared.nokeyerr when the key is missing and
 * shared.outofrangeerr when quicklistReplaceAtIndex() reports that nothing
 * was replaced. */
void lsetCommand(client *c) {
    robj *o = lookupKeyWriteOrReply(c,c->argv[1],shared.nokeyerr);
    if (o == NULL) return;
    if (checkType(c,o,OBJ_LIST)) return;

    long index;
    if (getLongFromObjectOrReply(c, c->argv[2], &index, NULL) != C_OK)
        return;

    if (o->encoding != OBJ_ENCODING_QUICKLIST) {
        serverPanic("Unknown list encoding");
        return;
    }

    robj *value = c->argv[3];
    quicklist *ql = o->ptr;
    if (quicklistReplaceAtIndex(ql, index, value->ptr, sdslen(value->ptr))) {
        addReply(c,shared.ok);
        signalModifiedKey(c->db,c->argv[1]);
        notifyKeyspaceEvent(NOTIFY_LIST,"lset",c->argv[1],c->db->id);
        server.dirty++;
    } else {
        addReply(c,shared.outofrangeerr);
    }
}
357
/* Shared implementation of LPOP and RPOP: pop one element from the end
 * selected by 'where' and send it to the client. The key is deleted when
 * the list becomes empty. Replies shared.nullbulk when the key is missing
 * or nothing could be popped. */
void popGenericCommand(client *c, int where) {
    robj *o = lookupKeyWriteOrReply(c,c->argv[1],shared.nullbulk);
    if (o == NULL) return;
    if (checkType(c,o,OBJ_LIST)) return;

    robj *value = listTypePop(o,where);
    if (value == NULL) {
        addReply(c,shared.nullbulk);
        return;
    }

    char *event = "rpop";
    if (where == LIST_HEAD) event = "lpop";

    addReplyBulk(c,value);
    decrRefCount(value);
    notifyKeyspaceEvent(NOTIFY_LIST,event,c->argv[1],c->db->id);
    if (listTypeLength(o) == 0) {
        notifyKeyspaceEvent(NOTIFY_GENERIC,"del",
                            c->argv[1],c->db->id);
        dbDelete(c->db,c->argv[1]);
    }
    signalModifiedKey(c->db,c->argv[1]);
    server.dirty++;
}
380
/* LPOP: popGenericCommand() on the head of the list. */
void lpopCommand(client *c) {
    const int where = LIST_HEAD;

    popGenericCommand(c,where);
}
384
/* RPOP: popGenericCommand() on the tail of the list. */
void rpopCommand(client *c) {
    const int where = LIST_TAIL;

    popGenericCommand(c,where);
}
388
/* LRANGE: reply with the elements of the list at argv[1] between the
 * indexes argv[2] (start) and argv[3] (end), both inclusive. Negative
 * indexes count from the end of the list. A missing key or an empty range
 * yields shared.emptymultibulk. */
void lrangeCommand(client *c) {
    long start, end;

    if (getLongFromObjectOrReply(c, c->argv[2], &start, NULL) != C_OK) return;
    if (getLongFromObjectOrReply(c, c->argv[3], &end, NULL) != C_OK) return;

    robj *o = lookupKeyReadOrReply(c,c->argv[1],shared.emptymultibulk);
    if (o == NULL) return;
    if (checkType(c,o,OBJ_LIST)) return;

    long llen = listTypeLength(o);

    /* Convert negative indexes into offsets from the head. A start that is
     * still negative after the conversion is clamped to the first element. */
    if (start < 0) {
        start += llen;
        if (start < 0) start = 0;
    }
    if (end < 0) end += llen;

    /* Invariant: start >= 0, so this test is also true when end < 0.
     * The range is empty when start > end or start >= length. */
    if (start > end || start >= llen) {
        addReply(c,shared.emptymultibulk);
        return;
    }
    if (end >= llen) end = llen-1;
    long rangelen = (end-start)+1;

    /* Return the result in form of a multi-bulk reply */
    addReplyMultiBulkLen(c,rangelen);
    if (o->encoding != OBJ_ENCODING_QUICKLIST) {
        serverPanic("List encoding is not QUICKLIST!");
        return;
    }

    listTypeIterator *iter = listTypeInitIterator(o, start, LIST_TAIL);
    for (long remaining = rangelen; remaining > 0; remaining--) {
        listTypeEntry entry;

        listTypeNext(iter, &entry);
        if (entry.entry.value) {
            addReplyBulkCBuffer(c,entry.entry.value,entry.entry.sz);
        } else {
            addReplyBulkLongLong(c,entry.entry.longval);
        }
    }
    listTypeReleaseIterator(iter);
}
434
/* LTRIM: keep only the elements of the list at argv[1] between the indexes
 * argv[2] (start) and argv[3] (end), both inclusive. Negative indexes count
 * from the end of the list. An empty range removes every element, and the
 * key is deleted when the list ends up empty. Replies shared.ok, also when
 * the key does not exist. */
void ltrimCommand(client *c) {
    long start, end;

    if (getLongFromObjectOrReply(c, c->argv[2], &start, NULL) != C_OK) return;
    if (getLongFromObjectOrReply(c, c->argv[3], &end, NULL) != C_OK) return;

    robj *o = lookupKeyWriteOrReply(c,c->argv[1],shared.ok);
    if (o == NULL) return;
    if (checkType(c,o,OBJ_LIST)) return;

    long llen = listTypeLength(o);

    /* Convert negative indexes into offsets from the head. A start that is
     * still negative after the conversion is clamped to the first element. */
    if (start < 0) {
        start += llen;
        if (start < 0) start = 0;
    }
    if (end < 0) end += llen;

    /* Default to the empty-range case (start > end or start >= length):
     * everything is trimmed from the left. Since start >= 0 here, this also
     * covers end < 0. */
    long ltrim = llen;
    long rtrim = 0;
    if (start <= end && start < llen) {
        if (end >= llen) end = llen-1;
        ltrim = start;
        rtrim = llen-end-1;
    }

    /* Remove list elements to perform the trim */
    if (o->encoding != OBJ_ENCODING_QUICKLIST) {
        serverPanic("Unknown list encoding");
    } else {
        quicklistDelRange(o->ptr,0,ltrim);
        quicklistDelRange(o->ptr,-rtrim,rtrim);
    }

    notifyKeyspaceEvent(NOTIFY_LIST,"ltrim",c->argv[1],c->db->id);
    if (listTypeLength(o) == 0) {
        dbDelete(c->db,c->argv[1]);
        notifyKeyspaceEvent(NOTIFY_GENERIC,"del",c->argv[1],c->db->id);
    }
    signalModifiedKey(c->db,c->argv[1]);
    server.dirty++;
    addReply(c,shared.ok);
}
480
/* LREM: remove elements equal to argv[3] from the list at argv[1] and reply
 * with the number of removed elements. The count in argv[2] selects
 * direction and limit:
 *
 *   count > 0: scan from head to tail, stop after 'count' removals.
 *   count < 0: scan from tail to head, stop after -count removals.
 *   count = 0: remove every matching element.
 *
 * Replies shared.czero when the key does not exist. The key is deleted when
 * the list ends up empty. */
void lremCommand(client *c) {
    robj *subject, *obj;
    obj = c->argv[3];
    long toremove;
    long removed = 0;

    if ((getLongFromObjectOrReply(c, c->argv[2], &toremove, NULL) != C_OK))
        return;

    subject = lookupKeyWriteOrReply(c,c->argv[1],shared.czero);
    if (subject == NULL || checkType(c,subject,OBJ_LIST)) return;

    listTypeIterator *li;
    if (toremove < 0) {
        /* Negative count: start at the last element, move towards head. */
        toremove = -toremove;
        li = listTypeInitIterator(subject,-1,LIST_HEAD);
    } else {
        li = listTypeInitIterator(subject,0,LIST_TAIL);
    }

    listTypeEntry entry;
    while (listTypeNext(li,&entry)) {
        if (listTypeEqual(&entry,obj)) {
            listTypeDelete(li, &entry);
            server.dirty++;
            removed++;
            /* toremove == 0 means "no limit". */
            if (toremove && removed == toremove) break;
        }
    }
    listTypeReleaseIterator(li);

    if (removed) {
        signalModifiedKey(c->db,c->argv[1]);
        /* "lrem" belongs to the list event class, like the events of the
         * other list commands in this file. Only the "del" event below is a
         * generic one. */
        notifyKeyspaceEvent(NOTIFY_LIST,"lrem",c->argv[1],c->db->id);
    }

    if (listTypeLength(subject) == 0) {
        dbDelete(c->db,c->argv[1]);
        notifyKeyspaceEvent(NOTIFY_GENERIC,"del",c->argv[1],c->db->id);
    }

    addReplyLongLong(c,removed);
}
524
/* These are the semantics of this command:
 *  RPOPLPUSH srclist dstlist:
 *    IF LLEN(srclist) > 0
 *      element = RPOP srclist
 *      LPUSH dstlist element
 *      RETURN element
 *    ELSE
 *      RETURN nil
 *    END
 *  END
 *
 * The idea is to be able to get an element from a list in a reliable way
 * since the element is not just returned but pushed against another list
 * as well. This command was originally proposed by Ezra Zygmuntowicz.
 */
540
/* Push side of RPOPLPUSH / BRPOPLPUSH: push 'value' at the head of the
 * destination list and send it to the client 'c'.
 *
 * 'dstobj' is the list currently stored at 'dstkey', or NULL when the key
 * does not exist, in which case a new list is created and added to the
 * database first. */
void rpoplpushHandlePush(client *c, robj *dstkey, robj *dstobj, robj *value) {
    if (dstobj == NULL) {
        /* Create the list if the key does not exist */
        robj *created = createQuicklistObject();

        quicklistSetOptions(created->ptr, server.list_max_ziplist_size,
                            server.list_compress_depth);
        dbAdd(c->db,dstkey,created);
        dstobj = created;
    }

    signalModifiedKey(c->db,dstkey);
    listTypePush(dstobj,value,LIST_HEAD);
    notifyKeyspaceEvent(NOTIFY_LIST,"lpush",dstkey,c->db->id);

    /* Always send the pushed value to the client. */
    addReplyBulk(c,value);
}
555
/* RPOPLPUSH: pop the tail element of the list at argv[1], push it at the
 * head of the list at argv[2], and reply with that element. Replies
 * shared.nullbulk when the source key is missing or the source list is
 * empty. The source key is deleted when its list becomes empty. */
void rpoplpushCommand(client *c) {
    robj *sobj = lookupKeyWriteOrReply(c,c->argv[1],shared.nullbulk);
    if (sobj == NULL) return;
    if (checkType(c,sobj,OBJ_LIST)) return;

    if (listTypeLength(sobj) == 0) {
        /* This may only happen after loading very old RDB files. Recent
         * versions of Redis delete keys of empty lists. */
        addReply(c,shared.nullbulk);
        return;
    }

    robj *dobj = lookupKeyWrite(c->db,c->argv[2]);
    robj *touchedkey = c->argv[1];

    /* Check the destination type before popping anything from the source. */
    if (dobj && checkType(c,dobj,OBJ_LIST)) return;
    robj *value = listTypePop(sobj,LIST_TAIL);

    /* Keep our own reference to the source key, since rpoplpushHandlePush
     * may change the client command argument vector (it does not
     * currently). */
    incrRefCount(touchedkey);
    rpoplpushHandlePush(c,c->argv[2],dobj,value);

    /* Release the reference to the object returned by listTypePop(). */
    decrRefCount(value);

    notifyKeyspaceEvent(NOTIFY_LIST,"rpop",touchedkey,c->db->id);
    /* Delete the source list when it is empty */
    if (listTypeLength(sobj) == 0) {
        dbDelete(c->db,touchedkey);
        notifyKeyspaceEvent(NOTIFY_GENERIC,"del",
                            touchedkey,c->db->id);
    }
    signalModifiedKey(c->db,touchedkey);
    decrRefCount(touchedkey);
    server.dirty++;
}
592
593 /*-----------------------------------------------------------------------------
594 * Blocking POP operations
595 *----------------------------------------------------------------------------*/
596
/* This is how the current blocking POP works, we use BLPOP as example:
 * - If the user calls BLPOP and the key exists and contains a non empty list
 *   then LPOP is called instead. So BLPOP is semantically the same as LPOP
 *   if blocking is not required.
 * - If instead BLPOP is called and the key does not exist or the list is
 *   empty we need to block. In order to do so we remove the notification for
 *   new data to read in the client socket (so that we'll not serve new
 *   requests if the blocking request is not served). Also we put the client
 *   in a dictionary (db->blocking_keys) mapping keys to a list of clients
 *   blocking for these keys.
 * - If a PUSH operation against a key with blocked clients waiting is
 *   performed, we mark this key as "ready", and after the current command,
 *   MULTI/EXEC block, or script, is executed, we serve all the clients waiting
 *   for this list, from the one that blocked first, to the last, accordingly
 *   to the number of elements we have in the ready list.
 */
613
614 /* Set a client in blocking mode for the specified key, with the specified
615 * timeout */
blockForKeys(client * c,robj ** keys,int numkeys,mstime_t timeout,robj * target)616 void blockForKeys(client *c, robj **keys, int numkeys, mstime_t timeout, robj *target) {
617 dictEntry *de;
618 list *l;
619 int j;
620
621 c->bpop.timeout = timeout;
622 c->bpop.target = target;
623
624 if (target != NULL) incrRefCount(target);
625
626 for (j = 0; j < numkeys; j++) {
627 /* If the key already exists in the dict ignore it. */
628 if (dictAdd(c->bpop.keys,keys[j],NULL) != DICT_OK) continue;
629 incrRefCount(keys[j]);
630
631 /* And in the other "side", to map keys -> clients */
632 de = dictFind(c->db->blocking_keys,keys[j]);
633 if (de == NULL) {
634 int retval;
635
636 /* For every key we take a list of clients blocked for it */
637 l = listCreate();
638 retval = dictAdd(c->db->blocking_keys,keys[j],l);
639 incrRefCount(keys[j]);
640 serverAssertWithInfo(c,keys[j],retval == DICT_OK);
641 } else {
642 l = dictGetVal(de);
643 }
644 listAddNodeTail(l,c);
645 }
646 blockClient(c,BLOCKED_LIST);
647 }
648
649 /* Unblock a client that's waiting in a blocking operation such as BLPOP.
650 * You should never call this function directly, but unblockClient() instead. */
unblockClientWaitingData(client * c)651 void unblockClientWaitingData(client *c) {
652 dictEntry *de;
653 dictIterator *di;
654 list *l;
655
656 serverAssertWithInfo(c,NULL,dictSize(c->bpop.keys) != 0);
657 di = dictGetIterator(c->bpop.keys);
658 /* The client may wait for multiple keys, so unblock it for every key. */
659 while((de = dictNext(di)) != NULL) {
660 robj *key = dictGetKey(de);
661
662 /* Remove this client from the list of clients waiting for this key. */
663 l = dictFetchValue(c->db->blocking_keys,key);
664 serverAssertWithInfo(c,key,l != NULL);
665 listDelNode(l,listSearchKey(l,c));
666 /* If the list is empty we need to remove it to avoid wasting memory */
667 if (listLength(l) == 0)
668 dictDelete(c->db->blocking_keys,key);
669 }
670 dictReleaseIterator(di);
671
672 /* Cleanup the client structure */
673 dictEmpty(c->bpop.keys,NULL);
674 if (c->bpop.target) {
675 decrRefCount(c->bpop.target);
676 c->bpop.target = NULL;
677 }
678 }
679
680 /* If the specified key has clients blocked waiting for list pushes, this
681 * function will put the key reference into the server.ready_keys list.
682 * Note that db->ready_keys is a hash table that allows us to avoid putting
683 * the same key again and again in the list in case of multiple pushes
684 * made by a script or in the context of MULTI/EXEC.
685 *
686 * The list will be finally processed by handleClientsBlockedOnLists() */
signalListAsReady(redisDb * db,robj * key)687 void signalListAsReady(redisDb *db, robj *key) {
688 readyList *rl;
689
690 /* No clients blocking for this key? No need to queue it. */
691 if (dictFind(db->blocking_keys,key) == NULL) return;
692
693 /* Key was already signaled? No need to queue it again. */
694 if (dictFind(db->ready_keys,key) != NULL) return;
695
696 /* Ok, we need to queue this key into server.ready_keys. */
697 rl = zmalloc(sizeof(*rl));
698 rl->key = key;
699 rl->db = db;
700 incrRefCount(key);
701 listAddNodeTail(server.ready_keys,rl);
702
703 /* We also add the key in the db->ready_keys dictionary in order
704 * to avoid adding it multiple times into a list with a simple O(1)
705 * check. */
706 incrRefCount(key);
707 serverAssert(dictAdd(db->ready_keys,key,NULL) == DICT_OK);
708 }
709
710 /* This is a helper function for handleClientsBlockedOnLists(). It's work
711 * is to serve a specific client (receiver) that is blocked on 'key'
712 * in the context of the specified 'db', doing the following:
713 *
714 * 1) Provide the client with the 'value' element.
715 * 2) If the dstkey is not NULL (we are serving a BRPOPLPUSH) also push the
716 * 'value' element on the destination list (the LPUSH side of the command).
717 * 3) Propagate the resulting BRPOP, BLPOP and additional LPUSH if any into
718 * the AOF and replication channel.
719 *
720 * The argument 'where' is LIST_TAIL or LIST_HEAD, and indicates if the
721 * 'value' element was popped fron the head (BLPOP) or tail (BRPOP) so that
722 * we can propagate the command properly.
723 *
724 * The function returns C_OK if we are able to serve the client, otherwise
725 * C_ERR is returned to signal the caller that the list POP operation
726 * should be undone as the client was not served: This only happens for
727 * BRPOPLPUSH that fails to push the value to the destination key as it is
728 * of the wrong type. */
serveClientBlockedOnList(client * receiver,robj * key,robj * dstkey,redisDb * db,robj * value,int where)729 int serveClientBlockedOnList(client *receiver, robj *key, robj *dstkey, redisDb *db, robj *value, int where)
730 {
731 robj *argv[3];
732
733 if (dstkey == NULL) {
734 /* Propagate the [LR]POP operation. */
735 argv[0] = (where == LIST_HEAD) ? shared.lpop :
736 shared.rpop;
737 argv[1] = key;
738 propagate((where == LIST_HEAD) ?
739 server.lpopCommand : server.rpopCommand,
740 db->id,argv,2,PROPAGATE_AOF|PROPAGATE_REPL);
741
742 /* BRPOP/BLPOP */
743 addReplyMultiBulkLen(receiver,2);
744 addReplyBulk(receiver,key);
745 addReplyBulk(receiver,value);
746 } else {
747 /* BRPOPLPUSH */
748 robj *dstobj =
749 lookupKeyWrite(receiver->db,dstkey);
750 if (!(dstobj &&
751 checkType(receiver,dstobj,OBJ_LIST)))
752 {
753 /* Propagate the RPOP operation. */
754 argv[0] = shared.rpop;
755 argv[1] = key;
756 propagate(server.rpopCommand,
757 db->id,argv,2,
758 PROPAGATE_AOF|
759 PROPAGATE_REPL);
760 rpoplpushHandlePush(receiver,dstkey,dstobj,
761 value);
762 /* Propagate the LPUSH operation. */
763 argv[0] = shared.lpush;
764 argv[1] = dstkey;
765 argv[2] = value;
766 propagate(server.lpushCommand,
767 db->id,argv,3,
768 PROPAGATE_AOF|
769 PROPAGATE_REPL);
770 } else {
771 /* BRPOPLPUSH failed because of wrong
772 * destination type. */
773 return C_ERR;
774 }
775 }
776 return C_OK;
777 }
778
779 /* This function should be called by Redis every time a single command,
780 * a MULTI/EXEC block, or a Lua script, terminated its execution after
781 * being called by a client.
782 *
783 * All the keys with at least one client blocked that received at least
784 * one new element via some PUSH operation are accumulated into
785 * the server.ready_keys list. This function will run the list and will
786 * serve clients accordingly. Note that the function will iterate again and
787 * again as a result of serving BRPOPLPUSH we can have new blocking clients
788 * to serve because of the PUSH side of BRPOPLPUSH. */
handleClientsBlockedOnLists(void)789 void handleClientsBlockedOnLists(void) {
790 while(listLength(server.ready_keys) != 0) {
791 list *l;
792
793 /* Point server.ready_keys to a fresh list and save the current one
794 * locally. This way as we run the old list we are free to call
795 * signalListAsReady() that may push new elements in server.ready_keys
796 * when handling clients blocked into BRPOPLPUSH. */
797 l = server.ready_keys;
798 server.ready_keys = listCreate();
799
800 while(listLength(l) != 0) {
801 listNode *ln = listFirst(l);
802 readyList *rl = ln->value;
803
804 /* First of all remove this key from db->ready_keys so that
805 * we can safely call signalListAsReady() against this key. */
806 dictDelete(rl->db->ready_keys,rl->key);
807
808 /* If the key exists and it's a list, serve blocked clients
809 * with data. */
810 robj *o = lookupKeyWrite(rl->db,rl->key);
811 if (o != NULL && o->type == OBJ_LIST) {
812 dictEntry *de;
813
814 /* We serve clients in the same order they blocked for
815 * this key, from the first blocked to the last. */
816 de = dictFind(rl->db->blocking_keys,rl->key);
817 if (de) {
818 list *clients = dictGetVal(de);
819 int numclients = listLength(clients);
820
821 while(numclients--) {
822 listNode *clientnode = listFirst(clients);
823 client *receiver = clientnode->value;
824 robj *dstkey = receiver->bpop.target;
825 int where = (receiver->lastcmd &&
826 receiver->lastcmd->proc == blpopCommand) ?
827 LIST_HEAD : LIST_TAIL;
828 robj *value = listTypePop(o,where);
829
830 if (value) {
831 /* Protect receiver->bpop.target, that will be
832 * freed by the next unblockClient()
833 * call. */
834 if (dstkey) incrRefCount(dstkey);
835 unblockClient(receiver);
836
837 if (serveClientBlockedOnList(receiver,
838 rl->key,dstkey,rl->db,value,
839 where) == C_ERR)
840 {
841 /* If we failed serving the client we need
842 * to also undo the POP operation. */
843 listTypePush(o,value,where);
844 }
845
846 if (dstkey) decrRefCount(dstkey);
847 decrRefCount(value);
848 } else {
849 break;
850 }
851 }
852 }
853
854 if (listTypeLength(o) == 0) {
855 dbDelete(rl->db,rl->key);
856 }
857 /* We don't call signalModifiedKey() as it was already called
858 * when an element was pushed on the list. */
859 }
860
861 /* Free this item. */
862 decrRefCount(rl->key);
863 zfree(rl);
864 listDelNode(l,ln);
865 }
866 listRelease(l); /* We have the new list on place at this point. */
867 }
868 }
869
870 /* Blocking RPOP/LPOP */
blockingPopGenericCommand(client * c,int where)871 void blockingPopGenericCommand(client *c, int where) {
872 robj *o;
873 mstime_t timeout;
874 int j;
875
876 if (getTimeoutFromObjectOrReply(c,c->argv[c->argc-1],&timeout,UNIT_SECONDS)
877 != C_OK) return;
878
879 for (j = 1; j < c->argc-1; j++) {
880 o = lookupKeyWrite(c->db,c->argv[j]);
881 if (o != NULL) {
882 if (o->type != OBJ_LIST) {
883 addReply(c,shared.wrongtypeerr);
884 return;
885 } else {
886 if (listTypeLength(o) != 0) {
887 /* Non empty list, this is like a non normal [LR]POP. */
888 char *event = (where == LIST_HEAD) ? "lpop" : "rpop";
889 robj *value = listTypePop(o,where);
890 serverAssert(value != NULL);
891
892 addReplyMultiBulkLen(c,2);
893 addReplyBulk(c,c->argv[j]);
894 addReplyBulk(c,value);
895 decrRefCount(value);
896 notifyKeyspaceEvent(NOTIFY_LIST,event,
897 c->argv[j],c->db->id);
898 if (listTypeLength(o) == 0) {
899 dbDelete(c->db,c->argv[j]);
900 notifyKeyspaceEvent(NOTIFY_GENERIC,"del",
901 c->argv[j],c->db->id);
902 }
903 signalModifiedKey(c->db,c->argv[j]);
904 server.dirty++;
905
906 /* Replicate it as an [LR]POP instead of B[LR]POP. */
907 rewriteClientCommandVector(c,2,
908 (where == LIST_HEAD) ? shared.lpop : shared.rpop,
909 c->argv[j]);
910 return;
911 }
912 }
913 }
914 }
915
916 /* If we are inside a MULTI/EXEC and the list is empty the only thing
917 * we can do is treating it as a timeout (even with timeout 0). */
918 if (c->flags & CLIENT_MULTI) {
919 addReply(c,shared.nullmultibulk);
920 return;
921 }
922
923 /* If the list is empty or the key does not exists we must block */
924 blockForKeys(c, c->argv + 1, c->argc - 2, timeout, NULL);
925 }
926
/* BLPOP: blockingPopGenericCommand() popping from the head. */
void blpopCommand(client *c) {
    const int where = LIST_HEAD;

    blockingPopGenericCommand(c,where);
}
930
/* BRPOP: blockingPopGenericCommand() popping from the tail. */
void brpopCommand(client *c) {
    const int where = LIST_TAIL;

    blockingPopGenericCommand(c,where);
}
934
/* BRPOPLPUSH: blocking variant of RPOPLPUSH, with the timeout in argv[3].
 *
 * When the source key argv[1] exists and holds a list, the regular
 * rpoplpushCommand() is executed. When the key does not exist the client
 * blocks on it with argv[2] as target, unless it is inside MULTI/EXEC, in
 * which case shared.nullbulk is returned immediately. */
void brpoplpushCommand(client *c) {
    mstime_t timeout;

    if (getTimeoutFromObjectOrReply(c,c->argv[3],&timeout,UNIT_SECONDS)
        != C_OK) return;

    robj *key = lookupKeyWrite(c->db, c->argv[1]);

    if (key != NULL) {
        if (key->type != OBJ_LIST) {
            addReply(c, shared.wrongtypeerr);
            return;
        }
        /* The list exists and has elements, so
         * the regular rpoplpushCommand is executed. */
        serverAssertWithInfo(c,key,listTypeLength(key) > 0);
        rpoplpushCommand(c);
        return;
    }

    if (c->flags & CLIENT_MULTI) {
        /* Blocking against an empty list in a multi state
         * returns immediately. */
        addReply(c, shared.nullbulk);
    } else {
        /* The list is empty and the client blocks. */
        blockForKeys(c, c->argv + 1, 1, timeout, c->argv[2]);
    }
}
963