1 /*
2 * Copyright (c) 2009-2012, Salvatore Sanfilippo <antirez at gmail dot com>
3 * Copyright (c) 2009-2012, Pieter Noordhuis <pcnoordhuis at gmail dot com>
4 * All rights reserved.
5 *
6 * Redistribution and use in source and binary forms, with or without
7 * modification, are permitted provided that the following conditions are met:
8 *
9 * * Redistributions of source code must retain the above copyright notice,
10 * this list of conditions and the following disclaimer.
11 * * Redistributions in binary form must reproduce the above copyright
12 * notice, this list of conditions and the following disclaimer in the
13 * documentation and/or other materials provided with the distribution.
14 * * Neither the name of Redis nor the names of its contributors may be used
15 * to endorse or promote products derived from this software without
16 * specific prior written permission.
17 *
18 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
19 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
20 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
21 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
22 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
23 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
24 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
25 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
26 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
27 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
28 * POSSIBILITY OF SUCH DAMAGE.
29 */
30
31 /*-----------------------------------------------------------------------------
32 * Sorted set API
33 *----------------------------------------------------------------------------*/
34
35 /* ZSETs are ordered sets using two data structures to hold the same elements
36 * in order to get O(log(N)) INSERT and REMOVE operations into a sorted
37 * data structure.
38 *
39 * The elements are added to a hash table mapping Redis objects to scores.
40 * At the same time the elements are added to a skip list mapping scores
41 * to Redis objects (so objects are sorted by scores in this "view").
42 *
43 * Note that the SDS string representing the element is the same in both
44 * the hash table and skiplist in order to save memory. What we do in order
45 * to manage the shared SDS string more easily is to free the SDS string
46 * only in zslFreeNode(). The dictionary has no value free method set.
47 * So we should always remove an element from the dictionary, and later from
48 * the skiplist.
49 *
50 * This skiplist implementation is almost a C translation of the original
51 * algorithm described by William Pugh in "Skip Lists: A Probabilistic
52 * Alternative to Balanced Trees", modified in three ways:
53 * a) this implementation allows for repeated scores.
54 * b) the comparison is not just by key (our 'score') but by satellite data.
55 * c) there is a back pointer, so it's a doubly linked list with the back
56 * pointers being only at "level 1". This allows to traverse the list
57 * from tail to head, useful for ZREVRANGE. */
58
59 #include "server.h"
60 #include <math.h>
61
62 /*-----------------------------------------------------------------------------
63 * Skiplist implementation of the low level API
64 *----------------------------------------------------------------------------*/
65
66 int zslLexValueGteMin(sds value, zlexrangespec *spec);
67 int zslLexValueLteMax(sds value, zlexrangespec *spec);
68
69 /* Create a skiplist node with the specified number of levels.
70 * The SDS string 'ele' is referenced by the node after the call. */
zslCreateNode(int level,double score,sds ele)71 zskiplistNode *zslCreateNode(int level, double score, sds ele) {
72 zskiplistNode *zn =
73 zmalloc(sizeof(*zn)+level*sizeof(struct zskiplistLevel));
74 zn->score = score;
75 zn->ele = ele;
76 return zn;
77 }
78
79 /* Create a new skiplist. */
zslCreate(void)80 zskiplist *zslCreate(void) {
81 int j;
82 zskiplist *zsl;
83
84 zsl = zmalloc(sizeof(*zsl));
85 zsl->level = 1;
86 zsl->length = 0;
87 zsl->header = zslCreateNode(ZSKIPLIST_MAXLEVEL,0,NULL);
88 for (j = 0; j < ZSKIPLIST_MAXLEVEL; j++) {
89 zsl->header->level[j].forward = NULL;
90 zsl->header->level[j].span = 0;
91 }
92 zsl->header->backward = NULL;
93 zsl->tail = NULL;
94 return zsl;
95 }
96
97 /* Free the specified skiplist node. The referenced SDS string representation
98 * of the element is freed too, unless node->ele is set to NULL before calling
99 * this function. */
zslFreeNode(zskiplistNode * node)100 void zslFreeNode(zskiplistNode *node) {
101 sdsfree(node->ele);
102 zfree(node);
103 }
104
105 /* Free a whole skiplist. */
zslFree(zskiplist * zsl)106 void zslFree(zskiplist *zsl) {
107 zskiplistNode *node = zsl->header->level[0].forward, *next;
108
109 zfree(zsl->header);
110 while(node) {
111 next = node->level[0].forward;
112 zslFreeNode(node);
113 node = next;
114 }
115 zfree(zsl);
116 }
117
118 /* Returns a random level for the new skiplist node we are going to create.
119 * The return value of this function is between 1 and ZSKIPLIST_MAXLEVEL
120 * (both inclusive), with a powerlaw-alike distribution where higher
121 * levels are less likely to be returned. */
zslRandomLevel(void)122 int zslRandomLevel(void) {
123 int level = 1;
124 while ((random()&0xFFFF) < (ZSKIPLIST_P * 0xFFFF))
125 level += 1;
126 return (level<ZSKIPLIST_MAXLEVEL) ? level : ZSKIPLIST_MAXLEVEL;
127 }
128
129 /* Insert a new node in the skiplist. Assumes the element does not already
130 * exist (up to the caller to enforce that). The skiplist takes ownership
131 * of the passed SDS string 'ele'. */
zslInsert(zskiplist * zsl,double score,sds ele)132 zskiplistNode *zslInsert(zskiplist *zsl, double score, sds ele) {
133 zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
134 unsigned int rank[ZSKIPLIST_MAXLEVEL];
135 int i, level;
136
137 serverAssert(!isnan(score));
138 x = zsl->header;
139 for (i = zsl->level-1; i >= 0; i--) {
140 /* store rank that is crossed to reach the insert position */
141 rank[i] = i == (zsl->level-1) ? 0 : rank[i+1];
142 while (x->level[i].forward &&
143 (x->level[i].forward->score < score ||
144 (x->level[i].forward->score == score &&
145 sdscmp(x->level[i].forward->ele,ele) < 0)))
146 {
147 rank[i] += x->level[i].span;
148 x = x->level[i].forward;
149 }
150 update[i] = x;
151 }
152 /* we assume the element is not already inside, since we allow duplicated
153 * scores, reinserting the same element should never happen since the
154 * caller of zslInsert() should test in the hash table if the element is
155 * already inside or not. */
156 level = zslRandomLevel();
157 if (level > zsl->level) {
158 for (i = zsl->level; i < level; i++) {
159 rank[i] = 0;
160 update[i] = zsl->header;
161 update[i]->level[i].span = zsl->length;
162 }
163 zsl->level = level;
164 }
165 x = zslCreateNode(level,score,ele);
166 for (i = 0; i < level; i++) {
167 x->level[i].forward = update[i]->level[i].forward;
168 update[i]->level[i].forward = x;
169
170 /* update span covered by update[i] as x is inserted here */
171 x->level[i].span = update[i]->level[i].span - (rank[0] - rank[i]);
172 update[i]->level[i].span = (rank[0] - rank[i]) + 1;
173 }
174
175 /* increment span for untouched levels */
176 for (i = level; i < zsl->level; i++) {
177 update[i]->level[i].span++;
178 }
179
180 x->backward = (update[0] == zsl->header) ? NULL : update[0];
181 if (x->level[0].forward)
182 x->level[0].forward->backward = x;
183 else
184 zsl->tail = x;
185 zsl->length++;
186 return x;
187 }
188
189 /* Internal function used by zslDelete, zslDeleteByScore and zslDeleteByRank */
zslDeleteNode(zskiplist * zsl,zskiplistNode * x,zskiplistNode ** update)190 void zslDeleteNode(zskiplist *zsl, zskiplistNode *x, zskiplistNode **update) {
191 int i;
192 for (i = 0; i < zsl->level; i++) {
193 if (update[i]->level[i].forward == x) {
194 update[i]->level[i].span += x->level[i].span - 1;
195 update[i]->level[i].forward = x->level[i].forward;
196 } else {
197 update[i]->level[i].span -= 1;
198 }
199 }
200 if (x->level[0].forward) {
201 x->level[0].forward->backward = x->backward;
202 } else {
203 zsl->tail = x->backward;
204 }
205 while(zsl->level > 1 && zsl->header->level[zsl->level-1].forward == NULL)
206 zsl->level--;
207 zsl->length--;
208 }
209
210 /* Delete an element with matching score/element from the skiplist.
211 * The function returns 1 if the node was found and deleted, otherwise
212 * 0 is returned.
213 *
214 * If 'node' is NULL the deleted node is freed by zslFreeNode(), otherwise
215 * it is not freed (but just unlinked) and *node is set to the node pointer,
216 * so that it is possible for the caller to reuse the node (including the
217 * referenced SDS string at node->ele). */
zslDelete(zskiplist * zsl,double score,sds ele,zskiplistNode ** node)218 int zslDelete(zskiplist *zsl, double score, sds ele, zskiplistNode **node) {
219 zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
220 int i;
221
222 x = zsl->header;
223 for (i = zsl->level-1; i >= 0; i--) {
224 while (x->level[i].forward &&
225 (x->level[i].forward->score < score ||
226 (x->level[i].forward->score == score &&
227 sdscmp(x->level[i].forward->ele,ele) < 0)))
228 {
229 x = x->level[i].forward;
230 }
231 update[i] = x;
232 }
233 /* We may have multiple elements with the same score, what we need
234 * is to find the element with both the right score and object. */
235 x = x->level[0].forward;
236 if (x && score == x->score && sdscmp(x->ele,ele) == 0) {
237 zslDeleteNode(zsl, x, update);
238 if (!node)
239 zslFreeNode(x);
240 else
241 *node = x;
242 return 1;
243 }
244 return 0; /* not found */
245 }
246
247 /* Update the score of an elmenent inside the sorted set skiplist.
248 * Note that the element must exist and must match 'score'.
249 * This function does not update the score in the hash table side, the
250 * caller should take care of it.
251 *
252 * Note that this function attempts to just update the node, in case after
253 * the score update, the node would be exactly at the same position.
254 * Otherwise the skiplist is modified by removing and re-adding a new
255 * element, which is more costly.
256 *
257 * The function returns the updated element skiplist node pointer. */
zslUpdateScore(zskiplist * zsl,double curscore,sds ele,double newscore)258 zskiplistNode *zslUpdateScore(zskiplist *zsl, double curscore, sds ele, double newscore) {
259 zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
260 int i;
261
262 /* We need to seek to element to update to start: this is useful anyway,
263 * we'll have to update or remove it. */
264 x = zsl->header;
265 for (i = zsl->level-1; i >= 0; i--) {
266 while (x->level[i].forward &&
267 (x->level[i].forward->score < curscore ||
268 (x->level[i].forward->score == curscore &&
269 sdscmp(x->level[i].forward->ele,ele) < 0)))
270 {
271 x = x->level[i].forward;
272 }
273 update[i] = x;
274 }
275
276 /* Jump to our element: note that this function assumes that the
277 * element with the matching score exists. */
278 x = x->level[0].forward;
279 serverAssert(x && curscore == x->score && sdscmp(x->ele,ele) == 0);
280
281 /* If the node, after the score update, would be still exactly
282 * at the same position, we can just update the score without
283 * actually removing and re-inserting the element in the skiplist. */
284 if ((x->backward == NULL || x->backward->score < newscore) &&
285 (x->level[0].forward == NULL || x->level[0].forward->score > newscore))
286 {
287 x->score = newscore;
288 return x;
289 }
290
291 /* No way to reuse the old node: we need to remove and insert a new
292 * one at a different place. */
293 zslDeleteNode(zsl, x, update);
294 zskiplistNode *newnode = zslInsert(zsl,newscore,x->ele);
295 /* We reused the old node x->ele SDS string, free the node now
296 * since zslInsert created a new one. */
297 x->ele = NULL;
298 zslFreeNode(x);
299 return newnode;
300 }
301
/* Return non-zero if 'value' satisfies the lower bound of 'spec': strictly
 * greater than spec->min when spec->minex is set, greater or equal
 * otherwise. */
int zslValueGteMin(double value, zrangespec *spec) {
    if (spec->minex) return value > spec->min;
    return value >= spec->min;
}
305
/* Return non-zero if 'value' satisfies the upper bound of 'spec': strictly
 * less than spec->max when spec->maxex is set, less or equal otherwise. */
int zslValueLteMax(double value, zrangespec *spec) {
    if (spec->maxex) return value < spec->max;
    return value <= spec->max;
}
309
310 /* Returns if there is a part of the zset is in range. */
zslIsInRange(zskiplist * zsl,zrangespec * range)311 int zslIsInRange(zskiplist *zsl, zrangespec *range) {
312 zskiplistNode *x;
313
314 /* Test for ranges that will always be empty. */
315 if (range->min > range->max ||
316 (range->min == range->max && (range->minex || range->maxex)))
317 return 0;
318 x = zsl->tail;
319 if (x == NULL || !zslValueGteMin(x->score,range))
320 return 0;
321 x = zsl->header->level[0].forward;
322 if (x == NULL || !zslValueLteMax(x->score,range))
323 return 0;
324 return 1;
325 }
326
327 /* Find the first node that is contained in the specified range.
328 * Returns NULL when no element is contained in the range. */
zslFirstInRange(zskiplist * zsl,zrangespec * range)329 zskiplistNode *zslFirstInRange(zskiplist *zsl, zrangespec *range) {
330 zskiplistNode *x;
331 int i;
332
333 /* If everything is out of range, return early. */
334 if (!zslIsInRange(zsl,range)) return NULL;
335
336 x = zsl->header;
337 for (i = zsl->level-1; i >= 0; i--) {
338 /* Go forward while *OUT* of range. */
339 while (x->level[i].forward &&
340 !zslValueGteMin(x->level[i].forward->score,range))
341 x = x->level[i].forward;
342 }
343
344 /* This is an inner range, so the next node cannot be NULL. */
345 x = x->level[0].forward;
346 serverAssert(x != NULL);
347
348 /* Check if score <= max. */
349 if (!zslValueLteMax(x->score,range)) return NULL;
350 return x;
351 }
352
353 /* Find the last node that is contained in the specified range.
354 * Returns NULL when no element is contained in the range. */
zslLastInRange(zskiplist * zsl,zrangespec * range)355 zskiplistNode *zslLastInRange(zskiplist *zsl, zrangespec *range) {
356 zskiplistNode *x;
357 int i;
358
359 /* If everything is out of range, return early. */
360 if (!zslIsInRange(zsl,range)) return NULL;
361
362 x = zsl->header;
363 for (i = zsl->level-1; i >= 0; i--) {
364 /* Go forward while *IN* range. */
365 while (x->level[i].forward &&
366 zslValueLteMax(x->level[i].forward->score,range))
367 x = x->level[i].forward;
368 }
369
370 /* This is an inner range, so this node cannot be NULL. */
371 serverAssert(x != NULL);
372
373 /* Check if score >= min. */
374 if (!zslValueGteMin(x->score,range)) return NULL;
375 return x;
376 }
377
378 /* Delete all the elements with score between min and max from the skiplist.
379 * Min and max are inclusive, so a score >= min || score <= max is deleted.
380 * Note that this function takes the reference to the hash table view of the
381 * sorted set, in order to remove the elements from the hash table too. */
zslDeleteRangeByScore(zskiplist * zsl,zrangespec * range,dict * dict)382 unsigned long zslDeleteRangeByScore(zskiplist *zsl, zrangespec *range, dict *dict) {
383 zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
384 unsigned long removed = 0;
385 int i;
386
387 x = zsl->header;
388 for (i = zsl->level-1; i >= 0; i--) {
389 while (x->level[i].forward && (range->minex ?
390 x->level[i].forward->score <= range->min :
391 x->level[i].forward->score < range->min))
392 x = x->level[i].forward;
393 update[i] = x;
394 }
395
396 /* Current node is the last with score < or <= min. */
397 x = x->level[0].forward;
398
399 /* Delete nodes while in range. */
400 while (x &&
401 (range->maxex ? x->score < range->max : x->score <= range->max))
402 {
403 zskiplistNode *next = x->level[0].forward;
404 zslDeleteNode(zsl,x,update);
405 dictDelete(dict,x->ele);
406 zslFreeNode(x); /* Here is where x->ele is actually released. */
407 removed++;
408 x = next;
409 }
410 return removed;
411 }
412
/* Delete from the skiplist every element whose string is inside the
 * lexicographic 'range', removing it from 'dict' (the hash table view of
 * the sorted set) as well.
 *
 * Returns the number of removed elements. */
