xref: /f-stack/app/redis-5.0.5/src/t_zset.c (revision 572c4311)
1 /*
2  * Copyright (c) 2009-2012, Salvatore Sanfilippo <antirez at gmail dot com>
3  * Copyright (c) 2009-2012, Pieter Noordhuis <pcnoordhuis at gmail dot com>
4  * All rights reserved.
5  *
6  * Redistribution and use in source and binary forms, with or without
7  * modification, are permitted provided that the following conditions are met:
8  *
9  *   * Redistributions of source code must retain the above copyright notice,
10  *     this list of conditions and the following disclaimer.
11  *   * Redistributions in binary form must reproduce the above copyright
12  *     notice, this list of conditions and the following disclaimer in the
13  *     documentation and/or other materials provided with the distribution.
14  *   * Neither the name of Redis nor the names of its contributors may be used
15  *     to endorse or promote products derived from this software without
16  *     specific prior written permission.
17  *
18  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
19  * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
20  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
21  * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
22  * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
23  * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
24  * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
25  * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
26  * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
27  * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
28  * POSSIBILITY OF SUCH DAMAGE.
29  */
30 
31 /*-----------------------------------------------------------------------------
32  * Sorted set API
33  *----------------------------------------------------------------------------*/
34 
35 /* ZSETs are ordered sets using two data structures to hold the same elements
36  * in order to get O(log(N)) INSERT and REMOVE operations into a sorted
37  * data structure.
38  *
39  * The elements are added to a hash table mapping Redis objects to scores.
40  * At the same time the elements are added to a skip list mapping scores
41  * to Redis objects (so objects are sorted by scores in this "view").
42  *
43  * Note that the SDS string representing the element is the same in both
44  * the hash table and skiplist in order to save memory. What we do in order
45  * to manage the shared SDS string more easily is to free the SDS string
46  * only in zslFreeNode(). The dictionary has no value free method set.
47  * So we should always remove an element from the dictionary, and later from
48  * the skiplist.
49  *
50  * This skiplist implementation is almost a C translation of the original
51  * algorithm described by William Pugh in "Skip Lists: A Probabilistic
52  * Alternative to Balanced Trees", modified in three ways:
53  * a) this implementation allows for repeated scores.
54  * b) the comparison is not just by key (our 'score') but by satellite data.
55  * c) there is a back pointer, so it's a doubly linked list with the back
56  * pointers being only at "level 1". This allows traversing the list
57  * from tail to head, which is useful for ZREVRANGE. */
58 
59 #include "server.h"
60 #include <math.h>
61 
62 /*-----------------------------------------------------------------------------
63  * Skiplist implementation of the low level API
64  *----------------------------------------------------------------------------*/
65 
/* Forward declarations: both helpers are defined further down in this file
 * but are already called by zslDeleteRangeByLex() before that point. */
66 int zslLexValueGteMin(sds value, zlexrangespec *spec);
67 int zslLexValueLteMax(sds value, zlexrangespec *spec);
68 
69 /* Create a skiplist node with the specified number of levels.
70  * The SDS string 'ele' is referenced by the node after the call. */
zslCreateNode(int level,double score,sds ele)71 zskiplistNode *zslCreateNode(int level, double score, sds ele) {
72     zskiplistNode *zn =
73         zmalloc(sizeof(*zn)+level*sizeof(struct zskiplistLevel));
74     zn->score = score;
75     zn->ele = ele;
76     return zn;
77 }
78 
79 /* Create a new skiplist. */
zslCreate(void)80 zskiplist *zslCreate(void) {
81     int j;
82     zskiplist *zsl;
83 
84     zsl = zmalloc(sizeof(*zsl));
85     zsl->level = 1;
86     zsl->length = 0;
87     zsl->header = zslCreateNode(ZSKIPLIST_MAXLEVEL,0,NULL);
88     for (j = 0; j < ZSKIPLIST_MAXLEVEL; j++) {
89         zsl->header->level[j].forward = NULL;
90         zsl->header->level[j].span = 0;
91     }
92     zsl->header->backward = NULL;
93     zsl->tail = NULL;
94     return zsl;
95 }
96 
97 /* Free the specified skiplist node. The referenced SDS string representation
98  * of the element is freed too, unless node->ele is set to NULL before calling
99  * this function. */
zslFreeNode(zskiplistNode * node)100 void zslFreeNode(zskiplistNode *node) {
101     sdsfree(node->ele);
102     zfree(node);
103 }
104 
105 /* Free a whole skiplist. */
zslFree(zskiplist * zsl)106 void zslFree(zskiplist *zsl) {
107     zskiplistNode *node = zsl->header->level[0].forward, *next;
108 
109     zfree(zsl->header);
110     while(node) {
111         next = node->level[0].forward;
112         zslFreeNode(node);
113         node = next;
114     }
115     zfree(zsl);
116 }
117 
118 /* Returns a random level for the new skiplist node we are going to create.
119  * The return value of this function is between 1 and ZSKIPLIST_MAXLEVEL
120  * (both inclusive), with a powerlaw-alike distribution where higher
121  * levels are less likely to be returned. */
zslRandomLevel(void)122 int zslRandomLevel(void) {
123     int level = 1;
124     while ((random()&0xFFFF) < (ZSKIPLIST_P * 0xFFFF))
125         level += 1;
126     return (level<ZSKIPLIST_MAXLEVEL) ? level : ZSKIPLIST_MAXLEVEL;
127 }
128 
129 /* Insert a new node in the skiplist. Assumes the element does not already
130  * exist (up to the caller to enforce that). The skiplist takes ownership
131  * of the passed SDS string 'ele'. */
zslInsert(zskiplist * zsl,double score,sds ele)132 zskiplistNode *zslInsert(zskiplist *zsl, double score, sds ele) {
133     zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
134     unsigned int rank[ZSKIPLIST_MAXLEVEL];
135     int i, level;
136 
137     serverAssert(!isnan(score));
138     x = zsl->header;
139     for (i = zsl->level-1; i >= 0; i--) {
140         /* store rank that is crossed to reach the insert position */
141         rank[i] = i == (zsl->level-1) ? 0 : rank[i+1];
142         while (x->level[i].forward &&
143                 (x->level[i].forward->score < score ||
144                     (x->level[i].forward->score == score &&
145                     sdscmp(x->level[i].forward->ele,ele) < 0)))
146         {
147             rank[i] += x->level[i].span;
148             x = x->level[i].forward;
149         }
150         update[i] = x;
151     }
152     /* we assume the element is not already inside, since we allow duplicated
153      * scores, reinserting the same element should never happen since the
154      * caller of zslInsert() should test in the hash table if the element is
155      * already inside or not. */
156     level = zslRandomLevel();
157     if (level > zsl->level) {
158         for (i = zsl->level; i < level; i++) {
159             rank[i] = 0;
160             update[i] = zsl->header;
161             update[i]->level[i].span = zsl->length;
162         }
163         zsl->level = level;
164     }
165     x = zslCreateNode(level,score,ele);
166     for (i = 0; i < level; i++) {
167         x->level[i].forward = update[i]->level[i].forward;
168         update[i]->level[i].forward = x;
169 
170         /* update span covered by update[i] as x is inserted here */
171         x->level[i].span = update[i]->level[i].span - (rank[0] - rank[i]);
172         update[i]->level[i].span = (rank[0] - rank[i]) + 1;
173     }
174 
175     /* increment span for untouched levels */
176     for (i = level; i < zsl->level; i++) {
177         update[i]->level[i].span++;
178     }
179 
180     x->backward = (update[0] == zsl->header) ? NULL : update[0];
181     if (x->level[0].forward)
182         x->level[0].forward->backward = x;
183     else
184         zsl->tail = x;
185     zsl->length++;
186     return x;
187 }
188 
189 /* Internal function used by zslDelete, zslDeleteByScore and zslDeleteByRank */
zslDeleteNode(zskiplist * zsl,zskiplistNode * x,zskiplistNode ** update)190 void zslDeleteNode(zskiplist *zsl, zskiplistNode *x, zskiplistNode **update) {
191     int i;
192     for (i = 0; i < zsl->level; i++) {
193         if (update[i]->level[i].forward == x) {
194             update[i]->level[i].span += x->level[i].span - 1;
195             update[i]->level[i].forward = x->level[i].forward;
196         } else {
197             update[i]->level[i].span -= 1;
198         }
199     }
200     if (x->level[0].forward) {
201         x->level[0].forward->backward = x->backward;
202     } else {
203         zsl->tail = x->backward;
204     }
205     while(zsl->level > 1 && zsl->header->level[zsl->level-1].forward == NULL)
206         zsl->level--;
207     zsl->length--;
208 }
209 
210 /* Delete an element with matching score/element from the skiplist.
211  * The function returns 1 if the node was found and deleted, otherwise
212  * 0 is returned.
213  *
214  * If 'node' is NULL the deleted node is freed by zslFreeNode(), otherwise
215  * it is not freed (but just unlinked) and *node is set to the node pointer,
216  * so that it is possible for the caller to reuse the node (including the
217  * referenced SDS string at node->ele). */
zslDelete(zskiplist * zsl,double score,sds ele,zskiplistNode ** node)218 int zslDelete(zskiplist *zsl, double score, sds ele, zskiplistNode **node) {
219     zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
220     int i;
221 
222     x = zsl->header;
223     for (i = zsl->level-1; i >= 0; i--) {
224         while (x->level[i].forward &&
225                 (x->level[i].forward->score < score ||
226                     (x->level[i].forward->score == score &&
227                      sdscmp(x->level[i].forward->ele,ele) < 0)))
228         {
229             x = x->level[i].forward;
230         }
231         update[i] = x;
232     }
233     /* We may have multiple elements with the same score, what we need
234      * is to find the element with both the right score and object. */
235     x = x->level[0].forward;
236     if (x && score == x->score && sdscmp(x->ele,ele) == 0) {
237         zslDeleteNode(zsl, x, update);
238         if (!node)
239             zslFreeNode(x);
240         else
241             *node = x;
242         return 1;
243     }
244     return 0; /* not found */
245 }
246 
247 /* Update the score of an elmenent inside the sorted set skiplist.
248  * Note that the element must exist and must match 'score'.
249  * This function does not update the score in the hash table side, the
250  * caller should take care of it.
251  *
252  * Note that this function attempts to just update the node, in case after
253  * the score update, the node would be exactly at the same position.
254  * Otherwise the skiplist is modified by removing and re-adding a new
255  * element, which is more costly.
256  *
257  * The function returns the updated element skiplist node pointer. */
zslUpdateScore(zskiplist * zsl,double curscore,sds ele,double newscore)258 zskiplistNode *zslUpdateScore(zskiplist *zsl, double curscore, sds ele, double newscore) {
259     zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
260     int i;
261 
262     /* We need to seek to element to update to start: this is useful anyway,
263      * we'll have to update or remove it. */
264     x = zsl->header;
265     for (i = zsl->level-1; i >= 0; i--) {
266         while (x->level[i].forward &&
267                 (x->level[i].forward->score < curscore ||
268                     (x->level[i].forward->score == curscore &&
269                      sdscmp(x->level[i].forward->ele,ele) < 0)))
270         {
271             x = x->level[i].forward;
272         }
273         update[i] = x;
274     }
275 
276     /* Jump to our element: note that this function assumes that the
277      * element with the matching score exists. */
278     x = x->level[0].forward;
279     serverAssert(x && curscore == x->score && sdscmp(x->ele,ele) == 0);
280 
281     /* If the node, after the score update, would be still exactly
282      * at the same position, we can just update the score without
283      * actually removing and re-inserting the element in the skiplist. */
284     if ((x->backward == NULL || x->backward->score < newscore) &&
285         (x->level[0].forward == NULL || x->level[0].forward->score > newscore))
286     {
287         x->score = newscore;
288         return x;
289     }
290 
291     /* No way to reuse the old node: we need to remove and insert a new
292      * one at a different place. */
293     zslDeleteNode(zsl, x, update);
294     zskiplistNode *newnode = zslInsert(zsl,newscore,x->ele);
295     /* We reused the old node x->ele SDS string, free the node now
296      * since zslInsert created a new one. */
297     x->ele = NULL;
298     zslFreeNode(x);
299     return newnode;
300 }
301 
/* Return non-zero if 'value' satisfies the lower bound of 'spec':
 * value > min when the bound is exclusive (minex set), value >= min
 * otherwise. */
int zslValueGteMin(double value, zrangespec *spec) {
    if (spec->minex) return value > spec->min;
    return value >= spec->min;
}
305 
/* Return non-zero if 'value' satisfies the upper bound of 'spec':
 * value < max when the bound is exclusive (maxex set), value <= max
 * otherwise. */
int zslValueLteMax(double value, zrangespec *spec) {
    if (spec->maxex) return value < spec->max;
    return value <= spec->max;
}
309 
310 /* Returns if there is a part of the zset is in range. */
zslIsInRange(zskiplist * zsl,zrangespec * range)311 int zslIsInRange(zskiplist *zsl, zrangespec *range) {
312     zskiplistNode *x;
313 
314     /* Test for ranges that will always be empty. */
315     if (range->min > range->max ||
316             (range->min == range->max && (range->minex || range->maxex)))
317         return 0;
318     x = zsl->tail;
319     if (x == NULL || !zslValueGteMin(x->score,range))
320         return 0;
321     x = zsl->header->level[0].forward;
322     if (x == NULL || !zslValueLteMax(x->score,range))
323         return 0;
324     return 1;
325 }
326 
327 /* Find the first node that is contained in the specified range.
328  * Returns NULL when no element is contained in the range. */
zslFirstInRange(zskiplist * zsl,zrangespec * range)329 zskiplistNode *zslFirstInRange(zskiplist *zsl, zrangespec *range) {
330     zskiplistNode *x;
331     int i;
332 
333     /* If everything is out of range, return early. */
334     if (!zslIsInRange(zsl,range)) return NULL;
335 
336     x = zsl->header;
337     for (i = zsl->level-1; i >= 0; i--) {
338         /* Go forward while *OUT* of range. */
339         while (x->level[i].forward &&
340             !zslValueGteMin(x->level[i].forward->score,range))
341                 x = x->level[i].forward;
342     }
343 
344     /* This is an inner range, so the next node cannot be NULL. */
345     x = x->level[0].forward;
346     serverAssert(x != NULL);
347 
348     /* Check if score <= max. */
349     if (!zslValueLteMax(x->score,range)) return NULL;
350     return x;
351 }
352 
353 /* Find the last node that is contained in the specified range.
354  * Returns NULL when no element is contained in the range. */
zslLastInRange(zskiplist * zsl,zrangespec * range)355 zskiplistNode *zslLastInRange(zskiplist *zsl, zrangespec *range) {
356     zskiplistNode *x;
357     int i;
358 
359     /* If everything is out of range, return early. */
360     if (!zslIsInRange(zsl,range)) return NULL;
361 
362     x = zsl->header;
363     for (i = zsl->level-1; i >= 0; i--) {
364         /* Go forward while *IN* range. */
365         while (x->level[i].forward &&
366             zslValueLteMax(x->level[i].forward->score,range))
367                 x = x->level[i].forward;
368     }
369 
370     /* This is an inner range, so this node cannot be NULL. */
371     serverAssert(x != NULL);
372 
373     /* Check if score >= min. */
374     if (!zslValueGteMin(x->score,range)) return NULL;
375     return x;
376 }
377 
378 /* Delete all the elements with score between min and max from the skiplist.
379  * Min and max are inclusive, so a score >= min || score <= max is deleted.
380  * Note that this function takes the reference to the hash table view of the
381  * sorted set, in order to remove the elements from the hash table too. */
zslDeleteRangeByScore(zskiplist * zsl,zrangespec * range,dict * dict)382 unsigned long zslDeleteRangeByScore(zskiplist *zsl, zrangespec *range, dict *dict) {
383     zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
384     unsigned long removed = 0;
385     int i;
386 
387     x = zsl->header;
388     for (i = zsl->level-1; i >= 0; i--) {
389         while (x->level[i].forward && (range->minex ?
390             x->level[i].forward->score <= range->min :
391             x->level[i].forward->score < range->min))
392                 x = x->level[i].forward;
393         update[i] = x;
394     }
395 
396     /* Current node is the last with score < or <= min. */
397     x = x->level[0].forward;
398 
399     /* Delete nodes while in range. */
400     while (x &&
401            (range->maxex ? x->score < range->max : x->score <= range->max))
402     {
403         zskiplistNode *next = x->level[0].forward;
404         zslDeleteNode(zsl,x,update);
405         dictDelete(dict,x->ele);
406         zslFreeNode(x); /* Here is where x->ele is actually released. */
407         removed++;
408         x = next;
409     }
410     return removed;
411 }
412 
/* Delete from the skiplist every element that is inside the lexicographic
 * range 'range' and return how many elements were removed. Every deleted
 * element is also removed from 'dict'.
 *
 * Only the element strings are compared here; scores are not looked at. */
