1 /* Copyright 2016 Netflix.
2 *
3 * Use and distribution licensed under the BSD license. See
4 * the LICENSE file for full text.
5 */
6
7 /* -*- Mode: C; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
#include "memcached.h"
#include "storage.h"
#include <sys/stat.h>
#include <sys/socket.h>
#include <sys/resource.h>
#include <fcntl.h>
#include <netinet/in.h>
#include <errno.h>
#include <limits.h>
#include <stdlib.h>
#include <stdio.h>
#include <signal.h>
#include <string.h>
#include <time.h>
#include <assert.h>
#include <unistd.h>
#include <poll.h>

#include "base64.h"
26
27 #define LARGEST_ID POWER_LARGEST
28
29 typedef struct {
30 void *c; /* original connection structure. still with source thread attached. */
31 int sfd; /* client fd. */
32 int buflen;
33 int bufused;
34 char *buf; /* output buffer */
35 } crawler_client_t;
36
37 typedef struct _crawler_module_t crawler_module_t;
38
39 typedef void (*crawler_eval_func)(crawler_module_t *cm, item *it, uint32_t hv, int slab_cls);
40 typedef int (*crawler_init_func)(crawler_module_t *cm, void *data); // TODO: init args?
41 typedef void (*crawler_deinit_func)(crawler_module_t *cm); // TODO: extra args?
42 typedef void (*crawler_doneclass_func)(crawler_module_t *cm, int slab_cls);
43 typedef void (*crawler_finalize_func)(crawler_module_t *cm);
44
45 typedef struct {
46 crawler_init_func init; /* run before crawl starts */
47 crawler_eval_func eval; /* runs on an item. */
48 crawler_doneclass_func doneclass; /* runs once per sub-crawler completion. */
49 crawler_finalize_func finalize; /* runs once when all sub-crawlers are done. */
50 bool needs_lock; /* whether or not we need the LRU lock held when eval is called */
51 bool needs_client; /* whether or not to grab onto the remote client */
52 } crawler_module_reg_t;
53
54 struct _crawler_module_t {
55 void *data; /* opaque data pointer */
56 crawler_client_t c;
57 crawler_module_reg_t *mod;
58 int status; /* flags/code/etc for internal module usage */
59 };
60
61 static int crawler_expired_init(crawler_module_t *cm, void *data);
62 static void crawler_expired_doneclass(crawler_module_t *cm, int slab_cls);
63 static void crawler_expired_finalize(crawler_module_t *cm);
64 static void crawler_expired_eval(crawler_module_t *cm, item *search, uint32_t hv, int i);
65
66 crawler_module_reg_t crawler_expired_mod = {
67 .init = crawler_expired_init,
68 .eval = crawler_expired_eval,
69 .doneclass = crawler_expired_doneclass,
70 .finalize = crawler_expired_finalize,
71 .needs_lock = true,
72 .needs_client = false,
73 };
74
75 static int crawler_metadump_init(crawler_module_t *cm, void *data);
76 static void crawler_metadump_eval(crawler_module_t *cm, item *search, uint32_t hv, int i);
77 static void crawler_metadump_finalize(crawler_module_t *cm);
78
79 crawler_module_reg_t crawler_metadump_mod = {
80 .init = crawler_metadump_init,
81 .eval = crawler_metadump_eval,
82 .doneclass = NULL,
83 .finalize = crawler_metadump_finalize,
84 .needs_lock = false,
85 .needs_client = true,
86 };
87
88 static int crawler_mgdump_init(crawler_module_t *cm, void *data);
89 static void crawler_mgdump_eval(crawler_module_t *cm, item *search, uint32_t hv, int i);
90 static void crawler_mgdump_finalize(crawler_module_t *cm);
91
92 crawler_module_reg_t crawler_mgdump_mod = {
93 .init = crawler_mgdump_init,
94 .eval = crawler_mgdump_eval,
95 .doneclass = NULL,
96 .finalize = crawler_mgdump_finalize,
97 .needs_lock = false,
98 .needs_client = true,
99 };
100
101 crawler_module_reg_t *crawler_mod_regs[4] = {
102 &crawler_expired_mod,
103 &crawler_expired_mod,
104 &crawler_metadump_mod,
105 &crawler_mgdump_mod,
106 };
107
108 static int lru_crawler_write(crawler_client_t *c);
109 crawler_module_t active_crawler_mod;
110 enum crawler_run_type active_crawler_type;
111
112 static crawler crawlers[LARGEST_ID];
113
114 static int crawler_count = 0;
115 static volatile int do_run_lru_crawler_thread = 0;
116 static int lru_crawler_initialized = 0;
117 static pthread_mutex_t lru_crawler_lock = PTHREAD_MUTEX_INITIALIZER;
118 static pthread_cond_t lru_crawler_cond = PTHREAD_COND_INITIALIZER;
119 #ifdef EXTSTORE
120 /* TODO: pass this around */
121 static void *storage;
122 #endif
123
124 /* Will crawl all slab classes a minimum of once per hour */
125 #define MAX_MAINTCRAWL_WAIT 60 * 60
126
127 /*** LRU CRAWLER THREAD ***/
128
129 #define LRU_CRAWLER_MINBUFSPACE 8192
130
/* Closes the client connection (via sidethread_conn_close) and frees its
 * output buffer, leaving both pointers NULL. */
