1916fff36Sdormando /* -*- Mode: C; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2916fff36Sdormando
3916fff36Sdormando #include <stdlib.h>
4725cdf4dSMathieu CARBONNEAUX #include <stdio.h>
5916fff36Sdormando #include <string.h>
6916fff36Sdormando #include <errno.h>
7916fff36Sdormando #include <poll.h>
8f27a1022Sdormando #include <ctype.h>
9916fff36Sdormando
10916fff36Sdormando #include "memcached.h"
11916fff36Sdormando #include "bipbuffer.h"
12916fff36Sdormando
13a4f8b982Sdormando #ifdef LOGGER_DEBUG
14916fff36Sdormando #define L_DEBUG(...) \
15916fff36Sdormando do { \
16916fff36Sdormando fprintf(stderr, __VA_ARGS__); \
17916fff36Sdormando } while (0)
18916fff36Sdormando #else
19916fff36Sdormando #define L_DEBUG(...)
20916fff36Sdormando #endif
21916fff36Sdormando
22916fff36Sdormando
23916fff36Sdormando /* TODO: put this in a struct and ditch the global vars. */
24916fff36Sdormando static logger *logger_stack_head = NULL;
25916fff36Sdormando static logger *logger_stack_tail = NULL;
26916fff36Sdormando static unsigned int logger_count = 0;
27916fff36Sdormando static volatile int do_run_logger_thread = 1;
28916fff36Sdormando static pthread_t logger_tid;
29916fff36Sdormando pthread_mutex_t logger_stack_lock = PTHREAD_MUTEX_INITIALIZER;
30916fff36Sdormando
31916fff36Sdormando pthread_key_t logger_key;
32916fff36Sdormando
33*60e98600Sdormando #if !defined(HAVE_GCC_64ATOMICS) && !defined(__sun)
34916fff36Sdormando pthread_mutex_t logger_atomics_mutex = PTHREAD_MUTEX_INITIALIZER;
35916fff36Sdormando #endif
36916fff36Sdormando
37916fff36Sdormando #define WATCHER_LIMIT 20
38916fff36Sdormando logger_watcher *watchers[20];
39916fff36Sdormando struct pollfd watchers_pollfds[20];
40916fff36Sdormando int watcher_count = 0;
41916fff36Sdormando
42f27a1022Sdormando static char *logger_uriencode_map[256];
43f27a1022Sdormando static char logger_uriencode_str[768];
44f27a1022Sdormando
45916fff36Sdormando /* Should this go somewhere else? */
46916fff36Sdormando static const entry_details default_entries[] = {
47d5f7f89aSdormando [LOGGER_ASCII_CMD] = {LOGGER_TEXT_ENTRY, 512, LOG_RAWCMDS, "<%d %s"},
48c7fbccebSdormando [LOGGER_EVICTION] = {LOGGER_EVICTION_ENTRY, 512, LOG_EVICTIONS, NULL},
49c7fbccebSdormando [LOGGER_ITEM_GET] = {LOGGER_ITEM_GET_ENTRY, 512, LOG_FETCHERS, NULL},
50c7fbccebSdormando [LOGGER_ITEM_STORE] = {LOGGER_ITEM_STORE_ENTRY, 512, LOG_MUTATIONS, NULL}
51916fff36Sdormando };
52916fff36Sdormando
531b03a9c6Sdormando #define WATCHER_ALL -1
54426310bcSdormando static int logger_thread_poll_watchers(int force_poll, int watcher);
55426310bcSdormando
56426310bcSdormando /*************************
57426310bcSdormando * Util functions shared between bg thread and workers
58426310bcSdormando *************************/
59916fff36Sdormando
logger_uriencode_init(void)60f27a1022Sdormando static void logger_uriencode_init(void) {
61f27a1022Sdormando int x;
62f27a1022Sdormando char *str = logger_uriencode_str;
63f27a1022Sdormando for (x = 0; x < 256; x++) {
64f27a1022Sdormando if (isalnum(x) || x == '-' || x == '.' || x == '_' || x == '~') {
65f27a1022Sdormando logger_uriencode_map[x] = NULL;
66f27a1022Sdormando } else {
67f27a1022Sdormando snprintf(str, 4, "%%%02X", x);
68f27a1022Sdormando logger_uriencode_map[x] = str;
69f27a1022Sdormando str += 3; /* lobbing off the \0 is fine */
70f27a1022Sdormando }
71f27a1022Sdormando }
72f27a1022Sdormando }
73f27a1022Sdormando
logger_uriencode(const char * src,char * dst,const size_t srclen,const size_t dstlen)74f27a1022Sdormando static bool logger_uriencode(const char *src, char *dst, const size_t srclen, const size_t dstlen) {
75f27a1022Sdormando int x;
76f27a1022Sdormando size_t d = 0;
77f27a1022Sdormando for (x = 0; x < srclen; x++) {
78f27a1022Sdormando if (d + 4 >= dstlen)
79f27a1022Sdormando return false;
80f27a1022Sdormando if (logger_uriencode_map[(unsigned char) src[x]] != NULL) {
81f27a1022Sdormando memcpy(&dst[d], logger_uriencode_map[(unsigned char) src[x]], 3);
82f27a1022Sdormando d += 3;
83f27a1022Sdormando } else {
84f27a1022Sdormando dst[d] = src[x];
85f27a1022Sdormando d++;
86f27a1022Sdormando }
87f27a1022Sdormando }
88f27a1022Sdormando dst[d] = '\0';
89f27a1022Sdormando return true;
90f27a1022Sdormando }
91f27a1022Sdormando
92916fff36Sdormando /* Logger GID's can be used by watchers to put logs back into strict order
93916fff36Sdormando */
logger_get_gid(void)94916fff36Sdormando static uint64_t logger_get_gid(void) {
95916fff36Sdormando static uint64_t logger_gid = 0;
96*60e98600Sdormando #ifdef HAVE_GCC_64ATOMICS
97916fff36Sdormando return __sync_add_and_fetch(&logger_gid, 1);
98916fff36Sdormando #elif defined(__sun)
99916fff36Sdormando return atomic_inc_64_nv(&logger_gid);
100916fff36Sdormando #else
101916fff36Sdormando mutex_lock(&logger_atomics_mutex);
102916fff36Sdormando uint64_t res = ++logger_gid;
103916fff36Sdormando mutex_unlock(&logger_atomics_mutex);
104916fff36Sdormando return res;
105916fff36Sdormando #endif
106916fff36Sdormando }
107916fff36Sdormando
108916fff36Sdormando /* TODO: genericize lists. would be nice to import queue.h if the impact is
109916fff36Sdormando * studied... otherwise can just write a local one.
110916fff36Sdormando */
111916fff36Sdormando /* Add to the list of threads with a logger object */
logger_link_q(logger * l)112916fff36Sdormando static void logger_link_q(logger *l) {
113916fff36Sdormando pthread_mutex_lock(&logger_stack_lock);
114916fff36Sdormando assert(l != logger_stack_head);
115916fff36Sdormando
116916fff36Sdormando l->prev = 0;
117916fff36Sdormando l->next = logger_stack_head;
118916fff36Sdormando if (l->next) l->next->prev = l;
119916fff36Sdormando logger_stack_head = l;
120916fff36Sdormando if (logger_stack_tail == 0) logger_stack_tail = l;
121916fff36Sdormando logger_count++;
122916fff36Sdormando pthread_mutex_unlock(&logger_stack_lock);
123916fff36Sdormando return;
124916fff36Sdormando }
125916fff36Sdormando
126916fff36Sdormando /* Remove from the list of threads with a logger object */
127916fff36Sdormando /*static void logger_unlink_q(logger *l) {
128916fff36Sdormando pthread_mutex_lock(&logger_stack_lock);
129916fff36Sdormando if (logger_stack_head == l) {
130916fff36Sdormando assert(l->prev == 0);
131916fff36Sdormando logger_stack_head = l->next;
132916fff36Sdormando }
133916fff36Sdormando if (logger_stack_tail == l) {
134916fff36Sdormando assert(l->next == 0);
135916fff36Sdormando logger_stack_tail = l->prev;
136916fff36Sdormando }
137916fff36Sdormando assert(l->next != l);
138916fff36Sdormando assert(l->prev != l);
139916fff36Sdormando
140916fff36Sdormando if (l->next) l->next->prev = l->prev;
141916fff36Sdormando if (l->prev) l->prev->next = l->next;
142916fff36Sdormando logger_count--;
143916fff36Sdormando pthread_mutex_unlock(&logger_stack_lock);
144916fff36Sdormando return;
145916fff36Sdormando }*/
146916fff36Sdormando
147426310bcSdormando /* Called with logger stack locked.
148426310bcSdormando * Iterates over every watcher collecting enabled flags.
149426310bcSdormando */
logger_set_flags(void)150426310bcSdormando static void logger_set_flags(void) {
151426310bcSdormando logger *l = NULL;
152426310bcSdormando int x = 0;
153426310bcSdormando uint16_t f = 0; /* logger eflags */
154426310bcSdormando
155426310bcSdormando for (x = 0; x < WATCHER_LIMIT; x++) {
156426310bcSdormando logger_watcher *w = watchers[x];
157426310bcSdormando if (w == NULL)
158426310bcSdormando continue;
159426310bcSdormando
160426310bcSdormando f |= w->eflags;
161426310bcSdormando }
162426310bcSdormando for (l = logger_stack_head; l != NULL; l=l->next) {
163426310bcSdormando pthread_mutex_lock(&l->mutex);
164426310bcSdormando l->eflags = f;
165426310bcSdormando pthread_mutex_unlock(&l->mutex);
166426310bcSdormando }
167426310bcSdormando return;
168426310bcSdormando }
169426310bcSdormando
170426310bcSdormando /*************************
171426310bcSdormando * Logger background thread functions. Aggregates per-worker buffers and
172426310bcSdormando * writes to any watchers.
173426310bcSdormando *************************/
174426310bcSdormando
175a4f8b982Sdormando #define LOGGER_PARSE_SCRATCH 4096
1761eec5d41Sdormando
_logger_thread_parse_ise(logentry * e,char * scratch)1771eec5d41Sdormando static int _logger_thread_parse_ise(logentry *e, char *scratch) {
1781eec5d41Sdormando int total;
1791eec5d41Sdormando const char *cmd = "na";
1801eec5d41Sdormando char keybuf[KEY_MAX_LENGTH * 3 + 1];
1811eec5d41Sdormando struct logentry_item_store *le = (struct logentry_item_store *) e->data;
1821eec5d41Sdormando const char * const status_map[] = {
1831eec5d41Sdormando "stored", "exists", "not_found", "too_large", "no_memory" };
1841eec5d41Sdormando const char * const cmd_map[] = {
1851eec5d41Sdormando "add", "set", "replace", "append", "prepend", "cas" };
1861eec5d41Sdormando
1871eec5d41Sdormando if (le->cmd <= 5)
1881eec5d41Sdormando cmd = cmd_map[le->cmd];
1891eec5d41Sdormando
1901eec5d41Sdormando logger_uriencode(le->key, keybuf, le->nkey, LOGGER_PARSE_SCRATCH);
1911eec5d41Sdormando total = snprintf(scratch, LOGGER_PARSE_SCRATCH,
1921eec5d41Sdormando "ts=%d.%d gid=%llu type=item_store key=%s status=%s cmd=%s\n",
1931eec5d41Sdormando (int)e->tv.tv_sec, (int)e->tv.tv_usec, (unsigned long long) e->gid,
1941eec5d41Sdormando keybuf, status_map[le->status], cmd);
1951eec5d41Sdormando return total;
1961eec5d41Sdormando }
1971eec5d41Sdormando
/* Renders a LOGGER_ITEM_GET_ENTRY into scratch (LOGGER_PARSE_SCRATCH bytes).
 * Returns snprintf()'s result, or -1 if the key could not be URI-encoded.
 * An out-of-range was_found value renders as "na".
 */
