1 /* -*- Mode: C; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2 #include "memcached.h"
3 #include <sys/stat.h>
4 #include <sys/socket.h>
5 #include <sys/resource.h>
6 #include <fcntl.h>
7 #include <netinet/in.h>
8 #include <errno.h>
9 #include <stdlib.h>
10 #include <stdio.h>
11 #include <signal.h>
12 #include <string.h>
13 #include <time.h>
14 #include <assert.h>
15 #include <unistd.h>
16
/* Forward Declarations */
static void item_link_q(item *it);
static void item_unlink_q(item *it);

/* LRU sub-queue tags: encoded into bits 6-7 of an item's slabs_clsid, so
 * every slab class owns four logical LRU queues sharing one id space. */
#define HOT_LRU 0
#define WARM_LRU 64
#define COLD_LRU 128
#define NOEXP_LRU 192
static unsigned int lru_type_map[4] = {HOT_LRU, WARM_LRU, COLD_LRU, NOEXP_LRU};

/* Strip the LRU tag bits, leaving the bare slab class id. */
#define CLEAR_LRU(id) (id & ~(3<<6))

#define LARGEST_ID POWER_LARGEST
/* Per-LRU-queue counters (one per class|LRU-tag slot), each guarded by the
 * matching lru_locks[] mutex while read or written. */
typedef struct {
    uint64_t evicted;           /* live items pushed out to make room */
    uint64_t evicted_nonzero;   /* evicted items that had a nonzero exptime */
    uint64_t reclaimed;         /* expired/flushed items freed from the tail */
    uint64_t outofmemory;       /* allocations that failed even after evicting */
    uint64_t tailrepairs;       /* forced unlinks of refcount-stuck tail items */
    uint64_t expired_unfetched; /* items reclaimed but never touched */
    uint64_t evicted_unfetched; /* items evicted but never touched */
    uint64_t crawler_reclaimed; /* items freed by the LRU crawler */
    uint64_t crawler_items_checked; /* items the LRU crawler examined */
    uint64_t lrutail_reflocked; /* tail items skipped: still referenced */
    uint64_t moves_to_cold;     /* demotions from HOT/WARM into COLD */
    uint64_t moves_to_warm;     /* promotions from COLD into WARM */
    uint64_t moves_within_lru;  /* active items re-bumped inside their queue */
    uint64_t direct_reclaims;   /* lru_pull_tail rounds run from the alloc path */
    rel_time_t evicted_time;    /* idle age of the most recently evicted item */
} itemstats_t;
47
/* Per-slab-class results of an LRU crawler pass.  NOTE(review): the crawler
 * code that fills these is outside this chunk; bucket semantics of histo[]
 * (presumably remaining-TTL buckets) should be confirmed there. */
typedef struct {
    uint64_t histo[61];    /* TTL histogram buckets — see crawler code */
    uint64_t ttl_hourplus; /* items whose TTL exceeds the histogram range */
    uint64_t noexp;        /* items with no expiration time */
    uint64_t reclaimed;
    uint64_t seen;
    rel_time_t start_time;
    rel_time_t end_time;
    bool run_complete;
} crawlerstats_t;
58
/* Per-LRU queue anchors and bookkeeping, indexed by (class | LRU tag) and
 * guarded by the matching lru_locks[] entry. */
static item *heads[LARGEST_ID];
static item *tails[LARGEST_ID];
static crawler crawlers[LARGEST_ID];
static itemstats_t itemstats[LARGEST_ID];
static unsigned int sizes[LARGEST_ID];   /* item count per queue */
static uint64_t sizes_bytes[LARGEST_ID]; /* byte total per queue */
/* "stats sizes" histogram state, guarded by stats_sizes_lock. */
static unsigned int *stats_sizes_hist = NULL;
static uint64_t stats_sizes_cas_min = 0; /* items with older CAS are ignored */
static int stats_sizes_buckets = 0;
static crawlerstats_t crawlerstats[MAX_NUMBER_OF_SLAB_CLASSES];

/* Crawler / maintainer thread control state. */
static int crawler_count = 0;
static volatile int do_run_lru_crawler_thread = 0;
static volatile int do_run_lru_maintainer_thread = 0;
static int lru_crawler_initialized = 0;
static int lru_maintainer_initialized = 0;
static int lru_maintainer_check_clsid = 0;
static pthread_mutex_t lru_crawler_lock = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t lru_crawler_cond = PTHREAD_COND_INITIALIZER;
static pthread_mutex_t lru_crawler_stats_lock = PTHREAD_MUTEX_INITIALIZER;
static pthread_mutex_t lru_maintainer_lock = PTHREAD_MUTEX_INITIALIZER;
static pthread_mutex_t cas_id_lock = PTHREAD_MUTEX_INITIALIZER;
static pthread_mutex_t stats_sizes_lock = PTHREAD_MUTEX_INITIALIZER;
82
item_stats_reset(void)83 void item_stats_reset(void) {
84 int i;
85 for (i = 0; i < LARGEST_ID; i++) {
86 pthread_mutex_lock(&lru_locks[i]);
87 memset(&itemstats[i], 0, sizeof(itemstats_t));
88 pthread_mutex_unlock(&lru_locks[i]);
89 }
90 }
91
92 static int lru_pull_tail(const int orig_id, const int cur_lru,
93 const uint64_t total_bytes, const bool do_evict);
94 static int lru_crawler_start(uint32_t id, uint32_t remaining);
95
96 /* Get the next CAS id for a new item. */
97 /* TODO: refactor some atomics for this. */
get_cas_id(void)98 uint64_t get_cas_id(void) {
99 static uint64_t cas_id = 0;
100 pthread_mutex_lock(&cas_id_lock);
101 uint64_t next_id = ++cas_id;
102 pthread_mutex_unlock(&cas_id_lock);
103 return next_id;
104 }
105
/* Returns 1 if the item was invalidated by a flush_all: either it was last
 * stamped before the flush point, or its CAS predates the flush CAS floor. */
int item_is_flushed(item *it) {
    rel_time_t oldest_live = settings.oldest_live;
    if (oldest_live == 0 || oldest_live > current_time)
        return 0;
    if (it->time <= oldest_live)
        return 1;
    uint64_t oldest_cas = settings.oldest_cas;
    uint64_t cas = ITEM_get_cas(it);
    if (oldest_cas != 0 && cas != 0 && cas < oldest_cas)
        return 1;
    return 0;
}
118
noexp_lru_size(int slabs_clsid)119 static unsigned int noexp_lru_size(int slabs_clsid) {
120 int id = CLEAR_LRU(slabs_clsid);
121 id |= NOEXP_LRU;
122 unsigned int ret;
123 pthread_mutex_lock(&lru_locks[id]);
124 ret = sizes_bytes[id];
125 pthread_mutex_unlock(&lru_locks[id]);
126 return ret;
127 }
128
129 /* Enable this for reference-count debugging. */
130 #if 0
131 # define DEBUG_REFCNT(it,op) \
132 fprintf(stderr, "item %x refcnt(%c) %d %c%c%c\n", \
133 it, op, it->refcount, \
134 (it->it_flags & ITEM_LINKED) ? 'L' : ' ', \
135 (it->it_flags & ITEM_SLABBED) ? 'S' : ' ')
136 #else
137 # define DEBUG_REFCNT(it,op) while(0)
138 #endif
139
140 /**
141 * Generates the variable-sized part of the header for an object.
142 *
143 * key - The key
144 * nkey - The length of the key
145 * flags - key flags
146 * nbytes - Number of bytes to hold value and addition CRLF terminator
147 * suffix - Buffer for the "VALUE" line suffix (flags, size).
148 * nsuffix - The length of the suffix is stored here.
149 *
150 * Returns the total size of the header.
151 */
item_make_header(const uint8_t nkey,const unsigned int flags,const int nbytes,char * suffix,uint8_t * nsuffix)152 static size_t item_make_header(const uint8_t nkey, const unsigned int flags, const int nbytes,
153 char *suffix, uint8_t *nsuffix) {
154 /* suffix is defined at 40 chars elsewhere.. */
155 *nsuffix = (uint8_t) snprintf(suffix, 40, " %u %d\r\n", flags, nbytes - 2);
156 return sizeof(item) + nkey + *nsuffix + nbytes;
157 }
158
/* Allocates and initializes a fresh, unlinked item sized for the given
 * key/flags/value.  May run up to 10 rounds of LRU juggling/eviction when
 * the slab class is full.  Returns NULL on out-of-memory (after bumping
 * the outofmemory stat); a returned item has refcount 1, courtesy of
 * slabs_alloc(), and is not yet in the hash table or any LRU. */
