/* -*- Mode: C; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
/*
 * Thread management for memcached.
 */
#include "memcached.h"
#include <assert.h>
#include <stdio.h>
#include <errno.h>
#include <stdlib.h>
#include <string.h>
#include <pthread.h>

#ifdef __sun
#include <atomic.h>
#endif

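/* Number of CQ_ITEMs cqi_new() allocates in one batch when its freelist
 * runs dry. */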
#define ITEMS_PER_ALLOC 64

/* An item in the connection queue. */
typedef struct conn_queue_item CQ_ITEM;
struct conn_queue_item {
    int sfd;
    enum conn_states init_state;
    int event_flags;
    int read_buffer_size;
    enum network_transport transport;
    CQ_ITEM *next;
};

/* A connection queue. */
typedef struct conn_queue CQ;
struct conn_queue {
    CQ_ITEM *head;
    CQ_ITEM *tail;
    pthread_mutex_t lock;
};

/* Locks for cache LRU operations */
pthread_mutex_t lru_locks[POWER_LARGEST];

/* Connection lock around accepting new connections */
pthread_mutex_t conn_lock = PTHREAD_MUTEX_INITIALIZER;

#if !defined(HAVE_GCC_ATOMICS) && !defined(__sun)
pthread_mutex_t atomics_mutex = PTHREAD_MUTEX_INITIALIZER;
#endif

/* Lock for global stats */
static pthread_mutex_t stats_lock = PTHREAD_MUTEX_INITIALIZER;

/* Lock to cause worker threads to hang up after being woken */
static pthread_mutex_t worker_hang_lock;

/* Free list of CQ_ITEM structs */
static CQ_ITEM *cqi_freelist;
static pthread_mutex_t cqi_freelist_lock;

static pthread_mutex_t *item_locks;
/* size of the item lock hash table */
static uint32_t item_lock_count;
unsigned int item_lock_hashpower;
#define hashsize(n) ((unsigned long int)1<<(n))
#define hashmask(n) (hashsize(n)-1)
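/*
 * e.g. hashsize(13) == 8192 and hashmask(13) == 0x1fff, so item_lock()
 * below maps a hash value onto one of 8192 bucket locks with
 * hv & hashmask(item_lock_hashpower).
 */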

static LIBEVENT_DISPATCHER_THREAD dispatcher_thread;

/*
 * Each libevent instance has a wakeup pipe, which other threads
 * can use to signal that they've put a new connection on its queue.
 */
static LIBEVENT_THREAD *threads;

/*
 * Number of worker threads that have finished setting themselves up.
 */
static int init_count = 0;
static pthread_mutex_t init_lock;
static pthread_cond_t init_cond;


static void thread_libevent_process(int fd, short which, void *arg);

unsigned short refcount_incr(unsigned short *refcount) {
#ifdef HAVE_GCC_ATOMICS
    return __sync_add_and_fetch(refcount, 1);
#elif defined(__sun)
    return atomic_inc_ushort_nv(refcount);
#else
    unsigned short res;
    mutex_lock(&atomics_mutex);
    (*refcount)++;
    res = *refcount;
    mutex_unlock(&atomics_mutex);
    return res;
#endif
}

unsigned short refcount_decr(unsigned short *refcount) {
#ifdef HAVE_GCC_ATOMICS
    return __sync_sub_and_fetch(refcount, 1);
#elif defined(__sun)
    return atomic_dec_ushort_nv(refcount);
#else
    unsigned short res;
    mutex_lock(&atomics_mutex);
    (*refcount)--;
    res = *refcount;
    mutex_unlock(&atomics_mutex);
    return res;
#endif
}

/* item_lock() must be held for an item before any modifications to either its
 * associated hash bucket, or the structure itself.
 * LRU modifications must hold the item lock, and the LRU lock.
 * LRUs accessing items must item_trylock() before modifying an item.
 * Items accessible from an LRU must not be freed or modified
 * without first locking and removing from the LRU.
 */
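/*
 * The typical caller pattern (see item_get() and friends below):
 *
 *     hv = hash(key, nkey);
 *     item_lock(hv);
 *     ... call a do_item_* function, passing hv along ...
 *     item_unlock(hv);
 */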

void item_lock(uint32_t hv) {
    mutex_lock(&item_locks[hv & hashmask(item_lock_hashpower)]);
}

void *item_trylock(uint32_t hv) {
    pthread_mutex_t *lock = &item_locks[hv & hashmask(item_lock_hashpower)];
    if (pthread_mutex_trylock(lock) == 0) {
        return lock;
    }
    return NULL;
}

void item_trylock_unlock(void *lock) {
    mutex_unlock((pthread_mutex_t *) lock);
}

void item_unlock(uint32_t hv) {
    mutex_unlock(&item_locks[hv & hashmask(item_lock_hashpower)]);
}

static void wait_for_thread_registration(int nthreads) {
    while (init_count < nthreads) {
        pthread_cond_wait(&init_cond, &init_lock);
    }
}

static void register_thread_initialized(void) {
    pthread_mutex_lock(&init_lock);
    init_count++;
    pthread_cond_signal(&init_cond);
    pthread_mutex_unlock(&init_lock);
    /* Force worker threads to pile up if someone wants us to */
    pthread_mutex_lock(&worker_hang_lock);
    pthread_mutex_unlock(&worker_hang_lock);
}

/* Must not be called with any deeper locks held */
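/* A caller pairs the pause and resume types, e.g. (sketch):
 *
 *     pause_threads(PAUSE_ALL_THREADS);
 *     ... mutate state the workers might be touching ...
 *     pause_threads(RESUME_ALL_THREADS);
 */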
void pause_threads(enum pause_thread_types type) {
    char buf[1];
    int i;

    buf[0] = 0;
    switch (type) {
        case PAUSE_ALL_THREADS:
            slabs_rebalancer_pause();
            lru_crawler_pause();
            lru_maintainer_pause();
            /* fall through: also pause the workers */
        case PAUSE_WORKER_THREADS:
            buf[0] = 'p';
            pthread_mutex_lock(&worker_hang_lock);
            break;
        case RESUME_ALL_THREADS:
            slabs_rebalancer_resume();
            lru_crawler_resume();
            lru_maintainer_resume();
            /* fall through: also resume the workers */
        case RESUME_WORKER_THREADS:
            pthread_mutex_unlock(&worker_hang_lock);
            break;
        default:
            fprintf(stderr, "Unknown lock type: %d\n", type);
            assert(1 == 0);
            break;
    }

    /* Only send a message if we have one. */
    if (buf[0] == 0) {
        return;
    }

    pthread_mutex_lock(&init_lock);
    init_count = 0;
    for (i = 0; i < settings.num_threads; i++) {
        if (write(threads[i].notify_send_fd, buf, 1) != 1) {
            perror("Failed writing to notify pipe");
            /* TODO: This is a fatal problem. Can it ever happen temporarily? */
        }
    }
    wait_for_thread_registration(settings.num_threads);
    pthread_mutex_unlock(&init_lock);
}

/*
 * Initializes a connection queue.
 */
static void cq_init(CQ *cq) {
    pthread_mutex_init(&cq->lock, NULL);
    cq->head = NULL;
    cq->tail = NULL;
}

/*
 * Looks for an item on a connection queue, but doesn't block if there isn't
 * one.
 * Returns the item, or NULL if no item is available.
 */
static CQ_ITEM *cq_pop(CQ *cq) {
    CQ_ITEM *item;

    pthread_mutex_lock(&cq->lock);
    item = cq->head;
    if (NULL != item) {
        cq->head = item->next;
        if (NULL == cq->head)
            cq->tail = NULL;
    }
    pthread_mutex_unlock(&cq->lock);

    return item;
}

/*
 * Adds an item to a connection queue.
 */
static void cq_push(CQ *cq, CQ_ITEM *item) {
    item->next = NULL;

    pthread_mutex_lock(&cq->lock);
    if (NULL == cq->tail)
        cq->head = item;
    else
        cq->tail->next = item;
    cq->tail = item;
    pthread_mutex_unlock(&cq->lock);
}

/*
 * Returns a fresh connection queue item.
 */
static CQ_ITEM *cqi_new(void) {
    CQ_ITEM *item = NULL;
    pthread_mutex_lock(&cqi_freelist_lock);
    if (cqi_freelist) {
        item = cqi_freelist;
        cqi_freelist = item->next;
    }
    pthread_mutex_unlock(&cqi_freelist_lock);

    if (NULL == item) {
        int i;

        /* Allocate a bunch of items at once to reduce fragmentation */
        item = malloc(sizeof(CQ_ITEM) * ITEMS_PER_ALLOC);
        if (NULL == item) {
            STATS_LOCK();
            stats.malloc_fails++;
            STATS_UNLOCK();
            return NULL;
        }

        /*
         * Link together all the new items except the first one
         * (which we'll return to the caller) for placement on
         * the freelist.
         */
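        /* item[0] goes to the caller; the loop below chains
         * item[1] .. item[ITEMS_PER_ALLOC - 1] for the freelist. */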
        for (i = 2; i < ITEMS_PER_ALLOC; i++)
            item[i - 1].next = &item[i];

        pthread_mutex_lock(&cqi_freelist_lock);
        item[ITEMS_PER_ALLOC - 1].next = cqi_freelist;
        cqi_freelist = &item[1];
        pthread_mutex_unlock(&cqi_freelist_lock);
    }

    return item;
}


/*
 * Frees a connection queue item (adds it to the freelist).
 */
static void cqi_free(CQ_ITEM *item) {
    pthread_mutex_lock(&cqi_freelist_lock);
    item->next = cqi_freelist;
    cqi_freelist = item;
    pthread_mutex_unlock(&cqi_freelist_lock);
}


/*
 * Creates a worker thread.
 */
static void create_worker(void *(*func)(void *), void *arg) {
    pthread_attr_t attr;
    int ret;

    pthread_attr_init(&attr);

    if ((ret = pthread_create(&((LIBEVENT_THREAD*)arg)->thread_id, &attr, func, arg)) != 0) {
        fprintf(stderr, "Can't create thread: %s\n",
                strerror(ret));
        exit(1);
    }
}

/*
 * Sets whether or not we accept new connections.
 */
void accept_new_conns(const bool do_accept) {
    pthread_mutex_lock(&conn_lock);
    do_accept_new_conns(do_accept);
    pthread_mutex_unlock(&conn_lock);
}
/****************************** LIBEVENT THREADS *****************************/

/*
 * Set up a thread's information.
 */
static void setup_thread(LIBEVENT_THREAD *me) {
    me->base = event_init();
    if (! me->base) {
        fprintf(stderr, "Can't allocate event base\n");
        exit(1);
    }

    /* Listen for notifications from other threads */
    event_set(&me->notify_event, me->notify_receive_fd,
              EV_READ | EV_PERSIST, thread_libevent_process, me);
    event_base_set(me->base, &me->notify_event);

    if (event_add(&me->notify_event, 0) == -1) {
        fprintf(stderr, "Can't monitor libevent notify pipe\n");
        exit(1);
    }

    me->new_conn_queue = malloc(sizeof(struct conn_queue));
    if (me->new_conn_queue == NULL) {
        perror("Failed to allocate memory for connection queue");
        exit(EXIT_FAILURE);
    }
    cq_init(me->new_conn_queue);

    if (pthread_mutex_init(&me->stats.mutex, NULL) != 0) {
        perror("Failed to initialize mutex");
        exit(EXIT_FAILURE);
    }

    me->suffix_cache = cache_create("suffix", SUFFIX_SIZE, sizeof(char*),
                                    NULL, NULL);
    if (me->suffix_cache == NULL) {
        fprintf(stderr, "Failed to create suffix cache\n");
        exit(EXIT_FAILURE);
    }
}

/*
 * Worker thread: main event loop
 */
static void *worker_libevent(void *arg) {
    LIBEVENT_THREAD *me = arg;

    /* Any per-thread setup can happen here; memcached_thread_init() will
     * block until all threads have finished initializing.
     */
    me->l = logger_create();
    if (me->l == NULL) {
        abort();
    }

    register_thread_initialized();

    event_base_loop(me->base, 0);
    return NULL;
}


/*
 * Processes an incoming "handle a new connection" item. This is called when
 * input arrives on the libevent wakeup pipe.
 */
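/*
 * One-byte wakeup-pipe protocol, as written by other threads:
 *   'c' - a new connection was pushed onto this thread's new_conn_queue
 *   'p' - pause: check in via register_thread_initialized()
 *   't' - a client connection idled out; its fd follows on the pipe
 */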
static void thread_libevent_process(int fd, short which, void *arg) {
    LIBEVENT_THREAD *me = arg;
    CQ_ITEM *item;
    char buf[1];
    unsigned int timeout_fd;

    if (read(fd, buf, 1) != 1) {
        if (settings.verbose > 0)
            fprintf(stderr, "Can't read from libevent pipe\n");
        return;
    }

    switch (buf[0]) {
    case 'c':
        item = cq_pop(me->new_conn_queue);

        if (NULL != item) {
            conn *c = conn_new(item->sfd, item->init_state, item->event_flags,
                               item->read_buffer_size, item->transport,
                               me->base);
            if (c == NULL) {
                if (IS_UDP(item->transport)) {
                    fprintf(stderr, "Can't listen for events on UDP socket\n");
                    exit(1);
                } else {
                    if (settings.verbose > 0) {
                        fprintf(stderr, "Can't listen for events on fd %d\n",
                                item->sfd);
                    }
                    close(item->sfd);
                }
            } else {
                c->thread = me;
            }
            cqi_free(item);
        }
        break;
    /* we were told to pause and report in */
    case 'p':
        register_thread_initialized();
        break;
    /* a client socket timed out */
    case 't':
        if (read(fd, &timeout_fd, sizeof(timeout_fd)) != sizeof(timeout_fd)) {
            if (settings.verbose > 0)
                fprintf(stderr, "Can't read timeout fd from libevent pipe\n");
            return;
        }
        conn_close_idle(conns[timeout_fd]);
        break;
    }
}

/* Which thread we assigned a connection to most recently. */
static int last_thread = -1;

/*
 * Dispatches a new connection to another thread. This is only ever called
 * from the main thread, either during initialization (for UDP) or because
 * of an incoming connection.
 */
void dispatch_conn_new(int sfd, enum conn_states init_state, int event_flags,
                       int read_buffer_size, enum network_transport transport) {
    CQ_ITEM *item = cqi_new();
    char buf[1];
    if (item == NULL) {
        close(sfd);
        /* given that malloc failed this may also fail, but let's try */
        fprintf(stderr, "Failed to allocate memory for connection object\n");
        return;
    }

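    /* Pick the next worker round-robin; no lock is needed since only the
     * dispatcher thread ever writes last_thread. */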
    int tid = (last_thread + 1) % settings.num_threads;

    LIBEVENT_THREAD *thread = threads + tid;

    last_thread = tid;

    item->sfd = sfd;
    item->init_state = init_state;
    item->event_flags = event_flags;
    item->read_buffer_size = read_buffer_size;
    item->transport = transport;

    cq_push(thread->new_conn_queue, item);

    MEMCACHED_CONN_DISPATCH(sfd, thread->thread_id);
    buf[0] = 'c';
    if (write(thread->notify_send_fd, buf, 1) != 1) {
        perror("Writing to thread notify pipe");
    }
}

/*
 * Re-dispatches a connection back to the original thread. Can be called from
 * any side thread borrowing a connection.
 * TODO: Look into this. Too complicated?
 */
#ifdef BOGUS_DEFINE
void redispatch_conn(conn *c) {
    CQ_ITEM *item = cqi_new();
    char buf[1];
    if (item == NULL) {
        /* Can't cleanly redispatch the connection; close it forcefully. */
        /* FIXME: is conn_cleanup() necessary?
         * if conn was handed off to a side thread it should be clean.
         * could also put it into a "clean_me" state?
         */
        c->state = conn_closed;
        close(c->sfd);
        return;
    }
    LIBEVENT_THREAD *thread = c->thread;
    item->sfd = c->sfd;
    /* pass in the state somehow?
    item->init_state = conn_closing; */
    item->event_flags = c->event_flags;
    item->conn = c;
}
#endif

/* This misses the allow_new_conns flag :( */
void sidethread_conn_close(conn *c) {
    c->state = conn_closed;
    if (settings.verbose > 1)
        fprintf(stderr, "<%d connection closed from side thread.\n", c->sfd);
    close(c->sfd);

    STATS_LOCK();
    stats_state.curr_conns--;
    STATS_UNLOCK();

    return;
}

/*
 * Returns true if this is the thread that listens for new TCP connections.
 */
int is_listen_thread() {
    return pthread_self() == dispatcher_thread.thread_id;
}

/********************************* ITEM ACCESS *******************************/

/*
 * Allocates a new item.
 */
item *item_alloc(char *key, size_t nkey, int flags, rel_time_t exptime, int nbytes) {
    item *it;
    /* do_item_alloc handles its own locks */
    it = do_item_alloc(key, nkey, flags, exptime, nbytes);
    return it;
}

/*
 * Returns an item if it hasn't been marked as expired,
 * lazy-expiring as needed.
 */
item *item_get(const char *key, const size_t nkey, conn *c) {
    item *it;
    uint32_t hv;
    hv = hash(key, nkey);
    item_lock(hv);
    it = do_item_get(key, nkey, hv, c);
    item_unlock(hv);
    return it;
}

item *item_touch(const char *key, size_t nkey, uint32_t exptime, conn *c) {
    item *it;
    uint32_t hv;
    hv = hash(key, nkey);
    item_lock(hv);
    it = do_item_touch(key, nkey, exptime, hv, c);
    item_unlock(hv);
    return it;
}

/*
 * Links an item into the LRU and hashtable.
 */
int item_link(item *item) {
    int ret;
    uint32_t hv;

    hv = hash(ITEM_key(item), item->nkey);
    item_lock(hv);
    ret = do_item_link(item, hv);
    item_unlock(hv);
    return ret;
}

/*
 * Decrements the reference count on an item and adds it to the freelist if
 * needed.
 */
void item_remove(item *item) {
    uint32_t hv;
    hv = hash(ITEM_key(item), item->nkey);

    item_lock(hv);
    do_item_remove(item);
    item_unlock(hv);
}

/*
 * Replaces one item with another in the hashtable.
 * Unprotected by a mutex lock since the core server does not require
 * it to be thread-safe.
 */
int item_replace(item *old_it, item *new_it, const uint32_t hv) {
    return do_item_replace(old_it, new_it, hv);
}

/*
 * Unlinks an item from the LRU and hashtable.
 */
void item_unlink(item *item) {
    uint32_t hv;
    hv = hash(ITEM_key(item), item->nkey);
    item_lock(hv);
    do_item_unlink(item, hv);
    item_unlock(hv);
}

/*
 * Moves an item to the back of the LRU queue.
 */
void item_update(item *item) {
    uint32_t hv;
    hv = hash(ITEM_key(item), item->nkey);

    item_lock(hv);
    do_item_update(item);
    item_unlock(hv);
}

/*
 * Does arithmetic on a numeric item value.
 */
enum delta_result_type add_delta(conn *c, const char *key,
                                 const size_t nkey, int incr,
                                 const int64_t delta, char *buf,
                                 uint64_t *cas) {
    enum delta_result_type ret;
    uint32_t hv;

    hv = hash(key, nkey);
    item_lock(hv);
    ret = do_add_delta(c, key, nkey, incr, delta, buf, cas, hv);
    item_unlock(hv);
    return ret;
}

/*
 * Stores an item in the cache (high level, obeys set/add/replace semantics)
 */
enum store_item_type store_item(item *item, int comm, conn* c) {
    enum store_item_type ret;
    uint32_t hv;

    hv = hash(ITEM_key(item), item->nkey);
    item_lock(hv);
    ret = do_store_item(item, comm, c, hv);
    item_unlock(hv);
    return ret;
}

/******************************* GLOBAL STATS ******************************/

void STATS_LOCK() {
    pthread_mutex_lock(&stats_lock);
}

void STATS_UNLOCK() {
    pthread_mutex_unlock(&stats_lock);
}

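/* THREAD_STATS_FIELDS and SLAB_STATS_FIELDS are X-macro lists that expand
 * X(name) once per stat field, so the temporary #defines below zero or sum
 * every field without naming each one individually. */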
void threadlocal_stats_reset(void) {
    int ii;
    for (ii = 0; ii < settings.num_threads; ++ii) {
        pthread_mutex_lock(&threads[ii].stats.mutex);
#define X(name) threads[ii].stats.name = 0;
        THREAD_STATS_FIELDS
#undef X

        memset(&threads[ii].stats.slab_stats, 0,
               sizeof(threads[ii].stats.slab_stats));

        pthread_mutex_unlock(&threads[ii].stats.mutex);
    }
}

void threadlocal_stats_aggregate(struct thread_stats *stats) {
    int ii, sid;

    /* The struct has a mutex, but we can safely set the whole thing
     * to zero since it is unused when aggregating. */
    memset(stats, 0, sizeof(*stats));

    for (ii = 0; ii < settings.num_threads; ++ii) {
        pthread_mutex_lock(&threads[ii].stats.mutex);
#define X(name) stats->name += threads[ii].stats.name;
        THREAD_STATS_FIELDS
#undef X

        for (sid = 0; sid < MAX_NUMBER_OF_SLAB_CLASSES; sid++) {
#define X(name) stats->slab_stats[sid].name += \
                threads[ii].stats.slab_stats[sid].name;
            SLAB_STATS_FIELDS
#undef X
        }

        pthread_mutex_unlock(&threads[ii].stats.mutex);
    }
}

void slab_stats_aggregate(struct thread_stats *stats, struct slab_stats *out) {
    int sid;

    memset(out, 0, sizeof(*out));

    for (sid = 0; sid < MAX_NUMBER_OF_SLAB_CLASSES; sid++) {
#define X(name) out->name += stats->slab_stats[sid].name;
        SLAB_STATS_FIELDS
#undef X
    }
}

/*
 * Initializes the thread subsystem, creating various worker threads.
 *
 * nthreads  Number of worker event handler threads to spawn
 * main_base Event base for main thread
 */
void memcached_thread_init(int nthreads, struct event_base *main_base) {
    int i;
    int power;

    for (i = 0; i < POWER_LARGEST; i++) {
        pthread_mutex_init(&lru_locks[i], NULL);
    }
    pthread_mutex_init(&worker_hang_lock, NULL);

    pthread_mutex_init(&init_lock, NULL);
    pthread_cond_init(&init_cond, NULL);

    pthread_mutex_init(&cqi_freelist_lock, NULL);
    cqi_freelist = NULL;

    /* Want a wide lock table, but don't waste memory */
    if (nthreads < 3) {
        power = 10;
    } else if (nthreads < 4) {
        power = 11;
    } else if (nthreads < 5) {
        power = 12;
    } else {
        /* 8192 buckets, and central locks don't scale much past 5 threads */
        power = 13;
    }
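    /* e.g. the default of 4 worker threads lands in the nthreads < 5 case
     * above: power 12, i.e. hashsize(12) == 4096 item locks. */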

    if (power >= hashpower) {
        fprintf(stderr, "Hash table power size (%d) cannot be equal to or less than item lock table (%d)\n", hashpower, power);
        fprintf(stderr, "Item lock table grows with `-t N` (worker threadcount)\n");
        fprintf(stderr, "Hash table grows with `-o hashpower=N`\n");
        exit(1);
    }

    item_lock_count = hashsize(power);
    item_lock_hashpower = power;

    item_locks = calloc(item_lock_count, sizeof(pthread_mutex_t));
    if (! item_locks) {
        perror("Can't allocate item locks");
        exit(1);
    }
    for (i = 0; i < item_lock_count; i++) {
        pthread_mutex_init(&item_locks[i], NULL);
    }

    threads = calloc(nthreads, sizeof(LIBEVENT_THREAD));
    if (! threads) {
        perror("Can't allocate thread descriptors");
        exit(1);
    }

    dispatcher_thread.base = main_base;
    dispatcher_thread.thread_id = pthread_self();

    for (i = 0; i < nthreads; i++) {
        int fds[2];
        if (pipe(fds)) {
            perror("Can't create notify pipe");
            exit(1);
        }

        threads[i].notify_receive_fd = fds[0];
        threads[i].notify_send_fd = fds[1];

        setup_thread(&threads[i]);
        /* Reserve three fds for the libevent base, and two for the pipe */
        stats_state.reserved_fds += 5;
    }

    /* Create threads after we've done all the libevent setup. */
    for (i = 0; i < nthreads; i++) {
        create_worker(worker_libevent, &threads[i]);
    }

    /* Wait for all the threads to set themselves up before returning. */
    pthread_mutex_lock(&init_lock);
    wait_for_thread_registration(nthreads);
    pthread_mutex_unlock(&init_lock);
}