/* -*- Mode: C; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */

#include "config.h"
// FIXME: config.h?
#include <stdint.h>
#include <stdbool.h>
// end FIXME
#include <stdlib.h>
#include <limits.h>
#include <pthread.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <sys/uio.h>
#include <fcntl.h>
#include <unistd.h>
#include <stdio.h>
#include <string.h>
#include <assert.h>
#include "extstore.h"

// TODO: better if an init option turns this on/off.
#ifdef EXTSTORE_DEBUG
#define E_DEBUG(...) \
    do { \
        fprintf(stderr, __VA_ARGS__); \
    } while (0)
#else
#define E_DEBUG(...)
#endif

#define STAT_L(e) pthread_mutex_lock(&e->stats_mutex);
#define STAT_UL(e) pthread_mutex_unlock(&e->stats_mutex);
#define STAT_INCR(e, stat, amount) { \
    pthread_mutex_lock(&e->stats_mutex); \
    e->stats.stat += amount; \
    pthread_mutex_unlock(&e->stats_mutex); \
}

#define STAT_DECR(e, stat, amount) { \
    pthread_mutex_lock(&e->stats_mutex); \
    e->stats.stat -= amount; \
    pthread_mutex_unlock(&e->stats_mutex); \
}

typedef struct __store_wbuf {
    struct __store_wbuf *next;
    char *buf;
    char *buf_pos;
    unsigned int free;
    unsigned int size;
    unsigned int offset; /* offset into page this write starts at */
    bool full; /* done writing to this page */
    bool flushed; /* whether wbuf has been flushed to disk */
} _store_wbuf;

typedef struct _store_page {
    pthread_mutex_t mutex; /* Need to be held for most operations */
    uint64_t obj_count; /* _delete can decrease post-closing */
    uint64_t bytes_used; /* _delete can decrease post-closing */
    uint64_t offset; /* starting address of page within fd */
    unsigned int version;
    unsigned int refcount;
    unsigned int allocated;
    unsigned int written; /* item offsets can be past written if wbuf not flushed */
    unsigned int bucket; /* which bucket the page is linked into */
    unsigned int free_bucket; /* which bucket this page returns to when freed */
    int fd;
    unsigned short id;
    bool active; /* actively being written to */
    bool closed; /* closed and draining before free */
    bool free; /* on freelist */
    _store_wbuf *wbuf; /* currently active wbuf from the stack */
    struct _store_page *next;
} store_page;

typedef struct store_engine store_engine;
typedef struct {
    pthread_mutex_t mutex;
    pthread_cond_t cond;
    obj_io *queue;
    obj_io *queue_tail;
    store_engine *e;
    unsigned int depth; // queue depth
} store_io_thread;

// sub-struct for maintenance related tasks.
struct store_maint {
    pthread_mutex_t mutex;
};

struct store_engine {
    pthread_mutex_t mutex; /* covers internal stacks and variables */
    store_page *pages; /* directly addressable page list */
    _store_wbuf *wbuf_stack; /* wbuf freelist */
    obj_io *io_stack; /* IO's to use with submitting wbuf's */
    store_io_thread *io_threads;
    store_io_thread *bg_thread; /* dedicated thread for write submit / compact ops */
    store_page **page_buckets; /* stack of pages currently allocated to each bucket */
    store_page **free_page_buckets; /* stack of use-case isolated free pages */
    size_t page_size;
    unsigned int version; /* global version counter */
    unsigned int last_io_thread; /* round robin the IO threads */
    unsigned int io_threadcount; /* count of IO threads */
    unsigned int page_count;
    unsigned int page_free; /* unallocated pages */
    unsigned int page_bucketcount; /* count of potential page buckets */
    unsigned int free_page_bucketcount; /* count of free page buckets */
    unsigned int io_depth; /* FIXME: Might cache into thr struct */
    pthread_mutex_t stats_mutex;
    struct extstore_stats stats;
    struct store_maint maint;
};

// FIXME: code is duplicated from thread.c since extstore.c doesn't pull in
// the memcached ecosystem. worth starting a cross-utility header with static
// definitions/macros?
// keeping a minimal func here for now.
#define THR_NAME_MAXLEN 16
static void thread_setname(pthread_t thread, const char *name) {
    assert(strlen(name) < THR_NAME_MAXLEN);
#if defined(__linux__) && defined(HAVE_PTHREAD_SETNAME_NP)
    pthread_setname_np(thread, name);
#endif
}
#undef THR_NAME_MAXLEN

static _store_wbuf *wbuf_new(size_t size) {
    _store_wbuf *b = calloc(1, sizeof(_store_wbuf));
    if (b == NULL)
        return NULL;
    b->buf = calloc(size, sizeof(char));
    if (b->buf == NULL) {
        free(b);
        return NULL;
    }
    b->buf_pos = b->buf;
    b->free = size;
    b->size = size;
    return b;
}

static store_io_thread *_get_io_thread(store_engine *e) {
    int tid = -1;
    long long int low = LLONG_MAX;
    pthread_mutex_lock(&e->mutex);
    // find smallest queue. ignoring lock since being wrong isn't fatal.
    // TODO: if average queue depth can be quickly tracked, can break as soon
    // as we see a thread that's less than average, and start from last_io_thread
    for (int x = 0; x < e->io_threadcount; x++) {
        if (e->io_threads[x].depth == 0) {
            tid = x;
            break;
        } else if (e->io_threads[x].depth < low) {
            tid = x;
            low = e->io_threads[x].depth;
        }
    }
    pthread_mutex_unlock(&e->mutex);

    return &e->io_threads[tid];
}

static uint64_t _next_version(store_engine *e) {
    return e->version++;
}
// internal only method for freeing a page up
static void _free_page(store_engine *e, store_page *p);

static void *extstore_io_thread(void *arg);

/* Copies stats internal to engine and computes any derived values */
void extstore_get_stats(void *ptr, struct extstore_stats *st) {
    store_engine *e = (store_engine *)ptr;
    STAT_L(e);
    memcpy(st, &e->stats, sizeof(struct extstore_stats));
    STAT_UL(e);

    // grab pages_free/pages_used
    pthread_mutex_lock(&e->mutex);
    st->pages_free = e->page_free;
    st->pages_used = e->page_count - e->page_free;
    pthread_mutex_unlock(&e->mutex);
    st->io_queue = 0;
    for (int x = 0; x < e->io_threadcount; x++) {
        pthread_mutex_lock(&e->io_threads[x].mutex);
        st->io_queue += e->io_threads[x].depth;
        pthread_mutex_unlock(&e->io_threads[x].mutex);
    }
    // calculate bytes_fragmented.
    // note that open and not-yet-filled pages count against fragmentation.
    st->bytes_fragmented = st->pages_used * e->page_size -
        st->bytes_used;
}

void extstore_get_page_data(void *ptr, struct extstore_stats *st) {
    store_engine *e = (store_engine *)ptr;
    pthread_mutex_lock(&e->maint.mutex);
    struct extstore_page_data *pd = st->page_data;

    for (int i = 0; i < e->page_count; i++) {
        store_page *p = &e->pages[i];
        pthread_mutex_lock(&p->mutex);

        pd[p->id].free_bucket = p->free_bucket;
        pd[p->id].version = p->version;
        pd[p->id].bytes_used = p->bytes_used;
        if (p->active) {
            pd[p->id].active = true;
        }
        if (p->active || p->free) {
            pthread_mutex_unlock(&p->mutex);
            continue;
        }
        if (p->obj_count > 0 && !p->closed) {
            pd[p->id].bucket = p->bucket;
        }
        if ((p->obj_count == 0 || p->closed) && p->refcount == 0) {
            _free_page(e, p);
        }
        pthread_mutex_unlock(&p->mutex);
    }

    pthread_mutex_unlock(&e->maint.mutex);
}

const char *extstore_err(enum extstore_res res) {
    const char *rv = "unknown error";
    switch (res) {
        case EXTSTORE_INIT_BAD_WBUF_SIZE:
            rv = "page_size must be divisible by wbuf_size";
            break;
        case EXTSTORE_INIT_NEED_MORE_WBUF:
            rv = "wbuf_count must be >= page_buckets";
            break;
        case EXTSTORE_INIT_NEED_MORE_BUCKETS:
            rv = "page_buckets must be > 0";
            break;
        case EXTSTORE_INIT_PAGE_WBUF_ALIGNMENT:
            rv = "page_size and wbuf_size must be divisible by 1024*1024*2";
            break;
        case EXTSTORE_INIT_TOO_MANY_PAGES:
            rv = "page_count must total to < 65536. Increase page_size or lower path sizes";
            break;
        case EXTSTORE_INIT_OOM:
            rv = "failed calloc for engine";
            break;
        case EXTSTORE_INIT_OPEN_FAIL:
            rv = "failed to open file";
            break;
        case EXTSTORE_INIT_THREAD_FAIL:
            break;
    }
    return rv;
}

// TODO: #define's for DEFAULT_BUCKET, FREE_VERSION, etc
void *extstore_init(struct extstore_conf_file *fh, struct extstore_conf *cf,
        enum extstore_res *res) {
    int i;
    struct extstore_conf_file *f = NULL;
    pthread_t thread;

    if (cf->page_size % cf->wbuf_size != 0) {
        *res = EXTSTORE_INIT_BAD_WBUF_SIZE;
        return NULL;
    }
    // Should ensure at least one write buffer per potential page
    if (cf->page_buckets > cf->wbuf_count) {
        *res = EXTSTORE_INIT_NEED_MORE_WBUF;
        return NULL;
    }
    if (cf->page_buckets < 1) {
        *res = EXTSTORE_INIT_NEED_MORE_BUCKETS;
        return NULL;
    }

    // TODO: More intelligence around alignment of flash erasure block sizes
    if (cf->page_size % (1024 * 1024 * 2) != 0 ||
        cf->wbuf_size % (1024 * 1024 * 2) != 0) {
        *res = EXTSTORE_INIT_PAGE_WBUF_ALIGNMENT;
        return NULL;
    }

    store_engine *e = calloc(1, sizeof(store_engine));
    if (e == NULL) {
        *res = EXTSTORE_INIT_OOM;
        return NULL;
    }

    e->page_size = cf->page_size;
    uint64_t temp_page_count = 0;
    for (f = fh; f != NULL; f = f->next) {
        f->fd = open(f->file, O_RDWR | O_CREAT, 0644);
        if (f->fd < 0) {
            *res = EXTSTORE_INIT_OPEN_FAIL;
#ifdef EXTSTORE_DEBUG
            perror("extstore open");
#endif
            free(e);
            return NULL;
        }
        // use an fcntl lock to help avoid double starting.
        struct flock lock;
        lock.l_type = F_WRLCK;
        lock.l_start = 0;
        lock.l_whence = SEEK_SET;
        lock.l_len = 0;
        if (fcntl(f->fd, F_SETLK, &lock) < 0) {
            *res = EXTSTORE_INIT_OPEN_FAIL;
            free(e);
            return NULL;
        }
        if (ftruncate(f->fd, 0) < 0) {
            *res = EXTSTORE_INIT_OPEN_FAIL;
            free(e);
            return NULL;
        }

        temp_page_count += f->page_count;
        f->offset = 0;
    }

    if (temp_page_count >= UINT16_MAX) {
        *res = EXTSTORE_INIT_TOO_MANY_PAGES;
        free(e);
        return NULL;
    }
    e->page_count = temp_page_count;

    e->pages = calloc(e->page_count, sizeof(store_page));
    if (e->pages == NULL) {
        *res = EXTSTORE_INIT_OOM;
        // FIXME: loop-close. make error label
        free(e);
        return NULL;
    }

    // interleave the pages between devices
    f = NULL; // start at the first device.
    for (i = 0; i < e->page_count; i++) {
        // find next device with available pages
        while (1) {
            // restart the loop
            if (f == NULL || f->next == NULL) {
                f = fh;
            } else {
                f = f->next;
            }
            if (f->page_count) {
                f->page_count--;
                break;
            }
        }
        pthread_mutex_init(&e->pages[i].mutex, NULL);
        e->pages[i].id = i;
        e->pages[i].fd = f->fd;
        e->pages[i].free_bucket = f->free_bucket;
        e->pages[i].offset = f->offset;
        e->pages[i].free = true;
        f->offset += e->page_size;
    }

    // free page buckets allows the app to organize devices by use case
    e->free_page_buckets = calloc(cf->page_buckets, sizeof(store_page *));
    e->free_page_bucketcount = cf->page_buckets;

    e->page_free = e->page_count;
    for (i = e->page_count-1; i >= 0; i--) {
        int fb = e->pages[i].free_bucket;
        e->pages[i].next = e->free_page_buckets[fb];
        e->free_page_buckets[fb] = &e->pages[i];
    }

    // 0 is magic "page is freed" version
    e->version = 1;

    // scratch data for stats. TODO: malloc failure handle
    e->stats.page_data =
        calloc(e->page_count, sizeof(struct extstore_page_data));
    e->stats.page_count = e->page_count;
    e->stats.page_size = e->page_size;

    // page buckets lazily have pages assigned into them
    e->page_buckets = calloc(cf->page_buckets, sizeof(store_page *));
    e->page_bucketcount = cf->page_buckets;

    // allocate write buffers
    // also IO's to use for shipping to IO thread
    for (i = 0; i < cf->wbuf_count; i++) {
        _store_wbuf *w = wbuf_new(cf->wbuf_size);
        obj_io *io = calloc(1, sizeof(obj_io));
        /* TODO: on error, loop again and free stack. */
        w->next = e->wbuf_stack;
        e->wbuf_stack = w;
        io->next = e->io_stack;
        e->io_stack = io;
    }

    pthread_mutex_init(&e->mutex, NULL);
    pthread_mutex_init(&e->stats_mutex, NULL);
    pthread_mutex_init(&e->maint.mutex, NULL);

    e->io_depth = cf->io_depth;

    // spawn threads
    e->io_threads = calloc(cf->io_threadcount, sizeof(store_io_thread));
    for (i = 0; i < cf->io_threadcount; i++) {
        pthread_mutex_init(&e->io_threads[i].mutex, NULL);
        pthread_cond_init(&e->io_threads[i].cond, NULL);
        e->io_threads[i].e = e;
        // FIXME: error handling
        pthread_create(&thread, NULL, extstore_io_thread, &e->io_threads[i]);
        thread_setname(thread, "mc-ext-io");
    }
    e->io_threadcount = cf->io_threadcount;

    // dedicated IO thread for certain non-hotpath functions.
    e->bg_thread = calloc(1, sizeof(store_io_thread));
    e->bg_thread->e = e;
    pthread_mutex_init(&e->bg_thread->mutex, NULL);
    pthread_cond_init(&e->bg_thread->cond, NULL);
    pthread_create(&thread, NULL, extstore_io_thread, e->bg_thread);
    thread_setname(thread, "mc-ext-bgio");

    return (void *)e;
}
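
/* Illustrative caller sketch (not part of this file): a minimal init call,
 * assuming only the extstore_conf / extstore_conf_file fields this file reads
 * (the structs themselves live in extstore.h). Sizes and counts below are
 * made up for the example; real callers derive them from their own config.
 *
 *   struct extstore_conf cf = {0};
 *   cf.page_size = 1024 * 1024 * 64;   // multiple of wbuf_size and of 2MB
 *   cf.wbuf_size = 1024 * 1024 * 8;
 *   cf.wbuf_count = 4;
 *   cf.page_buckets = 4;
 *   cf.io_threadcount = 2;
 *   cf.io_depth = 1;
 *
 *   struct extstore_conf_file file = {0};
 *   file.file = "/path/to/extstore.data";  // hypothetical path
 *   file.page_count = 64;                  // pages this file contributes
 *   file.free_bucket = 0;                  // default free bucket
 *
 *   enum extstore_res res;
 *   void *e = extstore_init(&file, &cf, &res);
 *   if (e == NULL)
 *       fprintf(stderr, "extstore: %s\n", extstore_err(res));
 */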

// Call without *e locked, not a fast function.
static void _evict_page(store_engine *e, unsigned int bucket,
        unsigned int free_bucket) {
    struct extstore_stats st;
    st.page_data = calloc(e->page_count, sizeof(struct extstore_page_data));
    extstore_get_page_data(e, &st);
    uint64_t low_version = ULLONG_MAX;
    unsigned int low_page = 0;

    // find lowest version of anything in free_bucket OR 0
    // unless free_bucket is 0
    for (int i = 0; i < e->page_count; i++) {
        // must belong to 0 or the requested free_bucket
        if (st.page_data[i].free_bucket &&
            st.page_data[i].free_bucket != free_bucket) {
            continue;
        }

        // found a free page, don't evict.
        if (st.page_data[i].version == 0) {
            low_version = ULLONG_MAX;
            break;
        }

        // find the lowest version.
        if (!st.page_data[i].active &&
                st.page_data[i].version < low_version) {
            low_page = i;
            low_version = st.page_data[i].version;
        }
    }

    if (low_version != ULLONG_MAX) {
        extstore_evict_page(e, low_page, low_version);
    }
    // the page data snapshot is local to this call; release it.
    free(st.page_data);
}

// call with *e locked
static store_page *_allocate_page(store_engine *e, unsigned int bucket,
        unsigned int free_bucket) {
    E_DEBUG("EXTSTORE: allocating new page [bucket:%u]\n", bucket);
    assert(!e->page_buckets[bucket] || e->page_buckets[bucket]->allocated == e->page_size);
    store_page *tmp = NULL;
    if (e->free_page_buckets[free_bucket] != NULL) {
        assert(e->page_free > 0);
        tmp = e->free_page_buckets[free_bucket];
        e->free_page_buckets[free_bucket] = tmp->next;
    } else if (e->free_page_buckets[0] != NULL) {
        // fall back to default bucket.
        assert(e->page_free > 0);
        tmp = e->free_page_buckets[0];
        e->free_page_buckets[0] = tmp->next;
    }
    if (tmp != NULL) {
        tmp->next = e->page_buckets[bucket];
        e->page_buckets[bucket] = tmp;
        tmp->active = true;
        tmp->free = false;
        tmp->closed = false;
        tmp->version = _next_version(e);
        tmp->bucket = bucket;
        e->page_free--;
        STAT_INCR(e, page_allocs, 1);
    }

    if (tmp)
        E_DEBUG("EXTSTORE: got page %u [free:%u]\n", tmp->id, e->page_free);
    return tmp;
}

// call with *p locked. locks *e
static void _allocate_wbuf(store_engine *e, store_page *p) {
    _store_wbuf *wbuf = NULL;
    assert(!p->wbuf);
    pthread_mutex_lock(&e->mutex);
    if (e->wbuf_stack) {
        wbuf = e->wbuf_stack;
        e->wbuf_stack = wbuf->next;
        wbuf->next = 0;
    }
    pthread_mutex_unlock(&e->mutex);
    if (wbuf) {
        wbuf->offset = p->allocated;
        p->allocated += wbuf->size;
        wbuf->free = wbuf->size;
        wbuf->buf_pos = wbuf->buf;
        wbuf->full = false;
        wbuf->flushed = false;

        p->wbuf = wbuf;
    }
}

/* callback after wbuf is flushed. can only remove wbuf's from the head onward
 * if successfully flushed, which complicates this routine. each callback
 * attempts to free the wbuf stack, which is finally done when the head wbuf's
 * callback happens.
 * It's rare for flushes to happen out of order.
 */
static void _wbuf_cb(void *ep, obj_io *io, int ret) {
    store_engine *e = (store_engine *)ep;
    store_page *p = &e->pages[io->page_id];
    _store_wbuf *w = (_store_wbuf *) io->data;

    // TODO: Examine return code. Not entirely sure how to handle errors.
    // Naive first-pass should probably cause the page to close/free.
    w->flushed = true;
    pthread_mutex_lock(&p->mutex);
    assert(p->wbuf != NULL && p->wbuf == w);
    assert(p->written == w->offset);
    p->written += w->size;
    p->wbuf = NULL;

    if (p->written == e->page_size)
        p->active = false;

    // return the wbuf
    pthread_mutex_lock(&e->mutex);
    w->next = e->wbuf_stack;
    e->wbuf_stack = w;
    // also return the IO we just used.
    io->next = e->io_stack;
    e->io_stack = io;
    pthread_mutex_unlock(&e->mutex);
    pthread_mutex_unlock(&p->mutex);
}

/* Wraps a page's current wbuf in an io and submits it to the IO thread.
 * Called with p locked, locks e.
 */
static void _submit_wbuf(store_engine *e, store_page *p) {
    _store_wbuf *w;
    pthread_mutex_lock(&e->mutex);
    obj_io *io = e->io_stack;
    e->io_stack = io->next;
    pthread_mutex_unlock(&e->mutex);
    w = p->wbuf;

    // zero out the end of the wbuf to allow blind readback of data.
    memset(w->buf + (w->size - w->free), 0, w->free);

    io->next = NULL;
    io->mode = OBJ_IO_WRITE;
    io->page_id = p->id;
    io->data = w;
    io->offset = w->offset;
    io->len = w->size;
    io->buf = w->buf;
    io->cb = _wbuf_cb;

    extstore_submit_bg(e, io);
}

/* engine write function; takes engine, item_io.
 * fast fail if no available write buffer (flushing)
 * lock engine context, find active page, unlock
 * if page full, submit page/buffer to io thread.
 *
 * write is designed to be flaky; if page full, caller must try again to get
 * new page. best if used from a background thread that can harmlessly retry.
 */

int extstore_write_request(void *ptr, unsigned int bucket,
        unsigned int free_bucket, obj_io *io) {
    store_engine *e = (store_engine *)ptr;
    store_page *p;
    int ret = -1;
    if (bucket >= e->page_bucketcount)
        return ret;

    pthread_mutex_lock(&e->mutex);
    p = e->page_buckets[bucket];
    if (!p) {
        p = _allocate_page(e, bucket, free_bucket);
    }
    pthread_mutex_unlock(&e->mutex);
    if (!p) {
        _evict_page(e, bucket, free_bucket);
        return ret;
    }

    pthread_mutex_lock(&p->mutex);

    // FIXME: can't null out page_buckets!!!
    // page is full, clear bucket and retry later.
    if (!p->active ||
            ((!p->wbuf || p->wbuf->full) && p->allocated >= e->page_size)) {
        pthread_mutex_unlock(&p->mutex);
        pthread_mutex_lock(&e->mutex);
        store_page *temp_p = _allocate_page(e, bucket, free_bucket);
        pthread_mutex_unlock(&e->mutex);
        if (!temp_p) {
            _evict_page(e, bucket, free_bucket);
        }
        return ret;
    }

    // if io won't fit, submit IO for wbuf and find new one.
    if (p->wbuf && p->wbuf->free < io->len && !p->wbuf->full) {
        _submit_wbuf(e, p);
        p->wbuf->full = true;
    }

    if (!p->wbuf && p->allocated < e->page_size) {
        _allocate_wbuf(e, p);
    }

    // hand over buffer for caller to copy into
    // leaves p locked.
    if (p->wbuf && !p->wbuf->full && p->wbuf->free >= io->len) {
        io->buf = p->wbuf->buf_pos;
        io->page_id = p->id;
        return 0;
    }

    pthread_mutex_unlock(&p->mutex);
    // p->written is incremented post-wbuf flush
    return ret;
}

/* _must_ be called after a successful write_request.
 * fills the rest of io structure.
 */
void extstore_write(void *ptr, obj_io *io) {
    store_engine *e = (store_engine *)ptr;
    store_page *p = &e->pages[io->page_id];

    io->offset = p->wbuf->offset + (p->wbuf->size - p->wbuf->free);
    io->page_version = p->version;
    p->wbuf->buf_pos += io->len;
    p->wbuf->free -= io->len;
    p->bytes_used += io->len;
    p->obj_count++;
    STAT_L(e);
    e->stats.bytes_written += io->len;
    e->stats.bytes_used += io->len;
    e->stats.objects_written++;
    e->stats.objects_used++;
    STAT_UL(e);

    pthread_mutex_unlock(&p->mutex);
}
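
/* Illustrative write-path sketch (not part of this file; the caller-side
 * names are hypothetical). The caller sets io.len up front, asks for a buffer
 * slot, copies the object in, then finalizes. A non-zero return from
 * extstore_write_request() means "no space right now"; retrying later, e.g.
 * from a background thread, is the expected pattern described above.
 *
 *   obj_io io = {0};
 *   io.len = object_len;
 *   if (extstore_write_request(e, bucket, free_bucket, &io) == 0) {
 *       memcpy(io.buf, object_data, io.len); // io.buf points into the page's wbuf
 *       extstore_write(e, &io);              // fills offset/page_version, unlocks page
 *   }
 */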

/* engine submit function; takes engine, item_io stack.
 * lock io_thread context and add stack
 * signal io thread to wake.
 * return success.
 */
static int _extstore_submit(void *ptr, obj_io *io, store_io_thread *t) {
    unsigned int depth = 0;
    obj_io *tio = io;
    obj_io *tail = NULL;
    while (tio != NULL) {
        tail = tio; // keep updating potential tail.
        depth++;
        tio = tio->next;
    }

    pthread_mutex_lock(&t->mutex);

    t->depth += depth;
    if (t->queue == NULL) {
        t->queue = io;
        t->queue_tail = tail;
    } else {
        // Have to put the *io stack at the end of current queue.
        assert(tail->next == NULL);
        assert(t->queue_tail->next == NULL);
        t->queue_tail->next = io;
        t->queue_tail = tail;
    }

    pthread_mutex_unlock(&t->mutex);

    //pthread_mutex_lock(&t->mutex);
    pthread_cond_signal(&t->cond);
    //pthread_mutex_unlock(&t->mutex);
    return 0;
}

int extstore_submit(void *ptr, obj_io *io) {
    store_engine *e = (store_engine *)ptr;
    store_io_thread *t = _get_io_thread(e);
    return _extstore_submit(ptr, io, t);
}

int extstore_submit_bg(void *ptr, obj_io *io) {
    store_engine *e = (store_engine *)ptr;
    store_io_thread *t = e->bg_thread;
    return _extstore_submit(ptr, io, t);
}
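
/* Illustrative read-path sketch (not part of this file; buffer and callback
 * names are hypothetical). Reads are asynchronous: fill in an obj_io with the
 * page id/version and offset recorded at write time, plus a destination
 * buffer and a callback, then hand it to an IO thread. obj_io's can also be
 * chained via ->next to submit a batch in one call.
 *
 *   static void read_done(void *e, obj_io *io, int ret) {
 *       // ret follows read(2)/pread(2) conventions; -2 means the page
 *       // version no longer matched (item was evicted or page reused).
 *   }
 *
 *   obj_io io = {0};
 *   io.mode = OBJ_IO_READ;
 *   io.page_id = saved_page_id;
 *   io.page_version = saved_page_version;
 *   io.offset = saved_offset;
 *   io.len = saved_len;
 *   io.buf = dest_buf;        // or set io.iov/io.iovcnt for a vectored read
 *   io.cb = read_done;
 *   extstore_submit(e, &io);
 */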

/* engine note delete function: takes engine, page id, size?
 * note that an item in this page is no longer valid
 */
int extstore_delete(void *ptr, unsigned int page_id, uint64_t page_version,
        unsigned int count, unsigned int bytes) {
    store_engine *e = (store_engine *)ptr;
    // FIXME: validate page_id in bounds
    store_page *p = &e->pages[page_id];
    int ret = 0;

    pthread_mutex_lock(&p->mutex);
    if (!p->closed && p->version == page_version) {
        if (p->bytes_used >= bytes) {
            p->bytes_used -= bytes;
        } else {
            p->bytes_used = 0;
        }

        if (p->obj_count >= count) {
            p->obj_count -= count;
        } else {
            p->obj_count = 0; // caller has bad accounting?
        }
        STAT_L(e);
        e->stats.bytes_used -= bytes;
        e->stats.objects_used -= count;
        STAT_UL(e);

        if (p->obj_count == 0 && p->refcount == 0 && !p->active) {
            _free_page(e, p);
        }
    } else {
        ret = -1;
    }
    pthread_mutex_unlock(&p->mutex);
    return ret;
}

int extstore_check(void *ptr, unsigned int page_id, uint64_t page_version) {
    store_engine *e = (store_engine *)ptr;
    store_page *p = &e->pages[page_id];
    int ret = 0;

    pthread_mutex_lock(&p->mutex);
    if (p->version != page_version)
        ret = -1;
    pthread_mutex_unlock(&p->mutex);
    return ret;
}

/* allows a compactor to say "we're done with this page, kill it." */
void extstore_close_page(void *ptr, unsigned int page_id, uint64_t page_version) {
    store_engine *e = (store_engine *)ptr;
    store_page *p = &e->pages[page_id];

    pthread_mutex_lock(&p->mutex);
    if (!p->closed && !p->active && p->version == page_version) {
        p->closed = true;
        if (p->refcount == 0) {
            _free_page(e, p);
        }
    }
    pthread_mutex_unlock(&p->mutex);
}

/* signal that we've forcefully ejected rather than gracefully closed */
void extstore_evict_page(void *ptr, unsigned int page_id, uint64_t page_version) {
    store_engine *e = (store_engine *)ptr;
    store_page *p = &e->pages[page_id];

    pthread_mutex_lock(&p->mutex);
    if (!p->closed && !p->active && p->version == page_version) {
        E_DEBUG("EXTSTORE: evicting page [%d] [v: %llu]\n",
                p->id, (unsigned long long) p->version);

        p->closed = true;
        STAT_L(e);
        e->stats.page_evictions++;
        e->stats.objects_evicted += p->obj_count;
        e->stats.bytes_evicted += p->bytes_used;
        STAT_UL(e);
        if (p->refcount == 0) {
            _free_page(e, p);
        }
    }
    pthread_mutex_unlock(&p->mutex);
}

/* Finds an attached wbuf that can satisfy the read.
 * Since wbufs can potentially be flushed to disk out of order, they are only
 * removed as the head of the list successfully flushes to disk.
 */
// call with *p locked
// FIXME: protect from reading past wbuf
static inline int _read_from_wbuf(store_page *p, obj_io *io) {
    _store_wbuf *wbuf = p->wbuf;
    assert(wbuf != NULL);
    assert(io->offset < p->written + wbuf->size);
    if (io->iov == NULL) {
        memcpy(io->buf, wbuf->buf + (io->offset - wbuf->offset), io->len);
    } else {
        int x;
        unsigned int off = io->offset - wbuf->offset;
        // need to loop fill iovecs
        for (x = 0; x < io->iovcnt; x++) {
            struct iovec *iov = &io->iov[x];
            memcpy(iov->iov_base, wbuf->buf + off, iov->iov_len);
            off += iov->iov_len;
        }
    }
    return io->len;
}

/* engine IO thread; takes engine context
 * manage writes/reads
 * runs IO callbacks inline after each IO
 */
// FIXME: protect from reading past page
static void *extstore_io_thread(void *arg) {
    store_io_thread *me = (store_io_thread *)arg;
    store_engine *e = me->e;
    while (1) {
        obj_io *io_stack = NULL;
        pthread_mutex_lock(&me->mutex);
        if (me->queue == NULL) {
            pthread_cond_wait(&me->cond, &me->mutex);
        }

        // Pull and disconnect a batch from the queue
        // Chew small batches from the queue so the IO thread picker can keep
        // the IO queue depth even, instead of piling on threads one at a time
        // as they gobble a queue.
        if (me->queue != NULL) {
            int i;
            obj_io *end = NULL;
            io_stack = me->queue;
            end = io_stack;
            for (i = 1; i < e->io_depth; i++) {
                if (end->next) {
                    end = end->next;
                } else {
                    me->queue_tail = end->next;
                    break;
                }
            }
            me->depth -= i;
            me->queue = end->next;
            end->next = NULL;
        }
        pthread_mutex_unlock(&me->mutex);

        obj_io *cur_io = io_stack;
        while (cur_io) {
            // We need to note next before the callback in case the obj_io
            // gets reused.
            obj_io *next = cur_io->next;
            int ret = 0;
            int do_op = 1;
            store_page *p = &e->pages[cur_io->page_id];
            // TODO: loop if not enough bytes were read/written.
            switch (cur_io->mode) {
                case OBJ_IO_READ:
                    // Page is currently open. deal if read is past the end.
                    pthread_mutex_lock(&p->mutex);
                    if (!p->free && !p->closed && p->version == cur_io->page_version) {
                        if (p->active && cur_io->offset >= p->written) {
                            ret = _read_from_wbuf(p, cur_io);
                            do_op = 0;
                        } else {
                            p->refcount++;
                        }
                        STAT_L(e);
                        e->stats.bytes_read += cur_io->len;
                        e->stats.objects_read++;
                        STAT_UL(e);
                    } else {
                        do_op = 0;
                        ret = -2; // TODO: enum in IO for status?
                    }
                    pthread_mutex_unlock(&p->mutex);
                    if (do_op) {
#if !defined(HAVE_PREAD) || !defined(HAVE_PREADV)
                        // TODO: lseek offset is natively 64-bit on OS X, but
                        // perhaps not on all platforms? Else use lseek64()
                        ret = lseek(p->fd, p->offset + cur_io->offset, SEEK_SET);
                        if (ret >= 0) {
                            if (cur_io->iov == NULL) {
                                ret = read(p->fd, cur_io->buf, cur_io->len);
                            } else {
                                ret = readv(p->fd, cur_io->iov, cur_io->iovcnt);
                            }
                        }
#else
                        if (cur_io->iov == NULL) {
                            ret = pread(p->fd, cur_io->buf, cur_io->len, p->offset + cur_io->offset);
                        } else {
                            ret = preadv(p->fd, cur_io->iov, cur_io->iovcnt, p->offset + cur_io->offset);
                        }
#endif
                    }
                    break;
                case OBJ_IO_WRITE:
                    do_op = 0;
                    // FIXME: Should hold refcount during write. doesn't
                    // currently matter since page can't free while active.
                    ret = pwrite(p->fd, cur_io->buf, cur_io->len, p->offset + cur_io->offset);
                    break;
            }
            if (ret == 0) {
                E_DEBUG("read returned nothing\n");
            }

#ifdef EXTSTORE_DEBUG
            if (ret == -1) {
                perror("read/write op failed");
            }
#endif
            cur_io->cb(e, cur_io, ret);
            if (do_op) {
                pthread_mutex_lock(&p->mutex);
                p->refcount--;
                pthread_mutex_unlock(&p->mutex);
            }
            cur_io = next;
        }
    }

    return NULL;
}

// call with *p locked.
static void _free_page(store_engine *e, store_page *p) {
    store_page *tmp = NULL;
    store_page *prev = NULL;
    E_DEBUG("EXTSTORE: freeing page %u\n", p->id);
    STAT_L(e);
    e->stats.objects_used -= p->obj_count;
    e->stats.bytes_used -= p->bytes_used;
    e->stats.page_reclaims++;
    STAT_UL(e);
    pthread_mutex_lock(&e->mutex);
    // unlink page from bucket list
    tmp = e->page_buckets[p->bucket];
    while (tmp) {
        if (tmp == p) {
            if (prev) {
                prev->next = tmp->next;
            } else {
                e->page_buckets[p->bucket] = tmp->next;
            }
            tmp->next = NULL;
            break;
        }
        prev = tmp;
        tmp = tmp->next;
    }
    // reset most values
    p->version = 0;
    p->obj_count = 0;
    p->bytes_used = 0;
    p->allocated = 0;
    p->written = 0;
    p->bucket = 0;
    p->active = false;
    p->closed = false;
    p->free = true;
    // add to page stack
    p->next = e->free_page_buckets[p->free_bucket];
    e->free_page_buckets[p->free_bucket] = p;
    e->page_free++;
    E_DEBUG("EXTSTORE: pages free %u\n", e->page_free);
    pthread_mutex_unlock(&e->mutex);
}