item *do_item_alloc(char *key, const size_t nkey, const unsigned int flags,
                    const rel_time_t exptime, const int nbytes) {
    int i;
    uint8_t nsuffix;
    item *it = NULL;
    char suffix[40];
    /* nkey + 1 reserves room for the key's NUL terminator byte */
    size_t ntotal = item_make_header(nkey + 1, flags, nbytes, suffix, &nsuffix);
    if (settings.use_cas) {
        ntotal += sizeof(uint64_t);
    }

    unsigned int id = slabs_clsid(ntotal);
    if (id == 0)
        return 0;

    /* If no memory is available, attempt a direct LRU juggle/eviction */
    /* This is a race in order to simplify lru_pull_tail; in cases where
     * locked items are on the tail, you want them to fall out and cause
     * occasional OOM's, rather than internally work around them.
     * This also gives one fewer code path for slab alloc/free
     */
    /* TODO: if power_largest, try a lot more times? or a number of times
     * based on how many chunks the new object should take up?
     * or based on the size of an object lru_pull_tail() says it evicted?
     * This is a classical GC problem if "large items" are of too varying of
     * sizes. This is actually okay here since the larger the data, the more
     * bandwidth it takes, the more time we can loop in comparison to serving
     * and replacing small items.
     */
    for (i = 0; i < 10; i++) {
        uint64_t total_bytes;
        /* Try to reclaim memory first */
        if (!settings.lru_maintainer_thread) {
            lru_pull_tail(id, COLD_LRU, 0, false);
        }
        it = slabs_alloc(ntotal, id, &total_bytes, 0);

        /* NOEXP items can't be evicted, so remove their bytes from the
         * budget the LRU mover balances against. */
        if (settings.expirezero_does_not_evict)
            total_bytes -= noexp_lru_size(id);

        if (it == NULL) {
            if (settings.lru_maintainer_thread) {
                /* Shuffle HOT/WARM downward, then evict from COLD. */
                lru_pull_tail(id, HOT_LRU, total_bytes, false);
                lru_pull_tail(id, WARM_LRU, total_bytes, false);
                if (lru_pull_tail(id, COLD_LRU, total_bytes, true) <= 0)
                    break;
            } else {
                if (lru_pull_tail(id, COLD_LRU, 0, true) <= 0)
                    break;
            }
        } else {
            break;
        }
    }

    /* i counts how many reclaim rounds the allocation needed */
    if (i > 0) {
        pthread_mutex_lock(&lru_locks[id]);
        itemstats[id].direct_reclaims += i;
        pthread_mutex_unlock(&lru_locks[id]);
    }

    if (it == NULL) {
        pthread_mutex_lock(&lru_locks[id]);
        itemstats[id].outofmemory++;
        pthread_mutex_unlock(&lru_locks[id]);
        return NULL;
    }

    assert(it->slabs_clsid == 0);
    //assert(it != heads[id]);

    /* Refcount is seeded to 1 by slabs_alloc() */
    it->next = it->prev = 0;

    /* Items are initially loaded into the HOT_LRU. This is '0' but I want at
     * least a note here. Compiler (hopefully?) optimizes this out.
     */
    if (settings.lru_maintainer_thread) {
        if (exptime == 0 && settings.expirezero_does_not_evict) {
            id |= NOEXP_LRU;
        } else {
            id |= HOT_LRU;
        }
    } else {
        /* There is only COLD in compat-mode */
        id |= COLD_LRU;
    }
    it->slabs_clsid = id;

    DEBUG_REFCNT(it, '*');
    it->it_flags |= settings.use_cas ? ITEM_CAS : 0;
    it->nkey = nkey;
    it->nbytes = nbytes;
    memcpy(ITEM_key(it), key, nkey);
    it->exptime = exptime;
    memcpy(ITEM_suffix(it), suffix, (size_t)nsuffix);
    it->nsuffix = nsuffix;

    /* Need to shuffle the pointer stored in h_next into it->data. */
    if (it->it_flags & ITEM_CHUNKED) {
        item_chunk *chunk = (item_chunk *) ITEM_data(it);

        chunk->next = (item_chunk *) it->h_next;
        chunk->prev = 0;
        chunk->head = it;
        /* Need to chain back into the head's chunk */
        chunk->next->prev = chunk;
        chunk->size = chunk->next->size - ((char *)chunk - (char *)it);
        chunk->used = 0;
        assert(chunk->size > 0);
    }
    it->h_next = 0;

    return it;
}
274
/* Hands a fully-released item (refcount 0, unlinked) back to its slab. */
void item_free(item *it) {
    size_t ntotal = ITEM_ntotal(it);
    assert((it->it_flags & ITEM_LINKED) == 0);
    assert(it != heads[it->slabs_clsid]);
    assert(it != tails[it->slabs_clsid]);
    assert(it->refcount == 0);

    /* so slab size changer can tell later if item is already free or not */
    unsigned int clsid = ITEM_clsid(it);
    DEBUG_REFCNT(it, 'F');
    slabs_free(it, ntotal, clsid);
}
288
289 /**
290 * Returns true if an item will fit in the cache (its size does not exceed
291 * the maximum for a cache entry.)
292 */
item_size_ok(const size_t nkey,const int flags,const int nbytes)293 bool item_size_ok(const size_t nkey, const int flags, const int nbytes) {
294 char prefix[40];
295 uint8_t nsuffix;
296
297 size_t ntotal = item_make_header(nkey + 1, flags, nbytes,
298 prefix, &nsuffix);
299 if (settings.use_cas) {
300 ntotal += sizeof(uint64_t);
301 }
302
303 return slabs_clsid(ntotal) != 0;
304 }
305
/* Push an item onto the front of its LRU queue; caller holds the LRU lock. */
static void do_item_link_q(item *it) { /* item is the new head */
    assert((it->it_flags & ITEM_SLABBED) == 0);

    item **lru_head = &heads[it->slabs_clsid];
    item **lru_tail = &tails[it->slabs_clsid];
    assert(it != *lru_head);
    assert((*lru_head && *lru_tail) || (*lru_head == 0 && *lru_tail == 0));

    it->prev = 0;
    it->next = *lru_head;
    if (it->next) it->next->prev = it;
    *lru_head = it;
    if (*lru_tail == 0) *lru_tail = it;
    sizes[it->slabs_clsid]++;
    sizes_bytes[it->slabs_clsid] += it->nbytes;
}
323
/* Locking wrapper: link into the LRU under the queue's mutex. */
static void item_link_q(item *it) {
    pthread_mutex_t *lock = &lru_locks[it->slabs_clsid];
    pthread_mutex_lock(lock);
    do_item_link_q(it);
    pthread_mutex_unlock(lock);
}
329
/* Detach an item from its LRU queue; caller holds the LRU lock. */
static void do_item_unlink_q(item *it) {
    item **lru_head = &heads[it->slabs_clsid];
    item **lru_tail = &tails[it->slabs_clsid];

    /* Fix the anchors first, then splice the neighbors together. */
    if (*lru_head == it) {
        assert(it->prev == 0);
        *lru_head = it->next;
    }
    if (*lru_tail == it) {
        assert(it->next == 0);
        *lru_tail = it->prev;
    }
    assert(it->next != it);
    assert(it->prev != it);

    if (it->next) it->next->prev = it->prev;
    if (it->prev) it->prev->next = it->next;
    sizes[it->slabs_clsid]--;
    sizes_bytes[it->slabs_clsid] -= it->nbytes;
}
352
/* Locking wrapper: unlink from the LRU under the queue's mutex. */
static void item_unlink_q(item *it) {
    pthread_mutex_t *lock = &lru_locks[it->slabs_clsid];
    pthread_mutex_lock(lock);
    do_item_unlink_q(it);
    pthread_mutex_unlock(lock);
}
358
/* Links an item into the hash table and its LRU queue: stamps the access
 * time, updates global byte/item counters, assigns a fresh CAS id, and
 * takes a reference on behalf of the hash table.  Caller must hold the
 * item lock for hv.  Always returns 1. */
int do_item_link(item *it, const uint32_t hv) {
    MEMCACHED_ITEM_LINK(ITEM_key(it), it->nkey, it->nbytes);
    assert((it->it_flags & (ITEM_LINKED|ITEM_SLABBED)) == 0);
    it->it_flags |= ITEM_LINKED;
    it->time = current_time;

    STATS_LOCK();
    stats_state.curr_bytes += ITEM_ntotal(it);
    stats_state.curr_items += 1;
    stats.total_items += 1;
    STATS_UNLOCK();

    /* Allocate a new CAS ID on link. */
    ITEM_set_cas(it, (settings.use_cas) ? get_cas_id() : 0);
    assoc_insert(it, hv);
    item_link_q(it);
    refcount_incr(&it->refcount);
    item_stats_sizes_add(it);

    return 1;
}
380
/* Removes an item from the hash table and LRU (if currently linked),
 * reverses the link-time stats, and drops the hash table's reference via
 * do_item_remove().  Caller must hold the item lock for hv.  Safe to call
 * on an already-unlinked item (no-op). */
void do_item_unlink(item *it, const uint32_t hv) {
    MEMCACHED_ITEM_UNLINK(ITEM_key(it), it->nkey, it->nbytes);
    if ((it->it_flags & ITEM_LINKED) != 0) {
        it->it_flags &= ~ITEM_LINKED;
        STATS_LOCK();
        stats_state.curr_bytes -= ITEM_ntotal(it);
        stats_state.curr_items -= 1;
        STATS_UNLOCK();
        item_stats_sizes_remove(it);
        assoc_delete(ITEM_key(it), it->nkey, hv);
        item_unlink_q(it);
        do_item_remove(it);
    }
}
395
/* FIXME: Is it necessary to keep this copy/pasted code? */
/* Same as do_item_unlink(), except it calls do_item_unlink_q() directly
 * instead of the locking wrapper — for callers that already hold the
 * item's LRU lock (e.g. the tail-pull path). */
