/* -*- Mode: C; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
/*
 * Thread management for memcached.
 */
#include "memcached.h"
#include <assert.h>
#include <stdio.h>
#include <errno.h>
#include <stdlib.h>
#include <string.h>
#include <pthread.h>

#ifdef __sun
#include <atomic.h>
#endif

#define ITEMS_PER_ALLOC 64

/* An item in the connection queue. */
typedef struct conn_queue_item CQ_ITEM;
struct conn_queue_item {
    int sfd;
    enum conn_states init_state;
    int event_flags;
    int read_buffer_size;
    enum network_transport transport;
    CQ_ITEM *next;
};

/* A connection queue. */
typedef struct conn_queue CQ;
struct conn_queue {
    CQ_ITEM *head;
    CQ_ITEM *tail;
    pthread_mutex_t lock;
};

/* Locks for cache LRU operations */
pthread_mutex_t lru_locks[POWER_LARGEST];

/* Connection lock around accepting new connections */
pthread_mutex_t conn_lock = PTHREAD_MUTEX_INITIALIZER;

#if !defined(HAVE_GCC_ATOMICS) && !defined(__sun)
pthread_mutex_t atomics_mutex = PTHREAD_MUTEX_INITIALIZER;
#endif

/* Lock for global stats */
static pthread_mutex_t stats_lock = PTHREAD_MUTEX_INITIALIZER;

/* Lock to cause worker threads to hang up after being woken */
static pthread_mutex_t worker_hang_lock;

/* Free list of CQ_ITEM structs */
static CQ_ITEM *cqi_freelist;
static pthread_mutex_t cqi_freelist_lock;

static pthread_mutex_t *item_locks;
/* size of the item lock hash table */
static uint32_t item_lock_count;
unsigned int item_lock_hashpower;
#define hashsize(n) ((unsigned long int)1<<(n))
#define hashmask(n) (hashsize(n)-1)
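
/* Worked example (illustrative note, not part of the original comments):
 * hashsize(n) is 2^n and hashmask(n) is the matching low-bit mask, so
 * hashsize(13) == 8192 and (hv & hashmask(13)) maps a 32-bit hash value
 * onto one of 8192 item-lock buckets in the table below. */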

static LIBEVENT_DISPATCHER_THREAD dispatcher_thread;

/*
 * Each libevent instance has a wakeup pipe, which other threads
 * can use to signal that they've put a new connection on its queue.
 */
static LIBEVENT_THREAD *threads;

/*
 * Number of worker threads that have finished setting themselves up.
 */
static int init_count = 0;
static pthread_mutex_t init_lock;
static pthread_cond_t init_cond;


static void thread_libevent_process(int fd, short which, void *arg);

unsigned short refcount_incr(unsigned short *refcount) {
#ifdef HAVE_GCC_ATOMICS
    return __sync_add_and_fetch(refcount, 1);
#elif defined(__sun)
    return atomic_inc_ushort_nv(refcount);
#else
    unsigned short res;
    mutex_lock(&atomics_mutex);
    (*refcount)++;
    res = *refcount;
    mutex_unlock(&atomics_mutex);
    return res;
#endif
}

unsigned short refcount_decr(unsigned short *refcount) {
#ifdef HAVE_GCC_ATOMICS
    return __sync_sub_and_fetch(refcount, 1);
#elif defined(__sun)
    return atomic_dec_ushort_nv(refcount);
#else
    unsigned short res;
    mutex_lock(&atomics_mutex);
    (*refcount)--;
    res = *refcount;
    mutex_unlock(&atomics_mutex);
    return res;
#endif
}

/* item_lock() must be held for an item before any modification to either its
 * associated hash bucket or the structure itself.
 * LRU modifications must hold both the item lock and the LRU lock.
 * Threads walking an LRU must item_trylock() an item before modifying it.
 * Items accessible from an LRU must not be freed or modified
 * without first locking them and removing them from the LRU.
 * (See the usage sketch after item_unlock() below.)
 */

void item_lock(uint32_t hv) {
    mutex_lock(&item_locks[hv & hashmask(item_lock_hashpower)]);
}

void *item_trylock(uint32_t hv) {
    pthread_mutex_t *lock = &item_locks[hv & hashmask(item_lock_hashpower)];
    if (pthread_mutex_trylock(lock) == 0) {
        return lock;
    }
    return NULL;
}

void item_trylock_unlock(void *lock) {
    mutex_unlock((pthread_mutex_t *) lock);
}

void item_unlock(uint32_t hv) {
    mutex_unlock(&item_locks[hv & hashmask(item_lock_hashpower)]);
}
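
/* Usage sketch (illustrative only, mirroring the wrapper functions further
 * below): callers hash the key once, take the per-bucket item lock, call the
 * corresponding do_* function, then unlock:
 *
 *     uint32_t hv = hash(key, nkey);
 *     item_lock(hv);
 *     it = do_item_get(key, nkey, hv, c);
 *     item_unlock(hv);
 *
 * Background threads that already hold an LRU lock use item_trylock()
 * instead (presumably to avoid lock-order deadlocks against a worker that
 * holds the item lock and wants the LRU lock).
 */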

static void wait_for_thread_registration(int nthreads) {
    while (init_count < nthreads) {
        pthread_cond_wait(&init_cond, &init_lock);
    }
}

static void register_thread_initialized(void) {
    pthread_mutex_lock(&init_lock);
    init_count++;
    pthread_cond_signal(&init_cond);
    pthread_mutex_unlock(&init_lock);
    /* Force worker threads to pile up if someone wants us to */
    pthread_mutex_lock(&worker_hang_lock);
    pthread_mutex_unlock(&worker_hang_lock);
}

/* Must not be called with any deeper locks held */
void pause_threads(enum pause_thread_types type) {
    char buf[1];
    int i;

    buf[0] = 0;
    switch (type) {
        case PAUSE_ALL_THREADS:
            slabs_rebalancer_pause();
            lru_crawler_pause();
            lru_maintainer_pause();
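            /* deliberate fall-through: pausing everything also pauses the workers */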
        case PAUSE_WORKER_THREADS:
            buf[0] = 'p';
            pthread_mutex_lock(&worker_hang_lock);
            break;
        case RESUME_ALL_THREADS:
            slabs_rebalancer_resume();
            lru_crawler_resume();
            lru_maintainer_resume();
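            /* deliberate fall-through: resuming everything also resumes the workers */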
        case RESUME_WORKER_THREADS:
            pthread_mutex_unlock(&worker_hang_lock);
            break;
        default:
            fprintf(stderr, "Unknown lock type: %d\n", type);
            assert(1 == 0);
            break;
    }

    /* Only send a message if we have one. */
    if (buf[0] == 0) {
        return;
    }

    pthread_mutex_lock(&init_lock);
    init_count = 0;
    for (i = 0; i < settings.num_threads; i++) {
        if (write(threads[i].notify_send_fd, buf, 1) != 1) {
            perror("Failed writing to notify pipe");
            /* TODO: This is a fatal problem. Can it ever happen temporarily? */
        }
    }
    wait_for_thread_registration(settings.num_threads);
    pthread_mutex_unlock(&init_lock);
}
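
/* Illustrative usage (a sketch, not a call site from this file): a maintenance
 * thread that needs the workers quiesced would bracket its work with
 *
 *     pause_threads(PAUSE_ALL_THREADS);
 *     ... work that must not race with the workers ...
 *     pause_threads(RESUME_ALL_THREADS);
 *
 * The pause path writes 'p' to every worker's notify pipe while holding
 * worker_hang_lock; each worker re-registers and then blocks trying to take
 * that lock until the matching RESUME call releases it.
 */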

/*
 * Initializes a connection queue.
 */
static void cq_init(CQ *cq) {
    pthread_mutex_init(&cq->lock, NULL);
    cq->head = NULL;
    cq->tail = NULL;
}

/*
 * Looks for an item on a connection queue, but doesn't block if there isn't
 * one.
 * Returns the item, or NULL if no item is available.
 */
static CQ_ITEM *cq_pop(CQ *cq) {
    CQ_ITEM *item;

    pthread_mutex_lock(&cq->lock);
    item = cq->head;
    if (NULL != item) {
        cq->head = item->next;
        if (NULL == cq->head)
            cq->tail = NULL;
    }
    pthread_mutex_unlock(&cq->lock);

    return item;
}

/*
 * Adds an item to a connection queue.
 */
static void cq_push(CQ *cq, CQ_ITEM *item) {
    item->next = NULL;

    pthread_mutex_lock(&cq->lock);
    if (NULL == cq->tail)
        cq->head = item;
    else
        cq->tail->next = item;
    cq->tail = item;
    pthread_mutex_unlock(&cq->lock);
}
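
/* How these queues are used (summarizing dispatch_conn_new() and
 * thread_libevent_process() below): the dispatcher cq_push()es a CQ_ITEM onto
 * the chosen worker's new_conn_queue and then writes a single 'c' byte to that
 * worker's notify_send_fd; the worker's libevent callback reads the byte and
 * cq_pop()s the item, so each byte on the pipe corresponds to one queued
 * connection. */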

/*
 * Returns a fresh connection queue item.
 */
static CQ_ITEM *cqi_new(void) {
    CQ_ITEM *item = NULL;
    pthread_mutex_lock(&cqi_freelist_lock);
    if (cqi_freelist) {
        item = cqi_freelist;
        cqi_freelist = item->next;
    }
    pthread_mutex_unlock(&cqi_freelist_lock);

    if (NULL == item) {
        int i;

        /* Allocate a bunch of items at once to reduce fragmentation */
        item = malloc(sizeof(CQ_ITEM) * ITEMS_PER_ALLOC);
        if (NULL == item) {
            STATS_LOCK();
            stats.malloc_fails++;
            STATS_UNLOCK();
            return NULL;
        }

        /*
         * Link together all the new items except the first one
         * (which we'll return to the caller) for placement on
         * the freelist.
         */
        for (i = 2; i < ITEMS_PER_ALLOC; i++)
            item[i - 1].next = &item[i];

        pthread_mutex_lock(&cqi_freelist_lock);
        item[ITEMS_PER_ALLOC - 1].next = cqi_freelist;
        cqi_freelist = &item[1];
        pthread_mutex_unlock(&cqi_freelist_lock);
    }

    return item;
}


/*
 * Frees a connection queue item (adds it to the freelist.)
 */
static void cqi_free(CQ_ITEM *item) {
    pthread_mutex_lock(&cqi_freelist_lock);
    item->next = cqi_freelist;
    cqi_freelist = item;
    pthread_mutex_unlock(&cqi_freelist_lock);
}


/*
 * Creates a worker thread.
 */
static void create_worker(void *(*func)(void *), void *arg) {
    pthread_attr_t attr;
    int ret;

    pthread_attr_init(&attr);

    if ((ret = pthread_create(&((LIBEVENT_THREAD*)arg)->thread_id, &attr, func, arg)) != 0) {
        fprintf(stderr, "Can't create thread: %s\n",
                strerror(ret));
        exit(1);
    }
}

/*
 * Sets whether or not we accept new connections.
 */
void accept_new_conns(const bool do_accept) {
    pthread_mutex_lock(&conn_lock);
    do_accept_new_conns(do_accept);
    pthread_mutex_unlock(&conn_lock);
}
/****************************** LIBEVENT THREADS *****************************/

/*
 * Set up a thread's information.
 */
static void setup_thread(LIBEVENT_THREAD *me) {
    me->base = event_init();
    if (! me->base) {
        fprintf(stderr, "Can't allocate event base\n");
        exit(1);
    }

    /* Listen for notifications from other threads */
    event_set(&me->notify_event, me->notify_receive_fd,
              EV_READ | EV_PERSIST, thread_libevent_process, me);
    event_base_set(me->base, &me->notify_event);

    if (event_add(&me->notify_event, 0) == -1) {
        fprintf(stderr, "Can't monitor libevent notify pipe\n");
        exit(1);
    }

    me->new_conn_queue = malloc(sizeof(struct conn_queue));
    if (me->new_conn_queue == NULL) {
        perror("Failed to allocate memory for connection queue");
        exit(EXIT_FAILURE);
    }
    cq_init(me->new_conn_queue);

    if (pthread_mutex_init(&me->stats.mutex, NULL) != 0) {
        perror("Failed to initialize mutex");
        exit(EXIT_FAILURE);
    }

    me->suffix_cache = cache_create("suffix", SUFFIX_SIZE, sizeof(char*),
                                    NULL, NULL);
    if (me->suffix_cache == NULL) {
        fprintf(stderr, "Failed to create suffix cache\n");
        exit(EXIT_FAILURE);
    }
}

/*
 * Worker thread: main event loop
 */
static void *worker_libevent(void *arg) {
    LIBEVENT_THREAD *me = arg;

    /* Any per-thread setup can happen here; memcached_thread_init() will block until
     * all threads have finished initializing.
     */
    me->l = logger_create();
    if (me->l == NULL) {
        abort();
    }

    register_thread_initialized();

    event_base_loop(me->base, 0);
    return NULL;
}


/*
 * Processes an incoming "handle a new connection" item. This is called when
 * input arrives on the libevent wakeup pipe.
 */
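/*
 * Notify-pipe protocol (as implemented in the switch below):
 *   'c' - a new connection was pushed onto this thread's queue
 *   'p' - pause: re-register and then block on worker_hang_lock
 *   't' - a client connection timed out; its fd follows on the pipe
 */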
static void thread_libevent_process(int fd, short which, void *arg) {
    LIBEVENT_THREAD *me = arg;
    CQ_ITEM *item;
    char buf[1];
    unsigned int timeout_fd;

    if (read(fd, buf, 1) != 1) {
        if (settings.verbose > 0)
            fprintf(stderr, "Can't read from libevent pipe\n");
        return;
    }

    switch (buf[0]) {
        case 'c':
            item = cq_pop(me->new_conn_queue);

            if (NULL != item) {
                conn *c = conn_new(item->sfd, item->init_state, item->event_flags,
                                   item->read_buffer_size, item->transport,
                                   me->base);
                if (c == NULL) {
                    if (IS_UDP(item->transport)) {
                        fprintf(stderr, "Can't listen for events on UDP socket\n");
                        exit(1);
                    } else {
                        if (settings.verbose > 0) {
                            fprintf(stderr, "Can't listen for events on fd %d\n",
                                    item->sfd);
                        }
                        close(item->sfd);
                    }
                } else {
                    c->thread = me;
                }
                cqi_free(item);
            }
            break;
        /* we were told to pause and report in */
        case 'p':
            register_thread_initialized();
            break;
        /* a client socket timed out */
        case 't':
            if (read(fd, &timeout_fd, sizeof(timeout_fd)) != sizeof(timeout_fd)) {
                if (settings.verbose > 0)
                    fprintf(stderr, "Can't read timeout fd from libevent pipe\n");
                return;
            }
            conn_close_idle(conns[timeout_fd]);
            break;
    }
}

/* Which thread we assigned a connection to most recently. */
static int last_thread = -1;

/*
 * Dispatches a new connection to another thread. This is only ever called
 * from the main thread, either during initialization (for UDP) or because
 * of an incoming connection.
 */
void dispatch_conn_new(int sfd, enum conn_states init_state, int event_flags,
                       int read_buffer_size, enum network_transport transport) {
    CQ_ITEM *item = cqi_new();
    char buf[1];
    if (item == NULL) {
        close(sfd);
        /* given that malloc failed this may also fail, but let's try */
        fprintf(stderr, "Failed to allocate memory for connection object\n");
        return;
    }

    int tid = (last_thread + 1) % settings.num_threads;

    LIBEVENT_THREAD *thread = threads + tid;

    last_thread = tid;

    item->sfd = sfd;
    item->init_state = init_state;
    item->event_flags = event_flags;
    item->read_buffer_size = read_buffer_size;
    item->transport = transport;

    cq_push(thread->new_conn_queue, item);

    MEMCACHED_CONN_DISPATCH(sfd, thread->thread_id);
    buf[0] = 'c';
    if (write(thread->notify_send_fd, buf, 1) != 1) {
        perror("Writing to thread notify pipe");
    }
}

/*
 * Re-dispatches a connection back to the original thread. Can be called from
 * any side thread borrowing a connection.
 * TODO: Look into this. too complicated?
 */
#ifdef BOGUS_DEFINE
void redispatch_conn(conn *c) {
    CQ_ITEM *item = cqi_new();
    char buf[1];
    if (item == NULL) {
        /* Can't cleanly redispatch connection. close it forcefully. */
        /* FIXME: is conn_cleanup() necessary?
         * if conn was handed off to a side thread it should be clean.
         * could also put it into a "clean_me" state?
         */
        c->state = conn_closed;
        close(c->sfd);
        return;
    }
    LIBEVENT_THREAD *thread = c->thread;
    item->sfd = sfd;
    /* pass in the state somehow?
    item->init_state = conn_closing; */
    item->event_flags = c->event_flags;
    item->conn = c;
}
#endif

/* This misses the allow_new_conns flag :( */
void sidethread_conn_close(conn *c) {
    c->state = conn_closed;
    if (settings.verbose > 1)
        fprintf(stderr, "<%d connection closed from side thread.\n", c->sfd);
    close(c->sfd);

    STATS_LOCK();
    stats_state.curr_conns--;
    STATS_UNLOCK();

    return;
}

/*
 * Returns true if this is the thread that listens for new TCP connections.
 */
int is_listen_thread() {
    return pthread_self() == dispatcher_thread.thread_id;
}

/********************************* ITEM ACCESS *******************************/

/*
 * Allocates a new item.
 */
item *item_alloc(char *key, size_t nkey, int flags, rel_time_t exptime, int nbytes) {
    item *it;
    /* do_item_alloc handles its own locks */
    it = do_item_alloc(key, nkey, flags, exptime, nbytes);
    return it;
}

/*
 * Returns an item if it hasn't been marked as expired,
 * lazy-expiring as needed.
 */
item *item_get(const char *key, const size_t nkey, conn *c) {
    item *it;
    uint32_t hv;
    hv = hash(key, nkey);
    item_lock(hv);
    it = do_item_get(key, nkey, hv, c);
    item_unlock(hv);
    return it;
}

item *item_touch(const char *key, size_t nkey, uint32_t exptime, conn *c) {
    item *it;
    uint32_t hv;
    hv = hash(key, nkey);
    item_lock(hv);
    it = do_item_touch(key, nkey, exptime, hv, c);
    item_unlock(hv);
    return it;
}

/*
 * Links an item into the LRU and hashtable.
 */
int item_link(item *item) {
    int ret;
    uint32_t hv;

    hv = hash(ITEM_key(item), item->nkey);
    item_lock(hv);
    ret = do_item_link(item, hv);
    item_unlock(hv);
    return ret;
}

/*
 * Decrements the reference count on an item and adds it to the freelist if
 * needed.
 */
void item_remove(item *item) {
    uint32_t hv;
    hv = hash(ITEM_key(item), item->nkey);

    item_lock(hv);
    do_item_remove(item);
    item_unlock(hv);
}

/*
 * Replaces one item with another in the hashtable.
 * Unprotected by a mutex lock since the core server does not require
 * it to be thread-safe.
 */
int item_replace(item *old_it, item *new_it, const uint32_t hv) {
    return do_item_replace(old_it, new_it, hv);
}

/*
 * Unlinks an item from the LRU and hashtable.
 */
void item_unlink(item *item) {
    uint32_t hv;
    hv = hash(ITEM_key(item), item->nkey);
    item_lock(hv);
    do_item_unlink(item, hv);
    item_unlock(hv);
}

/*
 * Moves an item to the back of the LRU queue.
 */
void item_update(item *item) {
    uint32_t hv;
    hv = hash(ITEM_key(item), item->nkey);

    item_lock(hv);
    do_item_update(item);
    item_unlock(hv);
}

/*
 * Does arithmetic on a numeric item value.
 */
enum delta_result_type add_delta(conn *c, const char *key,
                                 const size_t nkey, int incr,
                                 const int64_t delta, char *buf,
                                 uint64_t *cas) {
    enum delta_result_type ret;
    uint32_t hv;

    hv = hash(key, nkey);
    item_lock(hv);
    ret = do_add_delta(c, key, nkey, incr, delta, buf, cas, hv);
    item_unlock(hv);
    return ret;
}

/*
 * Stores an item in the cache (high level, obeys set/add/replace semantics)
 */
enum store_item_type store_item(item *item, int comm, conn* c) {
    enum store_item_type ret;
    uint32_t hv;

    hv = hash(ITEM_key(item), item->nkey);
    item_lock(hv);
    ret = do_store_item(item, comm, c, hv);
    item_unlock(hv);
    return ret;
}

/******************************* GLOBAL STATS ******************************/

void STATS_LOCK() {
    pthread_mutex_lock(&stats_lock);
}

void STATS_UNLOCK() {
    pthread_mutex_unlock(&stats_lock);
}

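/*
 * The stats helpers below use the X-macro lists THREAD_STATS_FIELDS and
 * SLAB_STATS_FIELDS from memcached.h: each list invokes X(name) once per
 * counter, so defining X() locally expands into one statement per field.
 * Illustrative expansion (assuming a counter such as get_cmds is in the list):
 *     #define X(name) stats->name += threads[ii].stats.name;
 * turns X(get_cmds) into
 *     stats->get_cmds += threads[ii].stats.get_cmds;
 */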
void threadlocal_stats_reset(void) {
    int ii;
    for (ii = 0; ii < settings.num_threads; ++ii) {
        pthread_mutex_lock(&threads[ii].stats.mutex);
#define X(name) threads[ii].stats.name = 0;
        THREAD_STATS_FIELDS
#undef X

        memset(&threads[ii].stats.slab_stats, 0,
               sizeof(threads[ii].stats.slab_stats));

        pthread_mutex_unlock(&threads[ii].stats.mutex);
    }
}

void threadlocal_stats_aggregate(struct thread_stats *stats) {
    int ii, sid;

    /* The struct has a mutex, but we can safely set the whole thing
     * to zero since it is unused when aggregating. */
    memset(stats, 0, sizeof(*stats));

    for (ii = 0; ii < settings.num_threads; ++ii) {
        pthread_mutex_lock(&threads[ii].stats.mutex);
#define X(name) stats->name += threads[ii].stats.name;
        THREAD_STATS_FIELDS
#undef X

        for (sid = 0; sid < MAX_NUMBER_OF_SLAB_CLASSES; sid++) {
#define X(name) stats->slab_stats[sid].name += \
                threads[ii].stats.slab_stats[sid].name;
            SLAB_STATS_FIELDS
#undef X
        }

        pthread_mutex_unlock(&threads[ii].stats.mutex);
    }
}

void slab_stats_aggregate(struct thread_stats *stats, struct slab_stats *out) {
    int sid;

    memset(out, 0, sizeof(*out));

    for (sid = 0; sid < MAX_NUMBER_OF_SLAB_CLASSES; sid++) {
#define X(name) out->name += stats->slab_stats[sid].name;
        SLAB_STATS_FIELDS
#undef X
    }
}

/*
 * Initializes the thread subsystem, creating various worker threads.
 *
 * nthreads  Number of worker event handler threads to spawn
 * main_base Event base for main thread
 */
void memcached_thread_init(int nthreads, struct event_base *main_base) {
    int i;
    int power;

    for (i = 0; i < POWER_LARGEST; i++) {
        pthread_mutex_init(&lru_locks[i], NULL);
    }
    pthread_mutex_init(&worker_hang_lock, NULL);

    pthread_mutex_init(&init_lock, NULL);
    pthread_cond_init(&init_cond, NULL);

    pthread_mutex_init(&cqi_freelist_lock, NULL);
    cqi_freelist = NULL;

    /* Want a wide lock table, but don't waste memory */
    if (nthreads < 3) {
        power = 10;
    } else if (nthreads < 4) {
        power = 11;
    } else if (nthreads < 5) {
        power = 12;
    } else {
        /* 8192 buckets, and central locks don't scale much past 5 threads */
        power = 13;
    }
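    /* With hashsize() above, power 10..13 yields 1024, 2048, 4096 or 8192
     * item-lock mutexes respectively. */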

    if (power >= hashpower) {
        fprintf(stderr, "Hash table power size (%d) cannot be equal to or less than item lock table (%d)\n", hashpower, power);
        fprintf(stderr, "Item lock table grows with `-t N` (worker threadcount)\n");
        fprintf(stderr, "Hash table grows with `-o hashpower=N` \n");
        exit(1);
    }

    item_lock_count = hashsize(power);
    item_lock_hashpower = power;

    item_locks = calloc(item_lock_count, sizeof(pthread_mutex_t));
    if (! item_locks) {
        perror("Can't allocate item locks");
        exit(1);
    }
    for (i = 0; i < item_lock_count; i++) {
        pthread_mutex_init(&item_locks[i], NULL);
    }

    threads = calloc(nthreads, sizeof(LIBEVENT_THREAD));
    if (! threads) {
        perror("Can't allocate thread descriptors");
        exit(1);
    }

    dispatcher_thread.base = main_base;
    dispatcher_thread.thread_id = pthread_self();

    for (i = 0; i < nthreads; i++) {
        int fds[2];
        if (pipe(fds)) {
            perror("Can't create notify pipe");
            exit(1);
        }

        threads[i].notify_receive_fd = fds[0];
        threads[i].notify_send_fd = fds[1];

        setup_thread(&threads[i]);
        /* Reserve three fds for the libevent base, and two for the pipe */
        stats_state.reserved_fds += 5;
    }

    /* Create threads after we've done all the libevent setup. */
    for (i = 0; i < nthreads; i++) {
        create_worker(worker_libevent, &threads[i]);
    }

    /* Wait for all the threads to set themselves up before returning. */
    pthread_mutex_lock(&init_lock);
    wait_for_thread_registration(nthreads);
    pthread_mutex_unlock(&init_lock);
}