1 /* -*- Mode: C; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2 // Functions related to the backend handler thread.
3 
4 #include "proxy.h"
5 #include "proxy_tls.h"
6 
7 enum proxy_be_failures {
8     P_BE_FAIL_TIMEOUT = 0,
9     P_BE_FAIL_DISCONNECTED,
10     P_BE_FAIL_CONNECTING,
11     P_BE_FAIL_CONNTIMEOUT,
12     P_BE_FAIL_READVALIDATE,
13     P_BE_FAIL_BADVALIDATE,
14     P_BE_FAIL_WRITING,
15     P_BE_FAIL_READING,
16     P_BE_FAIL_PARSING,
17     P_BE_FAIL_CLOSED,
18     P_BE_FAIL_UNHANDLEDRES,
19     P_BE_FAIL_OOM,
20     P_BE_FAIL_ENDSYNC,
21     P_BE_FAIL_TRAILINGDATA,
22     P_BE_FAIL_INVALIDPROTOCOL,
23 };
24 
25 const char *proxy_be_failure_text[] = {
26     [P_BE_FAIL_TIMEOUT] = "timeout",
27     [P_BE_FAIL_DISCONNECTED] = "disconnected",
28     [P_BE_FAIL_CONNECTING] = "connecting",
29     [P_BE_FAIL_CONNTIMEOUT] = "conntimeout",
30     [P_BE_FAIL_READVALIDATE] = "readvalidate",
31     [P_BE_FAIL_BADVALIDATE] = "badvalidate",
32     [P_BE_FAIL_WRITING] = "writing",
33     [P_BE_FAIL_READING] = "reading",
34     [P_BE_FAIL_PARSING] = "parsing",
35     [P_BE_FAIL_CLOSED] = "closedsock",
36     [P_BE_FAIL_UNHANDLEDRES] = "unhandledres",
37     [P_BE_FAIL_OOM] = "outofmemory",
38     [P_BE_FAIL_ENDSYNC] = "missingend",
39     [P_BE_FAIL_TRAILINGDATA] = "trailingdata",
40     [P_BE_FAIL_INVALIDPROTOCOL] = "invalidprotocol",
41     NULL
42 };
43 
44 static void proxy_backend_handler(const int fd, const short which, void *arg);
45 static void proxy_backend_tls_handler(const int fd, const short which, void *arg);
46 static void proxy_beconn_handler(const int fd, const short which, void *arg);
47 static void proxy_beconn_tls_handler(const int fd, const short which, void *arg);
48 static void proxy_event_handler(evutil_socket_t fd, short which, void *arg);
49 static void proxy_event_beconn(evutil_socket_t fd, short which, void *arg);
50 static int _prep_pending_write(struct mcp_backendconn_s *be);
51 static void _post_pending_write(struct mcp_backendconn_s *be, ssize_t sent);
52 static int _flush_pending_write(struct mcp_backendconn_s *be);
53 static int _flush_pending_tls_write(struct mcp_backendconn_s *be);
54 static void _cleanup_backend(mcp_backend_t *be);
55 static void _reset_bad_backend(struct mcp_backendconn_s *be, enum proxy_be_failures err);
56 static void _set_main_event(struct mcp_backendconn_s *be, struct event_base *base, int flags, struct timeval *t, event_callback_fn callback);
57 static void _stop_main_event(struct mcp_backendconn_s *be);
58 static void _start_write_event(struct mcp_backendconn_s *be);
59 static void _stop_write_event(struct mcp_backendconn_s *be);
60 static void _start_timeout_event(struct mcp_backendconn_s *be);
61 static void _stop_timeout_event(struct mcp_backendconn_s *be);
62 static int proxy_backend_drive_machine(struct mcp_backendconn_s *be);
63 
64 /* Helper routines common to io_uring and libevent modes */
65 
66 // TODO (v3): doing an inline syscall here, not ideal for uring mode.
67 // leaving for now since this should be extremely uncommon.
68 static int _beconn_send_validate(struct mcp_backendconn_s *be) {
69     const char *str = "version\r\n";
70     const ssize_t len = strlen(str);
71 
72     ssize_t res = write(mcmc_fd(be->client), str, len);
73 
74     if (res == -1) {
75         return -1;
76     }
77 
78     // I'm making an opinionated statement that we should be able to write
79     // "version\r\n" into a fresh socket without hitting EAGAIN.
80     if (res < len) {
81         return -1;
82     }
83 
84     return 1;
85 }
86 
87 static int _proxy_beconn_checkconnect(struct mcp_backendconn_s *be) {
88     int err = 0;
89     // We were connecting, now ensure we're properly connected.
90     if (mcmc_check_nonblock_connect(be->client, &err) != MCMC_OK) {
91         P_DEBUG("%s: backend failed to connect (%s:%s)\n", __func__, be->be_parent->name, be->be_parent->port);
92         // kick the bad backend, clear the queue, retry later.
93     // FIXME (v2): if a connect fails, anything currently in the queue
94     // should be safe to hold onto until it times out.
95         _reset_bad_backend(be, P_BE_FAIL_CONNECTING);
96         return -1;
97     }
98     P_DEBUG("%s: backend connected [fd: %d] (%s:%s)\n", __func__, mcmc_fd(be->client), be->be_parent->name, be->be_parent->port);
99     be->connecting = false;
100     be->state = mcp_backend_read;
101 
102     // seed the failure time for the flap check.
103     gettimeofday(&be->last_failed, NULL);
104 
105     be->validating = true;
106     // TODO: make validation optional.
107 
108     return 0;
109 }
110 
111 // Use a simple heuristic to choose a backend connection socket out of a list
112 // of sockets.
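// An illustrative pass through the loop below (depths assumed): with three
// connections at depths {3, 0, 5} and none marked bad, the zero-depth
// connection is returned immediately; with depths {3, 2, 5} the depth-2 one
// wins; if every connection is marked bad we fall back to be->be[0].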
113 struct mcp_backendconn_s *proxy_choose_beconn(mcp_backend_t *be) {
114     struct mcp_backendconn_s *bec = &be->be[0];
115     if (be->conncount != 1) {
116         int depth = INT_MAX;
117         // TODO: to limit computation + ensure each connection stays
118         // somewhat warm:
119         // - remember idx of last conn used.
120         // - if next idx has a lower depth, use that one instead
121         // - tick idx (and reset if necessary)
122         // else under low loads only the first conn will ever get used (which
123         // is normally good; but sometimes bad if using stateful firewalls)
124         for (int x = 0; x < be->conncount; x++) {
125             struct mcp_backendconn_s *bec_i = &be->be[x];
126             if (bec_i->bad) {
127                 continue;
128             }
129             if (bec_i->depth == 0) {
130                 bec = bec_i;
131                 break;
132             } else if (bec_i->depth < depth) {
133                 depth = bec_i->depth;
134                 bec = bec_i;
135             }
136         }
137     }
138 
139     return bec;
140 }
141 
142 static void _proxy_event_handler_dequeue(proxy_event_thread_t *t) {
143     io_head_t head;
144 
145     STAILQ_INIT(&head);
146     STAILQ_INIT(&t->be_head);
147 
148     // Pull the entire stack of inbound into local queue.
149     pthread_mutex_lock(&t->mutex);
150     STAILQ_CONCAT(&head, &t->io_head_in);
151     pthread_mutex_unlock(&t->mutex);
152 
153     while (!STAILQ_EMPTY(&head)) {
154         io_pending_proxy_t *io = STAILQ_FIRST(&head);
155         io->flushed = false;
156 
157         // _no_ mutex on backends. they are owned by the event thread.
158         STAILQ_REMOVE_HEAD(&head, io_next);
159         // paranoia about moving items between lists.
160         io->io_next.stqe_next = NULL;
161 
162         mcp_backend_t *be = io->backend;
163         STAILQ_INSERT_TAIL(&be->io_head, io, io_next);
164         assert(be->depth > -1);
165         be->depth++;
166         if (!be->stacked) {
167             be->stacked = true;
168             STAILQ_INSERT_TAIL(&t->be_head, be, be_next);
169         }
170     }
171 }
172 
173 static void _cleanup_backend(mcp_backend_t *be) {
174     for (int x = 0; x < be->conncount; x++) {
175         struct mcp_backendconn_s *bec = &be->be[x];
176         // remove any pending events.
177         if (!be->tunables.down) {
178             int pending = event_pending(&bec->main_event, EV_READ|EV_WRITE|EV_TIMEOUT, NULL);
179             if (pending != 0) {
180                 event_del(&bec->main_event); // an error to call event_del() without event.
181             }
182             pending = event_pending(&bec->write_event, EV_READ|EV_WRITE|EV_TIMEOUT, NULL);
183             if (pending != 0) {
184                 event_del(&bec->write_event); // an error to call event_del() without event.
185             }
186             pending = event_pending(&bec->timeout_event, EV_TIMEOUT, NULL);
187             if (pending != 0) {
188                 event_del(&bec->timeout_event); // an error to call event_del() without event.
189             }
190 
191             // - assert on empty queue
192             assert(STAILQ_EMPTY(&bec->io_head));
193 
194             mcp_tls_shutdown(bec);
195             mcmc_disconnect(bec->client);
196 
197             if (bec->bad) {
198                 mcp_sharedvm_delta(bec->event_thread->ctx, SHAREDVM_BACKEND_IDX,
199                     bec->be_parent->label, -1);
200             }
201         }
202         // - free be->client
203         free(bec->client);
204         // - free be->rbuf
205         free(bec->rbuf);
206     }
207     // free once parent has had all connections closed off.
208     free(be);
209 }
210 
211 static void _setup_backend(mcp_backend_t *be) {
212     for (int x = 0; x < be->conncount; x++) {
213         struct mcp_backendconn_s *bec = &be->be[x];
214         if (be->tunables.down) {
215             // backend is "forced" into a bad state. never connect or
216             // otherwise attempt to use it.
217             be->be[x].bad = true;
218             continue;
219         }
220         // assign the initial events to the backend, so we don't have to
221         // constantly check if they were initialized yet elsewhere.
222         // note these events will not fire until event_add() is called.
223         int status = mcmc_connect(bec->client, be->name, be->port, bec->connect_flags);
224         event_callback_fn _beconn_handler = &proxy_beconn_handler;
225         event_callback_fn _backend_handler = &proxy_backend_handler;
226         if (be->tunables.use_tls) {
227             _beconn_handler = &proxy_beconn_tls_handler;
228             _backend_handler = &proxy_backend_tls_handler;
229         }
230         event_assign(&bec->main_event, bec->event_thread->base, mcmc_fd(bec->client), EV_WRITE|EV_TIMEOUT, _beconn_handler, bec);
231         event_assign(&bec->write_event, bec->event_thread->base, mcmc_fd(bec->client), EV_WRITE|EV_TIMEOUT, _backend_handler, bec);
232         event_assign(&bec->timeout_event, bec->event_thread->base, -1, EV_TIMEOUT, _backend_handler, bec);
233 
234         if (status == MCMC_CONNECTING || status == MCMC_CONNECTED) {
235             // if we're already connected for some reason, still push it
236             // through the connection handler to keep the code unified. It
237             // will auto-wake because the socket is writeable.
238             bec->connecting = true;
239             bec->can_write = false;
240             // kick off the event we initialized above.
241             event_add(&bec->main_event, &bec->tunables.connect);
242         } else {
243             _reset_bad_backend(bec, P_BE_FAIL_CONNECTING);
244         }
245     }
246 }
247 
248 // event handler for injecting backends for processing
249 // currently just for initiating connections the first time.
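// Rough sketch of the handoff (the producing side lives outside this file):
// another thread queues mcp_backend_t objects onto t->beconn_head_in under
// t->mutex, then signals the beconn eventfd/pipe. The first trip through here
// marks an object as transferred and connects it via _setup_backend(); if the
// same object shows up again it is being handed back for teardown and goes
// through _cleanup_backend() instead.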
250 static void proxy_event_beconn(evutil_socket_t fd, short which, void *arg) {
251     proxy_event_thread_t *t = arg;
252 
253 #ifdef USE_EVENTFD
254     uint64_t u;
255     if (read(fd, &u, sizeof(uint64_t)) != sizeof(uint64_t)) {
256         // Temporary error or wasn't actually ready to read somehow.
257         return;
258     }
259 #else
260     char buf[1];
261     if (read(fd, buf, 1) != 1) {
262         P_DEBUG("%s: pipe read failed\n", __func__);
263         return;
264     }
265 #endif
266 
267     beconn_head_t head;
268 
269     STAILQ_INIT(&head);
270     pthread_mutex_lock(&t->mutex);
271     STAILQ_CONCAT(&head, &t->beconn_head_in);
272     pthread_mutex_unlock(&t->mutex);
273 
274     // Think we should reuse this code path for manually instructing backends
275     // to disable/etc, but we're not coding for that generically yet. We just
276     // need to check the state of the backend (or at least some flags) when it
277     // reaches here.
278     // FIXME: another ->stacked flag?
279     // Either that or remove the STAILQ code and just use an array of
280     // ptrs.
281     mcp_backend_t *be = NULL;
282     // be can be freed by the loop, so can't use STAILQ_FOREACH.
283     while (!STAILQ_EMPTY(&head)) {
284         be = STAILQ_FIRST(&head);
285         STAILQ_REMOVE_HEAD(&head, beconn_next);
286         if (be->transferred) {
287             // If this object was already transferred here, we're being
288             // signalled to clean it up and free.
289             _cleanup_backend(be);
290         } else {
291             be->transferred = true;
292             _setup_backend(be);
293         }
294     }
295 }
296 
297 static void _proxy_flush_backend_queue(mcp_backend_t *be) {
298     io_pending_proxy_t *io = NULL;
299     P_DEBUG("%s: fast failing request to bad backend (%s:%s) depth: %d\n", __func__, be->name, be->port, be->depth);
300 
301     while (!STAILQ_EMPTY(&be->io_head)) {
302         io = STAILQ_FIRST(&be->io_head);
303         STAILQ_REMOVE_HEAD(&be->io_head, io_next);
304         mcp_resp_set_elapsed(io->client_resp);
305         io->client_resp->status = MCMC_ERR;
306         io->client_resp->resp.code = MCMC_CODE_SERVER_ERROR;
307         be->depth--;
308         assert(be->depth > -1);
309         return_io_pending((io_pending_t *)io);
310     }
311 }
312 
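// Sketch of the queue-run flow below: for each backend stacked by the dequeue
// pass, pick a connection with proxy_choose_beconn(). If that connection is
// bad or past backend_depth_limit, fast-fail the backend's queued IOs;
// otherwise splice them onto the connection's io_head and, unless the
// connection is still connecting or validating, flush writes immediately and
// arm the write/timeout events as needed.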
313 void proxy_run_backend_queue(be_head_t *head) {
314     mcp_backend_t *be;
315     STAILQ_FOREACH(be, head, be_next) {
316         be->stacked = false;
317         int flags = 0;
318         struct mcp_backendconn_s *bec = proxy_choose_beconn(be);
319 
320         int limit = be->tunables.backend_depth_limit;
321         if (bec->bad) {
322             // TODO: another counter for fast fails?
323             _proxy_flush_backend_queue(be);
324             continue;
325         } else if (limit && bec->depth > limit) {
326             proxy_ctx_t *ctx = bec->event_thread->ctx;
327             STAT_INCR(ctx, request_failed_depth, be->depth);
328             _proxy_flush_backend_queue(be);
329             continue;
330         }
331 
332         // drop new requests onto end of conn's io-head, reset the backend one.
333         STAILQ_CONCAT(&bec->io_head, &be->io_head);
334         bec->depth += be->depth;
335         be->depth = 0;
336 
337         if (bec->connecting || bec->validating) {
338             P_DEBUG("%s: deferring IO pending connecting (%s:%s)\n", __func__, be->name, be->port);
339         } else {
340             if (!bec->ssl) {
341                 flags = _flush_pending_write(bec);
342             } else {
343                 flags = _flush_pending_tls_write(bec);
344             }
345 
346             if (flags == -1) {
347                 _reset_bad_backend(bec, P_BE_FAIL_WRITING);
348             } else if (flags & EV_WRITE) {
349                 // only get here because we need to kick off the write handler
350                 _start_write_event(bec);
351             }
352 
353             if (bec->pending_read) {
354                 _start_timeout_event(bec);
355             }
356 
357         }
358     }
359 }
360 
361 // event handler for executing backend requests
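// Note on the wakeup path below: with USE_EVENTFD the read returns the
// accumulated 64-bit counter and resets it (the fd is created without
// EFD_SEMAPHORE in proxy_init_event_thread), so many queued notifications
// collapse into a single wakeup. The pipe fallback consumes one byte per
// notification, which is what the TODO below is getting at.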
362 static void proxy_event_handler(evutil_socket_t fd, short which, void *arg) {
363     proxy_event_thread_t *t = arg;
364 
365 #ifdef USE_EVENTFD
366     uint64_t u;
367     if (read(fd, &u, sizeof(uint64_t)) != sizeof(uint64_t)) {
368         // Temporary error or wasn't actually ready to read somehow.
369         return;
370     }
371 #else
372     char buf[1];
373     // TODO (v2): This is a lot more fatal than it should be. can it fail? can
374     // it blow up the server?
375     // TODO (v2): a cross-platform method of speeding this up would be nice. With
376     // event fds we can queue N events and wakeup once here.
377     // If we're pulling one byte out of the pipe at a time here it'll just
378     // wake us up too often.
379     // If the pipe is O_NONBLOCK then maybe just a larger read would work?
380     if (read(fd, buf, 1) != 1) {
381         P_DEBUG("%s: pipe read failed\n", __func__);
382         return;
383     }
384 #endif
385 
386     _proxy_event_handler_dequeue(t);
387 
388     // Re-walk each backend and check set event as required.
389     proxy_run_backend_queue(&t->be_head);
390 }
391 
392 void *proxy_event_thread(void *arg) {
393     proxy_event_thread_t *t = arg;
394 
395     logger_create(); // TODO (v2): add logger ptr to structure
396     event_base_loop(t->base, 0);
397     event_base_free(t->base);
398 
399     // TODO (v2): join bt threads, free array.
400 
401     return NULL;
402 }
403 
404 static void _set_main_event(struct mcp_backendconn_s *be, struct event_base *base, int flags, struct timeval *t, event_callback_fn callback) {
405     int pending = event_pending(&be->main_event, EV_READ|EV_WRITE|EV_TIMEOUT, NULL);
406     if (pending != 0) {
407         event_del(&be->main_event); // replace existing event.
408     }
409 
410     int fd = mcmc_fd(be->client);
411     if (fd == 0) {
412         fd = -1; // need to pass -1 to event_assign() if we're not operating on
413                  // a connection.
414     }
415     event_assign(&be->main_event, base, fd,
416             flags, callback, be);
417     event_add(&be->main_event, t);
418 }
419 
420 static void _stop_main_event(struct mcp_backendconn_s *be) {
421     event_del(&be->main_event);
422 }
423 
424 static void _start_write_event(struct mcp_backendconn_s *be) {
425     int pending = event_pending(&be->write_event, EV_WRITE|EV_TIMEOUT, NULL);
426     if (pending != 0) {
427         return;
428     }
429     // FIXME: wasn't there a write timeout?
430     event_add(&be->write_event, &be->tunables.read);
431 }
432 
433 static void _stop_write_event(struct mcp_backendconn_s *be) {
434     event_del(&be->write_event);
435 }
436 
437 // handle the read timeouts with a side event, so we can stick with a
438 // persistent listener (optimization + catch disconnects faster)
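// Sketch of how this interacts with the main event: the main event stays a
// persistent EV_READ listener, while this fd-less timer is armed with the
// read timeout only while responses are outstanding (be->pending_read != 0)
// and is cancelled as soon as a read event arrives. If it fires, the backend
// handler sees EV_TIMEOUT and resets the connection with P_BE_FAIL_TIMEOUT.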
439 static void _start_timeout_event(struct mcp_backendconn_s *be) {
440     int pending = event_pending(&be->timeout_event, EV_TIMEOUT, NULL);
441     if (pending != 0) {
442         return;
443     }
444     event_add(&be->timeout_event, &be->tunables.read);
445 }
446 
447 static void _stop_timeout_event(struct mcp_backendconn_s *be) {
448     int pending = event_pending(&be->timeout_event, EV_TIMEOUT, NULL);
449     if (pending == 0) {
450         return;
451     }
452     event_del(&be->timeout_event);
453 }
454 
455 static void _drive_machine_next(struct mcp_backendconn_s *be, io_pending_proxy_t *p) {
456     // set the head here. when we break the head will be correct.
457     STAILQ_REMOVE_HEAD(&be->io_head, io_next);
458     be->depth--;
459     assert(p != be->io_next); // don't remove what we need to flush.
460     assert(be->depth > -1);
461     be->pending_read--;
462     assert(be->pending_read > -1);
463 
464     mcp_resp_set_elapsed(p->client_resp);
465     // have to do the q->count-- and == 0 and redispatch_conn()
466     // stuff here. The moment we call return_io here we
467     // don't own *p anymore.
468     return_io_pending((io_pending_t *)p);
469     be->state = mcp_backend_read;
470 }
471 
472 // NOTES:
473 // - mcp_backend_read: grab req_stack_head, do things
474 // read -> next, want_read -> next | read_end, etc.
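// A rough map of the transitions implemented below (state names are the
// mcp_backend_* values used in this function):
//   read -> parse
//   parse -> next (simple/meta responses), read_end (GET hit: expect a
//            trailing "END\r\n"), want_read (partial value: copy and wait for
//            more data), or next_close (protocol error: finish the IO, then
//            drop the connection)
//   read_end / want_read -> next once the trailing marker or value completes
//   next -> parse if more response data is already buffered, otherwise stop
//           and wait for the next read event.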
475 static int proxy_backend_drive_machine(struct mcp_backendconn_s *be) {
476     bool stop = false;
477     io_pending_proxy_t *p = NULL;
478     int flags = 0;
479 
480     p = STAILQ_FIRST(&be->io_head);
481     if (p == NULL) {
482         // got a read event, but nothing was queued.
483         // probably means a disconnect event.
484         // TODO (v2): could probably confirm this by attempting to read the
485         // socket, getsockopt, or something else simply for logging or
486         // statistical purposes.
487         // In this case we know it's going to be a close, so treat it as an error.
488         flags = P_BE_FAIL_CLOSED;
489         P_DEBUG("%s: read event but nothing in IO queue\n", __func__);
490         return flags;
491     }
492 
493     while (!stop) {
494         mcp_resp_t *r;
495 
496     switch(be->state) {
497         case mcp_backend_read:
498             assert(p != NULL);
499             // FIXME: remove the _read state?
500             be->state = mcp_backend_parse;
501             break;
502         case mcp_backend_parse:
503             r = p->client_resp;
504             r->status = mcmc_parse_buf(be->rbuf, be->rbufused, &r->resp);
505 
506             // Quick check if we need more data.
507             if (r->resp.code == MCMC_WANT_READ) {
508                 return 0;
509             }
510 
511             // we actually don't care about anything but the value length
512             // TODO (v2): if vlen != vlen_read, pull an item and copy the data.
513             int extra_space = 0;
514             // if all goes well, move to the next request.
515             be->state = mcp_backend_next;
516             switch (r->resp.type) {
517                 case MCMC_RESP_GET:
518                     // We're in GET mode. We only support one key per
519                     // GET in the proxy backends, so we need to later check
520                     // for an END.
521                     extra_space = ENDLEN;
522                     be->state = mcp_backend_read_end;
523                     break;
524                 case MCMC_RESP_END:
525                     // this is a MISS from a GET request
526                     // or final handler from a STAT request.
527                     assert(r->resp.vlen == 0);
528                     if (p->ascii_multiget) {
529                         // Ascii multiget hack mode; consume END's
530                         be->rbufused -= r->resp.reslen;
531                         if (be->rbufused > 0) {
532                             memmove(be->rbuf, be->rbuf+r->resp.reslen, be->rbufused);
533                         }
534 
535                         be->state = mcp_backend_next;
536                         continue;
537                     }
538                     break;
539                 case MCMC_RESP_META:
540                     // we can handle meta responses easily since they're
541                     // self-contained.
542                     break;
543                 case MCMC_RESP_GENERIC:
544                 case MCMC_RESP_NUMERIC:
545                     break;
546                 case MCMC_RESP_ERRMSG: // received an error message
547                     if (r->resp.code != MCMC_CODE_SERVER_ERROR) {
548                         // Non server errors are protocol errors; can't trust
549                         // the connection anymore.
550                         be->state = mcp_backend_next_close;
551                     }
552                     break;
553                 case MCMC_RESP_FAIL:
554                     P_DEBUG("%s: mcmc_read failed [%d]\n", __func__, r->status);
555                     flags = P_BE_FAIL_PARSING;
556                     stop = true;
557                     break;
558                 // TODO (v2): No-op response?
559                 default:
560                     P_DEBUG("%s: Unhandled response from backend: %d\n", __func__, r->resp.type);
561                     // unhandled :(
562                     flags = P_BE_FAIL_UNHANDLEDRES;
563                     stop = true;
564                     break;
565             }
566 
567             // r->resp.reslen + r->resp.vlen is the total length of the response.
568             // TODO (v2): need to associate a buffer with this response...
569             // for now we simply malloc, but reusable buffers should be used
570 
571             r->blen = r->resp.reslen + r->resp.vlen;
572             {
573                 bool oom = proxy_bufmem_checkadd(r->thread, r->blen + extra_space);
574 
575                 if (oom) {
576                     flags = P_BE_FAIL_OOM;
577                     // need to zero out blen so we don't over-decrement later
578                     r->blen = 0;
579                     stop = true;
580                     break;
581                 }
582             }
583             r->buf = malloc(r->blen + extra_space);
584             if (r->buf == NULL) {
585                 // Enforce accounting.
586                 pthread_mutex_lock(&r->thread->proxy_limit_lock);
587                 r->thread->proxy_buffer_memory_used -= r->blen + extra_space;
588                 pthread_mutex_unlock(&r->thread->proxy_limit_lock);
589 
590                 flags = P_BE_FAIL_OOM;
591                 r->blen = 0;
592                 stop = true;
593                 break;
594             }
595 
596             P_DEBUG("%s: r->status: %d, r->bread: %d, r->vlen: %lu\n", __func__, r->status, r->bread, r->resp.vlen);
597             if (r->resp.vlen != r->resp.vlen_read) {
598                 // shouldn't be possible to have excess in buffer
599                 // if we're dealing with a partial value.
600                 assert(be->rbufused == r->resp.reslen+r->resp.vlen_read);
601                 P_DEBUG("%s: got a short read, moving to want_read\n", __func__);
602                 // copy the partial and advance mcmc's buffer digestion.
603                 memcpy(r->buf, be->rbuf, r->resp.reslen + r->resp.vlen_read);
604                 r->bread = r->resp.reslen + r->resp.vlen_read;
605                 be->rbufused = 0;
606                 be->state = mcp_backend_want_read;
607                 flags = 0;
608                 stop = true;
609                 break;
610             } else {
611                 // mcmc's already counted the value as read if it fit in
612                 // the original buffer...
613                 memcpy(r->buf, be->rbuf, r->resp.reslen+r->resp.vlen_read);
614             }
615 
616             // had a response, advance the buffer.
617             be->rbufused -= r->resp.reslen + r->resp.vlen_read;
618             if (be->rbufused > 0) {
619                 memmove(be->rbuf, be->rbuf+r->resp.reslen+r->resp.vlen_read, be->rbufused);
620             }
621 
622             break;
623         case mcp_backend_read_end:
624             r = p->client_resp;
625             // we need to ensure the next data in the stream is "END\r\n"
626             // if not, the stack is desynced and we lose it.
627 
628             if (be->rbufused >= ENDLEN) {
629                 if (memcmp(be->rbuf, ENDSTR, ENDLEN) != 0) {
630                     flags = P_BE_FAIL_ENDSYNC;
631                     stop = true;
632                     break;
633                 } else {
634                     // response is good.
635                     // FIXME (v2): copy what the server actually sent?
636                     if (!p->ascii_multiget) {
637                         // sigh... if part of a multiget we need to eat the END
638                         // markers down here.
639                         memcpy(r->buf+r->blen, ENDSTR, ENDLEN);
640                         r->blen += 5;
641                     } else {
642                         r->extra = 5;
643                     }
644 
645                     // advance buffer
646                     be->rbufused -= ENDLEN;
647                     if (be->rbufused > 0) {
648                         memmove(be->rbuf, be->rbuf+ENDLEN, be->rbufused);
649                     }
650                 }
651             } else {
652                 flags = 0;
653                 stop = true;
654                 break;
655             }
656 
657             be->state = mcp_backend_next;
658 
659             break;
660         case mcp_backend_want_read:
661             // Continuing a read from earlier
662             r = p->client_resp;
663             // take bread input and see if we're done reading the value,
664             // else advance, set buffers, return next.
665             P_DEBUG("%s: [want_read] r->bread: %d vlen: %lu\n", __func__, r->bread, r->resp.vlen);
666             assert(be->rbufused != 0);
667             size_t tocopy = be->rbufused < r->blen - r->bread ?
668                 be->rbufused : r->blen - r->bread;
669             memcpy(r->buf+r->bread, be->rbuf, tocopy);
670             r->bread += tocopy;
671 
672             if (r->bread >= r->blen) {
673                 // all done copying data.
674                 if (r->resp.type == MCMC_RESP_GET) {
675                     be->state = mcp_backend_read_end;
676                 } else {
677                     be->state = mcp_backend_next;
678                 }
679 
680                 // shuffle remaining buffer.
681                 be->rbufused -= tocopy;
682                 if (be->rbufused > 0) {
683                     memmove(be->rbuf, be->rbuf+tocopy, be->rbufused);
684                 }
685             } else {
686                 assert(tocopy == be->rbufused);
687                 // signal to caller to issue a read.
688                 be->rbufused = 0;
689                 flags = 0;
690                 stop = true;
691             }
692 
693             break;
694         case mcp_backend_next:
695             _drive_machine_next(be, p);
696 
697             if (STAILQ_EMPTY(&be->io_head)) {
698                 stop = true;
699                 // if there are no pending requests, the read buffer
700                 // should also be empty.
701                 if (be->rbufused > 0) {
702                     flags = P_BE_FAIL_TRAILINGDATA;
703                 }
704                 break;
705             } else {
706                 p = STAILQ_FIRST(&be->io_head);
707             }
708 
709             // if leftover, keep processing IO's.
710             // if no more data in buffer, need to re-set stack head and re-set
711             // event.
712             P_DEBUG("%s: [next] remain: %lu\n", __func__, be->rbufused);
713             if (be->rbufused != 0) {
714                 // data trailing in the buffer, for a different request.
715                 be->state = mcp_backend_parse;
716             } else {
717                 // need to read more data, buffer is empty.
718                 stop = true;
719             }
720 
721             break;
722         case mcp_backend_next_close:
723             // we advance and return the current IO, then kill the conn.
724             _drive_machine_next(be, p);
725             stop = true;
726             flags = P_BE_FAIL_INVALIDPROTOCOL;
727 
728             break;
729         default:
730             // TODO (v2): at some point (after v1?) this should attempt to recover,
731             // though we should only get here from memory corruption and
732             // bailing may be the right thing to do.
733             fprintf(stderr, "%s: invalid backend state: %d\n", __func__, be->state);
734             assert(false);
735     } // switch
736     } // while
737 
738     return flags;
739 }
740 
741 static void _backend_reconnect(struct mcp_backendconn_s *be) {
742     int status = mcmc_connect(be->client, be->be_parent->name, be->be_parent->port, be->connect_flags);
743     if (status == MCMC_CONNECTED) {
744         // TODO (v2): unexpected, but let's let it be here.
745         be->connecting = false;
746         be->can_write = true;
747     } else if (status == MCMC_CONNECTING) {
748         be->connecting = true;
749         be->can_write = false;
750     } else {
751         // failed to immediately re-establish the connection.
752         // need to put the BE into a bad/retry state.
753         be->connecting = false;
754         be->can_write = true;
755     }
756     // re-create the write handler for the new file descriptor.
757     // the main event will be re-assigned after this call.
758     event_callback_fn _backend_handler = &proxy_backend_handler;
759     if (be->be_parent->tunables.use_tls) {
760         _backend_handler = &proxy_backend_tls_handler;
761     }
762     event_assign(&be->write_event, be->event_thread->base, mcmc_fd(be->client), EV_WRITE|EV_TIMEOUT, _backend_handler, be);
763     // do not need to re-assign the timer event because it's not tied to an fd
764 }
765 
766 // All we need to do here is schedule the backend to attempt to connect again.
767 static void proxy_backend_retry_handler(const int fd, const short which, void *arg) {
768     struct mcp_backendconn_s *be = arg;
769     assert(which & EV_TIMEOUT);
770     struct timeval tmp_time = be->tunables.connect;
771     _backend_reconnect(be);
772     event_callback_fn _backend_handler = &proxy_beconn_handler;
773     if (be->be_parent->tunables.use_tls) {
774         _backend_handler = &proxy_beconn_tls_handler;
775     }
776     _set_main_event(be, be->event_thread->base, EV_WRITE, &tmp_time, _backend_handler);
777 }
778 
779 // must be called after _reset_bad_backend(), so the backend is currently
780 // clear.
781 // TODO (v2): extra counter for "backend connect tries" so it's still possible
782 // to see that dead backends exist
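// Illustrative backoff math for the flap path below (values assumed, not
// defaults): with retry.tv_sec = 2, flap_backoff_ramp = 1.5 and
// flap_count = 4, the delay becomes 2 * 1.5^4 ~= 10 seconds, truncated to an
// integer and then clamped to flap_backoff_max.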
783 static void _backend_reschedule(struct mcp_backendconn_s *be) {
784     bool failed = false;
785     struct timeval tmp_time = {0};
786     long int retry_time = be->tunables.retry.tv_sec;
787     char *badtext = "markedbad";
788     if (be->flap_count > be->tunables.backend_failure_limit) {
789         // reduce retry frequency to avoid noise.
790         float backoff = retry_time;
791         for (int x = 0; x < be->flap_count; x++) {
792             backoff *= be->tunables.flap_backoff_ramp;
793         }
794         retry_time = (uint32_t)backoff;
795 
796         if (retry_time > be->tunables.flap_backoff_max) {
797             retry_time = be->tunables.flap_backoff_max;
798         }
799         badtext = "markedbadflap";
800         failed = true;
801     } else if (be->failed_count > be->tunables.backend_failure_limit) {
802         failed = true;
803     }
804     tmp_time.tv_sec = retry_time;
805 
806     if (failed) {
807         if (!be->bad) {
808             P_DEBUG("%s: marking backend as bad\n", __func__);
809             STAT_INCR(be->event_thread->ctx, backend_marked_bad, 1);
810             mcp_sharedvm_delta(be->event_thread->ctx, SHAREDVM_BACKEND_IDX,
811                     be->be_parent->label, 1);
812             LOGGER_LOG(NULL, LOG_PROXYEVENTS, LOGGER_PROXY_BE_ERROR, NULL, badtext, be->be_parent->name, be->be_parent->port, be->be_parent->label, 0, NULL, 0, retry_time);
813         }
814         be->bad = true;
815        _set_main_event(be, be->event_thread->base, EV_TIMEOUT, &tmp_time, proxy_backend_retry_handler);
816     } else {
817         struct timeval tmp_time = be->tunables.connect;
818         STAT_INCR(be->event_thread->ctx, backend_failed, 1);
819         _backend_reconnect(be);
820         event_callback_fn _backend_handler = &proxy_beconn_handler;
821         if (be->be_parent->tunables.use_tls) {
822             _backend_handler = &proxy_beconn_tls_handler;
823         }
824         _set_main_event(be, be->event_thread->base, EV_WRITE, &tmp_time, _backend_handler);
825     }
826 }
827 
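// Worked example of the borrow handling below (times assumed): if now is
// {10s, 100000us} and last_failed is {9s, 900000us}, the raw usec delta is
// -800000, so a second is borrowed and delta ends up as {0s, 200000us},
// which is then compared against the tunables.flap window.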
828 static void _backend_flap_check(struct mcp_backendconn_s *be, enum proxy_be_failures err) {
829     struct timeval now;
830     struct timeval *flap = &be->tunables.flap;
831 
832     switch (err) {
833         case P_BE_FAIL_TIMEOUT:
834         case P_BE_FAIL_DISCONNECTED:
835         case P_BE_FAIL_WRITING:
836         case P_BE_FAIL_READING:
837             if (flap->tv_sec != 0 || flap->tv_usec != 0) {
838                 struct timeval delta = {0};
839                 int64_t subsec = 0;
840                 gettimeofday(&now, NULL);
841                 delta.tv_sec = now.tv_sec - be->last_failed.tv_sec;
842                 subsec = now.tv_usec - be->last_failed.tv_usec;
843                 if (subsec < 0) {
844                     // tv_usec is only specced to hold "at least" [-1, 1000000],
845                     // so to safely hold larger negative values we need this temp var.
846                     delta.tv_sec--;
847                     subsec += 1000000;
848                     delta.tv_usec = subsec;
849                 }
850 
851                 if (flap->tv_sec < delta.tv_sec ||
852                     (flap->tv_sec == delta.tv_sec && flap->tv_usec < delta.tv_usec)) {
853                     // delta is larger than our flap range. reset the flap counter.
854                     be->flap_count = 0;
855                 } else {
856                     // seems like we flapped again.
857                     be->flap_count++;
858                 }
859                 be->last_failed = now;
860             }
861             break;
862         default:
863             // only perform a flap check on network related errors.
864             break;
865     }
866 }
867 
868 // TODO (v2): add a second argument for assigning a specific error to all pending
869 // IO's (ie; timeout).
870 // The backend has gotten into a bad state (timed out, protocol desync, or
871 // some other supposedly unrecoverable error): purge the queue and
872 // cycle the socket.
873 // Note that some types of errors may not require flushing the queue and
874 // should be fixed as they're figured out.
875 // _must_ be called from within the event thread.
876 static void _reset_bad_backend(struct mcp_backendconn_s *be, enum proxy_be_failures err) {
877     io_pending_proxy_t *io = NULL;
878     P_DEBUG("%s: resetting bad backend: [fd: %d] %s\n", __func__, mcmc_fd(be->client), proxy_be_failure_text[err]);
879     // Can't use STAILQ_FOREACH() since r_io_p() frees the current
880     // io. STAILQ_FOREACH_SAFE maybe?
881     int depth = be->depth;
882     while (!STAILQ_EMPTY(&be->io_head)) {
883         io = STAILQ_FIRST(&be->io_head);
884         STAILQ_REMOVE_HEAD(&be->io_head, io_next);
885         // TODO (v2): Unsure if this is the best way of surfacing errors to lua,
886         // but will do for V1.
887         mcp_resp_set_elapsed(io->client_resp);
888         io->client_resp->status = MCMC_ERR;
889         io->client_resp->resp.code = MCMC_CODE_SERVER_ERROR;
890         be->depth--;
891         assert(be->depth > -1);
892         return_io_pending((io_pending_t *)io);
893     }
894 
895     STAILQ_INIT(&be->io_head);
896     be->io_next = NULL; // also reset the write offset.
897 
898     // Only log if we don't already know it's messed up.
899     if (!be->bad) {
900         LOGGER_LOG(NULL, LOG_PROXYEVENTS, LOGGER_PROXY_BE_ERROR, NULL, proxy_be_failure_text[err], be->be_parent->name, be->be_parent->port, be->be_parent->label, depth, be->rbuf, be->rbufused, 0);
901     }
902 
903     // reset buffer to blank state.
904     be->rbufused = 0;
905     be->pending_read = 0;
906     // clear events so the reconnect handler can re-arm them with a new fd.
907     _stop_write_event(be);
908     _stop_main_event(be);
909     _stop_timeout_event(be);
910     mcp_tls_shutdown(be);
911     mcmc_disconnect(be->client);
912     // we leave the main event alone, because be_failed() always overwrites.
913 
914     // check failure counters and schedule a retry.
915     be->failed_count++;
916     _backend_flap_check(be, err);
917     _backend_reschedule(be);
918 }
919 
920 static int _prep_pending_write(struct mcp_backendconn_s *be) {
921     struct iovec *iovs = be->write_iovs;
922     io_pending_proxy_t *io = NULL;
923     int iovused = 0;
924     if (be->io_next == NULL) {
925         // separate pointer for how far into the list we've flushed.
926         io = STAILQ_FIRST(&be->io_head);
927     } else {
928         io = be->io_next;
929     }
930     assert(io != NULL);
931     for (; io; io = STAILQ_NEXT(io, io_next)) {
932         // TODO (v2): paranoia for now, but this check should never fire
933         if (io->flushed)
934             continue;
935 
936         if (io->iovcnt + iovused > BE_IOV_MAX) {
937             // We will need to keep writing later.
938             break;
939         }
940 
941         memcpy(&iovs[iovused], io->iov, sizeof(struct iovec)*io->iovcnt);
942         iovused += io->iovcnt;
943     }
944     return iovused;
945 }
946 
947 // Accounts for 'sent' bytes of a write: marks fully written IOs as flushed and points be->io_next at any partially written IO.
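// Worked example (sizes assumed): an IO with two iovecs of lengths {10, 20}
// and sent == 15 has its first iovec zeroed and its second advanced by 5
// bytes (15 left); it stays unflushed and becomes be->io_next so the next
// flush resumes from it.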
948 static void _post_pending_write(struct mcp_backendconn_s *be, ssize_t sent) {
949     io_pending_proxy_t *io = be->io_next;
950     if (io == NULL) {
951         io = STAILQ_FIRST(&be->io_head);
952     }
953 
954     for (; io; io = STAILQ_NEXT(io, io_next)) {
955         bool flushed = true;
956         if (io->flushed)
957             continue;
958         if (sent >= io->iovbytes) {
959             // short circuit for common case.
960             sent -= io->iovbytes;
961         } else {
962             io->iovbytes -= sent;
963             for (int x = 0; x < io->iovcnt; x++) {
964                 struct iovec *iov = &io->iov[x];
965                 if (sent >= iov->iov_len) {
966                     sent -= iov->iov_len;
967                     iov->iov_len = 0;
968                 } else {
969                     iov->iov_len -= sent;
970                     iov->iov_base = (char *)iov->iov_base + sent;
971                     sent = 0;
972                     flushed = false;
973                     break;
974                 }
975             }
976         }
977         io->flushed = flushed;
978         if (flushed) {
979             be->pending_read++;
980         }
981 
982         if (sent <= 0) {
983             // really shouldn't be negative, though.
984             assert(sent >= 0);
985             break;
986         }
987     } // for
988 
989     // resume the flush from this point.
990     if (io != NULL) {
991         if (!io->flushed) {
992             be->io_next = io;
993         } else {
994             // Check for incomplete list because we hit the iovcnt limit.
995             io_pending_proxy_t *nio = STAILQ_NEXT(io, io_next);
996             if (nio != NULL && !nio->flushed) {
997                 be->io_next = nio;
998             } else {
999                 be->io_next = NULL;
1000             }
1001         }
1002     } else {
1003         be->io_next = NULL;
1004     }
1005 }
1006 
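// Return contract shared by the flush helpers below: 0 means everything
// queued was written (or the queue was empty), EV_WRITE means the socket
// blocked or IOs remain unflushed so the caller should arm the write event,
// and -1 means a hard write error and the caller should reset the backend.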
1007 static int _flush_pending_write(struct mcp_backendconn_s *be) {
1008     int flags = 0;
1009     // Allow us to be called with an empty stack to prevent dev errors.
1010     if (STAILQ_EMPTY(&be->io_head)) {
1011         return 0;
1012     }
1013 
1014     int iovcnt = _prep_pending_write(be);
1015 
1016     ssize_t sent = writev(mcmc_fd(be->client), be->write_iovs, iovcnt);
1017     if (sent > 0) {
1018         _post_pending_write(be, sent);
1019         // still have unflushed pending IO's, check for write and re-loop.
1020         if (be->io_next) {
1021             be->can_write = false;
1022             flags |= EV_WRITE;
1023         }
1024     } else if (sent == -1) {
1025         if (errno == EAGAIN || errno == EWOULDBLOCK) {
1026             be->can_write = false;
1027             flags |= EV_WRITE;
1028         } else {
1029             flags = -1;
1030         }
1031     }
1032 
1033     return flags;
1034 }
1035 
1036 static int _flush_pending_tls_write(struct mcp_backendconn_s *be) {
1037     int flags = 0;
1038     // Allow us to be called with an empty stack to prevent dev errors.
1039     if (STAILQ_EMPTY(&be->io_head)) {
1040         return 0;
1041     }
1042 
1043     int iovcnt = _prep_pending_write(be);
1044 
1045     int sent = mcp_tls_writev(be, iovcnt);
1046     if (sent > 0) {
1047         _post_pending_write(be, sent);
1048         // FIXME: can _post_pending_write do this and return EV_WRITE?
1049         // still have unflushed pending IO's, check for write and re-loop.
1050         if (be->io_next) {
1051             be->can_write = false;
1052             flags |= EV_WRITE;
1053         }
1054     } else if (sent == MCP_TLS_NEEDIO) {
1055         // want io
1056         be->can_write = false;
1057         flags |= EV_WRITE;
1058     } else if (sent == MCP_TLS_ERR) {
1059         // hard error from tls
1060         flags = -1;
1061     }
1062 
1063     return flags;
1064 }
1065 
1066 
1067 static void proxy_bevalidate_tls_handler(const int fd, const short which, void *arg) {
1068     assert(arg != NULL);
1069     struct mcp_backendconn_s *be = arg;
1071     struct timeval tmp_time = be->tunables.read;
1072 
1073     if (which & EV_TIMEOUT) {
1074         P_DEBUG("%s: backend timed out while connecting [fd: %d]\n", __func__, mcmc_fd(be->client));
1075         if (be->connecting) {
1076             _reset_bad_backend(be, P_BE_FAIL_CONNTIMEOUT);
1077         } else {
1078             _reset_bad_backend(be, P_BE_FAIL_READVALIDATE);
1079         }
1080         return;
1081     }
1082 
1083     if (which & EV_READ) {
1084         int read = mcp_tls_read(be);
1085 
1086         if (read > 0) {
1087             mcmc_resp_t r;
1088 
1089             int status = mcmc_parse_buf(be->rbuf, be->rbufused, &r);
1090             if (status == MCMC_ERR) {
1091                 // Needed more data for a version line, somehow. I feel like
1092                 // this should set off some alarms, but it is possible.
1093                 if (r.code == MCMC_WANT_READ) {
1094                     _set_main_event(be, be->event_thread->base, EV_READ, &tmp_time, proxy_bevalidate_tls_handler);
1095                     return;
1096                 }
1097 
1098                 _reset_bad_backend(be, P_BE_FAIL_READVALIDATE);
1099                 return;
1100             }
1101 
1102             if (r.code != MCMC_CODE_VERSION) {
1103                 _reset_bad_backend(be, P_BE_FAIL_BADVALIDATE);
1104                 return;
1105             }
1106 
1107             be->validating = false;
1108             be->rbufused = 0;
1109         } else if (read == 0) {
1110             // not connected or error.
1111             _reset_bad_backend(be, P_BE_FAIL_DISCONNECTED);
1112             return;
1113         } else if (read == MCP_TLS_NEEDIO) {
1114             // try again failure.
1115             _set_main_event(be, be->event_thread->base, EV_READ, &tmp_time, proxy_bevalidate_tls_handler);
1116             return;
1117         } else if (read == MCP_TLS_ERR) {
1118             // hard failure.
1119             _reset_bad_backend(be, P_BE_FAIL_READING);
1120             return;
1121         }
1122 
1123         // Passed validation, don't need to re-read, flush any pending writes.
1124         int res = _flush_pending_tls_write(be);
1125         if (res == -1) {
1126             _reset_bad_backend(be, P_BE_FAIL_WRITING);
1127             return;
1128         }
1129         if (res & EV_WRITE) {
1130             _start_write_event(be);
1131         }
1132         if (be->pending_read) {
1133             _start_timeout_event(be);
1134         }
1135     }
1136 
1137     // switch to the primary persistent read event.
1138     if (!be->validating) {
1139         _set_main_event(be, be->event_thread->base, EV_READ|EV_PERSIST, NULL, proxy_backend_tls_handler);
1140 
1141         // we're happily validated and switching to normal processing, so
1142         // _now_ the backend is no longer "bad".
1143         // If we reset the failed count earlier we then can fail the
1144         // validation loop indefinitely without ever being marked bad.
1145         if (be->bad) {
1146             // was bad, need to mark as no longer bad in shared space.
1147             mcp_sharedvm_delta(be->event_thread->ctx, SHAREDVM_BACKEND_IDX,
1148                     be->be_parent->label, -1);
1149         }
1150         be->bad = false;
1151         be->failed_count = 0;
1152     }
1153 }
1154 
1155 // Libevent handler when we're in TLS mode. Unfortunately the code is
1156 // different enough to warrant its own function.
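// Sketch of the TLS connect path wired up below: EV_WRITE fires once the TCP
// connect completes, _proxy_beconn_checkconnect() verifies it, and
// mcp_tls_connect() sets up the TLS object. mcp_tls_handshake() is then
// retried through this handler until it completes, after which
// mcp_tls_send_validate() sends the validation probe (presumably the same
// "version\r\n" used by _beconn_send_validate()) and
// proxy_bevalidate_tls_handler above reads the reply before switching to the
// persistent proxy_backend_tls_handler.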
1157 static void proxy_beconn_tls_handler(const int fd, const short which, void *arg) {
1158     assert(arg != NULL);
1159     struct mcp_backendconn_s *be = arg;
1160     //int flags = EV_TIMEOUT;
1161     struct timeval tmp_time = be->tunables.read;
1162 
1163     if (which & EV_TIMEOUT) {
1164         P_DEBUG("%s: backend timed out while connecting [fd: %d]\n", __func__, mcmc_fd(be->client));
1165         if (be->connecting) {
1166             _reset_bad_backend(be, P_BE_FAIL_CONNTIMEOUT);
1167         } else {
1168             _reset_bad_backend(be, P_BE_FAIL_READVALIDATE);
1169         }
1170         return;
1171     }
1172 
1173     if (which & EV_WRITE) {
1174         be->can_write = true;
1175 
1176         if (be->connecting) {
1177             if (_proxy_beconn_checkconnect(be) == -1) {
1178                 return;
1179             }
1180             // TODO: check return code.
1181             mcp_tls_connect(be);
1182             // fall through to handshake attempt.
1183         }
1184     }
1185 
1186     assert(be->validating);
1187     int ret = mcp_tls_handshake(be);
1188     if (ret == MCP_TLS_NEEDIO) {
1189         // Need to try again.
1190         _set_main_event(be, be->event_thread->base, EV_READ, &tmp_time, proxy_beconn_tls_handler);
1191         return;
1192     } else if (ret == 1) {
1193         // handshake complete.
1194         if (mcp_tls_send_validate(be) != MCP_TLS_OK) {
1195             _reset_bad_backend(be, P_BE_FAIL_BADVALIDATE);
1196             return;
1197         }
1198 
1199         // switch to another handler for the final stage.
1200         _set_main_event(be, be->event_thread->base, EV_READ, &tmp_time, proxy_bevalidate_tls_handler);
1201     } else if (ret < 0) {
1202         // FIXME: FAIL_HANDSHAKE
1203         _reset_bad_backend(be, P_BE_FAIL_BADVALIDATE);
1204         return;
1205     }
1206 }
1207 
1208 // Libevent handler for backends in a connecting state.
1209 static void proxy_beconn_handler(const int fd, const short which, void *arg) {
1210     assert(arg != NULL);
1211     struct mcp_backendconn_s *be = arg;
1212     int flags = EV_TIMEOUT;
1213     struct timeval tmp_time = be->tunables.read;
1214 
1215     if (which & EV_TIMEOUT) {
1216         P_DEBUG("%s: backend timed out while connecting [fd: %d]\n", __func__, mcmc_fd(be->client));
1217         if (be->connecting) {
1218             _reset_bad_backend(be, P_BE_FAIL_CONNTIMEOUT);
1219         } else {
1220             _reset_bad_backend(be, P_BE_FAIL_READVALIDATE);
1221         }
1222         return;
1223     }
1224 
1225     if (which & EV_WRITE) {
1226         be->can_write = true;
1227 
1228         if (be->connecting) {
1229             if (_proxy_beconn_checkconnect(be) == -1) {
1230                 return;
1231             }
1232             if (_beconn_send_validate(be) == -1) {
1233                 _reset_bad_backend(be, P_BE_FAIL_BADVALIDATE);
1234                 return;
1235             }
1236             _set_main_event(be, be->event_thread->base, EV_READ, &tmp_time, proxy_beconn_handler);
1237         }
1238 
1239         // TODO: currently never taken, until validation is made optional.
1240         if (!be->validating) {
1241             int res = _flush_pending_write(be);
1242             if (res == -1) {
1243                 _reset_bad_backend(be, P_BE_FAIL_WRITING);
1244                 return;
1245             }
1246             flags |= res;
1247             // FIXME: set write event?
1248         }
1249     }
1250 
1251     if (which & EV_READ) {
1252         assert(be->validating);
1253 
1254         int read = recv(mcmc_fd(be->client), be->rbuf + be->rbufused, READ_BUFFER_SIZE - be->rbufused, 0);
1255         if (read > 0) {
1256             mcmc_resp_t r;
1257             be->rbufused += read;
1258 
1259             int status = mcmc_parse_buf(be->rbuf, be->rbufused, &r);
1260             if (status == MCMC_ERR) {
1261                 // Needed more data for a version line, somehow. I feel like
1262                 // this should set off some alarms, but it is possible.
1263                 if (r.code == MCMC_WANT_READ) {
1264                     _set_main_event(be, be->event_thread->base, EV_READ, &tmp_time, proxy_beconn_handler);
1265                     return;
1266                 }
1267 
1268                 _reset_bad_backend(be, P_BE_FAIL_READVALIDATE);
1269                 return;
1270             }
1271 
1272             if (r.code != MCMC_CODE_VERSION) {
1273                 _reset_bad_backend(be, P_BE_FAIL_BADVALIDATE);
1274                 return;
1275             }
1276 
1277             be->validating = false;
1278             be->rbufused = 0;
1279         } else if (read == 0) {
1280             // not connected or error.
1281             _reset_bad_backend(be, P_BE_FAIL_DISCONNECTED);
1282             return;
1283         } else if (read == -1) {
1284             // sit on epoll again.
1285             if (errno != EAGAIN && errno != EWOULDBLOCK) {
1286                 _reset_bad_backend(be, P_BE_FAIL_READING);
1287                 return;
1288             }
1289             _set_main_event(be, be->event_thread->base, EV_READ, &tmp_time, proxy_beconn_handler);
1290             return;
1291         }
1292 
1293         // Passed validation, don't need to re-read, flush any pending writes.
1294         int res = _flush_pending_write(be);
1295         if (res == -1) {
1296             _reset_bad_backend(be, P_BE_FAIL_WRITING);
1297             return;
1298         }
1299         if (res & EV_WRITE) {
1300             _start_write_event(be);
1301         }
1302         if (be->pending_read) {
1303             _start_timeout_event(be);
1304         }
1305     }
1306 
1307     // switch to the primary persistent read event.
1308     if (!be->validating) {
1309         _set_main_event(be, be->event_thread->base, EV_READ|EV_PERSIST, NULL, proxy_backend_handler);
1310 
1311         // we're happily validated and switching to normal processing, so
1312         // _now_ the backend is no longer "bad".
1313         // If we reset the failed count earlier we then can fail the
1314         // validation loop indefinitely without ever being marked bad.
1315         if (be->bad) {
1316             // was bad, need to mark as no longer bad in shared space.
1317             mcp_sharedvm_delta(be->event_thread->ctx, SHAREDVM_BACKEND_IDX,
1318                     be->be_parent->label, -1);
1319         }
1320         be->bad = false;
1321         be->failed_count = 0;
1322     }
1323 }
1324 
1325 static void proxy_backend_tls_handler(const int fd, const short which, void *arg) {
1326     struct mcp_backendconn_s *be = arg;
1327 
1328     if (which & EV_TIMEOUT) {
1329         P_DEBUG("%s: timeout received, killing backend queue\n", __func__);
1330         _reset_bad_backend(be, P_BE_FAIL_TIMEOUT);
1331         return;
1332     }
1333 
1334     if (which & EV_WRITE) {
1335         be->can_write = true;
1336         int res = _flush_pending_tls_write(be);
1337         if (res == -1) {
1338             _reset_bad_backend(be, P_BE_FAIL_WRITING);
1339             return;
1340         }
1341         if (res & EV_WRITE) {
1342             _start_write_event(be);
1343         }
1344     }
1345 
1346     if (which & EV_READ) {
1347         // got a read event, always kill the pending read timer.
1348         _stop_timeout_event(be);
1349         // We do the syscall here before diving into the state machine to allow a
1350         // common code path for io_uring/epoll/tls/etc
1351         int read = mcp_tls_read(be);
1352         if (read > 0) {
1353             int res = proxy_backend_drive_machine(be);
1354             if (res != 0) {
1355                 _reset_bad_backend(be, res);
1356                 return;
1357             }
1358         } else if (read == 0) {
1359             // not connected or error.
1360             _reset_bad_backend(be, P_BE_FAIL_DISCONNECTED);
1361             return;
1362         } else if (read == MCP_TLS_NEEDIO) {
1363             // sit on epoll again.
1364             return;
1365         } else if (read == MCP_TLS_ERR) {
1366             _reset_bad_backend(be, P_BE_FAIL_READING);
1367             return;
1368         }
1369 
1370 #ifdef PROXY_DEBUG
1371         if (!STAILQ_EMPTY(&be->io_head)) {
1372             P_DEBUG("backend has leftover IOs: %d\n", be->depth);
1373         }
1374 #endif
1375     }
1376 
1377     if (be->pending_read) {
1378         _start_timeout_event(be);
1379     }
1380 }
1381 
1382 // The libevent backend callback handler.
1383 // If we end up resetting a backend, it will get put back into a connecting
1384 // state.
1385 static void proxy_backend_handler(const int fd, const short which, void *arg) {
1386     struct mcp_backendconn_s *be = arg;
1387 
1388     if (which & EV_TIMEOUT) {
1389         P_DEBUG("%s: timeout received, killing backend queue\n", __func__);
1390         _reset_bad_backend(be, P_BE_FAIL_TIMEOUT);
1391         return;
1392     }
1393 
1394     if (which & EV_WRITE) {
1395         be->can_write = true;
1396         int res = _flush_pending_write(be);
1397         if (res == -1) {
1398             _reset_bad_backend(be, P_BE_FAIL_WRITING);
1399             return;
1400         }
1401         if (res & EV_WRITE) {
1402             _start_write_event(be);
1403         }
1404     }
1405 
1406     if (which & EV_READ) {
1407         // got a read event, always kill the pending read timer.
1408         _stop_timeout_event(be);
1409         // We do the syscall here before diving into the state machine to allow a
1410         // common code path for io_uring/epoll
1411         int read = recv(mcmc_fd(be->client), be->rbuf + be->rbufused,
1412                     READ_BUFFER_SIZE - be->rbufused, 0);
1413         if (read > 0) {
1414             be->rbufused += read;
1415             int res = proxy_backend_drive_machine(be);
1416             if (res != 0) {
1417                 _reset_bad_backend(be, res);
1418                 return;
1419             }
1420         } else if (read == 0) {
1421             // not connected or error.
1422             _reset_bad_backend(be, P_BE_FAIL_DISCONNECTED);
1423             return;
1424         } else if (read == -1) {
1425             // sit on epoll again.
1426             if (errno != EAGAIN && errno != EWOULDBLOCK) {
1427                 _reset_bad_backend(be, P_BE_FAIL_READING);
1428                 return;
1429             }
1430         }
1431 
1432 #ifdef PROXY_DEBUG
1433         if (!STAILQ_EMPTY(&be->io_head)) {
1434             P_DEBUG("backend has leftover IOs: %d\n", be->depth);
1435         }
1436 #endif
1437     }
1438 
1439     if (be->pending_read) {
1440         _start_timeout_event(be);
1441     }
1442 }
1443 
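// Sketch of the producer side of the notify fds created below (the actual
// producers live outside this file): a thread appends work to t->io_head_in
// or t->beconn_head_in under t->mutex, then writes a uint64_t to the matching
// eventfd (or a single byte to the notify pipe) so that proxy_event_handler /
// proxy_event_beconn above wake up and drain the queue.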
1444 void proxy_init_event_thread(proxy_event_thread_t *t, proxy_ctx_t *ctx, struct event_base *base) {
1445     t->ctx = ctx;
1446 #ifdef USE_EVENTFD
1447     t->event_fd = eventfd(0, EFD_NONBLOCK);
1448     if (t->event_fd == -1) {
1449         perror("failed to create backend notify eventfd");
1450         exit(1);
1451     }
1452     t->be_event_fd = eventfd(0, EFD_NONBLOCK);
1453     if (t->be_event_fd == -1) {
1454         perror("failed to create backend notify eventfd");
1455         exit(1);
1456     }
1457 #else
1458     int fds[2];
1459     if (pipe(fds)) {
1460         perror("can't create proxy backend notify pipe");
1461         exit(1);
1462     }
1463 
1464     t->notify_receive_fd = fds[0];
1465     t->notify_send_fd = fds[1];
1466 
1467     if (pipe(fds)) {
1468         perror("can't create proxy backend connection notify pipe");
1469         exit(1);
1470     }
1471     t->be_notify_receive_fd = fds[0];
1472     t->be_notify_send_fd = fds[1];
1473 #endif
1474 
1475     // incoming request queue.
1476     STAILQ_INIT(&t->io_head_in);
1477     STAILQ_INIT(&t->beconn_head_in);
1478     pthread_mutex_init(&t->mutex, NULL);
1479     pthread_cond_init(&t->cond, NULL);
1480 
1481     // initialize the event system.
1482 
1483 #ifdef HAVE_LIBURING
1484     if (t->ctx->use_uring) {
1485         fprintf(stderr, "Sorry, io_uring not supported right now\n");
1486         abort();
1487     }
1488 #endif
1489 
1490     if (base == NULL) {
1491         struct event_config *ev_config;
1492         ev_config = event_config_new();
1493         event_config_set_flag(ev_config, EVENT_BASE_FLAG_NOLOCK);
1494         t->base = event_base_new_with_config(ev_config);
1495         event_config_free(ev_config);
1496         if (! t->base) {
1497             fprintf(stderr, "Can't allocate event base\n");
1498             exit(1);
1499         }
1500     } else {
1501         // reusing an event base from a worker thread.
1502         t->base = base;
1503     }
1504 
1505     // listen for notifications.
1506     // NULL was thread_libevent_process
1507     // FIXME (v2): use modern format? (event_assign)
1508 #ifdef USE_EVENTFD
1509     event_set(&t->notify_event, t->event_fd,
1510           EV_READ | EV_PERSIST, proxy_event_handler, t);
1511     event_set(&t->beconn_event, t->be_event_fd,
1512           EV_READ | EV_PERSIST, proxy_event_beconn, t);
1513 #else
1514     event_set(&t->notify_event, t->notify_receive_fd,
1515           EV_READ | EV_PERSIST, proxy_event_handler, t);
1516     event_set(&t->beconn_event, t->be_notify_receive_fd,
1517           EV_READ | EV_PERSIST, proxy_event_beconn, t);
1518 #endif
1519 
1520     event_base_set(t->base, &t->notify_event);
1521     if (event_add(&t->notify_event, 0) == -1) {
1522         fprintf(stderr, "Can't monitor libevent notify pipe\n");
1523         exit(1);
1524     }
1525     event_base_set(t->base, &t->beconn_event);
1526     if (event_add(&t->beconn_event, 0) == -1) {
1527         fprintf(stderr, "Can't monitor libevent notify pipe\n");
1528         exit(1);
1529     }
1530 }
1531 