1 /* -*- Mode: C; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2
3 /** \file
4 * The main memcached header holding commonly used data
5 * structures and function prototypes.
6 */
7
8 #ifdef HAVE_CONFIG_H
9 #include "config.h"
10 #endif
11
12 #include <sys/types.h>
13 #include <sys/socket.h>
14 #include <sys/time.h>
15 #include <netinet/in.h>
16 #include <event.h>
17 #include <netdb.h>
18 #include <pthread.h>
19 #include <unistd.h>
20 #include <assert.h>
21 #include <grp.h>
22 #include <signal.h>
23 /* need this to get IOV_MAX on some platforms. */
24 #ifndef __need_IOV_MAX
25 #define __need_IOV_MAX
26 #endif
27 #include <limits.h>
28 /* FreeBSD 4.x doesn't have IOV_MAX exposed. */
29 #ifndef IOV_MAX
30 #if defined(__FreeBSD__) || defined(__APPLE__) || defined(__GNU__)
31 # define IOV_MAX 1024
32 /* GNU/Hurd don't set MAXPATHLEN
33 * http://www.gnu.org/software/hurd/hurd/porting/guidelines.html#PATH_MAX_tt_MAX_PATH_tt_MAXPATHL */
34 #ifndef MAXPATHLEN
35 #define MAXPATHLEN 4096
36 #endif
37 #endif
38 #endif
39
40 #if defined(__linux__)
41 # define SOCK_COOKIE_ID SO_MARK
42 #elif defined(__FreeBSD__)
43 # define SOCK_COOKIE_ID SO_USER_COOKIE
44 #elif defined(__OpenBSD__)
45 # define SOCK_COOKIE_ID SO_RTABLE
46 #endif
47
48 #include "itoa_ljust.h"
49 #include "protocol_binary.h"
50 #include "cache.h"
51 #include "logger.h"
52 #include "queue.h"
53 #include "util.h"
54
55 #ifdef EXTSTORE
56 #include "crc32c.h"
57 #endif
58
59 #include "sasl_defs.h"
60
/* for NAPI pinning feature */
#ifndef SO_INCOMING_NAPI_ID
#define SO_INCOMING_NAPI_ID 56
#endif

/** Maximum length of a key. */
#define KEY_MAX_LENGTH 250

/** Maximum length of a uri encoded key. */
#define KEY_MAX_URI_ENCODED_LENGTH (KEY_MAX_LENGTH * 3 + 1)

/** Size of an incr buf. */
#define INCR_MAX_STORAGE_LEN 24

#define WRITE_BUFFER_SIZE 1024
#define READ_BUFFER_SIZE 16384
#define READ_BUFFER_CACHED 0
#define UDP_READ_BUFFER_SIZE 65536
#define UDP_MAX_PAYLOAD_SIZE 1400
#define UDP_HEADER_SIZE 8
/* Derived instead of hand-computed (was a literal 1392) so the value can
 * never drift out of sync with the two constants above. */
#define UDP_DATA_SIZE (UDP_MAX_PAYLOAD_SIZE - UDP_HEADER_SIZE)
#define MAX_SENDBUF_SIZE (256 * 1024 * 1024)

/* Binary protocol stuff */
#define BIN_MAX_EXTLEN 20 // length of the _incr command is currently the longest.

/* Initial power multiplier for the hash table */
#define HASHPOWER_DEFAULT 16
#define HASHPOWER_MAX 32
90
/* Abstract the size of an item's client flag suffix: 8 bytes when built
 * with LARGE_CLIENT_FLAGS, otherwise the classic 4 bytes. */
#ifdef LARGE_CLIENT_FLAGS
typedef uint64_t client_flags_t;
/* string-to-flags parser matching the width above */
#define safe_strtoflags safe_strtoull
#else
typedef uint32_t client_flags_t;
#define safe_strtoflags safe_strtoul
#endif
99
/*
 * We only reposition items in the LRU queue if they haven't been repositioned
 * in this many seconds. That saves us from churning on frequently-accessed
 * items.
 */
#define ITEM_UPDATE_INTERVAL 60

/*
 * Valid range of the maximum size of an item, in bytes.
 */
#define ITEM_SIZE_MAX_LOWER_LIMIT 1024
/* Parenthesized so the macro stays correct inside larger expressions
 * (e.g. `x % ITEM_SIZE_MAX_UPPER_LIMIT` or division by the limit). */
#define ITEM_SIZE_MAX_UPPER_LIMIT (1024 * 1024 * 1024)

/* Slab sizing definitions. */
#define POWER_SMALLEST 1
#define POWER_LARGEST 256 /* actual cap is 255 */
#define SLAB_GLOBAL_PAGE_POOL 0 /* magic slab class for storing pages for reassignment */
#define CHUNK_ALIGN_BYTES 8
/* slab class max is a 6-bit number, -1. */
#define MAX_NUMBER_OF_SLAB_CLASSES (63 + 1)

/** How long an object can reasonably be assumed to be locked before
    harvesting it on a low memory condition. Default: disabled. */
#define TAIL_REPAIR_TIME_DEFAULT 0
124
/* warning: don't use these macros with a function, as it evals its arg twice */
#define ITEM_get_cas(i) (((i)->it_flags & ITEM_CAS) ? \
        (i)->data->cas : (uint64_t)0)

/* do/while(0) makes the macro expand to exactly one statement, so it is
 * safe as the body of an unbraced if/else; call sites still terminate it
 * with a semicolon exactly as before. (CERT PRE10-C) */
#define ITEM_set_cas(i,v) do { \
    if ((i)->it_flags & ITEM_CAS) { \
        (i)->data->cas = v; \
    } \
} while (0)
134
/* Accessors into an item's flexible `data` area. The on-item layout, in
 * order, is: [optional 8-byte CAS][key + '\0'][optional client flags]
 * [value bytes]. Each macro skips whichever optional pieces are present
 * according to it_flags. */
#define ITEM_key(item) (((char*)&((item)->data)) \
         + (((item)->it_flags & ITEM_CAS) ? sizeof(uint64_t) : 0))

#define ITEM_suffix(item) ((char*) &((item)->data) + (item)->nkey + 1 \
         + (((item)->it_flags & ITEM_CAS) ? sizeof(uint64_t) : 0))

#define ITEM_data(item) ((char*) &((item)->data) + (item)->nkey + 1 \
         + (((item)->it_flags & ITEM_CFLAGS) ? sizeof(client_flags_t) : 0) \
         + (((item)->it_flags & ITEM_CAS) ? sizeof(uint64_t) : 0))

