1 /* -*- Mode: C; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2 
3 #include "proxy.h"
4 #include "proxy_tls.h"
5 #include "storage.h" // for stats call
6 
7 // func prototype example:
8 // static int fname (lua_State *L)
9 // normal library open:
10 // int luaopen_mcp(lua_State *L) { }
11 
// Cookie passed to the stats append callbacks in this file. It only carries
// the Lua state whose stack holds the table being filled in.
struct _mcplib_statctx_s {
    lua_State *L; // Lua state the callbacks push key/value pairs into.
};
15 
_mcplib_append_stats(const char * key,const uint16_t klen,const char * val,const uint32_t vlen,const void * cookie)16 static void _mcplib_append_stats(const char *key, const uint16_t klen,
17                   const char *val, const uint32_t vlen,
18                   const void *cookie) {
19     // k + v == 0 means END, but we don't use END for this lua API.
20     if (klen == 0) {
21         return;
22     }
23 
24     // cookie -> struct
25     const struct _mcplib_statctx_s *c = cookie;
26     lua_State *L = c->L;
27     // table should always be on the top.
28     lua_pushlstring(L, key, klen);
29     lua_pushlstring(L, val, vlen);
30     lua_rawset(L, -3);
31 }
32 
_mcplib_append_section_stats(const char * key,const uint16_t klen,const char * val,const uint32_t vlen,const void * cookie)33 static void _mcplib_append_section_stats(const char *key, const uint16_t klen,
34                   const char *val, const uint32_t vlen,
35                   const void *cookie) {
36     char stat[STAT_KEY_LEN];
37     long section = 0;
38     if (klen == 0) {
39         return;
40     }
41 
42     const struct _mcplib_statctx_s *c = cookie;
43     lua_State *L = c->L;
44     // table must be at the top when this function is called.
45     int tidx = lua_absindex(L, -1);
46 
47     // NOTE: sscanf is not great, especially with numerics due to UD for out
48     // of range data. It is safe to use here because we're generating the
49     // strings, and we don't use this function on anything that has user
50     // defined data (ie; stats proxy). Otherwise sscanf saves a lot of code so
51     // we use it here.
52     if (sscanf(key, "items:%ld:%s", &section, stat) == 2
53             || sscanf(key, "%ld:%s", &section, stat) == 2) {
54         // stats [items, slabs, conns]
55         if (lua_rawgeti(L, tidx, section) == LUA_TNIL) {
56             lua_pop(L, 1); // drop the nil
57             // no sub-section table yet, create one.
58             lua_newtable(L);
59             lua_pushvalue(L, -1); // copy the table
60             lua_rawseti(L, tidx, section); // remember the table
61             // now top of stack is the table.
62         }
63 
64         lua_pushstring(L, stat);
65         lua_pushlstring(L, val, vlen);
66         lua_rawset(L, -3); // put key/val into sub-table
67         lua_pop(L, 1); // pop sub-table.
68     } else {
69         // normal stat counter.
70         lua_pushlstring(L, key, klen);
71         lua_pushlstring(L, val, vlen);
72         lua_rawset(L, tidx);
73     }
74 }
75 
76 // reimplementation of proto_text.c:process_stat()
// Lua binding: returns a table of server statistics.
// With no argument (or nil) the general stats are returned; otherwise the
// first argument is a string naming the stats subcommand. An unrecognized
// subcommand raises a Lua error.
static int mcplib_server_stats(lua_State *L) {
    // Count the arguments before anything else is pushed.
    int nargs = lua_gettop(L);
    proxy_ctx_t *ctx = PROXY_GET_CTX(L);
    lua_newtable(L); // result table; stays on top for the callbacks.
    struct _mcplib_statctx_s sctx = { .L = L };

    if (nargs == 0 || lua_isnil(L, 1)) {
        server_stats(&_mcplib_append_stats, &sctx);
        get_stats(NULL, 0, &_mcplib_append_stats, &sctx);
        return 1;
    }

    const char *cmd = luaL_checkstring(L, 1);
    if (strcmp(cmd, "settings") == 0) {
        process_stat_settings(&_mcplib_append_stats, &sctx);
    } else if (strcmp(cmd, "conns") == 0) {
        process_stats_conns(&_mcplib_append_section_stats, &sctx);
#ifdef EXTSTORE
    } else if (strcmp(cmd, "extstore") == 0) {
        process_extstore_stats(&_mcplib_append_stats, &sctx);
#endif
    } else if (strcmp(cmd, "proxy") == 0) {
        process_proxy_stats(ctx, &_mcplib_append_stats, &sctx);
    } else if (strcmp(cmd, "proxyfuncs") == 0) {
        process_proxy_funcstats(ctx, &_mcplib_append_stats, &sctx);
    } else if (strcmp(cmd, "proxybe") == 0) {
        process_proxy_bestats(ctx, &_mcplib_append_stats, &sctx);
    } else if (!get_stats(cmd, strlen(cmd), &_mcplib_append_section_stats, &sctx)) {
        // get_stats did not recognize the subcommand either.
        proxy_lua_error(L, "unknown subcommand passed to server_stats");
    }

    // the result table is what we return.
    return 1;
}
117 
_mcplib_backend_get_waittime(lua_Number secondsf)118 static lua_Integer _mcplib_backend_get_waittime(lua_Number secondsf) {
119     lua_Integer secondsi = (lua_Integer) secondsf;
120     lua_Number subseconds = secondsf - secondsi;
121     if (subseconds >= 0.5) {
122         // Yes, I know this rounding is probably wrong. it's close enough.
123         // Rounding functions have tricky portability and whole-integer
124         // rounding is at least simpler to reason about.
125         secondsi++;
126     }
127     if (secondsi < 1) {
128         secondsi = 1;
129     }
130     return secondsi;
131 }
132 
133 // take string, table as arg:
134 // name, { every =, rerun = false, func = f }
// rerun defaults to true
// Lua binding: mcp.register_cron(name, { every = N, rerun = bool, func = f })
//
// Creates a cron entry userdata, stores `func` in its user value slot and
// files the entry under `name` in the registry table referenced by
// ctx->cron_ref. Raises a Lua error when `func` or `every` is missing, or
// when `every` is less than 1. Returns nothing to Lua.
static int mcplib_register_cron(lua_State *L) {
    proxy_ctx_t *ctx = PROXY_GET_CTX(L);
    const char *name = luaL_checkstring(L, 1);
    luaL_checktype(L, 2, LUA_TTABLE);

    // reserve a user value slot for storing the function.
    mcp_cron_t *ce = lua_newuserdatauv(L, sizeof(mcp_cron_t), 1);
    memset(ce, 0, sizeof(*ce));
    // Remember where the entry landed instead of assuming index 3: if the
    // caller passed extra arguments the userdata sits higher on the stack.
    int ceidx = lua_gettop(L);

    // default repeat.
    ce->repeat = true;
    // sync config generation.
    ce->gen = ctx->config_generation;

    if (lua_getfield(L, 2, "func") != LUA_TNIL) {
        luaL_checktype(L, -1, LUA_TFUNCTION);
        lua_setiuservalue(L, ceidx, 1); // pop value
    } else {
        proxy_lua_error(L, "proxy cron entry missing 'func' field");
        return 0;
    }

    // rerun = false turns the entry into a one-shot.
    if (lua_getfield(L, 2, "rerun") != LUA_TNIL) {
        int rerun = lua_toboolean(L, -1);
        if (!rerun) {
            ce->repeat = false;
        }
    }
    lua_pop(L, 1); // pop val or nil

    // TODO: set a limit on 'every' so we don't have to worry about
    // underflows. a year? a month?
    if (lua_getfield(L, 2, "every") != LUA_TNIL) {
        luaL_checktype(L, -1, LUA_TNUMBER);
        int every = lua_tointeger(L, -1);
        if (every < 1) {
            proxy_lua_error(L, "proxy cron entry 'every' must be > 0");
            return 0;
        }
        ce->every = every;
    } else {
        proxy_lua_error(L, "proxy cron entry missing 'every' field");
        return 0;
    }
    lua_pop(L, 1); // pop val or nil

    // schedule the next cron run
    struct timespec now;
    clock_gettime(CLOCK_REALTIME, &now);
    ce->next = now.tv_sec + ce->every;
    // we may adjust ce->next shortly, so don't update global yet.

    // valid cron entry, now place into cron table.
    lua_rawgeti(L, LUA_REGISTRYINDEX, ctx->cron_ref);

    // first, check if a cron of this name already exists.
    // if so and the 'every' field matches, inherit its 'next' field
    // so we don't perpetually reschedule all crons.
    if (lua_getfield(L, -1, name) != LUA_TNIL) {
        // lua_touserdata yields NULL for anything that is not a userdata.
        mcp_cron_t *oldce = lua_touserdata(L, -1);
        if (oldce != NULL && ce->every == oldce->every) {
            ce->next = oldce->next;
        }
    }
    lua_pop(L, 1); // drop val/nil

    lua_pushvalue(L, ceidx); // duplicate cron entry
    lua_setfield(L, -2, name); // pop duplicate cron entry
    lua_pop(L, 1); // drop cron table

    // update central cron sleep.
    if (ctx->cron_next > ce->next) {
        ctx->cron_next = ce->next;
    }

    return 0;
}
213 
214 // just set ctx->loading = true
215 // called from config thread, so config_lock must be held, so it's safe to
216 // modify protected ctx contents.
// Lua binding: flags the proxy context as loading, which requests a config
// reload. Takes no arguments and returns nothing to Lua.
static int mcplib_schedule_config_reload(lua_State *L) {
    proxy_ctx_t *pctx = PROXY_GET_CTX(L);

    pctx->loading = true;

    return 0;
}
222 
// Lua binding: returns the CLOCK_REALTIME time in milliseconds as an integer.
static int mcplib_time_real_millis(lua_State *L) {
    struct timespec now;
    clock_gettime(CLOCK_REALTIME, &now);
    // Widen to lua_Integer before scaling: multiplying in time_t width
    // overflows on platforms where time_t is 32 bits.
    lua_Integer t = (lua_Integer)now.tv_sec * 1000
        + (lua_Integer)now.tv_nsec / 1000000;
    lua_pushinteger(L, t);
    return 1;
}
230 
// Lua binding: returns the CLOCK_MONOTONIC time in milliseconds as an
// integer.
static int mcplib_time_mono_millis(lua_State *L) {
    struct timespec now;
    clock_gettime(CLOCK_MONOTONIC, &now);
    // Widen to lua_Integer before scaling: multiplying in time_t width
    // overflows on platforms where time_t is 32 bits.
    lua_Integer t = (lua_Integer)now.tv_sec * 1000
        + (lua_Integer)now.tv_nsec / 1000000;
    lua_pushinteger(L, t);
    return 1;
}
238 
239 // end util funcs.
240 
241 // NOTE: backends are global objects owned by pool objects.
242 // Each pool has a "proxy pool object" distributed to each worker VM.
243 // proxy pool objects are held at the same time as any request exists on a
244 // backend, in the coroutine stack during yield()
245 // To free a backend: All proxies for a pool are collected, then the central
246 // pool is collected, which releases backend references, which allows backend
247 // to be collected.
// __gc for "mcp.backendwrap" userdata. If the wrapper still holds a backend,
// the backend is queued on its event thread's inbound list and the thread is
// signalled. The backend_total stat is decremented in every case.
static int mcplib_backend_wrap_gc(lua_State *L) {
    mcp_backend_wrap_t *wrap = luaL_checkudata(L, -1, "mcp.backendwrap");
    proxy_ctx_t *ctx = PROXY_GET_CTX(L);
    mcp_backend_t *be = wrap->be;

    if (be != NULL) {
        // TODO (v3): technically a race where a backend could be created,
        // queued, but not picked up before being gc'ed again. In practice
        // this is impossible but at some point we should close the loop here.
        // Since we're running in the config thread it could just busy poll
        // until the connection was picked up.
        assert(be->transferred);

        // A backend always has at least one connection, and all of its
        // connections share one event thread, so slot 0 is enough.
        proxy_event_thread_t *evt = be->be[0].event_thread;

        pthread_mutex_lock(&evt->mutex);
        STAILQ_INSERT_TAIL(&evt->beconn_head_in, be, beconn_next);
        pthread_mutex_unlock(&evt->mutex);

        // Wake the event thread so it checks its queue.
#ifdef USE_EVENTFD
        uint64_t one = 1;
        // TODO (v2): check result? is it ever possible to get a short write/failure
        // for an eventfd?
        if (write(evt->be_event_fd, &one, sizeof(uint64_t)) != sizeof(uint64_t)) {
            assert(1 == 0);
        }
#else
        if (write(evt->be_notify_send_fd, "w", 1) <= 0) {
            assert(1 == 0);
        }
#endif
    }

    STAT_DECR(ctx, backend_total, 1);

    return 0;
}
286 
// __gc for backend label objects: intentionally does nothing.
static int mcplib_backend_gc(lua_State *L) {
    (void)L; // nothing to release.
    return 0;
}
290 
291 // backend label object; given to pools which then find or create backend
292 // objects as necessary.
293 // allow optionally passing a table of arguments for extended options:
// { label = "etc", host = "127.0.0.1", port = "11211",
295 //   readtimeout = 0.5, connecttimeout = 1, retrytime = 3,
296 //   failurelimit = 3, tcpkeepalive = false }
// Lua binding: mcp.backend(label, host, port) or mcp.backend({ ... }).
//
// Builds an "mcp.backend" label userdata holding the label/host/port strings
// plus a copy of the global tunables, optionally overridden by fields of the
// argument table. Raises a Lua error for missing or over-long
// label/host/port and for out-of-range option values. Returns the userdata.
static int mcplib_backend(lua_State *L) {
    size_t llen = 0;
    size_t nlen = 0;
    size_t plen = 0;
    proxy_ctx_t *ctx = PROXY_GET_CTX(L);
    mcp_backend_label_t *be = lua_newuserdatauv(L, sizeof(mcp_backend_label_t), 0);
    memset(be, 0, sizeof(*be));
    const char *label;
    const char *name;
    const char *port;
    // copy global defaults for tunables.
    memcpy(&be->tunables, &ctx->tunables, sizeof(be->tunables));
    be->conncount = 1; // one connection per backend as default.

    if (lua_istable(L, 1)) {

        // We don't pop the label/host/port strings so lua won't change them
        // until after the function call. They stay on the stack above the
        // userdata and are dropped once copied into `be`.
        if (lua_getfield(L, 1, "label") != LUA_TNIL) {
            label = luaL_checklstring(L, -1, &llen);
        } else {
            proxy_lua_error(L, "backend must have a label argument");
            return 0;
        }

        if (lua_getfield(L, 1, "host") != LUA_TNIL) {
            name = luaL_checklstring(L, -1, &nlen);
        } else {
            proxy_lua_error(L, "backend must have a host argument");
            return 0;
        }

        // TODO: allow a default port.
        if (lua_getfield(L, 1, "port") != LUA_TNIL) {
            port = luaL_checklstring(L, -1, &plen);
        } else {
            proxy_lua_error(L, "backend must have a port argument");
            return 0;
        }

        // Every optional field below follows the same shape: fetch the
        // field, apply it if present, then pop the value (or the nil).
        if (lua_getfield(L, 1, "tcpkeepalive") != LUA_TNIL) {
            be->tunables.tcp_keepalive = lua_toboolean(L, -1);
        }
        lua_pop(L, 1);

        if (lua_getfield(L, 1, "tls") != LUA_TNIL) {
            be->tunables.use_tls = lua_toboolean(L, -1);
        }
        lua_pop(L, 1);

        if (lua_getfield(L, 1, "failurelimit") != LUA_TNIL) {
            int limit = luaL_checkinteger(L, -1);
            if (limit < 0) {
                proxy_lua_error(L, "failurelimit must be >= 0");
                return 0;
            }

            be->tunables.backend_failure_limit = limit;
        }
        lua_pop(L, 1);

        if (lua_getfield(L, 1, "depthlimit") != LUA_TNIL) {
            int limit = luaL_checkinteger(L, -1);
            if (limit < 0) {
                proxy_lua_error(L, "depthlimit must be >= 0");
                return 0;
            }

            be->tunables.backend_depth_limit = limit;
        }
        lua_pop(L, 1);

        // Timeouts are given in fractional seconds and split into whole
        // seconds plus microseconds.
        if (lua_getfield(L, 1, "connecttimeout") != LUA_TNIL) {
            lua_Number secondsf = luaL_checknumber(L, -1);
            lua_Integer secondsi = (lua_Integer) secondsf;
            lua_Number subseconds = secondsf - secondsi;

            be->tunables.connect.tv_sec = secondsi;
            be->tunables.connect.tv_usec = MICROSECONDS(subseconds);
        }
        lua_pop(L, 1);

        // TODO (v2): print deprecation warning.
        if (lua_getfield(L, 1, "retrytimeout") != LUA_TNIL) {
            be->tunables.retry.tv_sec =
                _mcplib_backend_get_waittime(luaL_checknumber(L, -1));
        }
        lua_pop(L, 1);

        if (lua_getfield(L, 1, "retrywaittime") != LUA_TNIL) {
            be->tunables.retry.tv_sec =
                _mcplib_backend_get_waittime(luaL_checknumber(L, -1));
        }
        lua_pop(L, 1);

        // NOTE(review): "retrytimeout" is read a second time here, so when
        // it is present this block overrides the whole-second value set by
        // both blocks above. Confirm whether that is intended.
        if (lua_getfield(L, 1, "retrytimeout") != LUA_TNIL) {
            lua_Number secondsf = luaL_checknumber(L, -1);
            lua_Integer secondsi = (lua_Integer) secondsf;
            lua_Number subseconds = secondsf - secondsi;

            be->tunables.retry.tv_sec = secondsi;
            be->tunables.retry.tv_usec = MICROSECONDS(subseconds);
        }
        lua_pop(L, 1);

        if (lua_getfield(L, 1, "readtimeout") != LUA_TNIL) {
            lua_Number secondsf = luaL_checknumber(L, -1);
            lua_Integer secondsi = (lua_Integer) secondsf;
            lua_Number subseconds = secondsf - secondsi;

            be->tunables.read.tv_sec = secondsi;
            be->tunables.read.tv_usec = MICROSECONDS(subseconds);
        }
        lua_pop(L, 1);

        if (lua_getfield(L, 1, "down") != LUA_TNIL) {
            int down = lua_toboolean(L, -1);
            be->tunables.down = down;
        }
        lua_pop(L, 1);

        if (lua_getfield(L, 1, "flaptime") != LUA_TNIL) {
            lua_Number secondsf = luaL_checknumber(L, -1);
            lua_Integer secondsi = (lua_Integer) secondsf;
            lua_Number subseconds = secondsf - secondsi;

            be->tunables.flap.tv_sec = secondsi;
            be->tunables.flap.tv_usec = MICROSECONDS(subseconds);
        }
        lua_pop(L, 1);

        if (lua_getfield(L, 1, "flapbackofframp") != LUA_TNIL) {
            float ramp = luaL_checknumber(L, -1);
            // values at or below 1.1 are clamped to 1.1.
            if (ramp <= 1.1) {
                ramp = 1.1;
            }
            be->tunables.flap_backoff_ramp = ramp;
        }
        lua_pop(L, 1);

        if (lua_getfield(L, 1, "flapbackoffmax") != LUA_TNIL) {
            luaL_checknumber(L, -1); // type check only; value read below.
            uint32_t max = lua_tointeger(L, -1);
            be->tunables.flap_backoff_max = max;
        }
        lua_pop(L, 1);

        if (lua_getfield(L, 1, "connections") != LUA_TNIL) {
            int c = luaL_checkinteger(L, -1);
            // accepted range is 1..8 inclusive.
            if (c <= 0) {
                proxy_lua_error(L, "backend connections argument must be > 0");
                return 0;
            } else if (c > 8) {
                proxy_lua_error(L, "backend connections argument must be <= 8");
                return 0;
            }

            be->conncount = c;
        }
        lua_pop(L, 1);

    } else {
        // Positional form: label, host, port.
        label = luaL_checklstring(L, 1, &llen);
        name = luaL_checklstring(L, 2, &nlen);
        port = luaL_checklstring(L, 3, &plen);
    }

    // Each string must fit its buffer with room for the terminator.
    if (llen > MAX_LABELLEN-1) {
        proxy_lua_error(L, "backend label too long");
        return 0;
    }

    if (nlen > MAX_NAMELEN-1) {
        proxy_lua_error(L, "backend name too long");
        return 0;
    }

    if (plen > MAX_PORTLEN-1) {
        proxy_lua_error(L, "backend port too long");
        return 0;
    }

    memcpy(be->label, label, llen);
    be->label[llen] = '\0';
    memcpy(be->name, name, nlen);
    be->name[nlen] = '\0';
    memcpy(be->port, port, plen);
    be->port[plen] = '\0';
    be->llen = llen;
    if (lua_istable(L, 1)) {
        lua_pop(L, 3); // drop label, name, port.
    }
    // The userdata is back on top: attach its metatable and return it.
    luaL_getmetatable(L, "mcp.backend");
    lua_setmetatable(L, -2); // set metatable to userdata.

    return 1; // return be object.
}
494 
495 // Called with the cache label at top of the stack.
_mcplib_backend_checkcache(lua_State * L,mcp_backend_label_t * bel)496 static mcp_backend_wrap_t *_mcplib_backend_checkcache(lua_State *L, mcp_backend_label_t *bel) {
497     // first check our reference table to compare.
498     // Note: The upvalue won't be found unless we're running from a function with it
499     // set as an upvalue.
500     int ret = lua_gettable(L, lua_upvalueindex(MCP_BACKEND_UPVALUE));
501     if (ret != LUA_TNIL) {
502         mcp_backend_wrap_t *be_orig = luaL_checkudata(L, -1, "mcp.backendwrap");
503         if (strncmp(be_orig->be->name, bel->name, MAX_NAMELEN) == 0
504                 && strncmp(be_orig->be->port, bel->port, MAX_PORTLEN) == 0
505                 && be_orig->be->conncount == bel->conncount
506                 && memcmp(&be_orig->be->tunables, &bel->tunables, sizeof(bel->tunables)) == 0) {
507             // backend is the same, return it.
508             return be_orig;
509         } else {
510             // backend not the same, pop from stack and make new one.
511             lua_pop(L, 1);
512         }
513     } else {
514         lua_pop(L, 1); // pop the nil.
515     }
516 
517     return NULL;
518 }
519 
_mcplib_make_backendconn(lua_State * L,mcp_backend_label_t * bel,proxy_event_thread_t * e)520 static mcp_backend_wrap_t *_mcplib_make_backendconn(lua_State *L, mcp_backend_label_t *bel,
521         proxy_event_thread_t *e) {
522     proxy_ctx_t *ctx = PROXY_GET_CTX(L);
523 
524     mcp_backend_wrap_t *bew = lua_newuserdatauv(L, sizeof(mcp_backend_wrap_t), 0);
525     luaL_getmetatable(L, "mcp.backendwrap");
526     lua_setmetatable(L, -2); // set metatable to userdata.
527 
528     mcp_backend_t *be = calloc(1, sizeof(mcp_backend_t) + sizeof(struct mcp_backendconn_s) * bel->conncount);
529     if (be == NULL) {
530         proxy_lua_error(L, "out of memory allocating backend connection");
531         return NULL;
532     }
533     bew->be = be;
534 
535     strncpy(be->name, bel->name, MAX_NAMELEN+1);
536     strncpy(be->port, bel->port, MAX_PORTLEN+1);
537     strncpy(be->label, bel->label, MAX_LABELLEN+1);
538     memcpy(&be->tunables, &bel->tunables, sizeof(bel->tunables));
539     be->conncount = bel->conncount;
540     STAILQ_INIT(&be->io_head);
541 
542     for (int x = 0; x < bel->conncount; x++) {
543         struct mcp_backendconn_s *bec = &be->be[x];
544         bec->be_parent = be;
545         memcpy(&bec->tunables, &bel->tunables, sizeof(bel->tunables));
546         STAILQ_INIT(&bec->io_head);
547         bec->state = mcp_backend_read;
548 
549         // this leaves a permanent buffer on the backend, which is fine
550         // unless you have billions of backends.
551         // we can later optimize for pulling buffers from idle backends.
552         bec->rbuf = malloc(READ_BUFFER_SIZE);
553         if (bec->rbuf == NULL) {
554             proxy_lua_error(L, "out of memory allocating backend");
555             return NULL;
556         }
557 
558         // initialize the client
559         bec->client = malloc(mcmc_size(MCMC_OPTION_BLANK));
560         if (bec->client == NULL) {
561             proxy_lua_error(L, "out of memory allocating backend");
562             return NULL;
563         }
564         // TODO (v2): no way to change the TCP_KEEPALIVE state post-construction.
565         // This is a trivial fix if we ensure a backend's owning event thread is
566         // set before it can be used in the proxy, as it would have access to the
567         // tunables structure. _reset_bad_backend() may not have its event thread
568         // set 100% of the time and I don't want to introduce a crash right now,
569         // so I'm writing this overly long comment. :)
570         int flags = MCMC_OPTION_NONBLOCK;
571         STAT_L(ctx);
572         if (ctx->tunables.tcp_keepalive) {
573             flags |= MCMC_OPTION_TCP_KEEPALIVE;
574         }
575         STAT_UL(ctx);
576         bec->connect_flags = flags;
577 
578         // FIXME: remove ifdef via an initialized checker? or
579         // mcp_tls_backend_init response code?
580 #ifdef PROXY_TLS
581         if (be->tunables.use_tls && !ctx->tls_ctx) {
582             proxy_lua_error(L, "TLS requested but not initialized: call mcp.init_tls()");
583             return NULL;
584         }
585 #endif
586         mcp_tls_backend_init(ctx, bec);
587 
588         bec->event_thread = e;
589     }
590     pthread_mutex_lock(&e->mutex);
591     STAILQ_INSERT_TAIL(&e->beconn_head_in, be, beconn_next);
592     pthread_mutex_unlock(&e->mutex);
593 
594     // Signal to check queue.
595 #ifdef USE_EVENTFD
596     uint64_t u = 1;
597     // TODO (v2): check result? is it ever possible to get a short write/failure
598     // for an eventfd?
599     if (write(e->be_event_fd, &u, sizeof(uint64_t)) != sizeof(uint64_t)) {
600         assert(1 == 0);
601     }
602 #else
603     if (write(e->be_notify_send_fd, "w", 1) <= 0) {
604         assert(1 == 0);
605     }
606 #endif
607 
608     lua_pushvalue(L, -2); // push the label string back to the top.
609     // Add this new backend connection to the object cache.
610     lua_pushvalue(L, -2); // copy the backend reference to the top.
611     // set our new backend wrapper object into the reference table.
612     lua_settable(L, lua_upvalueindex(MCP_BACKEND_UPVALUE));
613     // stack is back to having backend on the top.
614 
615     STAT_INCR(ctx, backend_total, 1);
616 
617     return bew;
618 }
619 
// __gc for "mcp.pool" userdata: finalizes the pool's global object state,
// then releases the registry references held for the hash caller and for
// every backend slot that has one.
static int mcplib_pool_gc(lua_State *L) {
    mcp_pool_t *pool = luaL_checkudata(L, -1, "mcp.pool");

    mcp_gobj_finalize(&pool->g);

    luaL_unref(L, LUA_REGISTRYINDEX, pool->phc_ref);

    int slot = 0;
    while (slot < pool->pool_be_total) {
        // a zero ref means the slot never took a reference.
        if (pool->pool[slot].ref) {
            luaL_unref(L, LUA_REGISTRYINDEX, pool->pool[slot].ref);
        }
        slot++;
    }

    return 0;
}
635 
636 // Looks for a short string in a key to separate which part gets hashed vs
637 // sent to the backend node.
638 // ie: "foo:bar|#|restofkey" - only "foo:bar" gets hashed.
mcp_key_hash_filter_stop(const char * conf,const char * key,size_t klen,size_t * newlen)639 static const char *mcp_key_hash_filter_stop(const char *conf, const char *key, size_t klen, size_t *newlen) {
640     char temp[KEY_MAX_LENGTH+1];
641     *newlen = klen;
642     if (klen > KEY_MAX_LENGTH) {
643         // Hedging against potential bugs.
644         return key;
645     }
646 
647     memcpy(temp, key, klen);
648     temp[klen+1] = '\0';
649 
650     // TODO (v2): memmem would avoid the temp key and memcpy here, but it's
651     // not technically portable. An easy improvement would be to detect
652     // memmem() in `configure` and only use strstr/copy as a fallback.
653     // Since keys are short it's unlikely this would be a major performance
654     // win.
655     char *found = strstr(temp, conf);
656 
657     if (found) {
658         *newlen = found - temp;
659     }
660 
661     // hash stop can't change where keys start.
662     return key;
663 }
664 
665 // Takes a two character "tag", ie; "{}", or "$$", searches string for the
666 // first then second character. Only hashes the portion within these tags.
667 // *conf _must_ be two characters.
// Hash filter: limits the hashed part of `key` to the bytes between the
// first occurrence of conf[0] and the next occurrence of conf[1] after it.
//
// conf:   two-character tag, ie "{}" or "$$".
// key:    key bytes, `klen` bytes long.
// newlen: out; number of bytes to hash.
// Returns the start of the bytes to hash: the byte after the opening tag
// when both tags are found, otherwise `key` with *newlen == klen.
static const char *mcp_key_hash_filter_tag(const char *conf, const char *key, size_t klen, size_t *newlen) {
    *newlen = klen;

    const char *tag_open = memchr(key, conf[0], klen);
    if (tag_open == NULL) {
        return key;
    }

    size_t remain = klen - (size_t)(tag_open - key);
    // there must be something after the opening tag to search.
    if (remain <= 1) {
        return key;
    }

    // Start one past the opening tag. Searching from the tag itself would
    // match the same byte when both tag characters are equal ("$$") and
    // make the length below underflow.
    const char *tag_close = memchr(tag_open + 1, conf[1], remain - 1);
    if (tag_close == NULL) {
        return key;
    }

    *newlen = (size_t)(tag_close - tag_open) - 1;
    return tag_open + 1;
}
687 
// Runs a key distribution object's 'new' function for pool `p`.
// Expects the distribution table on top of the stack. Builds a Lua array
// describing each backend in the pool ({id, addr, port}), calls
// new(pooltable, <value at stack index 2>) and expects two results: a
// userdata to keep referenced and a lightuserdata pointing at a
// struct proxy_hash_caller, which is copied into p->phc.
// Raises a Lua error if 'new' is missing or the call fails.
static void _mcplib_pool_dist(lua_State *L, mcp_pool_t *p) {
    luaL_checktype(L, -1, LUA_TTABLE);
    if (lua_getfield(L, -1, "new") != LUA_TFUNCTION) {
        proxy_lua_error(L, "key distribution object missing 'new' function");
        return;
    }

    // - now create the copy pool table
    lua_createtable(L, p->pool_size, 0); // give the new pool table a sizing hint.
    for (int x = 1; x <= p->pool_size; x++) {
        // Lua arrays are 1-indexed, p->pool is 0-indexed.
        mcp_backend_t *be = p->pool[x-1].be;
        lua_createtable(L, 0, 4);
        // stack = [p, h, f, optN, newpool, backend]
        // the key should be fine for id? maybe don't need to duplicate
        // this?
        lua_pushinteger(L, x);
        lua_setfield(L, -2, "id");
        // we don't use the hostname for ketama hashing
        // so passing ip for hostname is fine
        lua_pushstring(L, be->name);
        lua_setfield(L, -2, "addr");
        lua_pushstring(L, be->port);
        lua_setfield(L, -2, "port");

        // set the backend table into the new pool table.
        lua_rawseti(L, -2, x);
    }

    // we can either use lua_insert() or possibly _rotate to shift
    // things into the right place, but simplest is to just copy the
    // option arg to the end of the stack.
    lua_pushvalue(L, 2);
    //   - stack should be: pool, opts, func, pooltable, opts

    // call the dist new function: 2 arguments, 2 results.
    int res = lua_pcall(L, 2, 2, 0);

    if (res != LUA_OK) {
        lua_error(L); // error should be on the stack already.
        return;
    }

    // -1 is lightuserdata ptr to the struct (which must be owned by the
    // userdata), which is later used for internal calls.
    struct proxy_hash_caller *phc;

    luaL_checktype(L, -1, LUA_TLIGHTUSERDATA);
    luaL_checktype(L, -2, LUA_TUSERDATA);
    phc = lua_touserdata(L, -1);
    // Copy the struct by value into the pool, then drop the pointer.
    memcpy(&p->phc, phc, sizeof(*phc));
    lua_pop(L, 1);
    // -2 was userdata we need to hold a reference to
    p->phc_ref = luaL_ref(L, LUA_REGISTRYINDEX);
    // UD now popped from stack.
}
743 
744 // in the proxy object, we can alias a ptr to the pool to where it needs to be
745 // based on worker number or io_thread right?
// Fills one pool_size-wide stripe of p->pool, selected by `offset`, with
// backends for event thread `t`. For each "mcp.backend" label object in the
// table at stack index 1, a cache label string is built
// ("<beprefix>:io:<label>" or "<beprefix>:w<offset>:<label>"), the matching
// backend wrapper is fetched from the cache or created, and a registry
// reference to the wrapper is stored in the slot.
static void _mcplib_pool_make_be_loop(lua_State *L, mcp_pool_t *p, int offset, proxy_event_thread_t *t) {
    // remember lua arrays are 1 indexed.
    for (int x = 1; x <= p->pool_size; x++) {
        mcp_pool_be_t *s = &p->pool[x-1 + (offset * p->pool_size)];
        lua_geti(L, 1, x); // get next server into the stack.
        // If we bail here, the pool _gc() should handle releasing any backend
        // references we made so far.
        mcp_backend_label_t *bel = luaL_checkudata(L, -1, "mcp.backend");

        // check label for pre-existing backend conn/wrapper
        // TODO (v2): there're native ways of "from C make lua strings"
        // toconcat counts the pieces pushed for lua_concat; it starts at 1
        // for the backend's own label pushed last.
        int toconcat = 1;
        if (p->beprefix[0] != '\0') {
            lua_pushstring(L, p->beprefix);
            toconcat++;
        }
        if (p->use_iothread) {
            lua_pushstring(L, ":io:");
            toconcat++;
        } else {
            lua_pushstring(L, ":w");
            lua_pushinteger(L, offset);
            lua_pushstring(L, ":");
            toconcat += 3;
        }
        lua_pushlstring(L, bel->label, bel->llen);
        lua_concat(L, toconcat);
        // stack: [..., label object, cache label string]

        lua_pushvalue(L, -1); // copy the label string for the create method.
        // The cache lookup consumes the copy and leaves the wrapper on top
        // only when a matching one is found.
        mcp_backend_wrap_t *bew = _mcplib_backend_checkcache(L, bel);
        if (bew == NULL) {
            bew = _mcplib_make_backendconn(L, bel, t);
        }
        s->be = bew->be; // unwrap the backend connection for direct ref.
        bew->be->use_io_thread = p->use_iothread;

        // If found from cache or made above, the backend wrapper is on the
        // top of the stack, so we can now take its reference.
        // The wrapper abstraction allows the be memory to be owned by its
        // destination thread (IO thread/etc).

        s->ref = luaL_ref(L, LUA_REGISTRYINDEX); // references and pops object.
        lua_pop(L, 1); // drop the remaining cache label string.
        lua_pop(L, 1); // pop the mcp.backend label object.
    }
}
792 
793 // call with table of backends in 1
// Populates the backends of pool `p` from the backend table at stack index
// 1: a single stripe on the proxy IO thread when the pool uses the IO
// thread, otherwise one stripe per worker thread.
static void _mcplib_pool_make_be(lua_State *L, mcp_pool_t *p) {
    if (!p->use_iothread) {
        // TODO (v3) globals.
        for (int idx = 0; idx < settings.num_threads; idx++) {
            LIBEVENT_THREAD *worker = get_worker_thread(idx);
            _mcplib_pool_make_be_loop(L, p, worker->thread_baseid, worker->proxy_event_thread);
        }
        return;
    }

    proxy_ctx_t *pctx = PROXY_GET_CTX(L);
    _mcplib_pool_make_be_loop(L, p, 0, pctx->proxy_io_thread);
}
806 
807 // p = mcp.pool(backends, { dist = f, hashfilter = f, seed = "a", hash = f })
mcplib_pool(lua_State * L)808 static int mcplib_pool(lua_State *L) {
809     proxy_ctx_t *ctx = PROXY_GET_CTX(L);
810     int argc = lua_gettop(L);
811     luaL_checktype(L, 1, LUA_TTABLE);
812     int n = luaL_len(L, 1); // get length of array table
813     int workers = settings.num_threads; // TODO (v3): globals usage.
814 
815     size_t plen = sizeof(mcp_pool_t) + (sizeof(mcp_pool_be_t) * n * workers);
816     mcp_pool_t *p = lua_newuserdatauv(L, plen, 0);
817     // Zero the memory before use, so we can realibly use __gc to clean up
818     memset(p, 0, plen);
819     p->pool_size = n;
820     p->pool_be_total = n * workers;
821     p->use_iothread = ctx->tunables.use_iothread;
822     // TODO (v2): Nicer if this is fetched from mcp.default_key_hash
823     p->key_hasher = XXH3_64bits_withSeed;
824     pthread_mutex_init(&p->g.lock, NULL);
825     p->ctx = PROXY_GET_CTX(L);
826 
827     luaL_setmetatable(L, "mcp.pool");
828 
829     // Allow passing an ignored nil as a second argument. Makes the lua easier
830     int type = lua_type(L, 2);
831     if (argc == 1 || type == LUA_TNIL) {
832         _mcplib_pool_make_be(L, p);
833         lua_getglobal(L, "mcp");
834         // TODO (v2): decide on a mcp.default_dist and use that instead
835         if (lua_getfield(L, -1, "dist_jump_hash") != LUA_TNIL) {
836             _mcplib_pool_dist(L, p);
837             lua_pop(L, 1); // pop "dist_jump_hash" value.
838         } else {
839             lua_pop(L, 1);
840         }
841         lua_pop(L, 1); // pop "mcp"
842         return 1;
843     }
844 
845     // Supplied with an options table. We inspect this table to decorate the
846     // pool, then pass it along to the a constructor if necessary.
847     luaL_checktype(L, 2, LUA_TTABLE);
848 
849     if (lua_getfield(L, 2, "iothread") != LUA_TNIL) {
850         luaL_checktype(L, -1, LUA_TBOOLEAN);
851         int use_iothread = lua_toboolean(L, -1);
852         if (use_iothread) {
853             p->use_iothread = true;
854         } else {
855             p->use_iothread = false;
856         }
857         lua_pop(L, 1); // remove value.
858     } else {
859         lua_pop(L, 1); // pop the nil.
860     }
861 
862     if (lua_getfield(L, 2, "beprefix") != LUA_TNIL) {
863         luaL_checktype(L, -1, LUA_TSTRING);
864         size_t len = 0;
865         const char *bepfx = lua_tolstring(L, -1, &len);
866         memcpy(p->beprefix, bepfx, len);
867         p->beprefix[len+1] = '\0';
868         lua_pop(L, 1); // pop beprefix string.
869     } else {
870         lua_pop(L, 1); // pop the nil.
871     }
872     _mcplib_pool_make_be(L, p);
873 
874     // stack: backends, options, mcp.pool
875     if (lua_getfield(L, 2, "dist") != LUA_TNIL) {
876         // overriding the distribution function.
877         _mcplib_pool_dist(L, p);
878         lua_pop(L, 1); // remove the dist table from stack.
879     } else {
880         lua_pop(L, 1); // pop the nil.
881 
882         // use the default dist if not specified with an override table.
883         lua_getglobal(L, "mcp");
884         // TODO (v2): decide on a mcp.default_dist and use that instead
885         if (lua_getfield(L, -1, "dist_jump_hash") != LUA_TNIL) {
886             _mcplib_pool_dist(L, p);
887             lua_pop(L, 1); // pop "dist_jump_hash" value.
888         } else {
889             lua_pop(L, 1);
890         }
891         lua_pop(L, 1); // pop "mcp"
892     }
893 
894     if (lua_getfield(L, 2, "filter") != LUA_TNIL) {
895         luaL_checktype(L, -1, LUA_TSTRING);
896         const char *f_type = lua_tostring(L, -1);
897         if (strcmp(f_type, "stop") == 0) {
898             p->key_filter = mcp_key_hash_filter_stop;
899         } else if (strcmp(f_type, "tags") == 0) {
900             p->key_filter = mcp_key_hash_filter_tag;
901         } else {
902             proxy_lua_ferror(L, "unknown hash filter specified: %s\n", f_type);
903         }
904 
905         lua_pop(L, 1); // pops "filter" value.
906 
907         if (lua_getfield(L, 2, "filter_conf") == LUA_TSTRING) {
908             size_t len = 0;
909             const char *conf = lua_tolstring(L, -1, &len);
910             if (len < 2 || len > KEY_HASH_FILTER_MAX) {
911                 proxy_lua_ferror(L, "hash filter conf must be between 2 and %d characters", KEY_HASH_FILTER_MAX);
912             }
913 
914             memcpy(p->key_filter_conf, conf, len);
915             p->key_filter_conf[len+1] = '\0';
916         } else {
917             proxy_lua_error(L, "hash filter requires 'filter_conf' string");
918         }
919         lua_pop(L, 1); // pops "filter_conf" value.
920     } else {
921         lua_pop(L, 1); // pop the nil.
922     }
923 
924     if (lua_getfield(L, 2, "hash") != LUA_TNIL) {
925         luaL_checktype(L, -1, LUA_TLIGHTUSERDATA);
926         struct proxy_hash_func *phf = lua_touserdata(L, -1);
927         p->key_hasher = phf->func;
928         lua_pop(L, 1);
929     } else {
930         lua_pop(L, 1); // pop the nil.
931     }
932 
933     if (lua_getfield(L, 2, "seed") != LUA_TNIL) {
934         luaL_checktype(L, -1, LUA_TSTRING);
935         size_t seedlen;
936         const char *seedstr = lua_tolstring(L, -1, &seedlen);
937         // Note: the custom hasher for a dist may be "weird" in some cases, so
938         // we use a standard hash method for the seed here.
939         // I'm open to changing this (ie; mcp.pool_seed_hasher = etc)
940         p->hash_seed = XXH3_64bits(seedstr, seedlen);
941 
942         lua_pop(L, 1);
943     } else {
944         lua_pop(L, 1); // pop the nil.
945     }
946 
947     if (p->phc.selector_func == NULL) {
948         proxy_lua_error(L, "cannot create pool missing 'dist' argument");
949     }
950 
951     return 1;
952 }
953 
// __gc handler for "mcp.pool_proxy" userdata.
// Drops one reference from the main pool object. When the count reaches
// zero, the pool's `g` entry is appended to the context's manager queue and
// the manager condition variable is signalled.
static int mcplib_pool_proxy_gc(lua_State *L) {
    mcp_pool_proxy_t *proxy = luaL_checkudata(L, -1, "mcp.pool_proxy");
    mcp_pool_t *pool = proxy->main;

    pthread_mutex_lock(&pool->g.lock);
    pool->g.refcount -= 1;
    if (pool->g.refcount == 0) {
        // The hand-off happens while the pool's own lock is still held.
        proxy_ctx_t *owner = pool->ctx;
        pthread_mutex_lock(&owner->manager_lock);
        STAILQ_INSERT_TAIL(&owner->manager_head, &pool->g, next);
        pthread_cond_signal(&owner->manager_cond);
        pthread_mutex_unlock(&owner->manager_lock);
    }
    pthread_mutex_unlock(&pool->g.lock);

    return 0;
}
970 
mcplib_pool_proxy_call_helper(mcp_pool_proxy_t * pp,const char * key,size_t len)971 mcp_backend_t *mcplib_pool_proxy_call_helper(mcp_pool_proxy_t *pp, const char *key, size_t len) {
972     mcp_pool_t *p = pp->main;
973     if (p->key_filter) {
974         key = p->key_filter(p->key_filter_conf, key, len, &len);
975         P_DEBUG("%s: filtered key for hashing (%.*s)\n", __func__, (int)len, key);
976     }
977     uint64_t hash = p->key_hasher(key, len, p->hash_seed);
978     uint32_t lookup = p->phc.selector_func(hash, p->phc.ctx);
979 
980     assert(p->phc.ctx != NULL);
981     if (lookup >= p->pool_size) {
982         return NULL;
983     }
984 
985     return pp->pool[lookup].be;
986 }
987 
// Lua setter: stores the boolean on top of the stack into
// ctx->tunables.use_iothread. Raises a Lua error if the value is not a
// boolean. Returns no Lua values.
static int mcplib_backend_use_iothread(lua_State *L) {
    luaL_checktype(L, -1, LUA_TBOOLEAN);
    int enabled = lua_toboolean(L, -1);
    proxy_ctx_t *ctx = PROXY_GET_CTX(L);

    // The tunable is written between STAT_L/STAT_UL.
    STAT_L(ctx);
    ctx->tunables.use_iothread = enabled;
    STAT_UL(ctx);

    return 0;
}
999 
// Lua setter: stores the boolean on top of the stack into
// ctx->tunables.use_tls. When built without PROXY_TLS, asking for `true`
// raises a Lua error instead. Returns no Lua values.
static int mcplib_backend_use_tls(lua_State *L) {
    luaL_checktype(L, -1, LUA_TBOOLEAN);
    int enabled = lua_toboolean(L, -1);
    proxy_ctx_t *ctx = PROXY_GET_CTX(L);
#if !defined(PROXY_TLS)
    // Turning TLS off is still accepted in non-TLS builds.
    if (enabled == 1) {
        proxy_lua_error(L, "cannot set mcp.backend_use_tls: TLS support not compiled");
    }
#endif
    STAT_L(ctx);
    ctx->tunables.use_tls = enabled;
    STAT_UL(ctx);

    return 0;
}
1015 
1016 // TODO: error checking.
// Lua: mcp.init_tls()
// With PROXY_TLS compiled in, calls mcp_tls_init() on the proxy context;
// otherwise raises a Lua error. Returns no Lua values.
static int mcplib_init_tls(lua_State *L) {
#ifdef PROXY_TLS
    proxy_ctx_t *ctx = PROXY_GET_CTX(L);
    mcp_tls_init(ctx);
#else
    proxy_lua_error(L, "cannot run mcp.init_tls: TLS support not compiled");
#endif

    return 0;
}
1027 
// Lua setter: stores the boolean on top of the stack into
// ctx->tunables.tcp_keepalive. Raises a Lua error if the value is not a
// boolean. Returns no Lua values.
static int mcplib_tcp_keepalive(lua_State *L) {
    luaL_checktype(L, -1, LUA_TBOOLEAN);
    int enabled = lua_toboolean(L, -1);
    proxy_ctx_t *ctx = PROXY_GET_CTX(L);

    // The tunable is written between STAT_L/STAT_UL.
    STAT_L(ctx);
    ctx->tunables.tcp_keepalive = enabled;
    STAT_UL(ctx);

    return 0;
}
1039 
// Lua setter: stores the integer on top of the stack into
// ctx->tunables.backend_failure_limit. Negative values are rejected with a
// Lua error. Returns no Lua values.
static int mcplib_backend_failure_limit(lua_State *L) {
    int new_limit = luaL_checkinteger(L, -1);
    proxy_ctx_t *ctx = PROXY_GET_CTX(L);

    if (new_limit >= 0) {
        STAT_L(ctx);
        ctx->tunables.backend_failure_limit = new_limit;
        STAT_UL(ctx);
    } else {
        proxy_lua_error(L, "backend_failure_limit must be >= 0");
    }

    return 0;
}
1055 
// Lua setter: stores the integer on top of the stack into
// ctx->tunables.backend_depth_limit. Negative values are rejected with a
// Lua error. Returns no Lua values.
static int mcplib_backend_depth_limit(lua_State *L) {
    int new_limit = luaL_checkinteger(L, -1);
    proxy_ctx_t *ctx = PROXY_GET_CTX(L);

    if (new_limit >= 0) {
        STAT_L(ctx);
        ctx->tunables.backend_depth_limit = new_limit;
        STAT_UL(ctx);
    } else {
        proxy_lua_error(L, "backend_depth_limit must be >= 0");
    }

    return 0;
}
1071 
// Lua setter: takes a number of seconds (fractions allowed) from the top of
// the stack and stores it in ctx->tunables.connect. The integer part goes
// to tv_sec; the remainder goes through MICROSECONDS() into tv_usec.
// Returns no Lua values.
static int mcplib_backend_connect_timeout(lua_State *L) {
    lua_Number timeout = luaL_checknumber(L, -1);
    lua_Integer whole = (lua_Integer) timeout;
    lua_Number fraction = timeout - whole;
    proxy_ctx_t *ctx = PROXY_GET_CTX(L);

    STAT_L(ctx);
    ctx->tunables.connect.tv_sec = whole;
    ctx->tunables.connect.tv_usec = MICROSECONDS(fraction);
    STAT_UL(ctx);

    return 0;
}
1085 
// Lua setter: takes a number from the top of the stack, converts it with
// _mcplib_backend_get_waittime(), and stores the result as whole seconds
// in ctx->tunables.retry (tv_usec is always zeroed). Returns no Lua values.
static int mcplib_backend_retry_waittime(lua_State *L) {
    lua_Number requested = luaL_checknumber(L, -1);
    proxy_ctx_t *ctx = PROXY_GET_CTX(L);
    lua_Integer wait_secs = _mcplib_backend_get_waittime(requested);

    STAT_L(ctx);
    ctx->tunables.retry.tv_usec = 0;
    ctx->tunables.retry.tv_sec = wait_secs;
    STAT_UL(ctx);

    return 0;
}
1098 
1099 // TODO (v2): deprecation notice print when using this function.
// Older name for the retry wait setting; forwards unchanged to
// mcplib_backend_retry_waittime() and returns its result count.
static int mcplib_backend_retry_timeout(lua_State *L) {
    int nresults = mcplib_backend_retry_waittime(L);
    return nresults;
}
1103 
// Lua setter: takes a number of seconds (fractions allowed) from the top of
// the stack and stores it in ctx->tunables.read. The integer part goes to
// tv_sec; the remainder goes through MICROSECONDS() into tv_usec.
// Returns no Lua values.
static int mcplib_backend_read_timeout(lua_State *L) {
    lua_Number timeout = luaL_checknumber(L, -1);
    lua_Integer whole = (lua_Integer) timeout;
    lua_Number fraction = timeout - whole;
    proxy_ctx_t *ctx = PROXY_GET_CTX(L);

    STAT_L(ctx);
    ctx->tunables.read.tv_sec = whole;
    ctx->tunables.read.tv_usec = MICROSECONDS(fraction);
    STAT_UL(ctx);

    return 0;
}
1117 
// Lua setter: takes a number of seconds (fractions allowed) from the top of
// the stack and stores it in ctx->tunables.flap. The integer part goes to
// tv_sec; the remainder goes through MICROSECONDS() into tv_usec.
// Returns no Lua values.
static int mcplib_backend_flap_time(lua_State *L) {
    lua_Number window = luaL_checknumber(L, -1);
    lua_Integer whole = (lua_Integer) window;
    lua_Number fraction = window - whole;
    proxy_ctx_t *ctx = PROXY_GET_CTX(L);

    STAT_L(ctx);
    ctx->tunables.flap.tv_sec = whole;
    ctx->tunables.flap.tv_usec = MICROSECONDS(fraction);
    STAT_UL(ctx);

    return 0;
}
1131 
// Lua setter: takes a number from the top of the stack and stores it in
// ctx->tunables.flap_backoff_ramp. Values at or below 1.1 are raised to
// 1.1 first. Returns no Lua values.
static int mcplib_backend_flap_backoff_ramp(lua_State *L) {
    float ramp = luaL_checknumber(L, -1);
    proxy_ctx_t *ctx = PROXY_GET_CTX(L);

    // Enforce the floor before publishing the value.
    if (ramp <= 1.1) {
        ramp = 1.1;
    }

    STAT_L(ctx);
    ctx->tunables.flap_backoff_ramp = ramp;
    STAT_UL(ctx);

    return 0;
}
1145 
// Lua setter: requires a number on top of the stack, converts it with
// lua_tointeger() and stores it (as uint32_t) in
// ctx->tunables.flap_backoff_max. Returns no Lua values.
static int mcplib_backend_flap_backoff_max(lua_State *L) {
    // Only used for the type check; the value is read again below.
    (void) luaL_checknumber(L, -1);
    uint32_t backoff_max = lua_tointeger(L, -1);
    proxy_ctx_t *ctx = PROXY_GET_CTX(L);

    STAT_L(ctx);
    ctx->tunables.flap_backoff_max = backoff_max;
    STAT_UL(ctx);

    return 0;
}
1157 
// Lua setter: stores the integer on top of the stack into
// ctx->tunables.max_ustats. Zero selects MAX_USTATS_DEFAULT; anything above
// the default is accepted but prints a warning to stderr.
// Returns no Lua values.
static int mcplib_stat_limit(lua_State *L) {
    proxy_ctx_t *ctx = PROXY_GET_CTX(L);
    int max_stats = luaL_checkinteger(L, -1);

    if (max_stats == 0) {
        max_stats = MAX_USTATS_DEFAULT;
    } else if (max_stats > MAX_USTATS_DEFAULT) {
        fprintf(stderr, "PROXY WARNING: setting ustats limit above default may cause performance problems\n");
    }

    // lock isn't necessary as this is only used from the config thread.
    // keeping the lock call for code consistency.
    STAT_L(ctx);
    ctx->tunables.max_ustats = max_stats;
    STAT_UL(ctx);
    return 0;
}
1176 
// Lua setter: stores a request limit in ctx->active_req_limit.
// Zero means "no limit" (UINT64_MAX). Otherwise the value is divided by the
// worker thread count, but only when it exceeds twice that count.
// Returns no Lua values.
static int mcplib_active_req_limit(lua_State *L) {
    proxy_ctx_t *ctx = PROXY_GET_CTX(L);
    uint64_t req_limit = luaL_checkinteger(L, -1);

    if (req_limit != 0) {
        // FIXME: global
        int nthreads = settings.num_threads;
        // The actual limit is per-worker-thread, so divide it up.
        if (req_limit > nthreads * 2) {
            req_limit /= nthreads;
        }
    } else {
        req_limit = UINT64_MAX;
    }

    STAT_L(ctx);
    ctx->active_req_limit = req_limit;
    STAT_UL(ctx);

    return 0;
}
1198 
1199 // limit specified in kilobytes
// Lua setter: stores a buffer memory limit in ctx->buffer_memory_limit.
// Zero means "no limit" (UINT64_MAX). Otherwise the value is multiplied by
// 1024 and then divided by the worker thread count, but only when it
// exceeds twice that count. Returns no Lua values.
static int mcplib_buffer_memory_limit(lua_State *L) {
    proxy_ctx_t *ctx = PROXY_GET_CTX(L);
    uint64_t mem_limit = luaL_checkinteger(L, -1);

    if (mem_limit != 0) {
        mem_limit *= 1024;

        int nthreads = settings.num_threads;
        if (mem_limit > nthreads * 2) {
            mem_limit /= nthreads;
        }
    } else {
        mem_limit = UINT64_MAX;
    }
    ctx->buffer_memory_limit = mem_limit;

    return 0;
}
1218 
1219 // mcp.attach(mcp.HOOK_NAME, function)
1220 // fill hook structure: if lua function, use luaL_ref() to store the func
// Lua: mcp.attach(hook, function_or_funcgen [, tag])
//
// Argument 1 is a command id, or CMD_ANY / CMD_ANY_STORAGE to cover a range
// of commands. Argument 2 is either a plain Lua function (wrapped into a
// function generator here) or an "mcp.funcgen" userdata. Argument 3, if a
// string, is a listener tag of 1 to 8 bytes; tagged hooks are kept in a
// per-command array separate from the untagged hook.
//
// Any hook previously stored in the same slot is released first.
// Raises a Lua error on bad arguments or allocation failure.
// Returns no Lua values.
static int mcplib_attach(lua_State *L) {
    // Pull the original worker thread out of the shared mcplib upvalue.
    LIBEVENT_THREAD *t = PROXY_GET_THR(L);

    int hook = luaL_checkinteger(L, 1);
    // pushvalue to dupe func and etc.
    // can leave original func on stack afterward because it'll get cleared.
    int loop_end = 0;
    int loop_start = 1;
    if (hook == CMD_ANY) {
        // if CMD_ANY we need individually set loop 1 to CMD_SIZE.
        loop_end = CMD_SIZE;
    } else if (hook == CMD_ANY_STORAGE) {
        // if CMD_ANY_STORAGE we only override get/set/etc.
        loop_end = CMD_END_STORAGE;
    } else if (hook >= 0 && hook < CMD_SIZE) {
        loop_start = hook;
        loop_end = hook + 1;
    } else {
        // The id indexes the hook array directly below, so refuse anything
        // outside the command range instead of touching memory out of bounds.
        proxy_lua_error(L, "mcp.attach: invalid hook specified");
        return 0;
    }

    mcp_funcgen_t *fgen = NULL;
    if (lua_isfunction(L, 2)) {
        // create a funcgen with null generator that calls this function
        lua_pushvalue(L, 2); // function must be at top of stack.
        mcplib_funcgenbare_new(L); // convert it into a function generator.
        fgen = luaL_checkudata(L, -1, "mcp.funcgen"); // set our pointer ref.
        lua_replace(L, 2); // move the function generator over the input
                           // function. necessary for alignment with the rest
                           // of the code.
        lua_pop(L, 1); // drop the extra generator function reference.
    } else if ((fgen = luaL_testudata(L, 2, "mcp.funcgen")) != NULL) {
        // good
    } else {
        proxy_lua_error(L, "mcp.attach: must pass a function");
        return 0;
    }

    if (fgen->closed) {
        proxy_lua_error(L, "mcp.attach: cannot use a previously replaced function");
        return 0;
    }

    {
        struct proxy_hook *hooks = t->proxy_hooks;
        uint64_t tag = 0; // listener socket tag

        if (lua_isstring(L, 3)) {
            size_t len;
            const char *stag = lua_tolstring(L, 3, &len);
            if (len < 1 || len > 8) {
                proxy_lua_error(L, "mcp.attach: tag must be 1 to 8 characters");
                return 0;
            }
            // Pack the tag bytes into the integer; unused bytes stay zero.
            memcpy(&tag, stag, len);
        }

        for (int x = loop_start; x < loop_end; x++) {
            struct proxy_hook *h = &hooks[x];
            if (x == CMD_MN) {
                // disallow overriding MN so client pipeline flushes work.
                // need to add flush support before allowing override
                continue;
            }
            lua_pushvalue(L, 2); // duplicate the ref.
            struct proxy_hook_ref *href = &h->ref;

            if (tag) {
                // listener was tagged. use the extended hook structure.
                struct proxy_hook_tagged *pht = h->tagged;

                if (h->tagcount == 0) {
                    pht = calloc(1, sizeof(struct proxy_hook_tagged));
                    if (pht == NULL) {
                        proxy_lua_error(L, "mcp.attach: failure allocating tagged hooks");
                        return 0;
                    }
                    h->tagcount = 1;
                    h->tagged = pht;
                }

                // Look for a slot already holding this tag, or an empty one.
                bool found = false;
                for (int i = 0; i < h->tagcount; i++) {
                    if (pht->tag == tag || pht->tag == 0) {
                        found = true;
                        break;
                    }
                    pht++;
                }

                // need to resize the array to fit the new tag.
                if (!found) {
                    struct proxy_hook_tagged *temp = realloc(h->tagged, sizeof(struct proxy_hook_tagged) * (h->tagcount+1));
                    if (!temp) {
                        proxy_lua_error(L, "mcp.attach: failure to resize tagged hooks");
                        return 0;
                    }
                    pht = &temp[h->tagcount];
                    memset(pht, 0, sizeof(*pht));
                    h->tagcount++;
                    h->tagged = temp;
                }

                href = &pht->ref;
                pht->tag = tag;
            }

            // now assign our hook reference.
            if (href->lua_ref) {
                // Found existing tagged hook.
                luaL_unref(L, LUA_REGISTRYINDEX, href->lua_ref);
                mcp_funcgen_dereference(L, href->ctx);
            }

            lua_pushvalue(L, -1); // duplicate the funcgen
            mcp_funcgen_reference(L);
            href->lua_ref = luaL_ref(L, LUA_REGISTRYINDEX);
            href->ctx = fgen;
            assert(href->lua_ref != 0);
        }
    }

    return 0;
}
1344 
1345 /*** START lua interface to logger ***/
1346 
1347 // user logger specific to the config thread
// Lua: mcp.log(msg) as registered in the config function table.
// Emits the string on top of the stack as a LOGGER_PROXY_USER entry.
// Returns no Lua values.
static int mcplib_ct_log(lua_State *L) {
    const char *text = luaL_checkstring(L, -1);
    // No logger is passed here (NULL); per the file's own note the logger
    // is then pulled from thread local storage.
    LOGGER_LOG(NULL, LOG_PROXYUSER, LOGGER_PROXY_USER, NULL, text);
    return 0;
}
1354 
// Lua: mcp.log(msg) as registered in the routes function table.
// Emits the string on top of the stack as a LOGGER_PROXY_USER entry using
// the calling worker thread's logger. Returns no Lua values.
static int mcplib_log(lua_State *L) {
    LIBEVENT_THREAD *thr = PROXY_GET_THR(L);
    const char *text = luaL_checkstring(L, -1);
    LOGGER_LOG(thr->l, LOG_PROXYUSER, LOGGER_PROXY_USER, NULL, text);
    return 0;
}
1361 
1362 // (request, resp, "detail")
// Lua: mcp.log_req(request, response, detail, cfd)
//
// Writes one LOGGER_PROXY_REQ entry for `request`. If argument 2 is a
// userdata it must be an "mcp.response", whose type/code/status, elapsed
// time and backend name/port are included; otherwise those fields are
// logged as zero/NULL. `detail` (string) and `cfd` (integer, default 0)
// are optional. Does nothing when LOG_PROXYREQS is not enabled on the
// thread's logger. Returns no Lua values.
static int mcplib_log_req(lua_State *L) {
    LIBEVENT_THREAD *thr = PROXY_GET_THR(L);
    logger *l = thr->l;
    // Not using the LOGGER_LOG macro so we can avoid as much overhead as
    // possible when logging is disabled.
    if ((l->eflags & LOG_PROXYREQS) == 0) {
        return 0;
    }

    // Response-derived fields; they keep these defaults without a response.
    int resp_type = 0;
    int resp_code = 0;
    int resp_status = 0;
    long elapsed = 0;
    char *be_name = NULL;
    char *be_port = NULL;

    mcp_request_t *rq = luaL_checkudata(L, 1, "mcp.request");
    if (lua_type(L, 2) == LUA_TUSERDATA) {
        mcp_resp_t *rs = luaL_checkudata(L, 2, "mcp.response");
        resp_type = rs->resp.type;
        resp_code = rs->resp.code;
        resp_status = rs->status;
        be_name = rs->be_name;
        be_port = rs->be_port;
        elapsed = rs->elapsed;
    }

    size_t detail_len = 0;
    const char *detail = luaL_optlstring(L, 3, NULL, &detail_len);
    int cfd = luaL_optinteger(L, 4, 0);

    logger_log(l, LOGGER_PROXY_REQ, NULL, rq->pr.request, rq->pr.reqlen, elapsed, resp_type, resp_code, resp_status, cfd, detail, detail_len, be_name, be_port);

    return 0;
}
1397 
// Rotates the 32-bit value `x` left by `k` bits.
// The shift count is reduced modulo 32, so k == 0 (or any multiple of 32)
// returns `x` unchanged instead of shifting by the full word width, which
// is undefined behaviour in C.
static inline uint32_t _rotl(const uint32_t x, int k) {
    const unsigned int left = (unsigned int)k & 31u;
    const unsigned int right = (32u - left) & 31u;
    return (x << left) | (x >> right);
}

