1 /* -*- Mode: C; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2 
3 /** \file
4  * The main memcached header holding commonly used data
5  * structures and function prototypes.
6  */
7 
8 #ifdef HAVE_CONFIG_H
9 #include "config.h"
10 #endif
11 
12 #include <sys/types.h>
13 #include <sys/socket.h>
14 #include <sys/time.h>
15 #include <netinet/in.h>
16 #include <event.h>
17 #include <netdb.h>
18 #include <pthread.h>
19 #include <unistd.h>
20 #include <assert.h>
21 #include <grp.h>
22 #include <signal.h>
23 /* need this to get IOV_MAX on some platforms. */
24 #ifndef __need_IOV_MAX
25 #define __need_IOV_MAX
26 #endif
27 #include <limits.h>
28 /* FreeBSD 4.x doesn't have IOV_MAX exposed. */
29 #ifndef IOV_MAX
30 #if defined(__FreeBSD__) || defined(__APPLE__) || defined(__GNU__)
31 # define IOV_MAX 1024
/* GNU/Hurd doesn't set MAXPATHLEN
 * http://www.gnu.org/software/hurd/hurd/porting/guidelines.html#PATH_MAX_tt_MAX_PATH_tt_MAXPATHL */
34 #ifndef MAXPATHLEN
35 #define MAXPATHLEN 4096
36 #endif
37 #endif
38 #endif
39 
40 #if defined(__linux__)
41 # define SOCK_COOKIE_ID SO_MARK
42 #elif defined(__FreeBSD__)
43 # define SOCK_COOKIE_ID SO_USER_COOKIE
44 #elif defined(__OpenBSD__)
45 # define SOCK_COOKIE_ID SO_RTABLE
46 #endif
47 
48 #include "itoa_ljust.h"
49 #include "protocol_binary.h"
50 #include "cache.h"
51 #include "logger.h"
52 #include "queue.h"
53 #include "util.h"
54 
55 #ifdef EXTSTORE
56 #include "crc32c.h"
57 #endif
58 
59 #include "sasl_defs.h"
60 
61 /* for NAPI pinning feature */
62 #ifndef SO_INCOMING_NAPI_ID
63 #define SO_INCOMING_NAPI_ID 56
64 #endif
65 
66 /** Maximum length of a key. */
67 #define KEY_MAX_LENGTH 250
68 
69 /** Maximum length of a uri encoded key. */
70 #define KEY_MAX_URI_ENCODED_LENGTH (KEY_MAX_LENGTH  * 3 + 1)
71 
72 /** Size of an incr buf. */
73 #define INCR_MAX_STORAGE_LEN 24
74 
/* Network buffer sizing. */
#define WRITE_BUFFER_SIZE 1024
#define READ_BUFFER_SIZE 16384
#define READ_BUFFER_CACHED 0
#define UDP_READ_BUFFER_SIZE 65536
#define UDP_MAX_PAYLOAD_SIZE 1400
#define UDP_HEADER_SIZE 8
/* Usable datagram payload. Derived instead of hard-coded (was 1392) so
 * it can never drift out of sync with the two constants above. */
#define UDP_DATA_SIZE (UDP_MAX_PAYLOAD_SIZE - UDP_HEADER_SIZE)
#define MAX_SENDBUF_SIZE (256 * 1024 * 1024)
83 
84 /* Binary protocol stuff */
85 #define BIN_MAX_EXTLEN 20 // length of the _incr command is currently the longest.
86 
87 /* Initial power multiplier for the hash table */
88 #define HASHPOWER_DEFAULT 16
89 #define HASHPOWER_MAX 32
90 
91 /* Abstract the size of an item's client flag suffix */
92 #ifdef LARGE_CLIENT_FLAGS
93 typedef uint64_t client_flags_t;
94 #define safe_strtoflags safe_strtoull
95 #else
96 typedef uint32_t client_flags_t;
97 #define safe_strtoflags safe_strtoul
98 #endif
99 
100 /*
101  * We only reposition items in the LRU queue if they haven't been repositioned
102  * in this many seconds. That saves us from churning on frequently-accessed
103  * items.
104  */
105 #define ITEM_UPDATE_INTERVAL 60
106 
/*
 * Valid range of the maximum size of an item, in bytes.
 */
#define ITEM_SIZE_MAX_LOWER_LIMIT 1024
/* Parenthesized so the macro stays correct inside larger expressions
 * (e.g. "x / ITEM_SIZE_MAX_UPPER_LIMIT" would otherwise misparse). */
#define ITEM_SIZE_MAX_UPPER_LIMIT (1024 * 1024 * 1024)
112 
113 /* Slab sizing definitions. */
114 #define POWER_SMALLEST 1
115 #define POWER_LARGEST  256 /* actual cap is 255 */
116 #define SLAB_GLOBAL_PAGE_POOL 0 /* magic slab class for storing pages for reassignment */
117 #define CHUNK_ALIGN_BYTES 8
118 /* slab class max is a 6-bit number, -1. */
119 #define MAX_NUMBER_OF_SLAB_CLASSES (63 + 1)
120 
121 /** How long an object can reasonably be assumed to be locked before
122     harvesting it on a low memory condition. Default: disabled. */
123 #define TAIL_REPAIR_TIME_DEFAULT 0
124 
/* Item accessor macros.
 * warning: don't use these macros with a function call as the argument,
 * as some evaluate their argument more than once. */

/** CAS value of an item, or 0 when the item carries no CAS field. */
#define ITEM_get_cas(i) (((i)->it_flags & ITEM_CAS) ? \
        (i)->data->cas : (uint64_t)0)

/** Set the CAS value; a no-op when the item has no CAS field.
 * do/while(0) makes the macro expand to a single statement so it is
 * safe inside unbraced if/else bodies. */
#define ITEM_set_cas(i,v) do { \
    if ((i)->it_flags & ITEM_CAS) { \
        (i)->data->cas = v; \
    } \
} while (0)

/** Start of the key: the optional 8-byte CAS value precedes it. */
#define ITEM_key(item) (((char*)&((item)->data)) \
         + (((item)->it_flags & ITEM_CAS) ? sizeof(uint64_t) : 0))

/** Suffix area: after the optional CAS, the key and its terminating null. */
#define ITEM_suffix(item) ((char*) &((item)->data) + (item)->nkey + 1 \
         + (((item)->it_flags & ITEM_CAS) ? sizeof(uint64_t) : 0))

/** Payload: after optional CAS, key + null, and optional client flags. */
#define ITEM_data(item) ((char*) &((item)->data) + (item)->nkey + 1 \
         + (((item)->it_flags & ITEM_CFLAGS) ? sizeof(client_flags_t) : 0) \
         + (((item)->it_flags & ITEM_CAS) ? sizeof(uint64_t) : 0))

