1 /* -*- Mode: C; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2
3 #include "proxy.h"
4 #include "proxy_tls.h"
5 #include "storage.h" // for stats call
6
7 // func prototype example:
8 // static int fname (lua_State *L)
9 // normal library open:
10 // int luaopen_mcp(lua_State *L) { }
11
12 struct _mcplib_statctx_s {
13 lua_State *L;
14 };
15
_mcplib_append_stats(const char * key,const uint16_t klen,const char * val,const uint32_t vlen,const void * cookie)16 static void _mcplib_append_stats(const char *key, const uint16_t klen,
17 const char *val, const uint32_t vlen,
18 const void *cookie) {
19 // k + v == 0 means END, but we don't use END for this lua API.
20 if (klen == 0) {
21 return;
22 }
23
24 // cookie -> struct
25 const struct _mcplib_statctx_s *c = cookie;
26 lua_State *L = c->L;
27 // table should always be on the top.
28 lua_pushlstring(L, key, klen);
29 lua_pushlstring(L, val, vlen);
30 lua_rawset(L, -3);
31 }
32
_mcplib_append_section_stats(const char * key,const uint16_t klen,const char * val,const uint32_t vlen,const void * cookie)33 static void _mcplib_append_section_stats(const char *key, const uint16_t klen,
34 const char *val, const uint32_t vlen,
35 const void *cookie) {
36 char stat[STAT_KEY_LEN];
37 long section = 0;
38 if (klen == 0) {
39 return;
40 }
41
42 const struct _mcplib_statctx_s *c = cookie;
43 lua_State *L = c->L;
44 // table must be at the top when this function is called.
45 int tidx = lua_absindex(L, -1);
46
47 // NOTE: sscanf is not great, especially with numerics due to UD for out
48 // of range data. It is safe to use here because we're generating the
49 // strings, and we don't use this function on anything that has user
50 // defined data (ie; stats proxy). Otherwise sscanf saves a lot of code so
51 // we use it here.
52 if (sscanf(key, "items:%ld:%s", §ion, stat) == 2
53 || sscanf(key, "%ld:%s", §ion, stat) == 2) {
54 // stats [items, slabs, conns]
55 if (lua_rawgeti(L, tidx, section) == LUA_TNIL) {
56 lua_pop(L, 1); // drop the nil
57 // no sub-section table yet, create one.
58 lua_newtable(L);
59 lua_pushvalue(L, -1); // copy the table
60 lua_rawseti(L, tidx, section); // remember the table
61 // now top of stack is the table.
62 }
63
64 lua_pushstring(L, stat);
65 lua_pushlstring(L, val, vlen);
66 lua_rawset(L, -3); // put key/val into sub-table
67 lua_pop(L, 1); // pop sub-table.
68 } else {
69 // normal stat counter.
70 lua_pushlstring(L, key, klen);
71 lua_pushlstring(L, val, vlen);
72 lua_rawset(L, tidx);
73 }
74 }
75
76 // reimplementation of proto_text.c:process_stat()
mcplib_server_stats(lua_State * L)77 static int mcplib_server_stats(lua_State *L) {
78 int argc = lua_gettop(L);
79 proxy_ctx_t *ctx = PROXY_GET_CTX(L);
80 lua_newtable(L); // the table to return.
81 struct _mcplib_statctx_s c = {
82 L,
83 };
84
85 if (argc == 0 || lua_isnil(L, 1)) {
86 server_stats(&_mcplib_append_stats, &c);
87 get_stats(NULL, 0, &_mcplib_append_stats, &c);
88 } else {
89 const char *cmd = luaL_checkstring(L, 1);
90 if (strcmp(cmd, "settings") == 0) {
91 process_stat_settings(&_mcplib_append_stats, &c);
92 } else if (strcmp(cmd, "conns") == 0) {
93 process_stats_conns(&_mcplib_append_section_stats, &c);
94 #ifdef EXTSTORE
95 } else if (strcmp(cmd, "extstore") == 0) {
96 process_extstore_stats(&_mcplib_append_stats, &c);
97 #endif
98 } else if (strcmp(cmd, "proxy") == 0) {
99 process_proxy_stats(ctx, &_mcplib_append_stats, &c);
100 } else if (strcmp(cmd, "proxyfuncs") == 0) {
101 process_proxy_funcstats(ctx, &_mcplib_append_stats, &c);
102 } else if (strcmp(cmd, "proxybe") == 0) {
103 process_proxy_bestats(ctx, &_mcplib_append_stats, &c);
104 } else {
105 if (get_stats(cmd, strlen(cmd), &_mcplib_append_section_stats, &c)) {
106 // all good.
107 } else {
108 // unknown command.
109 proxy_lua_error(L, "unknown subcommand passed to server_stats");
110 }
111 }
112 }
113
114 // return the table.
115 return 1;
116 }
117
_mcplib_backend_get_waittime(lua_Number secondsf)118 static lua_Integer _mcplib_backend_get_waittime(lua_Number secondsf) {
119 lua_Integer secondsi = (lua_Integer) secondsf;
120 lua_Number subseconds = secondsf - secondsi;
121 if (subseconds >= 0.5) {
122 // Yes, I know this rounding is probably wrong. it's close enough.
123 // Rounding functions have tricky portability and whole-integer
124 // rounding is at least simpler to reason about.
125 secondsi++;
126 }
127 if (secondsi < 1) {
128 secondsi = 1;
129 }
130 return secondsi;
131 }
132
133 // take string, table as arg:
134 // name, { every =, rerun = false, func = f }
135 // repeat defaults to true
mcplib_register_cron(lua_State * L)136 static int mcplib_register_cron(lua_State *L) {
137 proxy_ctx_t *ctx = PROXY_GET_CTX(L);
138 const char *name = luaL_checkstring(L, 1);
139 luaL_checktype(L, 2, LUA_TTABLE);
140
141 // reserve an upvalue for storing the function.
142 mcp_cron_t *ce = lua_newuserdatauv(L, sizeof(mcp_cron_t), 1);
143 memset(ce, 0, sizeof(*ce));
144
145 // default repeat.
146 ce->repeat = true;
147 // sync config generation.
148 ce->gen = ctx->config_generation;
149
150 if (lua_getfield(L, 2, "func") != LUA_TNIL) {
151 luaL_checktype(L, -1, LUA_TFUNCTION);
152 lua_setiuservalue(L, 3, 1); // pop value
153 } else {
154 proxy_lua_error(L, "proxy cron entry missing 'func' field");
155 return 0;
156 }
157
158 if (lua_getfield(L, 2, "rerun") != LUA_TNIL) {
159 int rerun = lua_toboolean(L, -1);
160 if (!rerun) {
161 ce->repeat = false;
162 }
163 }
164 lua_pop(L, 1); // pop val or nil
165
166 // TODO: set a limit on 'every' so we don't have to worry about
167 // underflows. a year? a month?
168 if (lua_getfield(L, 2, "every") != LUA_TNIL) {
169 luaL_checktype(L, -1, LUA_TNUMBER);
170 int every = lua_tointeger(L, -1);
171 if (every < 1) {
172 proxy_lua_error(L, "proxy cron entry 'every' must be > 0");
173 return 0;
174 }
175 ce->every = every;
176 } else {
177 proxy_lua_error(L, "proxy cron entry missing 'every' field");
178 return 0;
179 }
180 lua_pop(L, 1); // pop val or nil
181
182 // schedule the next cron run
183 struct timespec now;
184 clock_gettime(CLOCK_REALTIME, &now);
185 ce->next = now.tv_sec + ce->every;
186 // we may adjust ce->next shortly, so don't update global yet.
187
188 // valid cron entry, now place into cron table.
189 lua_rawgeti(L, LUA_REGISTRYINDEX, ctx->cron_ref);
190
191 // first, check if a cron of this name already exists.
192 // if so and the 'every' field matches, inherit its 'next' field
193 // so we don't perpetually reschedule all crons.
194 if (lua_getfield(L, -1, name) != LUA_TNIL) {
195 mcp_cron_t *oldce = lua_touserdata(L, -1);
196 if (ce->every == oldce->every) {
197 ce->next = oldce->next;
198 }
199 }
200 lua_pop(L, 1); // drop val/nil
201
202 lua_pushvalue(L, 3); // duplicate cron entry
203 lua_setfield(L, -2, name); // pop duplicate cron entry
204 lua_pop(L, 1); // drop cron table
205
206 // update central cron sleep.
207 if (ctx->cron_next > ce->next) {
208 ctx->cron_next = ce->next;
209 }
210
211 return 0;
212 }
213
214 // just set ctx->loading = true
215 // called from config thread, so config_lock must be held, so it's safe to
216 // modify protected ctx contents.
mcplib_schedule_config_reload(lua_State * L)217 static int mcplib_schedule_config_reload(lua_State *L) {
218 proxy_ctx_t *ctx = PROXY_GET_CTX(L);
219 ctx->loading = true;
220 return 0;
221 }
222
mcplib_time_real_millis(lua_State * L)223 static int mcplib_time_real_millis(lua_State *L) {
224 struct timespec now;
225 clock_gettime(CLOCK_REALTIME, &now);
226 lua_Integer t = now.tv_nsec / 1000000 + now.tv_sec * 1000;
227 lua_pushinteger(L, t);
228 return 1;
229 }
230
mcplib_time_mono_millis(lua_State * L)231 static int mcplib_time_mono_millis(lua_State *L) {
232 struct timespec now;
233 clock_gettime(CLOCK_MONOTONIC, &now);
234 lua_Integer t = now.tv_nsec / 1000000 + now.tv_sec * 1000;
235 lua_pushinteger(L, t);
236 return 1;
237 }
238
239 // end util funcs.
240
241 // NOTE: backends are global objects owned by pool objects.
242 // Each pool has a "proxy pool object" distributed to each worker VM.
243 // proxy pool objects are held at the same time as any request exists on a
244 // backend, in the coroutine stack during yield()
245 // To free a backend: All proxies for a pool are collected, then the central
246 // pool is collected, which releases backend references, which allows backend
247 // to be collected.
mcplib_backend_wrap_gc(lua_State * L)248 static int mcplib_backend_wrap_gc(lua_State *L) {
249 mcp_backend_wrap_t *bew = luaL_checkudata(L, -1, "mcp.backendwrap");
250 proxy_ctx_t *ctx = PROXY_GET_CTX(L);
251
252 if (bew->be != NULL) {
253 mcp_backend_t *be = bew->be;
254 // TODO (v3): technically a race where a backend could be created,
255 // queued, but not picked up before being gc'ed again. In practice
256 // this is impossible but at some point we should close the loop here.
257 // Since we're running in the config thread it could just busy poll
258 // until the connection was picked up.
259 assert(be->transferred);
260 // There has to be at least one connection, and the event_thread will
261 // always be the same.
262 proxy_event_thread_t *e = be->be[0].event_thread;
263 pthread_mutex_lock(&e->mutex);
264 STAILQ_INSERT_TAIL(&e->beconn_head_in, be, beconn_next);
265 pthread_mutex_unlock(&e->mutex);
266
267 // Signal to check queue.
268 #ifdef USE_EVENTFD
269 uint64_t u = 1;
270 // TODO (v2): check result? is it ever possible to get a short write/failure
271 // for an eventfd?
272 if (write(e->be_event_fd, &u, sizeof(uint64_t)) != sizeof(uint64_t)) {
273 assert(1 == 0);
274 }
275 #else
276 if (write(e->be_notify_send_fd, "w", 1) <= 0) {
277 assert(1 == 0);
278 }
279 #endif
280 }
281
282 STAT_DECR(ctx, backend_total, 1);
283
284 return 0;
285 }
286
mcplib_backend_gc(lua_State * L)287 static int mcplib_backend_gc(lua_State *L) {
288 return 0; // no-op.
289 }
290
291 // backend label object; given to pools which then find or create backend
292 // objects as necessary.
293 // allow optionally passing a table of arguments for extended options:
294 // { label = "etc", "host" = "127.0.0.1", port = "11211",
295 // readtimeout = 0.5, connecttimeout = 1, retrytime = 3,
296 // failurelimit = 3, tcpkeepalive = false }
mcplib_backend(lua_State * L)297 static int mcplib_backend(lua_State *L) {
298 size_t llen = 0;
299 size_t nlen = 0;
300 size_t plen = 0;
301 proxy_ctx_t *ctx = PROXY_GET_CTX(L);
302 mcp_backend_label_t *be = lua_newuserdatauv(L, sizeof(mcp_backend_label_t), 0);
303 memset(be, 0, sizeof(*be));
304 const char *label;
305 const char *name;
306 const char *port;
307 // copy global defaults for tunables.
308 memcpy(&be->tunables, &ctx->tunables, sizeof(be->tunables));
309 be->conncount = 1; // one connection per backend as default.
310
311 if (lua_istable(L, 1)) {
312
313 // We don't pop the label/host/port strings so lua won't change them
314 // until after the function call.
315 if (lua_getfield(L, 1, "label") != LUA_TNIL) {
316 label = luaL_checklstring(L, -1, &llen);
317 } else {
318 proxy_lua_error(L, "backend must have a label argument");
319 return 0;
320 }
321
322 if (lua_getfield(L, 1, "host") != LUA_TNIL) {
323 name = luaL_checklstring(L, -1, &nlen);
324 } else {
325 proxy_lua_error(L, "backend must have a host argument");
326 return 0;
327 }
328
329 // TODO: allow a default port.
330 if (lua_getfield(L, 1, "port") != LUA_TNIL) {
331 port = luaL_checklstring(L, -1, &plen);
332 } else {
333 proxy_lua_error(L, "backend must have a port argument");
334 return 0;
335 }
336
337 if (lua_getfield(L, 1, "tcpkeepalive") != LUA_TNIL) {
338 be->tunables.tcp_keepalive = lua_toboolean(L, -1);
339 }
340 lua_pop(L, 1);
341
342 if (lua_getfield(L, 1, "tls") != LUA_TNIL) {
343 be->tunables.use_tls = lua_toboolean(L, -1);
344 }
345 lua_pop(L, 1);
346
347 if (lua_getfield(L, 1, "failurelimit") != LUA_TNIL) {
348 int limit = luaL_checkinteger(L, -1);
349 if (limit < 0) {
350 proxy_lua_error(L, "failurelimit must be >= 0");
351 return 0;
352 }
353
354 be->tunables.backend_failure_limit = limit;
355 }
356 lua_pop(L, 1);
357
358 if (lua_getfield(L, 1, "depthlimit") != LUA_TNIL) {
359 int limit = luaL_checkinteger(L, -1);
360 if (limit < 0) {
361 proxy_lua_error(L, "depthlimit must be >= 0");
362 return 0;
363 }
364
365 be->tunables.backend_depth_limit = limit;
366 }
367 lua_pop(L, 1);
368
369 if (lua_getfield(L, 1, "connecttimeout") != LUA_TNIL) {
370 lua_Number secondsf = luaL_checknumber(L, -1);
371 lua_Integer secondsi = (lua_Integer) secondsf;
372 lua_Number subseconds = secondsf - secondsi;
373
374 be->tunables.connect.tv_sec = secondsi;
375 be->tunables.connect.tv_usec = MICROSECONDS(subseconds);
376 }
377 lua_pop(L, 1);
378
379 // TODO (v2): print deprecation warning.
380 if (lua_getfield(L, 1, "retrytimeout") != LUA_TNIL) {
381 be->tunables.retry.tv_sec =
382 _mcplib_backend_get_waittime(luaL_checknumber(L, -1));
383 }
384 lua_pop(L, 1);
385
386 if (lua_getfield(L, 1, "retrywaittime") != LUA_TNIL) {
387 be->tunables.retry.tv_sec =
388 _mcplib_backend_get_waittime(luaL_checknumber(L, -1));
389 }
390 lua_pop(L, 1);
391
392 if (lua_getfield(L, 1, "retrytimeout") != LUA_TNIL) {
393 lua_Number secondsf = luaL_checknumber(L, -1);
394 lua_Integer secondsi = (lua_Integer) secondsf;
395 lua_Number subseconds = secondsf - secondsi;
396
397 be->tunables.retry.tv_sec = secondsi;
398 be->tunables.retry.tv_usec = MICROSECONDS(subseconds);
399 }
400 lua_pop(L, 1);
401
402 if (lua_getfield(L, 1, "readtimeout") != LUA_TNIL) {
403 lua_Number secondsf = luaL_checknumber(L, -1);
404 lua_Integer secondsi = (lua_Integer) secondsf;
405 lua_Number subseconds = secondsf - secondsi;
406
407 be->tunables.read.tv_sec = secondsi;
408 be->tunables.read.tv_usec = MICROSECONDS(subseconds);
409 }
410 lua_pop(L, 1);
411
412 if (lua_getfield(L, 1, "down") != LUA_TNIL) {
413 int down = lua_toboolean(L, -1);
414 be->tunables.down = down;
415 }
416 lua_pop(L, 1);
417
418 if (lua_getfield(L, 1, "flaptime") != LUA_TNIL) {
419 lua_Number secondsf = luaL_checknumber(L, -1);
420 lua_Integer secondsi = (lua_Integer) secondsf;
421 lua_Number subseconds = secondsf - secondsi;
422
423 be->tunables.flap.tv_sec = secondsi;
424 be->tunables.flap.tv_usec = MICROSECONDS(subseconds);
425 }
426 lua_pop(L, 1);
427
428 if (lua_getfield(L, 1, "flapbackofframp") != LUA_TNIL) {
429 float ramp = luaL_checknumber(L, -1);
430 if (ramp <= 1.1) {
431 ramp = 1.1;
432 }
433 be->tunables.flap_backoff_ramp = ramp;
434 }
435 lua_pop(L, 1);
436
437 if (lua_getfield(L, 1, "flapbackoffmax") != LUA_TNIL) {
438 luaL_checknumber(L, -1);
439 uint32_t max = lua_tointeger(L, -1);
440 be->tunables.flap_backoff_max = max;
441 }
442 lua_pop(L, 1);
443
444 if (lua_getfield(L, 1, "connections") != LUA_TNIL) {
445 int c = luaL_checkinteger(L, -1);
446 if (c <= 0) {
447 proxy_lua_error(L, "backend connections argument must be >= 0");
448 return 0;
449 } else if (c > 8) {
450 proxy_lua_error(L, "backend connections argument must be <= 8");
451 return 0;
452 }
453
454 be->conncount = c;
455 }
456 lua_pop(L, 1);
457
458 } else {
459 label = luaL_checklstring(L, 1, &llen);
460 name = luaL_checklstring(L, 2, &nlen);
461 port = luaL_checklstring(L, 3, &plen);
462 }
463
464 if (llen > MAX_LABELLEN-1) {
465 proxy_lua_error(L, "backend label too long");
466 return 0;
467 }
468
469 if (nlen > MAX_NAMELEN-1) {
470 proxy_lua_error(L, "backend name too long");
471 return 0;
472 }
473
474 if (plen > MAX_PORTLEN-1) {
475 proxy_lua_error(L, "backend port too long");
476 return 0;
477 }
478
479 memcpy(be->label, label, llen);
480 be->label[llen] = '\0';
481 memcpy(be->name, name, nlen);
482 be->name[nlen] = '\0';
483 memcpy(be->port, port, plen);
484 be->port[plen] = '\0';
485 be->llen = llen;
486 if (lua_istable(L, 1)) {
487 lua_pop(L, 3); // drop label, name, port.
488 }
489 luaL_getmetatable(L, "mcp.backend");
490 lua_setmetatable(L, -2); // set metatable to userdata.
491
492 return 1; // return be object.
493 }
494
495 // Called with the cache label at top of the stack.
_mcplib_backend_checkcache(lua_State * L,mcp_backend_label_t * bel)496 static mcp_backend_wrap_t *_mcplib_backend_checkcache(lua_State *L, mcp_backend_label_t *bel) {
497 // first check our reference table to compare.
498 // Note: The upvalue won't be found unless we're running from a function with it
499 // set as an upvalue.
500 int ret = lua_gettable(L, lua_upvalueindex(MCP_BACKEND_UPVALUE));
501 if (ret != LUA_TNIL) {
502 mcp_backend_wrap_t *be_orig = luaL_checkudata(L, -1, "mcp.backendwrap");
503 if (strncmp(be_orig->be->name, bel->name, MAX_NAMELEN) == 0
504 && strncmp(be_orig->be->port, bel->port, MAX_PORTLEN) == 0
505 && be_orig->be->conncount == bel->conncount
506 && memcmp(&be_orig->be->tunables, &bel->tunables, sizeof(bel->tunables)) == 0) {
507 // backend is the same, return it.
508 return be_orig;
509 } else {
510 // backend not the same, pop from stack and make new one.
511 lua_pop(L, 1);
512 }
513 } else {
514 lua_pop(L, 1); // pop the nil.
515 }
516
517 return NULL;
518 }
519
_mcplib_make_backendconn(lua_State * L,mcp_backend_label_t * bel,proxy_event_thread_t * e)520 static mcp_backend_wrap_t *_mcplib_make_backendconn(lua_State *L, mcp_backend_label_t *bel,
521 proxy_event_thread_t *e) {
522 proxy_ctx_t *ctx = PROXY_GET_CTX(L);
523
524 mcp_backend_wrap_t *bew = lua_newuserdatauv(L, sizeof(mcp_backend_wrap_t), 0);
525 luaL_getmetatable(L, "mcp.backendwrap");
526 lua_setmetatable(L, -2); // set metatable to userdata.
527
528 mcp_backend_t *be = calloc(1, sizeof(mcp_backend_t) + sizeof(struct mcp_backendconn_s) * bel->conncount);
529 if (be == NULL) {
530 proxy_lua_error(L, "out of memory allocating backend connection");
531 return NULL;
532 }
533 bew->be = be;
534
535 strncpy(be->name, bel->name, MAX_NAMELEN+1);
536 strncpy(be->port, bel->port, MAX_PORTLEN+1);
537 strncpy(be->label, bel->label, MAX_LABELLEN+1);
538 memcpy(&be->tunables, &bel->tunables, sizeof(bel->tunables));
539 be->conncount = bel->conncount;
540 STAILQ_INIT(&be->io_head);
541
542 for (int x = 0; x < bel->conncount; x++) {
543 struct mcp_backendconn_s *bec = &be->be[x];
544 bec->be_parent = be;
545 memcpy(&bec->tunables, &bel->tunables, sizeof(bel->tunables));
546 STAILQ_INIT(&bec->io_head);
547 bec->state = mcp_backend_read;
548
549 // this leaves a permanent buffer on the backend, which is fine
550 // unless you have billions of backends.
551 // we can later optimize for pulling buffers from idle backends.
552 bec->rbuf = malloc(READ_BUFFER_SIZE);
553 if (bec->rbuf == NULL) {
554 proxy_lua_error(L, "out of memory allocating backend");
555 return NULL;
556 }
557
558 // initialize the client
559 bec->client = malloc(mcmc_size(MCMC_OPTION_BLANK));
560 if (bec->client == NULL) {
561 proxy_lua_error(L, "out of memory allocating backend");
562 return NULL;
563 }
564 // TODO (v2): no way to change the TCP_KEEPALIVE state post-construction.
565 // This is a trivial fix if we ensure a backend's owning event thread is
566 // set before it can be used in the proxy, as it would have access to the
567 // tunables structure. _reset_bad_backend() may not have its event thread
568 // set 100% of the time and I don't want to introduce a crash right now,
569 // so I'm writing this overly long comment. :)
570 int flags = MCMC_OPTION_NONBLOCK;
571 STAT_L(ctx);
572 if (ctx->tunables.tcp_keepalive) {
573 flags |= MCMC_OPTION_TCP_KEEPALIVE;
574 }
575 STAT_UL(ctx);
576 bec->connect_flags = flags;
577
578 // FIXME: remove ifdef via an initialized checker? or
579 // mcp_tls_backend_init response code?
580 #ifdef PROXY_TLS
581 if (be->tunables.use_tls && !ctx->tls_ctx) {
582 proxy_lua_error(L, "TLS requested but not initialized: call mcp.init_tls()");
583 return NULL;
584 }
585 #endif
586 mcp_tls_backend_init(ctx, bec);
587
588 bec->event_thread = e;
589 }
590 pthread_mutex_lock(&e->mutex);
591 STAILQ_INSERT_TAIL(&e->beconn_head_in, be, beconn_next);
592 pthread_mutex_unlock(&e->mutex);
593
594 // Signal to check queue.
595 #ifdef USE_EVENTFD
596 uint64_t u = 1;
597 // TODO (v2): check result? is it ever possible to get a short write/failure
598 // for an eventfd?
599 if (write(e->be_event_fd, &u, sizeof(uint64_t)) != sizeof(uint64_t)) {
600 assert(1 == 0);
601 }
602 #else
603 if (write(e->be_notify_send_fd, "w", 1) <= 0) {
604 assert(1 == 0);
605 }
606 #endif
607
608 lua_pushvalue(L, -2); // push the label string back to the top.
609 // Add this new backend connection to the object cache.
610 lua_pushvalue(L, -2); // copy the backend reference to the top.
611 // set our new backend wrapper object into the reference table.
612 lua_settable(L, lua_upvalueindex(MCP_BACKEND_UPVALUE));
613 // stack is back to having backend on the top.
614
615 STAT_INCR(ctx, backend_total, 1);
616
617 return bew;
618 }
619
mcplib_pool_gc(lua_State * L)620 static int mcplib_pool_gc(lua_State *L) {
621 mcp_pool_t *p = luaL_checkudata(L, -1, "mcp.pool");
622
623 mcp_gobj_finalize(&p->g);
624
625 luaL_unref(L, LUA_REGISTRYINDEX, p->phc_ref);
626
627 for (int x = 0; x < p->pool_be_total; x++) {
628 if (p->pool[x].ref) {
629 luaL_unref(L, LUA_REGISTRYINDEX, p->pool[x].ref);
630 }
631 }
632
633 return 0;
634 }
635
636 // Looks for a short string in a key to separate which part gets hashed vs
637 // sent to the backend node.
638 // ie: "foo:bar|#|restofkey" - only "foo:bar" gets hashed.
mcp_key_hash_filter_stop(const char * conf,const char * key,size_t klen,size_t * newlen)639 static const char *mcp_key_hash_filter_stop(const char *conf, const char *key, size_t klen, size_t *newlen) {
640 char temp[KEY_MAX_LENGTH+1];
641 *newlen = klen;
642 if (klen > KEY_MAX_LENGTH) {
643 // Hedging against potential bugs.
644 return key;
645 }
646
647 memcpy(temp, key, klen);
648 temp[klen+1] = '\0';
649
650 // TODO (v2): memmem would avoid the temp key and memcpy here, but it's
651 // not technically portable. An easy improvement would be to detect
652 // memmem() in `configure` and only use strstr/copy as a fallback.
653 // Since keys are short it's unlikely this would be a major performance
654 // win.
655 char *found = strstr(temp, conf);
656
657 if (found) {
658 *newlen = found - temp;
659 }
660
661 // hash stop can't change where keys start.
662 return key;
663 }
664
665 // Takes a two character "tag", ie; "{}", or "$$", searches string for the
666 // first then second character. Only hashes the portion within these tags.
667 // *conf _must_ be two characters.
mcp_key_hash_filter_tag(const char * conf,const char * key,size_t klen,size_t * newlen)668 static const char *mcp_key_hash_filter_tag(const char *conf, const char *key, size_t klen, size_t *newlen) {
669 *newlen = klen;
670
671 const char *t1 = memchr(key, conf[0], klen);
672 if (t1) {
673 size_t remain = klen - (t1 - key);
674 // must be at least one character inbetween the tags to hash.
675 if (remain > 1) {
676 const char *t2 = memchr(t1, conf[1], remain);
677
678 if (t2) {
679 *newlen = t2 - t1 - 1;
680 return t1+1;
681 }
682 }
683 }
684
685 return key;
686 }
687
_mcplib_pool_dist(lua_State * L,mcp_pool_t * p)688 static void _mcplib_pool_dist(lua_State *L, mcp_pool_t *p) {
689 luaL_checktype(L, -1, LUA_TTABLE);
690 if (lua_getfield(L, -1, "new") != LUA_TFUNCTION) {
691 proxy_lua_error(L, "key distribution object missing 'new' function");
692 return;
693 }
694
695 // - now create the copy pool table
696 lua_createtable(L, p->pool_size, 0); // give the new pool table a sizing hint.
697 for (int x = 1; x <= p->pool_size; x++) {
698 mcp_backend_t *be = p->pool[x-1].be;
699 lua_createtable(L, 0, 4);
700 // stack = [p, h, f, optN, newpool, backend]
701 // the key should be fine for id? maybe don't need to duplicate
702 // this?
703 lua_pushinteger(L, x);
704 lua_setfield(L, -2, "id");
705 // we don't use the hostname for ketama hashing
706 // so passing ip for hostname is fine
707 lua_pushstring(L, be->name);
708 lua_setfield(L, -2, "addr");
709 lua_pushstring(L, be->port);
710 lua_setfield(L, -2, "port");
711
712 // set the backend table into the new pool table.
713 lua_rawseti(L, -2, x);
714 }
715
716 // we can either use lua_insert() or possibly _rotate to shift
717 // things into the right place, but simplest is to just copy the
718 // option arg to the end of the stack.
719 lua_pushvalue(L, 2);
720 // - stack should be: pool, opts, func, pooltable, opts
721
722 // call the dist new function.
723 int res = lua_pcall(L, 2, 2, 0);
724
725 if (res != LUA_OK) {
726 lua_error(L); // error should be on the stack already.
727 return;
728 }
729
730 // -1 is lightuserdata ptr to the struct (which must be owned by the
731 // userdata), which is later used for internal calls.
732 struct proxy_hash_caller *phc;
733
734 luaL_checktype(L, -1, LUA_TLIGHTUSERDATA);
735 luaL_checktype(L, -2, LUA_TUSERDATA);
736 phc = lua_touserdata(L, -1);
737 memcpy(&p->phc, phc, sizeof(*phc));
738 lua_pop(L, 1);
739 // -2 was userdata we need to hold a reference to
740 p->phc_ref = luaL_ref(L, LUA_REGISTRYINDEX);
741 // UD now popped from stack.
742 }
743
744 // in the proxy object, we can alias a ptr to the pool to where it needs to be
745 // based on worker number or io_thread right?
_mcplib_pool_make_be_loop(lua_State * L,mcp_pool_t * p,int offset,proxy_event_thread_t * t)746 static void _mcplib_pool_make_be_loop(lua_State *L, mcp_pool_t *p, int offset, proxy_event_thread_t *t) {
747 // remember lua arrays are 1 indexed.
748 for (int x = 1; x <= p->pool_size; x++) {
749 mcp_pool_be_t *s = &p->pool[x-1 + (offset * p->pool_size)];
750 lua_geti(L, 1, x); // get next server into the stack.
751 // If we bail here, the pool _gc() should handle releasing any backend
752 // references we made so far.
753 mcp_backend_label_t *bel = luaL_checkudata(L, -1, "mcp.backend");
754
755 // check label for pre-existing backend conn/wrapper
756 // TODO (v2): there're native ways of "from C make lua strings"
757 int toconcat = 1;
758 if (p->beprefix[0] != '\0') {
759 lua_pushstring(L, p->beprefix);
760 toconcat++;
761 }
762 if (p->use_iothread) {
763 lua_pushstring(L, ":io:");
764 toconcat++;
765 } else {
766 lua_pushstring(L, ":w");
767 lua_pushinteger(L, offset);
768 lua_pushstring(L, ":");
769 toconcat += 3;
770 }
771 lua_pushlstring(L, bel->label, bel->llen);
772 lua_concat(L, toconcat);
773
774 lua_pushvalue(L, -1); // copy the label string for the create method.
775 mcp_backend_wrap_t *bew = _mcplib_backend_checkcache(L, bel);
776 if (bew == NULL) {
777 bew = _mcplib_make_backendconn(L, bel, t);
778 }
779 s->be = bew->be; // unwrap the backend connection for direct ref.
780 bew->be->use_io_thread = p->use_iothread;
781
782 // If found from cache or made above, the backend wrapper is on the
783 // top of the stack, so we can now take its reference.
784 // The wrapper abstraction allows the be memory to be owned by its
785 // destination thread (IO thread/etc).
786
787 s->ref = luaL_ref(L, LUA_REGISTRYINDEX); // references and pops object.
788 lua_pop(L, 1); // pop the mcp.backend label object.
789 lua_pop(L, 1); // drop extra label copy.
790 }
791 }
792
793 // call with table of backends in 1
_mcplib_pool_make_be(lua_State * L,mcp_pool_t * p)794 static void _mcplib_pool_make_be(lua_State *L, mcp_pool_t *p) {
795 if (p->use_iothread) {
796 proxy_ctx_t *ctx = PROXY_GET_CTX(L);
797 _mcplib_pool_make_be_loop(L, p, 0, ctx->proxy_io_thread);
798 } else {
799 // TODO (v3) globals.
800 for (int n = 0; n < settings.num_threads; n++) {
801 LIBEVENT_THREAD *t = get_worker_thread(n);
802 _mcplib_pool_make_be_loop(L, p, t->thread_baseid, t->proxy_event_thread);
803 }
804 }
805 }
806
807 // p = mcp.pool(backends, { dist = f, hashfilter = f, seed = "a", hash = f })
mcplib_pool(lua_State * L)808 static int mcplib_pool(lua_State *L) {
809 proxy_ctx_t *ctx = PROXY_GET_CTX(L);
810 int argc = lua_gettop(L);
811 luaL_checktype(L, 1, LUA_TTABLE);
812 int n = luaL_len(L, 1); // get length of array table
813 int workers = settings.num_threads; // TODO (v3): globals usage.
814
815 size_t plen = sizeof(mcp_pool_t) + (sizeof(mcp_pool_be_t) * n * workers);
816 mcp_pool_t *p = lua_newuserdatauv(L, plen, 0);
817 // Zero the memory before use, so we can realibly use __gc to clean up
818 memset(p, 0, plen);
819 p->pool_size = n;
820 p->pool_be_total = n * workers;
821 p->use_iothread = ctx->tunables.use_iothread;
822 // TODO (v2): Nicer if this is fetched from mcp.default_key_hash
823 p->key_hasher = XXH3_64bits_withSeed;
824 pthread_mutex_init(&p->g.lock, NULL);
825 p->ctx = PROXY_GET_CTX(L);
826
827 luaL_setmetatable(L, "mcp.pool");
828
829 // Allow passing an ignored nil as a second argument. Makes the lua easier
830 int type = lua_type(L, 2);
831 if (argc == 1 || type == LUA_TNIL) {
832 _mcplib_pool_make_be(L, p);
833 lua_getglobal(L, "mcp");
834 // TODO (v2): decide on a mcp.default_dist and use that instead
835 if (lua_getfield(L, -1, "dist_jump_hash") != LUA_TNIL) {
836 _mcplib_pool_dist(L, p);
837 lua_pop(L, 1); // pop "dist_jump_hash" value.
838 } else {
839 lua_pop(L, 1);
840 }
841 lua_pop(L, 1); // pop "mcp"
842 return 1;
843 }
844
845 // Supplied with an options table. We inspect this table to decorate the
846 // pool, then pass it along to the a constructor if necessary.
847 luaL_checktype(L, 2, LUA_TTABLE);
848
849 if (lua_getfield(L, 2, "iothread") != LUA_TNIL) {
850 luaL_checktype(L, -1, LUA_TBOOLEAN);
851 int use_iothread = lua_toboolean(L, -1);
852 if (use_iothread) {
853 p->use_iothread = true;
854 } else {
855 p->use_iothread = false;
856 }
857 lua_pop(L, 1); // remove value.
858 } else {
859 lua_pop(L, 1); // pop the nil.
860 }
861
862 if (lua_getfield(L, 2, "beprefix") != LUA_TNIL) {
863 luaL_checktype(L, -1, LUA_TSTRING);
864 size_t len = 0;
865 const char *bepfx = lua_tolstring(L, -1, &len);
866 memcpy(p->beprefix, bepfx, len);
867 p->beprefix[len+1] = '\0';
868 lua_pop(L, 1); // pop beprefix string.
869 } else {
870 lua_pop(L, 1); // pop the nil.
871 }
872 _mcplib_pool_make_be(L, p);
873
874 // stack: backends, options, mcp.pool
875 if (lua_getfield(L, 2, "dist") != LUA_TNIL) {
876 // overriding the distribution function.
877 _mcplib_pool_dist(L, p);
878 lua_pop(L, 1); // remove the dist table from stack.
879 } else {
880 lua_pop(L, 1); // pop the nil.
881
882 // use the default dist if not specified with an override table.
883 lua_getglobal(L, "mcp");
884 // TODO (v2): decide on a mcp.default_dist and use that instead
885 if (lua_getfield(L, -1, "dist_jump_hash") != LUA_TNIL) {
886 _mcplib_pool_dist(L, p);
887 lua_pop(L, 1); // pop "dist_jump_hash" value.
888 } else {
889 lua_pop(L, 1);
890 }
891 lua_pop(L, 1); // pop "mcp"
892 }
893
894 if (lua_getfield(L, 2, "filter") != LUA_TNIL) {
895 luaL_checktype(L, -1, LUA_TSTRING);
896 const char *f_type = lua_tostring(L, -1);
897 if (strcmp(f_type, "stop") == 0) {
898 p->key_filter = mcp_key_hash_filter_stop;
899 } else if (strcmp(f_type, "tags") == 0) {
900 p->key_filter = mcp_key_hash_filter_tag;
901 } else {
902 proxy_lua_ferror(L, "unknown hash filter specified: %s\n", f_type);
903 }
904
905 lua_pop(L, 1); // pops "filter" value.
906
907 if (lua_getfield(L, 2, "filter_conf") == LUA_TSTRING) {
908 size_t len = 0;
909 const char *conf = lua_tolstring(L, -1, &len);
910 if (len < 2 || len > KEY_HASH_FILTER_MAX) {
911 proxy_lua_ferror(L, "hash filter conf must be between 2 and %d characters", KEY_HASH_FILTER_MAX);
912 }
913
914 memcpy(p->key_filter_conf, conf, len);
915 p->key_filter_conf[len+1] = '\0';
916 } else {
917 proxy_lua_error(L, "hash filter requires 'filter_conf' string");
918 }
919 lua_pop(L, 1); // pops "filter_conf" value.
920 } else {
921 lua_pop(L, 1); // pop the nil.
922 }
923
924 if (lua_getfield(L, 2, "hash") != LUA_TNIL) {
925 luaL_checktype(L, -1, LUA_TLIGHTUSERDATA);
926 struct proxy_hash_func *phf = lua_touserdata(L, -1);
927 p->key_hasher = phf->func;
928 lua_pop(L, 1);
929 } else {
930 lua_pop(L, 1); // pop the nil.
931 }
932
933 if (lua_getfield(L, 2, "seed") != LUA_TNIL) {
934 luaL_checktype(L, -1, LUA_TSTRING);
935 size_t seedlen;
936 const char *seedstr = lua_tolstring(L, -1, &seedlen);
937 // Note: the custom hasher for a dist may be "weird" in some cases, so
938 // we use a standard hash method for the seed here.
939 // I'm open to changing this (ie; mcp.pool_seed_hasher = etc)
940 p->hash_seed = XXH3_64bits(seedstr, seedlen);
941
942 lua_pop(L, 1);
943 } else {
944 lua_pop(L, 1); // pop the nil.
945 }
946
947 if (p->phc.selector_func == NULL) {
948 proxy_lua_error(L, "cannot create pool missing 'dist' argument");
949 }
950
951 return 1;
952 }
953
mcplib_pool_proxy_gc(lua_State * L)954 static int mcplib_pool_proxy_gc(lua_State *L) {
955 mcp_pool_proxy_t *pp = luaL_checkudata(L, -1, "mcp.pool_proxy");
956 mcp_pool_t *p = pp->main;
957 pthread_mutex_lock(&p->g.lock);
958 p->g.refcount--;
959 if (p->g.refcount == 0) {
960 proxy_ctx_t *ctx = p->ctx;
961 pthread_mutex_lock(&ctx->manager_lock);
962 STAILQ_INSERT_TAIL(&ctx->manager_head, &p->g, next);
963 pthread_cond_signal(&ctx->manager_cond);
964 pthread_mutex_unlock(&ctx->manager_lock);
965 }
966 pthread_mutex_unlock(&p->g.lock);
967
968 return 0;
969 }
970
mcplib_pool_proxy_call_helper(mcp_pool_proxy_t * pp,const char * key,size_t len)971 mcp_backend_t *mcplib_pool_proxy_call_helper(mcp_pool_proxy_t *pp, const char *key, size_t len) {
972 mcp_pool_t *p = pp->main;
973 if (p->key_filter) {
974 key = p->key_filter(p->key_filter_conf, key, len, &len);
975 P_DEBUG("%s: filtered key for hashing (%.*s)\n", __func__, (int)len, key);
976 }
977 uint64_t hash = p->key_hasher(key, len, p->hash_seed);
978 uint32_t lookup = p->phc.selector_func(hash, p->phc.ctx);
979
980 assert(p->phc.ctx != NULL);
981 if (lookup >= p->pool_size) {
982 return NULL;
983 }
984
985 return pp->pool[lookup].be;
986 }
987
mcplib_backend_use_iothread(lua_State * L)988 static int mcplib_backend_use_iothread(lua_State *L) {
989 luaL_checktype(L, -1, LUA_TBOOLEAN);
990 int state = lua_toboolean(L, -1);
991 proxy_ctx_t *ctx = PROXY_GET_CTX(L);
992
993 STAT_L(ctx);
994 ctx->tunables.use_iothread = state;
995 STAT_UL(ctx);
996
997 return 0;
998 }
999
mcplib_backend_use_tls(lua_State * L)1000 static int mcplib_backend_use_tls(lua_State *L) {
1001 luaL_checktype(L, -1, LUA_TBOOLEAN);
1002 int state = lua_toboolean(L, -1);
1003 proxy_ctx_t *ctx = PROXY_GET_CTX(L);
1004 #ifndef PROXY_TLS
1005 if (state == 1) {
1006 proxy_lua_error(L, "cannot set mcp.backend_use_tls: TLS support not compiled");
1007 }
1008 #endif
1009 STAT_L(ctx);
1010 ctx->tunables.use_tls = state;
1011 STAT_UL(ctx);
1012
1013 return 0;
1014 }
1015
1016 // TODO: error checking.
mcplib_init_tls(lua_State * L)1017 static int mcplib_init_tls(lua_State *L) {
1018 #ifndef PROXY_TLS
1019 proxy_lua_error(L, "cannot run mcp.init_tls: TLS support not compiled");
1020 #else
1021 proxy_ctx_t *ctx = PROXY_GET_CTX(L);
1022 mcp_tls_init(ctx);
1023 #endif
1024
1025 return 0;
1026 }
1027
mcplib_tcp_keepalive(lua_State * L)1028 static int mcplib_tcp_keepalive(lua_State *L) {
1029 luaL_checktype(L, -1, LUA_TBOOLEAN);
1030 int state = lua_toboolean(L, -1);
1031 proxy_ctx_t *ctx = PROXY_GET_CTX(L);
1032
1033 STAT_L(ctx);
1034 ctx->tunables.tcp_keepalive = state;
1035 STAT_UL(ctx);
1036
1037 return 0;
1038 }
1039
mcplib_backend_failure_limit(lua_State * L)1040 static int mcplib_backend_failure_limit(lua_State *L) {
1041 int limit = luaL_checkinteger(L, -1);
1042 proxy_ctx_t *ctx = PROXY_GET_CTX(L);
1043
1044 if (limit < 0) {
1045 proxy_lua_error(L, "backend_failure_limit must be >= 0");
1046 return 0;
1047 }
1048
1049 STAT_L(ctx);
1050 ctx->tunables.backend_failure_limit = limit;
1051 STAT_UL(ctx);
1052
1053 return 0;
1054 }
1055
mcplib_backend_depth_limit(lua_State * L)1056 static int mcplib_backend_depth_limit(lua_State *L) {
1057 int limit = luaL_checkinteger(L, -1);
1058 proxy_ctx_t *ctx = PROXY_GET_CTX(L);
1059
1060 if (limit < 0) {
1061 proxy_lua_error(L, "backend_depth_limit must be >= 0");
1062 return 0;
1063 }
1064
1065 STAT_L(ctx);
1066 ctx->tunables.backend_depth_limit = limit;
1067 STAT_UL(ctx);
1068
1069 return 0;
1070 }
1071
mcplib_backend_connect_timeout(lua_State * L)1072 static int mcplib_backend_connect_timeout(lua_State *L) {
1073 lua_Number secondsf = luaL_checknumber(L, -1);
1074 lua_Integer secondsi = (lua_Integer) secondsf;
1075 lua_Number subseconds = secondsf - secondsi;
1076 proxy_ctx_t *ctx = PROXY_GET_CTX(L);
1077
1078 STAT_L(ctx);
1079 ctx->tunables.connect.tv_sec = secondsi;
1080 ctx->tunables.connect.tv_usec = MICROSECONDS(subseconds);
1081 STAT_UL(ctx);
1082
1083 return 0;
1084 }
1085
mcplib_backend_retry_waittime(lua_State * L)1086 static int mcplib_backend_retry_waittime(lua_State *L) {
1087 lua_Number secondsf = luaL_checknumber(L, -1);
1088 proxy_ctx_t *ctx = PROXY_GET_CTX(L);
1089 lua_Integer secondsi = _mcplib_backend_get_waittime(secondsf);
1090
1091 STAT_L(ctx);
1092 ctx->tunables.retry.tv_sec = secondsi;
1093 ctx->tunables.retry.tv_usec = 0;
1094 STAT_UL(ctx);
1095
1096 return 0;
1097 }
1098
1099 // TODO (v2): deprecation notice print when using this function.
mcplib_backend_retry_timeout(lua_State * L)1100 static int mcplib_backend_retry_timeout(lua_State *L) {
1101 return mcplib_backend_retry_waittime(L);
1102 }
1103
mcplib_backend_read_timeout(lua_State * L)1104 static int mcplib_backend_read_timeout(lua_State *L) {
1105 lua_Number secondsf = luaL_checknumber(L, -1);
1106 lua_Integer secondsi = (lua_Integer) secondsf;
1107 lua_Number subseconds = secondsf - secondsi;
1108 proxy_ctx_t *ctx = PROXY_GET_CTX(L);
1109
1110 STAT_L(ctx);
1111 ctx->tunables.read.tv_sec = secondsi;
1112 ctx->tunables.read.tv_usec = MICROSECONDS(subseconds);
1113 STAT_UL(ctx);
1114
1115 return 0;
1116 }
1117
mcplib_backend_flap_time(lua_State * L)1118 static int mcplib_backend_flap_time(lua_State *L) {
1119 lua_Number secondsf = luaL_checknumber(L, -1);
1120 lua_Integer secondsi = (lua_Integer) secondsf;
1121 lua_Number subseconds = secondsf - secondsi;
1122 proxy_ctx_t *ctx = PROXY_GET_CTX(L);
1123
1124 STAT_L(ctx);
1125 ctx->tunables.flap.tv_sec = secondsi;
1126 ctx->tunables.flap.tv_usec = MICROSECONDS(subseconds);
1127 STAT_UL(ctx);
1128
1129 return 0;
1130 }
1131
mcplib_backend_flap_backoff_ramp(lua_State * L)1132 static int mcplib_backend_flap_backoff_ramp(lua_State *L) {
1133 float factor = luaL_checknumber(L, -1);
1134 proxy_ctx_t *ctx = PROXY_GET_CTX(L);
1135 if (factor <= 1.1) {
1136 factor = 1.1;
1137 }
1138
1139 STAT_L(ctx);
1140 ctx->tunables.flap_backoff_ramp = factor;
1141 STAT_UL(ctx);
1142
1143 return 0;
1144 }
1145
mcplib_backend_flap_backoff_max(lua_State * L)1146 static int mcplib_backend_flap_backoff_max(lua_State *L) {
1147 luaL_checknumber(L, -1);
1148 uint32_t max = lua_tointeger(L, -1);
1149 proxy_ctx_t *ctx = PROXY_GET_CTX(L);
1150
1151 STAT_L(ctx);
1152 ctx->tunables.flap_backoff_max = max;
1153 STAT_UL(ctx);
1154
1155 return 0;
1156 }
1157
mcplib_stat_limit(lua_State * L)1158 static int mcplib_stat_limit(lua_State *L) {
1159 proxy_ctx_t *ctx = PROXY_GET_CTX(L);
1160 int limit = luaL_checkinteger(L, -1);
1161
1162 if (limit == 0) {
1163 limit = MAX_USTATS_DEFAULT;
1164 }
1165 if (limit > MAX_USTATS_DEFAULT) {
1166 fprintf(stderr, "PROXY WARNING: setting ustats limit above default may cause performance problems\n");
1167 }
1168
1169 // lock isn't necessary as this is only used from the config thread.
1170 // keeping the lock call for code consistency.
1171 STAT_L(ctx);
1172 ctx->tunables.max_ustats = limit;
1173 STAT_UL(ctx);
1174 return 0;
1175 }
1176
mcplib_active_req_limit(lua_State * L)1177 static int mcplib_active_req_limit(lua_State *L) {
1178 proxy_ctx_t *ctx = PROXY_GET_CTX(L);
1179 uint64_t limit = luaL_checkinteger(L, -1);
1180
1181 if (limit == 0) {
1182 limit = UINT64_MAX;
1183 } else {
1184 // FIXME: global
1185 int tcount = settings.num_threads;
1186 // The actual limit is per-worker-thread, so divide it up.
1187 if (limit > tcount * 2) {
1188 limit /= tcount;
1189 }
1190 }
1191
1192 STAT_L(ctx);
1193 ctx->active_req_limit = limit;
1194 STAT_UL(ctx);
1195
1196 return 0;
1197 }
1198
1199 // limit specified in kilobytes
mcplib_buffer_memory_limit(lua_State * L)1200 static int mcplib_buffer_memory_limit(lua_State *L) {
1201 proxy_ctx_t *ctx = PROXY_GET_CTX(L);
1202 uint64_t limit = luaL_checkinteger(L, -1);
1203
1204 if (limit == 0) {
1205 limit = UINT64_MAX;
1206 } else {
1207 limit *= 1024;
1208
1209 int tcount = settings.num_threads;
1210 if (limit > tcount * 2) {
1211 limit /= tcount;
1212 }
1213 }
1214 ctx->buffer_memory_limit = limit;
1215
1216 return 0;
1217 }
1218
1219 // mcp.attach(mcp.HOOK_NAME, function)
1220 // fill hook structure: if lua function, use luaL_ref() to store the func
mcplib_attach(lua_State * L)1221 static int mcplib_attach(lua_State *L) {
1222 // Pull the original worker thread out of the shared mcplib upvalue.
1223 LIBEVENT_THREAD *t = PROXY_GET_THR(L);
1224
1225 int hook = luaL_checkinteger(L, 1);
1226 // pushvalue to dupe func and etc.
1227 // can leave original func on stack afterward because it'll get cleared.
1228 int loop_end = 0;
1229 int loop_start = 1;
1230 if (hook == CMD_ANY) {
1231 // if CMD_ANY we need individually set loop 1 to CMD_SIZE.
1232 loop_end = CMD_SIZE;
1233 } else if (hook == CMD_ANY_STORAGE) {
1234 // if CMD_ANY_STORAGE we only override get/set/etc.
1235 loop_end = CMD_END_STORAGE;
1236 } else {
1237 loop_start = hook;
1238 loop_end = hook + 1;
1239 }
1240
1241 mcp_funcgen_t *fgen = NULL;
1242 if (lua_isfunction(L, 2)) {
1243 // create a funcgen with null generator that calls this function
1244 lua_pushvalue(L, 2); // function must be at top of stack.
1245 mcplib_funcgenbare_new(L); // convert it into a function generator.
1246 fgen = luaL_checkudata(L, -1, "mcp.funcgen"); // set our pointer ref.
1247 lua_replace(L, 2); // move the function generator over the input
1248 // function. necessary for alignment with the rest
1249 // of the code.
1250 lua_pop(L, 1); // drop the extra generator function reference.
1251 } else if ((fgen = luaL_testudata(L, 2, "mcp.funcgen")) != NULL) {
1252 // good
1253 } else {
1254 proxy_lua_error(L, "mcp.attach: must pass a function");
1255 return 0;
1256 }
1257
1258 if (fgen->closed) {
1259 proxy_lua_error(L, "mcp.attach: cannot use a previously replaced function");
1260 return 0;
1261 }
1262
1263 {
1264 struct proxy_hook *hooks = t->proxy_hooks;
1265 uint64_t tag = 0; // listener socket tag
1266
1267 if (lua_isstring(L, 3)) {
1268 size_t len;
1269 const char *stag = lua_tolstring(L, 3, &len);
1270 if (len < 1 || len > 8) {
1271 proxy_lua_error(L, "mcp.attach: tag must be 1 to 8 characters");
1272 return 0;
1273 }
1274 memcpy(&tag, stag, len);
1275 }
1276
1277 for (int x = loop_start; x < loop_end; x++) {
1278 struct proxy_hook *h = &hooks[x];
1279 if (x == CMD_MN) {
1280 // disallow overriding MN so client pipeline flushes work.
1281 // need to add flush support before allowing override
1282 continue;
1283 }
1284 lua_pushvalue(L, 2); // duplicate the ref.
1285 struct proxy_hook_ref *href = &h->ref;
1286
1287 if (tag) {
1288 // listener was tagged. use the extended hook structure.
1289 struct proxy_hook_tagged *pht = h->tagged;
1290
1291 if (h->tagcount == 0) {
1292 pht = calloc(1, sizeof(struct proxy_hook_tagged));
1293 if (pht == NULL) {
1294 proxy_lua_error(L, "mcp.attach: failure allocating tagged hooks");
1295 return 0;
1296 }
1297 h->tagcount = 1;
1298 h->tagged = pht;
1299 }
1300
1301 bool found = false;
1302 for (int x = 0; x < h->tagcount; x++) {
1303 if (pht->tag == tag || pht->tag == 0) {
1304 found = true;
1305 break;
1306 }
1307 pht++;
1308 }
1309
1310 // need to resize the array to fit the new tag.
1311 if (!found) {
1312 struct proxy_hook_tagged *temp = realloc(h->tagged, sizeof(struct proxy_hook_tagged) * (h->tagcount+1));
1313 if (!temp) {
1314 proxy_lua_error(L, "mcp.attach: failure to resize tagged hooks");
1315 return 0;
1316 }
1317 pht = &temp[h->tagcount];
1318 memset(pht, 0, sizeof(*pht));
1319 h->tagcount++;
1320 h->tagged = temp;
1321 }
1322
1323 href = &pht->ref;
1324 pht->tag = tag;
1325 }
1326
1327 // now assign our hook reference.
1328 if (href->lua_ref) {
1329 // Found existing tagged hook.
1330 luaL_unref(L, LUA_REGISTRYINDEX, href->lua_ref);
1331 mcp_funcgen_dereference(L, href->ctx);
1332 }
1333
1334 lua_pushvalue(L, -1); // duplicate the funcgen
1335 mcp_funcgen_reference(L);
1336 href->lua_ref = luaL_ref(L, LUA_REGISTRYINDEX);
1337 href->ctx = fgen;
1338 assert(href->lua_ref != 0);
1339 }
1340 }
1341
1342 return 0;
1343 }
1344
1345 /*** START lua interface to logger ***/
1346
1347 // user logger specific to the config thread
mcplib_ct_log(lua_State * L)1348 static int mcplib_ct_log(lua_State *L) {
1349 const char *msg = luaL_checkstring(L, -1);
1350 // The only difference is we pull the logger from thread local storage.
1351 LOGGER_LOG(NULL, LOG_PROXYUSER, LOGGER_PROXY_USER, NULL, msg);
1352 return 0;
1353 }
1354
mcplib_log(lua_State * L)1355 static int mcplib_log(lua_State *L) {
1356 LIBEVENT_THREAD *t = PROXY_GET_THR(L);
1357 const char *msg = luaL_checkstring(L, -1);
1358 LOGGER_LOG(t->l, LOG_PROXYUSER, LOGGER_PROXY_USER, NULL, msg);
1359 return 0;
1360 }
1361
1362 // (request, resp, "detail")
mcplib_log_req(lua_State * L)1363 static int mcplib_log_req(lua_State *L) {
1364 LIBEVENT_THREAD *t = PROXY_GET_THR(L);
1365 logger *l = t->l;
1366 // Not using the LOGGER_LOG macro so we can avoid as much overhead as
1367 // possible when logging is disabled.
1368 if (! (l->eflags & LOG_PROXYREQS)) {
1369 return 0;
1370 }
1371 int rtype = 0;
1372 int rcode = 0;
1373 int rstatus = 0;
1374 long elapsed = 0;
1375 char *rname = NULL;
1376 char *rport = NULL;
1377
1378 mcp_request_t *rq = luaL_checkudata(L, 1, "mcp.request");
1379 int type = lua_type(L, 2);
1380 if (type == LUA_TUSERDATA) {
1381 mcp_resp_t *rs = luaL_checkudata(L, 2, "mcp.response");
1382 rtype = rs->resp.type;
1383 rcode = rs->resp.code;
1384 rstatus = rs->status;
1385 rname = rs->be_name;
1386 rport = rs->be_port;
1387 elapsed = rs->elapsed;
1388 }
1389 size_t dlen = 0;
1390 const char *detail = luaL_optlstring(L, 3, NULL, &dlen);
1391 int cfd = luaL_optinteger(L, 4, 0);
1392
1393 logger_log(l, LOGGER_PROXY_REQ, NULL, rq->pr.request, rq->pr.reqlen, elapsed, rtype, rcode, rstatus, cfd, detail, dlen, rname, rport);
1394
1395 return 0;
1396 }
1397
_rotl(const uint32_t x,int k)1398 static inline uint32_t _rotl(const uint32_t x, int k) {
1399 return (x << k) | (x >> (32 - k));
1400 }
1401
1402 // xoroshiro128++ 32bit version.
_nextrand(uint32_t * s)1403 static uint32_t _nextrand(uint32_t *s) {
1404 const uint32_t result = _rotl(s[0] + s[3], 7) + s[0];
1405
1406 const uint32_t t = s[1] << 9;
1407
1408 s[2] ^= s[0];
1409 s[3] ^= s[1];
1410 s[1] ^= s[2];
1411 s[0] ^= s[3];
1412
1413 s[2] ^= t;
1414
1415 s[3] = _rotl(s[3], 11);
1416
1417 return result;
1418 }
1419
1420
1421 // (milliseconds, sample_rate, allerrors, request, resp, "detail")
mcplib_log_reqsample(lua_State * L)1422 static int mcplib_log_reqsample(lua_State *L) {
1423 LIBEVENT_THREAD *t = PROXY_GET_THR(L);
1424 logger *l = t->l;
1425 // Not using the LOGGER_LOG macro so we can avoid as much overhead as
1426 // possible when logging is disabled.
1427 if (! (l->eflags & LOG_PROXYREQS)) {
1428 return 0;
1429 }
1430 int rtype = 0;
1431 int rcode = 0;
1432 int rstatus = 0;
1433 long elapsed = 0;
1434 char *rname = NULL;
1435 char *rport = NULL;
1436
1437 int ms = luaL_checkinteger(L, 1);
1438 int rate = luaL_checkinteger(L, 2);
1439 int allerr = lua_toboolean(L, 3);
1440 mcp_request_t *rq = luaL_checkudata(L, 4, "mcp.request");
1441 int type = lua_type(L, 5);
1442 if (type == LUA_TUSERDATA) {
1443 mcp_resp_t *rs = luaL_checkudata(L, 5, "mcp.response");
1444 rtype = rs->resp.type;
1445 rcode = rs->resp.code;
1446 rstatus = rs->status;
1447 rname = rs->be_name;
1448 rport = rs->be_port;
1449 elapsed = rs->elapsed;
1450 }
1451 size_t dlen = 0;
1452 const char *detail = luaL_optlstring(L, 6, NULL, &dlen);
1453 int cfd = luaL_optinteger(L, 7, 0);
1454
1455 bool do_log = false;
1456 if (allerr && rstatus != MCMC_OK) {
1457 do_log = true;
1458 } else if (ms > 0 && elapsed > ms * 1000) {
1459 do_log = true;
1460 } else if (rate > 0) {
1461 // slightly biased random-to-rate without adding a loop, which is
1462 // completely fine for this use case.
1463 uint32_t rnd = (uint64_t)_nextrand(t->proxy_rng) * (uint64_t)rate >> 32;
1464 if (rnd == 0) {
1465 do_log = true;
1466 }
1467 }
1468
1469 if (do_log) {
1470 logger_log(l, LOGGER_PROXY_REQ, NULL, rq->pr.request, rq->pr.reqlen, elapsed, rtype, rcode, rstatus, cfd, detail, dlen, rname, rport);
1471 }
1472
1473 return 0;
1474 }
1475
1476 // TODO: slowsample
1477 // _err versions?
1478
1479 /*** END lua interface to logger ***/
1480
proxy_register_defines(lua_State * L)1481 static void proxy_register_defines(lua_State *L) {
1482 #define X(x) \
1483 lua_pushinteger(L, x); \
1484 lua_setfield(L, -2, #x);
1485 #define Y(x, l) \
1486 lua_pushinteger(L, x); \
1487 lua_setfield(L, -2, l);
1488
1489 X(MCMC_CODE_STORED);
1490 X(MCMC_CODE_EXISTS);
1491 X(MCMC_CODE_DELETED);
1492 X(MCMC_CODE_TOUCHED);
1493 X(MCMC_CODE_VERSION);
1494 X(MCMC_CODE_NOT_FOUND);
1495 X(MCMC_CODE_NOT_STORED);
1496 X(MCMC_CODE_OK);
1497 X(MCMC_CODE_NOP);
1498 X(MCMC_CODE_END);
1499 X(MCMC_CODE_ERROR);
1500 X(MCMC_CODE_CLIENT_ERROR);
1501 X(MCMC_CODE_SERVER_ERROR);
1502 X(MCMC_ERR);
1503 X(P_OK);
1504 X(CMD_ANY);
1505 X(CMD_ANY_STORAGE);
1506 Y(QWAIT_ANY, "WAIT_ANY");
1507 Y(QWAIT_OK, "WAIT_OK");
1508 Y(QWAIT_GOOD, "WAIT_GOOD");
1509 Y(QWAIT_FASTGOOD, "WAIT_FASTGOOD");
1510 Y(RQUEUE_R_GOOD, "RES_GOOD");
1511 Y(RQUEUE_R_OK, "RES_OK");
1512 Y(RQUEUE_R_ANY, "RES_ANY");
1513 CMD_FIELDS
1514 #undef X
1515 #undef Y
1516
1517 lua_pushboolean(L, 1);
1518 lua_setfield(L, -2, "WAIT_RESUME");
1519 }
1520
1521 // TODO: low priority malloc error handling.
proxy_register_startarg(lua_State * L)1522 static void proxy_register_startarg(lua_State *L) {
1523 int idx = lua_absindex(L, -1); // remember 'mcp' table.
1524 if (settings.proxy_startarg == NULL) {
1525 // no argument given.
1526 lua_pushboolean(L, 0);
1527 lua_setfield(L, idx, "start_arg");
1528 return;
1529 }
1530
1531 char *sarg = strdup(settings.proxy_startarg);
1532 if (strchr(sarg, ':') == NULL) {
1533 // just upload the string
1534 lua_pushstring(L, sarg);
1535 } else {
1536 // split into a table and set that instead.
1537 lua_newtable(L);
1538 int nidx = lua_absindex(L, -1);
1539 char *b = NULL;
1540 for (char *p = strtok_r(sarg, ":", &b);
1541 p != NULL;
1542 p = strtok_r(NULL, ":", &b)) {
1543 char *e = NULL;
1544 char *name = strtok_r(p, "_", &e);
1545 lua_pushstring(L, name); // table -> key
1546 char *value = strtok_r(NULL, "_", &e);
1547 if (value == NULL) {
1548 lua_pushboolean(L, 1); // table -> key -> True
1549 } else {
1550 lua_pushstring(L, value); // table -> key -> value
1551 }
1552 lua_settable(L, nidx);
1553 }
1554 }
1555 free(sarg);
1556 lua_setfield(L, idx, "start_arg");
1557 }
1558
1559 // Creates and returns the top level "mcp" module
proxy_register_libs(void * ctx,LIBEVENT_THREAD * t,void * state)1560 int proxy_register_libs(void *ctx, LIBEVENT_THREAD *t, void *state) {
1561 lua_State *L = state;
1562
1563 const struct luaL_Reg mcplib_backend_m[] = {
1564 {"__gc", mcplib_backend_gc},
1565 {NULL, NULL}
1566 };
1567
1568 const struct luaL_Reg mcplib_backend_wrap_m[] = {
1569 {"__gc", mcplib_backend_wrap_gc},
1570 {NULL, NULL}
1571 };
1572
1573 const struct luaL_Reg mcplib_request_m[] = {
1574 {"command", mcplib_request_command},
1575 {"key", mcplib_request_key},
1576 {"ltrimkey", mcplib_request_ltrimkey},
1577 {"rtrimkey", mcplib_request_rtrimkey},
1578 {"token", mcplib_request_token},
1579 {"token_int", mcplib_request_token_int},
1580 {"ntokens", mcplib_request_ntokens},
1581 {"has_flag", mcplib_request_has_flag},
1582 {"flag_token", mcplib_request_flag_token},
1583 {"flag_token_int", mcplib_request_flag_token_int},
1584 {"flag_add", mcplib_request_flag_add},
1585 {"flag_set", mcplib_request_flag_set},
1586 {"flag_replace", mcplib_request_flag_replace},
1587 {"flag_del", mcplib_request_flag_del},
1588 {"match_res", mcplib_request_match_res},
1589 {"__tostring", NULL},
1590 {"__gc", mcplib_request_gc},
1591 {NULL, NULL}
1592 };
1593
1594 const struct luaL_Reg mcplib_response_m[] = {
1595 {"ok", mcplib_response_ok},
1596 {"hit", mcplib_response_hit},
1597 {"vlen", mcplib_response_vlen},
1598 {"code", mcplib_response_code},
1599 {"line", mcplib_response_line},
1600 {"flag_blank", mcplib_response_flag_blank},
1601 {"elapsed", mcplib_response_elapsed},
1602 {"__gc", mcplib_response_gc},
1603 {"__close", mcplib_response_close},
1604 {"close", mcplib_response_close},
1605 {NULL, NULL}
1606 };
1607
1608 const struct luaL_Reg mcplib_pool_m[] = {
1609 {"__gc", mcplib_pool_gc},
1610 {NULL, NULL}
1611 };
1612
1613 const struct luaL_Reg mcplib_pool_proxy_m[] = {
1614 {"__gc", mcplib_pool_proxy_gc},
1615 {NULL, NULL}
1616 };
1617
1618 const struct luaL_Reg mcplib_ratelim_tbf_m[] = {
1619 {"__call", mcplib_ratelim_tbf_call},
1620 {NULL, NULL}
1621 };
1622
1623 const struct luaL_Reg mcplib_ratelim_global_tbf_m[] = {
1624 {"__gc", mcplib_ratelim_global_tbf_gc},
1625 {NULL, NULL}
1626 };
1627
1628 const struct luaL_Reg mcplib_ratelim_proxy_tbf_m[] = {
1629 {"__call", mcplib_ratelim_proxy_tbf_call},
1630 {"__gc", mcplib_ratelim_proxy_tbf_gc},
1631 {NULL, NULL}
1632 };
1633
1634 const struct luaL_Reg mcplib_rcontext_m[] = {
1635 {"handle_set_cb", mcplib_rcontext_handle_set_cb},
1636 {"enqueue", mcplib_rcontext_enqueue},
1637 {"wait_cond", mcplib_rcontext_wait_cond},
1638 {"enqueue_and_wait", mcplib_rcontext_enqueue_and_wait},
1639 {"wait_handle", mcplib_rcontext_wait_handle},
1640 {"res_good", mcplib_rcontext_res_good},
1641 {"res_ok", mcplib_rcontext_res_ok},
1642 {"res_any", mcplib_rcontext_res_any},
1643 {"result", mcplib_rcontext_result},
1644 {"cfd", mcplib_rcontext_cfd},
1645 {"tls_peer_cn", mcplib_rcontext_tls_peer_cn},
1646 {"request_new", mcplib_rcontext_request_new},
1647 {"response_new", mcplib_rcontext_response_new},
1648 //{"sleep", mcplib_rcontext_sleep}, see comments on function
1649 {NULL, NULL}
1650 };
1651
1652 const struct luaL_Reg mcplib_funcgen_m[] = {
1653 {"__gc", mcplib_funcgen_gc},
1654 {"new_handle", mcplib_funcgen_new_handle},
1655 {"ready", mcplib_funcgen_ready},
1656 {NULL, NULL}
1657 };
1658
1659 const struct luaL_Reg mcplib_inspector_m[] = {
1660 {"__gc", mcplib_inspector_gc},
1661 {"__call", mcplib_inspector_call},
1662 {NULL, NULL},
1663 };
1664
1665 const struct luaL_Reg mcplib_mutator_m[] = {
1666 {"__gc", mcplib_mutator_gc},
1667 {"__call", mcplib_mutator_call},
1668 {NULL, NULL},
1669 };
1670
1671 const struct luaL_Reg mcplib_f_config [] = {
1672 {"pool", mcplib_pool},
1673 {"backend", mcplib_backend},
1674 {"add_stat", mcplib_add_stat},
1675 {"ratelim_global_tbf", mcplib_ratelim_global_tbf},
1676 {"stat_limit", mcplib_stat_limit},
1677 {"backend_connect_timeout", mcplib_backend_connect_timeout},
1678 {"backend_retry_timeout", mcplib_backend_retry_timeout},
1679 {"backend_retry_waittime", mcplib_backend_retry_waittime},
1680 {"backend_read_timeout", mcplib_backend_read_timeout},
1681 {"backend_failure_limit", mcplib_backend_failure_limit},
1682 {"backend_depth_limit", mcplib_backend_depth_limit},
1683 {"backend_flap_time", mcplib_backend_flap_time},
1684 {"backend_flap_backoff_ramp", mcplib_backend_flap_backoff_ramp},
1685 {"backend_flap_backoff_max", mcplib_backend_flap_backoff_max},
1686 {"backend_use_iothread", mcplib_backend_use_iothread},
1687 {"backend_use_tls", mcplib_backend_use_tls},
1688 {"init_tls", mcplib_init_tls},
1689 {"tcp_keepalive", mcplib_tcp_keepalive},
1690 {"active_req_limit", mcplib_active_req_limit},
1691 {"buffer_memory_limit", mcplib_buffer_memory_limit},
1692 {"schedule_config_reload", mcplib_schedule_config_reload},
1693 {"register_cron", mcplib_register_cron},
1694 {"server_stats", mcplib_server_stats},
1695 {"log", mcplib_ct_log},
1696 {NULL, NULL}
1697 };
1698
1699 const struct luaL_Reg mcplib_f_routes [] = {
1700 {"internal", mcplib_internal},
1701 {"attach", mcplib_attach},
1702 {"funcgen_new", mcplib_funcgen_new},
1703 {"router_new", mcplib_router_new},
1704 {"log", mcplib_log},
1705 {"log_req", mcplib_log_req},
1706 {"log_reqsample", mcplib_log_reqsample},
1707 {"stat", mcplib_stat},
1708 {"request", mcplib_request},
1709 {"ratelim_tbf", mcplib_ratelim_tbf},
1710 {"req_inspector_new", mcplib_req_inspector_new},
1711 {"res_inspector_new", mcplib_res_inspector_new},
1712 {"req_mutator_new", mcplib_req_mutator_new},
1713 {"res_mutator_new", mcplib_res_mutator_new},
1714 {"time_real_millis", mcplib_time_real_millis},
1715 {"time_mono_millis", mcplib_time_mono_millis},
1716 {NULL, NULL}
1717 };
1718 // VM's have void* extra space in the VM by default for fast-access to a
1719 // context pointer like this. In some cases upvalues are inaccessible (ie;
1720 // GC's) but we still need access to the proxy global context.
1721 void **extra = lua_getextraspace(L);
1722
1723 if (t != NULL) {
1724 // If thread VM, extra is the libevent thread
1725 *extra = t;
1726 luaL_newmetatable(L, "mcp.request");
1727 lua_pushvalue(L, -1); // duplicate metatable.
1728 lua_setfield(L, -2, "__index"); // mt.__index = mt
1729 luaL_setfuncs(L, mcplib_request_m, 0); // register methods
1730 lua_pop(L, 1);
1731
1732 luaL_newmetatable(L, "mcp.response");
1733 lua_pushvalue(L, -1); // duplicate metatable.
1734 lua_setfield(L, -2, "__index"); // mt.__index = mt
1735 luaL_setfuncs(L, mcplib_response_m, 0); // register methods
1736 lua_pop(L, 1);
1737
1738 luaL_newmetatable(L, "mcp.pool_proxy");
1739 lua_pushvalue(L, -1); // duplicate metatable.
1740 lua_setfield(L, -2, "__index"); // mt.__index = mt
1741 luaL_setfuncs(L, mcplib_pool_proxy_m, 0); // register methods
1742 lua_pop(L, 1); // drop the hash selector metatable
1743
1744 luaL_newmetatable(L, "mcp.ratelim_tbf");
1745 lua_pushvalue(L, -1); // duplicate metatable.
1746 lua_setfield(L, -2, "__index"); // mt.__index = mt
1747 luaL_setfuncs(L, mcplib_ratelim_tbf_m, 0); // register methods
1748 lua_pop(L, 1);
1749
1750 luaL_newmetatable(L, "mcp.ratelim_proxy_tbf");
1751 lua_pushvalue(L, -1); // duplicate metatable.
1752 lua_setfield(L, -2, "__index"); // mt.__index = mt
1753 luaL_setfuncs(L, mcplib_ratelim_proxy_tbf_m, 0); // register methods
1754 lua_pop(L, 1);
1755
1756 luaL_newmetatable(L, "mcp.inspector");
1757 lua_pushvalue(L, -1); // duplicate metatable.
1758 lua_setfield(L, -2, "__index"); // mt.__index = mt
1759 luaL_setfuncs(L, mcplib_inspector_m, 0); // register methods
1760 lua_pop(L, 1);
1761
1762 luaL_newmetatable(L, "mcp.mutator");
1763 lua_pushvalue(L, -1); // duplicate metatable.
1764 lua_setfield(L, -2, "__index"); // mt.__index = mt
1765 luaL_setfuncs(L, mcplib_mutator_m, 0); // register methods
1766 lua_pop(L, 1);
1767
1768 luaL_newmetatable(L, "mcp.rcontext");
1769 lua_pushvalue(L, -1); // duplicate metatable.
1770 lua_setfield(L, -2, "__index"); // mt.__index = mt
1771 luaL_setfuncs(L, mcplib_rcontext_m, 0); // register methods
1772 lua_pop(L, 1);
1773
1774 luaL_newmetatable(L, "mcp.funcgen");
1775 lua_pushvalue(L, -1); // duplicate metatable.
1776 lua_setfield(L, -2, "__index"); // mt.__index = mt
1777 luaL_setfuncs(L, mcplib_funcgen_m, 0); // register methods
1778 lua_pop(L, 1);
1779
1780 // marks a special C-compatible route function.
1781 luaL_newmetatable(L, "mcp.rfunc");
1782 lua_pop(L, 1);
1783
1784 // function generator userdata.
1785 luaL_newmetatable(L, "mcp.funcgen");
1786 lua_pop(L, 1);
1787
1788 luaL_newlibtable(L, mcplib_f_routes);
1789 } else {
1790 // Change the extra space override for the configuration VM to just point
1791 // straight to ctx.
1792 *extra = ctx;
1793
1794 luaL_newmetatable(L, "mcp.backend");
1795 lua_pushvalue(L, -1); // duplicate metatable.
1796 lua_setfield(L, -2, "__index"); // mt.__index = mt
1797 luaL_setfuncs(L, mcplib_backend_m, 0); // register methods
1798 lua_pop(L, 1);
1799
1800 luaL_newmetatable(L, "mcp.backendwrap");
1801 lua_pushvalue(L, -1); // duplicate metatable.
1802 lua_setfield(L, -2, "__index"); // mt.__index = mt
1803 luaL_setfuncs(L, mcplib_backend_wrap_m, 0); // register methods
1804 lua_pop(L, 1);
1805
1806 luaL_newmetatable(L, "mcp.pool");
1807 lua_pushvalue(L, -1); // duplicate metatable.
1808 lua_setfield(L, -2, "__index"); // mt.__index = mt
1809 luaL_setfuncs(L, mcplib_pool_m, 0); // register methods
1810 lua_pop(L, 1); // drop the hash selector metatable
1811
1812 luaL_newmetatable(L, "mcp.ratelim_global_tbf");
1813 lua_pushvalue(L, -1); // duplicate metatable.
1814 lua_setfield(L, -2, "__index"); // mt.__index = mt
1815 luaL_setfuncs(L, mcplib_ratelim_global_tbf_m, 0); // register methods
1816 lua_pop(L, 1);
1817
1818 luaL_newlibtable(L, mcplib_f_config);
1819 }
1820
1821 // create main library table.
1822 //luaL_newlib(L, mcplib_f);
1823 // TODO (v2): luaL_newlibtable() just pre-allocs the exact number of things
1824 // here.
1825 // can replace with createtable and add the num. of the constant
1826 // definitions.
1827 proxy_register_defines(L);
1828
1829 mcplib_open_hash_xxhash(L);
1830 lua_setfield(L, -2, "hash_xxhash");
1831 // hash function for selectors.
1832 // have to wrap the function in a struct because function pointers aren't
1833 // pointer pointers :)
1834 mcplib_open_dist_jump_hash(L);
1835 lua_setfield(L, -2, "dist_jump_hash");
1836 mcplib_open_dist_ring_hash(L);
1837 lua_setfield(L, -2, "dist_ring_hash");
1838
1839 // create weak table for storing backends by label.
1840 lua_newtable(L); // {}
1841 lua_newtable(L); // {}, {} for metatable
1842 lua_pushstring(L, "v"); // {}, {}, "v" for weak values.
1843 lua_setfield(L, -2, "__mode"); // {}, {__mode = "v"}
1844 lua_setmetatable(L, -2); // {__mt = {__mode = "v"} }
1845
1846 if (t != NULL) {
1847 luaL_setfuncs(L, mcplib_f_routes, 1); // store upvalues.
1848 } else {
1849 luaL_setfuncs(L, mcplib_f_config, 1); // store upvalues.
1850 }
1851
1852 // every VM gets a copy of the start arguments to work with.
1853 proxy_register_startarg(L);
1854
1855 lua_setglobal(L, "mcp"); // set the lib table to mcp global.
1856 return 1;
1857 }
1858