1 /*
2 * Copyright (c) 2009-2012, Salvatore Sanfilippo <antirez at gmail dot com>
3 * Copyright (c) 2009-2012, Pieter Noordhuis <pcnoordhuis at gmail dot com>
4 * All rights reserved.
5 *
6 * Redistribution and use in source and binary forms, with or without
7 * modification, are permitted provided that the following conditions are met:
8 *
9 * * Redistributions of source code must retain the above copyright notice,
10 * this list of conditions and the following disclaimer.
11 * * Redistributions in binary form must reproduce the above copyright
12 * notice, this list of conditions and the following disclaimer in the
13 * documentation and/or other materials provided with the distribution.
14 * * Neither the name of Redis nor the names of its contributors may be used
15 * to endorse or promote products derived from this software without
16 * specific prior written permission.
17 *
18 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
19 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
20 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
21 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
22 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
23 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
24 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
25 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
26 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
27 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
28 * POSSIBILITY OF SUCH DAMAGE.
29 */
30
31 /*-----------------------------------------------------------------------------
32 * Sorted set API
33 *----------------------------------------------------------------------------*/
34
35 /* ZSETs are ordered sets using two data structures to hold the same elements
36 * in order to get O(log(N)) INSERT and REMOVE operations into a sorted
37 * data structure.
38 *
39 * The elements are added to a hash table mapping Redis objects to scores.
40 * At the same time the elements are added to a skip list mapping scores
41 * to Redis objects (so objects are sorted by scores in this "view"). */
42
/* This skiplist implementation is almost a C translation of the original
 * algorithm described by William Pugh in "Skip Lists: A Probabilistic
 * Alternative to Balanced Trees", modified in three ways:
 * a) this implementation allows for repeated scores.
 * b) the comparison is not just by key (our 'score') but also by satellite
 *    data.
 * c) there is a back pointer, so it's a doubly linked list with the back
 *    pointers being only at "level 1". This allows traversing the list
 *    from tail to head, useful for ZREVRANGE. */
51
52 #include "server.h"
53 #include <math.h>
54
55 static int zslLexValueGteMin(robj *value, zlexrangespec *spec);
56 static int zslLexValueLteMax(robj *value, zlexrangespec *spec);
57
zslCreateNode(int level,double score,robj * obj)58 zskiplistNode *zslCreateNode(int level, double score, robj *obj) {
59 zskiplistNode *zn = zmalloc(sizeof(*zn)+level*sizeof(struct zskiplistLevel));
60 zn->score = score;
61 zn->obj = obj;
62 return zn;
63 }
64
zslCreate(void)65 zskiplist *zslCreate(void) {
66 int j;
67 zskiplist *zsl;
68
69 zsl = zmalloc(sizeof(*zsl));
70 zsl->level = 1;
71 zsl->length = 0;
72 zsl->header = zslCreateNode(ZSKIPLIST_MAXLEVEL,0,NULL);
73 for (j = 0; j < ZSKIPLIST_MAXLEVEL; j++) {
74 zsl->header->level[j].forward = NULL;
75 zsl->header->level[j].span = 0;
76 }
77 zsl->header->backward = NULL;
78 zsl->tail = NULL;
79 return zsl;
80 }
81
/* Release a node: drop the reference the node holds on its object, then
 * free the node memory itself. */
void zslFreeNode(zskiplistNode *node) {
    robj *held = node->obj;

    decrRefCount(held);
    zfree(node);
}
86
/* Free the whole skiplist: the header, every node reachable through the
 * level-0 forward chain, and finally the list structure. */
void zslFree(zskiplist *zsl) {
    /* Grab the first real node before the header goes away. */
    zskiplistNode *cur = zsl->header->level[0].forward;

    zfree(zsl->header);
    while (cur != NULL) {
        zskiplistNode *following = cur->level[0].forward;

        zslFreeNode(cur);
        cur = following;
    }
    zfree(zsl);
}
98
99 /* Returns a random level for the new skiplist node we are going to create.
100 * The return value of this function is between 1 and ZSKIPLIST_MAXLEVEL
101 * (both inclusive), with a powerlaw-alike distribution where higher
102 * levels are less likely to be returned. */
zslRandomLevel(void)103 int zslRandomLevel(void) {
104 int level = 1;
105 while ((random()&0xFFFF) < (ZSKIPLIST_P * 0xFFFF))
106 level += 1;
107 return (level<ZSKIPLIST_MAXLEVEL) ? level : ZSKIPLIST_MAXLEVEL;
108 }
109
zslInsert(zskiplist * zsl,double score,robj * obj)110 zskiplistNode *zslInsert(zskiplist *zsl, double score, robj *obj) {
111 zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
112 unsigned int rank[ZSKIPLIST_MAXLEVEL];
113 int i, level;
114
115 serverAssert(!isnan(score));
116 x = zsl->header;
117 for (i = zsl->level-1; i >= 0; i--) {
118 /* store rank that is crossed to reach the insert position */
119 rank[i] = i == (zsl->level-1) ? 0 : rank[i+1];
120 while (x->level[i].forward &&
121 (x->level[i].forward->score < score ||
122 (x->level[i].forward->score == score &&
123 compareStringObjects(x->level[i].forward->obj,obj) < 0))) {
124 rank[i] += x->level[i].span;
125 x = x->level[i].forward;
126 }
127 update[i] = x;
128 }
129 /* we assume the key is not already inside, since we allow duplicated
130 * scores, and the re-insertion of score and redis object should never
131 * happen since the caller of zslInsert() should test in the hash table
132 * if the element is already inside or not. */
133 level = zslRandomLevel();
134 if (level > zsl->level) {
135 for (i = zsl->level; i < level; i++) {
136 rank[i] = 0;
137 update[i] = zsl->header;
138 update[i]->level[i].span = zsl->length;
139 }
140 zsl->level = level;
141 }
142 x = zslCreateNode(level,score,obj);
143 for (i = 0; i < level; i++) {
144 x->level[i].forward = update[i]->level[i].forward;
145 update[i]->level[i].forward = x;
146
147 /* update span covered by update[i] as x is inserted here */
148 x->level[i].span = update[i]->level[i].span - (rank[0] - rank[i]);
149 update[i]->level[i].span = (rank[0] - rank[i]) + 1;
150 }
151
152 /* increment span for untouched levels */
153 for (i = level; i < zsl->level; i++) {
154 update[i]->level[i].span++;
155 }
156
157 x->backward = (update[0] == zsl->header) ? NULL : update[0];
158 if (x->level[0].forward)
159 x->level[0].forward->backward = x;
160 else
161 zsl->tail = x;
162 zsl->length++;
163 return x;
164 }
165
166 /* Internal function used by zslDelete, zslDeleteByScore and zslDeleteByRank */
zslDeleteNode(zskiplist * zsl,zskiplistNode * x,zskiplistNode ** update)167 void zslDeleteNode(zskiplist *zsl, zskiplistNode *x, zskiplistNode **update) {
168 int i;
169 for (i = 0; i < zsl->level; i++) {
170 if (update[i]->level[i].forward == x) {
171 update[i]->level[i].span += x->level[i].span - 1;
172 update[i]->level[i].forward = x->level[i].forward;
173 } else {
174 update[i]->level[i].span -= 1;
175 }
176 }
177 if (x->level[0].forward) {
178 x->level[0].forward->backward = x->backward;
179 } else {
180 zsl->tail = x->backward;
181 }
182 while(zsl->level > 1 && zsl->header->level[zsl->level-1].forward == NULL)
183 zsl->level--;
184 zsl->length--;
185 }
186
187 /* Delete an element with matching score/object from the skiplist. */
zslDelete(zskiplist * zsl,double score,robj * obj)188 int zslDelete(zskiplist *zsl, double score, robj *obj) {
189 zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
190 int i;
191
192 x = zsl->header;
193 for (i = zsl->level-1; i >= 0; i--) {
194 while (x->level[i].forward &&
195 (x->level[i].forward->score < score ||
196 (x->level[i].forward->score == score &&
197 compareStringObjects(x->level[i].forward->obj,obj) < 0)))
198 x = x->level[i].forward;
199 update[i] = x;
200 }
201 /* We may have multiple elements with the same score, what we need
202 * is to find the element with both the right score and object. */
203 x = x->level[0].forward;
204 if (x && score == x->score && equalStringObjects(x->obj,obj)) {
205 zslDeleteNode(zsl, x, update);
206 zslFreeNode(x);
207 return 1;
208 }
209 return 0; /* not found */
210 }
211
/* Return non-zero if 'value' satisfies the lower bound of 'spec', taking
 * into account whether the minimum is exclusive (minex) or inclusive. */
static int zslValueGteMin(double value, zrangespec *spec) {
    if (spec->minex) return value > spec->min;
    return value >= spec->min;
}
215
/* Return non-zero if 'value' satisfies the upper bound of 'spec', taking
 * into account whether the maximum is exclusive (maxex) or inclusive. */
int zslValueLteMax(double value, zrangespec *spec) {
    if (spec->maxex) return value < spec->max;
    return value <= spec->max;
}
219
220 /* Returns if there is a part of the zset is in range. */
zslIsInRange(zskiplist * zsl,zrangespec * range)221 int zslIsInRange(zskiplist *zsl, zrangespec *range) {
222 zskiplistNode *x;
223
224 /* Test for ranges that will always be empty. */
225 if (range->min > range->max ||
226 (range->min == range->max && (range->minex || range->maxex)))
227 return 0;
228 x = zsl->tail;
229 if (x == NULL || !zslValueGteMin(x->score,range))
230 return 0;
231 x = zsl->header->level[0].forward;
232 if (x == NULL || !zslValueLteMax(x->score,range))
233 return 0;
234 return 1;
235 }
236
237 /* Find the first node that is contained in the specified range.
238 * Returns NULL when no element is contained in the range. */
zslFirstInRange(zskiplist * zsl,zrangespec * range)239 zskiplistNode *zslFirstInRange(zskiplist *zsl, zrangespec *range) {
240 zskiplistNode *x;
241 int i;
242
243 /* If everything is out of range, return early. */
244 if (!zslIsInRange(zsl,range)) return NULL;
245
246 x = zsl->header;
247 for (i = zsl->level-1; i >= 0; i--) {
248 /* Go forward while *OUT* of range. */
249 while (x->level[i].forward &&
250 !zslValueGteMin(x->level[i].forward->score,range))
251 x = x->level[i].forward;
252 }
253
254 /* This is an inner range, so the next node cannot be NULL. */
255 x = x->level[0].forward;
256 serverAssert(x != NULL);
257
258 /* Check if score <= max. */
259 if (!zslValueLteMax(x->score,range)) return NULL;
260 return x;
261 }
262
263 /* Find the last node that is contained in the specified range.
264 * Returns NULL when no element is contained in the range. */
zslLastInRange(zskiplist * zsl,zrangespec * range)265 zskiplistNode *zslLastInRange(zskiplist *zsl, zrangespec *range) {
266 zskiplistNode *x;
267 int i;
268
269 /* If everything is out of range, return early. */
270 if (!zslIsInRange(zsl,range)) return NULL;
271
272 x = zsl->header;
273 for (i = zsl->level-1; i >= 0; i--) {
274 /* Go forward while *IN* range. */
275 while (x->level[i].forward &&
276 zslValueLteMax(x->level[i].forward->score,range))
277 x = x->level[i].forward;
278 }
279
280 /* This is an inner range, so this node cannot be NULL. */
281 serverAssert(x != NULL);
282
283 /* Check if score >= min. */
284 if (!zslValueGteMin(x->score,range)) return NULL;
285 return x;
286 }
287
288 /* Delete all the elements with score between min and max from the skiplist.
289 * Min and max are inclusive, so a score >= min || score <= max is deleted.
290 * Note that this function takes the reference to the hash table view of the
291 * sorted set, in order to remove the elements from the hash table too. */
zslDeleteRangeByScore(zskiplist * zsl,zrangespec * range,dict * dict)292 unsigned long zslDeleteRangeByScore(zskiplist *zsl, zrangespec *range, dict *dict) {
293 zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
294 unsigned long removed = 0;
295 int i;
296
297 x = zsl->header;
298 for (i = zsl->level-1; i >= 0; i--) {
299 while (x->level[i].forward && (range->minex ?
300 x->level[i].forward->score <= range->min :
301 x->level[i].forward->score < range->min))
302 x = x->level[i].forward;
303 update[i] = x;
304 }
305
306 /* Current node is the last with score < or <= min. */
307 x = x->level[0].forward;
308
309 /* Delete nodes while in range. */
310 while (x &&
311 (range->maxex ? x->score < range->max : x->score <= range->max))
312 {
313 zskiplistNode *next = x->level[0].forward;
314 zslDeleteNode(zsl,x,update);
315 dictDelete(dict,x->obj);
316 zslFreeNode(x);
317 removed++;
318 x = next;
319 }
320 return removed;
321 }
322
/* Delete all the elements whose object is inside the lexicographic 'range'
 * from the skiplist, removing each of them from the dictionary 'dict' too.
 * Returns the number of removed elements. */
