1 /*  Copyright 2016 Netflix.
2  *
3  *  Use and distribution licensed under the BSD license.  See
4  *  the LICENSE file for full text.
5  */
6 
7 /* -*- Mode: C; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
8 #include "memcached.h"
9 #include "storage.h"
10 #include <sys/stat.h>
11 #include <sys/socket.h>
12 #include <sys/resource.h>
13 #include <fcntl.h>
14 #include <netinet/in.h>
15 #include <errno.h>
16 #include <stdlib.h>
17 #include <stdio.h>
18 #include <signal.h>
19 #include <string.h>
20 #include <time.h>
21 #include <assert.h>
22 #include <unistd.h>
23 #include <poll.h>
24 
25 #include "base64.h"
26 
/* Number of per-LRU crawler slots; sizes crawlers[] and bounds the crawler
 * thread's scan over LRU ids. */
27 #define LARGEST_ID POWER_LARGEST
28 
/* Output-side state for a remote client attached to a crawl: the connection
 * handle, its socket, and the buffer that crawler modules append their
 * output to before lru_crawler_write() pushes it to the socket. */
29 typedef struct {
30     void *c; /* original connection structure. still with source thread attached. */
31     int sfd; /* client fd. */
32     int buflen; /* allocated size of buf, in bytes */
33     int bufused; /* bytes at the front of buf holding output not yet sent */
34     char *buf; /* output buffer */
35 } crawler_client_t;
36 
/* Forward declaration so the callback signatures below can take a module
 * pointer before the struct itself is defined. */
37 typedef struct _crawler_module_t crawler_module_t;
38 
/* Callback signatures a crawler module provides through crawler_module_reg_t.
 * eval receives the item plus the hash value and slab class passed by the
 * caller; init returns an int status and receives the caller's data pointer. */
39 typedef void (*crawler_eval_func)(crawler_module_t *cm, item *it, uint32_t hv, int slab_cls);
40 typedef int (*crawler_init_func)(crawler_module_t *cm, void *data); // TODO: init args?
41 typedef void (*crawler_deinit_func)(crawler_module_t *cm); // TODO: extra args?
42 typedef void (*crawler_doneclass_func)(crawler_module_t *cm, int slab_cls);
43 typedef void (*crawler_finalize_func)(crawler_module_t *cm);
44 
/* Static description of a crawler module: its callbacks and what it needs
 * from the crawler thread. init, doneclass and finalize are NULL-checked by
 * their callers in this file and may be left NULL; eval is called
 * unconditionally and must be set. */
45 typedef struct {
46     crawler_init_func init; /* run before crawl starts */
47     crawler_eval_func eval; /* runs on an item. */
48     crawler_doneclass_func doneclass; /* runs once per sub-crawler completion. */
49     crawler_finalize_func finalize; /* runs once when all sub-crawlers are done. */
50     bool needs_lock; /* whether or not we need the LRU lock held when eval is called */
51     bool needs_client; /* whether or not to grab onto the remote client */
52 } crawler_module_reg_t;
53 
/* Runtime instance of a crawler module: the registration in use plus the
 * per-run data, attached client and status word. */
54 struct _crawler_module_t {
55     void *data; /* opaque data pointer */
56     crawler_client_t c;
57     crawler_module_reg_t *mod;
58     int status; /* flags/code/etc for internal module usage */
59 };
60 
/* Callbacks of the expiry module, declared here so the registration below
 * can reference them before their definitions. */
61 static int crawler_expired_init(crawler_module_t *cm, void *data);
62 static void crawler_expired_doneclass(crawler_module_t *cm, int slab_cls);
63 static void crawler_expired_finalize(crawler_module_t *cm);
64 static void crawler_expired_eval(crawler_module_t *cm, item *search, uint32_t hv, int i);
65 
/* Expiry module registration: its eval runs with the per-LRU lock still held
 * (needs_lock) and it produces no client output (needs_client is false). */
66 crawler_module_reg_t crawler_expired_mod = {
67     .init = crawler_expired_init,
68     .eval = crawler_expired_eval,
69     .doneclass = crawler_expired_doneclass,
70     .finalize = crawler_expired_finalize,
71     .needs_lock = true,
72     .needs_client = false,
73 };
74 
/* Callbacks of the metadump module, declared ahead of their definitions. */
75 static int crawler_metadump_init(crawler_module_t *cm, void *data);
76 static void crawler_metadump_eval(crawler_module_t *cm, item *search, uint32_t hv, int i);
77 static void crawler_metadump_finalize(crawler_module_t *cm);
78 
/* Metadump module registration: writes one text line per item into the
 * attached client's buffer, so a client is required; eval runs without the
 * per-LRU lock and there is no per-class completion hook. */
79 crawler_module_reg_t crawler_metadump_mod = {
80     .init = crawler_metadump_init,
81     .eval = crawler_metadump_eval,
82     .doneclass = NULL,
83     .finalize = crawler_metadump_finalize,
84     .needs_lock = false,
85     .needs_client = true,
86 };
87 
/* Callbacks of the mgdump module, declared ahead of their definitions. */
88 static int crawler_mgdump_init(crawler_module_t *cm, void *data);
89 static void crawler_mgdump_eval(crawler_module_t *cm, item *search, uint32_t hv, int i);
90 static void crawler_mgdump_finalize(crawler_module_t *cm);
91 
/* Mgdump module registration: emits an "mg <key>" line per item into the
 * attached client's buffer, so a client is required; eval runs without the
 * per-LRU lock and there is no per-class completion hook. */