unsigned long zslDeleteRangeByLex(zskiplist *zsl, zlexrangespec *range, dict *dict) {
    zskiplistNode *update[ZSKIPLIST_MAXLEVEL];
    zskiplistNode *cur = zsl->header;
    unsigned long removed = 0;
    int lvl = zsl->level;

    /* Find, per level, the last node still before the lower lex bound. */
    while (lvl-- > 0) {
        zskiplistNode *fwd;

        while ((fwd = cur->level[lvl].forward) != NULL &&
               !zslLexValueGteMin(fwd->ele,range))
            cur = fwd;
        update[lvl] = cur;
    }

    /* Step onto the first candidate and delete while within the upper
     * lex bound. */
    for (cur = cur->level[0].forward;
         cur != NULL && zslLexValueLteMax(cur->ele,range); )
    {
        zskiplistNode *next = cur->level[0].forward;

        zslDeleteNode(zsl,cur,update);
        dictDelete(dict,cur->ele);
        zslFreeNode(cur); /* Here is where cur->ele is actually released. */
        removed++;
        cur = next;
    }
    return removed;
}
441 
442 /* Delete all the elements with rank between start and end from the skiplist.
443  * Start and end are inclusive. Note that start and end need to be 1-based */
zslDeleteRangeByRank(zskiplist * zsl,unsigned int start,unsigned int end,dict * dict)444 unsigned long zslDeleteRangeByRank(zskiplist *zsl, unsigned int start, unsigned int end, dict *dict) {
445     zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
446     unsigned long traversed = 0, removed = 0;
447     int i;
448 
449     x = zsl->header;
450     for (i = zsl->level-1; i >= 0; i--) {
451         while (x->level[i].forward && (traversed + x->level[i].span) < start) {
452             traversed += x->level[i].span;
453             x = x->level[i].forward;
454         }
455         update[i] = x;
456     }
457 
458     traversed++;
459     x = x->level[0].forward;
460     while (x && traversed <= end) {
461         zskiplistNode *next = x->level[0].forward;
462         zslDeleteNode(zsl,x,update);
463         dictDelete(dict,x->ele);
464         zslFreeNode(x);
465         removed++;
466         traversed++;
467         x = next;
468     }
469     return removed;
470 }
471 
472 /* Find the rank for an element by both score and key.
473  * Returns 0 when the element cannot be found, rank otherwise.
474  * Note that the rank is 1-based due to the span of zsl->header to the
475  * first element. */
zslGetRank(zskiplist * zsl,double score,sds ele)476 unsigned long zslGetRank(zskiplist *zsl, double score, sds ele) {
477     zskiplistNode *x;
478     unsigned long rank = 0;
479     int i;
480 
481     x = zsl->header;
482     for (i = zsl->level-1; i >= 0; i--) {
483         while (x->level[i].forward &&
484             (x->level[i].forward->score < score ||
485                 (x->level[i].forward->score == score &&
486                 sdscmp(x->level[i].forward->ele,ele) <= 0))) {
487             rank += x->level[i].span;
488             x = x->level[i].forward;
489         }
490 
491         /* x might be equal to zsl->header, so test if obj is non-NULL */
492         if (x->ele && sdscmp(x->ele,ele) == 0) {
493             return rank;
494         }
495     }
496     return 0;
497 }
498 
499 /* Finds an element by its rank. The rank argument needs to be 1-based. */
zslGetElementByRank(zskiplist * zsl,unsigned long rank)500 zskiplistNode* zslGetElementByRank(zskiplist *zsl, unsigned long rank) {
501     zskiplistNode *x;
502     unsigned long traversed = 0;
503     int i;
504 
505     x = zsl->header;
506     for (i = zsl->level-1; i >= 0; i--) {
507         while (x->level[i].forward && (traversed + x->level[i].span) <= rank)
508         {
509             traversed += x->level[i].span;
510             x = x->level[i].forward;
511         }
512         if (traversed == rank) {
513             return x;
514         }
515     }
516     return NULL;
517 }
518 
519 /* Populate the rangespec according to the objects min and max. */
zslParseRange(robj * min,robj * max,zrangespec * spec)520 static int zslParseRange(robj *min, robj *max, zrangespec *spec) {
521     char *eptr;
522     spec->minex = spec->maxex = 0;
523 
524     /* Parse the min-max interval. If one of the values is prefixed
525      * by the "(" character, it's considered "open". For instance
526      * ZRANGEBYSCORE zset (1.5 (2.5 will match min < x < max
527      * ZRANGEBYSCORE zset 1.5 2.5 will instead match min <= x <= max */
528     if (min->encoding == OBJ_ENCODING_INT) {
529         spec->min = (long)min->ptr;
530     } else {
531         if (((char*)min->ptr)[0] == '(') {
532             spec->min = strtod((char*)min->ptr+1,&eptr);
533             if (eptr[0] != '\0' || isnan(spec->min)) return C_ERR;
534             spec->minex = 1;
535         } else {
536             spec->min = strtod((char*)min->ptr,&eptr);
537             if (eptr[0] != '\0' || isnan(spec->min)) return C_ERR;
538         }
539     }
540     if (max->encoding == OBJ_ENCODING_INT) {
541         spec->max = (long)max->ptr;
542     } else {
543         if (((char*)max->ptr)[0] == '(') {
544             spec->max = strtod((char*)max->ptr+1,&eptr);
545             if (eptr[0] != '\0' || isnan(spec->max)) return C_ERR;
546             spec->maxex = 1;
547         } else {
548             spec->max = strtod((char*)max->ptr,&eptr);
549             if (eptr[0] != '\0' || isnan(spec->max)) return C_ERR;
550         }
551     }
552 
553     return C_OK;
554 }
555 
556 /* ------------------------ Lexicographic ranges ---------------------------- */
557 
558 /* Parse max or min argument of ZRANGEBYLEX.
559   * (foo means foo (open interval)
560   * [foo means foo (closed interval)
561   * - means the min string possible
562   * + means the max string possible
563   *
564   * If the string is valid the *dest pointer is set to the redis object
565   * that will be used for the comparison, and ex will be set to 0 or 1
566   * respectively if the item is exclusive or inclusive. C_OK will be
567   * returned.
568   *
569   * If the string is not a valid range C_ERR is returned, and the value
570   * of *dest and *ex is undefined. */
zslParseLexRangeItem(robj * item,sds * dest,int * ex)571 int zslParseLexRangeItem(robj *item, sds *dest, int *ex) {
572     char *c = item->ptr;
573 
574     switch(c[0]) {
575     case '+':
576         if (c[1] != '\0') return C_ERR;
577         *ex = 1;
578         *dest = shared.maxstring;
579         return C_OK;
580     case '-':
581         if (c[1] != '\0') return C_ERR;
582         *ex = 1;
583         *dest = shared.minstring;
584         return C_OK;
585     case '(':
586         *ex = 1;
587         *dest = sdsnewlen(c+1,sdslen(c)-1);
588         return C_OK;
589     case '[':
590         *ex = 0;
591         *dest = sdsnewlen(c+1,sdslen(c)-1);
592         return C_OK;
593     default:
594         return C_ERR;
595     }
596 }
597 
598 /* Free a lex range structure, must be called only after zelParseLexRange()
599  * populated the structure with success (C_OK returned). */
zslFreeLexRange(zlexrangespec * spec)600 void zslFreeLexRange(zlexrangespec *spec) {
601     if (spec->min != shared.minstring &&
602         spec->min != shared.maxstring) sdsfree(spec->min);
603     if (spec->max != shared.minstring &&
604         spec->max != shared.maxstring) sdsfree(spec->max);
605 }
606 
607 /* Populate the lex rangespec according to the objects min and max.
608  *
609  * Return C_OK on success. On error C_ERR is returned.
610  * When OK is returned the structure must be freed with zslFreeLexRange(),
611  * otherwise no release is needed. */
zslParseLexRange(robj * min,robj * max,zlexrangespec * spec)612 int zslParseLexRange(robj *min, robj *max, zlexrangespec *spec) {
613     /* The range can't be valid if objects are integer encoded.
614      * Every item must start with ( or [. */
615     if (min->encoding == OBJ_ENCODING_INT ||
616         max->encoding == OBJ_ENCODING_INT) return C_ERR;
617 
618     spec->min = spec->max = NULL;
619     if (zslParseLexRangeItem(min, &spec->min, &spec->minex) == C_ERR ||
620         zslParseLexRangeItem(max, &spec->max, &spec->maxex) == C_ERR) {
621         zslFreeLexRange(spec);
622         return C_ERR;
623     } else {
624         return C_OK;
625     }
626 }
627 
628 /* This is just a wrapper to sdscmp() that is able to
629  * handle shared.minstring and shared.maxstring as the equivalent of
630  * -inf and +inf for strings */
sdscmplex(sds a,sds b)631 int sdscmplex(sds a, sds b) {
632     if (a == b) return 0;
633     if (a == shared.minstring || b == shared.maxstring) return -1;
634     if (a == shared.maxstring || b == shared.minstring) return 1;
635     return sdscmp(a,b);
636 }
637 
/* Return non-zero if 'value' satisfies the lower bound of the lex range
 * 'spec': strictly greater than min when the bound is exclusive (minex
 * set), greater than or equal to min otherwise. */
int zslLexValueGteMin(sds value, zlexrangespec *spec) {
    int cmp = sdscmplex(value,spec->min);

    if (spec->minex) return cmp > 0;
    return cmp >= 0;
}
643 
/* Return non-zero if 'value' satisfies the upper bound of the lex range
 * 'spec': strictly less than max when the bound is exclusive (maxex set),
 * less than or equal to max otherwise. */
int zslLexValueLteMax(sds value, zlexrangespec *spec) {
    int cmp = sdscmplex(value,spec->max);

    if (spec->maxex) return cmp < 0;
    return cmp <= 0;
}
649 
650 /* Returns if there is a part of the zset is in the lex range. */
zslIsInLexRange(zskiplist * zsl,zlexrangespec * range)651 int zslIsInLexRange(zskiplist *zsl, zlexrangespec *range) {
652     zskiplistNode *x;
653 
654     /* Test for ranges that will always be empty. */
655     int cmp = sdscmplex(range->min,range->max);
656     if (cmp > 0 || (cmp == 0 && (range->minex || range->maxex)))
657         return 0;
658     x = zsl->tail;
659     if (x == NULL || !zslLexValueGteMin(x->ele,range))
660         return 0;
661     x = zsl->header->level[0].forward;
662     if (x == NULL || !zslLexValueLteMax(x->ele,range))
663         return 0;
664     return 1;
665 }
666 
667 /* Find the first node that is contained in the specified lex range.
668  * Returns NULL when no element is contained in the range. */
zslFirstInLexRange(zskiplist * zsl,zlexrangespec * range)669 zskiplistNode *zslFirstInLexRange(zskiplist *zsl, zlexrangespec *range) {
670     zskiplistNode *x;
671     int i;
672 
673     /* If everything is out of range, return early. */
674     if (!zslIsInLexRange(zsl,range)) return NULL;
675 
676     x = zsl->header;
677     for (i = zsl->level-1; i >= 0; i--) {
678         /* Go forward while *OUT* of range. */
679         while (x->level[i].forward &&
680             !zslLexValueGteMin(x->level[i].forward->ele,range))
681                 x = x->level[i].forward;
682     }
683 
684     /* This is an inner range, so the next node cannot be NULL. */
685     x = x->level[0].forward;
686     serverAssert(x != NULL);
687 
688     /* Check if score <= max. */
689     if (!zslLexValueLteMax(x->ele,range)) return NULL;
690     return x;
691 }
692 
693 /* Find the last node that is contained in the specified range.
694  * Returns NULL when no element is contained in the range. */
zslLastInLexRange(zskiplist * zsl,zlexrangespec * range)695 zskiplistNode *zslLastInLexRange(zskiplist *zsl, zlexrangespec *range) {
696     zskiplistNode *x;
697     int i;
698 
699     /* If everything is out of range, return early. */
700     if (!zslIsInLexRange(zsl,range)) return NULL;
701 
702     x = zsl->header;
703     for (i = zsl->level-1; i >= 0; i--) {
704         /* Go forward while *IN* range. */
705         while (x->level[i].forward &&
706             zslLexValueLteMax(x->level[i].forward->ele,range))
707                 x = x->level[i].forward;
708     }
709 
710     /* This is an inner range, so this node cannot be NULL. */
711     serverAssert(x != NULL);
712 
713     /* Check if score >= min. */
714     if (!zslLexValueGteMin(x->ele,range)) return NULL;
715     return x;
716 }
717 
718 /*-----------------------------------------------------------------------------
719  * Ziplist-backed sorted set API
720  *----------------------------------------------------------------------------*/
721 
/* Return the score stored in the ziplist entry pointed to by 'sptr'.
 *
 * The entry is read with ziplistGet(): when it yields a string the score
 * is parsed with strtod(), otherwise the integer value is converted to
 * double. 'sptr' must not be NULL and must reference a readable entry
 * (both conditions are asserted). */
double zzlGetScore(unsigned char *sptr) {
    unsigned char *vstr;
    unsigned int vlen;
    long long vlong;
    char buf[128];
    double score;

    serverAssert(sptr != NULL);
    serverAssert(ziplistGet(sptr,&vstr,&vlen,&vlong));

    if (vstr) {
        /* strtod() needs a NUL terminated string, so the bytes are copied
         * into a local buffer. Clamp the length first: an oversized entry
         * must never write past the end of 'buf'. */
        if (vlen > sizeof(buf)-1) vlen = sizeof(buf)-1;
        memcpy(buf,vstr,vlen);
        buf[vlen] = '\0';
        score = strtod(buf,NULL);
    } else {
        score = vlong;
    }

    return score;
}
742 
743 /* Return a ziplist element as an SDS string. */
ziplistGetObject(unsigned char * sptr)744 sds ziplistGetObject(unsigned char *sptr) {
745     unsigned char *vstr;
746     unsigned int vlen;
747     long long vlong;
748 
749     serverAssert(sptr != NULL);
750     serverAssert(ziplistGet(sptr,&vstr,&vlen,&vlong));
751 
752     if (vstr) {
753         return sdsnewlen((char*)vstr,vlen);
754     } else {
755         return sdsfromlonglong(vlong);
756     }
757 }
758 
/* Compare the ziplist entry at 'eptr' with the 'clen' bytes at 'cstr'.
 *
 * The common prefix is compared with memcmp(); when it is equal the
 * difference between the two lengths decides. The result is negative,
 * zero or positive, following the memcmp() convention. Integer entries
 * are compared through their decimal string representation. */
int zzlCompareElements(unsigned char *eptr, unsigned char *cstr, unsigned int clen) {
    unsigned char *vstr;
    unsigned int vlen;
    long long vlong;
    unsigned char vbuf[32];
    int shortest, order;

    serverAssert(ziplistGet(eptr,&vstr,&vlen,&vlong));
    if (!vstr) {
        /* Integer entry: render it as a string into the local buffer. */
        vstr = vbuf;
        vlen = ll2string((char*)vbuf,sizeof(vbuf),vlong);
    }

    shortest = (clen > vlen) ? vlen : clen;
    order = memcmp(vstr,cstr,shortest);
    if (order != 0) return order;
    return vlen-clen;
}
779 
/* Return the number of elements of a ziplist-backed sorted set, computed
 * as half the number of ziplist entries (zzlNext() and zzlPrev() walk the
 * list as element/score pairs). */
unsigned int zzlLength(unsigned char *zl)
{
    return ziplistLen(zl) / 2;
}
783 
/* Advance *eptr and *sptr to the next element/score pair. Both are set to
 * NULL when there is no next pair. On entry neither *eptr nor *sptr may
 * be NULL (asserted). */
void zzlNext(unsigned char *zl, unsigned char **eptr, unsigned char **sptr) {
    unsigned char *_eptr, *_sptr;
    serverAssert(*eptr != NULL && *sptr != NULL);

    /* The next element follows the current score. */
    _sptr = NULL;
    _eptr = ziplistNext(zl,*sptr);
    if (_eptr != NULL) {
        /* An element is always followed by its score. */
        _sptr = ziplistNext(zl,_eptr);
        serverAssert(_sptr != NULL);
    }

    *eptr = _eptr;
    *sptr = _sptr;
}
802 
/* Step '*eptr' / '*sptr' back to the element and score of the preceding
 * pair. Both inputs must be non-NULL (asserted). When there is no previous
 * entry both pointers are set to NULL. */