unsigned long zslDeleteRangeByLex(zskiplist *zsl, zlexrangespec *range, dict *dict) {
    zskiplistNode *update[ZSKIPLIST_MAXLEVEL];
    zskiplistNode *cur = zsl->header, *next;
    unsigned long removed = 0;
    int lvl;

    /* Find, at every level, the last node that is still before the range. */
    for (lvl = zsl->level-1; lvl >= 0; lvl--) {
        while ((next = cur->level[lvl].forward) != NULL &&
               !zslLexValueGteMin(next->obj,range))
            cur = next;
        update[lvl] = cur;
    }

    /* 'cur' is the last node before the range: move to its successor and
     * delete nodes for as long as they are within the maximum. */
    cur = cur->level[0].forward;
    while (cur != NULL && zslLexValueLteMax(cur->obj,range)) {
        next = cur->level[0].forward;
        zslDeleteNode(zsl,cur,update);
        dictDelete(dict,cur->obj);
        zslFreeNode(cur);
        removed++;
        cur = next;
    }
    return removed;
}
351
352 /* Delete all the elements with rank between start and end from the skiplist.
353 * Start and end are inclusive. Note that start and end need to be 1-based */
zslDeleteRangeByRank(zskiplist * zsl,unsigned int start,unsigned int end,dict * dict)354 unsigned long zslDeleteRangeByRank(zskiplist *zsl, unsigned int start, unsigned int end, dict *dict) {
355 zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
356 unsigned long traversed = 0, removed = 0;
357 int i;
358
359 x = zsl->header;
360 for (i = zsl->level-1; i >= 0; i--) {
361 while (x->level[i].forward && (traversed + x->level[i].span) < start) {
362 traversed += x->level[i].span;
363 x = x->level[i].forward;
364 }
365 update[i] = x;
366 }
367
368 traversed++;
369 x = x->level[0].forward;
370 while (x && traversed <= end) {
371 zskiplistNode *next = x->level[0].forward;
372 zslDeleteNode(zsl,x,update);
373 dictDelete(dict,x->obj);
374 zslFreeNode(x);
375 removed++;
376 traversed++;
377 x = next;
378 }
379 return removed;
380 }
381
382 /* Find the rank for an element by both score and key.
383 * Returns 0 when the element cannot be found, rank otherwise.
384 * Note that the rank is 1-based due to the span of zsl->header to the
385 * first element. */
zslGetRank(zskiplist * zsl,double score,robj * o)386 unsigned long zslGetRank(zskiplist *zsl, double score, robj *o) {
387 zskiplistNode *x;
388 unsigned long rank = 0;
389 int i;
390
391 x = zsl->header;
392 for (i = zsl->level-1; i >= 0; i--) {
393 while (x->level[i].forward &&
394 (x->level[i].forward->score < score ||
395 (x->level[i].forward->score == score &&
396 compareStringObjects(x->level[i].forward->obj,o) <= 0))) {
397 rank += x->level[i].span;
398 x = x->level[i].forward;
399 }
400
401 /* x might be equal to zsl->header, so test if obj is non-NULL */
402 if (x->obj && equalStringObjects(x->obj,o)) {
403 return rank;
404 }
405 }
406 return 0;
407 }
408
409 /* Finds an element by its rank. The rank argument needs to be 1-based. */
zslGetElementByRank(zskiplist * zsl,unsigned long rank)410 zskiplistNode* zslGetElementByRank(zskiplist *zsl, unsigned long rank) {
411 zskiplistNode *x;
412 unsigned long traversed = 0;
413 int i;
414
415 x = zsl->header;
416 for (i = zsl->level-1; i >= 0; i--) {
417 while (x->level[i].forward && (traversed + x->level[i].span) <= rank)
418 {
419 traversed += x->level[i].span;
420 x = x->level[i].forward;
421 }
422 if (traversed == rank) {
423 return x;
424 }
425 }
426 return NULL;
427 }
428
429 /* Populate the rangespec according to the objects min and max. */
zslParseRange(robj * min,robj * max,zrangespec * spec)430 static int zslParseRange(robj *min, robj *max, zrangespec *spec) {
431 char *eptr;
432 spec->minex = spec->maxex = 0;
433
434 /* Parse the min-max interval. If one of the values is prefixed
435 * by the "(" character, it's considered "open". For instance
436 * ZRANGEBYSCORE zset (1.5 (2.5 will match min < x < max
437 * ZRANGEBYSCORE zset 1.5 2.5 will instead match min <= x <= max */
438 if (min->encoding == OBJ_ENCODING_INT) {
439 spec->min = (long)min->ptr;
440 } else {
441 if (((char*)min->ptr)[0] == '(') {
442 spec->min = strtod((char*)min->ptr+1,&eptr);
443 if (eptr[0] != '\0' || isnan(spec->min)) return C_ERR;
444 spec->minex = 1;
445 } else {
446 spec->min = strtod((char*)min->ptr,&eptr);
447 if (eptr[0] != '\0' || isnan(spec->min)) return C_ERR;
448 }
449 }
450 if (max->encoding == OBJ_ENCODING_INT) {
451 spec->max = (long)max->ptr;
452 } else {
453 if (((char*)max->ptr)[0] == '(') {
454 spec->max = strtod((char*)max->ptr+1,&eptr);
455 if (eptr[0] != '\0' || isnan(spec->max)) return C_ERR;
456 spec->maxex = 1;
457 } else {
458 spec->max = strtod((char*)max->ptr,&eptr);
459 if (eptr[0] != '\0' || isnan(spec->max)) return C_ERR;
460 }
461 }
462
463 return C_OK;
464 }
465
466 /* ------------------------ Lexicographic ranges ---------------------------- */
467
468 /* Parse max or min argument of ZRANGEBYLEX.
469 * (foo means foo (open interval)
470 * [foo means foo (closed interval)
471 * - means the min string possible
472 * + means the max string possible
473 *
474 * If the string is valid the *dest pointer is set to the redis object
475 * that will be used for the comparision, and ex will be set to 0 or 1
476 * respectively if the item is exclusive or inclusive. C_OK will be
477 * returned.
478 *
479 * If the string is not a valid range C_ERR is returned, and the value
480 * of *dest and *ex is undefined. */
zslParseLexRangeItem(robj * item,robj ** dest,int * ex)481 int zslParseLexRangeItem(robj *item, robj **dest, int *ex) {
482 char *c = item->ptr;
483
484 switch(c[0]) {
485 case '+':
486 if (c[1] != '\0') return C_ERR;
487 *ex = 0;
488 *dest = shared.maxstring;
489 incrRefCount(shared.maxstring);
490 return C_OK;
491 case '-':
492 if (c[1] != '\0') return C_ERR;
493 *ex = 0;
494 *dest = shared.minstring;
495 incrRefCount(shared.minstring);
496 return C_OK;
497 case '(':
498 *ex = 1;
499 *dest = createStringObject(c+1,sdslen(c)-1);
500 return C_OK;
501 case '[':
502 *ex = 0;
503 *dest = createStringObject(c+1,sdslen(c)-1);
504 return C_OK;
505 default:
506 return C_ERR;
507 }
508 }
509
510 /* Populate the rangespec according to the objects min and max.
511 *
512 * Return C_OK on success. On error C_ERR is returned.
513 * When OK is returned the structure must be freed with zslFreeLexRange(),
514 * otherwise no release is needed. */
zslParseLexRange(robj * min,robj * max,zlexrangespec * spec)515 static int zslParseLexRange(robj *min, robj *max, zlexrangespec *spec) {
516 /* The range can't be valid if objects are integer encoded.
517 * Every item must start with ( or [. */
518 if (min->encoding == OBJ_ENCODING_INT ||
519 max->encoding == OBJ_ENCODING_INT) return C_ERR;
520
521 spec->min = spec->max = NULL;
522 if (zslParseLexRangeItem(min, &spec->min, &spec->minex) == C_ERR ||
523 zslParseLexRangeItem(max, &spec->max, &spec->maxex) == C_ERR) {
524 if (spec->min) decrRefCount(spec->min);
525 if (spec->max) decrRefCount(spec->max);
526 return C_ERR;
527 } else {
528 return C_OK;
529 }
530 }
531
532 /* Free a lex range structure, must be called only after zelParseLexRange()
533 * populated the structure with success (C_OK returned). */
zslFreeLexRange(zlexrangespec * spec)534 void zslFreeLexRange(zlexrangespec *spec) {
535 decrRefCount(spec->min);
536 decrRefCount(spec->max);
537 }
538
539 /* This is just a wrapper to compareStringObjects() that is able to
540 * handle shared.minstring and shared.maxstring as the equivalent of
541 * -inf and +inf for strings */
compareStringObjectsForLexRange(robj * a,robj * b)542 int compareStringObjectsForLexRange(robj *a, robj *b) {
543 if (a == b) return 0; /* This makes sure that we handle inf,inf and
544 -inf,-inf ASAP. One special case less. */
545 if (a == shared.minstring || b == shared.maxstring) return -1;
546 if (a == shared.maxstring || b == shared.minstring) return 1;
547 return compareStringObjects(a,b);
548 }
549
/* Return non-zero if 'value' satisfies the lower bound of the lex range,
 * taking into account whether the minimum is exclusive or inclusive. */
static int zslLexValueGteMin(robj *value, zlexrangespec *spec) {
    if (spec->minex)
        return compareStringObjectsForLexRange(value,spec->min) > 0;
    return compareStringObjectsForLexRange(value,spec->min) >= 0;
}
555
/* Return non-zero if 'value' satisfies the upper bound of the lex range,
 * taking into account whether the maximum is exclusive or inclusive. */
static int zslLexValueLteMax(robj *value, zlexrangespec *spec) {
    if (spec->maxex)
        return compareStringObjectsForLexRange(value,spec->max) < 0;
    return compareStringObjectsForLexRange(value,spec->max) <= 0;
}
561
562 /* Returns if there is a part of the zset is in the lex range. */
zslIsInLexRange(zskiplist * zsl,zlexrangespec * range)563 int zslIsInLexRange(zskiplist *zsl, zlexrangespec *range) {
564 zskiplistNode *x;
565
566 /* Test for ranges that will always be empty. */
567 if (compareStringObjectsForLexRange(range->min,range->max) > 1 ||
568 (compareStringObjects(range->min,range->max) == 0 &&
569 (range->minex || range->maxex)))
570 return 0;
571 x = zsl->tail;
572 if (x == NULL || !zslLexValueGteMin(x->obj,range))
573 return 0;
574 x = zsl->header->level[0].forward;
575 if (x == NULL || !zslLexValueLteMax(x->obj,range))
576 return 0;
577 return 1;
578 }
579
580 /* Find the first node that is contained in the specified lex range.
581 * Returns NULL when no element is contained in the range. */
zslFirstInLexRange(zskiplist * zsl,zlexrangespec * range)582 zskiplistNode *zslFirstInLexRange(zskiplist *zsl, zlexrangespec *range) {
583 zskiplistNode *x;
584 int i;
585
586 /* If everything is out of range, return early. */
587 if (!zslIsInLexRange(zsl,range)) return NULL;
588
589 x = zsl->header;
590 for (i = zsl->level-1; i >= 0; i--) {
591 /* Go forward while *OUT* of range. */
592 while (x->level[i].forward &&
593 !zslLexValueGteMin(x->level[i].forward->obj,range))
594 x = x->level[i].forward;
595 }
596
597 /* This is an inner range, so the next node cannot be NULL. */
598 x = x->level[0].forward;
599 serverAssert(x != NULL);
600
601 /* Check if score <= max. */
602 if (!zslLexValueLteMax(x->obj,range)) return NULL;
603 return x;
604 }
605
606 /* Find the last node that is contained in the specified range.
607 * Returns NULL when no element is contained in the range. */
zslLastInLexRange(zskiplist * zsl,zlexrangespec * range)608 zskiplistNode *zslLastInLexRange(zskiplist *zsl, zlexrangespec *range) {
609 zskiplistNode *x;
610 int i;
611
612 /* If everything is out of range, return early. */
613 if (!zslIsInLexRange(zsl,range)) return NULL;
614
615 x = zsl->header;
616 for (i = zsl->level-1; i >= 0; i--) {
617 /* Go forward while *IN* range. */
618 while (x->level[i].forward &&
619 zslLexValueLteMax(x->level[i].forward->obj,range))
620 x = x->level[i].forward;
621 }
622
623 /* This is an inner range, so this node cannot be NULL. */
624 serverAssert(x != NULL);
625
626 /* Check if score >= min. */
627 if (!zslLexValueGteMin(x->obj,range)) return NULL;
628 return x;
629 }
630
631 /*-----------------------------------------------------------------------------
632 * Ziplist-backed sorted set API
633 *----------------------------------------------------------------------------*/
634
/* Return the score stored in the ziplist entry pointed to by 'sptr'.
 *
 * The entry is read with ziplistGet(): when it yields a string, the bytes
 * are copied into a local NUL terminated buffer and parsed with strtod();
 * otherwise the integer value is converted to double.
 *
 * The copied length is clamped to the size of the local buffer, so that an
 * unexpectedly long entry cannot overflow the stack. */
double zzlGetScore(unsigned char *sptr) {
    unsigned char *vstr;
    unsigned int vlen;
    long long vlong;
    char buf[128];
    double score;

    serverAssert(sptr != NULL);
    serverAssert(ziplistGet(sptr,&vstr,&vlen,&vlong));

    if (vstr) {
        /* Leave room for the terminator: never copy more than fits. */
        if (vlen > sizeof(buf)-1) vlen = sizeof(buf)-1;
        memcpy(buf,vstr,vlen);
        buf[vlen] = '\0';
        score = strtod(buf,NULL);
    } else {
        score = vlong;
    }

    return score;
}
655
656 /* Return a ziplist element as a Redis string object.
657 * This simple abstraction can be used to simplifies some code at the
658 * cost of some performance. */
ziplistGetObject(unsigned char * sptr)659 robj *ziplistGetObject(unsigned char *sptr) {
660 unsigned char *vstr;
661 unsigned int vlen;
662 long long vlong;
663
664 serverAssert(sptr != NULL);
665 serverAssert(ziplistGet(sptr,&vstr,&vlen,&vlong));
666
667 if (vstr) {
668 return createStringObject((char*)vstr,vlen);
669 } else {
670 return createStringObjectFromLongLong(vlong);
671 }
672 }
673
/* Compare the ziplist element at 'eptr' with the buffer 'cstr' of length
 * 'clen'. The common prefix is compared with memcmp(); when it is equal the
 * difference between the two lengths decides. Integer entries are compared
 * through their string representation. */
int zzlCompareElements(unsigned char *eptr, unsigned char *cstr, unsigned int clen) {
    unsigned char *vstr;
    unsigned int vlen;
    long long vlong;
    unsigned char vbuf[32];
    int minlen, cmp;

    serverAssert(ziplistGet(eptr,&vstr,&vlen,&vlong));
    if (vstr == NULL) {
        /* Integer entry: render it as a string into the local buffer. */
        vlen = ll2string((char*)vbuf,sizeof(vbuf),vlong);
        vstr = vbuf;
    }

    if (clen < vlen)
        minlen = clen;
    else
        minlen = vlen;

    cmp = memcmp(vstr,cstr,minlen);
    return (cmp != 0) ? cmp : (int)(vlen-clen);
}
694
/* Return the number of (element,score) pairs in the ziplist, that is half
 * the number of ziplist entries. */
