xref: /memcached-1.4.29/items.c (revision 690a9a9d)
1 /* -*- Mode: C; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2 #include "memcached.h"
3 #include <sys/stat.h>
4 #include <sys/socket.h>
5 #include <sys/resource.h>
6 #include <fcntl.h>
7 #include <netinet/in.h>
8 #include <errno.h>
9 #include <stdlib.h>
10 #include <stdio.h>
11 #include <signal.h>
12 #include <string.h>
13 #include <time.h>
14 #include <assert.h>
15 #include <unistd.h>
16 
17 /* Forward Declarations */
18 static void item_link_q(item *it);
19 static void item_unlink_q(item *it);
20 
21 #define HOT_LRU 0
22 #define WARM_LRU 64
23 #define COLD_LRU 128
24 #define NOEXP_LRU 192
25 static unsigned int lru_type_map[4] = {HOT_LRU, WARM_LRU, COLD_LRU, NOEXP_LRU};
26 
27 #define CLEAR_LRU(id) (id & ~(3<<6))
28 
29 #define LARGEST_ID POWER_LARGEST
30 typedef struct {
31     uint64_t evicted;
32     uint64_t evicted_nonzero;
33     uint64_t reclaimed;
34     uint64_t outofmemory;
35     uint64_t tailrepairs;
36     uint64_t expired_unfetched; /* items reclaimed but never touched */
37     uint64_t evicted_unfetched; /* items evicted but never touched */
38     uint64_t crawler_reclaimed;
39     uint64_t crawler_items_checked;
40     uint64_t lrutail_reflocked;
41     uint64_t moves_to_cold;
42     uint64_t moves_to_warm;
43     uint64_t moves_within_lru;
44     uint64_t direct_reclaims;
45     rel_time_t evicted_time;
46 } itemstats_t;
47 
48 typedef struct {
49     uint64_t histo[61];
50     uint64_t ttl_hourplus;
51     uint64_t noexp;
52     uint64_t reclaimed;
53     uint64_t seen;
54     rel_time_t start_time;
55     rel_time_t end_time;
56     bool run_complete;
57 } crawlerstats_t;
58 
59 static item *heads[LARGEST_ID];
60 static item *tails[LARGEST_ID];
61 static crawler crawlers[LARGEST_ID];
62 static itemstats_t itemstats[LARGEST_ID];
63 static unsigned int sizes[LARGEST_ID];
64 static uint64_t sizes_bytes[LARGEST_ID];
65 static unsigned int *stats_sizes_hist = NULL;
66 static uint64_t stats_sizes_cas_min = 0;
67 static int stats_sizes_buckets = 0;
68 static crawlerstats_t crawlerstats[MAX_NUMBER_OF_SLAB_CLASSES];
69 
70 static int crawler_count = 0;
71 static volatile int do_run_lru_crawler_thread = 0;
72 static volatile int do_run_lru_maintainer_thread = 0;
73 static int lru_crawler_initialized = 0;
74 static int lru_maintainer_initialized = 0;
75 static int lru_maintainer_check_clsid = 0;
76 static pthread_mutex_t lru_crawler_lock = PTHREAD_MUTEX_INITIALIZER;
77 static pthread_cond_t  lru_crawler_cond = PTHREAD_COND_INITIALIZER;
78 static pthread_mutex_t lru_crawler_stats_lock = PTHREAD_MUTEX_INITIALIZER;
79 static pthread_mutex_t lru_maintainer_lock = PTHREAD_MUTEX_INITIALIZER;
80 static pthread_mutex_t cas_id_lock = PTHREAD_MUTEX_INITIALIZER;
81 static pthread_mutex_t stats_sizes_lock = PTHREAD_MUTEX_INITIALIZER;
82 
item_stats_reset(void)83 void item_stats_reset(void) {
84     int i;
85     for (i = 0; i < LARGEST_ID; i++) {
86         pthread_mutex_lock(&lru_locks[i]);
87         memset(&itemstats[i], 0, sizeof(itemstats_t));
88         pthread_mutex_unlock(&lru_locks[i]);
89     }
90 }
91 
92 static int lru_pull_tail(const int orig_id, const int cur_lru,
93         const uint64_t total_bytes, const bool do_evict);
94 static int lru_crawler_start(uint32_t id, uint32_t remaining);
95 
96 /* Get the next CAS id for a new item. */
97 /* TODO: refactor some atomics for this. */
get_cas_id(void)98 uint64_t get_cas_id(void) {
99     static uint64_t cas_id = 0;
100     pthread_mutex_lock(&cas_id_lock);
101     uint64_t next_id = ++cas_id;
102     pthread_mutex_unlock(&cas_id_lock);
103     return next_id;
104 }
105 
/* Decide whether an item predates the most recent flush_all and should
 * therefore be treated as gone.  Returns 1 when flushed, 0 otherwise.
 * An item is flushed when its last-touched time is at or before the
 * flush point, or (with CAS enabled) its CAS id predates the flush CAS. */
int item_is_flushed(item *it) {
    rel_time_t oldest_live = settings.oldest_live;
    /* No flush has happened yet, or it is scheduled in the future. */
    if (oldest_live == 0 || oldest_live > current_time) {
        return 0;
    }
    if (it->time <= oldest_live) {
        return 1;
    }
    uint64_t cas = ITEM_get_cas(it);
    uint64_t oldest_cas = settings.oldest_cas;
    if (oldest_cas != 0 && cas != 0 && cas < oldest_cas) {
        return 1;
    }
    return 0;
}
118 
noexp_lru_size(int slabs_clsid)119 static unsigned int noexp_lru_size(int slabs_clsid) {
120     int id = CLEAR_LRU(slabs_clsid);
121     id |= NOEXP_LRU;
122     unsigned int ret;
123     pthread_mutex_lock(&lru_locks[id]);
124     ret = sizes_bytes[id];
125     pthread_mutex_unlock(&lru_locks[id]);
126     return ret;
127 }
128 
129 /* Enable this for reference-count debugging. */
130 #if 0
131 # define DEBUG_REFCNT(it,op) \
132                 fprintf(stderr, "item %x refcnt(%c) %d %c%c%c\n", \
133                         it, op, it->refcount, \
134                         (it->it_flags & ITEM_LINKED) ? 'L' : ' ', \
135                         (it->it_flags & ITEM_SLABBED) ? 'S' : ' ')
136 #else
137 # define DEBUG_REFCNT(it,op) while(0)
138 #endif
139 
140 /**
141  * Generates the variable-sized part of the header for an object.
142  *
143  * key     - The key
144  * nkey    - The length of the key
145  * flags   - key flags
146  * nbytes  - Number of bytes to hold value and addition CRLF terminator
147  * suffix  - Buffer for the "VALUE" line suffix (flags, size).
148  * nsuffix - The length of the suffix is stored here.
149  *
150  * Returns the total size of the header.
151  */
item_make_header(const uint8_t nkey,const unsigned int flags,const int nbytes,char * suffix,uint8_t * nsuffix)152 static size_t item_make_header(const uint8_t nkey, const unsigned int flags, const int nbytes,
153                      char *suffix, uint8_t *nsuffix) {
154     /* suffix is defined at 40 chars elsewhere.. */
    /* The printed value length excludes the CRLF terminator (nbytes - 2).
     * With 32-bit flags/nbytes the formatted " %u %d\r\n" suffix is well
     * under 40 bytes, so snprintf cannot truncate here and *nsuffix
     * equals the number of bytes actually written. */
155     *nsuffix = (uint8_t) snprintf(suffix, 40, " %u %d\r\n", flags, nbytes - 2);
156     return sizeof(item) + nkey + *nsuffix + nbytes;
157 }
158 
/*
 * Allocate and initialize a fresh item for the given key/flags/exptime/
 * value size.  If the slab class is out of memory, up to ten direct LRU
 * juggles/evictions are attempted before giving up.  On success the item
 * has refcount 1 (seeded by slabs_alloc()) and is NOT yet linked into
 * the hash table or an LRU.  Returns 0 when no slab class can hold
 * ntotal bytes, or NULL (counting outofmemory) when allocation fails.
 */
