/* -*- Mode: C; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */

#include "proxy.h"
#ifdef TLS
#include "tls.h"
#endif

static mcp_funcgen_t *mcp_funcgen_route(lua_State *L, mcp_funcgen_t *fgen, mcp_parser_t *pr);
static int mcp_funcgen_router_cleanup(lua_State *L, mcp_funcgen_t *fgen);
static void _mcplib_funcgen_cache(mcp_funcgen_t *fgen, mcp_rcontext_t *rctx);
static void mcp_funcgen_cleanup(lua_State *L, mcp_funcgen_t *fgen);
static void mcp_resume_rctx_from_cb(mcp_rcontext_t *rctx);
static void proxy_return_rqu_cb(io_pending_t *pending);

static inline void _mcp_queue_hack(conn *c) {
    if (c) {
        // HACK
        // see notes above proxy_run_rcontext.
        // in case the above resume calls queued new work, we have to submit
        // it to the backend handling system here.
        for (io_queue_t *q = c->io_queues; q->type != IO_QUEUE_NONE; q++) {
            if (q->stack_ctx != NULL) {
                io_queue_cb_t *qcb = thread_io_queue_get(c->thread, q->type);
                qcb->submit_cb(q);
            }
        }
    }
}

// If we're GC'ed but not closed, it means it was created but never
// attached to a function, so ensure everything is closed properly.
int mcplib_funcgen_gc(lua_State *L) {
    mcp_funcgen_t *fgen = luaL_checkudata(L, -1, "mcp.funcgen");
    if (fgen->closed) {
        return 0;
    }
    assert(fgen->self_ref == 0);

    mcp_funcgen_cleanup(L, fgen);
    fgen->closed = true;
    return 0;
}

// handler for *_wait_*() variants and sleep calls
static void mcp_funcgen_wait_handler(const int fd, const short which, void *arg) {
    mcp_rcontext_t *rctx = arg;

    // if we were in waiting: reset wait mode, push wait_done + boolean true
    // if we were in sleep: reset wait mode.
    // immediately resume.
    lua_settop(rctx->Lc, 0);
    rctx->wait_count = 0;
    rctx->lua_narg = 2;
    if (rctx->wait_mode == QWAIT_HANDLE) {
        // if timed out then we shouldn't have a result. just push nil.
        lua_pushnil(rctx->Lc);
    } else if (rctx->wait_mode == QWAIT_SLEEP) {
        // no extra arg.
        rctx->lua_narg = 1;
    } else {
        // how many results were processed
        lua_pushinteger(rctx->Lc, rctx->wait_done);
    }
    // "timed out"
    lua_pushboolean(rctx->Lc, 1);

    rctx->wait_mode = QWAIT_IDLE;

    mcp_resume_rctx_from_cb(rctx);
}

// For describing functions which generate functions which can execute
// requests.
// These "generator functions" handle pre-allocating and creating a memory
// hierarchy, allowing dynamic runtimes at high speed.

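// A minimal sketch of the Lua-side usage this machinery supports. The
// funcgen_new()/new_handle()/ready() names are inferred from the mcplib_*
// bindings below; exact Lua registration names, the pool variable, and the
// handler body are illustrative assumptions:
//
//   local fgen = mcp.funcgen_new()
//   local h = fgen:new_handle(some_pool)
//   fgen:ready({ n = "example", f = function(rctx)
//       -- runs once per slot created; returns the per-request handler.
//       return function(r)
//           return rctx:enqueue_and_wait(r, h)
//       end
//   end })
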
// must be called with fgen on top of stack in fgen->thread->L
static void mcp_rcontext_cleanup(lua_State *L, mcp_funcgen_t *fgen, mcp_rcontext_t *rctx, int fgen_idx) {
    luaL_unref(L, LUA_REGISTRYINDEX, rctx->coroutine_ref);
    luaL_unref(L, LUA_REGISTRYINDEX, rctx->function_ref);
    if (rctx->request_ref) {
        luaL_unref(L, LUA_REGISTRYINDEX, rctx->request_ref);
    }
    assert(rctx->pending_reqs == 0);

    // cleanup of request queue entries. recurse funcgen cleanup.
    for (int x = 0; x < fgen->max_queues; x++) {
        struct mcp_rqueue_s *rqu = &rctx->qslots[x];
        if (rqu->obj_type == RQUEUE_TYPE_POOL) {
            // nothing to do.
        } else if (rqu->obj_type == RQUEUE_TYPE_FGEN) {
            // don't need to recurse, just free the subrctx.
            mcp_rcontext_t *subrctx = rqu->obj;
            lua_rawgeti(L, LUA_REGISTRYINDEX, subrctx->fgen->self_ref);
            mcp_rcontext_cleanup(L, subrctx->fgen, subrctx, lua_absindex(L, -1));
            lua_pop(L, 1); // drop subrctx fgen
        } else if (rqu->obj_type != RQUEUE_TYPE_NONE) {
            assert(1 == 0);
        }

        if (rqu->res_ref) {
            luaL_unref(L, LUA_REGISTRYINDEX, rqu->res_ref);
            rqu->res_ref = 0;
        }

        if (rqu->cb_ref) {
            luaL_unref(L, LUA_REGISTRYINDEX, rqu->cb_ref);
            rqu->cb_ref = 0;
        }
    }

    // nuke alarm if set.
    // should only be paranoia here, but just in case.
    if (event_pending(&rctx->timeout_event, EV_TIMEOUT, NULL)) {
        event_del(&rctx->timeout_event);
    }

    lua_getiuservalue(L, fgen_idx, 1);
    luaL_unref(L, -1, rctx->self_ref);
    rctx->self_ref = 0;
    lua_pop(L, 1); // drop freelist table

    fgen->total--;
    LIBEVENT_THREAD *t = PROXY_GET_THR(L);
    // Fake an allocation when we free slots as they are long running data.
    // This tricks the GC into running and freeing them.
    t->proxy_vm_extra_kb += 2;
    mcp_sharedvm_delta(t->proxy_ctx, SHAREDVM_FGENSLOT_IDX, fgen->name, -1);
}

// TODO: switch from an array to a STAILQ so we can avoid the memory
// management and error handling.
// Realistically it's impossible for these to error so we're safe for now.
#ifdef MEMCACHED_DEBUG
// require fewer test rounds for unit tests.
#define FGEN_FREE_PRESSURE_MAX 100
#define FGEN_FREE_PRESSURE_DROP 10
#define FGEN_FREE_WAIT 0
#else
#define FGEN_FREE_PRESSURE_MAX 5000
#define FGEN_FREE_PRESSURE_DROP 200
#define FGEN_FREE_WAIT 60 // seconds.
#endif
static void _mcplib_funcgen_cache(mcp_funcgen_t *fgen, mcp_rcontext_t *rctx) {
    bool do_cache = true;
    // Easing algorithm to decide when to "early free" rctx slots:
    // - If we recently allocated a slot, reset pressure.
    // - Each time an rctx is freed and more than half of available rctx's are
    // free, increase pressure.
    // - If free rctx are less than half of total, reduce pressure.
    // - If pressure is too high, and has been for more than FGEN_FREE_WAIT
    // seconds, immediately free the rctx, then drop the pressure slightly.
    //
    // This should allow bursty traffic to avoid spinning on alloc/frees,
    // while one-time bursts will slowly free slots back down to a min of 1.
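    //
    // Worked example with the non-debug constants: if more than half of a
    // fgen's total slots sit free, every return past that point bumps
    // free_pressure; once it crosses FGEN_FREE_PRESSURE_MAX (5000) a
    // FGEN_FREE_WAIT (60s) window opens, and only if the surplus persists
    // past that window do returns start freeing slots, each one dropping
    // the pressure back by FGEN_FREE_PRESSURE_DROP.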
    if (fgen->free > fgen->total/2 - 1) {
        if (fgen->free_pressure++ > FGEN_FREE_PRESSURE_MAX) {
            struct timespec now;
            clock_gettime(CLOCK_REALTIME, &now);
            if (fgen->free_waiter.tv_sec == 0) {
                fgen->free_waiter.tv_sec = now.tv_sec + FGEN_FREE_WAIT;
            }

            if (now.tv_sec >= fgen->free_waiter.tv_sec) {
                do_cache = false;
            }
            // check again in a little while.
            fgen->free_pressure -= FGEN_FREE_PRESSURE_DROP;
        }
    } else {
        fgen->free_pressure >>= 1;
        // must be too-free for a full wait period before releasing.
        fgen->free_waiter.tv_sec = 0;
    }

    if (do_cache) {
        if (fgen->free + 1 >= fgen->free_max) {
            int x = fgen->free_max;
            fgen->free_max *= 2;
            fgen->list = realloc(fgen->list, fgen->free_max * sizeof(mcp_rcontext_t *));
            for (; x < fgen->free_max; x++) {
                fgen->list[x] = NULL;
            }
        }
        fgen->list[fgen->free] = rctx;
        fgen->free++;
    } else {
        // do not cache the rctx
        assert(fgen->self_ref);
        lua_State *L = fgen->thread->L;
        lua_rawgeti(L, LUA_REGISTRYINDEX, fgen->self_ref);
        mcp_rcontext_cleanup(L, fgen, rctx, lua_absindex(L, -1));
        lua_pop(L, 1); // drop fgen
    }

    // we're closed and every outstanding request slot has been
    // returned.
    if (fgen->closed && fgen->free == fgen->total) {
        mcp_funcgen_cleanup(fgen->thread->L, fgen);
    }
}

// call with stack: mcp.funcgen -2, function -1
static int _mcplib_funcgen_gencall(lua_State *L) {
    mcp_funcgen_t *fgen = luaL_checkudata(L, -2, "mcp.funcgen");
    int fgen_idx = lua_absindex(L, -2);
    // create the ctx object.
    size_t rctx_len = sizeof(mcp_rcontext_t) + sizeof(struct mcp_rqueue_s) * fgen->max_queues;
    mcp_rcontext_t *rc = lua_newuserdatauv(L, rctx_len, 0);
    memset(rc, 0, rctx_len);

    luaL_getmetatable(L, "mcp.rcontext");
    lua_setmetatable(L, -2);
    // allow the rctx to reference the function generator.
    rc->fgen = fgen;
    rc->lua_narg = 1;

    // initialize the queue slots based on the fgen parent
    for (int x = 0; x < fgen->max_queues; x++) {
        struct mcp_rqueue_s *frqu = &fgen->queue_list[x];
        struct mcp_rqueue_s *rqu = &rc->qslots[x];
        rqu->obj_type = frqu->obj_type;
        if (frqu->obj_type == RQUEUE_TYPE_POOL) {
            rqu->obj_ref = 0;
            rqu->obj = frqu->obj;
            mcp_resp_t *r = mcp_prep_bare_resobj(L, fgen->thread);
            rqu->res_ref = luaL_ref(L, LUA_REGISTRYINDEX);
            rqu->res_obj = r;
        } else if (frqu->obj_type == RQUEUE_TYPE_FGEN) {
            // owner funcgen already holds the subfgen reference, so here we're just
            // grabbing a subrctx to pin into the slot.
            mcp_funcgen_t *fg = frqu->obj;
            mcp_rcontext_t *subrctx = mcp_funcgen_get_rctx(L, fg->self_ref, fg);
            if (subrctx == NULL) {
                proxy_lua_error(L, "failed to generate request slot during queue_assign()");
            }

            // if this rctx ever had a request object assigned to it, we can get
            // rid of it. we're pinning the subrctx in here and don't want
            // to waste memory.
            if (subrctx->request_ref) {
                luaL_unref(L, LUA_REGISTRYINDEX, subrctx->request_ref);
                subrctx->request_ref = 0;
                subrctx->request = NULL;
            }

            // link the new rctx into this chain; we'll hold onto it until the
            // parent de-allocates.
            subrctx->parent = rc;
            subrctx->parent_handle = x;
            rqu->obj = subrctx;
        }
    }

    // copy the rcontext reference
    lua_pushvalue(L, -1);

    // issue a rotation so one rcontext is now below genfunc, and one rcontext
    // is on the top.
    // right shift: gf, rc1, rc2 -> rc2, gf, rc1
    lua_rotate(L, -3, 1);

    // current stack should be func, mcp.rcontext.
    int call_argnum = 1;
    // stack will be func, rctx, arg if there is an arg.
    if (fgen->argument_ref) {
        lua_rawgeti(L, LUA_REGISTRYINDEX, fgen->argument_ref);
        call_argnum++;
    }

    // can throw an error upstream.
    lua_call(L, call_argnum, 1);

    // we should have a top level function as a result.
    if (!lua_isfunction(L, -1)) {
        proxy_lua_error(L, "function generator didn't return a function");
        return 0;
    }
    // can't fail past this point.

    // pop the returned function.
    rc->function_ref = luaL_ref(L, LUA_REGISTRYINDEX);

    // link the rcontext into the function generator.
    fgen->total++;

    lua_getiuservalue(L, fgen_idx, 1); // get the reference table.
    // rc, t -> t, rc
    lua_rotate(L, -2, 1);
    rc->self_ref = luaL_ref(L, -2); // pop rcontext
    lua_pop(L, 1); // pop ref table.

    _mcplib_funcgen_cache(fgen, rc);

    // associate a coroutine thread with this context.
    rc->Lc = lua_newthread(L);
    assert(rc->Lc);
    rc->coroutine_ref = luaL_ref(L, LUA_REGISTRYINDEX);

    // increment the slot counter
    LIBEVENT_THREAD *t = PROXY_GET_THR(L);
    mcp_sharedvm_delta(t->proxy_ctx, SHAREDVM_FGENSLOT_IDX, fgen->name, 1);

    event_assign(&rc->timeout_event, t->base, -1, EV_TIMEOUT, mcp_funcgen_wait_handler, rc);

    // return the fgen.
    // FIXME: just return 0? need to adjust caller to not mis-ref the
    // generator function.
    return 1;
}

static void _mcp_funcgen_return_rctx(mcp_rcontext_t *rctx) {
    mcp_funcgen_t *fgen = rctx->fgen;
    assert(rctx->pending_reqs == 0);
    int res = lua_resetthread(rctx->Lc);
    if (res != LUA_OK) {
        // TODO: I was under the impression it was possible to reuse a
        // coroutine from an error state, but it seems like this only works if
        // the routine landed in LUA_YIELD or LUA_OK
        // Leaving a note here to triple check this or if my memory was wrong.
        // Instead for now we throw away the coroutine if it was involved in
        // an error. Realistically this shouldn't be normal so it might not
        // matter anyway.
        lua_State *L = fgen->thread->L;
        luaL_unref(L, LUA_REGISTRYINDEX, rctx->coroutine_ref);
        rctx->Lc = lua_newthread(L);
        assert(rctx->Lc);
        rctx->coroutine_ref = luaL_ref(L, LUA_REGISTRYINDEX);
    } else {
        lua_settop(rctx->Lc, 0);
    }
    rctx->wait_mode = QWAIT_IDLE;
    rctx->resp = NULL;
    rctx->first_queue = false; // HACK
    if (rctx->request) {
        mcp_request_cleanup(fgen->thread, rctx->request);
    }

    // nuke alarm if set.
    if (event_pending(&rctx->timeout_event, EV_TIMEOUT, NULL)) {
        event_del(&rctx->timeout_event);
    }

    // reset each rqu.
    for (int x = 0; x < fgen->max_queues; x++) {
        struct mcp_rqueue_s *rqu = &rctx->qslots[x];
        if (rqu->res_ref) {
            if (rqu->res_obj) {
                // using a persistent object.
                mcp_response_cleanup(fgen->thread, rqu->res_obj);
            } else {
                // temporary error object
                luaL_unref(rctx->Lc, LUA_REGISTRYINDEX, rqu->res_ref);
                rqu->res_ref = 0;
            }
        }
        if (rqu->req_ref) {
            luaL_unref(rctx->Lc, LUA_REGISTRYINDEX, rqu->req_ref);
            rqu->req_ref = 0;
        }
        assert(rqu->state != RQUEUE_ACTIVE);
        rqu->state = RQUEUE_IDLE;
        rqu->flags = 0;
        rqu->rq = NULL;
        if (rqu->obj_type == RQUEUE_TYPE_FGEN) {
            _mcp_funcgen_return_rctx(rqu->obj);
        }
    }
}

// TODO: check rctx->awaiting before returning?
// TODO: separate the "cleanup" portion from the "Return to cache" portion, so
// we can call that directly for subrctx's
void mcp_funcgen_return_rctx(mcp_rcontext_t *rctx) {
    mcp_funcgen_t *fgen = rctx->fgen;
    if (rctx->pending_reqs != 0) {
        // not ready to return to cache yet.
        return;
    }
    if (rctx->parent) {
        // Important: we need to hold the parent request reference until this
        // subrctx is fully depleted of outstanding requests itself.
        rctx->parent->pending_reqs--;
        assert(rctx->parent->pending_reqs > -1);
        if (rctx->parent->pending_reqs == 0) {
            mcp_funcgen_return_rctx(rctx->parent);
        }
        return;
    }
    WSTAT_DECR(rctx->fgen->thread, proxy_req_active, 1);
    _mcp_funcgen_return_rctx(rctx);
    _mcplib_funcgen_cache(fgen, rctx);
}

mcp_rcontext_t *mcp_funcgen_get_rctx(lua_State *L, int fgen_ref, mcp_funcgen_t *fgen) {
    mcp_rcontext_t *rctx = NULL;
    // nothing left in slot cache, generate a new function.
    if (fgen->free == 0) {
        // reset free pressure so we try to keep the rctx cached
        fgen->free_pressure = 0;
        fgen->free_waiter.tv_sec = 0;
        // TODO (perf): pre-create this c closure somewhere hidden.
        lua_pushcclosure(L, _mcplib_funcgen_gencall, 0);
        // pull in the funcgen object
        lua_rawgeti(L, LUA_REGISTRYINDEX, fgen_ref);
        // then generator function
        lua_rawgeti(L, LUA_REGISTRYINDEX, fgen->generator_ref);
        // then generate a new function slot.
        int res = lua_pcall(L, 2, 1, 0);
        if (res != LUA_OK) {
            LOGGER_LOG(NULL, LOG_PROXYEVENTS, LOGGER_PROXY_ERROR, NULL, lua_tostring(L, -1));
            lua_settop(L, 0);
            return NULL;
        }
        lua_pop(L, 1); // drop the extra funcgen
    } else {
        P_DEBUG("%s: serving from cache\n", __func__);
    }

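    // take the most recently cached rctx from the tail of the freelist.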
    rctx = fgen->list[fgen->free-1];
    fgen->list[fgen->free-1] = NULL;
    fgen->free--;

    // on non-error, return the request context upward.
    return rctx;
}

mcp_rcontext_t *mcp_funcgen_start(lua_State *L, mcp_funcgen_t *fgen, mcp_parser_t *pr) {
    if (fgen->is_router) {
        fgen = mcp_funcgen_route(L, fgen, pr);
        if (fgen == NULL) {
            return NULL;
        }
    }
    // fgen->self_ref must be valid because we cannot start a function that
    // hasn't been referenced anywhere.
    mcp_rcontext_t *rctx = mcp_funcgen_get_rctx(L, fgen->self_ref, fgen);

    if (rctx == NULL) {
        return NULL;
    }

    // only top level rctx's can have a request object assigned to them.
    // so we create them late here, in the start function.
    // Note that we can _technically_ fail with an OOM here, but we've not set
    // up lua in a way that OOM's are possible.
    if (rctx->request_ref == 0) {
        mcp_request_t *rq = lua_newuserdatauv(L, sizeof(mcp_request_t) + MCP_REQUEST_MAXLEN + KEY_MAX_LENGTH, 0);
        memset(rq, 0, sizeof(mcp_request_t));
        luaL_getmetatable(L, "mcp.request");
        lua_setmetatable(L, -2);

        rctx->request_ref = luaL_ref(L, LUA_REGISTRYINDEX); // pop the request
        rctx->request = rq;
    }

    // TODO: could probably move a few more lines from proto_proxy into here,
    // but that's splitting hairs.
    WSTAT_INCR(fgen->thread, proxy_req_active, 1);
    return rctx;
}

// calling either with self_ref set, or with fgen in stack -1 (i.e. from the GC
// function without ever being attached to anything)
static void mcp_funcgen_cleanup(lua_State *L, mcp_funcgen_t *fgen) {
    int fgen_idx = 0;
    lua_checkstack(L, 5); // paranoia. this can recurse from a router.
    // pull the fgen into the stack.
    if (fgen->self_ref) {
        // pull self onto the stack and hold until the end of the func.
        lua_rawgeti(L, LUA_REGISTRYINDEX, fgen->self_ref);
        fgen_idx = lua_absindex(L, -1); // remember fgen offset
        // remove the C reference to the fgen
        luaL_unref(L, LUA_REGISTRYINDEX, fgen->self_ref);
        fgen->self_ref = 0;
    } else if (fgen->closed) {
        // we've already cleaned up, probably redundant call from _gc()
        return;
    } else {
        // not closed, no self-ref, so must be unattached and coming from GC
        fgen_idx = lua_absindex(L, -1);
    }

    if (fgen->is_router) {
        // we're actually a "router", send this out for cleanup.
        mcp_funcgen_router_cleanup(L, fgen);
    }

    // decrement the slot counter
    LIBEVENT_THREAD *t = PROXY_GET_THR(L);
    mcp_sharedvm_delta(t->proxy_ctx, SHAREDVM_FGEN_IDX, fgen->name, -1);

    // Walk every request context and issue cleanup.
    for (int x = 0; x < fgen->free_max; x++) {
        mcp_rcontext_t *rctx = fgen->list[x];
        if (rctx == NULL) {
            continue;
        }
        mcp_rcontext_cleanup(L, fgen, rctx, fgen_idx);
    }

    if (fgen->argument_ref) {
        luaL_unref(L, LUA_REGISTRYINDEX, fgen->argument_ref);
        fgen->argument_ref = 0;
    }

    if (fgen->generator_ref) {
        luaL_unref(L, LUA_REGISTRYINDEX, fgen->generator_ref);
        fgen->generator_ref = 0;
    }

    if (fgen->queue_list) {
        for (int x = 0; x < fgen->max_queues; x++) {
            struct mcp_rqueue_s *rqu = &fgen->queue_list[x];
            if (rqu->obj_type == RQUEUE_TYPE_POOL) {
                // just the obj_ref
                luaL_unref(L, LUA_REGISTRYINDEX, rqu->obj_ref);
            } else if (rqu->obj_type == RQUEUE_TYPE_FGEN) {
                // don't need to recurse, just deref.
                mcp_funcgen_t *subfgen = rqu->obj;
                mcp_funcgen_dereference(L, subfgen);
            } else if (rqu->obj_type != RQUEUE_TYPE_NONE) {
                assert(1 == 0);
            }
        }
        free(fgen->queue_list);
    }

    free(fgen->list);
    fgen->list = NULL;
    lua_pop(L, 1); // drop funcgen reference
}

// Must be called with the function generator on top of the stack.
// Pops the value from the stack.
void mcp_funcgen_reference(lua_State *L) {
    mcp_funcgen_t *fgen = luaL_checkudata(L, -1, "mcp.funcgen");
    if (fgen->self_ref) {
        fgen->refcount++;
        lua_pop(L, 1); // ensure we drop the extra value.
    } else {
        fgen->self_ref = luaL_ref(L, LUA_REGISTRYINDEX);
        fgen->refcount = 1;
    }
    P_DEBUG("%s: funcgen referenced: %d\n", __func__, fgen->refcount);
}

void mcp_funcgen_dereference(lua_State *L, mcp_funcgen_t *fgen) {
    assert(fgen->refcount > 0);
    fgen->refcount--;
    P_DEBUG("%s: funcgen dereferenced: %d\n", __func__, fgen->refcount);
    if (fgen->refcount == 0) {
        fgen->closed = true;

        P_DEBUG("%s: funcgen cleaning up\n", __func__);
        if (fgen->free == fgen->total) {
            mcp_funcgen_cleanup(L, fgen);
        }
    }
}

// All we need to do here is copy the function reference we've stashed into
// the C closure's upvalue and return it.
static int _mcplib_funcgenbare_generator(lua_State *L) {
    lua_pushvalue(L, lua_upvalueindex(1));
    return 1;
}

// helper function to create a function generator with a "default" function.
// the function passed in here is a standard 'function(r) etc end' prototype,
// which we want to always return instead of calling a real generator
// function.
int mcplib_funcgenbare_new(lua_State *L) {
    if (!lua_isfunction(L, -1)) {
        proxy_lua_error(L, "Must pass a function to mcp.funcgenbare_new");
        return 0;
    }

    // Pops the function into the upvalue of this C closure function.
    lua_pushcclosure(L, _mcplib_funcgenbare_generator, 1);
    // FIXME: not urgent, but this function chain isn't stack balanced, and its caller has
    // to drop an extra reference.
    // Need to re-audit and decide if we still need this pushvalue here or if
    // we can drop the pop from the caller and leave this function balanced.
    lua_pushvalue(L, -1);
    int gen_ref = luaL_ref(L, LUA_REGISTRYINDEX);

    // Pass our fakeish generator function down the line.
    mcplib_funcgen_new(L);

    mcp_funcgen_t *fgen = lua_touserdata(L, -1);
    strncpy(fgen->name, "anonymous", FGEN_NAME_MAXLEN);
    mcp_sharedvm_delta(fgen->thread->proxy_ctx, SHAREDVM_FGEN_IDX, fgen->name, 1);

    fgen->generator_ref = gen_ref;
    fgen->ready = true;
    return 1;
}
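
// A minimal Lua-side sketch of the above. The mcp.funcgenbare_new name comes
// from the error string; returning a raw protocol string is one of the
// handler return types handled by the sub-request path (see
// mcp_start_subrctx below):
//
//   local fgen = mcp.funcgenbare_new(function(r)
//       return "SERVER_ERROR not implemented\r\n"
//   end)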

#define FGEN_DEFAULT_FREELIST_SIZE 8
int mcplib_funcgen_new(lua_State *L) {
    LIBEVENT_THREAD *t = PROXY_GET_THR(L);

    mcp_funcgen_t *fgen = lua_newuserdatauv(L, sizeof(mcp_funcgen_t), 2);
    memset(fgen, 0, sizeof(mcp_funcgen_t));
    fgen->thread = t;
    fgen->free_max = FGEN_DEFAULT_FREELIST_SIZE;
    fgen->list = calloc(fgen->free_max, sizeof(mcp_rcontext_t *));

    luaL_getmetatable(L, "mcp.funcgen");
    lua_setmetatable(L, -2);

    // the table we will use to hold references to rctx's
    lua_createtable(L, 8, 0);
    // set our table into the uservalue 1 of fgen (idx -2)
    // pops the table.
    lua_setiuservalue(L, -2, 1);

    return 1;
}

int mcplib_funcgen_new_handle(lua_State *L) {
    mcp_funcgen_t *fgen = lua_touserdata(L, 1);
    mcp_pool_proxy_t *pp = NULL;
    mcp_funcgen_t *fg = NULL;

    if (fgen->ready) {
        proxy_lua_error(L, "cannot modify function generator after calling ready");
        return 0;
    }

    if ((pp = luaL_testudata(L, 2, "mcp.pool_proxy")) != NULL) {
        // good.
    } else if ((fg = luaL_testudata(L, 2, "mcp.funcgen")) != NULL) {
        if (fg->is_router) {
            proxy_lua_error(L, "cannot assign a router to a handle in new_handle");
            return 0;
        }
        if (fg->closed) {
            proxy_lua_error(L, "cannot use a replaced function in new_handle");
            return 0;
        }
    } else {
        proxy_lua_error(L, "invalid argument to new_handle");
        return 0;
    }

    fgen->max_queues++;
    if (fgen->queue_list == NULL) {
        fgen->queue_list = malloc(sizeof(struct mcp_rqueue_s));
    } else {
        fgen->queue_list = realloc(fgen->queue_list, fgen->max_queues * sizeof(struct mcp_rqueue_s));
    }
    if (fgen->queue_list == NULL) {
        proxy_lua_error(L, "failed to realloc queue list during new_handle()");
        return 0;
    }

    struct mcp_rqueue_s *rqu = &fgen->queue_list[fgen->max_queues-1];
    memset(rqu, 0, sizeof(*rqu));

    if (pp) {
        // pops pp from the stack
        rqu->obj_ref = luaL_ref(L, LUA_REGISTRYINDEX);
        rqu->obj_type = RQUEUE_TYPE_POOL;
        rqu->obj = pp;
    } else {
        // pops the fgen from the stack.
        mcp_funcgen_reference(L);
        rqu->obj_type = RQUEUE_TYPE_FGEN;
        rqu->obj = fg;
    }

    lua_pushinteger(L, fgen->max_queues-1);
    return 1;
}

int mcplib_funcgen_ready(lua_State *L) {
    mcp_funcgen_t *fgen = lua_touserdata(L, 1);
    luaL_checktype(L, 2, LUA_TTABLE);

    if (fgen->ready) {
        proxy_lua_error(L, "cannot modify function generator after calling ready");
        return 0;
    }

    if (lua_getfield(L, 2, "f") != LUA_TFUNCTION) {
        proxy_lua_error(L, "Must specify generator function ('f') to fgen:ready");
        return 0;
    }
    fgen->generator_ref = luaL_ref(L, LUA_REGISTRYINDEX);

    if (lua_getfield(L, 2, "a") != LUA_TNIL) {
        fgen->argument_ref = luaL_ref(L, LUA_REGISTRYINDEX);
    } else {
        lua_pop(L, 1);
    }

    if (lua_getfield(L, 2, "n") == LUA_TSTRING) {
        size_t len = 0;
        const char *name = lua_tolstring(L, -1, &len);
        strncpy(fgen->name, name, FGEN_NAME_MAXLEN);
    } else {
        strncpy(fgen->name, "anonymous", FGEN_NAME_MAXLEN);
        lua_pop(L, 1);
    }

    // now we test the generator function and create the first slot.
    lua_pushvalue(L, 1); // copy the funcgen to pass into gencall
    lua_rawgeti(L, LUA_REGISTRYINDEX, fgen->generator_ref); // for gencall
    _mcplib_funcgen_gencall(L);
    lua_pop(L, 1); // drop extra funcgen ref.

    // add us to the global state
    mcp_sharedvm_delta(fgen->thread->proxy_ctx, SHAREDVM_FGEN_IDX, fgen->name, 1);

    fgen->ready = true;
    return 1;
}

// Handlers for request contexts

int mcplib_rcontext_handle_set_cb(lua_State *L) {
    mcp_rcontext_t *rctx = lua_touserdata(L, 1);
    luaL_checktype(L, 2, LUA_TNUMBER);
    luaL_checktype(L, 3, LUA_TFUNCTION);

    int handle = lua_tointeger(L, 2);
    if (handle < 0 || handle >= rctx->fgen->max_queues) {
        proxy_lua_error(L, "invalid handle passed to queue_set_cb");
        return 0;
    }

    struct mcp_rqueue_s *rqu = &rctx->qslots[handle];
    if (rqu->cb_ref) {
        luaL_unref(L, LUA_REGISTRYINDEX, rqu->cb_ref);
    }
    rqu->cb_ref = luaL_ref(L, LUA_REGISTRYINDEX);

    return 0;
}

// call with request object on top of stack.
// pops the request object
// FIXME: callers are doing a pushvalue(L, 2) and then in here we're also
// doing pushvalue(L, 2).
// This should probably just be documented as requiring the request object on
// top of the stack, and use xmove without the extra push bits.
static void _mcplib_rcontext_queue(lua_State *L, mcp_rcontext_t *rctx, mcp_request_t *rq, int handle) {
    if (handle < 0 || handle >= rctx->fgen->max_queues) {
        proxy_lua_error(L, "attempted to enqueue an invalid handle");
        return;
    }
    struct mcp_rqueue_s *rqu = &rctx->qslots[handle];

    if (rqu->state != RQUEUE_IDLE) {
        lua_pop(L, 1);
        return;
    }

    // If we're queueing to an fgen, arm the coroutine while we have the
    // objects handy. Else this requires roundtripping a luaL_ref/luaL_unref
    // later.
    if (rqu->obj_type == RQUEUE_TYPE_FGEN) {
        mcp_rcontext_t *subrctx = rqu->obj;
        lua_pushvalue(L, 2); // duplicate the request obj
        lua_rawgeti(subrctx->Lc, LUA_REGISTRYINDEX, subrctx->function_ref);
        lua_xmove(L, subrctx->Lc, 1); // move the request object.
        subrctx->pending_reqs++;
    }

    // hold the request reference.
    rqu->req_ref = luaL_ref(L, LUA_REGISTRYINDEX);

    rqu->state = RQUEUE_QUEUED;
    rqu->rq = rq;
}

// first arg is rcontext
// then a request object
// then either a handle (integer) or array style table of handles
int mcplib_rcontext_enqueue(lua_State *L) {
    mcp_rcontext_t *rctx = lua_touserdata(L, 1);
    mcp_request_t *rq = luaL_checkudata(L, 2, "mcp.request");

    if (rctx->wait_mode != QWAIT_IDLE) {
        proxy_lua_error(L, "enqueue: cannot enqueue new requests while in a wait");
        return 0;
    }

    if (!rq->pr.keytoken) {
        proxy_lua_error(L, "cannot queue requests without a key");
        return 0;
    }

    int type = lua_type(L, 3);
    if (type == LUA_TNUMBER) {
        int handle = lua_tointeger(L, 3);

        lua_pushvalue(L, 2);
        _mcplib_rcontext_queue(L, rctx, rq, handle);
    } else if (type == LUA_TTABLE) {
        unsigned int len = lua_rawlen(L, 3);
        for (unsigned int x = 0; x < len; x++) {
            type = lua_rawgeti(L, 3, x+1);
            if (type != LUA_TNUMBER) {
                proxy_lua_error(L, "invalid handle passed to queue via array table");
                return 0;
            }

            int handle = lua_tointeger(L, 4);
            lua_pop(L, 1);

            lua_pushvalue(L, 2);
            _mcplib_rcontext_queue(L, rctx, rq, handle);
        }
    } else {
        proxy_lua_error(L, "must pass a handle or a table to queue");
        return 0;
    }

    return 0;
}
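
// Lua-side sketch of the two enqueue forms accepted above (handle values
// come from fgen:new_handle(); variable names are illustrative):
//
//   rctx:enqueue(r, h1)          -- queue to a single handle
//   rctx:enqueue(r, { h1, h2 })  -- fan the same request out to several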

// TODO: pre-generate a result object into sub-rctx's that we can pull up for
// this, instead of allocating outside of a protected call.
static void _mcp_resume_rctx_process_error(mcp_rcontext_t *rctx, struct mcp_rqueue_s *rqu) {
    // we have an error. need to mark the error into the parent rqu
    rqu->flags |= RQUEUE_R_ERROR|RQUEUE_R_ANY;
    mcp_resp_t *r = mcp_prep_bare_resobj(rctx->Lc, rctx->fgen->thread);
    r->status = MCMC_ERR;
    r->resp.code = MCMC_CODE_SERVER_ERROR;
    assert(rqu->res_ref == 0);
    rqu->res_ref = luaL_ref(rctx->Lc, LUA_REGISTRYINDEX);
    mcp_process_rqueue_return(rctx->parent, rctx->parent_handle, r);
    if (rctx->parent->wait_count) {
        mcp_process_rctx_wait(rctx->parent, rctx->parent_handle);
    }
}

static void _mcp_start_rctx_process_error(mcp_rcontext_t *rctx, struct mcp_rqueue_s *rqu) {
    // we have an error. need to mark the error into the parent rqu
    rqu->flags |= RQUEUE_R_ERROR|RQUEUE_R_ANY;
    mcp_resp_t *r = mcp_prep_bare_resobj(rctx->Lc, rctx->fgen->thread);
    r->status = MCMC_ERR;
    r->resp.code = MCMC_CODE_SERVER_ERROR;
    assert(rqu->res_ref == 0);
    rqu->res_ref = luaL_ref(rctx->Lc, LUA_REGISTRYINDEX);

    // queue an IO to return later.
    io_pending_proxy_t *p = mcp_queue_rctx_io(rctx->parent, NULL, NULL, r);
    p->return_cb = proxy_return_rqu_cb;
    p->queue_handle = rctx->parent_handle;
    p->background = true;
}

static void mcp_start_subrctx(mcp_rcontext_t *rctx) {
    int res = proxy_run_rcontext(rctx);
    struct mcp_rqueue_s *rqu = &rctx->parent->qslots[rctx->parent_handle];
    if (res == LUA_OK) {
        int type = lua_type(rctx->Lc, 1);
        mcp_resp_t *r = NULL;
        if (type == LUA_TUSERDATA && (r = luaL_testudata(rctx->Lc, 1, "mcp.response")) != NULL) {
            // move stack result object into parent rctx rqu slot.
            assert(rqu->res_ref == 0);
            rqu->res_ref = luaL_ref(rctx->Lc, LUA_REGISTRYINDEX);

            io_pending_proxy_t *p = mcp_queue_rctx_io(rctx->parent, NULL, NULL, r);
            p->return_cb = proxy_return_rqu_cb;
            p->queue_handle = rctx->parent_handle;
            // TODO: change name of property to fast-return once mcp.await is
            // retired.
            p->background = true;
        } else if (type == LUA_TSTRING) {
            // TODO: wrap with a resobj and parse it.
            // for now we bypass the rqueue process handling
            // meaning no callbacks/etc.
            assert(rqu->res_ref == 0);
            rqu->res_ref = luaL_ref(rctx->Lc, LUA_REGISTRYINDEX);
            rqu->flags |= RQUEUE_R_ANY;
            rqu->state = RQUEUE_COMPLETE;
            io_pending_proxy_t *p = mcp_queue_rctx_io(rctx->parent, NULL, NULL, NULL);
            p->return_cb = proxy_return_rqu_cb;
            p->queue_handle = rctx->parent_handle;
            p->background = true;
        } else {
            // generate a generic object with an error.
            _mcp_start_rctx_process_error(rctx, rqu);
        }
    } else if (res == LUA_YIELD) {
        // normal.
    } else {
        lua_pop(rctx->Lc, 1); // drop the error message.
        _mcp_start_rctx_process_error(rctx, rqu);
    }
}

static void mcp_resume_rctx_from_cb(mcp_rcontext_t *rctx) {
    int res = proxy_run_rcontext(rctx);
    if (rctx->parent) {
        struct mcp_rqueue_s *rqu = &rctx->parent->qslots[rctx->parent_handle];
        if (res == LUA_OK) {
            int type = lua_type(rctx->Lc, 1);
            mcp_resp_t *r = NULL;
            if (type == LUA_TUSERDATA && (r = luaL_testudata(rctx->Lc, 1, "mcp.response")) != NULL) {
                // move stack result object into parent rctx rqu slot.
                assert(rqu->res_ref == 0);
                rqu->res_ref = luaL_ref(rctx->Lc, LUA_REGISTRYINDEX);
                mcp_process_rqueue_return(rctx->parent, rctx->parent_handle, r);
            } else if (type == LUA_TSTRING) {
                // TODO: wrap with a resobj and parse it.
                // for now we bypass the rqueue process handling
                // meaning no callbacks/etc.
                assert(rqu->res_ref == 0);
                rqu->res_ref = luaL_ref(rctx->Lc, LUA_REGISTRYINDEX);
                rqu->flags |= RQUEUE_R_ANY;
                rqu->state = RQUEUE_COMPLETE;
            } else {
                // generate a generic object with an error.
                _mcp_resume_rctx_process_error(rctx, rqu);
            }
            if (rctx->parent->wait_count) {
                mcp_process_rctx_wait(rctx->parent, rctx->parent_handle);
            }
            mcp_funcgen_return_rctx(rctx);
        } else if (res == LUA_YIELD) {
            // normal.
            _mcp_queue_hack(rctx->c);
        } else {
            lua_pop(rctx->Lc, 1); // drop the error message.
            _mcp_resume_rctx_process_error(rctx, rqu);
            mcp_funcgen_return_rctx(rctx);
        }
    } else {
        // Parent rctx has returned either a response or error to its top
        // level resp object and is now complete.
        // HACK
        // see notes in proxy_run_rcontext()
        // NOTE: this function is called from rqu_cb(), which pushes the
        // submission loop. This code below can call drive_machine(), which
        // calls submission loop if stuff is queued.
        // Would remove redundancy if we can signal up to rqu_cb() and either
        // q->count-- or do the inline submission at that level.
        if (res != LUA_YIELD) {
            mcp_funcgen_return_rctx(rctx);
            io_queue_t *q = conn_io_queue_get(rctx->c, IO_QUEUE_PROXY);
            q->count--;
            if (q->count == 0) {
                // call re-add directly since we're already in the worker thread.
                conn_worker_readd(rctx->c);
            }
        } else {
            _mcp_queue_hack(rctx->c);
        }
    }
}

// This "Dummy" IO immediately resumes the yielded function, without a result
// attached.
static void proxy_return_rqu_dummy_cb(io_pending_t *pending) {
    io_pending_proxy_t *p = (io_pending_proxy_t *)pending;
    mcp_rcontext_t *rctx = p->rctx;

    rctx->pending_reqs--;
    assert(rctx->pending_reqs > -1);

    lua_settop(rctx->Lc, 0);
    lua_pushinteger(rctx->Lc, 0); // return a "0" done count to the function.
    mcp_resume_rctx_from_cb(rctx);

    do_cache_free(p->thread->io_cache, p);
}

void mcp_process_rctx_wait(mcp_rcontext_t *rctx, int handle) {
    struct mcp_rqueue_s *rqu = &rctx->qslots[handle];
    int status = rqu->flags;
    assert(rqu->state == RQUEUE_COMPLETE);
    // waiting for some IO's to complete before continuing.
    // meaning if we "match good" here, we can resume.
    // we can also resume if we are in wait mode but pending_reqs is down
    // to 1.
    switch (rctx->wait_mode) {
        case QWAIT_IDLE:
            // should be impossible to get here.
            // TODO: find a better path for throwing real errors from these
            // side cases. would feel better long term.
            abort();
            break;
        case QWAIT_GOOD:
            if (status & RQUEUE_R_GOOD) {
                rctx->wait_done++;
                rqu->state = RQUEUE_WAITED;
            }
            break;
        case QWAIT_OK:
            if (status & (RQUEUE_R_GOOD|RQUEUE_R_OK)) {
                rctx->wait_done++;
                rqu->state = RQUEUE_WAITED;
            }
            break;
        case QWAIT_ANY:
            rctx->wait_done++;
            rqu->state = RQUEUE_WAITED;
            break;
        case QWAIT_FASTGOOD:
            if (status & RQUEUE_R_GOOD) {
                rctx->wait_done++;
                rqu->state = RQUEUE_WAITED;
                // resume early if "good"
                status |= RQUEUE_R_RESUME;
            } else if (status & RQUEUE_R_OK) {
                // count but don't resume early if "ok"
                rctx->wait_done++;
                rqu->state = RQUEUE_WAITED;
            }
            break;
        case QWAIT_HANDLE:
            // waiting for a specific handle to return
            if (handle == rctx->wait_handle) {
                rctx->wait_done++;
                rqu->state = RQUEUE_WAITED;
            }
            break;
        case QWAIT_SLEEP:
            assert(1 == 0); // should not get here.
            break;
    }

    assert(rctx->pending_reqs != 0);
    if ((status & RQUEUE_R_RESUME) || rctx->wait_done == rctx->wait_count || rctx->pending_reqs == 1) {
        // ran out of stuff to wait for. time to resume.
        // TODO: can we do the settop at the yield? nothing we need to
        // keep in the stack in this mode.
        lua_settop(rctx->Lc, 0);
        rctx->wait_count = 0;
        if (rctx->wait_mode == QWAIT_HANDLE) {
            mcp_rcontext_push_rqu_res(rctx->Lc, rctx, handle);
        } else {
            lua_pushinteger(rctx->Lc, rctx->wait_done);
        }
        rctx->wait_mode = QWAIT_IDLE;

        // nuke alarm if set.
        if (event_pending(&rctx->timeout_event, EV_TIMEOUT, NULL)) {
            event_del(&rctx->timeout_event);
        }

        mcp_resume_rctx_from_cb(rctx);
    }
}

// sets the slot's return status code, to be used for filtering responses
// later.
// if a callback was set, execute it now.
int mcp_process_rqueue_return(mcp_rcontext_t *rctx, int handle, mcp_resp_t *res) {
    struct mcp_rqueue_s *rqu = &rctx->qslots[handle];
    uint8_t flag = RQUEUE_R_ANY;

    assert(rqu->state == RQUEUE_ACTIVE);
    rqu->state = RQUEUE_COMPLETE;
    if (res->status == MCMC_OK) {
        if (res->resp.code != MCMC_CODE_END) {
            flag = RQUEUE_R_GOOD;
        } else {
            flag = RQUEUE_R_OK;
        }
    }

    if (rqu->cb_ref) {
        lua_settop(rctx->Lc, 0);
        lua_rawgeti(rctx->Lc, LUA_REGISTRYINDEX, rqu->cb_ref);
        lua_rawgeti(rctx->Lc, LUA_REGISTRYINDEX, rqu->res_ref);
        lua_rawgeti(rctx->Lc, LUA_REGISTRYINDEX, rqu->req_ref);
        if (lua_pcall(rctx->Lc, 2, 2, 0) != LUA_OK) {
            LOGGER_LOG(NULL, LOG_PROXYEVENTS, LOGGER_PROXY_ERROR, NULL, lua_tostring(rctx->Lc, -1));
        } else if (lua_isinteger(rctx->Lc, 1)) {
            // allow overriding the result flag from the callback.
            enum mcp_rqueue_e mode = lua_tointeger(rctx->Lc, 1);
            switch (mode) {
                case QWAIT_GOOD:
                    flag = RQUEUE_R_GOOD;
                    break;
                case QWAIT_OK:
                    flag = RQUEUE_R_OK;
                    break;
                case QWAIT_ANY:
                    break;
                default:
                    // ANY
                    break;
            }

            // if second result return shortcut status code
            if (lua_toboolean(rctx->Lc, 2)) {
                flag |= RQUEUE_R_RESUME;
            }
        }
        lua_settop(rctx->Lc, 0); // FIXME: This might not be necessary.
                                 // we settop _before_ calling cb's and
                                 // _before_ setting up for a coro resume.
    }
    rqu->flags |= flag;
    return rqu->flags;
}
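
// Lua-side sketch of a slot callback honoring the contract above: it
// receives (res, req) and may return a status override plus an early-resume
// boolean. The registration name, the mcp.WAIT_* constants, and res:hit()
// are assumptions:
//
//   rctx:handle_set_cb(h, function(res, req)
//       if res:hit() then
//           return mcp.WAIT_GOOD, true -- treat as good and resume early
//       end
//       return mcp.WAIT_ANY
//   end)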

// specific function for queue-based returns.
static void proxy_return_rqu_cb(io_pending_t *pending) {
    io_pending_proxy_t *p = (io_pending_proxy_t *)pending;
    mcp_rcontext_t *rctx = p->rctx;

    if (p->client_resp) {
        mcp_process_rqueue_return(rctx, p->queue_handle, p->client_resp);
    }
    rctx->pending_reqs--;
    assert(rctx->pending_reqs > -1);

    if (rctx->wait_count) {
        mcp_process_rctx_wait(rctx, p->queue_handle);
    } else {
        mcp_funcgen_return_rctx(rctx);
    }

    do_cache_free(p->thread->io_cache, p);
}

void mcp_run_rcontext_handle(mcp_rcontext_t *rctx, int handle) {
    struct mcp_rqueue_s *rqu = NULL;
    rqu = &rctx->qslots[handle];

    if (rqu->state == RQUEUE_QUEUED) {
        rqu->state = RQUEUE_ACTIVE;
        if (rqu->obj_type == RQUEUE_TYPE_POOL) {
            mcp_request_t *rq = rqu->rq;
            mcp_backend_t *be = mcplib_pool_proxy_call_helper(rqu->obj, MCP_PARSER_KEY(rq->pr), rq->pr.klen);
            // FIXME: queue requires conn because we're stacking objects
            // into the connection for later submission, which means we
            // absolutely cannot queue things once *c becomes invalid.
            // need to assert/block this from happening.
            mcp_set_resobj(rqu->res_obj, rq, be, rctx->fgen->thread);
            io_pending_proxy_t *p = mcp_queue_rctx_io(rctx, rq, be, rqu->res_obj);
            p->return_cb = proxy_return_rqu_cb;
            p->queue_handle = handle;
            rctx->pending_reqs++;
        } else if (rqu->obj_type == RQUEUE_TYPE_FGEN) {
            // TODO: NULL the ->c post-return?
            mcp_rcontext_t *subrctx = rqu->obj;
            subrctx->c = rctx->c;
            rctx->pending_reqs++;
            mcp_start_subrctx(subrctx);
        } else {
            assert(1==0);
        }
    } else if (rqu->state == RQUEUE_COMPLETE && rctx->wait_count) {
        // The slot was previously completed from an earlier dispatch, but we
        // haven't "waited" on it yet.
        mcp_process_rctx_wait(rctx, handle);
    }
}

static inline void _mcplib_set_rctx_alarm(lua_State *L, mcp_rcontext_t *rctx, int arg) {
    int isnum = 0;
    lua_Number secondsf = lua_tonumberx(L, arg, &isnum);
    if (!isnum) {
        proxy_lua_error(L, "timeout argument to wait or sleep must be a number");
        return;
    }
    int pending = event_pending(&rctx->timeout_event, EV_TIMEOUT, NULL);
    if ((pending & (EV_TIMEOUT)) == 0) {
        struct timeval tv = { .tv_sec = 0, .tv_usec = 0 };
        lua_Integer secondsi = (lua_Integer) secondsf;
        lua_Number subseconds = secondsf - secondsi;
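        // split the timeout into whole and fractional seconds,
        // e.g. 1.25 -> tv_sec = 1, tv_usec = 250000.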

        tv.tv_sec = secondsi;
        tv.tv_usec = MICROSECONDS(subseconds);
        event_add(&rctx->timeout_event, &tv);
    }
}

// TODO: one more function to wait on a list of handles? to queue and wait on
// a list of handles? expand wait_cond()

static inline int _mcplib_rcontext_wait_prep(lua_State *L, mcp_rcontext_t *rctx, int argc) {
    int mode = QWAIT_ANY;
    int wait = 0;

    if (rctx->wait_mode != QWAIT_IDLE) {
        proxy_lua_error(L, "wait_cond: cannot call while already in wait mode");
        return 0;
    }

    if (argc < 2) {
        proxy_lua_error(L, "must pass at least count to wait_cond");
        return 0;
    }

    int isnum = 0;
    wait = lua_tointegerx(L, 2, &isnum);
    if (!isnum || wait < 0) {
        proxy_lua_error(L, "wait count for wait_cond must be a positive integer");
        return 0;
    }

    if (argc > 2) {
        mode = lua_tointeger(L, 3);
    }

    switch (mode) {
        case QWAIT_ANY:
        case QWAIT_OK:
        case QWAIT_GOOD:
        case QWAIT_FASTGOOD:
            break;
        default:
            proxy_lua_error(L, "invalid mode sent to wait_cond");
            return 0;
    }

    rctx->wait_count = wait;
    rctx->wait_done = 0;
    rctx->wait_mode = mode;

    return 0;
}

// takes num, filter mode
int mcplib_rcontext_wait_cond(lua_State *L) {
    int argc = lua_gettop(L);
    mcp_rcontext_t *rctx = lua_touserdata(L, 1);

    _mcplib_rcontext_wait_prep(L, rctx, argc);

    // waiting for none, meaning just execute the queues.
    if (rctx->wait_count == 0) {
        io_pending_proxy_t *p = mcp_queue_rctx_io(rctx, NULL, NULL, NULL);
        p->return_cb = proxy_return_rqu_dummy_cb;
        p->background = true;
        rctx->pending_reqs++;
        rctx->wait_mode = QWAIT_IDLE; // not actually waiting.
    } else if (argc > 3) {
        // optional wait timeout. does not cancel existing request!
        _mcplib_set_rctx_alarm(L, rctx, 4);
    }

    lua_pushinteger(L, MCP_YIELD_WAITCOND);
    return lua_yield(L, 1);
}
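
// Lua-side sketch of a conditional wait. The mcp.WAIT_* constants are
// assumed to map onto the QWAIT_* modes checked above; the trailing number
// is the optional timeout in seconds:
//
//   rctx:enqueue(r, { h1, h2, h3 })
//   local done, timed_out = rctx:wait_cond(2, mcp.WAIT_OK, 0.5)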

int mcplib_rcontext_enqueue_and_wait(lua_State *L) {
    mcp_rcontext_t *rctx = lua_touserdata(L, 1);
    mcp_request_t *rq = luaL_checkudata(L, 2, "mcp.request");
    int isnum = 0;
    int handle = lua_tointegerx(L, 3, &isnum);

    if (rctx->wait_mode != QWAIT_IDLE) {
        proxy_lua_error(L, "enqueue_and_wait: cannot call while already in wait mode");
        return 0;
    }

    if (!rq->pr.keytoken) {
        proxy_lua_error(L, "cannot queue requests without a key");
        return 0;
    }

    if (!isnum) {
        proxy_lua_error(L, "invalid handle passed to enqueue_and_wait");
        return 0;
    }

    // queue up this handle and yield for the direct wait.
    lua_pushvalue(L, 2);
    _mcplib_rcontext_queue(L, rctx, rq, handle);

    if (lua_gettop(L) > 3) {
        _mcplib_set_rctx_alarm(L, rctx, 4);
    }

    rctx->wait_done = 0;
    rctx->wait_count = 1;
    rctx->wait_mode = QWAIT_HANDLE;
    rctx->wait_handle = handle;

    lua_pushinteger(L, MCP_YIELD_WAITHANDLE);
    return lua_yield(L, 1);
}
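
// Lua-side sketch (binding name assumed from the C function). On success the
// handle's result object is returned; on timeout the wait handler above
// pushes nil plus a true "timed out" flag:
//
//   local res, timed_out = rctx:enqueue_and_wait(r, h, 0.25)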

int mcplib_rcontext_wait_handle(lua_State *L) {
    mcp_rcontext_t *rctx = lua_touserdata(L, 1);
    int isnum = 0;
    int handle = lua_tointegerx(L, 2, &isnum);

    if (rctx->wait_mode != QWAIT_IDLE) {
        proxy_lua_error(L, "wait: cannot call while already in wait mode");
        return 0;
    }

    if (!isnum || handle < 0 || handle >= rctx->fgen->max_queues) {
        proxy_lua_error(L, "invalid handle passed to wait_handle");
        return 0;
    }

    struct mcp_rqueue_s *rqu = &rctx->qslots[handle];
    if (rqu->state == RQUEUE_IDLE) {
        proxy_lua_error(L, "wait_handle called on unqueued handle");
        return 0;
    }

    if (lua_gettop(L) > 2) {
        _mcplib_set_rctx_alarm(L, rctx, 3);
    }

    rctx->wait_done = 0;
    rctx->wait_count = 1;
    rctx->wait_mode = QWAIT_HANDLE;
    rctx->wait_handle = handle;

    lua_pushinteger(L, MCP_YIELD_WAITHANDLE);
    return lua_yield(L, 1);
}

// TODO: This is disabled due to issues with the IO subsystem. Fixing this
// requires retiring of API1 to allow refactoring or some extra roundtrip
// work.
// - if rctx:sleep() is called, the coroutine is suspended.
// - once resumed after the timeout, we may later suspend again and make
// backend requests
// - once the coroutine is completed, we need to check if the owning client
// conn is ready to be resumed
// - BUG: we can only get into the "conn is in IO queue wait" state if a
// sub-IO was created and submitted somewhere.
// - This means either rctx:sleep() needs to be implemented by submitting a
// dummy IO req (as other code does)
// - OR we need to refactor the IO system so the dummies aren't required
// anymore.
//
// If a dummy is used, we would have to implement this as:
// - immediately submit a dummy IO if sleep is called.
// - this allows the IO system to move the connection into the right state
// - will immediately circle back then set an alarm for the sleep timeout
// - once the sleep resumes, run code as normal. resumption should work
// properly since we've entered the correct state originally.
//
// This adds a lot of CPU overhead to sleep, which should be fine given the
// nature of the function, but also adds a lot of code and increases the
// chances of bugs. So I'm leaving it out until this can be implemented more
// simply.
int mcplib_rcontext_sleep(lua_State *L) {
    mcp_rcontext_t *rctx = lua_touserdata(L, 1);
    if (rctx->wait_mode != QWAIT_IDLE) {
        proxy_lua_error(L, "sleep: cannot call while already in wait mode");
        return 0;
    }

    _mcplib_set_rctx_alarm(L, rctx, 2);
    rctx->wait_mode = QWAIT_SLEEP;

    lua_pushinteger(L, MCP_YIELD_SLEEP);
    return lua_yield(L, 1);
}
1361 
_mcplib_rcontext_checkhandle(lua_State * L)1362 static inline struct mcp_rqueue_s *_mcplib_rcontext_checkhandle(lua_State *L) {
1363     mcp_rcontext_t *rctx = lua_touserdata(L, 1);
1364     int isnum = 0;
1365     int handle = lua_tointegerx(L, 2, &isnum);
1366     if (!isnum || handle < 0 || handle >= rctx->fgen->max_queues) {
1367         proxy_lua_error(L, "invalid queue handle passed to :good/:ok:/:any");
1368         return NULL;
1369     }
1370 
1371     struct mcp_rqueue_s *rqu = &rctx->qslots[handle];
1372     return rqu;
1373 }
1374 
int mcplib_rcontext_res_good(lua_State *L) {
    struct mcp_rqueue_s *rqu = _mcplib_rcontext_checkhandle(L);
    if (rqu->flags & RQUEUE_R_GOOD) {
        lua_rawgeti(L, LUA_REGISTRYINDEX, rqu->res_ref);
    } else {
        lua_pushnil(L);
    }
    return 1;
}

int mcplib_rcontext_res_ok(lua_State *L) {
    struct mcp_rqueue_s *rqu = _mcplib_rcontext_checkhandle(L);
    if (rqu->flags & (RQUEUE_R_OK|RQUEUE_R_GOOD)) {
        lua_rawgeti(L, LUA_REGISTRYINDEX, rqu->res_ref);
    } else {
        lua_pushnil(L);
    }
    return 1;
}

int mcplib_rcontext_res_any(lua_State *L) {
    struct mcp_rqueue_s *rqu = _mcplib_rcontext_checkhandle(L);
    if (rqu->flags & (RQUEUE_R_ANY|RQUEUE_R_OK|RQUEUE_R_GOOD)) {
        lua_rawgeti(L, LUA_REGISTRYINDEX, rqu->res_ref);
    } else {
        // Shouldn't be possible to get here, unless you're asking about a
        // queue that was never armed or hasn't completed yet.
        lua_pushnil(L);
    }
    return 1;
}

// returns res, RES_GOOD|OK|ANY
int mcplib_rcontext_result(lua_State *L) {
    struct mcp_rqueue_s *rqu = _mcplib_rcontext_checkhandle(L);
    if (rqu->flags & (RQUEUE_R_ANY|RQUEUE_R_OK|RQUEUE_R_GOOD)) {
        lua_rawgeti(L, LUA_REGISTRYINDEX, rqu->res_ref);
        // mask away any other queue flags.
        lua_pushinteger(L, rqu->flags & (RQUEUE_R_ANY|RQUEUE_R_OK|RQUEUE_R_GOOD));
    } else {
        lua_pushnil(L);
        lua_pushnil(L);
    }

    return 2;
}

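// Illustrative Lua usage (a sketch; the error string above suggests these
// are registered as :good/:ok/:any, and mcplib_rcontext_result presumably
// as :result):
//   local res = rctx:good(h)         -- non-nil only for a "good" result
//   local res, mode = rctx:result(h) -- mode is an RQUEUE_R_* flag, or nil
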
int mcplib_rcontext_cfd(lua_State *L) {
    mcp_rcontext_t *rctx = lua_touserdata(L, 1);
    lua_pushinteger(L, rctx->conn_fd);
    return 1;
}

// Must not call this if rctx has already returned a result to the client.
int mcplib_rcontext_tls_peer_cn(lua_State *L) {
    mcp_rcontext_t *rctx = lua_touserdata(L, 1);
    if (!rctx->c) {
        lua_pushnil(L);
        return 1;
    }

#ifdef TLS
    int len = 0;
    const unsigned char *cn = ssl_get_peer_cn(rctx->c, &len);
    if (cn) {
        lua_pushlstring(L, (const char *)cn, len);
    } else {
        lua_pushnil(L);
    }
#else
    lua_pushnil(L);
#endif
    return 1;
}

// TODO: stub function.
// Needs to attach a request function to the rcontext, via another function
// that doesn't fill out the request info.
int mcplib_rcontext_request_new(lua_State *L) {
    mcp_parser_t pr = {0};

    mcp_new_request(L, &pr, " ", 1);
    return 1;
}

// TODO: stub function, see request_new above.
int mcplib_rcontext_response_new(lua_State *L) {
    mcp_resp_t *r = lua_newuserdatauv(L, sizeof(mcp_resp_t), 0);
    memset(r, 0, sizeof(mcp_resp_t));
    luaL_getmetatable(L, "mcp.response");
    lua_setmetatable(L, -2);
    return 1;
}

// the supplied handle must be valid.
void mcp_rcontext_push_rqu_res(lua_State *L, mcp_rcontext_t *rctx, int handle) {
    struct mcp_rqueue_s *rqu = &rctx->qslots[handle];
    lua_rawgeti(L, LUA_REGISTRYINDEX, rqu->res_ref);
}

/*
 * Specialized router funcgen.
 * For routing a key across a map of possible function generators, we use a
 * specialized function generator. This keeps the attach and start code
 * consistent, as they only need to think about function generators.
 * It also keeps the cleanup code consistent: when a "router" funcgen is
 * replaced by mcp.attach() during a reload, we can immediately dereference
 * all of the route fgens rather than having to wait for GC.
 *
 * Another upside is that when we're starting a new request, we can
 * immediately swap out the top level fgen object, rather than force all
 * routes to be processed as sub-funcs, which is a tiny bit slower and
 * disallows custom request object sizes.
 *
 * The downside is this will appear to be bolted onto the side of the
 * existing structs rather than be its own object, like I initially wanted.
 */

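// Illustrative Lua configuration (a sketch; the fgen variables are
// hypothetical generators built elsewhere):
//   local router = mcp.router_new({
//       map = { foo = foo_fgen, bar = bar_fgen }, -- "foo/..." -> foo_fgen
//       mode = "prefix", stop = "/",
//       default = fallback_fgen,
//   })
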
static inline const char *_mcp_router_shortsep(const char *key, const int klen, const char needle, size_t *len) {
    const char *end = memchr(key, needle, klen);
    if (end != NULL) {
        *len = end - key;
    }
    // if the separator is missing, *len is left untouched (zero from the
    // caller), which routes to the fallback.
    return key;
}

// We take some liberties here because we know needle and key can't be
// zero-length. This isn't the most hyper-optimized search, but prefixes and
// separators should both be short.
static inline const char *_mcp_router_longsep(const char *key, const int klen, const char *needle, size_t *len) {
    const char *end = NULL;
    const char *lookup = key;
    size_t nlen = strlen(needle);

    if ((size_t)klen < nlen) {
        // key is too short to contain the needle at all.
        return key;
    }

    end = memchr(key, needle[0], klen);
    if (end == NULL) {
        // definitely no needle in this haystack.
        return key;
    }

    // find the last possible position
    const char *last = key + (klen - nlen);

    while (end <= last) {
        if (*end == needle[0] && memcmp(end, needle, nlen) == 0) {
            *len = end - key;
            break;
        }
        end++;
    }

    return lookup;
}

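// Worked example: key "img__thumb__1" with needle "__" matches at offset 3,
// so *len becomes 3 and the router looks up "img" in the map.
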
static inline const char *_mcp_router_anchorsm(const char *key, const int klen, const char *needle, size_t *len) {
    // check the first byte anchor.
    if (key[0] != needle[0]) {
        return NULL;
    }

    // rest is same as shortsep.
    return _mcp_router_shortsep(key+1, klen-1, needle[1], len);
}

static inline const char *_mcp_router_anchorbig(const char *key, const int klen, const struct mcp_router_long_s *conf, size_t *len) {
    // check long anchored prefix.
    size_t slen = strlen(conf->start);
    // check for start len+2 to avoid sending a zero byte haystack to longsep
    if (slen+2 > klen || memcmp(key, conf->start, slen) != 0) {
        return NULL;
    }

    // rest is same as longsep
    return _mcp_router_longsep(key+slen, klen-slen, conf->stop, len);
}

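// Fallback order (as implemented below): a per-command map entry wins first,
// then the router-wide default fgen; a NULL return means no route matched.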
static inline mcp_funcgen_t *_mcp_funcgen_route_fallback(struct mcp_funcgen_router *fr, int cmd) {
    if (fr->cmap[cmd]) {
        return fr->cmap[cmd];
    }
    return fr->def_fgen;
}

static mcp_funcgen_t *mcp_funcgen_route(lua_State *L, mcp_funcgen_t *fgen, mcp_parser_t *pr) {
    struct mcp_funcgen_router *fr = (struct mcp_funcgen_router *)fgen;
    if (pr->klen == 0) {
        return NULL;
    }
    const char *key = &pr->request[pr->tokens[pr->keytoken]];
    const char *lookup = NULL;
    size_t lookuplen = 0;
    switch(fr->type) {
        case FGEN_ROUTER_NONE:
            break;
        case FGEN_ROUTER_CMDMAP:
            // short circuit if all we can do is cmap and default.
            return _mcp_funcgen_route_fallback(fr, pr->command);
            break;
        case FGEN_ROUTER_SHORTSEP:
            lookup = _mcp_router_shortsep(key, pr->klen, fr->conf.sep, &lookuplen);
            break;
        case FGEN_ROUTER_LONGSEP:
            lookup = _mcp_router_longsep(key, pr->klen, fr->conf.lsep, &lookuplen);
            break;
        case FGEN_ROUTER_ANCHORSM:
            lookup = _mcp_router_anchorsm(key, pr->klen, fr->conf.anchorsm, &lookuplen);
            break;
        case FGEN_ROUTER_ANCHORBIG:
            lookup = _mcp_router_anchorbig(key, pr->klen, &fr->conf.big, &lookuplen);
            break;
    }

    if (lookuplen == 0) {
        return _mcp_funcgen_route_fallback(fr, pr->command);
    }

    // hoping the lua short string cache helps us avoid allocations at least.
    // since this lookup code is internal to the router object we can optimize
    // this later and remove the lua bits.
    lua_rawgeti(L, LUA_REGISTRYINDEX, fr->map_ref);
    lua_pushlstring(L, lookup, lookuplen);
    lua_rawget(L, -2); // pops key, returns value
    if (lua_isnil(L, -1)) {
        lua_pop(L, 2); // drop nil and map.
        return _mcp_funcgen_route_fallback(fr, pr->command);
    } else {
        int type = lua_type(L, -1);
        if (type == LUA_TUSERDATA) {
            mcp_funcgen_t *nfgen = lua_touserdata(L, -1);
            lua_pop(L, 2); // drop fgen and map.
            return nfgen;
        } else if (type == LUA_TTABLE) {
            lua_rawgeti(L, -1, pr->command);
            // If nil, check CMD_ANY_STORAGE index for a cmap default
            if (lua_isnil(L, -1)) {
                lua_pop(L, 1); // drop nil.
                // check if we have a local-default
                lua_rawgeti(L, -1, CMD_ANY_STORAGE);
                if (lua_isnil(L, -1)) {
                    lua_pop(L, 3); // drop map, cmd map, nil
                    return _mcp_funcgen_route_fallback(fr, pr->command);
                } else {
                    mcp_funcgen_t *nfgen = lua_touserdata(L, -1);
                    lua_pop(L, 3); // drop map, cmd map, fgen
                    return nfgen;
                }
            }
            mcp_funcgen_t *nfgen = lua_touserdata(L, -1);
            lua_pop(L, 3); // drop fgen, cmd map, map
            return nfgen;
        } else {
            // map validation should make this unreachable, but pop the value
            // and map so the stack stays balanced if it ever happens.
            lua_pop(L, 2);
            return _mcp_funcgen_route_fallback(fr, pr->command);
        }
    }
}

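// Example resolution for key "foo/bar" on a shortsep '/' router: lookup is
// "foo" (len 3); a userdata hit in the map returns that fgen directly, a
// table hit is consulted per-command (with CMD_ANY_STORAGE as a local
// default), and a miss falls back to the command map / default fgen.
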
// called from mcp_funcgen_cleanup if necessary.
static int mcp_funcgen_router_cleanup(lua_State *L, mcp_funcgen_t *fgen) {
    struct mcp_funcgen_router *fr = (struct mcp_funcgen_router *)fgen;
    if (fr->map_ref) {
        lua_rawgeti(L, LUA_REGISTRYINDEX, fr->map_ref);

        // walk the map, de-ref any funcgens found.
        int tidx = lua_absindex(L, -1);
        lua_pushnil(L);
        while (lua_next(L, tidx) != 0) {
            int type = lua_type(L, -1);
            if (type == LUA_TUSERDATA) {
                mcp_funcgen_t *mfgen = lua_touserdata(L, -1);
                mcp_funcgen_dereference(L, mfgen);
                lua_pop(L, 1);
            } else if (type == LUA_TTABLE) {
                int midx = lua_absindex(L, -1);
                lua_pushnil(L);
                while (lua_next(L, midx) != 0) {
                    mcp_funcgen_t *mfgen = lua_touserdata(L, -1);
                    mcp_funcgen_dereference(L, mfgen);
                    lua_pop(L, 1); // drop value
                }
                lua_pop(L, 1); // drop command map table
            }
        }

        lua_pop(L, 1); // drop the table.
        luaL_unref(L, LUA_REGISTRYINDEX, fr->map_ref);
        fr->map_ref = 0;
    }

    // release any command map entries.
    for (int x = 0; x < CMD_END_STORAGE; x++) {
        if (fr->cmap[x]) {
            mcp_funcgen_dereference(L, fr->cmap[x]);
            fr->cmap[x] = NULL;
        }
    }

    if (fr->def_fgen) {
        mcp_funcgen_dereference(L, fr->def_fgen);
        fr->def_fgen = NULL;
    }

    return 0;
}

// Note: the string should be safe to use after popping it here, since it is
// still referenced from the argument table, but it may be worth having the
// caller copy it into a buffer first.
static const char *_mcplib_router_new_check(lua_State *L, const char *arg, size_t *len) {
    int type = lua_getfield(L, 1, arg);
    if (type == LUA_TSTRING) {
        const char *sep = lua_tolstring(L, -1, len);
        if (*len == 0) {
            proxy_lua_ferror(L, "must pass a non-zero length string to %s in mcp.router_new", arg);
        } else if (*len > KEY_HASH_FILTER_MAX) {
            proxy_lua_ferror(L, "%s is too long in mcp.router_new", arg);
        }
        lua_pop(L, 1); // drop the string value.
        return sep;
    } else if (type != LUA_TNIL) {
        proxy_lua_ferror(L, "must pass a string to %s in mcp.router_new", arg);
    }
    return NULL;
}

static void _mcplib_router_new_cmapcheck(lua_State *L) {
    int tidx = lua_absindex(L, -1);
    lua_pushnil(L); // init next table key.
    while (lua_next(L, tidx) != 0) {
        if (!lua_isinteger(L, -2)) {
            proxy_lua_error(L, "Non integer key in router command map in router_new");
        }
        int cmd = lua_tointeger(L, -2);
        if ((cmd <= 0 || cmd >= CMD_END_STORAGE) && cmd != CMD_ANY_STORAGE) {
            proxy_lua_error(L, "Bad command in router command map in router_new");
        }
        luaL_checkudata(L, -1, "mcp.funcgen");
        lua_pop(L, 1); // drop val, keep key.
    }
}

static size_t _mcplib_router_new_mapcheck(lua_State *L) {
    size_t route_count = 0;
    if (!lua_istable(L, -1)) {
        proxy_lua_error(L, "Must pass a table to map argument of router_new");
    }
    // walk map table, get size count. the map sits at stack index 2, just
    // above the argument table.
    lua_pushnil(L); // init table key.
    while (lua_next(L, 2) != 0) {
        int type = lua_type(L, -1);
        if (type == LUA_TUSERDATA) {
            luaL_checkudata(L, -1, "mcp.funcgen");
        } else if (type == LUA_TTABLE) {
            // If table, it's a command map, poke in and validate.
            _mcplib_router_new_cmapcheck(L);
        } else {
            proxy_lua_error(L, "unhandled data in router_new map");
        }
        route_count++;
        lua_pop(L, 1); // drop val, keep key.
    }

    return route_count;
}

// reads the configuration for the router based on the mode.
static void _mcplib_router_new_mode(lua_State *L, struct mcp_funcgen_router *fr) {
    const char *type = lua_tostring(L, -1);
    size_t len = 0;
    const char *sep = NULL;

    // change internal type based on length of separator
    if (strcmp(type, "prefix") == 0) {
        sep = _mcplib_router_new_check(L, "stop", &len);
        if (sep == NULL) {
            // defaults
            fr->type = FGEN_ROUTER_SHORTSEP;
            fr->conf.sep = '/';
        } else if (len == 1) {
            // optimized shortsep case.
            fr->type = FGEN_ROUTER_SHORTSEP;
            fr->conf.sep = sep[0];
        } else {
            // len is long.
            fr->type = FGEN_ROUTER_LONGSEP;
            memcpy(fr->conf.lsep, sep, len);
            fr->conf.lsep[len] = '\0'; // cap it.
        }
    } else if (strcmp(type, "anchor") == 0) {
        size_t elen = 0; // stop len.
        const char *usep = _mcplib_router_new_check(L, "stop", &elen);
        sep = _mcplib_router_new_check(L, "start", &len);
        if (sep == NULL && usep == NULL) {
            // no arguments, use a default.
            fr->type = FGEN_ROUTER_ANCHORSM;
            fr->conf.anchorsm[0] = '/';
            fr->conf.anchorsm[1] = '/';
        } else if (sep == NULL || usep == NULL) {
            // reduce the combinatorial space because I'm lazy.
            proxy_lua_error(L, "must specify start and stop if mode is anchor in mcp.router_new");
        } else if (len == 1 && elen == 1) {
            fr->type = FGEN_ROUTER_ANCHORSM;
            fr->conf.anchorsm[0] = sep[0];
            fr->conf.anchorsm[1] = usep[0];
        } else {
            fr->type = FGEN_ROUTER_ANCHORBIG;
            memcpy(fr->conf.big.start, sep, len);
            memcpy(fr->conf.big.stop, usep, elen);
            fr->conf.big.start[len] = '\0';
            fr->conf.big.stop[elen] = '\0';
        }
    } else {
        proxy_lua_error(L, "unknown type passed to mcp.router_new");
    }
}

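// Mode configs and the internal router types they select (derived from the
// branches above):
//   { mode = "prefix" }                              -> SHORTSEP, sep '/'
//   { mode = "prefix", stop = "::" }                 -> LONGSEP
//   { mode = "anchor" }                              -> ANCHORSM, "//"
//   { mode = "anchor", start = "/", stop = "/" }     -> ANCHORSM
//   { mode = "anchor", start = "pfx_", stop = "__" } -> ANCHORBIG
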
// Requires at least one of "map" or "cmap"; validated below.
int mcplib_router_new(lua_State *L) {
    struct mcp_funcgen_router fr = {0};
    size_t route_count = 0;
    bool has_map = false;

    if (!lua_istable(L, 1)) {
        proxy_lua_error(L, "Must pass a table of arguments to mcp.router_new");
    }

    if (lua_getfield(L, 1, "map") != LUA_TNIL) {
        route_count = _mcplib_router_new_mapcheck(L);
        has_map = true;
    }
    lua_pop(L, 1); // drop map or nil

    if (lua_getfield(L, 1, "cmap") != LUA_TNIL) {
        if (!lua_istable(L, -1)) {
            proxy_lua_error(L, "Must pass a table to cmap argument of mcp.router_new");
        }
        _mcplib_router_new_cmapcheck(L);
    } else {
        if (!has_map) {
            proxy_lua_error(L, "Must pass map and/or cmap to mcp.router_new");
        }
    }
    lua_pop(L, 1); // drop cmap or nil

    fr.fgen_self.is_router = true;

    // config:
    // { mode = "anchor", start = "/", stop = "/" }
    // { mode = "prefix", stop = "/" }
    if (has_map) {
        // default to a short prefix type with a single byte separator.
        fr.type = FGEN_ROUTER_SHORTSEP;
        fr.conf.sep = '/';

        if (lua_getfield(L, 1, "mode") == LUA_TSTRING) {
            _mcplib_router_new_mode(L, &fr);
        }
        lua_pop(L, 1); // drop mode or nil.
    } else {
        // pure command map router.
        fr.type = FGEN_ROUTER_CMDMAP;
    }

    struct mcp_funcgen_router *router = lua_newuserdatauv(L, sizeof(struct mcp_funcgen_router), 0);
    memset(router, 0, sizeof(*router));
    mcp_funcgen_t *fgen = &router->fgen_self;

    luaL_getmetatable(L, "mcp.funcgen");
    lua_setmetatable(L, -2);

    int type = lua_getfield(L, 1, "default");
    if (type == LUA_TUSERDATA) {
        fr.def_fgen = luaL_checkudata(L, -1, "mcp.funcgen");
        mcp_funcgen_reference(L); // pops the funcgen.
    } else {
        lua_pop(L, 1);
    }

    memcpy(router, &fr, sizeof(struct mcp_funcgen_router));
    strncpy(fgen->name, "mcp_router", FGEN_NAME_MAXLEN);

    if (has_map) {
        // walk map table again, funcgen_ref everyone.
        lua_createtable(L, 0, route_count);
        lua_pushvalue(L, -1); // dupe table ref for a moment.
        router->map_ref = luaL_ref(L, LUA_REGISTRYINDEX); // pops extra map

        int mymap = lua_absindex(L, -1);
        lua_getfield(L, 1, "map");
        int argmap = lua_absindex(L, -1);
        lua_pushnil(L); // seed walk of the passed in map

        while (lua_next(L, argmap) != 0) {
            // types are already validated.
            int type = lua_type(L, -1);
            if (type == LUA_TUSERDATA) {
                // first let's reference the function generator.
                lua_pushvalue(L, -1); // duplicate value.
                mcp_funcgen_reference(L); // pops the funcgen after referencing.

                // duplicate key.
                lua_pushvalue(L, -2);
                // move key underneath value
                lua_insert(L, -2); // take top (key) and move it down one.
                // now key, key, value
                lua_rawset(L, mymap); // pops k, v into our internal table.
            } else if (type == LUA_TTABLE) {
                int tidx = lua_absindex(L, -1); // idx of our command map table.
                lua_createtable(L, CMD_END_STORAGE, 0);
                int midx = lua_absindex(L, -1); // idx of our new command map.
                lua_pushnil(L); // seed the iterator
                while (lua_next(L, tidx) != 0) {
                    lua_pushvalue(L, -1); // duplicate value.
                    mcp_funcgen_reference(L); // pop funcgen.

                    lua_pushvalue(L, -2); // dupe key.
                    lua_insert(L, -2); // move key down one.
                    lua_rawset(L, midx); // set to new map table.
                }

                // -1: new command map
                // -2: input command map
                // -3: key
                lua_pushvalue(L, -3); // dupe key
                lua_insert(L, -2); // move key down below new cmd map
                lua_rawset(L, mymap); // pop key, new map into main map.
                lua_pop(L, 1); // drop input table.
            }
        }

        lua_pop(L, 2); // drop argmap, mymap.
    }

    // process a command map directly into our internal table.
    if (lua_getfield(L, 1, "cmap") != LUA_TNIL) {
        int tidx = lua_absindex(L, -1); // idx of our command map table.
        lua_pushnil(L); // seed the iterator
        while (lua_next(L, tidx) != 0) {
            int cmd = lua_tointeger(L, -2);
            mcp_funcgen_t *cfgen = lua_touserdata(L, -1);
            mcp_funcgen_reference(L); // pop funcgen.
            router->cmap[cmd] = cfgen;
        }
    }
    lua_pop(L, 1);

    LIBEVENT_THREAD *t = PROXY_GET_THR(L);
    fgen->thread = t;
    mcp_sharedvm_delta(t->proxy_ctx, SHAREDVM_FGEN_IDX, "mcp_router", 1);

    return 1;
}
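
// Illustrative end-to-end Lua usage (a sketch; the fgen variables are
// hypothetical generators, and the mcp.CMD_* constants are the proxy's
// command identifiers):
//   local router = mcp.router_new({
//       map = {
//           users = { [mcp.CMD_MG] = get_fgen, [mcp.CMD_ANY_STORAGE] = any_fgen },
//           sess  = sess_fgen,
//       },
//       cmap = { [mcp.CMD_MS] = set_fgen },
//       default = fallback_fgen,
//   })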