unsigned int zzlLength(unsigned char *zl) {
    unsigned int entries = ziplistLen(zl);

    return entries/2;
}
698
699 /* Move to next entry based on the values in eptr and sptr. Both are set to
700 * NULL when there is no next entry. */
zzlNext(unsigned char * zl,unsigned char ** eptr,unsigned char ** sptr)701 void zzlNext(unsigned char *zl, unsigned char **eptr, unsigned char **sptr) {
702 unsigned char *_eptr, *_sptr;
703 serverAssert(*eptr != NULL && *sptr != NULL);
704
705 _eptr = ziplistNext(zl,*sptr);
706 if (_eptr != NULL) {
707 _sptr = ziplistNext(zl,_eptr);
708 serverAssert(_sptr != NULL);
709 } else {
710 /* No next entry. */
711 _sptr = NULL;
712 }
713
714 *eptr = _eptr;
715 *sptr = _sptr;
716 }
717
718 /* Move to the previous entry based on the values in eptr and sptr. Both are
719 * set to NULL when there is no next entry. */
zzlPrev(unsigned char * zl,unsigned char ** eptr,unsigned char ** sptr)720 void zzlPrev(unsigned char *zl, unsigned char **eptr, unsigned char **sptr) {
721 unsigned char *_eptr, *_sptr;
722 serverAssert(*eptr != NULL && *sptr != NULL);
723
724 _sptr = ziplistPrev(zl,*eptr);
725 if (_sptr != NULL) {
726 _eptr = ziplistPrev(zl,_sptr);
727 serverAssert(_eptr != NULL);
728 } else {
729 /* No previous entry. */
730 _eptr = NULL;
731 }
732
733 *eptr = _eptr;
734 *sptr = _sptr;
735 }
736
737 /* Returns if there is a part of the zset is in range. Should only be used
738 * internally by zzlFirstInRange and zzlLastInRange. */
zzlIsInRange(unsigned char * zl,zrangespec * range)739 int zzlIsInRange(unsigned char *zl, zrangespec *range) {
740 unsigned char *p;
741 double score;
742
743 /* Test for ranges that will always be empty. */
744 if (range->min > range->max ||
745 (range->min == range->max && (range->minex || range->maxex)))
746 return 0;
747
748 p = ziplistIndex(zl,-1); /* Last score. */
749 if (p == NULL) return 0; /* Empty sorted set */
750 score = zzlGetScore(p);
751 if (!zslValueGteMin(score,range))
752 return 0;
753
754 p = ziplistIndex(zl,1); /* First score. */
755 serverAssert(p != NULL);
756 score = zzlGetScore(p);
757 if (!zslValueLteMax(score,range))
758 return 0;
759
760 return 1;
761 }
762
763 /* Find pointer to the first element contained in the specified range.
764 * Returns NULL when no element is contained in the range. */
zzlFirstInRange(unsigned char * zl,zrangespec * range)765 unsigned char *zzlFirstInRange(unsigned char *zl, zrangespec *range) {
766 unsigned char *eptr = ziplistIndex(zl,0), *sptr;
767 double score;
768
769 /* If everything is out of range, return early. */
770 if (!zzlIsInRange(zl,range)) return NULL;
771
772 while (eptr != NULL) {
773 sptr = ziplistNext(zl,eptr);
774 serverAssert(sptr != NULL);
775
776 score = zzlGetScore(sptr);
777 if (zslValueGteMin(score,range)) {
778 /* Check if score <= max. */
779 if (zslValueLteMax(score,range))
780 return eptr;
781 return NULL;
782 }
783
784 /* Move to next element. */
785 eptr = ziplistNext(zl,sptr);
786 }
787
788 return NULL;
789 }
790
791 /* Find pointer to the last element contained in the specified range.
792 * Returns NULL when no element is contained in the range. */
zzlLastInRange(unsigned char * zl,zrangespec * range)793 unsigned char *zzlLastInRange(unsigned char *zl, zrangespec *range) {
794 unsigned char *eptr = ziplistIndex(zl,-2), *sptr;
795 double score;
796
797 /* If everything is out of range, return early. */
798 if (!zzlIsInRange(zl,range)) return NULL;
799
800 while (eptr != NULL) {
801 sptr = ziplistNext(zl,eptr);
802 serverAssert(sptr != NULL);
803
804 score = zzlGetScore(sptr);
805 if (zslValueLteMax(score,range)) {
806 /* Check if score >= min. */
807 if (zslValueGteMin(score,range))
808 return eptr;
809 return NULL;
810 }
811
812 /* Move to previous element by moving to the score of previous element.
813 * When this returns NULL, we know there also is no element. */
814 sptr = ziplistPrev(zl,eptr);
815 if (sptr != NULL)
816 serverAssert((eptr = ziplistPrev(zl,sptr)) != NULL);
817 else
818 eptr = NULL;
819 }
820
821 return NULL;
822 }
823
/* Ziplist flavour of zslLexValueGteMin(): the entry at 'p' is turned into a
 * temporary string object, checked against the range minimum, and the
 * temporary object is released. */
static int zzlLexValueGteMin(unsigned char *p, zlexrangespec *spec) {
    robj *tmp = ziplistGetObject(p);
    int inrange = zslLexValueGteMin(tmp,spec);

    decrRefCount(tmp);
    return inrange;
}
830
/* Ziplist flavour of zslLexValueLteMax(): the entry at 'p' is turned into a
 * temporary string object, checked against the range maximum, and the
 * temporary object is released. */
static int zzlLexValueLteMax(unsigned char *p, zlexrangespec *spec) {
    robj *tmp = ziplistGetObject(p);
    int inrange = zslLexValueLteMax(tmp,spec);

    decrRefCount(tmp);
    return inrange;
}
837
838 /* Returns if there is a part of the zset is in range. Should only be used
839 * internally by zzlFirstInRange and zzlLastInRange. */
zzlIsInLexRange(unsigned char * zl,zlexrangespec * range)840 int zzlIsInLexRange(unsigned char *zl, zlexrangespec *range) {
841 unsigned char *p;
842
843 /* Test for ranges that will always be empty. */
844 if (compareStringObjectsForLexRange(range->min,range->max) > 1 ||
845 (compareStringObjects(range->min,range->max) == 0 &&
846 (range->minex || range->maxex)))
847 return 0;
848
849 p = ziplistIndex(zl,-2); /* Last element. */
850 if (p == NULL) return 0;
851 if (!zzlLexValueGteMin(p,range))
852 return 0;
853
854 p = ziplistIndex(zl,0); /* First element. */
855 serverAssert(p != NULL);
856 if (!zzlLexValueLteMax(p,range))
857 return 0;
858
859 return 1;
860 }
861
862 /* Find pointer to the first element contained in the specified lex range.
863 * Returns NULL when no element is contained in the range. */
zzlFirstInLexRange(unsigned char * zl,zlexrangespec * range)864 unsigned char *zzlFirstInLexRange(unsigned char *zl, zlexrangespec *range) {
865 unsigned char *eptr = ziplistIndex(zl,0), *sptr;
866
867 /* If everything is out of range, return early. */
868 if (!zzlIsInLexRange(zl,range)) return NULL;
869
870 while (eptr != NULL) {
871 if (zzlLexValueGteMin(eptr,range)) {
872 /* Check if score <= max. */
873 if (zzlLexValueLteMax(eptr,range))
874 return eptr;
875 return NULL;
876 }
877
878 /* Move to next element. */
879 sptr = ziplistNext(zl,eptr); /* This element score. Skip it. */
880 serverAssert(sptr != NULL);
881 eptr = ziplistNext(zl,sptr); /* Next element. */
882 }
883
884 return NULL;
885 }
886
887 /* Find pointer to the last element contained in the specified lex range.
888 * Returns NULL when no element is contained in the range. */
zzlLastInLexRange(unsigned char * zl,zlexrangespec * range)889 unsigned char *zzlLastInLexRange(unsigned char *zl, zlexrangespec *range) {
890 unsigned char *eptr = ziplistIndex(zl,-2), *sptr;
891
892 /* If everything is out of range, return early. */
893 if (!zzlIsInLexRange(zl,range)) return NULL;
894
895 while (eptr != NULL) {
896 if (zzlLexValueLteMax(eptr,range)) {
897 /* Check if score >= min. */
898 if (zzlLexValueGteMin(eptr,range))
899 return eptr;
900 return NULL;
901 }
902
903 /* Move to previous element by moving to the score of previous element.
904 * When this returns NULL, we know there also is no element. */
905 sptr = ziplistPrev(zl,eptr);
906 if (sptr != NULL)
907 serverAssert((eptr = ziplistPrev(zl,sptr)) != NULL);
908 else
909 eptr = NULL;
910 }
911
912 return NULL;
913 }
914
/* Look for element 'ele' in the ziplist. Returns a pointer to the matching
 * element entry, or NULL if it is not present. When a match is found and
 * 'score' is not NULL, the score of the element is stored in '*score'. */
unsigned char *zzlFind(unsigned char *zl, robj *ele, double *score) {
    unsigned char *eptr, *sptr;
    unsigned char *found = NULL;

    /* Work on a decoded copy/reference; it is released before returning. */
    ele = getDecodedObject(ele);

    for (eptr = ziplistIndex(zl,0); eptr != NULL; eptr = ziplistNext(zl,sptr)) {
        sptr = ziplistNext(zl,eptr);
        serverAssertWithInfo(NULL,ele,sptr != NULL);

        if (!ziplistCompare(eptr,ele->ptr,sdslen(ele->ptr))) continue;

        /* Matching element, pull out score. */
        if (score != NULL) *score = zzlGetScore(sptr);
        found = eptr;
        break;
    }

    decrRefCount(ele);
    return found;
}
937
/* Remove the (element,score) pair starting at 'eptr' from the ziplist and
 * return the resulting ziplist pointer. A private cursor is handed to
 * ziplistDelete() so that the caller's 'eptr' argument is left untouched. */
unsigned char *zzlDelete(unsigned char *zl, unsigned char *eptr) {
    unsigned char *cursor = eptr;
    int entry;

    /* TODO: add function to ziplist API to delete N elements from offset. */
    /* Two consecutive deletions at the cursor: first the element, then the
     * score. */
    for (entry = 0; entry < 2; entry++)
        zl = ziplistDelete(zl,&cursor);

    return zl;
}
948
/* Insert the pair ('ele','score') into the ziplist 'zl' in front of the entry
 * pointed to by 'eptr', or append it at the tail when 'eptr' is NULL. The
 * score is stored as the string produced by d2string(). 'ele' must be an
 * sds-encoded object. Returns the resulting ziplist pointer. */
unsigned char *zzlInsertAt(unsigned char *zl, unsigned char *eptr, robj *ele, double score) {
    unsigned char *sptr;
    char scorebuf[128];
    int scorelen;
    size_t offset;

    serverAssertWithInfo(NULL,ele,sdsEncodedObject(ele));
    scorelen = d2string(scorebuf,sizeof(scorebuf),score);

    /* No position given: push element and score on the tail, in this order. */
    if (eptr == NULL) {
        zl = ziplistPush(zl,ele->ptr,sdslen(ele->ptr),ZIPLIST_TAIL);
        return ziplistPush(zl,(unsigned char*)scorebuf,scorelen,ZIPLIST_TAIL);
    }

    /* Keep offset relative to zl, as it might be re-allocated. */
    offset = eptr-zl;
    zl = ziplistInsert(zl,eptr,ele->ptr,sdslen(ele->ptr));
    eptr = zl+offset;

    /* The score goes right after the element just inserted, that is in front
     * of the entry that now follows it. */
    serverAssertWithInfo(NULL,ele,(sptr = ziplistNext(zl,eptr)) != NULL);
    return ziplistInsert(zl,sptr,(unsigned char*)scorebuf,scorelen);
}
973
974 /* Insert (element,score) pair in ziplist. This function assumes the element is
975 * not yet present in the list. */
zzlInsert(unsigned char * zl,robj * ele,double score)976 unsigned char *zzlInsert(unsigned char *zl, robj *ele, double score) {
977 unsigned char *eptr = ziplistIndex(zl,0), *sptr;
978 double s;
979
980 ele = getDecodedObject(ele);
981 while (eptr != NULL) {
982 sptr = ziplistNext(zl,eptr);
983 serverAssertWithInfo(NULL,ele,sptr != NULL);
984 s = zzlGetScore(sptr);
985
986 if (s > score) {
987 /* First element with score larger than score for element to be
988 * inserted. This means we should take its spot in the list to
989 * maintain ordering. */
990 zl = zzlInsertAt(zl,eptr,ele,score);
991 break;
992 } else if (s == score) {
993 /* Ensure lexicographical ordering for elements. */
994 if (zzlCompareElements(eptr,ele->ptr,sdslen(ele->ptr)) > 0) {
995 zl = zzlInsertAt(zl,eptr,ele,score);
996 break;
997 }
998 }
999
1000 /* Move to next element. */
1001 eptr = ziplistNext(zl,sptr);
1002 }
1003
1004 /* Push on tail of list when it was not yet inserted. */
1005 if (eptr == NULL)
1006 zl = zzlInsertAt(zl,NULL,ele,score);
1007
1008 decrRefCount(ele);
1009 return zl;
1010 }
1011
/* Delete from the ziplist every pair whose score is inside 'range'. When
 * 'deleted' is not NULL the number of removed pairs is stored there.
 * Returns the resulting ziplist pointer. */
unsigned char *zzlDeleteRangeByScore(unsigned char *zl, zrangespec *range, unsigned long *deleted) {
    unsigned char *eptr, *sptr;
    unsigned long num = 0;

    if (deleted != NULL) *deleted = 0;

    eptr = zzlFirstInRange(zl,range);
    if (eptr != NULL) {
        /* After each removal 'eptr' refers to the entry that followed the
         * deleted pair. When the tail of the ziplist is deleted, eptr will
         * point to the sentinel byte and ziplistNext will return NULL. */
        for (;;) {
            sptr = ziplistNext(zl,eptr);
            if (sptr == NULL) break;

            /* Stop at the first pair that is no longer in range. */
            if (!zslValueLteMax(zzlGetScore(sptr),range)) break;

            /* Delete both the element and the score. */
            zl = ziplistDelete(zl,&eptr);
            zl = ziplistDelete(zl,&eptr);
            num++;
        }
    }

    if (deleted != NULL) *deleted = num;
    return zl;
}
1040
/* Delete from the ziplist every pair whose element is inside the
 * lexicographical range 'range'. When 'deleted' is not NULL the number of
 * removed pairs is stored there. Returns the resulting ziplist pointer. */
unsigned char *zzlDeleteRangeByLex(unsigned char *zl, zlexrangespec *range, unsigned long *deleted) {
    unsigned char *eptr, *sptr;
    unsigned long num = 0;

    if (deleted != NULL) *deleted = 0;

    eptr = zzlFirstInLexRange(zl,range);
    if (eptr != NULL) {
        /* After each removal 'eptr' refers to the entry that followed the
         * deleted pair. When the tail of the ziplist is deleted, eptr will
         * point to the sentinel byte and ziplistNext will return NULL. */
        for (;;) {
            sptr = ziplistNext(zl,eptr);
            if (sptr == NULL) break;

            /* Stop at the first element that is no longer in range. */
            if (!zzlLexValueLteMax(eptr,range)) break;

            /* Delete both the element and the score. */
            zl = ziplistDelete(zl,&eptr);
            zl = ziplistDelete(zl,&eptr);
            num++;
        }
    }

    if (deleted != NULL) *deleted = num;
    return zl;
}
1067
/* Delete all the pairs with rank between 'start' and 'end' from the ziplist.
 * Both bounds are inclusive and 1-based. When 'deleted' is not NULL it is set
 * to the size of the requested range. Returns the resulting ziplist pointer. */
unsigned char *zzlDeleteRangeByRank(unsigned char *zl, unsigned int start, unsigned int end, unsigned long *deleted) {
    unsigned int count = end-start+1;

    if (deleted != NULL) *deleted = count;

    /* Each pair takes two ziplist entries (element and score), hence both
     * the starting index and the number of entries are doubled. */
    return ziplistDeleteRange(zl,2*(start-1),2*count);
}
1076
1077 /*-----------------------------------------------------------------------------
1078 * Common sorted set API
1079 *----------------------------------------------------------------------------*/
1080
/* Return the number of elements of the sorted set 'zobj', whatever its
 * encoding is. An unknown encoding triggers a panic. */