/** Total bytes the item occupies: header plus all optional fields. */
#define ITEM_ntotal(item) (sizeof(struct _stritem) + (item)->nkey + 1 \
         + (item)->nbytes \
         + (((item)->it_flags & ITEM_CFLAGS) ? sizeof(client_flags_t) : 0) \
         + (((item)->it_flags & ITEM_CAS) ? sizeof(uint64_t) : 0))

/* slabs_clsid packs the LRU id into its top two bits; split them out. */
#define ITEM_clsid(item) ((item)->slabs_clsid & ~(3<<6))
#define ITEM_lruid(item) ((item)->slabs_clsid & (3<<6))
152 
153 #define STAT_KEY_LEN 128
154 #define STAT_VAL_LEN 128
155 
/** Append a simple stat with a stat name, value format and value.
 * Wrapped in do/while(0) so the macro expands to exactly one statement
 * and behaves correctly inside unbraced if/else bodies. Requires
 * add_stats and c to be in scope at the expansion site. */
#define APPEND_STAT(name, fmt, val) \
    do { append_stat(name, add_stats, c, fmt, val); } while (0)

/** Append an indexed stat with a stat name (with format), value format
    and value. Requires key_str/val_str/klen/vlen/add_stats/c in scope at
    the expansion site. do/while(0) keeps all three statements under a
    single conditional when used in an unbraced if body. */
#define APPEND_NUM_FMT_STAT(name_fmt, num, name, fmt, val)              \
    do {                                                                \
        klen = snprintf(key_str, STAT_KEY_LEN, name_fmt, num, name);    \
        vlen = snprintf(val_str, STAT_VAL_LEN, fmt, val);               \
        add_stats(key_str, klen, val_str, vlen, c);                     \
    } while (0)

/** Common APPEND_NUM_FMT_STAT format. */
#define APPEND_NUM_STAT(num, name, fmt, val) \
    APPEND_NUM_FMT_STAT("%d:%s", num, name, fmt, val)
170 
/** Extract an item's client flags into "flag"; yields 0 when the item
 * stores no client flags (ITEM_CFLAGS unset). do/while(0) makes the
 * macro a single statement, safe inside unbraced if/else bodies. */
#define FLAGS_CONV(it, flag) do { \
    if ((it)->it_flags & ITEM_CFLAGS) { \
        flag = *((client_flags_t *)ITEM_suffix((it))); \
    } else { \
        flag = 0; \
    } \
} while (0)

/** Bytes the optional client-flags field occupies within an item. */
#define FLAGS_SIZE(item) (((item)->it_flags & ITEM_CFLAGS) ? sizeof(client_flags_t) : 0)
181 
182 /**
183  * Callback for any function producing stats.
184  *
185  * @param key the stat's key
186  * @param klen length of the key
187  * @param val the stat's value in an ascii form (e.g. text form of a number)
188  * @param vlen length of the value
 * @param cookie magic callback cookie
190  */
191 typedef void (*ADD_STAT)(const char *key, const uint16_t klen,
192                          const char *val, const uint32_t vlen,
193                          const void *cookie);
194 
195 /*
196  * NOTE: If you modify this table you _MUST_ update the function state_text
197  */
198 /**
199  * Possible states of a connection.
200  */
201 enum conn_states {
202     conn_listening,  /**< the socket which listens for connections */
203     conn_new_cmd,    /**< Prepare connection for next command */
204     conn_waiting,    /**< waiting for a readable socket */
205     conn_read,       /**< reading in a command line */
206     conn_parse_cmd,  /**< try to parse a command from the input buffer */
207     conn_write,      /**< writing out a simple response */
208     conn_nread,      /**< reading in a fixed number of bytes */
209     conn_swallow,    /**< swallowing unnecessary bytes w/o storing */
210     conn_closing,    /**< closing this connection */
211     conn_mwrite,     /**< writing out many items sequentially */
212     conn_closed,     /**< connection is closed */
213     conn_watch,      /**< held by the logger thread as a watcher */
214     conn_io_queue,   /**< wait on async. process to get response object */
215     conn_io_resume,  /**< ready to resume mwrite after async work */
216     conn_io_pending, /**< got woken up while waiting for async work */
217     conn_max_state   /**< Max state value (used for assertion) */
218 };
219 
220 enum bin_substates {
221     bin_no_state,
222     bin_reading_set_header,
223     bin_reading_cas_header,
224     bin_read_set_value,
225     bin_reading_get_key,
226     bin_reading_stat,
227     bin_reading_del_header,
228     bin_reading_incr_header,
229     bin_read_flush_exptime,
230     bin_reading_sasl_auth,
231     bin_reading_sasl_auth_data,
232     bin_reading_touch_key,
233 };
234 
235 enum protocol {
236     ascii_prot = 3, /* arbitrary value. */
237     binary_prot,
238     negotiating_prot, /* Discovering the protocol */
239 #ifdef PROXY
240     proxy_prot,
241 #endif
242 };
243 
244 enum network_transport {
245     local_transport, /* Unix sockets*/
246     tcp_transport,
247     udp_transport
248 };
249 
250 enum pause_thread_types {
251     PAUSE_WORKER_THREADS = 0,
252     PAUSE_ALL_THREADS,
253     RESUME_ALL_THREADS,
254     RESUME_WORKER_THREADS
255 };
256 
257 enum stop_reasons {
258     NOT_STOP,
259     GRACE_STOP,
260     EXIT_NORMALLY
261 };
262 
263 enum close_reasons {
264     ERROR_CLOSE,
265     NORMAL_CLOSE,
266     IDLE_TIMEOUT_CLOSE,
267     SHUTDOWN_CLOSE,
268 };
269 
/* Transport predicates; argument parenthesized so any expression can be
 * passed safely. */
#define IS_TCP(x) ((x) == tcp_transport)
#define IS_UDP(x) ((x) == udp_transport)

/* Storage-command variants handled by the nread state machine. */
#define NREAD_ADD 1
#define NREAD_SET 2
#define NREAD_REPLACE 3
#define NREAD_APPEND 4
#define NREAD_PREPEND 5
#define NREAD_CAS 6
#define NREAD_APPENDVIV 7 // specific to meta
#define NREAD_PREPENDVIV 8 // specific to meta

/* Whether a CAS compare is allowed to match a stale value. */
#define CAS_ALLOW_STALE true
#define CAS_NO_STALE false

/* Deletion log entry types. */
#define LOG_TYPE_DELETE 1
#define LOG_TYPE_META_DELETE 2
287 
/** Result of an attempt to store an item. */
enum store_item_type {
    NOT_STORED=0, STORED, EXISTS, NOT_FOUND, TOO_LARGE, NO_MEMORY
};