92 crawler_module_reg_t crawler_mgdump_mod = {
93     .init = crawler_mgdump_init,
94     .eval = crawler_mgdump_eval,
95     .doneclass = NULL,
96     .finalize = crawler_mgdump_finalize,
97     .needs_lock = false,
98     .needs_client = true,
99 };
100 
/* Module lookup table, indexed by enum crawler_run_type in
 * lru_crawler_start(). The first two run types both use the expiry module.
 * NOTE(review): the enum is declared outside this file; the entry order here
 * must match it - confirm when adding run types. */
101 crawler_module_reg_t *crawler_mod_regs[4] = {
102     &crawler_expired_mod,
103     &crawler_expired_mod,
104     &crawler_metadump_mod,
105     &crawler_mgdump_mod,
106 };
107 
/* Defined further down; the finalize callbacks above its definition use it. */
108 static int lru_crawler_write(crawler_client_t *c);
/* The single module instance (and its run type) driving the current crawl. */
109 crawler_module_t active_crawler_mod;
110 enum crawler_run_type active_crawler_type;
111 
/* One crawler placeholder per LRU id; it_flags == 1 marks a slot as active. */
112 static crawler crawlers[LARGEST_ID];
113 
/* Number of active per-LRU crawlers; -1 is used as the "walk the hash table"
 * marker set by lru_crawler_start(). */
114 static int crawler_count = 0;
/* Nonzero while the crawler thread should keep running. */
115 static volatile int do_run_lru_crawler_thread = 0;
/* Set once init_lru_crawler() has run. */
116 static int lru_crawler_initialized = 0;
/* Lock/condition pair used to start, wake and pause the crawler thread. */
117 static pthread_mutex_t lru_crawler_lock = PTHREAD_MUTEX_INITIALIZER;
118 static pthread_cond_t  lru_crawler_cond = PTHREAD_COND_INITIALIZER;
119 #ifdef EXTSTORE
120 /* TODO: pass this around */
/* Storage handle given to init_lru_crawler(); used by the expiry module. */
121 static void *storage;
122 #endif
123 
/* Will crawl all slab classes a minimum of once per hour.
 * Value is in seconds. Parenthesized so the macro expands as a single
 * operand: unparenthesized, `x / MAX_MAINTCRAWL_WAIT` would evaluate as
 * `(x / 60) * 60`. */
#define MAX_MAINTCRAWL_WAIT (60 * 60)
126 
127 /*** LRU CRAWLER THREAD ***/
128 
/* Free space, in bytes, the crawl loops try to have available in the client
 * buffer before calling a module's eval; also the base unit for the initial
 * buffer size in lru_crawler_set_client(). */
129 #define LRU_CRAWLER_MINBUFSPACE 8192
130 
/* Drop the attached client: pass its connection to sidethread_conn_close(),
 * then release the output buffer. Both pointers are cleared so the rest of
 * the crawler sees "no client" (c->c == NULL) afterwards. */
static void lru_crawler_close_client(crawler_client_t *c) {
    void *client_conn = c->c;

    sidethread_conn_close(client_conn);
    c->c = NULL;

    free(c->buf);
    c->buf = NULL;
}
138 
/* Detach the client at the end of a crawl: pass its connection to
 * redispatch_conn() (presumably handing it back to a worker thread - confirm
 * against that function), then release the output buffer. Both pointers are
 * cleared so the crawler sees "no client" afterwards. */
static void lru_crawler_release_client(crawler_client_t *c) {
    void *client_conn = c->c;

    redispatch_conn(client_conn);
    c->c = NULL;

    free(c->buf);
    c->buf = NULL;
}
146 
/* Double the size of the client's output buffer.
 *
 * Returns 0 on success, -1 if the buffer cannot grow. On failure the client
 * is left exactly as it was: buf and buflen still describe the old, valid
 * allocation (buflen is only updated once realloc has succeeded).
 */
static int lru_crawler_expand_buf(crawler_client_t *c) {
    /* buflen is an int; refuse to double when the result would not fit.
     * INT32_MAX comes from <stdint.h>, already required for uint32_t here. */
    if (c->buflen <= 0 || c->buflen > INT32_MAX / 2) {
        return -1;
    }

    int newlen = c->buflen * 2;
    char *nb = realloc(c->buf, (size_t)newlen);
    if (nb == NULL) {
        return -1;
    }
    c->buf = nb;
    c->buflen = newlen;
    return 0;
}
156 
/* init hook of the expiry module.
 *
 * data: caller-owned stats block, or NULL to have one allocated here. A
 *       caller-supplied block is marked is_external so finalize will not
 *       free it.
 * Returns 0 on success, -1 if the stats block could not be allocated (in
 * which case cm->data is not touched).
 *
 * In both cases the per-class stats are zeroed and stamped with the current
 * time under the block's lock.
 */
static int crawler_expired_init(crawler_module_t *cm, void *data) {
    struct crawler_expired_data *stats = data;

    if (stats == NULL) {
        /* No caller-provided block: allocate a private one. */
        stats = calloc(1, sizeof(struct crawler_expired_data));
        if (stats == NULL) {
            return -1;
        }
        pthread_mutex_init(&stats->lock, NULL);
        stats->is_external = false;
        stats->start_time = current_time;
    } else {
        stats->is_external = true;
    }
    cm->data = stats;

    pthread_mutex_lock(&stats->lock);
    memset(&stats->crawlerstats, 0, sizeof(crawlerstats_t) * POWER_LARGEST);
    for (int cls = 0; cls < POWER_LARGEST; cls++) {
        crawlerstats_t *cs = &stats->crawlerstats[cls];
        cs->start_time = current_time;
        cs->run_complete = false;
    }
    pthread_mutex_unlock(&stats->lock);

    return 0;
}
185 
/* doneclass hook of the expiry module: under the stats lock, stamp the end
 * time for slab_cls and flag that class's run as complete. */