/* Total bytes consumed by an item: header + key + optional suffixes + value. */
#define ITEM_ntotal(item) (sizeof(struct _stritem) + (item)->nkey + 1 \
         + (item)->nbytes \
         + (((item)->it_flags & ITEM_CFLAGS) ? sizeof(client_flags_t) : 0) \
         + (((item)->it_flags & ITEM_CAS) ? sizeof(uint64_t) : 0))

/* slabs_clsid packs the slab class in the low 6 bits and the LRU id in the
 * top 2 bits; these macros split it apart. */
#define ITEM_clsid(item) ((item)->slabs_clsid & ~(3<<6))
#define ITEM_lruid(item) ((item)->slabs_clsid & (3<<6))

#define STAT_KEY_LEN 128
#define STAT_VAL_LEN 128
155
/** Append a simple stat with a stat name, value format and value */
#define APPEND_STAT(name, fmt, val) \
    append_stat(name, add_stats, c, fmt, val);

/** Append an indexed stat with a stat name (with format), value format
    and value */
/* NOTE(review): expands to three separate statements and relies on
 * klen/vlen/key_str/val_str/add_stats/c being in scope at the call site.
 * Only use where a bare multi-statement expansion is safe (never as the
 * body of an unbraced if/else). */
#define APPEND_NUM_FMT_STAT(name_fmt, num, name, fmt, val) \
    klen = snprintf(key_str, STAT_KEY_LEN, name_fmt, num, name); \
    vlen = snprintf(val_str, STAT_VAL_LEN, fmt, val); \
    add_stats(key_str, klen, val_str, vlen, c);

/** Common APPEND_NUM_FMT_STAT format. */
#define APPEND_NUM_STAT(num, name, fmt, val) \
    APPEND_NUM_FMT_STAT("%d:%s", num, name, fmt, val)
170
171 /** Item client flag conversion */
/** Item client flag conversion: loads the stored client flags into `flag`,
 * or 0 when the item carries none. Wrapped in do/while(0) so the
 * multi-statement macro is a single statement and safe in unbraced
 * if/else bodies (CERT PRE10-C); call sites keep their trailing semicolon. */
#define FLAGS_CONV(it, flag) do { \
    if ((it)->it_flags & ITEM_CFLAGS) { \
        flag = *((client_flags_t *)ITEM_suffix((it))); \
    } else { \
        flag = 0; \
    } \
} while (0)

/* Bytes occupied by the optional client-flags suffix of an item. */
#define FLAGS_SIZE(item) (((item)->it_flags & ITEM_CFLAGS) ? sizeof(client_flags_t) : 0)
181
/**
 * Callback for any function producing stats.
 *
 * @param key the stat's key
 * @param klen length of the key
 * @param val the stat's value in an ascii form (e.g. text form of a number)
 * @param vlen length of the value
 * @param cookie magic callback cookie
 */
typedef void (*ADD_STAT)(const char *key, const uint16_t klen,
                         const char *val, const uint32_t vlen,
                         const void *cookie);
194
/*
 * NOTE: If you modify this table you _MUST_ update the state_text function
 * to match, so that states are reported under the correct names.
 */
/**
 * Possible states of a connection.
 */
enum conn_states {
    conn_listening,  /**< the socket which listens for connections */
    conn_new_cmd,    /**< Prepare connection for next command */
    conn_waiting,    /**< waiting for a readable socket */
    conn_read,       /**< reading in a command line */
    conn_parse_cmd,  /**< try to parse a command from the input buffer */
    conn_write,      /**< writing out a simple response */
    conn_nread,      /**< reading in a fixed number of bytes */
    conn_swallow,    /**< swallowing unnecessary bytes w/o storing */
    conn_closing,    /**< closing this connection */
    conn_mwrite,     /**< writing out many items sequentially */
    conn_closed,     /**< connection is closed */
    conn_watch,      /**< held by the logger thread as a watcher */
    conn_io_queue,   /**< wait on async. process to get response object */
    conn_io_resume,  /**< ready to resume mwrite after async work */
    conn_io_pending, /**< got woken up while waiting for async work */
    conn_max_state   /**< Max state value (used for assertion) */
};
219
/* Sub-states used while a connection is parsing a binary-protocol request. */
enum bin_substates {
    bin_no_state,
    bin_reading_set_header,
    bin_reading_cas_header,
    bin_read_set_value,
    bin_reading_get_key,
    bin_reading_stat,
    bin_reading_del_header,
    bin_reading_incr_header,
    bin_read_flush_exptime,
    bin_reading_sasl_auth,
    bin_reading_sasl_auth_data,
    bin_reading_touch_key,
};

/* Wire protocol spoken by a connection. */
enum protocol {
    ascii_prot = 3, /* arbitrary value. */
    binary_prot,
    negotiating_prot, /* Discovering the protocol */
#ifdef PROXY
    proxy_prot,
#endif
};
243
enum network_transport {
    local_transport, /* Unix sockets*/
    tcp_transport,
    udp_transport
};

/* Argument to pause_threads(): which threads to stop/resume. */
enum pause_thread_types {
    PAUSE_WORKER_THREADS = 0,
    PAUSE_ALL_THREADS,
    RESUME_ALL_THREADS,
    RESUME_WORKER_THREADS
};

/* Why the server is (or is not) shutting down. */
enum stop_reasons {
    NOT_STOP,
    GRACE_STOP,
    EXIT_NORMALLY
};

/* Why a connection transitioned into conn_closing. */
enum close_reasons {
    ERROR_CLOSE,
    NORMAL_CLOSE,
    IDLE_TIMEOUT_CLOSE,
    SHUTDOWN_CLOSE,
};

#define IS_TCP(x) (x == tcp_transport)
#define IS_UDP(x) (x == udp_transport)

/* Storage-command modes for the various set-family operations. */
#define NREAD_ADD 1
#define NREAD_SET 2
#define NREAD_REPLACE 3
#define NREAD_APPEND 4
#define NREAD_PREPEND 5
#define NREAD_CAS 6
#define NREAD_APPENDVIV 7 // specific to meta
#define NREAD_PREPENDVIV 8 // specific to meta

/* Readable aliases for the allow-stale boolean of CAS operations. */
#define CAS_ALLOW_STALE true
#define CAS_NO_STALE false

#define LOG_TYPE_DELETE 1
#define LOG_TYPE_META_DELETE 2

/* Result of a storage attempt. */
enum store_item_type {
    NOT_STORED=0, STORED, EXISTS, NOT_FOUND, TOO_LARGE, NO_MEMORY
};

/* Result of an incr/decr (delta) operation. */
enum delta_result_type {
    OK, NON_NUMERIC, EOM, DELTA_ITEM_NOT_FOUND, DELTA_ITEM_CAS_MISMATCH
};
295
/** Time relative to server start. Smaller than time_t on 64-bit systems. */
// TODO: Move to sub-header. needed in logger.h
//typedef unsigned int rel_time_t;

