1 /* 2 * Copyright (c) 2009-2012, Salvatore Sanfilippo <antirez at gmail dot com> 3 * All rights reserved. 4 * 5 * Redistribution and use in source and binary forms, with or without 6 * modification, are permitted provided that the following conditions are met: 7 * 8 * * Redistributions of source code must retain the above copyright notice, 9 * this list of conditions and the following disclaimer. 10 * * Redistributions in binary form must reproduce the above copyright 11 * notice, this list of conditions and the following disclaimer in the 12 * documentation and/or other materials provided with the distribution. 13 * * Neither the name of Redis nor the names of its contributors may be used 14 * to endorse or promote products derived from this software without 15 * specific prior written permission. 16 * 17 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" 18 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE 19 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE 20 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE 21 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR 22 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF 23 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS 24 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN 25 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) 26 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE 27 * POSSIBILITY OF SUCH DAMAGE. 
28 */ 29 30 #include "server.h" 31 #include "lzf.h" /* LZF compression library */ 32 #include "zipmap.h" 33 #include "endianconv.h" 34 #include "stream.h" 35 36 #include <math.h> 37 #include <sys/types.h> 38 #include <sys/time.h> 39 #include <sys/resource.h> 40 #include <sys/wait.h> 41 #include <arpa/inet.h> 42 #include <sys/stat.h> 43 #include <sys/param.h> 44 45 #define rdbExitReportCorruptRDB(...) rdbCheckThenExit(__LINE__,__VA_ARGS__) 46 47 extern int rdbCheckMode; 48 void rdbCheckError(const char *fmt, ...); 49 void rdbCheckSetError(const char *fmt, ...); 50 51 void rdbCheckThenExit(int linenum, char *reason, ...) { 52 va_list ap; 53 char msg[1024]; 54 int len; 55 56 len = snprintf(msg,sizeof(msg), 57 "Internal error in RDB reading function at rdb.c:%d -> ", linenum); 58 va_start(ap,reason); 59 vsnprintf(msg+len,sizeof(msg)-len,reason,ap); 60 va_end(ap); 61 62 if (!rdbCheckMode) { 63 serverLog(LL_WARNING, "%s", msg); 64 char *argv[2] = {"",server.rdb_filename}; 65 redis_check_rdb_main(2,argv,NULL); 66 } else { 67 rdbCheckError("%s",msg); 68 } 69 exit(1); 70 } 71 72 static int rdbWriteRaw(rio *rdb, void *p, size_t len) { 73 if (rdb && rioWrite(rdb,p,len) == 0) 74 return -1; 75 return len; 76 } 77 78 /* This is just a wrapper for the low level function rioRead() that will 79 * automatically abort if it is not possible to read the specified amount 80 * of bytes. */ 81 void rdbLoadRaw(rio *rdb, void *buf, uint64_t len) { 82 if (rioRead(rdb,buf,len) == 0) { 83 rdbExitReportCorruptRDB( 84 "Impossible to read %llu bytes in rdbLoadRaw()", 85 (unsigned long long) len); 86 return; /* Not reached. */ 87 } 88 } 89 90 int rdbSaveType(rio *rdb, unsigned char type) { 91 return rdbWriteRaw(rdb,&type,1); 92 } 93 94 /* Load a "type" in RDB format, that is a one byte unsigned integer. 95 * This function is not only used to load object types, but also special 96 * "types" like the end-of-file type, the EXPIRE type, and so forth. 
*/ 97 int rdbLoadType(rio *rdb) { 98 unsigned char type; 99 if (rioRead(rdb,&type,1) == 0) return -1; 100 return type; 101 } 102 103 /* This is only used to load old databases stored with the RDB_OPCODE_EXPIRETIME 104 * opcode. New versions of Redis store using the RDB_OPCODE_EXPIRETIME_MS 105 * opcode. */ 106 time_t rdbLoadTime(rio *rdb) { 107 int32_t t32; 108 rdbLoadRaw(rdb,&t32,4); 109 return (time_t)t32; 110 } 111 112 int rdbSaveMillisecondTime(rio *rdb, long long t) { 113 int64_t t64 = (int64_t) t; 114 memrev64ifbe(&t64); /* Store in little endian. */ 115 return rdbWriteRaw(rdb,&t64,8); 116 } 117 118 /* This function loads a time from the RDB file. It gets the version of the 119 * RDB because, unfortunately, before Redis 5 (RDB version 9), the function 120 * failed to convert data to/from little endian, so RDB files with keys having 121 * expires could not be shared between big endian and little endian systems 122 * (because the expire time will be totally wrong). The fix for this is just 123 * to call memrev64ifbe(), however if we fix this for all the RDB versions, 124 * this call will introduce an incompatibility for big endian systems: 125 * after upgrading to Redis version 5 they will no longer be able to load their 126 * own old RDB files. Because of that, we instead fix the function only for new 127 * RDB versions, and load older RDB versions as we used to do in the past, 128 * allowing big endian systems to load their own old RDB files. */ 129 long long rdbLoadMillisecondTime(rio *rdb, int rdbver) { 130 int64_t t64; 131 rdbLoadRaw(rdb,&t64,8); 132 if (rdbver >= 9) /* Check the top comment of this function. */ 133 memrev64ifbe(&t64); /* Convert in big endian if the system is BE. */ 134 return (long long)t64; 135 } 136 137 /* Saves an encoded length. The first two bits in the first byte are used to 138 * hold the encoding type. See the RDB_* definitions for more information 139 * on the types of encoding. 
*/ 140 int rdbSaveLen(rio *rdb, uint64_t len) { 141 unsigned char buf[2]; 142 size_t nwritten; 143 144 if (len < (1<<6)) { 145 /* Save a 6 bit len */ 146 buf[0] = (len&0xFF)|(RDB_6BITLEN<<6); 147 if (rdbWriteRaw(rdb,buf,1) == -1) return -1; 148 nwritten = 1; 149 } else if (len < (1<<14)) { 150 /* Save a 14 bit len */ 151 buf[0] = ((len>>8)&0xFF)|(RDB_14BITLEN<<6); 152 buf[1] = len&0xFF; 153 if (rdbWriteRaw(rdb,buf,2) == -1) return -1; 154 nwritten = 2; 155 } else if (len <= UINT32_MAX) { 156 /* Save a 32 bit len */ 157 buf[0] = RDB_32BITLEN; 158 if (rdbWriteRaw(rdb,buf,1) == -1) return -1; 159 uint32_t len32 = htonl(len); 160 if (rdbWriteRaw(rdb,&len32,4) == -1) return -1; 161 nwritten = 1+4; 162 } else { 163 /* Save a 64 bit len */ 164 buf[0] = RDB_64BITLEN; 165 if (rdbWriteRaw(rdb,buf,1) == -1) return -1; 166 len = htonu64(len); 167 if (rdbWriteRaw(rdb,&len,8) == -1) return -1; 168 nwritten = 1+8; 169 } 170 return nwritten; 171 } 172 173 174 /* Load an encoded length. If the loaded length is a normal length as stored 175 * with rdbSaveLen(), the read length is set to '*lenptr'. If instead the 176 * loaded length describes a special encoding that follows, then '*isencoded' 177 * is set to 1 and the encoding format is stored at '*lenptr'. 178 * 179 * See the RDB_ENC_* definitions in rdb.h for more information on special 180 * encodings. 181 * 182 * The function returns -1 on error, 0 on success. */ 183 int rdbLoadLenByRef(rio *rdb, int *isencoded, uint64_t *lenptr) { 184 unsigned char buf[2]; 185 int type; 186 187 if (isencoded) *isencoded = 0; 188 if (rioRead(rdb,buf,1) == 0) return -1; 189 type = (buf[0]&0xC0)>>6; 190 if (type == RDB_ENCVAL) { 191 /* Read a 6 bit encoding type. */ 192 if (isencoded) *isencoded = 1; 193 *lenptr = buf[0]&0x3F; 194 } else if (type == RDB_6BITLEN) { 195 /* Read a 6 bit len. */ 196 *lenptr = buf[0]&0x3F; 197 } else if (type == RDB_14BITLEN) { 198 /* Read a 14 bit len. 
*/ 199 if (rioRead(rdb,buf+1,1) == 0) return -1; 200 *lenptr = ((buf[0]&0x3F)<<8)|buf[1]; 201 } else if (buf[0] == RDB_32BITLEN) { 202 /* Read a 32 bit len. */ 203 uint32_t len; 204 if (rioRead(rdb,&len,4) == 0) return -1; 205 *lenptr = ntohl(len); 206 } else if (buf[0] == RDB_64BITLEN) { 207 /* Read a 64 bit len. */ 208 uint64_t len; 209 if (rioRead(rdb,&len,8) == 0) return -1; 210 *lenptr = ntohu64(len); 211 } else { 212 rdbExitReportCorruptRDB( 213 "Unknown length encoding %d in rdbLoadLen()",type); 214 return -1; /* Never reached. */ 215 } 216 return 0; 217 } 218 219 /* This is like rdbLoadLenByRef() but directly returns the value read 220 * from the RDB stream, signaling an error by returning RDB_LENERR 221 * (since it is a too large count to be applicable in any Redis data 222 * structure). */ 223 uint64_t rdbLoadLen(rio *rdb, int *isencoded) { 224 uint64_t len; 225 226 if (rdbLoadLenByRef(rdb,isencoded,&len) == -1) return RDB_LENERR; 227 return len; 228 } 229 230 /* Encodes the "value" argument as integer when it fits in the supported ranges 231 * for encoded types. If the function successfully encodes the integer, the 232 * representation is stored in the buffer pointer to by "enc" and the string 233 * length is returned. Otherwise 0 is returned. 
*/ 234 int rdbEncodeInteger(long long value, unsigned char *enc) { 235 if (value >= -(1<<7) && value <= (1<<7)-1) { 236 enc[0] = (RDB_ENCVAL<<6)|RDB_ENC_INT8; 237 enc[1] = value&0xFF; 238 return 2; 239 } else if (value >= -(1<<15) && value <= (1<<15)-1) { 240 enc[0] = (RDB_ENCVAL<<6)|RDB_ENC_INT16; 241 enc[1] = value&0xFF; 242 enc[2] = (value>>8)&0xFF; 243 return 3; 244 } else if (value >= -((long long)1<<31) && value <= ((long long)1<<31)-1) { 245 enc[0] = (RDB_ENCVAL<<6)|RDB_ENC_INT32; 246 enc[1] = value&0xFF; 247 enc[2] = (value>>8)&0xFF; 248 enc[3] = (value>>16)&0xFF; 249 enc[4] = (value>>24)&0xFF; 250 return 5; 251 } else { 252 return 0; 253 } 254 } 255 256 /* Loads an integer-encoded object with the specified encoding type "enctype". 257 * The returned value changes according to the flags, see 258 * rdbGenerincLoadStringObject() for more info. */ 259 void *rdbLoadIntegerObject(rio *rdb, int enctype, int flags, size_t *lenptr) { 260 int plain = flags & RDB_LOAD_PLAIN; 261 int sds = flags & RDB_LOAD_SDS; 262 int encode = flags & RDB_LOAD_ENC; 263 unsigned char enc[4]; 264 long long val; 265 266 if (enctype == RDB_ENC_INT8) { 267 if (rioRead(rdb,enc,1) == 0) return NULL; 268 val = (signed char)enc[0]; 269 } else if (enctype == RDB_ENC_INT16) { 270 uint16_t v; 271 if (rioRead(rdb,enc,2) == 0) return NULL; 272 v = enc[0]|(enc[1]<<8); 273 val = (int16_t)v; 274 } else if (enctype == RDB_ENC_INT32) { 275 uint32_t v; 276 if (rioRead(rdb,enc,4) == 0) return NULL; 277 v = enc[0]|(enc[1]<<8)|(enc[2]<<16)|(enc[3]<<24); 278 val = (int32_t)v; 279 } else { 280 val = 0; /* anti-warning */ 281 rdbExitReportCorruptRDB("Unknown RDB integer encoding type %d",enctype); 282 } 283 if (plain || sds) { 284 char buf[LONG_STR_SIZE], *p; 285 int len = ll2string(buf,sizeof(buf),val); 286 if (lenptr) *lenptr = len; 287 p = plain ? 
zmalloc(len) : sdsnewlen(SDS_NOINIT,len); 288 memcpy(p,buf,len); 289 return p; 290 } else if (encode) { 291 return createStringObjectFromLongLongForValue(val); 292 } else { 293 return createObject(OBJ_STRING,sdsfromlonglong(val)); 294 } 295 } 296 297 /* String objects in the form "2391" "-100" without any space and with a 298 * range of values that can fit in an 8, 16 or 32 bit signed value can be 299 * encoded as integers to save space */ 300 int rdbTryIntegerEncoding(char *s, size_t len, unsigned char *enc) { 301 long long value; 302 char *endptr, buf[32]; 303 304 /* Check if it's possible to encode this value as a number */ 305 value = strtoll(s, &endptr, 10); 306 if (endptr[0] != '\0') return 0; 307 ll2string(buf,32,value); 308 309 /* If the number converted back into a string is not identical 310 * then it's not possible to encode the string as integer */ 311 if (strlen(buf) != len || memcmp(buf,s,len)) return 0; 312 313 return rdbEncodeInteger(value,enc); 314 } 315 316 ssize_t rdbSaveLzfBlob(rio *rdb, void *data, size_t compress_len, 317 size_t original_len) { 318 unsigned char byte; 319 ssize_t n, nwritten = 0; 320 321 /* Data compressed! 
Let's save it on disk */ 322 byte = (RDB_ENCVAL<<6)|RDB_ENC_LZF; 323 if ((n = rdbWriteRaw(rdb,&byte,1)) == -1) goto writeerr; 324 nwritten += n; 325 326 if ((n = rdbSaveLen(rdb,compress_len)) == -1) goto writeerr; 327 nwritten += n; 328 329 if ((n = rdbSaveLen(rdb,original_len)) == -1) goto writeerr; 330 nwritten += n; 331 332 if ((n = rdbWriteRaw(rdb,data,compress_len)) == -1) goto writeerr; 333 nwritten += n; 334 335 return nwritten; 336 337 writeerr: 338 return -1; 339 } 340 341 ssize_t rdbSaveLzfStringObject(rio *rdb, unsigned char *s, size_t len) { 342 size_t comprlen, outlen; 343 void *out; 344 345 /* We require at least four bytes compression for this to be worth it */ 346 if (len <= 4) return 0; 347 outlen = len-4; 348 if ((out = zmalloc(outlen+1)) == NULL) return 0; 349 comprlen = lzf_compress(s, len, out, outlen); 350 if (comprlen == 0) { 351 zfree(out); 352 return 0; 353 } 354 ssize_t nwritten = rdbSaveLzfBlob(rdb, out, comprlen, len); 355 zfree(out); 356 return nwritten; 357 } 358 359 /* Load an LZF compressed string in RDB format. The returned value 360 * changes according to 'flags'. For more info check the 361 * rdbGenericLoadStringObject() function. */ 362 void *rdbLoadLzfStringObject(rio *rdb, int flags, size_t *lenptr) { 363 int plain = flags & RDB_LOAD_PLAIN; 364 int sds = flags & RDB_LOAD_SDS; 365 uint64_t len, clen; 366 unsigned char *c = NULL; 367 char *val = NULL; 368 369 if ((clen = rdbLoadLen(rdb,NULL)) == RDB_LENERR) return NULL; 370 if ((len = rdbLoadLen(rdb,NULL)) == RDB_LENERR) return NULL; 371 if ((c = zmalloc(clen)) == NULL) goto err; 372 373 /* Allocate our target according to the uncompressed size. */ 374 if (plain) { 375 val = zmalloc(len); 376 } else { 377 val = sdsnewlen(SDS_NOINIT,len); 378 } 379 if (lenptr) *lenptr = len; 380 381 /* Load the compressed representation and uncompress it to target. 
*/ 382 if (rioRead(rdb,c,clen) == 0) goto err; 383 if (lzf_decompress(c,clen,val,len) == 0) { 384 if (rdbCheckMode) rdbCheckSetError("Invalid LZF compressed string"); 385 goto err; 386 } 387 zfree(c); 388 389 if (plain || sds) { 390 return val; 391 } else { 392 return createObject(OBJ_STRING,val); 393 } 394 err: 395 zfree(c); 396 if (plain) 397 zfree(val); 398 else 399 sdsfree(val); 400 return NULL; 401 } 402 403 /* Save a string object as [len][data] on disk. If the object is a string 404 * representation of an integer value we try to save it in a special form */ 405 ssize_t rdbSaveRawString(rio *rdb, unsigned char *s, size_t len) { 406 int enclen; 407 ssize_t n, nwritten = 0; 408 409 /* Try integer encoding */ 410 if (len <= 11) { 411 unsigned char buf[5]; 412 if ((enclen = rdbTryIntegerEncoding((char*)s,len,buf)) > 0) { 413 if (rdbWriteRaw(rdb,buf,enclen) == -1) return -1; 414 return enclen; 415 } 416 } 417 418 /* Try LZF compression - under 20 bytes it's unable to compress even 419 * aaaaaaaaaaaaaaaaaa so skip it */ 420 if (server.rdb_compression && len > 20) { 421 n = rdbSaveLzfStringObject(rdb,s,len); 422 if (n == -1) return -1; 423 if (n > 0) return n; 424 /* Return value of 0 means data can't be compressed, save the old way */ 425 } 426 427 /* Store verbatim */ 428 if ((n = rdbSaveLen(rdb,len)) == -1) return -1; 429 nwritten += n; 430 if (len > 0) { 431 if (rdbWriteRaw(rdb,s,len) == -1) return -1; 432 nwritten += len; 433 } 434 return nwritten; 435 } 436 437 /* Save a long long value as either an encoded string or a string. 
*/ 438 ssize_t rdbSaveLongLongAsStringObject(rio *rdb, long long value) { 439 unsigned char buf[32]; 440 ssize_t n, nwritten = 0; 441 int enclen = rdbEncodeInteger(value,buf); 442 if (enclen > 0) { 443 return rdbWriteRaw(rdb,buf,enclen); 444 } else { 445 /* Encode as string */ 446 enclen = ll2string((char*)buf,32,value); 447 serverAssert(enclen < 32); 448 if ((n = rdbSaveLen(rdb,enclen)) == -1) return -1; 449 nwritten += n; 450 if ((n = rdbWriteRaw(rdb,buf,enclen)) == -1) return -1; 451 nwritten += n; 452 } 453 return nwritten; 454 } 455 456 /* Like rdbSaveRawString() gets a Redis object instead. */ 457 ssize_t rdbSaveStringObject(rio *rdb, robj *obj) { 458 /* Avoid to decode the object, then encode it again, if the 459 * object is already integer encoded. */ 460 if (obj->encoding == OBJ_ENCODING_INT) { 461 return rdbSaveLongLongAsStringObject(rdb,(long)obj->ptr); 462 } else { 463 serverAssertWithInfo(NULL,obj,sdsEncodedObject(obj)); 464 return rdbSaveRawString(rdb,obj->ptr,sdslen(obj->ptr)); 465 } 466 } 467 468 /* Load a string object from an RDB file according to flags: 469 * 470 * RDB_LOAD_NONE (no flags): load an RDB object, unencoded. 471 * RDB_LOAD_ENC: If the returned type is a Redis object, try to 472 * encode it in a special way to be more memory 473 * efficient. When this flag is passed the function 474 * no longer guarantees that obj->ptr is an SDS string. 475 * RDB_LOAD_PLAIN: Return a plain string allocated with zmalloc() 476 * instead of a Redis object with an sds in it. 477 * RDB_LOAD_SDS: Return an SDS string instead of a Redis object. 478 * 479 * On I/O error NULL is returned. 
480 */ 481 void *rdbGenericLoadStringObject(rio *rdb, int flags, size_t *lenptr) { 482 int encode = flags & RDB_LOAD_ENC; 483 int plain = flags & RDB_LOAD_PLAIN; 484 int sds = flags & RDB_LOAD_SDS; 485 int isencoded; 486 uint64_t len; 487 488 len = rdbLoadLen(rdb,&isencoded); 489 if (isencoded) { 490 switch(len) { 491 case RDB_ENC_INT8: 492 case RDB_ENC_INT16: 493 case RDB_ENC_INT32: 494 return rdbLoadIntegerObject(rdb,len,flags,lenptr); 495 case RDB_ENC_LZF: 496 return rdbLoadLzfStringObject(rdb,flags,lenptr); 497 default: 498 rdbExitReportCorruptRDB("Unknown RDB string encoding type %d",len); 499 } 500 } 501 502 if (len == RDB_LENERR) return NULL; 503 if (plain || sds) { 504 void *buf = plain ? zmalloc(len) : sdsnewlen(SDS_NOINIT,len); 505 if (lenptr) *lenptr = len; 506 if (len && rioRead(rdb,buf,len) == 0) { 507 if (plain) 508 zfree(buf); 509 else 510 sdsfree(buf); 511 return NULL; 512 } 513 return buf; 514 } else { 515 robj *o = encode ? createStringObject(SDS_NOINIT,len) : 516 createRawStringObject(SDS_NOINIT,len); 517 if (len && rioRead(rdb,o->ptr,len) == 0) { 518 decrRefCount(o); 519 return NULL; 520 } 521 return o; 522 } 523 } 524 525 robj *rdbLoadStringObject(rio *rdb) { 526 return rdbGenericLoadStringObject(rdb,RDB_LOAD_NONE,NULL); 527 } 528 529 robj *rdbLoadEncodedStringObject(rio *rdb) { 530 return rdbGenericLoadStringObject(rdb,RDB_LOAD_ENC,NULL); 531 } 532 533 /* Save a double value. Doubles are saved as strings prefixed by an unsigned 534 * 8 bit integer specifying the length of the representation. 535 * This 8 bit integer has special values in order to specify the following 536 * conditions: 537 * 253: not a number 538 * 254: + inf 539 * 255: - inf 540 */ 541 int rdbSaveDoubleValue(rio *rdb, double val) { 542 unsigned char buf[128]; 543 int len; 544 545 if (isnan(val)) { 546 buf[0] = 253; 547 len = 1; 548 } else if (!isfinite(val)) { 549 len = 1; 550 buf[0] = (val < 0) ? 
255 : 254; 551 } else { 552 #if (DBL_MANT_DIG >= 52) && (LLONG_MAX == 0x7fffffffffffffffLL) 553 /* Check if the float is in a safe range to be casted into a 554 * long long. We are assuming that long long is 64 bit here. 555 * Also we are assuming that there are no implementations around where 556 * double has precision < 52 bit. 557 * 558 * Under this assumptions we test if a double is inside an interval 559 * where casting to long long is safe. Then using two castings we 560 * make sure the decimal part is zero. If all this is true we use 561 * integer printing function that is much faster. */ 562 double min = -4503599627370495; /* (2^52)-1 */ 563 double max = 4503599627370496; /* -(2^52) */ 564 if (val > min && val < max && val == ((double)((long long)val))) 565 ll2string((char*)buf+1,sizeof(buf)-1,(long long)val); 566 else 567 #endif 568 snprintf((char*)buf+1,sizeof(buf)-1,"%.17g",val); 569 buf[0] = strlen((char*)buf+1); 570 len = buf[0]+1; 571 } 572 return rdbWriteRaw(rdb,buf,len); 573 } 574 575 /* For information about double serialization check rdbSaveDoubleValue() */ 576 int rdbLoadDoubleValue(rio *rdb, double *val) { 577 char buf[256]; 578 unsigned char len; 579 580 if (rioRead(rdb,&len,1) == 0) return -1; 581 switch(len) { 582 case 255: *val = R_NegInf; return 0; 583 case 254: *val = R_PosInf; return 0; 584 case 253: *val = R_Nan; return 0; 585 default: 586 if (rioRead(rdb,buf,len) == 0) return -1; 587 buf[len] = '\0'; 588 sscanf(buf, "%lg", val); 589 return 0; 590 } 591 } 592 593 /* Saves a double for RDB 8 or greater, where IE754 binary64 format is assumed. 594 * We just make sure the integer is always stored in little endian, otherwise 595 * the value is copied verbatim from memory to disk. 596 * 597 * Return -1 on error, the size of the serialized value on success. */ 598 int rdbSaveBinaryDoubleValue(rio *rdb, double val) { 599 memrev64ifbe(&val); 600 return rdbWriteRaw(rdb,&val,sizeof(val)); 601 } 602 603 /* Loads a double from RDB 8 or greater. 
See rdbSaveBinaryDoubleValue() for 604 * more info. On error -1 is returned, otherwise 0. */ 605 int rdbLoadBinaryDoubleValue(rio *rdb, double *val) { 606 if (rioRead(rdb,val,sizeof(*val)) == 0) return -1; 607 memrev64ifbe(val); 608 return 0; 609 } 610 611 /* Like rdbSaveBinaryDoubleValue() but single precision. */ 612 int rdbSaveBinaryFloatValue(rio *rdb, float val) { 613 memrev32ifbe(&val); 614 return rdbWriteRaw(rdb,&val,sizeof(val)); 615 } 616 617 /* Like rdbLoadBinaryDoubleValue() but single precision. */ 618 int rdbLoadBinaryFloatValue(rio *rdb, float *val) { 619 if (rioRead(rdb,val,sizeof(*val)) == 0) return -1; 620 memrev32ifbe(val); 621 return 0; 622 } 623 624 /* Save the object type of object "o". */ 625 int rdbSaveObjectType(rio *rdb, robj *o) { 626 switch (o->type) { 627 case OBJ_STRING: 628 return rdbSaveType(rdb,RDB_TYPE_STRING); 629 case OBJ_LIST: 630 if (o->encoding == OBJ_ENCODING_QUICKLIST) 631 return rdbSaveType(rdb,RDB_TYPE_LIST_QUICKLIST); 632 else 633 serverPanic("Unknown list encoding"); 634 case OBJ_SET: 635 if (o->encoding == OBJ_ENCODING_INTSET) 636 return rdbSaveType(rdb,RDB_TYPE_SET_INTSET); 637 else if (o->encoding == OBJ_ENCODING_HT) 638 return rdbSaveType(rdb,RDB_TYPE_SET); 639 else 640 serverPanic("Unknown set encoding"); 641 case OBJ_ZSET: 642 if (o->encoding == OBJ_ENCODING_ZIPLIST) 643 return rdbSaveType(rdb,RDB_TYPE_ZSET_ZIPLIST); 644 else if (o->encoding == OBJ_ENCODING_SKIPLIST) 645 return rdbSaveType(rdb,RDB_TYPE_ZSET_2); 646 else 647 serverPanic("Unknown sorted set encoding"); 648 case OBJ_HASH: 649 if (o->encoding == OBJ_ENCODING_ZIPLIST) 650 return rdbSaveType(rdb,RDB_TYPE_HASH_ZIPLIST); 651 else if (o->encoding == OBJ_ENCODING_HT) 652 return rdbSaveType(rdb,RDB_TYPE_HASH); 653 else 654 serverPanic("Unknown hash encoding"); 655 case OBJ_STREAM: 656 return rdbSaveType(rdb,RDB_TYPE_STREAM_LISTPACKS); 657 case OBJ_MODULE: 658 return rdbSaveType(rdb,RDB_TYPE_MODULE_2); 659 default: 660 serverPanic("Unknown object type"); 661 } 
662 return -1; /* avoid warning */ 663 } 664 665 /* Use rdbLoadType() to load a TYPE in RDB format, but returns -1 if the 666 * type is not specifically a valid Object Type. */ 667 int rdbLoadObjectType(rio *rdb) { 668 int type; 669 if ((type = rdbLoadType(rdb)) == -1) return -1; 670 if (!rdbIsObjectType(type)) return -1; 671 return type; 672 } 673 674 /* This helper function serializes a consumer group Pending Entries List (PEL) 675 * into the RDB file. The 'nacks' argument tells the function if also persist 676 * the informations about the not acknowledged message, or if to persist 677 * just the IDs: this is useful because for the global consumer group PEL 678 * we serialized the NACKs as well, but when serializing the local consumer 679 * PELs we just add the ID, that will be resolved inside the global PEL to 680 * put a reference to the same structure. */ 681 ssize_t rdbSaveStreamPEL(rio *rdb, rax *pel, int nacks) { 682 ssize_t n, nwritten = 0; 683 684 /* Number of entries in the PEL. */ 685 if ((n = rdbSaveLen(rdb,raxSize(pel))) == -1) return -1; 686 nwritten += n; 687 688 /* Save each entry. */ 689 raxIterator ri; 690 raxStart(&ri,pel); 691 raxSeek(&ri,"^",NULL,0); 692 while(raxNext(&ri)) { 693 /* We store IDs in raw form as 128 big big endian numbers, like 694 * they are inside the radix tree key. */ 695 if ((n = rdbWriteRaw(rdb,ri.key,sizeof(streamID))) == -1) return -1; 696 nwritten += n; 697 698 if (nacks) { 699 streamNACK *nack = ri.data; 700 if ((n = rdbSaveMillisecondTime(rdb,nack->delivery_time)) == -1) 701 return -1; 702 nwritten += n; 703 if ((n = rdbSaveLen(rdb,nack->delivery_count)) == -1) return -1; 704 nwritten += n; 705 /* We don't save the consumer name: we'll save the pending IDs 706 * for each consumer in the consumer PEL, and resolve the consumer 707 * at loading time. */ 708 } 709 } 710 raxStop(&ri); 711 return nwritten; 712 } 713 714 /* Serialize the consumers of a stream consumer group into the RDB. 
Helper 715 * function for the stream data type serialization. What we do here is to 716 * persist the consumer metadata, and it's PEL, for each consumer. */ 717 size_t rdbSaveStreamConsumers(rio *rdb, streamCG *cg) { 718 ssize_t n, nwritten = 0; 719 720 /* Number of consumers in this consumer group. */ 721 if ((n = rdbSaveLen(rdb,raxSize(cg->consumers))) == -1) return -1; 722 nwritten += n; 723 724 /* Save each consumer. */ 725 raxIterator ri; 726 raxStart(&ri,cg->consumers); 727 raxSeek(&ri,"^",NULL,0); 728 while(raxNext(&ri)) { 729 streamConsumer *consumer = ri.data; 730 731 /* Consumer name. */ 732 if ((n = rdbSaveRawString(rdb,ri.key,ri.key_len)) == -1) return -1; 733 nwritten += n; 734 735 /* Last seen time. */ 736 if ((n = rdbSaveMillisecondTime(rdb,consumer->seen_time)) == -1) 737 return -1; 738 nwritten += n; 739 740 /* Consumer PEL, without the ACKs (see last parameter of the function 741 * passed with value of 0), at loading time we'll lookup the ID 742 * in the consumer group global PEL and will put a reference in the 743 * consumer local PEL. */ 744 if ((n = rdbSaveStreamPEL(rdb,consumer->pel,0)) == -1) 745 return -1; 746 nwritten += n; 747 } 748 raxStop(&ri); 749 return nwritten; 750 } 751 752 /* Save a Redis object. 753 * Returns -1 on error, number of bytes written on success. 
*/ 754 ssize_t rdbSaveObject(rio *rdb, robj *o, robj *key) { 755 ssize_t n = 0, nwritten = 0; 756 757 if (o->type == OBJ_STRING) { 758 /* Save a string value */ 759 if ((n = rdbSaveStringObject(rdb,o)) == -1) return -1; 760 nwritten += n; 761 } else if (o->type == OBJ_LIST) { 762 /* Save a list value */ 763 if (o->encoding == OBJ_ENCODING_QUICKLIST) { 764 quicklist *ql = o->ptr; 765 quicklistNode *node = ql->head; 766 767 if ((n = rdbSaveLen(rdb,ql->len)) == -1) return -1; 768 nwritten += n; 769 770 while(node) { 771 if (quicklistNodeIsCompressed(node)) { 772 void *data; 773 size_t compress_len = quicklistGetLzf(node, &data); 774 if ((n = rdbSaveLzfBlob(rdb,data,compress_len,node->sz)) == -1) return -1; 775 nwritten += n; 776 } else { 777 if ((n = rdbSaveRawString(rdb,node->zl,node->sz)) == -1) return -1; 778 nwritten += n; 779 } 780 node = node->next; 781 } 782 } else { 783 serverPanic("Unknown list encoding"); 784 } 785 } else if (o->type == OBJ_SET) { 786 /* Save a set value */ 787 if (o->encoding == OBJ_ENCODING_HT) { 788 dict *set = o->ptr; 789 dictIterator *di = dictGetIterator(set); 790 dictEntry *de; 791 792 if ((n = rdbSaveLen(rdb,dictSize(set))) == -1) { 793 dictReleaseIterator(di); 794 return -1; 795 } 796 nwritten += n; 797 798 while((de = dictNext(di)) != NULL) { 799 sds ele = dictGetKey(de); 800 if ((n = rdbSaveRawString(rdb,(unsigned char*)ele,sdslen(ele))) 801 == -1) 802 { 803 dictReleaseIterator(di); 804 return -1; 805 } 806 nwritten += n; 807 } 808 dictReleaseIterator(di); 809 } else if (o->encoding == OBJ_ENCODING_INTSET) { 810 size_t l = intsetBlobLen((intset*)o->ptr); 811 812 if ((n = rdbSaveRawString(rdb,o->ptr,l)) == -1) return -1; 813 nwritten += n; 814 } else { 815 serverPanic("Unknown set encoding"); 816 } 817 } else if (o->type == OBJ_ZSET) { 818 /* Save a sorted set value */ 819 if (o->encoding == OBJ_ENCODING_ZIPLIST) { 820 size_t l = ziplistBlobLen((unsigned char*)o->ptr); 821 822 if ((n = rdbSaveRawString(rdb,o->ptr,l)) == -1) return 
-1; 823 nwritten += n; 824 } else if (o->encoding == OBJ_ENCODING_SKIPLIST) { 825 zset *zs = o->ptr; 826 zskiplist *zsl = zs->zsl; 827 828 if ((n = rdbSaveLen(rdb,zsl->length)) == -1) return -1; 829 nwritten += n; 830 831 /* We save the skiplist elements from the greatest to the smallest 832 * (that's trivial since the elements are already ordered in the 833 * skiplist): this improves the load process, since the next loaded 834 * element will always be the smaller, so adding to the skiplist 835 * will always immediately stop at the head, making the insertion 836 * O(1) instead of O(log(N)). */ 837 zskiplistNode *zn = zsl->tail; 838 while (zn != NULL) { 839 if ((n = rdbSaveRawString(rdb, 840 (unsigned char*)zn->ele,sdslen(zn->ele))) == -1) 841 { 842 return -1; 843 } 844 nwritten += n; 845 if ((n = rdbSaveBinaryDoubleValue(rdb,zn->score)) == -1) 846 return -1; 847 nwritten += n; 848 zn = zn->backward; 849 } 850 } else { 851 serverPanic("Unknown sorted set encoding"); 852 } 853 } else if (o->type == OBJ_HASH) { 854 /* Save a hash value */ 855 if (o->encoding == OBJ_ENCODING_ZIPLIST) { 856 size_t l = ziplistBlobLen((unsigned char*)o->ptr); 857 858 if ((n = rdbSaveRawString(rdb,o->ptr,l)) == -1) return -1; 859 nwritten += n; 860 861 } else if (o->encoding == OBJ_ENCODING_HT) { 862 dictIterator *di = dictGetIterator(o->ptr); 863 dictEntry *de; 864 865 if ((n = rdbSaveLen(rdb,dictSize((dict*)o->ptr))) == -1) { 866 dictReleaseIterator(di); 867 return -1; 868 } 869 nwritten += n; 870 871 while((de = dictNext(di)) != NULL) { 872 sds field = dictGetKey(de); 873 sds value = dictGetVal(de); 874 875 if ((n = rdbSaveRawString(rdb,(unsigned char*)field, 876 sdslen(field))) == -1) 877 { 878 dictReleaseIterator(di); 879 return -1; 880 } 881 nwritten += n; 882 if ((n = rdbSaveRawString(rdb,(unsigned char*)value, 883 sdslen(value))) == -1) 884 { 885 dictReleaseIterator(di); 886 return -1; 887 } 888 nwritten += n; 889 } 890 dictReleaseIterator(di); 891 } else { 892 serverPanic("Unknown 
hash encoding"); 893 } 894 } else if (o->type == OBJ_STREAM) { 895 /* Store how many listpacks we have inside the radix tree. */ 896 stream *s = o->ptr; 897 rax *rax = s->rax; 898 if ((n = rdbSaveLen(rdb,raxSize(rax))) == -1) return -1; 899 nwritten += n; 900 901 /* Serialize all the listpacks inside the radix tree as they are, 902 * when loading back, we'll use the first entry of each listpack 903 * to insert it back into the radix tree. */ 904 raxIterator ri; 905 raxStart(&ri,rax); 906 raxSeek(&ri,"^",NULL,0); 907 while (raxNext(&ri)) { 908 unsigned char *lp = ri.data; 909 size_t lp_bytes = lpBytes(lp); 910 if ((n = rdbSaveRawString(rdb,ri.key,ri.key_len)) == -1) return -1; 911 nwritten += n; 912 if ((n = rdbSaveRawString(rdb,lp,lp_bytes)) == -1) return -1; 913 nwritten += n; 914 } 915 raxStop(&ri); 916 917 /* Save the number of elements inside the stream. We cannot obtain 918 * this easily later, since our macro nodes should be checked for 919 * number of items: not a great CPU / space tradeoff. */ 920 if ((n = rdbSaveLen(rdb,s->length)) == -1) return -1; 921 nwritten += n; 922 /* Save the last entry ID. */ 923 if ((n = rdbSaveLen(rdb,s->last_id.ms)) == -1) return -1; 924 nwritten += n; 925 if ((n = rdbSaveLen(rdb,s->last_id.seq)) == -1) return -1; 926 nwritten += n; 927 928 /* The consumer groups and their clients are part of the stream 929 * type, so serialize every consumer group. */ 930 931 /* Save the number of groups. */ 932 size_t num_cgroups = s->cgroups ? raxSize(s->cgroups) : 0; 933 if ((n = rdbSaveLen(rdb,num_cgroups)) == -1) return -1; 934 nwritten += n; 935 936 if (num_cgroups) { 937 /* Serialize each consumer group. */ 938 raxStart(&ri,s->cgroups); 939 raxSeek(&ri,"^",NULL,0); 940 while(raxNext(&ri)) { 941 streamCG *cg = ri.data; 942 943 /* Save the group name. */ 944 if ((n = rdbSaveRawString(rdb,ri.key,ri.key_len)) == -1) 945 return -1; 946 nwritten += n; 947 948 /* Last ID. 
*/ 949 if ((n = rdbSaveLen(rdb,cg->last_id.ms)) == -1) return -1; 950 nwritten += n; 951 if ((n = rdbSaveLen(rdb,cg->last_id.seq)) == -1) return -1; 952 nwritten += n; 953 954 /* Save the global PEL. */ 955 if ((n = rdbSaveStreamPEL(rdb,cg->pel,1)) == -1) return -1; 956 nwritten += n; 957 958 /* Save the consumers of this group. */ 959 if ((n = rdbSaveStreamConsumers(rdb,cg)) == -1) return -1; 960 nwritten += n; 961 } 962 raxStop(&ri); 963 } 964 } else if (o->type == OBJ_MODULE) { 965 /* Save a module-specific value. */ 966 RedisModuleIO io; 967 moduleValue *mv = o->ptr; 968 moduleType *mt = mv->type; 969 moduleInitIOContext(io,mt,rdb,key); 970 971 /* Write the "module" identifier as prefix, so that we'll be able 972 * to call the right module during loading. */ 973 int retval = rdbSaveLen(rdb,mt->id); 974 if (retval == -1) return -1; 975 io.bytes += retval; 976 977 /* Then write the module-specific representation + EOF marker. */ 978 mt->rdb_save(&io,mv->value); 979 retval = rdbSaveLen(rdb,RDB_MODULE_OPCODE_EOF); 980 if (retval == -1) return -1; 981 io.bytes += retval; 982 983 if (io.ctx) { 984 moduleFreeContext(io.ctx); 985 zfree(io.ctx); 986 } 987 return io.error ? -1 : (ssize_t)io.bytes; 988 } else { 989 serverPanic("Unknown object type"); 990 } 991 return nwritten; 992 } 993 994 /* Return the length the object will have on disk if saved with 995 * the rdbSaveObject() function. Currently we use a trick to get 996 * this length with very little changes to the code. In the future 997 * we could switch to a faster solution. */ 998 size_t rdbSavedObjectLen(robj *o) { 999 ssize_t len = rdbSaveObject(NULL,o,NULL); 1000 serverAssertWithInfo(NULL,o,len != -1); 1001 return len; 1002 } 1003 1004 /* Save a key-value pair, with expire time, type, key, value. 1005 * On error -1 is returned. 1006 * On success if the key was actually saved 1 is returned, otherwise 0 1007 * is returned (the key was already expired). 
*/
/* Implementation note: this code only ever returns -1 (some write failed)
 * or 1. No expiry test is performed here, so the "0 when already expired"
 * outcome described above is never produced by this function. */
int rdbSaveKeyValuePair(rio *rdb, robj *key, robj *val, long long expiretime) {
    /* The eviction metadata worth persisting depends on the policy. */
    int with_lru = server.maxmemory_policy & MAXMEMORY_FLAG_LRU;
    int with_lfu = server.maxmemory_policy & MAXMEMORY_FLAG_LFU;

    /* Expire: RDB_OPCODE_EXPIRETIME_MS followed by the millisecond time.
     * An expiretime of -1 means the key has no expire at all. */
    if (expiretime != -1) {
        if (rdbSaveType(rdb,RDB_OPCODE_EXPIRETIME_MS) == -1 ||
            rdbSaveMillisecondTime(rdb,expiretime) == -1)
            return -1;
    }

    /* LRU: RDB_OPCODE_IDLE followed by the idle time. Whole seconds are
     * precise enough and need less space than milliseconds. */
    if (with_lru) {
        uint64_t idle_seconds = estimateObjectIdleTime(val) / 1000;
        if (rdbSaveType(rdb,RDB_OPCODE_IDLE) == -1 ||
            rdbSaveLen(rdb,idle_seconds) == -1)
            return -1;
    }

    /* LFU: RDB_OPCODE_FREQ followed by the 8 bit counter (the frequency is
     * logarithmic with a 0-255 range), i.e. exactly two bytes. The halving
     * time is not stored: resetting it once on load barely affects the
     * frequency. */
    if (with_lfu) {
        uint8_t counter = LFUDecrAndReturn(val);
        if (rdbSaveType(rdb,RDB_OPCODE_FREQ) == -1 ||
            rdbWriteRaw(rdb,&counter,1) == -1)
            return -1;
    }

    /* The entry proper: object type, key, then the serialized value. */
    if (rdbSaveObjectType(rdb,val) == -1 ||
        rdbSaveStringObject(rdb,key) == -1 ||
        rdbSaveObject(rdb,val,key) == -1)
        return -1;

    return 1;
}

/* Save an AUX field.
*/ 1046 ssize_t rdbSaveAuxField(rio *rdb, void *key, size_t keylen, void *val, size_t vallen) { 1047 ssize_t ret, len = 0; 1048 if ((ret = rdbSaveType(rdb,RDB_OPCODE_AUX)) == -1) return -1; 1049 len += ret; 1050 if ((ret = rdbSaveRawString(rdb,key,keylen)) == -1) return -1; 1051 len += ret; 1052 if ((ret = rdbSaveRawString(rdb,val,vallen)) == -1) return -1; 1053 len += ret; 1054 return len; 1055 } 1056 1057 /* Wrapper for rdbSaveAuxField() used when key/val length can be obtained 1058 * with strlen(). */ 1059 ssize_t rdbSaveAuxFieldStrStr(rio *rdb, char *key, char *val) { 1060 return rdbSaveAuxField(rdb,key,strlen(key),val,strlen(val)); 1061 } 1062 1063 /* Wrapper for strlen(key) + integer type (up to long long range). */ 1064 ssize_t rdbSaveAuxFieldStrInt(rio *rdb, char *key, long long val) { 1065 char buf[LONG_STR_SIZE]; 1066 int vlen = ll2string(buf,sizeof(buf),val); 1067 return rdbSaveAuxField(rdb,key,strlen(key),buf,vlen); 1068 } 1069 1070 /* Save a few default AUX fields with information about the RDB generated. */ 1071 int rdbSaveInfoAuxFields(rio *rdb, int flags, rdbSaveInfo *rsi) { 1072 int redis_bits = (sizeof(void*) == 8) ? 64 : 32; 1073 int aof_preamble = (flags & RDB_SAVE_AOF_PREAMBLE) != 0; 1074 1075 /* Add a few fields about the state when the RDB was created. */ 1076 if (rdbSaveAuxFieldStrStr(rdb,"redis-ver",REDIS_VERSION) == -1) return -1; 1077 if (rdbSaveAuxFieldStrInt(rdb,"redis-bits",redis_bits) == -1) return -1; 1078 if (rdbSaveAuxFieldStrInt(rdb,"ctime",time(NULL)) == -1) return -1; 1079 if (rdbSaveAuxFieldStrInt(rdb,"used-mem",zmalloc_used_memory()) == -1) return -1; 1080 1081 /* Handle saving options that generate aux fields. 
*/ 1082 if (rsi) { 1083 if (rdbSaveAuxFieldStrInt(rdb,"repl-stream-db",rsi->repl_stream_db) 1084 == -1) return -1; 1085 if (rdbSaveAuxFieldStrStr(rdb,"repl-id",server.replid) 1086 == -1) return -1; 1087 if (rdbSaveAuxFieldStrInt(rdb,"repl-offset",server.master_repl_offset) 1088 == -1) return -1; 1089 } 1090 if (rdbSaveAuxFieldStrInt(rdb,"aof-preamble",aof_preamble) == -1) return -1; 1091 return 1; 1092 } 1093 1094 /* Produces a dump of the database in RDB format sending it to the specified 1095 * Redis I/O channel. On success C_OK is returned, otherwise C_ERR 1096 * is returned and part of the output, or all the output, can be 1097 * missing because of I/O errors. 1098 * 1099 * When the function returns C_ERR and if 'error' is not NULL, the 1100 * integer pointed by 'error' is set to the value of errno just after the I/O 1101 * error. */ 1102 int rdbSaveRio(rio *rdb, int *error, int flags, rdbSaveInfo *rsi) { 1103 dictIterator *di = NULL; 1104 dictEntry *de; 1105 char magic[10]; 1106 int j; 1107 uint64_t cksum; 1108 size_t processed = 0; 1109 1110 if (server.rdb_checksum) 1111 rdb->update_cksum = rioGenericUpdateChecksum; 1112 snprintf(magic,sizeof(magic),"REDIS%04d",RDB_VERSION); 1113 if (rdbWriteRaw(rdb,magic,9) == -1) goto werr; 1114 if (rdbSaveInfoAuxFields(rdb,flags,rsi) == -1) goto werr; 1115 1116 for (j = 0; j < server.dbnum; j++) { 1117 redisDb *db = server.db+j; 1118 dict *d = db->dict; 1119 if (dictSize(d) == 0) continue; 1120 di = dictGetSafeIterator(d); 1121 1122 /* Write the SELECT DB opcode */ 1123 if (rdbSaveType(rdb,RDB_OPCODE_SELECTDB) == -1) goto werr; 1124 if (rdbSaveLen(rdb,j) == -1) goto werr; 1125 1126 /* Write the RESIZE DB opcode. We trim the size to UINT32_MAX, which 1127 * is currently the largest type we are able to represent in RDB sizes. 1128 * However this does not limit the actual size of the DB to load since 1129 * these sizes are just hints to resize the hash tables. 
*/ 1130 uint64_t db_size, expires_size; 1131 db_size = dictSize(db->dict); 1132 expires_size = dictSize(db->expires); 1133 if (rdbSaveType(rdb,RDB_OPCODE_RESIZEDB) == -1) goto werr; 1134 if (rdbSaveLen(rdb,db_size) == -1) goto werr; 1135 if (rdbSaveLen(rdb,expires_size) == -1) goto werr; 1136 1137 /* Iterate this DB writing every entry */ 1138 while((de = dictNext(di)) != NULL) { 1139 sds keystr = dictGetKey(de); 1140 robj key, *o = dictGetVal(de); 1141 long long expire; 1142 1143 initStaticStringObject(key,keystr); 1144 expire = getExpire(db,&key); 1145 if (rdbSaveKeyValuePair(rdb,&key,o,expire) == -1) goto werr; 1146 1147 /* When this RDB is produced as part of an AOF rewrite, move 1148 * accumulated diff from parent to child while rewriting in 1149 * order to have a smaller final write. */ 1150 if (flags & RDB_SAVE_AOF_PREAMBLE && 1151 rdb->processed_bytes > processed+AOF_READ_DIFF_INTERVAL_BYTES) 1152 { 1153 processed = rdb->processed_bytes; 1154 aofReadDiffFromParent(); 1155 } 1156 } 1157 dictReleaseIterator(di); 1158 di = NULL; /* So that we don't release it again on error. */ 1159 } 1160 1161 /* If we are storing the replication information on disk, persist 1162 * the script cache as well: on successful PSYNC after a restart, we need 1163 * to be able to process any EVALSHA inside the replication backlog the 1164 * master will send us. */ 1165 if (rsi && dictSize(server.lua_scripts)) { 1166 di = dictGetIterator(server.lua_scripts); 1167 while((de = dictNext(di)) != NULL) { 1168 robj *body = dictGetVal(de); 1169 if (rdbSaveAuxField(rdb,"lua",3,body->ptr,sdslen(body->ptr)) == -1) 1170 goto werr; 1171 } 1172 dictReleaseIterator(di); 1173 di = NULL; /* So that we don't release it again on error. */ 1174 } 1175 1176 /* EOF opcode */ 1177 if (rdbSaveType(rdb,RDB_OPCODE_EOF) == -1) goto werr; 1178 1179 /* CRC64 checksum. It will be zero if checksum computation is disabled, the 1180 * loading code skips the check in this case. 
*/ 1181 cksum = rdb->cksum; 1182 memrev64ifbe(&cksum); 1183 if (rioWrite(rdb,&cksum,8) == 0) goto werr; 1184 return C_OK; 1185 1186 werr: 1187 if (error) *error = errno; 1188 if (di) dictReleaseIterator(di); 1189 return C_ERR; 1190 } 1191 1192 /* This is just a wrapper to rdbSaveRio() that additionally adds a prefix 1193 * and a suffix to the generated RDB dump. The prefix is: 1194 * 1195 * $EOF:<40 bytes unguessable hex string>\r\n 1196 * 1197 * While the suffix is the 40 bytes hex string we announced in the prefix. 1198 * This way processes receiving the payload can understand when it ends 1199 * without doing any processing of the content. */ 1200 int rdbSaveRioWithEOFMark(rio *rdb, int *error, rdbSaveInfo *rsi) { 1201 char eofmark[RDB_EOF_MARK_SIZE]; 1202 1203 getRandomHexChars(eofmark,RDB_EOF_MARK_SIZE); 1204 if (error) *error = 0; 1205 if (rioWrite(rdb,"$EOF:",5) == 0) goto werr; 1206 if (rioWrite(rdb,eofmark,RDB_EOF_MARK_SIZE) == 0) goto werr; 1207 if (rioWrite(rdb,"\r\n",2) == 0) goto werr; 1208 if (rdbSaveRio(rdb,error,RDB_SAVE_NONE,rsi) == C_ERR) goto werr; 1209 if (rioWrite(rdb,eofmark,RDB_EOF_MARK_SIZE) == 0) goto werr; 1210 return C_OK; 1211 1212 werr: /* Write error. */ 1213 /* Set 'error' only if not already set by rdbSaveRio() call. */ 1214 if (error && *error == 0) *error = errno; 1215 return C_ERR; 1216 } 1217 1218 /* Save the DB on disk. Return C_ERR on error, C_OK on success. */ 1219 int rdbSave(char *filename, rdbSaveInfo *rsi) { 1220 char tmpfile[256]; 1221 char cwd[MAXPATHLEN]; /* Current working dir path for error messages. */ 1222 FILE *fp; 1223 rio rdb; 1224 int error = 0; 1225 1226 snprintf(tmpfile,256,"temp-%d.rdb", (int) getpid()); 1227 fp = fopen(tmpfile,"w"); 1228 if (!fp) { 1229 char *cwdp = getcwd(cwd,MAXPATHLEN); 1230 serverLog(LL_WARNING, 1231 "Failed opening the RDB file %s (in server root dir %s) " 1232 "for saving: %s", 1233 filename, 1234 cwdp ? 
cwdp : "unknown", 1235 strerror(errno)); 1236 return C_ERR; 1237 } 1238 1239 rioInitWithFile(&rdb,fp); 1240 1241 if (server.rdb_save_incremental_fsync) 1242 rioSetAutoSync(&rdb,REDIS_AUTOSYNC_BYTES); 1243 1244 if (rdbSaveRio(&rdb,&error,RDB_SAVE_NONE,rsi) == C_ERR) { 1245 errno = error; 1246 goto werr; 1247 } 1248 1249 /* Make sure data will not remain on the OS's output buffers */ 1250 if (fflush(fp) == EOF) goto werr; 1251 if (fsync(fileno(fp)) == -1) goto werr; 1252 if (fclose(fp) == EOF) goto werr; 1253 1254 /* Use RENAME to make sure the DB file is changed atomically only 1255 * if the generate DB file is ok. */ 1256 if (rename(tmpfile,filename) == -1) { 1257 char *cwdp = getcwd(cwd,MAXPATHLEN); 1258 serverLog(LL_WARNING, 1259 "Error moving temp DB file %s on the final " 1260 "destination %s (in server root dir %s): %s", 1261 tmpfile, 1262 filename, 1263 cwdp ? cwdp : "unknown", 1264 strerror(errno)); 1265 unlink(tmpfile); 1266 return C_ERR; 1267 } 1268 1269 serverLog(LL_NOTICE,"DB saved on disk"); 1270 server.dirty = 0; 1271 server.lastsave = time(NULL); 1272 server.lastbgsave_status = C_OK; 1273 return C_OK; 1274 1275 werr: 1276 serverLog(LL_WARNING,"Write error saving DB on disk: %s", strerror(errno)); 1277 fclose(fp); 1278 unlink(tmpfile); 1279 return C_ERR; 1280 } 1281 1282 int rdbSaveBackground(char *filename, rdbSaveInfo *rsi) { 1283 pid_t childpid; 1284 long long start; 1285 1286 if (server.aof_child_pid != -1 || server.rdb_child_pid != -1) return C_ERR; 1287 1288 server.dirty_before_bgsave = server.dirty; 1289 server.lastbgsave_try = time(NULL); 1290 openChildInfoPipe(); 1291 1292 start = ustime(); 1293 if ((childpid = fork()) == 0) { 1294 int retval; 1295 1296 /* Child */ 1297 closeListeningSockets(0); 1298 resetCpuAffinity("rdb-bgsave"); 1299 redisSetProcTitle("redis-rdb-bgsave"); 1300 retval = rdbSave(filename,rsi); 1301 if (retval == C_OK) { 1302 size_t private_dirty = zmalloc_get_private_dirty(-1); 1303 1304 if (private_dirty) { 1305 
serverLog(LL_NOTICE, 1306 "RDB: %zu MB of memory used by copy-on-write", 1307 private_dirty/(1024*1024)); 1308 } 1309 1310 server.child_info_data.cow_size = private_dirty; 1311 sendChildInfo(CHILD_INFO_TYPE_RDB); 1312 } 1313 exitFromChild((retval == C_OK) ? 0 : 1); 1314 } else { 1315 /* Parent */ 1316 server.stat_fork_time = ustime()-start; 1317 server.stat_fork_rate = (double) zmalloc_used_memory() * 1000000 / server.stat_fork_time / (1024*1024*1024); /* GB per second. */ 1318 latencyAddSampleIfNeeded("fork",server.stat_fork_time/1000); 1319 if (childpid == -1) { 1320 closeChildInfoPipe(); 1321 server.lastbgsave_status = C_ERR; 1322 serverLog(LL_WARNING,"Can't save in background: fork: %s", 1323 strerror(errno)); 1324 return C_ERR; 1325 } 1326 serverLog(LL_NOTICE,"Background saving started by pid %d",childpid); 1327 server.rdb_save_time_start = time(NULL); 1328 server.rdb_child_pid = childpid; 1329 server.rdb_child_type = RDB_CHILD_TYPE_DISK; 1330 updateDictResizePolicy(); 1331 return C_OK; 1332 } 1333 return C_OK; /* unreached */ 1334 } 1335 1336 void rdbRemoveTempFile(pid_t childpid) { 1337 char tmpfile[256]; 1338 1339 snprintf(tmpfile,sizeof(tmpfile),"temp-%d.rdb", (int) childpid); 1340 unlink(tmpfile); 1341 } 1342 1343 /* This function is called by rdbLoadObject() when the code is in RDB-check 1344 * mode and we find a module value of type 2 that can be parsed without 1345 * the need of the actual module. The value is parsed for errors, finally 1346 * a dummy redis object is returned just to conform to the API. 
*/
robj *rdbLoadCheckModuleValue(rio *rdb, char *modulename) {
    uint64_t opcode;

    /* The value is a sequence of (opcode, payload) pairs terminated by
     * RDB_MODULE_OPCODE_EOF. Each payload is read and discarded: only its
     * well-formedness is checked. */
    while((opcode = rdbLoadLen(rdb,NULL)) != RDB_MODULE_OPCODE_EOF) {
        if (opcode == RDB_LENERR) {
            /* A failed read matches no opcode below; without this check the
             * loop would silently keep reading instead of reporting it. */
            rdbExitReportCorruptRDB(
                "Error reading opcode from module %s value", modulename);
        } else if (opcode == RDB_MODULE_OPCODE_SINT ||
                   opcode == RDB_MODULE_OPCODE_UINT)
        {
            uint64_t len;
            if (rdbLoadLenByRef(rdb,NULL,&len) == -1) {
                rdbExitReportCorruptRDB(
                    "Error reading integer from module %s value", modulename);
            }
        } else if (opcode == RDB_MODULE_OPCODE_STRING) {
            robj *o = rdbGenericLoadStringObject(rdb,RDB_LOAD_NONE,NULL);
            if (o == NULL) {
                rdbExitReportCorruptRDB(
                    "Error reading string from module %s value", modulename);
            }
            decrRefCount(o);
        } else if (opcode == RDB_MODULE_OPCODE_FLOAT) {
            float val;
            if (rdbLoadBinaryFloatValue(rdb,&val) == -1) {
                rdbExitReportCorruptRDB(
                    "Error reading float from module %s value", modulename);
            }
        } else if (opcode == RDB_MODULE_OPCODE_DOUBLE) {
            double val;
            if (rdbLoadBinaryDoubleValue(rdb,&val) == -1) {
                rdbExitReportCorruptRDB(
                    "Error reading double from module %s value", modulename);
            }
        } else {
            /* An opcode we don't know means we can no longer tell where the
             * payload ends: the rest of the value cannot be trusted. */
            rdbExitReportCorruptRDB(
                "Unknown module opcode %llu in module %s value",
                (unsigned long long) opcode, modulename);
        }
    }
    return createStringObject("module-dummy-value",18);
}

/* Load a Redis object of the specified type from the specified file.
 * On success a newly allocated object is returned, otherwise NULL.
*/ 1384 robj *rdbLoadObject(int rdbtype, rio *rdb, robj *key) { 1385 robj *o = NULL, *ele, *dec; 1386 uint64_t len; 1387 unsigned int i; 1388 1389 if (rdbtype == RDB_TYPE_STRING) { 1390 /* Read string value */ 1391 if ((o = rdbLoadEncodedStringObject(rdb)) == NULL) return NULL; 1392 o = tryObjectEncoding(o); 1393 } else if (rdbtype == RDB_TYPE_LIST) { 1394 /* Read list value */ 1395 if ((len = rdbLoadLen(rdb,NULL)) == RDB_LENERR) return NULL; 1396 1397 o = createQuicklistObject(); 1398 quicklistSetOptions(o->ptr, server.list_max_ziplist_size, 1399 server.list_compress_depth); 1400 1401 /* Load every single element of the list */ 1402 while(len--) { 1403 if ((ele = rdbLoadEncodedStringObject(rdb)) == NULL) return NULL; 1404 dec = getDecodedObject(ele); 1405 size_t len = sdslen(dec->ptr); 1406 quicklistPushTail(o->ptr, dec->ptr, len); 1407 decrRefCount(dec); 1408 decrRefCount(ele); 1409 } 1410 } else if (rdbtype == RDB_TYPE_SET) { 1411 /* Read Set value */ 1412 if ((len = rdbLoadLen(rdb,NULL)) == RDB_LENERR) return NULL; 1413 1414 /* Use a regular set when there are too many entries. */ 1415 if (len > server.set_max_intset_entries) { 1416 o = createSetObject(); 1417 /* It's faster to expand the dict to the right size asap in order 1418 * to avoid rehashing */ 1419 if (len > DICT_HT_INITIAL_SIZE) 1420 dictExpand(o->ptr,len); 1421 } else { 1422 o = createIntsetObject(); 1423 } 1424 1425 /* Load every single element of the set */ 1426 for (i = 0; i < len; i++) { 1427 long long llval; 1428 sds sdsele; 1429 1430 if ((sdsele = rdbGenericLoadStringObject(rdb,RDB_LOAD_SDS,NULL)) 1431 == NULL) return NULL; 1432 1433 if (o->encoding == OBJ_ENCODING_INTSET) { 1434 /* Fetch integer value from element. 
*/ 1435 if (isSdsRepresentableAsLongLong(sdsele,&llval) == C_OK) { 1436 o->ptr = intsetAdd(o->ptr,llval,NULL); 1437 } else { 1438 setTypeConvert(o,OBJ_ENCODING_HT); 1439 dictExpand(o->ptr,len); 1440 } 1441 } 1442 1443 /* This will also be called when the set was just converted 1444 * to a regular hash table encoded set. */ 1445 if (o->encoding == OBJ_ENCODING_HT) { 1446 dictAdd((dict*)o->ptr,sdsele,NULL); 1447 } else { 1448 sdsfree(sdsele); 1449 } 1450 } 1451 } else if (rdbtype == RDB_TYPE_ZSET_2 || rdbtype == RDB_TYPE_ZSET) { 1452 /* Read list/set value. */ 1453 uint64_t zsetlen; 1454 size_t maxelelen = 0; 1455 zset *zs; 1456 1457 if ((zsetlen = rdbLoadLen(rdb,NULL)) == RDB_LENERR) return NULL; 1458 o = createZsetObject(); 1459 zs = o->ptr; 1460 1461 if (zsetlen > DICT_HT_INITIAL_SIZE) 1462 dictExpand(zs->dict,zsetlen); 1463 1464 /* Load every single element of the sorted set. */ 1465 while(zsetlen--) { 1466 sds sdsele; 1467 double score; 1468 zskiplistNode *znode; 1469 1470 if ((sdsele = rdbGenericLoadStringObject(rdb,RDB_LOAD_SDS,NULL)) 1471 == NULL) return NULL; 1472 1473 if (rdbtype == RDB_TYPE_ZSET_2) { 1474 if (rdbLoadBinaryDoubleValue(rdb,&score) == -1) return NULL; 1475 } else { 1476 if (rdbLoadDoubleValue(rdb,&score) == -1) return NULL; 1477 } 1478 1479 /* Don't care about integer-encoded strings. */ 1480 if (sdslen(sdsele) > maxelelen) maxelelen = sdslen(sdsele); 1481 1482 znode = zslInsert(zs->zsl,score,sdsele); 1483 dictAdd(zs->dict,sdsele,&znode->score); 1484 } 1485 1486 /* Convert *after* loading, since sorted sets are not stored ordered. */ 1487 if (zsetLength(o) <= server.zset_max_ziplist_entries && 1488 maxelelen <= server.zset_max_ziplist_value) 1489 zsetConvert(o,OBJ_ENCODING_ZIPLIST); 1490 } else if (rdbtype == RDB_TYPE_HASH) { 1491 uint64_t len; 1492 int ret; 1493 sds field, value; 1494 1495 len = rdbLoadLen(rdb, NULL); 1496 if (len == RDB_LENERR) return NULL; 1497 1498 o = createHashObject(); 1499 1500 /* Too many entries? Use a hash table. 
*/ 1501 if (len > server.hash_max_ziplist_entries) 1502 hashTypeConvert(o, OBJ_ENCODING_HT); 1503 1504 /* Load every field and value into the ziplist */ 1505 while (o->encoding == OBJ_ENCODING_ZIPLIST && len > 0) { 1506 len--; 1507 /* Load raw strings */ 1508 if ((field = rdbGenericLoadStringObject(rdb,RDB_LOAD_SDS,NULL)) 1509 == NULL) return NULL; 1510 if ((value = rdbGenericLoadStringObject(rdb,RDB_LOAD_SDS,NULL)) 1511 == NULL) return NULL; 1512 1513 /* Add pair to ziplist */ 1514 o->ptr = ziplistPush(o->ptr, (unsigned char*)field, 1515 sdslen(field), ZIPLIST_TAIL); 1516 o->ptr = ziplistPush(o->ptr, (unsigned char*)value, 1517 sdslen(value), ZIPLIST_TAIL); 1518 1519 /* Convert to hash table if size threshold is exceeded */ 1520 if (sdslen(field) > server.hash_max_ziplist_value || 1521 sdslen(value) > server.hash_max_ziplist_value) 1522 { 1523 sdsfree(field); 1524 sdsfree(value); 1525 hashTypeConvert(o, OBJ_ENCODING_HT); 1526 break; 1527 } 1528 sdsfree(field); 1529 sdsfree(value); 1530 } 1531 1532 if (o->encoding == OBJ_ENCODING_HT && len > DICT_HT_INITIAL_SIZE) 1533 dictExpand(o->ptr,len); 1534 1535 /* Load remaining fields and values into the hash table */ 1536 while (o->encoding == OBJ_ENCODING_HT && len > 0) { 1537 len--; 1538 /* Load encoded strings */ 1539 if ((field = rdbGenericLoadStringObject(rdb,RDB_LOAD_SDS,NULL)) 1540 == NULL) return NULL; 1541 if ((value = rdbGenericLoadStringObject(rdb,RDB_LOAD_SDS,NULL)) 1542 == NULL) return NULL; 1543 1544 /* Add pair to hash table */ 1545 ret = dictAdd((dict*)o->ptr, field, value); 1546 if (ret == DICT_ERR) { 1547 rdbExitReportCorruptRDB("Duplicate keys detected"); 1548 } 1549 } 1550 1551 /* All pairs should be read by now */ 1552 serverAssert(len == 0); 1553 } else if (rdbtype == RDB_TYPE_LIST_QUICKLIST) { 1554 if ((len = rdbLoadLen(rdb,NULL)) == RDB_LENERR) return NULL; 1555 o = createQuicklistObject(); 1556 quicklistSetOptions(o->ptr, server.list_max_ziplist_size, 1557 server.list_compress_depth); 1558 1559 
while (len--) { 1560 unsigned char *zl = 1561 rdbGenericLoadStringObject(rdb,RDB_LOAD_PLAIN,NULL); 1562 if (zl == NULL) return NULL; 1563 quicklistAppendZiplist(o->ptr, zl); 1564 } 1565 } else if (rdbtype == RDB_TYPE_HASH_ZIPMAP || 1566 rdbtype == RDB_TYPE_LIST_ZIPLIST || 1567 rdbtype == RDB_TYPE_SET_INTSET || 1568 rdbtype == RDB_TYPE_ZSET_ZIPLIST || 1569 rdbtype == RDB_TYPE_HASH_ZIPLIST) 1570 { 1571 unsigned char *encoded = 1572 rdbGenericLoadStringObject(rdb,RDB_LOAD_PLAIN,NULL); 1573 if (encoded == NULL) return NULL; 1574 o = createObject(OBJ_STRING,encoded); /* Obj type fixed below. */ 1575 1576 /* Fix the object encoding, and make sure to convert the encoded 1577 * data type into the base type if accordingly to the current 1578 * configuration there are too many elements in the encoded data 1579 * type. Note that we only check the length and not max element 1580 * size as this is an O(N) scan. Eventually everything will get 1581 * converted. */ 1582 switch(rdbtype) { 1583 case RDB_TYPE_HASH_ZIPMAP: 1584 /* Convert to ziplist encoded hash. This must be deprecated 1585 * when loading dumps created by Redis 2.4 gets deprecated. 
*/ 1586 { 1587 unsigned char *zl = ziplistNew(); 1588 unsigned char *zi = zipmapRewind(o->ptr); 1589 unsigned char *fstr, *vstr; 1590 unsigned int flen, vlen; 1591 unsigned int maxlen = 0; 1592 1593 while ((zi = zipmapNext(zi, &fstr, &flen, &vstr, &vlen)) != NULL) { 1594 if (flen > maxlen) maxlen = flen; 1595 if (vlen > maxlen) maxlen = vlen; 1596 zl = ziplistPush(zl, fstr, flen, ZIPLIST_TAIL); 1597 zl = ziplistPush(zl, vstr, vlen, ZIPLIST_TAIL); 1598 } 1599 1600 zfree(o->ptr); 1601 o->ptr = zl; 1602 o->type = OBJ_HASH; 1603 o->encoding = OBJ_ENCODING_ZIPLIST; 1604 1605 if (hashTypeLength(o) > server.hash_max_ziplist_entries || 1606 maxlen > server.hash_max_ziplist_value) 1607 { 1608 hashTypeConvert(o, OBJ_ENCODING_HT); 1609 } 1610 } 1611 break; 1612 case RDB_TYPE_LIST_ZIPLIST: 1613 o->type = OBJ_LIST; 1614 o->encoding = OBJ_ENCODING_ZIPLIST; 1615 listTypeConvert(o,OBJ_ENCODING_QUICKLIST); 1616 break; 1617 case RDB_TYPE_SET_INTSET: 1618 o->type = OBJ_SET; 1619 o->encoding = OBJ_ENCODING_INTSET; 1620 if (intsetLen(o->ptr) > server.set_max_intset_entries) 1621 setTypeConvert(o,OBJ_ENCODING_HT); 1622 break; 1623 case RDB_TYPE_ZSET_ZIPLIST: 1624 o->type = OBJ_ZSET; 1625 o->encoding = OBJ_ENCODING_ZIPLIST; 1626 if (zsetLength(o) > server.zset_max_ziplist_entries) 1627 zsetConvert(o,OBJ_ENCODING_SKIPLIST); 1628 break; 1629 case RDB_TYPE_HASH_ZIPLIST: 1630 o->type = OBJ_HASH; 1631 o->encoding = OBJ_ENCODING_ZIPLIST; 1632 if (hashTypeLength(o) > server.hash_max_ziplist_entries) 1633 hashTypeConvert(o, OBJ_ENCODING_HT); 1634 break; 1635 default: 1636 rdbExitReportCorruptRDB("Unknown RDB encoding type %d",rdbtype); 1637 break; 1638 } 1639 } else if (rdbtype == RDB_TYPE_STREAM_LISTPACKS) { 1640 o = createStreamObject(); 1641 stream *s = o->ptr; 1642 uint64_t listpacks = rdbLoadLen(rdb,NULL); 1643 1644 while(listpacks--) { 1645 /* Get the master ID, the one we'll use as key of the radix tree 1646 * node: the entries inside the listpack itself are delta-encoded 1647 * 
relatively to this ID. */ 1648 sds nodekey = rdbGenericLoadStringObject(rdb,RDB_LOAD_SDS,NULL); 1649 if (nodekey == NULL) { 1650 rdbExitReportCorruptRDB("Stream master ID loading failed: invalid encoding or I/O error."); 1651 } 1652 if (sdslen(nodekey) != sizeof(streamID)) { 1653 rdbExitReportCorruptRDB("Stream node key entry is not the " 1654 "size of a stream ID"); 1655 } 1656 1657 /* Load the listpack. */ 1658 unsigned char *lp = 1659 rdbGenericLoadStringObject(rdb,RDB_LOAD_PLAIN,NULL); 1660 if (lp == NULL) return NULL; 1661 unsigned char *first = lpFirst(lp); 1662 if (first == NULL) { 1663 /* Serialized listpacks should never be empty, since on 1664 * deletion we should remove the radix tree key if the 1665 * resulting listpack is empty. */ 1666 rdbExitReportCorruptRDB("Empty listpack inside stream"); 1667 } 1668 1669 /* Insert the key in the radix tree. */ 1670 int retval = raxInsert(s->rax, 1671 (unsigned char*)nodekey,sizeof(streamID),lp,NULL); 1672 sdsfree(nodekey); 1673 if (!retval) 1674 rdbExitReportCorruptRDB("Listpack re-added with existing key"); 1675 } 1676 /* Load total number of items inside the stream. */ 1677 s->length = rdbLoadLen(rdb,NULL); 1678 /* Load the last entry ID. */ 1679 s->last_id.ms = rdbLoadLen(rdb,NULL); 1680 s->last_id.seq = rdbLoadLen(rdb,NULL); 1681 1682 /* Consumer groups loading */ 1683 size_t cgroups_count = rdbLoadLen(rdb,NULL); 1684 while(cgroups_count--) { 1685 /* Get the consumer group name and ID. We can then create the 1686 * consumer group ASAP and populate its structure as 1687 * we read more data. 
*/ 1688 streamID cg_id; 1689 sds cgname = rdbGenericLoadStringObject(rdb,RDB_LOAD_SDS,NULL); 1690 if (cgname == NULL) { 1691 rdbExitReportCorruptRDB( 1692 "Error reading the consumer group name from Stream"); 1693 } 1694 cg_id.ms = rdbLoadLen(rdb,NULL); 1695 cg_id.seq = rdbLoadLen(rdb,NULL); 1696 streamCG *cgroup = streamCreateCG(s,cgname,sdslen(cgname),&cg_id); 1697 if (cgroup == NULL) 1698 rdbExitReportCorruptRDB("Duplicated consumer group name %s", 1699 cgname); 1700 sdsfree(cgname); 1701 1702 /* Load the global PEL for this consumer group, however we'll 1703 * not yet populate the NACK structures with the message 1704 * owner, since consumers for this group and their messages will 1705 * be read as a next step. So for now leave them not resolved 1706 * and later populate it. */ 1707 size_t pel_size = rdbLoadLen(rdb,NULL); 1708 while(pel_size--) { 1709 unsigned char rawid[sizeof(streamID)]; 1710 rdbLoadRaw(rdb,rawid,sizeof(rawid)); 1711 streamNACK *nack = streamCreateNACK(NULL); 1712 nack->delivery_time = rdbLoadMillisecondTime(rdb,RDB_VERSION); 1713 nack->delivery_count = rdbLoadLen(rdb,NULL); 1714 if (!raxInsert(cgroup->pel,rawid,sizeof(rawid),nack,NULL)) 1715 rdbExitReportCorruptRDB("Duplicated gobal PEL entry " 1716 "loading stream consumer group"); 1717 } 1718 1719 /* Now that we loaded our global PEL, we need to load the 1720 * consumers and their local PELs. */ 1721 size_t consumers_num = rdbLoadLen(rdb,NULL); 1722 while(consumers_num--) { 1723 sds cname = rdbGenericLoadStringObject(rdb,RDB_LOAD_SDS,NULL); 1724 if (cname == NULL) { 1725 rdbExitReportCorruptRDB( 1726 "Error reading the consumer name from Stream group"); 1727 } 1728 streamConsumer *consumer = streamLookupConsumer(cgroup,cname, 1729 1); 1730 sdsfree(cname); 1731 consumer->seen_time = rdbLoadMillisecondTime(rdb,RDB_VERSION); 1732 1733 /* Load the PEL about entries owned by this specific 1734 * consumer. 
*/ 1735 pel_size = rdbLoadLen(rdb,NULL); 1736 while(pel_size--) { 1737 unsigned char rawid[sizeof(streamID)]; 1738 rdbLoadRaw(rdb,rawid,sizeof(rawid)); 1739 streamNACK *nack = raxFind(cgroup->pel,rawid,sizeof(rawid)); 1740 if (nack == raxNotFound) 1741 rdbExitReportCorruptRDB("Consumer entry not found in " 1742 "group global PEL"); 1743 1744 /* Set the NACK consumer, that was left to NULL when 1745 * loading the global PEL. Then set the same shared 1746 * NACK structure also in the consumer-specific PEL. */ 1747 nack->consumer = consumer; 1748 if (!raxInsert(consumer->pel,rawid,sizeof(rawid),nack,NULL)) 1749 rdbExitReportCorruptRDB("Duplicated consumer PEL entry " 1750 " loading a stream consumer " 1751 "group"); 1752 } 1753 } 1754 } 1755 } else if (rdbtype == RDB_TYPE_MODULE || rdbtype == RDB_TYPE_MODULE_2) { 1756 uint64_t moduleid = rdbLoadLen(rdb,NULL); 1757 moduleType *mt = moduleTypeLookupModuleByID(moduleid); 1758 char name[10]; 1759 1760 if (rdbCheckMode && rdbtype == RDB_TYPE_MODULE_2) { 1761 moduleTypeNameByID(name,moduleid); 1762 return rdbLoadCheckModuleValue(rdb,name); 1763 } 1764 1765 if (mt == NULL) { 1766 moduleTypeNameByID(name,moduleid); 1767 serverLog(LL_WARNING,"The RDB file contains module data I can't load: no matching module '%s'", name); 1768 exit(1); 1769 } 1770 RedisModuleIO io; 1771 moduleInitIOContext(io,mt,rdb,key); 1772 io.ver = (rdbtype == RDB_TYPE_MODULE) ? 1 : 2; 1773 /* Call the rdb_load method of the module providing the 10 bit 1774 * encoding version in the lower 10 bits of the module ID. */ 1775 void *ptr = mt->rdb_load(&io,moduleid&1023); 1776 if (io.ctx) { 1777 moduleFreeContext(io.ctx); 1778 zfree(io.ctx); 1779 } 1780 1781 /* Module v2 serialization has an EOF mark at the end. 
*/ 1782 if (io.ver == 2) { 1783 uint64_t eof = rdbLoadLen(rdb,NULL); 1784 if (eof != RDB_MODULE_OPCODE_EOF) { 1785 serverLog(LL_WARNING,"The RDB file contains module data for the module '%s' that is not terminated by the proper module value EOF marker", name); 1786 exit(1); 1787 } 1788 } 1789 1790 if (ptr == NULL) { 1791 moduleTypeNameByID(name,moduleid); 1792 serverLog(LL_WARNING,"The RDB file contains module data for the module type '%s', that the responsible module is not able to load. Check for modules log above for additional clues.", name); 1793 exit(1); 1794 } 1795 o = createModuleObject(mt,ptr); 1796 } else { 1797 rdbExitReportCorruptRDB("Unknown RDB encoding type %d",rdbtype); 1798 } 1799 return o; 1800 } 1801 1802 /* Mark that we are loading in the global state and setup the fields 1803 * needed to provide loading stats. */ 1804 void startLoading(FILE *fp) { 1805 struct stat sb; 1806 1807 /* Load the DB */ 1808 server.loading = 1; 1809 server.loading_start_time = time(NULL); 1810 server.loading_loaded_bytes = 0; 1811 if (fstat(fileno(fp), &sb) == -1) { 1812 server.loading_total_bytes = 0; 1813 } else { 1814 server.loading_total_bytes = sb.st_size; 1815 } 1816 } 1817 1818 /* Refresh the loading progress info */ 1819 void loadingProgress(off_t pos) { 1820 server.loading_loaded_bytes = pos; 1821 if (server.stat_peak_memory < zmalloc_used_memory()) 1822 server.stat_peak_memory = zmalloc_used_memory(); 1823 } 1824 1825 /* Loading finished */ 1826 void stopLoading(void) { 1827 server.loading = 0; 1828 } 1829 1830 /* Track loading progress in order to serve client's from time to time 1831 and if needed calculate rdb checksum */ 1832 void rdbLoadProgressCallback(rio *r, const void *buf, size_t len) { 1833 if (server.rdb_checksum) 1834 rioGenericUpdateChecksum(r, buf, len); 1835 if (server.loading_process_events_interval_bytes && 1836 (r->processed_bytes + len)/server.loading_process_events_interval_bytes > 
r->processed_bytes/server.loading_process_events_interval_bytes) 1837 { 1838 /* The DB can take some non trivial amount of time to load. Update 1839 * our cached time since it is used to create and update the last 1840 * interaction time with clients and for other important things. */ 1841 updateCachedTime(); 1842 if (server.masterhost && server.repl_state == REPL_STATE_TRANSFER) 1843 replicationSendNewlineToMaster(); 1844 loadingProgress(r->processed_bytes); 1845 processEventsWhileBlocked(); 1846 } 1847 } 1848 1849 /* Load an RDB file from the rio stream 'rdb'. On success C_OK is returned, 1850 * otherwise C_ERR is returned and 'errno' is set accordingly. */ 1851 int rdbLoadRio(rio *rdb, rdbSaveInfo *rsi, int loading_aof) { 1852 uint64_t dbid; 1853 int type, rdbver; 1854 redisDb *db = server.db+0; 1855 char buf[1024]; 1856 1857 rdb->update_cksum = rdbLoadProgressCallback; 1858 rdb->max_processing_chunk = server.loading_process_events_interval_bytes; 1859 if (rioRead(rdb,buf,9) == 0) goto eoferr; 1860 buf[9] = '\0'; 1861 if (memcmp(buf,"REDIS",5) != 0) { 1862 serverLog(LL_WARNING,"Wrong signature trying to load DB from file"); 1863 errno = EINVAL; 1864 return C_ERR; 1865 } 1866 rdbver = atoi(buf+5); 1867 if (rdbver < 1 || rdbver > RDB_VERSION) { 1868 serverLog(LL_WARNING,"Can't handle RDB format version %d",rdbver); 1869 errno = EINVAL; 1870 return C_ERR; 1871 } 1872 1873 /* Key-specific attributes, set by opcodes before the key type. */ 1874 long long lru_idle = -1, lfu_freq = -1, expiretime = -1, now = mstime(); 1875 long long lru_clock = LRU_CLOCK(); 1876 1877 while(1) { 1878 robj *key, *val; 1879 1880 /* Read type. */ 1881 if ((type = rdbLoadType(rdb)) == -1) goto eoferr; 1882 1883 /* Handle special types. */ 1884 if (type == RDB_OPCODE_EXPIRETIME) { 1885 /* EXPIRETIME: load an expire associated with the next key 1886 * to load. Note that after loading an expire we need to 1887 * load the actual type, and continue. 
*/ 1888 expiretime = rdbLoadTime(rdb); 1889 expiretime *= 1000; 1890 continue; /* Read next opcode. */ 1891 } else if (type == RDB_OPCODE_EXPIRETIME_MS) { 1892 /* EXPIRETIME_MS: milliseconds precision expire times introduced 1893 * with RDB v3. Like EXPIRETIME but no with more precision. */ 1894 expiretime = rdbLoadMillisecondTime(rdb,rdbver); 1895 continue; /* Read next opcode. */ 1896 } else if (type == RDB_OPCODE_FREQ) { 1897 /* FREQ: LFU frequency. */ 1898 uint8_t byte; 1899 if (rioRead(rdb,&byte,1) == 0) goto eoferr; 1900 lfu_freq = byte; 1901 continue; /* Read next opcode. */ 1902 } else if (type == RDB_OPCODE_IDLE) { 1903 /* IDLE: LRU idle time. */ 1904 uint64_t qword; 1905 if ((qword = rdbLoadLen(rdb,NULL)) == RDB_LENERR) goto eoferr; 1906 lru_idle = qword; 1907 continue; /* Read next opcode. */ 1908 } else if (type == RDB_OPCODE_EOF) { 1909 /* EOF: End of file, exit the main loop. */ 1910 break; 1911 } else if (type == RDB_OPCODE_SELECTDB) { 1912 /* SELECTDB: Select the specified database. */ 1913 if ((dbid = rdbLoadLen(rdb,NULL)) == RDB_LENERR) goto eoferr; 1914 if (dbid >= (unsigned)server.dbnum) { 1915 serverLog(LL_WARNING, 1916 "FATAL: Data file was created with a Redis " 1917 "server configured to handle more than %d " 1918 "databases. Exiting\n", server.dbnum); 1919 exit(1); 1920 } 1921 db = server.db+dbid; 1922 continue; /* Read next opcode. */ 1923 } else if (type == RDB_OPCODE_RESIZEDB) { 1924 /* RESIZEDB: Hint about the size of the keys in the currently 1925 * selected data base, in order to avoid useless rehashing. */ 1926 uint64_t db_size, expires_size; 1927 if ((db_size = rdbLoadLen(rdb,NULL)) == RDB_LENERR) 1928 goto eoferr; 1929 if ((expires_size = rdbLoadLen(rdb,NULL)) == RDB_LENERR) 1930 goto eoferr; 1931 dictExpand(db->dict,db_size); 1932 dictExpand(db->expires,expires_size); 1933 continue; /* Read next opcode. */ 1934 } else if (type == RDB_OPCODE_AUX) { 1935 /* AUX: generic string-string fields. 
Use to add state to RDB 1936 * which is backward compatible. Implementations of RDB loading 1937 * are requierd to skip AUX fields they don't understand. 1938 * 1939 * An AUX field is composed of two strings: key and value. */ 1940 robj *auxkey, *auxval; 1941 if ((auxkey = rdbLoadStringObject(rdb)) == NULL) goto eoferr; 1942 if ((auxval = rdbLoadStringObject(rdb)) == NULL) goto eoferr; 1943 1944 if (((char*)auxkey->ptr)[0] == '%') { 1945 /* All the fields with a name staring with '%' are considered 1946 * information fields and are logged at startup with a log 1947 * level of NOTICE. */ 1948 serverLog(LL_NOTICE,"RDB '%s': %s", 1949 (char*)auxkey->ptr, 1950 (char*)auxval->ptr); 1951 } else if (!strcasecmp(auxkey->ptr,"repl-stream-db")) { 1952 if (rsi) rsi->repl_stream_db = atoi(auxval->ptr); 1953 } else if (!strcasecmp(auxkey->ptr,"repl-id")) { 1954 if (rsi && sdslen(auxval->ptr) == CONFIG_RUN_ID_SIZE) { 1955 memcpy(rsi->repl_id,auxval->ptr,CONFIG_RUN_ID_SIZE+1); 1956 rsi->repl_id_is_set = 1; 1957 } 1958 } else if (!strcasecmp(auxkey->ptr,"repl-offset")) { 1959 if (rsi) rsi->repl_offset = strtoll(auxval->ptr,NULL,10); 1960 } else if (!strcasecmp(auxkey->ptr,"lua")) { 1961 /* Load the script back in memory. */ 1962 if (luaCreateFunction(NULL,server.lua,auxval) == NULL) { 1963 rdbExitReportCorruptRDB( 1964 "Can't load Lua script from RDB file! " 1965 "BODY: %s", auxval->ptr); 1966 } 1967 } else { 1968 /* We ignore fields we don't understand, as by AUX field 1969 * contract. */ 1970 serverLog(LL_DEBUG,"Unrecognized RDB AUX field: '%s'", 1971 (char*)auxkey->ptr); 1972 } 1973 1974 decrRefCount(auxkey); 1975 decrRefCount(auxval); 1976 continue; /* Read type again. */ 1977 } else if (type == RDB_OPCODE_MODULE_AUX) { 1978 /* This is just for compatibility with the future: we have plans 1979 * to add the ability for modules to store anything in the RDB 1980 * file, like data that is not related to the Redis key space. 
1981 * Such data will potentially be stored both before and after the 1982 * RDB keys-values section. For this reason since RDB version 9, 1983 * we have the ability to read a MODULE_AUX opcode followed by an 1984 * identifier of the module, and a serialized value in "MODULE V2" 1985 * format. */ 1986 uint64_t moduleid = rdbLoadLen(rdb,NULL); 1987 moduleType *mt = moduleTypeLookupModuleByID(moduleid); 1988 char name[10]; 1989 moduleTypeNameByID(name,moduleid); 1990 1991 if (!rdbCheckMode && mt == NULL) { 1992 /* Unknown module. */ 1993 serverLog(LL_WARNING,"The RDB file contains AUX module data I can't load: no matching module '%s'", name); 1994 exit(1); 1995 } else if (!rdbCheckMode && mt != NULL) { 1996 /* This version of Redis actually does not know what to do 1997 * with modules AUX data... */ 1998 serverLog(LL_WARNING,"The RDB file contains AUX module data I can't load for the module '%s'. Probably you want to use a newer version of Redis which implements aux data callbacks", name); 1999 exit(1); 2000 } else { 2001 /* RDB check mode. */ 2002 robj *aux = rdbLoadCheckModuleValue(rdb,name); 2003 decrRefCount(aux); 2004 } 2005 } 2006 2007 /* Read key */ 2008 if ((key = rdbLoadStringObject(rdb)) == NULL) goto eoferr; 2009 /* Read value */ 2010 if ((val = rdbLoadObject(type,rdb,key)) == NULL) goto eoferr; 2011 /* Check if the key already expired. This function is used when loading 2012 * an RDB file from disk, either at startup, or when an RDB was 2013 * received from the master. In the latter case, the master is 2014 * responsible for key expiry. If we would expire keys here, the 2015 * snapshot taken by the master may not be reflected on the slave. 
*/ 2016 if (server.masterhost == NULL && !loading_aof && expiretime != -1 && expiretime < now) { 2017 decrRefCount(key); 2018 decrRefCount(val); 2019 } else { 2020 /* Add the new object in the hash table */ 2021 dbAdd(db,key,val); 2022 2023 /* Set the expire time if needed */ 2024 if (expiretime != -1) setExpire(NULL,db,key,expiretime); 2025 2026 /* Set usage information (for eviction). */ 2027 objectSetLRUOrLFU(val,lfu_freq,lru_idle,lru_clock); 2028 2029 /* Decrement the key refcount since dbAdd() will take its 2030 * own reference. */ 2031 decrRefCount(key); 2032 } 2033 2034 /* Reset the state that is key-specified and is populated by 2035 * opcodes before the key, so that we start from scratch again. */ 2036 expiretime = -1; 2037 lfu_freq = -1; 2038 lru_idle = -1; 2039 } 2040 /* Verify the checksum if RDB version is >= 5 */ 2041 if (rdbver >= 5) { 2042 uint64_t cksum, expected = rdb->cksum; 2043 2044 if (rioRead(rdb,&cksum,8) == 0) goto eoferr; 2045 if (server.rdb_checksum) { 2046 memrev64ifbe(&cksum); 2047 if (cksum == 0) { 2048 serverLog(LL_WARNING,"RDB file was saved with checksum disabled: no check performed."); 2049 } else if (cksum != expected) { 2050 serverLog(LL_WARNING,"Wrong RDB checksum. Aborting now."); 2051 rdbExitReportCorruptRDB("RDB CRC error"); 2052 } 2053 } 2054 } 2055 return C_OK; 2056 2057 eoferr: /* unexpected end of file is handled here with a fatal exit */ 2058 serverLog(LL_WARNING,"Short read or OOM loading DB. Unrecoverable error, aborting now."); 2059 rdbExitReportCorruptRDB("Unexpected EOF reading RDB file"); 2060 return C_ERR; /* Just to avoid warning */ 2061 } 2062 2063 /* Like rdbLoadRio() but takes a filename instead of a rio stream. The 2064 * filename is open for reading and a rio stream object created in order 2065 * to do the actual loading. Moreover the ETA displayed in the INFO 2066 * output is initialized and finalized. 
2067 * 2068 * If you pass an 'rsi' structure initialied with RDB_SAVE_OPTION_INIT, the 2069 * loading code will fiil the information fields in the structure. */ 2070 int rdbLoad(char *filename, rdbSaveInfo *rsi) { 2071 FILE *fp; 2072 rio rdb; 2073 int retval; 2074 2075 if ((fp = fopen(filename,"r")) == NULL) return C_ERR; 2076 startLoading(fp); 2077 rioInitWithFile(&rdb,fp); 2078 retval = rdbLoadRio(&rdb,rsi,0); 2079 fclose(fp); 2080 stopLoading(); 2081 return retval; 2082 } 2083 2084 /* A background saving child (BGSAVE) terminated its work. Handle this. 2085 * This function covers the case of actual BGSAVEs. */ 2086 void backgroundSaveDoneHandlerDisk(int exitcode, int bysignal) { 2087 if (!bysignal && exitcode == 0) { 2088 serverLog(LL_NOTICE, 2089 "Background saving terminated with success"); 2090 server.dirty = server.dirty - server.dirty_before_bgsave; 2091 server.lastsave = time(NULL); 2092 server.lastbgsave_status = C_OK; 2093 } else if (!bysignal && exitcode != 0) { 2094 serverLog(LL_WARNING, "Background saving error"); 2095 server.lastbgsave_status = C_ERR; 2096 } else { 2097 mstime_t latency; 2098 2099 serverLog(LL_WARNING, 2100 "Background saving terminated by signal %d", bysignal); 2101 latencyStartMonitor(latency); 2102 rdbRemoveTempFile(server.rdb_child_pid); 2103 latencyEndMonitor(latency); 2104 latencyAddSampleIfNeeded("rdb-unlink-temp-file",latency); 2105 /* SIGUSR1 is whitelisted, so we have a way to kill a child without 2106 * tirggering an error condition. */ 2107 if (bysignal != SIGUSR1) 2108 server.lastbgsave_status = C_ERR; 2109 } 2110 server.rdb_child_pid = -1; 2111 server.rdb_child_type = RDB_CHILD_TYPE_NONE; 2112 server.rdb_save_time_last = time(NULL)-server.rdb_save_time_start; 2113 server.rdb_save_time_start = -1; 2114 /* Possibly there are slaves waiting for a BGSAVE in order to be served 2115 * (the first stage of SYNC is a bulk transfer of dump.rdb) */ 2116 updateSlavesWaitingBgsave((!bysignal && exitcode == 0) ? 
C_OK : C_ERR, RDB_CHILD_TYPE_DISK); 2117 } 2118 2119 /* A background saving child (BGSAVE) terminated its work. Handle this. 2120 * This function covers the case of RDB -> Salves socket transfers for 2121 * diskless replication. */ 2122 void backgroundSaveDoneHandlerSocket(int exitcode, int bysignal) { 2123 uint64_t *ok_slaves; 2124 2125 if (!bysignal && exitcode == 0) { 2126 serverLog(LL_NOTICE, 2127 "Background RDB transfer terminated with success"); 2128 } else if (!bysignal && exitcode != 0) { 2129 serverLog(LL_WARNING, "Background transfer error"); 2130 } else { 2131 serverLog(LL_WARNING, 2132 "Background transfer terminated by signal %d", bysignal); 2133 } 2134 server.rdb_child_pid = -1; 2135 server.rdb_child_type = RDB_CHILD_TYPE_NONE; 2136 server.rdb_save_time_start = -1; 2137 2138 /* If the child returns an OK exit code, read the set of slave client 2139 * IDs and the associated status code. We'll terminate all the slaves 2140 * in error state. 2141 * 2142 * If the process returned an error, consider the list of slaves that 2143 * can continue to be empty, so that it's just a special case of the 2144 * normal code path. */ 2145 ok_slaves = zmalloc(sizeof(uint64_t)); /* Make space for the count. */ 2146 ok_slaves[0] = 0; 2147 if (!bysignal && exitcode == 0) { 2148 int readlen = sizeof(uint64_t); 2149 2150 if (read(server.rdb_pipe_read_result_from_child, ok_slaves, readlen) == 2151 readlen) 2152 { 2153 readlen = ok_slaves[0]*sizeof(uint64_t)*2; 2154 2155 /* Make space for enough elements as specified by the first 2156 * uint64_t element in the array. 
*/ 2157 ok_slaves = zrealloc(ok_slaves,sizeof(uint64_t)+readlen); 2158 if (readlen && 2159 read(server.rdb_pipe_read_result_from_child, ok_slaves+1, 2160 readlen) != readlen) 2161 { 2162 ok_slaves[0] = 0; 2163 } 2164 } 2165 } 2166 2167 close(server.rdb_pipe_read_result_from_child); 2168 close(server.rdb_pipe_write_result_to_parent); 2169 2170 /* We can continue the replication process with all the slaves that 2171 * correctly received the full payload. Others are terminated. */ 2172 listNode *ln; 2173 listIter li; 2174 2175 listRewind(server.slaves,&li); 2176 while((ln = listNext(&li))) { 2177 client *slave = ln->value; 2178 2179 if (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_END) { 2180 uint64_t j; 2181 int errorcode = 0; 2182 2183 /* Search for the slave ID in the reply. In order for a slave to 2184 * continue the replication process, we need to find it in the list, 2185 * and it must have an error code set to 0 (which means success). */ 2186 for (j = 0; j < ok_slaves[0]; j++) { 2187 if (slave->id == ok_slaves[2*j+1]) { 2188 errorcode = ok_slaves[2*j+2]; 2189 break; /* Found in slaves list. */ 2190 } 2191 } 2192 if (j == ok_slaves[0] || errorcode != 0) { 2193 serverLog(LL_WARNING, 2194 "Closing slave %s: child->slave RDB transfer failed: %s", 2195 replicationGetSlaveName(slave), 2196 (errorcode == 0) ? "RDB transfer child aborted" 2197 : strerror(errorcode)); 2198 freeClient(slave); 2199 } else { 2200 serverLog(LL_WARNING, 2201 "Slave %s correctly received the streamed RDB file.", 2202 replicationGetSlaveName(slave)); 2203 /* Restore the socket as non-blocking. */ 2204 anetNonBlock(NULL,slave->fd); 2205 anetSendTimeout(NULL,slave->fd,0); 2206 } 2207 } 2208 } 2209 zfree(ok_slaves); 2210 2211 updateSlavesWaitingBgsave((!bysignal && exitcode == 0) ? C_OK : C_ERR, RDB_CHILD_TYPE_SOCKET); 2212 } 2213 2214 /* When a background RDB saving/transfer terminates, call the right handler. 
*/ 2215 void backgroundSaveDoneHandler(int exitcode, int bysignal) { 2216 switch(server.rdb_child_type) { 2217 case RDB_CHILD_TYPE_DISK: 2218 backgroundSaveDoneHandlerDisk(exitcode,bysignal); 2219 break; 2220 case RDB_CHILD_TYPE_SOCKET: 2221 backgroundSaveDoneHandlerSocket(exitcode,bysignal); 2222 break; 2223 default: 2224 serverPanic("Unknown RDB child type."); 2225 break; 2226 } 2227 } 2228 2229 /* Spawn an RDB child that writes the RDB to the sockets of the slaves 2230 * that are currently in SLAVE_STATE_WAIT_BGSAVE_START state. */ 2231 int rdbSaveToSlavesSockets(rdbSaveInfo *rsi) { 2232 int *fds; 2233 uint64_t *clientids; 2234 int numfds; 2235 listNode *ln; 2236 listIter li; 2237 pid_t childpid; 2238 long long start; 2239 int pipefds[2]; 2240 2241 if (server.aof_child_pid != -1 || server.rdb_child_pid != -1) return C_ERR; 2242 2243 /* Before to fork, create a pipe that will be used in order to 2244 * send back to the parent the IDs of the slaves that successfully 2245 * received all the writes. */ 2246 if (pipe(pipefds) == -1) return C_ERR; 2247 server.rdb_pipe_read_result_from_child = pipefds[0]; 2248 server.rdb_pipe_write_result_to_parent = pipefds[1]; 2249 2250 /* Collect the file descriptors of the slaves we want to transfer 2251 * the RDB to, which are i WAIT_BGSAVE_START state. */ 2252 fds = zmalloc(sizeof(int)*listLength(server.slaves)); 2253 /* We also allocate an array of corresponding client IDs. This will 2254 * be useful for the child process in order to build the report 2255 * (sent via unix pipe) that will be sent to the parent. 
*/ 2256 clientids = zmalloc(sizeof(uint64_t)*listLength(server.slaves)); 2257 numfds = 0; 2258 2259 listRewind(server.slaves,&li); 2260 while((ln = listNext(&li))) { 2261 client *slave = ln->value; 2262 2263 if (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_START) { 2264 clientids[numfds] = slave->id; 2265 fds[numfds++] = slave->fd; 2266 replicationSetupSlaveForFullResync(slave,getPsyncInitialOffset()); 2267 /* Put the socket in blocking mode to simplify RDB transfer. 2268 * We'll restore it when the children returns (since duped socket 2269 * will share the O_NONBLOCK attribute with the parent). */ 2270 anetBlock(NULL,slave->fd); 2271 anetSendTimeout(NULL,slave->fd,server.repl_timeout*1000); 2272 } 2273 } 2274 2275 /* Create the child process. */ 2276 openChildInfoPipe(); 2277 start = ustime(); 2278 if ((childpid = fork()) == 0) { 2279 /* Child */ 2280 int retval; 2281 rio slave_sockets; 2282 2283 rioInitWithFdset(&slave_sockets,fds,numfds); 2284 zfree(fds); 2285 2286 closeListeningSockets(0); 2287 resetCpuAffinity("rdb2slave"); 2288 redisSetProcTitle("redis-rdb-to-slaves"); 2289 2290 retval = rdbSaveRioWithEOFMark(&slave_sockets,NULL,rsi); 2291 if (retval == C_OK && rioFlush(&slave_sockets) == 0) 2292 retval = C_ERR; 2293 2294 if (retval == C_OK) { 2295 size_t private_dirty = zmalloc_get_private_dirty(-1); 2296 2297 if (private_dirty) { 2298 serverLog(LL_NOTICE, 2299 "RDB: %zu MB of memory used by copy-on-write", 2300 private_dirty/(1024*1024)); 2301 } 2302 2303 server.child_info_data.cow_size = private_dirty; 2304 sendChildInfo(CHILD_INFO_TYPE_RDB); 2305 2306 /* If we are returning OK, at least one slave was served 2307 * with the RDB file as expected, so we need to send a report 2308 * to the parent via the pipe. The format of the message is: 2309 * 2310 * <len> <slave[0].id> <slave[0].error> ... 
2311 * 2312 * len, slave IDs, and slave errors, are all uint64_t integers, 2313 * so basically the reply is composed of 64 bits for the len field 2314 * plus 2 additional 64 bit integers for each entry, for a total 2315 * of 'len' entries. 2316 * 2317 * The 'id' represents the slave's client ID, so that the master 2318 * can match the report with a specific slave, and 'error' is 2319 * set to 0 if the replication process terminated with a success 2320 * or the error code if an error occurred. */ 2321 void *msg = zmalloc(sizeof(uint64_t)*(1+2*numfds)); 2322 uint64_t *len = msg; 2323 uint64_t *ids = len+1; 2324 int j, msglen; 2325 2326 *len = numfds; 2327 for (j = 0; j < numfds; j++) { 2328 *ids++ = clientids[j]; 2329 *ids++ = slave_sockets.io.fdset.state[j]; 2330 } 2331 2332 /* Write the message to the parent. If we have no good slaves or 2333 * we are unable to transfer the message to the parent, we exit 2334 * with an error so that the parent will abort the replication 2335 * process with all the childre that were waiting. */ 2336 msglen = sizeof(uint64_t)*(1+2*numfds); 2337 if (*len == 0 || 2338 write(server.rdb_pipe_write_result_to_parent,msg,msglen) 2339 != msglen) 2340 { 2341 retval = C_ERR; 2342 } 2343 zfree(msg); 2344 } 2345 zfree(clientids); 2346 rioFreeFdset(&slave_sockets); 2347 exitFromChild((retval == C_OK) ? 0 : 1); 2348 } else { 2349 /* Parent */ 2350 if (childpid == -1) { 2351 serverLog(LL_WARNING,"Can't save in background: fork: %s", 2352 strerror(errno)); 2353 2354 /* Undo the state change. 
The caller will perform cleanup on 2355 * all the slaves in BGSAVE_START state, but an early call to 2356 * replicationSetupSlaveForFullResync() turned it into BGSAVE_END */ 2357 listRewind(server.slaves,&li); 2358 while((ln = listNext(&li))) { 2359 client *slave = ln->value; 2360 int j; 2361 2362 for (j = 0; j < numfds; j++) { 2363 if (slave->id == clientids[j]) { 2364 slave->replstate = SLAVE_STATE_WAIT_BGSAVE_START; 2365 break; 2366 } 2367 } 2368 } 2369 close(pipefds[0]); 2370 close(pipefds[1]); 2371 closeChildInfoPipe(); 2372 } else { 2373 server.stat_fork_time = ustime()-start; 2374 server.stat_fork_rate = (double) zmalloc_used_memory() * 1000000 / server.stat_fork_time / (1024*1024*1024); /* GB per second. */ 2375 latencyAddSampleIfNeeded("fork",server.stat_fork_time/1000); 2376 2377 serverLog(LL_NOTICE,"Background RDB transfer started by pid %d", 2378 childpid); 2379 server.rdb_save_time_start = time(NULL); 2380 server.rdb_child_pid = childpid; 2381 server.rdb_child_type = RDB_CHILD_TYPE_SOCKET; 2382 updateDictResizePolicy(); 2383 } 2384 zfree(clientids); 2385 zfree(fds); 2386 return (childpid == -1) ? C_ERR : C_OK; 2387 } 2388 return C_OK; /* Unreached. */ 2389 } 2390 2391 void saveCommand(client *c) { 2392 if (server.rdb_child_pid != -1) { 2393 addReplyError(c,"Background save already in progress"); 2394 return; 2395 } 2396 rdbSaveInfo rsi, *rsiptr; 2397 rsiptr = rdbPopulateSaveInfo(&rsi); 2398 if (rdbSave(server.rdb_filename,rsiptr) == C_OK) { 2399 addReply(c,shared.ok); 2400 } else { 2401 addReply(c,shared.err); 2402 } 2403 } 2404 2405 /* BGSAVE [SCHEDULE] */ 2406 void bgsaveCommand(client *c) { 2407 int schedule = 0; 2408 2409 /* The SCHEDULE option changes the behavior of BGSAVE when an AOF rewrite 2410 * is in progress. Instead of returning an error a BGSAVE gets scheduled. 
*/ 2411 if (c->argc > 1) { 2412 if (c->argc == 2 && !strcasecmp(c->argv[1]->ptr,"schedule")) { 2413 schedule = 1; 2414 } else { 2415 addReply(c,shared.syntaxerr); 2416 return; 2417 } 2418 } 2419 2420 rdbSaveInfo rsi, *rsiptr; 2421 rsiptr = rdbPopulateSaveInfo(&rsi); 2422 2423 if (server.rdb_child_pid != -1) { 2424 addReplyError(c,"Background save already in progress"); 2425 } else if (server.aof_child_pid != -1) { 2426 if (schedule) { 2427 server.rdb_bgsave_scheduled = 1; 2428 addReplyStatus(c,"Background saving scheduled"); 2429 } else { 2430 addReplyError(c, 2431 "An AOF log rewriting in progress: can't BGSAVE right now. " 2432 "Use BGSAVE SCHEDULE in order to schedule a BGSAVE whenever " 2433 "possible."); 2434 } 2435 } else if (rdbSaveBackground(server.rdb_filename,rsiptr) == C_OK) { 2436 addReplyStatus(c,"Background saving started"); 2437 } else { 2438 addReply(c,shared.err); 2439 } 2440 } 2441 2442 /* Populate the rdbSaveInfo structure used to persist the replication 2443 * information inside the RDB file. Currently the structure explicitly 2444 * contains just the currently selected DB from the master stream, however 2445 * if the rdbSave*() family functions receive a NULL rsi structure also 2446 * the Replication ID/offset is not saved. The function popultes 'rsi' 2447 * that is normally stack-allocated in the caller, returns the populated 2448 * pointer if the instance has a valid master client, otherwise NULL 2449 * is returned, and the RDB saving will not persist any replication related 2450 * information. */ 2451 rdbSaveInfo *rdbPopulateSaveInfo(rdbSaveInfo *rsi) { 2452 rdbSaveInfo rsi_init = RDB_SAVE_INFO_INIT; 2453 *rsi = rsi_init; 2454 2455 /* If the instance is a master, we can populate the replication info 2456 * only when repl_backlog is not NULL. If the repl_backlog is NULL, 2457 * it means that the instance isn't in any replication chains. 
In this 2458 * scenario the replication info is useless, because when a slave 2459 * connects to us, the NULL repl_backlog will trigger a full 2460 * synchronization, at the same time we will use a new replid and clear 2461 * replid2. */ 2462 if (!server.masterhost && server.repl_backlog) { 2463 /* Note that when server.slaveseldb is -1, it means that this master 2464 * didn't apply any write commands after a full synchronization. 2465 * So we can let repl_stream_db be 0, this allows a restarted slave 2466 * to reload replication ID/offset, it's safe because the next write 2467 * command must generate a SELECT statement. */ 2468 rsi->repl_stream_db = server.slaveseldb == -1 ? 0 : server.slaveseldb; 2469 return rsi; 2470 } 2471 2472 /* If the instance is a slave we need a connected master 2473 * in order to fetch the currently selected DB. */ 2474 if (server.master) { 2475 rsi->repl_stream_db = server.master->db->id; 2476 return rsi; 2477 } 2478 2479 /* If we have a cached master we can use it in order to populate the 2480 * replication selected DB info inside the RDB file: the slave can 2481 * increment the master_repl_offset only from data arriving from the 2482 * master, so if we are disconnected the offset in the cached master 2483 * is valid. */ 2484 if (server.cached_master) { 2485 rsi->repl_stream_db = server.cached_master->db->id; 2486 return rsi; 2487 } 2488 return NULL; 2489 } 2490