unsigned long zslDeleteRangeByLex(zskiplist *zsl, zlexrangespec *range, dict *dict) {
    zskiplistNode *update[ZSKIPLIST_MAXLEVEL];
    zskiplistNode *cur = zsl->header, *victim, *after;
    unsigned long removed = 0;
    int lvl;

    for (lvl = zsl->level-1; lvl >= 0; lvl--) {
        for (;;) {
            zskiplistNode *nxt = cur->level[lvl].forward;

            if (nxt == NULL) break;
            if (zslLexValueGteMin(nxt->ele,range)) break;
            cur = nxt;
        }
        update[lvl] = cur;
    }

    /* 'cur' is the last node whose element is below the lower bound: start
     * from its successor and delete while the upper bound is respected. */
    for (victim = cur->level[0].forward;
         victim != NULL && zslLexValueLteMax(victim->ele,range);
         victim = after)
    {
        after = victim->level[0].forward;
        zslDeleteNode(zsl,victim,update);
        dictDelete(dict,victim->ele);
        zslFreeNode(victim); /* Here is where victim->ele is actually released. */
        removed++;
    }
    return removed;
}
441
442 /* Delete all the elements with rank between start and end from the skiplist.
443 * Start and end are inclusive. Note that start and end need to be 1-based */
zslDeleteRangeByRank(zskiplist * zsl,unsigned int start,unsigned int end,dict * dict)444 unsigned long zslDeleteRangeByRank(zskiplist *zsl, unsigned int start, unsigned int end, dict *dict) {
445 zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
446 unsigned long traversed = 0, removed = 0;
447 int i;
448
449 x = zsl->header;
450 for (i = zsl->level-1; i >= 0; i--) {
451 while (x->level[i].forward && (traversed + x->level[i].span) < start) {
452 traversed += x->level[i].span;
453 x = x->level[i].forward;
454 }
455 update[i] = x;
456 }
457
458 traversed++;
459 x = x->level[0].forward;
460 while (x && traversed <= end) {
461 zskiplistNode *next = x->level[0].forward;
462 zslDeleteNode(zsl,x,update);
463 dictDelete(dict,x->ele);
464 zslFreeNode(x);
465 removed++;
466 traversed++;
467 x = next;
468 }
469 return removed;
470 }
471
472 /* Find the rank for an element by both score and key.
473 * Returns 0 when the element cannot be found, rank otherwise.
474 * Note that the rank is 1-based due to the span of zsl->header to the
475 * first element. */
zslGetRank(zskiplist * zsl,double score,sds ele)476 unsigned long zslGetRank(zskiplist *zsl, double score, sds ele) {
477 zskiplistNode *x;
478 unsigned long rank = 0;
479 int i;
480
481 x = zsl->header;
482 for (i = zsl->level-1; i >= 0; i--) {
483 while (x->level[i].forward &&
484 (x->level[i].forward->score < score ||
485 (x->level[i].forward->score == score &&
486 sdscmp(x->level[i].forward->ele,ele) <= 0))) {
487 rank += x->level[i].span;
488 x = x->level[i].forward;
489 }
490
491 /* x might be equal to zsl->header, so test if obj is non-NULL */
492 if (x->ele && sdscmp(x->ele,ele) == 0) {
493 return rank;
494 }
495 }
496 return 0;
497 }
498
499 /* Finds an element by its rank. The rank argument needs to be 1-based. */
zslGetElementByRank(zskiplist * zsl,unsigned long rank)500 zskiplistNode* zslGetElementByRank(zskiplist *zsl, unsigned long rank) {
501 zskiplistNode *x;
502 unsigned long traversed = 0;
503 int i;
504
505 x = zsl->header;
506 for (i = zsl->level-1; i >= 0; i--) {
507 while (x->level[i].forward && (traversed + x->level[i].span) <= rank)
508 {
509 traversed += x->level[i].span;
510 x = x->level[i].forward;
511 }
512 if (traversed == rank) {
513 return x;
514 }
515 }
516 return NULL;
517 }
518
519 /* Populate the rangespec according to the objects min and max. */
zslParseRange(robj * min,robj * max,zrangespec * spec)520 static int zslParseRange(robj *min, robj *max, zrangespec *spec) {
521 char *eptr;
522 spec->minex = spec->maxex = 0;
523
524 /* Parse the min-max interval. If one of the values is prefixed
525 * by the "(" character, it's considered "open". For instance
526 * ZRANGEBYSCORE zset (1.5 (2.5 will match min < x < max
527 * ZRANGEBYSCORE zset 1.5 2.5 will instead match min <= x <= max */
528 if (min->encoding == OBJ_ENCODING_INT) {
529 spec->min = (long)min->ptr;
530 } else {
531 if (((char*)min->ptr)[0] == '(') {
532 spec->min = strtod((char*)min->ptr+1,&eptr);
533 if (eptr[0] != '\0' || isnan(spec->min)) return C_ERR;
534 spec->minex = 1;
535 } else {
536 spec->min = strtod((char*)min->ptr,&eptr);
537 if (eptr[0] != '\0' || isnan(spec->min)) return C_ERR;
538 }
539 }
540 if (max->encoding == OBJ_ENCODING_INT) {
541 spec->max = (long)max->ptr;
542 } else {
543 if (((char*)max->ptr)[0] == '(') {
544 spec->max = strtod((char*)max->ptr+1,&eptr);
545 if (eptr[0] != '\0' || isnan(spec->max)) return C_ERR;
546 spec->maxex = 1;
547 } else {
548 spec->max = strtod((char*)max->ptr,&eptr);
549 if (eptr[0] != '\0' || isnan(spec->max)) return C_ERR;
550 }
551 }
552
553 return C_OK;
554 }
555
556 /* ------------------------ Lexicographic ranges ---------------------------- */
557
558 /* Parse max or min argument of ZRANGEBYLEX.
559 * (foo means foo (open interval)
560 * [foo means foo (closed interval)
561 * - means the min string possible
562 * + means the max string possible
563 *
564 * If the string is valid the *dest pointer is set to the redis object
565 * that will be used for the comparison, and ex will be set to 0 or 1
566 * respectively if the item is exclusive or inclusive. C_OK will be
567 * returned.
568 *
569 * If the string is not a valid range C_ERR is returned, and the value
570 * of *dest and *ex is undefined. */
zslParseLexRangeItem(robj * item,sds * dest,int * ex)571 int zslParseLexRangeItem(robj *item, sds *dest, int *ex) {
572 char *c = item->ptr;
573
574 switch(c[0]) {
575 case '+':
576 if (c[1] != '\0') return C_ERR;
577 *ex = 1;
578 *dest = shared.maxstring;
579 return C_OK;
580 case '-':
581 if (c[1] != '\0') return C_ERR;
582 *ex = 1;
583 *dest = shared.minstring;
584 return C_OK;
585 case '(':
586 *ex = 1;
587 *dest = sdsnewlen(c+1,sdslen(c)-1);
588 return C_OK;
589 case '[':
590 *ex = 0;
591 *dest = sdsnewlen(c+1,sdslen(c)-1);
592 return C_OK;
593 default:
594 return C_ERR;
595 }
596 }
597
598 /* Free a lex range structure, must be called only after zelParseLexRange()
599 * populated the structure with success (C_OK returned). */
zslFreeLexRange(zlexrangespec * spec)600 void zslFreeLexRange(zlexrangespec *spec) {
601 if (spec->min != shared.minstring &&
602 spec->min != shared.maxstring) sdsfree(spec->min);
603 if (spec->max != shared.minstring &&
604 spec->max != shared.maxstring) sdsfree(spec->max);
605 }
606
607 /* Populate the lex rangespec according to the objects min and max.
608 *
609 * Return C_OK on success. On error C_ERR is returned.
610 * When OK is returned the structure must be freed with zslFreeLexRange(),
611 * otherwise no release is needed. */
zslParseLexRange(robj * min,robj * max,zlexrangespec * spec)612 int zslParseLexRange(robj *min, robj *max, zlexrangespec *spec) {
613 /* The range can't be valid if objects are integer encoded.
614 * Every item must start with ( or [. */
615 if (min->encoding == OBJ_ENCODING_INT ||
616 max->encoding == OBJ_ENCODING_INT) return C_ERR;
617
618 spec->min = spec->max = NULL;
619 if (zslParseLexRangeItem(min, &spec->min, &spec->minex) == C_ERR ||
620 zslParseLexRangeItem(max, &spec->max, &spec->maxex) == C_ERR) {
621 zslFreeLexRange(spec);
622 return C_ERR;
623 } else {
624 return C_OK;
625 }
626 }
627
628 /* This is just a wrapper to sdscmp() that is able to
629 * handle shared.minstring and shared.maxstring as the equivalent of
630 * -inf and +inf for strings */
sdscmplex(sds a,sds b)631 int sdscmplex(sds a, sds b) {
632 if (a == b) return 0;
633 if (a == shared.minstring || b == shared.maxstring) return -1;
634 if (a == shared.maxstring || b == shared.minstring) return 1;
635 return sdscmp(a,b);
636 }
637
/* Return non-zero if 'value' satisfies the lower bound of the lex range
 * 'spec': strictly greater than spec->min when spec->minex is set, greater
 * or equal otherwise. */
int zslLexValueGteMin(sds value, zlexrangespec *spec) {
    int cmp = sdscmplex(value,spec->min);

    if (spec->minex) return cmp > 0;
    return cmp >= 0;
}
643
/* Return non-zero if 'value' satisfies the upper bound of the lex range
 * 'spec': strictly less than spec->max when spec->maxex is set, less or
 * equal otherwise. */
int zslLexValueLteMax(sds value, zlexrangespec *spec) {
    int cmp = sdscmplex(value,spec->max);

    if (spec->maxex) return cmp < 0;
    return cmp <= 0;
}
649
650 /* Returns if there is a part of the zset is in the lex range. */
zslIsInLexRange(zskiplist * zsl,zlexrangespec * range)651 int zslIsInLexRange(zskiplist *zsl, zlexrangespec *range) {
652 zskiplistNode *x;
653
654 /* Test for ranges that will always be empty. */
655 int cmp = sdscmplex(range->min,range->max);
656 if (cmp > 0 || (cmp == 0 && (range->minex || range->maxex)))
657 return 0;
658 x = zsl->tail;
659 if (x == NULL || !zslLexValueGteMin(x->ele,range))
660 return 0;
661 x = zsl->header->level[0].forward;
662 if (x == NULL || !zslLexValueLteMax(x->ele,range))
663 return 0;
664 return 1;
665 }
666
667 /* Find the first node that is contained in the specified lex range.
668 * Returns NULL when no element is contained in the range. */
zslFirstInLexRange(zskiplist * zsl,zlexrangespec * range)669 zskiplistNode *zslFirstInLexRange(zskiplist *zsl, zlexrangespec *range) {
670 zskiplistNode *x;
671 int i;
672
673 /* If everything is out of range, return early. */
674 if (!zslIsInLexRange(zsl,range)) return NULL;
675
676 x = zsl->header;
677 for (i = zsl->level-1; i >= 0; i--) {
678 /* Go forward while *OUT* of range. */
679 while (x->level[i].forward &&
680 !zslLexValueGteMin(x->level[i].forward->ele,range))
681 x = x->level[i].forward;
682 }
683
684 /* This is an inner range, so the next node cannot be NULL. */
685 x = x->level[0].forward;
686 serverAssert(x != NULL);
687
688 /* Check if score <= max. */
689 if (!zslLexValueLteMax(x->ele,range)) return NULL;
690 return x;
691 }
692
693 /* Find the last node that is contained in the specified range.
694 * Returns NULL when no element is contained in the range. */
zslLastInLexRange(zskiplist * zsl,zlexrangespec * range)695 zskiplistNode *zslLastInLexRange(zskiplist *zsl, zlexrangespec *range) {
696 zskiplistNode *x;
697 int i;
698
699 /* If everything is out of range, return early. */
700 if (!zslIsInLexRange(zsl,range)) return NULL;
701
702 x = zsl->header;
703 for (i = zsl->level-1; i >= 0; i--) {
704 /* Go forward while *IN* range. */
705 while (x->level[i].forward &&
706 zslLexValueLteMax(x->level[i].forward->ele,range))
707 x = x->level[i].forward;
708 }
709
710 /* This is an inner range, so this node cannot be NULL. */
711 serverAssert(x != NULL);
712
713 /* Check if score >= min. */
714 if (!zslLexValueGteMin(x->ele,range)) return NULL;
715 return x;
716 }
717
718 /*-----------------------------------------------------------------------------
719 * Ziplist-backed sorted set API
720 *----------------------------------------------------------------------------*/
721
/* Return the score stored in the ziplist entry pointed to by 'sptr'.
 *
 * 'sptr' must not be NULL and must reference a readable entry (both are
 * asserted). When ziplistGet() reports a string, the string is parsed with
 * strtod(); otherwise the integer it reports is converted to double. */
double zzlGetScore(unsigned char *sptr) {
    unsigned char *vstr;
    unsigned int vlen;
    long long vlong;
    char buf[128];
    double score;

    serverAssert(sptr != NULL);
    serverAssert(ziplistGet(sptr,&vstr,&vlen,&vlong));

    if (vstr) {
        /* The entry is not null terminated, so it is copied into a local
         * buffer first. The length is clamped so that an unexpectedly long
         * (e.g. corrupted) entry cannot overflow the buffer. */
        if (vlen > sizeof(buf)-1) vlen = sizeof(buf)-1;
        memcpy(buf,vstr,vlen);
        buf[vlen] = '\0';
        score = strtod(buf,NULL);
    } else {
        score = vlong;
    }

    return score;
}
742
743 /* Return a ziplist element as an SDS string. */
ziplistGetObject(unsigned char * sptr)744 sds ziplistGetObject(unsigned char *sptr) {
745 unsigned char *vstr;
746 unsigned int vlen;
747 long long vlong;
748
749 serverAssert(sptr != NULL);
750 serverAssert(ziplistGet(sptr,&vstr,&vlen,&vlong));
751
752 if (vstr) {
753 return sdsnewlen((char*)vstr,vlen);
754 } else {
755 return sdsfromlonglong(vlong);
756 }
757 }
758
/* Compare the ziplist entry pointed to by 'eptr' with the 'clen' bytes at
 * 'cstr'.
 *
 * Integer entries are compared through their string representation. The
 * common prefix is compared with memcmp(); when it is identical the shorter
 * string sorts first.
 *
 * Returns a negative value, zero or a positive value when the entry is
 * respectively less than, equal to or greater than the given string. Only
 * the sign of the result is meaningful. */
int zzlCompareElements(unsigned char *eptr, unsigned char *cstr, unsigned int clen) {
    unsigned char *vstr;
    unsigned int vlen;
    long long vlong;
    unsigned char vbuf[32];
    size_t minlen;
    int cmp;

    serverAssert(ziplistGet(eptr,&vstr,&vlen,&vlong));
    if (vstr == NULL) {
        /* Store string representation of long long in buf. */
        vlen = ll2string((char*)vbuf,sizeof(vbuf),vlong);
        vstr = vbuf;
    }

    minlen = (vlen < clen) ? vlen : clen;
    cmp = memcmp(vstr,cstr,minlen);
    if (cmp != 0) return cmp;

    /* Same prefix: order by length. The lengths are compared instead of
     * subtracted, since an unsigned difference converted to int has the
     * wrong sign once the lengths differ by more than INT_MAX. */
    if (vlen == clen) return 0;
    return (vlen < clen) ? -1 : 1;
}
779
/* Return the number of elements of a ziplist-backed sorted set: half the
 * number of ziplist entries, since every element is stored next to its
 * score. */
unsigned int zzlLength(unsigned char *zl) {
    return ziplistLen(zl) / 2;
}
783
/* Advance *eptr and *sptr (element and score pointers) to the next
 * element/score pair. Both are set to NULL when there is no next entry.
 * On entry neither of them may be NULL (asserted). */
void zzlNext(unsigned char *zl, unsigned char **eptr, unsigned char **sptr) {
    unsigned char *next_ele, *next_score = NULL;

    serverAssert(*eptr != NULL && *sptr != NULL);

    /* The next element follows the current score. */
    next_ele = ziplistNext(zl,*sptr);
    if (next_ele != NULL) {
        /* An element is always followed by its score. */
        next_score = ziplistNext(zl,next_ele);
        serverAssert(next_score != NULL);
    }

    *eptr = next_ele;
    *sptr = next_score;
}
802
/* Move *eptr and *sptr (element and score pointers) back to the previous
 * element/score pair. Both are set to NULL when there is no previous entry.
 * On entry neither of them may be NULL (asserted). */