/** Use X macros to avoid iterating over the stats fields during reset and
 * aggregation. No longer have to add new stats in 3+ places.
 */

/* Per-slab-class counters; expanded via X() below. */
#define SLAB_STATS_FIELDS \
    X(set_cmds) \
    X(get_hits) \
    X(touch_hits) \
    X(delete_hits) \
    X(cas_hits) \
    X(cas_badval) \
    X(incr_hits) \
    X(decr_hits)

/** Stats stored per slab (and per thread). */
struct slab_stats {
#define X(name) uint64_t name;
    SLAB_STATS_FIELDS
#undef X
};
320
/* Per-thread counters; expanded via X() inside struct thread_stats. */
#define THREAD_STATS_FIELDS \
    X(get_cmds) \
    X(get_misses) \
    X(get_expired) \
    X(get_flushed) \
    X(touch_cmds) \
    X(touch_misses) \
    X(delete_misses) \
    X(incr_misses) \
    X(decr_misses) \
    X(cas_misses) \
    X(meta_cmds) \
    X(bytes_read) \
    X(bytes_written) \
    X(flush_cmds) \
    X(conn_yields) /* # of yields for connections (-R option)*/ \
    X(auth_cmds) \
    X(auth_errors) \
    X(idle_kicks) /* idle connections killed */ \
    X(response_obj_oom) \
    X(response_obj_count) \
    X(response_obj_bytes) \
    X(read_buf_oom) \
    X(store_too_large) \
    X(store_no_memory)

#ifdef EXTSTORE
/* Extra per-thread counters when external storage is compiled in. */
#define EXTSTORE_THREAD_STATS_FIELDS \
    X(get_extstore) \
    X(get_aborted_extstore) \
    X(get_oom_extstore) \
    X(recache_from_extstore) \
    X(miss_from_extstore) \
    X(badcrc_from_extstore)
#endif

#ifdef PROXY
/* Extra per-thread counters when the proxy is compiled in. */
#define PROXY_THREAD_STATS_FIELDS \
    X(proxy_conn_requests) \
    X(proxy_conn_errors) \
    X(proxy_conn_oom) \
    X(proxy_req_active)
#endif

/**
 * Stats stored per-thread. The mutex guards readers aggregating these
 * counters from other threads.
 */
struct thread_stats {
    pthread_mutex_t mutex;
#define X(name) uint64_t name;
    THREAD_STATS_FIELDS
#ifdef EXTSTORE
    EXTSTORE_THREAD_STATS_FIELDS
#endif
#ifdef PROXY
    PROXY_THREAD_STATS_FIELDS
#endif
#undef X
    struct slab_stats slab_stats[MAX_NUMBER_OF_SLAB_CLASSES];
    uint64_t lru_hits[POWER_LARGEST];
    uint64_t read_buf_count;
    uint64_t read_buf_bytes;
    uint64_t read_buf_bytes_free;
};
385
/**
 * Global stats. Only resettable stats should go into this structure
 * (non-resettable state lives in struct stats_state below).
 */
struct stats {
    uint64_t total_items;
    uint64_t total_conns;
    uint64_t rejected_conns;
    uint64_t malloc_fails;
    uint64_t listen_disabled_num;
    uint64_t slabs_moved;       /* times slabs were moved around */
    uint64_t slab_reassign_rescues; /* items rescued during slab move */
    uint64_t slab_reassign_evictions_nomem; /* valid items lost during slab move */
    uint64_t slab_reassign_inline_reclaim; /* valid items lost during slab move */
    uint64_t slab_reassign_chunk_rescues; /* chunked-item chunks recovered */
    uint64_t slab_reassign_busy_items; /* valid temporarily unmovable */
    uint64_t slab_reassign_busy_deletes; /* refcounted items killed */
    uint64_t lru_crawler_starts; /* Number of item crawlers kicked off */
    uint64_t lru_maintainer_juggles; /* number of LRU bg pokes */
    uint64_t time_in_listen_disabled_us;  /* elapsed time in microseconds while server unable to process new connections */
    uint64_t log_worker_dropped; /* logs dropped by worker threads */
    uint64_t log_worker_written; /* logs written by worker threads */
    uint64_t log_watcher_skipped; /* logs watchers missed */
    uint64_t log_watcher_sent; /* logs sent to watcher buffers */
#ifdef EXTSTORE
    uint64_t extstore_compact_lost; /* items lost because they were locked */
    uint64_t extstore_compact_rescues; /* items re-written during compaction */
    uint64_t extstore_compact_skipped; /* unhit items skipped during compaction */
    uint64_t extstore_compact_resc_cold; /* items re-written during compaction */
    uint64_t extstore_compact_resc_old; /* items re-written during compaction */
#endif
#ifdef TLS
    uint64_t ssl_proto_errors; /* TLS failures during SSL_read() and SSL_write() calls */
    uint64_t ssl_handshake_errors; /* TLS failures at accept/handshake time */
    uint64_t ssl_new_sessions; /* successfully negotiated new (non-reused) TLS sessions */
#endif
    struct timeval maxconns_entered;  /* last time maxconns entered */
    uint64_t unexpected_napi_ids;  /* see doc/napi_ids.txt */
    uint64_t round_robin_fallback; /* see doc/napi_ids.txt */
};
425
/**
 * Global "state" stats. Reflects state that shouldn't be wiped ever.
 * Ordered for some cache line locality for commonly updated counters.
 */
struct stats_state {
    uint64_t      curr_items;   /* items currently stored */
    uint64_t      curr_bytes;   /* bytes currently stored */
    uint64_t      curr_conns;   /* open connections */
    uint64_t      hash_bytes;       /* size used for hash tables */
    unsigned int  conn_structs;
    unsigned int  reserved_fds;
    unsigned int  hash_power_level; /* Better hope it's not over 9000 */
    unsigned int  log_watchers; /* number of currently active watchers */
    bool          hash_is_expanding; /* If the hash table is being expanded */
    bool          accepting_conns;  /* whether we are currently accepting */
    bool          slab_reassign_running; /* slab reassign in progress */
    bool          lru_crawler_running; /* crawl in progress */
};
444
#define MAX_VERBOSITY_LEVEL 2

/* When adding a setting, be sure to update process_stat_settings */
/**
 * Globally accessible settings as derived from the commandline.
 */