static void crawler_expired_doneclass(crawler_module_t *cm, int slab_cls) {
    struct crawler_expired_data *stats = cm->data;
    crawlerstats_t *cls_stats = &stats->crawlerstats[slab_cls];

    pthread_mutex_lock(&stats->lock);
    cls_stats->end_time = current_time;
    cls_stats->run_complete = true;
    pthread_mutex_unlock(&stats->lock);
}
193 
/* finalize hook of the expiry module: stamp the overall end time and mark
 * the crawl complete, then dispose of the stats block if this module
 * allocated it (see crawler_expired_init).
 *
 * Ownership is sampled while the lock is still held: once crawl_complete is
 * published and the lock dropped, a caller-owned block belongs to the caller
 * again and is not touched here. A module-owned block has its mutex
 * destroyed before being freed, and cm->data is cleared so no dangling
 * pointer is left behind for a later run.
 */
static void crawler_expired_finalize(crawler_module_t *cm) {
    struct crawler_expired_data *d = (struct crawler_expired_data *) cm->data;
    pthread_mutex_lock(&d->lock);
    d->end_time = current_time;
    d->crawl_complete = true;
    const bool owned = !d->is_external;
    pthread_mutex_unlock(&d->lock);

    if (owned) {
        pthread_mutex_destroy(&d->lock);
        free(d);
        cm->data = NULL;
    }
}
205 
206 /* I pulled this out to make the main thread clearer, but it reaches into the
207  * main thread's values too much. Should rethink again.
208  */
crawler_expired_eval(crawler_module_t * cm,item * search,uint32_t hv,int i)209 static void crawler_expired_eval(crawler_module_t *cm, item *search, uint32_t hv, int i) {
210     struct crawler_expired_data *d = (struct crawler_expired_data *) cm->data;
211     pthread_mutex_lock(&d->lock);
212     crawlerstats_t *s = &d->crawlerstats[i];
213     int is_flushed = item_is_flushed(search);
214 #ifdef EXTSTORE
215     bool is_valid = true;
216     if (search->it_flags & ITEM_HDR) {
217         is_valid = storage_validate_item(storage, search);
218     }
219 #endif
220     if ((search->exptime != 0 && search->exptime < current_time)
221         || is_flushed
222 #ifdef EXTSTORE
223         || !is_valid
224 #endif
225         ) {
226         crawlers[i].reclaimed++;
227         s->reclaimed++;
228 
229         if (settings.verbose > 1) {
230             int ii;
231             char *key = ITEM_key(search);
232             fprintf(stderr, "LRU crawler found an expired item (flags: %d, slab: %d): ",
233                 search->it_flags, search->slabs_clsid);
234             for (ii = 0; ii < search->nkey; ++ii) {
235                 fprintf(stderr, "%c", key[ii]);
236             }
237             fprintf(stderr, "\n");
238         }
239         if ((search->it_flags & ITEM_FETCHED) == 0 && !is_flushed) {
240             crawlers[i].unfetched++;
241         }
242 #ifdef EXTSTORE
243         STORAGE_delete(storage, search);
244 #endif
245         do_item_unlink_nolock(search, hv);
246         do_item_remove(search);
247     } else {
248         s->seen++;
249         refcount_decr(search);
250         if (search->exptime == 0) {
251             s->noexp++;
252         } else if (search->exptime - current_time > 3599) {
253             s->ttl_hourplus++;
254         } else {
255             rel_time_t ttl_remain = search->exptime - current_time;
256             int bucket = ttl_remain / 60;
257             if (bucket <= 60) {
258                 s->histo[bucket]++;
259             }
260         }
261     }
262     pthread_mutex_unlock(&d->lock);
263 }
264 
/* init hook of the metadump module: clears the status word that the hash
 * walk sets when it cannot get an iterator. Always returns 0. */
static int crawler_metadump_init(crawler_module_t *cm, void *data) {
    (void)data; /* unused by this module */

    cm->status = 0;
    return 0;
}
269 
/* eval hook of the metadump module: append one "key=... exp=... ..." line
 * describing the item to the client's output buffer.
 *
 * Expired or flushed items are skipped. The crawler's reference on the item
 * is always dropped before returning. hv and i are unused.
 *
 * The line is formatted against the space actually left in the buffer, and
 * bufused only advances when the whole line fit. Callers normally reserve
 * LRU_CRAWLER_MINBUFSPACE first; if they could not (e.g. a flush timed out
 * with the buffer still full) the item is dropped rather than written past
 * the end of the allocation.
 */