void zzlPrev(unsigned char *zl, unsigned char **eptr, unsigned char **sptr) {
    unsigned char *prev_ele = NULL, *prev_score;

    serverAssert(*eptr != NULL && *sptr != NULL);

    /* The previous score precedes the current element. */
    prev_score = ziplistPrev(zl,*eptr);
    if (prev_score != NULL) {
        /* A score is always preceded by its element. */
        prev_ele = ziplistPrev(zl,prev_score);
        serverAssert(prev_ele != NULL);
    }

    *eptr = prev_ele;
    *sptr = prev_score;
}
821
822 /* Returns if there is a part of the zset is in range. Should only be used
823 * internally by zzlFirstInRange and zzlLastInRange. */
zzlIsInRange(unsigned char * zl,zrangespec * range)824 int zzlIsInRange(unsigned char *zl, zrangespec *range) {
825 unsigned char *p;
826 double score;
827
828 /* Test for ranges that will always be empty. */
829 if (range->min > range->max ||
830 (range->min == range->max && (range->minex || range->maxex)))
831 return 0;
832
833 p = ziplistIndex(zl,-1); /* Last score. */
834 if (p == NULL) return 0; /* Empty sorted set */
835 score = zzlGetScore(p);
836 if (!zslValueGteMin(score,range))
837 return 0;
838
839 p = ziplistIndex(zl,1); /* First score. */
840 serverAssert(p != NULL);
841 score = zzlGetScore(p);
842 if (!zslValueLteMax(score,range))
843 return 0;
844
845 return 1;
846 }
847
848 /* Find pointer to the first element contained in the specified range.
849 * Returns NULL when no element is contained in the range. */
zzlFirstInRange(unsigned char * zl,zrangespec * range)850 unsigned char *zzlFirstInRange(unsigned char *zl, zrangespec *range) {
851 unsigned char *eptr = ziplistIndex(zl,0), *sptr;
852 double score;
853
854 /* If everything is out of range, return early. */
855 if (!zzlIsInRange(zl,range)) return NULL;
856
857 while (eptr != NULL) {
858 sptr = ziplistNext(zl,eptr);
859 serverAssert(sptr != NULL);
860
861 score = zzlGetScore(sptr);
862 if (zslValueGteMin(score,range)) {
863 /* Check if score <= max. */
864 if (zslValueLteMax(score,range))
865 return eptr;
866 return NULL;
867 }
868
869 /* Move to next element. */
870 eptr = ziplistNext(zl,sptr);
871 }
872
873 return NULL;
874 }
875
876 /* Find pointer to the last element contained in the specified range.
877 * Returns NULL when no element is contained in the range. */
zzlLastInRange(unsigned char * zl,zrangespec * range)878 unsigned char *zzlLastInRange(unsigned char *zl, zrangespec *range) {
879 unsigned char *eptr = ziplistIndex(zl,-2), *sptr;
880 double score;
881
882 /* If everything is out of range, return early. */
883 if (!zzlIsInRange(zl,range)) return NULL;
884
885 while (eptr != NULL) {
886 sptr = ziplistNext(zl,eptr);
887 serverAssert(sptr != NULL);
888
889 score = zzlGetScore(sptr);
890 if (zslValueLteMax(score,range)) {
891 /* Check if score >= min. */
892 if (zslValueGteMin(score,range))
893 return eptr;
894 return NULL;
895 }
896
897 /* Move to previous element by moving to the score of previous element.
898 * When this returns NULL, we know there also is no element. */
899 sptr = ziplistPrev(zl,eptr);
900 if (sptr != NULL)
901 serverAssert((eptr = ziplistPrev(zl,sptr)) != NULL);
902 else
903 eptr = NULL;
904 }
905
906 return NULL;
907 }
908
/* Check the ziplist entry 'p' against the lower bound of the lex range
 * 'spec'. The entry is materialized with ziplistGetObject(), handed to
 * zslLexValueGteMin() and released again before returning that result. */
int zzlLexValueGteMin(unsigned char *p, zlexrangespec *spec) {
    sds member = ziplistGetObject(p);
    int verdict = zslLexValueGteMin(member,spec);

    sdsfree(member);
    return verdict;
}
915
/* Check the ziplist entry 'p' against the upper bound of the lex range
 * 'spec'. The entry is materialized with ziplistGetObject(), handed to
 * zslLexValueLteMax() and released again before returning that result. */
int zzlLexValueLteMax(unsigned char *p, zlexrangespec *spec) {
    sds member = ziplistGetObject(p);
    int verdict = zslLexValueLteMax(member,spec);

    sdsfree(member);
    return verdict;
}
922
923 /* Returns if there is a part of the zset is in range. Should only be used
924 * internally by zzlFirstInRange and zzlLastInRange. */
zzlIsInLexRange(unsigned char * zl,zlexrangespec * range)925 int zzlIsInLexRange(unsigned char *zl, zlexrangespec *range) {
926 unsigned char *p;
927
928 /* Test for ranges that will always be empty. */
929 int cmp = sdscmplex(range->min,range->max);
930 if (cmp > 0 || (cmp == 0 && (range->minex || range->maxex)))
931 return 0;
932
933 p = ziplistIndex(zl,-2); /* Last element. */
934 if (p == NULL) return 0;
935 if (!zzlLexValueGteMin(p,range))
936 return 0;
937
938 p = ziplistIndex(zl,0); /* First element. */
939 serverAssert(p != NULL);
940 if (!zzlLexValueLteMax(p,range))
941 return 0;
942
943 return 1;
944 }
945
946 /* Find pointer to the first element contained in the specified lex range.
947 * Returns NULL when no element is contained in the range. */
zzlFirstInLexRange(unsigned char * zl,zlexrangespec * range)948 unsigned char *zzlFirstInLexRange(unsigned char *zl, zlexrangespec *range) {
949 unsigned char *eptr = ziplistIndex(zl,0), *sptr;
950
951 /* If everything is out of range, return early. */
952 if (!zzlIsInLexRange(zl,range)) return NULL;
953
954 while (eptr != NULL) {
955 if (zzlLexValueGteMin(eptr,range)) {
956 /* Check if score <= max. */
957 if (zzlLexValueLteMax(eptr,range))
958 return eptr;
959 return NULL;
960 }
961
962 /* Move to next element. */
963 sptr = ziplistNext(zl,eptr); /* This element score. Skip it. */
964 serverAssert(sptr != NULL);
965 eptr = ziplistNext(zl,sptr); /* Next element. */
966 }
967
968 return NULL;
969 }
970
971 /* Find pointer to the last element contained in the specified lex range.
972 * Returns NULL when no element is contained in the range. */
zzlLastInLexRange(unsigned char * zl,zlexrangespec * range)973 unsigned char *zzlLastInLexRange(unsigned char *zl, zlexrangespec *range) {
974 unsigned char *eptr = ziplistIndex(zl,-2), *sptr;
975
976 /* If everything is out of range, return early. */
977 if (!zzlIsInLexRange(zl,range)) return NULL;
978
979 while (eptr != NULL) {
980 if (zzlLexValueLteMax(eptr,range)) {
981 /* Check if score >= min. */
982 if (zzlLexValueGteMin(eptr,range))
983 return eptr;
984 return NULL;
985 }
986
987 /* Move to previous element by moving to the score of previous element.
988 * When this returns NULL, we know there also is no element. */
989 sptr = ziplistPrev(zl,eptr);
990 if (sptr != NULL)
991 serverAssert((eptr = ziplistPrev(zl,sptr)) != NULL);
992 else
993 eptr = NULL;
994 }
995
996 return NULL;
997 }
998
/* Linear search for 'ele' in a ziplist-encoded sorted set.
 *
 * Returns a pointer to the matching element entry, or NULL when the element
 * is not present. On a match, the element's score is stored into '*score'
 * unless 'score' is NULL. */
unsigned char *zzlFind(unsigned char *zl, sds ele, double *score) {
    unsigned char *cur, *scoreptr;

    /* Entries come in (element,score) pairs: the loop step jumps from the
     * score of the current pair to the element of the next one. */
    for (cur = ziplistIndex(zl,0); cur != NULL; cur = ziplistNext(zl,scoreptr)) {
        scoreptr = ziplistNext(zl,cur);
        serverAssert(scoreptr != NULL);

        if (!ziplistCompare(cur,(unsigned char*)ele,sdslen(ele))) continue;

        /* Match: hand back the score only if the caller asked for it. */
        if (score != NULL) *score = zzlGetScore(scoreptr);
        return cur;
    }
    return NULL;
}
1017
/* Remove the (element,score) pair starting at 'eptr' from the ziplist and
 * return the (possibly different) ziplist pointer.
 *
 * ziplistDelete() receives the address of the position pointer, so a local
 * copy is used in order to leave the caller's 'eptr' argument untouched. */
unsigned char *zzlDelete(unsigned char *zl, unsigned char *eptr) {
    unsigned char *cursor = eptr;
    int remaining;

    /* Two consecutive deletions at the same cursor: element, then score.
     * TODO: add function to ziplist API to delete N elements from offset. */
    for (remaining = 2; remaining > 0; remaining--)
        zl = ziplistDelete(zl,&cursor);
    return zl;
}
1028
/* Insert the (ele,score) pair into the ziplist and return the (possibly
 * different) ziplist pointer.
 *
 * When 'eptr' is NULL the pair is pushed at the tail of the list, otherwise
 * it is inserted at the position of 'eptr'. The score is stored in its
 * string form as produced by d2string(). */
unsigned char *zzlInsertAt(unsigned char *zl, unsigned char *eptr, sds ele, double score) {
    char scorebuf[128];
    int scorelen = d2string(scorebuf,sizeof(scorebuf),score);

    if (eptr == NULL) {
        /* Tail insertion: element first, score right after it. */
        zl = ziplistPush(zl,(unsigned char*)ele,sdslen(ele),ZIPLIST_TAIL);
        return ziplistPush(zl,(unsigned char*)scorebuf,scorelen,ZIPLIST_TAIL);
    }

    /* Remember the position as an offset: the insertion below may hand back
     * a different ziplist pointer, which would invalidate 'eptr'. */
    size_t offset = eptr-zl;
    zl = ziplistInsert(zl,eptr,(unsigned char*)ele,sdslen(ele));
    eptr = zl+offset;

    /* The score goes right after the element that was just inserted, that
     * is, at the position of the entry following it. */
    unsigned char *sptr;
    serverAssert((sptr = ziplistNext(zl,eptr)) != NULL);
    return ziplistInsert(zl,sptr,(unsigned char*)scorebuf,scorelen);
}
1051
1052 /* Insert (element,score) pair in ziplist. This function assumes the element is
1053 * not yet present in the list. */
zzlInsert(unsigned char * zl,sds ele,double score)1054 unsigned char *zzlInsert(unsigned char *zl, sds ele, double score) {
1055 unsigned char *eptr = ziplistIndex(zl,0), *sptr;
1056 double s;
1057
1058 while (eptr != NULL) {
1059 sptr = ziplistNext(zl,eptr);
1060 serverAssert(sptr != NULL);
1061 s = zzlGetScore(sptr);
1062
1063 if (s > score) {
1064 /* First element with score larger than score for element to be
1065 * inserted. This means we should take its spot in the list to
1066 * maintain ordering. */
1067 zl = zzlInsertAt(zl,eptr,ele,score);
1068 break;
1069 } else if (s == score) {
1070 /* Ensure lexicographical ordering for elements. */
1071 if (zzlCompareElements(eptr,(unsigned char*)ele,sdslen(ele)) > 0) {
1072 zl = zzlInsertAt(zl,eptr,ele,score);
1073 break;
1074 }
1075 }
1076
1077 /* Move to next element. */
1078 eptr = ziplistNext(zl,sptr);
1079 }
1080
1081 /* Push on tail of list when it was not yet inserted. */
1082 if (eptr == NULL)
1083 zl = zzlInsertAt(zl,NULL,ele,score);
1084 return zl;
1085 }
1086
/* Delete every (element,score) pair whose score is inside 'range'.
 *
 * Returns the (possibly different) ziplist pointer. When 'deleted' is not
 * NULL it receives the number of pairs removed (0 if none matched). */
unsigned char *zzlDeleteRangeByScore(unsigned char *zl, zrangespec *range, unsigned long *deleted) {
    unsigned long removed = 0;
    unsigned char *cur, *scoreptr;

    if (deleted != NULL) *deleted = 0;

    cur = zzlFirstInRange(zl,range);
    if (cur == NULL) return zl;

    for (;;) {
        /* Once the tail of the ziplist has been deleted 'cur' points to the
         * sentinel byte and ziplistNext returns NULL. */
        scoreptr = ziplistNext(zl,cur);
        if (scoreptr == NULL) break;

        /* Stop at the first pair that is no longer in range. */
        if (!zslValueLteMax(zzlGetScore(scoreptr),range)) break;

        /* Delete both the element and the score. */
        zl = ziplistDelete(zl,&cur);
        zl = ziplistDelete(zl,&cur);
        removed++;
    }

    if (deleted != NULL) *deleted = removed;
    return zl;
}
1115
/* Delete every (element,score) pair whose element is inside the lex range.
 *
 * Returns the (possibly different) ziplist pointer. When 'deleted' is not
 * NULL it receives the number of pairs removed (0 if none matched). */
unsigned char *zzlDeleteRangeByLex(unsigned char *zl, zlexrangespec *range, unsigned long *deleted) {
    unsigned long removed = 0;
    unsigned char *cur;

    if (deleted != NULL) *deleted = 0;

    cur = zzlFirstInLexRange(zl,range);
    if (cur == NULL) return zl;

    for (;;) {
        /* Once the tail of the ziplist has been deleted 'cur' points to the
         * sentinel byte and ziplistNext returns NULL. */
        if (ziplistNext(zl,cur) == NULL) break;

        /* Stop at the first element that is no longer in range. */
        if (!zzlLexValueLteMax(cur,range)) break;

        /* Delete both the element and the score. */
        zl = ziplistDelete(zl,&cur);
        zl = ziplistDelete(zl,&cur);
        removed++;
    }

    if (deleted != NULL) *deleted = removed;
    return zl;
}
1142
/* Delete all the pairs with rank between 'start' and 'end' from the
 * ziplist. Both bounds are inclusive and 1-based. No validation is done
 * here: the caller is expected to pass a sane range.
 *
 * Returns the (possibly different) ziplist pointer. When 'deleted' is not
 * NULL it receives the size of the requested range. */
unsigned char *zzlDeleteRangeByRank(unsigned char *zl, unsigned int start, unsigned int end, unsigned long *deleted) {
    unsigned int count = end - start + 1;

    if (deleted != NULL) *deleted = count;

    /* Every pair takes two ziplist entries, hence the factor of two on both
     * the starting index and the number of entries to remove. */
    return ziplistDeleteRange(zl,2*(start-1),2*count);
}
1151
1152 /*-----------------------------------------------------------------------------
1153 * Common sorted set API
1154 *----------------------------------------------------------------------------*/
1155
zsetLength(const robj * zobj)1156 unsigned long zsetLength(const robj *zobj) {
1157 unsigned long length = 0;
1158 if (zobj->encoding == OBJ_ENCODING_ZIPLIST) {
1159 length = zzlLength(zobj->ptr);
1160 } else if (zobj->encoding == OBJ_ENCODING_SKIPLIST) {
1161 length = ((const zset*)zobj->ptr)->zsl->length;
1162 } else {
1163 serverPanic("Unknown sorted set encoding");
1164 }
1165 return length;
1166 }
1167
/* Convert the sorted set object 'zobj' in place to the requested 'encoding'.
 *
 * Nothing is done when the object already uses that encoding. The two
 * supported conversions are ziplist -> skiplist and skiplist -> ziplist;
 * any other source or target encoding triggers a panic. On return
 * zobj->ptr and zobj->encoding describe the new representation and the
 * previous one has been released. */
