1 /* -*- Mode: C; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2 
3 #include <arpa/inet.h>
4 #include <stdlib.h>
5 #include <stdio.h>
6 #include <string.h>
7 #include <errno.h>
8 #include <poll.h>
9 #include <ctype.h>
10 #include <stdarg.h>
11 
12 #if defined(__sun)
13 #include <atomic.h>
14 #endif
15 
16 #include "memcached.h"
17 #include "bipbuffer.h"
18 
/* Debug tracing for the logger itself.
 * When LOGGER_DEBUG is defined, L_DEBUG(...) forwards its printf-style
 * arguments to stderr. Otherwise it expands to nothing, so its arguments are
 * not evaluated at all.
 */
#ifdef LOGGER_DEBUG
#define L_DEBUG(...) \
    do { \
        fprintf(stderr, __VA_ARGS__); \
    } while (0)
#else
#define L_DEBUG(...)
#endif
27 
28 
/* TODO: put this in a struct and ditch the global vars. */
/* Head and tail of the doubly-linked list of logger objects;
 * logger_link_q() pushes new loggers at the head. */
static logger *logger_stack_head = NULL;
static logger *logger_stack_tail = NULL;
/* Number of loggers linked into the list above. */
static unsigned int logger_count = 0;
/* NOTE(review): presumably the keep-running flag for the logger thread; it
 * is not referenced in this part of the file -- confirm. */
static volatile int do_run_logger_thread = 1;
/* NOTE(review): presumably the logger thread's id -- confirm. */
static pthread_t logger_tid;
/* Taken by logger_link_q() while it edits the logger list and counter. */
pthread_mutex_t logger_stack_lock = PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t logger_stack_cond = PTHREAD_COND_INITIALIZER;

pthread_key_t logger_key;

/* Fallback lock for logger_gid updates, only built when neither 64-bit GCC
 * atomics nor the Solaris atomic ops are available. */
#if !defined(HAVE_GCC_64ATOMICS) && !defined(__sun)
pthread_mutex_t logger_atomics_mutex = PTHREAD_MUTEX_INITIALIZER;
#endif
43 
44 #define WATCHER_LIMIT 20
45 logger_watcher *watchers[20];
46 struct pollfd watchers_pollfds[20];
47 int watcher_count = 0;
48 
49 #define WATCHER_ALL -1
50 static int logger_thread_poll_watchers(int force_poll, int watcher);
51 
52 /* helpers for logger_log */
53 
/* Log callback for free-form text events.
 *
 * Expands d->format with the arguments in ap into e->data, writing at most
 * d->reqlen bytes (terminating NUL included), and sets e->size to the number
 * of bytes stored, terminator included. The entry argument is unused.
 *
 * vsnprintf() returns the length the complete output would have had, not the
 * length written. The result is therefore clamped when the line was
 * truncated; otherwise e->size would claim more bytes than the buffer holds.
 * On a formatting error the entry is stored as an empty string.
 */
static void _logger_log_text(logentry *e, const entry_details *d, const void *entry, va_list ap) {
    char *text = (char *) e->data;
    int reqlen = d->reqlen;
    int total = vsnprintf(text, reqlen, d->format, ap);

    if (total <= 0) {
        fprintf(stderr, "LOGGER: Failed to vsnprintf a text entry: (total) %d\n", total);
        /* Leave a well-formed empty string behind instead of stale bytes. */
        if (reqlen > 0) {
            text[0] = '\0';
        }
        total = 0;
    } else if (total >= reqlen) {
        /* Output was truncated: only reqlen - 1 characters were stored. */
        total = (reqlen > 0) ? reqlen - 1 : 0;
    }
    e->size = total + 1; // null byte
}
62 
/* Log callback for evictions.
 *
 * Packs metadata and the key of the item passed in entry into a
 * struct logentry_eviction stored in e->data. d and ap are unused.
 * e->size becomes the fixed struct size plus the key length.
 */
static void _logger_log_evictions(logentry *e, const entry_details *d, const void *entry, va_list ap) {
    struct logentry_eviction *rec = (struct logentry_eviction *) e->data;
    item *victim = (item *)entry;

    /* Expiry relative to current_time, or -1 when exptime is not positive. */
    if (victim->exptime > 0) {
        rec->exptime = (long long int)(victim->exptime - current_time);
    } else {
        rec->exptime = (long long int) -1;
    }
    /* Distance between current_time and the item's time field. */
    rec->latime = current_time - victim->time;
    rec->it_flags = victim->it_flags;
    rec->nbytes = victim->nbytes;
    rec->clsid = ITEM_clsid(victim);
    rec->nkey = victim->nkey;
    memcpy(rec->key, ITEM_key(victim), victim->nkey);

    e->size = sizeof(struct logentry_eviction) + rec->nkey;
}
76 #ifdef EXTSTORE
/* Log callback for extstore writes.
 *
 * entry is the item being written; the single variadic argument is the
 * bucket number (int), stored truncated to uint8_t. Fills a
 * struct logentry_ext_write in e->data followed by the key bytes. d is
 * unused.
 */
static void _logger_log_ext_write(logentry *e, const entry_details *d, const void *entry, va_list ap) {
    item *written = (item *)entry;
    int bucket = va_arg(ap, int);
    struct logentry_ext_write *rec = (struct logentry_ext_write *) e->data;

    /* Expiry relative to current_time, or -1 when exptime is not positive. */
    if (written->exptime > 0) {
        rec->exptime = (long long int)(written->exptime - current_time);
    } else {
        rec->exptime = (long long int) -1;
    }
    rec->latime = current_time - written->time;
    rec->it_flags = written->it_flags;
    rec->clsid = ITEM_clsid(written);
    rec->bucket = (uint8_t)bucket;
    rec->nkey = written->nkey;
    memcpy(rec->key, ITEM_key(written), written->nkey);

    e->size = sizeof(struct logentry_ext_write) + rec->nkey;
}
91 #endif
92 // 0 == nf, 1 == found. 2 == flushed. 3 == expired.
93 // might be useful to store/print the flags an item has?
94 // could also collapse this and above code into an "item status" struct. wait
95 // for more endpoints to be written before making it generic, though.
_logger_log_item_get(logentry * e,const entry_details * d,const void * entry,va_list ap)96 static void _logger_log_item_get(logentry *e, const entry_details *d, const void *entry, va_list ap) {
97     int was_found = va_arg(ap, int);
98     char *key = va_arg(ap, char *);
99     int nkey = va_arg(ap, int);
100     int nbytes = va_arg(ap, int);
101     uint8_t clsid = va_arg(ap, int);
102     int sfd = va_arg(ap, int);
103 
104     struct logentry_item_get *le = (struct logentry_item_get *) e->data;
105     le->was_found = was_found;
106     le->nkey = nkey;
107     le->nbytes = nbytes;
108     le->clsid = clsid;
109     memcpy(le->key, key, nkey);
110     le->sfd = sfd;
111     e->size = sizeof(struct logentry_item_get) + nkey;
112 }
113 
/* Log callback for item stores.
 *
 * Variadic arguments, in order: status (enum store_item_type), command
 * (int), key (char *), nkey (int), nbytes (int), ttl (rel_time_t), clsid
 * (int, stored as uint8_t), sfd (int). d and entry are unused.
 *
 * A non-zero ttl is recorded relative to current_time; zero is kept as zero.
 */
