1 /* -*- Mode: C; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2 // Functions related to the configuration management threads and VM
3 // TODO (v2): move worker thread related code back out of here.
4 
5 #include "proxy.h"
6 
// not using queue.h because those require specific storage for HEAD.
8 // it's not possible to have the HEAD simply be in the proxy context because
9 // it would need to know the offset into this private structure.
10 // This might be doable but the problem is too trivial to spend time on it.
// Granularity, in bytes, of the bytecode buffers: each buffer starts at this
// size and _dump_helper() grows it in steps of this size when it fills up.
#define MCP_LUAFILE_SIZE 16384
// One Lua start file together with the buffer holding its dumped bytecode.
// Entries are chained into a singly linked list through `next`.
struct _mcp_luafile {
    size_t size; // allocated capacity of buf, in bytes
    size_t used; // number of bytes of buf currently holding dumped data
    bool loaded; // flip this to false before each load use
    char *buf; // dumped chunk data, handed to lua_load() by _load_helper()
    char *fname; // filename to load
    struct _mcp_luafile *next; // next file in the list, NULL at the tail
};
20 
// lua_Writer callback for lua_dump(): appends `sz` bytes from `p` to the
// struct _mcp_luafile passed in as `ud`, growing its buffer when needed.
// Returns 0 on success, or -1 if the buffer could not be grown; in that case
// the buffer and its recorded size are left untouched.
static int _dump_helper(lua_State *L, const void *p, size_t sz, void *ud) {
    (void)L;
    struct _mcp_luafile *db = ud;
    size_t avail = db->size - db->used;
    if (sz > avail) {
        // increase by blocks instead of doubling to avoid memory waste.
        // A single write may be larger than one block, so add as many
        // blocks as it takes to fit the whole piece.
        size_t need = sz - avail;
        size_t blocks = need / MCP_LUAFILE_SIZE;
        if (need % MCP_LUAFILE_SIZE != 0) {
            blocks++;
        }
        size_t nsize = db->size + blocks * MCP_LUAFILE_SIZE;
        char *nb = realloc(db->buf, nsize);
        if (nb == NULL) {
            return -1;
        }
        db->buf = nb;
        // only record the new capacity once the allocation succeeded.
        db->size = nsize;
    }
    memcpy(db->buf + db->used, (const char *)p, sz);
    db->used += sz;
    return 0;
}
37 
// lua_Reader callback for lua_load(): hands the whole dumped buffer to Lua in
// one piece. The first call after `loaded` was cleared returns the buffer and
// its length; every later call reports end of input (NULL with size 0).
static const char * _load_helper(lua_State *L, void *data, size_t *size) {
    (void)L;
    struct _mcp_luafile *file = data;
    const char *chunk = NULL;
    size_t len = 0;

    if (!file->loaded) {
        // first request: give out everything at once and remember we did.
        len = file->used;
        file->loaded = true;
        chunk = file->buf;
    }

    *size = len;
    return chunk;
}
49 
proxy_start_reload(void * arg)50 void proxy_start_reload(void *arg) {
51     proxy_ctx_t *ctx = arg;
52     if (pthread_mutex_trylock(&ctx->config_lock) == 0) {
53         ctx->loading = true;
54         pthread_cond_signal(&ctx->config_cond);
55         pthread_mutex_unlock(&ctx->config_lock);
56     }
57 }
58 
proxy_first_confload(void * arg)59 int proxy_first_confload(void *arg) {
60     proxy_ctx_t *ctx = arg;
61     pthread_mutex_lock(&ctx->config_lock);
62     ctx->loading = true;
63     pthread_cond_signal(&ctx->config_cond);
64     pthread_mutex_unlock(&ctx->config_lock);
65 
66     while (1) {
67         bool stop = false;
68         pthread_mutex_lock(&ctx->config_lock);
69         if (!ctx->loading) {
70             stop = true;
71         }
72         pthread_mutex_unlock(&ctx->config_lock);
73         if (stop)
74             break;
75     }
76     int fails = 0;
77     STAT_L(ctx);
78     fails = ctx->global_stats.config_reload_fails;
79     STAT_UL(ctx);
80     if (fails) {
81         return -1;
82     }
83 
84     return 0;
85 }
86 
// Manages a queue of inbound objects destined to be deallocated.
// Thread entry point; `arg` is the proxy_ctx_t. Loops forever:
// - waits on manager_cond until ctx->manager_head is non-empty
// - moves the whole queue onto a local list while holding manager_lock
// - with config_lock held, drops the registry reference of every queued
//   object whose refcount reached zero, then runs two full GC cycles on the
//   config VM.
// The trailing return is never reached.
static void *_proxy_manager_thread(void *arg) {
    proxy_ctx_t *ctx = arg;
    globalobj_head_t head;

    pthread_mutex_lock(&ctx->manager_lock);
    while (1) {
        // start every pass with an empty local list.
        STAILQ_INIT(&head);
        // loop guards against wakeups that arrive while the queue is empty.
        while (STAILQ_EMPTY(&ctx->manager_head)) {
            pthread_cond_wait(&ctx->manager_cond, &ctx->manager_lock);
        }

        // pull dealloc queue into local queue.
        STAILQ_CONCAT(&head, &ctx->manager_head);
        // the shared queue is not touched again during this pass, so the
        // manager lock is released before taking the config lock.
        pthread_mutex_unlock(&ctx->manager_lock);

        // Config lock is required for using config VM.
        pthread_mutex_lock(&ctx->config_lock);
        lua_State *L = ctx->proxy_state;
        struct mcp_globalobj_s *g;
        STAILQ_FOREACH(g, &head, next) {
            // we let the object _gc() handle backend/etc references
            pthread_mutex_lock(&g->lock);
            // -1 is the value stored below once the reference was released.
            assert(g->self_ref != -1);
            // See comment on mcp_gobj_ref()
            // A self_ref below -1 is a negated reference: take one count off
            // and restore the positive reference value.
            if (g->self_ref < -1) {
                g->refcount--;
                g->self_ref = -g->self_ref;
            }
            assert(g->self_ref > 0 || g->refcount == 0);
            if (g->refcount == 0) {
                // nothing counts this object anymore: release the registry
                // reference so the config VM can collect it.
                luaL_unref(L, LUA_REGISTRYINDEX, g->self_ref);
                g->self_ref = -1;
            }
            pthread_mutex_unlock(&g->lock);
        }
        // force lua garbage collection so any resources close out quickly.
        lua_gc(L, LUA_GCCOLLECT);
        // twice because objects with garbage collector handlers are only
        // marked on the first collection cycle.
        lua_gc(L, LUA_GCCOLLECT);
        // must hold this lock while interacting with the config VM.
        pthread_mutex_unlock(&ctx->config_lock);

        // done.
        // re-take the manager lock before checking the queue again.
        pthread_mutex_lock(&ctx->manager_lock);
    }

    return NULL;
}
137 
138 // TODO: only run routine if something changed.
139 // This compacts all of the names for proxy user stats into a linear buffer,
140 // which can save considerable CPU when emitting a large number of stats. It
141 // also saves some total memory by having one linear buffer instead of many
142 // potentially small aligned allocations.
proxy_config_stats_prep(proxy_ctx_t * ctx)143 static void proxy_config_stats_prep(proxy_ctx_t *ctx) {
144     char *oldnamebuf = ctx->user_stats_namebuf;
145     struct proxy_user_stats_entry *entries = ctx->user_stats;
146     size_t namelen = 0;
147 
148     STAT_L(ctx);
149     // find size of new compact name buffer
150     for (int x = 0; x < ctx->user_stats_num; x++) {
151         if (entries[x].name) {
152             namelen += strlen(entries[x].name) + 1; // null byte
153         } else if (entries[x].cname) {
154             char *name = oldnamebuf + entries[x].cname;
155             namelen += strlen(name) + 1;
156         }
157     }
158     // start one byte into the cname buffer so we can do faster checks on if a
159     // name exists or not. so extend the buffer by one byte.
160     namelen++;
161 
162     char *namebuf = calloc(1, namelen);
163     // copy names into the compact buffer
164     char *p = namebuf + 1;
165     for (int x = 0; x < ctx->user_stats_num; x++) {
166         struct proxy_user_stats_entry *e = &entries[x];
167         char *newname = NULL;
168         if (e->name) {
169             // skip blank names.
170             if (e->name[0]) {
171                 newname = e->name;
172             }
173         } else if (e->cname) {
174             // else re-copy from old buffer
175             newname = oldnamebuf + e->cname;
176         }
177 
178         if (newname) {
179             // set the buffer offset for this name
180             e->cname = p - namebuf;
181             // copy in the name
182             size_t nlen = strlen(newname);
183             memcpy(p, newname, nlen);
184             p += nlen;
185             *p = '\0'; // add null byte
186             p++;
187         } else {
188             // name is blank or doesn't exist, ensure we skip it.
189             e->cname = 0;
190         }
191 
192         if (e->name) {
193             // now get rid of the name buffer.
194             free(e->name);
195             e->name = NULL;
196         }
197     }
198 
199     ctx->user_stats_namebuf = namebuf;
200     if (oldnamebuf) {
201         free(oldnamebuf);
202     }
203     STAT_UL(ctx);
204 }
205 
// Runs one full configuration reload from the config thread:
// loads the config into the config VM, compacts the user stat names, then
// notifies each worker thread in turn and waits for it to finish loading.
// Any failure bumps config_reload_fails, logs "failed" and aborts the reload.
// The config VM stack is left empty on every exit path, since
// proxy_run_crons() expects to start from an empty stack.
static void proxy_config_reload(proxy_ctx_t *ctx) {
    LOGGER_LOG(NULL, LOG_PROXYEVENTS, LOGGER_PROXY_CONFIG, NULL, "start");
    STAT_INCR(ctx, config_reloads, 1);
    // gen. used for tracking object lifecycles over time.
    // ie: ensuring old things are unloaded.
    ctx->config_generation++;
    lua_State *L = ctx->proxy_state;
    lua_settop(L, 0); // clear off any crud that could have been left on the stack.

    // The main stages of config reload are:
    // - load and execute the config file
    // - run mcp_config_pools()
    // - for each worker:
    //   - copy and execute new lua code
    //   - copy selector table
    //   - run mcp_config_routes()

    if (proxy_load_config(ctx) != 0) {
        // Failed to load. log and wait for a retry.
        STAT_INCR(ctx, config_reload_fails, 1);
        LOGGER_LOG(NULL, LOG_PROXYEVENTS, LOGGER_PROXY_CONFIG, NULL, "failed");
        // drop error messages or partial results of the failed load.
        lua_settop(L, 0);
        return;
    }

    proxy_config_stats_prep(ctx);

    // TODO (v2): create a temporary VM to test-load the worker code into.
    // failing to load partway through the worker VM reloads can be
    // critically bad if we're not careful about references.
    // IE: the config VM _must_ hold references to selectors and backends
    // as long as they exist in any worker for any reason.

    for (int x = 0; x < settings.num_threads; x++) {
        LIBEVENT_THREAD *thr = get_worker_thread(x);

        pthread_mutex_lock(&ctx->worker_lock);
        ctx->worker_done = false;
        ctx->worker_failed = false;
        proxy_reload_notify(thr);
        while (!ctx->worker_done) {
            // in case of spurious wakeup.
            pthread_cond_wait(&ctx->worker_cond, &ctx->worker_lock);
        }
        // read the result while still holding the lock it is written under.
        bool failed = ctx->worker_failed;
        pthread_mutex_unlock(&ctx->worker_lock);

        // Code load bailed.
        if (failed) {
            STAT_INCR(ctx, config_reload_fails, 1);
            LOGGER_LOG(NULL, LOG_PROXYEVENTS, LOGGER_PROXY_CONFIG, NULL, "failed");
            // drop the config_pools return value before bailing out.
            lua_settop(L, 0);
            return;
        }
    }

    // Need to clear the reset flag for the stats system after pushing the new
    // config to each worker.
    STAT_L(ctx);
    for (int x = 0; x < ctx->user_stats_num; x++) {
        ctx->user_stats[x].reset = false;
    }
    STAT_UL(ctx);

    // drop config_pools return value, plus anything the config files
    // themselves may have returned underneath it.
    lua_settop(L, 0);
    LOGGER_LOG(NULL, LOG_PROXYEVENTS, LOGGER_PROXY_CONFIG, NULL, "done");
}
270 
271 // Very basic scheduler. Unsorted because we don't expect a huge list of
272 // functions to run.
proxy_run_crons(proxy_ctx_t * ctx)273 static void proxy_run_crons(proxy_ctx_t *ctx) {
274     lua_State *L = ctx->proxy_state;
275     assert(lua_gettop(L) == 0);
276     assert(ctx->cron_ref);
277     struct timespec now;
278 
279     // Fetch the cron table. Created on startup so must exist.
280     lua_rawgeti(L, LUA_REGISTRYINDEX, ctx->cron_ref);
281 
282     clock_gettime(CLOCK_REALTIME, &now);
283     if (ctx->cron_next <= now.tv_sec) {
284         ctx->cron_next = INT_MAX;
285     } else {
286         // no crons ready.
287         return;
288     }
289 
290     // Loop the cron entries.
291     lua_pushnil(L);
292     while (lua_next(L, 1) != 0) {
293         const char *key = lua_tostring(L, -2);
294         mcp_cron_t *ce = lua_touserdata(L, -1);
295         int idx = lua_absindex(L, -1);
296 
297         // check generation.
298         if (ctx->config_generation != ce->gen) {
299             // remove entry.
300             lua_pushnil(L);
301             lua_setfield(L, 1, key);
302         } else if (ce->next <= now.tv_sec) {
303             // grab func and execute it
304             lua_getiuservalue(L, idx, 1);
305             // no arguments or return values
306             int res = lua_pcall(L, 0, 0, 0);
307             STAT_INCR(ctx, config_cron_runs, 1);
308             if (res != LUA_OK) {
309                 LOGGER_LOG(NULL, LOG_PROXYEVENTS, LOGGER_PROXY_ERROR, NULL, lua_tostring(L, -1));
310                 STAT_INCR(ctx, config_cron_fails, 1);
311                 lua_pop(L, 1); // drop error.
312             }
313 
314             if (ce->repeat) {
315                 ce->next = now.tv_sec + ce->every;
316                 // if rescheduled, check next against ctx. update if sooner
317                 if (ctx->cron_next > ce->next) {
318                     ctx->cron_next = ce->next;
319                 }
320             } else {
321                 // non-repeating cron. delete entry.
322                 lua_pushnil(L);
323                 lua_setfield(L, 1, key);
324             }
325         } else {
326             // not scheduled to run now, but check if we're next.
327             if (ctx->cron_next > ce->next) {
328                 ctx->cron_next = ce->next;
329             }
330         }
331 
332         lua_pop(L, 1); // drop value so we can loop.
333     }
334 
335     lua_pop(L, 1); // drop cron table.
336 }
337 
338 // Thread handling the configuration reload sequence.
339 // TODO (v2): get a logger instance.
340 // TODO (v2): making this "safer" will require a few phases of work.
341 // 1) JFDI
342 // 2) "test VM" -> from config thread, test the worker reload portion.
343 // 3) "unit testing" -> from same temporary worker VM, execute set of
344 // integration tests that must pass.
345 // 4) run update on each worker, collecting new mcp.attach() hooks.
346 //    Once every worker has successfully executed and set new hooks, roll
347 //    through a _second_ time to actually swap the hook structures and unref
348 //    the old structures where marked dirty.
_proxy_config_thread(void * arg)349 static void *_proxy_config_thread(void *arg) {
350     proxy_ctx_t *ctx = arg;
351     struct timespec wait = {0};
352 
353     logger_create();
354     pthread_mutex_lock(&ctx->config_lock);
355     pthread_cond_signal(&ctx->config_cond);
356     while (1) {
357         ctx->loading = false;
358 
359         // cron only thinks in whole seconds.
360         wait.tv_sec = ctx->cron_next;
361         pthread_cond_timedwait(&ctx->config_cond, &ctx->config_lock, &wait);
362 
363         proxy_run_crons(ctx);
364 
365         if (ctx->loading) {
366             proxy_config_reload(ctx);
367         }
368     }
369 
370     return NULL;
371 }
372 
// Starts the config thread followed by the manager thread.
// Waits on config_cond after creating the config thread so that it has had a
// chance to start before the manager thread is created.
// Returns 0 on success, or -1 (after printing the reason to stderr) if either
// thread could not be created.
int _start_proxy_config_threads(proxy_ctx_t *ctx) {
    int err;

    pthread_mutex_lock(&ctx->config_lock);
    err = pthread_create(&ctx->config_tid, NULL, _proxy_config_thread, ctx);
    if (err != 0) {
        fprintf(stderr, "Failed to start proxy configuration thread: %s\n",
                strerror(err));
        pthread_mutex_unlock(&ctx->config_lock);
        return -1;
    }
    thread_setname(ctx->config_tid, "mc-prx-config");
    // Avoid returning until the config thread has actually started.
    pthread_cond_wait(&ctx->config_cond, &ctx->config_lock);
    pthread_mutex_unlock(&ctx->config_lock);

    pthread_mutex_lock(&ctx->manager_lock);
    err = pthread_create(&ctx->manager_tid, NULL, _proxy_manager_thread, ctx);
    if (err != 0) {
        fprintf(stderr, "Failed to start proxy manager thread: %s\n",
                strerror(err));
        pthread_mutex_unlock(&ctx->manager_lock);
        return -1;
    }
    thread_setname(ctx->manager_tid, "mc-prx-manager");
    pthread_mutex_unlock(&ctx->manager_lock);

    return 0;
}
402 
403 // this splits a list of lua startfiles into independent data chunk buffers
404 // we call this once the first time we start so we can use mallocs without
405 // having to armor against runtime malloc failures... as much.
proxy_init_startfiles(proxy_ctx_t * ctx,const char * files)406 static int proxy_init_startfiles(proxy_ctx_t *ctx, const char *files) {
407     char *flist = strdup(settings.proxy_startfile);
408     if (flist == NULL) {
409         fprintf(stderr, "ERROR: failed to allocate memory for parsing proxy_startfile\n");
410         return -1;
411     }
412 
413     char *b;
414     for (const char *p = strtok_r(flist, ":", &b);
415             p != NULL;
416             p = strtok_r(NULL, ":", &b)) {
417         struct _mcp_luafile *db = calloc(sizeof(struct _mcp_luafile), 1);
418         if (db == NULL) {
419             fprintf(stderr, "ERROR: failed to allocate memory for parsing proxy_startfile\n");
420             return -1;
421         }
422         db->size = MCP_LUAFILE_SIZE;
423         db->buf = calloc(db->size, 1);
424         db->fname = strdup(p);
425         if (db->buf == NULL || db->fname == NULL) {
426             fprintf(stderr, "ERROR: failed to allocate memory while parsing proxy_startfile\n");
427             return -1;
428         }
429 
430         // put new file at tail
431         if (ctx->proxy_code == NULL) {
432             ctx->proxy_code = db;
433         } else {
434             struct _mcp_luafile *list = ctx->proxy_code;
435             while (list->next) {
436                 list = list->next;
437             }
438             assert(list->next == NULL);
439             list->next = db;
440         }
441     }
442 
443     free(flist);
444     return 0;
445 }
446 
// Compiles every start file in ctx->proxy_code in the config VM, dumps the
// compiled chunk as bytecode into the file's buffer (to be loaded by the
// worker threads later) and then executes the chunk in the config VM.
// Returns 0 on success. Returns -1, with the VM stack as it was before the
// failing file, if a file cannot be compiled or its bytecode cannot be
// dumped. A runtime error while executing a file terminates the process.
static int proxy_load_files(proxy_ctx_t *ctx) {
    lua_State *L = ctx->proxy_state;
    struct _mcp_luafile *db = ctx->proxy_code;
    assert(db);

    while (db) {
        int res;
        // clear the buffer for reuse.
        memset(db->buf, 0, db->size);
        db->used = 0;

        res = luaL_loadfile(L, db->fname);
        if (res != LUA_OK) {
            fprintf(stderr, "ERROR: Failed to load proxy_startfile: %s\n", lua_tostring(L, -1));
            lua_pop(L, 1); // drop error message.
            return -1;
        }
        // LUA_OK, LUA_ERRSYNTAX, LUA_ERRMEM, LUA_ERRFILE

        // Now we need to dump the compiled code into bytecode.
        // This will then get loaded into worker threads.
        res = lua_dump(L, _dump_helper, db, 0);
        // 0 means no error. Anything else is the error from the writer,
        // meaning the buffer holds incomplete bytecode that must not be
        // handed to the workers.
        if (res != 0) {
            fprintf(stderr, "ERROR: Failed to dump bytecode for proxy_startfile: %s\n", db->fname);
            db->used = 0;
            lua_pop(L, 1); // drop the compiled chunk.
            return -1;
        }

        // now we complete the data load by calling the function.
        res = lua_pcall(L, 0, LUA_MULTRET, 0);
        if (res != LUA_OK) {
            fprintf(stderr, "ERROR: Failed to load data into lua config state: %s\n", lua_tostring(L, -1));
            exit(EXIT_FAILURE);
        }

        db = db->next;
    }

    return 0;
}
482 
proxy_load_config(void * arg)483 int proxy_load_config(void *arg) {
484     proxy_ctx_t *ctx = arg;
485     lua_State *L = ctx->proxy_state;
486     int res = 0;
487 
488     if (ctx->proxy_code == NULL) {
489         res = proxy_init_startfiles(ctx, settings.proxy_startfile);
490         if (res != 0) {
491             return res;
492         }
493     }
494 
495     // load each of the data files in order.
496     res = proxy_load_files(ctx);
497 
498     // call the mcp_config_pools function to get the central backends.
499     lua_getglobal(L, "mcp_config_pools");
500 
501     if (lua_isnil(L, -1)) {
502         fprintf(stderr, "ERROR: Configuration file missing 'mcp_config_pools' function\n");
503         exit(EXIT_FAILURE);
504     }
505     lua_pushnil(L); // no "old" config yet.
506     if (lua_pcall(L, 1, 1, 0) != LUA_OK) {
507         fprintf(stderr, "ERROR: Failed to execute mcp_config_pools: %s\n", lua_tostring(L, -1));
508         exit(EXIT_FAILURE);
509     }
510 
511     // result is our main config.
512     return 0;
513 }
514 
// Creates an "mcp.pool_proxy" userdata on `to` that points at the "mcp.pool"
// userdata found at index -3 of `from`, and takes a global object reference
// on the source pool. Always returns 0.
static int _copy_pool(lua_State *from, lua_State *to, LIBEVENT_THREAD *thr) {
    // from, -3 should have the userdata.
    mcp_pool_t *src = luaL_checkudata(from, -3, "mcp.pool");
    mcp_pool_proxy_t *proxy = lua_newuserdatauv(to, sizeof(mcp_pool_proxy_t), 0);
    luaL_setmetatable(to, "mcp.pool_proxy");

    proxy->main = src;
    // with use_iothread the pool array is used from its start; otherwise
    // point at this worker's slice to allow 0 indexing for backends when
    // unique to each worker thread.
    proxy->pool = src->use_iothread
        ? src->pool
        : &src->pool[thr->thread_baseid * src->pool_size];

    // dupe pool for referencing; mcp_gobj_ref() pops the copy.
    lua_pushvalue(from, -3);
    mcp_gobj_ref(from, &src->g);
    return 0;
}
533 
static void _copy_config_table(lua_State *from, lua_State *to, LIBEVENT_THREAD *thr);
// Copies one Lua value from the `from` VM to the `to` VM.
// (from, -1) is the source value
// should end with (to, -1) being the new value.
// Handles nil, numbers (integer and float kept apart), strings, booleans,
// tables (copied recursively, string or number keys only) and the userdata
// types whose metatable __name is "mcp.pool" or "mcp.ratelim_global_tbf".
// Everything else is reported through proxy_lua_error() on `from`.
// NOTE(review): the code relies on proxy_lua_error() not returning
// (presumably it raises a Lua error) - confirm.
// The source value is left in place on `from`.
static void _copy_config_table(lua_State *from, lua_State *to, LIBEVENT_THREAD *thr) {
    int type = lua_type(from, -1);
    bool found = false;
    // make sure both stacks have room for what this level pushes.
    luaL_checkstack(from, 4, "configuration error: table recursion too deep");
    luaL_checkstack(to, 4, "configuration error: table recursion too deep");
    switch (type) {
        case LUA_TNIL:
            lua_pushnil(to);
            break;
        case LUA_TUSERDATA:
            // see dump_stack() - check if it's something we handle.
            if (lua_getmetatable(from, -1) != 0) {
                lua_pushstring(from, "__name");
                if (lua_rawget(from, -2) != LUA_TNIL) {
                    // stack on from is now: userdata (-3), metatable (-2),
                    // __name value (-1).
                    const char *name = lua_tostring(from, -1);
                    if (strcmp(name, "mcp.pool") == 0) {
                        _copy_pool(from, to, thr);
                        found = true;
                    } else if (strcmp(name, "mcp.ratelim_global_tbf") == 0) {
                        mcp_ratelim_proxy_tbf(from, to);
                        found = true;
                    }
                }
                // drop the __name value (or the nil pushed by the lookup)
                // and the metatable.
                lua_pop(from, 2);
            }
            if (!found) {
                proxy_lua_error(from, "unhandled userdata type in configuration table\n");
            }
            break;
        case LUA_TNUMBER:
            // keep the integer/float subtype of the source value.
            if (lua_isinteger(from, -1)) {
                lua_pushinteger(to, lua_tointeger(from, -1));
            } else {
                lua_pushnumber(to, lua_tonumber(from, -1));
            }
            break;
        case LUA_TSTRING:
            // explicit length so embedded zero bytes survive the copy.
            lua_pushlstring(to, lua_tostring(from, -1), lua_rawlen(from, -1));
            break;
        case LUA_TBOOLEAN:
            lua_pushboolean(to, lua_toboolean(from, -1));
            break;
        case LUA_TTABLE:
            // TODO (v2): copy the metatable first?
            // TODO (v2): size narr/nrec from old table and use createtable to
            // pre-allocate.
            lua_newtable(to); // throw new table on worker
            int t = lua_absindex(from, -1); // static index of table to copy.
            int nt = lua_absindex(to, -1); // static index of new table.
            lua_pushnil(from); // start iterator for main
            while (lua_next(from, t) != 0) {
                // (key, -2), (val, -1)
                int keytype = lua_type(from, -2);
                // to intentionally limit complexity and allow for future
                // optimizations we restrict what types may be used as keys
                // for sub-tables.
                switch (keytype) {
                    case LUA_TSTRING:
                        // to[l]string converts the actual key in the table
                        // into a string, so we must not do that unless it
                        // already is one.
                        lua_pushlstring(to, lua_tostring(from, -2), lua_rawlen(from, -2));
                        break;
                    case LUA_TNUMBER:
                        if (lua_isinteger(from, -2)) {
                            lua_pushinteger(to, lua_tointeger(from, -2));
                        } else {
                            lua_pushnumber(to, lua_tonumber(from, -2));
                        }
                        break;
                    default:
                        proxy_lua_error(from, "configuration table keys must be strings or numbers");
                }
                // lua_settable(to, n) - n being the table
                // takes -2 key -1 value, pops both.
                // use lua_absindex(L, -1) and so to convert easier?
                // the key copy is already on `to`; the recursive call pushes
                // the copied value right above it.
                _copy_config_table(from, to, thr); // push next value.
                lua_settable(to, nt);
                lua_pop(from, 1); // drop value, keep key.
            }
            // top of from is now the original table.
            // top of to should be the new table.
            break;
        default:
            proxy_lua_error(from, "unhandled data type in configuration table\n");
    }
}
624 
625 // Run from proxy worker to coordinate code reload.
626 // config_lock must be held first.
proxy_worker_reload(void * arg,LIBEVENT_THREAD * thr)627 void proxy_worker_reload(void *arg, LIBEVENT_THREAD *thr) {
628     proxy_ctx_t *ctx = arg;
629     pthread_mutex_lock(&ctx->worker_lock);
630     if (proxy_thread_loadconf(ctx, thr) != 0) {
631         ctx->worker_failed = true;
632     }
633     ctx->worker_done = true;
634     pthread_cond_signal(&ctx->worker_cond);
635     pthread_mutex_unlock(&ctx->worker_lock);
636 }
637 
638 // FIXME (v2): need to test how to recover from an actual error here. error message
639 // needs to go somewhere useful, counters added, etc.
proxy_thread_loadconf(proxy_ctx_t * ctx,LIBEVENT_THREAD * thr)640 int proxy_thread_loadconf(proxy_ctx_t *ctx, LIBEVENT_THREAD *thr) {
641     lua_State *L = thr->L;
642     // load the precompiled config functions.
643 
644     struct _mcp_luafile *db = ctx->proxy_code;
645     while (db) {
646         db->loaded = false;
647         int res = lua_load(L, _load_helper, db, "config", NULL);
648         if (res != LUA_OK) {
649             fprintf(stderr, "Failed to load data into worker thread: %s\n", lua_tostring(L, -1));
650             return -1;
651         }
652 
653         res = lua_pcall(L, 0, LUA_MULTRET, 0);
654         if (res != LUA_OK) {
655             // FIXME (v2): don't exit here!
656             fprintf(stderr, "Failed to load data into worker thread: %s\n", lua_tostring(L, -1));
657             return -1;
658         }
659 
660         db = db->next;
661     }
662 
663     lua_getglobal(L, "mcp_config_routes");
664     // create deepcopy of argument to pass into mcp_config_routes.
665     // FIXME (v2): to avoid lua SIGABRT'ing on errors we need to protect the call
666     // normal pattern:
667     // lua_pushcfunction(L, &_copy_config_table);
668     // lua_pushlightuserdata(L, &L2);
669     // res = la_pcall(L, etc);
670     // ... but since this is cross-VM we could get errors from not the
671     // protected VM, breaking setjmp/etc.
672     // for this part of the code we should override lua_atpanic(),
673     // allowing us to specifically recover and bail.
674     // However, again, this will require the next version of the config reload
675     // code since we are re-using the VM's and a panic can leave us in a
676     // broken state.
677     // If the setjump/longjump combos are compatible a pcall for from and
678     // atpanic for to might work best, since the config VM is/should be long
679     // running and worker VM's should be rotated.
680     _copy_config_table(ctx->proxy_state, L, thr);
681 
682     // copied value is in front of route function, now call it.
683     if (lua_pcall(L, 1, 0, 0) != LUA_OK) {
684         fprintf(stderr, "Failed to execute mcp_config_routes: %s\n", lua_tostring(L, -1));
685         return -1;
686     }
687 
688     // update user stats
689     STAT_L(ctx);
690     struct proxy_user_stats_entry *us = ctx->user_stats;
691     int stats_num = ctx->user_stats_num;
692     struct proxy_user_stats *tus = NULL;
693     if (stats_num != 0) {
694         pthread_mutex_lock(&thr->stats.mutex);
695         if (thr->proxy_user_stats == NULL) {
696             tus = calloc(1, sizeof(struct proxy_user_stats));
697             thr->proxy_user_stats = tus;
698         } else {
699             tus = thr->proxy_user_stats;
700         }
701 
702         // originally this was a realloc routine but it felt fragile.
703         // that might still be a better idea; still need to zero out the end.
704         uint64_t *counters = calloc(stats_num, sizeof(uint64_t));
705 
706         // note that num_stats can _only_ grow in size.
707         if (tus->counters) {
708             // pull in old counters, if the names didn't change.
709             for (int x = 0; x < tus->num_stats; x++) {
710                 if (us[x].reset) {
711                     counters[x] = 0;
712                 } else {
713                     counters[x] = tus->counters[x];
714                 }
715             }
716             assert(tus->num_stats <= stats_num);
717             free(tus->counters);
718         }
719         tus->counters = counters;
720         tus->num_stats = stats_num;
721 
722         pthread_mutex_unlock(&thr->stats.mutex);
723     }
724     // also grab the concurrent request limit
725     thr->proxy_active_req_limit = ctx->active_req_limit;
726     STAT_UL(ctx);
727 
728     // update limit counter(s)
729     pthread_mutex_lock(&thr->proxy_limit_lock);
730     thr->proxy_buffer_memory_limit = ctx->buffer_memory_limit;
731     pthread_mutex_unlock(&thr->proxy_limit_lock);
732 
733     return 0;
734 }
735 
736 
737