void zsetConvert(robj *zobj, int encoding) {
    zset *zs;
    zskiplistNode *node, *next;
    sds ele;
    double score;

    /* Already in the requested representation: nothing to do. */
    if (zobj->encoding == encoding) return;
    if (zobj->encoding == OBJ_ENCODING_ZIPLIST) {
        unsigned char *zl = zobj->ptr;
        unsigned char *eptr, *sptr;
        unsigned char *vstr;
        unsigned int vlen;
        long long vlong;

        if (encoding != OBJ_ENCODING_SKIPLIST)
            serverPanic("Unknown target encoding");

        /* Build the dict + skiplist pair that backs the new encoding. */
        zs = zmalloc(sizeof(*zs));
        zs->dict = dictCreate(&zsetDictType,NULL);
        zs->zsl = zslCreate();

        /* The ziplist is required to hold at least one (element,score)
         * pair: both assertions fail on an empty list. */
        eptr = ziplistIndex(zl,0);
        serverAssertWithInfo(NULL,zobj,eptr != NULL);
        sptr = ziplistNext(zl,eptr);
        serverAssertWithInfo(NULL,zobj,sptr != NULL);

        while (eptr != NULL) {
            score = zzlGetScore(sptr);
            serverAssertWithInfo(NULL,zobj,ziplistGet(eptr,&vstr,&vlen,&vlong));
            /* ziplistGet left vstr NULL: the value is taken from vlong and
             * turned into a string. Otherwise copy the vlen bytes at vstr. */
            if (vstr == NULL)
                ele = sdsfromlonglong(vlong);
            else
                ele = sdsnewlen((char*)vstr,vlen);

            /* The same 'ele' string is referenced by both the skiplist node
             * and the dict; the dict value points at the node's score. */
            node = zslInsert(zs->zsl,score,ele);
            serverAssert(dictAdd(zs->dict,ele,&node->score) == DICT_OK);
            zzlNext(zl,&eptr,&sptr);
        }

        /* Release the old ziplist and switch the object over. */
        zfree(zobj->ptr);
        zobj->ptr = zs;
        zobj->encoding = OBJ_ENCODING_SKIPLIST;
    } else if (zobj->encoding == OBJ_ENCODING_SKIPLIST) {
        unsigned char *zl = ziplistNew();

        if (encoding != OBJ_ENCODING_ZIPLIST)
            serverPanic("Unknown target encoding");

        /* Approach similar to zslFree(), since we want to free the skiplist at
         * the same time as creating the ziplist. */
        zs = zobj->ptr;
        dictRelease(zs->dict);
        /* Grab the first node before the header and the skiplist structure
         * themselves are freed. */
        node = zs->zsl->header->level[0].forward;
        zfree(zs->zsl->header);
        zfree(zs->zsl);

        /* Visit the nodes following the level 0 links, appending each pair
         * at the tail of the ziplist (NULL position) and freeing the node
         * once its forward pointer has been saved. */
        while (node) {
            zl = zzlInsertAt(zl,NULL,node->ele,node->score);
            next = node->level[0].forward;
            zslFreeNode(node);
            node = next;
        }

        zfree(zs);
        zobj->ptr = zl;
        zobj->encoding = OBJ_ENCODING_ZIPLIST;
    } else {
        serverPanic("Unknown sorted set encoding");
    }
}
1238
1239 /* Convert the sorted set object into a ziplist if it is not already a ziplist
1240 * and if the number of elements and the maximum element size is within the
1241 * expected ranges. */
zsetConvertToZiplistIfNeeded(robj * zobj,size_t maxelelen)1242 void zsetConvertToZiplistIfNeeded(robj *zobj, size_t maxelelen) {
1243 if (zobj->encoding == OBJ_ENCODING_ZIPLIST) return;
1244 zset *zset = zobj->ptr;
1245
1246 if (zset->zsl->length <= server.zset_max_ziplist_entries &&
1247 maxelelen <= server.zset_max_ziplist_value)
1248 zsetConvert(zobj,OBJ_ENCODING_ZIPLIST);
1249 }
1250
1251 /* Return (by reference) the score of the specified member of the sorted set
1252 * storing it into *score. If the element does not exist C_ERR is returned
1253 * otherwise C_OK is returned and *score is correctly populated.
1254 * If 'zobj' or 'member' is NULL, C_ERR is returned. */
zsetScore(robj * zobj,sds member,double * score)1255 int zsetScore(robj *zobj, sds member, double *score) {
1256 if (!zobj || !member) return C_ERR;
1257
1258 if (zobj->encoding == OBJ_ENCODING_ZIPLIST) {
1259 if (zzlFind(zobj->ptr, member, score) == NULL) return C_ERR;
1260 } else if (zobj->encoding == OBJ_ENCODING_SKIPLIST) {
1261 zset *zs = zobj->ptr;
1262 dictEntry *de = dictFind(zs->dict, member);
1263 if (de == NULL) return C_ERR;
1264 *score = *(double*)dictGetVal(de);
1265 } else {
1266 serverPanic("Unknown sorted set encoding");
1267 }
1268 return C_OK;
1269 }
1270
/* Add a new element or update the score of an existing element in a sorted
 * set, regardless of its encoding.
 *
 * The set of flags changes the command behavior. They are passed with an
 * integer pointer since the function will clear the flags and populate them
 * with other flags to indicate different conditions.
 *
 * The input flags are the following:
 *
 * ZADD_INCR: Increment the current element score by 'score' instead of updating
 *            the current element score. If the element does not exist, we
 *            assume 0 as previous score.
 * ZADD_NX:   Perform the operation only if the element does not exist.
 * ZADD_XX:   Perform the operation only if the element already exists.
 *
 * When ZADD_INCR is used, the new score of the element is stored in
 * '*newscore' if 'newscore' is not NULL. The score is also stored there
 * when a new element is added. '*newscore' is left untouched when an
 * existing element is updated without ZADD_INCR, when nothing is done
 * (ZADD_NOP), and when the function fails.
 *
 * The returned flags are the following:
 *
 * ZADD_NAN:     The resulting score is not a number.
 * ZADD_ADDED:   The element was added (not present before the call).
 * ZADD_UPDATED: The element score was updated.
 * ZADD_NOP:     No operation was performed because of NX or XX.
 *
 * Return value:
 *
 * The function returns 1 on success, and sets the appropriate flags
 * ADDED or UPDATED to signal what happened during the operation (note that
 * none could be set if we re-added an element using the same score it used
 * to have, or in the case a zero increment is used).
 *
 * The function returns 0 on error, currently only when the increment
 * produces a NAN condition, or when the 'score' value is NAN since the
 * start.
 *
 * The command as a side effect of adding a new element may convert the sorted
 * set internal encoding from ziplist to hashtable+skiplist.
 *
 * Memory management of 'ele':
 *
 * The function does not take ownership of the 'ele' SDS string, but copies
 * it if needed. */
int zsetAdd(robj *zobj, double score, sds ele, int *flags, double *newscore) {
    /* Turn options into simple to check vars. */
    int incr = (*flags & ZADD_INCR) != 0;
    int nx = (*flags & ZADD_NX) != 0;
    int xx = (*flags & ZADD_XX) != 0;
    *flags = 0; /* We'll return our response flags. */
    double curscore;

    /* NaN as input is an error regardless of all the other parameters. */
    if (isnan(score)) {
        *flags = ZADD_NAN;
        return 0;
    }

    /* Update the sorted set according to its encoding. */
    if (zobj->encoding == OBJ_ENCODING_ZIPLIST) {
        unsigned char *eptr;

        /* On a match zzlFind() also loads the current score in 'curscore'. */
        if ((eptr = zzlFind(zobj->ptr,ele,&curscore)) != NULL) {
            /* NX? Return, same element already exists. */
            if (nx) {
                *flags |= ZADD_NOP;
                return 1;
            }

            /* Prepare the score for the increment if needed. */
            if (incr) {
                score += curscore;
                if (isnan(score)) {
                    *flags |= ZADD_NAN;
                    return 0;
                }
                if (newscore) *newscore = score;
            }

            /* Remove and re-insert when score changed. */
            if (score != curscore) {
                zobj->ptr = zzlDelete(zobj->ptr,eptr);
                zobj->ptr = zzlInsert(zobj->ptr,ele,score);
                *flags |= ZADD_UPDATED;
            }
            return 1;
        } else if (!xx) {
            /* Optimize: check if the element is too large or the list
             * becomes too long *before* executing zzlInsert. */
            zobj->ptr = zzlInsert(zobj->ptr,ele,score);
            /* If both limits are exceeded the second zsetConvert() call is a
             * no-op, since the object is already skiplist encoded. */
            if (zzlLength(zobj->ptr) > server.zset_max_ziplist_entries)
                zsetConvert(zobj,OBJ_ENCODING_SKIPLIST);
            if (sdslen(ele) > server.zset_max_ziplist_value)
                zsetConvert(zobj,OBJ_ENCODING_SKIPLIST);
            if (newscore) *newscore = score;
            *flags |= ZADD_ADDED;
            return 1;
        } else {
            /* XX given but the element is missing: nothing to do. */
            *flags |= ZADD_NOP;
            return 1;
        }
    } else if (zobj->encoding == OBJ_ENCODING_SKIPLIST) {
        zset *zs = zobj->ptr;
        zskiplistNode *znode;
        dictEntry *de;

        de = dictFind(zs->dict,ele);
        if (de != NULL) {
            /* NX? Return, same element already exists. */
            if (nx) {
                *flags |= ZADD_NOP;
                return 1;
            }
            /* The dict value is a pointer to the element's score. */
            curscore = *(double*)dictGetVal(de);

            /* Prepare the score for the increment if needed. */
            if (incr) {
                score += curscore;
                if (isnan(score)) {
                    *flags |= ZADD_NAN;
                    return 0;
                }
                if (newscore) *newscore = score;
            }

            /* Remove and re-insert when score changes. */
            if (score != curscore) {
                znode = zslUpdateScore(zs->zsl,curscore,ele,score);
                /* Note that we did not removed the original element from
                 * the hash table representing the sorted set, so we just
                 * update the score. */
                dictGetVal(de) = &znode->score; /* Update score ptr. */
                *flags |= ZADD_UPDATED;
            }
            return 1;
        } else if (!xx) {
            /* Work on a private copy: the caller keeps ownership of the
             * 'ele' it passed. The copy is referenced by both the skiplist
             * node and the dict. */
            ele = sdsdup(ele);
            znode = zslInsert(zs->zsl,score,ele);
            serverAssert(dictAdd(zs->dict,ele,&znode->score) == DICT_OK);
            *flags |= ZADD_ADDED;
            if (newscore) *newscore = score;
            return 1;
        } else {
            /* XX given but the element is missing: nothing to do. */
            *flags |= ZADD_NOP;
            return 1;
        }
    } else {
        serverPanic("Unknown sorted set encoding");
    }
    return 0; /* Never reached. */
}
1421
1422 /* Delete the element 'ele' from the sorted set, returning 1 if the element
1423 * existed and was deleted, 0 otherwise (the element was not there). */
zsetDel(robj * zobj,sds ele)1424 int zsetDel(robj *zobj, sds ele) {
1425 if (zobj->encoding == OBJ_ENCODING_ZIPLIST) {
1426 unsigned char *eptr;
1427
1428 if ((eptr = zzlFind(zobj->ptr,ele,NULL)) != NULL) {
1429 zobj->ptr = zzlDelete(zobj->ptr,eptr);
1430 return 1;
1431 }
1432 } else if (zobj->encoding == OBJ_ENCODING_SKIPLIST) {
1433 zset *zs = zobj->ptr;
1434 dictEntry *de;
1435 double score;
1436
1437 de = dictUnlink(zs->dict,ele);
1438 if (de != NULL) {
1439 /* Get the score in order to delete from the skiplist later. */
1440 score = *(double*)dictGetVal(de);
1441
1442 /* Delete from the hash table and later from the skiplist.
1443 * Note that the order is important: deleting from the skiplist
1444 * actually releases the SDS string representing the element,
1445 * which is shared between the skiplist and the hash table, so
1446 * we need to delete from the skiplist as the final step. */
1447 dictFreeUnlinkedEntry(zs->dict,de);
1448
1449 /* Delete from skiplist. */
1450 int retval = zslDelete(zs->zsl,score,ele,NULL);
1451 serverAssert(retval);
1452
1453 if (htNeedsResize(zs->dict)) dictResize(zs->dict);
1454 return 1;
1455 }
1456 } else {
1457 serverPanic("Unknown sorted set encoding");
1458 }
1459 return 0; /* No such element found. */
1460 }
1461
1462 /* Given a sorted set object returns the 0-based rank of the object or
1463 * -1 if the object does not exist.
1464 *
1465 * For rank we mean the position of the element in the sorted collection
1466 * of elements. So the first element has rank 0, the second rank 1, and so
1467 * forth up to length-1 elements.
1468 *
1469 * If 'reverse' is false, the rank is returned considering as first element
1470 * the one with the lowest score. Otherwise if 'reverse' is non-zero
1471 * the rank is computed considering as element with rank 0 the one with
1472 * the highest score. */
zsetRank(robj * zobj,sds ele,int reverse)1473 long zsetRank(robj *zobj, sds ele, int reverse) {
1474 unsigned long llen;
1475 unsigned long rank;
1476
1477 llen = zsetLength(zobj);
1478
1479 if (zobj->encoding == OBJ_ENCODING_ZIPLIST) {
1480 unsigned char *zl = zobj->ptr;
1481 unsigned char *eptr, *sptr;
1482
1483 eptr = ziplistIndex(zl,0);
1484 serverAssert(eptr != NULL);
1485 sptr = ziplistNext(zl,eptr);
1486 serverAssert(sptr != NULL);
1487
1488 rank = 1;
1489 while(eptr != NULL) {
1490 if (ziplistCompare(eptr,(unsigned char*)ele,sdslen(ele)))
1491 break;
1492 rank++;
1493 zzlNext(zl,&eptr,&sptr);
1494 }
1495
1496 if (eptr != NULL) {
1497 if (reverse)
1498 return llen-rank;
1499 else
1500 return rank-1;
1501 } else {
1502 return -1;
1503 }
1504 } else if (zobj->encoding == OBJ_ENCODING_SKIPLIST) {
1505 zset *zs = zobj->ptr;
1506 zskiplist *zsl = zs->zsl;
1507 dictEntry *de;
1508 double score;
1509
1510 de = dictFind(zs->dict,ele);
1511 if (de != NULL) {
1512 score = *(double*)dictGetVal(de);
1513 rank = zslGetRank(zsl,score,ele);
1514 /* Existing elements always have a rank. */
1515 serverAssert(rank != 0);
1516 if (reverse)
1517 return llen-rank;
1518 else
1519 return rank-1;
1520 } else {
1521 return -1;
1522 }
1523 } else {
1524 serverPanic("Unknown sorted set encoding");
1525 }
1526 }
1527
1528 /*-----------------------------------------------------------------------------
1529 * Sorted set commands
1530 *----------------------------------------------------------------------------*/
1531
1532 /* This generic command implements both ZADD and ZINCRBY. */
zaddGenericCommand(client * c,int flags)1533 void zaddGenericCommand(client *c, int flags) {
1534 static char *nanerr = "resulting score is not a number (NaN)";
1535 robj *key = c->argv[1];
1536 robj *zobj;
1537 sds ele;
1538 double score = 0, *scores = NULL;
1539 int j, elements;
1540 int scoreidx = 0;
1541 /* The following vars are used in order to track what the command actually
1542 * did during the execution, to reply to the client and to trigger the
1543 * notification of keyspace change. */
1544 int added = 0; /* Number of new elements added. */
1545 int updated = 0; /* Number of elements with updated score. */
1546 int processed = 0; /* Number of elements processed, may remain zero with
1547 options like XX. */
1548
1549 /* Parse options. At the end 'scoreidx' is set to the argument position
1550 * of the score of the first score-element pair. */
1551 scoreidx = 2;
1552 while(scoreidx < c->argc) {
1553 char *opt = c->argv[scoreidx]->ptr;
1554 if (!strcasecmp(opt,"nx")) flags |= ZADD_NX;
1555 else if (!strcasecmp(opt,"xx")) flags |= ZADD_XX;
1556 else if (!strcasecmp(opt,"ch")) flags |= ZADD_CH;
1557 else if (!strcasecmp(opt,"incr")) flags |= ZADD_INCR;
1558 else break;
1559 scoreidx++;
1560 }
1561
1562 /* Turn options into simple to check vars. */
1563 int incr = (flags & ZADD_INCR) != 0;
1564 int nx = (flags & ZADD_NX) != 0;
1565 int xx = (flags & ZADD_XX) != 0;
1566 int ch = (flags & ZADD_CH) != 0;
1567
1568 /* After the options, we expect to have an even number of args, since
1569 * we expect any number of score-element pairs. */
1570 elements = c->argc-scoreidx;
1571 if (elements % 2 || !elements) {
1572 addReply(c,shared.syntaxerr);
1573 return;
1574 }
1575 elements /= 2; /* Now this holds the number of score-element pairs. */
1576
1577 /* Check for incompatible options. */
1578 if (nx && xx) {
1579 addReplyError(c,
1580 "XX and NX options at the same time are not compatible");
1581 return;
1582 }
1583
1584 if (incr && elements > 1) {
1585 addReplyError(c,
1586 "INCR option supports a single increment-element pair");
1587 return;
1588 }
1589
1590 /* Start parsing all the scores, we need to emit any syntax error
1591 * before executing additions to the sorted set, as the command should
1592 * either execute fully or nothing at all. */
1593 scores = zmalloc(sizeof(double)*elements);
1594 for (j = 0; j < elements; j++) {
1595 if (getDoubleFromObjectOrReply(c,c->argv[scoreidx+j*2],&scores[j],NULL)
1596 != C_OK) goto cleanup;
1597 }
1598
1599 /* Lookup the key and create the sorted set if does not exist. */
1600 zobj = lookupKeyWrite(c->db,key);
1601 if (zobj == NULL) {
1602 if (xx) goto reply_to_client; /* No key + XX option: nothing to do. */
1603 if (server.zset_max_ziplist_entries == 0 ||
1604 server.zset_max_ziplist_value < sdslen(c->argv[scoreidx+1]->ptr))
1605 {
1606 zobj = createZsetObject();
1607 } else {
1608 zobj = createZsetZiplistObject();
1609 }
1610 dbAdd(c->db,key,zobj);
1611 } else {
1612 if (zobj->type != OBJ_ZSET) {
1613 addReply(c,shared.wrongtypeerr);
1614 goto cleanup;
1615 }
1616 }
1617
1618 for (j = 0; j < elements; j++) {
1619 double newscore;
1620 score = scores[j];
1621 int retflags = flags;
1622
1623 ele = c->argv[scoreidx+1+j*2]->ptr;
1624 int retval = zsetAdd(zobj, score, ele, &retflags, &newscore);
1625 if (retval == 0) {
1626 addReplyError(c,nanerr);
1627 goto cleanup;
1628 }
1629 if (retflags & ZADD_ADDED) added++;
1630 if (retflags & ZADD_UPDATED) updated++;
1631 if (!(retflags & ZADD_NOP)) processed++;
1632 score = newscore;
1633 }
1634 server.dirty += (added+updated);
1635
1636 reply_to_client:
1637 if (incr) { /* ZINCRBY or INCR option. */
1638 if (processed)
1639 addReplyDouble(c,score);
1640 else
1641 addReply(c,shared.nullbulk);
1642 } else { /* ZADD. */
1643 addReplyLongLong(c,ch ? added+updated : added);
1644 }
1645
1646 cleanup:
1647 zfree(scores);
1648 if (added || updated) {
1649 signalModifiedKey(c->db,key);
1650 notifyKeyspaceEvent(NOTIFY_ZSET,
1651 incr ? "zincr" : "zadd", key, c->db->id);
1652 }
1653 }
1654
/* ZADD entry point: runs the generic implementation with no preset flags
 * (ZADD_NONE). */
