1 /* 2 * Copyright (c) 2009-2012, Salvatore Sanfilippo <antirez at gmail dot com> 3 * All rights reserved. 4 * 5 * Redistribution and use in source and binary forms, with or without 6 * modification, are permitted provided that the following conditions are met: 7 * 8 * * Redistributions of source code must retain the above copyright notice, 9 * this list of conditions and the following disclaimer. 10 * * Redistributions in binary form must reproduce the above copyright 11 * notice, this list of conditions and the following disclaimer in the 12 * documentation and/or other materials provided with the distribution. 13 * * Neither the name of Redis nor the names of its contributors may be used 14 * to endorse or promote products derived from this software without 15 * specific prior written permission. 16 * 17 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" 18 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE 19 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE 20 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE 21 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR 22 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF 23 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS 24 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN 25 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) 26 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE 27 * POSSIBILITY OF SUCH DAMAGE. 28 */ 29 30 #include "server.h" 31 #include "atomicvar.h" 32 #include <sys/socket.h> 33 #include <sys/uio.h> 34 #include <math.h> 35 #include <ctype.h> 36 37 static void setProtocolError(const char *errstr, client *c); 38 39 /* Return the size consumed from the allocator, for the specified SDS string, 40 * including internal fragmentation. This function is used in order to compute 41 * the client output buffer size. */ 42 size_t sdsZmallocSize(sds s) { 43 void *sh = sdsAllocPtr(s); 44 return zmalloc_size(sh); 45 } 46 47 /* Return the amount of memory used by the sds string at object->ptr 48 * for a string object. */ 49 size_t getStringObjectSdsUsedMemory(robj *o) { 50 serverAssertWithInfo(NULL,o,o->type == OBJ_STRING); 51 switch(o->encoding) { 52 case OBJ_ENCODING_RAW: return sdsZmallocSize(o->ptr); 53 case OBJ_ENCODING_EMBSTR: return zmalloc_size(o)-sizeof(robj); 54 default: return 0; /* Just integer encoding for now. */ 55 } 56 } 57 58 /* Client.reply list dup and free methods. */ 59 void *dupClientReplyValue(void *o) { 60 clientReplyBlock *old = o; 61 clientReplyBlock *buf = zmalloc(sizeof(clientReplyBlock) + old->size); 62 memcpy(buf, o, sizeof(clientReplyBlock) + old->size); 63 return buf; 64 } 65 66 void freeClientReplyValue(void *o) { 67 zfree(o); 68 } 69 70 int listMatchObjects(void *a, void *b) { 71 return equalStringObjects(a,b); 72 } 73 74 /* This function links the client to the global linked list of clients. 75 * unlinkClient() does the opposite, among other things. */ 76 void linkClient(client *c) { 77 listAddNodeTail(server.clients,c); 78 /* Note that we remember the linked list node where the client is stored, 79 * this way removing the client in unlinkClient() will not require 80 * a linear scan, but just a constant time operation. */ 81 c->client_list_node = listLast(server.clients); 82 uint64_t id = htonu64(c->id); 83 raxInsert(server.clients_index,(unsigned char*)&id,sizeof(id),c,NULL); 84 } 85 86 client *createClient(int fd) { 87 client *c = zmalloc(sizeof(client)); 88 89 /* passing -1 as fd it is possible to create a non connected client. 90 * This is useful since all the commands needs to be executed 91 * in the context of a client. When commands are executed in other 92 * contexts (for instance a Lua script) we need a non connected client. */ 93 if (fd != -1) { 94 anetNonBlock(NULL,fd); 95 anetEnableTcpNoDelay(NULL,fd); 96 if (server.tcpkeepalive) 97 anetKeepAlive(NULL,fd,server.tcpkeepalive); 98 if (aeCreateFileEvent(server.el,fd,AE_READABLE, 99 readQueryFromClient, c) == AE_ERR) 100 { 101 close(fd); 102 zfree(c); 103 return NULL; 104 } 105 } 106 107 selectDb(c,0); 108 uint64_t client_id; 109 atomicGetIncr(server.next_client_id,client_id,1); 110 c->id = client_id; 111 c->fd = fd; 112 c->name = NULL; 113 c->bufpos = 0; 114 c->qb_pos = 0; 115 c->querybuf = sdsempty(); 116 c->pending_querybuf = sdsempty(); 117 c->querybuf_peak = 0; 118 c->reqtype = 0; 119 c->argc = 0; 120 c->argv = NULL; 121 c->cmd = c->lastcmd = NULL; 122 c->multibulklen = 0; 123 c->bulklen = -1; 124 c->sentlen = 0; 125 c->flags = 0; 126 c->ctime = c->lastinteraction = server.unixtime; 127 c->authenticated = 0; 128 c->replstate = REPL_STATE_NONE; 129 c->repl_put_online_on_ack = 0; 130 c->reploff = 0; 131 c->read_reploff = 0; 132 c->repl_ack_off = 0; 133 c->repl_ack_time = 0; 134 c->slave_listening_port = 0; 135 c->slave_ip[0] = '\0'; 136 c->slave_capa = SLAVE_CAPA_NONE; 137 c->reply = listCreate(); 138 c->reply_bytes = 0; 139 c->obuf_soft_limit_reached_time = 0; 140 listSetFreeMethod(c->reply,freeClientReplyValue); 141 listSetDupMethod(c->reply,dupClientReplyValue); 142 c->btype = BLOCKED_NONE; 143 c->bpop.timeout = 0; 144 c->bpop.keys = dictCreate(&objectKeyHeapPointerValueDictType,NULL); 145 c->bpop.target = NULL; 146 c->bpop.xread_group = NULL; 147 c->bpop.xread_consumer = NULL; 148 c->bpop.xread_group_noack = 0; 149 c->bpop.numreplicas = 0; 150 c->bpop.reploffset = 0; 151 c->woff = 0; 152 c->watched_keys = listCreate(); 153 c->pubsub_channels = dictCreate(&objectKeyPointerValueDictType,NULL); 154 c->pubsub_patterns = listCreate(); 155 c->peerid = NULL; 156 c->client_list_node = NULL; 157 listSetFreeMethod(c->pubsub_patterns,decrRefCountVoid); 158 listSetMatchMethod(c->pubsub_patterns,listMatchObjects); 159 if (fd != -1) linkClient(c); 160 initClientMultiState(c); 161 return c; 162 } 163 164 /* This funciton puts the client in the queue of clients that should write 165 * their output buffers to the socket. Note that it does not *yet* install 166 * the write handler, to start clients are put in a queue of clients that need 167 * to write, so we try to do that before returning in the event loop (see the 168 * handleClientsWithPendingWrites() function). 169 * If we fail and there is more data to write, compared to what the socket 170 * buffers can hold, then we'll really install the handler. */ 171 void clientInstallWriteHandler(client *c) { 172 /* Schedule the client to write the output buffers to the socket only 173 * if not already done and, for slaves, if the slave can actually receive 174 * writes at this stage. */ 175 if (!(c->flags & CLIENT_PENDING_WRITE) && 176 (c->replstate == REPL_STATE_NONE || 177 (c->replstate == SLAVE_STATE_ONLINE && !c->repl_put_online_on_ack))) 178 { 179 /* Here instead of installing the write handler, we just flag the 180 * client and put it into a list of clients that have something 181 * to write to the socket. This way before re-entering the event 182 * loop, we can try to directly write to the client sockets avoiding 183 * a system call. We'll only really install the write handler if 184 * we'll not be able to write the whole reply at once. */ 185 c->flags |= CLIENT_PENDING_WRITE; 186 listAddNodeHead(server.clients_pending_write,c); 187 } 188 } 189 190 /* This function is called every time we are going to transmit new data 191 * to the client. The behavior is the following: 192 * 193 * If the client should receive new data (normal clients will) the function 194 * returns C_OK, and make sure to install the write handler in our event 195 * loop so that when the socket is writable new data gets written. 196 * 197 * If the client should not receive new data, because it is a fake client 198 * (used to load AOF in memory), a master or because the setup of the write 199 * handler failed, the function returns C_ERR. 200 * 201 * The function may return C_OK without actually installing the write 202 * event handler in the following cases: 203 * 204 * 1) The event handler should already be installed since the output buffer 205 * already contains something. 206 * 2) The client is a slave but not yet online, so we want to just accumulate 207 * writes in the buffer but not actually sending them yet. 208 * 209 * Typically gets called every time a reply is built, before adding more 210 * data to the clients output buffers. If the function returns C_ERR no 211 * data should be appended to the output buffers. */ 212 int prepareClientToWrite(client *c) { 213 /* If it's the Lua client we always return ok without installing any 214 * handler since there is no socket at all. */ 215 if (c->flags & (CLIENT_LUA|CLIENT_MODULE)) return C_OK; 216 217 /* CLIENT REPLY OFF / SKIP handling: don't send replies. */ 218 if (c->flags & (CLIENT_REPLY_OFF|CLIENT_REPLY_SKIP)) return C_ERR; 219 220 /* Masters don't receive replies, unless CLIENT_MASTER_FORCE_REPLY flag 221 * is set. */ 222 if ((c->flags & CLIENT_MASTER) && 223 !(c->flags & CLIENT_MASTER_FORCE_REPLY)) return C_ERR; 224 225 if (c->fd <= 0) return C_ERR; /* Fake client for AOF loading. */ 226 227 /* Schedule the client to write the output buffers to the socket, unless 228 * it should already be setup to do so (it has already pending data). */ 229 if (!clientHasPendingReplies(c)) clientInstallWriteHandler(c); 230 231 /* Authorize the caller to queue in the output buffer of this client. */ 232 return C_OK; 233 } 234 235 /* ----------------------------------------------------------------------------- 236 * Low level functions to add more data to output buffers. 237 * -------------------------------------------------------------------------- */ 238 239 int _addReplyToBuffer(client *c, const char *s, size_t len) { 240 size_t available = sizeof(c->buf)-c->bufpos; 241 242 if (c->flags & CLIENT_CLOSE_AFTER_REPLY) return C_OK; 243 244 /* If there already are entries in the reply list, we cannot 245 * add anything more to the static buffer. */ 246 if (listLength(c->reply) > 0) return C_ERR; 247 248 /* Check that the buffer has enough space available for this string. */ 249 if (len > available) return C_ERR; 250 251 memcpy(c->buf+c->bufpos,s,len); 252 c->bufpos+=len; 253 return C_OK; 254 } 255 256 void _addReplyStringToList(client *c, const char *s, size_t len) { 257 if (c->flags & CLIENT_CLOSE_AFTER_REPLY) return; 258 259 listNode *ln = listLast(c->reply); 260 clientReplyBlock *tail = ln? listNodeValue(ln): NULL; 261 262 /* Note that 'tail' may be NULL even if we have a tail node, becuase when 263 * addDeferredMultiBulkLength() is used, it sets a dummy node to NULL just 264 * fo fill it later, when the size of the bulk length is set. */ 265 266 /* Append to tail string when possible. */ 267 if (tail) { 268 /* Copy the part we can fit into the tail, and leave the rest for a 269 * new node */ 270 size_t avail = tail->size - tail->used; 271 size_t copy = avail >= len? len: avail; 272 memcpy(tail->buf + tail->used, s, copy); 273 tail->used += copy; 274 s += copy; 275 len -= copy; 276 } 277 if (len) { 278 /* Create a new node, make sure it is allocated to at 279 * least PROTO_REPLY_CHUNK_BYTES */ 280 size_t size = len < PROTO_REPLY_CHUNK_BYTES? PROTO_REPLY_CHUNK_BYTES: len; 281 tail = zmalloc(size + sizeof(clientReplyBlock)); 282 /* take over the allocation's internal fragmentation */ 283 tail->size = zmalloc_usable(tail) - sizeof(clientReplyBlock); 284 tail->used = len; 285 memcpy(tail->buf, s, len); 286 listAddNodeTail(c->reply, tail); 287 c->reply_bytes += tail->size; 288 } 289 asyncCloseClientOnOutputBufferLimitReached(c); 290 } 291 292 /* ----------------------------------------------------------------------------- 293 * Higher level functions to queue data on the client output buffer. 294 * The following functions are the ones that commands implementations will call. 295 * -------------------------------------------------------------------------- */ 296 297 /* Add the object 'obj' string representation to the client output buffer. */ 298 void addReply(client *c, robj *obj) { 299 if (prepareClientToWrite(c) != C_OK) return; 300 301 if (sdsEncodedObject(obj)) { 302 if (_addReplyToBuffer(c,obj->ptr,sdslen(obj->ptr)) != C_OK) 303 _addReplyStringToList(c,obj->ptr,sdslen(obj->ptr)); 304 } else if (obj->encoding == OBJ_ENCODING_INT) { 305 /* For integer encoded strings we just convert it into a string 306 * using our optimized function, and attach the resulting string 307 * to the output buffer. */ 308 char buf[32]; 309 size_t len = ll2string(buf,sizeof(buf),(long)obj->ptr); 310 if (_addReplyToBuffer(c,buf,len) != C_OK) 311 _addReplyStringToList(c,buf,len); 312 } else { 313 serverPanic("Wrong obj->encoding in addReply()"); 314 } 315 } 316 317 /* Add the SDS 's' string to the client output buffer, as a side effect 318 * the SDS string is freed. */ 319 void addReplySds(client *c, sds s) { 320 if (prepareClientToWrite(c) != C_OK) { 321 /* The caller expects the sds to be free'd. */ 322 sdsfree(s); 323 return; 324 } 325 if (_addReplyToBuffer(c,s,sdslen(s)) != C_OK) 326 _addReplyStringToList(c,s,sdslen(s)); 327 sdsfree(s); 328 } 329 330 /* This low level function just adds whatever protocol you send it to the 331 * client buffer, trying the static buffer initially, and using the string 332 * of objects if not possible. 333 * 334 * It is efficient because does not create an SDS object nor an Redis object 335 * if not needed. The object will only be created by calling 336 * _addReplyStringToList() if we fail to extend the existing tail object 337 * in the list of objects. */ 338 void addReplyString(client *c, const char *s, size_t len) { 339 if (prepareClientToWrite(c) != C_OK) return; 340 if (_addReplyToBuffer(c,s,len) != C_OK) 341 _addReplyStringToList(c,s,len); 342 } 343 344 /* Low level function called by the addReplyError...() functions. 345 * It emits the protocol for a Redis error, in the form: 346 * 347 * -ERRORCODE Error Message<CR><LF> 348 * 349 * If the error code is already passed in the string 's', the error 350 * code provided is used, otherwise the string "-ERR " for the generic 351 * error code is automatically added. */ 352 void addReplyErrorLength(client *c, const char *s, size_t len) { 353 /* If the string already starts with "-..." then the error code 354 * is provided by the caller. Otherwise we use "-ERR". */ 355 if (!len || s[0] != '-') addReplyString(c,"-ERR ",5); 356 addReplyString(c,s,len); 357 addReplyString(c,"\r\n",2); 358 359 /* Sometimes it could be normal that a slave replies to a master with 360 * an error and this function gets called. Actually the error will never 361 * be sent because addReply*() against master clients has no effect... 362 * A notable example is: 363 * 364 * EVAL 'redis.call("incr",KEYS[1]); redis.call("nonexisting")' 1 x 365 * 366 * Where the master must propagate the first change even if the second 367 * will produce an error. However it is useful to log such events since 368 * they are rare and may hint at errors in a script or a bug in Redis. */ 369 if (c->flags & (CLIENT_MASTER|CLIENT_SLAVE) && !(c->flags & CLIENT_MONITOR)) { 370 char* to = c->flags & CLIENT_MASTER? "master": "replica"; 371 char* from = c->flags & CLIENT_MASTER? "replica": "master"; 372 char *cmdname = c->lastcmd ? c->lastcmd->name : "<unknown>"; 373 serverLog(LL_WARNING,"== CRITICAL == This %s is sending an error " 374 "to its %s: '%s' after processing the command " 375 "'%s'", from, to, s, cmdname); 376 } 377 } 378 379 void addReplyError(client *c, const char *err) { 380 addReplyErrorLength(c,err,strlen(err)); 381 } 382 383 void addReplyErrorFormat(client *c, const char *fmt, ...) { 384 size_t l, j; 385 va_list ap; 386 va_start(ap,fmt); 387 sds s = sdscatvprintf(sdsempty(),fmt,ap); 388 va_end(ap); 389 /* Make sure there are no newlines in the string, otherwise invalid protocol 390 * is emitted. */ 391 l = sdslen(s); 392 for (j = 0; j < l; j++) { 393 if (s[j] == '\r' || s[j] == '\n') s[j] = ' '; 394 } 395 addReplyErrorLength(c,s,sdslen(s)); 396 sdsfree(s); 397 } 398 399 void addReplyStatusLength(client *c, const char *s, size_t len) { 400 addReplyString(c,"+",1); 401 addReplyString(c,s,len); 402 addReplyString(c,"\r\n",2); 403 } 404 405 void addReplyStatus(client *c, const char *status) { 406 addReplyStatusLength(c,status,strlen(status)); 407 } 408 409 void addReplyStatusFormat(client *c, const char *fmt, ...) { 410 va_list ap; 411 va_start(ap,fmt); 412 sds s = sdscatvprintf(sdsempty(),fmt,ap); 413 va_end(ap); 414 addReplyStatusLength(c,s,sdslen(s)); 415 sdsfree(s); 416 } 417 418 /* Adds an empty object to the reply list that will contain the multi bulk 419 * length, which is not known when this function is called. */ 420 void *addDeferredMultiBulkLength(client *c) { 421 /* Note that we install the write event here even if the object is not 422 * ready to be sent, since we are sure that before returning to the 423 * event loop setDeferredMultiBulkLength() will be called. */ 424 if (prepareClientToWrite(c) != C_OK) return NULL; 425 listAddNodeTail(c->reply,NULL); /* NULL is our placeholder. */ 426 return listLast(c->reply); 427 } 428 429 /* Populate the length object and try gluing it to the next chunk. */ 430 void setDeferredMultiBulkLength(client *c, void *node, long length) { 431 listNode *ln = (listNode*)node; 432 clientReplyBlock *next; 433 char lenstr[128]; 434 size_t lenstr_len = sprintf(lenstr, "*%ld\r\n", length); 435 436 /* Abort when *node is NULL: when the client should not accept writes 437 * we return NULL in addDeferredMultiBulkLength() */ 438 if (node == NULL) return; 439 serverAssert(!listNodeValue(ln)); 440 441 /* Normally we fill this dummy NULL node, added by addDeferredMultiBulkLength(), 442 * with a new buffer structure containing the protocol needed to specify 443 * the length of the array following. However sometimes when there is 444 * little memory to move, we may instead remove this NULL node, and prefix 445 * our protocol in the node immediately after to it, in order to save a 446 * write(2) syscall later. Conditions needed to do it: 447 * 448 * - The next node is non-NULL, 449 * - It has enough room already allocated 450 * - And not too large (avoid large memmove) */ 451 if (ln->next != NULL && (next = listNodeValue(ln->next)) && 452 next->size - next->used >= lenstr_len && 453 next->used < PROTO_REPLY_CHUNK_BYTES * 4) { 454 memmove(next->buf + lenstr_len, next->buf, next->used); 455 memcpy(next->buf, lenstr, lenstr_len); 456 next->used += lenstr_len; 457 listDelNode(c->reply,ln); 458 } else { 459 /* Create a new node */ 460 clientReplyBlock *buf = zmalloc(lenstr_len + sizeof(clientReplyBlock)); 461 /* Take over the allocation's internal fragmentation */ 462 buf->size = zmalloc_usable(buf) - sizeof(clientReplyBlock); 463 buf->used = lenstr_len; 464 memcpy(buf->buf, lenstr, lenstr_len); 465 listNodeValue(ln) = buf; 466 c->reply_bytes += buf->size; 467 } 468 asyncCloseClientOnOutputBufferLimitReached(c); 469 } 470 471 /* Add a double as a bulk reply */ 472 void addReplyDouble(client *c, double d) { 473 char dbuf[128], sbuf[128]; 474 int dlen, slen; 475 if (isinf(d)) { 476 /* Libc in odd systems (Hi Solaris!) will format infinite in a 477 * different way, so better to handle it in an explicit way. */ 478 addReplyBulkCString(c, d > 0 ? "inf" : "-inf"); 479 } else { 480 dlen = snprintf(dbuf,sizeof(dbuf),"%.17g",d); 481 slen = snprintf(sbuf,sizeof(sbuf),"$%d\r\n%s\r\n",dlen,dbuf); 482 addReplyString(c,sbuf,slen); 483 } 484 } 485 486 /* Add a long double as a bulk reply, but uses a human readable formatting 487 * of the double instead of exposing the crude behavior of doubles to the 488 * dear user. */ 489 void addReplyHumanLongDouble(client *c, long double d) { 490 robj *o = createStringObjectFromLongDouble(d,1); 491 addReplyBulk(c,o); 492 decrRefCount(o); 493 } 494 495 /* Add a long long as integer reply or bulk len / multi bulk count. 496 * Basically this is used to output <prefix><long long><crlf>. */ 497 void addReplyLongLongWithPrefix(client *c, long long ll, char prefix) { 498 char buf[128]; 499 int len; 500 501 /* Things like $3\r\n or *2\r\n are emitted very often by the protocol 502 * so we have a few shared objects to use if the integer is small 503 * like it is most of the times. */ 504 if (prefix == '*' && ll < OBJ_SHARED_BULKHDR_LEN && ll >= 0) { 505 addReply(c,shared.mbulkhdr[ll]); 506 return; 507 } else if (prefix == '$' && ll < OBJ_SHARED_BULKHDR_LEN && ll >= 0) { 508 addReply(c,shared.bulkhdr[ll]); 509 return; 510 } 511 512 buf[0] = prefix; 513 len = ll2string(buf+1,sizeof(buf)-1,ll); 514 buf[len+1] = '\r'; 515 buf[len+2] = '\n'; 516 addReplyString(c,buf,len+3); 517 } 518 519 void addReplyLongLong(client *c, long long ll) { 520 if (ll == 0) 521 addReply(c,shared.czero); 522 else if (ll == 1) 523 addReply(c,shared.cone); 524 else 525 addReplyLongLongWithPrefix(c,ll,':'); 526 } 527 528 void addReplyMultiBulkLen(client *c, long length) { 529 if (length < OBJ_SHARED_BULKHDR_LEN) 530 addReply(c,shared.mbulkhdr[length]); 531 else 532 addReplyLongLongWithPrefix(c,length,'*'); 533 } 534 535 /* Create the length prefix of a bulk reply, example: $2234 */ 536 void addReplyBulkLen(client *c, robj *obj) { 537 size_t len; 538 539 if (sdsEncodedObject(obj)) { 540 len = sdslen(obj->ptr); 541 } else { 542 long n = (long)obj->ptr; 543 544 /* Compute how many bytes will take this integer as a radix 10 string */ 545 len = 1; 546 if (n < 0) { 547 len++; 548 n = -n; 549 } 550 while((n = n/10) != 0) { 551 len++; 552 } 553 } 554 555 if (len < OBJ_SHARED_BULKHDR_LEN) 556 addReply(c,shared.bulkhdr[len]); 557 else 558 addReplyLongLongWithPrefix(c,len,'$'); 559 } 560 561 /* Add a Redis Object as a bulk reply */ 562 void addReplyBulk(client *c, robj *obj) { 563 addReplyBulkLen(c,obj); 564 addReply(c,obj); 565 addReply(c,shared.crlf); 566 } 567 568 /* Add a C buffer as bulk reply */ 569 void addReplyBulkCBuffer(client *c, const void *p, size_t len) { 570 addReplyLongLongWithPrefix(c,len,'$'); 571 addReplyString(c,p,len); 572 addReply(c,shared.crlf); 573 } 574 575 /* Add sds to reply (takes ownership of sds and frees it) */ 576 void addReplyBulkSds(client *c, sds s) { 577 addReplyLongLongWithPrefix(c,sdslen(s),'$'); 578 addReplySds(c,s); 579 addReply(c,shared.crlf); 580 } 581 582 /* Add a C null term string as bulk reply */ 583 void addReplyBulkCString(client *c, const char *s) { 584 if (s == NULL) { 585 addReply(c,shared.nullbulk); 586 } else { 587 addReplyBulkCBuffer(c,s,strlen(s)); 588 } 589 } 590 591 /* Add a long long as a bulk reply */ 592 void addReplyBulkLongLong(client *c, long long ll) { 593 char buf[64]; 594 int len; 595 596 len = ll2string(buf,64,ll); 597 addReplyBulkCBuffer(c,buf,len); 598 } 599 600 /* Add an array of C strings as status replies with a heading. 601 * This function is typically invoked by from commands that support 602 * subcommands in response to the 'help' subcommand. The help array 603 * is terminated by NULL sentinel. */ 604 void addReplyHelp(client *c, const char **help) { 605 sds cmd = sdsnew((char*) c->argv[0]->ptr); 606 void *blenp = addDeferredMultiBulkLength(c); 607 int blen = 0; 608 609 sdstoupper(cmd); 610 addReplyStatusFormat(c, 611 "%s <subcommand> arg arg ... arg. Subcommands are:",cmd); 612 sdsfree(cmd); 613 614 while (help[blen]) addReplyStatus(c,help[blen++]); 615 616 blen++; /* Account for the header line(s). */ 617 setDeferredMultiBulkLength(c,blenp,blen); 618 } 619 620 /* Add a suggestive error reply. 621 * This function is typically invoked by from commands that support 622 * subcommands in response to an unknown subcommand or argument error. */ 623 void addReplySubcommandSyntaxError(client *c) { 624 sds cmd = sdsnew((char*) c->argv[0]->ptr); 625 sdstoupper(cmd); 626 addReplyErrorFormat(c, 627 "Unknown subcommand or wrong number of arguments for '%s'. Try %s HELP.", 628 (char*)c->argv[1]->ptr,cmd); 629 sdsfree(cmd); 630 } 631 632 /* Append 'src' client output buffers into 'dst' client output buffers. 633 * This function clears the output buffers of 'src' */ 634 void AddReplyFromClient(client *dst, client *src) { 635 if (prepareClientToWrite(dst) != C_OK) 636 return; 637 addReplyString(dst,src->buf, src->bufpos); 638 if (listLength(src->reply)) 639 listJoin(dst->reply,src->reply); 640 dst->reply_bytes += src->reply_bytes; 641 src->reply_bytes = 0; 642 src->bufpos = 0; 643 } 644 645 /* Copy 'src' client output buffers into 'dst' client output buffers. 646 * The function takes care of freeing the old output buffers of the 647 * destination client. */ 648 void copyClientOutputBuffer(client *dst, client *src) { 649 listRelease(dst->reply); 650 dst->sentlen = 0; 651 dst->reply = listDup(src->reply); 652 memcpy(dst->buf,src->buf,src->bufpos); 653 dst->bufpos = src->bufpos; 654 dst->reply_bytes = src->reply_bytes; 655 } 656 657 /* Return true if the specified client has pending reply buffers to write to 658 * the socket. */ 659 int clientHasPendingReplies(client *c) { 660 return c->bufpos || listLength(c->reply); 661 } 662 663 #define MAX_ACCEPTS_PER_CALL 500 664 static void acceptCommonHandler(int fd, int flags, char *ip) { 665 client *c; 666 if ((c = createClient(fd)) == NULL) { 667 serverLog(LL_WARNING, 668 "Error registering fd event for the new client: %s (fd=%d)", 669 strerror(errno),fd); 670 close(fd); /* May be already closed, just ignore errors */ 671 return; 672 } 673 /* If maxclient directive is set and this is one client more... close the 674 * connection. Note that we create the client instead to check before 675 * for this condition, since now the socket is already set in non-blocking 676 * mode and we can send an error for free using the Kernel I/O */ 677 if (listLength(server.clients) > server.maxclients) { 678 char *err = "-ERR max number of clients reached\r\n"; 679 680 /* That's a best effort error message, don't check write errors */ 681 if (write(c->fd,err,strlen(err)) == -1) { 682 /* Nothing to do, Just to avoid the warning... */ 683 } 684 server.stat_rejected_conn++; 685 freeClient(c); 686 return; 687 } 688 689 /* If the server is running in protected mode (the default) and there 690 * is no password set, nor a specific interface is bound, we don't accept 691 * requests from non loopback interfaces. Instead we try to explain the 692 * user what to do to fix it if needed. */ 693 if (server.protected_mode && 694 server.bindaddr_count == 0 && 695 server.requirepass == NULL && 696 !(flags & CLIENT_UNIX_SOCKET) && 697 ip != NULL) 698 { 699 if (strcmp(ip,"127.0.0.1") && strcmp(ip,"::1")) { 700 char *err = 701 "-DENIED Redis is running in protected mode because protected " 702 "mode is enabled, no bind address was specified, no " 703 "authentication password is requested to clients. In this mode " 704 "connections are only accepted from the loopback interface. " 705 "If you want to connect from external computers to Redis you " 706 "may adopt one of the following solutions: " 707 "1) Just disable protected mode sending the command " 708 "'CONFIG SET protected-mode no' from the loopback interface " 709 "by connecting to Redis from the same host the server is " 710 "running, however MAKE SURE Redis is not publicly accessible " 711 "from internet if you do so. Use CONFIG REWRITE to make this " 712 "change permanent. " 713 "2) Alternatively you can just disable the protected mode by " 714 "editing the Redis configuration file, and setting the protected " 715 "mode option to 'no', and then restarting the server. " 716 "3) If you started the server manually just for testing, restart " 717 "it with the '--protected-mode no' option. " 718 "4) Setup a bind address or an authentication password. " 719 "NOTE: You only need to do one of the above things in order for " 720 "the server to start accepting connections from the outside.\r\n"; 721 if (write(c->fd,err,strlen(err)) == -1) { 722 /* Nothing to do, Just to avoid the warning... */ 723 } 724 server.stat_rejected_conn++; 725 freeClient(c); 726 return; 727 } 728 } 729 730 server.stat_numconnections++; 731 c->flags |= flags; 732 } 733 734 void acceptTcpHandler(aeEventLoop *el, int fd, void *privdata, int mask) { 735 int cport, cfd, max = MAX_ACCEPTS_PER_CALL; 736 char cip[NET_IP_STR_LEN]; 737 UNUSED(el); 738 UNUSED(mask); 739 UNUSED(privdata); 740 741 while(max--) { 742 cfd = anetTcpAccept(server.neterr, fd, cip, sizeof(cip), &cport); 743 if (cfd == ANET_ERR) { 744 if (errno != EWOULDBLOCK) 745 serverLog(LL_WARNING, 746 "Accepting client connection: %s", server.neterr); 747 return; 748 } 749 serverLog(LL_VERBOSE,"Accepted %s:%d", cip, cport); 750 acceptCommonHandler(cfd,0,cip); 751 } 752 } 753 754 void acceptUnixHandler(aeEventLoop *el, int fd, void *privdata, int mask) { 755 int cfd, max = MAX_ACCEPTS_PER_CALL; 756 UNUSED(el); 757 UNUSED(mask); 758 UNUSED(privdata); 759 760 while(max--) { 761 cfd = anetUnixAccept(server.neterr, fd); 762 if (cfd == ANET_ERR) { 763 if (errno != EWOULDBLOCK) 764 serverLog(LL_WARNING, 765 "Accepting client connection: %s", server.neterr); 766 return; 767 } 768 serverLog(LL_VERBOSE,"Accepted connection to %s", server.unixsocket); 769 acceptCommonHandler(cfd,CLIENT_UNIX_SOCKET,NULL); 770 } 771 } 772 773 static void freeClientArgv(client *c) { 774 int j; 775 for (j = 0; j < c->argc; j++) 776 decrRefCount(c->argv[j]); 777 c->argc = 0; 778 c->cmd = NULL; 779 } 780 781 /* Close all the slaves connections. This is useful in chained replication 782 * when we resync with our own master and want to force all our slaves to 783 * resync with us as well. */ 784 void disconnectSlaves(void) { 785 while (listLength(server.slaves)) { 786 listNode *ln = listFirst(server.slaves); 787 freeClient((client*)ln->value); 788 } 789 } 790 791 /* Remove the specified client from global lists where the client could 792 * be referenced, not including the Pub/Sub channels. 793 * This is used by freeClient() and replicationCacheMaster(). */ 794 void unlinkClient(client *c) { 795 listNode *ln; 796 797 /* If this is marked as current client unset it. */ 798 if (server.current_client == c) server.current_client = NULL; 799 800 /* Certain operations must be done only if the client has an active socket. 801 * If the client was already unlinked or if it's a "fake client" the 802 * fd is already set to -1. */ 803 if (c->fd != -1) { 804 /* Remove from the list of active clients. */ 805 if (c->client_list_node) { 806 uint64_t id = htonu64(c->id); 807 raxRemove(server.clients_index,(unsigned char*)&id,sizeof(id),NULL); 808 listDelNode(server.clients,c->client_list_node); 809 c->client_list_node = NULL; 810 } 811 812 /* In the case of diskless replication the fork is writing to the 813 * sockets and just closing the fd isn't enough, if we don't also 814 * shutdown the socket the fork will continue to write to the slave 815 * and the salve will only find out that it was disconnected when 816 * it will finish reading the rdb. */ 817 if ((c->flags & CLIENT_SLAVE) && 818 (c->replstate == SLAVE_STATE_WAIT_BGSAVE_END)) { 819 shutdown(c->fd, SHUT_RDWR); 820 } 821 822 /* Unregister async I/O handlers and close the socket. */ 823 aeDeleteFileEvent(server.el,c->fd,AE_READABLE); 824 aeDeleteFileEvent(server.el,c->fd,AE_WRITABLE); 825 close(c->fd); 826 c->fd = -1; 827 } 828 829 /* Remove from the list of pending writes if needed. */ 830 if (c->flags & CLIENT_PENDING_WRITE) { 831 ln = listSearchKey(server.clients_pending_write,c); 832 serverAssert(ln != NULL); 833 listDelNode(server.clients_pending_write,ln); 834 c->flags &= ~CLIENT_PENDING_WRITE; 835 } 836 837 /* When client was just unblocked because of a blocking operation, 838 * remove it from the list of unblocked clients. */ 839 if (c->flags & CLIENT_UNBLOCKED) { 840 ln = listSearchKey(server.unblocked_clients,c); 841 serverAssert(ln != NULL); 842 listDelNode(server.unblocked_clients,ln); 843 c->flags &= ~CLIENT_UNBLOCKED; 844 } 845 } 846 847 void freeClient(client *c) { 848 listNode *ln; 849 850 /* If a client is protected, yet we need to free it right now, make sure 851 * to at least use asynchronous freeing. */ 852 if (c->flags & CLIENT_PROTECTED) { 853 freeClientAsync(c); 854 return; 855 } 856 857 /* If it is our master that's beging disconnected we should make sure 858 * to cache the state to try a partial resynchronization later. 859 * 860 * Note that before doing this we make sure that the client is not in 861 * some unexpected state, by checking its flags. */ 862 if (server.master && c->flags & CLIENT_MASTER) { 863 serverLog(LL_WARNING,"Connection with master lost."); 864 if (!(c->flags & (CLIENT_CLOSE_AFTER_REPLY| 865 CLIENT_CLOSE_ASAP| 866 CLIENT_BLOCKED))) 867 { 868 replicationCacheMaster(c); 869 return; 870 } 871 } 872 873 /* Log link disconnection with slave */ 874 if ((c->flags & CLIENT_SLAVE) && !(c->flags & CLIENT_MONITOR)) { 875 serverLog(LL_WARNING,"Connection with replica %s lost.", 876 replicationGetSlaveName(c)); 877 } 878 879 /* Free the query buffer */ 880 sdsfree(c->querybuf); 881 sdsfree(c->pending_querybuf); 882 c->querybuf = NULL; 883 884 /* Deallocate structures used to block on blocking ops. */ 885 if (c->flags & CLIENT_BLOCKED) unblockClient(c); 886 dictRelease(c->bpop.keys); 887 888 /* UNWATCH all the keys */ 889 unwatchAllKeys(c); 890 listRelease(c->watched_keys); 891 892 /* Unsubscribe from all the pubsub channels */ 893 pubsubUnsubscribeAllChannels(c,0); 894 pubsubUnsubscribeAllPatterns(c,0); 895 dictRelease(c->pubsub_channels); 896 listRelease(c->pubsub_patterns); 897 898 /* Free data structures. */ 899 listRelease(c->reply); 900 freeClientArgv(c); 901 902 /* Unlink the client: this will close the socket, remove the I/O 903 * handlers, and remove references of the client from different 904 * places where active clients may be referenced. */ 905 unlinkClient(c); 906 907 /* Master/slave cleanup Case 1: 908 * we lost the connection with a slave. */ 909 if (c->flags & CLIENT_SLAVE) { 910 if (c->replstate == SLAVE_STATE_SEND_BULK) { 911 if (c->repldbfd != -1) close(c->repldbfd); 912 if (c->replpreamble) sdsfree(c->replpreamble); 913 } 914 list *l = (c->flags & CLIENT_MONITOR) ? server.monitors : server.slaves; 915 ln = listSearchKey(l,c); 916 serverAssert(ln != NULL); 917 listDelNode(l,ln); 918 /* We need to remember the time when we started to have zero 919 * attached slaves, as after some time we'll free the replication 920 * backlog. */ 921 if (c->flags & CLIENT_SLAVE && listLength(server.slaves) == 0) 922 server.repl_no_slaves_since = server.unixtime; 923 refreshGoodSlavesCount(); 924 } 925 926 /* Master/slave cleanup Case 2: 927 * we lost the connection with the master. */ 928 if (c->flags & CLIENT_MASTER) replicationHandleMasterDisconnection(); 929 930 /* If this client was scheduled for async freeing we need to remove it 931 * from the queue. */ 932 if (c->flags & CLIENT_CLOSE_ASAP) { 933 ln = listSearchKey(server.clients_to_close,c); 934 serverAssert(ln != NULL); 935 listDelNode(server.clients_to_close,ln); 936 } 937 938 /* Release other dynamically allocated client structure fields, 939 * and finally release the client structure itself. */ 940 if (c->name) decrRefCount(c->name); 941 zfree(c->argv); 942 freeClientMultiState(c); 943 sdsfree(c->peerid); 944 zfree(c); 945 } 946 947 /* Schedule a client to free it at a safe time in the serverCron() function. 948 * This function is useful when we need to terminate a client but we are in 949 * a context where calling freeClient() is not possible, because the client 950 * should be valid for the continuation of the flow of the program. */ 951 void freeClientAsync(client *c) { 952 if (c->flags & CLIENT_CLOSE_ASAP || c->flags & CLIENT_LUA) return; 953 c->flags |= CLIENT_CLOSE_ASAP; 954 listAddNodeTail(server.clients_to_close,c); 955 } 956 957 void freeClientsInAsyncFreeQueue(void) { 958 while (listLength(server.clients_to_close)) { 959 listNode *ln = listFirst(server.clients_to_close); 960 client *c = listNodeValue(ln); 961 962 c->flags &= ~CLIENT_CLOSE_ASAP; 963 freeClient(c); 964 listDelNode(server.clients_to_close,ln); 965 } 966 } 967 968 /* Return a client by ID, or NULL if the client ID is not in the set 969 * of registered clients. Note that "fake clients", created with -1 as FD, 970 * are not registered clients. */ 971 client *lookupClientByID(uint64_t id) { 972 id = htonu64(id); 973 client *c = raxFind(server.clients_index,(unsigned char*)&id,sizeof(id)); 974 return (c == raxNotFound) ? NULL : c; 975 } 976 977 /* Write data in output buffers to client. Return C_OK if the client 978 * is still valid after the call, C_ERR if it was freed. */ 979 int writeToClient(int fd, client *c, int handler_installed) { 980 ssize_t nwritten = 0, totwritten = 0; 981 size_t objlen; 982 clientReplyBlock *o; 983 984 while(clientHasPendingReplies(c)) { 985 if (c->bufpos > 0) { 986 nwritten = write(fd,c->buf+c->sentlen,c->bufpos-c->sentlen); 987 if (nwritten <= 0) break; 988 c->sentlen += nwritten; 989 totwritten += nwritten; 990 991 /* If the buffer was sent, set bufpos to zero to continue with 992 * the remainder of the reply. */ 993 if ((int)c->sentlen == c->bufpos) { 994 c->bufpos = 0; 995 c->sentlen = 0; 996 } 997 } else { 998 o = listNodeValue(listFirst(c->reply)); 999 objlen = o->used; 1000 1001 if (objlen == 0) { 1002 c->reply_bytes -= o->size; 1003 listDelNode(c->reply,listFirst(c->reply)); 1004 continue; 1005 } 1006 1007 nwritten = write(fd, o->buf + c->sentlen, objlen - c->sentlen); 1008 if (nwritten <= 0) break; 1009 c->sentlen += nwritten; 1010 totwritten += nwritten; 1011 1012 /* If we fully sent the object on head go to the next one */ 1013 if (c->sentlen == objlen) { 1014 c->reply_bytes -= o->size; 1015 listDelNode(c->reply,listFirst(c->reply)); 1016 c->sentlen = 0; 1017 /* If there are no longer objects in the list, we expect 1018 * the count of reply bytes to be exactly zero. */ 1019 if (listLength(c->reply) == 0) 1020 serverAssert(c->reply_bytes == 0); 1021 } 1022 } 1023 /* Note that we avoid to send more than NET_MAX_WRITES_PER_EVENT 1024 * bytes, in a single threaded server it's a good idea to serve 1025 * other clients as well, even if a very large request comes from 1026 * super fast link that is always able to accept data (in real world 1027 * scenario think about 'KEYS *' against the loopback interface). 1028 * 1029 * However if we are over the maxmemory limit we ignore that and 1030 * just deliver as much data as it is possible to deliver. 1031 * 1032 * Moreover, we also send as much as possible if the client is 1033 * a slave (otherwise, on high-speed traffic, the replication 1034 * buffer will grow indefinitely) */ 1035 if (totwritten > NET_MAX_WRITES_PER_EVENT && 1036 (server.maxmemory == 0 || 1037 zmalloc_used_memory() < server.maxmemory) && 1038 !(c->flags & CLIENT_SLAVE)) break; 1039 } 1040 server.stat_net_output_bytes += totwritten; 1041 if (nwritten == -1) { 1042 if (errno == EAGAIN) { 1043 nwritten = 0; 1044 } else { 1045 serverLog(LL_VERBOSE, 1046 "Error writing to client: %s", strerror(errno)); 1047 freeClient(c); 1048 return C_ERR; 1049 } 1050 } 1051 if (totwritten > 0) { 1052 /* For clients representing masters we don't count sending data 1053 * as an interaction, since we always send REPLCONF ACK commands 1054 * that take some time to just fill the socket output buffer. 1055 * We just rely on data / pings received for timeout detection. */ 1056 if (!(c->flags & CLIENT_MASTER)) c->lastinteraction = server.unixtime; 1057 } 1058 if (!clientHasPendingReplies(c)) { 1059 c->sentlen = 0; 1060 if (handler_installed) aeDeleteFileEvent(server.el,c->fd,AE_WRITABLE); 1061 1062 /* Close connection after entire reply has been sent. */ 1063 if (c->flags & CLIENT_CLOSE_AFTER_REPLY) { 1064 freeClient(c); 1065 return C_ERR; 1066 } 1067 } 1068 return C_OK; 1069 } 1070 1071 /* Write event handler. Just send data to the client. */ 1072 void sendReplyToClient(aeEventLoop *el, int fd, void *privdata, int mask) { 1073 UNUSED(el); 1074 UNUSED(mask); 1075 writeToClient(fd,privdata,1); 1076 } 1077 1078 /* This function is called just before entering the event loop, in the hope 1079 * we can just write the replies to the client output buffer without any 1080 * need to use a syscall in order to install the writable event handler, 1081 * get it called, and so forth. */ 1082 int handleClientsWithPendingWrites(void) { 1083 listIter li; 1084 listNode *ln; 1085 int processed = listLength(server.clients_pending_write); 1086 1087 listRewind(server.clients_pending_write,&li); 1088 while((ln = listNext(&li))) { 1089 client *c = listNodeValue(ln); 1090 c->flags &= ~CLIENT_PENDING_WRITE; 1091 listDelNode(server.clients_pending_write,ln); 1092 1093 /* If a client is protected, don't do anything, 1094 * that may trigger write error or recreate handler. */ 1095 if (c->flags & CLIENT_PROTECTED) continue; 1096 1097 /* Try to write buffers to the client socket. */ 1098 if (writeToClient(c->fd,c,0) == C_ERR) continue; 1099 1100 /* If after the synchronous writes above we still have data to 1101 * output to the client, we need to install the writable handler. */ 1102 if (clientHasPendingReplies(c)) { 1103 int ae_flags = AE_WRITABLE; 1104 /* For the fsync=always policy, we want that a given FD is never 1105 * served for reading and writing in the same event loop iteration, 1106 * so that in the middle of receiving the query, and serving it 1107 * to the client, we'll call beforeSleep() that will do the 1108 * actual fsync of AOF to disk. AE_BARRIER ensures that. */ 1109 if (server.aof_state == AOF_ON && 1110 server.aof_fsync == AOF_FSYNC_ALWAYS) 1111 { 1112 ae_flags |= AE_BARRIER; 1113 } 1114 if (aeCreateFileEvent(server.el, c->fd, ae_flags, 1115 sendReplyToClient, c) == AE_ERR) 1116 { 1117 freeClientAsync(c); 1118 } 1119 } 1120 } 1121 return processed; 1122 } 1123 1124 /* resetClient prepare the client to process the next command */ 1125 void resetClient(client *c) { 1126 redisCommandProc *prevcmd = c->cmd ? c->cmd->proc : NULL; 1127 1128 freeClientArgv(c); 1129 c->reqtype = 0; 1130 c->multibulklen = 0; 1131 c->bulklen = -1; 1132 1133 /* We clear the ASKING flag as well if we are not inside a MULTI, and 1134 * if what we just executed is not the ASKING command itself. */ 1135 if (!(c->flags & CLIENT_MULTI) && prevcmd != askingCommand) 1136 c->flags &= ~CLIENT_ASKING; 1137 1138 /* Remove the CLIENT_REPLY_SKIP flag if any so that the reply 1139 * to the next command will be sent, but set the flag if the command 1140 * we just processed was "CLIENT REPLY SKIP". */ 1141 c->flags &= ~CLIENT_REPLY_SKIP; 1142 if (c->flags & CLIENT_REPLY_SKIP_NEXT) { 1143 c->flags |= CLIENT_REPLY_SKIP; 1144 c->flags &= ~CLIENT_REPLY_SKIP_NEXT; 1145 } 1146 } 1147 1148 /* This funciton is used when we want to re-enter the event loop but there 1149 * is the risk that the client we are dealing with will be freed in some 1150 * way. This happens for instance in: 1151 * 1152 * * DEBUG RELOAD and similar. 1153 * * When a Lua script is in -BUSY state. 1154 * 1155 * So the function will protect the client by doing two things: 1156 * 1157 * 1) It removes the file events. This way it is not possible that an 1158 * error is signaled on the socket, freeing the client. 1159 * 2) Moreover it makes sure that if the client is freed in a different code 1160 * path, it is not really released, but only marked for later release. */ 1161 void protectClient(client *c) { 1162 c->flags |= CLIENT_PROTECTED; 1163 aeDeleteFileEvent(server.el,c->fd,AE_READABLE); 1164 aeDeleteFileEvent(server.el,c->fd,AE_WRITABLE); 1165 } 1166 1167 /* This will undo the client protection done by protectClient() */ 1168 void unprotectClient(client *c) { 1169 if (c->flags & CLIENT_PROTECTED) { 1170 c->flags &= ~CLIENT_PROTECTED; 1171 aeCreateFileEvent(server.el,c->fd,AE_READABLE,readQueryFromClient,c); 1172 if (clientHasPendingReplies(c)) clientInstallWriteHandler(c); 1173 } 1174 } 1175 1176 /* Like processMultibulkBuffer(), but for the inline protocol instead of RESP, 1177 * this function consumes the client query buffer and creates a command ready 1178 * to be executed inside the client structure. Returns C_OK if the command 1179 * is ready to be executed, or C_ERR if there is still protocol to read to 1180 * have a well formed command. The function also returns C_ERR when there is 1181 * a protocol error: in such a case the client structure is setup to reply 1182 * with the error and close the connection. */ 1183 int processInlineBuffer(client *c) { 1184 char *newline; 1185 int argc, j, linefeed_chars = 1; 1186 sds *argv, aux; 1187 size_t querylen; 1188 1189 /* Search for end of line */ 1190 newline = strchr(c->querybuf+c->qb_pos,'\n'); 1191 1192 /* Nothing to do without a \r\n */ 1193 if (newline == NULL) { 1194 if (sdslen(c->querybuf)-c->qb_pos > PROTO_INLINE_MAX_SIZE) { 1195 addReplyError(c,"Protocol error: too big inline request"); 1196 setProtocolError("too big inline request",c); 1197 } 1198 return C_ERR; 1199 } 1200 1201 /* Handle the \r\n case. */ 1202 if (newline && newline != c->querybuf+c->qb_pos && *(newline-1) == '\r') 1203 newline--, linefeed_chars++; 1204 1205 /* Split the input buffer up to the \r\n */ 1206 querylen = newline-(c->querybuf+c->qb_pos); 1207 aux = sdsnewlen(c->querybuf+c->qb_pos,querylen); 1208 argv = sdssplitargs(aux,&argc); 1209 sdsfree(aux); 1210 if (argv == NULL) { 1211 addReplyError(c,"Protocol error: unbalanced quotes in request"); 1212 setProtocolError("unbalanced quotes in inline request",c); 1213 return C_ERR; 1214 } 1215 1216 /* Newline from slaves can be used to refresh the last ACK time. 1217 * This is useful for a slave to ping back while loading a big 1218 * RDB file. */ 1219 if (querylen == 0 && c->flags & CLIENT_SLAVE) 1220 c->repl_ack_time = server.unixtime; 1221 1222 /* Move querybuffer position to the next query in the buffer. */ 1223 c->qb_pos += querylen+linefeed_chars; 1224 1225 /* Setup argv array on client structure */ 1226 if (argc) { 1227 if (c->argv) zfree(c->argv); 1228 c->argv = zmalloc(sizeof(robj*)*argc); 1229 } 1230 1231 /* Create redis objects for all arguments. */ 1232 for (c->argc = 0, j = 0; j < argc; j++) { 1233 if (sdslen(argv[j])) { 1234 c->argv[c->argc] = createObject(OBJ_STRING,argv[j]); 1235 c->argc++; 1236 } else { 1237 sdsfree(argv[j]); 1238 } 1239 } 1240 zfree(argv); 1241 return C_OK; 1242 } 1243 1244 /* Helper function. Record protocol erro details in server log, 1245 * and set the client as CLIENT_CLOSE_AFTER_REPLY. */ 1246 #define PROTO_DUMP_LEN 128 1247 static void setProtocolError(const char *errstr, client *c) { 1248 if (server.verbosity <= LL_VERBOSE) { 1249 sds client = catClientInfoString(sdsempty(),c); 1250 1251 /* Sample some protocol to given an idea about what was inside. */ 1252 char buf[256]; 1253 if (sdslen(c->querybuf)-c->qb_pos < PROTO_DUMP_LEN) { 1254 snprintf(buf,sizeof(buf),"Query buffer during protocol error: '%s'", c->querybuf+c->qb_pos); 1255 } else { 1256 snprintf(buf,sizeof(buf),"Query buffer during protocol error: '%.*s' (... more %zu bytes ...) '%.*s'", PROTO_DUMP_LEN/2, c->querybuf+c->qb_pos, sdslen(c->querybuf)-c->qb_pos-PROTO_DUMP_LEN, PROTO_DUMP_LEN/2, c->querybuf+sdslen(c->querybuf)-PROTO_DUMP_LEN/2); 1257 } 1258 1259 /* Remove non printable chars. */ 1260 char *p = buf; 1261 while (*p != '\0') { 1262 if (!isprint(*p)) *p = '.'; 1263 p++; 1264 } 1265 1266 /* Log all the client and protocol info. */ 1267 serverLog(LL_VERBOSE, 1268 "Protocol error (%s) from client: %s. %s", errstr, client, buf); 1269 sdsfree(client); 1270 } 1271 c->flags |= CLIENT_CLOSE_AFTER_REPLY; 1272 } 1273 1274 /* Process the query buffer for client 'c', setting up the client argument 1275 * vector for command execution. Returns C_OK if after running the function 1276 * the client has a well-formed ready to be processed command, otherwise 1277 * C_ERR if there is still to read more buffer to get the full command. 1278 * The function also returns C_ERR when there is a protocol error: in such a 1279 * case the client structure is setup to reply with the error and close 1280 * the connection. 1281 * 1282 * This function is called if processInputBuffer() detects that the next 1283 * command is in RESP format, so the first byte in the command is found 1284 * to be '*'. Otherwise for inline commands processInlineBuffer() is called. */ 1285 int processMultibulkBuffer(client *c) { 1286 char *newline = NULL; 1287 int ok; 1288 long long ll; 1289 1290 if (c->multibulklen == 0) { 1291 /* The client should have been reset */ 1292 serverAssertWithInfo(c,NULL,c->argc == 0); 1293 1294 /* Multi bulk length cannot be read without a \r\n */ 1295 newline = strchr(c->querybuf+c->qb_pos,'\r'); 1296 if (newline == NULL) { 1297 if (sdslen(c->querybuf)-c->qb_pos > PROTO_INLINE_MAX_SIZE) { 1298 addReplyError(c,"Protocol error: too big mbulk count string"); 1299 setProtocolError("too big mbulk count string",c); 1300 } 1301 return C_ERR; 1302 } 1303 1304 /* Buffer should also contain \n */ 1305 if (newline-(c->querybuf+c->qb_pos) > (ssize_t)(sdslen(c->querybuf)-c->qb_pos-2)) 1306 return C_ERR; 1307 1308 /* We know for sure there is a whole line since newline != NULL, 1309 * so go ahead and find out the multi bulk length. */ 1310 serverAssertWithInfo(c,NULL,c->querybuf[c->qb_pos] == '*'); 1311 ok = string2ll(c->querybuf+1+c->qb_pos,newline-(c->querybuf+1+c->qb_pos),&ll); 1312 if (!ok || ll > 1024*1024) { 1313 addReplyError(c,"Protocol error: invalid multibulk length"); 1314 setProtocolError("invalid mbulk count",c); 1315 return C_ERR; 1316 } 1317 1318 c->qb_pos = (newline-c->querybuf)+2; 1319 1320 if (ll <= 0) return C_OK; 1321 1322 c->multibulklen = ll; 1323 1324 /* Setup argv array on client structure */ 1325 if (c->argv) zfree(c->argv); 1326 c->argv = zmalloc(sizeof(robj*)*c->multibulklen); 1327 } 1328 1329 serverAssertWithInfo(c,NULL,c->multibulklen > 0); 1330 while(c->multibulklen) { 1331 /* Read bulk length if unknown */ 1332 if (c->bulklen == -1) { 1333 newline = strchr(c->querybuf+c->qb_pos,'\r'); 1334 if (newline == NULL) { 1335 if (sdslen(c->querybuf)-c->qb_pos > PROTO_INLINE_MAX_SIZE) { 1336 addReplyError(c, 1337 "Protocol error: too big bulk count string"); 1338 setProtocolError("too big bulk count string",c); 1339 return C_ERR; 1340 } 1341 break; 1342 } 1343 1344 /* Buffer should also contain \n */ 1345 if (newline-(c->querybuf+c->qb_pos) > (ssize_t)(sdslen(c->querybuf)-c->qb_pos-2)) 1346 break; 1347 1348 if (c->querybuf[c->qb_pos] != '$') { 1349 addReplyErrorFormat(c, 1350 "Protocol error: expected '$', got '%c'", 1351 c->querybuf[c->qb_pos]); 1352 setProtocolError("expected $ but got something else",c); 1353 return C_ERR; 1354 } 1355 1356 ok = string2ll(c->querybuf+c->qb_pos+1,newline-(c->querybuf+c->qb_pos+1),&ll); 1357 if (!ok || ll < 0 || ll > server.proto_max_bulk_len) { 1358 addReplyError(c,"Protocol error: invalid bulk length"); 1359 setProtocolError("invalid bulk length",c); 1360 return C_ERR; 1361 } 1362 1363 c->qb_pos = newline-c->querybuf+2; 1364 if (ll >= PROTO_MBULK_BIG_ARG) { 1365 /* If we are going to read a large object from network 1366 * try to make it likely that it will start at c->querybuf 1367 * boundary so that we can optimize object creation 1368 * avoiding a large copy of data. 1369 * 1370 * But only when the data we have not parsed is less than 1371 * or equal to ll+2. If the data length is greater than 1372 * ll+2, trimming querybuf is just a waste of time, because 1373 * at this time the querybuf contains not only our bulk. */ 1374 if (sdslen(c->querybuf)-c->qb_pos <= (size_t)ll+2) { 1375 sdsrange(c->querybuf,c->qb_pos,-1); 1376 c->qb_pos = 0; 1377 /* Hint the sds library about the amount of bytes this string is 1378 * going to contain. */ 1379 c->querybuf = sdsMakeRoomFor(c->querybuf,ll+2); 1380 } 1381 } 1382 c->bulklen = ll; 1383 } 1384 1385 /* Read bulk argument */ 1386 if (sdslen(c->querybuf)-c->qb_pos < (size_t)(c->bulklen+2)) { 1387 /* Not enough data (+2 == trailing \r\n) */ 1388 break; 1389 } else { 1390 /* Optimization: if the buffer contains JUST our bulk element 1391 * instead of creating a new object by *copying* the sds we 1392 * just use the current sds string. */ 1393 if (c->qb_pos == 0 && 1394 c->bulklen >= PROTO_MBULK_BIG_ARG && 1395 sdslen(c->querybuf) == (size_t)(c->bulklen+2)) 1396 { 1397 c->argv[c->argc++] = createObject(OBJ_STRING,c->querybuf); 1398 sdsIncrLen(c->querybuf,-2); /* remove CRLF */ 1399 /* Assume that if we saw a fat argument we'll see another one 1400 * likely... */ 1401 c->querybuf = sdsnewlen(SDS_NOINIT,c->bulklen+2); 1402 sdsclear(c->querybuf); 1403 } else { 1404 c->argv[c->argc++] = 1405 createStringObject(c->querybuf+c->qb_pos,c->bulklen); 1406 c->qb_pos += c->bulklen+2; 1407 } 1408 c->bulklen = -1; 1409 c->multibulklen--; 1410 } 1411 } 1412 1413 /* We're done when c->multibulk == 0 */ 1414 if (c->multibulklen == 0) return C_OK; 1415 1416 /* Still not ready to process the command */ 1417 return C_ERR; 1418 } 1419 1420 /* This function is called every time, in the client structure 'c', there is 1421 * more query buffer to process, because we read more data from the socket 1422 * or because a client was blocked and later reactivated, so there could be 1423 * pending query buffer, already representing a full command, to process. */ 1424 void processInputBuffer(client *c) { 1425 server.current_client = c; 1426 1427 /* Keep processing while there is something in the input buffer */ 1428 while(c->qb_pos < sdslen(c->querybuf)) { 1429 /* Return if clients are paused. */ 1430 if (!(c->flags & CLIENT_SLAVE) && clientsArePaused()) break; 1431 1432 /* Immediately abort if the client is in the middle of something. */ 1433 if (c->flags & CLIENT_BLOCKED) break; 1434 1435 /* Don't process input from the master while there is a busy script 1436 * condition on the slave. We want just to accumulate the replication 1437 * stream (instead of replying -BUSY like we do with other clients) and 1438 * later resume the processing. */ 1439 if (server.lua_timedout && c->flags & CLIENT_MASTER) break; 1440 1441 /* CLIENT_CLOSE_AFTER_REPLY closes the connection once the reply is 1442 * written to the client. Make sure to not let the reply grow after 1443 * this flag has been set (i.e. don't process more commands). 1444 * 1445 * The same applies for clients we want to terminate ASAP. */ 1446 if (c->flags & (CLIENT_CLOSE_AFTER_REPLY|CLIENT_CLOSE_ASAP)) break; 1447 1448 /* Determine request type when unknown. */ 1449 if (!c->reqtype) { 1450 if (c->querybuf[c->qb_pos] == '*') { 1451 c->reqtype = PROTO_REQ_MULTIBULK; 1452 } else { 1453 c->reqtype = PROTO_REQ_INLINE; 1454 } 1455 } 1456 1457 if (c->reqtype == PROTO_REQ_INLINE) { 1458 if (processInlineBuffer(c) != C_OK) break; 1459 } else if (c->reqtype == PROTO_REQ_MULTIBULK) { 1460 if (processMultibulkBuffer(c) != C_OK) break; 1461 } else { 1462 serverPanic("Unknown request type"); 1463 } 1464 1465 /* Multibulk processing could see a <= 0 length. */ 1466 if (c->argc == 0) { 1467 resetClient(c); 1468 } else { 1469 /* Only reset the client when the command was executed. */ 1470 if (processCommand(c) == C_OK) { 1471 if (c->flags & CLIENT_MASTER && !(c->flags & CLIENT_MULTI)) { 1472 /* Update the applied replication offset of our master. */ 1473 c->reploff = c->read_reploff - sdslen(c->querybuf) + c->qb_pos; 1474 } 1475 1476 /* Don't reset the client structure for clients blocked in a 1477 * module blocking command, so that the reply callback will 1478 * still be able to access the client argv and argc field. 1479 * The client will be reset in unblockClientFromModule(). */ 1480 if (!(c->flags & CLIENT_BLOCKED) || c->btype != BLOCKED_MODULE) 1481 resetClient(c); 1482 } 1483 /* freeMemoryIfNeeded may flush slave output buffers. This may 1484 * result into a slave, that may be the active client, to be 1485 * freed. */ 1486 if (server.current_client == NULL) break; 1487 } 1488 } 1489 1490 /* Trim to pos */ 1491 if (server.current_client != NULL && c->qb_pos) { 1492 sdsrange(c->querybuf,c->qb_pos,-1); 1493 c->qb_pos = 0; 1494 } 1495 1496 server.current_client = NULL; 1497 } 1498 1499 /* This is a wrapper for processInputBuffer that also cares about handling 1500 * the replication forwarding to the sub-slaves, in case the client 'c' 1501 * is flagged as master. Usually you want to call this instead of the 1502 * raw processInputBuffer(). */ 1503 void processInputBufferAndReplicate(client *c) { 1504 if (!(c->flags & CLIENT_MASTER)) { 1505 processInputBuffer(c); 1506 } else { 1507 size_t prev_offset = c->reploff; 1508 processInputBuffer(c); 1509 size_t applied = c->reploff - prev_offset; 1510 if (applied) { 1511 replicationFeedSlavesFromMasterStream(server.slaves, 1512 c->pending_querybuf, applied); 1513 sdsrange(c->pending_querybuf,applied,-1); 1514 } 1515 } 1516 } 1517 1518 void readQueryFromClient(aeEventLoop *el, int fd, void *privdata, int mask) { 1519 client *c = (client*) privdata; 1520 int nread, readlen; 1521 size_t qblen; 1522 UNUSED(el); 1523 UNUSED(mask); 1524 1525 readlen = PROTO_IOBUF_LEN; 1526 /* If this is a multi bulk request, and we are processing a bulk reply 1527 * that is large enough, try to maximize the probability that the query 1528 * buffer contains exactly the SDS string representing the object, even 1529 * at the risk of requiring more read(2) calls. This way the function 1530 * processMultiBulkBuffer() can avoid copying buffers to create the 1531 * Redis Object representing the argument. */ 1532 if (c->reqtype == PROTO_REQ_MULTIBULK && c->multibulklen && c->bulklen != -1 1533 && c->bulklen >= PROTO_MBULK_BIG_ARG) 1534 { 1535 ssize_t remaining = (size_t)(c->bulklen+2)-sdslen(c->querybuf); 1536 1537 /* Note that the 'remaining' variable may be zero in some edge case, 1538 * for example once we resume a blocked client after CLIENT PAUSE. */ 1539 if (remaining > 0 && remaining < readlen) readlen = remaining; 1540 } 1541 1542 qblen = sdslen(c->querybuf); 1543 if (c->querybuf_peak < qblen) c->querybuf_peak = qblen; 1544 c->querybuf = sdsMakeRoomFor(c->querybuf, readlen); 1545 nread = read(fd, c->querybuf+qblen, readlen); 1546 if (nread == -1) { 1547 if (errno == EAGAIN) { 1548 return; 1549 } else { 1550 serverLog(LL_VERBOSE, "Reading from client: %s",strerror(errno)); 1551 freeClient(c); 1552 return; 1553 } 1554 } else if (nread == 0) { 1555 serverLog(LL_VERBOSE, "Client closed connection"); 1556 freeClient(c); 1557 return; 1558 } else if (c->flags & CLIENT_MASTER) { 1559 /* Append the query buffer to the pending (not applied) buffer 1560 * of the master. We'll use this buffer later in order to have a 1561 * copy of the string applied by the last command executed. */ 1562 c->pending_querybuf = sdscatlen(c->pending_querybuf, 1563 c->querybuf+qblen,nread); 1564 } 1565 1566 sdsIncrLen(c->querybuf,nread); 1567 c->lastinteraction = server.unixtime; 1568 if (c->flags & CLIENT_MASTER) c->read_reploff += nread; 1569 server.stat_net_input_bytes += nread; 1570 if (sdslen(c->querybuf) > server.client_max_querybuf_len) { 1571 sds ci = catClientInfoString(sdsempty(),c), bytes = sdsempty(); 1572 1573 bytes = sdscatrepr(bytes,c->querybuf,64); 1574 serverLog(LL_WARNING,"Closing client that reached max query buffer length: %s (qbuf initial bytes: %s)", ci, bytes); 1575 sdsfree(ci); 1576 sdsfree(bytes); 1577 freeClient(c); 1578 return; 1579 } 1580 1581 /* Time to process the buffer. If the client is a master we need to 1582 * compute the difference between the applied offset before and after 1583 * processing the buffer, to understand how much of the replication stream 1584 * was actually applied to the master state: this quantity, and its 1585 * corresponding part of the replication stream, will be propagated to 1586 * the sub-slaves and to the replication backlog. */ 1587 processInputBufferAndReplicate(c); 1588 } 1589 1590 void getClientsMaxBuffers(unsigned long *longest_output_list, 1591 unsigned long *biggest_input_buffer) { 1592 client *c; 1593 listNode *ln; 1594 listIter li; 1595 unsigned long lol = 0, bib = 0; 1596 1597 listRewind(server.clients,&li); 1598 while ((ln = listNext(&li)) != NULL) { 1599 c = listNodeValue(ln); 1600 1601 if (listLength(c->reply) > lol) lol = listLength(c->reply); 1602 if (sdslen(c->querybuf) > bib) bib = sdslen(c->querybuf); 1603 } 1604 *longest_output_list = lol; 1605 *biggest_input_buffer = bib; 1606 } 1607 1608 /* A Redis "Peer ID" is a colon separated ip:port pair. 1609 * For IPv4 it's in the form x.y.z.k:port, example: "127.0.0.1:1234". 1610 * For IPv6 addresses we use [] around the IP part, like in "[::1]:1234". 1611 * For Unix sockets we use path:0, like in "/tmp/redis:0". 1612 * 1613 * A Peer ID always fits inside a buffer of NET_PEER_ID_LEN bytes, including 1614 * the null term. 1615 * 1616 * On failure the function still populates 'peerid' with the "?:0" string 1617 * in case you want to relax error checking or need to display something 1618 * anyway (see anetPeerToString implementation for more info). */ 1619 void genClientPeerId(client *client, char *peerid, 1620 size_t peerid_len) { 1621 if (client->flags & CLIENT_UNIX_SOCKET) { 1622 /* Unix socket client. */ 1623 snprintf(peerid,peerid_len,"%s:0",server.unixsocket); 1624 } else { 1625 /* TCP client. */ 1626 anetFormatPeer(client->fd,peerid,peerid_len); 1627 } 1628 } 1629 1630 /* This function returns the client peer id, by creating and caching it 1631 * if client->peerid is NULL, otherwise returning the cached value. 1632 * The Peer ID never changes during the life of the client, however it 1633 * is expensive to compute. */ 1634 char *getClientPeerId(client *c) { 1635 char peerid[NET_PEER_ID_LEN]; 1636 1637 if (c->peerid == NULL) { 1638 genClientPeerId(c,peerid,sizeof(peerid)); 1639 c->peerid = sdsnew(peerid); 1640 } 1641 return c->peerid; 1642 } 1643 1644 /* Concatenate a string representing the state of a client in an human 1645 * readable format, into the sds string 's'. */ 1646 sds catClientInfoString(sds s, client *client) { 1647 char flags[16], events[3], *p; 1648 int emask; 1649 1650 p = flags; 1651 if (client->flags & CLIENT_SLAVE) { 1652 if (client->flags & CLIENT_MONITOR) 1653 *p++ = 'O'; 1654 else 1655 *p++ = 'S'; 1656 } 1657 if (client->flags & CLIENT_MASTER) *p++ = 'M'; 1658 if (client->flags & CLIENT_PUBSUB) *p++ = 'P'; 1659 if (client->flags & CLIENT_MULTI) *p++ = 'x'; 1660 if (client->flags & CLIENT_BLOCKED) *p++ = 'b'; 1661 if (client->flags & CLIENT_DIRTY_CAS) *p++ = 'd'; 1662 if (client->flags & CLIENT_CLOSE_AFTER_REPLY) *p++ = 'c'; 1663 if (client->flags & CLIENT_UNBLOCKED) *p++ = 'u'; 1664 if (client->flags & CLIENT_CLOSE_ASAP) *p++ = 'A'; 1665 if (client->flags & CLIENT_UNIX_SOCKET) *p++ = 'U'; 1666 if (client->flags & CLIENT_READONLY) *p++ = 'r'; 1667 if (p == flags) *p++ = 'N'; 1668 *p++ = '\0'; 1669 1670 emask = client->fd == -1 ? 0 : aeGetFileEvents(server.el,client->fd); 1671 p = events; 1672 if (emask & AE_READABLE) *p++ = 'r'; 1673 if (emask & AE_WRITABLE) *p++ = 'w'; 1674 *p = '\0'; 1675 return sdscatfmt(s, 1676 "id=%U addr=%s fd=%i name=%s age=%I idle=%I flags=%s db=%i sub=%i psub=%i multi=%i qbuf=%U qbuf-free=%U obl=%U oll=%U omem=%U events=%s cmd=%s", 1677 (unsigned long long) client->id, 1678 getClientPeerId(client), 1679 client->fd, 1680 client->name ? (char*)client->name->ptr : "", 1681 (long long)(server.unixtime - client->ctime), 1682 (long long)(server.unixtime - client->lastinteraction), 1683 flags, 1684 client->db->id, 1685 (int) dictSize(client->pubsub_channels), 1686 (int) listLength(client->pubsub_patterns), 1687 (client->flags & CLIENT_MULTI) ? client->mstate.count : -1, 1688 (unsigned long long) sdslen(client->querybuf), 1689 (unsigned long long) sdsavail(client->querybuf), 1690 (unsigned long long) client->bufpos, 1691 (unsigned long long) listLength(client->reply), 1692 (unsigned long long) getClientOutputBufferMemoryUsage(client), 1693 events, 1694 client->lastcmd ? client->lastcmd->name : "NULL"); 1695 } 1696 1697 sds getAllClientsInfoString(int type) { 1698 listNode *ln; 1699 listIter li; 1700 client *client; 1701 sds o = sdsnewlen(SDS_NOINIT,200*listLength(server.clients)); 1702 sdsclear(o); 1703 listRewind(server.clients,&li); 1704 while ((ln = listNext(&li)) != NULL) { 1705 client = listNodeValue(ln); 1706 if (type != -1 && getClientType(client) != type) continue; 1707 o = catClientInfoString(o,client); 1708 o = sdscatlen(o,"\n",1); 1709 } 1710 return o; 1711 } 1712 1713 void clientCommand(client *c) { 1714 listNode *ln; 1715 listIter li; 1716 client *client; 1717 1718 if (c->argc == 2 && !strcasecmp(c->argv[1]->ptr,"help")) { 1719 const char *help[] = { 1720 "id -- Return the ID of the current connection.", 1721 "getname -- Return the name of the current connection.", 1722 "kill <ip:port> -- Kill connection made from <ip:port>.", 1723 "kill <option> <value> [option value ...] -- Kill connections. Options are:", 1724 " addr <ip:port> -- Kill connection made from <ip:port>", 1725 " type (normal|master|replica|pubsub) -- Kill connections by type.", 1726 " skipme (yes|no) -- Skip killing current connection (default: yes).", 1727 "list [options ...] -- Return information about client connections. Options:", 1728 " type (normal|master|replica|pubsub) -- Return clients of specified type.", 1729 "pause <timeout> -- Suspend all Redis clients for <timout> milliseconds.", 1730 "reply (on|off|skip) -- Control the replies sent to the current connection.", 1731 "setname <name> -- Assign the name <name> to the current connection.", 1732 "unblock <clientid> [TIMEOUT|ERROR] -- Unblock the specified blocked client.", 1733 NULL 1734 }; 1735 addReplyHelp(c, help); 1736 } else if (!strcasecmp(c->argv[1]->ptr,"id") && c->argc == 2) { 1737 /* CLIENT ID */ 1738 addReplyLongLong(c,c->id); 1739 } else if (!strcasecmp(c->argv[1]->ptr,"list")) { 1740 /* CLIENT LIST */ 1741 int type = -1; 1742 if (c->argc == 4 && !strcasecmp(c->argv[2]->ptr,"type")) { 1743 type = getClientTypeByName(c->argv[3]->ptr); 1744 if (type == -1) { 1745 addReplyErrorFormat(c,"Unknown client type '%s'", 1746 (char*) c->argv[3]->ptr); 1747 return; 1748 } 1749 } else if (c->argc != 2) { 1750 addReply(c,shared.syntaxerr); 1751 return; 1752 } 1753 sds o = getAllClientsInfoString(type); 1754 addReplyBulkCBuffer(c,o,sdslen(o)); 1755 sdsfree(o); 1756 } else if (!strcasecmp(c->argv[1]->ptr,"reply") && c->argc == 3) { 1757 /* CLIENT REPLY ON|OFF|SKIP */ 1758 if (!strcasecmp(c->argv[2]->ptr,"on")) { 1759 c->flags &= ~(CLIENT_REPLY_SKIP|CLIENT_REPLY_OFF); 1760 addReply(c,shared.ok); 1761 } else if (!strcasecmp(c->argv[2]->ptr,"off")) { 1762 c->flags |= CLIENT_REPLY_OFF; 1763 } else if (!strcasecmp(c->argv[2]->ptr,"skip")) { 1764 if (!(c->flags & CLIENT_REPLY_OFF)) 1765 c->flags |= CLIENT_REPLY_SKIP_NEXT; 1766 } else { 1767 addReply(c,shared.syntaxerr); 1768 return; 1769 } 1770 } else if (!strcasecmp(c->argv[1]->ptr,"kill")) { 1771 /* CLIENT KILL <ip:port> 1772 * CLIENT KILL <option> [value] ... <option> [value] */ 1773 char *addr = NULL; 1774 int type = -1; 1775 uint64_t id = 0; 1776 int skipme = 1; 1777 int killed = 0, close_this_client = 0; 1778 1779 if (c->argc == 3) { 1780 /* Old style syntax: CLIENT KILL <addr> */ 1781 addr = c->argv[2]->ptr; 1782 skipme = 0; /* With the old form, you can kill yourself. */ 1783 } else if (c->argc > 3) { 1784 int i = 2; /* Next option index. */ 1785 1786 /* New style syntax: parse options. */ 1787 while(i < c->argc) { 1788 int moreargs = c->argc > i+1; 1789 1790 if (!strcasecmp(c->argv[i]->ptr,"id") && moreargs) { 1791 long long tmp; 1792 1793 if (getLongLongFromObjectOrReply(c,c->argv[i+1],&tmp,NULL) 1794 != C_OK) return; 1795 id = tmp; 1796 } else if (!strcasecmp(c->argv[i]->ptr,"type") && moreargs) { 1797 type = getClientTypeByName(c->argv[i+1]->ptr); 1798 if (type == -1) { 1799 addReplyErrorFormat(c,"Unknown client type '%s'", 1800 (char*) c->argv[i+1]->ptr); 1801 return; 1802 } 1803 } else if (!strcasecmp(c->argv[i]->ptr,"addr") && moreargs) { 1804 addr = c->argv[i+1]->ptr; 1805 } else if (!strcasecmp(c->argv[i]->ptr,"skipme") && moreargs) { 1806 if (!strcasecmp(c->argv[i+1]->ptr,"yes")) { 1807 skipme = 1; 1808 } else if (!strcasecmp(c->argv[i+1]->ptr,"no")) { 1809 skipme = 0; 1810 } else { 1811 addReply(c,shared.syntaxerr); 1812 return; 1813 } 1814 } else { 1815 addReply(c,shared.syntaxerr); 1816 return; 1817 } 1818 i += 2; 1819 } 1820 } else { 1821 addReply(c,shared.syntaxerr); 1822 return; 1823 } 1824 1825 /* Iterate clients killing all the matching clients. */ 1826 listRewind(server.clients,&li); 1827 while ((ln = listNext(&li)) != NULL) { 1828 client = listNodeValue(ln); 1829 if (addr && strcmp(getClientPeerId(client),addr) != 0) continue; 1830 if (type != -1 && getClientType(client) != type) continue; 1831 if (id != 0 && client->id != id) continue; 1832 if (c == client && skipme) continue; 1833 1834 /* Kill it. */ 1835 if (c == client) { 1836 close_this_client = 1; 1837 } else { 1838 freeClient(client); 1839 } 1840 killed++; 1841 } 1842 1843 /* Reply according to old/new format. */ 1844 if (c->argc == 3) { 1845 if (killed == 0) 1846 addReplyError(c,"No such client"); 1847 else 1848 addReply(c,shared.ok); 1849 } else { 1850 addReplyLongLong(c,killed); 1851 } 1852 1853 /* If this client has to be closed, flag it as CLOSE_AFTER_REPLY 1854 * only after we queued the reply to its output buffers. */ 1855 if (close_this_client) c->flags |= CLIENT_CLOSE_AFTER_REPLY; 1856 } else if (!strcasecmp(c->argv[1]->ptr,"unblock") && (c->argc == 3 || 1857 c->argc == 4)) 1858 { 1859 /* CLIENT UNBLOCK <id> [timeout|error] */ 1860 long long id; 1861 int unblock_error = 0; 1862 1863 if (c->argc == 4) { 1864 if (!strcasecmp(c->argv[3]->ptr,"timeout")) { 1865 unblock_error = 0; 1866 } else if (!strcasecmp(c->argv[3]->ptr,"error")) { 1867 unblock_error = 1; 1868 } else { 1869 addReplyError(c, 1870 "CLIENT UNBLOCK reason should be TIMEOUT or ERROR"); 1871 return; 1872 } 1873 } 1874 if (getLongLongFromObjectOrReply(c,c->argv[2],&id,NULL) 1875 != C_OK) return; 1876 struct client *target = lookupClientByID(id); 1877 if (target && target->flags & CLIENT_BLOCKED) { 1878 if (unblock_error) 1879 addReplyError(target, 1880 "-UNBLOCKED client unblocked via CLIENT UNBLOCK"); 1881 else 1882 replyToBlockedClientTimedOut(target); 1883 unblockClient(target); 1884 addReply(c,shared.cone); 1885 } else { 1886 addReply(c,shared.czero); 1887 } 1888 } else if (!strcasecmp(c->argv[1]->ptr,"setname") && c->argc == 3) { 1889 int j, len = sdslen(c->argv[2]->ptr); 1890 char *p = c->argv[2]->ptr; 1891 1892 /* Setting the client name to an empty string actually removes 1893 * the current name. */ 1894 if (len == 0) { 1895 if (c->name) decrRefCount(c->name); 1896 c->name = NULL; 1897 addReply(c,shared.ok); 1898 return; 1899 } 1900 1901 /* Otherwise check if the charset is ok. We need to do this otherwise 1902 * CLIENT LIST format will break. You should always be able to 1903 * split by space to get the different fields. */ 1904 for (j = 0; j < len; j++) { 1905 if (p[j] < '!' || p[j] > '~') { /* ASCII is assumed. */ 1906 addReplyError(c, 1907 "Client names cannot contain spaces, " 1908 "newlines or special characters."); 1909 return; 1910 } 1911 } 1912 if (c->name) decrRefCount(c->name); 1913 c->name = c->argv[2]; 1914 incrRefCount(c->name); 1915 addReply(c,shared.ok); 1916 } else if (!strcasecmp(c->argv[1]->ptr,"getname") && c->argc == 2) { 1917 if (c->name) 1918 addReplyBulk(c,c->name); 1919 else 1920 addReply(c,shared.nullbulk); 1921 } else if (!strcasecmp(c->argv[1]->ptr,"pause") && c->argc == 3) { 1922 long long duration; 1923 1924 if (getTimeoutFromObjectOrReply(c,c->argv[2],&duration,UNIT_MILLISECONDS) 1925 != C_OK) return; 1926 pauseClients(duration); 1927 addReply(c,shared.ok); 1928 } else { 1929 addReplyErrorFormat(c, "Unknown subcommand or wrong number of arguments for '%s'. Try CLIENT HELP", (char*)c->argv[1]->ptr); 1930 } 1931 } 1932 1933 /* This callback is bound to POST and "Host:" command names. Those are not 1934 * really commands, but are used in security attacks in order to talk to 1935 * Redis instances via HTTP, with a technique called "cross protocol scripting" 1936 * which exploits the fact that services like Redis will discard invalid 1937 * HTTP headers and will process what follows. 1938 * 1939 * As a protection against this attack, Redis will terminate the connection 1940 * when a POST or "Host:" header is seen, and will log the event from 1941 * time to time (to avoid creating a DOS as a result of too many logs). */ 1942 void securityWarningCommand(client *c) { 1943 static time_t logged_time; 1944 time_t now = time(NULL); 1945 1946 if (labs(now-logged_time) > 60) { 1947 serverLog(LL_WARNING,"Possible SECURITY ATTACK detected. It looks like somebody is sending POST or Host: commands to Redis. This is likely due to an attacker attempting to use Cross Protocol Scripting to compromise your Redis instance. Connection aborted."); 1948 logged_time = now; 1949 } 1950 freeClientAsync(c); 1951 } 1952 1953 /* Rewrite the command vector of the client. All the new objects ref count 1954 * is incremented. The old command vector is freed, and the old objects 1955 * ref count is decremented. */ 1956 void rewriteClientCommandVector(client *c, int argc, ...) { 1957 va_list ap; 1958 int j; 1959 robj **argv; /* The new argument vector */ 1960 1961 argv = zmalloc(sizeof(robj*)*argc); 1962 va_start(ap,argc); 1963 for (j = 0; j < argc; j++) { 1964 robj *a; 1965 1966 a = va_arg(ap, robj*); 1967 argv[j] = a; 1968 incrRefCount(a); 1969 } 1970 /* We free the objects in the original vector at the end, so we are 1971 * sure that if the same objects are reused in the new vector the 1972 * refcount gets incremented before it gets decremented. */ 1973 for (j = 0; j < c->argc; j++) decrRefCount(c->argv[j]); 1974 zfree(c->argv); 1975 /* Replace argv and argc with our new versions. */ 1976 c->argv = argv; 1977 c->argc = argc; 1978 c->cmd = lookupCommandOrOriginal(c->argv[0]->ptr); 1979 serverAssertWithInfo(c,NULL,c->cmd != NULL); 1980 va_end(ap); 1981 } 1982 1983 /* Completely replace the client command vector with the provided one. */ 1984 void replaceClientCommandVector(client *c, int argc, robj **argv) { 1985 freeClientArgv(c); 1986 zfree(c->argv); 1987 c->argv = argv; 1988 c->argc = argc; 1989 c->cmd = lookupCommandOrOriginal(c->argv[0]->ptr); 1990 serverAssertWithInfo(c,NULL,c->cmd != NULL); 1991 } 1992 1993 /* Rewrite a single item in the command vector. 1994 * The new val ref count is incremented, and the old decremented. 1995 * 1996 * It is possible to specify an argument over the current size of the 1997 * argument vector: in this case the array of objects gets reallocated 1998 * and c->argc set to the max value. However it's up to the caller to 1999 * 2000 * 1. Make sure there are no "holes" and all the arguments are set. 2001 * 2. If the original argument vector was longer than the one we 2002 * want to end with, it's up to the caller to set c->argc and 2003 * free the no longer used objects on c->argv. */ 2004 void rewriteClientCommandArgument(client *c, int i, robj *newval) { 2005 robj *oldval; 2006 2007 if (i >= c->argc) { 2008 c->argv = zrealloc(c->argv,sizeof(robj*)*(i+1)); 2009 c->argc = i+1; 2010 c->argv[i] = NULL; 2011 } 2012 oldval = c->argv[i]; 2013 c->argv[i] = newval; 2014 incrRefCount(newval); 2015 if (oldval) decrRefCount(oldval); 2016 2017 /* If this is the command name make sure to fix c->cmd. */ 2018 if (i == 0) { 2019 c->cmd = lookupCommandOrOriginal(c->argv[0]->ptr); 2020 serverAssertWithInfo(c,NULL,c->cmd != NULL); 2021 } 2022 } 2023 2024 /* This function returns the number of bytes that Redis is 2025 * using to store the reply still not read by the client. 2026 * 2027 * Note: this function is very fast so can be called as many time as 2028 * the caller wishes. The main usage of this function currently is 2029 * enforcing the client output length limits. */ 2030 unsigned long getClientOutputBufferMemoryUsage(client *c) { 2031 unsigned long list_item_size = sizeof(listNode) + sizeof(clientReplyBlock); 2032 return c->reply_bytes + (list_item_size*listLength(c->reply)); 2033 } 2034 2035 /* Get the class of a client, used in order to enforce limits to different 2036 * classes of clients. 2037 * 2038 * The function will return one of the following: 2039 * CLIENT_TYPE_NORMAL -> Normal client 2040 * CLIENT_TYPE_SLAVE -> Slave or client executing MONITOR command 2041 * CLIENT_TYPE_PUBSUB -> Client subscribed to Pub/Sub channels 2042 * CLIENT_TYPE_MASTER -> The client representing our replication master. 2043 */ 2044 int getClientType(client *c) { 2045 if (c->flags & CLIENT_MASTER) return CLIENT_TYPE_MASTER; 2046 if ((c->flags & CLIENT_SLAVE) && !(c->flags & CLIENT_MONITOR)) 2047 return CLIENT_TYPE_SLAVE; 2048 if (c->flags & CLIENT_PUBSUB) return CLIENT_TYPE_PUBSUB; 2049 return CLIENT_TYPE_NORMAL; 2050 } 2051 2052 int getClientTypeByName(char *name) { 2053 if (!strcasecmp(name,"normal")) return CLIENT_TYPE_NORMAL; 2054 else if (!strcasecmp(name,"slave")) return CLIENT_TYPE_SLAVE; 2055 else if (!strcasecmp(name,"replica")) return CLIENT_TYPE_SLAVE; 2056 else if (!strcasecmp(name,"pubsub")) return CLIENT_TYPE_PUBSUB; 2057 else if (!strcasecmp(name,"master")) return CLIENT_TYPE_MASTER; 2058 else return -1; 2059 } 2060 2061 char *getClientTypeName(int class) { 2062 switch(class) { 2063 case CLIENT_TYPE_NORMAL: return "normal"; 2064 case CLIENT_TYPE_SLAVE: return "slave"; 2065 case CLIENT_TYPE_PUBSUB: return "pubsub"; 2066 case CLIENT_TYPE_MASTER: return "master"; 2067 default: return NULL; 2068 } 2069 } 2070 2071 /* The function checks if the client reached output buffer soft or hard 2072 * limit, and also update the state needed to check the soft limit as 2073 * a side effect. 2074 * 2075 * Return value: non-zero if the client reached the soft or the hard limit. 2076 * Otherwise zero is returned. */ 2077 int checkClientOutputBufferLimits(client *c) { 2078 int soft = 0, hard = 0, class; 2079 unsigned long used_mem = getClientOutputBufferMemoryUsage(c); 2080 2081 class = getClientType(c); 2082 /* For the purpose of output buffer limiting, masters are handled 2083 * like normal clients. */ 2084 if (class == CLIENT_TYPE_MASTER) class = CLIENT_TYPE_NORMAL; 2085 2086 if (server.client_obuf_limits[class].hard_limit_bytes && 2087 used_mem >= server.client_obuf_limits[class].hard_limit_bytes) 2088 hard = 1; 2089 if (server.client_obuf_limits[class].soft_limit_bytes && 2090 used_mem >= server.client_obuf_limits[class].soft_limit_bytes) 2091 soft = 1; 2092 2093 /* We need to check if the soft limit is reached continuously for the 2094 * specified amount of seconds. */ 2095 if (soft) { 2096 if (c->obuf_soft_limit_reached_time == 0) { 2097 c->obuf_soft_limit_reached_time = server.unixtime; 2098 soft = 0; /* First time we see the soft limit reached */ 2099 } else { 2100 time_t elapsed = server.unixtime - c->obuf_soft_limit_reached_time; 2101 2102 if (elapsed <= 2103 server.client_obuf_limits[class].soft_limit_seconds) { 2104 soft = 0; /* The client still did not reached the max number of 2105 seconds for the soft limit to be considered 2106 reached. */ 2107 } 2108 } 2109 } else { 2110 c->obuf_soft_limit_reached_time = 0; 2111 } 2112 return soft || hard; 2113 } 2114 2115 /* Asynchronously close a client if soft or hard limit is reached on the 2116 * output buffer size. The caller can check if the client will be closed 2117 * checking if the client CLIENT_CLOSE_ASAP flag is set. 2118 * 2119 * Note: we need to close the client asynchronously because this function is 2120 * called from contexts where the client can't be freed safely, i.e. from the 2121 * lower level functions pushing data inside the client output buffers. */ 2122 void asyncCloseClientOnOutputBufferLimitReached(client *c) { 2123 if (c->fd == -1) return; /* It is unsafe to free fake clients. */ 2124 serverAssert(c->reply_bytes < SIZE_MAX-(1024*64)); 2125 if (c->reply_bytes == 0 || c->flags & CLIENT_CLOSE_ASAP) return; 2126 if (checkClientOutputBufferLimits(c)) { 2127 sds client = catClientInfoString(sdsempty(),c); 2128 2129 freeClientAsync(c); 2130 serverLog(LL_WARNING,"Client %s scheduled to be closed ASAP for overcoming of output buffer limits.", client); 2131 sdsfree(client); 2132 } 2133 } 2134 2135 /* Helper function used by freeMemoryIfNeeded() in order to flush slaves 2136 * output buffers without returning control to the event loop. 2137 * This is also called by SHUTDOWN for a best-effort attempt to send 2138 * slaves the latest writes. */ 2139 void flushSlavesOutputBuffers(void) { 2140 listIter li; 2141 listNode *ln; 2142 2143 listRewind(server.slaves,&li); 2144 while((ln = listNext(&li))) { 2145 client *slave = listNodeValue(ln); 2146 int events; 2147 2148 /* Note that the following will not flush output buffers of slaves 2149 * in STATE_ONLINE but having put_online_on_ack set to true: in this 2150 * case the writable event is never installed, since the purpose 2151 * of put_online_on_ack is to postpone the moment it is installed. 2152 * This is what we want since slaves in this state should not receive 2153 * writes before the first ACK. */ 2154 events = aeGetFileEvents(server.el,slave->fd); 2155 if (events & AE_WRITABLE && 2156 slave->replstate == SLAVE_STATE_ONLINE && 2157 clientHasPendingReplies(slave)) 2158 { 2159 writeToClient(slave->fd,slave,0); 2160 } 2161 } 2162 } 2163 2164 /* Pause clients up to the specified unixtime (in ms). While clients 2165 * are paused no command is processed from clients, so the data set can't 2166 * change during that time. 2167 * 2168 * However while this function pauses normal and Pub/Sub clients, slaves are 2169 * still served, so this function can be used on server upgrades where it is 2170 * required that slaves process the latest bytes from the replication stream 2171 * before being turned to masters. 2172 * 2173 * This function is also internally used by Redis Cluster for the manual 2174 * failover procedure implemented by CLUSTER FAILOVER. 2175 * 2176 * The function always succeed, even if there is already a pause in progress. 2177 * In such a case, the pause is extended if the duration is more than the 2178 * time left for the previous duration. However if the duration is smaller 2179 * than the time left for the previous pause, no change is made to the 2180 * left duration. */ 2181 void pauseClients(mstime_t end) { 2182 if (!server.clients_paused || end > server.clients_pause_end_time) 2183 server.clients_pause_end_time = end; 2184 server.clients_paused = 1; 2185 } 2186 2187 /* Return non-zero if clients are currently paused. As a side effect the 2188 * function checks if the pause time was reached and clear it. */ 2189 int clientsArePaused(void) { 2190 if (server.clients_paused && 2191 server.clients_pause_end_time < server.mstime) 2192 { 2193 listNode *ln; 2194 listIter li; 2195 client *c; 2196 2197 server.clients_paused = 0; 2198 2199 /* Put all the clients in the unblocked clients queue in order to 2200 * force the re-processing of the input buffer if any. */ 2201 listRewind(server.clients,&li); 2202 while ((ln = listNext(&li)) != NULL) { 2203 c = listNodeValue(ln); 2204 2205 /* Don't touch slaves and blocked clients. 2206 * The latter pending requests will be processed when unblocked. */ 2207 if (c->flags & (CLIENT_SLAVE|CLIENT_BLOCKED)) continue; 2208 queueClientForReprocessing(c); 2209 } 2210 } 2211 return server.clients_paused; 2212 } 2213 2214 /* This function is called by Redis in order to process a few events from 2215 * time to time while blocked into some not interruptible operation. 2216 * This allows to reply to clients with the -LOADING error while loading the 2217 * data set at startup or after a full resynchronization with the master 2218 * and so forth. 2219 * 2220 * It calls the event loop in order to process a few events. Specifically we 2221 * try to call the event loop 4 times as long as we receive acknowledge that 2222 * some event was processed, in order to go forward with the accept, read, 2223 * write, close sequence needed to serve a client. 2224 * 2225 * The function returns the total number of events processed. */ 2226 int processEventsWhileBlocked(void) { 2227 int iterations = 4; /* See the function top-comment. */ 2228 int count = 0; 2229 while (iterations--) { 2230 int events = 0; 2231 events += aeProcessEvents(server.el, AE_FILE_EVENTS|AE_DONT_WAIT); 2232 events += handleClientsWithPendingWrites(); 2233 if (!events) break; 2234 count += events; 2235 } 2236 return count; 2237 } 2238