1 /* 2 * Copyright (c) 2017, Salvatore Sanfilippo <antirez at gmail dot com> 3 * All rights reserved. 4 * 5 * Redistribution and use in source and binary forms, with or without 6 * modification, are permitted provided that the following conditions are met: 7 * 8 * * Redistributions of source code must retain the above copyright notice, 9 * this list of conditions and the following disclaimer. 10 * * Redistributions in binary form must reproduce the above copyright 11 * notice, this list of conditions and the following disclaimer in the 12 * documentation and/or other materials provided with the distribution. 13 * * Neither the name of Redis nor the names of its contributors may be used 14 * to endorse or promote products derived from this software without 15 * specific prior written permission. 16 * 17 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" 18 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE 19 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE 20 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE 21 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR 22 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF 23 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS 24 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN 25 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) 26 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE 27 * POSSIBILITY OF SUCH DAMAGE. 
28 */ 29 30 #include "server.h" 31 #include "endianconv.h" 32 #include "stream.h" 33 34 #define STREAM_BYTES_PER_LISTPACK 2048 35 36 /* Every stream item inside the listpack, has a flags field that is used to 37 * mark the entry as deleted, or having the same field as the "master" 38 * entry at the start of the listpack> */ 39 #define STREAM_ITEM_FLAG_NONE 0 /* No special flags. */ 40 #define STREAM_ITEM_FLAG_DELETED (1<<0) /* Entry is delted. Skip it. */ 41 #define STREAM_ITEM_FLAG_SAMEFIELDS (1<<1) /* Same fields as master entry. */ 42 43 void streamFreeCG(streamCG *cg); 44 void streamFreeNACK(streamNACK *na); 45 size_t streamReplyWithRangeFromConsumerPEL(client *c, stream *s, streamID *start, streamID *end, size_t count, streamConsumer *consumer); 46 47 /* ----------------------------------------------------------------------- 48 * Low level stream encoding: a radix tree of listpacks. 49 * ----------------------------------------------------------------------- */ 50 51 /* Create a new stream data structure. */ 52 stream *streamNew(void) { 53 stream *s = zmalloc(sizeof(*s)); 54 s->rax = raxNew(); 55 s->length = 0; 56 s->last_id.ms = 0; 57 s->last_id.seq = 0; 58 s->cgroups = NULL; /* Created on demand to save memory when not used. */ 59 return s; 60 } 61 62 /* Free a stream, including the listpacks stored inside the radix tree. */ 63 void freeStream(stream *s) { 64 raxFreeWithCallback(s->rax,(void(*)(void*))lpFree); 65 if (s->cgroups) 66 raxFreeWithCallback(s->cgroups,(void(*)(void*))streamFreeCG); 67 zfree(s); 68 } 69 70 /* Generate the next stream item ID given the previous one. If the current 71 * milliseconds Unix time is greater than the previous one, just use this 72 * as time part and start with sequence part of zero. Otherwise we use the 73 * previous time (and never go backward) and increment the sequence. 
*/ 74 void streamNextID(streamID *last_id, streamID *new_id) { 75 uint64_t ms = mstime(); 76 if (ms > last_id->ms) { 77 new_id->ms = ms; 78 new_id->seq = 0; 79 } else { 80 new_id->ms = last_id->ms; 81 new_id->seq = last_id->seq+1; 82 } 83 } 84 85 /* This is just a wrapper for lpAppend() to directly use a 64 bit integer 86 * instead of a string. */ 87 unsigned char *lpAppendInteger(unsigned char *lp, int64_t value) { 88 char buf[LONG_STR_SIZE]; 89 int slen = ll2string(buf,sizeof(buf),value); 90 return lpAppend(lp,(unsigned char*)buf,slen); 91 } 92 93 /* This is just a wrapper for lpReplace() to directly use a 64 bit integer 94 * instead of a string to replace the current element. The function returns 95 * the new listpack as return value, and also updates the current cursor 96 * by updating '*pos'. */ 97 unsigned char *lpReplaceInteger(unsigned char *lp, unsigned char **pos, int64_t value) { 98 char buf[LONG_STR_SIZE]; 99 int slen = ll2string(buf,sizeof(buf),value); 100 return lpInsert(lp, (unsigned char*)buf, slen, *pos, LP_REPLACE, pos); 101 } 102 103 /* This is a wrapper function for lpGet() to directly get an integer value 104 * from the listpack (that may store numbers as a string), converting 105 * the string if needed. */ 106 int64_t lpGetInteger(unsigned char *ele) { 107 int64_t v; 108 unsigned char *e = lpGet(ele,&v,NULL); 109 if (e == NULL) return v; 110 /* The following code path should never be used for how listpacks work: 111 * they should always be able to store an int64_t value in integer 112 * encoded form. However the implementation may change. */ 113 long long ll; 114 int retval = string2ll((char*)e,v,&ll); 115 serverAssert(retval != 0); 116 v = ll; 117 return v; 118 } 119 120 /* Debugging function to log the full content of a listpack. Useful 121 * for development and debugging. 
*/ 122 void streamLogListpackContent(unsigned char *lp) { 123 unsigned char *p = lpFirst(lp); 124 while(p) { 125 unsigned char buf[LP_INTBUF_SIZE]; 126 int64_t v; 127 unsigned char *ele = lpGet(p,&v,buf); 128 serverLog(LL_WARNING,"- [%d] '%.*s'", (int)v, (int)v, ele); 129 p = lpNext(lp,p); 130 } 131 } 132 133 /* Convert the specified stream entry ID as a 128 bit big endian number, so 134 * that the IDs can be sorted lexicographically. */ 135 void streamEncodeID(void *buf, streamID *id) { 136 uint64_t e[2]; 137 e[0] = htonu64(id->ms); 138 e[1] = htonu64(id->seq); 139 memcpy(buf,e,sizeof(e)); 140 } 141 142 /* This is the reverse of streamEncodeID(): the decoded ID will be stored 143 * in the 'id' structure passed by reference. The buffer 'buf' must point 144 * to a 128 bit big-endian encoded ID. */ 145 void streamDecodeID(void *buf, streamID *id) { 146 uint64_t e[2]; 147 memcpy(e,buf,sizeof(e)); 148 id->ms = ntohu64(e[0]); 149 id->seq = ntohu64(e[1]); 150 } 151 152 /* Compare two stream IDs. Return -1 if a < b, 0 if a == b, 1 if a > b. */ 153 int streamCompareID(streamID *a, streamID *b) { 154 if (a->ms > b->ms) return 1; 155 else if (a->ms < b->ms) return -1; 156 /* The ms part is the same. Check the sequence part. */ 157 else if (a->seq > b->seq) return 1; 158 else if (a->seq < b->seq) return -1; 159 /* Everything is the same: IDs are equal. */ 160 return 0; 161 } 162 163 /* Adds a new item into the stream 's' having the specified number of 164 * field-value pairs as specified in 'numfields' and stored into 'argv'. 165 * Returns the new entry ID populating the 'added_id' structure. 166 * 167 * If 'use_id' is not NULL, the ID is not auto-generated by the function, 168 * but instead the passed ID is uesd to add the new entry. In this case 169 * adding the entry may fail as specified later in this comment. 170 * 171 * The function returns C_OK if the item was added, this is always true 172 * if the ID was generated by the function. 
However the function may return 173 * C_ERR if an ID was given via 'use_id', but adding it failed since the 174 * current top ID is greater or equal. */ 175 int streamAppendItem(stream *s, robj **argv, int64_t numfields, streamID *added_id, streamID *use_id) { 176 /* If an ID was given, check that it's greater than the last entry ID 177 * or return an error. */ 178 if (use_id && streamCompareID(use_id,&s->last_id) <= 0) return C_ERR; 179 180 /* Add the new entry. */ 181 raxIterator ri; 182 raxStart(&ri,s->rax); 183 raxSeek(&ri,"$",NULL,0); 184 185 size_t lp_bytes = 0; /* Total bytes in the tail listpack. */ 186 unsigned char *lp = NULL; /* Tail listpack pointer. */ 187 188 /* Get a reference to the tail node listpack. */ 189 if (raxNext(&ri)) { 190 lp = ri.data; 191 lp_bytes = lpBytes(lp); 192 } 193 raxStop(&ri); 194 195 /* Generate the new entry ID. */ 196 streamID id; 197 if (use_id) 198 id = *use_id; 199 else 200 streamNextID(&s->last_id,&id); 201 202 /* We have to add the key into the radix tree in lexicographic order, 203 * to do so we consider the ID as a single 128 bit number written in 204 * big endian, so that the most significant bytes are the first ones. */ 205 uint64_t rax_key[2]; /* Key in the radix tree containing the listpack.*/ 206 streamID master_id; /* ID of the master entry in the listpack. */ 207 208 /* Create a new listpack and radix tree node if needed. Note that when 209 * a new listpack is created, we populate it with a "master entry". This 210 * is just a set of fields that is taken as references in order to compress 211 * the stream entries that we'll add inside the listpack. 212 * 213 * Note that while we use the first added entry fields to create 214 * the master entry, the first added entry is NOT represented in the master 215 * entry, which is a stand alone object. But of course, the first entry 216 * will compress well because it's used as reference. 
217 * 218 * The master entry is composed like in the following example: 219 * 220 * +-------+---------+------------+---------+--/--+---------+---------+-+ 221 * | count | deleted | num-fields | field_1 | field_2 | ... | field_N |0| 222 * +-------+---------+------------+---------+--/--+---------+---------+-+ 223 * 224 * count and deleted just represent respectively the total number of 225 * entries inside the listpack that are valid, and marked as deleted 226 * (delted flag in the entry flags set). So the total number of items 227 * actually inside the listpack (both deleted and not) is count+deleted. 228 * 229 * The real entries will be encoded with an ID that is just the 230 * millisecond and sequence difference compared to the key stored at 231 * the radix tree node containing the listpack (delta encoding), and 232 * if the fields of the entry are the same as the master enty fields, the 233 * entry flags will specify this fact and the entry fields and number 234 * of fields will be omitted (see later in the code of this function). 235 * 236 * The "0" entry at the end is the same as the 'lp-count' entry in the 237 * regular stream entries (see below), and marks the fact that there are 238 * no more entries, when we scan the stream from right to left. */ 239 240 /* First of all, check if we can append to the current macro node or 241 * if we need to switch to the next one. 'lp' will be set to NULL if 242 * the current node is full. */ 243 if (lp != NULL) { 244 if (server.stream_node_max_bytes && 245 lp_bytes > server.stream_node_max_bytes) 246 { 247 lp = NULL; 248 } else if (server.stream_node_max_entries) { 249 int64_t count = lpGetInteger(lpFirst(lp)); 250 if (count > server.stream_node_max_entries) lp = NULL; 251 } 252 } 253 254 int flags = STREAM_ITEM_FLAG_NONE; 255 if (lp == NULL || lp_bytes > server.stream_node_max_bytes) { 256 master_id = id; 257 streamEncodeID(rax_key,&id); 258 /* Create the listpack having the master entry ID and fields. 
*/ 259 lp = lpNew(); 260 lp = lpAppendInteger(lp,1); /* One item, the one we are adding. */ 261 lp = lpAppendInteger(lp,0); /* Zero deleted so far. */ 262 lp = lpAppendInteger(lp,numfields); 263 for (int64_t i = 0; i < numfields; i++) { 264 sds field = argv[i*2]->ptr; 265 lp = lpAppend(lp,(unsigned char*)field,sdslen(field)); 266 } 267 lp = lpAppendInteger(lp,0); /* Master entry zero terminator. */ 268 raxInsert(s->rax,(unsigned char*)&rax_key,sizeof(rax_key),lp,NULL); 269 /* The first entry we insert, has obviously the same fields of the 270 * master entry. */ 271 flags |= STREAM_ITEM_FLAG_SAMEFIELDS; 272 } else { 273 serverAssert(ri.key_len == sizeof(rax_key)); 274 memcpy(rax_key,ri.key,sizeof(rax_key)); 275 276 /* Read the master ID from the radix tree key. */ 277 streamDecodeID(rax_key,&master_id); 278 unsigned char *lp_ele = lpFirst(lp); 279 280 /* Update count and skip the deleted fields. */ 281 int64_t count = lpGetInteger(lp_ele); 282 lp = lpReplaceInteger(lp,&lp_ele,count+1); 283 lp_ele = lpNext(lp,lp_ele); /* seek deleted. */ 284 lp_ele = lpNext(lp,lp_ele); /* seek master entry num fields. */ 285 286 /* Check if the entry we are adding, have the same fields 287 * as the master entry. */ 288 int64_t master_fields_count = lpGetInteger(lp_ele); 289 lp_ele = lpNext(lp,lp_ele); 290 if (numfields == master_fields_count) { 291 int64_t i; 292 for (i = 0; i < master_fields_count; i++) { 293 sds field = argv[i*2]->ptr; 294 int64_t e_len; 295 unsigned char buf[LP_INTBUF_SIZE]; 296 unsigned char *e = lpGet(lp_ele,&e_len,buf); 297 /* Stop if there is a mismatch. */ 298 if (sdslen(field) != (size_t)e_len || 299 memcmp(e,field,e_len) != 0) break; 300 lp_ele = lpNext(lp,lp_ele); 301 } 302 /* All fields are the same! We can compress the field names 303 * setting a single bit in the flags. */ 304 if (i == master_fields_count) flags |= STREAM_ITEM_FLAG_SAMEFIELDS; 305 } 306 } 307 308 /* Populate the listpack with the new entry. 
We use the following 309 * encoding: 310 * 311 * +-----+--------+----------+-------+-------+-/-+-------+-------+--------+ 312 * |flags|entry-id|num-fields|field-1|value-1|...|field-N|value-N|lp-count| 313 * +-----+--------+----------+-------+-------+-/-+-------+-------+--------+ 314 * 315 * However if the SAMEFIELD flag is set, we have just to populate 316 * the entry with the values, so it becomes: 317 * 318 * +-----+--------+-------+-/-+-------+--------+ 319 * |flags|entry-id|value-1|...|value-N|lp-count| 320 * +-----+--------+-------+-/-+-------+--------+ 321 * 322 * The entry-id field is actually two separated fields: the ms 323 * and seq difference compared to the master entry. 324 * 325 * The lp-count field is a number that states the number of listpack pieces 326 * that compose the entry, so that it's possible to travel the entry 327 * in reverse order: we can just start from the end of the listpack, read 328 * the entry, and jump back N times to seek the "flags" field to read 329 * the stream full entry. */ 330 lp = lpAppendInteger(lp,flags); 331 lp = lpAppendInteger(lp,id.ms - master_id.ms); 332 lp = lpAppendInteger(lp,id.seq - master_id.seq); 333 if (!(flags & STREAM_ITEM_FLAG_SAMEFIELDS)) 334 lp = lpAppendInteger(lp,numfields); 335 for (int64_t i = 0; i < numfields; i++) { 336 sds field = argv[i*2]->ptr, value = argv[i*2+1]->ptr; 337 if (!(flags & STREAM_ITEM_FLAG_SAMEFIELDS)) 338 lp = lpAppend(lp,(unsigned char*)field,sdslen(field)); 339 lp = lpAppend(lp,(unsigned char*)value,sdslen(value)); 340 } 341 /* Compute and store the lp-count field. */ 342 int64_t lp_count = numfields; 343 lp_count += 3; /* Add the 3 fixed fields flags + ms-diff + seq-diff. */ 344 if (!(flags & STREAM_ITEM_FLAG_SAMEFIELDS)) { 345 /* If the item is not compressed, it also has the fields other than 346 * the values, and an additional num-fileds field. 
*/ 347 lp_count += numfields+1; 348 } 349 lp = lpAppendInteger(lp,lp_count); 350 351 /* Insert back into the tree in order to update the listpack pointer. */ 352 if (ri.data != lp) 353 raxInsert(s->rax,(unsigned char*)&rax_key,sizeof(rax_key),lp,NULL); 354 s->length++; 355 s->last_id = id; 356 if (added_id) *added_id = id; 357 return C_OK; 358 } 359 360 /* Trim the stream 's' to have no more than maxlen elements, and return the 361 * number of elements removed from the stream. The 'approx' option, if non-zero, 362 * specifies that the trimming must be performed in a approximated way in 363 * order to maximize performances. This means that the stream may contain 364 * more elements than 'maxlen', and elements are only removed if we can remove 365 * a *whole* node of the radix tree. The elements are removed from the head 366 * of the stream (older elements). 367 * 368 * The function may return zero if: 369 * 370 * 1) The stream is already shorter or equal to the specified max length. 371 * 2) The 'approx' option is true and the head node had not enough elements 372 * to be deleted, leaving the stream with a number of elements >= maxlen. 373 */ 374 int64_t streamTrimByLength(stream *s, size_t maxlen, int approx) { 375 if (s->length <= maxlen) return 0; 376 377 raxIterator ri; 378 raxStart(&ri,s->rax); 379 raxSeek(&ri,"^",NULL,0); 380 381 int64_t deleted = 0; 382 while(s->length > maxlen && raxNext(&ri)) { 383 unsigned char *lp = ri.data, *p = lpFirst(lp); 384 int64_t entries = lpGetInteger(p); 385 386 /* Check if we can remove the whole node, and still have at 387 * least maxlen elements. */ 388 if (s->length - entries >= maxlen) { 389 lpFree(lp); 390 raxRemove(s->rax,ri.key,ri.key_len,NULL); 391 raxSeek(&ri,">=",ri.key,ri.key_len); 392 s->length -= entries; 393 deleted += entries; 394 continue; 395 } 396 397 /* If we cannot remove a whole element, and approx is true, 398 * stop here. 
*/ 399 if (approx) break; 400 401 /* Otherwise, we have to mark single entries inside the listpack 402 * as deleted. We start by updating the entries/deleted counters. */ 403 int64_t to_delete = s->length - maxlen; 404 serverAssert(to_delete < entries); 405 lp = lpReplaceInteger(lp,&p,entries-to_delete); 406 p = lpNext(lp,p); /* Seek deleted field. */ 407 int64_t marked_deleted = lpGetInteger(p); 408 lp = lpReplaceInteger(lp,&p,marked_deleted+to_delete); 409 p = lpNext(lp,p); /* Seek num-of-fields in the master entry. */ 410 411 /* Skip all the master fields. */ 412 int64_t master_fields_count = lpGetInteger(p); 413 p = lpNext(lp,p); /* Seek the first field. */ 414 for (int64_t j = 0; j < master_fields_count; j++) 415 p = lpNext(lp,p); /* Skip all master fields. */ 416 p = lpNext(lp,p); /* Skip the zero master entry terminator. */ 417 418 /* 'p' is now pointing to the first entry inside the listpack. 419 * We have to run entry after entry, marking entries as deleted 420 * if they are already not deleted. */ 421 while(p) { 422 int flags = lpGetInteger(p); 423 int to_skip; 424 425 /* Mark the entry as deleted. */ 426 if (!(flags & STREAM_ITEM_FLAG_DELETED)) { 427 flags |= STREAM_ITEM_FLAG_DELETED; 428 lp = lpReplaceInteger(lp,&p,flags); 429 deleted++; 430 s->length--; 431 if (s->length <= maxlen) break; /* Enough entries deleted. */ 432 } 433 434 p = lpNext(lp,p); /* Skip ID ms delta. */ 435 p = lpNext(lp,p); /* Skip ID seq delta. */ 436 p = lpNext(lp,p); /* Seek num-fields or values (if compressed). */ 437 if (flags & STREAM_ITEM_FLAG_SAMEFIELDS) { 438 to_skip = master_fields_count; 439 } else { 440 to_skip = lpGetInteger(p); 441 to_skip = 1+(to_skip*2); 442 } 443 444 while(to_skip--) p = lpNext(lp,p); /* Skip the whole entry. */ 445 p = lpNext(lp,p); /* Skip the final lp-count field. */ 446 } 447 448 /* Here we should perform garbage collection in case at this point 449 * there are too many entries deleted inside the listpack. 
*/ 450 entries -= to_delete; 451 marked_deleted += to_delete; 452 if (entries + marked_deleted > 10 && marked_deleted > entries/2) { 453 /* TODO: perform a garbage collection. */ 454 } 455 456 /* Update the listpack with the new pointer. */ 457 raxInsert(s->rax,ri.key,ri.key_len,lp,NULL); 458 459 break; /* If we are here, there was enough to delete in the current 460 node, so no need to go to the next node. */ 461 } 462 463 raxStop(&ri); 464 return deleted; 465 } 466 467 /* Initialize the stream iterator, so that we can call iterating functions 468 * to get the next items. This requires a corresponding streamIteratorStop() 469 * at the end. The 'rev' parameter controls the direction. If it's zero the 470 * iteration is from the start to the end element (inclusive), otherwise 471 * if rev is non-zero, the iteration is reversed. 472 * 473 * Once the iterator is initialized, we iterate like this: 474 * 475 * streamIterator myiterator; 476 * streamIteratorStart(&myiterator,...); 477 * int64_t numfields; 478 * while(streamIteratorGetID(&myiterator,&ID,&numfields)) { 479 * while(numfields--) { 480 * unsigned char *key, *value; 481 * size_t key_len, value_len; 482 * streamIteratorGetField(&myiterator,&key,&value,&key_len,&value_len); 483 * 484 * ... do what you want with key and value ... 485 * } 486 * } 487 * streamIteratorStop(&myiterator); */ 488 void streamIteratorStart(streamIterator *si, stream *s, streamID *start, streamID *end, int rev) { 489 /* Intialize the iterator and translates the iteration start/stop 490 * elements into a 128 big big-endian number. */ 491 if (start) { 492 streamEncodeID(si->start_key,start); 493 } else { 494 si->start_key[0] = 0; 495 si->start_key[1] = 0; 496 } 497 498 if (end) { 499 streamEncodeID(si->end_key,end); 500 } else { 501 si->end_key[0] = UINT64_MAX; 502 si->end_key[1] = UINT64_MAX; 503 } 504 505 /* Seek the correct node in the radix tree. 
*/ 506 raxStart(&si->ri,s->rax); 507 if (!rev) { 508 if (start && (start->ms || start->seq)) { 509 raxSeek(&si->ri,"<=",(unsigned char*)si->start_key, 510 sizeof(si->start_key)); 511 if (raxEOF(&si->ri)) raxSeek(&si->ri,"^",NULL,0); 512 } else { 513 raxSeek(&si->ri,"^",NULL,0); 514 } 515 } else { 516 if (end && (end->ms || end->seq)) { 517 raxSeek(&si->ri,"<=",(unsigned char*)si->end_key, 518 sizeof(si->end_key)); 519 if (raxEOF(&si->ri)) raxSeek(&si->ri,"$",NULL,0); 520 } else { 521 raxSeek(&si->ri,"$",NULL,0); 522 } 523 } 524 si->stream = s; 525 si->lp = NULL; /* There is no current listpack right now. */ 526 si->lp_ele = NULL; /* Current listpack cursor. */ 527 si->rev = rev; /* Direction, if non-zero reversed, from end to start. */ 528 } 529 530 /* Return 1 and store the current item ID at 'id' if there are still 531 * elements within the iteration range, otherwise return 0 in order to 532 * signal the iteration terminated. */ 533 int streamIteratorGetID(streamIterator *si, streamID *id, int64_t *numfields) { 534 while(1) { /* Will stop when element > stop_key or end of radix tree. */ 535 /* If the current listpack is set to NULL, this is the start of the 536 * iteration or the previous listpack was completely iterated. 537 * Go to the next node. */ 538 if (si->lp == NULL || si->lp_ele == NULL) { 539 if (!si->rev && !raxNext(&si->ri)) return 0; 540 else if (si->rev && !raxPrev(&si->ri)) return 0; 541 serverAssert(si->ri.key_len == sizeof(streamID)); 542 /* Get the master ID. */ 543 streamDecodeID(si->ri.key,&si->master_id); 544 /* Get the master fields count. */ 545 si->lp = si->ri.data; 546 si->lp_ele = lpFirst(si->lp); /* Seek items count */ 547 si->lp_ele = lpNext(si->lp,si->lp_ele); /* Seek deleted count. */ 548 si->lp_ele = lpNext(si->lp,si->lp_ele); /* Seek num fields. */ 549 si->master_fields_count = lpGetInteger(si->lp_ele); 550 si->lp_ele = lpNext(si->lp,si->lp_ele); /* Seek first field. 
*/ 551 si->master_fields_start = si->lp_ele; 552 /* We are now pointing to the first field of the master entry. 553 * We need to seek either the first or the last entry depending 554 * on the direction of the iteration. */ 555 if (!si->rev) { 556 /* If we are iterating in normal order, skip the master fields 557 * to seek the first actual entry. */ 558 for (uint64_t i = 0; i < si->master_fields_count; i++) 559 si->lp_ele = lpNext(si->lp,si->lp_ele); 560 } else { 561 /* If we are iterating in reverse direction, just seek the 562 * last part of the last entry in the listpack (that is, the 563 * fields count). */ 564 si->lp_ele = lpLast(si->lp); 565 } 566 } else if (si->rev) { 567 /* If we are itereating in the reverse order, and this is not 568 * the first entry emitted for this listpack, then we already 569 * emitted the current entry, and have to go back to the previous 570 * one. */ 571 int lp_count = lpGetInteger(si->lp_ele); 572 while(lp_count--) si->lp_ele = lpPrev(si->lp,si->lp_ele); 573 /* Seek lp-count of prev entry. */ 574 si->lp_ele = lpPrev(si->lp,si->lp_ele); 575 } 576 577 /* For every radix tree node, iterate the corresponding listpack, 578 * returning elements when they are within range. */ 579 while(1) { 580 if (!si->rev) { 581 /* If we are going forward, skip the previous entry 582 * lp-count field (or in case of the master entry, the zero 583 * term field) */ 584 si->lp_ele = lpNext(si->lp,si->lp_ele); 585 if (si->lp_ele == NULL) break; 586 } else { 587 /* If we are going backward, read the number of elements this 588 * entry is composed of, and jump backward N times to seek 589 * its start. */ 590 int64_t lp_count = lpGetInteger(si->lp_ele); 591 if (lp_count == 0) { /* We reached the master entry. */ 592 si->lp = NULL; 593 si->lp_ele = NULL; 594 break; 595 } 596 while(lp_count--) si->lp_ele = lpPrev(si->lp,si->lp_ele); 597 } 598 599 /* Get the flags entry. 
*/ 600 si->lp_flags = si->lp_ele; 601 int flags = lpGetInteger(si->lp_ele); 602 si->lp_ele = lpNext(si->lp,si->lp_ele); /* Seek ID. */ 603 604 /* Get the ID: it is encoded as difference between the master 605 * ID and this entry ID. */ 606 *id = si->master_id; 607 id->ms += lpGetInteger(si->lp_ele); 608 si->lp_ele = lpNext(si->lp,si->lp_ele); 609 id->seq += lpGetInteger(si->lp_ele); 610 si->lp_ele = lpNext(si->lp,si->lp_ele); 611 unsigned char buf[sizeof(streamID)]; 612 streamEncodeID(buf,id); 613 614 /* The number of entries is here or not depending on the 615 * flags. */ 616 if (flags & STREAM_ITEM_FLAG_SAMEFIELDS) { 617 *numfields = si->master_fields_count; 618 } else { 619 *numfields = lpGetInteger(si->lp_ele); 620 si->lp_ele = lpNext(si->lp,si->lp_ele); 621 } 622 623 /* If current >= start, and the entry is not marked as 624 * deleted, emit it. */ 625 if (!si->rev) { 626 if (memcmp(buf,si->start_key,sizeof(streamID)) >= 0 && 627 !(flags & STREAM_ITEM_FLAG_DELETED)) 628 { 629 if (memcmp(buf,si->end_key,sizeof(streamID)) > 0) 630 return 0; /* We are already out of range. */ 631 si->entry_flags = flags; 632 if (flags & STREAM_ITEM_FLAG_SAMEFIELDS) 633 si->master_fields_ptr = si->master_fields_start; 634 return 1; /* Valid item returned. */ 635 } 636 } else { 637 if (memcmp(buf,si->end_key,sizeof(streamID)) <= 0 && 638 !(flags & STREAM_ITEM_FLAG_DELETED)) 639 { 640 if (memcmp(buf,si->start_key,sizeof(streamID)) < 0) 641 return 0; /* We are already out of range. */ 642 si->entry_flags = flags; 643 if (flags & STREAM_ITEM_FLAG_SAMEFIELDS) 644 si->master_fields_ptr = si->master_fields_start; 645 return 1; /* Valid item returned. */ 646 } 647 } 648 649 /* If we do not emit, we have to discard if we are going 650 * forward, or seek the previous entry if we are going 651 * backward. */ 652 if (!si->rev) { 653 int64_t to_discard = (flags & STREAM_ITEM_FLAG_SAMEFIELDS) ? 
654 *numfields : *numfields*2; 655 for (int64_t i = 0; i < to_discard; i++) 656 si->lp_ele = lpNext(si->lp,si->lp_ele); 657 } else { 658 int64_t prev_times = 4; /* flag + id ms + id seq + one more to 659 go back to the previous entry "count" 660 field. */ 661 /* If the entry was not flagged SAMEFIELD we also read the 662 * number of fields, so go back one more. */ 663 if (!(flags & STREAM_ITEM_FLAG_SAMEFIELDS)) prev_times++; 664 while(prev_times--) si->lp_ele = lpPrev(si->lp,si->lp_ele); 665 } 666 } 667 668 /* End of listpack reached. Try the next/prev radix tree node. */ 669 } 670 } 671 672 /* Get the field and value of the current item we are iterating. This should 673 * be called immediately after streamIteratorGetID(), and for each field 674 * according to the number of fields returned by streamIteratorGetID(). 675 * The function populates the field and value pointers and the corresponding 676 * lengths by reference, that are valid until the next iterator call, assuming 677 * no one touches the stream meanwhile. */ 678 void streamIteratorGetField(streamIterator *si, unsigned char **fieldptr, unsigned char **valueptr, int64_t *fieldlen, int64_t *valuelen) { 679 if (si->entry_flags & STREAM_ITEM_FLAG_SAMEFIELDS) { 680 *fieldptr = lpGet(si->master_fields_ptr,fieldlen,si->field_buf); 681 si->master_fields_ptr = lpNext(si->lp,si->master_fields_ptr); 682 } else { 683 *fieldptr = lpGet(si->lp_ele,fieldlen,si->field_buf); 684 si->lp_ele = lpNext(si->lp,si->lp_ele); 685 } 686 *valueptr = lpGet(si->lp_ele,valuelen,si->value_buf); 687 si->lp_ele = lpNext(si->lp,si->lp_ele); 688 } 689 690 /* Remove the current entry from the stream: can be called after the 691 * GetID() API or after any GetField() call, however we need to iterate 692 * a valid entry while calling this function. Moreover the function 693 * requires the entry ID we are currently iterating, that was previously 694 * returned by GetID(). 
695 * 696 * Note that after calling this function, next calls to GetField() can't 697 * be performed: the entry is now deleted. Instead the iterator will 698 * automatically re-seek to the next entry, so the caller should continue 699 * with GetID(). */ 700 void streamIteratorRemoveEntry(streamIterator *si, streamID *current) { 701 unsigned char *lp = si->lp; 702 int64_t aux; 703 704 /* We do not really delete the entry here. Instead we mark it as 705 * deleted flagging it, and also incrementing the count of the 706 * deleted entries in the listpack header. 707 * 708 * We start flagging: */ 709 int flags = lpGetInteger(si->lp_flags); 710 flags |= STREAM_ITEM_FLAG_DELETED; 711 lp = lpReplaceInteger(lp,&si->lp_flags,flags); 712 713 /* Change the valid/deleted entries count in the master entry. */ 714 unsigned char *p = lpFirst(lp); 715 aux = lpGetInteger(p); 716 717 if (aux == 1) { 718 /* If this is the last element in the listpack, we can remove the whole 719 * node. */ 720 lpFree(lp); 721 raxRemove(si->stream->rax,si->ri.key,si->ri.key_len,NULL); 722 } else { 723 /* In the base case we alter the counters of valid/deleted entries. */ 724 lp = lpReplaceInteger(lp,&p,aux-1); 725 p = lpNext(lp,p); /* Seek deleted field. */ 726 aux = lpGetInteger(p); 727 lp = lpReplaceInteger(lp,&p,aux+1); 728 729 /* Update the listpack with the new pointer. */ 730 if (si->lp != lp) 731 raxInsert(si->stream->rax,si->ri.key,si->ri.key_len,lp,NULL); 732 } 733 734 /* Update the number of entries counter. */ 735 si->stream->length--; 736 737 /* Re-seek the iterator to fix the now messed up state. */ 738 streamID start, end; 739 if (si->rev) { 740 streamDecodeID(si->start_key,&start); 741 end = *current; 742 } else { 743 start = *current; 744 streamDecodeID(si->end_key,&end); 745 } 746 streamIteratorStop(si); 747 streamIteratorStart(si,si->stream,&start,&end,si->rev); 748 749 /* TODO: perform a garbage collection here if the ration between 750 * deleted and valid goes over a certain limit. 
*/ 751 } 752 753 /* Stop the stream iterator. The only cleanup we need is to free the rax 754 * itereator, since the stream iterator itself is supposed to be stack 755 * allocated. */ 756 void streamIteratorStop(streamIterator *si) { 757 raxStop(&si->ri); 758 } 759 760 /* Delete the specified item ID from the stream, returning 1 if the item 761 * was deleted 0 otherwise (if it does not exist). */ 762 int streamDeleteItem(stream *s, streamID *id) { 763 int deleted = 0; 764 streamIterator si; 765 streamIteratorStart(&si,s,id,id,0); 766 streamID myid; 767 int64_t numfields; 768 if (streamIteratorGetID(&si,&myid,&numfields)) { 769 streamIteratorRemoveEntry(&si,&myid); 770 deleted = 1; 771 } 772 streamIteratorStop(&si); 773 return deleted; 774 } 775 776 /* Emit a reply in the client output buffer by formatting a Stream ID 777 * in the standard <ms>-<seq> format, using the simple string protocol 778 * of REPL. */ 779 void addReplyStreamID(client *c, streamID *id) { 780 sds replyid = sdscatfmt(sdsempty(),"%U-%U",id->ms,id->seq); 781 addReplyBulkSds(c,replyid); 782 } 783 784 /* Similar to the above function, but just creates an object, usually useful 785 * for replication purposes to create arguments. */ 786 robj *createObjectFromStreamID(streamID *id) { 787 return createObject(OBJ_STRING, sdscatfmt(sdsempty(),"%U-%U", 788 id->ms,id->seq)); 789 } 790 791 /* As a result of an explicit XCLAIM or XREADGROUP command, new entries 792 * are created in the pending list of the stream and consumers. We need 793 * to propagate this changes in the form of XCLAIM commands. */ 794 void streamPropagateXCLAIM(client *c, robj *key, streamCG *group, robj *groupname, robj *id, streamNACK *nack) { 795 /* We need to generate an XCLAIM that will work in a idempotent fashion: 796 * 797 * XCLAIM <key> <group> <consumer> 0 <id> TIME <milliseconds-unix-time> 798 * RETRYCOUNT <count> FORCE JUSTID LASTID <id>. 
799 * 800 * Note that JUSTID is useful in order to avoid that XCLAIM will do 801 * useless work in the slave side, trying to fetch the stream item. */ 802 robj *argv[14]; 803 argv[0] = createStringObject("XCLAIM",6); 804 argv[1] = key; 805 argv[2] = groupname; 806 argv[3] = createStringObject(nack->consumer->name,sdslen(nack->consumer->name)); 807 argv[4] = createStringObjectFromLongLong(0); 808 argv[5] = id; 809 argv[6] = createStringObject("TIME",4); 810 argv[7] = createStringObjectFromLongLong(nack->delivery_time); 811 argv[8] = createStringObject("RETRYCOUNT",10); 812 argv[9] = createStringObjectFromLongLong(nack->delivery_count); 813 argv[10] = createStringObject("FORCE",5); 814 argv[11] = createStringObject("JUSTID",6); 815 argv[12] = createStringObject("LASTID",6); 816 argv[13] = createObjectFromStreamID(&group->last_id); 817 propagate(server.xclaimCommand,c->db->id,argv,14,PROPAGATE_AOF|PROPAGATE_REPL); 818 decrRefCount(argv[0]); 819 decrRefCount(argv[3]); 820 decrRefCount(argv[4]); 821 decrRefCount(argv[6]); 822 decrRefCount(argv[7]); 823 decrRefCount(argv[8]); 824 decrRefCount(argv[9]); 825 decrRefCount(argv[10]); 826 decrRefCount(argv[11]); 827 decrRefCount(argv[12]); 828 decrRefCount(argv[13]); 829 } 830 831 /* We need this when we want to propoagate the new last-id of a consumer group 832 * that was consumed by XREADGROUP with the NOACK option: in that case we can't 833 * propagate the last ID just using the XCLAIM LASTID option, so we emit 834 * 835 * XGROUP SETID <key> <groupname> <id> 836 */ 837 void streamPropagateGroupID(client *c, robj *key, streamCG *group, robj *groupname) { 838 robj *argv[5]; 839 argv[0] = createStringObject("XGROUP",6); 840 argv[1] = createStringObject("SETID",5); 841 argv[2] = key; 842 argv[3] = groupname; 843 argv[4] = createObjectFromStreamID(&group->last_id); 844 propagate(server.xgroupCommand,c->db->id,argv,5,PROPAGATE_AOF|PROPAGATE_REPL); 845 decrRefCount(argv[0]); 846 decrRefCount(argv[1]); 847 decrRefCount(argv[4]); 
848 } 849 850 /* Send the stream items in the specified range to the client 'c'. The range 851 * the client will receive is between start and end inclusive, if 'count' is 852 * non zero, no more than 'count' elements are sent. 853 * 854 * The 'end' pointer can be NULL to mean that we want all the elements from 855 * 'start' till the end of the stream. If 'rev' is non zero, elements are 856 * produced in reversed order from end to start. 857 * 858 * The function returns the number of entries emitted. 859 * 860 * If group and consumer are not NULL, the function performs additional work: 861 * 1. It updates the last delivered ID in the group in case we are 862 * sending IDs greater than the current last ID. 863 * 2. If the requested IDs are already assigned to some other consumer, the 864 * function will not return it to the client. 865 * 3. An entry in the pending list will be created for every entry delivered 866 * for the first time to this consumer. 867 * 868 * The behavior may be modified passing non-zero flags: 869 * 870 * STREAM_RWR_NOACK: Do not craete PEL entries, that is, the point "3" above 871 * is not performed. 872 * STREAM_RWR_RAWENTRIES: Do not emit array boundaries, but just the entries, 873 * and return the number of entries emitted as usually. 874 * This is used when the function is just used in order 875 * to emit data and there is some higher level logic. 876 * 877 * The final argument 'spi' (stream propagatino info pointer) is a structure 878 * filled with information needed to propagte the command execution to AOF 879 * and slaves, in the case a consumer group was passed: we need to generate 880 * XCLAIM commands to create the pending list into AOF/slaves in that case. 881 * 882 * If 'spi' is set to NULL no propagation will happen even if the group was 883 * given, but currently such a feature is never used by the code base that 884 * will always pass 'spi' and propagate when a group is passed. 
885 * 886 * Note that this function is recursive in certain cases. When it's called 887 * with a non NULL group and consumer argument, it may call 888 * streamReplyWithRangeFromConsumerPEL() in order to get entries from the 889 * consumer pending entries list. However such a function will then call 890 * streamReplyWithRange() in order to emit single entries (found in the 891 * PEL by ID) to the client. This is the use case for the STREAM_RWR_RAWENTRIES 892 * flag. 893 */ 894 #define STREAM_RWR_NOACK (1<<0) /* Do not create entries in the PEL. */ 895 #define STREAM_RWR_RAWENTRIES (1<<1) /* Do not emit protocol for array 896 boundaries, just the entries. */ 897 #define STREAM_RWR_HISTORY (1<<2) /* Only serve consumer local PEL. */ 898 size_t streamReplyWithRange(client *c, stream *s, streamID *start, streamID *end, size_t count, int rev, streamCG *group, streamConsumer *consumer, int flags, streamPropInfo *spi) { 899 void *arraylen_ptr = NULL; 900 size_t arraylen = 0; 901 streamIterator si; 902 int64_t numfields; 903 streamID id; 904 int propagate_last_id = 0; 905 906 /* If the client is asking for some history, we serve it using a 907 * different function, so that we return entries *solely* from its 908 * own PEL. This ensures each consumer will always and only see 909 * the history of messages delivered to it and not yet confirmed 910 * as delivered. */ 911 if (group && (flags & STREAM_RWR_HISTORY)) { 912 return streamReplyWithRangeFromConsumerPEL(c,s,start,end,count, 913 consumer); 914 } 915 916 if (!(flags & STREAM_RWR_RAWENTRIES)) 917 arraylen_ptr = addDeferredMultiBulkLength(c); 918 streamIteratorStart(&si,s,start,end,rev); 919 while(streamIteratorGetID(&si,&id,&numfields)) { 920 /* Update the group last_id if needed. */ 921 if (group && streamCompareID(&id,&group->last_id) > 0) { 922 group->last_id = id; 923 propagate_last_id = 1; 924 } 925 926 /* Emit a two elements array for each item. The first is 927 * the ID, the second is an array of field-value pairs. 
*/ 928 addReplyMultiBulkLen(c,2); 929 addReplyStreamID(c,&id); 930 addReplyMultiBulkLen(c,numfields*2); 931 932 /* Emit the field-value pairs. */ 933 while(numfields--) { 934 unsigned char *key, *value; 935 int64_t key_len, value_len; 936 streamIteratorGetField(&si,&key,&value,&key_len,&value_len); 937 addReplyBulkCBuffer(c,key,key_len); 938 addReplyBulkCBuffer(c,value,value_len); 939 } 940 941 /* If a group is passed, we need to create an entry in the 942 * PEL (pending entries list) of this group *and* this consumer. 943 * 944 * Note that we cannot be sure about the fact the message is not 945 * already owned by another consumer, because the admin is able 946 * to change the consumer group last delivered ID using the 947 * XGROUP SETID command. So if we find that there is already 948 * a NACK for the entry, we need to associate it to the new 949 * consumer. */ 950 if (group && !(flags & STREAM_RWR_NOACK)) { 951 unsigned char buf[sizeof(streamID)]; 952 streamEncodeID(buf,&id); 953 954 /* Try to add a new NACK. Most of the time this will work and 955 * will not require extra lookups. We'll fix the problem later 956 * if we find that there is already a entry for this ID. */ 957 streamNACK *nack = streamCreateNACK(consumer); 958 int group_inserted = 959 raxTryInsert(group->pel,buf,sizeof(buf),nack,NULL); 960 int consumer_inserted = 961 raxTryInsert(consumer->pel,buf,sizeof(buf),nack,NULL); 962 963 /* Now we can check if the entry was already busy, and 964 * in that case reassign the entry to the new consumer, 965 * or update it if the consumer is the same as before. */ 966 if (group_inserted == 0) { 967 streamFreeNACK(nack); 968 nack = raxFind(group->pel,buf,sizeof(buf)); 969 serverAssert(nack != raxNotFound); 970 raxRemove(nack->consumer->pel,buf,sizeof(buf),NULL); 971 /* Update the consumer and NACK metadata. */ 972 nack->consumer = consumer; 973 nack->delivery_time = mstime(); 974 nack->delivery_count = 1; 975 /* Add the entry in the new consumer local PEL. 
*/ 976 raxInsert(consumer->pel,buf,sizeof(buf),nack,NULL); 977 } else if (group_inserted == 1 && consumer_inserted == 0) { 978 serverPanic("NACK half-created. Should not be possible."); 979 } 980 981 /* Propagate as XCLAIM. */ 982 if (spi) { 983 robj *idarg = createObjectFromStreamID(&id); 984 streamPropagateXCLAIM(c,spi->keyname,group,spi->groupname,idarg,nack); 985 decrRefCount(idarg); 986 } 987 } else { 988 if (propagate_last_id) 989 streamPropagateGroupID(c,spi->keyname,group,spi->groupname); 990 } 991 992 arraylen++; 993 if (count && count == arraylen) break; 994 } 995 streamIteratorStop(&si); 996 if (arraylen_ptr) setDeferredMultiBulkLength(c,arraylen_ptr,arraylen); 997 return arraylen; 998 } 999 1000 /* This is an helper function for streamReplyWithRange() when called with 1001 * group and consumer arguments, but with a range that is referring to already 1002 * delivered messages. In this case we just emit messages that are already 1003 * in the history of the consumer, fetching the IDs from its PEL. 1004 * 1005 * Note that this function does not have a 'rev' argument because it's not 1006 * possible to iterate in reverse using a group. Basically this function 1007 * is only called as a result of the XREADGROUP command. 1008 * 1009 * This function is more expensive because it needs to inspect the PEL and then 1010 * seek into the radix tree of the messages in order to emit the full message 1011 * to the client. However clients only reach this code path when they are 1012 * fetching the history of already retrieved messages, which is rare. 
*/
/* Arguments and return value: entries of the 'consumer' PEL having an ID
 * >= 'start' are emitted, stopping at 'end' (inclusive) when 'end' is not
 * NULL, and after 'count' entries when 'count' is non zero. The number of
 * entries emitted is returned. */