unsigned int zsetLength(robj *zobj) {
    int length = -1;

    switch (zobj->encoding) {
    case OBJ_ENCODING_ZIPLIST:
        length = zzlLength(zobj->ptr);
        break;
    case OBJ_ENCODING_SKIPLIST:
        length = ((zset*)zobj->ptr)->zsl->length;
        break;
    default:
        serverPanic("Unknown sorted set encoding");
    }
    return length;
}
1092
/* Convert the sorted set 'zobj' to the representation 'encoding'. Nothing is
 * done when the object already uses that encoding. Only the ziplist ->
 * skiplist and skiplist -> ziplist conversions exist: any other source or
 * target encoding triggers a panic. */
void zsetConvert(robj *zobj, int encoding) {
    zset *zs;
    zskiplistNode *node, *next;
    robj *ele;
    double score;

    if (zobj->encoding == encoding) return;

    if (zobj->encoding == OBJ_ENCODING_ZIPLIST) {
        unsigned char *zl = zobj->ptr;
        unsigned char *eptr, *sptr;
        unsigned char *vstr;
        unsigned int vlen;
        long long vlong;

        if (encoding != OBJ_ENCODING_SKIPLIST)
            serverPanic("Unknown target encoding");

        /* Build the new representation: dictionary plus skiplist. */
        zs = zmalloc(sizeof(*zs));
        zs->dict = dictCreate(&zsetDictType,NULL);
        zs->zsl = zslCreate();

        /* The ziplist is expected to hold at least one pair. */
        eptr = ziplistIndex(zl,0);
        serverAssertWithInfo(NULL,zobj,eptr != NULL);
        sptr = ziplistNext(zl,eptr);
        serverAssertWithInfo(NULL,zobj,sptr != NULL);

        while (eptr != NULL) {
            score = zzlGetScore(sptr);
            serverAssertWithInfo(NULL,zobj,ziplistGet(eptr,&vstr,&vlen,&vlong));

            /* ziplistGet() leaves vstr NULL when the entry is an integer. */
            ele = (vstr == NULL) ?
                createStringObjectFromLongLong(vlong) :
                createStringObject((char*)vstr,vlen);

            /* The reference of the freshly created object goes to the
             * skiplist, a second one is taken for the dictionary. */
            node = zslInsert(zs->zsl,score,ele);
            serverAssertWithInfo(NULL,zobj,dictAdd(zs->dict,ele,&node->score) == DICT_OK);
            incrRefCount(ele); /* Added to dictionary. */
            zzlNext(zl,&eptr,&sptr);
        }

        /* Drop the old ziplist and switch the object to the new encoding. */
        zfree(zobj->ptr);
        zobj->ptr = zs;
        zobj->encoding = OBJ_ENCODING_SKIPLIST;
        return;
    }

    if (zobj->encoding == OBJ_ENCODING_SKIPLIST) {
        unsigned char *zl = ziplistNew();

        if (encoding != OBJ_ENCODING_ZIPLIST)
            serverPanic("Unknown target encoding");

        /* Approach similar to zslFree(), since we want to free the skiplist at
         * the same time as creating the ziplist. */
        zs = zobj->ptr;
        dictRelease(zs->dict);
        node = zs->zsl->header->level[0].forward;
        zfree(zs->zsl->header);
        zfree(zs->zsl);

        /* Walk the level 0 list: every node is appended to the ziplist tail
         * and then released. */
        for (; node != NULL; node = next) {
            ele = getDecodedObject(node->obj);
            zl = zzlInsertAt(zl,NULL,ele,node->score);
            decrRefCount(ele);

            next = node->level[0].forward;
            zslFreeNode(node);
        }

        zfree(zs);
        zobj->ptr = zl;
        zobj->encoding = OBJ_ENCODING_ZIPLIST;
        return;
    }

    serverPanic("Unknown sorted set encoding");
}
1168
1169 /* Convert the sorted set object into a ziplist if it is not already a ziplist
1170 * and if the number of elements and the maximum element size is within the
1171 * expected ranges. */
zsetConvertToZiplistIfNeeded(robj * zobj,size_t maxelelen)1172 void zsetConvertToZiplistIfNeeded(robj *zobj, size_t maxelelen) {
1173 if (zobj->encoding == OBJ_ENCODING_ZIPLIST) return;
1174 zset *zset = zobj->ptr;
1175
1176 if (zset->zsl->length <= server.zset_max_ziplist_entries &&
1177 maxelelen <= server.zset_max_ziplist_value)
1178 zsetConvert(zobj,OBJ_ENCODING_ZIPLIST);
1179 }
1180
1181 /* Return (by reference) the score of the specified member of the sorted set
1182 * storing it into *score. If the element does not exist C_ERR is returned
1183 * otherwise C_OK is returned and *score is correctly populated.
1184 * If 'zobj' or 'member' is NULL, C_ERR is returned. */
zsetScore(robj * zobj,robj * member,double * score)1185 int zsetScore(robj *zobj, robj *member, double *score) {
1186 if (!zobj || !member) return C_ERR;
1187
1188 if (zobj->encoding == OBJ_ENCODING_ZIPLIST) {
1189 if (zzlFind(zobj->ptr, member, score) == NULL) return C_ERR;
1190 } else if (zobj->encoding == OBJ_ENCODING_SKIPLIST) {
1191 zset *zs = zobj->ptr;
1192 dictEntry *de = dictFind(zs->dict, member);
1193 if (de == NULL) return C_ERR;
1194 *score = *(double*)dictGetVal(de);
1195 } else {
1196 serverPanic("Unknown sorted set encoding");
1197 }
1198 return C_OK;
1199 }
1200
1201 /*-----------------------------------------------------------------------------
1202 * Sorted set commands
1203 *----------------------------------------------------------------------------*/
1204
1205 /* This generic command implements both ZADD and ZINCRBY. */
1206 #define ZADD_NONE 0
1207 #define ZADD_INCR (1<<0) /* Increment the score instead of setting it. */
1208 #define ZADD_NX (1<<1) /* Don't touch elements not already existing. */
1209 #define ZADD_XX (1<<2) /* Only touch elements already exisitng. */
1210 #define ZADD_CH (1<<3) /* Return num of elements added or updated. */
zaddGenericCommand(client * c,int flags)1211 void zaddGenericCommand(client *c, int flags) {
1212 static char *nanerr = "resulting score is not a number (NaN)";
1213 robj *key = c->argv[1];
1214 robj *ele;
1215 robj *zobj;
1216 robj *curobj;
1217 double score = 0, *scores = NULL, curscore = 0.0;
1218 int j, elements;
1219 int scoreidx = 0;
1220 /* The following vars are used in order to track what the command actually
1221 * did during the execution, to reply to the client and to trigger the
1222 * notification of keyspace change. */
1223 int added = 0; /* Number of new elements added. */
1224 int updated = 0; /* Number of elements with updated score. */
1225 int processed = 0; /* Number of elements processed, may remain zero with
1226 options like XX. */
1227
1228 /* Parse options. At the end 'scoreidx' is set to the argument position
1229 * of the score of the first score-element pair. */
1230 scoreidx = 2;
1231 while(scoreidx < c->argc) {
1232 char *opt = c->argv[scoreidx]->ptr;
1233 if (!strcasecmp(opt,"nx")) flags |= ZADD_NX;
1234 else if (!strcasecmp(opt,"xx")) flags |= ZADD_XX;
1235 else if (!strcasecmp(opt,"ch")) flags |= ZADD_CH;
1236 else if (!strcasecmp(opt,"incr")) flags |= ZADD_INCR;
1237 else break;
1238 scoreidx++;
1239 }
1240
1241 /* Turn options into simple to check vars. */
1242 int incr = (flags & ZADD_INCR) != 0;
1243 int nx = (flags & ZADD_NX) != 0;
1244 int xx = (flags & ZADD_XX) != 0;
1245 int ch = (flags & ZADD_CH) != 0;
1246
1247 /* After the options, we expect to have an even number of args, since
1248 * we expect any number of score-element pairs. */
1249 elements = c->argc-scoreidx;
1250 if (elements % 2) {
1251 addReply(c,shared.syntaxerr);
1252 return;
1253 }
1254 elements /= 2; /* Now this holds the number of score-element pairs. */
1255
1256 /* Check for incompatible options. */
1257 if (nx && xx) {
1258 addReplyError(c,
1259 "XX and NX options at the same time are not compatible");
1260 return;
1261 }
1262
1263 if (incr && elements > 1) {
1264 addReplyError(c,
1265 "INCR option supports a single increment-element pair");
1266 return;
1267 }
1268
1269 /* Start parsing all the scores, we need to emit any syntax error
1270 * before executing additions to the sorted set, as the command should
1271 * either execute fully or nothing at all. */
1272 scores = zmalloc(sizeof(double)*elements);
1273 for (j = 0; j < elements; j++) {
1274 if (getDoubleFromObjectOrReply(c,c->argv[scoreidx+j*2],&scores[j],NULL)
1275 != C_OK) goto cleanup;
1276 }
1277
1278 /* Lookup the key and create the sorted set if does not exist. */
1279 zobj = lookupKeyWrite(c->db,key);
1280 if (zobj == NULL) {
1281 if (xx) goto reply_to_client; /* No key + XX option: nothing to do. */
1282 if (server.zset_max_ziplist_entries == 0 ||
1283 server.zset_max_ziplist_value < sdslen(c->argv[scoreidx+1]->ptr))
1284 {
1285 zobj = createZsetObject();
1286 } else {
1287 zobj = createZsetZiplistObject();
1288 }
1289 dbAdd(c->db,key,zobj);
1290 } else {
1291 if (zobj->type != OBJ_ZSET) {
1292 addReply(c,shared.wrongtypeerr);
1293 goto cleanup;
1294 }
1295 }
1296
1297 for (j = 0; j < elements; j++) {
1298 score = scores[j];
1299
1300 if (zobj->encoding == OBJ_ENCODING_ZIPLIST) {
1301 unsigned char *eptr;
1302
1303 /* Prefer non-encoded element when dealing with ziplists. */
1304 ele = c->argv[scoreidx+1+j*2];
1305 if ((eptr = zzlFind(zobj->ptr,ele,&curscore)) != NULL) {
1306 if (nx) continue;
1307 if (incr) {
1308 score += curscore;
1309 if (isnan(score)) {
1310 addReplyError(c,nanerr);
1311 goto cleanup;
1312 }
1313 }
1314
1315 /* Remove and re-insert when score changed. */
1316 if (score != curscore) {
1317 zobj->ptr = zzlDelete(zobj->ptr,eptr);
1318 zobj->ptr = zzlInsert(zobj->ptr,ele,score);
1319 server.dirty++;
1320 updated++;
1321 }
1322 processed++;
1323 } else if (!xx) {
1324 /* Optimize: check if the element is too large or the list
1325 * becomes too long *before* executing zzlInsert. */
1326 zobj->ptr = zzlInsert(zobj->ptr,ele,score);
1327 if (zzlLength(zobj->ptr) > server.zset_max_ziplist_entries)
1328 zsetConvert(zobj,OBJ_ENCODING_SKIPLIST);
1329 if (sdslen(ele->ptr) > server.zset_max_ziplist_value)
1330 zsetConvert(zobj,OBJ_ENCODING_SKIPLIST);
1331 server.dirty++;
1332 added++;
1333 processed++;
1334 }
1335 } else if (zobj->encoding == OBJ_ENCODING_SKIPLIST) {
1336 zset *zs = zobj->ptr;
1337 zskiplistNode *znode;
1338 dictEntry *de;
1339
1340 ele = c->argv[scoreidx+1+j*2] =
1341 tryObjectEncoding(c->argv[scoreidx+1+j*2]);
1342 de = dictFind(zs->dict,ele);
1343 if (de != NULL) {
1344 if (nx) continue;
1345 curobj = dictGetKey(de);
1346 curscore = *(double*)dictGetVal(de);
1347
1348 if (incr) {
1349 score += curscore;
1350 if (isnan(score)) {
1351 addReplyError(c,nanerr);
1352 /* Don't need to check if the sorted set is empty
1353 * because we know it has at least one element. */
1354 goto cleanup;
1355 }
1356 }
1357
1358 /* Remove and re-insert when score changed. We can safely
1359 * delete the key object from the skiplist, since the
1360 * dictionary still has a reference to it. */
1361 if (score != curscore) {
1362 serverAssertWithInfo(c,curobj,zslDelete(zs->zsl,curscore,curobj));
1363 znode = zslInsert(zs->zsl,score,curobj);
1364 incrRefCount(curobj); /* Re-inserted in skiplist. */
1365 dictGetVal(de) = &znode->score; /* Update score ptr. */
1366 server.dirty++;
1367 updated++;
1368 }
1369 processed++;
1370 } else if (!xx) {
1371 znode = zslInsert(zs->zsl,score,ele);
1372 incrRefCount(ele); /* Inserted in skiplist. */
1373 serverAssertWithInfo(c,NULL,dictAdd(zs->dict,ele,&znode->score) == DICT_OK);
1374 incrRefCount(ele); /* Added to dictionary. */
1375 server.dirty++;
1376 added++;
1377 processed++;
1378 }
1379 } else {
1380 serverPanic("Unknown sorted set encoding");
1381 }
1382 }
1383
1384 reply_to_client:
1385 if (incr) { /* ZINCRBY or INCR option. */
1386 if (processed)
1387 addReplyDouble(c,score);
1388 else
1389 addReply(c,shared.nullbulk);
1390 } else { /* ZADD. */
1391 addReplyLongLong(c,ch ? added+updated : added);
1392 }
1393
1394 cleanup:
1395 zfree(scores);
1396 if (added || updated) {
1397 signalModifiedKey(c->db,key);
1398 notifyKeyspaceEvent(NOTIFY_ZSET,
1399 incr ? "zincr" : "zadd", key, c->db->id);
1400 }
1401 }
1402
/* ZADD entry point: run the generic implementation with no flag preset, so
 * that only the options given on the command line apply. */
void zaddCommand(client *c) {
    int flags = ZADD_NONE;

    zaddGenericCommand(c,flags);
}
1406
/* ZINCRBY entry point: run the generic implementation with the INCR flag
 * preset, so that the score is incremented instead of being set. */
void zincrbyCommand(client *c) {
    int flags = ZADD_INCR;

    zaddGenericCommand(c,flags);
}
1410
/* ZREM implementation: remove every member listed from argv[2] onwards from
 * the sorted set stored at argv[1]. The reply is the number of members
 * actually removed. The key is deleted from the database as soon as the
 * sorted set becomes empty, in which case the remaining arguments are not
 * examined. */