static void _logger_log_item_store(logentry *e, const entry_details *d, const void *entry, va_list ap) {
    /* Arguments must be pulled in exactly the order the caller pushed them. */
    enum store_item_type store_status = va_arg(ap, enum store_item_type);
    int command = va_arg(ap, int);
    char *key_bytes = va_arg(ap, char *);
    int key_len = va_arg(ap, int);
    int value_bytes = va_arg(ap, int);
    rel_time_t expiry = va_arg(ap, rel_time_t);
    uint8_t slab_class = va_arg(ap, int);
    int client_fd = va_arg(ap, int);

    struct logentry_item_store *rec = (struct logentry_item_store *) e->data;

    rec->status = store_status;
    rec->cmd = command;
    rec->nkey = key_len;
    rec->nbytes = value_bytes;
    rec->clsid = slab_class;
    rec->ttl = (expiry != 0) ? expiry - current_time : 0;
    memcpy(rec->key, key_bytes, key_len);
    rec->sfd = client_fd;

    e->size = sizeof(struct logentry_item_store) + key_len;
}
139 
/* Log callback for deletions.
 *
 * entry is the item being deleted; the single variadic argument is the
 * command code (int). Fills a struct logentry_deletion in e->data followed
 * by the key bytes. d is unused.
 */
static void _logger_log_item_deleted(logentry *e, const entry_details *d, const void *entry, va_list ap) {
    item *removed = (item *)entry;
    int command = va_arg(ap, int);
    struct logentry_deletion *rec = (struct logentry_deletion *) e->data;

    rec->cmd = command;
    rec->nkey = removed->nkey;
    rec->nbytes = removed->nbytes;
    rec->clsid = ITEM_clsid(removed);
    memcpy(rec->key, ITEM_key(removed), removed->nkey);

    e->size = sizeof(struct logentry_deletion) + rec->nkey;
}
151 
/* Log callback shared by the connection-new and connection-close events.
 *
 * Variadic arguments, in order: peer address (struct sockaddr_in6 *), its
 * length (socklen_t), transport (enum network_transport), close reason
 * (enum close_reasons), sfd (int). d and entry are unused.
 *
 * The address is copied into the fixed-size le->addr. The copy is bounded by
 * sizeof(le->addr) so an oversized addrlen cannot write past the record, and
 * the destination is zeroed first so a short address does not leave stale
 * buffer bytes for the renderer to read.
 */
static void _logger_log_conn_event(logentry *e, const entry_details *d, const void *entry, va_list ap) {
    struct sockaddr_in6 *addr = va_arg(ap, struct sockaddr_in6 *);
    socklen_t addrlen = va_arg(ap, socklen_t);
    enum network_transport transport = va_arg(ap, enum network_transport);
    enum close_reasons reason = va_arg(ap, enum close_reasons);
    int sfd = va_arg(ap, int);

    struct logentry_conn_event *le = (struct logentry_conn_event *) e->data;

    size_t copylen = (size_t) addrlen;
    if (copylen > sizeof(le->addr)) {
        copylen = sizeof(le->addr);
    }
    memset(&le->addr, 0, sizeof(le->addr));
    memcpy(&le->addr, addr, copylen);
    le->sfd = sfd;
    le->transport = transport;
    le->reason = reason;
    e->size = sizeof(struct logentry_conn_event);
}
167 
168 /*************************
169  * Util functions used by the logger background thread
170  *************************/
171 
/* Converts a socket address into a printable remote IP and host-order port.
 *
 * addr   - address to decode; its family selects the interpretation.
 * rip    - output buffer for the textual address; always NUL-terminated
 *          when riplen > 0.
 * riplen - size of rip in bytes.
 * rport  - receives the port in host byte order, or 0 when the family has
 *          no port (unix / unspecified / unknown).
 *
 * AF_INET and AF_INET6 are rendered with inet_ntop(). Unless
 * DISABLE_UNIX_SOCKET is defined, AF_UNIX and AF_UNSPEC are rendered as
 * "unix". Any other family leaves rip empty. Always returns 0.
 */
static int _logger_util_addr_endpoint(struct sockaddr_in6 *addr, char *rip,
        size_t riplen, unsigned short *rport) {
    /* Keep the last byte reserved for the terminator, without letting
     * riplen - 1 wrap around when riplen is 0. */
    size_t avail = (riplen > 0) ? riplen - 1 : 0;

    memset(rip, 0, riplen);
    *rport = 0;

    switch (addr->sin6_family) {
        case AF_INET:
            if (inet_ntop(AF_INET, &((struct sockaddr_in *) addr)->sin_addr,
                    rip, avail) == NULL && riplen > 0) {
                rip[0] = '\0'; /* conversion failed: report an empty address */
            }
            *rport = ntohs(((struct sockaddr_in *) addr)->sin_port);
            break;
        case AF_INET6:
            if (inet_ntop(AF_INET6, &((struct sockaddr_in6 *) addr)->sin6_addr,
                    rip, avail) == NULL && riplen > 0) {
                rip[0] = '\0'; /* conversion failed: report an empty address */
            }
            *rport = ntohs(((struct sockaddr_in6 *) addr)->sin6_port);
            break;
#ifndef DISABLE_UNIX_SOCKET
        // Connections on Unix socket transports have c->request_addr zeroed out.
        case AF_UNSPEC:
        case AF_UNIX:
            /* Bounded by the destination size, not the source length. */
            snprintf(rip, riplen, "%s", "unix");
            break;
#endif // #ifndef DISABLE_UNIX_SOCKET
        default:
            /* Unknown family: rip stays empty and rport stays 0. */
            break;
    }

    return 0;
}
199 
200 /*************************
201  * Logger background thread functions. Aggregates per-worker buffers and
202  * writes to any watchers.
203  *************************/
204 
205 #define LOGGER_PARSE_SCRATCH 4096
206 
/* Renders a text entry: prefixes the already-formatted string held in
 * e->data with the entry's timestamp and gid. Writes into scratch (at most
 * LOGGER_PARSE_SCRATCH bytes) and returns snprintf()'s result.
 */
static int _logger_parse_text(logentry *e, char *scratch) {
    long long int sec = (long long int)e->tv.tv_sec;
    int usec = (int)e->tv.tv_usec;
    unsigned long long gid = (unsigned long long) e->gid;
    char *message = (char *) e->data;

    return snprintf(scratch, LOGGER_PARSE_SCRATCH, "ts=%lld.%d gid=%llu %s\n",
            sec, usec, gid, message);
}
212 
/* Renders an item_store record as a key=value log line in scratch.
 *
 * The key is URI-encoded first. The status is looked up in status_map, and
 * the command in cmd_map, with codes above 8 reported as "na". The reported
 * size is nbytes minus the trailing CRLF, or 0 when nbytes is not positive.
 * Returns snprintf()'s result.
 */
