1 /*
2 * Copyright (c) 2009-2012, Salvatore Sanfilippo <antirez at gmail dot com>
3 * All rights reserved.
4 *
5 * Redistribution and use in source and binary forms, with or without
6 * modification, are permitted provided that the following conditions are met:
7 *
8 * * Redistributions of source code must retain the above copyright notice,
9 * this list of conditions and the following disclaimer.
10 * * Redistributions in binary form must reproduce the above copyright
11 * notice, this list of conditions and the following disclaimer in the
12 * documentation and/or other materials provided with the distribution.
13 * * Neither the name of Redis nor the names of its contributors may be used
14 * to endorse or promote products derived from this software without
15 * specific prior written permission.
16 *
17 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
18 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
19 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
20 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
21 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
22 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
23 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
24 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
25 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
26 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27 * POSSIBILITY OF SUCH DAMAGE.
28 */
29
30 #include "server.h"
31 #include <sys/uio.h>
32 #include <math.h>
33
34 static void setProtocolError(client *c, int pos);
35
36 /* Return the size consumed from the allocator, for the specified SDS string,
37 * including internal fragmentation. This function is used in order to compute
38 * the client output buffer size. */
sdsZmallocSize(sds s)39 size_t sdsZmallocSize(sds s) {
40 void *sh = sdsAllocPtr(s);
41 return zmalloc_size(sh);
42 }
43
44 /* Return the amount of memory used by the sds string at object->ptr
45 * for a string object. */
getStringObjectSdsUsedMemory(robj * o)46 size_t getStringObjectSdsUsedMemory(robj *o) {
47 serverAssertWithInfo(NULL,o,o->type == OBJ_STRING);
48 switch(o->encoding) {
49 case OBJ_ENCODING_RAW: return sdsZmallocSize(o->ptr);
50 case OBJ_ENCODING_EMBSTR: return zmalloc_size(o)-sizeof(robj);
51 default: return 0; /* Just integer encoding for now. */
52 }
53 }
54
dupClientReplyValue(void * o)55 void *dupClientReplyValue(void *o) {
56 incrRefCount((robj*)o);
57 return o;
58 }
59
/* List match method: returns whatever equalStringObjects() reports for the
 * two values 'a' and 'b'. */
int listMatchObjects(void *a, void *b) {
    int matched = equalStringObjects(a,b);

    return matched;
}
63
createClient(int fd)64 client *createClient(int fd) {
65 client *c = zmalloc(sizeof(client));
66
67 /* passing -1 as fd it is possible to create a non connected client.
68 * This is useful since all the commands needs to be executed
69 * in the context of a client. When commands are executed in other
70 * contexts (for instance a Lua script) we need a non connected client. */
71 if (fd != -1) {
72 anetNonBlock(NULL,fd);
73 anetEnableTcpNoDelay(NULL,fd);
74 if (server.tcpkeepalive)
75 anetKeepAlive(NULL,fd,server.tcpkeepalive);
76 if (aeCreateFileEvent(server.el,fd,AE_READABLE,
77 readQueryFromClient, c) == AE_ERR)
78 {
79 close(fd);
80 zfree(c);
81 return NULL;
82 }
83 }
84
85 selectDb(c,0);
86 c->id = server.next_client_id++;
87 c->fd = fd;
88 c->name = NULL;
89 c->bufpos = 0;
90 c->querybuf = sdsempty();
91 c->querybuf_peak = 0;
92 c->reqtype = 0;
93 c->argc = 0;
94 c->argv = NULL;
95 c->cmd = c->lastcmd = NULL;
96 c->multibulklen = 0;
97 c->bulklen = -1;
98 c->sentlen = 0;
99 c->flags = 0;
100 c->ctime = c->lastinteraction = server.unixtime;
101 c->authenticated = 0;
102 c->replstate = REPL_STATE_NONE;
103 c->repl_put_online_on_ack = 0;
104 c->reploff = 0;
105 c->repl_ack_off = 0;
106 c->repl_ack_time = 0;
107 c->slave_listening_port = 0;
108 c->slave_ip[0] = '\0';
109 c->slave_capa = SLAVE_CAPA_NONE;
110 c->reply = listCreate();
111 c->reply_bytes = 0;
112 c->obuf_soft_limit_reached_time = 0;
113 listSetFreeMethod(c->reply,decrRefCountVoid);
114 listSetDupMethod(c->reply,dupClientReplyValue);
115 c->btype = BLOCKED_NONE;
116 c->bpop.timeout = 0;
117 c->bpop.keys = dictCreate(&setDictType,NULL);
118 c->bpop.target = NULL;
119 c->bpop.numreplicas = 0;
120 c->bpop.reploffset = 0;
121 c->woff = 0;
122 c->watched_keys = listCreate();
123 c->pubsub_channels = dictCreate(&setDictType,NULL);
124 c->pubsub_patterns = listCreate();
125 c->peerid = NULL;
126 listSetFreeMethod(c->pubsub_patterns,decrRefCountVoid);
127 listSetMatchMethod(c->pubsub_patterns,listMatchObjects);
128 if (fd != -1) listAddNodeTail(server.clients,c);
129 initClientMultiState(c);
130 return c;
131 }
132
133 /* This function is called every time we are going to transmit new data
134 * to the client. The behavior is the following:
135 *
136 * If the client should receive new data (normal clients will) the function
137 * returns C_OK, and make sure to install the write handler in our event
138 * loop so that when the socket is writable new data gets written.
139 *
140 * If the client should not receive new data, because it is a fake client
141 * (used to load AOF in memory), a master or because the setup of the write
142 * handler failed, the function returns C_ERR.
143 *
144 * The function may return C_OK without actually installing the write
145 * event handler in the following cases:
146 *
147 * 1) The event handler should already be installed since the output buffer
148 * already contained something.
149 * 2) The client is a slave but not yet online, so we want to just accumulate
150 * writes in the buffer but not actually sending them yet.
151 *
152 * Typically gets called every time a reply is built, before adding more
153 * data to the clients output buffers. If the function returns C_ERR no
154 * data should be appended to the output buffers. */
prepareClientToWrite(client * c)155 int prepareClientToWrite(client *c) {
156 /* If it's the Lua client we always return ok without installing any
157 * handler since there is no socket at all. */
158 if (c->flags & CLIENT_LUA) return C_OK;
159
160 /* CLIENT REPLY OFF / SKIP handling: don't send replies. */
161 if (c->flags & (CLIENT_REPLY_OFF|CLIENT_REPLY_SKIP)) return C_ERR;
162
163 /* Masters don't receive replies, unless CLIENT_MASTER_FORCE_REPLY flag
164 * is set. */
165 if ((c->flags & CLIENT_MASTER) &&
166 !(c->flags & CLIENT_MASTER_FORCE_REPLY)) return C_ERR;
167
168 if (c->fd <= 0) return C_ERR; /* Fake client for AOF loading. */
169
170 /* Schedule the client to write the output buffers to the socket only
171 * if not already done (there were no pending writes already and the client
172 * was yet not flagged), and, for slaves, if the slave can actually
173 * receive writes at this stage. */
174 if (!clientHasPendingReplies(c) &&
175 !(c->flags & CLIENT_PENDING_WRITE) &&
176 (c->replstate == REPL_STATE_NONE ||
177 (c->replstate == SLAVE_STATE_ONLINE && !c->repl_put_online_on_ack)))
178 {
179 /* Here instead of installing the write handler, we just flag the
180 * client and put it into a list of clients that have something
181 * to write to the socket. This way before re-entering the event
182 * loop, we can try to directly write to the client sockets avoiding
183 * a system call. We'll only really install the write handler if
184 * we'll not be able to write the whole reply at once. */
185 c->flags |= CLIENT_PENDING_WRITE;
186 listAddNodeHead(server.clients_pending_write,c);
187 }
188
189 /* Authorize the caller to queue in the output buffer of this client. */
190 return C_OK;
191 }
192
193 /* Create a duplicate of the last object in the reply list when
194 * it is not exclusively owned by the reply list. */
dupLastObjectIfNeeded(list * reply)195 robj *dupLastObjectIfNeeded(list *reply) {
196 robj *new, *cur;
197 listNode *ln;
198 serverAssert(listLength(reply) > 0);
199 ln = listLast(reply);
200 cur = listNodeValue(ln);
201 if (cur->refcount > 1) {
202 new = dupStringObject(cur);
203 decrRefCount(cur);
204 listNodeValue(ln) = new;
205 }
206 return listNodeValue(ln);
207 }
208
209 /* -----------------------------------------------------------------------------
210 * Low level functions to add more data to output buffers.
211 * -------------------------------------------------------------------------- */
212
/* Try to copy 'len' bytes from 's' into the static output buffer of the
 * client.
 *
 * Returns C_OK when the data was copied, and also when the client is
 * flagged CLIENT_CLOSE_AFTER_REPLY (in which case nothing is copied).
 * Returns C_ERR when the reply list already has entries or when the data
 * does not fit in the remaining space. */
int _addReplyToBuffer(client *c, const char *s, size_t len) {
    if (c->flags & CLIENT_CLOSE_AFTER_REPLY) return C_OK;

    /* With entries already in the reply list, nothing more can go into
     * the static buffer. */
    if (listLength(c->reply) > 0) return C_ERR;

    /* The string must fit in what is left of the buffer. */
    size_t room = sizeof(c->buf)-c->bufpos;
    if (room < len) return C_ERR;

    memcpy(c->buf+c->bufpos,s,len);
    c->bufpos += len;
    return C_OK;
}
229
/* Queue the string object 'o' on the reply list of the client.
 *
 * When the last element of the list holds a RAW encoded string (non-NULL
 * ptr) and the combined length does not exceed PROTO_REPLY_CHUNK_BYTES,
 * the content of 'o' is appended to it. Otherwise 'o' itself is added as a
 * new tail node with its reference count incremented. c->reply_bytes is
 * updated accordingly, then asyncCloseClientOnOutputBufferLimitReached()
 * is called. Nothing happens for clients flagged CLIENT_CLOSE_AFTER_REPLY. */