void zremCommand(client *c) {
    robj *key = c->argv[1];
    robj *zobj;
    int deleted = 0, keyremoved = 0, j;

    /* A missing key gets a zero reply, a key of the wrong type an error. */
    zobj = lookupKeyWriteOrReply(c,key,shared.czero);
    if (zobj == NULL) return;
    if (checkType(c,zobj,OBJ_ZSET)) return;

    if (zobj->encoding == OBJ_ENCODING_ZIPLIST) {
        for (j = 2; j < c->argc; j++) {
            unsigned char *eptr = zzlFind(zobj->ptr,c->argv[j],NULL);

            if (eptr == NULL) continue;

            deleted++;
            zobj->ptr = zzlDelete(zobj->ptr,eptr);
            if (zzlLength(zobj->ptr) == 0) {
                dbDelete(c->db,key);
                keyremoved = 1;
                break;
            }
        }
    } else if (zobj->encoding == OBJ_ENCODING_SKIPLIST) {
        zset *zs = zobj->ptr;

        for (j = 2; j < c->argc; j++) {
            dictEntry *de = dictFind(zs->dict,c->argv[j]);
            double score;

            if (de == NULL) continue;

            deleted++;

            /* Delete from the skiplist */
            score = *(double*)dictGetVal(de);
            serverAssertWithInfo(c,c->argv[j],zslDelete(zs->zsl,score,c->argv[j]));

            /* Delete from the hash table */
            dictDelete(zs->dict,c->argv[j]);
            if (htNeedsResize(zs->dict)) dictResize(zs->dict);
            if (dictSize(zs->dict) == 0) {
                dbDelete(c->db,key);
                keyremoved = 1;
                break;
            }
        }
    } else {
        serverPanic("Unknown sorted set encoding");
    }

    /* Events, modification signal and dirty counter only when something
     * was actually removed. */
    if (deleted) {
        notifyKeyspaceEvent(NOTIFY_ZSET,"zrem",key,c->db->id);
        if (keyremoved)
            notifyKeyspaceEvent(NOTIFY_GENERIC,"del",key,c->db->id);
        signalModifiedKey(c->db,key);
        server.dirty += deleted;
    }
    addReplyLongLong(c,deleted);
}
1470
1471 /* Implements ZREMRANGEBYRANK, ZREMRANGEBYSCORE, ZREMRANGEBYLEX commands. */
1472 #define ZRANGE_RANK 0
1473 #define ZRANGE_SCORE 1
1474 #define ZRANGE_LEX 2
zremrangeGenericCommand(client * c,int rangetype)1475 void zremrangeGenericCommand(client *c, int rangetype) {
1476 robj *key = c->argv[1];
1477 robj *zobj;
1478 int keyremoved = 0;
1479 unsigned long deleted = 0;
1480 zrangespec range;
1481 zlexrangespec lexrange;
1482 long start, end, llen;
1483
1484 /* Step 1: Parse the range. */
1485 if (rangetype == ZRANGE_RANK) {
1486 if ((getLongFromObjectOrReply(c,c->argv[2],&start,NULL) != C_OK) ||
1487 (getLongFromObjectOrReply(c,c->argv[3],&end,NULL) != C_OK))
1488 return;
1489 } else if (rangetype == ZRANGE_SCORE) {
1490 if (zslParseRange(c->argv[2],c->argv[3],&range) != C_OK) {
1491 addReplyError(c,"min or max is not a float");
1492 return;
1493 }
1494 } else if (rangetype == ZRANGE_LEX) {
1495 if (zslParseLexRange(c->argv[2],c->argv[3],&lexrange) != C_OK) {
1496 addReplyError(c,"min or max not valid string range item");
1497 return;
1498 }
1499 }
1500
1501 /* Step 2: Lookup & range sanity checks if needed. */
1502 if ((zobj = lookupKeyWriteOrReply(c,key,shared.czero)) == NULL ||
1503 checkType(c,zobj,OBJ_ZSET)) goto cleanup;
1504
1505 if (rangetype == ZRANGE_RANK) {
1506 /* Sanitize indexes. */
1507 llen = zsetLength(zobj);
1508 if (start < 0) start = llen+start;
1509 if (end < 0) end = llen+end;
1510 if (start < 0) start = 0;
1511
1512 /* Invariant: start >= 0, so this test will be true when end < 0.
1513 * The range is empty when start > end or start >= length. */
1514 if (start > end || start >= llen) {
1515 addReply(c,shared.czero);
1516 goto cleanup;
1517 }
1518 if (end >= llen) end = llen-1;
1519 }
1520
1521 /* Step 3: Perform the range deletion operation. */
1522 if (zobj->encoding == OBJ_ENCODING_ZIPLIST) {
1523 switch(rangetype) {
1524 case ZRANGE_RANK:
1525 zobj->ptr = zzlDeleteRangeByRank(zobj->ptr,start+1,end+1,&deleted);
1526 break;
1527 case ZRANGE_SCORE:
1528 zobj->ptr = zzlDeleteRangeByScore(zobj->ptr,&range,&deleted);
1529 break;
1530 case ZRANGE_LEX:
1531 zobj->ptr = zzlDeleteRangeByLex(zobj->ptr,&lexrange,&deleted);
1532 break;
1533 }
1534 if (zzlLength(zobj->ptr) == 0) {
1535 dbDelete(c->db,key);
1536 keyremoved = 1;
1537 }
1538 } else if (zobj->encoding == OBJ_ENCODING_SKIPLIST) {
1539 zset *zs = zobj->ptr;
1540 switch(rangetype) {
1541 case ZRANGE_RANK:
1542 deleted = zslDeleteRangeByRank(zs->zsl,start+1,end+1,zs->dict);
1543 break;
1544 case ZRANGE_SCORE:
1545 deleted = zslDeleteRangeByScore(zs->zsl,&range,zs->dict);
1546 break;
1547 case ZRANGE_LEX:
1548 deleted = zslDeleteRangeByLex(zs->zsl,&lexrange,zs->dict);
1549 break;
1550 }
1551 if (htNeedsResize(zs->dict)) dictResize(zs->dict);
1552 if (dictSize(zs->dict) == 0) {
1553 dbDelete(c->db,key);
1554 keyremoved = 1;
1555 }
1556 } else {
1557 serverPanic("Unknown sorted set encoding");
1558 }
1559
1560 /* Step 4: Notifications and reply. */
1561 if (deleted) {
1562 char *event[3] = {"zremrangebyrank","zremrangebyscore","zremrangebylex"};
1563 signalModifiedKey(c->db,key);
1564 notifyKeyspaceEvent(NOTIFY_ZSET,event[rangetype],key,c->db->id);
1565 if (keyremoved)
1566 notifyKeyspaceEvent(NOTIFY_GENERIC,"del",key,c->db->id);
1567 }
1568 server.dirty += deleted;
1569 addReplyLongLong(c,deleted);
1570
1571 cleanup:
1572 if (rangetype == ZRANGE_LEX) zslFreeLexRange(&lexrange);
1573 }
1574
/* ZREMRANGEBYRANK entry point: generic range removal, range given as ranks. */
void zremrangebyrankCommand(client *c) {
    int rangetype = ZRANGE_RANK;

    zremrangeGenericCommand(c,rangetype);
}
1578
/* ZREMRANGEBYSCORE entry point: generic range removal, range given as
 * scores. */
void zremrangebyscoreCommand(client *c) {
    int rangetype = ZRANGE_SCORE;

    zremrangeGenericCommand(c,rangetype);
}
1582
/* ZREMRANGEBYLEX entry point: generic range removal, range given as a
 * lexicographical interval. */
void zremrangebylexCommand(client *c) {
    int rangetype = ZRANGE_LEX;

    zremrangeGenericCommand(c,rangetype);
}
1586
/* One input of a set operation, as consumed by the zui* helpers: the source
 * object together with the state needed to iterate over it. The member of
 * 'iter' in use is selected by 'type' and 'encoding'. */
typedef struct {
    robj *subject; /* Source object. The zui* helpers treat NULL as an
                    * empty input. */
    int type; /* Set, sorted set */
    int encoding;
    double weight;

    union {
        /* Set iterators. */
        union _iterset {
            struct {
                intset *is;
                int ii; /* Position of the next element to read. */
            } is;
            struct {
                dict *dict;
                dictIterator *di;
                dictEntry *de; /* Entry returned by the next zuiNext(). */
            } ht;
        } set;

        /* Sorted set iterators. */
        union _iterzset {
            struct {
                unsigned char *zl;
                unsigned char *eptr, *sptr; /* Current element and its score. */
            } zl;
            struct {
                zset *zs;
                zskiplistNode *node; /* Node returned by the next zuiNext(). */
            } sl;
        } zset;
    } iter;
} zsetopsrc;
1620
1621
/* Use dirty flags for pointers that need to be cleaned up in the next
 * iteration over the zsetopval. The dirty flag for the long long value is
 * special, since long long values don't need cleanup. Instead, it means that
 * we already checked that "ell" holds a long long, or tried to convert another
 * representation into a long long value. When this was successful,
 * OPVAL_VALID_LL is set as well. */
#define OPVAL_DIRTY_ROBJ 1
#define OPVAL_DIRTY_LL 2
#define OPVAL_VALID_LL 4

/* Store value retrieved from the iterator. Depending on the source, zuiNext()
 * fills 'ele', 'estr'/'elen' or 'ell'; the zui*FromValue() helpers derive the
 * other representations on demand. */
typedef struct {
    int flags; /* Combination of the OPVAL_* flags above. */
    unsigned char _buf[32]; /* Private buffer. zuiBufferFromValue() writes
                             * the string form of an integer value here. */
    robj *ele; /* Value as an object. Released by the next zuiNext() call
                * when OPVAL_DIRTY_ROBJ is set. */
    unsigned char *estr; /* Value as a string buffer, or NULL. */
    unsigned int elen; /* Length of 'estr'. */
    long long ell; /* Value as an integer. */
    double score; /* Score of the value; zuiNext() uses 1.0 for plain sets. */
} zsetopval;

/* Shorthands for the iterator unions declared inside zsetopsrc. */
typedef union _iterset iterset;
typedef union _iterzset iterzset;
1645
/* Position the iterator embedded in 'op' on the first element of the source
 * object. Nothing is done when the source has no object. Unknown types or
 * encodings trigger a panic. */
void zuiInitIterator(zsetopsrc *op) {
    if (op->subject == NULL) return;

    if (op->type == OBJ_SET) {
        iterset *it = &op->iter.set;

        switch (op->encoding) {
        case OBJ_ENCODING_INTSET:
            it->is.is = op->subject->ptr;
            it->is.ii = 0;
            break;
        case OBJ_ENCODING_HT:
            /* Fetch the first entry right away: zuiNext() returns the entry
             * cached in 'de' and then advances. */
            it->ht.dict = op->subject->ptr;
            it->ht.di = dictGetIterator(op->subject->ptr);
            it->ht.de = dictNext(it->ht.di);
            break;
        default:
            serverPanic("Unknown set encoding");
        }
        return;
    }

    if (op->type == OBJ_ZSET) {
        iterzset *it = &op->iter.zset;

        switch (op->encoding) {
        case OBJ_ENCODING_ZIPLIST:
            it->zl.zl = op->subject->ptr;
            it->zl.eptr = ziplistIndex(it->zl.zl,0);
            /* A first element, when present, must be followed by a score. */
            if (it->zl.eptr != NULL) {
                it->zl.sptr = ziplistNext(it->zl.zl,it->zl.eptr);
                serverAssert(it->zl.sptr != NULL);
            }
            break;
        case OBJ_ENCODING_SKIPLIST:
            it->sl.zs = op->subject->ptr;
            it->sl.node = it->sl.zs->zsl->header->level[0].forward;
            break;
        default:
            serverPanic("Unknown sorted set encoding");
        }
        return;
    }

    serverPanic("Unsupported type");
}
1681
/* Release what zuiInitIterator() acquired for 'op'. Only the hash table
 * encoded set owns something to release (its dict iterator): for the other
 * supported encodings this is a no-op. Unknown types or encodings trigger a
 * panic. */
void zuiClearIterator(zsetopsrc *op) {
    if (op->subject == NULL) return;

    switch (op->type) {
    case OBJ_SET:
        switch (op->encoding) {
        case OBJ_ENCODING_INTSET:
            break; /* skip */
        case OBJ_ENCODING_HT:
            dictReleaseIterator(op->iter.set.ht.di);
            break;
        default:
            serverPanic("Unknown set encoding");
        }
        break;
    case OBJ_ZSET:
        switch (op->encoding) {
        case OBJ_ENCODING_ZIPLIST:
        case OBJ_ENCODING_SKIPLIST:
            break; /* skip */
        default:
            serverPanic("Unknown sorted set encoding");
        }
        break;
    default:
        serverPanic("Unsupported type");
    }
}
1708
/* Return the number of elements of the source 'op', 0 when the source has no
 * object. Unknown types or encodings trigger a panic. */
int zuiLength(zsetopsrc *op) {
    if (op->subject == NULL) return 0;

    if (op->type == OBJ_SET) {
        switch (op->encoding) {
        case OBJ_ENCODING_INTSET:
            return intsetLen(op->subject->ptr);
        case OBJ_ENCODING_HT: {
            dict *ht = op->subject->ptr;
            return dictSize(ht);
        }
        default:
            serverPanic("Unknown set encoding");
        }
    } else if (op->type == OBJ_ZSET) {
        switch (op->encoding) {
        case OBJ_ENCODING_ZIPLIST:
            return zzlLength(op->subject->ptr);
        case OBJ_ENCODING_SKIPLIST: {
            zset *zs = op->subject->ptr;
            return zs->zsl->length;
        }
        default:
            serverPanic("Unknown sorted set encoding");
        }
    } else {
        serverPanic("Unsupported type");
    }
}
1735
1736 /* Check if the current value is valid. If so, store it in the passed structure
1737 * and move to the next element. If not valid, this means we have reached the
1738 * end of the structure and can abort. */
zuiNext(zsetopsrc * op,zsetopval * val)1739 int zuiNext(zsetopsrc *op, zsetopval *val) {
1740 if (op->subject == NULL)
1741 return 0;
1742
1743 if (val->flags & OPVAL_DIRTY_ROBJ)
1744 decrRefCount(val->ele);
1745
1746 memset(val,0,sizeof(zsetopval));
1747
1748 if (op->type == OBJ_SET) {
1749 iterset *it = &op->iter.set;
1750 if (op->encoding == OBJ_ENCODING_INTSET) {
1751 int64_t ell;
1752
1753 if (!intsetGet(it->is.is,it->is.ii,&ell))
1754 return 0;
1755 val->ell = ell;
1756 val->score = 1.0;
1757
1758 /* Move to next element. */
1759 it->is.ii++;
1760 } else if (op->encoding == OBJ_ENCODING_HT) {
1761 if (it->ht.de == NULL)
1762 return 0;
1763 val->ele = dictGetKey(it->ht.de);
1764 val->score = 1.0;
1765
1766 /* Move to next element. */
1767 it->ht.de = dictNext(it->ht.di);
1768 } else {
1769 serverPanic("Unknown set encoding");
1770 }
1771 } else if (op->type == OBJ_ZSET) {
1772 iterzset *it = &op->iter.zset;
1773 if (op->encoding == OBJ_ENCODING_ZIPLIST) {
1774 /* No need to check both, but better be explicit. */
1775 if (it->zl.eptr == NULL || it->zl.sptr == NULL)
1776 return 0;
1777 serverAssert(ziplistGet(it->zl.eptr,&val->estr,&val->elen,&val->ell));
1778 val->score = zzlGetScore(it->zl.sptr);
1779
1780 /* Move to next element. */
1781 zzlNext(it->zl.zl,&it->zl.eptr,&it->zl.sptr);
1782 } else if (op->encoding == OBJ_ENCODING_SKIPLIST) {
1783 if (it->sl.node == NULL)
1784 return 0;
1785 val->ele = it->sl.node->obj;
1786 val->score = it->sl.node->score;
1787
1788 /* Move to next element. */
1789 it->sl.node = it->sl.node->level[0].forward;
1790 } else {
1791 serverPanic("Unknown sorted set encoding");
1792 }
1793 } else {
1794 serverPanic("Unsupported type");
1795 }
1796 return 1;
1797 }
1798
/* Make sure 'val->ell' holds the integer form of the value, if one exists.
 * The conversion is attempted only once per value (OPVAL_DIRTY_LL records the
 * attempt). Returns non-zero when the value can be represented as a long
 * long, zero otherwise. */
