1 /* -*- Mode: C; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2 /*
3 * Copyright (c) 2022, Cache Forge LLC, All rights reserved.
4 * Alan Kasindorf <[email protected]>
5 * Copyright (c) 2007, Last.fm, All rights reserved.
6 * Richard Jones <[email protected]>
7 * Christian Muehlhaeuser <[email protected]>
8 *
9 * Redistribution and use in source and binary forms, with or without
10 * modification, are permitted provided that the following conditions are met:
11 *     * Redistributions of source code must retain the above copyright
12 *       notice, this list of conditions and the following disclaimer.
13 *     * Redistributions in binary form must reproduce the above copyright
14 *       notice, this list of conditions and the following disclaimer in the
15 *       documentation and/or other materials provided with the distribution.
16 *     * Neither the name of the Last.fm Limited nor the
17 *       names of its contributors may be used to endorse or promote products
18 *       derived from this software without specific prior written permission.
19 *
20 * THIS SOFTWARE IS PROVIDED BY Last.fm ``AS IS'' AND ANY
21 * EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
22 * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
23 * DISCLAIMED. IN NO EVENT SHALL Last.fm BE LIABLE FOR ANY
24 * DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
25 * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
26 * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
27 * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
28 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
29 * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
30 */
31 
#include "proxy.h"
#include "md5.h"

#include <limits.h>
34 
// Number of continuum buckets created per server when the caller does not
// pass an "obuckets" option to ketama_new().
#define DEFAULT_BUCKET_SIZE 160
36 
// One entry of the hash ring: a position on the continuum and the server
// that owns it. ketama_get_server() returns id-1, so ids are one-based.
typedef struct {
    unsigned int point; // continuum point.
    unsigned int id; // server id.
} cpoint;
41 
// Ring state. Allocated by ketama_new() as a single Lua userdata with the
// continuum stored inline after the header.
typedef struct {
    struct proxy_hash_caller phc; // passed back to the proxy API.
    unsigned int total_buckets; // number of continuum entries searched by ketama_get_server().
    cpoint continuum[]; // points to server ids. sorted by point in ketama_new().
} ketama_t;
47 
// Key hasher published to Lua as the "hash" light userdata by
// mcplib_open_dist_ring_hash(). Declared ahead of its definition so the
// struct below can reference it.
static uint64_t ketama_key_hasher(const void *key, size_t len, uint64_t seed);
static struct proxy_hash_func ketama_key_hash = {
    ketama_key_hasher,
};
52 
53 /* FROM ketama.c */
ketama_md5_digest(char * inString,unsigned char md5pword[16])54 static void ketama_md5_digest( char* inString, unsigned char md5pword[16] )
55 {
56     md5_state_t md5state;
57 
58     md5_init( &md5state );
59     md5_append( &md5state, (unsigned char *)inString, strlen( inString ) );
60     md5_finish( &md5state, md5pword );
61 }
62 
ketama_compare(const void * p1,const void * p2)63 static int ketama_compare(const void *p1, const void *p2) {
64     const cpoint *a = p1;
65     const cpoint *b = p2;
66 
67     return (a->point < b->point) ? -1 : ((a->point > b->point) ? 1 : 0);
68 }
69 
ketama_key_hasher(const void * key,size_t len,uint64_t seed)70 static uint64_t ketama_key_hasher(const void *key, size_t len, uint64_t seed) {
71     // NOTE: seed is ignored!
72     // embedding the md5 bits since key is specified with a length here.
73     md5_state_t md5state;
74     unsigned char digest[16];
75 
76     md5_init(&md5state);
77     md5_append(&md5state, (unsigned char *)key, len);
78     md5_finish(&md5state, digest);
79 
80     // mix the hash down (from ketama_hashi)
81     unsigned int h = (unsigned int)(( digest[3] << 24 )
82                         | ( digest[2] << 16 )
83                         | ( digest[1] <<  8 )
84                         |   digest[0] );
85     return h;
86 }
87 
88 // Note: must return lookupas as zero-indexed.
ketama_get_server(uint64_t hash,void * ctx)89 static uint32_t ketama_get_server(uint64_t hash, void *ctx) {
90     ketama_t *kt = (ketama_t *)ctx;
91     unsigned int h = hash;
92     int highp = kt->total_buckets;
93     int lowp = 0, midp;
94     unsigned int midval, midval1;
95 
96     // divide and conquer array search to find server with next biggest
97     // point after what this key hashes to
98     while ( 1 )
99     {
100         midp = (int)( ( lowp+highp ) / 2 );
101 
102         if ( midp == kt->total_buckets )
103             return kt->continuum[0].id-1; // if at the end, roll back to zeroth
104 
105         midval = kt->continuum[midp].point;
106         midval1 = midp == 0 ? 0 : kt->continuum[midp-1].point;
107 
108         if ( h <= midval && h > midval1 )
109             return kt->continuum[midp].id-1;
110 
111         if ( midval < h )
112             lowp = midp + 1;
113         else
114             highp = midp - 1;
115 
116         if ( lowp > highp )
117             return kt->continuum[0].id-1;
118     }
119 }
120 /* END FROM ketama.c */
121 
// Ring construction modes. ketama_new() maps the "omode" option strings
// "default", "ketama", "twemproxy" and "evcache" onto these values and
// dispatches to the matching _add_server_*() helper.
// not much to be done about this without making the interface unusable.
#define MODE_DEFAULT 0 // uses xxhash
#define MODE_KETAMA 1 // uses md5
#define MODE_TWEMPROXY 2 // technically "libmemcached" ?
#define MODE_EVCACHE 3 // not sure why this has a funny string init.
127 
128 // Not sure the hash algo used here matters all that much given the low number
129 // of points... but it might be better to let it be overrideable.
_add_server_default(ketama_t * kt,size_t hashstring_size,const char ** parts,lua_Integer bucket_size,lua_Integer id,unsigned int * cont)130 static void _add_server_default(ketama_t *kt, size_t hashstring_size, const char **parts,
131         lua_Integer bucket_size, lua_Integer id, unsigned int *cont) {
132     char *hashstring = malloc(hashstring_size);
133 
134     for (int k = 0; k < bucket_size; k++) {
135         size_t len = snprintf(hashstring, hashstring_size, "%s:%s-%d", parts[0], parts[1], k);
136         kt->continuum[*cont].point = (unsigned int) XXH3_64bits(hashstring, len);
137         kt->continuum[*cont].id = id;
138     }
139 
140     free(hashstring);
141 }
142 
/*
 * Ketama mode: for each k in [0, bucket_size / 4) hashes "addr:port-k" with
 * md5 and turns the 16 digest bytes into four continuum points.
 *
 * kt:              ring being filled.
 * hashstring_size: size of the scratch buffer to allocate for the string.
 * parts:           parts[0] is the address, parts[1] the port.
 * bucket_size:     requested points; 4 * (bucket_size / 4) are written.
 * id:              server id stored with every point.
 * cont:            in/out index of the next free continuum slot.
 */