do_item_alloc(char * key,const size_t nkey,const unsigned int flags,const rel_time_t exptime,const int nbytes)159 item *do_item_alloc(char *key, const size_t nkey, const unsigned int flags,
160                     const rel_time_t exptime, const int nbytes) {
161     int i;
162     uint8_t nsuffix;
163     item *it = NULL;
164     char suffix[40];
165     size_t ntotal = item_make_header(nkey + 1, flags, nbytes, suffix, &nsuffix);
166     if (settings.use_cas) {
167         ntotal += sizeof(uint64_t);
168     }
169 
170     unsigned int id = slabs_clsid(ntotal);
171     if (id == 0)
172         return 0;  /* no slab class can hold an item this large */
173 
174     /* If no memory is available, attempt a direct LRU juggle/eviction */
175     /* This is a race in order to simplify lru_pull_tail; in cases where
176      * locked items are on the tail, you want them to fall out and cause
177      * occasional OOM's, rather than internally work around them.
178      * This also gives one fewer code path for slab alloc/free
179      */
180     /* TODO: if power_largest, try a lot more times? or a number of times
181      * based on how many chunks the new object should take up?
182      * or based on the size of an object lru_pull_tail() says it evicted?
183      * This is a classical GC problem if "large items" are of too varying of
184      * sizes. This is actually okay here since the larger the data, the more
185      * bandwidth it takes, the more time we can loop in comparison to serving
186      * and replacing small items.
187      */
188     for (i = 0; i < 10; i++) {
189         uint64_t total_bytes;
190         /* Try to reclaim memory first */
191         if (!settings.lru_maintainer_thread) {
192             lru_pull_tail(id, COLD_LRU, 0, false);
193         }
194         it = slabs_alloc(ntotal, id, &total_bytes, 0);
195 
        /* Exclude unevictable NOEXP bytes from the evictable total used
         * to size the LRU pulls below. */
196         if (settings.expirezero_does_not_evict)
197             total_bytes -= noexp_lru_size(id);
198 
199         if (it == NULL) {
200             if (settings.lru_maintainer_thread) {
                /* Juggle HOT/WARM first, then evict from COLD; a failed
                 * COLD pull means nothing more can be freed — give up. */
201                 lru_pull_tail(id, HOT_LRU, total_bytes, false);
202                 lru_pull_tail(id, WARM_LRU, total_bytes, false);
203                 if (lru_pull_tail(id, COLD_LRU, total_bytes, true) <= 0)
204                     break;
205             } else {
206                 if (lru_pull_tail(id, COLD_LRU, 0, true) <= 0)
207                     break;
208             }
209         } else {
210             break;
211         }
212     }
213 
    /* Each extra loop iteration corresponds to one direct reclaim pass. */
214     if (i > 0) {
215         pthread_mutex_lock(&lru_locks[id]);
216         itemstats[id].direct_reclaims += i;
217         pthread_mutex_unlock(&lru_locks[id]);
218     }
219 
220     if (it == NULL) {
221         pthread_mutex_lock(&lru_locks[id]);
222         itemstats[id].outofmemory++;
223         pthread_mutex_unlock(&lru_locks[id]);
224         return NULL;
225     }
226 
227     assert(it->slabs_clsid == 0);
228     //assert(it != heads[id]);
229 
230     /* Refcount is seeded to 1 by slabs_alloc() */
231     it->next = it->prev = 0;
232 
233     /* Items are initially loaded into the HOT_LRU. This is '0' but I want at
234      * least a note here. Compiler (hopefully?) optimizes this out.
235      */
236     if (settings.lru_maintainer_thread) {
237         if (exptime == 0 && settings.expirezero_does_not_evict) {
238             id |= NOEXP_LRU;
239         } else {
240             id |= HOT_LRU;
241         }
242     } else {
243         /* There is only COLD in compat-mode */
244         id |= COLD_LRU;
245     }
246     it->slabs_clsid = id;
247 
248     DEBUG_REFCNT(it, '*');
249     it->it_flags |= settings.use_cas ? ITEM_CAS : 0;
250     it->nkey = nkey;
251     it->nbytes = nbytes;
252     memcpy(ITEM_key(it), key, nkey);
253     it->exptime = exptime;
254     memcpy(ITEM_suffix(it), suffix, (size_t)nsuffix);
255     it->nsuffix = nsuffix;
256 
257     /* Need to shuffle the pointer stored in h_next into it->data. */
258     if (it->it_flags & ITEM_CHUNKED) {
259         item_chunk *chunk = (item_chunk *) ITEM_data(it);
260 
261         chunk->next = (item_chunk *) it->h_next;
262         chunk->prev = 0;
263         chunk->head = it;
264         /* Need to chain back into the head's chunk */
265         chunk->next->prev = chunk;
266         chunk->size = chunk->next->size - ((char *)chunk - (char *)it);
267         chunk->used = 0;
268         assert(chunk->size > 0);
269     }
270     it->h_next = 0;
271 
272     return it;
273 }
274 
/* Return an item's memory to its slab class.  The item must already be
 * fully unlinked (not in the hash table or any LRU) with no remaining
 * references. */
void item_free(item *it) {
    size_t ntotal = ITEM_ntotal(it);
    assert((it->it_flags & ITEM_LINKED) == 0);
    assert(it != heads[it->slabs_clsid]);
    assert(it != tails[it->slabs_clsid]);
    assert(it->refcount == 0);

    /* so slab size changer can tell later if item is already free or not */
    unsigned int clsid = ITEM_clsid(it);
    DEBUG_REFCNT(it, 'F');
    slabs_free(it, ntotal, clsid);
}
288 
289 /**
290  * Returns true if an item will fit in the cache (its size does not exceed
291  * the maximum for a cache entry.)
292  */
item_size_ok(const size_t nkey,const int flags,const int nbytes)293 bool item_size_ok(const size_t nkey, const int flags, const int nbytes) {
294     char prefix[40];
295     uint8_t nsuffix;
296 
297     size_t ntotal = item_make_header(nkey + 1, flags, nbytes,
298                                      prefix, &nsuffix);
299     if (settings.use_cas) {
300         ntotal += sizeof(uint64_t);
301     }
302 
303     return slabs_clsid(ntotal) != 0;
304 }
305 
/* Insert "it" as the new head of its LRU list and bump the per-class
 * count and byte counters.  Caller must hold the matching lru_locks
 * entry. */
static void do_item_link_q(item *it) {
    assert((it->it_flags & ITEM_SLABBED) == 0);

    item **head = &heads[it->slabs_clsid];
    item **tail = &tails[it->slabs_clsid];
    assert(it != *head);
    assert((*head && *tail) || (*head == 0 && *tail == 0));

    it->prev = 0;
    it->next = *head;
    if (*head != 0)
        (*head)->prev = it;
    *head = it;
    if (*tail == 0)
        *tail = it;

    sizes[it->slabs_clsid]++;
    sizes_bytes[it->slabs_clsid] += it->nbytes;
}
323 
/* Locking wrapper: push an item onto the head of its LRU list. */
static void item_link_q(item *it) {
    pthread_mutex_t *lock = &lru_locks[it->slabs_clsid];
    pthread_mutex_lock(lock);
    do_item_link_q(it);
    pthread_mutex_unlock(lock);
}
329 
/* Remove "it" from its LRU list and decrement the per-class count and
 * byte counters.  Caller must hold the matching lru_locks entry. */
static void do_item_unlink_q(item *it) {
    item **head = &heads[it->slabs_clsid];
    item **tail = &tails[it->slabs_clsid];

    if (it == *head) {
        assert(it->prev == 0);
        *head = it->next;
    }
    if (it == *tail) {
        assert(it->next == 0);
        *tail = it->prev;
    }
    assert(it->next != it);
    assert(it->prev != it);

    if (it->next != 0)
        it->next->prev = it->prev;
    if (it->prev != 0)
        it->prev->next = it->next;

    sizes[it->slabs_clsid]--;
    sizes_bytes[it->slabs_clsid] -= it->nbytes;
}
352 
/* Locking wrapper: remove an item from its LRU list. */
static void item_unlink_q(item *it) {
    pthread_mutex_t *lock = &lru_locks[it->slabs_clsid];
    pthread_mutex_lock(lock);
    do_item_unlink_q(it);
    pthread_mutex_unlock(lock);
}
358 
/*
 * Link a freshly allocated item into the hash table and its LRU: sets
 * ITEM_LINKED and the access time, bumps the global byte/item counters,
 * assigns a CAS id, and takes the hash/LRU's own reference on the item.
 * Always returns 1.
 */
do_item_link(item * it,const uint32_t hv)359 int do_item_link(item *it, const uint32_t hv) {
360     MEMCACHED_ITEM_LINK(ITEM_key(it), it->nkey, it->nbytes);
361     assert((it->it_flags & (ITEM_LINKED|ITEM_SLABBED)) == 0);
362     it->it_flags |= ITEM_LINKED;
363     it->time = current_time;
364 
365     STATS_LOCK();
366     stats_state.curr_bytes += ITEM_ntotal(it);
367     stats_state.curr_items += 1;
368     stats.total_items += 1;
369     STATS_UNLOCK();
370 
371     /* Allocate a new CAS ID on link. */
372     ITEM_set_cas(it, (settings.use_cas) ? get_cas_id() : 0);
373     assoc_insert(it, hv);
374     item_link_q(it);
375     refcount_incr(&it->refcount);    /* reference held by the hash/LRU */
376     item_stats_sizes_add(it);
377 
378     return 1;
379 }
380 
/* Remove an item from the hash table and its LRU, dropping the global
 * counters and the hash/LRU reference.  A no-op when the item is not
 * linked. */
void do_item_unlink(item *it, const uint32_t hv) {
    MEMCACHED_ITEM_UNLINK(ITEM_key(it), it->nkey, it->nbytes);
    if ((it->it_flags & ITEM_LINKED) == 0)
        return;
    it->it_flags &= ~ITEM_LINKED;
    STATS_LOCK();
    stats_state.curr_bytes -= ITEM_ntotal(it);
    stats_state.curr_items -= 1;
    STATS_UNLOCK();
    item_stats_sizes_remove(it);
    assoc_delete(ITEM_key(it), it->nkey, hv);
    item_unlink_q(it);
    do_item_remove(it);
}
395 
396 /* FIXME: Is it necessary to keep this copy/pasted code? */
/* Same as do_item_unlink() but edits the LRU list without taking the
 * LRU lock — the caller already holds it. */
