xref: /memcached-1.4.29/thread.c (revision 690a9a9d)
1 /* -*- Mode: C; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2 /*
3  * Thread management for memcached.
4  */
5 #include "memcached.h"
6 #include <assert.h>
7 #include <stdio.h>
8 #include <errno.h>
9 #include <stdlib.h>
10 #include <string.h>
11 #include <pthread.h>
12 
13 #ifdef __sun
14 #include <atomic.h>
15 #endif
16 
/* Number of CQ_ITEM structs that cqi_new() allocates in a single malloc(). */
#define ITEMS_PER_ALLOC 64
18 
19 /* An item in the connection queue. */
20 typedef struct conn_queue_item CQ_ITEM;
21 struct conn_queue_item {
22     int               sfd;
23     enum conn_states  init_state;
24     int               event_flags;
25     int               read_buffer_size;
26     enum network_transport     transport;
27     CQ_ITEM          *next;
28 };
29 
30 /* A connection queue. */
31 typedef struct conn_queue CQ;
32 struct conn_queue {
33     CQ_ITEM *head;
34     CQ_ITEM *tail;
35     pthread_mutex_t lock;
36 };
37 
/* Locks for cache LRU operations; initialized in memcached_thread_init(). */
pthread_mutex_t lru_locks[POWER_LARGEST];

/* Connection lock around accepting new connections (see accept_new_conns()). */
pthread_mutex_t conn_lock = PTHREAD_MUTEX_INITIALIZER;

/*
 * Fallback lock used by refcount_incr()/refcount_decr() when neither GCC
 * atomics nor the Solaris atomic routines are available.
 */
#if !defined(HAVE_GCC_ATOMICS) && !defined(__sun)
pthread_mutex_t atomics_mutex = PTHREAD_MUTEX_INITIALIZER;
#endif

/* Lock for global stats; taken and released via STATS_LOCK()/STATS_UNLOCK(). */
static pthread_mutex_t stats_lock = PTHREAD_MUTEX_INITIALIZER;

/* Lock to cause worker threads to hang up after being woken */
static pthread_mutex_t worker_hang_lock;

/* Free list of CQ_ITEM structs, and the lock that guards it. */
static CQ_ITEM *cqi_freelist;
static pthread_mutex_t cqi_freelist_lock;

/*
 * Table of item locks, allocated in memcached_thread_init() and indexed by
 * (hash value & hashmask(item_lock_hashpower)).
 */
static pthread_mutex_t *item_locks;
/* size of the item lock hash table */
static uint32_t item_lock_count;
/* power of two used for the item lock table size: the table has hashsize() of this many locks */
unsigned int item_lock_hashpower;
/* hashsize(n) is 2^n; hashmask(n) is the matching low-bit mask. */
#define hashsize(n) ((unsigned long int)1<<(n))
#define hashmask(n) (hashsize(n)-1)

/*
 * Descriptor for the thread that called memcached_thread_init();
 * is_listen_thread() compares the calling thread against it.
 */
static LIBEVENT_DISPATCHER_THREAD dispatcher_thread;

/*
 * Each libevent instance has a wakeup pipe, which other threads
 * can use to signal that they've put a new connection on its queue.
 */
static LIBEVENT_THREAD *threads;

/*
 * Number of worker threads that have finished setting themselves up.
 * Guarded by init_lock; init_cond is signalled each time it is incremented.
 */
static int init_count = 0;
static pthread_mutex_t init_lock;
static pthread_cond_t init_cond;


/* Forward declaration: setup_thread() registers this as the pipe callback. */
static void thread_libevent_process(int fd, short which, void *arg);
82 
refcount_incr(unsigned short * refcount)83 unsigned short refcount_incr(unsigned short *refcount) {
84 #ifdef HAVE_GCC_ATOMICS
85     return __sync_add_and_fetch(refcount, 1);
86 #elif defined(__sun)
87     return atomic_inc_ushort_nv(refcount);
88 #else
89     unsigned short res;
90     mutex_lock(&atomics_mutex);
91     (*refcount)++;
92     res = *refcount;
93     mutex_unlock(&atomics_mutex);
94     return res;
95 #endif
96 }
97 
refcount_decr(unsigned short * refcount)98 unsigned short refcount_decr(unsigned short *refcount) {
99 #ifdef HAVE_GCC_ATOMICS
100     return __sync_sub_and_fetch(refcount, 1);
101 #elif defined(__sun)
102     return atomic_dec_ushort_nv(refcount);
103 #else
104     unsigned short res;
105     mutex_lock(&atomics_mutex);
106     (*refcount)--;
107     res = *refcount;
108     mutex_unlock(&atomics_mutex);
109     return res;
110 #endif
111 }
112 
/* item_lock() must be held for an item before any modifications to either its
 * associated hash bucket, or the structure itself.
 * LRU modifications must hold the item lock, and the LRU lock.
 * LRUs accessing items must item_trylock() before modifying an item.
 * Items accessible from an LRU must not be freed or modified
 * without first locking and removing from the LRU.
 */