void do_item_unlink_nolock(item *it, const uint32_t hv) {
    MEMCACHED_ITEM_UNLINK(ITEM_key(it), it->nkey, it->nbytes);
    if ((it->it_flags & ITEM_LINKED) != 0) {
        it->it_flags &= ~ITEM_LINKED;
        STATS_LOCK();
        stats_state.curr_bytes -= ITEM_ntotal(it);
        stats_state.curr_items -= 1;
        STATS_UNLOCK();
        item_stats_sizes_remove(it);
        assoc_delete(ITEM_key(it), it->nkey, hv);
        do_item_unlink_q(it);
        do_item_remove(it);
    }
}
411
/* Drop one reference; frees the item when the count reaches zero. */
void do_item_remove(item *it) {
    MEMCACHED_ITEM_REMOVE(ITEM_key(it), it->nkey, it->nbytes);
    assert((it->it_flags & ITEM_SLABBED) == 0);
    assert(it->refcount > 0);

    if (refcount_decr(&it->refcount) == 0)
        item_free(it);
}
421
422 /* Copy/paste to avoid adding two extra branches for all common calls, since
423 * _nolock is only used in an uncommon case where we want to relink. */
do_item_update_nolock(item * it)424 void do_item_update_nolock(item *it) {
425 MEMCACHED_ITEM_UPDATE(ITEM_key(it), it->nkey, it->nbytes);
426 if (it->time < current_time - ITEM_UPDATE_INTERVAL) {
427 assert((it->it_flags & ITEM_SLABBED) == 0);
428
429 if ((it->it_flags & ITEM_LINKED) != 0) {
430 do_item_unlink_q(it);
431 it->time = current_time;
432 do_item_link_q(it);
433 }
434 }
435 }
436
437 /* Bump the last accessed time, or relink if we're in compat mode */
do_item_update(item * it)438 void do_item_update(item *it) {
439 MEMCACHED_ITEM_UPDATE(ITEM_key(it), it->nkey, it->nbytes);
440 if (it->time < current_time - ITEM_UPDATE_INTERVAL) {
441 assert((it->it_flags & ITEM_SLABBED) == 0);
442
443 if ((it->it_flags & ITEM_LINKED) != 0) {
444 it->time = current_time;
445 if (!settings.lru_maintainer_thread) {
446 item_unlink_q(it);
447 item_link_q(it);
448 }
449 }
450 }
451 }
452
/* Atomically (under the caller's item lock for hv) swap old item out of
 * the hash/LRU and link new_it in its place.  Returns do_item_link()'s
 * result (always 1). */
int do_item_replace(item *it, item *new_it, const uint32_t hv) {
    MEMCACHED_ITEM_REPLACE(ITEM_key(it), it->nkey, it->nbytes,
                           ITEM_key(new_it), new_it->nkey, new_it->nbytes);
    assert((it->it_flags & ITEM_SLABBED) == 0);
    do_item_unlink(it, hv);
    return do_item_link(new_it, hv);
}
461
462 /*@null@*/
463 /* This is walking the line of violating lock order, but I think it's safe.
464 * If the LRU lock is held, an item in the LRU cannot be wiped and freed.
465 * The data could possibly be overwritten, but this is only accessing the
466 * headers.
467 * It may not be the best idea to leave it like this, but for now it's safe.
468 * FIXME: only dumps the hot LRU with the new LRU's.
469 */
item_cachedump(const unsigned int slabs_clsid,const unsigned int limit,unsigned int * bytes)470 char *item_cachedump(const unsigned int slabs_clsid, const unsigned int limit, unsigned int *bytes) {
471 unsigned int memlimit = 2 * 1024 * 1024; /* 2MB max response size */
472 char *buffer;
473 unsigned int bufcurr;
474 item *it;
475 unsigned int len;
476 unsigned int shown = 0;
477 char key_temp[KEY_MAX_LENGTH + 1];
478 char temp[512];
479 unsigned int id = slabs_clsid;
480 if (!settings.lru_maintainer_thread)
481 id |= COLD_LRU;
482
483 pthread_mutex_lock(&lru_locks[id]);
484 it = heads[id];
485
486 buffer = malloc((size_t)memlimit);
487 if (buffer == 0) {
488 return NULL;
489 }
490 bufcurr = 0;
491
492 while (it != NULL && (limit == 0 || shown < limit)) {
493 assert(it->nkey <= KEY_MAX_LENGTH);
494 if (it->nbytes == 0 && it->nkey == 0) {
495 it = it->next;
496 continue;
497 }
498 /* Copy the key since it may not be null-terminated in the struct */
499 strncpy(key_temp, ITEM_key(it), it->nkey);
500 key_temp[it->nkey] = 0x00; /* terminate */
501 len = snprintf(temp, sizeof(temp), "ITEM %s [%d b; %lu s]\r\n",
502 key_temp, it->nbytes - 2,
503 it->exptime == 0 ? 0 :
504 (unsigned long)it->exptime + process_started);
505 if (bufcurr + len + 6 > memlimit) /* 6 is END\r\n\0 */
506 break;
507 memcpy(buffer + bufcurr, temp, len);
508 bufcurr += len;
509 shown++;
510 it = it->next;
511 }
512
513 memcpy(buffer + bufcurr, "END\r\n", 6);
514 bufcurr += 5;
515
516 *bytes = bufcurr;
517 pthread_mutex_unlock(&lru_locks[id]);
518 return buffer;
519 }
520
/* Sums the per-queue itemstats across every slab class and all four LRU
 * sub-queues, then emits them as the global "stats" counters.  The LRU
 * movement counters only appear when the maintainer thread is enabled. */
void item_stats_totals(ADD_STAT add_stats, void *c) {
    itemstats_t totals;
    memset(&totals, 0, sizeof(itemstats_t));
    int n;
    for (n = 0; n < MAX_NUMBER_OF_SLAB_CLASSES; n++) {
        int x;
        int i;
        for (x = 0; x < 4; x++) {
            /* i = slab class n with the HOT/WARM/COLD/NOEXP tag applied */
            i = n | lru_type_map[x];
            pthread_mutex_lock(&lru_locks[i]);
            totals.expired_unfetched += itemstats[i].expired_unfetched;
            totals.evicted_unfetched += itemstats[i].evicted_unfetched;
            totals.evicted += itemstats[i].evicted;
            totals.reclaimed += itemstats[i].reclaimed;
            totals.crawler_reclaimed += itemstats[i].crawler_reclaimed;
            totals.crawler_items_checked += itemstats[i].crawler_items_checked;
            totals.lrutail_reflocked += itemstats[i].lrutail_reflocked;
            totals.moves_to_cold += itemstats[i].moves_to_cold;
            totals.moves_to_warm += itemstats[i].moves_to_warm;
            totals.moves_within_lru += itemstats[i].moves_within_lru;
            totals.direct_reclaims += itemstats[i].direct_reclaims;
            pthread_mutex_unlock(&lru_locks[i]);
        }
    }
    APPEND_STAT("expired_unfetched", "%llu",
                (unsigned long long)totals.expired_unfetched);
    APPEND_STAT("evicted_unfetched", "%llu",
                (unsigned long long)totals.evicted_unfetched);
    APPEND_STAT("evictions", "%llu",
                (unsigned long long)totals.evicted);
    APPEND_STAT("reclaimed", "%llu",
                (unsigned long long)totals.reclaimed);
    APPEND_STAT("crawler_reclaimed", "%llu",
                (unsigned long long)totals.crawler_reclaimed);
    APPEND_STAT("crawler_items_checked", "%llu",
                (unsigned long long)totals.crawler_items_checked);
    APPEND_STAT("lrutail_reflocked", "%llu",
                (unsigned long long)totals.lrutail_reflocked);
    if (settings.lru_maintainer_thread) {
        APPEND_STAT("moves_to_cold", "%llu",
                    (unsigned long long)totals.moves_to_cold);
        APPEND_STAT("moves_to_warm", "%llu",
                    (unsigned long long)totals.moves_to_warm);
        APPEND_STAT("moves_within_lru", "%llu",
                    (unsigned long long)totals.moves_within_lru);
        APPEND_STAT("direct_reclaims", "%llu",
                    (unsigned long long)totals.direct_reclaims);
    }
}
570
/* Emits per-slab-class item statistics ("stats items"): queue counts,
 * COLD-tail age, and summed itemstats over the class's four LRU queues.
 * Classes holding no items are skipped entirely. */