static void lru_crawler_close_client(crawler_client_t *c) {
    void *conn_handle = c->c;

    sidethread_conn_close(conn_handle);
    c->c = NULL;

    free(c->buf);
    c->buf = NULL;
}
138
/* Hands the still-open connection back via redispatch_conn (presumably to
 * its source worker thread) and frees the crawler's output buffer, leaving
 * both pointers NULL. */
static void lru_crawler_release_client(crawler_client_t *c) {
    void *conn_handle = c->c;

    redispatch_conn(conn_handle);
    c->c = NULL;

    free(c->buf);
    c->buf = NULL;
}
146
/* Doubles the client's output buffer.
 * Returns 0 on success. Returns -1 when the doubled size would not fit in an
 * int or realloc fails; buf and buflen are then left unchanged, so buflen
 * still describes the real allocation. */
static int lru_crawler_expand_buf(crawler_client_t *c) {
    if (c->buflen <= 0 || c->buflen > INT_MAX / 2) {
        return -1;
    }
    int newlen = c->buflen * 2;
    char *nb = realloc(c->buf, (size_t)newlen);
    if (nb == NULL) {
        // Old buffer is still valid and still buflen bytes long.
        return -1;
    }
    c->buf = nb;
    c->buflen = newlen;
    return 0;
}
156
/* Prepares the expired-item module for a new crawl.
 * If data is non-NULL it is a caller-supplied crawler_expired_data: it is
 * marked is_external and used as-is (its lock is assumed to be initialized
 * already -- NOTE(review): confirm with callers). Otherwise a zeroed instance
 * is allocated here, its lock initialized and start_time stamped; that copy
 * is freed by crawler_expired_finalize(). Either way every crawlerstats slot
 * is reset under d->lock.
 * Returns 0, or -1 if the allocation fails. */
static int crawler_expired_init(crawler_module_t *cm, void *data) {
    struct crawler_expired_data *d = data;

    if (d != NULL) {
        d->is_external = true;
    } else {
        d = calloc(1, sizeof(*d));
        if (d == NULL) {
            return -1;
        }
        pthread_mutex_init(&d->lock, NULL);
        d->is_external = false;
        d->start_time = current_time;
    }
    cm->data = d;

    pthread_mutex_lock(&d->lock);
    memset(&d->crawlerstats, 0, sizeof(crawlerstats_t) * POWER_LARGEST);
    for (int slot = 0; slot < POWER_LARGEST; slot++) {
        crawlerstats_t *st = &d->crawlerstats[slot];
        st->start_time = current_time;
        st->run_complete = false;
    }
    pthread_mutex_unlock(&d->lock);
    return 0;
}
185
/* Marks LRU slab_cls as finished: stamps its end_time and sets run_complete,
 * under d->lock. */
static void crawler_expired_doneclass(crawler_module_t *cm, int slab_cls) {
    struct crawler_expired_data *d = cm->data;
    crawlerstats_t *st = &d->crawlerstats[slab_cls];

    pthread_mutex_lock(&d->lock);
    st->end_time = current_time;
    st->run_complete = true;
    pthread_mutex_unlock(&d->lock);
}
193
/* Marks the whole crawl complete (end_time, crawl_complete) under d->lock.
 * Data allocated by crawler_expired_init() (is_external == false) is then
 * torn down: its mutex is destroyed, the memory freed and cm->data cleared,
 * so no dangling pointer remains on the module. Caller-owned data is left
 * alone. */
