xref: /redis-3.2.3/src/geo.c (revision fdafe233)
1 /*
2  * Copyright (c) 2014, Matt Stancliff <[email protected]>.
3  * Copyright (c) 2015-2016, Salvatore Sanfilippo <[email protected]>.
4  * All rights reserved.
5  *
6  * Redistribution and use in source and binary forms, with or without
7  * modification, are permitted provided that the following conditions are met:
8  *
9  *   * Redistributions of source code must retain the above copyright notice,
10  *     this list of conditions and the following disclaimer.
11  *   * Redistributions in binary form must reproduce the above copyright
12  *     notice, this list of conditions and the following disclaimer in the
13  *     documentation and/or other materials provided with the distribution.
14  *   * Neither the name of Redis nor the names of its contributors may be used
15  *     to endorse or promote products derived from this software without
16  *     specific prior written permission.
17  *
18  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
19  * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
20  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
21  * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
22  * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
23  * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
24  * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
25  * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
26  * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
27  * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
28  * POSSIBILITY OF SUCH DAMAGE.
29  */
30 
31 #include "geo.h"
32 #include "geohash_helper.h"
33 #include "debugmacro.h"
34 
35 /* Things exported from t_zset.c only for geo.c, since it is the only other
36  * part of Redis that requires close zset introspection. */
37 unsigned char *zzlFirstInRange(unsigned char *zl, zrangespec *range);
38 int zslValueLteMax(double value, zrangespec *spec);
39 
/* ====================================================================
 * This file implements the following commands:
 *
 *   - geoadd - add coordinates for value to geoset
 *   - georadius - search radius by coordinates in geoset
 *   - georadiusbymember - search radius based on geoset member position
 *   - geohash - return the geohash string of geoset members
 *   - geopos - return the longitude,latitude position of geoset members
 *   - geodist - return the distance between two geoset members
 * ==================================================================== */