static void _add_server_ketama(ketama_t *kt, size_t hashstring_size, const char **parts,
        lua_Integer bucket_size, lua_Integer id, unsigned int *cont) {
    // NOTE(review): allocation failure is not handled here.
    char *hashstring = malloc(hashstring_size);

    for (int k = 0; k < bucket_size / 4; k++) {
        unsigned char digest[16];

        // - create hashing string for ketama
        snprintf(hashstring, hashstring_size, "%s:%s-%d", parts[0], parts[1], k);

        // - md5() hash it
        // mostly from ketama.c
        ketama_md5_digest(hashstring, digest);

        /* Use successive 4-bytes from hash as numbers
         * for the points on the circle: */
        for (int h = 0; h < 4; h++) {
            const unsigned char *word = &digest[h * 4];

            // Widen to unsigned before shifting: a promoted (signed) int
            // overflows when a byte >= 0x80 is shifted left by 24, which is
            // undefined behaviour.
            kt->continuum[*cont].point = ( (unsigned int)word[3] << 24 )
                                       | ( (unsigned int)word[2] << 16 )
                                       | ( (unsigned int)word[1] <<  8 )
                                       |   (unsigned int)word[0];
            kt->continuum[*cont].id = id;
            (*cont)++;
        }
    }

    free(hashstring);
}
173 
/*
 * Twemproxy mode: like ketama mode, except that the port is left out of the
 * hash string ("addr-k") when it is exactly "11211".
 *
 * kt:              ring being filled.
 * hashstring_size: size of the scratch buffer to allocate for the string.
 * parts:           parts[0] is the address, parts[1] the port.
 * bucket_size:     requested points; 4 * (bucket_size / 4) are written.
 * id:              server id stored with every point.
 * cont:            in/out index of the next free continuum slot.
 */
