1 /* -*- Mode: C; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2 // Functions related to local command execution.
3 
4 #include "proxy.h"
5 #include "storage.h"
6 
// Values for io_pending_proxy_t.gettype: selects which miss line
// _storage_get_item_cb() writes when an extstore read fails
// ("END\r\n" for ascii get, "EN\r\n" for meta get).
#define PROXY_STORAGE_GET 0
#define PROXY_STORAGE_MG 1
// Readability aliases for boolean arguments. NOTE(review): presumably passed
// as the return_cas / should_touch arguments of process_get_cmd(); the call
// sites are outside this view.
#define _DO_CAS true
#define _NO_CAS false
#define _DO_TOUCH true
#define _NO_TOUCH false
13 
// Copy `len` bytes from the flat buffer `buf` into the value area of the
// freshly allocated item `d_it`.
//
// Non-chunked items take a single memcpy. Chunked items are filled chunk by
// chunk starting at ITEM_schunk(d_it); when a chunk fills up and data is
// still left, a new chunk sized for the remaining bytes is chained on with
// do_item_alloc_chunk().
//
// Returns 0 on success, or -1 if a needed chunk could not be allocated.
static int _store_item_copy_from_buf(item *d_it, char *buf, const int len) {
    if (d_it->it_flags & ITEM_CHUNKED) {
        item_chunk *dch = (item_chunk *) ITEM_schunk(d_it);
        int done = 0;
        // Fill dch's via a flat data buffer
        while (len > done && dch) {
            int todo = (dch->size - dch->used < len - done)
                ? dch->size - dch->used : len - done;
            memcpy(dch->data + dch->used, buf + done, todo);
            done += todo;
            dch->used += todo;
            assert(dch->used <= dch->size);

            // Only chain another chunk while data remains. Exactly filling
            // the final chunk must not allocate an empty trailing chunk (and
            // must not fail with a spurious out-of-memory on that allocation).
            if (dch->size == dch->used && done < len) {
                item_chunk *tch = do_item_alloc_chunk(dch, len - done);
                if (tch == NULL) {
                    return -1;
                }
                dch = tch;
            }
        }
        assert(len == done);
    } else {
        memcpy(ITEM_data(d_it), buf, len);
    }

    return 0;
}
43 
44 // TODO (v2): out_string() needs to change to just take a *resp, but I don't
45 // want to do the huge refactor in this change series. So for now we have a
46 // custom out_string().
pout_string(mc_resp * resp,const char * str)47 static void pout_string(mc_resp *resp, const char *str) {
48     size_t len;
49     bool skip = resp->skip;
50     assert(resp != NULL);
51 
52     // if response was original filled with something, but we're now writing
53     // out an error or similar, have to reset the object first.
54     resp_reset(resp);
55 
56     // We blank the response "just in case", but if we're not intending on
57     // sending it lets not rewrite it.
58     if (skip) {
59         resp->skip = true;
60         return;
61     }
62 
63     // Fill response object with static string.
64 
65     len = strlen(str);
66     if ((len + 2) > WRITE_BUFFER_SIZE) {
67         /* ought to be always enough. just fail for simplicity */
68         str = "SERVER_ERROR output line too long";
69         len = strlen(str);
70     }
71 
72     memcpy(resp->wbuf, str, len);
73     memcpy(resp->wbuf + len, "\r\n", 2);
74     resp_add_iov(resp, resp->wbuf, len + 2);
75 
76     return;
77 }
78 
79 // For meta commands error strings override the quiet flag.
pout_errstring(mc_resp * resp,const char * str)80 static void pout_errstring(mc_resp *resp, const char *str) {
81     resp->skip = false;
82     pout_string(resp, str);
83 }
84 
85 #ifdef EXTSTORE
_storage_get_item_cb(void * e,obj_io * eio,int ret)86 static void _storage_get_item_cb(void *e, obj_io *eio, int ret) {
87     io_pending_proxy_t *io = (io_pending_proxy_t *)eio->data;
88     assert(io->active == true);
89     mc_resp *resp = io->tresp;
90     item *read_it = (item *)eio->buf;
91     bool miss = false;
92 
93     if (ret < 1) {
94         miss = true;
95     } else {
96         uint32_t crc2;
97         uint32_t crc = (uint32_t) read_it->exptime;
98         crc2 = crc32c(0, (char *)read_it+STORE_OFFSET, eio->len-STORE_OFFSET);
99 
100         if (crc != crc2) {
101             miss = true;
102             io->badcrc = true;
103         }
104     }
105 
106     if (miss && !resp->skip) {
107         resp->iovcnt = 1;
108         if (io->gettype == PROXY_STORAGE_GET) {
109             if (io->ascii_multiget) {
110                 resp->iov[0].iov_len = 0;
111                 resp->iov[0].iov_base = "";
112                 resp->tosend = 0;
113             } else {
114                 resp->iov[0].iov_len = 5;
115                 resp->iov[0].iov_base = "END\r\n";
116                 resp->tosend = 5;
117             }
118         } else if (io->gettype == PROXY_STORAGE_MG) {
119             resp->iov[0].iov_len = 4;
120             resp->iov[0].iov_base = "EN\r\n";
121             resp->tosend = 5;
122         } else {
123             assert(1 == 0);
124         }
125     }
126 
127     if (!miss) {
128         resp->iov[io->iovec_data].iov_base = ITEM_data(read_it);
129     }
130     io->miss = miss;
131     io->active = false;
132 
133     // in proxy mode we tend to return IO's as they happen so we can keep
134     // latency down more.
135     return_io_pending((io_pending_t *)io);
136 }
137 
138 // TODO (v2): if the item is smaller than resp->wbuf[] shouldn't we just read
139 // directly into there? item only necessary for recache.
proxy_storage_get(LIBEVENT_THREAD * t,item * it,mc_resp * resp,int type)140 static int proxy_storage_get(LIBEVENT_THREAD *t, item *it, mc_resp *resp,
141         int type) {
142 #ifdef NEED_ALIGN
143     item_hdr hdr;
144     memcpy(&hdr, ITEM_data(it), sizeof(hdr));
145 #else
146     item_hdr *hdr = (item_hdr *)ITEM_data(it);
147 #endif
148     size_t ntotal = ITEM_ntotal(it);
149 
150     io_pending_proxy_t *io = do_cache_alloc(t->io_cache);
151     // this is a re-cast structure, so assert that we never outsize it.
152     assert(sizeof(io_pending_t) >= sizeof(io_pending_proxy_t));
153     memset(io, 0, sizeof(io_pending_proxy_t));
154     io->active = true;
155     // io_pending owns the reference for this object now.
156     io->hdr_it = it;
157     io->tresp = resp; // our mc_resp is a temporary object.
158     io->io_queue_type = IO_QUEUE_EXTSTORE;
159     io->io_type = IO_PENDING_TYPE_EXTSTORE; // proxy specific sub-type.
160     io->gettype = type;
161     io->thread = t;
162     io->return_cb = proxy_return_rctx_cb;
163     io->finalize_cb = proxy_finalize_rctx_cb;
164     obj_io *eio = &io->eio;
165 
166     eio->buf = malloc(ntotal);
167     if (eio->buf == NULL) {
168         do_cache_free(t->io_cache, io);
169         return -1;
170     }
171 
172     io->iovec_data = resp->iovcnt;
173     resp_add_iov(resp, "", it->nbytes);
174 
175     // We can't bail out anymore, so mc_resp owns the IO from here.
176     resp->io_pending = (io_pending_t *)io;
177 
178     // reference ourselves for the callback.
179     eio->data = (void *)io;
180 
181     // Now, fill in io->io based on what was in our header.
182 #ifdef NEED_ALIGN
183     eio->page_version = hdr.page_version;
184     eio->page_id = hdr.page_id;
185     eio->offset = hdr.offset;
186 #else
187     eio->page_version = hdr->page_version;
188     eio->page_id = hdr->page_id;
189     eio->offset = hdr->offset;
190 #endif
191     eio->len = ntotal;
192     eio->mode = OBJ_IO_READ;
193     eio->cb = _storage_get_item_cb;
194 
195     pthread_mutex_lock(&t->stats.mutex);
196     t->stats.get_extstore++;
197     pthread_mutex_unlock(&t->stats.mutex);
198 
199     return 0;
200 }
201 #endif // EXTSTORE
202 
203 /* client flags == 0 means use no storage for client flags */
make_ascii_get_suffix(char * suffix,item * it,bool return_cas,int nbytes)204 static inline int make_ascii_get_suffix(char *suffix, item *it, bool return_cas, int nbytes) {
205     char *p = suffix;
206     *p = ' ';
207     p++;
208     if (FLAGS_SIZE(it) == 0) {
209         *p = '0';
210         p++;
211     } else {
212         p = itoa_u64(*((client_flags_t *) ITEM_suffix(it)), p);
213     }
214     *p = ' ';
215     p = itoa_u32(nbytes-2, p+1);
216 
217     if (return_cas) {
218         *p = ' ';
219         p = itoa_u64(ITEM_get_cas(it), p+1);
220     }
221 
222     *p = '\r';
223     *(p+1) = '\n';
224     *(p+2) = '\0';
225     return (p - suffix) + 2;
226 }
227 
// Handle an ascii get-family command for the single key at pr->keytoken.
//
// `return_cas` adds the CAS value to the VALUE line. `should_touch` marks
// the get-and-touch variants ("gat <exptime> <key>"), whose new exptime is
// token 1.
//
// On a hit, appends "VALUE <key> <flags> <bytes>[ <cas>]\r\n" plus the value.
// The value is read via extstore when the item is ITEM_HDR. The response
// then ends with "END\r\n", unless an error line is written instead.
static void process_get_cmd(LIBEVENT_THREAD *t, mcp_parser_t *pr, mc_resp *resp, bool return_cas, bool should_touch) {
    const char *key = &pr->request[pr->tokens[pr->keytoken]];
    int nkey = pr->klen;
    rel_time_t exptime = 0;
    bool overflow = false; // unused.

    if (nkey > KEY_MAX_LENGTH) {
        pout_string(resp, "CLIENT_ERROR bad command line format");
        return;
    }

    if (should_touch) {
        // Without parsing the requested exptime, limited_get() would be
        // handed exptime 0 for the touch.
        int32_t exptime_int = 0;
        if (!safe_strtol(&pr->request[pr->tokens[1]], &exptime_int)) {
            pout_string(resp, "CLIENT_ERROR invalid exptime argument");
            return;
        }
        exptime = realtime(EXPTIME_TO_POSITIVE_TIME(exptime_int));
    }

    item *it = limited_get(key, nkey, t, exptime, should_touch, DO_UPDATE, &overflow);
    if (it) {
        char *p = resp->wbuf;
        memcpy(p, "VALUE ", 6);
        p += 6;
        memcpy(p, ITEM_key(it), it->nkey);
        p += it->nkey;
        p += make_ascii_get_suffix(p, it, return_cas, it->nbytes);
        resp_add_iov(resp, resp->wbuf, p - resp->wbuf);

#ifdef EXTSTORE
        if (it->it_flags & ITEM_HDR) {
            if (proxy_storage_get(t, it, resp, PROXY_STORAGE_GET) != 0) {
                pthread_mutex_lock(&t->stats.mutex);
                t->stats.get_oom_extstore++;
                pthread_mutex_unlock(&t->stats.mutex);

                item_remove(it);
                proxy_out_errstring(resp, PROXY_SERVER_ERROR, "out of memory writing get response");
                return;
            }
        } else if ((it->it_flags & ITEM_CHUNKED) == 0) {
            resp_add_iov(resp, ITEM_data(it), it->nbytes);
        } else {
            resp_add_chunked_iov(resp, it, it->nbytes);
        }
#else
        if ((it->it_flags & ITEM_CHUNKED) == 0) {
            resp_add_iov(resp, ITEM_data(it), it->nbytes);
        } else {
            resp_add_chunked_iov(resp, it, it->nbytes);
        }
#endif

        /* limited_get() returned the item with a reference held for us */
        pthread_mutex_lock(&t->stats.mutex);
        if (should_touch) {
            t->stats.touch_cmds++;
            t->stats.slab_stats[ITEM_clsid(it)].touch_hits++;
        } else {
            t->stats.lru_hits[it->slabs_clsid]++;
            t->stats.get_cmds++;
        }
        pthread_mutex_unlock(&t->stats.mutex);
#ifdef EXTSTORE
        /* If ITEM_HDR, an io_wrap owns the reference. */
        if ((it->it_flags & ITEM_HDR) == 0) {
            resp->item = it;
        }
#else
        resp->item = it;
#endif
    } else {
        pthread_mutex_lock(&t->stats.mutex);
        if (should_touch) {
            t->stats.touch_cmds++;
            t->stats.touch_misses++;
        } else {
            t->stats.get_misses++;
            t->stats.get_cmds++;
        }
        pthread_mutex_unlock(&t->stats.mutex);
    }

    resp_add_iov(resp, "END\r\n", 5);
}
308 
// Handle an ascii storage command whose value has already been read into
// pr->vbuf (pr->vlen bytes).
//
// Token 2 holds the client flags and token 3 the exptime. When `handle_cas`
// is set, token 5 holds the compare CAS value. `comm` is the NREAD_* store
// mode passed through to store_item(). Exactly one reply line is written to
// `resp`.
static void process_update_cmd(LIBEVENT_THREAD *t, mcp_parser_t *pr, mc_resp *resp, int comm, bool handle_cas) {
    const char *key = &pr->request[pr->tokens[pr->keytoken]];
    size_t nkey = pr->klen;
    client_flags_t cflags;
    int32_t ttl = 0;
    uint64_t cas_in = 0;

    assert(resp != NULL);

    if (nkey > KEY_MAX_LENGTH) {
        pout_string(resp, "CLIENT_ERROR bad command line format");
        return;
    }

    // TODO (v2): these safe_str* functions operate on C _strings_, but these
    // tokens simply end with a space or carriage return/newline, so we either
    // need custom functions or validate harder that these calls won't bite us
    // later.
    // The exptime is only parsed once the flags parsed successfully.
    bool args_ok = safe_strtoflags(&pr->request[pr->tokens[2]], &cflags)
        && safe_strtol(&pr->request[pr->tokens[3]], &ttl);
    if (!args_ok) {
        pout_string(resp, "CLIENT_ERROR bad command line format");
        return;
    }

    rel_time_t exptime = realtime(EXPTIME_TO_POSITIVE_TIME(ttl));

    if (handle_cas && !safe_strtoull(&pr->request[pr->tokens[5]], &cas_in)) {
        pout_string(resp, "CLIENT_ERROR bad command line format");
        return;
    }

    // vlen is validated from the main parser.
    if (settings.detail_enabled) {
        stats_prefix_record_set(key, nkey);
    }

    item *nit = item_alloc(key, nkey, cflags, exptime, pr->vlen);
    if (nit == NULL) {
        const bool too_large = !item_size_ok(nkey, cflags, pr->vlen);
        pout_string(resp, too_large
                ? "SERVER_ERROR object too large for cache"
                : "SERVER_ERROR out of memory storing object");
        pthread_mutex_lock(&t->stats.mutex);
        if (too_large) {
            t->stats.store_too_large++;
        } else {
            t->stats.store_no_memory++;
        }
        pthread_mutex_unlock(&t->stats.mutex);
        // NOTE: store-failure logging (LOGGER_ITEM_STORE) is not wired up in
        // this path.

        /* Avoid stale data persisting in cache because we failed alloc.
         * Unacceptable for SET. Anywhere else too? */
        if (comm == NREAD_SET) {
            item *stale = item_get(key, nkey, t, DONT_UPDATE);
            if (stale != NULL) {
                item_unlink(stale);
                STORAGE_delete(t->storage, stale);
                item_remove(stale);
            }
        }
        return;
    }

    ITEM_set_cas(nit, cas_in);

    pthread_mutex_lock(&t->stats.mutex);
    t->stats.slab_stats[ITEM_clsid(nit)].set_cmds++;
    pthread_mutex_unlock(&t->stats.mutex);

    // complete_nread_proxy() does the data chunk check so all we need to do
    // is copy the data.
    if (_store_item_copy_from_buf(nit, pr->vbuf, nit->nbytes) != 0) {
        pout_string(resp, "SERVER_ERROR out of memory storing object");
        item_remove(nit);
        return;
    }

    const char *reply;
    switch (store_item(nit, comm, t, NULL, NULL,
                settings.use_cas ? get_cas_id() : 0, CAS_NO_STALE)) {
    case STORED:
        reply = "STORED";
        break;
    case EXISTS:
        reply = "EXISTS";
        break;
    case NOT_FOUND:
        reply = "NOT_FOUND";
        break;
    case NOT_STORED:
        reply = "NOT_STORED";
        break;
    default:
        reply = "SERVER_ERROR Unhandled storage type.";
        break;
    }
    pout_string(resp, reply);

    // The value was fully copied in, so our reference is no longer needed.
    item_remove(nit);
}
419 
// Handle ascii "incr"/"decr" (`incr` selects which). Token 2 holds the
// unsigned delta. Replies with the new value, NOT_FOUND, or an error line.
static void process_arithmetic_cmd(LIBEVENT_THREAD *t, mcp_parser_t *pr, mc_resp *resp, const bool incr) {
    const char *key = &pr->request[pr->tokens[pr->keytoken]];
    size_t nkey = pr->klen;
    char result[INCR_MAX_STORAGE_LEN];
    uint64_t amount;

    assert(t != NULL);

    if (nkey > KEY_MAX_LENGTH) {
        pout_string(resp, "CLIENT_ERROR bad command line format");
        return;
    }

    if (!safe_strtoull(&pr->request[pr->tokens[2]], &amount)) {
        pout_string(resp, "CLIENT_ERROR invalid numeric delta argument");
        return;
    }

    switch (add_delta(t, key, nkey, incr, amount, result, NULL)) {
    case DELTA_ITEM_NOT_FOUND:
        pthread_mutex_lock(&t->stats.mutex);
        if (incr) {
            t->stats.incr_misses++;
        } else {
            t->stats.decr_misses++;
        }
        pthread_mutex_unlock(&t->stats.mutex);

        pout_string(resp, "NOT_FOUND");
        break;
    case OK:
        // add_delta() left the new value in `result` as a C string.
        pout_string(resp, result);
        break;
    case NON_NUMERIC:
        pout_string(resp, "CLIENT_ERROR cannot increment or decrement non-numeric value");
        break;
    case EOM:
        pout_string(resp, "SERVER_ERROR out of memory");
        break;
    case DELTA_ITEM_CAS_MISMATCH:
        /* Should never get here: presumably because no CAS is supplied (NULL). */
        break;
    }
}
463 
// Handle ascii "delete <key>". The item is looked up and unlinked while its
// item lock (hash value `hv`) is held. Replies DELETED or NOT_FOUND.
static void process_delete_cmd(LIBEVENT_THREAD *t, mcp_parser_t *pr, mc_resp *resp) {
    const char *key = &pr->request[pr->tokens[pr->keytoken]];
    size_t nkey = pr->klen;
    uint32_t hv;

    assert(t != NULL);

    // NOTE: removed a compatibility bodge from a decade ago.
    // delete used to take a "delay" argument, which was removed, but some
    // ancient php clients always sent a 0 argument, which would then fail.
    // It's been long enough that I don't want to carry this forward into the
    // new parser.

    if (nkey > KEY_MAX_LENGTH) {
        pout_string(resp, "CLIENT_ERROR bad command line format");
        return;
    }

    item *victim = item_get_locked(key, nkey, t, DONT_UPDATE, &hv);
    if (victim == NULL) {
        pthread_mutex_lock(&t->stats.mutex);
        t->stats.delete_misses++;
        pthread_mutex_unlock(&t->stats.mutex);

        pout_string(resp, "NOT_FOUND");
        item_unlock(hv);
        return;
    }

    pthread_mutex_lock(&t->stats.mutex);
    t->stats.slab_stats[ITEM_clsid(victim)].delete_hits++;
    pthread_mutex_unlock(&t->stats.mutex);
    LOGGER_LOG(NULL, LOG_DELETIONS, LOGGER_DELETIONS, victim, LOG_TYPE_DELETE);

    // The item lock is already held, hence the do_* variants.
    do_item_unlink(victim, hv);
    STORAGE_delete(t->storage, victim);
    do_item_remove(victim); /* release our reference */
    pout_string(resp, "DELETED");
    item_unlock(hv);
}
504 
// Handle ascii "touch <key> <exptime>". Token 2 holds the new exptime.
// Replies TOUCHED or NOT_FOUND.
static void process_touch_cmd(LIBEVENT_THREAD *t, mcp_parser_t *pr, mc_resp *resp) {
    const char *key = &pr->request[pr->tokens[pr->keytoken]];
    size_t nkey = pr->klen;
    int32_t ttl = 0;

    assert(t != NULL);

    if (nkey > KEY_MAX_LENGTH) {
        pout_string(resp, "CLIENT_ERROR bad command line format");
        return;
    }

    if (!safe_strtol(&pr->request[pr->tokens[2]], &ttl)) {
        pout_string(resp, "CLIENT_ERROR invalid exptime argument");
        return;
    }

    rel_time_t exptime = realtime(EXPTIME_TO_POSITIVE_TIME(ttl));
    item *touched = item_touch(key, nkey, exptime, t);

    pthread_mutex_lock(&t->stats.mutex);
    t->stats.touch_cmds++;
    if (touched != NULL) {
        t->stats.slab_stats[ITEM_clsid(touched)].touch_hits++;
    } else {
        t->stats.touch_misses++;
    }
    pthread_mutex_unlock(&t->stats.mutex);

    if (touched == NULL) {
        pout_string(resp, "NOT_FOUND");
        return;
    }

    pout_string(resp, "TOUCHED");
    item_remove(touched);
}
543 
544 /*** meta command handlers ***/
545 
546 // FIXME: macro or public interface, this is copypasted.
_process_token_len(mcp_parser_t * pr,size_t token)547 static int _process_token_len(mcp_parser_t *pr, size_t token) {
548   const char *s = pr->request + pr->tokens[token];
549   const char *e = pr->request + pr->tokens[token+1];
550   // start of next token is after any space delimiters, so back those out.
551   while (*(e-1) == ' ') {
552       e--;
553   }
554   return e - s;
555 }
556 
// Helpers for building meta-protocol response lines into a char buffer.
// `p` must be a modifiable char* lvalue; each macro advances it past the
// bytes written. They are wrapped in do { } while (0) so they act as a
// single statement (safe inside un-braced if/else).