120 
item_lock(uint32_t hv)121 void item_lock(uint32_t hv) {
122     mutex_lock(&item_locks[hv & hashmask(item_lock_hashpower)]);
123 }
124 
item_trylock(uint32_t hv)125 void *item_trylock(uint32_t hv) {
126     pthread_mutex_t *lock = &item_locks[hv & hashmask(item_lock_hashpower)];
127     if (pthread_mutex_trylock(lock) == 0) {
128         return lock;
129     }
130     return NULL;
131 }
132 
/*
 * Releases a lock previously returned by item_trylock(). `lock` is treated
 * as a pthread_mutex_t pointer.
 */
void item_trylock_unlock(void *lock) {
    pthread_mutex_t *held = lock;

    mutex_unlock(held);
}
136 
item_unlock(uint32_t hv)137 void item_unlock(uint32_t hv) {
138     mutex_unlock(&item_locks[hv & hashmask(item_lock_hashpower)]);
139 }
140 
wait_for_thread_registration(int nthreads)141 static void wait_for_thread_registration(int nthreads) {
142     while (init_count < nthreads) {
143         pthread_cond_wait(&init_cond, &init_lock);
144     }
145 }
146 
register_thread_initialized(void)147 static void register_thread_initialized(void) {
148     pthread_mutex_lock(&init_lock);
149     init_count++;
150     pthread_cond_signal(&init_cond);
151     pthread_mutex_unlock(&init_lock);
152     /* Force worker threads to pile up if someone wants us to */
153     pthread_mutex_lock(&worker_hang_lock);
154     pthread_mutex_unlock(&worker_hang_lock);
155 }
156 
157 /* Must not be called with any deeper locks held */
pause_threads(enum pause_thread_types type)158 void pause_threads(enum pause_thread_types type) {
159     char buf[1];
160     int i;
161 
162     buf[0] = 0;
163     switch (type) {
164         case PAUSE_ALL_THREADS:
165             slabs_rebalancer_pause();
166             lru_crawler_pause();
167             lru_maintainer_pause();
168         case PAUSE_WORKER_THREADS:
169             buf[0] = 'p';
170             pthread_mutex_lock(&worker_hang_lock);
171             break;
172         case RESUME_ALL_THREADS:
173             slabs_rebalancer_resume();
174             lru_crawler_resume();
175             lru_maintainer_resume();
176         case RESUME_WORKER_THREADS:
177             pthread_mutex_unlock(&worker_hang_lock);
178             break;
179         default:
180             fprintf(stderr, "Unknown lock type: %d\n", type);
181             assert(1 == 0);
182             break;
183     }
184 
185     /* Only send a message if we have one. */
186     if (buf[0] == 0) {
187         return;
188     }
189 
190     pthread_mutex_lock(&init_lock);
191     init_count = 0;
192     for (i = 0; i < settings.num_threads; i++) {
193         if (write(threads[i].notify_send_fd, buf, 1) != 1) {
194             perror("Failed writing to notify pipe");
195             /* TODO: This is a fatal problem. Can it ever happen temporarily? */
196         }
197     }
198     wait_for_thread_registration(settings.num_threads);
199     pthread_mutex_unlock(&init_lock);
200 }
201 
202 /*
203  * Initializes a connection queue.
204  */
cq_init(CQ * cq)205 static void cq_init(CQ *cq) {
206     pthread_mutex_init(&cq->lock, NULL);
207     cq->head = NULL;
208     cq->tail = NULL;
209 }
210 
211 /*
212  * Looks for an item on a connection queue, but doesn't block if there isn't
213  * one.
214  * Returns the item, or NULL if no item is available
215  */
cq_pop(CQ * cq)216 static CQ_ITEM *cq_pop(CQ *cq) {
217     CQ_ITEM *item;
218 
219     pthread_mutex_lock(&cq->lock);
220     item = cq->head;
221     if (NULL != item) {
222         cq->head = item->next;
223         if (NULL == cq->head)
224             cq->tail = NULL;
225     }
226     pthread_mutex_unlock(&cq->lock);
227 
228     return item;
229 }
230 
231 /*
232  * Adds an item to a connection queue.
233  */
cq_push(CQ * cq,CQ_ITEM * item)234 static void cq_push(CQ *cq, CQ_ITEM *item) {
235     item->next = NULL;
236 
237     pthread_mutex_lock(&cq->lock);
238     if (NULL == cq->tail)
239         cq->head = item;
240     else
241         cq->tail->next = item;
242     cq->tail = item;
243     pthread_mutex_unlock(&cq->lock);
244 }
245 
246 /*
247  * Returns a fresh connection queue item.
248  */
cqi_new(void)249 static CQ_ITEM *cqi_new(void) {
250     CQ_ITEM *item = NULL;
251     pthread_mutex_lock(&cqi_freelist_lock);
252     if (cqi_freelist) {
253         item = cqi_freelist;
254         cqi_freelist = item->next;
255     }
256     pthread_mutex_unlock(&cqi_freelist_lock);
257 
258     if (NULL == item) {
259         int i;
260 
261         /* Allocate a bunch of items at once to reduce fragmentation */
262         item = malloc(sizeof(CQ_ITEM) * ITEMS_PER_ALLOC);
263         if (NULL == item) {
264             STATS_LOCK();
265             stats.malloc_fails++;
266             STATS_UNLOCK();
267             return NULL;
268         }
269 
270         /*
271          * Link together all the new items except the first one
272          * (which we'll return to the caller) for placement on
273          * the freelist.
274          */
275         for (i = 2; i < ITEMS_PER_ALLOC; i++)
276             item[i - 1].next = &item[i];
277 
278         pthread_mutex_lock(&cqi_freelist_lock);
279         item[ITEMS_PER_ALLOC - 1].next = cqi_freelist;
280         cqi_freelist = &item[1];
281         pthread_mutex_unlock(&cqi_freelist_lock);
282     }
283 
284     return item;
285 }
286 
287 
288 /*
289  * Frees a connection queue item (adds it to the freelist.)
290  */
cqi_free(CQ_ITEM * item)291 static void cqi_free(CQ_ITEM *item) {
292     pthread_mutex_lock(&cqi_freelist_lock);
293     item->next = cqi_freelist;
294     cqi_freelist = item;
295     pthread_mutex_unlock(&cqi_freelist_lock);
296 }
297 
298 
299 /*
300  * Creates a worker thread.
301  */
create_worker(void * (* func)(void *),void * arg)302 static void create_worker(void *(*func)(void *), void *arg) {
303     pthread_attr_t  attr;
304     int             ret;
305 
306     pthread_attr_init(&attr);
307 
308     if ((ret = pthread_create(&((LIBEVENT_THREAD*)arg)->thread_id, &attr, func, arg)) != 0) {
309         fprintf(stderr, "Can't create thread: %s\n",
310                 strerror(ret));
311         exit(1);
312     }
313 }
314 
315 /*
316  * Sets whether or not we accept new connections.
317  */
accept_new_conns(const bool do_accept)318 void accept_new_conns(const bool do_accept) {
319     pthread_mutex_lock(&conn_lock);
320     do_accept_new_conns(do_accept);
321     pthread_mutex_unlock(&conn_lock);
322 }
323 /****************************** LIBEVENT THREADS *****************************/
324 
325 /*
326  * Set up a thread's information.
327  */
setup_thread(LIBEVENT_THREAD * me)328 static void setup_thread(LIBEVENT_THREAD *me) {
329     me->base = event_init();
330     if (! me->base) {
331         fprintf(stderr, "Can't allocate event base\n");
332         exit(1);
333     }
334 
335     /* Listen for notifications from other threads */
336     event_set(&me->notify_event, me->notify_receive_fd,
337               EV_READ | EV_PERSIST, thread_libevent_process, me);
338     event_base_set(me->base, &me->notify_event);
339 
340     if (event_add(&me->notify_event, 0) == -1) {
341         fprintf(stderr, "Can't monitor libevent notify pipe\n");
342         exit(1);
343     }
344 
345     me->new_conn_queue = malloc(sizeof(struct conn_queue));
346     if (me->new_conn_queue == NULL) {
347         perror("Failed to allocate memory for connection queue");
348         exit(EXIT_FAILURE);
349     }
350     cq_init(me->new_conn_queue);
351 
352     if (pthread_mutex_init(&me->stats.mutex, NULL) != 0) {
353         perror("Failed to initialize mutex");
354         exit(EXIT_FAILURE);
355     }
356 
357     me->suffix_cache = cache_create("suffix", SUFFIX_SIZE, sizeof(char*),
358                                     NULL, NULL);
359     if (me->suffix_cache == NULL) {
360         fprintf(stderr, "Failed to create suffix cache\n");
361         exit(EXIT_FAILURE);
362     }
363 }
364 
365 /*
366  * Worker thread: main event loop
367  */
worker_libevent(void * arg)368 static void *worker_libevent(void *arg) {
369     LIBEVENT_THREAD *me = arg;
370 
371     /* Any per-thread setup can happen here; memcached_thread_init() will block until
372      * all threads have finished initializing.
373      */
374     me->l = logger_create();
375     if (me->l == NULL) {
376         abort();
377     }
378 
379     register_thread_initialized();
380 
381     event_base_loop(me->base, 0);
382     return NULL;
383 }
384 
385 
386 /*
387  * Processes an incoming "handle a new connection" item. This is called when
388  * input arrives on the libevent wakeup pipe.
389  */
thread_libevent_process(int fd,short which,void * arg)390 static void thread_libevent_process(int fd, short which, void *arg) {
391     LIBEVENT_THREAD *me = arg;
392     CQ_ITEM *item;
393     char buf[1];
394     unsigned int timeout_fd;
395 
396     if (read(fd, buf, 1) != 1) {
397         if (settings.verbose > 0)
398             fprintf(stderr, "Can't read from libevent pipe\n");
399         return;
400     }
401 
402     switch (buf[0]) {
403     case 'c':
404         item = cq_pop(me->new_conn_queue);
405 
406         if (NULL != item) {
407             conn *c = conn_new(item->sfd, item->init_state, item->event_flags,
408                                item->read_buffer_size, item->transport,
409                                me->base);
410             if (c == NULL) {
411                 if (IS_UDP(item->transport)) {
412                     fprintf(stderr, "Can't listen for events on UDP socket\n");
413                     exit(1);
414                 } else {
415                     if (settings.verbose > 0) {
416                         fprintf(stderr, "Can't listen for events on fd %d\n",
417                             item->sfd);
418                     }
419                     close(item->sfd);
420                 }
421             } else {
422                 c->thread = me;
423             }
424             cqi_free(item);
425         }
426         break;
427     /* we were told to pause and report in */
428     case 'p':
429         register_thread_initialized();
430         break;
431     /* a client socket timed out */
432     case 't':
433         if (read(fd, &timeout_fd, sizeof(timeout_fd)) != sizeof(timeout_fd)) {
434             if (settings.verbose > 0)
435                 fprintf(stderr, "Can't read timeout fd from libevent pipe\n");
436             return;
437         }
438         conn_close_idle(conns[timeout_fd]);
439         break;
440     }
441 }
442 
443 /* Which thread we assigned a connection to most recently. */
444 static int last_thread = -1;
445 
446 /*
447  * Dispatches a new connection to another thread. This is only ever called
448  * from the main thread, either during initialization (for UDP) or because
449  * of an incoming connection.
450  */
dispatch_conn_new(int sfd,enum conn_states init_state,int event_flags,int read_buffer_size,enum network_transport transport)451 void dispatch_conn_new(int sfd, enum conn_states init_state, int event_flags,
452                        int read_buffer_size, enum network_transport transport) {
453     CQ_ITEM *item = cqi_new();
454     char buf[1];
455     if (item == NULL) {
456         close(sfd);
457         /* given that malloc failed this may also fail, but let's try */
458         fprintf(stderr, "Failed to allocate memory for connection object\n");
459         return ;
460     }
461 
462     int tid = (last_thread + 1) % settings.num_threads;
463 
464     LIBEVENT_THREAD *thread = threads + tid;
465 
466     last_thread = tid;
467 
468     item->sfd = sfd;
469     item->init_state = init_state;
470     item->event_flags = event_flags;
471     item->read_buffer_size = read_buffer_size;
472     item->transport = transport;
473 
474     cq_push(thread->new_conn_queue, item);
475 
476     MEMCACHED_CONN_DISPATCH(sfd, thread->thread_id);
477     buf[0] = 'c';
478     if (write(thread->notify_send_fd, buf, 1) != 1) {
479         perror("Writing to thread notify pipe");
480     }
481 }
482 
483 /*
484  * Re-dispatches a connection back to the original thread. Can be called from
485  * any side thread borrowing a connection.
486  * TODO: Look into this. too complicated?
487  */
488 #ifdef BOGUS_DEFINE
/*
 * NOTE(review): this function sits between `#ifdef BOGUS_DEFINE` and
 * `#endif`, so it is only compiled if BOGUS_DEFINE is defined. As written
 * it is unfinished:
 *   - `sfd` is not declared anywhere in this function;
 *   - CQ_ITEM, as declared in this file, has no `conn` member;
 *   - `buf` and `thread` are never used, and `item` is neither pushed onto
 *     a queue nor returned to the freelist.
 * Treat it as a sketch rather than working code.
 */