void zaddCommand(client *c) {
    zaddGenericCommand(c,ZADD_NONE);
}
1658
/* ZINCRBY entry point: runs the generic implementation with the ZADD_INCR
 * flag preset, so the given score is used as an increment. */
void zincrbyCommand(client *c) {
    zaddGenericCommand(c,ZADD_INCR);
}
1662
/* ZREM implementation: remove the members given from argv[2] onwards from
 * the sorted set stored at argv[1] and reply with the number of members
 * actually removed. The key itself is deleted as soon as the sorted set
 * becomes empty, and the remaining arguments are not processed. */
void zremCommand(client *c) {
    robj *key = c->argv[1];
    robj *zobj = lookupKeyWriteOrReply(c,key,shared.czero);
    int removed = 0, emptied = 0, idx;

    if (zobj == NULL) return;
    if (checkType(c,zobj,OBJ_ZSET)) return;

    for (idx = 2; idx < c->argc && !emptied; idx++) {
        if (zsetDel(zobj,c->argv[idx]->ptr)) removed++;
        if (zsetLength(zobj) == 0) {
            dbDelete(c->db,key);
            emptied = 1;
        }
    }

    /* Notifications and dirty counter only when something changed. */
    if (removed) {
        notifyKeyspaceEvent(NOTIFY_ZSET,"zrem",key,c->db->id);
        if (emptied)
            notifyKeyspaceEvent(NOTIFY_GENERIC,"del",key,c->db->id);
        signalModifiedKey(c->db,key);
        server.dirty += removed;
    }
    addReplyLongLong(c,removed);
}
1689
/* Implements ZREMRANGEBYRANK, ZREMRANGEBYSCORE, ZREMRANGEBYLEX commands.
 *
 * 'rangetype' is one of the ZRANGE_* values below and selects how argv[2]
 * and argv[3] are interpreted. The values are also used as indexes into the
 * event name table in step 4, so they must stay in sync with it.
 *
 * The reply is the number of elements removed. The key is deleted when the
 * sorted set becomes empty. */
#define ZRANGE_RANK 0
#define ZRANGE_SCORE 1
#define ZRANGE_LEX 2
void zremrangeGenericCommand(client *c, int rangetype) {
    robj *key = c->argv[1];
    robj *zobj;
    int keyremoved = 0;
    unsigned long deleted = 0;
    zrangespec range;
    zlexrangespec lexrange;
    long start, end, llen;

    /* Step 1: Parse the range. Nothing has to be released when parsing
     * fails, so these error paths return directly. */
    if (rangetype == ZRANGE_RANK) {
        if ((getLongFromObjectOrReply(c,c->argv[2],&start,NULL) != C_OK) ||
            (getLongFromObjectOrReply(c,c->argv[3],&end,NULL) != C_OK))
            return;
    } else if (rangetype == ZRANGE_SCORE) {
        if (zslParseRange(c->argv[2],c->argv[3],&range) != C_OK) {
            addReplyError(c,"min or max is not a float");
            return;
        }
    } else if (rangetype == ZRANGE_LEX) {
        if (zslParseLexRange(c->argv[2],c->argv[3],&lexrange) != C_OK) {
            addReplyError(c,"min or max not valid string range item");
            return;
        }
    }

    /* Step 2: Lookup & range sanity checks if needed. From here on every
     * exit goes through 'cleanup' so that a parsed lex range is freed. */
    if ((zobj = lookupKeyWriteOrReply(c,key,shared.czero)) == NULL ||
        checkType(c,zobj,OBJ_ZSET)) goto cleanup;

    if (rangetype == ZRANGE_RANK) {
        /* Sanitize indexes. Negative indexes count from the end. */
        llen = zsetLength(zobj);
        if (start < 0) start = llen+start;
        if (end < 0) end = llen+end;
        if (start < 0) start = 0;

        /* Invariant: start >= 0, so this test will be true when end < 0.
         * The range is empty when start > end or start >= length. */
        if (start > end || start >= llen) {
            addReply(c,shared.czero);
            goto cleanup;
        }
        if (end >= llen) end = llen-1;
    }

    /* Step 3: Perform the range deletion operation. The rank based helpers
     * take 1-based ranks, hence the +1 on the 0-based indexes. */
    if (zobj->encoding == OBJ_ENCODING_ZIPLIST) {
        switch(rangetype) {
        case ZRANGE_RANK:
            zobj->ptr = zzlDeleteRangeByRank(zobj->ptr,start+1,end+1,&deleted);
            break;
        case ZRANGE_SCORE:
            zobj->ptr = zzlDeleteRangeByScore(zobj->ptr,&range,&deleted);
            break;
        case ZRANGE_LEX:
            zobj->ptr = zzlDeleteRangeByLex(zobj->ptr,&lexrange,&deleted);
            break;
        }
        if (zzlLength(zobj->ptr) == 0) {
            dbDelete(c->db,key);
            keyremoved = 1;
        }
    } else if (zobj->encoding == OBJ_ENCODING_SKIPLIST) {
        zset *zs = zobj->ptr;
        switch(rangetype) {
        case ZRANGE_RANK:
            deleted = zslDeleteRangeByRank(zs->zsl,start+1,end+1,zs->dict);
            break;
        case ZRANGE_SCORE:
            deleted = zslDeleteRangeByScore(zs->zsl,&range,zs->dict);
            break;
        case ZRANGE_LEX:
            deleted = zslDeleteRangeByLex(zs->zsl,&lexrange,zs->dict);
            break;
        }
        if (htNeedsResize(zs->dict)) dictResize(zs->dict);
        if (dictSize(zs->dict) == 0) {
            dbDelete(c->db,key);
            keyremoved = 1;
        }
    } else {
        serverPanic("Unknown sorted set encoding");
    }

    /* Step 4: Notifications and reply. */
    if (deleted) {
        /* Indexed by 'rangetype': same order as the ZRANGE_* defines. */
        char *event[3] = {"zremrangebyrank","zremrangebyscore","zremrangebylex"};
        signalModifiedKey(c->db,key);
        notifyKeyspaceEvent(NOTIFY_ZSET,event[rangetype],key,c->db->id);
        if (keyremoved)
            notifyKeyspaceEvent(NOTIFY_GENERIC,"del",key,c->db->id);
    }
    server.dirty += deleted;
    addReplyLongLong(c,deleted);

cleanup:
    /* Only the lex range needs an explicit release. */
    if (rangetype == ZRANGE_LEX) zslFreeLexRange(&lexrange);
}
1793
/* ZREMRANGEBYRANK entry point: generic range removal using ranks. */
void zremrangebyrankCommand(client *c) {
    zremrangeGenericCommand(c,ZRANGE_RANK);
}
1797
/* ZREMRANGEBYSCORE entry point: generic range removal using a score range. */
void zremrangebyscoreCommand(client *c) {
    zremrangeGenericCommand(c,ZRANGE_SCORE);
}
1801
/* ZREMRANGEBYLEX entry point: generic range removal using a lex range. */
void zremrangebylexCommand(client *c) {
    zremrangeGenericCommand(c,ZRANGE_LEX);
}
1805
/* One input of a set / sorted set operation, together with the state needed
 * to iterate over it. Which member of the 'iter' union is in use depends on
 * 'type' and 'encoding', as selected by zuiInitIterator(). */
typedef struct {
    robj *subject;  /* Object being iterated. When NULL the iterator
                     * functions return without doing anything. */
    int type; /* Set, sorted set */
    int encoding;   /* Compared against the OBJ_ENCODING_* values. */
    double weight;  /* NOTE(review): presumably the per-input weight of the
                     * operation; not used in the code visible here. */

    union {
        /* Set iterators. */
        union _iterset {
            /* OBJ_ENCODING_INTSET: the intset and an index into it. */
            struct {
                intset *is;
                int ii;
            } is;
            /* OBJ_ENCODING_HT: the dict, its iterator and current entry. */
            struct {
                dict *dict;
                dictIterator *di;
                dictEntry *de;
            } ht;
        } set;

        /* Sorted set iterators. */
        union _iterzset {
            /* OBJ_ENCODING_ZIPLIST: the ziplist plus the current element
             * (eptr) and score (sptr) entries. */
            struct {
                unsigned char *zl;
                unsigned char *eptr, *sptr;
            } zl;
            /* OBJ_ENCODING_SKIPLIST: the zset and the current node. */
            struct {
                zset *zs;
                zskiplistNode *node;
            } sl;
        } zset;
    } iter;
} zsetopsrc;
1839
1840
1841 /* Use dirty flags for pointers that need to be cleaned up in the next
1842 * iteration over the zsetopval. The dirty flag for the long long value is
1843 * special, since long long values don't need cleanup. Instead, it means that
1844 * we already checked that "ell" holds a long long, or tried to convert another
1845 * representation into a long long value. When this was successful,
1846 * OPVAL_VALID_LL is set as well. */
1847 #define OPVAL_DIRTY_SDS 1
1848 #define OPVAL_DIRTY_LL 2
1849 #define OPVAL_VALID_LL 4
1850
1851 /* Store value retrieved from the iterator. */
1852 typedef struct {
1853 int flags;
1854 unsigned char _buf[32]; /* Private buffer. */
1855 sds ele;
1856 unsigned char *estr;
1857 unsigned int elen;
1858 long long ell;
1859 double score;
1860 } zsetopval;
1861
/* Short names for the iterator unions declared inside zsetopsrc. */
typedef union _iterset iterset;
typedef union _iterzset iterzset;
1864
/* Position the iterator stored in 'op' on the first element of its subject,
 * so that zuiNext() can start returning elements. Nothing is done when the
 * subject is NULL. Panics on an unexpected type or encoding. */
void zuiInitIterator(zsetopsrc *op) {
    if (op->subject == NULL) return;

    switch (op->type) {
    case OBJ_SET: {
        iterset *it = &op->iter.set;

        switch (op->encoding) {
        case OBJ_ENCODING_INTSET:
            /* Intsets are walked by position, starting from 0. */
            it->is.is = op->subject->ptr;
            it->is.ii = 0;
            break;
        case OBJ_ENCODING_HT:
            /* Pre-fetch the first entry: zuiNext() returns the entry
             * stored here and then advances. */
            it->ht.dict = op->subject->ptr;
            it->ht.di = dictGetIterator(op->subject->ptr);
            it->ht.de = dictNext(it->ht.di);
            break;
        default:
            serverPanic("Unknown set encoding");
        }
        break;
    }
    case OBJ_ZSET: {
        iterzset *it = &op->iter.zset;

        switch (op->encoding) {
        case OBJ_ENCODING_ZIPLIST:
            it->zl.zl = op->subject->ptr;
            it->zl.eptr = ziplistIndex(it->zl.zl,0);
            /* An element entry is always followed by its score entry. */
            if (it->zl.eptr != NULL) {
                it->zl.sptr = ziplistNext(it->zl.zl,it->zl.eptr);
                serverAssert(it->zl.sptr != NULL);
            }
            break;
        case OBJ_ENCODING_SKIPLIST:
            it->sl.zs = op->subject->ptr;
            it->sl.node = it->sl.zs->zsl->header->level[0].forward;
            break;
        default:
            serverPanic("Unknown sorted set encoding");
        }
        break;
    }
    default:
        serverPanic("Unsupported type");
    }
}
1900
/* Release whatever zuiInitIterator() acquired for 'op'. Only the hash table
 * encoded set holds a resource (a dict iterator); the other encodings need
 * no cleanup. Panics on an unexpected type or encoding. */
void zuiClearIterator(zsetopsrc *op) {
    if (op->subject == NULL) return;

    if (op->type == OBJ_SET) {
        if (op->encoding == OBJ_ENCODING_HT) {
            dictReleaseIterator(op->iter.set.ht.di);
        } else if (op->encoding != OBJ_ENCODING_INTSET) {
            serverPanic("Unknown set encoding");
        }
    } else if (op->type == OBJ_ZSET) {
        if (op->encoding != OBJ_ENCODING_ZIPLIST &&
            op->encoding != OBJ_ENCODING_SKIPLIST)
        {
            serverPanic("Unknown sorted set encoding");
        }
    } else {
        serverPanic("Unsupported type");
    }
}
1927
/* Return the number of elements held by the subject of 'op', or 0 when the
 * subject is NULL. Panics on an unexpected type or encoding. */
unsigned long zuiLength(zsetopsrc *op) {
    if (op->subject == NULL) return 0;

    switch (op->type) {
    case OBJ_SET:
        switch (op->encoding) {
        case OBJ_ENCODING_INTSET:
            return intsetLen(op->subject->ptr);
        case OBJ_ENCODING_HT: {
            dict *ht = op->subject->ptr;
            return dictSize(ht);
        }
        default:
            serverPanic("Unknown set encoding");
        }
        break;
    case OBJ_ZSET:
        switch (op->encoding) {
        case OBJ_ENCODING_ZIPLIST:
            return zzlLength(op->subject->ptr);
        case OBJ_ENCODING_SKIPLIST: {
            zset *zs = op->subject->ptr;
            return zs->zsl->length;
        }
        default:
            serverPanic("Unknown sorted set encoding");
        }
        break;
    default:
        serverPanic("Unsupported type");
    }
}
1954
1955 /* Check if the current value is valid. If so, store it in the passed structure
1956 * and move to the next element. If not valid, this means we have reached the
1957 * end of the structure and can abort. */
zuiNext(zsetopsrc * op,zsetopval * val)1958 int zuiNext(zsetopsrc *op, zsetopval *val) {
1959 if (op->subject == NULL)
1960 return 0;
1961
1962 if (val->flags & OPVAL_DIRTY_SDS)
1963 sdsfree(val->ele);
1964
1965 memset(val,0,sizeof(zsetopval));
1966
1967 if (op->type == OBJ_SET) {
1968 iterset *it = &op->iter.set;
1969 if (op->encoding == OBJ_ENCODING_INTSET) {
1970 int64_t ell;
1971
1972 if (!intsetGet(it->is.is,it->is.ii,&ell))
1973 return 0;
1974 val->ell = ell;
1975 val->score = 1.0;
1976
1977 /* Move to next element. */
1978 it->is.ii++;
1979 } else if (op->encoding == OBJ_ENCODING_HT) {
1980 if (it->ht.de == NULL)
1981 return 0;
1982 val->ele = dictGetKey(it->ht.de);
1983 val->score = 1.0;
1984
1985 /* Move to next element. */
1986 it->ht.de = dictNext(it->ht.di);
1987 } else {
1988 serverPanic("Unknown set encoding");
1989 }
1990 } else if (op->type == OBJ_ZSET) {
1991 iterzset *it = &op->iter.zset;
1992 if (op->encoding == OBJ_ENCODING_ZIPLIST) {
1993 /* No need to check both, but better be explicit. */
1994 if (it->zl.eptr == NULL || it->zl.sptr == NULL)
1995 return 0;
1996 serverAssert(ziplistGet(it->zl.eptr,&val->estr,&val->elen,&val->ell));
1997 val->score = zzlGetScore(it->zl.sptr);
1998
1999 /* Move to next element. */
2000 zzlNext(it->zl.zl,&it->zl.eptr,&it->zl.sptr);
2001 } else if (op->encoding == OBJ_ENCODING_SKIPLIST) {
2002 if (it->sl.node == NULL)
2003 return 0;
2004 val->ele = it->sl.node->ele;
2005 val->score = it->sl.node->score;
2006
2007 /* Move to next element. */
2008 it->sl.node = it->sl.node->level[0].forward;
2009 } else {
2010 serverPanic("Unknown sorted set encoding");
2011 }
2012 } else {
2013 serverPanic("Unsupported type");
2014 }
2015 return 1;
2016 }
2017
/* Make sure val->ell holds the integer form of the value, converting from
 * the SDS or buffer representation when needed. The conversion is attempted
 * only once per value (tracked with OPVAL_DIRTY_LL).
 *
 * Returns non-zero (the OPVAL_VALID_LL bit) when the value is representable
 * as a long long, 0 otherwise. */