void _addReplyObjectToList(client *c, robj *o) {
    robj *last = NULL;
    int glue = 0;

    if (c->flags & CLIENT_CLOSE_AFTER_REPLY) return;

    if (listLength(c->reply) > 0) {
        last = listNodeValue(listLast(c->reply));
        glue = last->ptr != NULL &&
               last->encoding == OBJ_ENCODING_RAW &&
               sdslen(last->ptr)+sdslen(o->ptr) <= PROTO_REPLY_CHUNK_BYTES;
    }

    if (!glue) {
        /* New node sharing the caller's object. */
        incrRefCount(o);
        listAddNodeTail(c->reply,o);
        c->reply_bytes += getStringObjectSdsUsedMemory(o);
    } else {
        /* Append to the tail, working on a private copy if it is shared. */
        c->reply_bytes -= sdsZmallocSize(last->ptr);
        last = dupLastObjectIfNeeded(c->reply);
        last->ptr = sdscatlen(last->ptr,o->ptr,sdslen(o->ptr));
        c->reply_bytes += sdsZmallocSize(last->ptr);
    }
    asyncCloseClientOnOutputBufferLimitReached(c);
}
259
260 /* This method takes responsibility over the sds. When it is no longer
261 * needed it will be free'd, otherwise it ends up in a robj. */
_addReplySdsToList(client * c,sds s)262 void _addReplySdsToList(client *c, sds s) {
263 robj *tail;
264
265 if (c->flags & CLIENT_CLOSE_AFTER_REPLY) {
266 sdsfree(s);
267 return;
268 }
269
270 if (listLength(c->reply) == 0) {
271 listAddNodeTail(c->reply,createObject(OBJ_STRING,s));
272 c->reply_bytes += sdsZmallocSize(s);
273 } else {
274 tail = listNodeValue(listLast(c->reply));
275
276 /* Append to this object when possible. */
277 if (tail->ptr != NULL && tail->encoding == OBJ_ENCODING_RAW &&
278 sdslen(tail->ptr)+sdslen(s) <= PROTO_REPLY_CHUNK_BYTES)
279 {
280 c->reply_bytes -= sdsZmallocSize(tail->ptr);
281 tail = dupLastObjectIfNeeded(c->reply);
282 tail->ptr = sdscatlen(tail->ptr,s,sdslen(s));
283 c->reply_bytes += sdsZmallocSize(tail->ptr);
284 sdsfree(s);
285 } else {
286 listAddNodeTail(c->reply,createObject(OBJ_STRING,s));
287 c->reply_bytes += sdsZmallocSize(s);
288 }
289 }
290 asyncCloseClientOnOutputBufferLimitReached(c);
291 }
292
/* Queue the 'len' bytes pointed by 's' on the reply list of the client.
 *
 * The bytes are appended to the tail object when it holds a RAW encoded
 * string (non-NULL ptr) and the combined length does not exceed
 * PROTO_REPLY_CHUNK_BYTES. Otherwise a new string object is created from
 * the buffer and added as tail node. c->reply_bytes is kept up to date,
 * then asyncCloseClientOnOutputBufferLimitReached() is called. Nothing
 * happens for clients flagged CLIENT_CLOSE_AFTER_REPLY. */
void _addReplyStringToList(client *c, const char *s, size_t len) {
    robj *last = NULL;
    int glue = 0;

    if (c->flags & CLIENT_CLOSE_AFTER_REPLY) return;

    if (listLength(c->reply) > 0) {
        last = listNodeValue(listLast(c->reply));
        glue = last->ptr != NULL && last->encoding == OBJ_ENCODING_RAW &&
               sdslen(last->ptr)+len <= PROTO_REPLY_CHUNK_BYTES;
    }

    if (!glue) {
        robj *chunk = createStringObject(s,len);

        listAddNodeTail(c->reply,chunk);
        c->reply_bytes += getStringObjectSdsUsedMemory(chunk);
    } else {
        c->reply_bytes -= sdsZmallocSize(last->ptr);
        last = dupLastObjectIfNeeded(c->reply);
        last->ptr = sdscatlen(last->ptr,s,len);
        c->reply_bytes += sdsZmallocSize(last->ptr);
    }
    asyncCloseClientOnOutputBufferLimitReached(c);
}
323
324 /* -----------------------------------------------------------------------------
325 * Higher level functions to queue data on the client output buffer.
326 * The following functions are the ones that commands implementations will call.
327 * -------------------------------------------------------------------------- */
328
/* Queue the string object 'obj' as reply for the client. Nothing is done
 * when prepareClientToWrite() does not authorize the write.
 *
 * The static buffer is always tried first, and the reply list is used only
 * as a fallback. This matters to avoid copy-on-write while a saving child
 * is running: when the data fits in the static buffer the refcount field
 * of the object is never touched, so its memory page is left alone.
 *
 * Panics when the object is neither sds encoded nor integer encoded. */
void addReply(client *c, robj *obj) {
    if (prepareClientToWrite(c) != C_OK) return;

    if (sdsEncodedObject(obj)) {
        if (_addReplyToBuffer(c,obj->ptr,sdslen(obj->ptr)) != C_OK)
            _addReplyObjectToList(c,obj);
    } else if (obj->encoding == OBJ_ENCODING_INT) {
        /* Optimization: with at least 32 free bytes in the static buffer
         * (more than the max chars a 64 bit integer can take as string)
         * the integer is formatted on the stack and copied directly,
         * without decoding the object. */
        size_t room = sizeof(c->buf) - c->bufpos;

        if (listLength(c->reply) == 0 && room >= 32) {
            char digits[32];
            int ndigits = ll2string(digits,sizeof(digits),(long)obj->ptr);

            if (_addReplyToBuffer(c,digits,ndigits) == C_OK) return;
            /* Should never get here since the room was verified: in case,
             * fall through to the normal code path. */
        }

        robj *decoded = getDecodedObject(obj);

        if (_addReplyToBuffer(c,decoded->ptr,sdslen(decoded->ptr)) != C_OK)
            _addReplyObjectToList(c,decoded);
        decrRefCount(decoded);
    } else {
        serverPanic("Wrong obj->encoding in addReply()");
    }
}
364
/* Queue the sds string 's' as reply for the client. The function always
 * takes ownership of 's': the caller must not use nor free it afterwards,
 * whether or not the reply was actually queued. */
void addReplySds(client *c, sds s) {
    if (prepareClientToWrite(c) != C_OK) {
        /* Not allowed to write, but the caller still expects the sds to
         * be released. */
        sdsfree(s);
        return;
    }

    if (_addReplyToBuffer(c,s,sdslen(s)) != C_OK) {
        /* The list variant releases the sds when it is no longer needed. */
        _addReplySdsToList(c,s);
        return;
    }

    /* The static buffer accepted the data: drop our string. */
    sdsfree(s);
}
378
/* Queue the 'len' bytes pointed by 's' as reply for the client: the static
 * buffer is tried first, then the reply list. Nothing is done when
 * prepareClientToWrite() does not authorize the write. */
void addReplyString(client *c, const char *s, size_t len) {
    if (prepareClientToWrite(c) != C_OK) return;

    int buffered = _addReplyToBuffer(c,s,len);

    if (buffered == C_OK) return;
    _addReplyStringToList(c,s,len);
}
384
/* Emit an error reply composed of the "-ERR " prefix, the 'len' bytes
 * pointed by 's', and the final CRLF. The message is emitted as it is. */
void addReplyErrorLength(client *c, const char *s, size_t len) {
    static const char prefix[] = "-ERR ";
    static const char crlf[] = "\r\n";

    addReplyString(c,prefix,sizeof(prefix)-1);
    addReplyString(c,s,len);
    addReplyString(c,crlf,sizeof(crlf)-1);
}
390
/* Emit an error reply from the nul terminated C string 'err'. */
void addReplyError(client *c, const char *err) {
    size_t errlen = strlen(err);

    addReplyErrorLength(c,err,errlen);
}
394
/* Emit an error reply whose message is built printf-alike from 'fmt' and
 * the following arguments. Every CR or LF found in the formatted message
 * is turned into a space, since newlines in the message would result in
 * invalid protocol being emitted. */
void addReplyErrorFormat(client *c, const char *fmt, ...) {
    va_list ap;

    va_start(ap,fmt);
    sds msg = sdscatvprintf(sdsempty(),fmt,ap);
    va_end(ap);

    for (char *p = msg, *end = msg+sdslen(msg); p != end; p++) {
        if (*p == '\r' || *p == '\n') *p = ' ';
    }

    addReplyErrorLength(c,msg,sdslen(msg));
    sdsfree(msg);
}
410
/* Emit a status reply composed of the "+" prefix, the 'len' bytes pointed
 * by 's', and the final CRLF. */
void addReplyStatusLength(client *c, const char *s, size_t len) {
    static const char prefix[] = "+";
    static const char crlf[] = "\r\n";

    addReplyString(c,prefix,sizeof(prefix)-1);
    addReplyString(c,s,len);
    addReplyString(c,crlf,sizeof(crlf)-1);
}
416
/* Emit a status reply from the nul terminated C string 'status'. */
void addReplyStatus(client *c, const char *status) {
    size_t statuslen = strlen(status);

    addReplyStatusLength(c,status,statuslen);
}
420
/* Emit a status reply whose text is built printf-alike from 'fmt' and the
 * following arguments. The formatted text is emitted as it is. */
void addReplyStatusFormat(client *c, const char *fmt, ...) {
    va_list ap;

    va_start(ap,fmt);
    sds text = sdscatvprintf(sdsempty(),fmt,ap);
    va_end(ap);

    addReplyStatusLength(c,text,sdslen(text));
    sdsfree(text);
}
429
430 /* Adds an empty object to the reply list that will contain the multi bulk
431 * length, which is not known when this function is called. */
addDeferredMultiBulkLength(client * c)432 void *addDeferredMultiBulkLength(client *c) {
433 /* Note that we install the write event here even if the object is not
434 * ready to be sent, since we are sure that before returning to the
435 * event loop setDeferredMultiBulkLength() will be called. */
436 if (prepareClientToWrite(c) != C_OK) return NULL;
437 listAddNodeTail(c->reply,createObject(OBJ_STRING,NULL));
438 return listLast(c->reply);
439 }
440
441 /* Populate the length object and try gluing it to the next chunk. */
setDeferredMultiBulkLength(client * c,void * node,long length)442 void setDeferredMultiBulkLength(client *c, void *node, long length) {
443 listNode *ln = (listNode*)node;
444 robj *len, *next;
445
446 /* Abort when *node is NULL (see addDeferredMultiBulkLength). */
447 if (node == NULL) return;
448
449 len = listNodeValue(ln);
450 len->ptr = sdscatprintf(sdsempty(),"*%ld\r\n",length);
451 len->encoding = OBJ_ENCODING_RAW; /* in case it was an EMBSTR. */
452 c->reply_bytes += sdsZmallocSize(len->ptr);
453 if (ln->next != NULL) {
454 next = listNodeValue(ln->next);
455
456 /* Only glue when the next node is non-NULL (an sds in this case) */
457 if (next->ptr != NULL) {
458 c->reply_bytes -= sdsZmallocSize(len->ptr);
459 c->reply_bytes -= getStringObjectSdsUsedMemory(next);
460 len->ptr = sdscatlen(len->ptr,next->ptr,sdslen(next->ptr));
461 c->reply_bytes += sdsZmallocSize(len->ptr);
462 listDelNode(c->reply,ln->next);
463 }
464 }
465 asyncCloseClientOnOutputBufferLimitReached(c);
466 }
467
468 /* Add a double as a bulk reply */
addReplyDouble(client * c,double d)469 void addReplyDouble(client *c, double d) {
470 char dbuf[128], sbuf[128];
471 int dlen, slen;
472 if (isinf(d)) {
473 /* Libc in odd systems (Hi Solaris!) will format infinite in a
474 * different way, so better to handle it in an explicit way. */
475 addReplyBulkCString(c, d > 0 ? "inf" : "-inf");
476 } else {
477 dlen = snprintf(dbuf,sizeof(dbuf),"%.17g",d);
478 slen = snprintf(sbuf,sizeof(sbuf),"$%d\r\n%s\r\n",dlen,dbuf);
479 addReplyString(c,sbuf,slen);
480 }
481 }
482
483 /* Add a long double as a bulk reply, but uses a human readable formatting
484 * of the double instead of exposing the crude behavior of doubles to the
485 * dear user. */
addReplyHumanLongDouble(client * c,long double d)486 void addReplyHumanLongDouble(client *c, long double d) {
487 robj *o = createStringObjectFromLongDouble(d,1);
488 addReplyBulk(c,o);
489 decrRefCount(o);
490 }
491
492 /* Add a long long as integer reply or bulk len / multi bulk count.
493 * Basically this is used to output <prefix><long long><crlf>. */
addReplyLongLongWithPrefix(client * c,long long ll,char prefix)494 void addReplyLongLongWithPrefix(client *c, long long ll, char prefix) {
495 char buf[128];
496 int len;
497
498 /* Things like $3\r\n or *2\r\n are emitted very often by the protocol
499 * so we have a few shared objects to use if the integer is small
500 * like it is most of the times. */
501 if (prefix == '*' && ll < OBJ_SHARED_BULKHDR_LEN && ll >= 0) {
502 addReply(c,shared.mbulkhdr[ll]);
503 return;
504 } else if (prefix == '$' && ll < OBJ_SHARED_BULKHDR_LEN && ll >= 0) {
505 addReply(c,shared.bulkhdr[ll]);
506 return;
507 }
508
509 buf[0] = prefix;
510 len = ll2string(buf+1,sizeof(buf)-1,ll);
511 buf[len+1] = '\r';
512 buf[len+2] = '\n';
513 addReplyString(c,buf,len+3);
514 }
515
/* Emit 'll' as an integer reply (":" prefix). The values 0 and 1 are
 * served using the shared.czero and shared.cone objects. */
