/* -*- Mode: C; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
#include "memcached.h"
#ifdef EXTSTORE

#include "storage.h"
#include "extstore.h"
#include <stdlib.h>
#include <stdio.h>
#include <stddef.h>
#include <string.h>
#include <limits.h>
#include <ctype.h>

#define PAGE_BUCKET_DEFAULT 0
#define PAGE_BUCKET_COMPACT 1
#define PAGE_BUCKET_CHUNKED 2
#define PAGE_BUCKET_LOWTTL  3
#define PAGE_BUCKET_COLDCOMPACT 4
#define PAGE_BUCKET_OLD     5
// Not another bucket; this is the total number of buckets.
#define PAGE_BUCKET_COUNT   6
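
// A device can dedicate its free pages to one of these buckets via the
// ext_path suffix parsed in storage_conf_parse() below; e.g. (a sketch,
// with a hypothetical path): "ext_path=/mnt/ext1:64g:coldcompact" assigns
// that file's free pages to PAGE_BUCKET_COLDCOMPACT.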

/*
 * API functions
 */
static void storage_finalize_cb(io_pending_t *pending);
static void storage_return_cb(io_pending_t *pending);

// re-cast an io_pending_t into this more descriptive structure.
// the first few items _must_ match the original struct.
typedef struct _io_pending_storage_t {
    int io_queue_type;
    LIBEVENT_THREAD *thread;
    conn *c;
    mc_resp *resp;
    io_queue_cb return_cb;    // called on worker thread.
    io_queue_cb finalize_cb;  // called back on the worker thread.
    STAILQ_ENTRY(io_pending_t) iop_next; // queue chain.
                              /* original struct ends here */
    item *hdr_it;             /* original header item. */
    obj_io io_ctx;            /* embedded extstore IO header */
    unsigned int iovec_data;  /* specific index of data iovec */
    bool noreply;             /* whether the response had noreply set */
    bool miss;                /* signal a miss to unlink hdr_it */
    bool badcrc;              /* signal a crc failure */
    bool active;              /* tells if IO was dispatched or not */
} io_pending_storage_t;

static pthread_t storage_compact_tid;
static pthread_mutex_t storage_compact_plock;
static pthread_cond_t storage_compact_cond;

// Only call this if item has ITEM_HDR
bool storage_validate_item(void *e, item *it) {
    item_hdr *hdr = (item_hdr *)ITEM_data(it);
    if (extstore_check(e, hdr->page_id, hdr->page_version) != 0) {
        return false;
    } else {
        return true;
    }
}
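
// Example usage (a sketch; the caller must hold a reference and have checked
// ITEM_HDR first, per the note above):
//
//   if (!storage_validate_item(ext_storage, it)) {
//       // The backing page was freed or reused since the header was
//       // written; treat this header as a miss.
//   }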

void storage_delete(void *e, item *it) {
    if (it->it_flags & ITEM_HDR) {
        item_hdr *hdr = (item_hdr *)ITEM_data(it);
        extstore_delete(e, hdr->page_id, hdr->page_version,
                1, ITEM_ntotal(it));
    }
}

// Function for the extra stats called from a protocol.
// NOTE: This either needs a name change or a wrapper, perhaps?
// it's defined here to reduce exposure of extstore.h to the rest of memcached
// but feels a little off being defined here.
// At very least maybe "process_storage_stats" in line with making this more
// of a generic wrapper module.
void process_extstore_stats(ADD_STAT add_stats, void *c) {
    int i;
    char key_str[STAT_KEY_LEN];
    char val_str[STAT_VAL_LEN];
    int klen = 0, vlen = 0;
    struct extstore_stats st;

    assert(add_stats);

    void *storage = ext_storage;
    if (storage == NULL) {
        return;
    }
    extstore_get_stats(storage, &st);
    st.page_data = calloc(st.page_count, sizeof(struct extstore_page_data));
    // calloc can fail; bail rather than handing extstore a NULL buffer.
    if (st.page_data == NULL) {
        return;
    }
    extstore_get_page_data(storage, &st);

    for (i = 0; i < st.page_count; i++) {
        APPEND_NUM_STAT(i, "version", "%llu",
                (unsigned long long) st.page_data[i].version);
        APPEND_NUM_STAT(i, "bytes", "%llu",
                (unsigned long long) st.page_data[i].bytes_used);
        APPEND_NUM_STAT(i, "bucket", "%u",
                st.page_data[i].bucket);
        APPEND_NUM_STAT(i, "free_bucket", "%u",
                st.page_data[i].free_bucket);
    }

    free(st.page_data);
}

// Additional storage stats for the main stats output.
void storage_stats(ADD_STAT add_stats, void *c) {
    struct extstore_stats st;
    if (ext_storage) {
        STATS_LOCK();
        APPEND_STAT("extstore_compact_lost", "%llu", (unsigned long long)stats.extstore_compact_lost);
        APPEND_STAT("extstore_compact_rescues", "%llu", (unsigned long long)stats.extstore_compact_rescues);
        APPEND_STAT("extstore_compact_resc_cold", "%llu", (unsigned long long)stats.extstore_compact_resc_cold);
        APPEND_STAT("extstore_compact_resc_old", "%llu", (unsigned long long)stats.extstore_compact_resc_old);
        APPEND_STAT("extstore_compact_skipped", "%llu", (unsigned long long)stats.extstore_compact_skipped);
        STATS_UNLOCK();
        extstore_get_stats(ext_storage, &st);
        APPEND_STAT("extstore_page_allocs", "%llu", (unsigned long long)st.page_allocs);
        APPEND_STAT("extstore_page_evictions", "%llu", (unsigned long long)st.page_evictions);
        APPEND_STAT("extstore_page_reclaims", "%llu", (unsigned long long)st.page_reclaims);
        APPEND_STAT("extstore_pages_free", "%llu", (unsigned long long)st.pages_free);
        APPEND_STAT("extstore_pages_used", "%llu", (unsigned long long)st.pages_used);
        APPEND_STAT("extstore_objects_evicted", "%llu", (unsigned long long)st.objects_evicted);
        APPEND_STAT("extstore_objects_read", "%llu", (unsigned long long)st.objects_read);
        APPEND_STAT("extstore_objects_written", "%llu", (unsigned long long)st.objects_written);
        APPEND_STAT("extstore_objects_used", "%llu", (unsigned long long)st.objects_used);
        APPEND_STAT("extstore_bytes_evicted", "%llu", (unsigned long long)st.bytes_evicted);
        APPEND_STAT("extstore_bytes_written", "%llu", (unsigned long long)st.bytes_written);
        APPEND_STAT("extstore_bytes_read", "%llu", (unsigned long long)st.bytes_read);
        APPEND_STAT("extstore_bytes_used", "%llu", (unsigned long long)st.bytes_used);
        APPEND_STAT("extstore_bytes_fragmented", "%llu", (unsigned long long)st.bytes_fragmented);
        APPEND_STAT("extstore_limit_maxbytes", "%llu", (unsigned long long)(st.page_count * st.page_size));
        APPEND_STAT("extstore_io_queue", "%llu", (unsigned long long)(st.io_queue));
    }
}

// This callback runs in the IO thread.
// TODO: Some or all of this should move to the
// io_pending's callback back in the worker thread.
// It might make sense to keep the crc32c check here though.
static void _storage_get_item_cb(void *e, obj_io *io, int ret) {
    // FIXME: assumes success
    io_pending_storage_t *p = (io_pending_storage_t *)io->data;
    mc_resp *resp = p->resp;
    conn *c = p->c;
    assert(p->active == true);
    item *read_it = (item *)io->buf;
    bool miss = false;

    // TODO: How to do counters for hit/misses?
    if (ret < 1) {
        miss = true;
    } else {
        uint32_t crc2;
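        // The crc32c was stashed in the exptime field of the on-disk copy
        // by storage_write(); the live exptime lives in the header item.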
        uint32_t crc = (uint32_t) read_it->exptime;
        int x;
        // item is chunked, crc the iov's
        if (io->iov != NULL) {
            // first iov is the header, which we don't use beyond crc
            crc2 = crc32c(0, (char *)io->iov[0].iov_base+STORE_OFFSET, io->iov[0].iov_len-STORE_OFFSET);
            // make sure it's not sent. hack :(
            io->iov[0].iov_len = 0;
            for (x = 1; x < io->iovcnt; x++) {
                crc2 = crc32c(crc2, (char *)io->iov[x].iov_base, io->iov[x].iov_len);
            }
        } else {
            crc2 = crc32c(0, (char *)read_it+STORE_OFFSET, io->len-STORE_OFFSET);
        }

        if (crc != crc2) {
            miss = true;
            p->badcrc = true;
        }
    }

    if (miss) {
        if (p->noreply) {
            // In all GET cases, noreply means we send nothing back.
            resp->skip = true;
        } else {
            // TODO: This should be movable to the worker thread.
            // Convert the binprot response into a miss response.
            // The header requires knowing a bunch of stateful crap, so rather
            // than simply writing out a "new" miss response we mangle what's
            // already there.
            if (c->protocol == binary_prot) {
                protocol_binary_response_header *header =
                    (protocol_binary_response_header *)resp->wbuf;

                // cut the extra nbytes off of the body_len
                uint32_t body_len = ntohl(header->response.bodylen);
                uint8_t hdr_len = header->response.extlen;
                body_len -= resp->iov[p->iovec_data].iov_len + hdr_len;
                resp->tosend -= resp->iov[p->iovec_data].iov_len + hdr_len;
                header->response.extlen = 0;
                header->response.status = (uint16_t)htons(PROTOCOL_BINARY_RESPONSE_KEY_ENOENT);
                header->response.bodylen = htonl(body_len);

                // truncate the data response.
                resp->iov[p->iovec_data].iov_len = 0;
                // wipe the extlen iov... wish it was just a flat buffer.
                resp->iov[p->iovec_data-1].iov_len = 0;
                resp->chunked_data_iov = 0;
            } else {
                int i;
                // Meta commands have EN status lines for miss, rather than
                // END as a trailer as per normal ascii.
                if (resp->iov[0].iov_len >= 3
                        && memcmp(resp->iov[0].iov_base, "VA ", 3) == 0) {
                    // TODO: These miss translators should use specific callback
                    // functions attached to the io wrap. This is weird :(
                    resp->iovcnt = 1;
                    resp->iov[0].iov_len = 4;
                    resp->iov[0].iov_base = "EN\r\n";
                    resp->tosend = 4;
                } else {
                    // Wipe the iovecs up through our data injection.
                    // Allows trailers to be returned (END)
                    for (i = 0; i <= p->iovec_data; i++) {
                        resp->tosend -= resp->iov[i].iov_len;
                        resp->iov[i].iov_len = 0;
                        resp->iov[i].iov_base = NULL;
                    }
                }
                resp->chunked_total = 0;
                resp->chunked_data_iov = 0;
            }
        }
        p->miss = true;
    } else {
        assert(read_it->slabs_clsid != 0);
        // TODO: should always use it instead of ITEM_data to kill more
        // chunked special casing.
        if ((read_it->it_flags & ITEM_CHUNKED) == 0) {
            resp->iov[p->iovec_data].iov_base = ITEM_data(read_it);
        }
        p->miss = false;
    }

    p->active = false;
    //assert(c->io_wrapleft >= 0);

    return_io_pending((io_pending_t *)p);
}

int storage_get_item(conn *c, item *it, mc_resp *resp) {
#ifdef NEED_ALIGN
    item_hdr hdr;
    memcpy(&hdr, ITEM_data(it), sizeof(hdr));
#else
    item_hdr *hdr = (item_hdr *)ITEM_data(it);
#endif
    io_queue_t *q = conn_io_queue_get(c, IO_QUEUE_EXTSTORE);
    size_t ntotal = ITEM_ntotal(it);
    unsigned int clsid = slabs_clsid(ntotal);
    item *new_it;
    bool chunked = false;
    if (ntotal > settings.slab_chunk_size_max) {
        // Pull a chunked item header.
        client_flags_t flags;
        FLAGS_CONV(it, flags);
        new_it = item_alloc(ITEM_key(it), it->nkey, flags, it->exptime, it->nbytes);
        assert(new_it == NULL || (new_it->it_flags & ITEM_CHUNKED));
        chunked = true;
    } else {
        new_it = do_item_alloc_pull(ntotal, clsid);
    }
    if (new_it == NULL)
        return -1;
    // so we can free the chunk on a miss
    new_it->slabs_clsid = clsid;

    io_pending_storage_t *p = do_cache_alloc(c->thread->io_cache);
    // this is a re-cast structure, so assert that we never outsize it.
    assert(sizeof(io_pending_t) >= sizeof(io_pending_storage_t));
    memset(p, 0, sizeof(io_pending_storage_t));
    p->active = true;
    p->miss = false;
    p->badcrc = false;
    p->noreply = c->noreply;
    p->thread = c->thread;
    p->return_cb = storage_return_cb;
    p->finalize_cb = storage_finalize_cb;
    // io_pending owns the reference for this object now.
    p->hdr_it = it;
    p->resp = resp;
    p->io_queue_type = IO_QUEUE_EXTSTORE;
    obj_io *eio = &p->io_ctx;

    // FIXME: error handling.
    if (chunked) {
        unsigned int ciovcnt = 0;
        size_t remain = new_it->nbytes;
        item_chunk *chunk = (item_chunk *) ITEM_schunk(new_it);
        // TODO: This might make sense as a _global_ cache vs a per-thread.
        // but we still can't load objects requiring > IOV_MAX iovs.
        // In the meantime, these objects are rare/slow enough that
        // malloc/freeing a statically sized object won't cause us much pain.
        eio->iov = malloc(sizeof(struct iovec) * IOV_MAX);
        if (eio->iov == NULL) {
            item_remove(new_it);
            do_cache_free(c->thread->io_cache, p);
            return -1;
        }

        // fill the header so we can get the full data + crc back.
        eio->iov[0].iov_base = new_it;
        eio->iov[0].iov_len = ITEM_ntotal(new_it) - new_it->nbytes;
        ciovcnt++;

        while (remain > 0) {
            chunk = do_item_alloc_chunk(chunk, remain);
            // FIXME: _pure evil_, silently erroring if item is too large.
            if (chunk == NULL || ciovcnt > IOV_MAX-1) {
                item_remove(new_it);
                free(eio->iov);
                // TODO: wrapper function for freeing up an io wrap?
                eio->iov = NULL;
                do_cache_free(c->thread->io_cache, p);
                return -1;
            }
            eio->iov[ciovcnt].iov_base = chunk->data;
            eio->iov[ciovcnt].iov_len = (remain < chunk->size) ? remain : chunk->size;
            chunk->used = (remain < chunk->size) ? remain : chunk->size;
            remain -= chunk->size;
            ciovcnt++;
        }

        eio->iovcnt = ciovcnt;
    }

    // Chunked or non chunked we reserve a response iov here.
    p->iovec_data = resp->iovcnt;
    int iovtotal = (c->protocol == binary_prot) ? it->nbytes - 2 : it->nbytes;
    if (chunked) {
        resp_add_chunked_iov(resp, new_it, iovtotal);
    } else {
        resp_add_iov(resp, "", iovtotal);
    }

    // We can't bail out anymore, so mc_resp owns the IO from here.
    resp->io_pending = (io_pending_t *)p;

    eio->buf = (void *)new_it;
    p->c = c;

    // We need to stack the sub-struct IO's together for submission.
    eio->next = q->stack_ctx;
    q->stack_ctx = eio;

    // No need to stack the io_pending's together as they live on mc_resp's.
    assert(q->count >= 0);
    q->count++;
    // reference ourselves for the callback.
    eio->data = (void *)p;

    // Now, fill in io->io based on what was in our header.
#ifdef NEED_ALIGN
    eio->page_version = hdr.page_version;
    eio->page_id = hdr.page_id;
    eio->offset = hdr.offset;
#else
    eio->page_version = hdr->page_version;
    eio->page_id = hdr->page_id;
    eio->offset = hdr->offset;
#endif
    eio->len = ntotal;
    eio->mode = OBJ_IO_READ;
    eio->cb = _storage_get_item_cb;

    // FIXME: This stat needs to move to reflect # of flash hits vs misses
    // for now it's a good gauge on how often we request out to flash at
    // least.
    pthread_mutex_lock(&c->thread->stats.mutex);
    c->thread->stats.get_extstore++;
    pthread_mutex_unlock(&c->thread->stats.mutex);

    return 0;
}
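
// Read lifecycle, for reference: storage_get_item() queues an obj_io on the
// connection's IO queue; storage_submit_cb() hands the stack to extstore;
// _storage_get_item_cb() runs on an IO thread and hands the result back to
// the worker via return_io_pending(); storage_return_cb() and
// storage_finalize_cb() then run on the worker thread.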

void storage_submit_cb(io_queue_t *q) {
    // Don't need to do anything special for extstore.
    extstore_submit(q->ctx, q->stack_ctx);

    // need to reset the stack for next use.
    q->stack_ctx = NULL;
}

// Runs locally in worker thread.
static void recache_or_free(io_pending_t *pending) {
    // re-cast to our specific struct.
    io_pending_storage_t *p = (io_pending_storage_t *)pending;

    conn *c = p->c;
    obj_io *io = &p->io_ctx;
    assert(io != NULL);
    item *it = (item *)io->buf;
    assert(c != NULL);
    bool do_free = true;
    if (p->active) {
        // If request never dispatched, free the read buffer but leave the
        // item header alone.
        do_free = false;
        size_t ntotal = ITEM_ntotal(p->hdr_it);
        slabs_free(it, ntotal, slabs_clsid(ntotal));

        io_queue_t *q = conn_io_queue_get(c, p->io_queue_type);
        q->count--;
        assert(q->count >= 0);
        pthread_mutex_lock(&c->thread->stats.mutex);
        c->thread->stats.get_aborted_extstore++;
        pthread_mutex_unlock(&c->thread->stats.mutex);
    } else if (p->miss) {
        // If request was ultimately a miss, unlink the header.
        do_free = false;
        size_t ntotal = ITEM_ntotal(p->hdr_it);
        item_unlink(p->hdr_it);
        slabs_free(it, ntotal, slabs_clsid(ntotal));
        pthread_mutex_lock(&c->thread->stats.mutex);
        c->thread->stats.miss_from_extstore++;
        if (p->badcrc)
            c->thread->stats.badcrc_from_extstore++;
        pthread_mutex_unlock(&c->thread->stats.mutex);
    } else if (settings.ext_recache_rate) {
        // hashvalue is cuddled during store
        uint32_t hv = (uint32_t)it->time;
        // opt to throw away rather than wait on a lock.
        void *hold_lock = item_trylock(hv);
        if (hold_lock != NULL) {
            item *h_it = p->hdr_it;
            uint8_t flags = ITEM_LINKED|ITEM_FETCHED|ITEM_ACTIVE;
            // Item must be recently hit at least twice to recache.
            if (((h_it->it_flags & flags) == flags) &&
                    h_it->time > current_time - ITEM_UPDATE_INTERVAL &&
                    c->recache_counter++ % settings.ext_recache_rate == 0) {
                do_free = false;
                // In case it's been updated.
                it->exptime = h_it->exptime;
                it->it_flags &= ~ITEM_LINKED;
                it->refcount = 0;
                it->h_next = NULL; // might not be necessary.
                STORAGE_delete(c->thread->storage, h_it);
                item_replace(h_it, it, hv, ITEM_get_cas(h_it));
                pthread_mutex_lock(&c->thread->stats.mutex);
                c->thread->stats.recache_from_extstore++;
                pthread_mutex_unlock(&c->thread->stats.mutex);
            }
        }
        if (hold_lock)
            item_trylock_unlock(hold_lock);
    }
    if (do_free)
        slabs_free(it, ITEM_ntotal(it), ITEM_clsid(it));

    p->io_ctx.buf = NULL;
    p->io_ctx.next = NULL;
    p->active = false;

    // TODO: reuse lock and/or hv.
    item_remove(p->hdr_it);
}

// Called after an IO has been returned to the worker thread.
static void storage_return_cb(io_pending_t *pending) {
    io_queue_t *q = conn_io_queue_get(pending->c, pending->io_queue_type);
    q->count--;
    if (q->count == 0) {
        conn_worker_readd(pending->c);
    }
}

// Called after responses have been transmitted. Need to free up related data.
static void storage_finalize_cb(io_pending_t *pending) {
    recache_or_free(pending);
    io_pending_storage_t *p = (io_pending_storage_t *)pending;
    obj_io *io = &p->io_ctx;
    // malloc'ed iovec list used for chunked extstore fetches.
    if (io->iov) {
        free(io->iov);
        io->iov = NULL;
    }
    // don't need to free the main context, since it's embedded.
}

/*
 * WRITE FLUSH THREAD
 */

static int storage_write(void *storage, const int clsid, const int item_age) {
    int did_moves = 0;
    struct lru_pull_tail_return it_info;

    it_info.it = NULL;
    lru_pull_tail(clsid, COLD_LRU, 0, LRU_PULL_RETURN_ITEM, 0, &it_info);
    /* Item is locked, and we have a reference to it. */
    if (it_info.it == NULL) {
        return did_moves;
    }

    obj_io io;
    item *it = it_info.it;
    /* First, storage for the header object */
    size_t orig_ntotal = ITEM_ntotal(it);
    client_flags_t flags;
    if ((it->it_flags & ITEM_HDR) == 0 &&
            (item_age == 0 || current_time - it->time > item_age)) {
        FLAGS_CONV(it, flags);
        item *hdr_it = do_item_alloc(ITEM_key(it), it->nkey, flags, it->exptime, sizeof(item_hdr));
        /* Run the storage write understanding the start of the item is dirty.
         * We will fill it (time/exptime/etc) from the header item on read.
         */
        if (hdr_it != NULL) {
            int bucket = (it->it_flags & ITEM_CHUNKED) ?
                PAGE_BUCKET_CHUNKED : PAGE_BUCKET_DEFAULT;
            // Compress soon to expire items into similar pages.
            if (it->exptime - current_time < settings.ext_low_ttl) {
                bucket = PAGE_BUCKET_LOWTTL;
            }
            hdr_it->it_flags |= ITEM_HDR;
            io.len = orig_ntotal;
            io.mode = OBJ_IO_WRITE;
            // NOTE: when the item is read back in, the slab mover
            // may see it. Important to have refcount>=2 or ~ITEM_LINKED
            assert(it->refcount >= 2);
            // NOTE: write bucket vs free page bucket will disambiguate once
            // lowttl feature is better understood.
            if (extstore_write_request(storage, bucket, bucket, &io) == 0) {
                // cuddle the hash value into the time field so we don't have
                // to recalculate it.
                item *buf_it = (item *) io.buf;
                buf_it->time = it_info.hv;
                // copy from past the headers + time headers.
                // TODO: should be in items.c
                if (it->it_flags & ITEM_CHUNKED) {
                    // Need to loop through the item and copy
                    item_chunk *sch = (item_chunk *) ITEM_schunk(it);
                    int remain = orig_ntotal;
                    int copied = 0;
                    // copy original header
                    int hdrtotal = ITEM_ntotal(it) - it->nbytes;
                    memcpy((char *)io.buf+STORE_OFFSET, (char *)it+STORE_OFFSET, hdrtotal - STORE_OFFSET);
                    copied = hdrtotal;
                    // copy data in like it were one large object.
                    while (sch && remain) {
                        assert(remain >= sch->used);
                        memcpy((char *)io.buf+copied, sch->data, sch->used);
                        // FIXME: use one variable?
                        remain -= sch->used;
                        copied += sch->used;
                        sch = sch->next;
                    }
                } else {
                    memcpy((char *)io.buf+STORE_OFFSET, (char *)it+STORE_OFFSET, io.len-STORE_OFFSET);
                }
                // crc what we copied so we can do it sequentially.
                buf_it->it_flags &= ~ITEM_LINKED;
                buf_it->exptime = crc32c(0, (char*)io.buf+STORE_OFFSET, orig_ntotal-STORE_OFFSET);
                extstore_write(storage, &io);
                item_hdr *hdr = (item_hdr *) ITEM_data(hdr_it);
                hdr->page_version = io.page_version;
                hdr->page_id = io.page_id;
                hdr->offset  = io.offset;
                // overload nbytes for the header it
                hdr_it->nbytes = it->nbytes;
                /* success! Now we need to fill relevant data into the new
                 * header and replace. Most of this requires the item lock
                 */
                /* CAS gets set while linking. Copy post-replace */
                item_replace(it, hdr_it, it_info.hv, ITEM_get_cas(it));
                do_item_remove(hdr_it);
                did_moves = 1;
                LOGGER_LOG(NULL, LOG_EVICTIONS, LOGGER_EXTSTORE_WRITE, it, bucket);
            } else {
                /* Failed to write for some reason, can't continue. */
                slabs_free(hdr_it, ITEM_ntotal(hdr_it), ITEM_clsid(hdr_it));
            }
        }
    }
    do_item_remove(it);
    item_unlock(it_info.hv);
    return did_moves;
}

static pthread_t storage_write_tid;
static pthread_mutex_t storage_write_plock;
#define WRITE_SLEEP_MIN 200
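// WRITE_SLEEP_MIN is in microseconds; to_sleep below feeds usleep() and
// grows toward settings.ext_max_sleep while the write thread is idle.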

static void *storage_write_thread(void *arg) {
    void *storage = arg;
    // NOTE: ignoring overflow since that would take years of uptime in a
    // specific load pattern of never going to sleep.
    unsigned int backoff[MAX_NUMBER_OF_SLAB_CLASSES] = {0};
    unsigned int counter = 0;
    useconds_t to_sleep = WRITE_SLEEP_MIN;
    logger *l = logger_create();
    if (l == NULL) {
        fprintf(stderr, "Failed to allocate logger for storage write thread\n");
        abort();
    }

    pthread_mutex_lock(&storage_write_plock);
    // The compaction checker is CPU intensive, so we do a loose fudging to
    // only activate it once every "slab page size" worth of bytes written.
    // I was calling the compact checker once per run through this main loop,
    // but we can end up doing lots of short loops without sleeping and end up
    // calling the compact checker pretty frequently.
    int check_compact = settings.slab_page_size;

    while (1) {
        // cache per-loop to avoid calls to the slabs_clsid() search loop
        int min_class = slabs_clsid(settings.ext_item_size);
        unsigned int global_pages = global_page_pool_size(NULL);
        bool do_sleep = true;
        int target_pages = 0;
        if (global_pages < settings.ext_global_pool_min) {
            target_pages = settings.ext_global_pool_min - global_pages;
        }
        counter++;
        if (to_sleep > settings.ext_max_sleep)
            to_sleep = settings.ext_max_sleep;

        for (int x = 0; x < MAX_NUMBER_OF_SLAB_CLASSES; x++) {
            bool did_move = false;
            bool mem_limit_reached = false;
            unsigned int chunks_free;
            int item_age;

            if (min_class > x || (backoff[x] && (counter % backoff[x] != 0))) {
                continue;
            }

            // Avoid extra slab lock calls during heavy writing.
            unsigned int chunks_perpage = 0;
            chunks_free = slabs_available_chunks(x, &mem_limit_reached,
                    &chunks_perpage);

            if (chunks_perpage == 0) {
                // no slab class here, skip.
                continue;
            }
            unsigned int target = chunks_perpage * target_pages;
            // Loose estimate for cutting the calls to compacter
            unsigned int chunk_size = settings.slab_page_size / chunks_perpage;

            // storage_write() will fail and cut loop after filling write buffer.
            while (1) {
                // if we are low on chunks and no spare, push out early.
                if (chunks_free < target) {
                    item_age = 0;
                } else {
                    item_age = settings.ext_item_age;
                }
                if (storage_write(storage, x, item_age)) {
                    chunks_free++; // Allow stopping if we've done enough this loop
                    check_compact -= chunk_size;
                    // occasionally kick the compact checker.
                    if (check_compact < 0) {
                        pthread_cond_signal(&storage_compact_cond);
                        check_compact = settings.slab_page_size;
                    }
                    did_move = true;
                    do_sleep = false;
                    if (to_sleep > WRITE_SLEEP_MIN)
                        to_sleep /= 2;
                } else {
                    break;
                }
            }

            if (!did_move) {
                backoff[x]++;
            } else {
                backoff[x] = 1;
            }
        }

        // flip lock so we can be paused or stopped
        pthread_mutex_unlock(&storage_write_plock);
        if (do_sleep) {
            // Only do backoffs on other slab classes if we're actively
            // flushing at least one class.
            for (int x = 0; x < MAX_NUMBER_OF_SLAB_CLASSES; x++) {
                backoff[x] = 1;
            }

            // call the compact checker occasionally even if we're just
            // sleeping.
            check_compact -= to_sleep * 10;
            if (check_compact < 0) {
                pthread_cond_signal(&storage_compact_cond);
                check_compact = settings.slab_page_size;
            }

            usleep(to_sleep);
            to_sleep++;
        }
        pthread_mutex_lock(&storage_write_plock);
    }
    return NULL;
}

// TODO
// logger needs logger_destroy() to exist/work before this is safe.
/*int stop_storage_write_thread(void) {
    int ret;
    pthread_mutex_lock(&lru_maintainer_lock);
    do_run_lru_maintainer_thread = 0;
    pthread_mutex_unlock(&lru_maintainer_lock);
    // WAKEUP SIGNAL
    if ((ret = pthread_join(lru_maintainer_tid, NULL)) != 0) {
        fprintf(stderr, "Failed to stop LRU maintainer thread: %s\n", strerror(ret));
        return -1;
    }
    settings.lru_maintainer_thread = false;
    return 0;
}*/

void storage_write_pause(void) {
    pthread_mutex_lock(&storage_write_plock);
}

void storage_write_resume(void) {
    pthread_mutex_unlock(&storage_write_plock);
}

int start_storage_write_thread(void *arg) {
    int ret;

    pthread_mutex_init(&storage_write_plock, NULL);
    if ((ret = pthread_create(&storage_write_tid, NULL,
        storage_write_thread, arg)) != 0) {
        fprintf(stderr, "Can't create storage_write thread: %s\n",
            strerror(ret));
        return -1;
    }
    thread_setname(storage_write_tid, "mc-ext-write");

    return 0;
}

/*** COMPACTOR ***/
typedef struct __storage_buk {
    unsigned int bucket;
    unsigned int low_page;
    unsigned int lowest_page;
    uint64_t low_version;
    uint64_t lowest_version;
    unsigned int pages_free;
    unsigned int pages_used;
    unsigned int pages_total;
    unsigned int bytes_fragmented; // fragmented bytes for low page
    bool do_compact; // indicate this bucket should do a compaction.
    bool do_compact_drop;
} _storage_buk;

struct _compact_flags {
    unsigned int drop_unread : 1;
    unsigned int has_coldcompact : 1;
    unsigned int has_old : 1;
    unsigned int use_old : 1;
};
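
// How these bits are used (see storage_compact_check() and
// storage_compact_readback() below):
//   drop_unread:     nothing was defraggable and ext_drop_unread is set, so
//                    unread COLD items may be dropped instead of rescued.
//   has_coldcompact: pages are dedicated to PAGE_BUCKET_COLDCOMPACT; cold
//                    items get rescued into that separate stream.
//   has_old/use_old: pages are dedicated to PAGE_BUCKET_OLD; use_old routes
//                    rescues from the oldest page into the OLD stream.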

/* Fetch stats from the external storage system and decide to compact.
 */
static int storage_compact_check(void *storage, logger *l,
        uint32_t *page_id, uint64_t *page_version,
        uint64_t *page_size, struct _compact_flags *flags) {
    struct extstore_stats st;
    _storage_buk buckets[PAGE_BUCKET_COUNT];
    _storage_buk *buk = NULL;
    uint64_t frag_limit;
    extstore_get_stats(storage, &st);
    if (st.pages_used == 0)
        return 0;

    for (int x = 0; x < PAGE_BUCKET_COUNT; x++) {
        memset(&buckets[x], 0, sizeof(_storage_buk));
        buckets[x].low_version = ULLONG_MAX;
        buckets[x].lowest_version = ULLONG_MAX;
    }
    flags->drop_unread = 0;

    frag_limit = st.page_size * settings.ext_max_frag;
    LOGGER_LOG(l, LOG_SYSEVENTS, LOGGER_COMPACT_FRAGINFO,
            NULL, settings.ext_max_frag, frag_limit);
    st.page_data = calloc(st.page_count, sizeof(struct extstore_page_data));
    // calloc can fail; treat it as "nothing to compact" rather than crashing.
    if (st.page_data == NULL) {
        return 0;
    }
    extstore_get_page_data(storage, &st);

    // find either the most fragmented page or the lowest version.
    for (int x = 0; x < st.page_count; x++) {
        buk = &buckets[st.page_data[x].free_bucket];
        buk->pages_total++;
        if (st.page_data[x].version == 0) {
            buk->pages_free++;
            // free pages don't contribute after this point.
            continue;
        } else {
            buk->pages_used++;
        }

        // skip pages actively being used.
        if (st.page_data[x].active) {
            continue;
        }

        if (st.page_data[x].version < buk->lowest_version) {
            buk->lowest_page = x;
            buk->lowest_version = st.page_data[x].version;
        }
        // track the most fragmented page.
        unsigned int frag = st.page_size - st.page_data[x].bytes_used;
        if (st.page_data[x].bytes_used < frag_limit && frag > buk->bytes_fragmented) {
            buk->low_page = x;
            buk->low_version = st.page_data[x].version;
            buk->bytes_fragmented = frag;
        }
    }
    *page_size = st.page_size;
    free(st.page_data);

    buk = &buckets[PAGE_BUCKET_COLDCOMPACT];
    if (buk->pages_total != 0) {
        flags->has_coldcompact = 1;
        if (buk->pages_free == 0 && buk->lowest_version != ULLONG_MAX) {
            extstore_evict_page(storage, buk->lowest_page, buk->lowest_version);
            return 0;
        }
    }

    buk = &buckets[PAGE_BUCKET_OLD];
    if (buk->pages_total != 0) {
        flags->has_old = 1;
        if (buk->pages_free == 0 && buk->lowest_version != ULLONG_MAX) {
            extstore_evict_page(storage, buk->lowest_page, buk->lowest_version);
            return 0;
        }
    }

    for (int x = 0; x < PAGE_BUCKET_COUNT; x++) {
        buk = &buckets[x];
        assert(buk->pages_total == (buk->pages_used + buk->pages_free));
        unsigned int pages_total = buk->pages_total;
        // only process buckets which have dedicated pages assigned.
        // LOWTTL skips compaction.
        if (pages_total == 0 || x == PAGE_BUCKET_LOWTTL)
            continue;

        if (buk->pages_free < settings.ext_compact_under) {
            if (buk->low_version != ULLONG_MAX) {
                // found a normally defraggable page.
                *page_id = buk->low_page;
                *page_version = buk->low_version;
                return 1;
            } else if (buk->pages_free < settings.ext_drop_under
                    && buk->lowest_version != ULLONG_MAX) {

                if (x == PAGE_BUCKET_COLDCOMPACT || x == PAGE_BUCKET_OLD) {
                    // this freeing technique doesn't apply to these buckets.
                    // instead these buckets are eviction or normal
                    // defragmentation only.
                    continue;
                }
                // Nothing defraggable. Check for other usable conditions.
                if (settings.ext_drop_unread) {
                    flags->drop_unread = 1;
                }

                // If OLD and/or COLDCOMPACT pages exist we should always have
                // one free page in those buckets, so we can always attempt to
                // defrag into them.
                // If only COLDCOMPACT exists this will attempt to segment off
                // parts of a page that haven't been used.
                // If OLD exists everything else in this "oldest page" goes
                // into the OLD stream.
                if (flags->drop_unread || flags->has_coldcompact || flags->has_old) {
                    // only actually use the old flag if we can't compact.
                    flags->use_old = flags->has_old;
                    *page_id = buk->lowest_page;
                    *page_version = buk->lowest_version;
                    return 1;
                }
            }
        }
    }

    return 0;
}

#define MIN_STORAGE_COMPACT_SLEEP 1000
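// Microseconds; a short usleep() between completed page compactions.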

struct storage_compact_wrap {
    obj_io io;
    pthread_mutex_t lock; // gates the bools.
    bool done;
    bool submitted;
    bool miss; // version flipped out from under us
};

static void storage_compact_readback(void *storage, logger *l,
        struct _compact_flags flags, char *readback_buf,
        uint32_t page_id, uint64_t page_version, uint32_t page_offset, uint64_t read_size) {
    uint64_t offset = 0;
    unsigned int rescues = 0;
    unsigned int lost = 0;
    unsigned int skipped = 0;
    unsigned int rescue_cold = 0;
    unsigned int rescue_old = 0;

    while (offset < read_size) {
        item *hdr_it = NULL;
        item_hdr *hdr = NULL;
        item *it = (item *)(readback_buf+offset);
        unsigned int ntotal;
        // probably zeroed out junk at the end of the wbuf
        if (it->nkey == 0) {
            break;
        }

        ntotal = ITEM_ntotal(it);
        uint32_t hv = (uint32_t)it->time;
        item_lock(hv);
        // We don't have a conn and don't need to do most of do_item_get
        hdr_it = assoc_find(ITEM_key(it), it->nkey, hv);
        if (hdr_it != NULL) {
            bool do_write = false;
            int bucket = flags.use_old ? PAGE_BUCKET_OLD : PAGE_BUCKET_COMPACT;
            refcount_incr(hdr_it);

            // Check validity but don't bother removing it.
            if ((hdr_it->it_flags & ITEM_HDR) && !item_is_flushed(hdr_it) &&
                   (hdr_it->exptime == 0 || hdr_it->exptime > current_time)) {
                hdr = (item_hdr *)ITEM_data(hdr_it);
                if (hdr->page_id == page_id && hdr->page_version == page_version
                        && hdr->offset == (int)offset + page_offset) {
                    // Item header is still completely valid.
                    extstore_delete(storage, page_id, page_version, 1, ntotal);
                    // special case inactive items.
                    do_write = true;
                    if (GET_LRU(hdr_it->slabs_clsid) == COLD_LRU) {
                        if (flags.has_coldcompact) {
                            // Write the cold items to a different stream.
                            bucket = PAGE_BUCKET_COLDCOMPACT;
                        } else if (flags.drop_unread) {
                            do_write = false;
                            skipped++;
                        }
                    }
                }
            }

            if (do_write) {
                bool do_update = false;
                int tries;
                obj_io io;
                io.len = ntotal;
                io.mode = OBJ_IO_WRITE;
                for (tries = 10; tries > 0; tries--) {
                    if (extstore_write_request(storage, bucket, bucket, &io) == 0) {
                        memcpy(io.buf, it, io.len);
                        extstore_write(storage, &io);
                        do_update = true;
                        break;
                    } else {
                        usleep(1000);
                    }
                }

                if (do_update) {
                    bool rescued = false;
                    if (it->refcount == 2) {
                        hdr->page_version = io.page_version;
                        hdr->page_id = io.page_id;
                        hdr->offset = io.offset;
                        rescued = true;
                    } else {
                        // re-alloc and replace header.
                        client_flags_t flags;
                        FLAGS_CONV(hdr_it, flags);
                        item *new_it = do_item_alloc(ITEM_key(hdr_it), hdr_it->nkey, flags, hdr_it->exptime, sizeof(item_hdr));
                        if (new_it) {
                            // need to preserve the original item flags, but we
                            // start unlinked, with linked being added during
                            // item_replace below.
                            new_it->it_flags = hdr_it->it_flags & (~ITEM_LINKED);
                            new_it->time = hdr_it->time;
                            new_it->nbytes = hdr_it->nbytes;

                            // copy the hdr data.
                            item_hdr *new_hdr = (item_hdr *) ITEM_data(new_it);
                            new_hdr->page_version = io.page_version;
                            new_hdr->page_id = io.page_id;
                            new_hdr->offset = io.offset;

                            // replace the item in the hash table.
                            item_replace(hdr_it, new_it, hv, ITEM_get_cas(hdr_it));
                            do_item_remove(new_it); // release our reference.
                            rescued = true;
                        } else {
                            lost++;
                        }
                    }

                    if (rescued) {
                        rescues++;
                        if (bucket == PAGE_BUCKET_COLDCOMPACT) {
                            rescue_cold++;
                        } else if (bucket == PAGE_BUCKET_OLD) {
                            rescue_old++;
                        }
                    }
                } else {
                    lost++;
                }
            }

            do_item_remove(hdr_it);
        }

        item_unlock(hv);
        offset += ntotal;
        if (read_size - offset < sizeof(struct _stritem))
            break;
    }

    STATS_LOCK();
    stats.extstore_compact_lost += lost;
    stats.extstore_compact_rescues += rescues;
    stats.extstore_compact_skipped += skipped;
    stats.extstore_compact_resc_cold += rescue_cold;
    stats.extstore_compact_resc_old += rescue_old;
    STATS_UNLOCK();
    LOGGER_LOG(l, LOG_SYSEVENTS, LOGGER_COMPACT_READ_END,
            NULL, page_id, offset, rescues, lost, skipped);
}

// wrap lock is held while waiting for this callback, preventing caller thread
// from fast-looping.
static void _storage_compact_cb(void *e, obj_io *io, int ret) {
    struct storage_compact_wrap *wrap = (struct storage_compact_wrap *)io->data;
    assert(wrap->submitted == true);

    if (ret < 1) {
        wrap->miss = true;
    }
    wrap->done = true;

    pthread_mutex_unlock(&wrap->lock);
}

// TODO: hoist the storage bits from lru_maintainer_thread in here.
// would be nice if they could avoid hammering the same locks though?
// I guess it's only COLD. that's probably fine.
static void *storage_compact_thread(void *arg) {
    void *storage = arg;
    bool compacting = false;
    uint64_t page_version = 0;
    uint64_t page_size = 0;
    uint32_t page_offset = 0;
    uint32_t page_id = 0;
    struct _compact_flags flags;
    char *readback_buf = NULL;
    struct storage_compact_wrap wrap;
    memset(&flags, 0, sizeof(flags));

    logger *l = logger_create();
    if (l == NULL) {
        fprintf(stderr, "Failed to allocate logger for storage compaction thread\n");
        abort();
    }

    readback_buf = malloc(settings.ext_wbuf_size);
    if (readback_buf == NULL) {
        fprintf(stderr, "Failed to allocate readback buffer for storage compaction thread\n");
        abort();
    }

    pthread_mutex_init(&wrap.lock, NULL);
    wrap.done = false;
    wrap.submitted = false;
    wrap.io.data = &wrap;
    wrap.io.iov = NULL;
    wrap.io.buf = (void *)readback_buf;

    wrap.io.len = settings.ext_wbuf_size;
    wrap.io.mode = OBJ_IO_READ;
    wrap.io.cb = _storage_compact_cb;
    pthread_mutex_lock(&storage_compact_plock);

    while (1) {
        if (!compacting && storage_compact_check(storage, l,
                    &page_id, &page_version, &page_size, &flags)) {
            page_offset = 0;
            compacting = true;
            LOGGER_LOG(l, LOG_SYSEVENTS, LOGGER_COMPACT_START,
                    NULL, page_id, page_version);
        } else {
            pthread_cond_wait(&storage_compact_cond, &storage_compact_plock);
        }

        while (compacting) {
            pthread_mutex_lock(&wrap.lock);
            if (page_offset < page_size && !wrap.done && !wrap.submitted) {
                wrap.io.page_version = page_version;
                wrap.io.page_id = page_id;
                wrap.io.offset = page_offset;
                // FIXME: should be smarter about io->next (unlink at use?)
                wrap.io.next = NULL;
                wrap.submitted = true;
                wrap.miss = false;

                extstore_submit_bg(storage, &wrap.io);
            } else if (wrap.miss) {
                LOGGER_LOG(l, LOG_SYSEVENTS, LOGGER_COMPACT_ABORT,
                        NULL, page_id);
                wrap.done = false;
                wrap.submitted = false;
                compacting = false;
            } else if (wrap.submitted && wrap.done) {
                LOGGER_LOG(l, LOG_SYSEVENTS, LOGGER_COMPACT_READ_START,
                        NULL, page_id, page_offset);
                storage_compact_readback(storage, l, flags,
                        readback_buf, page_id, page_version, page_offset,
                        settings.ext_wbuf_size);
                page_offset += settings.ext_wbuf_size;
                wrap.done = false;
                wrap.submitted = false;
            } else if (page_offset >= page_size) {
                compacting = false;
                wrap.done = false;
                wrap.submitted = false;
                extstore_close_page(storage, page_id, page_version);
                LOGGER_LOG(l, LOG_SYSEVENTS, LOGGER_COMPACT_END,
                        NULL, page_id);
                // short cooling period between defragmentation runs.
                usleep(MIN_STORAGE_COMPACT_SLEEP);
            }
            pthread_mutex_unlock(&wrap.lock);
        }
    }
    free(readback_buf);

    return NULL;
}

// TODO
// logger needs logger_destroy() to exist/work before this is safe.
/*int stop_storage_compact_thread(void) {
    int ret;
    pthread_mutex_lock(&lru_maintainer_lock);
    do_run_lru_maintainer_thread = 0;
    pthread_mutex_unlock(&lru_maintainer_lock);
    if ((ret = pthread_join(lru_maintainer_tid, NULL)) != 0) {
        fprintf(stderr, "Failed to stop LRU maintainer thread: %s\n", strerror(ret));
        return -1;
    }
    settings.lru_maintainer_thread = false;
    return 0;
}*/

void storage_compact_pause(void) {
    pthread_mutex_lock(&storage_compact_plock);
}

void storage_compact_resume(void) {
    pthread_mutex_unlock(&storage_compact_plock);
}

int start_storage_compact_thread(void *arg) {
    int ret;

    pthread_mutex_init(&storage_compact_plock, NULL);
    pthread_cond_init(&storage_compact_cond, NULL);
    if ((ret = pthread_create(&storage_compact_tid, NULL,
        storage_compact_thread, arg)) != 0) {
        fprintf(stderr, "Can't create storage_compact thread: %s\n",
            strerror(ret));
        return -1;
    }
    thread_setname(storage_compact_tid, "mc-ext-compact");

    return 0;
}

/*** UTILITY ***/
// /path/to/file:100G:bucket1
// FIXME: Modifies argument. copy instead?
struct extstore_conf_file *storage_conf_parse(char *arg, unsigned int page_size) {
    struct extstore_conf_file *cf = NULL;
    char *b = NULL;
    char *p = strtok_r(arg, ":", &b);
    char unit = 0;
    uint64_t multiplier = 0;
    int base_size = 0;
    if (p == NULL)
        goto error;
    // First arg is the filepath.
    cf = calloc(1, sizeof(struct extstore_conf_file));
    // calloc can fail; the error path below handles a NULL cf.
    if (cf == NULL)
        goto error;
    cf->file = strdup(p);

    p = strtok_r(NULL, ":", &b);
    if (p == NULL) {
        fprintf(stderr, "must supply size to ext_path, ie: ext_path=/f/e:64m (M|G|T|P supported)\n");
        goto error;
    }
    unit = tolower(p[strlen(p)-1]);
    p[strlen(p)-1] = '\0';
    // sigh.
    switch (unit) {
        case 'm':
            multiplier = 1024 * 1024;
            break;
        case 'g':
            multiplier = 1024 * 1024 * 1024;
            break;
        case 't':
            multiplier = 1024 * 1024;
            multiplier *= 1024 * 1024;
            break;
        case 'p':
            multiplier = 1024 * 1024;
            multiplier *= 1024 * 1024 * 1024;
            break;
        default:
            fprintf(stderr, "must supply size to ext_path, ie: ext_path=/f/e:64m (M|G|T|P supported)\n");
            goto error;
    }
    base_size = atoi(p);
    multiplier *= base_size;
    // page_count is nearest-but-not-larger-than pages * psize
    cf->page_count = multiplier / page_size;
    assert(page_size * cf->page_count <= multiplier);
    if (cf->page_count == 0) {
        fprintf(stderr, "supplied ext_path has zero size, cannot use\n");
        goto error;
    }
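    // Worked example (with the default 64M page size): "64g" yields
    // multiplier = 64 * 2^30 bytes, so page_count = 2^36 / 2^26 = 1024 pages.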

    // final token would be a default free bucket
    p = strtok_r(NULL, ":", &b);
    // TODO: We reuse the original DEFINES for now,
    // but if lowttl gets split up this needs to be its own set.
    if (p != NULL) {
        if (strcmp(p, "compact") == 0) {
            cf->free_bucket = PAGE_BUCKET_COMPACT;
        } else if (strcmp(p, "lowttl") == 0) {
            cf->free_bucket = PAGE_BUCKET_LOWTTL;
        } else if (strcmp(p, "chunked") == 0) {
            cf->free_bucket = PAGE_BUCKET_CHUNKED;
        } else if (strcmp(p, "default") == 0) {
            cf->free_bucket = PAGE_BUCKET_DEFAULT;
        } else if (strcmp(p, "coldcompact") == 0) {
            cf->free_bucket = PAGE_BUCKET_COLDCOMPACT;
        } else if (strcmp(p, "old") == 0) {
            cf->free_bucket = PAGE_BUCKET_OLD;
        } else {
            fprintf(stderr, "Unknown extstore bucket: %s\n", p);
            goto error;
        }
    } else {
        // TODO: is this necessary?
        cf->free_bucket = PAGE_BUCKET_DEFAULT;
    }

    return cf;
error:
    if (cf) {
        if (cf->file)
            free(cf->file);
        free(cf);
    }
    return NULL;
}
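
// Example usage (a sketch; the path and size are hypothetical). Since
// storage_conf_parse() modifies its argument, pass a disposable copy:
//
//   char *copy = strdup("/mnt/ext1:64g:compact");
//   struct extstore_conf_file *cf = storage_conf_parse(copy, settings.ext_page_size);
//   free(copy);
//   if (cf == NULL) { /* invalid ext_path string */ }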

struct storage_settings {
    struct extstore_conf_file *storage_file;
    struct extstore_conf ext_cf;
};

void *storage_init_config(struct settings *s) {
    struct storage_settings *cf = calloc(1, sizeof(struct storage_settings));
    // calloc can fail; signal it to the caller instead of dereferencing NULL.
    if (cf == NULL) {
        return NULL;
    }
1283 
1284     s->ext_item_size = 512;
1285     s->ext_item_age = UINT_MAX;
1286     s->ext_low_ttl = 0;
1287     s->ext_recache_rate = 2000;
1288     s->ext_max_frag = 0.8;
1289     s->ext_drop_unread = false;
1290     s->ext_wbuf_size = 1024 * 1024 * 4;
1291     s->ext_compact_under = 0;
1292     s->ext_drop_under = 0;
1293     s->ext_max_sleep = 1000000;
1294     s->slab_automove_freeratio = 0.01;
1295     s->ext_page_size = 1024 * 1024 * 64;
1296     s->ext_io_threadcount = 1;
1297     cf->ext_cf.page_size = settings.ext_page_size;
1298     cf->ext_cf.wbuf_size = settings.ext_wbuf_size;
1299     cf->ext_cf.io_threadcount = settings.ext_io_threadcount;
1300     cf->ext_cf.io_depth = 1;
1301     cf->ext_cf.page_buckets = PAGE_BUCKET_COUNT;
1302     cf->ext_cf.wbuf_count = cf->ext_cf.page_buckets;
1303 
1304     return cf;
1305 }

// TODO: pass settings struct?
int storage_read_config(void *conf, char **subopt) {
    struct storage_settings *cf = conf;
    struct extstore_conf *ext_cf = &cf->ext_cf;
    char *subopts_value;

    enum {
        EXT_PAGE_SIZE,
        EXT_WBUF_SIZE,
        EXT_THREADS,
        EXT_IO_DEPTH,
        EXT_PATH,
        EXT_ITEM_SIZE,
        EXT_ITEM_AGE,
        EXT_LOW_TTL,
        EXT_RECACHE_RATE,
        EXT_COMPACT_UNDER,
        EXT_DROP_UNDER,
        EXT_MAX_SLEEP,
        EXT_MAX_FRAG,
        EXT_DROP_UNREAD,
        SLAB_AUTOMOVE_FREERATIO, // FIXME: move this back?
    };

    char *const subopts_tokens[] = {
        [EXT_PAGE_SIZE] = "ext_page_size",
        [EXT_WBUF_SIZE] = "ext_wbuf_size",
        [EXT_THREADS] = "ext_threads",
        [EXT_IO_DEPTH] = "ext_io_depth",
        [EXT_PATH] = "ext_path",
        [EXT_ITEM_SIZE] = "ext_item_size",
        [EXT_ITEM_AGE] = "ext_item_age",
        [EXT_LOW_TTL] = "ext_low_ttl",
        [EXT_RECACHE_RATE] = "ext_recache_rate",
        [EXT_COMPACT_UNDER] = "ext_compact_under",
        [EXT_DROP_UNDER] = "ext_drop_under",
        [EXT_MAX_SLEEP] = "ext_max_sleep",
        [EXT_MAX_FRAG] = "ext_max_frag",
        [EXT_DROP_UNREAD] = "ext_drop_unread",
        [SLAB_AUTOMOVE_FREERATIO] = "slab_automove_freeratio",
        NULL
    };
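
    // Illustrative command line exercising these tokens (values hypothetical):
    //   memcached -o ext_path=/mnt/extstore/data:64G,ext_threads=4,ext_wbuf_size=8
    // getsubopt() consumes one comma-separated name[=value] token per call and
    // advances *subopt, so the caller re-enters this function until the
    // string is exhausted.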

    switch (getsubopt(subopt, subopts_tokens, &subopts_value)) {
        case EXT_PAGE_SIZE:
            if (cf->storage_file) {
                fprintf(stderr, "Must specify ext_page_size before any ext_path arguments\n");
                return 1;
            }
            if (subopts_value == NULL) {
                fprintf(stderr, "Missing ext_page_size argument\n");
                return 1;
            }
            if (!safe_strtoul(subopts_value, &ext_cf->page_size)) {
                fprintf(stderr, "could not parse argument to ext_page_size\n");
                return 1;
            }
            ext_cf->page_size *= 1024 * 1024; /* megabytes */
            break;
        case EXT_WBUF_SIZE:
            if (subopts_value == NULL) {
                fprintf(stderr, "Missing ext_wbuf_size argument\n");
                return 1;
            }
            if (!safe_strtoul(subopts_value, &ext_cf->wbuf_size)) {
                fprintf(stderr, "could not parse argument to ext_wbuf_size\n");
                return 1;
            }
            ext_cf->wbuf_size *= 1024 * 1024; /* megabytes */
            settings.ext_wbuf_size = ext_cf->wbuf_size;
            break;
        case EXT_THREADS:
            if (subopts_value == NULL) {
                fprintf(stderr, "Missing ext_threads argument\n");
                return 1;
            }
            if (!safe_strtoul(subopts_value, &ext_cf->io_threadcount)) {
                fprintf(stderr, "could not parse argument to ext_threads\n");
                return 1;
            }
            break;
        case EXT_IO_DEPTH:
            if (subopts_value == NULL) {
                fprintf(stderr, "Missing ext_io_depth argument\n");
                return 1;
            }
            if (!safe_strtoul(subopts_value, &ext_cf->io_depth)) {
                fprintf(stderr, "could not parse argument to ext_io_depth\n");
                return 1;
            }
            break;
        case EXT_ITEM_SIZE:
            if (subopts_value == NULL) {
                fprintf(stderr, "Missing ext_item_size argument\n");
                return 1;
            }
            if (!safe_strtoul(subopts_value, &settings.ext_item_size)) {
                fprintf(stderr, "could not parse argument to ext_item_size\n");
                return 1;
            }
            break;
        case EXT_ITEM_AGE:
            if (subopts_value == NULL) {
                fprintf(stderr, "Missing ext_item_age argument\n");
                return 1;
            }
            if (!safe_strtoul(subopts_value, &settings.ext_item_age)) {
                fprintf(stderr, "could not parse argument to ext_item_age\n");
                return 1;
            }
            break;
        case EXT_LOW_TTL:
            if (subopts_value == NULL) {
                fprintf(stderr, "Missing ext_low_ttl argument\n");
                return 1;
            }
            if (!safe_strtoul(subopts_value, &settings.ext_low_ttl)) {
                fprintf(stderr, "could not parse argument to ext_low_ttl\n");
                return 1;
            }
            break;
        case EXT_RECACHE_RATE:
            if (subopts_value == NULL) {
                fprintf(stderr, "Missing ext_recache_rate argument\n");
                return 1;
            }
            if (!safe_strtoul(subopts_value, &settings.ext_recache_rate)) {
                fprintf(stderr, "could not parse argument to ext_recache_rate\n");
                return 1;
            }
            break;
        case EXT_COMPACT_UNDER:
            if (subopts_value == NULL) {
                fprintf(stderr, "Missing ext_compact_under argument\n");
                return 1;
            }
            if (!safe_strtoul(subopts_value, &settings.ext_compact_under)) {
                fprintf(stderr, "could not parse argument to ext_compact_under\n");
                return 1;
            }
            break;
        case EXT_DROP_UNDER:
            if (subopts_value == NULL) {
                fprintf(stderr, "Missing ext_drop_under argument\n");
                return 1;
            }
            if (!safe_strtoul(subopts_value, &settings.ext_drop_under)) {
                fprintf(stderr, "could not parse argument to ext_drop_under\n");
                return 1;
            }
            break;
        case EXT_MAX_SLEEP:
            if (subopts_value == NULL) {
                fprintf(stderr, "Missing ext_max_sleep argument\n");
                return 1;
            }
            if (!safe_strtoul(subopts_value, &settings.ext_max_sleep)) {
                fprintf(stderr, "could not parse argument to ext_max_sleep\n");
                return 1;
            }
            break;
        case EXT_MAX_FRAG:
            if (subopts_value == NULL) {
                fprintf(stderr, "Missing ext_max_frag argument\n");
                return 1;
            }
            if (!safe_strtod(subopts_value, &settings.ext_max_frag)) {
                fprintf(stderr, "could not parse argument to ext_max_frag\n");
                return 1;
            }
            break;
        case SLAB_AUTOMOVE_FREERATIO:
            if (subopts_value == NULL) {
                fprintf(stderr, "Missing slab_automove_freeratio argument\n");
                return 1;
            }
            if (!safe_strtod(subopts_value, &settings.slab_automove_freeratio)) {
                fprintf(stderr, "could not parse argument to slab_automove_freeratio\n");
                return 1;
            }
            break;
        case EXT_DROP_UNREAD:
            settings.ext_drop_unread = true;
            break;
        case EXT_PATH:
            if (subopts_value) {
                struct extstore_conf_file *tmp = storage_conf_parse(subopts_value, ext_cf->page_size);
                if (tmp == NULL) {
                    fprintf(stderr, "failed to parse ext_path argument\n");
                    return 1;
                }
                if (cf->storage_file != NULL) {
                    tmp->next = cf->storage_file;
                }
                cf->storage_file = tmp;
            } else {
                fprintf(stderr, "missing argument to ext_path, ie: ext_path=/d/file:5G\n");
                return 1;
            }
            break;
        default:
            fprintf(stderr, "Illegal suboption \"%s\"\n", subopts_value);
            return 1;
    }

    return 0;
}
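
// Sketch of a driver loop for the reader above (illustrative only; the real
// call site presumably lives with the rest of the option parsing):
//
//   char *p = strdup(optarg);   /* e.g. "ext_path=/d/file:5G,ext_threads=2" */
//   void *cf = storage_init_config(&settings);
//   while (p != NULL && *p != '\0') {
//       if (storage_read_config(cf, &p) != 0) {
//           exit(EXIT_FAILURE);
//       }
//   }
//
// storage_check_config() returns 0 when extstore is configured and the
// settings are consistent, 1 on a fatal misconfiguration, and 2 when no
// ext_path was supplied (extstore simply stays disabled).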
int storage_check_config(void *conf) {
    struct storage_settings *cf = conf;
    struct extstore_conf *ext_cf = &cf->ext_cf;

    if (cf->storage_file) {
        if (settings.item_size_max > ext_cf->wbuf_size) {
            fprintf(stderr, "-I (item_size_max: %d) cannot be larger than ext_wbuf_size: %d\n",
                settings.item_size_max, ext_cf->wbuf_size);
            return 1;
        }

        if (settings.udpport) {
            fprintf(stderr, "Cannot use UDP with extstore enabled (-U 0 to disable)\n");
            return 1;
        }

        return 0;
    }

    return 2;
}

void *storage_init(void *conf) {
    struct storage_settings *cf = conf;
    struct extstore_conf *ext_cf = &cf->ext_cf;

    enum extstore_res eres;
    void *storage = NULL;
    if (settings.ext_compact_under == 0) {
        // If changing the default fraction, change the help text as well.
        settings.ext_compact_under = cf->storage_file->page_count * 0.01;
        settings.ext_drop_under = cf->storage_file->page_count * 0.01;
        if (settings.ext_compact_under < 1) {
            settings.ext_compact_under = 1;
        }
        if (settings.ext_drop_under < 1) {
            settings.ext_drop_under = 1;
        }
    }
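    // Worked example, assuming the 64MiB default page size: a single 64GiB
    // file gives 1024 pages, so the 1% fraction above becomes a threshold of
    // 10 free pages before compaction (and, by default, dropping) begins.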
    crc32c_init();

    settings.ext_global_pool_min = 0;
    storage = extstore_init(cf->storage_file, ext_cf, &eres);
    if (storage == NULL) {
        fprintf(stderr, "Failed to initialize external storage: %s\n",
                extstore_err(eres));
        if (eres == EXTSTORE_INIT_OPEN_FAIL) {
            perror("extstore open");
        }
        return NULL;
    }

    return storage;
}

#endif