1 /* -*- Mode: C; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2 // Functions related to local command execution.
3
4 #include "proxy.h"
5 #include "storage.h"
6
// Request styles handed to proxy_storage_get(); the extstore read callback
// uses them to pick the miss reply ("END" vs "EN").
#define PROXY_STORAGE_GET 0
#define PROXY_STORAGE_MG  1

// Readable names for boolean call arguments.
// NOTE(review): presumably meant for process_get_cmd()'s return_cas and
// should_touch parameters; the callers are outside this view.
#define _DO_CAS    true
#define _NO_CAS    false
#define _DO_TOUCH  true
#define _NO_TOUCH  false
13
/*
 * Copy `len` bytes from the flat buffer `buf` into the data area of `d_it`.
 *
 * A non-chunked item takes one memcpy. A chunked item is filled chunk by
 * chunk, and whenever the current chunk becomes full another one is
 * requested through do_item_alloc_chunk().
 *
 * Returns 0 on success, or -1 if a follow-on chunk could not be allocated.
 */
static int _store_item_copy_from_buf(item *d_it, char *buf, const int len) {
    if ((d_it->it_flags & ITEM_CHUNKED) == 0) {
        memcpy(ITEM_data(d_it), buf, len);
        return 0;
    }

    item_chunk *cur = (item_chunk *) ITEM_schunk(d_it);
    int copied = 0;

    // Walk the chunk list, pouring the flat buffer into each chunk in turn.
    while (copied < len && cur) {
        // Take whichever is smaller: room left in this chunk or bytes left.
        int n = len - copied;
        if (cur->size - cur->used < n) {
            n = cur->size - cur->used;
        }

        memcpy(cur->data + cur->used, buf + copied, n);
        copied += n;
        cur->used += n;
        assert(cur->used <= cur->size);

        if (cur->used != cur->size) {
            continue;
        }

        // This chunk is full: move on to a newly allocated one.
        cur = do_item_alloc_chunk(cur, len - copied);
        if (cur == NULL) {
            return -1;
        }
    }
    assert(len == copied);

    return 0;
}
43
44 // TODO (v2): out_string() needs to change to just take a *resp, but I don't
45 // want to do the huge refactor in this change series. So for now we have a
46 // custom out_string().
/*
 * Reset `resp` and load it with the line `str` followed by "\r\n".
 *
 * If `resp` was marked as skipped before the call, the object is still
 * reset, the skip mark is re-applied, and nothing is written.
 * A string that would not fit in WRITE_BUFFER_SIZE together with the line
 * terminator is replaced by a fixed SERVER_ERROR line.
 */
static void pout_string(mc_resp *resp, const char *str) {
    // Validate before the first dereference. Reading resp->skip ahead of
    // the assert (as this function used to) made the check pointless.
    assert(resp != NULL);
    const bool skip = resp->skip;

    // if response was originally filled with something, but we're now
    // writing out an error or similar, have to reset the object first.
    resp_reset(resp);

    // We blank the response "just in case", but if we're not intending on
    // sending it lets not rewrite it.
    if (skip) {
        resp->skip = true;
        return;
    }

    // Fill response object with static string.
    size_t len = strlen(str);
    if ((len + 2) > WRITE_BUFFER_SIZE) {
        /* ought to be always enough. just fail for simplicity */
        str = "SERVER_ERROR output line too long";
        len = strlen(str);
    }

    memcpy(resp->wbuf, str, len);
    memcpy(resp->wbuf + len, "\r\n", 2);
    resp_add_iov(resp, resp->wbuf, len + 2);
}
78
79 // For meta commands error strings override the quiet flag.
/*
 * Emit `str` through pout_string() even if `resp` was marked to be skipped:
 * the skip mark is cleared first so the string is always written.
 */
static void pout_errstring(mc_resp *resp, const char *str) {
    resp->skip = false; // un-quiet the response before writing.
    pout_string(resp, str);
}
84
85 #ifdef EXTSTORE
/*
 * Completion callback for an extstore read started by proxy_storage_get().
 *
 * `eio->data` is the pending proxy IO and `eio->buf` holds the item that was
 * read. The read is treated as a miss when `ret` < 1, or when the CRC kept
 * in the read item's exptime field does not match crc32c() over the buffer
 * from STORE_OFFSET onward (in which case io->badcrc is also set).
 *
 * On a miss (unless the response is being skipped) the response is
 * rewritten to a single iovec holding the miss reply for the request style.
 * On a hit the data iovec reserved earlier is pointed at the item data.
 * Finally the IO is marked inactive and handed to return_io_pending().
 *
 * `e` is unused.
 */
static void _storage_get_item_cb(void *e, obj_io *eio, int ret) {
    io_pending_proxy_t *io = (io_pending_proxy_t *)eio->data;
    assert(io->active == true);
    mc_resp *resp = io->tresp;
    item *read_it = (item *)eio->buf;
    bool miss = false;

    if (ret < 1) {
        miss = true;
    } else {
        // The exptime field of the stored item carries its checksum.
        uint32_t crc = (uint32_t) read_it->exptime;
        uint32_t crc2 = crc32c(0, (char *)read_it+STORE_OFFSET, eio->len-STORE_OFFSET);

        if (crc != crc2) {
            miss = true;
            io->badcrc = true;
        }
    }

    if (miss && !resp->skip) {
        resp->iovcnt = 1;
        if (io->gettype == PROXY_STORAGE_GET) {
            if (io->ascii_multiget) {
                resp->iov[0].iov_len = 0;
                resp->iov[0].iov_base = "";
                resp->tosend = 0;
            } else {
                resp->iov[0].iov_len = 5;
                resp->iov[0].iov_base = "END\r\n";
                resp->tosend = 5;
            }
        } else if (io->gettype == PROXY_STORAGE_MG) {
            // "EN\r\n" is four bytes. tosend must match the iovec length;
            // it was previously set to 5, one more than the iovec holds.
            resp->iov[0].iov_len = 4;
            resp->iov[0].iov_base = "EN\r\n";
            resp->tosend = 4;
        } else {
            assert(1 == 0);
        }
    }

    if (!miss) {
        resp->iov[io->iovec_data].iov_base = ITEM_data(read_it);
    }
    io->miss = miss;
    io->active = false;

    // in proxy mode we tend to return IO's as they happen so we can keep
    // latency down more.
    return_io_pending((io_pending_t *)io);
}
137
138 // TODO (v2): if the item is smaller than resp->wbuf[] shouldn't we just read
139 // directly into there? item only necessary for recache.
/*
 * Set up an asynchronous extstore read for the header item `it` and attach
 * it to `resp`.
 *
 * `type` is PROXY_STORAGE_GET or PROXY_STORAGE_MG and is stored on the IO so
 * the completion callback can pick the right miss reply. An iovec of
 * it->nbytes is reserved in `resp` as a placeholder for the value; the
 * callback fills in its base pointer once the read completes.
 *
 * Returns 0 on success. Returns -1 if either the IO object or the read
 * buffer cannot be allocated; in that case nothing is attached to `resp`
 * and the caller still owns its reference to `it`.
 */
static int proxy_storage_get(LIBEVENT_THREAD *t, item *it, mc_resp *resp,
        int type) {
#ifdef NEED_ALIGN
    // Copy the header out so its fields can be read at proper alignment.
    item_hdr hdr_copy;
    memcpy(&hdr_copy, ITEM_data(it), sizeof(hdr_copy));
    const item_hdr *hdr = &hdr_copy;
#else
    const item_hdr *hdr = (item_hdr *)ITEM_data(it);
#endif
    size_t ntotal = ITEM_ntotal(it);

    io_pending_proxy_t *io = do_cache_alloc(t->io_cache);
    // The cache may have nothing to hand out; fail like the buffer
    // allocation below instead of passing NULL to memset().
    if (io == NULL) {
        return -1;
    }
    // this is a re-cast structure, so assert that we never outsize it.
    assert(sizeof(io_pending_t) >= sizeof(io_pending_proxy_t));
    memset(io, 0, sizeof(io_pending_proxy_t));
    io->active = true;
    // io_pending owns the reference for this object now.
    io->hdr_it = it;
    io->tresp = resp; // our mc_resp is a temporary object.
    io->io_queue_type = IO_QUEUE_EXTSTORE;
    io->io_type = IO_PENDING_TYPE_EXTSTORE; // proxy specific sub-type.
    io->gettype = type;
    io->thread = t;
    io->return_cb = proxy_return_rctx_cb;
    io->finalize_cb = proxy_finalize_rctx_cb;
    obj_io *eio = &io->eio;

    eio->buf = malloc(ntotal);
    if (eio->buf == NULL) {
        do_cache_free(t->io_cache, io);
        return -1;
    }

    // Remember which iovec will carry the value, then reserve it.
    io->iovec_data = resp->iovcnt;
    resp_add_iov(resp, "", it->nbytes);

    // We can't bail out anymore, so mc_resp owns the IO from here.
    resp->io_pending = (io_pending_t *)io;

    // reference ourselves for the callback.
    eio->data = (void *)io;

    // Now, fill in the read request based on what was in our header.
    eio->page_version = hdr->page_version;
    eio->page_id = hdr->page_id;
    eio->offset = hdr->offset;
    eio->len = ntotal;
    eio->mode = OBJ_IO_READ;
    eio->cb = _storage_get_item_cb;

    pthread_mutex_lock(&t->stats.mutex);
    t->stats.get_extstore++;
    pthread_mutex_unlock(&t->stats.mutex);

    return 0;
}
201 #endif // EXTSTORE
202
203 /* client flags == 0 means use no storage for client flags */
/*
 * Write the tail of an ascii "VALUE" header line into `suffix`:
 *   " <client flags> <nbytes-2>[ <cas>]\r\n" plus a terminating NUL.
 *
 * An item with no stored client flags prints "0". The CAS value is only
 * appended when `return_cas` is set.
 *
 * Returns the number of bytes written, counting the "\r\n" but not the NUL.
 */
