1 /* SORT command and helper functions. 2 * 3 * Copyright (c) 2009-2012, Salvatore Sanfilippo <antirez at gmail dot com> 4 * All rights reserved. 5 * 6 * Redistribution and use in source and binary forms, with or without 7 * modification, are permitted provided that the following conditions are met: 8 * 9 * * Redistributions of source code must retain the above copyright notice, 10 * this list of conditions and the following disclaimer. 11 * * Redistributions in binary form must reproduce the above copyright 12 * notice, this list of conditions and the following disclaimer in the 13 * documentation and/or other materials provided with the distribution. 14 * * Neither the name of Redis nor the names of its contributors may be used 15 * to endorse or promote products derived from this software without 16 * specific prior written permission. 17 * 18 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" 19 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE 20 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE 21 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE 22 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR 23 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF 24 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS 25 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN 26 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) 27 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE 28 * POSSIBILITY OF SUCH DAMAGE. 29 */ 30 31 32 #include "redis.h" 33 #include "pqsort.h" /* Partial qsort for SORT+LIMIT */ 34 #include <math.h> /* isnan() */ 35 36 zskiplistNode* zslGetElementByRank(zskiplist *zsl, unsigned long rank); 37 38 redisSortOperation *createSortOperation(int type, robj *pattern) { 39 redisSortOperation *so = zmalloc(sizeof(*so)); 40 so->type = type; 41 so->pattern = pattern; 42 return so; 43 } 44 45 /* Return the value associated to the key with a name obtained using 46 * the following rules: 47 * 48 * 1) The first occurrence of '*' in 'pattern' is substituted with 'subst'. 49 * 50 * 2) If 'pattern' matches the "->" string, everything on the left of 51 * the arrow is treated as the name of a hash field, and the part on the 52 * left as the key name containing a hash. The value of the specified 53 * field is returned. 54 * 55 * 3) If 'pattern' equals "#", the function simply returns 'subst' itself so 56 * that the SORT command can be used like: SORT key GET # to retrieve 57 * the Set/List elements directly. 58 * 59 * The returned object will always have its refcount increased by 1 60 * when it is non-NULL. */ 61 robj *lookupKeyByPattern(redisDb *db, robj *pattern, robj *subst) { 62 char *p, *f, *k; 63 sds spat, ssub; 64 robj *keyobj, *fieldobj = NULL, *o; 65 int prefixlen, sublen, postfixlen, fieldlen; 66 67 /* If the pattern is "#" return the substitution object itself in order 68 * to implement the "SORT ... GET #" feature. */ 69 spat = pattern->ptr; 70 if (spat[0] == '#' && spat[1] == '\0') { 71 incrRefCount(subst); 72 return subst; 73 } 74 75 /* The substitution object may be specially encoded. If so we create 76 * a decoded object on the fly. Otherwise getDecodedObject will just 77 * increment the ref count, that we'll decrement later. */ 78 subst = getDecodedObject(subst); 79 ssub = subst->ptr; 80 81 /* If we can't find '*' in the pattern we return NULL as to GET a 82 * fixed key does not make sense. */ 83 p = strchr(spat,'*'); 84 if (!p) { 85 decrRefCount(subst); 86 return NULL; 87 } 88 89 /* Find out if we're dealing with a hash dereference. */ 90 if ((f = strstr(p+1, "->")) != NULL && *(f+2) != '\0') { 91 fieldlen = sdslen(spat)-(f-spat)-2; 92 fieldobj = createStringObject(f+2,fieldlen); 93 } else { 94 fieldlen = 0; 95 } 96 97 /* Perform the '*' substitution. */ 98 prefixlen = p-spat; 99 sublen = sdslen(ssub); 100 postfixlen = sdslen(spat)-(prefixlen+1)-(fieldlen ? fieldlen+2 : 0); 101 keyobj = createStringObject(NULL,prefixlen+sublen+postfixlen); 102 k = keyobj->ptr; 103 memcpy(k,spat,prefixlen); 104 memcpy(k+prefixlen,ssub,sublen); 105 memcpy(k+prefixlen+sublen,p+1,postfixlen); 106 decrRefCount(subst); /* Incremented by decodeObject() */ 107 108 /* Lookup substituted key */ 109 o = lookupKeyRead(db,keyobj); 110 if (o == NULL) goto noobj; 111 112 if (fieldobj) { 113 if (o->type != REDIS_HASH) goto noobj; 114 115 /* Retrieve value from hash by the field name. This operation 116 * already increases the refcount of the returned object. */ 117 o = hashTypeGetObject(o, fieldobj); 118 } else { 119 if (o->type != REDIS_STRING) goto noobj; 120 121 /* Every object that this function returns needs to have its refcount 122 * increased. sortCommand decreases it again. */ 123 incrRefCount(o); 124 } 125 decrRefCount(keyobj); 126 if (fieldobj) decrRefCount(fieldobj); 127 return o; 128 129 noobj: 130 decrRefCount(keyobj); 131 if (fieldlen) decrRefCount(fieldobj); 132 return NULL; 133 } 134 135 /* sortCompare() is used by qsort in sortCommand(). Given that qsort_r with 136 * the additional parameter is not standard but a BSD-specific we have to 137 * pass sorting parameters via the global 'server' structure */ 138 int sortCompare(const void *s1, const void *s2) { 139 const redisSortObject *so1 = s1, *so2 = s2; 140 int cmp; 141 142 if (!server.sort_alpha) { 143 /* Numeric sorting. Here it's trivial as we precomputed scores */ 144 if (so1->u.score > so2->u.score) { 145 cmp = 1; 146 } else if (so1->u.score < so2->u.score) { 147 cmp = -1; 148 } else { 149 /* Objects have the same score, but we don't want the comparison 150 * to be undefined, so we compare objects lexicographically. 151 * This way the result of SORT is deterministic. */ 152 cmp = compareStringObjects(so1->obj,so2->obj); 153 } 154 } else { 155 /* Alphanumeric sorting */ 156 if (server.sort_bypattern) { 157 if (!so1->u.cmpobj || !so2->u.cmpobj) { 158 /* At least one compare object is NULL */ 159 if (so1->u.cmpobj == so2->u.cmpobj) 160 cmp = 0; 161 else if (so1->u.cmpobj == NULL) 162 cmp = -1; 163 else 164 cmp = 1; 165 } else { 166 /* We have both the objects, compare them. */ 167 if (server.sort_store) { 168 cmp = compareStringObjects(so1->u.cmpobj,so2->u.cmpobj); 169 } else { 170 /* Here we can use strcoll() directly as we are sure that 171 * the objects are decoded string objects. */ 172 cmp = strcoll(so1->u.cmpobj->ptr,so2->u.cmpobj->ptr); 173 } 174 } 175 } else { 176 /* Compare elements directly. */ 177 if (server.sort_store) { 178 cmp = compareStringObjects(so1->obj,so2->obj); 179 } else { 180 cmp = collateStringObjects(so1->obj,so2->obj); 181 } 182 } 183 } 184 return server.sort_desc ? -cmp : cmp; 185 } 186 187 /* The SORT command is the most complex command in Redis. Warning: this code 188 * is optimized for speed and a bit less for readability */ 189 void sortCommand(redisClient *c) { 190 list *operations; 191 unsigned int outputlen = 0; 192 int desc = 0, alpha = 0; 193 long limit_start = 0, limit_count = -1, start, end; 194 int j, dontsort = 0, vectorlen; 195 int getop = 0; /* GET operation counter */ 196 int int_convertion_error = 0; 197 int syntax_error = 0; 198 robj *sortval, *sortby = NULL, *storekey = NULL; 199 redisSortObject *vector; /* Resulting vector to sort */ 200 201 /* Lookup the key to sort. It must be of the right types */ 202 sortval = lookupKeyRead(c->db,c->argv[1]); 203 if (sortval && sortval->type != REDIS_SET && 204 sortval->type != REDIS_LIST && 205 sortval->type != REDIS_ZSET) 206 { 207 addReply(c,shared.wrongtypeerr); 208 return; 209 } 210 211 /* Create a list of operations to perform for every sorted element. 212 * Operations can be GET/DEL/INCR/DECR */ 213 operations = listCreate(); 214 listSetFreeMethod(operations,zfree); 215 j = 2; /* options start at argv[2] */ 216 217 /* Now we need to protect sortval incrementing its count, in the future 218 * SORT may have options able to overwrite/delete keys during the sorting 219 * and the sorted key itself may get destroyed */ 220 if (sortval) 221 incrRefCount(sortval); 222 else 223 sortval = createQuicklistObject(); 224 225 /* The SORT command has an SQL-alike syntax, parse it */ 226 while(j < c->argc) { 227 int leftargs = c->argc-j-1; 228 if (!strcasecmp(c->argv[j]->ptr,"asc")) { 229 desc = 0; 230 } else if (!strcasecmp(c->argv[j]->ptr,"desc")) { 231 desc = 1; 232 } else if (!strcasecmp(c->argv[j]->ptr,"alpha")) { 233 alpha = 1; 234 } else if (!strcasecmp(c->argv[j]->ptr,"limit") && leftargs >= 2) { 235 if ((getLongFromObjectOrReply(c, c->argv[j+1], &limit_start, NULL) 236 != REDIS_OK) || 237 (getLongFromObjectOrReply(c, c->argv[j+2], &limit_count, NULL) 238 != REDIS_OK)) 239 { 240 syntax_error++; 241 break; 242 } 243 j+=2; 244 } else if (!strcasecmp(c->argv[j]->ptr,"store") && leftargs >= 1) { 245 storekey = c->argv[j+1]; 246 j++; 247 } else if (!strcasecmp(c->argv[j]->ptr,"by") && leftargs >= 1) { 248 sortby = c->argv[j+1]; 249 /* If the BY pattern does not contain '*', i.e. it is constant, 250 * we don't need to sort nor to lookup the weight keys. */ 251 if (strchr(c->argv[j+1]->ptr,'*') == NULL) { 252 dontsort = 1; 253 } else { 254 /* If BY is specified with a real patter, we can't accept 255 * it in cluster mode. */ 256 if (server.cluster_enabled) { 257 addReplyError(c,"BY option of SORT denied in Cluster mode."); 258 syntax_error++; 259 break; 260 } 261 } 262 j++; 263 } else if (!strcasecmp(c->argv[j]->ptr,"get") && leftargs >= 1) { 264 if (server.cluster_enabled) { 265 addReplyError(c,"GET option of SORT denied in Cluster mode."); 266 syntax_error++; 267 break; 268 } 269 listAddNodeTail(operations,createSortOperation( 270 REDIS_SORT_GET,c->argv[j+1])); 271 getop++; 272 j++; 273 } else { 274 addReply(c,shared.syntaxerr); 275 syntax_error++; 276 break; 277 } 278 j++; 279 } 280 281 /* Handle syntax errors set during options parsing. */ 282 if (syntax_error) { 283 decrRefCount(sortval); 284 listRelease(operations); 285 return; 286 } 287 288 /* When sorting a set with no sort specified, we must sort the output 289 * so the result is consistent across scripting and replication. 290 * 291 * The other types (list, sorted set) will retain their native order 292 * even if no sort order is requested, so they remain stable across 293 * scripting and replication. */ 294 if (dontsort && 295 sortval->type == REDIS_SET && 296 (storekey || c->flags & REDIS_LUA_CLIENT)) 297 { 298 /* Force ALPHA sorting */ 299 dontsort = 0; 300 alpha = 1; 301 sortby = NULL; 302 } 303 304 /* Destructively convert encoded sorted sets for SORT. */ 305 if (sortval->type == REDIS_ZSET) 306 zsetConvert(sortval, REDIS_ENCODING_SKIPLIST); 307 308 /* Objtain the length of the object to sort. */ 309 switch(sortval->type) { 310 case REDIS_LIST: vectorlen = listTypeLength(sortval); break; 311 case REDIS_SET: vectorlen = setTypeSize(sortval); break; 312 case REDIS_ZSET: vectorlen = dictSize(((zset*)sortval->ptr)->dict); break; 313 default: vectorlen = 0; redisPanic("Bad SORT type"); /* Avoid GCC warning */ 314 } 315 316 /* Perform LIMIT start,count sanity checking. */ 317 start = (limit_start < 0) ? 0 : limit_start; 318 end = (limit_count < 0) ? vectorlen-1 : start+limit_count-1; 319 if (start >= vectorlen) { 320 start = vectorlen-1; 321 end = vectorlen-2; 322 } 323 if (end >= vectorlen) end = vectorlen-1; 324 325 /* Whenever possible, we load elements into the output array in a more 326 * direct way. This is possible if: 327 * 328 * 1) The object to sort is a sorted set or a list (internally sorted). 329 * 2) There is nothing to sort as dontsort is true (BY <constant string>). 330 * 331 * In this special case, if we have a LIMIT option that actually reduces 332 * the number of elements to fetch, we also optimize to just load the 333 * range we are interested in and allocating a vector that is big enough 334 * for the selected range length. */ 335 if ((sortval->type == REDIS_ZSET || sortval->type == REDIS_LIST) && 336 dontsort && 337 (start != 0 || end != vectorlen-1)) 338 { 339 vectorlen = end-start+1; 340 } 341 342 /* Load the sorting vector with all the objects to sort */ 343 vector = zmalloc(sizeof(redisSortObject)*vectorlen); 344 j = 0; 345 346 if (sortval->type == REDIS_LIST && dontsort) { 347 /* Special handling for a list, if 'dontsort' is true. 348 * This makes sure we return elements in the list original 349 * ordering, accordingly to DESC / ASC options. 350 * 351 * Note that in this case we also handle LIMIT here in a direct 352 * way, just getting the required range, as an optimization. */ 353 if (end >= start) { 354 listTypeIterator *li; 355 listTypeEntry entry; 356 li = listTypeInitIterator(sortval, 357 desc ? (long)(listTypeLength(sortval) - start - 1) : start, 358 desc ? REDIS_HEAD : REDIS_TAIL); 359 360 while(j < vectorlen && listTypeNext(li,&entry)) { 361 vector[j].obj = listTypeGet(&entry); 362 vector[j].u.score = 0; 363 vector[j].u.cmpobj = NULL; 364 j++; 365 } 366 listTypeReleaseIterator(li); 367 /* Fix start/end: output code is not aware of this optimization. */ 368 end -= start; 369 start = 0; 370 } 371 } else if (sortval->type == REDIS_LIST) { 372 listTypeIterator *li = listTypeInitIterator(sortval,0,REDIS_TAIL); 373 listTypeEntry entry; 374 while(listTypeNext(li,&entry)) { 375 vector[j].obj = listTypeGet(&entry); 376 vector[j].u.score = 0; 377 vector[j].u.cmpobj = NULL; 378 j++; 379 } 380 listTypeReleaseIterator(li); 381 } else if (sortval->type == REDIS_SET) { 382 setTypeIterator *si = setTypeInitIterator(sortval); 383 robj *ele; 384 while((ele = setTypeNextObject(si)) != NULL) { 385 vector[j].obj = ele; 386 vector[j].u.score = 0; 387 vector[j].u.cmpobj = NULL; 388 j++; 389 } 390 setTypeReleaseIterator(si); 391 } else if (sortval->type == REDIS_ZSET && dontsort) { 392 /* Special handling for a sorted set, if 'dontsort' is true. 393 * This makes sure we return elements in the sorted set original 394 * ordering, accordingly to DESC / ASC options. 395 * 396 * Note that in this case we also handle LIMIT here in a direct 397 * way, just getting the required range, as an optimization. */ 398 399 zset *zs = sortval->ptr; 400 zskiplist *zsl = zs->zsl; 401 zskiplistNode *ln; 402 robj *ele; 403 int rangelen = vectorlen; 404 405 /* Check if starting point is trivial, before doing log(N) lookup. */ 406 if (desc) { 407 long zsetlen = dictSize(((zset*)sortval->ptr)->dict); 408 409 ln = zsl->tail; 410 if (start > 0) 411 ln = zslGetElementByRank(zsl,zsetlen-start); 412 } else { 413 ln = zsl->header->level[0].forward; 414 if (start > 0) 415 ln = zslGetElementByRank(zsl,start+1); 416 } 417 418 while(rangelen--) { 419 redisAssertWithInfo(c,sortval,ln != NULL); 420 ele = ln->obj; 421 vector[j].obj = ele; 422 vector[j].u.score = 0; 423 vector[j].u.cmpobj = NULL; 424 j++; 425 ln = desc ? ln->backward : ln->level[0].forward; 426 } 427 /* Fix start/end: output code is not aware of this optimization. */ 428 end -= start; 429 start = 0; 430 } else if (sortval->type == REDIS_ZSET) { 431 dict *set = ((zset*)sortval->ptr)->dict; 432 dictIterator *di; 433 dictEntry *setele; 434 di = dictGetIterator(set); 435 while((setele = dictNext(di)) != NULL) { 436 vector[j].obj = dictGetKey(setele); 437 vector[j].u.score = 0; 438 vector[j].u.cmpobj = NULL; 439 j++; 440 } 441 dictReleaseIterator(di); 442 } else { 443 redisPanic("Unknown type"); 444 } 445 redisAssertWithInfo(c,sortval,j == vectorlen); 446 447 /* Now it's time to load the right scores in the sorting vector */ 448 if (dontsort == 0) { 449 for (j = 0; j < vectorlen; j++) { 450 robj *byval; 451 if (sortby) { 452 /* lookup value to sort by */ 453 byval = lookupKeyByPattern(c->db,sortby,vector[j].obj); 454 if (!byval) continue; 455 } else { 456 /* use object itself to sort by */ 457 byval = vector[j].obj; 458 } 459 460 if (alpha) { 461 if (sortby) vector[j].u.cmpobj = getDecodedObject(byval); 462 } else { 463 if (sdsEncodedObject(byval)) { 464 char *eptr; 465 466 vector[j].u.score = strtod(byval->ptr,&eptr); 467 if (eptr[0] != '\0' || errno == ERANGE || 468 isnan(vector[j].u.score)) 469 { 470 int_convertion_error = 1; 471 } 472 } else if (byval->encoding == REDIS_ENCODING_INT) { 473 /* Don't need to decode the object if it's 474 * integer-encoded (the only encoding supported) so 475 * far. We can just cast it */ 476 vector[j].u.score = (long)byval->ptr; 477 } else { 478 redisAssertWithInfo(c,sortval,1 != 1); 479 } 480 } 481 482 /* when the object was retrieved using lookupKeyByPattern, 483 * its refcount needs to be decreased. */ 484 if (sortby) { 485 decrRefCount(byval); 486 } 487 } 488 } 489 490 if (dontsort == 0) { 491 server.sort_desc = desc; 492 server.sort_alpha = alpha; 493 server.sort_bypattern = sortby ? 1 : 0; 494 server.sort_store = storekey ? 1 : 0; 495 if (sortby && (start != 0 || end != vectorlen-1)) 496 pqsort(vector,vectorlen,sizeof(redisSortObject),sortCompare, start,end); 497 else 498 qsort(vector,vectorlen,sizeof(redisSortObject),sortCompare); 499 } 500 501 /* Send command output to the output buffer, performing the specified 502 * GET/DEL/INCR/DECR operations if any. */ 503 outputlen = getop ? getop*(end-start+1) : end-start+1; 504 if (int_convertion_error) { 505 addReplyError(c,"One or more scores can't be converted into double"); 506 } else if (storekey == NULL) { 507 /* STORE option not specified, sent the sorting result to client */ 508 addReplyMultiBulkLen(c,outputlen); 509 for (j = start; j <= end; j++) { 510 listNode *ln; 511 listIter li; 512 513 if (!getop) addReplyBulk(c,vector[j].obj); 514 listRewind(operations,&li); 515 while((ln = listNext(&li))) { 516 redisSortOperation *sop = ln->value; 517 robj *val = lookupKeyByPattern(c->db,sop->pattern, 518 vector[j].obj); 519 520 if (sop->type == REDIS_SORT_GET) { 521 if (!val) { 522 addReply(c,shared.nullbulk); 523 } else { 524 addReplyBulk(c,val); 525 decrRefCount(val); 526 } 527 } else { 528 /* Always fails */ 529 redisAssertWithInfo(c,sortval,sop->type == REDIS_SORT_GET); 530 } 531 } 532 } 533 } else { 534 robj *sobj = createQuicklistObject(); 535 536 /* STORE option specified, set the sorting result as a List object */ 537 for (j = start; j <= end; j++) { 538 listNode *ln; 539 listIter li; 540 541 if (!getop) { 542 listTypePush(sobj,vector[j].obj,REDIS_TAIL); 543 } else { 544 listRewind(operations,&li); 545 while((ln = listNext(&li))) { 546 redisSortOperation *sop = ln->value; 547 robj *val = lookupKeyByPattern(c->db,sop->pattern, 548 vector[j].obj); 549 550 if (sop->type == REDIS_SORT_GET) { 551 if (!val) val = createStringObject("",0); 552 553 /* listTypePush does an incrRefCount, so we should take care 554 * care of the incremented refcount caused by either 555 * lookupKeyByPattern or createStringObject("",0) */ 556 listTypePush(sobj,val,REDIS_TAIL); 557 decrRefCount(val); 558 } else { 559 /* Always fails */ 560 redisAssertWithInfo(c,sortval,sop->type == REDIS_SORT_GET); 561 } 562 } 563 } 564 } 565 if (outputlen) { 566 setKey(c->db,storekey,sobj); 567 notifyKeyspaceEvent(REDIS_NOTIFY_LIST,"sortstore",storekey, 568 c->db->id); 569 server.dirty += outputlen; 570 } else if (dbDelete(c->db,storekey)) { 571 signalModifiedKey(c->db,storekey); 572 notifyKeyspaceEvent(REDIS_NOTIFY_GENERIC,"del",storekey,c->db->id); 573 server.dirty++; 574 } 575 decrRefCount(sobj); 576 addReplyLongLong(c,outputlen); 577 } 578 579 /* Cleanup */ 580 if (sortval->type == REDIS_LIST || sortval->type == REDIS_SET) 581 for (j = 0; j < vectorlen; j++) 582 decrRefCount(vector[j].obj); 583 decrRefCount(sortval); 584 listRelease(operations); 585 for (j = 0; j < vectorlen; j++) { 586 if (alpha && vector[j].u.cmpobj) 587 decrRefCount(vector[j].u.cmpobj); 588 } 589 zfree(vector); 590 } 591