void zzlPrev(unsigned char *zl, unsigned char **eptr, unsigned char **sptr) {
    unsigned char *prev_score;
    unsigned char *prev_ele = NULL;

    serverAssert(*eptr != NULL && *sptr != NULL);

    /* The previous score, if any, sits right before the current element. */
    prev_score = ziplistPrev(zl,*eptr);
    if (prev_score != NULL) {
        /* A score must always be preceded by its element. */
        prev_ele = ziplistPrev(zl,prev_score);
        serverAssert(prev_ele != NULL);
    }

    *eptr = prev_ele;
    *sptr = prev_score;
}
821 
822 /* Returns if there is a part of the zset is in range. Should only be used
823  * internally by zzlFirstInRange and zzlLastInRange. */
zzlIsInRange(unsigned char * zl,zrangespec * range)824 int zzlIsInRange(unsigned char *zl, zrangespec *range) {
825     unsigned char *p;
826     double score;
827 
828     /* Test for ranges that will always be empty. */
829     if (range->min > range->max ||
830             (range->min == range->max && (range->minex || range->maxex)))
831         return 0;
832 
833     p = ziplistIndex(zl,-1); /* Last score. */
834     if (p == NULL) return 0; /* Empty sorted set */
835     score = zzlGetScore(p);
836     if (!zslValueGteMin(score,range))
837         return 0;
838 
839     p = ziplistIndex(zl,1); /* First score. */
840     serverAssert(p != NULL);
841     score = zzlGetScore(p);
842     if (!zslValueLteMax(score,range))
843         return 0;
844 
845     return 1;
846 }
847 
848 /* Find pointer to the first element contained in the specified range.
849  * Returns NULL when no element is contained in the range. */
zzlFirstInRange(unsigned char * zl,zrangespec * range)850 unsigned char *zzlFirstInRange(unsigned char *zl, zrangespec *range) {
851     unsigned char *eptr = ziplistIndex(zl,0), *sptr;
852     double score;
853 
854     /* If everything is out of range, return early. */
855     if (!zzlIsInRange(zl,range)) return NULL;
856 
857     while (eptr != NULL) {
858         sptr = ziplistNext(zl,eptr);
859         serverAssert(sptr != NULL);
860 
861         score = zzlGetScore(sptr);
862         if (zslValueGteMin(score,range)) {
863             /* Check if score <= max. */
864             if (zslValueLteMax(score,range))
865                 return eptr;
866             return NULL;
867         }
868 
869         /* Move to next element. */
870         eptr = ziplistNext(zl,sptr);
871     }
872 
873     return NULL;
874 }
875 
876 /* Find pointer to the last element contained in the specified range.
877  * Returns NULL when no element is contained in the range. */
zzlLastInRange(unsigned char * zl,zrangespec * range)878 unsigned char *zzlLastInRange(unsigned char *zl, zrangespec *range) {
879     unsigned char *eptr = ziplistIndex(zl,-2), *sptr;
880     double score;
881 
882     /* If everything is out of range, return early. */
883     if (!zzlIsInRange(zl,range)) return NULL;
884 
885     while (eptr != NULL) {
886         sptr = ziplistNext(zl,eptr);
887         serverAssert(sptr != NULL);
888 
889         score = zzlGetScore(sptr);
890         if (zslValueLteMax(score,range)) {
891             /* Check if score >= min. */
892             if (zslValueGteMin(score,range))
893                 return eptr;
894             return NULL;
895         }
896 
897         /* Move to previous element by moving to the score of previous element.
898          * When this returns NULL, we know there also is no element. */
899         sptr = ziplistPrev(zl,eptr);
900         if (sptr != NULL)
901             serverAssert((eptr = ziplistPrev(zl,sptr)) != NULL);
902         else
903             eptr = NULL;
904     }
905 
906     return NULL;
907 }
908 
/* Evaluate zslLexValueGteMin() for the element stored at ziplist entry 'p'.
 * The entry is turned into a temporary SDS string that is freed before
 * returning. */
int zzlLexValueGteMin(unsigned char *p, zlexrangespec *spec) {
    sds member = ziplistGetObject(p);
    int ok = zslLexValueGteMin(member,spec);

    sdsfree(member);
    return ok;
}
915 
/* Evaluate zslLexValueLteMax() for the element stored at ziplist entry 'p'.
 * The entry is turned into a temporary SDS string that is freed before
 * returning. */
int zzlLexValueLteMax(unsigned char *p, zlexrangespec *spec) {
    sds member = ziplistGetObject(p);
    int ok = zslLexValueLteMax(member,spec);

    sdsfree(member);
    return ok;
}
922 
923 /* Returns if there is a part of the zset is in range. Should only be used
924  * internally by zzlFirstInRange and zzlLastInRange. */
zzlIsInLexRange(unsigned char * zl,zlexrangespec * range)925 int zzlIsInLexRange(unsigned char *zl, zlexrangespec *range) {
926     unsigned char *p;
927 
928     /* Test for ranges that will always be empty. */
929     int cmp = sdscmplex(range->min,range->max);
930     if (cmp > 0 || (cmp == 0 && (range->minex || range->maxex)))
931         return 0;
932 
933     p = ziplistIndex(zl,-2); /* Last element. */
934     if (p == NULL) return 0;
935     if (!zzlLexValueGteMin(p,range))
936         return 0;
937 
938     p = ziplistIndex(zl,0); /* First element. */
939     serverAssert(p != NULL);
940     if (!zzlLexValueLteMax(p,range))
941         return 0;
942 
943     return 1;
944 }
945 
946 /* Find pointer to the first element contained in the specified lex range.
947  * Returns NULL when no element is contained in the range. */
zzlFirstInLexRange(unsigned char * zl,zlexrangespec * range)948 unsigned char *zzlFirstInLexRange(unsigned char *zl, zlexrangespec *range) {
949     unsigned char *eptr = ziplistIndex(zl,0), *sptr;
950 
951     /* If everything is out of range, return early. */
952     if (!zzlIsInLexRange(zl,range)) return NULL;
953 
954     while (eptr != NULL) {
955         if (zzlLexValueGteMin(eptr,range)) {
956             /* Check if score <= max. */
957             if (zzlLexValueLteMax(eptr,range))
958                 return eptr;
959             return NULL;
960         }
961 
962         /* Move to next element. */
963         sptr = ziplistNext(zl,eptr); /* This element score. Skip it. */
964         serverAssert(sptr != NULL);
965         eptr = ziplistNext(zl,sptr); /* Next element. */
966     }
967 
968     return NULL;
969 }
970 
971 /* Find pointer to the last element contained in the specified lex range.
972  * Returns NULL when no element is contained in the range. */
zzlLastInLexRange(unsigned char * zl,zlexrangespec * range)973 unsigned char *zzlLastInLexRange(unsigned char *zl, zlexrangespec *range) {
974     unsigned char *eptr = ziplistIndex(zl,-2), *sptr;
975 
976     /* If everything is out of range, return early. */
977     if (!zzlIsInLexRange(zl,range)) return NULL;
978 
979     while (eptr != NULL) {
980         if (zzlLexValueLteMax(eptr,range)) {
981             /* Check if score >= min. */
982             if (zzlLexValueGteMin(eptr,range))
983                 return eptr;
984             return NULL;
985         }
986 
987         /* Move to previous element by moving to the score of previous element.
988          * When this returns NULL, we know there also is no element. */
989         sptr = ziplistPrev(zl,eptr);
990         if (sptr != NULL)
991             serverAssert((eptr = ziplistPrev(zl,sptr)) != NULL);
992         else
993             eptr = NULL;
994     }
995 
996     return NULL;
997 }
998 
/* Scan the ziplist pair by pair looking for the element equal to 'ele'.
 *
 * On a match the pointer to the element entry is returned and, when 'score'
 * is not NULL, '*score' receives the element's score. NULL is returned when
 * the element is not found. */
unsigned char *zzlFind(unsigned char *zl, sds ele, double *score) {
    unsigned char *cur, *sc;

    /* The loop step jumps from the current score to the next element. */
    for (cur = ziplistIndex(zl,0); cur != NULL; cur = ziplistNext(zl,sc)) {
        sc = ziplistNext(zl,cur);
        serverAssert(sc != NULL);

        if (!ziplistCompare(cur,(unsigned char*)ele,sdslen(ele))) continue;

        /* Matching element, pull out score. */
        if (score != NULL) *score = zzlGetScore(sc);
        return cur;
    }
    return NULL;
}
1017 
/* Remove the (element,score) pair whose element entry is 'eptr' and return
 * the ziplist pointer resulting from the deletions. A local cursor is handed
 * to ziplistDelete() so the caller's 'eptr' is left untouched. */
unsigned char *zzlDelete(unsigned char *zl, unsigned char *eptr) {
    unsigned char *cursor = eptr;
    int i;

    /* TODO: add function to ziplist API to delete N elements from offset. */
    /* Two deletions at the same cursor: first the element, then its score. */
    for (i = 0; i < 2; i++)
        zl = ziplistDelete(zl,&cursor);
    return zl;
}
1028 
/* Insert the pair ('ele','score') into the ziplist and return the resulting
 * ziplist pointer.
 *
 * When 'eptr' is NULL the pair is pushed at the tail. Otherwise the element
 * is inserted at 'eptr' and the score is inserted at the entry following the
 * newly inserted element. The score is stored in the textual form produced
 * by d2string(). */
unsigned char *zzlInsertAt(unsigned char *zl, unsigned char *eptr, sds ele, double score) {
    char sbuf[128];
    int slen = d2string(sbuf,sizeof(sbuf),score);
    unsigned char *sptr;
    size_t pos;

    if (eptr == NULL) {
        /* Append element and score, in this order, at the tail. */
        zl = ziplistPush(zl,(unsigned char*)ele,sdslen(ele),ZIPLIST_TAIL);
        return ziplistPush(zl,(unsigned char*)sbuf,slen,ZIPLIST_TAIL);
    }

    /* Remember the position as an offset: the insertion may hand back a
     * different ziplist pointer. */
    pos = eptr-zl;
    zl = ziplistInsert(zl,eptr,(unsigned char*)ele,sdslen(ele));
    eptr = zl+pos;

    /* The score goes right after the element just inserted. */
    sptr = ziplistNext(zl,eptr);
    serverAssert(sptr != NULL);
    return ziplistInsert(zl,sptr,(unsigned char*)sbuf,slen);
}
1051 
1052 /* Insert (element,score) pair in ziplist. This function assumes the element is
1053  * not yet present in the list. */
zzlInsert(unsigned char * zl,sds ele,double score)1054 unsigned char *zzlInsert(unsigned char *zl, sds ele, double score) {
1055     unsigned char *eptr = ziplistIndex(zl,0), *sptr;
1056     double s;
1057 
1058     while (eptr != NULL) {
1059         sptr = ziplistNext(zl,eptr);
1060         serverAssert(sptr != NULL);
1061         s = zzlGetScore(sptr);
1062 
1063         if (s > score) {
1064             /* First element with score larger than score for element to be
1065              * inserted. This means we should take its spot in the list to
1066              * maintain ordering. */
1067             zl = zzlInsertAt(zl,eptr,ele,score);
1068             break;
1069         } else if (s == score) {
1070             /* Ensure lexicographical ordering for elements. */
1071             if (zzlCompareElements(eptr,(unsigned char*)ele,sdslen(ele)) > 0) {
1072                 zl = zzlInsertAt(zl,eptr,ele,score);
1073                 break;
1074             }
1075         }
1076 
1077         /* Move to next element. */
1078         eptr = ziplistNext(zl,sptr);
1079     }
1080 
1081     /* Push on tail of list when it was not yet inserted. */
1082     if (eptr == NULL)
1083         zl = zzlInsertAt(zl,NULL,ele,score);
1084     return zl;
1085 }
1086 
/* Delete every pair whose score is inside 'range' and return the resulting
 * ziplist pointer. When 'deleted' is not NULL it receives the number of
 * pairs removed (0 when nothing is in range). */
unsigned char *zzlDeleteRangeByScore(unsigned char *zl, zrangespec *range, unsigned long *deleted) {
    unsigned char *cur, *sc;
    unsigned long removed = 0;

    if (deleted != NULL) *deleted = 0;

    cur = zzlFirstInRange(zl,range);
    if (cur == NULL) return zl;

    for (;;) {
        /* When the tail of the ziplist is deleted, 'cur' will point to the
         * sentinel byte and ziplistNext will return NULL. */
        sc = ziplistNext(zl,cur);
        if (sc == NULL) break;

        /* Stop at the first score past the upper bound. */
        if (!zslValueLteMax(zzlGetScore(sc),range)) break;

        /* Delete both the element and the score. */
        zl = ziplistDelete(zl,&cur);
        zl = ziplistDelete(zl,&cur);
        removed++;
    }

    if (deleted != NULL) *deleted = removed;
    return zl;
}
1115 
/* Delete every pair whose element is inside the lexicographical range
 * 'range' and return the resulting ziplist pointer. When 'deleted' is not
 * NULL it receives the number of pairs removed (0 when nothing is in
 * range). */
unsigned char *zzlDeleteRangeByLex(unsigned char *zl, zlexrangespec *range, unsigned long *deleted) {
    unsigned char *cur;
    unsigned long removed = 0;

    if (deleted != NULL) *deleted = 0;

    cur = zzlFirstInLexRange(zl,range);
    if (cur == NULL) return zl;

    for (;;) {
        /* When the tail of the ziplist is deleted, 'cur' will point to the
         * sentinel byte and ziplistNext will return NULL. */
        if (ziplistNext(zl,cur) == NULL) break;

        /* Stop at the first element past the upper bound. */
        if (!zzlLexValueLteMax(cur,range)) break;

        /* Delete both the element and the score. */
        zl = ziplistDelete(zl,&cur);
        zl = ziplistDelete(zl,&cur);
        removed++;
    }

    if (deleted != NULL) *deleted = removed;
    return zl;
}
1142 
/* Delete all the pairs with rank between 'start' and 'end' from the
 * ziplist. Start and end are inclusive and need to be 1-based. When
 * 'deleted' is not NULL it receives end-start+1, the number of ranks
 * covered. Returns the resulting ziplist pointer. */
unsigned char *zzlDeleteRangeByRank(unsigned char *zl, unsigned int start, unsigned int end, unsigned long *deleted) {
    unsigned int count = end-start+1;

    if (deleted != NULL) *deleted = count;

    /* Each pair takes two ziplist entries, hence the doubled index and
     * doubled count. */
    return ziplistDeleteRange(zl,2*(start-1),2*count);
}
1151 
1152 /*-----------------------------------------------------------------------------
1153  * Common sorted set API
1154  *----------------------------------------------------------------------------*/
1155 
zsetLength(const robj * zobj)1156 unsigned long zsetLength(const robj *zobj) {
1157     unsigned long length = 0;
1158     if (zobj->encoding == OBJ_ENCODING_ZIPLIST) {
1159         length = zzlLength(zobj->ptr);
1160     } else if (zobj->encoding == OBJ_ENCODING_SKIPLIST) {
1161         length = ((const zset*)zobj->ptr)->zsl->length;
1162     } else {
1163         serverPanic("Unknown sorted set encoding");
1164     }
1165     return length;
1166 }
1167 
/* Convert the sorted set 'zobj' to the target 'encoding'. Nothing is done
 * when the object already uses that encoding.
 *
 * ziplist -> skiplist: a new zset (dict + skiplist) is populated with every
 * (element,score) pair found in the ziplist, then the ziplist is freed. An
 * empty ziplist yields an empty zset.
 *
 * skiplist -> ziplist: the skiplist nodes are appended to a new ziplist in
 * skiplist order and released one by one while walking, then the zset is
 * freed.
 *
 * Any other source or target encoding triggers serverPanic(). */
