xref: /redis-3.2.3/src/networking.c (revision 0a45fbc3)
1 /*
2  * Copyright (c) 2009-2012, Salvatore Sanfilippo <antirez at gmail dot com>
3  * All rights reserved.
4  *
5  * Redistribution and use in source and binary forms, with or without
6  * modification, are permitted provided that the following conditions are met:
7  *
8  *   * Redistributions of source code must retain the above copyright notice,
9  *     this list of conditions and the following disclaimer.
10  *   * Redistributions in binary form must reproduce the above copyright
11  *     notice, this list of conditions and the following disclaimer in the
12  *     documentation and/or other materials provided with the distribution.
13  *   * Neither the name of Redis nor the names of its contributors may be used
14  *     to endorse or promote products derived from this software without
15  *     specific prior written permission.
16  *
17  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
18  * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
19  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
20  * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
21  * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
22  * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
23  * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
24  * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
25  * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
26  * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27  * POSSIBILITY OF SUCH DAMAGE.
28  */
29 
30 #include "server.h"
31 #include <sys/uio.h>
32 #include <math.h>
33 
34 static void setProtocolError(client *c, int pos);
35 
36 /* Return the size consumed from the allocator, for the specified SDS string,
37  * including internal fragmentation. This function is used in order to compute
38  * the client output buffer size. */
sdsZmallocSize(sds s)39 size_t sdsZmallocSize(sds s) {
40     void *sh = sdsAllocPtr(s);
41     return zmalloc_size(sh);
42 }
43 
44 /* Return the amount of memory used by the sds string at object->ptr
45  * for a string object. */
getStringObjectSdsUsedMemory(robj * o)46 size_t getStringObjectSdsUsedMemory(robj *o) {
47     serverAssertWithInfo(NULL,o,o->type == OBJ_STRING);
48     switch(o->encoding) {
49     case OBJ_ENCODING_RAW: return sdsZmallocSize(o->ptr);
50     case OBJ_ENCODING_EMBSTR: return zmalloc_size(o)-sizeof(robj);
51     default: return 0; /* Just integer encoding for now. */
52     }
53 }
54 
dupClientReplyValue(void * o)55 void *dupClientReplyValue(void *o) {
56     incrRefCount((robj*)o);
57     return o;
58 }
59 
listMatchObjects(void * a,void * b)60 int listMatchObjects(void *a, void *b) {
61     return equalStringObjects(a,b);
62 }
63 
createClient(int fd)64 client *createClient(int fd) {
65     client *c = zmalloc(sizeof(client));
66 
67     /* passing -1 as fd it is possible to create a non connected client.
68      * This is useful since all the commands needs to be executed
69      * in the context of a client. When commands are executed in other
70      * contexts (for instance a Lua script) we need a non connected client. */
71     if (fd != -1) {
72         anetNonBlock(NULL,fd);
73         anetEnableTcpNoDelay(NULL,fd);
74         if (server.tcpkeepalive)
75             anetKeepAlive(NULL,fd,server.tcpkeepalive);
76         if (aeCreateFileEvent(server.el,fd,AE_READABLE,
77             readQueryFromClient, c) == AE_ERR)
78         {
79             close(fd);
80             zfree(c);
81             return NULL;
82         }
83     }
84 
85     selectDb(c,0);
86     c->id = server.next_client_id++;
87     c->fd = fd;
88     c->name = NULL;
89     c->bufpos = 0;
90     c->querybuf = sdsempty();
91     c->querybuf_peak = 0;
92     c->reqtype = 0;
93     c->argc = 0;
94     c->argv = NULL;
95     c->cmd = c->lastcmd = NULL;
96     c->multibulklen = 0;
97     c->bulklen = -1;
98     c->sentlen = 0;
99     c->flags = 0;
100     c->ctime = c->lastinteraction = server.unixtime;
101     c->authenticated = 0;
102     c->replstate = REPL_STATE_NONE;
103     c->repl_put_online_on_ack = 0;
104     c->reploff = 0;
105     c->repl_ack_off = 0;
106     c->repl_ack_time = 0;
107     c->slave_listening_port = 0;
108     c->slave_ip[0] = '\0';
109     c->slave_capa = SLAVE_CAPA_NONE;
110     c->reply = listCreate();
111     c->reply_bytes = 0;
112     c->obuf_soft_limit_reached_time = 0;
113     listSetFreeMethod(c->reply,decrRefCountVoid);
114     listSetDupMethod(c->reply,dupClientReplyValue);
115     c->btype = BLOCKED_NONE;
116     c->bpop.timeout = 0;
117     c->bpop.keys = dictCreate(&setDictType,NULL);
118     c->bpop.target = NULL;
119     c->bpop.numreplicas = 0;
120     c->bpop.reploffset = 0;
121     c->woff = 0;
122     c->watched_keys = listCreate();
123     c->pubsub_channels = dictCreate(&setDictType,NULL);
124     c->pubsub_patterns = listCreate();
125     c->peerid = NULL;
126     listSetFreeMethod(c->pubsub_patterns,decrRefCountVoid);
127     listSetMatchMethod(c->pubsub_patterns,listMatchObjects);
128     if (fd != -1) listAddNodeTail(server.clients,c);
129     initClientMultiState(c);
130     return c;
131 }
132 
133 /* This function is called every time we are going to transmit new data
134  * to the client. The behavior is the following:
135  *
136  * If the client should receive new data (normal clients will) the function
137  * returns C_OK, and make sure to install the write handler in our event
138  * loop so that when the socket is writable new data gets written.
139  *
140  * If the client should not receive new data, because it is a fake client
141  * (used to load AOF in memory), a master or because the setup of the write
142  * handler failed, the function returns C_ERR.
143  *
144  * The function may return C_OK without actually installing the write
145  * event handler in the following cases:
146  *
147  * 1) The event handler should already be installed since the output buffer
148  *    already contained something.
149  * 2) The client is a slave but not yet online, so we want to just accumulate
150  *    writes in the buffer but not actually sending them yet.
151  *
152  * Typically gets called every time a reply is built, before adding more
153  * data to the clients output buffers. If the function returns C_ERR no
154  * data should be appended to the output buffers. */
prepareClientToWrite(client * c)155 int prepareClientToWrite(client *c) {
156     /* If it's the Lua client we always return ok without installing any
157      * handler since there is no socket at all. */
158     if (c->flags & CLIENT_LUA) return C_OK;
159 
160     /* CLIENT REPLY OFF / SKIP handling: don't send replies. */
161     if (c->flags & (CLIENT_REPLY_OFF|CLIENT_REPLY_SKIP)) return C_ERR;
162 
163     /* Masters don't receive replies, unless CLIENT_MASTER_FORCE_REPLY flag
164      * is set. */
165     if ((c->flags & CLIENT_MASTER) &&
166         !(c->flags & CLIENT_MASTER_FORCE_REPLY)) return C_ERR;
167 
168     if (c->fd <= 0) return C_ERR; /* Fake client for AOF loading. */
169 
170     /* Schedule the client to write the output buffers to the socket only
171      * if not already done (there were no pending writes already and the client
172      * was yet not flagged), and, for slaves, if the slave can actually
173      * receive writes at this stage. */
174     if (!clientHasPendingReplies(c) &&
175         !(c->flags & CLIENT_PENDING_WRITE) &&
176         (c->replstate == REPL_STATE_NONE ||
177          (c->replstate == SLAVE_STATE_ONLINE && !c->repl_put_online_on_ack)))
178     {
179         /* Here instead of installing the write handler, we just flag the
180          * client and put it into a list of clients that have something
181          * to write to the socket. This way before re-entering the event
182          * loop, we can try to directly write to the client sockets avoiding
183          * a system call. We'll only really install the write handler if
184          * we'll not be able to write the whole reply at once. */
185         c->flags |= CLIENT_PENDING_WRITE;
186         listAddNodeHead(server.clients_pending_write,c);
187     }
188 
189     /* Authorize the caller to queue in the output buffer of this client. */
190     return C_OK;
191 }
192 
193 /* Create a duplicate of the last object in the reply list when
194  * it is not exclusively owned by the reply list. */
dupLastObjectIfNeeded(list * reply)195 robj *dupLastObjectIfNeeded(list *reply) {
196     robj *new, *cur;
197     listNode *ln;
198     serverAssert(listLength(reply) > 0);
199     ln = listLast(reply);
200     cur = listNodeValue(ln);
201     if (cur->refcount > 1) {
202         new = dupStringObject(cur);
203         decrRefCount(cur);
204         listNodeValue(ln) = new;
205     }
206     return listNodeValue(ln);
207 }
208 
209 /* -----------------------------------------------------------------------------
210  * Low level functions to add more data to output buffers.
211  * -------------------------------------------------------------------------- */
212 
/* Try to copy 'len' bytes from 's' into the static output buffer c->buf.
 *
 * Returns C_ERR, copying nothing, when the reply list is already in use
 * (data must stay in order, so the static buffer cannot be appended to once
 * the list has entries) or when the data does not fit in the remaining
 * space. Returns C_OK after a successful copy, and also when the client is
 * flagged CLIENT_CLOSE_AFTER_REPLY, in which case the data is discarded. */
int _addReplyToBuffer(client *c, const char *s, size_t len) {
    size_t room = sizeof(c->buf)-c->bufpos;

    if (c->flags & CLIENT_CLOSE_AFTER_REPLY) return C_OK;

    if (listLength(c->reply) > 0 || len > room) return C_ERR;

    memcpy(c->buf+c->bufpos,s,len);
    c->bufpos += len;
    return C_OK;
}
229 
/* Append the sds-encoded object 'o' to the reply list of 'c'.
 *
 * If the last node holds a RAW object with data, and both strings together
 * fit in PROTO_REPLY_CHUNK_BYTES, the bytes of 'o' are concatenated to it
 * (after making the tail private with dupLastObjectIfNeeded()). Otherwise
 * a new reference to 'o' is queued as a new node. c->reply_bytes is
 * updated in both cases, and the output buffer limits are checked.
 * Nothing is done for clients flagged CLIENT_CLOSE_AFTER_REPLY. */