static void crawler_expired_finalize(crawler_module_t *cm) {
    struct crawler_expired_data *d = (struct crawler_expired_data *) cm->data;
    pthread_mutex_lock(&d->lock);
    d->end_time = current_time;
    d->crawl_complete = true;
    pthread_mutex_unlock(&d->lock);

    if (!d->is_external) {
        // This lock was created by pthread_mutex_init() in init; destroy it
        // before the memory holding it goes away.
        pthread_mutex_destroy(&d->lock);
        free(d);
        cm->data = NULL;
    }
}
205
206 /* I pulled this out to make the main thread clearer, but it reaches into the
207 * main thread's values too much. Should rethink again.
208 */
crawler_expired_eval(crawler_module_t * cm,item * search,uint32_t hv,int i)209 static void crawler_expired_eval(crawler_module_t *cm, item *search, uint32_t hv, int i) {
210 struct crawler_expired_data *d = (struct crawler_expired_data *) cm->data;
211 pthread_mutex_lock(&d->lock);
212 crawlerstats_t *s = &d->crawlerstats[i];
213 int is_flushed = item_is_flushed(search);
214 #ifdef EXTSTORE
215 bool is_valid = true;
216 if (search->it_flags & ITEM_HDR) {
217 is_valid = storage_validate_item(storage, search);
218 }
219 #endif
220 if ((search->exptime != 0 && search->exptime < current_time)
221 || is_flushed
222 #ifdef EXTSTORE
223 || !is_valid
224 #endif
225 ) {
226 crawlers[i].reclaimed++;
227 s->reclaimed++;
228
229 if (settings.verbose > 1) {
230 int ii;
231 char *key = ITEM_key(search);
232 fprintf(stderr, "LRU crawler found an expired item (flags: %d, slab: %d): ",
233 search->it_flags, search->slabs_clsid);
234 for (ii = 0; ii < search->nkey; ++ii) {
235 fprintf(stderr, "%c", key[ii]);
236 }
237 fprintf(stderr, "\n");
238 }
239 if ((search->it_flags & ITEM_FETCHED) == 0 && !is_flushed) {
240 crawlers[i].unfetched++;
241 }
242 #ifdef EXTSTORE
243 STORAGE_delete(storage, search);
244 #endif
245 do_item_unlink_nolock(search, hv);
246 do_item_remove(search);
247 } else {
248 s->seen++;
249 refcount_decr(search);
250 if (search->exptime == 0) {
251 s->noexp++;
252 } else if (search->exptime - current_time > 3599) {
253 s->ttl_hourplus++;
254 } else {
255 rel_time_t ttl_remain = search->exptime - current_time;
256 int bucket = ttl_remain / 60;
257 if (bucket <= 60) {
258 s->histo[bucket]++;
259 }
260 }
261 }
262 pthread_mutex_unlock(&d->lock);
263 }
264
/* Clears the module status before a metadump run; data is not used. */
static int crawler_metadump_init(crawler_module_t *cm, void *data) {
    (void)data;
    cm->status = 0;
    return 0;
}
269
/* Appends one "key=... exp=... la=... cas=... fetch=... cls=... size=...
 * flags=...\n" line for it to the client buffer. Expired or flushed items
 * produce nothing. In every case the reference the crawler holds on it is
 * dropped before returning.
 *
 * The write is bounded by the space actually left in the buffer, and
 * truncation is tested against that same bound. A line that does not fit is
 * dropped instead of advancing bufused past what snprintf really wrote. */
static void crawler_metadump_eval(crawler_module_t *cm, item *it, uint32_t hv, int i) {
    char keybuf[KEY_MAX_URI_ENCODED_LENGTH];
    int is_flushed = item_is_flushed(it);
    /* Ignore expired content. */
    if ((it->exptime != 0 && it->exptime < current_time)
        || is_flushed) {
        refcount_decr(it);
        return;
    }
    client_flags_t flags;
    FLAGS_CONV(it, flags);
    // TODO: uriencode directly into the buffer.
    uriencode(ITEM_key(it), keybuf, it->nkey, KEY_MAX_URI_ENCODED_LENGTH);

    int total = -1;
    size_t avail = 0;
    if (cm->c.buflen > cm->c.bufused) {
        avail = (size_t)(cm->c.buflen - cm->c.bufused);
        total = snprintf(cm->c.buf + cm->c.bufused, avail,
                "key=%s exp=%ld la=%llu cas=%llu fetch=%s cls=%u size=%lu flags=%llu\n",
                keybuf,
                (it->exptime == 0) ? -1 : (long)(it->exptime + process_started),
                (unsigned long long)(it->time + process_started),
                (unsigned long long)ITEM_get_cas(it),
                (it->it_flags & ITEM_FETCHED) ? "yes" : "no",
                ITEM_clsid(it),
                (unsigned long) ITEM_ntotal(it),
                (unsigned long long)flags);
    }
    refcount_decr(it);
    // TODO: some way of tracking the errors. Given the space the callers
    // reserve, these should not happen.
    if (total <= 0 || (size_t)total >= avail) {
        // Encoding error or truncated line: don't push it.
        return;
    }
    cm->c.bufused += total;
}
302
/* Flushes pending dump output, then queues the terminator. The terminator is
 * "END\r\n" normally, or "ERROR locked try again later\r\n" when cm->status
 * is nonzero (set when the hash walk could not get an iterator). The crawler
 * thread sends the queued bytes afterwards.
 *
 * Does nothing if no client is attached or the flush failed (which closes
 * the client). A flush that timed out returns 0 but leaves unsent bytes at
 * the front of the buffer. The terminator is therefore appended after them
 * rather than copied over offset 0. */