47 
48 /* ====================================================================
49  * geoArray implementation
50  * ==================================================================== */
51 
52 /* Create a new array of geoPoints. */
geoArrayCreate(void)53 geoArray *geoArrayCreate(void) {
54     geoArray *ga = zmalloc(sizeof(*ga));
55     /* It gets allocated on first geoArrayAppend() call. */
56     ga->array = NULL;
57     ga->buckets = 0;
58     ga->used = 0;
59     return ga;
60 }
61 
62 /* Add a new entry and return its pointer so that the caller can populate
63  * it with data. */
geoArrayAppend(geoArray * ga)64 geoPoint *geoArrayAppend(geoArray *ga) {
65     if (ga->used == ga->buckets) {
66         ga->buckets = (ga->buckets == 0) ? 8 : ga->buckets*2;
67         ga->array = zrealloc(ga->array,sizeof(geoPoint)*ga->buckets);
68     }
69     geoPoint *gp = ga->array+ga->used;
70     ga->used++;
71     return gp;
72 }
73 
74 /* Destroy a geoArray created with geoArrayCreate(). */
geoArrayFree(geoArray * ga)75 void geoArrayFree(geoArray *ga) {
76     size_t i;
77     for (i = 0; i < ga->used; i++) sdsfree(ga->array[i].member);
78     zfree(ga->array);
79     zfree(ga);
80 }
81 
82 /* ====================================================================
83  * Helpers
84  * ==================================================================== */
decodeGeohash(double bits,double * xy)85 int decodeGeohash(double bits, double *xy) {
86     GeoHashBits hash = { .bits = (uint64_t)bits, .step = GEO_STEP_MAX };
87     return geohashDecodeToLongLatWGS84(hash, xy);
88 }
89 
90 /* Input Argument Helper */
91 /* Take a pointer to the latitude arg then use the next arg for longitude.
92  * On parse error C_ERR is returned, otherwise C_OK. */
extractLongLatOrReply(client * c,robj ** argv,double * xy)93 int extractLongLatOrReply(client *c, robj **argv, double *xy) {
94     int i;
95     for (i = 0; i < 2; i++) {
96         if (getDoubleFromObjectOrReply(c, argv[i], xy + i, NULL) !=
97             C_OK) {
98             return C_ERR;
99         }
100     }
101     if (xy[0] < GEO_LONG_MIN || xy[0] > GEO_LONG_MAX ||
102         xy[1] < GEO_LAT_MIN  || xy[1] > GEO_LAT_MAX) {
103         addReplySds(c, sdscatprintf(sdsempty(),
104             "-ERR invalid longitude,latitude pair %f,%f\r\n",xy[0],xy[1]));
105         return C_ERR;
106     }
107     return C_OK;
108 }
109 
110 /* Input Argument Helper */
111 /* Decode lat/long from a zset member's score.
112  * Returns C_OK on successful decoding, otherwise C_ERR is returned. */
longLatFromMember(robj * zobj,robj * member,double * xy)113 int longLatFromMember(robj *zobj, robj *member, double *xy) {
114     double score = 0;
115 
116     if (zsetScore(zobj, member, &score) == C_ERR) return C_ERR;
117     if (!decodeGeohash(score, xy)) return C_ERR;
118     return C_OK;
119 }
120 
121 /* Check that the unit argument matches one of the known units, and returns
122  * the conversion factor to meters (you need to divide meters by the conversion
123  * factor to convert to the right unit).
124  *
125  * If the unit is not valid, an error is reported to the client, and a value
126  * less than zero is returned. */
extractUnitOrReply(client * c,robj * unit)127 double extractUnitOrReply(client *c, robj *unit) {
128     char *u = unit->ptr;
129 
130     if (!strcmp(u, "m")) {
131         return 1;
132     } else if (!strcmp(u, "km")) {
133         return 1000;
134     } else if (!strcmp(u, "ft")) {
135         return 0.3048;
136     } else if (!strcmp(u, "mi")) {
137         return 1609.34;
138     } else {
139         addReplyError(c,
140             "unsupported unit provided. please use m, km, ft, mi");
141         return -1;
142     }
143 }
144 
145 /* Input Argument Helper.
146  * Extract the dinstance from the specified two arguments starting at 'argv'
147  * that shouldbe in the form: <number> <unit> and return the dinstance in the
148  * specified unit on success. *conversino is populated with the coefficient
149  * to use in order to convert meters to the unit.
150  *
151  * On error a value less than zero is returned. */
extractDistanceOrReply(client * c,robj ** argv,double * conversion)152 double extractDistanceOrReply(client *c, robj **argv,
153                                      double *conversion) {
154     double distance;
155     if (getDoubleFromObjectOrReply(c, argv[0], &distance,
156                                    "need numeric radius") != C_OK) {
157         return -1;
158     }
159 
160     if (distance < 0) {
161         addReplyError(c,"radius cannot be negative");
162         return -1;
163     }
164 
165     double to_meters = extractUnitOrReply(c,argv[1]);
166     if (to_meters < 0) {
167         return -1;
168     }
169 
170     if (conversion) *conversion = to_meters;
171     return distance * to_meters;
172 }
173 
174 /* The default addReplyDouble has too much accuracy.  We use this
175  * for returning location distances. "5.2145 meters away" is nicer
176  * than "5.2144992818115 meters away." We provide 4 digits after the dot
177  * so that the returned value is decently accurate even when the unit is
178  * the kilometer. */
addReplyDoubleDistance(client * c,double d)179 void addReplyDoubleDistance(client *c, double d) {
180     char dbuf[128];
181     int dlen = snprintf(dbuf, sizeof(dbuf), "%.4f", d);
182     addReplyBulkCBuffer(c, dbuf, dlen);
183 }
184 
185 /* Helper function for geoGetPointsInRange(): given a sorted set score
186  * representing a point, and another point (the center of our search) and
187  * a radius, appends this entry as a geoPoint into the specified geoArray
188  * only if the point is within the search area.
189  *
190  * returns C_OK if the point is included, or REIDS_ERR if it is outside. */
geoAppendIfWithinRadius(geoArray * ga,double lon,double lat,double radius,double score,sds member)191 int geoAppendIfWithinRadius(geoArray *ga, double lon, double lat, double radius, double score, sds member) {
192     double distance, xy[2];
193 
194     if (!decodeGeohash(score,xy)) return C_ERR; /* Can't decode. */
195     /* Note that geohashGetDistanceIfInRadiusWGS84() takes arguments in
196      * reverse order: longitude first, latitude later. */
197     if (!geohashGetDistanceIfInRadiusWGS84(lon,lat, xy[0], xy[1],
198                                            radius, &distance))
199     {
200         return C_ERR;
201     }
202 
203     /* Append the new element. */
204     geoPoint *gp = geoArrayAppend(ga);
205     gp->longitude = xy[0];
206     gp->latitude = xy[1];
207     gp->dist = distance;
208     gp->member = member;
209     gp->score = score;
210     return C_OK;
211 }
212 
213 /* Query a Redis sorted set to extract all the elements between 'min' and
214  * 'max', appending them into the array of geoPoint structures 'gparray'.
215  * The command returns the number of elements added to the array.
216  *
217  * Elements which are farest than 'radius' from the specified 'x' and 'y'
218  * coordinates are not included.
219  *
220  * The ability of this function to append to an existing set of points is
221  * important for good performances because querying by radius is performed
222  * using multiple queries to the sorted set, that we later need to sort
223  * via qsort. Similarly we need to be able to reject points outside the search
224  * radius area ASAP in order to allocate and process more points than needed. */
geoGetPointsInRange(robj * zobj,double min,double max,double lon,double lat,double radius,geoArray * ga)225 int geoGetPointsInRange(robj *zobj, double min, double max, double lon, double lat, double radius, geoArray *ga) {
226     /* minex 0 = include min in range; maxex 1 = exclude max in range */
227     /* That's: min <= val < max */
228     zrangespec range = { .min = min, .max = max, .minex = 0, .maxex = 1 };
229     size_t origincount = ga->used;
230     sds member;
231 
232     if (zobj->encoding == OBJ_ENCODING_ZIPLIST) {
233         unsigned char *zl = zobj->ptr;
234         unsigned char *eptr, *sptr;
235         unsigned char *vstr = NULL;
236         unsigned int vlen = 0;
237         long long vlong = 0;
238         double score = 0;
239 
240         if ((eptr = zzlFirstInRange(zl, &range)) == NULL) {
241             /* Nothing exists starting at our min.  No results. */
242             return 0;
243         }
244 
245         sptr = ziplistNext(zl, eptr);
246         while (eptr) {
247             score = zzlGetScore(sptr);
248 
249             /* If we fell out of range, break. */
250             if (!zslValueLteMax(score, &range))
251                 break;
252 
253             /* We know the element exists. ziplistGet should always succeed */
254             ziplistGet(eptr, &vstr, &vlen, &vlong);
255             member = (vstr == NULL) ? sdsfromlonglong(vlong) :
256                                       sdsnewlen(vstr,vlen);
257             if (geoAppendIfWithinRadius(ga,lon,lat,radius,score,member)
258                 == C_ERR) sdsfree(member);
259             zzlNext(zl, &eptr, &sptr);
260         }
261     } else if (zobj->encoding == OBJ_ENCODING_SKIPLIST) {
262         zset *zs = zobj->ptr;
263         zskiplist *zsl = zs->zsl;
264         zskiplistNode *ln;
265 
266         if ((ln = zslFirstInRange(zsl, &range)) == NULL) {
267             /* Nothing exists starting at our min.  No results. */
268             return 0;
269         }
270 
271         while (ln) {
272             robj *o = ln->obj;
273             /* Abort when the node is no longer in range. */
274             if (!zslValueLteMax(ln->score, &range))
275                 break;
276 
277             member = (o->encoding == OBJ_ENCODING_INT) ?
278                         sdsfromlonglong((long)o->ptr) :
279                         sdsdup(o->ptr);
280             if (geoAppendIfWithinRadius(ga,lon,lat,radius,ln->score,member)
281                 == C_ERR) sdsfree(member);
282             ln = ln->level[0].forward;
283         }
284     }
285     return ga->used - origincount;
286 }
287 
288 /* Compute the sorted set scores min (inclusive), max (exclusive) we should
289  * query in order to retrieve all the elements inside the specified area
290  * 'hash'. The two scores are returned by reference in *min and *max. */
scoresOfGeoHashBox(GeoHashBits hash,GeoHashFix52Bits * min,GeoHashFix52Bits * max)291 void scoresOfGeoHashBox(GeoHashBits hash, GeoHashFix52Bits *min, GeoHashFix52Bits *max) {
292     /* We want to compute the sorted set scores that will include all the
293      * elements inside the specified Geohash 'hash', which has as many
294      * bits as specified by hash.step * 2.
295      *
296      * So if step is, for example, 3, and the hash value in binary
297      * is 101010, since our score is 52 bits we want every element which
298      * is in binary: 101010?????????????????????????????????????????????
299      * Where ? can be 0 or 1.
300      *
301      * To get the min score we just use the initial hash value left
302      * shifted enough to get the 52 bit value. Later we increment the
303      * 6 bit prefis (see the hash.bits++ statement), and get the new
304      * prefix: 101011, which we align again to 52 bits to get the maximum
305      * value (which is excluded from the search). So we get everything
306      * between the two following scores (represented in binary):
307      *
308      * 1010100000000000000000000000000000000000000000000000 (included)
309      * and
310      * 1010110000000000000000000000000000000000000000000000 (excluded).
311      */
312     *min = geohashAlign52Bits(hash);
313     hash.bits++;
314     *max = geohashAlign52Bits(hash);
315 }
316 
317 /* Obtain all members between the min/max of this geohash bounding box.
318  * Populate a geoArray of GeoPoints by calling geoGetPointsInRange().
319  * Return the number of points added to the array. */
membersOfGeoHashBox(robj * zobj,GeoHashBits hash,geoArray * ga,double lon,double lat,double radius)320 int membersOfGeoHashBox(robj *zobj, GeoHashBits hash, geoArray *ga, double lon, double lat, double radius) {
321     GeoHashFix52Bits min, max;
322 
323     scoresOfGeoHashBox(hash,&min,&max);
324     return geoGetPointsInRange(zobj, min, max, lon, lat, radius, ga);
325 }
326 
327 /* Search all eight neighbors + self geohash box */
membersOfAllNeighbors(robj * zobj,GeoHashRadius n,double lon,double lat,double radius,geoArray * ga)328 int membersOfAllNeighbors(robj *zobj, GeoHashRadius n, double lon, double lat, double radius, geoArray *ga) {
329     GeoHashBits neighbors[9];
330     unsigned int i, count = 0, last_processed = 0;
331 
332     neighbors[0] = n.hash;
333     neighbors[1] = n.neighbors.north;
334     neighbors[2] = n.neighbors.south;
335     neighbors[3] = n.neighbors.east;
336     neighbors[4] = n.neighbors.west;
337     neighbors[5] = n.neighbors.north_east;
338     neighbors[6] = n.neighbors.north_west;
339     neighbors[7] = n.neighbors.south_east;
340     neighbors[8] = n.neighbors.south_west;
341 
342     /* For each neighbor (*and* our own hashbox), get all the matching
343      * members and add them to the potential result list. */
344     for (i = 0; i < sizeof(neighbors) / sizeof(*neighbors); i++) {
345         if (HASHISZERO(neighbors[i]))
346             continue;
347 
348         /* When a huge Radius (in the 5000 km range or more) is used,
349          * adjacent neighbors can be the same, leading to duplicated
350          * elements. Skip every range which is the same as the one
351          * processed previously. */
352         if (last_processed &&
353             neighbors[i].bits == neighbors[last_processed].bits &&
354             neighbors[i].step == neighbors[last_processed].step)
355             continue;
356         count += membersOfGeoHashBox(zobj, neighbors[i], ga, lon, lat, radius);
357         last_processed = i;
358     }
359     return count;
360 }
361 
362 /* Sort comparators for qsort() */
sort_gp_asc(const void * a,const void * b)363 static int sort_gp_asc(const void *a, const void *b) {
364     const struct geoPoint *gpa = a, *gpb = b;
365     /* We can't do adist - bdist because they are doubles and
366      * the comparator returns an int. */
367     if (gpa->dist > gpb->dist)
368         return 1;
369     else if (gpa->dist == gpb->dist)
370         return 0;
371     else
372         return -1;
373 }
374 
/* Order two geoPoint entries by decreasing 'dist': the exact opposite of
 * what sort_gp_asc() reports for the same pair. */
