/* -*- Mode: C; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
// Functions related to the configuration management threads and VM
// TODO (v2): move worker thread related code back out of here.

#include "proxy.h"

// not using queue.h because those macros require specific storage for HEAD.
// it's not possible to have the HEAD simply be in the proxy context because
// it would need to know the offset into this private structure.
// This might be doable but the problem is too trivial to spend time on it.
#define MCP_LUAFILE_SIZE 16384
struct _mcp_luafile {
    size_t size;
    size_t used;
    bool loaded; // flip this to false before each load use
    char *buf;
    char *fname; // filename to load
    struct _mcp_luafile *next;
};

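// lua_Writer callback for lua_dump(): appends each chunk of compiled
// bytecode to the _mcp_luafile buffer, growing it in MCP_LUAFILE_SIZE blocks.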
static int _dump_helper(lua_State *L, const void *p, size_t sz, void *ud) {
    (void)L;
    struct _mcp_luafile *db = ud;
    if (db->used + sz > db->size) {
        // increase by blocks instead of doubling to avoid memory waste
        db->size += MCP_LUAFILE_SIZE;
        char *nb = realloc(db->buf, db->size);
        if (nb == NULL) {
            return -1;
        }
        db->buf = nb;
    }
    memcpy(db->buf + db->used, (const char *)p, sz);
    db->used += sz;
    return 0;
}

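// lua_Reader callback for lua_load(): hands the dumped bytecode back to the
// VM in a single chunk, then reports end-of-stream on the next call.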
static const char * _load_helper(lua_State *L, void *data, size_t *size) {
    (void)L;
    struct _mcp_luafile *db = data;
    if (db->loaded) {
        *size = 0;
        return NULL;
    }
    *size = db->used;
    db->loaded = true;
    return db->buf;
}

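// Ask the config thread to start a reload. Non-blocking: if the config lock
// is already held (a load is in progress) the request is silently skipped.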
void proxy_start_reload(void *arg) {
    proxy_ctx_t *ctx = arg;
    if (pthread_mutex_trylock(&ctx->config_lock) == 0) {
        ctx->loading = true;
        pthread_cond_signal(&ctx->config_cond);
        pthread_mutex_unlock(&ctx->config_lock);
    }
}

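// Kick off the initial configuration load and spin until the config thread
// clears the loading flag, returning -1 if any reload failures were counted.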
int proxy_first_confload(void *arg) {
    proxy_ctx_t *ctx = arg;
    pthread_mutex_lock(&ctx->config_lock);
    ctx->loading = true;
    pthread_cond_signal(&ctx->config_cond);
    pthread_mutex_unlock(&ctx->config_lock);

    while (1) {
        bool stop = false;
        pthread_mutex_lock(&ctx->config_lock);
        if (!ctx->loading) {
            stop = true;
        }
        pthread_mutex_unlock(&ctx->config_lock);
        if (stop)
            break;
    }
    int fails = 0;
    STAT_L(ctx);
    fails = ctx->global_stats.config_reload_fails;
    STAT_UL(ctx);
    if (fails) {
        return -1;
    }

    return 0;
}

// Manages a queue of inbound objects destined to be deallocated.
static void *_proxy_manager_thread(void *arg) {
    proxy_ctx_t *ctx = arg;
    globalobj_head_t head;

    pthread_mutex_lock(&ctx->manager_lock);
    while (1) {
        STAILQ_INIT(&head);
        while (STAILQ_EMPTY(&ctx->manager_head)) {
            pthread_cond_wait(&ctx->manager_cond, &ctx->manager_lock);
        }

        // pull dealloc queue into local queue.
        STAILQ_CONCAT(&head, &ctx->manager_head);
        pthread_mutex_unlock(&ctx->manager_lock);

        // Config lock is required for using config VM.
        pthread_mutex_lock(&ctx->config_lock);
        lua_State *L = ctx->proxy_state;
        struct mcp_globalobj_s *g;
        STAILQ_FOREACH(g, &head, next) {
            // we let the object _gc() handle backend/etc references
            pthread_mutex_lock(&g->lock);
            assert(g->self_ref != -1);
            // See comment on mcp_gobj_ref()
            if (g->self_ref < -1) {
                g->refcount--;
                g->self_ref = -g->self_ref;
            }
            assert(g->self_ref > 0 || g->refcount == 0);
            if (g->refcount == 0) {
                luaL_unref(L, LUA_REGISTRYINDEX, g->self_ref);
                g->self_ref = -1;
            }
            pthread_mutex_unlock(&g->lock);
        }
        // force lua garbage collection so any resources close out quickly.
        lua_gc(L, LUA_GCCOLLECT);
        // twice because objects with garbage collector handlers are only
        // marked on the first collection cycle.
        lua_gc(L, LUA_GCCOLLECT);
        // must hold this lock while interacting with the config VM.
        pthread_mutex_unlock(&ctx->config_lock);

        // done.
        pthread_mutex_lock(&ctx->manager_lock);
    }

    return NULL;
}

// TODO: only run routine if something changed.
// This compacts all of the names for proxy user stats into a linear buffer,
// which can save considerable CPU when emitting a large number of stats. It
// also saves some total memory by having one linear buffer instead of many
// potentially small aligned allocations.
static void proxy_config_stats_prep(proxy_ctx_t *ctx) {
    char *oldnamebuf = ctx->user_stats_namebuf;
    struct proxy_user_stats_entry *entries = ctx->user_stats;
    size_t namelen = 0;

    STAT_L(ctx);
    // find size of new compact name buffer
    for (int x = 0; x < ctx->user_stats_num; x++) {
        if (entries[x].name) {
            namelen += strlen(entries[x].name) + 1; // null byte
        } else if (entries[x].cname) {
            char *name = oldnamebuf + entries[x].cname;
            namelen += strlen(name) + 1;
        }
    }
    // start one byte into the cname buffer so we can do faster checks on
    // whether a name exists or not, so extend the buffer by one byte.
    namelen++;

    char *namebuf = calloc(1, namelen);
    // copy names into the compact buffer
    char *p = namebuf + 1;
    for (int x = 0; x < ctx->user_stats_num; x++) {
        struct proxy_user_stats_entry *e = &entries[x];
        char *newname = NULL;
        if (e->name) {
            // skip blank names.
            if (e->name[0]) {
                newname = e->name;
            }
        } else if (e->cname) {
            // else re-copy from old buffer
            newname = oldnamebuf + e->cname;
        }

        if (newname) {
            // set the buffer offset for this name
            e->cname = p - namebuf;
            // copy in the name
            size_t nlen = strlen(newname);
            memcpy(p, newname, nlen);
            p += nlen;
            *p = '\0'; // add null byte
            p++;
        } else {
            // name is blank or doesn't exist, ensure we skip it.
            e->cname = 0;
        }

        if (e->name) {
            // now get rid of the name buffer.
            free(e->name);
            e->name = NULL;
        }
    }

    ctx->user_stats_namebuf = namebuf;
    if (oldnamebuf) {
        free(oldnamebuf);
    }
    STAT_UL(ctx);
}

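// Executes one full configuration reload: re-runs the config files in the
// config VM, then pushes the compiled code and copied config to each worker
// thread in turn, bailing out early if any stage fails.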
static void proxy_config_reload(proxy_ctx_t *ctx) {
    LOGGER_LOG(NULL, LOG_PROXYEVENTS, LOGGER_PROXY_CONFIG, NULL, "start");
    STAT_INCR(ctx, config_reloads, 1);
    // gen. used for tracking object lifecycles over time.
    // ie: ensuring old things are unloaded.
    ctx->config_generation++;
    lua_State *L = ctx->proxy_state;
    lua_settop(L, 0); // clear off any crud that could have been left on the stack.

    // The main stages of config reload are:
    // - load and execute the config file
    // - run mcp_config_pools()
    // - for each worker:
    //   - copy and execute new lua code
    //   - copy selector table
    //   - run mcp_config_routes()

    if (proxy_load_config(ctx) != 0) {
        // Failed to load. log and wait for a retry.
        STAT_INCR(ctx, config_reload_fails, 1);
        LOGGER_LOG(NULL, LOG_PROXYEVENTS, LOGGER_PROXY_CONFIG, NULL, "failed");
        return;
    }

    proxy_config_stats_prep(ctx);

    // TODO (v2): create a temporary VM to test-load the worker code into.
    // failing to load partway through the worker VM reloads can be
    // critically bad if we're not careful about references.
    // IE: the config VM _must_ hold references to selectors and backends
    // as long as they exist in any worker for any reason.

    for (int x = 0; x < settings.num_threads; x++) {
        LIBEVENT_THREAD *thr = get_worker_thread(x);

        pthread_mutex_lock(&ctx->worker_lock);
        ctx->worker_done = false;
        ctx->worker_failed = false;
        proxy_reload_notify(thr);
        while (!ctx->worker_done) {
            // in case of spurious wakeup.
            pthread_cond_wait(&ctx->worker_cond, &ctx->worker_lock);
        }
        pthread_mutex_unlock(&ctx->worker_lock);

        // Code load bailed.
        if (ctx->worker_failed) {
            STAT_INCR(ctx, config_reload_fails, 1);
            LOGGER_LOG(NULL, LOG_PROXYEVENTS, LOGGER_PROXY_CONFIG, NULL, "failed");
            return;
        }
    }

    // Need to clear the reset flag for the stats system after pushing the new
    // config to each worker.
    STAT_L(ctx);
    for (int x = 0; x < ctx->user_stats_num; x++) {
        ctx->user_stats[x].reset = false;
    }
    STAT_UL(ctx);

    lua_pop(ctx->proxy_state, 1); // drop config_pools return value
    LOGGER_LOG(NULL, LOG_PROXYEVENTS, LOGGER_PROXY_CONFIG, NULL, "done");
}

// Very basic scheduler. Unsorted because we don't expect a huge list of
// functions to run.
static void proxy_run_crons(proxy_ctx_t *ctx) {
    lua_State *L = ctx->proxy_state;
    assert(lua_gettop(L) == 0);
    assert(ctx->cron_ref);
    struct timespec now;

    // Fetch the cron table. Created on startup so must exist.
    lua_rawgeti(L, LUA_REGISTRYINDEX, ctx->cron_ref);

    clock_gettime(CLOCK_REALTIME, &now);
    if (ctx->cron_next <= now.tv_sec) {
        ctx->cron_next = INT_MAX;
    } else {
        // no crons ready. drop the cron table to keep the stack balanced.
        lua_pop(L, 1);
        return;
    }

    // Loop the cron entries.
    lua_pushnil(L);
    while (lua_next(L, 1) != 0) {
        const char *key = lua_tostring(L, -2);
        mcp_cron_t *ce = lua_touserdata(L, -1);
        int idx = lua_absindex(L, -1);

        // check generation.
        if (ctx->config_generation != ce->gen) {
            // remove entry.
            lua_pushnil(L);
            lua_setfield(L, 1, key);
        } else if (ce->next <= now.tv_sec) {
            // grab func and execute it
            lua_getiuservalue(L, idx, 1);
            // no arguments or return values
            int res = lua_pcall(L, 0, 0, 0);
            STAT_INCR(ctx, config_cron_runs, 1);
            if (res != LUA_OK) {
                LOGGER_LOG(NULL, LOG_PROXYEVENTS, LOGGER_PROXY_ERROR, NULL, lua_tostring(L, -1));
                STAT_INCR(ctx, config_cron_fails, 1);
                lua_pop(L, 1); // drop error.
            }

            if (ce->repeat) {
                ce->next = now.tv_sec + ce->every;
                // if rescheduled, check next against ctx. update if sooner
                if (ctx->cron_next > ce->next) {
                    ctx->cron_next = ce->next;
                }
            } else {
                // non-repeating cron. delete entry.
                lua_pushnil(L);
                lua_setfield(L, 1, key);
            }
        } else {
            // not scheduled to run now, but check if we're next.
            if (ctx->cron_next > ce->next) {
                ctx->cron_next = ce->next;
            }
        }

        lua_pop(L, 1); // drop value so we can loop.
    }

    lua_pop(L, 1); // drop cron table.
}

// Thread handling the configuration reload sequence.
// TODO (v2): get a logger instance.
// TODO (v2): making this "safer" will require a few phases of work.
// 1) JFDI
// 2) "test VM" -> from config thread, test the worker reload portion.
// 3) "unit testing" -> from same temporary worker VM, execute set of
//    integration tests that must pass.
// 4) run update on each worker, collecting new mcp.attach() hooks.
//    Once every worker has successfully executed and set new hooks, roll
//    through a _second_ time to actually swap the hook structures and unref
//    the old structures where marked dirty.
static void *_proxy_config_thread(void *arg) {
    proxy_ctx_t *ctx = arg;
    struct timespec wait = {0};

    logger_create();
    pthread_mutex_lock(&ctx->config_lock);
    pthread_cond_signal(&ctx->config_cond);
    while (1) {
        ctx->loading = false;

        // cron only thinks in whole seconds.
        wait.tv_sec = ctx->cron_next;
        pthread_cond_timedwait(&ctx->config_cond, &ctx->config_lock, &wait);

        proxy_run_crons(ctx);

        if (ctx->loading) {
            proxy_config_reload(ctx);
        }
    }

    return NULL;
}

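// Starts the configuration and manager threads, waiting for the config
// thread to signal that it is running before returning.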
int _start_proxy_config_threads(proxy_ctx_t *ctx) {
    int ret;

    pthread_mutex_lock(&ctx->config_lock);
    if ((ret = pthread_create(&ctx->config_tid, NULL,
                    _proxy_config_thread, ctx)) != 0) {
        fprintf(stderr, "Failed to start proxy configuration thread: %s\n",
                strerror(ret));
        pthread_mutex_unlock(&ctx->config_lock);
        return -1;
    }
    thread_setname(ctx->config_tid, "mc-prx-config");
    // Avoid returning until the config thread has actually started.
    pthread_cond_wait(&ctx->config_cond, &ctx->config_lock);
    pthread_mutex_unlock(&ctx->config_lock);

    pthread_mutex_lock(&ctx->manager_lock);
    if ((ret = pthread_create(&ctx->manager_tid, NULL,
                    _proxy_manager_thread, ctx)) != 0) {
        fprintf(stderr, "Failed to start proxy manager thread: %s\n",
                strerror(ret));
        pthread_mutex_unlock(&ctx->manager_lock);
        return -1;
    }
    thread_setname(ctx->manager_tid, "mc-prx-manager");
    pthread_mutex_unlock(&ctx->manager_lock);

    return 0;
}

// this splits a list of lua startfiles into independent data chunk buffers
// we call this once the first time we start so we can use mallocs without
// having to armor against runtime malloc failures... as much.
static int proxy_init_startfiles(proxy_ctx_t *ctx, const char *files) {
    char *flist = strdup(files);
    if (flist == NULL) {
        fprintf(stderr, "ERROR: failed to allocate memory for parsing proxy_startfile\n");
        return -1;
    }

    char *b;
    for (const char *p = strtok_r(flist, ":", &b);
            p != NULL;
            p = strtok_r(NULL, ":", &b)) {
        struct _mcp_luafile *db = calloc(sizeof(struct _mcp_luafile), 1);
        if (db == NULL) {
            fprintf(stderr, "ERROR: failed to allocate memory for parsing proxy_startfile\n");
            return -1;
        }
        db->size = MCP_LUAFILE_SIZE;
        db->buf = calloc(db->size, 1);
        db->fname = strdup(p);
        if (db->buf == NULL || db->fname == NULL) {
            fprintf(stderr, "ERROR: failed to allocate memory while parsing proxy_startfile\n");
            return -1;
        }

        // put new file at tail
        if (ctx->proxy_code == NULL) {
            ctx->proxy_code = db;
        } else {
            struct _mcp_luafile *list = ctx->proxy_code;
            while (list->next) {
                list = list->next;
            }
            assert(list->next == NULL);
            list->next = db;
        }
    }

    free(flist);
    return 0;
}

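// Compiles each startfile into the config VM, dumping the bytecode into its
// buffer for later distribution to the workers, then executes it.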
static int proxy_load_files(proxy_ctx_t *ctx) {
    lua_State *L = ctx->proxy_state;
    struct _mcp_luafile *db = ctx->proxy_code;
    assert(db);

    while (db) {
        int res;
        // clear the buffer for reuse.
        memset(db->buf, 0, db->size);
        db->used = 0;

        res = luaL_loadfile(L, db->fname);
        if (res != LUA_OK) {
            fprintf(stderr, "ERROR: Failed to load proxy_startfile: %s\n", lua_tostring(L, -1));
            return -1;
        }
        // LUA_OK, LUA_ERRSYNTAX, LUA_ERRMEM, LUA_ERRFILE

        // Now we need to dump the compiled code into bytecode.
        // This will then get loaded into worker threads.
        lua_dump(L, _dump_helper, db, 0);
        // 0 means no error.

        // now we complete the data load by calling the function.
        res = lua_pcall(L, 0, LUA_MULTRET, 0);
        if (res != LUA_OK) {
            fprintf(stderr, "ERROR: Failed to load data into lua config state: %s\n", lua_tostring(L, -1));
            exit(EXIT_FAILURE);
        }

        db = db->next;
    }

    return 0;
}

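// Top level config load: splits the startfile list on first use, loads and
// runs the files, then calls the global mcp_config_pools() and leaves its
// return value (the main config) on the config VM stack.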
int proxy_load_config(void *arg) {
    proxy_ctx_t *ctx = arg;
    lua_State *L = ctx->proxy_state;
    int res = 0;

    if (ctx->proxy_code == NULL) {
        res = proxy_init_startfiles(ctx, settings.proxy_startfile);
        if (res != 0) {
            return res;
        }
    }

    // load each of the data files in order.
    res = proxy_load_files(ctx);
    if (res != 0) {
        return res;
    }

    // call the mcp_config_pools function to get the central backends.
    lua_getglobal(L, "mcp_config_pools");

    if (lua_isnil(L, -1)) {
        fprintf(stderr, "ERROR: Configuration file missing 'mcp_config_pools' function\n");
        exit(EXIT_FAILURE);
    }
    lua_pushnil(L); // no "old" config yet.
    if (lua_pcall(L, 1, 1, 0) != LUA_OK) {
        fprintf(stderr, "ERROR: Failed to execute mcp_config_pools: %s\n", lua_tostring(L, -1));
        exit(EXIT_FAILURE);
    }

    // result is our main config.
    return 0;
}

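// Creates a small mcp.pool_proxy userdata in the worker VM that points at
// the pool owned by the config VM, and takes a reference on the pool so it
// can't be collected while any worker still uses it.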
static int _copy_pool(lua_State *from, lua_State *to, LIBEVENT_THREAD *thr) {
    // from, -3 should have the userdata.
    mcp_pool_t *p = luaL_checkudata(from, -3, "mcp.pool");
    size_t size = sizeof(mcp_pool_proxy_t);
    mcp_pool_proxy_t *pp = lua_newuserdatauv(to, size, 0);
    luaL_setmetatable(to, "mcp.pool_proxy");

    pp->main = p;
    if (p->use_iothread) {
        pp->pool = p->pool;
    } else {
        // allow 0 indexing for backends when unique to each worker thread
        pp->pool = &p->pool[thr->thread_baseid * p->pool_size];
    }
    lua_pushvalue(from, -3); // dupe pool for referencing
    mcp_gobj_ref(from, &p->g); // pops obj copy
    return 0;
}

static void _copy_config_table(lua_State *from, lua_State *to, LIBEVENT_THREAD *thr);
// (from, -1) is the source value
// should end with (to, -1) being the new value.
static void _copy_config_table(lua_State *from, lua_State *to, LIBEVENT_THREAD *thr) {
    int type = lua_type(from, -1);
    bool found = false;
    luaL_checkstack(from, 4, "configuration error: table recursion too deep");
    luaL_checkstack(to, 4, "configuration error: table recursion too deep");
    switch (type) {
        case LUA_TNIL:
            lua_pushnil(to);
            break;
        case LUA_TUSERDATA:
            // see dump_stack() - check if it's something we handle.
            if (lua_getmetatable(from, -1) != 0) {
                lua_pushstring(from, "__name");
                if (lua_rawget(from, -2) != LUA_TNIL) {
                    const char *name = lua_tostring(from, -1);
                    if (strcmp(name, "mcp.pool") == 0) {
                        _copy_pool(from, to, thr);
                        found = true;
                    } else if (strcmp(name, "mcp.ratelim_global_tbf") == 0) {
                        mcp_ratelim_proxy_tbf(from, to);
                        found = true;
                    }
                }
                lua_pop(from, 2);
            }
            if (!found) {
                proxy_lua_error(from, "unhandled userdata type in configuration table\n");
            }
            break;
        case LUA_TNUMBER:
            if (lua_isinteger(from, -1)) {
                lua_pushinteger(to, lua_tointeger(from, -1));
            } else {
                lua_pushnumber(to, lua_tonumber(from, -1));
            }
            break;
        case LUA_TSTRING:
            lua_pushlstring(to, lua_tostring(from, -1), lua_rawlen(from, -1));
            break;
        case LUA_TBOOLEAN:
            lua_pushboolean(to, lua_toboolean(from, -1));
            break;
        case LUA_TTABLE:
            // TODO (v2): copy the metatable first?
            // TODO (v2): size narr/nrec from old table and use createtable to
            // pre-allocate.
            lua_newtable(to); // throw new table on worker
            int t = lua_absindex(from, -1); // static index of table to copy.
            int nt = lua_absindex(to, -1); // static index of new table.
            lua_pushnil(from); // start iterator for main
            while (lua_next(from, t) != 0) {
                // (key, -2), (val, -1)
                int keytype = lua_type(from, -2);
                // to intentionally limit complexity and allow for future
                // optimizations we restrict what types may be used as keys
                // for sub-tables.
                switch (keytype) {
                    case LUA_TSTRING:
                        // to[l]string converts the actual key in the table
                        // into a string, so we must not do that unless it
                        // already is one.
                        lua_pushlstring(to, lua_tostring(from, -2), lua_rawlen(from, -2));
                        break;
                    case LUA_TNUMBER:
                        if (lua_isinteger(from, -2)) {
                            lua_pushinteger(to, lua_tointeger(from, -2));
                        } else {
                            lua_pushnumber(to, lua_tonumber(from, -2));
                        }
                        break;
                    default:
                        proxy_lua_error(from, "configuration table keys must be strings or numbers");
                }
                // lua_settable(to, n) - n being the table
                // takes -2 key -1 value, pops both.
                // use lua_absindex(L, -1) and so to convert easier?
                _copy_config_table(from, to, thr); // push next value.
                lua_settable(to, nt);
                lua_pop(from, 1); // drop value, keep key.
            }
            // top of from is now the original table.
            // top of to should be the new table.
            break;
        default:
            proxy_lua_error(from, "unhandled data type in configuration table\n");
    }
}

// Run from proxy worker to coordinate code reload.
// config_lock must be held first.
void proxy_worker_reload(void *arg, LIBEVENT_THREAD *thr) {
    proxy_ctx_t *ctx = arg;
    pthread_mutex_lock(&ctx->worker_lock);
    if (proxy_thread_loadconf(ctx, thr) != 0) {
        ctx->worker_failed = true;
    }
    ctx->worker_done = true;
    pthread_cond_signal(&ctx->worker_cond);
    pthread_mutex_unlock(&ctx->worker_lock);
}

// FIXME (v2): need to test how to recover from an actual error here. error message
// needs to go somewhere useful, counters added, etc.
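// Runs the dumped config code in this worker's VM, deep-copies the pools
// config from the config VM, calls mcp_config_routes() with it, then
// refreshes the per-thread user stats counters and limits.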
int proxy_thread_loadconf(proxy_ctx_t *ctx, LIBEVENT_THREAD *thr) {
    lua_State *L = thr->L;
    // load the precompiled config functions.

    struct _mcp_luafile *db = ctx->proxy_code;
    while (db) {
        db->loaded = false;
        int res = lua_load(L, _load_helper, db, "config", NULL);
        if (res != LUA_OK) {
            fprintf(stderr, "Failed to load data into worker thread: %s\n", lua_tostring(L, -1));
            return -1;
        }

        res = lua_pcall(L, 0, LUA_MULTRET, 0);
        if (res != LUA_OK) {
            // FIXME (v2): don't exit here!
            fprintf(stderr, "Failed to load data into worker thread: %s\n", lua_tostring(L, -1));
            return -1;
        }

        db = db->next;
    }

    lua_getglobal(L, "mcp_config_routes");
    // create deepcopy of argument to pass into mcp_config_routes.
    // FIXME (v2): to avoid lua SIGABRT'ing on errors we need to protect the call
    // normal pattern:
    // lua_pushcfunction(L, &_copy_config_table);
    // lua_pushlightuserdata(L, &L2);
    // res = lua_pcall(L, etc);
    // ... but since this is cross-VM we could get errors from not the
    // protected VM, breaking setjmp/etc.
    // for this part of the code we should override lua_atpanic(),
    // allowing us to specifically recover and bail.
    // However, again, this will require the next version of the config reload
    // code since we are re-using the VM's and a panic can leave us in a
    // broken state.
    // If the setjump/longjump combos are compatible a pcall for from and
    // atpanic for to might work best, since the config VM is/should be long
    // running and worker VM's should be rotated.
    _copy_config_table(ctx->proxy_state, L, thr);

    // copied value is in front of route function, now call it.
    if (lua_pcall(L, 1, 0, 0) != LUA_OK) {
        fprintf(stderr, "Failed to execute mcp_config_routes: %s\n", lua_tostring(L, -1));
        return -1;
    }

    // update user stats
    STAT_L(ctx);
    struct proxy_user_stats_entry *us = ctx->user_stats;
    int stats_num = ctx->user_stats_num;
    struct proxy_user_stats *tus = NULL;
    if (stats_num != 0) {
        pthread_mutex_lock(&thr->stats.mutex);
        if (thr->proxy_user_stats == NULL) {
            tus = calloc(1, sizeof(struct proxy_user_stats));
            thr->proxy_user_stats = tus;
        } else {
            tus = thr->proxy_user_stats;
        }

        // originally this was a realloc routine but it felt fragile.
        // that might still be a better idea; still need to zero out the end.
        uint64_t *counters = calloc(stats_num, sizeof(uint64_t));

        // note that num_stats can _only_ grow in size.
        if (tus->counters) {
            // pull in old counters, if the names didn't change.
            for (int x = 0; x < tus->num_stats; x++) {
                if (us[x].reset) {
                    counters[x] = 0;
                } else {
                    counters[x] = tus->counters[x];
                }
            }
            assert(tus->num_stats <= stats_num);
            free(tus->counters);
        }
        tus->counters = counters;
        tus->num_stats = stats_num;

        pthread_mutex_unlock(&thr->stats.mutex);
    }
    // also grab the concurrent request limit
    thr->proxy_active_req_limit = ctx->active_req_limit;
    STAT_UL(ctx);

    // update limit counter(s)
    pthread_mutex_lock(&thr->proxy_limit_lock);
    thr->proxy_buffer_memory_limit = ctx->buffer_memory_limit;
    pthread_mutex_unlock(&thr->proxy_limit_lock);

    return 0;
}