/** Result of an incr/decr (arithmetic delta) operation. */
enum delta_result_type {
    OK, NON_NUMERIC, EOM, DELTA_ITEM_NOT_FOUND, DELTA_ITEM_CAS_MISMATCH
};
295 
296 /** Time relative to server start. Smaller than time_t on 64-bit systems. */
297 // TODO: Move to sub-header. needed in logger.h
298 //typedef unsigned int rel_time_t;
299 
300 /** Use X macros to avoid iterating over the stats fields during reset and
301  * aggregation. No longer have to add new stats in 3+ places.
302  */
303 
304 #define SLAB_STATS_FIELDS \
305     X(set_cmds) \
306     X(get_hits) \
307     X(touch_hits) \
308     X(delete_hits) \
309     X(cas_hits) \
310     X(cas_badval) \
311     X(incr_hits) \
312     X(decr_hits)
313 
314 /** Stats stored per slab (and per thread). */
315 struct slab_stats {
316 #define X(name) uint64_t    name;
317     SLAB_STATS_FIELDS
318 #undef X
319 };
320 
321 #define THREAD_STATS_FIELDS \
322     X(get_cmds) \
323     X(get_misses) \
324     X(get_expired) \
325     X(get_flushed) \
326     X(touch_cmds) \
327     X(touch_misses) \
328     X(delete_misses) \
329     X(incr_misses) \
330     X(decr_misses) \
331     X(cas_misses) \
332     X(meta_cmds) \
333     X(bytes_read) \
334     X(bytes_written) \
335     X(flush_cmds) \
336     X(conn_yields) /* # of yields for connections (-R option)*/ \
337     X(auth_cmds) \
338     X(auth_errors) \
339     X(idle_kicks) /* idle connections killed */ \
340     X(response_obj_oom) \
341     X(response_obj_count) \
342     X(response_obj_bytes) \
343     X(read_buf_oom) \
344     X(store_too_large) \
345     X(store_no_memory)
346 
347 #ifdef EXTSTORE
348 #define EXTSTORE_THREAD_STATS_FIELDS \
349     X(get_extstore) \
350     X(get_aborted_extstore) \
351     X(get_oom_extstore) \
352     X(recache_from_extstore) \
353     X(miss_from_extstore) \
354     X(badcrc_from_extstore)
355 #endif
356 
357 #ifdef PROXY
358 #define PROXY_THREAD_STATS_FIELDS \
359     X(proxy_conn_requests) \
360     X(proxy_conn_errors) \
361     X(proxy_conn_oom) \
362     X(proxy_req_active)
363 #endif
364 
365 /**
366  * Stats stored per-thread.
367  */
368 struct thread_stats {
369     pthread_mutex_t   mutex;
370 #define X(name) uint64_t    name;
371     THREAD_STATS_FIELDS
372 #ifdef EXTSTORE
373     EXTSTORE_THREAD_STATS_FIELDS
374 #endif
375 #ifdef PROXY
376     PROXY_THREAD_STATS_FIELDS
377 #endif
378 #undef X
379     struct slab_stats slab_stats[MAX_NUMBER_OF_SLAB_CLASSES];
380     uint64_t lru_hits[POWER_LARGEST];
381     uint64_t read_buf_count;
382     uint64_t read_buf_bytes;
383     uint64_t read_buf_bytes_free;
384 };
385 
386 /**
387  * Global stats. Only resettable stats should go into this structure.
388  */
389 struct stats {
390     uint64_t      total_items;
391     uint64_t      total_conns;
392     uint64_t      rejected_conns;
393     uint64_t      malloc_fails;
394     uint64_t      listen_disabled_num;
395     uint64_t      slabs_moved;       /* times slabs were moved around */
396     uint64_t      slab_reassign_rescues; /* items rescued during slab move */
397     uint64_t      slab_reassign_evictions_nomem; /* valid items lost during slab move */
398     uint64_t      slab_reassign_inline_reclaim; /* valid items lost during slab move */
399     uint64_t      slab_reassign_chunk_rescues; /* chunked-item chunks recovered */
400     uint64_t      slab_reassign_busy_items; /* valid temporarily unmovable */
401     uint64_t      slab_reassign_busy_deletes; /* refcounted items killed */
402     uint64_t      lru_crawler_starts; /* Number of item crawlers kicked off */
403     uint64_t      lru_maintainer_juggles; /* number of LRU bg pokes */
404     uint64_t      time_in_listen_disabled_us;  /* elapsed time in microseconds while server unable to process new connections */
405     uint64_t      log_worker_dropped; /* logs dropped by worker threads */
406     uint64_t      log_worker_written; /* logs written by worker threads */
407     uint64_t      log_watcher_skipped; /* logs watchers missed */
408     uint64_t      log_watcher_sent; /* logs sent to watcher buffers */
409 #ifdef EXTSTORE
410     uint64_t      extstore_compact_lost; /* items lost because they were locked */
411     uint64_t      extstore_compact_rescues; /* items re-written during compaction */
412     uint64_t      extstore_compact_skipped; /* unhit items skipped during compaction */
413     uint64_t      extstore_compact_resc_cold; /* items re-written during compaction */
414     uint64_t      extstore_compact_resc_old; /* items re-written during compaction */
415 #endif
416 #ifdef TLS
417     uint64_t      ssl_proto_errors; /* TLS failures during SSL_read() and SSL_write() calls */
418     uint64_t      ssl_handshake_errors; /* TLS failures at accept/handshake time */
419     uint64_t      ssl_new_sessions; /* successfully negotiated new (non-reused) TLS sessions */
420 #endif
421     struct timeval maxconns_entered;  /* last time maxconns entered */
422     uint64_t      unexpected_napi_ids;  /* see doc/napi_ids.txt */
423     uint64_t      round_robin_fallback; /* see doc/napi_ids.txt */
424 };
425 
426 /**
427  * Global "state" stats. Reflects state that shouldn't be wiped ever.
428  * Ordered for some cache line locality for commonly updated counters.
429  */
430 struct stats_state {
431     uint64_t      curr_items;
432     uint64_t      curr_bytes;
433     uint64_t      curr_conns;
434     uint64_t      hash_bytes;       /* size used for hash tables */
435     unsigned int  conn_structs;
436     unsigned int  reserved_fds;
437     unsigned int  hash_power_level; /* Better hope it's not over 9000 */
438     unsigned int  log_watchers; /* number of currently active watchers */
439     bool          hash_is_expanding; /* If the hash table is being expanded */
440     bool          accepting_conns;  /* whether we are currently accepting */
441     bool          slab_reassign_running; /* slab reassign in progress */
442     bool          lru_crawler_running; /* crawl in progress */
443 };
444 
445 #define MAX_VERBOSITY_LEVEL 2
446 
447 /* When adding a setting, be sure to update process_stat_settings */
448 /**
449  * Globally accessible settings as derived from the commandline.
450  */
451 struct settings {
452     size_t maxbytes;
453     int maxconns;
454     int port;
455     int udpport;
456     char *inter;
457     int verbose;
458     rel_time_t oldest_live; /* ignore existing items older than this */
459     int evict_to_free;
460     char *socketpath;   /* path to unix socket if using local socket */
461     char *auth_file;    /* path to user authentication file */
462     int access;  /* access mask (a la chmod) for unix domain socket */
463     double factor;          /* chunk size growth factor */
464     int chunk_size;
465     int num_threads;        /* number of worker (without dispatcher) libevent threads to run */
466     int num_threads_per_udp; /* number of worker threads serving each udp socket */
467     char prefix_delimiter;  /* character that marks a key prefix (for stats) */
468     int detail_enabled;     /* nonzero if we're collecting detailed stats */
469     int reqs_per_event;     /* Maximum number of io to process on each
470                                io-event. */
471     bool use_cas;
472     enum protocol binding_protocol;
473     int backlog;
474     int item_size_max;        /* Maximum item size */
475     int slab_chunk_size_max;  /* Upper end for chunks within slab pages. */
476     int slab_page_size;     /* Slab's page units. */
477     volatile sig_atomic_t sig_hup;  /* a HUP signal was received but not yet handled */
478     bool sasl;              /* SASL on/off */
479     bool maxconns_fast;     /* Whether or not to early close connections */
480     bool lru_crawler;        /* Whether or not to enable the autocrawler thread */
481     bool lru_maintainer_thread; /* LRU maintainer background thread */
482     bool lru_segmented;     /* Use split or flat LRU's */
483     bool slab_reassign;     /* Whether or not slab reassignment is allowed */
484     bool ssl_enabled; /* indicates whether SSL is enabled */
485     int slab_automove;     /* Whether or not to automatically move slabs */
486     double slab_automove_ratio; /* youngest must be within pct of oldest */
487     unsigned int slab_automove_window; /* window mover for algorithm */
488     int hashpower_init;     /* Starting hash power level */
489     bool shutdown_command; /* allow shutdown command */
490     int tail_repair_time;   /* LRU tail refcount leak repair time */
491     bool flush_enabled;     /* flush_all enabled */
492     bool dump_enabled;      /* whether cachedump/metadump commands work */
493     char *hash_algorithm;     /* Hash algorithm in use */
494     int lru_crawler_sleep;  /* Microsecond sleep between items */
495     uint32_t lru_crawler_tocrawl; /* Number of items to crawl per run */
496     int hot_lru_pct; /* percentage of slab space for HOT_LRU */
497     int warm_lru_pct; /* percentage of slab space for WARM_LRU */
498     double hot_max_factor; /* HOT tail age relative to COLD tail */
499     double warm_max_factor; /* WARM tail age relative to COLD tail */
500     int crawls_persleep; /* Number of LRU crawls to run before sleeping */
501     bool temp_lru; /* TTL < temporary_ttl uses TEMP_LRU */
502     uint32_t temporary_ttl; /* temporary LRU threshold */
503     int idle_timeout;       /* Number of seconds to let connections idle */
504     unsigned int logger_watcher_buf_size; /* size of logger's per-watcher buffer */
505     unsigned int logger_buf_size; /* size of per-thread logger buffer */
506     unsigned int read_buf_mem_limit; /* total megabytes allowable for net buffers */
507     bool drop_privileges;   /* Whether or not to drop unnecessary process privileges */
508     bool watch_enabled; /* allows watch commands to be dropped */
509     bool relaxed_privileges;   /* Relax process restrictions when running testapp */
510 #ifdef EXTSTORE
511     unsigned int ext_io_threadcount; /* number of IO threads to run. */
512     unsigned int ext_page_size; /* size in megabytes of storage pages. */
513     unsigned int ext_item_size; /* minimum size of items to store externally */
514     unsigned int ext_item_age; /* max age of tail item before storing ext. */
515     unsigned int ext_low_ttl; /* remaining TTL below this uses own pages */
516     unsigned int ext_recache_rate; /* counter++ % recache_rate == 0 > recache */
517     unsigned int ext_wbuf_size; /* read only note for the engine */
518     unsigned int ext_compact_under; /* when fewer than this many pages, compact */
519     unsigned int ext_drop_under; /* when fewer than this many pages, drop COLD items */
520     unsigned int ext_max_sleep; /* maximum sleep time for extstore bg threads, in us */
521     double ext_max_frag; /* ideal maximum page fragmentation */
522     double slab_automove_freeratio; /* % of memory to hold free as buffer */
523     bool ext_drop_unread; /* skip unread items during compaction */
524     /* start flushing to extstore after memory below this */
525     unsigned int ext_global_pool_min;
526 #endif
527 #ifdef TLS
528     void *ssl_ctx; /* holds the SSL server context which has the server certificate */
529     char *ssl_chain_cert; /* path to the server SSL chain certificate */
530     char *ssl_key; /* path to the server key */
531     int ssl_verify_mode; /* client certificate verify mode */
532     int ssl_keyformat; /* key format , default is PEM */
533     char *ssl_ciphers; /* list of SSL ciphers */
534     char *ssl_ca_cert; /* certificate with CAs. */
535     rel_time_t ssl_last_cert_refresh_time; /* time of the last server certificate refresh */
536     unsigned int ssl_wbuf_size; /* size of the write buffer used by ssl_sendmsg method */
537     bool ssl_session_cache; /* enable SSL server session caching */
538     bool ssl_kernel_tls; /* enable server kTLS */
539     int ssl_min_version; /* minimum SSL protocol version to accept */
540 #endif
541     int num_napi_ids;   /* maximum number of NAPI IDs */
542     char *memory_file;  /* warm restart memory file path */
543 #ifdef PROXY
544     bool proxy_enabled;
545     bool proxy_uring; /* if the proxy should use io_uring */
546     bool proxy_memprofile; /* output detail of lua allocations */
547     char *proxy_startfile; /* lua file to run when workers start */
548     char *proxy_startarg; /* string argument to pass to proxy */
549     void *proxy_ctx; /* proxy's state context */
550 #endif
551 #ifdef SOCK_COOKIE_ID
552     uint32_t sock_cookie_id;
553 #endif
554 };
555 
556 extern struct stats stats;
557 extern struct stats_state stats_state;
558 extern time_t process_started;
559 extern struct settings settings;
560 
561 #define ITEM_LINKED 1
562 #define ITEM_CAS 2
563 
564 /* temp */
565 #define ITEM_SLABBED 4
566 
567 /* Item was fetched at least once in its lifetime */
568 #define ITEM_FETCHED 8
569 /* Appended on fetch, removed on LRU shuffling */
570 #define ITEM_ACTIVE 16
571 /* If an item's storage are chained chunks. */
572 #define ITEM_CHUNKED 32
573 #define ITEM_CHUNK 64
574 /* ITEM_data bulk is external to item */
575 #define ITEM_HDR 128
576 /* additional 4 bytes for item client flags */
577 #define ITEM_CFLAGS 256
578 /* item has sent out a token already */
579 #define ITEM_TOKEN_SENT 512
580 /* reserved, in case tokens should be a 2-bit count in future */
581 #define ITEM_TOKEN_RESERVED 1024
582 /* if item has been marked as a stale value */
583 #define ITEM_STALE 2048
584 /* if item key was sent in binary */
585 #define ITEM_KEY_BINARY 4096
586 
587 /**
588  * Structure for storing items within memcached.
589  */
590 typedef struct _stritem {
591     /* Protected by LRU locks */
592     struct _stritem *next;
593     struct _stritem *prev;
594     /* Rest are protected by an item lock */
595     struct _stritem *h_next;    /* hash chain next */
596     rel_time_t      time;       /* least recent access */
597     rel_time_t      exptime;    /* expire time */
598     int             nbytes;     /* size of data */
599     unsigned short  refcount;
600     uint16_t        it_flags;   /* ITEM_* above */
601     uint8_t         slabs_clsid;/* which slab class we're in */
602     uint8_t         nkey;       /* key length, w/terminating null and padding */
603     /* this odd type prevents type-punning issues when we do
604      * the little shuffle to save space when not using CAS. */
605     union {
606         uint64_t cas;
607         char end;
608     } data[];
609     /* if it_flags & ITEM_CAS we have 8 bytes CAS */
610     /* then null-terminated key */
611     /* then " flags length\r\n" (no terminating null) */
612     /* then data with terminating \r\n (no terminating null; it's binary!) */
613 } item;
614 
615 // TODO: If we eventually want user loaded modules, we can't use an enum :(
616 enum crawler_run_type {
617     CRAWLER_AUTOEXPIRE=0, CRAWLER_EXPIRED, CRAWLER_METADUMP, CRAWLER_MGDUMP
618 };
619 
/* Pseudo-item used by the LRU crawler. The leading fields mirror the
 * header of struct _stritem field-for-field (through nkey), which
 * appears intended to let a crawler be linked into an item LRU list in
 * place of a real item -- keep the two layouts in sync. */
