#ifndef PROXY_H
#define PROXY_H

#include "memcached.h"
#include "extstore.h"
#include <string.h>
#include <stdlib.h>
#include <ctype.h>
#include <errno.h>

#include <lua.h>
#include <lualib.h>
#include <lauxlib.h>

#include "config.h"

#if defined(__linux__)
#define USE_EVENTFD 1
#include <sys/eventfd.h>
#endif

#ifdef HAVE_LIBURING
#include <liburing.h>
#include <poll.h> // POLLOUT for liburing.
#define PRING_QUEUE_SQ_ENTRIES 2048
#define PRING_QUEUE_CQ_ENTRIES 16384
#endif

#include "proto_proxy.h"
#include "proto_text.h"
#include "queue.h"
#define XXH_INLINE_ALL // modifier for xxh3's include below
#include "xxhash.h"

#ifdef PROXY_DEBUG
#define P_DEBUG(...) \
    do { \
        fprintf(stderr, __VA_ARGS__); \
    } while (0)
#else
#define P_DEBUG(...)
#endif

#define WSTAT_L(t) pthread_mutex_lock(&t->stats.mutex);
#define WSTAT_UL(t) pthread_mutex_unlock(&t->stats.mutex);
#define WSTAT_INCR(t, stat, amount) { \
    pthread_mutex_lock(&t->stats.mutex); \
    t->stats.stat += amount; \
    pthread_mutex_unlock(&t->stats.mutex); \
}
#define WSTAT_DECR(t, stat, amount) { \
    pthread_mutex_lock(&t->stats.mutex); \
    t->stats.stat -= amount; \
    pthread_mutex_unlock(&t->stats.mutex); \
}
#define STAT_L(ctx) pthread_mutex_lock(&ctx->stats_lock);
#define STAT_UL(ctx) pthread_mutex_unlock(&ctx->stats_lock);
#define STAT_INCR(ctx, stat, amount) { \
    pthread_mutex_lock(&ctx->stats_lock); \
    ctx->global_stats.stat += amount; \
    pthread_mutex_unlock(&ctx->stats_lock); \
}

#define STAT_DECR(ctx, stat, amount) { \
    pthread_mutex_lock(&ctx->stats_lock); \
    ctx->global_stats.stat -= amount; \
    pthread_mutex_unlock(&ctx->stats_lock); \
}

// FIXME (v2): do include dir properly.
#include "vendor/mcmc/mcmc.h"

enum mcp_memprofile_types {
    mcp_memp_free = 0,
    mcp_memp_string,
    mcp_memp_table,
    mcp_memp_func,
    mcp_memp_userdata,
    mcp_memp_thread,
    mcp_memp_default,
    mcp_memp_realloc,
};

struct mcp_memprofile {
    struct timespec last_status; // for per-second prints on status
    int id;
    uint64_t allocs[8];
    uint64_t alloc_bytes[8];
};

// for various time conversion functions
#define NANOSECONDS(x) ((x) * 1E9 + 0.5)
#define MICROSECONDS(x) ((x) * 1E6 + 0.5)

// Note: value created from thin air. Could be shorter.
#define MCP_REQUEST_MAXLEN (KEY_MAX_LENGTH * 2)

#define ENDSTR "END\r\n"
#define ENDLEN (sizeof(ENDSTR)-1)

#define MCP_BACKEND_UPVALUE 1

#define MCP_YIELD_INTERNAL 1
#define MCP_YIELD_WAITCOND 2
#define MCP_YIELD_WAITHANDLE 3
#define MCP_YIELD_SLEEP 4

#define SHAREDVM_FGEN_IDX 1
#define SHAREDVM_FGENSLOT_IDX 2
#define SHAREDVM_BACKEND_IDX 3
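
/* Usage sketch for the stat macros above. The WSTAT_* macros take a
 * LIBEVENT_THREAD pointer and guard that worker's stats with its own mutex;
 * the STAT_* macros take a proxy_ctx_t pointer and guard the rarely-touched
 * global counters. "some_counter" below is an illustrative member name, not
 * part of this header; config_reloads is from struct proxy_global_stats:
 *
 *   WSTAT_INCR(t, some_counter, 1);     // t is a LIBEVENT_THREAD *
 *   STAT_INCR(ctx, config_reloads, 1);  // ctx is a proxy_ctx_t *
 */
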
// all possible commands.
#define CMD_FIELDS \
    X(CMD_MG) \
    X(CMD_MS) \
    X(CMD_MD) \
    X(CMD_MN) \
    X(CMD_MA) \
    X(CMD_ME) \
    X(CMD_GET) \
    X(CMD_GAT) \
    X(CMD_SET) \
    X(CMD_ADD) \
    X(CMD_CAS) \
    X(CMD_GETS) \
    X(CMD_GATS) \
    X(CMD_INCR) \
    X(CMD_DECR) \
    X(CMD_TOUCH) \
    X(CMD_APPEND) \
    X(CMD_DELETE) \
    X(CMD_REPLACE) \
    X(CMD_PREPEND) \
    X(CMD_END_STORAGE) \
    X(CMD_QUIT) \
    X(CMD_STATS) \
    X(CMD_SLABS) \
    X(CMD_WATCH) \
    X(CMD_LRU) \
    X(CMD_VERSION) \
    X(CMD_SHUTDOWN) \
    X(CMD_EXTSTORE) \
    X(CMD_FLUSH_ALL) \
    X(CMD_VERBOSITY) \
    X(CMD_LRU_CRAWLER) \
    X(CMD_REFRESH_CERTS) \
    X(CMD_CACHE_MEMLIMIT)

#define X(name) name,
enum proxy_defines {
    P_OK = 0,
    CMD_FIELDS
    CMD_SIZE, // used to define array size for command hooks.
    CMD_ANY, // override _all_ commands
    CMD_ANY_STORAGE, // override commands specific to key storage.
    CMD_FINAL, // end cap for convenience.
};
#undef X

// certain classes of ascii commands have similar parsing (i.e.
// get/gets/gat/gats). Use types so we don't have to test a ton of them.
enum proxy_cmd_types {
    CMD_TYPE_GENERIC = 0,
    CMD_TYPE_GET, // get/gets/gat/gats
    CMD_TYPE_META, // m*'s.
};

typedef struct _io_pending_proxy_t io_pending_proxy_t;
typedef struct proxy_event_thread_s proxy_event_thread_t;

#ifdef HAVE_LIBURING
// TODO: pass in cqe->res instead of cqe?
typedef void (*proxy_event_cb)(void *udata, struct io_uring_cqe *cqe);
typedef struct {
    void *udata;
    proxy_event_cb cb;
    bool set; // NOTE: not sure if necessary if code structured properly
} proxy_event_t;

void *proxy_event_thread_ur(void *arg);
#endif

// Note: This ends up wasting a few counters, but simplifies the rest of the
// process for handling internal worker stats.
struct proxy_int_stats {
    uint64_t vm_gc_runs;
    uint64_t vm_memory_kb;
    uint64_t counters[CMD_FINAL];
};

struct proxy_user_stats {
    int num_stats; // number of stats, for sizing various arrays
    uint64_t *counters; // array of counters.
};

struct proxy_user_stats_entry {
    char *name;
    unsigned int cname; // offset into compact name buffer
    bool reset; // counter must reset this cycle
};

struct proxy_global_stats {
    uint64_t config_reloads;
    uint64_t config_reload_fails;
    uint64_t config_cron_runs;
    uint64_t config_cron_fails;
    uint64_t backend_total;
    uint64_t backend_marked_bad; // backend set to autofail
    uint64_t backend_failed; // an error caused a backend reset
    uint64_t request_failed_depth; // requests fast-failed due to backend queue depth
};

struct proxy_tunables {
    struct timeval connect;
    struct timeval retry; // wait time before retrying a dead backend
    struct timeval read;
    struct timeval flap; // need to stay connected this long or it's flapping
    float flap_backoff_ramp; // multiplier applied to the retry backoff
    uint32_t flap_backoff_max; // don't back off longer than this.
    int backend_depth_limit; // requests fast-fail once depth is over this limit
    int backend_failure_limit;
    int max_ustats; // limit the ustats index.
    bool tcp_keepalive;
    bool use_iothread; // default for using the bg io thread.
    bool use_tls; // whether or not the backend should use TLS
    bool down; // backend is forced into a down/bad state.
};
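
/* Illustrative expansion of the CMD_FIELDS X-macro above: with
 * "#define X(name) name," the body of enum proxy_defines becomes
 *
 *   P_OK = 0,
 *   CMD_MG,
 *   CMD_MS,
 *   ...
 *   CMD_CACHE_MEMLIMIT,
 *   CMD_SIZE,
 *
 * so CMD_SIZE automatically tracks the number of commands. Redefining X()
 * lets the same list generate other tables (e.g. command name strings).
 */
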
typedef STAILQ_HEAD(globalobj_head_s, mcp_globalobj_s) globalobj_head_t;
typedef struct {
    lua_State *proxy_state; // main configuration vm
    proxy_event_thread_t *proxy_io_thread;
    uint64_t active_req_limit; // max total in-flight requests
    uint64_t buffer_memory_limit; // max bytes for send/receive buffers.
#ifdef PROXY_TLS
    void *tls_ctx;
#endif
    int user_stats_num; // highest seen stat index
    struct proxy_user_stats_entry *user_stats;
    char *user_stats_namebuf; // compact linear buffer for stat names
    struct proxy_tunables tunables; // NOTE: updates covered by stats_lock
    struct proxy_global_stats global_stats;
    // less frequently used entries down here.
    void *proxy_code;
    lua_State *proxy_sharedvm; // sub VM for short-lock global events/data
    pthread_mutex_t stats_lock; // used for rare global counters
    pthread_mutex_t config_lock;
    pthread_cond_t config_cond;
    pthread_t config_tid;
    pthread_mutex_t worker_lock;
    pthread_cond_t worker_cond;
    pthread_t manager_tid; // deallocation management thread
    pthread_mutex_t manager_lock;
    pthread_cond_t manager_cond;
    pthread_mutex_t sharedvm_lock; // protects proxy_sharedvm above
    globalobj_head_t manager_head; // stack for pool deallocation.
    int config_generation; // counter tracking config reloads
    int cron_ref; // reference to lua cron table
    int cron_next; // next cron to sleep to / execute
    bool worker_done; // signal variable for the worker lock/cond system.
    bool worker_failed; // covered by worker_lock as well.
    bool use_uring; // use IO_URING for backend connections.
    bool loading; // true while a config load is active.
    bool memprofile; // whether to profile lua memory.
    uint8_t memprofile_thread_counter;
} proxy_ctx_t;

#define PROXY_GET_THR_CTX(L) ((*(LIBEVENT_THREAD **)lua_getextraspace(L))->proxy_ctx)
#define PROXY_GET_THR(L) (*(LIBEVENT_THREAD **)lua_getextraspace(L))
// Operations from the config VM don't have a libevent thread.
#define PROXY_GET_CTX(L) (*(proxy_ctx_t **)lua_getextraspace(L))

struct proxy_hook_ref {
    int lua_ref;
    void *ctx; // if we're a generator based function.
};

struct proxy_hook_tagged {
    uint64_t tag;
    struct proxy_hook_ref ref;
};

struct proxy_hook {
    struct proxy_hook_ref ref;
    int tagcount;
    struct proxy_hook_tagged *tagged; // array of possible tagged hooks.
};
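
/* Sketch of the setup assumed by the PROXY_GET_* accessors above: whoever
 * creates a VM stores the owning thread (or, for the config VM, the context)
 * in the Lua extra space, so any C function can recover it from its
 * lua_State alone:
 *
 *   *(LIBEVENT_THREAD **)lua_getextraspace(L) = thr; // at VM setup
 *   // ... later, inside any lua_CFunction running on that VM:
 *   LIBEVENT_THREAD *t = PROXY_GET_THR(L);
 */
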
// TODO (v2): some hash functions (crc?) might require initializers. If we run into
// any the interface might need expanding.
typedef uint64_t (*key_hash_func)(const void *key, size_t len, uint64_t seed);
struct proxy_hash_func {
    key_hash_func func;
};
typedef const char *(*key_hash_filter_func)(const char *conf, const char *key, size_t klen, size_t *newlen);
typedef uint32_t (*hash_selector_func)(uint64_t hash, void *ctx);
struct proxy_hash_caller {
    hash_selector_func selector_func;
    void *ctx;
};

enum mcp_backend_states {
    mcp_backend_read = 0, // waiting to read any response
    mcp_backend_parse, // have some buffered data to check
    mcp_backend_read_end, // looking for an "END" marker for GET
    mcp_backend_want_read, // read more data to complete command
    mcp_backend_next, // advance to the next IO
    mcp_backend_next_close, // complete current request, then close socket
};

typedef struct mcp_cron_s mcp_cron_t;
typedef struct mcp_backend_wrap_s mcp_backend_wrap_t;
typedef struct mcp_backend_label_s mcp_backend_label_t;
typedef struct mcp_backend_s mcp_backend_t;
typedef struct mcp_request_s mcp_request_t;
typedef struct mcp_parser_s mcp_parser_t;
typedef struct mcp_rcontext_s mcp_rcontext_t;
typedef struct mcp_funcgen_s mcp_funcgen_t;

#define PARSER_MAX_TOKENS 24

struct mcp_parser_meta_s {
    uint64_t flags;
};

// Note that we must use offsets into request for tokens,
// as *request can change between parsing and later accessors.
struct mcp_parser_s {
    const char *request;
    void *vbuf; // temporary buffer for holding value lengths.
    uint8_t command;
    uint8_t cmd_type; // command class.
    uint8_t ntokens;
    uint8_t keytoken; // because GAT. sigh. also cmds without a key.
    uint32_t parsed; // how far into the request we parsed already
    uint32_t reqlen; // full length of request buffer.
    uint32_t endlen; // index to the start of \r\n or \n
    int vlen;
    uint32_t klen; // length of key.
    uint16_t tokens[PARSER_MAX_TOKENS]; // offsets for start of each token
    bool has_space; // a space was found after the last byte parsed.
    bool noreply; // if quiet/noreply mode is set.
    union {
        struct mcp_parser_meta_s meta;
    } t;
};

#define MCP_PARSER_KEY(pr) (&pr.request[pr.tokens[pr.keytoken]])

#define MAX_REQ_TOKENS 2
struct mcp_request_s {
    mcp_parser_t pr; // non-lua-specific parser handling.
    bool ascii_multiget; // ascii multiget mode. (hide errors/END)
    char request[];
};

struct mcp_cron_s {
    uint32_t gen;
    uint32_t next;
    uint32_t every;
    bool repeat;
};

typedef STAILQ_HEAD(io_head_s, _io_pending_proxy_t) io_head_t;
#define MAX_LABELLEN 512
#define MAX_NAMELEN 255
#define MAX_PORTLEN 6
// TODO (v2): IOV_MAX tends to be 1000+ which would allow for more batching but we
// don't have a good temporary space and don't want to malloc/free on every
// write. transmit() uses the stack but we can't do that for uring's use case.
#if MEMCACHED_DEBUG
#define BE_IOV_MAX 128 // let bench tests trigger max condition easily
#elif (IOV_MAX > 1024)
#define BE_IOV_MAX 1024
#else
#define BE_IOV_MAX IOV_MAX
#endif
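
/* Illustrative use of the parser above (assuming process_request() returns
 * P_OK on success; "cmdbuf"/"cmdlen" are stand-ins for the client buffer):
 *
 *   mcp_parser_t pr = {0};
 *   if (process_request(&pr, cmdbuf, cmdlen) == P_OK) {
 *       const char *key = MCP_PARSER_KEY(pr); // offset resolved at access time
 *       uint32_t klen = pr.klen;
 *   }
 *
 * Token offsets rather than raw pointers are stored because pr.request may
 * move between parse time and access time.
 */
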
// lua descriptor object: passed to pools, which create wrappers.
struct mcp_backend_label_s {
    char name[MAX_NAMELEN+1];
    char port[MAX_PORTLEN+1];
    char label[MAX_LABELLEN+1];
    size_t llen; // cache label length for small speedup in pool creation.
    int conncount; // number of sockets to make.
    struct proxy_tunables tunables;
};

// lua object wrapper meant to own a malloc'ed conn structure.
// when this object is created, it ships its connection to the real owner
// (worker, IO thread, etc).
// when this object is garbage collected, it ships a notice to the owner
// thread to stop using and free the backend conn memory.
struct mcp_backend_wrap_s {
    mcp_backend_t *be;
};

struct mcp_backendconn_s {
    mcp_backend_t *be_parent; // find the wrapper.
    int self; // our index into the parent array.
    int depth; // total number of requests in queue
    int pending_read; // number of requests written to socket, pending read.
    int failed_count; // number of fails (timeouts) in a row
    int flap_count; // number of times we've "flapped" into a bad state.
    proxy_event_thread_t *event_thread; // event thread owning this backend.
    void *client; // mcmc client
#ifdef PROXY_TLS
    void *ssl;
#endif
    io_head_t io_head; // stack of requests.
    io_pending_proxy_t *io_next; // next request to write.
    char *rbuf; // statically allocated read buffer.
    size_t rbufused; // currently active bytes in the buffer
    struct event main_event; // libevent: changes role, mostly for main read events
    struct event write_event; // libevent: only used when socket wbuf full
    struct event timeout_event; // libevent: alarm for pending reads
    struct proxy_tunables tunables;
    struct timeval last_failed; // time the backend was last reset.
    enum mcp_backend_states state; // readback state machine
    int connect_flags; // flags to pass to mcmc_connect
    bool connecting; // in the process of an async connection.
    bool validating; // in the process of validating a new backend connection.
    bool can_write; // recently got a WANT_WRITE or are connecting.
    bool bad; // timed out, marked as bad.
#ifndef PROXY_TLS
    bool ssl;
#endif
    struct iovec write_iovs[BE_IOV_MAX]; // iovs to stage batched writes
};

// TODO: move depth and flags to a second top level array so we can make index
// decisions from fewer memory stalls.
struct mcp_backend_s {
    int conncount; // total number of connections managed.
    int depth; // temporary depth counter for io_head
    bool transferred; // if beconn has been shipped to owner thread.
    bool use_io_thread; // note if this backend is worker-local or not.
    bool stacked; // if backend already queued for syscalls.
    STAILQ_ENTRY(mcp_backend_s) beconn_next; // stack for connecting conns
    STAILQ_ENTRY(mcp_backend_s) be_next; // stack for backends
    io_head_t io_head; // stack of inbound requests.
    char name[MAX_NAMELEN+1];
    char port[MAX_PORTLEN+1];
    char label[MAX_LABELLEN+1];
    struct proxy_tunables tunables; // this gets copied a few times for speed.
    struct mcp_backendconn_s be[];
};
typedef STAILQ_HEAD(be_head_s, mcp_backend_s) be_head_t;
typedef STAILQ_HEAD(beconn_head_s, mcp_backend_s) beconn_head_t;
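
/* Allocation sketch (not the actual constructor): mcp_backend_s ends in a
 * flexible array member, so a backend and its conncount connections can live
 * in a single allocation:
 *
 *   mcp_backend_t *be = calloc(1, sizeof(*be)
 *           + conncount * sizeof(struct mcp_backendconn_s));
 *   be->conncount = conncount;
 *   for (int i = 0; i < conncount; i++) {
 *       be->be[i].be_parent = be; // back-pointer: "find the wrapper"
 *       be->be[i].self = i;       // index into the parent array
 *   }
 */
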
struct proxy_event_thread_s {
    pthread_t thread_id;
    struct event_base *base;
    struct event notify_event; // listen event for the notify pipe/eventfd.
    struct event beconn_event; // listener for backends in connect state
#ifdef HAVE_LIBURING
    struct io_uring ring;
    proxy_event_t ur_notify_event; // listen on eventfd.
    proxy_event_t ur_benotify_event; // listen on eventfd for backend connections.
    eventfd_t event_counter;
    eventfd_t beevent_counter;
    bool use_uring;
#endif
#ifdef PROXY_TLS
    char *tls_wbuf;
    size_t tls_wbuf_size;
#endif
    pthread_mutex_t mutex; // covers stack.
    pthread_cond_t cond; // condition to wait on while stack drains.
    io_head_t io_head_in; // inbound requests to process.
    be_head_t be_head; // stack of backends for processing.
    beconn_head_t beconn_head_in; // stack of backends for connection processing.
#ifdef USE_EVENTFD
    int event_fd; // for request ingestion
    int be_event_fd; // for backend ingestion
#else
    int notify_receive_fd;
    int notify_send_fd;
    int be_notify_receive_fd;
    int be_notify_send_fd;
#endif
    proxy_ctx_t *ctx; // main context.
};

enum mcp_resp_mode {
    RESP_MODE_NORMAL = 0,
    RESP_MODE_NOREPLY,
    RESP_MODE_METAQUIET
};

#define RESP_CMD_MAX 8
typedef struct {
    mcmc_resp_t resp;
    mcmc_tokenizer_t tok; // optional tokenization of res
    char *buf; // response line + potentially value.
    mc_resp *cresp; // client mc_resp object during extstore fetches.
    LIBEVENT_THREAD *thread; // cresp's owner thread, needed for extstore cleanup.
    unsigned int blen; // total size of the value to read.
    struct timeval start; // time this object was created.
    long elapsed; // time elapsed once handled.
    int status; // status code from mcmc_read()
    int bread; // number of bytes read into the value so far.
    uint8_t cmd; // from parser (pr.command)
    uint8_t extra; // ascii multiget hack for memory accounting. extra blen.
    enum mcp_resp_mode mode; // reply mode (for noreply fixing)
    char be_name[MAX_NAMELEN+1];
    char be_port[MAX_PORTLEN+1];
} mcp_resp_t;

// re-cast an io_pending_t into this more descriptive structure.
// the first few items _must_ match the original struct.
#define IO_PENDING_TYPE_PROXY 0
#define IO_PENDING_TYPE_EXTSTORE 1
struct _io_pending_proxy_t {
    int io_queue_type;
    LIBEVENT_THREAD *thread;
    conn *c;
    mc_resp *resp;
    io_queue_cb return_cb; // called on worker thread.
    io_queue_cb finalize_cb; // called back on the worker thread.
    STAILQ_ENTRY(io_pending_t) iop_next; // queue chain.
    // original struct ends here

    mcp_rcontext_t *rctx; // pointer to request context.
    int queue_handle; // queue slot to return this result to
    bool ascii_multiget; // passed on from mcp_r_t
    uint8_t io_type; // extstore IO or backend IO
    union {
        // extstore IO.
        struct {
            obj_io eio;
            item *hdr_it;
            mc_resp *tresp; // temporary mc_resp for storage to fill.
            int gettype;
            int iovec_data;
            bool miss;
            bool badcrc;
            bool active;
        };
        // backend request IO
        struct {
            // FIXME: use top level next chain
            struct _io_pending_proxy_t *next; // stack for IO submission
            STAILQ_ENTRY(_io_pending_proxy_t) io_next; // stack for backends
            mcp_backend_t *backend; // backend server to request from
            struct iovec iov[2]; // request string + tail buffer
            int iovcnt; // 1 or 2...
            unsigned int iovbytes; // total bytes in the iovec
            mcp_resp_t *client_resp; // reference (currently pointing to a lua object)
            bool flushed; // whether we've fully written this request to a backend.
            bool background; // dummy IO for backgrounded awaits
            bool qcount_incr; // HACK.
        };
    };
};
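
/* Why the leading members of _io_pending_proxy_t must match io_pending_t:
 * the generic IO queue hands callbacks an io_pending_t, which proxy code
 * immediately re-casts. An illustrative callback body:
 *
 *   void proxy_return_rctx_cb(io_pending_t *pending) {
 *       io_pending_proxy_t *p = (io_pending_proxy_t *)pending;
 *       // p->rctx, p->queue_handle, etc. are now accessible.
 *   }
 */
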
struct mcp_globalobj_s {
    pthread_mutex_t lock; // protects refcount/object.
    STAILQ_ENTRY(mcp_globalobj_s) next;
    int refcount;
    int self_ref;
};

// Note: does *be have to be a sub-struct? how stable are userdata pointers?
// https://stackoverflow.com/questions/38718475/lifetime-of-lua-userdata-pointers
// - says no.
typedef struct {
    int ref; // luaL_ref reference of backend_wrap_t obj.
    mcp_backend_t *be;
} mcp_pool_be_t;

#define KEY_HASH_FILTER_MAX 5
typedef struct mcp_pool_s mcp_pool_t;
struct mcp_pool_s {
    struct proxy_hash_caller phc;
    key_hash_filter_func key_filter;
    key_hash_func key_hasher;
    proxy_ctx_t *ctx; // main context.
    char key_filter_conf[KEY_HASH_FILTER_MAX+1];
    struct mcp_globalobj_s g;
    char beprefix[MAX_LABELLEN+1]; // TODO: should probably be shorter.
    uint64_t hash_seed; // calculated from a string.
    int pool_size;
    int pool_be_total; // can be different from pool size for worker IO
    int phc_ref;
    bool use_iothread;
    mcp_pool_be_t pool[];
};

typedef struct {
    mcp_pool_t *main; // ptr to original
    mcp_pool_be_t *pool; // ptr to main->pool starting offset for owner thread.
} mcp_pool_proxy_t;
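
/* Assumed key-routing flow through the pool structures above, pieced
 * together from the function pointer signatures (roughly what
 * mcplib_pool_proxy_call_helper, declared below, has to do; not a verbatim
 * copy of it):
 *
 *   const char *key = ...; size_t klen = ...;
 *   if (p->key_filter) {
 *       key = p->key_filter(p->key_filter_conf, key, klen, &klen);
 *   }
 *   uint64_t hash = p->key_hasher(key, klen, p->hash_seed);
 *   uint32_t i = p->phc.selector_func(hash, p->phc.ctx); // index into pool
 *   mcp_backend_t *be = p->pool[i].be;
 */
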
// utils
bool proxy_bufmem_checkadd(LIBEVENT_THREAD *t, int len);
void mcp_sharedvm_delta(proxy_ctx_t *ctx, int tidx, const char *name, int delta);
void mcp_sharedvm_remove(proxy_ctx_t *ctx, int tidx, const char *name);

void mcp_gobj_ref(lua_State *L, struct mcp_globalobj_s *g);
void mcp_gobj_unref(proxy_ctx_t *ctx, struct mcp_globalobj_s *g);
void mcp_gobj_finalize(struct mcp_globalobj_s *g);

// networking interface
void proxy_init_event_thread(proxy_event_thread_t *t, proxy_ctx_t *ctx, struct event_base *base);
void *proxy_event_thread(void *arg);
void proxy_run_backend_queue(be_head_t *head);
struct mcp_backendconn_s *proxy_choose_beconn(mcp_backend_t *be);
mcp_resp_t *mcp_prep_resobj(lua_State *L, mcp_request_t *rq, mcp_backend_t *be, LIBEVENT_THREAD *t);
mcp_resp_t *mcp_prep_bare_resobj(lua_State *L, LIBEVENT_THREAD *t);
void mcp_resp_set_elapsed(mcp_resp_t *r);
io_pending_proxy_t *mcp_queue_rctx_io(mcp_rcontext_t *rctx, mcp_request_t *rq, mcp_backend_t *be, mcp_resp_t *r);

// internal request interface
int mcplib_internal(lua_State *L);
int mcplib_internal_run(mcp_rcontext_t *rctx);

// user stats interface
#define MAX_USTATS_DEFAULT 1024
int mcplib_add_stat(lua_State *L);
int mcplib_stat(lua_State *L);

// request parsing interface
size_t _process_request_next_key(mcp_parser_t *pr);
int process_request(mcp_parser_t *pr, const char *command, size_t cmdlen);
mcp_request_t *mcp_new_request(lua_State *L, mcp_parser_t *pr, const char *command, size_t cmdlen);
void mcp_set_request(mcp_parser_t *pr, mcp_request_t *r, const char *command, size_t cmdlen);

// rate limit interfaces
int mcplib_ratelim_tbf(lua_State *L);
int mcplib_ratelim_tbf_call(lua_State *L);
int mcplib_ratelim_global_tbf(lua_State *L);
int mcplib_ratelim_proxy_tbf_call(lua_State *L);
int mcp_ratelim_proxy_tbf(lua_State *from, lua_State *to);
int mcplib_ratelim_global_tbf_gc(lua_State *L);
int mcplib_ratelim_proxy_tbf_gc(lua_State *L);

// request function generator interface
void proxy_return_rctx_cb(io_pending_t *pending);
void proxy_finalize_rctx_cb(io_pending_t *pending);

enum mcp_rqueue_e {
    QWAIT_IDLE = 0,
    QWAIT_ANY,
    QWAIT_OK,
    QWAIT_GOOD,
    QWAIT_FASTGOOD,
    QWAIT_HANDLE,
    QWAIT_SLEEP,
};

#define FGEN_NAME_MAXLEN 80
struct mcp_funcgen_s {
    LIBEVENT_THREAD *thread; // worker thread that created this funcgen.
    int generator_ref; // reference to the generator function.
    int self_ref; // self-reference if we're attached anywhere
    int argument_ref; // reference to an argument to pass to the generator
    int max_queues; // how many queue slots rctx's have
    unsigned int refcount; // reference counter
    unsigned int total; // total contexts managed
    unsigned int free; // free contexts
    unsigned int free_max; // size of list below.
    unsigned int free_pressure; // "pressure" for when to early-release an rctx
    bool closed; // the hook holding this fgen has been replaced
    bool ready; // if we're locked down or not.
    bool is_router; // if this fgen is actually a router object.
    struct timespec free_waiter; // must be "too free" for this much time
    mcp_rcontext_t **list;
    struct mcp_rqueue_s *queue_list;
    char name[FGEN_NAME_MAXLEN+1]; // string name for the generator.
};

enum mcp_funcgen_router_e {
    FGEN_ROUTER_NONE = 0,
    FGEN_ROUTER_CMDMAP,
    FGEN_ROUTER_SHORTSEP,
    FGEN_ROUTER_LONGSEP,
    FGEN_ROUTER_ANCHORSM,
    FGEN_ROUTER_ANCHORBIG,
};

struct mcp_router_long_s {
    char start[KEY_HASH_FILTER_MAX+1];
    char stop[KEY_HASH_FILTER_MAX+1];
};

// To simplify the attach/start code we wrap a funcgen with the router
// structure. This allows us to have a larger router structure without
// bloating the fgen object itself, and still benefit from letting funcgen
// new/cleanup handle most of the memory management.
struct mcp_funcgen_router {
    mcp_funcgen_t fgen_self;
    enum mcp_funcgen_router_e type;
    union {
        char sep;
        char lsep[KEY_HASH_FILTER_MAX+1];
        char anchorsm[2]; // short anchored mode.
        struct mcp_router_long_s big;
    } conf;
    int map_ref;
    mcp_funcgen_t *def_fgen; // default route
    mcp_funcgen_t *cmap[CMD_END_STORAGE]; // fallback command map
};

#define RQUEUE_TYPE_NONE 0
#define RQUEUE_TYPE_POOL 1
#define RQUEUE_TYPE_FGEN 2
#define RQUEUE_ASSIGNED (1<<0)
#define RQUEUE_R_RESUME (1<<1)
#define RQUEUE_R_GOOD (1<<3)
#define RQUEUE_R_OK (1<<4)
#define RQUEUE_R_ANY (1<<5)
#define RQUEUE_R_ERROR (1<<7)

enum mcp_rqueue_state {
    RQUEUE_IDLE = 0,
    RQUEUE_QUEUED,
    RQUEUE_ACTIVE,
    RQUEUE_COMPLETE,
    RQUEUE_WAITED
};

struct mcp_rqueue_s {
    int obj_ref; // reference to pool/func/etc object
    int cb_ref; // if a lua callback was specified
    int req_ref; // reference to associated request object.
    int res_ref; // reference to lua response object.
    void *obj; // direct pointer to the object for fast access.
    mcp_request_t *rq; // request set to this slot
    mcp_resp_t *res_obj; // pointer to result object
    enum mcp_rqueue_state state; // queued/active/etc
    uint8_t obj_type; // what the obj_ref actually is.
    uint8_t flags; // bit flags for various states
};
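
/* Assumed semantics of the RQUEUE_R_* values above: they are single-bit
 * flags OR'd into mcp_rqueue_s.flags as a response is classified, then
 * tested with masks, e.g.:
 *
 *   if (rqu->flags & (RQUEUE_R_GOOD|RQUEUE_R_OK)) { ... }
 *   if (rqu->flags & RQUEUE_R_ERROR) { ... }
 */
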
struct mcp_rcontext_s {
    int self_ref; // reference to our own object
    int request_ref; // top level request for this context.
    int function_ref; // ref to the created route function.
    int coroutine_ref; // ref to our encompassing coroutine.
    unsigned int async_pending; // legacy async handling
    int pending_reqs; // pending requests and sub-requests
    unsigned int wait_count;
    unsigned int wait_done; // TODO: change these variables to uint8's
    int wait_handle; // waiting on a specific queue slot
    int parent_handle; // queue slot in parent rctx
    int conn_fd; // fd of the originating client, as *c can become invalid
    enum mcp_rqueue_e wait_mode;
    uint8_t lua_narg; // number of responses to push when yield resuming.
    bool first_queue; // HACK
    lua_State *Lc; // coroutine thread pointer.
    mcp_request_t *request; // ptr to the above reference.
    mcp_rcontext_t *parent; // parent rctx in the call graph
    conn *c; // associated client object.
    mc_resp *resp; // top level response object to fill.
    mcp_funcgen_t *fgen; // parent function generator context.
    struct event timeout_event; // for *_wait_timeout() and sleep() calls
    struct mcp_rqueue_s qslots[]; // queueable slots.
};

// valid flags must be within ASCII 65-122 ('A' to 'z').
#define mcp_is_flag_invalid(f) ((f) < 65 || (f) > 122)

void mcp_run_rcontext_handle(mcp_rcontext_t *rctx, int handle);
void mcp_process_rctx_wait(mcp_rcontext_t *rctx, int handle);
int mcp_process_rqueue_return(mcp_rcontext_t *rctx, int handle, mcp_resp_t *res);
int mcplib_rcontext_handle_set_cb(lua_State *L);
int mcplib_rcontext_enqueue(lua_State *L);
int mcplib_rcontext_wait_cond(lua_State *L);
int mcplib_rcontext_wait_handle(lua_State *L);
int mcplib_rcontext_enqueue_and_wait(lua_State *L);
int mcplib_rcontext_res_good(lua_State *L);
int mcplib_rcontext_res_any(lua_State *L);
int mcplib_rcontext_res_ok(lua_State *L);
int mcplib_rcontext_result(lua_State *L);
int mcplib_rcontext_cfd(lua_State *L);
int mcplib_rcontext_tls_peer_cn(lua_State *L);
int mcplib_rcontext_request_new(lua_State *L);
int mcplib_rcontext_response_new(lua_State *L);
int mcplib_rcontext_sleep(lua_State *L);
int mcplib_funcgenbare_new(lua_State *L);
int mcplib_funcgen_new(lua_State *L);
int mcplib_funcgen_new_handle(lua_State *L);
int mcplib_funcgen_ready(lua_State *L);
int mcplib_router_new(lua_State *L);
mcp_rcontext_t *mcp_funcgen_start(lua_State *L, mcp_funcgen_t *fgen, mcp_parser_t *pr);
mcp_rcontext_t *mcp_funcgen_get_rctx(lua_State *L, int fgen_ref, mcp_funcgen_t *fgen);
void mcp_funcgen_return_rctx(mcp_rcontext_t *rctx);
int mcplib_funcgen_gc(lua_State *L);
void mcp_funcgen_reference(lua_State *L);
void mcp_funcgen_dereference(lua_State *L, mcp_funcgen_t *fgen);
void mcp_rcontext_push_rqu_res(lua_State *L, mcp_rcontext_t *rctx, int handle);

int mcplib_factory_command_new(lua_State *L);

// request interface
int mcplib_request(lua_State *L);
int mcplib_request_command(lua_State *L);
int mcplib_request_key(lua_State *L);
int mcplib_request_ltrimkey(lua_State *L);
int mcplib_request_rtrimkey(lua_State *L);
int mcplib_request_token(lua_State *L);
int mcplib_request_token_int(lua_State *L);
int mcplib_request_ntokens(lua_State *L);
int mcplib_request_has_flag(lua_State *L);
int mcplib_request_flag_token(lua_State *L);
int mcplib_request_flag_token_int(lua_State *L);
int mcplib_request_flag_add(lua_State *L);
int mcplib_request_flag_set(lua_State *L);
int mcplib_request_flag_replace(lua_State *L);
int mcplib_request_flag_del(lua_State *L);
int mcplib_request_gc(lua_State *L);
int mcplib_request_match_res(lua_State *L);
void mcp_request_cleanup(LIBEVENT_THREAD *t, mcp_request_t *rq);
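
/* The mcplib_* functions declared here follow the standard lua_CFunction
 * convention: arguments are read from the Lua stack, results are pushed
 * back, and the return value is the number of results. A hypothetical
 * accessor shape (the function name and "mcp.request" metatable name are
 * illustrative, not taken from the implementation):
 *
 *   int mcplib_example_request_key(lua_State *L) {
 *       mcp_request_t *rq = luaL_checkudata(L, 1, "mcp.request");
 *       lua_pushlstring(L, MCP_PARSER_KEY(rq->pr), rq->pr.klen);
 *       return 1;
 *   }
 */
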
// response interface
int mcplib_response_elapsed(lua_State *L);
int mcplib_response_ok(lua_State *L);
int mcplib_response_hit(lua_State *L);
int mcplib_response_vlen(lua_State *L);
int mcplib_response_code(lua_State *L);
int mcplib_response_line(lua_State *L);
int mcplib_response_flag_blank(lua_State *L);

// inspector interface
int mcplib_req_inspector_new(lua_State *L);
int mcplib_res_inspector_new(lua_State *L);
int mcplib_inspector_gc(lua_State *L);
int mcplib_inspector_call(lua_State *L);

// mutator interface
int mcplib_req_mutator_new(lua_State *L);
int mcplib_res_mutator_new(lua_State *L);
int mcplib_mutator_gc(lua_State *L);
int mcplib_mutator_call(lua_State *L);

void mcp_response_cleanup(LIBEVENT_THREAD *t, mcp_resp_t *r);
void mcp_set_resobj(mcp_resp_t *r, mcp_request_t *rq, mcp_backend_t *be, LIBEVENT_THREAD *t);
int mcplib_response_gc(lua_State *L);
int mcplib_response_close(lua_State *L);

int mcplib_open_dist_jump_hash(lua_State *L);
int mcplib_open_dist_ring_hash(lua_State *L);

int proxy_run_rcontext(mcp_rcontext_t *rctx);
mcp_backend_t *mcplib_pool_proxy_call_helper(mcp_pool_proxy_t *pp, const char *key, size_t len);
void mcp_request_attach(mcp_request_t *rq, io_pending_proxy_t *p);
int mcp_request_render(mcp_request_t *rq, int idx, char flag, const char *tok, size_t len);
int mcp_request_append(mcp_request_t *rq, const char flag, const char *tok, size_t len);
int mcp_request_find_flag_index(mcp_request_t *rq, const char flag);
int mcp_request_find_flag_token(mcp_request_t *rq, const char flag, const char **token, size_t *len);
int mcp_request_find_flag_tokenint64(mcp_request_t *rq, const char flag, int64_t *token);
void proxy_lua_error(lua_State *L, const char *s);
#define proxy_lua_ferror(L, fmt, ...) \
    do { \
        lua_pushfstring(L, fmt, __VA_ARGS__); \
        lua_error(L); \
    } while (0)

#define PROXY_SERVER_ERROR "SERVER_ERROR "
#define PROXY_CLIENT_ERROR "CLIENT_ERROR "
void proxy_out_errstring(mc_resp *resp, char *type, const char *str);
int _start_proxy_config_threads(proxy_ctx_t *ctx);
int proxy_thread_loadconf(proxy_ctx_t *ctx, LIBEVENT_THREAD *thr);

// TODO (v2): more .h files, perhaps?
int mcplib_open_hash_xxhash(lua_State *L);

__attribute__((unused)) void dump_stack(lua_State *L, const char *msg);
__attribute__((unused)) void dump_registry(lua_State *L, const char *msg);
__attribute__((unused)) void dump_funcgen(lua_State *L, const char *name, const char *msg);
__attribute__((unused)) void dump_pools(lua_State *L, const char *msg);
#endif // PROXY_H