xref: /memcached-1.4.29/logger.c (revision 60e98600)
1916fff36Sdormando /* -*- Mode: C; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2916fff36Sdormando 
3916fff36Sdormando #include <stdlib.h>
4725cdf4dSMathieu CARBONNEAUX #include <stdio.h>
5916fff36Sdormando #include <string.h>
6916fff36Sdormando #include <errno.h>
7916fff36Sdormando #include <poll.h>
8f27a1022Sdormando #include <ctype.h>
9916fff36Sdormando 
10916fff36Sdormando #include "memcached.h"
11916fff36Sdormando #include "bipbuffer.h"
12916fff36Sdormando 
/* Internal tracing for the logger itself. With LOGGER_DEBUG defined the
 * arguments are forwarded to fprintf(stderr, ...); otherwise the macro
 * expands to nothing and its arguments are never evaluated.
 */
#ifndef LOGGER_DEBUG
#define L_DEBUG(...)
#else
#define L_DEBUG(...) \
    do { \
        fprintf(stderr, __VA_ARGS__); \
    } while (0)
#endif
21916fff36Sdormando 
22916fff36Sdormando 
23916fff36Sdormando /* TODO: put this in a struct and ditch the global vars. */
24916fff36Sdormando static logger *logger_stack_head = NULL;
25916fff36Sdormando static logger *logger_stack_tail = NULL;
26916fff36Sdormando static unsigned int logger_count = 0;
27916fff36Sdormando static volatile int do_run_logger_thread = 1;
28916fff36Sdormando static pthread_t logger_tid;
29916fff36Sdormando pthread_mutex_t logger_stack_lock = PTHREAD_MUTEX_INITIALIZER;
30916fff36Sdormando 
31916fff36Sdormando pthread_key_t logger_key;
32916fff36Sdormando 
33*60e98600Sdormando #if !defined(HAVE_GCC_64ATOMICS) && !defined(__sun)
34916fff36Sdormando pthread_mutex_t logger_atomics_mutex = PTHREAD_MUTEX_INITIALIZER;
35916fff36Sdormando #endif
36916fff36Sdormando 
37916fff36Sdormando #define WATCHER_LIMIT 20
38916fff36Sdormando logger_watcher *watchers[20];
39916fff36Sdormando struct pollfd watchers_pollfds[20];
40916fff36Sdormando int watcher_count = 0;
41916fff36Sdormando 
42f27a1022Sdormando static char *logger_uriencode_map[256];
43f27a1022Sdormando static char logger_uriencode_str[768];
44f27a1022Sdormando 
45916fff36Sdormando /* Should this go somewhere else? */
46916fff36Sdormando static const entry_details default_entries[] = {
47d5f7f89aSdormando     [LOGGER_ASCII_CMD] = {LOGGER_TEXT_ENTRY, 512, LOG_RAWCMDS, "<%d %s"},
48c7fbccebSdormando     [LOGGER_EVICTION] = {LOGGER_EVICTION_ENTRY, 512, LOG_EVICTIONS, NULL},
49c7fbccebSdormando     [LOGGER_ITEM_GET] = {LOGGER_ITEM_GET_ENTRY, 512, LOG_FETCHERS, NULL},
50c7fbccebSdormando     [LOGGER_ITEM_STORE] = {LOGGER_ITEM_STORE_ENTRY, 512, LOG_MUTATIONS, NULL}
51916fff36Sdormando };
52916fff36Sdormando 
531b03a9c6Sdormando #define WATCHER_ALL -1
54426310bcSdormando static int logger_thread_poll_watchers(int force_poll, int watcher);
55426310bcSdormando 
56426310bcSdormando /*************************
57426310bcSdormando  * Util functions shared between bg thread and workers
58426310bcSdormando  *************************/
59916fff36Sdormando 
logger_uriencode_init(void)60f27a1022Sdormando static void logger_uriencode_init(void) {
61f27a1022Sdormando     int x;
62f27a1022Sdormando     char *str = logger_uriencode_str;
63f27a1022Sdormando     for (x = 0; x < 256; x++) {
64f27a1022Sdormando         if (isalnum(x) || x == '-' || x == '.' || x == '_' || x == '~') {
65f27a1022Sdormando             logger_uriencode_map[x] = NULL;
66f27a1022Sdormando         } else {
67f27a1022Sdormando             snprintf(str, 4, "%%%02X", x);
68f27a1022Sdormando             logger_uriencode_map[x] = str;
69f27a1022Sdormando             str += 3; /* lobbing off the \0 is fine */
70f27a1022Sdormando         }
71f27a1022Sdormando     }
72f27a1022Sdormando }
73f27a1022Sdormando 
logger_uriencode(const char * src,char * dst,const size_t srclen,const size_t dstlen)74f27a1022Sdormando static bool logger_uriencode(const char *src, char *dst, const size_t srclen, const size_t dstlen) {
75f27a1022Sdormando     int x;
76f27a1022Sdormando     size_t d = 0;
77f27a1022Sdormando     for (x = 0; x < srclen; x++) {
78f27a1022Sdormando         if (d + 4 >= dstlen)
79f27a1022Sdormando             return false;
80f27a1022Sdormando         if (logger_uriencode_map[(unsigned char) src[x]] != NULL) {
81f27a1022Sdormando             memcpy(&dst[d], logger_uriencode_map[(unsigned char) src[x]], 3);
82f27a1022Sdormando             d += 3;
83f27a1022Sdormando         } else {
84f27a1022Sdormando             dst[d] = src[x];
85f27a1022Sdormando             d++;
86f27a1022Sdormando         }
87f27a1022Sdormando     }
88f27a1022Sdormando     dst[d] = '\0';
89f27a1022Sdormando     return true;
90f27a1022Sdormando }
91f27a1022Sdormando 
92916fff36Sdormando /* Logger GID's can be used by watchers to put logs back into strict order
93916fff36Sdormando  */
logger_get_gid(void)94916fff36Sdormando static uint64_t logger_get_gid(void) {
95916fff36Sdormando     static uint64_t logger_gid = 0;
96*60e98600Sdormando #ifdef HAVE_GCC_64ATOMICS
97916fff36Sdormando     return __sync_add_and_fetch(&logger_gid, 1);
98916fff36Sdormando #elif defined(__sun)
99916fff36Sdormando     return atomic_inc_64_nv(&logger_gid);
100916fff36Sdormando #else
101916fff36Sdormando     mutex_lock(&logger_atomics_mutex);
102916fff36Sdormando     uint64_t res = ++logger_gid;
103916fff36Sdormando     mutex_unlock(&logger_atomics_mutex);
104916fff36Sdormando     return res;
105916fff36Sdormando #endif
106916fff36Sdormando }
107916fff36Sdormando 
108916fff36Sdormando /* TODO: genericize lists. would be nice to import queue.h if the impact is
109916fff36Sdormando  * studied... otherwise can just write a local one.
110916fff36Sdormando  */
111916fff36Sdormando /* Add to the list of threads with a logger object */
logger_link_q(logger * l)112916fff36Sdormando static void logger_link_q(logger *l) {
113916fff36Sdormando     pthread_mutex_lock(&logger_stack_lock);
114916fff36Sdormando     assert(l != logger_stack_head);
115916fff36Sdormando 
116916fff36Sdormando     l->prev = 0;
117916fff36Sdormando     l->next = logger_stack_head;
118916fff36Sdormando     if (l->next) l->next->prev = l;
119916fff36Sdormando     logger_stack_head = l;
120916fff36Sdormando     if (logger_stack_tail == 0) logger_stack_tail = l;
121916fff36Sdormando     logger_count++;
122916fff36Sdormando     pthread_mutex_unlock(&logger_stack_lock);
123916fff36Sdormando     return;
124916fff36Sdormando }
125916fff36Sdormando 
126916fff36Sdormando /* Remove from the list of threads with a logger object */
127916fff36Sdormando /*static void logger_unlink_q(logger *l) {
128916fff36Sdormando     pthread_mutex_lock(&logger_stack_lock);
129916fff36Sdormando     if (logger_stack_head == l) {
130916fff36Sdormando         assert(l->prev == 0);
131916fff36Sdormando         logger_stack_head = l->next;
132916fff36Sdormando     }
133916fff36Sdormando     if (logger_stack_tail == l) {
134916fff36Sdormando         assert(l->next == 0);
135916fff36Sdormando         logger_stack_tail = l->prev;
136916fff36Sdormando     }
137916fff36Sdormando     assert(l->next != l);
138916fff36Sdormando     assert(l->prev != l);
139916fff36Sdormando 
140916fff36Sdormando     if (l->next) l->next->prev = l->prev;
141916fff36Sdormando     if (l->prev) l->prev->next = l->next;
142916fff36Sdormando     logger_count--;
143916fff36Sdormando     pthread_mutex_unlock(&logger_stack_lock);
144916fff36Sdormando     return;
145916fff36Sdormando }*/
146916fff36Sdormando 
147426310bcSdormando /* Called with logger stack locked.
148426310bcSdormando  * Iterates over every watcher collecting enabled flags.
149426310bcSdormando  */
logger_set_flags(void)150426310bcSdormando static void logger_set_flags(void) {
151426310bcSdormando     logger *l = NULL;
152426310bcSdormando     int x = 0;
153426310bcSdormando     uint16_t f = 0; /* logger eflags */
154426310bcSdormando 
155426310bcSdormando     for (x = 0; x < WATCHER_LIMIT; x++) {
156426310bcSdormando         logger_watcher *w = watchers[x];
157426310bcSdormando         if (w == NULL)
158426310bcSdormando             continue;
159426310bcSdormando 
160426310bcSdormando         f |= w->eflags;
161426310bcSdormando     }
162426310bcSdormando     for (l = logger_stack_head; l != NULL; l=l->next) {
163426310bcSdormando         pthread_mutex_lock(&l->mutex);
164426310bcSdormando         l->eflags = f;
165426310bcSdormando         pthread_mutex_unlock(&l->mutex);
166426310bcSdormando     }
167426310bcSdormando     return;
168426310bcSdormando }
169426310bcSdormando 
170426310bcSdormando /*************************
171426310bcSdormando  * Logger background thread functions. Aggregates per-worker buffers and
172426310bcSdormando  * writes to any watchers.
173426310bcSdormando  *************************/
174426310bcSdormando 
175a4f8b982Sdormando #define LOGGER_PARSE_SCRATCH 4096
1761eec5d41Sdormando 
/* Renders an item_store entry into scratch (capacity LOGGER_PARSE_SCRATCH).
 *
 * Returns the snprintf() result, or 0 when the key could not be encoded;
 * the caller treats any non-positive length as a failed parse.
 */
