xref: /f-stack/app/redis-5.0.5/src/t_stream.c (revision 572c4311)
1 /*
2  * Copyright (c) 2017, Salvatore Sanfilippo <antirez at gmail dot com>
3  * All rights reserved.
4  *
5  * Redistribution and use in source and binary forms, with or without
6  * modification, are permitted provided that the following conditions are met:
7  *
8  *   * Redistributions of source code must retain the above copyright notice,
9  *     this list of conditions and the following disclaimer.
10  *   * Redistributions in binary form must reproduce the above copyright
11  *     notice, this list of conditions and the following disclaimer in the
12  *     documentation and/or other materials provided with the distribution.
13  *   * Neither the name of Redis nor the names of its contributors may be used
14  *     to endorse or promote products derived from this software without
15  *     specific prior written permission.
16  *
17  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
18  * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
19  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
20  * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
21  * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
22  * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
23  * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
24  * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
25  * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
26  * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27  * POSSIBILITY OF SUCH DAMAGE.
28  */
29 
30 #include "server.h"
31 #include "endianconv.h"
32 #include "stream.h"
33 
/* NOTE(review): not referenced in the code visible in this chunk; node size
 * is limited by server.stream_node_max_bytes in streamAppendItem(). Confirm
 * whether this constant is still used anywhere. */
#define STREAM_BYTES_PER_LISTPACK 2048

/* Every stream item inside a listpack has a flags field, used to mark the
 * entry as deleted or as having the same fields as the "master" entry stored
 * at the start of the listpack. Flags are single bits and can be combined. */
