xref: /redis-3.2.3/src/t_zset.c (revision 60323407)
1 /*
2  * Copyright (c) 2009-2012, Salvatore Sanfilippo <antirez at gmail dot com>
3  * Copyright (c) 2009-2012, Pieter Noordhuis <pcnoordhuis at gmail dot com>
4  * All rights reserved.
5  *
6  * Redistribution and use in source and binary forms, with or without
7  * modification, are permitted provided that the following conditions are met:
8  *
9  *   * Redistributions of source code must retain the above copyright notice,
10  *     this list of conditions and the following disclaimer.
11  *   * Redistributions in binary form must reproduce the above copyright
12  *     notice, this list of conditions and the following disclaimer in the
13  *     documentation and/or other materials provided with the distribution.
14  *   * Neither the name of Redis nor the names of its contributors may be used
15  *     to endorse or promote products derived from this software without
16  *     specific prior written permission.
17  *
18  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
19  * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
20  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
21  * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
22  * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
23  * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
24  * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
25  * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
26  * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
27  * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
28  * POSSIBILITY OF SUCH DAMAGE.
29  */
30 
31 /*-----------------------------------------------------------------------------
32  * Sorted set API
33  *----------------------------------------------------------------------------*/
34 
35 /* ZSETs are ordered sets using two data structures to hold the same elements
36  * in order to get O(log(N)) INSERT and REMOVE operations into a sorted
37  * data structure.
38  *
39  * The elements are added to a hash table mapping Redis objects to scores.
40  * At the same time the elements are added to a skip list mapping scores
41  * to Redis objects (so objects are sorted by scores in this "view"). */
42 
43 /* This skiplist implementation is almost a C translation of the original
44  * algorithm described by William Pugh in "Skip Lists: A Probabilistic
45  * Alternative to Balanced Trees", modified in three ways:
46  * a) this implementation allows for repeated scores.
47  * b) the comparison is not just by key (our 'score') but by satellite data.
48  * c) there is a back pointer, so it's a doubly linked list with the back
49  * pointers being only at "level 1". This allows to traverse the list
50  * from tail to head, useful for ZREVRANGE. */
51 
52 #include "server.h"
53 #include <math.h>
54 
55 static int zslLexValueGteMin(robj *value, zlexrangespec *spec);
56 static int zslLexValueLteMax(robj *value, zlexrangespec *spec);
57 
zslCreateNode(int level,double score,robj * obj)58 zskiplistNode *zslCreateNode(int level, double score, robj *obj) {
59     zskiplistNode *zn = zmalloc(sizeof(*zn)+level*sizeof(struct zskiplistLevel));
60     zn->score = score;
61     zn->obj = obj;
62     return zn;
63 }
64 
zslCreate(void)65 zskiplist *zslCreate(void) {
66     int j;
67     zskiplist *zsl;
68 
69     zsl = zmalloc(sizeof(*zsl));
70     zsl->level = 1;
71     zsl->length = 0;
72     zsl->header = zslCreateNode(ZSKIPLIST_MAXLEVEL,0,NULL);
73     for (j = 0; j < ZSKIPLIST_MAXLEVEL; j++) {
74         zsl->header->level[j].forward = NULL;
75         zsl->header->level[j].span = 0;
76     }
77     zsl->header->backward = NULL;
78     zsl->tail = NULL;
79     return zsl;
80 }
81 
zslFreeNode(zskiplistNode * node)82 void zslFreeNode(zskiplistNode *node) {
83     decrRefCount(node->obj);
84     zfree(node);
85 }
86 
zslFree(zskiplist * zsl)87 void zslFree(zskiplist *zsl) {
88     zskiplistNode *node = zsl->header->level[0].forward, *next;
89 
90     zfree(zsl->header);
91     while(node) {
92         next = node->level[0].forward;
93         zslFreeNode(node);
94         node = next;
95     }
96     zfree(zsl);
97 }
98 
99 /* Returns a random level for the new skiplist node we are going to create.
100  * The return value of this function is between 1 and ZSKIPLIST_MAXLEVEL
101  * (both inclusive), with a powerlaw-alike distribution where higher
102  * levels are less likely to be returned. */
zslRandomLevel(void)103 int zslRandomLevel(void) {
104     int level = 1;
105     while ((random()&0xFFFF) < (ZSKIPLIST_P * 0xFFFF))
106         level += 1;
107     return (level<ZSKIPLIST_MAXLEVEL) ? level : ZSKIPLIST_MAXLEVEL;
108 }
109 
zslInsert(zskiplist * zsl,double score,robj * obj)110 zskiplistNode *zslInsert(zskiplist *zsl, double score, robj *obj) {
111     zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
112     unsigned int rank[ZSKIPLIST_MAXLEVEL];
113     int i, level;
114 
115     serverAssert(!isnan(score));
116     x = zsl->header;
117     for (i = zsl->level-1; i >= 0; i--) {
118         /* store rank that is crossed to reach the insert position */
119         rank[i] = i == (zsl->level-1) ? 0 : rank[i+1];
120         while (x->level[i].forward &&
121             (x->level[i].forward->score < score ||
122                 (x->level[i].forward->score == score &&
123                 compareStringObjects(x->level[i].forward->obj,obj) < 0))) {
124             rank[i] += x->level[i].span;
125             x = x->level[i].forward;
126         }
127         update[i] = x;
128     }
129     /* we assume the key is not already inside, since we allow duplicated
130      * scores, and the re-insertion of score and redis object should never
131      * happen since the caller of zslInsert() should test in the hash table
132      * if the element is already inside or not. */
133     level = zslRandomLevel();
134     if (level > zsl->level) {
135         for (i = zsl->level; i < level; i++) {
136             rank[i] = 0;
137             update[i] = zsl->header;
138             update[i]->level[i].span = zsl->length;
139         }
140         zsl->level = level;
141     }
142     x = zslCreateNode(level,score,obj);
143     for (i = 0; i < level; i++) {
144         x->level[i].forward = update[i]->level[i].forward;
145         update[i]->level[i].forward = x;
146 
147         /* update span covered by update[i] as x is inserted here */
148         x->level[i].span = update[i]->level[i].span - (rank[0] - rank[i]);
149         update[i]->level[i].span = (rank[0] - rank[i]) + 1;
150     }
151 
152     /* increment span for untouched levels */
153     for (i = level; i < zsl->level; i++) {
154         update[i]->level[i].span++;
155     }
156 
157     x->backward = (update[0] == zsl->header) ? NULL : update[0];
158     if (x->level[0].forward)
159         x->level[0].forward->backward = x;
160     else
161         zsl->tail = x;
162     zsl->length++;
163     return x;
164 }
165 
166 /* Internal function used by zslDelete, zslDeleteByScore and zslDeleteByRank */
zslDeleteNode(zskiplist * zsl,zskiplistNode * x,zskiplistNode ** update)167 void zslDeleteNode(zskiplist *zsl, zskiplistNode *x, zskiplistNode **update) {
168     int i;
169     for (i = 0; i < zsl->level; i++) {
170         if (update[i]->level[i].forward == x) {
171             update[i]->level[i].span += x->level[i].span - 1;
172             update[i]->level[i].forward = x->level[i].forward;
173         } else {
174             update[i]->level[i].span -= 1;
175         }
176     }
177     if (x->level[0].forward) {
178         x->level[0].forward->backward = x->backward;
179     } else {
180         zsl->tail = x->backward;
181     }
182     while(zsl->level > 1 && zsl->header->level[zsl->level-1].forward == NULL)
183         zsl->level--;
184     zsl->length--;
185 }
186 
187 /* Delete an element with matching score/object from the skiplist. */
zslDelete(zskiplist * zsl,double score,robj * obj)188 int zslDelete(zskiplist *zsl, double score, robj *obj) {
189     zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
190     int i;
191 
192     x = zsl->header;
193     for (i = zsl->level-1; i >= 0; i--) {
194         while (x->level[i].forward &&
195             (x->level[i].forward->score < score ||
196                 (x->level[i].forward->score == score &&
197                 compareStringObjects(x->level[i].forward->obj,obj) < 0)))
198             x = x->level[i].forward;
199         update[i] = x;
200     }
201     /* We may have multiple elements with the same score, what we need
202      * is to find the element with both the right score and object. */
203     x = x->level[0].forward;
204     if (x && score == x->score && equalStringObjects(x->obj,obj)) {
205         zslDeleteNode(zsl, x, update);
206         zslFreeNode(x);
207         return 1;
208     }
209     return 0; /* not found */
210 }
211 
zslValueGteMin(double value,zrangespec * spec)212 static int zslValueGteMin(double value, zrangespec *spec) {
213     return spec->minex ? (value > spec->min) : (value >= spec->min);
214 }
215 
zslValueLteMax(double value,zrangespec * spec)216 int zslValueLteMax(double value, zrangespec *spec) {
217     return spec->maxex ? (value < spec->max) : (value <= spec->max);
218 }
219 
220 /* Returns if there is a part of the zset is in range. */
zslIsInRange(zskiplist * zsl,zrangespec * range)221 int zslIsInRange(zskiplist *zsl, zrangespec *range) {
222     zskiplistNode *x;
223 
224     /* Test for ranges that will always be empty. */
225     if (range->min > range->max ||
226             (range->min == range->max && (range->minex || range->maxex)))
227         return 0;
228     x = zsl->tail;
229     if (x == NULL || !zslValueGteMin(x->score,range))
230         return 0;
231     x = zsl->header->level[0].forward;
232     if (x == NULL || !zslValueLteMax(x->score,range))
233         return 0;
234     return 1;
235 }
236 
237 /* Find the first node that is contained in the specified range.
238  * Returns NULL when no element is contained in the range. */
zslFirstInRange(zskiplist * zsl,zrangespec * range)239 zskiplistNode *zslFirstInRange(zskiplist *zsl, zrangespec *range) {
240     zskiplistNode *x;
241     int i;
242 
243     /* If everything is out of range, return early. */
244     if (!zslIsInRange(zsl,range)) return NULL;
245 
246     x = zsl->header;
247     for (i = zsl->level-1; i >= 0; i--) {
248         /* Go forward while *OUT* of range. */
249         while (x->level[i].forward &&
250             !zslValueGteMin(x->level[i].forward->score,range))
251                 x = x->level[i].forward;
252     }
253 
254     /* This is an inner range, so the next node cannot be NULL. */
255     x = x->level[0].forward;
256     serverAssert(x != NULL);
257 
258     /* Check if score <= max. */
259     if (!zslValueLteMax(x->score,range)) return NULL;
260     return x;
261 }
262 
263 /* Find the last node that is contained in the specified range.
264  * Returns NULL when no element is contained in the range. */
zslLastInRange(zskiplist * zsl,zrangespec * range)265 zskiplistNode *zslLastInRange(zskiplist *zsl, zrangespec *range) {
266     zskiplistNode *x;
267     int i;
268 
269     /* If everything is out of range, return early. */
270     if (!zslIsInRange(zsl,range)) return NULL;
271 
272     x = zsl->header;
273     for (i = zsl->level-1; i >= 0; i--) {
274         /* Go forward while *IN* range. */
275         while (x->level[i].forward &&
276             zslValueLteMax(x->level[i].forward->score,range))
277                 x = x->level[i].forward;
278     }
279 
280     /* This is an inner range, so this node cannot be NULL. */
281     serverAssert(x != NULL);
282 
283     /* Check if score >= min. */
284     if (!zslValueGteMin(x->score,range)) return NULL;
285     return x;
286 }
287 
288 /* Delete all the elements with score between min and max from the skiplist.
289  * Min and max are inclusive, so a score >= min || score <= max is deleted.
290  * Note that this function takes the reference to the hash table view of the
291  * sorted set, in order to remove the elements from the hash table too. */
zslDeleteRangeByScore(zskiplist * zsl,zrangespec * range,dict * dict)292 unsigned long zslDeleteRangeByScore(zskiplist *zsl, zrangespec *range, dict *dict) {
293     zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
294     unsigned long removed = 0;
295     int i;
296 
297     x = zsl->header;
298     for (i = zsl->level-1; i >= 0; i--) {
299         while (x->level[i].forward && (range->minex ?
300             x->level[i].forward->score <= range->min :
301             x->level[i].forward->score < range->min))
302                 x = x->level[i].forward;
303         update[i] = x;
304     }
305 
306     /* Current node is the last with score < or <= min. */
307     x = x->level[0].forward;
308 
309     /* Delete nodes while in range. */
310     while (x &&
311            (range->maxex ? x->score < range->max : x->score <= range->max))
312     {
313         zskiplistNode *next = x->level[0].forward;
314         zslDeleteNode(zsl,x,update);
315         dictDelete(dict,x->obj);
316         zslFreeNode(x);
317         removed++;
318         x = next;
319     }
320     return removed;
321 }
322 
zslDeleteRangeByLex(zskiplist * zsl,zlexrangespec * range,dict * dict)323 unsigned long zslDeleteRangeByLex(zskiplist *zsl, zlexrangespec *range, dict *dict) {
324     zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
325     unsigned long removed = 0;
326     int i;
327 
328 
329     x = zsl->header;
330     for (i = zsl->level-1; i >= 0; i--) {
331         while (x->level[i].forward &&
332             !zslLexValueGteMin(x->level[i].forward->obj,range))
333                 x = x->level[i].forward;
334         update[i] = x;
335     }
336 
337     /* Current node is the last with score < or <= min. */
338     x = x->level[0].forward;
339 
340     /* Delete nodes while in range. */
341     while (x && zslLexValueLteMax(x->obj,range)) {
342         zskiplistNode *next = x->level[0].forward;
343         zslDeleteNode(zsl,x,update);
344         dictDelete(dict,x->obj);
345         zslFreeNode(x);
346         removed++;
347         x = next;
348     }
349     return removed;
350 }
351 
352 /* Delete all the elements with rank between start and end from the skiplist.
353  * Start and end are inclusive. Note that start and end need to be 1-based */
zslDeleteRangeByRank(zskiplist * zsl,unsigned int start,unsigned int end,dict * dict)354 unsigned long zslDeleteRangeByRank(zskiplist *zsl, unsigned int start, unsigned int end, dict *dict) {
355     zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
356     unsigned long traversed = 0, removed = 0;
357     int i;
358 
359     x = zsl->header;
360     for (i = zsl->level-1; i >= 0; i--) {
361         while (x->level[i].forward && (traversed + x->level[i].span) < start) {
362             traversed += x->level[i].span;
363             x = x->level[i].forward;
364         }
365         update[i] = x;
366     }
367 
368     traversed++;
369     x = x->level[0].forward;
370     while (x && traversed <= end) {
371         zskiplistNode *next = x->level[0].forward;
372         zslDeleteNode(zsl,x,update);
373         dictDelete(dict,x->obj);
374         zslFreeNode(x);
375         removed++;
376         traversed++;
377         x = next;
378     }
379     return removed;
380 }
381 
382 /* Find the rank for an element by both score and key.
383  * Returns 0 when the element cannot be found, rank otherwise.
384  * Note that the rank is 1-based due to the span of zsl->header to the
385  * first element. */
zslGetRank(zskiplist * zsl,double score,robj * o)386 unsigned long zslGetRank(zskiplist *zsl, double score, robj *o) {
387     zskiplistNode *x;
388     unsigned long rank = 0;
389     int i;
390 
391     x = zsl->header;
392     for (i = zsl->level-1; i >= 0; i--) {
393         while (x->level[i].forward &&
394             (x->level[i].forward->score < score ||
395                 (x->level[i].forward->score == score &&
396                 compareStringObjects(x->level[i].forward->obj,o) <= 0))) {
397             rank += x->level[i].span;
398             x = x->level[i].forward;
399         }
400 
401         /* x might be equal to zsl->header, so test if obj is non-NULL */
402         if (x->obj && equalStringObjects(x->obj,o)) {
403             return rank;
404         }
405     }
406     return 0;
407 }
408 
409 /* Finds an element by its rank. The rank argument needs to be 1-based. */
zslGetElementByRank(zskiplist * zsl,unsigned long rank)410 zskiplistNode* zslGetElementByRank(zskiplist *zsl, unsigned long rank) {
411     zskiplistNode *x;
412     unsigned long traversed = 0;
413     int i;
414 
415     x = zsl->header;
416     for (i = zsl->level-1; i >= 0; i--) {
417         while (x->level[i].forward && (traversed + x->level[i].span) <= rank)
418         {
419             traversed += x->level[i].span;
420             x = x->level[i].forward;
421         }
422         if (traversed == rank) {
423             return x;
424         }
425     }
426     return NULL;
427 }
428 
429 /* Populate the rangespec according to the objects min and max. */
zslParseRange(robj * min,robj * max,zrangespec * spec)430 static int zslParseRange(robj *min, robj *max, zrangespec *spec) {
431     char *eptr;
432     spec->minex = spec->maxex = 0;
433 
434     /* Parse the min-max interval. If one of the values is prefixed
435      * by the "(" character, it's considered "open". For instance
436      * ZRANGEBYSCORE zset (1.5 (2.5 will match min < x < max
437      * ZRANGEBYSCORE zset 1.5 2.5 will instead match min <= x <= max */
438     if (min->encoding == OBJ_ENCODING_INT) {
439         spec->min = (long)min->ptr;
440     } else {
441         if (((char*)min->ptr)[0] == '(') {
442             spec->min = strtod((char*)min->ptr+1,&eptr);
443             if (eptr[0] != '\0' || isnan(spec->min)) return C_ERR;
444             spec->minex = 1;
445         } else {
446             spec->min = strtod((char*)min->ptr,&eptr);
447             if (eptr[0] != '\0' || isnan(spec->min)) return C_ERR;
448         }
449     }
450     if (max->encoding == OBJ_ENCODING_INT) {
451         spec->max = (long)max->ptr;
452     } else {
453         if (((char*)max->ptr)[0] == '(') {
454             spec->max = strtod((char*)max->ptr+1,&eptr);
455             if (eptr[0] != '\0' || isnan(spec->max)) return C_ERR;
456             spec->maxex = 1;
457         } else {
458             spec->max = strtod((char*)max->ptr,&eptr);
459             if (eptr[0] != '\0' || isnan(spec->max)) return C_ERR;
460         }
461     }
462 
463     return C_OK;
464 }
465 
466 /* ------------------------ Lexicographic ranges ---------------------------- */
467 
468 /* Parse max or min argument of ZRANGEBYLEX.
469   * (foo means foo (open interval)
470   * [foo means foo (closed interval)
471   * - means the min string possible
472   * + means the max string possible
473   *
474   * If the string is valid the *dest pointer is set to the redis object
475   * that will be used for the comparision, and ex will be set to 0 or 1
476   * respectively if the item is exclusive or inclusive. C_OK will be
477   * returned.
478   *
479   * If the string is not a valid range C_ERR is returned, and the value
480   * of *dest and *ex is undefined. */
zslParseLexRangeItem(robj * item,robj ** dest,int * ex)481 int zslParseLexRangeItem(robj *item, robj **dest, int *ex) {
482     char *c = item->ptr;
483 
484     switch(c[0]) {
485     case '+':
486         if (c[1] != '\0') return C_ERR;
487         *ex = 0;
488         *dest = shared.maxstring;
489         incrRefCount(shared.maxstring);
490         return C_OK;
491     case '-':
492         if (c[1] != '\0') return C_ERR;
493         *ex = 0;
494         *dest = shared.minstring;
495         incrRefCount(shared.minstring);
496         return C_OK;
497     case '(':
498         *ex = 1;
499         *dest = createStringObject(c+1,sdslen(c)-1);
500         return C_OK;
501     case '[':
502         *ex = 0;
503         *dest = createStringObject(c+1,sdslen(c)-1);
504         return C_OK;
505     default:
506         return C_ERR;
507     }
508 }
509 
510 /* Populate the rangespec according to the objects min and max.
511  *
512  * Return C_OK on success. On error C_ERR is returned.
513  * When OK is returned the structure must be freed with zslFreeLexRange(),
514  * otherwise no release is needed. */
zslParseLexRange(robj * min,robj * max,zlexrangespec * spec)515 static int zslParseLexRange(robj *min, robj *max, zlexrangespec *spec) {
516     /* The range can't be valid if objects are integer encoded.
517      * Every item must start with ( or [. */
518     if (min->encoding == OBJ_ENCODING_INT ||
519         max->encoding == OBJ_ENCODING_INT) return C_ERR;
520 
521     spec->min = spec->max = NULL;
522     if (zslParseLexRangeItem(min, &spec->min, &spec->minex) == C_ERR ||
523         zslParseLexRangeItem(max, &spec->max, &spec->maxex) == C_ERR) {
524         if (spec->min) decrRefCount(spec->min);
525         if (spec->max) decrRefCount(spec->max);
526         return C_ERR;
527     } else {
528         return C_OK;
529     }
530 }
531 
532 /* Free a lex range structure, must be called only after zelParseLexRange()
533  * populated the structure with success (C_OK returned). */
zslFreeLexRange(zlexrangespec * spec)534 void zslFreeLexRange(zlexrangespec *spec) {
535     decrRefCount(spec->min);
536     decrRefCount(spec->max);
537 }
538 
539 /* This is just a wrapper to compareStringObjects() that is able to
540  * handle shared.minstring and shared.maxstring as the equivalent of
541  * -inf and +inf for strings */
compareStringObjectsForLexRange(robj * a,robj * b)542 int compareStringObjectsForLexRange(robj *a, robj *b) {
543     if (a == b) return 0; /* This makes sure that we handle inf,inf and
544                              -inf,-inf ASAP. One special case less. */
545     if (a == shared.minstring || b == shared.maxstring) return -1;
546     if (a == shared.maxstring || b == shared.minstring) return 1;
547     return compareStringObjects(a,b);
548 }
549 
zslLexValueGteMin(robj * value,zlexrangespec * spec)550 static int zslLexValueGteMin(robj *value, zlexrangespec *spec) {
551     return spec->minex ?
552         (compareStringObjectsForLexRange(value,spec->min) > 0) :
553         (compareStringObjectsForLexRange(value,spec->min) >= 0);
554 }
555 
zslLexValueLteMax(robj * value,zlexrangespec * spec)556 static int zslLexValueLteMax(robj *value, zlexrangespec *spec) {
557     return spec->maxex ?
558         (compareStringObjectsForLexRange(value,spec->max) < 0) :
559         (compareStringObjectsForLexRange(value,spec->max) <= 0);
560 }
561 
562 /* Returns if there is a part of the zset is in the lex range. */
zslIsInLexRange(zskiplist * zsl,zlexrangespec * range)563 int zslIsInLexRange(zskiplist *zsl, zlexrangespec *range) {
564     zskiplistNode *x;
565 
566     /* Test for ranges that will always be empty. */
567     if (compareStringObjectsForLexRange(range->min,range->max) > 1 ||
568             (compareStringObjects(range->min,range->max) == 0 &&
569             (range->minex || range->maxex)))
570         return 0;
571     x = zsl->tail;
572     if (x == NULL || !zslLexValueGteMin(x->obj,range))
573         return 0;
574     x = zsl->header->level[0].forward;
575     if (x == NULL || !zslLexValueLteMax(x->obj,range))
576         return 0;
577     return 1;
578 }
579 
580 /* Find the first node that is contained in the specified lex range.
581  * Returns NULL when no element is contained in the range. */
zslFirstInLexRange(zskiplist * zsl,zlexrangespec * range)582 zskiplistNode *zslFirstInLexRange(zskiplist *zsl, zlexrangespec *range) {
583     zskiplistNode *x;
584     int i;
585 
586     /* If everything is out of range, return early. */
587     if (!zslIsInLexRange(zsl,range)) return NULL;
588 
589     x = zsl->header;
590     for (i = zsl->level-1; i >= 0; i--) {
591         /* Go forward while *OUT* of range. */
592         while (x->level[i].forward &&
593             !zslLexValueGteMin(x->level[i].forward->obj,range))
594                 x = x->level[i].forward;
595     }
596 
597     /* This is an inner range, so the next node cannot be NULL. */
598     x = x->level[0].forward;
599     serverAssert(x != NULL);
600 
601     /* Check if score <= max. */
602     if (!zslLexValueLteMax(x->obj,range)) return NULL;
603     return x;
604 }
605 
606 /* Find the last node that is contained in the specified range.
607  * Returns NULL when no element is contained in the range. */
zslLastInLexRange(zskiplist * zsl,zlexrangespec * range)608 zskiplistNode *zslLastInLexRange(zskiplist *zsl, zlexrangespec *range) {
609     zskiplistNode *x;
610     int i;
611 
612     /* If everything is out of range, return early. */
613     if (!zslIsInLexRange(zsl,range)) return NULL;
614 
615     x = zsl->header;
616     for (i = zsl->level-1; i >= 0; i--) {
617         /* Go forward while *IN* range. */
618         while (x->level[i].forward &&
619             zslLexValueLteMax(x->level[i].forward->obj,range))
620                 x = x->level[i].forward;
621     }
622 
623     /* This is an inner range, so this node cannot be NULL. */
624     serverAssert(x != NULL);
625 
626     /* Check if score >= min. */
627     if (!zslLexValueGteMin(x->obj,range)) return NULL;
628     return x;
629 }
630 
631 /*-----------------------------------------------------------------------------
632  * Ziplist-backed sorted set API
633  *----------------------------------------------------------------------------*/
634 
zzlGetScore(unsigned char * sptr)635 double zzlGetScore(unsigned char *sptr) {
636     unsigned char *vstr;
637     unsigned int vlen;
638     long long vlong;
639     char buf[128];
640     double score;
641 
642     serverAssert(sptr != NULL);
643     serverAssert(ziplistGet(sptr,&vstr,&vlen,&vlong));
644 
645     if (vstr) {
646         memcpy(buf,vstr,vlen);
647         buf[vlen] = '\0';
648         score = strtod(buf,NULL);
649     } else {
650         score = vlong;
651     }
652 
653     return score;
654 }
655 
656 /* Return a ziplist element as a Redis string object.
657  * This simple abstraction can be used to simplifies some code at the
658  * cost of some performance. */
ziplistGetObject(unsigned char * sptr)659 robj *ziplistGetObject(unsigned char *sptr) {
660     unsigned char *vstr;
661     unsigned int vlen;
662     long long vlong;
663 
664     serverAssert(sptr != NULL);
665     serverAssert(ziplistGet(sptr,&vstr,&vlen,&vlong));
666 
667     if (vstr) {
668         return createStringObject((char*)vstr,vlen);
669     } else {
670         return createStringObjectFromLongLong(vlong);
671     }
672 }
673 
674 /* Compare element in sorted set with given element. */
zzlCompareElements(unsigned char * eptr,unsigned char * cstr,unsigned int clen)675 int zzlCompareElements(unsigned char *eptr, unsigned char *cstr, unsigned int clen) {
676     unsigned char *vstr;
677     unsigned int vlen;
678     long long vlong;
679     unsigned char vbuf[32];
680     int minlen, cmp;
681 
682     serverAssert(ziplistGet(eptr,&vstr,&vlen,&vlong));
683     if (vstr == NULL) {
684         /* Store string representation of long long in buf. */
685         vlen = ll2string((char*)vbuf,sizeof(vbuf),vlong);
686         vstr = vbuf;
687     }
688 
689     minlen = (vlen < clen) ? vlen : clen;
690     cmp = memcmp(vstr,cstr,minlen);
691     if (cmp == 0) return vlen-clen;
692     return cmp;
693 }
694 
zzlLength(unsigned char * zl)695 unsigned int zzlLength(unsigned char *zl) {
696     return ziplistLen(zl)/2;
697 }
698 
699 /* Move to next entry based on the values in eptr and sptr. Both are set to
700  * NULL when there is no next entry. */
zzlNext(unsigned char * zl,unsigned char ** eptr,unsigned char ** sptr)701 void zzlNext(unsigned char *zl, unsigned char **eptr, unsigned char **sptr) {
702     unsigned char *_eptr, *_sptr;
703     serverAssert(*eptr != NULL && *sptr != NULL);
704 
705     _eptr = ziplistNext(zl,*sptr);
706     if (_eptr != NULL) {
707         _sptr = ziplistNext(zl,_eptr);
708         serverAssert(_sptr != NULL);
709     } else {
710         /* No next entry. */
711         _sptr = NULL;
712     }
713 
714     *eptr = _eptr;
715     *sptr = _sptr;
716 }
717 
718 /* Move to the previous entry based on the values in eptr and sptr. Both are
719  * set to NULL when there is no next entry. */
zzlPrev(unsigned char * zl,unsigned char ** eptr,unsigned char ** sptr)720 void zzlPrev(unsigned char *zl, unsigned char **eptr, unsigned char **sptr) {
721     unsigned char *_eptr, *_sptr;
722     serverAssert(*eptr != NULL && *sptr != NULL);
723 
724     _sptr = ziplistPrev(zl,*eptr);
725     if (_sptr != NULL) {
726         _eptr = ziplistPrev(zl,_sptr);
727         serverAssert(_eptr != NULL);
728     } else {
729         /* No previous entry. */
730         _eptr = NULL;
731     }
732 
733     *eptr = _eptr;
734     *sptr = _sptr;
735 }
736 
737 /* Returns if there is a part of the zset is in range. Should only be used
738  * internally by zzlFirstInRange and zzlLastInRange. */
zzlIsInRange(unsigned char * zl,zrangespec * range)739 int zzlIsInRange(unsigned char *zl, zrangespec *range) {
740     unsigned char *p;
741     double score;
742 
743     /* Test for ranges that will always be empty. */
744     if (range->min > range->max ||
745             (range->min == range->max && (range->minex || range->maxex)))
746         return 0;
747 
748     p = ziplistIndex(zl,-1); /* Last score. */
749     if (p == NULL) return 0; /* Empty sorted set */
750     score = zzlGetScore(p);
751     if (!zslValueGteMin(score,range))
752         return 0;
753 
754     p = ziplistIndex(zl,1); /* First score. */
755     serverAssert(p != NULL);
756     score = zzlGetScore(p);
757     if (!zslValueLteMax(score,range))
758         return 0;
759 
760     return 1;
761 }
762 
763 /* Find pointer to the first element contained in the specified range.
764  * Returns NULL when no element is contained in the range. */
zzlFirstInRange(unsigned char * zl,zrangespec * range)765 unsigned char *zzlFirstInRange(unsigned char *zl, zrangespec *range) {
766     unsigned char *eptr = ziplistIndex(zl,0), *sptr;
767     double score;
768 
769     /* If everything is out of range, return early. */
770     if (!zzlIsInRange(zl,range)) return NULL;
771 
772     while (eptr != NULL) {
773         sptr = ziplistNext(zl,eptr);
774         serverAssert(sptr != NULL);
775 
776         score = zzlGetScore(sptr);
777         if (zslValueGteMin(score,range)) {
778             /* Check if score <= max. */
779             if (zslValueLteMax(score,range))
780                 return eptr;
781             return NULL;
782         }
783 
784         /* Move to next element. */
785         eptr = ziplistNext(zl,sptr);
786     }
787 
788     return NULL;
789 }
790 
791 /* Find pointer to the last element contained in the specified range.
792  * Returns NULL when no element is contained in the range. */
zzlLastInRange(unsigned char * zl,zrangespec * range)793 unsigned char *zzlLastInRange(unsigned char *zl, zrangespec *range) {
794     unsigned char *eptr = ziplistIndex(zl,-2), *sptr;
795     double score;
796 
797     /* If everything is out of range, return early. */
798     if (!zzlIsInRange(zl,range)) return NULL;
799 
800     while (eptr != NULL) {
801         sptr = ziplistNext(zl,eptr);
802         serverAssert(sptr != NULL);
803 
804         score = zzlGetScore(sptr);
805         if (zslValueLteMax(score,range)) {
806             /* Check if score >= min. */
807             if (zslValueGteMin(score,range))
808                 return eptr;
809             return NULL;
810         }
811 
812         /* Move to previous element by moving to the score of previous element.
813          * When this returns NULL, we know there also is no element. */
814         sptr = ziplistPrev(zl,eptr);
815         if (sptr != NULL)
816             serverAssert((eptr = ziplistPrev(zl,sptr)) != NULL);
817         else
818             eptr = NULL;
819     }
820 
821     return NULL;
822 }
823 
zzlLexValueGteMin(unsigned char * p,zlexrangespec * spec)824 static int zzlLexValueGteMin(unsigned char *p, zlexrangespec *spec) {
825     robj *value = ziplistGetObject(p);
826     int res = zslLexValueGteMin(value,spec);
827     decrRefCount(value);
828     return res;
829 }
830 
zzlLexValueLteMax(unsigned char * p,zlexrangespec * spec)831 static int zzlLexValueLteMax(unsigned char *p, zlexrangespec *spec) {
832     robj *value = ziplistGetObject(p);
833     int res = zslLexValueLteMax(value,spec);
834     decrRefCount(value);
835     return res;
836 }
837 
838 /* Returns if there is a part of the zset is in range. Should only be used
839  * internally by zzlFirstInRange and zzlLastInRange. */
zzlIsInLexRange(unsigned char * zl,zlexrangespec * range)840 int zzlIsInLexRange(unsigned char *zl, zlexrangespec *range) {
841     unsigned char *p;
842 
843     /* Test for ranges that will always be empty. */
844     if (compareStringObjectsForLexRange(range->min,range->max) > 1 ||
845             (compareStringObjects(range->min,range->max) == 0 &&
846             (range->minex || range->maxex)))
847         return 0;
848 
849     p = ziplistIndex(zl,-2); /* Last element. */
850     if (p == NULL) return 0;
851     if (!zzlLexValueGteMin(p,range))
852         return 0;
853 
854     p = ziplistIndex(zl,0); /* First element. */
855     serverAssert(p != NULL);
856     if (!zzlLexValueLteMax(p,range))
857         return 0;
858 
859     return 1;
860 }
861 
862 /* Find pointer to the first element contained in the specified lex range.
863  * Returns NULL when no element is contained in the range. */
zzlFirstInLexRange(unsigned char * zl,zlexrangespec * range)864 unsigned char *zzlFirstInLexRange(unsigned char *zl, zlexrangespec *range) {
865     unsigned char *eptr = ziplistIndex(zl,0), *sptr;
866 
867     /* If everything is out of range, return early. */
868     if (!zzlIsInLexRange(zl,range)) return NULL;
869 
870     while (eptr != NULL) {
871         if (zzlLexValueGteMin(eptr,range)) {
872             /* Check if score <= max. */
873             if (zzlLexValueLteMax(eptr,range))
874                 return eptr;
875             return NULL;
876         }
877 
878         /* Move to next element. */
879         sptr = ziplistNext(zl,eptr); /* This element score. Skip it. */
880         serverAssert(sptr != NULL);
881         eptr = ziplistNext(zl,sptr); /* Next element. */
882     }
883 
884     return NULL;
885 }
886 
887 /* Find pointer to the last element contained in the specified lex range.
888  * Returns NULL when no element is contained in the range. */
zzlLastInLexRange(unsigned char * zl,zlexrangespec * range)889 unsigned char *zzlLastInLexRange(unsigned char *zl, zlexrangespec *range) {
890     unsigned char *eptr = ziplistIndex(zl,-2), *sptr;
891 
892     /* If everything is out of range, return early. */
893     if (!zzlIsInLexRange(zl,range)) return NULL;
894 
895     while (eptr != NULL) {
896         if (zzlLexValueLteMax(eptr,range)) {
897             /* Check if score >= min. */
898             if (zzlLexValueGteMin(eptr,range))
899                 return eptr;
900             return NULL;
901         }
902 
903         /* Move to previous element by moving to the score of previous element.
904          * When this returns NULL, we know there also is no element. */
905         sptr = ziplistPrev(zl,eptr);
906         if (sptr != NULL)
907             serverAssert((eptr = ziplistPrev(zl,sptr)) != NULL);
908         else
909             eptr = NULL;
910     }
911 
912     return NULL;
913 }
914 
zzlFind(unsigned char * zl,robj * ele,double * score)915 unsigned char *zzlFind(unsigned char *zl, robj *ele, double *score) {
916     unsigned char *eptr = ziplistIndex(zl,0), *sptr;
917 
918     ele = getDecodedObject(ele);
919     while (eptr != NULL) {
920         sptr = ziplistNext(zl,eptr);
921         serverAssertWithInfo(NULL,ele,sptr != NULL);
922 
923         if (ziplistCompare(eptr,ele->ptr,sdslen(ele->ptr))) {
924             /* Matching element, pull out score. */
925             if (score != NULL) *score = zzlGetScore(sptr);
926             decrRefCount(ele);
927             return eptr;
928         }
929 
930         /* Move to next element. */
931         eptr = ziplistNext(zl,sptr);
932     }
933 
934     decrRefCount(ele);
935     return NULL;
936 }
937 
938 /* Delete (element,score) pair from ziplist. Use local copy of eptr because we
939  * don't want to modify the one given as argument. */
zzlDelete(unsigned char * zl,unsigned char * eptr)940 unsigned char *zzlDelete(unsigned char *zl, unsigned char *eptr) {
941     unsigned char *p = eptr;
942 
943     /* TODO: add function to ziplist API to delete N elements from offset. */
944     zl = ziplistDelete(zl,&p);
945     zl = ziplistDelete(zl,&p);
946     return zl;
947 }
948 
zzlInsertAt(unsigned char * zl,unsigned char * eptr,robj * ele,double score)949 unsigned char *zzlInsertAt(unsigned char *zl, unsigned char *eptr, robj *ele, double score) {
950     unsigned char *sptr;
951     char scorebuf[128];
952     int scorelen;
953     size_t offset;
954 
955     serverAssertWithInfo(NULL,ele,sdsEncodedObject(ele));
956     scorelen = d2string(scorebuf,sizeof(scorebuf),score);
957     if (eptr == NULL) {
958         zl = ziplistPush(zl,ele->ptr,sdslen(ele->ptr),ZIPLIST_TAIL);
959         zl = ziplistPush(zl,(unsigned char*)scorebuf,scorelen,ZIPLIST_TAIL);
960     } else {
961         /* Keep offset relative to zl, as it might be re-allocated. */
962         offset = eptr-zl;
963         zl = ziplistInsert(zl,eptr,ele->ptr,sdslen(ele->ptr));
964         eptr = zl+offset;
965 
966         /* Insert score after the element. */
967         serverAssertWithInfo(NULL,ele,(sptr = ziplistNext(zl,eptr)) != NULL);
968         zl = ziplistInsert(zl,sptr,(unsigned char*)scorebuf,scorelen);
969     }
970 
971     return zl;
972 }
973 
974 /* Insert (element,score) pair in ziplist. This function assumes the element is
975  * not yet present in the list. */
zzlInsert(unsigned char * zl,robj * ele,double score)976 unsigned char *zzlInsert(unsigned char *zl, robj *ele, double score) {
977     unsigned char *eptr = ziplistIndex(zl,0), *sptr;
978     double s;
979 
980     ele = getDecodedObject(ele);
981     while (eptr != NULL) {
982         sptr = ziplistNext(zl,eptr);
983         serverAssertWithInfo(NULL,ele,sptr != NULL);
984         s = zzlGetScore(sptr);
985 
986         if (s > score) {
987             /* First element with score larger than score for element to be
988              * inserted. This means we should take its spot in the list to
989              * maintain ordering. */
990             zl = zzlInsertAt(zl,eptr,ele,score);
991             break;
992         } else if (s == score) {
993             /* Ensure lexicographical ordering for elements. */
994             if (zzlCompareElements(eptr,ele->ptr,sdslen(ele->ptr)) > 0) {
995                 zl = zzlInsertAt(zl,eptr,ele,score);
996                 break;
997             }
998         }
999 
1000         /* Move to next element. */
1001         eptr = ziplistNext(zl,sptr);
1002     }
1003 
1004     /* Push on tail of list when it was not yet inserted. */
1005     if (eptr == NULL)
1006         zl = zzlInsertAt(zl,NULL,ele,score);
1007 
1008     decrRefCount(ele);
1009     return zl;
1010 }
1011 
zzlDeleteRangeByScore(unsigned char * zl,zrangespec * range,unsigned long * deleted)1012 unsigned char *zzlDeleteRangeByScore(unsigned char *zl, zrangespec *range, unsigned long *deleted) {
1013     unsigned char *eptr, *sptr;
1014     double score;
1015     unsigned long num = 0;
1016 
1017     if (deleted != NULL) *deleted = 0;
1018 
1019     eptr = zzlFirstInRange(zl,range);
1020     if (eptr == NULL) return zl;
1021 
1022     /* When the tail of the ziplist is deleted, eptr will point to the sentinel
1023      * byte and ziplistNext will return NULL. */
1024     while ((sptr = ziplistNext(zl,eptr)) != NULL) {
1025         score = zzlGetScore(sptr);
1026         if (zslValueLteMax(score,range)) {
1027             /* Delete both the element and the score. */
1028             zl = ziplistDelete(zl,&eptr);
1029             zl = ziplistDelete(zl,&eptr);
1030             num++;
1031         } else {
1032             /* No longer in range. */
1033             break;
1034         }
1035     }
1036 
1037     if (deleted != NULL) *deleted = num;
1038     return zl;
1039 }
1040 
zzlDeleteRangeByLex(unsigned char * zl,zlexrangespec * range,unsigned long * deleted)1041 unsigned char *zzlDeleteRangeByLex(unsigned char *zl, zlexrangespec *range, unsigned long *deleted) {
1042     unsigned char *eptr, *sptr;
1043     unsigned long num = 0;
1044 
1045     if (deleted != NULL) *deleted = 0;
1046 
1047     eptr = zzlFirstInLexRange(zl,range);
1048     if (eptr == NULL) return zl;
1049 
1050     /* When the tail of the ziplist is deleted, eptr will point to the sentinel
1051      * byte and ziplistNext will return NULL. */
1052     while ((sptr = ziplistNext(zl,eptr)) != NULL) {
1053         if (zzlLexValueLteMax(eptr,range)) {
1054             /* Delete both the element and the score. */
1055             zl = ziplistDelete(zl,&eptr);
1056             zl = ziplistDelete(zl,&eptr);
1057             num++;
1058         } else {
1059             /* No longer in range. */
1060             break;
1061         }
1062     }
1063 
1064     if (deleted != NULL) *deleted = num;
1065     return zl;
1066 }
1067 
1068 /* Delete all the elements with rank between start and end from the skiplist.
1069  * Start and end are inclusive. Note that start and end need to be 1-based */
zzlDeleteRangeByRank(unsigned char * zl,unsigned int start,unsigned int end,unsigned long * deleted)1070 unsigned char *zzlDeleteRangeByRank(unsigned char *zl, unsigned int start, unsigned int end, unsigned long *deleted) {
1071     unsigned int num = (end-start)+1;
1072     if (deleted) *deleted = num;
1073     zl = ziplistDeleteRange(zl,2*(start-1),2*num);
1074     return zl;
1075 }
1076 
1077 /*-----------------------------------------------------------------------------
1078  * Common sorted set API
1079  *----------------------------------------------------------------------------*/
1080 
zsetLength(robj * zobj)1081 unsigned int zsetLength(robj *zobj) {
1082     int length = -1;
1083     if (zobj->encoding == OBJ_ENCODING_ZIPLIST) {
1084         length = zzlLength(zobj->ptr);
1085     } else if (zobj->encoding == OBJ_ENCODING_SKIPLIST) {
1086         length = ((zset*)zobj->ptr)->zsl->length;
1087     } else {
1088         serverPanic("Unknown sorted set encoding");
1089     }
1090     return length;
1091 }
1092 
zsetConvert(robj * zobj,int encoding)1093 void zsetConvert(robj *zobj, int encoding) {
1094     zset *zs;
1095     zskiplistNode *node, *next;
1096     robj *ele;
1097     double score;
1098 
1099     if (zobj->encoding == encoding) return;
1100     if (zobj->encoding == OBJ_ENCODING_ZIPLIST) {
1101         unsigned char *zl = zobj->ptr;
1102         unsigned char *eptr, *sptr;
1103         unsigned char *vstr;
1104         unsigned int vlen;
1105         long long vlong;
1106 
1107         if (encoding != OBJ_ENCODING_SKIPLIST)
1108             serverPanic("Unknown target encoding");
1109 
1110         zs = zmalloc(sizeof(*zs));
1111         zs->dict = dictCreate(&zsetDictType,NULL);
1112         zs->zsl = zslCreate();
1113 
1114         eptr = ziplistIndex(zl,0);
1115         serverAssertWithInfo(NULL,zobj,eptr != NULL);
1116         sptr = ziplistNext(zl,eptr);
1117         serverAssertWithInfo(NULL,zobj,sptr != NULL);
1118 
1119         while (eptr != NULL) {
1120             score = zzlGetScore(sptr);
1121             serverAssertWithInfo(NULL,zobj,ziplistGet(eptr,&vstr,&vlen,&vlong));
1122             if (vstr == NULL)
1123                 ele = createStringObjectFromLongLong(vlong);
1124             else
1125                 ele = createStringObject((char*)vstr,vlen);
1126 
1127             /* Has incremented refcount since it was just created. */
1128             node = zslInsert(zs->zsl,score,ele);
1129             serverAssertWithInfo(NULL,zobj,dictAdd(zs->dict,ele,&node->score) == DICT_OK);
1130             incrRefCount(ele); /* Added to dictionary. */
1131             zzlNext(zl,&eptr,&sptr);
1132         }
1133 
1134         zfree(zobj->ptr);
1135         zobj->ptr = zs;
1136         zobj->encoding = OBJ_ENCODING_SKIPLIST;
1137     } else if (zobj->encoding == OBJ_ENCODING_SKIPLIST) {
1138         unsigned char *zl = ziplistNew();
1139 
1140         if (encoding != OBJ_ENCODING_ZIPLIST)
1141             serverPanic("Unknown target encoding");
1142 
1143         /* Approach similar to zslFree(), since we want to free the skiplist at
1144          * the same time as creating the ziplist. */
1145         zs = zobj->ptr;
1146         dictRelease(zs->dict);
1147         node = zs->zsl->header->level[0].forward;
1148         zfree(zs->zsl->header);
1149         zfree(zs->zsl);
1150 
1151         while (node) {
1152             ele = getDecodedObject(node->obj);
1153             zl = zzlInsertAt(zl,NULL,ele,node->score);
1154             decrRefCount(ele);
1155 
1156             next = node->level[0].forward;
1157             zslFreeNode(node);
1158             node = next;
1159         }
1160 
1161         zfree(zs);
1162         zobj->ptr = zl;
1163         zobj->encoding = OBJ_ENCODING_ZIPLIST;
1164     } else {
1165         serverPanic("Unknown sorted set encoding");
1166     }
1167 }
1168 
1169 /* Convert the sorted set object into a ziplist if it is not already a ziplist
1170  * and if the number of elements and the maximum element size is within the
1171  * expected ranges. */
zsetConvertToZiplistIfNeeded(robj * zobj,size_t maxelelen)1172 void zsetConvertToZiplistIfNeeded(robj *zobj, size_t maxelelen) {
1173     if (zobj->encoding == OBJ_ENCODING_ZIPLIST) return;
1174     zset *zset = zobj->ptr;
1175 
1176     if (zset->zsl->length <= server.zset_max_ziplist_entries &&
1177         maxelelen <= server.zset_max_ziplist_value)
1178             zsetConvert(zobj,OBJ_ENCODING_ZIPLIST);
1179 }
1180 
1181 /* Return (by reference) the score of the specified member of the sorted set
1182  * storing it into *score. If the element does not exist C_ERR is returned
1183  * otherwise C_OK is returned and *score is correctly populated.
1184  * If 'zobj' or 'member' is NULL, C_ERR is returned. */
zsetScore(robj * zobj,robj * member,double * score)1185 int zsetScore(robj *zobj, robj *member, double *score) {
1186     if (!zobj || !member) return C_ERR;
1187 
1188     if (zobj->encoding == OBJ_ENCODING_ZIPLIST) {
1189         if (zzlFind(zobj->ptr, member, score) == NULL) return C_ERR;
1190     } else if (zobj->encoding == OBJ_ENCODING_SKIPLIST) {
1191         zset *zs = zobj->ptr;
1192         dictEntry *de = dictFind(zs->dict, member);
1193         if (de == NULL) return C_ERR;
1194         *score = *(double*)dictGetVal(de);
1195     } else {
1196         serverPanic("Unknown sorted set encoding");
1197     }
1198     return C_OK;
1199 }
1200 
1201 /*-----------------------------------------------------------------------------
1202  * Sorted set commands
1203  *----------------------------------------------------------------------------*/
1204 
1205 /* This generic command implements both ZADD and ZINCRBY. */
1206 #define ZADD_NONE 0
1207 #define ZADD_INCR (1<<0)    /* Increment the score instead of setting it. */
1208 #define ZADD_NX (1<<1)      /* Don't touch elements not already existing. */
1209 #define ZADD_XX (1<<2)      /* Only touch elements already exisitng. */
1210 #define ZADD_CH (1<<3)      /* Return num of elements added or updated. */
zaddGenericCommand(client * c,int flags)1211 void zaddGenericCommand(client *c, int flags) {
1212     static char *nanerr = "resulting score is not a number (NaN)";
1213     robj *key = c->argv[1];
1214     robj *ele;
1215     robj *zobj;
1216     robj *curobj;
1217     double score = 0, *scores = NULL, curscore = 0.0;
1218     int j, elements;
1219     int scoreidx = 0;
1220     /* The following vars are used in order to track what the command actually
1221      * did during the execution, to reply to the client and to trigger the
1222      * notification of keyspace change. */
1223     int added = 0;      /* Number of new elements added. */
1224     int updated = 0;    /* Number of elements with updated score. */
1225     int processed = 0;  /* Number of elements processed, may remain zero with
1226                            options like XX. */
1227 
1228     /* Parse options. At the end 'scoreidx' is set to the argument position
1229      * of the score of the first score-element pair. */
1230     scoreidx = 2;
1231     while(scoreidx < c->argc) {
1232         char *opt = c->argv[scoreidx]->ptr;
1233         if (!strcasecmp(opt,"nx")) flags |= ZADD_NX;
1234         else if (!strcasecmp(opt,"xx")) flags |= ZADD_XX;
1235         else if (!strcasecmp(opt,"ch")) flags |= ZADD_CH;
1236         else if (!strcasecmp(opt,"incr")) flags |= ZADD_INCR;
1237         else break;
1238         scoreidx++;
1239     }
1240 
1241     /* Turn options into simple to check vars. */
1242     int incr = (flags & ZADD_INCR) != 0;
1243     int nx = (flags & ZADD_NX) != 0;
1244     int xx = (flags & ZADD_XX) != 0;
1245     int ch = (flags & ZADD_CH) != 0;
1246 
1247     /* After the options, we expect to have an even number of args, since
1248      * we expect any number of score-element pairs. */
1249     elements = c->argc-scoreidx;
1250     if (elements % 2) {
1251         addReply(c,shared.syntaxerr);
1252         return;
1253     }
1254     elements /= 2; /* Now this holds the number of score-element pairs. */
1255 
1256     /* Check for incompatible options. */
1257     if (nx && xx) {
1258         addReplyError(c,
1259             "XX and NX options at the same time are not compatible");
1260         return;
1261     }
1262 
1263     if (incr && elements > 1) {
1264         addReplyError(c,
1265             "INCR option supports a single increment-element pair");
1266         return;
1267     }
1268 
1269     /* Start parsing all the scores, we need to emit any syntax error
1270      * before executing additions to the sorted set, as the command should
1271      * either execute fully or nothing at all. */
1272     scores = zmalloc(sizeof(double)*elements);
1273     for (j = 0; j < elements; j++) {
1274         if (getDoubleFromObjectOrReply(c,c->argv[scoreidx+j*2],&scores[j],NULL)
1275             != C_OK) goto cleanup;
1276     }
1277 
1278     /* Lookup the key and create the sorted set if does not exist. */
1279     zobj = lookupKeyWrite(c->db,key);
1280     if (zobj == NULL) {
1281         if (xx) goto reply_to_client; /* No key + XX option: nothing to do. */
1282         if (server.zset_max_ziplist_entries == 0 ||
1283             server.zset_max_ziplist_value < sdslen(c->argv[scoreidx+1]->ptr))
1284         {
1285             zobj = createZsetObject();
1286         } else {
1287             zobj = createZsetZiplistObject();
1288         }
1289         dbAdd(c->db,key,zobj);
1290     } else {
1291         if (zobj->type != OBJ_ZSET) {
1292             addReply(c,shared.wrongtypeerr);
1293             goto cleanup;
1294         }
1295     }
1296 
1297     for (j = 0; j < elements; j++) {
1298         score = scores[j];
1299 
1300         if (zobj->encoding == OBJ_ENCODING_ZIPLIST) {
1301             unsigned char *eptr;
1302 
1303             /* Prefer non-encoded element when dealing with ziplists. */
1304             ele = c->argv[scoreidx+1+j*2];
1305             if ((eptr = zzlFind(zobj->ptr,ele,&curscore)) != NULL) {
1306                 if (nx) continue;
1307                 if (incr) {
1308                     score += curscore;
1309                     if (isnan(score)) {
1310                         addReplyError(c,nanerr);
1311                         goto cleanup;
1312                     }
1313                 }
1314 
1315                 /* Remove and re-insert when score changed. */
1316                 if (score != curscore) {
1317                     zobj->ptr = zzlDelete(zobj->ptr,eptr);
1318                     zobj->ptr = zzlInsert(zobj->ptr,ele,score);
1319                     server.dirty++;
1320                     updated++;
1321                 }
1322                 processed++;
1323             } else if (!xx) {
1324                 /* Optimize: check if the element is too large or the list
1325                  * becomes too long *before* executing zzlInsert. */
1326                 zobj->ptr = zzlInsert(zobj->ptr,ele,score);
1327                 if (zzlLength(zobj->ptr) > server.zset_max_ziplist_entries)
1328                     zsetConvert(zobj,OBJ_ENCODING_SKIPLIST);
1329                 if (sdslen(ele->ptr) > server.zset_max_ziplist_value)
1330                     zsetConvert(zobj,OBJ_ENCODING_SKIPLIST);
1331                 server.dirty++;
1332                 added++;
1333                 processed++;
1334             }
1335         } else if (zobj->encoding == OBJ_ENCODING_SKIPLIST) {
1336             zset *zs = zobj->ptr;
1337             zskiplistNode *znode;
1338             dictEntry *de;
1339 
1340             ele = c->argv[scoreidx+1+j*2] =
1341                 tryObjectEncoding(c->argv[scoreidx+1+j*2]);
1342             de = dictFind(zs->dict,ele);
1343             if (de != NULL) {
1344                 if (nx) continue;
1345                 curobj = dictGetKey(de);
1346                 curscore = *(double*)dictGetVal(de);
1347 
1348                 if (incr) {
1349                     score += curscore;
1350                     if (isnan(score)) {
1351                         addReplyError(c,nanerr);
1352                         /* Don't need to check if the sorted set is empty
1353                          * because we know it has at least one element. */
1354                         goto cleanup;
1355                     }
1356                 }
1357 
1358                 /* Remove and re-insert when score changed. We can safely
1359                  * delete the key object from the skiplist, since the
1360                  * dictionary still has a reference to it. */
1361                 if (score != curscore) {
1362                     serverAssertWithInfo(c,curobj,zslDelete(zs->zsl,curscore,curobj));
1363                     znode = zslInsert(zs->zsl,score,curobj);
1364                     incrRefCount(curobj); /* Re-inserted in skiplist. */
1365                     dictGetVal(de) = &znode->score; /* Update score ptr. */
1366                     server.dirty++;
1367                     updated++;
1368                 }
1369                 processed++;
1370             } else if (!xx) {
1371                 znode = zslInsert(zs->zsl,score,ele);
1372                 incrRefCount(ele); /* Inserted in skiplist. */
1373                 serverAssertWithInfo(c,NULL,dictAdd(zs->dict,ele,&znode->score) == DICT_OK);
1374                 incrRefCount(ele); /* Added to dictionary. */
1375                 server.dirty++;
1376                 added++;
1377                 processed++;
1378             }
1379         } else {
1380             serverPanic("Unknown sorted set encoding");
1381         }
1382     }
1383 
1384 reply_to_client:
1385     if (incr) { /* ZINCRBY or INCR option. */
1386         if (processed)
1387             addReplyDouble(c,score);
1388         else
1389             addReply(c,shared.nullbulk);
1390     } else { /* ZADD. */
1391         addReplyLongLong(c,ch ? added+updated : added);
1392     }
1393 
1394 cleanup:
1395     zfree(scores);
1396     if (added || updated) {
1397         signalModifiedKey(c->db,key);
1398         notifyKeyspaceEvent(NOTIFY_ZSET,
1399             incr ? "zincr" : "zadd", key, c->db->id);
1400     }
1401 }
1402 
zaddCommand(client * c)1403 void zaddCommand(client *c) {
1404     zaddGenericCommand(c,ZADD_NONE);
1405 }
1406 
zincrbyCommand(client * c)1407 void zincrbyCommand(client *c) {
1408     zaddGenericCommand(c,ZADD_INCR);
1409 }
1410 
zremCommand(client * c)1411 void zremCommand(client *c) {
1412     robj *key = c->argv[1];
1413     robj *zobj;
1414     int deleted = 0, keyremoved = 0, j;
1415 
1416     if ((zobj = lookupKeyWriteOrReply(c,key,shared.czero)) == NULL ||
1417         checkType(c,zobj,OBJ_ZSET)) return;
1418 
1419     if (zobj->encoding == OBJ_ENCODING_ZIPLIST) {
1420         unsigned char *eptr;
1421 
1422         for (j = 2; j < c->argc; j++) {
1423             if ((eptr = zzlFind(zobj->ptr,c->argv[j],NULL)) != NULL) {
1424                 deleted++;
1425                 zobj->ptr = zzlDelete(zobj->ptr,eptr);
1426                 if (zzlLength(zobj->ptr) == 0) {
1427                     dbDelete(c->db,key);
1428                     keyremoved = 1;
1429                     break;
1430                 }
1431             }
1432         }
1433     } else if (zobj->encoding == OBJ_ENCODING_SKIPLIST) {
1434         zset *zs = zobj->ptr;
1435         dictEntry *de;
1436         double score;
1437 
1438         for (j = 2; j < c->argc; j++) {
1439             de = dictFind(zs->dict,c->argv[j]);
1440             if (de != NULL) {
1441                 deleted++;
1442 
1443                 /* Delete from the skiplist */
1444                 score = *(double*)dictGetVal(de);
1445                 serverAssertWithInfo(c,c->argv[j],zslDelete(zs->zsl,score,c->argv[j]));
1446 
1447                 /* Delete from the hash table */
1448                 dictDelete(zs->dict,c->argv[j]);
1449                 if (htNeedsResize(zs->dict)) dictResize(zs->dict);
1450                 if (dictSize(zs->dict) == 0) {
1451                     dbDelete(c->db,key);
1452                     keyremoved = 1;
1453                     break;
1454                 }
1455             }
1456         }
1457     } else {
1458         serverPanic("Unknown sorted set encoding");
1459     }
1460 
1461     if (deleted) {
1462         notifyKeyspaceEvent(NOTIFY_ZSET,"zrem",key,c->db->id);
1463         if (keyremoved)
1464             notifyKeyspaceEvent(NOTIFY_GENERIC,"del",key,c->db->id);
1465         signalModifiedKey(c->db,key);
1466         server.dirty += deleted;
1467     }
1468     addReplyLongLong(c,deleted);
1469 }
1470 
1471 /* Implements ZREMRANGEBYRANK, ZREMRANGEBYSCORE, ZREMRANGEBYLEX commands. */
1472 #define ZRANGE_RANK 0
1473 #define ZRANGE_SCORE 1
1474 #define ZRANGE_LEX 2
zremrangeGenericCommand(client * c,int rangetype)1475 void zremrangeGenericCommand(client *c, int rangetype) {
1476     robj *key = c->argv[1];
1477     robj *zobj;
1478     int keyremoved = 0;
1479     unsigned long deleted = 0;
1480     zrangespec range;
1481     zlexrangespec lexrange;
1482     long start, end, llen;
1483 
1484     /* Step 1: Parse the range. */
1485     if (rangetype == ZRANGE_RANK) {
1486         if ((getLongFromObjectOrReply(c,c->argv[2],&start,NULL) != C_OK) ||
1487             (getLongFromObjectOrReply(c,c->argv[3],&end,NULL) != C_OK))
1488             return;
1489     } else if (rangetype == ZRANGE_SCORE) {
1490         if (zslParseRange(c->argv[2],c->argv[3],&range) != C_OK) {
1491             addReplyError(c,"min or max is not a float");
1492             return;
1493         }
1494     } else if (rangetype == ZRANGE_LEX) {
1495         if (zslParseLexRange(c->argv[2],c->argv[3],&lexrange) != C_OK) {
1496             addReplyError(c,"min or max not valid string range item");
1497             return;
1498         }
1499     }
1500 
1501     /* Step 2: Lookup & range sanity checks if needed. */
1502     if ((zobj = lookupKeyWriteOrReply(c,key,shared.czero)) == NULL ||
1503         checkType(c,zobj,OBJ_ZSET)) goto cleanup;
1504 
1505     if (rangetype == ZRANGE_RANK) {
1506         /* Sanitize indexes. */
1507         llen = zsetLength(zobj);
1508         if (start < 0) start = llen+start;
1509         if (end < 0) end = llen+end;
1510         if (start < 0) start = 0;
1511 
1512         /* Invariant: start >= 0, so this test will be true when end < 0.
1513          * The range is empty when start > end or start >= length. */
1514         if (start > end || start >= llen) {
1515             addReply(c,shared.czero);
1516             goto cleanup;
1517         }
1518         if (end >= llen) end = llen-1;
1519     }
1520 
1521     /* Step 3: Perform the range deletion operation. */
1522     if (zobj->encoding == OBJ_ENCODING_ZIPLIST) {
1523         switch(rangetype) {
1524         case ZRANGE_RANK:
1525             zobj->ptr = zzlDeleteRangeByRank(zobj->ptr,start+1,end+1,&deleted);
1526             break;
1527         case ZRANGE_SCORE:
1528             zobj->ptr = zzlDeleteRangeByScore(zobj->ptr,&range,&deleted);
1529             break;
1530         case ZRANGE_LEX:
1531             zobj->ptr = zzlDeleteRangeByLex(zobj->ptr,&lexrange,&deleted);
1532             break;
1533         }
1534         if (zzlLength(zobj->ptr) == 0) {
1535             dbDelete(c->db,key);
1536             keyremoved = 1;
1537         }
1538     } else if (zobj->encoding == OBJ_ENCODING_SKIPLIST) {
1539         zset *zs = zobj->ptr;
1540         switch(rangetype) {
1541         case ZRANGE_RANK:
1542             deleted = zslDeleteRangeByRank(zs->zsl,start+1,end+1,zs->dict);
1543             break;
1544         case ZRANGE_SCORE:
1545             deleted = zslDeleteRangeByScore(zs->zsl,&range,zs->dict);
1546             break;
1547         case ZRANGE_LEX:
1548             deleted = zslDeleteRangeByLex(zs->zsl,&lexrange,zs->dict);
1549             break;
1550         }
1551         if (htNeedsResize(zs->dict)) dictResize(zs->dict);
1552         if (dictSize(zs->dict) == 0) {
1553             dbDelete(c->db,key);
1554             keyremoved = 1;
1555         }
1556     } else {
1557         serverPanic("Unknown sorted set encoding");
1558     }
1559 
1560     /* Step 4: Notifications and reply. */
1561     if (deleted) {
1562         char *event[3] = {"zremrangebyrank","zremrangebyscore","zremrangebylex"};
1563         signalModifiedKey(c->db,key);
1564         notifyKeyspaceEvent(NOTIFY_ZSET,event[rangetype],key,c->db->id);
1565         if (keyremoved)
1566             notifyKeyspaceEvent(NOTIFY_GENERIC,"del",key,c->db->id);
1567     }
1568     server.dirty += deleted;
1569     addReplyLongLong(c,deleted);
1570 
1571 cleanup:
1572     if (rangetype == ZRANGE_LEX) zslFreeLexRange(&lexrange);
1573 }
1574 
zremrangebyrankCommand(client * c)1575 void zremrangebyrankCommand(client *c) {
1576     zremrangeGenericCommand(c,ZRANGE_RANK);
1577 }
1578 
zremrangebyscoreCommand(client * c)1579 void zremrangebyscoreCommand(client *c) {
1580     zremrangeGenericCommand(c,ZRANGE_SCORE);
1581 }
1582 
zremrangebylexCommand(client * c)1583 void zremrangebylexCommand(client *c) {
1584     zremrangeGenericCommand(c,ZRANGE_LEX);
1585 }
1586 
1587 typedef struct {
1588     robj *subject;
1589     int type; /* Set, sorted set */
1590     int encoding;
1591     double weight;
1592 
1593     union {
1594         /* Set iterators. */
1595         union _iterset {
1596             struct {
1597                 intset *is;
1598                 int ii;
1599             } is;
1600             struct {
1601                 dict *dict;
1602                 dictIterator *di;
1603                 dictEntry *de;
1604             } ht;
1605         } set;
1606 
1607         /* Sorted set iterators. */
1608         union _iterzset {
1609             struct {
1610                 unsigned char *zl;
1611                 unsigned char *eptr, *sptr;
1612             } zl;
1613             struct {
1614                 zset *zs;
1615                 zskiplistNode *node;
1616             } sl;
1617         } zset;
1618     } iter;
1619 } zsetopsrc;
1620 
1621 
1622 /* Use dirty flags for pointers that need to be cleaned up in the next
1623  * iteration over the zsetopval. The dirty flag for the long long value is
1624  * special, since long long values don't need cleanup. Instead, it means that
1625  * we already checked that "ell" holds a long long, or tried to convert another
1626  * representation into a long long value. When this was successful,
1627  * OPVAL_VALID_LL is set as well. */
1628 #define OPVAL_DIRTY_ROBJ 1
1629 #define OPVAL_DIRTY_LL 2
1630 #define OPVAL_VALID_LL 4
1631 
1632 /* Store value retrieved from the iterator. */
1633 typedef struct {
1634     int flags;
1635     unsigned char _buf[32]; /* Private buffer. */
1636     robj *ele;
1637     unsigned char *estr;
1638     unsigned int elen;
1639     long long ell;
1640     double score;
1641 } zsetopval;
1642 
1643 typedef union _iterset iterset;
1644 typedef union _iterzset iterzset;
1645 
zuiInitIterator(zsetopsrc * op)1646 void zuiInitIterator(zsetopsrc *op) {
1647     if (op->subject == NULL)
1648         return;
1649 
1650     if (op->type == OBJ_SET) {
1651         iterset *it = &op->iter.set;
1652         if (op->encoding == OBJ_ENCODING_INTSET) {
1653             it->is.is = op->subject->ptr;
1654             it->is.ii = 0;
1655         } else if (op->encoding == OBJ_ENCODING_HT) {
1656             it->ht.dict = op->subject->ptr;
1657             it->ht.di = dictGetIterator(op->subject->ptr);
1658             it->ht.de = dictNext(it->ht.di);
1659         } else {
1660             serverPanic("Unknown set encoding");
1661         }
1662     } else if (op->type == OBJ_ZSET) {
1663         iterzset *it = &op->iter.zset;
1664         if (op->encoding == OBJ_ENCODING_ZIPLIST) {
1665             it->zl.zl = op->subject->ptr;
1666             it->zl.eptr = ziplistIndex(it->zl.zl,0);
1667             if (it->zl.eptr != NULL) {
1668                 it->zl.sptr = ziplistNext(it->zl.zl,it->zl.eptr);
1669                 serverAssert(it->zl.sptr != NULL);
1670             }
1671         } else if (op->encoding == OBJ_ENCODING_SKIPLIST) {
1672             it->sl.zs = op->subject->ptr;
1673             it->sl.node = it->sl.zs->zsl->header->level[0].forward;
1674         } else {
1675             serverPanic("Unknown sorted set encoding");
1676         }
1677     } else {
1678         serverPanic("Unsupported type");
1679     }
1680 }
1681 
zuiClearIterator(zsetopsrc * op)1682 void zuiClearIterator(zsetopsrc *op) {
1683     if (op->subject == NULL)
1684         return;
1685 
1686     if (op->type == OBJ_SET) {
1687         iterset *it = &op->iter.set;
1688         if (op->encoding == OBJ_ENCODING_INTSET) {
1689             UNUSED(it); /* skip */
1690         } else if (op->encoding == OBJ_ENCODING_HT) {
1691             dictReleaseIterator(it->ht.di);
1692         } else {
1693             serverPanic("Unknown set encoding");
1694         }
1695     } else if (op->type == OBJ_ZSET) {
1696         iterzset *it = &op->iter.zset;
1697         if (op->encoding == OBJ_ENCODING_ZIPLIST) {
1698             UNUSED(it); /* skip */
1699         } else if (op->encoding == OBJ_ENCODING_SKIPLIST) {
1700             UNUSED(it); /* skip */
1701         } else {
1702             serverPanic("Unknown sorted set encoding");
1703         }
1704     } else {
1705         serverPanic("Unsupported type");
1706     }
1707 }
1708 
zuiLength(zsetopsrc * op)1709 int zuiLength(zsetopsrc *op) {
1710     if (op->subject == NULL)
1711         return 0;
1712 
1713     if (op->type == OBJ_SET) {
1714         if (op->encoding == OBJ_ENCODING_INTSET) {
1715             return intsetLen(op->subject->ptr);
1716         } else if (op->encoding == OBJ_ENCODING_HT) {
1717             dict *ht = op->subject->ptr;
1718             return dictSize(ht);
1719         } else {
1720             serverPanic("Unknown set encoding");
1721         }
1722     } else if (op->type == OBJ_ZSET) {
1723         if (op->encoding == OBJ_ENCODING_ZIPLIST) {
1724             return zzlLength(op->subject->ptr);
1725         } else if (op->encoding == OBJ_ENCODING_SKIPLIST) {
1726             zset *zs = op->subject->ptr;
1727             return zs->zsl->length;
1728         } else {
1729             serverPanic("Unknown sorted set encoding");
1730         }
1731     } else {
1732         serverPanic("Unsupported type");
1733     }
1734 }
1735 
1736 /* Check if the current value is valid. If so, store it in the passed structure
1737  * and move to the next element. If not valid, this means we have reached the
1738  * end of the structure and can abort. */
zuiNext(zsetopsrc * op,zsetopval * val)1739 int zuiNext(zsetopsrc *op, zsetopval *val) {
1740     if (op->subject == NULL)
1741         return 0;
1742 
1743     if (val->flags & OPVAL_DIRTY_ROBJ)
1744         decrRefCount(val->ele);
1745 
1746     memset(val,0,sizeof(zsetopval));
1747 
1748     if (op->type == OBJ_SET) {
1749         iterset *it = &op->iter.set;
1750         if (op->encoding == OBJ_ENCODING_INTSET) {
1751             int64_t ell;
1752 
1753             if (!intsetGet(it->is.is,it->is.ii,&ell))
1754                 return 0;
1755             val->ell = ell;
1756             val->score = 1.0;
1757 
1758             /* Move to next element. */
1759             it->is.ii++;
1760         } else if (op->encoding == OBJ_ENCODING_HT) {
1761             if (it->ht.de == NULL)
1762                 return 0;
1763             val->ele = dictGetKey(it->ht.de);
1764             val->score = 1.0;
1765 
1766             /* Move to next element. */
1767             it->ht.de = dictNext(it->ht.di);
1768         } else {
1769             serverPanic("Unknown set encoding");
1770         }
1771     } else if (op->type == OBJ_ZSET) {
1772         iterzset *it = &op->iter.zset;
1773         if (op->encoding == OBJ_ENCODING_ZIPLIST) {
1774             /* No need to check both, but better be explicit. */
1775             if (it->zl.eptr == NULL || it->zl.sptr == NULL)
1776                 return 0;
1777             serverAssert(ziplistGet(it->zl.eptr,&val->estr,&val->elen,&val->ell));
1778             val->score = zzlGetScore(it->zl.sptr);
1779 
1780             /* Move to next element. */
1781             zzlNext(it->zl.zl,&it->zl.eptr,&it->zl.sptr);
1782         } else if (op->encoding == OBJ_ENCODING_SKIPLIST) {
1783             if (it->sl.node == NULL)
1784                 return 0;
1785             val->ele = it->sl.node->obj;
1786             val->score = it->sl.node->score;
1787 
1788             /* Move to next element. */
1789             it->sl.node = it->sl.node->level[0].forward;
1790         } else {
1791             serverPanic("Unknown sorted set encoding");
1792         }
1793     } else {
1794         serverPanic("Unsupported type");
1795     }
1796     return 1;
1797 }
1798 
zuiLongLongFromValue(zsetopval * val)1799 int zuiLongLongFromValue(zsetopval *val) {
1800     if (!(val->flags & OPVAL_DIRTY_LL)) {
1801         val->flags |= OPVAL_DIRTY_LL;
1802 
1803         if (val->ele != NULL) {
1804             if (val->ele->encoding == OBJ_ENCODING_INT) {
1805                 val->ell = (long)val->ele->ptr;
1806                 val->flags |= OPVAL_VALID_LL;
1807             } else if (sdsEncodedObject(val->ele)) {
1808                 if (string2ll(val->ele->ptr,sdslen(val->ele->ptr),&val->ell))
1809                     val->flags |= OPVAL_VALID_LL;
1810             } else {
1811                 serverPanic("Unsupported element encoding");
1812             }
1813         } else if (val->estr != NULL) {
1814             if (string2ll((char*)val->estr,val->elen,&val->ell))
1815                 val->flags |= OPVAL_VALID_LL;
1816         } else {
1817             /* The long long was already set, flag as valid. */
1818             val->flags |= OPVAL_VALID_LL;
1819         }
1820     }
1821     return val->flags & OPVAL_VALID_LL;
1822 }
1823 
zuiObjectFromValue(zsetopval * val)1824 robj *zuiObjectFromValue(zsetopval *val) {
1825     if (val->ele == NULL) {
1826         if (val->estr != NULL) {
1827             val->ele = createStringObject((char*)val->estr,val->elen);
1828         } else {
1829             val->ele = createStringObjectFromLongLong(val->ell);
1830         }
1831         val->flags |= OPVAL_DIRTY_ROBJ;
1832     }
1833     return val->ele;
1834 }
1835 
zuiBufferFromValue(zsetopval * val)1836 int zuiBufferFromValue(zsetopval *val) {
1837     if (val->estr == NULL) {
1838         if (val->ele != NULL) {
1839             if (val->ele->encoding == OBJ_ENCODING_INT) {
1840                 val->elen = ll2string((char*)val->_buf,sizeof(val->_buf),(long)val->ele->ptr);
1841                 val->estr = val->_buf;
1842             } else if (sdsEncodedObject(val->ele)) {
1843                 val->elen = sdslen(val->ele->ptr);
1844                 val->estr = val->ele->ptr;
1845             } else {
1846                 serverPanic("Unsupported element encoding");
1847             }
1848         } else {
1849             val->elen = ll2string((char*)val->_buf,sizeof(val->_buf),val->ell);
1850             val->estr = val->_buf;
1851         }
1852     }
1853     return 1;
1854 }
1855 
1856 /* Find value pointed to by val in the source pointer to by op. When found,
1857  * return 1 and store its score in target. Return 0 otherwise. */
zuiFind(zsetopsrc * op,zsetopval * val,double * score)1858 int zuiFind(zsetopsrc *op, zsetopval *val, double *score) {
1859     if (op->subject == NULL)
1860         return 0;
1861 
1862     if (op->type == OBJ_SET) {
1863         if (op->encoding == OBJ_ENCODING_INTSET) {
1864             if (zuiLongLongFromValue(val) &&
1865                 intsetFind(op->subject->ptr,val->ell))
1866             {
1867                 *score = 1.0;
1868                 return 1;
1869             } else {
1870                 return 0;
1871             }
1872         } else if (op->encoding == OBJ_ENCODING_HT) {
1873             dict *ht = op->subject->ptr;
1874             zuiObjectFromValue(val);
1875             if (dictFind(ht,val->ele) != NULL) {
1876                 *score = 1.0;
1877                 return 1;
1878             } else {
1879                 return 0;
1880             }
1881         } else {
1882             serverPanic("Unknown set encoding");
1883         }
1884     } else if (op->type == OBJ_ZSET) {
1885         zuiObjectFromValue(val);
1886 
1887         if (op->encoding == OBJ_ENCODING_ZIPLIST) {
1888             if (zzlFind(op->subject->ptr,val->ele,score) != NULL) {
1889                 /* Score is already set by zzlFind. */
1890                 return 1;
1891             } else {
1892                 return 0;
1893             }
1894         } else if (op->encoding == OBJ_ENCODING_SKIPLIST) {
1895             zset *zs = op->subject->ptr;
1896             dictEntry *de;
1897             if ((de = dictFind(zs->dict,val->ele)) != NULL) {
1898                 *score = *(double*)dictGetVal(de);
1899                 return 1;
1900             } else {
1901                 return 0;
1902             }
1903         } else {
1904             serverPanic("Unknown sorted set encoding");
1905         }
1906     } else {
1907         serverPanic("Unsupported type");
1908     }
1909 }
1910 
zuiCompareByCardinality(const void * s1,const void * s2)1911 int zuiCompareByCardinality(const void *s1, const void *s2) {
1912     return zuiLength((zsetopsrc*)s1) - zuiLength((zsetopsrc*)s2);
1913 }
1914 
1915 #define REDIS_AGGR_SUM 1
1916 #define REDIS_AGGR_MIN 2
1917 #define REDIS_AGGR_MAX 3
1918 #define zunionInterDictValue(_e) (dictGetVal(_e) == NULL ? 1.0 : *(double*)dictGetVal(_e))
1919 
zunionInterAggregate(double * target,double val,int aggregate)1920 inline static void zunionInterAggregate(double *target, double val, int aggregate) {
1921     if (aggregate == REDIS_AGGR_SUM) {
1922         *target = *target + val;
1923         /* The result of adding two doubles is NaN when one variable
1924          * is +inf and the other is -inf. When these numbers are added,
1925          * we maintain the convention of the result being 0.0. */
1926         if (isnan(*target)) *target = 0.0;
1927     } else if (aggregate == REDIS_AGGR_MIN) {
1928         *target = val < *target ? val : *target;
1929     } else if (aggregate == REDIS_AGGR_MAX) {
1930         *target = val > *target ? val : *target;
1931     } else {
1932         /* safety net */
1933         serverPanic("Unknown ZUNION/INTER aggregate type");
1934     }
1935 }
1936 
zunionInterGenericCommand(client * c,robj * dstkey,int op)1937 void zunionInterGenericCommand(client *c, robj *dstkey, int op) {
1938     int i, j;
1939     long setnum;
1940     int aggregate = REDIS_AGGR_SUM;
1941     zsetopsrc *src;
1942     zsetopval zval;
1943     robj *tmp;
1944     unsigned int maxelelen = 0;
1945     robj *dstobj;
1946     zset *dstzset;
1947     zskiplistNode *znode;
1948     int touched = 0;
1949 
1950     /* expect setnum input keys to be given */
1951     if ((getLongFromObjectOrReply(c, c->argv[2], &setnum, NULL) != C_OK))
1952         return;
1953 
1954     if (setnum < 1) {
1955         addReplyError(c,
1956             "at least 1 input key is needed for ZUNIONSTORE/ZINTERSTORE");
1957         return;
1958     }
1959 
1960     /* test if the expected number of keys would overflow */
1961     if (setnum > c->argc-3) {
1962         addReply(c,shared.syntaxerr);
1963         return;
1964     }
1965 
1966     /* read keys to be used for input */
1967     src = zcalloc(sizeof(zsetopsrc) * setnum);
1968     for (i = 0, j = 3; i < setnum; i++, j++) {
1969         robj *obj = lookupKeyWrite(c->db,c->argv[j]);
1970         if (obj != NULL) {
1971             if (obj->type != OBJ_ZSET && obj->type != OBJ_SET) {
1972                 zfree(src);
1973                 addReply(c,shared.wrongtypeerr);
1974                 return;
1975             }
1976 
1977             src[i].subject = obj;
1978             src[i].type = obj->type;
1979             src[i].encoding = obj->encoding;
1980         } else {
1981             src[i].subject = NULL;
1982         }
1983 
1984         /* Default all weights to 1. */
1985         src[i].weight = 1.0;
1986     }
1987 
1988     /* parse optional extra arguments */
1989     if (j < c->argc) {
1990         int remaining = c->argc - j;
1991 
1992         while (remaining) {
1993             if (remaining >= (setnum + 1) && !strcasecmp(c->argv[j]->ptr,"weights")) {
1994                 j++; remaining--;
1995                 for (i = 0; i < setnum; i++, j++, remaining--) {
1996                     if (getDoubleFromObjectOrReply(c,c->argv[j],&src[i].weight,
1997                             "weight value is not a float") != C_OK)
1998                     {
1999                         zfree(src);
2000                         return;
2001                     }
2002                 }
2003             } else if (remaining >= 2 && !strcasecmp(c->argv[j]->ptr,"aggregate")) {
2004                 j++; remaining--;
2005                 if (!strcasecmp(c->argv[j]->ptr,"sum")) {
2006                     aggregate = REDIS_AGGR_SUM;
2007                 } else if (!strcasecmp(c->argv[j]->ptr,"min")) {
2008                     aggregate = REDIS_AGGR_MIN;
2009                 } else if (!strcasecmp(c->argv[j]->ptr,"max")) {
2010                     aggregate = REDIS_AGGR_MAX;
2011                 } else {
2012                     zfree(src);
2013                     addReply(c,shared.syntaxerr);
2014                     return;
2015                 }
2016                 j++; remaining--;
2017             } else {
2018                 zfree(src);
2019                 addReply(c,shared.syntaxerr);
2020                 return;
2021             }
2022         }
2023     }
2024 
2025     /* sort sets from the smallest to largest, this will improve our
2026      * algorithm's performance */
2027     qsort(src,setnum,sizeof(zsetopsrc),zuiCompareByCardinality);
2028 
2029     dstobj = createZsetObject();
2030     dstzset = dstobj->ptr;
2031     memset(&zval, 0, sizeof(zval));
2032 
2033     if (op == SET_OP_INTER) {
2034         /* Skip everything if the smallest input is empty. */
2035         if (zuiLength(&src[0]) > 0) {
2036             /* Precondition: as src[0] is non-empty and the inputs are ordered
2037              * by size, all src[i > 0] are non-empty too. */
2038             zuiInitIterator(&src[0]);
2039             while (zuiNext(&src[0],&zval)) {
2040                 double score, value;
2041 
2042                 score = src[0].weight * zval.score;
2043                 if (isnan(score)) score = 0;
2044 
2045                 for (j = 1; j < setnum; j++) {
2046                     /* It is not safe to access the zset we are
2047                      * iterating, so explicitly check for equal object. */
2048                     if (src[j].subject == src[0].subject) {
2049                         value = zval.score*src[j].weight;
2050                         zunionInterAggregate(&score,value,aggregate);
2051                     } else if (zuiFind(&src[j],&zval,&value)) {
2052                         value *= src[j].weight;
2053                         zunionInterAggregate(&score,value,aggregate);
2054                     } else {
2055                         break;
2056                     }
2057                 }
2058 
2059                 /* Only continue when present in every input. */
2060                 if (j == setnum) {
2061                     tmp = zuiObjectFromValue(&zval);
2062                     znode = zslInsert(dstzset->zsl,score,tmp);
2063                     incrRefCount(tmp); /* added to skiplist */
2064                     dictAdd(dstzset->dict,tmp,&znode->score);
2065                     incrRefCount(tmp); /* added to dictionary */
2066 
2067                     if (sdsEncodedObject(tmp)) {
2068                         if (sdslen(tmp->ptr) > maxelelen)
2069                             maxelelen = sdslen(tmp->ptr);
2070                     }
2071                 }
2072             }
2073             zuiClearIterator(&src[0]);
2074         }
2075     } else if (op == SET_OP_UNION) {
2076         dict *accumulator = dictCreate(&setDictType,NULL);
2077         dictIterator *di;
2078         dictEntry *de;
2079         double score;
2080 
2081         if (setnum) {
2082             /* Our union is at least as large as the largest set.
2083              * Resize the dictionary ASAP to avoid useless rehashing. */
2084             dictExpand(accumulator,zuiLength(&src[setnum-1]));
2085         }
2086 
2087         /* Step 1: Create a dictionary of elements -> aggregated-scores
2088          * by iterating one sorted set after the other. */
2089         for (i = 0; i < setnum; i++) {
2090             if (zuiLength(&src[i]) == 0) continue;
2091 
2092             zuiInitIterator(&src[i]);
2093             while (zuiNext(&src[i],&zval)) {
2094                 /* Initialize value */
2095                 score = src[i].weight * zval.score;
2096                 if (isnan(score)) score = 0;
2097 
2098                 /* Search for this element in the accumulating dictionary. */
2099                 de = dictFind(accumulator,zuiObjectFromValue(&zval));
2100                 /* If we don't have it, we need to create a new entry. */
2101                 if (de == NULL) {
2102                     tmp = zuiObjectFromValue(&zval);
2103                     /* Remember the longest single element encountered,
2104                      * to understand if it's possible to convert to ziplist
2105                      * at the end. */
2106                     if (sdsEncodedObject(tmp)) {
2107                         if (sdslen(tmp->ptr) > maxelelen)
2108                             maxelelen = sdslen(tmp->ptr);
2109                     }
2110                     /* Add the element with its initial score. */
2111                     de = dictAddRaw(accumulator,tmp);
2112                     incrRefCount(tmp);
2113                     dictSetDoubleVal(de,score);
2114                 } else {
2115                     /* Update the score with the score of the new instance
2116                      * of the element found in the current sorted set.
2117                      *
2118                      * Here we access directly the dictEntry double
2119                      * value inside the union as it is a big speedup
2120                      * compared to using the getDouble/setDouble API. */
2121                     zunionInterAggregate(&de->v.d,score,aggregate);
2122                 }
2123             }
2124             zuiClearIterator(&src[i]);
2125         }
2126 
2127         /* Step 2: convert the dictionary into the final sorted set. */
2128         di = dictGetIterator(accumulator);
2129 
2130         /* We now are aware of the final size of the resulting sorted set,
2131          * let's resize the dictionary embedded inside the sorted set to the
2132          * right size, in order to save rehashing time. */
2133         dictExpand(dstzset->dict,dictSize(accumulator));
2134 
2135         while((de = dictNext(di)) != NULL) {
2136             robj *ele = dictGetKey(de);
2137             score = dictGetDoubleVal(de);
2138             znode = zslInsert(dstzset->zsl,score,ele);
2139             incrRefCount(ele); /* added to skiplist */
2140             dictAdd(dstzset->dict,ele,&znode->score);
2141             incrRefCount(ele); /* added to dictionary */
2142         }
2143         dictReleaseIterator(di);
2144 
2145         /* We can free the accumulator dictionary now. */
2146         dictRelease(accumulator);
2147     } else {
2148         serverPanic("Unknown operator");
2149     }
2150 
2151     if (dbDelete(c->db,dstkey))
2152         touched = 1;
2153     if (dstzset->zsl->length) {
2154         zsetConvertToZiplistIfNeeded(dstobj,maxelelen);
2155         dbAdd(c->db,dstkey,dstobj);
2156         addReplyLongLong(c,zsetLength(dstobj));
2157         signalModifiedKey(c->db,dstkey);
2158         notifyKeyspaceEvent(NOTIFY_ZSET,
2159             (op == SET_OP_UNION) ? "zunionstore" : "zinterstore",
2160             dstkey,c->db->id);
2161         server.dirty++;
2162     } else {
2163         decrRefCount(dstobj);
2164         addReply(c,shared.czero);
2165         if (touched) {
2166             signalModifiedKey(c->db,dstkey);
2167             notifyKeyspaceEvent(NOTIFY_GENERIC,"del",dstkey,c->db->id);
2168             server.dirty++;
2169         }
2170     }
2171     zfree(src);
2172 }
2173 
zunionstoreCommand(client * c)2174 void zunionstoreCommand(client *c) {
2175     zunionInterGenericCommand(c,c->argv[1], SET_OP_UNION);
2176 }
2177 
zinterstoreCommand(client * c)2178 void zinterstoreCommand(client *c) {
2179     zunionInterGenericCommand(c,c->argv[1], SET_OP_INTER);
2180 }
2181 
zrangeGenericCommand(client * c,int reverse)2182 void zrangeGenericCommand(client *c, int reverse) {
2183     robj *key = c->argv[1];
2184     robj *zobj;
2185     int withscores = 0;
2186     long start;
2187     long end;
2188     int llen;
2189     int rangelen;
2190 
2191     if ((getLongFromObjectOrReply(c, c->argv[2], &start, NULL) != C_OK) ||
2192         (getLongFromObjectOrReply(c, c->argv[3], &end, NULL) != C_OK)) return;
2193 
2194     if (c->argc == 5 && !strcasecmp(c->argv[4]->ptr,"withscores")) {
2195         withscores = 1;
2196     } else if (c->argc >= 5) {
2197         addReply(c,shared.syntaxerr);
2198         return;
2199     }
2200 
2201     if ((zobj = lookupKeyReadOrReply(c,key,shared.emptymultibulk)) == NULL
2202          || checkType(c,zobj,OBJ_ZSET)) return;
2203 
2204     /* Sanitize indexes. */
2205     llen = zsetLength(zobj);
2206     if (start < 0) start = llen+start;
2207     if (end < 0) end = llen+end;
2208     if (start < 0) start = 0;
2209 
2210     /* Invariant: start >= 0, so this test will be true when end < 0.
2211      * The range is empty when start > end or start >= length. */
2212     if (start > end || start >= llen) {
2213         addReply(c,shared.emptymultibulk);
2214         return;
2215     }
2216     if (end >= llen) end = llen-1;
2217     rangelen = (end-start)+1;
2218 
2219     /* Return the result in form of a multi-bulk reply */
2220     addReplyMultiBulkLen(c, withscores ? (rangelen*2) : rangelen);
2221 
2222     if (zobj->encoding == OBJ_ENCODING_ZIPLIST) {
2223         unsigned char *zl = zobj->ptr;
2224         unsigned char *eptr, *sptr;
2225         unsigned char *vstr;
2226         unsigned int vlen;
2227         long long vlong;
2228 
2229         if (reverse)
2230             eptr = ziplistIndex(zl,-2-(2*start));
2231         else
2232             eptr = ziplistIndex(zl,2*start);
2233 
2234         serverAssertWithInfo(c,zobj,eptr != NULL);
2235         sptr = ziplistNext(zl,eptr);
2236 
2237         while (rangelen--) {
2238             serverAssertWithInfo(c,zobj,eptr != NULL && sptr != NULL);
2239             serverAssertWithInfo(c,zobj,ziplistGet(eptr,&vstr,&vlen,&vlong));
2240             if (vstr == NULL)
2241                 addReplyBulkLongLong(c,vlong);
2242             else
2243                 addReplyBulkCBuffer(c,vstr,vlen);
2244 
2245             if (withscores)
2246                 addReplyDouble(c,zzlGetScore(sptr));
2247 
2248             if (reverse)
2249                 zzlPrev(zl,&eptr,&sptr);
2250             else
2251                 zzlNext(zl,&eptr,&sptr);
2252         }
2253 
2254     } else if (zobj->encoding == OBJ_ENCODING_SKIPLIST) {
2255         zset *zs = zobj->ptr;
2256         zskiplist *zsl = zs->zsl;
2257         zskiplistNode *ln;
2258         robj *ele;
2259 
2260         /* Check if starting point is trivial, before doing log(N) lookup. */
2261         if (reverse) {
2262             ln = zsl->tail;
2263             if (start > 0)
2264                 ln = zslGetElementByRank(zsl,llen-start);
2265         } else {
2266             ln = zsl->header->level[0].forward;
2267             if (start > 0)
2268                 ln = zslGetElementByRank(zsl,start+1);
2269         }
2270 
2271         while(rangelen--) {
2272             serverAssertWithInfo(c,zobj,ln != NULL);
2273             ele = ln->obj;
2274             addReplyBulk(c,ele);
2275             if (withscores)
2276                 addReplyDouble(c,ln->score);
2277             ln = reverse ? ln->backward : ln->level[0].forward;
2278         }
2279     } else {
2280         serverPanic("Unknown sorted set encoding");
2281     }
2282 }
2283 
zrangeCommand(client * c)2284 void zrangeCommand(client *c) {
2285     zrangeGenericCommand(c,0);
2286 }
2287 
zrevrangeCommand(client * c)2288 void zrevrangeCommand(client *c) {
2289     zrangeGenericCommand(c,1);
2290 }
2291 
2292 /* This command implements ZRANGEBYSCORE, ZREVRANGEBYSCORE. */
genericZrangebyscoreCommand(client * c,int reverse)2293 void genericZrangebyscoreCommand(client *c, int reverse) {
2294     zrangespec range;
2295     robj *key = c->argv[1];
2296     robj *zobj;
2297     long offset = 0, limit = -1;
2298     int withscores = 0;
2299     unsigned long rangelen = 0;
2300     void *replylen = NULL;
2301     int minidx, maxidx;
2302 
2303     /* Parse the range arguments. */
2304     if (reverse) {
2305         /* Range is given as [max,min] */
2306         maxidx = 2; minidx = 3;
2307     } else {
2308         /* Range is given as [min,max] */
2309         minidx = 2; maxidx = 3;
2310     }
2311 
2312     if (zslParseRange(c->argv[minidx],c->argv[maxidx],&range) != C_OK) {
2313         addReplyError(c,"min or max is not a float");
2314         return;
2315     }
2316 
2317     /* Parse optional extra arguments. Note that ZCOUNT will exactly have
2318      * 4 arguments, so we'll never enter the following code path. */
2319     if (c->argc > 4) {
2320         int remaining = c->argc - 4;
2321         int pos = 4;
2322 
2323         while (remaining) {
2324             if (remaining >= 1 && !strcasecmp(c->argv[pos]->ptr,"withscores")) {
2325                 pos++; remaining--;
2326                 withscores = 1;
2327             } else if (remaining >= 3 && !strcasecmp(c->argv[pos]->ptr,"limit")) {
2328                 if ((getLongFromObjectOrReply(c, c->argv[pos+1], &offset, NULL) != C_OK) ||
2329                     (getLongFromObjectOrReply(c, c->argv[pos+2], &limit, NULL) != C_OK)) return;
2330                 pos += 3; remaining -= 3;
2331             } else {
2332                 addReply(c,shared.syntaxerr);
2333                 return;
2334             }
2335         }
2336     }
2337 
2338     /* Ok, lookup the key and get the range */
2339     if ((zobj = lookupKeyReadOrReply(c,key,shared.emptymultibulk)) == NULL ||
2340         checkType(c,zobj,OBJ_ZSET)) return;
2341 
2342     if (zobj->encoding == OBJ_ENCODING_ZIPLIST) {
2343         unsigned char *zl = zobj->ptr;
2344         unsigned char *eptr, *sptr;
2345         unsigned char *vstr;
2346         unsigned int vlen;
2347         long long vlong;
2348         double score;
2349 
2350         /* If reversed, get the last node in range as starting point. */
2351         if (reverse) {
2352             eptr = zzlLastInRange(zl,&range);
2353         } else {
2354             eptr = zzlFirstInRange(zl,&range);
2355         }
2356 
2357         /* No "first" element in the specified interval. */
2358         if (eptr == NULL) {
2359             addReply(c, shared.emptymultibulk);
2360             return;
2361         }
2362 
2363         /* Get score pointer for the first element. */
2364         serverAssertWithInfo(c,zobj,eptr != NULL);
2365         sptr = ziplistNext(zl,eptr);
2366 
2367         /* We don't know in advance how many matching elements there are in the
2368          * list, so we push this object that will represent the multi-bulk
2369          * length in the output buffer, and will "fix" it later */
2370         replylen = addDeferredMultiBulkLength(c);
2371 
2372         /* If there is an offset, just traverse the number of elements without
2373          * checking the score because that is done in the next loop. */
2374         while (eptr && offset--) {
2375             if (reverse) {
2376                 zzlPrev(zl,&eptr,&sptr);
2377             } else {
2378                 zzlNext(zl,&eptr,&sptr);
2379             }
2380         }
2381 
2382         while (eptr && limit--) {
2383             score = zzlGetScore(sptr);
2384 
2385             /* Abort when the node is no longer in range. */
2386             if (reverse) {
2387                 if (!zslValueGteMin(score,&range)) break;
2388             } else {
2389                 if (!zslValueLteMax(score,&range)) break;
2390             }
2391 
2392             /* We know the element exists, so ziplistGet should always succeed */
2393             serverAssertWithInfo(c,zobj,ziplistGet(eptr,&vstr,&vlen,&vlong));
2394 
2395             rangelen++;
2396             if (vstr == NULL) {
2397                 addReplyBulkLongLong(c,vlong);
2398             } else {
2399                 addReplyBulkCBuffer(c,vstr,vlen);
2400             }
2401 
2402             if (withscores) {
2403                 addReplyDouble(c,score);
2404             }
2405 
2406             /* Move to next node */
2407             if (reverse) {
2408                 zzlPrev(zl,&eptr,&sptr);
2409             } else {
2410                 zzlNext(zl,&eptr,&sptr);
2411             }
2412         }
2413     } else if (zobj->encoding == OBJ_ENCODING_SKIPLIST) {
2414         zset *zs = zobj->ptr;
2415         zskiplist *zsl = zs->zsl;
2416         zskiplistNode *ln;
2417 
2418         /* If reversed, get the last node in range as starting point. */
2419         if (reverse) {
2420             ln = zslLastInRange(zsl,&range);
2421         } else {
2422             ln = zslFirstInRange(zsl,&range);
2423         }
2424 
2425         /* No "first" element in the specified interval. */
2426         if (ln == NULL) {
2427             addReply(c, shared.emptymultibulk);
2428             return;
2429         }
2430 
2431         /* We don't know in advance how many matching elements there are in the
2432          * list, so we push this object that will represent the multi-bulk
2433          * length in the output buffer, and will "fix" it later */
2434         replylen = addDeferredMultiBulkLength(c);
2435 
2436         /* If there is an offset, just traverse the number of elements without
2437          * checking the score because that is done in the next loop. */
2438         while (ln && offset--) {
2439             if (reverse) {
2440                 ln = ln->backward;
2441             } else {
2442                 ln = ln->level[0].forward;
2443             }
2444         }
2445 
2446         while (ln && limit--) {
2447             /* Abort when the node is no longer in range. */
2448             if (reverse) {
2449                 if (!zslValueGteMin(ln->score,&range)) break;
2450             } else {
2451                 if (!zslValueLteMax(ln->score,&range)) break;
2452             }
2453 
2454             rangelen++;
2455             addReplyBulk(c,ln->obj);
2456 
2457             if (withscores) {
2458                 addReplyDouble(c,ln->score);
2459             }
2460 
2461             /* Move to next node */
2462             if (reverse) {
2463                 ln = ln->backward;
2464             } else {
2465                 ln = ln->level[0].forward;
2466             }
2467         }
2468     } else {
2469         serverPanic("Unknown sorted set encoding");
2470     }
2471 
2472     if (withscores) {
2473         rangelen *= 2;
2474     }
2475 
2476     setDeferredMultiBulkLength(c, replylen, rangelen);
2477 }
2478 
zrangebyscoreCommand(client * c)2479 void zrangebyscoreCommand(client *c) {
2480     genericZrangebyscoreCommand(c,0);
2481 }
2482 
zrevrangebyscoreCommand(client * c)2483 void zrevrangebyscoreCommand(client *c) {
2484     genericZrangebyscoreCommand(c,1);
2485 }
2486 
zcountCommand(client * c)2487 void zcountCommand(client *c) {
2488     robj *key = c->argv[1];
2489     robj *zobj;
2490     zrangespec range;
2491     int count = 0;
2492 
2493     /* Parse the range arguments */
2494     if (zslParseRange(c->argv[2],c->argv[3],&range) != C_OK) {
2495         addReplyError(c,"min or max is not a float");
2496         return;
2497     }
2498 
2499     /* Lookup the sorted set */
2500     if ((zobj = lookupKeyReadOrReply(c, key, shared.czero)) == NULL ||
2501         checkType(c, zobj, OBJ_ZSET)) return;
2502 
2503     if (zobj->encoding == OBJ_ENCODING_ZIPLIST) {
2504         unsigned char *zl = zobj->ptr;
2505         unsigned char *eptr, *sptr;
2506         double score;
2507 
2508         /* Use the first element in range as the starting point */
2509         eptr = zzlFirstInRange(zl,&range);
2510 
2511         /* No "first" element */
2512         if (eptr == NULL) {
2513             addReply(c, shared.czero);
2514             return;
2515         }
2516 
2517         /* First element is in range */
2518         sptr = ziplistNext(zl,eptr);
2519         score = zzlGetScore(sptr);
2520         serverAssertWithInfo(c,zobj,zslValueLteMax(score,&range));
2521 
2522         /* Iterate over elements in range */
2523         while (eptr) {
2524             score = zzlGetScore(sptr);
2525 
2526             /* Abort when the node is no longer in range. */
2527             if (!zslValueLteMax(score,&range)) {
2528                 break;
2529             } else {
2530                 count++;
2531                 zzlNext(zl,&eptr,&sptr);
2532             }
2533         }
2534     } else if (zobj->encoding == OBJ_ENCODING_SKIPLIST) {
2535         zset *zs = zobj->ptr;
2536         zskiplist *zsl = zs->zsl;
2537         zskiplistNode *zn;
2538         unsigned long rank;
2539 
2540         /* Find first element in range */
2541         zn = zslFirstInRange(zsl, &range);
2542 
2543         /* Use rank of first element, if any, to determine preliminary count */
2544         if (zn != NULL) {
2545             rank = zslGetRank(zsl, zn->score, zn->obj);
2546             count = (zsl->length - (rank - 1));
2547 
2548             /* Find last element in range */
2549             zn = zslLastInRange(zsl, &range);
2550 
2551             /* Use rank of last element, if any, to determine the actual count */
2552             if (zn != NULL) {
2553                 rank = zslGetRank(zsl, zn->score, zn->obj);
2554                 count -= (zsl->length - rank);
2555             }
2556         }
2557     } else {
2558         serverPanic("Unknown sorted set encoding");
2559     }
2560 
2561     addReplyLongLong(c, count);
2562 }
2563 
zlexcountCommand(client * c)2564 void zlexcountCommand(client *c) {
2565     robj *key = c->argv[1];
2566     robj *zobj;
2567     zlexrangespec range;
2568     int count = 0;
2569 
2570     /* Parse the range arguments */
2571     if (zslParseLexRange(c->argv[2],c->argv[3],&range) != C_OK) {
2572         addReplyError(c,"min or max not valid string range item");
2573         return;
2574     }
2575 
2576     /* Lookup the sorted set */
2577     if ((zobj = lookupKeyReadOrReply(c, key, shared.czero)) == NULL ||
2578         checkType(c, zobj, OBJ_ZSET))
2579     {
2580         zslFreeLexRange(&range);
2581         return;
2582     }
2583 
2584     if (zobj->encoding == OBJ_ENCODING_ZIPLIST) {
2585         unsigned char *zl = zobj->ptr;
2586         unsigned char *eptr, *sptr;
2587 
2588         /* Use the first element in range as the starting point */
2589         eptr = zzlFirstInLexRange(zl,&range);
2590 
2591         /* No "first" element */
2592         if (eptr == NULL) {
2593             zslFreeLexRange(&range);
2594             addReply(c, shared.czero);
2595             return;
2596         }
2597 
2598         /* First element is in range */
2599         sptr = ziplistNext(zl,eptr);
2600         serverAssertWithInfo(c,zobj,zzlLexValueLteMax(eptr,&range));
2601 
2602         /* Iterate over elements in range */
2603         while (eptr) {
2604             /* Abort when the node is no longer in range. */
2605             if (!zzlLexValueLteMax(eptr,&range)) {
2606                 break;
2607             } else {
2608                 count++;
2609                 zzlNext(zl,&eptr,&sptr);
2610             }
2611         }
2612     } else if (zobj->encoding == OBJ_ENCODING_SKIPLIST) {
2613         zset *zs = zobj->ptr;
2614         zskiplist *zsl = zs->zsl;
2615         zskiplistNode *zn;
2616         unsigned long rank;
2617 
2618         /* Find first element in range */
2619         zn = zslFirstInLexRange(zsl, &range);
2620 
2621         /* Use rank of first element, if any, to determine preliminary count */
2622         if (zn != NULL) {
2623             rank = zslGetRank(zsl, zn->score, zn->obj);
2624             count = (zsl->length - (rank - 1));
2625 
2626             /* Find last element in range */
2627             zn = zslLastInLexRange(zsl, &range);
2628 
2629             /* Use rank of last element, if any, to determine the actual count */
2630             if (zn != NULL) {
2631                 rank = zslGetRank(zsl, zn->score, zn->obj);
2632                 count -= (zsl->length - rank);
2633             }
2634         }
2635     } else {
2636         serverPanic("Unknown sorted set encoding");
2637     }
2638 
2639     zslFreeLexRange(&range);
2640     addReplyLongLong(c, count);
2641 }
2642 
2643 /* This command implements ZRANGEBYLEX, ZREVRANGEBYLEX. */
genericZrangebylexCommand(client * c,int reverse)2644 void genericZrangebylexCommand(client *c, int reverse) {
2645     zlexrangespec range;
2646     robj *key = c->argv[1];
2647     robj *zobj;
2648     long offset = 0, limit = -1;
2649     unsigned long rangelen = 0;
2650     void *replylen = NULL;
2651     int minidx, maxidx;
2652 
2653     /* Parse the range arguments. */
2654     if (reverse) {
2655         /* Range is given as [max,min] */
2656         maxidx = 2; minidx = 3;
2657     } else {
2658         /* Range is given as [min,max] */
2659         minidx = 2; maxidx = 3;
2660     }
2661 
2662     if (zslParseLexRange(c->argv[minidx],c->argv[maxidx],&range) != C_OK) {
2663         addReplyError(c,"min or max not valid string range item");
2664         return;
2665     }
2666 
2667     /* Parse optional extra arguments. Note that ZCOUNT will exactly have
2668      * 4 arguments, so we'll never enter the following code path. */
2669     if (c->argc > 4) {
2670         int remaining = c->argc - 4;
2671         int pos = 4;
2672 
2673         while (remaining) {
2674             if (remaining >= 3 && !strcasecmp(c->argv[pos]->ptr,"limit")) {
2675                 if ((getLongFromObjectOrReply(c, c->argv[pos+1], &offset, NULL) != C_OK) ||
2676                     (getLongFromObjectOrReply(c, c->argv[pos+2], &limit, NULL) != C_OK)) return;
2677                 pos += 3; remaining -= 3;
2678             } else {
2679                 zslFreeLexRange(&range);
2680                 addReply(c,shared.syntaxerr);
2681                 return;
2682             }
2683         }
2684     }
2685 
2686     /* Ok, lookup the key and get the range */
2687     if ((zobj = lookupKeyReadOrReply(c,key,shared.emptymultibulk)) == NULL ||
2688         checkType(c,zobj,OBJ_ZSET))
2689     {
2690         zslFreeLexRange(&range);
2691         return;
2692     }
2693 
2694     if (zobj->encoding == OBJ_ENCODING_ZIPLIST) {
2695         unsigned char *zl = zobj->ptr;
2696         unsigned char *eptr, *sptr;
2697         unsigned char *vstr;
2698         unsigned int vlen;
2699         long long vlong;
2700 
2701         /* If reversed, get the last node in range as starting point. */
2702         if (reverse) {
2703             eptr = zzlLastInLexRange(zl,&range);
2704         } else {
2705             eptr = zzlFirstInLexRange(zl,&range);
2706         }
2707 
2708         /* No "first" element in the specified interval. */
2709         if (eptr == NULL) {
2710             addReply(c, shared.emptymultibulk);
2711             zslFreeLexRange(&range);
2712             return;
2713         }
2714 
2715         /* Get score pointer for the first element. */
2716         serverAssertWithInfo(c,zobj,eptr != NULL);
2717         sptr = ziplistNext(zl,eptr);
2718 
2719         /* We don't know in advance how many matching elements there are in the
2720          * list, so we push this object that will represent the multi-bulk
2721          * length in the output buffer, and will "fix" it later */
2722         replylen = addDeferredMultiBulkLength(c);
2723 
2724         /* If there is an offset, just traverse the number of elements without
2725          * checking the score because that is done in the next loop. */
2726         while (eptr && offset--) {
2727             if (reverse) {
2728                 zzlPrev(zl,&eptr,&sptr);
2729             } else {
2730                 zzlNext(zl,&eptr,&sptr);
2731             }
2732         }
2733 
2734         while (eptr && limit--) {
2735             /* Abort when the node is no longer in range. */
2736             if (reverse) {
2737                 if (!zzlLexValueGteMin(eptr,&range)) break;
2738             } else {
2739                 if (!zzlLexValueLteMax(eptr,&range)) break;
2740             }
2741 
2742             /* We know the element exists, so ziplistGet should always
2743              * succeed. */
2744             serverAssertWithInfo(c,zobj,ziplistGet(eptr,&vstr,&vlen,&vlong));
2745 
2746             rangelen++;
2747             if (vstr == NULL) {
2748                 addReplyBulkLongLong(c,vlong);
2749             } else {
2750                 addReplyBulkCBuffer(c,vstr,vlen);
2751             }
2752 
2753             /* Move to next node */
2754             if (reverse) {
2755                 zzlPrev(zl,&eptr,&sptr);
2756             } else {
2757                 zzlNext(zl,&eptr,&sptr);
2758             }
2759         }
2760     } else if (zobj->encoding == OBJ_ENCODING_SKIPLIST) {
2761         zset *zs = zobj->ptr;
2762         zskiplist *zsl = zs->zsl;
2763         zskiplistNode *ln;
2764 
2765         /* If reversed, get the last node in range as starting point. */
2766         if (reverse) {
2767             ln = zslLastInLexRange(zsl,&range);
2768         } else {
2769             ln = zslFirstInLexRange(zsl,&range);
2770         }
2771 
2772         /* No "first" element in the specified interval. */
2773         if (ln == NULL) {
2774             addReply(c, shared.emptymultibulk);
2775             zslFreeLexRange(&range);
2776             return;
2777         }
2778 
2779         /* We don't know in advance how many matching elements there are in the
2780          * list, so we push this object that will represent the multi-bulk
2781          * length in the output buffer, and will "fix" it later */
2782         replylen = addDeferredMultiBulkLength(c);
2783 
2784         /* If there is an offset, just traverse the number of elements without
2785          * checking the score because that is done in the next loop. */
2786         while (ln && offset--) {
2787             if (reverse) {
2788                 ln = ln->backward;
2789             } else {
2790                 ln = ln->level[0].forward;
2791             }
2792         }
2793 
2794         while (ln && limit--) {
2795             /* Abort when the node is no longer in range. */
2796             if (reverse) {
2797                 if (!zslLexValueGteMin(ln->obj,&range)) break;
2798             } else {
2799                 if (!zslLexValueLteMax(ln->obj,&range)) break;
2800             }
2801 
2802             rangelen++;
2803             addReplyBulk(c,ln->obj);
2804 
2805             /* Move to next node */
2806             if (reverse) {
2807                 ln = ln->backward;
2808             } else {
2809                 ln = ln->level[0].forward;
2810             }
2811         }
2812     } else {
2813         serverPanic("Unknown sorted set encoding");
2814     }
2815 
2816     zslFreeLexRange(&range);
2817     setDeferredMultiBulkLength(c, replylen, rangelen);
2818 }
2819 
zrangebylexCommand(client * c)2820 void zrangebylexCommand(client *c) {
2821     genericZrangebylexCommand(c,0);
2822 }
2823 
zrevrangebylexCommand(client * c)2824 void zrevrangebylexCommand(client *c) {
2825     genericZrangebylexCommand(c,1);
2826 }
2827 
zcardCommand(client * c)2828 void zcardCommand(client *c) {
2829     robj *key = c->argv[1];
2830     robj *zobj;
2831 
2832     if ((zobj = lookupKeyReadOrReply(c,key,shared.czero)) == NULL ||
2833         checkType(c,zobj,OBJ_ZSET)) return;
2834 
2835     addReplyLongLong(c,zsetLength(zobj));
2836 }
2837 
zscoreCommand(client * c)2838 void zscoreCommand(client *c) {
2839     robj *key = c->argv[1];
2840     robj *zobj;
2841     double score;
2842 
2843     if ((zobj = lookupKeyReadOrReply(c,key,shared.nullbulk)) == NULL ||
2844         checkType(c,zobj,OBJ_ZSET)) return;
2845 
2846     if (zsetScore(zobj,c->argv[2],&score) == C_ERR) {
2847         addReply(c,shared.nullbulk);
2848     } else {
2849         addReplyDouble(c,score);
2850     }
2851 }
2852 
zrankGenericCommand(client * c,int reverse)2853 void zrankGenericCommand(client *c, int reverse) {
2854     robj *key = c->argv[1];
2855     robj *ele = c->argv[2];
2856     robj *zobj;
2857     unsigned long llen;
2858     unsigned long rank;
2859 
2860     if ((zobj = lookupKeyReadOrReply(c,key,shared.nullbulk)) == NULL ||
2861         checkType(c,zobj,OBJ_ZSET)) return;
2862     llen = zsetLength(zobj);
2863 
2864     serverAssertWithInfo(c,ele,sdsEncodedObject(ele));
2865 
2866     if (zobj->encoding == OBJ_ENCODING_ZIPLIST) {
2867         unsigned char *zl = zobj->ptr;
2868         unsigned char *eptr, *sptr;
2869 
2870         eptr = ziplistIndex(zl,0);
2871         serverAssertWithInfo(c,zobj,eptr != NULL);
2872         sptr = ziplistNext(zl,eptr);
2873         serverAssertWithInfo(c,zobj,sptr != NULL);
2874 
2875         rank = 1;
2876         while(eptr != NULL) {
2877             if (ziplistCompare(eptr,ele->ptr,sdslen(ele->ptr)))
2878                 break;
2879             rank++;
2880             zzlNext(zl,&eptr,&sptr);
2881         }
2882 
2883         if (eptr != NULL) {
2884             if (reverse)
2885                 addReplyLongLong(c,llen-rank);
2886             else
2887                 addReplyLongLong(c,rank-1);
2888         } else {
2889             addReply(c,shared.nullbulk);
2890         }
2891     } else if (zobj->encoding == OBJ_ENCODING_SKIPLIST) {
2892         zset *zs = zobj->ptr;
2893         zskiplist *zsl = zs->zsl;
2894         dictEntry *de;
2895         double score;
2896 
2897         ele = c->argv[2];
2898         de = dictFind(zs->dict,ele);
2899         if (de != NULL) {
2900             score = *(double*)dictGetVal(de);
2901             rank = zslGetRank(zsl,score,ele);
2902             serverAssertWithInfo(c,ele,rank); /* Existing elements always have a rank. */
2903             if (reverse)
2904                 addReplyLongLong(c,llen-rank);
2905             else
2906                 addReplyLongLong(c,rank-1);
2907         } else {
2908             addReply(c,shared.nullbulk);
2909         }
2910     } else {
2911         serverPanic("Unknown sorted set encoding");
2912     }
2913 }
2914 
zrankCommand(client * c)2915 void zrankCommand(client *c) {
2916     zrankGenericCommand(c, 0);
2917 }
2918 
zrevrankCommand(client * c)2919 void zrevrankCommand(client *c) {
2920     zrankGenericCommand(c, 1);
2921 }
2922 
zscanCommand(client * c)2923 void zscanCommand(client *c) {
2924     robj *o;
2925     unsigned long cursor;
2926 
2927     if (parseScanCursorOrReply(c,c->argv[2],&cursor) == C_ERR) return;
2928     if ((o = lookupKeyReadOrReply(c,c->argv[1],shared.emptyscan)) == NULL ||
2929         checkType(c,o,OBJ_ZSET)) return;
2930     scanGenericCommand(c,o,cursor);
2931 }
2932