typedef struct {
    struct _stritem *next;
    struct _stritem *prev;
    struct _stritem *h_next;    /* hash chain next */
    rel_time_t      time;       /* least recent access */
    rel_time_t      exptime;    /* expire time */
    int             nbytes;     /* size of data */
    unsigned short  refcount;
    uint16_t        it_flags;   /* ITEM_* above */
    uint8_t         slabs_clsid;/* which slab class we're in */
    uint8_t         nkey;       /* key length, w/terminating null and padding */
    uint32_t        remaining;  /* Max keys to crawl per slab per invocation */
    uint64_t        reclaimed;  /* items reclaimed during this crawl. */
    uint64_t        unfetched;  /* items reclaimed unfetched during this crawl. */
    uint64_t        checked;    /* items examined during this crawl. */
} crawler;
636 
637 /* Header when an item is actually a chunk of another item. */
638 typedef struct _strchunk {
639     struct _strchunk *next;     /* points within its own chain. */
640     struct _strchunk *prev;     /* can potentially point to the head. */
641     struct _stritem  *head;     /* always points to the owner chunk */
642     int              size;      /* available chunk space in bytes */
643     int              used;      /* chunk space used */
644     int              nbytes;    /* used. */
645     unsigned short   refcount;  /* used? */
646     uint16_t         it_flags;  /* ITEM_* above. */
647     uint8_t          slabs_clsid; /* Same as above. */
648     uint8_t          orig_clsid; /* For obj hdr chunks slabs_clsid is fake. */
649     char data[];
650 } item_chunk;
651 
#ifdef NEED_ALIGN
/* Start of an item's chunk data, rounded up to an 8-byte boundary for
 * platforms that cannot perform unaligned accesses. Skips the optional
 * CAS value, the optional client flags, and the null-terminated key. */
