1 /* -*- Mode: C; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2
3 #include <arpa/inet.h>
4 #include <stdlib.h>
5 #include <stdio.h>
6 #include <string.h>
7 #include <errno.h>
8 #include <poll.h>
9 #include <ctype.h>
10 #include <stdarg.h>
11
12 #if defined(__sun)
13 #include <atomic.h>
14 #endif
15
16 #include "memcached.h"
17 #include "bipbuffer.h"
18
/* Debug tracing for the logger itself.
 * With LOGGER_DEBUG defined, L_DEBUG(...) forwards its printf-style arguments
 * to stderr. Otherwise it expands to nothing, so its arguments are not
 * evaluated at all. */
#ifdef LOGGER_DEBUG
#define L_DEBUG(...) \
    do { \
        fprintf(stderr, __VA_ARGS__); \
    } while (0)
#else
#define L_DEBUG(...)
#endif
27
28
29 /* TODO: put this in a struct and ditch the global vars. */
30 static logger *logger_stack_head = NULL;
31 static logger *logger_stack_tail = NULL;
32 static unsigned int logger_count = 0;
33 static volatile int do_run_logger_thread = 1;
34 static pthread_t logger_tid;
35 pthread_mutex_t logger_stack_lock = PTHREAD_MUTEX_INITIALIZER;
36 pthread_cond_t logger_stack_cond = PTHREAD_COND_INITIALIZER;
37
38 pthread_key_t logger_key;
39
40 #if !defined(HAVE_GCC_64ATOMICS) && !defined(__sun)
41 pthread_mutex_t logger_atomics_mutex = PTHREAD_MUTEX_INITIALIZER;
42 #endif
43
44 #define WATCHER_LIMIT 20
45 logger_watcher *watchers[20];
46 struct pollfd watchers_pollfds[20];
47 int watcher_count = 0;
48
49 #define WATCHER_ALL -1
50 static int logger_thread_poll_watchers(int force_poll, int watcher);
51
52 /* helpers for logger_log */
53
/* Renders a printf-style text entry into e->data.
 *
 * e:     entry being filled; e->data receives the formatted, NUL-terminated
 *        text and e->size the number of bytes stored (including the NUL).
 * d:     entry description; d->format is the format string and d->reqlen the
 *        number of bytes available in e->data.
 * entry: unused for text entries.
 * ap:    arguments matching d->format.
 */
static void _logger_log_text(logentry *e, const entry_details *d, const void *entry, va_list ap) {
    int reqlen = d->reqlen;
    int total = vsnprintf((char *) e->data, reqlen, d->format, ap);
    if (total <= 0) {
        fprintf(stderr, "LOGGER: Failed to vsnprintf a text entry: (total) %d\n", total);
    }
    if (total < 0) {
        /* Encoding error: record an empty string rather than a bogus size. */
        if (reqlen > 0) {
            ((char *) e->data)[0] = '\0';
        }
        total = 0;
    } else if (total >= reqlen) {
        /* Truncated: vsnprintf reports the length it *would* have written,
         * but only reqlen - 1 characters plus the NUL were stored. */
        total = (reqlen > 0) ? reqlen - 1 : 0;
    }
    e->size = total + 1; // null byte
}
62
/* Fills an eviction record from the item passed as `entry`.
 * The key bytes are copied after the fixed header, and e->size covers the
 * header plus the key. `d` and `ap` are not used. */
static void _logger_log_evictions(logentry *e, const entry_details *d, const void *entry, va_list ap) {
    struct logentry_eviction *rec = (struct logentry_eviction *) e->data;
    item *victim = (item *)entry;

    /* Remaining lifetime relative to now; -1 when the item has no expiry. */
    if (victim->exptime > 0) {
        rec->exptime = (long long int)(victim->exptime - current_time);
    } else {
        rec->exptime = (long long int) -1;
    }
    rec->latime = current_time - victim->time;

    rec->clsid = ITEM_clsid(victim);
    rec->it_flags = victim->it_flags;
    rec->nbytes = victim->nbytes;
    rec->nkey = victim->nkey;

    memcpy(rec->key, ITEM_key(victim), victim->nkey);
    e->size = sizeof(struct logentry_eviction) + rec->nkey;
}
76 #ifdef EXTSTORE
/* Fills an extstore-write record from the item passed as `entry`.
 * The only variadic argument is the bucket number (an int, stored as
 * uint8_t). The key is copied after the header; e->size is header + key. */
static void _logger_log_ext_write(logentry *e, const entry_details *d, const void *entry, va_list ap) {
    const int bucket_arg = va_arg(ap, int);
    item *src = (item *)entry;
    struct logentry_ext_write *rec = (struct logentry_ext_write *) e->data;

    /* Remaining lifetime relative to now; -1 when the item has no expiry. */
    if (src->exptime > 0) {
        rec->exptime = (long long int)(src->exptime - current_time);
    } else {
        rec->exptime = (long long int) -1;
    }
    rec->latime = current_time - src->time;

    rec->bucket = (uint8_t)bucket_arg;
    rec->clsid = ITEM_clsid(src);
    rec->it_flags = src->it_flags;
    rec->nkey = src->nkey;

    memcpy(rec->key, ITEM_key(src), src->nkey);
    e->size = sizeof(struct logentry_ext_write) + rec->nkey;
}
91 #endif
92 // 0 == nf, 1 == found. 2 == flushed. 3 == expired.
93 // might be useful to store/print the flags an item has?
94 // could also collapse this and above code into an "item status" struct. wait
95 // for more endpoints to be written before making it generic, though.
_logger_log_item_get(logentry * e,const entry_details * d,const void * entry,va_list ap)96 static void _logger_log_item_get(logentry *e, const entry_details *d, const void *entry, va_list ap) {
97 int was_found = va_arg(ap, int);
98 char *key = va_arg(ap, char *);
99 int nkey = va_arg(ap, int);
100 int nbytes = va_arg(ap, int);
101 uint8_t clsid = va_arg(ap, int);
102 int sfd = va_arg(ap, int);
103
104 struct logentry_item_get *le = (struct logentry_item_get *) e->data;
105 le->was_found = was_found;
106 le->nkey = nkey;
107 le->nbytes = nbytes;
108 le->clsid = clsid;
109 memcpy(le->key, key, nkey);
110 le->sfd = sfd;
111 e->size = sizeof(struct logentry_item_get) + nkey;
112 }
113
/* Fills an item-store record.
 * Variadic arguments, in order: status (enum store_item_type), cmd (int),
 * key (char *), nkey (int), nbytes (int), ttl (rel_time_t), clsid (int,
 * stored as uint8_t), sfd (int). `d` and `entry` are not used. */
static void _logger_log_item_store(logentry *e, const entry_details *d, const void *entry, va_list ap) {
    /* The pull order below must match the order the caller passed them in. */
    enum store_item_type result = va_arg(ap, enum store_item_type);
    const int command = va_arg(ap, int);
    char *keyptr = va_arg(ap, char *);
    const int keylen = va_arg(ap, int);
    const int vlen = va_arg(ap, int);
    rel_time_t expires = va_arg(ap, rel_time_t);
    const uint8_t slab_id = va_arg(ap, int);
    const int client_fd = va_arg(ap, int);

    struct logentry_item_store *rec = (struct logentry_item_store *) e->data;

    rec->status = result;
    rec->cmd = command;
    rec->nkey = keylen;
    rec->nbytes = vlen;
    rec->clsid = slab_id;
    /* A ttl of 0 is stored as-is; anything else is made relative to now. */
    rec->ttl = 0;
    if (expires != 0) {
        rec->ttl = expires - current_time;
    }
    rec->sfd = client_fd;
    memcpy(rec->key, keyptr, keylen);

    e->size = sizeof(struct logentry_item_store) + keylen;
}
139
/* Fills a deletion record from the item passed as `entry`.
 * The only variadic argument is the command code (int). The key is copied
 * after the header; e->size is header + key. `d` is not used. */