void do_item_unlink_nolock(item *it, const uint32_t hv) {
    MEMCACHED_ITEM_UNLINK(ITEM_key(it), it->nkey, it->nbytes);
    if ((it->it_flags & ITEM_LINKED) == 0)
        return;
    it->it_flags &= ~ITEM_LINKED;
    STATS_LOCK();
    stats_state.curr_bytes -= ITEM_ntotal(it);
    stats_state.curr_items -= 1;
    STATS_UNLOCK();
    item_stats_sizes_remove(it);
    assoc_delete(ITEM_key(it), it->nkey, hv);
    do_item_unlink_q(it);
    do_item_remove(it);
}
411 
/* Drop one reference to the item; free it when the count reaches zero. */
void do_item_remove(item *it) {
    MEMCACHED_ITEM_REMOVE(ITEM_key(it), it->nkey, it->nbytes);
    assert((it->it_flags & ITEM_SLABBED) == 0);
    assert(it->refcount > 0);

    if (refcount_decr(&it->refcount) != 0)
        return;
    item_free(it);
}
421 
422 /* Copy/paste to avoid adding two extra branches for all common calls, since
423  * _nolock is only used in an uncommon case where we want to relink. */
/* Copy/paste of do_item_update() for the relink path: bump the access
 * time and move the item to the LRU head, but without taking the LRU
 * lock — the caller already holds it.  Rate-limited by
 * ITEM_UPDATE_INTERVAL. */
void do_item_update_nolock(item *it) {
    MEMCACHED_ITEM_UPDATE(ITEM_key(it), it->nkey, it->nbytes);
    if (it->time >= current_time - ITEM_UPDATE_INTERVAL)
        return;
    assert((it->it_flags & ITEM_SLABBED) == 0);

    if ((it->it_flags & ITEM_LINKED) == 0)
        return;
    do_item_unlink_q(it);
    it->time = current_time;
    do_item_link_q(it);
}
436 
437 /* Bump the last accessed time, or relink if we're in compat mode */
/* Bump the last-accessed time, rate-limited by ITEM_UPDATE_INTERVAL.
 * In compat mode (no LRU maintainer thread) also relink the item at the
 * head of its LRU. */
void do_item_update(item *it) {
    MEMCACHED_ITEM_UPDATE(ITEM_key(it), it->nkey, it->nbytes);
    if (it->time >= current_time - ITEM_UPDATE_INTERVAL)
        return;
    assert((it->it_flags & ITEM_SLABBED) == 0);

    if ((it->it_flags & ITEM_LINKED) != 0) {
        it->time = current_time;
        if (!settings.lru_maintainer_thread) {
            item_unlink_q(it);
            item_link_q(it);
        }
    }
}
452 
/* Atomically-from-the-caller's-view swap new_it in for it: unlink the
 * old item, then link the replacement.  Returns the result of the link
 * (always 1). */
int do_item_replace(item *it, item *new_it, const uint32_t hv) {
    MEMCACHED_ITEM_REPLACE(ITEM_key(it), it->nkey, it->nbytes,
                           ITEM_key(new_it), new_it->nkey, new_it->nbytes);
    assert((it->it_flags & ITEM_SLABBED) == 0);

    do_item_unlink(it, hv);
    return do_item_link(new_it, hv);
}
461 
462 /*@null@*/
463 /* This is walking the line of violating lock order, but I think it's safe.
464  * If the LRU lock is held, an item in the LRU cannot be wiped and freed.
465  * The data could possibly be overwritten, but this is only accessing the
466  * headers.
467  * It may not be the best idea to leave it like this, but for now it's safe.
468  * FIXME: only dumps the hot LRU with the new LRU's.
469  */
item_cachedump(const unsigned int slabs_clsid,const unsigned int limit,unsigned int * bytes)470 char *item_cachedump(const unsigned int slabs_clsid, const unsigned int limit, unsigned int *bytes) {
471     unsigned int memlimit = 2 * 1024 * 1024;   /* 2MB max response size */
472     char *buffer;
473     unsigned int bufcurr;
474     item *it;
475     unsigned int len;
476     unsigned int shown = 0;
477     char key_temp[KEY_MAX_LENGTH + 1];
478     char temp[512];
479     unsigned int id = slabs_clsid;
480     if (!settings.lru_maintainer_thread)
481         id |= COLD_LRU;
482 
483     pthread_mutex_lock(&lru_locks[id]);
484     it = heads[id];
485 
486     buffer = malloc((size_t)memlimit);
487     if (buffer == 0) {
488         return NULL;
489     }
490     bufcurr = 0;
491 
492     while (it != NULL && (limit == 0 || shown < limit)) {
493         assert(it->nkey <= KEY_MAX_LENGTH);
494         if (it->nbytes == 0 && it->nkey == 0) {
495             it = it->next;
496             continue;
497         }
498         /* Copy the key since it may not be null-terminated in the struct */
499         strncpy(key_temp, ITEM_key(it), it->nkey);
500         key_temp[it->nkey] = 0x00; /* terminate */
501         len = snprintf(temp, sizeof(temp), "ITEM %s [%d b; %lu s]\r\n",
502                        key_temp, it->nbytes - 2,
503                        it->exptime == 0 ? 0 :
504                        (unsigned long)it->exptime + process_started);
505         if (bufcurr + len + 6 > memlimit)  /* 6 is END\r\n\0 */
506             break;
507         memcpy(buffer + bufcurr, temp, len);
508         bufcurr += len;
509         shown++;
510         it = it->next;
511     }
512 
513     memcpy(buffer + bufcurr, "END\r\n", 6);
514     bufcurr += 5;
515 
516     *bytes = bufcurr;
517     pthread_mutex_unlock(&lru_locks[id]);
518     return buffer;
519 }
520 
/*
 * "stats" rollup: sum the per-LRU itemstats counters across all four
 * LRU sub-lists of every slab class and emit the process-wide totals.
 * Each LRU's lock is held only while its own counters are read.
 */
item_stats_totals(ADD_STAT add_stats,void * c)521 void item_stats_totals(ADD_STAT add_stats, void *c) {
522     itemstats_t totals;
523     memset(&totals, 0, sizeof(itemstats_t));
524     int n;
525     for (n = 0; n < MAX_NUMBER_OF_SLAB_CLASSES; n++) {
526         int x;
527         int i;
528         for (x = 0; x < 4; x++) {
            /* class id | LRU type selects one of the class's four lists */
529             i = n | lru_type_map[x];
530             pthread_mutex_lock(&lru_locks[i]);
531             totals.expired_unfetched += itemstats[i].expired_unfetched;
532             totals.evicted_unfetched += itemstats[i].evicted_unfetched;
533             totals.evicted += itemstats[i].evicted;
534             totals.reclaimed += itemstats[i].reclaimed;
535             totals.crawler_reclaimed += itemstats[i].crawler_reclaimed;
536             totals.crawler_items_checked += itemstats[i].crawler_items_checked;
537             totals.lrutail_reflocked += itemstats[i].lrutail_reflocked;
538             totals.moves_to_cold += itemstats[i].moves_to_cold;
539             totals.moves_to_warm += itemstats[i].moves_to_warm;
540             totals.moves_within_lru += itemstats[i].moves_within_lru;
541             totals.direct_reclaims += itemstats[i].direct_reclaims;
542             pthread_mutex_unlock(&lru_locks[i]);
543         }
544     }
545     APPEND_STAT("expired_unfetched", "%llu",
546                 (unsigned long long)totals.expired_unfetched);
547     APPEND_STAT("evicted_unfetched", "%llu",
548                 (unsigned long long)totals.evicted_unfetched);
549     APPEND_STAT("evictions", "%llu",
550                 (unsigned long long)totals.evicted);
551     APPEND_STAT("reclaimed", "%llu",
552                 (unsigned long long)totals.reclaimed);
553     APPEND_STAT("crawler_reclaimed", "%llu",
554                 (unsigned long long)totals.crawler_reclaimed);
555     APPEND_STAT("crawler_items_checked", "%llu",
556                 (unsigned long long)totals.crawler_items_checked);
557     APPEND_STAT("lrutail_reflocked", "%llu",
558                 (unsigned long long)totals.lrutail_reflocked);
    /* Maintainer-thread counters are only emitted when the segmented LRU
     * is active. */
559     if (settings.lru_maintainer_thread) {
560         APPEND_STAT("moves_to_cold", "%llu",
561                     (unsigned long long)totals.moves_to_cold);
562         APPEND_STAT("moves_to_warm", "%llu",
563                     (unsigned long long)totals.moves_to_warm);
564         APPEND_STAT("moves_within_lru", "%llu",
565                     (unsigned long long)totals.moves_within_lru);
566         APPEND_STAT("direct_reclaims", "%llu",
567                     (unsigned long long)totals.direct_reclaims);
568     }
569 }
570 
/*
 * "stats items": per-slab-class counters.  For each class, the four LRU
 * sub-lists are summed under their locks, then "items:<class>:<stat>"
 * lines are emitted.  Classes currently holding no items are skipped.
 */