void _addReplyObjectToList(client *c, robj *o) {
    robj *last;

    if (c->flags & CLIENT_CLOSE_AFTER_REPLY) return;

    last = listLength(c->reply) ? listNodeValue(listLast(c->reply)) : NULL;
    if (last != NULL &&
        last->ptr != NULL &&
        last->encoding == OBJ_ENCODING_RAW &&
        sdslen(last->ptr)+sdslen(o->ptr) <= PROTO_REPLY_CHUNK_BYTES)
    {
        /* Glue onto the tail chunk. */
        c->reply_bytes -= sdsZmallocSize(last->ptr);
        last = dupLastObjectIfNeeded(c->reply);
        last->ptr = sdscatlen(last->ptr,o->ptr,sdslen(o->ptr));
        c->reply_bytes += sdsZmallocSize(last->ptr);
    } else {
        /* Queue the object itself, by reference. */
        incrRefCount(o);
        listAddNodeTail(c->reply,o);
        c->reply_bytes += getStringObjectSdsUsedMemory(o);
    }
    asyncCloseClientOnOutputBufferLimitReached(c);
}
259 
260 /* This method takes responsibility over the sds. When it is no longer
261  * needed it will be free'd, otherwise it ends up in a robj. */
_addReplySdsToList(client * c,sds s)262 void _addReplySdsToList(client *c, sds s) {
263     robj *tail;
264 
265     if (c->flags & CLIENT_CLOSE_AFTER_REPLY) {
266         sdsfree(s);
267         return;
268     }
269 
270     if (listLength(c->reply) == 0) {
271         listAddNodeTail(c->reply,createObject(OBJ_STRING,s));
272         c->reply_bytes += sdsZmallocSize(s);
273     } else {
274         tail = listNodeValue(listLast(c->reply));
275 
276         /* Append to this object when possible. */
277         if (tail->ptr != NULL && tail->encoding == OBJ_ENCODING_RAW &&
278             sdslen(tail->ptr)+sdslen(s) <= PROTO_REPLY_CHUNK_BYTES)
279         {
280             c->reply_bytes -= sdsZmallocSize(tail->ptr);
281             tail = dupLastObjectIfNeeded(c->reply);
282             tail->ptr = sdscatlen(tail->ptr,s,sdslen(s));
283             c->reply_bytes += sdsZmallocSize(tail->ptr);
284             sdsfree(s);
285         } else {
286             listAddNodeTail(c->reply,createObject(OBJ_STRING,s));
287             c->reply_bytes += sdsZmallocSize(s);
288         }
289     }
290     asyncCloseClientOnOutputBufferLimitReached(c);
291 }
292 
/* Append a copy of the 'len' bytes at 's' to the reply list of 'c'.
 *
 * If the last node holds a RAW object with data, and the result fits in
 * PROTO_REPLY_CHUNK_BYTES, the bytes are concatenated to it. Otherwise a
 * new string object is created from them and queued. Nothing is done for
 * clients flagged CLIENT_CLOSE_AFTER_REPLY. */
void _addReplyStringToList(client *c, const char *s, size_t len) {
    robj *last;

    if (c->flags & CLIENT_CLOSE_AFTER_REPLY) return;

    last = listLength(c->reply) ? listNodeValue(listLast(c->reply)) : NULL;
    if (last != NULL &&
        last->ptr != NULL &&
        last->encoding == OBJ_ENCODING_RAW &&
        sdslen(last->ptr)+len <= PROTO_REPLY_CHUNK_BYTES)
    {
        /* Glue onto the tail chunk. */
        c->reply_bytes -= sdsZmallocSize(last->ptr);
        last = dupLastObjectIfNeeded(c->reply);
        last->ptr = sdscatlen(last->ptr,s,len);
        c->reply_bytes += sdsZmallocSize(last->ptr);
    } else {
        robj *chunk = createStringObject(s,len);

        listAddNodeTail(c->reply,chunk);
        c->reply_bytes += getStringObjectSdsUsedMemory(chunk);
    }
    asyncCloseClientOnOutputBufferLimitReached(c);
}
323 
324 /* -----------------------------------------------------------------------------
325  * Higher level functions to queue data on the client output buffer.
326  * The following functions are the ones that commands implementations will call.
327  * -------------------------------------------------------------------------- */
328 
/* Queue the string object 'obj' in the output buffers of 'c'.
 *
 * sds-encoded objects are copied into the static buffer when possible, or
 * else go to the reply list. Integer-encoded objects are written as their
 * decimal representation. Any other encoding is a fatal error. */
void addReply(client *c, robj *obj) {
    if (prepareClientToWrite(c) != C_OK) return;

    if (sdsEncodedObject(obj)) {
        /* This is an important place where we can avoid copy-on-write
         * when there is a saving child running: copying into the static
         * buffer does not touch the refcount of the object, so its memory
         * page is left alone. Only if that fails is the object referenced
         * from the reply list. */
        if (_addReplyToBuffer(c,obj->ptr,sdslen(obj->ptr)) != C_OK)
            _addReplyObjectToList(c,obj);
        return;
    }

    if (obj->encoding != OBJ_ENCODING_INT) {
        serverPanic("Wrong obj->encoding in addReply()");
        return;
    }

    /* Fast path: with at least 32 free bytes in the static buffer (more
     * than a 64 bit integer needs as a string), format the number directly
     * instead of creating a decoded object. */
    if (listLength(c->reply) == 0 && (sizeof(c->buf) - c->bufpos) >= 32) {
        char digits[32];
        int ndigits = ll2string(digits,sizeof(digits),(long)obj->ptr);

        if (_addReplyToBuffer(c,digits,ndigits) == C_OK) return;
        /* Not expected, since room was just checked: fall through. */
    }

    obj = getDecodedObject(obj);
    if (_addReplyToBuffer(c,obj->ptr,sdslen(obj->ptr)) != C_OK)
        _addReplyObjectToList(c,obj);
    decrRefCount(obj);
}
364 
/* Queue the sds string 's' as raw reply data. Takes ownership of 's' in
 * every case: it is freed here, or handed to _addReplySdsToList(), which
 * frees it or keeps it in a reply object. */
void addReplySds(client *c, sds s) {
    if (prepareClientToWrite(c) == C_OK &&
        _addReplyToBuffer(c,s,sdslen(s)) != C_OK)
    {
        /* Ownership moves to the reply list code. */
        _addReplySdsToList(c,s);
        return;
    }
    /* Either replies are not allowed, or the bytes were copied into the
     * static buffer: in both cases 's' is no longer needed. */
    sdsfree(s);
}
378 
/* Queue a copy of the 'len' bytes at 's' as raw reply data: the static
 * buffer is tried first, then the reply list. */
void addReplyString(client *c, const char *s, size_t len) {
    if (prepareClientToWrite(c) == C_OK &&
        _addReplyToBuffer(c,s,len) != C_OK)
    {
        _addReplyStringToList(c,s,len);
    }
}
384 
addReplyErrorLength(client * c,const char * s,size_t len)385 void addReplyErrorLength(client *c, const char *s, size_t len) {
386     addReplyString(c,"-ERR ",5);
387     addReplyString(c,s,len);
388     addReplyString(c,"\r\n",2);
389 }
390 
/* Emit an error reply with the nul terminated message 'err'. */
void addReplyError(client *c, const char *err) {
    size_t errlen = strlen(err);

    addReplyErrorLength(c,err,errlen);
}
394 
/* Emit an error reply whose message is built printf-style from 'fmt'.
 * Every CR or LF in the formatted message is replaced by a space, since
 * either would emit invalid protocol. */
void addReplyErrorFormat(client *c, const char *fmt, ...) {
    va_list ap;
    sds msg;
    char *p, *end;

    va_start(ap,fmt);
    msg = sdscatvprintf(sdsempty(),fmt,ap);
    va_end(ap);

    end = msg + sdslen(msg);
    for (p = msg; p != end; p++) {
        if (*p == '\r' || *p == '\n') *p = ' ';
    }

    addReplyErrorLength(c,msg,sdslen(msg));
    sdsfree(msg);
}
410 
/* Emit a status reply: "+" followed by the 'len' bytes at 's' and "\r\n".
 * 's' is copied verbatim. */
void addReplyStatusLength(client *c, const char *s, size_t len) {
    static const char prefix[] = "+";
    static const char suffix[] = "\r\n";

    addReplyString(c,prefix,sizeof(prefix)-1);
    addReplyString(c,s,len);
    addReplyString(c,suffix,sizeof(suffix)-1);
}
416 
/* Emit a status reply with the nul terminated text 'status'. */
void addReplyStatus(client *c, const char *status) {
    size_t statuslen = strlen(status);

    addReplyStatusLength(c,status,statuslen);
}
420 
/* Emit a status reply whose text is built printf-style from 'fmt'.
 *
 * A status reply is a single "+<text>\r\n" line. A CR or LF inside the
 * formatted text would end it early and make the client parse the rest as
 * a separate, bogus reply. Every such character is therefore replaced by a
 * space, exactly as addReplyErrorFormat() does for error replies. */
void addReplyStatusFormat(client *c, const char *fmt, ...) {
    size_t l, j;
    va_list ap;
    va_start(ap,fmt);
    sds s = sdscatvprintf(sdsempty(),fmt,ap);
    va_end(ap);
    /* Make sure there are no newlines in the string, otherwise invalid
     * protocol is emitted. */
    l = sdslen(s);
    for (j = 0; j < l; j++) {
        if (s[j] == '\r' || s[j] == '\n') s[j] = ' ';
    }
    addReplyStatusLength(c,s,sdslen(s));
    sdsfree(s);
}
429 
430 /* Adds an empty object to the reply list that will contain the multi bulk
431  * length, which is not known when this function is called. */
addDeferredMultiBulkLength(client * c)432 void *addDeferredMultiBulkLength(client *c) {
433     /* Note that we install the write event here even if the object is not
434      * ready to be sent, since we are sure that before returning to the
435      * event loop setDeferredMultiBulkLength() will be called. */
436     if (prepareClientToWrite(c) != C_OK) return NULL;
437     listAddNodeTail(c->reply,createObject(OBJ_STRING,NULL));
438     return listLast(c->reply);
439 }
440 
441 /* Populate the length object and try gluing it to the next chunk. */
setDeferredMultiBulkLength(client * c,void * node,long length)442 void setDeferredMultiBulkLength(client *c, void *node, long length) {
443     listNode *ln = (listNode*)node;
444     robj *len, *next;
445 
446     /* Abort when *node is NULL (see addDeferredMultiBulkLength). */
447     if (node == NULL) return;
448 
449     len = listNodeValue(ln);
450     len->ptr = sdscatprintf(sdsempty(),"*%ld\r\n",length);
451     len->encoding = OBJ_ENCODING_RAW; /* in case it was an EMBSTR. */
452     c->reply_bytes += sdsZmallocSize(len->ptr);
453     if (ln->next != NULL) {
454         next = listNodeValue(ln->next);
455 
456         /* Only glue when the next node is non-NULL (an sds in this case) */
457         if (next->ptr != NULL) {
458             c->reply_bytes -= sdsZmallocSize(len->ptr);
459             c->reply_bytes -= getStringObjectSdsUsedMemory(next);
460             len->ptr = sdscatlen(len->ptr,next->ptr,sdslen(next->ptr));
461             c->reply_bytes += sdsZmallocSize(len->ptr);
462             listDelNode(c->reply,ln->next);
463         }
464     }
465     asyncCloseClientOnOutputBufferLimitReached(c);
466 }
467 
468 /* Add a double as a bulk reply */
addReplyDouble(client * c,double d)469 void addReplyDouble(client *c, double d) {
470     char dbuf[128], sbuf[128];
471     int dlen, slen;
472     if (isinf(d)) {
473         /* Libc in odd systems (Hi Solaris!) will format infinite in a
474          * different way, so better to handle it in an explicit way. */
475         addReplyBulkCString(c, d > 0 ? "inf" : "-inf");
476     } else {
477         dlen = snprintf(dbuf,sizeof(dbuf),"%.17g",d);
478         slen = snprintf(sbuf,sizeof(sbuf),"$%d\r\n%s\r\n",dlen,dbuf);
479         addReplyString(c,sbuf,slen);
480     }
481 }
482 
483 /* Add a long double as a bulk reply, but uses a human readable formatting
484  * of the double instead of exposing the crude behavior of doubles to the
485  * dear user. */
addReplyHumanLongDouble(client * c,long double d)486 void addReplyHumanLongDouble(client *c, long double d) {
487     robj *o = createStringObjectFromLongDouble(d,1);
488     addReplyBulk(c,o);
489     decrRefCount(o);
490 }
491 
492 /* Add a long long as integer reply or bulk len / multi bulk count.
493  * Basically this is used to output <prefix><long long><crlf>. */
addReplyLongLongWithPrefix(client * c,long long ll,char prefix)494 void addReplyLongLongWithPrefix(client *c, long long ll, char prefix) {
495     char buf[128];
496     int len;
497 
498     /* Things like $3\r\n or *2\r\n are emitted very often by the protocol
499      * so we have a few shared objects to use if the integer is small
500      * like it is most of the times. */
501     if (prefix == '*' && ll < OBJ_SHARED_BULKHDR_LEN && ll >= 0) {
502         addReply(c,shared.mbulkhdr[ll]);
503         return;
504     } else if (prefix == '$' && ll < OBJ_SHARED_BULKHDR_LEN && ll >= 0) {
505         addReply(c,shared.bulkhdr[ll]);
506         return;
507     }
508 
509     buf[0] = prefix;
510     len = ll2string(buf+1,sizeof(buf)-1,ll);
511     buf[len+1] = '\r';
512     buf[len+2] = '\n';
513     addReplyString(c,buf,len+3);
514 }
515 
/* Emit the integer reply ":<ll>\r\n", using the shared objects for the
 * very common values 0 and 1. */