static void _logger_log_item_deleted(logentry *e, const entry_details *d, const void *entry, va_list ap) {
    const int command = va_arg(ap, int);
    item *gone = (item *)entry;
    struct logentry_deletion *rec = (struct logentry_deletion *) e->data;

    rec->cmd = command;
    rec->clsid = ITEM_clsid(gone);
    rec->nbytes = gone->nbytes;
    rec->nkey = gone->nkey;
    memcpy(rec->key, ITEM_key(gone), gone->nkey);

    e->size = sizeof(struct logentry_deletion) + rec->nkey;
}
151
/* Fills a connection-event record (used for both new and closed
 * connections).
 * Variadic arguments, in order: addr (struct sockaddr_in6 *), addrlen
 * (socklen_t), transport (enum network_transport), reason (enum
 * close_reasons), sfd (int). `d` and `entry` are not used.
 *
 * The address copy is capped at sizeof(le->addr) so an oversized addrlen can
 * never write past the record, and the destination is zeroed first so a
 * short or missing address leaves no stale bytes behind. */
static void _logger_log_conn_event(logentry *e, const entry_details *d, const void *entry, va_list ap) {
    struct sockaddr_in6 *addr = va_arg(ap, struct sockaddr_in6 *);
    socklen_t addrlen = va_arg(ap, socklen_t);
    enum network_transport transport = va_arg(ap, enum network_transport);
    enum close_reasons reason = va_arg(ap, enum close_reasons);
    int sfd = va_arg(ap, int);

    struct logentry_conn_event *le = (struct logentry_conn_event *) e->data;

    size_t copylen = (size_t) addrlen;
    if (copylen > sizeof(le->addr)) {
        copylen = sizeof(le->addr);
    }
    memset(&le->addr, 0, sizeof(le->addr));
    if (addr != NULL && copylen > 0) {
        memcpy(&le->addr, addr, copylen);
    }
    le->sfd = sfd;
    le->transport = transport;
    le->reason = reason;
    e->size = sizeof(struct logentry_conn_event);
}
167
168 /*************************
169 * Util functions used by the logger background thread
170 *************************/
171
/* Converts a socket address into a printable remote IP and host-order port.
 *
 * addr:   address to describe; only sin6_family is trusted to pick the
 *         actual layout (sockaddr_in for AF_INET, sockaddr_in6 for AF_INET6).
 * rip:    output buffer, always left NUL-terminated when riplen > 0. It is
 *         the empty string for unknown families or when conversion fails.
 * riplen: size of rip in bytes.
 * rport:  receives the port in host byte order, or 0 when not applicable.
 *
 * Always returns 0.
 */
static int _logger_util_addr_endpoint(struct sockaddr_in6 *addr, char *rip,
        size_t riplen, unsigned short *rport) {
    *rport = 0;
    if (riplen == 0) {
        /* Nothing can be written; also avoids riplen - 1 wrapping around. */
        return 0;
    }
    memset(rip, 0, riplen);

    switch (addr->sin6_family) {
        case AF_INET:
            if (inet_ntop(AF_INET, &((struct sockaddr_in *) addr)->sin_addr,
                        rip, (socklen_t)(riplen - 1)) == NULL) {
                rip[0] = '\0';
            }
            *rport = ntohs(((struct sockaddr_in *) addr)->sin_port);
            break;
        case AF_INET6:
            if (inet_ntop(AF_INET6, &((struct sockaddr_in6 *) addr)->sin6_addr,
                        rip, (socklen_t)(riplen - 1)) == NULL) {
                rip[0] = '\0';
            }
            *rport = ntohs(((struct sockaddr_in6 *) addr)->sin6_port);
            break;
#ifndef DISABLE_UNIX_SOCKET
        // Connections on Unix socket transports have c->request_addr zeroed out.
        case AF_UNSPEC:
        case AF_UNIX:
            /* Bounded by the caller's buffer, not by the literal's length. */
            snprintf(rip, riplen, "%s", "unix");
            break;
#endif // #ifndef DISABLE_UNIX_SOCKET
        default:
            /* Unknown family: leave rip empty and rport 0. */
            break;
    }

    return 0;
}
199
200 /*************************
201 * Logger background thread functions. Aggregates per-worker buffers and
202 * writes to any watchers.
203 *************************/
204
205 #define LOGGER_PARSE_SCRATCH 4096
206
_logger_parse_text(logentry * e,char * scratch)207 static int _logger_parse_text(logentry *e, char *scratch) {
208 return snprintf(scratch, LOGGER_PARSE_SCRATCH, "ts=%lld.%d gid=%llu %s\n",
209 (long long int)e->tv.tv_sec, (int)e->tv.tv_usec,
210 (unsigned long long) e->gid, (char *) e->data);
211 }
212
/* Renders an item-store entry as a key=value log line into scratch.
 * The key is URI-encoded first. Status and command codes are translated
 * through lookup tables; a code outside its table renders as "na" instead
 * of indexing past the array.
 * Returns snprintf's result (the length the full line needs). */
static int _logger_parse_ise(logentry *e, char *scratch) {
    int total;
    const char *cmd = "na";
    const char *status = "na";
    char keybuf[KEY_MAX_URI_ENCODED_LENGTH];
    struct logentry_item_store *le = (struct logentry_item_store *) e->data;
    static const char * const status_map[] = {
        "not_stored", "stored", "exists", "not_found", "too_large", "no_memory" };
    static const char * const cmd_map[] = {
        "null", "add", "set", "replace", "append", "prepend", "cas", "append", "prepend" };

    /* Bounds come from the tables themselves so they cannot drift apart. */
    if ((size_t)le->cmd < sizeof(cmd_map) / sizeof(cmd_map[0]))
        cmd = cmd_map[le->cmd];
    if ((size_t)le->status < sizeof(status_map) / sizeof(status_map[0]))
        status = status_map[le->status];

    uriencode(le->key, keybuf, le->nkey, KEY_MAX_URI_ENCODED_LENGTH);
    total = snprintf(scratch, LOGGER_PARSE_SCRATCH,
            "ts=%lld.%d gid=%llu type=item_store key=%s status=%s cmd=%s ttl=%u clsid=%u cfd=%d size=%d\n",
            (long long int)e->tv.tv_sec, (int)e->tv.tv_usec, (unsigned long long) e->gid,
            keybuf, status, cmd, le->ttl, le->clsid, le->sfd,
            le->nbytes > 0 ? le->nbytes - 2 : 0); // CRLF
    return total;
}
234
/* Renders an item-get entry as a key=value log line into scratch.
 * The key is URI-encoded first. The was_found code is translated through a
 * lookup table; a code outside the table renders as "na" instead of
 * indexing past the array.
 * Returns snprintf's result (the length the full line needs). */