void addReplyLongLong(client *c, long long ll) {
    switch (ll) {
    case 0:
        addReply(c,shared.czero);
        break;
    case 1:
        addReply(c,shared.cone);
        break;
    default:
        addReplyLongLongWithPrefix(c,ll,':');
        break;
    }
}
524
/* Emit the multi bulk header "*<length>\r\n".
 *
 * Lengths in the range [0, OBJ_SHARED_BULKHDR_LEN) are served with the
 * pre-built shared.mbulkhdr objects. Every other value, negative ones
 * included, is formatted by addReplyLongLongWithPrefix(): the lower bound
 * check matters because a negative length must never be used as an index
 * into the shared headers array. */
void addReplyMultiBulkLen(client *c, long length) {
    if (length >= 0 && length < OBJ_SHARED_BULKHDR_LEN)
        addReply(c,shared.mbulkhdr[length]);
    else
        addReplyLongLongWithPrefix(c,length,'*');
}
531
532 /* Create the length prefix of a bulk reply, example: $2234 */
addReplyBulkLen(client * c,robj * obj)533 void addReplyBulkLen(client *c, robj *obj) {
534 size_t len;
535
536 if (sdsEncodedObject(obj)) {
537 len = sdslen(obj->ptr);
538 } else {
539 long n = (long)obj->ptr;
540
541 /* Compute how many bytes will take this integer as a radix 10 string */
542 len = 1;
543 if (n < 0) {
544 len++;
545 n = -n;
546 }
547 while((n = n/10) != 0) {
548 len++;
549 }
550 }
551
552 if (len < OBJ_SHARED_BULKHDR_LEN)
553 addReply(c,shared.bulkhdr[len]);
554 else
555 addReplyLongLongWithPrefix(c,len,'$');
556 }
557
558 /* Add a Redis Object as a bulk reply */
addReplyBulk(client * c,robj * obj)559 void addReplyBulk(client *c, robj *obj) {
560 addReplyBulkLen(c,obj);
561 addReply(c,obj);
562 addReply(c,shared.crlf);
563 }
564
565 /* Add a C buffer as bulk reply */
addReplyBulkCBuffer(client * c,const void * p,size_t len)566 void addReplyBulkCBuffer(client *c, const void *p, size_t len) {
567 addReplyLongLongWithPrefix(c,len,'$');
568 addReplyString(c,p,len);
569 addReply(c,shared.crlf);
570 }
571
572 /* Add sds to reply (takes ownership of sds and frees it) */
addReplyBulkSds(client * c,sds s)573 void addReplyBulkSds(client *c, sds s) {
574 addReplySds(c,sdscatfmt(sdsempty(),"$%u\r\n",
575 (unsigned long)sdslen(s)));
576 addReplySds(c,s);
577 addReply(c,shared.crlf);
578 }
579
580 /* Add a C nul term string as bulk reply */
addReplyBulkCString(client * c,const char * s)581 void addReplyBulkCString(client *c, const char *s) {
582 if (s == NULL) {
583 addReply(c,shared.nullbulk);
584 } else {
585 addReplyBulkCBuffer(c,s,strlen(s));
586 }
587 }
588
589 /* Add a long long as a bulk reply */
addReplyBulkLongLong(client * c,long long ll)590 void addReplyBulkLongLong(client *c, long long ll) {
591 char buf[64];
592 int len;
593
594 len = ll2string(buf,64,ll);
595 addReplyBulkCBuffer(c,buf,len);
596 }
597
598 /* Copy 'src' client output buffers into 'dst' client output buffers.
599 * The function takes care of freeing the old output buffers of the
600 * destination client. */
copyClientOutputBuffer(client * dst,client * src)601 void copyClientOutputBuffer(client *dst, client *src) {
602 listRelease(dst->reply);
603 dst->reply = listDup(src->reply);
604 memcpy(dst->buf,src->buf,src->bufpos);
605 dst->bufpos = src->bufpos;
606 dst->reply_bytes = src->reply_bytes;
607 }
608
609 /* Return true if the specified client has pending reply buffers to write to
610 * the socket. */
clientHasPendingReplies(client * c)611 int clientHasPendingReplies(client *c) {
612 return c->bufpos || listLength(c->reply);
613 }
614
615 #define MAX_ACCEPTS_PER_CALL 1000
acceptCommonHandler(int fd,int flags,char * ip)616 static void acceptCommonHandler(int fd, int flags, char *ip) {
617 client *c;
618 if ((c = createClient(fd)) == NULL) {
619 serverLog(LL_WARNING,
620 "Error registering fd event for the new client: %s (fd=%d)",
621 strerror(errno),fd);
622 close(fd); /* May be already closed, just ignore errors */
623 return;
624 }
625 /* If maxclient directive is set and this is one client more... close the
626 * connection. Note that we create the client instead to check before
627 * for this condition, since now the socket is already set in non-blocking
628 * mode and we can send an error for free using the Kernel I/O */
629 if (listLength(server.clients) > server.maxclients) {
630 char *err = "-ERR max number of clients reached\r\n";
631
632 /* That's a best effort error message, don't check write errors */
633 if (write(c->fd,err,strlen(err)) == -1) {
634 /* Nothing to do, Just to avoid the warning... */
635 }
636 server.stat_rejected_conn++;
637 freeClient(c);
638 return;
639 }
640
641 /* If the server is running in protected mode (the default) and there
642 * is no password set, nor a specific interface is bound, we don't accept
643 * requests from non loopback interfaces. Instead we try to explain the
644 * user what to do to fix it if needed. */
645 if (server.protected_mode &&
646 server.bindaddr_count == 0 &&
647 server.requirepass == NULL &&
648 !(flags & CLIENT_UNIX_SOCKET) &&
649 ip != NULL)
650 {
651 if (strcmp(ip,"127.0.0.1") && strcmp(ip,"::1")) {
652 char *err =
653 "-DENIED Redis is running in protected mode because protected "
654 "mode is enabled, no bind address was specified, no "
655 "authentication password is requested to clients. In this mode "
656 "connections are only accepted from the loopback interface. "
657 "If you want to connect from external computers to Redis you "
658 "may adopt one of the following solutions: "
659 "1) Just disable protected mode sending the command "
660 "'CONFIG SET protected-mode no' from the loopback interface "
661 "by connecting to Redis from the same host the server is "
662 "running, however MAKE SURE Redis is not publicly accessible "
663 "from internet if you do so. Use CONFIG REWRITE to make this "
664 "change permanent. "
665 "2) Alternatively you can just disable the protected mode by "
666 "editing the Redis configuration file, and setting the protected "
667 "mode option to 'no', and then restarting the server. "
668 "3) If you started the server manually just for testing, restart "
669 "it with the '--protected-mode no' option. "
670 "4) Setup a bind address or an authentication password. "
671 "NOTE: You only need to do one of the above things in order for "
672 "the server to start accepting connections from the outside.\r\n";
673 if (write(c->fd,err,strlen(err)) == -1) {
674 /* Nothing to do, Just to avoid the warning... */
675 }
676 server.stat_rejected_conn++;
677 freeClient(c);
678 return;
679 }
680 }
681
682 server.stat_numconnections++;
683 c->flags |= flags;
684 }
685
/* Event handler for the listening TCP socket 'fd': accept up to
 * MAX_ACCEPTS_PER_CALL connections per call, handing each one to
 * acceptCommonHandler() together with the peer address. The handler
 * returns at the first failed accept, logging the error unless errno is
 * EWOULDBLOCK. The 'el', 'privdata' and 'mask' arguments are not used. */
void acceptTcpHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
    char peer_ip[NET_IP_STR_LEN];
    int peer_port, client_fd;
    UNUSED(el);
    UNUSED(mask);
    UNUSED(privdata);

    for (int budget = MAX_ACCEPTS_PER_CALL; budget > 0; budget--) {
        client_fd = anetTcpAccept(server.neterr, fd, peer_ip,
                                  sizeof(peer_ip), &peer_port);
        if (client_fd != ANET_ERR) {
            serverLog(LL_VERBOSE,"Accepted %s:%d", peer_ip, peer_port);
            acceptCommonHandler(client_fd,0,peer_ip);
            continue;
        }

        if (errno != EWOULDBLOCK) {
            serverLog(LL_WARNING,
                "Accepting client connection: %s", server.neterr);
        }
        return;
    }
}
705
/* Event handler for the listening Unix socket 'fd': accept up to
 * MAX_ACCEPTS_PER_CALL connections per call, handing each one to
 * acceptCommonHandler() with the CLIENT_UNIX_SOCKET flag and no peer
 * address. The handler returns at the first failed accept, logging the
 * error unless errno is EWOULDBLOCK. The 'el', 'privdata' and 'mask'
 * arguments are not used. */
void acceptUnixHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
    int client_fd;
    UNUSED(el);
    UNUSED(mask);
    UNUSED(privdata);

    for (int budget = MAX_ACCEPTS_PER_CALL; budget > 0; budget--) {
        client_fd = anetUnixAccept(server.neterr, fd);
        if (client_fd != ANET_ERR) {
            serverLog(LL_VERBOSE,"Accepted connection to %s",
                server.unixsocket);
            acceptCommonHandler(client_fd,CLIENT_UNIX_SOCKET,NULL);
            continue;
        }

        if (errno != EWOULDBLOCK) {
            serverLog(LL_WARNING,
                "Accepting client connection: %s", server.neterr);
        }
        return;
    }
}
724
/* Drop the reference held on every object of the client argument vector,
 * then reset the arguments count and the current command. The argv array
 * itself is not released here. */
static void freeClientArgv(client *c) {
    int idx = 0;

    while (idx < c->argc) {
        decrRefCount(c->argv[idx]);
        idx++;
    }
    c->argc = 0;
    c->cmd = NULL;
}
732
733 /* Close all the slaves connections. This is useful in chained replication
734 * when we resync with our own master and want to force all our slaves to
735 * resync with us as well. */
disconnectSlaves(void)736 void disconnectSlaves(void) {
737 while (listLength(server.slaves)) {
738 listNode *ln = listFirst(server.slaves);
739 freeClient((client*)ln->value);
740 }
741 }
742
743 /* Remove the specified client from global lists where the client could
744 * be referenced, not including the Pub/Sub channels.
745 * This is used by freeClient() and replicationCacheMaster(). */
unlinkClient(client * c)746 void unlinkClient(client *c) {
747 listNode *ln;
748
749 /* If this is marked as current client unset it. */
750 if (server.current_client == c) server.current_client = NULL;
751
752 /* Certain operations must be done only if the client has an active socket.
753 * If the client was already unlinked or if it's a "fake client" the
754 * fd is already set to -1. */
755 if (c->fd != -1) {
756 /* Remove from the list of active clients. */
757 ln = listSearchKey(server.clients,c);
758 serverAssert(ln != NULL);
759 listDelNode(server.clients,ln);
760
761 /* Unregister async I/O handlers and close the socket. */
762 aeDeleteFileEvent(server.el,c->fd,AE_READABLE);
763 aeDeleteFileEvent(server.el,c->fd,AE_WRITABLE);
764 close(c->fd);
765 c->fd = -1;
766 }
767
768 /* Remove from the list of pending writes if needed. */
769 if (c->flags & CLIENT_PENDING_WRITE) {
770 ln = listSearchKey(server.clients_pending_write,c);
771 serverAssert(ln != NULL);
772 listDelNode(server.clients_pending_write,ln);
773 c->flags &= ~CLIENT_PENDING_WRITE;
774 }
775
776 /* When client was just unblocked because of a blocking operation,
777 * remove it from the list of unblocked clients. */
778 if (c->flags & CLIENT_UNBLOCKED) {
779 ln = listSearchKey(server.unblocked_clients,c);
780 serverAssert(ln != NULL);
781 listDelNode(server.unblocked_clients,ln);
782 c->flags &= ~CLIENT_UNBLOCKED;
783 }
784 }
785
/* Release the client and every resource attached to it: query buffer,
 * blocking state, watched keys, Pub/Sub subscriptions, reply list,
 * arguments, socket and references in the global lists, replication
 * related state, and finally the structure itself.
 *
 * The only exception is our master when it is not in some unexpected
 * state: in that case the client is not freed but handed to
 * replicationCacheMaster(), and the function returns immediately. */
void freeClient(client *c) {
    listNode *node;

    /* If it is our master that is being disconnected, cache its state in
     * order to try a partial resynchronization later. Before doing so the
     * flags are checked, to make sure the client is not in some
     * unexpected state. */
    if (server.master && (c->flags & CLIENT_MASTER)) {
        serverLog(LL_WARNING,"Connection with master lost.");

        int unexpected_state = (c->flags & (CLIENT_CLOSE_AFTER_REPLY|
                                            CLIENT_CLOSE_ASAP|
                                            CLIENT_BLOCKED|
                                            CLIENT_UNBLOCKED)) != 0;
        if (!unexpected_state) {
            replicationCacheMaster(c);
            return;
        }
    }

    /* Log the disconnection of a slave (monitors are not logged). */
    if ((c->flags & CLIENT_SLAVE) && !(c->flags & CLIENT_MONITOR)) {
        serverLog(LL_WARNING,"Connection with slave %s lost.",
            replicationGetSlaveName(c));
    }

    /* Query buffer. */
    sdsfree(c->querybuf);
    c->querybuf = NULL;

    /* Structures used to block on blocking operations. */
    if (c->flags & CLIENT_BLOCKED) unblockClient(c);
    dictRelease(c->bpop.keys);

    /* WATCHed keys. */
    unwatchAllKeys(c);
    listRelease(c->watched_keys);

    /* Pub/Sub channels and patterns. */
    pubsubUnsubscribeAllChannels(c,0);
    pubsubUnsubscribeAllPatterns(c,0);
    dictRelease(c->pubsub_channels);
    listRelease(c->pubsub_patterns);

    /* Output list and arguments. */
    listRelease(c->reply);
    freeClientArgv(c);

    /* Unlink the client: the socket is closed, the I/O handlers are
     * removed, and so are the references to the client from the places
     * where active clients may be referenced. */
    unlinkClient(c);

    /* Master/slave cleanup Case 1:
     * we lost the connection with a slave. */
    if (c->flags & CLIENT_SLAVE) {
        if (c->replstate == SLAVE_STATE_SEND_BULK) {
            if (c->repldbfd != -1) close(c->repldbfd);
            if (c->replpreamble) sdsfree(c->replpreamble);
        }

        /* Monitors and slaves are tracked in two different lists. */
        list *owner = (c->flags & CLIENT_MONITOR) ? server.monitors
                                                  : server.slaves;
        node = listSearchKey(owner,c);
        serverAssert(node != NULL);
        listDelNode(owner,node);

        /* Remember the time when we started to have zero attached slaves:
         * after some time the replication backlog will be freed. */
        if ((c->flags & CLIENT_SLAVE) && listLength(server.slaves) == 0)
            server.repl_no_slaves_since = server.unixtime;
        refreshGoodSlavesCount();
    }

    /* Master/slave cleanup Case 2:
     * we lost the connection with the master. */
    if (c->flags & CLIENT_MASTER) replicationHandleMasterDisconnection();

    /* A client scheduled for async freeing must leave the queue. */
    if (c->flags & CLIENT_CLOSE_ASAP) {
        node = listSearchKey(server.clients_to_close,c);
        serverAssert(node != NULL);
        listDelNode(server.clients_to_close,node);
    }

    /* Remaining dynamically allocated fields, then the client itself. */
    if (c->name) decrRefCount(c->name);
    zfree(c->argv);
    freeClientMultiState(c);
    sdsfree(c->peerid);
    zfree(c);
}
878
879 /* Schedule a client to free it at a safe time in the serverCron() function.
880 * This function is useful when we need to terminate a client but we are in
881 * a context where calling freeClient() is not possible, because the client
882 * should be valid for the continuation of the flow of the program. */
freeClientAsync(client * c)883 void freeClientAsync(client *c) {
884 if (c->flags & CLIENT_CLOSE_ASAP || c->flags & CLIENT_LUA) return;
885 c->flags |= CLIENT_CLOSE_ASAP;
886 listAddNodeTail(server.clients_to_close,c);
887 }
888
freeClientsInAsyncFreeQueue(void)889 void freeClientsInAsyncFreeQueue(void) {
890 while (listLength(server.clients_to_close)) {
891 listNode *ln = listFirst(server.clients_to_close);
892 client *c = listNodeValue(ln);
893
894 c->flags &= ~CLIENT_CLOSE_ASAP;
895 freeClient(c);
896 listDelNode(server.clients_to_close,ln);
897 }
898 }
899
900 /* Write data in output buffers to client. Return C_OK if the client
901 * is still valid after the call, C_ERR if it was freed. */
writeToClient(int fd,client * c,int handler_installed)902 int writeToClient(int fd, client *c, int handler_installed) {
903 ssize_t nwritten = 0, totwritten = 0;
904 size_t objlen;
905 size_t objmem;
906 robj *o;
907
908 while(clientHasPendingReplies(c)) {
909 if (c->bufpos > 0) {
910 nwritten = write(fd,c->buf+c->sentlen,c->bufpos-c->sentlen);
911 if (nwritten <= 0) break;
912 c->sentlen += nwritten;
913 totwritten += nwritten;
914
915 /* If the buffer was sent, set bufpos to zero to continue with
916 * the remainder of the reply. */
917 if ((int)c->sentlen == c->bufpos) {
918 c->bufpos = 0;
919 c->sentlen = 0;
920 }
921 } else {
922 o = listNodeValue(listFirst(c->reply));
923 objlen = sdslen(o->ptr);
924 objmem = getStringObjectSdsUsedMemory(o);
925
926 if (objlen == 0) {
927 listDelNode(c->reply,listFirst(c->reply));
928 c->reply_bytes -= objmem;
929 continue;
930 }
931
932 nwritten = write(fd, ((char*)o->ptr)+c->sentlen,objlen-c->sentlen);
933 if (nwritten <= 0) break;
934 c->sentlen += nwritten;
935 totwritten += nwritten;
936
937 /* If we fully sent the object on head go to the next one */
938 if (c->sentlen == objlen) {
939 listDelNode(c->reply,listFirst(c->reply));
940 c->sentlen = 0;
941 c->reply_bytes -= objmem;
942 }
943 }
944 /* Note that we avoid to send more than NET_MAX_WRITES_PER_EVENT
945 * bytes, in a single threaded server it's a good idea to serve
946 * other clients as well, even if a very large request comes from
947 * super fast link that is always able to accept data (in real world
948 * scenario think about 'KEYS *' against the loopback interface).
949 *
950 * However if we are over the maxmemory limit we ignore that and
951 * just deliver as much data as it is possible to deliver. */
952 server.stat_net_output_bytes += totwritten;
953 if (totwritten > NET_MAX_WRITES_PER_EVENT &&
954 (server.maxmemory == 0 ||
955 zmalloc_used_memory() < server.maxmemory)) break;
956 }
957 if (nwritten == -1) {
958 if (errno == EAGAIN) {
959 nwritten = 0;
960 } else {
961 serverLog(LL_VERBOSE,
962 "Error writing to client: %s", strerror(errno));
963 freeClient(c);
964 return C_ERR;
965 }
966 }
967 if (totwritten > 0) {
968 /* For clients representing masters we don't count sending data
969 * as an interaction, since we always send REPLCONF ACK commands
970 * that take some time to just fill the socket output buffer.
971 * We just rely on data / pings received for timeout detection. */
972 if (!(c->flags & CLIENT_MASTER)) c->lastinteraction = server.unixtime;
973 }
974 if (!clientHasPendingReplies(c)) {
975 c->sentlen = 0;
976 if (handler_installed) aeDeleteFileEvent(server.el,c->fd,AE_WRITABLE);
977
978 /* Close connection after entire reply has been sent. */
979 if (c->flags & CLIENT_CLOSE_AFTER_REPLY) {
980 freeClient(c);
981 return C_ERR;
982 }
983 }
984 return C_OK;
985 }
986
987 /* Write event handler. Just send data to the client. */
sendReplyToClient(aeEventLoop * el,int fd,void * privdata,int mask)988 void sendReplyToClient(aeEventLoop *el, int fd, void *privdata, int mask) {
989 UNUSED(el);
990 UNUSED(mask);
991 writeToClient(fd,privdata,1);
992 }
993
994 /* This function is called just before entering the event loop, in the hope
995 * we can just write the replies to the client output buffer without any
996 * need to use a syscall in order to install the writable event handler,
997 * get it called, and so forth. */
handleClientsWithPendingWrites(void)998 int handleClientsWithPendingWrites(void) {
999 listIter li;
1000 listNode *ln;
1001 int processed = listLength(server.clients_pending_write);
1002
1003 listRewind(server.clients_pending_write,&li);
1004 while((ln = listNext(&li))) {
1005 client *c = listNodeValue(ln);
1006 c->flags &= ~CLIENT_PENDING_WRITE;
1007 listDelNode(server.clients_pending_write,ln);
1008
1009 /* Try to write buffers to the client socket. */
1010 if (writeToClient(c->fd,c,0) == C_ERR) continue;
1011
1012 /* If there is nothing left, do nothing. Otherwise install
1013 * the write handler. */
1014 if (clientHasPendingReplies(c) &&
1015 aeCreateFileEvent(server.el, c->fd, AE_WRITABLE,
1016 sendReplyToClient, c) == AE_ERR)
1017 {
1018 freeClientAsync(c);
1019 }
1020 }
1021 return processed;
1022 }
1023
1024 /* resetClient prepare the client to process the next command */
resetClient(client * c)1025 void resetClient(client *c) {
1026 redisCommandProc *prevcmd = c->cmd ? c->cmd->proc : NULL;
1027
1028 freeClientArgv(c);
1029 c->reqtype = 0;
1030 c->multibulklen = 0;
1031 c->bulklen = -1;
1032
1033 /* We clear the ASKING flag as well if we are not inside a MULTI, and
1034 * if what we just executed is not the ASKING command itself. */
1035 if (!(c->flags & CLIENT_MULTI) && prevcmd != askingCommand)
1036 c->flags &= ~CLIENT_ASKING;
1037
1038 /* Remove the CLIENT_REPLY_SKIP flag if any so that the reply
1039 * to the next command will be sent, but set the flag if the command
1040 * we just processed was "CLIENT REPLY SKIP". */
1041 c->flags &= ~CLIENT_REPLY_SKIP;
1042 if (c->flags & CLIENT_REPLY_SKIP_NEXT) {
1043 c->flags |= CLIENT_REPLY_SKIP;
1044 c->flags &= ~CLIENT_REPLY_SKIP_NEXT;
1045 }
1046 }
1047
/* Parse one inline (non multi bulk) request from the client query buffer.
 *
 * The request is the text up to the first newline, terminated either by
 * "\r\n" or by a bare "\n". The line is split into arguments with
 * sdssplitargs(), empty arguments are discarded, and the result is stored
 * in c->argv / c->argc. The consumed line, including its terminator, is
 * removed from the query buffer.
 *
 * Returns C_OK when a line was consumed (c->argc may be zero for an empty
 * line). Returns C_ERR when no complete line is available yet, or on a
 * protocol error (line too long, unbalanced quotes): in the latter case an
 * error is queued and the client is flagged via setProtocolError(). */