void addReplyLongLong(client *c, long long ll) {
    switch (ll) {
    case 0:
        addReply(c,shared.czero);
        break;
    case 1:
        addReply(c,shared.cone);
        break;
    default:
        addReplyLongLongWithPrefix(c,ll,':');
        break;
    }
}
524 
/* Emit the multi bulk header "*<length>\r\n".
 *
 * addReplyLongLongWithPrefix() already uses the shared header objects when
 * 0 <= length < OBJ_SHARED_BULKHDR_LEN, and formats any other value by
 * hand. Going through it keeps both bounds of the shared.mbulkhdr[] lookup
 * checked. Indexing that array here with only the upper bound tested would
 * read out of bounds for a negative length. */
void addReplyMultiBulkLen(client *c, long length) {
    addReplyLongLongWithPrefix(c,length,'*');
}
531 
532 /* Create the length prefix of a bulk reply, example: $2234 */
addReplyBulkLen(client * c,robj * obj)533 void addReplyBulkLen(client *c, robj *obj) {
534     size_t len;
535 
536     if (sdsEncodedObject(obj)) {
537         len = sdslen(obj->ptr);
538     } else {
539         long n = (long)obj->ptr;
540 
541         /* Compute how many bytes will take this integer as a radix 10 string */
542         len = 1;
543         if (n < 0) {
544             len++;
545             n = -n;
546         }
547         while((n = n/10) != 0) {
548             len++;
549         }
550     }
551 
552     if (len < OBJ_SHARED_BULKHDR_LEN)
553         addReply(c,shared.bulkhdr[len]);
554     else
555         addReplyLongLongWithPrefix(c,len,'$');
556 }
557 
558 /* Add a Redis Object as a bulk reply */
addReplyBulk(client * c,robj * obj)559 void addReplyBulk(client *c, robj *obj) {
560     addReplyBulkLen(c,obj);
561     addReply(c,obj);
562     addReply(c,shared.crlf);
563 }
564 
565 /* Add a C buffer as bulk reply */
addReplyBulkCBuffer(client * c,const void * p,size_t len)566 void addReplyBulkCBuffer(client *c, const void *p, size_t len) {
567     addReplyLongLongWithPrefix(c,len,'$');
568     addReplyString(c,p,len);
569     addReply(c,shared.crlf);
570 }
571 
572 /* Add sds to reply (takes ownership of sds and frees it) */
addReplyBulkSds(client * c,sds s)573 void addReplyBulkSds(client *c, sds s)  {
574     addReplySds(c,sdscatfmt(sdsempty(),"$%u\r\n",
575         (unsigned long)sdslen(s)));
576     addReplySds(c,s);
577     addReply(c,shared.crlf);
578 }
579 
580 /* Add a C nul term string as bulk reply */
addReplyBulkCString(client * c,const char * s)581 void addReplyBulkCString(client *c, const char *s) {
582     if (s == NULL) {
583         addReply(c,shared.nullbulk);
584     } else {
585         addReplyBulkCBuffer(c,s,strlen(s));
586     }
587 }
588 
589 /* Add a long long as a bulk reply */
addReplyBulkLongLong(client * c,long long ll)590 void addReplyBulkLongLong(client *c, long long ll) {
591     char buf[64];
592     int len;
593 
594     len = ll2string(buf,64,ll);
595     addReplyBulkCBuffer(c,buf,len);
596 }
597 
598 /* Copy 'src' client output buffers into 'dst' client output buffers.
599  * The function takes care of freeing the old output buffers of the
600  * destination client. */
copyClientOutputBuffer(client * dst,client * src)601 void copyClientOutputBuffer(client *dst, client *src) {
602     listRelease(dst->reply);
603     dst->reply = listDup(src->reply);
604     memcpy(dst->buf,src->buf,src->bufpos);
605     dst->bufpos = src->bufpos;
606     dst->reply_bytes = src->reply_bytes;
607 }
608 
609 /* Return true if the specified client has pending reply buffers to write to
610  * the socket. */
clientHasPendingReplies(client * c)611 int clientHasPendingReplies(client *c) {
612     return c->bufpos || listLength(c->reply);
613 }
614 
615 #define MAX_ACCEPTS_PER_CALL 1000
acceptCommonHandler(int fd,int flags,char * ip)616 static void acceptCommonHandler(int fd, int flags, char *ip) {
617     client *c;
618     if ((c = createClient(fd)) == NULL) {
619         serverLog(LL_WARNING,
620             "Error registering fd event for the new client: %s (fd=%d)",
621             strerror(errno),fd);
622         close(fd); /* May be already closed, just ignore errors */
623         return;
624     }
625     /* If maxclient directive is set and this is one client more... close the
626      * connection. Note that we create the client instead to check before
627      * for this condition, since now the socket is already set in non-blocking
628      * mode and we can send an error for free using the Kernel I/O */
629     if (listLength(server.clients) > server.maxclients) {
630         char *err = "-ERR max number of clients reached\r\n";
631 
632         /* That's a best effort error message, don't check write errors */
633         if (write(c->fd,err,strlen(err)) == -1) {
634             /* Nothing to do, Just to avoid the warning... */
635         }
636         server.stat_rejected_conn++;
637         freeClient(c);
638         return;
639     }
640 
641     /* If the server is running in protected mode (the default) and there
642      * is no password set, nor a specific interface is bound, we don't accept
643      * requests from non loopback interfaces. Instead we try to explain the
644      * user what to do to fix it if needed. */
645     if (server.protected_mode &&
646         server.bindaddr_count == 0 &&
647         server.requirepass == NULL &&
648         !(flags & CLIENT_UNIX_SOCKET) &&
649         ip != NULL)
650     {
651         if (strcmp(ip,"127.0.0.1") && strcmp(ip,"::1")) {
652             char *err =
653                 "-DENIED Redis is running in protected mode because protected "
654                 "mode is enabled, no bind address was specified, no "
655                 "authentication password is requested to clients. In this mode "
656                 "connections are only accepted from the loopback interface. "
657                 "If you want to connect from external computers to Redis you "
658                 "may adopt one of the following solutions: "
659                 "1) Just disable protected mode sending the command "
660                 "'CONFIG SET protected-mode no' from the loopback interface "
661                 "by connecting to Redis from the same host the server is "
662                 "running, however MAKE SURE Redis is not publicly accessible "
663                 "from internet if you do so. Use CONFIG REWRITE to make this "
664                 "change permanent. "
665                 "2) Alternatively you can just disable the protected mode by "
666                 "editing the Redis configuration file, and setting the protected "
667                 "mode option to 'no', and then restarting the server. "
668                 "3) If you started the server manually just for testing, restart "
669                 "it with the '--protected-mode no' option. "
670                 "4) Setup a bind address or an authentication password. "
671                 "NOTE: You only need to do one of the above things in order for "
672                 "the server to start accepting connections from the outside.\r\n";
673             if (write(c->fd,err,strlen(err)) == -1) {
674                 /* Nothing to do, Just to avoid the warning... */
675             }
676             server.stat_rejected_conn++;
677             freeClient(c);
678             return;
679         }
680     }
681 
682     server.stat_numconnections++;
683     c->flags |= flags;
684 }
685 
/* Event loop handler for the TCP listening socket 'fd'. It accepts up to
 * MAX_ACCEPTS_PER_CALL pending connections and hands each of them to
 * acceptCommonHandler() together with the peer IP.
 *
 * It returns as soon as accepting fails. Failures other than EWOULDBLOCK
 * (no more pending connections) are logged. */
void acceptTcpHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
    char cip[NET_IP_STR_LEN];
    int cport, cfd, budget;
    UNUSED(el);
    UNUSED(mask);
    UNUSED(privdata);

    for (budget = MAX_ACCEPTS_PER_CALL; budget > 0; budget--) {
        cfd = anetTcpAccept(server.neterr, fd, cip, sizeof(cip), &cport);
        if (cfd == ANET_ERR) {
            if (errno != EWOULDBLOCK)
                serverLog(LL_WARNING,
                    "Accepting client connection: %s", server.neterr);
            return;
        }
        serverLog(LL_VERBOSE,"Accepted %s:%d", cip, cport);
        acceptCommonHandler(cfd,0,cip);
    }
}
705 
/* Event loop handler for the unix domain listening socket 'fd'. It accepts
 * up to MAX_ACCEPTS_PER_CALL pending connections and hands each of them to
 * acceptCommonHandler() flagged CLIENT_UNIX_SOCKET, with no peer IP.
 *
 * It returns as soon as accepting fails. Failures other than EWOULDBLOCK
 * are logged. */
void acceptUnixHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
    int cfd, budget;
    UNUSED(el);
    UNUSED(mask);
    UNUSED(privdata);

    for (budget = MAX_ACCEPTS_PER_CALL; budget > 0; budget--) {
        cfd = anetUnixAccept(server.neterr, fd);
        if (cfd == ANET_ERR) {
            if (errno != EWOULDBLOCK)
                serverLog(LL_WARNING,
                    "Accepting client connection: %s", server.neterr);
            return;
        }
        serverLog(LL_VERBOSE,"Accepted connection to %s", server.unixsocket);
        acceptCommonHandler(cfd,CLIENT_UNIX_SOCKET,NULL);
    }
}
724 
/* Drop the references held on the current command arguments, then reset
 * argc to 0 and cmd to NULL. The argv array itself is not freed here. */
static void freeClientArgv(client *c) {
    robj **args = c->argv;
    int i;

    for (i = 0; i < c->argc; i++) decrRefCount(args[i]);
    c->argc = 0;
    c->cmd = NULL;
}
732 
733 /* Close all the slaves connections. This is useful in chained replication
734  * when we resync with our own master and want to force all our slaves to
735  * resync with us as well. */
disconnectSlaves(void)736 void disconnectSlaves(void) {
737     while (listLength(server.slaves)) {
738         listNode *ln = listFirst(server.slaves);
739         freeClient((client*)ln->value);
740     }
741 }
742 
743 /* Remove the specified client from global lists where the client could
744  * be referenced, not including the Pub/Sub channels.
745  * This is used by freeClient() and replicationCacheMaster(). */
unlinkClient(client * c)746 void unlinkClient(client *c) {
747     listNode *ln;
748 
749     /* If this is marked as current client unset it. */
750     if (server.current_client == c) server.current_client = NULL;
751 
752     /* Certain operations must be done only if the client has an active socket.
753      * If the client was already unlinked or if it's a "fake client" the
754      * fd is already set to -1. */
755     if (c->fd != -1) {
756         /* Remove from the list of active clients. */
757         ln = listSearchKey(server.clients,c);
758         serverAssert(ln != NULL);
759         listDelNode(server.clients,ln);
760 
761         /* Unregister async I/O handlers and close the socket. */
762         aeDeleteFileEvent(server.el,c->fd,AE_READABLE);
763         aeDeleteFileEvent(server.el,c->fd,AE_WRITABLE);
764         close(c->fd);
765         c->fd = -1;
766     }
767 
768     /* Remove from the list of pending writes if needed. */
769     if (c->flags & CLIENT_PENDING_WRITE) {
770         ln = listSearchKey(server.clients_pending_write,c);
771         serverAssert(ln != NULL);
772         listDelNode(server.clients_pending_write,ln);
773         c->flags &= ~CLIENT_PENDING_WRITE;
774     }
775 
776     /* When client was just unblocked because of a blocking operation,
777      * remove it from the list of unblocked clients. */
778     if (c->flags & CLIENT_UNBLOCKED) {
779         ln = listSearchKey(server.unblocked_clients,c);
780         serverAssert(ln != NULL);
781         listDelNode(server.unblocked_clients,ln);
782         c->flags &= ~CLIENT_UNBLOCKED;
783     }
784 }
785 
/* Release a client: tear down its blocking/watch/pubsub state, free its
 * buffers, unlink it from the global lists (closing the socket) and free
 * the structure itself.
 *
 * Exception: when 'c' is our master and its flags show no pending close
 * and no blocked/unblocked state, it is NOT freed. Its state is cached via
 * replicationCacheMaster() so a partial resynchronization can be attempted
 * later. */
void freeClient(client *c) {
    listNode *ln;

    /* If it is our master that's being disconnected we should make sure
     * to cache the state to try a partial resynchronization later.
     *
     * Note that before doing this we make sure that the client is not in
     * some unexpected state, by checking its flags. */
    if (server.master && c->flags & CLIENT_MASTER) {
        serverLog(LL_WARNING,"Connection with master lost.");
        if (!(c->flags & (CLIENT_CLOSE_AFTER_REPLY|
                          CLIENT_CLOSE_ASAP|
                          CLIENT_BLOCKED|
                          CLIENT_UNBLOCKED)))
        {
            replicationCacheMaster(c);
            return;
        }
    }

    /* Log link disconnection with slave */
    if ((c->flags & CLIENT_SLAVE) && !(c->flags & CLIENT_MONITOR)) {
        serverLog(LL_WARNING,"Connection with slave %s lost.",
            replicationGetSlaveName(c));
    }

    /* Free the query buffer */
    sdsfree(c->querybuf);
    c->querybuf = NULL;

    /* Deallocate structures used to block on blocking ops. */
    if (c->flags & CLIENT_BLOCKED) unblockClient(c);
    dictRelease(c->bpop.keys);

    /* UNWATCH all the keys */
    unwatchAllKeys(c);
    listRelease(c->watched_keys);

    /* Unsubscribe from all the pubsub channels */
    pubsubUnsubscribeAllChannels(c,0);
    pubsubUnsubscribeAllPatterns(c,0);
    dictRelease(c->pubsub_channels);
    listRelease(c->pubsub_patterns);

    /* Free data structures. */
    listRelease(c->reply);
    freeClientArgv(c);

    /* Unlink the client: this will close the socket, remove the I/O
     * handlers, and remove references of the client from different
     * places where active clients may be referenced. */
    unlinkClient(c);

    /* Master/slave cleanup Case 1:
     * we lost the connection with a slave. MONITOR clients carry the
     * CLIENT_SLAVE flag too, but live in server.monitors. */
    if (c->flags & CLIENT_SLAVE) {
        int is_monitor = (c->flags & CLIENT_MONITOR) != 0;

        if (c->replstate == SLAVE_STATE_SEND_BULK) {
            if (c->repldbfd != -1) close(c->repldbfd);
            if (c->replpreamble) sdsfree(c->replpreamble);
        }
        list *l = is_monitor ? server.monitors : server.slaves;
        ln = listSearchKey(l,c);
        serverAssert(ln != NULL);
        listDelNode(l,ln);
        /* We need to remember the time when we started to have zero
         * attached slaves, as after some time we'll free the replication
         * backlog. Only a real slave leaving changes the slave count: a
         * MONITOR client disconnecting must not move this timestamp
         * forward, or the backlog would be kept around longer than
         * configured. */
        if (!is_monitor && listLength(server.slaves) == 0)
            server.repl_no_slaves_since = server.unixtime;
        refreshGoodSlavesCount();
    }

    /* Master/slave cleanup Case 2:
     * we lost the connection with the master. */
    if (c->flags & CLIENT_MASTER) replicationHandleMasterDisconnection();

    /* If this client was scheduled for async freeing we need to remove it
     * from the queue. */
    if (c->flags & CLIENT_CLOSE_ASAP) {
        ln = listSearchKey(server.clients_to_close,c);
        serverAssert(ln != NULL);
        listDelNode(server.clients_to_close,ln);
    }

    /* Release other dynamically allocated client structure fields,
     * and finally release the client structure itself. */
    if (c->name) decrRefCount(c->name);
    zfree(c->argv);
    freeClientMultiState(c);
    sdsfree(c->peerid);
    zfree(c);
}
878 
879 /* Schedule a client to free it at a safe time in the serverCron() function.
880  * This function is useful when we need to terminate a client but we are in
881  * a context where calling freeClient() is not possible, because the client
882  * should be valid for the continuation of the flow of the program. */
freeClientAsync(client * c)883 void freeClientAsync(client *c) {
884     if (c->flags & CLIENT_CLOSE_ASAP || c->flags & CLIENT_LUA) return;
885     c->flags |= CLIENT_CLOSE_ASAP;
886     listAddNodeTail(server.clients_to_close,c);
887 }
888 
freeClientsInAsyncFreeQueue(void)889 void freeClientsInAsyncFreeQueue(void) {
890     while (listLength(server.clients_to_close)) {
891         listNode *ln = listFirst(server.clients_to_close);
892         client *c = listNodeValue(ln);
893 
894         c->flags &= ~CLIENT_CLOSE_ASAP;
895         freeClient(c);
896         listDelNode(server.clients_to_close,ln);
897     }
898 }
899 
900 /* Write data in output buffers to client. Return C_OK if the client
901  * is still valid after the call, C_ERR if it was freed. */
writeToClient(int fd,client * c,int handler_installed)902 int writeToClient(int fd, client *c, int handler_installed) {
903     ssize_t nwritten = 0, totwritten = 0;
904     size_t objlen;
905     size_t objmem;
906     robj *o;
907 
908     while(clientHasPendingReplies(c)) {
909         if (c->bufpos > 0) {
910             nwritten = write(fd,c->buf+c->sentlen,c->bufpos-c->sentlen);
911             if (nwritten <= 0) break;
912             c->sentlen += nwritten;
913             totwritten += nwritten;
914 
915             /* If the buffer was sent, set bufpos to zero to continue with
916              * the remainder of the reply. */
917             if ((int)c->sentlen == c->bufpos) {
918                 c->bufpos = 0;
919                 c->sentlen = 0;
920             }
921         } else {
922             o = listNodeValue(listFirst(c->reply));
923             objlen = sdslen(o->ptr);
924             objmem = getStringObjectSdsUsedMemory(o);
925 
926             if (objlen == 0) {
927                 listDelNode(c->reply,listFirst(c->reply));
928                 c->reply_bytes -= objmem;
929                 continue;
930             }
931 
932             nwritten = write(fd, ((char*)o->ptr)+c->sentlen,objlen-c->sentlen);
933             if (nwritten <= 0) break;
934             c->sentlen += nwritten;
935             totwritten += nwritten;
936 
937             /* If we fully sent the object on head go to the next one */
938             if (c->sentlen == objlen) {
939                 listDelNode(c->reply,listFirst(c->reply));
940                 c->sentlen = 0;
941                 c->reply_bytes -= objmem;
942             }
943         }
944         /* Note that we avoid to send more than NET_MAX_WRITES_PER_EVENT
945          * bytes, in a single threaded server it's a good idea to serve
946          * other clients as well, even if a very large request comes from
947          * super fast link that is always able to accept data (in real world
948          * scenario think about 'KEYS *' against the loopback interface).
949          *
950          * However if we are over the maxmemory limit we ignore that and
951          * just deliver as much data as it is possible to deliver. */
952         server.stat_net_output_bytes += totwritten;
953         if (totwritten > NET_MAX_WRITES_PER_EVENT &&
954             (server.maxmemory == 0 ||
955              zmalloc_used_memory() < server.maxmemory)) break;
956     }
957     if (nwritten == -1) {
958         if (errno == EAGAIN) {
959             nwritten = 0;
960         } else {
961             serverLog(LL_VERBOSE,
962                 "Error writing to client: %s", strerror(errno));
963             freeClient(c);
964             return C_ERR;
965         }
966     }
967     if (totwritten > 0) {
968         /* For clients representing masters we don't count sending data
969          * as an interaction, since we always send REPLCONF ACK commands
970          * that take some time to just fill the socket output buffer.
971          * We just rely on data / pings received for timeout detection. */
972         if (!(c->flags & CLIENT_MASTER)) c->lastinteraction = server.unixtime;
973     }
974     if (!clientHasPendingReplies(c)) {
975         c->sentlen = 0;
976         if (handler_installed) aeDeleteFileEvent(server.el,c->fd,AE_WRITABLE);
977 
978         /* Close connection after entire reply has been sent. */
979         if (c->flags & CLIENT_CLOSE_AFTER_REPLY) {
980             freeClient(c);
981             return C_ERR;
982         }
983     }
984     return C_OK;
985 }
986 
987 /* Write event handler. Just send data to the client. */
sendReplyToClient(aeEventLoop * el,int fd,void * privdata,int mask)988 void sendReplyToClient(aeEventLoop *el, int fd, void *privdata, int mask) {
989     UNUSED(el);
990     UNUSED(mask);
991     writeToClient(fd,privdata,1);
992 }
993 
994 /* This function is called just before entering the event loop, in the hope
995  * we can just write the replies to the client output buffer without any
996  * need to use a syscall in order to install the writable event handler,
997  * get it called, and so forth. */
handleClientsWithPendingWrites(void)998 int handleClientsWithPendingWrites(void) {
999     listIter li;
1000     listNode *ln;
1001     int processed = listLength(server.clients_pending_write);
1002 
1003     listRewind(server.clients_pending_write,&li);
1004     while((ln = listNext(&li))) {
1005         client *c = listNodeValue(ln);
1006         c->flags &= ~CLIENT_PENDING_WRITE;
1007         listDelNode(server.clients_pending_write,ln);
1008 
1009         /* Try to write buffers to the client socket. */
1010         if (writeToClient(c->fd,c,0) == C_ERR) continue;
1011 
1012         /* If there is nothing left, do nothing. Otherwise install
1013          * the write handler. */
1014         if (clientHasPendingReplies(c) &&
1015             aeCreateFileEvent(server.el, c->fd, AE_WRITABLE,
1016                 sendReplyToClient, c) == AE_ERR)
1017         {
1018             freeClientAsync(c);
1019         }
1020     }
1021     return processed;
1022 }
1023 
1024 /* resetClient prepare the client to process the next command */
resetClient(client * c)1025 void resetClient(client *c) {
1026     redisCommandProc *prevcmd = c->cmd ? c->cmd->proc : NULL;
1027 
1028     freeClientArgv(c);
1029     c->reqtype = 0;
1030     c->multibulklen = 0;
1031     c->bulklen = -1;
1032 
1033     /* We clear the ASKING flag as well if we are not inside a MULTI, and
1034      * if what we just executed is not the ASKING command itself. */
1035     if (!(c->flags & CLIENT_MULTI) && prevcmd != askingCommand)
1036         c->flags &= ~CLIENT_ASKING;
1037 
1038     /* Remove the CLIENT_REPLY_SKIP flag if any so that the reply
1039      * to the next command will be sent, but set the flag if the command
1040      * we just processed was "CLIENT REPLY SKIP". */
1041     c->flags &= ~CLIENT_REPLY_SKIP;
1042     if (c->flags & CLIENT_REPLY_SKIP_NEXT) {
1043         c->flags |= CLIENT_REPLY_SKIP;
1044         c->flags &= ~CLIENT_REPLY_SKIP_NEXT;
1045     }
1046 }
1047 
/* Parse one inline (non-RESP) request from the client query buffer: a
 * single line terminated by "\n" or "\r\n", split into arguments with
 * sdssplitargs() (empty arguments are discarded).
 *
 * Returns C_OK when a line was consumed and c->argv/c->argc were filled.
 * Returns C_ERR when no full line is buffered yet, or on a protocol error
 * (line too long without a newline, unbalanced quotes). In the error case
 * an error reply was queued and setProtocolError() flagged the client. */