void zsetConvert(robj *zobj, int encoding) {
    zset *zs;
    zskiplistNode *node, *next;
    sds ele;
    double score;

    if (zobj->encoding == encoding) return;
    if (zobj->encoding == OBJ_ENCODING_ZIPLIST) {
        unsigned char *zl = zobj->ptr;
        unsigned char *eptr, *sptr = NULL;
        unsigned char *vstr;
        unsigned int vlen;
        long long vlong;

        if (encoding != OBJ_ENCODING_SKIPLIST)
            serverPanic("Unknown target encoding");

        zs = zmalloc(sizeof(*zs));
        zs->dict = dictCreate(&zsetDictType,NULL);
        zs->zsl = zslCreate();

        /* An empty ziplist is valid input and simply produces an empty
         * skiplist, mirroring the skiplist -> ziplist direction. Only an
         * element that is not followed by its score is treated as a fatal
         * inconsistency. */
        eptr = ziplistIndex(zl,0);
        if (eptr != NULL) {
            sptr = ziplistNext(zl,eptr);
            serverAssertWithInfo(NULL,zobj,sptr != NULL);
        }

        while (eptr != NULL) {
            score = zzlGetScore(sptr);
            serverAssertWithInfo(NULL,zobj,ziplistGet(eptr,&vstr,&vlen,&vlong));
            if (vstr == NULL)
                ele = sdsfromlonglong(vlong);
            else
                ele = sdsnewlen((char*)vstr,vlen);

            /* The dict value points at the score stored inside the skiplist
             * node, so both structures share the same 'ele' and score. */
            node = zslInsert(zs->zsl,score,ele);
            serverAssert(dictAdd(zs->dict,ele,&node->score) == DICT_OK);
            zzlNext(zl,&eptr,&sptr);
        }

        zfree(zobj->ptr);
        zobj->ptr = zs;
        zobj->encoding = OBJ_ENCODING_SKIPLIST;
    } else if (zobj->encoding == OBJ_ENCODING_SKIPLIST) {
        unsigned char *zl = ziplistNew();

        if (encoding != OBJ_ENCODING_ZIPLIST)
            serverPanic("Unknown target encoding");

        /* Approach similar to zslFree(), since we want to free the skiplist at
         * the same time as creating the ziplist. */
        zs = zobj->ptr;
        dictRelease(zs->dict);
        node = zs->zsl->header->level[0].forward;
        zfree(zs->zsl->header);
        zfree(zs->zsl);

        while (node) {
            /* Read the successor before the node is released. */
            zl = zzlInsertAt(zl,NULL,node->ele,node->score);
            next = node->level[0].forward;
            zslFreeNode(node);
            node = next;
        }

        zfree(zs);
        zobj->ptr = zl;
        zobj->encoding = OBJ_ENCODING_ZIPLIST;
    } else {
        serverPanic("Unknown sorted set encoding");
    }
}
1238 
1239 /* Convert the sorted set object into a ziplist if it is not already a ziplist
1240  * and if the number of elements and the maximum element size is within the
1241  * expected ranges. */
zsetConvertToZiplistIfNeeded(robj * zobj,size_t maxelelen)1242 void zsetConvertToZiplistIfNeeded(robj *zobj, size_t maxelelen) {
1243     if (zobj->encoding == OBJ_ENCODING_ZIPLIST) return;
1244     zset *zset = zobj->ptr;
1245 
1246     if (zset->zsl->length <= server.zset_max_ziplist_entries &&
1247         maxelelen <= server.zset_max_ziplist_value)
1248             zsetConvert(zobj,OBJ_ENCODING_ZIPLIST);
1249 }
1250 
1251 /* Return (by reference) the score of the specified member of the sorted set
1252  * storing it into *score. If the element does not exist C_ERR is returned
1253  * otherwise C_OK is returned and *score is correctly populated.
1254  * If 'zobj' or 'member' is NULL, C_ERR is returned. */
zsetScore(robj * zobj,sds member,double * score)1255 int zsetScore(robj *zobj, sds member, double *score) {
1256     if (!zobj || !member) return C_ERR;
1257 
1258     if (zobj->encoding == OBJ_ENCODING_ZIPLIST) {
1259         if (zzlFind(zobj->ptr, member, score) == NULL) return C_ERR;
1260     } else if (zobj->encoding == OBJ_ENCODING_SKIPLIST) {
1261         zset *zs = zobj->ptr;
1262         dictEntry *de = dictFind(zs->dict, member);
1263         if (de == NULL) return C_ERR;
1264         *score = *(double*)dictGetVal(de);
1265     } else {
1266         serverPanic("Unknown sorted set encoding");
1267     }
1268     return C_OK;
1269 }
1270 
1271 /* Add a new element or update the score of an existing element in a sorted
1272  * set, regardless of its encoding.
1273  *
1274  * The set of flags change the command behavior. They are passed with an integer
1275  * pointer since the function will clear the flags and populate them with
1276  * other flags to indicate different conditions.
1277  *
1278  * The input flags are the following:
1279  *
1280  * ZADD_INCR: Increment the current element score by 'score' instead of updating
1281  *            the current element score. If the element does not exist, we
1282  *            assume 0 as previous score.
1283  * ZADD_NX:   Perform the operation only if the element does not exist.
1284  * ZADD_XX:   Perform the operation only if the element already exist.
1285  *
1286  * When ZADD_INCR is used, the new score of the element is stored in
1287  * '*newscore' if 'newscore' is not NULL.
1288  *
1289  * The returned flags are the following:
1290  *
1291  * ZADD_NAN:     The resulting score is not a number.
1292  * ZADD_ADDED:   The element was added (not present before the call).
1293  * ZADD_UPDATED: The element score was updated.
1294  * ZADD_NOP:     No operation was performed because of NX or XX.
1295  *
1296  * Return value:
1297  *
1298  * The function returns 1 on success, and sets the appropriate flags
1299  * ADDED or UPDATED to signal what happened during the operation (note that
1300  * none could be set if we re-added an element using the same score it used
1301  * to have, or in the case a zero increment is used).
1302  *
1303  * The function returns 0 on erorr, currently only when the increment
1304  * produces a NAN condition, or when the 'score' value is NAN since the
1305  * start.
1306  *
1307  * The commad as a side effect of adding a new element may convert the sorted
1308  * set internal encoding from ziplist to hashtable+skiplist.
1309  *
1310  * Memory managemnet of 'ele':
1311  *
1312  * The function does not take ownership of the 'ele' SDS string, but copies
1313  * it if needed. */
zsetAdd(robj * zobj,double score,sds ele,int * flags,double * newscore)1314 int zsetAdd(robj *zobj, double score, sds ele, int *flags, double *newscore) {
1315     /* Turn options into simple to check vars. */
1316     int incr = (*flags & ZADD_INCR) != 0;
1317     int nx = (*flags & ZADD_NX) != 0;
1318     int xx = (*flags & ZADD_XX) != 0;
1319     *flags = 0; /* We'll return our response flags. */
1320     double curscore;
1321 
1322     /* NaN as input is an error regardless of all the other parameters. */
1323     if (isnan(score)) {
1324         *flags = ZADD_NAN;
1325         return 0;
1326     }
1327 
1328     /* Update the sorted set according to its encoding. */
1329     if (zobj->encoding == OBJ_ENCODING_ZIPLIST) {
1330         unsigned char *eptr;
1331 
1332         if ((eptr = zzlFind(zobj->ptr,ele,&curscore)) != NULL) {
1333             /* NX? Return, same element already exists. */
1334             if (nx) {
1335                 *flags |= ZADD_NOP;
1336                 return 1;
1337             }
1338 
1339             /* Prepare the score for the increment if needed. */
1340             if (incr) {
1341                 score += curscore;
1342                 if (isnan(score)) {
1343                     *flags |= ZADD_NAN;
1344                     return 0;
1345                 }
1346                 if (newscore) *newscore = score;
1347             }
1348 
1349             /* Remove and re-insert when score changed. */
1350             if (score != curscore) {
1351                 zobj->ptr = zzlDelete(zobj->ptr,eptr);
1352                 zobj->ptr = zzlInsert(zobj->ptr,ele,score);
1353                 *flags |= ZADD_UPDATED;
1354             }
1355             return 1;
1356         } else if (!xx) {
1357             /* Optimize: check if the element is too large or the list
1358              * becomes too long *before* executing zzlInsert. */
1359             zobj->ptr = zzlInsert(zobj->ptr,ele,score);
1360             if (zzlLength(zobj->ptr) > server.zset_max_ziplist_entries)
1361                 zsetConvert(zobj,OBJ_ENCODING_SKIPLIST);
1362             if (sdslen(ele) > server.zset_max_ziplist_value)
1363                 zsetConvert(zobj,OBJ_ENCODING_SKIPLIST);
1364             if (newscore) *newscore = score;
1365             *flags |= ZADD_ADDED;
1366             return 1;
1367         } else {
1368             *flags |= ZADD_NOP;
1369             return 1;
1370         }
1371     } else if (zobj->encoding == OBJ_ENCODING_SKIPLIST) {
1372         zset *zs = zobj->ptr;
1373         zskiplistNode *znode;
1374         dictEntry *de;
1375 
1376         de = dictFind(zs->dict,ele);
1377         if (de != NULL) {
1378             /* NX? Return, same element already exists. */
1379             if (nx) {
1380                 *flags |= ZADD_NOP;
1381                 return 1;
1382             }
1383             curscore = *(double*)dictGetVal(de);
1384 
1385             /* Prepare the score for the increment if needed. */
1386             if (incr) {
1387                 score += curscore;
1388                 if (isnan(score)) {
1389                     *flags |= ZADD_NAN;
1390                     return 0;
1391                 }
1392                 if (newscore) *newscore = score;
1393             }
1394 
1395             /* Remove and re-insert when score changes. */
1396             if (score != curscore) {
1397                 znode = zslUpdateScore(zs->zsl,curscore,ele,score);
1398                 /* Note that we did not removed the original element from
1399                  * the hash table representing the sorted set, so we just
1400                  * update the score. */
1401                 dictGetVal(de) = &znode->score; /* Update score ptr. */
1402                 *flags |= ZADD_UPDATED;
1403             }
1404             return 1;
1405         } else if (!xx) {
1406             ele = sdsdup(ele);
1407             znode = zslInsert(zs->zsl,score,ele);
1408             serverAssert(dictAdd(zs->dict,ele,&znode->score) == DICT_OK);
1409             *flags |= ZADD_ADDED;
1410             if (newscore) *newscore = score;
1411             return 1;
1412         } else {
1413             *flags |= ZADD_NOP;
1414             return 1;
1415         }
1416     } else {
1417         serverPanic("Unknown sorted set encoding");
1418     }
1419     return 0; /* Never reached. */
1420 }
1421 
1422 /* Delete the element 'ele' from the sorted set, returning 1 if the element
1423  * existed and was deleted, 0 otherwise (the element was not there). */
zsetDel(robj * zobj,sds ele)1424 int zsetDel(robj *zobj, sds ele) {
1425     if (zobj->encoding == OBJ_ENCODING_ZIPLIST) {
1426         unsigned char *eptr;
1427 
1428         if ((eptr = zzlFind(zobj->ptr,ele,NULL)) != NULL) {
1429             zobj->ptr = zzlDelete(zobj->ptr,eptr);
1430             return 1;
1431         }
1432     } else if (zobj->encoding == OBJ_ENCODING_SKIPLIST) {
1433         zset *zs = zobj->ptr;
1434         dictEntry *de;
1435         double score;
1436 
1437         de = dictUnlink(zs->dict,ele);
1438         if (de != NULL) {
1439             /* Get the score in order to delete from the skiplist later. */
1440             score = *(double*)dictGetVal(de);
1441 
1442             /* Delete from the hash table and later from the skiplist.
1443              * Note that the order is important: deleting from the skiplist
1444              * actually releases the SDS string representing the element,
1445              * which is shared between the skiplist and the hash table, so
1446              * we need to delete from the skiplist as the final step. */
1447             dictFreeUnlinkedEntry(zs->dict,de);
1448 
1449             /* Delete from skiplist. */
1450             int retval = zslDelete(zs->zsl,score,ele,NULL);
1451             serverAssert(retval);
1452 
1453             if (htNeedsResize(zs->dict)) dictResize(zs->dict);
1454             return 1;
1455         }
1456     } else {
1457         serverPanic("Unknown sorted set encoding");
1458     }
1459     return 0; /* No such element found. */
1460 }
1461 
1462 /* Given a sorted set object returns the 0-based rank of the object or
1463  * -1 if the object does not exist.
1464  *
1465  * For rank we mean the position of the element in the sorted collection
1466  * of elements. So the first element has rank 0, the second rank 1, and so
1467  * forth up to length-1 elements.
1468  *
1469  * If 'reverse' is false, the rank is returned considering as first element
1470  * the one with the lowest score. Otherwise if 'reverse' is non-zero
1471  * the rank is computed considering as element with rank 0 the one with
1472  * the highest score. */
zsetRank(robj * zobj,sds ele,int reverse)1473 long zsetRank(robj *zobj, sds ele, int reverse) {
1474     unsigned long llen;
1475     unsigned long rank;
1476 
1477     llen = zsetLength(zobj);
1478 
1479     if (zobj->encoding == OBJ_ENCODING_ZIPLIST) {
1480         unsigned char *zl = zobj->ptr;
1481         unsigned char *eptr, *sptr;
1482 
1483         eptr = ziplistIndex(zl,0);
1484         serverAssert(eptr != NULL);
1485         sptr = ziplistNext(zl,eptr);
1486         serverAssert(sptr != NULL);
1487 
1488         rank = 1;
1489         while(eptr != NULL) {
1490             if (ziplistCompare(eptr,(unsigned char*)ele,sdslen(ele)))
1491                 break;
1492             rank++;
1493             zzlNext(zl,&eptr,&sptr);
1494         }
1495 
1496         if (eptr != NULL) {
1497             if (reverse)
1498                 return llen-rank;
1499             else
1500                 return rank-1;
1501         } else {
1502             return -1;
1503         }
1504     } else if (zobj->encoding == OBJ_ENCODING_SKIPLIST) {
1505         zset *zs = zobj->ptr;
1506         zskiplist *zsl = zs->zsl;
1507         dictEntry *de;
1508         double score;
1509 
1510         de = dictFind(zs->dict,ele);
1511         if (de != NULL) {
1512             score = *(double*)dictGetVal(de);
1513             rank = zslGetRank(zsl,score,ele);
1514             /* Existing elements always have a rank. */
1515             serverAssert(rank != 0);
1516             if (reverse)
1517                 return llen-rank;
1518             else
1519                 return rank-1;
1520         } else {
1521             return -1;
1522         }
1523     } else {
1524         serverPanic("Unknown sorted set encoding");
1525     }
1526 }
1527 
1528 /*-----------------------------------------------------------------------------
1529  * Sorted set commands
1530  *----------------------------------------------------------------------------*/
1531 
1532 /* This generic command implements both ZADD and ZINCRBY. */
zaddGenericCommand(client * c,int flags)1533 void zaddGenericCommand(client *c, int flags) {
1534     static char *nanerr = "resulting score is not a number (NaN)";
1535     robj *key = c->argv[1];
1536     robj *zobj;
1537     sds ele;
1538     double score = 0, *scores = NULL;
1539     int j, elements;
1540     int scoreidx = 0;
1541     /* The following vars are used in order to track what the command actually
1542      * did during the execution, to reply to the client and to trigger the
1543      * notification of keyspace change. */
1544     int added = 0;      /* Number of new elements added. */
1545     int updated = 0;    /* Number of elements with updated score. */
1546     int processed = 0;  /* Number of elements processed, may remain zero with
1547                            options like XX. */
1548 
1549     /* Parse options. At the end 'scoreidx' is set to the argument position
1550      * of the score of the first score-element pair. */
1551     scoreidx = 2;
1552     while(scoreidx < c->argc) {
1553         char *opt = c->argv[scoreidx]->ptr;
1554         if (!strcasecmp(opt,"nx")) flags |= ZADD_NX;
1555         else if (!strcasecmp(opt,"xx")) flags |= ZADD_XX;
1556         else if (!strcasecmp(opt,"ch")) flags |= ZADD_CH;
1557         else if (!strcasecmp(opt,"incr")) flags |= ZADD_INCR;
1558         else break;
1559         scoreidx++;
1560     }
1561 
1562     /* Turn options into simple to check vars. */
1563     int incr = (flags & ZADD_INCR) != 0;
1564     int nx = (flags & ZADD_NX) != 0;
1565     int xx = (flags & ZADD_XX) != 0;
1566     int ch = (flags & ZADD_CH) != 0;
1567 
1568     /* After the options, we expect to have an even number of args, since
1569      * we expect any number of score-element pairs. */
1570     elements = c->argc-scoreidx;
1571     if (elements % 2 || !elements) {
1572         addReply(c,shared.syntaxerr);
1573         return;
1574     }
1575     elements /= 2; /* Now this holds the number of score-element pairs. */
1576 
1577     /* Check for incompatible options. */
1578     if (nx && xx) {
1579         addReplyError(c,
1580             "XX and NX options at the same time are not compatible");
1581         return;
1582     }
1583 
1584     if (incr && elements > 1) {
1585         addReplyError(c,
1586             "INCR option supports a single increment-element pair");
1587         return;
1588     }
1589 
1590     /* Start parsing all the scores, we need to emit any syntax error
1591      * before executing additions to the sorted set, as the command should
1592      * either execute fully or nothing at all. */
1593     scores = zmalloc(sizeof(double)*elements);
1594     for (j = 0; j < elements; j++) {
1595         if (getDoubleFromObjectOrReply(c,c->argv[scoreidx+j*2],&scores[j],NULL)
1596             != C_OK) goto cleanup;
1597     }
1598 
1599     /* Lookup the key and create the sorted set if does not exist. */
1600     zobj = lookupKeyWrite(c->db,key);
1601     if (zobj == NULL) {
1602         if (xx) goto reply_to_client; /* No key + XX option: nothing to do. */
1603         if (server.zset_max_ziplist_entries == 0 ||
1604             server.zset_max_ziplist_value < sdslen(c->argv[scoreidx+1]->ptr))
1605         {
1606             zobj = createZsetObject();
1607         } else {
1608             zobj = createZsetZiplistObject();
1609         }
1610         dbAdd(c->db,key,zobj);
1611     } else {
1612         if (zobj->type != OBJ_ZSET) {
1613             addReply(c,shared.wrongtypeerr);
1614             goto cleanup;
1615         }
1616     }
1617 
1618     for (j = 0; j < elements; j++) {
1619         double newscore;
1620         score = scores[j];
1621         int retflags = flags;
1622 
1623         ele = c->argv[scoreidx+1+j*2]->ptr;
1624         int retval = zsetAdd(zobj, score, ele, &retflags, &newscore);
1625         if (retval == 0) {
1626             addReplyError(c,nanerr);
1627             goto cleanup;
1628         }
1629         if (retflags & ZADD_ADDED) added++;
1630         if (retflags & ZADD_UPDATED) updated++;
1631         if (!(retflags & ZADD_NOP)) processed++;
1632         score = newscore;
1633     }
1634     server.dirty += (added+updated);
1635 
1636 reply_to_client:
1637     if (incr) { /* ZINCRBY or INCR option. */
1638         if (processed)
1639             addReplyDouble(c,score);
1640         else
1641             addReply(c,shared.nullbulk);
1642     } else { /* ZADD. */
1643         addReplyLongLong(c,ch ? added+updated : added);
1644     }
1645 
1646 cleanup:
1647     zfree(scores);
1648     if (added || updated) {
1649         signalModifiedKey(c->db,key);
1650         notifyKeyspaceEvent(NOTIFY_ZSET,
1651             incr ? "zincr" : "zadd", key, c->db->id);
1652     }
1653 }
1654 
/* ZADD command handler: forwards the client to zaddGenericCommand(),
 * passing ZADD_NONE as the second argument. */