size_t streamReplyWithRangeFromConsumerPEL(client *c, stream *s, streamID *start, streamID *end, size_t count, streamConsumer *consumer) {
    raxIterator ri;
    unsigned char startkey[sizeof(streamID)];
    unsigned char endkey[sizeof(streamID)];
    streamEncodeID(startkey,start);
    if (end) streamEncodeID(endkey,end);

    size_t arraylen = 0;
    void *arraylen_ptr = addDeferredMultiBulkLength(c);
    raxStart(&ri,consumer->pel);
    raxSeek(&ri,">=",startkey,sizeof(startkey));
    while(raxNext(&ri) && (!count || arraylen < count)) {
        /* The PEL keys are IDs in the form produced by streamEncodeID(),
         * so the upper bound must be checked against the encoded 'endkey'
         * and not against the raw bytes of the streamID structure. */
        if (end && memcmp(ri.key,endkey,ri.key_len) > 0) break;
        streamID thisid;
        streamDecodeID(ri.key,&thisid);
        if (streamReplyWithRange(c,s,&thisid,&thisid,1,0,NULL,NULL,
                                 STREAM_RWR_RAWENTRIES,NULL) == 0)
        {
            /* Note that we may have a not acknowledged entry in the PEL
             * about a message that's no longer here because was removed
             * by the user by other means. In that case we signal it emitting
             * the ID but then a NULL entry for the fields. */
            addReplyMultiBulkLen(c,2);
            addReplyStreamID(c,&thisid);
            addReply(c,shared.nullmultibulk);
        } else {
            /* The entry was delivered again: refresh its NACK metadata. */
            streamNACK *nack = ri.data;
            nack->delivery_time = mstime();
            nack->delivery_count++;
        }
        arraylen++;
    }
    raxStop(&ri);
    setDeferredMultiBulkLength(c,arraylen_ptr,arraylen);
    return arraylen;
}