void item_stats(ADD_STAT add_stats, void *c) {
    itemstats_t totals;
    int n;
    for (n = 0; n < MAX_NUMBER_OF_SLAB_CLASSES; n++) {
        memset(&totals, 0, sizeof(itemstats_t));
        int x;
        int i;
        unsigned int size = 0;
        unsigned int age = 0;
        unsigned int lru_size_map[4]; /* item counts: HOT, WARM, COLD, NOEXP */
        const char *fmt = "items:%d:%s";
        char key_str[STAT_KEY_LEN];
        char val_str[STAT_VAL_LEN];
        int klen = 0, vlen = 0;
        for (x = 0; x < 4; x++) {
            /* i = slab class n with the HOT/WARM/COLD/NOEXP tag applied */
            i = n | lru_type_map[x];
            pthread_mutex_lock(&lru_locks[i]);
            totals.evicted += itemstats[i].evicted;
            totals.evicted_nonzero += itemstats[i].evicted_nonzero;
            totals.outofmemory += itemstats[i].outofmemory;
            totals.tailrepairs += itemstats[i].tailrepairs;
            totals.reclaimed += itemstats[i].reclaimed;
            totals.expired_unfetched += itemstats[i].expired_unfetched;
            totals.evicted_unfetched += itemstats[i].evicted_unfetched;
            totals.crawler_reclaimed += itemstats[i].crawler_reclaimed;
            totals.crawler_items_checked += itemstats[i].crawler_items_checked;
            totals.lrutail_reflocked += itemstats[i].lrutail_reflocked;
            totals.moves_to_cold += itemstats[i].moves_to_cold;
            totals.moves_to_warm += itemstats[i].moves_to_warm;
            totals.moves_within_lru += itemstats[i].moves_within_lru;
            totals.direct_reclaims += itemstats[i].direct_reclaims;
            size += sizes[i];
            lru_size_map[x] = sizes[i];
            /* age reported from the COLD queue's tail item only */
            if (lru_type_map[x] == COLD_LRU && tails[i] != NULL)
                age = current_time - tails[i]->time;
            pthread_mutex_unlock(&lru_locks[i]);
        }
        if (size == 0)
            continue;
        APPEND_NUM_FMT_STAT(fmt, n, "number", "%u", size);
        if (settings.lru_maintainer_thread) {
            APPEND_NUM_FMT_STAT(fmt, n, "number_hot", "%u", lru_size_map[0]);
            APPEND_NUM_FMT_STAT(fmt, n, "number_warm", "%u", lru_size_map[1]);
            APPEND_NUM_FMT_STAT(fmt, n, "number_cold", "%u", lru_size_map[2]);
            if (settings.expirezero_does_not_evict) {
                APPEND_NUM_FMT_STAT(fmt, n, "number_noexp", "%u", lru_size_map[3]);
            }
        }
        APPEND_NUM_FMT_STAT(fmt, n, "age", "%u", age);
        APPEND_NUM_FMT_STAT(fmt, n, "evicted",
                            "%llu", (unsigned long long)totals.evicted);
        APPEND_NUM_FMT_STAT(fmt, n, "evicted_nonzero",
                            "%llu", (unsigned long long)totals.evicted_nonzero);
        APPEND_NUM_FMT_STAT(fmt, n, "evicted_time",
                            "%u", totals.evicted_time);
        APPEND_NUM_FMT_STAT(fmt, n, "outofmemory",
                            "%llu", (unsigned long long)totals.outofmemory);
        APPEND_NUM_FMT_STAT(fmt, n, "tailrepairs",
                            "%llu", (unsigned long long)totals.tailrepairs);
        APPEND_NUM_FMT_STAT(fmt, n, "reclaimed",
                            "%llu", (unsigned long long)totals.reclaimed);
        APPEND_NUM_FMT_STAT(fmt, n, "expired_unfetched",
                            "%llu", (unsigned long long)totals.expired_unfetched);
        APPEND_NUM_FMT_STAT(fmt, n, "evicted_unfetched",
                            "%llu", (unsigned long long)totals.evicted_unfetched);
        APPEND_NUM_FMT_STAT(fmt, n, "crawler_reclaimed",
                            "%llu", (unsigned long long)totals.crawler_reclaimed);
        APPEND_NUM_FMT_STAT(fmt, n, "crawler_items_checked",
                            "%llu", (unsigned long long)totals.crawler_items_checked);
        APPEND_NUM_FMT_STAT(fmt, n, "lrutail_reflocked",
                            "%llu", (unsigned long long)totals.lrutail_reflocked);
        if (settings.lru_maintainer_thread) {
            APPEND_NUM_FMT_STAT(fmt, n, "moves_to_cold",
                                "%llu", (unsigned long long)totals.moves_to_cold);
            APPEND_NUM_FMT_STAT(fmt, n, "moves_to_warm",
                                "%llu", (unsigned long long)totals.moves_to_warm);
            APPEND_NUM_FMT_STAT(fmt, n, "moves_within_lru",
                                "%llu", (unsigned long long)totals.moves_within_lru);
            APPEND_NUM_FMT_STAT(fmt, n, "direct_reclaims",
                                "%llu", (unsigned long long)totals.direct_reclaims);
        }
    }

    /* getting here means both ascii and binary terminators fit */
    add_stats(NULL, 0, NULL, 0, c);
}
657
item_stats_sizes_status(void)658 bool item_stats_sizes_status(void) {
659 bool ret = false;
660 mutex_lock(&stats_sizes_lock);
661 if (stats_sizes_hist != NULL)
662 ret = true;
663 mutex_unlock(&stats_sizes_lock);
664 return ret;
665 }
666
item_stats_sizes_init(void)667 void item_stats_sizes_init(void) {
668 if (stats_sizes_hist != NULL)
669 return;
670 stats_sizes_buckets = settings.item_size_max / 32 + 1;
671 stats_sizes_hist = calloc(stats_sizes_buckets, sizeof(int));
672 stats_sizes_cas_min = (settings.use_cas) ? get_cas_id() : 0;
673 }
674
/* Turn on size-histogram collection, reporting the result as stats.
 * Requires CAS support; reports an error if disabled or out of memory. */
void item_stats_sizes_enable(ADD_STAT add_stats, void *c) {
    mutex_lock(&stats_sizes_lock);
    if (!settings.use_cas) {
        APPEND_STAT("sizes_status", "error", "");
        APPEND_STAT("sizes_error", "cas_support_disabled", "");
        mutex_unlock(&stats_sizes_lock);
        return;
    }
    if (stats_sizes_hist == NULL)
        item_stats_sizes_init();
    if (stats_sizes_hist != NULL) {
        APPEND_STAT("sizes_status", "enabled", "");
    } else {
        APPEND_STAT("sizes_status", "error", "");
        APPEND_STAT("sizes_error", "no_memory", "");
    }
    mutex_unlock(&stats_sizes_lock);
}
693
/* Turn off size-histogram collection and release the histogram memory. */
void item_stats_sizes_disable(ADD_STAT add_stats, void *c) {
    mutex_lock(&stats_sizes_lock);
    free(stats_sizes_hist); /* free(NULL) is a harmless no-op */
    stats_sizes_hist = NULL;
    APPEND_STAT("sizes_status", "disabled", "");
    mutex_unlock(&stats_sizes_lock);
}
703
/* Count a newly linked item in the size histogram.  Items from before the
 * histogram was enabled (CAS below the floor) are skipped. */
void item_stats_sizes_add(item *it) {
    if (stats_sizes_hist == NULL || stats_sizes_cas_min > ITEM_get_cas(it))
        return;
    int ntotal = ITEM_ntotal(it);
    /* round up to the next 32-byte bucket */
    int bucket = ntotal / 32 + ((ntotal % 32) ? 1 : 0);
    if (bucket < stats_sizes_buckets)
        stats_sizes_hist[bucket]++;
}
712
713 /* I think there's no way for this to be accurate without using the CAS value.
714 * Since items getting their time value bumped will pass this validation.
715 */
item_stats_sizes_remove(item * it)716 void item_stats_sizes_remove(item *it) {
717 if (stats_sizes_hist == NULL || stats_sizes_cas_min > ITEM_get_cas(it))
718 return;
719 int ntotal = ITEM_ntotal(it);
720 int bucket = ntotal / 32;
721 if ((ntotal % 32) != 0) bucket++;
722 if (bucket < stats_sizes_buckets) stats_sizes_hist[bucket]--;
723 }
724
725 /** dumps out a list of objects of each size, with granularity of 32 bytes */
726 /*@null@*/
727 /* Locks are correct based on a technicality. Holds LRU lock while doing the
728 * work, so items can't go invalid, and it's only looking at header sizes
729 * which don't change.
730 */
item_stats_sizes(ADD_STAT add_stats,void * c)731 void item_stats_sizes(ADD_STAT add_stats, void *c) {
732 mutex_lock(&stats_sizes_lock);
733
734 if (stats_sizes_hist != NULL) {
735 int i;
736 for (i = 0; i < stats_sizes_buckets; i++) {
737 if (stats_sizes_hist[i] != 0) {
738 char key[8];
739 snprintf(key, sizeof(key), "%d", i * 32);
740 APPEND_STAT(key, "%u", stats_sizes_hist[i]);
741 }
742 }
743 } else {
744 APPEND_STAT("sizes_status", "disabled", "");
745 }
746
747 add_stats(NULL, 0, NULL, 0, c);
748 mutex_unlock(&stats_sizes_lock);
749 }
750
/** wrapper around assoc_find which does the lazy expiration logic */
/* On a hit, takes a reference the caller must release with do_item_remove()
 * (or item_remove).  Items invalidated by flush_all or past their exptime
 * are lazily unlinked here and reported as a miss (NULL).
 * was_found codes for the fetch log: 0=miss, 1=hit, 2=flushed, 3=expired. */