static int _logger_parse_ige(logentry *e, char *scratch) {
    int total;
    struct logentry_item_get *le = (struct logentry_item_get *) e->data;
    char keybuf[KEY_MAX_URI_ENCODED_LENGTH];
    const char *status = "na";
    static const char * const was_found_map[] = {
        "not_found", "found", "flushed", "expired" };

    if ((size_t)le->was_found < sizeof(was_found_map) / sizeof(was_found_map[0]))
        status = was_found_map[le->was_found];

    uriencode(le->key, keybuf, le->nkey, KEY_MAX_URI_ENCODED_LENGTH);
    total = snprintf(scratch, LOGGER_PARSE_SCRATCH,
            "ts=%lld.%d gid=%llu type=item_get key=%s status=%s clsid=%u cfd=%d size=%d\n",
            (long long int)e->tv.tv_sec, (int)e->tv.tv_usec, (unsigned long long) e->gid,
            keybuf, status, le->clsid, le->sfd,
            le->nbytes > 0 ? le->nbytes - 2 : 0); // CRLF
    return total;
}
250
/* Renders an eviction entry as a key=value log line into scratch.
 * The key is URI-encoded; "fetch" reports whether ITEM_FETCHED was set on
 * the evicted item.
 * Returns snprintf's result (the length the full line needs). */
static int _logger_parse_ee(logentry *e, char *scratch) {
    struct logentry_eviction *rec = (struct logentry_eviction *) e->data;
    char encoded_key[KEY_MAX_URI_ENCODED_LENGTH];
    const char *fetched = "no";

    uriencode(rec->key, encoded_key, rec->nkey, KEY_MAX_URI_ENCODED_LENGTH);
    if (rec->it_flags & ITEM_FETCHED) {
        fetched = "yes";
    }

    return snprintf(scratch, LOGGER_PARSE_SCRATCH,
            "ts=%lld.%d gid=%llu type=eviction key=%s fetch=%s ttl=%lld la=%d clsid=%u size=%d\n",
            (long long int)e->tv.tv_sec, (int)e->tv.tv_usec, (unsigned long long) e->gid,
            encoded_key, fetched,
            (long long int)rec->exptime, rec->latime, rec->clsid,
            rec->nbytes > 0 ? rec->nbytes - 2 : 0); // CRLF
}
265
/* Renders a deletion entry as a key=value log line into scratch.
 * The key is URI-encoded. The command code is translated through a lookup
 * table and renders as "na" when it falls outside it.
 * The timestamp is printed as a long long, like every other renderer in
 * this file, so tv_sec is not truncated to int.
 * Returns snprintf's result (the length the full line needs). */
static int _logger_parse_ide(logentry *e, char *scratch) {
    int total;
    const char *cmd = "na";
    static const char * const cmd_map[] = {
        "null", "delete", "md" };
    char keybuf[KEY_MAX_URI_ENCODED_LENGTH];
    struct logentry_deletion *le = (struct logentry_deletion *) e->data;
    uriencode(le->key, keybuf, le->nkey, KEY_MAX_URI_ENCODED_LENGTH);

    if ((size_t)le->cmd < sizeof(cmd_map) / sizeof(cmd_map[0]))
        cmd = cmd_map[le->cmd];

    total = snprintf(scratch, LOGGER_PARSE_SCRATCH,
            "ts=%lld.%d gid=%llu type=deleted key=%s cmd=%s clsid=%u size=%d\n",
            (long long int)e->tv.tv_sec, (int)e->tv.tv_usec, (unsigned long long) e->gid,
            keybuf, cmd, le->clsid,
            le->nbytes > 0 ? le->nbytes - 2 : 0); // CRLF
    return total;
}
285
286 #ifdef EXTSTORE
/* Renders an extstore-write entry as a key=value log line into scratch.
 * The key is URI-encoded; "fetch" reports whether ITEM_FETCHED was set.
 * Returns snprintf's result (the length the full line needs). */
static int _logger_parse_extw(logentry *e, char *scratch) {
    struct logentry_ext_write *rec = (struct logentry_ext_write *) e->data;
    char encoded_key[KEY_MAX_URI_ENCODED_LENGTH];
    const char *fetched = "no";

    uriencode(rec->key, encoded_key, rec->nkey, KEY_MAX_URI_ENCODED_LENGTH);
    if (rec->it_flags & ITEM_FETCHED) {
        fetched = "yes";
    }

    return snprintf(scratch, LOGGER_PARSE_SCRATCH,
            "ts=%lld.%d gid=%llu type=extwrite key=%s fetch=%s ttl=%lld la=%d clsid=%u bucket=%u\n",
            (long long int)e->tv.tv_sec, (int)e->tv.tv_usec, (unsigned long long) e->gid,
            encoded_key, fetched,
            (long long int)rec->exptime, rec->latime, rec->clsid, rec->bucket);
}
300 #endif
301
/* Renders a new-connection entry as a key=value log line into scratch.
 * The peer address is converted to text with _logger_util_addr_endpoint().
 * The transport code is translated through a lookup table and renders as
 * "unknown" when it falls outside it, instead of indexing past the array.
 * Returns snprintf's result (the length the full line needs). */
static int _logger_parse_cne(logentry *e, char *scratch) {
    int total;
    unsigned short rport = 0;
    char rip[64];
    struct logentry_conn_event *le = (struct logentry_conn_event *) e->data;
    static const char * const transport_map[] = { "local", "tcp", "udp" };
    const char *transport = "unknown";

    if ((size_t)le->transport < sizeof(transport_map) / sizeof(transport_map[0]))
        transport = transport_map[le->transport];

    _logger_util_addr_endpoint(&le->addr, rip, sizeof(rip), &rport);

    total = snprintf(scratch, LOGGER_PARSE_SCRATCH,
            "ts=%lld.%d gid=%llu type=conn_new rip=%s rport=%hu transport=%s cfd=%d\n",
            (long long int) e->tv.tv_sec, (int) e->tv.tv_usec, (unsigned long long) e->gid,
            rip, rport, transport, le->sfd);

    return total;
}
318
/* Renders a connection-close entry as a key=value log line into scratch.
 * The peer address is converted to text with _logger_util_addr_endpoint().
 * Transport and close-reason codes are translated through lookup tables and
 * render as "unknown" when they fall outside them, instead of indexing past
 * the arrays.
 * Returns snprintf's result (the length the full line needs). */