item_stats(ADD_STAT add_stats,void * c)571 void item_stats(ADD_STAT add_stats, void *c) {
572     itemstats_t totals;
573     int n;
574     for (n = 0; n < MAX_NUMBER_OF_SLAB_CLASSES; n++) {
575         memset(&totals, 0, sizeof(itemstats_t));
576         int x;
577         int i;
578         unsigned int size = 0;
579         unsigned int age  = 0;
580         unsigned int lru_size_map[4];
581         const char *fmt = "items:%d:%s";
582         char key_str[STAT_KEY_LEN];
583         char val_str[STAT_VAL_LEN];
584         int klen = 0, vlen = 0;
585         for (x = 0; x < 4; x++) {
586             i = n | lru_type_map[x];
587             pthread_mutex_lock(&lru_locks[i]);
588             totals.evicted += itemstats[i].evicted;
589             totals.evicted_nonzero += itemstats[i].evicted_nonzero;
590             totals.outofmemory += itemstats[i].outofmemory;
591             totals.tailrepairs += itemstats[i].tailrepairs;
592             totals.reclaimed += itemstats[i].reclaimed;
593             totals.expired_unfetched += itemstats[i].expired_unfetched;
594             totals.evicted_unfetched += itemstats[i].evicted_unfetched;
595             totals.crawler_reclaimed += itemstats[i].crawler_reclaimed;
596             totals.crawler_items_checked += itemstats[i].crawler_items_checked;
597             totals.lrutail_reflocked += itemstats[i].lrutail_reflocked;
598             totals.moves_to_cold += itemstats[i].moves_to_cold;
599             totals.moves_to_warm += itemstats[i].moves_to_warm;
600             totals.moves_within_lru += itemstats[i].moves_within_lru;
601             totals.direct_reclaims += itemstats[i].direct_reclaims;
602             size += sizes[i];
603             lru_size_map[x] = sizes[i];
            /* "age" reports the age of the COLD LRU's tail item only */
604             if (lru_type_map[x] == COLD_LRU && tails[i] != NULL)
605                 age = current_time - tails[i]->time;
606             pthread_mutex_unlock(&lru_locks[i]);
607         }
608         if (size == 0)
609             continue;
610         APPEND_NUM_FMT_STAT(fmt, n, "number", "%u", size);
611         if (settings.lru_maintainer_thread) {
612             APPEND_NUM_FMT_STAT(fmt, n, "number_hot", "%u", lru_size_map[0]);
613             APPEND_NUM_FMT_STAT(fmt, n, "number_warm", "%u", lru_size_map[1]);
614             APPEND_NUM_FMT_STAT(fmt, n, "number_cold", "%u", lru_size_map[2]);
615             if (settings.expirezero_does_not_evict) {
616                 APPEND_NUM_FMT_STAT(fmt, n, "number_noexp", "%u", lru_size_map[3]);
617             }
618         }
619         APPEND_NUM_FMT_STAT(fmt, n, "age", "%u", age);
620         APPEND_NUM_FMT_STAT(fmt, n, "evicted",
621                             "%llu", (unsigned long long)totals.evicted);
622         APPEND_NUM_FMT_STAT(fmt, n, "evicted_nonzero",
623                             "%llu", (unsigned long long)totals.evicted_nonzero);
        /* NOTE(review): totals.evicted_time is never accumulated in the
         * loop above, so this stat always prints 0 here — confirm against
         * upstream whether an assignment was dropped. */
624         APPEND_NUM_FMT_STAT(fmt, n, "evicted_time",
625                             "%u", totals.evicted_time);
626         APPEND_NUM_FMT_STAT(fmt, n, "outofmemory",
627                             "%llu", (unsigned long long)totals.outofmemory);
628         APPEND_NUM_FMT_STAT(fmt, n, "tailrepairs",
629                             "%llu", (unsigned long long)totals.tailrepairs);
630         APPEND_NUM_FMT_STAT(fmt, n, "reclaimed",
631                             "%llu", (unsigned long long)totals.reclaimed);
632         APPEND_NUM_FMT_STAT(fmt, n, "expired_unfetched",
633                             "%llu", (unsigned long long)totals.expired_unfetched);
634         APPEND_NUM_FMT_STAT(fmt, n, "evicted_unfetched",
635                             "%llu", (unsigned long long)totals.evicted_unfetched);
636         APPEND_NUM_FMT_STAT(fmt, n, "crawler_reclaimed",
637                             "%llu", (unsigned long long)totals.crawler_reclaimed);
638         APPEND_NUM_FMT_STAT(fmt, n, "crawler_items_checked",
639                             "%llu", (unsigned long long)totals.crawler_items_checked);
640         APPEND_NUM_FMT_STAT(fmt, n, "lrutail_reflocked",
641                             "%llu", (unsigned long long)totals.lrutail_reflocked);
642         if (settings.lru_maintainer_thread) {
643             APPEND_NUM_FMT_STAT(fmt, n, "moves_to_cold",
644                                 "%llu", (unsigned long long)totals.moves_to_cold);
645             APPEND_NUM_FMT_STAT(fmt, n, "moves_to_warm",
646                                 "%llu", (unsigned long long)totals.moves_to_warm);
647             APPEND_NUM_FMT_STAT(fmt, n, "moves_within_lru",
648                                 "%llu", (unsigned long long)totals.moves_within_lru);
649             APPEND_NUM_FMT_STAT(fmt, n, "direct_reclaims",
650                                 "%llu", (unsigned long long)totals.direct_reclaims);
651         }
652     }
653 
654     /* getting here means both ascii and binary terminators fit */
655     add_stats(NULL, 0, NULL, 0, c);
656 }
657 
item_stats_sizes_status(void)658 bool item_stats_sizes_status(void) {
659     bool ret = false;
660     mutex_lock(&stats_sizes_lock);
661     if (stats_sizes_hist != NULL)
662         ret = true;
663     mutex_unlock(&stats_sizes_lock);
664     return ret;
665 }
666 
item_stats_sizes_init(void)667 void item_stats_sizes_init(void) {
668     if (stats_sizes_hist != NULL)
669         return;
670     stats_sizes_buckets = settings.item_size_max / 32 + 1;
671     stats_sizes_hist = calloc(stats_sizes_buckets, sizeof(int));
672     stats_sizes_cas_min = (settings.use_cas) ? get_cas_id() : 0;
673 }
674 
/* Turn on size-histogram collection (requires CAS support, which is
 * used to fence out items created before collection began) and report
 * the resulting status to the client. */
void item_stats_sizes_enable(ADD_STAT add_stats, void *c) {
    mutex_lock(&stats_sizes_lock);
    if (!settings.use_cas) {
        APPEND_STAT("sizes_status", "error", "");
        APPEND_STAT("sizes_error", "cas_support_disabled", "");
        mutex_unlock(&stats_sizes_lock);
        return;
    }
    if (stats_sizes_hist == NULL) {
        item_stats_sizes_init();
    }
    if (stats_sizes_hist != NULL) {
        APPEND_STAT("sizes_status", "enabled", "");
    } else {
        APPEND_STAT("sizes_status", "error", "");
        APPEND_STAT("sizes_error", "no_memory", "");
    }
    mutex_unlock(&stats_sizes_lock);
}
693 
/* Tear down the size histogram and report the new state. */
void item_stats_sizes_disable(ADD_STAT add_stats, void *c) {
    mutex_lock(&stats_sizes_lock);
    free(stats_sizes_hist);   /* free(NULL) is a no-op */
    stats_sizes_hist = NULL;
    APPEND_STAT("sizes_status", "disabled", "");
    mutex_unlock(&stats_sizes_lock);
}
703 
/* Account a newly linked item in the size histogram — if collection is
 * enabled and the item was created after collection started (its CAS id
 * is at or past stats_sizes_cas_min). */
void item_stats_sizes_add(item *it) {
    if (stats_sizes_hist == NULL || stats_sizes_cas_min > ITEM_get_cas(it))
        return;
    const int ntotal = ITEM_ntotal(it);
    int bucket = (ntotal + 31) / 32;   /* round up to the 32-byte bucket */
    if (bucket < stats_sizes_buckets)
        stats_sizes_hist[bucket]++;
}
712 
713 /* I think there's no way for this to be accurate without using the CAS value.
714  * Since items getting their time value bumped will pass this validation.
715  */
/* Un-account an item from the size histogram; mirror of
 * item_stats_sizes_add() with the same CAS fence. */
void item_stats_sizes_remove(item *it) {
    if (stats_sizes_hist == NULL || stats_sizes_cas_min > ITEM_get_cas(it))
        return;
    const int ntotal = ITEM_ntotal(it);
    int bucket = (ntotal + 31) / 32;   /* round up to the 32-byte bucket */
    if (bucket < stats_sizes_buckets)
        stats_sizes_hist[bucket]--;
}
724 
725 /** dumps out a list of objects of each size, with granularity of 32 bytes */
726 /*@null@*/
727 /* Locks are correct based on a technicality. Holds LRU lock while doing the
728  * work, so items can't go invalid, and it's only looking at header sizes
729  * which don't change.
730  */
/* Emit the size histogram as "<bucket*32>: <count>" stat lines, or a
 * disabled notice when collection is off.  Holds stats_sizes_lock for
 * the whole dump. */
void item_stats_sizes(ADD_STAT add_stats, void *c) {
    mutex_lock(&stats_sizes_lock);

    if (stats_sizes_hist == NULL) {
        APPEND_STAT("sizes_status", "disabled", "");
    } else {
        for (int i = 0; i < stats_sizes_buckets; i++) {
            if (stats_sizes_hist[i] == 0)
                continue;
            /* 8 chars fits up to "1048576" (default 1MB item_size_max) */
            char key[8];
            snprintf(key, sizeof(key), "%d", i * 32);
            APPEND_STAT(key, "%u", stats_sizes_hist[i]);
        }
    }

    add_stats(NULL, 0, NULL, 0, c);
    mutex_unlock(&stats_sizes_lock);
}
750 
751 /** wrapper around assoc_find which does the lazy expiration logic */
/*
 * assoc_find() wrapper with lazy expiration.  On a hit the item's
 * refcount is bumped before any further checks; items invalidated by a
 * flush_all or an elapsed exptime are then unlinked and released,
 * returning NULL.  A live hit is returned with the extra reference held
 * (caller must do_item_remove()).  was_found codes for the fetch log:
 * 0 = miss, 1 = hit, 2 = nuked by flush, 3 = nuked by expire.
 */