int processInlineBuffer(client *c) {
    char *newline;
    int argc, j;
    int linefeed_chars = 1; /* Length of the line terminator: 1 for "\n". */
    sds *argv, aux;
    size_t querylen;

    /* Search for end of line */
    newline = strchr(c->querybuf,'\n');

    /* Nothing to do without a newline */
    if (newline == NULL) {
        if (sdslen(c->querybuf) > PROTO_INLINE_MAX_SIZE) {
            addReplyError(c,"Protocol error: too big inline request");
            setProtocolError(c,0);
        }
        return C_ERR;
    }

    /* Handle the \r\n case: the terminator is two bytes long. */
    if (newline != c->querybuf && *(newline-1) == '\r') {
        newline--;
        linefeed_chars++;
    }

    /* Split the input buffer up to the line terminator */
    querylen = newline-(c->querybuf);
    aux = sdsnewlen(c->querybuf,querylen);
    argv = sdssplitargs(aux,&argc);
    sdsfree(aux);
    if (argv == NULL) {
        addReplyError(c,"Protocol error: unbalanced quotes in request");
        setProtocolError(c,0);
        return C_ERR;
    }

    /* Newline from slaves can be used to refresh the last ACK time.
     * This is useful for a slave to ping back while loading a big
     * RDB file. */
    if (querylen == 0 && c->flags & CLIENT_SLAVE)
        c->repl_ack_time = server.unixtime;

    /* Leave data after the first line of the query in the buffer. Only the
     * bytes actually forming the terminator are skipped: unconditionally
     * skipping two bytes would eat the first byte of the next request when
     * the line ends with a bare "\n". */
    sdsrange(c->querybuf,querylen+linefeed_chars,-1);

    /* Setup argv array on client structure */
    if (argc) {
        if (c->argv) zfree(c->argv);
        c->argv = zmalloc(sizeof(robj*)*argc);
    }

    /* Create redis objects for all arguments. Ownership of each non empty
     * sds string moves to the new object. */
    for (c->argc = 0, j = 0; j < argc; j++) {
        if (sdslen(argv[j])) {
            c->argv[c->argc] = createObject(OBJ_STRING,argv[j]);
            c->argc++;
        } else {
            sdsfree(argv[j]);
        }
    }
    zfree(argv);
    return C_OK;
}
1108
1109 /* Helper function. Trims query buffer to make the function that processes
1110 * multi bulk requests idempotent. */
setProtocolError(client * c,int pos)1111 static void setProtocolError(client *c, int pos) {
1112 if (server.verbosity <= LL_VERBOSE) {
1113 sds client = catClientInfoString(sdsempty(),c);
1114 serverLog(LL_VERBOSE,
1115 "Protocol error from client: %s", client);
1116 sdsfree(client);
1117 }
1118 c->flags |= CLIENT_CLOSE_AFTER_REPLY;
1119 sdsrange(c->querybuf,pos,-1);
1120 }
1121
/* Parse a multi bulk request ("*<count>\r\n$<len>\r\n<payload>\r\n...")
 * from the client query buffer, filling c->argv / c->argc.
 *
 * Parsing is incremental: c->multibulklen holds the number of arguments
 * still to read and c->bulklen the length of the argument being read (-1
 * when unknown), so the function can be called again when more data
 * arrives. Bytes that were fully parsed are removed from the query buffer
 * before returning.
 *
 * Returns C_OK when a whole command was read (or the multi bulk count was
 * <= 0, in which case c->argc is left at zero). Returns C_ERR when more
 * data is needed, or on a protocol error: in the latter case an error reply
 * is queued and setProtocolError() is called. */
int processMultibulkBuffer(client *c) {
    char *newline = NULL;
    int pos = 0, ok;
    long long ll;

    if (c->multibulklen == 0) {
        /* The client should have been reset */
        serverAssertWithInfo(c,NULL,c->argc == 0);

        /* Multi bulk length cannot be read without a \r\n */
        newline = strchr(c->querybuf,'\r');
        if (newline == NULL) {
            if (sdslen(c->querybuf) > PROTO_INLINE_MAX_SIZE) {
                addReplyError(c,"Protocol error: too big mbulk count string");
                setProtocolError(c,0);
            }
            return C_ERR;
        }

        /* Buffer should also contain \n */
        if (newline-(c->querybuf) > ((signed)sdslen(c->querybuf)-2))
            return C_ERR;

        /* We know for sure there is a whole line since newline != NULL,
         * so go ahead and find out the multi bulk length. */
        serverAssertWithInfo(c,NULL,c->querybuf[0] == '*');
        ok = string2ll(c->querybuf+1,newline-(c->querybuf+1),&ll);
        if (!ok || ll > 1024*1024) {
            addReplyError(c,"Protocol error: invalid multibulk length");
            setProtocolError(c,pos);
            return C_ERR;
        }

        pos = (newline-c->querybuf)+2;
        if (ll <= 0) {
            sdsrange(c->querybuf,pos,-1);
            return C_OK;
        }

        c->multibulklen = ll;

        /* Setup argv array on client structure */
        if (c->argv) zfree(c->argv);
        c->argv = zmalloc(sizeof(robj*)*c->multibulklen);
    }

    serverAssertWithInfo(c,NULL,c->multibulklen > 0);
    while(c->multibulklen) {
        /* Read bulk length if unknown */
        if (c->bulklen == -1) {
            newline = strchr(c->querybuf+pos,'\r');
            if (newline == NULL) {
                /* Only the part of the buffer not parsed yet (from 'pos'
                 * on) can belong to the bulk count string: measuring the
                 * whole buffer would reject valid requests whose already
                 * parsed arguments exceed the limit. */
                if (sdslen(c->querybuf)-pos > PROTO_INLINE_MAX_SIZE) {
                    addReplyError(c,
                        "Protocol error: too big bulk count string");
                    setProtocolError(c,0);
                    return C_ERR;
                }
                break;
            }

            /* Buffer should also contain \n */
            if (newline-(c->querybuf) > ((signed)sdslen(c->querybuf)-2))
                break;

            if (c->querybuf[pos] != '$') {
                addReplyErrorFormat(c,
                    "Protocol error: expected '$', got '%c'",
                    c->querybuf[pos]);
                setProtocolError(c,pos);
                return C_ERR;
            }

            ok = string2ll(c->querybuf+pos+1,newline-(c->querybuf+pos+1),&ll);
            if (!ok || ll < 0 || ll > 512*1024*1024) {
                addReplyError(c,"Protocol error: invalid bulk length");
                setProtocolError(c,pos);
                return C_ERR;
            }

            pos += newline-(c->querybuf+pos)+2;
            if (ll >= PROTO_MBULK_BIG_ARG) {
                size_t qblen;

                /* If we are going to read a large object from network
                 * try to make it likely that it will start at c->querybuf
                 * boundary so that we can optimize object creation
                 * avoiding a large copy of data. */
                sdsrange(c->querybuf,pos,-1);
                pos = 0;
                qblen = sdslen(c->querybuf);
                /* Hint the sds library about the amount of bytes this string is
                 * going to contain. */
                if (qblen < (size_t)ll+2)
                    c->querybuf = sdsMakeRoomFor(c->querybuf,ll+2-qblen);
            }
            c->bulklen = ll;
        }

        /* Read bulk argument */
        if (sdslen(c->querybuf)-pos < (unsigned)(c->bulklen+2)) {
            /* Not enough data (+2 == trailing \r\n) */
            break;
        } else {
            /* Optimization: if the buffer contains JUST our bulk element
             * instead of creating a new object by *copying* the sds we
             * just use the current sds string. */
            if (pos == 0 &&
                c->bulklen >= PROTO_MBULK_BIG_ARG &&
                (signed) sdslen(c->querybuf) == c->bulklen+2)
            {
                c->argv[c->argc++] = createObject(OBJ_STRING,c->querybuf);
                sdsIncrLen(c->querybuf,-2); /* remove CRLF */
                /* Assume that if we saw a fat argument we'll see another one
                 * likely... */
                c->querybuf = sdsnewlen(NULL,c->bulklen+2);
                sdsclear(c->querybuf);
                pos = 0;
            } else {
                c->argv[c->argc++] =
                    createStringObject(c->querybuf+pos,c->bulklen);
                pos += c->bulklen+2;
            }
            c->bulklen = -1;
            c->multibulklen--;
        }
    }

    /* Trim to pos */
    if (pos) sdsrange(c->querybuf,pos,-1);

    /* We're done when c->multibulklen == 0 */
    if (c->multibulklen == 0) return C_OK;

    /* Still not ready to process the command */
    return C_ERR;
}
1259
/* Parse and execute as many complete commands as the client query buffer
 * currently holds.
 *
 * server.current_client is set to 'c' for the duration of the call and
 * reset to NULL on exit. Processing stops early when clients are paused
 * (slaves excepted), when the client is blocked, when the client is flagged
 * to be closed after the reply, when the parser needs more data or reports
 * an error, or when server.current_client is found to be NULL after
 * executing a command. */