static void crawler_metadump_finalize(crawler_module_t *cm) {
    crawler_client_t *c = &cm->c;

    if (c->c == NULL) {
        return;
    }
    // flush any pending data.
    if (lru_crawler_write(c) != 0) {
        return;
    }
    // Only nonzero status right now means we were locked
    const char *msg = (cm->status != 0) ? "ERROR locked try again later\r\n"
                                        : "END\r\n";
    int len = (int) strlen(msg);
    if (c->buflen - c->bufused < len && lru_crawler_expand_buf(c) != 0) {
        return;
    }
    memcpy(c->buf + c->bufused, msg, (size_t)len);
    c->bufused += len;
}
320
/* Clears the module status before an mgdump run; data is not used. */
static int crawler_mgdump_init(crawler_module_t *cm, void *data) {
    (void)data;
    cm->status = 0;
    return 0;
}
325
/* Appends an "mg <key>\r\n" line for it to the client buffer. Binary keys
 * are base64-encoded and followed by " b". Expired or flushed items produce
 * nothing. Either way the crawler's reference on it is dropped.
 * Relies on the caller having left LRU_CRAWLER_MINBUFSPACE bytes free. */
static void crawler_mgdump_eval(crawler_module_t *cm, item *it, uint32_t hv, int i) {
    int is_flushed = item_is_flushed(it);
    bool expired = it->exptime != 0 && it->exptime < current_time;

    if (expired || is_flushed) {
        refcount_decr(it);
        return;
    }

    char *out = cm->c.buf + cm->c.bufused;
    int len = 0;

    memcpy(out + len, "mg ", 3);
    len += 3;
    if ((it->it_flags & ITEM_KEY_BINARY) == 0) {
        memcpy(out + len, ITEM_key(it), it->nkey);
        len += it->nkey;
        memcpy(out + len, "\r\n", 2);
        len += 2;
    } else {
        len += base64_encode((unsigned char *) ITEM_key(it), it->nkey,
                             (unsigned char *) (out + len), LRU_CRAWLER_MINBUFSPACE / 2);
        memcpy(out + len, " b\r\n", 4);
        len += 4;
    }

    refcount_decr(it);
    cm->c.bufused += len;
}
354
/* Flushes pending mgdump output, then queues the terminator. The terminator
 * is "EN\r\n" normally, or "ERROR locked try again later\r\n" when cm->status
 * is nonzero (set when the hash walk could not get an iterator). The crawler
 * thread sends the queued bytes afterwards.
 *
 * Does nothing if no client is attached or the flush failed (which closes
 * the client). A flush that timed out returns 0 but leaves unsent bytes at
 * the front of the buffer. The terminator is therefore appended after them
 * rather than copied over offset 0. */