void zaddCommand(client *c) {
    zaddGenericCommand(c, ZADD_NONE);
}
1658 
/* ZINCRBY command handler: forwards the client to zaddGenericCommand(),
 * passing ZADD_INCR as the second argument. */
void zincrbyCommand(client *c) {
    zaddGenericCommand(c, ZADD_INCR);
}
1662 
/* ZREM command handler.
 *
 * Walks the member names in argv[2..argc-1] and removes each one from the
 * sorted set stored at argv[1]. As soon as the set becomes empty the key is
 * deleted from the database and the remaining arguments are not examined.
 * The reply is the number of members actually removed. The lookup helper is
 * handed shared.czero as its reply object and a NULL result ends the
 * command; a failing type check ends it as well. */
void zremCommand(client *c) {
    robj *key = c->argv[1];
    robj *zobj = lookupKeyWriteOrReply(c, key, shared.czero);
    int removed = 0;
    int emptied = 0;
    int idx;

    if (zobj == NULL) return;
    if (checkType(c, zobj, OBJ_ZSET)) return;

    for (idx = 2; idx < c->argc && !emptied; idx++) {
        if (zsetDel(zobj, c->argv[idx]->ptr)) removed++;

        if (zsetLength(zobj) == 0) {
            /* Nothing left in the sorted set: drop the key itself and
             * stop looking at further arguments. */
            dbDelete(c->db, key);
            emptied = 1;
        }
    }

    /* Events, key invalidation and the dirty counter are only touched when
     * at least one member was really removed. */
    if (removed) {
        notifyKeyspaceEvent(NOTIFY_ZSET, "zrem", key, c->db->id);
        if (emptied) {
            notifyKeyspaceEvent(NOTIFY_GENERIC, "del", key, c->db->id);
        }
        signalModifiedKey(c->db, key);
        server.dirty += removed;
    }

    addReplyLongLong(c, removed);
}
1689 
1690 /* Implements ZREMRANGEBYRANK, ZREMRANGEBYSCORE, ZREMRANGEBYLEX commands. */
1691 #define ZRANGE_RANK 0
1692 #define ZRANGE_SCORE 1
1693 #define ZRANGE_LEX 2
zremrangeGenericCommand(client * c,int rangetype)1694 void zremrangeGenericCommand(client *c, int rangetype) {
1695     robj *key = c->argv[1];
1696     robj *zobj;
1697     int keyremoved = 0;
1698     unsigned long deleted = 0;
1699     zrangespec range;
1700     zlexrangespec lexrange;
1701     long start, end, llen;
1702 
1703     /* Step 1: Parse the range. */
1704     if (rangetype == ZRANGE_RANK) {
1705         if ((getLongFromObjectOrReply(c,c->argv[2],&start,NULL) != C_OK) ||
1706             (getLongFromObjectOrReply(c,c->argv[3],&end,NULL) != C_OK))
1707             return;
1708     } else if (rangetype == ZRANGE_SCORE) {
1709         if (zslParseRange(c->argv[2],c->argv[3],&range) != C_OK) {
1710             addReplyError(c,"min or max is not a float");
1711             return;
1712         }
1713     } else if (rangetype == ZRANGE_LEX) {
1714         if (zslParseLexRange(c->argv[2],c->argv[3],&lexrange) != C_OK) {
1715             addReplyError(c,"min or max not valid string range item");
1716             return;
1717         }
1718     }
1719 
1720     /* Step 2: Lookup & range sanity checks if needed. */
1721     if ((zobj = lookupKeyWriteOrReply(c,key,shared.czero)) == NULL ||
1722         checkType(c,zobj,OBJ_ZSET)) goto cleanup;
1723 
1724     if (rangetype == ZRANGE_RANK) {
1725         /* Sanitize indexes. */
1726         llen = zsetLength(zobj);
1727         if (start < 0) start = llen+start;
1728         if (end < 0) end = llen+end;
1729         if (start < 0) start = 0;
1730 
1731         /* Invariant: start >= 0, so this test will be true when end < 0.
1732          * The range is empty when start > end or start >= length. */
1733         if (start > end || start >= llen) {
1734             addReply(c,shared.czero);
1735             goto cleanup;
1736         }
1737         if (end >= llen) end = llen-1;
1738     }
1739 
1740     /* Step 3: Perform the range deletion operation. */
1741     if (zobj->encoding == OBJ_ENCODING_ZIPLIST) {
1742         switch(rangetype) {
1743         case ZRANGE_RANK:
1744             zobj->ptr = zzlDeleteRangeByRank(zobj->ptr,start+1,end+1,&deleted);
1745             break;
1746         case ZRANGE_SCORE:
1747             zobj->ptr = zzlDeleteRangeByScore(zobj->ptr,&range,&deleted);
1748             break;
1749         case ZRANGE_LEX:
1750             zobj->ptr = zzlDeleteRangeByLex(zobj->ptr,&lexrange,&deleted);
1751             break;
1752         }
1753         if (zzlLength(zobj->ptr) == 0) {
1754             dbDelete(c->db,key);
1755             keyremoved = 1;
1756         }
1757     } else if (zobj->encoding == OBJ_ENCODING_SKIPLIST) {
1758         zset *zs = zobj->ptr;
1759         switch(rangetype) {
1760         case ZRANGE_RANK:
1761             deleted = zslDeleteRangeByRank(zs->zsl,start+1,end+1,zs->dict);
1762             break;
1763         case ZRANGE_SCORE:
1764             deleted = zslDeleteRangeByScore(zs->zsl,&range,zs->dict);
1765             break;
1766         case ZRANGE_LEX:
1767             deleted = zslDeleteRangeByLex(zs->zsl,&lexrange,zs->dict);
1768             break;
1769         }
1770         if (htNeedsResize(zs->dict)) dictResize(zs->dict);
1771         if (dictSize(zs->dict) == 0) {
1772             dbDelete(c->db,key);
1773             keyremoved = 1;
1774         }
1775     } else {
1776         serverPanic("Unknown sorted set encoding");
1777     }
1778 
1779     /* Step 4: Notifications and reply. */
1780     if (deleted) {
1781         char *event[3] = {"zremrangebyrank","zremrangebyscore","zremrangebylex"};
1782         signalModifiedKey(c->db,key);
1783         notifyKeyspaceEvent(NOTIFY_ZSET,event[rangetype],key,c->db->id);
1784         if (keyremoved)
1785             notifyKeyspaceEvent(NOTIFY_GENERIC,"del",key,c->db->id);
1786     }
1787     server.dirty += deleted;
1788     addReplyLongLong(c,deleted);
1789 
1790 cleanup:
1791     if (rangetype == ZRANGE_LEX) zslFreeLexRange(&lexrange);
1792 }
1793 
/* ZREMRANGEBYRANK command handler: runs the generic range removal with
 * the ZRANGE_RANK selector. */
void zremrangebyrankCommand(client *c) {
    zremrangeGenericCommand(c, ZRANGE_RANK);
}
1797 
/* ZREMRANGEBYSCORE command handler: runs the generic range removal with
 * the ZRANGE_SCORE selector. */
void zremrangebyscoreCommand(client *c) {
    zremrangeGenericCommand(c, ZRANGE_SCORE);
}
1801 
/* ZREMRANGEBYLEX command handler: runs the generic range removal with
 * the ZRANGE_LEX selector. */
void zremrangebylexCommand(client *c) {
    zremrangeGenericCommand(c, ZRANGE_LEX);
}
1805 
1806 typedef struct {
1807     robj *subject;
1808     int type; /* Set, sorted set */
1809     int encoding;
1810     double weight;
1811 
1812     union {
1813         /* Set iterators. */
1814         union _iterset {
1815             struct {
1816                 intset *is;
1817                 int ii;
1818             } is;
1819             struct {
1820                 dict *dict;
1821                 dictIterator *di;
1822                 dictEntry *de;
1823             } ht;
1824         } set;
1825 
1826         /* Sorted set iterators. */
1827         union _iterzset {
1828             struct {
1829                 unsigned char *zl;
1830                 unsigned char *eptr, *sptr;
1831             } zl;
1832             struct {
1833                 zset *zs;
1834                 zskiplistNode *node;
1835             } sl;
1836         } zset;
1837     } iter;
1838 } zsetopsrc;
1839 
1840 
1841 /* Use dirty flags for pointers that need to be cleaned up in the next
1842  * iteration over the zsetopval. The dirty flag for the long long value is
1843  * special, since long long values don't need cleanup. Instead, it means that
1844  * we already checked that "ell" holds a long long, or tried to convert another
1845  * representation into a long long value. When this was successful,
1846  * OPVAL_VALID_LL is set as well. */
1847 #define OPVAL_DIRTY_SDS 1
1848 #define OPVAL_DIRTY_LL 2
1849 #define OPVAL_VALID_LL 4
1850 
1851 /* Store value retrieved from the iterator. */
1852 typedef struct {
1853     int flags;
1854     unsigned char _buf[32]; /* Private buffer. */
1855     sds ele;
1856     unsigned char *estr;
1857     unsigned int elen;
1858     long long ell;
1859     double score;
1860 } zsetopval;
1861 
1862 typedef union _iterset iterset;
1863 typedef union _iterzset iterzset;
1864 
/* Position the iterator embedded in 'op' on the first element of its
 * subject. Does nothing when the subject is NULL; panics on a type or
 * encoding it does not know about. */
void zuiInitIterator(zsetopsrc *op) {
    if (op->subject == NULL) return;

    switch (op->type) {
    case OBJ_SET: {
        iterset *cur = &op->iter.set;

        switch (op->encoding) {
        case OBJ_ENCODING_INTSET:
            cur->is.is = op->subject->ptr;
            cur->is.ii = 0;
            break;
        case OBJ_ENCODING_HT:
            cur->ht.dict = op->subject->ptr;
            cur->ht.di = dictGetIterator(op->subject->ptr);
            /* Pre-fetch the first entry; zuiNext() consumes it. */
            cur->ht.de = dictNext(cur->ht.di);
            break;
        default:
            serverPanic("Unknown set encoding");
        }
        break;
    }
    case OBJ_ZSET: {
        iterzset *cur = &op->iter.zset;

        switch (op->encoding) {
        case OBJ_ENCODING_ZIPLIST:
            cur->zl.zl = op->subject->ptr;
            cur->zl.eptr = ziplistIndex(cur->zl.zl,0);
            if (cur->zl.eptr != NULL) {
                /* An element entry must be followed by its score. */
                cur->zl.sptr = ziplistNext(cur->zl.zl,cur->zl.eptr);
                serverAssert(cur->zl.sptr != NULL);
            }
            break;
        case OBJ_ENCODING_SKIPLIST:
            cur->sl.zs = op->subject->ptr;
            cur->sl.node = cur->sl.zs->zsl->header->level[0].forward;
            break;
        default:
            serverPanic("Unknown sorted set encoding");
        }
        break;
    }
    default:
        serverPanic("Unsupported type");
    }
}
1900 
/* Release whatever zuiInitIterator() acquired for 'op'. Only the hash
 * table encoded set holds a resource (a dict iterator); the other known
 * encodings need no cleanup. Does nothing when the subject is NULL and
 * panics on an unknown type or encoding. */
void zuiClearIterator(zsetopsrc *op) {
    if (op->subject == NULL) return;

    switch (op->type) {
    case OBJ_SET:
        switch (op->encoding) {
        case OBJ_ENCODING_INTSET:
            break; /* nothing to release */
        case OBJ_ENCODING_HT:
            dictReleaseIterator(op->iter.set.ht.di);
            break;
        default:
            serverPanic("Unknown set encoding");
        }
        break;
    case OBJ_ZSET:
        switch (op->encoding) {
        case OBJ_ENCODING_ZIPLIST:
        case OBJ_ENCODING_SKIPLIST:
            break; /* nothing to release */
        default:
            serverPanic("Unknown sorted set encoding");
        }
        break;
    default:
        serverPanic("Unsupported type");
    }
}
1927 
/* Return the number of elements in the subject of 'op', or 0 when the
 * subject is NULL. Panics on an unknown type or encoding. */
unsigned long zuiLength(zsetopsrc *op) {
    if (op->subject == NULL) return 0;

    switch (op->type) {
    case OBJ_SET:
        switch (op->encoding) {
        case OBJ_ENCODING_INTSET:
            return intsetLen(op->subject->ptr);
        case OBJ_ENCODING_HT: {
            dict *table = op->subject->ptr;
            return dictSize(table);
        }
        default:
            serverPanic("Unknown set encoding");
        }
        break;
    case OBJ_ZSET:
        switch (op->encoding) {
        case OBJ_ENCODING_ZIPLIST:
            return zzlLength(op->subject->ptr);
        case OBJ_ENCODING_SKIPLIST: {
            zset *zs = op->subject->ptr;
            return zs->zsl->length;
        }
        default:
            serverPanic("Unknown sorted set encoding");
        }
        break;
    default:
        serverPanic("Unsupported type");
    }
}
1954 
1955 /* Check if the current value is valid. If so, store it in the passed structure
1956  * and move to the next element. If not valid, this means we have reached the
1957  * end of the structure and can abort. */
zuiNext(zsetopsrc * op,zsetopval * val)1958 int zuiNext(zsetopsrc *op, zsetopval *val) {
1959     if (op->subject == NULL)
1960         return 0;
1961 
1962     if (val->flags & OPVAL_DIRTY_SDS)
1963         sdsfree(val->ele);
1964 
1965     memset(val,0,sizeof(zsetopval));
1966 
1967     if (op->type == OBJ_SET) {
1968         iterset *it = &op->iter.set;
1969         if (op->encoding == OBJ_ENCODING_INTSET) {
1970             int64_t ell;
1971 
1972             if (!intsetGet(it->is.is,it->is.ii,&ell))
1973                 return 0;
1974             val->ell = ell;
1975             val->score = 1.0;
1976 
1977             /* Move to next element. */
1978             it->is.ii++;
1979         } else if (op->encoding == OBJ_ENCODING_HT) {
1980             if (it->ht.de == NULL)
1981                 return 0;
1982             val->ele = dictGetKey(it->ht.de);
1983             val->score = 1.0;
1984 
1985             /* Move to next element. */
1986             it->ht.de = dictNext(it->ht.di);
1987         } else {
1988             serverPanic("Unknown set encoding");
1989         }
1990     } else if (op->type == OBJ_ZSET) {
1991         iterzset *it = &op->iter.zset;
1992         if (op->encoding == OBJ_ENCODING_ZIPLIST) {
1993             /* No need to check both, but better be explicit. */
1994             if (it->zl.eptr == NULL || it->zl.sptr == NULL)
1995                 return 0;
1996             serverAssert(ziplistGet(it->zl.eptr,&val->estr,&val->elen,&val->ell));
1997             val->score = zzlGetScore(it->zl.sptr);
1998 
1999             /* Move to next element. */
2000             zzlNext(it->zl.zl,&it->zl.eptr,&it->zl.sptr);
2001         } else if (op->encoding == OBJ_ENCODING_SKIPLIST) {
2002             if (it->sl.node == NULL)
2003                 return 0;
2004             val->ele = it->sl.node->ele;
2005             val->score = it->sl.node->score;
2006 
2007             /* Move to next element. */
2008             it->sl.node = it->sl.node->level[0].forward;
2009         } else {
2010             serverPanic("Unknown sorted set encoding");
2011         }
2012     } else {
2013         serverPanic("Unsupported type");
2014     }
2015     return 1;
2016 }
2017 
/* Make sure 'val' has been checked for an integer representation and
 * return non-zero when val->ell holds a valid long long.
 *
 * The conversion is attempted only once per value: OPVAL_DIRTY_LL records
 * that the attempt was made and OPVAL_VALID_LL records that it succeeded.
 * When neither an SDS string nor a raw buffer is present, val->ell is
 * taken to be already set and is flagged as valid. */
