1 /* -*- Mode: C; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2 /*
3 * Hash table
4 *
5 * The hash function used here is by Bob Jenkins, 1996:
6 * <http://burtleburtle.net/bob/hash/doobs.html>
7 * "By Bob Jenkins, 1996. [email protected].
8 * You may use this code any way you wish, private, educational,
9 * or commercial. It's free."
10 *
11 * The rest of the file is licensed under the BSD license. See LICENSE.
12 */
13
14 #include "memcached.h"
15 #include <sys/stat.h>
16 #include <sys/socket.h>
17 #include <sys/resource.h>
18 #include <signal.h>
19 #include <fcntl.h>
20 #include <netinet/in.h>
21 #include <errno.h>
22 #include <stdlib.h>
23 #include <stdio.h>
24 #include <string.h>
25 #include <assert.h>
26 #include <pthread.h>
27
/* Condition/lock pair used to wake the background expansion thread; also
 * serializes iterator access against expansion (see assoc_get_iterator). */
static pthread_cond_t maintenance_cond = PTHREAD_COND_INITIALIZER;
static pthread_mutex_t maintenance_lock = PTHREAD_MUTEX_INITIALIZER;

/* how many powers of 2's worth of buckets we use */
unsigned int hashpower = HASHPOWER_DEFAULT;

/* Table size / index mask for a given power of two. */
#define hashsize(n) ((uint64_t)1<<(n))
#define hashmask(n) (hashsize(n)-1)

/* Main hash table. This is where we look except during expansion. */
static item** primary_hashtable = 0;

/*
 * Previous hash table. During expansion, we look here for keys that haven't
 * been moved over to the primary yet.
 */
static item** old_hashtable = 0;

/* Flag: Are we in the middle of expanding now? */
static bool expanding = false;

/*
 * During expansion we migrate values with bucket granularity; this is how
 * far we've gotten so far. Ranges from 0 .. hashsize(hashpower - 1) - 1.
 */
static uint64_t expand_bucket = 0;
54
assoc_init(const int hashtable_init)55 void assoc_init(const int hashtable_init) {
56 if (hashtable_init) {
57 hashpower = hashtable_init;
58 }
59 primary_hashtable = calloc(hashsize(hashpower), sizeof(void *));
60 if (! primary_hashtable) {
61 fprintf(stderr, "Failed to init hashtable.\n");
62 exit(EXIT_FAILURE);
63 }
64 STATS_LOCK();
65 stats_state.hash_power_level = hashpower;
66 stats_state.hash_bytes = hashsize(hashpower) * sizeof(void *);
67 STATS_UNLOCK();
68 }
69
assoc_find(const char * key,const size_t nkey,const uint32_t hv)70 item *assoc_find(const char *key, const size_t nkey, const uint32_t hv) {
71 item *it;
72 uint64_t oldbucket;
73
74 if (expanding &&
75 (oldbucket = (hv & hashmask(hashpower - 1))) >= expand_bucket)
76 {
77 it = old_hashtable[oldbucket];
78 } else {
79 it = primary_hashtable[hv & hashmask(hashpower)];
80 }
81
82 item *ret = NULL;
83 #ifdef ENABLE_DTRACE
84 int depth = 0;
85 #endif
86 while (it) {
87 if ((nkey == it->nkey) && (memcmp(key, ITEM_key(it), nkey) == 0)) {
88 ret = it;
89 break;
90 }
91 it = it->h_next;
92 #ifdef ENABLE_DTRACE
93 ++depth;
94 #endif
95 }
96 MEMCACHED_ASSOC_FIND(key, nkey, depth);
97 return ret;
98 }
99
100 /* returns the address of the item pointer before the key. if *item == 0,
101 the item wasn't found */
102
_hashitem_before(const char * key,const size_t nkey,const uint32_t hv)103 static item** _hashitem_before (const char *key, const size_t nkey, const uint32_t hv) {
104 item **pos;
105 uint64_t oldbucket;
106
107 if (expanding &&
108 (oldbucket = (hv & hashmask(hashpower - 1))) >= expand_bucket)
109 {
110 pos = &old_hashtable[oldbucket];
111 } else {
112 pos = &primary_hashtable[hv & hashmask(hashpower)];
113 }
114
115 while (*pos && ((nkey != (*pos)->nkey) || memcmp(key, ITEM_key(*pos), nkey))) {
116 pos = &(*pos)->h_next;
117 }
118 return pos;
119 }
120
121 /* grows the hashtable to the next power of 2. */
assoc_expand(void)122 static void assoc_expand(void) {
123 old_hashtable = primary_hashtable;
124
125 primary_hashtable = calloc(hashsize(hashpower + 1), sizeof(void *));
126 if (primary_hashtable) {
127 if (settings.verbose > 1)
128 fprintf(stderr, "Hash table expansion starting\n");
129 hashpower++;
130 expanding = true;
131 expand_bucket = 0;
132 STATS_LOCK();
133 stats_state.hash_power_level = hashpower;
134 stats_state.hash_bytes += hashsize(hashpower) * sizeof(void *);
135 stats_state.hash_is_expanding = true;
136 STATS_UNLOCK();
137 } else {
138 primary_hashtable = old_hashtable;
139 /* Bad news, but we can keep running. */
140 }
141 }
142
assoc_start_expand(uint64_t curr_items)143 void assoc_start_expand(uint64_t curr_items) {
144 if (pthread_mutex_trylock(&maintenance_lock) == 0) {
145 if (curr_items > (hashsize(hashpower) * 3) / 2 && hashpower < HASHPOWER_MAX) {
146 pthread_cond_signal(&maintenance_cond);
147 }
148 pthread_mutex_unlock(&maintenance_lock);
149 }
150 }
151
152 /* Note: this isn't an assoc_update. The key must not already exist to call this */
/*
 * Inserts an item at the head of its bucket chain.  The caller must hold
 * the item lock for hv and guarantee the key is not already present
 * (this is not an update).  During expansion, buckets not yet migrated are
 * inserted into the old table so the maintenance thread will move them.
 * Always returns 1.
 */