#define STREAM_ITEM_FLAG_NONE 0             /* No special flags. */
#define STREAM_ITEM_FLAG_DELETED (1<<0)     /* Entry is deleted. Skip it. */
#define STREAM_ITEM_FLAG_SAMEFIELDS (1<<1)  /* Same fields as master entry. */
42 
43 void streamFreeCG(streamCG *cg);
44 void streamFreeNACK(streamNACK *na);
45 size_t streamReplyWithRangeFromConsumerPEL(client *c, stream *s, streamID *start, streamID *end, size_t count, streamConsumer *consumer);
46 
47 /* -----------------------------------------------------------------------
48  * Low level stream encoding: a radix tree of listpacks.
49  * ----------------------------------------------------------------------- */
50 
51 /* Create a new stream data structure. */
streamNew(void)52 stream *streamNew(void) {
53     stream *s = zmalloc(sizeof(*s));
54     s->rax = raxNew();
55     s->length = 0;
56     s->last_id.ms = 0;
57     s->last_id.seq = 0;
58     s->cgroups = NULL; /* Created on demand to save memory when not used. */
59     return s;
60 }
61 
62 /* Free a stream, including the listpacks stored inside the radix tree. */
freeStream(stream * s)63 void freeStream(stream *s) {
64     raxFreeWithCallback(s->rax,(void(*)(void*))lpFree);
65     if (s->cgroups)
66         raxFreeWithCallback(s->cgroups,(void(*)(void*))streamFreeCG);
67     zfree(s);
68 }
69 
70 /* Generate the next stream item ID given the previous one. If the current
71  * milliseconds Unix time is greater than the previous one, just use this
72  * as time part and start with sequence part of zero. Otherwise we use the
73  * previous time (and never go backward) and increment the sequence. */
streamNextID(streamID * last_id,streamID * new_id)74 void streamNextID(streamID *last_id, streamID *new_id) {
75     uint64_t ms = mstime();
76     if (ms > last_id->ms) {
77         new_id->ms = ms;
78         new_id->seq = 0;
79     } else {
80         new_id->ms = last_id->ms;
81         new_id->seq = last_id->seq+1;
82     }
83 }
84 
85 /* This is just a wrapper for lpAppend() to directly use a 64 bit integer
86  * instead of a string. */
lpAppendInteger(unsigned char * lp,int64_t value)87 unsigned char *lpAppendInteger(unsigned char *lp, int64_t value) {
88     char buf[LONG_STR_SIZE];
89     int slen = ll2string(buf,sizeof(buf),value);
90     return lpAppend(lp,(unsigned char*)buf,slen);
91 }
92 
93 /* This is just a wrapper for lpReplace() to directly use a 64 bit integer
94  * instead of a string to replace the current element. The function returns
95  * the new listpack as return value, and also updates the current cursor
96  * by updating '*pos'. */
lpReplaceInteger(unsigned char * lp,unsigned char ** pos,int64_t value)97 unsigned char *lpReplaceInteger(unsigned char *lp, unsigned char **pos, int64_t value) {
98     char buf[LONG_STR_SIZE];
99     int slen = ll2string(buf,sizeof(buf),value);
100     return lpInsert(lp, (unsigned char*)buf, slen, *pos, LP_REPLACE, pos);
101 }
102 
103 /* This is a wrapper function for lpGet() to directly get an integer value
104  * from the listpack (that may store numbers as a string), converting
105  * the string if needed. */
lpGetInteger(unsigned char * ele)106 int64_t lpGetInteger(unsigned char *ele) {
107     int64_t v;
108     unsigned char *e = lpGet(ele,&v,NULL);
109     if (e == NULL) return v;
110     /* The following code path should never be used for how listpacks work:
111      * they should always be able to store an int64_t value in integer
112      * encoded form. However the implementation may change. */
113     long long ll;
114     int retval = string2ll((char*)e,v,&ll);
115     serverAssert(retval != 0);
116     v = ll;
117     return v;
118 }
119 
120 /* Debugging function to log the full content of a listpack. Useful
121  * for development and debugging. */
streamLogListpackContent(unsigned char * lp)122 void streamLogListpackContent(unsigned char *lp) {
123     unsigned char *p = lpFirst(lp);
124     while(p) {
125         unsigned char buf[LP_INTBUF_SIZE];
126         int64_t v;
127         unsigned char *ele = lpGet(p,&v,buf);
128         serverLog(LL_WARNING,"- [%d] '%.*s'", (int)v, (int)v, ele);
129         p = lpNext(lp,p);
130     }
131 }
132 
133 /* Convert the specified stream entry ID as a 128 bit big endian number, so
134  * that the IDs can be sorted lexicographically. */
streamEncodeID(void * buf,streamID * id)135 void streamEncodeID(void *buf, streamID *id) {
136     uint64_t e[2];
137     e[0] = htonu64(id->ms);
138     e[1] = htonu64(id->seq);
139     memcpy(buf,e,sizeof(e));
140 }
141 
142 /* This is the reverse of streamEncodeID(): the decoded ID will be stored
143  * in the 'id' structure passed by reference. The buffer 'buf' must point
144  * to a 128 bit big-endian encoded ID. */
streamDecodeID(void * buf,streamID * id)145 void streamDecodeID(void *buf, streamID *id) {
146     uint64_t e[2];
147     memcpy(e,buf,sizeof(e));
148     id->ms = ntohu64(e[0]);
149     id->seq = ntohu64(e[1]);
150 }
151 
152 /* Compare two stream IDs. Return -1 if a < b, 0 if a == b, 1 if a > b. */
streamCompareID(streamID * a,streamID * b)153 int streamCompareID(streamID *a, streamID *b) {
154     if (a->ms > b->ms) return 1;
155     else if (a->ms < b->ms) return -1;
156     /* The ms part is the same. Check the sequence part. */
157     else if (a->seq > b->seq) return 1;
158     else if (a->seq < b->seq) return -1;
159     /* Everything is the same: IDs are equal. */
160     return 0;
161 }
162 
163 /* Adds a new item into the stream 's' having the specified number of
164  * field-value pairs as specified in 'numfields' and stored into 'argv'.
165  * Returns the new entry ID populating the 'added_id' structure.
166  *
167  * If 'use_id' is not NULL, the ID is not auto-generated by the function,
168  * but instead the passed ID is uesd to add the new entry. In this case
169  * adding the entry may fail as specified later in this comment.
170  *
171  * The function returns C_OK if the item was added, this is always true
172  * if the ID was generated by the function. However the function may return
173  * C_ERR if an ID was given via 'use_id', but adding it failed since the
174  * current top ID is greater or equal. */
streamAppendItem(stream * s,robj ** argv,int64_t numfields,streamID * added_id,streamID * use_id)175 int streamAppendItem(stream *s, robj **argv, int64_t numfields, streamID *added_id, streamID *use_id) {
176     /* If an ID was given, check that it's greater than the last entry ID
177      * or return an error. */
178     if (use_id && streamCompareID(use_id,&s->last_id) <= 0) return C_ERR;
179 
180     /* Add the new entry. */
181     raxIterator ri;
182     raxStart(&ri,s->rax);
183     raxSeek(&ri,"$",NULL,0);
184 
185     size_t lp_bytes = 0;        /* Total bytes in the tail listpack. */
186     unsigned char *lp = NULL;   /* Tail listpack pointer. */
187 
188     /* Get a reference to the tail node listpack. */
189     if (raxNext(&ri)) {
190         lp = ri.data;
191         lp_bytes = lpBytes(lp);
192     }
193     raxStop(&ri);
194 
195     /* Generate the new entry ID. */
196     streamID id;
197     if (use_id)
198         id = *use_id;
199     else
200         streamNextID(&s->last_id,&id);
201 
202     /* We have to add the key into the radix tree in lexicographic order,
203      * to do so we consider the ID as a single 128 bit number written in
204      * big endian, so that the most significant bytes are the first ones. */
205     uint64_t rax_key[2];    /* Key in the radix tree containing the listpack.*/
206     streamID master_id;     /* ID of the master entry in the listpack. */
207 
208     /* Create a new listpack and radix tree node if needed. Note that when
209      * a new listpack is created, we populate it with a "master entry". This
210      * is just a set of fields that is taken as references in order to compress
211      * the stream entries that we'll add inside the listpack.
212      *
213      * Note that while we use the first added entry fields to create
214      * the master entry, the first added entry is NOT represented in the master
215      * entry, which is a stand alone object. But of course, the first entry
216      * will compress well because it's used as reference.
217      *
218      * The master entry is composed like in the following example:
219      *
220      * +-------+---------+------------+---------+--/--+---------+---------+-+
221      * | count | deleted | num-fields | field_1 | field_2 | ... | field_N |0|
222      * +-------+---------+------------+---------+--/--+---------+---------+-+
223      *
224      * count and deleted just represent respectively the total number of
225      * entries inside the listpack that are valid, and marked as deleted
226      * (delted flag in the entry flags set). So the total number of items
227      * actually inside the listpack (both deleted and not) is count+deleted.
228      *
229      * The real entries will be encoded with an ID that is just the
230      * millisecond and sequence difference compared to the key stored at
231      * the radix tree node containing the listpack (delta encoding), and
232      * if the fields of the entry are the same as the master enty fields, the
233      * entry flags will specify this fact and the entry fields and number
234      * of fields will be omitted (see later in the code of this function).
235      *
236      * The "0" entry at the end is the same as the 'lp-count' entry in the
237      * regular stream entries (see below), and marks the fact that there are
238      * no more entries, when we scan the stream from right to left. */
239 
240     /* First of all, check if we can append to the current macro node or
241      * if we need to switch to the next one. 'lp' will be set to NULL if
242      * the current node is full. */
243     if (lp != NULL) {
244         if (server.stream_node_max_bytes &&
245             lp_bytes > server.stream_node_max_bytes)
246         {
247             lp = NULL;
248         } else if (server.stream_node_max_entries) {
249             int64_t count = lpGetInteger(lpFirst(lp));
250             if (count > server.stream_node_max_entries) lp = NULL;
251         }
252     }
253 
254     int flags = STREAM_ITEM_FLAG_NONE;
255     if (lp == NULL || lp_bytes > server.stream_node_max_bytes) {
256         master_id = id;
257         streamEncodeID(rax_key,&id);
258         /* Create the listpack having the master entry ID and fields. */
259         lp = lpNew();
260         lp = lpAppendInteger(lp,1); /* One item, the one we are adding. */
261         lp = lpAppendInteger(lp,0); /* Zero deleted so far. */
262         lp = lpAppendInteger(lp,numfields);
263         for (int64_t i = 0; i < numfields; i++) {
264             sds field = argv[i*2]->ptr;
265             lp = lpAppend(lp,(unsigned char*)field,sdslen(field));
266         }
267         lp = lpAppendInteger(lp,0); /* Master entry zero terminator. */
268         raxInsert(s->rax,(unsigned char*)&rax_key,sizeof(rax_key),lp,NULL);
269         /* The first entry we insert, has obviously the same fields of the
270          * master entry. */
271         flags |= STREAM_ITEM_FLAG_SAMEFIELDS;
272     } else {
273         serverAssert(ri.key_len == sizeof(rax_key));
274         memcpy(rax_key,ri.key,sizeof(rax_key));
275 
276         /* Read the master ID from the radix tree key. */
277         streamDecodeID(rax_key,&master_id);
278         unsigned char *lp_ele = lpFirst(lp);
279 
280         /* Update count and skip the deleted fields. */
281         int64_t count = lpGetInteger(lp_ele);
282         lp = lpReplaceInteger(lp,&lp_ele,count+1);
283         lp_ele = lpNext(lp,lp_ele); /* seek deleted. */
284         lp_ele = lpNext(lp,lp_ele); /* seek master entry num fields. */
285 
286         /* Check if the entry we are adding, have the same fields
287          * as the master entry. */
288         int64_t master_fields_count = lpGetInteger(lp_ele);
289         lp_ele = lpNext(lp,lp_ele);
290         if (numfields == master_fields_count) {
291             int64_t i;
292             for (i = 0; i < master_fields_count; i++) {
293                 sds field = argv[i*2]->ptr;
294                 int64_t e_len;
295                 unsigned char buf[LP_INTBUF_SIZE];
296                 unsigned char *e = lpGet(lp_ele,&e_len,buf);
297                 /* Stop if there is a mismatch. */
298                 if (sdslen(field) != (size_t)e_len ||
299                     memcmp(e,field,e_len) != 0) break;
300                 lp_ele = lpNext(lp,lp_ele);
301             }
302             /* All fields are the same! We can compress the field names
303              * setting a single bit in the flags. */
304             if (i == master_fields_count) flags |= STREAM_ITEM_FLAG_SAMEFIELDS;
305         }
306     }
307 
308     /* Populate the listpack with the new entry. We use the following
309      * encoding:
310      *
311      * +-----+--------+----------+-------+-------+-/-+-------+-------+--------+
312      * |flags|entry-id|num-fields|field-1|value-1|...|field-N|value-N|lp-count|
313      * +-----+--------+----------+-------+-------+-/-+-------+-------+--------+
314      *
315      * However if the SAMEFIELD flag is set, we have just to populate
316      * the entry with the values, so it becomes:
317      *
318      * +-----+--------+-------+-/-+-------+--------+
319      * |flags|entry-id|value-1|...|value-N|lp-count|
320      * +-----+--------+-------+-/-+-------+--------+
321      *
322      * The entry-id field is actually two separated fields: the ms
323      * and seq difference compared to the master entry.
324      *
325      * The lp-count field is a number that states the number of listpack pieces
326      * that compose the entry, so that it's possible to travel the entry
327      * in reverse order: we can just start from the end of the listpack, read
328      * the entry, and jump back N times to seek the "flags" field to read
329      * the stream full entry. */
330     lp = lpAppendInteger(lp,flags);
331     lp = lpAppendInteger(lp,id.ms - master_id.ms);
332     lp = lpAppendInteger(lp,id.seq - master_id.seq);
333     if (!(flags & STREAM_ITEM_FLAG_SAMEFIELDS))
334         lp = lpAppendInteger(lp,numfields);
335     for (int64_t i = 0; i < numfields; i++) {
336         sds field = argv[i*2]->ptr, value = argv[i*2+1]->ptr;
337         if (!(flags & STREAM_ITEM_FLAG_SAMEFIELDS))
338             lp = lpAppend(lp,(unsigned char*)field,sdslen(field));
339         lp = lpAppend(lp,(unsigned char*)value,sdslen(value));
340     }
341     /* Compute and store the lp-count field. */
342     int64_t lp_count = numfields;
343     lp_count += 3; /* Add the 3 fixed fields flags + ms-diff + seq-diff. */
344     if (!(flags & STREAM_ITEM_FLAG_SAMEFIELDS)) {
345         /* If the item is not compressed, it also has the fields other than
346          * the values, and an additional num-fileds field. */
347         lp_count += numfields+1;
348     }
349     lp = lpAppendInteger(lp,lp_count);
350 
351     /* Insert back into the tree in order to update the listpack pointer. */
352     if (ri.data != lp)
353         raxInsert(s->rax,(unsigned char*)&rax_key,sizeof(rax_key),lp,NULL);
354     s->length++;
355     s->last_id = id;
356     if (added_id) *added_id = id;
357     return C_OK;
358 }
359 
360 /* Trim the stream 's' to have no more than maxlen elements, and return the
361  * number of elements removed from the stream. The 'approx' option, if non-zero,
362  * specifies that the trimming must be performed in a approximated way in
363  * order to maximize performances. This means that the stream may contain
364  * more elements than 'maxlen', and elements are only removed if we can remove
365  * a *whole* node of the radix tree. The elements are removed from the head
366  * of the stream (older elements).
367  *
368  * The function may return zero if:
369  *
370  * 1) The stream is already shorter or equal to the specified max length.
371  * 2) The 'approx' option is true and the head node had not enough elements
372  *    to be deleted, leaving the stream with a number of elements >= maxlen.
373  */
streamTrimByLength(stream * s,size_t maxlen,int approx)374 int64_t streamTrimByLength(stream *s, size_t maxlen, int approx) {
375     if (s->length <= maxlen) return 0;
376 
377     raxIterator ri;
378     raxStart(&ri,s->rax);
379     raxSeek(&ri,"^",NULL,0);
380 
381     int64_t deleted = 0;
382     while(s->length > maxlen && raxNext(&ri)) {
383         unsigned char *lp = ri.data, *p = lpFirst(lp);
384         int64_t entries = lpGetInteger(p);
385 
386         /* Check if we can remove the whole node, and still have at
387          * least maxlen elements. */
388         if (s->length - entries >= maxlen) {
389             lpFree(lp);
390             raxRemove(s->rax,ri.key,ri.key_len,NULL);
391             raxSeek(&ri,">=",ri.key,ri.key_len);
392             s->length -= entries;
393             deleted += entries;
394             continue;
395         }
396 
397         /* If we cannot remove a whole element, and approx is true,
398          * stop here. */
399         if (approx) break;
400 
401         /* Otherwise, we have to mark single entries inside the listpack
402          * as deleted. We start by updating the entries/deleted counters. */
403         int64_t to_delete = s->length - maxlen;
404         serverAssert(to_delete < entries);
405         lp = lpReplaceInteger(lp,&p,entries-to_delete);
406         p = lpNext(lp,p); /* Seek deleted field. */
407         int64_t marked_deleted = lpGetInteger(p);
408         lp = lpReplaceInteger(lp,&p,marked_deleted+to_delete);
409         p = lpNext(lp,p); /* Seek num-of-fields in the master entry. */
410 
411         /* Skip all the master fields. */
412         int64_t master_fields_count = lpGetInteger(p);
413         p = lpNext(lp,p); /* Seek the first field. */
414         for (int64_t j = 0; j < master_fields_count; j++)
415             p = lpNext(lp,p); /* Skip all master fields. */
416         p = lpNext(lp,p); /* Skip the zero master entry terminator. */
417 
418         /* 'p' is now pointing to the first entry inside the listpack.
419          * We have to run entry after entry, marking entries as deleted
420          * if they are already not deleted. */
421         while(p) {
422             int flags = lpGetInteger(p);
423             int to_skip;
424 
425             /* Mark the entry as deleted. */
426             if (!(flags & STREAM_ITEM_FLAG_DELETED)) {
427                 flags |= STREAM_ITEM_FLAG_DELETED;
428                 lp = lpReplaceInteger(lp,&p,flags);
429                 deleted++;
430                 s->length--;
431                 if (s->length <= maxlen) break; /* Enough entries deleted. */
432             }
433 
434             p = lpNext(lp,p); /* Skip ID ms delta. */
435             p = lpNext(lp,p); /* Skip ID seq delta. */
436             p = lpNext(lp,p); /* Seek num-fields or values (if compressed). */
437             if (flags & STREAM_ITEM_FLAG_SAMEFIELDS) {
438                 to_skip = master_fields_count;
439             } else {
440                 to_skip = lpGetInteger(p);
441                 to_skip = 1+(to_skip*2);
442             }
443 
444             while(to_skip--) p = lpNext(lp,p); /* Skip the whole entry. */
445             p = lpNext(lp,p); /* Skip the final lp-count field. */
446         }
447 
448         /* Here we should perform garbage collection in case at this point
449          * there are too many entries deleted inside the listpack. */
450         entries -= to_delete;
451         marked_deleted += to_delete;
452         if (entries + marked_deleted > 10 && marked_deleted > entries/2) {
453             /* TODO: perform a garbage collection. */
454         }
455 
456         /* Update the listpack with the new pointer. */
457         raxInsert(s->rax,ri.key,ri.key_len,lp,NULL);
458 
459         break; /* If we are here, there was enough to delete in the current
460                   node, so no need to go to the next node. */
461     }
462 
463     raxStop(&ri);
464     return deleted;
465 }
466 
467 /* Initialize the stream iterator, so that we can call iterating functions
468  * to get the next items. This requires a corresponding streamIteratorStop()
469  * at the end. The 'rev' parameter controls the direction. If it's zero the
470  * iteration is from the start to the end element (inclusive), otherwise
471  * if rev is non-zero, the iteration is reversed.
472  *
473  * Once the iterator is initialized, we iterate like this:
474  *
475  *  streamIterator myiterator;
476  *  streamIteratorStart(&myiterator,...);
477  *  int64_t numfields;
478  *  while(streamIteratorGetID(&myiterator,&ID,&numfields)) {
479  *      while(numfields--) {
480  *          unsigned char *key, *value;
481  *          size_t key_len, value_len;
482  *          streamIteratorGetField(&myiterator,&key,&value,&key_len,&value_len);
483  *
484  *          ... do what you want with key and value ...
485  *      }
486  *  }
487  *  streamIteratorStop(&myiterator); */
streamIteratorStart(streamIterator * si,stream * s,streamID * start,streamID * end,int rev)488 void streamIteratorStart(streamIterator *si, stream *s, streamID *start, streamID *end, int rev) {
489     /* Intialize the iterator and translates the iteration start/stop
490      * elements into a 128 big big-endian number. */
491     if (start) {
492         streamEncodeID(si->start_key,start);
493     } else {
494         si->start_key[0] = 0;
495         si->start_key[1] = 0;
496     }
497 
498     if (end) {
499         streamEncodeID(si->end_key,end);
500     } else {
501         si->end_key[0] = UINT64_MAX;
502         si->end_key[1] = UINT64_MAX;
503     }
504 
505     /* Seek the correct node in the radix tree. */
506     raxStart(&si->ri,s->rax);
507     if (!rev) {
508         if (start && (start->ms || start->seq)) {
509             raxSeek(&si->ri,"<=",(unsigned char*)si->start_key,
510                     sizeof(si->start_key));
511             if (raxEOF(&si->ri)) raxSeek(&si->ri,"^",NULL,0);
512         } else {
513             raxSeek(&si->ri,"^",NULL,0);
514         }
515     } else {
516         if (end && (end->ms || end->seq)) {
517             raxSeek(&si->ri,"<=",(unsigned char*)si->end_key,
518                     sizeof(si->end_key));
519             if (raxEOF(&si->ri)) raxSeek(&si->ri,"$",NULL,0);
520         } else {
521             raxSeek(&si->ri,"$",NULL,0);
522         }
523     }
524     si->stream = s;
525     si->lp = NULL; /* There is no current listpack right now. */
526     si->lp_ele = NULL; /* Current listpack cursor. */
527     si->rev = rev;  /* Direction, if non-zero reversed, from end to start. */
528 }
529 
530 /* Return 1 and store the current item ID at 'id' if there are still
531  * elements within the iteration range, otherwise return 0 in order to
532  * signal the iteration terminated. */
streamIteratorGetID(streamIterator * si,streamID * id,int64_t * numfields)533 int streamIteratorGetID(streamIterator *si, streamID *id, int64_t *numfields) {
534     while(1) { /* Will stop when element > stop_key or end of radix tree. */
535         /* If the current listpack is set to NULL, this is the start of the
536          * iteration or the previous listpack was completely iterated.
537          * Go to the next node. */
538         if (si->lp == NULL || si->lp_ele == NULL) {
539             if (!si->rev && !raxNext(&si->ri)) return 0;
540             else if (si->rev && !raxPrev(&si->ri)) return 0;
541             serverAssert(si->ri.key_len == sizeof(streamID));
542             /* Get the master ID. */
543             streamDecodeID(si->ri.key,&si->master_id);
544             /* Get the master fields count. */
545             si->lp = si->ri.data;
546             si->lp_ele = lpFirst(si->lp);           /* Seek items count */
547             si->lp_ele = lpNext(si->lp,si->lp_ele); /* Seek deleted count. */
548             si->lp_ele = lpNext(si->lp,si->lp_ele); /* Seek num fields. */
549             si->master_fields_count = lpGetInteger(si->lp_ele);
550             si->lp_ele = lpNext(si->lp,si->lp_ele); /* Seek first field. */
551             si->master_fields_start = si->lp_ele;
552             /* We are now pointing to the first field of the master entry.
553              * We need to seek either the first or the last entry depending
554              * on the direction of the iteration. */
555             if (!si->rev) {
556                 /* If we are iterating in normal order, skip the master fields
557                  * to seek the first actual entry. */
558                 for (uint64_t i = 0; i < si->master_fields_count; i++)
559                     si->lp_ele = lpNext(si->lp,si->lp_ele);
560             } else {
561                 /* If we are iterating in reverse direction, just seek the
562                  * last part of the last entry in the listpack (that is, the
563                  * fields count). */
564                 si->lp_ele = lpLast(si->lp);
565             }
566         } else if (si->rev) {
567             /* If we are itereating in the reverse order, and this is not
568              * the first entry emitted for this listpack, then we already
569              * emitted the current entry, and have to go back to the previous
570              * one. */
571             int lp_count = lpGetInteger(si->lp_ele);
572             while(lp_count--) si->lp_ele = lpPrev(si->lp,si->lp_ele);
573             /* Seek lp-count of prev entry. */
574             si->lp_ele = lpPrev(si->lp,si->lp_ele);
575         }
576 
577         /* For every radix tree node, iterate the corresponding listpack,
578          * returning elements when they are within range. */
579         while(1) {
580             if (!si->rev) {
581                 /* If we are going forward, skip the previous entry
582                  * lp-count field (or in case of the master entry, the zero
583                  * term field) */
584                 si->lp_ele = lpNext(si->lp,si->lp_ele);
585                 if (si->lp_ele == NULL) break;
586             } else {
587                 /* If we are going backward, read the number of elements this
588                  * entry is composed of, and jump backward N times to seek
589                  * its start. */
590                 int64_t lp_count = lpGetInteger(si->lp_ele);
591                 if (lp_count == 0) { /* We reached the master entry. */
592                     si->lp = NULL;
593                     si->lp_ele = NULL;
594                     break;
595                 }
596                 while(lp_count--) si->lp_ele = lpPrev(si->lp,si->lp_ele);
597             }
598 
599             /* Get the flags entry. */
600             si->lp_flags = si->lp_ele;
601             int flags = lpGetInteger(si->lp_ele);
602             si->lp_ele = lpNext(si->lp,si->lp_ele); /* Seek ID. */
603 
604             /* Get the ID: it is encoded as difference between the master
605              * ID and this entry ID. */
606             *id = si->master_id;
607             id->ms += lpGetInteger(si->lp_ele);
608             si->lp_ele = lpNext(si->lp,si->lp_ele);
609             id->seq += lpGetInteger(si->lp_ele);
610             si->lp_ele = lpNext(si->lp,si->lp_ele);
611             unsigned char buf[sizeof(streamID)];
612             streamEncodeID(buf,id);
613 
614             /* The number of entries is here or not depending on the
615              * flags. */
616             if (flags & STREAM_ITEM_FLAG_SAMEFIELDS) {
617                 *numfields = si->master_fields_count;
618             } else {
619                 *numfields = lpGetInteger(si->lp_ele);
620                 si->lp_ele = lpNext(si->lp,si->lp_ele);
621             }
622 
623             /* If current >= start, and the entry is not marked as
624              * deleted, emit it. */
625             if (!si->rev) {
626                 if (memcmp(buf,si->start_key,sizeof(streamID)) >= 0 &&
627                     !(flags & STREAM_ITEM_FLAG_DELETED))
628                 {
629                     if (memcmp(buf,si->end_key,sizeof(streamID)) > 0)
630                         return 0; /* We are already out of range. */
631                     si->entry_flags = flags;
632                     if (flags & STREAM_ITEM_FLAG_SAMEFIELDS)
633                         si->master_fields_ptr = si->master_fields_start;
634                     return 1; /* Valid item returned. */
635                 }
636             } else {
637                 if (memcmp(buf,si->end_key,sizeof(streamID)) <= 0 &&
638                     !(flags & STREAM_ITEM_FLAG_DELETED))
639                 {
640                     if (memcmp(buf,si->start_key,sizeof(streamID)) < 0)
641                         return 0; /* We are already out of range. */
642                     si->entry_flags = flags;
643                     if (flags & STREAM_ITEM_FLAG_SAMEFIELDS)
644                         si->master_fields_ptr = si->master_fields_start;
645                     return 1; /* Valid item returned. */
646                 }
647             }
648 
649             /* If we do not emit, we have to discard if we are going
650              * forward, or seek the previous entry if we are going
651              * backward. */
652             if (!si->rev) {
653                 int64_t to_discard = (flags & STREAM_ITEM_FLAG_SAMEFIELDS) ?
654                                       *numfields : *numfields*2;
655                 for (int64_t i = 0; i < to_discard; i++)
656                     si->lp_ele = lpNext(si->lp,si->lp_ele);
657             } else {
658                 int64_t prev_times = 4; /* flag + id ms + id seq + one more to
659                                            go back to the previous entry "count"
660                                            field. */
661                 /* If the entry was not flagged SAMEFIELD we also read the
662                  * number of fields, so go back one more. */
663                 if (!(flags & STREAM_ITEM_FLAG_SAMEFIELDS)) prev_times++;
664                 while(prev_times--) si->lp_ele = lpPrev(si->lp,si->lp_ele);
665             }
666         }
667 
668         /* End of listpack reached. Try the next/prev radix tree node. */
669     }
670 }
671 
672 /* Get the field and value of the current item we are iterating. This should
673  * be called immediately after streamIteratorGetID(), and for each field
674  * according to the number of fields returned by streamIteratorGetID().
675  * The function populates the field and value pointers and the corresponding
676  * lengths by reference, that are valid until the next iterator call, assuming
677  * no one touches the stream meanwhile. */
streamIteratorGetField(streamIterator * si,unsigned char ** fieldptr,unsigned char ** valueptr,int64_t * fieldlen,int64_t * valuelen)678 void streamIteratorGetField(streamIterator *si, unsigned char **fieldptr, unsigned char **valueptr, int64_t *fieldlen, int64_t *valuelen) {
679     if (si->entry_flags & STREAM_ITEM_FLAG_SAMEFIELDS) {
680         *fieldptr = lpGet(si->master_fields_ptr,fieldlen,si->field_buf);
681         si->master_fields_ptr = lpNext(si->lp,si->master_fields_ptr);
682     } else {
683         *fieldptr = lpGet(si->lp_ele,fieldlen,si->field_buf);
684         si->lp_ele = lpNext(si->lp,si->lp_ele);
685     }
686     *valueptr = lpGet(si->lp_ele,valuelen,si->value_buf);
687     si->lp_ele = lpNext(si->lp,si->lp_ele);
688 }
689 
/* Remove the current entry from the stream: can be called after the
 * GetID() API or after any GetField() call, however we need to iterate
 * a valid entry while calling this function. Moreover the function
 * requires the entry ID we are currently iterating, that was previously
 * returned by GetID().
 *
 * Note that after calling this function, next calls to GetField() can't
 * be performed: the entry is now deleted. Instead the iterator will
 * automatically re-seek to the next entry, so the caller should continue
 * with GetID().
 *
 * Effects on the stream:
 * - the entry is flagged STREAM_ITEM_FLAG_DELETED inside its listpack;
 * - the master entry valid/deleted counters are updated, or, when this was
 *   the only valid entry of the node, the whole listpack is freed and its
 *   key removed from the radix tree;
 * - the stream length is decremented by one. */
