1 /* -*- Mode: C; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2 /*
3 * Copyright (c) 2022, Cache Forge LLC, All rights reserved.
4 * Alan Kasindorf <[email protected]>
5 * Copyright (c) 2007, Last.fm, All rights reserved.
6 * Richard Jones <[email protected]>
7 * Christian Muehlhaeuser <[email protected]>
8 *
9 * Redistribution and use in source and binary forms, with or without
10 * modification, are permitted provided that the following conditions are met:
11 * * Redistributions of source code must retain the above copyright
12 * notice, this list of conditions and the following disclaimer.
13 * * Redistributions in binary form must reproduce the above copyright
14 * notice, this list of conditions and the following disclaimer in the
15 * documentation and/or other materials provided with the distribution.
16 * * Neither the name of the Last.fm Limited nor the
17 * names of its contributors may be used to endorse or promote products
18 * derived from this software without specific prior written permission.
19 *
20 * THIS SOFTWARE IS PROVIDED BY Last.fm ``AS IS'' AND ANY
21 * EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
22 * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
23 * DISCLAIMED. IN NO EVENT SHALL Last.fm BE LIABLE FOR ANY
24 * DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
25 * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
26 * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
27 * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
28 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
29 * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
30 */
31
32 #include "proxy.h"
33 #include "md5.h"
34
// Per-server bucket count used by ketama_new when the "obuckets" option is
// not supplied. The default mode creates one continuum point per bucket; the
// md5-based modes create four points for every four buckets.
#define DEFAULT_BUCKET_SIZE 160
36
// One entry of the hash continuum: a position on the ring and the server
// that owns it. Entries are ordered by `point` (see ketama_compare).
typedef struct {
    unsigned int point; // continuum point.
    unsigned int id; // server id; ketama_get_server returns id - 1.
} cpoint;
41
// Ring state stored in the Lua userdata created by ketama_new. The continuum
// is a flexible array member, so the userdata is sized to hold the header
// plus every point.
typedef struct {
    struct proxy_hash_caller phc; // passed back to the proxy API.
    unsigned int total_buckets; // number of continuum entries searched by ketama_get_server.
    cpoint continuum[]; // points to server ids.
} ketama_t;
47
// Key hashing entry point for this distribution. mcplib_open_dist_ring_hash
// stores the address of ketama_key_hash as a light userdata in the library's
// "hash" field.
static uint64_t ketama_key_hasher(const void *key, size_t len, uint64_t seed);
static struct proxy_hash_func ketama_key_hash = {
    ketama_key_hasher,
};
52
53 /* FROM ketama.c */
ketama_md5_digest(char * inString,unsigned char md5pword[16])54 static void ketama_md5_digest( char* inString, unsigned char md5pword[16] )
55 {
56 md5_state_t md5state;
57
58 md5_init( &md5state );
59 md5_append( &md5state, (unsigned char *)inString, strlen( inString ) );
60 md5_finish( &md5state, md5pword );
61 }
62
ketama_compare(const void * p1,const void * p2)63 static int ketama_compare(const void *p1, const void *p2) {
64 const cpoint *a = p1;
65 const cpoint *b = p2;
66
67 return (a->point < b->point) ? -1 : ((a->point > b->point) ? 1 : 0);
68 }
69
ketama_key_hasher(const void * key,size_t len,uint64_t seed)70 static uint64_t ketama_key_hasher(const void *key, size_t len, uint64_t seed) {
71 // NOTE: seed is ignored!
72 // embedding the md5 bits since key is specified with a length here.
73 md5_state_t md5state;
74 unsigned char digest[16];
75
76 md5_init(&md5state);
77 md5_append(&md5state, (unsigned char *)key, len);
78 md5_finish(&md5state, digest);
79
80 // mix the hash down (from ketama_hashi)
81 unsigned int h = (unsigned int)(( digest[3] << 24 )
82 | ( digest[2] << 16 )
83 | ( digest[1] << 8 )
84 | digest[0] );
85 return h;
86 }
87
88 // Note: must return lookupas as zero-indexed.
ketama_get_server(uint64_t hash,void * ctx)89 static uint32_t ketama_get_server(uint64_t hash, void *ctx) {
90 ketama_t *kt = (ketama_t *)ctx;
91 unsigned int h = hash;
92 int highp = kt->total_buckets;
93 int lowp = 0, midp;
94 unsigned int midval, midval1;
95
96 // divide and conquer array search to find server with next biggest
97 // point after what this key hashes to
98 while ( 1 )
99 {
100 midp = (int)( ( lowp+highp ) / 2 );
101
102 if ( midp == kt->total_buckets )
103 return kt->continuum[0].id-1; // if at the end, roll back to zeroth
104
105 midval = kt->continuum[midp].point;
106 midval1 = midp == 0 ? 0 : kt->continuum[midp-1].point;
107
108 if ( h <= midval && h > midval1 )
109 return kt->continuum[midp].id-1;
110
111 if ( midval < h )
112 lowp = midp + 1;
113 else
114 highp = midp - 1;
115
116 if ( lowp > highp )
117 return kt->continuum[0].id-1;
118 }
119 }
120 /* END FROM ketama.c */
121
// Continuum construction modes, chosen in ketama_new through the "omode"
// option ("default", "ketama", "twemproxy", "evcache"). Each mode is handled
// by its own _add_server_* function.
// not much to be done about this without making the interface unusable.
#define MODE_DEFAULT 0 // uses xxhash
#define MODE_KETAMA 1 // uses md5
#define MODE_TWEMPROXY 2 // technically "libmemcached" ? md5; drops the port from the hash string when it is "11211".
#define MODE_EVCACHE 3 // md5 over "addr/addr:port-N"; not sure why this has a funny string init.
127
128 // Not sure the hash algo used here matters all that much given the low number
129 // of points... but it might be better to let it be overrideable.
_add_server_default(ketama_t * kt,size_t hashstring_size,const char ** parts,lua_Integer bucket_size,lua_Integer id,unsigned int * cont)130 static void _add_server_default(ketama_t *kt, size_t hashstring_size, const char **parts,
131 lua_Integer bucket_size, lua_Integer id, unsigned int *cont) {
132 char *hashstring = malloc(hashstring_size);
133
134 for (int k = 0; k < bucket_size; k++) {
135 size_t len = snprintf(hashstring, hashstring_size, "%s:%s-%d", parts[0], parts[1], k);
136 kt->continuum[*cont].point = (unsigned int) XXH3_64bits(hashstring, len);
137 kt->continuum[*cont].id = id;
138 }
139
140 free(hashstring);
141 }
142
// Adds one server to the continuum in MODE_KETAMA. For each k in
// [0, bucket_size / 4) the string "addr:port-k" is MD5-hashed and the 16
// digest bytes yield four points, so a bucket_size that is not a multiple of
// four produces fewer than bucket_size points.
//   kt:              ring being populated.
//   hashstring_size: size of the scratch buffer for the hash string.
//   parts:           parts[0] is the address, parts[1] the port.
//   bucket_size:     requested number of points for this server.
//   id:              server id stored with every point.
//   cont:            in/out index of the next free continuum slot.
static void _add_server_ketama(ketama_t *kt, size_t hashstring_size, const char **parts,
        lua_Integer bucket_size, lua_Integer id, unsigned int *cont) {
    char *hashstring = malloc(hashstring_size);

    for (int k = 0; k < bucket_size / 4; k++) {
        unsigned char digest[16];

        // - create hashing string for ketama
        snprintf(hashstring, hashstring_size, "%s:%s-%d", parts[0], parts[1], k);

        // - md5() hash it
        // mostly from ketama.c
        ketama_md5_digest(hashstring, digest);

        /* Use successive 4-bytes from hash as numbers
         * for the points on the circle: */
        for (int h = 0; h < 4; h++) {
            const unsigned char *word = &digest[h * 4];

            // Widen to unsigned before shifting: a promoted int shifted left
            // by 24 overflows (undefined behaviour) when the byte is >= 0x80.
            kt->continuum[*cont].point = ((unsigned int)word[3] << 24)
                                       | ((unsigned int)word[2] << 16)
                                       | ((unsigned int)word[1] << 8)
                                       |  (unsigned int)word[0];
            kt->continuum[*cont].id = id;
            (*cont)++;
        }
    }

    free(hashstring);
}
173
// Adds one server to the continuum in MODE_TWEMPROXY. Same scheme as
// MODE_KETAMA (MD5, four points per digest, bucket_size / 4 digests) except
// that the hash string is "addr-k" when the port is "11211" and
// "addr:port-k" otherwise.
//   kt:              ring being populated.
//   hashstring_size: size of the scratch buffer for the hash string.
//   parts:           parts[0] is the address, parts[1] the port.
//   bucket_size:     requested number of points for this server.
//   id:              server id stored with every point.
//   cont:            in/out index of the next free continuum slot.
static void _add_server_twemproxy(ketama_t *kt, size_t hashstring_size, const char **parts,
        lua_Integer bucket_size, lua_Integer id, unsigned int *cont) {
    char *hashstring = malloc(hashstring_size);

    for (int k = 0; k < bucket_size / 4; k++) {
        unsigned char digest[16];

        // - create hashing string for ketama
        if (strcmp(parts[1], "11211") == 0) {
            // twemproxy sources libmemcached as removing the default port
            // from the string if found.
            snprintf(hashstring, hashstring_size, "%s-%d", parts[0], k);
        } else {
            snprintf(hashstring, hashstring_size, "%s:%s-%d", parts[0], parts[1], k);
        }

        // - md5() hash it
        // mostly from ketama.c
        ketama_md5_digest(hashstring, digest);

        /* Use successive 4-bytes from hash as numbers
         * for the points on the circle: */
        for (int h = 0; h < 4; h++) {
            const unsigned char *word = &digest[h * 4];

            // Widen to unsigned before shifting: a promoted int shifted left
            // by 24 overflows (undefined behaviour) when the byte is >= 0x80.
            kt->continuum[*cont].point = ((unsigned int)word[3] << 24)
                                       | ((unsigned int)word[2] << 16)
                                       | ((unsigned int)word[1] << 8)
                                       |  (unsigned int)word[0];
            kt->continuum[*cont].id = id;
            (*cont)++;
        }
    }

    free(hashstring);
}
210
// Adds one server to the continuum in MODE_EVCACHE. Same scheme as
// MODE_KETAMA (MD5, four points per digest, bucket_size / 4 digests) but the
// hash string repeats the address: "addr/addr:port-k".
//   kt:              ring being populated.
//   hashstring_size: size of the scratch buffer for the hash string; the
//                    caller must account for the address appearing twice.
//   parts:           parts[0] is the address, parts[1] the port.
//   bucket_size:     requested number of points for this server.
//   id:              server id stored with every point.
//   cont:            in/out index of the next free continuum slot.
static void _add_server_evcache(ketama_t *kt, size_t hashstring_size, const char **parts,
        lua_Integer bucket_size, lua_Integer id, unsigned int *cont) {
    char *hashstring = malloc(hashstring_size);

    for (int k = 0; k < bucket_size / 4; k++) {
        unsigned char digest[16];

        // - create hashing string for ketama
        snprintf(hashstring, hashstring_size, "%s/%s:%s-%d", parts[0], parts[0], parts[1], k);
        // - md5() hash it
        // mostly from ketama.c
        ketama_md5_digest(hashstring, digest);

        /* Use successive 4-bytes from hash as numbers
         * for the points on the circle: */
        for (int h = 0; h < 4; h++) {
            const unsigned char *word = &digest[h * 4];

            // Widen to unsigned before shifting: a promoted int shifted left
            // by 24 overflows (undefined behaviour) when the byte is >= 0x80.
            kt->continuum[*cont].point = ((unsigned int)word[3] << 24)
                                       | ((unsigned int)word[2] << 16)
                                       | ((unsigned int)word[1] << 8)
                                       |  (unsigned int)word[0];
            kt->continuum[*cont].id = id;
            (*cont)++;
        }
    }

    free(hashstring);
}
240
241 #define PARTS 2
242 // stack = [pool, option]
ketama_new(lua_State * L)243 static int ketama_new(lua_State *L) {
244 lua_Integer bucket_size = DEFAULT_BUCKET_SIZE;
245 const char *parts[PARTS];
246 size_t partlens[PARTS];
247 int makemode = 0;
248
249 // check for UA_TTABLE at 1
250 luaL_checktype(L, 1, LUA_TTABLE);
251 // get number of servers in pool.
252 // NOTE: rawlen skips metatable redirection. if we care; lua_len instead.
253 lua_Unsigned total = lua_rawlen(L, 1);
254
255 // check for optional input
256 int argc = lua_gettop(L);
257 if (argc > 1) {
258 luaL_checktype(L, 2, LUA_TTABLE);
259 if (lua_getfield(L, 2, "omode") != LUA_TNIL) {
260 luaL_checktype(L, -1, LUA_TSTRING);
261 const char *mode = lua_tostring(L, -1);
262 if (strcmp(mode, "default") == 0) {
263 makemode = MODE_DEFAULT;
264 } else if (strcmp(mode, "ketama") == 0) {
265 makemode = MODE_KETAMA;
266 } else if (strcmp(mode, "twemproxy") == 0) {
267 makemode = MODE_TWEMPROXY;
268 } else if (strcmp(mode, "evcache") == 0) {
269 makemode = MODE_EVCACHE;
270 } else {
271 lua_pushstring(L, "ring_hash: bad omode argument");
272 lua_error(L);
273 }
274 }
275 lua_pop(L, 1); // pops the nil or mode
276
277 if (lua_getfield(L, 2, "obuckets") != LUA_TNIL) {
278 int success = 0;
279 bucket_size = lua_tointegerx(L, -1, &success);
280 if (!success || bucket_size < 1) {
281 lua_pushstring(L, "ring_hash: option argument must be a positive integer");
282 lua_error(L);
283 }
284 }
285 lua_pop(L, 1);
286 }
287
288 // newuserdatauv() sized for pool*
289 size_t size = sizeof(ketama_t) + sizeof(cpoint) * (total * bucket_size);
290 ketama_t *kt = lua_newuserdatauv(L, size, 0);
291 // TODO: check *kt.
292 kt->total_buckets = bucket_size * total;
293
294 // loop over pool
295 unsigned int cont = 0;
296 lua_pushnil(L); // start the pool iterator
297 while (lua_next(L, 1) != 0) {
298 // key is -2, value is -1.
299 // value is another table. need to query it to get what we need for
300 // the hash.
301 // hash string is: hostname/ipaddr:port-repitition
302 // TODO: bother doing error checking?
303 lua_getfield(L, -1, "id");
304 lua_Integer id = lua_tointeger(L, -1);
305 lua_pop(L, 1);
306
307 // FIXME: we need to do the lua_pop after string assembly to be safe.
308 lua_getfield(L, -1, "addr");
309 parts[0] = lua_tolstring(L, -1, &partlens[0]);
310 lua_pop(L, 1);
311 lua_getfield(L, -1, "port");
312 parts[1] = lua_tolstring(L, -1, &partlens[1]);
313 lua_pop(L, 1);
314
315 size_t hashstring_size = 0;
316 for (int x = 0; x < PARTS; x++) {
317 hashstring_size += partlens[x];
318 }
319
320 // We have up to 3 delimiters in the final hashstring and an index
321 // 16 bytes is plenty to accomodate this requirement.
322 hashstring_size += 16;
323
324 switch (makemode) {
325 case MODE_DEFAULT:
326 _add_server_default(kt, hashstring_size, parts, bucket_size, id, &cont);
327 break;
328 case MODE_KETAMA:
329 _add_server_ketama(kt, hashstring_size, parts, bucket_size, id, &cont);
330 break;
331 case MODE_TWEMPROXY:
332 _add_server_twemproxy(kt, hashstring_size, parts, bucket_size, id, &cont);
333 break;
334 case MODE_EVCACHE:
335 // EVCache uses the ipaddress couple of times, we need to factor that in
336 // when calculating the hashstring_size
337 hashstring_size += partlens[0];
338 _add_server_evcache(kt, hashstring_size, parts, bucket_size, id, &cont);
339 break;
340 }
341
342 lua_pop(L, 1); // remove value, leave key for next iteration.
343 }
344
345 // - qsort the points
346 qsort( &kt->continuum, cont, sizeof(cpoint), ketama_compare);
347
348 // set the hash/fetch function and the context ptr.
349 kt->phc.ctx = kt;
350 kt->phc.selector_func = ketama_get_server;
351
352 // - add a pushlightuserdata for the sub-struct with func/ctx.
353 lua_pushlightuserdata(L, &kt->phc);
354 // - return [UD, lightuserdata]
355 return 2;
356 }
357
// Builds the ring_hash library table and leaves it on the Lua stack.
// The table exposes "new" (ketama_new) and "hash", a light userdata pointing
// at ketama_key_hash. Returns 1, the number of Lua results.
int mcplib_open_dist_ring_hash(lua_State *L) {
    // functions registered on the library table; NULL pair terminates it.
    const luaL_Reg ring_hash_lib[] = {
        {"new", ketama_new},
        {NULL, NULL},
    };

    luaL_newlib(L, ring_hash_lib);

    // attach the key hasher to the table that is now on top of the stack.
    lua_pushlightuserdata(L, &ketama_key_hash);
    lua_setfield(L, -2, "hash");

    return 1;
}
370