static int _logger_parse_cce(logentry *e, char *scratch) {
    int total;
    unsigned short rport = 0;
    char rip[64];
    struct logentry_conn_event *le = (struct logentry_conn_event *) e->data;
    static const char * const transport_map[] = { "local", "tcp", "udp" };
    static const char * const reason_map[] = { "error", "normal", "idle_timeout", "shutdown" };
    const char *transport = "unknown";
    const char *reason = "unknown";

    if ((size_t)le->transport < sizeof(transport_map) / sizeof(transport_map[0]))
        transport = transport_map[le->transport];
    if ((size_t)le->reason < sizeof(reason_map) / sizeof(reason_map[0]))
        reason = reason_map[le->reason];

    _logger_util_addr_endpoint(&le->addr, rip, sizeof(rip), &rport);

    total = snprintf(scratch, LOGGER_PARSE_SCRATCH,
            "ts=%lld.%d gid=%llu type=conn_close rip=%s rport=%hu transport=%s reason=%s cfd=%d\n",
            (long long int) e->tv.tv_sec, (int) e->tv.tv_usec, (unsigned long long) e->gid,
            rip, rport, transport,
            reason, le->sfd);

    return total;
}
337
338 #ifdef PROXY
339 // TODO (v2): the length caps here are all magic numbers. Haven't thought of
340 // something yet that I like better.
341 // Should at least make a define to the max log len (1024) and do some math
342 // here.
_logger_log_proxy_req(logentry * e,const entry_details * d,const void * entry,va_list ap)343 static void _logger_log_proxy_req(logentry *e, const entry_details *d, const void *entry, va_list ap) {
344 char *req = va_arg(ap, char *);
345 int reqlen = va_arg(ap, uint32_t);
346 long elapsed = va_arg(ap, long);
347 unsigned short type = va_arg(ap, int);
348 unsigned short code = va_arg(ap, int);
349 int status = va_arg(ap, int);
350 int conn_fd = va_arg(ap, int);
351 char *detail = va_arg(ap, char *);
352 int dlen = va_arg(ap, int);
353 char *be_name = va_arg(ap, char *);
354 char *be_port = va_arg(ap, char *);
355
356 struct logentry_proxy_req *le = (void *)e->data;
357 le->type = type;
358 le->code = code;
359 le->status = status;
360 le->conn_fd = conn_fd;
361 le->dlen = dlen;
362 le->elapsed = elapsed;
363 if (be_name && be_port) {
364 le->be_namelen = strlen(be_name);
365 le->be_portlen = strlen(be_port);
366 } else {
367 le->be_namelen = 0;
368 le->be_portlen = 0;
369 }
370 char *data = le->data;
371 if (req[reqlen-2] == '\r') {
372 reqlen -= 2;
373 } else {
374 reqlen--;
375 }
376 if (reqlen > 300) {
377 reqlen = 300;
378 }
379 if (dlen > 150) {
380 dlen = 150;
381 }
382 // be_namelen and be_portlen can't be longer than 255+6
383 le->reqlen = reqlen;
384 memcpy(data, req, reqlen);
385 data += reqlen;
386 memcpy(data, detail, dlen);
387 data += dlen;
388 memcpy(data, be_name, le->be_namelen);
389 data += le->be_namelen;
390 memcpy(data, be_port, le->be_portlen);
391 e->size = sizeof(struct logentry_proxy_req) + reqlen + dlen + le->be_namelen + le->be_portlen;
392 }
393
/* Renders a proxy-request entry as a key=value log line into scratch.
 * The variable-length data is laid out back to back after the header as
 * req | detail | be_name | be_port; each piece is printed with an explicit
 * precision because none of them is NUL-terminated.
 * Returns snprintf's result (the length the full line needs). */
static int _logger_parse_prx_req(logentry *e, char *scratch) {
    struct logentry_proxy_req *rec = (void *)e->data;

    /* Walk the packed data area in storage order. */
    char *req_txt = rec->data;
    char *detail_txt = rec->data+rec->reqlen;
    char *name_txt = rec->data+rec->reqlen+rec->dlen;
    char *port_txt = rec->data+rec->reqlen+rec->dlen+rec->be_namelen;

    return snprintf(scratch, LOGGER_PARSE_SCRATCH,
            "ts=%lld.%d gid=%llu type=proxy_req elapsed=%lu type=%d code=%d status=%d cfd=%d be=%.*s:%.*s detail=%.*s req=%.*s\n",
            (long long int) e->tv.tv_sec, (int) e->tv.tv_usec, (unsigned long long) e->gid,
            rec->elapsed, rec->type, rec->code, rec->status, rec->conn_fd,
            (int)rec->be_namelen, name_txt,
            (int)rec->be_portlen, port_txt,
            (int)rec->dlen, detail_txt,
            (int)rec->reqlen, req_txt
            );
}
408
409 #define MAX_RBUF_READ 100
_logger_log_proxy_errbe(logentry * e,const entry_details * d,const void * entry,va_list ap)410 static void _logger_log_proxy_errbe(logentry *e, const entry_details *d, const void *entry, va_list ap) {
411 char *errmsg = va_arg(ap, char *);
412 char *be_name = va_arg(ap, char *);
413 char *be_port = va_arg(ap, char *);
414 char *be_label = va_arg(ap, char *);
415 int be_depth = va_arg(ap, int);
416 char *be_rbuf = va_arg(ap, char *);
417 int be_rbuflen = va_arg(ap, int);
418 int be_retry = va_arg(ap, int);
419
420 struct logentry_proxy_errbe *le = (void *)e->data;
421 le->be_depth = be_depth;
422 le->retry = be_retry;
423 le->errlen = strlen(errmsg);
424 if (be_name && be_port) {
425 le->be_namelen = strlen(be_name);
426 le->be_portlen = strlen(be_port);
427 }
428
429 if (be_label) {
430 le->be_labellen = strlen(be_label);
431 }
432
433 le->be_rbuflen = be_rbuflen;
434 if (be_rbuflen > MAX_RBUF_READ) {
435 le->be_rbuflen = MAX_RBUF_READ;
436 }
437
438 char *data = le->data;
439 memcpy(data, errmsg, le->errlen);
440 data += le->errlen;
441 memcpy(data, be_name, le->be_namelen);
442 data += le->be_namelen;
443 memcpy(data, be_port, le->be_portlen);
444 data += le->be_portlen;
445 memcpy(data, be_label, le->be_labellen);
446 data += le->be_labellen;
447 memcpy(data, be_rbuf, le->be_rbuflen);
448 data += le->be_rbuflen;
449
450 e->size = sizeof(struct logentry_proxy_errbe) + (data - le->data);
451 }
452
/* Renders a proxy backend-error entry as a key=value log line into scratch.
 * The packed data after the header is errmsg | be_name | be_port | be_label
 * | be_rbuf. When le->retry is set the line reports retry=; otherwise it
 * reports depth= and the URI-encoded read buffer.
 * Returns snprintf's result (the length the full line needs). */
static int _logger_parse_prx_errbe(logentry *e, char *scratch) {
    int total;
    /* x 3 for worst case URI encoding, + 1 for the terminating NUL.
     * Zero-filled so the "%s" below always sees a terminated string.
     * NOTE(review): uriencode() is defined elsewhere and its result is
     * ignored here (as before); confirm how it behaves when dst is too
     * small. */
    char rbuf[MAX_RBUF_READ * 3 + 1] = {0};
    struct logentry_proxy_errbe *le = (void *)e->data;
    char *data = le->data;
    char *errmsg = data;
    data += le->errlen;
    char *be_name = data;
    data += le->be_namelen;
    char *be_port = data;
    data += le->be_portlen;
    char *be_label = data;
    data += le->be_labellen;
    char *be_rbuf = data;

    uriencode(be_rbuf, rbuf, le->be_rbuflen, sizeof(rbuf));
    if (le->retry) {
        total = snprintf(scratch, LOGGER_PARSE_SCRATCH,
                "ts=%lld.%d gid=%llu type=proxy_backend error=%.*s name=%.*s port=%.*s label=%.*s retry=%d\n",
                (long long int)e->tv.tv_sec, (int)e->tv.tv_usec, (unsigned long long) e->gid,
                (int)le->errlen, errmsg, (int)le->be_namelen, be_name,
                (int)le->be_portlen, be_port, (int)le->be_labellen, be_label, le->retry);
    } else {
        total = snprintf(scratch, LOGGER_PARSE_SCRATCH,
                "ts=%lld.%d gid=%llu type=proxy_backend error=%.*s name=%.*s port=%.*s label=%.*s depth=%d rbuf=%s\n",
                (long long int)e->tv.tv_sec, (int)e->tv.tv_usec, (unsigned long long) e->gid,
                (int)le->errlen, errmsg, (int)le->be_namelen, be_name,
                (int)le->be_portlen, be_port, (int)le->be_labellen, be_label, le->be_depth, rbuf);
    }

    return total;
}
485 #endif
486
/* Should this go somewhere else? */
/* Per-event description table, indexed by the LOGGER_* event id
 * (logger_thread_parse_entry() looks entries up with e->event).
 * Each initializer lists, in order:
 *   1. the byte budget for the entry's data (read as d->reqlen by
 *      _logger_log_text, which passes it to vsnprintf as the buffer size);
 *   2. a LOG_* flag for the event's category;
 *   3. the callback that fills the binary entry on the logging side;
 *   4. the callback that renders the entry to text (d->parse_cb);
 *   5. a printf format used by _logger_log_text (d->format), or NULL for
 *      entries filled by a dedicated callback.
 */