static inline int make_ascii_get_suffix(char *suffix, item *it, bool return_cas, int nbytes) {
    char *out = suffix;

    *out++ = ' ';
    if (FLAGS_SIZE(it) != 0) {
        out = itoa_u64(*((client_flags_t *) ITEM_suffix(it)), out);
    } else {
        *out++ = '0';
    }

    *out++ = ' ';
    out = itoa_u32(nbytes-2, out);

    if (return_cas) {
        *out++ = ' ';
        out = itoa_u64(ITEM_get_cas(it), out);
    }

    out[0] = '\r';
    out[1] = '\n';
    out[2] = '\0';
    return (out - suffix) + 2;
}
227
/*
 * Serve a classic ascii get for the single key described by `pr`.
 *
 * On a hit the response is built as a "VALUE ..." header (written into
 * resp->wbuf), followed by the item data, followed by "END\r\n". On a miss
 * only "END\r\n" is added. `return_cas` adds the CAS value to the header;
 * `should_touch` is passed to limited_get() and selects which counters
 * (touch vs get) are updated.
 *
 * With EXTSTORE, a header item's value is fetched asynchronously through
 * proxy_storage_get(); if that cannot be set up the item is released and an
 * out-of-memory error is reported instead.
 */
static void process_get_cmd(LIBEVENT_THREAD *t, mcp_parser_t *pr, mc_resp *resp, bool return_cas, bool should_touch) {
    const char *key = &pr->request[pr->tokens[pr->keytoken]];
    int nkey = pr->klen;
    rel_time_t exptime = 0;
    bool overflow = false; // filled in by limited_get(); not inspected here.

    if (nkey > KEY_MAX_LENGTH) {
        pout_string(resp, "CLIENT_ERROR bad command line format");
        return;
    }

    item *it = limited_get(key, nkey, t, exptime, should_touch, DO_UPDATE, &overflow);
    if (it == NULL) {
        // Miss: account for it and send the bare trailer.
        pthread_mutex_lock(&t->stats.mutex);
        if (should_touch) {
            t->stats.touch_cmds++;
            t->stats.touch_misses++;
        } else {
            t->stats.get_misses++;
            t->stats.get_cmds++;
        }
        pthread_mutex_unlock(&t->stats.mutex);

        resp_add_iov(resp, "END\r\n", 5);
        return;
    }

    // Header line: "VALUE <key> <flags> <bytes>[ <cas>]\r\n"
    char *p = resp->wbuf;
    memcpy(p, "VALUE ", 6);
    p += 6;
    memcpy(p, ITEM_key(it), it->nkey);
    p += it->nkey;
    p += make_ascii_get_suffix(p, it, return_cas, it->nbytes);
    resp_add_iov(resp, resp->wbuf, p - resp->wbuf);

    // Value: read back from extstore for header items, else chained
    // straight out of memory.
    bool value_in_memory = true;
#ifdef EXTSTORE
    if (it->it_flags & ITEM_HDR) {
        value_in_memory = false;
        if (proxy_storage_get(t, it, resp, PROXY_STORAGE_GET) != 0) {
            pthread_mutex_lock(&t->stats.mutex);
            t->stats.get_oom_extstore++;
            pthread_mutex_unlock(&t->stats.mutex);

            item_remove(it);
            proxy_out_errstring(resp, PROXY_SERVER_ERROR, "out of memory writing get response");
            return;
        }
    }
#endif
    if (value_in_memory) {
        if (it->it_flags & ITEM_CHUNKED) {
            resp_add_chunked_iov(resp, it, it->nbytes);
        } else {
            resp_add_iov(resp, ITEM_data(it), it->nbytes);
        }
    }

    pthread_mutex_lock(&t->stats.mutex);
    if (should_touch) {
        t->stats.touch_cmds++;
        t->stats.slab_stats[ITEM_clsid(it)].touch_hits++;
    } else {
        t->stats.lru_hits[it->slabs_clsid]++;
        t->stats.get_cmds++;
    }
    pthread_mutex_unlock(&t->stats.mutex);

    // The fetch took a reference on the item. Hand it to the response,
    // unless this is a header item, in which case the pending IO owns it.
#ifdef EXTSTORE
    const bool io_owns_ref = (it->it_flags & ITEM_HDR) != 0;
#else
    const bool io_owns_ref = false;
#endif
    if (!io_owns_ref) {
        resp->item = it;
    }

    resp_add_iov(resp, "END\r\n", 5);
}
308
/*
 * Handle a classic ascii storage command for the key in `pr`.
 *
 * Token 2 is parsed as client flags and token 3 as the expiration time;
 * when `handle_cas` is set, token 5 is parsed as the request CAS value.
 * The value bytes come from pr->vbuf (length pr->vlen). `comm` is the
 * NREAD_* store mode passed on to store_item().
 *
 * The outcome is reported through pout_string(). If the item cannot be
 * allocated and `comm` is NREAD_SET, any existing item under the key is
 * unlinked so stale data does not survive a failed set.
 */
static void process_update_cmd(LIBEVENT_THREAD *t, mcp_parser_t *pr, mc_resp *resp, int comm, bool handle_cas) {
    const char *key = &pr->request[pr->tokens[pr->keytoken]];
    size_t nkey = pr->klen;
    client_flags_t flags;
    int32_t exptime_int = 0;
    rel_time_t exptime = 0;
    uint64_t req_cas_id = 0;
    item *it;

    assert(resp != NULL);

    if (nkey > KEY_MAX_LENGTH) {
        pout_string(resp, "CLIENT_ERROR bad command line format");
        return;
    }

    // TODO (v2): these safe_str* functions operate on C _strings_, but these
    // tokens simply end with a space or carriage return/newline, so we either
    // need custom functions or validate harder that these calls won't bite us
    // later.
    if (!safe_strtoflags(&pr->request[pr->tokens[2]], &flags)
            || !safe_strtol(&pr->request[pr->tokens[3]], &exptime_int)) {
        pout_string(resp, "CLIENT_ERROR bad command line format");
        return;
    }

    exptime = realtime(EXPTIME_TO_POSITIVE_TIME(exptime_int));

    // The CAS value, when expected, is the token after the value length.
    if (handle_cas && !safe_strtoull(&pr->request[pr->tokens[5]], &req_cas_id)) {
        pout_string(resp, "CLIENT_ERROR bad command line format");
        return;
    }

    // vlen is validated from the main parser.

    if (settings.detail_enabled) {
        stats_prefix_record_set(key, nkey);
    }

    it = item_alloc(key, nkey, flags, exptime, pr->vlen);
    if (it == NULL) {
        // Tell "could never fit" apart from "no memory right now".
        const bool too_large = !item_size_ok(nkey, flags, pr->vlen);

        pout_string(resp, too_large
                ? "SERVER_ERROR object too large for cache"
                : "SERVER_ERROR out of memory storing object");
        pthread_mutex_lock(&t->stats.mutex);
        if (too_large) {
            t->stats.store_too_large++;
        } else {
            t->stats.store_no_memory++;
        }
        pthread_mutex_unlock(&t->stats.mutex);
        //LOGGER_LOG(c->thread->l, LOG_MUTATIONS, LOGGER_ITEM_STORE,
        //        NULL, status, comm, key, nkey, 0, 0, c->sfd);

        /* Avoid stale data persisting in cache because we failed alloc.
         * Unacceptable for SET. Anywhere else too? */
        if (comm == NREAD_SET) {
            it = item_get(key, nkey, t, DONT_UPDATE);
            if (it) {
                item_unlink(it);
                STORAGE_delete(t->storage, it);
                item_remove(it);
            }
        }

        return;
    }
    ITEM_set_cas(it, req_cas_id);

    pthread_mutex_lock(&t->stats.mutex);
    t->stats.slab_stats[ITEM_clsid(it)].set_cmds++;
    pthread_mutex_unlock(&t->stats.mutex);

    // complete_nread_proxy() does the data chunk check so all we need to do
    // is copy the data.
    if (_store_item_copy_from_buf(it, pr->vbuf, it->nbytes) != 0) {
        pout_string(resp, "SERVER_ERROR out of memory storing object");
        item_remove(it);
        return;
    }

    // Translate the store result into its protocol reply.
    const char *reply;
    switch (store_item(it, comm, t, NULL, NULL, (settings.use_cas) ? get_cas_id() : 0, CAS_NO_STALE)) {
    case STORED:
        reply = "STORED";
        break;
    case EXISTS:
        reply = "EXISTS";
        break;
    case NOT_FOUND:
        reply = "NOT_FOUND";
        break;
    case NOT_STORED:
        reply = "NOT_STORED";
        break;
    default:
        reply = "SERVER_ERROR Unhandled storage type.";
        break;
    }
    pout_string(resp, reply);

    // We don't need to hold a reference since the item was fully read.
    item_remove(it);
}
419
/*
 * Handle ascii incr/decr for the key in `pr`; `incr` selects the direction.
 *
 * Token 2 is parsed as the unsigned delta. On success the new value, as
 * text written by add_delta(), is sent back; otherwise a CLIENT_ERROR,
 * SERVER_ERROR or NOT_FOUND line is sent. Miss counters are updated here
 * for the not-found case only.
 */