int zuiLongLongFromValue(zsetopval *val) {
    int ok;

    /* Already attempted: just report the cached outcome. */
    if (val->flags & OPVAL_DIRTY_LL)
        return val->flags & OPVAL_VALID_LL;

    val->flags |= OPVAL_DIRTY_LL;

    if (val->ele != NULL) {
        ok = string2ll(val->ele,sdslen(val->ele),&val->ell);
    } else if (val->estr != NULL) {
        ok = string2ll((char*)val->estr,val->elen,&val->ell);
    } else {
        /* Neither string form is set: "ell" is the value itself. */
        ok = 1;
    }

    if (ok) val->flags |= OPVAL_VALID_LL;
    return val->flags & OPVAL_VALID_LL;
}
2035
zuiSdsFromValue(zsetopval * val)2036 sds zuiSdsFromValue(zsetopval *val) {
2037 if (val->ele == NULL) {
2038 if (val->estr != NULL) {
2039 val->ele = sdsnewlen((char*)val->estr,val->elen);
2040 } else {
2041 val->ele = sdsfromlonglong(val->ell);
2042 }
2043 val->flags |= OPVAL_DIRTY_SDS;
2044 }
2045 return val->ele;
2046 }
2047
2048 /* This is different from zuiSdsFromValue since returns a new SDS string
2049 * which is up to the caller to free. */
zuiNewSdsFromValue(zsetopval * val)2050 sds zuiNewSdsFromValue(zsetopval *val) {
2051 if (val->flags & OPVAL_DIRTY_SDS) {
2052 /* We have already one to return! */
2053 sds ele = val->ele;
2054 val->flags &= ~OPVAL_DIRTY_SDS;
2055 val->ele = NULL;
2056 return ele;
2057 } else if (val->ele) {
2058 return sdsdup(val->ele);
2059 } else if (val->estr) {
2060 return sdsnewlen((char*)val->estr,val->elen);
2061 } else {
2062 return sdsfromlonglong(val->ell);
2063 }
2064 }
2065
/* Make sure val->estr / val->elen describe the value as a byte buffer. When
 * no buffer is set, it is pointed at the SDS string if there is one,
 * otherwise the integer is formatted into the private buffer of 'val'.
 * Always returns 1. */
int zuiBufferFromValue(zsetopval *val) {
    if (val->estr != NULL) return 1;

    if (val->ele != NULL) {
        val->estr = (unsigned char*)val->ele;
        val->elen = sdslen(val->ele);
    } else {
        val->elen = ll2string((char*)val->_buf,sizeof(val->_buf),val->ell);
        val->estr = val->_buf;
    }
    return 1;
}
2078
2079 /* Find value pointed to by val in the source pointer to by op. When found,
2080 * return 1 and store its score in target. Return 0 otherwise. */
zuiFind(zsetopsrc * op,zsetopval * val,double * score)2081 int zuiFind(zsetopsrc *op, zsetopval *val, double *score) {
2082 if (op->subject == NULL)
2083 return 0;
2084
2085 if (op->type == OBJ_SET) {
2086 if (op->encoding == OBJ_ENCODING_INTSET) {
2087 if (zuiLongLongFromValue(val) &&
2088 intsetFind(op->subject->ptr,val->ell))
2089 {
2090 *score = 1.0;
2091 return 1;
2092 } else {
2093 return 0;
2094 }
2095 } else if (op->encoding == OBJ_ENCODING_HT) {
2096 dict *ht = op->subject->ptr;
2097 zuiSdsFromValue(val);
2098 if (dictFind(ht,val->ele) != NULL) {
2099 *score = 1.0;
2100 return 1;
2101 } else {
2102 return 0;
2103 }
2104 } else {
2105 serverPanic("Unknown set encoding");
2106 }
2107 } else if (op->type == OBJ_ZSET) {
2108 zuiSdsFromValue(val);
2109
2110 if (op->encoding == OBJ_ENCODING_ZIPLIST) {
2111 if (zzlFind(op->subject->ptr,val->ele,score) != NULL) {
2112 /* Score is already set by zzlFind. */
2113 return 1;
2114 } else {
2115 return 0;
2116 }
2117 } else if (op->encoding == OBJ_ENCODING_SKIPLIST) {
2118 zset *zs = op->subject->ptr;
2119 dictEntry *de;
2120 if ((de = dictFind(zs->dict,val->ele)) != NULL) {
2121 *score = *(double*)dictGetVal(de);
2122 return 1;
2123 } else {
2124 return 0;
2125 }
2126 } else {
2127 serverPanic("Unknown sorted set encoding");
2128 }
2129 } else {
2130 serverPanic("Unsupported type");
2131 }
2132 }
2133
zuiCompareByCardinality(const void * s1,const void * s2)2134 int zuiCompareByCardinality(const void *s1, const void *s2) {
2135 unsigned long first = zuiLength((zsetopsrc*)s1);
2136 unsigned long second = zuiLength((zsetopsrc*)s2);
2137 if (first > second) return 1;
2138 if (first < second) return -1;
2139 return 0;
2140 }
2141
/* Ways of combining the scores of an element found in several inputs. */
#define REDIS_AGGR_SUM 1 /* Add the scores. */
#define REDIS_AGGR_MIN 2 /* Keep the smallest score. */
#define REDIS_AGGR_MAX 3 /* Keep the largest score. */
/* Score of dict entry _e: 1.0 when the entry has a NULL value, otherwise the
 * double its value points to. */
#define zunionInterDictValue(_e) (dictGetVal(_e) == NULL ? 1.0 : *(double*)dictGetVal(_e))
2146
zunionInterAggregate(double * target,double val,int aggregate)2147 inline static void zunionInterAggregate(double *target, double val, int aggregate) {
2148 if (aggregate == REDIS_AGGR_SUM) {
2149 *target = *target + val;
2150 /* The result of adding two doubles is NaN when one variable
2151 * is +inf and the other is -inf. When these numbers are added,
2152 * we maintain the convention of the result being 0.0. */
2153 if (isnan(*target)) *target = 0.0;
2154 } else if (aggregate == REDIS_AGGR_MIN) {
2155 *target = val < *target ? val : *target;
2156 } else if (aggregate == REDIS_AGGR_MAX) {
2157 *target = val > *target ? val : *target;
2158 } else {
2159 /* safety net */
2160 serverPanic("Unknown ZUNION/INTER aggregate type");
2161 }
2162 }
2163
/* SDS key callbacks used by the dictionary type below; they are only
 * declared here, not defined in this part of the file. */
uint64_t dictSdsHash(const void *key);
int dictSdsKeyCompare(void *privdata, const void *key1, const void *key2);

/* Dictionary type of the accumulator used by the union operation: keys are
 * hashed and compared as SDS strings, and no dup or destructor callbacks are
 * installed for either keys or values. */
dictType setAccumulatorDictType = {
    dictSdsHash,               /* hash function */
    NULL,                      /* key dup */
    NULL,                      /* val dup */
    dictSdsKeyCompare,         /* key compare */
    NULL,                      /* key destructor */
    NULL                       /* val destructor */
};
2175
/* Shared implementation of ZUNIONSTORE and ZINTERSTORE.
 *
 * c      - the client; argv[2] is the number of input keys, argv[3...] are
 *          the input keys, optionally followed by "weights" (one value per
 *          input key) and/or "aggregate" sum|min|max.
 * dstkey - key the resulting sorted set is stored at.
 * op     - SET_OP_UNION or SET_OP_INTER, any other value panics.
 *
 * Inputs may be sets or sorted sets; missing keys are handled as empty
 * inputs. The reply is the number of elements of the result, or an error
 * when the arguments are invalid or an input key has the wrong type. An
 * empty result is not stored: the destination key is just deleted. */
void zunionInterGenericCommand(client *c, robj *dstkey, int op) {
    int i, j;
    long setnum;
    int aggregate = REDIS_AGGR_SUM;
    zsetopsrc *src;
    zsetopval zval;
    sds tmp;
    size_t maxelelen = 0; /* Longest element stored in the result. */
    robj *dstobj;
    zset *dstzset;
    zskiplistNode *znode;
    int touched = 0; /* Set when an existing destination key was deleted. */

    /* expect setnum input keys to be given */
    if ((getLongFromObjectOrReply(c, c->argv[2], &setnum, NULL) != C_OK))
        return;

    if (setnum < 1) {
        addReplyError(c,
            "at least 1 input key is needed for ZUNIONSTORE/ZINTERSTORE");
        return;
    }

    /* test if the expected number of keys would overflow */
    if (setnum > c->argc-3) {
        addReply(c,shared.syntaxerr);
        return;
    }

    /* read keys to be used for input */
    src = zcalloc(sizeof(zsetopsrc) * setnum);
    for (i = 0, j = 3; i < setnum; i++, j++) {
        robj *obj = lookupKeyWrite(c->db,c->argv[j]);
        if (obj != NULL) {
            if (obj->type != OBJ_ZSET && obj->type != OBJ_SET) {
                zfree(src);
                addReply(c,shared.wrongtypeerr);
                return;
            }

            src[i].subject = obj;
            src[i].type = obj->type;
            src[i].encoding = obj->encoding;
        } else {
            src[i].subject = NULL;
        }

        /* Default all weights to 1. */
        src[i].weight = 1.0;
    }

    /* parse optional extra arguments; at this point j is the index of the
     * first argument following the input keys. */
    if (j < c->argc) {
        int remaining = c->argc - j;

        while (remaining) {
            if (remaining >= (setnum + 1) &&
                !strcasecmp(c->argv[j]->ptr,"weights"))
            {
                j++; remaining--;
                /* One weight per input key, in the same order as the keys. */
                for (i = 0; i < setnum; i++, j++, remaining--) {
                    if (getDoubleFromObjectOrReply(c,c->argv[j],&src[i].weight,
                            "weight value is not a float") != C_OK)
                    {
                        zfree(src);
                        return;
                    }
                }
            } else if (remaining >= 2 &&
                       !strcasecmp(c->argv[j]->ptr,"aggregate"))
            {
                j++; remaining--;
                if (!strcasecmp(c->argv[j]->ptr,"sum")) {
                    aggregate = REDIS_AGGR_SUM;
                } else if (!strcasecmp(c->argv[j]->ptr,"min")) {
                    aggregate = REDIS_AGGR_MIN;
                } else if (!strcasecmp(c->argv[j]->ptr,"max")) {
                    aggregate = REDIS_AGGR_MAX;
                } else {
                    zfree(src);
                    addReply(c,shared.syntaxerr);
                    return;
                }
                j++; remaining--;
            } else {
                zfree(src);
                addReply(c,shared.syntaxerr);
                return;
            }
        }
    }

    /* sort sets from the smallest to largest, this will improve our
     * algorithm's performance */
    qsort(src,setnum,sizeof(zsetopsrc),zuiCompareByCardinality);

    dstobj = createZsetObject();
    dstzset = dstobj->ptr;
    /* zuiNext() reads zval.flags before filling the value, so it must start
     * zeroed. */
    memset(&zval, 0, sizeof(zval));

    if (op == SET_OP_INTER) {
        /* Skip everything if the smallest input is empty. */
        if (zuiLength(&src[0]) > 0) {
            /* Precondition: as src[0] is non-empty and the inputs are ordered
             * by size, all src[i > 0] are non-empty too. */
            zuiInitIterator(&src[0]);
            while (zuiNext(&src[0],&zval)) {
                double score, value;

                score = src[0].weight * zval.score;
                if (isnan(score)) score = 0;

                for (j = 1; j < setnum; j++) {
                    /* It is not safe to access the zset we are
                     * iterating, so explicitly check for equal object. */
                    if (src[j].subject == src[0].subject) {
                        value = zval.score*src[j].weight;
                        zunionInterAggregate(&score,value,aggregate);
                    } else if (zuiFind(&src[j],&zval,&value)) {
                        value *= src[j].weight;
                        zunionInterAggregate(&score,value,aggregate);
                    } else {
                        /* Missing from this input: not in the intersection. */
                        break;
                    }
                }

                /* Only continue when present in every input. */
                if (j == setnum) {
                    tmp = zuiNewSdsFromValue(&zval);
                    znode = zslInsert(dstzset->zsl,score,tmp);
                    dictAdd(dstzset->dict,tmp,&znode->score);
                    if (sdslen(tmp) > maxelelen) maxelelen = sdslen(tmp);
                }
            }
            zuiClearIterator(&src[0]);
        }
    } else if (op == SET_OP_UNION) {
        dict *accumulator = dictCreate(&setAccumulatorDictType,NULL);
        dictIterator *di;
        dictEntry *de, *existing;
        double score;

        if (setnum) {
            /* Our union is at least as large as the largest set.
             * Resize the dictionary ASAP to avoid useless rehashing. */
            dictExpand(accumulator,zuiLength(&src[setnum-1]));
        }

        /* Step 1: Create a dictionary of elements -> aggregated-scores
         * by iterating one sorted set after the other. */
        for (i = 0; i < setnum; i++) {
            if (zuiLength(&src[i]) == 0) continue;

            zuiInitIterator(&src[i]);
            while (zuiNext(&src[i],&zval)) {
                /* Initialize value */
                score = src[i].weight * zval.score;
                if (isnan(score)) score = 0;

                /* Search for this element in the accumulating dictionary. */
                de = dictAddRaw(accumulator,zuiSdsFromValue(&zval),&existing);
                /* If we don't have it, we need to create a new entry. */
                if (!existing) {
                    tmp = zuiNewSdsFromValue(&zval);
                    /* Remember the longest single element encountered,
                     * to understand if it's possible to convert to ziplist
                     * at the end. */
                     if (sdslen(tmp) > maxelelen) maxelelen = sdslen(tmp);
                    /* Update the element with its initial score. */
                    dictSetKey(accumulator, de, tmp);
                    dictSetDoubleVal(de,score);
                } else {
                    /* Update the score with the score of the new instance
                     * of the element found in the current sorted set.
                     *
                     * Here we access directly the dictEntry double
                     * value inside the union as it is a big speedup
                     * compared to using the getDouble/setDouble API. */
                    zunionInterAggregate(&existing->v.d,score,aggregate);
                }
            }
            zuiClearIterator(&src[i]);
        }

        /* Step 2: convert the dictionary into the final sorted set. */
        di = dictGetIterator(accumulator);

        /* We now are aware of the final size of the resulting sorted set,
         * let's resize the dictionary embedded inside the sorted set to the
         * right size, in order to save rehashing time. */
        dictExpand(dstzset->dict,dictSize(accumulator));

        while((de = dictNext(di)) != NULL) {
            sds ele = dictGetKey(de);
            score = dictGetDoubleVal(de);
            znode = zslInsert(dstzset->zsl,score,ele);
            dictAdd(dstzset->dict,ele,&znode->score);
        }
        dictReleaseIterator(di);
        dictRelease(accumulator);
    } else {
        serverPanic("Unknown operator");
    }

    /* Replace the destination key: delete the old value first, then store
     * the result only when it is not empty. */
    if (dbDelete(c->db,dstkey))
        touched = 1;
    if (dstzset->zsl->length) {
        zsetConvertToZiplistIfNeeded(dstobj,maxelelen);
        dbAdd(c->db,dstkey,dstobj);
        addReplyLongLong(c,zsetLength(dstobj));
        signalModifiedKey(c->db,dstkey);
        notifyKeyspaceEvent(NOTIFY_ZSET,
            (op == SET_OP_UNION) ? "zunionstore" : "zinterstore",
            dstkey,c->db->id);
        server.dirty++;
    } else {
        decrRefCount(dstobj);
        addReply(c,shared.czero);
        /* Empty result: the only change is the deletion of the old key. */
        if (touched) {
            signalModifiedKey(c->db,dstkey);
            notifyKeyspaceEvent(NOTIFY_GENERIC,"del",dstkey,c->db->id);
            server.dirty++;
        }
    }
    zfree(src);
}
2402
/* Union entry point: runs zunionInterGenericCommand() with SET_OP_UNION,
 * using argv[1] as the destination key. */
void zunionstoreCommand(client *c) {
    robj *dstkey = c->argv[1];

    zunionInterGenericCommand(c, dstkey, SET_OP_UNION);
}
2406
/* Intersection entry point: runs zunionInterGenericCommand() with
 * SET_OP_INTER, using argv[1] as the destination key. */
void zinterstoreCommand(client *c) {
    robj *dstkey = c->argv[1];

    zunionInterGenericCommand(c, dstkey, SET_OP_INTER);
}
2410
/* Shared implementation of the rank based range commands.
 *
 * argv[1] is the key, argv[2] and argv[3] are the start and end ranks (a
 * negative rank counts from the end), and an optional fifth argument
 * "withscores" asks for the score after each element. When 'reverse' is
 * non-zero, ranks are counted and elements returned starting from the tail.
 *
 * The reply is a multi bulk of the selected elements; it is empty when the
 * key is missing or the range selects nothing. */