static void crawler_mgdump_finalize(crawler_module_t *cm) {
    crawler_client_t *c = &cm->c;

    if (c->c == NULL) {
        return;
    }
    // flush any pending data.
    if (lru_crawler_write(c) != 0) {
        return;
    }
    // Only nonzero status right now means we were locked
    const char *msg = (cm->status != 0) ? "ERROR locked try again later\r\n"
                                        : "EN\r\n";
    int len = (int) strlen(msg);
    if (c->buflen - c->bufused < len && lru_crawler_expand_buf(c) != 0) {
        return;
    }
    memcpy(c->buf + c->bufused, msg, (size_t)len);
    c->bufused += len;
}
372
373 // write the whole buffer out to the client socket.
lru_crawler_write(crawler_client_t * c)374 static int lru_crawler_write(crawler_client_t *c) {
375 unsigned int data_size = c->bufused;
376 unsigned int sent = 0;
377 struct pollfd to_poll[1];
378 to_poll[0].fd = c->sfd;
379 to_poll[0].events = POLLOUT;
380
381 if (c->c == NULL) return -1;
382 if (data_size == 0) return 0;
383
384 while (sent < data_size) {
385 int ret = poll(to_poll, 1, 1000);
386
387 if (ret < 0) {
388 // fatal.
389 lru_crawler_close_client(c);
390 return -1;
391 }
392
393 if (ret == 0) return 0;
394
395 // check if socket was closed on us.
396 if (to_poll[0].revents & POLLIN) {
397 char buf[1];
398 int res = ((conn*)c->c)->read(c->c, buf, 1);
399 if (res == 0 || (res == -1 && (errno != EAGAIN && errno != EWOULDBLOCK))) {
400 lru_crawler_close_client(c);
401 return -1;
402 }
403 }
404
405 if (to_poll[0].revents & (POLLHUP|POLLERR)) {
406 // got socket hangup.
407 lru_crawler_close_client(c);
408 return -1;
409 } else if (to_poll[0].revents & POLLOUT) {
410 // socket is writeable.
411 int total = ((conn*)c->c)->write(c->c, c->buf + sent, data_size - sent);
412 if (total == -1) {
413 if (errno != EAGAIN && errno != EWOULDBLOCK) {
414 lru_crawler_close_client(c);
415 return -1;
416 }
417 } else if (total == 0) {
418 lru_crawler_close_client(c);
419 return -1;
420 }
421 sent += total;
422 }
423 } // while
424
425 // write buffer now empty.
426 c->bufused = 0;
427
428 return 0;
429 }
430
lru_crawler_class_done(int i)431 static void lru_crawler_class_done(int i) {
432 crawlers[i].it_flags = 0;
433 crawler_count--;
434 do_item_unlinktail_q((item *)&crawlers[i]);
435 do_item_stats_add_crawl(i, crawlers[i].reclaimed,
436 crawlers[i].unfetched, crawlers[i].checked);
437 pthread_mutex_unlock(&lru_locks[i]);
438 if (active_crawler_mod.mod->doneclass != NULL)
439 active_crawler_mod.mod->doneclass(&active_crawler_mod, i);
440 }
441
442 // ensure we build the buffer a little bit to cut down on poll/write syscalls.
443 #define MIN_ITEMS_PER_WRITE 16
item_crawl_hash(void)444 static void item_crawl_hash(void) {
445 // get iterator from assoc. can hang for a long time.
446 // - blocks hash expansion
447 void *iter = assoc_get_iterator();
448 int crawls_persleep = settings.crawls_persleep;
449 item *it = NULL;
450 int items = 0;
451
452 // Could not get the iterator: probably locked due to hash expansion.
453 if (iter == NULL) {
454 active_crawler_mod.status = 1;
455 return;
456 }
457
458 // loop while iterator returns something
459 // - iterator func handles bucket-walking
460 // - iterator returns with bucket locked.
461 while (assoc_iterate(iter, &it)) {
462 // if iterator returns true but no item, we're inbetween buckets and
463 // can do cleanup work without holding an item lock.
464 if (it == NULL) {
465 if (active_crawler_mod.c.c != NULL) {
466 if (items > MIN_ITEMS_PER_WRITE) {
467 int ret = lru_crawler_write(&active_crawler_mod.c);
468 items = 0;
469 if (ret != 0) {
470 // fail out and finalize.
471 break;
472 }
473 }
474 } else if (active_crawler_mod.mod->needs_client) {
475 // fail out and finalize.
476 break;
477 }
478
479 // - sleep bits from orig loop
480 if (crawls_persleep <= 0 && settings.lru_crawler_sleep) {
481 pthread_mutex_unlock(&lru_crawler_lock);
482 usleep(settings.lru_crawler_sleep);
483 pthread_mutex_lock(&lru_crawler_lock);
484 crawls_persleep = settings.crawls_persleep;
485 } else if (!settings.lru_crawler_sleep) {
486 // TODO: only cycle lock every N?
487 pthread_mutex_unlock(&lru_crawler_lock);
488 pthread_mutex_lock(&lru_crawler_lock);
489 }
490 continue;
491 }
492
493 // double check that the item isn't in a transitional state.
494 if (refcount_incr(it) < 2) {
495 refcount_decr(it);
496 continue;
497 }
498
499 // We're presently holding an item lock, so we cannot flush the
500 // buffer to the network socket as the syscall is both slow and could
501 // hang waiting for POLLOUT. Instead we must expand the buffer.
502 if (active_crawler_mod.c.c != NULL) {
503 crawler_client_t *c = &active_crawler_mod.c;
504 if (c->buflen - c->bufused < LRU_CRAWLER_MINBUFSPACE) {
505 if (lru_crawler_expand_buf(c) != 0) {
506 // failed to expand buffer, stop.
507 break;
508 }
509 }
510 }
511 // FIXME: missing hv and i are fine for metadump eval, but not fine
512 // for expire eval.
513 active_crawler_mod.mod->eval(&active_crawler_mod, it, 0, 0);
514 crawls_persleep--;
515 items++;
516 }
517
518 // must finalize or we leave the hash table expansion blocked.
519 assoc_iterate_final(iter);
520 return;
521 }
522
item_crawler_thread(void * arg)523 static void *item_crawler_thread(void *arg) {
524 int i;
525 int crawls_persleep = settings.crawls_persleep;
526
527 pthread_mutex_lock(&lru_crawler_lock);
528 pthread_cond_signal(&lru_crawler_cond);
529 settings.lru_crawler = true;
530 if (settings.verbose > 2)
531 fprintf(stderr, "Starting LRU crawler background thread\n");
532 while (do_run_lru_crawler_thread) {
533 pthread_cond_wait(&lru_crawler_cond, &lru_crawler_lock);
534
535 if (crawler_count == -1) {
536 item_crawl_hash();
537 crawler_count = 0;
538 } else {
539 while (crawler_count) {
540 item *search = NULL;
541 void *hold_lock = NULL;
542
543 for (i = POWER_SMALLEST; i < LARGEST_ID; i++) {
544 if (crawlers[i].it_flags != 1) {
545 continue;
546 }
547
548 if (active_crawler_mod.c.c != NULL) {
549 crawler_client_t *c = &active_crawler_mod.c;
550 if (c->buflen - c->bufused < LRU_CRAWLER_MINBUFSPACE) {
551 int ret = lru_crawler_write(c);
552 if (ret != 0) {
553 lru_crawler_class_done(i);
554 continue;
555 }
556 }
557 } else if (active_crawler_mod.mod->needs_client) {
558 lru_crawler_class_done(i);
559 continue;
560 }
561 pthread_mutex_lock(&lru_locks[i]);
562 search = do_item_crawl_q((item *)&crawlers[i]);
563 if (search == NULL ||
564 (crawlers[i].remaining && --crawlers[i].remaining < 1)) {
565 if (settings.verbose > 2)
566 fprintf(stderr, "Nothing left to crawl for %d\n", i);
567 lru_crawler_class_done(i);
568 continue;
569 }
570 uint32_t hv = hash(ITEM_key(search), search->nkey);
571 /* Attempt to hash item lock the "search" item. If locked, no
572 * other callers can incr the refcount
573 */
574 if ((hold_lock = item_trylock(hv)) == NULL) {
575 pthread_mutex_unlock(&lru_locks[i]);
576 continue;
577 }
578 /* Now see if the item is refcount locked */
579 if (refcount_incr(search) != 2) {
580 refcount_decr(search);
581 if (hold_lock)
582 item_trylock_unlock(hold_lock);
583 pthread_mutex_unlock(&lru_locks[i]);
584 continue;
585 }
586
587 crawlers[i].checked++;
588 /* Frees the item or decrements the refcount. */
589 /* Interface for this could improve: do the free/decr here
590 * instead? */
591 if (!active_crawler_mod.mod->needs_lock) {
592 pthread_mutex_unlock(&lru_locks[i]);
593 }
594
595 active_crawler_mod.mod->eval(&active_crawler_mod, search, hv, i);
596
597 if (hold_lock)
598 item_trylock_unlock(hold_lock);
599 if (active_crawler_mod.mod->needs_lock) {
600 pthread_mutex_unlock(&lru_locks[i]);
601 }
602
603 if (crawls_persleep-- <= 0 && settings.lru_crawler_sleep) {
604 pthread_mutex_unlock(&lru_crawler_lock);
605 usleep(settings.lru_crawler_sleep);
606 pthread_mutex_lock(&lru_crawler_lock);
607 crawls_persleep = settings.crawls_persleep;
608 } else if (!settings.lru_crawler_sleep) {
609 // TODO: only cycle lock every N?
610 pthread_mutex_unlock(&lru_crawler_lock);
611 pthread_mutex_lock(&lru_crawler_lock);
612 }
613 }
614 } // while
615 } // if crawler_count
616
617 if (active_crawler_mod.mod != NULL) {
618 if (active_crawler_mod.mod->finalize != NULL)
619 active_crawler_mod.mod->finalize(&active_crawler_mod);
620 while (active_crawler_mod.c.c != NULL && active_crawler_mod.c.bufused != 0) {
621 lru_crawler_write(&active_crawler_mod.c);
622 }
623 // Double checking in case the client closed during the poll
624 if (active_crawler_mod.c.c != NULL) {
625 lru_crawler_release_client(&active_crawler_mod.c);
626 }
627 active_crawler_mod.mod = NULL;
628 }
629
630 if (settings.verbose > 2)
631 fprintf(stderr, "LRU crawler thread sleeping\n");
632
633 STATS_LOCK();
634 stats_state.lru_crawler_running = false;
635 STATS_UNLOCK();
636 }
637 pthread_mutex_unlock(&lru_crawler_lock);
638 if (settings.verbose > 2)
639 fprintf(stderr, "LRU crawler thread stopping\n");
640 settings.lru_crawler = false;
641
642 return NULL;
643 }
644
645 static pthread_t item_crawler_tid;
646
stop_item_crawler_thread(bool wait)647 int stop_item_crawler_thread(bool wait) {
648 int ret;
649 pthread_mutex_lock(&lru_crawler_lock);
650 if (do_run_lru_crawler_thread == 0) {
651 pthread_mutex_unlock(&lru_crawler_lock);
652 return 0;
653 }
654 do_run_lru_crawler_thread = 0;
655 pthread_cond_signal(&lru_crawler_cond);
656 pthread_mutex_unlock(&lru_crawler_lock);
657 if (wait && (ret = pthread_join(item_crawler_tid, NULL)) != 0) {
658 fprintf(stderr, "Failed to stop LRU crawler thread: %s\n", strerror(ret));
659 return -1;
660 }
661 return 0;
662 }
663
664 /* Lock dance to "block" until thread is waiting on its condition:
665 * caller locks mtx. caller spawns thread.
666 * thread blocks on mutex.
667 * caller waits on condition, releases lock.
668 * thread gets lock, sends signal.
669 * caller can't wait, as thread has lock.
670 * thread waits on condition, releases lock
671 * caller wakes on condition, gets lock.
672 * caller immediately releases lock.
673 * thread is now safely waiting on condition before the caller returns.
674 */
start_item_crawler_thread(void)675 int start_item_crawler_thread(void) {
676 int ret;
677
678 if (settings.lru_crawler)
679 return -1;
680 pthread_mutex_lock(&lru_crawler_lock);
681 do_run_lru_crawler_thread = 1;
682 if ((ret = pthread_create(&item_crawler_tid, NULL,
683 item_crawler_thread, NULL)) != 0) {
684 fprintf(stderr, "Can't create LRU crawler thread: %s\n",
685 strerror(ret));
686 pthread_mutex_unlock(&lru_crawler_lock);
687 return -1;
688 }
689 thread_setname(item_crawler_tid, "mc-itemcrawler");
690 /* Avoid returning until the crawler has actually started */
691 pthread_cond_wait(&lru_crawler_cond, &lru_crawler_lock);
692 pthread_mutex_unlock(&lru_crawler_lock);
693
694 return 0;
695 }
696
697 /* 'remaining' is passed in so the LRU maintainer thread can scrub the whole
698 * LRU every time.
699 */
do_lru_crawler_start(uint32_t id,uint32_t remaining)700 static int do_lru_crawler_start(uint32_t id, uint32_t remaining) {
701 uint32_t sid = id;
702 int starts = 0;
703
704 pthread_mutex_lock(&lru_locks[sid]);
705 if (crawlers[sid].it_flags == 0) {
706 if (settings.verbose > 2)
707 fprintf(stderr, "Kicking LRU crawler off for LRU %u\n", sid);
708 crawlers[sid].nbytes = 0;
709 crawlers[sid].nkey = 0;
710 crawlers[sid].it_flags = 1; /* For a crawler, this means enabled. */
711 crawlers[sid].next = 0;
712 crawlers[sid].prev = 0;
713 crawlers[sid].time = 0;
714 if (remaining == LRU_CRAWLER_CAP_REMAINING) {
715 remaining = do_get_lru_size(sid);
716 }
717 /* Values for remaining:
718 * remaining = 0
719 * - scan all elements, until a NULL is reached
720 * - if empty, NULL is reached right away
721 * remaining = n + 1
722 * - first n elements are parsed (or until a NULL is reached)
723 */
724 if (remaining) remaining++;
725 crawlers[sid].remaining = remaining;
726 crawlers[sid].slabs_clsid = sid;
727 crawlers[sid].reclaimed = 0;
728 crawlers[sid].unfetched = 0;
729 crawlers[sid].checked = 0;
730 do_item_linktail_q((item *)&crawlers[sid]);
731 crawler_count++;
732 starts++;
733 }
734 pthread_mutex_unlock(&lru_locks[sid]);
735 return starts;
736 }
737
/* Attaches client connection c (socket sfd) to module cm and allocates its
 * output buffer of LRU_CRAWLER_MINBUFSPACE * 16 bytes.
 * Returns 0 on success, -1 if a client is already attached, or -2 if the
 * buffer cannot be allocated. On -2 nothing is left attached, so a later
 * request can try again. */