void redispatch_conn(conn *c) {
    CQ_ITEM *item = cqi_new();
    char buf[1];
    if (item == NULL) {
        /* Can't cleanly redispatch connection. close it forcefully. */
        /* FIXME: is conn_cleanup() necessary?
         * if conn was handed off to a side thread it should be clean.
         * could also put it into a "clean_me" state?
         */
        c->state = conn_closed;
        close(c->sfd);
        return;
    }
    LIBEVENT_THREAD *thread = c->thread;
    item->sfd = sfd;
    /* pass in the state somehow?
    item->init_state = conn_closing; */
    item->event_flags = c->event_flags;
    item->conn = c;
}
509 #endif
510 
511 /* This misses the allow_new_conns flag :( */
sidethread_conn_close(conn * c)512 void sidethread_conn_close(conn *c) {
513     c->state = conn_closed;
514     if (settings.verbose > 1)
515         fprintf(stderr, "<%d connection closed from side thread.\n", c->sfd);
516     close(c->sfd);
517 
518     STATS_LOCK();
519     stats_state.curr_conns--;
520     STATS_UNLOCK();
521 
522     return;
523 }
524 
525 /*
526  * Returns true if this is the thread that listens for new TCP connections.
527  */
is_listen_thread()528 int is_listen_thread() {
529     return pthread_self() == dispatcher_thread.thread_id;
530 }
531 
532 /********************************* ITEM ACCESS *******************************/
533 
534 /*
535  * Allocates a new item.
536  */
item_alloc(char * key,size_t nkey,int flags,rel_time_t exptime,int nbytes)537 item *item_alloc(char *key, size_t nkey, int flags, rel_time_t exptime, int nbytes) {
538     item *it;
539     /* do_item_alloc handles its own locks */
540     it = do_item_alloc(key, nkey, flags, exptime, nbytes);
541     return it;
542 }
543 
544 /*
545  * Returns an item if it hasn't been marked as expired,
546  * lazy-expiring as needed.
547  */
item_get(const char * key,const size_t nkey,conn * c)548 item *item_get(const char *key, const size_t nkey, conn *c) {
549     item *it;
550     uint32_t hv;
551     hv = hash(key, nkey);
552     item_lock(hv);
553     it = do_item_get(key, nkey, hv, c);
554     item_unlock(hv);
555     return it;
556 }
557 
item_touch(const char * key,size_t nkey,uint32_t exptime,conn * c)558 item *item_touch(const char *key, size_t nkey, uint32_t exptime, conn *c) {
559     item *it;
560     uint32_t hv;
561     hv = hash(key, nkey);
562     item_lock(hv);
563     it = do_item_touch(key, nkey, exptime, hv, c);
564     item_unlock(hv);
565     return it;
566 }
567 
568 /*
569  * Links an item into the LRU and hashtable.
570  */
item_link(item * item)571 int item_link(item *item) {
572     int ret;
573     uint32_t hv;
574 
575     hv = hash(ITEM_key(item), item->nkey);
576     item_lock(hv);
577     ret = do_item_link(item, hv);
578     item_unlock(hv);
579     return ret;
580 }
581 
582 /*
583  * Decrements the reference count on an item and adds it to the freelist if
584  * needed.
585  */
item_remove(item * item)586 void item_remove(item *item) {
587     uint32_t hv;
588     hv = hash(ITEM_key(item), item->nkey);
589 
590     item_lock(hv);
591     do_item_remove(item);
592     item_unlock(hv);
593 }
594 
595 /*
596  * Replaces one item with another in the hashtable.
597  * Unprotected by a mutex lock since the core server does not require
598  * it to be thread-safe.
599  */
item_replace(item * old_it,item * new_it,const uint32_t hv)600 int item_replace(item *old_it, item *new_it, const uint32_t hv) {
601     return do_item_replace(old_it, new_it, hv);
602 }
603 
604 /*
605  * Unlinks an item from the LRU and hashtable.
606  */
item_unlink(item * item)607 void item_unlink(item *item) {
608     uint32_t hv;
609     hv = hash(ITEM_key(item), item->nkey);
610     item_lock(hv);
611     do_item_unlink(item, hv);
612     item_unlock(hv);
613 }
614 
615 /*
616  * Moves an item to the back of the LRU queue.
617  */
item_update(item * item)618 void item_update(item *item) {
619     uint32_t hv;
620     hv = hash(ITEM_key(item), item->nkey);
621 
622     item_lock(hv);
623     do_item_update(item);
624     item_unlock(hv);
625 }
626 
627 /*
628  * Does arithmetic on a numeric item value.
629  */
add_delta(conn * c,const char * key,const size_t nkey,int incr,const int64_t delta,char * buf,uint64_t * cas)630 enum delta_result_type add_delta(conn *c, const char *key,
631                                  const size_t nkey, int incr,
632                                  const int64_t delta, char *buf,
633                                  uint64_t *cas) {
634     enum delta_result_type ret;
635     uint32_t hv;
636 
637     hv = hash(key, nkey);
638     item_lock(hv);
639     ret = do_add_delta(c, key, nkey, incr, delta, buf, cas, hv);
640     item_unlock(hv);
641     return ret;
642 }
643 
644 /*
645  * Stores an item in the cache (high level, obeys set/add/replace semantics)
646  */
store_item(item * item,int comm,conn * c)647 enum store_item_type store_item(item *item, int comm, conn* c) {
648     enum store_item_type ret;
649     uint32_t hv;
650 
651     hv = hash(ITEM_key(item), item->nkey);
652     item_lock(hv);
653     ret = do_store_item(item, comm, c, hv);
654     item_unlock(hv);
655     return ret;
656 }
657 
658 /******************************* GLOBAL STATS ******************************/
659 
STATS_LOCK()660 void STATS_LOCK() {
661     pthread_mutex_lock(&stats_lock);
662 }
663 
STATS_UNLOCK()664 void STATS_UNLOCK() {
665     pthread_mutex_unlock(&stats_lock);
666 }
667 
threadlocal_stats_reset(void)668 void threadlocal_stats_reset(void) {
669     int ii;
670     for (ii = 0; ii < settings.num_threads; ++ii) {
671         pthread_mutex_lock(&threads[ii].stats.mutex);
672 #define X(name) threads[ii].stats.name = 0;
673         THREAD_STATS_FIELDS
674 #undef X
675 
676         memset(&threads[ii].stats.slab_stats, 0,
677                 sizeof(threads[ii].stats.slab_stats));
678 
679         pthread_mutex_unlock(&threads[ii].stats.mutex);
680     }
681 }
682 
threadlocal_stats_aggregate(struct thread_stats * stats)683 void threadlocal_stats_aggregate(struct thread_stats *stats) {
684     int ii, sid;
685 
686     /* The struct has a mutex, but we can safely set the whole thing
687      * to zero since it is unused when aggregating. */
688     memset(stats, 0, sizeof(*stats));
689 
690     for (ii = 0; ii < settings.num_threads; ++ii) {
691         pthread_mutex_lock(&threads[ii].stats.mutex);
692 #define X(name) stats->name += threads[ii].stats.name;
693         THREAD_STATS_FIELDS
694 #undef X
695 
696         for (sid = 0; sid < MAX_NUMBER_OF_SLAB_CLASSES; sid++) {
697 #define X(name) stats->slab_stats[sid].name += \
698             threads[ii].stats.slab_stats[sid].name;
699             SLAB_STATS_FIELDS
700 #undef X
701         }
702 
703         pthread_mutex_unlock(&threads[ii].stats.mutex);
704     }
705 }
706 
slab_stats_aggregate(struct thread_stats * stats,struct slab_stats * out)707 void slab_stats_aggregate(struct thread_stats *stats, struct slab_stats *out) {
708     int sid;
709 
710     memset(out, 0, sizeof(*out));
711 
712     for (sid = 0; sid < MAX_NUMBER_OF_SLAB_CLASSES; sid++) {
713 #define X(name) out->name += stats->slab_stats[sid].name;
714         SLAB_STATS_FIELDS
715 #undef X
716     }
717 }
718 
719 /*
720  * Initializes the thread subsystem, creating various worker threads.
721  *
722  * nthreads  Number of worker event handler threads to spawn
723  * main_base Event base for main thread
724  */
memcached_thread_init(int nthreads,struct event_base * main_base)725 void memcached_thread_init(int nthreads, struct event_base *main_base) {
726     int         i;
727     int         power;
728 
729     for (i = 0; i < POWER_LARGEST; i++) {
730         pthread_mutex_init(&lru_locks[i], NULL);
731     }
732     pthread_mutex_init(&worker_hang_lock, NULL);
733 
734     pthread_mutex_init(&init_lock, NULL);
735     pthread_cond_init(&init_cond, NULL);
736 
737     pthread_mutex_init(&cqi_freelist_lock, NULL);
738     cqi_freelist = NULL;
739 
740     /* Want a wide lock table, but don't waste memory */
741     if (nthreads < 3) {
742         power = 10;
743     } else if (nthreads < 4) {
744         power = 11;
745     } else if (nthreads < 5) {
746         power = 12;
747     } else {
748         /* 8192 buckets, and central locks don't scale much past 5 threads */
749         power = 13;
750     }
751 
752     if (power >= hashpower) {
753         fprintf(stderr, "Hash table power size (%d) cannot be equal to or less than item lock table (%d)\n", hashpower, power);
754         fprintf(stderr, "Item lock table grows with `-t N` (worker threadcount)\n");
755         fprintf(stderr, "Hash table grows with `-o hashpower=N` \n");
756         exit(1);
757     }
758 
759     item_lock_count = hashsize(power);
760     item_lock_hashpower = power;
761 
762     item_locks = calloc(item_lock_count, sizeof(pthread_mutex_t));
763     if (! item_locks) {
764         perror("Can't allocate item locks");
765         exit(1);
766     }
767     for (i = 0; i < item_lock_count; i++) {
768         pthread_mutex_init(&item_locks[i], NULL);
769     }
770 
771     threads = calloc(nthreads, sizeof(LIBEVENT_THREAD));
772     if (! threads) {
773         perror("Can't allocate thread descriptors");
774         exit(1);
775     }
776 
777     dispatcher_thread.base = main_base;
778     dispatcher_thread.thread_id = pthread_self();
779 
780     for (i = 0; i < nthreads; i++) {
781         int fds[2];
782         if (pipe(fds)) {
783             perror("Can't create notify pipe");
784             exit(1);
785         }
786 
787         threads[i].notify_receive_fd = fds[0];
788         threads[i].notify_send_fd = fds[1];
789 
790         setup_thread(&threads[i]);
791         /* Reserve three fds for the libevent base, and two for the pipe */
792         stats_state.reserved_fds += 5;
793     }
794 
795     /* Create threads after we've done all the libevent setup. */
796     for (i = 0; i < nthreads; i++) {
797         create_worker(worker_libevent, &threads[i]);
798     }
799 
800     /* Wait for all the threads to set themselves up before returning. */
801     pthread_mutex_lock(&init_lock);
802     wait_for_thread_registration(nthreads);
803     pthread_mutex_unlock(&init_lock);
804 }
805 
806