void zrangeGenericCommand(client *c, int reverse) {
    robj *key = c->argv[1];
    robj *zobj;
    int withscores = 0;
    long start, end, llen, rangelen;

    if (getLongFromObjectOrReply(c, c->argv[2], &start, NULL) != C_OK) return;
    if (getLongFromObjectOrReply(c, c->argv[3], &end, NULL) != C_OK) return;

    /* The only accepted extra argument is a single "withscores". */
    if (c->argc >= 5) {
        if (c->argc == 5 && !strcasecmp(c->argv[4]->ptr,"withscores")) {
            withscores = 1;
        } else {
            addReply(c,shared.syntaxerr);
            return;
        }
    }

    zobj = lookupKeyReadOrReply(c,key,shared.emptymultibulk);
    if (zobj == NULL || checkType(c,zobj,OBJ_ZSET)) return;

    /* Turn negative ranks into offsets from the head, clamping start. */
    llen = zsetLength(zobj);
    if (start < 0) start += llen;
    if (end < 0) end += llen;
    if (start < 0) start = 0;

    /* As start >= 0 here, a still negative end is caught by start > end.
     * Nothing is selected when start > end or start is past the last rank. */
    if (start > end || start >= llen) {
        addReply(c,shared.emptymultibulk);
        return;
    }
    if (end >= llen) end = llen-1;
    rangelen = (end-start)+1;

    /* The number of items is known in advance: emit the header first. */
    addReplyMultiBulkLen(c, withscores ? (rangelen*2) : rangelen);

    if (zobj->encoding == OBJ_ENCODING_ZIPLIST) {
        unsigned char *zl = zobj->ptr;
        unsigned char *eptr, *sptr;
        unsigned char *vstr;
        unsigned int vlen;
        long long vlong;

        /* Each member takes two ziplist entries: element, then score. */
        eptr = ziplistIndex(zl, reverse ? -2-(2*start) : 2*start);

        serverAssertWithInfo(c,zobj,eptr != NULL);
        sptr = ziplistNext(zl,eptr);

        for (; rangelen > 0; rangelen--) {
            serverAssertWithInfo(c,zobj,eptr != NULL && sptr != NULL);
            serverAssertWithInfo(c,zobj,ziplistGet(eptr,&vstr,&vlen,&vlong));

            /* A NULL string means the entry is stored as an integer. */
            if (vstr != NULL) {
                addReplyBulkCBuffer(c,vstr,vlen);
            } else {
                addReplyBulkLongLong(c,vlong);
            }

            if (withscores) addReplyDouble(c,zzlGetScore(sptr));

            if (reverse) {
                zzlPrev(zl,&eptr,&sptr);
            } else {
                zzlNext(zl,&eptr,&sptr);
            }
        }
    } else if (zobj->encoding == OBJ_ENCODING_SKIPLIST) {
        zset *zs = zobj->ptr;
        zskiplist *zsl = zs->zsl;
        zskiplistNode *ln;

        /* With start == 0 the first node is one end of the list, so the
         * rank lookup is only done for a non-zero start. */
        if (reverse) {
            ln = (start > 0) ? zslGetElementByRank(zsl,llen-start)
                             : zsl->tail;
        } else {
            ln = (start > 0) ? zslGetElementByRank(zsl,start+1)
                             : zsl->header->level[0].forward;
        }

        for (; rangelen > 0; rangelen--) {
            serverAssertWithInfo(c,zobj,ln != NULL);
            addReplyBulkCBuffer(c,ln->ele,sdslen(ln->ele));
            if (withscores) addReplyDouble(c,ln->score);

            if (reverse) {
                ln = ln->backward;
            } else {
                ln = ln->level[0].forward;
            }
        }
    } else {
        serverPanic("Unknown sorted set encoding");
    }
}
2512
/* Forward-order entry point: zrangeGenericCommand() with reverse off. */
void zrangeCommand(client *c) {
    zrangeGenericCommand(c, 0);
}
2516
/* Reverse-order entry point: zrangeGenericCommand() with reverse on. */
void zrevrangeCommand(client *c) {
    zrangeGenericCommand(c, 1);
}
2520
/* This command implements ZRANGEBYSCORE, ZREVRANGEBYSCORE.
 *
 * argv[1] is the key and argv[2], argv[3] are the score bounds: min then max
 * normally, max then min when 'reverse' is non-zero. They may be followed by
 * "withscores" and/or "limit" <offset> <count> in any order.
 *
 * The reply is a multi bulk of the elements whose score is in range,
 * walked from the lowest score (or from the highest when 'reverse' is
 * non-zero). It is empty when the key is missing or nothing is in range. */
void genericZrangebyscoreCommand(client *c, int reverse) {
    zrangespec range;
    robj *key = c->argv[1];
    robj *zobj;
    /* A negative limit is never counted down to zero by the "limit--" loop
     * conditions below, so the default of -1 acts as "no limit". */
    long offset = 0, limit = -1;
    int withscores = 0;
    unsigned long rangelen = 0; /* Number of elements emitted so far. */
    void *replylen = NULL;      /* Placeholder of the deferred reply length. */
    int minidx, maxidx;

    /* Parse the range arguments. */
    if (reverse) {
        /* Range is given as [max,min] */
        maxidx = 2; minidx = 3;
    } else {
        /* Range is given as [min,max] */
        minidx = 2; maxidx = 3;
    }

    if (zslParseRange(c->argv[minidx],c->argv[maxidx],&range) != C_OK) {
        addReplyError(c,"min or max is not a float");
        return;
    }

    /* Parse optional extra arguments: "withscores" and
     * "limit" <offset> <count>. Anything else is a syntax error. */
    if (c->argc > 4) {
        int remaining = c->argc - 4;
        int pos = 4;

        while (remaining) {
            if (remaining >= 1 && !strcasecmp(c->argv[pos]->ptr,"withscores")) {
                pos++; remaining--;
                withscores = 1;
            } else if (remaining >= 3 && !strcasecmp(c->argv[pos]->ptr,"limit")) {
                if ((getLongFromObjectOrReply(c, c->argv[pos+1], &offset, NULL)
                        != C_OK) ||
                    (getLongFromObjectOrReply(c, c->argv[pos+2], &limit, NULL)
                        != C_OK))
                {
                    return;
                }
                pos += 3; remaining -= 3;
            } else {
                addReply(c,shared.syntaxerr);
                return;
            }
        }
    }

    /* Ok, lookup the key and get the range */
    if ((zobj = lookupKeyReadOrReply(c,key,shared.emptymultibulk)) == NULL ||
        checkType(c,zobj,OBJ_ZSET)) return;

    if (zobj->encoding == OBJ_ENCODING_ZIPLIST) {
        unsigned char *zl = zobj->ptr;
        unsigned char *eptr, *sptr;
        unsigned char *vstr;
        unsigned int vlen;
        long long vlong;
        double score;

        /* If reversed, get the last node in range as starting point. */
        if (reverse) {
            eptr = zzlLastInRange(zl,&range);
        } else {
            eptr = zzlFirstInRange(zl,&range);
        }

        /* No "first" element in the specified interval. */
        if (eptr == NULL) {
            addReply(c, shared.emptymultibulk);
            return;
        }

        /* Get score pointer for the first element. */
        serverAssertWithInfo(c,zobj,eptr != NULL);
        sptr = ziplistNext(zl,eptr);

        /* We don't know in advance how many matching elements there are in the
         * list, so we push this object that will represent the multi-bulk
         * length in the output buffer, and will "fix" it later */
        replylen = addDeferredMultiBulkLength(c);

        /* If there is an offset, just traverse the number of elements without
         * checking the score because that is done in the next loop. */
        while (eptr && offset--) {
            if (reverse) {
                zzlPrev(zl,&eptr,&sptr);
            } else {
                zzlNext(zl,&eptr,&sptr);
            }
        }

        while (eptr && limit--) {
            score = zzlGetScore(sptr);

            /* Abort when the node is no longer in range. */
            if (reverse) {
                if (!zslValueGteMin(score,&range)) break;
            } else {
                if (!zslValueLteMax(score,&range)) break;
            }

            /* We know the element exists, so ziplistGet should always succeed */
            serverAssertWithInfo(c,zobj,ziplistGet(eptr,&vstr,&vlen,&vlong));

            rangelen++;
            /* A NULL string means the entry is stored as an integer. */
            if (vstr == NULL) {
                addReplyBulkLongLong(c,vlong);
            } else {
                addReplyBulkCBuffer(c,vstr,vlen);
            }

            if (withscores) {
                addReplyDouble(c,score);
            }

            /* Move to next node */
            if (reverse) {
                zzlPrev(zl,&eptr,&sptr);
            } else {
                zzlNext(zl,&eptr,&sptr);
            }
        }
    } else if (zobj->encoding == OBJ_ENCODING_SKIPLIST) {
        zset *zs = zobj->ptr;
        zskiplist *zsl = zs->zsl;
        zskiplistNode *ln;

        /* If reversed, get the last node in range as starting point. */
        if (reverse) {
            ln = zslLastInRange(zsl,&range);
        } else {
            ln = zslFirstInRange(zsl,&range);
        }

        /* No "first" element in the specified interval. */
        if (ln == NULL) {
            addReply(c, shared.emptymultibulk);
            return;
        }

        /* We don't know in advance how many matching elements there are in the
         * list, so we push this object that will represent the multi-bulk
         * length in the output buffer, and will "fix" it later */
        replylen = addDeferredMultiBulkLength(c);

        /* If there is an offset, just traverse the number of elements without
         * checking the score because that is done in the next loop. */
        while (ln && offset--) {
            if (reverse) {
                ln = ln->backward;
            } else {
                ln = ln->level[0].forward;
            }
        }

        while (ln && limit--) {
            /* Abort when the node is no longer in range. */
            if (reverse) {
                if (!zslValueGteMin(ln->score,&range)) break;
            } else {
                if (!zslValueLteMax(ln->score,&range)) break;
            }

            rangelen++;
            addReplyBulkCBuffer(c,ln->ele,sdslen(ln->ele));

            if (withscores) {
                addReplyDouble(c,ln->score);
            }

            /* Move to next node */
            if (reverse) {
                ln = ln->backward;
            } else {
                ln = ln->level[0].forward;
            }
        }
    } else {
        serverPanic("Unknown sorted set encoding");
    }

    /* With scores, every element contributed two items to the reply. */
    if (withscores) {
        rangelen *= 2;
    }

    /* Now that the number of items is known, fill in the reply length. */
    setDeferredMultiBulkLength(c, replylen, rangelen);
}
2712
/* Forward-order entry point: genericZrangebyscoreCommand() with reverse
 * off. */
void zrangebyscoreCommand(client *c) {
    genericZrangebyscoreCommand(c, 0);
}
2716
/* Reverse-order entry point: genericZrangebyscoreCommand() with reverse
 * on. */
void zrevrangebyscoreCommand(client *c) {
    genericZrangebyscoreCommand(c, 1);
}
2720
/* Count the elements of the sorted set at argv[1] whose score falls in the
 * range given by argv[2] (min) and argv[3] (max).
 *
 * The reply is the count as an integer; shared.czero is passed as the reply
 * for a missing key and sent when no element is in range. An error is
 * replied when the range cannot be parsed. */
void zcountCommand(client *c) {
    robj *key = c->argv[1];
    robj *zobj;
    zrangespec range;
    unsigned long count = 0;

    /* Validate the range before touching the key. */
    if (zslParseRange(c->argv[2],c->argv[3],&range) != C_OK) {
        addReplyError(c,"min or max is not a float");
        return;
    }

    zobj = lookupKeyReadOrReply(c, key, shared.czero);
    if (zobj == NULL || checkType(c, zobj, OBJ_ZSET)) return;

    if (zobj->encoding == OBJ_ENCODING_ZIPLIST) {
        unsigned char *zl = zobj->ptr;
        unsigned char *eptr, *sptr;
        double score;

        /* Start from the first element in range, if there is one. */
        eptr = zzlFirstInRange(zl,&range);
        if (eptr == NULL) {
            addReply(c, shared.czero);
            return;
        }

        /* The starting element must itself be within the upper bound. */
        sptr = ziplistNext(zl,eptr);
        score = zzlGetScore(sptr);
        serverAssertWithInfo(c,zobj,zslValueLteMax(score,&range));

        /* Walk forward, counting until a score exceeds the upper bound or
         * the list ends. */
        while (eptr != NULL) {
            score = zzlGetScore(sptr);
            if (!zslValueLteMax(score,&range)) break;
            count++;
            zzlNext(zl,&eptr,&sptr);
        }
    } else if (zobj->encoding == OBJ_ENCODING_SKIPLIST) {
        zset *zs = zobj->ptr;
        zskiplist *zsl = zs->zsl;
        zskiplistNode *first = zslFirstInRange(zsl, &range);

        /* No walk is needed here: the count is derived from the ranks of
         * the first and last nodes in range. */
        if (first != NULL) {
            zskiplistNode *last;
            unsigned long rank = zslGetRank(zsl, first->score, first->ele);

            /* Preliminary count: everything from the first node in range
             * up to the tail. */
            count = (zsl->length - (rank - 1));

            /* Then discard the nodes that follow the last one in range. */
            last = zslLastInRange(zsl, &range);
            if (last != NULL) {
                rank = zslGetRank(zsl, last->score, last->ele);
                count -= (zsl->length - rank);
            }
        }
    } else {
        serverPanic("Unknown sorted set encoding");
    }

    addReplyLongLong(c, count);
}
2797
/* Count the elements of the sorted set at argv[1] that fall in the
 * lexicographic range given by argv[2] (min) and argv[3] (max).
 *
 * The reply is the count as an integer; shared.czero is passed as the reply
 * for a missing key and sent when no element is in range. An error is
 * replied when the range cannot be parsed. Once parsed, the range is
 * released with zslFreeLexRange() on every return path. */