do_item_get(const char * key,const size_t nkey,const uint32_t hv,conn * c)752 item *do_item_get(const char *key, const size_t nkey, const uint32_t hv, conn *c) {
753     item *it = assoc_find(key, nkey, hv);
754     if (it != NULL) {
755         refcount_incr(&it->refcount);
756         /* Optimization for slab reassignment. prevents popular items from
757          * jamming in busy wait. Can only do this here to satisfy lock order
758          * of item_lock, slabs_lock. */
759         /* This was made unsafe by removal of the cache_lock:
760          * slab_rebalance_signal and slab_rebal.* are modified in a separate
761          * thread under slabs_lock. If slab_rebalance_signal = 1, slab_start =
762          * NULL (0), but slab_end is still equal to some value, this would end
763          * up unlinking every item fetched.
764          * This is either an acceptable loss, or if slab_rebalance_signal is
765          * true, slab_start/slab_end should be put behind the slabs_lock.
766          * Which would cause a huge potential slowdown.
767          * Could also use a specific lock for slab_rebal.* and
768          * slab_rebalance_signal (shorter lock?)
769          */
770         /*if (slab_rebalance_signal &&
771             ((void *)it >= slab_rebal.slab_start && (void *)it < slab_rebal.slab_end)) {
772             do_item_unlink(it, hv);
773             do_item_remove(it);
774             it = NULL;
775         }*/
776     }
777     int was_found = 0;
778 
779     if (settings.verbose > 2) {
780         int ii;
781         if (it == NULL) {
782             fprintf(stderr, "> NOT FOUND ");
783         } else {
784             fprintf(stderr, "> FOUND KEY ");
785         }
        /* keys may contain arbitrary bytes; emit them one at a time */
786         for (ii = 0; ii < nkey; ++ii) {
787             fprintf(stderr, "%c", key[ii]);
788         }
789     }
790 
791     if (it != NULL) {
792         was_found = 1;
        /* flush_all invalidation takes precedence over per-item expiry */
793         if (item_is_flushed(it)) {
794             do_item_unlink(it, hv);
795             do_item_remove(it);
796             it = NULL;
797             pthread_mutex_lock(&c->thread->stats.mutex);
798             c->thread->stats.get_flushed++;
799             pthread_mutex_unlock(&c->thread->stats.mutex);
800             if (settings.verbose > 2) {
801                 fprintf(stderr, " -nuked by flush");
802             }
803             was_found = 2;
804         } else if (it->exptime != 0 && it->exptime <= current_time) {
805             do_item_unlink(it, hv);
806             do_item_remove(it);
807             it = NULL;
808             pthread_mutex_lock(&c->thread->stats.mutex);
809             c->thread->stats.get_expired++;
810             pthread_mutex_unlock(&c->thread->stats.mutex);
811             if (settings.verbose > 2) {
812                 fprintf(stderr, " -nuked by expire");
813             }
814             was_found = 3;
815         } else {
            /* live hit: mark fetched/active for the LRU maintainer */
816             it->it_flags |= ITEM_FETCHED|ITEM_ACTIVE;
817             DEBUG_REFCNT(it, '+');
818         }
819     }
820 
821     if (settings.verbose > 2)
822         fprintf(stderr, "\n");
823     /* For now this is in addition to the above verbose logging. */
824     LOGGER_LOG(c->thread->l, LOG_FETCHERS, LOGGER_ITEM_GET, NULL, was_found, key, nkey);
825 
826     return it;
827 }
828 
do_item_touch(const char * key,size_t nkey,uint32_t exptime,const uint32_t hv,conn * c)829 item *do_item_touch(const char *key, size_t nkey, uint32_t exptime,
830                     const uint32_t hv, conn *c) {
831     item *it = do_item_get(key, nkey, hv, c);
832     if (it != NULL) {
833         it->exptime = exptime;
834     }
835     return it;
836 }
837 
838 /*** LRU MAINTENANCE THREAD ***/
839 
/* Returns number of items remove, expired, or evicted.
 * Callable from worker threads or the LRU maintainer thread.
 *
 * Walks up to five items from the tail of the (orig_id | cur_lru) list:
 *   - expired or flushed items are reclaimed,
 *   - HOT/WARM items over their byte limit are queued for a move to COLD,
 *   - COLD items are evicted (when do_evict) or, if ACTIVE, moved to WARM.
 * The walk stops early once a single item has been selected ('it' set). */
static int lru_pull_tail(const int orig_id, const int cur_lru,
        const uint64_t total_bytes, const bool do_evict) {
    item *it = NULL;
    int id = orig_id;
    int removed = 0;
    if (id == 0)
        return 0;

    int tries = 5;
    item *search;
    item *next_it;
    void *hold_lock = NULL;
    unsigned int move_to_lru = 0;  /* target sub-LRU for the relink, if any */
    uint64_t limit = 0;

    id |= cur_lru;
    pthread_mutex_lock(&lru_locks[id]);
    search = tails[id];
    /* We walk up *only* for locked items, and if bottom is expired. */
    for (; tries > 0 && search != NULL; tries--, search=next_it) {
        /* we might relink search mid-loop, so search->prev isn't reliable */
        next_it = search->prev;
        if (search->nbytes == 0 && search->nkey == 0 && search->it_flags == 1) {
            /* We are a crawler, ignore it. */
            tries++;
            continue;
        }
        uint32_t hv = hash(ITEM_key(search), search->nkey);
        /* Attempt to hash item lock the "search" item. If locked, no
         * other callers can incr the refcount. Also skip ourselves. */
        if ((hold_lock = item_trylock(hv)) == NULL)
            continue;
        /* Now see if the item is refcount locked */
        if (refcount_incr(&search->refcount) != 2) {
            /* Note pathological case with ref'ed items in tail.
             * Can still unlink the item, but it won't be reusable yet */
            itemstats[id].lrutail_reflocked++;
            /* In case of refcount leaks, enable for quick workaround. */
            /* WARNING: This can cause terrible corruption */
            if (settings.tail_repair_time &&
                    search->time + settings.tail_repair_time < current_time) {
                itemstats[id].tailrepairs++;
                search->refcount = 1;
                /* This will call item_remove -> item_free since refcnt is 1 */
                do_item_unlink_nolock(search, hv);
                item_trylock_unlock(hold_lock);
                continue;
            }
        }

        /* Expired or flushed */
        if ((search->exptime != 0 && search->exptime < current_time)
            || item_is_flushed(search)) {
            itemstats[id].reclaimed++;
            if ((search->it_flags & ITEM_FETCHED) == 0) {
                itemstats[id].expired_unfetched++;
            }
            /* refcnt 2 -> 1 */
            do_item_unlink_nolock(search, hv);
            /* refcnt 1 -> 0 -> item_free */
            do_item_remove(search);
            item_trylock_unlock(hold_lock);
            removed++;

            /* If all we're finding are expired, can keep going */
            continue;
        }

        /* If we're HOT_LRU or WARM_LRU and over size limit, send to COLD_LRU.
         * If we're COLD_LRU, send to WARM_LRU unless we need to evict
         */
        switch (cur_lru) {
            case HOT_LRU:
                limit = total_bytes * settings.hot_lru_pct / 100;
                /* FALLTHRU: HOT intentionally shares the size-limit handling
                 * below with WARM; only the limit percentage differs. */
            case WARM_LRU:
                if (limit == 0)
                    limit = total_bytes * settings.warm_lru_pct / 100;
                if (sizes_bytes[id] > limit) {
                    itemstats[id].moves_to_cold++;
                    move_to_lru = COLD_LRU;
                    do_item_unlink_q(search);
                    it = search;
                    removed++;
                    break;
                } else if ((search->it_flags & ITEM_ACTIVE) != 0) {
                    /* Only allow ACTIVE relinking if we're not too large. */
                    itemstats[id].moves_within_lru++;
                    search->it_flags &= ~ITEM_ACTIVE;
                    do_item_update_nolock(search);
                    do_item_remove(search);
                    item_trylock_unlock(hold_lock);
                } else {
                    /* Don't want to move to COLD, not active, bail out */
                    it = search;
                }
                break;
            case COLD_LRU:
                it = search; /* No matter what, we're stopping */
                if (do_evict) {
                    if (settings.evict_to_free == 0) {
                        /* Don't think we need a counter for this. It'll OOM.  */
                        break;
                    }
                    itemstats[id].evicted++;
                    itemstats[id].evicted_time = current_time - search->time;
                    if (search->exptime != 0)
                        itemstats[id].evicted_nonzero++;
                    if ((search->it_flags & ITEM_FETCHED) == 0) {
                        itemstats[id].evicted_unfetched++;
                    }
                    LOGGER_LOG(NULL, LOG_EVICTIONS, LOGGER_EVICTION, search);
                    do_item_unlink_nolock(search, hv);
                    removed++;
                    if (settings.slab_automove == 2) {
                        slabs_reassign(-1, orig_id);
                    }
                } else if ((search->it_flags & ITEM_ACTIVE) != 0
                        && settings.lru_maintainer_thread) {
                    /* Recently used: clear ACTIVE and promote to WARM. */
                    itemstats[id].moves_to_warm++;
                    search->it_flags &= ~ITEM_ACTIVE;
                    move_to_lru = WARM_LRU;
                    do_item_unlink_q(search);
                    removed++;
                }
                break;
        }
        if (it != NULL)
            break;
    }

    pthread_mutex_unlock(&lru_locks[id]);

    if (it != NULL) {
        if (move_to_lru) {
            /* Retarget the item's sub-LRU, then relink it there. */
            it->slabs_clsid = ITEM_clsid(it);
            it->slabs_clsid |= move_to_lru;
            item_link_q(it);
        }
        /* Drop the reference taken during the tail walk. */
        do_item_remove(it);
        item_trylock_unlock(hold_lock);
    }

    return removed;
}
986 
987 /* Loop up to N times:
988  * If too many items are in HOT_LRU, push to COLD_LRU
989  * If too many items are in WARM_LRU, push to COLD_LRU
990  * If too many items are in COLD_LRU, poke COLD_LRU tail
991  * 1000 loops with 1ms min sleep gives us under 1m items shifted/sec. The
992  * locks can't handle much more than that. Leaving a TODO for how to
993  * autoadjust in the future.
994  */
lru_maintainer_juggle(const int slabs_clsid)995 static int lru_maintainer_juggle(const int slabs_clsid) {
996     int i;
997     int did_moves = 0;
998     bool mem_limit_reached = false;
999     uint64_t total_bytes = 0;
1000     unsigned int chunks_perslab = 0;
1001     unsigned int chunks_free = 0;
1002     /* TODO: if free_chunks below high watermark, increase aggressiveness */
1003     chunks_free = slabs_available_chunks(slabs_clsid, &mem_limit_reached,
1004             &total_bytes, &chunks_perslab);
1005     if (settings.expirezero_does_not_evict)
1006         total_bytes -= noexp_lru_size(slabs_clsid);
1007 
1008     /* If slab automove is enabled on any level, and we have more than 2 pages
1009      * worth of chunks free in this class, ask (gently) to reassign a page
1010      * from this class back into the global pool (0)
1011      */
1012     if (settings.slab_automove > 0 && chunks_free > (chunks_perslab * 2.5)) {
1013         slabs_reassign(slabs_clsid, SLAB_GLOBAL_PAGE_POOL);
1014     }
1015 
1016     /* Juggle HOT/WARM up to N times */
1017     for (i = 0; i < 1000; i++) {
1018         int do_more = 0;
1019         if (lru_pull_tail(slabs_clsid, HOT_LRU, total_bytes, false) ||
1020             lru_pull_tail(slabs_clsid, WARM_LRU, total_bytes, false)) {
1021             do_more++;
1022         }
1023         do_more += lru_pull_tail(slabs_clsid, COLD_LRU, total_bytes, false);
1024         if (do_more == 0)
1025             break;
1026         did_moves++;
1027     }
1028     return did_moves;
1029 }
1030 
1031 /* Will crawl all slab classes a minimum of once per hour */
1032 #define MAX_MAINTCRAWL_WAIT 60 * 60
1033 
/* Hoping user input will improve this function. This is all a wild guess.
 * Operation: Kicks crawler for each slab id. Crawlers take some statistics as
 * to items with nonzero expirations. It then buckets how many items will
 * expire per minute for the next hour.
 * This function checks the results of a run, and if it thinks more than 1% of
 * expirable objects are ready to go, kicks the crawler again to reap.
 * It will also kick the crawler once per minute regardless, waiting a minute
 * longer for each time it has no work to do, up to an hour wait time.
 * The latter is to avoid newly started daemons from waiting too long before
 * retrying a crawl.
 */