int processInlineBuffer(client *c) {
    char *newline;
    int argc, j;
    size_t linefeed_chars = 1; /* Length of the line terminator. */
    sds *argv, aux;
    size_t querylen;

    /* Search for end of line */
    newline = strchr(c->querybuf,'\n');

    /* Nothing to do without a \n */
    if (newline == NULL) {
        if (sdslen(c->querybuf) > PROTO_INLINE_MAX_SIZE) {
            addReplyError(c,"Protocol error: too big inline request");
            setProtocolError(c,0);
        }
        return C_ERR;
    }

    /* Handle the \r\n case: the terminator is two bytes long. */
    if (newline != c->querybuf && *(newline-1) == '\r') {
        newline--;
        linefeed_chars++;
    }

    /* Split the input buffer up to the line terminator */
    querylen = newline-(c->querybuf);
    aux = sdsnewlen(c->querybuf,querylen);
    argv = sdssplitargs(aux,&argc);
    sdsfree(aux);
    if (argv == NULL) {
        addReplyError(c,"Protocol error: unbalanced quotes in request");
        setProtocolError(c,0);
        return C_ERR;
    }

    /* Newline from slaves can be used to refresh the last ACK time.
     * This is useful for a slave to ping back while loading a big
     * RDB file. */
    if (querylen == 0 && c->flags & CLIENT_SLAVE)
        c->repl_ack_time = server.unixtime;

    /* Leave data after the first line of the query in the buffer. Skip
     * exactly the terminator that was found: a bare "\n" is one byte, and
     * always skipping two would drop the first byte of the next
     * (pipelined) command. */
    sdsrange(c->querybuf,querylen+linefeed_chars,-1);

    /* Setup argv array on client structure */
    if (argc) {
        if (c->argv) zfree(c->argv);
        c->argv = zmalloc(sizeof(robj*)*argc);
    }

    /* Create redis objects for all non-empty arguments; the sds strings
     * of empty ones are freed here. */
    for (c->argc = 0, j = 0; j < argc; j++) {
        if (sdslen(argv[j])) {
            c->argv[c->argc] = createObject(OBJ_STRING,argv[j]);
            c->argc++;
        } else {
            sdsfree(argv[j]);
        }
    }
    zfree(argv);
    return C_OK;
}
1108 
1109 /* Helper function. Trims query buffer to make the function that processes
1110  * multi bulk requests idempotent. */
setProtocolError(client * c,int pos)1111 static void setProtocolError(client *c, int pos) {
1112     if (server.verbosity <= LL_VERBOSE) {
1113         sds client = catClientInfoString(sdsempty(),c);
1114         serverLog(LL_VERBOSE,
1115             "Protocol error from client: %s", client);
1116         sdsfree(client);
1117     }
1118     c->flags |= CLIENT_CLOSE_AFTER_REPLY;
1119     sdsrange(c->querybuf,pos,-1);
1120 }
1121 
/* Incrementally parse a multi bulk request ("*<count>\r\n" followed by
 * <count> arguments of the form "$<len>\r\n<data>\r\n") from the client
 * query buffer.
 *
 * The parser state is kept in the client so the function can be called
 * again when more data arrives: c->multibulklen is the number of arguments
 * still to read, c->bulklen the length of the argument being read (-1 when
 * its "$<len>" header was not parsed yet). Consumed bytes are trimmed from
 * c->querybuf.
 *
 * Returns C_OK when the whole command is in c->argv/c->argc, or when the
 * announced count is <= 0 (argc stays 0). Returns C_ERR when more data is
 * needed, or on a protocol error, in which case an error reply was queued
 * and setProtocolError() flagged the client. */
int processMultibulkBuffer(client *c) {
    char *eol = NULL;
    int off = 0, parsed;
    long long count;

    if (c->multibulklen == 0) {
        /* Starting a new request: the client should have been reset. */
        serverAssertWithInfo(c,NULL,c->argc == 0);

        /* The "*<count>" line cannot be parsed before its \r shows up. */
        eol = strchr(c->querybuf,'\r');
        if (eol == NULL) {
            if (sdslen(c->querybuf) > PROTO_INLINE_MAX_SIZE) {
                addReplyError(c,"Protocol error: too big mbulk count string");
                setProtocolError(c,0);
            }
            return C_ERR;
        }

        /* The \n after the \r must be buffered too. */
        if (eol-(c->querybuf) > ((signed)sdslen(c->querybuf)-2))
            return C_ERR;

        serverAssertWithInfo(c,NULL,c->querybuf[0] == '*');
        parsed = string2ll(c->querybuf+1,eol-(c->querybuf+1),&count);
        if (!parsed || count > 1024*1024) {
            addReplyError(c,"Protocol error: invalid multibulk length");
            setProtocolError(c,off);
            return C_ERR;
        }

        off = (eol-c->querybuf)+2;

        /* A count <= 0 carries no command: consume the line and succeed. */
        if (count <= 0) {
            sdsrange(c->querybuf,off,-1);
            return C_OK;
        }

        c->multibulklen = count;

        /* One argv slot per announced argument. */
        if (c->argv) zfree(c->argv);
        c->argv = zmalloc(sizeof(robj*)*c->multibulklen);
    }

    serverAssertWithInfo(c,NULL,c->multibulklen > 0);
    while (c->multibulklen) {
        /* Parse the "$<len>" header of the next argument if not done yet. */
        if (c->bulklen == -1) {
            long long bulk;

            eol = strchr(c->querybuf+off,'\r');
            if (eol == NULL) {
                if (sdslen(c->querybuf) > PROTO_INLINE_MAX_SIZE) {
                    addReplyError(c,
                        "Protocol error: too big bulk count string");
                    setProtocolError(c,0);
                    return C_ERR;
                }
                break;
            }

            /* Wait for the \n following the \r. */
            if (eol-(c->querybuf) > ((signed)sdslen(c->querybuf)-2))
                break;

            if (c->querybuf[off] != '$') {
                addReplyErrorFormat(c,
                    "Protocol error: expected '$', got '%c'",
                    c->querybuf[off]);
                setProtocolError(c,off);
                return C_ERR;
            }

            parsed = string2ll(c->querybuf+off+1,eol-(c->querybuf+off+1),&bulk);
            if (!parsed || bulk < 0 || bulk > 512*1024*1024) {
                addReplyError(c,"Protocol error: invalid bulk length");
                setProtocolError(c,off);
                return C_ERR;
            }

            off += eol-(c->querybuf+off)+2;
            if (bulk >= PROTO_MBULK_BIG_ARG) {
                size_t buffered;

                /* Large argument: trim what was consumed so the payload
                 * is likely to start at the beginning of c->querybuf. That
                 * allows the sds itself to become the argument object below
                 * instead of copying a large amount of data. */
                sdsrange(c->querybuf,off,-1);
                off = 0;
                buffered = sdslen(c->querybuf);
                /* Hint the sds library about the amount of bytes this
                 * string is going to contain (payload + CRLF). */
                if (buffered < (size_t)bulk+2)
                    c->querybuf = sdsMakeRoomFor(c->querybuf,bulk+2-buffered);
            }
            c->bulklen = bulk;
        }

        /* Not enough data yet (+2 == trailing \r\n). */
        if (sdslen(c->querybuf)-off < (unsigned)(c->bulklen+2)) break;

        if (off == 0 &&
            c->bulklen >= PROTO_MBULK_BIG_ARG &&
            (signed) sdslen(c->querybuf) == c->bulklen+2)
        {
            /* The buffer holds exactly this argument: use the current sds
             * as the object instead of copying it, then start a new query
             * buffer sized for another argument of the same size, assuming
             * more big arguments are likely to follow. */
            c->argv[c->argc++] = createObject(OBJ_STRING,c->querybuf);
            sdsIncrLen(c->querybuf,-2); /* remove CRLF */
            c->querybuf = sdsnewlen(NULL,c->bulklen+2);
            sdsclear(c->querybuf);
            off = 0;
        } else {
            c->argv[c->argc++] =
                createStringObject(c->querybuf+off,c->bulklen);
            off += c->bulklen+2;
        }
        c->bulklen = -1;
        c->multibulklen--;
    }

    /* Drop the bytes consumed by this call. */
    if (off) sdsrange(c->querybuf,off,-1);

    /* The command is ready only once every argument has been read. */
    return (c->multibulklen == 0) ? C_OK : C_ERR;
}
1259 
/* Parse and execute as many complete commands as the client's query buffer
 * contains. server.current_client points to 'c' for the duration of the
 * call and is reset to NULL on exit.
 *
 * Processing stops when the buffer is empty, when clients are paused (not
 * applied to slaves), when the client is blocked or flagged
 * CLIENT_CLOSE_AFTER_REPLY, when the next command is incomplete, or when
 * the client was freed while executing a command. */