static const entry_details default_entries[] = {
    [LOGGER_ASCII_CMD] = {512, LOG_RAWCMDS, _logger_log_text, _logger_parse_text, "<%d %s"},
    [LOGGER_EVICTION] = {512, LOG_EVICTIONS, _logger_log_evictions, _logger_parse_ee, NULL},
    [LOGGER_ITEM_GET] = {512, LOG_FETCHERS, _logger_log_item_get, _logger_parse_ige, NULL},
    [LOGGER_ITEM_STORE] = {512, LOG_MUTATIONS, _logger_log_item_store, _logger_parse_ise, NULL},
    [LOGGER_CRAWLER_STATUS] = {512, LOG_SYSEVENTS, _logger_log_text, _logger_parse_text,
        "type=lru_crawler crawler=%d lru=%s low_mark=%llu next_reclaims=%llu since_run=%u next_run=%d elapsed=%u examined=%llu reclaimed=%llu"
    },
    [LOGGER_SLAB_MOVE] = {512, LOG_SYSEVENTS, _logger_log_text, _logger_parse_text,
        "type=slab_move src=%d dst=%d"
    },
    /* New and closed connections share one fill callback and differ only in
     * how they are rendered. */
    [LOGGER_CONNECTION_NEW] = {512, LOG_CONNEVENTS, _logger_log_conn_event, _logger_parse_cne, NULL},
    [LOGGER_CONNECTION_CLOSE] = {512, LOG_CONNEVENTS, _logger_log_conn_event, _logger_parse_cce, NULL},
    [LOGGER_CONNECTION_ERROR] = {512, LOG_CONNEVENTS, _logger_log_text, _logger_parse_text,
        "type=connerr fd=%d msg=%s"
    },
    [LOGGER_CONNECTION_TLSERROR] = {512, LOG_CONNEVENTS, _logger_log_text, _logger_parse_text,
        "type=conntlserr fd=%d msg=%s"
    },
    [LOGGER_DELETIONS] = {512, LOG_DELETIONS, _logger_log_item_deleted, _logger_parse_ide, NULL},
#ifdef EXTSTORE
    [LOGGER_EXTSTORE_WRITE] = {512, LOG_EVICTIONS, _logger_log_ext_write, _logger_parse_extw, NULL},
    [LOGGER_COMPACT_START] = {512, LOG_SYSEVENTS, _logger_log_text, _logger_parse_text,
        "type=compact_start id=%lu version=%llu"
    },
    [LOGGER_COMPACT_ABORT] = {512, LOG_SYSEVENTS, _logger_log_text, _logger_parse_text,
        "type=compact_abort id=%lu"
    },
    [LOGGER_COMPACT_READ_START] = {512, LOG_SYSEVENTS, _logger_log_text, _logger_parse_text,
        "type=compact_read_start id=%lu offset=%llu"
    },
    [LOGGER_COMPACT_READ_END] = {512, LOG_SYSEVENTS, _logger_log_text, _logger_parse_text,
        "type=compact_read_end id=%lu offset=%llu rescues=%lu lost=%lu skipped=%lu"
    },
    [LOGGER_COMPACT_END] = {512, LOG_SYSEVENTS, _logger_log_text, _logger_parse_text,
        "type=compact_end id=%lu"
    },
    [LOGGER_COMPACT_FRAGINFO] = {512, LOG_SYSEVENTS, _logger_log_text, _logger_parse_text,
        "type=compact_fraginfo ratio=%.2f bytes=%lu"
    },
#endif
#ifdef PROXY
    [LOGGER_PROXY_CONFIG] = {512, LOG_PROXYEVENTS, _logger_log_text, _logger_parse_text,
        "type=proxy_conf status=%s"
    },
    /* The only entry with a 1024-byte budget. */
    [LOGGER_PROXY_REQ] = {1024, LOG_PROXYREQS, _logger_log_proxy_req, _logger_parse_prx_req, NULL},
    [LOGGER_PROXY_ERROR] = {512, LOG_PROXYEVENTS, _logger_log_text, _logger_parse_text,
        "type=proxy_error msg=%s"
    },
    [LOGGER_PROXY_USER] = {512, LOG_PROXYUSER, _logger_log_text, _logger_parse_text,
        "type=proxy_user msg=%s"
    },
    [LOGGER_PROXY_BE_ERROR] = {512, LOG_PROXYEVENTS, _logger_log_proxy_errbe, _logger_parse_prx_errbe,
        NULL
    },

#endif
};
546
547 /*************************
548 * Util functions shared between bg thread and workers
549 *************************/
550
551 /* Logger GID's can be used by watchers to put logs back into strict order
552 */
553 static uint64_t logger_gid = 0;
logger_get_gid(void)554 uint64_t logger_get_gid(void) {
555 #ifdef HAVE_GCC_64ATOMICS
556 return __sync_add_and_fetch(&logger_gid, 1);
557 #elif defined(__sun)
558 return atomic_inc_64_nv(&logger_gid);
559 #else
560 mutex_lock(&logger_atomics_mutex);
561 uint64_t res = ++logger_gid;
562 mutex_unlock(&logger_atomics_mutex);
563 return res;
564 #endif
565 }
566
logger_set_gid(uint64_t gid)567 void logger_set_gid(uint64_t gid) {
568 #ifdef HAVE_GCC_64ATOMICS
569 __sync_add_and_fetch(&logger_gid, gid);
570 #elif defined(__sun)
571 atomic_add_64(&logger_gid);
572 #else
573 mutex_lock(&logger_atomics_mutex);
574 logger_gid = gid;
575 mutex_unlock(&logger_atomics_mutex);
576 #endif
577 }
578
579 /* TODO: genericize lists. would be nice to import queue.h if the impact is
580 * studied... otherwise can just write a local one.
581 */
582 /* Add to the list of threads with a logger object */
logger_link_q(logger * l)583 static void logger_link_q(logger *l) {
584 pthread_mutex_lock(&logger_stack_lock);
585 assert(l != logger_stack_head);
586
587 l->prev = 0;
588 l->next = logger_stack_head;
589 if (l->next) l->next->prev = l;
590 logger_stack_head = l;
591 if (logger_stack_tail == 0) logger_stack_tail = l;
592 logger_count++;
593 pthread_mutex_unlock(&logger_stack_lock);
594 return;
595 }
596
597 /* Remove from the list of threads with a logger object */
598 /*static void logger_unlink_q(logger *l) {
599 pthread_mutex_lock(&logger_stack_lock);
600 if (logger_stack_head == l) {
601 assert(l->prev == 0);
602 logger_stack_head = l->next;
603 }
604 if (logger_stack_tail == l) {
605 assert(l->next == 0);
606 logger_stack_tail = l->prev;
607 }
608 assert(l->next != l);
609 assert(l->prev != l);
610
611 if (l->next) l->next->prev = l->prev;
612 if (l->prev) l->prev->next = l->next;
613 logger_count--;
614 pthread_mutex_unlock(&logger_stack_lock);
615 return;
616 }*/
617
618 /* Called with logger stack locked.
619 * Iterates over every watcher collecting enabled flags.
620 */
logger_set_flags(void)621 static void logger_set_flags(void) {
622 logger *l = NULL;
623 int x = 0;
624 uint16_t f = 0; /* logger eflags */
625
626 for (x = 0; x < WATCHER_LIMIT; x++) {
627 logger_watcher *w = watchers[x];
628 if (w == NULL)
629 continue;
630
631 f |= w->eflags;
632 }
633 for (l = logger_stack_head; l != NULL; l=l->next) {
634 pthread_mutex_lock(&l->mutex);
635 l->eflags = f;
636 pthread_mutex_unlock(&l->mutex);
637 }
638 return;
639 }
640
641 /* Completes rendering of log line. */
logger_thread_parse_entry(logentry * e,struct logger_stats * ls,char * scratch,int * scratch_len)642 static enum logger_parse_entry_ret logger_thread_parse_entry(logentry *e, struct logger_stats *ls,
643 char *scratch, int *scratch_len) {
644 int total = 0;
645 const entry_details *d = &default_entries[e->event];
646 assert(d->parse_cb != NULL);
647 total = d->parse_cb(e, scratch);
648
649 if (total >= LOGGER_PARSE_SCRATCH || total <= 0) {
650 L_DEBUG("LOGGER: Failed to flatten log entry!\n");
651 return LOGGER_PARSE_ENTRY_FAILED;
652 } else {
653 *scratch_len = total;
654 }
655
656 return LOGGER_PARSE_ENTRY_OK;
657 }
658
659 /* Writes flattened entry to available watchers */
logger_thread_write_entry(logentry * e,struct logger_stats * ls,char * scratch,int scratch_len)660 static void logger_thread_write_entry(logentry *e, struct logger_stats *ls,
661 char *scratch, int scratch_len) {
662 int x, total;
663 /* Write the line into available watchers with matching flags */
664 for (x = 0; x < WATCHER_LIMIT; x++) {
665 logger_watcher *w = watchers[x];
666 char *skip_scr = NULL;
667 if (w == NULL || (e->eflags & w->eflags) == 0 || (e->gid < w->min_gid))
668 continue;
669
670 /* Avoid poll()'ing constantly when buffer is full by resetting a
671 * flag periodically.
672 */
673 while (!w->failed_flush &&
674 (skip_scr = (char *) bipbuf_request(w->buf, scratch_len + 128)) == NULL) {
675 if (logger_thread_poll_watchers(0, x) <= 0) {
676 L_DEBUG("LOGGER: Watcher had no free space for line of size (%d)\n", scratch_len + 128);
677 w->failed_flush = true;
678 }
679 }
680
681 if (w->failed_flush) {
682 L_DEBUG("LOGGER: Fast skipped for watcher [%d] due to failed_flush\n", w->sfd);
683 w->skipped++;
684 ls->watcher_skipped++;
685 continue;
686 }
687
688 if (w->skipped > 0) {
689 total = snprintf(skip_scr, 128, "skipped=%llu\n", (unsigned long long) w->skipped);
690 if (total >= 128 || total <= 0) {
691 L_DEBUG("LOGGER: Failed to flatten skipped message into watcher [%d]\n", w->sfd);
692 w->skipped++;
693 ls->watcher_skipped++;
694 continue;
695 }
696 bipbuf_push(w->buf, total);
697 w->skipped = 0;
698 }
699 /* Can't fail because bipbuf_request succeeded. */
700 bipbuf_offer(w->buf, (unsigned char *) scratch, scratch_len);
701 ls->watcher_sent++;
702 }
703 }
704
/* Called with logger stack locked.
 * Releases every chunk associated with a watcher and closes the connection.
 * We can't presently send a connection back to the worker for further
 * processing.
 *
 * After this returns, `w` has been freed and its slot in watchers[] is NULL;
 * callers must not touch `w` again.
 */