// Append a single space.
#define META_SPACE(p) do { \
    *(p) = ' '; \
    (p)++; \
} while (0)

// Append " <c>": a space followed by the flag character `c`.
#define META_CHAR(p, c) do { \
    (p)[0] = ' '; \
    (p)[1] = (c); \
    (p) += 2; \
} while (0)

// Append " k<key>" using `nkey` bytes of `key`.
// FIXME: binary key support (`bin` is currently ignored).
#define META_KEY(p, key, nkey, bin) do { \
    META_CHAR(p, 'k'); \
    memcpy((p), (key), (nkey)); \
    (p) += (nkey); \
} while (0)
574 
// Upper bound on pr->ntokens for a meta command; checked before flag parsing.
#define MFLAG_MAX_OPT_LENGTH 20
// Maximum length of an 'O' (opaque) flag token.
#define MFLAG_MAX_OPAQUE_LENGTH 32

// Options collected from a meta command's flag tokens by
// _meta_flag_preparse(). Quoted letters are the flag characters that set
// each field.
struct _meta_flags {
    unsigned int has_error :1; // flipped if we found an error during parsing.
    unsigned int no_update :1; // 'u': passed as !no_update to limited_get()
    unsigned int locked :1; // set by N/T/R/l/h: fetch the item with its lock held
    unsigned int vivify :1; // 'N': create the item on a miss
    unsigned int la :1; // 'l'
    unsigned int hit :1; // NOTE(review): not set by _meta_flag_preparse()
    unsigned int value :1; // 'v': return the item's value
    unsigned int set_stale :1; // 'I'
    unsigned int no_reply :1; // 'q': quiet mode
    unsigned int has_cas :1; // 'C' given; value in req_cas_id
    unsigned int has_cas_in :1; // 'E' given; value in cas_id_in
    unsigned int new_ttl :1; // 'T' given; value in exptime
    unsigned int key_binary:1; // 'b' (binary key); parsing currently disabled
    unsigned int remove_val:1; // 'x'
    char mode; // single character mode switch, common to ms/ma ('M')
    rel_time_t exptime; // 'T', converted with realtime()
    rel_time_t autoviv_exptime; // 'N', converted with realtime()
    rel_time_t recache_time; // 'R', converted with realtime()
    client_flags_t client_flags; // 'F'
    uint64_t req_cas_id; // 'C': CAS value to compare against
    uint64_t cas_id_in; // client supplied next-CAS ('E')
    uint64_t delta; // ma ('D')
    uint64_t initial; // ma ('J')
};
603 
// Validate and pre-parse the flag tokens of a meta command, beginning at
// token index `start`. Each flag is identified by the first character of its
// token; any numeric argument follows immediately (e.g. "T30").
//
// Fills `of` with the parsed options. Returns 0 on success. On failure,
// returns -1 and points *errstr at a CLIENT_ERROR string describing the
// problem.
static int _meta_flag_preparse(mcp_parser_t *pr, const size_t start,
        struct _meta_flags *of, char **errstr) {
    unsigned int i;
    //size_t ret;
    int32_t tmp_int;
    uint8_t seen[127] = {0};
    // Start just past the key token. Look at first character of each token.
    for (i = start; i < pr->ntokens; i++) {
        uint8_t o = (uint8_t)pr->request[pr->tokens[i]];
        // Bytes outside the tracked range can never be a valid flag; report
        // them as invalid rather than as a duplicate.
        if (o >= sizeof(seen)) {
            *errstr = "CLIENT_ERROR invalid flag";
            return -1;
        }
        // Reject repeated flags so we don't over-parse for return data.
        if (seen[o] != 0) {
            *errstr = "CLIENT_ERROR duplicate flag";
            return -1;
        }
        seen[o] = 1;
        switch (o) {
            // base64 decode the key in-place, as the binary should always be
            // shorter and the conversion code buffers bytes.
            // TODO: we need temporary space for the binary key decode since
            // request should be const.
            /*case 'b':
                ret = base64_decode((unsigned char *)tokens[KEY_TOKEN].value, tokens[KEY_TOKEN].length,
                            (unsigned char *)tokens[KEY_TOKEN].value, tokens[KEY_TOKEN].length);
                if (ret == 0) {
                    // Failed to decode
                    *errstr = "CLIENT_ERROR error decoding key";
                    of->has_error = 1;
                }
                tokens[KEY_TOKEN].length = ret;
                of->key_binary = 1;
                break;*/
            /* Negative exptimes can underflow and end up immortal. realtime() will
               immediately expire values that are greater than REALTIME_MAXDELTA, but less
               than process_started, so lets aim for that. */
            case 'N':
                of->locked = 1;
                of->vivify = 1;
                if (!safe_strtol(&pr->request[pr->tokens[i]+1], &tmp_int)) {
                    *errstr = "CLIENT_ERROR bad token in command line format";
                    of->has_error = 1;
                } else {
                    of->autoviv_exptime = realtime(EXPTIME_TO_POSITIVE_TIME(tmp_int));
                }
                break;
            case 'T':
                of->locked = 1;
                if (!safe_strtol(&pr->request[pr->tokens[i]+1], &tmp_int)) {
                    *errstr = "CLIENT_ERROR bad token in command line format";
                    of->has_error = 1;
                } else {
                    of->exptime = realtime(EXPTIME_TO_POSITIVE_TIME(tmp_int));
                    of->new_ttl = true;
                }
                break;
            case 'R':
                of->locked = 1;
                if (!safe_strtol(&pr->request[pr->tokens[i]+1], &tmp_int)) {
                    *errstr = "CLIENT_ERROR bad token in command line format";
                    of->has_error = 1;
                } else {
                    of->recache_time = realtime(EXPTIME_TO_POSITIVE_TIME(tmp_int));
                }
                break;
            case 'l':
                of->la = 1;
                of->locked = 1; // need locked to delay LRU bump
                break;
            case 'O':
            case 'P':
            case 'L':
                break;
            case 'k': // known but no special handling
            case 's':
            case 't':
            case 'c':
            case 'f':
                break;
            case 'v':
                of->value = 1;
                break;
            case 'h':
                of->locked = 1; // need locked to delay LRU bump
                break;
            case 'u':
                of->no_update = 1;
                break;
            case 'q':
                of->no_reply = 1;
                break;
            case 'x':
                of->remove_val = 1;
                break;
            // mset-related.
            case 'F':
                if (!safe_strtoflags(&pr->request[pr->tokens[i]+1], &of->client_flags)) {
                    // Report like every other numeric flag instead of leaving
                    // the caller's generic default message in place.
                    *errstr = "CLIENT_ERROR bad token in command line format";
                    of->has_error = true;
                }
                break;
            case 'C': // mset, mdelete, marithmetic
                if (!safe_strtoull(&pr->request[pr->tokens[i]+1], &of->req_cas_id)) {
                    *errstr = "CLIENT_ERROR bad token in command line format";
                    of->has_error = true;
                } else {
                    of->has_cas = true;
                }
                break;
            case 'E': // ms, md, ma
                if (!safe_strtoull(&pr->request[pr->tokens[i]+1], &of->cas_id_in)) {
                    *errstr = "CLIENT_ERROR bad token in command line format";
                    of->has_error = true;
                } else {
                    of->has_cas_in = true;
                }
                break;
            case 'M': // mset and marithmetic mode switch
                // FIXME: this used to error if the token isn't a single byte.
                // It probably should still?
                of->mode = pr->request[pr->tokens[i]+1];
                break;
            case 'J': // marithmetic initial value
                if (!safe_strtoull(&pr->request[pr->tokens[i]+1], &of->initial)) {
                    *errstr = "CLIENT_ERROR invalid numeric initial value";
                    of->has_error = 1;
                }
                break;
            case 'D': // marithmetic delta value
                if (!safe_strtoull(&pr->request[pr->tokens[i]+1], &of->delta)) {
                    *errstr = "CLIENT_ERROR invalid numeric delta value";
                    of->has_error = 1;
                }
                break;
            case 'I':
                of->set_stale = 1;
                break;
            default: // unknown flag, bail.
                *errstr = "CLIENT_ERROR invalid flag";
                return -1;
        }
    }

    return of->has_error ? -1 : 0;
}
746 
process_mget_cmd(LIBEVENT_THREAD * t,mcp_parser_t * pr,mc_resp * resp)747 static void process_mget_cmd(LIBEVENT_THREAD *t, mcp_parser_t *pr, mc_resp *resp) {
748     const char *key = &pr->request[pr->tokens[pr->keytoken]];
749     size_t nkey = pr->klen;
750     item *it;
751     unsigned int i = 0;
752     struct _meta_flags of = {0}; // option bitflags.
753     uint32_t hv; // cached hash value for unlocking an item.
754     bool failed = false;
755     bool item_created = false;
756     bool won_token = false;
757     bool ttl_set = false;
758     char *errstr = "CLIENT_ERROR bad command line format";
759     assert(t != NULL);
760     char *p = resp->wbuf;
761     int tlen = 0;
762 
763     // FIXME: still needed?
764     //WANT_TOKENS_MIN(ntokens, 3);
765 
766     if (nkey > KEY_MAX_LENGTH) {
767         pout_string(resp, "CLIENT_ERROR bad command line format");
768         return;
769     }
770 
771     if (pr->ntokens > MFLAG_MAX_OPT_LENGTH) {
772         // TODO: ensure the command tokenizer gives us at least this many
773         pout_errstring(resp, "CLIENT_ERROR options flags are too long");
774         return;
775     }
776 
777     // scrubs duplicated options and sets flags for how to load the item.
778     // we pass in the first token that should be a flag.
779     if (_meta_flag_preparse(pr, 2, &of, &errstr) != 0) {
780         pout_errstring(resp, errstr);
781         return;
782     }
783 
784     bool overflow = false;
785     if (!of.locked) {
786         it = limited_get(key, nkey, t, 0, false, !of.no_update, &overflow);
787     } else {
788         // If we had to lock the item, we're doing our own bump later.
789         it = limited_get_locked(key, nkey, t, DONT_UPDATE, &hv, &overflow);
790     }
791 
792     // Since we're a new protocol, we can actually inform users that refcount
793     // overflow is happening by straight up throwing an error.
794     // We definitely don't want to re-autovivify by accident.
795     if (overflow) {
796         assert(it == NULL);
797         pout_errstring(resp, "SERVER_ERROR refcount overflow during fetch");
798         return;
799     }
800 
801     if (it == NULL && of.vivify) {
802         // Fill in the exptime during parsing later.
803         it = item_alloc(key, nkey, 0, realtime(0), 2);
804         // We don't actually need any of do_store_item's logic:
805         // - already fetched and missed an existing item.
806         // - lock is still held.
807         // - not append/prepend/replace
808         // - not testing CAS
809         if (it != NULL) {
810             // I look forward to the day I get rid of this :)
811             memcpy(ITEM_data(it), "\r\n", 2);
812             // NOTE: This initializes the CAS value.
813             do_item_link(it, hv, of.has_cas_in ? of.cas_id_in : get_cas_id());
814             item_created = true;
815         }
816     }
817 
818     // don't have to check result of add_iov() since the iov size defaults are
819     // enough.
820     if (it) {
821         if (of.value) {
822             memcpy(p, "VA ", 3);
823             p = itoa_u32(it->nbytes-2, p+3);
824         } else {
825             memcpy(p, "HD", 2);
826             p += 2;
827         }
828 
829         for (i = pr->keytoken+1; i < pr->ntokens; i++) {
830             switch (pr->request[pr->tokens[i]]) {
831                 case 'T':
832                     ttl_set = true;
833                     it->exptime = of.exptime;
834                     break;
835                 case 'N':
836                     if (item_created) {
837                         it->exptime = of.autoviv_exptime;
838                         won_token = true;
839                     }
840                     break;
841                 case 'R':
842                     // If we haven't autovivified and supplied token is less
843                     // than current TTL, mark a win.
844                     if ((it->it_flags & ITEM_TOKEN_SENT) == 0
845                             && !item_created
846                             && it->exptime != 0
847                             && it->exptime < of.recache_time) {
848                         won_token = true;
849                     }
850                     break;
851                 case 's':
852                     META_CHAR(p, 's');
853                     p = itoa_u32(it->nbytes-2, p);
854                     break;
855                 case 't':
856                     // TTL remaining as of this request.
857                     // needs to be relative because server clocks may not be in sync.
858                     META_CHAR(p, 't');
859                     if (it->exptime == 0) {
860                         *p = '-';
861                         *(p+1) = '1';
862                         p += 2;
863                     } else {
864                         p = itoa_u32(it->exptime - current_time, p);
865                     }
866                     break;
867                 case 'c':
868                     META_CHAR(p, 'c');
869                     p = itoa_u64(ITEM_get_cas(it), p);
870                     break;
871                 case 'f':
872                     META_CHAR(p, 'f');
873                     if (FLAGS_SIZE(it) == 0) {
874                         *p = '0';
875                         p++;
876                     } else {
877                         p = itoa_u64(*((client_flags_t *) ITEM_suffix(it)), p);
878                     }
879                     break;
880                 case 'l':
881                     META_CHAR(p, 'l');
882                     p = itoa_u32(current_time - it->time, p);
883                     break;
884                 case 'h':
885                     META_CHAR(p, 'h');
886                     if (it->it_flags & ITEM_FETCHED) {
887                         *p = '1';
888                     } else {
889                         *p = '0';
890                     }
891                     p++;
892                     break;
893                 case 'O':
894                     tlen = _process_token_len(pr, i);
895                     if (tlen > MFLAG_MAX_OPAQUE_LENGTH) {
896                         errstr = "CLIENT_ERROR opaque token too long";
897                         goto error;
898                     }
899                     META_SPACE(p);
900                     memcpy(p, &pr->request[pr->tokens[i]], tlen);
901                     p += tlen;
902                     break;
903                 case 'k':
904                     META_KEY(p, ITEM_key(it), it->nkey, (it->it_flags & ITEM_KEY_BINARY));
905                     break;
906             }
907         }
908 
909         // Has this item already sent a token?
910         // Important to do this here so we don't send W with Z.
911         // Isn't critical, but easier for client authors to understand.
912         if (it->it_flags & ITEM_TOKEN_SENT) {
913             META_CHAR(p, 'Z');
914         }
915         if (it->it_flags & ITEM_STALE) {
916             META_CHAR(p, 'X');
917             // FIXME: think hard about this. is this a default, or a flag?
918             if ((it->it_flags & ITEM_TOKEN_SENT) == 0) {
919                 // If we're stale but no token already sent, now send one.
920                 won_token = true;
921             }
922         }
923 
924         if (won_token) {
925             // Mark a win into the flag buffer.
926             META_CHAR(p, 'W');
927             it->it_flags |= ITEM_TOKEN_SENT;
928         }
929 
930         *p = '\r';
931         *(p+1) = '\n';
932         *(p+2) = '\0';
933         p += 2;
934         // finally, chain in the buffer.
935         resp_add_iov(resp, resp->wbuf, p - resp->wbuf);
936 
937         if (of.value) {
938 #ifdef EXTSTORE
939             if (it->it_flags & ITEM_HDR) {
940                 if (proxy_storage_get(t, it, resp, PROXY_STORAGE_MG) != 0) {
941                     pthread_mutex_lock(&t->stats.mutex);
942                     t->stats.get_oom_extstore++;
943                     pthread_mutex_unlock(&t->stats.mutex);
944 
945                     failed = true;
946                 }
947             } else if ((it->it_flags & ITEM_CHUNKED) == 0) {
948                 resp_add_iov(resp, ITEM_data(it), it->nbytes);
949             } else {
950                 resp_add_chunked_iov(resp, it, it->nbytes);
951             }
952 #else
953             if ((it->it_flags & ITEM_CHUNKED) == 0) {
954                 resp_add_iov(resp, ITEM_data(it), it->nbytes);
955             } else {
956                 resp_add_chunked_iov(resp, it, it->nbytes);
957             }
958 #endif
959         }
960 
961         // need to hold the ref at least because of the key above.
962 #ifdef EXTSTORE
963         if (!failed) {
964             if ((it->it_flags & ITEM_HDR) != 0 && of.value) {
965                 // Only have extstore clean if header and returning value.
966                 resp->item = NULL;
967             } else {
968                 resp->item = it;
969             }
970         } else {
971             // Failed to set up extstore fetch.
972             if (of.locked) {
973                 do_item_remove(it);
974             } else {
975                 item_remove(it);
976             }
977         }
978 #else
979         resp->item = it;
980 #endif
981     } else {
982         failed = true;
983     }
984 
985     if (of.locked) {
986         // Delayed bump so we could get fetched/last access time pre-update.
987         if (!of.no_update && it != NULL) {
988             do_item_bump(t, it, hv);
989         }
990         item_unlock(hv);
991     }
992 
993     // we count this command as a normal one if we've gotten this far.
994     // TODO: for autovivify case, miss never happens. Is this okay?
995     if (!failed) {
996         pthread_mutex_lock(&t->stats.mutex);
997         if (ttl_set) {
998             t->stats.touch_cmds++;
999             t->stats.slab_stats[ITEM_clsid(it)].touch_hits++;
1000         } else {
1001             t->stats.lru_hits[it->slabs_clsid]++;
1002             t->stats.get_cmds++;
1003         }
1004         pthread_mutex_unlock(&t->stats.mutex);
1005     } else {
1006         pthread_mutex_lock(&t->stats.mutex);
1007         if (ttl_set) {
1008             t->stats.touch_cmds++;
1009             t->stats.touch_misses++;
1010         } else {
1011             t->stats.get_misses++;
1012             t->stats.get_cmds++;
1013         }
1014         pthread_mutex_unlock(&t->stats.mutex);
1015 
1016         // This gets elided in noreply mode.
1017         if (of.no_reply)
1018             resp->skip = true;
1019         memcpy(p, "EN", 2);
1020         p += 2;
1021         for (i = pr->keytoken+1; i < pr->ntokens; i++) {
1022             switch (pr->request[pr->tokens[i]]) {
1023                 // TODO: macro perhaps?
1024                 case 'O':
1025                     tlen = _process_token_len(pr, i);
1026                     if (tlen > MFLAG_MAX_OPAQUE_LENGTH) {
1027                         errstr = "CLIENT_ERROR opaque token too long";
1028                         goto error;
1029                     }
1030                     META_SPACE(p);
1031                     memcpy(p, &pr->request[pr->tokens[i]], tlen);
1032                     p += tlen;
1033                     break;
1034                 case 'k':
1035                     META_KEY(p, key, nkey, of.key_binary);
1036                     break;
1037             }
1038         }
1039         resp->wbytes = p - resp->wbuf;
1040         memcpy(resp->wbuf + resp->wbytes, "\r\n", 2);
1041         resp->wbytes += 2;
1042         resp_add_iov(resp, resp->wbuf, resp->wbytes);
1043     }
1044     return;
1045 error:
1046     if (it) {
1047         do_item_remove(it);
1048         if (of.locked) {
1049             item_unlock(hv);
1050         }
1051     }
1052     pout_errstring(resp, errstr);
1053 }
1054 
/*
 * Handle a meta set ("ms") request against the local cache.
 *
 * The value bytes are expected to already be in pr->vbuf (per the note
 * below, complete_nread_proxy() performs the data chunk check). The status
 * line plus any requested return flags (O, k, c, s) are written into
 * resp->wbuf and chained onto resp.
 *
 * t:    worker thread; used for stats, item allocation and storage.
 * pr:   parsed request: tokens, key, value length and value buffer.
 * resp: response object to fill.
 */