item *do_item_get(const char *key, const size_t nkey, const uint32_t hv, conn *c) {
    item *it = assoc_find(key, nkey, hv);
    if (it != NULL) {
        refcount_incr(&it->refcount);
        /* Optimization for slab reassignment. prevents popular items from
         * jamming in busy wait. Can only do this here to satisfy lock order
         * of item_lock, slabs_lock. */
        /* This was made unsafe by removal of the cache_lock:
         * slab_rebalance_signal and slab_rebal.* are modified in a separate
         * thread under slabs_lock. If slab_rebalance_signal = 1, slab_start =
         * NULL (0), but slab_end is still equal to some value, this would end
         * up unlinking every item fetched.
         * This is either an acceptable loss, or if slab_rebalance_signal is
         * true, slab_start/slab_end should be put behind the slabs_lock.
         * Which would cause a huge potential slowdown.
         * Could also use a specific lock for slab_rebal.* and
         * slab_rebalance_signal (shorter lock?)
         */
        /*if (slab_rebalance_signal &&
            ((void *)it >= slab_rebal.slab_start && (void *)it < slab_rebal.slab_end)) {
            do_item_unlink(it, hv);
            do_item_remove(it);
            it = NULL;
        }*/
    }
    int was_found = 0;

    if (settings.verbose > 2) {
        int ii;
        if (it == NULL) {
            fprintf(stderr, "> NOT FOUND ");
        } else {
            fprintf(stderr, "> FOUND KEY ");
        }
        for (ii = 0; ii < nkey; ++ii) {
            fprintf(stderr, "%c", key[ii]);
        }
    }

    if (it != NULL) {
        was_found = 1;
        if (item_is_flushed(it)) {
            /* Item predates the last flush_all: lazily remove it. */
            do_item_unlink(it, hv);
            do_item_remove(it);
            it = NULL;
            pthread_mutex_lock(&c->thread->stats.mutex);
            c->thread->stats.get_flushed++;
            pthread_mutex_unlock(&c->thread->stats.mutex);
            if (settings.verbose > 2) {
                fprintf(stderr, " -nuked by flush");
            }
            was_found = 2;
        } else if (it->exptime != 0 && it->exptime <= current_time) {
            /* TTL has passed: lazily expire it. */
            do_item_unlink(it, hv);
            do_item_remove(it);
            it = NULL;
            pthread_mutex_lock(&c->thread->stats.mutex);
            c->thread->stats.get_expired++;
            pthread_mutex_unlock(&c->thread->stats.mutex);
            if (settings.verbose > 2) {
                fprintf(stderr, " -nuked by expire");
            }
            was_found = 3;
        } else {
            /* Live hit: mark fetched/active for the LRU mover. */
            it->it_flags |= ITEM_FETCHED|ITEM_ACTIVE;
            DEBUG_REFCNT(it, '+');
        }
    }

    if (settings.verbose > 2)
        fprintf(stderr, "\n");
    /* For now this is in addition to the above verbose logging. */
    LOGGER_LOG(c->thread->l, LOG_FETCHERS, LOGGER_ITEM_GET, NULL, was_found, key, nkey);

    return it;
}
828
do_item_touch(const char * key,size_t nkey,uint32_t exptime,const uint32_t hv,conn * c)829 item *do_item_touch(const char *key, size_t nkey, uint32_t exptime,
830 const uint32_t hv, conn *c) {
831 item *it = do_item_get(key, nkey, hv, c);
832 if (it != NULL) {
833 it->exptime = exptime;
834 }
835 return it;
836 }
837
838 /*** LRU MAINTENANCE THREAD ***/
839
840 /* Returns number of items remove, expired, or evicted.
841 * Callable from worker threads or the LRU maintainer thread */
lru_pull_tail(const int orig_id,const int cur_lru,const uint64_t total_bytes,const bool do_evict)842 static int lru_pull_tail(const int orig_id, const int cur_lru,
843 const uint64_t total_bytes, const bool do_evict) {
844 item *it = NULL;
845 int id = orig_id;
846 int removed = 0;
847 if (id == 0)
848 return 0;
849
850 int tries = 5;
851 item *search;
852 item *next_it;
853 void *hold_lock = NULL;
854 unsigned int move_to_lru = 0;
855 uint64_t limit = 0;
856
857 id |= cur_lru;
858 pthread_mutex_lock(&lru_locks[id]);
859 search = tails[id];
860 /* We walk up *only* for locked items, and if bottom is expired. */
861 for (; tries > 0 && search != NULL; tries--, search=next_it) {
862 /* we might relink search mid-loop, so search->prev isn't reliable */
863 next_it = search->prev;
864 if (search->nbytes == 0 && search->nkey == 0 && search->it_flags == 1) {
865 /* We are a crawler, ignore it. */
866 tries++;
867 continue;
868 }
869 uint32_t hv = hash(ITEM_key(search), search->nkey);
870 /* Attempt to hash item lock the "search" item. If locked, no
871 * other callers can incr the refcount. Also skip ourselves. */
872 if ((hold_lock = item_trylock(hv)) == NULL)
873 continue;
874 /* Now see if the item is refcount locked */
875 if (refcount_incr(&search->refcount) != 2) {
876 /* Note pathological case with ref'ed items in tail.
877 * Can still unlink the item, but it won't be reusable yet */
878 itemstats[id].lrutail_reflocked++;
879 /* In case of refcount leaks, enable for quick workaround. */
880 /* WARNING: This can cause terrible corruption */
881 if (settings.tail_repair_time &&
882 search->time + settings.tail_repair_time < current_time) {
883 itemstats[id].tailrepairs++;
884 search->refcount = 1;
885 /* This will call item_remove -> item_free since refcnt is 1 */
886 do_item_unlink_nolock(search, hv);
887 item_trylock_unlock(hold_lock);
888 continue;
889 }
890 }
891
892 /* Expired or flushed */
893 if ((search->exptime != 0 && search->exptime < current_time)
894 || item_is_flushed(search)) {
895 itemstats[id].reclaimed++;
896 if ((search->it_flags & ITEM_FETCHED) == 0) {
897 itemstats[id].expired_unfetched++;
898 }
899 /* refcnt 2 -> 1 */
900 do_item_unlink_nolock(search, hv);
901 /* refcnt 1 -> 0 -> item_free */
902 do_item_remove(search);
903 item_trylock_unlock(hold_lock);
904 removed++;
905
906 /* If all we're finding are expired, can keep going */
907 continue;
908 }
909
910 /* If we're HOT_LRU or WARM_LRU and over size limit, send to COLD_LRU.
911 * If we're COLD_LRU, send to WARM_LRU unless we need to evict
912 */
913 switch (cur_lru) {
914 case HOT_LRU:
915 limit = total_bytes * settings.hot_lru_pct / 100;
916 case WARM_LRU:
917 if (limit == 0)
918 limit = total_bytes * settings.warm_lru_pct / 100;
919 if (sizes_bytes[id] > limit) {
920 itemstats[id].moves_to_cold++;
921 move_to_lru = COLD_LRU;
922 do_item_unlink_q(search);
923 it = search;
924 removed++;
925 break;
926 } else if ((search->it_flags & ITEM_ACTIVE) != 0) {
927 /* Only allow ACTIVE relinking if we're not too large. */
928 itemstats[id].moves_within_lru++;
929 search->it_flags &= ~ITEM_ACTIVE;
930 do_item_update_nolock(search);
931 do_item_remove(search);
932 item_trylock_unlock(hold_lock);
933 } else {
934 /* Don't want to move to COLD, not active, bail out */
935 it = search;
936 }
937 break;
938 case COLD_LRU:
939 it = search; /* No matter what, we're stopping */
940 if (do_evict) {
941 if (settings.evict_to_free == 0) {
942 /* Don't think we need a counter for this. It'll OOM. */
943 break;
944 }
945 itemstats[id].evicted++;
946 itemstats[id].evicted_time = current_time - search->time;
947 if (search->exptime != 0)
948 itemstats[id].evicted_nonzero++;
949 if ((search->it_flags & ITEM_FETCHED) == 0) {
950 itemstats[id].evicted_unfetched++;
951 }
952 LOGGER_LOG(NULL, LOG_EVICTIONS, LOGGER_EVICTION, search);
953 do_item_unlink_nolock(search, hv);
954 removed++;
955 if (settings.slab_automove == 2) {
956 slabs_reassign(-1, orig_id);
957 }
958 } else if ((search->it_flags & ITEM_ACTIVE) != 0
959 && settings.lru_maintainer_thread) {
960 itemstats[id].moves_to_warm++;
961 search->it_flags &= ~ITEM_ACTIVE;
962 move_to_lru = WARM_LRU;
963 do_item_unlink_q(search);
964 removed++;
965 }
966 break;
967 }
968 if (it != NULL)
969 break;
970 }
971
972 pthread_mutex_unlock(&lru_locks[id]);
973
974 if (it != NULL) {
975 if (move_to_lru) {
976 it->slabs_clsid = ITEM_clsid(it);
977 it->slabs_clsid |= move_to_lru;
978 item_link_q(it);
979 }
980 do_item_remove(it);
981 item_trylock_unlock(hold_lock);
982 }
983
984 return removed;
985 }
986
987 /* Loop up to N times:
988 * If too many items are in HOT_LRU, push to COLD_LRU
989 * If too many items are in WARM_LRU, push to COLD_LRU
990 * If too many items are in COLD_LRU, poke COLD_LRU tail
991 * 1000 loops with 1ms min sleep gives us under 1m items shifted/sec. The
992 * locks can't handle much more than that. Leaving a TODO for how to
993 * autoadjust in the future.
994 */
lru_maintainer_juggle(const int slabs_clsid)995 static int lru_maintainer_juggle(const int slabs_clsid) {
996 int i;
997 int did_moves = 0;
998 bool mem_limit_reached = false;
999 uint64_t total_bytes = 0;
1000 unsigned int chunks_perslab = 0;
1001 unsigned int chunks_free = 0;
1002 /* TODO: if free_chunks below high watermark, increase aggressiveness */
1003 chunks_free = slabs_available_chunks(slabs_clsid, &mem_limit_reached,
1004 &total_bytes, &chunks_perslab);
1005 if (settings.expirezero_does_not_evict)
1006 total_bytes -= noexp_lru_size(slabs_clsid);
1007
1008 /* If slab automove is enabled on any level, and we have more than 2 pages
1009 * worth of chunks free in this class, ask (gently) to reassign a page
1010 * from this class back into the global pool (0)
1011 */
1012 if (settings.slab_automove > 0 && chunks_free > (chunks_perslab * 2.5)) {
1013 slabs_reassign(slabs_clsid, SLAB_GLOBAL_PAGE_POOL);
1014 }
1015
1016 /* Juggle HOT/WARM up to N times */
1017 for (i = 0; i < 1000; i++) {
1018 int do_more = 0;
1019 if (lru_pull_tail(slabs_clsid, HOT_LRU, total_bytes, false) ||
1020 lru_pull_tail(slabs_clsid, WARM_LRU, total_bytes, false)) {
1021 do_more++;
1022 }
1023 do_more += lru_pull_tail(slabs_clsid, COLD_LRU, total_bytes, false);
1024 if (do_more == 0)
1025 break;
1026 did_moves++;
1027 }
1028 return did_moves;
1029 }
1030
1031 /* Will crawl all slab classes a minimum of once per hour */
1032 #define MAX_MAINTCRAWL_WAIT 60 * 60
1033
1034 /* Hoping user input will improve this function. This is all a wild guess.
1035 * Operation: Kicks crawler for each slab id. Crawlers take some statistics as
1036 * to items with nonzero expirations. It then buckets how many items will
1037 * expire per minute for the next hour.
1038 * This function checks the results of a run, and if it things more than 1% of
1039 * expirable objects are ready to go, kick the crawler again to reap.
1040 * It will also kick the crawler once per minute regardless, waiting a minute
1041 * longer for each time it has no work to do, up to an hour wait time.
1042 * The latter is to avoid newly started daemons from waiting too long before
1043 * retrying a crawl.
1044 */
lru_maintainer_crawler_check(void)1045 static void lru_maintainer_crawler_check(void) {
1046 int i;
1047 static rel_time_t last_crawls[MAX_NUMBER_OF_SLAB_CLASSES];
1048 static rel_time_t next_crawl_wait[MAX_NUMBER_OF_SLAB_CLASSES];
1049 for (i = POWER_SMALLEST; i < MAX_NUMBER_OF_SLAB_CLASSES; i++) {
1050 crawlerstats_t *s = &crawlerstats[i];
1051 /* We've not successfully kicked off a crawl yet. */
1052 if (last_crawls[i] == 0) {
1053 if (lru_crawler_start(i, 0) > 0) {
1054 last_crawls[i] = current_time;
1055 }
1056 }
1057 pthread_mutex_lock(&lru_crawler_stats_lock);
1058 if (s->run_complete) {
1059 int x;
1060 /* Should we crawl again? */
1061 uint64_t possible_reclaims = s->seen - s->noexp;
1062 uint64_t available_reclaims = 0;
1063 /* Need to think we can free at least 1% of the items before
1064 * crawling. */
1065 /* FIXME: Configurable? */
1066 uint64_t low_watermark = (s->seen / 100) + 1;
1067 rel_time_t since_run = current_time - s->end_time;
1068 /* Don't bother if the payoff is too low. */
1069 if (settings.verbose > 1)
1070 fprintf(stderr, "maint crawler: low_watermark: %llu, possible_reclaims: %llu, since_run: %u\n",
1071 (unsigned long long)low_watermark, (unsigned long long)possible_reclaims,
1072 (unsigned int)since_run);
1073 for (x = 0; x < 60; x++) {
1074 if (since_run < (x * 60) + 60)
1075 break;
1076 available_reclaims += s->histo[x];
1077 }
1078 if (available_reclaims > low_watermark) {
1079 last_crawls[i] = 0;
1080 if (next_crawl_wait[i] > 60)
1081 next_crawl_wait[i] -= 60;
1082 } else if (since_run > 5 && since_run > next_crawl_wait[i]) {
1083 last_crawls[i] = 0;
1084 if (next_crawl_wait[i] < MAX_MAINTCRAWL_WAIT)
1085 next_crawl_wait[i] += 60;
1086 }
1087 if (settings.verbose > 1)
1088 fprintf(stderr, "maint crawler: available reclaims: %llu, next_crawl: %u\n", (unsigned long long)available_reclaims, next_crawl_wait[i]);
1089 }
1090 pthread_mutex_unlock(&lru_crawler_stats_lock);
1091 }
1092 }
1093
1094 static pthread_t lru_maintainer_tid;
1095
1096 #define MAX_LRU_MAINTAINER_SLEEP 1000000
1097 #define MIN_LRU_MAINTAINER_SLEEP 1000
1098
/* Background thread: periodically juggles every slab class's LRUs.
 * Sleeps adaptively between MIN/MAX_LRU_MAINTAINER_SLEEP microseconds:
 * idle passes lengthen the sleep, busy passes halve it. Holds
 * lru_maintainer_lock except while sleeping, so lru_maintainer_pause()
 * can block the loop. Exits when do_run_lru_maintainer_thread clears. */