void processInputBuffer(client *c) {
    server.current_client = c;

    /* Keep going while there is something in the input buffer. */
    while (sdslen(c->querybuf) > 0) {
        /* Stop if clients are paused: slaves are not affected. */
        if (!(c->flags & CLIENT_SLAVE) && clientsArePaused()) break;

        /* Stop if the client is in the middle of a blocking operation, or
         * if it is going to be closed once the reply is written: in the
         * latter case the reply must not grow any further, so no more
         * commands are processed. */
        if (c->flags & (CLIENT_BLOCKED|CLIENT_CLOSE_AFTER_REPLY)) break;

        /* Determine the request type when unknown: multi bulk requests
         * start with '*', everything else is an inline request. */
        if (!c->reqtype) {
            c->reqtype = (c->querybuf[0] == '*') ? PROTO_REQ_MULTIBULK
                                                 : PROTO_REQ_INLINE;
        }

        if (c->reqtype == PROTO_REQ_INLINE) {
            if (processInlineBuffer(c) != C_OK) break;
        } else if (c->reqtype == PROTO_REQ_MULTIBULK) {
            if (processMultibulkBuffer(c) != C_OK) break;
        } else {
            serverPanic("Unknown request type");
        }

        /* Nothing to execute: multi bulk processing could see a <= 0
         * length. Just get ready for the next request. */
        if (c->argc == 0) {
            resetClient(c);
            continue;
        }

        /* Only reset the client when the command was executed. */
        if (processCommand(c) == C_OK) resetClient(c);

        /* Per the original note: freeMemoryIfNeeded may flush slave output
         * buffers, which may result in a slave, possibly the active client,
         * being freed. A NULL current client signals that case. */
        if (server.current_client == NULL) break;
    }
    server.current_client = NULL;
}
1306
/* Readable-event callback: read new data from the client socket 'fd' into
 * the client query buffer and process it.
 *
 * 'privdata' is the client. The client is freed on read errors other than
 * EAGAIN, when the peer closed the connection, or when the query buffer
 * grows past server.client_max_querybuf_len. Otherwise interaction time and
 * network statistics are updated (plus the replication offset for master
 * clients) and processInputBuffer() is called. */
void readQueryFromClient(aeEventLoop *el, int fd, void *privdata, int mask) {
    client *c = (client*) privdata;
    int nread, readlen;
    size_t qblen;
    UNUSED(el);
    UNUSED(mask);

    readlen = PROTO_IOBUF_LEN;
    /* If this is a multi bulk request, and we are processing a bulk reply
     * that is large enough, try to maximize the probability that the query
     * buffer contains exactly the SDS string representing the object, even
     * at the risk of requiring more read(2) calls. This way the function
     * processMultiBulkBuffer() can avoid copying buffers to create the
     * Redis Object representing the argument. */
    if (c->reqtype == PROTO_REQ_MULTIBULK && c->multibulklen && c->bulklen != -1
        && c->bulklen >= PROTO_MBULK_BIG_ARG)
    {
        ssize_t remaining =
            (ssize_t)(c->bulklen+2)-(ssize_t)sdslen(c->querybuf);

        /* 'remaining' may be zero or negative when the buffer already holds
         * the whole argument but was not processed yet (for instance because
         * processing was suspended). In that case keep the default read
         * size: a zero length read(2) would return 0 and be mistaken for
         * the peer closing the connection. */
        if (remaining > 0 && remaining < readlen) readlen = remaining;
    }

    qblen = sdslen(c->querybuf);
    if (c->querybuf_peak < qblen) c->querybuf_peak = qblen;
    c->querybuf = sdsMakeRoomFor(c->querybuf, readlen);
    nread = read(fd, c->querybuf+qblen, readlen);
    if (nread == -1) {
        if (errno == EAGAIN) {
            return;
        } else {
            serverLog(LL_VERBOSE, "Reading from client: %s",strerror(errno));
            freeClient(c);
            return;
        }
    } else if (nread == 0) {
        serverLog(LL_VERBOSE, "Client closed connection");
        freeClient(c);
        return;
    }

    /* Make the bytes just read part of the sds string. */
    sdsIncrLen(c->querybuf,nread);
    c->lastinteraction = server.unixtime;
    if (c->flags & CLIENT_MASTER) c->reploff += nread;
    server.stat_net_input_bytes += nread;
    if (sdslen(c->querybuf) > server.client_max_querybuf_len) {
        sds ci = catClientInfoString(sdsempty(),c), bytes = sdsempty();

        /* Log the first 64 bytes of the buffer, escaped. */
        bytes = sdscatrepr(bytes,c->querybuf,64);
        serverLog(LL_WARNING,"Closing client that reached max query buffer length: %s (qbuf initial bytes: %s)", ci, bytes);
        sdsfree(ci);
        sdsfree(bytes);
        freeClient(c);
        return;
    }
    processInputBuffer(c);
}
1363
getClientsMaxBuffers(unsigned long * longest_output_list,unsigned long * biggest_input_buffer)1364 void getClientsMaxBuffers(unsigned long *longest_output_list,
1365 unsigned long *biggest_input_buffer) {
1366 client *c;
1367 listNode *ln;
1368 listIter li;
1369 unsigned long lol = 0, bib = 0;
1370
1371 listRewind(server.clients,&li);
1372 while ((ln = listNext(&li)) != NULL) {
1373 c = listNodeValue(ln);
1374
1375 if (listLength(c->reply) > lol) lol = listLength(c->reply);
1376 if (sdslen(c->querybuf) > bib) bib = sdslen(c->querybuf);
1377 }
1378 *longest_output_list = lol;
1379 *biggest_input_buffer = bib;
1380 }
1381
1382 /* A Redis "Peer ID" is a colon separated ip:port pair.
1383 * For IPv4 it's in the form x.y.z.k:port, example: "127.0.0.1:1234".
1384 * For IPv6 addresses we use [] around the IP part, like in "[::1]:1234".
1385 * For Unix sockets we use path:0, like in "/tmp/redis:0".
1386 *
1387 * A Peer ID always fits inside a buffer of NET_PEER_ID_LEN bytes, including
1388 * the null term.
1389 *
1390 * On failure the function still populates 'peerid' with the "?:0" string
1391 * in case you want to relax error checking or need to display something
1392 * anyway (see anetPeerToString implementation for more info). */
genClientPeerId(client * client,char * peerid,size_t peerid_len)1393 void genClientPeerId(client *client, char *peerid,
1394 size_t peerid_len) {
1395 if (client->flags & CLIENT_UNIX_SOCKET) {
1396 /* Unix socket client. */
1397 snprintf(peerid,peerid_len,"%s:0",server.unixsocket);
1398 } else {
1399 /* TCP client. */
1400 anetFormatPeer(client->fd,peerid,peerid_len);
1401 }
1402 }
1403
1404 /* This function returns the client peer id, by creating and caching it
1405 * if client->peerid is NULL, otherwise returning the cached value.
1406 * The Peer ID never changes during the life of the client, however it
1407 * is expensive to compute. */
getClientPeerId(client * c)1408 char *getClientPeerId(client *c) {
1409 char peerid[NET_PEER_ID_LEN];
1410
1411 if (c->peerid == NULL) {
1412 genClientPeerId(c,peerid,sizeof(peerid));
1413 c->peerid = sdsnew(peerid);
1414 }
1415 return c->peerid;
1416 }
1417
1418 /* Concatenate a string representing the state of a client in an human
1419 * readable format, into the sds string 's'. */
catClientInfoString(sds s,client * client)1420 sds catClientInfoString(sds s, client *client) {
1421 char flags[16], events[3], *p;
1422 int emask;
1423
1424 p = flags;
1425 if (client->flags & CLIENT_SLAVE) {
1426 if (client->flags & CLIENT_MONITOR)
1427 *p++ = 'O';
1428 else
1429 *p++ = 'S';
1430 }
1431 if (client->flags & CLIENT_MASTER) *p++ = 'M';
1432 if (client->flags & CLIENT_MULTI) *p++ = 'x';
1433 if (client->flags & CLIENT_BLOCKED) *p++ = 'b';
1434 if (client->flags & CLIENT_DIRTY_CAS) *p++ = 'd';
1435 if (client->flags & CLIENT_CLOSE_AFTER_REPLY) *p++ = 'c';
1436 if (client->flags & CLIENT_UNBLOCKED) *p++ = 'u';
1437 if (client->flags & CLIENT_CLOSE_ASAP) *p++ = 'A';
1438 if (client->flags & CLIENT_UNIX_SOCKET) *p++ = 'U';
1439 if (client->flags & CLIENT_READONLY) *p++ = 'r';
1440 if (p == flags) *p++ = 'N';
1441 *p++ = '\0';
1442
1443 emask = client->fd == -1 ? 0 : aeGetFileEvents(server.el,client->fd);
1444 p = events;
1445 if (emask & AE_READABLE) *p++ = 'r';
1446 if (emask & AE_WRITABLE) *p++ = 'w';
1447 *p = '\0';
1448 return sdscatfmt(s,
1449 "id=%U addr=%s fd=%i name=%s age=%I idle=%I flags=%s db=%i sub=%i psub=%i multi=%i qbuf=%U qbuf-free=%U obl=%U oll=%U omem=%U events=%s cmd=%s",
1450 (unsigned long long) client->id,
1451 getClientPeerId(client),
1452 client->fd,
1453 client->name ? (char*)client->name->ptr : "",
1454 (long long)(server.unixtime - client->ctime),
1455 (long long)(server.unixtime - client->lastinteraction),
1456 flags,
1457 client->db->id,
1458 (int) dictSize(client->pubsub_channels),
1459 (int) listLength(client->pubsub_patterns),
1460 (client->flags & CLIENT_MULTI) ? client->mstate.count : -1,
1461 (unsigned long long) sdslen(client->querybuf),
1462 (unsigned long long) sdsavail(client->querybuf),
1463 (unsigned long long) client->bufpos,
1464 (unsigned long long) listLength(client->reply),
1465 (unsigned long long) getClientOutputBufferMemoryUsage(client),
1466 events,
1467 client->lastcmd ? client->lastcmd->name : "NULL");
1468 }
1469
getAllClientsInfoString(void)1470 sds getAllClientsInfoString(void) {
1471 listNode *ln;
1472 listIter li;
1473 client *client;
1474 sds o = sdsnewlen(NULL,200*listLength(server.clients));
1475 sdsclear(o);
1476 listRewind(server.clients,&li);
1477 while ((ln = listNext(&li)) != NULL) {
1478 client = listNodeValue(ln);
1479 o = catClientInfoString(o,client);
1480 o = sdscatlen(o,"\n",1);
1481 }
1482 return o;
1483 }
1484
/* Implementation of the CLIENT command. The subcommand is taken from
 * c->argv[1]; the forms handled here are:
 *
 * CLIENT LIST
 * CLIENT REPLY ON|OFF|SKIP
 * CLIENT KILL <ip:port>
 * CLIENT KILL <option> <value> ... <option> <value>
 * CLIENT SETNAME <name>
 * CLIENT GETNAME
 * CLIENT PAUSE <milliseconds>
 *
 * Anything else is answered with a syntax error. */