static int _logger_thread_parse_ise(logentry *e, char *scratch) {
    static const char * const status_map[] = {
        "stored", "exists", "not_found", "too_large", "no_memory" };
    static const char * const cmd_map[] = {
        "add", "set", "replace", "append", "prepend", "cas" };
    const size_t nstatus = sizeof(status_map) / sizeof(status_map[0]);
    const size_t ncmd = sizeof(cmd_map) / sizeof(cmd_map[0]);
    struct logentry_item_store *le = (struct logentry_item_store *) e->data;
    const char *status = "na";
    const char *cmd = "na";
    /* Up to three output bytes per key byte plus the terminator, with a few
     * bytes of slack so the encoder's headroom check cannot reject a
     * maximum-length key (assumes nkey never exceeds KEY_MAX_LENGTH). */
    char keybuf[KEY_MAX_LENGTH * 3 + 4];

    /* The size_t casts reject negative as well as too-large indexes. */
    if ((size_t) le->status < nstatus)
        status = status_map[le->status];
    if ((size_t) le->cmd < ncmd)
        cmd = cmd_map[le->cmd];

    /* Pass the real buffer size, and never print a key that failed to
     * encode. */
    if (!logger_uriencode(le->key, keybuf, le->nkey, sizeof(keybuf)))
        return 0;

    return snprintf(scratch, LOGGER_PARSE_SCRATCH,
            "ts=%d.%d gid=%llu type=item_store key=%s status=%s cmd=%s\n",
            (int)e->tv.tv_sec, (int)e->tv.tv_usec, (unsigned long long) e->gid,
            keybuf, status, cmd);
}
1971eec5d41Sdormando 
/* Renders an item_get entry into scratch (capacity LOGGER_PARSE_SCRATCH).
 *
 * Returns the snprintf() result, or 0 when the key could not be encoded;
 * the caller treats any non-positive length as a failed parse.
 */