static int _logger_parse_ise(logentry *e, char *scratch) {
    const char * const status_map[] = {
        "not_stored", "stored", "exists", "not_found", "too_large", "no_memory" };
    const char * const cmd_map[] = {
        "null", "add", "set", "replace", "append", "prepend", "cas", "append", "prepend" };
    struct logentry_item_store *rec = (struct logentry_item_store *) e->data;
    char encoded_key[KEY_MAX_URI_ENCODED_LENGTH];
    const char *cmd_name = (rec->cmd <= 8) ? cmd_map[rec->cmd] : "na";

    uriencode(rec->key, encoded_key, rec->nkey, KEY_MAX_URI_ENCODED_LENGTH);

    return snprintf(scratch, LOGGER_PARSE_SCRATCH,
            "ts=%lld.%d gid=%llu type=item_store key=%s status=%s cmd=%s ttl=%u clsid=%u cfd=%d size=%d\n",
            (long long int)e->tv.tv_sec, (int)e->tv.tv_usec, (unsigned long long) e->gid,
            encoded_key, status_map[rec->status], cmd_name, rec->ttl, rec->clsid, rec->sfd,
            rec->nbytes > 0 ? rec->nbytes - 2 : 0); // CRLF
}
234 
/* Renders an item_get record as a key=value log line in scratch.
 *
 * The key is URI-encoded and was_found is translated through was_found_map.
 * The reported size is nbytes minus the trailing CRLF, or 0 when nbytes is
 * not positive. Returns snprintf()'s result.
 */
static int _logger_parse_ige(logentry *e, char *scratch) {
    const char * const was_found_map[] = {
        "not_found", "found", "flushed", "expired" };
    struct logentry_item_get *rec = (struct logentry_item_get *) e->data;
    char encoded_key[KEY_MAX_URI_ENCODED_LENGTH];

    uriencode(rec->key, encoded_key, rec->nkey, KEY_MAX_URI_ENCODED_LENGTH);

    return snprintf(scratch, LOGGER_PARSE_SCRATCH,
            "ts=%lld.%d gid=%llu type=item_get key=%s status=%s clsid=%u cfd=%d size=%d\n",
            (long long int)e->tv.tv_sec, (int)e->tv.tv_usec, (unsigned long long) e->gid,
            encoded_key, was_found_map[rec->was_found], rec->clsid, rec->sfd,
            rec->nbytes > 0 ? rec->nbytes - 2 : 0); // CRLF
}
250 
/* Renders an eviction record as a key=value log line in scratch.
 *
 * "fetch" reports whether ITEM_FETCHED is set in the recorded item flags.
 * The reported size is nbytes minus the trailing CRLF, or 0 when nbytes is
 * not positive. Returns snprintf()'s result.
 */
static int _logger_parse_ee(logentry *e, char *scratch) {
    struct logentry_eviction *rec = (struct logentry_eviction *) e->data;
    char encoded_key[KEY_MAX_URI_ENCODED_LENGTH];
    const char *fetched;

    uriencode(rec->key, encoded_key, rec->nkey, KEY_MAX_URI_ENCODED_LENGTH);
    fetched = (rec->it_flags & ITEM_FETCHED) ? "yes" : "no";

    return snprintf(scratch, LOGGER_PARSE_SCRATCH,
            "ts=%lld.%d gid=%llu type=eviction key=%s fetch=%s ttl=%lld la=%d clsid=%u size=%d\n",
            (long long int)e->tv.tv_sec, (int)e->tv.tv_usec, (unsigned long long) e->gid,
            encoded_key, fetched,
            (long long int)rec->exptime, rec->latime, rec->clsid,
            rec->nbytes > 0 ? rec->nbytes - 2 : 0); // CRLF
}
265 
/* Renders a deletion record as a key=value log line in scratch.
 *
 * The key is URI-encoded. The command is looked up in cmd_map, with codes
 * above 2 reported as "na". The reported size is nbytes minus the trailing
 * CRLF, or 0 when nbytes is not positive. Returns snprintf()'s result.
 *
 * The timestamp is printed with %lld, like every other renderer in this
 * file, so a 64-bit tv_sec is not truncated through an int cast.
 */
static int _logger_parse_ide(logentry *e, char *scratch) {
    int total;
    const char *cmd = "na";
    const char * const cmd_map[] = {
            "null", "delete", "md" };
    char keybuf[KEY_MAX_URI_ENCODED_LENGTH];
    struct logentry_deletion *le = (struct logentry_deletion *) e->data;
    uriencode(le->key, keybuf, le->nkey, KEY_MAX_URI_ENCODED_LENGTH);

    if (le->cmd <= 2)
        cmd = cmd_map[le->cmd];

    total = snprintf(scratch, LOGGER_PARSE_SCRATCH,
                     "ts=%lld.%d gid=%llu type=deleted key=%s cmd=%s clsid=%u size=%d\n",
                     (long long int)e->tv.tv_sec, (int)e->tv.tv_usec, (unsigned long long) e->gid,
                     keybuf, cmd, le->clsid,
                     le->nbytes > 0 ? le->nbytes - 2 : 0); // CRLF
    return total;
}
285 
286 #ifdef EXTSTORE
/* Renders an extstore write record as a key=value log line in scratch.
 *
 * "fetch" reports whether ITEM_FETCHED is set in the recorded item flags.
 * Returns snprintf()'s result.
 */
static int _logger_parse_extw(logentry *e, char *scratch) {
    struct logentry_ext_write *rec = (struct logentry_ext_write *) e->data;
    char encoded_key[KEY_MAX_URI_ENCODED_LENGTH];
    const char *fetched;

    uriencode(rec->key, encoded_key, rec->nkey, KEY_MAX_URI_ENCODED_LENGTH);
    fetched = (rec->it_flags & ITEM_FETCHED) ? "yes" : "no";

    return snprintf(scratch, LOGGER_PARSE_SCRATCH,
            "ts=%lld.%d gid=%llu type=extwrite key=%s fetch=%s ttl=%lld la=%d clsid=%u bucket=%u\n",
            (long long int)e->tv.tv_sec, (int)e->tv.tv_usec, (unsigned long long) e->gid,
            encoded_key, fetched,
            (long long int)rec->exptime, rec->latime, rec->clsid, rec->bucket);
}
300 #endif
301 
/* Renders a new-connection record as a key=value log line in scratch.
 *
 * The stored peer address is decoded into a printable IP and port with
 * _logger_util_addr_endpoint(); the transport is translated through
 * transport_map. Returns snprintf()'s result.
 */
static int _logger_parse_cne(logentry *e, char *scratch) {
    const char * const transport_map[] = { "local", "tcp", "udp" };
    struct logentry_conn_event *rec = (struct logentry_conn_event *) e->data;
    char remote_ip[64];
    unsigned short remote_port = 0;

    _logger_util_addr_endpoint(&rec->addr, remote_ip, sizeof(remote_ip), &remote_port);

    return snprintf(scratch, LOGGER_PARSE_SCRATCH,
            "ts=%lld.%d gid=%llu type=conn_new rip=%s rport=%hu transport=%s cfd=%d\n",
            (long long int) e->tv.tv_sec, (int) e->tv.tv_usec, (unsigned long long) e->gid,
            remote_ip, remote_port, transport_map[rec->transport], rec->sfd);
}
318 
/* Renders a connection-close record as a key=value log line in scratch.
 *
 * Same layout as the conn_new line, plus the close reason translated
 * through reason_map. Returns snprintf()'s result.
 */
