1 /* -*- Mode: C; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2 /*
3 * Functions for handling the proxy layer. wraps text protocols
4 *
5 * NOTE: many lua functions generate pointers via "lua_newuserdatauv" or
6 * similar. Normal memory checking isn't done as lua will throw a high level
7 * error if malloc fails. Must keep this in mind while allocating data so any
8 * manually malloc'ed information gets freed properly.
9 */
10
11 #include "proxy.h"
12
13 #define PROCESS_MULTIGET true
14 #define PROCESS_NORMAL false
15 #define PROXY_GC_BACKGROUND_SECONDS 2
16 static void proxy_process_command(conn *c, char *command, size_t cmdlen, bool multiget);
17 static void *mcp_profile_alloc(void *ud, void *ptr, size_t osize, size_t nsize);
18
19 /******** EXTERNAL FUNCTIONS ******/
20 // functions starting with _ are breakouts for the public functions.
21
_proxy_advance_lastkb(lua_State * L,LIBEVENT_THREAD * t)22 static inline void _proxy_advance_lastkb(lua_State *L, LIBEVENT_THREAD *t) {
23 int new_kb = lua_gc(L, LUA_GCCOUNT);
24 // We need to slew the increase in "gc pause" because the lua GC actually
25 // needs to run twice to free a userdata: once to run the _gc's and again
26 // to actually clean up the object.
27 // Meaning we will continually increase in size.
28 if (new_kb > t->proxy_vm_last_kb) {
29 new_kb = t->proxy_vm_last_kb + (new_kb - t->proxy_vm_last_kb) * 0.50;
30 }
31
32 // remove the memory freed during this cycle so we can kick off the GC
33 // early if we're very aggressively making garbage.
34 // carry our negative delta forward so a huge reclaim can push for a
35 // couple cycles.
36 if (t->proxy_vm_negative_delta >= new_kb) {
37 t->proxy_vm_negative_delta -= new_kb;
38 new_kb = 1;
39 } else {
40 new_kb -= t->proxy_vm_negative_delta;
41 t->proxy_vm_negative_delta = 0;
42 }
43
44 t->proxy_vm_last_kb = new_kb;
45 }
46
47 // The lua GC is paused while running requests. Run it manually inbetween
48 // processing network events.
proxy_gc_poke(LIBEVENT_THREAD * t)49 void proxy_gc_poke(LIBEVENT_THREAD *t) {
50 lua_State *L = t->L;
51 struct proxy_int_stats *is = t->proxy_int_stats;
52 int vm_kb = lua_gc(L, LUA_GCCOUNT) + t->proxy_vm_extra_kb;
53 if (t->proxy_vm_last_kb == 0) {
54 t->proxy_vm_last_kb = vm_kb;
55 }
56 WSTAT_L(t);
57 is->vm_memory_kb = vm_kb;
58 WSTAT_UL(t);
59
60 // equivalent of luagc "pause" value
61 int last = t->proxy_vm_last_kb;
62 if (t->proxy_vm_gcrunning <= 0 && vm_kb > last * 2) {
63 t->proxy_vm_gcrunning = 1;
64 //fprintf(stderr, "PROXYGC: proxy_gc_poke START [cur: %d - last: %d]\n", vm_kb, last);
65 }
66
67 // We configure small GC "steps" then increase the number of times we run
68 // a step based on current memory usage.
69 if (t->proxy_vm_gcrunning > 0) {
70 t->proxy_vm_needspoke = false;
71 int loops = t->proxy_vm_gcrunning;
72 int done = 0;
73 /*fprintf(stderr, "PROXYGC: proxy_gc_poke [cur: %d - last: %d - loops: %d]\n",
74 vm_kb,
75 t->proxy_vm_last_kb,
76 loops);*/
77 while (loops-- && !done) {
78 // reset counters once full GC cycle has completed
79 done = lua_gc(L, LUA_GCSTEP, 0);
80 }
81
82 int vm_kb_after = lua_gc(L, LUA_GCCOUNT);
83 int vm_kb_clean = vm_kb - t->proxy_vm_extra_kb;
84 if (vm_kb_clean > vm_kb_after) {
85 // track the amount of memory freed during the GC cycle.
86 t->proxy_vm_negative_delta += vm_kb_clean - vm_kb_after;
87 }
88
89 if (done) {
90 _proxy_advance_lastkb(L, t);
91 t->proxy_vm_extra_kb = 0;
92 t->proxy_vm_gcrunning = 0;
93 WSTAT_L(t);
94 is->vm_gc_runs++;
95 WSTAT_UL(t);
96 //fprintf(stderr, "PROXYGC: proxy_gc_poke COMPLETE [cur: %d next: %d]\n", lua_gc(L, LUA_GCCOUNT), t->proxy_vm_last_kb);
97 }
98
99 // increase the aggressiveness by memory bloat level.
100 if (t->proxy_vm_gcrunning && (last*2) + (last * t->proxy_vm_gcrunning*0.25) < vm_kb) {
101 t->proxy_vm_gcrunning++;
102 //fprintf(stderr, "PROXYGC: proxy_gc_poke INCREASING AGGRESSIVENESS [cur: %d - aggro: %d]\n", t->proxy_vm_last_kb, t->proxy_vm_gcrunning);
103 } else if (t->proxy_vm_gcrunning > 1) {
104 // memory can drop during a run, let the GC slow down again.
105 t->proxy_vm_gcrunning--;
106 //fprintf(stderr, "PROXYGC: proxy_gc_poke DECREASING AGGRESSIVENESS [cur: %d - aggro: %d]\n", t->proxy_vm_last_kb, t->proxy_vm_gcrunning);
107 }
108 }
109 }
110
111 // every couple seconds we force-run one GC step.
112 // this is needed until after API1 is retired and pool objects are no longer
113 // managed by the GC.
114 // We use a negative value so a "timer poke" GC run doesn't cause requests to
115 // suddenly aggressively run the GC.
proxy_gc_timerpoke(evutil_socket_t fd,short event,void * arg)116 static void proxy_gc_timerpoke(evutil_socket_t fd, short event, void *arg) {
117 LIBEVENT_THREAD *t = arg;
118 struct timeval next = { PROXY_GC_BACKGROUND_SECONDS, 0 };
119 evtimer_add(t->proxy_gc_timer, &next);
120 // if GC ran within the last few seconds, don't do anything.
121 if (!t->proxy_vm_needspoke) {
122 t->proxy_vm_needspoke = true;
123 return;
124 }
125
126 // if we weren't told to skip and there's otherwise no GC running, start a
127 // GC run.
128 if (t->proxy_vm_gcrunning == 0) {
129 t->proxy_vm_gcrunning = -1;
130 }
131
132 // only advance GC if we're doing our own timer run.
133 if (t->proxy_vm_gcrunning == -1 && lua_gc(t->L, LUA_GCSTEP, 0)) {
134 _proxy_advance_lastkb(t->L, t);
135 t->proxy_vm_extra_kb = 0;
136 t->proxy_vm_gcrunning = 0;
137 }
138 }
139
proxy_bufmem_checkadd(LIBEVENT_THREAD * t,int len)140 bool proxy_bufmem_checkadd(LIBEVENT_THREAD *t, int len) {
141 bool oom = false;
142 pthread_mutex_lock(&t->proxy_limit_lock);
143 if (t->proxy_buffer_memory_used > t->proxy_buffer_memory_limit) {
144 oom = true;
145 } else {
146 t->proxy_buffer_memory_used += len;
147 }
148 pthread_mutex_unlock(&t->proxy_limit_lock);
149 return oom;
150 }
151
152 // see also: process_extstore_stats()
proxy_stats(void * arg,ADD_STAT add_stats,void * c)153 void proxy_stats(void *arg, ADD_STAT add_stats, void *c) {
154 if (arg == NULL) {
155 return;
156 }
157 proxy_ctx_t *ctx = arg;
158
159 STAT_L(ctx);
160 APPEND_STAT("proxy_config_reloads", "%llu", (unsigned long long)ctx->global_stats.config_reloads);
161 APPEND_STAT("proxy_config_reload_fails", "%llu", (unsigned long long)ctx->global_stats.config_reload_fails);
162 APPEND_STAT("proxy_config_cron_runs", "%llu", (unsigned long long)ctx->global_stats.config_cron_runs);
163 APPEND_STAT("proxy_config_cron_fails", "%llu", (unsigned long long)ctx->global_stats.config_cron_fails);
164 APPEND_STAT("proxy_backend_total", "%llu", (unsigned long long)ctx->global_stats.backend_total);
165 APPEND_STAT("proxy_backend_marked_bad", "%llu", (unsigned long long)ctx->global_stats.backend_marked_bad);
166 APPEND_STAT("proxy_backend_failed", "%llu", (unsigned long long)ctx->global_stats.backend_failed);
167 APPEND_STAT("proxy_request_failed_depth", "%llu", (unsigned long long)ctx->global_stats.request_failed_depth);
168 STAT_UL(ctx);
169 }
170
process_proxy_stats(void * arg,ADD_STAT add_stats,void * c)171 void process_proxy_stats(void *arg, ADD_STAT add_stats, void *c) {
172 char key_str[STAT_KEY_LEN];
173 struct proxy_int_stats istats = {0};
174 uint64_t req_limit = 0;
175 uint64_t buffer_memory_limit = 0;
176 uint64_t buffer_memory_used = 0;
177
178 if (!arg) {
179 return;
180 }
181 proxy_ctx_t *ctx = arg;
182 STAT_L(ctx);
183 req_limit = ctx->active_req_limit;
184 buffer_memory_limit = ctx->buffer_memory_limit;
185
186 // prepare aggregated counters.
187 struct proxy_user_stats_entry *us = ctx->user_stats;
188 int stats_num = ctx->user_stats_num;
189 uint64_t counters[stats_num];
190 memset(counters, 0, sizeof(counters));
191
192 // TODO (v3): more globals to remove and/or change API method.
193 // aggregate worker thread counters.
194 for (int x = 0; x < settings.num_threads; x++) {
195 LIBEVENT_THREAD *t = get_worker_thread(x);
196 struct proxy_user_stats *tus = t->proxy_user_stats;
197 struct proxy_int_stats *is = t->proxy_int_stats;
198 WSTAT_L(t);
199 for (int i = 0; i < CMD_FINAL; i++) {
200 istats.counters[i] += is->counters[i];
201 }
202 istats.vm_gc_runs += is->vm_gc_runs;
203 istats.vm_memory_kb += is->vm_memory_kb;
204 if (tus && tus->num_stats >= stats_num) {
205 for (int i = 0; i < stats_num; i++) {
206 counters[i] += tus->counters[i];
207 }
208 }
209 WSTAT_UL(t);
210 pthread_mutex_lock(&t->proxy_limit_lock);
211 buffer_memory_used += t->proxy_buffer_memory_used;
212 pthread_mutex_unlock(&t->proxy_limit_lock);
213 }
214
215 // return all of the user generated stats
216 if (ctx->user_stats_namebuf) {
217 char vbuf[INCR_MAX_STORAGE_LEN];
218 char *e = NULL; // ptr into vbuf
219 const char *pfx = "user_";
220 const size_t pfxlen = strlen(pfx);
221 for (int x = 0; x < stats_num; x++) {
222 if (us[x].cname) {
223 char *name = ctx->user_stats_namebuf + us[x].cname;
224 size_t nlen = strlen(name);
225 if (nlen > STAT_KEY_LEN-6) {
226 // impossible, but for paranoia.
227 nlen = STAT_KEY_LEN-6;
228 }
229 // avoiding an snprintf call for some performance ("user_%s")
230 memcpy(key_str, pfx, pfxlen);
231 memcpy(key_str+pfxlen, name, nlen);
232 key_str[pfxlen+nlen] = '\0';
233
234 // APPEND_STAT() calls another snprintf, which calls our
235 // add_stats argument. Lets skip yet another snprintf with
236 // some unrolling.
237 e = itoa_u64(counters[x], vbuf);
238 *(e+1) = '\0';
239 add_stats(key_str, pfxlen+nlen, vbuf, e-vbuf, c);
240 }
241 }
242 }
243
244 STAT_UL(ctx);
245
246 if (buffer_memory_limit == UINT64_MAX) {
247 buffer_memory_limit = 0;
248 } else {
249 buffer_memory_limit *= settings.num_threads;
250 }
251 if (req_limit == UINT64_MAX) {
252 req_limit = 0;
253 } else {
254 req_limit *= settings.num_threads;
255 }
256
257 // return proxy counters
258 APPEND_STAT("active_req_limit", "%llu", (unsigned long long)req_limit);
259 APPEND_STAT("buffer_memory_limit", "%llu", (unsigned long long)buffer_memory_limit);
260 APPEND_STAT("buffer_memory_used", "%llu", (unsigned long long)buffer_memory_used);
261 APPEND_STAT("vm_gc_runs", "%llu", (unsigned long long)istats.vm_gc_runs);
262 APPEND_STAT("vm_memory_kb", "%llu", (unsigned long long)istats.vm_memory_kb);
263 APPEND_STAT("cmd_mg", "%llu", (unsigned long long)istats.counters[CMD_MG]);
264 APPEND_STAT("cmd_ms", "%llu", (unsigned long long)istats.counters[CMD_MS]);
265 APPEND_STAT("cmd_md", "%llu", (unsigned long long)istats.counters[CMD_MD]);
266 APPEND_STAT("cmd_mn", "%llu", (unsigned long long)istats.counters[CMD_MN]);
267 APPEND_STAT("cmd_ma", "%llu", (unsigned long long)istats.counters[CMD_MA]);
268 APPEND_STAT("cmd_me", "%llu", (unsigned long long)istats.counters[CMD_ME]);
269 APPEND_STAT("cmd_get", "%llu", (unsigned long long)istats.counters[CMD_GET]);
270 APPEND_STAT("cmd_gat", "%llu", (unsigned long long)istats.counters[CMD_GAT]);
271 APPEND_STAT("cmd_set", "%llu", (unsigned long long)istats.counters[CMD_SET]);
272 APPEND_STAT("cmd_add", "%llu", (unsigned long long)istats.counters[CMD_ADD]);
273 APPEND_STAT("cmd_cas", "%llu", (unsigned long long)istats.counters[CMD_CAS]);
274 APPEND_STAT("cmd_gets", "%llu", (unsigned long long)istats.counters[CMD_GETS]);
275 APPEND_STAT("cmd_gats", "%llu", (unsigned long long)istats.counters[CMD_GATS]);
276 APPEND_STAT("cmd_incr", "%llu", (unsigned long long)istats.counters[CMD_INCR]);
277 APPEND_STAT("cmd_decr", "%llu", (unsigned long long)istats.counters[CMD_DECR]);
278 APPEND_STAT("cmd_touch", "%llu", (unsigned long long)istats.counters[CMD_TOUCH]);
279 APPEND_STAT("cmd_append", "%llu", (unsigned long long)istats.counters[CMD_APPEND]);
280 APPEND_STAT("cmd_prepend", "%llu", (unsigned long long)istats.counters[CMD_PREPEND]);
281 APPEND_STAT("cmd_delete", "%llu", (unsigned long long)istats.counters[CMD_DELETE]);
282 APPEND_STAT("cmd_replace", "%llu", (unsigned long long)istats.counters[CMD_REPLACE]);
283 }
284
process_proxy_funcstats(void * arg,ADD_STAT add_stats,void * c)285 void process_proxy_funcstats(void *arg, ADD_STAT add_stats, void *c) {
286 char key_str[STAT_KEY_LEN];
287 if (!arg) {
288 return;
289 }
290 proxy_ctx_t *ctx = arg;
291 lua_State *L = ctx->proxy_sharedvm;
292 pthread_mutex_lock(&ctx->sharedvm_lock);
293
294 // iterate all of the named function slots
295 lua_pushnil(L);
296 while (lua_next(L, SHAREDVM_FGEN_IDX) != 0) {
297 int n = lua_tointeger(L, -1);
298 lua_pop(L, 1); // drop the value, leave the key.
299 if (n != 0) {
300 // reuse the key. make a copy since rawget will pop it.
301 lua_pushvalue(L, -1);
302 lua_rawget(L, SHAREDVM_FGENSLOT_IDX);
303 int slots = lua_tointeger(L, -1);
304 lua_pop(L, 1); // drop the slot count.
305
306 // now grab the name key.
307 const char *name = lua_tostring(L, -1);
308 snprintf(key_str, STAT_KEY_LEN-1, "funcs_%s", name);
309 APPEND_STAT(key_str, "%d", n);
310 snprintf(key_str, STAT_KEY_LEN-1, "slots_%s", name);
311 APPEND_STAT(key_str, "%d", slots);
312 } else {
313 // TODO: It is safe to delete keys here. Slightly complex so low
314 // priority.
315 }
316 }
317
318 pthread_mutex_unlock(&ctx->sharedvm_lock);
319 }
320
process_proxy_bestats(void * arg,ADD_STAT add_stats,void * c)321 void process_proxy_bestats(void *arg, ADD_STAT add_stats, void *c) {
322 char key_str[STAT_KEY_LEN];
323 if (!arg) {
324 return;
325 }
326 proxy_ctx_t *ctx = arg;
327 lua_State *L = ctx->proxy_sharedvm;
328 pthread_mutex_lock(&ctx->sharedvm_lock);
329
330 // iterate all of the listed backends
331 lua_pushnil(L);
332 while (lua_next(L, SHAREDVM_BACKEND_IDX) != 0) {
333 int n = lua_tointeger(L, -1);
334 lua_pop(L, 1); // drop the value, leave the key.
335 if (n != 0) {
336 // now grab the name key.
337 const char *name = lua_tostring(L, -1);
338 snprintf(key_str, STAT_KEY_LEN-1, "bad_%s", name);
339 APPEND_STAT(key_str, "%d", n);
340 } else {
341 // delete keys of backends that are no longer bad or no longer
342 // exist to keep the table small.
343 const char *name = lua_tostring(L, -1);
344 lua_pushnil(L);
345 lua_setfield(L, SHAREDVM_BACKEND_IDX, name);
346 }
347 }
348
349 pthread_mutex_unlock(&ctx->sharedvm_lock);
350 }
351
352 // start the centralized lua state and config thread.
proxy_init(bool use_uring,bool proxy_memprofile)353 void *proxy_init(bool use_uring, bool proxy_memprofile) {
354 proxy_ctx_t *ctx = calloc(1, sizeof(proxy_ctx_t));
355 ctx->use_uring = use_uring;
356 ctx->memprofile = proxy_memprofile;
357
358 pthread_mutex_init(&ctx->config_lock, NULL);
359 pthread_cond_init(&ctx->config_cond, NULL);
360 pthread_mutex_init(&ctx->worker_lock, NULL);
361 pthread_cond_init(&ctx->worker_cond, NULL);
362 pthread_mutex_init(&ctx->manager_lock, NULL);
363 pthread_cond_init(&ctx->manager_cond, NULL);
364 pthread_mutex_init(&ctx->stats_lock, NULL);
365
366 ctx->active_req_limit = UINT64_MAX;
367 ctx->buffer_memory_limit = UINT64_MAX;
368
369 // FIXME (v2): default defines.
370 ctx->tunables.tcp_keepalive = false;
371 ctx->tunables.backend_failure_limit = 3;
372 ctx->tunables.connect.tv_sec = 5;
373 ctx->tunables.retry.tv_sec = 3;
374 ctx->tunables.read.tv_sec = 3;
375 ctx->tunables.flap_backoff_ramp = 1.5;
376 ctx->tunables.flap_backoff_max = 3600;
377 ctx->tunables.backend_depth_limit = 0;
378 ctx->tunables.max_ustats = MAX_USTATS_DEFAULT;
379 ctx->tunables.use_iothread = false;
380 ctx->tunables.use_tls = false;
381
382 STAILQ_INIT(&ctx->manager_head);
383 lua_State *L = NULL;
384 if (ctx->memprofile) {
385 struct mcp_memprofile *prof = calloc(1, sizeof(struct mcp_memprofile));
386 prof->id = ctx->memprofile_thread_counter++;
387 L = lua_newstate(mcp_profile_alloc, prof);
388 } else {
389 L = luaL_newstate();
390 }
391 ctx->proxy_state = L;
392 luaL_openlibs(L);
393 // NOTE: might need to differentiate the libs yes?
394 proxy_register_libs(ctx, NULL, L);
395 // Create the cron table.
396 lua_newtable(L);
397 ctx->cron_ref = luaL_ref(L, LUA_REGISTRYINDEX);
398 ctx->cron_next = INT_MAX;
399
400 // set up the shared state VM. Used by short-lock events (counters/state)
401 // for global visibility.
402 pthread_mutex_init(&ctx->sharedvm_lock, NULL);
403 ctx->proxy_sharedvm = luaL_newstate();
404 luaL_openlibs(ctx->proxy_sharedvm);
405 // we keep info tables in the top level stack so we don't have to
406 // constantly fetch them from registry.
407 lua_newtable(ctx->proxy_sharedvm); // fgen count
408 lua_newtable(ctx->proxy_sharedvm); // fgen slot count
409 lua_newtable(ctx->proxy_sharedvm); // backend down status
410
411 // Create/start the IO thread, which we need before servers
412 // start getting created.
413 proxy_event_thread_t *t = calloc(1, sizeof(proxy_event_thread_t));
414 ctx->proxy_io_thread = t;
415 proxy_init_event_thread(t, ctx, NULL);
416
417 pthread_create(&t->thread_id, NULL, proxy_event_thread, t);
418 thread_setname(t->thread_id, "mc-prx-io");
419
420 _start_proxy_config_threads(ctx);
421 return ctx;
422 }
423
424 // Initialize the VM for an individual worker thread.
proxy_thread_init(void * ctx,LIBEVENT_THREAD * thr)425 void proxy_thread_init(void *ctx, LIBEVENT_THREAD *thr) {
426 assert(ctx != NULL);
427 assert(thr != NULL);
428
429 // Create the hook table.
430 thr->proxy_hooks = calloc(CMD_SIZE, sizeof(struct proxy_hook));
431 if (thr->proxy_hooks == NULL) {
432 fprintf(stderr, "Failed to allocate proxy hooks\n");
433 exit(EXIT_FAILURE);
434 }
435 thr->proxy_int_stats = calloc(1, sizeof(struct proxy_int_stats));
436 if (thr->proxy_int_stats == NULL) {
437 fprintf(stderr, "Failed to allocate proxy thread stats\n");
438 exit(EXIT_FAILURE);
439 }
440 pthread_mutex_init(&thr->proxy_limit_lock, NULL);
441 thr->proxy_ctx = ctx;
442
443 // Initialize the lua state.
444 proxy_ctx_t *pctx = ctx;
445 lua_State *L = NULL;
446 if (pctx->memprofile) {
447 struct mcp_memprofile *prof = calloc(1, sizeof(struct mcp_memprofile));
448 prof->id = pctx->memprofile_thread_counter++;
449 L = lua_newstate(mcp_profile_alloc, prof);
450 } else {
451 L = luaL_newstate();
452 }
453
454 // With smaller requests the default incremental collector appears to
455 // never complete. With this simple tuning (def-1, def, def) it seems
456 // fine.
457 // We can't use GCGEN until we manage pools with reference counting, as
458 // they may never hit GC and thus never release their connection
459 // resources.
460 lua_gc(L, LUA_GCINC, 199, 100, 12);
461 lua_gc(L, LUA_GCSTOP); // handle GC on our own schedule.
462 thr->L = L;
463 luaL_openlibs(L);
464 proxy_register_libs(ctx, thr, L);
465 // TODO: srand on time? do we need to bother?
466 for (int x = 0; x < 3; x++) {
467 thr->proxy_rng[x] = rand();
468 }
469
470 thr->proxy_gc_timer = evtimer_new(thr->base, proxy_gc_timerpoke, thr);
471 // kick off the timer loop.
472 proxy_gc_timerpoke(0, 0, thr);
473
474 // Create a proxy event thread structure to piggyback on the worker.
475 proxy_event_thread_t *t = calloc(1, sizeof(proxy_event_thread_t));
476 thr->proxy_event_thread = t;
477 proxy_init_event_thread(t, ctx, thr->base);
478 }
479
480 // ctx_stack is a stack of io_pending_proxy_t's.
481 // head of q->s_ctx is the "newest" request so we must push into the head
482 // of the next queue, as requests are dequeued from the head
proxy_submit_cb(io_queue_t * q)483 void proxy_submit_cb(io_queue_t *q) {
484 proxy_event_thread_t *e = ((proxy_ctx_t *)q->ctx)->proxy_io_thread;
485 io_pending_proxy_t *p = q->stack_ctx;
486 io_head_t head;
487 be_head_t w_head; // worker local stack.
488 STAILQ_INIT(&head);
489 STAILQ_INIT(&w_head);
490
491 // NOTE: responses get returned in the correct order no matter what, since
492 // mc_resp's are linked.
493 // we just need to ensure stuff is parsed off the backend in the correct
494 // order.
495 // So we can do with a single list here, but we need to repair the list as
496 // responses are parsed. (in the req_remaining-- section)
497 // TODO (v2):
498 // - except we can't do that because the deferred IO stack isn't
499 // compatible with queue.h.
500 // So for now we build the secondary list with an STAILQ, which
501 // can be transplanted/etc.
502 while (p) {
503 mcp_backend_t *be;
504 P_DEBUG("%s: queueing req for backend: %p\n", __func__, (void *)p);
505 if (p->qcount_incr) {
506 // funny workaround: async IOP's don't count toward
507 // resuming a connection, only the completion of the async
508 // condition.
509 q->count++;
510 }
511
512 if (p->background) {
513 P_DEBUG("%s: fast-returning background object: %p\n", __func__, (void *)p);
514 // intercept background requests
515 // this call cannot recurse if we're on the worker thread,
516 // since the worker thread has to finish executing this
517 // function in order to pick up the returned IO.
518 return_io_pending((io_pending_t *)p);
519 p = p->next;
520 continue;
521 }
522 be = p->backend;
523
524 if (be->use_io_thread) {
525 STAILQ_INSERT_HEAD(&head, p, io_next);
526 } else {
527 // emulate some of handler_dequeue()
528 STAILQ_INSERT_HEAD(&be->io_head, p, io_next);
529 assert(be->depth > -1);
530 be->depth++;
531 if (!be->stacked) {
532 be->stacked = true;
533 STAILQ_INSERT_TAIL(&w_head, be, be_next);
534 }
535 }
536
537 p = p->next;
538 }
539
540 // clear out the submit queue so we can re-queue new IO's inline.
541 q->stack_ctx = NULL;
542
543 if (!STAILQ_EMPTY(&head)) {
544 bool do_notify = false;
545 P_DEBUG("%s: submitting queue to IO thread\n", __func__);
546 // Transfer request stack to event thread.
547 pthread_mutex_lock(&e->mutex);
548 if (STAILQ_EMPTY(&e->io_head_in)) {
549 do_notify = true;
550 }
551 STAILQ_CONCAT(&e->io_head_in, &head);
552 // No point in holding the lock since we're not doing a cond signal.
553 pthread_mutex_unlock(&e->mutex);
554
555 if (do_notify) {
556 // Signal to check queue.
557 #ifdef USE_EVENTFD
558 uint64_t u = 1;
559 // TODO (v2): check result? is it ever possible to get a short write/failure
560 // for an eventfd?
561 if (write(e->event_fd, &u, sizeof(uint64_t)) != sizeof(uint64_t)) {
562 assert(1 == 0);
563 }
564 #else
565 if (write(e->notify_send_fd, "w", 1) <= 0) {
566 assert(1 == 0);
567 }
568 #endif
569 }
570 }
571
572 if (!STAILQ_EMPTY(&w_head)) {
573 P_DEBUG("%s: running inline worker queue\n", __func__);
574 // emulating proxy_event_handler
575 proxy_run_backend_queue(&w_head);
576 }
577 return;
578 }
579
580 // This function handles return processing for the "old style" API:
581 // currently just `mcp.internal()`
proxy_return_rctx_cb(io_pending_t * pending)582 void proxy_return_rctx_cb(io_pending_t *pending) {
583 io_pending_proxy_t *p = (io_pending_proxy_t *)pending;
584 if (p->client_resp && p->client_resp->blen) {
585 // FIXME: workaround for buffer memory being external to objects.
586 // can't run 0 since that means something special (run the GC)
587 unsigned int kb = p->client_resp->blen / 1000;
588 p->thread->proxy_vm_extra_kb += kb > 0 ? kb : 1;
589 }
590
591 mcp_rcontext_t *rctx = p->rctx;
592 lua_rotate(rctx->Lc, 1, 1);
593 lua_settop(rctx->Lc, 1);
594 // hold the resp for a minute.
595 mc_resp *resp = rctx->resp;
596
597 proxy_run_rcontext(rctx);
598 mcp_funcgen_return_rctx(rctx);
599
600 io_queue_t *q = conn_io_queue_get(p->c, p->io_queue_type);
601 // Detatch the iop from the mc_resp and free it here.
602 conn *c = p->c;
603 if (p->io_type != IO_PENDING_TYPE_EXTSTORE) {
604 // if we're doing an extstore subrequest, the iop needs to live until
605 // resp's ->finish_cb is called.
606 resp->io_pending = NULL;
607 do_cache_free(p->thread->io_cache, p);
608 }
609
610 q->count--;
611 if (q->count == 0) {
612 // call re-add directly since we're already in the worker thread.
613 conn_worker_readd(c);
614 }
615 }
616
617 // This is called if resp_finish is called while an iop exists on the
618 // resp.
619 // so we need to release our iop and rctx.
620 // - This can't happen unless we're doing extstore fetches.
621 // - the request context is freed before connection processing resumes.
proxy_finalize_rctx_cb(io_pending_t * pending)622 void proxy_finalize_rctx_cb(io_pending_t *pending) {
623 io_pending_proxy_t *p = (io_pending_proxy_t *)pending;
624
625 if (p->io_type == IO_PENDING_TYPE_EXTSTORE) {
626 if (p->hdr_it) {
627 // TODO: lock once, worst case this hashes/locks twice.
628 if (p->miss) {
629 item_unlink(p->hdr_it);
630 }
631 item_remove(p->hdr_it);
632 }
633 }
634 }
635
try_read_command_proxy(conn * c)636 int try_read_command_proxy(conn *c) {
637 char *el, *cont;
638
639 if (c->rbytes == 0)
640 return 0;
641
642 el = memchr(c->rcurr, '\n', c->rbytes);
643 if (!el) {
644 if (c->rbytes > 1024) {
645 /*
646 * We didn't have a '\n' in the first k. This _has_ to be a
647 * large multiget, if not we should just nuke the connection.
648 */
649 char *ptr = c->rcurr;
650 while (*ptr == ' ') { /* ignore leading whitespaces */
651 ++ptr;
652 }
653
654 if (ptr - c->rcurr > 100 ||
655 (strncmp(ptr, "get ", 4) && strncmp(ptr, "gets ", 5))) {
656
657 conn_set_state(c, conn_closing);
658 return 1;
659 }
660
661 // ASCII multigets are unbound, so our fixed size rbuf may not
662 // work for this particular workload... For backcompat we'll use a
663 // malloc/realloc/free routine just for this.
664 if (!c->rbuf_malloced) {
665 if (!rbuf_switch_to_malloc(c)) {
666 conn_set_state(c, conn_closing);
667 return 1;
668 }
669 }
670 }
671
672 return 0;
673 }
674 cont = el + 1;
675
676 assert(cont <= (c->rcurr + c->rbytes));
677
678 c->last_cmd_time = current_time;
679 proxy_process_command(c, c->rcurr, cont - c->rcurr, PROCESS_NORMAL);
680
681 c->rbytes -= (cont - c->rcurr);
682 c->rcurr = cont;
683
684 assert(c->rcurr <= (c->rbuf + c->rsize));
685
686 return 1;
687
688 }
689
690 // Called when a connection is closed while in nread state reading a set
691 // Must only be called with an active coroutine.
proxy_cleanup_conn(conn * c)692 void proxy_cleanup_conn(conn *c) {
693 assert(c->proxy_rctx);
694 mcp_rcontext_t *rctx = c->proxy_rctx;
695 assert(rctx->pending_reqs == 1);
696 rctx->pending_reqs = 0;
697
698 mcp_funcgen_return_rctx(rctx);
699 c->proxy_rctx = NULL;
700 }
701
702 // we buffered a SET of some kind.
complete_nread_proxy(conn * c)703 void complete_nread_proxy(conn *c) {
704 assert(c != NULL);
705
706 LIBEVENT_THREAD *thr = c->thread;
707 lua_State *L = thr->L;
708
709 if (c->proxy_rctx == NULL) {
710 complete_nread_ascii(c);
711 return;
712 }
713
714 conn_set_state(c, conn_new_cmd);
715
716 assert(c->proxy_rctx);
717 mcp_rcontext_t *rctx = c->proxy_rctx;
718 mcp_request_t *rq = rctx->request;
719
720 if (strncmp((char *)c->item + rq->pr.vlen - 2, "\r\n", 2) != 0) {
721 lua_settop(L, 0); // clear anything remaining on the main thread.
722 // FIXME (v2): need to set noreply false if mset_res, but that's kind
723 // of a weird hack to begin with. Evaluate how to best do that here.
724 out_string(c, "CLIENT_ERROR bad data chunk");
725 rctx->pending_reqs--;
726 mcp_funcgen_return_rctx(rctx);
727 return;
728 }
729
730 // We move ownership of the c->item buffer from the connection to the
731 // request object here. Else we can double free if the conn closes while
732 // inside nread.
733 rq->pr.vbuf = c->item;
734 c->item = NULL;
735 c->item_malloced = false;
736 c->proxy_rctx = NULL;
737 pthread_mutex_lock(&thr->proxy_limit_lock);
738 thr->proxy_buffer_memory_used += rq->pr.vlen;
739 pthread_mutex_unlock(&thr->proxy_limit_lock);
740
741 proxy_run_rcontext(rctx);
742 mcp_funcgen_return_rctx(rctx);
743
744 lua_settop(L, 0); // clear anything remaining on the main thread.
745
746 return;
747 }
748
749 // Simple error wrapper for common failures.
750 // lua_error() is a jump so this function never returns
751 // for clarity add a 'return' after calls to this.
proxy_lua_error(lua_State * L,const char * s)752 void proxy_lua_error(lua_State *L, const char *s) {
753 lua_pushstring(L, s);
754 lua_error(L);
755 }
756
757 // Need a custom function so we can prefix lua strings easily.
proxy_out_errstring(mc_resp * resp,char * type,const char * str)758 void proxy_out_errstring(mc_resp *resp, char *type, const char *str) {
759 size_t len;
760 size_t prefix_len = strlen(type);
761
762 assert(resp != NULL);
763
764 resp_reset(resp);
765 // avoid noreply since we're throwing important errors.
766
767 // Fill response object with static string.
768 len = strlen(str);
769 if ((len + prefix_len + 2) > WRITE_BUFFER_SIZE) {
770 /* ought to be always enough. just fail for simplicity */
771 str = "SERVER_ERROR output line too long";
772 len = strlen(str);
773 }
774
775 char *w = resp->wbuf;
776 memcpy(w, type, prefix_len);
777 w += prefix_len;
778
779 memcpy(w, str, len);
780 w += len;
781
782 memcpy(w, "\r\n", 2);
783 resp_add_iov(resp, resp->wbuf, len + prefix_len + 2);
784 return;
785 }
786
787 // NOTE: See notes in mcp_queue_io; the secondary problem with setting the
788 // noreply mode from the response object is that the proxy can return strings
789 // manually, so we have no way to obey what the original request wanted in
790 // that case.
_set_noreply_mode(mc_resp * resp,mcp_resp_t * r)791 static void _set_noreply_mode(mc_resp *resp, mcp_resp_t *r) {
792 switch (r->mode) {
793 case RESP_MODE_NORMAL:
794 break;
795 case RESP_MODE_NOREPLY:
796 // ascii noreply only threw egregious errors to client
797 if (r->status == MCMC_OK) {
798 resp->skip = true;
799 }
800 break;
801 case RESP_MODE_METAQUIET:
802 if (r->resp.code == MCMC_CODE_END) {
803 resp->skip = true;
804 } else if (r->cmd != CMD_MG && r->resp.code == MCMC_CODE_OK) {
805 // FIXME (v2): mcmc's parser needs to help us out a bit more
806 // here.
807 // This is a broken case in the protocol though; quiet mode
808 // ignores HD for mutations but not get.
809 resp->skip = true;
810 }
811 break;
812 default:
813 assert(1 == 0);
814 }
815 }
816
_proxy_run_rcontext_queues(mcp_rcontext_t * rctx)817 static void _proxy_run_rcontext_queues(mcp_rcontext_t *rctx) {
818 for (int x = 0; x < rctx->fgen->max_queues; x++) {
819 mcp_run_rcontext_handle(rctx, x);
820 }
821 }
822
_proxy_run_tresp_to_resp(mc_resp * tresp,mc_resp * resp)823 static void _proxy_run_tresp_to_resp(mc_resp *tresp, mc_resp *resp) {
824 // The internal cache handler has created a resp we want to swap in
825 // here. It would be fastest to swap *resp's position in the
826 // link but if the set is deep this would instead be slow, so
827 // we copy over details from this temporary resp instead.
828
829 // So far all we fill is the wbuf and some iov's? so just copy
830 // that + the UDP info?
831 memcpy(resp->wbuf, tresp->wbuf, tresp->iov[0].iov_len);
832 resp->tosend = 0;
833 for (int x = 0; x < tresp->iovcnt; x++) {
834 resp->iov[x] = tresp->iov[x];
835 resp->tosend += tresp->iov[x].iov_len;
836 }
837 // resp->iov[x].iov_base needs to be updated if it's
838 // pointing within its wbuf.
839 // FIXME: This is too fragile. we need to be able to
840 // inherit details and swap resp objects around.
841 if (tresp->iov[0].iov_base == tresp->wbuf) {
842 resp->iov[0].iov_base = resp->wbuf;
843 }
844 resp->iovcnt = tresp->iovcnt;
845 resp->chunked_total = tresp->chunked_total;
846 resp->chunked_data_iov = tresp->chunked_data_iov;
847 // copy UDP headers...
848 resp->request_id = tresp->request_id;
849 resp->udp_sequence = tresp->udp_sequence;
850 resp->udp_total = tresp->udp_total;
851 resp->request_addr = tresp->request_addr;
852 resp->request_addr_size = tresp->request_addr_size;
853 resp->item = tresp->item; // will be populated if not extstore fetch
854 tresp->item = NULL; // move ownership of the item to resp from tresp
855 resp->skip = tresp->skip;
856 }
857
858 // HACK NOTES:
859 // These are self-notes for dormando mostly.
860 // The IO queue system does not work well with the proxy, as we need to:
861 // - only increment q->count during the submit phase
862 // - .. because a resumed coroutine can queue more data.
863 // - and we will never hit q->count == 0
864 // - .. and then never resume the main connection. (conn_worker_readd)
865 // - which will never submit the new sub-requests
866 // - need to only increment q->count once per stack of requests coming from a
867 // resp.
868 //
869 // For RQU backed requests (new API) there isn't an easy place to test for
870 // "the first request", because:
871 // - The connection queue is a stack of _all_ requests pending on this
872 // connection, and many requests can arrive in one batch.
873 // - Thus we cannot simply check if there are items in the queue
874 // - RQU's can be recursive, so we have to loop back to the parent to check to
875 // see if we're the first queue or not.
876 //
877 // This hack workaround exists so I can fix the IO queue subsystem as a change
878 // independent of the RCTX change, as the IO queue touches everything and
879 // scares the shit out of me. It's much easier to make changes to it in
880 // isolation, when all existing systems are currently working and testable.
881 //
882 // Description of the hack:
883 // - in mcp_queue_io: roll up rctx to parent, and if we are the first IO to queue
884 // since the rcontext started, set p->qcounr_incr = true
885 // Later in submit_cb:
886 // - q->count++ if p->qcount_incr.
887 //
888 // Finally, in proxy_return_rqu_cb:
889 // - If parent completed non-yielded work, q->count-- to allow conn
890 // resumption.
891 // - At bottom of rqu_cb(), flush any IO queues for the connection in case we
892 // re-queued work.
proxy_run_rcontext(mcp_rcontext_t * rctx)893 int proxy_run_rcontext(mcp_rcontext_t *rctx) {
894 int nresults = 0;
895 lua_State *Lc = rctx->Lc;
896 assert(rctx->lua_narg != 0);
897 int cores = lua_resume(Lc, NULL, rctx->lua_narg, &nresults);
898 rctx->lua_narg = 1; // reset to default since not-default is uncommon.
899 size_t rlen = 0;
900 mc_resp *resp = rctx->resp;
901
902 if (cores == LUA_OK) {
903 // don't touch the result object if we were a sub-context.
904 if (!rctx->parent) {
905 int type = lua_type(Lc, 1);
906 mcp_resp_t *r = NULL;
907 P_DEBUG("%s: coroutine completed. return type: %d\n", __func__, type);
908 if (type == LUA_TUSERDATA && (r = luaL_testudata(Lc, 1, "mcp.response")) != NULL) {
909 _set_noreply_mode(resp, r);
910 if (r->status != MCMC_OK && r->resp.type != MCMC_RESP_ERRMSG) {
911 proxy_out_errstring(resp, PROXY_SERVER_ERROR, "backend failure");
912 } else if (r->cresp) {
913 mc_resp *tresp = r->cresp;
914
915 _proxy_run_tresp_to_resp(tresp, resp);
916 // we let the mcp_resp gc handler free up tresp and any
917 // associated io_pending's of its own later.
918 } else if (r->buf) {
919 // response set from C.
920 resp->write_and_free = r->buf;
921 resp_add_iov(resp, r->buf, r->blen);
922 // stash the length to later remove from memory tracking
923 resp->wbytes = r->blen + r->extra;
924 resp->proxy_res = true;
925 r->buf = NULL;
926 } else {
927 // Empty response: used for ascii multiget emulation.
928 }
929
930 } else if (type == LUA_TSTRING) {
931 // response is a raw string from lua.
932 const char *s = lua_tolstring(Lc, 1, &rlen);
933 size_t l = rlen > WRITE_BUFFER_SIZE ? WRITE_BUFFER_SIZE : rlen;
934 memcpy(resp->wbuf, s, l);
935 resp_add_iov(resp, resp->wbuf, l);
936 lua_pop(Lc, 1);
937 } else {
938 proxy_out_errstring(resp, PROXY_SERVER_ERROR, "bad response");
939 }
940 }
941
942 rctx->pending_reqs--;
943 } else if (cores == LUA_YIELD) {
944 int yield_type = lua_tointeger(Lc, -1);
945 P_DEBUG("%s: coroutine yielded. return type: %d\n", __func__, yield_type);
946 assert(yield_type != 0);
947 lua_pop(Lc, 1);
948
949 int res = 0;
950 switch (yield_type) {
951 case MCP_YIELD_INTERNAL:
952 // stack should be: rq, res
953 if (rctx->parent) {
954 LOGGER_LOG(NULL, LOG_PROXYEVENTS, LOGGER_PROXY_ERROR, NULL, "cannot run mcp.internal from a sub request");
955 rctx->pending_reqs--;
956 return LUA_ERRRUN;
957 } else {
958 res = mcplib_internal_run(rctx);
959 if (res == 0) {
960 // stack should still be: rq, res
961 // TODO: turn this function into a for loop that re-runs on
962 // certain status codes, to avoid recursive depth here.
963 // or maybe... a goto? :P
964 proxy_run_rcontext(rctx);
965 } else if (res > 0) {
966 // internal run queued for extstore.
967 } else {
968 assert(res < 0);
969 proxy_out_errstring(resp, PROXY_SERVER_ERROR, "bad request");
970 }
971 }
972 break;
973 case MCP_YIELD_WAITCOND:
974 case MCP_YIELD_WAITHANDLE:
975 // Even if we're in WAITHANDLE, we want to dispatch any queued
976 // requests, so we still need to iterate the full set of qslots.
977 _proxy_run_rcontext_queues(rctx);
978 break;
979 case MCP_YIELD_SLEEP:
980 // Pause coroutine and do nothing. Alarm will resume.
981 break;
982 default:
983 abort();
984 }
985
986 } else {
987 // Log the error where it happens, then the parent will handle a
988 // result object normally.
989 P_DEBUG("%s: Failed to run coroutine: %s\n", __func__, lua_tostring(Lc, -1));
990 LOGGER_LOG(NULL, LOG_PROXYEVENTS, LOGGER_PROXY_ERROR, NULL, lua_tostring(Lc, -1));
991 if (!rctx->parent) {
992 proxy_out_errstring(resp, PROXY_SERVER_ERROR, "lua failure");
993 }
994 rctx->pending_reqs--;
995 }
996
997 return cores;
998 }
999
1000 // basically any data before the first key.
1001 // max is like 15ish plus spaces. we can be more strict about how many spaces
1002 // to expect because any client spamming space is being deliberately stupid
1003 // anyway.
1004 #define MAX_CMD_PREFIX 20
1005
proxy_process_command(conn * c,char * command,size_t cmdlen,bool multiget)1006 static void proxy_process_command(conn *c, char *command, size_t cmdlen, bool multiget) {
1007 assert(c != NULL);
1008 LIBEVENT_THREAD *thr = c->thread;
1009 struct proxy_hook *hooks = thr->proxy_hooks;
1010 lua_State *L = thr->L;
1011 proxy_ctx_t *ctx = thr->proxy_ctx;
1012 mcp_parser_t pr = {0};
1013
1014 // Avoid doing resp_start() here, instead do it a bit later or as-needed.
1015 // This allows us to hop over to the internal text protocol parser, which
1016 // also calls resp_start().
1017 // Tighter integration later should obviate the need for this, it is not a
1018 // permanent solution.
1019 int ret = process_request(&pr, command, cmdlen);
1020 if (ret != 0) {
1021 WSTAT_INCR(c->thread, proxy_conn_errors, 1);
1022 if (!resp_start(c)) {
1023 conn_set_state(c, conn_closing);
1024 return;
1025 }
1026 proxy_out_errstring(c->resp, PROXY_CLIENT_ERROR, "parsing request");
1027 if (ret == -2) {
1028 // Kill connection on more critical parse failure.
1029 conn_set_state(c, conn_closing);
1030 }
1031 return;
1032 }
1033
1034 struct proxy_hook *hook = &hooks[pr.command];
1035 struct proxy_hook_ref hook_ref = hook->ref;
1036 // if client came from a tagged listener, scan for a more specific hook.
1037 // TODO: (v2) avoiding a hash table lookup here, but maybe some other
1038 // datastructure would suffice. for 4-8 tags this is perfectly fast.
1039 if (c->tag && hook->tagged) {
1040 struct proxy_hook_tagged *pht = hook->tagged;
1041 while (pht->ref.lua_ref) {
1042 if (c->tag == pht->tag) {
1043 hook_ref = pht->ref;
1044 break;
1045 }
1046 pht++;
1047 }
1048 }
1049
1050 if (!hook_ref.lua_ref) {
1051 // need to pass our command string into the internal handler.
1052 // to minimize the code change, this means allowing it to tokenize the
1053 // full command. The proxy's indirect parser should be built out to
1054 // become common code for both proxy and ascii handlers.
1055 // For now this means we have to null-terminate the command string,
1056 // then call into text protocol handler.
1057 // FIXME (v2): use a ptr or something; don't like this code.
1058 if (cmdlen > 1 && command[cmdlen-2] == '\r') {
1059 command[cmdlen-2] = '\0';
1060 } else {
1061 command[cmdlen-1] = '\0';
1062 }
1063 // lets nread_proxy know we're in ascii mode.
1064 c->proxy_rctx = NULL;
1065 process_command_ascii(c, command);
1066 return;
1067 }
1068
1069 // If ascii multiget, we turn this into a self-calling loop :(
1070 // create new request with next key, call this func again, then advance
1071 // original string.
1072 // might be better to split this function; the below bits turn into a
1073 // function call, then we don't re-process the above bits in the same way?
1074 // The way this is detected/passed on is very fragile.
1075 if (!multiget && pr.cmd_type == CMD_TYPE_GET && pr.has_space) {
1076 uint32_t keyoff = pr.tokens[pr.keytoken];
1077 while (pr.klen != 0) {
1078 char temp[KEY_MAX_LENGTH + MAX_CMD_PREFIX + 30];
1079 char *cur = temp;
1080 // Core daemon can abort the entire command if one key is bad, but
1081 // we cannot from the proxy. Instead we have to inject errors into
1082 // the stream. This should, thankfully, be rare at least.
1083 if (pr.tokens[pr.keytoken] > MAX_CMD_PREFIX) {
1084 if (!resp_start(c)) {
1085 conn_set_state(c, conn_closing);
1086 return;
1087 }
1088 proxy_out_errstring(c->resp, PROXY_CLIENT_ERROR, "malformed request");
1089 } else if (pr.klen > KEY_MAX_LENGTH) {
1090 if (!resp_start(c)) {
1091 conn_set_state(c, conn_closing);
1092 return;
1093 }
1094 proxy_out_errstring(c->resp, PROXY_CLIENT_ERROR, "key too long");
1095 } else {
1096 // copy original request up until the original key token.
1097 memcpy(cur, pr.request, pr.tokens[pr.keytoken]);
1098 cur += pr.tokens[pr.keytoken];
1099
1100 // now copy in our "current" key.
1101 memcpy(cur, &pr.request[keyoff], pr.klen);
1102 cur += pr.klen;
1103
1104 memcpy(cur, "\r\n", 2);
1105 cur += 2;
1106
1107 *cur = '\0';
1108 P_DEBUG("%s: new multiget sub request: %s [%u/%u]\n", __func__, temp, keyoff, pr.klen);
1109 proxy_process_command(c, temp, cur - temp, PROCESS_MULTIGET);
1110 }
1111
1112 // now advance to the next key.
1113 keyoff = _process_request_next_key(&pr);
1114 }
1115
1116 if (!resp_start(c)) {
1117 conn_set_state(c, conn_closing);
1118 return;
1119 }
1120
1121 // The above recursions should have created c->resp's in dispatch
1122 // order.
1123 // So now we add another one at the end to create the capping END
1124 // string.
1125 memcpy(c->resp->wbuf, ENDSTR, ENDLEN);
1126 resp_add_iov(c->resp, c->resp->wbuf, ENDLEN);
1127
1128 return;
1129 }
1130
1131 // We test the command length all the way down here because multigets can
1132 // be very long, and they're chopped up by now.
1133 if (cmdlen >= MCP_REQUEST_MAXLEN) {
1134 WSTAT_INCR(c->thread, proxy_conn_errors, 1);
1135 if (!resp_start(c)) {
1136 conn_set_state(c, conn_closing);
1137 return;
1138 }
1139 proxy_out_errstring(c->resp, PROXY_CLIENT_ERROR, "request too long");
1140 conn_set_state(c, conn_closing);
1141 return;
1142 }
1143
1144 if (!resp_start(c)) {
1145 conn_set_state(c, conn_closing);
1146 return;
1147 }
1148
1149 // Count requests handled by proxy vs local.
1150 // Also batch the counts down this far so we can lock once for the active
1151 // counter instead of twice.
1152 struct proxy_int_stats *istats = c->thread->proxy_int_stats;
1153 uint64_t active_reqs = 0;
1154 WSTAT_L(c->thread);
1155 istats->counters[pr.command]++;
1156 c->thread->stats.proxy_conn_requests++;
1157 active_reqs = c->thread->stats.proxy_req_active;
1158 WSTAT_UL(c->thread);
1159
1160 if (active_reqs >= ctx->active_req_limit) {
1161 proxy_out_errstring(c->resp, PROXY_SERVER_ERROR, "active request limit reached");
1162 if (pr.vlen != 0) {
1163 c->sbytes = pr.vlen;
1164 conn_set_state(c, conn_swallow);
1165 }
1166 return;
1167 }
1168
1169 // hook is owned by a function generator.
1170 mcp_rcontext_t *rctx = mcp_funcgen_start(L, hook_ref.ctx, &pr);
1171 if (rctx == NULL) {
1172 proxy_out_errstring(c->resp, PROXY_SERVER_ERROR, "lua start failure");
1173 if (pr.vlen != 0) {
1174 c->sbytes = pr.vlen;
1175 conn_set_state(c, conn_swallow);
1176 }
1177 return;
1178 }
1179
1180 mcp_set_request(&pr, rctx->request, command, cmdlen);
1181 rctx->request->ascii_multiget = multiget;
1182 rctx->c = c;
1183 rctx->conn_fd = c->sfd;
1184 rctx->pending_reqs++; // seed counter with the "main" request
1185 // remember the top level mc_resp, because further requests on the
1186 // same connection will replace c->resp.
1187 rctx->resp = c->resp;
1188
1189 // for the very first call we need to place:
1190 // - rctx->function_ref + rctx->request_ref
1191 // I _think_ here is the right place to do that?
1192 lua_rawgeti(rctx->Lc, LUA_REGISTRYINDEX, rctx->function_ref);
1193 lua_rawgeti(rctx->Lc, LUA_REGISTRYINDEX, rctx->request_ref);
1194
1195 if (pr.vlen != 0) {
1196 c->item = NULL;
1197 // Need to add the used memory later due to needing an extra callback
1198 // handler on error during nread.
1199 bool oom = proxy_bufmem_checkadd(c->thread, 0);
1200
1201 // relying on temporary malloc's not having fragmentation
1202 if (!oom) {
1203 c->item = malloc(pr.vlen);
1204 }
1205 if (c->item == NULL) {
1206 // return the RCTX
1207 rctx->pending_reqs--;
1208 mcp_funcgen_return_rctx(rctx);
1209 // normal cleanup
1210 lua_settop(L, 0);
1211 proxy_out_errstring(c->resp, PROXY_SERVER_ERROR, "out of memory");
1212 c->sbytes = pr.vlen;
1213 conn_set_state(c, conn_swallow);
1214 return;
1215 }
1216 c->item_malloced = true;
1217 c->ritem = c->item;
1218 c->rlbytes = pr.vlen;
1219
1220 // remember the request context for later.
1221 c->proxy_rctx = rctx;
1222
1223 conn_set_state(c, conn_nread);
1224 return;
1225 }
1226
1227 proxy_run_rcontext(rctx);
1228 mcp_funcgen_return_rctx(rctx);
1229
1230 lua_settop(L, 0); // clear any junk from the main thread.
1231 }
1232
mcp_prep_bare_resobj(lua_State * L,LIBEVENT_THREAD * t)1233 mcp_resp_t *mcp_prep_bare_resobj(lua_State *L, LIBEVENT_THREAD *t) {
1234 mcp_resp_t *r = lua_newuserdatauv(L, sizeof(mcp_resp_t), 0);
1235 // FIXME (v2): is this memset still necessary? I was using it for
1236 // debugging.
1237 memset(r, 0, sizeof(mcp_resp_t));
1238 r->thread = t;
1239 assert(r->thread != NULL);
1240 gettimeofday(&r->start, NULL);
1241
1242 luaL_getmetatable(L, "mcp.response");
1243 lua_setmetatable(L, -2);
1244
1245 return r;
1246 }
1247
mcp_set_resobj(mcp_resp_t * r,mcp_request_t * rq,mcp_backend_t * be,LIBEVENT_THREAD * t)1248 void mcp_set_resobj(mcp_resp_t *r, mcp_request_t *rq, mcp_backend_t *be, LIBEVENT_THREAD *t) {
1249 memset(r, 0, sizeof(mcp_resp_t));
1250 r->buf = NULL;
1251 r->blen = 0;
1252 r->thread = t;
1253 assert(r->thread != NULL);
1254 gettimeofday(&r->start, NULL);
1255 // Set noreply mode.
1256 // TODO (v2): the response "inherits" the request's noreply mode, which isn't
1257 // strictly correct; we should inherit based on the request that spawned
1258 // the coroutine but the structure doesn't allow that yet.
1259 // Should also be able to settle this exact mode from the parser so we
1260 // don't have to re-branch here.
1261 if (rq->pr.noreply) {
1262 if (rq->pr.cmd_type == CMD_TYPE_META) {
1263 r->mode = RESP_MODE_METAQUIET;
1264 for (int x = 2; x < rq->pr.ntokens; x++) {
1265 if (rq->request[rq->pr.tokens[x]] == 'q') {
1266 rq->request[rq->pr.tokens[x]] = ' ';
1267 }
1268 }
1269 } else {
1270 r->mode = RESP_MODE_NOREPLY;
1271 rq->request[rq->pr.reqlen - 3] = 'Y';
1272 }
1273 } else {
1274 r->mode = RESP_MODE_NORMAL;
1275 }
1276
1277 r->cmd = rq->pr.command;
1278
1279 strncpy(r->be_name, be->name, MAX_NAMELEN+1);
1280 strncpy(r->be_port, be->port, MAX_PORTLEN+1);
1281
1282 }
1283
mcp_prep_resobj(lua_State * L,mcp_request_t * rq,mcp_backend_t * be,LIBEVENT_THREAD * t)1284 mcp_resp_t *mcp_prep_resobj(lua_State *L, mcp_request_t *rq, mcp_backend_t *be, LIBEVENT_THREAD *t) {
1285 mcp_resp_t *r = lua_newuserdatauv(L, sizeof(mcp_resp_t), 0);
1286 mcp_set_resobj(r, rq, be, t);
1287
1288 luaL_getmetatable(L, "mcp.response");
1289 lua_setmetatable(L, -2);
1290
1291 return r;
1292 }
1293
mcp_resp_set_elapsed(mcp_resp_t * r)1294 void mcp_resp_set_elapsed(mcp_resp_t *r) {
1295 struct timeval end;
1296 // stamp the elapsed time into the response object.
1297 gettimeofday(&end, NULL);
1298 r->elapsed = (end.tv_sec - r->start.tv_sec) * 1000000 +
1299 (end.tv_usec - r->start.tv_usec);
1300 }
1301
1302 // Used for any cases where we're queueing requests to the IO subsystem.
1303 // NOTE: it's not currently possible to limit the memory used by the IO
1304 // object cache. So this check is redundant, and any callers may proceed
1305 // as though it is successful.
mcp_queue_rctx_io(mcp_rcontext_t * rctx,mcp_request_t * rq,mcp_backend_t * be,mcp_resp_t * r)1306 io_pending_proxy_t *mcp_queue_rctx_io(mcp_rcontext_t *rctx, mcp_request_t *rq, mcp_backend_t *be, mcp_resp_t *r) {
1307 conn *c = rctx->c;
1308 io_queue_t *q = conn_io_queue_get(c, IO_QUEUE_PROXY);
1309 io_pending_proxy_t *p = do_cache_alloc(c->thread->io_cache);
1310 if (p == NULL) {
1311 WSTAT_INCR(c->thread, proxy_conn_oom, 1);
1312 proxy_lua_error(rctx->Lc, "out of memory allocating from IO cache");
1313 // NOTE: the error call above jumps to an error handler, so this does
1314 // not actually return.
1315 return NULL;
1316 }
1317
1318 // this is a re-cast structure, so assert that we never outsize it.
1319 assert(sizeof(io_pending_t) >= sizeof(io_pending_proxy_t));
1320 memset(p, 0, sizeof(io_pending_proxy_t));
1321 // set up back references.
1322 p->io_queue_type = IO_QUEUE_PROXY;
1323 p->thread = c->thread;
1324 p->c = c;
1325 p->client_resp = r;
1326 p->flushed = false;
1327 p->return_cb = NULL;
1328 p->finalize_cb = proxy_finalize_rctx_cb;
1329
1330 // pass along the request context for resumption.
1331 p->rctx = rctx;
1332
1333 if (rq) {
1334 p->ascii_multiget = rq->ascii_multiget;
1335 // The direct backend object. Lc is holding the reference in the stack
1336 p->backend = be;
1337
1338 mcp_request_attach(rq, p);
1339 }
1340
1341 // HACK
1342 // find the parent rctx
1343 while (rctx->parent) {
1344 rctx = rctx->parent;
1345 }
1346 // Hack to enforce the first iop increments client IO queue counter.
1347 if (!rctx->first_queue) {
1348 rctx->first_queue = true;
1349 p->qcount_incr = true;
1350 }
1351 // END HACK
1352
1353 // link into the batch chain.
1354 p->next = q->stack_ctx;
1355 q->stack_ctx = p;
1356 P_DEBUG("%s: queued\n", __func__);
1357
1358 return p;
1359 }
1360
1361 // DO NOT call this method frequently! globally locked!
mcp_sharedvm_delta(proxy_ctx_t * ctx,int tidx,const char * name,int delta)1362 void mcp_sharedvm_delta(proxy_ctx_t *ctx, int tidx, const char *name, int delta) {
1363 lua_State *L = ctx->proxy_sharedvm;
1364 pthread_mutex_lock(&ctx->sharedvm_lock);
1365
1366 if (lua_getfield(L, tidx, name) == LUA_TNIL) {
1367 lua_pop(L, 1);
1368 lua_pushinteger(L, delta);
1369 lua_setfield(L, tidx, name);
1370 } else {
1371 lua_pushinteger(L, delta);
1372 lua_arith(L, LUA_OPADD);
1373 lua_setfield(L, tidx, name);
1374 }
1375
1376 pthread_mutex_unlock(&ctx->sharedvm_lock);
1377 }
1378
mcp_sharedvm_remove(proxy_ctx_t * ctx,int tidx,const char * name)1379 void mcp_sharedvm_remove(proxy_ctx_t *ctx, int tidx, const char *name) {
1380 lua_State *L = ctx->proxy_sharedvm;
1381 pthread_mutex_lock(&ctx->sharedvm_lock);
1382
1383 lua_pushnil(L);
1384 lua_setfield(L, tidx, name);
1385
1386 pthread_mutex_unlock(&ctx->sharedvm_lock);
1387 }
1388
1389 // Global object support code.
1390 // Global objects are created in the configuration VM, and referenced in
1391 // worker VMs via proxy objects that refer back to memory in the
1392 // configuration VM.
1393 // We manage reference counts: once all remote proxy objects are collected, we
1394 // signal the config thread to remove a final reference and collect garbage to
1395 // remove the global object.
1396
mcp_gobj_enqueue(proxy_ctx_t * ctx,struct mcp_globalobj_s * g)1397 static void mcp_gobj_enqueue(proxy_ctx_t *ctx, struct mcp_globalobj_s *g) {
1398 pthread_mutex_lock(&ctx->manager_lock);
1399 STAILQ_INSERT_TAIL(&ctx->manager_head, g, next);
1400 pthread_cond_signal(&ctx->manager_cond);
1401 pthread_mutex_unlock(&ctx->manager_lock);
1402 }
1403
1404 // References the object, initializing the self-reference if necessary.
1405 // Call from config thread, with global object on top of stack.
mcp_gobj_ref(lua_State * L,struct mcp_globalobj_s * g)1406 void mcp_gobj_ref(lua_State *L, struct mcp_globalobj_s *g) {
1407 pthread_mutex_lock(&g->lock);
1408 if (g->self_ref == 0) {
1409 // Initialization requires a small dance:
1410 // - store a negative of our ref, increase refcount an extra time
1411 // - then link and signal the manager thread as though we were GC'ing
1412 // the object.
1413 // - the manager thread will later acknowledge the initialization of
1414 // this global object and negate the self_ref again
1415 // - this prevents an unused proxy object from causing the global
1416 // object to be reaped early while we are still copying it to worker
1417 // threads, as the manager thread will block waiting for the config
1418 // thread to finish its reload work.
1419 g->self_ref = -luaL_ref(L, LUA_REGISTRYINDEX);
1420 g->refcount++;
1421 proxy_ctx_t *ctx = PROXY_GET_CTX(L);
1422 mcp_gobj_enqueue(ctx, g);
1423 } else {
1424 lua_pop(L, 1); // drop the reference we didn't end up using.
1425 }
1426 g->refcount++;
1427 pthread_mutex_unlock(&g->lock);
1428 }
1429
mcp_gobj_unref(proxy_ctx_t * ctx,struct mcp_globalobj_s * g)1430 void mcp_gobj_unref(proxy_ctx_t *ctx, struct mcp_globalobj_s *g) {
1431 pthread_mutex_lock(&g->lock);
1432 g->refcount--;
1433 if (g->refcount == 0) {
1434 mcp_gobj_enqueue(ctx, g);
1435 }
1436 pthread_mutex_unlock(&g->lock);
1437 }
1438
mcp_gobj_finalize(struct mcp_globalobj_s * g)1439 void mcp_gobj_finalize(struct mcp_globalobj_s *g) {
1440 pthread_mutex_destroy(&g->lock);
1441 }
1442
mcp_profile_alloc(void * ud,void * ptr,size_t osize,size_t nsize)1443 static void *mcp_profile_alloc(void *ud, void *ptr, size_t osize,
1444 size_t nsize) {
1445 struct mcp_memprofile *prof = ud;
1446 struct timespec now;
1447 clock_gettime(CLOCK_MONOTONIC, &now);
1448 enum mcp_memprofile_types t = mcp_memp_free;
1449 if (ptr == NULL) {
1450 switch (osize) {
1451 case LUA_TSTRING:
1452 t = mcp_memp_string;
1453 //fprintf(stderr, "alloc string: %ld\n", nsize);
1454 break;
1455 case LUA_TTABLE:
1456 t = mcp_memp_table;
1457 //fprintf(stderr, "alloc table: %ld\n", nsize);
1458 break;
1459 case LUA_TFUNCTION:
1460 t = mcp_memp_func;
1461 //fprintf(stderr, "alloc func: %ld\n", nsize);
1462 break;
1463 case LUA_TUSERDATA:
1464 t = mcp_memp_userdata;
1465 //fprintf(stderr, "alloc userdata: %ld\n", nsize);
1466 break;
1467 case LUA_TTHREAD:
1468 t = mcp_memp_thread;
1469 //fprintf(stderr, "alloc thread: %ld\n", nsize);
1470 break;
1471 default:
1472 t = mcp_memp_default;
1473 //fprintf(stderr, "alloc osize: %ld nsize: %ld\n", osize, nsize);
1474 }
1475 prof->allocs[t]++;
1476 prof->alloc_bytes[t] += nsize;
1477 } else {
1478 if (nsize != 0) {
1479 prof->allocs[mcp_memp_realloc]++;
1480 prof->alloc_bytes[mcp_memp_realloc] += nsize;
1481 } else {
1482 prof->allocs[mcp_memp_free]++;
1483 prof->alloc_bytes[mcp_memp_free] += osize;
1484 }
1485 //fprintf(stderr, "realloc: osize: %ld nsize: %ld\n", osize, nsize);
1486 }
1487
1488 if (now.tv_sec != prof->last_status.tv_sec) {
1489 prof->last_status.tv_sec = now.tv_sec;
1490 fprintf(stderr, "MEMPROF[%d]:\tstring[%llu][%llu] table[%llu][%llu] func[%llu][%llu] udata[%llu][%llu] thr[%llu][%llu] def[%llu][%llu] realloc[%llu][%llu] free[%llu][%llu]\n",
1491 prof->id,
1492 (unsigned long long)prof->allocs[1],
1493 (unsigned long long)prof->alloc_bytes[1],
1494 (unsigned long long)prof->allocs[2],
1495 (unsigned long long)prof->alloc_bytes[2],
1496 (unsigned long long)prof->allocs[3],
1497 (unsigned long long)prof->alloc_bytes[3],
1498 (unsigned long long)prof->allocs[4],
1499 (unsigned long long)prof->alloc_bytes[4],
1500 (unsigned long long)prof->allocs[5],
1501 (unsigned long long)prof->alloc_bytes[5],
1502 (unsigned long long)prof->allocs[6],
1503 (unsigned long long)prof->alloc_bytes[6],
1504 (unsigned long long)prof->allocs[7],
1505 (unsigned long long)prof->alloc_bytes[7],
1506 (unsigned long long)prof->allocs[0],
1507 (unsigned long long)prof->alloc_bytes[0]);
1508 for (int x = 0; x < 8; x++) {
1509 prof->allocs[x] = 0;
1510 prof->alloc_bytes[x] = 0;
1511 }
1512 }
1513
1514 if (nsize == 0) {
1515 free(ptr);
1516 return NULL;
1517 } else {
1518 return realloc(ptr, nsize);
1519 }
1520 }
1521
1522 // Common lua debug command.
dump_stack(lua_State * L,const char * msg)1523 __attribute__((unused)) void dump_stack(lua_State *L, const char *msg) {
1524 int top = lua_gettop(L);
1525 int i = 1;
1526 fprintf(stderr, "--TOP OF STACK [%d] | %s\n", top, msg);
1527 for (; i < top + 1; i++) {
1528 int type = lua_type(L, i);
1529 void *udata = NULL;
1530 // lets find the metatable of this userdata to identify it.
1531 if (lua_getmetatable(L, i) != 0) {
1532 lua_pushstring(L, "__name");
1533 if (lua_rawget(L, -2) != LUA_TNIL) {
1534 if (type == LUA_TUSERDATA) {
1535 udata = lua_touserdata(L, i);
1536 }
1537 fprintf(stderr, "--|%d| [%s] (%s) [ptr: %p]\n", i, lua_typename(L, type), lua_tostring(L, -1), udata);
1538 lua_pop(L, 2);
1539 continue;
1540 }
1541 lua_pop(L, 2);
1542 }
1543 if (type == LUA_TSTRING) {
1544 fprintf(stderr, "--|%d| [%s] | %s\n", i, lua_typename(L, type), lua_tostring(L, i));
1545 } else {
1546 if (type == LUA_TUSERDATA) {
1547 udata = lua_touserdata(L, i);
1548 }
1549 fprintf(stderr, "--|%d| [%s] [ptr: %p]\n", i, lua_typename(L, type), udata);
1550 }
1551 }
1552 fprintf(stderr, "-----------------\n");
1553 }
1554
1555 // Not very pretty, but helped.
1556 // Nice to haves:
1557 // - summarize counts for each metatable (easy enough to do from logging)
1558 // - use a less noisy stack dump instead of calling dump_stack()
dump_registry(lua_State * L,const char * msg)1559 __attribute__((unused)) void dump_registry(lua_State *L, const char *msg) {
1560 int ref_size = lua_rawlen(L, LUA_REGISTRYINDEX);
1561 fprintf(stderr, "--LUA REGISTRY TABLE [%d] | %s\n", ref_size, msg);
1562 // walk registry
1563 int ridx = lua_absindex(L, LUA_REGISTRYINDEX);
1564 int udata = 0;
1565 int number = 0;
1566 int string = 0;
1567 int function = 0;
1568 int table = 0;
1569 lua_pushnil(L);
1570 while (lua_next(L, ridx) != 0) {
1571 dump_stack(L, "===registry entry===");
1572 int type = lua_type(L, -1);
1573 if (type == LUA_TUSERDATA) {
1574 udata++;
1575 } else if (type == LUA_TNUMBER) {
1576 number++;
1577 } else if (type == LUA_TSTRING) {
1578 string++;
1579 } else if (type == LUA_TFUNCTION) {
1580 function++;
1581 } else if (type == LUA_TTABLE) {
1582 table++;
1583 }
1584 lua_pop(L, 1); // drop value
1585 }
1586 fprintf(stderr, "SUMMARY:\n\n");
1587 fprintf(stderr, "### UDATA\t[%d]\n", udata);
1588 fprintf(stderr, "### NUMBER\t[%d]\n", number);
1589 fprintf(stderr, "### STRING\t[%d]\n", string);
1590 fprintf(stderr, "### FUNCTION\t[%d]\n", function );
1591 fprintf(stderr, "### TABLE\t[%d]\n", table);
1592 fprintf(stderr, "-----------------\n");
1593 }
1594
1595 // Searches for a function generator with a specific name attached.
1596 // Adding breakpoints on the print lines lets you inspect the fgen and its
1597 // slots.
dump_funcgen(lua_State * L,const char * name,const char * msg)1598 __attribute__((unused)) void dump_funcgen(lua_State *L, const char *name, const char *msg) {
1599 int ref_size = lua_rawlen(L, LUA_REGISTRYINDEX);
1600 fprintf(stderr, "--LUA FUNCGEN FINDER [%d] | %s\n", ref_size, msg);
1601 // walk registry
1602 int ridx = lua_absindex(L, LUA_REGISTRYINDEX);
1603 lua_pushnil(L);
1604 while (lua_next(L, ridx) != 0) {
1605 int type = lua_type(L, -1);
1606 if (type == LUA_TUSERDATA) {
1607 mcp_funcgen_t *f = luaL_testudata(L, -1, "mcp.funcgen");
1608 if (f != NULL && strcmp(name, f->name) == 0) {
1609 fprintf(stderr, "===found funcgen [%s] [%p]===\n", f->name, (void *)f);
1610 lua_getiuservalue(L, -1, 1);
1611 int tidx = lua_absindex(L, -1);
1612 lua_pushnil(L);
1613 while (lua_next(L, tidx) != 0) {
1614 mcp_rcontext_t *rctx = lua_touserdata(L, -1);
1615 if (rctx != NULL) {
1616 fprintf(stderr, "-- slot: [%p]\n", (void *)rctx);
1617 }
1618 lua_pop(L, 1); // drop value
1619 }
1620 lua_pop(L, 1); // drop slot table
1621 }
1622 }
1623 lua_pop(L, 1); // drop value
1624 }
1625 fprintf(stderr, "-----------------\n");
1626 }
1627
dump_pool_info(mcp_pool_t * p)1628 static void dump_pool_info(mcp_pool_t *p) {
1629 fprintf(stderr, "--pool: [%s] size: [%d] be_total: [%d] rc: [%d] io: [%d]\n",
1630 p->beprefix, p->pool_size, p->pool_be_total, p->g.refcount, p->use_iothread);
1631
1632 for (int x = 0; x < p->pool_be_total; x++) {
1633 mcp_backend_t *be = p->pool[x].be;
1634 // Dumb: pool_be_total is wrong if pool is using iothread. Why?
1635 if (be != NULL) {
1636 fprintf(stderr, " --be[%d] label: [%s] name: [%s] conns: [%d] depth: [%d]\n",
1637 x, be->label, be->name, be->conncount, be->depth);
1638 for (int i = 0; i < be->conncount; i++) {
1639 struct mcp_backendconn_s *bec = &be->be[i];
1640 fprintf(stderr, " --bec[%d] bad: [%d] failcnt: [%d] depth: [%d] state: [%d] can_write[%d] write_event[%d]\n",
1641 i, bec->bad, bec->failed_count, bec->depth, bec->state, bec->can_write, event_pending(&bec->timeout_event, EV_WRITE, NULL));
1642 }
1643 }
1644 }
1645 fprintf(stderr, "=======\n");
1646 }
1647
1648 // Dumps some info about pools.
1649 // If given the config thread, it should find the main pools
1650 // If given a worker thread, it will look for the pool proxy objects and find
1651 // the main pools that way.
dump_pools(lua_State * L,const char * msg)1652 __attribute__((unused)) void dump_pools(lua_State *L, const char *msg) {
1653 int ref_size = lua_rawlen(L, LUA_REGISTRYINDEX);
1654 fprintf(stderr, "--LUA POOL DUMPER [%d] | %s\n", ref_size, msg);
1655 // walk registry
1656 int ridx = lua_absindex(L, LUA_REGISTRYINDEX);
1657 lua_pushnil(L);
1658 while (lua_next(L, ridx) != 0) {
1659 int type = lua_type(L, -1);
1660 if (type == LUA_TUSERDATA) {
1661 mcp_pool_t *p = luaL_testudata(L, -1, "mcp.pool");
1662 if (p != NULL) {
1663 dump_pool_info(p);
1664 } else {
1665 mcp_pool_proxy_t *pp = luaL_testudata(L, -1, "mcp.pool_proxy");
1666 if (pp != NULL) {
1667 dump_pool_info(pp->main);
1668 }
1669 }
1670 }
1671 lua_pop(L, 1); // drop value
1672 }
1673 fprintf(stderr, "-----------------\n");
1674
1675 }
1676