void streamIteratorRemoveEntry(streamIterator *si, streamID *current) {
    unsigned char *lp = si->lp;
    int64_t aux;

    /* We do not really delete the entry here. Instead we mark it as
     * deleted flagging it, and also incrementing the count of the
     * deleted entries in the listpack header.
     *
     * We start flagging: */
    int flags = lpGetInteger(si->lp_flags);
    flags |= STREAM_ITEM_FLAG_DELETED;
    lp = lpReplaceInteger(lp,&si->lp_flags,flags);

    /* Change the valid/deleted entries count in the master entry. The first
     * listpack element holds the valid entries count, the following one the
     * deleted entries count. */
    unsigned char *p = lpFirst(lp);
    aux = lpGetInteger(p);

    if (aux == 1) {
        /* If this is the last element in the listpack, we can remove the whole
         * node. */
        lpFree(lp);
        raxRemove(si->stream->rax,si->ri.key,si->ri.key_len,NULL);
    } else {
        /* In the base case we alter the counters of valid/deleted entries. */
        lp = lpReplaceInteger(lp,&p,aux-1);
        p = lpNext(lp,p); /* Seek deleted field. */
        aux = lpGetInteger(p);
        lp = lpReplaceInteger(lp,&p,aux+1);

        /* Update the listpack with the new pointer: lpReplaceInteger() may
         * hand back a different listpack pointer than the one the radix
         * tree currently stores for this node. */
        if (si->lp != lp)
            raxInsert(si->stream->rax,si->ri.key,si->ri.key_len,lp,NULL);
    }

    /* Update the number of entries counter. */
    si->stream->length--;

    /* Re-seek the iterator to fix the now messed up state. The new range
     * starts (or, in reverse, ends) at the removed entry's ID; since that
     * entry is now flagged as deleted, the next GetID() call skips it. */
    streamID start, end;
    if (si->rev) {
        streamDecodeID(si->start_key,&start);
        end = *current;
    } else {
        start = *current;
        streamDecodeID(si->end_key,&end);
    }
    streamIteratorStop(si);
    streamIteratorStart(si,si->stream,&start,&end,si->rev);

    /* TODO: perform a garbage collection here if the ratio between
     * deleted and valid goes over a certain limit. */
}
752 
753 /* Stop the stream iterator. The only cleanup we need is to free the rax
754  * itereator, since the stream iterator itself is supposed to be stack
755  * allocated. */
streamIteratorStop(streamIterator * si)756 void streamIteratorStop(streamIterator *si) {
757     raxStop(&si->ri);
758 }
759 
760 /* Delete the specified item ID from the stream, returning 1 if the item
761  * was deleted 0 otherwise (if it does not exist). */
streamDeleteItem(stream * s,streamID * id)762 int streamDeleteItem(stream *s, streamID *id) {
763     int deleted = 0;
764     streamIterator si;
765     streamIteratorStart(&si,s,id,id,0);
766     streamID myid;
767     int64_t numfields;
768     if (streamIteratorGetID(&si,&myid,&numfields)) {
769         streamIteratorRemoveEntry(&si,&myid);
770         deleted = 1;
771     }
772     streamIteratorStop(&si);
773     return deleted;
774 }
775 
776 /* Emit a reply in the client output buffer by formatting a Stream ID
777  * in the standard <ms>-<seq> format, using the simple string protocol
778  * of REPL. */
addReplyStreamID(client * c,streamID * id)779 void addReplyStreamID(client *c, streamID *id) {
780     sds replyid = sdscatfmt(sdsempty(),"%U-%U",id->ms,id->seq);
781     addReplyBulkSds(c,replyid);
782 }
783 
784 /* Similar to the above function, but just creates an object, usually useful
785  * for replication purposes to create arguments. */
createObjectFromStreamID(streamID * id)786 robj *createObjectFromStreamID(streamID *id) {
787     return createObject(OBJ_STRING, sdscatfmt(sdsempty(),"%U-%U",
788                         id->ms,id->seq));
789 }
790 
791 /* As a result of an explicit XCLAIM or XREADGROUP command, new entries
792  * are created in the pending list of the stream and consumers. We need
793  * to propagate this changes in the form of XCLAIM commands. */
streamPropagateXCLAIM(client * c,robj * key,streamCG * group,robj * groupname,robj * id,streamNACK * nack)794 void streamPropagateXCLAIM(client *c, robj *key, streamCG *group, robj *groupname, robj *id, streamNACK *nack) {
795     /* We need to generate an XCLAIM that will work in a idempotent fashion:
796      *
797      * XCLAIM <key> <group> <consumer> 0 <id> TIME <milliseconds-unix-time>
798      *        RETRYCOUNT <count> FORCE JUSTID LASTID <id>.
799      *
800      * Note that JUSTID is useful in order to avoid that XCLAIM will do
801      * useless work in the slave side, trying to fetch the stream item. */
802     robj *argv[14];
803     argv[0] = createStringObject("XCLAIM",6);
804     argv[1] = key;
805     argv[2] = groupname;
806     argv[3] = createStringObject(nack->consumer->name,sdslen(nack->consumer->name));
807     argv[4] = createStringObjectFromLongLong(0);
808     argv[5] = id;
809     argv[6] = createStringObject("TIME",4);
810     argv[7] = createStringObjectFromLongLong(nack->delivery_time);
811     argv[8] = createStringObject("RETRYCOUNT",10);
812     argv[9] = createStringObjectFromLongLong(nack->delivery_count);
813     argv[10] = createStringObject("FORCE",5);
814     argv[11] = createStringObject("JUSTID",6);
815     argv[12] = createStringObject("LASTID",6);
816     argv[13] = createObjectFromStreamID(&group->last_id);
817     propagate(server.xclaimCommand,c->db->id,argv,14,PROPAGATE_AOF|PROPAGATE_REPL);
818     decrRefCount(argv[0]);
819     decrRefCount(argv[3]);
820     decrRefCount(argv[4]);
821     decrRefCount(argv[6]);
822     decrRefCount(argv[7]);
823     decrRefCount(argv[8]);
824     decrRefCount(argv[9]);
825     decrRefCount(argv[10]);
826     decrRefCount(argv[11]);
827     decrRefCount(argv[12]);
828     decrRefCount(argv[13]);
829 }
830 
831 /* We need this when we want to propoagate the new last-id of a consumer group
832  * that was consumed by XREADGROUP with the NOACK option: in that case we can't
833  * propagate the last ID just using the XCLAIM LASTID option, so we emit
834  *
835  *  XGROUP SETID <key> <groupname> <id>
836  */
streamPropagateGroupID(client * c,robj * key,streamCG * group,robj * groupname)837 void streamPropagateGroupID(client *c, robj *key, streamCG *group, robj *groupname) {
838     robj *argv[5];
839     argv[0] = createStringObject("XGROUP",6);
840     argv[1] = createStringObject("SETID",5);
841     argv[2] = key;
842     argv[3] = groupname;
843     argv[4] = createObjectFromStreamID(&group->last_id);
844     propagate(server.xgroupCommand,c->db->id,argv,5,PROPAGATE_AOF|PROPAGATE_REPL);
845     decrRefCount(argv[0]);
846     decrRefCount(argv[1]);
847     decrRefCount(argv[4]);
848 }
849 
850 /* Send the stream items in the specified range to the client 'c'. The range
851  * the client will receive is between start and end inclusive, if 'count' is
852  * non zero, no more than 'count' elements are sent.
853  *
854  * The 'end' pointer can be NULL to mean that we want all the elements from
855  * 'start' till the end of the stream. If 'rev' is non zero, elements are
856  * produced in reversed order from end to start.
857  *
858  * The function returns the number of entries emitted.
859  *
860  * If group and consumer are not NULL, the function performs additional work:
861  * 1. It updates the last delivered ID in the group in case we are
862  *    sending IDs greater than the current last ID.
863  * 2. If the requested IDs are already assigned to some other consumer, the
864  *    function will not return it to the client.
865  * 3. An entry in the pending list will be created for every entry delivered
866  *    for the first time to this consumer.
867  *
868  * The behavior may be modified passing non-zero flags:
869  *
870  * STREAM_RWR_NOACK: Do not craete PEL entries, that is, the point "3" above
871  *                   is not performed.
872  * STREAM_RWR_RAWENTRIES: Do not emit array boundaries, but just the entries,
873  *                        and return the number of entries emitted as usually.
874  *                        This is used when the function is just used in order
875  *                        to emit data and there is some higher level logic.
876  *
877  * The final argument 'spi' (stream propagatino info pointer) is a structure
878  * filled with information needed to propagte the command execution to AOF
879  * and slaves, in the case a consumer group was passed: we need to generate
880  * XCLAIM commands to create the pending list into AOF/slaves in that case.
881  *
882  * If 'spi' is set to NULL no propagation will happen even if the group was
883  * given, but currently such a feature is never used by the code base that
884  * will always pass 'spi' and propagate when a group is passed.
885  *
886  * Note that this function is recursive in certain cases. When it's called
887  * with a non NULL group and consumer argument, it may call
888  * streamReplyWithRangeFromConsumerPEL() in order to get entries from the
889  * consumer pending entries list. However such a function will then call
890  * streamReplyWithRange() in order to emit single entries (found in the
891  * PEL by ID) to the client. This is the use case for the STREAM_RWR_RAWENTRIES
892  * flag.
893  */
894 #define STREAM_RWR_NOACK (1<<0)         /* Do not create entries in the PEL. */
895 #define STREAM_RWR_RAWENTRIES (1<<1)    /* Do not emit protocol for array
896                                            boundaries, just the entries. */
897 #define STREAM_RWR_HISTORY (1<<2)       /* Only serve consumer local PEL. */
streamReplyWithRange(client * c,stream * s,streamID * start,streamID * end,size_t count,int rev,streamCG * group,streamConsumer * consumer,int flags,streamPropInfo * spi)898 size_t streamReplyWithRange(client *c, stream *s, streamID *start, streamID *end, size_t count, int rev, streamCG *group, streamConsumer *consumer, int flags, streamPropInfo *spi) {
899     void *arraylen_ptr = NULL;
900     size_t arraylen = 0;
901     streamIterator si;
902     int64_t numfields;
903     streamID id;
904     int propagate_last_id = 0;
905 
906     /* If the client is asking for some history, we serve it using a
907      * different function, so that we return entries *solely* from its
908      * own PEL. This ensures each consumer will always and only see
909      * the history of messages delivered to it and not yet confirmed
910      * as delivered. */
911     if (group && (flags & STREAM_RWR_HISTORY)) {
912         return streamReplyWithRangeFromConsumerPEL(c,s,start,end,count,
913                                                    consumer);
914     }
915 
916     if (!(flags & STREAM_RWR_RAWENTRIES))
917         arraylen_ptr = addDeferredMultiBulkLength(c);
918     streamIteratorStart(&si,s,start,end,rev);
919     while(streamIteratorGetID(&si,&id,&numfields)) {
920         /* Update the group last_id if needed. */
921         if (group && streamCompareID(&id,&group->last_id) > 0) {
922             group->last_id = id;
923             propagate_last_id = 1;
924         }
925 
926         /* Emit a two elements array for each item. The first is
927          * the ID, the second is an array of field-value pairs. */
928         addReplyMultiBulkLen(c,2);
929         addReplyStreamID(c,&id);
930         addReplyMultiBulkLen(c,numfields*2);
931 
932         /* Emit the field-value pairs. */
933         while(numfields--) {
934             unsigned char *key, *value;
935             int64_t key_len, value_len;
936             streamIteratorGetField(&si,&key,&value,&key_len,&value_len);
937             addReplyBulkCBuffer(c,key,key_len);
938             addReplyBulkCBuffer(c,value,value_len);
939         }
940 
941         /* If a group is passed, we need to create an entry in the
942          * PEL (pending entries list) of this group *and* this consumer.
943          *
944          * Note that we cannot be sure about the fact the message is not
945          * already owned by another consumer, because the admin is able
946          * to change the consumer group last delivered ID using the
947          * XGROUP SETID command. So if we find that there is already
948          * a NACK for the entry, we need to associate it to the new
949          * consumer. */
950         if (group && !(flags & STREAM_RWR_NOACK)) {
951             unsigned char buf[sizeof(streamID)];
952             streamEncodeID(buf,&id);
953 
954             /* Try to add a new NACK. Most of the time this will work and
955              * will not require extra lookups. We'll fix the problem later
956              * if we find that there is already a entry for this ID. */
957             streamNACK *nack = streamCreateNACK(consumer);
958             int group_inserted =
959                 raxTryInsert(group->pel,buf,sizeof(buf),nack,NULL);
960             int consumer_inserted =
961                 raxTryInsert(consumer->pel,buf,sizeof(buf),nack,NULL);
962 
963             /* Now we can check if the entry was already busy, and
964              * in that case reassign the entry to the new consumer,
965              * or update it if the consumer is the same as before. */
966             if (group_inserted == 0) {
967                 streamFreeNACK(nack);
968                 nack = raxFind(group->pel,buf,sizeof(buf));
969                 serverAssert(nack != raxNotFound);
970                 raxRemove(nack->consumer->pel,buf,sizeof(buf),NULL);
971                 /* Update the consumer and NACK metadata. */
972                 nack->consumer = consumer;
973                 nack->delivery_time = mstime();
974                 nack->delivery_count = 1;
975                 /* Add the entry in the new consumer local PEL. */
976                 raxInsert(consumer->pel,buf,sizeof(buf),nack,NULL);
977             } else if (group_inserted == 1 && consumer_inserted == 0) {
978                 serverPanic("NACK half-created. Should not be possible.");
979             }
980 
981             /* Propagate as XCLAIM. */
982             if (spi) {
983                 robj *idarg = createObjectFromStreamID(&id);
984                 streamPropagateXCLAIM(c,spi->keyname,group,spi->groupname,idarg,nack);
985                 decrRefCount(idarg);
986             }
987         } else {
988             if (propagate_last_id)
989                 streamPropagateGroupID(c,spi->keyname,group,spi->groupname);
990         }
991 
992         arraylen++;
993         if (count && count == arraylen) break;
994     }
995     streamIteratorStop(&si);
996     if (arraylen_ptr) setDeferredMultiBulkLength(c,arraylen_ptr,arraylen);
997     return arraylen;
998 }
999 
1000 /* This is an helper function for streamReplyWithRange() when called with
1001  * group and consumer arguments, but with a range that is referring to already
1002  * delivered messages. In this case we just emit messages that are already
1003  * in the history of the consumer, fetching the IDs from its PEL.
1004  *
1005  * Note that this function does not have a 'rev' argument because it's not
1006  * possible to iterate in reverse using a group. Basically this function
1007  * is only called as a result of the XREADGROUP command.
1008  *
1009  * This function is more expensive because it needs to inspect the PEL and then
1010  * seek into the radix tree of the messages in order to emit the full message
1011  * to the client. However clients only reach this code path when they are
1012  * fetching the history of already retrieved messages, which is rare. */
streamReplyWithRangeFromConsumerPEL(client * c,stream * s,streamID * start,streamID * end,size_t count,streamConsumer * consumer)1013 size_t streamReplyWithRangeFromConsumerPEL(client *c, stream *s, streamID *start, streamID *end, size_t count, streamConsumer *consumer) {
1014     raxIterator ri;
1015     unsigned char startkey[sizeof(streamID)];
1016     unsigned char endkey[sizeof(streamID)];
1017     streamEncodeID(startkey,start);
1018     if (end) streamEncodeID(endkey,end);
1019 
1020     size_t arraylen = 0;
1021     void *arraylen_ptr = addDeferredMultiBulkLength(c);
1022     raxStart(&ri,consumer->pel);
1023     raxSeek(&ri,">=",startkey,sizeof(startkey));
1024     while(raxNext(&ri) && (!count || arraylen < count)) {
1025         if (end && memcmp(ri.key,end,ri.key_len) > 0) break;
1026         streamID thisid;
1027         streamDecodeID(ri.key,&thisid);
1028         if (streamReplyWithRange(c,s,&thisid,&thisid,1,0,NULL,NULL,
1029                                  STREAM_RWR_RAWENTRIES,NULL) == 0)
1030         {
1031             /* Note that we may have a not acknowledged entry in the PEL
1032              * about a message that's no longer here because was removed
1033              * by the user by other means. In that case we signal it emitting
1034              * the ID but then a NULL entry for the fields. */
1035             addReplyMultiBulkLen(c,2);
1036             streamID id;
1037             streamDecodeID(ri.key,&id);
1038             addReplyStreamID(c,&id);
1039             addReply(c,shared.nullmultibulk);
1040         } else {
1041             streamNACK *nack = ri.data;
1042             nack->delivery_time = mstime();
1043             nack->delivery_count++;
1044         }
1045         arraylen++;
1046     }
1047     raxStop(&ri);
1048     setDeferredMultiBulkLength(c,arraylen_ptr,arraylen);
1049     return arraylen;
1050 }
1051 
1052 /* -----------------------------------------------------------------------
1053  * Stream commands implementation
1054  * ----------------------------------------------------------------------- */
1055 
1056 /* Look the stream at 'key' and return the corresponding stream object.
1057  * The function creates a key setting it to an empty stream if needed. */
streamTypeLookupWriteOrCreate(client * c,robj * key)1058 robj *streamTypeLookupWriteOrCreate(client *c, robj *key) {
1059     robj *o = lookupKeyWrite(c->db,key);
1060     if (o == NULL) {
1061         o = createStreamObject();
1062         dbAdd(c->db,key,o);
1063     } else {
1064         if (o->type != OBJ_STREAM) {
1065             addReply(c,shared.wrongtypeerr);
1066             return NULL;
1067         }
1068     }
1069     return o;
1070 }
1071 
/* Helper function to convert a string to an unsigned long long value.
 * The function attempts to use the faster string2ll() function inside
 * Redis: if it fails, strtoull() is used instead. The function returns
 * 1 if the conversion happened successfully or 0 if the number is
 * invalid or out of range.
 *
 * Only plain decimal digits are accepted. On its own, strtoull() would also
 * skip leading whitespace and accept a leading sign, silently negating the
 * result: "-18446744073709551615" would be parsed as 1. */