static int _logger_parse_cce(logentry *e, char *scratch) {
    const char * const transport_map[] = { "local", "tcp", "udp" };
    const char * const reason_map[] = { "error", "normal", "idle_timeout", "shutdown" };
    struct logentry_conn_event *rec = (struct logentry_conn_event *) e->data;
    char remote_ip[64];
    unsigned short remote_port = 0;

    _logger_util_addr_endpoint(&rec->addr, remote_ip, sizeof(remote_ip), &remote_port);

    return snprintf(scratch, LOGGER_PARSE_SCRATCH,
            "ts=%lld.%d gid=%llu type=conn_close rip=%s rport=%hu transport=%s reason=%s cfd=%d\n",
            (long long int) e->tv.tv_sec, (int) e->tv.tv_usec, (unsigned long long) e->gid,
            remote_ip, remote_port, transport_map[rec->transport],
            reason_map[rec->reason], rec->sfd);
}
337 
338 #ifdef PROXY
339 // TODO (v2): the length caps here are all magic numbers. Haven't thought of
340 // something yet that I like better.
341 // Should at least make a define to the max log len (1024) and do some math
342 // here.
_logger_log_proxy_req(logentry * e,const entry_details * d,const void * entry,va_list ap)343 static void _logger_log_proxy_req(logentry *e, const entry_details *d, const void *entry, va_list ap) {
344     char *req = va_arg(ap, char *);
345     int reqlen = va_arg(ap, uint32_t);
346     long elapsed = va_arg(ap, long);
347     unsigned short type = va_arg(ap, int);
348     unsigned short code = va_arg(ap, int);
349     int status = va_arg(ap, int);
350     int conn_fd = va_arg(ap, int);
351     char *detail = va_arg(ap, char *);
352     int dlen = va_arg(ap, int);
353     char *be_name = va_arg(ap, char *);
354     char *be_port = va_arg(ap, char *);
355 
356     struct logentry_proxy_req *le = (void *)e->data;
357     le->type = type;
358     le->code = code;
359     le->status = status;
360     le->conn_fd = conn_fd;
361     le->dlen = dlen;
362     le->elapsed = elapsed;
363     if (be_name && be_port) {
364         le->be_namelen = strlen(be_name);
365         le->be_portlen = strlen(be_port);
366     } else {
367         le->be_namelen = 0;
368         le->be_portlen = 0;
369     }
370     char *data = le->data;
371     if (req[reqlen-2] == '\r') {
372         reqlen -= 2;
373     } else {
374         reqlen--;
375     }
376     if (reqlen > 300) {
377         reqlen = 300;
378     }
379     if (dlen > 150) {
380         dlen = 150;
381     }
382     // be_namelen and be_portlen can't be longer than 255+6
383     le->reqlen = reqlen;
384     memcpy(data, req, reqlen);
385     data += reqlen;
386     memcpy(data, detail, dlen);
387     data += dlen;
388     memcpy(data, be_name, le->be_namelen);
389     data += le->be_namelen;
390     memcpy(data, be_port, le->be_portlen);
391     e->size = sizeof(struct logentry_proxy_req) + reqlen + dlen + le->be_namelen + le->be_portlen;
392 }
393 
/* Renders a proxy request record as a key=value log line in scratch.
 *
 * The record's data area is a sequence of unterminated strings (request,
 * detail, backend name, backend port) located by the lengths stored in the
 * record, so each is printed with an explicit precision.
 * Returns snprintf()'s result.
 */
static int _logger_parse_prx_req(logentry *e, char *scratch) {
    struct logentry_proxy_req *rec = (void *)e->data;
    /* Walk the packed data area in storage order. */
    char *req = rec->data;
    char *detail = req + rec->reqlen;
    char *be_name = detail + rec->dlen;
    char *be_port = be_name + rec->be_namelen;

    return snprintf(scratch, LOGGER_PARSE_SCRATCH,
            "ts=%lld.%d gid=%llu type=proxy_req elapsed=%lu type=%d code=%d status=%d cfd=%d be=%.*s:%.*s detail=%.*s req=%.*s\n",
            (long long int) e->tv.tv_sec, (int) e->tv.tv_usec, (unsigned long long) e->gid,
            rec->elapsed, rec->type, rec->code, rec->status, rec->conn_fd,
            (int)rec->be_namelen, be_name,
            (int)rec->be_portlen, be_port,
            (int)rec->dlen, detail, (int)rec->reqlen, req
            );
}
408 
409 #define MAX_RBUF_READ 100
_logger_log_proxy_errbe(logentry * e,const entry_details * d,const void * entry,va_list ap)410 static void _logger_log_proxy_errbe(logentry *e, const entry_details *d, const void *entry, va_list ap) {
411     char *errmsg = va_arg(ap, char *);
412     char *be_name = va_arg(ap, char *);
413     char *be_port = va_arg(ap, char *);
414     char *be_label = va_arg(ap, char *);
415     int be_depth = va_arg(ap, int);
416     char *be_rbuf = va_arg(ap, char *);
417     int be_rbuflen = va_arg(ap, int);
418     int be_retry = va_arg(ap, int);
419 
420     struct logentry_proxy_errbe *le = (void *)e->data;
421     le->be_depth = be_depth;
422     le->retry = be_retry;
423     le->errlen = strlen(errmsg);
424     if (be_name && be_port) {
425         le->be_namelen = strlen(be_name);
426         le->be_portlen = strlen(be_port);
427     }
428 
429     if (be_label) {
430         le->be_labellen = strlen(be_label);
431     }
432 
433     le->be_rbuflen = be_rbuflen;
434     if (be_rbuflen > MAX_RBUF_READ) {
435         le->be_rbuflen = MAX_RBUF_READ;
436     }
437 
438     char *data = le->data;
439     memcpy(data, errmsg, le->errlen);
440     data += le->errlen;
441     memcpy(data, be_name, le->be_namelen);
442     data += le->be_namelen;
443     memcpy(data, be_port, le->be_portlen);
444     data += le->be_portlen;
445     memcpy(data, be_label, le->be_labellen);
446     data += le->be_labellen;
447     memcpy(data, be_rbuf, le->be_rbuflen);
448     data += le->be_rbuflen;
449 
450     e->size = sizeof(struct logentry_proxy_errbe) + (data - le->data);
451 }
452 
/* Renders a proxy backend error record as a key=value log line in scratch.
 *
 * The record's data area is a sequence of unterminated strings located by
 * the stored lengths. The backend read buffer is URI-encoded before use.
 * When the retry field is non-zero the line ends with retry=; otherwise it
 * ends with depth= and the encoded read buffer.
 * Returns snprintf()'s result.
 */