void clientCommand(client *c) {
    char *subcmd = c->argv[1]->ptr;

    if (c->argc == 2 && !strcasecmp(subcmd,"list")) {
        /* CLIENT LIST */
        sds listing = getAllClientsInfoString();

        addReplyBulkCBuffer(c,listing,sdslen(listing));
        sdsfree(listing);
    } else if (c->argc == 3 && !strcasecmp(subcmd,"reply")) {
        /* CLIENT REPLY ON|OFF|SKIP */
        char *mode = c->argv[2]->ptr;

        if (!strcasecmp(mode,"on")) {
            c->flags &= ~(CLIENT_REPLY_SKIP|CLIENT_REPLY_OFF);
            addReply(c,shared.ok);
        } else if (!strcasecmp(mode,"off")) {
            c->flags |= CLIENT_REPLY_OFF;
        } else if (!strcasecmp(mode,"skip")) {
            /* SKIP has no effect when replies are off. */
            if (!(c->flags & CLIENT_REPLY_OFF))
                c->flags |= CLIENT_REPLY_SKIP_NEXT;
        } else {
            addReply(c,shared.syntaxerr);
        }
    } else if (!strcasecmp(subcmd,"kill")) {
        /* CLIENT KILL <ip:port>
         * CLIENT KILL <option> [value] ... <option> [value] */
        listIter li;
        listNode *ln;
        char *addr = NULL;          /* Filter by peer id, if not NULL. */
        int type = -1;              /* Filter by client type, if not -1. */
        uint64_t id = 0;            /* Filter by client id, if not 0. */
        int skipme = 1;             /* Never kill the calling client. */
        int killed = 0, close_this_client = 0;

        if (c->argc < 3) {
            addReply(c,shared.syntaxerr);
            return;
        }

        if (c->argc == 3) {
            /* Old style syntax: CLIENT KILL <addr>. With the old form, you
             * can kill yourself. */
            addr = c->argv[2]->ptr;
            skipme = 0;
        } else {
            int i;

            /* New style syntax: parse <option> <value> pairs. */
            for (i = 2; i < c->argc; i += 2) {
                char *opt = c->argv[i]->ptr;
                robj *val;

                /* Every option requires a value. */
                if (i+1 >= c->argc) {
                    addReply(c,shared.syntaxerr);
                    return;
                }
                val = c->argv[i+1];

                if (!strcasecmp(opt,"id")) {
                    long long tmp;

                    if (getLongLongFromObjectOrReply(c,val,&tmp,NULL)
                        != C_OK) return;
                    id = tmp;
                } else if (!strcasecmp(opt,"type")) {
                    type = getClientTypeByName(val->ptr);
                    if (type == -1) {
                        addReplyErrorFormat(c,"Unknown client type '%s'",
                            (char*) val->ptr);
                        return;
                    }
                } else if (!strcasecmp(opt,"addr")) {
                    addr = val->ptr;
                } else if (!strcasecmp(opt,"skipme")) {
                    if (!strcasecmp(val->ptr,"yes")) {
                        skipme = 1;
                    } else if (!strcasecmp(val->ptr,"no")) {
                        skipme = 0;
                    } else {
                        addReply(c,shared.syntaxerr);
                        return;
                    }
                } else {
                    addReply(c,shared.syntaxerr);
                    return;
                }
            }
        }

        /* Iterate clients killing all the matching clients. */
        listRewind(server.clients,&li);
        while ((ln = listNext(&li)) != NULL) {
            client *target = listNodeValue(ln);

            if (addr && strcmp(getClientPeerId(target),addr) != 0) continue;
            if (type != -1 && getClientType(target) != type) continue;
            if (id != 0 && target->id != id) continue;

            if (target == c) {
                if (skipme) continue;
                /* The caller is not freed here: it is only marked, so that
                 * the reply can still be queued below. */
                close_this_client = 1;
            } else {
                freeClient(target);
            }
            killed++;
        }

        /* Reply according to old/new format. */
        if (c->argc == 3) {
            if (killed == 0)
                addReplyError(c,"No such client");
            else
                addReply(c,shared.ok);
        } else {
            addReplyLongLong(c,killed);
        }

        /* If this client has to be closed, flag it as CLOSE_AFTER_REPLY
         * only after we queued the reply to its output buffers. */
        if (close_this_client) c->flags |= CLIENT_CLOSE_AFTER_REPLY;
    } else if (c->argc == 3 && !strcasecmp(subcmd,"setname")) {
        /* CLIENT SETNAME <name> */
        char *p = c->argv[2]->ptr;
        int len = sdslen(c->argv[2]->ptr);
        int j;

        /* Setting the client name to an empty string actually removes
         * the current name. */
        if (len == 0) {
            if (c->name) decrRefCount(c->name);
            c->name = NULL;
            addReply(c,shared.ok);
            return;
        }

        /* Otherwise check if the charset is ok. We need to do this otherwise
         * CLIENT LIST format will break. You should always be able to
         * split by space to get the different fields. */
        for (j = 0; j < len; j++) {
            if (p[j] < '!' || p[j] > '~') { /* ASCII is assumed. */
                addReplyError(c,
                    "Client names cannot contain spaces, "
                    "newlines or special characters.");
                return;
            }
        }

        /* The argument object itself becomes the name: take a reference. */
        if (c->name) decrRefCount(c->name);
        c->name = c->argv[2];
        incrRefCount(c->name);
        addReply(c,shared.ok);
    } else if (c->argc == 2 && !strcasecmp(subcmd,"getname")) {
        /* CLIENT GETNAME */
        if (c->name)
            addReplyBulk(c,c->name);
        else
            addReply(c,shared.nullbulk);
    } else if (c->argc == 3 && !strcasecmp(subcmd,"pause")) {
        /* CLIENT PAUSE <milliseconds> */
        long long duration;

        if (getTimeoutFromObjectOrReply(c,c->argv[2],&duration,UNIT_MILLISECONDS)
                                        != C_OK) return;
        pauseClients(duration);
        addReply(c,shared.ok);
    } else {
        addReplyError(c, "Syntax error, try CLIENT (LIST | KILL | GETNAME | SETNAME | PAUSE | REPLY)");
    }
}
1639
1640 /* Rewrite the command vector of the client. All the new objects ref count
1641 * is incremented. The old command vector is freed, and the old objects
1642 * ref count is decremented. */
rewriteClientCommandVector(client * c,int argc,...)1643 void rewriteClientCommandVector(client *c, int argc, ...) {
1644 va_list ap;
1645 int j;
1646 robj **argv; /* The new argument vector */
1647
1648 argv = zmalloc(sizeof(robj*)*argc);
1649 va_start(ap,argc);
1650 for (j = 0; j < argc; j++) {
1651 robj *a;
1652
1653 a = va_arg(ap, robj*);
1654 argv[j] = a;
1655 incrRefCount(a);
1656 }
1657 /* We free the objects in the original vector at the end, so we are
1658 * sure that if the same objects are reused in the new vector the
1659 * refcount gets incremented before it gets decremented. */
1660 for (j = 0; j < c->argc; j++) decrRefCount(c->argv[j]);
1661 zfree(c->argv);
1662 /* Replace argv and argc with our new versions. */
1663 c->argv = argv;
1664 c->argc = argc;
1665 c->cmd = lookupCommandOrOriginal(c->argv[0]->ptr);
1666 serverAssertWithInfo(c,NULL,c->cmd != NULL);
1667 va_end(ap);
1668 }
1669
1670 /* Completely replace the client command vector with the provided one. */
replaceClientCommandVector(client * c,int argc,robj ** argv)1671 void replaceClientCommandVector(client *c, int argc, robj **argv) {
1672 freeClientArgv(c);
1673 zfree(c->argv);
1674 c->argv = argv;
1675 c->argc = argc;
1676 c->cmd = lookupCommandOrOriginal(c->argv[0]->ptr);
1677 serverAssertWithInfo(c,NULL,c->cmd != NULL);
1678 }
1679
1680 /* Rewrite a single item in the command vector.
1681 * The new val ref count is incremented, and the old decremented.
1682 *
1683 * It is possible to specify an argument over the current size of the
1684 * argument vector: in this case the array of objects gets reallocated
1685 * and c->argc set to the max value. However it's up to the caller to
1686 *
1687 * 1. Make sure there are no "holes" and all the arguments are set.
1688 * 2. If the original argument vector was longer than the one we
1689 * want to end with, it's up to the caller to set c->argc and
1690 * free the no longer used objects on c->argv. */
rewriteClientCommandArgument(client * c,int i,robj * newval)1691 void rewriteClientCommandArgument(client *c, int i, robj *newval) {
1692 robj *oldval;
1693
1694 if (i >= c->argc) {
1695 c->argv = zrealloc(c->argv,sizeof(robj*)*(i+1));
1696 c->argc = i+1;
1697 c->argv[i] = NULL;
1698 }
1699 oldval = c->argv[i];
1700 c->argv[i] = newval;
1701 incrRefCount(newval);
1702 if (oldval) decrRefCount(oldval);
1703
1704 /* If this is the command name make sure to fix c->cmd. */
1705 if (i == 0) {
1706 c->cmd = lookupCommandOrOriginal(c->argv[0]->ptr);
1707 serverAssertWithInfo(c,NULL,c->cmd != NULL);
1708 }
1709 }
1710
1711 /* This function returns the number of bytes that Redis is virtually
1712 * using to store the reply still not read by the client.
1713 * It is "virtual" since the reply output list may contain objects that
1714 * are shared and are not really using additional memory.
1715 *
1716 * The function returns the total sum of the length of all the objects
1717 * stored in the output list, plus the memory used to allocate every
1718 * list node. The static reply buffer is not taken into account since it
1719 * is allocated anyway.
1720 *
1721 * Note: this function is very fast so can be called as many time as
1722 * the caller wishes. The main usage of this function currently is
1723 * enforcing the client output length limits. */
getClientOutputBufferMemoryUsage(client * c)1724 unsigned long getClientOutputBufferMemoryUsage(client *c) {
1725 unsigned long list_item_size = sizeof(listNode)+sizeof(robj);
1726
1727 return c->reply_bytes + (list_item_size*listLength(c->reply));
1728 }
1729
1730 /* Get the class of a client, used in order to enforce limits to different
1731 * classes of clients.
1732 *
1733 * The function will return one of the following:
1734 * CLIENT_TYPE_NORMAL -> Normal client
1735 * CLIENT_TYPE_SLAVE -> Slave or client executing MONITOR command
1736 * CLIENT_TYPE_PUBSUB -> Client subscribed to Pub/Sub channels
1737 * CLIENT_TYPE_MASTER -> The client representing our replication master.
1738 */
getClientType(client * c)1739 int getClientType(client *c) {
1740 if (c->flags & CLIENT_MASTER) return CLIENT_TYPE_MASTER;
1741 if ((c->flags & CLIENT_SLAVE) && !(c->flags & CLIENT_MONITOR))
1742 return CLIENT_TYPE_SLAVE;
1743 if (c->flags & CLIENT_PUBSUB) return CLIENT_TYPE_PUBSUB;
1744 return CLIENT_TYPE_NORMAL;
1745 }
1746
getClientTypeByName(char * name)1747 int getClientTypeByName(char *name) {
1748 if (!strcasecmp(name,"normal")) return CLIENT_TYPE_NORMAL;
1749 else if (!strcasecmp(name,"slave")) return CLIENT_TYPE_SLAVE;
1750 else if (!strcasecmp(name,"pubsub")) return CLIENT_TYPE_PUBSUB;
1751 else if (!strcasecmp(name,"master")) return CLIENT_TYPE_MASTER;
1752 else return -1;
1753 }
1754
getClientTypeName(int class)1755 char *getClientTypeName(int class) {
1756 switch(class) {
1757 case CLIENT_TYPE_NORMAL: return "normal";
1758 case CLIENT_TYPE_SLAVE: return "slave";
1759 case CLIENT_TYPE_PUBSUB: return "pubsub";
1760 case CLIENT_TYPE_MASTER: return "master";
1761 default: return NULL;
1762 }
1763 }
1764
1765 /* The function checks if the client reached output buffer soft or hard
1766 * limit, and also update the state needed to check the soft limit as
1767 * a side effect.
1768 *
1769 * Return value: non-zero if the client reached the soft or the hard limit.
1770 * Otherwise zero is returned. */
checkClientOutputBufferLimits(client * c)1771 int checkClientOutputBufferLimits(client *c) {
1772 int soft = 0, hard = 0, class;
1773 unsigned long used_mem = getClientOutputBufferMemoryUsage(c);
1774
1775 class = getClientType(c);
1776 /* For the purpose of output buffer limiting, masters are handled
1777 * like normal clients. */
1778 if (class == CLIENT_TYPE_MASTER) class = CLIENT_TYPE_NORMAL;
1779
1780 if (server.client_obuf_limits[class].hard_limit_bytes &&
1781 used_mem >= server.client_obuf_limits[class].hard_limit_bytes)
1782 hard = 1;
1783 if (server.client_obuf_limits[class].soft_limit_bytes &&
1784 used_mem >= server.client_obuf_limits[class].soft_limit_bytes)
1785 soft = 1;
1786
1787 /* We need to check if the soft limit is reached continuously for the
1788 * specified amount of seconds. */
1789 if (soft) {
1790 if (c->obuf_soft_limit_reached_time == 0) {
1791 c->obuf_soft_limit_reached_time = server.unixtime;
1792 soft = 0; /* First time we see the soft limit reached */
1793 } else {
1794 time_t elapsed = server.unixtime - c->obuf_soft_limit_reached_time;
1795
1796 if (elapsed <=
1797 server.client_obuf_limits[class].soft_limit_seconds) {
1798 soft = 0; /* The client still did not reached the max number of
1799 seconds for the soft limit to be considered
1800 reached. */
1801 }
1802 }
1803 } else {
1804 c->obuf_soft_limit_reached_time = 0;
1805 }
1806 return soft || hard;
1807 }
1808
1809 /* Asynchronously close a client if soft or hard limit is reached on the
1810 * output buffer size. The caller can check if the client will be closed
1811 * checking if the client CLIENT_CLOSE_ASAP flag is set.
1812 *
1813 * Note: we need to close the client asynchronously because this function is
1814 * called from contexts where the client can't be freed safely, i.e. from the
1815 * lower level functions pushing data inside the client output buffers. */
asyncCloseClientOnOutputBufferLimitReached(client * c)1816 void asyncCloseClientOnOutputBufferLimitReached(client *c) {
1817 serverAssert(c->reply_bytes < SIZE_MAX-(1024*64));
1818 if (c->reply_bytes == 0 || c->flags & CLIENT_CLOSE_ASAP) return;
1819 if (checkClientOutputBufferLimits(c)) {
1820 sds client = catClientInfoString(sdsempty(),c);
1821
1822 freeClientAsync(c);
1823 serverLog(LL_WARNING,"Client %s scheduled to be closed ASAP for overcoming of output buffer limits.", client);
1824 sdsfree(client);
1825 }
1826 }
1827
1828 /* Helper function used by freeMemoryIfNeeded() in order to flush slaves
1829 * output buffers without returning control to the event loop.
1830 * This is also called by SHUTDOWN for a best-effort attempt to send
1831 * slaves the latest writes. */
flushSlavesOutputBuffers(void)1832 void flushSlavesOutputBuffers(void) {
1833 listIter li;
1834 listNode *ln;
1835
1836 listRewind(server.slaves,&li);
1837 while((ln = listNext(&li))) {
1838 client *slave = listNodeValue(ln);
1839 int events;
1840
1841 /* Note that the following will not flush output buffers of slaves
1842 * in STATE_ONLINE but having put_online_on_ack set to true: in this
1843 * case the writable event is never installed, since the purpose
1844 * of put_online_on_ack is to postpone the moment it is installed.
1845 * This is what we want since slaves in this state should not receive
1846 * writes before the first ACK. */
1847 events = aeGetFileEvents(server.el,slave->fd);
1848 if (events & AE_WRITABLE &&
1849 slave->replstate == SLAVE_STATE_ONLINE &&
1850 clientHasPendingReplies(slave))
1851 {
1852 writeToClient(slave->fd,slave,0);
1853 }
1854 }
1855 }
1856
1857 /* Pause clients up to the specified unixtime (in ms). While clients
1858 * are paused no command is processed from clients, so the data set can't
1859 * change during that time.
1860 *
1861 * However while this function pauses normal and Pub/Sub clients, slaves are
1862 * still served, so this function can be used on server upgrades where it is
1863 * required that slaves process the latest bytes from the replication stream
1864 * before being turned to masters.
1865 *
1866 * This function is also internally used by Redis Cluster for the manual
1867 * failover procedure implemented by CLUSTER FAILOVER.
1868 *
1869 * The function always succeed, even if there is already a pause in progress.
1870 * In such a case, the pause is extended if the duration is more than the
1871 * time left for the previous duration. However if the duration is smaller
1872 * than the time left for the previous pause, no change is made to the
1873 * left duration. */
pauseClients(mstime_t end)1874 void pauseClients(mstime_t end) {
1875 if (!server.clients_paused || end > server.clients_pause_end_time)
1876 server.clients_pause_end_time = end;
1877 server.clients_paused = 1;
1878 }
1879
1880 /* Return non-zero if clients are currently paused. As a side effect the
1881 * function checks if the pause time was reached and clear it. */
clientsArePaused(void)1882 int clientsArePaused(void) {
1883 if (server.clients_paused &&
1884 server.clients_pause_end_time < server.mstime)
1885 {
1886 listNode *ln;
1887 listIter li;
1888 client *c;
1889
1890 server.clients_paused = 0;
1891
1892 /* Put all the clients in the unblocked clients queue in order to
1893 * force the re-processing of the input buffer if any. */
1894 listRewind(server.clients,&li);
1895 while ((ln = listNext(&li)) != NULL) {
1896 c = listNodeValue(ln);
1897
1898 /* Don't touch slaves and blocked clients. The latter pending
1899 * requests be processed when unblocked. */
1900 if (c->flags & (CLIENT_SLAVE|CLIENT_BLOCKED)) continue;
1901 c->flags |= CLIENT_UNBLOCKED;
1902 listAddNodeTail(server.unblocked_clients,c);
1903 }
1904 }
1905 return server.clients_paused;
1906 }
1907
1908 /* This function is called by Redis in order to process a few events from
1909 * time to time while blocked into some not interruptible operation.
1910 * This allows to reply to clients with the -LOADING error while loading the
1911 * data set at startup or after a full resynchronization with the master
1912 * and so forth.
1913 *
1914 * It calls the event loop in order to process a few events. Specifically we
1915 * try to call the event loop 4 times as long as we receive acknowledge that
1916 * some event was processed, in order to go forward with the accept, read,
1917 * write, close sequence needed to serve a client.
1918 *
1919 * The function returns the total number of events processed. */
processEventsWhileBlocked(void)1920 int processEventsWhileBlocked(void) {
1921 int iterations = 4; /* See the function top-comment. */
1922 int count = 0;
1923 while (iterations--) {
1924 int events = 0;
1925 events += aeProcessEvents(server.el, AE_FILE_EVENTS|AE_DONT_WAIT);
1926 events += handleClientsWithPendingWrites();
1927 if (!events) break;
1928 count += events;
1929 }
1930 return count;
1931 }
1932