static inline char *ITEM_schunk(item *it) {
    int offset = it->nkey + 1
        + ((it->it_flags & ITEM_CFLAGS) ? sizeof(client_flags_t) : 0)
        + ((it->it_flags & ITEM_CAS) ? sizeof(uint64_t) : 0);
    int remain = offset % 8;
    if (remain != 0) {
        offset += 8 - remain;
    }
    return ((char *) &(it->data)) + offset;
}
#else
/* Platforms tolerant of unaligned access: chunk data starts directly
 * after the (optional CAS, optional flags, key + null) header fields. */
#define ITEM_schunk(item) ((char*) &((item)->data) + (item)->nkey + 1 \
         + (((item)->it_flags & ITEM_CFLAGS) ? sizeof(client_flags_t) : 0) \
         + (((item)->it_flags & ITEM_CAS) ? sizeof(uint64_t) : 0))
#endif
668 
669 #ifdef EXTSTORE
670 typedef struct {
671     unsigned int page_version; /* from IO header */
672     unsigned int offset; /* from IO header */
673     unsigned short page_id; /* from IO header */
674 } item_hdr;
675 #endif
676 
677 #define IO_QUEUE_COUNT 3
678 
679 #define IO_QUEUE_NONE 0
680 #define IO_QUEUE_EXTSTORE 1
681 #define IO_QUEUE_PROXY 2
682 
683 typedef STAILQ_HEAD(iop_head_s, _io_pending_t) iop_head_t;
684 typedef struct _io_pending_t io_pending_t;
685 typedef struct io_queue_s io_queue_t;
686 typedef void (*io_queue_stack_cb)(io_queue_t *q);
687 typedef void (*io_queue_cb)(io_pending_t *pending);
688 // This structure used to be passed between threads, but is now owned entirely
689 // by the worker threads.
690 // IO pending objects are created and stacked into this structure. They are
691 // then sent off to remote threads.
692 // The objects are returned one at a time to the worker threads, and this
693 // structure is then consulted to see when to resume the worker.
694 struct io_queue_s {
695     void *ctx; // duplicated from io_queue_cb_t
696     void *stack_ctx; // module-specific context to be batch-submitted
697     int count; // ios to process before returning. only accessed by queue processor once submitted
698     int type; // duplicated from io_queue_cb_t
699 };
700 
701 typedef struct io_queue_cb_s {
702     void *ctx; // untouched ptr for specific context
703     io_queue_stack_cb submit_cb; // callback given a full stack of pending IO's at once.
704     int type;
705 } io_queue_cb_t;
706 
707 struct thread_notify {
708     struct event notify_event;  /* listen event for notify pipe or eventfd */
709 #ifdef HAVE_EVENTFD
710     int notify_event_fd;        /* notify counter */
711 #else
712     int notify_receive_fd;      /* receiving end of notify pipe */
713     int notify_send_fd;         /* sending end of notify pipe */
714 #endif
715 };
716 
717 typedef struct _mc_resp_bundle mc_resp_bundle;
718 typedef struct {
719     pthread_t thread_id;        /* unique ID of this thread */
720     struct event_base *base;    /* libevent handle this thread uses */
721     struct thread_notify n;     /* for thread notification */
722     struct thread_notify ion;   /* for thread IO object notification */
723     pthread_mutex_t ion_lock;   /* mutex for ion_head */
724     iop_head_t ion_head;        /* queue for IO object return */
725     int cur_sfd;                /* client fd for logging commands */
726     int thread_baseid;          /* which "number" thread this is for data offsets */
727     struct thread_stats stats;  /* Stats generated by this thread */
728     io_queue_cb_t io_queues[IO_QUEUE_COUNT];
729     struct conn_queue *ev_queue; /* Worker/conn event queue */
730     cache_t *rbuf_cache;        /* static-sized read buffers */
731     mc_resp_bundle *open_bundle;
732     cache_t *io_cache;          /* IO objects */
733 #ifdef EXTSTORE
734     void *storage;              /* data object for storage system */
735 #endif
736     logger *l;                  /* logger buffer */
737     void *lru_bump_buf;         /* async LRU bump buffer */
738 #ifdef TLS
739     char   *ssl_wbuf;
740 #endif
741     int napi_id;                /* napi id associated with this thread */
742 #ifdef PROXY
743     void *proxy_ctx; // proxy global context
744     void *L; // lua VM
745     void *proxy_hooks;
746     void *proxy_user_stats;
747     void *proxy_int_stats;
748     void *proxy_event_thread; // worker threads can also be proxy IO threads
749     struct event *proxy_gc_timer; // periodic GC pushing.
750     pthread_mutex_t proxy_limit_lock;
751     int proxy_vm_extra_kb;
752     int proxy_vm_last_kb;
753     unsigned int proxy_vm_negative_delta;
754     int proxy_vm_gcrunning;
755     bool proxy_vm_needspoke;
756     uint64_t proxy_active_req_limit;
757     uint64_t proxy_buffer_memory_limit; // protected by limit_lock
758     uint64_t proxy_buffer_memory_used; // protected by limit_lock
759     uint32_t proxy_rng[4]; // fast per-thread rng for lua.
760     // TODO: add ctx object so we can attach to queue.
761 #endif
762 } LIBEVENT_THREAD;
763 
764 /**
765  * Response objects
766  */
767 #define MC_RESP_IOVCOUNT 4
768 typedef struct _mc_resp {
769     mc_resp_bundle *bundle; // ptr back to bundle
770     struct _mc_resp *next; // choo choo.
771     int wbytes; // bytes to write out of wbuf: might be able to nuke this.
772     int tosend; // total bytes to send for this response
773     void *write_and_free; /** free this memory after finishing writing */
774     io_pending_t *io_pending; /* pending IO descriptor for this response */
775 
776     item *item; /* item associated with this response object, with reference held */
777     struct iovec iov[MC_RESP_IOVCOUNT]; /* built-in iovecs to simplify network code */
778     int chunked_total; /* total amount of chunked item data to send. */
779     uint8_t iovcnt;
780     uint8_t chunked_data_iov; /* this iov is a pointer to chunked data header */
781 
782     /* instruct transmit to skip this response object. used by storage engines
783      * to asynchronously kill an object that was queued to write
784      */
785     bool skip;
786     bool free; // double free detection.
787 #ifdef PROXY
788     bool proxy_res; // we're handling a proxied response buffer.
789 #endif
790     // UDP bits. Copied in from the client.
791     uint16_t    request_id; /* Incoming UDP request ID, if this is a UDP "connection" */
792     uint16_t    udp_sequence; /* packet counter when transmitting result */
793     uint16_t    udp_total; /* total number of packets in sequence */
794     struct sockaddr_in6 request_addr; /* udp: Who sent this request */
795     socklen_t request_addr_size;
796 
797     char wbuf[WRITE_BUFFER_SIZE];
798 } mc_resp;
799 
/* Bundles pack as many mc_resp objects as fit in one READ_BUFFER_SIZE
 * allocation, minus the bundle header itself. */