static void *lru_maintainer_thread(void *arg) {
    int i;
    useconds_t to_sleep = MIN_LRU_MAINTAINER_SLEEP;
    rel_time_t last_crawler_check = 0;

    pthread_mutex_lock(&lru_maintainer_lock);
    if (settings.verbose > 2)
        fprintf(stderr, "Starting LRU maintainer background thread\n");
    while (do_run_lru_maintainer_thread) {
        int did_moves = 0;
        /* Drop the lock only for the sleep so pause/stop can interleave. */
        pthread_mutex_unlock(&lru_maintainer_lock);
        usleep(to_sleep);
        pthread_mutex_lock(&lru_maintainer_lock);

        STATS_LOCK();
        stats.lru_maintainer_juggles++;
        STATS_UNLOCK();
        /* We were asked to immediately wake up and poke a particular slab
         * class due to a low watermark being hit */
        if (lru_maintainer_check_clsid != 0) {
            did_moves = lru_maintainer_juggle(lru_maintainer_check_clsid);
            lru_maintainer_check_clsid = 0;
        } else {
            for (i = POWER_SMALLEST; i < MAX_NUMBER_OF_SLAB_CLASSES; i++) {
                did_moves += lru_maintainer_juggle(i);
            }
        }
        /* Adaptive pacing: back off when idle, speed up when moving items. */
        if (did_moves == 0) {
            if (to_sleep < MAX_LRU_MAINTAINER_SLEEP)
                to_sleep += 1000;
        } else {
            to_sleep /= 2;
            if (to_sleep < MIN_LRU_MAINTAINER_SLEEP)
                to_sleep = MIN_LRU_MAINTAINER_SLEEP;
        }
        /* Once per second at most */
        if (settings.lru_crawler && last_crawler_check != current_time) {
            lru_maintainer_crawler_check();
            last_crawler_check = current_time;
        }
    }
    pthread_mutex_unlock(&lru_maintainer_lock);
    if (settings.verbose > 2)
        fprintf(stderr, "LRU maintainer thread stopping\n");

    return NULL;
}
stop_lru_maintainer_thread(void)1146 int stop_lru_maintainer_thread(void) {
1147 int ret;
1148 pthread_mutex_lock(&lru_maintainer_lock);
1149 /* LRU thread is a sleep loop, will die on its own */
1150 do_run_lru_maintainer_thread = 0;
1151 pthread_mutex_unlock(&lru_maintainer_lock);
1152 if ((ret = pthread_join(lru_maintainer_tid, NULL)) != 0) {
1153 fprintf(stderr, "Failed to stop LRU maintainer thread: %s\n", strerror(ret));
1154 return -1;
1155 }
1156 settings.lru_maintainer_thread = false;
1157 return 0;
1158 }
1159
start_lru_maintainer_thread(void)1160 int start_lru_maintainer_thread(void) {
1161 int ret;
1162
1163 pthread_mutex_lock(&lru_maintainer_lock);
1164 do_run_lru_maintainer_thread = 1;
1165 settings.lru_maintainer_thread = true;
1166 if ((ret = pthread_create(&lru_maintainer_tid, NULL,
1167 lru_maintainer_thread, NULL)) != 0) {
1168 fprintf(stderr, "Can't create LRU maintainer thread: %s\n",
1169 strerror(ret));
1170 pthread_mutex_unlock(&lru_maintainer_lock);
1171 return -1;
1172 }
1173 pthread_mutex_unlock(&lru_maintainer_lock);
1174
1175 return 0;
1176 }
1177
/* If we hold this lock, crawler can't wake up or move */
/* Block the LRU maintainer loop by taking its lock; the thread only
 * releases it while sleeping, so this waits for a safe pause point. */