int assoc_insert(item *it, const uint32_t hv) {
    uint64_t oldbucket;

    // assert(assoc_find(ITEM_key(it), it->nkey) == 0);  /* shouldn't have duplicately named things defined */

    if (expanding &&
        (oldbucket = (hv & hashmask(hashpower - 1))) >= expand_bucket)
    {
        it->h_next = old_hashtable[oldbucket];
        old_hashtable[oldbucket] = it;
    } else {
        /* Compute the bucket index once instead of masking twice. */
        uint64_t bucket = hv & hashmask(hashpower);
        it->h_next = primary_hashtable[bucket];
        primary_hashtable[bucket] = it;
    }

    MEMCACHED_ASSOC_INSERT(ITEM_key(it), it->nkey);
    return 1;
}
171
assoc_delete(const char * key,const size_t nkey,const uint32_t hv)172 void assoc_delete(const char *key, const size_t nkey, const uint32_t hv) {
173 item **before = _hashitem_before(key, nkey, hv);
174
175 if (*before) {
176 item *nxt;
177 /* The DTrace probe cannot be triggered as the last instruction
178 * due to possible tail-optimization by the compiler
179 */
180 MEMCACHED_ASSOC_DELETE(key, nkey);
181 nxt = (*before)->h_next;
182 (*before)->h_next = 0; /* probably pointless, but whatever. */
183 *before = nxt;
184 return;
185 }
186 /* Note: we never actually get here. the callers don't delete things
187 they can't find. */
188 assert(*before != 0);
189 }
190
191
/* Cleared (under maintenance_lock) by stop_assoc_maintenance_thread() to
 * ask the maintenance thread to exit its loop. */
static volatile int do_run_maintenance_thread = 1;

/* Buckets migrated per pass of the expansion loop; overridable at startup
 * via the MEMCACHED_HASH_BULK_MOVE environment variable. */