int zuiLongLongFromValue(zsetopval *val) {
    int valid = 0;

    /* Already attempted: report the cached outcome. */
    if (val->flags & OPVAL_DIRTY_LL)
        return val->flags & OPVAL_VALID_LL;

    val->flags |= OPVAL_DIRTY_LL;

    if (val->ele != NULL) {
        if (val->ele->encoding == OBJ_ENCODING_INT) {
            /* Integer encoded objects keep the value in the pointer. */
            val->ell = (long)val->ele->ptr;
            valid = 1;
        } else if (sdsEncodedObject(val->ele)) {
            valid = string2ll(val->ele->ptr,sdslen(val->ele->ptr),&val->ell);
        } else {
            serverPanic("Unsupported element encoding");
        }
    } else if (val->estr != NULL) {
        valid = string2ll((char*)val->estr,val->elen,&val->ell);
    } else {
        /* The long long was already set, flag as valid. */
        valid = 1;
    }

    if (valid) val->flags |= OPVAL_VALID_LL;
    return val->flags & OPVAL_VALID_LL;
}
1823
zuiObjectFromValue(zsetopval * val)1824 robj *zuiObjectFromValue(zsetopval *val) {
1825 if (val->ele == NULL) {
1826 if (val->estr != NULL) {
1827 val->ele = createStringObject((char*)val->estr,val->elen);
1828 } else {
1829 val->ele = createStringObjectFromLongLong(val->ell);
1830 }
1831 val->flags |= OPVAL_DIRTY_ROBJ;
1832 }
1833 return val->ele;
1834 }
1835
/* Make sure 'val->estr' and 'val->elen' describe the value as a string
 * buffer. Integers are rendered into the private buffer of 'val', while sds
 * encoded objects are referenced directly. Always returns 1. */
int zuiBufferFromValue(zsetopval *val) {
    /* Nothing to do when the buffer form is already there. */
    if (val->estr != NULL) return 1;

    if (val->ele == NULL) {
        /* Only the integer form is available. */
        val->elen = ll2string((char*)val->_buf,sizeof(val->_buf),val->ell);
        val->estr = val->_buf;
    } else if (val->ele->encoding == OBJ_ENCODING_INT) {
        val->elen = ll2string((char*)val->_buf,sizeof(val->_buf),(long)val->ele->ptr);
        val->estr = val->_buf;
    } else if (sdsEncodedObject(val->ele)) {
        val->elen = sdslen(val->ele->ptr);
        val->estr = val->ele->ptr;
    } else {
        serverPanic("Unsupported element encoding");
    }
    return 1;
}
1855
1856 /* Find value pointed to by val in the source pointer to by op. When found,
1857 * return 1 and store its score in target. Return 0 otherwise. */
zuiFind(zsetopsrc * op,zsetopval * val,double * score)1858 int zuiFind(zsetopsrc *op, zsetopval *val, double *score) {
1859 if (op->subject == NULL)
1860 return 0;
1861
1862 if (op->type == OBJ_SET) {
1863 if (op->encoding == OBJ_ENCODING_INTSET) {
1864 if (zuiLongLongFromValue(val) &&
1865 intsetFind(op->subject->ptr,val->ell))
1866 {
1867 *score = 1.0;
1868 return 1;
1869 } else {
1870 return 0;
1871 }
1872 } else if (op->encoding == OBJ_ENCODING_HT) {
1873 dict *ht = op->subject->ptr;
1874 zuiObjectFromValue(val);
1875 if (dictFind(ht,val->ele) != NULL) {
1876 *score = 1.0;
1877 return 1;
1878 } else {
1879 return 0;
1880 }
1881 } else {
1882 serverPanic("Unknown set encoding");
1883 }
1884 } else if (op->type == OBJ_ZSET) {
1885 zuiObjectFromValue(val);
1886
1887 if (op->encoding == OBJ_ENCODING_ZIPLIST) {
1888 if (zzlFind(op->subject->ptr,val->ele,score) != NULL) {
1889 /* Score is already set by zzlFind. */
1890 return 1;
1891 } else {
1892 return 0;
1893 }
1894 } else if (op->encoding == OBJ_ENCODING_SKIPLIST) {
1895 zset *zs = op->subject->ptr;
1896 dictEntry *de;
1897 if ((de = dictFind(zs->dict,val->ele)) != NULL) {
1898 *score = *(double*)dictGetVal(de);
1899 return 1;
1900 } else {
1901 return 0;
1902 }
1903 } else {
1904 serverPanic("Unknown sorted set encoding");
1905 }
1906 } else {
1907 serverPanic("Unsupported type");
1908 }
1909 }
1910
zuiCompareByCardinality(const void * s1,const void * s2)1911 int zuiCompareByCardinality(const void *s1, const void *s2) {
1912 return zuiLength((zsetopsrc*)s1) - zuiLength((zsetopsrc*)s2);
1913 }
1914
/* Aggregation modes accepted by the ZUNIONSTORE / ZINTERSTORE AGGREGATE
 * option. */
#define REDIS_AGGR_SUM 1
#define REDIS_AGGR_MIN 2
#define REDIS_AGGR_MAX 3
/* Read the value of dict entry '_e' as a double score; an entry whose value
 * is NULL counts as 1.0. */
#define zunionInterDictValue(_e) \
    (dictGetVal(_e) == NULL ? 1.0 : *(double*)dictGetVal(_e))
1919
zunionInterAggregate(double * target,double val,int aggregate)1920 inline static void zunionInterAggregate(double *target, double val, int aggregate) {
1921 if (aggregate == REDIS_AGGR_SUM) {
1922 *target = *target + val;
1923 /* The result of adding two doubles is NaN when one variable
1924 * is +inf and the other is -inf. When these numbers are added,
1925 * we maintain the convention of the result being 0.0. */
1926 if (isnan(*target)) *target = 0.0;
1927 } else if (aggregate == REDIS_AGGR_MIN) {
1928 *target = val < *target ? val : *target;
1929 } else if (aggregate == REDIS_AGGR_MAX) {
1930 *target = val > *target ? val : *target;
1931 } else {
1932 /* safety net */
1933 serverPanic("Unknown ZUNION/INTER aggregate type");
1934 }
1935 }
1936
/* Shared implementation of ZUNIONSTORE and ZINTERSTORE.
 *
 * c->argv[2] holds the number of input keys, the keys themselves start at
 * c->argv[3], and the optional "weights" / "aggregate" clauses follow them.
 * 'op' selects SET_OP_UNION or SET_OP_INTER. The computed sorted set
 * replaces whatever is stored at 'dstkey'; the reply is its cardinality, or
 * 0 when the result is empty (in which case 'dstkey' is left deleted). */
void zunionInterGenericCommand(client *c, robj *dstkey, int op) {
    int i, argpos;
    long setnum;
    int aggregate = REDIS_AGGR_SUM;
    zsetopsrc *src;
    zsetopval zval;
    robj *member;
    unsigned int maxelelen = 0;
    robj *dstobj;
    zset *dstzset;
    zskiplistNode *znode;
    int touched;

    /* The number of input keys comes first. */
    if (getLongFromObjectOrReply(c, c->argv[2], &setnum, NULL) != C_OK)
        return;

    if (setnum < 1) {
        addReplyError(c,
            "at least 1 input key is needed for ZUNIONSTORE/ZINTERSTORE");
        return;
    }

    /* The announced number of keys cannot exceed the arguments given. */
    if (setnum > c->argc-3) {
        addReply(c,shared.syntaxerr);
        return;
    }

    /* Resolve the input keys. A missing key is kept with a NULL subject. */
    src = zcalloc(sizeof(zsetopsrc) * setnum);
    argpos = 3;
    for (i = 0; i < setnum; i++) {
        robj *obj = lookupKeyWrite(c->db,c->argv[argpos++]);

        if (obj == NULL) {
            src[i].subject = NULL;
        } else if (obj->type == OBJ_ZSET || obj->type == OBJ_SET) {
            src[i].subject = obj;
            src[i].type = obj->type;
            src[i].encoding = obj->encoding;
        } else {
            zfree(src);
            addReply(c,shared.wrongtypeerr);
            return;
        }

        /* Every weight is 1 unless a "weights" clause overrides it. */
        src[i].weight = 1.0;
    }

    /* Consume the optional trailing clauses. */
    while (argpos < c->argc) {
        int remaining = c->argc - argpos;
        char *opt = c->argv[argpos]->ptr;

        if (remaining >= (setnum + 1) && !strcasecmp(opt,"weights")) {
            /* One weight per input key follows the keyword. */
            argpos++;
            for (i = 0; i < setnum; i++, argpos++) {
                if (getDoubleFromObjectOrReply(c,c->argv[argpos],
                        &src[i].weight,
                        "weight value is not a float") != C_OK)
                {
                    zfree(src);
                    return;
                }
            }
        } else if (remaining >= 2 && !strcasecmp(opt,"aggregate")) {
            char *mode = c->argv[argpos+1]->ptr;

            if (!strcasecmp(mode,"sum")) {
                aggregate = REDIS_AGGR_SUM;
            } else if (!strcasecmp(mode,"min")) {
                aggregate = REDIS_AGGR_MIN;
            } else if (!strcasecmp(mode,"max")) {
                aggregate = REDIS_AGGR_MAX;
            } else {
                zfree(src);
                addReply(c,shared.syntaxerr);
                return;
            }
            argpos += 2;
        } else {
            zfree(src);
            addReply(c,shared.syntaxerr);
            return;
        }
    }

    /* Order the inputs from the smallest to the largest: this improves the
     * performance of the algorithms below. */
    qsort(src,setnum,sizeof(zsetopsrc),zuiCompareByCardinality);

    dstobj = createZsetObject();
    dstzset = dstobj->ptr;
    memset(&zval, 0, sizeof(zval));

    if (op == SET_OP_INTER) {
        /* Nothing to do when the smallest input is empty. When it is not,
         * the size ordering guarantees every other input is non-empty. */
        if (zuiLength(&src[0]) > 0) {
            zuiInitIterator(&src[0]);
            while (zuiNext(&src[0],&zval)) {
                double score = src[0].weight * zval.score;
                double value;
                int k;

                if (isnan(score)) score = 0;

                for (k = 1; k < setnum; k++) {
                    if (src[k].subject == src[0].subject) {
                        /* Accessing the zset being iterated is not safe,
                         * so the same object is handled explicitly. */
                        value = zval.score*src[k].weight;
                    } else if (zuiFind(&src[k],&zval,&value)) {
                        value *= src[k].weight;
                    } else {
                        break;
                    }
                    zunionInterAggregate(&score,value,aggregate);
                }

                /* Keep only elements present in every input. */
                if (k != setnum) continue;

                member = zuiObjectFromValue(&zval);
                znode = zslInsert(dstzset->zsl,score,member);
                incrRefCount(member); /* added to skiplist */
                dictAdd(dstzset->dict,member,&znode->score);
                incrRefCount(member); /* added to dictionary */

                if (sdsEncodedObject(member)) {
                    if (sdslen(member->ptr) > maxelelen)
                        maxelelen = sdslen(member->ptr);
                }
            }
            zuiClearIterator(&src[0]);
        }
    } else if (op == SET_OP_UNION) {
        dict *accumulator = dictCreate(&setDictType,NULL);
        dictIterator *iter;
        dictEntry *entry;
        double score;

        if (setnum) {
            /* The union is at least as large as the largest input, so the
             * accumulator is sized early to avoid useless rehashing. */
            dictExpand(accumulator,zuiLength(&src[setnum-1]));
        }

        /* Step 1: walk one input after the other, building a dictionary
         * that maps each element to its aggregated score. */
        for (i = 0; i < setnum; i++) {
            if (zuiLength(&src[i]) == 0) continue;

            zuiInitIterator(&src[i]);
            while (zuiNext(&src[i],&zval)) {
                score = src[i].weight * zval.score;
                if (isnan(score)) score = 0;

                entry = dictFind(accumulator,zuiObjectFromValue(&zval));
                if (entry != NULL) {
                    /* Already seen: fold the new score into the stored one.
                     * The double inside the dictEntry union is accessed
                     * directly, which is a big speedup compared to the
                     * getDouble/setDouble API. */
                    zunionInterAggregate(&entry->v.d,score,aggregate);
                    continue;
                }

                /* First occurrence: create the entry. */
                member = zuiObjectFromValue(&zval);
                /* Track the longest element seen, needed at the end to
                 * decide whether a ziplist conversion is possible. */
                if (sdsEncodedObject(member)) {
                    if (sdslen(member->ptr) > maxelelen)
                        maxelelen = sdslen(member->ptr);
                }
                entry = dictAddRaw(accumulator,member);
                incrRefCount(member);
                dictSetDoubleVal(entry,score);
            }
            zuiClearIterator(&src[i]);
        }

        /* Step 2: turn the dictionary into the final sorted set. */
        iter = dictGetIterator(accumulator);

        /* The final size is known now, so the dictionary embedded in the
         * sorted set is resized upfront to save rehashing time. */
        dictExpand(dstzset->dict,dictSize(accumulator));

        while ((entry = dictNext(iter)) != NULL) {
            robj *ele = dictGetKey(entry);

            score = dictGetDoubleVal(entry);
            znode = zslInsert(dstzset->zsl,score,ele);
            incrRefCount(ele); /* added to skiplist */
            dictAdd(dstzset->dict,ele,&znode->score);
            incrRefCount(ele); /* added to dictionary */
        }
        dictReleaseIterator(iter);

        /* The accumulator is not needed anymore. */
        dictRelease(accumulator);
    } else {
        serverPanic("Unknown operator");
    }

    /* Replace the destination key with the result. */
    touched = dbDelete(c->db,dstkey) ? 1 : 0;
    if (dstzset->zsl->length) {
        zsetConvertToZiplistIfNeeded(dstobj,maxelelen);
        dbAdd(c->db,dstkey,dstobj);
        addReplyLongLong(c,zsetLength(dstobj));
        signalModifiedKey(c->db,dstkey);
        notifyKeyspaceEvent(NOTIFY_ZSET,
            (op == SET_OP_UNION) ? "zunionstore" : "zinterstore",
            dstkey,c->db->id);
        server.dirty++;
    } else {
        /* Empty result: nothing is stored. The deletion is reported only
         * if a previous value actually existed. */
        decrRefCount(dstobj);
        addReply(c,shared.czero);
        if (touched) {
            signalModifiedKey(c->db,dstkey);
            notifyKeyspaceEvent(NOTIFY_GENERIC,"del",dstkey,c->db->id);
            server.dirty++;
        }
    }
    zfree(src);
}
2173
/* Union flavour of the generic store command; c->argv[1] is passed as the
 * destination key. */
