#ifndef PROXY_H
#define PROXY_H

#include "memcached.h"
#include "extstore.h"
#include <string.h>
#include <stdlib.h>
#include <ctype.h>
#include <errno.h>

#include <lua.h>
#include <lualib.h>
#include <lauxlib.h>

#include "config.h"

#if defined(__linux__)
#define USE_EVENTFD 1
#include <sys/eventfd.h>
#endif

#ifdef HAVE_LIBURING
#include <liburing.h>
#include <poll.h> // POLLOUT for liburing.
#define PRING_QUEUE_SQ_ENTRIES 2048
#define PRING_QUEUE_CQ_ENTRIES 16384
#endif

#include "proto_proxy.h"
#include "proto_text.h"
#include "queue.h"
#define XXH_INLINE_ALL // modifier for xxh3's include below
#include "xxhash.h"

#ifdef PROXY_DEBUG
#define P_DEBUG(...) \
    do { \
        fprintf(stderr, __VA_ARGS__); \
    } while (0)
#else
#define P_DEBUG(...)
#endif
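
// Example (illustrative only): P_DEBUG compiles away entirely unless
// PROXY_DEBUG is defined, so trace lines can be left in hot paths:
//   P_DEBUG("%s: marking backend bad\n", __func__);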

#define WSTAT_L(t) pthread_mutex_lock(&t->stats.mutex);
#define WSTAT_UL(t) pthread_mutex_unlock(&t->stats.mutex);
#define WSTAT_INCR(t, stat, amount) { \
    pthread_mutex_lock(&t->stats.mutex); \
    t->stats.stat += amount; \
    pthread_mutex_unlock(&t->stats.mutex); \
}
#define WSTAT_DECR(t, stat, amount) { \
    pthread_mutex_lock(&t->stats.mutex); \
    t->stats.stat -= amount; \
    pthread_mutex_unlock(&t->stats.mutex); \
}
#define STAT_L(ctx) pthread_mutex_lock(&ctx->stats_lock);
#define STAT_UL(ctx) pthread_mutex_unlock(&ctx->stats_lock);
#define STAT_INCR(ctx, stat, amount) { \
        pthread_mutex_lock(&ctx->stats_lock); \
        ctx->global_stats.stat += amount; \
        pthread_mutex_unlock(&ctx->stats_lock); \
}

#define STAT_DECR(ctx, stat, amount) { \
        pthread_mutex_lock(&ctx->stats_lock); \
        ctx->global_stats.stat -= amount; \
        pthread_mutex_unlock(&ctx->stats_lock); \
}
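
// Example usage (illustrative): the WSTAT_* macros take a LIBEVENT_THREAD
// and a field of its stats struct; the STAT_* macros take a proxy_ctx_t
// and a field of proxy_global_stats. The mutex is held only for the update:
//   STAT_INCR(ctx, config_reloads, 1);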

// FIXME (v2): do include dir properly.
#include "vendor/mcmc/mcmc.h"

enum mcp_memprofile_types {
    mcp_memp_free = 0,
    mcp_memp_string,
    mcp_memp_table,
    mcp_memp_func,
    mcp_memp_userdata,
    mcp_memp_thread,
    mcp_memp_default,
    mcp_memp_realloc,
};

struct mcp_memprofile {
    struct timespec last_status; // for per-second prints on status
    int id;
    uint64_t allocs[8];
    uint64_t alloc_bytes[8];
};

// for various time conversion functions
#define NANOSECONDS(x) ((x) * 1E9 + 0.5)
#define MICROSECONDS(x) ((x) * 1E6 + 0.5)
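
// Example (illustrative): converting a fractional-second tunable into a
// struct timeval. The + 0.5 rounds to the nearest integer once the result
// is truncated on assignment:
//   double secs = 0.25;
//   struct timeval tv = { .tv_sec = (time_t)secs,
//                         .tv_usec = MICROSECONDS(secs - (time_t)secs) };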

// Note: value created from thin air. Could be shorter.
#define MCP_REQUEST_MAXLEN (KEY_MAX_LENGTH * 2)

#define ENDSTR "END\r\n"
#define ENDLEN (sizeof(ENDSTR)-1)

#define MCP_BACKEND_UPVALUE 1

#define MCP_YIELD_INTERNAL 1
#define MCP_YIELD_WAITCOND 2
#define MCP_YIELD_WAITHANDLE 3
#define MCP_YIELD_SLEEP 4

#define SHAREDVM_FGEN_IDX 1
#define SHAREDVM_FGENSLOT_IDX 2
#define SHAREDVM_BACKEND_IDX 3

// all possible commands.
#define CMD_FIELDS \
    X(CMD_MG) \
    X(CMD_MS) \
    X(CMD_MD) \
    X(CMD_MN) \
    X(CMD_MA) \
    X(CMD_ME) \
    X(CMD_GET) \
    X(CMD_GAT) \
    X(CMD_SET) \
    X(CMD_ADD) \
    X(CMD_CAS) \
    X(CMD_GETS) \
    X(CMD_GATS) \
    X(CMD_INCR) \
    X(CMD_DECR) \
    X(CMD_TOUCH) \
    X(CMD_APPEND) \
    X(CMD_DELETE) \
    X(CMD_REPLACE) \
    X(CMD_PREPEND) \
    X(CMD_END_STORAGE) \
    X(CMD_QUIT) \
    X(CMD_STATS) \
    X(CMD_SLABS) \
    X(CMD_WATCH) \
    X(CMD_LRU) \
    X(CMD_VERSION) \
    X(CMD_SHUTDOWN) \
    X(CMD_EXTSTORE) \
    X(CMD_FLUSH_ALL) \
    X(CMD_VERBOSITY) \
    X(CMD_LRU_CRAWLER) \
    X(CMD_REFRESH_CERTS) \
    X(CMD_CACHE_MEMLIMIT)

#define X(name) name,
enum proxy_defines {
    P_OK = 0,
    CMD_FIELDS
    CMD_SIZE, // used to define array size for command hooks.
    CMD_ANY, // override _all_ commands
    CMD_ANY_STORAGE, // override commands specific to key storage.
    CMD_FINAL, // end cap for convenience.
};
#undef X
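
// The X-macro above expands CMD_FIELDS once per definition of X. A minimal
// sketch (not part of this header) of reusing the same list to build a
// parallel name table, indexed by the enum values:
//   #define X(name) #name,
//   static const char *cmd_names[] = { "P_OK", CMD_FIELDS };
//   #undef X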

// certain classes of ascii commands have similar parsing (i.e.
// get/gets/gat/gats). Use types so we don't have to test a ton of them.
enum proxy_cmd_types {
    CMD_TYPE_GENERIC = 0,
    CMD_TYPE_GET, // get/gets/gat/gats
    CMD_TYPE_META, // m*'s.
};

typedef struct _io_pending_proxy_t io_pending_proxy_t;
typedef struct proxy_event_thread_s proxy_event_thread_t;

#ifdef HAVE_LIBURING
// TODO: pass in cqe->res instead of cqe?
typedef void (*proxy_event_cb)(void *udata, struct io_uring_cqe *cqe);
typedef struct {
    void *udata;
    proxy_event_cb cb;
    bool set; // NOTE: not sure if necessary if code structured properly
} proxy_event_t;

void *proxy_event_thread_ur(void *arg);
#endif

// Note: This ends up wasting a few counters, but simplifies the rest of the
// process for handling internal worker stats.
struct proxy_int_stats {
    uint64_t vm_gc_runs;
    uint64_t vm_memory_kb;
    uint64_t counters[CMD_FINAL];
};

struct proxy_user_stats {
    int num_stats; // number of stats, for sizing various arrays
    uint64_t *counters; // array of counters.
};

struct proxy_user_stats_entry {
    char *name;
    unsigned int cname; // offset into compact name buffer
    bool reset; // counter must reset this cycle
};

struct proxy_global_stats {
    uint64_t config_reloads;
    uint64_t config_reload_fails;
    uint64_t config_cron_runs;
    uint64_t config_cron_fails;
    uint64_t backend_total;
    uint64_t backend_marked_bad; // backend set to autofail
    uint64_t backend_failed; // an error caused a backend reset
    uint64_t request_failed_depth; // requests fast-failed due to backend queue depth
};

struct proxy_tunables {
    struct timeval connect;
    struct timeval retry; // wait time before retrying a dead backend
    struct timeval read;
    struct timeval flap; // need to stay connected this long or it's flapping
    float flap_backoff_ramp; // multiplier to ramp up the retry time
    uint32_t flap_backoff_max; // don't backoff longer than this.
    int backend_depth_limit; // requests fast fail once depth over this limit
    int backend_failure_limit;
    int max_ustats; // limit the ustats index.
    bool tcp_keepalive;
    bool use_iothread; // default for using the bg io thread.
    bool use_tls; // whether or not the backend should use TLS
    bool down; // backend is forced into a down/bad state.
};

typedef STAILQ_HEAD(globalobj_head_s, mcp_globalobj_s) globalobj_head_t;
typedef struct {
    lua_State *proxy_state; // main configuration vm
    proxy_event_thread_t *proxy_io_thread;
    uint64_t active_req_limit; // max total in-flight requests
    uint64_t buffer_memory_limit; // max bytes for send/receive buffers.
#ifdef PROXY_TLS
    void *tls_ctx;
#endif
    int user_stats_num; // highest seen stat index
    struct proxy_user_stats_entry *user_stats;
    char *user_stats_namebuf; // compact linear buffer for stat names
    struct proxy_tunables tunables; // NOTE: updates covered by stats_lock
    struct proxy_global_stats global_stats;
    // less frequently used entries down here.
    void *proxy_code;
    lua_State *proxy_sharedvm; // sub VM for short-lock global events/data
    pthread_mutex_t stats_lock; // used for rare global counters
    pthread_mutex_t config_lock;
    pthread_cond_t config_cond;
    pthread_t config_tid;
    pthread_mutex_t worker_lock;
    pthread_cond_t worker_cond;
    pthread_t manager_tid; // deallocation management thread
    pthread_mutex_t manager_lock;
    pthread_cond_t manager_cond;
    pthread_mutex_t sharedvm_lock; // protects proxy_sharedvm above
    globalobj_head_t manager_head; // stack for pool deallocation.
    int config_generation; // counter tracking config reloads
    int cron_ref; // reference to lua cron table
    int cron_next; // next cron to sleep to / execute
    bool worker_done; // signal variable for the worker lock/cond system.
    bool worker_failed; // covered by worker_lock as well.
    bool use_uring; // use IO_URING for backend connections.
    bool loading; // bool indicating an active config load.
    bool memprofile; // indicate if we want to profile lua memory.
    uint8_t memprofile_thread_counter;
} proxy_ctx_t;

#define PROXY_GET_THR_CTX(L) ((*(LIBEVENT_THREAD **)lua_getextraspace(L))->proxy_ctx)
#define PROXY_GET_THR(L) (*(LIBEVENT_THREAD **)lua_getextraspace(L))
// Operations from the config VM don't have a libevent thread.
#define PROXY_GET_CTX(L) (*(proxy_ctx_t **)lua_getextraspace(L))
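
// These rely on Lua's lua_getextraspace(), a per-VM scratch area sized to
// hold a pointer. Worker VMs store their LIBEVENT_THREAD there; the config
// VM stores the proxy_ctx_t directly. Illustrative use from a function
// running on a worker VM:
//   LIBEVENT_THREAD *t = PROXY_GET_THR(L);
//   proxy_ctx_t *ctx = t->proxy_ctx;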

struct proxy_hook_ref {
    int lua_ref;
    void *ctx; // if we're a generator based function.
};

struct proxy_hook_tagged {
    uint64_t tag;
    struct proxy_hook_ref ref;
};

struct proxy_hook {
    struct proxy_hook_ref ref;
    int tagcount;
    struct proxy_hook_tagged *tagged; // array of possible tagged hooks.
};

// TODO (v2): some hash functions (crc?) might require initializers. If we run into
// any the interface might need expanding.
typedef uint64_t (*key_hash_func)(const void *key, size_t len, uint64_t seed);
struct proxy_hash_func {
    key_hash_func func;
};
typedef const char *(*key_hash_filter_func)(const char *conf, const char *key, size_t klen, size_t *newlen);
typedef uint32_t (*hash_selector_func)(uint64_t hash, void *ctx);
struct proxy_hash_caller {
    hash_selector_func selector_func;
    void *ctx;
};
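
// Sketch of a conforming selector (illustrative; struct my_dist and its
// fields are hypothetical): it receives the 64-bit key hash plus the
// opaque ctx registered in proxy_hash_caller and returns a pool index:
//   static uint32_t my_modulo_selector(uint64_t hash, void *ctx) {
//       struct my_dist *d = ctx;
//       return hash % d->pool_size;
//   }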

enum mcp_backend_states {
    mcp_backend_read = 0, // waiting to read any response
    mcp_backend_parse, // have some buffered data to check
    mcp_backend_read_end, // looking for an "END" marker for GET
    mcp_backend_want_read, // read more data to complete command
    mcp_backend_next, // advance to the next IO
    mcp_backend_next_close, // complete current request, then close socket
};

typedef struct mcp_cron_s mcp_cron_t;
typedef struct mcp_backend_wrap_s mcp_backend_wrap_t;
typedef struct mcp_backend_label_s mcp_backend_label_t;
typedef struct mcp_backend_s mcp_backend_t;
typedef struct mcp_request_s mcp_request_t;
typedef struct mcp_parser_s mcp_parser_t;
typedef struct mcp_rcontext_s mcp_rcontext_t;
typedef struct mcp_funcgen_s mcp_funcgen_t;

#define PARSER_MAX_TOKENS 24

struct mcp_parser_meta_s {
    uint64_t flags;
};

// Note that we must use offsets into request for tokens,
// as *request can change between parsing and later accessors.
struct mcp_parser_s {
    const char *request;
    void *vbuf; // temporary buffer for holding value lengths.
    uint8_t command;
    uint8_t cmd_type; // command class.
    uint8_t ntokens;
    uint8_t keytoken; // because GAT. sigh. also cmds without a key.
    uint32_t parsed; // how far into the request we parsed already
    uint32_t reqlen; // full length of request buffer.
    uint32_t endlen; // index to the start of \r\n or \n
    int vlen;
    uint32_t klen; // length of key.
    uint16_t tokens[PARSER_MAX_TOKENS]; // offsets for start of each token
    bool has_space; // a space was found after the last byte parsed.
    bool noreply; // if quiet/noreply mode is set.
    union {
        struct mcp_parser_meta_s meta;
    } t;
};

#define MCP_PARSER_KEY(pr) (&pr.request[pr.tokens[pr.keytoken]])
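
// Because tokens[] holds offsets rather than pointers, the key remains
// addressable even if pr.request is repointed after parsing. Illustrative
// read of the key from a parsed request object:
//   const char *key = MCP_PARSER_KEY(rq->pr);
//   uint32_t klen = rq->pr.klen;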

#define MAX_REQ_TOKENS 2
struct mcp_request_s {
    mcp_parser_t pr; // non-lua-specific parser handling.
    bool ascii_multiget; // ascii multiget mode. (hide errors/END)
    char request[];
};

struct mcp_cron_s {
    uint32_t gen;
    uint32_t next;
    uint32_t every;
    bool repeat;
};

typedef STAILQ_HEAD(io_head_s, _io_pending_proxy_t) io_head_t;
#define MAX_LABELLEN 512
#define MAX_NAMELEN 255
#define MAX_PORTLEN 6
// TODO (v2): IOV_MAX tends to be 1000+ which would allow for more batching but we
// don't have a good temporary space and don't want to malloc/free on every
// write. transmit() uses the stack but we can't do that for uring's use case.
#if MEMCACHED_DEBUG
#define BE_IOV_MAX 128 // let bench tests trigger max condition easily
#elif (IOV_MAX > 1024)
#define BE_IOV_MAX 1024
#else
#define BE_IOV_MAX IOV_MAX
#endif
// lua descriptor object: passed to pools, which create wrappers.
struct mcp_backend_label_s {
    char name[MAX_NAMELEN+1];
    char port[MAX_PORTLEN+1];
    char label[MAX_LABELLEN+1];
    size_t llen; // cache label length for small speedup in pool creation.
    int conncount; // number of sockets to make.
    struct proxy_tunables tunables;
};

// lua object wrapper meant to own a malloc'ed conn structure
// when this object is created, it ships its connection to the real owner
// (worker, IO thread, etc)
// when this object is garbage collected, it ships a notice to the owner
// thread to stop using and free the backend conn memory.
struct mcp_backend_wrap_s {
    mcp_backend_t *be;
};

struct mcp_backendconn_s {
    mcp_backend_t *be_parent; // find the wrapper.
    int self; // our index into the parent array.
    int depth; // total number of requests in queue
    int pending_read; // number of requests written to socket, pending read.
    int failed_count; // number of fails (timeouts) in a row
    int flap_count; // number of times we've "flapped" into bad state.
    proxy_event_thread_t *event_thread; // event thread owning this backend.
    void *client; // mcmc client
#ifdef PROXY_TLS
    void *ssl;
#endif
    io_head_t io_head; // stack of requests.
    io_pending_proxy_t *io_next; // next request to write.
    char *rbuf; // statically allocated read buffer.
    size_t rbufused; // currently active bytes in the buffer
    struct event main_event; // libevent: changes role, mostly for main read events
    struct event write_event; // libevent: only used when socket wbuf full
    struct event timeout_event; // libevent: alarm for pending reads
    struct proxy_tunables tunables;
    struct timeval last_failed; // time the backend was last reset.
    enum mcp_backend_states state; // readback state machine
    int connect_flags; // flags to pass to mcmc_connect
    bool connecting; // in the process of an asynch connection.
    bool validating; // in process of validating a new backend connection.
    bool can_write; // recently got a WANT_WRITE or are connecting.
    bool bad; // timed out, marked as bad.
#ifndef PROXY_TLS
    bool ssl;
#endif
    struct iovec write_iovs[BE_IOV_MAX]; // iovs to stage batched writes
};

// TODO: move depth and flags to a second top level array so we can make index
// decisions from fewer memory stalls.
struct mcp_backend_s {
    int conncount; // total number of connections managed.
    int depth; // temporary depth counter for io_head
    bool transferred; // if beconn has been shipped to owner thread.
    bool use_io_thread; // note if this backend is worker-local or not.
    bool stacked; // if backend already queued for syscalls.
    STAILQ_ENTRY(mcp_backend_s) beconn_next; // stack for connecting conns
    STAILQ_ENTRY(mcp_backend_s) be_next; // stack for backends
    io_head_t io_head; // stack of inbound requests.
    char name[MAX_NAMELEN+1];
    char port[MAX_PORTLEN+1];
    char label[MAX_LABELLEN+1];
    struct proxy_tunables tunables; // this gets copied a few times for speed.
    struct mcp_backendconn_s be[];
};
typedef STAILQ_HEAD(be_head_s, mcp_backend_s) be_head_t;
typedef STAILQ_HEAD(beconn_head_s, mcp_backend_s) beconn_head_t;

struct proxy_event_thread_s {
    pthread_t thread_id;
    struct event_base *base;
    struct event notify_event; // listen event for the notify pipe/eventfd.
    struct event beconn_event; // listener for backends in connect state
#ifdef HAVE_LIBURING
    struct io_uring ring;
    proxy_event_t ur_notify_event; // listen on eventfd.
    proxy_event_t ur_benotify_event; // listen on eventfd for backend connections.
    eventfd_t event_counter;
    eventfd_t beevent_counter;
    bool use_uring;
#endif
#ifdef PROXY_TLS
    char *tls_wbuf;
    size_t tls_wbuf_size;
#endif
    pthread_mutex_t mutex; // covers stack.
    pthread_cond_t cond; // condition to wait on while stack drains.
    io_head_t io_head_in; // inbound requests to process.
    be_head_t be_head; // stack of backends for processing.
    beconn_head_t beconn_head_in; // stack of backends for connection processing.
#ifdef USE_EVENTFD
    int event_fd; // for request ingestion
    int be_event_fd; // for backend ingestion
#else
    int notify_receive_fd;
    int notify_send_fd;
    int be_notify_receive_fd;
    int be_notify_send_fd;
#endif
    proxy_ctx_t *ctx; // main context.
};

enum mcp_resp_mode {
    RESP_MODE_NORMAL = 0,
    RESP_MODE_NOREPLY,
    RESP_MODE_METAQUIET
};

#define RESP_CMD_MAX 8
typedef struct {
    mcmc_resp_t resp;
    mcmc_tokenizer_t tok; // optional tokenization of res
    char *buf; // response line + potentially value.
    mc_resp *cresp; // client mc_resp object during extstore fetches.
    LIBEVENT_THREAD *thread; // cresp's owner thread needed for extstore cleanup.
    unsigned int blen; // total size of the value to read.
    struct timeval start; // time this object was created.
    long elapsed; // time elapsed once handled.
    int status; // status code from mcmc_read()
    int bread; // amount of bytes read into value so far.
    uint8_t cmd; // from parser (pr.command)
    uint8_t extra; // ascii multiget hack for memory accounting. extra blen.
    enum mcp_resp_mode mode; // reply mode (for noreply fixing)
    char be_name[MAX_NAMELEN+1];
    char be_port[MAX_PORTLEN+1];
} mcp_resp_t;

// re-cast an io_pending_t into this more descriptive structure.
// the first few items _must_ match the original struct.
#define IO_PENDING_TYPE_PROXY 0
#define IO_PENDING_TYPE_EXTSTORE 1
struct _io_pending_proxy_t {
    int io_queue_type;
    LIBEVENT_THREAD *thread;
    conn *c;
    mc_resp *resp;
    io_queue_cb return_cb; // called on worker thread.
    io_queue_cb finalize_cb; // called back on the worker thread.
    STAILQ_ENTRY(io_pending_t) iop_next; // queue chain.
    // original struct ends here

    mcp_rcontext_t *rctx; // pointer to request context.
    int queue_handle; // queue slot to return this result to
    bool ascii_multiget; // passed on from mcp_r_t
    uint8_t io_type; // extstore IO or backend IO
    union {
        // extstore IO.
        struct {
            obj_io eio;
            item *hdr_it;
            mc_resp *tresp; // temporary mc_resp for storage to fill.
            int gettype;
            int iovec_data;
            bool miss;
            bool badcrc;
            bool active;
        };
        // backend request IO
        struct {
            // FIXME: use top level next chain
            struct _io_pending_proxy_t *next; // stack for IO submission
            STAILQ_ENTRY(_io_pending_proxy_t) io_next; // stack for backends
            mcp_backend_t *backend; // backend server to request from
            struct iovec iov[2]; // request string + tail buffer
            int iovcnt; // 1 or 2...
            unsigned int iovbytes; // total bytes in the iovec
            mcp_resp_t *client_resp; // reference (currently pointing to a lua object)
            bool flushed; // whether we've fully written this request to a backend.
            bool background; // dummy IO for backgrounded awaits
            bool qcount_incr; // HACK.
        };
    };
};
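
// Since the leading members mirror io_pending_t exactly, queue callbacks
// can recast safely. Illustrative sketch (example_return_cb is a
// hypothetical name):
//   void example_return_cb(io_pending_t *pending) {
//       io_pending_proxy_t *p = (io_pending_proxy_t *)pending;
//       // ... dispatch on p->io_type ...
//   }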

struct mcp_globalobj_s {
    pthread_mutex_t lock; // protects refcount/object.
    STAILQ_ENTRY(mcp_globalobj_s) next;
    int refcount;
    int self_ref;
};

// Note: does *be have to be a sub-struct? how stable are userdata pointers?
// https://stackoverflow.com/questions/38718475/lifetime-of-lua-userdata-pointers
// - says no.
typedef struct {
    int ref; // luaL_ref reference of backend_wrap_t obj.
    mcp_backend_t *be;
} mcp_pool_be_t;

#define KEY_HASH_FILTER_MAX 5
typedef struct mcp_pool_s mcp_pool_t;
struct mcp_pool_s {
    struct proxy_hash_caller phc;
    key_hash_filter_func key_filter;
    key_hash_func key_hasher;
    proxy_ctx_t *ctx; // main context.
    char key_filter_conf[KEY_HASH_FILTER_MAX+1];
    struct mcp_globalobj_s g;
    char beprefix[MAX_LABELLEN+1]; // TODO: should probably be shorter.
    uint64_t hash_seed; // calculated from a string.
    int pool_size;
    int pool_be_total; // can be different from pool size for worker IO
    int phc_ref;
    bool use_iothread;
    mcp_pool_be_t pool[];
};

typedef struct {
    mcp_pool_t *main; // ptr to original
    mcp_pool_be_t *pool; // ptr to main->pool starting offset for owner thread.
} mcp_pool_proxy_t;

// utils
bool proxy_bufmem_checkadd(LIBEVENT_THREAD *t, int len);
void mcp_sharedvm_delta(proxy_ctx_t *ctx, int tidx, const char *name, int delta);
void mcp_sharedvm_remove(proxy_ctx_t *ctx, int tidx, const char *name);

void mcp_gobj_ref(lua_State *L, struct mcp_globalobj_s *g);
void mcp_gobj_unref(proxy_ctx_t *ctx, struct mcp_globalobj_s *g);
void mcp_gobj_finalize(struct mcp_globalobj_s *g);

// networking interface
void proxy_init_event_thread(proxy_event_thread_t *t, proxy_ctx_t *ctx, struct event_base *base);
void *proxy_event_thread(void *arg);
void proxy_run_backend_queue(be_head_t *head);
struct mcp_backendconn_s *proxy_choose_beconn(mcp_backend_t *be);
mcp_resp_t *mcp_prep_resobj(lua_State *L, mcp_request_t *rq, mcp_backend_t *be, LIBEVENT_THREAD *t);
mcp_resp_t *mcp_prep_bare_resobj(lua_State *L, LIBEVENT_THREAD *t);
void mcp_resp_set_elapsed(mcp_resp_t *r);
io_pending_proxy_t *mcp_queue_rctx_io(mcp_rcontext_t *rctx, mcp_request_t *rq, mcp_backend_t *be, mcp_resp_t *r);

// internal request interface
int mcplib_internal(lua_State *L);
int mcplib_internal_run(mcp_rcontext_t *rctx);

// user stats interface
#define MAX_USTATS_DEFAULT 1024
int mcplib_add_stat(lua_State *L);
int mcplib_stat(lua_State *L);
size_t _process_request_next_key(mcp_parser_t *pr);
int process_request(mcp_parser_t *pr, const char *command, size_t cmdlen);
mcp_request_t *mcp_new_request(lua_State *L, mcp_parser_t *pr, const char *command, size_t cmdlen);
void mcp_set_request(mcp_parser_t *pr, mcp_request_t *r, const char *command, size_t cmdlen);

// rate limit interfaces
int mcplib_ratelim_tbf(lua_State *L);
int mcplib_ratelim_tbf_call(lua_State *L);
int mcplib_ratelim_global_tbf(lua_State *L);
int mcplib_ratelim_proxy_tbf_call(lua_State *L);
int mcp_ratelim_proxy_tbf(lua_State *from, lua_State *to);
int mcplib_ratelim_global_tbf_gc(lua_State *L);
int mcplib_ratelim_proxy_tbf_gc(lua_State *L);

// request function generator interface
void proxy_return_rctx_cb(io_pending_t *pending);
void proxy_finalize_rctx_cb(io_pending_t *pending);

enum mcp_rqueue_e {
    QWAIT_IDLE = 0,
    QWAIT_ANY,
    QWAIT_OK,
    QWAIT_GOOD,
    QWAIT_FASTGOOD,
    QWAIT_HANDLE,
    QWAIT_SLEEP,
};

#define FGEN_NAME_MAXLEN 80
struct mcp_funcgen_s {
    LIBEVENT_THREAD *thread; // worker thread that created this funcgen.
    int generator_ref; // reference to the generator function.
    int self_ref; // self-reference if we're attached anywhere
    int argument_ref; // reference to an argument to pass to generator
    int max_queues; // how many queue slots rctx's have
    unsigned int refcount; // reference counter
    unsigned int total; // total contexts managed
    unsigned int free; // free contexts
    unsigned int free_max; // size of list below.
    unsigned int free_pressure; // "pressure" for when to early release rctx
    bool closed; // the hook holding this fgen has been replaced
    bool ready; // if we're locked down or not.
    bool is_router; // if this fgen is actually a router object.
    struct timespec free_waiter; // must be "too free" for this much time
    mcp_rcontext_t **list;
    struct mcp_rqueue_s *queue_list;
    char name[FGEN_NAME_MAXLEN+1]; // string name for the generator.
};

enum mcp_funcgen_router_e {
    FGEN_ROUTER_NONE = 0,
    FGEN_ROUTER_CMDMAP,
    FGEN_ROUTER_SHORTSEP,
    FGEN_ROUTER_LONGSEP,
    FGEN_ROUTER_ANCHORSM,
    FGEN_ROUTER_ANCHORBIG,
};

struct mcp_router_long_s {
    char start[KEY_HASH_FILTER_MAX+1];
    char stop[KEY_HASH_FILTER_MAX+1];
};

// To simplify the attach/start code we wrap a funcgen with the router
// structure. This allows us to have a larger router structure without
// bloating the fgen object itself, and still benefit from letting funcgen
// new/cleanup handle most of the memory management.
struct mcp_funcgen_router {
    mcp_funcgen_t fgen_self;
    enum mcp_funcgen_router_e type;
    union {
        char sep;
        char lsep[KEY_HASH_FILTER_MAX+1];
        char anchorsm[2]; // short anchored mode.
        struct mcp_router_long_s big;
    } conf;
    int map_ref;
    mcp_funcgen_t *def_fgen; // default route
    mcp_funcgen_t *cmap[CMD_END_STORAGE]; // fallback command map
};

#define RQUEUE_TYPE_NONE 0
#define RQUEUE_TYPE_POOL 1
#define RQUEUE_TYPE_FGEN 2
#define RQUEUE_ASSIGNED (1<<0)
#define RQUEUE_R_RESUME (1<<1)
#define RQUEUE_R_GOOD (1<<3)
#define RQUEUE_R_OK (1<<4)
#define RQUEUE_R_ANY (1<<5)
#define RQUEUE_R_ERROR (1<<7)

enum mcp_rqueue_state {
    RQUEUE_IDLE = 0,
    RQUEUE_QUEUED,
    RQUEUE_ACTIVE,
    RQUEUE_COMPLETE,
    RQUEUE_WAITED
};

struct mcp_rqueue_s {
    int obj_ref; // reference to pool/func/etc object
    int cb_ref; // if a lua callback was specified
    int req_ref; // reference to associated request object.
    int res_ref; // reference to lua response object.
    void *obj; // direct pointer to the object for fast access.
    mcp_request_t *rq; // request set to this slot
    mcp_resp_t *res_obj; // pointer to result object
    enum mcp_rqueue_state state; // queued/active/etc
    uint8_t obj_type; // what the obj_ref actually is.
    uint8_t flags; // bit flags for various states
};

struct mcp_rcontext_s {
    int self_ref; // reference to our own object
    int request_ref; // top level request for this context.
    int function_ref; // ref to the created route function.
    int coroutine_ref; // ref to our encompassing coroutine.
    unsigned int async_pending; // legacy async handling
    int pending_reqs; // pending requests and sub-requests
    unsigned int wait_count;
    unsigned int wait_done; // TODO: change these variables to uint8's
    int wait_handle; // waiting on a specific queue slot
    int parent_handle; // queue slot in parent rctx
    int conn_fd; // fd of the originating client, as *c can become invalid
    enum mcp_rqueue_e wait_mode;
    uint8_t lua_narg; // number of responses to push when yield resuming.
    bool first_queue; // HACK
    lua_State *Lc; // coroutine thread pointer.
    mcp_request_t *request; // ptr to the above reference.
    mcp_rcontext_t *parent; // parent rctx in the call graph
    conn *c; // associated client object.
    mc_resp *resp; // top level response object to fill.
    mcp_funcgen_t *fgen; // parent function generator context.
    struct event timeout_event; // for *_wait_timeout() and sleep() calls
    struct mcp_rqueue_s qslots[]; // queueable slots.
};
#define mcp_is_flag_invalid(f) ((f) < 65 || (f) > 122)
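
// 65..122 spans ASCII 'A' through 'z', the range request flags are drawn
// from (note it also admits the few punctuation characters between 'Z'
// and 'a'). Illustrative check:
//   if (mcp_is_flag_invalid(flag)) { /* reject: not in 'A'..'z' */ }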

void mcp_run_rcontext_handle(mcp_rcontext_t *rctx, int handle);
void mcp_process_rctx_wait(mcp_rcontext_t *rctx, int handle);
int mcp_process_rqueue_return(mcp_rcontext_t *rctx, int handle, mcp_resp_t *res);
int mcplib_rcontext_handle_set_cb(lua_State *L);
int mcplib_rcontext_enqueue(lua_State *L);
int mcplib_rcontext_wait_cond(lua_State *L);
int mcplib_rcontext_wait_handle(lua_State *L);
int mcplib_rcontext_enqueue_and_wait(lua_State *L);
int mcplib_rcontext_res_good(lua_State *L);
int mcplib_rcontext_res_any(lua_State *L);
int mcplib_rcontext_res_ok(lua_State *L);
int mcplib_rcontext_result(lua_State *L);
int mcplib_rcontext_cfd(lua_State *L);
int mcplib_rcontext_tls_peer_cn(lua_State *L);
int mcplib_rcontext_request_new(lua_State *L);
int mcplib_rcontext_response_new(lua_State *L);
int mcplib_rcontext_sleep(lua_State *L);
int mcplib_funcgenbare_new(lua_State *L);
int mcplib_funcgen_new(lua_State *L);
int mcplib_funcgen_new_handle(lua_State *L);
int mcplib_funcgen_ready(lua_State *L);
int mcplib_router_new(lua_State *L);
mcp_rcontext_t *mcp_funcgen_start(lua_State *L, mcp_funcgen_t *fgen, mcp_parser_t *pr);
mcp_rcontext_t *mcp_funcgen_get_rctx(lua_State *L, int fgen_ref, mcp_funcgen_t *fgen);
void mcp_funcgen_return_rctx(mcp_rcontext_t *rctx);
int mcplib_funcgen_gc(lua_State *L);
void mcp_funcgen_reference(lua_State *L);
void mcp_funcgen_dereference(lua_State *L, mcp_funcgen_t *fgen);
void mcp_rcontext_push_rqu_res(lua_State *L, mcp_rcontext_t *rctx, int handle);


int mcplib_factory_command_new(lua_State *L);

// request interface
int mcplib_request(lua_State *L);
int mcplib_request_command(lua_State *L);
int mcplib_request_key(lua_State *L);
int mcplib_request_ltrimkey(lua_State *L);
int mcplib_request_rtrimkey(lua_State *L);
int mcplib_request_token(lua_State *L);
int mcplib_request_token_int(lua_State *L);
int mcplib_request_ntokens(lua_State *L);
int mcplib_request_has_flag(lua_State *L);
int mcplib_request_flag_token(lua_State *L);
int mcplib_request_flag_token_int(lua_State *L);
int mcplib_request_flag_add(lua_State *L);
int mcplib_request_flag_set(lua_State *L);
int mcplib_request_flag_replace(lua_State *L);
int mcplib_request_flag_del(lua_State *L);
int mcplib_request_gc(lua_State *L);
int mcplib_request_match_res(lua_State *L);
void mcp_request_cleanup(LIBEVENT_THREAD *t, mcp_request_t *rq);

// response interface
int mcplib_response_elapsed(lua_State *L);
int mcplib_response_ok(lua_State *L);
int mcplib_response_hit(lua_State *L);
int mcplib_response_vlen(lua_State *L);
int mcplib_response_code(lua_State *L);
int mcplib_response_line(lua_State *L);
int mcplib_response_flag_blank(lua_State *L);

// inspector interface
int mcplib_req_inspector_new(lua_State *L);
int mcplib_res_inspector_new(lua_State *L);
int mcplib_inspector_gc(lua_State *L);
int mcplib_inspector_call(lua_State *L);

// mutator interface
int mcplib_req_mutator_new(lua_State *L);
int mcplib_res_mutator_new(lua_State *L);
int mcplib_mutator_gc(lua_State *L);
int mcplib_mutator_call(lua_State *L);

void mcp_response_cleanup(LIBEVENT_THREAD *t, mcp_resp_t *r);
void mcp_set_resobj(mcp_resp_t *r, mcp_request_t *rq, mcp_backend_t *be, LIBEVENT_THREAD *t);
int mcplib_response_gc(lua_State *L);
int mcplib_response_close(lua_State *L);

int mcplib_open_dist_jump_hash(lua_State *L);
int mcplib_open_dist_ring_hash(lua_State *L);

int proxy_run_rcontext(mcp_rcontext_t *rctx);
mcp_backend_t *mcplib_pool_proxy_call_helper(mcp_pool_proxy_t *pp, const char *key, size_t len);
void mcp_request_attach(mcp_request_t *rq, io_pending_proxy_t *p);
int mcp_request_render(mcp_request_t *rq, int idx, char flag, const char *tok, size_t len);
int mcp_request_append(mcp_request_t *rq, const char flag, const char *tok, size_t len);
int mcp_request_find_flag_index(mcp_request_t *rq, const char flag);
int mcp_request_find_flag_token(mcp_request_t *rq, const char flag, const char **token, size_t *len);
int mcp_request_find_flag_tokenint64(mcp_request_t *rq, const char flag, int64_t *token);
void proxy_lua_error(lua_State *L, const char *s);
#define proxy_lua_ferror(L, fmt, ...) \
    do { \
        lua_pushfstring(L, fmt, __VA_ARGS__); \
        lua_error(L); \
    } while (0)
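
// Illustrative use (format arguments follow lua_pushfstring's rules, a
// restricted subset of printf):
//   proxy_lua_ferror(L, "invalid argument to pool: %s", name);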

#define PROXY_SERVER_ERROR "SERVER_ERROR "
#define PROXY_CLIENT_ERROR "CLIENT_ERROR "
void proxy_out_errstring(mc_resp *resp, char *type, const char *str);
int _start_proxy_config_threads(proxy_ctx_t *ctx);
int proxy_thread_loadconf(proxy_ctx_t *ctx, LIBEVENT_THREAD *thr);

// TODO (v2): more .h files, perhaps?
int mcplib_open_hash_xxhash(lua_State *L);

__attribute__((unused)) void dump_stack(lua_State *L, const char *msg);
__attribute__((unused)) void dump_registry(lua_State *L, const char *msg);
__attribute__((unused)) void dump_funcgen(lua_State *L, const char *name, const char *msg);
__attribute__((unused)) void dump_pools(lua_State *L, const char *msg);
#endif