/* -*- Mode: C; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
/*
 * Thread management for memcached.
 */
#include "memcached.h"
#ifdef EXTSTORE
#include "storage.h"
#endif
#ifdef HAVE_EVENTFD
#include <sys/eventfd.h>
#endif
#ifdef PROXY
#include "proto_proxy.h"
#endif
#include <assert.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <pthread.h>

#include "queue.h"
#include "tls.h"

#ifdef __sun
#include <atomic.h>
#endif

#define ITEMS_PER_ALLOC 64

/* An item in the connection queue. */
enum conn_queue_item_modes {
    queue_new_conn,   /* brand new connection. */
    queue_pause,      /* pause thread */
    queue_timeout,    /* socket sfd timed out */
    queue_redispatch, /* return conn from side thread */
    queue_stop,       /* exit thread */
#ifdef PROXY
    queue_proxy_reload, /* signal proxy to reload worker VM */
#endif
};
typedef struct conn_queue_item CQ_ITEM;
struct conn_queue_item {
    int               sfd;
    enum conn_states  init_state;
    int               event_flags;
    int               read_buffer_size;
    enum network_transport     transport;
    enum conn_queue_item_modes mode;
    conn *c;
    void    *ssl;
    uint64_t conntag;
    enum protocol bproto;
    io_pending_t *io; // IO when used for deferred IO handling.
    STAILQ_ENTRY(conn_queue_item) i_next;
};

/* A connection queue. */
typedef struct conn_queue CQ;
struct conn_queue {
    STAILQ_HEAD(conn_ev_head, conn_queue_item) head;
    pthread_mutex_t lock;
    cache_t *cache; /* freelisted objects */
};

/* Locks for cache LRU operations */
pthread_mutex_t lru_locks[POWER_LARGEST];

/* Connection lock around accepting new connections */
pthread_mutex_t conn_lock = PTHREAD_MUTEX_INITIALIZER;

#if !defined(HAVE_GCC_ATOMICS) && !defined(__sun)
pthread_mutex_t atomics_mutex = PTHREAD_MUTEX_INITIALIZER;
#endif

/* Lock for global stats */
static pthread_mutex_t stats_lock = PTHREAD_MUTEX_INITIALIZER;

/* Lock to cause worker threads to hang up after being woken */
static pthread_mutex_t worker_hang_lock;

static pthread_mutex_t *item_locks;
/* size of the item lock hash table */
static uint32_t item_lock_count;
static unsigned int item_lock_hashpower;
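/* hashsize(n) is 2^n buckets; hashmask(n) masks a hash value into that range.
 * Used below to map an item's hash to its bucket lock. */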
#define hashsize(n) ((unsigned long int)1<<(n))
#define hashmask(n) (hashsize(n)-1)

/*
 * Each libevent instance has a wakeup pipe, which other threads
 * can use to signal that they've put a new connection on its queue.
 */
static LIBEVENT_THREAD *threads;

/*
 * Number of worker threads that have finished setting themselves up.
 */
static int init_count = 0;
static pthread_mutex_t init_lock;
static pthread_cond_t init_cond;

static void notify_worker(LIBEVENT_THREAD *t, CQ_ITEM *item);
static void notify_worker_fd(LIBEVENT_THREAD *t, int sfd, enum conn_queue_item_modes mode);
static CQ_ITEM *cqi_new(CQ *cq);
static void cq_push(CQ *cq, CQ_ITEM *item);

static void thread_libevent_process(evutil_socket_t fd, short which, void *arg);
static void thread_libevent_ionotify(evutil_socket_t fd, short which, void *arg);

/* item_lock() must be held for an item before any modifications to either its
 * associated hash bucket or the item structure itself.
 * LRU modifications must hold both the item lock and the LRU lock.
 * LRUs accessing items must use item_trylock() before modifying an item.
 * Items accessible from an LRU must not be freed or modified
 * without first locking and removing them from the LRU.
 */
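/* Typical usage, as seen in the item_* wrappers further below: hash the key,
 * take the item lock for that hash value, call the do_* variant, then unlock.
 *
 *     uint32_t hv = hash(key, nkey);
 *     item_lock(hv);
 *     it = do_item_get(key, nkey, hv, t, do_update);
 *     item_unlock(hv);
 */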

void item_lock(uint32_t hv) {
    mutex_lock(&item_locks[hv & hashmask(item_lock_hashpower)]);
}

void *item_trylock(uint32_t hv) {
    pthread_mutex_t *lock = &item_locks[hv & hashmask(item_lock_hashpower)];
    if (pthread_mutex_trylock(lock) == 0) {
        return lock;
    }
    return NULL;
}

void item_trylock_unlock(void *lock) {
    mutex_unlock((pthread_mutex_t *) lock);
}

void item_unlock(uint32_t hv) {
    mutex_unlock(&item_locks[hv & hashmask(item_lock_hashpower)]);
}

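/* Waits on init_cond until nthreads workers have checked in.
 * Caller must hold init_lock. */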
static void wait_for_thread_registration(int nthreads) {
    while (init_count < nthreads) {
        pthread_cond_wait(&init_cond, &init_lock);
    }
}

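/* Called by a worker to report that it has finished setting up (or has
 * paused/stopped), then bounces through worker_hang_lock so that
 * pause_threads()/stop_threads() can hold workers here until released. */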
static void register_thread_initialized(void) {
    pthread_mutex_lock(&init_lock);
    init_count++;
    pthread_cond_signal(&init_cond);
    pthread_mutex_unlock(&init_lock);
    /* Force worker threads to pile up if someone wants us to */
    pthread_mutex_lock(&worker_hang_lock);
    pthread_mutex_unlock(&worker_hang_lock);
}

/* Must not be called with any deeper locks held */
void pause_threads(enum pause_thread_types type) {
    int i;
    bool pause_workers = false;

    switch (type) {
        case PAUSE_ALL_THREADS:
            slabs_rebalancer_pause();
            lru_maintainer_pause();
            lru_crawler_pause();
#ifdef EXTSTORE
            storage_compact_pause();
            storage_write_pause();
#endif
            /* fall through: pausing everything also pauses the workers */
        case PAUSE_WORKER_THREADS:
            pause_workers = true;
            pthread_mutex_lock(&worker_hang_lock);
            break;
        case RESUME_ALL_THREADS:
            slabs_rebalancer_resume();
            lru_maintainer_resume();
            lru_crawler_resume();
#ifdef EXTSTORE
            storage_compact_resume();
            storage_write_resume();
#endif
            /* fall through: resuming everything also releases the workers */
        case RESUME_WORKER_THREADS:
            pthread_mutex_unlock(&worker_hang_lock);
            break;
        default:
            fprintf(stderr, "Unknown lock type: %d\n", type);
            assert(1 == 0);
            break;
    }

    /* Only send a message if we have one. */
    if (!pause_workers) {
        return;
    }

    pthread_mutex_lock(&init_lock);
    init_count = 0;
    for (i = 0; i < settings.num_threads; i++) {
        notify_worker_fd(&threads[i], 0, queue_pause);
    }
    wait_for_thread_registration(settings.num_threads);
    pthread_mutex_unlock(&init_lock);
}

// MUST not be called with any deeper locks held
// MUST be called only by parent thread
// Note: listener thread is the "main" event base, which has exited its
// loop in order to call this function.
void stop_threads(void) {
    int i;

    // assoc can call pause_threads(), so we have to stop it first.
    stop_assoc_maintenance_thread();
    if (settings.verbose > 0)
        fprintf(stderr, "stopped assoc\n");

    if (settings.verbose > 0)
        fprintf(stderr, "asking workers to stop\n");

    pthread_mutex_lock(&worker_hang_lock);
    pthread_mutex_lock(&init_lock);
    init_count = 0;
    for (i = 0; i < settings.num_threads; i++) {
        notify_worker_fd(&threads[i], 0, queue_stop);
    }
    wait_for_thread_registration(settings.num_threads);
    pthread_mutex_unlock(&init_lock);

    // All of the workers are hung but haven't done cleanup yet.

    if (settings.verbose > 0)
        fprintf(stderr, "asking background threads to stop\n");

    // stop each side thread.
    // TODO: Verify these all work if the threads are already stopped
    stop_item_crawler_thread(CRAWLER_WAIT);
    if (settings.verbose > 0)
        fprintf(stderr, "stopped lru crawler\n");
    if (settings.lru_maintainer_thread) {
        stop_lru_maintainer_thread();
        if (settings.verbose > 0)
            fprintf(stderr, "stopped maintainer\n");
    }
    if (settings.slab_reassign) {
        stop_slab_maintenance_thread();
        if (settings.verbose > 0)
            fprintf(stderr, "stopped slab mover\n");
    }
    logger_stop();
    if (settings.verbose > 0)
        fprintf(stderr, "stopped logger thread\n");
    stop_conn_timeout_thread();
    if (settings.verbose > 0)
        fprintf(stderr, "stopped idle timeout thread\n");

    // Close all connections then let the workers finally exit.
    if (settings.verbose > 0)
        fprintf(stderr, "closing connections\n");
    conn_close_all();
    pthread_mutex_unlock(&worker_hang_lock);
    if (settings.verbose > 0)
        fprintf(stderr, "reaping worker threads\n");
    for (i = 0; i < settings.num_threads; i++) {
        pthread_join(threads[i].thread_id, NULL);
    }

    if (settings.verbose > 0)
        fprintf(stderr, "all background threads stopped\n");

    // At this point, every background thread must be stopped.
}

/*
 * Initializes a connection queue.
 */
static void cq_init(CQ *cq) {
    pthread_mutex_init(&cq->lock, NULL);
    STAILQ_INIT(&cq->head);
    cq->cache = cache_create("cq", sizeof(CQ_ITEM), sizeof(char *));
    if (cq->cache == NULL) {
        fprintf(stderr, "Failed to create connection queue cache\n");
        exit(EXIT_FAILURE);
    }
}

/*
 * Looks for an item on a connection queue, but doesn't block if there isn't
 * one.
 * Returns the item, or NULL if no item is available
 */
static CQ_ITEM *cq_pop(CQ *cq) {
    CQ_ITEM *item;

    pthread_mutex_lock(&cq->lock);
    item = STAILQ_FIRST(&cq->head);
    if (item != NULL) {
        STAILQ_REMOVE_HEAD(&cq->head, i_next);
    }
    pthread_mutex_unlock(&cq->lock);

    return item;
}

/*
 * Adds an item to a connection queue.
 */
static void cq_push(CQ *cq, CQ_ITEM *item) {
    pthread_mutex_lock(&cq->lock);
    STAILQ_INSERT_TAIL(&cq->head, item, i_next);
    pthread_mutex_unlock(&cq->lock);
}

/*
 * Returns a fresh connection queue item.
 */
static CQ_ITEM *cqi_new(CQ *cq) {
    CQ_ITEM *item = cache_alloc(cq->cache);
    if (item == NULL) {
        STATS_LOCK();
        stats.malloc_fails++;
        STATS_UNLOCK();
    }
    return item;
}

/*
 * Frees a connection queue item (adds it to the freelist.)
 */
static void cqi_free(CQ *cq, CQ_ITEM *item) {
    cache_free(cq->cache, item);
}

// TODO: Skip notify if queue wasn't empty?
// - Requires cq_push() returning a "was empty" flag
// - Requires event handling loop to pop the entire queue and work from that
// instead of the ev_count work there now.
// In testing this does result in a large performance uptick, but unclear how
// much that will transfer from a synthetic benchmark.
static void notify_worker(LIBEVENT_THREAD *t, CQ_ITEM *item) {
    cq_push(t->ev_queue, item);
#ifdef HAVE_EVENTFD
    uint64_t u = 1;
    if (write(t->n.notify_event_fd, &u, sizeof(uint64_t)) != sizeof(uint64_t)) {
        perror("failed writing to worker eventfd");
        /* TODO: This is a fatal problem. Can it ever happen temporarily? */
    }
#else
    char buf[1] = "c";
    if (write(t->n.notify_send_fd, buf, 1) != 1) {
        perror("Failed writing to notify pipe");
        /* TODO: This is a fatal problem. Can it ever happen temporarily? */
    }
#endif
}

// NOTE: An external func that takes a conn *c might be cleaner overall.
static void notify_worker_fd(LIBEVENT_THREAD *t, int sfd, enum conn_queue_item_modes mode) {
    CQ_ITEM *item;
    while ( (item = cqi_new(t->ev_queue)) == NULL ) {
        // NOTE: most callers of this function cannot fail, but mallocs in
        // theory can fail. Small mallocs essentially never do without also
        // killing the process. Syscalls can also fail but the original code
        // never handled this either.
        // As a compromise, I'm leaving this note and this loop: This alloc
        // cannot fail, but pre-allocating the data is too much code in an
        // area I want to keep more lean. If this CQ business becomes a more
        // generic queue I'll reconsider.
    }

    item->mode = mode;
    item->sfd = sfd;
    notify_worker(t, item);
}

/*
 * Creates a worker thread.
 */
static void create_worker(void *(*func)(void *), void *arg) {
    pthread_attr_t  attr;
    int             ret;

    pthread_attr_init(&attr);

    if ((ret = pthread_create(&((LIBEVENT_THREAD*)arg)->thread_id, &attr, func, arg)) != 0) {
        fprintf(stderr, "Can't create thread: %s\n",
                strerror(ret));
        exit(1);
    }

    thread_setname(((LIBEVENT_THREAD*)arg)->thread_id, "mc-worker");
}

/*
 * Sets whether or not we accept new connections.
 */
void accept_new_conns(const bool do_accept) {
    pthread_mutex_lock(&conn_lock);
    do_accept_new_conns(do_accept);
    pthread_mutex_unlock(&conn_lock);
}
/****************************** LIBEVENT THREADS *****************************/

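/* Wires one notify channel (eventfd or pipe, depending on the build) into the
 * thread's event base so other threads can wake it via the given callback. */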
static void setup_thread_notify(LIBEVENT_THREAD *me, struct thread_notify *tn,
        void(*cb)(int, short, void *)) {
#ifdef HAVE_EVENTFD
    event_set(&tn->notify_event, tn->notify_event_fd,
              EV_READ | EV_PERSIST, cb, me);
#else
    event_set(&tn->notify_event, tn->notify_receive_fd,
              EV_READ | EV_PERSIST, cb, me);
#endif
    event_base_set(me->base, &tn->notify_event);

    if (event_add(&tn->notify_event, 0) == -1) {
        fprintf(stderr, "Can't monitor libevent notify pipe\n");
        exit(1);
    }
}

/*
 * Set up a thread's information.
 */
static void setup_thread(LIBEVENT_THREAD *me) {
#if defined(LIBEVENT_VERSION_NUMBER) && LIBEVENT_VERSION_NUMBER >= 0x02000101
    struct event_config *ev_config;
    ev_config = event_config_new();
    event_config_set_flag(ev_config, EVENT_BASE_FLAG_NOLOCK);
    me->base = event_base_new_with_config(ev_config);
    event_config_free(ev_config);
#else
    me->base = event_init();
#endif

    if (! me->base) {
        fprintf(stderr, "Can't allocate event base\n");
        exit(1);
    }

    /* Listen for notifications from other threads */
    setup_thread_notify(me, &me->n, thread_libevent_process);
    setup_thread_notify(me, &me->ion, thread_libevent_ionotify);
    pthread_mutex_init(&me->ion_lock, NULL);
    STAILQ_INIT(&me->ion_head);

    me->ev_queue = malloc(sizeof(struct conn_queue));
    if (me->ev_queue == NULL) {
        perror("Failed to allocate memory for connection queue");
        exit(EXIT_FAILURE);
    }
    cq_init(me->ev_queue);

    if (pthread_mutex_init(&me->stats.mutex, NULL) != 0) {
        perror("Failed to initialize mutex");
        exit(EXIT_FAILURE);
    }

    me->rbuf_cache = cache_create("rbuf", READ_BUFFER_SIZE, sizeof(char *));
    if (me->rbuf_cache == NULL) {
        fprintf(stderr, "Failed to create read buffer cache\n");
        exit(EXIT_FAILURE);
    }
    // Note: we were cleanly passing in num_threads before, but this now
    // relies on settings globals too much.
    if (settings.read_buf_mem_limit) {
        int limit = settings.read_buf_mem_limit / settings.num_threads;
        if (limit < READ_BUFFER_SIZE) {
            limit = 1;
        } else {
            limit = limit / READ_BUFFER_SIZE;
        }
        cache_set_limit(me->rbuf_cache, limit);
    }

    me->io_cache = cache_create("io", sizeof(io_pending_t), sizeof(char*));
    if (me->io_cache == NULL) {
        fprintf(stderr, "Failed to create IO object cache\n");
        exit(EXIT_FAILURE);
    }
#ifdef TLS
    if (settings.ssl_enabled) {
        me->ssl_wbuf = (char *)malloc((size_t)settings.ssl_wbuf_size);
        if (me->ssl_wbuf == NULL) {
            fprintf(stderr, "Failed to allocate the SSL write buffer\n");
            exit(EXIT_FAILURE);
        }
    }
#endif
#ifdef EXTSTORE
    // me->storage is set just before this function is called.
    if (me->storage) {
        thread_io_queue_add(me, IO_QUEUE_EXTSTORE, me->storage,
            storage_submit_cb);
    }
#endif
#ifdef PROXY
    thread_io_queue_add(me, IO_QUEUE_PROXY, settings.proxy_ctx, proxy_submit_cb);

    // TODO: maybe register hooks to be called here from sub-packages? ie;
    // extstore, TLS, proxy.
    if (settings.proxy_enabled) {
        proxy_thread_init(settings.proxy_ctx, me);
    }
#endif
    thread_io_queue_add(me, IO_QUEUE_NONE, NULL, NULL);
}

/*
 * Worker thread: main event loop
 */
static void *worker_libevent(void *arg) {
    LIBEVENT_THREAD *me = arg;

    /* Any per-thread setup can happen here; memcached_thread_init() will block until
     * all threads have finished initializing.
     */
    me->l = logger_create();
    me->lru_bump_buf = item_lru_bump_buf_create();
    if (me->l == NULL || me->lru_bump_buf == NULL) {
        abort();
    }

    if (settings.drop_privileges) {
        drop_worker_privileges();
    }

    register_thread_initialized();
#ifdef PROXY
    while (!event_base_got_exit(me->base)) {
        event_base_loop(me->base, EVLOOP_ONCE);
        if (me->proxy_ctx) {
            proxy_gc_poke(me);
        }
    }
#else
    event_base_loop(me->base, 0);
#endif
    // same mechanism used to watch for all threads exiting.
    register_thread_initialized();

    event_base_free(me->base);
    return NULL;
}

// Syscalls can be expensive enough that handling a few of them once here can
// save both throughput and overall latency.
#define MAX_PIPE_EVENTS 32

// dedicated worker thread notify system for IO objects.
static void thread_libevent_ionotify(evutil_socket_t fd, short which, void *arg) {
    LIBEVENT_THREAD *me = arg;
    uint64_t ev_count = 0;
    iop_head_t head;

    STAILQ_INIT(&head);
#ifdef HAVE_EVENTFD
    if (read(fd, &ev_count, sizeof(uint64_t)) != sizeof(uint64_t)) {
        if (settings.verbose > 0)
            fprintf(stderr, "Can't read from libevent pipe\n");
        return;
    }
#else
    char buf[MAX_PIPE_EVENTS];

    ev_count = read(fd, buf, MAX_PIPE_EVENTS);
    if (ev_count == 0) {
        if (settings.verbose > 0)
            fprintf(stderr, "Can't read from libevent pipe\n");
        return;
    }
#endif

    // pull entire queue and zero the thread head.
    // need to do this after reading a syscall as we are only guaranteed to
    // get syscalls if the queue is empty.
    pthread_mutex_lock(&me->ion_lock);
    STAILQ_CONCAT(&head, &me->ion_head);
    pthread_mutex_unlock(&me->ion_lock);

    while (!STAILQ_EMPTY(&head)) {
        io_pending_t *io = STAILQ_FIRST(&head);
        STAILQ_REMOVE_HEAD(&head, iop_next);
        conn_io_queue_return(io);
    }
}

/*
 * Processes an incoming "connection event" item. This is called when
 * input arrives on the libevent wakeup pipe.
 */
static void thread_libevent_process(evutil_socket_t fd, short which, void *arg) {
    LIBEVENT_THREAD *me = arg;
    CQ_ITEM *item;
    conn *c;
    uint64_t ev_count = 0; // max number of events to loop through this run.
#ifdef HAVE_EVENTFD
    // NOTE: unlike pipe we aren't limiting the number of events per read.
    // However we do limit the number of queue pulls to what the count was at
    // the time of this function firing.
    if (read(fd, &ev_count, sizeof(uint64_t)) != sizeof(uint64_t)) {
        if (settings.verbose > 0)
            fprintf(stderr, "Can't read from libevent pipe\n");
        return;
    }
#else
    char buf[MAX_PIPE_EVENTS];

    ev_count = read(fd, buf, MAX_PIPE_EVENTS);
    if (ev_count == 0) {
        if (settings.verbose > 0)
            fprintf(stderr, "Can't read from libevent pipe\n");
        return;
    }
#endif

    for (int x = 0; x < ev_count; x++) {
        item = cq_pop(me->ev_queue);
        if (item == NULL) {
            return;
        }

        switch (item->mode) {
            case queue_new_conn:
                c = conn_new(item->sfd, item->init_state, item->event_flags,
                                   item->read_buffer_size, item->transport,
                                   me->base, item->ssl, item->conntag, item->bproto);
                if (c == NULL) {
                    if (IS_UDP(item->transport)) {
                        fprintf(stderr, "Can't listen for events on UDP socket\n");
                        exit(1);
                    } else {
                        if (settings.verbose > 0) {
                            fprintf(stderr, "Can't listen for events on fd %d\n",
                                item->sfd);
                        }
                        if (item->ssl) {
                            ssl_conn_close(item->ssl);
                            item->ssl = NULL;
                        }
                        close(item->sfd);
                    }
                } else {
                    c->thread = me;
                    conn_io_queue_setup(c);
#ifdef TLS
                    if (settings.ssl_enabled && c->ssl != NULL) {
                        assert(c->thread && c->thread->ssl_wbuf);
                        c->ssl_wbuf = c->thread->ssl_wbuf;
                    }
#endif
                }
                break;
            case queue_pause:
                /* we were told to pause and report in */
                register_thread_initialized();
                break;
            case queue_timeout:
                /* a client socket timed out */
                conn_close_idle(conns[item->sfd]);
                break;
            case queue_redispatch:
                /* a side thread redispatched a client connection */
                conn_worker_readd(conns[item->sfd]);
                break;
            case queue_stop:
                /* asked to stop */
                event_base_loopexit(me->base, NULL);
                break;
#ifdef PROXY
            case queue_proxy_reload:
                proxy_worker_reload(settings.proxy_ctx, me);
                break;
#endif
        }

        cqi_free(me->ev_queue, item);
    }
}

// Interface is slightly different on various platforms.
// On linux, at least, the len limit is 16 bytes.
#define THR_NAME_MAXLEN 16
void thread_setname(pthread_t thread, const char *name) {
    assert(strlen(name) < THR_NAME_MAXLEN);
#if defined(__linux__) && defined(HAVE_PTHREAD_SETNAME_NP)
    pthread_setname_np(thread, name);
#endif
}
#undef THR_NAME_MAXLEN

// NOTE: need better encapsulation.
// used by the proxy module to iterate the worker threads.
LIBEVENT_THREAD *get_worker_thread(int id) {
    return &threads[id];
}

/* Which thread we assigned a connection to most recently. */
static int last_thread = -1;

/* Last thread we assigned to a connection based on napi_id */
static int last_thread_by_napi_id = -1;

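/* Picks the next worker thread in simple round-robin order. */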
static LIBEVENT_THREAD *select_thread_round_robin(void)
{
    int tid = (last_thread + 1) % settings.num_threads;

    last_thread = tid;

    return threads + tid;
}

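/* Clears every thread's recorded NAPI ID so the napi_id -> thread mapping
 * can be rebuilt from scratch. */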
static void reset_threads_napi_id(void)
{
    LIBEVENT_THREAD *thread;
    int i;

    for (i = 0; i < settings.num_threads; i++) {
         thread = threads + i;
         thread->napi_id = 0;
    }

    last_thread_by_napi_id = -1;
}

/* Select a worker thread based on the NAPI ID of an incoming connection
 * request. NAPI ID is a globally unique ID that identifies a NIC RX queue
 * on which a flow is received.
 */
static LIBEVENT_THREAD *select_thread_by_napi_id(int sfd)
{
    LIBEVENT_THREAD *thread;
    int napi_id, err, i;
    socklen_t len;
    int tid = -1;

    len = sizeof(socklen_t);
    err = getsockopt(sfd, SOL_SOCKET, SO_INCOMING_NAPI_ID, &napi_id, &len);
    if ((err == -1) || (napi_id == 0)) {
        STATS_LOCK();
        stats.round_robin_fallback++;
        STATS_UNLOCK();
        return select_thread_round_robin();
    }

select:
    for (i = 0; i < settings.num_threads; i++) {
         thread = threads + i;
         if (last_thread_by_napi_id < i) {
             thread->napi_id = napi_id;
             last_thread_by_napi_id = i;
             tid = i;
             break;
         }
         if (thread->napi_id == napi_id) {
             tid = i;
             break;
         }
    }

    if (tid == -1) {
        STATS_LOCK();
        stats.unexpected_napi_ids++;
        STATS_UNLOCK();
        reset_threads_napi_id();
        goto select;
    }

    return threads + tid;
}

/*
 * Dispatches a new connection to another thread. This is only ever called
 * from the main thread, either during initialization (for UDP) or because
 * of an incoming connection.
 */
void dispatch_conn_new(int sfd, enum conn_states init_state, int event_flags,
                       int read_buffer_size, enum network_transport transport, void *ssl,
                       uint64_t conntag, enum protocol bproto) {
    CQ_ITEM *item = NULL;
    LIBEVENT_THREAD *thread;

    if (!settings.num_napi_ids)
        thread = select_thread_round_robin();
    else
        thread = select_thread_by_napi_id(sfd);

    item = cqi_new(thread->ev_queue);
    if (item == NULL) {
        close(sfd);
        /* given that malloc failed this may also fail, but let's try */
        fprintf(stderr, "Failed to allocate memory for connection object\n");
        return;
    }

    item->sfd = sfd;
    item->init_state = init_state;
    item->event_flags = event_flags;
    item->read_buffer_size = read_buffer_size;
    item->transport = transport;
    item->mode = queue_new_conn;
    item->ssl = ssl;
    item->conntag = conntag;
    item->bproto = bproto;

    MEMCACHED_CONN_DISPATCH(sfd, (int64_t)thread->thread_id);
    notify_worker(thread, item);
}

/*
 * Re-dispatches a connection back to the original thread. Can be called from
 * any side thread borrowing a connection.
 */
void redispatch_conn(conn *c) {
    notify_worker_fd(c->thread, c->sfd, queue_redispatch);
}

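/* Asks a connection's owning worker to close it as idle (see queue_timeout). */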
void timeout_conn(conn *c) {
    notify_worker_fd(c->thread, c->sfd, queue_timeout);
}
#ifdef PROXY
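/* Asks a worker thread to reload its proxy worker VM (see queue_proxy_reload). */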
void proxy_reload_notify(LIBEVENT_THREAD *t) {
    notify_worker_fd(t, 0, queue_proxy_reload);
}
#endif

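/* Hands a completed deferred IO object back to its owning worker thread,
 * waking that thread only if its return queue was previously empty. */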
void return_io_pending(io_pending_t *io) {
    bool do_notify = false;
    LIBEVENT_THREAD *t = io->thread;
    pthread_mutex_lock(&t->ion_lock);
    if (STAILQ_EMPTY(&t->ion_head)) {
        do_notify = true;
    }
    STAILQ_INSERT_TAIL(&t->ion_head, io, iop_next);
    pthread_mutex_unlock(&t->ion_lock);

    // skip the syscall if there was already data in the queue, as it's
    // already been notified.
    if (do_notify) {
#ifdef HAVE_EVENTFD
        uint64_t u = 1;
        if (write(t->ion.notify_event_fd, &u, sizeof(uint64_t)) != sizeof(uint64_t)) {
            perror("failed writing to worker eventfd");
            /* TODO: This is a fatal problem. Can it ever happen temporarily? */
        }
#else
        char buf[1] = "c";
        if (write(t->ion.notify_send_fd, buf, 1) != 1) {
            perror("Failed writing to notify pipe");
            /* TODO: This is a fatal problem. Can it ever happen temporarily? */
        }
#endif
    }
}

/* This misses the allow_new_conns flag :( */
void sidethread_conn_close(conn *c) {
    if (settings.verbose > 1)
        fprintf(stderr, "<%d connection closing from side thread.\n", c->sfd);

    c->state = conn_closing;
    // redispatch will see closing flag and properly close connection.
    redispatch_conn(c);
    return;
}

/********************************* ITEM ACCESS *******************************/

/*
 * Allocates a new item.
 */
item *item_alloc(const char *key, size_t nkey, client_flags_t flags, rel_time_t exptime, int nbytes) {
    item *it;
    /* do_item_alloc handles its own locks */
    it = do_item_alloc(key, nkey, flags, exptime, nbytes);
    return it;
}

/*
 * Returns an item if it hasn't been marked as expired,
 * lazy-expiring as needed.
 */
item *item_get(const char *key, const size_t nkey, LIBEVENT_THREAD *t, const bool do_update) {
    item *it;
    uint32_t hv;
    hv = hash(key, nkey);
    item_lock(hv);
    it = do_item_get(key, nkey, hv, t, do_update);
    item_unlock(hv);
    return it;
}

// returns an item with the item lock held.
// lock will still be held even if return is NULL, allowing caller to replace
// an item atomically if desired.
item *item_get_locked(const char *key, const size_t nkey, LIBEVENT_THREAD *t, const bool do_update, uint32_t *hv) {
    item *it;
    *hv = hash(key, nkey);
    item_lock(*hv);
    it = do_item_get(key, nkey, *hv, t, do_update);
    return it;
}

item *item_touch(const char *key, size_t nkey, uint32_t exptime, LIBEVENT_THREAD *t) {
    item *it;
    uint32_t hv;
    hv = hash(key, nkey);
    item_lock(hv);
    it = do_item_touch(key, nkey, exptime, hv, t);
    item_unlock(hv);
    return it;
}

/*
 * Decrements the reference count on an item and adds it to the freelist if
 * needed.
 */
void item_remove(item *item) {
    uint32_t hv;
    hv = hash(ITEM_key(item), item->nkey);

    item_lock(hv);
    do_item_remove(item);
    item_unlock(hv);
}

/*
 * Replaces one item with another in the hashtable.
 * Unprotected by a mutex lock since the core server does not require
 * it to be thread-safe.
 */
int item_replace(item *old_it, item *new_it, const uint32_t hv, const uint64_t cas_in) {
    return do_item_replace(old_it, new_it, hv, cas_in);
}

/*
 * Unlinks an item from the LRU and hashtable.
 */
void item_unlink(item *item) {
    uint32_t hv;
    hv = hash(ITEM_key(item), item->nkey);
    item_lock(hv);
    do_item_unlink(item, hv);
    item_unlock(hv);
}

/*
 * Does arithmetic on a numeric item value.
 */
enum delta_result_type add_delta(LIBEVENT_THREAD *t, const char *key,
                                 const size_t nkey, bool incr,
                                 const int64_t delta, char *buf,
                                 uint64_t *cas) {
    enum delta_result_type ret;
    uint32_t hv;

    hv = hash(key, nkey);
    item_lock(hv);
    ret = do_add_delta(t, key, nkey, incr, delta, buf, cas, hv, NULL);
    item_unlock(hv);
    return ret;
}

/*
 * Stores an item in the cache (high level, obeys set/add/replace semantics)
 */
enum store_item_type store_item(item *item, int comm, LIBEVENT_THREAD *t, int *nbytes, uint64_t *cas, const uint64_t cas_in, bool cas_stale) {
    enum store_item_type ret;
    uint32_t hv;

    hv = hash(ITEM_key(item), item->nkey);
    item_lock(hv);
    ret = do_store_item(item, comm, t, hv, nbytes, cas, cas_in, cas_stale);
    item_unlock(hv);
    return ret;
}

/******************************* GLOBAL STATS ******************************/

void STATS_LOCK(void) {
    pthread_mutex_lock(&stats_lock);
}

void STATS_UNLOCK(void) {
    pthread_mutex_unlock(&stats_lock);
}

void threadlocal_stats_reset(void) {
    int ii;
    for (ii = 0; ii < settings.num_threads; ++ii) {
        pthread_mutex_lock(&threads[ii].stats.mutex);
#define X(name) threads[ii].stats.name = 0;
        THREAD_STATS_FIELDS
#ifdef EXTSTORE
        EXTSTORE_THREAD_STATS_FIELDS
#endif
#ifdef PROXY
        PROXY_THREAD_STATS_FIELDS
#endif
#undef X

        memset(&threads[ii].stats.slab_stats, 0,
                sizeof(threads[ii].stats.slab_stats));
        memset(&threads[ii].stats.lru_hits, 0,
                sizeof(uint64_t) * POWER_LARGEST);

        pthread_mutex_unlock(&threads[ii].stats.mutex);
    }
}

void threadlocal_stats_aggregate(struct thread_stats *stats) {
    int ii, sid;

    /* The struct has a mutex, but we can safely set the whole thing
     * to zero since it is unused when aggregating. */
    memset(stats, 0, sizeof(*stats));

    for (ii = 0; ii < settings.num_threads; ++ii) {
        pthread_mutex_lock(&threads[ii].stats.mutex);
#define X(name) stats->name += threads[ii].stats.name;
        THREAD_STATS_FIELDS
#ifdef EXTSTORE
        EXTSTORE_THREAD_STATS_FIELDS
#endif
#ifdef PROXY
        PROXY_THREAD_STATS_FIELDS
#endif
#undef X

        for (sid = 0; sid < MAX_NUMBER_OF_SLAB_CLASSES; sid++) {
#define X(name) stats->slab_stats[sid].name += \
            threads[ii].stats.slab_stats[sid].name;
            SLAB_STATS_FIELDS
#undef X
        }

        for (sid = 0; sid < POWER_LARGEST; sid++) {
            stats->lru_hits[sid] +=
                threads[ii].stats.lru_hits[sid];
            stats->slab_stats[CLEAR_LRU(sid)].get_hits +=
                threads[ii].stats.lru_hits[sid];
        }

        stats->read_buf_count += threads[ii].rbuf_cache->total;
        stats->read_buf_bytes += threads[ii].rbuf_cache->total * READ_BUFFER_SIZE;
        stats->read_buf_bytes_free += threads[ii].rbuf_cache->freecurr * READ_BUFFER_SIZE;
        pthread_mutex_unlock(&threads[ii].stats.mutex);
    }
}

void slab_stats_aggregate(struct thread_stats *stats, struct slab_stats *out) {
    int sid;

    memset(out, 0, sizeof(*out));

    for (sid = 0; sid < MAX_NUMBER_OF_SLAB_CLASSES; sid++) {
#define X(name) out->name += stats->slab_stats[sid].name;
        SLAB_STATS_FIELDS
#undef X
    }
}

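/* Creates a per-thread notify channel: an eventfd where available,
 * otherwise a pipe. */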
static void memcached_thread_notify_init(struct thread_notify *tn) {
#ifdef HAVE_EVENTFD
    tn->notify_event_fd = eventfd(0, EFD_NONBLOCK);
    if (tn->notify_event_fd == -1) {
        perror("failed creating eventfd for worker thread");
        exit(1);
    }
#else
    int fds[2];
    if (pipe(fds)) {
        perror("Can't create notify pipe");
        exit(1);
    }

    tn->notify_receive_fd = fds[0];
    tn->notify_send_fd = fds[1];
#endif
}

/*
 * Initializes the thread subsystem, creating various worker threads.
 *
 * nthreads  Number of worker event handler threads to spawn
 */
void memcached_thread_init(int nthreads, void *arg) {
    int         i;
    int         power;

    for (i = 0; i < POWER_LARGEST; i++) {
        pthread_mutex_init(&lru_locks[i], NULL);
    }
    pthread_mutex_init(&worker_hang_lock, NULL);

    pthread_mutex_init(&init_lock, NULL);
    pthread_cond_init(&init_cond, NULL);

    /* Want a wide lock table, but don't waste memory */
    if (nthreads < 3) {
        power = 10;
    } else if (nthreads < 4) {
        power = 11;
    } else if (nthreads < 5) {
        power = 12;
    } else if (nthreads <= 10) {
        power = 13;
    } else if (nthreads <= 20) {
        power = 14;
    } else {
        /* 32k buckets. just under the hashpower default. */
        power = 15;
    }

    if (power >= hashpower) {
        fprintf(stderr, "Hash table power size (%d) cannot be equal to or less than item lock table (%d)\n", hashpower, power);
        fprintf(stderr, "Item lock table grows with `-t N` (worker threadcount)\n");
        fprintf(stderr, "Hash table grows with `-o hashpower=N` \n");
        exit(1);
    }

    item_lock_count = hashsize(power);
    item_lock_hashpower = power;

    item_locks = calloc(item_lock_count, sizeof(pthread_mutex_t));
    if (! item_locks) {
        perror("Can't allocate item locks");
        exit(1);
    }
    for (i = 0; i < item_lock_count; i++) {
        pthread_mutex_init(&item_locks[i], NULL);
    }

    threads = calloc(nthreads, sizeof(LIBEVENT_THREAD));
    if (! threads) {
        perror("Can't allocate thread descriptors");
        exit(1);
    }

    for (i = 0; i < nthreads; i++) {
        memcached_thread_notify_init(&threads[i].n);
        memcached_thread_notify_init(&threads[i].ion);
#ifdef EXTSTORE
        threads[i].storage = arg;
#endif
        threads[i].thread_baseid = i;
        setup_thread(&threads[i]);
        /* Reserve three fds for the libevent base, and two for the pipe */
        stats_state.reserved_fds += 5;
    }

    /* Create threads after we've done all the libevent setup. */
    for (i = 0; i < nthreads; i++) {
        create_worker(worker_libevent, &threads[i]);
    }

    /* Wait for all the threads to set themselves up before returning. */
    pthread_mutex_lock(&init_lock);
    wait_for_thread_registration(nthreads);
    pthread_mutex_unlock(&init_lock);
}