void zunionstoreCommand(client *c) {
    robj *dstkey = c->argv[1];

    zunionInterGenericCommand(c, dstkey, SET_OP_UNION);
}
2177
/* Intersection flavour of the generic store command; c->argv[1] is passed
 * as the destination key. */
void zinterstoreCommand(client *c) {
    robj *dstkey = c->argv[1];

    zunionInterGenericCommand(c, dstkey, SET_OP_INTER);
}
2181
/* Shared implementation of the rank-based range commands.
 *
 * Replies with the elements of the sorted set at c->argv[1] whose position
 * lies between the indexes given in c->argv[2] and c->argv[3]. Negative
 * indexes are offsets from the end of the set. When 'reverse' is non-zero
 * the set is walked from its tail. A single trailing "withscores" argument
 * makes every element be followed by its score. */
void zrangeGenericCommand(client *c, int reverse) {
    robj *key = c->argv[1];
    robj *zobj;
    int withscores = 0;
    long start, end;
    int llen, rangelen;

    if (getLongFromObjectOrReply(c, c->argv[2], &start, NULL) != C_OK) return;
    if (getLongFromObjectOrReply(c, c->argv[3], &end, NULL) != C_OK) return;

    /* The only extra argument accepted is one "withscores". */
    if (c->argc >= 5) {
        if (c->argc != 5 || strcasecmp(c->argv[4]->ptr,"withscores")) {
            addReply(c,shared.syntaxerr);
            return;
        }
        withscores = 1;
    }

    zobj = lookupKeyReadOrReply(c,key,shared.emptymultibulk);
    if (zobj == NULL || checkType(c,zobj,OBJ_ZSET)) return;

    /* Sanitize indexes. */
    llen = zsetLength(zobj);
    if (start < 0) start += llen;
    if (end < 0) end += llen;
    if (start < 0) start = 0;

    /* start is never negative at this point, hence a still negative end is
     * caught by the first test. The range is empty when start > end or
     * when start is past the last element. */
    if (start > end || start >= llen) {
        addReply(c,shared.emptymultibulk);
        return;
    }
    if (end >= llen) end = llen-1;
    rangelen = (end-start)+1;

    /* The reply size is known in advance. */
    addReplyMultiBulkLen(c, withscores ? (rangelen*2) : rangelen);

    if (zobj->encoding == OBJ_ENCODING_ZIPLIST) {
        unsigned char *zl = zobj->ptr;
        unsigned char *eptr, *sptr;
        unsigned char *vstr;
        unsigned int vlen;
        long long vlong;

        /* Each element occupies two consecutive ziplist entries, the
         * element itself followed by its score. */
        eptr = ziplistIndex(zl, reverse ? -2-(2*start) : 2*start);

        serverAssertWithInfo(c,zobj,eptr != NULL);
        sptr = ziplistNext(zl,eptr);

        while (rangelen--) {
            serverAssertWithInfo(c,zobj,eptr != NULL && sptr != NULL);
            serverAssertWithInfo(c,zobj,ziplistGet(eptr,&vstr,&vlen,&vlong));

            /* ziplistGet leaves vstr NULL for integer-encoded entries. */
            if (vstr != NULL)
                addReplyBulkCBuffer(c,vstr,vlen);
            else
                addReplyBulkLongLong(c,vlong);

            if (withscores)
                addReplyDouble(c,zzlGetScore(sptr));

            if (reverse)
                zzlPrev(zl,&eptr,&sptr);
            else
                zzlNext(zl,&eptr,&sptr);
        }
    } else if (zobj->encoding == OBJ_ENCODING_SKIPLIST) {
        zset *zs = zobj->ptr;
        zskiplist *zsl = zs->zsl;
        zskiplistNode *node;

        /* The trivial starting point (first or last node) is picked
         * directly; only other positions need the log(N) rank lookup. */
        if (start > 0) {
            node = zslGetElementByRank(zsl, reverse ? llen-start : start+1);
        } else {
            node = reverse ? zsl->tail : zsl->header->level[0].forward;
        }

        while (rangelen--) {
            serverAssertWithInfo(c,zobj,node != NULL);
            addReplyBulk(c,node->obj);
            if (withscores)
                addReplyDouble(c,node->score);
            node = reverse ? node->backward : node->level[0].forward;
        }
    } else {
        serverPanic("Unknown sorted set encoding");
    }
}
2283
/* Rank-based range, walked in forward order. */
void zrangeCommand(client *c) {
    const int reverse = 0;

    zrangeGenericCommand(c, reverse);
}
2287
/* Rank-based range, walked in reverse order. */
void zrevrangeCommand(client *c) {
    const int reverse = 1;

    zrangeGenericCommand(c, reverse);
}
2291
2292 /* This command implements ZRANGEBYSCORE, ZREVRANGEBYSCORE. */
genericZrangebyscoreCommand(client * c,int reverse)2293 void genericZrangebyscoreCommand(client *c, int reverse) {
2294 zrangespec range;
2295 robj *key = c->argv[1];
2296 robj *zobj;
2297 long offset = 0, limit = -1;
2298 int withscores = 0;
2299 unsigned long rangelen = 0;
2300 void *replylen = NULL;
2301 int minidx, maxidx;
2302
2303 /* Parse the range arguments. */
2304 if (reverse) {
2305 /* Range is given as [max,min] */
2306 maxidx = 2; minidx = 3;
2307 } else {
2308 /* Range is given as [min,max] */
2309 minidx = 2; maxidx = 3;
2310 }
2311
2312 if (zslParseRange(c->argv[minidx],c->argv[maxidx],&range) != C_OK) {
2313 addReplyError(c,"min or max is not a float");
2314 return;
2315 }
2316
2317 /* Parse optional extra arguments. Note that ZCOUNT will exactly have
2318 * 4 arguments, so we'll never enter the following code path. */
2319 if (c->argc > 4) {
2320 int remaining = c->argc - 4;
2321 int pos = 4;
2322
2323 while (remaining) {
2324 if (remaining >= 1 && !strcasecmp(c->argv[pos]->ptr,"withscores")) {
2325 pos++; remaining--;
2326 withscores = 1;
2327 } else if (remaining >= 3 && !strcasecmp(c->argv[pos]->ptr,"limit")) {
2328 if ((getLongFromObjectOrReply(c, c->argv[pos+1], &offset, NULL) != C_OK) ||
2329 (getLongFromObjectOrReply(c, c->argv[pos+2], &limit, NULL) != C_OK)) return;
2330 pos += 3; remaining -= 3;
2331 } else {
2332 addReply(c,shared.syntaxerr);
2333 return;
2334 }
2335 }
2336 }
2337
2338 /* Ok, lookup the key and get the range */
2339 if ((zobj = lookupKeyReadOrReply(c,key,shared.emptymultibulk)) == NULL ||
2340 checkType(c,zobj,OBJ_ZSET)) return;
2341
2342 if (zobj->encoding == OBJ_ENCODING_ZIPLIST) {
2343 unsigned char *zl = zobj->ptr;
2344 unsigned char *eptr, *sptr;
2345 unsigned char *vstr;
2346 unsigned int vlen;
2347 long long vlong;
2348 double score;
2349
2350 /* If reversed, get the last node in range as starting point. */
2351 if (reverse) {
2352 eptr = zzlLastInRange(zl,&range);
2353 } else {
2354 eptr = zzlFirstInRange(zl,&range);
2355 }
2356
2357 /* No "first" element in the specified interval. */
2358 if (eptr == NULL) {
2359 addReply(c, shared.emptymultibulk);
2360 return;
2361 }
2362
2363 /* Get score pointer for the first element. */
2364 serverAssertWithInfo(c,zobj,eptr != NULL);
2365 sptr = ziplistNext(zl,eptr);
2366
2367 /* We don't know in advance how many matching elements there are in the
2368 * list, so we push this object that will represent the multi-bulk
2369 * length in the output buffer, and will "fix" it later */
2370 replylen = addDeferredMultiBulkLength(c);
2371
2372 /* If there is an offset, just traverse the number of elements without
2373 * checking the score because that is done in the next loop. */
2374 while (eptr && offset--) {
2375 if (reverse) {
2376 zzlPrev(zl,&eptr,&sptr);
2377 } else {
2378 zzlNext(zl,&eptr,&sptr);
2379 }
2380 }
2381
2382 while (eptr && limit--) {
2383 score = zzlGetScore(sptr);
2384
2385 /* Abort when the node is no longer in range. */
2386 if (reverse) {
2387 if (!zslValueGteMin(score,&range)) break;
2388 } else {
2389 if (!zslValueLteMax(score,&range)) break;
2390 }
2391
2392 /* We know the element exists, so ziplistGet should always succeed */
2393 serverAssertWithInfo(c,zobj,ziplistGet(eptr,&vstr,&vlen,&vlong));
2394
2395 rangelen++;
2396 if (vstr == NULL) {
2397 addReplyBulkLongLong(c,vlong);
2398 } else {
2399 addReplyBulkCBuffer(c,vstr,vlen);
2400 }
2401
2402 if (withscores) {
2403 addReplyDouble(c,score);
2404 }
2405
2406 /* Move to next node */
2407 if (reverse) {
2408 zzlPrev(zl,&eptr,&sptr);
2409 } else {
2410 zzlNext(zl,&eptr,&sptr);
2411 }
2412 }
2413 } else if (zobj->encoding == OBJ_ENCODING_SKIPLIST) {
2414 zset *zs = zobj->ptr;
2415 zskiplist *zsl = zs->zsl;
2416 zskiplistNode *ln;
2417
2418 /* If reversed, get the last node in range as starting point. */
2419 if (reverse) {
2420 ln = zslLastInRange(zsl,&range);
2421 } else {
2422 ln = zslFirstInRange(zsl,&range);
2423 }
2424
2425 /* No "first" element in the specified interval. */
2426 if (ln == NULL) {
2427 addReply(c, shared.emptymultibulk);
2428 return;
2429 }
2430
2431 /* We don't know in advance how many matching elements there are in the
2432 * list, so we push this object that will represent the multi-bulk
2433 * length in the output buffer, and will "fix" it later */
2434 replylen = addDeferredMultiBulkLength(c);
2435
2436 /* If there is an offset, just traverse the number of elements without
2437 * checking the score because that is done in the next loop. */
2438 while (ln && offset--) {
2439 if (reverse) {
2440 ln = ln->backward;
2441 } else {
2442 ln = ln->level[0].forward;
2443 }
2444 }
2445
2446 while (ln && limit--) {
2447 /* Abort when the node is no longer in range. */
2448 if (reverse) {
2449 if (!zslValueGteMin(ln->score,&range)) break;
2450 } else {
2451 if (!zslValueLteMax(ln->score,&range)) break;
2452 }
2453
2454 rangelen++;
2455 addReplyBulk(c,ln->obj);
2456
2457 if (withscores) {
2458 addReplyDouble(c,ln->score);
2459 }
2460
2461 /* Move to next node */
2462 if (reverse) {
2463 ln = ln->backward;
2464 } else {
2465 ln = ln->level[0].forward;
2466 }
2467 }
2468 } else {
2469 serverPanic("Unknown sorted set encoding");
2470 }
2471
2472 if (withscores) {
2473 rangelen *= 2;
2474 }
2475
2476 setDeferredMultiBulkLength(c, replylen, rangelen);
2477 }
2478
/* Score-based range, walked in forward order. */
void zrangebyscoreCommand(client *c) {
    const int reverse = 0;

    genericZrangebyscoreCommand(c, reverse);
}
2482
/* Score-based range, walked in reverse order. */
void zrevrangebyscoreCommand(client *c) {
    const int reverse = 1;

    genericZrangebyscoreCommand(c, reverse);
}
2486
/* Reply with the number of elements of the sorted set at c->argv[1] whose
 * score falls inside the range given by c->argv[2] (min) and c->argv[3]
 * (max). A missing key yields 0.
 *
 * The counter is an unsigned long, the same width as the skiplist length
 * and rank values it is computed from, so the result is not narrowed to
 * an int before being sent. */
void zcountCommand(client *c) {
    robj *key = c->argv[1];
    robj *zobj;
    zrangespec range;
    unsigned long count = 0;

    /* Parse the range arguments */
    if (zslParseRange(c->argv[2],c->argv[3],&range) != C_OK) {
        addReplyError(c,"min or max is not a float");
        return;
    }

    /* Lookup the sorted set */
    if ((zobj = lookupKeyReadOrReply(c, key, shared.czero)) == NULL ||
        checkType(c, zobj, OBJ_ZSET)) return;

    if (zobj->encoding == OBJ_ENCODING_ZIPLIST) {
        unsigned char *zl = zobj->ptr;
        unsigned char *eptr, *sptr;
        double score;

        /* Use the first element in range as the starting point */
        eptr = zzlFirstInRange(zl,&range);

        /* No "first" element */
        if (eptr == NULL) {
            addReply(c, shared.czero);
            return;
        }

        /* First element is in range */
        sptr = ziplistNext(zl,eptr);
        score = zzlGetScore(sptr);
        serverAssertWithInfo(c,zobj,zslValueLteMax(score,&range));

        /* Iterate over elements in range */
        while (eptr) {
            score = zzlGetScore(sptr);

            /* Abort when the node is no longer in range. */
            if (!zslValueLteMax(score,&range)) {
                break;
            } else {
                count++;
                zzlNext(zl,&eptr,&sptr);
            }
        }
    } else if (zobj->encoding == OBJ_ENCODING_SKIPLIST) {
        zset *zs = zobj->ptr;
        zskiplist *zsl = zs->zsl;
        zskiplistNode *zn;
        unsigned long rank;

        /* Find first element in range */
        zn = zslFirstInRange(zsl, &range);

        /* Use rank of first element, if any, to determine preliminary count:
         * everything from that element up to the tail. */
        if (zn != NULL) {
            rank = zslGetRank(zsl, zn->score, zn->obj);
            count = (zsl->length - (rank - 1));

            /* Find last element in range */
            zn = zslLastInRange(zsl, &range);

            /* Use rank of last element, if any, to determine the actual
             * count: drop the elements that follow it. */
            if (zn != NULL) {
                rank = zslGetRank(zsl, zn->score, zn->obj);
                count -= (zsl->length - rank);
            }
        }
    } else {
        serverPanic("Unknown sorted set encoding");
    }

    addReplyLongLong(c, count);
}
2563
/* Reply with the number of elements of the sorted set at c->argv[1] that
 * fall inside the lexicographical range given by c->argv[2] (min) and
 * c->argv[3] (max). A missing key yields 0.
 *
 * The parsed range is released with zslFreeLexRange() on every path that
 * leaves the function after a successful parse. The counter is an unsigned
 * long, the same width as the skiplist length and rank values it is
 * computed from, so the result is not narrowed to an int. */