static void process_mset_cmd(LIBEVENT_THREAD *t, mcp_parser_t *pr, mc_resp *resp) {
    const char *key = &pr->request[pr->tokens[pr->keytoken]];
    size_t nkey = pr->klen;

    // Stays NULL until item_alloc() succeeds, so the error path can tell
    // whether it holds a reference that must be released.
    item *it = NULL;
    int i;
    short comm = NREAD_SET;
    struct _meta_flags of = {0}; // option bitflags.
    char *errstr = "CLIENT_ERROR bad command line format";
    uint32_t hv; // cached hash value.
    int vlen = pr->vlen; // value from data line.
    assert(t != NULL);
    char *p = resp->wbuf;
    int tlen = 0;

    if (nkey > KEY_MAX_LENGTH || pr->ntokens < 3) {
        pout_string(resp, "CLIENT_ERROR bad command line format");
        return;
    }

    if (pr->ntokens > MFLAG_MAX_OPT_LENGTH) {
        // TODO: ensure the command tokenizer gives us at least this many
        pout_errstring(resp, "CLIENT_ERROR options flags are too long");
        return;
    }

    // We need to at least try to get the size to properly slurp bad bytes
    // after an error.
    // we pass in the first token that should be a flag.
    if (_meta_flag_preparse(pr, 3, &of, &errstr) != 0) {
        goto error;
    }

    rel_time_t exptime = of.exptime;
    // "mode switch" to alternative commands
    switch (of.mode) {
        case 0:
            break; // no mode supplied.
        case 'E': // Add...
            comm = NREAD_ADD;
            break;
        case 'A': // Append.
            if (of.vivify) {
                comm = NREAD_APPENDVIV;
                exptime = of.autoviv_exptime;
            } else {
                comm = NREAD_APPEND;
            }
            break;
        case 'P': // Prepend.
            if (of.vivify) {
                comm = NREAD_PREPENDVIV;
                exptime = of.autoviv_exptime;
            } else {
                comm = NREAD_PREPEND;
            }
            break;
        case 'R': // Replace.
            comm = NREAD_REPLACE;
            break;
        case 'S': // Set. Default.
            comm = NREAD_SET;
            break;
        default:
            errstr = "CLIENT_ERROR invalid mode for ms M token";
            goto error;
    }

    // The item storage function doesn't exactly map to mset.
    // If a CAS value is supplied, upgrade default SET mode to CAS mode.
    // Also allows REPLACE to work, as REPLACE + CAS works the same as CAS.
    // add-with-cas works the same as add; but could only LRU bump if match..
    // APPEND/PREPEND allow a simplified CAS check.
    if (of.has_cas && (comm == NREAD_SET || comm == NREAD_REPLACE)) {
        comm = NREAD_CAS;
    }

    it = item_alloc(key, nkey, of.client_flags, exptime, vlen);

    if (it == NULL) {
        if (! item_size_ok(nkey, of.client_flags, vlen)) {
            errstr = "SERVER_ERROR object too large for cache";
            pthread_mutex_lock(&t->stats.mutex);
            t->stats.store_too_large++;
            pthread_mutex_unlock(&t->stats.mutex);
        } else {
            errstr = "SERVER_ERROR out of memory storing object";
            pthread_mutex_lock(&t->stats.mutex);
            t->stats.store_no_memory++;
            pthread_mutex_unlock(&t->stats.mutex);
        }

        /* Avoid stale data persisting in cache because we failed alloc. */
        // NOTE: only if SET mode?
        // Use a separate local so `it` stays NULL and the error path does
        // not release this (already released) item a second time.
        item *old_it = item_get_locked(key, nkey, t, DONT_UPDATE, &hv);
        if (old_it) {
            do_item_unlink(old_it, hv);
            STORAGE_delete(t->storage, old_it);
            do_item_remove(old_it);
        }
        item_unlock(hv);

        goto error;
    }
    ITEM_set_cas(it, of.req_cas_id);

    // data should already be read into the request.

    // Prevent printing back the key in meta commands as garbage.
    if (of.key_binary) {
        it->it_flags |= ITEM_KEY_BINARY;
    }

    bool set_stale = CAS_NO_STALE;
    if (of.set_stale && comm == NREAD_CAS) {
        set_stale = CAS_ALLOW_STALE;
    }
    resp->wbytes = p - resp->wbuf;

    pthread_mutex_lock(&t->stats.mutex);
    t->stats.slab_stats[ITEM_clsid(it)].set_cmds++;
    pthread_mutex_unlock(&t->stats.mutex);

    // complete_nread_proxy() does the data chunk check so all we need to do
    // is copy the data.
    if (_store_item_copy_from_buf(it, pr->vbuf, it->nbytes) != 0) {
        pout_string(resp, "SERVER_ERROR out of memory storing object");
        item_remove(it);
        return;
    }

    uint64_t cas = 0;
    int nbytes = 0;
    int ret = store_item(it, comm, t, &nbytes, &cas, of.has_cas_in ? of.cas_id_in : get_cas_id(), set_stale);
    switch (ret) {
        case STORED:
            memcpy(p, "HD", 2);
            // Only place noreply is used for meta cmds is a nominal response.
            if (of.no_reply) {
                resp->skip = true;
            }
            break;
        case EXISTS:
            memcpy(p, "EX", 2);
            break;
        case NOT_FOUND:
            memcpy(p, "NF", 2);
            break;
        case NOT_STORED:
            memcpy(p, "NS", 2);
            break;
        default:
            pout_errstring(resp, "SERVER_ERROR Unhandled storage type.");
            // Drop the reference obtained from item_alloc().
            item_remove(it);
            return;
    }
    p += 2;

    for (i = pr->keytoken+1; i < pr->ntokens; i++) {
        switch (pr->request[pr->tokens[i]]) {
            case 'O':
                tlen = _process_token_len(pr, i);
                if (tlen > MFLAG_MAX_OPAQUE_LENGTH) {
                    errstr = "CLIENT_ERROR opaque token too long";
                    goto error; // error path releases `it`.
                }
                META_SPACE(p);
                memcpy(p, &pr->request[pr->tokens[i]], tlen);
                p += tlen;
                break;
            case 'k':
                META_KEY(p, ITEM_key(it), it->nkey, (it->it_flags & ITEM_KEY_BINARY));
                break;
            case 'c':
                META_CHAR(p, 'c');
                p = itoa_u64(cas, p);
                break;
            case 's':
                // Get final item size, ie from append/prepend
                META_CHAR(p, 's');
                // If the size changed during append/prepend
                if (nbytes != 0) {
                    p = itoa_u32(nbytes-2, p);
                } else {
                    p = itoa_u32(it->nbytes-2, p);
                }
                break;
        }
    }

    // We don't need to free pr->vbuf as that is owned by *rq
    // either way, there's no c->item or resp->item reference right now.

    memcpy(p, "\r\n", 2);
    p += 2;
    // we're offset into wbuf, but good convention to track wbytes.
    resp->wbytes = p - resp->wbuf;
    resp_add_iov(resp, resp->wbuf, resp->wbytes);

    item_remove(it);

    return;
error:
    // Reached either before an item was allocated (it == NULL), or from the
    // opaque-length check after store_item(), where we still hold the
    // reference from item_alloc() and must drop it.
    if (it != NULL) {
        item_remove(it);
    }
    pout_errstring(resp, errstr);
}
1264 
/*
 * Handle a meta delete ("md") request against the local cache.
 *
 * Requested O/k return flags are written into resp->wbuf first, behind two
 * reserved bytes. The two-byte status is written into those reserved bytes
 * once the outcome is known:
 *   NF - key not found
 *   EX - a CAS value was supplied and did not match
 *   NS - a tombstone (remove_val) could not be stored
 *   HD - item deleted, replaced by a tombstone, and/or marked stale
 * Allocation failure of the tombstone is reported as a SERVER_ERROR.
 */