static void lru_maintainer_crawler_check(void) {
    int i;
    /* Per-class state persists across calls: when the last crawl was kicked
     * off, and the current backoff (seconds) before re-kicking an idle class. */
    static rel_time_t last_crawls[MAX_NUMBER_OF_SLAB_CLASSES];
    static rel_time_t next_crawl_wait[MAX_NUMBER_OF_SLAB_CLASSES];
    for (i = POWER_SMALLEST; i < MAX_NUMBER_OF_SLAB_CLASSES; i++) {
        crawlerstats_t *s = &crawlerstats[i];
        /* We've not successfully kicked off a crawl yet. */
        if (last_crawls[i] == 0) {
            if (lru_crawler_start(i, 0) > 0) {
                last_crawls[i] = current_time;
            }
        }
        pthread_mutex_lock(&lru_crawler_stats_lock);
        if (s->run_complete) {
            int x;
            /* Should we crawl again? */
            uint64_t possible_reclaims = s->seen - s->noexp;
            uint64_t available_reclaims = 0;
            /* Need to think we can free at least 1% of the items before
             * crawling. */
            /* FIXME: Configurable? */
            uint64_t low_watermark = (s->seen / 100) + 1;
            rel_time_t since_run = current_time - s->end_time;
            /* Don't bother if the payoff is too low. */
            if (settings.verbose > 1)
                fprintf(stderr, "maint crawler: low_watermark: %llu, possible_reclaims: %llu, since_run: %u\n",
                        (unsigned long long)low_watermark, (unsigned long long)possible_reclaims,
                        (unsigned int)since_run);
            /* Sum histogram buckets that have aged past: bucket x holds items
             * that were due to expire during minute x after the run ended. */
            for (x = 0; x < 60; x++) {
                if (since_run < (x * 60) + 60)
                    break;
                available_reclaims += s->histo[x];
            }
            if (available_reclaims > low_watermark) {
                /* Enough expired items waiting: re-arm and shrink backoff. */
                last_crawls[i] = 0;
                if (next_crawl_wait[i] > 60)
                    next_crawl_wait[i] -= 60;
            } else if (since_run > 5 && since_run > next_crawl_wait[i]) {
                /* Idle class: re-arm but grow the backoff, capped at an hour. */
                last_crawls[i] = 0;
                if (next_crawl_wait[i] < MAX_MAINTCRAWL_WAIT)
                    next_crawl_wait[i] += 60;
            }
            if (settings.verbose > 1)
                fprintf(stderr, "maint crawler: available reclaims: %llu, next_crawl: %u\n", (unsigned long long)available_reclaims, next_crawl_wait[i]);
        }
        pthread_mutex_unlock(&lru_crawler_stats_lock);
    }
}
1093 
1094 static pthread_t lru_maintainer_tid;
1095 
1096 #define MAX_LRU_MAINTAINER_SLEEP 1000000
1097 #define MIN_LRU_MAINTAINER_SLEEP 1000
1098 
/* Background thread: periodically juggles every slab class's LRUs (or just
 * the one flagged via lru_maintainer_check_clsid), adapting its sleep
 * between MIN/MAX_LRU_MAINTAINER_SLEEP depending on whether the last pass
 * moved anything. Also runs the crawler check at most once per second.
 * Loops until do_run_lru_maintainer_thread is cleared. */
static void *lru_maintainer_thread(void *arg) {
    int i;
    useconds_t to_sleep = MIN_LRU_MAINTAINER_SLEEP;
    rel_time_t last_crawler_check = 0;

    pthread_mutex_lock(&lru_maintainer_lock);
    if (settings.verbose > 2)
        fprintf(stderr, "Starting LRU maintainer background thread\n");
    while (do_run_lru_maintainer_thread) {
        int did_moves = 0;
        /* Drop the lock while sleeping so pause/stop can get in. */
        pthread_mutex_unlock(&lru_maintainer_lock);
        usleep(to_sleep);
        pthread_mutex_lock(&lru_maintainer_lock);

        STATS_LOCK();
        stats.lru_maintainer_juggles++;
        STATS_UNLOCK();
        /* We were asked to immediately wake up and poke a particular slab
         * class due to a low watermark being hit */
        if (lru_maintainer_check_clsid != 0) {
            did_moves = lru_maintainer_juggle(lru_maintainer_check_clsid);
            lru_maintainer_check_clsid = 0;
        } else {
            for (i = POWER_SMALLEST; i < MAX_NUMBER_OF_SLAB_CLASSES; i++) {
                did_moves += lru_maintainer_juggle(i);
            }
        }
        /* Adaptive sleep: back off 1ms at a time when idle, halve when busy. */
        if (did_moves == 0) {
            if (to_sleep < MAX_LRU_MAINTAINER_SLEEP)
                to_sleep += 1000;
        } else {
            to_sleep /= 2;
            if (to_sleep < MIN_LRU_MAINTAINER_SLEEP)
                to_sleep = MIN_LRU_MAINTAINER_SLEEP;
        }
        /* Once per second at most */
        if (settings.lru_crawler && last_crawler_check != current_time) {
            lru_maintainer_crawler_check();
            last_crawler_check = current_time;
        }
    }
    pthread_mutex_unlock(&lru_maintainer_lock);
    if (settings.verbose > 2)
        fprintf(stderr, "LRU maintainer thread stopping\n");

    return NULL;
}
stop_lru_maintainer_thread(void)1146 int stop_lru_maintainer_thread(void) {
1147     int ret;
1148     pthread_mutex_lock(&lru_maintainer_lock);
1149     /* LRU thread is a sleep loop, will die on its own */
1150     do_run_lru_maintainer_thread = 0;
1151     pthread_mutex_unlock(&lru_maintainer_lock);
1152     if ((ret = pthread_join(lru_maintainer_tid, NULL)) != 0) {
1153         fprintf(stderr, "Failed to stop LRU maintainer thread: %s\n", strerror(ret));
1154         return -1;
1155     }
1156     settings.lru_maintainer_thread = false;
1157     return 0;
1158 }
1159 
start_lru_maintainer_thread(void)1160 int start_lru_maintainer_thread(void) {
1161     int ret;
1162 
1163     pthread_mutex_lock(&lru_maintainer_lock);
1164     do_run_lru_maintainer_thread = 1;
1165     settings.lru_maintainer_thread = true;
1166     if ((ret = pthread_create(&lru_maintainer_tid, NULL,
1167         lru_maintainer_thread, NULL)) != 0) {
1168         fprintf(stderr, "Can't create LRU maintainer thread: %s\n",
1169             strerror(ret));
1170         pthread_mutex_unlock(&lru_maintainer_lock);
1171         return -1;
1172     }
1173     pthread_mutex_unlock(&lru_maintainer_lock);
1174 
1175     return 0;
1176 }
1177 
/* If we hold this lock, crawler can't wake up or move */
/* Park the LRU maintainer at its next lock acquisition; it stays blocked
 * until lru_maintainer_resume() releases the lock. */