void zlexcountCommand(client *c) {
    robj *key = c->argv[1];
    robj *zobj;
    zlexrangespec range;
    unsigned long count = 0;

    /* Parse the range arguments */
    if (zslParseLexRange(c->argv[2],c->argv[3],&range) != C_OK) {
        addReplyError(c,"min or max not valid string range item");
        return;
    }

    /* Lookup the sorted set */
    if ((zobj = lookupKeyReadOrReply(c, key, shared.czero)) == NULL ||
        checkType(c, zobj, OBJ_ZSET))
    {
        zslFreeLexRange(&range);
        return;
    }

    if (zobj->encoding == OBJ_ENCODING_ZIPLIST) {
        unsigned char *zl = zobj->ptr;
        unsigned char *eptr, *sptr;

        /* Use the first element in range as the starting point */
        eptr = zzlFirstInLexRange(zl,&range);

        /* No "first" element */
        if (eptr == NULL) {
            zslFreeLexRange(&range);
            addReply(c, shared.czero);
            return;
        }

        /* First element is in range */
        sptr = ziplistNext(zl,eptr);
        serverAssertWithInfo(c,zobj,zzlLexValueLteMax(eptr,&range));

        /* Iterate over elements in range */
        while (eptr) {
            /* Abort when the node is no longer in range. */
            if (!zzlLexValueLteMax(eptr,&range)) {
                break;
            } else {
                count++;
                zzlNext(zl,&eptr,&sptr);
            }
        }
    } else if (zobj->encoding == OBJ_ENCODING_SKIPLIST) {
        zset *zs = zobj->ptr;
        zskiplist *zsl = zs->zsl;
        zskiplistNode *zn;
        unsigned long rank;

        /* Find first element in range */
        zn = zslFirstInLexRange(zsl, &range);

        /* Use rank of first element, if any, to determine preliminary count:
         * everything from that element up to the tail. */
        if (zn != NULL) {
            rank = zslGetRank(zsl, zn->score, zn->obj);
            count = (zsl->length - (rank - 1));

            /* Find last element in range */
            zn = zslLastInLexRange(zsl, &range);

            /* Use rank of last element, if any, to determine the actual
             * count: drop the elements that follow it. */
            if (zn != NULL) {
                rank = zslGetRank(zsl, zn->score, zn->obj);
                count -= (zsl->length - rank);
            }
        }
    } else {
        serverPanic("Unknown sorted set encoding");
    }

    zslFreeLexRange(&range);
    addReplyLongLong(c, count);
}
2642
2643 /* This command implements ZRANGEBYLEX, ZREVRANGEBYLEX. */
genericZrangebylexCommand(client * c,int reverse)2644 void genericZrangebylexCommand(client *c, int reverse) {
2645 zlexrangespec range;
2646 robj *key = c->argv[1];
2647 robj *zobj;
2648 long offset = 0, limit = -1;
2649 unsigned long rangelen = 0;
2650 void *replylen = NULL;
2651 int minidx, maxidx;
2652
2653 /* Parse the range arguments. */
2654 if (reverse) {
2655 /* Range is given as [max,min] */
2656 maxidx = 2; minidx = 3;
2657 } else {
2658 /* Range is given as [min,max] */
2659 minidx = 2; maxidx = 3;
2660 }
2661
2662 if (zslParseLexRange(c->argv[minidx],c->argv[maxidx],&range) != C_OK) {
2663 addReplyError(c,"min or max not valid string range item");
2664 return;
2665 }
2666
2667 /* Parse optional extra arguments. Note that ZCOUNT will exactly have
2668 * 4 arguments, so we'll never enter the following code path. */
2669 if (c->argc > 4) {
2670 int remaining = c->argc - 4;
2671 int pos = 4;
2672
2673 while (remaining) {
2674 if (remaining >= 3 && !strcasecmp(c->argv[pos]->ptr,"limit")) {
2675 if ((getLongFromObjectOrReply(c, c->argv[pos+1], &offset, NULL) != C_OK) ||
2676 (getLongFromObjectOrReply(c, c->argv[pos+2], &limit, NULL) != C_OK)) return;
2677 pos += 3; remaining -= 3;
2678 } else {
2679 zslFreeLexRange(&range);
2680 addReply(c,shared.syntaxerr);
2681 return;
2682 }
2683 }
2684 }
2685
2686 /* Ok, lookup the key and get the range */
2687 if ((zobj = lookupKeyReadOrReply(c,key,shared.emptymultibulk)) == NULL ||
2688 checkType(c,zobj,OBJ_ZSET))
2689 {
2690 zslFreeLexRange(&range);
2691 return;
2692 }
2693
2694 if (zobj->encoding == OBJ_ENCODING_ZIPLIST) {
2695 unsigned char *zl = zobj->ptr;
2696 unsigned char *eptr, *sptr;
2697 unsigned char *vstr;
2698 unsigned int vlen;
2699 long long vlong;
2700
2701 /* If reversed, get the last node in range as starting point. */
2702 if (reverse) {
2703 eptr = zzlLastInLexRange(zl,&range);
2704 } else {
2705 eptr = zzlFirstInLexRange(zl,&range);
2706 }
2707
2708 /* No "first" element in the specified interval. */
2709 if (eptr == NULL) {
2710 addReply(c, shared.emptymultibulk);
2711 zslFreeLexRange(&range);
2712 return;
2713 }
2714
2715 /* Get score pointer for the first element. */
2716 serverAssertWithInfo(c,zobj,eptr != NULL);
2717 sptr = ziplistNext(zl,eptr);
2718
2719 /* We don't know in advance how many matching elements there are in the
2720 * list, so we push this object that will represent the multi-bulk
2721 * length in the output buffer, and will "fix" it later */
2722 replylen = addDeferredMultiBulkLength(c);
2723
2724 /* If there is an offset, just traverse the number of elements without
2725 * checking the score because that is done in the next loop. */
2726 while (eptr && offset--) {
2727 if (reverse) {
2728 zzlPrev(zl,&eptr,&sptr);
2729 } else {
2730 zzlNext(zl,&eptr,&sptr);
2731 }
2732 }
2733
2734 while (eptr && limit--) {
2735 /* Abort when the node is no longer in range. */
2736 if (reverse) {
2737 if (!zzlLexValueGteMin(eptr,&range)) break;
2738 } else {
2739 if (!zzlLexValueLteMax(eptr,&range)) break;
2740 }
2741
2742 /* We know the element exists, so ziplistGet should always
2743 * succeed. */
2744 serverAssertWithInfo(c,zobj,ziplistGet(eptr,&vstr,&vlen,&vlong));
2745
2746 rangelen++;
2747 if (vstr == NULL) {
2748 addReplyBulkLongLong(c,vlong);
2749 } else {
2750 addReplyBulkCBuffer(c,vstr,vlen);
2751 }
2752
2753 /* Move to next node */
2754 if (reverse) {
2755 zzlPrev(zl,&eptr,&sptr);
2756 } else {
2757 zzlNext(zl,&eptr,&sptr);
2758 }
2759 }
2760 } else if (zobj->encoding == OBJ_ENCODING_SKIPLIST) {
2761 zset *zs = zobj->ptr;
2762 zskiplist *zsl = zs->zsl;
2763 zskiplistNode *ln;
2764
2765 /* If reversed, get the last node in range as starting point. */
2766 if (reverse) {
2767 ln = zslLastInLexRange(zsl,&range);
2768 } else {
2769 ln = zslFirstInLexRange(zsl,&range);
2770 }
2771
2772 /* No "first" element in the specified interval. */
2773 if (ln == NULL) {
2774 addReply(c, shared.emptymultibulk);
2775 zslFreeLexRange(&range);
2776 return;
2777 }
2778
2779 /* We don't know in advance how many matching elements there are in the
2780 * list, so we push this object that will represent the multi-bulk
2781 * length in the output buffer, and will "fix" it later */
2782 replylen = addDeferredMultiBulkLength(c);
2783
2784 /* If there is an offset, just traverse the number of elements without
2785 * checking the score because that is done in the next loop. */
2786 while (ln && offset--) {
2787 if (reverse) {
2788 ln = ln->backward;
2789 } else {
2790 ln = ln->level[0].forward;
2791 }
2792 }
2793
2794 while (ln && limit--) {
2795 /* Abort when the node is no longer in range. */
2796 if (reverse) {
2797 if (!zslLexValueGteMin(ln->obj,&range)) break;
2798 } else {
2799 if (!zslLexValueLteMax(ln->obj,&range)) break;
2800 }
2801
2802 rangelen++;
2803 addReplyBulk(c,ln->obj);
2804
2805 /* Move to next node */
2806 if (reverse) {
2807 ln = ln->backward;
2808 } else {
2809 ln = ln->level[0].forward;
2810 }
2811 }
2812 } else {
2813 serverPanic("Unknown sorted set encoding");
2814 }
2815
2816 zslFreeLexRange(&range);
2817 setDeferredMultiBulkLength(c, replylen, rangelen);
2818 }
2819
/* Lexicographical range, walked in forward order. */
void zrangebylexCommand(client *c) {
    const int reverse = 0;

    genericZrangebylexCommand(c, reverse);
}
2823
/* Lexicographical range, walked in reverse order. */
void zrevrangebylexCommand(client *c) {
    const int reverse = 1;

    genericZrangebylexCommand(c, reverse);
}
2827
/* Reply with the number of elements in the sorted set at c->argv[1].
 * A missing key yields 0; a key of another type yields whatever error
 * checkType() sends. */
void zcardCommand(client *c) {
    robj *zobj = lookupKeyReadOrReply(c,c->argv[1],shared.czero);

    if (zobj == NULL) return;
    if (checkType(c,zobj,OBJ_ZSET)) return;

    addReplyLongLong(c,zsetLength(zobj));
}
2837
/* Reply with the score of the element c->argv[2] in the sorted set at
 * c->argv[1]. A null bulk is sent when either the key or the element does
 * not exist. */
void zscoreCommand(client *c) {
    robj *zobj;
    double score;

    zobj = lookupKeyReadOrReply(c,c->argv[1],shared.nullbulk);
    if (zobj == NULL) return;
    if (checkType(c,zobj,OBJ_ZSET)) return;

    if (zsetScore(zobj,c->argv[2],&score) != C_ERR) {
        addReplyDouble(c,score);
    } else {
        addReply(c,shared.nullbulk);
    }
}
2852
/* Shared implementation of the rank lookup commands.
 *
 * Replies with the 0-based position of the element c->argv[2] inside the
 * sorted set at c->argv[1], counted from the head of the set, or from its
 * tail when 'reverse' is non-zero. A null bulk is sent when the key or the
 * element does not exist. */
void zrankGenericCommand(client *c, int reverse) {
    robj *key = c->argv[1];
    robj *ele = c->argv[2];
    robj *zobj;
    unsigned long llen;
    unsigned long rank;

    zobj = lookupKeyReadOrReply(c,key,shared.nullbulk);
    if (zobj == NULL || checkType(c,zobj,OBJ_ZSET)) return;
    llen = zsetLength(zobj);

    serverAssertWithInfo(c,ele,sdsEncodedObject(ele));

    if (zobj->encoding == OBJ_ENCODING_ZIPLIST) {
        unsigned char *zl = zobj->ptr;
        unsigned char *eptr, *sptr;

        eptr = ziplistIndex(zl,0);
        serverAssertWithInfo(c,zobj,eptr != NULL);
        sptr = ziplistNext(zl,eptr);
        serverAssertWithInfo(c,zobj,sptr != NULL);

        /* Linear scan; 'rank' is the 1-based position of 'eptr'. */
        for (rank = 1; eptr != NULL; rank++) {
            if (ziplistCompare(eptr,ele->ptr,sdslen(ele->ptr))) break;
            zzlNext(zl,&eptr,&sptr);
        }

        if (eptr == NULL) {
            addReply(c,shared.nullbulk);
        } else {
            addReplyLongLong(c, reverse ? llen-rank : rank-1);
        }
    } else if (zobj->encoding == OBJ_ENCODING_SKIPLIST) {
        zset *zs = zobj->ptr;
        zskiplist *zsl = zs->zsl;
        dictEntry *entry = dictFind(zs->dict,ele);

        if (entry == NULL) {
            addReply(c,shared.nullbulk);
        } else {
            double score = *(double*)dictGetVal(entry);

            rank = zslGetRank(zsl,score,ele);
            serverAssertWithInfo(c,ele,rank); /* Existing elements always have a rank. */
            addReplyLongLong(c, reverse ? llen-rank : rank-1);
        }
    } else {
        serverPanic("Unknown sorted set encoding");
    }
}
2914
/* Rank lookup counted from the head of the sorted set. */
void zrankCommand(client *c) {
    const int reverse = 0;

    zrankGenericCommand(c, reverse);
}
2918
/* Rank lookup counted from the tail of the sorted set. */
void zrevrankCommand(client *c) {
    const int reverse = 1;

    zrankGenericCommand(c, reverse);
}
2922
/* Parse the cursor in c->argv[2], look up the sorted set at c->argv[1] and
 * hand both to scanGenericCommand(). A missing key gets the shared empty
 * scan reply. */
void zscanCommand(client *c) {
    unsigned long cursor;
    robj *zobj;

    if (parseScanCursorOrReply(c,c->argv[2],&cursor) == C_ERR) return;

    zobj = lookupKeyReadOrReply(c,c->argv[1],shared.emptyscan);
    if (zobj == NULL) return;
    if (checkType(c,zobj,OBJ_ZSET)) return;

    scanGenericCommand(c,zobj,cursor);
}
2932