xref: /memcached-1.4.29/assoc.c (revision cb01d504)
1 /* -*- Mode: C; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2 /*
3  * Hash table
4  *
5  * The hash function used here is by Bob Jenkins, 1996:
6  *    <http://burtleburtle.net/bob/hash/doobs.html>
7  *       "By Bob Jenkins, 1996.  [email protected].
8  *       You may use this code any way you wish, private, educational,
9  *       or commercial.  It's free."
10  *
11  * The rest of the file is licensed under the BSD license.  See LICENSE.
12  */
13 
14 #include "memcached.h"
15 #include <sys/stat.h>
16 #include <sys/socket.h>
17 #include <sys/resource.h>
18 #include <signal.h>
19 #include <fcntl.h>
20 #include <netinet/in.h>
21 #include <errno.h>
22 #include <stdlib.h>
23 #include <stdio.h>
24 #include <string.h>
25 #include <assert.h>
26 #include <pthread.h>
27 
28 static pthread_cond_t maintenance_cond = PTHREAD_COND_INITIALIZER;
29 static pthread_mutex_t maintenance_lock = PTHREAD_MUTEX_INITIALIZER;
30 static pthread_mutex_t hash_items_counter_lock = PTHREAD_MUTEX_INITIALIZER;
31 
32 typedef  unsigned long  int  ub4;   /* unsigned 4-byte quantities */
33 typedef  unsigned       char ub1;   /* unsigned 1-byte quantities */
34 
35 /* how many powers of 2's worth of buckets we use */
36 unsigned int hashpower = HASHPOWER_DEFAULT;
37 
38 #define hashsize(n) ((ub4)1<<(n))
39 #define hashmask(n) (hashsize(n)-1)
40 
41 /* Main hash table. This is where we look except during expansion. */
42 static item** primary_hashtable = 0;
43 
44 /*
45  * Previous hash table. During expansion, we look here for keys that haven't
46  * been moved over to the primary yet.
47  */
48 static item** old_hashtable = 0;
49 
50 /* Number of items in the hash table. */
51 static unsigned int hash_items = 0;
52 
53 /* Flag: Are we in the middle of expanding now? */
54 static bool expanding = false;
55 static bool started_expanding = false;
56 
57 /*
58  * During expansion we migrate values with bucket granularity; this is how
59  * far we've gotten so far. Ranges from 0 .. hashsize(hashpower - 1) - 1.
60  */
61 static unsigned int expand_bucket = 0;
62 
assoc_init(const int hashtable_init)63 void assoc_init(const int hashtable_init) {
64     if (hashtable_init) {
65         hashpower = hashtable_init;
66     }
67     primary_hashtable = calloc(hashsize(hashpower), sizeof(void *));
68     if (! primary_hashtable) {
69         fprintf(stderr, "Failed to init hashtable.\n");
70         exit(EXIT_FAILURE);
71     }
72     STATS_LOCK();
73     stats_state.hash_power_level = hashpower;
74     stats_state.hash_bytes = hashsize(hashpower) * sizeof(void *);
75     STATS_UNLOCK();
76 }
77 
assoc_find(const char * key,const size_t nkey,const uint32_t hv)78 item *assoc_find(const char *key, const size_t nkey, const uint32_t hv) {
79     item *it;
80     unsigned int oldbucket;
81 
82     if (expanding &&
83         (oldbucket = (hv & hashmask(hashpower - 1))) >= expand_bucket)
84     {
85         it = old_hashtable[oldbucket];
86     } else {
87         it = primary_hashtable[hv & hashmask(hashpower)];
88     }
89 
90     item *ret = NULL;
91     int depth = 0;
92     while (it) {
93         if ((nkey == it->nkey) && (memcmp(key, ITEM_key(it), nkey) == 0)) {
94             ret = it;
95             break;
96         }
97         it = it->h_next;
98         ++depth;
99     }
100     MEMCACHED_ASSOC_FIND(key, nkey, depth);
101     return ret;
102 }
103 
104 /* returns the address of the item pointer before the key.  if *item == 0,
105    the item wasn't found */
106 
_hashitem_before(const char * key,const size_t nkey,const uint32_t hv)107 static item** _hashitem_before (const char *key, const size_t nkey, const uint32_t hv) {
108     item **pos;
109     unsigned int oldbucket;
110 
111     if (expanding &&
112         (oldbucket = (hv & hashmask(hashpower - 1))) >= expand_bucket)
113     {
114         pos = &old_hashtable[oldbucket];
115     } else {
116         pos = &primary_hashtable[hv & hashmask(hashpower)];
117     }
118 
119     while (*pos && ((nkey != (*pos)->nkey) || memcmp(key, ITEM_key(*pos), nkey))) {
120         pos = &(*pos)->h_next;
121     }
122     return pos;
123 }
124 
125 /* grows the hashtable to the next power of 2. */
assoc_expand(void)126 static void assoc_expand(void) {
127     old_hashtable = primary_hashtable;
128 
129     primary_hashtable = calloc(hashsize(hashpower + 1), sizeof(void *));
130     if (primary_hashtable) {
131         if (settings.verbose > 1)
132             fprintf(stderr, "Hash table expansion starting\n");
133         hashpower++;
134         expanding = true;
135         expand_bucket = 0;
136         STATS_LOCK();
137         stats_state.hash_power_level = hashpower;
138         stats_state.hash_bytes += hashsize(hashpower) * sizeof(void *);
139         stats_state.hash_is_expanding = true;
140         STATS_UNLOCK();
141     } else {
142         primary_hashtable = old_hashtable;
143         /* Bad news, but we can keep running. */
144     }
145 }
146 
assoc_start_expand(void)147 static void assoc_start_expand(void) {
148     if (started_expanding)
149         return;
150 
151     started_expanding = true;
152     pthread_cond_signal(&maintenance_cond);
153 }
154 
155 /* Note: this isn't an assoc_update.  The key must not already exist to call this */
assoc_insert(item * it,const uint32_t hv)156 int assoc_insert(item *it, const uint32_t hv) {
157     unsigned int oldbucket;
158 
159 //    assert(assoc_find(ITEM_key(it), it->nkey) == 0);  /* shouldn't have duplicately named things defined */
160 
161     if (expanding &&
162         (oldbucket = (hv & hashmask(hashpower - 1))) >= expand_bucket)
163     {
164         it->h_next = old_hashtable[oldbucket];
165         old_hashtable[oldbucket] = it;
166     } else {
167         it->h_next = primary_hashtable[hv & hashmask(hashpower)];
168         primary_hashtable[hv & hashmask(hashpower)] = it;
169     }
170 
171     pthread_mutex_lock(&hash_items_counter_lock);
172     hash_items++;
173     if (! expanding && hash_items > (hashsize(hashpower) * 3) / 2) {
174         assoc_start_expand();
175     }
176     pthread_mutex_unlock(&hash_items_counter_lock);
177 
178     MEMCACHED_ASSOC_INSERT(ITEM_key(it), it->nkey, hash_items);
179     return 1;
180 }
181 
assoc_delete(const char * key,const size_t nkey,const uint32_t hv)182 void assoc_delete(const char *key, const size_t nkey, const uint32_t hv) {
183     item **before = _hashitem_before(key, nkey, hv);
184 
185     if (*before) {
186         item *nxt;
187         pthread_mutex_lock(&hash_items_counter_lock);
188         hash_items--;
189         pthread_mutex_unlock(&hash_items_counter_lock);
190         /* The DTrace probe cannot be triggered as the last instruction
191          * due to possible tail-optimization by the compiler
192          */
193         MEMCACHED_ASSOC_DELETE(key, nkey, hash_items);
194         nxt = (*before)->h_next;
195         (*before)->h_next = 0;   /* probably pointless, but whatever. */
196         *before = nxt;
197         return;
198     }
199     /* Note:  we never actually get here.  the callers don't delete things
200        they can't find. */
201     assert(*before != 0);
202 }
203 
204 
205 static volatile int do_run_maintenance_thread = 1;
206 
207 #define DEFAULT_HASH_BULK_MOVE 1
208 int hash_bulk_move = DEFAULT_HASH_BULK_MOVE;
209 
assoc_maintenance_thread(void * arg)210 static void *assoc_maintenance_thread(void *arg) {
211 
212     mutex_lock(&maintenance_lock);
213     while (do_run_maintenance_thread) {
214         int ii = 0;
215 
216         /* There is only one expansion thread, so no need to global lock. */
217         for (ii = 0; ii < hash_bulk_move && expanding; ++ii) {
218             item *it, *next;
219             int bucket;
220             void *item_lock = NULL;
221 
222             /* bucket = hv & hashmask(hashpower) =>the bucket of hash table
223              * is the lowest N bits of the hv, and the bucket of item_locks is
224              *  also the lowest M bits of hv, and N is greater than M.
225              *  So we can process expanding with only one item_lock. cool! */
226             if ((item_lock = item_trylock(expand_bucket))) {
227                     for (it = old_hashtable[expand_bucket]; NULL != it; it = next) {
228                         next = it->h_next;
229                         bucket = hash(ITEM_key(it), it->nkey) & hashmask(hashpower);
230                         it->h_next = primary_hashtable[bucket];
231                         primary_hashtable[bucket] = it;
232                     }
233 
234                     old_hashtable[expand_bucket] = NULL;
235 
236                     expand_bucket++;
237                     if (expand_bucket == hashsize(hashpower - 1)) {
238                         expanding = false;
239                         free(old_hashtable);
240                         STATS_LOCK();
241                         stats_state.hash_bytes -= hashsize(hashpower - 1) * sizeof(void *);
242                         stats_state.hash_is_expanding = false;
243                         STATS_UNLOCK();
244                         if (settings.verbose > 1)
245                             fprintf(stderr, "Hash table expansion done\n");
246                     }
247 
248             } else {
249                 usleep(10*1000);
250             }
251 
252             if (item_lock) {
253                 item_trylock_unlock(item_lock);
254                 item_lock = NULL;
255             }
256         }
257 
258         if (!expanding) {
259             /* We are done expanding.. just wait for next invocation */
260             started_expanding = false;
261             pthread_cond_wait(&maintenance_cond, &maintenance_lock);
262             /* assoc_expand() swaps out the hash table entirely, so we need
263              * all threads to not hold any references related to the hash
264              * table while this happens.
265              * This is instead of a more complex, possibly slower algorithm to
266              * allow dynamic hash table expansion without causing significant
267              * wait times.
268              */
269             pause_threads(PAUSE_ALL_THREADS);
270             assoc_expand();
271             pause_threads(RESUME_ALL_THREADS);
272         }
273     }
274     return NULL;
275 }
276 
277 static pthread_t maintenance_tid;
278 
start_assoc_maintenance_thread()279 int start_assoc_maintenance_thread() {
280     int ret;
281     char *env = getenv("MEMCACHED_HASH_BULK_MOVE");
282     if (env != NULL) {
283         hash_bulk_move = atoi(env);
284         if (hash_bulk_move == 0) {
285             hash_bulk_move = DEFAULT_HASH_BULK_MOVE;
286         }
287     }
288     pthread_mutex_init(&maintenance_lock, NULL);
289     if ((ret = pthread_create(&maintenance_tid, NULL,
290                               assoc_maintenance_thread, NULL)) != 0) {
291         fprintf(stderr, "Can't create thread: %s\n", strerror(ret));
292         return -1;
293     }
294     return 0;
295 }
296 
stop_assoc_maintenance_thread()297 void stop_assoc_maintenance_thread() {
298     mutex_lock(&maintenance_lock);
299     do_run_maintenance_thread = 0;
300     pthread_cond_signal(&maintenance_cond);
301     mutex_unlock(&maintenance_lock);
302 
303     /* Wait for the maintenance thread to stop */
304     pthread_join(maintenance_tid, NULL);
305 }
306 
307