static void process_arithmetic_cmd(LIBEVENT_THREAD *t, mcp_parser_t *pr, mc_resp *resp, const bool incr) {
    const char *key = &pr->request[pr->tokens[pr->keytoken]];
    size_t nkey = pr->klen;
    char numbuf[INCR_MAX_STORAGE_LEN]; // receives the resulting value as text.
    uint64_t delta;

    assert(t != NULL);

    if (nkey > KEY_MAX_LENGTH) {
        pout_string(resp, "CLIENT_ERROR bad command line format");
        return;
    }

    if (!safe_strtoull(&pr->request[pr->tokens[2]], &delta)) {
        pout_string(resp, "CLIENT_ERROR invalid numeric delta argument");
        return;
    }

    switch (add_delta(t, key, nkey, incr, delta, numbuf, NULL)) {
    case OK:
        pout_string(resp, numbuf);
        break;

    case NON_NUMERIC:
        pout_string(resp, "CLIENT_ERROR cannot increment or decrement non-numeric value");
        break;

    case EOM:
        pout_string(resp, "SERVER_ERROR out of memory");
        break;

    case DELTA_ITEM_NOT_FOUND:
        pthread_mutex_lock(&t->stats.mutex);
        if (incr) {
            t->stats.incr_misses++;
        } else {
            t->stats.decr_misses++;
        }
        pthread_mutex_unlock(&t->stats.mutex);

        pout_string(resp, "NOT_FOUND");
        break;

    case DELTA_ITEM_CAS_MISMATCH:
        /* Should never get here */
        break;
    }
}
463
/*
 * Handle ascii delete for the key in `pr`.
 *
 * The item is looked up with its lock held. If present it is unlinked,
 * removed from storage and released, and "DELETED" is sent; otherwise
 * "NOT_FOUND" is sent. The item lock is dropped in both cases.
 */
static void process_delete_cmd(LIBEVENT_THREAD *t, mcp_parser_t *pr, mc_resp *resp) {
    const char *key = &pr->request[pr->tokens[pr->keytoken]];
    size_t nkey = pr->klen;
    uint32_t hv;

    assert(t != NULL);

    // NOTE: removed a compatibility bodge from a decade ago.
    // delete used to take a "delay" argument, which was removed, but some
    // ancient php clients always sent a 0 argument, which would then fail.
    // It's been long enough that I don't want to carry this forward into the
    // new parser.

    if (nkey > KEY_MAX_LENGTH) {
        pout_string(resp, "CLIENT_ERROR bad command line format");
        return;
    }

    item *it = item_get_locked(key, nkey, t, DONT_UPDATE, &hv);
    if (it == NULL) {
        pthread_mutex_lock(&t->stats.mutex);
        t->stats.delete_misses++;
        pthread_mutex_unlock(&t->stats.mutex);

        pout_string(resp, "NOT_FOUND");
    } else {
        //MEMCACHED_COMMAND_DELETE(c->sfd, ITEM_key(it), it->nkey);

        pthread_mutex_lock(&t->stats.mutex);
        t->stats.slab_stats[ITEM_clsid(it)].delete_hits++;
        pthread_mutex_unlock(&t->stats.mutex);

        LOGGER_LOG(NULL, LOG_DELETIONS, LOGGER_DELETIONS, it, LOG_TYPE_DELETE);
        do_item_unlink(it, hv);
        STORAGE_delete(t->storage, it);
        do_item_remove(it); /* release our reference */
        pout_string(resp, "DELETED");
    }

    // item_get_locked() leaves the lock for `hv` held, hit or miss.
    item_unlock(hv);
}
504
/*
 * Handle ascii touch for the key in `pr`.
 *
 * Token 2 is parsed as the new expiration time and applied through
 * item_touch(). Sends "TOUCHED" when the item exists and "NOT_FOUND"
 * otherwise; touch_cmds is counted either way.
 */
static void process_touch_cmd(LIBEVENT_THREAD *t, mcp_parser_t *pr, mc_resp *resp) {
    const char *key = &pr->request[pr->tokens[pr->keytoken]];
    size_t nkey = pr->klen;
    int32_t exptime_int = 0;

    assert(t != NULL);

    if (nkey > KEY_MAX_LENGTH) {
        pout_string(resp, "CLIENT_ERROR bad command line format");
        return;
    }

    if (!safe_strtol(&pr->request[pr->tokens[2]], &exptime_int)) {
        pout_string(resp, "CLIENT_ERROR invalid exptime argument");
        return;
    }

    rel_time_t exptime = realtime(EXPTIME_TO_POSITIVE_TIME(exptime_int));
    item *it = item_touch(key, nkey, exptime, t);

    // One critical section covers the command counter and the hit/miss one.
    pthread_mutex_lock(&t->stats.mutex);
    t->stats.touch_cmds++;
    if (it) {
        t->stats.slab_stats[ITEM_clsid(it)].touch_hits++;
    } else {
        t->stats.touch_misses++;
    }
    pthread_mutex_unlock(&t->stats.mutex);

    if (it == NULL) {
        pout_string(resp, "NOT_FOUND");
        return;
    }

    pout_string(resp, "TOUCHED");
    item_remove(it); // drop the reference returned by item_touch().
}
543
544 /*** meta command handlers ***/
545
546 // FIXME: macro or public interface, this is copypasted.
/*
 * Length in bytes of token number `token` in pr->request.
 *
 * Computed as the distance from the token's start offset to the next
 * entry in pr->tokens, minus any space delimiters in between.
 */
static int _process_token_len(mcp_parser_t *pr, size_t token) {
    const char *s = pr->request + pr->tokens[token];
    const char *e = pr->request + pr->tokens[token+1];
    // start of next token is after any space delimiters, so back those out.
    // Never step back past the start of this token.
    while (e > s && *(e-1) == ' ') {
        e--;
    }
    return e - s;
}
556
// Output helpers for building meta responses. `p` is a char* write cursor
// that each macro advances. Arguments may be evaluated more than once, so
// pass plain variables rather than expressions with side effects.
// Each body is wrapped in do { } while (0) so the macro acts as a single
// statement, including inside an unbraced if/else.

// Append one space.
#define META_SPACE(p) do { \
    *(p) = ' '; \
    (p)++; \
} while (0)

// Append a space followed by the character `c`.
#define META_CHAR(p, c) do { \
    *(p) = ' '; \
    *((p)+1) = (c); \
    (p) += 2; \
} while (0)

// Append " k" followed by `nkey` bytes of `key`. `bin` is currently ignored.
// FIXME: binary key support.
#define META_KEY(p, key, nkey, bin) do { \
    META_CHAR(p, 'k'); \
    memcpy((p), (key), (nkey)); \
    (p) += (nkey); \
} while (0)

