/* -*- Mode: C; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
/*
 * Functions for handling the proxy layer; wraps text protocols.
 *
 * NOTE: many lua functions generate pointers via "lua_newuserdatauv" or
 * similar. Normal memory checking isn't done, as lua will throw a high level
 * error if malloc fails. Keep this in mind while allocating data so any
 * manually malloc'ed memory gets freed properly.
 */

#include "proxy.h"

#define PROCESS_MULTIGET true
#define PROCESS_NORMAL false
#define PROXY_GC_BACKGROUND_SECONDS 2
static void proxy_process_command(conn *c, char *command, size_t cmdlen, bool multiget);
static void *mcp_profile_alloc(void *ud, void *ptr, size_t osize, size_t nsize);

/******** EXTERNAL FUNCTIONS ******/
// functions starting with _ are breakouts for the public functions.

static inline void _proxy_advance_lastkb(lua_State *L, LIBEVENT_THREAD *t) {
    int new_kb = lua_gc(L, LUA_GCCOUNT);
    // We need to slew the increase in "gc pause" because the lua GC actually
    // needs to run twice to free a userdata: once to run the _gc's and again
    // to actually clean up the object. Without the slew, the trigger
    // threshold would ratchet upward continually.
    if (new_kb > t->proxy_vm_last_kb) {
        new_kb = t->proxy_vm_last_kb + (new_kb - t->proxy_vm_last_kb) * 0.50;
    }

    // remove the memory freed during this cycle so we can kick off the GC
    // early if we're very aggressively making garbage.
    // carry our negative delta forward so a huge reclaim can push for a
    // couple of cycles.
    if (t->proxy_vm_negative_delta >= new_kb) {
        t->proxy_vm_negative_delta -= new_kb;
        new_kb = 1;
    } else {
        new_kb -= t->proxy_vm_negative_delta;
        t->proxy_vm_negative_delta = 0;
    }

    t->proxy_vm_last_kb = new_kb;
}
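
/* Worked example of the slew above (hypothetical numbers): with
 * proxy_vm_last_kb == 100 and a fresh GCCOUNT of 180, new_kb becomes
 * 100 + (180 - 100) * 0.50 == 140, so the trigger point tracks half of the
 * observed growth instead of all of it. */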

// The lua GC is paused while running requests. Run it manually in between
// processing network events.
void proxy_gc_poke(LIBEVENT_THREAD *t) {
    lua_State *L = t->L;
    struct proxy_int_stats *is = t->proxy_int_stats;
    int vm_kb = lua_gc(L, LUA_GCCOUNT) + t->proxy_vm_extra_kb;
    if (t->proxy_vm_last_kb == 0) {
        t->proxy_vm_last_kb = vm_kb;
    }
    WSTAT_L(t);
    is->vm_memory_kb = vm_kb;
    WSTAT_UL(t);

    // equivalent of luagc "pause" value
    int last = t->proxy_vm_last_kb;
    if (t->proxy_vm_gcrunning <= 0 && vm_kb > last * 2) {
        t->proxy_vm_gcrunning = 1;
        //fprintf(stderr, "PROXYGC: proxy_gc_poke START [cur: %d - last: %d]\n", vm_kb, last);
    }

    // We configure small GC "steps" then increase the number of times we run
    // a step based on current memory usage.
    if (t->proxy_vm_gcrunning > 0) {
        t->proxy_vm_needspoke = false;
        int loops = t->proxy_vm_gcrunning;
        int done = 0;
        /*fprintf(stderr, "PROXYGC: proxy_gc_poke [cur: %d - last: %d - loops: %d]\n",
            vm_kb,
            t->proxy_vm_last_kb,
            loops);*/
        while (loops-- && !done) {
            // reset counters once full GC cycle has completed
            done = lua_gc(L, LUA_GCSTEP, 0);
        }

        int vm_kb_after = lua_gc(L, LUA_GCCOUNT);
        int vm_kb_clean = vm_kb - t->proxy_vm_extra_kb;
        if (vm_kb_clean > vm_kb_after) {
            // track the amount of memory freed during the GC cycle.
            t->proxy_vm_negative_delta += vm_kb_clean - vm_kb_after;
        }

        if (done) {
            _proxy_advance_lastkb(L, t);
            t->proxy_vm_extra_kb = 0;
            t->proxy_vm_gcrunning = 0;
            WSTAT_L(t);
            is->vm_gc_runs++;
            WSTAT_UL(t);
            //fprintf(stderr, "PROXYGC: proxy_gc_poke COMPLETE [cur: %d next: %d]\n", lua_gc(L, LUA_GCCOUNT), t->proxy_vm_last_kb);
        }

        // increase the aggressiveness by memory bloat level.
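        // e.g. with last == 100: usage above 225 bumps us to two steps per
        // poke, above 250 to three, each extra level raising the bar by
        // another 25% of "last".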
        if (t->proxy_vm_gcrunning && (last*2) + (last * t->proxy_vm_gcrunning*0.25) < vm_kb) {
            t->proxy_vm_gcrunning++;
            //fprintf(stderr, "PROXYGC: proxy_gc_poke INCREASING AGGRESSIVENESS [cur: %d - aggro: %d]\n", t->proxy_vm_last_kb, t->proxy_vm_gcrunning);
        } else if (t->proxy_vm_gcrunning > 1) {
            // memory can drop during a run, let the GC slow down again.
            t->proxy_vm_gcrunning--;
            //fprintf(stderr, "PROXYGC: proxy_gc_poke DECREASING AGGRESSIVENESS [cur: %d - aggro: %d]\n", t->proxy_vm_last_kb, t->proxy_vm_gcrunning);
        }
    }
}

// every couple of seconds we force-run one GC step.
// this is needed until API1 is retired and pool objects are no longer
// managed by the GC.
// We use a negative value so a "timer poke" GC run doesn't cause requests to
// suddenly aggressively run the GC.
static void proxy_gc_timerpoke(evutil_socket_t fd, short event, void *arg) {
    LIBEVENT_THREAD *t = arg;
    struct timeval next = { PROXY_GC_BACKGROUND_SECONDS, 0 };
    evtimer_add(t->proxy_gc_timer, &next);
    // if GC ran within the last few seconds, don't do anything.
    if (!t->proxy_vm_needspoke) {
        t->proxy_vm_needspoke = true;
        return;
    }

    // if we weren't told to skip and there's otherwise no GC running, start a
    // GC run.
    if (t->proxy_vm_gcrunning == 0) {
        t->proxy_vm_gcrunning = -1;
    }

    // only advance GC if we're doing our own timer run.
    if (t->proxy_vm_gcrunning == -1 && lua_gc(t->L, LUA_GCSTEP, 0)) {
        _proxy_advance_lastkb(t->L, t);
        t->proxy_vm_extra_kb = 0;
        t->proxy_vm_gcrunning = 0;
    }
}

bool proxy_bufmem_checkadd(LIBEVENT_THREAD *t, int len) {
    bool oom = false;
    pthread_mutex_lock(&t->proxy_limit_lock);
    if (t->proxy_buffer_memory_used > t->proxy_buffer_memory_limit) {
        oom = true;
    } else {
        t->proxy_buffer_memory_used += len;
    }
    pthread_mutex_unlock(&t->proxy_limit_lock);
    return oom;
}
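
// NOTE: the check above happens before the add, so buffer_memory_used can
// overshoot the limit by one allocation's length. A len of 0 acts as a pure
// limit check; proxy_process_command uses that, accounting the value bytes
// later in complete_nread_proxy.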

// see also: process_extstore_stats()
void proxy_stats(void *arg, ADD_STAT add_stats, void *c) {
    if (arg == NULL) {
        return;
    }
    proxy_ctx_t *ctx = arg;

    STAT_L(ctx);
    APPEND_STAT("proxy_config_reloads", "%llu", (unsigned long long)ctx->global_stats.config_reloads);
    APPEND_STAT("proxy_config_reload_fails", "%llu", (unsigned long long)ctx->global_stats.config_reload_fails);
    APPEND_STAT("proxy_config_cron_runs", "%llu", (unsigned long long)ctx->global_stats.config_cron_runs);
    APPEND_STAT("proxy_config_cron_fails", "%llu", (unsigned long long)ctx->global_stats.config_cron_fails);
    APPEND_STAT("proxy_backend_total", "%llu", (unsigned long long)ctx->global_stats.backend_total);
    APPEND_STAT("proxy_backend_marked_bad", "%llu", (unsigned long long)ctx->global_stats.backend_marked_bad);
    APPEND_STAT("proxy_backend_failed", "%llu", (unsigned long long)ctx->global_stats.backend_failed);
    APPEND_STAT("proxy_request_failed_depth", "%llu", (unsigned long long)ctx->global_stats.request_failed_depth);
    STAT_UL(ctx);
}

void process_proxy_stats(void *arg, ADD_STAT add_stats, void *c) {
    char key_str[STAT_KEY_LEN];
    struct proxy_int_stats istats = {0};
    uint64_t req_limit = 0;
    uint64_t buffer_memory_limit = 0;
    uint64_t buffer_memory_used = 0;

    if (!arg) {
        return;
    }
    proxy_ctx_t *ctx = arg;
    STAT_L(ctx);
    req_limit = ctx->active_req_limit;
    buffer_memory_limit = ctx->buffer_memory_limit;

    // prepare aggregated counters.
    struct proxy_user_stats_entry *us = ctx->user_stats;
    int stats_num = ctx->user_stats_num;
    uint64_t counters[stats_num];
    memset(counters, 0, sizeof(counters));

    // TODO (v3): more globals to remove and/or change API method.
    // aggregate worker thread counters.
    for (int x = 0; x < settings.num_threads; x++) {
        LIBEVENT_THREAD *t = get_worker_thread(x);
        struct proxy_user_stats *tus = t->proxy_user_stats;
        struct proxy_int_stats *is = t->proxy_int_stats;
        WSTAT_L(t);
        for (int i = 0; i < CMD_FINAL; i++) {
            istats.counters[i] += is->counters[i];
        }
        istats.vm_gc_runs += is->vm_gc_runs;
        istats.vm_memory_kb += is->vm_memory_kb;
        if (tus && tus->num_stats >= stats_num) {
            for (int i = 0; i < stats_num; i++) {
                counters[i] += tus->counters[i];
            }
        }
        WSTAT_UL(t);
        pthread_mutex_lock(&t->proxy_limit_lock);
        buffer_memory_used += t->proxy_buffer_memory_used;
        pthread_mutex_unlock(&t->proxy_limit_lock);
    }

    // return all of the user generated stats
    if (ctx->user_stats_namebuf) {
        char vbuf[INCR_MAX_STORAGE_LEN];
        char *e = NULL; // ptr into vbuf
        const char *pfx = "user_";
        const size_t pfxlen = strlen(pfx);
        for (int x = 0; x < stats_num; x++) {
            if (us[x].cname) {
                char *name = ctx->user_stats_namebuf + us[x].cname;
                size_t nlen = strlen(name);
                if (nlen > STAT_KEY_LEN-6) {
                    // impossible, but for paranoia.
                    nlen = STAT_KEY_LEN-6;
                }
                // avoiding an snprintf call for some performance ("user_%s")
                memcpy(key_str, pfx, pfxlen);
                memcpy(key_str+pfxlen, name, nlen);
                key_str[pfxlen+nlen] = '\0';

                // APPEND_STAT() calls another snprintf, which calls our
                // add_stats argument. Let's skip yet another snprintf with
                // some unrolling.
                e = itoa_u64(counters[x], vbuf);
                *(e+1) = '\0';
                add_stats(key_str, pfxlen+nlen, vbuf, e-vbuf, c);
            }
        }
    }

    STAT_UL(ctx);

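    // The limits are stored per worker thread; multiplying by num_threads
    // below reports the process-wide equivalent. UINT64_MAX is the
    // "unlimited" sentinel and is reported as 0.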
    if (buffer_memory_limit == UINT64_MAX) {
        buffer_memory_limit = 0;
    } else {
        buffer_memory_limit *= settings.num_threads;
    }
    if (req_limit == UINT64_MAX) {
        req_limit = 0;
    } else {
        req_limit *= settings.num_threads;
    }

    // return proxy counters
    APPEND_STAT("active_req_limit", "%llu", (unsigned long long)req_limit);
    APPEND_STAT("buffer_memory_limit", "%llu", (unsigned long long)buffer_memory_limit);
    APPEND_STAT("buffer_memory_used", "%llu", (unsigned long long)buffer_memory_used);
    APPEND_STAT("vm_gc_runs", "%llu", (unsigned long long)istats.vm_gc_runs);
    APPEND_STAT("vm_memory_kb", "%llu", (unsigned long long)istats.vm_memory_kb);
    APPEND_STAT("cmd_mg", "%llu", (unsigned long long)istats.counters[CMD_MG]);
    APPEND_STAT("cmd_ms", "%llu", (unsigned long long)istats.counters[CMD_MS]);
    APPEND_STAT("cmd_md", "%llu", (unsigned long long)istats.counters[CMD_MD]);
    APPEND_STAT("cmd_mn", "%llu", (unsigned long long)istats.counters[CMD_MN]);
    APPEND_STAT("cmd_ma", "%llu", (unsigned long long)istats.counters[CMD_MA]);
    APPEND_STAT("cmd_me", "%llu", (unsigned long long)istats.counters[CMD_ME]);
    APPEND_STAT("cmd_get", "%llu", (unsigned long long)istats.counters[CMD_GET]);
    APPEND_STAT("cmd_gat", "%llu", (unsigned long long)istats.counters[CMD_GAT]);
    APPEND_STAT("cmd_set", "%llu", (unsigned long long)istats.counters[CMD_SET]);
    APPEND_STAT("cmd_add", "%llu", (unsigned long long)istats.counters[CMD_ADD]);
    APPEND_STAT("cmd_cas", "%llu", (unsigned long long)istats.counters[CMD_CAS]);
    APPEND_STAT("cmd_gets", "%llu", (unsigned long long)istats.counters[CMD_GETS]);
    APPEND_STAT("cmd_gats", "%llu", (unsigned long long)istats.counters[CMD_GATS]);
    APPEND_STAT("cmd_incr", "%llu", (unsigned long long)istats.counters[CMD_INCR]);
    APPEND_STAT("cmd_decr", "%llu", (unsigned long long)istats.counters[CMD_DECR]);
    APPEND_STAT("cmd_touch", "%llu", (unsigned long long)istats.counters[CMD_TOUCH]);
    APPEND_STAT("cmd_append", "%llu", (unsigned long long)istats.counters[CMD_APPEND]);
    APPEND_STAT("cmd_prepend", "%llu", (unsigned long long)istats.counters[CMD_PREPEND]);
    APPEND_STAT("cmd_delete", "%llu", (unsigned long long)istats.counters[CMD_DELETE]);
    APPEND_STAT("cmd_replace", "%llu", (unsigned long long)istats.counters[CMD_REPLACE]);
}

void process_proxy_funcstats(void *arg, ADD_STAT add_stats, void *c) {
    char key_str[STAT_KEY_LEN];
    if (!arg) {
        return;
    }
    proxy_ctx_t *ctx = arg;
    lua_State *L = ctx->proxy_sharedvm;
    pthread_mutex_lock(&ctx->sharedvm_lock);

    // iterate all of the named function slots
    lua_pushnil(L);
    while (lua_next(L, SHAREDVM_FGEN_IDX) != 0) {
        int n = lua_tointeger(L, -1);
        lua_pop(L, 1); // drop the value, leave the key.
        if (n != 0) {
            // reuse the key. make a copy since rawget will pop it.
            lua_pushvalue(L, -1);
            lua_rawget(L, SHAREDVM_FGENSLOT_IDX);
            int slots = lua_tointeger(L, -1);
            lua_pop(L, 1); // drop the slot count.

            // now grab the name key.
            const char *name = lua_tostring(L, -1);
            snprintf(key_str, STAT_KEY_LEN-1, "funcs_%s", name);
            APPEND_STAT(key_str, "%d", n);
            snprintf(key_str, STAT_KEY_LEN-1, "slots_%s", name);
            APPEND_STAT(key_str, "%d", slots);
        } else {
            // TODO: It is safe to delete keys here. Slightly complex so low
            // priority.
        }
    }

    pthread_mutex_unlock(&ctx->sharedvm_lock);
}

void process_proxy_bestats(void *arg, ADD_STAT add_stats, void *c) {
    char key_str[STAT_KEY_LEN];
    if (!arg) {
        return;
    }
    proxy_ctx_t *ctx = arg;
    lua_State *L = ctx->proxy_sharedvm;
    pthread_mutex_lock(&ctx->sharedvm_lock);

    // iterate all of the listed backends
    lua_pushnil(L);
    while (lua_next(L, SHAREDVM_BACKEND_IDX) != 0) {
        int n = lua_tointeger(L, -1);
        lua_pop(L, 1); // drop the value, leave the key.
        if (n != 0) {
            // now grab the name key.
            const char *name = lua_tostring(L, -1);
            snprintf(key_str, STAT_KEY_LEN-1, "bad_%s", name);
            APPEND_STAT(key_str, "%d", n);
        } else {
            // delete keys of backends that are no longer bad or no longer
            // exist to keep the table small.
            const char *name = lua_tostring(L, -1);
            lua_pushnil(L);
            lua_setfield(L, SHAREDVM_BACKEND_IDX, name);
        }
    }

    pthread_mutex_unlock(&ctx->sharedvm_lock);
}

// start the centralized lua state and config thread.
void *proxy_init(bool use_uring, bool proxy_memprofile) {
    proxy_ctx_t *ctx = calloc(1, sizeof(proxy_ctx_t));
    ctx->use_uring = use_uring;
    ctx->memprofile = proxy_memprofile;

    pthread_mutex_init(&ctx->config_lock, NULL);
    pthread_cond_init(&ctx->config_cond, NULL);
    pthread_mutex_init(&ctx->worker_lock, NULL);
    pthread_cond_init(&ctx->worker_cond, NULL);
    pthread_mutex_init(&ctx->manager_lock, NULL);
    pthread_cond_init(&ctx->manager_cond, NULL);
    pthread_mutex_init(&ctx->stats_lock, NULL);

    ctx->active_req_limit = UINT64_MAX;
    ctx->buffer_memory_limit = UINT64_MAX;

    // FIXME (v2): default defines.
    ctx->tunables.tcp_keepalive = false;
    ctx->tunables.backend_failure_limit = 3;
    ctx->tunables.connect.tv_sec = 5;
    ctx->tunables.retry.tv_sec = 3;
    ctx->tunables.read.tv_sec = 3;
    ctx->tunables.flap_backoff_ramp = 1.5;
    ctx->tunables.flap_backoff_max = 3600;
    ctx->tunables.backend_depth_limit = 0;
    ctx->tunables.max_ustats = MAX_USTATS_DEFAULT;
    ctx->tunables.use_iothread = false;
    ctx->tunables.use_tls = false;

    STAILQ_INIT(&ctx->manager_head);
    lua_State *L = NULL;
    if (ctx->memprofile) {
        struct mcp_memprofile *prof = calloc(1, sizeof(struct mcp_memprofile));
        prof->id = ctx->memprofile_thread_counter++;
        L = lua_newstate(mcp_profile_alloc, prof);
    } else {
        L = luaL_newstate();
    }
    ctx->proxy_state = L;
    luaL_openlibs(L);
    // NOTE: might need to differentiate the libs yes?
    proxy_register_libs(ctx, NULL, L);
    // Create the cron table.
    lua_newtable(L);
    ctx->cron_ref = luaL_ref(L, LUA_REGISTRYINDEX);
    ctx->cron_next = INT_MAX;

    // set up the shared state VM. Used by short-lock events (counters/state)
    // for global visibility.
    pthread_mutex_init(&ctx->sharedvm_lock, NULL);
    ctx->proxy_sharedvm = luaL_newstate();
    luaL_openlibs(ctx->proxy_sharedvm);
    // we keep info tables in the top level stack so we don't have to
    // constantly fetch them from registry.
    lua_newtable(ctx->proxy_sharedvm); // fgen count
    lua_newtable(ctx->proxy_sharedvm); // fgen slot count
    lua_newtable(ctx->proxy_sharedvm); // backend down status
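    // These three tables sit at absolute stack indexes 1, 2 and 3, which is
    // presumably what the SHAREDVM_FGEN_IDX / SHAREDVM_FGENSLOT_IDX /
    // SHAREDVM_BACKEND_IDX constants used by the stats walkers above map to.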

    // Create/start the IO thread, which we need before servers
    // start getting created.
    proxy_event_thread_t *t = calloc(1, sizeof(proxy_event_thread_t));
    ctx->proxy_io_thread = t;
    proxy_init_event_thread(t, ctx, NULL);

    pthread_create(&t->thread_id, NULL, proxy_event_thread, t);
    thread_setname(t->thread_id, "mc-prx-io");

    _start_proxy_config_threads(ctx);
    return ctx;
}

// Initialize the VM for an individual worker thread.
void proxy_thread_init(void *ctx, LIBEVENT_THREAD *thr) {
    assert(ctx != NULL);
    assert(thr != NULL);

    // Create the hook table.
    thr->proxy_hooks = calloc(CMD_SIZE, sizeof(struct proxy_hook));
    if (thr->proxy_hooks == NULL) {
        fprintf(stderr, "Failed to allocate proxy hooks\n");
        exit(EXIT_FAILURE);
    }
    thr->proxy_int_stats = calloc(1, sizeof(struct proxy_int_stats));
    if (thr->proxy_int_stats == NULL) {
        fprintf(stderr, "Failed to allocate proxy thread stats\n");
        exit(EXIT_FAILURE);
    }
    pthread_mutex_init(&thr->proxy_limit_lock, NULL);
    thr->proxy_ctx = ctx;

    // Initialize the lua state.
    proxy_ctx_t *pctx = ctx;
    lua_State *L = NULL;
    if (pctx->memprofile) {
        struct mcp_memprofile *prof = calloc(1, sizeof(struct mcp_memprofile));
        prof->id = pctx->memprofile_thread_counter++;
        L = lua_newstate(mcp_profile_alloc, prof);
    } else {
        L = luaL_newstate();
    }

    // With smaller requests the default incremental collector appears to
    // never complete. With this simple tuning (def-1, def, def) it seems
    // fine.
    // We can't use GCGEN until we manage pools with reference counting, as
    // they may never hit GC and thus never release their connection
    // resources.
    lua_gc(L, LUA_GCINC, 199, 100, 12);
    lua_gc(L, LUA_GCSTOP); // handle GC on our own schedule.
    thr->L = L;
    luaL_openlibs(L);
    proxy_register_libs(ctx, thr, L);
    // TODO: srand on time? do we need to bother?
    for (int x = 0; x < 3; x++) {
        thr->proxy_rng[x] = rand();
    }

    thr->proxy_gc_timer = evtimer_new(thr->base, proxy_gc_timerpoke, thr);
    // kick off the timer loop.
    proxy_gc_timerpoke(0, 0, thr);

    // Create a proxy event thread structure to piggyback on the worker.
    proxy_event_thread_t *t = calloc(1, sizeof(proxy_event_thread_t));
    thr->proxy_event_thread = t;
    proxy_init_event_thread(t, ctx, thr->base);
}

// ctx_stack is a stack of io_pending_proxy_t's.
// head of q->stack_ctx is the "newest" request so we must push into the head
// of the next queue, as requests are dequeued from the head.
void proxy_submit_cb(io_queue_t *q) {
    proxy_event_thread_t *e = ((proxy_ctx_t *)q->ctx)->proxy_io_thread;
    io_pending_proxy_t *p = q->stack_ctx;
    io_head_t head;
    be_head_t w_head; // worker local stack.
    STAILQ_INIT(&head);
    STAILQ_INIT(&w_head);

    // NOTE: responses get returned in the correct order no matter what, since
    // mc_resp's are linked.
    // we just need to ensure stuff is parsed off the backend in the correct
    // order.
    // So we can do with a single list here, but we need to repair the list as
    // responses are parsed. (in the req_remaining-- section)
    // TODO (v2):
    // - except we can't do that because the deferred IO stack isn't
    // compatible with queue.h.
    // So for now we build the secondary list with an STAILQ, which
    // can be transplanted/etc.
    while (p) {
        mcp_backend_t *be;
        P_DEBUG("%s: queueing req for backend: %p\n", __func__, (void *)p);
        if (p->qcount_incr) {
            // funny workaround: async IOP's don't count toward
            // resuming a connection, only the completion of the async
            // condition.
            q->count++;
        }

        if (p->background) {
            P_DEBUG("%s: fast-returning background object: %p\n", __func__, (void *)p);
            // intercept background requests
            // this call cannot recurse if we're on the worker thread,
            // since the worker thread has to finish executing this
            // function in order to pick up the returned IO.
            return_io_pending((io_pending_t *)p);
            p = p->next;
            continue;
        }
        be = p->backend;

        if (be->use_io_thread) {
            STAILQ_INSERT_HEAD(&head, p, io_next);
        } else {
            // emulate some of handler_dequeue()
            STAILQ_INSERT_HEAD(&be->io_head, p, io_next);
            assert(be->depth > -1);
            be->depth++;
            if (!be->stacked) {
                be->stacked = true;
                STAILQ_INSERT_TAIL(&w_head, be, be_next);
            }
        }

        p = p->next;
    }

    // clear out the submit queue so we can re-queue new IO's inline.
    q->stack_ctx = NULL;

    if (!STAILQ_EMPTY(&head)) {
        bool do_notify = false;
        P_DEBUG("%s: submitting queue to IO thread\n", __func__);
        // Transfer request stack to event thread.
        pthread_mutex_lock(&e->mutex);
        if (STAILQ_EMPTY(&e->io_head_in)) {
            do_notify = true;
        }
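        // If io_head_in was non-empty the IO thread already has a pending
        // wakeup and will drain our additions in the same pass, so we only
        // need to signal on the empty-to-non-empty transition.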
        STAILQ_CONCAT(&e->io_head_in, &head);
        // No point in holding the lock since we're not doing a cond signal.
        pthread_mutex_unlock(&e->mutex);

        if (do_notify) {
            // Signal to check queue.
#ifdef USE_EVENTFD
            uint64_t u = 1;
            // TODO (v2): check result? is it ever possible to get a short write/failure
            // for an eventfd?
            if (write(e->event_fd, &u, sizeof(uint64_t)) != sizeof(uint64_t)) {
                assert(1 == 0);
            }
#else
            if (write(e->notify_send_fd, "w", 1) <= 0) {
                assert(1 == 0);
            }
#endif
        }
    }

    if (!STAILQ_EMPTY(&w_head)) {
        P_DEBUG("%s: running inline worker queue\n", __func__);
        // emulating proxy_event_handler
        proxy_run_backend_queue(&w_head);
    }
    return;
}

// This function handles return processing for the "old style" API:
// currently just `mcp.internal()`
void proxy_return_rctx_cb(io_pending_t *pending) {
    io_pending_proxy_t *p = (io_pending_proxy_t *)pending;
    if (p->client_resp && p->client_resp->blen) {
        // FIXME: workaround for buffer memory being external to objects.
        // can't run 0 since that means something special (run the GC)
        unsigned int kb = p->client_resp->blen / 1000;
        p->thread->proxy_vm_extra_kb += kb > 0 ? kb : 1;
    }

    mcp_rcontext_t *rctx = p->rctx;
    lua_rotate(rctx->Lc, 1, 1);
    lua_settop(rctx->Lc, 1);
    // hold the resp for a minute.
    mc_resp *resp = rctx->resp;

    proxy_run_rcontext(rctx);
    mcp_funcgen_return_rctx(rctx);

    io_queue_t *q = conn_io_queue_get(p->c, p->io_queue_type);
    // Detach the iop from the mc_resp and free it here.
    conn *c = p->c;
    if (p->io_type != IO_PENDING_TYPE_EXTSTORE) {
        // if we're doing an extstore subrequest, the iop needs to live until
        // resp's ->finish_cb is called.
        resp->io_pending = NULL;
        do_cache_free(p->thread->io_cache, p);
    }

    q->count--;
    if (q->count == 0) {
        // call re-add directly since we're already in the worker thread.
        conn_worker_readd(c);
    }
}

// This is called if resp_finish is called while an iop still exists on the
// resp, so we need to release our iop and rctx.
// - This can't happen unless we're doing extstore fetches.
// - the request context is freed before connection processing resumes.
void proxy_finalize_rctx_cb(io_pending_t *pending) {
    io_pending_proxy_t *p = (io_pending_proxy_t *)pending;

    if (p->io_type == IO_PENDING_TYPE_EXTSTORE) {
        if (p->hdr_it) {
            // TODO: lock once, worst case this hashes/locks twice.
            if (p->miss) {
                item_unlink(p->hdr_it);
            }
            item_remove(p->hdr_it);
        }
    }
}

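// Returns 1 if a command line was consumed (or the connection is being
// closed), 0 if a complete line isn't available yet and more data must be
// read first.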
int try_read_command_proxy(conn *c) {
    char *el, *cont;

    if (c->rbytes == 0)
        return 0;

    el = memchr(c->rcurr, '\n', c->rbytes);
    if (!el) {
        if (c->rbytes > 1024) {
            /*
             * We didn't have a '\n' in the first k. This _has_ to be a
             * large multiget, if not we should just nuke the connection.
             */
            char *ptr = c->rcurr;
            while (*ptr == ' ') { /* ignore leading whitespaces */
                ++ptr;
            }

            if (ptr - c->rcurr > 100 ||
                (strncmp(ptr, "get ", 4) && strncmp(ptr, "gets ", 5))) {

                conn_set_state(c, conn_closing);
                return 1;
            }

            // ASCII multigets are unbounded, so our fixed size rbuf may not
            // work for this particular workload... For backcompat we'll use a
            // malloc/realloc/free routine just for this.
            if (!c->rbuf_malloced) {
                if (!rbuf_switch_to_malloc(c)) {
                    conn_set_state(c, conn_closing);
                    return 1;
                }
            }
        }

        return 0;
    }
    cont = el + 1;

    assert(cont <= (c->rcurr + c->rbytes));

    c->last_cmd_time = current_time;
    proxy_process_command(c, c->rcurr, cont - c->rcurr, PROCESS_NORMAL);

    c->rbytes -= (cont - c->rcurr);
    c->rcurr = cont;

    assert(c->rcurr <= (c->rbuf + c->rsize));

    return 1;
}

// Called when a connection is closed while in nread state reading a set.
// Must only be called with an active coroutine.
void proxy_cleanup_conn(conn *c) {
    assert(c->proxy_rctx);
    mcp_rcontext_t *rctx = c->proxy_rctx;
    assert(rctx->pending_reqs == 1);
    rctx->pending_reqs = 0;

    mcp_funcgen_return_rctx(rctx);
    c->proxy_rctx = NULL;
}

// we buffered a SET of some kind.
void complete_nread_proxy(conn *c) {
    assert(c != NULL);

    LIBEVENT_THREAD *thr = c->thread;
    lua_State *L = thr->L;

    if (c->proxy_rctx == NULL) {
        complete_nread_ascii(c);
        return;
    }

    conn_set_state(c, conn_new_cmd);

    assert(c->proxy_rctx);
    mcp_rcontext_t *rctx = c->proxy_rctx;
    mcp_request_t *rq = rctx->request;

    if (strncmp((char *)c->item + rq->pr.vlen - 2, "\r\n", 2) != 0) {
        lua_settop(L, 0); // clear anything remaining on the main thread.
        // FIXME (v2): need to set noreply false if mset_res, but that's kind
        // of a weird hack to begin with. Evaluate how to best do that here.
        out_string(c, "CLIENT_ERROR bad data chunk");
        rctx->pending_reqs--;
        mcp_funcgen_return_rctx(rctx);
        return;
    }

    // We move ownership of the c->item buffer from the connection to the
    // request object here. Else we can double free if the conn closes while
    // inside nread.
    rq->pr.vbuf = c->item;
    c->item = NULL;
    c->item_malloced = false;
    c->proxy_rctx = NULL;
    pthread_mutex_lock(&thr->proxy_limit_lock);
    thr->proxy_buffer_memory_used += rq->pr.vlen;
    pthread_mutex_unlock(&thr->proxy_limit_lock);

    proxy_run_rcontext(rctx);
    mcp_funcgen_return_rctx(rctx);

    lua_settop(L, 0); // clear anything remaining on the main thread.

    return;
}

// Simple error wrapper for common failures.
// lua_error() performs a longjmp, so this function never returns;
// for clarity add a 'return' after calls to this.
void proxy_lua_error(lua_State *L, const char *s) {
    lua_pushstring(L, s);
    lua_error(L);
}

// Need a custom function so we can prefix lua strings easily.
void proxy_out_errstring(mc_resp *resp, char *type, const char *str) {
    size_t len;
    size_t prefix_len = strlen(type);

    assert(resp != NULL);

    resp_reset(resp);
    // avoid noreply since we're throwing important errors.

    // Fill response object with static string.
    len = strlen(str);
    if ((len + prefix_len + 2) > WRITE_BUFFER_SIZE) {
        /* ought to be always enough. just fail for simplicity */
        str = "SERVER_ERROR output line too long";
        len = strlen(str);
    }

    char *w = resp->wbuf;
    memcpy(w, type, prefix_len);
    w += prefix_len;

    memcpy(w, str, len);
    w += len;

    memcpy(w, "\r\n", 2);
    resp_add_iov(resp, resp->wbuf, len + prefix_len + 2);
    return;
}
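
// e.g. proxy_out_errstring(resp, PROXY_CLIENT_ERROR, "parsing request")
// emits "CLIENT_ERROR parsing request\r\n", assuming PROXY_CLIENT_ERROR
// expands to the "CLIENT_ERROR " prefix string.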

// NOTE: See notes in mcp_queue_io; the secondary problem with setting the
// noreply mode from the response object is that the proxy can return strings
// manually, so we have no way to obey what the original request wanted in
// that case.
static void _set_noreply_mode(mc_resp *resp, mcp_resp_t *r) {
    switch (r->mode) {
        case RESP_MODE_NORMAL:
            break;
        case RESP_MODE_NOREPLY:
            // ascii noreply only threw egregious errors to client
            if (r->status == MCMC_OK) {
                resp->skip = true;
            }
            break;
        case RESP_MODE_METAQUIET:
            if (r->resp.code == MCMC_CODE_END) {
                resp->skip = true;
            } else if (r->cmd != CMD_MG && r->resp.code == MCMC_CODE_OK) {
                // FIXME (v2): mcmc's parser needs to help us out a bit more
                // here.
                // This is a broken case in the protocol though; quiet mode
                // ignores HD for mutations but not get.
                resp->skip = true;
            }
            break;
        default:
            assert(1 == 0);
    }
}
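
// Illustration of the quiet-mode rules above: an "mg" with the q flag has
// its miss suppressed while hits pass through; a quiet mutation has its
// plain success suppressed while errors still reach the client.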

static void _proxy_run_rcontext_queues(mcp_rcontext_t *rctx) {
    for (int x = 0; x < rctx->fgen->max_queues; x++) {
        mcp_run_rcontext_handle(rctx, x);
    }
}

static void _proxy_run_tresp_to_resp(mc_resp *tresp, mc_resp *resp) {
    // The internal cache handler has created a resp we want to swap in
    // here. It would be fastest to swap *resp's position in the
    // link but if the set is deep this would instead be slow, so
    // we copy over details from this temporary resp instead.

    // So far all we fill is the wbuf and some iov's? so just copy
    // that + the UDP info?
    memcpy(resp->wbuf, tresp->wbuf, tresp->iov[0].iov_len);
    resp->tosend = 0;
    for (int x = 0; x < tresp->iovcnt; x++) {
        resp->iov[x] = tresp->iov[x];
        resp->tosend += tresp->iov[x].iov_len;
    }
    // resp->iov[x].iov_base needs to be updated if it's
    // pointing within its wbuf.
    // FIXME: This is too fragile. we need to be able to
    // inherit details and swap resp objects around.
    if (tresp->iov[0].iov_base == tresp->wbuf) {
        resp->iov[0].iov_base = resp->wbuf;
    }
    resp->iovcnt = tresp->iovcnt;
    resp->chunked_total = tresp->chunked_total;
    resp->chunked_data_iov = tresp->chunked_data_iov;
    // copy UDP headers...
    resp->request_id = tresp->request_id;
    resp->udp_sequence = tresp->udp_sequence;
    resp->udp_total = tresp->udp_total;
    resp->request_addr = tresp->request_addr;
    resp->request_addr_size = tresp->request_addr_size;
    resp->item = tresp->item; // will be populated if not extstore fetch
    tresp->item = NULL; // move ownership of the item to resp from tresp
    resp->skip = tresp->skip;
}

// HACK NOTES:
// These are self-notes for dormando mostly.
// The IO queue system does not work well with the proxy, as we need to:
// - only increment q->count during the submit phase
//   - .. because a resumed coroutine can queue more data.
//   - and we will never hit q->count == 0
//   - .. and then never resume the main connection. (conn_worker_readd)
//   - which will never submit the new sub-requests
// - need to only increment q->count once per stack of requests coming from a
//   resp.
//
// For RQU backed requests (new API) there isn't an easy place to test for
// "the first request", because:
// - The connection queue is a stack of _all_ requests pending on this
// connection, and many requests can arrive in one batch.
//   - Thus we cannot simply check if there are items in the queue
// - RQU's can be recursive, so we have to loop back to the parent to check to
//   see if we're the first queue or not.
//
// This hack workaround exists so I can fix the IO queue subsystem as a change
// independent of the RCTX change, as the IO queue touches everything and
// scares the shit out of me. It's much easier to make changes to it in
// isolation, when all existing systems are currently working and testable.
//
// Description of the hack:
// - in mcp_queue_io: roll up rctx to parent, and if we are the first IO to queue
// since the rcontext started, set p->qcount_incr = true
// Later in submit_cb:
// - q->count++ if p->qcount_incr.
//
// Finally, in proxy_return_rqu_cb:
// - If parent completed non-yielded work, q->count-- to allow conn
// resumption.
// - At bottom of rqu_cb(), flush any IO queues for the connection in case we
// re-queued work.
int proxy_run_rcontext(mcp_rcontext_t *rctx) {
    int nresults = 0;
    lua_State *Lc = rctx->Lc;
    assert(rctx->lua_narg != 0);
    int cores = lua_resume(Lc, NULL, rctx->lua_narg, &nresults);
    rctx->lua_narg = 1; // reset to default since not-default is uncommon.
    size_t rlen = 0;
    mc_resp *resp = rctx->resp;

    if (cores == LUA_OK) {
        // don't touch the result object if we were a sub-context.
        if (!rctx->parent) {
            int type = lua_type(Lc, 1);
            mcp_resp_t *r = NULL;
            P_DEBUG("%s: coroutine completed. return type: %d\n", __func__, type);
            if (type == LUA_TUSERDATA && (r = luaL_testudata(Lc, 1, "mcp.response")) != NULL) {
                _set_noreply_mode(resp, r);
                if (r->status != MCMC_OK && r->resp.type != MCMC_RESP_ERRMSG) {
                    proxy_out_errstring(resp, PROXY_SERVER_ERROR, "backend failure");
                } else if (r->cresp) {
                    mc_resp *tresp = r->cresp;

                    _proxy_run_tresp_to_resp(tresp, resp);
                    // we let the mcp_resp gc handler free up tresp and any
                    // associated io_pending's of its own later.
                } else if (r->buf) {
                    // response set from C.
                    resp->write_and_free = r->buf;
                    resp_add_iov(resp, r->buf, r->blen);
                    // stash the length to later remove from memory tracking
                    resp->wbytes = r->blen + r->extra;
                    resp->proxy_res = true;
                    r->buf = NULL;
                } else {
                    // Empty response: used for ascii multiget emulation.
                }

            } else if (type == LUA_TSTRING) {
                // response is a raw string from lua.
                const char *s = lua_tolstring(Lc, 1, &rlen);
                size_t l = rlen > WRITE_BUFFER_SIZE ? WRITE_BUFFER_SIZE : rlen;
                memcpy(resp->wbuf, s, l);
                resp_add_iov(resp, resp->wbuf, l);
                lua_pop(Lc, 1);
            } else {
                proxy_out_errstring(resp, PROXY_SERVER_ERROR, "bad response");
            }
        }

        rctx->pending_reqs--;
    } else if (cores == LUA_YIELD) {
        int yield_type = lua_tointeger(Lc, -1);
        P_DEBUG("%s: coroutine yielded. return type: %d\n", __func__, yield_type);
        assert(yield_type != 0);
        lua_pop(Lc, 1);

        int res = 0;
        switch (yield_type) {
            case MCP_YIELD_INTERNAL:
                // stack should be: rq, res
                if (rctx->parent) {
                    LOGGER_LOG(NULL, LOG_PROXYEVENTS, LOGGER_PROXY_ERROR, NULL, "cannot run mcp.internal from a sub request");
                    rctx->pending_reqs--;
                    return LUA_ERRRUN;
                } else {
                    res = mcplib_internal_run(rctx);
                    if (res == 0) {
                        // stack should still be: rq, res
                        // TODO: turn this function into a for loop that re-runs on
                        // certain status codes, to avoid recursive depth here.
                        // or maybe... a goto? :P
                        proxy_run_rcontext(rctx);
                    } else if (res > 0) {
                        // internal run queued for extstore.
                    } else {
                        assert(res < 0);
                        proxy_out_errstring(resp, PROXY_SERVER_ERROR, "bad request");
                    }
                }
                break;
            case MCP_YIELD_WAITCOND:
            case MCP_YIELD_WAITHANDLE:
                // Even if we're in WAITHANDLE, we want to dispatch any queued
                // requests, so we still need to iterate the full set of qslots.
                _proxy_run_rcontext_queues(rctx);
                break;
            case MCP_YIELD_SLEEP:
                // Pause coroutine and do nothing. Alarm will resume.
                break;
            default:
                abort();
        }

    } else {
        // Log the error where it happens, then the parent will handle a
        // result object normally.
        P_DEBUG("%s: Failed to run coroutine: %s\n", __func__, lua_tostring(Lc, -1));
        LOGGER_LOG(NULL, LOG_PROXYEVENTS, LOGGER_PROXY_ERROR, NULL, lua_tostring(Lc, -1));
        if (!rctx->parent) {
            proxy_out_errstring(resp, PROXY_SERVER_ERROR, "lua failure");
        }
        rctx->pending_reqs--;
    }

    return cores;
}

// basically any data before the first key.
// max is like 15ish plus spaces. we can be more strict about how many spaces
// to expect because any client spamming space is being deliberately stupid
// anyway.
#define MAX_CMD_PREFIX 20

static void proxy_process_command(conn *c, char *command, size_t cmdlen, bool multiget) {
    assert(c != NULL);
    LIBEVENT_THREAD *thr = c->thread;
    struct proxy_hook *hooks = thr->proxy_hooks;
    lua_State *L = thr->L;
    proxy_ctx_t *ctx = thr->proxy_ctx;
    mcp_parser_t pr = {0};

    // Avoid doing resp_start() here, instead do it a bit later or as-needed.
    // This allows us to hop over to the internal text protocol parser, which
    // also calls resp_start().
    // Tighter integration later should obviate the need for this, it is not a
    // permanent solution.
    int ret = process_request(&pr, command, cmdlen);
    if (ret != 0) {
        WSTAT_INCR(c->thread, proxy_conn_errors, 1);
        if (!resp_start(c)) {
            conn_set_state(c, conn_closing);
            return;
        }
        proxy_out_errstring(c->resp, PROXY_CLIENT_ERROR, "parsing request");
        if (ret == -2) {
            // Kill connection on more critical parse failure.
            conn_set_state(c, conn_closing);
        }
        return;
    }

    struct proxy_hook *hook = &hooks[pr.command];
    struct proxy_hook_ref hook_ref = hook->ref;
    // if client came from a tagged listener, scan for a more specific hook.
    // TODO: (v2) avoiding a hash table lookup here, but maybe some other
    // datastructure would suffice. for 4-8 tags this is perfectly fast.
    if (c->tag && hook->tagged) {
        struct proxy_hook_tagged *pht = hook->tagged;
        while (pht->ref.lua_ref) {
            if (c->tag == pht->tag) {
                hook_ref = pht->ref;
                break;
            }
            pht++;
        }
    }

    if (!hook_ref.lua_ref) {
        // need to pass our command string into the internal handler.
        // to minimize the code change, this means allowing it to tokenize the
        // full command. The proxy's indirect parser should be built out to
        // become common code for both proxy and ascii handlers.
        // For now this means we have to null-terminate the command string,
        // then call into the text protocol handler.
        // FIXME (v2): use a ptr or something; don't like this code.
        if (cmdlen > 1 && command[cmdlen-2] == '\r') {
            command[cmdlen-2] = '\0';
        } else {
            command[cmdlen-1] = '\0';
        }
        // lets nread_proxy know we're in ascii mode.
        c->proxy_rctx = NULL;
        process_command_ascii(c, command);
        return;
    }

    // If ascii multiget, we turn this into a self-calling loop :(
    // create new request with next key, call this func again, then advance
    // original string.
    // might be better to split this function; the below bits turn into a
    // function call, then we don't re-process the above bits in the same way?
    // The way this is detected/passed on is very fragile.
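    // e.g. "get a b c\r\n" is re-dispatched below as three sub-requests,
    // "get a\r\n", "get b\r\n", "get c\r\n", each flagged PROCESS_MULTIGET,
    // with a single capping "END\r\n" appended once the loop finishes.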
    if (!multiget && pr.cmd_type == CMD_TYPE_GET && pr.has_space) {
        uint32_t keyoff = pr.tokens[pr.keytoken];
        while (pr.klen != 0) {
            char temp[KEY_MAX_LENGTH + MAX_CMD_PREFIX + 30];
            char *cur = temp;
            // Core daemon can abort the entire command if one key is bad, but
            // we cannot from the proxy. Instead we have to inject errors into
            // the stream. This should, thankfully, be rare at least.
            if (pr.tokens[pr.keytoken] > MAX_CMD_PREFIX) {
                if (!resp_start(c)) {
                    conn_set_state(c, conn_closing);
                    return;
                }
                proxy_out_errstring(c->resp, PROXY_CLIENT_ERROR, "malformed request");
            } else if (pr.klen > KEY_MAX_LENGTH) {
                if (!resp_start(c)) {
                    conn_set_state(c, conn_closing);
                    return;
                }
                proxy_out_errstring(c->resp, PROXY_CLIENT_ERROR, "key too long");
            } else {
                // copy original request up until the original key token.
                memcpy(cur, pr.request, pr.tokens[pr.keytoken]);
                cur += pr.tokens[pr.keytoken];

                // now copy in our "current" key.
                memcpy(cur, &pr.request[keyoff], pr.klen);
                cur += pr.klen;

                memcpy(cur, "\r\n", 2);
                cur += 2;

                *cur = '\0';
                P_DEBUG("%s: new multiget sub request: %s [%u/%u]\n", __func__, temp, keyoff, pr.klen);
                proxy_process_command(c, temp, cur - temp, PROCESS_MULTIGET);
            }

            // now advance to the next key.
            keyoff = _process_request_next_key(&pr);
        }

        if (!resp_start(c)) {
            conn_set_state(c, conn_closing);
            return;
        }

        // The above recursions should have created c->resp's in dispatch
        // order.
        // So now we add another one at the end to create the capping END
        // string.
        memcpy(c->resp->wbuf, ENDSTR, ENDLEN);
        resp_add_iov(c->resp, c->resp->wbuf, ENDLEN);

        return;
    }

    // We test the command length all the way down here because multigets can
    // be very long, and they're chopped up by now.
    if (cmdlen >= MCP_REQUEST_MAXLEN) {
        WSTAT_INCR(c->thread, proxy_conn_errors, 1);
        if (!resp_start(c)) {
            conn_set_state(c, conn_closing);
            return;
        }
        proxy_out_errstring(c->resp, PROXY_CLIENT_ERROR, "request too long");
        conn_set_state(c, conn_closing);
        return;
    }

    if (!resp_start(c)) {
        conn_set_state(c, conn_closing);
        return;
    }

    // Count requests handled by proxy vs local.
    // Also batch the counts down this far so we can lock once for the active
    // counter instead of twice.
    struct proxy_int_stats *istats = c->thread->proxy_int_stats;
    uint64_t active_reqs = 0;
    WSTAT_L(c->thread);
    istats->counters[pr.command]++;
    c->thread->stats.proxy_conn_requests++;
    active_reqs = c->thread->stats.proxy_req_active;
    WSTAT_UL(c->thread);

    if (active_reqs >= ctx->active_req_limit) {
        proxy_out_errstring(c->resp, PROXY_SERVER_ERROR, "active request limit reached");
        if (pr.vlen != 0) {
            c->sbytes = pr.vlen;
            conn_set_state(c, conn_swallow);
        }
        return;
    }

    // hook is owned by a function generator.
    mcp_rcontext_t *rctx = mcp_funcgen_start(L, hook_ref.ctx, &pr);
    if (rctx == NULL) {
        proxy_out_errstring(c->resp, PROXY_SERVER_ERROR, "lua start failure");
        if (pr.vlen != 0) {
            c->sbytes = pr.vlen;
            conn_set_state(c, conn_swallow);
        }
        return;
    }

    mcp_set_request(&pr, rctx->request, command, cmdlen);
    rctx->request->ascii_multiget = multiget;
    rctx->c = c;
    rctx->conn_fd = c->sfd;
    rctx->pending_reqs++; // seed counter with the "main" request
    // remember the top level mc_resp, because further requests on the
    // same connection will replace c->resp.
    rctx->resp = c->resp;

    // for the very first call we need to place:
    // - rctx->function_ref + rctx->request_ref
    // I _think_ here is the right place to do that?
    lua_rawgeti(rctx->Lc, LUA_REGISTRYINDEX, rctx->function_ref);
    lua_rawgeti(rctx->Lc, LUA_REGISTRYINDEX, rctx->request_ref);

    if (pr.vlen != 0) {
        c->item = NULL;
        // Need to add the used memory later due to needing an extra callback
        // handler on error during nread.
        bool oom = proxy_bufmem_checkadd(c->thread, 0);

        // relying on temporary malloc's not having fragmentation
        if (!oom) {
            c->item = malloc(pr.vlen);
        }
        if (c->item == NULL) {
            // return the RCTX
            rctx->pending_reqs--;
            mcp_funcgen_return_rctx(rctx);
            // normal cleanup
            lua_settop(L, 0);
            proxy_out_errstring(c->resp, PROXY_SERVER_ERROR, "out of memory");
            c->sbytes = pr.vlen;
            conn_set_state(c, conn_swallow);
            return;
        }
        c->item_malloced = true;
        c->ritem = c->item;
        c->rlbytes = pr.vlen;

        // remember the request context for later.
        c->proxy_rctx = rctx;

        conn_set_state(c, conn_nread);
        return;
    }

    proxy_run_rcontext(rctx);
    mcp_funcgen_return_rctx(rctx);

    lua_settop(L, 0); // clear any junk from the main thread.
}

mcp_resp_t *mcp_prep_bare_resobj(lua_State *L, LIBEVENT_THREAD *t) {
    mcp_resp_t *r = lua_newuserdatauv(L, sizeof(mcp_resp_t), 0);
    // FIXME (v2): is this memset still necessary? I was using it for
    // debugging.
    memset(r, 0, sizeof(mcp_resp_t));
    r->thread = t;
    assert(r->thread != NULL);
    gettimeofday(&r->start, NULL);

    luaL_getmetatable(L, "mcp.response");
    lua_setmetatable(L, -2);

    return r;
}

void mcp_set_resobj(mcp_resp_t *r, mcp_request_t *rq, mcp_backend_t *be, LIBEVENT_THREAD *t) {
    memset(r, 0, sizeof(mcp_resp_t));
    r->buf = NULL;
    r->blen = 0;
    r->thread = t;
    assert(r->thread != NULL);
    gettimeofday(&r->start, NULL);
    // Set noreply mode.
    // TODO (v2): the response "inherits" the request's noreply mode, which isn't
    // strictly correct; we should inherit based on the request that spawned
    // the coroutine but the structure doesn't allow that yet.
    // Should also be able to settle this exact mode from the parser so we
    // don't have to re-branch here.
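    // For noreply we mangle the request bytes forwarded to the backend,
    // presumably so the backend still sends a response the proxy can
    // consume: meta 'q' flags are blanked to spaces and the ascii "noreply"
    // token becomes "noreplY". The client-facing reply is then suppressed
    // later via r->mode in _set_noreply_mode().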
    if (rq->pr.noreply) {
        if (rq->pr.cmd_type == CMD_TYPE_META) {
            r->mode = RESP_MODE_METAQUIET;
            for (int x = 2; x < rq->pr.ntokens; x++) {
                if (rq->request[rq->pr.tokens[x]] == 'q') {
                    rq->request[rq->pr.tokens[x]] = ' ';
                }
            }
        } else {
            r->mode = RESP_MODE_NOREPLY;
            rq->request[rq->pr.reqlen - 3] = 'Y';
        }
    } else {
        r->mode = RESP_MODE_NORMAL;
    }

    r->cmd = rq->pr.command;

    strncpy(r->be_name, be->name, MAX_NAMELEN+1);
    strncpy(r->be_port, be->port, MAX_PORTLEN+1);
}

mcp_resp_t *mcp_prep_resobj(lua_State *L, mcp_request_t *rq, mcp_backend_t *be, LIBEVENT_THREAD *t) {
    mcp_resp_t *r = lua_newuserdatauv(L, sizeof(mcp_resp_t), 0);
    mcp_set_resobj(r, rq, be, t);

    luaL_getmetatable(L, "mcp.response");
    lua_setmetatable(L, -2);

    return r;
}

void mcp_resp_set_elapsed(mcp_resp_t *r) {
    struct timeval end;
    // stamp the elapsed time into the response object.
    gettimeofday(&end, NULL);
    r->elapsed = (end.tv_sec - r->start.tv_sec) * 1000000 +
        (end.tv_usec - r->start.tv_usec);
}
1301 
// Used for any cases where we're queueing requests to the IO subsystem.
// NOTE: it's not currently possible to limit the memory used by the IO
// object cache, so the allocation below should always succeed and callers
// may proceed as though it did; the NULL check is defensive.
io_pending_proxy_t *mcp_queue_rctx_io(mcp_rcontext_t *rctx, mcp_request_t *rq, mcp_backend_t *be, mcp_resp_t *r) {
    conn *c = rctx->c;
    io_queue_t *q = conn_io_queue_get(c, IO_QUEUE_PROXY);
    io_pending_proxy_t *p = do_cache_alloc(c->thread->io_cache);
    if (p == NULL) {
        WSTAT_INCR(c->thread, proxy_conn_oom, 1);
        proxy_lua_error(rctx->Lc, "out of memory allocating from IO cache");
        // NOTE: the error call above jumps to an error handler, so this
        // never actually returns.
        return NULL;
    }

    // this is a re-cast structure, so assert that we never outsize it.
    assert(sizeof(io_pending_t) >= sizeof(io_pending_proxy_t));
    memset(p, 0, sizeof(io_pending_proxy_t));
    // set up back references.
    p->io_queue_type = IO_QUEUE_PROXY;
    p->thread = c->thread;
    p->c = c;
    p->client_resp = r;
    p->flushed = false;
    p->return_cb = NULL;
    p->finalize_cb = proxy_finalize_rctx_cb;

    // pass along the request context for resumption.
    p->rctx = rctx;

    if (rq) {
        p->ascii_multiget = rq->ascii_multiget;
        // The direct backend object. Lc holds the reference on its stack.
        p->backend = be;

        mcp_request_attach(rq, p);
    }

    // HACK
    // find the root (top-most parent) rctx
    while (rctx->parent) {
        rctx = rctx->parent;
    }
    // Hack to ensure only the first iop increments the client IO queue
    // counter.
    if (!rctx->first_queue) {
        rctx->first_queue = true;
        p->qcount_incr = true;
    }
    // END HACK

    // link into the batch chain.
    p->next = q->stack_ctx;
    q->stack_ctx = p;
    P_DEBUG("%s: queued\n", __func__);

    return p;
}
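
// Usage sketch (illustrative, not a verbatim call site): a dispatch path
// typically prepares the response object on the coroutine's stack and then
// queues the IO against it:
//
//   mcp_resp_t *r = mcp_prep_resobj(rctx->Lc, rq, be, rctx->c->thread);
//   io_pending_proxy_t *p = mcp_queue_rctx_io(rctx, rq, be, r);
//   // p is now linked into the connection's IO_QUEUE_PROXY batch and the
//   // rcontext will be resumed once the backend responds.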

// DO NOT call this method frequently! globally locked!
void mcp_sharedvm_delta(proxy_ctx_t *ctx, int tidx, const char *name, int delta) {
    lua_State *L = ctx->proxy_sharedvm;
    pthread_mutex_lock(&ctx->sharedvm_lock);

    if (lua_getfield(L, tidx, name) == LUA_TNIL) {
        lua_pop(L, 1);
        lua_pushinteger(L, delta);
        lua_setfield(L, tidx, name);
    } else {
        lua_pushinteger(L, delta);
        lua_arith(L, LUA_OPADD);
        lua_setfield(L, tidx, name);
    }

    pthread_mutex_unlock(&ctx->sharedvm_lock);
}

void mcp_sharedvm_remove(proxy_ctx_t *ctx, int tidx, const char *name) {
    lua_State *L = ctx->proxy_sharedvm;
    pthread_mutex_lock(&ctx->sharedvm_lock);

    lua_pushnil(L);
    lua_setfield(L, tidx, name);

    pthread_mutex_unlock(&ctx->sharedvm_lock);
}
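
// Usage sketch (illustrative; the table index constant is hypothetical):
// worker code can publish named counters into the shared VM and retract
// them later:
//
//   mcp_sharedvm_delta(ctx, SHAREDVM_FGEN_IDX, fgen->name, 1);  // increment
//   mcp_sharedvm_delta(ctx, SHAREDVM_FGEN_IDX, fgen->name, -1); // decrement
//   mcp_sharedvm_remove(ctx, SHAREDVM_FGEN_IDX, fgen->name);    // drop entry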

// Global object support code.
// Global objects are created in the configuration VM and referenced in
// worker VMs via proxy objects that refer back to memory in the
// configuration VM.
// We manage reference counts: once all remote proxy objects are collected,
// we signal the config thread to drop the final self-reference and run its
// garbage collector, removing the global object.

static void mcp_gobj_enqueue(proxy_ctx_t *ctx, struct mcp_globalobj_s *g) {
    pthread_mutex_lock(&ctx->manager_lock);
    STAILQ_INSERT_TAIL(&ctx->manager_head, g, next);
    pthread_cond_signal(&ctx->manager_cond);
    pthread_mutex_unlock(&ctx->manager_lock);
}

// References the object, initializing the self-reference if necessary.
// Call from the config thread, with the global object on top of the stack.
void mcp_gobj_ref(lua_State *L, struct mcp_globalobj_s *g) {
    pthread_mutex_lock(&g->lock);
    if (g->self_ref == 0) {
        // Initialization requires a small dance:
        // - store the negative of our registry ref and bump the refcount an
        //   extra time
        // - then link and signal the manager thread as though we were
        //   GC'ing the object
        // - the manager thread will later acknowledge the initialization of
        //   this global object and negate self_ref again
        // - this prevents an unused proxy object from causing the global
        //   object to be reaped early while we are still copying it to
        //   worker threads, as the manager thread will block waiting for
        //   the config thread to finish its reload work.
        g->self_ref = -luaL_ref(L, LUA_REGISTRYINDEX);
        g->refcount++;
        proxy_ctx_t *ctx = PROXY_GET_CTX(L);
        mcp_gobj_enqueue(ctx, g);
    } else {
        lua_pop(L, 1); // drop the object; we already hold a self-reference.
    }
    g->refcount++;
    pthread_mutex_unlock(&g->lock);
}

void mcp_gobj_unref(proxy_ctx_t *ctx, struct mcp_globalobj_s *g) {
    pthread_mutex_lock(&g->lock);
    g->refcount--;
    if (g->refcount == 0) {
        mcp_gobj_enqueue(ctx, g);
    }
    pthread_mutex_unlock(&g->lock);
}

void mcp_gobj_finalize(struct mcp_globalobj_s *g) {
    pthread_mutex_destroy(&g->lock);
}
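
// Lifecycle sketch (hypothetical __gc handler; the names are illustrative):
// a worker VM's proxy object drops its reference when collected, which
// queues the global object to the manager thread once the count hits zero:
//
//   static int mcpp_widget_gc(lua_State *L) {
//       struct mcp_widget_proxy *wp = lua_touserdata(L, 1);
//       mcp_gobj_unref(wp->ctx, &wp->main->g);
//       return 0;
//   }
//
// The manager thread then releases self_ref in the config VM so a later GC
// cycle there can collect the global object, ending in mcp_gobj_finalize().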

static void *mcp_profile_alloc(void *ud, void *ptr, size_t osize,
                               size_t nsize) {
    struct mcp_memprofile *prof = ud;
    struct timespec now;
    clock_gettime(CLOCK_MONOTONIC, &now);
    enum mcp_memprofile_types t = mcp_memp_free;
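    // Per the Lua allocator contract, when ptr is NULL this is a fresh
    // allocation and osize carries the type tag of the object being created
    // (LUA_TSTRING, LUA_TTABLE, etc) rather than an old block size.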
    if (ptr == NULL) {
        switch (osize) {
            case LUA_TSTRING:
                t = mcp_memp_string;
                //fprintf(stderr, "alloc string: %ld\n", nsize);
                break;
            case LUA_TTABLE:
                t = mcp_memp_table;
                //fprintf(stderr, "alloc table: %ld\n", nsize);
                break;
            case LUA_TFUNCTION:
                t = mcp_memp_func;
                //fprintf(stderr, "alloc func: %ld\n", nsize);
                break;
            case LUA_TUSERDATA:
                t = mcp_memp_userdata;
                //fprintf(stderr, "alloc userdata: %ld\n", nsize);
                break;
            case LUA_TTHREAD:
                t = mcp_memp_thread;
                //fprintf(stderr, "alloc thread: %ld\n", nsize);
                break;
            default:
                t = mcp_memp_default;
                //fprintf(stderr, "alloc osize: %ld nsize: %ld\n", osize, nsize);
        }
        prof->allocs[t]++;
        prof->alloc_bytes[t] += nsize;
    } else {
        if (nsize != 0) {
            prof->allocs[mcp_memp_realloc]++;
            prof->alloc_bytes[mcp_memp_realloc] += nsize;
        } else {
            prof->allocs[mcp_memp_free]++;
            prof->alloc_bytes[mcp_memp_free] += osize;
        }
        //fprintf(stderr, "realloc: osize: %ld nsize: %ld\n", osize, nsize);
    }

    if (now.tv_sec != prof->last_status.tv_sec) {
        prof->last_status.tv_sec = now.tv_sec;
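        // Counter indices follow enum mcp_memprofile_types; totals are
        // emitted at most once per second, then reset below.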
        fprintf(stderr, "MEMPROF[%d]:\tstring[%llu][%llu] table[%llu][%llu] func[%llu][%llu] udata[%llu][%llu] thr[%llu][%llu] def[%llu][%llu] realloc[%llu][%llu] free[%llu][%llu]\n",
                prof->id,
                (unsigned long long)prof->allocs[1],
                (unsigned long long)prof->alloc_bytes[1],
                (unsigned long long)prof->allocs[2],
                (unsigned long long)prof->alloc_bytes[2],
                (unsigned long long)prof->allocs[3],
                (unsigned long long)prof->alloc_bytes[3],
                (unsigned long long)prof->allocs[4],
                (unsigned long long)prof->alloc_bytes[4],
                (unsigned long long)prof->allocs[5],
                (unsigned long long)prof->alloc_bytes[5],
                (unsigned long long)prof->allocs[6],
                (unsigned long long)prof->alloc_bytes[6],
                (unsigned long long)prof->allocs[7],
                (unsigned long long)prof->alloc_bytes[7],
                (unsigned long long)prof->allocs[0],
                (unsigned long long)prof->alloc_bytes[0]);
        for (int x = 0; x < 8; x++) {
            prof->allocs[x] = 0;
            prof->alloc_bytes[x] = 0;
        }
    }

    if (nsize == 0) {
        free(ptr);
        return NULL;
    } else {
        return realloc(ptr, nsize);
    }
}
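
// Wiring sketch (illustrative): the profiler is installed by handing this
// allocator to lua_newstate() along with a caller-owned profile struct;
// "thread_idx" here is a hypothetical identifier source:
//
//   struct mcp_memprofile *prof = calloc(1, sizeof(*prof));
//   prof->id = thread_idx;
//   lua_State *L = lua_newstate(mcp_profile_alloc, prof);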

// Common lua debug command.
__attribute__((unused)) void dump_stack(lua_State *L, const char *msg) {
    int top = lua_gettop(L);
    fprintf(stderr, "--TOP OF STACK [%d] | %s\n", top, msg);
    for (int i = 1; i <= top; i++) {
        int type = lua_type(L, i);
        void *udata = NULL;
        // if the value has a metatable, use its __name to identify it.
        if (lua_getmetatable(L, i) != 0) {
            lua_pushstring(L, "__name");
            if (lua_rawget(L, -2) != LUA_TNIL) {
                if (type == LUA_TUSERDATA) {
                    udata = lua_touserdata(L, i);
                }
                fprintf(stderr, "--|%d| [%s] (%s) [ptr: %p]\n", i, lua_typename(L, type), lua_tostring(L, -1), udata);
                lua_pop(L, 2);
                continue;
            }
            lua_pop(L, 2);
        }
        if (type == LUA_TSTRING) {
            fprintf(stderr, "--|%d| [%s] | %s\n", i, lua_typename(L, type), lua_tostring(L, i));
        } else {
            if (type == LUA_TUSERDATA) {
                udata = lua_touserdata(L, i);
            }
            fprintf(stderr, "--|%d| [%s] [ptr: %p]\n", i, lua_typename(L, type), udata);
        }
    }
    fprintf(stderr, "-----------------\n");
}
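
// These dump_* helpers are intentionally compiled in (and marked unused) so
// they can be called ad-hoc while debugging, either from temporary code or
// directly from a debugger, e.g.:
//
//   dump_stack(L, "after attaching request");
//   dump_registry(L, "post-reload");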

// Not very pretty, but it has helped.
// Nice to haves:
// - summarize counts for each metatable (easy enough to do from logging)
// - use a less noisy stack dump instead of calling dump_stack()
__attribute__((unused)) void dump_registry(lua_State *L, const char *msg) {
    int ref_size = lua_rawlen(L, LUA_REGISTRYINDEX);
    fprintf(stderr, "--LUA REGISTRY TABLE [%d] | %s\n", ref_size, msg);
    // walk the registry, counting entries by type.
    int ridx = lua_absindex(L, LUA_REGISTRYINDEX);
    int udata = 0;
    int number = 0;
    int string = 0;
    int function = 0;
    int table = 0;
    lua_pushnil(L);
    while (lua_next(L, ridx) != 0) {
        dump_stack(L, "===registry entry===");
        int type = lua_type(L, -1);
        if (type == LUA_TUSERDATA) {
            udata++;
        } else if (type == LUA_TNUMBER) {
            number++;
        } else if (type == LUA_TSTRING) {
            string++;
        } else if (type == LUA_TFUNCTION) {
            function++;
        } else if (type == LUA_TTABLE) {
            table++;
        }
        lua_pop(L, 1); // drop value
    }
    fprintf(stderr, "SUMMARY:\n\n");
    fprintf(stderr, "### UDATA\t[%d]\n", udata);
    fprintf(stderr, "### NUMBER\t[%d]\n", number);
    fprintf(stderr, "### STRING\t[%d]\n", string);
    fprintf(stderr, "### FUNCTION\t[%d]\n", function);
    fprintf(stderr, "### TABLE\t[%d]\n", table);
    fprintf(stderr, "-----------------\n");
}

// Searches for a function generator with a specific name attached.
// Adding breakpoints on the print lines lets you inspect the fgen and its
// slots.
__attribute__((unused)) void dump_funcgen(lua_State *L, const char *name, const char *msg) {
    int ref_size = lua_rawlen(L, LUA_REGISTRYINDEX);
    fprintf(stderr, "--LUA FUNCGEN FINDER [%d] | %s\n", ref_size, msg);
    // walk registry
    int ridx = lua_absindex(L, LUA_REGISTRYINDEX);
    lua_pushnil(L);
    while (lua_next(L, ridx) != 0) {
        int type = lua_type(L, -1);
        if (type == LUA_TUSERDATA) {
            mcp_funcgen_t *f = luaL_testudata(L, -1, "mcp.funcgen");
            if (f != NULL && strcmp(name, f->name) == 0) {
                fprintf(stderr, "===found funcgen [%s] [%p]===\n", f->name, (void *)f);
                lua_getiuservalue(L, -1, 1);
                int tidx = lua_absindex(L, -1);
                lua_pushnil(L);
                while (lua_next(L, tidx) != 0) {
                    mcp_rcontext_t *rctx = lua_touserdata(L, -1);
                    if (rctx != NULL) {
                        fprintf(stderr, "-- slot: [%p]\n", (void *)rctx);
                    }
                    lua_pop(L, 1); // drop value
                }
                lua_pop(L, 1); // drop slot table
            }
        }
        lua_pop(L, 1); // drop value
    }
    fprintf(stderr, "-----------------\n");
}

static void dump_pool_info(mcp_pool_t *p) {
    fprintf(stderr, "--pool: [%s] size: [%d] be_total: [%d] rc: [%d] io: [%d]\n",
            p->beprefix, p->pool_size, p->pool_be_total, p->g.refcount, p->use_iothread);

    for (int x = 0; x < p->pool_be_total; x++) {
        mcp_backend_t *be = p->pool[x].be;
        // Dumb: pool_be_total is wrong if the pool is using the iothread. Why?
        if (be != NULL) {
            fprintf(stderr, "  --be[%d] label: [%s] name: [%s] conns: [%d] depth: [%d]\n",
                    x, be->label, be->name, be->conncount, be->depth);
            for (int i = 0; i < be->conncount; i++) {
                struct mcp_backendconn_s *bec = &be->be[i];
                fprintf(stderr, "    --bec[%d] bad: [%d] failcnt: [%d] depth: [%d] state: [%d] can_write[%d] write_event[%d]\n",
                        i, bec->bad, bec->failed_count, bec->depth, bec->state, bec->can_write, event_pending(&bec->timeout_event, EV_WRITE, NULL));
            }
        }
    }
    fprintf(stderr, "=======\n");
}

// Dumps some info about pools.
// If given the config thread's VM, it should find the main pools directly.
// If given a worker thread's VM, it will look for the pool proxy objects and
// find the main pools through them.
__attribute__((unused)) void dump_pools(lua_State *L, const char *msg) {
    int ref_size = lua_rawlen(L, LUA_REGISTRYINDEX);
    fprintf(stderr, "--LUA POOL DUMPER [%d] | %s\n", ref_size, msg);
    // walk registry
    int ridx = lua_absindex(L, LUA_REGISTRYINDEX);
    lua_pushnil(L);
    while (lua_next(L, ridx) != 0) {
        int type = lua_type(L, -1);
        if (type == LUA_TUSERDATA) {
            mcp_pool_t *p = luaL_testudata(L, -1, "mcp.pool");
            if (p != NULL) {
                dump_pool_info(p);
            } else {
                mcp_pool_proxy_t *pp = luaL_testudata(L, -1, "mcp.pool_proxy");
                if (pp != NULL) {
                    dump_pool_info(pp->main);
                }
            }
        }
        lua_pop(L, 1); // drop value
    }
    fprintf(stderr, "-----------------\n");
}