static int _logger_parse_prx_errbe(logentry *e, char *scratch) {
    struct logentry_proxy_errbe *rec = (void *)e->data;
    char encoded_rbuf[MAX_RBUF_READ * 3]; // x 3 for worst case URI encoding.

    /* Walk the packed data area in storage order. */
    char *errmsg = rec->data;
    char *be_name = errmsg + rec->errlen;
    char *be_port = be_name + rec->be_namelen;
    char *be_label = be_port + rec->be_portlen;
    char *be_rbuf = be_label + rec->be_labellen;

    uriencode(be_rbuf, encoded_rbuf, rec->be_rbuflen, MAX_RBUF_READ * 3);

    if (rec->retry) {
        return snprintf(scratch, LOGGER_PARSE_SCRATCH,
                "ts=%lld.%d gid=%llu type=proxy_backend error=%.*s name=%.*s port=%.*s label=%.*s retry=%d\n",
                (long long int)e->tv.tv_sec, (int)e->tv.tv_usec, (unsigned long long) e->gid,
                (int)rec->errlen, errmsg, (int)rec->be_namelen, be_name,
                (int)rec->be_portlen, be_port, (int)rec->be_labellen, be_label, rec->retry);
    }

    return snprintf(scratch, LOGGER_PARSE_SCRATCH,
            "ts=%lld.%d gid=%llu type=proxy_backend error=%.*s name=%.*s port=%.*s label=%.*s depth=%d rbuf=%s\n",
            (long long int)e->tv.tv_sec, (int)e->tv.tv_usec, (unsigned long long) e->gid,
            (int)rec->errlen, errmsg, (int)rec->be_namelen, be_name,
            (int)rec->be_portlen, be_port, (int)rec->be_labellen, be_label, rec->be_depth, encoded_rbuf);
}
485 #endif
486 
/* Should this go somewhere else? */
/* Table of per-event handlers, indexed by the event's LOGGER_* id; the
 * logger thread looks an entry up as default_entries[e->event] when it
 * renders a record.
 *
 * Initializers are positional. Judging by how the entries are used in this
 * file, the columns appear to be, in order: the buffer size requested for
 * the entry (reqlen), the LOG_* flag class, the worker-side log callback,
 * the logger-thread parse callback, and a printf format (consumed by
 * _logger_log_text; NULL for the binary record types).
 * NOTE(review): column order is inferred from usage -- confirm against the
 * entry_details declaration.
 */
static const entry_details default_entries[] = {
    [LOGGER_ASCII_CMD] = {512, LOG_RAWCMDS, _logger_log_text, _logger_parse_text, "<%d %s"},
    [LOGGER_EVICTION] = {512, LOG_EVICTIONS, _logger_log_evictions, _logger_parse_ee, NULL},
    [LOGGER_ITEM_GET] = {512, LOG_FETCHERS, _logger_log_item_get, _logger_parse_ige, NULL},
    [LOGGER_ITEM_STORE] = {512, LOG_MUTATIONS, _logger_log_item_store, _logger_parse_ise, NULL},
    [LOGGER_CRAWLER_STATUS] = {512, LOG_SYSEVENTS, _logger_log_text, _logger_parse_text,
        "type=lru_crawler crawler=%d lru=%s low_mark=%llu next_reclaims=%llu since_run=%u next_run=%d elapsed=%u examined=%llu reclaimed=%llu"
    },
    [LOGGER_SLAB_MOVE] = {512, LOG_SYSEVENTS, _logger_log_text, _logger_parse_text,
        "type=slab_move src=%d dst=%d"
    },
    /* New and close events share one log callback but render differently. */
    [LOGGER_CONNECTION_NEW] = {512, LOG_CONNEVENTS, _logger_log_conn_event, _logger_parse_cne, NULL},
    [LOGGER_CONNECTION_CLOSE] = {512, LOG_CONNEVENTS, _logger_log_conn_event, _logger_parse_cce, NULL},
    [LOGGER_CONNECTION_ERROR] = {512, LOG_CONNEVENTS, _logger_log_text, _logger_parse_text,
        "type=connerr fd=%d msg=%s"
    },
    [LOGGER_CONNECTION_TLSERROR] = {512, LOG_CONNEVENTS, _logger_log_text, _logger_parse_text,
        "type=conntlserr fd=%d msg=%s"
    },
    [LOGGER_DELETIONS] = {512, LOG_DELETIONS, _logger_log_item_deleted, _logger_parse_ide, NULL},
#ifdef EXTSTORE
    [LOGGER_EXTSTORE_WRITE] = {512, LOG_EVICTIONS, _logger_log_ext_write, _logger_parse_extw, NULL},
    [LOGGER_COMPACT_START] = {512, LOG_SYSEVENTS, _logger_log_text, _logger_parse_text,
        "type=compact_start id=%lu version=%llu"
    },
    [LOGGER_COMPACT_ABORT] = {512, LOG_SYSEVENTS, _logger_log_text, _logger_parse_text,
        "type=compact_abort id=%lu"
    },
    [LOGGER_COMPACT_READ_START] = {512, LOG_SYSEVENTS, _logger_log_text, _logger_parse_text,
        "type=compact_read_start id=%lu offset=%llu"
    },
    [LOGGER_COMPACT_READ_END] = {512, LOG_SYSEVENTS, _logger_log_text, _logger_parse_text,
        "type=compact_read_end id=%lu offset=%llu rescues=%lu lost=%lu skipped=%lu"
    },
    [LOGGER_COMPACT_END] = {512, LOG_SYSEVENTS, _logger_log_text, _logger_parse_text,
        "type=compact_end id=%lu"
    },
    [LOGGER_COMPACT_FRAGINFO] = {512, LOG_SYSEVENTS, _logger_log_text, _logger_parse_text,
        "type=compact_fraginfo ratio=%.2f bytes=%lu"
    },
#endif
#ifdef PROXY
    [LOGGER_PROXY_CONFIG] = {512, LOG_PROXYEVENTS, _logger_log_text, _logger_parse_text,
        "type=proxy_conf status=%s"
    },
    /* The only entry that uses 1024 instead of 512 as its first value. */
    [LOGGER_PROXY_REQ] = {1024, LOG_PROXYREQS, _logger_log_proxy_req, _logger_parse_prx_req, NULL},
    [LOGGER_PROXY_ERROR] = {512, LOG_PROXYEVENTS, _logger_log_text, _logger_parse_text,
        "type=proxy_error msg=%s"
    },
    [LOGGER_PROXY_USER] = {512, LOG_PROXYUSER, _logger_log_text, _logger_parse_text,
        "type=proxy_user msg=%s"
    },
    [LOGGER_PROXY_BE_ERROR] = {512, LOG_PROXYEVENTS, _logger_log_proxy_errbe, _logger_parse_prx_errbe,
        NULL
    },

#endif
};
546 
547 /*************************
548  * Util functions shared between bg thread and workers
549  *************************/
550 
551 /* Logger GID's can be used by watchers to put logs back into strict order
552  */
553 static uint64_t logger_gid = 0;
logger_get_gid(void)554 uint64_t logger_get_gid(void) {
555 #ifdef HAVE_GCC_64ATOMICS
556     return __sync_add_and_fetch(&logger_gid, 1);
557 #elif defined(__sun)
558     return atomic_inc_64_nv(&logger_gid);
559 #else
560     mutex_lock(&logger_atomics_mutex);
561     uint64_t res = ++logger_gid;
562     mutex_unlock(&logger_atomics_mutex);
563     return res;
564 #endif
565 }
566 
logger_set_gid(uint64_t gid)567 void logger_set_gid(uint64_t gid) {
568 #ifdef HAVE_GCC_64ATOMICS
569     __sync_add_and_fetch(&logger_gid, gid);
570 #elif defined(__sun)
571     atomic_add_64(&logger_gid);
572 #else
573     mutex_lock(&logger_atomics_mutex);
574     logger_gid = gid;
575     mutex_unlock(&logger_atomics_mutex);
576 #endif
577 }
578 
579 /* TODO: genericize lists. would be nice to import queue.h if the impact is
580  * studied... otherwise can just write a local one.
581  */
582 /* Add to the list of threads with a logger object */
logger_link_q(logger * l)583 static void logger_link_q(logger *l) {
584     pthread_mutex_lock(&logger_stack_lock);
585     assert(l != logger_stack_head);
586 
587     l->prev = 0;
588     l->next = logger_stack_head;
589     if (l->next) l->next->prev = l;
590     logger_stack_head = l;
591     if (logger_stack_tail == 0) logger_stack_tail = l;
592     logger_count++;
593     pthread_mutex_unlock(&logger_stack_lock);
594     return;
595 }
596 
597 /* Remove from the list of threads with a logger object */
598 /*static void logger_unlink_q(logger *l) {
599     pthread_mutex_lock(&logger_stack_lock);
600     if (logger_stack_head == l) {
601         assert(l->prev == 0);
602         logger_stack_head = l->next;
603     }
604     if (logger_stack_tail == l) {
605         assert(l->next == 0);
606         logger_stack_tail = l->prev;
607     }
608     assert(l->next != l);
609     assert(l->prev != l);
610 
611     if (l->next) l->next->prev = l->prev;
612     if (l->prev) l->prev->next = l->next;
613     logger_count--;
614     pthread_mutex_unlock(&logger_stack_lock);
615     return;
616 }*/
617 
618 /* Called with logger stack locked.
619  * Iterates over every watcher collecting enabled flags.
620  */
logger_set_flags(void)621 static void logger_set_flags(void) {
622     logger *l = NULL;
623     int x = 0;
624     uint16_t f = 0; /* logger eflags */
625 
626     for (x = 0; x < WATCHER_LIMIT; x++) {
627         logger_watcher *w = watchers[x];
628         if (w == NULL)
629             continue;
630 
631         f |= w->eflags;
632     }
633     for (l = logger_stack_head; l != NULL; l=l->next) {
634         pthread_mutex_lock(&l->mutex);
635         l->eflags = f;
636         pthread_mutex_unlock(&l->mutex);
637     }
638     return;
639 }
640 
641 /* Completes rendering of log line. */
logger_thread_parse_entry(logentry * e,struct logger_stats * ls,char * scratch,int * scratch_len)642 static enum logger_parse_entry_ret logger_thread_parse_entry(logentry *e, struct logger_stats *ls,
643         char *scratch, int *scratch_len) {
644     int total = 0;
645     const entry_details *d = &default_entries[e->event];
646     assert(d->parse_cb != NULL);
647     total = d->parse_cb(e, scratch);
648 
649     if (total >= LOGGER_PARSE_SCRATCH || total <= 0) {
650         L_DEBUG("LOGGER: Failed to flatten log entry!\n");
651         return LOGGER_PARSE_ENTRY_FAILED;
652     } else {
653         *scratch_len = total;
654     }
655 
656     return LOGGER_PARSE_ENTRY_OK;
657 }
658 
/* Writes flattened entry to available watchers.
 *
 * e           - the source entry; its eflags and gid decide which watchers
 *               receive the line.
 * ls          - stats block; watcher_sent / watcher_skipped are updated.
 * scratch     - the rendered line.
 * scratch_len - length of the rendered line in bytes.
 *
 * A watcher is skipped when its flags do not intersect the entry's flags or
 * when the entry's gid is below the watcher's min_gid.
 */