void lru_maintainer_pause(void) {
    pthread_mutex_lock(&lru_maintainer_lock);
}
1182
/* Release the lock taken by lru_maintainer_pause(), letting the
 * maintainer thread run again. */
void lru_maintainer_resume(void) {
    pthread_mutex_unlock(&lru_maintainer_lock);
}
1186
init_lru_maintainer(void)1187 int init_lru_maintainer(void) {
1188 if (lru_maintainer_initialized == 0) {
1189 pthread_mutex_init(&lru_maintainer_lock, NULL);
1190 lru_maintainer_initialized = 1;
1191 }
1192 return 0;
1193 }
1194
1195 /*** ITEM CRAWLER THREAD ***/
1196
static void crawler_link_q(item *it) { /* item is the new tail */
    /* Append the crawler pseudo-item behind the current tail of its LRU.
     * Callers only link into non-empty lists (tail must exist). */
    item **head = &heads[it->slabs_clsid];
    item **tail = &tails[it->slabs_clsid];
    assert(it->it_flags == 1);
    assert(it->nbytes == 0);
    assert(*tail != 0);
    assert(it != *tail);
    assert((*head && *tail) || (*head == 0 && *tail == 0));

    it->next = 0;
    it->prev = *tail;
    if (it->prev) {
        assert(it->prev->next == 0);
        it->prev->next = it;
    }
    *tail = it;
    if (*head == 0) *head = it;
}
1217
/* Remove the crawler pseudo-item from its LRU list, fixing up head/tail
 * and neighbor pointers. */
static void crawler_unlink_q(item *it) {
    item **head = &heads[it->slabs_clsid];
    item **tail = &tails[it->slabs_clsid];

    if (it == *head) {
        assert(it->prev == 0);
        *head = it->next;
    }
    if (it == *tail) {
        assert(it->next == 0);
        *tail = it->prev;
    }
    assert(it->next != it);
    assert(it->prev != it);

    if (it->next != NULL)
        it->next->prev = it->prev;
    if (it->prev != NULL)
        it->prev->next = it->next;
}
1238
/* This is too convoluted, but it's a difficult shuffle. Try to rewrite it
 * more clearly. */
/* Advance the crawler pseudo-item one step toward the head of its LRU by
 * swapping it with its prev neighbor. Returns the item now in front of the
 * crawler (its new next, i.e. the old prev), or NULL once the crawler has
 * reached the head and has been popped off the list. Caller must hold the
 * lru_lock for this list. */