static void crawler_metadump_eval(crawler_module_t *cm, item *it, uint32_t hv, int i) {
    char keybuf[KEY_MAX_URI_ENCODED_LENGTH];
    int is_flushed = item_is_flushed(it);
    /* Ignore expired content. */
    if ((it->exptime != 0 && it->exptime < current_time)
        || is_flushed) {
        refcount_decr(it);
        return;
    }

    int avail = cm->c.buflen - cm->c.bufused;
    if (cm->c.buf == NULL || avail <= 0) {
        // Nowhere to write; don't push it.
        refcount_decr(it);
        return;
    }

    client_flags_t flags;
    FLAGS_CONV(it, flags);
    // TODO: uriencode directly into the buffer.
    uriencode(ITEM_key(it), keybuf, it->nkey, KEY_MAX_URI_ENCODED_LENGTH);
    int total = snprintf(cm->c.buf + cm->c.bufused, (size_t)avail,
            "key=%s exp=%ld la=%llu cas=%llu fetch=%s cls=%u size=%lu flags=%llu\n",
            keybuf,
            (it->exptime == 0) ? -1 : (long)(it->exptime + process_started),
            (unsigned long long)(it->time + process_started),
            (unsigned long long)ITEM_get_cas(it),
            (it->it_flags & ITEM_FETCHED) ? "yes" : "no",
            ITEM_clsid(it),
            (unsigned long) ITEM_ntotal(it),
            (unsigned long long)flags);
    refcount_decr(it);
    // TODO: some way of tracking the errors. these should be impossible given
    // the space requirements.
    // snprintf returns the untruncated length: >= avail means the line was
    // cut short, so leave bufused alone and the partial text is ignored.
    if (total <= 0 || total >= avail) {
        // Failed to write, don't push it.
        return;
    }
    cm->c.bufused += total;
}
302 
/* finalize hook of the metadump module: flush pending output, then queue the
 * terminating line for the client - "END", or an error line when the status
 * word is nonzero (the hash walk could not get its iterator).
 *
 * lru_crawler_write() can return 0 with data still buffered (poll timeout),
 * so the trailer is appended after whatever is pending instead of being
 * copied to the start of the buffer. If the client is gone, or there is no
 * room left for the trailer, nothing is queued.
 */
static void crawler_metadump_finalize(crawler_module_t *cm) {
    if (cm->c.c == NULL) {
        return;
    }
    // flush any pending data.
    if (lru_crawler_write(&cm->c) != 0) {
        // client was closed during the write.
        return;
    }

    // Only nonzero status right now means we were locked
    const char *trailer = (cm->status != 0)
        ? "ERROR locked try again later\r\n"
        : "END\r\n";
    size_t len = strlen(trailer);

    if (cm->c.buf == NULL || cm->c.buflen - cm->c.bufused < (int)len) {
        return;
    }
    memcpy(cm->c.buf + cm->c.bufused, trailer, len);
    cm->c.bufused += len;
}
320 
/* init hook of the mgdump module: clears the status word that the hash walk
 * sets when it cannot get an iterator. Always returns 0. */
static int crawler_mgdump_init(crawler_module_t *cm, void *data) {
    (void)data; /* unused by this module */

    cm->status = 0;
    return 0;
}
325 
/* eval hook of the mgdump module: append "mg <key>\r\n" for the item to the
 * client's output buffer. Keys flagged ITEM_KEY_BINARY are base64 encoded
 * and the line gets a trailing " b" token.
 *
 * Expired or flushed items are skipped. The crawler's reference on the item
 * is always dropped before returning. hv and i are unused.
 *
 * The writes below are not individually bounded, so this relies on the
 * caller having reserved LRU_CRAWLER_MINBUFSPACE bytes. That reservation is
 * verified here; if it does not hold (e.g. a flush timed out with the buffer
 * still full) the item is dropped rather than written past the end of the
 * allocation.
 */
static void crawler_mgdump_eval(crawler_module_t *cm, item *it, uint32_t hv, int i) {
    int is_flushed = item_is_flushed(it);
    /* Ignore expired content. */
    if ((it->exptime != 0 && it->exptime < current_time)
        || is_flushed) {
        refcount_decr(it);
        return;
    }

    if (cm->c.buf == NULL
            || cm->c.buflen - cm->c.bufused < LRU_CRAWLER_MINBUFSPACE) {
        // Not enough room reserved; don't push it.
        refcount_decr(it);
        return;
    }

    char *p = cm->c.buf + cm->c.bufused; // buffer offset.
    char *start = p;
    memcpy(p, "mg ", 3);
    p += 3;
    if (it->it_flags & ITEM_KEY_BINARY) {
        p += base64_encode((unsigned char *) ITEM_key(it), it->nkey, (unsigned char*) p, LRU_CRAWLER_MINBUFSPACE/2);
        memcpy(p, " b\r\n", 4);
        p += 4;
    } else {
        memcpy(p, ITEM_key(it), it->nkey);
        p += it->nkey;
        memcpy(p, "\r\n", 2);
        p += 2;
    }
    int total = p - start;

    refcount_decr(it);
    cm->c.bufused += total;
}
354 
/* finalize hook of the mgdump module: flush pending output, then queue the
 * terminating line for the client - "EN", or an error line when the status
 * word is nonzero (the hash walk could not get its iterator).
 *
 * lru_crawler_write() can return 0 with data still buffered (poll timeout),
 * so the trailer is appended after whatever is pending instead of being
 * copied to the start of the buffer. If the client is gone, or there is no
 * room left for the trailer, nothing is queued.
 */