#define MAX_RESP_PER_BUNDLE ((READ_BUFFER_SIZE - sizeof(mc_resp_bundle)) / sizeof(mc_resp))
struct _mc_resp_bundle {
    uint8_t refcount; // count of r[] objects handed out (presumably — confirm)
    uint8_t next_check; // next object to check on assignment.
    LIBEVENT_THREAD *thread; // worker thread owning this bundle
    struct _mc_resp_bundle *next; // doubly linked list of bundles
    struct _mc_resp_bundle *prev;
    mc_resp r[]; // flexible array of response objects
};
809 
/* Forward declaration; full definition below. */
typedef struct conn conn;

/* A single deferred/asynchronous IO operation, queued onto an io_queue_t. */
struct _io_pending_t {
    int io_queue_type; // matches one of IO_QUEUE_*
    LIBEVENT_THREAD *thread; // worker thread that issued this IO
    conn *c; // originating connection
    mc_resp *resp; // associated response object
    io_queue_cb return_cb; // called on worker thread.
    io_queue_cb finalize_cb; // called back on the worker thread.
    STAILQ_ENTRY(_io_pending_t) iop_next; // queue chain.
    char data[120]; // opaque engine-private payload — NOTE(review): confirm size contract with consumers
};
822 
823 /**
824  * The structure representing a connection into memcached.
825  */
826 struct conn {
827     sasl_conn_t *sasl_conn;
828     int    sfd;
829     bool sasl_started;
830     bool authenticated;
831     bool set_stale;
832     bool mset_res; /** uses mset format for return code */
833     bool close_after_write; /** flush write then move to close connection */
834     bool rbuf_malloced; /** read buffer was malloc'ed for ascii mget, needs free() */
835     bool item_malloced; /** item for conn_nread state is a temporary malloc */
836     uint8_t ssl_enabled;
837     void    *ssl;
838 #ifdef TLS
839     char   *ssl_wbuf;
840 #endif
841     enum conn_states  state;
842     enum bin_substates substate;
843     rel_time_t last_cmd_time;
844     struct event event;
845     short  ev_flags;
846     short  which;   /** which events were just triggered */
847 
848     char   *rbuf;   /** buffer to read commands into */
849     char   *rcurr;  /** but if we parsed some already, this is where we stopped */
850     int    rsize;   /** total allocated size of rbuf */
851     int    rbytes;  /** how much data, starting from rcur, do we have unparsed */
852 
853     mc_resp *resp; // tail response.
854     mc_resp *resp_head; // first response in current stack.
855     char   *ritem;  /** when we read in an item's value, it goes here */
856     int    rlbytes;
857 
858     /**
859      * item is used to hold an item structure created after reading the command
860      * line of set/add/replace commands, but before we finished reading the actual
861      * data. The data is read into ITEM_data(item) to avoid extra copying.
862      */
863 
864     void   *item;     /* for commands set/add/replace  */
865 
866     /* data for the swallow state */
867     int    sbytes;    /* how many bytes to swallow */
868 
869     int io_queues_submitted; /* see notes on io_queue_t */
870     io_queue_t io_queues[IO_QUEUE_COUNT]; /* set of deferred IO queues. */
871 #ifdef PROXY
872     void *proxy_rctx; /* pointer to active request context */
873 #endif
874 #ifdef EXTSTORE
875     unsigned int recache_counter;
876 #endif
877     enum protocol protocol;   /* which protocol this connection speaks */
878     enum network_transport transport; /* what transport is used by this connection */
879     enum close_reasons close_reason; /* reason for transition into conn_closing */
880 
881     /* data for UDP clients */
882     int    request_id; /* Incoming UDP request ID, if this is a UDP "connection" */
883     struct sockaddr_in6 request_addr; /* udp: Who sent the most recent request */
884     socklen_t request_addr_size;
885 
886     bool   noreply;   /* True if the reply should not be sent. */
887     /* current stats command */
888     struct {
889         char *buffer;
890         size_t size;
891         size_t offset;
892     } stats;
893 
894     /* Binary protocol stuff */
895     /* This is where the binary header goes */
896     protocol_binary_request_header binary_header;
897     uint64_t cas; /* the cas to return */
898     uint64_t tag; /* listener stocket tag */
899     short cmd; /* current command being processed */
900     int opaque;
901     int keylen;
902     conn   *next;     /* Used for generating a list of conn structures */
903     LIBEVENT_THREAD *thread; /* Pointer to the thread object serving this connection */
904     int (*try_read_command)(conn *c); /* pointer for top level input parser */
905     ssize_t (*read)(conn  *c, void *buf, size_t count);
906     ssize_t (*sendmsg)(conn *c, struct msghdr *msg, int flags);
907     ssize_t (*write)(conn *c, void *buf, size_t count);
908 };
909 
/* array of conn structures, indexed by file descriptor */
extern conn **conns;