static void logger_thread_close_watcher(logger_watcher *w) {
    L_DEBUG("LOGGER: Closing dead watcher\n");
    /* Clear the slot first so the flag recomputation below no longer sees
     * this watcher. */
    watchers[w->id] = NULL;
    sidethread_conn_close(w->c);
    watcher_count--;
    bipbuf_free(w->buf);
    free(w);
    /* Recompute the combined flags for all loggers without this watcher. */
    logger_set_flags();
}
719
/* Reads a particular worker thread's available bipbuf bytes. Parses each log
 * entry into the watcher buffers.
 *
 * l:  the worker's logger whose buffer is drained.
 * ls: per-pass stats; the worker's written/dropped counters are added to it
 *     and then reset on the logger.
 *
 * Returns the number of bytes consumed from the worker's buffer, or 0 when
 * there was nothing to read.
 */
static int logger_thread_read(logger *l, struct logger_stats *ls) {
    unsigned int size;
    unsigned int pos = 0;
    unsigned char *data;
    char scratch[LOGGER_PARSE_SCRATCH];
    logentry *e;
    /* The worker's mutex is only held while peeking; parsing below runs
     * without it. */
    pthread_mutex_lock(&l->mutex);
    data = bipbuf_peek_all(l->buf, &size);
    pthread_mutex_unlock(&l->mutex);

    if (data == NULL) {
        return 0;
    }
    L_DEBUG("LOGGER: Got %d bytes from bipbuffer\n", size);

    /* parse buffer */
    /* Stops early if the last watcher goes away mid-pass; the remaining
     * entries are still consumed by the bipbuf_poll() below. */
    while (pos < size && watcher_count > 0) {
        enum logger_parse_entry_ret ret;
        int scratch_len = 0;
        e = (logentry *) (data + pos);
        ret = logger_thread_parse_entry(e, ls, scratch, &scratch_len);
        if (ret != LOGGER_PARSE_ENTRY_OK) {
            /* TODO: stats counter */
            fprintf(stderr, "LOGGER: Failed to parse log entry\n");
        } else {
            logger_thread_write_entry(e, ls, scratch, scratch_len);
        }
        /* Entries are stored back to back: header, payload, padding. */
        pos += sizeof(logentry) + e->size + e->pad;
    }
    assert(pos <= size);

    /* Release everything that was peeked and collect the worker's counters
     * under its mutex. */
    pthread_mutex_lock(&l->mutex);
    data = bipbuf_poll(l->buf, size);
    ls->worker_written += l->written;
    ls->worker_dropped += l->dropped;
    l->written = 0;
    l->dropped = 0;
    pthread_mutex_unlock(&l->mutex);
    if (data == NULL) {
        fprintf(stderr, "LOGGER: unexpectedly couldn't advance buf pointer\n");
        assert(0);
    }
    return size; /* maybe the count of objects iterated? */
}
767
/* Since the event loop code isn't reusable without a refactor, and we have a
 * limited number of potential watchers, we run our own poll loop.
 * This calls poll() unnecessarily during write flushes, should be possible to
 * micro-optimize later.
 *
 * This flushes buffers attached to watchers, iterating through the bytes set
 * to each worker. Also checks for readability in case client connection was
 * closed.
 *
 * Allows a specific watcher to be flushed (if buf full)
 *
 * force_poll: when non-zero, watchers with nothing buffered are still polled
 *             (for POLLIN) so a remote close can be noticed.
 * watcher:    index of the single watcher to service, or WATCHER_ALL.
 *
 * Returns the total number of bytes written to watchers, 0 when there was
 * nothing to poll, or -1 if poll() itself failed. May close (and free)
 * watchers as a side effect.
 */