static int lru_crawler_set_client(crawler_module_t *cm, void *c, const int sfd) {
    crawler_client_t *crawlc = &cm->c;
    if (crawlc->c != NULL) {
        return -1;
    }

    size_t size = LRU_CRAWLER_MINBUFSPACE * 16;
    char *buf = malloc(size);
    if (buf == NULL) {
        // Attach only once the buffer exists. Otherwise a failed malloc would
        // leave a stale connection here and every later call would get -1.
        return -2;
    }

    crawlc->c = c;
    crawlc->sfd = sfd;
    crawlc->buf = buf;
    crawlc->buflen = size;
    crawlc->bufused = 0;
    return 0;
}
756
lru_crawler_start(uint8_t * ids,uint32_t remaining,const enum crawler_run_type type,void * data,void * c,const int sfd)757 int lru_crawler_start(uint8_t *ids, uint32_t remaining,
758 const enum crawler_run_type type, void *data,
759 void *c, const int sfd) {
760 int starts = 0;
761 bool is_running;
762 static rel_time_t block_ae_until = 0;
763 pthread_mutex_lock(&lru_crawler_lock);
764 STATS_LOCK();
765 is_running = stats_state.lru_crawler_running;
766 STATS_UNLOCK();
767 if (do_run_lru_crawler_thread == 0) {
768 pthread_mutex_unlock(&lru_crawler_lock);
769 return -2;
770 }
771
772 if (is_running &&
773 !(type == CRAWLER_AUTOEXPIRE && active_crawler_type == CRAWLER_AUTOEXPIRE)) {
774 pthread_mutex_unlock(&lru_crawler_lock);
775 block_ae_until = current_time + 60;
776 return -1;
777 }
778
779 if (type == CRAWLER_AUTOEXPIRE && block_ae_until > current_time) {
780 pthread_mutex_unlock(&lru_crawler_lock);
781 return -1;
782 }
783
784 /* hash table walk only supported with metadump for now. */
785 if (ids == NULL && type != CRAWLER_METADUMP && type != CRAWLER_MGDUMP) {
786 pthread_mutex_unlock(&lru_crawler_lock);
787 return -2;
788 }
789
790 /* Configure the module */
791 if (!is_running) {
792 assert(crawler_mod_regs[type] != NULL);
793 active_crawler_mod.mod = crawler_mod_regs[type];
794 active_crawler_type = type;
795 if (active_crawler_mod.mod->init != NULL) {
796 active_crawler_mod.mod->init(&active_crawler_mod, data);
797 }
798 if (active_crawler_mod.mod->needs_client) {
799 if (c == NULL || sfd == 0) {
800 pthread_mutex_unlock(&lru_crawler_lock);
801 return -2;
802 }
803 if (lru_crawler_set_client(&active_crawler_mod, c, sfd) != 0) {
804 pthread_mutex_unlock(&lru_crawler_lock);
805 return -2;
806 }
807 }
808 }
809
810 if (ids == NULL) {
811 /* NULL ids means to walk the hash table instead. */
812 starts = 1;
813 /* FIXME: hack to signal hash mode to the crawler thread.
814 * Something more clear would be nice.
815 */
816 crawler_count = -1;
817 } else {
818 /* we allow the autocrawler to restart sub-LRU's before completion */
819 for (int sid = POWER_SMALLEST; sid < POWER_LARGEST; sid++) {
820 if (ids[sid])
821 starts += do_lru_crawler_start(sid, remaining);
822 }
823 }
824 if (starts) {
825 STATS_LOCK();
826 stats_state.lru_crawler_running = true;
827 stats.lru_crawler_starts++;
828 STATS_UNLOCK();
829 pthread_cond_signal(&lru_crawler_cond);
830 }
831 pthread_mutex_unlock(&lru_crawler_lock);
832 return starts;
833 }
834
835 /*
836 * Also only clear the crawlerstats once per sid.
837 */
lru_crawler_crawl(char * slabs,const enum crawler_run_type type,void * c,const int sfd,unsigned int remaining)838 enum crawler_result_type lru_crawler_crawl(char *slabs, const enum crawler_run_type type,
839 void *c, const int sfd, unsigned int remaining) {
840 char *b = NULL;
841 uint32_t sid = 0;
842 int starts = 0;
843 uint8_t tocrawl[POWER_LARGEST];
844 bool hash_crawl = false;
845
846 /* FIXME: I added this while debugging. Don't think it's needed? */
847 memset(tocrawl, 0, sizeof(uint8_t) * POWER_LARGEST);
848 if (strcmp(slabs, "all") == 0) {
849 for (sid = 0; sid < POWER_LARGEST; sid++) {
850 tocrawl[sid] = 1;
851 }
852 } else if (strcmp(slabs, "hash") == 0) {
853 hash_crawl = true;
854 } else {
855 for (char *p = strtok_r(slabs, ",", &b);
856 p != NULL;
857 p = strtok_r(NULL, ",", &b)) {
858
859 if (!safe_strtoul(p, &sid) || sid < POWER_SMALLEST
860 || sid >= MAX_NUMBER_OF_SLAB_CLASSES) {
861 return CRAWLER_BADCLASS;
862 }
863 tocrawl[sid | TEMP_LRU] = 1;
864 tocrawl[sid | HOT_LRU] = 1;
865 tocrawl[sid | WARM_LRU] = 1;
866 tocrawl[sid | COLD_LRU] = 1;
867 }
868 }
869
870 starts = lru_crawler_start(hash_crawl ? NULL : tocrawl, remaining, type, NULL, c, sfd);
871 if (starts == -1) {
872 return CRAWLER_RUNNING;
873 } else if (starts == -2) {
874 return CRAWLER_ERROR; /* FIXME: not very helpful. */
875 } else if (starts) {
876 return CRAWLER_OK;
877 } else {
878 return CRAWLER_NOTSTARTED;
879 }
880 }
881
882 /* If we hold this lock, crawler can't wake up or move */
lru_crawler_pause(void)883 void lru_crawler_pause(void) {
884 pthread_mutex_lock(&lru_crawler_lock);
885 }
886
lru_crawler_resume(void)887 void lru_crawler_resume(void) {
888 pthread_mutex_unlock(&lru_crawler_lock);
889 }
890
init_lru_crawler(void * arg)891 int init_lru_crawler(void *arg) {
892 if (lru_crawler_initialized == 0) {
893 #ifdef EXTSTORE
894 storage = arg;
895 #endif
896 active_crawler_mod.c.c = NULL;
897 active_crawler_mod.mod = NULL;
898 active_crawler_mod.data = NULL;
899 lru_crawler_initialized = 1;
900 }
901 return 0;
902 }
903