int zuiLongLongFromValue(zsetopval *val) {
    int converted;

    /* Already attempted: just report the cached outcome. */
    if (val->flags & OPVAL_DIRTY_LL)
        return val->flags & OPVAL_VALID_LL;

    val->flags |= OPVAL_DIRTY_LL;

    if (val->ele != NULL) {
        converted = string2ll(val->ele,sdslen(val->ele),&val->ell);
    } else if (val->estr != NULL) {
        converted = string2ll((char*)val->estr,val->elen,&val->ell);
    } else {
        /* No string form at all: the long long was set by the iterator. */
        converted = 1;
    }

    if (converted) val->flags |= OPVAL_VALID_LL;
    return val->flags & OPVAL_VALID_LL;
}
2035 
zuiSdsFromValue(zsetopval * val)2036 sds zuiSdsFromValue(zsetopval *val) {
2037     if (val->ele == NULL) {
2038         if (val->estr != NULL) {
2039             val->ele = sdsnewlen((char*)val->estr,val->elen);
2040         } else {
2041             val->ele = sdsfromlonglong(val->ell);
2042         }
2043         val->flags |= OPVAL_DIRTY_SDS;
2044     }
2045     return val->ele;
2046 }
2047 
2048 /* This is different from zuiSdsFromValue since returns a new SDS string
2049  * which is up to the caller to free. */
zuiNewSdsFromValue(zsetopval * val)2050 sds zuiNewSdsFromValue(zsetopval *val) {
2051     if (val->flags & OPVAL_DIRTY_SDS) {
2052         /* We have already one to return! */
2053         sds ele = val->ele;
2054         val->flags &= ~OPVAL_DIRTY_SDS;
2055         val->ele = NULL;
2056         return ele;
2057     } else if (val->ele) {
2058         return sdsdup(val->ele);
2059     } else if (val->estr) {
2060         return sdsnewlen((char*)val->estr,val->elen);
2061     } else {
2062         return sdsfromlonglong(val->ell);
2063     }
2064 }
2065 
/* Make sure val->estr / val->elen describe the element as a raw buffer.
 * When no buffer is set yet it is taken from the SDS string if present,
 * otherwise the integer value is rendered into the private buffer of
 * 'val'. Always returns 1. */
int zuiBufferFromValue(zsetopval *val) {
    if (val->estr != NULL) return 1;

    if (val->ele != NULL) {
        val->elen = sdslen(val->ele);
        val->estr = (unsigned char*)val->ele;
        return 1;
    }

    val->elen = ll2string((char*)val->_buf,sizeof(val->_buf),val->ell);
    val->estr = val->_buf;
    return 1;
}
2078 
2079 /* Find value pointed to by val in the source pointer to by op. When found,
2080  * return 1 and store its score in target. Return 0 otherwise. */
zuiFind(zsetopsrc * op,zsetopval * val,double * score)2081 int zuiFind(zsetopsrc *op, zsetopval *val, double *score) {
2082     if (op->subject == NULL)
2083         return 0;
2084 
2085     if (op->type == OBJ_SET) {
2086         if (op->encoding == OBJ_ENCODING_INTSET) {
2087             if (zuiLongLongFromValue(val) &&
2088                 intsetFind(op->subject->ptr,val->ell))
2089             {
2090                 *score = 1.0;
2091                 return 1;
2092             } else {
2093                 return 0;
2094             }
2095         } else if (op->encoding == OBJ_ENCODING_HT) {
2096             dict *ht = op->subject->ptr;
2097             zuiSdsFromValue(val);
2098             if (dictFind(ht,val->ele) != NULL) {
2099                 *score = 1.0;
2100                 return 1;
2101             } else {
2102                 return 0;
2103             }
2104         } else {
2105             serverPanic("Unknown set encoding");
2106         }
2107     } else if (op->type == OBJ_ZSET) {
2108         zuiSdsFromValue(val);
2109 
2110         if (op->encoding == OBJ_ENCODING_ZIPLIST) {
2111             if (zzlFind(op->subject->ptr,val->ele,score) != NULL) {
2112                 /* Score is already set by zzlFind. */
2113                 return 1;
2114             } else {
2115                 return 0;
2116             }
2117         } else if (op->encoding == OBJ_ENCODING_SKIPLIST) {
2118             zset *zs = op->subject->ptr;
2119             dictEntry *de;
2120             if ((de = dictFind(zs->dict,val->ele)) != NULL) {
2121                 *score = *(double*)dictGetVal(de);
2122                 return 1;
2123             } else {
2124                 return 0;
2125             }
2126         } else {
2127             serverPanic("Unknown sorted set encoding");
2128         }
2129     } else {
2130         serverPanic("Unsupported type");
2131     }
2132 }
2133 
zuiCompareByCardinality(const void * s1,const void * s2)2134 int zuiCompareByCardinality(const void *s1, const void *s2) {
2135     unsigned long first = zuiLength((zsetopsrc*)s1);
2136     unsigned long second = zuiLength((zsetopsrc*)s2);
2137     if (first > second) return 1;
2138     if (first < second) return -1;
2139     return 0;
2140 }
2141 
/* Aggregation modes accepted by zunionInterAggregate(). */
#define REDIS_AGGR_SUM 1
#define REDIS_AGGR_MIN 2
#define REDIS_AGGR_MAX 3
#define zunionInterDictValue(_e) (dictGetVal(_e) == NULL ? 1.0 : *(double*)dictGetVal(_e))

/* Fold 'val' into '*target' according to 'aggregate' (one of the
 * REDIS_AGGR_* modes). Panics on an unknown mode. */
inline static void zunionInterAggregate(double *target, double val, int aggregate) {
    switch (aggregate) {
    case REDIS_AGGR_SUM:
        *target += val;
        /* Summing +inf and -inf yields NaN: by convention the result of
         * such a sum is stored as 0.0. */
        if (isnan(*target)) *target = 0.0;
        break;
    case REDIS_AGGR_MIN:
        if (val < *target) *target = val;
        break;
    case REDIS_AGGR_MAX:
        if (val > *target) *target = val;
        break;
    default:
        /* safety net */
        serverPanic("Unknown ZUNION/INTER aggregate type");
    }
}
2163 
2164 uint64_t dictSdsHash(const void *key);
2165 int dictSdsKeyCompare(void *privdata, const void *key1, const void *key2);
2166 
2167 dictType setAccumulatorDictType = {
2168     dictSdsHash,               /* hash function */
2169     NULL,                      /* key dup */
2170     NULL,                      /* val dup */
2171     dictSdsKeyCompare,         /* key compare */
2172     NULL,                      /* key destructor */
2173     NULL                       /* val destructor */
2174 };
2175 
/* Shared implementation of ZUNIONSTORE and ZINTERSTORE.
 *
 * 'dstkey' is the key receiving the result and 'op' is SET_OP_UNION or
 * SET_OP_INTER. argv[2] holds the number of input keys, which follow
 * starting at argv[3]; after them the optional WEIGHTS and AGGREGATE
 * clauses are accepted. Inputs may be sets or sorted sets, and a missing
 * key counts as an empty input.
 *
 * The previous value of 'dstkey' is always deleted. A non-empty result is
 * stored there and its cardinality is sent as the reply; an empty result
 * replies with zero. */
void zunionInterGenericCommand(client *c, robj *dstkey, int op) {
    long setnum;
    int aggregate = REDIS_AGGR_SUM;
    size_t maxelelen = 0;
    zsetopsrc *src;
    zsetopval zval;
    zskiplistNode *znode;
    robj *dstobj;
    zset *dstzset;
    sds tmp;
    int remaining;
    int touched;
    int i, j;

    /* The number of input keys comes first. */
    if (getLongFromObjectOrReply(c, c->argv[2], &setnum, NULL) != C_OK)
        return;

    if (setnum < 1) {
        addReplyError(c,
            "at least 1 input key is needed for ZUNIONSTORE/ZINTERSTORE");
        return;
    }

    /* Reject a key count larger than the number of arguments left. */
    if (setnum > c->argc-3) {
        addReply(c,shared.syntaxerr);
        return;
    }

    /* Resolve every input key. */
    src = zcalloc(sizeof(zsetopsrc) * setnum);
    for (i = 0, j = 3; i < setnum; i++, j++) {
        robj *obj = lookupKeyWrite(c->db,c->argv[j]);

        if (obj == NULL) {
            src[i].subject = NULL;
        } else if (obj->type == OBJ_ZSET || obj->type == OBJ_SET) {
            src[i].subject = obj;
            src[i].type = obj->type;
            src[i].encoding = obj->encoding;
        } else {
            zfree(src);
            addReply(c,shared.wrongtypeerr);
            return;
        }

        /* Every weight starts out as 1. */
        src[i].weight = 1.0;
    }

    /* Consume the optional trailing clauses. 'j' indexes the next unread
     * argument and 'remaining' counts how many are left. */
    remaining = c->argc - j;
    while (remaining > 0) {
        char *opt = c->argv[j]->ptr;

        if (remaining >= (setnum + 1) && !strcasecmp(opt,"weights")) {
            /* WEIGHTS is followed by exactly one weight per input. */
            j++; remaining--;
            for (i = 0; i < setnum; i++, j++, remaining--) {
                if (getDoubleFromObjectOrReply(c,c->argv[j],&src[i].weight,
                        "weight value is not a float") != C_OK)
                {
                    zfree(src);
                    return;
                }
            }
            continue;
        }

        if (remaining >= 2 && !strcasecmp(opt,"aggregate")) {
            char *mode = c->argv[j+1]->ptr;

            if (!strcasecmp(mode,"sum")) {
                aggregate = REDIS_AGGR_SUM;
            } else if (!strcasecmp(mode,"min")) {
                aggregate = REDIS_AGGR_MIN;
            } else if (!strcasecmp(mode,"max")) {
                aggregate = REDIS_AGGR_MAX;
            } else {
                zfree(src);
                addReply(c,shared.syntaxerr);
                return;
            }
            j += 2; remaining -= 2;
            continue;
        }

        /* Anything else is a syntax error. */
        zfree(src);
        addReply(c,shared.syntaxerr);
        return;
    }

    /* Order the inputs from the smallest to the largest: this makes the
     * algorithms below faster. */
    qsort(src,setnum,sizeof(zsetopsrc),zuiCompareByCardinality);

    dstobj = createZsetObject();
    dstzset = dstobj->ptr;
    memset(&zval, 0, sizeof(zval));

    if (op == SET_OP_INTER) {
        /* An empty smallest input means an empty intersection. */
        if (zuiLength(&src[0]) > 0) {
            /* src[0] is non-empty and the inputs are sorted by size, so
             * every other input is non-empty as well. */
            zuiInitIterator(&src[0]);
            while (zuiNext(&src[0],&zval)) {
                double score = src[0].weight * zval.score;
                double value;

                if (isnan(score)) score = 0;

                for (j = 1; j < setnum; j++) {
                    if (src[j].subject == src[0].subject) {
                        /* Same object as the one being iterated: it is
                         * not safe to search it, reuse the score. */
                        value = zval.score*src[j].weight;
                    } else if (zuiFind(&src[j],&zval,&value)) {
                        value *= src[j].weight;
                    } else {
                        break;
                    }
                    zunionInterAggregate(&score,value,aggregate);
                }

                /* Keep the element only if every input contains it. */
                if (j == setnum) {
                    tmp = zuiNewSdsFromValue(&zval);
                    znode = zslInsert(dstzset->zsl,score,tmp);
                    dictAdd(dstzset->dict,tmp,&znode->score);
                    if (sdslen(tmp) > maxelelen) maxelelen = sdslen(tmp);
                }
            }
            zuiClearIterator(&src[0]);
        }
    } else if (op == SET_OP_UNION) {
        dict *accumulator = dictCreate(&setAccumulatorDictType,NULL);
        dictIterator *di;
        dictEntry *de, *existing;
        double score;

        if (setnum) {
            /* The union has at least as many elements as the largest
             * input: size the dictionary up front to avoid rehashing. */
            dictExpand(accumulator,zuiLength(&src[setnum-1]));
        }

        /* Pass 1: scan the inputs one after the other, accumulating an
         * element -> aggregated score dictionary. */
        for (i = 0; i < setnum; i++) {
            if (zuiLength(&src[i]) == 0) continue;

            zuiInitIterator(&src[i]);
            while (zuiNext(&src[i],&zval)) {
                /* Weighted score of this occurrence. */
                score = src[i].weight * zval.score;
                if (isnan(score)) score = 0;

                /* Look the element up, adding an entry if it is new. */
                de = dictAddRaw(accumulator,zuiSdsFromValue(&zval),&existing);
                if (existing) {
                    /* Seen before: merge this score into the stored one.
                     * The double inside the dictEntry union is accessed
                     * directly, which is much faster than going through
                     * the getDouble/setDouble API. */
                    zunionInterAggregate(&existing->v.d,score,aggregate);
                } else {
                    /* New element: store a string owned by the
                     * accumulator together with the initial score. */
                    tmp = zuiNewSdsFromValue(&zval);
                    /* Track the longest element seen, needed later to
                     * decide whether a ziplist conversion is possible. */
                    if (sdslen(tmp) > maxelelen) maxelelen = sdslen(tmp);
                    dictSetKey(accumulator, de, tmp);
                    dictSetDoubleVal(de,score);
                }
            }
            zuiClearIterator(&src[i]);
        }

        /* Pass 2: turn the accumulator into the final sorted set. */
        di = dictGetIterator(accumulator);

        /* The final cardinality is known at this point: size the dict
         * of the destination accordingly to save rehashing time. */
        dictExpand(dstzset->dict,dictSize(accumulator));

        while((de = dictNext(di)) != NULL) {
            sds ele = dictGetKey(de);

            score = dictGetDoubleVal(de);
            znode = zslInsert(dstzset->zsl,score,ele);
            dictAdd(dstzset->dict,ele,&znode->score);
        }
        dictReleaseIterator(di);
        dictRelease(accumulator);
    } else {
        serverPanic("Unknown operator");
    }

    /* Replace the destination key with the result, if there is one. */
    touched = dbDelete(c->db,dstkey) ? 1 : 0;
    if (dstzset->zsl->length) {
        zsetConvertToZiplistIfNeeded(dstobj,maxelelen);
        dbAdd(c->db,dstkey,dstobj);
        addReplyLongLong(c,zsetLength(dstobj));
        signalModifiedKey(c->db,dstkey);
        notifyKeyspaceEvent(NOTIFY_ZSET,
            (op == SET_OP_UNION) ? "zunionstore" : "zinterstore",
            dstkey,c->db->id);
        server.dirty++;
    } else {
        /* Empty result: nothing is stored. If an old value was removed
         * above, that deletion still has to be signalled. */
        decrRefCount(dstobj);
        addReply(c,shared.czero);
        if (touched) {
            signalModifiedKey(c->db,dstkey);
            notifyKeyspaceEvent(NOTIFY_GENERIC,"del",dstkey,c->db->id);
            server.dirty++;
        }
    }
    zfree(src);
}
2402 
/* ZUNIONSTORE command handler: runs the generic union/intersection code
 * with argv[1] as destination key and SET_OP_UNION as operator. */
void zunionstoreCommand(client *c) {
    zunionInterGenericCommand(c, c->argv[1], SET_OP_UNION);
}
2406 
/* ZINTERSTORE command handler: runs the generic union/intersection code
 * with argv[1] as destination key and SET_OP_INTER as operator. */
void zinterstoreCommand(client *c) {
    zunionInterGenericCommand(c, c->argv[1], SET_OP_INTER);
}
2410 
/* Shared implementation of the index based range queries (see the
 * zrangeCommand() and zrevrangeCommand() wrappers).
 *
 * Arguments are read from the client: argv[1] is the key, argv[2] and
 * argv[3] are the start and end indexes, and an optional argv[4] equal to
 * "withscores" (case insensitive) makes every element be followed by its
 * score. When 'reverse' is non-zero the set is walked from its last element
 * backwards.
 *
 * Negative indexes are converted by adding the set length. An empty multi
 * bulk is replied when the resulting range is empty. */