static int _logger_thread_parse_ige(logentry *e, char *scratch) {
    static const char * const was_found_map[] = {
        "not_found", "found", "flushed", "expired" };
    const size_t nfound = sizeof(was_found_map) / sizeof(was_found_map[0]);
    struct logentry_item_get *le = (struct logentry_item_get *) e->data;
    const char *status = "na";
    /* Up to three output bytes per key byte plus the terminator, with a few
     * bytes of slack so the encoder's headroom check cannot reject a
     * maximum-length key (assumes nkey never exceeds KEY_MAX_LENGTH). */
    char keybuf[KEY_MAX_LENGTH * 3 + 4];

    /* The size_t cast rejects negative as well as too-large indexes. */
    if ((size_t) le->was_found < nfound)
        status = was_found_map[le->was_found];

    /* Pass the real buffer size, and never print a key that failed to
     * encode. */
    if (!logger_uriencode(le->key, keybuf, le->nkey, sizeof(keybuf)))
        return 0;

    return snprintf(scratch, LOGGER_PARSE_SCRATCH,
            "ts=%d.%d gid=%llu type=item_get key=%s status=%s\n",
            (int)e->tv.tv_sec, (int)e->tv.tv_usec, (unsigned long long) e->gid,
            keybuf, status);
}
2121eec5d41Sdormando 
/* Renders an eviction entry into scratch (capacity LOGGER_PARSE_SCRATCH).
 *
 * Returns the snprintf() result, or 0 when the key could not be encoded;
 * the caller treats any non-positive length as a failed parse.
 */