static int sort_gp_desc(const void *a, const void *b) {
    int ascending = sort_gp_asc(a, b);

    return -ascending;
}
378 
379 /* ====================================================================
380  * Commands
381  * ==================================================================== */
382 
383 /* GEOADD key long lat name [long2 lat2 name2 ... longN latN nameN] */
geoaddCommand(client * c)384 void geoaddCommand(client *c) {
385     /* Check arguments number for sanity. */
386     if ((c->argc - 2) % 3 != 0) {
387         /* Need an odd number of arguments if we got this far... */
388         addReplyError(c, "syntax error. Try GEOADD key [x1] [y1] [name1] "
389                          "[x2] [y2] [name2] ... ");
390         return;
391     }
392 
393     int elements = (c->argc - 2) / 3;
394     int argc = 2+elements*2; /* ZADD key score ele ... */
395     robj **argv = zcalloc(argc*sizeof(robj*));
396     argv[0] = createRawStringObject("zadd",4);
397     argv[1] = c->argv[1]; /* key */
398     incrRefCount(argv[1]);
399 
400     /* Create the argument vector to call ZADD in order to add all
401      * the score,value pairs to the requested zset, where score is actually
402      * an encoded version of lat,long. */
403     int i;
404     for (i = 0; i < elements; i++) {
405         double xy[2];
406 
407         if (extractLongLatOrReply(c, (c->argv+2)+(i*3),xy) == C_ERR) {
408             for (i = 0; i < argc; i++)
409                 if (argv[i]) decrRefCount(argv[i]);
410             zfree(argv);
411             return;
412         }
413 
414         /* Turn the coordinates into the score of the element. */
415         GeoHashBits hash;
416         geohashEncodeWGS84(xy[0], xy[1], GEO_STEP_MAX, &hash);
417         GeoHashFix52Bits bits = geohashAlign52Bits(hash);
418         robj *score = createObject(OBJ_STRING, sdsfromlonglong(bits));
419         robj *val = c->argv[2 + i * 3 + 2];
420         argv[2+i*2] = score;
421         argv[3+i*2] = val;
422         incrRefCount(val);
423     }
424 
425     /* Finally call ZADD that will do the work for us. */
426     replaceClientCommandVector(c,argc,argv);
427     zaddCommand(c);
428 }
429 
430 #define SORT_NONE 0
431 #define SORT_ASC 1
432 #define SORT_DESC 2
433 
434 #define RADIUS_COORDS 1
435 #define RADIUS_MEMBER 2
436 
437 /* GEORADIUS key x y radius unit [WITHDIST] [WITHHASH] [WITHCOORD] [ASC|DESC]
438  *                               [COUNT count] [STORE key] [STOREDIST key]
439  * GEORADIUSBYMEMBER key member radius unit ... options ... */
georadiusGeneric(client * c,int type)440 void georadiusGeneric(client *c, int type) {
441     robj *key = c->argv[1];
442     robj *storekey = NULL;
443     int storedist = 0; /* 0 for STORE, 1 for STOREDIST. */
444 
445     /* Look up the requested zset */
446     robj *zobj = NULL;
447     if ((zobj = lookupKeyReadOrReply(c, key, shared.emptymultibulk)) == NULL ||
448         checkType(c, zobj, OBJ_ZSET)) {
449         return;
450     }
451 
452     /* Find long/lat to use for radius search based on inquiry type */
453     int base_args;
454     double xy[2] = { 0 };
455     if (type == RADIUS_COORDS) {
456         base_args = 6;
457         if (extractLongLatOrReply(c, c->argv + 2, xy) == C_ERR)
458             return;
459     } else if (type == RADIUS_MEMBER) {
460         base_args = 5;
461         robj *member = c->argv[2];
462         if (longLatFromMember(zobj, member, xy) == C_ERR) {
463             addReplyError(c, "could not decode requested zset member");
464             return;
465         }
466     } else {
467         addReplyError(c, "unknown georadius search type");
468         return;
469     }
470 
471     /* Extract radius and units from arguments */
472     double radius_meters = 0, conversion = 1;
473     if ((radius_meters = extractDistanceOrReply(c, c->argv + base_args - 2,
474                                                 &conversion)) < 0) {
475         return;
476     }
477 
478     /* Discover and populate all optional parameters. */
479     int withdist = 0, withhash = 0, withcoords = 0;
480     int sort = SORT_NONE;
481     long long count = 0;
482     if (c->argc > base_args) {
483         int remaining = c->argc - base_args;
484         for (int i = 0; i < remaining; i++) {
485             char *arg = c->argv[base_args + i]->ptr;
486             if (!strcasecmp(arg, "withdist")) {
487                 withdist = 1;
488             } else if (!strcasecmp(arg, "withhash")) {
489                 withhash = 1;
490             } else if (!strcasecmp(arg, "withcoord")) {
491                 withcoords = 1;
492             } else if (!strcasecmp(arg, "asc")) {
493                 sort = SORT_ASC;
494             } else if (!strcasecmp(arg, "desc")) {
495                 sort = SORT_DESC;
496             } else if (!strcasecmp(arg, "count") && (i+1) < remaining) {
497                 if (getLongLongFromObjectOrReply(c, c->argv[base_args+i+1],
498                     &count, NULL) != C_OK) return;
499                 if (count <= 0) {
500                     addReplyError(c,"COUNT must be > 0");
501                     return;
502                 }
503                 i++;
504             } else if (!strcasecmp(arg, "store") && (i+1) < remaining) {
505                 storekey = c->argv[base_args+i+1];
506                 storedist = 0;
507                 i++;
508             } else if (!strcasecmp(arg, "storedist") && (i+1) < remaining) {
509                 storekey = c->argv[base_args+i+1];
510                 storedist = 1;
511                 i++;
512             } else {
513                 addReply(c, shared.syntaxerr);
514                 return;
515             }
516         }
517     }
518 
519     /* Trap options not compatible with STORE and STOREDIST. */
520     if (storekey && (withdist || withhash || withcoords)) {
521         addReplyError(c,
522             "STORE option in GEORADIUS is not compatible with "
523             "WITHDIST, WITHHASH and WITHCOORDS options");
524         return;
525     }
526 
527     /* COUNT without ordering does not make much sense, force ASC
528      * ordering if COUNT was specified but no sorting was requested. */
529     if (count != 0 && sort == SORT_NONE) sort = SORT_ASC;
530 
531     /* Get all neighbor geohash boxes for our radius search */
532     GeoHashRadius georadius =
533         geohashGetAreasByRadiusWGS84(xy[0], xy[1], radius_meters);
534 
535     /* Search the zset for all matching points */
536     geoArray *ga = geoArrayCreate();
537     membersOfAllNeighbors(zobj, georadius, xy[0], xy[1], radius_meters, ga);
538 
539     /* If no matching results, the user gets an empty reply. */
540     if (ga->used == 0 && storekey == NULL) {
541         addReply(c, shared.emptymultibulk);
542         geoArrayFree(ga);
543         return;
544     }
545 
546     long result_length = ga->used;
547     long returned_items = (count == 0 || result_length < count) ?
548                           result_length : count;
549     long option_length = 0;
550 
551     /* Process [optional] requested sorting */
552     if (sort == SORT_ASC) {
553         qsort(ga->array, result_length, sizeof(geoPoint), sort_gp_asc);
554     } else if (sort == SORT_DESC) {
555         qsort(ga->array, result_length, sizeof(geoPoint), sort_gp_desc);
556     }
557 
558     if (storekey == NULL) {
559         /* No target key, return results to user. */
560 
561         /* Our options are self-contained nested multibulk replies, so we
562          * only need to track how many of those nested replies we return. */
563         if (withdist)
564             option_length++;
565 
566         if (withcoords)
567             option_length++;
568 
569         if (withhash)
570             option_length++;
571 
572         /* The multibulk len we send is exactly result_length. The result is
573          * either all strings of just zset members  *or* a nested multi-bulk
574          * reply containing the zset member string _and_ all the additional
575          * options the user enabled for this request. */
576         addReplyMultiBulkLen(c, returned_items);
577 
578         /* Finally send results back to the caller */
579         int i;
580         for (i = 0; i < returned_items; i++) {
581             geoPoint *gp = ga->array+i;
582             gp->dist /= conversion; /* Fix according to unit. */
583 
584             /* If we have options in option_length, return each sub-result
585              * as a nested multi-bulk.  Add 1 to account for result value
586              * itself. */
587             if (option_length)
588                 addReplyMultiBulkLen(c, option_length + 1);
589 
590             addReplyBulkSds(c,gp->member);
591             gp->member = NULL;
592 
593             if (withdist)
594                 addReplyDoubleDistance(c, gp->dist);
595 
596             if (withhash)
597                 addReplyLongLong(c, gp->score);
598 
599             if (withcoords) {
600                 addReplyMultiBulkLen(c, 2);
601                 addReplyHumanLongDouble(c, gp->longitude);
602                 addReplyHumanLongDouble(c, gp->latitude);
603             }
604         }
605     } else {
606         /* Target key, create a sorted set with the results. */
607         robj *zobj;
608         zset *zs;
609         int i;
610         size_t maxelelen = 0;
611 
612         if (returned_items) {
613             zobj = createZsetObject();
614             zs = zobj->ptr;
615         }
616 
617         for (i = 0; i < returned_items; i++) {
618             zskiplistNode *znode;
619             geoPoint *gp = ga->array+i;
620             gp->dist /= conversion; /* Fix according to unit. */
621             double score = storedist ? gp->dist : gp->score;
622             size_t elelen = sdslen(gp->member);
623             robj *ele = createObject(OBJ_STRING,gp->member);
624 
625             if (maxelelen < elelen) maxelelen = elelen;
626             incrRefCount(ele); /* Set refcount to 2 since we reference the
627                                   object both in the skiplist and dict. */
628             znode = zslInsert(zs->zsl,score,ele);
629             serverAssert(dictAdd(zs->dict,ele,&znode->score) == DICT_OK);
630             gp->member = NULL;
631         }
632 
633         if (returned_items) {
634             zsetConvertToZiplistIfNeeded(zobj,maxelelen);
635             setKey(c->db,storekey,zobj);
636             decrRefCount(zobj);
637             notifyKeyspaceEvent(NOTIFY_LIST,"georadiusstore",storekey,
638                                 c->db->id);
639             server.dirty += returned_items;
640         } else if (dbDelete(c->db,storekey)) {
641             signalModifiedKey(c->db,storekey);
642             notifyKeyspaceEvent(NOTIFY_GENERIC,"del",storekey,c->db->id);
643             server.dirty++;
644         }
645         addReplyLongLong(c, returned_items);
646     }
647     geoArrayFree(ga);
648 }
649 
650 /* GEORADIUS wrapper function. */
georadiusCommand(client * c)651 void georadiusCommand(client *c) {
652     georadiusGeneric(c, RADIUS_COORDS);
653 }
654 
655 /* GEORADIUSBYMEMBER wrapper function. */
georadiusByMemberCommand(client * c)656 void georadiusByMemberCommand(client *c) {
657     georadiusGeneric(c, RADIUS_MEMBER);
658 }
659 
660 /* GEOHASH key ele1 ele2 ... eleN
661  *
662  * Returns an array with an 11 characters geohash representation of the
663  * position of the specified elements. */
geohashCommand(client * c)664 void geohashCommand(client *c) {
665     char *geoalphabet= "0123456789bcdefghjkmnpqrstuvwxyz";
666     int j;
667 
668     /* Look up the requested zset */
669     robj *zobj = NULL;
670     if ((zobj = lookupKeyReadOrReply(c, c->argv[1], shared.emptymultibulk))
671         == NULL || checkType(c, zobj, OBJ_ZSET)) return;
672 
673     /* Geohash elements one after the other, using a null bulk reply for
674      * missing elements. */
675     addReplyMultiBulkLen(c,c->argc-2);
676     for (j = 2; j < c->argc; j++) {
677         double score;
678         if (zsetScore(zobj, c->argv[j], &score) == C_ERR) {
679             addReply(c,shared.nullbulk);
680         } else {
681             /* The internal format we use for geocoding is a bit different
682              * than the standard, since we use as initial latitude range
683              * -85,85, while the normal geohashing algorithm uses -90,90.
684              * So we have to decode our position and re-encode using the
685              * standard ranges in order to output a valid geohash string. */
686 
687             /* Decode... */
688             double xy[2];
689             if (!decodeGeohash(score,xy)) {
690                 addReply(c,shared.nullbulk);
691                 continue;
692             }
693 
694             /* Re-encode */
695             GeoHashRange r[2];
696             GeoHashBits hash;
697             r[0].min = -180;
698             r[0].max = 180;
699             r[1].min = -90;
700             r[1].max = 90;
701             geohashEncode(&r[0],&r[1],xy[0],xy[1],26,&hash);
702 
703             char buf[12];
704             int i;
705             for (i = 0; i < 11; i++) {
706                 int idx = (hash.bits >> (52-((i+1)*5))) & 0x1f;
707                 buf[i] = geoalphabet[idx];
708             }
709             buf[11] = '\0';
710             addReplyBulkCBuffer(c,buf,11);
711         }
712     }
713 }
714 
715 /* GEOPOS key ele1 ele2 ... eleN
716  *
717  * Returns an array of two-items arrays representing the x,y position of each
718  * element specified in the arguments. For missing elements NULL is returned. */
geoposCommand(client * c)719 void geoposCommand(client *c) {
720     int j;
721 
722     /* Look up the requested zset */
723     robj *zobj = NULL;
724     if ((zobj = lookupKeyReadOrReply(c, c->argv[1], shared.emptymultibulk))
725         == NULL || checkType(c, zobj, OBJ_ZSET)) return;
726 
727     /* Report elements one after the other, using a null bulk reply for
728      * missing elements. */
729     addReplyMultiBulkLen(c,c->argc-2);
730     for (j = 2; j < c->argc; j++) {
731         double score;
732         if (zsetScore(zobj, c->argv[j], &score) == C_ERR) {
733             addReply(c,shared.nullmultibulk);
734         } else {
735             /* Decode... */
736             double xy[2];
737             if (!decodeGeohash(score,xy)) {
738                 addReply(c,shared.nullmultibulk);
739                 continue;
740             }
741             addReplyMultiBulkLen(c,2);
742             addReplyHumanLongDouble(c,xy[0]);
743             addReplyHumanLongDouble(c,xy[1]);
744         }
745     }
746 }
747 
748 /* GEODIST key ele1 ele2 [unit]
749  *
750  * Return the distance, in meters by default, otherwise accordig to "unit",
751  * between points ele1 and ele2. If one or more elements are missing NULL
752  * is returned. */
geodistCommand(client * c)753 void geodistCommand(client *c) {
754     double to_meter = 1;
755 
756     /* Check if there is the unit to extract, otherwise assume meters. */
757     if (c->argc == 5) {
758         to_meter = extractUnitOrReply(c,c->argv[4]);
759         if (to_meter < 0) return;
760     } else if (c->argc > 5) {
761         addReply(c,shared.syntaxerr);
762         return;
763     }
764 
765     /* Look up the requested zset */
766     robj *zobj = NULL;
767     if ((zobj = lookupKeyReadOrReply(c, c->argv[1], shared.emptybulk))
768         == NULL || checkType(c, zobj, OBJ_ZSET)) return;
769 
770     /* Get the scores. We need both otherwise NULL is returned. */
771     double score1, score2, xyxy[4];
772     if (zsetScore(zobj, c->argv[2], &score1) == C_ERR ||
773         zsetScore(zobj, c->argv[3], &score2) == C_ERR)
774     {
775         addReply(c,shared.nullbulk);
776         return;
777     }
778 
779     /* Decode & compute the distance. */
780     if (!decodeGeohash(score1,xyxy) || !decodeGeohash(score2,xyxy+2))
781         addReply(c,shared.nullbulk);
782     else
783         addReplyDoubleDistance(c,
784             geohashGetDistance(xyxy[0],xyxy[1],xyxy[2],xyxy[3]) / to_meter);
785 }
786