static int _logger_thread_parse_ige(logentry *e, char *scratch) {
    int total;
    int found_idx;
    const char *status = "na";
    struct logentry_item_get *le = (struct logentry_item_get *) e->data;
    char keybuf[KEY_MAX_LENGTH * 3 + 1];
    /* 0 == nf, 1 == found, 2 == flushed, 3 == expired. */
    const char * const was_found_map[] = {
        "not_found", "found", "flushed", "expired" };

    found_idx = (int) le->was_found;
    if (found_idx >= 0 &&
            (size_t) found_idx < sizeof(was_found_map) / sizeof(was_found_map[0]))
        status = was_found_map[found_idx];

    /* Bound the encoder by keybuf itself (not by the scratch size). */
    if (!logger_uriencode(le->key, keybuf, le->nkey, sizeof(keybuf)))
        return -1;
    /* usec is zero-padded so the fractional part reads correctly. */
    total = snprintf(scratch, LOGGER_PARSE_SCRATCH,
            "ts=%d.%06d gid=%llu type=item_get key=%s status=%s\n",
            (int)e->tv.tv_sec, (int)e->tv.tv_usec, (unsigned long long) e->gid,
            keybuf, status);
    return total;
}
2121eec5d41Sdormando
/* Renders a LOGGER_EVICTION_ENTRY into scratch (LOGGER_PARSE_SCRATCH bytes).
 * Returns snprintf()'s result, or -1 if the key could not be URI-encoded.
 */