int string2ull(const char *s, unsigned long long *value) {
    long long ll;
    if (string2ll(s,strlen(s),&ll)) {
        if (ll < 0) return 0; /* Negative values are out of range. */
        *value = ll;
        return 1;
    }

    /* string2ll() failed: the input may still be a valid number above
     * LLONG_MAX. Require a leading digit so that strtoull() cannot accept
     * whitespace or a sign. This also rules out the empty string. */
    if (*s < '0' || *s > '9') return 0;

    errno = 0;
    char *endptr = NULL;
    *value = strtoull(s,&endptr,10);
    if (errno == EINVAL || errno == ERANGE || *endptr != '\0')
        return 0; /* strtoull() failed. */
    return 1; /* Conversion done! */
}
1091 
1092 /* Parse a stream ID in the format given by clients to Redis, that is
1093  * <ms>-<seq>, and converts it into a streamID structure. If
1094  * the specified ID is invalid C_ERR is returned and an error is reported
1095  * to the client, otherwise C_OK is returned. The ID may be in incomplete
1096  * form, just stating the milliseconds time part of the stream. In such a case
1097  * the missing part is set according to the value of 'missing_seq' parameter.
1098  *
1099  * The IDs "-" and "+" specify respectively the minimum and maximum IDs
1100  * that can be represented. If 'strict' is set to 1, "-" and "+" will be
1101  * treated as an invalid ID.
1102  *
1103  * If 'c' is set to NULL, no reply is sent to the client. */
streamGenericParseIDOrReply(client * c,robj * o,streamID * id,uint64_t missing_seq,int strict)1104 int streamGenericParseIDOrReply(client *c, robj *o, streamID *id, uint64_t missing_seq, int strict) {
1105     char buf[128];
1106     if (sdslen(o->ptr) > sizeof(buf)-1) goto invalid;
1107     memcpy(buf,o->ptr,sdslen(o->ptr)+1);
1108 
1109     if (strict && (buf[0] == '-' || buf[0] == '+') && buf[1] == '\0')
1110         goto invalid;
1111 
1112     /* Handle the "-" and "+" special cases. */
1113     if (buf[0] == '-' && buf[1] == '\0') {
1114         id->ms = 0;
1115         id->seq = 0;
1116         return C_OK;
1117     } else if (buf[0] == '+' && buf[1] == '\0') {
1118         id->ms = UINT64_MAX;
1119         id->seq = UINT64_MAX;
1120         return C_OK;
1121     }
1122 
1123     /* Parse <ms>-<seq> form. */
1124     char *dot = strchr(buf,'-');
1125     if (dot) *dot = '\0';
1126     unsigned long long ms, seq;
1127     if (string2ull(buf,&ms) == 0) goto invalid;
1128     if (dot && string2ull(dot+1,&seq) == 0) goto invalid;
1129     if (!dot) seq = missing_seq;
1130     id->ms = ms;
1131     id->seq = seq;
1132     return C_OK;
1133 
1134 invalid:
1135     if (c) addReplyError(c,"Invalid stream ID specified as stream "
1136                            "command argument");
1137     return C_ERR;
1138 }
1139 
1140 /* Wrapper for streamGenericParseIDOrReply() with 'strict' argument set to
1141  * 0, to be used when - and + are accetable IDs. */
streamParseIDOrReply(client * c,robj * o,streamID * id,uint64_t missing_seq)1142 int streamParseIDOrReply(client *c, robj *o, streamID *id, uint64_t missing_seq) {
1143     return streamGenericParseIDOrReply(c,o,id,missing_seq,0);
1144 }
1145 
1146 /* Wrapper for streamGenericParseIDOrReply() with 'strict' argument set to
1147  * 1, to be used when we want to return an error if the special IDs + or -
1148  * are provided. */
streamParseStrictIDOrReply(client * c,robj * o,streamID * id,uint64_t missing_seq)1149 int streamParseStrictIDOrReply(client *c, robj *o, streamID *id, uint64_t missing_seq) {
1150     return streamGenericParseIDOrReply(c,o,id,missing_seq,1);
1151 }
1152 
1153 /* We propagate MAXLEN ~ <count> as MAXLEN = <resulting-len-of-stream>
1154  * otherwise trimming is no longer determinsitic on replicas / AOF. */
streamRewriteApproxMaxlen(client * c,stream * s,int maxlen_arg_idx)1155 void streamRewriteApproxMaxlen(client *c, stream *s, int maxlen_arg_idx) {
1156     robj *maxlen_obj = createStringObjectFromLongLong(s->length);
1157     robj *equal_obj = createStringObject("=",1);
1158 
1159     rewriteClientCommandArgument(c,maxlen_arg_idx,maxlen_obj);
1160     rewriteClientCommandArgument(c,maxlen_arg_idx-1,equal_obj);
1161 
1162     decrRefCount(equal_obj);
1163     decrRefCount(maxlen_obj);
1164 }
1165 
1166 /* XADD key [MAXLEN [~|=] <count>] <ID or *> [field value] [field value] ... */
xaddCommand(client * c)1167 void xaddCommand(client *c) {
1168     streamID id;
1169     int id_given = 0; /* Was an ID different than "*" specified? */
1170     long long maxlen = -1;  /* If left to -1 no trimming is performed. */
1171     int approx_maxlen = 0;  /* If 1 only delete whole radix tree nodes, so
1172                                the maxium length is not applied verbatim. */
1173     int maxlen_arg_idx = 0; /* Index of the count in MAXLEN, for rewriting. */
1174 
1175     /* Parse options. */
1176     int i = 2; /* This is the first argument position where we could
1177                   find an option, or the ID. */
1178     for (; i < c->argc; i++) {
1179         int moreargs = (c->argc-1) - i; /* Number of additional arguments. */
1180         char *opt = c->argv[i]->ptr;
1181         if (opt[0] == '*' && opt[1] == '\0') {
1182             /* This is just a fast path for the common case of auto-ID
1183              * creation. */
1184             break;
1185         } else if (!strcasecmp(opt,"maxlen") && moreargs) {
1186             approx_maxlen = 0;
1187             char *next = c->argv[i+1]->ptr;
1188             /* Check for the form MAXLEN ~ <count>. */
1189             if (moreargs >= 2 && next[0] == '~' && next[1] == '\0') {
1190                 approx_maxlen = 1;
1191                 i++;
1192             } else if (moreargs >= 2 && next[0] == '=' && next[1] == '\0') {
1193                 i++;
1194             }
1195             if (getLongLongFromObjectOrReply(c,c->argv[i+1],&maxlen,NULL)
1196                 != C_OK) return;
1197 
1198             if (maxlen < 0) {
1199                 addReplyError(c,"The MAXLEN argument must be >= 0.");
1200                 return;
1201             }
1202             i++;
1203             maxlen_arg_idx = i;
1204         } else {
1205             /* If we are here is a syntax error or a valid ID. */
1206             if (streamParseStrictIDOrReply(c,c->argv[i],&id,0) != C_OK) return;
1207             id_given = 1;
1208             break;
1209         }
1210     }
1211     int field_pos = i+1;
1212 
1213     /* Check arity. */
1214     if ((c->argc - field_pos) < 2 || ((c->argc-field_pos) % 2) == 1) {
1215         addReplyError(c,"wrong number of arguments for XADD");
1216         return;
1217     }
1218 
1219     /* Lookup the stream at key. */
1220     robj *o;
1221     stream *s;
1222     if ((o = streamTypeLookupWriteOrCreate(c,c->argv[1])) == NULL) return;
1223     s = o->ptr;
1224 
1225     /* Append using the low level function and return the ID. */
1226     if (streamAppendItem(s,c->argv+field_pos,(c->argc-field_pos)/2,
1227         &id, id_given ? &id : NULL)
1228         == C_ERR)
1229     {
1230         addReplyError(c,"The ID specified in XADD is equal or smaller than the "
1231                         "target stream top item");
1232         return;
1233     }
1234     addReplyStreamID(c,&id);
1235 
1236     signalModifiedKey(c->db,c->argv[1]);
1237     notifyKeyspaceEvent(NOTIFY_STREAM,"xadd",c->argv[1],c->db->id);
1238     server.dirty++;
1239 
1240     if (maxlen >= 0) {
1241         /* Notify xtrim event if needed. */
1242         if (streamTrimByLength(s,maxlen,approx_maxlen)) {
1243             notifyKeyspaceEvent(NOTIFY_STREAM,"xtrim",c->argv[1],c->db->id);
1244         }
1245         if (approx_maxlen) streamRewriteApproxMaxlen(c,s,maxlen_arg_idx);
1246     }
1247 
1248     /* Let's rewrite the ID argument with the one actually generated for
1249      * AOF/replication propagation. */
1250     robj *idarg = createObjectFromStreamID(&id);
1251     rewriteClientCommandArgument(c,i,idarg);
1252     decrRefCount(idarg);
1253 
1254     /* We need to signal to blocked clients that there is new data on this
1255      * stream. */
1256     if (server.blocked_clients_by_type[BLOCKED_STREAM])
1257         signalKeyAsReady(c->db, c->argv[1]);
1258 }
1259 
1260 /* XRANGE/XREVRANGE actual implementation. */
xrangeGenericCommand(client * c,int rev)1261 void xrangeGenericCommand(client *c, int rev) {
1262     robj *o;
1263     stream *s;
1264     streamID startid, endid;
1265     long long count = -1;
1266     robj *startarg = rev ? c->argv[3] : c->argv[2];
1267     robj *endarg = rev ? c->argv[2] : c->argv[3];
1268 
1269     if (streamParseIDOrReply(c,startarg,&startid,0) == C_ERR) return;
1270     if (streamParseIDOrReply(c,endarg,&endid,UINT64_MAX) == C_ERR) return;
1271 
1272     /* Parse the COUNT option if any. */
1273     if (c->argc > 4) {
1274         for (int j = 4; j < c->argc; j++) {
1275             int additional = c->argc-j-1;
1276             if (strcasecmp(c->argv[j]->ptr,"COUNT") == 0 && additional >= 1) {
1277                 if (getLongLongFromObjectOrReply(c,c->argv[j+1],&count,NULL)
1278                     != C_OK) return;
1279                 if (count < 0) count = 0;
1280                 j++; /* Consume additional arg. */
1281             } else {
1282                 addReply(c,shared.syntaxerr);
1283                 return;
1284             }
1285         }
1286     }
1287 
1288     /* Return the specified range to the user. */
1289     if ((o = lookupKeyReadOrReply(c,c->argv[1],shared.emptymultibulk)) == NULL
1290         || checkType(c,o,OBJ_STREAM)) return;
1291     s = o->ptr;
1292 
1293     if (count == 0) {
1294         addReply(c,shared.nullmultibulk);
1295     } else {
1296         if (count == -1) count = 0;
1297         streamReplyWithRange(c,s,&startid,&endid,count,rev,NULL,NULL,0,NULL);
1298     }
1299 }
1300 
1301 /* XRANGE key start end [COUNT <n>] */
xrangeCommand(client * c)1302 void xrangeCommand(client *c) {
1303     xrangeGenericCommand(c,0);
1304 }
1305 
1306 /* XREVRANGE key end start [COUNT <n>] */
xrevrangeCommand(client * c)1307 void xrevrangeCommand(client *c) {
1308     xrangeGenericCommand(c,1);
1309 }
1310 
1311 /* XLEN */
xlenCommand(client * c)1312 void xlenCommand(client *c) {
1313     robj *o;
1314     if ((o = lookupKeyReadOrReply(c,c->argv[1],shared.czero)) == NULL
1315         || checkType(c,o,OBJ_STREAM)) return;
1316     stream *s = o->ptr;
1317     addReplyLongLong(c,s->length);
1318 }
1319 
/* XREAD [BLOCK <milliseconds>] [COUNT <count>] STREAMS key_1 key_2 ... key_N
 *       ID_1 ID_2 ... ID_N
 *
 * This function also implements the XREADGROUP command, which is like XREAD
 * but accepts the additional [GROUP group-name consumer-name] option.
 * The two commands share this implementation, but they are not
 * interchangeable: XREAD is a read-only command that can be called on
 * slaves, while XREADGROUP is not. */