// Upper bound on the token count of a meta command line.
#define MFLAG_MAX_OPT_LENGTH 20
// Upper bound on the length of an opaque ('O') token.
#define MFLAG_MAX_OPAQUE_LENGTH 32
577
578 struct _meta_flags {
579 unsigned int has_error :1; // flipped if we found an error during parsing.
580 unsigned int no_update :1;
581 unsigned int locked :1;
582 unsigned int vivify :1;
583 unsigned int la :1;
584 unsigned int hit :1;
585 unsigned int value :1;
586 unsigned int set_stale :1;
587 unsigned int no_reply :1;
588 unsigned int has_cas :1;
589 unsigned int has_cas_in :1;
590 unsigned int new_ttl :1;
591 unsigned int key_binary:1;
592 unsigned int remove_val:1;
593 char mode; // single character mode switch, common to ms/ma
594 rel_time_t exptime;
595 rel_time_t autoviv_exptime;
596 rel_time_t recache_time;
597 client_flags_t client_flags;
598 uint64_t req_cas_id;
599 uint64_t cas_id_in; // client supplied next-CAS
600 uint64_t delta; // ma
601 uint64_t initial; // ma
602 };
603
/*
 * First pass over the flag tokens of a meta command.
 *
 * Looks at tokens `start` .. pr->ntokens-1. The first byte of each token
 * selects the flag; for flags that take an argument the rest of the token
 * is parsed into `of`.
 *
 * Returns 0 on success and -1 on failure. A repeated flag, a flag byte
 * of 127 or above, or an unknown flag stops parsing at once. An argument
 * that fails to parse sets of->has_error and scanning continues, with -1
 * returned at the end. `*errstr` is pointed at a message for most failures;
 * a bad 'F' argument leaves it as the caller set it.
 */
static int _meta_flag_preparse(mcp_parser_t *pr, const size_t start,
        struct _meta_flags *of, char **errstr) {
    char *bad_token = "CLIENT_ERROR bad token in command line format";
    uint8_t seen[127] = {0};
    int32_t tmp_int;
    unsigned int idx;

    // Start just past the key token. Look at first character of each token.
    for (idx = start; idx < pr->ntokens; idx++) {
        const char *tok = &pr->request[pr->tokens[idx]];
        const char *arg = tok + 1; // argument text, for flags that carry one.
        const uint8_t flag = (uint8_t)*tok;

        // Reject repeats so we don't over-parse for return data.
        if (flag >= 127 || seen[flag] != 0) {
            *errstr = "CLIENT_ERROR duplicate flag";
            return -1;
        }
        seen[flag] = 1;

        switch (flag) {
            // TODO: 'b' (base64 key) is not handled yet. Decoding needs
            // temporary space for the binary key since request should be
            // const; it would set of->key_binary.

            /* Negative exptimes can underflow and end up immortal. realtime() will
               immediately expire values that are greater than REALTIME_MAXDELTA, but less
               than process_started, so lets aim for that. */
            case 'N':
                of->locked = 1;
                of->vivify = 1;
                if (safe_strtol(arg, &tmp_int)) {
                    of->autoviv_exptime = realtime(EXPTIME_TO_POSITIVE_TIME(tmp_int));
                } else {
                    *errstr = bad_token;
                    of->has_error = 1;
                }
                break;
            case 'T':
                of->locked = 1;
                if (safe_strtol(arg, &tmp_int)) {
                    of->exptime = realtime(EXPTIME_TO_POSITIVE_TIME(tmp_int));
                    of->new_ttl = true;
                } else {
                    *errstr = bad_token;
                    of->has_error = 1;
                }
                break;
            case 'R':
                of->locked = 1;
                if (safe_strtol(arg, &tmp_int)) {
                    of->recache_time = realtime(EXPTIME_TO_POSITIVE_TIME(tmp_int));
                } else {
                    *errstr = bad_token;
                    of->has_error = 1;
                }
                break;

            // Flags that only toggle an option.
            case 'l':
                of->la = 1;
                of->locked = 1; // need locked to delay LRU bump
                break;
            case 'h':
                of->locked = 1; // need locked to delay LRU bump
                break;
            case 'v':
                of->value = 1;
                break;
            case 'u':
                of->no_update = 1;
                break;
            case 'q':
                of->no_reply = 1;
                break;
            case 'x':
                of->remove_val = 1;
                break;
            case 'I':
                of->set_stale = 1;
                break;

            // Known flags that need nothing from this pass.
            case 'O':
            case 'P':
            case 'L':
            case 'k':
            case 's':
            case 't':
            case 'c':
            case 'f':
                break;

            // mset-related.
            case 'F':
                if (!safe_strtoflags(arg, &of->client_flags)) {
                    of->has_error = true;
                }
                break;
            case 'C': // mset, mdelete, marithmetic
                if (safe_strtoull(arg, &of->req_cas_id)) {
                    of->has_cas = true;
                } else {
                    *errstr = bad_token;
                    of->has_error = true;
                }
                break;
            case 'E': // ms, md, ma
                if (safe_strtoull(arg, &of->cas_id_in)) {
                    of->has_cas_in = true;
                } else {
                    *errstr = bad_token;
                    of->has_error = true;
                }
                break;
            case 'M': // mset and marithmetic mode switch
                // FIXME: this used to error if the token isn't a single byte.
                // It probably should still?
                of->mode = *arg;
                break;
            case 'J': // marithmetic initial value
                if (!safe_strtoull(arg, &of->initial)) {
                    *errstr = "CLIENT_ERROR invalid numeric initial value";
                    of->has_error = 1;
                }
                break;
            case 'D': // marithmetic delta value
                if (!safe_strtoull(arg, &of->delta)) {
                    *errstr = "CLIENT_ERROR invalid numeric delta value";
                    of->has_error = 1;
                }
                break;

            default: // unknown flag, bail.
                *errstr = "CLIENT_ERROR invalid flag";
                return -1;
        }
    }

    if (of->has_error) {
        return -1;
    }
    return 0;
}
746
/*
 * Handle the meta get command for the key in `pr`.
 *
 * Flags are pre-parsed with _meta_flag_preparse(), then the item is fetched
 * (with its lock held if any flag asked for that). With 'N', a missing item
 * is created and linked on the spot. On a hit the status line ("VA <size>"
 * or "HD") plus the requested return flags is built in resp->wbuf, and the
 * value is chained in if 'v' was given. On a miss "EN" plus any opaque/key
 * flags is written, or the response is skipped in quiet mode.
 *
 * Errors are reported through pout_errstring(), which overrides quiet mode.
 */