void zrangeGenericCommand(client *c, int reverse) {
    robj *key = c->argv[1];
    robj *zobj;
    int withscores = 0;
    long start, end;
    long llen, rangelen;

    if (getLongFromObjectOrReply(c,c->argv[2],&start,NULL) != C_OK) return;
    if (getLongFromObjectOrReply(c,c->argv[3],&end,NULL) != C_OK) return;

    /* The only thing accepted after the two indexes is WITHSCORES. */
    if (c->argc >= 5) {
        if (c->argc != 5 || strcasecmp(c->argv[4]->ptr,"withscores")) {
            addReply(c,shared.syntaxerr);
            return;
        }
        withscores = 1;
    }

    zobj = lookupKeyReadOrReply(c,key,shared.emptymultibulk);
    if (zobj == NULL || checkType(c,zobj,OBJ_ZSET)) return;

    /* Convert negative indexes and clamp 'start' to the head of the set. */
    llen = zsetLength(zobj);
    if (start < 0) start += llen;
    if (end < 0) end += llen;
    if (start < 0) start = 0;

    /* 'start' is >= 0 at this point, hence an 'end' that is still negative
     * is caught by the first comparison. Nothing to return when
     * start > end or when start is past the last element. */
    if (start > end || start >= llen) {
        addReply(c,shared.emptymultibulk);
        return;
    }
    if (end >= llen) end = llen-1;
    rangelen = (end-start)+1;

    /* The amount of items is known up front, so emit the header now. */
    addReplyMultiBulkLen(c, withscores ? (rangelen*2) : rangelen);

    if (zobj->encoding == OBJ_ENCODING_ZIPLIST) {
        unsigned char *zl = zobj->ptr;
        unsigned char *eptr, *sptr;
        unsigned char *vstr;
        unsigned int vlen;
        long long vlong;

        /* Each member takes two ziplist entries (element, then score). */
        eptr = ziplistIndex(zl, reverse ? -2-(2*start) : 2*start);

        serverAssertWithInfo(c,zobj,eptr != NULL);
        sptr = ziplistNext(zl,eptr);

        while (rangelen-- > 0) {
            serverAssertWithInfo(c,zobj,eptr != NULL && sptr != NULL);
            serverAssertWithInfo(c,zobj,ziplistGet(eptr,&vstr,&vlen,&vlong));

            /* A NULL string pointer means the value came back in 'vlong'. */
            if (vstr != NULL)
                addReplyBulkCBuffer(c,vstr,vlen);
            else
                addReplyBulkLongLong(c,vlong);

            if (withscores) addReplyDouble(c,zzlGetScore(sptr));

            if (reverse)
                zzlPrev(zl,&eptr,&sptr);
            else
                zzlNext(zl,&eptr,&sptr);
        }
    } else if (zobj->encoding == OBJ_ENCODING_SKIPLIST) {
        zset *zs = zobj->ptr;
        zskiplist *zsl = zs->zsl;
        zskiplistNode *ln;

        /* When start is 0 the first node to emit is directly reachable
         * (tail or first node); otherwise look it up by rank. */
        if (start > 0) {
            ln = zslGetElementByRank(zsl, reverse ? llen-start : start+1);
        } else {
            ln = reverse ? zsl->tail : zsl->header->level[0].forward;
        }

        while (rangelen-- > 0) {
            sds member;

            serverAssertWithInfo(c,zobj,ln != NULL);
            member = ln->ele;
            addReplyBulkCBuffer(c,member,sdslen(member));
            if (withscores) addReplyDouble(c,ln->score);

            if (reverse)
                ln = ln->backward;
            else
                ln = ln->level[0].forward;
        }
    } else {
        serverPanic("Unknown sorted set encoding");
    }
}
2512 
/* Index based range query in forward order: runs the generic
 * implementation with the 'reverse' flag cleared. */
void zrangeCommand(client *c) {
    const int reverse = 0;

    zrangeGenericCommand(c,reverse);
}
2516 
/* Index based range query in reverse order: runs the generic
 * implementation with the 'reverse' flag set. */
void zrevrangeCommand(client *c) {
    const int reverse = 1;

    zrangeGenericCommand(c,reverse);
}
2520 
2521 /* This command implements ZRANGEBYSCORE, ZREVRANGEBYSCORE. */
genericZrangebyscoreCommand(client * c,int reverse)2522 void genericZrangebyscoreCommand(client *c, int reverse) {
2523     zrangespec range;
2524     robj *key = c->argv[1];
2525     robj *zobj;
2526     long offset = 0, limit = -1;
2527     int withscores = 0;
2528     unsigned long rangelen = 0;
2529     void *replylen = NULL;
2530     int minidx, maxidx;
2531 
2532     /* Parse the range arguments. */
2533     if (reverse) {
2534         /* Range is given as [max,min] */
2535         maxidx = 2; minidx = 3;
2536     } else {
2537         /* Range is given as [min,max] */
2538         minidx = 2; maxidx = 3;
2539     }
2540 
2541     if (zslParseRange(c->argv[minidx],c->argv[maxidx],&range) != C_OK) {
2542         addReplyError(c,"min or max is not a float");
2543         return;
2544     }
2545 
2546     /* Parse optional extra arguments. Note that ZCOUNT will exactly have
2547      * 4 arguments, so we'll never enter the following code path. */
2548     if (c->argc > 4) {
2549         int remaining = c->argc - 4;
2550         int pos = 4;
2551 
2552         while (remaining) {
2553             if (remaining >= 1 && !strcasecmp(c->argv[pos]->ptr,"withscores")) {
2554                 pos++; remaining--;
2555                 withscores = 1;
2556             } else if (remaining >= 3 && !strcasecmp(c->argv[pos]->ptr,"limit")) {
2557                 if ((getLongFromObjectOrReply(c, c->argv[pos+1], &offset, NULL)
2558                         != C_OK) ||
2559                     (getLongFromObjectOrReply(c, c->argv[pos+2], &limit, NULL)
2560                         != C_OK))
2561                 {
2562                     return;
2563                 }
2564                 pos += 3; remaining -= 3;
2565             } else {
2566                 addReply(c,shared.syntaxerr);
2567                 return;
2568             }
2569         }
2570     }
2571 
2572     /* Ok, lookup the key and get the range */
2573     if ((zobj = lookupKeyReadOrReply(c,key,shared.emptymultibulk)) == NULL ||
2574         checkType(c,zobj,OBJ_ZSET)) return;
2575 
2576     if (zobj->encoding == OBJ_ENCODING_ZIPLIST) {
2577         unsigned char *zl = zobj->ptr;
2578         unsigned char *eptr, *sptr;
2579         unsigned char *vstr;
2580         unsigned int vlen;
2581         long long vlong;
2582         double score;
2583 
2584         /* If reversed, get the last node in range as starting point. */
2585         if (reverse) {
2586             eptr = zzlLastInRange(zl,&range);
2587         } else {
2588             eptr = zzlFirstInRange(zl,&range);
2589         }
2590 
2591         /* No "first" element in the specified interval. */
2592         if (eptr == NULL) {
2593             addReply(c, shared.emptymultibulk);
2594             return;
2595         }
2596 
2597         /* Get score pointer for the first element. */
2598         serverAssertWithInfo(c,zobj,eptr != NULL);
2599         sptr = ziplistNext(zl,eptr);
2600 
2601         /* We don't know in advance how many matching elements there are in the
2602          * list, so we push this object that will represent the multi-bulk
2603          * length in the output buffer, and will "fix" it later */
2604         replylen = addDeferredMultiBulkLength(c);
2605 
2606         /* If there is an offset, just traverse the number of elements without
2607          * checking the score because that is done in the next loop. */
2608         while (eptr && offset--) {
2609             if (reverse) {
2610                 zzlPrev(zl,&eptr,&sptr);
2611             } else {
2612                 zzlNext(zl,&eptr,&sptr);
2613             }
2614         }
2615 
2616         while (eptr && limit--) {
2617             score = zzlGetScore(sptr);
2618 
2619             /* Abort when the node is no longer in range. */
2620             if (reverse) {
2621                 if (!zslValueGteMin(score,&range)) break;
2622             } else {
2623                 if (!zslValueLteMax(score,&range)) break;
2624             }
2625 
2626             /* We know the element exists, so ziplistGet should always succeed */
2627             serverAssertWithInfo(c,zobj,ziplistGet(eptr,&vstr,&vlen,&vlong));
2628 
2629             rangelen++;
2630             if (vstr == NULL) {
2631                 addReplyBulkLongLong(c,vlong);
2632             } else {
2633                 addReplyBulkCBuffer(c,vstr,vlen);
2634             }
2635 
2636             if (withscores) {
2637                 addReplyDouble(c,score);
2638             }
2639 
2640             /* Move to next node */
2641             if (reverse) {
2642                 zzlPrev(zl,&eptr,&sptr);
2643             } else {
2644                 zzlNext(zl,&eptr,&sptr);
2645             }
2646         }
2647     } else if (zobj->encoding == OBJ_ENCODING_SKIPLIST) {
2648         zset *zs = zobj->ptr;
2649         zskiplist *zsl = zs->zsl;
2650         zskiplistNode *ln;
2651 
2652         /* If reversed, get the last node in range as starting point. */
2653         if (reverse) {
2654             ln = zslLastInRange(zsl,&range);
2655         } else {
2656             ln = zslFirstInRange(zsl,&range);
2657         }
2658 
2659         /* No "first" element in the specified interval. */
2660         if (ln == NULL) {
2661             addReply(c, shared.emptymultibulk);
2662             return;
2663         }
2664 
2665         /* We don't know in advance how many matching elements there are in the
2666          * list, so we push this object that will represent the multi-bulk
2667          * length in the output buffer, and will "fix" it later */
2668         replylen = addDeferredMultiBulkLength(c);
2669 
2670         /* If there is an offset, just traverse the number of elements without
2671          * checking the score because that is done in the next loop. */
2672         while (ln && offset--) {
2673             if (reverse) {
2674                 ln = ln->backward;
2675             } else {
2676                 ln = ln->level[0].forward;
2677             }
2678         }
2679 
2680         while (ln && limit--) {
2681             /* Abort when the node is no longer in range. */
2682             if (reverse) {
2683                 if (!zslValueGteMin(ln->score,&range)) break;
2684             } else {
2685                 if (!zslValueLteMax(ln->score,&range)) break;
2686             }
2687 
2688             rangelen++;
2689             addReplyBulkCBuffer(c,ln->ele,sdslen(ln->ele));
2690 
2691             if (withscores) {
2692                 addReplyDouble(c,ln->score);
2693             }
2694 
2695             /* Move to next node */
2696             if (reverse) {
2697                 ln = ln->backward;
2698             } else {
2699                 ln = ln->level[0].forward;
2700             }
2701         }
2702     } else {
2703         serverPanic("Unknown sorted set encoding");
2704     }
2705 
2706     if (withscores) {
2707         rangelen *= 2;
2708     }
2709 
2710     setDeferredMultiBulkLength(c, replylen, rangelen);
2711 }
2712 
/* ZRANGEBYSCORE: score range query in forward order, i.e. the generic
 * implementation with the 'reverse' flag cleared. */
void zrangebyscoreCommand(client *c) {
    const int reverse = 0;

    genericZrangebyscoreCommand(c,reverse);
}
2716 
/* ZREVRANGEBYSCORE: score range query in reverse order, i.e. the generic
 * implementation with the 'reverse' flag set. */
void zrevrangebyscoreCommand(client *c) {
    const int reverse = 1;

    genericZrangebyscoreCommand(c,reverse);
}
2720 
/* Replies with the number of elements of the sorted set at argv[1] whose
 * score falls inside the range described by argv[2] (min) and argv[3]
 * (max).
 *
 * With the ziplist encoding the matching elements are counted one by one;
 * with the skiplist encoding the count is derived from the ranks of the
 * first and last element in range. */
void zcountCommand(client *c) {
    robj *key = c->argv[1];
    robj *zobj;
    zrangespec range;
    unsigned long count = 0;

    /* Decode the score interval. */
    if (zslParseRange(c->argv[2],c->argv[3],&range) != C_OK) {
        addReplyError(c,"min or max is not a float");
        return;
    }

    /* Fetch the sorted set. */
    zobj = lookupKeyReadOrReply(c, key, shared.czero);
    if (zobj == NULL || checkType(c, zobj, OBJ_ZSET)) return;

    if (zobj->encoding == OBJ_ENCODING_ZIPLIST) {
        unsigned char *zl = zobj->ptr;
        unsigned char *eptr, *sptr;
        double score;

        /* Counting starts at the first element inside the interval. */
        eptr = zzlFirstInRange(zl,&range);

        /* No element in range at all. */
        if (eptr == NULL) {
            addReply(c, shared.czero);
            return;
        }

        /* The starting element must satisfy the upper bound. */
        sptr = ziplistNext(zl,eptr);
        score = zzlGetScore(sptr);
        serverAssertWithInfo(c,zobj,zslValueLteMax(score,&range));

        /* Walk forward until the upper bound is exceeded or the list ends. */
        while (eptr) {
            score = zzlGetScore(sptr);
            if (!zslValueLteMax(score,&range)) break;

            count++;
            zzlNext(zl,&eptr,&sptr);
        }
    } else if (zobj->encoding == OBJ_ENCODING_SKIPLIST) {
        zset *zs = zobj->ptr;
        zskiplist *zsl = zs->zsl;
        zskiplistNode *first;

        first = zslFirstInRange(zsl, &range);

        if (first != NULL) {
            zskiplistNode *last;
            unsigned long rank;

            /* Preliminary count: everything from the first element in
             * range up to the end of the skiplist. */
            rank = zslGetRank(zsl, first->score, first->ele);
            count = (zsl->length - (rank - 1));

            /* Then drop whatever comes after the last element in range. */
            last = zslLastInRange(zsl, &range);
            if (last != NULL) {
                rank = zslGetRank(zsl, last->score, last->ele);
                count -= (zsl->length - rank);
            }
        }
    } else {
        serverPanic("Unknown sorted set encoding");
    }

    addReplyLongLong(c, count);
}
2797 
/* Replies with the number of elements of the sorted set at argv[1] that
 * fall inside the lexicographic range described by argv[2] (min) and
 * argv[3] (max).
 *
 * The parsed range is released with zslFreeLexRange() on every path that
 * leaves the function after a successful parse. */