static int _logger_thread_parse_ee(logentry *e, char *scratch) {
    struct logentry_eviction *le = (struct logentry_eviction *) e->data;
    /* Up to three output bytes per key byte plus the terminator, with a few
     * bytes of slack so the encoder's headroom check cannot reject a
     * maximum-length key (assumes nkey never exceeds KEY_MAX_LENGTH). */
    char keybuf[KEY_MAX_LENGTH * 3 + 4];

    /* Pass the real buffer size, and never print a key that failed to
     * encode. */
    if (!logger_uriencode(le->key, keybuf, le->nkey, sizeof(keybuf)))
        return 0;

    return snprintf(scratch, LOGGER_PARSE_SCRATCH,
            "ts=%d.%d gid=%llu type=eviction key=%s fetch=%s ttl=%lld la=%d\n",
            (int)e->tv.tv_sec, (int)e->tv.tv_usec, (unsigned long long) e->gid,
            keybuf, (le->it_flags & ITEM_FETCHED) ? "yes" : "no",
            (long long int)le->exptime, le->latime);
}
2261eec5d41Sdormando 
2271eec5d41Sdormando /* Completes rendering of log line. */
logger_thread_parse_entry(logentry * e,struct logger_stats * ls,char * scratch,int * scratch_len)2281eec5d41Sdormando static enum logger_parse_entry_ret logger_thread_parse_entry(logentry *e, struct logger_stats *ls,
2291eec5d41Sdormando         char *scratch, int *scratch_len) {
230916fff36Sdormando     int total = 0;
231916fff36Sdormando 
232916fff36Sdormando     switch (e->event) {
233916fff36Sdormando         case LOGGER_TEXT_ENTRY:
234cb8257e3Sdormando             total = snprintf(scratch, LOGGER_PARSE_SCRATCH, "ts=%d.%d gid=%llu %s\n",
235cb8257e3Sdormando                         (int)e->tv.tv_sec, (int)e->tv.tv_usec,
236cb8257e3Sdormando                         (unsigned long long) e->gid, (char *) e->data);
237cb8257e3Sdormando             break;
2381eec5d41Sdormando         case LOGGER_EVICTION_ENTRY:
2391eec5d41Sdormando             total = _logger_thread_parse_ee(e, scratch);
240cb8257e3Sdormando             break;
2411eec5d41Sdormando         case LOGGER_ITEM_GET_ENTRY:
2421eec5d41Sdormando             total = _logger_thread_parse_ige(e, scratch);
243c7fbccebSdormando             break;
2441eec5d41Sdormando         case LOGGER_ITEM_STORE_ENTRY:
2451eec5d41Sdormando             total = _logger_thread_parse_ise(e, scratch);
246c7fbccebSdormando             break;
247c7fbccebSdormando 
248cb8257e3Sdormando     }
249cb8257e3Sdormando 
250a4f8b982Sdormando     if (total >= LOGGER_PARSE_SCRATCH || total <= 0) {
251d5f7f89aSdormando         L_DEBUG("LOGGER: Failed to flatten log entry!\n");
252d5f7f89aSdormando         return LOGGER_PARSE_ENTRY_FAILED;
253916fff36Sdormando     } else {
2541eec5d41Sdormando         *scratch_len = total + 1;
255916fff36Sdormando     }
256916fff36Sdormando 
2571eec5d41Sdormando     return LOGGER_PARSE_ENTRY_OK;
2581eec5d41Sdormando }
2591eec5d41Sdormando 
2601eec5d41Sdormando /* Writes flattened entry to available watchers */
logger_thread_write_entry(logentry * e,struct logger_stats * ls,char * scratch,int scratch_len)2611eec5d41Sdormando static void logger_thread_write_entry(logentry *e, struct logger_stats *ls,
2621eec5d41Sdormando         char *scratch, int scratch_len) {
2631eec5d41Sdormando     int x, total;
26482e4e9a1Sdormando     /* Write the line into available watchers with matching flags */
265a4f8b982Sdormando     for (x = 0; x < WATCHER_LIMIT; x++) {
266a4f8b982Sdormando         logger_watcher *w = watchers[x];
26782e4e9a1Sdormando         char *skip_scr = NULL;
26882e4e9a1Sdormando         if (w == NULL || (e->eflags & w->eflags) == 0)
269a4f8b982Sdormando             continue;
270a4f8b982Sdormando 
27182e4e9a1Sdormando          /* Avoid poll()'ing constantly when buffer is full by resetting a
27282e4e9a1Sdormando          * flag periodically.
27382e4e9a1Sdormando          */
27482e4e9a1Sdormando         while (!w->failed_flush &&
2751eec5d41Sdormando                 (skip_scr = (char *) bipbuf_request(w->buf, scratch_len + 128)) == NULL) {
27682e4e9a1Sdormando             if (logger_thread_poll_watchers(0, x) <= 0) {
27782e4e9a1Sdormando                 L_DEBUG("LOGGER: Watcher had no free space for line of size (%d)\n", line_size);
27882e4e9a1Sdormando                 w->failed_flush = true;
27982e4e9a1Sdormando             }
2801b03a9c6Sdormando         }
2811b03a9c6Sdormando 
2821b03a9c6Sdormando         if (w->failed_flush) {
2831b03a9c6Sdormando             L_DEBUG("LOGGER: Fast skipped for watcher [%d] due to failed_flush\n", w->sfd);
284a4f8b982Sdormando             w->skipped++;
2850503b5e2Sdormando             ls->watcher_skipped++;
28682e4e9a1Sdormando             continue;
28782e4e9a1Sdormando         }
28882e4e9a1Sdormando 
28982e4e9a1Sdormando         if (w->skipped > 0) {
290663d39faSdormando             total = snprintf(skip_scr, 128, "skipped=%llu\n", (unsigned long long) w->skipped);
2911b03a9c6Sdormando             if (total >= 128 || total <= 0) {
2921b03a9c6Sdormando                 L_DEBUG("LOGGER: Failed to flatten skipped message into watcher [%d]\n", w->sfd);
2931b03a9c6Sdormando                 w->skipped++;
2940503b5e2Sdormando                 ls->watcher_skipped++;
29582e4e9a1Sdormando                 continue;
29682e4e9a1Sdormando             }
2971b03a9c6Sdormando             bipbuf_push(w->buf, total + 1);
2981b03a9c6Sdormando             w->skipped = 0;
29982e4e9a1Sdormando         }
30082e4e9a1Sdormando         /* Can't fail because bipbuf_request succeeded. */
3011eec5d41Sdormando         bipbuf_offer(w->buf, (unsigned char *) scratch, scratch_len);
3020503b5e2Sdormando         ls->watcher_sent++;
3031b03a9c6Sdormando     }
304916fff36Sdormando }
305916fff36Sdormando 
306916fff36Sdormando /* Called with logger stack locked.
307916fff36Sdormando  * Releases every chunk associated with a watcher and closes the connection.
308916fff36Sdormando  * We can't presently send a connection back to the worker for further
309916fff36Sdormando  * processing.
310916fff36Sdormando  */
logger_thread_close_watcher(logger_watcher * w)311426310bcSdormando static void logger_thread_close_watcher(logger_watcher *w) {
312916fff36Sdormando     L_DEBUG("LOGGER: Closing dead watcher\n");
313916fff36Sdormando     watchers[w->id] = NULL;
314916fff36Sdormando     sidethread_conn_close(w->c);
315916fff36Sdormando     watcher_count--;
316a4f8b982Sdormando     bipbuf_free(w->buf);
317916fff36Sdormando     free(w);
31873407543Sdormando     logger_set_flags();
319916fff36Sdormando }
320916fff36Sdormando 
321916fff36Sdormando /* Reads a particular worker thread's available bipbuf bytes. Parses each log
322a4f8b982Sdormando  * entry into the watcher buffers.
323916fff36Sdormando  */
logger_thread_read(logger * l,struct logger_stats * ls)3240503b5e2Sdormando static int logger_thread_read(logger *l, struct logger_stats *ls) {
325916fff36Sdormando     unsigned int size;
326916fff36Sdormando     unsigned int pos = 0;
327916fff36Sdormando     unsigned char *data;
3281eec5d41Sdormando     char scratch[LOGGER_PARSE_SCRATCH];
329916fff36Sdormando     logentry *e;
330916fff36Sdormando     pthread_mutex_lock(&l->mutex);
331916fff36Sdormando     data = bipbuf_peek_all(l->buf, &size);
332916fff36Sdormando     pthread_mutex_unlock(&l->mutex);
333916fff36Sdormando 
334916fff36Sdormando     if (data == NULL) {
335916fff36Sdormando         return 0;
336916fff36Sdormando     }
337a4f8b982Sdormando     L_DEBUG("LOGGER: Got %d bytes from bipbuffer\n", size);
338916fff36Sdormando 
339916fff36Sdormando     /* parse buffer */
340916fff36Sdormando     while (pos < size && watcher_count > 0) {
341916fff36Sdormando         enum logger_parse_entry_ret ret;
3421eec5d41Sdormando         int scratch_len = 0;
343916fff36Sdormando         e = (logentry *) (data + pos);
3441eec5d41Sdormando         ret = logger_thread_parse_entry(e, ls, scratch, &scratch_len);
3451b03a9c6Sdormando         if (ret != LOGGER_PARSE_ENTRY_OK) {
346426310bcSdormando             /* TODO: stats counter */
347916fff36Sdormando             fprintf(stderr, "LOGGER: Failed to parse log entry\n");
3481eec5d41Sdormando         } else {
3491eec5d41Sdormando             logger_thread_write_entry(e, ls, scratch, scratch_len);
350916fff36Sdormando         }
351916fff36Sdormando         pos += sizeof(logentry) + e->size;
352916fff36Sdormando     }
353916fff36Sdormando     assert(pos <= size);
354916fff36Sdormando 
355916fff36Sdormando     pthread_mutex_lock(&l->mutex);
356916fff36Sdormando     data = bipbuf_poll(l->buf, size);
3570503b5e2Sdormando     ls->worker_written += l->written;
3580503b5e2Sdormando     ls->worker_dropped += l->dropped;
3590503b5e2Sdormando     l->written = 0;
3600503b5e2Sdormando     l->dropped = 0;
361916fff36Sdormando     pthread_mutex_unlock(&l->mutex);
362916fff36Sdormando     if (data == NULL) {
363916fff36Sdormando         fprintf(stderr, "LOGGER: unexpectedly couldn't advance buf pointer\n");
364663d39faSdormando         assert(0);
365916fff36Sdormando     }
366916fff36Sdormando     return size; /* maybe the count of objects iterated? */
367916fff36Sdormando }
368916fff36Sdormando 
369916fff36Sdormando /* Since the event loop code isn't reusable without a refactor, and we have a
370916fff36Sdormando  * limited number of potential watchers, we run our own poll loop.
371916fff36Sdormando  * This calls poll() unnecessarily during write flushes, should be possible to
372916fff36Sdormando  * micro-optimize later.
373916fff36Sdormando  *
3741b03a9c6Sdormando  * This flushes buffers attached to watchers, iterating through the bytes set
375916fff36Sdormando  * to each worker. Also checks for readability in case client connection was
376916fff36Sdormando  * closed.
3771b03a9c6Sdormando  *
3781b03a9c6Sdormando  * Allows a specific watcher to be flushed (if buf full)
379916fff36Sdormando  */
logger_thread_poll_watchers(int force_poll,int watcher)380426310bcSdormando static int logger_thread_poll_watchers(int force_poll, int watcher) {
381916fff36Sdormando     int x;
382916fff36Sdormando     int nfd = 0;
383d5f7f89aSdormando     unsigned char *data;
384d5f7f89aSdormando     unsigned int data_size = 0;
3851b03a9c6Sdormando     int flushed = 0;
386916fff36Sdormando 
387916fff36Sdormando     for (x = 0; x < WATCHER_LIMIT; x++) {
388a4f8b982Sdormando         logger_watcher *w = watchers[x];
3891b03a9c6Sdormando         if (w == NULL || (watcher != WATCHER_ALL && x != watcher))
390916fff36Sdormando             continue;
391916fff36Sdormando 
392a4f8b982Sdormando         data = bipbuf_peek_all(w->buf, &data_size);
393a4f8b982Sdormando         if (data != NULL) {
394916fff36Sdormando             watchers_pollfds[nfd].fd = w->sfd;
395916fff36Sdormando             watchers_pollfds[nfd].events = POLLOUT;
396916fff36Sdormando             nfd++;
397916fff36Sdormando         } else if (force_poll) {
398916fff36Sdormando             watchers_pollfds[nfd].fd = w->sfd;
399916fff36Sdormando             watchers_pollfds[nfd].events = POLLIN;
400916fff36Sdormando             nfd++;
401916fff36Sdormando         }
4021b03a9c6Sdormando         /* This gets set after a call to poll, and should be used to gate on
4031b03a9c6Sdormando          * calling poll again.
4041b03a9c6Sdormando          */
4051b03a9c6Sdormando         w->failed_flush = false;
406916fff36Sdormando     }
407916fff36Sdormando 
408916fff36Sdormando     if (nfd == 0)
4091b03a9c6Sdormando         return 0;
410916fff36Sdormando 
411a4f8b982Sdormando     //L_DEBUG("LOGGER: calling poll() [data_size: %d]\n", data_size);
412916fff36Sdormando     int ret = poll(watchers_pollfds, nfd, 0);
413916fff36Sdormando 
414916fff36Sdormando     if (ret < 0) {
415916fff36Sdormando         perror("something failed with logger thread watcher fd polling");
4161b03a9c6Sdormando         return -1;
417916fff36Sdormando     }
418916fff36Sdormando 
419916fff36Sdormando     nfd = 0;
420916fff36Sdormando     for (x = 0; x < WATCHER_LIMIT; x++) {
421a4f8b982Sdormando         logger_watcher *w = watchers[x];
422916fff36Sdormando         if (w == NULL)
423916fff36Sdormando             continue;
424a4f8b982Sdormando 
425a4f8b982Sdormando         data_size = 0;
426916fff36Sdormando         /* Early detection of a disconnect. Otherwise we have to wait until
427916fff36Sdormando          * the next write
428916fff36Sdormando          */
429916fff36Sdormando         if (watchers_pollfds[nfd].revents & POLLIN) {
430916fff36Sdormando             char buf[1];
431916fff36Sdormando             int res = read(w->sfd, buf, 1);
432916fff36Sdormando             if (res == 0 || (res == -1 && (errno != EAGAIN && errno != EWOULDBLOCK))) {
433426310bcSdormando                 logger_thread_close_watcher(w);
434916fff36Sdormando                 nfd++;
435916fff36Sdormando                 continue;
436916fff36Sdormando             }
437916fff36Sdormando         }
438a4f8b982Sdormando         if ((data = bipbuf_peek_all(w->buf, &data_size)) != NULL) {
439916fff36Sdormando             if (watchers_pollfds[nfd].revents & (POLLHUP|POLLERR)) {
440916fff36Sdormando                 L_DEBUG("LOGGER: watcher closed during poll() call\n");
441426310bcSdormando                 logger_thread_close_watcher(w);
442916fff36Sdormando             } else if (watchers_pollfds[nfd].revents & POLLOUT) {
443916fff36Sdormando                 int total = 0;
444d5f7f89aSdormando 
445916fff36Sdormando                 /* We can write a bit. */
446916fff36Sdormando                 switch (w->t) {
447916fff36Sdormando                     case LOGGER_WATCHER_STDERR:
448a4f8b982Sdormando                         total = fwrite(data, 1, data_size, stderr);
449916fff36Sdormando                         break;
450916fff36Sdormando                     case LOGGER_WATCHER_CLIENT:
451a4f8b982Sdormando                         total = write(w->sfd, data, data_size);
452916fff36Sdormando                         break;
453916fff36Sdormando                 }
454916fff36Sdormando 
455a4f8b982Sdormando                 L_DEBUG("LOGGER: poll() wrote %d to %d (data_size: %d) (bipbuf_used: %d)\n", total, w->sfd,
456a4f8b982Sdormando                         data_size, bipbuf_used(w->buf));
457916fff36Sdormando                 if (total == -1) {
458916fff36Sdormando                     if (errno != EAGAIN && errno != EWOULDBLOCK) {
459426310bcSdormando                         logger_thread_close_watcher(w);
460916fff36Sdormando                     }
461916fff36Sdormando                     L_DEBUG("LOGGER: watcher hit EAGAIN\n");
462916fff36Sdormando                 } else if (total == 0) {
463426310bcSdormando                     logger_thread_close_watcher(w);
464d5f7f89aSdormando                 } else {
465a4f8b982Sdormando                     bipbuf_poll(w->buf, total);
4661b03a9c6Sdormando                     flushed += total;
467916fff36Sdormando                 }
468d5f7f89aSdormando             }
469916fff36Sdormando         }
470916fff36Sdormando         nfd++;
471916fff36Sdormando     }
4721b03a9c6Sdormando     return flushed;
473916fff36Sdormando }
474d5f7f89aSdormando 
logger_thread_sum_stats(struct logger_stats * ls)4750503b5e2Sdormando static void logger_thread_sum_stats(struct logger_stats *ls) {
4760503b5e2Sdormando     STATS_LOCK();
4770503b5e2Sdormando     stats.log_worker_dropped  += ls->worker_dropped;
4780503b5e2Sdormando     stats.log_worker_written  += ls->worker_written;
4790503b5e2Sdormando     stats.log_watcher_skipped += ls->watcher_skipped;
4800503b5e2Sdormando     stats.log_watcher_sent    += ls->watcher_sent;
4810503b5e2Sdormando     STATS_UNLOCK();
4820503b5e2Sdormando }
4830503b5e2Sdormando 
484916fff36Sdormando #define MAX_LOGGER_SLEEP 100000
485916fff36Sdormando #define MIN_LOGGER_SLEEP 0
486916fff36Sdormando 
487916fff36Sdormando /* Primary logger thread routine */
logger_thread(void * arg)488916fff36Sdormando static void *logger_thread(void *arg) {
489916fff36Sdormando     useconds_t to_sleep = MIN_LOGGER_SLEEP;
490916fff36Sdormando     L_DEBUG("LOGGER: Starting logger thread\n");
491916fff36Sdormando     while (do_run_logger_thread) {
492916fff36Sdormando         int found_logs = 0;
4930503b5e2Sdormando         logger *l;
4940503b5e2Sdormando         struct logger_stats ls;
4950503b5e2Sdormando         memset(&ls, 0, sizeof(struct logger_stats));
496916fff36Sdormando         if (to_sleep)
497916fff36Sdormando             usleep(to_sleep);
498916fff36Sdormando 
499916fff36Sdormando         /* Call function to iterate each logger. */
500916fff36Sdormando         pthread_mutex_lock(&logger_stack_lock);
5010503b5e2Sdormando         for (l = logger_stack_head; l != NULL; l=l->next) {
5020503b5e2Sdormando             /* lock logger, call function to manipulate it */
5030503b5e2Sdormando             found_logs += logger_thread_read(l, &ls);
5040503b5e2Sdormando         }
5050503b5e2Sdormando 
506426310bcSdormando         logger_thread_poll_watchers(1, WATCHER_ALL);
507916fff36Sdormando         pthread_mutex_unlock(&logger_stack_lock);
508916fff36Sdormando 
509916fff36Sdormando         /* TODO: abstract into a function and share with lru_crawler */
510916fff36Sdormando         if (!found_logs) {
511916fff36Sdormando             if (to_sleep < MAX_LOGGER_SLEEP)
512916fff36Sdormando                 to_sleep += 50;
513916fff36Sdormando         } else {
514916fff36Sdormando             to_sleep /= 2;
515916fff36Sdormando             if (to_sleep < 50)
516916fff36Sdormando                 to_sleep = MIN_LOGGER_SLEEP;
517916fff36Sdormando         }
5180503b5e2Sdormando         logger_thread_sum_stats(&ls);
519916fff36Sdormando     }
520916fff36Sdormando 
521916fff36Sdormando     return NULL;
522916fff36Sdormando }
523916fff36Sdormando 
start_logger_thread(void)524916fff36Sdormando static int start_logger_thread(void) {
525916fff36Sdormando     int ret;
526916fff36Sdormando     do_run_logger_thread = 1;
527916fff36Sdormando     if ((ret = pthread_create(&logger_tid, NULL,
528916fff36Sdormando                               logger_thread, NULL)) != 0) {
529916fff36Sdormando         fprintf(stderr, "Can't start logger thread: %s\n", strerror(ret));
530916fff36Sdormando         return -1;
531916fff36Sdormando     }
532916fff36Sdormando     return 0;
533916fff36Sdormando }
534916fff36Sdormando 
535916fff36Sdormando // future.
536916fff36Sdormando /*static int stop_logger_thread(void) {
537916fff36Sdormando     do_run_logger_thread = 0;
538916fff36Sdormando     pthread_join(logger_tid, NULL);
539916fff36Sdormando     return 0;
540916fff36Sdormando }*/
541916fff36Sdormando 
542426310bcSdormando /*************************
543426310bcSdormando  * Public functions for submitting logs and starting loggers from workers.
544426310bcSdormando  *************************/
545426310bcSdormando 
546916fff36Sdormando /* Global logger thread start/init */
logger_init(void)547916fff36Sdormando void logger_init(void) {
548916fff36Sdormando     /* TODO: auto destructor when threads exit */
549916fff36Sdormando     /* TODO: error handling */
550916fff36Sdormando 
551916fff36Sdormando     /* init stack for iterating loggers */
552916fff36Sdormando     logger_stack_head = 0;
553916fff36Sdormando     logger_stack_tail = 0;
554916fff36Sdormando     pthread_key_create(&logger_key, NULL);
555f27a1022Sdormando     logger_uriencode_init();
556916fff36Sdormando 
557916fff36Sdormando     if (start_logger_thread() != 0) {
558916fff36Sdormando         abort();
559916fff36Sdormando     }
5600503b5e2Sdormando 
5610503b5e2Sdormando     /* This can be removed once the global stats initializer is improved */
5620503b5e2Sdormando     STATS_LOCK();
5630503b5e2Sdormando     stats.log_worker_dropped = 0;
5640503b5e2Sdormando     stats.log_worker_written = 0;
5650503b5e2Sdormando     stats.log_watcher_skipped = 0;
5660503b5e2Sdormando     stats.log_watcher_sent = 0;
5670503b5e2Sdormando     STATS_UNLOCK();
568663d39faSdormando     /* This is what adding a STDERR watcher looks like. should replace old
569663d39faSdormando      * "verbose" settings. */
570916fff36Sdormando     //logger_add_watcher(NULL, 0);
571916fff36Sdormando     return;
572916fff36Sdormando }
573916fff36Sdormando 
574916fff36Sdormando /* called *from* the thread using a logger.
575916fff36Sdormando  * initializes the per-thread bipbuf, links it into the list of loggers
576916fff36Sdormando  */
logger_create(void)577916fff36Sdormando logger *logger_create(void) {
578916fff36Sdormando     L_DEBUG("LOGGER: Creating and linking new logger instance\n");
579916fff36Sdormando     logger *l = calloc(1, sizeof(logger));
580916fff36Sdormando     if (l == NULL) {
581916fff36Sdormando         return NULL;
582916fff36Sdormando     }
583916fff36Sdormando 
584d704f2c0Sdormando     l->buf = bipbuf_new(settings.logger_buf_size);
585916fff36Sdormando     if (l->buf == NULL) {
586916fff36Sdormando         free(l);
587916fff36Sdormando         return NULL;
588916fff36Sdormando     }
589916fff36Sdormando 
590916fff36Sdormando     l->entry_map = default_entries;
591916fff36Sdormando 
592916fff36Sdormando     pthread_mutex_init(&l->mutex, NULL);
593916fff36Sdormando     pthread_setspecific(logger_key, l);
594916fff36Sdormando 
595916fff36Sdormando     /* add to list of loggers */
596916fff36Sdormando     logger_link_q(l);
597916fff36Sdormando     return l;
598916fff36Sdormando }
599916fff36Sdormando 
600cb8257e3Sdormando /* helpers for logger_log */
601cb8257e3Sdormando 
/* Fills e->data with an eviction record describing item `it` and sets
 * e->size to the record header plus the key length.
 * exptime is stored as (it->exptime - current_time) when the item has a
 * positive exptime, or -1 otherwise.
 * latime is stored as current_time - it->time.
 */