struct settings {
    size_t maxbytes;            /* memory cap for item storage, in bytes */
    int maxconns;               /* maximum simultaneous connections */
    int port;                   /* TCP listen port */
    int udpport;                /* UDP listen port */
    char *inter;                /* listen interface(s) string */
    int verbose;                /* verbosity level, up to MAX_VERBOSITY_LEVEL */
    rel_time_t oldest_live; /* ignore existing items older than this */
    int evict_to_free;
    char *socketpath;   /* path to unix socket if using local socket */
    char *auth_file;    /* path to user authentication file */
    int access;  /* access mask (a la chmod) for unix domain socket */
    double factor;          /* chunk size growth factor */
    int chunk_size;         /* smallest slab chunk size */
    int num_threads;        /* number of worker (without dispatcher) libevent threads to run */
    int num_threads_per_udp; /* number of worker threads serving each udp socket */
    char prefix_delimiter;  /* character that marks a key prefix (for stats) */
    int detail_enabled;     /* nonzero if we're collecting detailed stats */
    int reqs_per_event;     /* Maximum number of io to process on each
                               io-event. */
    bool use_cas;           /* whether items carry a CAS suffix */
    enum protocol binding_protocol; /* protocol to bind new connections to */
    int backlog;            /* listen() backlog */
    int item_size_max;        /* Maximum item size */
    int slab_chunk_size_max;  /* Upper end for chunks within slab pages. */
    int slab_page_size;     /* Slab's page units. */
    volatile sig_atomic_t sig_hup;  /* a HUP signal was received but not yet handled */
    bool sasl;              /* SASL on/off */
    bool maxconns_fast;     /* Whether or not to early close connections */
    bool lru_crawler;        /* Whether or not to enable the autocrawler thread */
    bool lru_maintainer_thread; /* LRU maintainer background thread */
    bool lru_segmented;     /* Use split or flat LRU's */
    bool slab_reassign;     /* Whether or not slab reassignment is allowed */
    bool ssl_enabled; /* indicates whether SSL is enabled */
    int slab_automove;     /* Whether or not to automatically move slabs */
    double slab_automove_ratio; /* youngest must be within pct of oldest */
    unsigned int slab_automove_window; /* window mover for algorithm */
    int hashpower_init;     /* Starting hash power level */
    bool shutdown_command; /* allow shutdown command */
    int tail_repair_time;   /* LRU tail refcount leak repair time */
    bool flush_enabled;     /* flush_all enabled */
    bool dump_enabled;      /* whether cachedump/metadump commands work */
    char *hash_algorithm;     /* Hash algorithm in use */
    int lru_crawler_sleep;  /* Microsecond sleep between items */
    uint32_t lru_crawler_tocrawl; /* Number of items to crawl per run */
    int hot_lru_pct; /* percentage of slab space for HOT_LRU */
    int warm_lru_pct; /* percentage of slab space for WARM_LRU */
    double hot_max_factor; /* HOT tail age relative to COLD tail */
    double warm_max_factor; /* WARM tail age relative to COLD tail */
    int crawls_persleep; /* Number of LRU crawls to run before sleeping */
    bool temp_lru; /* TTL < temporary_ttl uses TEMP_LRU */
    uint32_t temporary_ttl; /* temporary LRU threshold */
    int idle_timeout;       /* Number of seconds to let connections idle */
    unsigned int logger_watcher_buf_size; /* size of logger's per-watcher buffer */
    unsigned int logger_buf_size; /* size of per-thread logger buffer */
    unsigned int read_buf_mem_limit; /* total megabytes allowable for net buffers */
    bool drop_privileges;   /* Whether or not to drop unnecessary process privileges */
    bool watch_enabled; /* allows watch commands to be dropped */
    bool relaxed_privileges;   /* Relax process restrictions when running testapp */
#ifdef EXTSTORE
    unsigned int ext_io_threadcount; /* number of IO threads to run. */
    unsigned int ext_page_size; /* size in megabytes of storage pages. */
    unsigned int ext_item_size; /* minimum size of items to store externally */
    unsigned int ext_item_age; /* max age of tail item before storing ext. */
    unsigned int ext_low_ttl; /* remaining TTL below this uses own pages */
    unsigned int ext_recache_rate; /* counter++ % recache_rate == 0 > recache */
    unsigned int ext_wbuf_size; /* read only note for the engine */
    unsigned int ext_compact_under; /* when fewer than this many pages, compact */
    unsigned int ext_drop_under; /* when fewer than this many pages, drop COLD items */
    unsigned int ext_max_sleep; /* maximum sleep time for extstore bg threads, in us */
    double ext_max_frag; /* ideal maximum page fragmentation */
    double slab_automove_freeratio; /* % of memory to hold free as buffer */
    bool ext_drop_unread; /* skip unread items during compaction */
    /* start flushing to extstore after memory below this */
    unsigned int ext_global_pool_min;
#endif
#ifdef TLS
    void *ssl_ctx; /* holds the SSL server context which has the server certificate */
    char *ssl_chain_cert; /* path to the server SSL chain certificate */
    char *ssl_key; /* path to the server key */
    int ssl_verify_mode; /* client certificate verify mode */
    int ssl_keyformat; /* key format , default is PEM */
    char *ssl_ciphers; /* list of SSL ciphers */
    char *ssl_ca_cert; /* certificate with CAs. */
    rel_time_t ssl_last_cert_refresh_time; /* time of the last server certificate refresh */
    unsigned int ssl_wbuf_size; /* size of the write buffer used by ssl_sendmsg method */
    bool ssl_session_cache; /* enable SSL server session caching */
    bool ssl_kernel_tls; /* enable server kTLS */
    int ssl_min_version; /* minimum SSL protocol version to accept */
#endif
    int num_napi_ids;   /* maximum number of NAPI IDs */
    char *memory_file;  /* warm restart memory file path */
#ifdef PROXY
    bool proxy_enabled;
    bool proxy_uring; /* if the proxy should use io_uring */
    bool proxy_memprofile; /* output detail of lua allocations */
    char *proxy_startfile; /* lua file to run when workers start */
    char *proxy_startarg; /* string argument to pass to proxy */
    void *proxy_ctx; /* proxy's state context */
#endif
#ifdef SOCK_COOKIE_ID
    uint32_t sock_cookie_id; /* value for the platform's SOCK_COOKIE_ID sockopt */
#endif
};
555
extern struct stats stats;
extern struct stats_state stats_state;
extern time_t process_started;
extern struct settings settings;

/* Bit values for item.it_flags. */
#define ITEM_LINKED 1
#define ITEM_CAS 2

/* temp */
#define ITEM_SLABBED 4