static int _logger_thread_parse_ee(logentry *e, char *scratch) {
    int total;
    char keybuf[KEY_MAX_LENGTH * 3 + 1];
    struct logentry_eviction *le = (struct logentry_eviction *) e->data;

    /* Bound the encoder by keybuf itself (not by the scratch size). */
    if (!logger_uriencode(le->key, keybuf, le->nkey, sizeof(keybuf)))
        return -1;
    /* usec is zero-padded so the fractional part reads correctly. */
    total = snprintf(scratch, LOGGER_PARSE_SCRATCH,
            "ts=%d.%06d gid=%llu type=eviction key=%s fetch=%s ttl=%lld la=%d\n",
            (int)e->tv.tv_sec, (int)e->tv.tv_usec, (unsigned long long) e->gid,
            keybuf, (le->it_flags & ITEM_FETCHED) ? "yes" : "no",
            (long long int)le->exptime, le->latime);

    return total;
}
2261eec5d41Sdormando
2271eec5d41Sdormando /* Completes rendering of log line. */
logger_thread_parse_entry(logentry * e,struct logger_stats * ls,char * scratch,int * scratch_len)2281eec5d41Sdormando static enum logger_parse_entry_ret logger_thread_parse_entry(logentry *e, struct logger_stats *ls,
2291eec5d41Sdormando char *scratch, int *scratch_len) {
230916fff36Sdormando int total = 0;
231916fff36Sdormando
232916fff36Sdormando switch (e->event) {
233916fff36Sdormando case LOGGER_TEXT_ENTRY:
234cb8257e3Sdormando total = snprintf(scratch, LOGGER_PARSE_SCRATCH, "ts=%d.%d gid=%llu %s\n",
235cb8257e3Sdormando (int)e->tv.tv_sec, (int)e->tv.tv_usec,
236cb8257e3Sdormando (unsigned long long) e->gid, (char *) e->data);
237cb8257e3Sdormando break;
2381eec5d41Sdormando case LOGGER_EVICTION_ENTRY:
2391eec5d41Sdormando total = _logger_thread_parse_ee(e, scratch);
240cb8257e3Sdormando break;
2411eec5d41Sdormando case LOGGER_ITEM_GET_ENTRY:
2421eec5d41Sdormando total = _logger_thread_parse_ige(e, scratch);
243c7fbccebSdormando break;
2441eec5d41Sdormando case LOGGER_ITEM_STORE_ENTRY:
2451eec5d41Sdormando total = _logger_thread_parse_ise(e, scratch);
246c7fbccebSdormando break;
247c7fbccebSdormando
248cb8257e3Sdormando }
249cb8257e3Sdormando
250a4f8b982Sdormando if (total >= LOGGER_PARSE_SCRATCH || total <= 0) {
251d5f7f89aSdormando L_DEBUG("LOGGER: Failed to flatten log entry!\n");
252d5f7f89aSdormando return LOGGER_PARSE_ENTRY_FAILED;
253916fff36Sdormando } else {
2541eec5d41Sdormando *scratch_len = total + 1;
255916fff36Sdormando }
256916fff36Sdormando
2571eec5d41Sdormando return LOGGER_PARSE_ENTRY_OK;
2581eec5d41Sdormando }
2591eec5d41Sdormando
2601eec5d41Sdormando /* Writes flattened entry to available watchers */
logger_thread_write_entry(logentry * e,struct logger_stats * ls,char * scratch,int scratch_len)2611eec5d41Sdormando static void logger_thread_write_entry(logentry *e, struct logger_stats *ls,
2621eec5d41Sdormando char *scratch, int scratch_len) {
2631eec5d41Sdormando int x, total;
26482e4e9a1Sdormando /* Write the line into available watchers with matching flags */
265a4f8b982Sdormando for (x = 0; x < WATCHER_LIMIT; x++) {
266a4f8b982Sdormando logger_watcher *w = watchers[x];
26782e4e9a1Sdormando char *skip_scr = NULL;
26882e4e9a1Sdormando if (w == NULL || (e->eflags & w->eflags) == 0)
269a4f8b982Sdormando continue;
270a4f8b982Sdormando
27182e4e9a1Sdormando /* Avoid poll()'ing constantly when buffer is full by resetting a
27282e4e9a1Sdormando * flag periodically.
27382e4e9a1Sdormando */
27482e4e9a1Sdormando while (!w->failed_flush &&
2751eec5d41Sdormando (skip_scr = (char *) bipbuf_request(w->buf, scratch_len + 128)) == NULL) {
27682e4e9a1Sdormando if (logger_thread_poll_watchers(0, x) <= 0) {
27782e4e9a1Sdormando L_DEBUG("LOGGER: Watcher had no free space for line of size (%d)\n", line_size);
27882e4e9a1Sdormando w->failed_flush = true;
27982e4e9a1Sdormando }
2801b03a9c6Sdormando }
2811b03a9c6Sdormando
2821b03a9c6Sdormando if (w->failed_flush) {
2831b03a9c6Sdormando L_DEBUG("LOGGER: Fast skipped for watcher [%d] due to failed_flush\n", w->sfd);
284a4f8b982Sdormando w->skipped++;
2850503b5e2Sdormando ls->watcher_skipped++;
28682e4e9a1Sdormando continue;
28782e4e9a1Sdormando }
28882e4e9a1Sdormando
28982e4e9a1Sdormando if (w->skipped > 0) {
290663d39faSdormando total = snprintf(skip_scr, 128, "skipped=%llu\n", (unsigned long long) w->skipped);
2911b03a9c6Sdormando if (total >= 128 || total <= 0) {
2921b03a9c6Sdormando L_DEBUG("LOGGER: Failed to flatten skipped message into watcher [%d]\n", w->sfd);
2931b03a9c6Sdormando w->skipped++;
2940503b5e2Sdormando ls->watcher_skipped++;
29582e4e9a1Sdormando continue;
29682e4e9a1Sdormando }
2971b03a9c6Sdormando bipbuf_push(w->buf, total + 1);
2981b03a9c6Sdormando w->skipped = 0;
29982e4e9a1Sdormando }
30082e4e9a1Sdormando /* Can't fail because bipbuf_request succeeded. */
3011eec5d41Sdormando bipbuf_offer(w->buf, (unsigned char *) scratch, scratch_len);
3020503b5e2Sdormando ls->watcher_sent++;
3031b03a9c6Sdormando }
304916fff36Sdormando }
305916fff36Sdormando
306916fff36Sdormando /* Called with logger stack locked.
307916fff36Sdormando * Releases every chunk associated with a watcher and closes the connection.
308916fff36Sdormando * We can't presently send a connection back to the worker for further
309916fff36Sdormando * processing.
310916fff36Sdormando */
logger_thread_close_watcher(logger_watcher * w)311426310bcSdormando static void logger_thread_close_watcher(logger_watcher *w) {
312916fff36Sdormando L_DEBUG("LOGGER: Closing dead watcher\n");
313916fff36Sdormando watchers[w->id] = NULL;
314916fff36Sdormando sidethread_conn_close(w->c);
315916fff36Sdormando watcher_count--;
316a4f8b982Sdormando bipbuf_free(w->buf);
317916fff36Sdormando free(w);
31873407543Sdormando logger_set_flags();
319916fff36Sdormando }
320916fff36Sdormando
321916fff36Sdormando /* Reads a particular worker thread's available bipbuf bytes. Parses each log
322a4f8b982Sdormando * entry into the watcher buffers.
323916fff36Sdormando */
logger_thread_read(logger * l,struct logger_stats * ls)3240503b5e2Sdormando static int logger_thread_read(logger *l, struct logger_stats *ls) {
325916fff36Sdormando unsigned int size;
326916fff36Sdormando unsigned int pos = 0;
327916fff36Sdormando unsigned char *data;
3281eec5d41Sdormando char scratch[LOGGER_PARSE_SCRATCH];
329916fff36Sdormando logentry *e;
330916fff36Sdormando pthread_mutex_lock(&l->mutex);
331916fff36Sdormando data = bipbuf_peek_all(l->buf, &size);
332916fff36Sdormando pthread_mutex_unlock(&l->mutex);
333916fff36Sdormando
334916fff36Sdormando if (data == NULL) {
335916fff36Sdormando return 0;
336916fff36Sdormando }
337a4f8b982Sdormando L_DEBUG("LOGGER: Got %d bytes from bipbuffer\n", size);
338916fff36Sdormando
339916fff36Sdormando /* parse buffer */
340916fff36Sdormando while (pos < size && watcher_count > 0) {
341916fff36Sdormando enum logger_parse_entry_ret ret;
3421eec5d41Sdormando int scratch_len = 0;
343916fff36Sdormando e = (logentry *) (data + pos);
3441eec5d41Sdormando ret = logger_thread_parse_entry(e, ls, scratch, &scratch_len);
3451b03a9c6Sdormando if (ret != LOGGER_PARSE_ENTRY_OK) {
346426310bcSdormando /* TODO: stats counter */
347916fff36Sdormando fprintf(stderr, "LOGGER: Failed to parse log entry\n");
3481eec5d41Sdormando } else {
3491eec5d41Sdormando logger_thread_write_entry(e, ls, scratch, scratch_len);
350916fff36Sdormando }
351916fff36Sdormando pos += sizeof(logentry) + e->size;
352916fff36Sdormando }
353916fff36Sdormando assert(pos <= size);
354916fff36Sdormando
355916fff36Sdormando pthread_mutex_lock(&l->mutex);
356916fff36Sdormando data = bipbuf_poll(l->buf, size);
3570503b5e2Sdormando ls->worker_written += l->written;
3580503b5e2Sdormando ls->worker_dropped += l->dropped;
3590503b5e2Sdormando l->written = 0;
3600503b5e2Sdormando l->dropped = 0;
361916fff36Sdormando pthread_mutex_unlock(&l->mutex);
362916fff36Sdormando if (data == NULL) {
363916fff36Sdormando fprintf(stderr, "LOGGER: unexpectedly couldn't advance buf pointer\n");
364663d39faSdormando assert(0);
365916fff36Sdormando }
366916fff36Sdormando return size; /* maybe the count of objects iterated? */
367916fff36Sdormando }
368916fff36Sdormando
369916fff36Sdormando /* Since the event loop code isn't reusable without a refactor, and we have a
370916fff36Sdormando * limited number of potential watchers, we run our own poll loop.
371916fff36Sdormando * This calls poll() unnecessarily during write flushes, should be possible to
372916fff36Sdormando * micro-optimize later.
373916fff36Sdormando *
3741b03a9c6Sdormando * This flushes buffers attached to watchers, iterating through the bytes set
375916fff36Sdormando * to each worker. Also checks for readability in case client connection was
376916fff36Sdormando * closed.
3771b03a9c6Sdormando *
3781b03a9c6Sdormando * Allows a specific watcher to be flushed (if buf full)
379916fff36Sdormando */
logger_thread_poll_watchers(int force_poll,int watcher)380426310bcSdormando static int logger_thread_poll_watchers(int force_poll, int watcher) {
381916fff36Sdormando int x;
382916fff36Sdormando int nfd = 0;
383d5f7f89aSdormando unsigned char *data;
384d5f7f89aSdormando unsigned int data_size = 0;
3851b03a9c6Sdormando int flushed = 0;
386916fff36Sdormando
387916fff36Sdormando for (x = 0; x < WATCHER_LIMIT; x++) {
388a4f8b982Sdormando logger_watcher *w = watchers[x];
3891b03a9c6Sdormando if (w == NULL || (watcher != WATCHER_ALL && x != watcher))
390916fff36Sdormando continue;
391916fff36Sdormando
392a4f8b982Sdormando data = bipbuf_peek_all(w->buf, &data_size);
393a4f8b982Sdormando if (data != NULL) {
394916fff36Sdormando watchers_pollfds[nfd].fd = w->sfd;
395916fff36Sdormando watchers_pollfds[nfd].events = POLLOUT;
396916fff36Sdormando nfd++;
397916fff36Sdormando } else if (force_poll) {
398916fff36Sdormando watchers_pollfds[nfd].fd = w->sfd;
399916fff36Sdormando watchers_pollfds[nfd].events = POLLIN;
400916fff36Sdormando nfd++;
401916fff36Sdormando }
4021b03a9c6Sdormando /* This gets set after a call to poll, and should be used to gate on
4031b03a9c6Sdormando * calling poll again.
4041b03a9c6Sdormando */
4051b03a9c6Sdormando w->failed_flush = false;
406916fff36Sdormando }
407916fff36Sdormando
408916fff36Sdormando if (nfd == 0)
4091b03a9c6Sdormando return 0;
410916fff36Sdormando
411a4f8b982Sdormando //L_DEBUG("LOGGER: calling poll() [data_size: %d]\n", data_size);
412916fff36Sdormando int ret = poll(watchers_pollfds, nfd, 0);
413916fff36Sdormando
414916fff36Sdormando if (ret < 0) {
415916fff36Sdormando perror("something failed with logger thread watcher fd polling");
4161b03a9c6Sdormando return -1;
417916fff36Sdormando }
418916fff36Sdormando
419916fff36Sdormando nfd = 0;
420916fff36Sdormando for (x = 0; x < WATCHER_LIMIT; x++) {
421a4f8b982Sdormando logger_watcher *w = watchers[x];
422916fff36Sdormando if (w == NULL)
423916fff36Sdormando continue;
424a4f8b982Sdormando
425a4f8b982Sdormando data_size = 0;
426916fff36Sdormando /* Early detection of a disconnect. Otherwise we have to wait until
427916fff36Sdormando * the next write
428916fff36Sdormando */
429916fff36Sdormando if (watchers_pollfds[nfd].revents & POLLIN) {
430916fff36Sdormando char buf[1];
431916fff36Sdormando int res = read(w->sfd, buf, 1);
432916fff36Sdormando if (res == 0 || (res == -1 && (errno != EAGAIN && errno != EWOULDBLOCK))) {
433426310bcSdormando logger_thread_close_watcher(w);
434916fff36Sdormando nfd++;
435916fff36Sdormando continue;
436916fff36Sdormando }
437916fff36Sdormando }
438a4f8b982Sdormando if ((data = bipbuf_peek_all(w->buf, &data_size)) != NULL) {
439916fff36Sdormando if (watchers_pollfds[nfd].revents & (POLLHUP|POLLERR)) {
440916fff36Sdormando L_DEBUG("LOGGER: watcher closed during poll() call\n");
441426310bcSdormando logger_thread_close_watcher(w);
442916fff36Sdormando } else if (watchers_pollfds[nfd].revents & POLLOUT) {
443916fff36Sdormando int total = 0;
444d5f7f89aSdormando
445916fff36Sdormando /* We can write a bit. */
446916fff36Sdormando switch (w->t) {
447916fff36Sdormando case LOGGER_WATCHER_STDERR:
448a4f8b982Sdormando total = fwrite(data, 1, data_size, stderr);
449916fff36Sdormando break;
450916fff36Sdormando case LOGGER_WATCHER_CLIENT:
451a4f8b982Sdormando total = write(w->sfd, data, data_size);
452916fff36Sdormando break;
453916fff36Sdormando }
454916fff36Sdormando
455a4f8b982Sdormando L_DEBUG("LOGGER: poll() wrote %d to %d (data_size: %d) (bipbuf_used: %d)\n", total, w->sfd,
456a4f8b982Sdormando data_size, bipbuf_used(w->buf));
457916fff36Sdormando if (total == -1) {
458916fff36Sdormando if (errno != EAGAIN && errno != EWOULDBLOCK) {
459426310bcSdormando logger_thread_close_watcher(w);
460916fff36Sdormando }
461916fff36Sdormando L_DEBUG("LOGGER: watcher hit EAGAIN\n");
462916fff36Sdormando } else if (total == 0) {
463426310bcSdormando logger_thread_close_watcher(w);
464d5f7f89aSdormando } else {
465a4f8b982Sdormando bipbuf_poll(w->buf, total);
4661b03a9c6Sdormando flushed += total;
467916fff36Sdormando }
468d5f7f89aSdormando }
469916fff36Sdormando }
470916fff36Sdormando nfd++;
471916fff36Sdormando }
4721b03a9c6Sdormando return flushed;
473916fff36Sdormando }
474d5f7f89aSdormando
logger_thread_sum_stats(struct logger_stats * ls)4750503b5e2Sdormando static void logger_thread_sum_stats(struct logger_stats *ls) {
4760503b5e2Sdormando STATS_LOCK();
4770503b5e2Sdormando stats.log_worker_dropped += ls->worker_dropped;
4780503b5e2Sdormando stats.log_worker_written += ls->worker_written;
4790503b5e2Sdormando stats.log_watcher_skipped += ls->watcher_skipped;
4800503b5e2Sdormando stats.log_watcher_sent += ls->watcher_sent;
4810503b5e2Sdormando STATS_UNLOCK();
4820503b5e2Sdormando }
4830503b5e2Sdormando
484916fff36Sdormando #define MAX_LOGGER_SLEEP 100000
485916fff36Sdormando #define MIN_LOGGER_SLEEP 0
486916fff36Sdormando
487916fff36Sdormando /* Primary logger thread routine */
logger_thread(void * arg)488916fff36Sdormando static void *logger_thread(void *arg) {
489916fff36Sdormando useconds_t to_sleep = MIN_LOGGER_SLEEP;
490916fff36Sdormando L_DEBUG("LOGGER: Starting logger thread\n");
491916fff36Sdormando while (do_run_logger_thread) {
492916fff36Sdormando int found_logs = 0;
4930503b5e2Sdormando logger *l;
4940503b5e2Sdormando struct logger_stats ls;
4950503b5e2Sdormando memset(&ls, 0, sizeof(struct logger_stats));
496916fff36Sdormando if (to_sleep)
497916fff36Sdormando usleep(to_sleep);
498916fff36Sdormando
499916fff36Sdormando /* Call function to iterate each logger. */
500916fff36Sdormando pthread_mutex_lock(&logger_stack_lock);
5010503b5e2Sdormando for (l = logger_stack_head; l != NULL; l=l->next) {
5020503b5e2Sdormando /* lock logger, call function to manipulate it */
5030503b5e2Sdormando found_logs += logger_thread_read(l, &ls);
5040503b5e2Sdormando }
5050503b5e2Sdormando
506426310bcSdormando logger_thread_poll_watchers(1, WATCHER_ALL);
507916fff36Sdormando pthread_mutex_unlock(&logger_stack_lock);
508916fff36Sdormando
509916fff36Sdormando /* TODO: abstract into a function and share with lru_crawler */
510916fff36Sdormando if (!found_logs) {
511916fff36Sdormando if (to_sleep < MAX_LOGGER_SLEEP)
512916fff36Sdormando to_sleep += 50;
513916fff36Sdormando } else {
514916fff36Sdormando to_sleep /= 2;
515916fff36Sdormando if (to_sleep < 50)
516916fff36Sdormando to_sleep = MIN_LOGGER_SLEEP;
517916fff36Sdormando }
5180503b5e2Sdormando logger_thread_sum_stats(&ls);
519916fff36Sdormando }
520916fff36Sdormando
521916fff36Sdormando return NULL;
522916fff36Sdormando }
523916fff36Sdormando
start_logger_thread(void)524916fff36Sdormando static int start_logger_thread(void) {
525916fff36Sdormando int ret;
526916fff36Sdormando do_run_logger_thread = 1;
527916fff36Sdormando if ((ret = pthread_create(&logger_tid, NULL,
528916fff36Sdormando logger_thread, NULL)) != 0) {
529916fff36Sdormando fprintf(stderr, "Can't start logger thread: %s\n", strerror(ret));
530916fff36Sdormando return -1;
531916fff36Sdormando }
532916fff36Sdormando return 0;
533916fff36Sdormando }
534916fff36Sdormando
535916fff36Sdormando // future.
536916fff36Sdormando /*static int stop_logger_thread(void) {
537916fff36Sdormando do_run_logger_thread = 0;
538916fff36Sdormando pthread_join(logger_tid, NULL);
539916fff36Sdormando return 0;
540916fff36Sdormando }*/
541916fff36Sdormando
542426310bcSdormando /*************************
543426310bcSdormando * Public functions for submitting logs and starting loggers from workers.
544426310bcSdormando *************************/
545426310bcSdormando
546916fff36Sdormando /* Global logger thread start/init */
logger_init(void)547916fff36Sdormando void logger_init(void) {
548916fff36Sdormando /* TODO: auto destructor when threads exit */
549916fff36Sdormando /* TODO: error handling */
550916fff36Sdormando
551916fff36Sdormando /* init stack for iterating loggers */
552916fff36Sdormando logger_stack_head = 0;
553916fff36Sdormando logger_stack_tail = 0;
554916fff36Sdormando pthread_key_create(&logger_key, NULL);
555f27a1022Sdormando logger_uriencode_init();
556916fff36Sdormando
557916fff36Sdormando if (start_logger_thread() != 0) {
558916fff36Sdormando abort();
559916fff36Sdormando }
5600503b5e2Sdormando
5610503b5e2Sdormando /* This can be removed once the global stats initializer is improved */
5620503b5e2Sdormando STATS_LOCK();
5630503b5e2Sdormando stats.log_worker_dropped = 0;
5640503b5e2Sdormando stats.log_worker_written = 0;
5650503b5e2Sdormando stats.log_watcher_skipped = 0;
5660503b5e2Sdormando stats.log_watcher_sent = 0;
5670503b5e2Sdormando STATS_UNLOCK();
568663d39faSdormando /* This is what adding a STDERR watcher looks like. should replace old
569663d39faSdormando * "verbose" settings. */
570916fff36Sdormando //logger_add_watcher(NULL, 0);
571916fff36Sdormando return;
572916fff36Sdormando }
573916fff36Sdormando
574916fff36Sdormando /* called *from* the thread using a logger.
575916fff36Sdormando * initializes the per-thread bipbuf, links it into the list of loggers
576916fff36Sdormando */
logger_create(void)577916fff36Sdormando logger *logger_create(void) {
578916fff36Sdormando L_DEBUG("LOGGER: Creating and linking new logger instance\n");
579916fff36Sdormando logger *l = calloc(1, sizeof(logger));
580916fff36Sdormando if (l == NULL) {
581916fff36Sdormando return NULL;
582916fff36Sdormando }
583916fff36Sdormando
584d704f2c0Sdormando l->buf = bipbuf_new(settings.logger_buf_size);
585916fff36Sdormando if (l->buf == NULL) {
586916fff36Sdormando free(l);
587916fff36Sdormando return NULL;
588916fff36Sdormando }
589916fff36Sdormando
590916fff36Sdormando l->entry_map = default_entries;
591916fff36Sdormando
592916fff36Sdormando pthread_mutex_init(&l->mutex, NULL);
593916fff36Sdormando pthread_setspecific(logger_key, l);
594916fff36Sdormando
595916fff36Sdormando /* add to list of loggers */
596916fff36Sdormando logger_link_q(l);
597916fff36Sdormando return l;
598916fff36Sdormando }
599916fff36Sdormando
600cb8257e3Sdormando /* helpers for logger_log */
601cb8257e3Sdormando
/* Fills e with an eviction record for item it: remaining TTL (-1 when the
 * item had no exptime), time since last access, item flags and key. Sets
 * e->size to the record plus key length. */