static void _logger_log_evictions(logentry *e, item *it) {
    struct logentry_eviction *ev = (struct logentry_eviction *) e->data;

    if (it->exptime > 0) {
        ev->exptime = (long long int)(it->exptime - current_time);
    } else {
        ev->exptime = (long long int) -1;
    }
    ev->latime = current_time - it->time;
    ev->it_flags = it->it_flags;
    ev->nkey = it->nkey;
    /* key bytes are copied raw, without a terminating NUL */
    memcpy(ev->key, ITEM_key(it), it->nkey);
    e->size = sizeof(struct logentry_eviction) + ev->nkey;
}
611cb8257e3Sdormando 
612c7fbccebSdormando /* 0 == nf, 1 == found. 2 == flushed. 3 == expired.
613c7fbccebSdormando  * might be useful to store/print the flags an item has?
614c7fbccebSdormando  * could also collapse this and above code into an "item status" struct. wait
615c7fbccebSdormando  * for more endpoints to be written before making it generic, though.
6164267ed80Sdormando  * TODO: This and below should track and reprint the client fd.
617c7fbccebSdormando  */
_logger_log_item_get(logentry * e,const int was_found,const char * key,const int nkey)618c7fbccebSdormando static void _logger_log_item_get(logentry *e, const int was_found, const char *key, const int nkey) {
619c7fbccebSdormando     struct logentry_item_get *le = (struct logentry_item_get *) e->data;
620c7fbccebSdormando     le->was_found = was_found;
621c7fbccebSdormando     le->nkey = nkey;
622c7fbccebSdormando     memcpy(le->key, key, nkey);
623c7fbccebSdormando     e->size = sizeof(struct logentry_item_get) + nkey;
624c7fbccebSdormando }
625c7fbccebSdormando 
/* Fills e->data with an item-store record (store status, command and key)
 * and sets e->size to the record header plus nkey.
 */