#define DEFAULT_HASH_BULK_MOVE 1
int hash_bulk_move = DEFAULT_HASH_BULK_MOVE;
196
assoc_maintenance_thread(void * arg)197 static void *assoc_maintenance_thread(void *arg) {
198
199 mutex_lock(&maintenance_lock);
200 while (do_run_maintenance_thread) {
201 int ii = 0;
202
203 /* There is only one expansion thread, so no need to global lock. */
204 for (ii = 0; ii < hash_bulk_move && expanding; ++ii) {
205 item *it, *next;
206 uint64_t bucket;
207 void *item_lock = NULL;
208
209 /* bucket = hv & hashmask(hashpower) =>the bucket of hash table
210 * is the lowest N bits of the hv, and the bucket of item_locks is
211 * also the lowest M bits of hv, and N is greater than M.
212 * So we can process expanding with only one item_lock. cool! */
213 if ((item_lock = item_trylock(expand_bucket))) {
214 for (it = old_hashtable[expand_bucket]; NULL != it; it = next) {
215 next = it->h_next;
216 bucket = hash(ITEM_key(it), it->nkey) & hashmask(hashpower);
217 it->h_next = primary_hashtable[bucket];
218 primary_hashtable[bucket] = it;
219 }
220
221 old_hashtable[expand_bucket] = NULL;
222
223 expand_bucket++;
224 if (expand_bucket == hashsize(hashpower - 1)) {
225 expanding = false;
226 free(old_hashtable);
227 STATS_LOCK();
228 stats_state.hash_bytes -= hashsize(hashpower - 1) * sizeof(void *);
229 stats_state.hash_is_expanding = false;
230 STATS_UNLOCK();
231 if (settings.verbose > 1)
232 fprintf(stderr, "Hash table expansion done\n");
233 }
234
235 } else {
236 usleep(10*1000 - 1);
237 }
238
239 if (item_lock) {
240 item_trylock_unlock(item_lock);
241 item_lock = NULL;
242 }
243 }
244
245 if (!expanding) {
246 /* We are done expanding.. just wait for next invocation */
247 pthread_cond_wait(&maintenance_cond, &maintenance_lock);
248 /* assoc_expand() swaps out the hash table entirely, so we need
249 * all threads to not hold any references related to the hash
250 * table while this happens.
251 * This is instead of a more complex, possibly slower algorithm to
252 * allow dynamic hash table expansion without causing significant
253 * wait times.
254 */
255 if (do_run_maintenance_thread) {
256 pause_threads(PAUSE_ALL_THREADS);
257 assoc_expand();
258 pause_threads(RESUME_ALL_THREADS);
259 }
260 }
261 }
262 mutex_unlock(&maintenance_lock);
263 return NULL;
264 }
265
266 static pthread_t maintenance_tid;
267
start_assoc_maintenance_thread(void)268 int start_assoc_maintenance_thread(void) {
269 int ret;
270 char *env = getenv("MEMCACHED_HASH_BULK_MOVE");
271 if (env != NULL) {
272 hash_bulk_move = atoi(env);
273 if (hash_bulk_move == 0) {
274 hash_bulk_move = DEFAULT_HASH_BULK_MOVE;
275 }
276 }
277
278 if ((ret = pthread_create(&maintenance_tid, NULL,
279 assoc_maintenance_thread, NULL)) != 0) {
280 fprintf(stderr, "Can't create thread: %s\n", strerror(ret));
281 return -1;
282 }
283 thread_setname(maintenance_tid, "mc-assocmaint");
284 return 0;
285 }
286
stop_assoc_maintenance_thread(void)287 void stop_assoc_maintenance_thread(void) {
288 mutex_lock(&maintenance_lock);
289 do_run_maintenance_thread = 0;
290 pthread_cond_signal(&maintenance_cond);
291 mutex_unlock(&maintenance_lock);
292
293 /* Wait for the maintenance thread to stop */
294 pthread_join(maintenance_tid, NULL);
295 }
296
/* Cursor for walking every item in the primary hash table; see
 * assoc_get_iterator() / assoc_iterate() / assoc_iterate_final(). */
struct assoc_iterator {
    uint64_t bucket;      /* index of the bucket currently being walked */
    item *it;             /* item most recently handed to the caller */
    item *next;           /* next item in the current bucket's chain */
    bool bucket_locked;   /* true while we hold the item lock for 'bucket' */
};
303
assoc_get_iterator(void)304 void *assoc_get_iterator(void) {
305 struct assoc_iterator *iter = calloc(1, sizeof(struct assoc_iterator));
306 if (iter == NULL) {
307 return NULL;
308 }
309 // this will hang the caller while a hash table expansion is running.
310 if (mutex_trylock(&maintenance_lock) == 0) {
311 return iter;
312 } else {
313 return NULL;
314 }
315 }
316
/*
 * Advances the iterator one step.  Returns true while iteration can
 * continue and false once every bucket has been visited.  *it receives the
 * next item, or NULL on steps that only advance to the following bucket
 * (callers must tolerate NULL with a true return).  Holds the item lock for
 * the bucket currently being walked; only the primary table is examined
 * because assoc_get_iterator() blocks expansion.
 */
bool assoc_iterate(void *iterp, item **it) {
    struct assoc_iterator *iter = (struct assoc_iterator *) iterp;
    *it = NULL;

    /* Mid-bucket: hand out the next chained item, or close this bucket. */
    if (iter->bucket_locked) {
        if (iter->next == NULL) {
            /* Chain exhausted: release the bucket and move to the next one.
             * *it stays NULL for this step. */
            item_unlock(iter->bucket);
            iter->bucket++;
            iter->bucket_locked = false;
            return true;
        }
        iter->it = iter->next;
        iter->next = iter->it->h_next;
        *it = iter->it;
        return true;
    }

    /* Past the last bucket: iteration is complete. */
    if (iter->bucket == hashsize(hashpower)) {
        return false;
    }

    /* Open the next bucket under its item lock. */
    item_lock(iter->bucket);
    iter->bucket_locked = true;
    iter->it = primary_hashtable[iter->bucket];
    if (iter->it == NULL) {
        /* Empty bucket: release it and try the next on a later step. */
        item_unlock(iter->bucket);
        iter->bucket_locked = false;
        iter->bucket++;
    } else {
        iter->next = iter->it->h_next;
        *it = iter->it;
    }
    return true;
}
360
assoc_iterate_final(void * iterp)361 void assoc_iterate_final(void *iterp) {
362 struct assoc_iterator *iter = (struct assoc_iterator *) iterp;
363 if (iter->bucket_locked) {
364 item_unlock(iter->bucket);
365 }
366 mutex_unlock(&maintenance_lock);
367 free(iter);
368 }
369