static void _logger_log_evictions(logentry *e, item *it) {
    struct logentry_eviction *ev = (struct logentry_eviction *) e->data;
    long long int ttl = (long long int) -1;

    if (it->exptime > 0)
        ttl = (long long int)(it->exptime - current_time);

    ev->exptime = ttl;
    ev->latime = current_time - it->time;
    ev->it_flags = it->it_flags;
    ev->nkey = it->nkey;
    memcpy(ev->key, ITEM_key(it), it->nkey);
    e->size = sizeof(struct logentry_eviction) + ev->nkey;
}
611cb8257e3Sdormando
612c7fbccebSdormando /* 0 == nf, 1 == found. 2 == flushed. 3 == expired.
613c7fbccebSdormando * might be useful to store/print the flags an item has?
614c7fbccebSdormando * could also collapse this and above code into an "item status" struct. wait
615c7fbccebSdormando * for more endpoints to be written before making it generic, though.
6164267ed80Sdormando * TODO: This and below should track and reprint the client fd.
617c7fbccebSdormando */
_logger_log_item_get(logentry * e,const int was_found,const char * key,const int nkey)618c7fbccebSdormando static void _logger_log_item_get(logentry *e, const int was_found, const char *key, const int nkey) {
619c7fbccebSdormando struct logentry_item_get *le = (struct logentry_item_get *) e->data;
620c7fbccebSdormando le->was_found = was_found;
621c7fbccebSdormando le->nkey = nkey;
622c7fbccebSdormando memcpy(le->key, key, nkey);
623c7fbccebSdormando e->size = sizeof(struct logentry_item_get) + nkey;
624c7fbccebSdormando }
625c7fbccebSdormando
/* Fills e with an item-store record: the store result (status), the store
 * command code (comm) and the key. Sets e->size to record plus key length. */