static void _logger_log_item_store(logentry *e, const enum store_item_type status,
        const int comm, char *key, const int nkey) {
    struct logentry_item_store *rec = (struct logentry_item_store *) e->data;

    e->size = sizeof(struct logentry_item_store) + nkey;
    /* key bytes are copied raw, without a terminating NUL */
    memcpy(rec->key, key, nkey);
    rec->nkey = nkey;
    rec->cmd = comm;
    rec->status = status;
}
635c7fbccebSdormando 
636916fff36Sdormando /* Public function for logging an entry.
637916fff36Sdormando  * Tries to encapsulate as much of the formatting as possible to simplify the
638916fff36Sdormando  * caller's code.
639916fff36Sdormando  */
logger_log(logger * l,const enum log_entry_type event,const void * entry,...)640916fff36Sdormando enum logger_ret_type logger_log(logger *l, const enum log_entry_type event, const void *entry, ...) {
641916fff36Sdormando     bipbuf_t *buf = l->buf;
642916fff36Sdormando     bool nospace = false;
643916fff36Sdormando     va_list ap;
644916fff36Sdormando     int total = 0;
645916fff36Sdormando     logentry *e;
646916fff36Sdormando 
647916fff36Sdormando     const entry_details *d = &l->entry_map[event];
648916fff36Sdormando     int reqlen = d->reqlen;
649916fff36Sdormando 
650916fff36Sdormando     pthread_mutex_lock(&l->mutex);
651916fff36Sdormando     /* Request a maximum length of data to write to */
652916fff36Sdormando     e = (logentry *) bipbuf_request(buf, (sizeof(logentry) + reqlen));
653916fff36Sdormando     if (e == NULL) {
654916fff36Sdormando         pthread_mutex_unlock(&l->mutex);
6550503b5e2Sdormando         l->dropped++;
656916fff36Sdormando         return LOGGER_RET_NOSPACE;
657916fff36Sdormando     }
658cb8257e3Sdormando     e->gid = logger_get_gid();
659916fff36Sdormando     e->event = d->subtype;
660c7fbccebSdormando     /* TODO: Could pass this down as an argument now that we're using
661c7fbccebSdormando      * LOGGER_LOG() macro.
662c7fbccebSdormando      */
663a4f8b982Sdormando     e->eflags = d->eflags;
664a4f8b982Sdormando     /* Noting time isn't optional. A feature may be added to avoid rendering
665a4f8b982Sdormando      * time and/or gid to a logger.
666a4f8b982Sdormando      */
667a4f8b982Sdormando     gettimeofday(&e->tv, NULL);
668916fff36Sdormando 
669916fff36Sdormando     switch (d->subtype) {
670916fff36Sdormando         case LOGGER_TEXT_ENTRY:
671916fff36Sdormando             va_start(ap, entry);
672916fff36Sdormando             total = vsnprintf((char *) e->data, reqlen, d->format, ap);
673916fff36Sdormando             va_end(ap);
674916fff36Sdormando             if (total >= reqlen || total <= 0) {
675916fff36Sdormando                 fprintf(stderr, "LOGGER: Failed to vsnprintf a text entry: (total) %d\n", total);
676916fff36Sdormando                 break;
677916fff36Sdormando             }
678916fff36Sdormando             e->size = total + 1; /* null byte */
679916fff36Sdormando 
680916fff36Sdormando             break;
681916fff36Sdormando         case LOGGER_EVICTION_ENTRY:
682cb8257e3Sdormando             _logger_log_evictions(e, (item *)entry);
683916fff36Sdormando             break;
684c7fbccebSdormando         case LOGGER_ITEM_GET_ENTRY:
685c7fbccebSdormando             va_start(ap, entry);
686c7fbccebSdormando             int was_found = va_arg(ap, int);
687c7fbccebSdormando             char *key = va_arg(ap, char *);
688c7fbccebSdormando             size_t nkey = va_arg(ap, size_t);
689c7fbccebSdormando             _logger_log_item_get(e, was_found, key, nkey);
690c7fbccebSdormando             va_end(ap);
691c7fbccebSdormando             break;
692c7fbccebSdormando         case LOGGER_ITEM_STORE_ENTRY:
693c7fbccebSdormando             va_start(ap, entry);
694c7fbccebSdormando             enum store_item_type status = va_arg(ap, enum store_item_type);
695c7fbccebSdormando             int comm = va_arg(ap, int);
696c7fbccebSdormando             char *skey = va_arg(ap, char *);
697c7fbccebSdormando             size_t snkey = va_arg(ap, size_t);
698c7fbccebSdormando             _logger_log_item_store(e, status, comm, skey, snkey);
699c7fbccebSdormando             break;
700916fff36Sdormando     }
701916fff36Sdormando 
702916fff36Sdormando     /* Push pointer forward by the actual amount required */
703916fff36Sdormando     if (bipbuf_push(buf, (sizeof(logentry) + e->size)) == 0) {
704916fff36Sdormando         fprintf(stderr, "LOGGER: Failed to bipbuf push a text entry\n");
705916fff36Sdormando         pthread_mutex_unlock(&l->mutex);
706916fff36Sdormando         return LOGGER_RET_ERR;
707916fff36Sdormando     }
7080503b5e2Sdormando     l->written++;
709916fff36Sdormando     L_DEBUG("LOGGER: Requested %d bytes, wrote %d bytes\n", reqlen, total + 1);
710916fff36Sdormando 
711916fff36Sdormando     pthread_mutex_unlock(&l->mutex);
712916fff36Sdormando 
713916fff36Sdormando     if (nospace) {
714916fff36Sdormando         return LOGGER_RET_NOSPACE;
715916fff36Sdormando     } else {
716916fff36Sdormando         return LOGGER_RET_OK;
717916fff36Sdormando     }
718916fff36Sdormando }
719916fff36Sdormando 
720916fff36Sdormando /* Passes a client connection socket from a primary worker thread to the
721916fff36Sdormando  * logger thread. Caller *must* event_del() the client before handing it over.
722916fff36Sdormando  * Presently there's no way to hand the client back to the worker thread.
723916fff36Sdormando  */
logger_add_watcher(void * c,const int sfd,uint16_t f)7246a4e8e1fSdormando enum logger_add_watcher_ret logger_add_watcher(void *c, const int sfd, uint16_t f) {
725916fff36Sdormando     int x;
726916fff36Sdormando     logger_watcher *w = NULL;
727916fff36Sdormando     pthread_mutex_lock(&logger_stack_lock);
728916fff36Sdormando     if (watcher_count >= WATCHER_LIMIT) {
729916fff36Sdormando         return LOGGER_ADD_WATCHER_TOO_MANY;
730916fff36Sdormando     }
731916fff36Sdormando 
732916fff36Sdormando     for (x = 0; x < WATCHER_LIMIT; x++) {
733916fff36Sdormando         if (watchers[x] == NULL)
734916fff36Sdormando             break;
735916fff36Sdormando     }
736916fff36Sdormando 
737916fff36Sdormando     w = calloc(1, sizeof(logger_watcher));
738a4f8b982Sdormando     if (w == NULL) {
739a4f8b982Sdormando         pthread_mutex_unlock(&logger_stack_lock);
740916fff36Sdormando         return LOGGER_ADD_WATCHER_FAILED;
741a4f8b982Sdormando     }
742916fff36Sdormando     w->c = c;
743916fff36Sdormando     w->sfd = sfd;
744916fff36Sdormando     if (sfd == 0 && c == NULL) {
745916fff36Sdormando         w->t = LOGGER_WATCHER_STDERR;
746916fff36Sdormando     } else {
747916fff36Sdormando         w->t = LOGGER_WATCHER_CLIENT;
748916fff36Sdormando     }
749916fff36Sdormando     w->id = x;
7506a4e8e1fSdormando     w->eflags = f;
751d704f2c0Sdormando     w->buf = bipbuf_new(settings.logger_watcher_buf_size);
752a4f8b982Sdormando     if (w->buf == NULL) {
753a4f8b982Sdormando         free(w);
754a4f8b982Sdormando         pthread_mutex_unlock(&logger_stack_lock);
755a4f8b982Sdormando         return LOGGER_ADD_WATCHER_FAILED;
756916fff36Sdormando     }
757efa436feSdormando     bipbuf_offer(w->buf, (unsigned char *) "OK\r\n", 4);
758a4f8b982Sdormando 
759916fff36Sdormando     watchers[x] = w;
760916fff36Sdormando     watcher_count++;
76173407543Sdormando     /* Update what flags the global logs will watch */
76273407543Sdormando     logger_set_flags();
763916fff36Sdormando 
764916fff36Sdormando     pthread_mutex_unlock(&logger_stack_lock);
765916fff36Sdormando     return LOGGER_ADD_WATCHER_OK;
766916fff36Sdormando }
767