/* Default COUNT for blocking reads. Presumably applied by xreadCommand()
 * when BLOCK is used without an explicit COUNT; its use lies outside this
 * view, so confirm before relying on it. */
#define XREAD_BLOCKED_DEFAULT_COUNT 1000
xreadCommand(client * c)1328 void xreadCommand(client *c) {
1329     long long timeout = -1; /* -1 means, no BLOCK argument given. */
1330     long long count = 0;
1331     int streams_count = 0;
1332     int streams_arg = 0;
1333     int noack = 0;          /* True if NOACK option was specified. */
1334     #define STREAMID_STATIC_VECTOR_LEN 8
1335     streamID static_ids[STREAMID_STATIC_VECTOR_LEN];
1336     streamID *ids = static_ids;
1337     streamCG **groups = NULL;
1338     int xreadgroup = sdslen(c->argv[0]->ptr) == 10; /* XREAD or XREADGROUP? */
1339     robj *groupname = NULL;
1340     robj *consumername = NULL;
1341 
1342     /* Parse arguments. */
1343     for (int i = 1; i < c->argc; i++) {
1344         int moreargs = c->argc-i-1;
1345         char *o = c->argv[i]->ptr;
1346         if (!strcasecmp(o,"BLOCK") && moreargs) {
1347             i++;
1348             if (getTimeoutFromObjectOrReply(c,c->argv[i],&timeout,
1349                 UNIT_MILLISECONDS) != C_OK) return;
1350         } else if (!strcasecmp(o,"COUNT") && moreargs) {
1351             i++;
1352             if (getLongLongFromObjectOrReply(c,c->argv[i],&count,NULL) != C_OK)
1353                 return;
1354             if (count < 0) count = 0;
1355         } else if (!strcasecmp(o,"STREAMS") && moreargs) {
1356             streams_arg = i+1;
1357             streams_count = (c->argc-streams_arg);
1358             if ((streams_count % 2) != 0) {
1359                 addReplyError(c,"Unbalanced XREAD list of streams: "
1360                                 "for each stream key an ID or '$' must be "
1361                                 "specified.");
1362                 return;
1363             }
1364             streams_count /= 2; /* We have two arguments for each stream. */
1365             break;
1366         } else if (!strcasecmp(o,"GROUP") && moreargs >= 2) {
1367             if (!xreadgroup) {
1368                 addReplyError(c,"The GROUP option is only supported by "
1369                                 "XREADGROUP. You called XREAD instead.");
1370                 return;
1371             }
1372             groupname = c->argv[i+1];
1373             consumername = c->argv[i+2];
1374             i += 2;
1375         } else if (!strcasecmp(o,"NOACK")) {
1376             if (!xreadgroup) {
1377                 addReplyError(c,"The NOACK option is only supported by "
1378                                 "XREADGROUP. You called XREAD instead.");
1379                 return;
1380             }
1381             noack = 1;
1382         } else {
1383             addReply(c,shared.syntaxerr);
1384             return;
1385         }
1386     }
1387 
1388     /* STREAMS option is mandatory. */
1389     if (streams_arg == 0) {
1390         addReply(c,shared.syntaxerr);
1391         return;
1392     }
1393 
1394     /* If the user specified XREADGROUP then it must also
1395      * provide the GROUP option. */
1396     if (xreadgroup && groupname == NULL) {
1397         addReplyError(c,"Missing GROUP option for XREADGROUP");
1398         return;
1399     }
1400 
1401     /* Parse the IDs and resolve the group name. */
1402     if (streams_count > STREAMID_STATIC_VECTOR_LEN)
1403         ids = zmalloc(sizeof(streamID)*streams_count);
1404     if (groupname) groups = zmalloc(sizeof(streamCG*)*streams_count);
1405 
1406     for (int i = streams_arg + streams_count; i < c->argc; i++) {
1407         /* Specifying "$" as last-known-id means that the client wants to be
1408          * served with just the messages that will arrive into the stream
1409          * starting from now. */
1410         int id_idx = i - streams_arg - streams_count;
1411         robj *key = c->argv[i-streams_count];
1412         robj *o = lookupKeyRead(c->db,key);
1413         if (o && checkType(c,o,OBJ_STREAM)) goto cleanup;
1414         streamCG *group = NULL;
1415 
1416         /* If a group was specified, than we need to be sure that the
1417          * key and group actually exist. */
1418         if (groupname) {
1419             if (o == NULL ||
1420                 (group = streamLookupCG(o->ptr,groupname->ptr)) == NULL)
1421             {
1422                 addReplyErrorFormat(c, "-NOGROUP No such key '%s' or consumer "
1423                                        "group '%s' in XREADGROUP with GROUP "
1424                                        "option",
1425                                     (char*)key->ptr,(char*)groupname->ptr);
1426                 goto cleanup;
1427             }
1428             groups[id_idx] = group;
1429         }
1430 
1431         if (strcmp(c->argv[i]->ptr,"$") == 0) {
1432             if (xreadgroup) {
1433                 addReplyError(c,"The $ ID is meaningless in the context of "
1434                                 "XREADGROUP: you want to read the history of "
1435                                 "this consumer by specifying a proper ID, or "
1436                                 "use the > ID to get new messages. The $ ID would "
1437                                 "just return an empty result set.");
1438                 goto cleanup;
1439             }
1440             if (o) {
1441                 stream *s = o->ptr;
1442                 ids[id_idx] = s->last_id;
1443             } else {
1444                 ids[id_idx].ms = 0;
1445                 ids[id_idx].seq = 0;
1446             }
1447             continue;
1448         } else if (strcmp(c->argv[i]->ptr,">") == 0) {
1449             if (!xreadgroup) {
1450                 addReplyError(c,"The > ID can be specified only when calling "
1451                                 "XREADGROUP using the GROUP <group> "
1452                                 "<consumer> option.");
1453                 goto cleanup;
1454             }
1455             /* We use just the maximum ID to signal this is a ">" ID, anyway
1456              * the code handling the blocking clients will have to update the
1457              * ID later in order to match the changing consumer group last ID. */
1458             ids[id_idx].ms = UINT64_MAX;
1459             ids[id_idx].seq = UINT64_MAX;
1460             continue;
1461         }
1462         if (streamParseStrictIDOrReply(c,c->argv[i],ids+id_idx,0) != C_OK)
1463             goto cleanup;
1464     }
1465 
1466     /* Try to serve the client synchronously. */
1467     size_t arraylen = 0;
1468     void *arraylen_ptr = NULL;
1469     for (int i = 0; i < streams_count; i++) {
1470         robj *o = lookupKeyRead(c->db,c->argv[streams_arg+i]);
1471         if (o == NULL) continue;
1472         stream *s = o->ptr;
1473         streamID *gt = ids+i; /* ID must be greater than this. */
1474         int serve_synchronously = 0;
1475         int serve_history = 0; /* True for XREADGROUP with ID != ">". */
1476 
1477         /* Check if there are the conditions to serve the client
1478          * synchronously. */
1479         if (groups) {
1480             /* If the consumer is blocked on a group, we always serve it
1481              * synchronously (serving its local history) if the ID specified
1482              * was not the special ">" ID. */
1483             if (gt->ms != UINT64_MAX ||
1484                 gt->seq != UINT64_MAX)
1485             {
1486                 serve_synchronously = 1;
1487                 serve_history = 1;
1488             } else {
1489                 /* We also want to serve a consumer in a consumer group
1490                  * synchronously in case the group top item delivered is smaller
1491                  * than what the stream has inside. */
1492                 streamID *last = &groups[i]->last_id;
1493                 if (s->length && (streamCompareID(&s->last_id, last) > 0)) {
1494                     serve_synchronously = 1;
1495                     *gt = *last;
1496                 }
1497             }
1498         } else {
1499             /* For consumers without a group, we serve synchronously if we can
1500              * actually provide at least one item from the stream. */
1501             if (s->length && (streamCompareID(&s->last_id, gt) > 0)) {
1502                 serve_synchronously = 1;
1503             }
1504         }
1505 
1506         if (serve_synchronously) {
1507             arraylen++;
1508             if (arraylen == 1) arraylen_ptr = addDeferredMultiBulkLength(c);
1509             /* streamReplyWithRange() handles the 'start' ID as inclusive,
1510              * so start from the next ID, since we want only messages with
1511              * IDs greater than start. */
1512             streamID start = *gt;
1513             start.seq++; /* uint64_t can't overflow in this context. */
1514 
1515             /* Emit the two elements sub-array consisting of the name
1516              * of the stream and the data we extracted from it. */
1517             addReplyMultiBulkLen(c,2);
1518             addReplyBulk(c,c->argv[streams_arg+i]);
1519             streamConsumer *consumer = NULL;
1520             if (groups) consumer = streamLookupConsumer(groups[i],
1521                                                         consumername->ptr,1);
1522             streamPropInfo spi = {c->argv[i+streams_arg],groupname};
1523             int flags = 0;
1524             if (noack) flags |= STREAM_RWR_NOACK;
1525             if (serve_history) flags |= STREAM_RWR_HISTORY;
1526             streamReplyWithRange(c,s,&start,NULL,count,0,
1527                                  groups ? groups[i] : NULL,
1528                                  consumer, flags, &spi);
1529             if (groups) server.dirty++;
1530         }
1531     }
1532 
1533      /* We replied synchronously! Set the top array len and return to caller. */
1534     if (arraylen) {
1535         setDeferredMultiBulkLength(c,arraylen_ptr,arraylen);
1536         goto cleanup;
1537     }
1538 
1539     /* Block if needed. */
1540     if (timeout != -1) {
1541         /* If we are inside a MULTI/EXEC and the list is empty the only thing
1542          * we can do is treating it as a timeout (even with timeout 0). */
1543         if (c->flags & CLIENT_MULTI) {
1544             addReply(c,shared.nullmultibulk);
1545             goto cleanup;
1546         }
1547         blockForKeys(c, BLOCKED_STREAM, c->argv+streams_arg, streams_count,
1548                      timeout, NULL, ids);
1549         /* If no COUNT is given and we block, set a relatively small count:
1550          * in case the ID provided is too low, we do not want the server to
1551          * block just to serve this client a huge stream of messages. */
1552         c->bpop.xread_count = count ? count : XREAD_BLOCKED_DEFAULT_COUNT;
1553 
1554         /* If this is a XREADGROUP + GROUP we need to remember for which
1555          * group and consumer name we are blocking, so later when one of the
1556          * keys receive more data, we can call streamReplyWithRange() passing
1557          * the right arguments. */
1558         if (groupname) {
1559             incrRefCount(groupname);
1560             incrRefCount(consumername);
1561             c->bpop.xread_group = groupname;
1562             c->bpop.xread_consumer = consumername;
1563             c->bpop.xread_group_noack = noack;
1564         } else {
1565             c->bpop.xread_group = NULL;
1566             c->bpop.xread_consumer = NULL;
1567         }
1568         goto cleanup;
1569     }
1570 
1571     /* No BLOCK option, nor any stream we can serve. Reply as with a
1572      * timeout happened. */
1573     addReply(c,shared.nullmultibulk);
1574     /* Continue to cleanup... */
1575 
1576 cleanup: /* Cleanup. */
1577 
1578     /* The command is propagated (in the READGROUP form) as a side effect
1579      * of calling lower level APIs. So stop any implicit propagation. */
1580     preventCommandPropagation(c);
1581     if (ids != static_ids) zfree(ids);
1582     zfree(groups);
1583 }
1584 
1585 /* -----------------------------------------------------------------------
1586  * Low level implementation of consumer groups
1587  * ----------------------------------------------------------------------- */
1588 
1589 /* Create a NACK entry setting the delivery count to 1 and the delivery
1590  * time to the current time. The NACK consumer will be set to the one
1591  * specified as argument of the function. */
streamCreateNACK(streamConsumer * consumer)1592 streamNACK *streamCreateNACK(streamConsumer *consumer) {
1593     streamNACK *nack = zmalloc(sizeof(*nack));
1594     nack->delivery_time = mstime();
1595     nack->delivery_count = 1;
1596     nack->consumer = consumer;
1597     return nack;
1598 }
1599 
1600 /* Free a NACK entry. */
streamFreeNACK(streamNACK * na)1601 void streamFreeNACK(streamNACK *na) {
1602     zfree(na);
1603 }
1604 
1605 /* Free a consumer and associated data structures. Note that this function
1606  * will not reassign the pending messages associated with this consumer
1607  * nor will delete them from the stream, so when this function is called
1608  * to delete a consumer, and not when the whole stream is destroyed, the caller
1609  * should do some work before. */
streamFreeConsumer(streamConsumer * sc)1610 void streamFreeConsumer(streamConsumer *sc) {
1611     raxFree(sc->pel); /* No value free callback: the PEL entries are shared
1612                          between the consumer and the main stream PEL. */
1613     sdsfree(sc->name);
1614     zfree(sc);
1615 }
1616 
1617 /* Create a new consumer group in the context of the stream 's', having the
1618  * specified name and last server ID. If a consumer group with the same name
1619  * already existed NULL is returned, otherwise the pointer to the consumer
1620  * group is returned. */
streamCreateCG(stream * s,char * name,size_t namelen,streamID * id)1621 streamCG *streamCreateCG(stream *s, char *name, size_t namelen, streamID *id) {
1622     if (s->cgroups == NULL) s->cgroups = raxNew();
1623     if (raxFind(s->cgroups,(unsigned char*)name,namelen) != raxNotFound)
1624         return NULL;
1625 
1626     streamCG *cg = zmalloc(sizeof(*cg));
1627     cg->pel = raxNew();
1628     cg->consumers = raxNew();
1629     cg->last_id = *id;
1630     raxInsert(s->cgroups,(unsigned char*)name,namelen,cg,NULL);
1631     return cg;
1632 }
1633 
1634 /* Free a consumer group and all its associated data. */
streamFreeCG(streamCG * cg)1635 void streamFreeCG(streamCG *cg) {
1636     raxFreeWithCallback(cg->pel,(void(*)(void*))streamFreeNACK);
1637     raxFreeWithCallback(cg->consumers,(void(*)(void*))streamFreeConsumer);
1638     zfree(cg);
1639 }
1640 
1641 /* Lookup the consumer group in the specified stream and returns its
1642  * pointer, otherwise if there is no such group, NULL is returned. */
streamLookupCG(stream * s,sds groupname)1643 streamCG *streamLookupCG(stream *s, sds groupname) {
1644     if (s->cgroups == NULL) return NULL;
1645     streamCG *cg = raxFind(s->cgroups,(unsigned char*)groupname,
1646                            sdslen(groupname));
1647     return (cg == raxNotFound) ? NULL : cg;
1648 }
1649 
1650 /* Lookup the consumer with the specified name in the group 'cg': if the
1651  * consumer does not exist it is automatically created as a side effect
1652  * of calling this function, otherwise its last seen time is updated and
1653  * the existing consumer reference returned. */
streamLookupConsumer(streamCG * cg,sds name,int create)1654 streamConsumer *streamLookupConsumer(streamCG *cg, sds name, int create) {
1655     streamConsumer *consumer = raxFind(cg->consumers,(unsigned char*)name,
1656                                sdslen(name));
1657     if (consumer == raxNotFound) {
1658         if (!create) return NULL;
1659         consumer = zmalloc(sizeof(*consumer));
1660         consumer->name = sdsdup(name);
1661         consumer->pel = raxNew();
1662         raxInsert(cg->consumers,(unsigned char*)name,sdslen(name),
1663                   consumer,NULL);
1664     }
1665     consumer->seen_time = mstime();
1666     return consumer;
1667 }
1668 
1669 /* Delete the consumer specified in the consumer group 'cg'. The consumer
1670  * may have pending messages: they are removed from the PEL, and the number
1671  * of pending messages "lost" is returned. */
streamDelConsumer(streamCG * cg,sds name)1672 uint64_t streamDelConsumer(streamCG *cg, sds name) {
1673     streamConsumer *consumer = streamLookupConsumer(cg,name,0);
1674     if (consumer == NULL) return 0;
1675 
1676     uint64_t retval = raxSize(consumer->pel);
1677 
1678     /* Iterate all the consumer pending messages, deleting every corresponding
1679      * entry from the global entry. */
1680     raxIterator ri;
1681     raxStart(&ri,consumer->pel);
1682     raxSeek(&ri,"^",NULL,0);
1683     while(raxNext(&ri)) {
1684         streamNACK *nack = ri.data;
1685         raxRemove(cg->pel,ri.key,ri.key_len,NULL);
1686         streamFreeNACK(nack);
1687     }
1688     raxStop(&ri);
1689 
1690     /* Deallocate the consumer. */
1691     raxRemove(cg->consumers,(unsigned char*)name,sdslen(name),NULL);
1692     streamFreeConsumer(consumer);
1693     return retval;
1694 }
1695 
1696 /* -----------------------------------------------------------------------
1697  * Consumer groups commands
1698  * ----------------------------------------------------------------------- */
1699 
/* XGROUP CREATE <key> <groupname> <id or $> [MKSTREAM]
 * XGROUP SETID <key> <groupname> <id or $>
 * XGROUP DESTROY <key> <groupname>
 * XGROUP DELCONSUMER <key> <groupname> <consumername>
 * XGROUP HELP
 *
 * Consumer group management. Every subcommand that takes a key requires it
 * to hold a stream (type error otherwise) and to exist, unless CREATE is
 * called with MKSTREAM, in which case an empty stream is created on demand.
 * SETID and DELCONSUMER additionally require the group to exist (-NOGROUP).
 *
 * Replies: CREATE -> +OK or -BUSYGROUP; SETID -> +OK; DESTROY -> 1 if the
 * group was removed, 0 if it did not exist; DELCONSUMER -> number of pending
 * messages the consumer owned; HELP -> help text. Wrong arity for a
 * subcommand ends in a subcommand syntax error. */
