/* -*- Mode: C; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
// Functions related to the backend handler thread.

#include "proxy.h"
#include "proxy_tls.h"

enum proxy_be_failures {
    P_BE_FAIL_TIMEOUT = 0,
    P_BE_FAIL_DISCONNECTED,
    P_BE_FAIL_CONNECTING,
    P_BE_FAIL_CONNTIMEOUT,
    P_BE_FAIL_READVALIDATE,
    P_BE_FAIL_BADVALIDATE,
    P_BE_FAIL_WRITING,
    P_BE_FAIL_READING,
    P_BE_FAIL_PARSING,
    P_BE_FAIL_CLOSED,
    P_BE_FAIL_UNHANDLEDRES,
    P_BE_FAIL_OOM,
    P_BE_FAIL_ENDSYNC,
    P_BE_FAIL_TRAILINGDATA,
    P_BE_FAIL_INVALIDPROTOCOL,
};

const char *proxy_be_failure_text[] = {
    [P_BE_FAIL_TIMEOUT] = "timeout",
    [P_BE_FAIL_DISCONNECTED] = "disconnected",
    [P_BE_FAIL_CONNECTING] = "connecting",
    [P_BE_FAIL_CONNTIMEOUT] = "conntimeout",
    [P_BE_FAIL_READVALIDATE] = "readvalidate",
    [P_BE_FAIL_BADVALIDATE] = "badvalidate",
    [P_BE_FAIL_WRITING] = "writing",
    [P_BE_FAIL_READING] = "reading",
    [P_BE_FAIL_PARSING] = "parsing",
    [P_BE_FAIL_CLOSED] = "closedsock",
    [P_BE_FAIL_UNHANDLEDRES] = "unhandledres",
    [P_BE_FAIL_OOM] = "outofmemory",
    [P_BE_FAIL_ENDSYNC] = "missingend",
    [P_BE_FAIL_TRAILINGDATA] = "trailingdata",
    [P_BE_FAIL_INVALIDPROTOCOL] = "invalidprotocol",
    NULL
};

static void proxy_backend_handler(const int fd, const short which, void *arg);
static void proxy_backend_tls_handler(const int fd, const short which, void *arg);
static void proxy_beconn_handler(const int fd, const short which, void *arg);
static void proxy_beconn_tls_handler(const int fd, const short which, void *arg);
static void proxy_event_handler(evutil_socket_t fd, short which, void *arg);
static void proxy_event_beconn(evutil_socket_t fd, short which, void *arg);
static int _prep_pending_write(struct mcp_backendconn_s *be);
static void _post_pending_write(struct mcp_backendconn_s *be, ssize_t sent);
static int _flush_pending_write(struct mcp_backendconn_s *be);
static int _flush_pending_tls_write(struct mcp_backendconn_s *be);
static void _cleanup_backend(mcp_backend_t *be);
static void _reset_bad_backend(struct mcp_backendconn_s *be, enum proxy_be_failures err);
static void _set_main_event(struct mcp_backendconn_s *be, struct event_base *base, int flags, struct timeval *t, event_callback_fn callback);
static void _stop_main_event(struct mcp_backendconn_s *be);
static void _start_write_event(struct mcp_backendconn_s *be);
static void _stop_write_event(struct mcp_backendconn_s *be);
static void _start_timeout_event(struct mcp_backendconn_s *be);
static void _stop_timeout_event(struct mcp_backendconn_s *be);
static int proxy_backend_drive_machine(struct mcp_backendconn_s *be);

/* Helper routines common to io_uring and libevent modes */

// TODO (v3): doing an inline syscall here, not ideal for uring mode.
// leaving for now since this should be extremely uncommon.
static int _beconn_send_validate(struct mcp_backendconn_s *be) {
    const char *str = "version\r\n";
    const ssize_t len = strlen(str);

    ssize_t res = write(mcmc_fd(be->client), str, len);

    if (res == -1) {
        return -1;
    }

    // I'm making an opinionated statement that we should be able to write
    // "version\r\n" into a fresh socket without hitting EAGAIN.
    if (res < len) {
        return -1;
    }

    return 1;
}

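// Confirms that a nonblocking connect() actually completed. On failure the
// connection is reset and rescheduled; on success we flip into read mode and
// begin the validation handshake.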
static int _proxy_beconn_checkconnect(struct mcp_backendconn_s *be) {
    int err = 0;
    // We were connecting, now ensure we're properly connected.
    if (mcmc_check_nonblock_connect(be->client, &err) != MCMC_OK) {
        P_DEBUG("%s: backend failed to connect (%s:%s)\n", __func__, be->be_parent->name, be->be_parent->port);
        // kick the bad backend, clear the queue, retry later.
        // FIXME (v2): if a connect fails, anything currently in the queue
        // should be safe to hold up until their timeout.
        _reset_bad_backend(be, P_BE_FAIL_CONNECTING);
        return -1;
    }
    P_DEBUG("%s: backend connected [fd: %d] (%s:%s)\n", __func__, mcmc_fd(be->client), be->be_parent->name, be->be_parent->port);
    be->connecting = false;
    be->state = mcp_backend_read;

    // seed the failure time for the flap check.
    gettimeofday(&be->last_failed, NULL);

    be->validating = true;
    // TODO: make validation optional.

    return 0;
}

// Use a simple heuristic to choose a backend connection socket out of a list
// of sockets.
struct mcp_backendconn_s *proxy_choose_beconn(mcp_backend_t *be) {
    struct mcp_backendconn_s *bec = &be->be[0];
    if (be->conncount != 1) {
        int depth = INT_MAX;
        // TODO: to computationally limit + ensure each connection stays
        // somewhat warm:
        // - remember idx of last conn used.
        // - if next idx has a lower depth, use that one instead
        // - tick idx (and reset if necessary)
        // else under low loads only the first conn will ever get used (which
        // is normally good; but sometimes bad if using stateful firewalls)
        for (int x = 0; x < be->conncount; x++) {
            struct mcp_backendconn_s *bec_i = &be->be[x];
            if (bec_i->bad) {
                continue;
            }
            if (bec_i->depth == 0) {
                bec = bec_i;
                break;
            } else if (bec_i->depth < depth) {
                depth = bec_i->depth;
                bec = bec_i;
            }
        }
    }

    return bec;
}

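// Pull all newly submitted IO objects off of the shared inbound queue and
// sort them onto their destination backends' queues, tracking each backend
// that received work on t->be_head so it can be flushed afterward.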
static void _proxy_event_handler_dequeue(proxy_event_thread_t *t) {
    io_head_t head;

    STAILQ_INIT(&head);
    STAILQ_INIT(&t->be_head);

    // Pull the entire stack of inbound into local queue.
    pthread_mutex_lock(&t->mutex);
    STAILQ_CONCAT(&head, &t->io_head_in);
    pthread_mutex_unlock(&t->mutex);

    while (!STAILQ_EMPTY(&head)) {
        io_pending_proxy_t *io = STAILQ_FIRST(&head);
        io->flushed = false;

        // _no_ mutex on backends. they are owned by the event thread.
        STAILQ_REMOVE_HEAD(&head, io_next);
        // paranoia about moving items between lists.
        io->io_next.stqe_next = NULL;

        mcp_backend_t *be = io->backend;
        STAILQ_INSERT_TAIL(&be->io_head, io, io_next);
        assert(be->depth > -1);
        be->depth++;
        if (!be->stacked) {
            be->stacked = true;
            STAILQ_INSERT_TAIL(&t->be_head, be, be_next);
        }
    }
}

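// Tear down a backend object that has been handed back to the event thread:
// remove any pending libevent events, close the sockets, release per-conn
// buffers, and finally free the parent structure itself.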
static void _cleanup_backend(mcp_backend_t *be) {
    for (int x = 0; x < be->conncount; x++) {
        struct mcp_backendconn_s *bec = &be->be[x];
        // remove any pending events.
        if (!be->tunables.down) {
            int pending = event_pending(&bec->main_event, EV_READ|EV_WRITE|EV_TIMEOUT, NULL);
            if (pending != 0) {
                event_del(&bec->main_event); // an error to call event_del() without event.
            }
            pending = event_pending(&bec->write_event, EV_READ|EV_WRITE|EV_TIMEOUT, NULL);
            if (pending != 0) {
                event_del(&bec->write_event); // an error to call event_del() without event.
            }
            pending = event_pending(&bec->timeout_event, EV_TIMEOUT, NULL);
            if (pending != 0) {
                event_del(&bec->timeout_event); // an error to call event_del() without event.
            }

            // - assert on empty queue
            assert(STAILQ_EMPTY(&bec->io_head));

            mcp_tls_shutdown(bec);
            mcmc_disconnect(bec->client);

            if (bec->bad) {
                mcp_sharedvm_delta(bec->event_thread->ctx, SHAREDVM_BACKEND_IDX,
                        bec->be_parent->label, -1);
            }
        }
        // - free be->client
        free(bec->client);
        // - free be->rbuf
        free(bec->rbuf);
    }
    // free once parent has had all connections closed off.
    free(be);
}

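// First-time initialization of a backend's connections on the event thread:
// assign the libevent handlers, kick off nonblocking connect()s, and arm the
// connect timeout. Backends tuned as "down" are immediately marked bad.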
static void _setup_backend(mcp_backend_t *be) {
    for (int x = 0; x < be->conncount; x++) {
        struct mcp_backendconn_s *bec = &be->be[x];
        if (be->tunables.down) {
            // backend is "forced" into a bad state. never connect or
            // otherwise attempt to use it.
            be->be[x].bad = true;
            continue;
        }
        // assign the initial events to the backend, so we don't have to
        // constantly check if they were initialized yet elsewhere.
        // note these events will not fire until event_add() is called.
        int status = mcmc_connect(bec->client, be->name, be->port, bec->connect_flags);
        event_callback_fn _beconn_handler = &proxy_beconn_handler;
        event_callback_fn _backend_handler = &proxy_backend_handler;
        if (be->tunables.use_tls) {
            _beconn_handler = &proxy_beconn_tls_handler;
            _backend_handler = &proxy_backend_tls_handler;
        }
        event_assign(&bec->main_event, bec->event_thread->base, mcmc_fd(bec->client), EV_WRITE|EV_TIMEOUT, _beconn_handler, bec);
        event_assign(&bec->write_event, bec->event_thread->base, mcmc_fd(bec->client), EV_WRITE|EV_TIMEOUT, _backend_handler, bec);
        event_assign(&bec->timeout_event, bec->event_thread->base, -1, EV_TIMEOUT, _backend_handler, bec);

        if (status == MCMC_CONNECTING || status == MCMC_CONNECTED) {
            // if we're already connected for some reason, still push it
            // through the connection handler to keep the code unified. It
            // will auto-wake because the socket is writable.
            bec->connecting = true;
            bec->can_write = false;
            // kick off the event we initialized above.
            event_add(&bec->main_event, &bec->tunables.connect);
        } else {
            _reset_bad_backend(bec, P_BE_FAIL_CONNECTING);
        }
    }
}

// event handler for injecting backends for processing
// currently just for initiating connections the first time.
static void proxy_event_beconn(evutil_socket_t fd, short which, void *arg) {
    proxy_event_thread_t *t = arg;

#ifdef USE_EVENTFD
    uint64_t u;
    if (read(fd, &u, sizeof(uint64_t)) != sizeof(uint64_t)) {
        // Temporary error or wasn't actually ready to read somehow.
        return;
    }
#else
    char buf[1];
    if (read(fd, buf, 1) != 1) {
        P_DEBUG("%s: pipe read failed\n", __func__);
        return;
    }
#endif

    beconn_head_t head;

    STAILQ_INIT(&head);
    pthread_mutex_lock(&t->mutex);
    STAILQ_CONCAT(&head, &t->beconn_head_in);
    pthread_mutex_unlock(&t->mutex);

    // Think we should reuse this code path for manually instructing backends
    // to disable/etc but not coding for that generically. We just need to
    // check the state of the backend when it reaches here, or some flags at
    // least.
    // FIXME: another ->stacked flag?
    // Either that or remove the STAILQ code and just use an array of ptrs.
    mcp_backend_t *be = NULL;
    // be can be freed by the loop, so can't use STAILQ_FOREACH.
    while (!STAILQ_EMPTY(&head)) {
        be = STAILQ_FIRST(&head);
        STAILQ_REMOVE_HEAD(&head, beconn_next);
        if (be->transferred) {
            // If this object was already transferred here, we're being
            // signalled to clean it up and free.
            _cleanup_backend(be);
        } else {
            be->transferred = true;
            _setup_backend(be);
        }
    }
}

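// Fail every request queued against this backend immediately: mark each
// response as a SERVER_ERROR and return the IO objects to their owning
// worker threads.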
static void _proxy_flush_backend_queue(mcp_backend_t *be) {
    io_pending_proxy_t *io = NULL;
    P_DEBUG("%s: fast failing request to bad backend (%s:%s) depth: %d\n", __func__, be->name, be->port, be->depth);

    while (!STAILQ_EMPTY(&be->io_head)) {
        io = STAILQ_FIRST(&be->io_head);
        STAILQ_REMOVE_HEAD(&be->io_head, io_next);
        mcp_resp_set_elapsed(io->client_resp);
        io->client_resp->status = MCMC_ERR;
        io->client_resp->resp.code = MCMC_CODE_SERVER_ERROR;
        be->depth--;
        assert(be->depth > -1);
        return_io_pending((io_pending_t *)io);
    }
}

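// Walk every backend that received new work, pick a connection for it, move
// the queued IOs onto that connection and flush them (or fast-fail them if
// the connection is bad or over its depth limit).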
void proxy_run_backend_queue(be_head_t *head) {
    mcp_backend_t *be;
    STAILQ_FOREACH(be, head, be_next) {
        be->stacked = false;
        int flags = 0;
        struct mcp_backendconn_s *bec = proxy_choose_beconn(be);

        int limit = be->tunables.backend_depth_limit;
        if (bec->bad) {
            // TODO: another counter for fast fails?
            _proxy_flush_backend_queue(be);
            continue;
        } else if (limit && bec->depth > limit) {
            proxy_ctx_t *ctx = bec->event_thread->ctx;
            STAT_INCR(ctx, request_failed_depth, be->depth);
            _proxy_flush_backend_queue(be);
            continue;
        }

        // drop new requests onto end of conn's io-head, reset the backend one.
        STAILQ_CONCAT(&bec->io_head, &be->io_head);
        bec->depth += be->depth;
        be->depth = 0;

        if (bec->connecting || bec->validating) {
            P_DEBUG("%s: deferring IO pending connecting (%s:%s)\n", __func__, be->name, be->port);
        } else {
            if (!bec->ssl) {
                flags = _flush_pending_write(bec);
            } else {
                flags = _flush_pending_tls_write(bec);
            }

            if (flags == -1) {
                _reset_bad_backend(bec, P_BE_FAIL_WRITING);
            } else if (flags & EV_WRITE) {
                // only get here because we need to kick off the write handler
                _start_write_event(bec);
            }

            if (bec->pending_read) {
                _start_timeout_event(bec);
            }

        }
    }
}

// event handler for executing backend requests
static void proxy_event_handler(evutil_socket_t fd, short which, void *arg) {
    proxy_event_thread_t *t = arg;

#ifdef USE_EVENTFD
    uint64_t u;
    if (read(fd, &u, sizeof(uint64_t)) != sizeof(uint64_t)) {
        // Temporary error or wasn't actually ready to read somehow.
        return;
    }
#else
    char buf[1];
    // TODO (v2): This is a lot more fatal than it should be. can it fail? can
    // it blow up the server?
    // TODO (v2): a cross-platform method of speeding this up would be nice. With
    // event fds we can queue N events and wakeup once here.
    // If we're pulling one byte out of the pipe at a time here it'll just
    // wake us up too often.
    // If the pipe is O_NONBLOCK then maybe just a larger read would work?
    if (read(fd, buf, 1) != 1) {
        P_DEBUG("%s: pipe read failed\n", __func__);
        return;
    }
#endif

    _proxy_event_handler_dequeue(t);

    // Re-walk each backend and check/set events as required.
    proxy_run_backend_queue(&t->be_head);
}

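// Thread entry point for a dedicated backend event thread: runs the libevent
// loop until the base is told to exit, then frees it.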
void *proxy_event_thread(void *arg) {
    proxy_event_thread_t *t = arg;

    logger_create(); // TODO (v2): add logger ptr to structure
    event_base_loop(t->base, 0);
    event_base_free(t->base);

    // TODO (v2): join bt threads, free array.

    return NULL;
}

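// (Re)arm the main event for this connection: any pending event is dropped
// first, then the event is re-assigned with the given flags/callback and
// added with the supplied timeout.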
static void _set_main_event(struct mcp_backendconn_s *be, struct event_base *base, int flags, struct timeval *t, event_callback_fn callback) {
    int pending = event_pending(&be->main_event, EV_READ|EV_WRITE|EV_TIMEOUT, NULL);
    if (pending != 0) {
        event_del(&be->main_event); // replace existing event.
    }

    int fd = mcmc_fd(be->client);
    if (fd == 0) {
        fd = -1; // need to pass -1 to event assign if we're not operating on
                 // a connection.
    }
    event_assign(&be->main_event, base, fd,
            flags, callback, be);
    event_add(&be->main_event, t);
}

static void _stop_main_event(struct mcp_backendconn_s *be) {
    event_del(&be->main_event);
}

static void _start_write_event(struct mcp_backendconn_s *be) {
    int pending = event_pending(&be->write_event, EV_WRITE|EV_TIMEOUT, NULL);
    if (pending != 0) {
        return;
    }
    // FIXME: wasn't there a write timeout?
    event_add(&be->write_event, &be->tunables.read);
}

static void _stop_write_event(struct mcp_backendconn_s *be) {
    event_del(&be->write_event);
}

// handle the read timeouts with a side event, so we can stick with a
// persistent listener (optimization + catch disconnects faster)
static void _start_timeout_event(struct mcp_backendconn_s *be) {
    int pending = event_pending(&be->timeout_event, EV_TIMEOUT, NULL);
    if (pending != 0) {
        return;
    }
    event_add(&be->timeout_event, &be->tunables.read);
}

static void _stop_timeout_event(struct mcp_backendconn_s *be) {
    int pending = event_pending(&be->timeout_event, EV_TIMEOUT, NULL);
    if (pending == 0) {
        return;
    }
    event_del(&be->timeout_event);
}

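// Complete the IO at the head of the connection's queue: pop it, adjust the
// depth and pending-read counters, record the elapsed time and hand the
// response back to the worker thread. After return_io_pending() the IO must
// not be touched again.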
static void _drive_machine_next(struct mcp_backendconn_s *be, io_pending_proxy_t *p) {
    // set the head here. when we break the head will be correct.
    STAILQ_REMOVE_HEAD(&be->io_head, io_next);
    be->depth--;
    assert(p != be->io_next); // don't remove what we need to flush.
    assert(be->depth > -1);
    be->pending_read--;
    assert(be->pending_read > -1);

    mcp_resp_set_elapsed(p->client_resp);
    // have to do the q->count-- and == 0 and redispatch_conn()
    // stuff here. The moment we call return_io here we
    // don't own *p anymore.
    return_io_pending((io_pending_t *)p);
    be->state = mcp_backend_read;
}

// NOTES:
// - mcp_backend_read: grab req_stack_head, do things
//     read -> next, want_read -> next | read_end, etc.
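// Returns 0 when the connection is still healthy (possibly waiting on more
// data), or one of the proxy_be_failures codes when the caller should hand
// the connection to _reset_bad_backend().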
static int proxy_backend_drive_machine(struct mcp_backendconn_s *be) {
    bool stop = false;
    io_pending_proxy_t *p = NULL;
    int flags = 0;

    p = STAILQ_FIRST(&be->io_head);
    if (p == NULL) {
        // got a read event, but nothing was queued.
        // probably means a disconnect event.
        // TODO (v2): could probably confirm this by attempting to read the
        // socket, getsockopt, or something else simply for logging or
        // statistical purposes.
        // In this case we know it's going to be a close so error.
        flags = P_BE_FAIL_CLOSED;
        P_DEBUG("%s: read event but nothing in IO queue\n", __func__);
        return flags;
    }

    while (!stop) {
        mcp_resp_t *r;

        switch(be->state) {
        case mcp_backend_read:
            assert(p != NULL);
            // FIXME: remove the _read state?
            be->state = mcp_backend_parse;
            break;
        case mcp_backend_parse:
            r = p->client_resp;
            r->status = mcmc_parse_buf(be->rbuf, be->rbufused, &r->resp);

            // Quick check if we need more data.
            if (r->resp.code == MCMC_WANT_READ) {
                return 0;
            }

            // we actually don't care about anything but the value length
            // TODO (v2): if vlen != vlen_read, pull an item and copy the data.
            int extra_space = 0;
            // if all goes well, move to the next request.
            be->state = mcp_backend_next;
            switch (r->resp.type) {
            case MCMC_RESP_GET:
                // We're in GET mode. we only support one key per
                // GET in the proxy backends, so we need to later check
                // for an END.
                extra_space = ENDLEN;
                be->state = mcp_backend_read_end;
                break;
            case MCMC_RESP_END:
                // this is a MISS from a GET request
                // or final handler from a STAT request.
                assert(r->resp.vlen == 0);
                if (p->ascii_multiget) {
                    // Ascii multiget hack mode; consume END's
                    be->rbufused -= r->resp.reslen;
                    if (be->rbufused > 0) {
                        memmove(be->rbuf, be->rbuf+r->resp.reslen, be->rbufused);
                    }

                    be->state = mcp_backend_next;
                    continue;
                }
                break;
            case MCMC_RESP_META:
                // we can handle meta responses easily since they're self
                // contained.
                break;
            case MCMC_RESP_GENERIC:
            case MCMC_RESP_NUMERIC:
                break;
            case MCMC_RESP_ERRMSG: // received an error message
                if (r->resp.code != MCMC_CODE_SERVER_ERROR) {
                    // Non server errors are protocol errors; can't trust
                    // the connection anymore.
                    be->state = mcp_backend_next_close;
                }
                break;
            case MCMC_RESP_FAIL:
                P_DEBUG("%s: mcmc_read failed [%d]\n", __func__, r->status);
                flags = P_BE_FAIL_PARSING;
                stop = true;
                break;
            // TODO (v2): No-op response?
            default:
                P_DEBUG("%s: Unhandled response from backend: %d\n", __func__, r->resp.type);
                // unhandled :(
                flags = P_BE_FAIL_UNHANDLEDRES;
                stop = true;
                break;
            }

            // r->resp.reslen + r->resp.vlen is the total length of the response.
            // TODO (v2): need to associate a buffer with this response...
            // for now we simply malloc, but reusable buffers should be used

            r->blen = r->resp.reslen + r->resp.vlen;
            {
                bool oom = proxy_bufmem_checkadd(r->thread, r->blen + extra_space);

                if (oom) {
                    flags = P_BE_FAIL_OOM;
                    // need to zero out blen so we don't over-decrement later
                    r->blen = 0;
                    stop = true;
                    break;
                }
            }
            r->buf = malloc(r->blen + extra_space);
            if (r->buf == NULL) {
                // Enforce accounting.
                pthread_mutex_lock(&r->thread->proxy_limit_lock);
                r->thread->proxy_buffer_memory_used -= r->blen + extra_space;
                pthread_mutex_unlock(&r->thread->proxy_limit_lock);

                flags = P_BE_FAIL_OOM;
                r->blen = 0;
                stop = true;
                break;
            }

            P_DEBUG("%s: r->status: %d, r->bread: %d, r->vlen: %lu\n", __func__, r->status, r->bread, r->resp.vlen);
            if (r->resp.vlen != r->resp.vlen_read) {
                // shouldn't be possible to have excess in buffer
                // if we're dealing with a partial value.
                assert(be->rbufused == r->resp.reslen+r->resp.vlen_read);
                P_DEBUG("%s: got a short read, moving to want_read\n", __func__);
                // copy the partial and advance mcmc's buffer digestion.
                memcpy(r->buf, be->rbuf, r->resp.reslen + r->resp.vlen_read);
                r->bread = r->resp.reslen + r->resp.vlen_read;
                be->rbufused = 0;
                be->state = mcp_backend_want_read;
                flags = 0;
                stop = true;
                break;
            } else {
                // mcmc's already counted the value as read if it fit in
                // the original buffer...
                memcpy(r->buf, be->rbuf, r->resp.reslen+r->resp.vlen_read);
            }

            // had a response, advance the buffer.
            be->rbufused -= r->resp.reslen + r->resp.vlen_read;
            if (be->rbufused > 0) {
                memmove(be->rbuf, be->rbuf+r->resp.reslen+r->resp.vlen_read, be->rbufused);
            }

            break;
        case mcp_backend_read_end:
            r = p->client_resp;
            // we need to ensure the next data in the stream is "END\r\n"
            // if not, the stack is desynced and we lose it.

            if (be->rbufused >= ENDLEN) {
                if (memcmp(be->rbuf, ENDSTR, ENDLEN) != 0) {
                    flags = P_BE_FAIL_ENDSYNC;
                    stop = true;
                    break;
                } else {
                    // response is good.
                    // FIXME (v2): copy what the server actually sent?
                    if (!p->ascii_multiget) {
                        // sigh... if part of a multiget we need to eat the END
                        // markers down here.
                        memcpy(r->buf+r->blen, ENDSTR, ENDLEN);
                        r->blen += 5;
                    } else {
                        r->extra = 5;
                    }

                    // advance buffer
                    be->rbufused -= ENDLEN;
                    if (be->rbufused > 0) {
                        memmove(be->rbuf, be->rbuf+ENDLEN, be->rbufused);
                    }
                }
            } else {
                flags = 0;
                stop = true;
                break;
            }

            be->state = mcp_backend_next;

            break;
        case mcp_backend_want_read:
            // Continuing a read from earlier
            r = p->client_resp;
            // take bread input and see if we're done reading the value,
            // else advance, set buffers, return next.
            P_DEBUG("%s: [want_read] r->bread: %d vlen: %lu\n", __func__, r->bread, r->resp.vlen);
            assert(be->rbufused != 0);
            size_t tocopy = be->rbufused < r->blen - r->bread ?
                be->rbufused : r->blen - r->bread;
            memcpy(r->buf+r->bread, be->rbuf, tocopy);
            r->bread += tocopy;

            if (r->bread >= r->blen) {
                // all done copying data.
                if (r->resp.type == MCMC_RESP_GET) {
                    be->state = mcp_backend_read_end;
                } else {
                    be->state = mcp_backend_next;
                }

                // shuffle remaining buffer.
                be->rbufused -= tocopy;
                if (be->rbufused > 0) {
                    memmove(be->rbuf, be->rbuf+tocopy, be->rbufused);
                }
            } else {
                assert(tocopy == be->rbufused);
                // signal to caller to issue a read.
                be->rbufused = 0;
                flags = 0;
                stop = true;
            }

            break;
        case mcp_backend_next:
            _drive_machine_next(be, p);

            if (STAILQ_EMPTY(&be->io_head)) {
                stop = true;
                // if there're no pending requests, the read buffer
                // should also be empty.
                if (be->rbufused > 0) {
                    flags = P_BE_FAIL_TRAILINGDATA;
                }
                break;
            } else {
                p = STAILQ_FIRST(&be->io_head);
            }

            // if leftover, keep processing IO's.
            // if no more data in buffer, need to re-set stack head and re-set
            // event.
            P_DEBUG("%s: [next] remain: %lu\n", __func__, be->rbufused);
            if (be->rbufused != 0) {
                // data trailing in the buffer, for a different request.
                be->state = mcp_backend_parse;
            } else {
                // need to read more data, buffer is empty.
                stop = true;
            }

            break;
        case mcp_backend_next_close:
            // we advance and return the current IO, then kill the conn.
            _drive_machine_next(be, p);
            stop = true;
            flags = P_BE_FAIL_INVALIDPROTOCOL;

            break;
        default:
            // TODO (v2): at some point (after v1?) this should attempt to recover,
            // though we should only get here from memory corruption and
            // bailing may be the right thing to do.
            fprintf(stderr, "%s: invalid backend state: %d\n", __func__, be->state);
            assert(false);
        } // switch
    } // while

    return flags;
}

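// Kick off a fresh nonblocking connect() for this connection and re-assign
// the write event to the new file descriptor. The main event is re-armed by
// the caller afterward.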
static void _backend_reconnect(struct mcp_backendconn_s *be) {
    int status = mcmc_connect(be->client, be->be_parent->name, be->be_parent->port, be->connect_flags);
    if (status == MCMC_CONNECTED) {
        // TODO (v2): unexpected, but let's let it be here.
        be->connecting = false;
        be->can_write = true;
    } else if (status == MCMC_CONNECTING) {
        be->connecting = true;
        be->can_write = false;
    } else {
        // failed to immediately re-establish the connection.
        // need to put the BE into a bad/retry state.
        be->connecting = false;
        be->can_write = true;
    }
    // re-create the write handler for the new file descriptor.
    // the main event will be re-assigned after this call.
    event_callback_fn _backend_handler = &proxy_backend_handler;
    if (be->be_parent->tunables.use_tls) {
        _backend_handler = &proxy_backend_tls_handler;
    }
    event_assign(&be->write_event, be->event_thread->base, mcmc_fd(be->client), EV_WRITE|EV_TIMEOUT, _backend_handler, be);
    // do not need to re-assign the timer event because it's not tied to an fd
}

// All we need to do here is schedule the backend to attempt to connect again.
static void proxy_backend_retry_handler(const int fd, const short which, void *arg) {
    struct mcp_backendconn_s *be = arg;
    assert(which & EV_TIMEOUT);
    struct timeval tmp_time = be->tunables.connect;
    _backend_reconnect(be);
    event_callback_fn _backend_handler = &proxy_beconn_handler;
    if (be->be_parent->tunables.use_tls) {
        _backend_handler = &proxy_beconn_tls_handler;
    }
    _set_main_event(be, be->event_thread->base, EV_WRITE, &tmp_time, _backend_handler);
}

// must be called after _reset_bad_backend(), so the backend is currently
// clear.
// TODO (v2): extra counter for "backend connect tries" so it's still possible
// to see that dead backends exist
static void _backend_reschedule(struct mcp_backendconn_s *be) {
    bool failed = false;
    struct timeval tmp_time = {0};
    long int retry_time = be->tunables.retry.tv_sec;
    char *badtext = "markedbad";
    if (be->flap_count > be->tunables.backend_failure_limit) {
        // reduce retry frequency to avoid noise.
        float backoff = retry_time;
        for (int x = 0; x < be->flap_count; x++) {
            backoff *= be->tunables.flap_backoff_ramp;
        }
        retry_time = (uint32_t)backoff;

        if (retry_time > be->tunables.flap_backoff_max) {
            retry_time = be->tunables.flap_backoff_max;
        }
        badtext = "markedbadflap";
        failed = true;
    } else if (be->failed_count > be->tunables.backend_failure_limit) {
        failed = true;
    }
    tmp_time.tv_sec = retry_time;

    if (failed) {
        if (!be->bad) {
            P_DEBUG("%s: marking backend as bad\n", __func__);
            STAT_INCR(be->event_thread->ctx, backend_marked_bad, 1);
            mcp_sharedvm_delta(be->event_thread->ctx, SHAREDVM_BACKEND_IDX,
                    be->be_parent->label, 1);
            LOGGER_LOG(NULL, LOG_PROXYEVENTS, LOGGER_PROXY_BE_ERROR, NULL, badtext, be->be_parent->name, be->be_parent->port, be->be_parent->label, 0, NULL, 0, retry_time);
        }
        be->bad = true;
        _set_main_event(be, be->event_thread->base, EV_TIMEOUT, &tmp_time, proxy_backend_retry_handler);
    } else {
        struct timeval tmp_time = be->tunables.connect;
        STAT_INCR(be->event_thread->ctx, backend_failed, 1);
        _backend_reconnect(be);
        event_callback_fn _backend_handler = &proxy_beconn_handler;
        if (be->be_parent->tunables.use_tls) {
            _backend_handler = &proxy_beconn_tls_handler;
        }
        _set_main_event(be, be->event_thread->base, EV_WRITE, &tmp_time, _backend_handler);
    }
}

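// Track connection "flapping": for network-level failures, compare the time
// since the last failure against the configured flap window and either bump
// or reset the flap counter. Other failure types are ignored here.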
static void _backend_flap_check(struct mcp_backendconn_s *be, enum proxy_be_failures err) {
    struct timeval now;
    struct timeval *flap = &be->tunables.flap;

    switch (err) {
    case P_BE_FAIL_TIMEOUT:
    case P_BE_FAIL_DISCONNECTED:
    case P_BE_FAIL_WRITING:
    case P_BE_FAIL_READING:
        if (flap->tv_sec != 0 || flap->tv_usec != 0) {
            struct timeval delta = {0};
            int64_t subsec = 0;
            gettimeofday(&now, NULL);
            delta.tv_sec = now.tv_sec - be->last_failed.tv_sec;
            subsec = now.tv_usec - be->last_failed.tv_usec;
            if (subsec < 0) {
                // tv_usec is only specced to represent "at least" [-1, 1000000],
                // so we need this wider temp var to safely hold the negative
                // intermediate before carrying from tv_sec.
                delta.tv_sec--;
                subsec += 1000000;
                delta.tv_usec = subsec;
            }

            if (flap->tv_sec < delta.tv_sec ||
                    (flap->tv_sec == delta.tv_sec && flap->tv_usec < delta.tv_usec)) {
                // delta is larger than our flap range. reset the flap counter.
                be->flap_count = 0;
            } else {
                // seems like we flapped again.
                be->flap_count++;
            }
            be->last_failed = now;
        }
        break;
    default:
        // only perform a flap check on network related errors.
        break;
    }
}

// TODO (v2): add a second argument for assigning a specific error to all pending
// IO's (ie; timeout).
// The backend has gotten into a bad state (timed out, protocol desync, or
// some other supposedly unrecoverable error): purge the queue and
// cycle the socket.
// Note that some types of errors may not require flushing the queue and
// should be fixed as they're figured out.
// _must_ be called from within the event thread.
static void _reset_bad_backend(struct mcp_backendconn_s *be, enum proxy_be_failures err) {
    io_pending_proxy_t *io = NULL;
    P_DEBUG("%s: resetting bad backend: [fd: %d] %s\n", __func__, mcmc_fd(be->client), proxy_be_failure_text[err]);
    // Can't use STAILQ_FOREACH() since r_io_p() frees the current
    // io. STAILQ_FOREACH_SAFE maybe?
    int depth = be->depth;
    while (!STAILQ_EMPTY(&be->io_head)) {
        io = STAILQ_FIRST(&be->io_head);
        STAILQ_REMOVE_HEAD(&be->io_head, io_next);
        // TODO (v2): Unsure if this is the best way of surfacing errors to lua,
        // but will do for V1.
        mcp_resp_set_elapsed(io->client_resp);
        io->client_resp->status = MCMC_ERR;
        io->client_resp->resp.code = MCMC_CODE_SERVER_ERROR;
        be->depth--;
        assert(be->depth > -1);
        return_io_pending((io_pending_t *)io);
    }

    STAILQ_INIT(&be->io_head);
    be->io_next = NULL; // also reset the write offset.

    // Only log if we don't already know it's messed up.
    if (!be->bad) {
        LOGGER_LOG(NULL, LOG_PROXYEVENTS, LOGGER_PROXY_BE_ERROR, NULL, proxy_be_failure_text[err], be->be_parent->name, be->be_parent->port, be->be_parent->label, depth, be->rbuf, be->rbufused, 0);
    }

    // reset buffer to blank state.
    be->rbufused = 0;
    be->pending_read = 0;
    // clear events so the reconnect handler can re-arm them with a new fd.
    _stop_write_event(be);
    _stop_main_event(be);
    _stop_timeout_event(be);
    mcp_tls_shutdown(be);
    mcmc_disconnect(be->client);
    // we leave the main event alone, because be_failed() always overwrites.

    // check failure counters and schedule a retry.
    be->failed_count++;
    _backend_flap_check(be, err);
    _backend_reschedule(be);
}

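// Gather the iovecs from every not-yet-flushed IO (starting from io_next if
// a previous flush left off mid-queue) into the connection's write_iovs
// array, stopping before BE_IOV_MAX is exceeded. Returns the iovec count.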
static int _prep_pending_write(struct mcp_backendconn_s *be) {
    struct iovec *iovs = be->write_iovs;
    io_pending_proxy_t *io = NULL;
    int iovused = 0;
    if (be->io_next == NULL) {
        // separate pointer for how far into the list we've flushed.
        io = STAILQ_FIRST(&be->io_head);
    } else {
        io = be->io_next;
    }
    assert(io != NULL);
    for (; io; io = STAILQ_NEXT(io, io_next)) {
        // TODO (v2): paranoia for now, but this check should never fire
        if (io->flushed)
            continue;

        if (io->iovcnt + iovused > BE_IOV_MAX) {
            // We will need to keep writing later.
            break;
        }

        memcpy(&iovs[iovused], io->iov, sizeof(struct iovec)*io->iovcnt);
        iovused += io->iovcnt;
    }
    return iovused;
}

// Account the bytes sent against the pending IO's: fully written IO's are
// marked flushed (they now expect a read), partially written ones have their
// iovecs advanced, and io_next is updated so the next flush resumes from the
// right spot.
static void _post_pending_write(struct mcp_backendconn_s *be, ssize_t sent) {
    io_pending_proxy_t *io = be->io_next;
    if (io == NULL) {
        io = STAILQ_FIRST(&be->io_head);
    }

    for (; io; io = STAILQ_NEXT(io, io_next)) {
        bool flushed = true;
        if (io->flushed)
            continue;
        if (sent >= io->iovbytes) {
            // short circuit for common case.
            sent -= io->iovbytes;
        } else {
            io->iovbytes -= sent;
            for (int x = 0; x < io->iovcnt; x++) {
                struct iovec *iov = &io->iov[x];
                if (sent >= iov->iov_len) {
                    sent -= iov->iov_len;
                    iov->iov_len = 0;
                } else {
                    iov->iov_len -= sent;
                    iov->iov_base = (char *)iov->iov_base + sent;
                    sent = 0;
                    flushed = false;
                    break;
                }
            }
        }
        io->flushed = flushed;
        if (flushed) {
            be->pending_read++;
        }

        if (sent <= 0) {
            // really shouldn't be negative, though.
            assert(sent >= 0);
            break;
        }
    } // for

    // resume the flush from this point.
    if (io != NULL) {
        if (!io->flushed) {
            be->io_next = io;
        } else {
            // Check for incomplete list because we hit the iovcnt limit.
            io_pending_proxy_t *nio = STAILQ_NEXT(io, io_next);
            if (nio != NULL && !nio->flushed) {
                be->io_next = nio;
            } else {
                be->io_next = NULL;
            }
        }
    } else {
        be->io_next = NULL;
    }
}

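// writev() as much of the pending queue as the kernel will take. Returns 0
// when everything flushed, EV_WRITE when a write event needs to be armed, or
// -1 on a hard socket error.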
static int _flush_pending_write(struct mcp_backendconn_s *be) {
    int flags = 0;
    // Allow us to be called with an empty stack to prevent dev errors.
    if (STAILQ_EMPTY(&be->io_head)) {
        return 0;
    }

    int iovcnt = _prep_pending_write(be);

    ssize_t sent = writev(mcmc_fd(be->client), be->write_iovs, iovcnt);
    if (sent > 0) {
        _post_pending_write(be, sent);
        // still have unflushed pending IO's, check for write and re-loop.
        if (be->io_next) {
            be->can_write = false;
            flags |= EV_WRITE;
        }
    } else if (sent == -1) {
        if (errno == EAGAIN || errno == EWOULDBLOCK) {
            be->can_write = false;
            flags |= EV_WRITE;
        } else {
            flags = -1;
        }
    }

    return flags;
}

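// TLS variant of _flush_pending_write(): same return convention, but writes
// through mcp_tls_writev() and maps MCP_TLS_NEEDIO/MCP_TLS_ERR onto
// EV_WRITE/-1 respectively.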
static int _flush_pending_tls_write(struct mcp_backendconn_s *be) {
    int flags = 0;
    // Allow us to be called with an empty stack to prevent dev errors.
    if (STAILQ_EMPTY(&be->io_head)) {
        return 0;
    }

    int iovcnt = _prep_pending_write(be);

    int sent = mcp_tls_writev(be, iovcnt);
    if (sent > 0) {
        _post_pending_write(be, sent);
        // FIXME: can _post_pending_write do this and return EV_WRITE?
        // still have unflushed pending IO's, check for write and re-loop.
        if (be->io_next) {
            be->can_write = false;
            flags |= EV_WRITE;
        }
    } else if (sent == MCP_TLS_NEEDIO) {
        // want io
        be->can_write = false;
        flags |= EV_WRITE;
    } else if (sent == MCP_TLS_ERR) {
        // hard error from tls
        flags = -1;
    }

    return flags;
}

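// Final stage of establishing a TLS backend: wait for the VERSION response
// to the validation command, then flush any writes that queued up while we
// were connecting and switch over to the persistent read handler.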
static void proxy_bevalidate_tls_handler(const int fd, const short which, void *arg) {
    assert(arg != NULL);
    struct mcp_backendconn_s *be = arg;
    struct timeval tmp_time = be->tunables.read;

    if (which & EV_TIMEOUT) {
        P_DEBUG("%s: backend timed out while connecting [fd: %d]\n", __func__, mcmc_fd(be->client));
        if (be->connecting) {
            _reset_bad_backend(be, P_BE_FAIL_CONNTIMEOUT);
        } else {
            _reset_bad_backend(be, P_BE_FAIL_READVALIDATE);
        }
        return;
    }

    if (which & EV_READ) {
        int read = mcp_tls_read(be);

        if (read > 0) {
            mcmc_resp_t r;

            int status = mcmc_parse_buf(be->rbuf, be->rbufused, &r);
            if (status == MCMC_ERR) {
                // Needed more data for a version line, somehow. I feel like
                // this should set off some alarms, but it is possible.
                if (r.code == MCMC_WANT_READ) {
                    _set_main_event(be, be->event_thread->base, EV_READ, &tmp_time, proxy_bevalidate_tls_handler);
                    return;
                }

                _reset_bad_backend(be, P_BE_FAIL_READVALIDATE);
                return;
            }

            if (r.code != MCMC_CODE_VERSION) {
                _reset_bad_backend(be, P_BE_FAIL_BADVALIDATE);
                return;
            }

            be->validating = false;
            be->rbufused = 0;
        } else if (read == 0) {
            // not connected or error.
            _reset_bad_backend(be, P_BE_FAIL_DISCONNECTED);
            return;
        } else if (read == MCP_TLS_NEEDIO) {
            // try again failure.
            _set_main_event(be, be->event_thread->base, EV_READ, &tmp_time, proxy_bevalidate_tls_handler);
            return;
        } else if (read == MCP_TLS_ERR) {
            // hard failure.
            _reset_bad_backend(be, P_BE_FAIL_READING);
            return;
        }

        // Passed validation, don't need to re-read, flush any pending writes.
        int res = _flush_pending_tls_write(be);
        if (res == -1) {
            _reset_bad_backend(be, P_BE_FAIL_WRITING);
            return;
        }
        if (res & EV_WRITE) {
            _start_write_event(be);
        }
        if (be->pending_read) {
            _start_timeout_event(be);
        }
    }

    // switch to the primary persistent read event.
    if (!be->validating) {
        _set_main_event(be, be->event_thread->base, EV_READ|EV_PERSIST, NULL, proxy_backend_tls_handler);

        // we're happily validated and switching to normal processing, so
        // _now_ the backend is no longer "bad".
        // If we reset the failed count earlier we then can fail the
        // validation loop indefinitely without ever being marked bad.
        if (be->bad) {
            // was bad, need to mark as no longer bad in shared space.
            mcp_sharedvm_delta(be->event_thread->ctx, SHAREDVM_BACKEND_IDX,
                    be->be_parent->label, -1);
        }
        be->bad = false;
        be->failed_count = 0;
    }
}

// Libevent handler when we're in TLS mode. Unfortunately the code is
// different enough to warrant its own function.
static void proxy_beconn_tls_handler(const int fd, const short which, void *arg) {
    assert(arg != NULL);
    struct mcp_backendconn_s *be = arg;
    //int flags = EV_TIMEOUT;
    struct timeval tmp_time = be->tunables.read;

    if (which & EV_TIMEOUT) {
        P_DEBUG("%s: backend timed out while connecting [fd: %d]\n", __func__, mcmc_fd(be->client));
        if (be->connecting) {
            _reset_bad_backend(be, P_BE_FAIL_CONNTIMEOUT);
        } else {
            _reset_bad_backend(be, P_BE_FAIL_READVALIDATE);
        }
        return;
    }

    if (which & EV_WRITE) {
        be->can_write = true;

        if (be->connecting) {
            if (_proxy_beconn_checkconnect(be) == -1) {
                return;
            }
            // TODO: check return code.
            mcp_tls_connect(be);
            // fall through to handshake attempt.
        }
    }

    assert(be->validating);
    int ret = mcp_tls_handshake(be);
    if (ret == MCP_TLS_NEEDIO) {
        // Need to try again.
        _set_main_event(be, be->event_thread->base, EV_READ, &tmp_time, proxy_beconn_tls_handler);
        return;
    } else if (ret == 1) {
        // handshake complete.
        if (mcp_tls_send_validate(be) != MCP_TLS_OK) {
            _reset_bad_backend(be, P_BE_FAIL_BADVALIDATE);
            return;
        }

        // switch to another handler for the final stage.
        _set_main_event(be, be->event_thread->base, EV_READ, &tmp_time, proxy_bevalidate_tls_handler);
    } else if (ret < 0) {
        // FIXME: FAIL_HANDSHAKE
        _reset_bad_backend(be, P_BE_FAIL_BADVALIDATE);
        return;
    }
}

// Libevent handler for backends in a connecting state.
static void proxy_beconn_handler(const int fd, const short which, void *arg) {
    assert(arg != NULL);
    struct mcp_backendconn_s *be = arg;
    int flags = EV_TIMEOUT;
    struct timeval tmp_time = be->tunables.read;

    if (which & EV_TIMEOUT) {
        P_DEBUG("%s: backend timed out while connecting [fd: %d]\n", __func__, mcmc_fd(be->client));
        if (be->connecting) {
            _reset_bad_backend(be, P_BE_FAIL_CONNTIMEOUT);
        } else {
            _reset_bad_backend(be, P_BE_FAIL_READVALIDATE);
        }
        return;
    }

    if (which & EV_WRITE) {
        be->can_write = true;

        if (be->connecting) {
            if (_proxy_beconn_checkconnect(be) == -1) {
                return;
            }
            if (_beconn_send_validate(be) == -1) {
                _reset_bad_backend(be, P_BE_FAIL_BADVALIDATE);
                return;
            }
            _set_main_event(be, be->event_thread->base, EV_READ, &tmp_time, proxy_beconn_handler);
        }

        // TODO: currently never taken, until validation is made optional.
        if (!be->validating) {
            int res = _flush_pending_write(be);
            if (res == -1) {
                _reset_bad_backend(be, P_BE_FAIL_WRITING);
                return;
            }
            flags |= res;
            // FIXME: set write event?
        }
    }

    if (which & EV_READ) {
        assert(be->validating);

        int read = recv(mcmc_fd(be->client), be->rbuf + be->rbufused, READ_BUFFER_SIZE - be->rbufused, 0);
        if (read > 0) {
            mcmc_resp_t r;
            be->rbufused += read;

            int status = mcmc_parse_buf(be->rbuf, be->rbufused, &r);
            if (status == MCMC_ERR) {
                // Needed more data for a version line, somehow. I feel like
                // this should set off some alarms, but it is possible.
                if (r.code == MCMC_WANT_READ) {
                    _set_main_event(be, be->event_thread->base, EV_READ, &tmp_time, proxy_beconn_handler);
                    return;
                }

                _reset_bad_backend(be, P_BE_FAIL_READVALIDATE);
                return;
            }

            if (r.code != MCMC_CODE_VERSION) {
                _reset_bad_backend(be, P_BE_FAIL_BADVALIDATE);
                return;
            }

            be->validating = false;
            be->rbufused = 0;
        } else if (read == 0) {
            // not connected or error.
            _reset_bad_backend(be, P_BE_FAIL_DISCONNECTED);
            return;
        } else if (read == -1) {
            // sit on epoll again.
            if (errno != EAGAIN && errno != EWOULDBLOCK) {
                _reset_bad_backend(be, P_BE_FAIL_READING);
                return;
            }
            _set_main_event(be, be->event_thread->base, EV_READ, &tmp_time, proxy_beconn_handler);
            return;
        }

        // Passed validation, don't need to re-read, flush any pending writes.
        int res = _flush_pending_write(be);
        if (res == -1) {
            _reset_bad_backend(be, P_BE_FAIL_WRITING);
            return;
        }
        if (res & EV_WRITE) {
            _start_write_event(be);
        }
        if (be->pending_read) {
            _start_timeout_event(be);
        }
    }

    // switch to the primary persistent read event.
    if (!be->validating) {
        _set_main_event(be, be->event_thread->base, EV_READ|EV_PERSIST, NULL, proxy_backend_handler);

        // we're happily validated and switching to normal processing, so
        // _now_ the backend is no longer "bad".
        // If we reset the failed count earlier we then can fail the
        // validation loop indefinitely without ever being marked bad.
        if (be->bad) {
            // was bad, need to mark as no longer bad in shared space.
            mcp_sharedvm_delta(be->event_thread->ctx, SHAREDVM_BACKEND_IDX,
                    be->be_parent->label, -1);
        }
        be->bad = false;
        be->failed_count = 0;
    }
}

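// Persistent read/write handler for established TLS backend connections:
// flushes pending writes, feeds decrypted reads through the drive machine,
// and resets the connection on timeouts or hard errors.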
static void proxy_backend_tls_handler(const int fd, const short which, void *arg) {
    struct mcp_backendconn_s *be = arg;

    if (which & EV_TIMEOUT) {
        P_DEBUG("%s: timeout received, killing backend queue\n", __func__);
        _reset_bad_backend(be, P_BE_FAIL_TIMEOUT);
        return;
    }

    if (which & EV_WRITE) {
        be->can_write = true;
        int res = _flush_pending_tls_write(be);
        if (res == -1) {
            _reset_bad_backend(be, P_BE_FAIL_WRITING);
            return;
        }
        if (res & EV_WRITE) {
            _start_write_event(be);
        }
    }

    if (which & EV_READ) {
        // got a read event, always kill the pending read timer.
        _stop_timeout_event(be);
        // We do the syscall here before diving into the state machine to allow a
        // common code path for io_uring/epoll/tls/etc
        int read = mcp_tls_read(be);
        if (read > 0) {
            int res = proxy_backend_drive_machine(be);
            if (res != 0) {
                _reset_bad_backend(be, res);
                return;
            }
        } else if (read == 0) {
            // not connected or error.
            _reset_bad_backend(be, P_BE_FAIL_DISCONNECTED);
            return;
        } else if (read == MCP_TLS_NEEDIO) {
            // sit on epoll again.
            return;
        } else if (read == MCP_TLS_ERR) {
            _reset_bad_backend(be, P_BE_FAIL_READING);
            return;
        }

#ifdef PROXY_DEBUG
        if (!STAILQ_EMPTY(&be->io_head)) {
            P_DEBUG("backend has leftover IOs: %d\n", be->depth);
        }
#endif
    }

    if (be->pending_read) {
        _start_timeout_event(be);
    }
}

// The libevent backend callback handler.
// If we end up resetting a backend, it will get put back into a connecting
// state.
static void proxy_backend_handler(const int fd, const short which, void *arg) {
    struct mcp_backendconn_s *be = arg;

    if (which & EV_TIMEOUT) {
        P_DEBUG("%s: timeout received, killing backend queue\n", __func__);
        _reset_bad_backend(be, P_BE_FAIL_TIMEOUT);
        return;
    }

    if (which & EV_WRITE) {
        be->can_write = true;
        int res = _flush_pending_write(be);
        if (res == -1) {
            _reset_bad_backend(be, P_BE_FAIL_WRITING);
            return;
        }
        if (res & EV_WRITE) {
            _start_write_event(be);
        }
    }

    if (which & EV_READ) {
        // got a read event, always kill the pending read timer.
        _stop_timeout_event(be);
        // We do the syscall here before diving into the state machine to allow a
        // common code path for io_uring/epoll
        int read = recv(mcmc_fd(be->client), be->rbuf + be->rbufused,
                READ_BUFFER_SIZE - be->rbufused, 0);
        if (read > 0) {
            be->rbufused += read;
            int res = proxy_backend_drive_machine(be);
            if (res != 0) {
                _reset_bad_backend(be, res);
                return;
            }
        } else if (read == 0) {
            // not connected or error.
            _reset_bad_backend(be, P_BE_FAIL_DISCONNECTED);
            return;
        } else if (read == -1) {
            // sit on epoll again.
            if (errno != EAGAIN && errno != EWOULDBLOCK) {
                _reset_bad_backend(be, P_BE_FAIL_READING);
                return;
            }
        }

#ifdef PROXY_DEBUG
        if (!STAILQ_EMPTY(&be->io_head)) {
            P_DEBUG("backend has leftover IOs: %d\n", be->depth);
        }
#endif
    }

    if (be->pending_read) {
        _start_timeout_event(be);
    }
}

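// Initialize a proxy event thread: create its wakeup channel (eventfd or
// pipe), the inbound queues and locks, the libevent base (unless one is
// being shared from a worker thread), and the persistent notification
// events.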
void proxy_init_event_thread(proxy_event_thread_t *t, proxy_ctx_t *ctx, struct event_base *base) {
    t->ctx = ctx;
#ifdef USE_EVENTFD
    t->event_fd = eventfd(0, EFD_NONBLOCK);
    if (t->event_fd == -1) {
        perror("failed to create backend notify eventfd");
        exit(1);
    }
    t->be_event_fd = eventfd(0, EFD_NONBLOCK);
    if (t->be_event_fd == -1) {
        perror("failed to create backend notify eventfd");
        exit(1);
    }
#else
    int fds[2];
    if (pipe(fds)) {
        perror("can't create proxy backend notify pipe");
        exit(1);
    }

    t->notify_receive_fd = fds[0];
    t->notify_send_fd = fds[1];

    if (pipe(fds)) {
        perror("can't create proxy backend connection notify pipe");
        exit(1);
    }
    t->be_notify_receive_fd = fds[0];
    t->be_notify_send_fd = fds[1];
#endif

    // incoming request queue.
    STAILQ_INIT(&t->io_head_in);
    STAILQ_INIT(&t->beconn_head_in);
    pthread_mutex_init(&t->mutex, NULL);
    pthread_cond_init(&t->cond, NULL);

    // initialize the event system.

#ifdef HAVE_LIBURING
    if (t->ctx->use_uring) {
        fprintf(stderr, "Sorry, io_uring not supported right now\n");
        abort();
    }
#endif

    if (base == NULL) {
        struct event_config *ev_config;
        ev_config = event_config_new();
        event_config_set_flag(ev_config, EVENT_BASE_FLAG_NOLOCK);
        t->base = event_base_new_with_config(ev_config);
        event_config_free(ev_config);
        if (! t->base) {
            fprintf(stderr, "Can't allocate event base\n");
            exit(1);
        }
    } else {
        // reusing an event base from a worker thread.
        t->base = base;
    }

    // listen for notifications.
    // NULL was thread_libevent_process
    // FIXME (v2): use modern format? (event_assign)
#ifdef USE_EVENTFD
    event_set(&t->notify_event, t->event_fd,
            EV_READ | EV_PERSIST, proxy_event_handler, t);
    event_set(&t->beconn_event, t->be_event_fd,
            EV_READ | EV_PERSIST, proxy_event_beconn, t);
#else
    event_set(&t->notify_event, t->notify_receive_fd,
            EV_READ | EV_PERSIST, proxy_event_handler, t);
    event_set(&t->beconn_event, t->be_notify_receive_fd,
            EV_READ | EV_PERSIST, proxy_event_beconn, t);
#endif

    event_base_set(t->base, &t->notify_event);
    if (event_add(&t->notify_event, 0) == -1) {
        fprintf(stderr, "Can't monitor libevent notify pipe\n");
        exit(1);
    }
    event_base_set(t->base, &t->beconn_event);
    if (event_add(&t->beconn_event, 0) == -1) {
        fprintf(stderr, "Can't monitor libevent notify pipe\n");
        exit(1);
    }
}