static void crawler_mgdump_finalize(crawler_module_t *cm) {
    if (cm->c.c == NULL) {
        return;
    }
    // flush any pending data.
    if (lru_crawler_write(&cm->c) != 0) {
        // client was closed during the write.
        return;
    }

    // Only nonzero status right now means we were locked
    const char *trailer = (cm->status != 0)
        ? "ERROR locked try again later\r\n"
        : "EN\r\n";
    size_t len = strlen(trailer);

    if (cm->c.buf == NULL || cm->c.buflen - cm->c.bufused < (int)len) {
        return;
    }
    memcpy(cm->c.buf + cm->c.bufused, trailer, len);
    cm->c.bufused += len;
}
372 
373 // write the whole buffer out to the client socket.
lru_crawler_write(crawler_client_t * c)374 static int lru_crawler_write(crawler_client_t *c) {
375     unsigned int data_size = c->bufused;
376     unsigned int sent = 0;
377     struct pollfd to_poll[1];
378     to_poll[0].fd = c->sfd;
379     to_poll[0].events = POLLOUT;
380 
381     if (c->c == NULL) return -1;
382     if (data_size == 0) return 0;
383 
384     while (sent < data_size) {
385         int ret = poll(to_poll, 1, 1000);
386 
387         if (ret < 0) {
388             // fatal.
389             lru_crawler_close_client(c);
390             return -1;
391         }
392 
393         if (ret == 0) return 0;
394 
395         // check if socket was closed on us.
396         if (to_poll[0].revents & POLLIN) {
397             char buf[1];
398             int res = ((conn*)c->c)->read(c->c, buf, 1);
399             if (res == 0 || (res == -1 && (errno != EAGAIN && errno != EWOULDBLOCK))) {
400                 lru_crawler_close_client(c);
401                 return -1;
402             }
403         }
404 
405         if (to_poll[0].revents & (POLLHUP|POLLERR)) {
406             // got socket hangup.
407             lru_crawler_close_client(c);
408             return -1;
409         } else if (to_poll[0].revents & POLLOUT) {
410             // socket is writeable.
411             int total = ((conn*)c->c)->write(c->c, c->buf + sent, data_size - sent);
412             if (total == -1) {
413                 if (errno != EAGAIN && errno != EWOULDBLOCK) {
414                     lru_crawler_close_client(c);
415                     return -1;
416                 }
417             } else if (total == 0) {
418                 lru_crawler_close_client(c);
419                 return -1;
420             }
421             sent += total;
422         }
423     } // while
424 
425     // write buffer now empty.
426     c->bufused = 0;
427 
428     return 0;
429 }
430 
lru_crawler_class_done(int i)431 static void lru_crawler_class_done(int i) {
432     crawlers[i].it_flags = 0;
433     crawler_count--;
434     do_item_unlinktail_q((item *)&crawlers[i]);
435     do_item_stats_add_crawl(i, crawlers[i].reclaimed,
436             crawlers[i].unfetched, crawlers[i].checked);
437     pthread_mutex_unlock(&lru_locks[i]);
438     if (active_crawler_mod.mod->doneclass != NULL)
439         active_crawler_mod.mod->doneclass(&active_crawler_mod, i);
440 }
441 
442 // ensure we build the buffer a little bit to cut down on poll/write syscalls.
443 #define MIN_ITEMS_PER_WRITE 16
item_crawl_hash(void)444 static void item_crawl_hash(void) {
445     // get iterator from assoc. can hang for a long time.
446     // - blocks hash expansion
447     void *iter = assoc_get_iterator();
448     int crawls_persleep = settings.crawls_persleep;
449     item *it = NULL;
450     int items = 0;
451 
452     // Could not get the iterator: probably locked due to hash expansion.
453     if (iter == NULL) {
454         active_crawler_mod.status = 1;
455         return;
456     }
457 
458     // loop while iterator returns something
459     // - iterator func handles bucket-walking
460     // - iterator returns with bucket locked.
461     while (assoc_iterate(iter, &it)) {
462         // if iterator returns true but no item, we're inbetween buckets and
463         // can do cleanup work without holding an item lock.
464         if (it == NULL) {
465             if (active_crawler_mod.c.c != NULL) {
466                 if (items > MIN_ITEMS_PER_WRITE) {
467                     int ret = lru_crawler_write(&active_crawler_mod.c);
468                     items = 0;
469                     if (ret != 0) {
470                         // fail out and finalize.
471                         break;
472                     }
473                 }
474             } else if (active_crawler_mod.mod->needs_client) {
475                 // fail out and finalize.
476                 break;
477             }
478 
479             // - sleep bits from orig loop
480             if (crawls_persleep <= 0 && settings.lru_crawler_sleep) {
481                 pthread_mutex_unlock(&lru_crawler_lock);
482                 usleep(settings.lru_crawler_sleep);
483                 pthread_mutex_lock(&lru_crawler_lock);
484                 crawls_persleep = settings.crawls_persleep;
485             } else if (!settings.lru_crawler_sleep) {
486                 // TODO: only cycle lock every N?
487                 pthread_mutex_unlock(&lru_crawler_lock);
488                 pthread_mutex_lock(&lru_crawler_lock);
489             }
490             continue;
491         }
492 
493         // double check that the item isn't in a transitional state.
494         if (refcount_incr(it) < 2) {
495             refcount_decr(it);
496             continue;
497         }
498 
499         // We're presently holding an item lock, so we cannot flush the
500         // buffer to the network socket as the syscall is both slow and could
501         // hang waiting for POLLOUT. Instead we must expand the buffer.
502         if (active_crawler_mod.c.c != NULL) {
503             crawler_client_t *c = &active_crawler_mod.c;
504             if (c->buflen - c->bufused < LRU_CRAWLER_MINBUFSPACE) {
505                 if (lru_crawler_expand_buf(c) != 0) {
506                     // failed to expand buffer, stop.
507                     break;
508                 }
509             }
510         }
511         // FIXME: missing hv and i are fine for metadump eval, but not fine
512         // for expire eval.
513         active_crawler_mod.mod->eval(&active_crawler_mod, it, 0, 0);
514         crawls_persleep--;
515         items++;
516     }
517 
518     // must finalize or we leave the hash table expansion blocked.
519     assoc_iterate_final(iter);
520     return;
521 }
522 
item_crawler_thread(void * arg)523 static void *item_crawler_thread(void *arg) {
524     int i;
525     int crawls_persleep = settings.crawls_persleep;
526 
527     pthread_mutex_lock(&lru_crawler_lock);
528     pthread_cond_signal(&lru_crawler_cond);
529     settings.lru_crawler = true;
530     if (settings.verbose > 2)
531         fprintf(stderr, "Starting LRU crawler background thread\n");
532     while (do_run_lru_crawler_thread) {
533     pthread_cond_wait(&lru_crawler_cond, &lru_crawler_lock);
534 
535     if (crawler_count == -1) {
536         item_crawl_hash();
537         crawler_count = 0;
538     } else {
539     while (crawler_count) {
540         item *search = NULL;
541         void *hold_lock = NULL;
542 
543         for (i = POWER_SMALLEST; i < LARGEST_ID; i++) {
544             if (crawlers[i].it_flags != 1) {
545                 continue;
546             }
547 
548             if (active_crawler_mod.c.c != NULL) {
549                 crawler_client_t *c = &active_crawler_mod.c;
550                 if (c->buflen - c->bufused < LRU_CRAWLER_MINBUFSPACE) {
551                     int ret = lru_crawler_write(c);
552                     if (ret != 0) {
553                         lru_crawler_class_done(i);
554                         continue;
555                     }
556                 }
557             } else if (active_crawler_mod.mod->needs_client) {
558                 lru_crawler_class_done(i);
559                 continue;
560             }
561             pthread_mutex_lock(&lru_locks[i]);
562             search = do_item_crawl_q((item *)&crawlers[i]);
563             if (search == NULL ||
564                 (crawlers[i].remaining && --crawlers[i].remaining < 1)) {
565                 if (settings.verbose > 2)
566                     fprintf(stderr, "Nothing left to crawl for %d\n", i);
567                 lru_crawler_class_done(i);
568                 continue;
569             }
570             uint32_t hv = hash(ITEM_key(search), search->nkey);
571             /* Attempt to hash item lock the "search" item. If locked, no
572              * other callers can incr the refcount
573              */
574             if ((hold_lock = item_trylock(hv)) == NULL) {
575                 pthread_mutex_unlock(&lru_locks[i]);
576                 continue;
577             }
578             /* Now see if the item is refcount locked */
579             if (refcount_incr(search) != 2) {
580                 refcount_decr(search);
581                 if (hold_lock)
582                     item_trylock_unlock(hold_lock);
583                 pthread_mutex_unlock(&lru_locks[i]);
584                 continue;
585             }
586 
587             crawlers[i].checked++;
588             /* Frees the item or decrements the refcount. */
589             /* Interface for this could improve: do the free/decr here
590              * instead? */
591             if (!active_crawler_mod.mod->needs_lock) {
592                 pthread_mutex_unlock(&lru_locks[i]);
593             }
594 
595             active_crawler_mod.mod->eval(&active_crawler_mod, search, hv, i);
596 
597             if (hold_lock)
598                 item_trylock_unlock(hold_lock);
599             if (active_crawler_mod.mod->needs_lock) {
600                 pthread_mutex_unlock(&lru_locks[i]);
601             }
602 
603             if (crawls_persleep-- <= 0 && settings.lru_crawler_sleep) {
604                 pthread_mutex_unlock(&lru_crawler_lock);
605                 usleep(settings.lru_crawler_sleep);
606                 pthread_mutex_lock(&lru_crawler_lock);
607                 crawls_persleep = settings.crawls_persleep;
608             } else if (!settings.lru_crawler_sleep) {
609                 // TODO: only cycle lock every N?
610                 pthread_mutex_unlock(&lru_crawler_lock);
611                 pthread_mutex_lock(&lru_crawler_lock);
612             }
613         }
614     } // while
615     } // if crawler_count
616 
617     if (active_crawler_mod.mod != NULL) {
618         if (active_crawler_mod.mod->finalize != NULL)
619             active_crawler_mod.mod->finalize(&active_crawler_mod);
620         while (active_crawler_mod.c.c != NULL && active_crawler_mod.c.bufused != 0) {
621             lru_crawler_write(&active_crawler_mod.c);
622         }
623         // Double checking in case the client closed during the poll
624         if (active_crawler_mod.c.c != NULL) {
625             lru_crawler_release_client(&active_crawler_mod.c);
626         }
627         active_crawler_mod.mod = NULL;
628     }
629 
630     if (settings.verbose > 2)
631         fprintf(stderr, "LRU crawler thread sleeping\n");
632 
633     STATS_LOCK();
634     stats_state.lru_crawler_running = false;
635     STATS_UNLOCK();
636     }
637     pthread_mutex_unlock(&lru_crawler_lock);
638     if (settings.verbose > 2)
639         fprintf(stderr, "LRU crawler thread stopping\n");
640     settings.lru_crawler = false;
641 
642     return NULL;
643 }
644 
645 static pthread_t item_crawler_tid;
646 
stop_item_crawler_thread(bool wait)647 int stop_item_crawler_thread(bool wait) {
648     int ret;
649     pthread_mutex_lock(&lru_crawler_lock);
650     if (do_run_lru_crawler_thread == 0) {
651         pthread_mutex_unlock(&lru_crawler_lock);
652         return 0;
653     }
654     do_run_lru_crawler_thread = 0;
655     pthread_cond_signal(&lru_crawler_cond);
656     pthread_mutex_unlock(&lru_crawler_lock);
657     if (wait && (ret = pthread_join(item_crawler_tid, NULL)) != 0) {
658         fprintf(stderr, "Failed to stop LRU crawler thread: %s\n", strerror(ret));
659         return -1;
660     }
661     return 0;
662 }
663 
664 /* Lock dance to "block" until thread is waiting on its condition:
665  * caller locks mtx. caller spawns thread.
666  * thread blocks on mutex.
667  * caller waits on condition, releases lock.
668  * thread gets lock, sends signal.
669  * caller can't wait, as thread has lock.
670  * thread waits on condition, releases lock
671  * caller wakes on condition, gets lock.
672  * caller immediately releases lock.
673  * thread is now safely waiting on condition before the caller returns.
674  */
start_item_crawler_thread(void)675 int start_item_crawler_thread(void) {
676     int ret;
677 
678     if (settings.lru_crawler)
679         return -1;
680     pthread_mutex_lock(&lru_crawler_lock);
681     do_run_lru_crawler_thread = 1;
682     if ((ret = pthread_create(&item_crawler_tid, NULL,
683         item_crawler_thread, NULL)) != 0) {
684         fprintf(stderr, "Can't create LRU crawler thread: %s\n",
685             strerror(ret));
686         pthread_mutex_unlock(&lru_crawler_lock);
687         return -1;
688     }
689     thread_setname(item_crawler_tid, "mc-itemcrawler");
690     /* Avoid returning until the crawler has actually started */
691     pthread_cond_wait(&lru_crawler_cond, &lru_crawler_lock);
692     pthread_mutex_unlock(&lru_crawler_lock);
693 
694     return 0;
695 }
696 
697 /* 'remaining' is passed in so the LRU maintainer thread can scrub the whole
698  * LRU every time.
699  */
do_lru_crawler_start(uint32_t id,uint32_t remaining)700 static int do_lru_crawler_start(uint32_t id, uint32_t remaining) {
701     uint32_t sid = id;
702     int starts = 0;
703 
704     pthread_mutex_lock(&lru_locks[sid]);
705     if (crawlers[sid].it_flags == 0) {
706         if (settings.verbose > 2)
707             fprintf(stderr, "Kicking LRU crawler off for LRU %u\n", sid);
708         crawlers[sid].nbytes = 0;
709         crawlers[sid].nkey = 0;
710         crawlers[sid].it_flags = 1; /* For a crawler, this means enabled. */
711         crawlers[sid].next = 0;
712         crawlers[sid].prev = 0;
713         crawlers[sid].time = 0;
714         if (remaining == LRU_CRAWLER_CAP_REMAINING) {
715             remaining = do_get_lru_size(sid);
716         }
717         /* Values for remaining:
718          * remaining = 0
719          * - scan all elements, until a NULL is reached
720          * - if empty, NULL is reached right away
721          * remaining = n + 1
722          * - first n elements are parsed (or until a NULL is reached)
723          */
724         if (remaining) remaining++;
725         crawlers[sid].remaining = remaining;
726         crawlers[sid].slabs_clsid = sid;
727         crawlers[sid].reclaimed = 0;
728         crawlers[sid].unfetched = 0;
729         crawlers[sid].checked = 0;
730         do_item_linktail_q((item *)&crawlers[sid]);
731         crawler_count++;
732         starts++;
733     }
734     pthread_mutex_unlock(&lru_locks[sid]);
735     return starts;
736 }
737 
/* Attach a client connection to the module and give it an output buffer of
 * 16 * LRU_CRAWLER_MINBUFSPACE bytes.
 *
 * Returns 0 on success, -1 if a client is already attached, -2 if the buffer
 * could not be allocated. The module's client state is only modified on
 * success, so after a failed allocation the module does not keep a pointer
 * to a connection the caller still owns.
 */