/* Item was fetched at least once in its lifetime */
#define ITEM_FETCHED 8
/* Appended on fetch, removed on LRU shuffling */
#define ITEM_ACTIVE 16
/* If an item's storage are chained chunks. */
#define ITEM_CHUNKED 32
#define ITEM_CHUNK 64
/* ITEM_data bulk is external to item */
#define ITEM_HDR 128
/* additional 4 bytes for item client flags */
#define ITEM_CFLAGS 256
/* item has sent out a token already */
#define ITEM_TOKEN_SENT 512
/* reserved, in case tokens should be a 2-bit count in future */
#define ITEM_TOKEN_RESERVED 1024
/* if item has been marked as a stale value */
#define ITEM_STALE 2048
/* if item key was sent in binary */
#define ITEM_KEY_BINARY 4096
586
/**
 * Structure for storing items within memcached.
 */
typedef struct _stritem {
    /* Protected by LRU locks */
    struct _stritem *next;
    struct _stritem *prev;
    /* Rest are protected by an item lock */
    struct _stritem *h_next;    /* hash chain next */
    rel_time_t      time;       /* least recent access */
    rel_time_t      exptime;    /* expire time */
    int             nbytes;     /* size of data */
    unsigned short  refcount;
    uint16_t        it_flags;   /* ITEM_* above */
    uint8_t         slabs_clsid;/* which slab class we're in */
    uint8_t         nkey;       /* key length, w/terminating null and padding */
    /* this odd type prevents type-punning issues when we do
     * the little shuffle to save space when not using CAS. */
    union {
        uint64_t cas;
        char end;
    } data[];
    /* if it_flags & ITEM_CAS we have 8 bytes CAS */
    /* then null-terminated key */
    /* then " flags length\r\n" (no terminating null) */
    /* then data with terminating \r\n (no terminating null; it's binary!) */
} item;
614
// TODO: If we eventually want user loaded modules, we can't use an enum :(
enum crawler_run_type {
    CRAWLER_AUTOEXPIRE=0, CRAWLER_EXPIRED, CRAWLER_METADUMP, CRAWLER_MGDUMP
};

/* LRU crawler bookkeeping. The leading fields deliberately mirror
 * struct _stritem so a crawler can be linked into an item chain in place
 * of an item. */
typedef struct {
    struct _stritem *next;
    struct _stritem *prev;
    struct _stritem *h_next;    /* hash chain next */
    rel_time_t      time;       /* least recent access */
    rel_time_t      exptime;    /* expire time */
    int             nbytes;     /* size of data */
    unsigned short  refcount;
    uint16_t        it_flags;   /* ITEM_* above */
    uint8_t         slabs_clsid;/* which slab class we're in */
    uint8_t         nkey;       /* key length, w/terminating null and padding */
    uint32_t        remaining;  /* Max keys to crawl per slab per invocation */
    uint64_t        reclaimed;  /* items reclaimed during this crawl. */
    uint64_t        unfetched;  /* items reclaimed unfetched during this crawl. */
    uint64_t        checked;    /* items examined during this crawl. */
} crawler;
636
/* Header when an item is actually a chunk of another item. */
typedef struct _strchunk {
    struct _strchunk *next;     /* points within its own chain. */
    struct _strchunk *prev;     /* can potentially point to the head. */
    struct _stritem  *head;     /* always points to the owner chunk */
    int              size;      /* available chunk space in bytes */
    int              used;      /* chunk space used */
    int              nbytes;    /* used. */
    unsigned short   refcount;  /* used? */
    uint16_t         it_flags;  /* ITEM_* above. */
    uint8_t          slabs_clsid; /* Same as above. */
    uint8_t          orig_clsid; /* For obj hdr chunks slabs_clsid is fake. */
    char data[];                /* chunk payload (flexible array member) */
} item_chunk;
651
652 #ifdef NEED_ALIGN
ITEM_schunk(item * it)653 static inline char *ITEM_schunk(item *it) {
654 int offset = it->nkey + 1
655 + ((it->it_flags & ITEM_CFLAGS) ? sizeof(client_flags_t) : 0)
656 + ((it->it_flags & ITEM_CAS) ? sizeof(uint64_t) : 0);
657 int remain = offset % 8;
658 if (remain != 0) {
659 offset += 8 - remain;
660 }
661 return ((char *) &(it->data)) + offset;
662 }
663 #else
664 #define ITEM_schunk(item) ((char*) &((item)->data) + (item)->nkey + 1 \
665 + (((item)->it_flags & ITEM_CFLAGS) ? sizeof(client_flags_t) : 0) \
666 + (((item)->it_flags & ITEM_CAS) ? sizeof(uint64_t) : 0))
667 #endif
668
#ifdef EXTSTORE
/* Stored in an item's data area (ITEM_HDR) to locate the value on disk. */
typedef struct {
    unsigned int page_version; /* from IO header */
    unsigned int offset; /* from IO header */
    unsigned short page_id; /* from IO header */
} item_hdr;
#endif

/* Number of distinct async IO queue slots per connection/thread. */
#define IO_QUEUE_COUNT 3

#define IO_QUEUE_NONE 0
#define IO_QUEUE_EXTSTORE 1
#define IO_QUEUE_PROXY 2

typedef STAILQ_HEAD(iop_head_s, _io_pending_t) iop_head_t;
typedef struct _io_pending_t io_pending_t;
typedef struct io_queue_s io_queue_t;
typedef void (*io_queue_stack_cb)(io_queue_t *q);
typedef void (*io_queue_cb)(io_pending_t *pending);
// This structure used to be passed between threads, but is now owned entirely
// by the worker threads.
// IO pending objects are created and stacked into this structure. They are
// then sent off to remote threads.
// The objects are returned one at a time to the worker threads, and this
// structure is then consulted to see when to resume the worker.
struct io_queue_s {
    void *ctx; // duplicated from io_queue_cb_t
    void *stack_ctx; // module-specific context to be batch-submitted
    int count; // ios to process before returning. only accessed by queue processor once submitted
    int type; // duplicated from io_queue_cb_t
};

/* Registration record mapping an IO queue type to its submit callback. */
typedef struct io_queue_cb_s {
    void *ctx; // untouched ptr for specific context
    io_queue_stack_cb submit_cb; // callback given a full stack of pending IO's at once.
    int type;
} io_queue_cb_t;
706
/* Wakeup mechanism for a worker thread: an eventfd when available,
 * otherwise a pipe pair. */
struct thread_notify {
    struct event notify_event;  /* listen event for notify pipe or eventfd */
#ifdef HAVE_EVENTFD
    int notify_event_fd;        /* notify counter */
#else
    int notify_receive_fd;      /* receiving end of notify pipe */
    int notify_send_fd;         /* sending end of notify pipe */
#endif
};
716
typedef struct _mc_resp_bundle mc_resp_bundle;
/* Per-worker-thread state: libevent base, notification channels, stats,
 * IO queues and object caches. */