/* current time of day (updated periodically) */
extern volatile rel_time_t current_time;

#ifdef MEMCACHED_DEBUG
/* Debug-only globals; semantics defined where they are used in memcached.c.
 * NOTE(review): presumably clock-manipulation knobs for testing — confirm. */
extern volatile bool is_paused;
extern volatile int64_t delta;
#endif

/* TODO: Move to slabs.h? */
extern volatile int slab_rebalance_signal;

/* State of an in-progress slab page rebalance/move. */
struct slab_rebalance {
    void *slab_start; /* page being moved */
    void *slab_end;
    void *slab_pos; /* current scan position within the page */
    int s_clsid; /* source slab class id (presumably) */
    int d_clsid; /* destination slab class id (presumably) */
    /* counters below are statistics accumulated during the move */
    uint32_t busy_items;
    uint32_t rescues;
    uint32_t evictions_nomem;
    uint32_t inline_reclaim;
    uint32_t chunk_rescues;
    uint32_t busy_deletes;
    uint32_t busy_loops;
    uint8_t done; /* nonzero when the move has finished (presumably) */
    uint8_t *completed;
};

extern struct slab_rebalance slab_rebal;
#ifdef EXTSTORE
extern void *ext_storage;
#endif
945 /*
946  * Functions
947  */
948 void verify_default(const char* param, bool condition);
949 void do_accept_new_conns(const bool do_accept);
950 enum delta_result_type do_add_delta(LIBEVENT_THREAD *t, const char *key,
951                                     const size_t nkey, const bool incr,
952                                     const int64_t delta, char *buf,
953                                     uint64_t *cas, const uint32_t hv,
954                                     item **it_ret);
955 enum store_item_type do_store_item(item *item, int comm, LIBEVENT_THREAD *t, const uint32_t hv, int *nbytes, uint64_t *cas, const uint64_t cas_in, bool cas_stale);
956 void thread_io_queue_add(LIBEVENT_THREAD *t, int type, void *ctx, io_queue_stack_cb cb);
957 void conn_io_queue_setup(conn *c);
958 io_queue_t *conn_io_queue_get(conn *c, int type);
959 io_queue_cb_t *thread_io_queue_get(LIBEVENT_THREAD *t, int type);
960 void conn_io_queue_return(io_pending_t *io);
961 conn *conn_new(const int sfd, const enum conn_states init_state, const int event_flags, const int read_buffer_size,
962     enum network_transport transport, struct event_base *base, void *ssl, uint64_t conntag, enum protocol bproto);
963 
964 void conn_worker_readd(conn *c);
965 extern int daemonize(int nochdir, int noclose);
966 
967 #define mutex_lock(x) pthread_mutex_lock(x)
968 #define mutex_trylock(x) pthread_mutex_trylock(x)
969 #define mutex_unlock(x) pthread_mutex_unlock(x)
970 
971 #include "stats_prefix.h"
972 #include "slabs.h"
973 #include "assoc.h"
974 #include "items.h"
975 #include "crawler.h"
976 #include "trace.h"
977 #include "hash.h"
978 
979 /*
980  * Functions such as the libevent-related calls that need to do cross-thread
981  * communication in multithreaded mode (rather than actually doing the work
982  * in the current thread) are called via "dispatch_" frontends, which are
983  * also #define-d to directly call the underlying code in singlethreaded mode.
984  */
985 void memcached_thread_init(int nthreads, void *arg);
986 void redispatch_conn(conn *c);
987 void timeout_conn(conn *c);
988 #ifdef PROXY
989 void proxy_reload_notify(LIBEVENT_THREAD *t);
990 #endif
991 void return_io_pending(io_pending_t *io);
992 void dispatch_conn_new(int sfd, enum conn_states init_state, int event_flags, int read_buffer_size,
993     enum network_transport transport, void *ssl, uint64_t conntag, enum protocol bproto);
994 void sidethread_conn_close(conn *c);
995 
/* Lock wrappers for cache functions that are called from main loop. */
enum delta_result_type add_delta(LIBEVENT_THREAD *t, const char *key,
                                 const size_t nkey, bool incr,
                                 const int64_t delta, char *buf,
                                 uint64_t *cas);