static int lru_crawler_set_client(crawler_module_t *cm, void *c, const int sfd) {
    crawler_client_t *crawlc = &cm->c;
    if (crawlc->c != NULL) {
        return -1;
    }

    const size_t size = LRU_CRAWLER_MINBUFSPACE * 16;
    char *buf = malloc(size);
    if (buf == NULL) {
        return -2;
    }

    crawlc->c = c;
    crawlc->sfd = sfd;
    crawlc->buf = buf;
    crawlc->buflen = size;
    crawlc->bufused = 0;
    return 0;
}
756 
lru_crawler_start(uint8_t * ids,uint32_t remaining,const enum crawler_run_type type,void * data,void * c,const int sfd)757 int lru_crawler_start(uint8_t *ids, uint32_t remaining,
758                              const enum crawler_run_type type, void *data,
759                              void *c, const int sfd) {
760     int starts = 0;
761     bool is_running;
762     static rel_time_t block_ae_until = 0;
763     pthread_mutex_lock(&lru_crawler_lock);
764     STATS_LOCK();
765     is_running = stats_state.lru_crawler_running;
766     STATS_UNLOCK();
767     if (do_run_lru_crawler_thread == 0) {
768         pthread_mutex_unlock(&lru_crawler_lock);
769         return -2;
770     }
771 
772     if (is_running &&
773             !(type == CRAWLER_AUTOEXPIRE && active_crawler_type == CRAWLER_AUTOEXPIRE)) {
774         pthread_mutex_unlock(&lru_crawler_lock);
775         block_ae_until = current_time + 60;
776         return -1;
777     }
778 
779     if (type == CRAWLER_AUTOEXPIRE && block_ae_until > current_time) {
780         pthread_mutex_unlock(&lru_crawler_lock);
781         return -1;
782     }
783 
784     /* hash table walk only supported with metadump for now. */
785     if (ids == NULL && type != CRAWLER_METADUMP && type != CRAWLER_MGDUMP) {
786         pthread_mutex_unlock(&lru_crawler_lock);
787         return -2;
788     }
789 
790     /* Configure the module */
791     if (!is_running) {
792         assert(crawler_mod_regs[type] != NULL);
793         active_crawler_mod.mod = crawler_mod_regs[type];
794         active_crawler_type = type;
795         if (active_crawler_mod.mod->init != NULL) {
796             active_crawler_mod.mod->init(&active_crawler_mod, data);
797         }
798         if (active_crawler_mod.mod->needs_client) {
799             if (c == NULL || sfd == 0) {
800                 pthread_mutex_unlock(&lru_crawler_lock);
801                 return -2;
802             }
803             if (lru_crawler_set_client(&active_crawler_mod, c, sfd) != 0) {
804                 pthread_mutex_unlock(&lru_crawler_lock);
805                 return -2;
806             }
807         }
808     }
809 
810     if (ids == NULL) {
811         /* NULL ids means to walk the hash table instead. */
812         starts = 1;
813         /* FIXME: hack to signal hash mode to the crawler thread.
814          * Something more clear would be nice.
815          */
816         crawler_count = -1;
817     } else {
818         /* we allow the autocrawler to restart sub-LRU's before completion */
819         for (int sid = POWER_SMALLEST; sid < POWER_LARGEST; sid++) {
820             if (ids[sid])
821                 starts += do_lru_crawler_start(sid, remaining);
822         }
823     }
824     if (starts) {
825         STATS_LOCK();
826         stats_state.lru_crawler_running = true;
827         stats.lru_crawler_starts++;
828         STATS_UNLOCK();
829         pthread_cond_signal(&lru_crawler_cond);
830     }
831     pthread_mutex_unlock(&lru_crawler_lock);
832     return starts;
833 }
834 
835 /*
836  * Also only clear the crawlerstats once per sid.
837  */
lru_crawler_crawl(char * slabs,const enum crawler_run_type type,void * c,const int sfd,unsigned int remaining)838 enum crawler_result_type lru_crawler_crawl(char *slabs, const enum crawler_run_type type,
839         void *c, const int sfd, unsigned int remaining) {
840     char *b = NULL;
841     uint32_t sid = 0;
842     int starts = 0;
843     uint8_t tocrawl[POWER_LARGEST];
844     bool hash_crawl = false;
845 
846     /* FIXME: I added this while debugging. Don't think it's needed? */
847     memset(tocrawl, 0, sizeof(uint8_t) * POWER_LARGEST);
848     if (strcmp(slabs, "all") == 0) {
849         for (sid = 0; sid < POWER_LARGEST; sid++) {
850             tocrawl[sid] = 1;
851         }
852     } else if (strcmp(slabs, "hash") == 0) {
853         hash_crawl = true;
854     } else {
855         for (char *p = strtok_r(slabs, ",", &b);
856              p != NULL;
857              p = strtok_r(NULL, ",", &b)) {
858 
859             if (!safe_strtoul(p, &sid) || sid < POWER_SMALLEST
860                     || sid >= MAX_NUMBER_OF_SLAB_CLASSES) {
861                 return CRAWLER_BADCLASS;
862             }
863             tocrawl[sid | TEMP_LRU] = 1;
864             tocrawl[sid | HOT_LRU] = 1;
865             tocrawl[sid | WARM_LRU] = 1;
866             tocrawl[sid | COLD_LRU] = 1;
867         }
868     }
869 
870     starts = lru_crawler_start(hash_crawl ? NULL : tocrawl, remaining, type, NULL, c, sfd);
871     if (starts == -1) {
872         return CRAWLER_RUNNING;
873     } else if (starts == -2) {
874         return CRAWLER_ERROR; /* FIXME: not very helpful. */
875     } else if (starts) {
876         return CRAWLER_OK;
877     } else {
878         return CRAWLER_NOTSTARTED;
879     }
880 }
881 
882 /* If we hold this lock, crawler can't wake up or move */
lru_crawler_pause(void)883 void lru_crawler_pause(void) {
884     pthread_mutex_lock(&lru_crawler_lock);
885 }
886 
lru_crawler_resume(void)887 void lru_crawler_resume(void) {
888     pthread_mutex_unlock(&lru_crawler_lock);
889 }
890 
init_lru_crawler(void * arg)891 int init_lru_crawler(void *arg) {
892     if (lru_crawler_initialized == 0) {
893 #ifdef EXTSTORE
894         storage = arg;
895 #endif
896         active_crawler_mod.c.c = NULL;
897         active_crawler_mod.mod = NULL;
898         active_crawler_mod.data = NULL;
899         lru_crawler_initialized = 1;
900     }
901     return 0;
902 }
903