static void logger_thread_write_entry(logentry *e, struct logger_stats *ls,
        char *scratch, int scratch_len) {
    int x, total;
    /* Write the line into available watchers with matching flags */
    for (x = 0; x < WATCHER_LIMIT; x++) {
        logger_watcher *w = watchers[x];
        char *skip_scr = NULL;
        if (w == NULL || (e->eflags & w->eflags) == 0 || (e->gid < w->min_gid))
            continue;

        /* Avoid poll()'ing constantly when buffer is full by resetting a
         * flag periodically.
         *
         * Space is requested for the line plus 128 extra bytes, which is
         * the room used below for a possible "skipped=" notice. While the
         * request fails, this watcher is polled; if that poll returns a
         * value <= 0 the watcher is marked failed_flush, which ends the
         * loop.
         */
        while (!w->failed_flush &&
                (skip_scr = (char *) bipbuf_request(w->buf, scratch_len + 128)) == NULL) {
            if (logger_thread_poll_watchers(0, x) <= 0) {
                L_DEBUG("LOGGER: Watcher had no free space for line of size (%d)\n", scratch_len + 128);
                w->failed_flush = true;
            }
        }

        /* No room: count the line as skipped for this watcher and move on. */
        if (w->failed_flush) {
            L_DEBUG("LOGGER: Fast skipped for watcher [%d] due to failed_flush\n", w->sfd);
            w->skipped++;
            ls->watcher_skipped++;
            continue;
        }

        /* Earlier lines were dropped: emit a "skipped=N" notice first, using
         * the space returned by bipbuf_request(), then reset the counter. */
        if (w->skipped > 0) {
            total = snprintf(skip_scr, 128, "skipped=%llu\n", (unsigned long long) w->skipped);
            if (total >= 128 || total <= 0) {
                L_DEBUG("LOGGER: Failed to flatten skipped message into watcher [%d]\n", w->sfd);
                w->skipped++;
                ls->watcher_skipped++;
                continue;
            }
            bipbuf_push(w->buf, total);
            w->skipped = 0;
        }
        /* Can't fail because bipbuf_request succeeded. */
        bipbuf_offer(w->buf, (unsigned char *) scratch, scratch_len);
        ls->watcher_sent++;
    }
}
704 
705 /* Called with logger stack locked.
706  * Releases every chunk associated with a watcher and closes the connection.
707  * We can't presently send a connection back to the worker for further
708  * processing.
709  */
logger_thread_close_watcher(logger_watcher * w)710 static void logger_thread_close_watcher(logger_watcher *w) {
711     L_DEBUG("LOGGER: Closing dead watcher\n");
712     watchers[w->id] = NULL;
713     sidethread_conn_close(w->c);
714     watcher_count--;
715     bipbuf_free(w->buf);
716     free(w);
717     logger_set_flags();
718 }
719 
720 /* Reads a particular worker thread's available bipbuf bytes. Parses each log
721  * entry into the watcher buffers.
722  */
logger_thread_read(logger * l,struct logger_stats * ls)723 static int logger_thread_read(logger *l, struct logger_stats *ls) {
724     unsigned int size;
725     unsigned int pos = 0;
726     unsigned char *data;
727     char scratch[LOGGER_PARSE_SCRATCH];
728     logentry *e;
729     pthread_mutex_lock(&l->mutex);
730     data = bipbuf_peek_all(l->buf, &size);
731     pthread_mutex_unlock(&l->mutex);
732 
733     if (data == NULL) {
734         return 0;
735     }
736     L_DEBUG("LOGGER: Got %d bytes from bipbuffer\n", size);
737 
738     /* parse buffer */
739     while (pos < size && watcher_count > 0) {
740         enum logger_parse_entry_ret ret;
741         int scratch_len = 0;
742         e = (logentry *) (data + pos);
743         ret = logger_thread_parse_entry(e, ls, scratch, &scratch_len);
744         if (ret != LOGGER_PARSE_ENTRY_OK) {
745             /* TODO: stats counter */
746             fprintf(stderr, "LOGGER: Failed to parse log entry\n");
747         } else {
748             logger_thread_write_entry(e, ls, scratch, scratch_len);
749         }
750         pos += sizeof(logentry) + e->size + e->pad;
751     }
752     assert(pos <= size);
753 
754     pthread_mutex_lock(&l->mutex);
755     data = bipbuf_poll(l->buf, size);
756     ls->worker_written += l->written;
757     ls->worker_dropped += l->dropped;
758     l->written = 0;
759     l->dropped = 0;
760     pthread_mutex_unlock(&l->mutex);
761     if (data == NULL) {
762         fprintf(stderr, "LOGGER: unexpectedly couldn't advance buf pointer\n");
763         assert(0);
764     }
765     return size; /* maybe the count of objects iterated? */
766 }
767 
768 /* Since the event loop code isn't reusable without a refactor, and we have a
769  * limited number of potential watchers, we run our own poll loop.
770  * This calls poll() unnecessarily during write flushes, should be possible to
771  * micro-optimize later.
772  *
773  * This flushes buffers attached to watchers, iterating through the bytes set
774  * to each worker. Also checks for readability in case client connection was
775  * closed.
776  *
777  * Allows a specific watcher to be flushed (if buf full)
778  */
logger_thread_poll_watchers(int force_poll,int watcher)779 static int logger_thread_poll_watchers(int force_poll, int watcher) {
780     int x;
781     int nfd = 0;
782     unsigned char *data;
783     unsigned int data_size = 0;
784     int flushed = 0;
785 
786     for (x = 0; x < WATCHER_LIMIT; x++) {
787         logger_watcher *w = watchers[x];
788         if (w == NULL || (watcher != WATCHER_ALL && x != watcher))
789             continue;
790 
791         data = bipbuf_peek_all(w->buf, &data_size);
792         if (data != NULL) {
793             watchers_pollfds[nfd].fd = w->sfd;
794             watchers_pollfds[nfd].events = POLLOUT;
795             nfd++;
796         } else if (force_poll) {
797             watchers_pollfds[nfd].fd = w->sfd;
798             watchers_pollfds[nfd].events = POLLIN;
799             nfd++;
800         }
801         /* This gets set after a call to poll, and should be used to gate on
802          * calling poll again.
803          */
804         w->failed_flush = false;
805     }
806 
807     if (nfd == 0)
808         return 0;
809 
810     //L_DEBUG("LOGGER: calling poll() [data_size: %d]\n", data_size);
811     int ret = poll(watchers_pollfds, nfd, 0);
812 
813     if (ret < 0) {
814         perror("something failed with logger thread watcher fd polling");
815         return -1;
816     }
817 
818     nfd = 0;
819     for (x = 0; x < WATCHER_LIMIT; x++) {
820         logger_watcher *w = watchers[x];
821         if (w == NULL || (watcher != WATCHER_ALL && x != watcher))
822             continue;
823 
824         data_size = 0;
825         /* Early detection of a disconnect. Otherwise we have to wait until
826          * the next write
827          */
828         if (watchers_pollfds[nfd].revents & POLLIN) {
829             char buf[1];
830             int res = ((conn*)w->c)->read(w->c, buf, 1);
831             if (res == 0 || (res == -1 && (errno != EAGAIN && errno != EWOULDBLOCK))) {
832                 L_DEBUG("LOGGER: watcher closed remotely\n");
833                 logger_thread_close_watcher(w);
834                 nfd++;
835                 continue;
836             }
837         }
838         if ((data = bipbuf_peek_all(w->buf, &data_size)) != NULL) {
839             if (watchers_pollfds[nfd].revents & (POLLHUP|POLLERR)) {
840                 L_DEBUG("LOGGER: watcher closed during poll() call\n");
841                 logger_thread_close_watcher(w);
842             } else if (watchers_pollfds[nfd].revents & POLLOUT) {
843                 int total = 0;
844 
845                 /* We can write a bit. */
846                 switch (w->t) {
847                     case LOGGER_WATCHER_STDERR:
848                         total = fwrite(data, 1, data_size, stderr);
849                         break;
850                     case LOGGER_WATCHER_CLIENT:
851                         total = ((conn*)w->c)->write(w->c, data, data_size);
852                         break;
853                 }
854 
855                 L_DEBUG("LOGGER: poll() wrote %d to %d (data_size: %d) (bipbuf_used: %d)\n", total, w->sfd,
856                         data_size, bipbuf_used(w->buf));
857                 if (total == -1) {
858                     if (errno != EAGAIN && errno != EWOULDBLOCK) {
859                         logger_thread_close_watcher(w);
860                     }
861                     L_DEBUG("LOGGER: watcher hit EAGAIN\n");
862                 } else if (total == 0) {
863                     logger_thread_close_watcher(w);
864                 } else {
865                     bipbuf_poll(w->buf, total);
866                     flushed += total;
867                 }
868             }
869         }
870         nfd++;
871     }
872     return flushed;
873 }
874 
logger_thread_flush_stats(struct logger_stats * ls)875 static void logger_thread_flush_stats(struct logger_stats *ls) {
876     STATS_LOCK();
877     stats.log_worker_dropped  += ls->worker_dropped;
878     stats.log_worker_written  += ls->worker_written;
879     stats.log_watcher_skipped += ls->watcher_skipped;
880     stats.log_watcher_sent    += ls->watcher_sent;
881     stats_state.log_watchers   = ls->watcher_count;
882     STATS_UNLOCK();
883 }
884 
885 #define MAX_LOGGER_SLEEP 1000000
886 #define MIN_LOGGER_SLEEP 1000
887 
888 /* Primary logger thread routine */
logger_thread(void * arg)889 static void *logger_thread(void *arg) {
890     useconds_t to_sleep = MIN_LOGGER_SLEEP;
891     L_DEBUG("LOGGER: Starting logger thread\n");
892     // TODO: If we ever have item references in the logger code, will need to
893     // ensure everything is dequeued before stopping the thread.
894     while (do_run_logger_thread) {
895         int found_logs = 0;
896         logger *l;
897         struct logger_stats ls;
898         memset(&ls, 0, sizeof(struct logger_stats));
899 
900         /* only sleep if we're *above* the minimum */
901         if (to_sleep > MIN_LOGGER_SLEEP)
902             usleep(to_sleep);
903 
904         /* Call function to iterate each logger. */
905         pthread_mutex_lock(&logger_stack_lock);
906         if (watcher_count == 0) {
907             // Not bothering to loop on the condition here since it's fine to
908             // walk through with zero watchers.
909             pthread_cond_wait(&logger_stack_cond, &logger_stack_lock);
910         }
911         for (l = logger_stack_head; l != NULL; l=l->next) {
912             /* lock logger, call function to manipulate it */
913             found_logs += logger_thread_read(l, &ls);
914         }
915 
916         logger_thread_poll_watchers(1, WATCHER_ALL);
917 
918         /* capture the current count within mutual exclusion of the lock */
919         ls.watcher_count = watcher_count;
920 
921         pthread_mutex_unlock(&logger_stack_lock);
922 
923         /* TODO: abstract into a function and share with lru_crawler */
924         if (!found_logs) {
925             if (to_sleep < MAX_LOGGER_SLEEP)
926                 to_sleep += to_sleep / 8;
927             if (to_sleep > MAX_LOGGER_SLEEP)
928                 to_sleep = MAX_LOGGER_SLEEP;
929         } else {
930             to_sleep /= 2;
931             if (to_sleep < MIN_LOGGER_SLEEP)
932                 to_sleep = MIN_LOGGER_SLEEP;
933         }
934         logger_thread_flush_stats(&ls);
935     }
936 
937     return NULL;
938 }
939 
start_logger_thread(void)940 static int start_logger_thread(void) {
941     int ret;
942     do_run_logger_thread = 1;
943     if ((ret = pthread_create(&logger_tid, NULL,
944                               logger_thread, NULL)) != 0) {
945         fprintf(stderr, "Can't start logger thread: %s\n", strerror(ret));
946         return -1;
947     }
948     thread_setname(logger_tid, "mc-log");
949     return 0;
950 }
951 
stop_logger_thread(void)952 static int stop_logger_thread(void) {
953     // Guarantees that the logger thread is waiting on 'logger_stack_cond'
954     // before we signal it.
955     pthread_mutex_lock(&logger_stack_lock);
956     do_run_logger_thread = 0;
957     pthread_cond_signal(&logger_stack_cond);
958     pthread_mutex_unlock(&logger_stack_lock);
959     pthread_join(logger_tid, NULL);
960     return 0;
961 }
962 
963 /*************************
964  * Public functions for submitting logs and starting loggers from workers.
965  *************************/
966 
967 /* Global logger thread start/init */
logger_init(void)968 void logger_init(void) {
969     /* TODO: auto destructor when threads exit */
970     /* TODO: error handling */
971 
972     /* init stack for iterating loggers */
973     logger_stack_head = 0;
974     logger_stack_tail = 0;
975     pthread_key_create(&logger_key, NULL);
976 
977     if (start_logger_thread() != 0) {
978         abort();
979     }
980 
981     /* This is what adding a STDERR watcher looks like. should replace old
982      * "verbose" settings. */
983     //logger_add_watcher(NULL, 0);
984     return;
985 }
986 
/* Global logger shutdown: stops the logger thread and waits for it to exit.
 * The helper's return value is intentionally ignored.
 */
