1 /* -*- Mode: C; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2
3 #include "proxy.h"
4 #ifdef TLS
5 #include "tls.h"
6 #endif
7
8 static mcp_funcgen_t *mcp_funcgen_route(lua_State *L, mcp_funcgen_t *fgen, mcp_parser_t *pr);
9 static int mcp_funcgen_router_cleanup(lua_State *L, mcp_funcgen_t *fgen);
10 static void _mcplib_funcgen_cache(mcp_funcgen_t *fgen, mcp_rcontext_t *rctx);
11 static void mcp_funcgen_cleanup(lua_State *L, mcp_funcgen_t *fgen);
12 static void mcp_resume_rctx_from_cb(mcp_rcontext_t *rctx);
13 static void proxy_return_rqu_cb(io_pending_t *pending);
14
// HACK: see the notes above proxy_run_rcontext.
// A resume call may have queued new work on the connection, so walk the
// connection's IO queues and submit every queue that has a pending stack to
// the backend handling system. A NULL connection is a no-op.
static inline void _mcp_queue_hack(conn *c) {
    if (c == NULL) {
        return;
    }

    // the queue array is terminated by an IO_QUEUE_NONE entry.
    io_queue_t *queue = c->io_queues;
    while (queue->type != IO_QUEUE_NONE) {
        if (queue->stack_ctx != NULL) {
            io_queue_cb_t *cb = thread_io_queue_get(c->thread, queue->type);
            cb->submit_cb(queue);
        }
        queue++;
    }
}
29
30 // If we're GC'ed but not closed, it means it was created but never
31 // attached to a function, so ensure everything is closed properly.
mcplib_funcgen_gc(lua_State * L)32 int mcplib_funcgen_gc(lua_State *L) {
33 mcp_funcgen_t *fgen = luaL_checkudata(L, -1, "mcp.funcgen");
34 if (fgen->closed) {
35 return 0;
36 }
37 assert(fgen->self_ref == 0);
38
39 mcp_funcgen_cleanup(L, fgen);
40 fgen->closed = true;
41 return 0;
42 }
43
44 // handler for *_wait_*() variants and sleep calls
mcp_funcgen_wait_handler(const int fd,const short which,void * arg)45 static void mcp_funcgen_wait_handler(const int fd, const short which, void *arg) {
46 mcp_rcontext_t *rctx = arg;
47
48 // if we were in waiting: reset wait mode, push wait_done + boolean true
49 // if we were in sleep: reset wait mode.
50 // immediately resume.
51 lua_settop(rctx->Lc, 0);
52 rctx->wait_count = 0;
53 rctx->lua_narg = 2;
54 if (rctx->wait_mode == QWAIT_HANDLE) {
55 // if timed out then we shouldn't have a result. just push nil.
56 lua_pushnil(rctx->Lc);
57 } else if (rctx->wait_mode == QWAIT_SLEEP) {
58 // no extra arg.
59 rctx->lua_narg = 1;
60 } else {
61 // how many results were processed
62 lua_pushinteger(rctx->Lc, rctx->wait_done);
63 }
64 // "timed out"
65 lua_pushboolean(rctx->Lc, 1);
66
67 rctx->wait_mode = QWAIT_IDLE;
68
69 mcp_resume_rctx_from_cb(rctx);
70 }
71
72 // For describing functions which generate functions which can execute
73 // requests.
// These "generator functions" handle pre-allocating and creating a memory
// hierarchy, allowing dynamic runtimes at high speed.
76
77 // must be called with fgen on top of stack in fgen->thread->L
mcp_rcontext_cleanup(lua_State * L,mcp_funcgen_t * fgen,mcp_rcontext_t * rctx,int fgen_idx)78 static void mcp_rcontext_cleanup(lua_State *L, mcp_funcgen_t *fgen, mcp_rcontext_t *rctx, int fgen_idx) {
79 luaL_unref(L, LUA_REGISTRYINDEX, rctx->coroutine_ref);
80 luaL_unref(L, LUA_REGISTRYINDEX, rctx->function_ref);
81 if (rctx->request_ref) {
82 luaL_unref(L, LUA_REGISTRYINDEX, rctx->request_ref);
83 }
84 assert(rctx->pending_reqs == 0);
85
86 // cleanup of request queue entries. recurse funcgen cleanup.
87 for (int x = 0; x < fgen->max_queues; x++) {
88 struct mcp_rqueue_s *rqu = &rctx->qslots[x];
89 if (rqu->obj_type == RQUEUE_TYPE_POOL) {
90 // nothing to do.
91 } else if (rqu->obj_type == RQUEUE_TYPE_FGEN) {
92 // don't need to recurse, just free the subrctx.
93 mcp_rcontext_t *subrctx = rqu->obj;
94 lua_rawgeti(L, LUA_REGISTRYINDEX, subrctx->fgen->self_ref);
95 mcp_rcontext_cleanup(L, subrctx->fgen, subrctx, lua_absindex(L, -1));
96 lua_pop(L, 1); // drop subrctx fgen
97 } else if (rqu->obj_type != RQUEUE_TYPE_NONE) {
98 assert(1 == 0);
99 }
100
101 if (rqu->res_ref) {
102 luaL_unref(L, LUA_REGISTRYINDEX, rqu->res_ref);
103 rqu->res_ref = 0;
104 }
105
106 if (rqu->cb_ref) {
107 luaL_unref(L, LUA_REGISTRYINDEX, rqu->cb_ref);
108 rqu->cb_ref = 0;
109 }
110 }
111
112 // nuke alarm if set.
113 // should only be paranoia here, but just in case.
114 if (event_pending(&rctx->timeout_event, EV_TIMEOUT, NULL)) {
115 event_del(&rctx->timeout_event);
116 }
117
118 lua_getiuservalue(L, fgen_idx, 1);
119 luaL_unref(L, -1, rctx->self_ref);
120 rctx->self_ref = 0;
121 lua_pop(L, 1); // drop freelist table
122
123 fgen->total--;
124 LIBEVENT_THREAD *t = PROXY_GET_THR(L);
125 // Fake an allocation when we free slots as they are long running data.
126 // This tricks the GC into running and freeing them.
127 t->proxy_vm_extra_kb += 2;
128 mcp_sharedvm_delta(t->proxy_ctx, SHAREDVM_FGENSLOT_IDX, fgen->name, -1);
129 }
130
131 // TODO: switch from an array to a STAILQ so we can avoid the memory
132 // management and error handling.
133 // Realistically it's impossible for these to error so we're safe for now.
134 #ifdef MEMCACHED_DEBUG
135 // require fewer test rounds for unit tests.
136 #define FGEN_FREE_PRESSURE_MAX 100
137 #define FGEN_FREE_PRESSURE_DROP 10
138 #define FGEN_FREE_WAIT 0
139 #else
140 #define FGEN_FREE_PRESSURE_MAX 5000
141 #define FGEN_FREE_PRESSURE_DROP 200
142 #define FGEN_FREE_WAIT 60 // seconds.
143 #endif
_mcplib_funcgen_cache(mcp_funcgen_t * fgen,mcp_rcontext_t * rctx)144 static void _mcplib_funcgen_cache(mcp_funcgen_t *fgen, mcp_rcontext_t *rctx) {
145 bool do_cache = true;
146 // Easing algorithm to decide when to "early free" rctx slots:
147 // - If we recently allocated a slot, reset pressure.
148 // - Each time an rctx is freed and more than half of available rctx's are
149 // free, increase pressure.
150 // - If free rctx are less than half of total, reduce pressure.
151 // - If pressure is too high, immediately free the rctx, then drop the
152 // pressure slightly.
153 // - If pressure is too high, and has been for more than FGEN_FREE_WAIT
154 // seconds, immediately free the rctx, then drop the pressure slightly.
155 //
156 // This should allow bursty traffic to avoid spinning on alloc/frees,
157 // while one-time bursts will slowly free slots back down to a min of 1.
158 if (fgen->free > fgen->total/2 - 1) {
159 if (fgen->free_pressure++ > FGEN_FREE_PRESSURE_MAX) {
160 struct timespec now;
161 clock_gettime(CLOCK_REALTIME, &now);
162 if (fgen->free_waiter.tv_sec == 0) {
163 fgen->free_waiter.tv_sec = now.tv_sec + FGEN_FREE_WAIT;
164 }
165
166 if (now.tv_sec >= fgen->free_waiter.tv_sec) {
167 do_cache = false;
168 }
169 // check again in a little while.
170 fgen->free_pressure -= FGEN_FREE_PRESSURE_DROP;
171 }
172 } else {
173 fgen->free_pressure >>= 1;
174 // must be too-free for a full wait period before releasing.
175 fgen->free_waiter.tv_sec = 0;
176 }
177
178 if (do_cache) {
179 if (fgen->free + 1 >= fgen->free_max) {
180 int x = fgen->free_max;
181 fgen->free_max *= 2;
182 fgen->list = realloc(fgen->list, fgen->free_max * sizeof(mcp_rcontext_t *));
183 for (; x < fgen->free_max; x++) {
184 fgen->list[x] = NULL;
185 }
186 }
187 fgen->list[fgen->free] = rctx;
188 fgen->free++;
189 } else {
190 // do not cache the rctx
191 assert(fgen->self_ref);
192 lua_State *L = fgen->thread->L;
193 lua_rawgeti(L, LUA_REGISTRYINDEX, fgen->self_ref);
194 mcp_rcontext_cleanup(L, fgen, rctx, lua_absindex(L, -1));
195 lua_pop(L, 1); // drop fgen
196 }
197
198 // we're closed and every outstanding request slot has been
199 // returned.
200 if (fgen->closed && fgen->free == fgen->total) {
201 mcp_funcgen_cleanup(fgen->thread->L, fgen);
202 }
203 }
204
205 // call with stack: mcp.funcgen -2, function -1
_mcplib_funcgen_gencall(lua_State * L)206 static int _mcplib_funcgen_gencall(lua_State *L) {
207 mcp_funcgen_t *fgen = luaL_checkudata(L, -2, "mcp.funcgen");
208 int fgen_idx = lua_absindex(L, -2);
209 // create the ctx object.
210 size_t rctx_len = sizeof(mcp_rcontext_t) + sizeof(struct mcp_rqueue_s) * fgen->max_queues;
211 mcp_rcontext_t *rc = lua_newuserdatauv(L, rctx_len, 0);
212 memset(rc, 0, rctx_len);
213
214 luaL_getmetatable(L, "mcp.rcontext");
215 lua_setmetatable(L, -2);
216 // allow the rctx to reference the function generator.
217 rc->fgen = fgen;
218 rc->lua_narg = 1;
219
220 // initialize the queue slots based on the fgen parent
221 for (int x = 0; x < fgen->max_queues; x++) {
222 struct mcp_rqueue_s *frqu = &fgen->queue_list[x];
223 struct mcp_rqueue_s *rqu = &rc->qslots[x];
224 rqu->obj_type = frqu->obj_type;
225 if (frqu->obj_type == RQUEUE_TYPE_POOL) {
226 rqu->obj_ref = 0;
227 rqu->obj = frqu->obj;
228 mcp_resp_t *r = mcp_prep_bare_resobj(L, fgen->thread);
229 rqu->res_ref = luaL_ref(L, LUA_REGISTRYINDEX);
230 rqu->res_obj = r;
231 } else if (frqu->obj_type == RQUEUE_TYPE_FGEN) {
232 // owner funcgen already holds the subfgen reference, so here we're just
233 // grabbing a subrctx to pin into the slot.
234 mcp_funcgen_t *fg = frqu->obj;
235 mcp_rcontext_t *subrctx = mcp_funcgen_get_rctx(L, fg->self_ref, fg);
236 if (subrctx == NULL) {
237 proxy_lua_error(L, "failed to generate request slot during queue_assign()");
238 }
239
240 // if this rctx ever had a request object assigned to it, we can get
241 // rid of it. we're pinning the subrctx in here and don't want
242 // to waste memory.
243 if (subrctx->request_ref) {
244 luaL_unref(L, LUA_REGISTRYINDEX, subrctx->request_ref);
245 subrctx->request_ref = 0;
246 subrctx->request = NULL;
247 }
248
249 // link the new rctx into this chain; we'll hold onto it until the
250 // parent de-allocates.
251 subrctx->parent = rc;
252 subrctx->parent_handle = x;
253 rqu->obj = subrctx;
254 }
255 }
256
257 // copy the rcontext reference
258 lua_pushvalue(L, -1);
259
260 // issue a rotation so one rcontext is now below genfunc, and one rcontext
261 // is on the top.
262 // right shift: gf, rc1, rc2 -> rc2, gf, rc1
263 lua_rotate(L, -3, 1);
264
265 // current stack should be func, mcp.rcontext.
266 int call_argnum = 1;
267 // stack will be func, rctx, arg if there is an arg.
268 if (fgen->argument_ref) {
269 lua_rawgeti(L, LUA_REGISTRYINDEX, fgen->argument_ref);
270 call_argnum++;
271 }
272
273 // can throw an error upstream.
274 lua_call(L, call_argnum, 1);
275
276 // we should have a top level function as a result.
277 if (!lua_isfunction(L, -1)) {
278 proxy_lua_error(L, "function generator didn't return a function");
279 return 0;
280 }
281 // can't fail past this point.
282
283 // pop the returned function.
284 rc->function_ref = luaL_ref(L, LUA_REGISTRYINDEX);
285
286 // link the rcontext into the function generator.
287 fgen->total++;
288
289 lua_getiuservalue(L, fgen_idx, 1); // get the reference table.
290 // rc, t -> t, rc
291 lua_rotate(L, -2, 1);
292 rc->self_ref = luaL_ref(L, -2); // pop rcontext
293 lua_pop(L, 1); // pop ref table.
294
295 _mcplib_funcgen_cache(fgen, rc);
296
297 // associate a coroutine thread with this context.
298 rc->Lc = lua_newthread(L);
299 assert(rc->Lc);
300 rc->coroutine_ref = luaL_ref(L, LUA_REGISTRYINDEX);
301
302 // increment the slot counter
303 LIBEVENT_THREAD *t = PROXY_GET_THR(L);
304 mcp_sharedvm_delta(t->proxy_ctx, SHAREDVM_FGENSLOT_IDX, fgen->name, 1);
305
306 event_assign(&rc->timeout_event, t->base, -1, EV_TIMEOUT, mcp_funcgen_wait_handler, rc);
307
308 // return the fgen.
309 // FIXME: just return 0? need to adjust caller to not mis-ref the
310 // generator function.
311 return 1;
312 }
313
// Resets an rctx so it can be reused: resets (or replaces) its coroutine,
// clears the wait state, cleans up its request, removes any pending timer
// and returns every queue slot to the idle state. Sub-rctx's pinned into
// FGEN type slots are reset recursively.
// Requires that the rctx has no pending requests. Does not put the rctx back
// on the free list.
static void _mcp_funcgen_return_rctx(mcp_rcontext_t *rctx) {
    mcp_funcgen_t *fgen = rctx->fgen;
    assert(rctx->pending_reqs == 0);

    if (lua_resetthread(rctx->Lc) == LUA_OK) {
        lua_settop(rctx->Lc, 0);
    } else {
        // TODO: I was under the impression it was possible to reuse a
        // coroutine from an error state, but it seems like this only works if
        // the routine landed in LUA_YIELD or LUA_OK
        // Leaving a note here to triple check this or if my memory was wrong.
        // Instead for now we throw away the coroutine if it was involved in
        // an error. Realistically this shouldn't be normal so it might not
        // matter anyway.
        lua_State *L = fgen->thread->L;
        luaL_unref(L, LUA_REGISTRYINDEX, rctx->coroutine_ref);
        rctx->Lc = lua_newthread(L);
        assert(rctx->Lc);
        rctx->coroutine_ref = luaL_ref(L, LUA_REGISTRYINDEX);
    }

    rctx->wait_mode = QWAIT_IDLE;
    rctx->resp = NULL;
    rctx->first_queue = false; // HACK
    if (rctx->request != NULL) {
        mcp_request_cleanup(fgen->thread, rctx->request);
    }

    // remove the timer if it is still armed.
    if (event_pending(&rctx->timeout_event, EV_TIMEOUT, NULL)) {
        event_del(&rctx->timeout_event);
    }

    // reset each queue slot.
    for (int slot = 0; slot < fgen->max_queues; slot++) {
        struct mcp_rqueue_s *rqu = &rctx->qslots[slot];

        if (rqu->res_ref != 0 && rqu->res_obj != NULL) {
            // persistent response object: clean it but keep the reference.
            mcp_response_cleanup(fgen->thread, rqu->res_obj);
        } else if (rqu->res_ref != 0) {
            // temporary error object: release it.
            luaL_unref(rctx->Lc, LUA_REGISTRYINDEX, rqu->res_ref);
            rqu->res_ref = 0;
        }

        if (rqu->req_ref != 0) {
            luaL_unref(rctx->Lc, LUA_REGISTRYINDEX, rqu->req_ref);
            rqu->req_ref = 0;
        }

        assert(rqu->state != RQUEUE_ACTIVE);
        rqu->state = RQUEUE_IDLE;
        rqu->flags = 0;
        rqu->rq = NULL;

        // the slot pins a sub-rctx: reset that as well.
        if (rqu->obj_type == RQUEUE_TYPE_FGEN) {
            _mcp_funcgen_return_rctx(rqu->obj);
        }
    }
}
372
373 // TODO: check rctx->awaiting before returning?
374 // TODO: separate the "cleanup" portion from the "Return to cache" portion, so
375 // we can call that directly for subrctx's
mcp_funcgen_return_rctx(mcp_rcontext_t * rctx)376 void mcp_funcgen_return_rctx(mcp_rcontext_t *rctx) {
377 mcp_funcgen_t *fgen = rctx->fgen;
378 if (rctx->pending_reqs != 0) {
379 // not ready to return to cache yet.
380 return;
381 }
382 if (rctx->parent) {
383 // Important: we need to hold the parent request reference until this
384 // subrctx is fully depleted of outstanding requests itself.
385 rctx->parent->pending_reqs--;
386 assert(rctx->parent->pending_reqs > -1);
387 if (rctx->parent->pending_reqs == 0) {
388 mcp_funcgen_return_rctx(rctx->parent);
389 }
390 return;
391 }
392 WSTAT_DECR(rctx->fgen->thread, proxy_req_active, 1);
393 _mcp_funcgen_return_rctx(rctx);
394 _mcplib_funcgen_cache(fgen, rctx);
395 }
396
mcp_funcgen_get_rctx(lua_State * L,int fgen_ref,mcp_funcgen_t * fgen)397 mcp_rcontext_t *mcp_funcgen_get_rctx(lua_State *L, int fgen_ref, mcp_funcgen_t *fgen) {
398 mcp_rcontext_t *rctx = NULL;
399 // nothing left in slot cache, generate a new function.
400 if (fgen->free == 0) {
401 // reset free pressure so we try to keep the rctx cached
402 fgen->free_pressure = 0;
403 fgen->free_waiter.tv_sec = 0;
404 // TODO (perf): pre-create this c closure somewhere hidden.
405 lua_pushcclosure(L, _mcplib_funcgen_gencall, 0);
406 // pull in the funcgen object
407 lua_rawgeti(L, LUA_REGISTRYINDEX, fgen_ref);
408 // then generator function
409 lua_rawgeti(L, LUA_REGISTRYINDEX, fgen->generator_ref);
410 // then generate a new function slot.
411 int res = lua_pcall(L, 2, 1, 0);
412 if (res != LUA_OK) {
413 LOGGER_LOG(NULL, LOG_PROXYEVENTS, LOGGER_PROXY_ERROR, NULL, lua_tostring(L, -1));
414 lua_settop(L, 0);
415 return NULL;
416 }
417 lua_pop(L, 1); // drop the extra funcgen
418 } else {
419 P_DEBUG("%s: serving from cache\n", __func__);
420 }
421
422 rctx = fgen->list[fgen->free-1];
423 fgen->list[fgen->free-1] = NULL;
424 fgen->free--;
425
426 // on non-error, return the response object upward.
427 return rctx;
428 }
429
mcp_funcgen_start(lua_State * L,mcp_funcgen_t * fgen,mcp_parser_t * pr)430 mcp_rcontext_t *mcp_funcgen_start(lua_State *L, mcp_funcgen_t *fgen, mcp_parser_t *pr) {
431 if (fgen->is_router) {
432 fgen = mcp_funcgen_route(L, fgen, pr);
433 if (fgen == NULL) {
434 return NULL;
435 }
436 }
437 // fgen->self_ref must be valid because we cannot start a function that
438 // hasn't been referenced anywhere.
439 mcp_rcontext_t *rctx = mcp_funcgen_get_rctx(L, fgen->self_ref, fgen);
440
441 if (rctx == NULL) {
442 return NULL;
443 }
444
445 // only top level rctx's can have a request object assigned to them.
446 // so we create them late here, in the start function.
447 // Note that we can _technically_ fail with an OOM here, but we've not set
448 // up lua in a way that OOM's are possible.
449 if (rctx->request_ref == 0) {
450 mcp_request_t *rq = lua_newuserdatauv(L, sizeof(mcp_request_t) + MCP_REQUEST_MAXLEN + KEY_MAX_LENGTH, 0);
451 memset(rq, 0, sizeof(mcp_request_t));
452 luaL_getmetatable(L, "mcp.request");
453 lua_setmetatable(L, -2);
454
455 rctx->request_ref = luaL_ref(L, LUA_REGISTRYINDEX); // pop the request
456 rctx->request = rq;
457 }
458
459 // TODO: could probably move a few more lines from proto_proxy into here,
460 // but that's splitting hairs.
461 WSTAT_INCR(fgen->thread, proxy_req_active, 1);
462 return rctx;
463 }
464
465 // calling either with self_ref set, or with fgen in stack -1 (ie; from GC
466 // function without ever being attached to anything)
mcp_funcgen_cleanup(lua_State * L,mcp_funcgen_t * fgen)467 static void mcp_funcgen_cleanup(lua_State *L, mcp_funcgen_t *fgen) {
468 int fgen_idx = 0;
469 lua_checkstack(L, 5); // paranoia. this can recurse from a router.
470 // pull the fgen into the stack.
471 if (fgen->self_ref) {
472 // pull self onto the stack and hold until the end of the func.
473 lua_rawgeti(L, LUA_REGISTRYINDEX, fgen->self_ref);
474 fgen_idx = lua_absindex(L, -1); // remember fgen offset
475 // remove the C reference to the fgen
476 luaL_unref(L, LUA_REGISTRYINDEX, fgen->self_ref);
477 fgen->self_ref = 0;
478 } else if (fgen->closed) {
479 // we've already cleaned up, probably redundant call from _gc()
480 return;
481 } else {
482 // not closed, no self-ref, so must be unattached and coming from GC
483 fgen_idx = lua_absindex(L, -1);
484 }
485
486 if (fgen->is_router) {
487 // we're actually a "router", send this out for cleanup.
488 mcp_funcgen_router_cleanup(L, fgen);
489 }
490
491 // decrement the slot counter
492 LIBEVENT_THREAD *t = PROXY_GET_THR(L);
493 mcp_sharedvm_delta(t->proxy_ctx, SHAREDVM_FGEN_IDX, fgen->name, -1);
494
495 // Walk every request context and issue cleanup.
496 for (int x = 0; x < fgen->free_max; x++) {
497 mcp_rcontext_t *rctx = fgen->list[x];
498 if (rctx == NULL) {
499 continue;
500 }
501 mcp_rcontext_cleanup(L, fgen, rctx, fgen_idx);
502 }
503
504 if (fgen->argument_ref) {
505 luaL_unref(L, LUA_REGISTRYINDEX, fgen->argument_ref);
506 fgen->argument_ref = 0;
507 }
508
509 if (fgen->generator_ref) {
510 luaL_unref(L, LUA_REGISTRYINDEX, fgen->generator_ref);
511 fgen->generator_ref = 0;
512 }
513
514 if (fgen->queue_list) {
515 for (int x = 0; x < fgen->max_queues; x++) {
516 struct mcp_rqueue_s *rqu = &fgen->queue_list[x];
517 if (rqu->obj_type == RQUEUE_TYPE_POOL) {
518 // just the obj_ref
519 luaL_unref(L, LUA_REGISTRYINDEX, rqu->obj_ref);
520 } else if (rqu->obj_type == RQUEUE_TYPE_FGEN) {
521 // don't need to recurse, just deref.
522 mcp_funcgen_t *subfgen = rqu->obj;
523 mcp_funcgen_dereference(L, subfgen);
524 } else if (rqu->obj_type != RQUEUE_TYPE_NONE) {
525 assert(1 == 0);
526 }
527 }
528 free(fgen->queue_list);
529 }
530
531 free(fgen->list);
532 fgen->list = NULL;
533 lua_pop(L, 1); // drop funcgen reference
534 }
535
536 // Must be called with the function generator at on top of stack
537 // Pops the value from the stack.
mcp_funcgen_reference(lua_State * L)538 void mcp_funcgen_reference(lua_State *L) {
539 mcp_funcgen_t *fgen = luaL_checkudata(L, -1, "mcp.funcgen");
540 if (fgen->self_ref) {
541 fgen->refcount++;
542 lua_pop(L, 1); // ensure we drop the extra value.
543 } else {
544 fgen->self_ref = luaL_ref(L, LUA_REGISTRYINDEX);
545 fgen->refcount = 1;
546 }
547 P_DEBUG("%s: funcgen referenced: %d\n", __func__, fgen->refcount);
548 }
549
// Drops one C side reference from a function generator.
// When the last reference goes away the fgen is marked closed; it is cleaned
// up right away only if every one of its rctx's is on the free list.
void mcp_funcgen_dereference(lua_State *L, mcp_funcgen_t *fgen) {
    assert(fgen->refcount > 0);
    fgen->refcount--;
    P_DEBUG("%s: funcgen dereferenced: %d\n", __func__, fgen->refcount);

    if (fgen->refcount != 0) {
        return;
    }

    fgen->closed = true;
    P_DEBUG("%s: funcgen cleaning up\n", __func__);

    // with slots still outstanding, cleanup has to wait.
    if (fgen->free == fgen->total) {
        mcp_funcgen_cleanup(L, fgen);
    }
}
563
564 // All we need to do here is copy the function reference we've stashed into
565 // the C closure's upvalue and return it.
_mcplib_funcgenbare_generator(lua_State * L)566 static int _mcplib_funcgenbare_generator(lua_State *L) {
567 lua_pushvalue(L, lua_upvalueindex(1));
568 return 1;
569 }
570
571 // helper function to create a function generator with a "default" function.
572 // the function passed in here is a standard 'function(r) etc end' prototype,
573 // which we want to always return instead of calling a real generator
574 // function.
mcplib_funcgenbare_new(lua_State * L)575 int mcplib_funcgenbare_new(lua_State *L) {
576 if (!lua_isfunction(L, -1)) {
577 proxy_lua_error(L, "Must pass a function to mcp.funcgenbare_new");
578 return 0;
579 }
580
581 // Pops the function into the upvalue of this C closure function.
582 lua_pushcclosure(L, _mcplib_funcgenbare_generator, 1);
583 // FIXME: not urgent, but this function chain isn't stack balanced, and its caller has
584 // to drop an extra reference.
585 // Need to re-audit and decide if we still need this pushvalue here or if
586 // we can drop the pop from the caller and leave this function balanced.
587 lua_pushvalue(L, -1);
588 int gen_ref = luaL_ref(L, LUA_REGISTRYINDEX);
589
590 // Pass our fakeish generator function down the line.
591 mcplib_funcgen_new(L);
592
593 mcp_funcgen_t *fgen = lua_touserdata(L, -1);
594 strncpy(fgen->name, "anonymous", FGEN_NAME_MAXLEN);
595 mcp_sharedvm_delta(fgen->thread->proxy_ctx, SHAREDVM_FGEN_IDX, fgen->name, 1);
596
597 fgen->generator_ref = gen_ref;
598 fgen->ready = true;
599 return 1;
600 }
601
602 #define FGEN_DEFAULT_FREELIST_SIZE 8
mcplib_funcgen_new(lua_State * L)603 int mcplib_funcgen_new(lua_State *L) {
604 LIBEVENT_THREAD *t = PROXY_GET_THR(L);
605
606 mcp_funcgen_t *fgen = lua_newuserdatauv(L, sizeof(mcp_funcgen_t), 2);
607 memset(fgen, 0, sizeof(mcp_funcgen_t));
608 fgen->thread = t;
609 fgen->free_max = FGEN_DEFAULT_FREELIST_SIZE;
610 fgen->list = calloc(fgen->free_max, sizeof(mcp_rcontext_t *));
611
612 luaL_getmetatable(L, "mcp.funcgen");
613 lua_setmetatable(L, -2);
614
615 // the table we will use to hold references to rctx's
616 lua_createtable(L, 8, 0);
617 // set our table into the uservalue 1 of fgen (idx -2)
618 // pops the table.
619 lua_setiuservalue(L, -2, 1);
620
621 return 1;
622 }
623
// Adds a queue handle to a function generator that is not yet ready.
// Lua arguments: (1) the funcgen, (2) an mcp.pool_proxy or an mcp.funcgen.
// A funcgen argument must not be a router and must not be closed.
// The bound object is pinned: a pool through a registry reference, a funcgen
// through mcp_funcgen_reference().
// Raises a lua error on invalid arguments or allocation failure.
// Returns 1: the integer handle (index of the new queue slot).
int mcplib_funcgen_new_handle(lua_State *L) {
    mcp_funcgen_t *fgen = lua_touserdata(L, 1);
    mcp_pool_proxy_t *pp = NULL;
    mcp_funcgen_t *fg = NULL;

    if (fgen->ready) {
        proxy_lua_error(L, "cannot modify function generator after calling ready");
        return 0;
    }

    if ((pp = luaL_testudata(L, 2, "mcp.pool_proxy")) != NULL) {
        // good.
    } else if ((fg = luaL_testudata(L, 2, "mcp.funcgen")) != NULL) {
        if (fg->is_router) {
            proxy_lua_error(L, "cannot assign a router to a handle in new_handle");
            return 0;
        }
        if (fg->closed) {
            proxy_lua_error(L, "cannot use a replaced function in new_handle");
            return 0;
        }
    } else {
        proxy_lua_error(L, "invalid argument to new_handle");
        return 0;
    }

    // Grow the queue list by one entry. realloc() on a NULL pointer behaves
    // like malloc(), so the first allocation needs no special case. The new
    // pointer and count are only committed on success, so a failure keeps
    // the existing list and max_queues consistent.
    int new_count = fgen->max_queues + 1;
    struct mcp_rqueue_s *grown = realloc(fgen->queue_list, new_count * sizeof(struct mcp_rqueue_s));
    if (grown == NULL) {
        proxy_lua_error(L, "failed to realloc queue list during new_handle()");
        return 0;
    }
    fgen->queue_list = grown;
    fgen->max_queues = new_count;

    struct mcp_rqueue_s *rqu = &fgen->queue_list[new_count - 1];
    memset(rqu, 0, sizeof(*rqu));

    // Both branches below consume the value on top of the stack. Trim any
    // extra arguments so that value is guaranteed to be argument 2, the
    // object that was validated above.
    lua_settop(L, 2);

    if (pp) {
        // pops pp from the stack
        rqu->obj_ref = luaL_ref(L, LUA_REGISTRYINDEX);
        rqu->obj_type = RQUEUE_TYPE_POOL;
        rqu->obj = pp;
    } else {
        // pops the fgen from the stack.
        mcp_funcgen_reference(L);
        rqu->obj_type = RQUEUE_TYPE_FGEN;
        rqu->obj = fg;
    }

    lua_pushinteger(L, new_count - 1);
    return 1;
}
679
// Finalizes a function generator.
// Lua arguments: (1) the funcgen, (2) an options table with:
//   f: generator function (required)
//   a: argument to pass to the generator function (optional)
//   n: name of the generator; "anonymous" if not a string
// Runs the generator once to test it and to create the first slot, then
// marks the funcgen ready so it cannot be modified anymore.
// Raises a lua error if the funcgen is already ready or 'f' is missing.
int mcplib_funcgen_ready(lua_State *L) {
    mcp_funcgen_t *fgen = lua_touserdata(L, 1);
    luaL_checktype(L, 2, LUA_TTABLE);

    if (fgen->ready) {
        proxy_lua_error(L, "cannot modify function generator after calling ready");
        return 0;
    }

    // required: the generator function.
    int ftype = lua_getfield(L, 2, "f");
    if (ftype != LUA_TFUNCTION) {
        proxy_lua_error(L, "Must specify generator function ('f') to fgen:ready");
        return 0;
    }
    fgen->generator_ref = luaL_ref(L, LUA_REGISTRYINDEX);

    // optional: the generator argument.
    int atype = lua_getfield(L, 2, "a");
    if (atype == LUA_TNIL) {
        lua_pop(L, 1);
    } else {
        fgen->argument_ref = luaL_ref(L, LUA_REGISTRYINDEX);
    }

    // optional: the generator name.
    int ntype = lua_getfield(L, 2, "n");
    if (ntype != LUA_TSTRING) {
        strncpy(fgen->name, "anonymous", FGEN_NAME_MAXLEN);
        lua_pop(L, 1);
    } else {
        // note: the name string is left on the stack in this case.
        const char *name = lua_tostring(L, -1);
        strncpy(fgen->name, name, FGEN_NAME_MAXLEN);
    }

    // test the generator function and create the first slot.
    // gencall wants: funcgen, generator function.
    lua_pushvalue(L, 1);
    lua_rawgeti(L, LUA_REGISTRYINDEX, fgen->generator_ref);
    _mcplib_funcgen_gencall(L);
    lua_pop(L, 1); // drop extra funcgen ref.

    // add us to the global state
    mcp_sharedvm_delta(fgen->thread->proxy_ctx, SHAREDVM_FGEN_IDX, fgen->name, 1);

    fgen->ready = true;
    return 1;
}
722
723 // Handlers for request contexts
724
// Sets the callback function of a queue handle.
// Lua arguments: (1) the rcontext, (2) handle number, (3) function.
// A previously set callback is released first. Raises a lua error if the
// handle is out of range. Returns no lua results.
int mcplib_rcontext_handle_set_cb(lua_State *L) {
    mcp_rcontext_t *rctx = lua_touserdata(L, 1);
    luaL_checktype(L, 2, LUA_TNUMBER);
    luaL_checktype(L, 3, LUA_TFUNCTION);

    int handle = lua_tointeger(L, 2);
    bool in_range = handle >= 0 && handle < rctx->fgen->max_queues;
    if (!in_range) {
        proxy_lua_error(L, "invalid handle passed to queue_set_cb");
        return 0;
    }

    struct mcp_rqueue_s *slot = &rctx->qslots[handle];
    if (slot->cb_ref != 0) {
        luaL_unref(L, LUA_REGISTRYINDEX, slot->cb_ref);
    }
    // pops the value on top of the stack into the registry.
    slot->cb_ref = luaL_ref(L, LUA_REGISTRYINDEX);

    return 0;
}
744
745 // call with request object on top of stack.
746 // pops the request object
747 // FIXME: callers are doing a pushvalue(L, 2) and then in here we're also
748 // pushvalue(L, 2)
749 // Think this should just document as needing the request object top of stack
750 // and xmove without the extra push bits.
_mcplib_rcontext_queue(lua_State * L,mcp_rcontext_t * rctx,mcp_request_t * rq,int handle)751 static void _mcplib_rcontext_queue(lua_State *L, mcp_rcontext_t *rctx, mcp_request_t *rq, int handle) {
752 if (handle < 0 || handle >= rctx->fgen->max_queues) {
753 proxy_lua_error(L, "attempted to enqueue an invalid handle");
754 return;
755 }
756 struct mcp_rqueue_s *rqu = &rctx->qslots[handle];
757
758 if (rqu->state != RQUEUE_IDLE) {
759 lua_pop(L, 1);
760 return;
761 }
762
763 // If we're queueing to an fgen, arm the coroutine while we have the
764 // objects handy. Else this requires roundtripping a luaL_ref/luaL_unref
765 // later.
766 if (rqu->obj_type == RQUEUE_TYPE_FGEN) {
767 mcp_rcontext_t *subrctx = rqu->obj;
768 lua_pushvalue(L, 2); // duplicate the request obj
769 lua_rawgeti(subrctx->Lc, LUA_REGISTRYINDEX, subrctx->function_ref);
770 lua_xmove(L, subrctx->Lc, 1); // move the requet object.
771 subrctx->pending_reqs++;
772 }
773
774 // hold the request reference.
775 rqu->req_ref = luaL_ref(L, LUA_REGISTRYINDEX);
776
777 rqu->state = RQUEUE_QUEUED;
778 rqu->rq = rq;
779 }
780
781 // first arg is rcontext
782 // then a request object
783 // then either a handle (integer) or array style table of handles
mcplib_rcontext_enqueue(lua_State * L)784 int mcplib_rcontext_enqueue(lua_State *L) {
785 mcp_rcontext_t *rctx = lua_touserdata(L, 1);
786 mcp_request_t *rq = luaL_checkudata(L, 2, "mcp.request");
787
788 if (rctx->wait_mode != QWAIT_IDLE) {
789 proxy_lua_error(L, "enqueue: cannot enqueue new requests while in a wait");
790 return 0;
791 }
792
793 if (!rq->pr.keytoken) {
794 proxy_lua_error(L, "cannot queue requests without a key");
795 return 0;
796 }
797
798 int type = lua_type(L, 3);
799 if (type == LUA_TNUMBER) {
800 int handle = lua_tointeger(L, 3);
801
802 lua_pushvalue(L, 2);
803 _mcplib_rcontext_queue(L, rctx, rq, handle);
804 } else if (type == LUA_TTABLE) {
805 unsigned int len = lua_rawlen(L, 3);
806 for (int x = 0; x < len; x++) {
807 type = lua_rawgeti(L, 3, x+1);
808 if (type != LUA_TNUMBER) {
809 proxy_lua_error(L, "invalid handle passed to queue via array table");
810 return 0;
811 }
812
813 int handle = lua_tointeger(L, 4);
814 lua_pop(L, 1);
815
816 lua_pushvalue(L, 2);
817 _mcplib_rcontext_queue(L, rctx, rq, handle);
818 }
819 } else {
820 proxy_lua_error(L, "must pass a handle or a table to queue");
821 return 0;
822 }
823
824 return 0;
825 }
826
827 // TODO: pre-generate a result object into sub-rctx's that we can pull up for
828 // this, instead of allocating outside of a protected call.
_mcp_resume_rctx_process_error(mcp_rcontext_t * rctx,struct mcp_rqueue_s * rqu)829 static void _mcp_resume_rctx_process_error(mcp_rcontext_t *rctx, struct mcp_rqueue_s *rqu) {
830 // we have an error. need to mark the error into the parent rqu
831 rqu->flags |= RQUEUE_R_ERROR|RQUEUE_R_ANY;
832 mcp_resp_t *r = mcp_prep_bare_resobj(rctx->Lc, rctx->fgen->thread);
833 r->status = MCMC_ERR;
834 r->resp.code = MCMC_CODE_SERVER_ERROR;
835 assert(rqu->res_ref == 0);
836 rqu->res_ref = luaL_ref(rctx->Lc, LUA_REGISTRYINDEX);
837 mcp_process_rqueue_return(rctx->parent, rctx->parent_handle, r);
838 if (rctx->parent->wait_count) {
839 mcp_process_rctx_wait(rctx->parent, rctx->parent_handle);
840 }
841 }
842
// Reports a failed sub-rctx to its parent from the start path.
// rqu is the parent's queue slot for this sub-rctx. The slot is flagged as
// errored and a fresh response object carrying a server error is referenced
// into it. The result is not processed inline: an IO object is queued on the
// parent so the error is returned later through proxy_return_rqu_cb.
static void _mcp_start_rctx_process_error(mcp_rcontext_t *rctx, struct mcp_rqueue_s *rqu) {
    // mark the error into the parent rqu.
    rqu->flags |= RQUEUE_R_ERROR|RQUEUE_R_ANY;

    mcp_resp_t *err = mcp_prep_bare_resobj(rctx->Lc, rctx->fgen->thread);
    err->status = MCMC_ERR;
    err->resp.code = MCMC_CODE_SERVER_ERROR;

    assert(rqu->res_ref == 0);
    rqu->res_ref = luaL_ref(rctx->Lc, LUA_REGISTRYINDEX);

    // queue an IO to return later.
    io_pending_proxy_t *io = mcp_queue_rctx_io(rctx->parent, NULL, NULL, err);
    io->return_cb = proxy_return_rqu_cb;
    io->queue_handle = rctx->parent_handle;
    io->background = true;
}
858
// Runs a sub-rctx for the first time and routes its outcome to the parent's
// queue slot.
//   - yielded: nothing to do yet.
//   - failed: an error result is queued for the parent.
//   - finished with an mcp.response or a string at stack index 1: the value
//     is referenced into the parent's slot and an IO object is queued on the
//     parent so the result is returned later through proxy_return_rqu_cb.
//   - finished with anything else: treated as an error.
static void mcp_start_subrctx(mcp_rcontext_t *rctx) {
    int res = proxy_run_rcontext(rctx);
    struct mcp_rqueue_s *rqu = &rctx->parent->qslots[rctx->parent_handle];

    if (res == LUA_YIELD) {
        // normal.
        return;
    }

    if (res != LUA_OK) {
        lua_pop(rctx->Lc, 1); // drop the error message.
        _mcp_start_rctx_process_error(rctx, rqu);
        return;
    }

    int type = lua_type(rctx->Lc, 1);
    mcp_resp_t *r = NULL;
    if (type == LUA_TUSERDATA) {
        r = luaL_testudata(rctx->Lc, 1, "mcp.response");
    }

    if (r == NULL && type != LUA_TSTRING) {
        // neither a response object nor a string: generate a generic object
        // with an error.
        _mcp_start_rctx_process_error(rctx, rqu);
        return;
    }

    // move the stack result object into the parent rctx rqu slot.
    assert(rqu->res_ref == 0);
    rqu->res_ref = luaL_ref(rctx->Lc, LUA_REGISTRYINDEX);

    if (r == NULL) {
        // string result.
        // TODO: wrap with a resobj and parse it.
        // for now we bypass the rqueue process handling
        // meaning no callbacks/etc.
        rqu->flags |= RQUEUE_R_ANY;
        rqu->state = RQUEUE_COMPLETE;
    }

    // r is NULL for the string case.
    io_pending_proxy_t *io = mcp_queue_rctx_io(rctx->parent, NULL, NULL, r);
    io->return_cb = proxy_return_rqu_cb;
    io->queue_handle = rctx->parent_handle;
    // TODO: change name of property to fast-return once mcp.await is
    // retired.
    io->background = true;
}
899
// Runs an rctx from a callback context and handles the outcome.
// - With a parent (this is a sub-rctx): a completed run moves the value at
//   stack index 1 into the parent's queue slot, a failed run records an error
//   via _mcp_resume_rctx_process_error(), and in both cases the rctx is
//   handed to mcp_funcgen_return_rctx(). A yield only submits queued work.
// - Without a parent: a completed or failed run returns the rctx and
//   decrements the connection's proxy IO queue count; a yield submits queued
//   work.
static void mcp_resume_rctx_from_cb(mcp_rcontext_t *rctx) {
    int res = proxy_run_rcontext(rctx);
    if (rctx->parent) {
        // slot in the parent that this sub-rctx reports into.
        struct mcp_rqueue_s *rqu = &rctx->parent->qslots[rctx->parent_handle];
        if (res == LUA_OK) {
            int type = lua_type(rctx->Lc, 1);
            mcp_resp_t *r = NULL;
            if (type == LUA_TUSERDATA && (r = luaL_testudata(rctx->Lc, 1, "mcp.response")) != NULL) {
                // move stack result object into parent rctx rqu slot.
                assert(rqu->res_ref == 0);
                rqu->res_ref = luaL_ref(rctx->Lc, LUA_REGISTRYINDEX);
                // sets the slot's result flags and runs its callback, if any.
                mcp_process_rqueue_return(rctx->parent, rctx->parent_handle, r);
            } else if (type == LUA_TSTRING) {
                // TODO: wrap with a resobj and parse it.
                // for now we bypass the rqueue process handling
                // meaning no callbacks/etc.
                assert(rqu->res_ref == 0);
                rqu->res_ref = luaL_ref(rctx->Lc, LUA_REGISTRYINDEX);
                rqu->flags |= RQUEUE_R_ANY;
                rqu->state = RQUEUE_COMPLETE;
            } else {
                // generate a generic object with an error.
                _mcp_resume_rctx_process_error(rctx, rqu);
            }
            // only poke the parent's wait logic if it is actually waiting.
            if (rctx->parent->wait_count) {
                mcp_process_rctx_wait(rctx->parent, rctx->parent_handle);
            }
            mcp_funcgen_return_rctx(rctx);
        } else if (res == LUA_YIELD) {
            // normal.
            _mcp_queue_hack(rctx->c);
        } else {
            lua_pop(rctx->Lc, 1); // drop the error message.
            _mcp_resume_rctx_process_error(rctx, rqu);
            mcp_funcgen_return_rctx(rctx);
        }
    } else {
        // Parent rctx has returned either a response or error to its top
        // level resp object and is now complete.
        // HACK
        // see notes in proxy_run_rcontext()
        // NOTE: this function is called from rqu_cb(), which pushes the
        // submission loop. This code below can call drive_machine(), which
        // calls submission loop if stuff is queued.
        // Would remove redundancy if we can signal up to rqu_cb() and either
        // q->count-- or do the inline submission at that level.
        if (res != LUA_YIELD) {
            mcp_funcgen_return_rctx(rctx);
            io_queue_t *q = conn_io_queue_get(rctx->c, IO_QUEUE_PROXY);
            q->count--;
            if (q->count == 0) {
                // call re-add directly since we're already in the worker thread.
                conn_worker_readd(rctx->c);
            }
        } else if (res == LUA_YIELD) {
            // still running: submit any work the resume queued up.
            _mcp_queue_hack(rctx->c);
        }
    }
}
959
960 // This "Dummy" IO immediately resumes the yielded function, without a result
961 // attached.
proxy_return_rqu_dummy_cb(io_pending_t * pending)962 static void proxy_return_rqu_dummy_cb(io_pending_t *pending) {
963 io_pending_proxy_t *p = (io_pending_proxy_t *)pending;
964 mcp_rcontext_t *rctx = p->rctx;
965
966 rctx->pending_reqs--;
967 assert(rctx->pending_reqs > -1);
968
969 lua_settop(rctx->Lc, 0);
970 lua_pushinteger(rctx->Lc, 0); // return a "0" done count to the function.
971 mcp_resume_rctx_from_cb(rctx);
972
973 do_cache_free(p->thread->io_cache, p);
974 }
975
// Accounts for a completed queue slot while the rctx is in a wait mode, and
// resumes the rctx once the wait condition is satisfied.
// rctx: the waiting request context.
// handle: index of the slot in rctx->qslots that has completed; the slot must
// be in RQUEUE_COMPLETE state.
// Depending on rctx->wait_mode the slot may be counted into wait_done and
// marked RQUEUE_WAITED. The rctx is resumed when a resume flag is set, when
// wait_done reaches wait_count, or when pending_reqs is 1.
void mcp_process_rctx_wait(mcp_rcontext_t *rctx, int handle) {
    struct mcp_rqueue_s *rqu = &rctx->qslots[handle];
    // local copy of the flags; RQUEUE_R_RESUME may be OR'ed in below without
    // touching the slot itself.
    int status = rqu->flags;
    assert(rqu->state == RQUEUE_COMPLETE);
    // waiting for some IO's to complete before continuing.
    // meaning if we "match good" here, we can resume.
    // we can also resume if we are in wait mode but pending_reqs is down
    // to 1.
    switch (rctx->wait_mode) {
        case QWAIT_IDLE:
            // should be impossible to get here.
            // TODO: find a better path for throwing real errors from these
            // side cases. would feel better long term.
            abort();
            break;
        case QWAIT_GOOD:
            // only "good" results count.
            if (status & RQUEUE_R_GOOD) {
                rctx->wait_done++;
                rqu->state = RQUEUE_WAITED;
            }
            break;
        case QWAIT_OK:
            // "good" or "ok" results count.
            if (status & (RQUEUE_R_GOOD|RQUEUE_R_OK)) {
                rctx->wait_done++;
                rqu->state = RQUEUE_WAITED;
            }
            break;
        case QWAIT_ANY:
            // every completed slot counts.
            rctx->wait_done++;
            rqu->state = RQUEUE_WAITED;
            break;
        case QWAIT_FASTGOOD:
            if (status & RQUEUE_R_GOOD) {
                rctx->wait_done++;
                rqu->state = RQUEUE_WAITED;
                // resume early if "good"
                status |= RQUEUE_R_RESUME;
            } else if (status & RQUEUE_R_OK) {
                // count but don't resume early if "ok"
                rctx->wait_done++;
                rqu->state = RQUEUE_WAITED;
            }
            break;
        case QWAIT_HANDLE:
            // waiting for a specific handle to return
            if (handle == rctx->wait_handle) {
                rctx->wait_done++;
                rqu->state = RQUEUE_WAITED;
            }
            break;
        case QWAIT_SLEEP:
            assert(1 == 0); // should not get here.
            break;
    }

    assert(rctx->pending_reqs != 0);
    if ((status & RQUEUE_R_RESUME) || rctx->wait_done == rctx->wait_count || rctx->pending_reqs == 1) {
        // ran out of stuff to wait for. time to resume.
        // TODO: can we do the settop at the yield? nothing we need to
        // keep in the stack in this mode.
        lua_settop(rctx->Lc, 0);
        rctx->wait_count = 0;
        // value handed back to the resumed function: the slot's result when
        // waiting on a handle, otherwise the number of counted results.
        if (rctx->wait_mode == QWAIT_HANDLE) {
            mcp_rcontext_push_rqu_res(rctx->Lc, rctx, handle);
        } else {
            lua_pushinteger(rctx->Lc, rctx->wait_done);
        }
        rctx->wait_mode = QWAIT_IDLE;

        // nuke alarm if set.
        if (event_pending(&rctx->timeout_event, EV_TIMEOUT, NULL)) {
            event_del(&rctx->timeout_event);
        }

        mcp_resume_rctx_from_cb(rctx);
    }
}
1053
// sets the slot's return status code, to be used for filtering responses
// later.
// if a callback was set, execute it now.
// rctx: owner of the queue slot.
// handle: index of the slot in rctx->qslots; must be in RQUEUE_ACTIVE state.
// res: the response that came back for this slot.
// Returns the slot's flags after the new result flag has been OR'ed in.
int mcp_process_rqueue_return(mcp_rcontext_t *rctx, int handle, mcp_resp_t *res) {
    struct mcp_rqueue_s *rqu = &rctx->qslots[handle];
    // anything that is not an MCMC_OK status is classed as "any".
    uint8_t flag = RQUEUE_R_ANY;

    assert(rqu->state == RQUEUE_ACTIVE);
    rqu->state = RQUEUE_COMPLETE;
    if (res->status == MCMC_OK) {
        // MCMC_CODE_END is classed as "ok", every other code as "good".
        if (res->resp.code != MCMC_CODE_END) {
            flag = RQUEUE_R_GOOD;
        } else {
            flag = RQUEUE_R_OK;
        }
    }

    if (rqu->cb_ref) {
        // call cb(res, req) expecting two results at stack indexes 1 and 2.
        lua_settop(rctx->Lc, 0);
        lua_rawgeti(rctx->Lc, LUA_REGISTRYINDEX, rqu->cb_ref);
        lua_rawgeti(rctx->Lc, LUA_REGISTRYINDEX, rqu->res_ref);
        lua_rawgeti(rctx->Lc, LUA_REGISTRYINDEX, rqu->req_ref);
        if (lua_pcall(rctx->Lc, 2, 2, 0) != LUA_OK) {
            // callback failed: log the error and keep the computed flag.
            LOGGER_LOG(NULL, LOG_PROXYEVENTS, LOGGER_PROXY_ERROR, NULL, lua_tostring(rctx->Lc, -1));
        } else if (lua_isinteger(rctx->Lc, 1)) {
            // allow overriding the result flag from the callback.
            enum mcp_rqueue_e mode = lua_tointeger(rctx->Lc, 1);
            switch (mode) {
                case QWAIT_GOOD:
                    flag = RQUEUE_R_GOOD;
                    break;
                case QWAIT_OK:
                    flag = RQUEUE_R_OK;
                    break;
                case QWAIT_ANY:
                    break;
                default:
                    // ANY
                    break;
            }

            // if second result return shortcut status code
            if (lua_toboolean(rctx->Lc, 2)) {
                flag |= RQUEUE_R_RESUME;
            }
        }
        lua_settop(rctx->Lc, 0); // FIXME: This might not be necessary.
                                 // we settop _before_ calling cb's and
                                 // _before_ setting up for a coro resume.
    }
    rqu->flags |= flag;
    return rqu->flags;
}
1107
1108 // specific function for queue-based returns.
proxy_return_rqu_cb(io_pending_t * pending)1109 static void proxy_return_rqu_cb(io_pending_t *pending) {
1110 io_pending_proxy_t *p = (io_pending_proxy_t *)pending;
1111 mcp_rcontext_t *rctx = p->rctx;
1112
1113 if (p->client_resp) {
1114 mcp_process_rqueue_return(rctx, p->queue_handle, p->client_resp);
1115 }
1116 rctx->pending_reqs--;
1117 assert(rctx->pending_reqs > -1);
1118
1119 if (rctx->wait_count) {
1120 mcp_process_rctx_wait(rctx, p->queue_handle);
1121 } else {
1122 mcp_funcgen_return_rctx(rctx);
1123 }
1124
1125 do_cache_free(p->thread->io_cache, p);
1126 }
1127
// Dispatches the queue slot identified by handle.
// A queued slot becomes active and is either sent to a pool backend or
// started as a sub-rctx. A slot that already completed is passed to the wait
// logic if the rctx is currently waiting. Other states are left alone.
void mcp_run_rcontext_handle(mcp_rcontext_t *rctx, int handle) {
    struct mcp_rqueue_s *slot = &rctx->qslots[handle];

    if (slot->state == RQUEUE_COMPLETE) {
        // The slot was previously completed from an earlier dispatch, but we
        // haven't "waited" on it yet.
        if (rctx->wait_count) {
            mcp_process_rctx_wait(rctx, handle);
        }
        return;
    }

    if (slot->state != RQUEUE_QUEUED) {
        return;
    }

    slot->state = RQUEUE_ACTIVE;
    switch (slot->obj_type) {
        case RQUEUE_TYPE_POOL: {
            mcp_request_t *rq = slot->rq;
            mcp_backend_t *be = mcplib_pool_proxy_call_helper(slot->obj, MCP_PARSER_KEY(rq->pr), rq->pr.klen);
            // FIXME: queue requires conn because we're stacking objects
            // into the connection for later submission, which means we
            // absolutely cannot queue things once *c becomes invalid.
            // need to assert/block this from happening.
            mcp_set_resobj(slot->res_obj, rq, be, rctx->fgen->thread);
            io_pending_proxy_t *pio = mcp_queue_rctx_io(rctx, rq, be, slot->res_obj);
            pio->return_cb = proxy_return_rqu_cb;
            pio->queue_handle = handle;
            rctx->pending_reqs++;
            break;
        }
        case RQUEUE_TYPE_FGEN: {
            // TODO: NULL the ->c post-return?
            mcp_rcontext_t *child = slot->obj;
            child->c = rctx->c;
            rctx->pending_reqs++;
            mcp_start_subrctx(child);
            break;
        }
        default:
            // unknown slot type.
            assert(1==0);
            break;
    }
}
1161
// Arms the rctx timeout event from a Lua number of (fractional) seconds.
// L: lua state holding the argument.
// rctx: context whose timeout_event is armed.
// arg: stack index of the timeout value.
// Raises a lua error if the argument is not a number. Does nothing if a
// timeout is already pending on the event.
static inline void _mcplib_set_rctx_alarm(lua_State *L, mcp_rcontext_t *rctx, int arg) {
    int is_number = 0;
    const lua_Number timeout = lua_tonumberx(L, arg, &is_number);
    if (!is_number) {
        proxy_lua_error(L, "timeout argument to wait or sleep must be a number");
        return;
    }

    const int already = event_pending(&rctx->timeout_event, EV_TIMEOUT, NULL);
    if ((already & (EV_TIMEOUT)) != 0) {
        return;
    }

    // split into whole seconds and the fractional remainder.
    const lua_Integer whole = (lua_Integer) timeout;
    const lua_Number fraction = timeout - whole;

    struct timeval tv = { .tv_sec = 0, .tv_usec = 0 };
    tv.tv_sec = whole;
    tv.tv_usec = MICROSECONDS(fraction);
    event_add(&rctx->timeout_event, &tv);
}
1180
1181 // TODO: one more function to wait on a list of handles? to queue and wait on
1182 // a list of handles? expand wait_cond()
1183
// Validates the arguments of wait_cond and puts the rctx into wait mode.
// Stack layout: 1 = rctx, 2 = wait count, 3 = optional filter mode.
// Raises a lua error on bad input; always returns 0.
static inline int _mcplib_rcontext_wait_prep(lua_State *L, mcp_rcontext_t *rctx, int argc) {
    if (rctx->wait_mode != QWAIT_IDLE) {
        proxy_lua_error(L, "wait_cond: cannot call while already in wait mode");
        return 0;
    }

    if (argc < 2) {
        proxy_lua_error(L, "must pass at least count to wait_cond");
        return 0;
    }

    int is_int = 0;
    const int count = lua_tointegerx(L, 2, &is_int);
    if (!is_int || count < 0) {
        proxy_lua_error(L, "wait count for wait_cond must be a positive integer");
        return 0;
    }

    // the mode defaults to "any" when not supplied.
    int mode = QWAIT_ANY;
    if (argc > 2) {
        mode = lua_tointeger(L, 3);
    }

    const int mode_ok = mode == QWAIT_ANY || mode == QWAIT_OK
        || mode == QWAIT_GOOD || mode == QWAIT_FASTGOOD;
    if (!mode_ok) {
        proxy_lua_error(L, "invalid mode sent to wait_cond");
        return 0;
    }

    rctx->wait_mode = mode;
    rctx->wait_done = 0;
    rctx->wait_count = count;

    return 0;
}
1226
1227 // takes num, filter mode
mcplib_rcontext_wait_cond(lua_State * L)1228 int mcplib_rcontext_wait_cond(lua_State *L) {
1229 int argc = lua_gettop(L);
1230 mcp_rcontext_t *rctx = lua_touserdata(L, 1);
1231
1232 _mcplib_rcontext_wait_prep(L, rctx, argc);
1233
1234 // waiting for none, meaning just execute the queues.
1235 if (rctx->wait_count == 0) {
1236 io_pending_proxy_t *p = mcp_queue_rctx_io(rctx, NULL, NULL, NULL);
1237 p->return_cb = proxy_return_rqu_dummy_cb;
1238 p->background = true;
1239 rctx->pending_reqs++;
1240 rctx->wait_mode = QWAIT_IDLE; // not actually waiting.
1241 } else if (argc > 3) {
1242 // optional wait timeout. does not cancel existing request!
1243 _mcplib_set_rctx_alarm(L, rctx, 4);
1244 }
1245
1246 lua_pushinteger(L, MCP_YIELD_WAITCOND);
1247 return lua_yield(L, 1);
1248 }
1249
// rctx:enqueue_and_wait(request, handle, [timeout])
// Queues the request against the handle, then yields to wait for exactly
// that handle to return.
int mcplib_rcontext_enqueue_and_wait(lua_State *L) {
    mcp_rcontext_t *rctx = lua_touserdata(L, 1);
    mcp_request_t *req = luaL_checkudata(L, 2, "mcp.request");
    int is_int = 0;
    const int handle = lua_tointegerx(L, 3, &is_int);

    if (rctx->wait_mode != QWAIT_IDLE) {
        proxy_lua_error(L, "wait_cond: cannot call while already in wait mode");
        return 0;
    }

    if (!req->pr.keytoken) {
        proxy_lua_error(L, "cannot queue requests without a key");
        return 0;
    }

    if (!is_int) {
        proxy_lua_error(L, "invalid handle passed to enqueue_and_wait");
        return 0;
    }

    // queue up this handle and yield for the direct wait.
    // the request object is duplicated onto the top of the stack first.
    lua_pushvalue(L, 2);
    _mcplib_rcontext_queue(L, rctx, req, handle);

    // optional fourth argument is a timeout.
    if (lua_gettop(L) > 3) {
        _mcplib_set_rctx_alarm(L, rctx, 4);
    }

    rctx->wait_mode = QWAIT_HANDLE;
    rctx->wait_handle = handle;
    rctx->wait_count = 1;
    rctx->wait_done = 0;

    lua_pushinteger(L, MCP_YIELD_WAITHANDLE);
    return lua_yield(L, 1);
}
1287
// rctx:wait_handle(handle, [timeout])
// Yields until the given, previously queued, handle has returned.
int mcplib_rcontext_wait_handle(lua_State *L) {
    mcp_rcontext_t *rctx = lua_touserdata(L, 1);
    int is_int = 0;
    const int handle = lua_tointegerx(L, 2, &is_int);

    if (rctx->wait_mode != QWAIT_IDLE) {
        proxy_lua_error(L, "wait: cannot call while already in wait mode");
        return 0;
    }

    // the handle must index a slot of this rctx.
    const int in_range = is_int && handle >= 0 && handle < rctx->fgen->max_queues;
    if (!in_range) {
        proxy_lua_error(L, "invalid handle passed to wait_handle");
        return 0;
    }

    if (rctx->qslots[handle].state == RQUEUE_IDLE) {
        proxy_lua_error(L, "wait_handle called on unqueued handle");
        return 0;
    }

    // optional third argument is a timeout.
    if (lua_gettop(L) > 2) {
        _mcplib_set_rctx_alarm(L, rctx, 3);
    }

    rctx->wait_mode = QWAIT_HANDLE;
    rctx->wait_handle = handle;
    rctx->wait_count = 1;
    rctx->wait_done = 0;

    lua_pushinteger(L, MCP_YIELD_WAITHANDLE);
    return lua_yield(L, 1);
}
1321
1322 // TODO: This is disabled due to issues with the IO subsystem. Fixing this
1323 // requires retiring of API1 to allow refactoring or some extra roundtrip
1324 // work.
1325 // - if rctx:sleep() is called, the coroutine is suspended.
1326 // - once resumed after the timeout, we may later suspend again and make
1327 // backend requests
1328 // - once the coroutine is completed, we need to check if the owning client
1329 // conn is ready to be resumed
1330 // - BUG: we can only get into the "conn is in IO queue wait" state if a
1331 // sub-IO was created and submitted somewhere.
1332 // - This means either rctx:sleep() needs to be implemented by submitting a
1333 // dummy IO req (as other code does)
1334 // - OR we need to refactor the IO system so the dummies aren't required
1335 // anymore.
1336 //
1337 // If a dummy is used, we would have to implement this as:
1338 // - immediately submit a dummy IO if sleep is called.
1339 // - this allows the IO system to move the connection into the right state
1340 // - will immediately circle back then set an alarm for the sleep timeout
1341 // - once the sleep resumes, run code as normal. resumption should work
1342 // properly since we've entered the correct state originally.
1343 //
1344 // This adds a lot of CPU overhead to sleep, which should be fine given the
1345 // nature of the function, but also adds a lot of code and increases the
1346 // chances of bugs. So I'm leaving it out until this can be implemented more
1347 // simply.
int mcplib_rcontext_sleep(lua_State *L) {
    mcp_rcontext_t *rctx = lua_touserdata(L, 1);

    // sleeping is only allowed when no other wait is in progress.
    if (rctx->wait_mode != QWAIT_IDLE) {
        proxy_lua_error(L, "sleep: cannot call while already in wait mode");
        return 0;
    }

    // argument 2 is the sleep duration; arm the alarm and yield.
    _mcplib_set_rctx_alarm(L, rctx, 2);
    rctx->wait_mode = QWAIT_SLEEP;

    lua_pushinteger(L, MCP_YIELD_SLEEP);
    return lua_yield(L, 1);
}
1361
// Shared argument check for the result accessors.
// Stack layout: 1 = rctx, 2 = queue handle.
// Returns the queue slot for the handle; raises a lua error if the handle is
// not an integer within [0, max_queues).
static inline struct mcp_rqueue_s *_mcplib_rcontext_checkhandle(lua_State *L) {
    mcp_rcontext_t *rctx = lua_touserdata(L, 1);
    int is_int = 0;
    const int handle = lua_tointegerx(L, 2, &is_int);

    const int in_range = is_int && handle >= 0 && handle < rctx->fgen->max_queues;
    if (!in_range) {
        proxy_lua_error(L, "invalid queue handle passed to :good/:ok:/:any");
        return NULL;
    }

    return &rctx->qslots[handle];
}
1374
// Pushes the slot's result object if it was flagged "good", else nil.
int mcplib_rcontext_res_good(lua_State *L) {
    struct mcp_rqueue_s *slot = _mcplib_rcontext_checkhandle(L);

    if ((slot->flags & RQUEUE_R_GOOD) == 0) {
        lua_pushnil(L);
        return 1;
    }

    lua_rawgeti(L, LUA_REGISTRYINDEX, slot->res_ref);
    return 1;
}
1384
// Pushes the slot's result object if it was flagged "ok" or "good", else nil.
int mcplib_rcontext_res_ok(lua_State *L) {
    struct mcp_rqueue_s *slot = _mcplib_rcontext_checkhandle(L);

    if ((slot->flags & (RQUEUE_R_OK|RQUEUE_R_GOOD)) == 0) {
        lua_pushnil(L);
        return 1;
    }

    lua_rawgeti(L, LUA_REGISTRYINDEX, slot->res_ref);
    return 1;
}
1394
// Pushes the slot's result object if any result flag is set, else nil.
int mcplib_rcontext_res_any(lua_State *L) {
    struct mcp_rqueue_s *slot = _mcplib_rcontext_checkhandle(L);

    if ((slot->flags & (RQUEUE_R_ANY|RQUEUE_R_OK|RQUEUE_R_GOOD)) == 0) {
        // Shouldn't be possible to get here, unless you're asking about a
        // queue that was never armed or hasn't completed yet.
        lua_pushnil(L);
        return 1;
    }

    lua_rawgeti(L, LUA_REGISTRYINDEX, slot->res_ref);
    return 1;
}
1406
1407 // returns res, RES_GOOD|OK|ANY
mcplib_rcontext_result(lua_State * L)1408 int mcplib_rcontext_result(lua_State *L) {
1409 struct mcp_rqueue_s *rqu = _mcplib_rcontext_checkhandle(L);
1410 if (rqu->flags & (RQUEUE_R_ANY|RQUEUE_R_OK|RQUEUE_R_GOOD)) {
1411 lua_rawgeti(L, LUA_REGISTRYINDEX, rqu->res_ref);
1412 // mask away any other queue flags.
1413 lua_pushinteger(L, rqu->flags & (RQUEUE_R_ANY|RQUEUE_R_OK|RQUEUE_R_GOOD));
1414 } else {
1415 lua_pushnil(L);
1416 lua_pushnil(L);
1417 }
1418
1419 return 2;
1420 }
1421
// rctx:cfd(): pushes the conn_fd value stored on the rctx.
int mcplib_rcontext_cfd(lua_State *L) {
    const mcp_rcontext_t *self = lua_touserdata(L, 1);

    lua_pushinteger(L, self->conn_fd);
    return 1;
}
1427
1428 // Must not call this if rctx has returned result to client already.
mcplib_rcontext_tls_peer_cn(lua_State * L)1429 int mcplib_rcontext_tls_peer_cn(lua_State *L) {
1430 mcp_rcontext_t *rctx = lua_touserdata(L, 1);
1431 if (!rctx->c) {
1432 lua_pushnil(L);
1433 return 1;
1434 }
1435
1436 #ifdef TLS
1437 int len = 0;
1438 const unsigned char *cn = ssl_get_peer_cn(rctx->c, &len);
1439 if (cn) {
1440 lua_pushlstring(L, (const char *)cn, len);
1441 } else {
1442 lua_pushnil(L);
1443 }
1444 #else
1445 lua_pushnil(L);
1446 #endif
1447 return 1;
1448 }
1449
1450 // TODO: stub function.
1451 // need to attach request function to rcontext, by using another function that
1452 // doesn't fill out the request info
mcplib_rcontext_request_new(lua_State * L)1453 int mcplib_rcontext_request_new(lua_State *L) {
1454 mcp_parser_t pr = {0};
1455
1456 mcp_new_request(L, &pr, " ", 1);
1457 return 1;
1458 }
1459
1460 // TODO: stub function, see request_new above.
mcplib_rcontext_response_new(lua_State * L)1461 int mcplib_rcontext_response_new(lua_State *L) {
1462 mcp_resp_t *r = lua_newuserdatauv(L, sizeof(mcp_resp_t), 0);
1463 memset(r, 0, sizeof(mcp_resp_t));
1464 luaL_getmetatable(L, "mcp.response");
1465 lua_setmetatable(L, -2);
1466 return 1;
1467 }
1468
1469 // the supplied handle must be valid.
mcp_rcontext_push_rqu_res(lua_State * L,mcp_rcontext_t * rctx,int handle)1470 void mcp_rcontext_push_rqu_res(lua_State *L, mcp_rcontext_t *rctx, int handle) {
1471 struct mcp_rqueue_s *rqu = &rctx->qslots[handle];
1472 lua_rawgeti(L, LUA_REGISTRYINDEX, rqu->res_ref);
1473 }
1474
1475 /*
1476 * Specialized router funcgen.
1477 * For routing a key across a map of possible function generators, we use a
1478 * specialized function generator. This is to keep the attach and start code
1479 * consistent, as they only need to think about function generators.
1480 * It also keeps the cleanup code consistent, as when a "router" funcgen is
1481 * replaced by mcp.attach() during a reload, we can immediately dereference
1482 * all of the route fgens, rather than have to wait for GC.
1483 *
1484 * Another upside is when we're starting a new request, we can immediately
1485 * swap out the top level fgen object, rather than force all routes to be
1486 * processed as sub-funcs, which is a tiny bit slower and disallows custom
1487 * request object sizes.
1488 *
1489 * The downside is this will appear to be bolted onto the side of the existing
1490 * structs rather than be its own object, like I initially wanted.
1491 */
1492
// Single byte separator search.
// key/klen: the haystack. needle: the separator byte.
// len: set to the number of bytes before the first separator; left untouched
// when the separator does not occur.
// Always returns key, which is the start of the lookup string.
static inline const char *_mcp_router_shortsep(const char *key, const int klen, const char needle, size_t *len) {
    const char *sep = memchr(key, needle, klen);

    if (sep != NULL) {
        *len = sep - key;
    }

    return key;
}
1507
// Multi byte separator search.
// key/klen: the haystack. needle: NUL terminated separator string.
// len: set to the number of bytes before the first occurrence of needle;
// left untouched when needle does not occur in key.
// Always returns key, which is the start of the lookup string.
//
// we take some liberties here because we know needle and key can't be zero
// this isn't the most hyper optimized search but prefixes and separators
// should both be short.
static inline const char *_mcp_router_longsep(const char *key, const int klen, const char *needle, size_t *len) {
    const size_t nlen = strlen(needle);

    // A needle longer than the key can never match. Bail out before doing
    // any pointer math: (klen - nlen) would wrap around as an unsigned value
    // and produce a pointer outside of the key.
    if (klen <= 0 || (size_t)klen < nlen) {
        return key;
    }

    const char *pos = memchr(key, needle[0], (size_t)klen);
    if (pos == NULL) {
        // definitely no needle in this haystack.
        return key;
    }

    // find the last possible position a full needle could start at.
    const char *last = key + ((size_t)klen - nlen);

    for (; pos <= last; pos++) {
        if (*pos == needle[0] && memcmp(pos, needle, nlen) == 0) {
            *len = (size_t)(pos - key);
            break;
        }
    }

    return key;
}
1536
// Single byte anchored prefix search.
// needle[0] is the byte the key must start with, needle[1] is the byte that
// ends the prefix. Returns NULL if the key does not start with the anchor;
// otherwise the lookup starts one byte into the key.
static inline const char *_mcp_router_anchorsm(const char *key, const int klen, const char *needle, size_t *len) {
    const char anchor = needle[0];
    const char stop = needle[1];

    // check the first byte anchor.
    if (key[0] == anchor) {
        // rest is same as shortsep, minus the anchor byte.
        return _mcp_router_shortsep(key + 1, klen - 1, stop, len);
    }

    return NULL;
}
1546
_mcp_router_anchorbig(const char * key,const int klen,const struct mcp_router_long_s * conf,size_t * len)1547 static inline const char *_mcp_router_anchorbig(const char *key, const int klen, const struct mcp_router_long_s *conf, size_t *len) {
1548 // check long anchored prefix.
1549 size_t slen = strlen(conf->start);
1550 // check for start len+2 to avoid sending a zero byte haystack to longsep
1551 if (slen+2 > klen || memcmp(key, conf->start, slen) != 0) {
1552 return NULL;
1553 }
1554
1555 // rest is same as longsep
1556 return _mcp_router_longsep(key+slen, klen-slen, conf->stop, len);
1557 }
1558
_mcp_funcgen_route_fallback(struct mcp_funcgen_router * fr,int cmd)1559 static inline mcp_funcgen_t *_mcp_funcgen_route_fallback(struct mcp_funcgen_router *fr, int cmd) {
1560 if (fr->cmap[cmd]) {
1561 return fr->cmap[cmd];
1562 }
1563 return fr->def_fgen;
1564 }
1565
// Picks the function generator that should handle a request.
// L: lua state holding the router's map table in its registry.
// fgen: the router funcgen (really a struct mcp_funcgen_router).
// pr: parsed request; its key and command drive the lookup.
// Returns NULL if the request has no key. Otherwise returns the funcgen found
// in the map for the key's prefix, falling back to the router's command map
// and then its default funcgen when nothing matches.
// The lua stack is left exactly as it was found on every return path.
static mcp_funcgen_t *mcp_funcgen_route(lua_State *L, mcp_funcgen_t *fgen, mcp_parser_t *pr) {
    struct mcp_funcgen_router *fr = (struct mcp_funcgen_router *)fgen;
    if (pr->klen == 0) {
        return NULL;
    }
    const char *key = &pr->request[pr->tokens[pr->keytoken]];
    const char *lookup = NULL;
    size_t lookuplen = 0;
    switch (fr->type) {
        case FGEN_ROUTER_NONE:
            break;
        case FGEN_ROUTER_CMDMAP:
            // short circuit if all we can do is cmap and default.
            return _mcp_funcgen_route_fallback(fr, pr->command);
        case FGEN_ROUTER_SHORTSEP:
            lookup = _mcp_router_shortsep(key, pr->klen, fr->conf.sep, &lookuplen);
            break;
        case FGEN_ROUTER_LONGSEP:
            lookup = _mcp_router_longsep(key, pr->klen, fr->conf.lsep, &lookuplen);
            break;
        case FGEN_ROUTER_ANCHORSM:
            lookup = _mcp_router_anchorsm(key, pr->klen, fr->conf.anchorsm, &lookuplen);
            break;
        case FGEN_ROUTER_ANCHORBIG:
            lookup = _mcp_router_anchorbig(key, pr->klen, &fr->conf.big, &lookuplen);
            break;
    }

    // no prefix was extracted from the key.
    if (lookuplen == 0) {
        return _mcp_funcgen_route_fallback(fr, pr->command);
    }

    // Remember the stack height so every exit below can restore it, no
    // matter what kind of value the map lookup produced.
    const int top = lua_gettop(L);
    mcp_funcgen_t *nfgen = NULL;
    bool found = false;

    // hoping the lua short string cache helps us avoid allocations at least.
    // since this lookup code is internal to the router object we can optimize
    // this later and remove the lua bits.
    lua_rawgeti(L, LUA_REGISTRYINDEX, fr->map_ref);
    lua_pushlstring(L, lookup, lookuplen);
    lua_rawget(L, -2); // pops key, returns value

    const int type = lua_type(L, -1);
    if (type == LUA_TUSERDATA) {
        // direct route: the map value is the funcgen.
        nfgen = lua_touserdata(L, -1);
        found = true;
    } else if (type == LUA_TTABLE) {
        // per-prefix command map.
        lua_rawgeti(L, -1, pr->command);
        // If nil, check CMD_ANY_STORAGE index for a cmap default
        if (lua_isnil(L, -1)) {
            lua_pop(L, 1); // drop nil.
            // check if we have a local-default
            lua_rawgeti(L, -1, CMD_ANY_STORAGE);
        }
        if (!lua_isnil(L, -1)) {
            nfgen = lua_touserdata(L, -1);
            found = true;
        }
    }
    // nil or any unexpected value type: found stays false.

    // drop the map and everything pushed on top of it.
    lua_settop(L, top);

    if (!found) {
        return _mcp_funcgen_route_fallback(fr, pr->command);
    }
    return nfgen;
}
1638
1639 // called from mcp_funcgen_cleanup if necessary.
mcp_funcgen_router_cleanup(lua_State * L,mcp_funcgen_t * fgen)1640 static int mcp_funcgen_router_cleanup(lua_State *L, mcp_funcgen_t *fgen) {
1641 struct mcp_funcgen_router *fr = (struct mcp_funcgen_router *)fgen;
1642 if (fr->map_ref) {
1643 lua_rawgeti(L, LUA_REGISTRYINDEX, fr->map_ref);
1644
1645 // walk the map, de-ref any funcgens found.
1646 int tidx = lua_absindex(L, -1);
1647 lua_pushnil(L);
1648 while (lua_next(L, tidx) != 0) {
1649 int type = lua_type(L, -1);
1650 if (type == LUA_TUSERDATA) {
1651 mcp_funcgen_t *mfgen = lua_touserdata(L, -1);
1652 mcp_funcgen_dereference(L, mfgen);
1653 lua_pop(L, 1);
1654 } else if (type == LUA_TTABLE) {
1655 int midx = lua_absindex(L, -1);
1656 lua_pushnil(L);
1657 while (lua_next(L, midx) != 0) {
1658 mcp_funcgen_t *mfgen = lua_touserdata(L, -1);
1659 mcp_funcgen_dereference(L, mfgen);
1660 lua_pop(L, 1); // drop value
1661 }
1662 lua_pop(L, 1); // drop command map table
1663 }
1664 }
1665
1666 lua_pop(L, 1); // drop the table.
1667 luaL_unref(L, LUA_REGISTRYINDEX, fr->map_ref);
1668 fr->map_ref = 0;
1669 }
1670
1671 // release any command map entries.
1672 for (int x = 0; x < CMD_END_STORAGE; x++) {
1673 if (fr->cmap[x]) {
1674 mcp_funcgen_dereference(L, fr->cmap[x]);
1675 fr->cmap[x] = NULL;
1676 }
1677 }
1678
1679 if (fr->def_fgen) {
1680 mcp_funcgen_dereference(L, fr->def_fgen);
1681 fr->def_fgen = NULL;
1682 }
1683
1684 return 0;
1685 }
1686
1687 // Note: the string should be safe to use after popping it here, because we
1688 // were fetching it from a table, but I might consider copying it into a
1689 // buffer from the caller first.
_mcplib_router_new_check(lua_State * L,const char * arg,size_t * len)1690 static const char *_mcplib_router_new_check(lua_State *L, const char *arg, size_t *len) {
1691 int type = lua_getfield(L, 1, arg);
1692 if (type == LUA_TSTRING) {
1693 const char *sep = lua_tolstring(L, -1, len);
1694 if (*len == 0) {
1695 proxy_lua_ferror(L, "must pass a non-zero length string to %s in mcp.router_new", arg);
1696 } else if (*len > KEY_HASH_FILTER_MAX) {
1697 proxy_lua_ferror(L, "%s is too long in mcp.router_new", arg);
1698 }
1699 lua_pop(L, 1); // drop key
1700 return sep;
1701 } else if (type != LUA_TNIL) {
1702 proxy_lua_ferror(L, "must pass a string to %s in mcp.router_new", arg);
1703 }
1704 return NULL;
1705 }
1706
// Validates the command map table at the top of the stack.
// Every key must be an integer command id (or CMD_ANY_STORAGE) and every
// value must be an "mcp.funcgen" userdata. Raises a lua error otherwise.
static void _mcplib_router_new_cmapcheck(lua_State *L) {
    const int cmap_idx = lua_absindex(L, -1);

    // start with a nil key; each pass drops the value and keeps the key.
    for (lua_pushnil(L); lua_next(L, cmap_idx) != 0; lua_pop(L, 1)) {
        if (!lua_isinteger(L, -2)) {
            proxy_lua_error(L, "Non integer key in router command map in router_new");
        }

        const int cmd = lua_tointeger(L, -2);
        const int is_command = cmd > 0 && cmd < CMD_END_STORAGE;
        if (!is_command && cmd != CMD_ANY_STORAGE) {
            proxy_lua_error(L, "Bad command in router command map in router_new");
        }

        luaL_checkudata(L, -1, "mcp.funcgen");
    }
}
1722
// Validates the route map table at the top of the stack.
// Every value must be either an "mcp.funcgen" userdata or a command map
// table. Raises a lua error otherwise.
// Returns the number of entries in the map.
static size_t _mcplib_router_new_mapcheck(lua_State *L) {
    size_t route_count = 0;
    if (!lua_istable(L, -1)) {
        proxy_lua_error(L, "Must pass a table to map argument of router_new");
    }
    // Iterate the table that is actually on top of the stack rather than a
    // fixed stack slot, so extra arguments to router_new cannot shift it.
    const int map_idx = lua_absindex(L, -1);

    // walk map table, get size count.
    lua_pushnil(L); // init table key.
    while (lua_next(L, map_idx) != 0) {
        int type = lua_type(L, -1);
        if (type == LUA_TUSERDATA) {
            luaL_checkudata(L, -1, "mcp.funcgen");
        } else if (type == LUA_TTABLE) {
            // If table, it's a command map, poke in and validate.
            _mcplib_router_new_cmapcheck(L);
        } else {
            proxy_lua_error(L, "unhandled data in router_new map");
        }
        route_count++;
        lua_pop(L, 1); // drop val, keep key.
    }

    return route_count;
}
1746
1747 // reads the configuration for the router based on the mode.
_mcplib_router_new_mode(lua_State * L,struct mcp_funcgen_router * fr)1748 static void _mcplib_router_new_mode(lua_State *L, struct mcp_funcgen_router *fr) {
1749 const char *type = lua_tostring(L, -1);
1750 size_t len = 0;
1751 const char *sep = NULL;
1752
1753 // change internal type based on length of separator
1754 if (strcmp(type, "prefix") == 0) {
1755 sep = _mcplib_router_new_check(L, "stop", &len);
1756 if (sep == NULL) {
1757 // defaults
1758 fr->type = FGEN_ROUTER_SHORTSEP;
1759 fr->conf.sep = '/';
1760 } else if (len == 1) {
1761 // optimized shortsep case.
1762 fr->type = FGEN_ROUTER_SHORTSEP;
1763 fr->conf.sep = sep[0];
1764 } else {
1765 // len is long.
1766 fr->type = FGEN_ROUTER_LONGSEP;
1767 memcpy(fr->conf.lsep, sep, len);
1768 fr->conf.lsep[len] = '\0'; // cap it.
1769 }
1770 } else if (strcmp(type, "anchor") == 0) {
1771 size_t elen = 0; // stop len.
1772 const char *usep = _mcplib_router_new_check(L, "stop", &elen);
1773 sep = _mcplib_router_new_check(L, "start", &len);
1774 if (sep == NULL && usep == NULL) {
1775 // no arguments, use a default.
1776 fr->type = FGEN_ROUTER_ANCHORSM;
1777 fr->conf.anchorsm[0] = '/';
1778 fr->conf.anchorsm[1] = '/';
1779 } else if (sep == NULL || usep == NULL) {
1780 // reduce the combinatorial space because I'm lazy.
1781 proxy_lua_error(L, "must specify start and stop if mode is anchor in mcp.router_new");
1782 } else if (len == 1 && elen == 1) {
1783 fr->type = FGEN_ROUTER_ANCHORSM;
1784 fr->conf.anchorsm[0] = sep[0];
1785 fr->conf.anchorsm[1] = usep[0];
1786 } else {
1787 fr->type = FGEN_ROUTER_ANCHORBIG;
1788 memcpy(fr->conf.big.start, sep, len);
1789 memcpy(fr->conf.big.stop, usep, elen);
1790 fr->conf.big.start[len] = '\0';
1791 fr->conf.big.stop[elen] = '\0';
1792 }
1793 } else {
1794 proxy_lua_error(L, "unknown type passed to mcp.router_new");
1795 }
1796 }
1797
1798 // FIXME: error if map or cmap not passed in?
mcplib_router_new(lua_State * L)1799 int mcplib_router_new(lua_State *L) {
1800 struct mcp_funcgen_router fr = {0};
1801 size_t route_count = 0;
1802 bool has_map = false;
1803
1804 if (!lua_istable(L, 1)) {
1805 proxy_lua_error(L, "Must pass a table of arguments to mcp.router_new");
1806 }
1807
1808 if (lua_getfield(L, 1, "map") != LUA_TNIL) {
1809 route_count = _mcplib_router_new_mapcheck(L);
1810 has_map = true;
1811 }
1812 lua_pop(L, 1); // drop map or nil
1813
1814 if (lua_getfield(L, 1, "cmap") != LUA_TNIL) {
1815 if (!lua_istable(L, -1)) {
1816 proxy_lua_error(L, "Must pass a table to cmap argument of mcp.router_new");
1817 }
1818 _mcplib_router_new_cmapcheck(L);
1819 } else {
1820 if (!has_map) {
1821 proxy_lua_error(L, "Must pass map and/or cmap to mcp.router_new");
1822 }
1823 }
1824 lua_pop(L, 1);
1825
1826 fr.fgen_self.is_router = true;
1827
1828 // config:
1829 // { mode = "anchor", start = "/", stop = "/" }
1830 // { mode = "prefix", stop = "/" }
1831 if (has_map) {
1832 // default to a short prefix type with a single byte separator.
1833 fr.type = FGEN_ROUTER_SHORTSEP;
1834 fr.conf.sep = '/';
1835
1836 if (lua_getfield(L, 1, "mode") == LUA_TSTRING) {
1837 _mcplib_router_new_mode(L, &fr);
1838 }
1839 lua_pop(L, 1); // drop mode or nil.
1840 } else {
1841 // pure command map router.
1842 fr.type = FGEN_ROUTER_CMDMAP;
1843 }
1844
1845 struct mcp_funcgen_router *router = lua_newuserdatauv(L, sizeof(struct mcp_funcgen_router), 0);
1846 memset(router, 0, sizeof(*router));
1847 mcp_funcgen_t *fgen = &router->fgen_self;
1848
1849 luaL_getmetatable(L, "mcp.funcgen");
1850 lua_setmetatable(L, -2);
1851
1852 int type = lua_getfield(L, 1, "default");
1853 if (type == LUA_TUSERDATA) {
1854 fr.def_fgen = luaL_checkudata(L, -1, "mcp.funcgen");
1855 mcp_funcgen_reference(L); // pops the funcgen.
1856 } else {
1857 lua_pop(L, 1);
1858 }
1859
1860 memcpy(router, &fr, sizeof(struct mcp_funcgen_router));
1861 strncpy(fgen->name, "mcp_router", FGEN_NAME_MAXLEN);
1862
1863 if (has_map) {
1864 // walk map table again, funcgen_ref everyone.
1865 lua_createtable(L, 0, route_count);
1866 lua_pushvalue(L, -1); // dupe table ref for a moment.
1867 router->map_ref = luaL_ref(L, LUA_REGISTRYINDEX); // pops extra map
1868
1869 int mymap = lua_absindex(L, -1);
1870 lua_getfield(L, 1, "map");
1871 int argmap = lua_absindex(L, -1);
1872 lua_pushnil(L); // seed walk of the passed in map
1873
1874 while (lua_next(L, argmap) != 0) {
1875 // types are already validated.
1876 int type = lua_type(L, -1);
1877 if (type == LUA_TUSERDATA) {
1878 // first lets reference the function generator.
1879 lua_pushvalue(L, -1); // duplicate value.
1880 mcp_funcgen_reference(L); // pops the funcgen after referencing.
1881
1882 // duplicate key.
1883 lua_pushvalue(L, -2);
1884 // move key underneath value
1885 lua_insert(L, -2); // take top (key) and move it down one.
1886 // now key, key, value
1887 lua_rawset(L, mymap); // pops k, v into our internal table.
1888 } else if (type == LUA_TTABLE) {
1889 int tidx = lua_absindex(L, -1); // idx of our command map table.
1890 lua_createtable(L, CMD_END_STORAGE, 0);
1891 int midx = lua_absindex(L, -1); // idx of our new command map.
1892 lua_pushnil(L); // seed the iterator
1893 while (lua_next(L, tidx) != 0) {
1894 lua_pushvalue(L, -1); // duplicate value.
1895 mcp_funcgen_reference(L); // pop funcgen.
1896
1897 lua_pushvalue(L, -2); // dupe key.
1898 lua_insert(L, -2); // move key down one.
1899 lua_rawset(L, midx); // set to new map table.
1900 }
1901
1902 // -1: new command map
1903 // -2: input command map
1904 // -3: key
1905 lua_pushvalue(L, -3); // dupe key
1906 lua_insert(L, -2); // move key down below new cmd map
1907 lua_rawset(L, mymap); // pop key, new map into main map.
1908 lua_pop(L, 1); // drop input table.
1909 }
1910 }
1911
1912 lua_pop(L, 2); // drop argmap, mymap.
1913 }
1914
1915 // process a command map directly into our internal table.
1916 if (lua_getfield(L, 1, "cmap") != LUA_TNIL) {
1917 int tidx = lua_absindex(L, -1); // idx of our command map table.
1918 lua_pushnil(L); // seed the iterator
1919 while (lua_next(L, tidx) != 0) {
1920 int cmd = lua_tointeger(L, -2);
1921 mcp_funcgen_t *cfgen = lua_touserdata(L, -1);
1922 mcp_funcgen_reference(L); // pop funcgen.
1923 router->cmap[cmd] = cfgen;
1924 }
1925 }
1926 lua_pop(L, 1);
1927
1928 LIBEVENT_THREAD *t = PROXY_GET_THR(L);
1929 fgen->thread = t;
1930 mcp_sharedvm_delta(t->proxy_ctx, SHAREDVM_FGEN_IDX, "mcp_router", 1);
1931
1932 return 1;
1933 }
1934