void xgroupCommand(client *c) {
    const char *help[] = {
"CREATE      <key> <groupname> <id or $> [opt] -- Create a new consumer group.",
"            option MKSTREAM: create the empty stream if it does not exist.",
"SETID       <key> <groupname> <id or $>  -- Set the current group ID.",
"DESTROY     <key> <groupname>            -- Remove the specified group.",
"DELCONSUMER <key> <groupname> <consumer> -- Remove the specified consumer.",
"HELP                                     -- Prints this help.",
NULL
    };
    stream *s = NULL;
    sds grpname = NULL;
    streamCG *cg = NULL;
    char *opt = c->argv[1]->ptr; /* Subcommand name. */
    int mkstream = 0;
    robj *o;

    /* CREATE has an MKSTREAM option that creates the stream if it
     * does not exist. Any other 6th argument to CREATE is rejected. */
    if (c->argc == 6 && !strcasecmp(opt,"CREATE")) {
        if (strcasecmp(c->argv[5]->ptr,"MKSTREAM")) {
            addReplySubcommandSyntaxError(c);
            return;
        }
        mkstream = 1;
        grpname = c->argv[3]->ptr;
    }

    /* Everything but the "HELP" option requires a key and group name. */
    if (c->argc >= 4) {
        o = lookupKeyWrite(c->db,c->argv[2]);
        if (o) {
            if (checkType(c,o,OBJ_STREAM)) return;
            s = o->ptr;
        }
        grpname = c->argv[3]->ptr;
    }

    /* Check for missing key/group. Skipped with MKSTREAM, where a missing
     * key is created by CREATE below. */
    if (c->argc >= 4 && !mkstream) {
        /* At this point key must exist, or there is an error. */
        if (s == NULL) {
            addReplyError(c,
                "The XGROUP subcommand requires the key to exist. "
                "Note that for CREATE you may want to use the MKSTREAM "
                "option to create an empty stream automatically.");
            return;
        }

        /* Certain subcommands require the group to exist. For the others
         * 'cg' is simply left NULL when the group is missing. */
        if ((cg = streamLookupCG(s,grpname)) == NULL &&
            (!strcasecmp(opt,"SETID") ||
             !strcasecmp(opt,"DELCONSUMER")))
        {
            addReplyErrorFormat(c, "-NOGROUP No such consumer group '%s' "
                                   "for key name '%s'",
                                   (char*)grpname, (char*)c->argv[2]->ptr);
            return;
        }
    }

    /* Dispatch the different subcommands. */
    if (!strcasecmp(opt,"CREATE") && (c->argc == 5 || c->argc == 6)) {
        streamID id;
        /* "$" selects the stream's current last ID, or 0-0 when the stream
         * does not exist yet (only possible with MKSTREAM). */
        if (!strcmp(c->argv[4]->ptr,"$")) {
            if (s) {
                id = s->last_id;
            } else {
                id.ms = 0;
                id.seq = 0;
            }
        } else if (streamParseStrictIDOrReply(c,c->argv[4],&id,0) != C_OK) {
            return;
        }

        /* Handle the MKSTREAM option now that the command can no longer fail. */
        if (s == NULL) {
            serverAssert(mkstream);
            o = createStreamObject();
            dbAdd(c->db,c->argv[2],o);
            s = o->ptr;
        }

        /* Note: this local 'cg' shadows the outer one, which CREATE does
         * not use. NULL means a group with this name already exists. */
        streamCG *cg = streamCreateCG(s,grpname,sdslen(grpname),&id);
        if (cg) {
            addReply(c,shared.ok);
            server.dirty++;
            notifyKeyspaceEvent(NOTIFY_STREAM,"xgroup-create",
                                c->argv[2],c->db->id);
        } else {
            addReplySds(c,
                sdsnew("-BUSYGROUP Consumer Group name already exists\r\n"));
        }
    } else if (!strcasecmp(opt,"SETID") && c->argc == 5) {
        /* 's' and 'cg' are non-NULL here: both were checked above. */
        streamID id;
        /* NOTE(review): uses streamParseIDOrReply() rather than the Strict
         * variant used by CREATE; confirm the accepted ID syntax is meant to
         * differ between the two subcommands. */
        if (!strcmp(c->argv[4]->ptr,"$")) {
            id = s->last_id;
        } else if (streamParseIDOrReply(c,c->argv[4],&id,0) != C_OK) {
            return;
        }
        cg->last_id = id;
        addReply(c,shared.ok);
        server.dirty++;
        notifyKeyspaceEvent(NOTIFY_STREAM,"xgroup-setid",c->argv[2],c->db->id);
    } else if (!strcasecmp(opt,"DESTROY") && c->argc == 4) {
        /* A missing group is not an error for DESTROY: reply 0. */
        if (cg) {
            raxRemove(s->cgroups,(unsigned char*)grpname,sdslen(grpname),NULL);
            streamFreeCG(cg);
            addReply(c,shared.cone);
            server.dirty++;
            notifyKeyspaceEvent(NOTIFY_STREAM,"xgroup-destroy",
                                c->argv[2],c->db->id);
        } else {
            addReply(c,shared.czero);
        }
    } else if (!strcasecmp(opt,"DELCONSUMER") && c->argc == 5) {
        /* Delete the consumer and return the number of pending messages
         * that were still associated with such a consumer (0 also when the
         * consumer does not exist). */
        long long pending = streamDelConsumer(cg,c->argv[4]->ptr);
        addReplyLongLong(c,pending);
        server.dirty++;
        notifyKeyspaceEvent(NOTIFY_STREAM,"xgroup-delconsumer",
                            c->argv[2],c->db->id);
    } else if (!strcasecmp(opt,"HELP")) {
        addReplyHelp(c, help);
    } else {
        addReplySubcommandSyntaxError(c);
    }
}
1833 
1834 /* XSETID <stream> <groupname> <id>
1835  *
1836  * Set the internal "last ID" of a stream. */
xsetidCommand(client * c)1837 void xsetidCommand(client *c) {
1838     robj *o = lookupKeyWriteOrReply(c,c->argv[1],shared.nokeyerr);
1839     if (o == NULL || checkType(c,o,OBJ_STREAM)) return;
1840 
1841     stream *s = o->ptr;
1842     streamID id;
1843     if (streamParseStrictIDOrReply(c,c->argv[2],&id,0) != C_OK) return;
1844 
1845     /* If the stream has at least one item, we want to check that the user
1846      * is setting a last ID that is equal or greater than the current top
1847      * item, otherwise the fundamental ID monotonicity assumption is violated. */
1848     if (s->length > 0) {
1849         streamID maxid;
1850         streamIterator si;
1851         streamIteratorStart(&si,s,NULL,NULL,1);
1852         int64_t numfields;
1853         streamIteratorGetID(&si,&maxid,&numfields);
1854         streamIteratorStop(&si);
1855 
1856         if (streamCompareID(&id,&maxid) < 0) {
1857             addReplyError(c,"The ID specified in XSETID is smaller than the "
1858                             "target stream top item");
1859             return;
1860         }
1861     }
1862     s->last_id = id;
1863     addReply(c,shared.ok);
1864     server.dirty++;
1865     notifyKeyspaceEvent(NOTIFY_STREAM,"xsetid",c->argv[1],c->db->id);
1866 }
1867 
1868 /* XACK <key> <group> <id> <id> ... <id>
1869  *
1870  * Acknowledge a message as processed. In practical terms we just check the
1871  * pendine entries list (PEL) of the group, and delete the PEL entry both from
1872  * the group and the consumer (pending messages are referenced in both places).
1873  *
1874  * Return value of the command is the number of messages successfully
1875  * acknowledged, that is, the IDs we were actually able to resolve in the PEL.
1876  */
xackCommand(client * c)1877 void xackCommand(client *c) {
1878     streamCG *group = NULL;
1879     robj *o = lookupKeyRead(c->db,c->argv[1]);
1880     if (o) {
1881         if (checkType(c,o,OBJ_STREAM)) return; /* Type error. */
1882         group = streamLookupCG(o->ptr,c->argv[2]->ptr);
1883     }
1884 
1885     /* No key or group? Nothing to ack. */
1886     if (o == NULL || group == NULL) {
1887         addReply(c,shared.czero);
1888         return;
1889     }
1890 
1891     int acknowledged = 0;
1892     for (int j = 3; j < c->argc; j++) {
1893         streamID id;
1894         unsigned char buf[sizeof(streamID)];
1895         if (streamParseStrictIDOrReply(c,c->argv[j],&id,0) != C_OK) return;
1896         streamEncodeID(buf,&id);
1897 
1898         /* Lookup the ID in the group PEL: it will have a reference to the
1899          * NACK structure that will have a reference to the consumer, so that
1900          * we are able to remove the entry from both PELs. */
1901         streamNACK *nack = raxFind(group->pel,buf,sizeof(buf));
1902         if (nack != raxNotFound) {
1903             raxRemove(group->pel,buf,sizeof(buf),NULL);
1904             raxRemove(nack->consumer->pel,buf,sizeof(buf),NULL);
1905             streamFreeNACK(nack);
1906             acknowledged++;
1907             server.dirty++;
1908         }
1909     }
1910     addReplyLongLong(c,acknowledged);
1911 }
1912 
/* XPENDING <key> <group> [<start> <stop> <count> [<consumer>]]
 *
 * If start and stop are omitted, the command just outputs information about
 * the amount of pending messages for the key/group pair, together with
 * the minimum and maximum ID of pending messages, and an array of
 * [consumer-name, pending-count] pairs for every consumer that has at least
 * one pending message (a 4 elements reply).
 *
 * If start and stop are provided instead, up to <count> pending messages
 * with IDs in the inclusive [start, stop] range are returned, optionally
 * restricted to the PEL of <consumer>. Each one is reported as
 * [ID, owner consumer name, milliseconds elapsed since last delivery,
 * number of deliveries]. */