void logger_stop(void) {
    (void) stop_logger_thread();
}
990 
991 /* called *from* the thread using a logger.
992  * initializes the per-thread bipbuf, links it into the list of loggers
993  */
logger_create(void)994 logger *logger_create(void) {
995     L_DEBUG("LOGGER: Creating and linking new logger instance\n");
996     logger *l = calloc(1, sizeof(logger));
997     if (l == NULL) {
998         return NULL;
999     }
1000 
1001     l->buf = bipbuf_new(settings.logger_buf_size);
1002     if (l->buf == NULL) {
1003         free(l);
1004         return NULL;
1005     }
1006 
1007     l->entry_map = default_entries;
1008 
1009     pthread_mutex_init(&l->mutex, NULL);
1010     pthread_setspecific(logger_key, l);
1011 
1012     /* add to list of loggers */
1013     logger_link_q(l);
1014     return l;
1015 }
1016 
1017 /* Public function for logging an entry.
1018  * Tries to encapsulate as much of the formatting as possible to simplify the
1019  * caller's code.
1020  */
logger_log(logger * l,const enum log_entry_type event,const void * entry,...)1021 enum logger_ret_type logger_log(logger *l, const enum log_entry_type event, const void *entry, ...) {
1022     bipbuf_t *buf = l->buf;
1023     bool nospace = false;
1024     va_list ap;
1025     logentry *e;
1026 
1027     const entry_details *d = &l->entry_map[event];
1028     int reqlen = d->reqlen;
1029 
1030     pthread_mutex_lock(&l->mutex);
1031     /* Request a maximum length of data to write to */
1032     e = (logentry *) bipbuf_request(buf, (sizeof(logentry) + reqlen));
1033     if (e == NULL) {
1034         l->dropped++;
1035         pthread_mutex_unlock(&l->mutex);
1036         return LOGGER_RET_NOSPACE;
1037     }
1038     e->event = event;
1039     e->pad = 0;
1040     e->gid = logger_get_gid();
1041     /* TODO: Could pass this down as an argument now that we're using
1042      * LOGGER_LOG() macro.
1043      */
1044     e->eflags = d->eflags;
1045     /* Noting time isn't optional. A feature may be added to avoid rendering
1046      * time and/or gid to a logger.
1047      */
1048     gettimeofday(&e->tv, NULL);
1049 
1050     va_start(ap, entry);
1051     d->log_cb(e, d, entry, ap);
1052     va_end(ap);
1053 
1054 #ifdef NEED_ALIGN
1055     /* Need to ensure *next* request is aligned. */
1056     if (sizeof(logentry) + e->size % 8 != 0) {
1057         e->pad = 8 - (sizeof(logentry) + e->size % 8);
1058     }
1059 #endif
1060 
1061     /* Push pointer forward by the actual amount required */
1062     if (bipbuf_push(buf, (sizeof(logentry) + e->size + e->pad)) == 0) {
1063         fprintf(stderr, "LOGGER: Failed to bipbuf push a text entry\n");
1064         pthread_mutex_unlock(&l->mutex);
1065         return LOGGER_RET_ERR;
1066     }
1067     l->written++;
1068     L_DEBUG("LOGGER: Requested %d bytes, wrote %lu bytes\n", reqlen,
1069             (sizeof(logentry) + e->size));
1070 
1071     pthread_mutex_unlock(&l->mutex);
1072 
1073     if (nospace) {
1074         return LOGGER_RET_NOSPACE;
1075     } else {
1076         return LOGGER_RET_OK;
1077     }
1078 }
1079 
1080 /* Passes a client connection socket from a primary worker thread to the
1081  * logger thread. Caller *must* event_del() the client before handing it over.
1082  * Presently there's no way to hand the client back to the worker thread.
1083  */
logger_add_watcher(void * c,const int sfd,uint16_t f)1084 enum logger_add_watcher_ret logger_add_watcher(void *c, const int sfd, uint16_t f) {
1085     int x;
1086     logger_watcher *w = NULL;
1087     pthread_mutex_lock(&logger_stack_lock);
1088     if (watcher_count >= WATCHER_LIMIT) {
1089         pthread_mutex_unlock(&logger_stack_lock);
1090         return LOGGER_ADD_WATCHER_TOO_MANY;
1091     }
1092 
1093     for (x = 0; x < WATCHER_LIMIT-1; x++) {
1094         if (watchers[x] == NULL)
1095             break;
1096     }
1097 
1098     w = calloc(1, sizeof(logger_watcher));
1099     if (w == NULL) {
1100         pthread_mutex_unlock(&logger_stack_lock);
1101         return LOGGER_ADD_WATCHER_FAILED;
1102     }
1103     w->c = c;
1104     w->sfd = sfd;
1105     if (sfd == 0 && c == NULL) {
1106         w->t = LOGGER_WATCHER_STDERR;
1107     } else {
1108         w->t = LOGGER_WATCHER_CLIENT;
1109     }
1110     w->id = x;
1111     w->eflags = f;
1112     w->min_gid = logger_get_gid();
1113     w->buf = bipbuf_new(settings.logger_watcher_buf_size);
1114     if (w->buf == NULL) {
1115         free(w);
1116         pthread_mutex_unlock(&logger_stack_lock);
1117         return LOGGER_ADD_WATCHER_FAILED;
1118     }
1119     bipbuf_offer(w->buf, (unsigned char *) "OK\r\n", 4);
1120 
1121     watchers[x] = w;
1122     watcher_count++;
1123     /* Update what flags the global logs will watch */
1124     logger_set_flags();
1125     pthread_cond_signal(&logger_stack_cond);
1126 
1127     pthread_mutex_unlock(&logger_stack_lock);
1128     return LOGGER_ADD_WATCHER_OK;
1129 }
1130