static int logger_thread_poll_watchers(int force_poll, int watcher) {
    int x;
    int nfd = 0;
    unsigned char *data;
    unsigned int data_size = 0;
    int flushed = 0;

    /* Pass 1: build the pollfd array. Watchers with pending data wait for
     * writability; idle ones are only added when force_poll is set. */
    for (x = 0; x < WATCHER_LIMIT; x++) {
        logger_watcher *w = watchers[x];
        if (w == NULL || (watcher != WATCHER_ALL && x != watcher))
            continue;

        data = bipbuf_peek_all(w->buf, &data_size);
        if (data != NULL) {
            watchers_pollfds[nfd].fd = w->sfd;
            watchers_pollfds[nfd].events = POLLOUT;
            nfd++;
        } else if (force_poll) {
            watchers_pollfds[nfd].fd = w->sfd;
            watchers_pollfds[nfd].events = POLLIN;
            nfd++;
        }
        /* This gets set after a call to poll, and should be used to gate on
         * calling poll again.
         */
        w->failed_flush = false;
    }

    if (nfd == 0)
        return 0;

    //L_DEBUG("LOGGER: calling poll() [data_size: %d]\n", data_size);
    /* Timeout of 0: never block the logger thread. */
    int ret = poll(watchers_pollfds, nfd, 0);

    if (ret < 0) {
        perror("something failed with logger thread watcher fd polling");
        return -1;
    }

    /* Pass 2: act on the results.
     * NOTE(review): nfd advances here for every matching watcher, while
     * pass 1 only added a pollfd when data was pending or force_poll was
     * set. The indexes line up for the call patterns visible in this file
     * (a single watcher, or force_poll with WATCHER_ALL); confirm before
     * adding a WATCHER_ALL call without force_poll. */
    nfd = 0;
    for (x = 0; x < WATCHER_LIMIT; x++) {
        logger_watcher *w = watchers[x];
        if (w == NULL || (watcher != WATCHER_ALL && x != watcher))
            continue;

        data_size = 0;
        /* Early detection of a disconnect. Otherwise we have to wait until
         * the next write
         */
        if (watchers_pollfds[nfd].revents & POLLIN) {
            char buf[1];
            int res = ((conn*)w->c)->read(w->c, buf, 1);
            if (res == 0 || (res == -1 && (errno != EAGAIN && errno != EWOULDBLOCK))) {
                L_DEBUG("LOGGER: watcher closed remotely\n");
                logger_thread_close_watcher(w);
                nfd++;
                continue;
            }
        }
        if ((data = bipbuf_peek_all(w->buf, &data_size)) != NULL) {
            if (watchers_pollfds[nfd].revents & (POLLHUP|POLLERR)) {
                L_DEBUG("LOGGER: watcher closed during poll() call\n");
                logger_thread_close_watcher(w);
            } else if (watchers_pollfds[nfd].revents & POLLOUT) {
                int total = 0;

                /* We can write a bit. */
                switch (w->t) {
                    case LOGGER_WATCHER_STDERR:
                        total = fwrite(data, 1, data_size, stderr);
                        break;
                    case LOGGER_WATCHER_CLIENT:
                        total = ((conn*)w->c)->write(w->c, data, data_size);
                        break;
                }

                L_DEBUG("LOGGER: poll() wrote %d to %d (data_size: %d) (bipbuf_used: %d)\n", total, w->sfd,
                        data_size, bipbuf_used(w->buf));
                if (total == -1) {
                    /* A would-block error keeps the watcher and its data for
                     * the next pass; any other error closes it. */
                    if (errno != EAGAIN && errno != EWOULDBLOCK) {
                        logger_thread_close_watcher(w);
                    }
                    L_DEBUG("LOGGER: watcher hit EAGAIN\n");
                } else if (total == 0) {
                    logger_thread_close_watcher(w);
                } else {
                    /* Drop only what was actually written from the buffer. */
                    bipbuf_poll(w->buf, total);
                    flushed += total;
                }
            }
        }
        nfd++;
    }
    return flushed;
}
874
logger_thread_flush_stats(struct logger_stats * ls)875 static void logger_thread_flush_stats(struct logger_stats *ls) {
876 STATS_LOCK();
877 stats.log_worker_dropped += ls->worker_dropped;
878 stats.log_worker_written += ls->worker_written;
879 stats.log_watcher_skipped += ls->watcher_skipped;
880 stats.log_watcher_sent += ls->watcher_sent;
881 stats_state.log_watchers = ls->watcher_count;
882 STATS_UNLOCK();
883 }
884
885 #define MAX_LOGGER_SLEEP 1000000
886 #define MIN_LOGGER_SLEEP 1000
887
888 /* Primary logger thread routine */
logger_thread(void * arg)889 static void *logger_thread(void *arg) {
890 useconds_t to_sleep = MIN_LOGGER_SLEEP;
891 L_DEBUG("LOGGER: Starting logger thread\n");
892 // TODO: If we ever have item references in the logger code, will need to
893 // ensure everything is dequeued before stopping the thread.
894 while (do_run_logger_thread) {
895 int found_logs = 0;
896 logger *l;
897 struct logger_stats ls;
898 memset(&ls, 0, sizeof(struct logger_stats));
899
900 /* only sleep if we're *above* the minimum */
901 if (to_sleep > MIN_LOGGER_SLEEP)
902 usleep(to_sleep);
903
904 /* Call function to iterate each logger. */
905 pthread_mutex_lock(&logger_stack_lock);
906 if (watcher_count == 0) {
907 // Not bothering to loop on the condition here since it's fine to
908 // walk through with zero watchers.
909 pthread_cond_wait(&logger_stack_cond, &logger_stack_lock);
910 }
911 for (l = logger_stack_head; l != NULL; l=l->next) {
912 /* lock logger, call function to manipulate it */
913 found_logs += logger_thread_read(l, &ls);
914 }
915
916 logger_thread_poll_watchers(1, WATCHER_ALL);
917
918 /* capture the current count within mutual exclusion of the lock */
919 ls.watcher_count = watcher_count;
920
921 pthread_mutex_unlock(&logger_stack_lock);
922
923 /* TODO: abstract into a function and share with lru_crawler */
924 if (!found_logs) {
925 if (to_sleep < MAX_LOGGER_SLEEP)
926 to_sleep += to_sleep / 8;
927 if (to_sleep > MAX_LOGGER_SLEEP)
928 to_sleep = MAX_LOGGER_SLEEP;
929 } else {
930 to_sleep /= 2;
931 if (to_sleep < MIN_LOGGER_SLEEP)
932 to_sleep = MIN_LOGGER_SLEEP;
933 }
934 logger_thread_flush_stats(&ls);
935 }
936
937 return NULL;
938 }
939
start_logger_thread(void)940 static int start_logger_thread(void) {
941 int ret;
942 do_run_logger_thread = 1;
943 if ((ret = pthread_create(&logger_tid, NULL,
944 logger_thread, NULL)) != 0) {
945 fprintf(stderr, "Can't start logger thread: %s\n", strerror(ret));
946 return -1;
947 }
948 thread_setname(logger_tid, "mc-log");
949 return 0;
950 }
951
stop_logger_thread(void)952 static int stop_logger_thread(void) {
953 // Guarantees that the logger thread is waiting on 'logger_stack_cond'
954 // before we signal it.
955 pthread_mutex_lock(&logger_stack_lock);
956 do_run_logger_thread = 0;
957 pthread_cond_signal(&logger_stack_cond);
958 pthread_mutex_unlock(&logger_stack_lock);
959 pthread_join(logger_tid, NULL);
960 return 0;
961 }
962
963 /*************************
964 * Public functions for submitting logs and starting loggers from workers.
965 *************************/
966
967 /* Global logger thread start/init */
logger_init(void)968 void logger_init(void) {
969 /* TODO: auto destructor when threads exit */
970 /* TODO: error handling */
971
972 /* init stack for iterating loggers */
973 logger_stack_head = 0;
974 logger_stack_tail = 0;
975 pthread_key_create(&logger_key, NULL);
976
977 if (start_logger_thread() != 0) {
978 abort();
979 }
980
981 /* This is what adding a STDERR watcher looks like. should replace old
982 * "verbose" settings. */
983 //logger_add_watcher(NULL, 0);
984 return;
985 }
986
/* Public shutdown hook: stops the logger thread via stop_logger_thread().
 * The helper's status code is intentionally discarded.
 */