static void process_mdelete_cmd(LIBEVENT_THREAD *t, mcp_parser_t *pr, mc_resp *resp) {
    const char *key = &pr->request[pr->tokens[pr->keytoken]];
    size_t nkey = pr->klen;
    struct _meta_flags of = {0}; // option bitflags.
    char *errstr = "CLIENT_ERROR bad command line format";
    item *it = NULL;
    uint32_t hv;
    assert(t != NULL);

    // Skip past two bytes; they receive the status code at the end.
    char *out = resp->wbuf + 2;

    if (nkey > KEY_MAX_LENGTH) {
        pout_string(resp, "CLIENT_ERROR bad command line format");
        return;
    }

    if (pr->ntokens > MFLAG_MAX_OPT_LENGTH) {
        // TODO: ensure the command tokenizer gives us at least this many
        pout_errstring(resp, "CLIENT_ERROR options flags are too long");
        return;
    }

    // Preparse validates flags; its errstr is intentionally not surfaced
    // here (FIXME: not using the preparse errstr?).
    if (_meta_flag_preparse(pr, 2, &of, &errstr) != 0) {
        pout_errstring(resp, "CLIENT_ERROR invalid or duplicate flag");
        return;
    }

    // Echo back opaque and key flags ahead of looking up the item.
    for (int tok = pr->keytoken + 1; tok < pr->ntokens; tok++) {
        char flag = pr->request[pr->tokens[tok]];
        if (flag == 'O') {
            int tlen = _process_token_len(pr, tok);
            if (tlen > MFLAG_MAX_OPAQUE_LENGTH) {
                errstr = "CLIENT_ERROR opaque token too long";
                goto error;
            }
            META_SPACE(out);
            memcpy(out, &pr->request[pr->tokens[tok]], tlen);
            out += tlen;
        } else if (flag == 'k') {
            META_KEY(out, key, nkey, of.key_binary);
        }
    }

    // The hash bucket is left locked whether or not the key was found.
    it = item_get_locked(key, nkey, t, DONT_UPDATE, &hv);

    if (it == NULL) {
        pthread_mutex_lock(&t->stats.mutex);
        t->stats.delete_misses++;
        pthread_mutex_unlock(&t->stats.mutex);

        memcpy(resp->wbuf, "NF", 2);
        goto cleanup;
    }

    // A supplied CAS value must match before anything is deleted or marked.
    if (of.has_cas && ITEM_get_cas(it) != of.req_cas_id) {
        pthread_mutex_lock(&t->stats.mutex);
        t->stats.delete_misses++;
        pthread_mutex_unlock(&t->stats.mutex);

        memcpy(resp->wbuf, "EX", 2);
        goto cleanup;
    }

    if (of.remove_val) {
        // Swap the item for an empty tombstone stored under the same key.
        item *tomb = item_alloc(key, nkey, of.client_flags, of.exptime, 2);
        if (tomb == NULL) {
            errstr = "SERVER_ERROR out of memory";
            goto error;
        }
        memcpy(ITEM_data(tomb), "\r\n", 2);
        if (!do_store_item(tomb, NREAD_SET, t, hv, NULL, NULL,
                    of.has_cas_in ? of.cas_id_in : ITEM_get_cas(it), CAS_NO_STALE)) {
            do_item_remove(tomb);
            memcpy(resp->wbuf, "NS", 2);
            goto cleanup;
        }
        // Continue with (and later release) the tombstone instead.
        do_item_remove(it);
        it = tomb;
    }

    if (of.set_stale) {
        // Keep the item rather than deleting it: optionally apply the new
        // TTL, mark it stale, clear TOKEN_SENT so the next client can win,
        // and bump the CAS.
        if (of.new_ttl) {
            it->exptime = of.exptime;
        }
        it->it_flags |= ITEM_STALE;
        it->it_flags &= ~ITEM_TOKEN_SENT;
        ITEM_set_cas(it, of.has_cas_in ? of.cas_id_in : get_cas_id());
    } else {
        pthread_mutex_lock(&t->stats.mutex);
        t->stats.slab_stats[ITEM_clsid(it)].delete_hits++;
        pthread_mutex_unlock(&t->stats.mutex);
        LOGGER_LOG(NULL, LOG_DELETIONS, LOGGER_DELETIONS, it, LOG_TYPE_META_DELETE);

        // With remove_val the tombstone was stored in place of the old item,
        // so there is nothing left to unlink.
        if (!of.remove_val) {
            do_item_unlink(it, hv);
            STORAGE_delete(t->storage, it);
        }
    }

    if (of.no_reply) {
        resp->skip = true;
    }
    memcpy(resp->wbuf, "HD", 2);

cleanup:
    if (it != NULL) {
        do_item_remove(it);
    }
    item_unlock(hv);
    resp->wbytes = out - resp->wbuf;
    memcpy(resp->wbuf + resp->wbytes, "\r\n", 2);
    resp->wbytes += 2;
    resp_add_iov(resp, resp->wbuf, resp->wbytes);
    return;

error:
    // Only once an item was fetched do we hold both a reference and the lock.
    if (it != NULL) {
        do_item_remove(it);
        item_unlock(hv);
    }
    pout_errstring(resp, errstr);
}
1409 
/*
 * Handle a meta arithmetic ("ma") request against the local cache.
 *
 * Increments (default) or decrements the numeric value at the key by
 * of.delta, which defaults to 1. With the vivify flag, a miss creates the
 * item with the value of.initial. The item lock for the key's hash is taken
 * before do_add_delta() and held while the response is built.
 *
 * Status codes written to resp->wbuf:
 *   HD / VA - success (VA when the value was requested)
 *   NF      - miss without vivify
 *   EX      - CAS mismatch
 *   NS      - a vivified item could not be stored
 * Other failures are reported as CLIENT_ERROR / SERVER_ERROR strings.
 */