void accept_new_conns(const bool do_accept);
void  conn_close_idle(conn *c);
void  conn_close_all(void);
item *item_alloc(const char *key, size_t nkey, client_flags_t flags, rel_time_t exptime, int nbytes);
/* Named values for the do_update argument of the item_get* functions. */
#define DO_UPDATE true
#define DONT_UPDATE false
item *item_get(const char *key, const size_t nkey, LIBEVENT_THREAD *t, const bool do_update);
item *item_get_locked(const char *key, const size_t nkey, LIBEVENT_THREAD *t, const bool do_update, uint32_t *hv);
item *item_touch(const char *key, const size_t nkey, uint32_t exptime, LIBEVENT_THREAD *t);
int   item_link(item *it);
void  item_remove(item *it);
int   item_replace(item *it, item *new_it, const uint32_t hv, const uint64_t cas_in);
void  item_unlink(item *it);

/* Bucketed item locks, keyed by hash value. */
void item_lock(uint32_t hv);
void *item_trylock(uint32_t hv);
void item_trylock_unlock(void *arg);
void item_unlock(uint32_t hv);
void pause_threads(enum pause_thread_types type);
void stop_threads(void);
int stop_conn_timeout_thread(void);
/* Item refcount helpers: plain (non-atomic) pre-increment/decrement that
 * yield the new refcount value. The argument is fully parenthesized so
 * expressions such as refcount_incr(arr + i) expand correctly — the
 * previous form `++(it->refcount)` broke for any non-primary argument. */
#define refcount_incr(it) ++((it)->refcount)
#define refcount_decr(it) --((it)->refcount)
void STATS_LOCK(void); /* global stats lock */
void STATS_UNLOCK(void);
/* NOTE(review): `t` is not parenthesized here; safe only while every call
 * site passes a simple lvalue — consider `&(t)->stats.mutex`. */
#define THR_STATS_LOCK(t) pthread_mutex_lock(&t->stats.mutex)
#define THR_STATS_UNLOCK(t) pthread_mutex_unlock(&t->stats.mutex)
void threadlocal_stats_reset(void);
void threadlocal_stats_aggregate(struct thread_stats *stats);
void slab_stats_aggregate(struct thread_stats *stats, struct slab_stats *out);
void thread_setname(pthread_t thread, const char *name);
LIBEVENT_THREAD *get_worker_thread(int id);

/* Stat processing functions */
void append_stat(const char *name, ADD_STAT add_stats, conn *c,
                 const char *fmt, ...);

enum store_item_type store_item(item *item, int comm, LIBEVENT_THREAD *t, int *nbytes, uint64_t *cas, const uint64_t cas_in, bool cas_stale);
1039 
/* Protocol related code */
/* Queue an ASCII response line on the connection (presumably appends the
 * protocol terminator — confirm in memcached.c). */
void out_string(conn *c, const char *str);
/* Maximum relative expiration delta: 30 days in seconds. Parenthesized so
 * the expansion survives surrounding operators (e.g. REALTIME_MAXDELTA % n). */
#define REALTIME_MAXDELTA (60*60*24*30)
/* Negative exptimes can underflow and end up immortal. realtime() will
   immediately expire values that are greater than REALTIME_MAXDELTA, but less
   than process_started, so lets aim for that. */
/* Argument and expansion are fully parenthesized: the previous form broke
 * under operators of higher precedence than ?: (e.g. `2 * EXPTIME_TO_POSITIVE_TIME(x)`
 * multiplied into the condition instead of the result). Note `exptime` is
 * still evaluated twice — do not pass expressions with side effects. */
#define EXPTIME_TO_POSITIVE_TIME(exptime) (((exptime) < 0) ? \
        (REALTIME_MAXDELTA + 1) : (exptime))
/* Convert a client-supplied exptime into internal relative time. */
rel_time_t realtime(const time_t exptime);
item* limited_get(const char *key, size_t nkey, LIBEVENT_THREAD *t, uint32_t exptime, bool should_touch, bool do_update, bool *overflow);
item* limited_get_locked(const char *key, size_t nkey, LIBEVENT_THREAD *t, bool do_update, uint32_t *hv, bool *overflow);
// Read/Response object handlers.
void resp_reset(mc_resp *resp);
void resp_add_iov(mc_resp *resp, const void *buf, int len);
void resp_add_chunked_iov(mc_resp *resp, const void *buf, int len);
bool resp_start(conn *c);
mc_resp *resp_start_unlinked(conn *c);
mc_resp* resp_finish(conn *c, mc_resp *resp);
void resp_free(LIBEVENT_THREAD *th, mc_resp *resp);
bool resp_has_stack(conn *c);
bool rbuf_switch_to_malloc(conn *c);
void conn_release_items(conn *c);
void conn_set_state(conn *c, enum conn_states state);
void out_of_memory(conn *c, char *ascii_error);
void out_errstring(conn *c, const char *str);
void write_and_free(conn *c, char *buf, int bytes);
void server_stats(ADD_STAT add_stats, void *c);
void append_stats(const char *key, const uint16_t klen,
                  const char *val, const uint32_t vlen,
                  const void *cookie);
/** Return a datum for stats in binary protocol */
bool get_stats(const char *stat_type, int nkey, ADD_STAT add_stats, void *c);
void stats_reset(void);
void process_stat_settings(ADD_STAT add_stats, void *c);
void process_stats_conns(ADD_STAT add_stats, void *c);
1075 
#if HAVE_DROP_PRIVILEGES
extern void setup_privilege_violations_handler(void);
extern void drop_privileges(void);
#else
/* Compiled to no-ops when the platform lacks privilege-drop support. */
#define setup_privilege_violations_handler()
#define drop_privileges()
#endif

#if HAVE_DROP_WORKER_PRIVILEGES
extern void drop_worker_privileges(void);
#else
#define drop_worker_privileges()
#endif
1089 
/* If supported, give compiler hints for branch prediction. */
#if !defined(__GNUC__) || (__GNUC__ == 2 && __GNUC_MINOR__ < 96)
/* Compilers without the builtin get an identity macro, turning
 * likely()/unlikely() into no-ops. */
#define __builtin_expect(x, expected_value) (x)
#endif

#define likely(x)       __builtin_expect((x),1)
#define unlikely(x)     __builtin_expect((x),0)
1097