/* -*- Mode: C; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
/*
 * Thread management for memcached.
 */
#include "memcached.h"
#ifdef EXTSTORE
#include "storage.h"
#endif
#ifdef HAVE_EVENTFD
#include <sys/eventfd.h>
#endif
#ifdef PROXY
#include "proto_proxy.h"
#endif
#include <assert.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <pthread.h>

#include "queue.h"
#include "tls.h"

#ifdef __sun
#include <atomic.h>
#endif

#define ITEMS_PER_ALLOC 64

/* An item in the connection queue. */
enum conn_queue_item_modes {
    queue_new_conn,     /* brand new connection. */
    queue_pause,        /* pause thread */
    queue_timeout,      /* socket sfd timed out */
    queue_redispatch,   /* return conn from side thread */
    queue_stop,         /* exit thread */
#ifdef PROXY
    queue_proxy_reload, /* signal proxy to reload worker VM */
#endif
};
typedef struct conn_queue_item CQ_ITEM;
struct conn_queue_item {
    int sfd;
    enum conn_states init_state;
    int event_flags;
    int read_buffer_size;
    enum network_transport transport;
    enum conn_queue_item_modes mode;
    conn *c;
    void *ssl;
    uint64_t conntag;
    enum protocol bproto;
    io_pending_t *io; // IO when used for deferred IO handling.
    STAILQ_ENTRY(conn_queue_item) i_next;
};

/* A connection queue. */
typedef struct conn_queue CQ;
struct conn_queue {
    STAILQ_HEAD(conn_ev_head, conn_queue_item) head;
    pthread_mutex_t lock;
    cache_t *cache; /* freelisted objects */
};

/* Locks for cache LRU operations */
pthread_mutex_t lru_locks[POWER_LARGEST];

/* Connection lock around accepting new connections */
pthread_mutex_t conn_lock = PTHREAD_MUTEX_INITIALIZER;

#if !defined(HAVE_GCC_ATOMICS) && !defined(__sun)
pthread_mutex_t atomics_mutex = PTHREAD_MUTEX_INITIALIZER;
#endif

/* Lock for global stats */
static pthread_mutex_t stats_lock = PTHREAD_MUTEX_INITIALIZER;

/* Lock to cause worker threads to hang up after being woken */
static pthread_mutex_t worker_hang_lock;

static pthread_mutex_t *item_locks;
/* size of the item lock hash table */
static uint32_t item_lock_count;
static unsigned int item_lock_hashpower;
#define hashsize(n) ((unsigned long int)1<<(n))
#define hashmask(n) (hashsize(n)-1)
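
/* Illustrative only: with item_lock_hashpower == 13 (the value
 * memcached_thread_init() below picks for 5-10 worker threads), hashsize(13)
 * is 8192 buckets and hashmask(13) is 0x1fff, so item_lock(hv) grabs
 * item_locks[hv & 0x1fff]. */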

/*
 * Each libevent instance has a wakeup pipe, which other threads
 * can use to signal that they've put a new connection on its queue.
 */
static LIBEVENT_THREAD *threads;

/*
 * Number of worker threads that have finished setting themselves up.
 */
static int init_count = 0;
static pthread_mutex_t init_lock;
static pthread_cond_t init_cond;

static void notify_worker(LIBEVENT_THREAD *t, CQ_ITEM *item);
static void notify_worker_fd(LIBEVENT_THREAD *t, int sfd, enum conn_queue_item_modes mode);
static CQ_ITEM *cqi_new(CQ *cq);
static void cq_push(CQ *cq, CQ_ITEM *item);

static void thread_libevent_process(evutil_socket_t fd, short which, void *arg);
static void thread_libevent_ionotify(evutil_socket_t fd, short which, void *arg);

/* item_lock() must be held for an item before any modifications to either its
 * associated hash bucket, or the structure itself.
 * LRU modifications must hold both the item lock and the LRU lock.
 * LRU walkers must item_trylock() an item before modifying it.
 * Items accessible from an LRU must not be freed or modified
 * without first locking and removing them from the LRU.
 */
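
/*
 * Minimal usage sketch (illustrative; it mirrors the item_get() wrapper
 * further below rather than introducing any new API):
 *
 *   uint32_t hv = hash(key, nkey);
 *   item_lock(hv);
 *   item *it = do_item_get(key, nkey, hv, t, do_update);
 *   // ... inspect or modify the item / hash bucket ...
 *   item_unlock(hv);
 *
 * Background LRU walkers use item_trylock()/item_trylock_unlock() instead, so
 * they can skip buckets a worker currently holds rather than blocking.
 */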

void item_lock(uint32_t hv) {
    mutex_lock(&item_locks[hv & hashmask(item_lock_hashpower)]);
}

void *item_trylock(uint32_t hv) {
    pthread_mutex_t *lock = &item_locks[hv & hashmask(item_lock_hashpower)];
    if (pthread_mutex_trylock(lock) == 0) {
        return lock;
    }
    return NULL;
}

void item_trylock_unlock(void *lock) {
    mutex_unlock((pthread_mutex_t *) lock);
}

void item_unlock(uint32_t hv) {
    mutex_unlock(&item_locks[hv & hashmask(item_lock_hashpower)]);
}

static void wait_for_thread_registration(int nthreads) {
    while (init_count < nthreads) {
        pthread_cond_wait(&init_cond, &init_lock);
    }
}

static void register_thread_initialized(void) {
    pthread_mutex_lock(&init_lock);
    init_count++;
    pthread_cond_signal(&init_cond);
    pthread_mutex_unlock(&init_lock);
    /* Force worker threads to pile up if someone wants us to */
    pthread_mutex_lock(&worker_hang_lock);
    pthread_mutex_unlock(&worker_hang_lock);
}

/* Must not be called with any deeper locks held */
void pause_threads(enum pause_thread_types type) {
    int i;
    bool pause_workers = false;

    switch (type) {
        case PAUSE_ALL_THREADS:
            slabs_rebalancer_pause();
            lru_maintainer_pause();
            lru_crawler_pause();
#ifdef EXTSTORE
            storage_compact_pause();
            storage_write_pause();
#endif
            /* fall through: also pause the worker threads */
        case PAUSE_WORKER_THREADS:
            pause_workers = true;
            pthread_mutex_lock(&worker_hang_lock);
            break;
        case RESUME_ALL_THREADS:
            slabs_rebalancer_resume();
            lru_maintainer_resume();
            lru_crawler_resume();
#ifdef EXTSTORE
            storage_compact_resume();
            storage_write_resume();
#endif
            /* fall through: also resume the worker threads */
        case RESUME_WORKER_THREADS:
            pthread_mutex_unlock(&worker_hang_lock);
            break;
        default:
            fprintf(stderr, "Unknown lock type: %d\n", type);
            assert(1 == 0);
            break;
    }

    /* Only the pause paths need to message the workers; the resume paths
     * simply release worker_hang_lock above. */
    if (!pause_workers) {
        return;
    }

    pthread_mutex_lock(&init_lock);
    init_count = 0;
    for (i = 0; i < settings.num_threads; i++) {
        notify_worker_fd(&threads[i], 0, queue_pause);
    }
    wait_for_thread_registration(settings.num_threads);
    pthread_mutex_unlock(&init_lock);
}
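
/*
 * Illustrative caller (a sketch): code that must mutate globally shared state
 * with every thread quiesced, such as the hash table maintenance in assoc.c
 * noted in stop_threads() below.
 *
 *   pause_threads(PAUSE_ALL_THREADS);
 *   // ... swap or resize the shared structure ...
 *   pause_threads(RESUME_ALL_THREADS);
 */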

// MUST not be called with any deeper locks held
// MUST be called only by parent thread
// Note: listener thread is the "main" event base, which has exited its
// loop in order to call this function.
void stop_threads(void) {
    int i;

    // assoc can call pause_threads(), so we have to stop it first.
    stop_assoc_maintenance_thread();
    if (settings.verbose > 0)
        fprintf(stderr, "stopped assoc\n");

    if (settings.verbose > 0)
        fprintf(stderr, "asking workers to stop\n");

    pthread_mutex_lock(&worker_hang_lock);
    pthread_mutex_lock(&init_lock);
    init_count = 0;
    for (i = 0; i < settings.num_threads; i++) {
        notify_worker_fd(&threads[i], 0, queue_stop);
    }
    wait_for_thread_registration(settings.num_threads);
    pthread_mutex_unlock(&init_lock);

    // All of the workers are hung but haven't done cleanup yet.

    if (settings.verbose > 0)
        fprintf(stderr, "asking background threads to stop\n");

    // stop each side thread.
    // TODO: Verify these all work if the threads are already stopped
    stop_item_crawler_thread(CRAWLER_WAIT);
    if (settings.verbose > 0)
        fprintf(stderr, "stopped lru crawler\n");
    if (settings.lru_maintainer_thread) {
        stop_lru_maintainer_thread();
        if (settings.verbose > 0)
            fprintf(stderr, "stopped maintainer\n");
    }
    if (settings.slab_reassign) {
        stop_slab_maintenance_thread();
        if (settings.verbose > 0)
            fprintf(stderr, "stopped slab mover\n");
    }
    logger_stop();
    if (settings.verbose > 0)
        fprintf(stderr, "stopped logger thread\n");
    stop_conn_timeout_thread();
    if (settings.verbose > 0)
        fprintf(stderr, "stopped idle timeout thread\n");

    // Close all connections then let the workers finally exit.
    if (settings.verbose > 0)
        fprintf(stderr, "closing connections\n");
    conn_close_all();
    pthread_mutex_unlock(&worker_hang_lock);
    if (settings.verbose > 0)
        fprintf(stderr, "reaping worker threads\n");
    for (i = 0; i < settings.num_threads; i++) {
        pthread_join(threads[i].thread_id, NULL);
    }

    if (settings.verbose > 0)
        fprintf(stderr, "all background threads stopped\n");

    // At this point, every background thread must be stopped.
}

/*
 * Initializes a connection queue.
 */
static void cq_init(CQ *cq) {
    pthread_mutex_init(&cq->lock, NULL);
    STAILQ_INIT(&cq->head);
    cq->cache = cache_create("cq", sizeof(CQ_ITEM), sizeof(char *));
    if (cq->cache == NULL) {
        fprintf(stderr, "Failed to create connection queue cache\n");
        exit(EXIT_FAILURE);
    }
}

/*
 * Looks for an item on a connection queue, but doesn't block if there isn't
 * one.
 * Returns the item, or NULL if no item is available
 */
static CQ_ITEM *cq_pop(CQ *cq) {
    CQ_ITEM *item;

    pthread_mutex_lock(&cq->lock);
    item = STAILQ_FIRST(&cq->head);
    if (item != NULL) {
        STAILQ_REMOVE_HEAD(&cq->head, i_next);
    }
    pthread_mutex_unlock(&cq->lock);

    return item;
}

/*
 * Adds an item to a connection queue.
 */
static void cq_push(CQ *cq, CQ_ITEM *item) {
    pthread_mutex_lock(&cq->lock);
    STAILQ_INSERT_TAIL(&cq->head, item, i_next);
    pthread_mutex_unlock(&cq->lock);
}

/*
 * Returns a fresh connection queue item.
 */
static CQ_ITEM *cqi_new(CQ *cq) {
    CQ_ITEM *item = cache_alloc(cq->cache);
    if (item == NULL) {
        STATS_LOCK();
        stats.malloc_fails++;
        STATS_UNLOCK();
    }
    return item;
}

/*
 * Frees a connection queue item (adds it to the freelist.)
 */
static void cqi_free(CQ *cq, CQ_ITEM *item) {
    cache_free(cq->cache, item);
}
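
/*
 * Lifecycle sketch for these queue helpers (illustrative; this condenses what
 * the code below already does):
 *
 *   // dispatcher side (dispatch_conn_new / notify_worker_fd):
 *   CQ_ITEM *item = cqi_new(thread->ev_queue);
 *   item->mode = queue_new_conn;
 *   notify_worker(thread, item);        // cq_push() plus a wakeup write
 *
 *   // worker side (thread_libevent_process):
 *   CQ_ITEM *item = cq_pop(me->ev_queue);
 *   // ... act on item->mode ...
 *   cqi_free(me->ev_queue, item);
 */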

// TODO: Skip notify if queue wasn't empty?
// - Requires cq_push() returning a "was empty" flag
// - Requires event handling loop to pop the entire queue and work from that
//   instead of the ev_count work there now.
// In testing this does result in a large performance uptick, but unclear how
// much that will transfer from a synthetic benchmark.
static void notify_worker(LIBEVENT_THREAD *t, CQ_ITEM *item) {
    cq_push(t->ev_queue, item);
#ifdef HAVE_EVENTFD
    uint64_t u = 1;
    if (write(t->n.notify_event_fd, &u, sizeof(uint64_t)) != sizeof(uint64_t)) {
        perror("failed writing to worker eventfd");
        /* TODO: This is a fatal problem. Can it ever happen temporarily? */
    }
#else
    char buf[1] = "c";
    if (write(t->n.notify_send_fd, buf, 1) != 1) {
        perror("Failed writing to notify pipe");
        /* TODO: This is a fatal problem. Can it ever happen temporarily? */
    }
#endif
}

// NOTE: An external func that takes a conn *c might be cleaner overall.
static void notify_worker_fd(LIBEVENT_THREAD *t, int sfd, enum conn_queue_item_modes mode) {
    CQ_ITEM *item;
    while ( (item = cqi_new(t->ev_queue)) == NULL ) {
        // NOTE: most callers of this function cannot fail, but mallocs in
        // theory can fail. Small mallocs essentially never do without also
        // killing the process. Syscalls can also fail but the original code
        // never handled this either.
        // As a compromise, I'm leaving this note and this loop: This alloc
        // cannot fail, but pre-allocating the data is too much code in an
        // area I want to keep more lean. If this CQ business becomes a more
        // generic queue I'll reconsider.
    }

    item->mode = mode;
    item->sfd = sfd;
    notify_worker(t, item);
}

/*
 * Creates a worker thread.
 */
static void create_worker(void *(*func)(void *), void *arg) {
    pthread_attr_t attr;
    int ret;

    pthread_attr_init(&attr);

    if ((ret = pthread_create(&((LIBEVENT_THREAD*)arg)->thread_id, &attr, func, arg)) != 0) {
        fprintf(stderr, "Can't create thread: %s\n",
                strerror(ret));
        exit(1);
    }

    thread_setname(((LIBEVENT_THREAD*)arg)->thread_id, "mc-worker");
}

/*
 * Sets whether or not we accept new connections.
 */
void accept_new_conns(const bool do_accept) {
    pthread_mutex_lock(&conn_lock);
    do_accept_new_conns(do_accept);
    pthread_mutex_unlock(&conn_lock);
}
/****************************** LIBEVENT THREADS *****************************/

static void setup_thread_notify(LIBEVENT_THREAD *me, struct thread_notify *tn,
        void (*cb)(int, short, void *)) {
#ifdef HAVE_EVENTFD
    event_set(&tn->notify_event, tn->notify_event_fd,
              EV_READ | EV_PERSIST, cb, me);
#else
    event_set(&tn->notify_event, tn->notify_receive_fd,
              EV_READ | EV_PERSIST, cb, me);
#endif
    event_base_set(me->base, &tn->notify_event);

    if (event_add(&tn->notify_event, 0) == -1) {
        fprintf(stderr, "Can't monitor libevent notify pipe\n");
        exit(1);
    }
}

/*
 * Set up a thread's information.
 */
static void setup_thread(LIBEVENT_THREAD *me) {
#if defined(LIBEVENT_VERSION_NUMBER) && LIBEVENT_VERSION_NUMBER >= 0x02000101
    struct event_config *ev_config;
    ev_config = event_config_new();
    event_config_set_flag(ev_config, EVENT_BASE_FLAG_NOLOCK);
    me->base = event_base_new_with_config(ev_config);
    event_config_free(ev_config);
#else
    me->base = event_init();
#endif

    if (! me->base) {
        fprintf(stderr, "Can't allocate event base\n");
        exit(1);
    }

    /* Listen for notifications from other threads */
    setup_thread_notify(me, &me->n, thread_libevent_process);
    setup_thread_notify(me, &me->ion, thread_libevent_ionotify);
    pthread_mutex_init(&me->ion_lock, NULL);
    STAILQ_INIT(&me->ion_head);

    me->ev_queue = malloc(sizeof(struct conn_queue));
    if (me->ev_queue == NULL) {
        perror("Failed to allocate memory for connection queue");
        exit(EXIT_FAILURE);
    }
    cq_init(me->ev_queue);

    if (pthread_mutex_init(&me->stats.mutex, NULL) != 0) {
        perror("Failed to initialize mutex");
        exit(EXIT_FAILURE);
    }

    me->rbuf_cache = cache_create("rbuf", READ_BUFFER_SIZE, sizeof(char *));
    if (me->rbuf_cache == NULL) {
        fprintf(stderr, "Failed to create read buffer cache\n");
        exit(EXIT_FAILURE);
    }
    // Note: we were cleanly passing in num_threads before, but this now
    // relies on settings globals too much.
    if (settings.read_buf_mem_limit) {
        int limit = settings.read_buf_mem_limit / settings.num_threads;
        if (limit < READ_BUFFER_SIZE) {
            limit = 1;
        } else {
            limit = limit / READ_BUFFER_SIZE;
        }
        cache_set_limit(me->rbuf_cache, limit);
    }
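
    /* Worked example (illustrative; assumes READ_BUFFER_SIZE is the 16KB
     * default from memcached.h): a read_buf_mem_limit of 64MB split across 4
     * worker threads gives 16MB per thread, i.e. a cap of 1024 cached read
     * buffers each. A per-thread share smaller than one buffer degrades to a
     * cap of a single buffer. */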

    me->io_cache = cache_create("io", sizeof(io_pending_t), sizeof(char*));
    if (me->io_cache == NULL) {
        fprintf(stderr, "Failed to create IO object cache\n");
        exit(EXIT_FAILURE);
    }
#ifdef TLS
    if (settings.ssl_enabled) {
        me->ssl_wbuf = (char *)malloc((size_t)settings.ssl_wbuf_size);
        if (me->ssl_wbuf == NULL) {
            fprintf(stderr, "Failed to allocate the SSL write buffer\n");
            exit(EXIT_FAILURE);
        }
    }
#endif
#ifdef EXTSTORE
    // me->storage is set just before this function is called.
    if (me->storage) {
        thread_io_queue_add(me, IO_QUEUE_EXTSTORE, me->storage,
                storage_submit_cb);
    }
#endif
#ifdef PROXY
    thread_io_queue_add(me, IO_QUEUE_PROXY, settings.proxy_ctx, proxy_submit_cb);

    // TODO: maybe register hooks to be called here from sub-packages? ie;
    // extstore, TLS, proxy.
    if (settings.proxy_enabled) {
        proxy_thread_init(settings.proxy_ctx, me);
    }
#endif
    thread_io_queue_add(me, IO_QUEUE_NONE, NULL, NULL);
}

/*
 * Worker thread: main event loop
 */
static void *worker_libevent(void *arg) {
    LIBEVENT_THREAD *me = arg;

    /* Any per-thread setup can happen here; memcached_thread_init() will block until
     * all threads have finished initializing.
     */
    me->l = logger_create();
    me->lru_bump_buf = item_lru_bump_buf_create();
    if (me->l == NULL || me->lru_bump_buf == NULL) {
        abort();
    }

    if (settings.drop_privileges) {
        drop_worker_privileges();
    }

    register_thread_initialized();
#ifdef PROXY
    while (!event_base_got_exit(me->base)) {
        event_base_loop(me->base, EVLOOP_ONCE);
        if (me->proxy_ctx) {
            proxy_gc_poke(me);
        }
    }
#else
    event_base_loop(me->base, 0);
#endif
    // same mechanism used to watch for all threads exiting.
    register_thread_initialized();

    event_base_free(me->base);
    return NULL;
}

// Syscalls can be expensive enough that handling a few of them once here can
// save both throughput and overall latency.
#define MAX_PIPE_EVENTS 32
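
// Illustrative trace of that batching: if 40 connections were dispatched to a
// worker since its last wakeup, the pipe read below drains at most
// MAX_PIPE_EVENTS (32) notification bytes, the loop handles that many queue
// items, and the bytes still left in the pipe re-trigger the event for a
// second pass. With eventfd the full counter is drained in a single read.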

// dedicated worker thread notify system for IO objects.
static void thread_libevent_ionotify(evutil_socket_t fd, short which, void *arg) {
    LIBEVENT_THREAD *me = arg;
    uint64_t ev_count = 0;
    iop_head_t head;

    STAILQ_INIT(&head);
#ifdef HAVE_EVENTFD
    if (read(fd, &ev_count, sizeof(uint64_t)) != sizeof(uint64_t)) {
        if (settings.verbose > 0)
            fprintf(stderr, "Can't read from libevent pipe\n");
        return;
    }
#else
    char buf[MAX_PIPE_EVENTS];

    ev_count = read(fd, buf, MAX_PIPE_EVENTS);
    if (ev_count == 0) {
        if (settings.verbose > 0)
            fprintf(stderr, "Can't read from libevent pipe\n");
        return;
    }
#endif

    // pull entire queue and zero the thread head.
    // need to do this after reading a syscall as we are only guaranteed to
    // get syscalls if the queue is empty.
    pthread_mutex_lock(&me->ion_lock);
    STAILQ_CONCAT(&head, &me->ion_head);
    pthread_mutex_unlock(&me->ion_lock);

    while (!STAILQ_EMPTY(&head)) {
        io_pending_t *io = STAILQ_FIRST(&head);
        STAILQ_REMOVE_HEAD(&head, iop_next);
        conn_io_queue_return(io);
    }
}

/*
 * Processes an incoming "connection event" item. This is called when
 * input arrives on the libevent wakeup pipe.
 */
static void thread_libevent_process(evutil_socket_t fd, short which, void *arg) {
    LIBEVENT_THREAD *me = arg;
    CQ_ITEM *item;
    conn *c;
    uint64_t ev_count = 0; // max number of events to loop through this run.
#ifdef HAVE_EVENTFD
    // NOTE: unlike pipe we aren't limiting the number of events per read.
    // However we do limit the number of queue pulls to what the count was at
    // the time of this function firing.
    if (read(fd, &ev_count, sizeof(uint64_t)) != sizeof(uint64_t)) {
        if (settings.verbose > 0)
            fprintf(stderr, "Can't read from libevent pipe\n");
        return;
    }
#else
    char buf[MAX_PIPE_EVENTS];

    ev_count = read(fd, buf, MAX_PIPE_EVENTS);
    if (ev_count == 0) {
        if (settings.verbose > 0)
            fprintf(stderr, "Can't read from libevent pipe\n");
        return;
    }
#endif

    for (int x = 0; x < ev_count; x++) {
        item = cq_pop(me->ev_queue);
        if (item == NULL) {
            return;
        }

        switch (item->mode) {
            case queue_new_conn:
                c = conn_new(item->sfd, item->init_state, item->event_flags,
                             item->read_buffer_size, item->transport,
                             me->base, item->ssl, item->conntag, item->bproto);
                if (c == NULL) {
                    if (IS_UDP(item->transport)) {
                        fprintf(stderr, "Can't listen for events on UDP socket\n");
                        exit(1);
                    } else {
                        if (settings.verbose > 0) {
                            fprintf(stderr, "Can't listen for events on fd %d\n",
                                    item->sfd);
                        }
                        if (item->ssl) {
                            ssl_conn_close(item->ssl);
                            item->ssl = NULL;
                        }
                        close(item->sfd);
                    }
                } else {
                    c->thread = me;
                    conn_io_queue_setup(c);
#ifdef TLS
                    if (settings.ssl_enabled && c->ssl != NULL) {
                        assert(c->thread && c->thread->ssl_wbuf);
                        c->ssl_wbuf = c->thread->ssl_wbuf;
                    }
#endif
                }
                break;
            case queue_pause:
                /* we were told to pause and report in */
                register_thread_initialized();
                break;
            case queue_timeout:
                /* a client socket timed out */
                conn_close_idle(conns[item->sfd]);
                break;
            case queue_redispatch:
                /* a side thread redispatched a client connection */
                conn_worker_readd(conns[item->sfd]);
                break;
            case queue_stop:
                /* asked to stop */
                event_base_loopexit(me->base, NULL);
                break;
#ifdef PROXY
            case queue_proxy_reload:
                proxy_worker_reload(settings.proxy_ctx, me);
                break;
#endif
        }

        cqi_free(me->ev_queue, item);
    }
}

// Interface is slightly different on various platforms.
// On linux, at least, the len limit is 16 bytes.
#define THR_NAME_MAXLEN 16
void thread_setname(pthread_t thread, const char *name) {
    assert(strlen(name) < THR_NAME_MAXLEN);
#if defined(__linux__) && defined(HAVE_PTHREAD_SETNAME_NP)
    pthread_setname_np(thread, name);
#endif
}
#undef THR_NAME_MAXLEN

// NOTE: need better encapsulation.
// used by the proxy module to iterate the worker threads.
LIBEVENT_THREAD *get_worker_thread(int id) {
    return &threads[id];
}

/* Which thread we assigned a connection to most recently. */
static int last_thread = -1;

/* Last thread we assigned to a connection based on napi_id */
static int last_thread_by_napi_id = -1;

static LIBEVENT_THREAD *select_thread_round_robin(void)
{
    int tid = (last_thread + 1) % settings.num_threads;

    last_thread = tid;

    return threads + tid;
}

static void reset_threads_napi_id(void)
{
    LIBEVENT_THREAD *thread;
    int i;

    for (i = 0; i < settings.num_threads; i++) {
        thread = threads + i;
        thread->napi_id = 0;
    }

    last_thread_by_napi_id = -1;
}

/* Select a worker thread based on the NAPI ID of an incoming connection
 * request. NAPI ID is a globally unique ID that identifies a NIC RX queue
 * on which a flow is received.
 */
static LIBEVENT_THREAD *select_thread_by_napi_id(int sfd)
{
    LIBEVENT_THREAD *thread;
    int napi_id, err, i;
    socklen_t len;
    int tid = -1;

    len = sizeof(socklen_t);
    err = getsockopt(sfd, SOL_SOCKET, SO_INCOMING_NAPI_ID, &napi_id, &len);
    if ((err == -1) || (napi_id == 0)) {
        STATS_LOCK();
        stats.round_robin_fallback++;
        STATS_UNLOCK();
        return select_thread_round_robin();
    }

select:
    for (i = 0; i < settings.num_threads; i++) {
        thread = threads + i;
        if (last_thread_by_napi_id < i) {
            thread->napi_id = napi_id;
            last_thread_by_napi_id = i;
            tid = i;
            break;
        }
        if (thread->napi_id == napi_id) {
            tid = i;
            break;
        }
    }

    if (tid == -1) {
        STATS_LOCK();
        stats.unexpected_napi_ids++;
        STATS_UNLOCK();
        reset_threads_napi_id();
        goto select;
    }

    return threads + tid;
}
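
/*
 * Illustrative trace of the mapping above (4 worker threads, no NAPI IDs seen
 * yet): flows arriving with NAPI IDs 3, 7, 3 land on threads[0], threads[1],
 * and threads[0] again. Once all four slots hold distinct IDs, a fifth
 * distinct ID bumps unexpected_napi_ids, resets the per-thread IDs, and
 * restarts the scan from threads[0].
 */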

/*
 * Dispatches a new connection to another thread. This is only ever called
 * from the main thread, either during initialization (for UDP) or because
 * of an incoming connection.
 */
void dispatch_conn_new(int sfd, enum conn_states init_state, int event_flags,
                       int read_buffer_size, enum network_transport transport, void *ssl,
                       uint64_t conntag, enum protocol bproto) {
    CQ_ITEM *item = NULL;
    LIBEVENT_THREAD *thread;

    if (!settings.num_napi_ids)
        thread = select_thread_round_robin();
    else
        thread = select_thread_by_napi_id(sfd);

    item = cqi_new(thread->ev_queue);
    if (item == NULL) {
        close(sfd);
        /* given that malloc failed this may also fail, but let's try */
        fprintf(stderr, "Failed to allocate memory for connection object\n");
        return;
    }

    item->sfd = sfd;
    item->init_state = init_state;
    item->event_flags = event_flags;
    item->read_buffer_size = read_buffer_size;
    item->transport = transport;
    item->mode = queue_new_conn;
    item->ssl = ssl;
    item->conntag = conntag;
    item->bproto = bproto;

    MEMCACHED_CONN_DISPATCH(sfd, (int64_t)thread->thread_id);
    notify_worker(thread, item);
}
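
/*
 * Illustrative caller (a sketch only; the real accept path lives in
 * memcached.c and passes its own buffer size, connection tag and SSL handle):
 *
 *   dispatch_conn_new(sfd, conn_new_cmd, EV_READ | EV_PERSIST,
 *                     READ_BUFFER_SIZE, tcp_transport, NULL, 0,
 *                     settings.binding_protocol);
 */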

/*
 * Re-dispatches a connection back to the original thread. Can be called from
 * any side thread borrowing a connection.
 */
void redispatch_conn(conn *c) {
    notify_worker_fd(c->thread, c->sfd, queue_redispatch);
}

void timeout_conn(conn *c) {
    notify_worker_fd(c->thread, c->sfd, queue_timeout);
}
#ifdef PROXY
void proxy_reload_notify(LIBEVENT_THREAD *t) {
    notify_worker_fd(t, 0, queue_proxy_reload);
}
#endif

void return_io_pending(io_pending_t *io) {
    bool do_notify = false;
    LIBEVENT_THREAD *t = io->thread;
    pthread_mutex_lock(&t->ion_lock);
    if (STAILQ_EMPTY(&t->ion_head)) {
        do_notify = true;
    }
    STAILQ_INSERT_TAIL(&t->ion_head, io, iop_next);
    pthread_mutex_unlock(&t->ion_lock);

    // skip the syscall if there was already data in the queue, as it's
    // already been notified.
    if (do_notify) {
#ifdef HAVE_EVENTFD
        uint64_t u = 1;
        if (write(t->ion.notify_event_fd, &u, sizeof(uint64_t)) != sizeof(uint64_t)) {
            perror("failed writing to worker eventfd");
            /* TODO: This is a fatal problem. Can it ever happen temporarily? */
        }
#else
        char buf[1] = "c";
        if (write(t->ion.notify_send_fd, buf, 1) != 1) {
            perror("Failed writing to notify pipe");
            /* TODO: This is a fatal problem. Can it ever happen temporarily? */
        }
#endif
    }
}

/* This misses the allow_new_conns flag :( */
void sidethread_conn_close(conn *c) {
    if (settings.verbose > 1)
        fprintf(stderr, "<%d connection closing from side thread.\n", c->sfd);

    c->state = conn_closing;
    // redispatch will see closing flag and properly close connection.
    redispatch_conn(c);
    return;
}

/********************************* ITEM ACCESS *******************************/

/*
 * Allocates a new item.
 */
item *item_alloc(const char *key, size_t nkey, client_flags_t flags, rel_time_t exptime, int nbytes) {
    item *it;
    /* do_item_alloc handles its own locks */
    it = do_item_alloc(key, nkey, flags, exptime, nbytes);
    return it;
}

/*
 * Returns an item if it hasn't been marked as expired,
 * lazy-expiring as needed.
 */
item *item_get(const char *key, const size_t nkey, LIBEVENT_THREAD *t, const bool do_update) {
    item *it;
    uint32_t hv;
    hv = hash(key, nkey);
    item_lock(hv);
    it = do_item_get(key, nkey, hv, t, do_update);
    item_unlock(hv);
    return it;
}

// returns an item with the item lock held.
// lock will still be held even if return is NULL, allowing caller to replace
// an item atomically if desired.
item *item_get_locked(const char *key, const size_t nkey, LIBEVENT_THREAD *t, const bool do_update, uint32_t *hv) {
    item *it;
    *hv = hash(key, nkey);
    item_lock(*hv);
    it = do_item_get(key, nkey, *hv, t, do_update);
    return it;
}

item *item_touch(const char *key, size_t nkey, uint32_t exptime, LIBEVENT_THREAD *t) {
    item *it;
    uint32_t hv;
    hv = hash(key, nkey);
    item_lock(hv);
    it = do_item_touch(key, nkey, exptime, hv, t);
    item_unlock(hv);
    return it;
}

/*
 * Decrements the reference count on an item and adds it to the freelist if
 * needed.
 */
void item_remove(item *item) {
    uint32_t hv;
    hv = hash(ITEM_key(item), item->nkey);

    item_lock(hv);
    do_item_remove(item);
    item_unlock(hv);
}

/*
 * Replaces one item with another in the hashtable.
 * Unprotected by a mutex lock since the core server does not require
 * it to be thread-safe.
 */
int item_replace(item *old_it, item *new_it, const uint32_t hv, const uint64_t cas_in) {
    return do_item_replace(old_it, new_it, hv, cas_in);
}

/*
 * Unlinks an item from the LRU and hashtable.
 */
void item_unlink(item *item) {
    uint32_t hv;
    hv = hash(ITEM_key(item), item->nkey);
    item_lock(hv);
    do_item_unlink(item, hv);
    item_unlock(hv);
}

/*
 * Does arithmetic on a numeric item value.
 */
enum delta_result_type add_delta(LIBEVENT_THREAD *t, const char *key,
                                 const size_t nkey, bool incr,
                                 const int64_t delta, char *buf,
                                 uint64_t *cas) {
    enum delta_result_type ret;
    uint32_t hv;

    hv = hash(key, nkey);
    item_lock(hv);
    ret = do_add_delta(t, key, nkey, incr, delta, buf, cas, hv, NULL);
    item_unlock(hv);
    return ret;
}

/*
 * Stores an item in the cache (high level, obeys set/add/replace semantics)
 */
enum store_item_type store_item(item *item, int comm, LIBEVENT_THREAD *t, int *nbytes, uint64_t *cas, const uint64_t cas_in, bool cas_stale) {
    enum store_item_type ret;
    uint32_t hv;

    hv = hash(ITEM_key(item), item->nkey);
    item_lock(hv);
    ret = do_store_item(item, comm, t, hv, nbytes, cas, cas_in, cas_stale);
    item_unlock(hv);
    return ret;
}

/******************************* GLOBAL STATS ******************************/

void STATS_LOCK(void) {
    pthread_mutex_lock(&stats_lock);
}

void STATS_UNLOCK(void) {
    pthread_mutex_unlock(&stats_lock);
}

void threadlocal_stats_reset(void) {
    int ii;
    for (ii = 0; ii < settings.num_threads; ++ii) {
        pthread_mutex_lock(&threads[ii].stats.mutex);
#define X(name) threads[ii].stats.name = 0;
        THREAD_STATS_FIELDS
#ifdef EXTSTORE
        EXTSTORE_THREAD_STATS_FIELDS
#endif
#ifdef PROXY
        PROXY_THREAD_STATS_FIELDS
#endif
#undef X

        memset(&threads[ii].stats.slab_stats, 0,
               sizeof(threads[ii].stats.slab_stats));
        memset(&threads[ii].stats.lru_hits, 0,
               sizeof(uint64_t) * POWER_LARGEST);

        pthread_mutex_unlock(&threads[ii].stats.mutex);
    }
}

void threadlocal_stats_aggregate(struct thread_stats *stats) {
    int ii, sid;

    /* The struct has a mutex, but we can safely set the whole thing
     * to zero since it is unused when aggregating. */
    memset(stats, 0, sizeof(*stats));

    for (ii = 0; ii < settings.num_threads; ++ii) {
        pthread_mutex_lock(&threads[ii].stats.mutex);
#define X(name) stats->name += threads[ii].stats.name;
        THREAD_STATS_FIELDS
#ifdef EXTSTORE
        EXTSTORE_THREAD_STATS_FIELDS
#endif
#ifdef PROXY
        PROXY_THREAD_STATS_FIELDS
#endif
#undef X

        for (sid = 0; sid < MAX_NUMBER_OF_SLAB_CLASSES; sid++) {
#define X(name) stats->slab_stats[sid].name += \
            threads[ii].stats.slab_stats[sid].name;
            SLAB_STATS_FIELDS
#undef X
        }

        for (sid = 0; sid < POWER_LARGEST; sid++) {
            stats->lru_hits[sid] +=
                threads[ii].stats.lru_hits[sid];
            stats->slab_stats[CLEAR_LRU(sid)].get_hits +=
                threads[ii].stats.lru_hits[sid];
        }

        stats->read_buf_count += threads[ii].rbuf_cache->total;
        stats->read_buf_bytes += threads[ii].rbuf_cache->total * READ_BUFFER_SIZE;
        stats->read_buf_bytes_free += threads[ii].rbuf_cache->freecurr * READ_BUFFER_SIZE;
        pthread_mutex_unlock(&threads[ii].stats.mutex);
    }
}

void slab_stats_aggregate(struct thread_stats *stats, struct slab_stats *out) {
    int sid;

    memset(out, 0, sizeof(*out));

    for (sid = 0; sid < MAX_NUMBER_OF_SLAB_CLASSES; sid++) {
#define X(name) out->name += stats->slab_stats[sid].name;
        SLAB_STATS_FIELDS
#undef X
    }
}

static void memcached_thread_notify_init(struct thread_notify *tn) {
#ifdef HAVE_EVENTFD
    tn->notify_event_fd = eventfd(0, EFD_NONBLOCK);
    if (tn->notify_event_fd == -1) {
        perror("failed creating eventfd for worker thread");
        exit(1);
    }
#else
    int fds[2];
    if (pipe(fds)) {
        perror("Can't create notify pipe");
        exit(1);
    }

    tn->notify_receive_fd = fds[0];
    tn->notify_send_fd = fds[1];
#endif
}

/*
 * Initializes the thread subsystem, creating various worker threads.
 *
 * nthreads  Number of worker event handler threads to spawn
 */
void memcached_thread_init(int nthreads, void *arg) {
    int i;
    int power;

    for (i = 0; i < POWER_LARGEST; i++) {
        pthread_mutex_init(&lru_locks[i], NULL);
    }
    pthread_mutex_init(&worker_hang_lock, NULL);

    pthread_mutex_init(&init_lock, NULL);
    pthread_cond_init(&init_cond, NULL);

    /* Want a wide lock table, but don't waste memory */
    if (nthreads < 3) {
        power = 10;
    } else if (nthreads < 4) {
        power = 11;
    } else if (nthreads < 5) {
        power = 12;
    } else if (nthreads <= 10) {
        power = 13;
    } else if (nthreads <= 20) {
        power = 14;
    } else {
        /* 32k buckets. just under the hashpower default. */
        power = 15;
    }

    if (power >= hashpower) {
        fprintf(stderr, "Hash table power size (%d) cannot be equal to or less than item lock table (%d)\n", hashpower, power);
        fprintf(stderr, "Item lock table grows with `-t N` (worker threadcount)\n");
        fprintf(stderr, "Hash table grows with `-o hashpower=N` \n");
        exit(1);
    }

    item_lock_count = hashsize(power);
    item_lock_hashpower = power;

    item_locks = calloc(item_lock_count, sizeof(pthread_mutex_t));
    if (! item_locks) {
        perror("Can't allocate item locks");
        exit(1);
    }
    for (i = 0; i < item_lock_count; i++) {
        pthread_mutex_init(&item_locks[i], NULL);
    }

    threads = calloc(nthreads, sizeof(LIBEVENT_THREAD));
    if (! threads) {
        perror("Can't allocate thread descriptors");
        exit(1);
    }

    for (i = 0; i < nthreads; i++) {
        memcached_thread_notify_init(&threads[i].n);
        memcached_thread_notify_init(&threads[i].ion);
#ifdef EXTSTORE
        threads[i].storage = arg;
#endif
        threads[i].thread_baseid = i;
        setup_thread(&threads[i]);
        /* Reserve three fds for the libevent base, and two for the pipe */
        stats_state.reserved_fds += 5;
    }

    /* Create threads after we've done all the libevent setup. */
    for (i = 0; i < nthreads; i++) {
        create_worker(worker_libevent, &threads[i]);
    }

    /* Wait for all the threads to set themselves up before returning. */
    pthread_mutex_lock(&init_lock);
    wait_for_thread_registration(nthreads);
    pthread_mutex_unlock(&init_lock);
}
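
/*
 * Illustrative startup call (a sketch; memcached's main() performs this once
 * after option parsing, with arg carrying the extstore handle when EXTSTORE
 * is compiled in and NULL otherwise):
 *
 *   memcached_thread_init(settings.num_threads, storage);
 */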