static void process_mget_cmd(LIBEVENT_THREAD *t, mcp_parser_t *pr, mc_resp *resp) {
    const char *key = &pr->request[pr->tokens[pr->keytoken]];
    size_t nkey = pr->klen;
    item *it;
    unsigned int i = 0;
    struct _meta_flags of = {0}; // option bitflags.
    uint32_t hv; // cached hash value for unlocking an item.
    bool failed = false;
    bool item_created = false;
    bool won_token = false;
    bool ttl_set = false;
    bool overflow = false;
    char *errstr = "CLIENT_ERROR bad command line format";
    char *p = resp->wbuf;
    int tlen = 0;

    assert(t != NULL);

    // FIXME: still needed?
    //WANT_TOKENS_MIN(ntokens, 3);

    if (nkey > KEY_MAX_LENGTH) {
        pout_string(resp, "CLIENT_ERROR bad command line format");
        return;
    }

    if (pr->ntokens > MFLAG_MAX_OPT_LENGTH) {
        // TODO: ensure the command tokenizer gives us at least this many
        pout_errstring(resp, "CLIENT_ERROR options flags are too long");
        return;
    }

    // scrubs duplicated options and sets flags for how to load the item.
    // we pass in the first token that should be a flag.
    if (_meta_flag_preparse(pr, 2, &of, &errstr) != 0) {
        pout_errstring(resp, errstr);
        return;
    }

    if (of.locked) {
        // If we had to lock the item, we're doing our own bump later.
        it = limited_get_locked(key, nkey, t, DONT_UPDATE, &hv, &overflow);
    } else {
        it = limited_get(key, nkey, t, 0, false, !of.no_update, &overflow);
    }

    // Since we're a new protocol, we can actually inform users that refcount
    // overflow is happening by straight up throwing an error.
    // We definitely don't want to re-autovivify by accident.
    if (overflow) {
        assert(it == NULL);
        pout_errstring(resp, "SERVER_ERROR refcount overflow during fetch");
        return;
    }

    if (it == NULL && of.vivify) {
        // Fill in the exptime during parsing later.
        it = item_alloc(key, nkey, 0, realtime(0), 2);
        // We don't actually need any of do_store_item's logic:
        // - already fetched and missed an existing item.
        // - lock is still held.
        // - not append/prepend/replace
        // - not testing CAS
        if (it != NULL) {
            // I look forward to the day I get rid of this :)
            memcpy(ITEM_data(it), "\r\n", 2);
            // NOTE: This initializes the CAS value.
            do_item_link(it, hv, of.has_cas_in ? of.cas_id_in : get_cas_id());
            item_created = true;
        }
    }

    // don't have to check result of add_iov() since the iov size defaults are
    // enough.
    if (it) {
        // Status word first: "VA <size>" when returning a value, else "HD".
        if (of.value) {
            memcpy(p, "VA ", 3);
            p = itoa_u32(it->nbytes-2, p+3);
        } else {
            memcpy(p, "HD", 2);
            p += 2;
        }

        // Second pass over the flags: apply updates, emit return values.
        for (i = pr->keytoken+1; i < pr->ntokens; i++) {
            const char *tok = &pr->request[pr->tokens[i]];

            switch (*tok) {
                case 'T':
                    ttl_set = true;
                    it->exptime = of.exptime;
                    break;
                case 'N':
                    if (item_created) {
                        it->exptime = of.autoviv_exptime;
                        won_token = true;
                    }
                    break;
                case 'R':
                    // If we haven't autovivified and supplied token is less
                    // than current TTL, mark a win.
                    if ((it->it_flags & ITEM_TOKEN_SENT) == 0
                            && !item_created
                            && it->exptime != 0
                            && it->exptime < of.recache_time) {
                        won_token = true;
                    }
                    break;
                case 's':
                    META_CHAR(p, 's');
                    p = itoa_u32(it->nbytes-2, p);
                    break;
                case 't':
                    // TTL remaining as of this request.
                    // needs to be relative because server clocks may not be in sync.
                    META_CHAR(p, 't');
                    if (it->exptime == 0) {
                        *p = '-';
                        *(p+1) = '1';
                        p += 2;
                    } else {
                        p = itoa_u32(it->exptime - current_time, p);
                    }
                    break;
                case 'c':
                    META_CHAR(p, 'c');
                    p = itoa_u64(ITEM_get_cas(it), p);
                    break;
                case 'f':
                    META_CHAR(p, 'f');
                    if (FLAGS_SIZE(it) == 0) {
                        *p = '0';
                        p++;
                    } else {
                        p = itoa_u64(*((client_flags_t *) ITEM_suffix(it)), p);
                    }
                    break;
                case 'l':
                    META_CHAR(p, 'l');
                    p = itoa_u32(current_time - it->time, p);
                    break;
                case 'h':
                    META_CHAR(p, 'h');
                    *p = (it->it_flags & ITEM_FETCHED) ? '1' : '0';
                    p++;
                    break;
                case 'O':
                    // Opaque tokens are echoed back verbatim, 'O' included.
                    tlen = _process_token_len(pr, i);
                    if (tlen > MFLAG_MAX_OPAQUE_LENGTH) {
                        errstr = "CLIENT_ERROR opaque token too long";
                        goto error;
                    }
                    META_SPACE(p);
                    memcpy(p, tok, tlen);
                    p += tlen;
                    break;
                case 'k':
                    META_KEY(p, ITEM_key(it), it->nkey, (it->it_flags & ITEM_KEY_BINARY));
                    break;
            }
        }

        // Has this item already sent a token?
        // Important to do this here so we don't send W with Z.
        // Isn't critical, but easier for client authors to understand.
        if (it->it_flags & ITEM_TOKEN_SENT) {
            META_CHAR(p, 'Z');
        }
        if (it->it_flags & ITEM_STALE) {
            META_CHAR(p, 'X');
            // FIXME: think hard about this. is this a default, or a flag?
            if ((it->it_flags & ITEM_TOKEN_SENT) == 0) {
                // If we're stale but no token already sent, now send one.
                won_token = true;
            }
        }

        if (won_token) {
            // Mark a win into the flag buffer.
            META_CHAR(p, 'W');
            it->it_flags |= ITEM_TOKEN_SENT;
        }

        p[0] = '\r';
        p[1] = '\n';
        p[2] = '\0';
        p += 2;
        // finally, chain in the buffer.
        resp_add_iov(resp, resp->wbuf, p - resp->wbuf);

        if (of.value) {
            // Header items are read back from extstore; anything else is
            // chained straight out of memory.
            bool value_in_memory = true;
#ifdef EXTSTORE
            if (it->it_flags & ITEM_HDR) {
                value_in_memory = false;
                if (proxy_storage_get(t, it, resp, PROXY_STORAGE_MG) != 0) {
                    pthread_mutex_lock(&t->stats.mutex);
                    t->stats.get_oom_extstore++;
                    pthread_mutex_unlock(&t->stats.mutex);

                    failed = true;
                }
            }
#endif
            if (value_in_memory) {
                if (it->it_flags & ITEM_CHUNKED) {
                    resp_add_chunked_iov(resp, it, it->nbytes);
                } else {
                    resp_add_iov(resp, ITEM_data(it), it->nbytes);
                }
            }
        }

        // need to hold the ref at least because of the key above.
#ifdef EXTSTORE
        if (failed) {
            // Failed to set up extstore fetch.
            if (of.locked) {
                do_item_remove(it);
            } else {
                item_remove(it);
            }
        } else if ((it->it_flags & ITEM_HDR) != 0 && of.value) {
            // Only have extstore clean if header and returning value.
            resp->item = NULL;
        } else {
            resp->item = it;
        }
#else
        resp->item = it;
#endif
    } else {
        failed = true;
    }

    if (of.locked) {
        // Delayed bump so we could get fetched/last access time pre-update.
        if (!of.no_update && it != NULL) {
            do_item_bump(t, it, hv);
        }
        item_unlock(hv);
    }

    // we count this command as a normal one if we've gotten this far.
    // TODO: for autovivify case, miss never happens. Is this okay?
    pthread_mutex_lock(&t->stats.mutex);
    if (!failed) {
        if (ttl_set) {
            t->stats.touch_cmds++;
            t->stats.slab_stats[ITEM_clsid(it)].touch_hits++;
        } else {
            t->stats.lru_hits[it->slabs_clsid]++;
            t->stats.get_cmds++;
        }
    } else {
        if (ttl_set) {
            t->stats.touch_cmds++;
            t->stats.touch_misses++;
        } else {
            t->stats.get_misses++;
            t->stats.get_cmds++;
        }
    }
    pthread_mutex_unlock(&t->stats.mutex);

    if (failed) {
        // This gets elided in noreply mode.
        if (of.no_reply) {
            resp->skip = true;
        }
        memcpy(p, "EN", 2);
        p += 2;
        for (i = pr->keytoken+1; i < pr->ntokens; i++) {
            const char *tok = &pr->request[pr->tokens[i]];

            switch (*tok) {
                // TODO: macro perhaps?
                case 'O':
                    tlen = _process_token_len(pr, i);
                    if (tlen > MFLAG_MAX_OPAQUE_LENGTH) {
                        errstr = "CLIENT_ERROR opaque token too long";
                        goto error;
                    }
                    META_SPACE(p);
                    memcpy(p, tok, tlen);
                    p += tlen;
                    break;
                case 'k':
                    META_KEY(p, key, nkey, of.key_binary);
                    break;
            }
        }
        resp->wbytes = p - resp->wbuf;
        memcpy(resp->wbuf + resp->wbytes, "\r\n", 2);
        resp->wbytes += 2;
        resp_add_iov(resp, resp->wbuf, resp->wbytes);
    }
    return;

error:
    // Release whatever is still held before reporting the error.
    if (it) {
        do_item_remove(it);
        if (of.locked) {
            item_unlock(hv);
        }
    }
    pout_errstring(resp, errstr);
}
1054
// Executes a meta set ("ms") request against the local cache.
//
// t:    worker thread; supplies the stats counters and the storage handle.
// pr:   parsed request: key, flag tokens, and the value (pr->vbuf / pr->vlen).
// resp: response object; the reply line is assembled in resp->wbuf.
//
// The reply starts with a two byte status taken from store_item():
//   HD (STORED), EX (EXISTS), NF (NOT_FOUND), NS (NOT_STORED)
// followed by any requested return flags (O, k, c, s) and "\r\n".
// Any failure is reported as a CLIENT_ERROR/SERVER_ERROR line instead.
static void process_mset_cmd(LIBEVENT_THREAD *t, mcp_parser_t *pr, mc_resp *resp) {
    const char *key = &pr->request[pr->tokens[pr->keytoken]];
    size_t nkey = pr->klen;

    // Stays NULL until item_alloc() succeeds; the error path uses this to
    // decide whether a reference has to be released.
    item *it = NULL;
    int i;
    short comm = NREAD_SET;
    struct _meta_flags of = {0}; // option bitflags.
    char *errstr = "CLIENT_ERROR bad command line format";
    uint32_t hv; // cached hash value.
    int vlen = pr->vlen; // value from data line.
    assert(t != NULL);
    char *p = resp->wbuf;
    int tlen = 0;

    //WANT_TOKENS_MIN(ntokens, 3);

    if (nkey > KEY_MAX_LENGTH || pr->ntokens < 3) {
        pout_string(resp, "CLIENT_ERROR bad command line format");
        return;
    }

    if (pr->ntokens > MFLAG_MAX_OPT_LENGTH) {
        // TODO: ensure the command tokenizer gives us at least this many
        pout_errstring(resp, "CLIENT_ERROR options flags are too long");
        return;
    }

    // We need to at least try to get the size to properly slurp bad bytes
    // after an error.
    // we pass in the first token that should be a flag.
    if (_meta_flag_preparse(pr, 3, &of, &errstr) != 0) {
        goto error;
    }

    rel_time_t exptime = of.exptime;
    // "mode switch" to alternative commands
    switch (of.mode) {
        case 0:
            break; // no mode supplied.
        case 'E': // Add...
            comm = NREAD_ADD;
            break;
        case 'A': // Append.
            if (of.vivify) {
                comm = NREAD_APPENDVIV;
                exptime = of.autoviv_exptime;
            } else {
                comm = NREAD_APPEND;
            }
            break;
        case 'P': // Prepend.
            if (of.vivify) {
                comm = NREAD_PREPENDVIV;
                exptime = of.autoviv_exptime;
            } else {
                comm = NREAD_PREPEND;
            }
            break;
        case 'R': // Replace.
            comm = NREAD_REPLACE;
            break;
        case 'S': // Set. Default.
            comm = NREAD_SET;
            break;
        default:
            errstr = "CLIENT_ERROR invalid mode for ms M token";
            goto error;
    }

    // The item storage function doesn't exactly map to mset.
    // If a CAS value is supplied, upgrade default SET mode to CAS mode.
    // Also allows REPLACE to work, as REPLACE + CAS works the same as CAS.
    // add-with-cas works the same as add; but could only LRU bump if match..
    // APPEND/PREPEND allow a simplified CAS check.
    if (of.has_cas && (comm == NREAD_SET || comm == NREAD_REPLACE)) {
        comm = NREAD_CAS;
    }

    it = item_alloc(key, nkey, of.client_flags, exptime, vlen);

    if (it == NULL) {
        if (! item_size_ok(nkey, of.client_flags, vlen)) {
            errstr = "SERVER_ERROR object too large for cache";
            pthread_mutex_lock(&t->stats.mutex);
            t->stats.store_too_large++;
            pthread_mutex_unlock(&t->stats.mutex);
        } else {
            errstr = "SERVER_ERROR out of memory storing object";
            pthread_mutex_lock(&t->stats.mutex);
            t->stats.store_no_memory++;
            pthread_mutex_unlock(&t->stats.mutex);
        }

        /* Avoid stale data persisting in cache because we failed alloc. */
        // NOTE: only if SET mode?
        // A separate pointer is used so `it` stays NULL: the old item is
        // fully released here and must not be released again at error:.
        item *old_it = item_get_locked(key, nkey, t, DONT_UPDATE, &hv);
        if (old_it) {
            do_item_unlink(old_it, hv);
            STORAGE_delete(t->storage, old_it);
            do_item_remove(old_it);
        }
        item_unlock(hv);

        goto error;
    }
    ITEM_set_cas(it, of.req_cas_id);

    // data should already be read into the request.

    // Prevent printing back the key in meta commands as garbage.
    if (of.key_binary) {
        it->it_flags |= ITEM_KEY_BINARY;
    }

    bool set_stale = CAS_NO_STALE;
    if (of.set_stale && comm == NREAD_CAS) {
        set_stale = CAS_ALLOW_STALE;
    }
    resp->wbytes = p - resp->wbuf;

    pthread_mutex_lock(&t->stats.mutex);
    t->stats.slab_stats[ITEM_clsid(it)].set_cmds++;
    pthread_mutex_unlock(&t->stats.mutex);

    // complete_nread_proxy() does the data chunk check so all we need to do
    // is copy the data.
    if (_store_item_copy_from_buf(it, pr->vbuf, it->nbytes) != 0) {
        pout_string(resp, "SERVER_ERROR out of memory storing object");
        item_remove(it);
        return;
    }

    uint64_t cas = 0;
    int nbytes = 0;
    int ret = store_item(it, comm, t, &nbytes, &cas, of.has_cas_in ? of.cas_id_in : get_cas_id(), set_stale);
    switch (ret) {
        case STORED:
            memcpy(p, "HD", 2);
            // Only place noreply is used for meta cmds is a nominal response.
            if (of.no_reply) {
                resp->skip = true;
            }
            break;
        case EXISTS:
            memcpy(p, "EX", 2);
            break;
        case NOT_FOUND:
            memcpy(p, "NF", 2);
            break;
        case NOT_STORED:
            memcpy(p, "NS", 2);
            break;
        default:
            // We still hold the reference from item_alloc(); drop it before
            // bailing out so the item is not leaked.
            item_remove(it);
            pout_errstring(resp, "SERVER_ERROR Unhandled storage type.");
            return;
    }
    p += 2;

    // Append the requested return flags after the status code.
    for (i = pr->keytoken+1; i < pr->ntokens; i++) {
        switch (pr->request[pr->tokens[i]]) {
            case 'O':
                tlen = _process_token_len(pr, i);
                if (tlen > MFLAG_MAX_OPAQUE_LENGTH) {
                    // store_item() has already run at this point; the
                    // error path releases our item reference.
                    errstr = "CLIENT_ERROR opaque token too long";
                    goto error;
                }
                META_SPACE(p);
                memcpy(p, &pr->request[pr->tokens[i]], tlen);
                p += tlen;
                break;
            case 'k':
                META_KEY(p, ITEM_key(it), it->nkey, (it->it_flags & ITEM_KEY_BINARY));
                break;
            case 'c':
                META_CHAR(p, 'c');
                p = itoa_u64(cas, p);
                break;
            case 's':
                // Get final item size, ie from append/prepend
                META_CHAR(p, 's');
                // If the size changed during append/prepend
                if (nbytes != 0) {
                    p = itoa_u32(nbytes-2, p);
                } else {
                    p = itoa_u32(it->nbytes-2, p);
                }
                break;
        }
    }

    // We don't need to free pr->vbuf as that is owned by *rq
    // either way, there's no c->item or resp->item reference right now.

    memcpy(p, "\r\n", 2);
    p += 2;
    // we're offset into wbuf, but good convention to track wbytes.
    resp->wbytes = p - resp->wbuf;
    resp_add_iov(resp, resp->wbuf, resp->wbytes);

    item_remove(it);

    return;
error:
    // Reached both before allocation (it == NULL) and from the return-flag
    // loop, where the item reference is still held and must be released.
    if (it != NULL) {
        item_remove(it);
    }
    pout_errstring(resp, errstr);
}
1264
// Executes a meta delete ("md") request against the local cache.
//
// t:    worker thread; supplies the stats counters and the storage handle.
// pr:   parsed request (key and flag tokens).
// resp: response object; the reply line is assembled in resp->wbuf.
//
// The reply starts with a two byte status:
//   HD - item deleted, replaced by an empty value, or marked stale
//   NF - no item found for the key
//   EX - a CAS value was supplied and did not match
//   NS - the empty replacement item could not be stored
// followed by any O/k return flags and "\r\n". Errors are sent as
// CLIENT_ERROR/SERVER_ERROR lines instead.
static void process_mdelete_cmd(LIBEVENT_THREAD *t, mcp_parser_t *pr, mc_resp *resp) {
    const char *key = &pr->request[pr->tokens[pr->keytoken]];
    size_t nkey = pr->klen;
    item *it = NULL;
    uint32_t hv;
    struct _meta_flags of = {0}; // option bitflags.
    char *errstr = "CLIENT_ERROR bad command line format";
    // Two byte status code; copied into the front of wbuf at cleanup.
    const char *status = NULL;
    int tok;
    int olen = 0;
    assert(t != NULL);
    // Flags are written after the two bytes reserved for the status code.
    char *out = resp->wbuf + 2;

    //WANT_TOKENS_MIN(ntokens, 3);

    if (nkey > KEY_MAX_LENGTH) {
        pout_string(resp, "CLIENT_ERROR bad command line format");
        return;
    }

    if (pr->ntokens > MFLAG_MAX_OPT_LENGTH) {
        // TODO: ensure the command tokenizer gives us at least this many
        pout_errstring(resp, "CLIENT_ERROR options flags are too long");
        return;
    }

    // scrubs duplicated options and sets flags for how to load the item.
    // we pass in the first token that should be a flag.
    // FIXME: not using the preparse errstr?
    if (_meta_flag_preparse(pr, 2, &of, &errstr) != 0) {
        pout_errstring(resp, "CLIENT_ERROR invalid or duplicate flag");
        return;
    }

    // Emit the return flags first; they do not depend on the item.
    for (tok = pr->keytoken+1; tok < pr->ntokens; tok++) {
        switch (pr->request[pr->tokens[tok]]) {
            // TODO: macro perhaps?
            case 'O':
                olen = _process_token_len(pr, tok);
                if (olen > MFLAG_MAX_OPAQUE_LENGTH) {
                    errstr = "CLIENT_ERROR opaque token too long";
                    goto error;
                }
                META_SPACE(out);
                memcpy(out, &pr->request[pr->tokens[tok]], olen);
                out += olen;
                break;
            case 'k':
                META_KEY(out, key, nkey, of.key_binary);
                break;
        }
    }

    it = item_get_locked(key, nkey, t, DONT_UPDATE, &hv);

    // Miss: nothing to delete.
    if (it == NULL) {
        pthread_mutex_lock(&t->stats.mutex);
        t->stats.delete_misses++;
        pthread_mutex_unlock(&t->stats.mutex);

        status = "NF";
        goto cleanup;
    }

    // allow only deleting/marking if a CAS value matches.
    if (of.has_cas && ITEM_get_cas(it) != of.req_cas_id) {
        pthread_mutex_lock(&t->stats.mutex);
        t->stats.delete_misses++;
        pthread_mutex_unlock(&t->stats.mutex);

        status = "EX";
        goto cleanup;
    }

    // If requested, create a new empty tombstone item.
    if (of.remove_val) {
        item *tomb = item_alloc(key, nkey, of.client_flags, of.exptime, 2);
        if (tomb == NULL) {
            errstr = "SERVER_ERROR out of memory";
            goto error;
        }

        memcpy(ITEM_data(tomb), "\r\n", 2);
        if (!do_store_item(tomb, NREAD_SET, t, hv, NULL, NULL,
                    of.has_cas_in ? of.cas_id_in : ITEM_get_cas(it), CAS_NO_STALE)) {
            do_item_remove(tomb);
            status = "NS";
            goto cleanup;
        }

        // From here on we operate on the replacement item.
        do_item_remove(it);
        it = tomb;
    }

    if (of.set_stale) {
        // If we're to set this item as stale, we don't actually want to
        // delete it. We mark the stale bit, bump CAS, and update exptime if
        // we were supplied a new TTL.
        if (of.new_ttl) {
            it->exptime = of.exptime;
        }
        it->it_flags |= ITEM_STALE;
        // Also need to remove TOKEN_SENT, so next client can win.
        it->it_flags &= ~ITEM_TOKEN_SENT;

        ITEM_set_cas(it, of.has_cas_in ? of.cas_id_in : get_cas_id());
    } else {
        pthread_mutex_lock(&t->stats.mutex);
        t->stats.slab_stats[ITEM_clsid(it)].delete_hits++;
        pthread_mutex_unlock(&t->stats.mutex);
        LOGGER_LOG(NULL, LOG_DELETIONS, LOGGER_DELETIONS, it, LOG_TYPE_META_DELETE);

        // When the value was replaced above, the replacement item stays
        // linked; otherwise the item is unlinked.
        if (!of.remove_val) {
            do_item_unlink(it, hv);
            STORAGE_delete(t->storage, it);
        }
    }

    // Both success paths suppress the reply in no-reply mode and report HD.
    if (of.no_reply) {
        resp->skip = true;
    }
    status = "HD";

cleanup:
    memcpy(resp->wbuf, status, 2);
    if (it) {
        do_item_remove(it);
    }
    // Item is always returned locked, even if missing.
    item_unlock(hv);
    resp->wbytes = out - resp->wbuf;
    memcpy(resp->wbuf + resp->wbytes, "\r\n", 2);
    resp->wbytes += 2;
    resp_add_iov(resp, resp->wbuf, resp->wbytes);
    //conn_set_state(c, conn_new_cmd);
    return;
error:
    // cleanup if an error happens after we fetched an item.
    if (it) {
        do_item_remove(it);
        item_unlock(hv);
    }
    pout_errstring(resp, errstr);
}
1409
// Executes a meta arithmetic ("ma") request against the local cache.
//
// t:    worker thread; supplies the stats counters.
// pr:   parsed request (key and flag tokens).
// resp: response object; the reply line is assembled in resp->wbuf.
//
// The reply starts with one of:
//   HD            - success, value not requested
//   VA <len> ...  - success, followed by the new value on its own line
//   NF            - item missing and autovivify not requested
//   EX            - CAS mismatch
//   NS            - autovivify was requested but the new item was not stored
// followed by the requested return flags. Errors are sent as
// CLIENT_ERROR/SERVER_ERROR lines instead.
static void process_marithmetic_cmd(LIBEVENT_THREAD *t, mcp_parser_t *pr, mc_resp *resp) {
    const char *key = &pr->request[pr->tokens[pr->keytoken]];
    size_t nkey = pr->klen;
    int i;
    struct _meta_flags of = {0}; // option bitflags.
    char *errstr = "CLIENT_ERROR bad command line format";
    assert(t != NULL);
    // no reservation (like del/set) since we post-process the status line.
    char *p = resp->wbuf;
    int tlen = 0;

    // If no argument supplied, incr or decr by one.
    of.delta = 1;
    of.initial = 0; // redundant, for clarity.
    bool incr = true; // default mode is to increment.
    bool locked = false;
    uint32_t hv = 0;
    item *it = NULL; // item returned by do_add_delta.

    //WANT_TOKENS_MIN(ntokens, 3);

    if (nkey > KEY_MAX_LENGTH) {
        pout_string(resp, "CLIENT_ERROR bad command line format");
        return;
    }

    if (pr->ntokens > MFLAG_MAX_OPT_LENGTH) {
        // TODO: ensure the command tokenizer gives us at least this many
        pout_errstring(resp, "CLIENT_ERROR options flags are too long");
        return;
    }

    // scrubs duplicated options and sets flags for how to load the item.
    // we pass in the first token that should be a flag.
    if (_meta_flag_preparse(pr, 2, &of, &errstr) != 0) {
        pout_errstring(resp, "CLIENT_ERROR invalid or duplicate flag");
        return;
    }
    //c->noreply = of.no_reply;

    // "mode switch" to alternative commands
    switch (of.mode) {
        case 0: // no switch supplied.
            break;
        case 'I': // Incr (default)
        case '+':
            incr = true;
            break;
        case 'D': // Decr.
        case '-':
            incr = false;
            break;
        default:
            errstr = "CLIENT_ERROR invalid mode for ma M token";
            goto error;
    }

    // take hash value and manually lock item... hold lock during store phase
    // on miss and avoid recalculating the hash multiple times.
    hv = hash(key, nkey);
    item_lock(hv);
    locked = true;
    char tmpbuf[INCR_MAX_STORAGE_LEN];

    // return a referenced item if it exists, so we can modify it here, rather
    // than adding even more parameters to do_add_delta.
    bool item_created = false;
    uint64_t cas = 0;
    switch(do_add_delta(t, key, nkey, incr, of.delta, tmpbuf, &of.req_cas_id, hv, &it)) {
    case OK:
        //if (c->noreply)
        //    resp->skip = true;
        // *it was filled, set the status below.
        if (of.has_cas_in) {
            // override the CAS. slightly inefficient but fixing that can wait
            // until the next time do_add_delta is changed.
            ITEM_set_cas(it, of.cas_id_in);
        }
        cas = ITEM_get_cas(it);
        break;
    case NON_NUMERIC:
        errstr = "CLIENT_ERROR cannot increment or decrement non-numeric value";
        goto error;
    case EOM:
        errstr = "SERVER_ERROR out of memory";
        goto error;
    case DELTA_ITEM_NOT_FOUND:
        if (of.vivify) {
            itoa_u64(of.initial, tmpbuf);
            int vlen = strlen(tmpbuf);

            it = item_alloc(key, nkey, 0, 0, vlen+2);
            if (it != NULL) {
                memcpy(ITEM_data(it), tmpbuf, vlen);
                memcpy(ITEM_data(it) + vlen, "\r\n", 2);
                if (do_store_item(it, NREAD_ADD, t, hv, NULL, &cas,
                            of.has_cas_in ? of.cas_id_in : get_cas_id(), CAS_NO_STALE)) {
                    item_created = true;
                } else {
                    // Not sure how we can get here if we're holding the lock.
                    // The item was not stored: write the status through p
                    // and drop the item so the response builder below takes
                    // the "no item" path instead of overwriting NS with a
                    // success status.
                    memcpy(p, "NS", 2);
                    p += 2;
                    do_item_remove(it);
                    it = NULL;
                }
            } else {
                errstr = "SERVER_ERROR Out of memory allocating new item";
                goto error;
            }
        } else {
            pthread_mutex_lock(&t->stats.mutex);
            if (incr) {
                t->stats.incr_misses++;
            } else {
                t->stats.decr_misses++;
            }
            pthread_mutex_unlock(&t->stats.mutex);
            // won't have a valid it here.
            memcpy(p, "NF", 2);
            p += 2;
        }
        break;
    case DELTA_ITEM_CAS_MISMATCH:
        // also returns without a valid it.
        memcpy(p, "EX", 2);
        p += 2;
        break;
    }

    // final loop
    // allows building the response with information after vivifying from a
    // miss, or returning a new CAS value after add_delta().
    if (it) {
        // tmpbuf holds the resulting value as text in both success cases.
        size_t vlen = strlen(tmpbuf);
        if (of.value) {
            memcpy(p, "VA ", 3);
            p = itoa_u32(vlen, p+3);
        } else {
            memcpy(p, "HD", 2);
            p += 2;
        }

        for (i = pr->keytoken+1; i < pr->ntokens; i++) {
            switch (pr->request[pr->tokens[i]]) {
                case 'c':
                    META_CHAR(p, 'c');
                    p = itoa_u64(cas, p);
                    break;
                case 't':
                    META_CHAR(p, 't');
                    // An exptime of 0 is reported as -1.
                    if (it->exptime == 0) {
                        *p = '-';
                        *(p+1) = '1';
                        p += 2;
                    } else {
                        p = itoa_u32(it->exptime - current_time, p);
                    }
                    break;
                case 'T':
                    it->exptime = of.exptime;
                    break;
                case 'N':
                    if (item_created) {
                        it->exptime = of.autoviv_exptime;
                    }
                    break;
                case 'O':
                    tlen = _process_token_len(pr, i);
                    if (tlen > MFLAG_MAX_OPAQUE_LENGTH) {
                        errstr = "CLIENT_ERROR opaque token too long";
                        goto error;
                    }
                    META_SPACE(p);
                    memcpy(p, &pr->request[pr->tokens[i]], tlen);
                    p += tlen;
                    break;
                case 'k':
                    META_KEY(p, key, nkey, of.key_binary);
                    break;
            }
        }

        // The value goes on its own line after the flags.
        if (of.value) {
            *p = '\r';
            *(p+1) = '\n';
            p += 2;
            memcpy(p, tmpbuf, vlen);
            p += vlen;
        }

        do_item_remove(it);
    } else {
        // No item to handle. still need to return opaque/key tokens
        for (i = pr->keytoken+1; i < pr->ntokens; i++) {
            switch (pr->request[pr->tokens[i]]) {
                case 'O':
                    tlen = _process_token_len(pr, i);
                    if (tlen > MFLAG_MAX_OPAQUE_LENGTH) {
                        errstr = "CLIENT_ERROR opaque token too long";
                        goto error;
                    }
                    META_SPACE(p);
                    memcpy(p, &pr->request[pr->tokens[i]], tlen);
                    p += tlen;
                    break;
                case 'k':
                    META_KEY(p, key, nkey, of.key_binary);
                    break;
            }
        }
    }

    item_unlock(hv);

    resp->wbytes = p - resp->wbuf;
    memcpy(resp->wbuf + resp->wbytes, "\r\n", 2);
    resp->wbytes += 2;
    resp_add_iov(resp, resp->wbuf, resp->wbytes);
    return;
error:
    // Release whatever was acquired before the failure.
    if (it != NULL)
        do_item_remove(it);
    if (locked)
        item_unlock(hv);
    pout_errstring(resp, errstr);
}
1636
1637 /*** Lua and internal handler ***/
1638
// Lua-facing entry point for running a request internally.
// Checks that argument 1 is an "mcp.request", pushes a zero-filled
// "mcp.response" userdata, and yields the top two stack values: that
// response and the MCP_YIELD_INTERNAL marker.
int mcplib_internal(lua_State *L) {
    mcp_resp_t *resp_ud;

    // Raises a Lua error if the first argument is not a request object.
    luaL_checkudata(L, 1, "mcp.request");

    // Fresh response object, zeroed and tagged with the response metatable.
    resp_ud = lua_newuserdatauv(L, sizeof(*resp_ud), 0);
    memset(resp_ud, 0, sizeof(*resp_ud));
    luaL_getmetatable(L, "mcp.response");
    lua_setmetatable(L, -2);

    lua_pushinteger(L, MCP_YIELD_INTERNAL);
    return lua_yield(L, 2);
}
1649
1650 // we're pretending to be p_c_ascii(), but reusing our already tokenized code.
1651 // the text parser should eventually move to the new tokenizer and we can
1652 // merge all of this code together.
// Runs the request at Lua stack index 1 against the local cache and fills
// the mcp.response at index 2 from the generated reply.
//
// Returns:
//   -1 if no response object could be started or the command is unhandled
//    1 if the command left a pending IO, which is queued for extstore
//    0 otherwise
int mcplib_internal_run(mcp_rcontext_t *rctx) {
    lua_State *L = rctx->Lc;
    mcp_request_t *rq = luaL_checkudata(L, 1, "mcp.request");
    mcp_resp_t *r = luaL_checkudata(L, 2, "mcp.response");

    mc_resp *resp = resp_start_unlinked(rctx->c);
    if (resp == NULL) {
        return -1;
    }

    LIBEVENT_THREAD *t = rctx->c->thread;
    mcp_parser_t *pr = &rq->pr;

    // TODO: meta no-op isn't handled here. haven't decided how yet.
    switch (pr->command) {
        // meta commands
        case CMD_MG:
            process_mget_cmd(t, pr, resp);
            break;
        case CMD_MS:
            process_mset_cmd(t, pr, resp);
            break;
        case CMD_MD:
            process_mdelete_cmd(t, pr, resp);
            break;
        case CMD_MA:
            process_marithmetic_cmd(t, pr, resp);
            break;

        // get family: varies only in the CAS and touch switches
        case CMD_GET:
            process_get_cmd(t, pr, resp, _NO_CAS, _NO_TOUCH);
            break;
        case CMD_GETS:
            process_get_cmd(t, pr, resp, _DO_CAS, _NO_TOUCH);
            break;
        case CMD_GAT:
            process_get_cmd(t, pr, resp, _NO_CAS, _DO_TOUCH);
            break;
        case CMD_GATS:
            process_get_cmd(t, pr, resp, _DO_CAS, _DO_TOUCH);
            break;

        // storage family
        case CMD_SET:
            process_update_cmd(t, pr, resp, NREAD_SET, _NO_CAS);
            break;
        case CMD_ADD:
            process_update_cmd(t, pr, resp, NREAD_ADD, _NO_CAS);
            break;
        case CMD_APPEND:
            process_update_cmd(t, pr, resp, NREAD_APPEND, _NO_CAS);
            break;
        case CMD_PREPEND:
            process_update_cmd(t, pr, resp, NREAD_PREPEND, _NO_CAS);
            break;
        case CMD_CAS:
            process_update_cmd(t, pr, resp, NREAD_CAS, _DO_CAS);
            break;
        case CMD_REPLACE:
            process_update_cmd(t, pr, resp, NREAD_REPLACE, _DO_CAS);
            break;

        // everything else
        case CMD_INCR:
            process_arithmetic_cmd(t, pr, resp, true);
            break;
        case CMD_DECR:
            process_arithmetic_cmd(t, pr, resp, false);
            break;
        case CMD_DELETE:
            process_delete_cmd(t, pr, resp);
            break;
        case CMD_TOUCH:
            process_touch_cmd(t, pr, resp);
            break;

        default:
            // Unhandled command: give the temporary response back.
            resp_free(t, resp);
            return -1;
    }

    // TODO: I'd like to shortcut the parsing here, but if we want the resp
    // object to have full support (ie: resp:line()/etc) it might be necessary
    // to still do a full parsing. It might be possible to
    // wrap the main commands with something that decorates r->resp directly
    // instead of going through a parser to save some CPU.
    // Either way this is a lot less code.
    mcmc_parse_buf(resp->iov[0].iov_base, resp->iov[0].iov_len, &r->resp);

    if (rq->ascii_multiget) {
        if (r->resp.type == MCMC_RESP_GET) {
            // Blank out the END. Bad hack.
            resp->iovcnt--;
        } else if (r->resp.type == MCMC_RESP_END) {
            // Response type END in a multiget: send nothing for it.
            resp->skip = true;
        }
    }

    // in case someone logs this response it should make sense.
    // Only the literal characters are copied, without a terminator.
    memcpy(r->be_name, "internal", sizeof("internal") - 1);
    memcpy(r->be_port, "0", sizeof("0") - 1);

    // TODO: r-> will need status/code/mode copied from resp.
    r->cresp = resp;
    r->thread = t;
    r->cmd = pr->command;
    // Always OK here: per the original note, this field signals internal
    // errors rather than the outcome of the command.
    r->status = MCMC_OK;

    // resp object is associated with the
    // response object, which is about a
    // kilobyte.
    t->proxy_vm_extra_kb++;

    if (resp->io_pending == NULL) {
        return 0;
    }

    // TODO (v2): here we move the IO from the temporary resp to the top
    // resp, but this feels kludgy so I'm leaving an explicit note to find
    // a better way to do this.
    rctx->resp->io_pending = resp->io_pending;
    resp->io_pending = NULL;

    // Add io object to extstore submission queue.
    io_queue_t *q = conn_io_queue_get(rctx->c, IO_QUEUE_EXTSTORE);
    io_pending_proxy_t *io = (io_pending_proxy_t *)rctx->resp->io_pending;

    // Push onto the head of the queue's pending stack.
    io->eio.next = q->stack_ctx;
    q->stack_ctx = &io->eio;
    assert(q->count >= 0);
    q->count++;

    io->rctx = rctx;
    io->c = rctx->c;
    io->ascii_multiget = rq->ascii_multiget;
    // mark the buffer into the mcp_resp for freeing later.
    r->buf = io->eio.buf;
    return 1;
}
1784