static item *crawler_crawl_q(item *it) {
    item **head, **tail;
    assert(it->it_flags == 1);
    assert(it->nbytes == 0);
    head = &heads[it->slabs_clsid];
    tail = &tails[it->slabs_clsid];

    /* We've hit the head, pop off */
    if (it->prev == 0) {
        assert(*head == it);
        if (it->next) {
            *head = it->next;
            assert(it->next->prev == it);
            it->next->prev = 0;
        }
        return NULL; /* Done */
    }

    /* Swing ourselves in front of the next item */
    /* NB: If there is a prev, we can't be the head */
    assert(it->prev != it);
    if (it->prev) {
        if (*head == it->prev) {
            /* Prev was the head, now we're the head */
            *head = it;
        }
        if (*tail == it) {
            /* We are the tail, now they are the tail */
            *tail = it->prev;
        }
        assert(it->next != it);
        if (it->next) {
            /* Splice prev directly to our old next. */
            assert(it->prev->next == it);
            it->prev->next = it->next;
            it->next->prev = it->prev;
        } else {
            /* Tail. Move this above? */
            it->prev->next = 0;
        }
        /* prev->prev's next is it->prev */
        /* Now swap positions: old prev becomes our next. Order matters —
         * it->prev must be read before it->next->prev is rewritten. */
        it->next = it->prev;
        it->prev = it->next->prev;
        it->next->prev = it;
        /* New it->prev now, if we're not at the head. */
        if (it->prev) {
            it->prev->next = it;
        }
    }
    assert(it->next != it);
    assert(it->prev != it);

    return it->next; /* success */
}
1294
1295 /* I pulled this out to make the main thread clearer, but it reaches into the
1296 * main thread's values too much. Should rethink again.
1297 */
item_crawler_evaluate(item * search,uint32_t hv,int i)1298 static void item_crawler_evaluate(item *search, uint32_t hv, int i) {
1299 int slab_id = CLEAR_LRU(i);
1300 crawlerstats_t *s = &crawlerstats[slab_id];
1301 itemstats[i].crawler_items_checked++;
1302 int is_flushed = item_is_flushed(search);
1303 if ((search->exptime != 0 && search->exptime < current_time)
1304 || is_flushed) {
1305 itemstats[i].crawler_reclaimed++;
1306 s->reclaimed++;
1307
1308 if (settings.verbose > 1) {
1309 int ii;
1310 char *key = ITEM_key(search);
1311 fprintf(stderr, "LRU crawler found an expired item (flags: %d, slab: %d): ",
1312 search->it_flags, search->slabs_clsid);
1313 for (ii = 0; ii < search->nkey; ++ii) {
1314 fprintf(stderr, "%c", key[ii]);
1315 }
1316 fprintf(stderr, "\n");
1317 }
1318 if ((search->it_flags & ITEM_FETCHED) == 0 && !is_flushed) {
1319 itemstats[i].expired_unfetched++;
1320 }
1321 do_item_unlink_nolock(search, hv);
1322 do_item_remove(search);
1323 assert(search->slabs_clsid == 0);
1324 } else {
1325 s->seen++;
1326 refcount_decr(&search->refcount);
1327 if (search->exptime == 0) {
1328 s->noexp++;
1329 } else if (search->exptime - current_time > 3599) {
1330 s->ttl_hourplus++;
1331 } else {
1332 rel_time_t ttl_remain = search->exptime - current_time;
1333 int bucket = ttl_remain / 60;
1334 s->histo[bucket]++;
1335 }
1336 }
1337 }
1338
item_crawler_thread(void * arg)1339 static void *item_crawler_thread(void *arg) {
1340 int i;
1341 int crawls_persleep = settings.crawls_persleep;
1342
1343 pthread_mutex_lock(&lru_crawler_lock);
1344 pthread_cond_signal(&lru_crawler_cond);
1345 settings.lru_crawler = true;
1346 if (settings.verbose > 2)
1347 fprintf(stderr, "Starting LRU crawler background thread\n");
1348 while (do_run_lru_crawler_thread) {
1349 pthread_cond_wait(&lru_crawler_cond, &lru_crawler_lock);
1350
1351 while (crawler_count) {
1352 item *search = NULL;
1353 void *hold_lock = NULL;
1354
1355 for (i = POWER_SMALLEST; i < LARGEST_ID; i++) {
1356 if (crawlers[i].it_flags != 1) {
1357 continue;
1358 }
1359 pthread_mutex_lock(&lru_locks[i]);
1360 search = crawler_crawl_q((item *)&crawlers[i]);
1361 if (search == NULL ||
1362 (crawlers[i].remaining && --crawlers[i].remaining < 1)) {
1363 if (settings.verbose > 2)
1364 fprintf(stderr, "Nothing left to crawl for %d\n", i);
1365 crawlers[i].it_flags = 0;
1366 crawler_count--;
1367 crawler_unlink_q((item *)&crawlers[i]);
1368 pthread_mutex_unlock(&lru_locks[i]);
1369 pthread_mutex_lock(&lru_crawler_stats_lock);
1370 crawlerstats[CLEAR_LRU(i)].end_time = current_time;
1371 crawlerstats[CLEAR_LRU(i)].run_complete = true;
1372 pthread_mutex_unlock(&lru_crawler_stats_lock);
1373 continue;
1374 }
1375 uint32_t hv = hash(ITEM_key(search), search->nkey);
1376 /* Attempt to hash item lock the "search" item. If locked, no
1377 * other callers can incr the refcount
1378 */
1379 if ((hold_lock = item_trylock(hv)) == NULL) {
1380 pthread_mutex_unlock(&lru_locks[i]);
1381 continue;
1382 }
1383 /* Now see if the item is refcount locked */
1384 if (refcount_incr(&search->refcount) != 2) {
1385 refcount_decr(&search->refcount);
1386 if (hold_lock)
1387 item_trylock_unlock(hold_lock);
1388 pthread_mutex_unlock(&lru_locks[i]);
1389 continue;
1390 }
1391
1392 /* Frees the item or decrements the refcount. */
1393 /* Interface for this could improve: do the free/decr here
1394 * instead? */
1395 pthread_mutex_lock(&lru_crawler_stats_lock);
1396 item_crawler_evaluate(search, hv, i);
1397 pthread_mutex_unlock(&lru_crawler_stats_lock);
1398
1399 if (hold_lock)
1400 item_trylock_unlock(hold_lock);
1401 pthread_mutex_unlock(&lru_locks[i]);
1402
1403 if (crawls_persleep <= 0 && settings.lru_crawler_sleep) {
1404 usleep(settings.lru_crawler_sleep);
1405 crawls_persleep = settings.crawls_persleep;
1406 }
1407 }
1408 }
1409 if (settings.verbose > 2)
1410 fprintf(stderr, "LRU crawler thread sleeping\n");
1411 STATS_LOCK();
1412 stats_state.lru_crawler_running = false;
1413 STATS_UNLOCK();
1414 }
1415 pthread_mutex_unlock(&lru_crawler_lock);
1416 if (settings.verbose > 2)
1417 fprintf(stderr, "LRU crawler thread stopping\n");
1418
1419 return NULL;
1420 }
1421
1422 static pthread_t item_crawler_tid;
1423
stop_item_crawler_thread(void)1424 int stop_item_crawler_thread(void) {
1425 int ret;
1426 pthread_mutex_lock(&lru_crawler_lock);
1427 do_run_lru_crawler_thread = 0;
1428 pthread_cond_signal(&lru_crawler_cond);
1429 pthread_mutex_unlock(&lru_crawler_lock);
1430 if ((ret = pthread_join(item_crawler_tid, NULL)) != 0) {
1431 fprintf(stderr, "Failed to stop LRU crawler thread: %s\n", strerror(ret));
1432 return -1;
1433 }
1434 settings.lru_crawler = false;
1435 return 0;
1436 }
1437
1438 /* Lock dance to "block" until thread is waiting on its condition:
1439 * caller locks mtx. caller spawns thread.
1440 * thread blocks on mutex.
1441 * caller waits on condition, releases lock.
1442 * thread gets lock, sends signal.
1443 * caller can't wait, as thread has lock.
1444 * thread waits on condition, releases lock
1445 * caller wakes on condition, gets lock.
1446 * caller immediately releases lock.
1447 * thread is now safely waiting on condition before the caller returns.
1448 */
start_item_crawler_thread(void)1449 int start_item_crawler_thread(void) {
1450 int ret;
1451
1452 if (settings.lru_crawler)
1453 return -1;
1454 pthread_mutex_lock(&lru_crawler_lock);
1455 do_run_lru_crawler_thread = 1;
1456 if ((ret = pthread_create(&item_crawler_tid, NULL,
1457 item_crawler_thread, NULL)) != 0) {
1458 fprintf(stderr, "Can't create LRU crawler thread: %s\n",
1459 strerror(ret));
1460 pthread_mutex_unlock(&lru_crawler_lock);
1461 return -1;
1462 }
1463 /* Avoid returning until the crawler has actually started */
1464 pthread_cond_wait(&lru_crawler_cond, &lru_crawler_lock);
1465 pthread_mutex_unlock(&lru_crawler_lock);
1466
1467 return 0;
1468 }
1469
1470 /* 'remaining' is passed in so the LRU maintainer thread can scrub the whole
1471 * LRU every time.
1472 */
do_lru_crawler_start(uint32_t id,uint32_t remaining)1473 static int do_lru_crawler_start(uint32_t id, uint32_t remaining) {
1474 int i;
1475 uint32_t sid;
1476 uint32_t tocrawl[3];
1477 int starts = 0;
1478 tocrawl[0] = id | HOT_LRU;
1479 tocrawl[1] = id | WARM_LRU;
1480 tocrawl[2] = id | COLD_LRU;
1481
1482 for (i = 0; i < 3; i++) {
1483 sid = tocrawl[i];
1484 pthread_mutex_lock(&lru_locks[sid]);
1485 if (tails[sid] != NULL) {
1486 if (settings.verbose > 2)
1487 fprintf(stderr, "Kicking LRU crawler off for LRU %u\n", sid);
1488 crawlers[sid].nbytes = 0;
1489 crawlers[sid].nkey = 0;
1490 crawlers[sid].it_flags = 1; /* For a crawler, this means enabled. */
1491 crawlers[sid].next = 0;
1492 crawlers[sid].prev = 0;
1493 crawlers[sid].time = 0;
1494 crawlers[sid].remaining = remaining;
1495 crawlers[sid].slabs_clsid = sid;
1496 crawler_link_q((item *)&crawlers[sid]);
1497 crawler_count++;
1498 starts++;
1499 }
1500 pthread_mutex_unlock(&lru_locks[sid]);
1501 }
1502 if (starts) {
1503 STATS_LOCK();
1504 stats_state.lru_crawler_running = true;
1505 stats.lru_crawler_starts++;
1506 STATS_UNLOCK();
1507 pthread_mutex_lock(&lru_crawler_stats_lock);
1508 memset(&crawlerstats[id], 0, sizeof(crawlerstats_t));
1509 crawlerstats[id].start_time = current_time;
1510 pthread_mutex_unlock(&lru_crawler_stats_lock);
1511 }
1512 return starts;
1513 }
1514
lru_crawler_start(uint32_t id,uint32_t remaining)1515 static int lru_crawler_start(uint32_t id, uint32_t remaining) {
1516 int starts;
1517 if (pthread_mutex_trylock(&lru_crawler_lock) != 0) {
1518 return 0;
1519 }
1520 starts = do_lru_crawler_start(id, remaining);
1521 if (starts) {
1522 pthread_cond_signal(&lru_crawler_cond);
1523 }
1524 pthread_mutex_unlock(&lru_crawler_lock);
1525 return starts;
1526 }
1527
1528 /* FIXME: Split this into two functions: one to kick a crawler for a sid, and one to
1529 * parse the string. LRU maintainer code is generating a string to set up a
1530 * sid.
1531 * Also only clear the crawlerstats once per sid.
1532 */
lru_crawler_crawl(char * slabs)1533 enum crawler_result_type lru_crawler_crawl(char *slabs) {
1534 char *b = NULL;
1535 uint32_t sid = 0;
1536 int starts = 0;
1537 uint8_t tocrawl[MAX_NUMBER_OF_SLAB_CLASSES];
1538 if (pthread_mutex_trylock(&lru_crawler_lock) != 0) {
1539 return CRAWLER_RUNNING;
1540 }
1541
1542 /* FIXME: I added this while debugging. Don't think it's needed? */
1543 memset(tocrawl, 0, sizeof(uint8_t) * MAX_NUMBER_OF_SLAB_CLASSES);
1544 if (strcmp(slabs, "all") == 0) {
1545 for (sid = 0; sid < MAX_NUMBER_OF_SLAB_CLASSES; sid++) {
1546 tocrawl[sid] = 1;
1547 }
1548 } else {
1549 for (char *p = strtok_r(slabs, ",", &b);
1550 p != NULL;
1551 p = strtok_r(NULL, ",", &b)) {
1552
1553 if (!safe_strtoul(p, &sid) || sid < POWER_SMALLEST
1554 || sid >= MAX_NUMBER_OF_SLAB_CLASSES-1) {
1555 pthread_mutex_unlock(&lru_crawler_lock);
1556 return CRAWLER_BADCLASS;
1557 }
1558 tocrawl[sid] = 1;
1559 }
1560 }
1561
1562 for (sid = POWER_SMALLEST; sid < MAX_NUMBER_OF_SLAB_CLASSES; sid++) {
1563 if (tocrawl[sid])
1564 starts += do_lru_crawler_start(sid, settings.lru_crawler_tocrawl);
1565 }
1566 if (starts) {
1567 pthread_cond_signal(&lru_crawler_cond);
1568 pthread_mutex_unlock(&lru_crawler_lock);
1569 return CRAWLER_OK;
1570 } else {
1571 pthread_mutex_unlock(&lru_crawler_lock);
1572 return CRAWLER_NOTSTARTED;
1573 }
1574 }
1575
/* If we hold this lock, crawler can't wake up or move */
/* Block the crawler thread by taking its lock; waits until the crawler
 * reaches a safe pause point (its cond wait). */
void lru_crawler_pause(void) {
    pthread_mutex_lock(&lru_crawler_lock);
}
1580
/* Release the lock taken by lru_crawler_pause(), letting the crawler
 * thread run again. */
void lru_crawler_resume(void) {
    pthread_mutex_unlock(&lru_crawler_lock);
}
1584
init_lru_crawler(void)1585 int init_lru_crawler(void) {
1586 if (lru_crawler_initialized == 0) {
1587 memset(&crawlerstats, 0, sizeof(crawlerstats_t) * MAX_NUMBER_OF_SLAB_CLASSES);
1588 if (pthread_cond_init(&lru_crawler_cond, NULL) != 0) {
1589 fprintf(stderr, "Can't initialize lru crawler condition\n");
1590 return -1;
1591 }
1592 pthread_mutex_init(&lru_crawler_lock, NULL);
1593 lru_crawler_initialized = 1;
1594 }
1595 return 0;
1596 }
1597