typedef struct {
    pthread_t thread_id;        /* unique ID of this thread */
    struct event_base *base;    /* libevent handle this thread uses */
    struct thread_notify n;     /* for thread notification */
    struct thread_notify ion;   /* for thread IO object notification */
    pthread_mutex_t ion_lock;   /* mutex for ion_head */
    iop_head_t ion_head;        /* queue for IO object return */
    int cur_sfd;                /* client fd for logging commands */
    int thread_baseid;          /* which "number" thread this is for data offsets */
    struct thread_stats stats;  /* Stats generated by this thread */
    io_queue_cb_t io_queues[IO_QUEUE_COUNT];
    struct conn_queue *ev_queue; /* Worker/conn event queue */
    cache_t *rbuf_cache;        /* static-sized read buffers */
    mc_resp_bundle *open_bundle;
    cache_t *io_cache;          /* IO objects */
#ifdef EXTSTORE
    void *storage;              /* data object for storage system */
#endif
    logger *l;                  /* logger buffer */
    void *lru_bump_buf;         /* async LRU bump buffer */
#ifdef TLS
    char   *ssl_wbuf;
#endif
    int napi_id;                /* napi id associated with this thread */
#ifdef PROXY
    void *proxy_ctx; // proxy global context
    void *L; // lua VM
    void *proxy_hooks;
    void *proxy_user_stats;
    void *proxy_int_stats;
    void *proxy_event_thread; // worker threads can also be proxy IO threads
    struct event *proxy_gc_timer; // periodic GC pushing.
    pthread_mutex_t proxy_limit_lock;
    int proxy_vm_extra_kb;
    int proxy_vm_last_kb;
    unsigned int proxy_vm_negative_delta;
    int proxy_vm_gcrunning;
    bool proxy_vm_needspoke;
    uint64_t proxy_active_req_limit;
    uint64_t proxy_buffer_memory_limit; // protected by limit_lock
    uint64_t proxy_buffer_memory_used; // protected by limit_lock
    uint32_t proxy_rng[4]; // fast per-thread rng for lua.
    // TODO: add ctx object so we can attach to queue.
#endif
} LIBEVENT_THREAD;
763
/**
 * Response objects
 */
#define MC_RESP_IOVCOUNT 4
typedef struct _mc_resp {
    mc_resp_bundle *bundle; // ptr back to bundle
    struct _mc_resp *next; // choo choo.
    int wbytes; // bytes to write out of wbuf: might be able to nuke this.
    int tosend; // total bytes to send for this response
    void *write_and_free; /** free this memory after finishing writing */
    io_pending_t *io_pending; /* pending IO descriptor for this response */

    item *item; /* item associated with this response object, with reference held */
    struct iovec iov[MC_RESP_IOVCOUNT]; /* built-in iovecs to simplify network code */
    int chunked_total; /* total amount of chunked item data to send. */
    uint8_t iovcnt;    /* number of iov[] entries in use */
    uint8_t chunked_data_iov; /* this iov is a pointer to chunked data header */

    /* instruct transmit to skip this response object. used by storage engines
     * to asynchronously kill an object that was queued to write
     */
    bool skip;
    bool free; // double free detection.
#ifdef PROXY
    bool proxy_res; // we're handling a proxied response buffer.
#endif
    // UDP bits. Copied in from the client.
    uint16_t request_id; /* Incoming UDP request ID, if this is a UDP "connection" */
    uint16_t udp_sequence; /* packet counter when transmitting result */
    uint16_t udp_total; /* total number of packets in sequence */
    struct sockaddr_in6 request_addr; /* udp: Who sent this request */
    socklen_t request_addr_size;

    char wbuf[WRITE_BUFFER_SIZE];
} mc_resp;
799
/* Response objects are allocated in bundles sized to fit a read buffer. */
#define MAX_RESP_PER_BUNDLE ((READ_BUFFER_SIZE - sizeof(mc_resp_bundle)) / sizeof(mc_resp))
struct _mc_resp_bundle {
    uint8_t refcount;    /* responses from this bundle currently in use */
    uint8_t next_check; // next object to check on assignment.
    LIBEVENT_THREAD *thread; /* owning worker thread */
    struct _mc_resp_bundle *next;
    struct _mc_resp_bundle *prev;
    mc_resp r[];         /* the bundled response objects */
};
809
typedef struct conn conn;

/* One outstanding asynchronous IO operation, queued per IO_QUEUE_* type. */
struct _io_pending_t {
    int io_queue_type; // matches one of IO_QUEUE_*
    LIBEVENT_THREAD *thread;
    conn *c;
    mc_resp *resp;            // associated response object
    io_queue_cb return_cb;    // called on worker thread.
    io_queue_cb finalize_cb;  // called back on the worker thread.
    STAILQ_ENTRY(_io_pending_t) iop_next; // queue chain.
    char data[120];           // opaque per-module payload area
};
822
/**
 * The structure representing a connection into memcached.
 */
struct conn {
    sasl_conn_t *sasl_conn;
    int    sfd;                 /* socket file descriptor */
    bool sasl_started;
    bool authenticated;
    bool set_stale;
    bool mset_res; /** uses mset format for return code */
    bool close_after_write; /** flush write then move to close connection */
    bool rbuf_malloced; /** read buffer was malloc'ed for ascii mget, needs free() */
    bool item_malloced; /** item for conn_nread state is a temporary malloc */
    uint8_t ssl_enabled;
    void    *ssl;
#ifdef TLS
    char   *ssl_wbuf;
#endif
    enum conn_states  state;
    enum bin_substates substate;
    rel_time_t last_cmd_time;   /* time of the most recent command */
    struct event event;
    short  ev_flags;
    short  which;   /** which events were just triggered */

    char   *rbuf;   /** buffer to read commands into */
    char   *rcurr;  /** but if we parsed some already, this is where we stopped */
    int    rsize;   /** total allocated size of rbuf */
    int    rbytes;  /** how much data, starting from rcur, do we have unparsed */

    mc_resp *resp; // tail response.
    mc_resp *resp_head; // first response in current stack.
    char   *ritem;  /** when we read in an item's value, it goes here */
    int    rlbytes; /** bytes of the value still to be read */

    /**
     * item is used to hold an item structure created after reading the command
     * line of set/add/replace commands, but before we finished reading the actual
     * data. The data is read into ITEM_data(item) to avoid extra copying.
     */

    void   *item;     /* for commands set/add/replace */

    /* data for the swallow state */
    int    sbytes;    /* how many bytes to swallow */