/* -----------------------------------------------------------------------
 * Stream commands implementation
 * ----------------------------------------------------------------------- */

/* Look the stream at 'key' and return the corresponding stream object.
 * The function creates a key setting it to an empty stream if needed.
*/
/* On success the stream object is returned. If the key exists but holds a
 * value that is not a stream, a wrong type error is sent to the client and
 * NULL is returned. */
robj *streamTypeLookupWriteOrCreate(client *c, robj *key) {
    robj *o = lookupKeyWrite(c->db,key);
    if (o == NULL) {
        o = createStreamObject();
        dbAdd(c->db,key,o);
    } else {
        if (o->type != OBJ_STREAM) {
            addReply(c,shared.wrongtypeerr);
            return NULL;
        }
    }
    return o;
}

/* Helper function to convert a string to an unsigned long long value.
 * The function attempts to use the faster string2ll() function inside
 * Redis: if it fails, strtoull() is used instead. The function returns
 * 1 if the conversion happened successfully or 0 if the number is
 * invalid or out of range. When 0 is returned the content of '*value'
 * must not be used. */
int string2ull(const char *s, unsigned long long *value) {
    long long ll;
    if (string2ll(s,strlen(s),&ll)) {
        if (ll < 0) return 0; /* Negative values are out of range. */
        *value = ll;
        return 1;
    }

    /* strtoull() skips leading white space and accepts an optional sign:
     * a string such as "-9223372036854775809" would be converted without
     * any error into the negated value wrapped as unsigned. An unsigned
     * number must start with a digit, so refuse anything else (this covers
     * the empty string as well). */
    if (s[0] < '0' || s[0] > '9') return 0;

    errno = 0;
    char *endptr = NULL;
    *value = strtoull(s,&endptr,10);
    if (errno == EINVAL || errno == ERANGE || *endptr != '\0')
        return 0; /* strtoull() failed, or trailing garbage was found. */
    return 1; /* Conversion done! */
}

