/* -*- Mode: C; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */

#include "config.h"
// FIXME: config.h?
#include <stdint.h>
#include <stdbool.h>
// end FIXME
#include <stdlib.h>
#include <limits.h>
#include <pthread.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <sys/uio.h>
#include <fcntl.h>
#include <unistd.h>
#include <stdio.h>
#include <string.h>
#include <assert.h>
#include "extstore.h"

// TODO: better if an init option turns this on/off.
#ifdef EXTSTORE_DEBUG
#define E_DEBUG(...) \
    do { \
        fprintf(stderr, __VA_ARGS__); \
    } while (0)
#else
#define E_DEBUG(...)
#endif
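// E_DEBUG only emits output when EXTSTORE_DEBUG is defined at compile time
// (e.g. -DEXTSTORE_DEBUG); otherwise it expands to nothing.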

#define STAT_L(e) pthread_mutex_lock(&e->stats_mutex);
#define STAT_UL(e) pthread_mutex_unlock(&e->stats_mutex);
#define STAT_INCR(e, stat, amount) { \
    pthread_mutex_lock(&e->stats_mutex); \
    e->stats.stat += amount; \
    pthread_mutex_unlock(&e->stats_mutex); \
}

#define STAT_DECR(e, stat, amount) { \
    pthread_mutex_lock(&e->stats_mutex); \
    e->stats.stat -= amount; \
    pthread_mutex_unlock(&e->stats_mutex); \
}

typedef struct __store_wbuf {
    struct __store_wbuf *next;
    char *buf;
    char *buf_pos;
    unsigned int free;
    unsigned int size;
    unsigned int offset; /* offset into page this write starts at */
    bool full; /* done writing to this page */
    bool flushed; /* whether wbuf has been flushed to disk */
} _store_wbuf;

typedef struct _store_page {
    pthread_mutex_t mutex; /* Need to be held for most operations */
    uint64_t obj_count; /* _delete can decrease post-closing */
    uint64_t bytes_used; /* _delete can decrease post-closing */
    uint64_t offset; /* starting address of page within fd */
    uint64_t version; /* matches the engine's 64-bit version counter */
    unsigned int refcount;
    unsigned int allocated;
    unsigned int written; /* item offsets can be past written if wbuf not flushed */
    unsigned int bucket; /* which bucket the page is linked into */
    unsigned int free_bucket; /* which bucket this page returns to when freed */
    int fd;
    unsigned short id;
    bool active; /* actively being written to */
    bool closed; /* closed and draining before free */
    bool free; /* on freelist */
    _store_wbuf *wbuf; /* currently active wbuf from the stack */
    struct _store_page *next;
} store_page;
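/* Rough page lifecycle, as implemented below: a page starts on a free page
 * bucket (free=true), is pulled into a page bucket for writing (active=true),
 * goes inactive once its final wbuf flushes, and is reclaimed via _free_page()
 * once it is closed/evicted or drains to zero objects with no readers holding
 * a refcount, returning to its free_bucket. */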

typedef struct store_engine store_engine;
typedef struct {
    pthread_mutex_t mutex;
    pthread_cond_t cond;
    obj_io *queue;
    obj_io *queue_tail;
    store_engine *e;
    unsigned int depth; // queue depth
} store_io_thread;

// sub-struct for maintenance related tasks.
struct store_maint {
    pthread_mutex_t mutex;
};

struct store_engine {
    pthread_mutex_t mutex; /* covers internal stacks and variables */
    store_page *pages; /* directly addressable page list */
    _store_wbuf *wbuf_stack; /* wbuf freelist */
    obj_io *io_stack; /* IO's to use with submitting wbuf's */
    store_io_thread *io_threads;
    store_io_thread *bg_thread; /* dedicated thread for write submit / compact ops */
    store_page **page_buckets; /* stack of pages currently allocated to each bucket */
    store_page **free_page_buckets; /* stack of use-case isolated free pages */
    size_t page_size;
    uint64_t version; /* global version counter */
    unsigned int last_io_thread; /* round robin the IO threads */
    unsigned int io_threadcount; /* count of IO threads */
    unsigned int page_count;
    unsigned int page_free; /* unallocated pages */
    unsigned int page_bucketcount; /* count of potential page buckets */
    unsigned int free_page_bucketcount; /* count of free page buckets */
    unsigned int io_depth; /* FIXME: Might cache into thr struct */
    pthread_mutex_t stats_mutex;
    struct extstore_stats stats;
    struct store_maint maint;
};

// FIXME: code is duplicated from thread.c since extstore.c doesn't pull in
// the memcached ecosystem. worth starting a cross-utility header with static
// definitions/macros?
// keeping a minimal func here for now.
#define THR_NAME_MAXLEN 16
static void thread_setname(pthread_t thread, const char *name) {
    assert(strlen(name) < THR_NAME_MAXLEN);
#if defined(__linux__) && defined(HAVE_PTHREAD_SETNAME_NP)
    pthread_setname_np(thread, name);
#endif
}
#undef THR_NAME_MAXLEN

static _store_wbuf *wbuf_new(size_t size) {
    _store_wbuf *b = calloc(1, sizeof(_store_wbuf));
    if (b == NULL)
        return NULL;
    b->buf = calloc(size, sizeof(char));
    if (b->buf == NULL) {
        free(b);
        return NULL;
    }
    b->buf_pos = b->buf;
    b->free = size;
    b->size = size;
    return b;
}

static store_io_thread *_get_io_thread(store_engine *e) {
    int tid = -1;
    long long int low = LLONG_MAX;
    pthread_mutex_lock(&e->mutex);
    // find smallest queue. ignoring lock since being wrong isn't fatal.
    // TODO: if average queue depth can be quickly tracked, can break as soon
    // as we see a thread that's less than average, and start from last_io_thread
    for (int x = 0; x < e->io_threadcount; x++) {
        if (e->io_threads[x].depth == 0) {
            tid = x;
            break;
        } else if (e->io_threads[x].depth < low) {
            tid = x;
            low = e->io_threads[x].depth;
        }
    }
    pthread_mutex_unlock(&e->mutex);

    return &e->io_threads[tid];
}

static uint64_t _next_version(store_engine *e) {
    return e->version++;
}
// internal only method for freeing a page up
static void _free_page(store_engine *e, store_page *p);

static void *extstore_io_thread(void *arg);

/* Copies stats internal to engine and computes any derived values */
void extstore_get_stats(void *ptr, struct extstore_stats *st) {
    store_engine *e = (store_engine *)ptr;
    STAT_L(e);
    memcpy(st, &e->stats, sizeof(struct extstore_stats));
    STAT_UL(e);

    // grab pages_free/pages_used
    pthread_mutex_lock(&e->mutex);
    st->pages_free = e->page_free;
    st->pages_used = e->page_count - e->page_free;
    pthread_mutex_unlock(&e->mutex);
    st->io_queue = 0;
    for (int x = 0; x < e->io_threadcount; x++) {
        pthread_mutex_lock(&e->io_threads[x].mutex);
        st->io_queue += e->io_threads[x].depth;
        pthread_mutex_unlock(&e->io_threads[x].mutex);
    }
    // calculate bytes_fragmented.
    // note that open and not-yet-filled pages count against fragmentation.
    st->bytes_fragmented = st->pages_used * e->page_size -
        st->bytes_used;
}

void extstore_get_page_data(void *ptr, struct extstore_stats *st) {
    store_engine *e = (store_engine *)ptr;
    pthread_mutex_lock(&e->maint.mutex);
    struct extstore_page_data *pd = st->page_data;

    for (int i = 0; i < e->page_count; i++) {
        store_page *p = &e->pages[i];
        pthread_mutex_lock(&p->mutex);

        pd[p->id].free_bucket = p->free_bucket;
        pd[p->id].version = p->version;
        pd[p->id].bytes_used = p->bytes_used;
        if (p->active) {
            pd[p->id].active = true;
        }
        if (p->active || p->free) {
            pthread_mutex_unlock(&p->mutex);
            continue;
        }
        if (p->obj_count > 0 && !p->closed) {
            pd[p->id].bucket = p->bucket;
        }
        if ((p->obj_count == 0 || p->closed) && p->refcount == 0) {
            _free_page(e, p);
        }
        pthread_mutex_unlock(&p->mutex);
    }

    pthread_mutex_unlock(&e->maint.mutex);
}
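
/* Usage sketch for the stats API (not called from this file): the caller owns
 * the stats structure and must provide a page_data array sized to page_count
 * before asking for per-page data, e.g.:
 *
 *     struct extstore_stats st;
 *     extstore_get_stats(e, &st);             // fills counters and page_count
 *     st.page_data = calloc(st.page_count, sizeof(struct extstore_page_data));
 *     if (st.page_data != NULL) {
 *         extstore_get_page_data(e, &st);     // fills st.page_data[0..page_count)
 *         free(st.page_data);
 *     }
 *
 * _evict_page() below uses a similar pattern internally.
 */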

const char *extstore_err(enum extstore_res res) {
    const char *rv = "unknown error";
    switch (res) {
        case EXTSTORE_INIT_BAD_WBUF_SIZE:
            rv = "page_size must be divisible by wbuf_size";
            break;
        case EXTSTORE_INIT_NEED_MORE_WBUF:
            rv = "wbuf_count must be >= page_buckets";
            break;
        case EXTSTORE_INIT_NEED_MORE_BUCKETS:
            rv = "page_buckets must be > 0";
            break;
        case EXTSTORE_INIT_PAGE_WBUF_ALIGNMENT:
            rv = "page_size and wbuf_size must be divisible by 1024*1024*2";
            break;
        case EXTSTORE_INIT_TOO_MANY_PAGES:
            rv = "page_count must total to < 65536. Increase page_size or lower path sizes";
            break;
        case EXTSTORE_INIT_OOM:
            rv = "failed calloc for engine";
            break;
        case EXTSTORE_INIT_OPEN_FAIL:
            rv = "failed to open file";
            break;
        case EXTSTORE_INIT_THREAD_FAIL:
            break;
    }
    return rv;
}

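/* Init usage sketch (assumes the extstore_conf / extstore_conf_file fields
 * referenced in this file; values are illustrative, not recommendations):
 *
 *     struct extstore_conf cf = {0};
 *     cf.page_size = 1024 * 1024 * 64;   // must be a multiple of wbuf_size
 *     cf.wbuf_size = 1024 * 1024 * 8;    // both must be multiples of 2MB
 *     cf.wbuf_count = 8;                 // must be >= page_buckets
 *     cf.page_buckets = 4;
 *     cf.io_threadcount = 2;
 *     cf.io_depth = 1;
 *
 *     struct extstore_conf_file fh = {0};
 *     fh.file = "/path/to/extstore.data";
 *     fh.page_count = 64;                // total across files must stay < 65536
 *     fh.free_bucket = 0;
 *     fh.next = NULL;
 *
 *     enum extstore_res res;
 *     void *e = extstore_init(&fh, &cf, &res);
 *     if (e == NULL)
 *         fprintf(stderr, "extstore: %s\n", extstore_err(res));
 */
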
// TODO: #define's for DEFAULT_BUCKET, FREE_VERSION, etc
void *extstore_init(struct extstore_conf_file *fh, struct extstore_conf *cf,
        enum extstore_res *res) {
    int i;
    struct extstore_conf_file *f = NULL;
    pthread_t thread;

    if (cf->page_size % cf->wbuf_size != 0) {
        *res = EXTSTORE_INIT_BAD_WBUF_SIZE;
        return NULL;
    }
    // Should ensure at least one write buffer per potential page
    if (cf->page_buckets > cf->wbuf_count) {
        *res = EXTSTORE_INIT_NEED_MORE_WBUF;
        return NULL;
    }
    if (cf->page_buckets < 1) {
        *res = EXTSTORE_INIT_NEED_MORE_BUCKETS;
        return NULL;
    }

    // TODO: More intelligence around alignment of flash erasure block sizes
    if (cf->page_size % (1024 * 1024 * 2) != 0 ||
        cf->wbuf_size % (1024 * 1024 * 2) != 0) {
        *res = EXTSTORE_INIT_PAGE_WBUF_ALIGNMENT;
        return NULL;
    }

    store_engine *e = calloc(1, sizeof(store_engine));
    if (e == NULL) {
        *res = EXTSTORE_INIT_OOM;
        return NULL;
    }

    e->page_size = cf->page_size;
    uint64_t temp_page_count = 0;
    for (f = fh; f != NULL; f = f->next) {
        f->fd = open(f->file, O_RDWR | O_CREAT, 0644);
        if (f->fd < 0) {
            *res = EXTSTORE_INIT_OPEN_FAIL;
#ifdef EXTSTORE_DEBUG
            perror("extstore open");
#endif
            free(e);
            return NULL;
        }
        // use an fcntl lock to help avoid double starting.
        struct flock lock;
        lock.l_type = F_WRLCK;
        lock.l_start = 0;
        lock.l_whence = SEEK_SET;
        lock.l_len = 0;
        if (fcntl(f->fd, F_SETLK, &lock) < 0) {
            *res = EXTSTORE_INIT_OPEN_FAIL;
            free(e);
            return NULL;
        }
        if (ftruncate(f->fd, 0) < 0) {
            *res = EXTSTORE_INIT_OPEN_FAIL;
            free(e);
            return NULL;
        }

        temp_page_count += f->page_count;
        f->offset = 0;
    }

    if (temp_page_count >= UINT16_MAX) {
        *res = EXTSTORE_INIT_TOO_MANY_PAGES;
        free(e);
        return NULL;
    }
    e->page_count = temp_page_count;

    e->pages = calloc(e->page_count, sizeof(store_page));
    if (e->pages == NULL) {
        *res = EXTSTORE_INIT_OOM;
        // FIXME: loop-close. make error label
        free(e);
        return NULL;
    }

    // interleave the pages between devices
    f = NULL; // start at the first device.
    for (i = 0; i < e->page_count; i++) {
        // find next device with available pages
        while (1) {
            // restart the loop
            if (f == NULL || f->next == NULL) {
                f = fh;
            } else {
                f = f->next;
            }
            if (f->page_count) {
                f->page_count--;
                break;
            }
        }
        pthread_mutex_init(&e->pages[i].mutex, NULL);
        e->pages[i].id = i;
        e->pages[i].fd = f->fd;
        e->pages[i].free_bucket = f->free_bucket;
        e->pages[i].offset = f->offset;
        e->pages[i].free = true;
        f->offset += e->page_size;
    }

    // free page buckets allow the app to organize devices by use case
    e->free_page_buckets = calloc(cf->page_buckets, sizeof(store_page *));
    e->free_page_bucketcount = cf->page_buckets;

    e->page_free = e->page_count;
    for (i = e->page_count-1; i >= 0; i--) {
        int fb = e->pages[i].free_bucket;
        e->pages[i].next = e->free_page_buckets[fb];
        e->free_page_buckets[fb] = &e->pages[i];
    }

    // 0 is magic "page is freed" version
    e->version = 1;

    // scratch data for stats. TODO: malloc failure handle
    e->stats.page_data =
        calloc(e->page_count, sizeof(struct extstore_page_data));
    e->stats.page_count = e->page_count;
    e->stats.page_size = e->page_size;

    // page buckets lazily have pages assigned into them
    e->page_buckets = calloc(cf->page_buckets, sizeof(store_page *));
    e->page_bucketcount = cf->page_buckets;

    // allocate write buffers
    // also IO's to use for shipping to IO thread
    for (i = 0; i < cf->wbuf_count; i++) {
        _store_wbuf *w = wbuf_new(cf->wbuf_size);
        obj_io *io = calloc(1, sizeof(obj_io));
        /* TODO: on error, loop again and free stack. */
        w->next = e->wbuf_stack;
        e->wbuf_stack = w;
        io->next = e->io_stack;
        e->io_stack = io;
    }

    pthread_mutex_init(&e->mutex, NULL);
    pthread_mutex_init(&e->stats_mutex, NULL);
    pthread_mutex_init(&e->maint.mutex, NULL);

    e->io_depth = cf->io_depth;

    // spawn threads
    e->io_threads = calloc(cf->io_threadcount, sizeof(store_io_thread));
    for (i = 0; i < cf->io_threadcount; i++) {
        pthread_mutex_init(&e->io_threads[i].mutex, NULL);
        pthread_cond_init(&e->io_threads[i].cond, NULL);
        e->io_threads[i].e = e;
        // FIXME: error handling
        pthread_create(&thread, NULL, extstore_io_thread, &e->io_threads[i]);
        thread_setname(thread, "mc-ext-io");
    }
    e->io_threadcount = cf->io_threadcount;

    // dedicated IO thread for certain non-hotpath functions.
    e->bg_thread = calloc(1, sizeof(store_io_thread));
    e->bg_thread->e = e;
    pthread_mutex_init(&e->bg_thread->mutex, NULL);
    pthread_cond_init(&e->bg_thread->cond, NULL);
    pthread_create(&thread, NULL, extstore_io_thread, e->bg_thread);
    thread_setname(thread, "mc-ext-bgio");

    return (void *)e;
}

// Call without *e locked, not a fast function.
static void _evict_page(store_engine *e, unsigned int bucket,
        unsigned int free_bucket) {
    struct extstore_stats st;
    st.page_data = calloc(e->page_count, sizeof(struct extstore_page_data));
    extstore_get_page_data(e, &st);
    uint64_t low_version = ULLONG_MAX;
    unsigned int low_page = 0;

    // find lowest version of anything in free_bucket OR 0
    // unless free_bucket is 0
    for (int i = 0; i < e->page_count; i++) {
        // must belong to 0 or the requested free_bucket
        if (st.page_data[i].free_bucket &&
            st.page_data[i].free_bucket != free_bucket) {
            continue;
        }

        // found a free page, don't evict.
        if (st.page_data[i].version == 0) {
            low_version = ULLONG_MAX;
            break;
        }

        // find the lowest version.
        if (!st.page_data[i].active &&
            st.page_data[i].version < low_version) {
            low_page = i;
            low_version = st.page_data[i].version;
        }
    }

    if (low_version != ULLONG_MAX) {
        extstore_evict_page(e, low_page, low_version);
    }
    free(st.page_data); // scratch buffer allocated above just for this scan.
}

// call with *e locked
static store_page *_allocate_page(store_engine *e, unsigned int bucket,
        unsigned int free_bucket) {
    E_DEBUG("EXTSTORE: allocating new page [bucket:%u]\n", bucket);
    assert(!e->page_buckets[bucket] || e->page_buckets[bucket]->allocated == e->page_size);
    store_page *tmp = NULL;
    if (e->free_page_buckets[free_bucket] != NULL) {
        assert(e->page_free > 0);
        tmp = e->free_page_buckets[free_bucket];
        e->free_page_buckets[free_bucket] = tmp->next;
    } else if (e->free_page_buckets[0] != NULL) {
        // fall back to default bucket.
        assert(e->page_free > 0);
        tmp = e->free_page_buckets[0];
        e->free_page_buckets[0] = tmp->next;
    }
    if (tmp != NULL) {
        tmp->next = e->page_buckets[bucket];
        e->page_buckets[bucket] = tmp;
        tmp->active = true;
        tmp->free = false;
        tmp->closed = false;
        tmp->version = _next_version(e);
        tmp->bucket = bucket;
        e->page_free--;
        STAT_INCR(e, page_allocs, 1);
    }

    if (tmp)
        E_DEBUG("EXTSTORE: got page %u [free:%u]\n", tmp->id, e->page_free);
    return tmp;
}

// call with *p locked. locks *e
static void _allocate_wbuf(store_engine *e, store_page *p) {
    _store_wbuf *wbuf = NULL;
    assert(!p->wbuf);
    pthread_mutex_lock(&e->mutex);
    if (e->wbuf_stack) {
        wbuf = e->wbuf_stack;
        e->wbuf_stack = wbuf->next;
        wbuf->next = 0;
    }
    pthread_mutex_unlock(&e->mutex);
    if (wbuf) {
        wbuf->offset = p->allocated;
        p->allocated += wbuf->size;
        wbuf->free = wbuf->size;
        wbuf->buf_pos = wbuf->buf;
        wbuf->full = false;
        wbuf->flushed = false;

        p->wbuf = wbuf;
    }
}

/* callback after a wbuf has been flushed to disk. a page only ever has one
 * wbuf attached at a time, so this advances p->written, detaches the wbuf,
 * and returns both the wbuf and the obj_io that carried it to the engine
 * freelists. once the whole page has been written it is marked inactive.
 */
static void _wbuf_cb(void *ep, obj_io *io, int ret) {
    store_engine *e = (store_engine *)ep;
    store_page *p = &e->pages[io->page_id];
    _store_wbuf *w = (_store_wbuf *) io->data;

    // TODO: Examine return code. Not entirely sure how to handle errors.
    // Naive first-pass should probably cause the page to close/free.
    w->flushed = true;
    pthread_mutex_lock(&p->mutex);
    assert(p->wbuf != NULL && p->wbuf == w);
    assert(p->written == w->offset);
    p->written += w->size;
    p->wbuf = NULL;

    if (p->written == e->page_size)
        p->active = false;

    // return the wbuf
    pthread_mutex_lock(&e->mutex);
    w->next = e->wbuf_stack;
    e->wbuf_stack = w;
    // also return the IO we just used.
    io->next = e->io_stack;
    e->io_stack = io;
    pthread_mutex_unlock(&e->mutex);
    pthread_mutex_unlock(&p->mutex);
}

/* Wraps the page's current wbuf in an obj_io and submits it to the background
 * IO thread. Called with p locked; locks e.
 */
static void _submit_wbuf(store_engine *e, store_page *p) {
    _store_wbuf *w;
    pthread_mutex_lock(&e->mutex);
    obj_io *io = e->io_stack;
    e->io_stack = io->next;
    pthread_mutex_unlock(&e->mutex);
    w = p->wbuf;

    // zero out the end of the wbuf to allow blind readback of data.
    memset(w->buf + (w->size - w->free), 0, w->free);

    io->next = NULL;
    io->mode = OBJ_IO_WRITE;
    io->page_id = p->id;
    io->data = w;
    io->offset = w->offset;
    io->len = w->size;
    io->buf = w->buf;
    io->cb = _wbuf_cb;

    extstore_submit_bg(e, io);
}

/* engine write function; takes engine, item_io.
 * fast fail if no available write buffer (flushing)
 * lock engine context, find active page, unlock
 * if page full, submit page/buffer to io thread.
 *
 * write is designed to be flaky; if page full, caller must try again to get
 * new page. best if used from a background thread that can harmlessly retry.
 */
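/* Caller-side usage sketch of the write path (a sketch; `data` and
 * `data_len` are hypothetical caller-owned values):
 *
 *     obj_io io = {0};
 *     io.len = data_len;
 *     if (extstore_write_request(e, bucket, free_bucket, &io) == 0) {
 *         // engine handed us io.buf and left the page locked;
 *         // copy the object in, then finalize the write:
 *         memcpy(io.buf, data, io.len);
 *         extstore_write(e, &io); // fills io.offset/io.page_version, unlocks
 *         // remember io.page_id/io.page_version/io.offset for later reads.
 *     } else {
 *         // no writable page right now; retry later (e.g. from a bg thread).
 *     }
 */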

int extstore_write_request(void *ptr, unsigned int bucket,
        unsigned int free_bucket, obj_io *io) {
    store_engine *e = (store_engine *)ptr;
    store_page *p;
    int ret = -1;
    if (bucket >= e->page_bucketcount)
        return ret;

    pthread_mutex_lock(&e->mutex);
    p = e->page_buckets[bucket];
    if (!p) {
        p = _allocate_page(e, bucket, free_bucket);
    }
    pthread_mutex_unlock(&e->mutex);
    if (!p) {
        _evict_page(e, bucket, free_bucket);
        return ret;
    }

    pthread_mutex_lock(&p->mutex);

    // FIXME: can't null out page_buckets!!!
    // page is full: push a fresh page onto the bucket and make the caller retry.
    if (!p->active ||
        ((!p->wbuf || p->wbuf->full) && p->allocated >= e->page_size)) {
        pthread_mutex_unlock(&p->mutex);
        pthread_mutex_lock(&e->mutex);
        store_page *temp_p = _allocate_page(e, bucket, free_bucket);
        pthread_mutex_unlock(&e->mutex);
        if (!temp_p) {
            _evict_page(e, bucket, free_bucket);
        }
        return ret;
    }

    // if io won't fit, submit IO for wbuf and find new one.
    if (p->wbuf && p->wbuf->free < io->len && !p->wbuf->full) {
        _submit_wbuf(e, p);
        p->wbuf->full = true;
    }

    if (!p->wbuf && p->allocated < e->page_size) {
        _allocate_wbuf(e, p);
    }

    // hand over buffer for caller to copy into
    // leaves p locked.
    if (p->wbuf && !p->wbuf->full && p->wbuf->free >= io->len) {
        io->buf = p->wbuf->buf_pos;
        io->page_id = p->id;
        return 0;
    }

    pthread_mutex_unlock(&p->mutex);
    // p->written is incremented post-wbuf flush
    return ret;
}

/* _must_ be called after a successful write_request.
 * fills the rest of io structure.
 */
void extstore_write(void *ptr, obj_io *io) {
    store_engine *e = (store_engine *)ptr;
    store_page *p = &e->pages[io->page_id];

    io->offset = p->wbuf->offset + (p->wbuf->size - p->wbuf->free);
    io->page_version = p->version;
    p->wbuf->buf_pos += io->len;
    p->wbuf->free -= io->len;
    p->bytes_used += io->len;
    p->obj_count++;
    STAT_L(e);
    e->stats.bytes_written += io->len;
    e->stats.bytes_used += io->len;
    e->stats.objects_written++;
    e->stats.objects_used++;
    STAT_UL(e);

    pthread_mutex_unlock(&p->mutex);
}

/* engine submit function; takes engine, item_io stack.
 * lock io_thread context and add stack
 * signal io thread to wake.
 * return success.
 */
static int _extstore_submit(void *ptr, obj_io *io, store_io_thread *t) {
    unsigned int depth = 0;
    obj_io *tio = io;
    obj_io *tail = NULL;
    while (tio != NULL) {
        tail = tio; // keep updating potential tail.
        depth++;
        tio = tio->next;
    }

    pthread_mutex_lock(&t->mutex);

    t->depth += depth;
    if (t->queue == NULL) {
        t->queue = io;
        t->queue_tail = tail;
    } else {
        // Have to put the *io stack at the end of current queue.
        assert(tail->next == NULL);
        assert(t->queue_tail->next == NULL);
        t->queue_tail->next = io;
        t->queue_tail = tail;
    }

    pthread_mutex_unlock(&t->mutex);

    //pthread_mutex_lock(&t->mutex);
    pthread_cond_signal(&t->cond);
    //pthread_mutex_unlock(&t->mutex);
    return 0;
}

int extstore_submit(void *ptr, obj_io *io) {
    store_engine *e = (store_engine *)ptr;
    store_io_thread *t = _get_io_thread(e);
    return _extstore_submit(ptr, io, t);
}
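
/* Read-path usage sketch (a sketch; `buf`, `my_read_cb`, and the saved
 * page_id/page_version/offset/len are caller-owned values recorded at write
 * time):
 *
 *     static void my_read_cb(void *e, obj_io *io, int ret) {
 *         // ret is a read(2)/pread(2)-style result: bytes read, -1 on IO
 *         // error, or -2 if the page was closed/freed or the version changed.
 *     }
 *
 *     obj_io io = {0};
 *     io.mode = OBJ_IO_READ;
 *     io.page_id = saved_page_id;
 *     io.page_version = saved_page_version;
 *     io.offset = saved_offset;
 *     io.len = saved_len;
 *     io.buf = buf;           // or fill io.iov/io.iovcnt for vectored reads
 *     io.cb = my_read_cb;
 *     io.next = NULL;         // multiple IOs may be chained via ->next
 *     extstore_submit(e, &io);
 *     // the obj_io (and buf) must stay valid until the callback has run.
 */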

int extstore_submit_bg(void *ptr, obj_io *io) {
    store_engine *e = (store_engine *)ptr;
    store_io_thread *t = e->bg_thread;
    return _extstore_submit(ptr, io, t);
}

/* engine note delete function: takes engine, page id, size?
 * note that an item in this page is no longer valid
 */
int extstore_delete(void *ptr, unsigned int page_id, uint64_t page_version,
        unsigned int count, unsigned int bytes) {
    store_engine *e = (store_engine *)ptr;
    // FIXME: validate page_id in bounds
    store_page *p = &e->pages[page_id];
    int ret = 0;

    pthread_mutex_lock(&p->mutex);
    if (!p->closed && p->version == page_version) {
        if (p->bytes_used >= bytes) {
            p->bytes_used -= bytes;
        } else {
            p->bytes_used = 0;
        }

        if (p->obj_count >= count) {
            p->obj_count -= count;
        } else {
            p->obj_count = 0; // caller has bad accounting?
        }
        STAT_L(e);
        e->stats.bytes_used -= bytes;
        e->stats.objects_used -= count;
        STAT_UL(e);

        if (p->obj_count == 0 && p->refcount == 0 && !p->active) {
            _free_page(e, p);
        }
    } else {
        ret = -1;
    }
    pthread_mutex_unlock(&p->mutex);
    return ret;
}
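
/* Usage sketch: when the caller drops or replaces an item it previously
 * wrote, it reports the page and sizes back so the page can drain and be
 * reclaimed, e.g. extstore_delete(e, io.page_id, io.page_version, 1, io.len);
 * a -1 return means the page was already closed or reused (version mismatch).
 */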

int extstore_check(void *ptr, unsigned int page_id, uint64_t page_version) {
    store_engine *e = (store_engine *)ptr;
    store_page *p = &e->pages[page_id];
    int ret = 0;

    pthread_mutex_lock(&p->mutex);
    if (p->version != page_version)
        ret = -1;
    pthread_mutex_unlock(&p->mutex);
    return ret;
}

/* allows a compactor to say "we're done with this page, kill it." */
void extstore_close_page(void *ptr, unsigned int page_id, uint64_t page_version) {
    store_engine *e = (store_engine *)ptr;
    store_page *p = &e->pages[page_id];

    pthread_mutex_lock(&p->mutex);
    if (!p->closed && !p->active && p->version == page_version) {
        p->closed = true;
        if (p->refcount == 0) {
            _free_page(e, p);
        }
    }
    pthread_mutex_unlock(&p->mutex);
}

/* signal that we've forcefully ejected rather than gracefully closed */
void extstore_evict_page(void *ptr, unsigned int page_id, uint64_t page_version) {
    store_engine *e = (store_engine *)ptr;
    store_page *p = &e->pages[page_id];

    pthread_mutex_lock(&p->mutex);
    if (!p->closed && !p->active && p->version == page_version) {
        E_DEBUG("EXTSTORE: evicting page [%d] [v: %llu]\n",
                p->id, (unsigned long long) p->version);

        p->closed = true;
        STAT_L(e);
        e->stats.page_evictions++;
        e->stats.objects_evicted += p->obj_count;
        e->stats.bytes_evicted += p->bytes_used;
        STAT_UL(e);
        if (p->refcount == 0) {
            _free_page(e, p);
        }
    }
    pthread_mutex_unlock(&p->mutex);
}

/* Satisfies a read directly from the page's currently attached wbuf.
 * Only used while the page is active and the requested offset is at or past
 * p->written, i.e. the data has not yet been flushed to disk.
 */
// call with *p locked
// FIXME: protect from reading past wbuf
static inline int _read_from_wbuf(store_page *p, obj_io *io) {
    _store_wbuf *wbuf = p->wbuf;
    assert(wbuf != NULL);
    assert(io->offset < p->written + wbuf->size);
    if (io->iov == NULL) {
        memcpy(io->buf, wbuf->buf + (io->offset - wbuf->offset), io->len);
    } else {
        int x;
        unsigned int off = io->offset - wbuf->offset;
        // need to loop fill iovecs
        for (x = 0; x < io->iovcnt; x++) {
            struct iovec *iov = &io->iov[x];
            memcpy(iov->iov_base, wbuf->buf + off, iov->iov_len);
            off += iov->iov_len;
        }
    }
    return io->len;
}

/* engine IO thread; takes engine context
 * manage writes/reads
 * runs IO callbacks inline after each IO
 */
// FIXME: protect from reading past page
static void *extstore_io_thread(void *arg) {
    store_io_thread *me = (store_io_thread *)arg;
    store_engine *e = me->e;
    while (1) {
        obj_io *io_stack = NULL;
        pthread_mutex_lock(&me->mutex);
        if (me->queue == NULL) {
            pthread_cond_wait(&me->cond, &me->mutex);
        }

        // Pull and disconnect a batch from the queue
        // Chew small batches from the queue so the IO thread picker can keep
        // the IO queue depth even, instead of piling on threads one at a time
        // as they gobble a queue.
        if (me->queue != NULL) {
            int i;
            obj_io *end = NULL;
            io_stack = me->queue;
            end = io_stack;
            for (i = 1; i < e->io_depth; i++) {
                if (end->next) {
                    end = end->next;
                } else {
                    me->queue_tail = end->next;
                    break;
                }
            }
            me->depth -= i;
            me->queue = end->next;
            end->next = NULL;
        }
        pthread_mutex_unlock(&me->mutex);

        obj_io *cur_io = io_stack;
        while (cur_io) {
            // We need to note next before the callback in case the obj_io
            // gets reused.
            obj_io *next = cur_io->next;
            int ret = 0;
            int do_op = 1;
            store_page *p = &e->pages[cur_io->page_id];
            // TODO: loop if not enough bytes were read/written.
            switch (cur_io->mode) {
                case OBJ_IO_READ:
                    // If the page is still open, reads past the flushed portion
                    // are served directly from the wbuf.
                    pthread_mutex_lock(&p->mutex);
                    if (!p->free && !p->closed && p->version == cur_io->page_version) {
                        if (p->active && cur_io->offset >= p->written) {
                            ret = _read_from_wbuf(p, cur_io);
                            do_op = 0;
                        } else {
                            p->refcount++;
                        }
                        STAT_L(e);
                        e->stats.bytes_read += cur_io->len;
                        e->stats.objects_read++;
                        STAT_UL(e);
                    } else {
                        do_op = 0;
                        ret = -2; // TODO: enum in IO for status?
                    }
                    pthread_mutex_unlock(&p->mutex);
                    if (do_op) {
#if !defined(HAVE_PREAD) || !defined(HAVE_PREADV)
                        // TODO: lseek offset is natively 64-bit on OS X, but
                        // perhaps not on all platforms? Else use lseek64()
                        ret = lseek(p->fd, p->offset + cur_io->offset, SEEK_SET);
                        if (ret >= 0) {
                            if (cur_io->iov == NULL) {
                                ret = read(p->fd, cur_io->buf, cur_io->len);
                            } else {
                                ret = readv(p->fd, cur_io->iov, cur_io->iovcnt);
                            }
                        }
#else
                        if (cur_io->iov == NULL) {
                            ret = pread(p->fd, cur_io->buf, cur_io->len, p->offset + cur_io->offset);
                        } else {
                            ret = preadv(p->fd, cur_io->iov, cur_io->iovcnt, p->offset + cur_io->offset);
                        }
#endif
                    }
                    break;
                case OBJ_IO_WRITE:
                    do_op = 0;
                    // FIXME: Should hold refcount during write. doesn't
                    // currently matter since page can't free while active.
                    ret = pwrite(p->fd, cur_io->buf, cur_io->len, p->offset + cur_io->offset);
                    break;
            }
            if (ret == 0) {
                E_DEBUG("read returned nothing\n");
            }

#ifdef EXTSTORE_DEBUG
            if (ret == -1) {
                perror("read/write op failed");
            }
#endif
            cur_io->cb(e, cur_io, ret);
            if (do_op) {
                pthread_mutex_lock(&p->mutex);
                p->refcount--;
                pthread_mutex_unlock(&p->mutex);
            }
            cur_io = next;
        }
    }

    return NULL;
}

// call with *p locked.
static void _free_page(store_engine *e, store_page *p) {
    store_page *tmp = NULL;
    store_page *prev = NULL;
    E_DEBUG("EXTSTORE: freeing page %u\n", p->id);
    STAT_L(e);
    e->stats.objects_used -= p->obj_count;
    e->stats.bytes_used -= p->bytes_used;
    e->stats.page_reclaims++;
    STAT_UL(e);
    pthread_mutex_lock(&e->mutex);
    // unlink page from bucket list
    tmp = e->page_buckets[p->bucket];
    while (tmp) {
        if (tmp == p) {
            if (prev) {
                prev->next = tmp->next;
            } else {
                e->page_buckets[p->bucket] = tmp->next;
            }
            tmp->next = NULL;
            break;
        }
        prev = tmp;
        tmp = tmp->next;
    }
    // reset most values
    p->version = 0;
    p->obj_count = 0;
    p->bytes_used = 0;
    p->allocated = 0;
    p->written = 0;
    p->bucket = 0;
    p->active = false;
    p->closed = false;
    p->free = true;
    // add to page stack
    p->next = e->free_page_buckets[p->free_bucket];
    e->free_page_buckets[p->free_bucket] = p;
    e->page_free++;
    E_DEBUG("EXTSTORE: pages free %u\n", e->page_free);
    pthread_mutex_unlock(&e->mutex);
}