void zlexcountCommand(client *c) {
    robj *key = c->argv[1];
    robj *zobj;
    zlexrangespec range;
    unsigned long count = 0;

    /* Decode the lexicographic interval. */
    if (zslParseLexRange(c->argv[2],c->argv[3],&range) != C_OK) {
        addReplyError(c,"min or max not valid string range item");
        return;
    }

    /* Fetch the sorted set. */
    zobj = lookupKeyReadOrReply(c, key, shared.czero);
    if (zobj == NULL || checkType(c, zobj, OBJ_ZSET)) {
        zslFreeLexRange(&range);
        return;
    }

    if (zobj->encoding == OBJ_ENCODING_ZIPLIST) {
        unsigned char *zl = zobj->ptr;
        unsigned char *eptr, *sptr;

        /* Counting starts at the first element inside the interval. */
        eptr = zzlFirstInLexRange(zl,&range);

        /* No element in range at all. */
        if (eptr == NULL) {
            zslFreeLexRange(&range);
            addReply(c, shared.czero);
            return;
        }

        /* The starting element must satisfy the upper bound. */
        sptr = ziplistNext(zl,eptr);
        serverAssertWithInfo(c,zobj,zzlLexValueLteMax(eptr,&range));

        /* Walk forward until the upper bound is exceeded or the list ends. */
        while (eptr) {
            if (!zzlLexValueLteMax(eptr,&range)) break;

            count++;
            zzlNext(zl,&eptr,&sptr);
        }
    } else if (zobj->encoding == OBJ_ENCODING_SKIPLIST) {
        zset *zs = zobj->ptr;
        zskiplist *zsl = zs->zsl;
        zskiplistNode *first;

        first = zslFirstInLexRange(zsl, &range);

        if (first != NULL) {
            zskiplistNode *last;
            unsigned long rank;

            /* Preliminary count: everything from the first element in
             * range up to the end of the skiplist. */
            rank = zslGetRank(zsl, first->score, first->ele);
            count = (zsl->length - (rank - 1));

            /* Then drop whatever comes after the last element in range. */
            last = zslLastInLexRange(zsl, &range);
            if (last != NULL) {
                rank = zslGetRank(zsl, last->score, last->ele);
                count -= (zsl->length - rank);
            }
        }
    } else {
        serverPanic("Unknown sorted set encoding");
    }

    zslFreeLexRange(&range);
    addReplyLongLong(c, count);
}
2876 
2877 /* This command implements ZRANGEBYLEX, ZREVRANGEBYLEX. */
genericZrangebylexCommand(client * c,int reverse)2878 void genericZrangebylexCommand(client *c, int reverse) {
2879     zlexrangespec range;
2880     robj *key = c->argv[1];
2881     robj *zobj;
2882     long offset = 0, limit = -1;
2883     unsigned long rangelen = 0;
2884     void *replylen = NULL;
2885     int minidx, maxidx;
2886 
2887     /* Parse the range arguments. */
2888     if (reverse) {
2889         /* Range is given as [max,min] */
2890         maxidx = 2; minidx = 3;
2891     } else {
2892         /* Range is given as [min,max] */
2893         minidx = 2; maxidx = 3;
2894     }
2895 
2896     if (zslParseLexRange(c->argv[minidx],c->argv[maxidx],&range) != C_OK) {
2897         addReplyError(c,"min or max not valid string range item");
2898         return;
2899     }
2900 
2901     /* Parse optional extra arguments. Note that ZCOUNT will exactly have
2902      * 4 arguments, so we'll never enter the following code path. */
2903     if (c->argc > 4) {
2904         int remaining = c->argc - 4;
2905         int pos = 4;
2906 
2907         while (remaining) {
2908             if (remaining >= 3 && !strcasecmp(c->argv[pos]->ptr,"limit")) {
2909                 if ((getLongFromObjectOrReply(c, c->argv[pos+1], &offset, NULL) != C_OK) ||
2910                     (getLongFromObjectOrReply(c, c->argv[pos+2], &limit, NULL) != C_OK)) {
2911                     zslFreeLexRange(&range);
2912                     return;
2913                 }
2914                 pos += 3; remaining -= 3;
2915             } else {
2916                 zslFreeLexRange(&range);
2917                 addReply(c,shared.syntaxerr);
2918                 return;
2919             }
2920         }
2921     }
2922 
2923     /* Ok, lookup the key and get the range */
2924     if ((zobj = lookupKeyReadOrReply(c,key,shared.emptymultibulk)) == NULL ||
2925         checkType(c,zobj,OBJ_ZSET))
2926     {
2927         zslFreeLexRange(&range);
2928         return;
2929     }
2930 
2931     if (zobj->encoding == OBJ_ENCODING_ZIPLIST) {
2932         unsigned char *zl = zobj->ptr;
2933         unsigned char *eptr, *sptr;
2934         unsigned char *vstr;
2935         unsigned int vlen;
2936         long long vlong;
2937 
2938         /* If reversed, get the last node in range as starting point. */
2939         if (reverse) {
2940             eptr = zzlLastInLexRange(zl,&range);
2941         } else {
2942             eptr = zzlFirstInLexRange(zl,&range);
2943         }
2944 
2945         /* No "first" element in the specified interval. */
2946         if (eptr == NULL) {
2947             addReply(c, shared.emptymultibulk);
2948             zslFreeLexRange(&range);
2949             return;
2950         }
2951 
2952         /* Get score pointer for the first element. */
2953         serverAssertWithInfo(c,zobj,eptr != NULL);
2954         sptr = ziplistNext(zl,eptr);
2955 
2956         /* We don't know in advance how many matching elements there are in the
2957          * list, so we push this object that will represent the multi-bulk
2958          * length in the output buffer, and will "fix" it later */
2959         replylen = addDeferredMultiBulkLength(c);
2960 
2961         /* If there is an offset, just traverse the number of elements without
2962          * checking the score because that is done in the next loop. */
2963         while (eptr && offset--) {
2964             if (reverse) {
2965                 zzlPrev(zl,&eptr,&sptr);
2966             } else {
2967                 zzlNext(zl,&eptr,&sptr);
2968             }
2969         }
2970 
2971         while (eptr && limit--) {
2972             /* Abort when the node is no longer in range. */
2973             if (reverse) {
2974                 if (!zzlLexValueGteMin(eptr,&range)) break;
2975             } else {
2976                 if (!zzlLexValueLteMax(eptr,&range)) break;
2977             }
2978 
2979             /* We know the element exists, so ziplistGet should always
2980              * succeed. */
2981             serverAssertWithInfo(c,zobj,ziplistGet(eptr,&vstr,&vlen,&vlong));
2982 
2983             rangelen++;
2984             if (vstr == NULL) {
2985                 addReplyBulkLongLong(c,vlong);
2986             } else {
2987                 addReplyBulkCBuffer(c,vstr,vlen);
2988             }
2989 
2990             /* Move to next node */
2991             if (reverse) {
2992                 zzlPrev(zl,&eptr,&sptr);
2993             } else {
2994                 zzlNext(zl,&eptr,&sptr);
2995             }
2996         }
2997     } else if (zobj->encoding == OBJ_ENCODING_SKIPLIST) {
2998         zset *zs = zobj->ptr;
2999         zskiplist *zsl = zs->zsl;
3000         zskiplistNode *ln;
3001 
3002         /* If reversed, get the last node in range as starting point. */
3003         if (reverse) {
3004             ln = zslLastInLexRange(zsl,&range);
3005         } else {
3006             ln = zslFirstInLexRange(zsl,&range);
3007         }
3008 
3009         /* No "first" element in the specified interval. */
3010         if (ln == NULL) {
3011             addReply(c, shared.emptymultibulk);
3012             zslFreeLexRange(&range);
3013             return;
3014         }
3015 
3016         /* We don't know in advance how many matching elements there are in the
3017          * list, so we push this object that will represent the multi-bulk
3018          * length in the output buffer, and will "fix" it later */
3019         replylen = addDeferredMultiBulkLength(c);
3020 
3021         /* If there is an offset, just traverse the number of elements without
3022          * checking the score because that is done in the next loop. */
3023         while (ln && offset--) {
3024             if (reverse) {
3025                 ln = ln->backward;
3026             } else {
3027                 ln = ln->level[0].forward;
3028             }
3029         }
3030 
3031         while (ln && limit--) {
3032             /* Abort when the node is no longer in range. */
3033             if (reverse) {
3034                 if (!zslLexValueGteMin(ln->ele,&range)) break;
3035             } else {
3036                 if (!zslLexValueLteMax(ln->ele,&range)) break;
3037             }
3038 
3039             rangelen++;
3040             addReplyBulkCBuffer(c,ln->ele,sdslen(ln->ele));
3041 
3042             /* Move to next node */
3043             if (reverse) {
3044                 ln = ln->backward;
3045             } else {
3046                 ln = ln->level[0].forward;
3047             }
3048         }
3049     } else {
3050         serverPanic("Unknown sorted set encoding");
3051     }
3052 
3053     zslFreeLexRange(&range);
3054     setDeferredMultiBulkLength(c, replylen, rangelen);
3055 }
3056 
/* ZRANGEBYLEX: lexicographic range query in forward order, i.e. the
 * generic implementation with the 'reverse' flag cleared. */
void zrangebylexCommand(client *c) {
    const int reverse = 0;

    genericZrangebylexCommand(c,reverse);
}
3060 
/* ZREVRANGEBYLEX: lexicographic range query in reverse order, i.e. the
 * generic implementation with the 'reverse' flag set. */
void zrevrangebylexCommand(client *c) {
    const int reverse = 1;

    genericZrangebylexCommand(c,reverse);
}
3064 
/* Replies with the length of the sorted set stored at argv[1], as
 * reported by zsetLength(). shared.czero is passed to the lookup helper
 * as the reply to use when it yields no object. */
void zcardCommand(client *c) {
    robj *zobj = lookupKeyReadOrReply(c,c->argv[1],shared.czero);

    if (zobj == NULL) return;
    if (checkType(c,zobj,OBJ_ZSET)) return;

    addReplyLongLong(c,zsetLength(zobj));
}
3074 
/* Replies with the score of member argv[2] in the sorted set stored at
 * argv[1]. A null bulk is replied when zsetScore() reports C_ERR for the
 * member. */
void zscoreCommand(client *c) {
    robj *zobj;
    double score;

    zobj = lookupKeyReadOrReply(c,c->argv[1],shared.nullbulk);
    if (zobj == NULL || checkType(c,zobj,OBJ_ZSET)) return;

    if (zsetScore(zobj,c->argv[2]->ptr,&score) != C_ERR) {
        addReplyDouble(c,score);
    } else {
        addReply(c,shared.nullbulk);
    }
}
3089 
/* Shared implementation of the rank queries (see the zrankCommand() and
 * zrevrankCommand() wrappers).
 *
 * Replies with the value returned by zsetRank() for member argv[2] of the
 * sorted set at argv[1]; 'reverse' is forwarded to zsetRank() untouched.
 * A negative return value is turned into a null bulk reply. */
void zrankGenericCommand(client *c, int reverse) {
    robj *key = c->argv[1];
    robj *ele = c->argv[2];
    robj *zobj;
    long rank;

    zobj = lookupKeyReadOrReply(c,key,shared.nullbulk);
    if (zobj == NULL || checkType(c,zobj,OBJ_ZSET)) return;

    serverAssertWithInfo(c,ele,sdsEncodedObject(ele));
    rank = zsetRank(zobj,ele->ptr,reverse);
    if (rank < 0) {
        addReply(c,shared.nullbulk);
    } else {
        addReplyLongLong(c,rank);
    }
}
3107 
/* Rank query with the 'reverse' flag of the generic implementation
 * cleared. */
void zrankCommand(client *c) {
    const int reverse = 0;

    zrankGenericCommand(c,reverse);
}
3111 
/* Rank query with the 'reverse' flag of the generic implementation set. */
void zrevrankCommand(client *c) {
    const int reverse = 1;

    zrankGenericCommand(c,reverse);
}
3115 
/* Parses the cursor in argv[2], looks up the sorted set at argv[1] and
 * hands both to scanGenericCommand(). Returns early when cursor parsing
 * fails, the lookup yields no object, or the type check fails. */
void zscanCommand(client *c) {
    unsigned long cursor;
    robj *zobj;

    if (parseScanCursorOrReply(c,c->argv[2],&cursor) == C_ERR) return;

    zobj = lookupKeyReadOrReply(c,c->argv[1],shared.emptyscan);
    if (zobj == NULL || checkType(c,zobj,OBJ_ZSET)) return;

    scanGenericCommand(c,zobj,cursor);
}
3125 
3126 /* This command implements the generic zpop operation, used by:
3127  * ZPOPMIN, ZPOPMAX, BZPOPMIN and BZPOPMAX. This function is also used
3128  * inside blocked.c in the unblocking stage of BZPOPMIN and BZPOPMAX.
3129  *
3130  * If 'emitkey' is true also the key name is emitted, useful for the blocking
3131  * behavior of BZPOP[MIN|MAX], since we can block into multiple keys.
3132  *
3133  * The synchronous version instead does not need to emit the key, but may
3134  * use the 'count' argument to return multiple items if available. */
genericZpopCommand(client * c,robj ** keyv,int keyc,int where,int emitkey,robj * countarg)3135 void genericZpopCommand(client *c, robj **keyv, int keyc, int where, int emitkey, robj *countarg) {
3136     int idx;
3137     robj *key = NULL;
3138     robj *zobj = NULL;
3139     sds ele;
3140     double score;
3141     long count = 1;
3142 
3143     /* If a count argument as passed, parse it or return an error. */
3144     if (countarg) {
3145         if (getLongFromObjectOrReply(c,countarg,&count,NULL) != C_OK)
3146             return;
3147         if (count <= 0) {
3148             addReply(c,shared.emptymultibulk);
3149             return;
3150         }
3151     }
3152 
3153     /* Check type and break on the first error, otherwise identify candidate. */
3154     idx = 0;
3155     while (idx < keyc) {
3156         key = keyv[idx++];
3157         zobj = lookupKeyWrite(c->db,key);
3158         if (!zobj) continue;
3159         if (checkType(c,zobj,OBJ_ZSET)) return;
3160         break;
3161     }
3162 
3163     /* No candidate for zpopping, return empty. */
3164     if (!zobj) {
3165         addReply(c,shared.emptymultibulk);
3166         return;
3167     }
3168 
3169     void *arraylen_ptr = addDeferredMultiBulkLength(c);
3170     long arraylen = 0;
3171 
3172     /* We emit the key only for the blocking variant. */
3173     if (emitkey) addReplyBulk(c,key);
3174 
3175     /* Remove the element. */
3176     do {
3177         if (zobj->encoding == OBJ_ENCODING_ZIPLIST) {
3178             unsigned char *zl = zobj->ptr;
3179             unsigned char *eptr, *sptr;
3180             unsigned char *vstr;
3181             unsigned int vlen;
3182             long long vlong;
3183 
3184             /* Get the first or last element in the sorted set. */
3185             eptr = ziplistIndex(zl,where == ZSET_MAX ? -2 : 0);
3186             serverAssertWithInfo(c,zobj,eptr != NULL);
3187             serverAssertWithInfo(c,zobj,ziplistGet(eptr,&vstr,&vlen,&vlong));
3188             if (vstr == NULL)
3189                 ele = sdsfromlonglong(vlong);
3190             else
3191                 ele = sdsnewlen(vstr,vlen);
3192 
3193             /* Get the score. */
3194             sptr = ziplistNext(zl,eptr);
3195             serverAssertWithInfo(c,zobj,sptr != NULL);
3196             score = zzlGetScore(sptr);
3197         } else if (zobj->encoding == OBJ_ENCODING_SKIPLIST) {
3198             zset *zs = zobj->ptr;
3199             zskiplist *zsl = zs->zsl;
3200             zskiplistNode *zln;
3201 
3202             /* Get the first or last element in the sorted set. */
3203             zln = (where == ZSET_MAX ? zsl->tail :
3204                                        zsl->header->level[0].forward);
3205 
3206             /* There must be an element in the sorted set. */
3207             serverAssertWithInfo(c,zobj,zln != NULL);
3208             ele = sdsdup(zln->ele);
3209             score = zln->score;
3210         } else {
3211             serverPanic("Unknown sorted set encoding");
3212         }
3213 
3214         serverAssertWithInfo(c,zobj,zsetDel(zobj,ele));
3215         server.dirty++;
3216 
3217         if (arraylen == 0) { /* Do this only for the first iteration. */
3218             char *events[2] = {"zpopmin","zpopmax"};
3219             notifyKeyspaceEvent(NOTIFY_ZSET,events[where],key,c->db->id);
3220             signalModifiedKey(c->db,key);
3221         }
3222 
3223         addReplyBulkCBuffer(c,ele,sdslen(ele));
3224         addReplyDouble(c,score);
3225         sdsfree(ele);
3226         arraylen += 2;
3227 
3228         /* Remove the key, if indeed needed. */
3229         if (zsetLength(zobj) == 0) {
3230             dbDelete(c->db,key);
3231             notifyKeyspaceEvent(NOTIFY_GENERIC,"del",key,c->db->id);
3232             break;
3233         }
3234     } while(--count);
3235 
3236     setDeferredMultiBulkLength(c,arraylen_ptr,arraylen + (emitkey != 0));
3237 }
3238 
3239 /* ZPOPMIN key [<count>] */
zpopminCommand(client * c)3240 void zpopminCommand(client *c) {
3241     if (c->argc > 3) {
3242         addReply(c,shared.syntaxerr);
3243         return;
3244     }
3245     genericZpopCommand(c,&c->argv[1],1,ZSET_MIN,0,
3246         c->argc == 3 ? c->argv[2] : NULL);
3247 }
3248 
3249 /* ZMAXPOP key [<count>] */
zpopmaxCommand(client * c)3250 void zpopmaxCommand(client *c) {
3251     if (c->argc > 3) {
3252         addReply(c,shared.syntaxerr);
3253         return;
3254     }
3255     genericZpopCommand(c,&c->argv[1],1,ZSET_MAX,0,
3256         c->argc == 3 ? c->argv[2] : NULL);
3257 }
3258 
3259 /* BZPOPMIN / BZPOPMAX actual implementation. */
blockingGenericZpopCommand(client * c,int where)3260 void blockingGenericZpopCommand(client *c, int where) {
3261     robj *o;
3262     mstime_t timeout;
3263     int j;
3264 
3265     if (getTimeoutFromObjectOrReply(c,c->argv[c->argc-1],&timeout,UNIT_SECONDS)
3266         != C_OK) return;
3267 
3268     for (j = 1; j < c->argc-1; j++) {
3269         o = lookupKeyWrite(c->db,c->argv[j]);
3270         if (o != NULL) {
3271             if (o->type != OBJ_ZSET) {
3272                 addReply(c,shared.wrongtypeerr);
3273                 return;
3274             } else {
3275                 if (zsetLength(o) != 0) {
3276                     /* Non empty zset, this is like a normal ZPOP[MIN|MAX]. */
3277                     genericZpopCommand(c,&c->argv[j],1,where,1,NULL);
3278                     /* Replicate it as an ZPOP[MIN|MAX] instead of BZPOP[MIN|MAX]. */
3279                     rewriteClientCommandVector(c,2,
3280                         where == ZSET_MAX ? shared.zpopmax : shared.zpopmin,
3281                         c->argv[j]);
3282                     return;
3283                 }
3284             }
3285         }
3286     }
3287 
3288     /* If we are inside a MULTI/EXEC and the zset is empty the only thing
3289      * we can do is treating it as a timeout (even with timeout 0). */
3290     if (c->flags & CLIENT_MULTI) {
3291         addReply(c,shared.nullmultibulk);
3292         return;
3293     }
3294 
3295     /* If the keys do not exist we must block */
3296     blockForKeys(c,BLOCKED_ZSET,c->argv + 1,c->argc - 2,timeout,NULL,NULL);
3297 }
3298 
3299 // BZPOPMIN key [key ...] timeout
bzpopminCommand(client * c)3300 void bzpopminCommand(client *c) {
3301     blockingGenericZpopCommand(c,ZSET_MIN);
3302 }
3303 
3304 // BZPOPMAX key [key ...] timeout
bzpopmaxCommand(client * c)3305 void bzpopmaxCommand(client *c) {
3306     blockingGenericZpopCommand(c,ZSET_MAX);
3307 }
3308