void lru_maintainer_pause(void) {
    pthread_mutex_lock(&lru_maintainer_lock);
}
1182 
/* Release the lock taken by lru_maintainer_pause(), letting the maintainer
 * thread run again. */
void lru_maintainer_resume(void) {
    pthread_mutex_unlock(&lru_maintainer_lock);
}
1186 
init_lru_maintainer(void)1187 int init_lru_maintainer(void) {
1188     if (lru_maintainer_initialized == 0) {
1189         pthread_mutex_init(&lru_maintainer_lock, NULL);
1190         lru_maintainer_initialized = 1;
1191     }
1192     return 0;
1193 }
1194 
1195 /*** ITEM CRAWLER THREAD ***/
1196 
static void crawler_link_q(item *it) { /* item is the new tail */
    /* Only ever called with a crawler pseudo-item. */
    assert(it->it_flags == 1);
    assert(it->nbytes == 0);

    item **head = &heads[it->slabs_clsid];
    item **tail = &tails[it->slabs_clsid];
    assert(*tail != 0);
    assert(it != *tail);
    assert((*head && *tail) || (*head == 0 && *tail == 0));

    /* Splice the crawler onto the tail of this LRU's list. */
    it->prev = *tail;
    it->next = 0;
    if (it->prev) {
        assert(it->prev->next == 0);
        it->prev->next = it;
    }
    *tail = it;
    if (*head == 0) *head = it;
}
1217 
/* Remove the crawler pseudo-item from its LRU list, fixing up head/tail. */
static void crawler_unlink_q(item *it) {
    item *nxt = it->next;
    item *prv = it->prev;
    item **head = &heads[it->slabs_clsid];
    item **tail = &tails[it->slabs_clsid];

    /* Detach from the list endpoints first... */
    if (*head == it) {
        assert(prv == 0);
        *head = nxt;
    }
    if (*tail == it) {
        assert(nxt == 0);
        *tail = prv;
    }
    assert(nxt != it);
    assert(prv != it);

    /* ...then stitch the neighbors together. */
    if (nxt) nxt->prev = prv;
    if (prv) prv->next = nxt;
}
1238 
/* This is too convoluted, but it's a difficult shuffle. Try to rewrite it
 * more clearly. */
/* Advance the crawler pseudo-item one position toward the head of its LRU
 * by swapping places with its prev neighbor, fixing up the list's head and
 * tail pointers as needed. Returns the item now directly in front of the
 * crawler (the next one to examine), or NULL once the crawler has popped
 * off the head — i.e. the crawl of this list is done. */
static item *crawler_crawl_q(item *it) {
    item **head, **tail;
    assert(it->it_flags == 1);
    assert(it->nbytes == 0);
    head = &heads[it->slabs_clsid];
    tail = &tails[it->slabs_clsid];

    /* We've hit the head, pop off */
    if (it->prev == 0) {
        assert(*head == it);
        if (it->next) {
            *head = it->next;
            assert(it->next->prev == it);
            it->next->prev = 0;
        }
        return NULL; /* Done */
    }

    /* Swing ourselves in front of the next item */
    /* NB: If there is a prev, we can't be the head */
    assert(it->prev != it);
    if (it->prev) {
        if (*head == it->prev) {
            /* Prev was the head, now we're the head */
            *head = it;
        }
        if (*tail == it) {
            /* We are the tail, now they are the tail */
            *tail = it->prev;
        }
        assert(it->next != it);
        if (it->next) {
            assert(it->prev->next == it);
            it->prev->next = it->next;
            it->next->prev = it->prev;
        } else {
            /* Tail. Move this above? */
            it->prev->next = 0;
        }
        /* prev->prev's next is it->prev */
        it->next = it->prev;
        it->prev = it->next->prev;
        it->next->prev = it;
        /* New it->prev now, if we're not at the head. */
        if (it->prev) {
            it->prev->next = it;
        }
    }
    assert(it->next != it);
    assert(it->prev != it);

    return it->next; /* success */
}
1294 
/* I pulled this out to make the main thread clearer, but it reaches into the
 * main thread's values too much. Should rethink again.
 */
/* Examine one item the crawler walked past: reclaim it if it is expired or
 * flushed, otherwise record TTL statistics and drop the crawl reference.
 * 'i' is the full LRU id; crawler stats are filed under its slab class.
 * Called with lru_crawler_stats_lock held (see item_crawler_thread). */
static void item_crawler_evaluate(item *search, uint32_t hv, int i) {
    int slab_id = CLEAR_LRU(i);
    crawlerstats_t *s = &crawlerstats[slab_id];
    itemstats[i].crawler_items_checked++;
    int is_flushed = item_is_flushed(search);
    if ((search->exptime != 0 && search->exptime < current_time)
        || is_flushed) {
        itemstats[i].crawler_reclaimed++;
        s->reclaimed++;

        if (settings.verbose > 1) {
            int ii;
            char *key = ITEM_key(search);
            fprintf(stderr, "LRU crawler found an expired item (flags: %d, slab: %d): ",
                search->it_flags, search->slabs_clsid);
            /* Key is not NUL-terminated; emit it byte by byte. */
            for (ii = 0; ii < search->nkey; ++ii) {
                fprintf(stderr, "%c", key[ii]);
            }
            fprintf(stderr, "\n");
        }
        /* Flushed-but-fetched items aren't "expired unfetched". */
        if ((search->it_flags & ITEM_FETCHED) == 0 && !is_flushed) {
            itemstats[i].expired_unfetched++;
        }
        /* refcnt 2 -> 1, then 1 -> 0 -> item_free */
        do_item_unlink_nolock(search, hv);
        do_item_remove(search);
        assert(search->slabs_clsid == 0);
    } else {
        /* Still live: drop our crawl reference and bucket its remaining TTL
         * for the maintainer's "is another crawl worth it?" histogram. */
        s->seen++;
        refcount_decr(&search->refcount);
        if (search->exptime == 0) {
            s->noexp++;
        } else if (search->exptime - current_time > 3599) {
            s->ttl_hourplus++;
        } else {
            rel_time_t ttl_remain = search->exptime - current_time;
            int bucket = ttl_remain / 60;
            s->histo[bucket]++;
        }
    }
}
1338 
/* Background thread: while any crawler is linked into an LRU, repeatedly
 * advance each active crawler one item toward the head of its list and
 * evaluate the item it stepped past (reclaiming expired/flushed entries).
 * Sleeps on lru_crawler_cond between runs and exits when
 * do_run_lru_crawler_thread is cleared. */