static void process_marithmetic_cmd(LIBEVENT_THREAD *t, mcp_parser_t *pr, mc_resp *resp) {
    const char *key = &pr->request[pr->tokens[pr->keytoken]];
    size_t nkey = pr->klen;
    int i;
    struct _meta_flags of = {0}; // option bitflags.
    char *errstr = "CLIENT_ERROR bad command line format";
    assert(t != NULL);
    // no reservation (like del/set) since we post-process the status line.
    char *p = resp->wbuf;
    int tlen = 0;

    // If no argument supplied, incr or decr by one.
    of.delta = 1;
    of.initial = 0; // redundant, for clarity.
    bool incr = true; // default mode is to increment.
    bool locked = false;
    uint32_t hv = 0;
    item *it = NULL; // item returned by do_add_delta, or the vivified item.

    if (nkey > KEY_MAX_LENGTH) {
        pout_string(resp, "CLIENT_ERROR bad command line format");
        return;
    }

    if (pr->ntokens > MFLAG_MAX_OPT_LENGTH) {
        // TODO: ensure the command tokenizer gives us at least this many
        pout_errstring(resp, "CLIENT_ERROR options flags are too long");
        return;
    }

    // scrubs duplicated options and sets flags for how to load the item.
    // we pass in the first token that should be a flag.
    if (_meta_flag_preparse(pr, 2, &of, &errstr) != 0) {
        pout_errstring(resp, "CLIENT_ERROR invalid or duplicate flag");
        return;
    }

    // "mode switch" to alternative commands
    switch (of.mode) {
        case 0: // no switch supplied.
            break;
        case 'I': // Incr (default)
        case '+':
            incr = true;
            break;
        case 'D': // Decr.
        case '-':
            incr = false;
            break;
        default:
            errstr = "CLIENT_ERROR invalid mode for ma M token";
            goto error;
    }

    // take hash value and manually lock item... hold lock during store phase
    // on miss and avoid recalculating the hash multiple times.
    hv = hash(key, nkey);
    item_lock(hv);
    locked = true;
    char tmpbuf[INCR_MAX_STORAGE_LEN];

    // return a referenced item if it exists, so we can modify it here, rather
    // than adding even more parameters to do_add_delta.
    bool item_created = false;
    uint64_t cas = 0;
    switch (do_add_delta(t, key, nkey, incr, of.delta, tmpbuf, &of.req_cas_id, hv, &it)) {
    case OK:
        // Quiet mode suppresses the nominal response, as ms and md do.
        if (of.no_reply) {
            resp->skip = true;
        }
        // *it was filled, set the status below.
        if (of.has_cas_in) {
            // override the CAS. slightly inefficient but fixing that can wait
            // until the next time do_add_delta is changed.
            ITEM_set_cas(it, of.cas_id_in);
        }
        cas = ITEM_get_cas(it);
        break;
    case NON_NUMERIC:
        errstr = "CLIENT_ERROR cannot increment or decrement non-numeric value";
        goto error;
    case EOM:
        errstr = "SERVER_ERROR out of memory";
        goto error;
    case DELTA_ITEM_NOT_FOUND:
        if (of.vivify) {
            itoa_u64(of.initial, tmpbuf);
            int vlen = strlen(tmpbuf);

            it = item_alloc(key, nkey, 0, 0, vlen+2);
            if (it == NULL) {
                errstr = "SERVER_ERROR Out of memory allocating new item";
                goto error;
            }
            memcpy(ITEM_data(it), tmpbuf, vlen);
            memcpy(ITEM_data(it) + vlen, "\r\n", 2);
            if (do_store_item(it, NREAD_ADD, t, hv, NULL, &cas,
                        of.has_cas_in ? of.cas_id_in : get_cas_id(), CAS_NO_STALE)) {
                item_created = true;
            } else {
                // Not sure how we can get here if we're holding the lock.
                // Release the unstored item and emit NS at the cursor; with
                // `it` cleared the success (HD/VA) branch below is skipped
                // and only the O/k flags are appended.
                do_item_remove(it);
                it = NULL;
                memcpy(p, "NS", 2);
                p += 2;
            }
        } else {
            pthread_mutex_lock(&t->stats.mutex);
            if (incr) {
                t->stats.incr_misses++;
            } else {
                t->stats.decr_misses++;
            }
            pthread_mutex_unlock(&t->stats.mutex);
            // won't have a valid it here.
            memcpy(p, "NF", 2);
            p += 2;
        }
        break;
    case DELTA_ITEM_CAS_MISMATCH:
        // also returns without a valid it.
        memcpy(p, "EX", 2);
        p += 2;
        break;
    }

    // final loop
    // allows building the response with information after vivifying from a
    // miss, or returning a new CAS value after add_delta().
    if (it) {
        size_t vlen = strlen(tmpbuf);
        if (of.value) {
            memcpy(p, "VA ", 3);
            p = itoa_u32(vlen, p+3);
        } else {
            memcpy(p, "HD", 2);
            p += 2;
        }

        for (i = pr->keytoken+1; i < pr->ntokens; i++) {
            switch (pr->request[pr->tokens[i]]) {
                case 'c':
                    META_CHAR(p, 'c');
                    p = itoa_u64(cas, p);
                    break;
                case 't':
                    META_CHAR(p, 't');
                    if (it->exptime == 0) {
                        *p = '-';
                        *(p+1) = '1';
                        p += 2;
                    } else {
                        p = itoa_u32(it->exptime - current_time, p);
                    }
                    break;
                case 'T':
                    it->exptime = of.exptime;
                    break;
                case 'N':
                    if (item_created) {
                        it->exptime = of.autoviv_exptime;
                    }
                    break;
                case 'O':
                    tlen = _process_token_len(pr, i);
                    if (tlen > MFLAG_MAX_OPAQUE_LENGTH) {
                        errstr = "CLIENT_ERROR opaque token too long";
                        goto error;
                    }
                    META_SPACE(p);
                    memcpy(p, &pr->request[pr->tokens[i]], tlen);
                    p += tlen;
                    break;
                case 'k':
                    META_KEY(p, key, nkey, of.key_binary);
                    break;
            }
        }

        if (of.value) {
            *p = '\r';
            *(p+1) = '\n';
            p += 2;
            memcpy(p, tmpbuf, vlen);
            p += vlen;
        }

        do_item_remove(it);
    } else {
        // No item to handle. still need to return opaque/key tokens
        for (i = pr->keytoken+1; i < pr->ntokens; i++) {
            switch (pr->request[pr->tokens[i]]) {
                case 'O':
                    tlen = _process_token_len(pr, i);
                    if (tlen > MFLAG_MAX_OPAQUE_LENGTH) {
                        errstr = "CLIENT_ERROR opaque token too long";
                        goto error;
                    }
                    META_SPACE(p);
                    memcpy(p, &pr->request[pr->tokens[i]], tlen);
                    p += tlen;
                    break;
                case 'k':
                    META_KEY(p, key, nkey, of.key_binary);
                    break;
            }
        }
    }

    item_unlock(hv);

    resp->wbytes = p - resp->wbuf;
    memcpy(resp->wbuf + resp->wbytes, "\r\n", 2);
    resp->wbytes += 2;
    resp_add_iov(resp, resp->wbuf, resp->wbytes);
    return;
error:
    // Release any item reference and the bucket lock if they were taken.
    if (it != NULL)
        do_item_remove(it);
    if (locked)
        item_unlock(hv);
    pout_errstring(resp, errstr);
}
1636 
1637 /*** Lua and internal handler ***/
1638 
/*
 * Lua entry point for running a request against the local cache.
 *
 * Expects an mcp.request at stack index 1 (luaL_checkudata raises a Lua
 * error otherwise). Pushes a zeroed mcp.response userdata and the
 * MCP_YIELD_INTERNAL marker, then yields those two values. Presumably the
 * resuming side passes them to mcplib_internal_run(), which reads argument 2
 * as an mcp.response; confirm against the yield handler.
 */