void logger_stop(void) {
    (void) stop_logger_thread();
}
990
991 /* called *from* the thread using a logger.
992 * initializes the per-thread bipbuf, links it into the list of loggers
993 */
logger_create(void)994 logger *logger_create(void) {
995 L_DEBUG("LOGGER: Creating and linking new logger instance\n");
996 logger *l = calloc(1, sizeof(logger));
997 if (l == NULL) {
998 return NULL;
999 }
1000
1001 l->buf = bipbuf_new(settings.logger_buf_size);
1002 if (l->buf == NULL) {
1003 free(l);
1004 return NULL;
1005 }
1006
1007 l->entry_map = default_entries;
1008
1009 pthread_mutex_init(&l->mutex, NULL);
1010 pthread_setspecific(logger_key, l);
1011
1012 /* add to list of loggers */
1013 logger_link_q(l);
1014 return l;
1015 }
1016
1017 /* Public function for logging an entry.
1018 * Tries to encapsulate as much of the formatting as possible to simplify the
1019 * caller's code.
1020 */
logger_log(logger * l,const enum log_entry_type event,const void * entry,...)1021 enum logger_ret_type logger_log(logger *l, const enum log_entry_type event, const void *entry, ...) {
1022 bipbuf_t *buf = l->buf;
1023 bool nospace = false;
1024 va_list ap;
1025 logentry *e;
1026
1027 const entry_details *d = &l->entry_map[event];
1028 int reqlen = d->reqlen;
1029
1030 pthread_mutex_lock(&l->mutex);
1031 /* Request a maximum length of data to write to */
1032 e = (logentry *) bipbuf_request(buf, (sizeof(logentry) + reqlen));
1033 if (e == NULL) {
1034 l->dropped++;
1035 pthread_mutex_unlock(&l->mutex);
1036 return LOGGER_RET_NOSPACE;
1037 }
1038 e->event = event;
1039 e->pad = 0;
1040 e->gid = logger_get_gid();
1041 /* TODO: Could pass this down as an argument now that we're using
1042 * LOGGER_LOG() macro.
1043 */
1044 e->eflags = d->eflags;
1045 /* Noting time isn't optional. A feature may be added to avoid rendering
1046 * time and/or gid to a logger.
1047 */
1048 gettimeofday(&e->tv, NULL);
1049
1050 va_start(ap, entry);
1051 d->log_cb(e, d, entry, ap);
1052 va_end(ap);
1053
1054 #ifdef NEED_ALIGN
1055 /* Need to ensure *next* request is aligned. */
1056 if (sizeof(logentry) + e->size % 8 != 0) {
1057 e->pad = 8 - (sizeof(logentry) + e->size % 8);
1058 }
1059 #endif
1060
1061 /* Push pointer forward by the actual amount required */
1062 if (bipbuf_push(buf, (sizeof(logentry) + e->size + e->pad)) == 0) {
1063 fprintf(stderr, "LOGGER: Failed to bipbuf push a text entry\n");
1064 pthread_mutex_unlock(&l->mutex);
1065 return LOGGER_RET_ERR;
1066 }
1067 l->written++;
1068 L_DEBUG("LOGGER: Requested %d bytes, wrote %lu bytes\n", reqlen,
1069 (sizeof(logentry) + e->size));
1070
1071 pthread_mutex_unlock(&l->mutex);
1072
1073 if (nospace) {
1074 return LOGGER_RET_NOSPACE;
1075 } else {
1076 return LOGGER_RET_OK;
1077 }
1078 }
1079
1080 /* Passes a client connection socket from a primary worker thread to the
1081 * logger thread. Caller *must* event_del() the client before handing it over.
1082 * Presently there's no way to hand the client back to the worker thread.
1083 */
logger_add_watcher(void * c,const int sfd,uint16_t f)1084 enum logger_add_watcher_ret logger_add_watcher(void *c, const int sfd, uint16_t f) {
1085 int x;
1086 logger_watcher *w = NULL;
1087 pthread_mutex_lock(&logger_stack_lock);
1088 if (watcher_count >= WATCHER_LIMIT) {
1089 pthread_mutex_unlock(&logger_stack_lock);
1090 return LOGGER_ADD_WATCHER_TOO_MANY;
1091 }
1092
1093 for (x = 0; x < WATCHER_LIMIT-1; x++) {
1094 if (watchers[x] == NULL)
1095 break;
1096 }
1097
1098 w = calloc(1, sizeof(logger_watcher));
1099 if (w == NULL) {
1100 pthread_mutex_unlock(&logger_stack_lock);
1101 return LOGGER_ADD_WATCHER_FAILED;
1102 }
1103 w->c = c;
1104 w->sfd = sfd;
1105 if (sfd == 0 && c == NULL) {
1106 w->t = LOGGER_WATCHER_STDERR;
1107 } else {
1108 w->t = LOGGER_WATCHER_CLIENT;
1109 }
1110 w->id = x;
1111 w->eflags = f;
1112 w->min_gid = logger_get_gid();
1113 w->buf = bipbuf_new(settings.logger_watcher_buf_size);
1114 if (w->buf == NULL) {
1115 free(w);
1116 pthread_mutex_unlock(&logger_stack_lock);
1117 return LOGGER_ADD_WATCHER_FAILED;
1118 }
1119 bipbuf_offer(w->buf, (unsigned char *) "OK\r\n", 4);
1120
1121 watchers[x] = w;
1122 watcher_count++;
1123 /* Update what flags the global logs will watch */
1124 logger_set_flags();
1125 pthread_cond_signal(&logger_stack_cond);
1126
1127 pthread_mutex_unlock(&logger_stack_lock);
1128 return LOGGER_ADD_WATCHER_OK;
1129 }
1130