1 /* -*- Mode: C; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2 /*
3  * Hash table
4  *
5  * The hash function used here is by Bob Jenkins, 1996:
6  *    <http://burtleburtle.net/bob/hash/doobs.html>
7  *       "By Bob Jenkins, 1996.  [email protected].
8  *       You may use this code any way you wish, private, educational,
9  *       or commercial.  It's free."
10  *
11  * The rest of the file is licensed under the BSD license.  See LICENSE.
12  */
13 
14 #include "memcached.h"
15 #include <sys/stat.h>
16 #include <sys/socket.h>
17 #include <sys/resource.h>
18 #include <signal.h>
19 #include <fcntl.h>
20 #include <netinet/in.h>
21 #include <errno.h>
22 #include <stdlib.h>
23 #include <stdio.h>
24 #include <string.h>
25 #include <assert.h>
26 #include <pthread.h>
27 
28 static pthread_cond_t maintenance_cond = PTHREAD_COND_INITIALIZER;
29 static pthread_mutex_t maintenance_lock = PTHREAD_MUTEX_INITIALIZER;
30 
31 /* how many powers of 2's worth of buckets we use */
32 unsigned int hashpower = HASHPOWER_DEFAULT;
33 
34 #define hashsize(n) ((uint64_t)1<<(n))
35 #define hashmask(n) (hashsize(n)-1)
36 
37 /* Main hash table. This is where we look except during expansion. */
38 static item** primary_hashtable = 0;
39 
40 /*
41  * Previous hash table. During expansion, we look here for keys that haven't
42  * been moved over to the primary yet.
43  */
44 static item** old_hashtable = 0;
45 
46 /* Flag: Are we in the middle of expanding now? */
47 static bool expanding = false;
48 
49 /*
50  * During expansion we migrate values with bucket granularity; this is how
51  * far we've gotten so far. Ranges from 0 .. hashsize(hashpower - 1) - 1.
52  */
53 static uint64_t expand_bucket = 0;
54 
assoc_init(const int hashtable_init)55 void assoc_init(const int hashtable_init) {
56     if (hashtable_init) {
57         hashpower = hashtable_init;
58     }
59     primary_hashtable = calloc(hashsize(hashpower), sizeof(void *));
60     if (! primary_hashtable) {
61         fprintf(stderr, "Failed to init hashtable.\n");
62         exit(EXIT_FAILURE);
63     }
64     STATS_LOCK();
65     stats_state.hash_power_level = hashpower;
66     stats_state.hash_bytes = hashsize(hashpower) * sizeof(void *);
67     STATS_UNLOCK();
68 }
69 
assoc_find(const char * key,const size_t nkey,const uint32_t hv)70 item *assoc_find(const char *key, const size_t nkey, const uint32_t hv) {
71     item *it;
72     uint64_t oldbucket;
73 
74     if (expanding &&
75         (oldbucket = (hv & hashmask(hashpower - 1))) >= expand_bucket)
76     {
77         it = old_hashtable[oldbucket];
78     } else {
79         it = primary_hashtable[hv & hashmask(hashpower)];
80     }
81 
82     item *ret = NULL;
83 #ifdef ENABLE_DTRACE
84     int depth = 0;
85 #endif
86     while (it) {
87         if ((nkey == it->nkey) && (memcmp(key, ITEM_key(it), nkey) == 0)) {
88             ret = it;
89             break;
90         }
91         it = it->h_next;
92 #ifdef ENABLE_DTRACE
93         ++depth;
94 #endif
95     }
96     MEMCACHED_ASSOC_FIND(key, nkey, depth);
97     return ret;
98 }
99 
100 /* returns the address of the item pointer before the key.  if *item == 0,
101    the item wasn't found */
102 
_hashitem_before(const char * key,const size_t nkey,const uint32_t hv)103 static item** _hashitem_before (const char *key, const size_t nkey, const uint32_t hv) {
104     item **pos;
105     uint64_t oldbucket;
106 
107     if (expanding &&
108         (oldbucket = (hv & hashmask(hashpower - 1))) >= expand_bucket)
109     {
110         pos = &old_hashtable[oldbucket];
111     } else {
112         pos = &primary_hashtable[hv & hashmask(hashpower)];
113     }
114 
115     while (*pos && ((nkey != (*pos)->nkey) || memcmp(key, ITEM_key(*pos), nkey))) {
116         pos = &(*pos)->h_next;
117     }
118     return pos;
119 }
120 
121 /* grows the hashtable to the next power of 2. */
assoc_expand(void)122 static void assoc_expand(void) {
123     old_hashtable = primary_hashtable;
124 
125     primary_hashtable = calloc(hashsize(hashpower + 1), sizeof(void *));
126     if (primary_hashtable) {
127         if (settings.verbose > 1)
128             fprintf(stderr, "Hash table expansion starting\n");
129         hashpower++;
130         expanding = true;
131         expand_bucket = 0;
132         STATS_LOCK();
133         stats_state.hash_power_level = hashpower;
134         stats_state.hash_bytes += hashsize(hashpower) * sizeof(void *);
135         stats_state.hash_is_expanding = true;
136         STATS_UNLOCK();
137     } else {
138         primary_hashtable = old_hashtable;
139         /* Bad news, but we can keep running. */
140     }
141 }
142 
assoc_start_expand(uint64_t curr_items)143 void assoc_start_expand(uint64_t curr_items) {
144     if (pthread_mutex_trylock(&maintenance_lock) == 0) {
145         if (curr_items > (hashsize(hashpower) * 3) / 2 && hashpower < HASHPOWER_MAX) {
146             pthread_cond_signal(&maintenance_cond);
147         }
148         pthread_mutex_unlock(&maintenance_lock);
149     }
150 }
151 
152 /* Note: this isn't an assoc_update.  The key must not already exist to call this */
assoc_insert(item * it,const uint32_t hv)153 int assoc_insert(item *it, const uint32_t hv) {
154     uint64_t oldbucket;
155 
156 //    assert(assoc_find(ITEM_key(it), it->nkey) == 0);  /* shouldn't have duplicately named things defined */
157 
158     if (expanding &&
159         (oldbucket = (hv & hashmask(hashpower - 1))) >= expand_bucket)
160     {
161         it->h_next = old_hashtable[oldbucket];
162         old_hashtable[oldbucket] = it;
163     } else {
164         it->h_next = primary_hashtable[hv & hashmask(hashpower)];
165         primary_hashtable[hv & hashmask(hashpower)] = it;
166     }
167 
168     MEMCACHED_ASSOC_INSERT(ITEM_key(it), it->nkey);
169     return 1;
170 }
171 
assoc_delete(const char * key,const size_t nkey,const uint32_t hv)172 void assoc_delete(const char *key, const size_t nkey, const uint32_t hv) {
173     item **before = _hashitem_before(key, nkey, hv);
174 
175     if (*before) {
176         item *nxt;
177         /* The DTrace probe cannot be triggered as the last instruction
178          * due to possible tail-optimization by the compiler
179          */
180         MEMCACHED_ASSOC_DELETE(key, nkey);
181         nxt = (*before)->h_next;
182         (*before)->h_next = 0;   /* probably pointless, but whatever. */
183         *before = nxt;
184         return;
185     }
186     /* Note:  we never actually get here.  the callers don't delete things
187        they can't find. */
188     assert(*before != 0);
189 }
190 
191 
192 static volatile int do_run_maintenance_thread = 1;
193 
194 #define DEFAULT_HASH_BULK_MOVE 1
195 int hash_bulk_move = DEFAULT_HASH_BULK_MOVE;
196 
assoc_maintenance_thread(void * arg)197 static void *assoc_maintenance_thread(void *arg) {
198 
199     mutex_lock(&maintenance_lock);
200     while (do_run_maintenance_thread) {
201         int ii = 0;
202 
203         /* There is only one expansion thread, so no need to global lock. */
204         for (ii = 0; ii < hash_bulk_move && expanding; ++ii) {
205             item *it, *next;
206             uint64_t bucket;
207             void *item_lock = NULL;
208 
209             /* bucket = hv & hashmask(hashpower) =>the bucket of hash table
210              * is the lowest N bits of the hv, and the bucket of item_locks is
211              *  also the lowest M bits of hv, and N is greater than M.
212              *  So we can process expanding with only one item_lock. cool! */
213             if ((item_lock = item_trylock(expand_bucket))) {
214                     for (it = old_hashtable[expand_bucket]; NULL != it; it = next) {
215                         next = it->h_next;
216                         bucket = hash(ITEM_key(it), it->nkey) & hashmask(hashpower);
217                         it->h_next = primary_hashtable[bucket];
218                         primary_hashtable[bucket] = it;
219                     }
220 
221                     old_hashtable[expand_bucket] = NULL;
222 
223                     expand_bucket++;
224                     if (expand_bucket == hashsize(hashpower - 1)) {
225                         expanding = false;
226                         free(old_hashtable);
227                         STATS_LOCK();
228                         stats_state.hash_bytes -= hashsize(hashpower - 1) * sizeof(void *);
229                         stats_state.hash_is_expanding = false;
230                         STATS_UNLOCK();
231                         if (settings.verbose > 1)
232                             fprintf(stderr, "Hash table expansion done\n");
233                     }
234 
235             } else {
236                 usleep(10*1000 - 1);
237             }
238 
239             if (item_lock) {
240                 item_trylock_unlock(item_lock);
241                 item_lock = NULL;
242             }
243         }
244 
245         if (!expanding) {
246             /* We are done expanding.. just wait for next invocation */
247             pthread_cond_wait(&maintenance_cond, &maintenance_lock);
248             /* assoc_expand() swaps out the hash table entirely, so we need
249              * all threads to not hold any references related to the hash
250              * table while this happens.
251              * This is instead of a more complex, possibly slower algorithm to
252              * allow dynamic hash table expansion without causing significant
253              * wait times.
254              */
255             if (do_run_maintenance_thread) {
256                 pause_threads(PAUSE_ALL_THREADS);
257                 assoc_expand();
258                 pause_threads(RESUME_ALL_THREADS);
259             }
260         }
261     }
262     mutex_unlock(&maintenance_lock);
263     return NULL;
264 }
265 
266 static pthread_t maintenance_tid;
267 
start_assoc_maintenance_thread(void)268 int start_assoc_maintenance_thread(void) {
269     int ret;
270     char *env = getenv("MEMCACHED_HASH_BULK_MOVE");
271     if (env != NULL) {
272         hash_bulk_move = atoi(env);
273         if (hash_bulk_move == 0) {
274             hash_bulk_move = DEFAULT_HASH_BULK_MOVE;
275         }
276     }
277 
278     if ((ret = pthread_create(&maintenance_tid, NULL,
279                               assoc_maintenance_thread, NULL)) != 0) {
280         fprintf(stderr, "Can't create thread: %s\n", strerror(ret));
281         return -1;
282     }
283     thread_setname(maintenance_tid, "mc-assocmaint");
284     return 0;
285 }
286 
stop_assoc_maintenance_thread(void)287 void stop_assoc_maintenance_thread(void) {
288     mutex_lock(&maintenance_lock);
289     do_run_maintenance_thread = 0;
290     pthread_cond_signal(&maintenance_cond);
291     mutex_unlock(&maintenance_lock);
292 
293     /* Wait for the maintenance thread to stop */
294     pthread_join(maintenance_tid, NULL);
295 }
296 
297 struct assoc_iterator {
298     uint64_t bucket;
299     item *it;
300     item *next;
301     bool bucket_locked;
302 };
303 
assoc_get_iterator(void)304 void *assoc_get_iterator(void) {
305     struct assoc_iterator *iter = calloc(1, sizeof(struct assoc_iterator));
306     if (iter == NULL) {
307         return NULL;
308     }
309     // this will hang the caller while a hash table expansion is running.
310     if (mutex_trylock(&maintenance_lock) == 0) {
311         return iter;
312     } else {
313         return NULL;
314     }
315 }
316 
assoc_iterate(void * iterp,item ** it)317 bool assoc_iterate(void *iterp, item **it) {
318     struct assoc_iterator *iter = (struct assoc_iterator *) iterp;
319     *it = NULL;
320     // - if locked bucket and next, update next and return
321     if (iter->bucket_locked) {
322         if (iter->next != NULL) {
323             iter->it = iter->next;
324             iter->next = iter->it->h_next;
325             *it = iter->it;
326         } else {
327             // unlock previous bucket, if any
328             item_unlock(iter->bucket);
329             // iterate the bucket post since it starts at 0.
330             iter->bucket++;
331             iter->bucket_locked = false;
332             *it = NULL;
333         }
334         return true;
335     }
336 
337     // - loop until we hit the end or find something.
338     if (iter->bucket != hashsize(hashpower)) {
339         // - lock next bucket
340         item_lock(iter->bucket);
341         iter->bucket_locked = true;
342         // - only check the primary hash table since expand is blocked.
343         iter->it = primary_hashtable[iter->bucket];
344         if (iter->it != NULL) {
345             // - set it, next and return
346             iter->next = iter->it->h_next;
347             *it = iter->it;
348         } else {
349             // - nothing found in this bucket, try next.
350             item_unlock(iter->bucket);
351             iter->bucket_locked = false;
352             iter->bucket++;
353         }
354     } else {
355         return false;
356     }
357 
358     return true;
359 }
360 
assoc_iterate_final(void * iterp)361 void assoc_iterate_final(void *iterp) {
362     struct assoc_iterator *iter = (struct assoc_iterator *) iterp;
363     if (iter->bucket_locked) {
364         item_unlock(iter->bucket);
365     }
366     mutex_unlock(&maintenance_lock);
367     free(iter);
368 }
369