int mcplib_internal(lua_State *L) {
    luaL_checkudata(L, 1, "mcp.request");

    mcp_resp_t *res = lua_newuserdatauv(L, sizeof(*res), 0);
    memset(res, 0, sizeof(*res));
    luaL_getmetatable(L, "mcp.response");
    lua_setmetatable(L, -2); // attach metatable to the new userdata

    lua_pushinteger(L, MCP_YIELD_INTERNAL);
    return lua_yield(L, 2); // yields: response object, yield-type marker
}
1649 
1650 // we're pretending to be p_c_ascii(), but reusing our already tokenized code.
1651 // the text parser should eventually move to the new tokenizer and we can
1652 // merge all of this code together.
mcplib_internal_run(mcp_rcontext_t * rctx)1653 int mcplib_internal_run(mcp_rcontext_t *rctx) {
1654     lua_State *L = rctx->Lc;
1655     mcp_request_t *rq = luaL_checkudata(L, 1, "mcp.request");
1656     mcp_resp_t *r = luaL_checkudata(L, 2, "mcp.response");
1657     mc_resp *resp = resp_start_unlinked(rctx->c);
1658     LIBEVENT_THREAD *t = rctx->c->thread;
1659     mcp_parser_t *pr = &rq->pr;
1660     if (resp == NULL) {
1661         return -1;
1662     }
1663 
1664     // TODO: meta no-op isn't handled here. haven't decided how yet.
1665     switch (rq->pr.command) {
1666         case CMD_MG:
1667             process_mget_cmd(t, pr, resp);
1668             break;
1669         case CMD_MS:
1670             process_mset_cmd(t, pr, resp);
1671             break;
1672         case CMD_MD:
1673             process_mdelete_cmd(t, pr, resp);
1674             break;
1675         case CMD_MA:
1676             process_marithmetic_cmd(t, pr, resp);
1677             break;
1678         case CMD_GET:
1679             process_get_cmd(t, pr, resp, _NO_CAS, _NO_TOUCH);
1680             break;
1681         case CMD_GETS:
1682             process_get_cmd(t, pr, resp, _DO_CAS, _NO_TOUCH);
1683             break;
1684         case CMD_GAT:
1685             process_get_cmd(t, pr, resp, _NO_CAS, _DO_TOUCH);
1686             break;
1687         case CMD_GATS:
1688             process_get_cmd(t, pr, resp, _DO_CAS, _DO_TOUCH);
1689             break;
1690         case CMD_SET:
1691             process_update_cmd(t, pr, resp, NREAD_SET, _NO_CAS);
1692             break;
1693         case CMD_ADD:
1694             process_update_cmd(t, pr, resp, NREAD_ADD, _NO_CAS);
1695             break;
1696         case CMD_APPEND:
1697             process_update_cmd(t, pr, resp, NREAD_APPEND, _NO_CAS);
1698             break;
1699         case CMD_PREPEND:
1700             process_update_cmd(t, pr, resp, NREAD_PREPEND, _NO_CAS);
1701             break;
1702         case CMD_CAS:
1703             process_update_cmd(t, pr, resp, NREAD_CAS, _DO_CAS);
1704             break;
1705         case CMD_REPLACE:
1706             process_update_cmd(t, pr, resp, NREAD_REPLACE, _DO_CAS);
1707             break;
1708         case CMD_INCR:
1709             process_arithmetic_cmd(t, pr, resp, true);
1710             break;
1711         case CMD_DECR:
1712             process_arithmetic_cmd(t, pr, resp, false);
1713             break;
1714         case CMD_DELETE:
1715             process_delete_cmd(t, pr, resp);
1716             break;
1717         case CMD_TOUCH:
1718             process_touch_cmd(t, pr, resp);
1719             break;
1720         default:
1721             resp_free(t, resp);
1722             return -1;
1723     }
1724 
1725     // TODO: I'd like to shortcut the parsing here, but if we want the resp
1726     // object to have full support (ie: resp:line()/etc) it might be necessary
1727     // to still do a full parsing. It might be possible to
1728     // wrap the main commands with something that decorates r->resp directly
1729     // instead of going through a parser to save some CPU.
1730     // Either way this is a lot less code.
1731     mcmc_parse_buf(resp->iov[0].iov_base, resp->iov[0].iov_len, &r->resp);
1732 
1733     if (rq->ascii_multiget) {
1734         if (r->resp.type == MCMC_RESP_GET) {
1735             // Blank out the END. Bad hack.
1736             resp->iovcnt--;
1737         } else if (r->resp.type == MCMC_RESP_END) {
1738             resp->skip = true;
1739         }
1740     }
1741 
1742     // in case someone logs this response it should make sense.
1743     memcpy(r->be_name, "internal", strlen("internal"));
1744     memcpy(r->be_port, "0", 1);
1745 
1746     // TODO: r-> will need status/code/mode copied from resp.
1747     r->cresp = resp;
1748     r->thread = t;
1749     r->cmd = rq->pr.command;
1750     // Always return OK from here as this is signalling an internal error.
1751     r->status = MCMC_OK;
1752 
1753     // resp object is associated with the
1754     // response object, which is about a
1755     // kilobyte.
1756     t->proxy_vm_extra_kb++;
1757 
1758     if (resp->io_pending) {
1759         // TODO (v2): here we move the IO from the temporary resp to the top
1760         // resp, but this feels kludgy so I'm leaving an explicit note to find
1761         // a better way to do this.
1762         rctx->resp->io_pending = resp->io_pending;
1763         resp->io_pending = NULL;
1764 
1765         // Add io object to extstore submission queue.
1766         io_queue_t *q = conn_io_queue_get(rctx->c, IO_QUEUE_EXTSTORE);
1767         io_pending_proxy_t *io = (io_pending_proxy_t *)rctx->resp->io_pending;
1768 
1769         io->eio.next = q->stack_ctx;
1770         q->stack_ctx = &io->eio;
1771         assert(q->count >= 0);
1772         q->count++;
1773 
1774         io->rctx = rctx;
1775         io->c = rctx->c;
1776         io->ascii_multiget = rq->ascii_multiget;
1777         // mark the buffer into the mcp_resp for freeing later.
1778         r->buf = io->eio.buf;
1779         return 1;
1780     }
1781 
1782     return 0;
1783 }
1784