1 /* -*- Mode: C; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2 #include "memcached.h"
3 #ifdef EXTSTORE
4
5 #include "storage.h"
6 #include "extstore.h"
7 #include <stdlib.h>
8 #include <stdio.h>
9 #include <stddef.h>
10 #include <string.h>
11 #include <limits.h>
12 #include <ctype.h>
13
14 #define PAGE_BUCKET_DEFAULT 0
15 #define PAGE_BUCKET_COMPACT 1
16 #define PAGE_BUCKET_CHUNKED 2
17 #define PAGE_BUCKET_LOWTTL 3
18 #define PAGE_BUCKET_COLDCOMPACT 4
19 #define PAGE_BUCKET_OLD 5
20 // Not another bucket; this is the total number of buckets.
21 #define PAGE_BUCKET_COUNT 6
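
/* A rough map of how the write paths below use these buckets (see
 * storage_write and storage_compact_readback): DEFAULT takes normal flushes,
 * CHUNKED takes chunked items, LOWTTL takes items expiring within
 * settings.ext_low_ttl, COMPACT receives items rescued during compaction,
 * COLDCOMPACT receives cold-LRU rescues when a coldcompact free bucket is
 * configured, and OLD receives rescues routed to the "old" stream when an
 * old bucket exists.
 */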
22
23 /*
24 * API functions
25 */
26 static void storage_finalize_cb(io_pending_t *pending);
27 static void storage_return_cb(io_pending_t *pending);
28
29 // re-cast an io_pending_t into this more descriptive structure.
30 // the first few items _must_ match the original struct.
31 typedef struct _io_pending_storage_t {
32 int io_queue_type;
33 LIBEVENT_THREAD *thread;
34 conn *c;
35 mc_resp *resp;
36 io_queue_cb return_cb; // called on worker thread.
37 io_queue_cb finalize_cb; // called back on the worker thread.
38 STAILQ_ENTRY(io_pending_t) iop_next; // queue chain.
39 /* original struct ends here */
40 item *hdr_it; /* original header item. */
41 obj_io io_ctx; /* embedded extstore IO header */
42 unsigned int iovec_data; /* specific index of data iovec */
43 bool noreply; /* whether the response had noreply set */
44 bool miss; /* signal a miss to unlink hdr_it */
45 bool badcrc; /* signal a crc failure */
46 bool active; /* tells if IO was dispatched or not */
47 } io_pending_storage_t;
48
49 static pthread_t storage_compact_tid;
50 static pthread_mutex_t storage_compact_plock;
51 static pthread_cond_t storage_compact_cond;
52
53 // Only call this if item has ITEM_HDR
bool storage_validate_item(void *e, item *it) {
55 item_hdr *hdr = (item_hdr *)ITEM_data(it);
56 if (extstore_check(e, hdr->page_id, hdr->page_version) != 0) {
57 return false;
58 } else {
59 return true;
60 }
61 }
62
void storage_delete(void *e, item *it) {
64 if (it->it_flags & ITEM_HDR) {
65 item_hdr *hdr = (item_hdr *)ITEM_data(it);
66 extstore_delete(e, hdr->page_id, hdr->page_version,
67 1, ITEM_ntotal(it));
68 }
69 }
70
// Function for the extra stats called from a protocol.
// NOTE: This either needs a name change or a wrapper, perhaps?
// It's defined here to reduce exposure of extstore.h to the rest of memcached,
// but it feels a little off living in this file.
// At the very least maybe rename it "process_storage_stats", in line with
// making this more of a generic wrapper module.
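// Per-page stats below are keyed by page index via APPEND_NUM_STAT, so the
// protocol output ends up with lines roughly like (illustrative values):
//   STAT 0:version 32
//   STAT 0:bytes 524288
//   STAT 0:bucket 0
//   STAT 0:free_bucket 0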
void process_extstore_stats(ADD_STAT add_stats, void *c) {
78 int i;
79 char key_str[STAT_KEY_LEN];
80 char val_str[STAT_VAL_LEN];
81 int klen = 0, vlen = 0;
82 struct extstore_stats st;
83
84 assert(add_stats);
85
86 void *storage = ext_storage;
87 if (storage == NULL) {
88 return;
89 }
90 extstore_get_stats(storage, &st);
91 st.page_data = calloc(st.page_count, sizeof(struct extstore_page_data));
92 extstore_get_page_data(storage, &st);
93
94 for (i = 0; i < st.page_count; i++) {
95 APPEND_NUM_STAT(i, "version", "%llu",
96 (unsigned long long) st.page_data[i].version);
97 APPEND_NUM_STAT(i, "bytes", "%llu",
98 (unsigned long long) st.page_data[i].bytes_used);
99 APPEND_NUM_STAT(i, "bucket", "%u",
100 st.page_data[i].bucket);
101 APPEND_NUM_STAT(i, "free_bucket", "%u",
102 st.page_data[i].free_bucket);
103 }
104
105 free(st.page_data);
106 }
107
108 // Additional storage stats for the main stats output.
void storage_stats(ADD_STAT add_stats, void *c) {
110 struct extstore_stats st;
111 if (ext_storage) {
112 STATS_LOCK();
113 APPEND_STAT("extstore_compact_lost", "%llu", (unsigned long long)stats.extstore_compact_lost);
114 APPEND_STAT("extstore_compact_rescues", "%llu", (unsigned long long)stats.extstore_compact_rescues);
115 APPEND_STAT("extstore_compact_resc_cold", "%llu", (unsigned long long)stats.extstore_compact_resc_cold);
116 APPEND_STAT("extstore_compact_resc_old", "%llu", (unsigned long long)stats.extstore_compact_resc_old);
117 APPEND_STAT("extstore_compact_skipped", "%llu", (unsigned long long)stats.extstore_compact_skipped);
118 STATS_UNLOCK();
119 extstore_get_stats(ext_storage, &st);
120 APPEND_STAT("extstore_page_allocs", "%llu", (unsigned long long)st.page_allocs);
121 APPEND_STAT("extstore_page_evictions", "%llu", (unsigned long long)st.page_evictions);
122 APPEND_STAT("extstore_page_reclaims", "%llu", (unsigned long long)st.page_reclaims);
123 APPEND_STAT("extstore_pages_free", "%llu", (unsigned long long)st.pages_free);
124 APPEND_STAT("extstore_pages_used", "%llu", (unsigned long long)st.pages_used);
125 APPEND_STAT("extstore_objects_evicted", "%llu", (unsigned long long)st.objects_evicted);
126 APPEND_STAT("extstore_objects_read", "%llu", (unsigned long long)st.objects_read);
127 APPEND_STAT("extstore_objects_written", "%llu", (unsigned long long)st.objects_written);
128 APPEND_STAT("extstore_objects_used", "%llu", (unsigned long long)st.objects_used);
129 APPEND_STAT("extstore_bytes_evicted", "%llu", (unsigned long long)st.bytes_evicted);
130 APPEND_STAT("extstore_bytes_written", "%llu", (unsigned long long)st.bytes_written);
131 APPEND_STAT("extstore_bytes_read", "%llu", (unsigned long long)st.bytes_read);
132 APPEND_STAT("extstore_bytes_used", "%llu", (unsigned long long)st.bytes_used);
133 APPEND_STAT("extstore_bytes_fragmented", "%llu", (unsigned long long)st.bytes_fragmented);
134 APPEND_STAT("extstore_limit_maxbytes", "%llu", (unsigned long long)(st.page_count * st.page_size));
135 APPEND_STAT("extstore_io_queue", "%llu", (unsigned long long)(st.io_queue));
136 }
137
138 }
139
140 // This callback runs in the IO thread.
141 // TODO: Some or all of this should move to the
142 // io_pending's callback back in the worker thread.
143 // It might make sense to keep the crc32c check here though.
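// The stored copy carries a crc32c computed over the object starting at
// STORE_OFFSET and stashed in its exptime field by storage_write below; we
// recompute it over the read buffer and treat any mismatch as a miss with
// badcrc set (the page may have been reused or the data corrupted).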
static void _storage_get_item_cb(void *e, obj_io *io, int ret) {
145 // FIXME: assumes success
146 io_pending_storage_t *p = (io_pending_storage_t *)io->data;
147 mc_resp *resp = p->resp;
148 conn *c = p->c;
149 assert(p->active == true);
150 item *read_it = (item *)io->buf;
151 bool miss = false;
152
153 // TODO: How to do counters for hit/misses?
154 if (ret < 1) {
155 miss = true;
156 } else {
157 uint32_t crc2;
158 uint32_t crc = (uint32_t) read_it->exptime;
159 int x;
160 // item is chunked, crc the iov's
161 if (io->iov != NULL) {
162 // first iov is the header, which we don't use beyond crc
163 crc2 = crc32c(0, (char *)io->iov[0].iov_base+STORE_OFFSET, io->iov[0].iov_len-STORE_OFFSET);
164 // make sure it's not sent. hack :(
165 io->iov[0].iov_len = 0;
166 for (x = 1; x < io->iovcnt; x++) {
167 crc2 = crc32c(crc2, (char *)io->iov[x].iov_base, io->iov[x].iov_len);
168 }
169 } else {
170 crc2 = crc32c(0, (char *)read_it+STORE_OFFSET, io->len-STORE_OFFSET);
171 }
172
173 if (crc != crc2) {
174 miss = true;
175 p->badcrc = true;
176 }
177 }
178
179 if (miss) {
180 if (p->noreply) {
181 // In all GET cases, noreply means we send nothing back.
182 resp->skip = true;
183 } else {
184 // TODO: This should be movable to the worker thread.
185 // Convert the binprot response into a miss response.
186 // The header requires knowing a bunch of stateful crap, so rather
187 // than simply writing out a "new" miss response we mangle what's
188 // already there.
189 if (c->protocol == binary_prot) {
190 protocol_binary_response_header *header =
191 (protocol_binary_response_header *)resp->wbuf;
192
193 // cut the extra nbytes off of the body_len
194 uint32_t body_len = ntohl(header->response.bodylen);
195 uint8_t hdr_len = header->response.extlen;
196 body_len -= resp->iov[p->iovec_data].iov_len + hdr_len;
197 resp->tosend -= resp->iov[p->iovec_data].iov_len + hdr_len;
198 header->response.extlen = 0;
199 header->response.status = (uint16_t)htons(PROTOCOL_BINARY_RESPONSE_KEY_ENOENT);
200 header->response.bodylen = htonl(body_len);
201
202 // truncate the data response.
203 resp->iov[p->iovec_data].iov_len = 0;
204 // wipe the extlen iov... wish it was just a flat buffer.
205 resp->iov[p->iovec_data-1].iov_len = 0;
206 resp->chunked_data_iov = 0;
207 } else {
208 int i;
209 // Meta commands have EN status lines for miss, rather than
210 // END as a trailer as per normal ascii.
211 if (resp->iov[0].iov_len >= 3
212 && memcmp(resp->iov[0].iov_base, "VA ", 3) == 0) {
213 // TODO: These miss translators should use specific callback
214 // functions attached to the io wrap. This is weird :(
215 resp->iovcnt = 1;
216 resp->iov[0].iov_len = 4;
217 resp->iov[0].iov_base = "EN\r\n";
218 resp->tosend = 4;
219 } else {
220 // Wipe the iovecs up through our data injection.
221 // Allows trailers to be returned (END)
222 for (i = 0; i <= p->iovec_data; i++) {
223 resp->tosend -= resp->iov[i].iov_len;
224 resp->iov[i].iov_len = 0;
225 resp->iov[i].iov_base = NULL;
226 }
227 }
228 resp->chunked_total = 0;
229 resp->chunked_data_iov = 0;
230 }
231 }
232 p->miss = true;
233 } else {
234 assert(read_it->slabs_clsid != 0);
235 // TODO: should always use it instead of ITEM_data to kill more
236 // chunked special casing.
237 if ((read_it->it_flags & ITEM_CHUNKED) == 0) {
238 resp->iov[p->iovec_data].iov_base = ITEM_data(read_it);
239 }
240 p->miss = false;
241 }
242
243 p->active = false;
244 //assert(c->io_wrapleft >= 0);
245
246 return_io_pending((io_pending_t *)p);
247 }
248
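// Read path: the in-memory item is a small ITEM_HDR stub holding the
// page_id/page_version/offset of the stored object. We allocate a
// destination item (or a chunked item plus an iovec list), reserve a
// response iovec, and queue an obj_io on the connection's extstore queue;
// _storage_get_item_cb above fills in the response when the read completes.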
int storage_get_item(conn *c, item *it, mc_resp *resp) {
250 #ifdef NEED_ALIGN
251 item_hdr hdr;
252 memcpy(&hdr, ITEM_data(it), sizeof(hdr));
253 #else
254 item_hdr *hdr = (item_hdr *)ITEM_data(it);
255 #endif
256 io_queue_t *q = conn_io_queue_get(c, IO_QUEUE_EXTSTORE);
257 size_t ntotal = ITEM_ntotal(it);
258 unsigned int clsid = slabs_clsid(ntotal);
259 item *new_it;
260 bool chunked = false;
261 if (ntotal > settings.slab_chunk_size_max) {
262 // Pull a chunked item header.
263 client_flags_t flags;
264 FLAGS_CONV(it, flags);
265 new_it = item_alloc(ITEM_key(it), it->nkey, flags, it->exptime, it->nbytes);
266 assert(new_it == NULL || (new_it->it_flags & ITEM_CHUNKED));
267 chunked = true;
268 } else {
269 new_it = do_item_alloc_pull(ntotal, clsid);
270 }
271 if (new_it == NULL)
272 return -1;
273 // so we can free the chunk on a miss
274 new_it->slabs_clsid = clsid;
275
276 io_pending_storage_t *p = do_cache_alloc(c->thread->io_cache);
277 // this is a re-cast structure, so assert that we never outsize it.
278 assert(sizeof(io_pending_t) >= sizeof(io_pending_storage_t));
279 memset(p, 0, sizeof(io_pending_storage_t));
280 p->active = true;
281 p->miss = false;
282 p->badcrc = false;
283 p->noreply = c->noreply;
284 p->thread = c->thread;
285 p->return_cb = storage_return_cb;
286 p->finalize_cb = storage_finalize_cb;
287 // io_pending owns the reference for this object now.
288 p->hdr_it = it;
289 p->resp = resp;
290 p->io_queue_type = IO_QUEUE_EXTSTORE;
291 obj_io *eio = &p->io_ctx;
292
293 // FIXME: error handling.
294 if (chunked) {
295 unsigned int ciovcnt = 0;
296 size_t remain = new_it->nbytes;
297 item_chunk *chunk = (item_chunk *) ITEM_schunk(new_it);
// TODO: This might make sense as a _global_ cache vs a per-thread one,
// but we still can't load objects requiring > IOV_MAX iovs.
300 // In the meantime, these objects are rare/slow enough that
301 // malloc/freeing a statically sized object won't cause us much pain.
302 eio->iov = malloc(sizeof(struct iovec) * IOV_MAX);
303 if (eio->iov == NULL) {
304 item_remove(new_it);
305 do_cache_free(c->thread->io_cache, p);
306 return -1;
307 }
308
309 // fill the header so we can get the full data + crc back.
310 eio->iov[0].iov_base = new_it;
311 eio->iov[0].iov_len = ITEM_ntotal(new_it) - new_it->nbytes;
312 ciovcnt++;
313
314 while (remain > 0) {
315 chunk = do_item_alloc_chunk(chunk, remain);
316 // FIXME: _pure evil_, silently erroring if item is too large.
317 if (chunk == NULL || ciovcnt > IOV_MAX-1) {
318 item_remove(new_it);
319 free(eio->iov);
320 // TODO: wrapper function for freeing up an io wrap?
321 eio->iov = NULL;
322 do_cache_free(c->thread->io_cache, p);
323 return -1;
324 }
325 eio->iov[ciovcnt].iov_base = chunk->data;
326 eio->iov[ciovcnt].iov_len = (remain < chunk->size) ? remain : chunk->size;
327 chunk->used = (remain < chunk->size) ? remain : chunk->size;
328 remain -= chunk->size;
329 ciovcnt++;
330 }
331
332 eio->iovcnt = ciovcnt;
333 }
334
// Chunked or not, we reserve a response iov here.
336 p->iovec_data = resp->iovcnt;
337 int iovtotal = (c->protocol == binary_prot) ? it->nbytes - 2 : it->nbytes;
338 if (chunked) {
339 resp_add_chunked_iov(resp, new_it, iovtotal);
340 } else {
341 resp_add_iov(resp, "", iovtotal);
342 }
343
344 // We can't bail out anymore, so mc_resp owns the IO from here.
345 resp->io_pending = (io_pending_t *)p;
346
347 eio->buf = (void *)new_it;
348 p->c = c;
349
350 // We need to stack the sub-struct IO's together for submission.
351 eio->next = q->stack_ctx;
352 q->stack_ctx = eio;
353
354 // No need to stack the io_pending's together as they live on mc_resp's.
355 assert(q->count >= 0);
356 q->count++;
357 // reference ourselves for the callback.
358 eio->data = (void *)p;
359
360 // Now, fill in io->io based on what was in our header.
361 #ifdef NEED_ALIGN
362 eio->page_version = hdr.page_version;
363 eio->page_id = hdr.page_id;
364 eio->offset = hdr.offset;
365 #else
366 eio->page_version = hdr->page_version;
367 eio->page_id = hdr->page_id;
368 eio->offset = hdr->offset;
369 #endif
370 eio->len = ntotal;
371 eio->mode = OBJ_IO_READ;
372 eio->cb = _storage_get_item_cb;
373
// FIXME: This stat needs to move to reflect # of flash hits vs misses;
// for now it's at least a good gauge of how often we go out to flash.
377 pthread_mutex_lock(&c->thread->stats.mutex);
378 c->thread->stats.get_extstore++;
379 pthread_mutex_unlock(&c->thread->stats.mutex);
380
381 return 0;
382 }
383
void storage_submit_cb(io_queue_t *q) {
385 // Don't need to do anything special for extstore.
386 extstore_submit(q->ctx, q->stack_ctx);
387
388 // need to reset the stack for next use.
389 q->stack_ctx = NULL;
390 }
391
392 // Runs locally in worker thread.
static void recache_or_free(io_pending_t *pending) {
394 // re-cast to our specific struct.
395 io_pending_storage_t *p = (io_pending_storage_t *)pending;
396
397 conn *c = p->c;
398 obj_io *io = &p->io_ctx;
399 assert(io != NULL);
400 item *it = (item *)io->buf;
401 assert(c != NULL);
402 bool do_free = true;
403 if (p->active) {
404 // If request never dispatched, free the read buffer but leave the
405 // item header alone.
406 do_free = false;
407 size_t ntotal = ITEM_ntotal(p->hdr_it);
408 slabs_free(it, ntotal, slabs_clsid(ntotal));
409
410 io_queue_t *q = conn_io_queue_get(c, p->io_queue_type);
411 q->count--;
412 assert(q->count >= 0);
413 pthread_mutex_lock(&c->thread->stats.mutex);
414 c->thread->stats.get_aborted_extstore++;
415 pthread_mutex_unlock(&c->thread->stats.mutex);
416 } else if (p->miss) {
417 // If request was ultimately a miss, unlink the header.
418 do_free = false;
419 size_t ntotal = ITEM_ntotal(p->hdr_it);
420 item_unlink(p->hdr_it);
421 slabs_free(it, ntotal, slabs_clsid(ntotal));
422 pthread_mutex_lock(&c->thread->stats.mutex);
423 c->thread->stats.miss_from_extstore++;
424 if (p->badcrc)
425 c->thread->stats.badcrc_from_extstore++;
426 pthread_mutex_unlock(&c->thread->stats.mutex);
427 } else if (settings.ext_recache_rate) {
428 // hashvalue is cuddled during store
429 uint32_t hv = (uint32_t)it->time;
430 // opt to throw away rather than wait on a lock.
431 void *hold_lock = item_trylock(hv);
432 if (hold_lock != NULL) {
433 item *h_it = p->hdr_it;
434 uint8_t flags = ITEM_LINKED|ITEM_FETCHED|ITEM_ACTIVE;
435 // Item must be recently hit at least twice to recache.
436 if (((h_it->it_flags & flags) == flags) &&
437 h_it->time > current_time - ITEM_UPDATE_INTERVAL &&
438 c->recache_counter++ % settings.ext_recache_rate == 0) {
439 do_free = false;
440 // In case it's been updated.
441 it->exptime = h_it->exptime;
442 it->it_flags &= ~ITEM_LINKED;
443 it->refcount = 0;
444 it->h_next = NULL; // might not be necessary.
445 STORAGE_delete(c->thread->storage, h_it);
446 item_replace(h_it, it, hv, ITEM_get_cas(h_it));
447 pthread_mutex_lock(&c->thread->stats.mutex);
448 c->thread->stats.recache_from_extstore++;
449 pthread_mutex_unlock(&c->thread->stats.mutex);
450 }
451 }
452 if (hold_lock)
453 item_trylock_unlock(hold_lock);
454 }
455 if (do_free)
456 slabs_free(it, ITEM_ntotal(it), ITEM_clsid(it));
457
458 p->io_ctx.buf = NULL;
459 p->io_ctx.next = NULL;
460 p->active = false;
461
462 // TODO: reuse lock and/or hv.
463 item_remove(p->hdr_it);
464 }
465
466 // Called after an IO has been returned to the worker thread.
static void storage_return_cb(io_pending_t *pending) {
468 io_queue_t *q = conn_io_queue_get(pending->c, pending->io_queue_type);
469 q->count--;
470 if (q->count == 0) {
471 conn_worker_readd(pending->c);
472 }
473 }
474
475 // Called after responses have been transmitted. Need to free up related data.
static void storage_finalize_cb(io_pending_t *pending) {
477 recache_or_free(pending);
478 io_pending_storage_t *p = (io_pending_storage_t *)pending;
479 obj_io *io = &p->io_ctx;
480 // malloc'ed iovec list used for chunked extstore fetches.
481 if (io->iov) {
482 free(io->iov);
483 io->iov = NULL;
484 }
485 // don't need to free the main context, since it's embedded.
486 }
487
488 /*
489 * WRITE FLUSH THREAD
490 */
491
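// Pull a single item from the tail of a class's COLD LRU and, if it is not
// already an ITEM_HDR stub and is old enough (or item_age is 0), write it to
// extstore and replace it in the hash table with a small ITEM_HDR item
// recording page_id/version/offset. Returns 1 if an item was moved, else 0.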
static int storage_write(void *storage, const int clsid, const int item_age) {
493 int did_moves = 0;
494 struct lru_pull_tail_return it_info;
495
496 it_info.it = NULL;
497 lru_pull_tail(clsid, COLD_LRU, 0, LRU_PULL_RETURN_ITEM, 0, &it_info);
498 /* Item is locked, and we have a reference to it. */
499 if (it_info.it == NULL) {
500 return did_moves;
501 }
502
503 obj_io io;
504 item *it = it_info.it;
505 /* First, storage for the header object */
506 size_t orig_ntotal = ITEM_ntotal(it);
507 client_flags_t flags;
508 if ((it->it_flags & ITEM_HDR) == 0 &&
509 (item_age == 0 || current_time - it->time > item_age)) {
510 FLAGS_CONV(it, flags);
511 item *hdr_it = do_item_alloc(ITEM_key(it), it->nkey, flags, it->exptime, sizeof(item_hdr));
512 /* Run the storage write understanding the start of the item is dirty.
513 * We will fill it (time/exptime/etc) from the header item on read.
514 */
515 if (hdr_it != NULL) {
516 int bucket = (it->it_flags & ITEM_CHUNKED) ?
517 PAGE_BUCKET_CHUNKED : PAGE_BUCKET_DEFAULT;
518 // Compress soon to expire items into similar pages.
519 if (it->exptime - current_time < settings.ext_low_ttl) {
520 bucket = PAGE_BUCKET_LOWTTL;
521 }
522 hdr_it->it_flags |= ITEM_HDR;
523 io.len = orig_ntotal;
524 io.mode = OBJ_IO_WRITE;
525 // NOTE: when the item is read back in, the slab mover
526 // may see it. Important to have refcount>=2 or ~ITEM_LINKED
527 assert(it->refcount >= 2);
528 // NOTE: write bucket vs free page bucket will disambiguate once
529 // lowttl feature is better understood.
530 if (extstore_write_request(storage, bucket, bucket, &io) == 0) {
531 // cuddle the hash value into the time field so we don't have
532 // to recalculate it.
533 item *buf_it = (item *) io.buf;
534 buf_it->time = it_info.hv;
535 // copy from past the headers + time headers.
536 // TODO: should be in items.c
537 if (it->it_flags & ITEM_CHUNKED) {
538 // Need to loop through the item and copy
539 item_chunk *sch = (item_chunk *) ITEM_schunk(it);
540 int remain = orig_ntotal;
541 int copied = 0;
542 // copy original header
543 int hdrtotal = ITEM_ntotal(it) - it->nbytes;
544 memcpy((char *)io.buf+STORE_OFFSET, (char *)it+STORE_OFFSET, hdrtotal - STORE_OFFSET);
545 copied = hdrtotal;
546 // copy data in like it were one large object.
547 while (sch && remain) {
548 assert(remain >= sch->used);
549 memcpy((char *)io.buf+copied, sch->data, sch->used);
550 // FIXME: use one variable?
551 remain -= sch->used;
552 copied += sch->used;
553 sch = sch->next;
554 }
555 } else {
556 memcpy((char *)io.buf+STORE_OFFSET, (char *)it+STORE_OFFSET, io.len-STORE_OFFSET);
557 }
558 // crc what we copied so we can do it sequentially.
559 buf_it->it_flags &= ~ITEM_LINKED;
560 buf_it->exptime = crc32c(0, (char*)io.buf+STORE_OFFSET, orig_ntotal-STORE_OFFSET);
561 extstore_write(storage, &io);
562 item_hdr *hdr = (item_hdr *) ITEM_data(hdr_it);
563 hdr->page_version = io.page_version;
564 hdr->page_id = io.page_id;
565 hdr->offset = io.offset;
566 // overload nbytes for the header it
567 hdr_it->nbytes = it->nbytes;
568 /* success! Now we need to fill relevant data into the new
569 * header and replace. Most of this requires the item lock
570 */
571 /* CAS gets set while linking. Copy post-replace */
572 item_replace(it, hdr_it, it_info.hv, ITEM_get_cas(it));
573 do_item_remove(hdr_it);
574 did_moves = 1;
575 LOGGER_LOG(NULL, LOG_EVICTIONS, LOGGER_EXTSTORE_WRITE, it, bucket);
576 } else {
577 /* Failed to write for some reason, can't continue. */
578 slabs_free(hdr_it, ITEM_ntotal(hdr_it), ITEM_clsid(hdr_it));
579 }
580 }
581 }
582 do_item_remove(it);
583 item_unlock(it_info.hv);
584 return did_moves;
585 }
586
587 static pthread_t storage_write_tid;
588 static pthread_mutex_t storage_write_plock;
589 #define WRITE_SLEEP_MIN 200
590
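// The write thread sleeps adaptively: to_sleep is halved while items are
// being flushed (once it's above WRITE_SLEEP_MIN), incremented after an idle
// pass, and capped at settings.ext_max_sleep.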
static void *storage_write_thread(void *arg) {
592 void *storage = arg;
593 // NOTE: ignoring overflow since that would take years of uptime in a
594 // specific load pattern of never going to sleep.
595 unsigned int backoff[MAX_NUMBER_OF_SLAB_CLASSES] = {0};
596 unsigned int counter = 0;
597 useconds_t to_sleep = WRITE_SLEEP_MIN;
598 logger *l = logger_create();
599 if (l == NULL) {
fprintf(stderr, "Failed to allocate logger for storage write thread\n");
601 abort();
602 }
603
604 pthread_mutex_lock(&storage_write_plock);
// The compaction checker is CPU intensive, so we loosely throttle it to run
// only once per "slab page size" worth of bytes written.
// Calling it once per pass through this main loop turned out to be too often:
// we can do many short passes without sleeping, which invoked the checker
// far more frequently than intended.
610 int check_compact = settings.slab_page_size;
611
612 while (1) {
613 // cache per-loop to avoid calls to the slabs_clsid() search loop
614 int min_class = slabs_clsid(settings.ext_item_size);
615 unsigned int global_pages = global_page_pool_size(NULL);
616 bool do_sleep = true;
617 int target_pages = 0;
618 if (global_pages < settings.ext_global_pool_min) {
619 target_pages = settings.ext_global_pool_min - global_pages;
620 }
621 counter++;
622 if (to_sleep > settings.ext_max_sleep)
623 to_sleep = settings.ext_max_sleep;
624
625 for (int x = 0; x < MAX_NUMBER_OF_SLAB_CLASSES; x++) {
626 bool did_move = false;
627 bool mem_limit_reached = false;
628 unsigned int chunks_free;
629 int item_age;
630
631 if (min_class > x || (backoff[x] && (counter % backoff[x] != 0))) {
632 continue;
633 }
634
635 // Avoid extra slab lock calls during heavy writing.
636 unsigned int chunks_perpage = 0;
637 chunks_free = slabs_available_chunks(x, &mem_limit_reached,
638 &chunks_perpage);
639
640 if (chunks_perpage == 0) {
641 // no slab class here, skip.
642 continue;
643 }
644 unsigned int target = chunks_perpage * target_pages;
645 // Loose estimate for cutting the calls to compacter
646 unsigned int chunk_size = settings.slab_page_size / chunks_perpage;
647
648 // storage_write() will fail and cut loop after filling write buffer.
649 while (1) {
650 // if we are low on chunks and no spare, push out early.
651 if (chunks_free < target) {
652 item_age = 0;
653 } else {
654 item_age = settings.ext_item_age;
655 }
656 if (storage_write(storage, x, item_age)) {
657 chunks_free++; // Allow stopping if we've done enough this loop
658 check_compact -= chunk_size;
659 // occasionally kick the compact checker.
660 if (check_compact < 0) {
661 pthread_cond_signal(&storage_compact_cond);
662 check_compact = settings.slab_page_size;
663 }
664 did_move = true;
665 do_sleep = false;
666 if (to_sleep > WRITE_SLEEP_MIN)
667 to_sleep /= 2;
668 } else {
669 break;
670 }
671 }
672
673 if (!did_move) {
674 backoff[x]++;
675 } else {
676 backoff[x] = 1;
677 }
678 }
679
680 // flip lock so we can be paused or stopped
681 pthread_mutex_unlock(&storage_write_plock);
682 if (do_sleep) {
// Backoffs on slab classes only make sense while we're actively flushing
// at least one class; when idle, reset them so every class is re-checked
// on the next pass.
685 for (int x = 0; x < MAX_NUMBER_OF_SLAB_CLASSES; x++) {
686 backoff[x] = 1;
687 }
688
689 // call the compact checker occasionally even if we're just
690 // sleeping.
691 check_compact -= to_sleep * 10;
692 if (check_compact < 0) {
693 pthread_cond_signal(&storage_compact_cond);
694 check_compact = settings.slab_page_size;
695 }
696
697 usleep(to_sleep);
698 to_sleep++;
699 }
700 pthread_mutex_lock(&storage_write_plock);
701 }
702 return NULL;
703 }
704
705 // TODO
706 // logger needs logger_destroy() to exist/work before this is safe.
707 /*int stop_storage_write_thread(void) {
708 int ret;
709 pthread_mutex_lock(&lru_maintainer_lock);
710 do_run_lru_maintainer_thread = 0;
711 pthread_mutex_unlock(&lru_maintainer_lock);
712 // WAKEUP SIGNAL
713 if ((ret = pthread_join(lru_maintainer_tid, NULL)) != 0) {
714 fprintf(stderr, "Failed to stop LRU maintainer thread: %s\n", strerror(ret));
715 return -1;
716 }
717 settings.lru_maintainer_thread = false;
718 return 0;
719 }*/
720
void storage_write_pause(void) {
722 pthread_mutex_lock(&storage_write_plock);
723 }
724
void storage_write_resume(void) {
726 pthread_mutex_unlock(&storage_write_plock);
727 }
728
int start_storage_write_thread(void *arg) {
730 int ret;
731
732 pthread_mutex_init(&storage_write_plock, NULL);
733 if ((ret = pthread_create(&storage_write_tid, NULL,
734 storage_write_thread, arg)) != 0) {
735 fprintf(stderr, "Can't create storage_write thread: %s\n",
736 strerror(ret));
737 return -1;
738 }
739 thread_setname(storage_write_tid, "mc-ext-write");
740
741 return 0;
742 }
743
744 /*** COMPACTOR ***/
745 typedef struct __storage_buk {
746 unsigned int bucket;
747 unsigned int low_page;
748 unsigned int lowest_page;
749 uint64_t low_version;
750 uint64_t lowest_version;
751 unsigned int pages_free;
752 unsigned int pages_used;
753 unsigned int pages_total;
754 unsigned int bytes_fragmented; // fragmented bytes for low page
755 bool do_compact; // indicate this bucket should do a compaction.
756 bool do_compact_drop;
757 } _storage_buk;
758
759 struct _compact_flags {
760 unsigned int drop_unread : 1;
761 unsigned int has_coldcompact : 1;
762 unsigned int has_old : 1;
763 unsigned int use_old : 1;
764 };
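/* drop_unread: drop cold items that haven't been read instead of rescuing
 * them (only when settings.ext_drop_unread is set). has_coldcompact /
 * has_old: dedicated COLDCOMPACT / OLD free buckets exist. use_old: route
 * rescues from the oldest page into the OLD stream when nothing was
 * defraggable.
 */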
765
766 /* Fetch stats from the external storage system and decide to compact.
767 */
static int storage_compact_check(void *storage, logger *l,
769 uint32_t *page_id, uint64_t *page_version,
770 uint64_t *page_size, struct _compact_flags *flags) {
771 struct extstore_stats st;
772 _storage_buk buckets[PAGE_BUCKET_COUNT];
773 _storage_buk *buk = NULL;
774 uint64_t frag_limit;
775 extstore_get_stats(storage, &st);
776 if (st.pages_used == 0)
777 return 0;
778
779 for (int x = 0; x < PAGE_BUCKET_COUNT; x++) {
780 memset(&buckets[x], 0, sizeof(_storage_buk));
781 buckets[x].low_version = ULLONG_MAX;
782 buckets[x].lowest_version = ULLONG_MAX;
783 }
784 flags->drop_unread = 0;
785
786 frag_limit = st.page_size * settings.ext_max_frag;
787 LOGGER_LOG(l, LOG_SYSEVENTS, LOGGER_COMPACT_FRAGINFO,
788 NULL, settings.ext_max_frag, frag_limit);
789 st.page_data = calloc(st.page_count, sizeof(struct extstore_page_data));
790 extstore_get_page_data(storage, &st);
791
792 // find either the most fragmented page or the lowest version.
793 for (int x = 0; x < st.page_count; x++) {
794 buk = &buckets[st.page_data[x].free_bucket];
795 buk->pages_total++;
796 if (st.page_data[x].version == 0) {
797 buk->pages_free++;
798 // free pages don't contribute after this point.
799 continue;
800 } else {
801 buk->pages_used++;
802 }
803
804 // skip pages actively being used.
805 if (st.page_data[x].active) {
806 continue;
807 }
808
809 if (st.page_data[x].version < buk->lowest_version) {
810 buk->lowest_page = x;
811 buk->lowest_version = st.page_data[x].version;
812 }
813 // track the most fragmented page.
814 unsigned int frag = st.page_size - st.page_data[x].bytes_used;
815 if (st.page_data[x].bytes_used < frag_limit && frag > buk->bytes_fragmented) {
816 buk->low_page = x;
817 buk->low_version = st.page_data[x].version;
818 buk->bytes_fragmented = frag;
819 }
820 }
821 *page_size = st.page_size;
822 free(st.page_data);
823
824 buk = &buckets[PAGE_BUCKET_COLDCOMPACT];
825 if (buk->pages_total != 0) {
826 flags->has_coldcompact = 1;
827 if (buk->pages_free == 0 && buk->lowest_version != ULLONG_MAX) {
828 extstore_evict_page(storage, buk->lowest_page, buk->lowest_version);
829 return 0;
830 }
831 }
832
833 buk = &buckets[PAGE_BUCKET_OLD];
834 if (buk->pages_total != 0) {
835 flags->has_old = 1;
836 if (buk->pages_free == 0 && buk->lowest_version != ULLONG_MAX) {
837 extstore_evict_page(storage, buk->lowest_page, buk->lowest_version);
838 return 0;
839 }
840 }
841
842 for (int x = 0; x < PAGE_BUCKET_COUNT; x++) {
843 buk = &buckets[x];
844 assert(buk->pages_total == (buk->pages_used + buk->pages_free));
845 unsigned int pages_total = buk->pages_total;
846 // only process buckets which have dedicated pages assigned.
847 // LOWTTL skips compaction.
848 if (pages_total == 0 || x == PAGE_BUCKET_LOWTTL)
849 continue;
850
851 if (buk->pages_free < settings.ext_compact_under) {
852 if (buk->low_version != ULLONG_MAX) {
853 // found a normally defraggable page.
854 *page_id = buk->low_page;
855 *page_version = buk->low_version;
856 return 1;
857 } else if (buk->pages_free < settings.ext_drop_under
858 && buk->lowest_version != ULLONG_MAX) {
859
860 if (x == PAGE_BUCKET_COLDCOMPACT || x == PAGE_BUCKET_OLD) {
861 // this freeing technique doesn't apply to these buckets.
862 // instead these buckets are eviction or normal
863 // defragmentation only.
864 continue;
865 }
866 // Nothing defraggable. Check for other usable conditions.
867 if (settings.ext_drop_unread) {
868 flags->drop_unread = 1;
869 }
870
871 // If OLD and/or COLDCOMPACT pages exist we should always have
872 // one free page in those buckets, so we can always attempt to
873 // defrag into them.
874 // If only COLDCOMPACT exists this will attempt to segment off
875 // parts of a page that haven't been used.
876 // If OLD exists everything else in this "oldest page" goes
877 // into the OLD stream.
878 if (flags->drop_unread || flags->has_coldcompact || flags->has_old) {
879 // only actually use the old flag if we can't compact.
880 flags->use_old = flags->has_old;
881 *page_id = buk->lowest_page;
882 *page_version = buk->lowest_version;
883 return 1;
884 }
885 }
886 }
887 }
888
889 return 0;
890 }
891
892 #define MIN_STORAGE_COMPACT_SLEEP 1000
893
894 struct storage_compact_wrap {
895 obj_io io;
896 pthread_mutex_t lock; // gates the bools.
897 bool done;
898 bool submitted;
899 bool miss; // version flipped out from under us
900 };
901
static void storage_compact_readback(void *storage, logger *l,
903 struct _compact_flags flags, char *readback_buf,
904 uint32_t page_id, uint64_t page_version, uint32_t page_offset, uint64_t read_size) {
905 uint64_t offset = 0;
906 unsigned int rescues = 0;
907 unsigned int lost = 0;
908 unsigned int skipped = 0;
909 unsigned int rescue_cold = 0;
910 unsigned int rescue_old = 0;
911
912 while (offset < read_size) {
913 item *hdr_it = NULL;
914 item_hdr *hdr = NULL;
915 item *it = (item *)(readback_buf+offset);
916 unsigned int ntotal;
917 // probably zeroed out junk at the end of the wbuf
918 if (it->nkey == 0) {
919 break;
920 }
921
922 ntotal = ITEM_ntotal(it);
923 uint32_t hv = (uint32_t)it->time;
924 item_lock(hv);
925 // We don't have a conn and don't need to do most of do_item_get
926 hdr_it = assoc_find(ITEM_key(it), it->nkey, hv);
927 if (hdr_it != NULL) {
928 bool do_write = false;
929 int bucket = flags.use_old ? PAGE_BUCKET_OLD : PAGE_BUCKET_COMPACT;
930 refcount_incr(hdr_it);
931
932 // Check validity but don't bother removing it.
933 if ((hdr_it->it_flags & ITEM_HDR) && !item_is_flushed(hdr_it) &&
934 (hdr_it->exptime == 0 || hdr_it->exptime > current_time)) {
935 hdr = (item_hdr *)ITEM_data(hdr_it);
936 if (hdr->page_id == page_id && hdr->page_version == page_version
937 && hdr->offset == (int)offset + page_offset) {
938 // Item header is still completely valid.
939 extstore_delete(storage, page_id, page_version, 1, ntotal);
940 // special case inactive items.
941 do_write = true;
942 if (GET_LRU(hdr_it->slabs_clsid) == COLD_LRU) {
943 if (flags.has_coldcompact) {
944 // Write the cold items to a different stream.
945 bucket = PAGE_BUCKET_COLDCOMPACT;
946 } else if (flags.drop_unread) {
947 do_write = false;
948 skipped++;
949 }
950 }
951 }
952 }
953
954 if (do_write) {
955 bool do_update = false;
956 int tries;
957 obj_io io;
958 io.len = ntotal;
959 io.mode = OBJ_IO_WRITE;
960 for (tries = 10; tries > 0; tries--) {
961 if (extstore_write_request(storage, bucket, bucket, &io) == 0) {
962 memcpy(io.buf, it, io.len);
963 extstore_write(storage, &io);
964 do_update = true;
965 break;
966 } else {
967 usleep(1000);
968 }
969 }
970
971 if (do_update) {
972 bool rescued = false;
973 if (it->refcount == 2) {
974 hdr->page_version = io.page_version;
975 hdr->page_id = io.page_id;
976 hdr->offset = io.offset;
977 rescued = true;
978 } else {
979 // re-alloc and replace header.
980 client_flags_t flags;
981 FLAGS_CONV(hdr_it, flags);
982 item *new_it = do_item_alloc(ITEM_key(hdr_it), hdr_it->nkey, flags, hdr_it->exptime, sizeof(item_hdr));
983 if (new_it) {
984 // need to preserve the original item flags, but we
985 // start unlinked, with linked being added during
986 // item_replace below.
987 new_it->it_flags = hdr_it->it_flags & (~ITEM_LINKED);
988 new_it->time = hdr_it->time;
989 new_it->nbytes = hdr_it->nbytes;
990
991 // copy the hdr data.
992 item_hdr *new_hdr = (item_hdr *) ITEM_data(new_it);
993 new_hdr->page_version = io.page_version;
994 new_hdr->page_id = io.page_id;
995 new_hdr->offset = io.offset;
996
997 // replace the item in the hash table.
998 item_replace(hdr_it, new_it, hv, ITEM_get_cas(hdr_it));
999 do_item_remove(new_it); // release our reference.
1000 rescued = true;
1001 } else {
1002 lost++;
1003 }
1004 }
1005
1006 if (rescued) {
1007 rescues++;
1008 if (bucket == PAGE_BUCKET_COLDCOMPACT) {
1009 rescue_cold++;
1010 } else if (bucket == PAGE_BUCKET_OLD) {
1011 rescue_old++;
1012 }
1013 }
1014 } else {
1015 lost++;
1016 }
1017 }
1018
1019 do_item_remove(hdr_it);
1020 }
1021
1022 item_unlock(hv);
1023 offset += ntotal;
1024 if (read_size - offset < sizeof(struct _stritem))
1025 break;
1026 }
1027
1028 STATS_LOCK();
1029 stats.extstore_compact_lost += lost;
1030 stats.extstore_compact_rescues += rescues;
1031 stats.extstore_compact_skipped += skipped;
1032 stats.extstore_compact_resc_cold += rescue_cold;
1033 stats.extstore_compact_resc_old += rescue_old;
1034 STATS_UNLOCK();
1035 LOGGER_LOG(l, LOG_SYSEVENTS, LOGGER_COMPACT_READ_END,
1036 NULL, page_id, offset, rescues, lost, skipped);
1037 }
1038
1039 // wrap lock is held while waiting for this callback, preventing caller thread
1040 // from fast-looping.
static void _storage_compact_cb(void *e, obj_io *io, int ret) {
1042 struct storage_compact_wrap *wrap = (struct storage_compact_wrap *)io->data;
1043 assert(wrap->submitted == true);
1044
1045 if (ret < 1) {
1046 wrap->miss = true;
1047 }
1048 wrap->done = true;
1049
1050 pthread_mutex_unlock(&wrap->lock);
1051 }
1052
1053 // TODO: hoist the storage bits from lru_maintainer_thread in here.
1054 // would be nice if they could avoid hammering the same locks though?
1055 // I guess it's only COLD. that's probably fine.
static void *storage_compact_thread(void *arg) {
1057 void *storage = arg;
1058 bool compacting = false;
1059 uint64_t page_version = 0;
1060 uint64_t page_size = 0;
1061 uint32_t page_offset = 0;
1062 uint32_t page_id = 0;
1063 struct _compact_flags flags;
1064 char *readback_buf = NULL;
1065 struct storage_compact_wrap wrap;
1066 memset(&flags, 0, sizeof(flags));
1067
1068 logger *l = logger_create();
1069 if (l == NULL) {
1070 fprintf(stderr, "Failed to allocate logger for storage compaction thread\n");
1071 abort();
1072 }
1073
1074 readback_buf = malloc(settings.ext_wbuf_size);
1075 if (readback_buf == NULL) {
1076 fprintf(stderr, "Failed to allocate readback buffer for storage compaction thread\n");
1077 abort();
1078 }
1079
1080 pthread_mutex_init(&wrap.lock, NULL);
1081 wrap.done = false;
1082 wrap.submitted = false;
1083 wrap.io.data = &wrap;
1084 wrap.io.iov = NULL;
1085 wrap.io.buf = (void *)readback_buf;
1086
1087 wrap.io.len = settings.ext_wbuf_size;
1088 wrap.io.mode = OBJ_IO_READ;
1089 wrap.io.cb = _storage_compact_cb;
1090 pthread_mutex_lock(&storage_compact_plock);
1091
1092 while (1) {
1093 if (!compacting && storage_compact_check(storage, l,
1094 &page_id, &page_version, &page_size, &flags)) {
1095 page_offset = 0;
1096 compacting = true;
1097 LOGGER_LOG(l, LOG_SYSEVENTS, LOGGER_COMPACT_START,
1098 NULL, page_id, page_version);
1099 } else {
1100 pthread_cond_wait(&storage_compact_cond, &storage_compact_plock);
1101 }
1102
1103 while (compacting) {
1104 pthread_mutex_lock(&wrap.lock);
1105 if (page_offset < page_size && !wrap.done && !wrap.submitted) {
1106 wrap.io.page_version = page_version;
1107 wrap.io.page_id = page_id;
1108 wrap.io.offset = page_offset;
1109 // FIXME: should be smarter about io->next (unlink at use?)
1110 wrap.io.next = NULL;
1111 wrap.submitted = true;
1112 wrap.miss = false;
1113
1114 extstore_submit_bg(storage, &wrap.io);
1115 } else if (wrap.miss) {
1116 LOGGER_LOG(l, LOG_SYSEVENTS, LOGGER_COMPACT_ABORT,
1117 NULL, page_id);
1118 wrap.done = false;
1119 wrap.submitted = false;
1120 compacting = false;
1121 } else if (wrap.submitted && wrap.done) {
1122 LOGGER_LOG(l, LOG_SYSEVENTS, LOGGER_COMPACT_READ_START,
1123 NULL, page_id, page_offset);
1124 storage_compact_readback(storage, l, flags,
1125 readback_buf, page_id, page_version, page_offset,
1126 settings.ext_wbuf_size);
1127 page_offset += settings.ext_wbuf_size;
1128 wrap.done = false;
1129 wrap.submitted = false;
1130 } else if (page_offset >= page_size) {
1131 compacting = false;
1132 wrap.done = false;
1133 wrap.submitted = false;
1134 extstore_close_page(storage, page_id, page_version);
1135 LOGGER_LOG(l, LOG_SYSEVENTS, LOGGER_COMPACT_END,
1136 NULL, page_id);
1137 // short cooling period between defragmentation runs.
1138 usleep(MIN_STORAGE_COMPACT_SLEEP);
1139 }
1140 pthread_mutex_unlock(&wrap.lock);
1141 }
1142 }
1143 free(readback_buf);
1144
1145 return NULL;
1146 }
1147
1148 // TODO
1149 // logger needs logger_destroy() to exist/work before this is safe.
1150 /*int stop_storage_compact_thread(void) {
1151 int ret;
1152 pthread_mutex_lock(&lru_maintainer_lock);
1153 do_run_lru_maintainer_thread = 0;
1154 pthread_mutex_unlock(&lru_maintainer_lock);
1155 if ((ret = pthread_join(lru_maintainer_tid, NULL)) != 0) {
1156 fprintf(stderr, "Failed to stop LRU maintainer thread: %s\n", strerror(ret));
1157 return -1;
1158 }
1159 settings.lru_maintainer_thread = false;
1160 return 0;
1161 }*/
1162
void storage_compact_pause(void) {
1164 pthread_mutex_lock(&storage_compact_plock);
1165 }
1166
void storage_compact_resume(void) {
1168 pthread_mutex_unlock(&storage_compact_plock);
1169 }
1170
int start_storage_compact_thread(void *arg) {
1172 int ret;
1173
1174 pthread_mutex_init(&storage_compact_plock, NULL);
1175 pthread_cond_init(&storage_compact_cond, NULL);
1176 if ((ret = pthread_create(&storage_compact_tid, NULL,
1177 storage_compact_thread, arg)) != 0) {
1178 fprintf(stderr, "Can't create storage_compact thread: %s\n",
1179 strerror(ret));
1180 return -1;
1181 }
1182 thread_setname(storage_compact_tid, "mc-ext-compact");
1183
1184 return 0;
1185 }
1186
1187 /*** UTILITY ***/
1188 // /path/to/file:100G:bucket1
1189 // FIXME: Modifies argument. copy instead?
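// For example (illustrative path), an arg of "/mnt/extstore/data:64g:coldcompact"
// parses to cf->file = "/mnt/extstore/data", cf->page_count = 64GiB / page_size
// (truncated), and cf->free_bucket = PAGE_BUCKET_COLDCOMPACT; the bucket token
// is optional and defaults to PAGE_BUCKET_DEFAULT.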
struct extstore_conf_file *storage_conf_parse(char *arg, unsigned int page_size) {
1191 struct extstore_conf_file *cf = NULL;
1192 char *b = NULL;
1193 char *p = strtok_r(arg, ":", &b);
1194 char unit = 0;
1195 uint64_t multiplier = 0;
1196 int base_size = 0;
1197 if (p == NULL)
1198 goto error;
1199 // First arg is the filepath.
1200 cf = calloc(1, sizeof(struct extstore_conf_file));
1201 cf->file = strdup(p);
1202
1203 p = strtok_r(NULL, ":", &b);
1204 if (p == NULL) {
1205 fprintf(stderr, "must supply size to ext_path, ie: ext_path=/f/e:64m (M|G|T|P supported)\n");
1206 goto error;
1207 }
1208 unit = tolower(p[strlen(p)-1]);
1209 p[strlen(p)-1] = '\0';
1210 // sigh.
1211 switch (unit) {
1212 case 'm':
1213 multiplier = 1024 * 1024;
1214 break;
1215 case 'g':
1216 multiplier = 1024 * 1024 * 1024;
1217 break;
1218 case 't':
1219 multiplier = 1024 * 1024;
1220 multiplier *= 1024 * 1024;
1221 break;
1222 case 'p':
1223 multiplier = 1024 * 1024;
1224 multiplier *= 1024 * 1024 * 1024;
1225 break;
1226 default:
1227 fprintf(stderr, "must supply size to ext_path, ie: ext_path=/f/e:64m (M|G|T|P supported)\n");
1228 goto error;
1229 }
1230 base_size = atoi(p);
1231 multiplier *= base_size;
1232 // page_count is nearest-but-not-larger-than pages * psize
1233 cf->page_count = multiplier / page_size;
1234 assert(page_size * cf->page_count <= multiplier);
1235 if (cf->page_count == 0) {
1236 fprintf(stderr, "supplied ext_path has zero size, cannot use\n");
1237 goto error;
1238 }
1239
1240 // final token would be a default free bucket
1241 p = strtok_r(NULL, ":", &b);
1242 // TODO: We reuse the original DEFINES for now,
1243 // but if lowttl gets split up this needs to be its own set.
1244 if (p != NULL) {
1245 if (strcmp(p, "compact") == 0) {
1246 cf->free_bucket = PAGE_BUCKET_COMPACT;
1247 } else if (strcmp(p, "lowttl") == 0) {
1248 cf->free_bucket = PAGE_BUCKET_LOWTTL;
1249 } else if (strcmp(p, "chunked") == 0) {
1250 cf->free_bucket = PAGE_BUCKET_CHUNKED;
1251 } else if (strcmp(p, "default") == 0) {
1252 cf->free_bucket = PAGE_BUCKET_DEFAULT;
1253 } else if (strcmp(p, "coldcompact") == 0) {
1254 cf->free_bucket = PAGE_BUCKET_COLDCOMPACT;
1255 } else if (strcmp(p, "old") == 0) {
1256 cf->free_bucket = PAGE_BUCKET_OLD;
1257 } else {
1258 fprintf(stderr, "Unknown extstore bucket: %s\n", p);
1259 goto error;
1260 }
1261 } else {
1262 // TODO: is this necessary?
1263 cf->free_bucket = PAGE_BUCKET_DEFAULT;
1264 }
1265
1266 return cf;
1267 error:
1268 if (cf) {
1269 if (cf->file)
1270 free(cf->file);
1271 free(cf);
1272 }
1273 return NULL;
1274 }
1275
1276 struct storage_settings {
1277 struct extstore_conf_file *storage_file;
1278 struct extstore_conf ext_cf;
1279 };
1280
void *storage_init_config(struct settings *s) {
1282 struct storage_settings *cf = calloc(1, sizeof(struct storage_settings));
1283
1284 s->ext_item_size = 512;
1285 s->ext_item_age = UINT_MAX;
1286 s->ext_low_ttl = 0;
1287 s->ext_recache_rate = 2000;
1288 s->ext_max_frag = 0.8;
1289 s->ext_drop_unread = false;
1290 s->ext_wbuf_size = 1024 * 1024 * 4;
1291 s->ext_compact_under = 0;
1292 s->ext_drop_under = 0;
1293 s->ext_max_sleep = 1000000;
1294 s->slab_automove_freeratio = 0.01;
1295 s->ext_page_size = 1024 * 1024 * 64;
1296 s->ext_io_threadcount = 1;
1297 cf->ext_cf.page_size = settings.ext_page_size;
1298 cf->ext_cf.wbuf_size = settings.ext_wbuf_size;
1299 cf->ext_cf.io_threadcount = settings.ext_io_threadcount;
1300 cf->ext_cf.io_depth = 1;
1301 cf->ext_cf.page_buckets = PAGE_BUCKET_COUNT;
1302 cf->ext_cf.wbuf_count = cf->ext_cf.page_buckets;
1303
1304 return cf;
1305 }
1306
1307 // TODO: pass settings struct?
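// Consumes one suboption at a time from memcached's comma-separated -o
// string via getsubopt(), e.g. (illustrative):
//   -o ext_path=/mnt/ext/file:64m,ext_threads=8,ext_item_age=3600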
int storage_read_config(void *conf, char **subopt) {
1309 struct storage_settings *cf = conf;
1310 struct extstore_conf *ext_cf = &cf->ext_cf;
1311 char *subopts_value;
1312
1313 enum {
1314 EXT_PAGE_SIZE,
1315 EXT_WBUF_SIZE,
1316 EXT_THREADS,
1317 EXT_IO_DEPTH,
1318 EXT_PATH,
1319 EXT_ITEM_SIZE,
1320 EXT_ITEM_AGE,
1321 EXT_LOW_TTL,
1322 EXT_RECACHE_RATE,
1323 EXT_COMPACT_UNDER,
1324 EXT_DROP_UNDER,
1325 EXT_MAX_SLEEP,
1326 EXT_MAX_FRAG,
1327 EXT_DROP_UNREAD,
1328 SLAB_AUTOMOVE_FREERATIO, // FIXME: move this back?
1329 };
1330
1331 char *const subopts_tokens[] = {
1332 [EXT_PAGE_SIZE] = "ext_page_size",
1333 [EXT_WBUF_SIZE] = "ext_wbuf_size",
1334 [EXT_THREADS] = "ext_threads",
1335 [EXT_IO_DEPTH] = "ext_io_depth",
1336 [EXT_PATH] = "ext_path",
1337 [EXT_ITEM_SIZE] = "ext_item_size",
1338 [EXT_ITEM_AGE] = "ext_item_age",
1339 [EXT_LOW_TTL] = "ext_low_ttl",
1340 [EXT_RECACHE_RATE] = "ext_recache_rate",
1341 [EXT_COMPACT_UNDER] = "ext_compact_under",
1342 [EXT_DROP_UNDER] = "ext_drop_under",
1343 [EXT_MAX_SLEEP] = "ext_max_sleep",
1344 [EXT_MAX_FRAG] = "ext_max_frag",
1345 [EXT_DROP_UNREAD] = "ext_drop_unread",
1346 [SLAB_AUTOMOVE_FREERATIO] = "slab_automove_freeratio",
1347 NULL
1348 };
1349
1350 switch (getsubopt(subopt, subopts_tokens, &subopts_value)) {
1351 case EXT_PAGE_SIZE:
1352 if (cf->storage_file) {
1353 fprintf(stderr, "Must specify ext_page_size before any ext_path arguments\n");
1354 return 1;
1355 }
1356 if (subopts_value == NULL) {
1357 fprintf(stderr, "Missing ext_page_size argument\n");
1358 return 1;
1359 }
1360 if (!safe_strtoul(subopts_value, &ext_cf->page_size)) {
1361 fprintf(stderr, "could not parse argument to ext_page_size\n");
1362 return 1;
1363 }
1364 ext_cf->page_size *= 1024 * 1024; /* megabytes */
1365 break;
1366 case EXT_WBUF_SIZE:
1367 if (subopts_value == NULL) {
1368 fprintf(stderr, "Missing ext_wbuf_size argument\n");
1369 return 1;
1370 }
1371 if (!safe_strtoul(subopts_value, &ext_cf->wbuf_size)) {
1372 fprintf(stderr, "could not parse argument to ext_wbuf_size\n");
1373 return 1;
1374 }
1375 ext_cf->wbuf_size *= 1024 * 1024; /* megabytes */
1376 settings.ext_wbuf_size = ext_cf->wbuf_size;
1377 break;
1378 case EXT_THREADS:
1379 if (subopts_value == NULL) {
1380 fprintf(stderr, "Missing ext_threads argument\n");
1381 return 1;
1382 }
1383 if (!safe_strtoul(subopts_value, &ext_cf->io_threadcount)) {
1384 fprintf(stderr, "could not parse argument to ext_threads\n");
1385 return 1;
1386 }
1387 break;
1388 case EXT_IO_DEPTH:
1389 if (subopts_value == NULL) {
1390 fprintf(stderr, "Missing ext_io_depth argument\n");
1391 return 1;
1392 }
1393 if (!safe_strtoul(subopts_value, &ext_cf->io_depth)) {
1394 fprintf(stderr, "could not parse argument to ext_io_depth\n");
1395 return 1;
1396 }
1397 break;
1398 case EXT_ITEM_SIZE:
1399 if (subopts_value == NULL) {
1400 fprintf(stderr, "Missing ext_item_size argument\n");
1401 return 1;
1402 }
1403 if (!safe_strtoul(subopts_value, &settings.ext_item_size)) {
1404 fprintf(stderr, "could not parse argument to ext_item_size\n");
1405 return 1;
1406 }
1407 break;
1408 case EXT_ITEM_AGE:
1409 if (subopts_value == NULL) {
1410 fprintf(stderr, "Missing ext_item_age argument\n");
1411 return 1;
1412 }
1413 if (!safe_strtoul(subopts_value, &settings.ext_item_age)) {
1414 fprintf(stderr, "could not parse argument to ext_item_age\n");
1415 return 1;
1416 }
1417 break;
1418 case EXT_LOW_TTL:
1419 if (subopts_value == NULL) {
1420 fprintf(stderr, "Missing ext_low_ttl argument\n");
1421 return 1;
1422 }
1423 if (!safe_strtoul(subopts_value, &settings.ext_low_ttl)) {
1424 fprintf(stderr, "could not parse argument to ext_low_ttl\n");
1425 return 1;
1426 }
1427 break;
1428 case EXT_RECACHE_RATE:
1429 if (subopts_value == NULL) {
1430 fprintf(stderr, "Missing ext_recache_rate argument\n");
1431 return 1;
1432 }
1433 if (!safe_strtoul(subopts_value, &settings.ext_recache_rate)) {
1434 fprintf(stderr, "could not parse argument to ext_recache_rate\n");
1435 return 1;
1436 }
1437 break;
1438 case EXT_COMPACT_UNDER:
1439 if (subopts_value == NULL) {
1440 fprintf(stderr, "Missing ext_compact_under argument\n");
1441 return 1;
1442 }
1443 if (!safe_strtoul(subopts_value, &settings.ext_compact_under)) {
1444 fprintf(stderr, "could not parse argument to ext_compact_under\n");
1445 return 1;
1446 }
1447 break;
1448 case EXT_DROP_UNDER:
1449 if (subopts_value == NULL) {
1450 fprintf(stderr, "Missing ext_drop_under argument\n");
1451 return 1;
1452 }
1453 if (!safe_strtoul(subopts_value, &settings.ext_drop_under)) {
1454 fprintf(stderr, "could not parse argument to ext_drop_under\n");
1455 return 1;
1456 }
1457 break;
1458 case EXT_MAX_SLEEP:
1459 if (subopts_value == NULL) {
1460 fprintf(stderr, "Missing ext_max_sleep argument\n");
1461 return 1;
1462 }
1463 if (!safe_strtoul(subopts_value, &settings.ext_max_sleep)) {
1464 fprintf(stderr, "could not parse argument to ext_max_sleep\n");
1465 return 1;
1466 }
1467 break;
1468 case EXT_MAX_FRAG:
1469 if (subopts_value == NULL) {
1470 fprintf(stderr, "Missing ext_max_frag argument\n");
1471 return 1;
1472 }
1473 if (!safe_strtod(subopts_value, &settings.ext_max_frag)) {
1474 fprintf(stderr, "could not parse argument to ext_max_frag\n");
1475 return 1;
1476 }
1477 break;
1478 case SLAB_AUTOMOVE_FREERATIO:
1479 if (subopts_value == NULL) {
1480 fprintf(stderr, "Missing slab_automove_freeratio argument\n");
1481 return 1;
1482 }
1483 if (!safe_strtod(subopts_value, &settings.slab_automove_freeratio)) {
1484 fprintf(stderr, "could not parse argument to slab_automove_freeratio\n");
1485 return 1;
1486 }
1487 break;
1488 case EXT_DROP_UNREAD:
1489 settings.ext_drop_unread = true;
1490 break;
1491 case EXT_PATH:
1492 if (subopts_value) {
1493 struct extstore_conf_file *tmp = storage_conf_parse(subopts_value, ext_cf->page_size);
1494 if (tmp == NULL) {
1495 fprintf(stderr, "failed to parse ext_path argument\n");
1496 return 1;
1497 }
1498 if (cf->storage_file != NULL) {
1499 tmp->next = cf->storage_file;
1500 }
1501 cf->storage_file = tmp;
1502 } else {
1503 fprintf(stderr, "missing argument to ext_path, ie: ext_path=/d/file:5G\n");
1504 return 1;
1505 }
1506 break;
1507 default:
1508 fprintf(stderr, "Illegal suboption \"%s\"\n", subopts_value);
1509 return 1;
1510 }
1511
1512 return 0;
1513 }
1514
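// Returns 0 when an ext_path was configured and the settings are usable,
// 1 on a configuration error, and 2 when no external storage was requested.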
int storage_check_config(void *conf) {
1516 struct storage_settings *cf = conf;
1517 struct extstore_conf *ext_cf = &cf->ext_cf;
1518
1519 if (cf->storage_file) {
1520 if (settings.item_size_max > ext_cf->wbuf_size) {
1521 fprintf(stderr, "-I (item_size_max: %d) cannot be larger than ext_wbuf_size: %d\n",
1522 settings.item_size_max, ext_cf->wbuf_size);
1523 return 1;
1524 }
1525
1526 if (settings.udpport) {
1527 fprintf(stderr, "Cannot use UDP with extstore enabled (-U 0 to disable)\n");
1528 return 1;
1529 }
1530
1531 return 0;
1532 }
1533
1534 return 2;
1535 }
1536
void *storage_init(void *conf) {
1538 struct storage_settings *cf = conf;
1539 struct extstore_conf *ext_cf = &cf->ext_cf;
1540
1541 enum extstore_res eres;
1542 void *storage = NULL;
1543 if (settings.ext_compact_under == 0) {
1544 // If changing the default fraction, change the help text as well.
1545 settings.ext_compact_under = cf->storage_file->page_count * 0.01;
1546 settings.ext_drop_under = cf->storage_file->page_count * 0.01;
1547 if (settings.ext_compact_under < 1) {
1548 settings.ext_compact_under = 1;
1549 }
1550 if (settings.ext_drop_under < 1) {
1551 settings.ext_drop_under = 1;
1552 }
1553 }
1554 crc32c_init();
1555
1556 settings.ext_global_pool_min = 0;
1557 storage = extstore_init(cf->storage_file, ext_cf, &eres);
1558 if (storage == NULL) {
1559 fprintf(stderr, "Failed to initialize external storage: %s\n",
1560 extstore_err(eres));
1561 if (eres == EXTSTORE_INIT_OPEN_FAIL) {
1562 perror("extstore open");
1563 }
1564 return NULL;
1565 }
1566
1567 return storage;
1568 }
1569
1570 #endif
1571