static void *item_crawler_thread(void *arg) {
    int i;
    int crawls_persleep = settings.crawls_persleep;

    pthread_mutex_lock(&lru_crawler_lock);
    /* Tell start_item_crawler_thread() we are alive (see the lock-dance
     * comment above that function). */
    pthread_cond_signal(&lru_crawler_cond);
    settings.lru_crawler = true;
    if (settings.verbose > 2)
        fprintf(stderr, "Starting LRU crawler background thread\n");
    while (do_run_lru_crawler_thread) {
    pthread_cond_wait(&lru_crawler_cond, &lru_crawler_lock);

    while (crawler_count) {
        item *search = NULL;
        void *hold_lock = NULL;

        for (i = POWER_SMALLEST; i < LARGEST_ID; i++) {
            /* it_flags == 1 marks an active (linked) crawler. */
            if (crawlers[i].it_flags != 1) {
                continue;
            }
            pthread_mutex_lock(&lru_locks[i]);
            search = crawler_crawl_q((item *)&crawlers[i]);
            if (search == NULL ||
                (crawlers[i].remaining && --crawlers[i].remaining < 1)) {
                /* Reached the head of the list (or exhausted the per-LRU
                 * item budget): retire this crawler and mark the class's
                 * run complete. */
                if (settings.verbose > 2)
                    fprintf(stderr, "Nothing left to crawl for %d\n", i);
                crawlers[i].it_flags = 0;
                crawler_count--;
                crawler_unlink_q((item *)&crawlers[i]);
                pthread_mutex_unlock(&lru_locks[i]);
                pthread_mutex_lock(&lru_crawler_stats_lock);
                crawlerstats[CLEAR_LRU(i)].end_time = current_time;
                crawlerstats[CLEAR_LRU(i)].run_complete = true;
                pthread_mutex_unlock(&lru_crawler_stats_lock);
                continue;
            }
            uint32_t hv = hash(ITEM_key(search), search->nkey);
            /* Attempt to hash item lock the "search" item. If locked, no
             * other callers can incr the refcount
             */
            if ((hold_lock = item_trylock(hv)) == NULL) {
                pthread_mutex_unlock(&lru_locks[i]);
                continue;
            }
            /* Now see if the item is refcount locked */
            if (refcount_incr(&search->refcount) != 2) {
                refcount_decr(&search->refcount);
                if (hold_lock)
                    item_trylock_unlock(hold_lock);
                pthread_mutex_unlock(&lru_locks[i]);
                continue;
            }

            /* Frees the item or decrements the refcount. */
            /* Interface for this could improve: do the free/decr here
             * instead? */
            pthread_mutex_lock(&lru_crawler_stats_lock);
            item_crawler_evaluate(search, hv, i);
            pthread_mutex_unlock(&lru_crawler_stats_lock);

            if (hold_lock)
                item_trylock_unlock(hold_lock);
            pthread_mutex_unlock(&lru_locks[i]);

            /* Throttle to limit lock pressure.
             * NOTE(review): crawls_persleep is never decremented in this
             * function, so this only fires when settings.crawls_persleep
             * starts <= 0 — verify that is intentional. */
            if (crawls_persleep <= 0 && settings.lru_crawler_sleep) {
                usleep(settings.lru_crawler_sleep);
                crawls_persleep = settings.crawls_persleep;
            }
        }
    }
    if (settings.verbose > 2)
        fprintf(stderr, "LRU crawler thread sleeping\n");
    STATS_LOCK();
    stats_state.lru_crawler_running = false;
    STATS_UNLOCK();
    }
    pthread_mutex_unlock(&lru_crawler_lock);
    if (settings.verbose > 2)
        fprintf(stderr, "LRU crawler thread stopping\n");

    return NULL;
}
1421 
1422 static pthread_t item_crawler_tid;
1423 
stop_item_crawler_thread(void)1424 int stop_item_crawler_thread(void) {
1425     int ret;
1426     pthread_mutex_lock(&lru_crawler_lock);
1427     do_run_lru_crawler_thread = 0;
1428     pthread_cond_signal(&lru_crawler_cond);
1429     pthread_mutex_unlock(&lru_crawler_lock);
1430     if ((ret = pthread_join(item_crawler_tid, NULL)) != 0) {
1431         fprintf(stderr, "Failed to stop LRU crawler thread: %s\n", strerror(ret));
1432         return -1;
1433     }
1434     settings.lru_crawler = false;
1435     return 0;
1436 }
1437 
1438 /* Lock dance to "block" until thread is waiting on its condition:
1439  * caller locks mtx. caller spawns thread.
1440  * thread blocks on mutex.
1441  * caller waits on condition, releases lock.
1442  * thread gets lock, sends signal.
1443  * caller can't wait, as thread has lock.
1444  * thread waits on condition, releases lock
1445  * caller wakes on condition, gets lock.
1446  * caller immediately releases lock.
1447  * thread is now safely waiting on condition before the caller returns.
1448  */
start_item_crawler_thread(void)1449 int start_item_crawler_thread(void) {
1450     int ret;
1451 
1452     if (settings.lru_crawler)
1453         return -1;
1454     pthread_mutex_lock(&lru_crawler_lock);
1455     do_run_lru_crawler_thread = 1;
1456     if ((ret = pthread_create(&item_crawler_tid, NULL,
1457         item_crawler_thread, NULL)) != 0) {
1458         fprintf(stderr, "Can't create LRU crawler thread: %s\n",
1459             strerror(ret));
1460         pthread_mutex_unlock(&lru_crawler_lock);
1461         return -1;
1462     }
1463     /* Avoid returning until the crawler has actually started */
1464     pthread_cond_wait(&lru_crawler_cond, &lru_crawler_lock);
1465     pthread_mutex_unlock(&lru_crawler_lock);
1466 
1467     return 0;
1468 }
1469 
/* 'remaining' is passed in so the LRU maintainer thread can scrub the whole
 * LRU every time.
 */
/* Arm a crawler for slab class 'id' on each of its HOT/WARM/COLD sub-LRUs
 * that currently contains items. 'remaining' caps items examined per
 * sub-LRU (0 = no cap). Returns the number of crawlers linked (0..3).
 * Caller holds lru_crawler_lock (see lru_crawler_start / lru_crawler_crawl). */
static int do_lru_crawler_start(uint32_t id, uint32_t remaining) {
    int i;
    uint32_t sid;
    uint32_t tocrawl[3];
    int starts = 0;
    tocrawl[0] = id | HOT_LRU;
    tocrawl[1] = id | WARM_LRU;
    tocrawl[2] = id | COLD_LRU;

    for (i = 0; i < 3; i++) {
        sid = tocrawl[i];
        pthread_mutex_lock(&lru_locks[sid]);
        /* Only crawl non-empty lists. */
        if (tails[sid] != NULL) {
            if (settings.verbose > 2)
                fprintf(stderr, "Kicking LRU crawler off for LRU %u\n", sid);
            /* nbytes == 0 && nkey == 0 && it_flags == 1 is how the rest of
             * the LRU code recognizes the fake crawler item. */
            crawlers[sid].nbytes = 0;
            crawlers[sid].nkey = 0;
            crawlers[sid].it_flags = 1; /* For a crawler, this means enabled. */
            crawlers[sid].next = 0;
            crawlers[sid].prev = 0;
            crawlers[sid].time = 0;
            crawlers[sid].remaining = remaining;
            crawlers[sid].slabs_clsid = sid;
            crawler_link_q((item *)&crawlers[sid]);
            crawler_count++;
            starts++;
        }
        pthread_mutex_unlock(&lru_locks[sid]);
    }
    if (starts) {
        STATS_LOCK();
        stats_state.lru_crawler_running = true;
        stats.lru_crawler_starts++;
        STATS_UNLOCK();
        /* Reset this class's crawl stats for the fresh run. */
        pthread_mutex_lock(&lru_crawler_stats_lock);
        memset(&crawlerstats[id], 0, sizeof(crawlerstats_t));
        crawlerstats[id].start_time = current_time;
        pthread_mutex_unlock(&lru_crawler_stats_lock);
    }
    return starts;
}
1514 
lru_crawler_start(uint32_t id,uint32_t remaining)1515 static int lru_crawler_start(uint32_t id, uint32_t remaining) {
1516     int starts;
1517     if (pthread_mutex_trylock(&lru_crawler_lock) != 0) {
1518         return 0;
1519     }
1520     starts = do_lru_crawler_start(id, remaining);
1521     if (starts) {
1522         pthread_cond_signal(&lru_crawler_cond);
1523     }
1524     pthread_mutex_unlock(&lru_crawler_lock);
1525     return starts;
1526 }
1527 
/* FIXME: Split this into two functions: one to kick a crawler for a sid, and one to
 * parse the string. LRU maintainer code is generating a string to set up a
 * sid.
 * Also only clear the crawlerstats once per sid.
 */
/* Handle the "lru_crawler crawl <classes>" request: 'slabs' is either the
 * literal "all" or a comma-separated list of slab class ids (mutated by
 * strtok_r). Returns CRAWLER_RUNNING if a crawl is already active,
 * CRAWLER_BADCLASS on a malformed or out-of-range id, CRAWLER_OK when at
 * least one crawler started, else CRAWLER_NOTSTARTED. */
enum crawler_result_type lru_crawler_crawl(char *slabs) {
    char *b = NULL;
    uint32_t sid = 0;
    int starts = 0;
    uint8_t tocrawl[MAX_NUMBER_OF_SLAB_CLASSES];
    /* trylock: refuse to queue a second crawl while one is running. */
    if (pthread_mutex_trylock(&lru_crawler_lock) != 0) {
        return CRAWLER_RUNNING;
    }

    /* FIXME: I added this while debugging. Don't think it's needed? */
    memset(tocrawl, 0, sizeof(uint8_t) * MAX_NUMBER_OF_SLAB_CLASSES);
    if (strcmp(slabs, "all") == 0) {
        for (sid = 0; sid < MAX_NUMBER_OF_SLAB_CLASSES; sid++) {
            tocrawl[sid] = 1;
        }
    } else {
        for (char *p = strtok_r(slabs, ",", &b);
             p != NULL;
             p = strtok_r(NULL, ",", &b)) {

            /* NOTE(review): this rejects sid == MAX_NUMBER_OF_SLAB_CLASSES-1,
             * which the "all" path above does include — confirm the asymmetry
             * is intentional. */
            if (!safe_strtoul(p, &sid) || sid < POWER_SMALLEST
                    || sid >= MAX_NUMBER_OF_SLAB_CLASSES-1) {
                pthread_mutex_unlock(&lru_crawler_lock);
                return CRAWLER_BADCLASS;
            }
            tocrawl[sid] = 1;
        }
    }

    for (sid = POWER_SMALLEST; sid < MAX_NUMBER_OF_SLAB_CLASSES; sid++) {
        if (tocrawl[sid])
            starts += do_lru_crawler_start(sid, settings.lru_crawler_tocrawl);
    }
    if (starts) {
        pthread_cond_signal(&lru_crawler_cond);
        pthread_mutex_unlock(&lru_crawler_lock);
        return CRAWLER_OK;
    } else {
        pthread_mutex_unlock(&lru_crawler_lock);
        return CRAWLER_NOTSTARTED;
    }
}
1575 
/* If we hold this lock, crawler can't wake up or move */
/* Park the crawler thread at its next lock acquisition; it stays blocked
 * until lru_crawler_resume() releases the lock. */
void lru_crawler_pause(void) {
    pthread_mutex_lock(&lru_crawler_lock);
}
1580 
/* Release the lock taken by lru_crawler_pause(), letting the crawler
 * thread run again. */
void lru_crawler_resume(void) {
    pthread_mutex_unlock(&lru_crawler_lock);
}
1584 
init_lru_crawler(void)1585 int init_lru_crawler(void) {
1586     if (lru_crawler_initialized == 0) {
1587         memset(&crawlerstats, 0, sizeof(crawlerstats_t) * MAX_NUMBER_OF_SLAB_CLASSES);
1588         if (pthread_cond_init(&lru_crawler_cond, NULL) != 0) {
1589             fprintf(stderr, "Can't initialize lru crawler condition\n");
1590             return -1;
1591         }
1592         pthread_mutex_init(&lru_crawler_lock, NULL);
1593         lru_crawler_initialized = 1;
1594     }
1595     return 0;
1596 }
1597