void zlexcountCommand(client *c) {
    robj *key = c->argv[1];
    robj *zobj;
    zlexrangespec range;
    unsigned long count = 0;

    /* Validate the range before touching the key. */
    if (zslParseLexRange(c->argv[2],c->argv[3],&range) != C_OK) {
        addReplyError(c,"min or max not valid string range item");
        return;
    }

    zobj = lookupKeyReadOrReply(c, key, shared.czero);
    if (zobj == NULL || checkType(c, zobj, OBJ_ZSET)) {
        zslFreeLexRange(&range);
        return;
    }

    if (zobj->encoding == OBJ_ENCODING_ZIPLIST) {
        unsigned char *zl = zobj->ptr;
        unsigned char *eptr, *sptr;

        /* Start from the first element in range, if there is one. */
        eptr = zzlFirstInLexRange(zl,&range);
        if (eptr == NULL) {
            zslFreeLexRange(&range);
            addReply(c, shared.czero);
            return;
        }

        /* The starting element must itself be within the upper bound. */
        sptr = ziplistNext(zl,eptr);
        serverAssertWithInfo(c,zobj,zzlLexValueLteMax(eptr,&range));

        /* Walk forward, counting until an element exceeds the upper bound
         * or the list ends. */
        while (eptr != NULL) {
            if (!zzlLexValueLteMax(eptr,&range)) break;
            count++;
            zzlNext(zl,&eptr,&sptr);
        }
    } else if (zobj->encoding == OBJ_ENCODING_SKIPLIST) {
        zset *zs = zobj->ptr;
        zskiplist *zsl = zs->zsl;
        zskiplistNode *first = zslFirstInLexRange(zsl, &range);

        /* No walk is needed here: the count is derived from the ranks of
         * the first and last nodes in range. */
        if (first != NULL) {
            zskiplistNode *last;
            unsigned long rank = zslGetRank(zsl, first->score, first->ele);

            /* Preliminary count: everything from the first node in range
             * up to the tail. */
            count = (zsl->length - (rank - 1));

            /* Then discard the nodes that follow the last one in range. */
            last = zslLastInLexRange(zsl, &range);
            if (last != NULL) {
                rank = zslGetRank(zsl, last->score, last->ele);
                count -= (zsl->length - rank);
            }
        }
    } else {
        serverPanic("Unknown sorted set encoding");
    }

    zslFreeLexRange(&range);
    addReplyLongLong(c, count);
}
2876
2877 /* This command implements ZRANGEBYLEX, ZREVRANGEBYLEX. */
genericZrangebylexCommand(client * c,int reverse)2878 void genericZrangebylexCommand(client *c, int reverse) {
2879 zlexrangespec range;
2880 robj *key = c->argv[1];
2881 robj *zobj;
2882 long offset = 0, limit = -1;
2883 unsigned long rangelen = 0;
2884 void *replylen = NULL;
2885 int minidx, maxidx;
2886
2887 /* Parse the range arguments. */
2888 if (reverse) {
2889 /* Range is given as [max,min] */
2890 maxidx = 2; minidx = 3;
2891 } else {
2892 /* Range is given as [min,max] */
2893 minidx = 2; maxidx = 3;
2894 }
2895
2896 if (zslParseLexRange(c->argv[minidx],c->argv[maxidx],&range) != C_OK) {
2897 addReplyError(c,"min or max not valid string range item");
2898 return;
2899 }
2900
2901 /* Parse optional extra arguments. Note that ZCOUNT will exactly have
2902 * 4 arguments, so we'll never enter the following code path. */
2903 if (c->argc > 4) {
2904 int remaining = c->argc - 4;
2905 int pos = 4;
2906
2907 while (remaining) {
2908 if (remaining >= 3 && !strcasecmp(c->argv[pos]->ptr,"limit")) {
2909 if ((getLongFromObjectOrReply(c, c->argv[pos+1], &offset, NULL) != C_OK) ||
2910 (getLongFromObjectOrReply(c, c->argv[pos+2], &limit, NULL) != C_OK)) {
2911 zslFreeLexRange(&range);
2912 return;
2913 }
2914 pos += 3; remaining -= 3;
2915 } else {
2916 zslFreeLexRange(&range);
2917 addReply(c,shared.syntaxerr);
2918 return;
2919 }
2920 }
2921 }
2922
2923 /* Ok, lookup the key and get the range */
2924 if ((zobj = lookupKeyReadOrReply(c,key,shared.emptymultibulk)) == NULL ||
2925 checkType(c,zobj,OBJ_ZSET))
2926 {
2927 zslFreeLexRange(&range);
2928 return;
2929 }
2930
2931 if (zobj->encoding == OBJ_ENCODING_ZIPLIST) {
2932 unsigned char *zl = zobj->ptr;
2933 unsigned char *eptr, *sptr;
2934 unsigned char *vstr;
2935 unsigned int vlen;
2936 long long vlong;
2937
2938 /* If reversed, get the last node in range as starting point. */
2939 if (reverse) {
2940 eptr = zzlLastInLexRange(zl,&range);
2941 } else {
2942 eptr = zzlFirstInLexRange(zl,&range);
2943 }
2944
2945 /* No "first" element in the specified interval. */
2946 if (eptr == NULL) {
2947 addReply(c, shared.emptymultibulk);
2948 zslFreeLexRange(&range);
2949 return;
2950 }
2951
2952 /* Get score pointer for the first element. */
2953 serverAssertWithInfo(c,zobj,eptr != NULL);
2954 sptr = ziplistNext(zl,eptr);
2955
2956 /* We don't know in advance how many matching elements there are in the
2957 * list, so we push this object that will represent the multi-bulk
2958 * length in the output buffer, and will "fix" it later */
2959 replylen = addDeferredMultiBulkLength(c);
2960
2961 /* If there is an offset, just traverse the number of elements without
2962 * checking the score because that is done in the next loop. */
2963 while (eptr && offset--) {
2964 if (reverse) {
2965 zzlPrev(zl,&eptr,&sptr);
2966 } else {
2967 zzlNext(zl,&eptr,&sptr);
2968 }
2969 }
2970
2971 while (eptr && limit--) {
2972 /* Abort when the node is no longer in range. */
2973 if (reverse) {
2974 if (!zzlLexValueGteMin(eptr,&range)) break;
2975 } else {
2976 if (!zzlLexValueLteMax(eptr,&range)) break;
2977 }
2978
2979 /* We know the element exists, so ziplistGet should always
2980 * succeed. */
2981 serverAssertWithInfo(c,zobj,ziplistGet(eptr,&vstr,&vlen,&vlong));
2982
2983 rangelen++;
2984 if (vstr == NULL) {
2985 addReplyBulkLongLong(c,vlong);
2986 } else {
2987 addReplyBulkCBuffer(c,vstr,vlen);
2988 }
2989
2990 /* Move to next node */
2991 if (reverse) {
2992 zzlPrev(zl,&eptr,&sptr);
2993 } else {
2994 zzlNext(zl,&eptr,&sptr);
2995 }
2996 }
2997 } else if (zobj->encoding == OBJ_ENCODING_SKIPLIST) {
2998 zset *zs = zobj->ptr;
2999 zskiplist *zsl = zs->zsl;
3000 zskiplistNode *ln;
3001
3002 /* If reversed, get the last node in range as starting point. */
3003 if (reverse) {
3004 ln = zslLastInLexRange(zsl,&range);
3005 } else {
3006 ln = zslFirstInLexRange(zsl,&range);
3007 }
3008
3009 /* No "first" element in the specified interval. */
3010 if (ln == NULL) {
3011 addReply(c, shared.emptymultibulk);
3012 zslFreeLexRange(&range);
3013 return;
3014 }
3015
3016 /* We don't know in advance how many matching elements there are in the
3017 * list, so we push this object that will represent the multi-bulk
3018 * length in the output buffer, and will "fix" it later */
3019 replylen = addDeferredMultiBulkLength(c);
3020
3021 /* If there is an offset, just traverse the number of elements without
3022 * checking the score because that is done in the next loop. */
3023 while (ln && offset--) {
3024 if (reverse) {
3025 ln = ln->backward;
3026 } else {
3027 ln = ln->level[0].forward;
3028 }
3029 }
3030
3031 while (ln && limit--) {
3032 /* Abort when the node is no longer in range. */
3033 if (reverse) {
3034 if (!zslLexValueGteMin(ln->ele,&range)) break;
3035 } else {
3036 if (!zslLexValueLteMax(ln->ele,&range)) break;
3037 }
3038
3039 rangelen++;
3040 addReplyBulkCBuffer(c,ln->ele,sdslen(ln->ele));
3041
3042 /* Move to next node */
3043 if (reverse) {
3044 ln = ln->backward;
3045 } else {
3046 ln = ln->level[0].forward;
3047 }
3048 }
3049 } else {
3050 serverPanic("Unknown sorted set encoding");
3051 }
3052
3053 zslFreeLexRange(&range);
3054 setDeferredMultiBulkLength(c, replylen, rangelen);
3055 }
3056
/* ZRANGEBYLEX entry point: runs the generic lex-range implementation with
 * the 'reverse' flag cleared. */
void zrangebylexCommand(client *c) {
    const int reverse = 0;

    genericZrangebylexCommand(c,reverse);
}
3060
/* ZREVRANGEBYLEX entry point: runs the generic lex-range implementation with
 * the 'reverse' flag set. */
void zrevrangebylexCommand(client *c) {
    const int reverse = 1;

    genericZrangebylexCommand(c,reverse);
}
3064
/* Reply with the length of the sorted set stored at argv[1], as returned by
 * zsetLength(). lookupKeyReadOrReply() is handed shared.czero as the reply to
 * use when it returns NULL; a value of the wrong type is handled by
 * checkType(). In both cases nothing else is sent. */
void zcardCommand(client *c) {
    robj *zobj = lookupKeyReadOrReply(c,c->argv[1],shared.czero);

    if (zobj == NULL) return;
    if (checkType(c,zobj,OBJ_ZSET)) return;

    addReplyLongLong(c,zsetLength(zobj));
}
3074
/* Reply with the score of the member argv[2] inside the sorted set stored at
 * argv[1]. A null bulk is sent when zsetScore() reports C_ERR for the member;
 * lookupKeyReadOrReply() is given the same null bulk to use when it returns
 * NULL for the key. */
void zscoreCommand(client *c) {
    double score;
    robj *zobj = lookupKeyReadOrReply(c,c->argv[1],shared.nullbulk);

    if (zobj == NULL) return;
    if (checkType(c,zobj,OBJ_ZSET)) return;

    if (zsetScore(zobj,c->argv[2]->ptr,&score) != C_ERR) {
        addReplyDouble(c,score);
        return;
    }
    addReply(c,shared.nullbulk);
}
3089
/* Common body of the rank commands: reply with the value returned by
 * zsetRank() for the member argv[2] of the sorted set stored at argv[1].
 * 'reverse' is forwarded untouched to zsetRank(). A negative result from
 * zsetRank() is reported to the client as a null bulk. */
void zrankGenericCommand(client *c, int reverse) {
    robj *member = c->argv[2];
    robj *zobj = lookupKeyReadOrReply(c,c->argv[1],shared.nullbulk);
    long rank;

    if (zobj == NULL) return;
    if (checkType(c,zobj,OBJ_ZSET)) return;

    /* zsetRank() is handed member->ptr directly, so the argument object
     * must be sds encoded. */
    serverAssertWithInfo(c,member,sdsEncodedObject(member));

    rank = zsetRank(zobj,member->ptr,reverse);
    if (rank < 0) {
        addReply(c,shared.nullbulk);
        return;
    }
    addReplyLongLong(c,rank);
}
3107
/* ZRANK entry point: generic rank implementation, 'reverse' flag cleared. */
void zrankCommand(client *c) {
    const int reverse = 0;

    zrankGenericCommand(c,reverse);
}
3111
/* ZREVRANK entry point: generic rank implementation, 'reverse' flag set. */
void zrevrankCommand(client *c) {
    const int reverse = 1;

    zrankGenericCommand(c,reverse);
}
3115
/* Scan the sorted set stored at argv[1] starting from the cursor given in
 * argv[2]. The cursor is validated before the key is looked up; the actual
 * iteration is delegated to scanGenericCommand(). */
void zscanCommand(client *c) {
    unsigned long cursor;
    robj *zobj;

    if (parseScanCursorOrReply(c,c->argv[2],&cursor) == C_ERR) return;

    zobj = lookupKeyReadOrReply(c,c->argv[1],shared.emptyscan);
    if (zobj == NULL) return;
    if (checkType(c,zobj,OBJ_ZSET)) return;

    scanGenericCommand(c,zobj,cursor);
}
3125
3126 /* This command implements the generic zpop operation, used by:
3127 * ZPOPMIN, ZPOPMAX, BZPOPMIN and BZPOPMAX. This function is also used
3128 * inside blocked.c in the unblocking stage of BZPOPMIN and BZPOPMAX.
3129 *
3130 * If 'emitkey' is true also the key name is emitted, useful for the blocking
3131 * behavior of BZPOP[MIN|MAX], since we can block into multiple keys.
3132 *
3133 * The synchronous version instead does not need to emit the key, but may
3134 * use the 'count' argument to return multiple items if available. */
genericZpopCommand(client * c,robj ** keyv,int keyc,int where,int emitkey,robj * countarg)3135 void genericZpopCommand(client *c, robj **keyv, int keyc, int where, int emitkey, robj *countarg) {
3136 int idx;
3137 robj *key = NULL;
3138 robj *zobj = NULL;
3139 sds ele;
3140 double score;
3141 long count = 1;
3142
3143 /* If a count argument as passed, parse it or return an error. */
3144 if (countarg) {
3145 if (getLongFromObjectOrReply(c,countarg,&count,NULL) != C_OK)
3146 return;
3147 if (count <= 0) {
3148 addReply(c,shared.emptymultibulk);
3149 return;
3150 }
3151 }
3152
3153 /* Check type and break on the first error, otherwise identify candidate. */
3154 idx = 0;
3155 while (idx < keyc) {
3156 key = keyv[idx++];
3157 zobj = lookupKeyWrite(c->db,key);
3158 if (!zobj) continue;
3159 if (checkType(c,zobj,OBJ_ZSET)) return;
3160 break;
3161 }
3162
3163 /* No candidate for zpopping, return empty. */
3164 if (!zobj) {
3165 addReply(c,shared.emptymultibulk);
3166 return;
3167 }
3168
3169 void *arraylen_ptr = addDeferredMultiBulkLength(c);
3170 long arraylen = 0;
3171
3172 /* We emit the key only for the blocking variant. */
3173 if (emitkey) addReplyBulk(c,key);
3174
3175 /* Remove the element. */
3176 do {
3177 if (zobj->encoding == OBJ_ENCODING_ZIPLIST) {
3178 unsigned char *zl = zobj->ptr;
3179 unsigned char *eptr, *sptr;
3180 unsigned char *vstr;
3181 unsigned int vlen;
3182 long long vlong;
3183
3184 /* Get the first or last element in the sorted set. */
3185 eptr = ziplistIndex(zl,where == ZSET_MAX ? -2 : 0);
3186 serverAssertWithInfo(c,zobj,eptr != NULL);
3187 serverAssertWithInfo(c,zobj,ziplistGet(eptr,&vstr,&vlen,&vlong));
3188 if (vstr == NULL)
3189 ele = sdsfromlonglong(vlong);
3190 else
3191 ele = sdsnewlen(vstr,vlen);
3192
3193 /* Get the score. */
3194 sptr = ziplistNext(zl,eptr);
3195 serverAssertWithInfo(c,zobj,sptr != NULL);
3196 score = zzlGetScore(sptr);
3197 } else if (zobj->encoding == OBJ_ENCODING_SKIPLIST) {
3198 zset *zs = zobj->ptr;
3199 zskiplist *zsl = zs->zsl;
3200 zskiplistNode *zln;
3201
3202 /* Get the first or last element in the sorted set. */
3203 zln = (where == ZSET_MAX ? zsl->tail :
3204 zsl->header->level[0].forward);
3205
3206 /* There must be an element in the sorted set. */
3207 serverAssertWithInfo(c,zobj,zln != NULL);
3208 ele = sdsdup(zln->ele);
3209 score = zln->score;
3210 } else {
3211 serverPanic("Unknown sorted set encoding");
3212 }
3213
3214 serverAssertWithInfo(c,zobj,zsetDel(zobj,ele));
3215 server.dirty++;
3216
3217 if (arraylen == 0) { /* Do this only for the first iteration. */
3218 char *events[2] = {"zpopmin","zpopmax"};
3219 notifyKeyspaceEvent(NOTIFY_ZSET,events[where],key,c->db->id);
3220 signalModifiedKey(c->db,key);
3221 }
3222
3223 addReplyBulkCBuffer(c,ele,sdslen(ele));
3224 addReplyDouble(c,score);
3225 sdsfree(ele);
3226 arraylen += 2;
3227
3228 /* Remove the key, if indeed needed. */
3229 if (zsetLength(zobj) == 0) {
3230 dbDelete(c->db,key);
3231 notifyKeyspaceEvent(NOTIFY_GENERIC,"del",key,c->db->id);
3232 break;
3233 }
3234 } while(--count);
3235
3236 setDeferredMultiBulkLength(c,arraylen_ptr,arraylen + (emitkey != 0));
3237 }
3238
3239 /* ZPOPMIN key [<count>] */
zpopminCommand(client * c)3240 void zpopminCommand(client *c) {
3241 if (c->argc > 3) {
3242 addReply(c,shared.syntaxerr);
3243 return;
3244 }
3245 genericZpopCommand(c,&c->argv[1],1,ZSET_MIN,0,
3246 c->argc == 3 ? c->argv[2] : NULL);
3247 }
3248
/* ZPOPMAX key [<count>] */
void zpopmaxCommand(client *c) {
    robj *countarg = NULL;

    /* At most one optional argument (the count) may follow the key. */
    if (c->argc > 3) {
        addReply(c,shared.syntaxerr);
        return;
    }
    if (c->argc == 3) countarg = c->argv[2];

    /* Single key, pop from the max end, do not emit the key name. */
    genericZpopCommand(c,&c->argv[1],1,ZSET_MAX,0,countarg);
}
3258
3259 /* BZPOPMIN / BZPOPMAX actual implementation. */
blockingGenericZpopCommand(client * c,int where)3260 void blockingGenericZpopCommand(client *c, int where) {
3261 robj *o;
3262 mstime_t timeout;
3263 int j;
3264
3265 if (getTimeoutFromObjectOrReply(c,c->argv[c->argc-1],&timeout,UNIT_SECONDS)
3266 != C_OK) return;
3267
3268 for (j = 1; j < c->argc-1; j++) {
3269 o = lookupKeyWrite(c->db,c->argv[j]);
3270 if (o != NULL) {
3271 if (o->type != OBJ_ZSET) {
3272 addReply(c,shared.wrongtypeerr);
3273 return;
3274 } else {
3275 if (zsetLength(o) != 0) {
3276 /* Non empty zset, this is like a normal ZPOP[MIN|MAX]. */
3277 genericZpopCommand(c,&c->argv[j],1,where,1,NULL);
3278 /* Replicate it as an ZPOP[MIN|MAX] instead of BZPOP[MIN|MAX]. */
3279 rewriteClientCommandVector(c,2,
3280 where == ZSET_MAX ? shared.zpopmax : shared.zpopmin,
3281 c->argv[j]);
3282 return;
3283 }
3284 }
3285 }
3286 }
3287
3288 /* If we are inside a MULTI/EXEC and the zset is empty the only thing
3289 * we can do is treating it as a timeout (even with timeout 0). */
3290 if (c->flags & CLIENT_MULTI) {
3291 addReply(c,shared.nullmultibulk);
3292 return;
3293 }
3294
3295 /* If the keys do not exist we must block */
3296 blockForKeys(c,BLOCKED_ZSET,c->argv + 1,c->argc - 2,timeout,NULL,NULL);
3297 }
3298
3299 // BZPOPMIN key [key ...] timeout
bzpopminCommand(client * c)3300 void bzpopminCommand(client *c) {
3301 blockingGenericZpopCommand(c,ZSET_MIN);
3302 }
3303
3304 // BZPOPMAX key [key ...] timeout
bzpopmaxCommand(client * c)3305 void bzpopmaxCommand(client *c) {
3306 blockingGenericZpopCommand(c,ZSET_MAX);
3307 }
3308