static void _add_server_twemproxy(ketama_t *kt, size_t hashstring_size, const char **parts,
        lua_Integer bucket_size, lua_Integer id, unsigned int *cont) {
    // NOTE(review): allocation failure is not handled here.
    char *hashstring = malloc(hashstring_size);

    for (int k = 0; k < bucket_size / 4; k++) {
        unsigned char digest[16];

        // - create hashing string for ketama
        if (strcmp(parts[1], "11211") == 0) {
            // twemproxy sources libmemcached as removing the default port
            // from the string if found.
            snprintf(hashstring, hashstring_size, "%s-%d", parts[0], k);
        } else {
            snprintf(hashstring, hashstring_size, "%s:%s-%d", parts[0], parts[1], k);
        }

        // - md5() hash it
        // mostly from ketama.c
        ketama_md5_digest(hashstring, digest);

        /* Use successive 4-bytes from hash as numbers
         * for the points on the circle: */
        for (int h = 0; h < 4; h++) {
            const unsigned char *word = &digest[h * 4];

            // Widen to unsigned before shifting: a promoted (signed) int
            // overflows when a byte >= 0x80 is shifted left by 24, which is
            // undefined behaviour.
            kt->continuum[*cont].point = ( (unsigned int)word[3] << 24 )
                                       | ( (unsigned int)word[2] << 16 )
                                       | ( (unsigned int)word[1] <<  8 )
                                       |   (unsigned int)word[0];
            kt->continuum[*cont].id = id;
            (*cont)++;
        }
    }

    free(hashstring);
}
210 
/*
 * EVCache mode: like ketama mode, but the hash string repeats the address:
 * "addr/addr:port-k".
 *
 * kt:              ring being filled.
 * hashstring_size: size of the scratch buffer to allocate for the string;
 *                  the caller accounts for the address appearing twice.
 * parts:           parts[0] is the address, parts[1] the port.
 * bucket_size:     requested points; 4 * (bucket_size / 4) are written.
 * id:              server id stored with every point.
 * cont:            in/out index of the next free continuum slot.
 */