void processInputBuffer(client *c) {
    server.current_client = c;

    while (sdslen(c->querybuf) > 0) {
        int parsed = C_ERR;

        /* Stop on pause (slaves excepted), while blocked, and once
         * CLIENT_CLOSE_AFTER_REPLY is set: the reply must not grow after
         * that flag is set, so no further commands are processed. */
        if ((!(c->flags & CLIENT_SLAVE) && clientsArePaused()) ||
            (c->flags & CLIENT_BLOCKED) ||
            (c->flags & CLIENT_CLOSE_AFTER_REPLY)) break;

        /* Determine request type when unknown. */
        if (!c->reqtype)
            c->reqtype = (c->querybuf[0] == '*') ? PROTO_REQ_MULTIBULK
                                                 : PROTO_REQ_INLINE;

        switch (c->reqtype) {
        case PROTO_REQ_INLINE:
            parsed = processInlineBuffer(c);
            break;
        case PROTO_REQ_MULTIBULK:
            parsed = processMultibulkBuffer(c);
            break;
        default:
            serverPanic("Unknown request type");
        }
        if (parsed != C_OK) break;

        /* Multibulk processing could see a <= 0 length. */
        if (c->argc == 0) {
            resetClient(c);
            continue;
        }

        /* Only reset the client when the command was executed. */
        if (processCommand(c) == C_OK) resetClient(c);

        /* freeMemoryIfNeeded may flush slave output buffers. This may result
         * into a slave, that may be the active client, to be freed. */
        if (server.current_client == NULL) break;
    }
    server.current_client = NULL;
}
1306 
/* AE_READABLE handler for client sockets: read available bytes into the
 * client query buffer and process the commands they contain.
 *
 * The client is freed on a read error other than EAGAIN, when the peer
 * closed the connection (read() returned 0), or when the query buffer grows
 * beyond server.client_max_querybuf_len. */
void readQueryFromClient(aeEventLoop *el, int fd, void *privdata, int mask) {
    client *c = (client*) privdata;
    int nread, readlen;
    size_t qblen;
    UNUSED(el);
    UNUSED(mask);

    readlen = PROTO_IOBUF_LEN;
    /* If this is a multi bulk request, and we are processing a bulk reply
     * that is large enough, try to maximize the probability that the query
     * buffer contains exactly the SDS string representing the object, even
     * at the risk of requiring more read(2) calls. This way the function
     * processMultiBulkBuffer() can avoid copying buffers to create the
     * Redis Object representing the argument. */
    if (c->reqtype == PROTO_REQ_MULTIBULK && c->multibulklen && c->bulklen != -1
        && c->bulklen >= PROTO_MBULK_BIG_ARG)
    {
        long long remaining = (long long)(c->bulklen+2) -
                              (long long)sdslen(c->querybuf);

        /* 'remaining' may be zero when the whole argument is already
         * buffered but was not processed yet (e.g. processing stopped
         * because clients are paused or this client is blocked). Capping
         * readlen to zero would issue a zero-length read(), whose 0 return
         * value is handled below as "client closed connection". */
        if (remaining > 0 && remaining < readlen) readlen = (int)remaining;
    }

    qblen = sdslen(c->querybuf);
    if (c->querybuf_peak < qblen) c->querybuf_peak = qblen;
    c->querybuf = sdsMakeRoomFor(c->querybuf, readlen);
    nread = read(fd, c->querybuf+qblen, readlen);
    if (nread == -1) {
        if (errno == EAGAIN) {
            return;
        } else {
            serverLog(LL_VERBOSE, "Reading from client: %s",strerror(errno));
            freeClient(c);
            return;
        }
    } else if (nread == 0) {
        serverLog(LL_VERBOSE, "Client closed connection");
        freeClient(c);
        return;
    }

    sdsIncrLen(c->querybuf,nread);
    c->lastinteraction = server.unixtime;
    if (c->flags & CLIENT_MASTER) c->reploff += nread;
    server.stat_net_input_bytes += nread;
    if (sdslen(c->querybuf) > server.client_max_querybuf_len) {
        sds ci = catClientInfoString(sdsempty(),c), bytes = sdsempty();

        bytes = sdscatrepr(bytes,c->querybuf,64);
        serverLog(LL_WARNING,"Closing client that reached max query buffer length: %s (qbuf initial bytes: %s)", ci, bytes);
        sdsfree(ci);
        sdsfree(bytes);
        freeClient(c);
        return;
    }
    processInputBuffer(c);
}
1363 
getClientsMaxBuffers(unsigned long * longest_output_list,unsigned long * biggest_input_buffer)1364 void getClientsMaxBuffers(unsigned long *longest_output_list,
1365                           unsigned long *biggest_input_buffer) {
1366     client *c;
1367     listNode *ln;
1368     listIter li;
1369     unsigned long lol = 0, bib = 0;
1370 
1371     listRewind(server.clients,&li);
1372     while ((ln = listNext(&li)) != NULL) {
1373         c = listNodeValue(ln);
1374 
1375         if (listLength(c->reply) > lol) lol = listLength(c->reply);
1376         if (sdslen(c->querybuf) > bib) bib = sdslen(c->querybuf);
1377     }
1378     *longest_output_list = lol;
1379     *biggest_input_buffer = bib;
1380 }
1381 
1382 /* A Redis "Peer ID" is a colon separated ip:port pair.
1383  * For IPv4 it's in the form x.y.z.k:port, example: "127.0.0.1:1234".
1384  * For IPv6 addresses we use [] around the IP part, like in "[::1]:1234".
1385  * For Unix sockets we use path:0, like in "/tmp/redis:0".
1386  *
1387  * A Peer ID always fits inside a buffer of NET_PEER_ID_LEN bytes, including
1388  * the null term.
1389  *
1390  * On failure the function still populates 'peerid' with the "?:0" string
1391  * in case you want to relax error checking or need to display something
1392  * anyway (see anetPeerToString implementation for more info). */
genClientPeerId(client * client,char * peerid,size_t peerid_len)1393 void genClientPeerId(client *client, char *peerid,
1394                             size_t peerid_len) {
1395     if (client->flags & CLIENT_UNIX_SOCKET) {
1396         /* Unix socket client. */
1397         snprintf(peerid,peerid_len,"%s:0",server.unixsocket);
1398     } else {
1399         /* TCP client. */
1400         anetFormatPeer(client->fd,peerid,peerid_len);
1401     }
1402 }
1403 
1404 /* This function returns the client peer id, by creating and caching it
1405  * if client->peerid is NULL, otherwise returning the cached value.
1406  * The Peer ID never changes during the life of the client, however it
1407  * is expensive to compute. */
getClientPeerId(client * c)1408 char *getClientPeerId(client *c) {
1409     char peerid[NET_PEER_ID_LEN];
1410 
1411     if (c->peerid == NULL) {
1412         genClientPeerId(c,peerid,sizeof(peerid));
1413         c->peerid = sdsnew(peerid);
1414     }
1415     return c->peerid;
1416 }
1417 
1418 /* Concatenate a string representing the state of a client in an human
1419  * readable format, into the sds string 's'. */
catClientInfoString(sds s,client * client)1420 sds catClientInfoString(sds s, client *client) {
1421     char flags[16], events[3], *p;
1422     int emask;
1423 
1424     p = flags;
1425     if (client->flags & CLIENT_SLAVE) {
1426         if (client->flags & CLIENT_MONITOR)
1427             *p++ = 'O';
1428         else
1429             *p++ = 'S';
1430     }
1431     if (client->flags & CLIENT_MASTER) *p++ = 'M';
1432     if (client->flags & CLIENT_MULTI) *p++ = 'x';
1433     if (client->flags & CLIENT_BLOCKED) *p++ = 'b';
1434     if (client->flags & CLIENT_DIRTY_CAS) *p++ = 'd';
1435     if (client->flags & CLIENT_CLOSE_AFTER_REPLY) *p++ = 'c';
1436     if (client->flags & CLIENT_UNBLOCKED) *p++ = 'u';
1437     if (client->flags & CLIENT_CLOSE_ASAP) *p++ = 'A';
1438     if (client->flags & CLIENT_UNIX_SOCKET) *p++ = 'U';
1439     if (client->flags & CLIENT_READONLY) *p++ = 'r';
1440     if (p == flags) *p++ = 'N';
1441     *p++ = '\0';
1442 
1443     emask = client->fd == -1 ? 0 : aeGetFileEvents(server.el,client->fd);
1444     p = events;
1445     if (emask & AE_READABLE) *p++ = 'r';
1446     if (emask & AE_WRITABLE) *p++ = 'w';
1447     *p = '\0';
1448     return sdscatfmt(s,
1449         "id=%U addr=%s fd=%i name=%s age=%I idle=%I flags=%s db=%i sub=%i psub=%i multi=%i qbuf=%U qbuf-free=%U obl=%U oll=%U omem=%U events=%s cmd=%s",
1450         (unsigned long long) client->id,
1451         getClientPeerId(client),
1452         client->fd,
1453         client->name ? (char*)client->name->ptr : "",
1454         (long long)(server.unixtime - client->ctime),
1455         (long long)(server.unixtime - client->lastinteraction),
1456         flags,
1457         client->db->id,
1458         (int) dictSize(client->pubsub_channels),
1459         (int) listLength(client->pubsub_patterns),
1460         (client->flags & CLIENT_MULTI) ? client->mstate.count : -1,
1461         (unsigned long long) sdslen(client->querybuf),
1462         (unsigned long long) sdsavail(client->querybuf),
1463         (unsigned long long) client->bufpos,
1464         (unsigned long long) listLength(client->reply),
1465         (unsigned long long) getClientOutputBufferMemoryUsage(client),
1466         events,
1467         client->lastcmd ? client->lastcmd->name : "NULL");
1468 }
1469 
getAllClientsInfoString(void)1470 sds getAllClientsInfoString(void) {
1471     listNode *ln;
1472     listIter li;
1473     client *client;
1474     sds o = sdsnewlen(NULL,200*listLength(server.clients));
1475     sdsclear(o);
1476     listRewind(server.clients,&li);
1477     while ((ln = listNext(&li)) != NULL) {
1478         client = listNodeValue(ln);
1479         o = catClientInfoString(o,client);
1480         o = sdscatlen(o,"\n",1);
1481     }
1482     return o;
1483 }
1484 
/* CLIENT <subcommand> [arguments ...]
 *
 * Dispatches on c->argv[1], compared case-insensitively. Handled forms:
 *   CLIENT LIST
 *   CLIENT REPLY ON|OFF|SKIP
 *   CLIENT KILL <ip:port>                                   (old style)
 *   CLIENT KILL <option> <value> [<option> <value> ...]     (new style,
 *               options: ID, TYPE, ADDR, SKIPME yes|no)
 *   CLIENT SETNAME <name>
 *   CLIENT GETNAME
 *   CLIENT PAUSE <timeout>
 * Any other subcommand/arity combination gets an error listing the
 * supported subcommands.
 *
 * NOTE(review): argv[1] is dereferenced without checking argc; this
 * presumably relies on the command table arity guaranteeing argc >= 2 —
 * confirm. */