// xoshiro128++ generator (four 32-bit state words, 32-bit output).
// `s` must point at four state words; they are updated in place and the
// next pseudo-random value is returned.
static uint32_t _nextrand(uint32_t *s) {
    // The output is derived from the state as it is before this step.
    const uint32_t result = _rotl(s[0] + s[3], 7) + s[0];

    const uint32_t t = s[1] << 9;

    s[2] ^= s[0];
    s[3] ^= s[1];
    s[1] ^= s[2];
    s[0] ^= s[3];

    s[2] ^= t;

    s[3] = _rotl(s[3], 11);

    return result;
}
1419 
1420 
1421 // (milliseconds, sample_rate, allerrors, request, resp, "detail")
// Lua: mcp.log_reqsample(ms, rate, allerrors, request, response, detail, cfd)
//
// Conditionally writes one LOGGER_PROXY_REQ entry. The entry is logged when
// the first of these holds:
//   - `allerrors` is truthy and the response status is not MCMC_OK
//   - `ms` > 0 and the response's elapsed value exceeds ms * 1000
//   - `rate` > 0 and a draw from the thread's RNG, scaled into [0, rate),
//     comes out as zero
// The RNG is only advanced when the first two checks fail.
// Does nothing when LOG_PROXYREQS is not enabled on the thread's logger.
// Returns no Lua values.
static int mcplib_log_reqsample(lua_State *L) {
    LIBEVENT_THREAD *t = PROXY_GET_THR(L);
    logger *l = t->l;
    // Not using the LOGGER_LOG macro so we can avoid as much overhead as
    // possible when logging is disabled.
    if ((l->eflags & LOG_PROXYREQS) == 0) {
        return 0;
    }

    // Response-derived fields; they keep these defaults without a response.
    int resp_type = 0;
    int resp_code = 0;
    int resp_status = 0;
    long elapsed = 0;
    char *be_name = NULL;
    char *be_port = NULL;

    int ms = luaL_checkinteger(L, 1);
    int rate = luaL_checkinteger(L, 2);
    int allerr = lua_toboolean(L, 3);
    mcp_request_t *rq = luaL_checkudata(L, 4, "mcp.request");
    if (lua_type(L, 5) == LUA_TUSERDATA) {
        mcp_resp_t *rs = luaL_checkudata(L, 5, "mcp.response");
        resp_type = rs->resp.type;
        resp_code = rs->resp.code;
        resp_status = rs->status;
        be_name = rs->be_name;
        be_port = rs->be_port;
        elapsed = rs->elapsed;
    }

    size_t detail_len = 0;
    const char *detail = luaL_optlstring(L, 6, NULL, &detail_len);
    int cfd = luaL_optinteger(L, 7, 0);

    bool should_log = (allerr && resp_status != MCMC_OK)
        || (ms > 0 && elapsed > ms * 1000);
    if (!should_log && rate > 0) {
        // slightly biased random-to-rate without adding a loop, which is
        // completely fine for this use case.
        uint32_t rnd = (uint64_t)_nextrand(t->proxy_rng) * (uint64_t)rate >> 32;
        should_log = (rnd == 0);
    }

    if (!should_log) {
        return 0;
    }

    logger_log(l, LOGGER_PROXY_REQ, NULL, rq->pr.request, rq->pr.reqlen, elapsed, resp_type, resp_code, resp_status, cfd, detail, detail_len, be_name, be_port);

    return 0;
}
1475 
1476 // TODO: slowsample
1477 // _err versions?
1478 
1479 /*** END lua interface to logger ***/
1480 
// Publishes integer constants as fields of the table on top of the Lua
// stack, then sets the boolean field "WAIT_RESUME" to true.
static void proxy_register_defines(lua_State *L) {
// Y(value, "name"): table["name"] = value
#define Y(x, l) \
    lua_pushinteger(L, x); \
    lua_setfield(L, -2, l);
// X(IDENT): same, using the identifier's own spelling as the field name.
#define X(x) Y(x, #x)

    // Response codes and statuses.
    X(MCMC_CODE_STORED);
    X(MCMC_CODE_EXISTS);
    X(MCMC_CODE_DELETED);
    X(MCMC_CODE_TOUCHED);
    X(MCMC_CODE_VERSION);
    X(MCMC_CODE_NOT_FOUND);
    X(MCMC_CODE_NOT_STORED);
    X(MCMC_CODE_OK);
    X(MCMC_CODE_NOP);
    X(MCMC_CODE_END);
    X(MCMC_CODE_ERROR);
    X(MCMC_CODE_CLIENT_ERROR);
    X(MCMC_CODE_SERVER_ERROR);
    X(MCMC_ERR);
    X(P_OK);

    // Hook selectors covering ranges of commands.
    X(CMD_ANY);
    X(CMD_ANY_STORAGE);

    // Constants exported under a name different from the C identifier.
    Y(QWAIT_ANY, "WAIT_ANY");
    Y(QWAIT_OK, "WAIT_OK");
    Y(QWAIT_GOOD, "WAIT_GOOD");
    Y(QWAIT_FASTGOOD, "WAIT_FASTGOOD");
    Y(RQUEUE_R_GOOD, "RES_GOOD");
    Y(RQUEUE_R_OK, "RES_OK");
    Y(RQUEUE_R_ANY, "RES_ANY");

    // CMD_FIELDS is expanded while X/Y are still defined.
    CMD_FIELDS
#undef X
#undef Y

    lua_pushboolean(L, 1);
    lua_setfield(L, -2, "WAIT_RESUME");
}
1520 
1521 // TODO: low priority malloc error handling.
// Sets the "start_arg" field on the table at the top of the Lua stack (the
// 'mcp' table) from settings.proxy_startarg:
//   - no argument given:      start_arg = false
//   - string without ':':     start_arg = that string
//   - otherwise:              start_arg = table built from ':'-separated
//                             segments. Each segment is split on '_' into
//                             a name and an optional value; a name with no
//                             value maps to true.
// Segments that contain no name (only '_' characters) are skipped.
// Raises a Lua error if the working copy of the argument cannot be
// allocated.
static void proxy_register_startarg(lua_State *L) {
    int idx = lua_absindex(L, -1); // remember 'mcp' table.
    if (settings.proxy_startarg == NULL) {
        // no argument given.
        lua_pushboolean(L, 0);
        lua_setfield(L, idx, "start_arg");
        return;
    }

    // strtok_r() modifies its input, so work on a private copy.
    char *sarg = strdup(settings.proxy_startarg);
    if (sarg == NULL) {
        proxy_lua_error(L, "failed to allocate memory for proxy start argument");
        return;
    }

    if (strchr(sarg, ':') == NULL) {
        // just upload the string
        lua_pushstring(L, sarg);
    } else {
        // split into a table and set that instead.
        lua_newtable(L);
        int nidx = lua_absindex(L, -1);
        char *b = NULL;
        for (char *p = strtok_r(sarg, ":", &b);
                p != NULL;
                p = strtok_r(NULL, ":", &b)) {
            char *e = NULL;
            char *name = strtok_r(p, "_", &e);
            if (name == NULL) {
                // Segment held only '_' characters. Pushing a NULL string
                // yields nil, and a nil table key makes lua_settable raise
                // an error (leaking sarg), so skip it.
                continue;
            }
            lua_pushstring(L, name); // table -> key
            char *value = strtok_r(NULL, "_", &e);
            if (value == NULL) {
                lua_pushboolean(L, 1); // table -> key -> True
            } else {
                lua_pushstring(L, value); // table -> key -> value
            }
            lua_settable(L, nidx);
        }
    }
    free(sarg);
    lua_setfield(L, idx, "start_arg");
}
1558 
1559 // Creates and returns the top level "mcp" module
proxy_register_libs(void * ctx,LIBEVENT_THREAD * t,void * state)1560 int proxy_register_libs(void *ctx, LIBEVENT_THREAD *t, void *state) {
1561     lua_State *L = state;
1562 
1563     const struct luaL_Reg mcplib_backend_m[] = {
1564         {"__gc", mcplib_backend_gc},
1565         {NULL, NULL}
1566     };
1567 
1568     const struct luaL_Reg mcplib_backend_wrap_m[] = {
1569         {"__gc", mcplib_backend_wrap_gc},
1570         {NULL, NULL}
1571     };
1572 
1573     const struct luaL_Reg mcplib_request_m[] = {
1574         {"command", mcplib_request_command},
1575         {"key", mcplib_request_key},
1576         {"ltrimkey", mcplib_request_ltrimkey},
1577         {"rtrimkey", mcplib_request_rtrimkey},
1578         {"token", mcplib_request_token},
1579         {"token_int", mcplib_request_token_int},
1580         {"ntokens", mcplib_request_ntokens},
1581         {"has_flag", mcplib_request_has_flag},
1582         {"flag_token", mcplib_request_flag_token},
1583         {"flag_token_int", mcplib_request_flag_token_int},
1584         {"flag_add", mcplib_request_flag_add},
1585         {"flag_set", mcplib_request_flag_set},
1586         {"flag_replace", mcplib_request_flag_replace},
1587         {"flag_del", mcplib_request_flag_del},
1588         {"match_res", mcplib_request_match_res},
1589         {"__tostring", NULL},
1590         {"__gc", mcplib_request_gc},
1591         {NULL, NULL}
1592     };
1593 
1594     const struct luaL_Reg mcplib_response_m[] = {
1595         {"ok", mcplib_response_ok},
1596         {"hit", mcplib_response_hit},
1597         {"vlen", mcplib_response_vlen},
1598         {"code", mcplib_response_code},
1599         {"line", mcplib_response_line},
1600         {"flag_blank", mcplib_response_flag_blank},
1601         {"elapsed", mcplib_response_elapsed},
1602         {"__gc", mcplib_response_gc},
1603         {"__close", mcplib_response_close},
1604         {"close", mcplib_response_close},
1605         {NULL, NULL}
1606     };
1607 
1608     const struct luaL_Reg mcplib_pool_m[] = {
1609         {"__gc", mcplib_pool_gc},
1610         {NULL, NULL}
1611     };
1612 
1613     const struct luaL_Reg mcplib_pool_proxy_m[] = {
1614         {"__gc", mcplib_pool_proxy_gc},
1615         {NULL, NULL}
1616     };
1617 
1618     const struct luaL_Reg mcplib_ratelim_tbf_m[] = {
1619         {"__call", mcplib_ratelim_tbf_call},
1620         {NULL, NULL}
1621     };
1622 
1623     const struct luaL_Reg mcplib_ratelim_global_tbf_m[] = {
1624         {"__gc", mcplib_ratelim_global_tbf_gc},
1625         {NULL, NULL}
1626     };
1627 
1628     const struct luaL_Reg mcplib_ratelim_proxy_tbf_m[] = {
1629         {"__call", mcplib_ratelim_proxy_tbf_call},
1630         {"__gc", mcplib_ratelim_proxy_tbf_gc},
1631         {NULL, NULL}
1632     };
1633 
1634     const struct luaL_Reg mcplib_rcontext_m[] = {
1635         {"handle_set_cb", mcplib_rcontext_handle_set_cb},
1636         {"enqueue", mcplib_rcontext_enqueue},
1637         {"wait_cond", mcplib_rcontext_wait_cond},
1638         {"enqueue_and_wait", mcplib_rcontext_enqueue_and_wait},
1639         {"wait_handle", mcplib_rcontext_wait_handle},
1640         {"res_good", mcplib_rcontext_res_good},
1641         {"res_ok", mcplib_rcontext_res_ok},
1642         {"res_any", mcplib_rcontext_res_any},
1643         {"result", mcplib_rcontext_result},
1644         {"cfd", mcplib_rcontext_cfd},
1645         {"tls_peer_cn", mcplib_rcontext_tls_peer_cn},
1646         {"request_new", mcplib_rcontext_request_new},
1647         {"response_new", mcplib_rcontext_response_new},
1648         //{"sleep", mcplib_rcontext_sleep}, see comments on function
1649         {NULL, NULL}
1650     };
1651 
1652     const struct luaL_Reg mcplib_funcgen_m[] = {
1653         {"__gc", mcplib_funcgen_gc},
1654         {"new_handle", mcplib_funcgen_new_handle},
1655         {"ready", mcplib_funcgen_ready},
1656         {NULL, NULL}
1657     };
1658 
1659     const struct luaL_Reg mcplib_inspector_m[] = {
1660         {"__gc", mcplib_inspector_gc},
1661         {"__call", mcplib_inspector_call},
1662         {NULL, NULL},
1663     };
1664 
1665     const struct luaL_Reg mcplib_mutator_m[] = {
1666         {"__gc", mcplib_mutator_gc},
1667         {"__call", mcplib_mutator_call},
1668         {NULL, NULL},
1669     };
1670 
1671     const struct luaL_Reg mcplib_f_config [] = {
1672         {"pool", mcplib_pool},
1673         {"backend", mcplib_backend},
1674         {"add_stat", mcplib_add_stat},
1675         {"ratelim_global_tbf", mcplib_ratelim_global_tbf},
1676         {"stat_limit", mcplib_stat_limit},
1677         {"backend_connect_timeout", mcplib_backend_connect_timeout},
1678         {"backend_retry_timeout", mcplib_backend_retry_timeout},
1679         {"backend_retry_waittime", mcplib_backend_retry_waittime},
1680         {"backend_read_timeout", mcplib_backend_read_timeout},
1681         {"backend_failure_limit", mcplib_backend_failure_limit},
1682         {"backend_depth_limit", mcplib_backend_depth_limit},
1683         {"backend_flap_time", mcplib_backend_flap_time},
1684         {"backend_flap_backoff_ramp", mcplib_backend_flap_backoff_ramp},
1685         {"backend_flap_backoff_max", mcplib_backend_flap_backoff_max},
1686         {"backend_use_iothread", mcplib_backend_use_iothread},
1687         {"backend_use_tls", mcplib_backend_use_tls},
1688         {"init_tls", mcplib_init_tls},
1689         {"tcp_keepalive", mcplib_tcp_keepalive},
1690         {"active_req_limit", mcplib_active_req_limit},
1691         {"buffer_memory_limit", mcplib_buffer_memory_limit},
1692         {"schedule_config_reload", mcplib_schedule_config_reload},
1693         {"register_cron", mcplib_register_cron},
1694         {"server_stats", mcplib_server_stats},
1695         {"log", mcplib_ct_log},
1696         {NULL, NULL}
1697     };
1698 
1699     const struct luaL_Reg mcplib_f_routes [] = {
1700         {"internal", mcplib_internal},
1701         {"attach", mcplib_attach},
1702         {"funcgen_new", mcplib_funcgen_new},
1703         {"router_new", mcplib_router_new},
1704         {"log", mcplib_log},
1705         {"log_req", mcplib_log_req},
1706         {"log_reqsample", mcplib_log_reqsample},
1707         {"stat", mcplib_stat},
1708         {"request", mcplib_request},
1709         {"ratelim_tbf", mcplib_ratelim_tbf},
1710         {"req_inspector_new", mcplib_req_inspector_new},
1711         {"res_inspector_new", mcplib_res_inspector_new},
1712         {"req_mutator_new", mcplib_req_mutator_new},
1713         {"res_mutator_new", mcplib_res_mutator_new},
1714         {"time_real_millis", mcplib_time_real_millis},
1715         {"time_mono_millis", mcplib_time_mono_millis},
1716         {NULL, NULL}
1717     };
    // Each VM has void* extra space by default, giving fast access to a
    // context pointer like this. In some cases upvalues are inaccessible
    // (i.e. during GC) but we still need access to the proxy global context.
1721     void **extra = lua_getextraspace(L);
1722 
1723     if (t != NULL) {
1724         // If thread VM, extra is the libevent thread
1725         *extra = t;
1726         luaL_newmetatable(L, "mcp.request");
1727         lua_pushvalue(L, -1); // duplicate metatable.
1728         lua_setfield(L, -2, "__index"); // mt.__index = mt
1729         luaL_setfuncs(L, mcplib_request_m, 0); // register methods
1730         lua_pop(L, 1);
1731 
1732         luaL_newmetatable(L, "mcp.response");
1733         lua_pushvalue(L, -1); // duplicate metatable.
1734         lua_setfield(L, -2, "__index"); // mt.__index = mt
1735         luaL_setfuncs(L, mcplib_response_m, 0); // register methods
1736         lua_pop(L, 1);
1737 
1738         luaL_newmetatable(L, "mcp.pool_proxy");
1739         lua_pushvalue(L, -1); // duplicate metatable.
1740         lua_setfield(L, -2, "__index"); // mt.__index = mt
1741         luaL_setfuncs(L, mcplib_pool_proxy_m, 0); // register methods
1742         lua_pop(L, 1); // drop the hash selector metatable
1743 
1744         luaL_newmetatable(L, "mcp.ratelim_tbf");
1745         lua_pushvalue(L, -1); // duplicate metatable.
1746         lua_setfield(L, -2, "__index"); // mt.__index = mt
1747         luaL_setfuncs(L, mcplib_ratelim_tbf_m, 0); // register methods
1748         lua_pop(L, 1);
1749 
1750         luaL_newmetatable(L, "mcp.ratelim_proxy_tbf");
1751         lua_pushvalue(L, -1); // duplicate metatable.
1752         lua_setfield(L, -2, "__index"); // mt.__index = mt
1753         luaL_setfuncs(L, mcplib_ratelim_proxy_tbf_m, 0); // register methods
1754         lua_pop(L, 1);
1755 
1756         luaL_newmetatable(L, "mcp.inspector");
1757         lua_pushvalue(L, -1); // duplicate metatable.
1758         lua_setfield(L, -2, "__index"); // mt.__index = mt
1759         luaL_setfuncs(L, mcplib_inspector_m, 0); // register methods
1760         lua_pop(L, 1);
1761 
1762         luaL_newmetatable(L, "mcp.mutator");
1763         lua_pushvalue(L, -1); // duplicate metatable.
1764         lua_setfield(L, -2, "__index"); // mt.__index = mt
1765         luaL_setfuncs(L, mcplib_mutator_m, 0); // register methods
1766         lua_pop(L, 1);
1767 
1768         luaL_newmetatable(L, "mcp.rcontext");
1769         lua_pushvalue(L, -1); // duplicate metatable.
1770         lua_setfield(L, -2, "__index"); // mt.__index = mt
1771         luaL_setfuncs(L, mcplib_rcontext_m, 0); // register methods
1772         lua_pop(L, 1);
1773 
1774         luaL_newmetatable(L, "mcp.funcgen");
1775         lua_pushvalue(L, -1); // duplicate metatable.
1776         lua_setfield(L, -2, "__index"); // mt.__index = mt
1777         luaL_setfuncs(L, mcplib_funcgen_m, 0); // register methods
1778         lua_pop(L, 1);
1779 
1780         // marks a special C-compatible route function.
1781         luaL_newmetatable(L, "mcp.rfunc");
1782         lua_pop(L, 1);
1783 
1784         // function generator userdata.
1785         luaL_newmetatable(L, "mcp.funcgen");
1786         lua_pop(L, 1);
1787 
1788         luaL_newlibtable(L, mcplib_f_routes);
1789     } else {
1790         // Change the extra space override for the configuration VM to just point
1791         // straight to ctx.
1792         *extra = ctx;
1793 
1794         luaL_newmetatable(L, "mcp.backend");
1795         lua_pushvalue(L, -1); // duplicate metatable.
1796         lua_setfield(L, -2, "__index"); // mt.__index = mt
1797         luaL_setfuncs(L, mcplib_backend_m, 0); // register methods
1798         lua_pop(L, 1);
1799 
1800         luaL_newmetatable(L, "mcp.backendwrap");
1801         lua_pushvalue(L, -1); // duplicate metatable.
1802         lua_setfield(L, -2, "__index"); // mt.__index = mt
1803         luaL_setfuncs(L, mcplib_backend_wrap_m, 0); // register methods
1804         lua_pop(L, 1);
1805 
1806         luaL_newmetatable(L, "mcp.pool");
1807         lua_pushvalue(L, -1); // duplicate metatable.
1808         lua_setfield(L, -2, "__index"); // mt.__index = mt
1809         luaL_setfuncs(L, mcplib_pool_m, 0); // register methods
1810         lua_pop(L, 1); // drop the hash selector metatable
1811 
1812         luaL_newmetatable(L, "mcp.ratelim_global_tbf");
1813         lua_pushvalue(L, -1); // duplicate metatable.
1814         lua_setfield(L, -2, "__index"); // mt.__index = mt
1815         luaL_setfuncs(L, mcplib_ratelim_global_tbf_m, 0); // register methods
1816         lua_pop(L, 1);
1817 
1818         luaL_newlibtable(L, mcplib_f_config);
1819     }
1820 
1821     // create main library table.
1822     //luaL_newlib(L, mcplib_f);
    // TODO (v2): luaL_newlibtable() only pre-allocates space for the exact
    // number of entries in the function array passed to it.
    // It can be replaced with lua_createtable(), adding the number of
    // constant definitions to the size hint.
1827     proxy_register_defines(L);
1828 
1829     mcplib_open_hash_xxhash(L);
1830     lua_setfield(L, -2, "hash_xxhash");
1831     // hash function for selectors.
1832     // have to wrap the function in a struct because function pointers aren't
1833     // pointer pointers :)
1834     mcplib_open_dist_jump_hash(L);
1835     lua_setfield(L, -2, "dist_jump_hash");
1836     mcplib_open_dist_ring_hash(L);
1837     lua_setfield(L, -2, "dist_ring_hash");
1838 
1839     // create weak table for storing backends by label.
1840     lua_newtable(L); // {}
1841     lua_newtable(L); // {}, {} for metatable
1842     lua_pushstring(L, "v"); // {}, {}, "v" for weak values.
1843     lua_setfield(L, -2, "__mode"); // {}, {__mode = "v"}
1844     lua_setmetatable(L, -2); // {__mt = {__mode = "v"} }
1845 
1846     if (t != NULL) {
1847         luaL_setfuncs(L, mcplib_f_routes, 1); // store upvalues.
1848     } else {
1849         luaL_setfuncs(L, mcplib_f_config, 1); // store upvalues.
1850     }
1851 
1852     // every VM gets a copy of the start arguments to work with.
1853     proxy_register_startarg(L);
1854 
1855     lua_setglobal(L, "mcp"); // set the lib table to mcp global.
1856     return 1;
1857 }
1858