static void _add_server_evcache(ketama_t *kt, size_t hashstring_size, const char **parts,
        lua_Integer bucket_size, lua_Integer id, unsigned int *cont) {
    // NOTE(review): allocation failure is not handled here.
    char *hashstring = malloc(hashstring_size);

    for (int k = 0; k < bucket_size / 4; k++) {
        unsigned char digest[16];

        // - create hashing string for ketama
        snprintf(hashstring, hashstring_size, "%s/%s:%s-%d", parts[0], parts[0], parts[1], k);

        // - md5() hash it
        // mostly from ketama.c
        ketama_md5_digest(hashstring, digest);

        /* Use successive 4-bytes from hash as numbers
         * for the points on the circle: */
        for (int h = 0; h < 4; h++) {
            const unsigned char *word = &digest[h * 4];

            // Widen to unsigned before shifting: a promoted (signed) int
            // overflows when a byte >= 0x80 is shifted left by 24, which is
            // undefined behaviour.
            kt->continuum[*cont].point = ( (unsigned int)word[3] << 24 )
                                       | ( (unsigned int)word[2] << 16 )
                                       | ( (unsigned int)word[1] <<  8 )
                                       |   (unsigned int)word[0];
            kt->continuum[*cont].id = id;
            (*cont)++;
        }
    }

    free(hashstring);
}
240 
241 #define PARTS 2
242 // stack = [pool, option]
ketama_new(lua_State * L)243 static int ketama_new(lua_State *L) {
244     lua_Integer bucket_size = DEFAULT_BUCKET_SIZE;
245     const char *parts[PARTS];
246     size_t partlens[PARTS];
247     int makemode = 0;
248 
249     // check for UA_TTABLE at 1
250     luaL_checktype(L, 1, LUA_TTABLE);
251     // get number of servers in pool.
252     // NOTE: rawlen skips metatable redirection. if we care; lua_len instead.
253     lua_Unsigned total = lua_rawlen(L, 1);
254 
255     // check for optional input
256     int argc = lua_gettop(L);
257     if (argc > 1) {
258         luaL_checktype(L, 2, LUA_TTABLE);
259         if (lua_getfield(L, 2, "omode") != LUA_TNIL) {
260             luaL_checktype(L, -1, LUA_TSTRING);
261             const char *mode = lua_tostring(L, -1);
262             if (strcmp(mode, "default") == 0) {
263                 makemode = MODE_DEFAULT;
264             } else if (strcmp(mode, "ketama") == 0) {
265                 makemode = MODE_KETAMA;
266             } else if (strcmp(mode, "twemproxy") == 0) {
267                 makemode = MODE_TWEMPROXY;
268             } else if (strcmp(mode, "evcache") == 0) {
269                 makemode = MODE_EVCACHE;
270             } else {
271                 lua_pushstring(L, "ring_hash: bad omode argument");
272                 lua_error(L);
273             }
274         }
275         lua_pop(L, 1); // pops the nil or mode
276 
277         if (lua_getfield(L, 2, "obuckets") != LUA_TNIL) {
278           int success = 0;
279           bucket_size = lua_tointegerx(L, -1, &success);
280           if (!success || bucket_size < 1) {
281               lua_pushstring(L, "ring_hash: option argument must be a positive integer");
282               lua_error(L);
283           }
284         }
285         lua_pop(L, 1);
286     }
287 
288     // newuserdatauv() sized for pool*
289     size_t size = sizeof(ketama_t) + sizeof(cpoint) * (total * bucket_size);
290     ketama_t *kt = lua_newuserdatauv(L, size, 0);
291     // TODO: check *kt.
292     kt->total_buckets = bucket_size * total;
293 
294     // loop over pool
295     unsigned int cont = 0;
296     lua_pushnil(L); // start the pool iterator
297     while (lua_next(L, 1) != 0) {
298         // key is -2, value is -1.
299         // value is another table. need to query it to get what we need for
300         // the hash.
301         // hash string is: hostname/ipaddr:port-repitition
302         // TODO: bother doing error checking?
303         lua_getfield(L, -1, "id");
304         lua_Integer id = lua_tointeger(L, -1);
305         lua_pop(L, 1);
306 
307         // FIXME: we need to do the lua_pop after string assembly to be safe.
308         lua_getfield(L, -1, "addr");
309         parts[0] = lua_tolstring(L, -1, &partlens[0]);
310         lua_pop(L, 1);
311         lua_getfield(L, -1, "port");
312         parts[1] = lua_tolstring(L, -1, &partlens[1]);
313         lua_pop(L, 1);
314 
315         size_t hashstring_size = 0;
316         for (int x = 0; x < PARTS; x++) {
317             hashstring_size += partlens[x];
318         }
319 
320         // We have up to 3 delimiters in the final hashstring and an index
321         // 16 bytes is plenty to accomodate this requirement.
322         hashstring_size += 16;
323 
324         switch (makemode) {
325             case MODE_DEFAULT:
326                 _add_server_default(kt, hashstring_size, parts, bucket_size, id, &cont);
327                 break;
328             case MODE_KETAMA:
329                 _add_server_ketama(kt, hashstring_size, parts, bucket_size, id, &cont);
330                 break;
331             case MODE_TWEMPROXY:
332                 _add_server_twemproxy(kt, hashstring_size, parts, bucket_size, id, &cont);
333                 break;
334             case MODE_EVCACHE:
335                 // EVCache uses the ipaddress couple of times, we need to factor that in
336                 // when calculating the hashstring_size
337                 hashstring_size += partlens[0];
338                 _add_server_evcache(kt, hashstring_size, parts, bucket_size, id, &cont);
339                 break;
340         }
341 
342         lua_pop(L, 1); // remove value, leave key for next iteration.
343     }
344 
345     // - qsort the points
346     qsort( &kt->continuum, cont, sizeof(cpoint), ketama_compare);
347 
348     // set the hash/fetch function and the context ptr.
349     kt->phc.ctx = kt;
350     kt->phc.selector_func = ketama_get_server;
351 
352     // - add a pushlightuserdata for the sub-struct with func/ctx.
353     lua_pushlightuserdata(L, &kt->phc);
354     // - return [UD, lightuserdata]
355     return 2;
356 }
357 
/*
 * Creates the ring_hash library table and leaves it on top of the Lua stack.
 * The table holds:
 *   "new"  - ketama_new, the ring constructor.
 *   "hash" - light userdata pointing at ketama_key_hash.
 * Returns 1 (the library table).
 */
int mcplib_open_dist_ring_hash(lua_State *L) {
    // must stay a real array: luaL_newlib() sizes the table from it.
    static const luaL_Reg ring_hash_funcs[] = {
        {"new", ketama_new},
        {NULL, NULL},
    };

    luaL_newlib(L, ring_hash_funcs);

    // attach the key hasher next to the constructor.
    lua_pushlightuserdata(L, &ketama_key_hash);
    lua_setfield(L, -2, "hash");

    return 1;
}
370