    int io_queues_submitted; /* see notes on io_queue_t */
    io_queue_t io_queues[IO_QUEUE_COUNT]; /* set of deferred IO queues. */
#ifdef PROXY
    void *proxy_rctx; /* pointer to active request context */
#endif
#ifdef EXTSTORE
    unsigned int recache_counter;
#endif
    enum protocol protocol;   /* which protocol this connection speaks */
    enum network_transport transport; /* what transport is used by this connection */
    enum close_reasons close_reason; /* reason for transition into conn_closing */

    /* data for UDP clients */
    int    request_id; /* Incoming UDP request ID, if this is a UDP "connection" */
    struct sockaddr_in6 request_addr; /* udp: Who sent the most recent request */
    socklen_t request_addr_size;

    bool   noreply;   /* True if the reply should not be sent. */
    /* current stats command */
    struct {
        char *buffer;
        size_t size;
        size_t offset;
    } stats;

    /* Binary protocol stuff */
    /* This is where the binary header goes */
    protocol_binary_request_header binary_header;
    uint64_t cas; /* the cas to return */
    uint64_t tag; /* listener socket tag */
    short cmd; /* current command being processed */
    int opaque;
    int keylen;
    conn   *next;     /* Used for generating a list of conn structures */
    LIBEVENT_THREAD *thread; /* Pointer to the thread object serving this connection */
    int (*try_read_command)(conn *c); /* pointer for top level input parser */
    ssize_t (*read)(conn  *c, void *buf, size_t count);
    ssize_t (*sendmsg)(conn *c, struct msghdr *msg, int flags);
    ssize_t (*write)(conn *c, void *buf, size_t count);
};
909
/* array of conn structures, indexed by file descriptor */
extern conn **conns;

/* current time of day (updated periodically) */
extern volatile rel_time_t current_time;

#ifdef MEMCACHED_DEBUG
/* Debug-only clock controls. NOTE(review): semantics inferred from names --
 * pause/skew the internal clock for tests; confirm against the debug
 * commands that set them. */
extern volatile bool is_paused;
extern volatile int64_t delta;
#endif

/* Flag used to kick the slab rebalance machinery (see slab_rebalance below).
 * TODO: Move to slabs.h? */
extern volatile int slab_rebalance_signal;
923
/* Bookkeeping for an in-progress slab page move: the page being drained
 * (slab_start..slab_end, cursor at slab_pos) plus counters for items handled
 * along the way. NOTE(review): field meanings below inferred from names --
 * verify against slabs.c. */
struct slab_rebalance {
    void *slab_start; /* start of the page being reassigned */
    void *slab_end;   /* one past the end of the page */
    void *slab_pos;   /* current scan position within the page */
    int s_clsid;      /* source slab class id */
    int d_clsid;      /* destination slab class id */
    uint32_t busy_items;
    uint32_t rescues;
    uint32_t evictions_nomem;
    uint32_t inline_reclaim;
    uint32_t chunk_rescues;
    uint32_t busy_deletes;
    uint32_t busy_loops;
    uint8_t done;      /* nonzero once the move has completed */
    uint8_t *completed;
};

/* Global state for the single in-flight rebalance job. */
extern struct slab_rebalance slab_rebal;
#ifdef EXTSTORE
/* Opaque handle to the external storage engine. */
extern void *ext_storage;
#endif
945 /*
946 * Functions
947 */
948 void verify_default(const char* param, bool condition);
949 void do_accept_new_conns(const bool do_accept);
950 enum delta_result_type do_add_delta(LIBEVENT_THREAD *t, const char *key,
951 const size_t nkey, const bool incr,
952 const int64_t delta, char *buf,
953 uint64_t *cas, const uint32_t hv,
954 item **it_ret);
955 enum store_item_type do_store_item(item *item, int comm, LIBEVENT_THREAD *t, const uint32_t hv, int *nbytes, uint64_t *cas, const uint64_t cas_in, bool cas_stale);
956 void thread_io_queue_add(LIBEVENT_THREAD *t, int type, void *ctx, io_queue_stack_cb cb);
957 void conn_io_queue_setup(conn *c);
958 io_queue_t *conn_io_queue_get(conn *c, int type);
959 io_queue_cb_t *thread_io_queue_get(LIBEVENT_THREAD *t, int type);
960 void conn_io_queue_return(io_pending_t *io);
961 conn *conn_new(const int sfd, const enum conn_states init_state, const int event_flags, const int read_buffer_size,
962 enum network_transport transport, struct event_base *base, void *ssl, uint64_t conntag, enum protocol bproto);
963
964 void conn_worker_readd(conn *c);
965 extern int daemonize(int nochdir, int noclose);
966
967 #define mutex_lock(x) pthread_mutex_lock(x)
968 #define mutex_trylock(x) pthread_mutex_trylock(x)
969 #define mutex_unlock(x) pthread_mutex_unlock(x)
970
971 #include "stats_prefix.h"
972 #include "slabs.h"
973 #include "assoc.h"
974 #include "items.h"
975 #include "crawler.h"
976 #include "trace.h"
977 #include "hash.h"
978
979 /*
980 * Functions such as the libevent-related calls that need to do cross-thread
981 * communication in multithreaded mode (rather than actually doing the work
982 * in the current thread) are called via "dispatch_" frontends, which are
983 * also #define-d to directly call the underlying code in singlethreaded mode.
984 */
985 void memcached_thread_init(int nthreads, void *arg);
986 void redispatch_conn(conn *c);
987 void timeout_conn(conn *c);
988 #ifdef PROXY
989 void proxy_reload_notify(LIBEVENT_THREAD *t);
990 #endif
991 void return_io_pending(io_pending_t *io);
992 void dispatch_conn_new(int sfd, enum conn_states init_state, int event_flags, int read_buffer_size,
993 enum network_transport transport, void *ssl, uint64_t conntag, enum protocol bproto);
994 void sidethread_conn_close(conn *c);
995
996 /* Lock wrappers for cache functions that are called from main loop. */
997 enum delta_result_type add_delta(LIBEVENT_THREAD *t, const char *key,
998 const size_t nkey, bool incr,
999 const int64_t delta, char *buf,
1000 uint64_t *cas);
1001 void accept_new_conns(const bool do_accept);
1002 void conn_close_idle(conn *c);
1003 void conn_close_all(void);
1004 item *item_alloc(const char *key, size_t nkey, client_flags_t flags, rel_time_t exptime, int nbytes);
1005 #define DO_UPDATE true
1006 #define DONT_UPDATE false
1007 item *item_get(const char *key, const size_t nkey, LIBEVENT_THREAD *t, const bool do_update);
1008 item *item_get_locked(const char *key, const size_t nkey, LIBEVENT_THREAD *t, const bool do_update, uint32_t *hv);
1009 item *item_touch(const char *key, const size_t nkey, uint32_t exptime, LIBEVENT_THREAD *t);
1010 int item_link(item *it);
1011 void item_remove(item *it);
1012 int item_replace(item *it, item *new_it, const uint32_t hv, const uint64_t cas_in);
1013 void item_unlink(item *it);
1014
1015 void item_lock(uint32_t hv);
1016 void *item_trylock(uint32_t hv);
1017 void item_trylock_unlock(void *arg);
1018 void item_unlock(uint32_t hv);
1019 void pause_threads(enum pause_thread_types type);
1020 void stop_threads(void);
1021 int stop_conn_timeout_thread(void);
/* Adjust an item's reference count; evaluates to the new count.
 * Arguments are fully parenthesized so pointer expressions such as
 * refcount_incr(arr + i) expand correctly ('->' binds tighter than '+').
 * NOTE(review): plain ++/-- is not atomic -- callers are expected to hold
 * the appropriate lock; confirm against the item-lock discipline. */