void xpendingCommand(client *c) {
    int justinfo = c->argc == 3; /* Without the range just outputs general
                                    information about the PEL. */
    robj *key = c->argv[1];
    robj *groupname = c->argv[2];
    robj *consumername = (c->argc == 7) ? c->argv[6] : NULL;
    streamID startid, endid;
    long long count;

    /* Start and stop, and the consumer, can be omitted. */
    if (c->argc != 3 && c->argc != 6 && c->argc != 7) {
        addReply(c,shared.syntaxerr);
        return;
    }

    /* Parse start/end/count arguments ASAP if needed, in order to report
     * syntax errors before any other error. A negative count is treated as
     * 0 (empty reply). The end ID defaults its missing sequence part to
     * UINT64_MAX so that the range is inclusive of the whole millisecond. */
    if (c->argc >= 6) {
        if (getLongLongFromObjectOrReply(c,c->argv[5],&count,NULL) == C_ERR)
            return;
        if (count < 0) count = 0;
        if (streamParseIDOrReply(c,c->argv[3],&startid,0) == C_ERR)
            return;
        if (streamParseIDOrReply(c,c->argv[4],&endid,UINT64_MAX) == C_ERR)
            return;
    }

    /* Lookup the key and the group inside the stream. */
    robj *o = lookupKeyRead(c->db,c->argv[1]);
    streamCG *group;

    if (o && checkType(c,o,OBJ_STREAM)) return;
    if (o == NULL ||
        (group = streamLookupCG(o->ptr,groupname->ptr)) == NULL)
    {
        addReplyErrorFormat(c, "-NOGROUP No such key '%s' or consumer "
                               "group '%s'",
                               (char*)key->ptr,(char*)groupname->ptr);
        return;
    }

    /* XPENDING <key> <group> variant. */
    if (justinfo) {
        addReplyMultiBulkLen(c,4);
        /* Total number of messages in the PEL. */
        addReplyLongLong(c,raxSize(group->pel));
        /* First and last IDs. */
        if (raxSize(group->pel) == 0) {
            addReply(c,shared.nullbulk); /* Start. */
            addReply(c,shared.nullbulk); /* End. */
            addReply(c,shared.nullmultibulk); /* Clients. */
        } else {
            /* Start: the PEL is keyed by encoded IDs, so its first key is
             * the smallest pending ID. */
            raxIterator ri;
            raxStart(&ri,group->pel);
            raxSeek(&ri,"^",NULL,0);
            raxNext(&ri);
            streamDecodeID(ri.key,&startid);
            addReplyStreamID(c,&startid);

            /* End: the last key is the greatest pending ID. */
            raxSeek(&ri,"$",NULL,0);
            raxNext(&ri);
            streamDecodeID(ri.key,&endid);
            addReplyStreamID(c,&endid);
            raxStop(&ri);

            /* Consumers with pending messages. The number of such consumers
             * is not known in advance, hence the deferred length. */
            raxStart(&ri,group->consumers);
            raxSeek(&ri,"^",NULL,0);
            void *arraylen_ptr = addDeferredMultiBulkLength(c);
            size_t arraylen = 0;
            while(raxNext(&ri)) {
                streamConsumer *consumer = ri.data;
                if (raxSize(consumer->pel) == 0) continue;
                addReplyMultiBulkLen(c,2);
                addReplyBulkCBuffer(c,ri.key,ri.key_len);
                addReplyBulkLongLong(c,raxSize(consumer->pel));
                arraylen++;
            }
            setDeferredMultiBulkLength(c,arraylen_ptr,arraylen);
            raxStop(&ri);
        }
    }
    /* XPENDING <key> <group> <start> <stop> <count> [<consumer>] variant. */
    else {
        streamConsumer *consumer = consumername ?
                                streamLookupConsumer(group,consumername->ptr,0):
                                NULL;

        /* If a consumer name was mentioned but it does not exist, we can
         * just return an empty array. */
        if (consumername && consumer == NULL) {
            addReplyMultiBulkLen(c,0);
            return;
        }

        /* Scan either the consumer PEL or the whole group PEL. */
        rax *pel = consumer ? consumer->pel : group->pel;
        unsigned char startkey[sizeof(streamID)];
        unsigned char endkey[sizeof(streamID)];
        raxIterator ri;
        mstime_t now = mstime();

        /* Encoded IDs compare with memcmp() in ID order, which is what
         * makes the seek and the end-of-range check below work. */
        streamEncodeID(startkey,&startid);
        streamEncodeID(endkey,&endid);
        raxStart(&ri,pel);
        raxSeek(&ri,">=",startkey,sizeof(startkey));
        void *arraylen_ptr = addDeferredMultiBulkLength(c);
        size_t arraylen = 0;

        while(count && raxNext(&ri) && memcmp(ri.key,endkey,ri.key_len) <= 0) {
            streamNACK *nack = ri.data;

            arraylen++;
            count--;
            addReplyMultiBulkLen(c,4);

            /* Entry ID. */
            streamID id;
            streamDecodeID(ri.key,&id);
            addReplyStreamID(c,&id);

            /* Consumer name. */
            addReplyBulkCBuffer(c,nack->consumer->name,
                                sdslen(nack->consumer->name));

            /* Milliseconds elapsed since last delivery, clamped at 0 in case
             * the delivery time is in the future (e.g. clock changes). */
            mstime_t elapsed = now - nack->delivery_time;
            if (elapsed < 0) elapsed = 0;
            addReplyLongLong(c,elapsed);

            /* Number of deliveries. */
            addReplyLongLong(c,nack->delivery_count);
        }
        raxStop(&ri);
        setDeferredMultiBulkLength(c,arraylen_ptr,arraylen);
    }
}
2060 
2061 /* XCLAIM <key> <group> <consumer> <min-idle-time> <ID-1> <ID-2>
2062  *        [IDLE <milliseconds>] [TIME <mstime>] [RETRYCOUNT <count>]
2063  *        [FORCE] [JUSTID]
2064  *
2065  * Gets ownership of one or multiple messages in the Pending Entries List
2066  * of a given stream consumer group.
2067  *
2068  * If the message ID (among the specified ones) exists, and its idle
2069  * time greater or equal to <min-idle-time>, then the message new owner
2070  * becomes the specified <consumer>. If the minimum idle time specified
2071  * is zero, messages are claimed regardless of their idle time.
2072  *
2073  * All the messages that cannot be found inside the pending entries list
2074  * are ignored, but in case the FORCE option is used. In that case we
2075  * create the NACK (representing a not yet acknowledged message) entry in
2076  * the consumer group PEL.
2077  *
2078  * This command creates the consumer as side effect if it does not yet
2079  * exists. Moreover the command reset the idle time of the message to 0,
2080  * even if by using the IDLE or TIME options, the user can control the
2081  * new idle time.
2082  *
2083  * The options at the end can be used in order to specify more attributes
2084  * to set in the representation of the pending message:
2085  *
2086  * 1. IDLE <ms>:
2087  *      Set the idle time (last time it was delivered) of the message.
2088  *      If IDLE is not specified, an IDLE of 0 is assumed, that is,
2089  *      the time count is reset because the message has now a new
2090  *      owner trying to process it.
2091  *
2092  * 2. TIME <ms-unix-time>:
2093  *      This is the same as IDLE but instead of a relative amount of
2094  *      milliseconds, it sets the idle time to a specific unix time
2095  *      (in milliseconds). This is useful in order to rewrite the AOF
2096  *      file generating XCLAIM commands.
2097  *
2098  * 3. RETRYCOUNT <count>:
2099  *      Set the retry counter to the specified value. This counter is
2100  *      incremented every time a message is delivered again. Normally
2101  *      XCLAIM does not alter this counter, which is just served to clients
2102  *      when the XPENDING command is called: this way clients can detect
2103  *      anomalies, like messages that are never processed for some reason
2104  *      after a big number of delivery attempts.
2105  *
2106  * 4. FORCE:
2107  *      Creates the pending message entry in the PEL even if certain
2108  *      specified IDs are not already in the PEL assigned to a different
2109  *      client. However the message must be exist in the stream, otherwise
2110  *      the IDs of non existing messages are ignored.
2111  *
2112  * 5. JUSTID:
2113  *      Return just an array of IDs of messages successfully claimed,
2114  *      without returning the actual message.
2115  *
2116  * 6. LASTID <id>:
2117  *      Update the consumer group last ID with the specified ID if the
2118  *      current last ID is smaller than the provided one.
2119  *      This is used for replication / AOF, so that when we read from a
2120  *      consumer group, the XCLAIM that gets propagated to give ownership
2121  *      to the consumer, is also used in order to update the group current
2122  *      ID.
2123  *
2124  * The command returns an array of messages that the user
2125  * successfully claimed, so that the caller is able to understand
2126  * what messages it is now in charge of. */
xclaimCommand(client * c)2127 void xclaimCommand(client *c) {
2128     streamCG *group = NULL;
2129     robj *o = lookupKeyRead(c->db,c->argv[1]);
2130     long long minidle; /* Minimum idle time argument. */
2131     long long retrycount = -1;   /* -1 means RETRYCOUNT option not given. */
2132     mstime_t deliverytime = -1;  /* -1 means IDLE/TIME options not given. */
2133     int force = 0;
2134     int justid = 0;
2135 
2136     if (o) {
2137         if (checkType(c,o,OBJ_STREAM)) return; /* Type error. */
2138         group = streamLookupCG(o->ptr,c->argv[2]->ptr);
2139     }
2140 
2141     /* No key or group? Send an error given that the group creation
2142      * is mandatory. */
2143     if (o == NULL || group == NULL) {
2144         addReplyErrorFormat(c,"-NOGROUP No such key '%s' or "
2145                               "consumer group '%s'", (char*)c->argv[1]->ptr,
2146                               (char*)c->argv[2]->ptr);
2147         return;
2148     }
2149 
2150     if (getLongLongFromObjectOrReply(c,c->argv[4],&minidle,
2151         "Invalid min-idle-time argument for XCLAIM")
2152         != C_OK) return;
2153     if (minidle < 0) minidle = 0;
2154 
2155     /* Start parsing the IDs, so that we abort ASAP if there is a syntax
2156      * error: the return value of this command cannot be an error in case
2157      * the client successfully claimed some message, so it should be
2158      * executed in a "all or nothing" fashion. */
2159     int j;
2160     for (j = 5; j < c->argc; j++) {
2161         streamID id;
2162         if (streamParseStrictIDOrReply(NULL,c->argv[j],&id,0) != C_OK) break;
2163     }
2164     int last_id_arg = j-1; /* Next time we iterate the IDs we now the range. */
2165 
2166     /* If we stopped because some IDs cannot be parsed, perhaps they
2167      * are trailing options. */
2168     mstime_t now = mstime();
2169     streamID last_id = {0,0};
2170     int propagate_last_id = 0;
2171     for (; j < c->argc; j++) {
2172         int moreargs = (c->argc-1) - j; /* Number of additional arguments. */
2173         char *opt = c->argv[j]->ptr;
2174         if (!strcasecmp(opt,"FORCE")) {
2175             force = 1;
2176         } else if (!strcasecmp(opt,"JUSTID")) {
2177             justid = 1;
2178         } else if (!strcasecmp(opt,"IDLE") && moreargs) {
2179             j++;
2180             if (getLongLongFromObjectOrReply(c,c->argv[j],&deliverytime,
2181                 "Invalid IDLE option argument for XCLAIM")
2182                 != C_OK) return;
2183             deliverytime = now - deliverytime;
2184         } else if (!strcasecmp(opt,"TIME") && moreargs) {
2185             j++;
2186             if (getLongLongFromObjectOrReply(c,c->argv[j],&deliverytime,
2187                 "Invalid TIME option argument for XCLAIM")
2188                 != C_OK) return;
2189         } else if (!strcasecmp(opt,"RETRYCOUNT") && moreargs) {
2190             j++;
2191             if (getLongLongFromObjectOrReply(c,c->argv[j],&retrycount,
2192                 "Invalid RETRYCOUNT option argument for XCLAIM")
2193                 != C_OK) return;
2194         } else if (!strcasecmp(opt,"LASTID") && moreargs) {
2195             j++;
2196             if (streamParseStrictIDOrReply(c,c->argv[j],&last_id,0) != C_OK) return;
2197         } else {
2198             addReplyErrorFormat(c,"Unrecognized XCLAIM option '%s'",opt);
2199             return;
2200         }
2201     }
2202 
2203     if (streamCompareID(&last_id,&group->last_id) > 0) {
2204         group->last_id = last_id;
2205         propagate_last_id = 1;
2206     }
2207 
2208     if (deliverytime != -1) {
2209         /* If a delivery time was passed, either with IDLE or TIME, we
2210          * do some sanity check on it, and set the deliverytime to now
2211          * (which is a sane choice usually) if the value is bogus.
2212          * To raise an error here is not wise because clients may compute
2213          * the idle time doing some math starting from their local time,
2214          * and this is not a good excuse to fail in case, for instance,
2215          * the computer time is a bit in the future from our POV. */
2216         if (deliverytime < 0 || deliverytime > now) deliverytime = now;
2217     } else {
2218         /* If no IDLE/TIME option was passed, we want the last delivery
2219          * time to be now, so that the idle time of the message will be
2220          * zero. */
2221         deliverytime = now;
2222     }
2223 
2224     /* Do the actual claiming. */
2225     streamConsumer *consumer = streamLookupConsumer(group,c->argv[3]->ptr,1);
2226     void *arraylenptr = addDeferredMultiBulkLength(c);
2227     size_t arraylen = 0;
2228     for (int j = 5; j <= last_id_arg; j++) {
2229         streamID id;
2230         unsigned char buf[sizeof(streamID)];
2231         if (streamParseStrictIDOrReply(c,c->argv[j],&id,0) != C_OK)
2232             serverPanic("StreamID invalid after check. Should not be possible.");
2233         streamEncodeID(buf,&id);
2234 
2235         /* Lookup the ID in the group PEL. */
2236         streamNACK *nack = raxFind(group->pel,buf,sizeof(buf));
2237 
2238         /* If FORCE is passed, let's check if at least the entry
2239          * exists in the Stream. In such case, we'll crate a new
2240          * entry in the PEL from scratch, so that XCLAIM can also
2241          * be used to create entries in the PEL. Useful for AOF
2242          * and replication of consumer groups. */
2243         if (force && nack == raxNotFound) {
2244             streamIterator myiterator;
2245             streamIteratorStart(&myiterator,o->ptr,&id,&id,0);
2246             int64_t numfields;
2247             int found = 0;
2248             streamID item_id;
2249             if (streamIteratorGetID(&myiterator,&item_id,&numfields)) found = 1;
2250             streamIteratorStop(&myiterator);
2251 
2252             /* Item must exist for us to create a NACK for it. */
2253             if (!found) continue;
2254 
2255             /* Create the NACK. */
2256             nack = streamCreateNACK(NULL);
2257             raxInsert(group->pel,buf,sizeof(buf),nack,NULL);
2258         }
2259 
2260         if (nack != raxNotFound) {
2261             /* We need to check if the minimum idle time requested
2262              * by the caller is satisfied by this entry.
2263              *
2264              * Note that the nack could be created by FORCE, in this
2265              * case there was no pre-existing entry and minidle should
2266              * be ignored, but in that case nick->consumer is NULL. */
2267             if (nack->consumer && minidle) {
2268                 mstime_t this_idle = now - nack->delivery_time;
2269                 if (this_idle < minidle) continue;
2270             }
2271             /* Remove the entry from the old consumer.
2272              * Note that nack->consumer is NULL if we created the
2273              * NACK above because of the FORCE option. */
2274             if (nack->consumer)
2275                 raxRemove(nack->consumer->pel,buf,sizeof(buf),NULL);
2276             /* Update the consumer and idle time. */
2277             nack->consumer = consumer;
2278             nack->delivery_time = deliverytime;
2279             /* Set the delivery attempts counter if given, otherwise
2280              * autoincrement unless JUSTID option provided */
2281             if (retrycount >= 0) {
2282                 nack->delivery_count = retrycount;
2283             } else if (!justid) {
2284                 nack->delivery_count++;
2285             }
2286             /* Add the entry in the new consumer local PEL. */
2287             raxInsert(consumer->pel,buf,sizeof(buf),nack,NULL);
2288             /* Send the reply for this entry. */
2289             if (justid) {
2290                 addReplyStreamID(c,&id);
2291             } else {
2292                 size_t emitted = streamReplyWithRange(c,o->ptr,&id,&id,1,0,
2293                                     NULL,NULL,STREAM_RWR_RAWENTRIES,NULL);
2294                 if (!emitted) addReply(c,shared.nullbulk);
2295             }
2296             arraylen++;
2297 
2298             /* Propagate this change. */
2299             streamPropagateXCLAIM(c,c->argv[1],group,c->argv[2],c->argv[j],nack);
2300             propagate_last_id = 0; /* Will be propagated by XCLAIM itself. */
2301             server.dirty++;
2302         }
2303     }
2304     if (propagate_last_id) {
2305         streamPropagateGroupID(c,c->argv[1],group,c->argv[2]);
2306         server.dirty++;
2307     }
2308     setDeferredMultiBulkLength(c,arraylenptr,arraylen);
2309     preventCommandPropagation(c);
2310 }
2311 
2312 
2313 /* XDEL <key> [<ID1> <ID2> ... <IDN>]
2314  *
2315  * Removes the specified entries from the stream. Returns the number
2316  * of items actually deleted, that may be different from the number
2317  * of IDs passed in case certain IDs do not exist. */
xdelCommand(client * c)2318 void xdelCommand(client *c) {
2319     robj *o;
2320 
2321     if ((o = lookupKeyWriteOrReply(c,c->argv[1],shared.czero)) == NULL
2322         || checkType(c,o,OBJ_STREAM)) return;
2323     stream *s = o->ptr;
2324 
2325     /* We need to sanity check the IDs passed to start. Even if not
2326      * a big issue, it is not great that the command is only partially
2327      * executed because at some point an invalid ID is parsed. */
2328     streamID id;
2329     for (int j = 2; j < c->argc; j++) {
2330         if (streamParseStrictIDOrReply(c,c->argv[j],&id,0) != C_OK) return;
2331     }
2332 
2333     /* Actually apply the command. */
2334     int deleted = 0;
2335     for (int j = 2; j < c->argc; j++) {
2336         streamParseStrictIDOrReply(c,c->argv[j],&id,0); /* Retval already checked. */
2337         deleted += streamDeleteItem(s,&id);
2338     }
2339 
2340     /* Propagate the write if needed. */
2341     if (deleted) {
2342         signalModifiedKey(c->db,c->argv[1]);
2343         notifyKeyspaceEvent(NOTIFY_STREAM,"xdel",c->argv[1],c->db->id);
2344         server.dirty += deleted;
2345     }
2346     addReplyLongLong(c,deleted);
2347 }
2348 
2349 /* General form: XTRIM <key> [... options ...]
2350  *
2351  * List of options:
2352  *
2353  * MAXLEN [~|=] <count>     -- Trim so that the stream will be capped at
2354  *                             the specified length. Use ~ before the
2355  *                             count in order to demand approximated trimming
2356  *                             (like XADD MAXLEN option).
2357  */
2358 
2359 #define TRIM_STRATEGY_NONE 0
2360 #define TRIM_STRATEGY_MAXLEN 1
xtrimCommand(client * c)2361 void xtrimCommand(client *c) {
2362     robj *o;
2363 
2364     /* If the key does not exist, we are ok returning zero, that is, the
2365      * number of elements removed from the stream. */
2366     if ((o = lookupKeyWriteOrReply(c,c->argv[1],shared.czero)) == NULL
2367         || checkType(c,o,OBJ_STREAM)) return;
2368     stream *s = o->ptr;
2369 
2370     /* Argument parsing. */
2371     int trim_strategy = TRIM_STRATEGY_NONE;
2372     long long maxlen = -1;  /* If left to -1 no trimming is performed. */
2373     int approx_maxlen = 0;  /* If 1 only delete whole radix tree nodes, so
2374                                the maxium length is not applied verbatim. */
2375     int maxlen_arg_idx = 0; /* Index of the count in MAXLEN, for rewriting. */
2376 
2377     /* Parse options. */
2378     int i = 2; /* Start of options. */
2379     for (; i < c->argc; i++) {
2380         int moreargs = (c->argc-1) - i; /* Number of additional arguments. */
2381         char *opt = c->argv[i]->ptr;
2382         if (!strcasecmp(opt,"maxlen") && moreargs) {
2383             approx_maxlen = 0;
2384             trim_strategy = TRIM_STRATEGY_MAXLEN;
2385             char *next = c->argv[i+1]->ptr;
2386             /* Check for the form MAXLEN ~ <count>. */
2387             if (moreargs >= 2 && next[0] == '~' && next[1] == '\0') {
2388                 approx_maxlen = 1;
2389                 i++;
2390             } else if (moreargs >= 2 && next[0] == '=' && next[1] == '\0') {
2391                 i++;
2392             }
2393             if (getLongLongFromObjectOrReply(c,c->argv[i+1],&maxlen,NULL)
2394                 != C_OK) return;
2395 
2396             if (maxlen < 0) {
2397                 addReplyError(c,"The MAXLEN argument must be >= 0.");
2398                 return;
2399             }
2400             i++;
2401             maxlen_arg_idx = i;
2402         } else {
2403             addReply(c,shared.syntaxerr);
2404             return;
2405         }
2406     }
2407 
2408     /* Perform the trimming. */
2409     int64_t deleted = 0;
2410     if (trim_strategy == TRIM_STRATEGY_MAXLEN) {
2411         deleted = streamTrimByLength(s,maxlen,approx_maxlen);
2412     } else {
2413         addReplyError(c,"XTRIM called without an option to trim the stream");
2414         return;
2415     }
2416 
2417     /* Propagate the write if needed. */
2418     if (deleted) {
2419         signalModifiedKey(c->db,c->argv[1]);
2420         notifyKeyspaceEvent(NOTIFY_STREAM,"xtrim",c->argv[1],c->db->id);
2421         server.dirty += deleted;
2422         if (approx_maxlen) streamRewriteApproxMaxlen(c,s,maxlen_arg_idx);
2423     }
2424     addReplyLongLong(c,deleted);
2425 }
2426 
2427 /* XINFO CONSUMERS <key> <group>
2428  * XINFO GROUPS <key>
2429  * XINFO STREAM <key>
2430  * XINFO HELP. */
xinfoCommand(client * c)2431 void xinfoCommand(client *c) {
2432     const char *help[] = {
2433 "CONSUMERS <key> <groupname>  -- Show consumer groups of group <groupname>.",
2434 "GROUPS <key>                 -- Show the stream consumer groups.",
2435 "STREAM <key>                 -- Show information about the stream.",
2436 "HELP                         -- Print this help.",
2437 NULL
2438     };
2439     stream *s = NULL;
2440     char *opt;
2441     robj *key;
2442 
2443     /* HELP is special. Handle it ASAP. */
2444     if (!strcasecmp(c->argv[1]->ptr,"HELP")) {
2445         addReplyHelp(c, help);
2446         return;
2447     } else if (c->argc < 3) {
2448         addReplyError(c,"syntax error, try 'XINFO HELP'");
2449         return;
2450     }
2451 
2452     /* With the exception of HELP handled before any other sub commands, all
2453      * the ones are in the form of "<subcommand> <key>". */
2454     opt = c->argv[1]->ptr;
2455     key = c->argv[2];
2456 
2457     /* Lookup the key now, this is common for all the subcommands but HELP. */
2458     robj *o = lookupKeyWriteOrReply(c,key,shared.nokeyerr);
2459     if (o == NULL || checkType(c,o,OBJ_STREAM)) return;
2460     s = o->ptr;
2461 
2462     /* Dispatch the different subcommands. */
2463     if (!strcasecmp(opt,"CONSUMERS") && c->argc == 4) {
2464         /* XINFO CONSUMERS <key> <group>. */
2465         streamCG *cg = streamLookupCG(s,c->argv[3]->ptr);
2466         if (cg == NULL) {
2467             addReplyErrorFormat(c, "-NOGROUP No such consumer group '%s' "
2468                                    "for key name '%s'",
2469                                    (char*)c->argv[3]->ptr, (char*)key->ptr);
2470             return;
2471         }
2472 
2473         addReplyMultiBulkLen(c,raxSize(cg->consumers));
2474         raxIterator ri;
2475         raxStart(&ri,cg->consumers);
2476         raxSeek(&ri,"^",NULL,0);
2477         mstime_t now = mstime();
2478         while(raxNext(&ri)) {
2479             streamConsumer *consumer = ri.data;
2480             mstime_t idle = now - consumer->seen_time;
2481             if (idle < 0) idle = 0;
2482 
2483             addReplyMultiBulkLen(c,6);
2484             addReplyBulkCString(c,"name");
2485             addReplyBulkCBuffer(c,consumer->name,sdslen(consumer->name));
2486             addReplyBulkCString(c,"pending");
2487             addReplyLongLong(c,raxSize(consumer->pel));
2488             addReplyBulkCString(c,"idle");
2489             addReplyLongLong(c,idle);
2490         }
2491         raxStop(&ri);
2492     } else if (!strcasecmp(opt,"GROUPS") && c->argc == 3) {
2493         /* XINFO GROUPS <key>. */
2494         if (s->cgroups == NULL) {
2495             addReplyMultiBulkLen(c,0);
2496             return;
2497         }
2498 
2499         addReplyMultiBulkLen(c,raxSize(s->cgroups));
2500         raxIterator ri;
2501         raxStart(&ri,s->cgroups);
2502         raxSeek(&ri,"^",NULL,0);
2503         while(raxNext(&ri)) {
2504             streamCG *cg = ri.data;
2505             addReplyMultiBulkLen(c,8);
2506             addReplyBulkCString(c,"name");
2507             addReplyBulkCBuffer(c,ri.key,ri.key_len);
2508             addReplyBulkCString(c,"consumers");
2509             addReplyLongLong(c,raxSize(cg->consumers));
2510             addReplyBulkCString(c,"pending");
2511             addReplyLongLong(c,raxSize(cg->pel));
2512             addReplyBulkCString(c,"last-delivered-id");
2513             addReplyStreamID(c,&cg->last_id);
2514         }
2515         raxStop(&ri);
2516     } else if (!strcasecmp(opt,"STREAM") && c->argc == 3) {
2517         /* XINFO STREAM <key> (or the alias XINFO <key>). */
2518         addReplyMultiBulkLen(c,14);
2519         addReplyBulkCString(c,"length");
2520         addReplyLongLong(c,s->length);
2521         addReplyBulkCString(c,"radix-tree-keys");
2522         addReplyLongLong(c,raxSize(s->rax));
2523         addReplyBulkCString(c,"radix-tree-nodes");
2524         addReplyLongLong(c,s->rax->numnodes);
2525         addReplyBulkCString(c,"groups");
2526         addReplyLongLong(c,s->cgroups ? raxSize(s->cgroups) : 0);
2527         addReplyBulkCString(c,"last-generated-id");
2528         addReplyStreamID(c,&s->last_id);
2529 
2530         /* To emit the first/last entry we us the streamReplyWithRange()
2531          * API. */
2532         int count;
2533         streamID start, end;
2534         start.ms = start.seq = 0;
2535         end.ms = end.seq = UINT64_MAX;
2536         addReplyBulkCString(c,"first-entry");
2537         count = streamReplyWithRange(c,s,&start,&end,1,0,NULL,NULL,
2538                                      STREAM_RWR_RAWENTRIES,NULL);
2539         if (!count) addReply(c,shared.nullbulk);
2540         addReplyBulkCString(c,"last-entry");
2541         count = streamReplyWithRange(c,s,&start,&end,1,1,NULL,NULL,
2542                                      STREAM_RWR_RAWENTRIES,NULL);
2543         if (!count) addReply(c,shared.nullbulk);
2544     } else {
2545         addReplySubcommandSyntaxError(c);
2546     }
2547 }
2548 
2549