xref: /memcached-1.4.29/thread.c (revision 690a9a9d)
15b304997SSteven Grimm /* -*- Mode: C; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
25b304997SSteven Grimm /*
35b304997SSteven Grimm  * Thread management for memcached.
45b304997SSteven Grimm  */
55b304997SSteven Grimm #include "memcached.h"
6869f1868Sdormando #include <assert.h>
75b304997SSteven Grimm #include <stdio.h>
85b304997SSteven Grimm #include <errno.h>
95b304997SSteven Grimm #include <stdlib.h>
10d9b97d80SPaul Lindner #include <string.h>
115b304997SSteven Grimm #include <pthread.h>
125b304997SSteven Grimm 
133b961388Sdormando #ifdef __sun
143b961388Sdormando #include <atomic.h>
153b961388Sdormando #endif
163b961388Sdormando 
175b304997SSteven Grimm #define ITEMS_PER_ALLOC 64
185b304997SSteven Grimm 
195b304997SSteven Grimm /* An item in the connection queue. */
205b304997SSteven Grimm typedef struct conn_queue_item CQ_ITEM;
215b304997SSteven Grimm struct conn_queue_item {
225b304997SSteven Grimm     int               sfd;
23f20c4847SDustin Sallings     enum conn_states  init_state;
245b304997SSteven Grimm     int               event_flags;
255b304997SSteven Grimm     int               read_buffer_size;
2615ace4b5SEric Lambert     enum network_transport     transport;
275b304997SSteven Grimm     CQ_ITEM          *next;
285b304997SSteven Grimm };
295b304997SSteven Grimm 
305b304997SSteven Grimm /* A connection queue. */
315b304997SSteven Grimm typedef struct conn_queue CQ;
325b304997SSteven Grimm struct conn_queue {
335b304997SSteven Grimm     CQ_ITEM *head;
345b304997SSteven Grimm     CQ_ITEM *tail;
355b304997SSteven Grimm     pthread_mutex_t lock;
365b304997SSteven Grimm };
375b304997SSteven Grimm 
3890a59871Sdormando /* Locks for cache LRU operations */
399bce42f2Sdormando pthread_mutex_t lru_locks[POWER_LARGEST];
405b304997SSteven Grimm 
41a0e4a756Sdormando /* Connection lock around accepting new connections */
42a0e4a756Sdormando pthread_mutex_t conn_lock = PTHREAD_MUTEX_INITIALIZER;
43a0e4a756Sdormando 
44de1f30c5Sdormando #if !defined(HAVE_GCC_ATOMICS) && !defined(__sun)
453b961388Sdormando pthread_mutex_t atomics_mutex = PTHREAD_MUTEX_INITIALIZER;
463b961388Sdormando #endif
473b961388Sdormando 
485b304997SSteven Grimm /* Lock for global stats */
49543b1a34Sdormando static pthread_mutex_t stats_lock = PTHREAD_MUTEX_INITIALIZER;
505b304997SSteven Grimm 
516af7aa0bSdormando /* Lock to cause worker threads to hang up after being woken */
526af7aa0bSdormando static pthread_mutex_t worker_hang_lock;
536af7aa0bSdormando 
545b304997SSteven Grimm /* Free list of CQ_ITEM structs */
555b304997SSteven Grimm static CQ_ITEM *cqi_freelist;
565b304997SSteven Grimm static pthread_mutex_t cqi_freelist_lock;
575b304997SSteven Grimm 
588fe5bf1fSdormando static pthread_mutex_t *item_locks;
5997409b31Sdormando /* size of the item lock hash table */
6097409b31Sdormando static uint32_t item_lock_count;
61d2676b4aSJason CHAN unsigned int item_lock_hashpower;
621c94e12cSdormando #define hashsize(n) ((unsigned long int)1<<(n))
631c94e12cSdormando #define hashmask(n) (hashsize(n)-1)
648fe5bf1fSdormando 
652fe44f1cSDmitry Isaykin static LIBEVENT_DISPATCHER_THREAD dispatcher_thread;
662fe44f1cSDmitry Isaykin 
675b304997SSteven Grimm /*
685b304997SSteven Grimm  * Each libevent instance has a wakeup pipe, which other threads
695b304997SSteven Grimm  * can use to signal that they've put a new connection on its queue.
705b304997SSteven Grimm  */
715b304997SSteven Grimm static LIBEVENT_THREAD *threads;
725b304997SSteven Grimm 
735b304997SSteven Grimm /*
742fe44f1cSDmitry Isaykin  * Number of worker threads that have finished setting themselves up.
755b304997SSteven Grimm  */
765b304997SSteven Grimm static int init_count = 0;
775b304997SSteven Grimm static pthread_mutex_t init_lock;
785b304997SSteven Grimm static pthread_cond_t init_cond;
795b304997SSteven Grimm 
805b304997SSteven Grimm 
815b304997SSteven Grimm static void thread_libevent_process(int fd, short which, void *arg);
825b304997SSteven Grimm 
refcount_incr(unsigned short * refcount)83b19b41d9SSteve Wills unsigned short refcount_incr(unsigned short *refcount) {
84de1f30c5Sdormando #ifdef HAVE_GCC_ATOMICS
853b961388Sdormando     return __sync_add_and_fetch(refcount, 1);
863b961388Sdormando #elif defined(__sun)
873b961388Sdormando     return atomic_inc_ushort_nv(refcount);
883b961388Sdormando #else
893b961388Sdormando     unsigned short res;
903b961388Sdormando     mutex_lock(&atomics_mutex);
91de1f30c5Sdormando     (*refcount)++;
923b961388Sdormando     res = *refcount;
93890dfb75Sdormando     mutex_unlock(&atomics_mutex);
943b961388Sdormando     return res;
953b961388Sdormando #endif
963b961388Sdormando }
973b961388Sdormando 
refcount_decr(unsigned short * refcount)98b19b41d9SSteve Wills unsigned short refcount_decr(unsigned short *refcount) {
99de1f30c5Sdormando #ifdef HAVE_GCC_ATOMICS
1003b961388Sdormando     return __sync_sub_and_fetch(refcount, 1);
1013b961388Sdormando #elif defined(__sun)
1023b961388Sdormando     return atomic_dec_ushort_nv(refcount);
1033b961388Sdormando #else
1043b961388Sdormando     unsigned short res;
1053b961388Sdormando     mutex_lock(&atomics_mutex);
106de1f30c5Sdormando     (*refcount)--;
1073b961388Sdormando     res = *refcount;
108890dfb75Sdormando     mutex_unlock(&atomics_mutex);
1093b961388Sdormando     return res;
1103b961388Sdormando #endif
1113b961388Sdormando }
1123b961388Sdormando 
11369d1c699Sdormando /* item_lock() must be held for an item before any modifications to either its
11469d1c699Sdormando  * associated hash bucket, or the structure itself.
11569d1c699Sdormando  * LRU modifications must hold the item lock, and the LRU lock.
11669d1c699Sdormando  * LRU's accessing items must item_trylock() before modifying an item.
11769d1c699Sdormando  * Items accessable from an LRU must not be freed or modified
11869d1c699Sdormando  * without first locking and removing from the LRU.
11969d1c699Sdormando  */
12069d1c699Sdormando 
item_lock(uint32_t hv)1211c94e12cSdormando void item_lock(uint32_t hv) {
1221d551408Sdormando     mutex_lock(&item_locks[hv & hashmask(item_lock_hashpower)]);
1231c94e12cSdormando }
1241c94e12cSdormando 
item_trylock(uint32_t hv)1251c94e12cSdormando void *item_trylock(uint32_t hv) {
1261d551408Sdormando     pthread_mutex_t *lock = &item_locks[hv & hashmask(item_lock_hashpower)];
1271c94e12cSdormando     if (pthread_mutex_trylock(lock) == 0) {
1281c94e12cSdormando         return lock;
1291c94e12cSdormando     }
1301c94e12cSdormando     return NULL;
1311c94e12cSdormando }
1321c94e12cSdormando 
item_trylock_unlock(void * lock)1331c94e12cSdormando void item_trylock_unlock(void *lock) {
1341c94e12cSdormando     mutex_unlock((pthread_mutex_t *) lock);
1352db1bf46Sdormando }
1362db1bf46Sdormando 
item_unlock(uint32_t hv)1378fe5bf1fSdormando void item_unlock(uint32_t hv) {
1381d551408Sdormando     mutex_unlock(&item_locks[hv & hashmask(item_lock_hashpower)]);
1391c94e12cSdormando }
1401c94e12cSdormando 
wait_for_thread_registration(int nthreads)1411c94e12cSdormando static void wait_for_thread_registration(int nthreads) {
1421c94e12cSdormando     while (init_count < nthreads) {
1431c94e12cSdormando         pthread_cond_wait(&init_cond, &init_lock);
1441c94e12cSdormando     }
1451c94e12cSdormando }
1461c94e12cSdormando 
register_thread_initialized(void)1471c94e12cSdormando static void register_thread_initialized(void) {
1481c94e12cSdormando     pthread_mutex_lock(&init_lock);
1491c94e12cSdormando     init_count++;
1501c94e12cSdormando     pthread_cond_signal(&init_cond);
1511c94e12cSdormando     pthread_mutex_unlock(&init_lock);
1526af7aa0bSdormando     /* Force worker threads to pile up if someone wants us to */
1536af7aa0bSdormando     pthread_mutex_lock(&worker_hang_lock);
1546af7aa0bSdormando     pthread_mutex_unlock(&worker_hang_lock);
1551c94e12cSdormando }
1561c94e12cSdormando 
15769d1c699Sdormando /* Must not be called with any deeper locks held */
pause_threads(enum pause_thread_types type)1586af7aa0bSdormando void pause_threads(enum pause_thread_types type) {
1591c94e12cSdormando     char buf[1];
1601c94e12cSdormando     int i;
1611c94e12cSdormando 
1626af7aa0bSdormando     buf[0] = 0;
1631c94e12cSdormando     switch (type) {
1646af7aa0bSdormando         case PAUSE_ALL_THREADS:
1656af7aa0bSdormando             slabs_rebalancer_pause();
1666af7aa0bSdormando             lru_crawler_pause();
167fb269897Sdormando             lru_maintainer_pause();
1686af7aa0bSdormando         case PAUSE_WORKER_THREADS:
1696af7aa0bSdormando             buf[0] = 'p';
1706af7aa0bSdormando             pthread_mutex_lock(&worker_hang_lock);
1711c94e12cSdormando             break;
1726af7aa0bSdormando         case RESUME_ALL_THREADS:
1736af7aa0bSdormando             slabs_rebalancer_resume();
1746af7aa0bSdormando             lru_crawler_resume();
175fb269897Sdormando             lru_maintainer_resume();
1766af7aa0bSdormando         case RESUME_WORKER_THREADS:
1776af7aa0bSdormando             pthread_mutex_unlock(&worker_hang_lock);
1781c94e12cSdormando             break;
1791c94e12cSdormando         default:
1801c94e12cSdormando             fprintf(stderr, "Unknown lock type: %d\n", type);
1811c94e12cSdormando             assert(1 == 0);
1821c94e12cSdormando             break;
1831c94e12cSdormando     }
1841c94e12cSdormando 
1856af7aa0bSdormando     /* Only send a message if we have one. */
1866af7aa0bSdormando     if (buf[0] == 0) {
1876af7aa0bSdormando         return;
1886af7aa0bSdormando     }
1896af7aa0bSdormando 
1901c94e12cSdormando     pthread_mutex_lock(&init_lock);
1911c94e12cSdormando     init_count = 0;
1921c94e12cSdormando     for (i = 0; i < settings.num_threads; i++) {
1931c94e12cSdormando         if (write(threads[i].notify_send_fd, buf, 1) != 1) {
1941c94e12cSdormando             perror("Failed writing to notify pipe");
1951c94e12cSdormando             /* TODO: This is a fatal problem. Can it ever happen temporarily? */
1961c94e12cSdormando         }
1971c94e12cSdormando     }
1981c94e12cSdormando     wait_for_thread_registration(settings.num_threads);
1991c94e12cSdormando     pthread_mutex_unlock(&init_lock);
2008fe5bf1fSdormando }
2018fe5bf1fSdormando 
2025b304997SSteven Grimm /*
2035b304997SSteven Grimm  * Initializes a connection queue.
2045b304997SSteven Grimm  */
cq_init(CQ * cq)2055b304997SSteven Grimm static void cq_init(CQ *cq) {
2065b304997SSteven Grimm     pthread_mutex_init(&cq->lock, NULL);
2075b304997SSteven Grimm     cq->head = NULL;
2085b304997SSteven Grimm     cq->tail = NULL;
2095b304997SSteven Grimm }
2105b304997SSteven Grimm 
2115b304997SSteven Grimm /*
2125b304997SSteven Grimm  * Looks for an item on a connection queue, but doesn't block if there isn't
2135b304997SSteven Grimm  * one.
214d9b97d80SPaul Lindner  * Returns the item, or NULL if no item is available
2155b304997SSteven Grimm  */
cq_pop(CQ * cq)216b2b49429SToru Maesaka static CQ_ITEM *cq_pop(CQ *cq) {
2175b304997SSteven Grimm     CQ_ITEM *item;
2185b304997SSteven Grimm 
2195b304997SSteven Grimm     pthread_mutex_lock(&cq->lock);
2205b304997SSteven Grimm     item = cq->head;
2215b304997SSteven Grimm     if (NULL != item) {
2225b304997SSteven Grimm         cq->head = item->next;
2235b304997SSteven Grimm         if (NULL == cq->head)
2245b304997SSteven Grimm             cq->tail = NULL;
2255b304997SSteven Grimm     }
2265b304997SSteven Grimm     pthread_mutex_unlock(&cq->lock);
2275b304997SSteven Grimm 
2285b304997SSteven Grimm     return item;
2295b304997SSteven Grimm }
2305b304997SSteven Grimm 
2315b304997SSteven Grimm /*
2325b304997SSteven Grimm  * Adds an item to a connection queue.
2335b304997SSteven Grimm  */
cq_push(CQ * cq,CQ_ITEM * item)2345b304997SSteven Grimm static void cq_push(CQ *cq, CQ_ITEM *item) {
2355b304997SSteven Grimm     item->next = NULL;
2365b304997SSteven Grimm 
2375b304997SSteven Grimm     pthread_mutex_lock(&cq->lock);
2385b304997SSteven Grimm     if (NULL == cq->tail)
2395b304997SSteven Grimm         cq->head = item;
2405b304997SSteven Grimm     else
2415b304997SSteven Grimm         cq->tail->next = item;
2425b304997SSteven Grimm     cq->tail = item;
2435b304997SSteven Grimm     pthread_mutex_unlock(&cq->lock);
2445b304997SSteven Grimm }
2455b304997SSteven Grimm 
2465b304997SSteven Grimm /*
2475b304997SSteven Grimm  * Returns a fresh connection queue item.
2485b304997SSteven Grimm  */
cqi_new(void)249df1b7e42STrond Norbye static CQ_ITEM *cqi_new(void) {
2505b304997SSteven Grimm     CQ_ITEM *item = NULL;
2515b304997SSteven Grimm     pthread_mutex_lock(&cqi_freelist_lock);
2525b304997SSteven Grimm     if (cqi_freelist) {
2535b304997SSteven Grimm         item = cqi_freelist;
2545b304997SSteven Grimm         cqi_freelist = item->next;
2555b304997SSteven Grimm     }
2565b304997SSteven Grimm     pthread_mutex_unlock(&cqi_freelist_lock);
2575b304997SSteven Grimm 
2585b304997SSteven Grimm     if (NULL == item) {
2595b304997SSteven Grimm         int i;
2605b304997SSteven Grimm 
2615b304997SSteven Grimm         /* Allocate a bunch of items at once to reduce fragmentation */
2625b304997SSteven Grimm         item = malloc(sizeof(CQ_ITEM) * ITEMS_PER_ALLOC);
263de021a9cSTrond Norbye         if (NULL == item) {
264de021a9cSTrond Norbye             STATS_LOCK();
265de021a9cSTrond Norbye             stats.malloc_fails++;
266de021a9cSTrond Norbye             STATS_UNLOCK();
2675b304997SSteven Grimm             return NULL;
268de021a9cSTrond Norbye         }
2695b304997SSteven Grimm 
2705b304997SSteven Grimm         /*
2715b304997SSteven Grimm          * Link together all the new items except the first one
2725b304997SSteven Grimm          * (which we'll return to the caller) for placement on
2735b304997SSteven Grimm          * the freelist.
2745b304997SSteven Grimm          */
2755b304997SSteven Grimm         for (i = 2; i < ITEMS_PER_ALLOC; i++)
2765b304997SSteven Grimm             item[i - 1].next = &item[i];
2775b304997SSteven Grimm 
2785b304997SSteven Grimm         pthread_mutex_lock(&cqi_freelist_lock);
2795b304997SSteven Grimm         item[ITEMS_PER_ALLOC - 1].next = cqi_freelist;
2805b304997SSteven Grimm         cqi_freelist = &item[1];
2815b304997SSteven Grimm         pthread_mutex_unlock(&cqi_freelist_lock);
2825b304997SSteven Grimm     }
2835b304997SSteven Grimm 
2845b304997SSteven Grimm     return item;
2855b304997SSteven Grimm }
2865b304997SSteven Grimm 
2875b304997SSteven Grimm 
2885b304997SSteven Grimm /*
2895b304997SSteven Grimm  * Frees a connection queue item (adds it to the freelist.)
2905b304997SSteven Grimm  */
cqi_free(CQ_ITEM * item)2915b304997SSteven Grimm static void cqi_free(CQ_ITEM *item) {
2925b304997SSteven Grimm     pthread_mutex_lock(&cqi_freelist_lock);
2935b304997SSteven Grimm     item->next = cqi_freelist;
2945b304997SSteven Grimm     cqi_freelist = item;
2955b304997SSteven Grimm     pthread_mutex_unlock(&cqi_freelist_lock);
2965b304997SSteven Grimm }
2975b304997SSteven Grimm 
2985b304997SSteven Grimm 
2995b304997SSteven Grimm /*
3005b304997SSteven Grimm  * Creates a worker thread.
3015b304997SSteven Grimm  */
create_worker(void * (* func)(void *),void * arg)3025b304997SSteven Grimm static void create_worker(void *(*func)(void *), void *arg) {
3035b304997SSteven Grimm     pthread_attr_t  attr;
3045b304997SSteven Grimm     int             ret;
3055b304997SSteven Grimm 
3065b304997SSteven Grimm     pthread_attr_init(&attr);
3075b304997SSteven Grimm 
3087dc726beSSaman Barghi     if ((ret = pthread_create(&((LIBEVENT_THREAD*)arg)->thread_id, &attr, func, arg)) != 0) {
3095b304997SSteven Grimm         fprintf(stderr, "Can't create thread: %s\n",
3105b304997SSteven Grimm                 strerror(ret));
3115b304997SSteven Grimm         exit(1);
3125b304997SSteven Grimm     }
3135b304997SSteven Grimm }
3145b304997SSteven Grimm 
315a0e4a756Sdormando /*
316a0e4a756Sdormando  * Sets whether or not we accept new connections.
317a0e4a756Sdormando  */
accept_new_conns(const bool do_accept)318a0e4a756Sdormando void accept_new_conns(const bool do_accept) {
319a0e4a756Sdormando     pthread_mutex_lock(&conn_lock);
320a0e4a756Sdormando     do_accept_new_conns(do_accept);
321a0e4a756Sdormando     pthread_mutex_unlock(&conn_lock);
322a0e4a756Sdormando }
3235b304997SSteven Grimm /****************************** LIBEVENT THREADS *****************************/
3245b304997SSteven Grimm 
3255b304997SSteven Grimm /*
3265b304997SSteven Grimm  * Set up a thread's information.
3275b304997SSteven Grimm  */
setup_thread(LIBEVENT_THREAD * me)3285b304997SSteven Grimm static void setup_thread(LIBEVENT_THREAD *me) {
3295b304997SSteven Grimm     me->base = event_init();
3305b304997SSteven Grimm     if (! me->base) {
3315b304997SSteven Grimm         fprintf(stderr, "Can't allocate event base\n");
3325b304997SSteven Grimm         exit(1);
3335b304997SSteven Grimm     }
3345b304997SSteven Grimm 
3355b304997SSteven Grimm     /* Listen for notifications from other threads */
3365b304997SSteven Grimm     event_set(&me->notify_event, me->notify_receive_fd,
3375b304997SSteven Grimm               EV_READ | EV_PERSIST, thread_libevent_process, me);
3385b304997SSteven Grimm     event_base_set(me->base, &me->notify_event);
3395b304997SSteven Grimm 
3405b304997SSteven Grimm     if (event_add(&me->notify_event, 0) == -1) {
3415b304997SSteven Grimm         fprintf(stderr, "Can't monitor libevent notify pipe\n");
3425b304997SSteven Grimm         exit(1);
3435b304997SSteven Grimm     }
3445b304997SSteven Grimm 
3451fdfb7e9STrond Norbye     me->new_conn_queue = malloc(sizeof(struct conn_queue));
3461fdfb7e9STrond Norbye     if (me->new_conn_queue == NULL) {
3471fdfb7e9STrond Norbye         perror("Failed to allocate memory for connection queue");
3481fdfb7e9STrond Norbye         exit(EXIT_FAILURE);
3491fdfb7e9STrond Norbye     }
3501fdfb7e9STrond Norbye     cq_init(me->new_conn_queue);
3511fdfb7e9STrond Norbye 
3521fdfb7e9STrond Norbye     if (pthread_mutex_init(&me->stats.mutex, NULL) != 0) {
3531fdfb7e9STrond Norbye         perror("Failed to initialize mutex");
3541fdfb7e9STrond Norbye         exit(EXIT_FAILURE);
3551fdfb7e9STrond Norbye     }
3564c86fa59STrond Norbye 
3574c86fa59STrond Norbye     me->suffix_cache = cache_create("suffix", SUFFIX_SIZE, sizeof(char*),
3584c86fa59STrond Norbye                                     NULL, NULL);
3594c86fa59STrond Norbye     if (me->suffix_cache == NULL) {
3604c86fa59STrond Norbye         fprintf(stderr, "Failed to create suffix cache\n");
3614c86fa59STrond Norbye         exit(EXIT_FAILURE);
3624c86fa59STrond Norbye     }
3635b304997SSteven Grimm }
3645b304997SSteven Grimm 
3655b304997SSteven Grimm /*
3665b304997SSteven Grimm  * Worker thread: main event loop
3675b304997SSteven Grimm  */
worker_libevent(void * arg)3685b304997SSteven Grimm static void *worker_libevent(void *arg) {
3695b304997SSteven Grimm     LIBEVENT_THREAD *me = arg;
3705b304997SSteven Grimm 
371434c7cc5Sdormando     /* Any per-thread setup can happen here; memcached_thread_init() will block until
3725b304997SSteven Grimm      * all threads have finished initializing.
3735b304997SSteven Grimm      */
374916fff36Sdormando     me->l = logger_create();
375916fff36Sdormando     if (me->l == NULL) {
376916fff36Sdormando         abort();
377916fff36Sdormando     }
3785b304997SSteven Grimm 
3791c94e12cSdormando     register_thread_initialized();
3805b304997SSteven Grimm 
381df1b7e42STrond Norbye     event_base_loop(me->base, 0);
382df1b7e42STrond Norbye     return NULL;
3835b304997SSteven Grimm }
3845b304997SSteven Grimm 
3855b304997SSteven Grimm 
3865b304997SSteven Grimm /*
3875b304997SSteven Grimm  * Processes an incoming "handle a new connection" item. This is called when
3885b304997SSteven Grimm  * input arrives on the libevent wakeup pipe.
3895b304997SSteven Grimm  */
thread_libevent_process(int fd,short which,void * arg)3905b304997SSteven Grimm static void thread_libevent_process(int fd, short which, void *arg) {
3915b304997SSteven Grimm     LIBEVENT_THREAD *me = arg;
3925b304997SSteven Grimm     CQ_ITEM *item;
3935b304997SSteven Grimm     char buf[1];
39483ba6bd9SJay Grizzard     unsigned int timeout_fd;
3955b304997SSteven Grimm 
39683ba6bd9SJay Grizzard     if (read(fd, buf, 1) != 1) {
3975b304997SSteven Grimm         if (settings.verbose > 0)
3985b304997SSteven Grimm             fprintf(stderr, "Can't read from libevent pipe\n");
39983ba6bd9SJay Grizzard         return;
40083ba6bd9SJay Grizzard     }
4015b304997SSteven Grimm 
4021c94e12cSdormando     switch (buf[0]) {
4031c94e12cSdormando     case 'c':
4041fdfb7e9STrond Norbye         item = cq_pop(me->new_conn_queue);
405d9b97d80SPaul Lindner 
406d9b97d80SPaul Lindner         if (NULL != item) {
4075b304997SSteven Grimm             conn *c = conn_new(item->sfd, item->init_state, item->event_flags,
4081e6a1326SJay Grizzard                                item->read_buffer_size, item->transport,
4091e6a1326SJay Grizzard                                me->base);
4100b211d45SSteven Grimm             if (c == NULL) {
41115ace4b5SEric Lambert                 if (IS_UDP(item->transport)) {
4125b304997SSteven Grimm                     fprintf(stderr, "Can't listen for events on UDP socket\n");
4135b304997SSteven Grimm                     exit(1);
414d9b97d80SPaul Lindner                 } else {
4155b304997SSteven Grimm                     if (settings.verbose > 0) {
4165b304997SSteven Grimm                         fprintf(stderr, "Can't listen for events on fd %d\n",
4175b304997SSteven Grimm                             item->sfd);
4185b304997SSteven Grimm                     }
4195b304997SSteven Grimm                     close(item->sfd);
4205b304997SSteven Grimm                 }
4211fdfb7e9STrond Norbye             } else {
4221fdfb7e9STrond Norbye                 c->thread = me;
4235b304997SSteven Grimm             }
4245b304997SSteven Grimm             cqi_free(item);
4255b304997SSteven Grimm         }
4261c94e12cSdormando         break;
4276af7aa0bSdormando     /* we were told to pause and report in */
4286af7aa0bSdormando     case 'p':
4291c94e12cSdormando         register_thread_initialized();
4301c94e12cSdormando         break;
43183ba6bd9SJay Grizzard     /* a client socket timed out */
43283ba6bd9SJay Grizzard     case 't':
43383ba6bd9SJay Grizzard         if (read(fd, &timeout_fd, sizeof(timeout_fd)) != sizeof(timeout_fd)) {
43483ba6bd9SJay Grizzard             if (settings.verbose > 0)
43583ba6bd9SJay Grizzard                 fprintf(stderr, "Can't read timeout fd from libevent pipe\n");
43683ba6bd9SJay Grizzard             return;
43783ba6bd9SJay Grizzard         }
43883ba6bd9SJay Grizzard         conn_close_idle(conns[timeout_fd]);
43983ba6bd9SJay Grizzard         break;
4401c94e12cSdormando     }
4415b304997SSteven Grimm }
4425b304997SSteven Grimm 
4435b304997SSteven Grimm /* Which thread we assigned a connection to most recently. */
4442fe44f1cSDmitry Isaykin static int last_thread = -1;
4455b304997SSteven Grimm 
4465b304997SSteven Grimm /*
4475b304997SSteven Grimm  * Dispatches a new connection to another thread. This is only ever called
4485b304997SSteven Grimm  * from the main thread, either during initialization (for UDP) or because
4495b304997SSteven Grimm  * of an incoming connection.
4505b304997SSteven Grimm  */
dispatch_conn_new(int sfd,enum conn_states init_state,int event_flags,int read_buffer_size,enum network_transport transport)4511437fe90STrond Norbye void dispatch_conn_new(int sfd, enum conn_states init_state, int event_flags,
45215ace4b5SEric Lambert                        int read_buffer_size, enum network_transport transport) {
4535b304997SSteven Grimm     CQ_ITEM *item = cqi_new();
4541c94e12cSdormando     char buf[1];
4556af1d265STrond Norbye     if (item == NULL) {
4566af1d265STrond Norbye         close(sfd);
4576af1d265STrond Norbye         /* given that malloc failed this may also fail, but let's try */
4586af1d265STrond Norbye         fprintf(stderr, "Failed to allocate memory for connection object\n");
4596af1d265STrond Norbye         return ;
4606af1d265STrond Norbye     }
4616af1d265STrond Norbye 
4622fe44f1cSDmitry Isaykin     int tid = (last_thread + 1) % settings.num_threads;
4635b304997SSteven Grimm 
464869f1868Sdormando     LIBEVENT_THREAD *thread = threads + tid;
465869f1868Sdormando 
466869f1868Sdormando     last_thread = tid;
4675b304997SSteven Grimm 
4685b304997SSteven Grimm     item->sfd = sfd;
4695b304997SSteven Grimm     item->init_state = init_state;
4705b304997SSteven Grimm     item->event_flags = event_flags;
4715b304997SSteven Grimm     item->read_buffer_size = read_buffer_size;
47215ace4b5SEric Lambert     item->transport = transport;
4735b304997SSteven Grimm 
4741fdfb7e9STrond Norbye     cq_push(thread->new_conn_queue, item);
47568957214STrond Norbye 
476b2e7e909SRicky Zhou     MEMCACHED_CONN_DISPATCH(sfd, thread->thread_id);
4771c94e12cSdormando     buf[0] = 'c';
4781c94e12cSdormando     if (write(thread->notify_send_fd, buf, 1) != 1) {
4795b304997SSteven Grimm         perror("Writing to thread notify pipe");
4805b304997SSteven Grimm     }
4815b304997SSteven Grimm }
4825b304997SSteven Grimm 
4835b304997SSteven Grimm /*
484916fff36Sdormando  * Re-dispatches a connection back to the original thread. Can be called from
485916fff36Sdormando  * any side thread borrowing a connection.
486916fff36Sdormando  * TODO: Look into this. too complicated?
487916fff36Sdormando  */
488916fff36Sdormando #ifdef BOGUS_DEFINE
redispatch_conn(conn * c)489916fff36Sdormando void redispatch_conn(conn *c) {
490916fff36Sdormando     CQ_ITEM *item = cqi_new();
491916fff36Sdormando     char buf[1];
492916fff36Sdormando     if (item == NULL) {
493916fff36Sdormando         /* Can't cleanly redispatch connection. close it forcefully. */
494916fff36Sdormando         /* FIXME: is conn_cleanup() necessary?
495916fff36Sdormando          * if conn was handed off to a side thread it should be clean.
496916fff36Sdormando          * could also put it into a "clean_me" state?
497916fff36Sdormando          */
498916fff36Sdormando         c->state = conn_closed;
499916fff36Sdormando         close(c->sfd);
500916fff36Sdormando         return;
501916fff36Sdormando     }
502916fff36Sdormando     LIBEVENT_THREAD *thread = c->thread;
503916fff36Sdormando     item->sfd = sfd;
504916fff36Sdormando     /* pass in the state somehow?
505916fff36Sdormando     item->init_state = conn_closing; */
506916fff36Sdormando     item->event_flags = c->event_flags;
507916fff36Sdormando     item->conn = c;
508916fff36Sdormando }
509916fff36Sdormando #endif
510916fff36Sdormando 
511916fff36Sdormando /* This misses the allow_new_conns flag :( */
sidethread_conn_close(conn * c)512916fff36Sdormando void sidethread_conn_close(conn *c) {
513916fff36Sdormando     c->state = conn_closed;
514916fff36Sdormando     if (settings.verbose > 1)
515916fff36Sdormando         fprintf(stderr, "<%d connection closed from side thread.\n", c->sfd);
516916fff36Sdormando     close(c->sfd);
517916fff36Sdormando 
518916fff36Sdormando     STATS_LOCK();
519cb01d504Sdormando     stats_state.curr_conns--;
520916fff36Sdormando     STATS_UNLOCK();
521916fff36Sdormando 
522916fff36Sdormando     return;
523916fff36Sdormando }
524916fff36Sdormando 
525916fff36Sdormando /*
5265b304997SSteven Grimm  * Returns true if this is the thread that listens for new TCP connections.
5275b304997SSteven Grimm  */
is_listen_thread()528a9dcd9acSToru Maesaka int is_listen_thread() {
5292fe44f1cSDmitry Isaykin     return pthread_self() == dispatcher_thread.thread_id;
5305b304997SSteven Grimm }
5315b304997SSteven Grimm 
5325b304997SSteven Grimm /********************************* ITEM ACCESS *******************************/
5335b304997SSteven Grimm 
5345b304997SSteven Grimm /*
5355b304997SSteven Grimm  * Allocates a new item.
5365b304997SSteven Grimm  */
item_alloc(char * key,size_t nkey,int flags,rel_time_t exptime,int nbytes)537a9dcd9acSToru Maesaka item *item_alloc(char *key, size_t nkey, int flags, rel_time_t exptime, int nbytes) {
5385b304997SSteven Grimm     item *it;
53909f54cb4Sdormando     /* do_item_alloc handles its own locks */
540*690a9a9dSEiichi Tsukata     it = do_item_alloc(key, nkey, flags, exptime, nbytes);
5415b304997SSteven Grimm     return it;
5425b304997SSteven Grimm }
5435b304997SSteven Grimm 
5445b304997SSteven Grimm /*
5455da8dbabSTrond Norbye  * Returns an item if it hasn't been marked as expired,
5465b304997SSteven Grimm  * lazy-expiring as needed.
5475b304997SSteven Grimm  */
item_get(const char * key,const size_t nkey,conn * c)5486895d23eSsergiocarlos item *item_get(const char *key, const size_t nkey, conn *c) {
5495b304997SSteven Grimm     item *it;
550bab9acd1Sdormando     uint32_t hv;
55105ca809cSdormando     hv = hash(key, nkey);
5528fe5bf1fSdormando     item_lock(hv);
5536895d23eSsergiocarlos     it = do_item_get(key, nkey, hv, c);
5548fe5bf1fSdormando     item_unlock(hv);
5555b304997SSteven Grimm     return it;
5565b304997SSteven Grimm }
5575b304997SSteven Grimm 
item_touch(const char * key,size_t nkey,uint32_t exptime,conn * c)5586895d23eSsergiocarlos item *item_touch(const char *key, size_t nkey, uint32_t exptime, conn *c) {
559d87f568aSdormando     item *it;
560bab9acd1Sdormando     uint32_t hv;
56105ca809cSdormando     hv = hash(key, nkey);
5628fe5bf1fSdormando     item_lock(hv);
5636895d23eSsergiocarlos     it = do_item_touch(key, nkey, exptime, hv, c);
5648fe5bf1fSdormando     item_unlock(hv);
565d87f568aSdormando     return it;
566d87f568aSdormando }
567d87f568aSdormando 
5685b304997SSteven Grimm /*
5695b304997SSteven Grimm  * Links an item into the LRU and hashtable.
5705b304997SSteven Grimm  */
item_link(item * item)571a9dcd9acSToru Maesaka int item_link(item *item) {
5725b304997SSteven Grimm     int ret;
573bab9acd1Sdormando     uint32_t hv;
5745b304997SSteven Grimm 
57505ca809cSdormando     hv = hash(ITEM_key(item), item->nkey);
5768fe5bf1fSdormando     item_lock(hv);
577bab9acd1Sdormando     ret = do_item_link(item, hv);
5788fe5bf1fSdormando     item_unlock(hv);
5795b304997SSteven Grimm     return ret;
5805b304997SSteven Grimm }
5815b304997SSteven Grimm 
5825b304997SSteven Grimm /*
5835b304997SSteven Grimm  * Decrements the reference count on an item and adds it to the freelist if
5845b304997SSteven Grimm  * needed.
5855b304997SSteven Grimm  */
item_remove(item * item)586a9dcd9acSToru Maesaka void item_remove(item *item) {
5878fe5bf1fSdormando     uint32_t hv;
58805ca809cSdormando     hv = hash(ITEM_key(item), item->nkey);
5898fe5bf1fSdormando 
5908fe5bf1fSdormando     item_lock(hv);
5915b304997SSteven Grimm     do_item_remove(item);
5928fe5bf1fSdormando     item_unlock(hv);
5935b304997SSteven Grimm }
5945b304997SSteven Grimm 
5955b304997SSteven Grimm /*
5965b304997SSteven Grimm  * Replaces one item with another in the hashtable.
597600ec99aSToru Maesaka  * Unprotected by a mutex lock since the core server does not require
598600ec99aSToru Maesaka  * it to be thread-safe.
5995b304997SSteven Grimm  */
item_replace(item * old_it,item * new_it,const uint32_t hv)600bab9acd1Sdormando int item_replace(item *old_it, item *new_it, const uint32_t hv) {
601bab9acd1Sdormando     return do_item_replace(old_it, new_it, hv);
6025b304997SSteven Grimm }
6035b304997SSteven Grimm 
6045b304997SSteven Grimm /*
6055b304997SSteven Grimm  * Unlinks an item from the LRU and hashtable.
6065b304997SSteven Grimm  */
item_unlink(item * item)607a9dcd9acSToru Maesaka void item_unlink(item *item) {
608bab9acd1Sdormando     uint32_t hv;
60905ca809cSdormando     hv = hash(ITEM_key(item), item->nkey);
6108fe5bf1fSdormando     item_lock(hv);
611bab9acd1Sdormando     do_item_unlink(item, hv);
6128fe5bf1fSdormando     item_unlock(hv);
6135b304997SSteven Grimm }
6145b304997SSteven Grimm 
6155b304997SSteven Grimm /*
6165b304997SSteven Grimm  * Moves an item to the back of the LRU queue.
6175b304997SSteven Grimm  */
item_update(item * item)618a9dcd9acSToru Maesaka void item_update(item *item) {
6198fe5bf1fSdormando     uint32_t hv;
62005ca809cSdormando     hv = hash(ITEM_key(item), item->nkey);
6218fe5bf1fSdormando 
6228fe5bf1fSdormando     item_lock(hv);
6235b304997SSteven Grimm     do_item_update(item);
6248fe5bf1fSdormando     item_unlock(hv);
6255b304997SSteven Grimm }
6265b304997SSteven Grimm 
6275b304997SSteven Grimm /*
6285b304997SSteven Grimm  * Does arithmetic on a numeric item value.
6295b304997SSteven Grimm  */
add_delta(conn * c,const char * key,const size_t nkey,int incr,const int64_t delta,char * buf,uint64_t * cas)630cbcd3872Sdormando enum delta_result_type add_delta(conn *c, const char *key,
631cbcd3872Sdormando                                  const size_t nkey, int incr,
632ea2d42a5Sdormando                                  const int64_t delta, char *buf,
633ea2d42a5Sdormando                                  uint64_t *cas) {
634d044acb2SDustin Sallings     enum delta_result_type ret;
635bab9acd1Sdormando     uint32_t hv;
6365b304997SSteven Grimm 
63705ca809cSdormando     hv = hash(key, nkey);
6388fe5bf1fSdormando     item_lock(hv);
639bab9acd1Sdormando     ret = do_add_delta(c, key, nkey, incr, delta, buf, cas, hv);
6408fe5bf1fSdormando     item_unlock(hv);
6415b304997SSteven Grimm     return ret;
6425b304997SSteven Grimm }
6435b304997SSteven Grimm 
6445b304997SSteven Grimm /*
6455b304997SSteven Grimm  * Stores an item in the cache (high level, obeys set/add/replace semantics)
6465b304997SSteven Grimm  */
store_item(item * item,int comm,conn * c)647e5d053c3SDustin Sallings enum store_item_type store_item(item *item, int comm, conn* c) {
648e5d053c3SDustin Sallings     enum store_item_type ret;
649bab9acd1Sdormando     uint32_t hv;
6505b304997SSteven Grimm 
65105ca809cSdormando     hv = hash(ITEM_key(item), item->nkey);
6528fe5bf1fSdormando     item_lock(hv);
653bab9acd1Sdormando     ret = do_store_item(item, comm, c, hv);
6548fe5bf1fSdormando     item_unlock(hv);
6555b304997SSteven Grimm     return ret;
6565b304997SSteven Grimm }
6575b304997SSteven Grimm 
6585b304997SSteven Grimm /******************************* GLOBAL STATS ******************************/
6595b304997SSteven Grimm 
STATS_LOCK()660a9dcd9acSToru Maesaka void STATS_LOCK() {
6614be8a5a7STrond Norbye     pthread_mutex_lock(&stats_lock);
6625b304997SSteven Grimm }
6635b304997SSteven Grimm 
STATS_UNLOCK()664a9dcd9acSToru Maesaka void STATS_UNLOCK() {
6654be8a5a7STrond Norbye     pthread_mutex_unlock(&stats_lock);
6665b304997SSteven Grimm }
6675b304997SSteven Grimm 
threadlocal_stats_reset(void)6681fdfb7e9STrond Norbye void threadlocal_stats_reset(void) {
66968c64594Sdormando     int ii;
670effae30eSTrond Norbye     for (ii = 0; ii < settings.num_threads; ++ii) {
6711fdfb7e9STrond Norbye         pthread_mutex_lock(&threads[ii].stats.mutex);
67268c64594Sdormando #define X(name) threads[ii].stats.name = 0;
67368c64594Sdormando         THREAD_STATS_FIELDS
67468c64594Sdormando #undef X
6751fdfb7e9STrond Norbye 
67668c64594Sdormando         memset(&threads[ii].stats.slab_stats, 0,
67768c64594Sdormando                 sizeof(threads[ii].stats.slab_stats));
67825b5189cSDustin Sallings 
6791fdfb7e9STrond Norbye         pthread_mutex_unlock(&threads[ii].stats.mutex);
6801fdfb7e9STrond Norbye     }
6811fdfb7e9STrond Norbye }
6821fdfb7e9STrond Norbye 
threadlocal_stats_aggregate(struct thread_stats * stats)6831fdfb7e9STrond Norbye void threadlocal_stats_aggregate(struct thread_stats *stats) {
68425b5189cSDustin Sallings     int ii, sid;
6851fdfb7e9STrond Norbye 
68692d6e124SDan McGee     /* The struct has a mutex, but we can safely set the whole thing
68792d6e124SDan McGee      * to zero since it is unused when aggregating. */
68892d6e124SDan McGee     memset(stats, 0, sizeof(*stats));
68925b5189cSDustin Sallings 
690effae30eSTrond Norbye     for (ii = 0; ii < settings.num_threads; ++ii) {
6911fdfb7e9STrond Norbye         pthread_mutex_lock(&threads[ii].stats.mutex);
69268c64594Sdormando #define X(name) stats->name += threads[ii].stats.name;
69368c64594Sdormando         THREAD_STATS_FIELDS
69468c64594Sdormando #undef X
6951fdfb7e9STrond Norbye 
69625b5189cSDustin Sallings         for (sid = 0; sid < MAX_NUMBER_OF_SLAB_CLASSES; sid++) {
69768c64594Sdormando #define X(name) stats->slab_stats[sid].name += \
69868c64594Sdormando             threads[ii].stats.slab_stats[sid].name;
69968c64594Sdormando             SLAB_STATS_FIELDS
70068c64594Sdormando #undef X
70125b5189cSDustin Sallings         }
70225b5189cSDustin Sallings 
7031fdfb7e9STrond Norbye         pthread_mutex_unlock(&threads[ii].stats.mutex);
7041fdfb7e9STrond Norbye     }
7051fdfb7e9STrond Norbye }
7061fdfb7e9STrond Norbye 
slab_stats_aggregate(struct thread_stats * stats,struct slab_stats * out)70725b5189cSDustin Sallings void slab_stats_aggregate(struct thread_stats *stats, struct slab_stats *out) {
70825b5189cSDustin Sallings     int sid;
70925b5189cSDustin Sallings 
71068c64594Sdormando     memset(out, 0, sizeof(*out));
71125b5189cSDustin Sallings 
71225b5189cSDustin Sallings     for (sid = 0; sid < MAX_NUMBER_OF_SLAB_CLASSES; sid++) {
71368c64594Sdormando #define X(name) out->name += stats->slab_stats[sid].name;
71468c64594Sdormando         SLAB_STATS_FIELDS
71568c64594Sdormando #undef X
71625b5189cSDustin Sallings     }
71725b5189cSDustin Sallings }
7181fdfb7e9STrond Norbye 
7195b304997SSteven Grimm /*
7205b304997SSteven Grimm  * Initializes the thread subsystem, creating various worker threads.
7215b304997SSteven Grimm  *
7222fe44f1cSDmitry Isaykin  * nthreads  Number of worker event handler threads to spawn
7235b304997SSteven Grimm  * main_base Event base for main thread
7245b304997SSteven Grimm  */
memcached_thread_init(int nthreads,struct event_base * main_base)725434c7cc5Sdormando void memcached_thread_init(int nthreads, struct event_base *main_base) {
7265b304997SSteven Grimm     int         i;
72797409b31Sdormando     int         power;
7285b304997SSteven Grimm 
7299bce42f2Sdormando     for (i = 0; i < POWER_LARGEST; i++) {
7309bce42f2Sdormando         pthread_mutex_init(&lru_locks[i], NULL);
7319bce42f2Sdormando     }
7326af7aa0bSdormando     pthread_mutex_init(&worker_hang_lock, NULL);
7335b304997SSteven Grimm 
7345b304997SSteven Grimm     pthread_mutex_init(&init_lock, NULL);
7355b304997SSteven Grimm     pthread_cond_init(&init_cond, NULL);
7365b304997SSteven Grimm 
7375b304997SSteven Grimm     pthread_mutex_init(&cqi_freelist_lock, NULL);
7385b304997SSteven Grimm     cqi_freelist = NULL;
7395b304997SSteven Grimm 
74097409b31Sdormando     /* Want a wide lock table, but don't waste memory */
74197409b31Sdormando     if (nthreads < 3) {
74297409b31Sdormando         power = 10;
74397409b31Sdormando     } else if (nthreads < 4) {
74497409b31Sdormando         power = 11;
74597409b31Sdormando     } else if (nthreads < 5) {
74697409b31Sdormando         power = 12;
74797409b31Sdormando     } else {
74897409b31Sdormando         /* 8192 buckets, and central locks don't scale much past 5 threads */
74997409b31Sdormando         power = 13;
75097409b31Sdormando     }
75197409b31Sdormando 
7526af7aa0bSdormando     if (power >= hashpower) {
7536af7aa0bSdormando         fprintf(stderr, "Hash table power size (%d) cannot be equal to or less than item lock table (%d)\n", hashpower, power);
7546af7aa0bSdormando         fprintf(stderr, "Item lock table grows with `-t N` (worker threadcount)\n");
7556af7aa0bSdormando         fprintf(stderr, "Hash table grows with `-o hashpower=N` \n");
7566af7aa0bSdormando         exit(1);
7576af7aa0bSdormando     }
7586af7aa0bSdormando 
7591c94e12cSdormando     item_lock_count = hashsize(power);
7601d551408Sdormando     item_lock_hashpower = power;
76197409b31Sdormando 
76297409b31Sdormando     item_locks = calloc(item_lock_count, sizeof(pthread_mutex_t));
7638fe5bf1fSdormando     if (! item_locks) {
7648fe5bf1fSdormando         perror("Can't allocate item locks");
7658fe5bf1fSdormando         exit(1);
7668fe5bf1fSdormando     }
76797409b31Sdormando     for (i = 0; i < item_lock_count; i++) {
7688fe5bf1fSdormando         pthread_mutex_init(&item_locks[i], NULL);
7698fe5bf1fSdormando     }
7708fe5bf1fSdormando 
771b2e7e909SRicky Zhou     threads = calloc(nthreads, sizeof(LIBEVENT_THREAD));
7725b304997SSteven Grimm     if (! threads) {
7735b304997SSteven Grimm         perror("Can't allocate thread descriptors");
7745b304997SSteven Grimm         exit(1);
7755b304997SSteven Grimm     }
7765b304997SSteven Grimm 
7772fe44f1cSDmitry Isaykin     dispatcher_thread.base = main_base;
7782fe44f1cSDmitry Isaykin     dispatcher_thread.thread_id = pthread_self();
7795b304997SSteven Grimm 
7805b304997SSteven Grimm     for (i = 0; i < nthreads; i++) {
7815b304997SSteven Grimm         int fds[2];
7825b304997SSteven Grimm         if (pipe(fds)) {
7835b304997SSteven Grimm             perror("Can't create notify pipe");
7845b304997SSteven Grimm             exit(1);
7855b304997SSteven Grimm         }
7865b304997SSteven Grimm 
7875b304997SSteven Grimm         threads[i].notify_receive_fd = fds[0];
7885b304997SSteven Grimm         threads[i].notify_send_fd = fds[1];
7895b304997SSteven Grimm 
7905b304997SSteven Grimm         setup_thread(&threads[i]);
791d1f9d992Sdormando         /* Reserve three fds for the libevent base, and two for the pipe */
792cb01d504Sdormando         stats_state.reserved_fds += 5;
7935b304997SSteven Grimm     }
7945b304997SSteven Grimm 
7955b304997SSteven Grimm     /* Create threads after we've done all the libevent setup. */
7962fe44f1cSDmitry Isaykin     for (i = 0; i < nthreads; i++) {
7975b304997SSteven Grimm         create_worker(worker_libevent, &threads[i]);
7985b304997SSteven Grimm     }
7995b304997SSteven Grimm 
8005b304997SSteven Grimm     /* Wait for all the threads to set themselves up before returning. */
8015b304997SSteven Grimm     pthread_mutex_lock(&init_lock);
8021c94e12cSdormando     wait_for_thread_registration(nthreads);
8035b304997SSteven Grimm     pthread_mutex_unlock(&init_lock);
8045b304997SSteven Grimm }
8055b304997SSteven Grimm 
806