void clientCommand(client *c) {
    listNode *ln;
    listIter li;
    client *client;

    if (!strcasecmp(c->argv[1]->ptr,"list") && c->argc == 2) {
        /* CLIENT LIST */
        sds o = getAllClientsInfoString();
        addReplyBulkCBuffer(c,o,sdslen(o));
        sdsfree(o);
    } else if (!strcasecmp(c->argv[1]->ptr,"reply") && c->argc == 3) {
        /* CLIENT REPLY ON|OFF|SKIP */
        if (!strcasecmp(c->argv[2]->ptr,"on")) {
            /* ON clears both suppression flags and is the only variant
             * that queues an explicit +OK. */
            c->flags &= ~(CLIENT_REPLY_SKIP|CLIENT_REPLY_OFF);
            addReply(c,shared.ok);
        } else if (!strcasecmp(c->argv[2]->ptr,"off")) {
            /* No reply is queued for OFF. */
            c->flags |= CLIENT_REPLY_OFF;
        } else if (!strcasecmp(c->argv[2]->ptr,"skip")) {
            /* SKIP is ignored while replies are already OFF; no reply is
             * queued either way. */
            if (!(c->flags & CLIENT_REPLY_OFF))
                c->flags |= CLIENT_REPLY_SKIP_NEXT;
        } else {
            addReply(c,shared.syntaxerr);
            return;
        }
    } else if (!strcasecmp(c->argv[1]->ptr,"kill")) {
        /* CLIENT KILL <ip:port>
         * CLIENT KILL <option> [value] ... <option> [value] */
        char *addr = NULL;      /* ADDR filter, NULL = any address. */
        int type = -1;          /* TYPE filter, -1 = any type. */
        uint64_t id = 0;        /* ID filter, 0 = any id. */
        int skipme = 1;         /* New-style default: never kill the caller. */
        int killed = 0, close_this_client = 0;

        if (c->argc == 3) {
            /* Old style syntax: CLIENT KILL <addr> */
            addr = c->argv[2]->ptr;
            skipme = 0; /* With the old form, you can kill yourself. */
        } else if (c->argc > 3) {
            int i = 2; /* Next option index. */

            /* New style syntax: parse options. Options are consumed in
             * <name> <value> pairs; a name without a following value
             * (moreargs == 0) falls through to the syntax error. */
            while(i < c->argc) {
                int moreargs = c->argc > i+1;

                if (!strcasecmp(c->argv[i]->ptr,"id") && moreargs) {
                    long long tmp;

                    if (getLongLongFromObjectOrReply(c,c->argv[i+1],&tmp,NULL)
                        != C_OK) return;
                    id = tmp;
                } else if (!strcasecmp(c->argv[i]->ptr,"type") && moreargs) {
                    type = getClientTypeByName(c->argv[i+1]->ptr);
                    if (type == -1) {
                        addReplyErrorFormat(c,"Unknown client type '%s'",
                            (char*) c->argv[i+1]->ptr);
                        return;
                    }
                } else if (!strcasecmp(c->argv[i]->ptr,"addr") && moreargs) {
                    addr = c->argv[i+1]->ptr;
                } else if (!strcasecmp(c->argv[i]->ptr,"skipme") && moreargs) {
                    if (!strcasecmp(c->argv[i+1]->ptr,"yes")) {
                        skipme = 1;
                    } else if (!strcasecmp(c->argv[i+1]->ptr,"no")) {
                        skipme = 0;
                    } else {
                        addReply(c,shared.syntaxerr);
                        return;
                    }
                } else {
                    addReply(c,shared.syntaxerr);
                    return;
                }
                i += 2;
            }
        } else {
            addReply(c,shared.syntaxerr);
            return;
        }

        /* Iterate clients killing all the matching clients. A client must
         * pass every filter that was set (filters are ANDed). */
        listRewind(server.clients,&li);
        while ((ln = listNext(&li)) != NULL) {
            client = listNodeValue(ln);
            if (addr && strcmp(getClientPeerId(client),addr) != 0) continue;
            if (type != -1 && getClientType(client) != type) continue;
            if (id != 0 && client->id != id) continue;
            if (c == client && skipme) continue;

            /* Kill it. Other clients are freed right away; the caller is
             * only remembered here and flagged after its reply is queued. */
            if (c == client) {
                close_this_client = 1;
            } else {
                freeClient(client);
            }
            killed++;
        }

        /* Reply according to old/new format: the old form replies +OK or
         * an error, the new form replies with the number of killed clients. */
        if (c->argc == 3) {
            if (killed == 0)
                addReplyError(c,"No such client");
            else
                addReply(c,shared.ok);
        } else {
            addReplyLongLong(c,killed);
        }

        /* If this client has to be closed, flag it as CLOSE_AFTER_REPLY
         * only after we queued the reply to its output buffers. */
        if (close_this_client) c->flags |= CLIENT_CLOSE_AFTER_REPLY;
    } else if (!strcasecmp(c->argv[1]->ptr,"setname") && c->argc == 3) {
        int j, len = sdslen(c->argv[2]->ptr);
        char *p = c->argv[2]->ptr;

        /* Setting the client name to an empty string actually removes
         * the current name. */
        if (len == 0) {
            if (c->name) decrRefCount(c->name);
            c->name = NULL;
            addReply(c,shared.ok);
            return;
        }

        /* Otherwise check if the charset is ok. We need to do this otherwise
         * CLIENT LIST format will break. You should always be able to
         * split by space to get the different fields. Only the printable,
         * non-space range '!'..'~' is accepted. */
        for (j = 0; j < len; j++) {
            if (p[j] < '!' || p[j] > '~') { /* ASCII is assumed. */
                addReplyError(c,
                    "Client names cannot contain spaces, "
                    "newlines or special characters.");
                return;
            }
        }
        /* The name shares the argument object; take our own reference. */
        if (c->name) decrRefCount(c->name);
        c->name = c->argv[2];
        incrRefCount(c->name);
        addReply(c,shared.ok);
    } else if (!strcasecmp(c->argv[1]->ptr,"getname") && c->argc == 2) {
        /* Reply with the name, or a null bulk if none is set. */
        if (c->name)
            addReplyBulk(c,c->name);
        else
            addReply(c,shared.nullbulk);
    } else if (!strcasecmp(c->argv[1]->ptr,"pause") && c->argc == 3) {
        /* NOTE(review): pauseClients() documents its argument as an
         * absolute end time in ms, so getTimeoutFromObjectOrReply()
         * presumably turns the relative timeout into that deadline despite
         * the variable being named 'duration' — confirm. */
        long long duration;

        if (getTimeoutFromObjectOrReply(c,c->argv[2],&duration,UNIT_MILLISECONDS)
                                        != C_OK) return;
        pauseClients(duration);
        addReply(c,shared.ok);
    } else {
        addReplyError(c, "Syntax error, try CLIENT (LIST | KILL | GETNAME | SETNAME | PAUSE | REPLY)");
    }
}
1639 
1640 /* Rewrite the command vector of the client. All the new objects ref count
1641  * is incremented. The old command vector is freed, and the old objects
1642  * ref count is decremented. */
rewriteClientCommandVector(client * c,int argc,...)1643 void rewriteClientCommandVector(client *c, int argc, ...) {
1644     va_list ap;
1645     int j;
1646     robj **argv; /* The new argument vector */
1647 
1648     argv = zmalloc(sizeof(robj*)*argc);
1649     va_start(ap,argc);
1650     for (j = 0; j < argc; j++) {
1651         robj *a;
1652 
1653         a = va_arg(ap, robj*);
1654         argv[j] = a;
1655         incrRefCount(a);
1656     }
1657     /* We free the objects in the original vector at the end, so we are
1658      * sure that if the same objects are reused in the new vector the
1659      * refcount gets incremented before it gets decremented. */
1660     for (j = 0; j < c->argc; j++) decrRefCount(c->argv[j]);
1661     zfree(c->argv);
1662     /* Replace argv and argc with our new versions. */
1663     c->argv = argv;
1664     c->argc = argc;
1665     c->cmd = lookupCommandOrOriginal(c->argv[0]->ptr);
1666     serverAssertWithInfo(c,NULL,c->cmd != NULL);
1667     va_end(ap);
1668 }
1669 
1670 /* Completely replace the client command vector with the provided one. */
replaceClientCommandVector(client * c,int argc,robj ** argv)1671 void replaceClientCommandVector(client *c, int argc, robj **argv) {
1672     freeClientArgv(c);
1673     zfree(c->argv);
1674     c->argv = argv;
1675     c->argc = argc;
1676     c->cmd = lookupCommandOrOriginal(c->argv[0]->ptr);
1677     serverAssertWithInfo(c,NULL,c->cmd != NULL);
1678 }
1679 
1680 /* Rewrite a single item in the command vector.
1681  * The new val ref count is incremented, and the old decremented.
1682  *
1683  * It is possible to specify an argument over the current size of the
1684  * argument vector: in this case the array of objects gets reallocated
1685  * and c->argc set to the max value. However it's up to the caller to
1686  *
1687  * 1. Make sure there are no "holes" and all the arguments are set.
1688  * 2. If the original argument vector was longer than the one we
1689  *    want to end with, it's up to the caller to set c->argc and
1690  *    free the no longer used objects on c->argv. */
rewriteClientCommandArgument(client * c,int i,robj * newval)1691 void rewriteClientCommandArgument(client *c, int i, robj *newval) {
1692     robj *oldval;
1693 
1694     if (i >= c->argc) {
1695         c->argv = zrealloc(c->argv,sizeof(robj*)*(i+1));
1696         c->argc = i+1;
1697         c->argv[i] = NULL;
1698     }
1699     oldval = c->argv[i];
1700     c->argv[i] = newval;
1701     incrRefCount(newval);
1702     if (oldval) decrRefCount(oldval);
1703 
1704     /* If this is the command name make sure to fix c->cmd. */
1705     if (i == 0) {
1706         c->cmd = lookupCommandOrOriginal(c->argv[0]->ptr);
1707         serverAssertWithInfo(c,NULL,c->cmd != NULL);
1708     }
1709 }
1710 
1711 /* This function returns the number of bytes that Redis is virtually
1712  * using to store the reply still not read by the client.
1713  * It is "virtual" since the reply output list may contain objects that
1714  * are shared and are not really using additional memory.
1715  *
1716  * The function returns the total sum of the length of all the objects
1717  * stored in the output list, plus the memory used to allocate every
1718  * list node. The static reply buffer is not taken into account since it
1719  * is allocated anyway.
1720  *
1721  * Note: this function is very fast so can be called as many time as
1722  * the caller wishes. The main usage of this function currently is
1723  * enforcing the client output length limits. */
getClientOutputBufferMemoryUsage(client * c)1724 unsigned long getClientOutputBufferMemoryUsage(client *c) {
1725     unsigned long list_item_size = sizeof(listNode)+sizeof(robj);
1726 
1727     return c->reply_bytes + (list_item_size*listLength(c->reply));
1728 }
1729 
1730 /* Get the class of a client, used in order to enforce limits to different
1731  * classes of clients.
1732  *
1733  * The function will return one of the following:
1734  * CLIENT_TYPE_NORMAL -> Normal client
1735  * CLIENT_TYPE_SLAVE  -> Slave or client executing MONITOR command
1736  * CLIENT_TYPE_PUBSUB -> Client subscribed to Pub/Sub channels
1737  * CLIENT_TYPE_MASTER -> The client representing our replication master.
1738  */
getClientType(client * c)1739 int getClientType(client *c) {
1740     if (c->flags & CLIENT_MASTER) return CLIENT_TYPE_MASTER;
1741     if ((c->flags & CLIENT_SLAVE) && !(c->flags & CLIENT_MONITOR))
1742         return CLIENT_TYPE_SLAVE;
1743     if (c->flags & CLIENT_PUBSUB) return CLIENT_TYPE_PUBSUB;
1744     return CLIENT_TYPE_NORMAL;
1745 }
1746 
getClientTypeByName(char * name)1747 int getClientTypeByName(char *name) {
1748     if (!strcasecmp(name,"normal")) return CLIENT_TYPE_NORMAL;
1749     else if (!strcasecmp(name,"slave")) return CLIENT_TYPE_SLAVE;
1750     else if (!strcasecmp(name,"pubsub")) return CLIENT_TYPE_PUBSUB;
1751     else if (!strcasecmp(name,"master")) return CLIENT_TYPE_MASTER;
1752     else return -1;
1753 }
1754 
getClientTypeName(int class)1755 char *getClientTypeName(int class) {
1756     switch(class) {
1757     case CLIENT_TYPE_NORMAL: return "normal";
1758     case CLIENT_TYPE_SLAVE:  return "slave";
1759     case CLIENT_TYPE_PUBSUB: return "pubsub";
1760     case CLIENT_TYPE_MASTER: return "master";
1761     default:                       return NULL;
1762     }
1763 }
1764 
1765 /* The function checks if the client reached output buffer soft or hard
1766  * limit, and also update the state needed to check the soft limit as
1767  * a side effect.
1768  *
1769  * Return value: non-zero if the client reached the soft or the hard limit.
1770  *               Otherwise zero is returned. */
checkClientOutputBufferLimits(client * c)1771 int checkClientOutputBufferLimits(client *c) {
1772     int soft = 0, hard = 0, class;
1773     unsigned long used_mem = getClientOutputBufferMemoryUsage(c);
1774 
1775     class = getClientType(c);
1776     /* For the purpose of output buffer limiting, masters are handled
1777      * like normal clients. */
1778     if (class == CLIENT_TYPE_MASTER) class = CLIENT_TYPE_NORMAL;
1779 
1780     if (server.client_obuf_limits[class].hard_limit_bytes &&
1781         used_mem >= server.client_obuf_limits[class].hard_limit_bytes)
1782         hard = 1;
1783     if (server.client_obuf_limits[class].soft_limit_bytes &&
1784         used_mem >= server.client_obuf_limits[class].soft_limit_bytes)
1785         soft = 1;
1786 
1787     /* We need to check if the soft limit is reached continuously for the
1788      * specified amount of seconds. */
1789     if (soft) {
1790         if (c->obuf_soft_limit_reached_time == 0) {
1791             c->obuf_soft_limit_reached_time = server.unixtime;
1792             soft = 0; /* First time we see the soft limit reached */
1793         } else {
1794             time_t elapsed = server.unixtime - c->obuf_soft_limit_reached_time;
1795 
1796             if (elapsed <=
1797                 server.client_obuf_limits[class].soft_limit_seconds) {
1798                 soft = 0; /* The client still did not reached the max number of
1799                              seconds for the soft limit to be considered
1800                              reached. */
1801             }
1802         }
1803     } else {
1804         c->obuf_soft_limit_reached_time = 0;
1805     }
1806     return soft || hard;
1807 }
1808 
1809 /* Asynchronously close a client if soft or hard limit is reached on the
1810  * output buffer size. The caller can check if the client will be closed
1811  * checking if the client CLIENT_CLOSE_ASAP flag is set.
1812  *
1813  * Note: we need to close the client asynchronously because this function is
1814  * called from contexts where the client can't be freed safely, i.e. from the
1815  * lower level functions pushing data inside the client output buffers. */
asyncCloseClientOnOutputBufferLimitReached(client * c)1816 void asyncCloseClientOnOutputBufferLimitReached(client *c) {
1817     serverAssert(c->reply_bytes < SIZE_MAX-(1024*64));
1818     if (c->reply_bytes == 0 || c->flags & CLIENT_CLOSE_ASAP) return;
1819     if (checkClientOutputBufferLimits(c)) {
1820         sds client = catClientInfoString(sdsempty(),c);
1821 
1822         freeClientAsync(c);
1823         serverLog(LL_WARNING,"Client %s scheduled to be closed ASAP for overcoming of output buffer limits.", client);
1824         sdsfree(client);
1825     }
1826 }
1827 
1828 /* Helper function used by freeMemoryIfNeeded() in order to flush slaves
1829  * output buffers without returning control to the event loop.
1830  * This is also called by SHUTDOWN for a best-effort attempt to send
1831  * slaves the latest writes. */
flushSlavesOutputBuffers(void)1832 void flushSlavesOutputBuffers(void) {
1833     listIter li;
1834     listNode *ln;
1835 
1836     listRewind(server.slaves,&li);
1837     while((ln = listNext(&li))) {
1838         client *slave = listNodeValue(ln);
1839         int events;
1840 
1841         /* Note that the following will not flush output buffers of slaves
1842          * in STATE_ONLINE but having put_online_on_ack set to true: in this
1843          * case the writable event is never installed, since the purpose
1844          * of put_online_on_ack is to postpone the moment it is installed.
1845          * This is what we want since slaves in this state should not receive
1846          * writes before the first ACK. */
1847         events = aeGetFileEvents(server.el,slave->fd);
1848         if (events & AE_WRITABLE &&
1849             slave->replstate == SLAVE_STATE_ONLINE &&
1850             clientHasPendingReplies(slave))
1851         {
1852             writeToClient(slave->fd,slave,0);
1853         }
1854     }
1855 }
1856 
1857 /* Pause clients up to the specified unixtime (in ms). While clients
1858  * are paused no command is processed from clients, so the data set can't
1859  * change during that time.
1860  *
1861  * However while this function pauses normal and Pub/Sub clients, slaves are
1862  * still served, so this function can be used on server upgrades where it is
1863  * required that slaves process the latest bytes from the replication stream
1864  * before being turned to masters.
1865  *
1866  * This function is also internally used by Redis Cluster for the manual
1867  * failover procedure implemented by CLUSTER FAILOVER.
1868  *
1869  * The function always succeed, even if there is already a pause in progress.
1870  * In such a case, the pause is extended if the duration is more than the
1871  * time left for the previous duration. However if the duration is smaller
1872  * than the time left for the previous pause, no change is made to the
1873  * left duration. */
pauseClients(mstime_t end)1874 void pauseClients(mstime_t end) {
1875     if (!server.clients_paused || end > server.clients_pause_end_time)
1876         server.clients_pause_end_time = end;
1877     server.clients_paused = 1;
1878 }
1879 
1880 /* Return non-zero if clients are currently paused. As a side effect the
1881  * function checks if the pause time was reached and clear it. */
clientsArePaused(void)1882 int clientsArePaused(void) {
1883     if (server.clients_paused &&
1884         server.clients_pause_end_time < server.mstime)
1885     {
1886         listNode *ln;
1887         listIter li;
1888         client *c;
1889 
1890         server.clients_paused = 0;
1891 
1892         /* Put all the clients in the unblocked clients queue in order to
1893          * force the re-processing of the input buffer if any. */
1894         listRewind(server.clients,&li);
1895         while ((ln = listNext(&li)) != NULL) {
1896             c = listNodeValue(ln);
1897 
1898             /* Don't touch slaves and blocked clients. The latter pending
1899              * requests be processed when unblocked. */
1900             if (c->flags & (CLIENT_SLAVE|CLIENT_BLOCKED)) continue;
1901             c->flags |= CLIENT_UNBLOCKED;
1902             listAddNodeTail(server.unblocked_clients,c);
1903         }
1904     }
1905     return server.clients_paused;
1906 }
1907 
1908 /* This function is called by Redis in order to process a few events from
1909  * time to time while blocked into some not interruptible operation.
1910  * This allows to reply to clients with the -LOADING error while loading the
1911  * data set at startup or after a full resynchronization with the master
1912  * and so forth.
1913  *
1914  * It calls the event loop in order to process a few events. Specifically we
1915  * try to call the event loop 4 times as long as we receive acknowledge that
1916  * some event was processed, in order to go forward with the accept, read,
1917  * write, close sequence needed to serve a client.
1918  *
1919  * The function returns the total number of events processed. */
processEventsWhileBlocked(void)1920 int processEventsWhileBlocked(void) {
1921     int iterations = 4; /* See the function top-comment. */
1922     int count = 0;
1923     while (iterations--) {
1924         int events = 0;
1925         events += aeProcessEvents(server.el, AE_FILE_EVENTS|AE_DONT_WAIT);
1926         events += handleClientsWithPendingWrites();
1927         if (!events) break;
1928         count += events;
1929     }
1930     return count;
1931 }
1932