static void _logger_log_item_store(logentry *e, const enum store_item_type status,
        const int comm, char *key, const int nkey) {
    struct logentry_item_store *entry = (struct logentry_item_store *) e->data;

    entry->nkey = nkey;
    entry->cmd = comm;
    entry->status = status;
    memcpy(entry->key, key, nkey);
    e->size = sizeof(struct logentry_item_store) + nkey;
}
635c7fbccebSdormando
636916fff36Sdormando /* Public function for logging an entry.
637916fff36Sdormando * Tries to encapsulate as much of the formatting as possible to simplify the
638916fff36Sdormando * caller's code.
639916fff36Sdormando */
logger_log(logger * l,const enum log_entry_type event,const void * entry,...)640916fff36Sdormando enum logger_ret_type logger_log(logger *l, const enum log_entry_type event, const void *entry, ...) {
641916fff36Sdormando bipbuf_t *buf = l->buf;
642916fff36Sdormando bool nospace = false;
643916fff36Sdormando va_list ap;
644916fff36Sdormando int total = 0;
645916fff36Sdormando logentry *e;
646916fff36Sdormando
647916fff36Sdormando const entry_details *d = &l->entry_map[event];
648916fff36Sdormando int reqlen = d->reqlen;
649916fff36Sdormando
650916fff36Sdormando pthread_mutex_lock(&l->mutex);
651916fff36Sdormando /* Request a maximum length of data to write to */
652916fff36Sdormando e = (logentry *) bipbuf_request(buf, (sizeof(logentry) + reqlen));
653916fff36Sdormando if (e == NULL) {
654916fff36Sdormando pthread_mutex_unlock(&l->mutex);
6550503b5e2Sdormando l->dropped++;
656916fff36Sdormando return LOGGER_RET_NOSPACE;
657916fff36Sdormando }
658cb8257e3Sdormando e->gid = logger_get_gid();
659916fff36Sdormando e->event = d->subtype;
660c7fbccebSdormando /* TODO: Could pass this down as an argument now that we're using
661c7fbccebSdormando * LOGGER_LOG() macro.
662c7fbccebSdormando */
663a4f8b982Sdormando e->eflags = d->eflags;
664a4f8b982Sdormando /* Noting time isn't optional. A feature may be added to avoid rendering
665a4f8b982Sdormando * time and/or gid to a logger.
666a4f8b982Sdormando */
667a4f8b982Sdormando gettimeofday(&e->tv, NULL);
668916fff36Sdormando
669916fff36Sdormando switch (d->subtype) {
670916fff36Sdormando case LOGGER_TEXT_ENTRY:
671916fff36Sdormando va_start(ap, entry);
672916fff36Sdormando total = vsnprintf((char *) e->data, reqlen, d->format, ap);
673916fff36Sdormando va_end(ap);
674916fff36Sdormando if (total >= reqlen || total <= 0) {
675916fff36Sdormando fprintf(stderr, "LOGGER: Failed to vsnprintf a text entry: (total) %d\n", total);
676916fff36Sdormando break;
677916fff36Sdormando }
678916fff36Sdormando e->size = total + 1; /* null byte */
679916fff36Sdormando
680916fff36Sdormando break;
681916fff36Sdormando case LOGGER_EVICTION_ENTRY:
682cb8257e3Sdormando _logger_log_evictions(e, (item *)entry);
683916fff36Sdormando break;
684c7fbccebSdormando case LOGGER_ITEM_GET_ENTRY:
685c7fbccebSdormando va_start(ap, entry);
686c7fbccebSdormando int was_found = va_arg(ap, int);
687c7fbccebSdormando char *key = va_arg(ap, char *);
688c7fbccebSdormando size_t nkey = va_arg(ap, size_t);
689c7fbccebSdormando _logger_log_item_get(e, was_found, key, nkey);
690c7fbccebSdormando va_end(ap);
691c7fbccebSdormando break;
692c7fbccebSdormando case LOGGER_ITEM_STORE_ENTRY:
693c7fbccebSdormando va_start(ap, entry);
694c7fbccebSdormando enum store_item_type status = va_arg(ap, enum store_item_type);
695c7fbccebSdormando int comm = va_arg(ap, int);
696c7fbccebSdormando char *skey = va_arg(ap, char *);
697c7fbccebSdormando size_t snkey = va_arg(ap, size_t);
698c7fbccebSdormando _logger_log_item_store(e, status, comm, skey, snkey);
699c7fbccebSdormando break;
700916fff36Sdormando }
701916fff36Sdormando
702916fff36Sdormando /* Push pointer forward by the actual amount required */
703916fff36Sdormando if (bipbuf_push(buf, (sizeof(logentry) + e->size)) == 0) {
704916fff36Sdormando fprintf(stderr, "LOGGER: Failed to bipbuf push a text entry\n");
705916fff36Sdormando pthread_mutex_unlock(&l->mutex);
706916fff36Sdormando return LOGGER_RET_ERR;
707916fff36Sdormando }
7080503b5e2Sdormando l->written++;
709916fff36Sdormando L_DEBUG("LOGGER: Requested %d bytes, wrote %d bytes\n", reqlen, total + 1);
710916fff36Sdormando
711916fff36Sdormando pthread_mutex_unlock(&l->mutex);
712916fff36Sdormando
713916fff36Sdormando if (nospace) {
714916fff36Sdormando return LOGGER_RET_NOSPACE;
715916fff36Sdormando } else {
716916fff36Sdormando return LOGGER_RET_OK;
717916fff36Sdormando }
718916fff36Sdormando }
719916fff36Sdormando
720916fff36Sdormando /* Passes a client connection socket from a primary worker thread to the
721916fff36Sdormando * logger thread. Caller *must* event_del() the client before handing it over.
722916fff36Sdormando * Presently there's no way to hand the client back to the worker thread.
723916fff36Sdormando */
logger_add_watcher(void * c,const int sfd,uint16_t f)7246a4e8e1fSdormando enum logger_add_watcher_ret logger_add_watcher(void *c, const int sfd, uint16_t f) {
725916fff36Sdormando int x;
726916fff36Sdormando logger_watcher *w = NULL;
727916fff36Sdormando pthread_mutex_lock(&logger_stack_lock);
728916fff36Sdormando if (watcher_count >= WATCHER_LIMIT) {
729916fff36Sdormando return LOGGER_ADD_WATCHER_TOO_MANY;
730916fff36Sdormando }
731916fff36Sdormando
732916fff36Sdormando for (x = 0; x < WATCHER_LIMIT; x++) {
733916fff36Sdormando if (watchers[x] == NULL)
734916fff36Sdormando break;
735916fff36Sdormando }
736916fff36Sdormando
737916fff36Sdormando w = calloc(1, sizeof(logger_watcher));
738a4f8b982Sdormando if (w == NULL) {
739a4f8b982Sdormando pthread_mutex_unlock(&logger_stack_lock);
740916fff36Sdormando return LOGGER_ADD_WATCHER_FAILED;
741a4f8b982Sdormando }
742916fff36Sdormando w->c = c;
743916fff36Sdormando w->sfd = sfd;
744916fff36Sdormando if (sfd == 0 && c == NULL) {
745916fff36Sdormando w->t = LOGGER_WATCHER_STDERR;
746916fff36Sdormando } else {
747916fff36Sdormando w->t = LOGGER_WATCHER_CLIENT;
748916fff36Sdormando }
749916fff36Sdormando w->id = x;
7506a4e8e1fSdormando w->eflags = f;
751d704f2c0Sdormando w->buf = bipbuf_new(settings.logger_watcher_buf_size);
752a4f8b982Sdormando if (w->buf == NULL) {
753a4f8b982Sdormando free(w);
754a4f8b982Sdormando pthread_mutex_unlock(&logger_stack_lock);
755a4f8b982Sdormando return LOGGER_ADD_WATCHER_FAILED;
756916fff36Sdormando }
757efa436feSdormando bipbuf_offer(w->buf, (unsigned char *) "OK\r\n", 4);
758a4f8b982Sdormando
759916fff36Sdormando watchers[x] = w;
760916fff36Sdormando watcher_count++;
76173407543Sdormando /* Update what flags the global logs will watch */
76273407543Sdormando logger_set_flags();
763916fff36Sdormando
764916fff36Sdormando pthread_mutex_unlock(&logger_stack_lock);
765916fff36Sdormando return LOGGER_ADD_WATCHER_OK;
766916fff36Sdormando }
767