/* Parse a stream ID in the format given by clients to Redis, that is
 * <ms>-<seq>, and converts it into a streamID structure. If
 * the specified ID is invalid C_ERR is returned and an error is reported
 * to the client, otherwise C_OK is returned. The ID may be in incomplete
 * form, just stating the milliseconds time part of the stream. In such a case
 * the missing part is set according to the value of 'missing_seq' parameter.
 *
 * The IDs "-" and "+" specify respectively the minimum and maximum IDs
 * that can be represented. If 'strict' is set to 1, "-" and "+" will be
 * treated as an invalid ID.
 *
 * If 'c' is set to NULL, no reply is sent to the client.
*/ 1104 int streamGenericParseIDOrReply(client *c, robj *o, streamID *id, uint64_t missing_seq, int strict) { 1105 char buf[128]; 1106 if (sdslen(o->ptr) > sizeof(buf)-1) goto invalid; 1107 memcpy(buf,o->ptr,sdslen(o->ptr)+1); 1108 1109 if (strict && (buf[0] == '-' || buf[0] == '+') && buf[1] == '\0') 1110 goto invalid; 1111 1112 /* Handle the "-" and "+" special cases. */ 1113 if (buf[0] == '-' && buf[1] == '\0') { 1114 id->ms = 0; 1115 id->seq = 0; 1116 return C_OK; 1117 } else if (buf[0] == '+' && buf[1] == '\0') { 1118 id->ms = UINT64_MAX; 1119 id->seq = UINT64_MAX; 1120 return C_OK; 1121 } 1122 1123 /* Parse <ms>-<seq> form. */ 1124 char *dot = strchr(buf,'-'); 1125 if (dot) *dot = '\0'; 1126 unsigned long long ms, seq; 1127 if (string2ull(buf,&ms) == 0) goto invalid; 1128 if (dot && string2ull(dot+1,&seq) == 0) goto invalid; 1129 if (!dot) seq = missing_seq; 1130 id->ms = ms; 1131 id->seq = seq; 1132 return C_OK; 1133 1134 invalid: 1135 if (c) addReplyError(c,"Invalid stream ID specified as stream " 1136 "command argument"); 1137 return C_ERR; 1138 } 1139 1140 /* Wrapper for streamGenericParseIDOrReply() with 'strict' argument set to 1141 * 0, to be used when - and + are accetable IDs. */ 1142 int streamParseIDOrReply(client *c, robj *o, streamID *id, uint64_t missing_seq) { 1143 return streamGenericParseIDOrReply(c,o,id,missing_seq,0); 1144 } 1145 1146 /* Wrapper for streamGenericParseIDOrReply() with 'strict' argument set to 1147 * 1, to be used when we want to return an error if the special IDs + or - 1148 * are provided. */ 1149 int streamParseStrictIDOrReply(client *c, robj *o, streamID *id, uint64_t missing_seq) { 1150 return streamGenericParseIDOrReply(c,o,id,missing_seq,1); 1151 } 1152 1153 /* We propagate MAXLEN ~ <count> as MAXLEN = <resulting-len-of-stream> 1154 * otherwise trimming is no longer determinsitic on replicas / AOF. 
*/ 1155 void streamRewriteApproxMaxlen(client *c, stream *s, int maxlen_arg_idx) { 1156 robj *maxlen_obj = createStringObjectFromLongLong(s->length); 1157 robj *equal_obj = createStringObject("=",1); 1158 1159 rewriteClientCommandArgument(c,maxlen_arg_idx,maxlen_obj); 1160 rewriteClientCommandArgument(c,maxlen_arg_idx-1,equal_obj); 1161 1162 decrRefCount(equal_obj); 1163 decrRefCount(maxlen_obj); 1164 } 1165 1166 /* XADD key [MAXLEN [~|=] <count>] <ID or *> [field value] [field value] ... */ 1167 void xaddCommand(client *c) { 1168 streamID id; 1169 int id_given = 0; /* Was an ID different than "*" specified? */ 1170 long long maxlen = -1; /* If left to -1 no trimming is performed. */ 1171 int approx_maxlen = 0; /* If 1 only delete whole radix tree nodes, so 1172 the maxium length is not applied verbatim. */ 1173 int maxlen_arg_idx = 0; /* Index of the count in MAXLEN, for rewriting. */ 1174 1175 /* Parse options. */ 1176 int i = 2; /* This is the first argument position where we could 1177 find an option, or the ID. */ 1178 for (; i < c->argc; i++) { 1179 int moreargs = (c->argc-1) - i; /* Number of additional arguments. */ 1180 char *opt = c->argv[i]->ptr; 1181 if (opt[0] == '*' && opt[1] == '\0') { 1182 /* This is just a fast path for the common case of auto-ID 1183 * creation. */ 1184 break; 1185 } else if (!strcasecmp(opt,"maxlen") && moreargs) { 1186 approx_maxlen = 0; 1187 char *next = c->argv[i+1]->ptr; 1188 /* Check for the form MAXLEN ~ <count>. */ 1189 if (moreargs >= 2 && next[0] == '~' && next[1] == '\0') { 1190 approx_maxlen = 1; 1191 i++; 1192 } else if (moreargs >= 2 && next[0] == '=' && next[1] == '\0') { 1193 i++; 1194 } 1195 if (getLongLongFromObjectOrReply(c,c->argv[i+1],&maxlen,NULL) 1196 != C_OK) return; 1197 1198 if (maxlen < 0) { 1199 addReplyError(c,"The MAXLEN argument must be >= 0."); 1200 return; 1201 } 1202 i++; 1203 maxlen_arg_idx = i; 1204 } else { 1205 /* If we are here is a syntax error or a valid ID. 
*/ 1206 if (streamParseStrictIDOrReply(c,c->argv[i],&id,0) != C_OK) return; 1207 id_given = 1; 1208 break; 1209 } 1210 } 1211 int field_pos = i+1; 1212 1213 /* Check arity. */ 1214 if ((c->argc - field_pos) < 2 || ((c->argc-field_pos) % 2) == 1) { 1215 addReplyError(c,"wrong number of arguments for XADD"); 1216 return; 1217 } 1218 1219 /* Lookup the stream at key. */ 1220 robj *o; 1221 stream *s; 1222 if ((o = streamTypeLookupWriteOrCreate(c,c->argv[1])) == NULL) return; 1223 s = o->ptr; 1224 1225 /* Append using the low level function and return the ID. */ 1226 if (streamAppendItem(s,c->argv+field_pos,(c->argc-field_pos)/2, 1227 &id, id_given ? &id : NULL) 1228 == C_ERR) 1229 { 1230 addReplyError(c,"The ID specified in XADD is equal or smaller than the " 1231 "target stream top item"); 1232 return; 1233 } 1234 addReplyStreamID(c,&id); 1235 1236 signalModifiedKey(c->db,c->argv[1]); 1237 notifyKeyspaceEvent(NOTIFY_STREAM,"xadd",c->argv[1],c->db->id); 1238 server.dirty++; 1239 1240 if (maxlen >= 0) { 1241 /* Notify xtrim event if needed. */ 1242 if (streamTrimByLength(s,maxlen,approx_maxlen)) { 1243 notifyKeyspaceEvent(NOTIFY_STREAM,"xtrim",c->argv[1],c->db->id); 1244 } 1245 if (approx_maxlen) streamRewriteApproxMaxlen(c,s,maxlen_arg_idx); 1246 } 1247 1248 /* Let's rewrite the ID argument with the one actually generated for 1249 * AOF/replication propagation. */ 1250 robj *idarg = createObjectFromStreamID(&id); 1251 rewriteClientCommandArgument(c,i,idarg); 1252 decrRefCount(idarg); 1253 1254 /* We need to signal to blocked clients that there is new data on this 1255 * stream. */ 1256 if (server.blocked_clients_by_type[BLOCKED_STREAM]) 1257 signalKeyAsReady(c->db, c->argv[1]); 1258 } 1259 1260 /* XRANGE/XREVRANGE actual implementation. */ 1261 void xrangeGenericCommand(client *c, int rev) { 1262 robj *o; 1263 stream *s; 1264 streamID startid, endid; 1265 long long count = -1; 1266 robj *startarg = rev ? c->argv[3] : c->argv[2]; 1267 robj *endarg = rev ? 
c->argv[2] : c->argv[3]; 1268 1269 if (streamParseIDOrReply(c,startarg,&startid,0) == C_ERR) return; 1270 if (streamParseIDOrReply(c,endarg,&endid,UINT64_MAX) == C_ERR) return; 1271 1272 /* Parse the COUNT option if any. */ 1273 if (c->argc > 4) { 1274 for (int j = 4; j < c->argc; j++) { 1275 int additional = c->argc-j-1; 1276 if (strcasecmp(c->argv[j]->ptr,"COUNT") == 0 && additional >= 1) { 1277 if (getLongLongFromObjectOrReply(c,c->argv[j+1],&count,NULL) 1278 != C_OK) return; 1279 if (count < 0) count = 0; 1280 j++; /* Consume additional arg. */ 1281 } else { 1282 addReply(c,shared.syntaxerr); 1283 return; 1284 } 1285 } 1286 } 1287 1288 /* Return the specified range to the user. */ 1289 if ((o = lookupKeyReadOrReply(c,c->argv[1],shared.emptymultibulk)) == NULL 1290 || checkType(c,o,OBJ_STREAM)) return; 1291 s = o->ptr; 1292 1293 if (count == 0) { 1294 addReply(c,shared.nullmultibulk); 1295 } else { 1296 if (count == -1) count = 0; 1297 streamReplyWithRange(c,s,&startid,&endid,count,rev,NULL,NULL,0,NULL); 1298 } 1299 } 1300 1301 /* XRANGE key start end [COUNT <n>] */ 1302 void xrangeCommand(client *c) { 1303 xrangeGenericCommand(c,0); 1304 } 1305 1306 /* XREVRANGE key end start [COUNT <n>] */ 1307 void xrevrangeCommand(client *c) { 1308 xrangeGenericCommand(c,1); 1309 } 1310 1311 /* XLEN */ 1312 void xlenCommand(client *c) { 1313 robj *o; 1314 if ((o = lookupKeyReadOrReply(c,c->argv[1],shared.czero)) == NULL 1315 || checkType(c,o,OBJ_STREAM)) return; 1316 stream *s = o->ptr; 1317 addReplyLongLong(c,s->length); 1318 } 1319 1320 /* XREAD [BLOCK <milliseconds>] [COUNT <count>] STREAMS key_1 key_2 ... key_N 1321 * ID_1 ID_2 ... ID_N 1322 * 1323 * This function also implements the XREAD-GROUP command, which is like XREAD 1324 * but accepting the [GROUP group-name consumer-name] additional option. 1325 * This is useful because while XREAD is a read command and can be called 1326 * on slaves, XREAD-GROUP is not. 
*/ 1327 #define XREAD_BLOCKED_DEFAULT_COUNT 1000 1328 void xreadCommand(client *c) { 1329 long long timeout = -1; /* -1 means, no BLOCK argument given. */ 1330 long long count = 0; 1331 int streams_count = 0; 1332 int streams_arg = 0; 1333 int noack = 0; /* True if NOACK option was specified. */ 1334 #define STREAMID_STATIC_VECTOR_LEN 8 1335 streamID static_ids[STREAMID_STATIC_VECTOR_LEN]; 1336 streamID *ids = static_ids; 1337 streamCG **groups = NULL; 1338 int xreadgroup = sdslen(c->argv[0]->ptr) == 10; /* XREAD or XREADGROUP? */ 1339 robj *groupname = NULL; 1340 robj *consumername = NULL; 1341 1342 /* Parse arguments. */ 1343 for (int i = 1; i < c->argc; i++) { 1344 int moreargs = c->argc-i-1; 1345 char *o = c->argv[i]->ptr; 1346 if (!strcasecmp(o,"BLOCK") && moreargs) { 1347 i++; 1348 if (getTimeoutFromObjectOrReply(c,c->argv[i],&timeout, 1349 UNIT_MILLISECONDS) != C_OK) return; 1350 } else if (!strcasecmp(o,"COUNT") && moreargs) { 1351 i++; 1352 if (getLongLongFromObjectOrReply(c,c->argv[i],&count,NULL) != C_OK) 1353 return; 1354 if (count < 0) count = 0; 1355 } else if (!strcasecmp(o,"STREAMS") && moreargs) { 1356 streams_arg = i+1; 1357 streams_count = (c->argc-streams_arg); 1358 if ((streams_count % 2) != 0) { 1359 addReplyError(c,"Unbalanced XREAD list of streams: " 1360 "for each stream key an ID or '$' must be " 1361 "specified."); 1362 return; 1363 } 1364 streams_count /= 2; /* We have two arguments for each stream. */ 1365 break; 1366 } else if (!strcasecmp(o,"GROUP") && moreargs >= 2) { 1367 if (!xreadgroup) { 1368 addReplyError(c,"The GROUP option is only supported by " 1369 "XREADGROUP. You called XREAD instead."); 1370 return; 1371 } 1372 groupname = c->argv[i+1]; 1373 consumername = c->argv[i+2]; 1374 i += 2; 1375 } else if (!strcasecmp(o,"NOACK")) { 1376 if (!xreadgroup) { 1377 addReplyError(c,"The NOACK option is only supported by " 1378 "XREADGROUP. 
You called XREAD instead."); 1379 return; 1380 } 1381 noack = 1; 1382 } else { 1383 addReply(c,shared.syntaxerr); 1384 return; 1385 } 1386 } 1387 1388 /* STREAMS option is mandatory. */ 1389 if (streams_arg == 0) { 1390 addReply(c,shared.syntaxerr); 1391 return; 1392 } 1393 1394 /* If the user specified XREADGROUP then it must also 1395 * provide the GROUP option. */ 1396 if (xreadgroup && groupname == NULL) { 1397 addReplyError(c,"Missing GROUP option for XREADGROUP"); 1398 return; 1399 } 1400 1401 /* Parse the IDs and resolve the group name. */ 1402 if (streams_count > STREAMID_STATIC_VECTOR_LEN) 1403 ids = zmalloc(sizeof(streamID)*streams_count); 1404 if (groupname) groups = zmalloc(sizeof(streamCG*)*streams_count); 1405 1406 for (int i = streams_arg + streams_count; i < c->argc; i++) { 1407 /* Specifying "$" as last-known-id means that the client wants to be 1408 * served with just the messages that will arrive into the stream 1409 * starting from now. */ 1410 int id_idx = i - streams_arg - streams_count; 1411 robj *key = c->argv[i-streams_count]; 1412 robj *o = lookupKeyRead(c->db,key); 1413 if (o && checkType(c,o,OBJ_STREAM)) goto cleanup; 1414 streamCG *group = NULL; 1415 1416 /* If a group was specified, than we need to be sure that the 1417 * key and group actually exist. */ 1418 if (groupname) { 1419 if (o == NULL || 1420 (group = streamLookupCG(o->ptr,groupname->ptr)) == NULL) 1421 { 1422 addReplyErrorFormat(c, "-NOGROUP No such key '%s' or consumer " 1423 "group '%s' in XREADGROUP with GROUP " 1424 "option", 1425 (char*)key->ptr,(char*)groupname->ptr); 1426 goto cleanup; 1427 } 1428 groups[id_idx] = group; 1429 } 1430 1431 if (strcmp(c->argv[i]->ptr,"$") == 0) { 1432 if (xreadgroup) { 1433 addReplyError(c,"The $ ID is meaningless in the context of " 1434 "XREADGROUP: you want to read the history of " 1435 "this consumer by specifying a proper ID, or " 1436 "use the > ID to get new messages. 
The $ ID would " 1437 "just return an empty result set."); 1438 goto cleanup; 1439 } 1440 if (o) { 1441 stream *s = o->ptr; 1442 ids[id_idx] = s->last_id; 1443 } else { 1444 ids[id_idx].ms = 0; 1445 ids[id_idx].seq = 0; 1446 } 1447 continue; 1448 } else if (strcmp(c->argv[i]->ptr,">") == 0) { 1449 if (!xreadgroup) { 1450 addReplyError(c,"The > ID can be specified only when calling " 1451 "XREADGROUP using the GROUP <group> " 1452 "<consumer> option."); 1453 goto cleanup; 1454 } 1455 /* We use just the maximum ID to signal this is a ">" ID, anyway 1456 * the code handling the blocking clients will have to update the 1457 * ID later in order to match the changing consumer group last ID. */ 1458 ids[id_idx].ms = UINT64_MAX; 1459 ids[id_idx].seq = UINT64_MAX; 1460 continue; 1461 } 1462 if (streamParseStrictIDOrReply(c,c->argv[i],ids+id_idx,0) != C_OK) 1463 goto cleanup; 1464 } 1465 1466 /* Try to serve the client synchronously. */ 1467 size_t arraylen = 0; 1468 void *arraylen_ptr = NULL; 1469 for (int i = 0; i < streams_count; i++) { 1470 robj *o = lookupKeyRead(c->db,c->argv[streams_arg+i]); 1471 if (o == NULL) continue; 1472 stream *s = o->ptr; 1473 streamID *gt = ids+i; /* ID must be greater than this. */ 1474 int serve_synchronously = 0; 1475 int serve_history = 0; /* True for XREADGROUP with ID != ">". */ 1476 1477 /* Check if there are the conditions to serve the client 1478 * synchronously. */ 1479 if (groups) { 1480 /* If the consumer is blocked on a group, we always serve it 1481 * synchronously (serving its local history) if the ID specified 1482 * was not the special ">" ID. */ 1483 if (gt->ms != UINT64_MAX || 1484 gt->seq != UINT64_MAX) 1485 { 1486 serve_synchronously = 1; 1487 serve_history = 1; 1488 } else { 1489 /* We also want to serve a consumer in a consumer group 1490 * synchronously in case the group top item delivered is smaller 1491 * than what the stream has inside. 
*/ 1492 streamID *last = &groups[i]->last_id; 1493 if (s->length && (streamCompareID(&s->last_id, last) > 0)) { 1494 serve_synchronously = 1; 1495 *gt = *last; 1496 } 1497 } 1498 } else { 1499 /* For consumers without a group, we serve synchronously if we can 1500 * actually provide at least one item from the stream. */ 1501 if (s->length && (streamCompareID(&s->last_id, gt) > 0)) { 1502 serve_synchronously = 1; 1503 } 1504 } 1505 1506 if (serve_synchronously) { 1507 arraylen++; 1508 if (arraylen == 1) arraylen_ptr = addDeferredMultiBulkLength(c); 1509 /* streamReplyWithRange() handles the 'start' ID as inclusive, 1510 * so start from the next ID, since we want only messages with 1511 * IDs greater than start. */ 1512 streamID start = *gt; 1513 start.seq++; /* uint64_t can't overflow in this context. */ 1514 1515 /* Emit the two elements sub-array consisting of the name 1516 * of the stream and the data we extracted from it. */ 1517 addReplyMultiBulkLen(c,2); 1518 addReplyBulk(c,c->argv[streams_arg+i]); 1519 streamConsumer *consumer = NULL; 1520 if (groups) consumer = streamLookupConsumer(groups[i], 1521 consumername->ptr,1); 1522 streamPropInfo spi = {c->argv[i+streams_arg],groupname}; 1523 int flags = 0; 1524 if (noack) flags |= STREAM_RWR_NOACK; 1525 if (serve_history) flags |= STREAM_RWR_HISTORY; 1526 streamReplyWithRange(c,s,&start,NULL,count,0, 1527 groups ? groups[i] : NULL, 1528 consumer, flags, &spi); 1529 if (groups) server.dirty++; 1530 } 1531 } 1532 1533 /* We replied synchronously! Set the top array len and return to caller. */ 1534 if (arraylen) { 1535 setDeferredMultiBulkLength(c,arraylen_ptr,arraylen); 1536 goto cleanup; 1537 } 1538 1539 /* Block if needed. */ 1540 if (timeout != -1) { 1541 /* If we are inside a MULTI/EXEC and the list is empty the only thing 1542 * we can do is treating it as a timeout (even with timeout 0). 
*/ 1543 if (c->flags & CLIENT_MULTI) { 1544 addReply(c,shared.nullmultibulk); 1545 goto cleanup; 1546 } 1547 blockForKeys(c, BLOCKED_STREAM, c->argv+streams_arg, streams_count, 1548 timeout, NULL, ids); 1549 /* If no COUNT is given and we block, set a relatively small count: 1550 * in case the ID provided is too low, we do not want the server to 1551 * block just to serve this client a huge stream of messages. */ 1552 c->bpop.xread_count = count ? count : XREAD_BLOCKED_DEFAULT_COUNT; 1553 1554 /* If this is a XREADGROUP + GROUP we need to remember for which 1555 * group and consumer name we are blocking, so later when one of the 1556 * keys receive more data, we can call streamReplyWithRange() passing 1557 * the right arguments. */ 1558 if (groupname) { 1559 incrRefCount(groupname); 1560 incrRefCount(consumername); 1561 c->bpop.xread_group = groupname; 1562 c->bpop.xread_consumer = consumername; 1563 c->bpop.xread_group_noack = noack; 1564 } else { 1565 c->bpop.xread_group = NULL; 1566 c->bpop.xread_consumer = NULL; 1567 } 1568 goto cleanup; 1569 } 1570 1571 /* No BLOCK option, nor any stream we can serve. Reply as with a 1572 * timeout happened. */ 1573 addReply(c,shared.nullmultibulk); 1574 /* Continue to cleanup... */ 1575 1576 cleanup: /* Cleanup. */ 1577 1578 /* The command is propagated (in the READGROUP form) as a side effect 1579 * of calling lower level APIs. So stop any implicit propagation. */ 1580 preventCommandPropagation(c); 1581 if (ids != static_ids) zfree(ids); 1582 zfree(groups); 1583 } 1584 1585 /* ----------------------------------------------------------------------- 1586 * Low level implementation of consumer groups 1587 * ----------------------------------------------------------------------- */ 1588 1589 /* Create a NACK entry setting the delivery count to 1 and the delivery 1590 * time to the current time. The NACK consumer will be set to the one 1591 * specified as argument of the function. 
*/ 1592 streamNACK *streamCreateNACK(streamConsumer *consumer) { 1593 streamNACK *nack = zmalloc(sizeof(*nack)); 1594 nack->delivery_time = mstime(); 1595 nack->delivery_count = 1; 1596 nack->consumer = consumer; 1597 return nack; 1598 } 1599 1600 /* Free a NACK entry. */ 1601 void streamFreeNACK(streamNACK *na) { 1602 zfree(na); 1603 } 1604 1605 /* Free a consumer and associated data structures. Note that this function 1606 * will not reassign the pending messages associated with this consumer 1607 * nor will delete them from the stream, so when this function is called 1608 * to delete a consumer, and not when the whole stream is destroyed, the caller 1609 * should do some work before. */ 1610 void streamFreeConsumer(streamConsumer *sc) { 1611 raxFree(sc->pel); /* No value free callback: the PEL entries are shared 1612 between the consumer and the main stream PEL. */ 1613 sdsfree(sc->name); 1614 zfree(sc); 1615 } 1616 1617 /* Create a new consumer group in the context of the stream 's', having the 1618 * specified name and last server ID. If a consumer group with the same name 1619 * already existed NULL is returned, otherwise the pointer to the consumer 1620 * group is returned. */ 1621 streamCG *streamCreateCG(stream *s, char *name, size_t namelen, streamID *id) { 1622 if (s->cgroups == NULL) s->cgroups = raxNew(); 1623 if (raxFind(s->cgroups,(unsigned char*)name,namelen) != raxNotFound) 1624 return NULL; 1625 1626 streamCG *cg = zmalloc(sizeof(*cg)); 1627 cg->pel = raxNew(); 1628 cg->consumers = raxNew(); 1629 cg->last_id = *id; 1630 raxInsert(s->cgroups,(unsigned char*)name,namelen,cg,NULL); 1631 return cg; 1632 } 1633 1634 /* Free a consumer group and all its associated data. 
*/ 1635 void streamFreeCG(streamCG *cg) { 1636 raxFreeWithCallback(cg->pel,(void(*)(void*))streamFreeNACK); 1637 raxFreeWithCallback(cg->consumers,(void(*)(void*))streamFreeConsumer); 1638 zfree(cg); 1639 } 1640 1641 /* Lookup the consumer group in the specified stream and returns its 1642 * pointer, otherwise if there is no such group, NULL is returned. */ 1643 streamCG *streamLookupCG(stream *s, sds groupname) { 1644 if (s->cgroups == NULL) return NULL; 1645 streamCG *cg = raxFind(s->cgroups,(unsigned char*)groupname, 1646 sdslen(groupname)); 1647 return (cg == raxNotFound) ? NULL : cg; 1648 } 1649 1650 /* Lookup the consumer with the specified name in the group 'cg': if the 1651 * consumer does not exist it is automatically created as a side effect 1652 * of calling this function, otherwise its last seen time is updated and 1653 * the existing consumer reference returned. */ 1654 streamConsumer *streamLookupConsumer(streamCG *cg, sds name, int create) { 1655 streamConsumer *consumer = raxFind(cg->consumers,(unsigned char*)name, 1656 sdslen(name)); 1657 if (consumer == raxNotFound) { 1658 if (!create) return NULL; 1659 consumer = zmalloc(sizeof(*consumer)); 1660 consumer->name = sdsdup(name); 1661 consumer->pel = raxNew(); 1662 raxInsert(cg->consumers,(unsigned char*)name,sdslen(name), 1663 consumer,NULL); 1664 } 1665 consumer->seen_time = mstime(); 1666 return consumer; 1667 } 1668 1669 /* Delete the consumer specified in the consumer group 'cg'. The consumer 1670 * may have pending messages: they are removed from the PEL, and the number 1671 * of pending messages "lost" is returned. */ 1672 uint64_t streamDelConsumer(streamCG *cg, sds name) { 1673 streamConsumer *consumer = streamLookupConsumer(cg,name,0); 1674 if (consumer == NULL) return 0; 1675 1676 uint64_t retval = raxSize(consumer->pel); 1677 1678 /* Iterate all the consumer pending messages, deleting every corresponding 1679 * entry from the global entry. 
*/ 1680 raxIterator ri; 1681 raxStart(&ri,consumer->pel); 1682 raxSeek(&ri,"^",NULL,0); 1683 while(raxNext(&ri)) { 1684 streamNACK *nack = ri.data; 1685 raxRemove(cg->pel,ri.key,ri.key_len,NULL); 1686 streamFreeNACK(nack); 1687 } 1688 raxStop(&ri); 1689 1690 /* Deallocate the consumer. */ 1691 raxRemove(cg->consumers,(unsigned char*)name,sdslen(name),NULL); 1692 streamFreeConsumer(consumer); 1693 return retval; 1694 } 1695 1696 /* ----------------------------------------------------------------------- 1697 * Consumer groups commands 1698 * ----------------------------------------------------------------------- */ 1699 1700 /* XGROUP CREATE <key> <groupname> <id or $> [MKSTREAM] 1701 * XGROUP SETID <key> <groupname> <id or $> 1702 * XGROUP DESTROY <key> <groupname> 1703 * XGROUP DELCONSUMER <key> <groupname> <consumername> */ 1704 void xgroupCommand(client *c) { 1705 const char *help[] = { 1706 "CREATE <key> <groupname> <id or $> [opt] -- Create a new consumer group.", 1707 " option MKSTREAM: create the empty stream if it does not exist.", 1708 "SETID <key> <groupname> <id or $> -- Set the current group ID.", 1709 "DESTROY <key> <groupname> -- Remove the specified group.", 1710 "DELCONSUMER <key> <groupname> <consumer> -- Remove the specified consumer.", 1711 "HELP -- Prints this help.", 1712 NULL 1713 }; 1714 stream *s = NULL; 1715 sds grpname = NULL; 1716 streamCG *cg = NULL; 1717 char *opt = c->argv[1]->ptr; /* Subcommand name. */ 1718 int mkstream = 0; 1719 robj *o; 1720 1721 /* CREATE has an MKSTREAM option that creates the stream if it 1722 * does not exist. */ 1723 if (c->argc == 6 && !strcasecmp(opt,"CREATE")) { 1724 if (strcasecmp(c->argv[5]->ptr,"MKSTREAM")) { 1725 addReplySubcommandSyntaxError(c); 1726 return; 1727 } 1728 mkstream = 1; 1729 grpname = c->argv[3]->ptr; 1730 } 1731 1732 /* Everything but the "HELP" option requires a key and group name. 
*/ 1733 if (c->argc >= 4) { 1734 o = lookupKeyWrite(c->db,c->argv[2]); 1735 if (o) { 1736 if (checkType(c,o,OBJ_STREAM)) return; 1737 s = o->ptr; 1738 } 1739 grpname = c->argv[3]->ptr; 1740 } 1741 1742 /* Check for missing key/group. */ 1743 if (c->argc >= 4 && !mkstream) { 1744 /* At this point key must exist, or there is an error. */ 1745 if (s == NULL) { 1746 addReplyError(c, 1747 "The XGROUP subcommand requires the key to exist. " 1748 "Note that for CREATE you may want to use the MKSTREAM " 1749 "option to create an empty stream automatically."); 1750 return; 1751 } 1752 1753 /* Certain subcommands require the group to exist. */ 1754 if ((cg = streamLookupCG(s,grpname)) == NULL && 1755 (!strcasecmp(opt,"SETID") || 1756 !strcasecmp(opt,"DELCONSUMER"))) 1757 { 1758 addReplyErrorFormat(c, "-NOGROUP No such consumer group '%s' " 1759 "for key name '%s'", 1760 (char*)grpname, (char*)c->argv[2]->ptr); 1761 return; 1762 } 1763 } 1764 1765 /* Dispatch the different subcommands. */ 1766 if (!strcasecmp(opt,"CREATE") && (c->argc == 5 || c->argc == 6)) { 1767 streamID id; 1768 if (!strcmp(c->argv[4]->ptr,"$")) { 1769 if (s) { 1770 id = s->last_id; 1771 } else { 1772 id.ms = 0; 1773 id.seq = 0; 1774 } 1775 } else if (streamParseStrictIDOrReply(c,c->argv[4],&id,0) != C_OK) { 1776 return; 1777 } 1778 1779 /* Handle the MKSTREAM option now that the command can no longer fail. 
*/ 1780 if (s == NULL) { 1781 serverAssert(mkstream); 1782 o = createStreamObject(); 1783 dbAdd(c->db,c->argv[2],o); 1784 s = o->ptr; 1785 } 1786 1787 streamCG *cg = streamCreateCG(s,grpname,sdslen(grpname),&id); 1788 if (cg) { 1789 addReply(c,shared.ok); 1790 server.dirty++; 1791 notifyKeyspaceEvent(NOTIFY_STREAM,"xgroup-create", 1792 c->argv[2],c->db->id); 1793 } else { 1794 addReplySds(c, 1795 sdsnew("-BUSYGROUP Consumer Group name already exists\r\n")); 1796 } 1797 } else if (!strcasecmp(opt,"SETID") && c->argc == 5) { 1798 streamID id; 1799 if (!strcmp(c->argv[4]->ptr,"$")) { 1800 id = s->last_id; 1801 } else if (streamParseIDOrReply(c,c->argv[4],&id,0) != C_OK) { 1802 return; 1803 } 1804 cg->last_id = id; 1805 addReply(c,shared.ok); 1806 server.dirty++; 1807 notifyKeyspaceEvent(NOTIFY_STREAM,"xgroup-setid",c->argv[2],c->db->id); 1808 } else if (!strcasecmp(opt,"DESTROY") && c->argc == 4) { 1809 if (cg) { 1810 raxRemove(s->cgroups,(unsigned char*)grpname,sdslen(grpname),NULL); 1811 streamFreeCG(cg); 1812 addReply(c,shared.cone); 1813 server.dirty++; 1814 notifyKeyspaceEvent(NOTIFY_STREAM,"xgroup-destroy", 1815 c->argv[2],c->db->id); 1816 } else { 1817 addReply(c,shared.czero); 1818 } 1819 } else if (!strcasecmp(opt,"DELCONSUMER") && c->argc == 5) { 1820 /* Delete the consumer and returns the number of pending messages 1821 * that were yet associated with such a consumer. */ 1822 long long pending = streamDelConsumer(cg,c->argv[4]->ptr); 1823 addReplyLongLong(c,pending); 1824 server.dirty++; 1825 notifyKeyspaceEvent(NOTIFY_STREAM,"xgroup-delconsumer", 1826 c->argv[2],c->db->id); 1827 } else if (!strcasecmp(opt,"HELP")) { 1828 addReplyHelp(c, help); 1829 } else { 1830 addReplySubcommandSyntaxError(c); 1831 } 1832 } 1833 1834 /* XSETID <stream> <groupname> <id> 1835 * 1836 * Set the internal "last ID" of a stream. 
*/ 1837 void xsetidCommand(client *c) { 1838 robj *o = lookupKeyWriteOrReply(c,c->argv[1],shared.nokeyerr); 1839 if (o == NULL || checkType(c,o,OBJ_STREAM)) return; 1840 1841 stream *s = o->ptr; 1842 streamID id; 1843 if (streamParseStrictIDOrReply(c,c->argv[2],&id,0) != C_OK) return; 1844 1845 /* If the stream has at least one item, we want to check that the user 1846 * is setting a last ID that is equal or greater than the current top 1847 * item, otherwise the fundamental ID monotonicity assumption is violated. */ 1848 if (s->length > 0) { 1849 streamID maxid; 1850 streamIterator si; 1851 streamIteratorStart(&si,s,NULL,NULL,1); 1852 int64_t numfields; 1853 streamIteratorGetID(&si,&maxid,&numfields); 1854 streamIteratorStop(&si); 1855 1856 if (streamCompareID(&id,&maxid) < 0) { 1857 addReplyError(c,"The ID specified in XSETID is smaller than the " 1858 "target stream top item"); 1859 return; 1860 } 1861 } 1862 s->last_id = id; 1863 addReply(c,shared.ok); 1864 server.dirty++; 1865 notifyKeyspaceEvent(NOTIFY_STREAM,"xsetid",c->argv[1],c->db->id); 1866 } 1867 1868 /* XACK <key> <group> <id> <id> ... <id> 1869 * 1870 * Acknowledge a message as processed. In practical terms we just check the 1871 * pendine entries list (PEL) of the group, and delete the PEL entry both from 1872 * the group and the consumer (pending messages are referenced in both places). 1873 * 1874 * Return value of the command is the number of messages successfully 1875 * acknowledged, that is, the IDs we were actually able to resolve in the PEL. 1876 */ 1877 void xackCommand(client *c) { 1878 streamCG *group = NULL; 1879 robj *o = lookupKeyRead(c->db,c->argv[1]); 1880 if (o) { 1881 if (checkType(c,o,OBJ_STREAM)) return; /* Type error. */ 1882 group = streamLookupCG(o->ptr,c->argv[2]->ptr); 1883 } 1884 1885 /* No key or group? Nothing to ack. 
*/ 1886 if (o == NULL || group == NULL) { 1887 addReply(c,shared.czero); 1888 return; 1889 } 1890 1891 int acknowledged = 0; 1892 for (int j = 3; j < c->argc; j++) { 1893 streamID id; 1894 unsigned char buf[sizeof(streamID)]; 1895 if (streamParseStrictIDOrReply(c,c->argv[j],&id,0) != C_OK) return; 1896 streamEncodeID(buf,&id); 1897 1898 /* Lookup the ID in the group PEL: it will have a reference to the 1899 * NACK structure that will have a reference to the consumer, so that 1900 * we are able to remove the entry from both PELs. */ 1901 streamNACK *nack = raxFind(group->pel,buf,sizeof(buf)); 1902 if (nack != raxNotFound) { 1903 raxRemove(group->pel,buf,sizeof(buf),NULL); 1904 raxRemove(nack->consumer->pel,buf,sizeof(buf),NULL); 1905 streamFreeNACK(nack); 1906 acknowledged++; 1907 server.dirty++; 1908 } 1909 } 1910 addReplyLongLong(c,acknowledged); 1911 } 1912 1913 /* XPENDING <key> <group> [<start> <stop> <count> [<consumer>]] 1914 * 1915 * If start and stop are omitted, the command just outputs information about 1916 * the amount of pending messages for the key/group pair, together with 1917 * the minimum and maxium ID of pending messages. 1918 * 1919 * If start and stop are provided instead, the pending messages are returned 1920 * with informations about the current owner, number of deliveries and last 1921 * delivery time and so forth. */ 1922 void xpendingCommand(client *c) { 1923 int justinfo = c->argc == 3; /* Without the range just outputs general 1924 informations about the PEL. */ 1925 robj *key = c->argv[1]; 1926 robj *groupname = c->argv[2]; 1927 robj *consumername = (c->argc == 7) ? c->argv[6] : NULL; 1928 streamID startid, endid; 1929 long long count; 1930 1931 /* Start and stop, and the consumer, can be omitted. */ 1932 if (c->argc != 3 && c->argc != 6 && c->argc != 7) { 1933 addReply(c,shared.syntaxerr); 1934 return; 1935 } 1936 1937 /* Parse start/end/count arguments ASAP if needed, in order to report 1938 * syntax errors before any other error. 
*/ 1939 if (c->argc >= 6) { 1940 if (getLongLongFromObjectOrReply(c,c->argv[5],&count,NULL) == C_ERR) 1941 return; 1942 if (count < 0) count = 0; 1943 if (streamParseIDOrReply(c,c->argv[3],&startid,0) == C_ERR) 1944 return; 1945 if (streamParseIDOrReply(c,c->argv[4],&endid,UINT64_MAX) == C_ERR) 1946 return; 1947 } 1948 1949 /* Lookup the key and the group inside the stream. */ 1950 robj *o = lookupKeyRead(c->db,c->argv[1]); 1951 streamCG *group; 1952 1953 if (o && checkType(c,o,OBJ_STREAM)) return; 1954 if (o == NULL || 1955 (group = streamLookupCG(o->ptr,groupname->ptr)) == NULL) 1956 { 1957 addReplyErrorFormat(c, "-NOGROUP No such key '%s' or consumer " 1958 "group '%s'", 1959 (char*)key->ptr,(char*)groupname->ptr); 1960 return; 1961 } 1962 1963 /* XPENDING <key> <group> variant. */ 1964 if (justinfo) { 1965 addReplyMultiBulkLen(c,4); 1966 /* Total number of messages in the PEL. */ 1967 addReplyLongLong(c,raxSize(group->pel)); 1968 /* First and last IDs. */ 1969 if (raxSize(group->pel) == 0) { 1970 addReply(c,shared.nullbulk); /* Start. */ 1971 addReply(c,shared.nullbulk); /* End. */ 1972 addReply(c,shared.nullmultibulk); /* Clients. */ 1973 } else { 1974 /* Start. */ 1975 raxIterator ri; 1976 raxStart(&ri,group->pel); 1977 raxSeek(&ri,"^",NULL,0); 1978 raxNext(&ri); 1979 streamDecodeID(ri.key,&startid); 1980 addReplyStreamID(c,&startid); 1981 1982 /* End. */ 1983 raxSeek(&ri,"$",NULL,0); 1984 raxNext(&ri); 1985 streamDecodeID(ri.key,&endid); 1986 addReplyStreamID(c,&endid); 1987 raxStop(&ri); 1988 1989 /* Consumers with pending messages. 
*/ 1990 raxStart(&ri,group->consumers); 1991 raxSeek(&ri,"^",NULL,0); 1992 void *arraylen_ptr = addDeferredMultiBulkLength(c); 1993 size_t arraylen = 0; 1994 while(raxNext(&ri)) { 1995 streamConsumer *consumer = ri.data; 1996 if (raxSize(consumer->pel) == 0) continue; 1997 addReplyMultiBulkLen(c,2); 1998 addReplyBulkCBuffer(c,ri.key,ri.key_len); 1999 addReplyBulkLongLong(c,raxSize(consumer->pel)); 2000 arraylen++; 2001 } 2002 setDeferredMultiBulkLength(c,arraylen_ptr,arraylen); 2003 raxStop(&ri); 2004 } 2005 } 2006 /* XPENDING <key> <group> <start> <stop> <count> [<consumer>] variant. */ 2007 else { 2008 streamConsumer *consumer = consumername ? 2009 streamLookupConsumer(group,consumername->ptr,0): 2010 NULL; 2011 2012 /* If a consumer name was mentioned but it does not exist, we can 2013 * just return an empty array. */ 2014 if (consumername && consumer == NULL) { 2015 addReplyMultiBulkLen(c,0); 2016 return; 2017 } 2018 2019 rax *pel = consumer ? consumer->pel : group->pel; 2020 unsigned char startkey[sizeof(streamID)]; 2021 unsigned char endkey[sizeof(streamID)]; 2022 raxIterator ri; 2023 mstime_t now = mstime(); 2024 2025 streamEncodeID(startkey,&startid); 2026 streamEncodeID(endkey,&endid); 2027 raxStart(&ri,pel); 2028 raxSeek(&ri,">=",startkey,sizeof(startkey)); 2029 void *arraylen_ptr = addDeferredMultiBulkLength(c); 2030 size_t arraylen = 0; 2031 2032 while(count && raxNext(&ri) && memcmp(ri.key,endkey,ri.key_len) <= 0) { 2033 streamNACK *nack = ri.data; 2034 2035 arraylen++; 2036 count--; 2037 addReplyMultiBulkLen(c,4); 2038 2039 /* Entry ID. */ 2040 streamID id; 2041 streamDecodeID(ri.key,&id); 2042 addReplyStreamID(c,&id); 2043 2044 /* Consumer name. */ 2045 addReplyBulkCBuffer(c,nack->consumer->name, 2046 sdslen(nack->consumer->name)); 2047 2048 /* Milliseconds elapsed since last delivery. */ 2049 mstime_t elapsed = now - nack->delivery_time; 2050 if (elapsed < 0) elapsed = 0; 2051 addReplyLongLong(c,elapsed); 2052 2053 /* Number of deliveries. 
*/ 2054 addReplyLongLong(c,nack->delivery_count); 2055 } 2056 raxStop(&ri); 2057 setDeferredMultiBulkLength(c,arraylen_ptr,arraylen); 2058 } 2059 } 2060 2061 /* XCLAIM <key> <group> <consumer> <min-idle-time> <ID-1> <ID-2> 2062 * [IDLE <milliseconds>] [TIME <mstime>] [RETRYCOUNT <count>] 2063 * [FORCE] [JUSTID] 2064 * 2065 * Gets ownership of one or multiple messages in the Pending Entries List 2066 * of a given stream consumer group. 2067 * 2068 * If the message ID (among the specified ones) exists, and its idle 2069 * time greater or equal to <min-idle-time>, then the message new owner 2070 * becomes the specified <consumer>. If the minimum idle time specified 2071 * is zero, messages are claimed regardless of their idle time. 2072 * 2073 * All the messages that cannot be found inside the pending entries list 2074 * are ignored, but in case the FORCE option is used. In that case we 2075 * create the NACK (representing a not yet acknowledged message) entry in 2076 * the consumer group PEL. 2077 * 2078 * This command creates the consumer as side effect if it does not yet 2079 * exists. Moreover the command reset the idle time of the message to 0, 2080 * even if by using the IDLE or TIME options, the user can control the 2081 * new idle time. 2082 * 2083 * The options at the end can be used in order to specify more attributes 2084 * to set in the representation of the pending message: 2085 * 2086 * 1. IDLE <ms>: 2087 * Set the idle time (last time it was delivered) of the message. 2088 * If IDLE is not specified, an IDLE of 0 is assumed, that is, 2089 * the time count is reset because the message has now a new 2090 * owner trying to process it. 2091 * 2092 * 2. TIME <ms-unix-time>: 2093 * This is the same as IDLE but instead of a relative amount of 2094 * milliseconds, it sets the idle time to a specific unix time 2095 * (in milliseconds). This is useful in order to rewrite the AOF 2096 * file generating XCLAIM commands. 2097 * 2098 * 3. 
RETRYCOUNT <count>: 2099 * Set the retry counter to the specified value. This counter is 2100 * incremented every time a message is delivered again. Normally 2101 * XCLAIM does not alter this counter, which is just served to clients 2102 * when the XPENDING command is called: this way clients can detect 2103 * anomalies, like messages that are never processed for some reason 2104 * after a big number of delivery attempts. 2105 * 2106 * 4. FORCE: 2107 * Creates the pending message entry in the PEL even if certain 2108 * specified IDs are not already in the PEL assigned to a different 2109 * client. However the message must be exist in the stream, otherwise 2110 * the IDs of non existing messages are ignored. 2111 * 2112 * 5. JUSTID: 2113 * Return just an array of IDs of messages successfully claimed, 2114 * without returning the actual message. 2115 * 2116 * 6. LASTID <id>: 2117 * Update the consumer group last ID with the specified ID if the 2118 * current last ID is smaller than the provided one. 2119 * This is used for replication / AOF, so that when we read from a 2120 * consumer group, the XCLAIM that gets propagated to give ownership 2121 * to the consumer, is also used in order to update the group current 2122 * ID. 2123 * 2124 * The command returns an array of messages that the user 2125 * successfully claimed, so that the caller is able to understand 2126 * what messages it is now in charge of. */ 2127 void xclaimCommand(client *c) { 2128 streamCG *group = NULL; 2129 robj *o = lookupKeyRead(c->db,c->argv[1]); 2130 long long minidle; /* Minimum idle time argument. */ 2131 long long retrycount = -1; /* -1 means RETRYCOUNT option not given. */ 2132 mstime_t deliverytime = -1; /* -1 means IDLE/TIME options not given. */ 2133 int force = 0; 2134 int justid = 0; 2135 2136 if (o) { 2137 if (checkType(c,o,OBJ_STREAM)) return; /* Type error. */ 2138 group = streamLookupCG(o->ptr,c->argv[2]->ptr); 2139 } 2140 2141 /* No key or group? 
Send an error given that the group creation 2142 * is mandatory. */ 2143 if (o == NULL || group == NULL) { 2144 addReplyErrorFormat(c,"-NOGROUP No such key '%s' or " 2145 "consumer group '%s'", (char*)c->argv[1]->ptr, 2146 (char*)c->argv[2]->ptr); 2147 return; 2148 } 2149 2150 if (getLongLongFromObjectOrReply(c,c->argv[4],&minidle, 2151 "Invalid min-idle-time argument for XCLAIM") 2152 != C_OK) return; 2153 if (minidle < 0) minidle = 0; 2154 2155 /* Start parsing the IDs, so that we abort ASAP if there is a syntax 2156 * error: the return value of this command cannot be an error in case 2157 * the client successfully claimed some message, so it should be 2158 * executed in a "all or nothing" fashion. */ 2159 int j; 2160 for (j = 5; j < c->argc; j++) { 2161 streamID id; 2162 if (streamParseStrictIDOrReply(NULL,c->argv[j],&id,0) != C_OK) break; 2163 } 2164 int last_id_arg = j-1; /* Next time we iterate the IDs we now the range. */ 2165 2166 /* If we stopped because some IDs cannot be parsed, perhaps they 2167 * are trailing options. */ 2168 mstime_t now = mstime(); 2169 streamID last_id = {0,0}; 2170 int propagate_last_id = 0; 2171 for (; j < c->argc; j++) { 2172 int moreargs = (c->argc-1) - j; /* Number of additional arguments. 
*/ 2173 char *opt = c->argv[j]->ptr; 2174 if (!strcasecmp(opt,"FORCE")) { 2175 force = 1; 2176 } else if (!strcasecmp(opt,"JUSTID")) { 2177 justid = 1; 2178 } else if (!strcasecmp(opt,"IDLE") && moreargs) { 2179 j++; 2180 if (getLongLongFromObjectOrReply(c,c->argv[j],&deliverytime, 2181 "Invalid IDLE option argument for XCLAIM") 2182 != C_OK) return; 2183 deliverytime = now - deliverytime; 2184 } else if (!strcasecmp(opt,"TIME") && moreargs) { 2185 j++; 2186 if (getLongLongFromObjectOrReply(c,c->argv[j],&deliverytime, 2187 "Invalid TIME option argument for XCLAIM") 2188 != C_OK) return; 2189 } else if (!strcasecmp(opt,"RETRYCOUNT") && moreargs) { 2190 j++; 2191 if (getLongLongFromObjectOrReply(c,c->argv[j],&retrycount, 2192 "Invalid RETRYCOUNT option argument for XCLAIM") 2193 != C_OK) return; 2194 } else if (!strcasecmp(opt,"LASTID") && moreargs) { 2195 j++; 2196 if (streamParseStrictIDOrReply(c,c->argv[j],&last_id,0) != C_OK) return; 2197 } else { 2198 addReplyErrorFormat(c,"Unrecognized XCLAIM option '%s'",opt); 2199 return; 2200 } 2201 } 2202 2203 if (streamCompareID(&last_id,&group->last_id) > 0) { 2204 group->last_id = last_id; 2205 propagate_last_id = 1; 2206 } 2207 2208 if (deliverytime != -1) { 2209 /* If a delivery time was passed, either with IDLE or TIME, we 2210 * do some sanity check on it, and set the deliverytime to now 2211 * (which is a sane choice usually) if the value is bogus. 2212 * To raise an error here is not wise because clients may compute 2213 * the idle time doing some math starting from their local time, 2214 * and this is not a good excuse to fail in case, for instance, 2215 * the computer time is a bit in the future from our POV. */ 2216 if (deliverytime < 0 || deliverytime > now) deliverytime = now; 2217 } else { 2218 /* If no IDLE/TIME option was passed, we want the last delivery 2219 * time to be now, so that the idle time of the message will be 2220 * zero. */ 2221 deliverytime = now; 2222 } 2223 2224 /* Do the actual claiming. 
*/ 2225 streamConsumer *consumer = streamLookupConsumer(group,c->argv[3]->ptr,1); 2226 void *arraylenptr = addDeferredMultiBulkLength(c); 2227 size_t arraylen = 0; 2228 for (int j = 5; j <= last_id_arg; j++) { 2229 streamID id; 2230 unsigned char buf[sizeof(streamID)]; 2231 if (streamParseStrictIDOrReply(c,c->argv[j],&id,0) != C_OK) 2232 serverPanic("StreamID invalid after check. Should not be possible."); 2233 streamEncodeID(buf,&id); 2234 2235 /* Lookup the ID in the group PEL. */ 2236 streamNACK *nack = raxFind(group->pel,buf,sizeof(buf)); 2237 2238 /* If FORCE is passed, let's check if at least the entry 2239 * exists in the Stream. In such case, we'll crate a new 2240 * entry in the PEL from scratch, so that XCLAIM can also 2241 * be used to create entries in the PEL. Useful for AOF 2242 * and replication of consumer groups. */ 2243 if (force && nack == raxNotFound) { 2244 streamIterator myiterator; 2245 streamIteratorStart(&myiterator,o->ptr,&id,&id,0); 2246 int64_t numfields; 2247 int found = 0; 2248 streamID item_id; 2249 if (streamIteratorGetID(&myiterator,&item_id,&numfields)) found = 1; 2250 streamIteratorStop(&myiterator); 2251 2252 /* Item must exist for us to create a NACK for it. */ 2253 if (!found) continue; 2254 2255 /* Create the NACK. */ 2256 nack = streamCreateNACK(NULL); 2257 raxInsert(group->pel,buf,sizeof(buf),nack,NULL); 2258 } 2259 2260 if (nack != raxNotFound) { 2261 /* We need to check if the minimum idle time requested 2262 * by the caller is satisfied by this entry. 2263 * 2264 * Note that the nack could be created by FORCE, in this 2265 * case there was no pre-existing entry and minidle should 2266 * be ignored, but in that case nick->consumer is NULL. */ 2267 if (nack->consumer && minidle) { 2268 mstime_t this_idle = now - nack->delivery_time; 2269 if (this_idle < minidle) continue; 2270 } 2271 /* Remove the entry from the old consumer. 
2272 * Note that nack->consumer is NULL if we created the 2273 * NACK above because of the FORCE option. */ 2274 if (nack->consumer) 2275 raxRemove(nack->consumer->pel,buf,sizeof(buf),NULL); 2276 /* Update the consumer and idle time. */ 2277 nack->consumer = consumer; 2278 nack->delivery_time = deliverytime; 2279 /* Set the delivery attempts counter if given, otherwise 2280 * autoincrement unless JUSTID option provided */ 2281 if (retrycount >= 0) { 2282 nack->delivery_count = retrycount; 2283 } else if (!justid) { 2284 nack->delivery_count++; 2285 } 2286 /* Add the entry in the new consumer local PEL. */ 2287 raxInsert(consumer->pel,buf,sizeof(buf),nack,NULL); 2288 /* Send the reply for this entry. */ 2289 if (justid) { 2290 addReplyStreamID(c,&id); 2291 } else { 2292 size_t emitted = streamReplyWithRange(c,o->ptr,&id,&id,1,0, 2293 NULL,NULL,STREAM_RWR_RAWENTRIES,NULL); 2294 if (!emitted) addReply(c,shared.nullbulk); 2295 } 2296 arraylen++; 2297 2298 /* Propagate this change. */ 2299 streamPropagateXCLAIM(c,c->argv[1],group,c->argv[2],c->argv[j],nack); 2300 propagate_last_id = 0; /* Will be propagated by XCLAIM itself. */ 2301 server.dirty++; 2302 } 2303 } 2304 if (propagate_last_id) { 2305 streamPropagateGroupID(c,c->argv[1],group,c->argv[2]); 2306 server.dirty++; 2307 } 2308 setDeferredMultiBulkLength(c,arraylenptr,arraylen); 2309 preventCommandPropagation(c); 2310 } 2311 2312 2313 /* XDEL <key> [<ID1> <ID2> ... <IDN>] 2314 * 2315 * Removes the specified entries from the stream. Returns the number 2316 * of items actually deleted, that may be different from the number 2317 * of IDs passed in case certain IDs do not exist. */ 2318 void xdelCommand(client *c) { 2319 robj *o; 2320 2321 if ((o = lookupKeyWriteOrReply(c,c->argv[1],shared.czero)) == NULL 2322 || checkType(c,o,OBJ_STREAM)) return; 2323 stream *s = o->ptr; 2324 2325 /* We need to sanity check the IDs passed to start. 
Even if not 2326 * a big issue, it is not great that the command is only partially 2327 * executed because at some point an invalid ID is parsed. */ 2328 streamID id; 2329 for (int j = 2; j < c->argc; j++) { 2330 if (streamParseStrictIDOrReply(c,c->argv[j],&id,0) != C_OK) return; 2331 } 2332 2333 /* Actually apply the command. */ 2334 int deleted = 0; 2335 for (int j = 2; j < c->argc; j++) { 2336 streamParseStrictIDOrReply(c,c->argv[j],&id,0); /* Retval already checked. */ 2337 deleted += streamDeleteItem(s,&id); 2338 } 2339 2340 /* Propagate the write if needed. */ 2341 if (deleted) { 2342 signalModifiedKey(c->db,c->argv[1]); 2343 notifyKeyspaceEvent(NOTIFY_STREAM,"xdel",c->argv[1],c->db->id); 2344 server.dirty += deleted; 2345 } 2346 addReplyLongLong(c,deleted); 2347 } 2348 2349 /* General form: XTRIM <key> [... options ...] 2350 * 2351 * List of options: 2352 * 2353 * MAXLEN [~|=] <count> -- Trim so that the stream will be capped at 2354 * the specified length. Use ~ before the 2355 * count in order to demand approximated trimming 2356 * (like XADD MAXLEN option). 2357 */ 2358 2359 #define TRIM_STRATEGY_NONE 0 2360 #define TRIM_STRATEGY_MAXLEN 1 2361 void xtrimCommand(client *c) { 2362 robj *o; 2363 2364 /* If the key does not exist, we are ok returning zero, that is, the 2365 * number of elements removed from the stream. */ 2366 if ((o = lookupKeyWriteOrReply(c,c->argv[1],shared.czero)) == NULL 2367 || checkType(c,o,OBJ_STREAM)) return; 2368 stream *s = o->ptr; 2369 2370 /* Argument parsing. */ 2371 int trim_strategy = TRIM_STRATEGY_NONE; 2372 long long maxlen = -1; /* If left to -1 no trimming is performed. */ 2373 int approx_maxlen = 0; /* If 1 only delete whole radix tree nodes, so 2374 the maxium length is not applied verbatim. */ 2375 int maxlen_arg_idx = 0; /* Index of the count in MAXLEN, for rewriting. */ 2376 2377 /* Parse options. */ 2378 int i = 2; /* Start of options. 
*/ 2379 for (; i < c->argc; i++) { 2380 int moreargs = (c->argc-1) - i; /* Number of additional arguments. */ 2381 char *opt = c->argv[i]->ptr; 2382 if (!strcasecmp(opt,"maxlen") && moreargs) { 2383 approx_maxlen = 0; 2384 trim_strategy = TRIM_STRATEGY_MAXLEN; 2385 char *next = c->argv[i+1]->ptr; 2386 /* Check for the form MAXLEN ~ <count>. */ 2387 if (moreargs >= 2 && next[0] == '~' && next[1] == '\0') { 2388 approx_maxlen = 1; 2389 i++; 2390 } else if (moreargs >= 2 && next[0] == '=' && next[1] == '\0') { 2391 i++; 2392 } 2393 if (getLongLongFromObjectOrReply(c,c->argv[i+1],&maxlen,NULL) 2394 != C_OK) return; 2395 2396 if (maxlen < 0) { 2397 addReplyError(c,"The MAXLEN argument must be >= 0."); 2398 return; 2399 } 2400 i++; 2401 maxlen_arg_idx = i; 2402 } else { 2403 addReply(c,shared.syntaxerr); 2404 return; 2405 } 2406 } 2407 2408 /* Perform the trimming. */ 2409 int64_t deleted = 0; 2410 if (trim_strategy == TRIM_STRATEGY_MAXLEN) { 2411 deleted = streamTrimByLength(s,maxlen,approx_maxlen); 2412 } else { 2413 addReplyError(c,"XTRIM called without an option to trim the stream"); 2414 return; 2415 } 2416 2417 /* Propagate the write if needed. */ 2418 if (deleted) { 2419 signalModifiedKey(c->db,c->argv[1]); 2420 notifyKeyspaceEvent(NOTIFY_STREAM,"xtrim",c->argv[1],c->db->id); 2421 server.dirty += deleted; 2422 if (approx_maxlen) streamRewriteApproxMaxlen(c,s,maxlen_arg_idx); 2423 } 2424 addReplyLongLong(c,deleted); 2425 } 2426 2427 /* XINFO CONSUMERS <key> <group> 2428 * XINFO GROUPS <key> 2429 * XINFO STREAM <key> 2430 * XINFO HELP. */ 2431 void xinfoCommand(client *c) { 2432 const char *help[] = { 2433 "CONSUMERS <key> <groupname> -- Show consumer groups of group <groupname>.", 2434 "GROUPS <key> -- Show the stream consumer groups.", 2435 "STREAM <key> -- Show information about the stream.", 2436 "HELP -- Print this help.", 2437 NULL 2438 }; 2439 stream *s = NULL; 2440 char *opt; 2441 robj *key; 2442 2443 /* HELP is special. Handle it ASAP. 
*/ 2444 if (!strcasecmp(c->argv[1]->ptr,"HELP")) { 2445 addReplyHelp(c, help); 2446 return; 2447 } else if (c->argc < 3) { 2448 addReplyError(c,"syntax error, try 'XINFO HELP'"); 2449 return; 2450 } 2451 2452 /* With the exception of HELP handled before any other sub commands, all 2453 * the ones are in the form of "<subcommand> <key>". */ 2454 opt = c->argv[1]->ptr; 2455 key = c->argv[2]; 2456 2457 /* Lookup the key now, this is common for all the subcommands but HELP. */ 2458 robj *o = lookupKeyWriteOrReply(c,key,shared.nokeyerr); 2459 if (o == NULL || checkType(c,o,OBJ_STREAM)) return; 2460 s = o->ptr; 2461 2462 /* Dispatch the different subcommands. */ 2463 if (!strcasecmp(opt,"CONSUMERS") && c->argc == 4) { 2464 /* XINFO CONSUMERS <key> <group>. */ 2465 streamCG *cg = streamLookupCG(s,c->argv[3]->ptr); 2466 if (cg == NULL) { 2467 addReplyErrorFormat(c, "-NOGROUP No such consumer group '%s' " 2468 "for key name '%s'", 2469 (char*)c->argv[3]->ptr, (char*)key->ptr); 2470 return; 2471 } 2472 2473 addReplyMultiBulkLen(c,raxSize(cg->consumers)); 2474 raxIterator ri; 2475 raxStart(&ri,cg->consumers); 2476 raxSeek(&ri,"^",NULL,0); 2477 mstime_t now = mstime(); 2478 while(raxNext(&ri)) { 2479 streamConsumer *consumer = ri.data; 2480 mstime_t idle = now - consumer->seen_time; 2481 if (idle < 0) idle = 0; 2482 2483 addReplyMultiBulkLen(c,6); 2484 addReplyBulkCString(c,"name"); 2485 addReplyBulkCBuffer(c,consumer->name,sdslen(consumer->name)); 2486 addReplyBulkCString(c,"pending"); 2487 addReplyLongLong(c,raxSize(consumer->pel)); 2488 addReplyBulkCString(c,"idle"); 2489 addReplyLongLong(c,idle); 2490 } 2491 raxStop(&ri); 2492 } else if (!strcasecmp(opt,"GROUPS") && c->argc == 3) { 2493 /* XINFO GROUPS <key>. 
*/ 2494 if (s->cgroups == NULL) { 2495 addReplyMultiBulkLen(c,0); 2496 return; 2497 } 2498 2499 addReplyMultiBulkLen(c,raxSize(s->cgroups)); 2500 raxIterator ri; 2501 raxStart(&ri,s->cgroups); 2502 raxSeek(&ri,"^",NULL,0); 2503 while(raxNext(&ri)) { 2504 streamCG *cg = ri.data; 2505 addReplyMultiBulkLen(c,8); 2506 addReplyBulkCString(c,"name"); 2507 addReplyBulkCBuffer(c,ri.key,ri.key_len); 2508 addReplyBulkCString(c,"consumers"); 2509 addReplyLongLong(c,raxSize(cg->consumers)); 2510 addReplyBulkCString(c,"pending"); 2511 addReplyLongLong(c,raxSize(cg->pel)); 2512 addReplyBulkCString(c,"last-delivered-id"); 2513 addReplyStreamID(c,&cg->last_id); 2514 } 2515 raxStop(&ri); 2516 } else if (!strcasecmp(opt,"STREAM") && c->argc == 3) { 2517 /* XINFO STREAM <key> (or the alias XINFO <key>). */ 2518 addReplyMultiBulkLen(c,14); 2519 addReplyBulkCString(c,"length"); 2520 addReplyLongLong(c,s->length); 2521 addReplyBulkCString(c,"radix-tree-keys"); 2522 addReplyLongLong(c,raxSize(s->rax)); 2523 addReplyBulkCString(c,"radix-tree-nodes"); 2524 addReplyLongLong(c,s->rax->numnodes); 2525 addReplyBulkCString(c,"groups"); 2526 addReplyLongLong(c,s->cgroups ? raxSize(s->cgroups) : 0); 2527 addReplyBulkCString(c,"last-generated-id"); 2528 addReplyStreamID(c,&s->last_id); 2529 2530 /* To emit the first/last entry we us the streamReplyWithRange() 2531 * API. */ 2532 int count; 2533 streamID start, end; 2534 start.ms = start.seq = 0; 2535 end.ms = end.seq = UINT64_MAX; 2536 addReplyBulkCString(c,"first-entry"); 2537 count = streamReplyWithRange(c,s,&start,&end,1,0,NULL,NULL, 2538 STREAM_RWR_RAWENTRIES,NULL); 2539 if (!count) addReply(c,shared.nullbulk); 2540 addReplyBulkCString(c,"last-entry"); 2541 count = streamReplyWithRange(c,s,&start,&end,1,1,NULL,NULL, 2542 STREAM_RWR_RAWENTRIES,NULL); 2543 if (!count) addReply(c,shared.nullbulk); 2544 } else { 2545 addReplySubcommandSyntaxError(c); 2546 } 2547 } 2548 2549