#define refcount_incr(it) ++((it)->refcount)
#define refcount_decr(it) --((it)->refcount)
/* Global lock serializing access to the process-wide stats structure. */
void STATS_LOCK(void);
void STATS_UNLOCK(void);
/* Lock a worker thread's private stats structure ((t) parenthesized for
 * safe expansion with any pointer expression). */
#define THR_STATS_LOCK(t) pthread_mutex_lock(&(t)->stats.mutex)
#define THR_STATS_UNLOCK(t) pthread_mutex_unlock(&(t)->stats.mutex)
void threadlocal_stats_reset(void);
/* Sums every worker thread's private counters into *stats. */
void threadlocal_stats_aggregate(struct thread_stats *stats);
void slab_stats_aggregate(struct thread_stats *stats, struct slab_stats *out);
void thread_setname(pthread_t thread, const char *name);
LIBEVENT_THREAD *get_worker_thread(int id);

/* Stat processing functions */
/* Formats one name/value pair (printf-style fmt) and feeds it to the
 * add_stats callback. */
void append_stat(const char *name, ADD_STAT add_stats, conn *c,
                 const char *fmt, ...);

/* Front-end to do_store_item() (cf. the "Lock wrappers" group above). */
enum store_item_type store_item(item *item, int comm, LIBEVENT_THREAD *t, int *nbytes, uint64_t *cas, const uint64_t cas_in, bool cas_stale);

/* Protocol related code */
/* Queues a simple text response line on the connection. */
void out_string(conn *c, const char *str);
/* Largest relative expiration delta accepted by the protocol: 30 days in
 * seconds; larger exptimes are interpreted as absolute unix timestamps.
 * Parenthesized so it composes safely in expressions -- the bare form
 * 60*60*24*30 mis-parses under division (x / 60*60*24*30). */
#define REALTIME_MAXDELTA (60*60*24*30)
/* Negative exptimes can underflow and end up immortal. realtime() will
   immediately expire values that are greater than REALTIME_MAXDELTA, but less
   than process_started, so lets aim for that. */
/* Fully parenthesized: without the outer parens, an expression like
 * EXPTIME_TO_POSITIVE_TIME(e) + 5 would fold the "+ 5" into the ternary's
 * else-branch only. */
#define EXPTIME_TO_POSITIVE_TIME(exptime) (((exptime) < 0) ? \
        (REALTIME_MAXDELTA + 1) : (exptime))
/* Converts a protocol expiration time into the server's rel_time_t. */
rel_time_t realtime(const time_t exptime);
/* Like item_get(), but can also touch the expiration (should_touch) and
 * reports an overflow condition through *overflow. */
item* limited_get(const char *key, size_t nkey, LIBEVENT_THREAD *t, uint32_t exptime, bool should_touch, bool do_update, bool *overflow);
item* limited_get_locked(const char *key, size_t nkey, LIBEVENT_THREAD *t, bool do_update, uint32_t *hv, bool *overflow);
// Read/Response object handlers.
void resp_reset(mc_resp *resp);
void resp_add_iov(mc_resp *resp, const void *buf, int len);
/* Adds an iov that may reference chunked item data. */
void resp_add_chunked_iov(mc_resp *resp, const void *buf, int len);
/* Starts a new response on c's response stack; bool result presumably
 * signals allocation failure -- confirm at call sites. */
bool resp_start(conn *c);
/* As resp_start(), but the response is not linked onto c's stack. */
mc_resp *resp_start_unlinked(conn *c);
/* Completes a response; returns the next response in the stack. */
mc_resp* resp_finish(conn *c, mc_resp *resp);
void resp_free(LIBEVENT_THREAD *th, mc_resp *resp);
bool resp_has_stack(conn *c);
/* Swaps c->rbuf for a malloc'ed buffer (see conn.rbuf_malloced). */
bool rbuf_switch_to_malloc(conn *c);
void conn_release_items(conn *c);
void conn_set_state(conn *c, enum conn_states state);
void out_of_memory(conn *c, char *ascii_error);
void out_errstring(conn *c, const char *str);
/* Sends buf to the client, then frees it. */
void write_and_free(conn *c, char *buf, int bytes);
void server_stats(ADD_STAT add_stats, void *c);
/* ADD_STAT-compatible callback: appends one key/value pair to the reply. */
void append_stats(const char *key, const uint16_t klen,
                  const char *val, const uint32_t vlen,
                  const void *cookie);
/** Return a datum for stats in binary protocol */
bool get_stats(const char *stat_type, int nkey, ADD_STAT add_stats, void *c);
void stats_reset(void);
void process_stat_settings(ADD_STAT add_stats, void *c);
void process_stats_conns(ADD_STAT add_stats, void *c);
1075
/* Privilege-drop hooks; compiled to empty no-op macros when the platform
 * lacks support, so call sites need no #ifdefs. */
#if HAVE_DROP_PRIVILEGES
extern void setup_privilege_violations_handler(void);
extern void drop_privileges(void);
#else
#define setup_privilege_violations_handler()
#define drop_privileges()
#endif

/* Same pattern for the per-worker-thread privilege drop. */
#if HAVE_DROP_WORKER_PRIVILEGES
extern void drop_worker_privileges(void);
#else
#define drop_worker_privileges()
#endif
1089
/* If supported, give compiler hints for branch prediction. */
#if !defined(__GNUC__) || (__GNUC__ == 2 && __GNUC_MINOR__ < 96)
/* Non-GNU or pre-2.96 GCC: make __builtin_expect a transparent no-op. */
#define __builtin_expect(x, expected_value) (x)
#endif

/* Annotate a condition as (un)likely so the compiler lays out the hot path. */
#define likely(x) __builtin_expect((x),1)
#define unlikely(x) __builtin_expect((x),0)
1097