xref: /redis-3.2.3/src/rdb.c (revision 21cffc26)
1 /*
2  * Copyright (c) 2009-2012, Salvatore Sanfilippo <antirez at gmail dot com>
3  * All rights reserved.
4  *
5  * Redistribution and use in source and binary forms, with or without
6  * modification, are permitted provided that the following conditions are met:
7  *
8  *   * Redistributions of source code must retain the above copyright notice,
9  *     this list of conditions and the following disclaimer.
10  *   * Redistributions in binary form must reproduce the above copyright
11  *     notice, this list of conditions and the following disclaimer in the
12  *     documentation and/or other materials provided with the distribution.
13  *   * Neither the name of Redis nor the names of its contributors may be used
14  *     to endorse or promote products derived from this software without
15  *     specific prior written permission.
16  *
17  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
18  * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
19  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
20  * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
21  * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
22  * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
23  * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
24  * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
25  * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
26  * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27  * POSSIBILITY OF SUCH DAMAGE.
28  */
29 
30 #include "server.h"
31 #include "lzf.h"    /* LZF compression library */
32 #include "zipmap.h"
33 #include "endianconv.h"
34 
35 #include <math.h>
36 #include <sys/types.h>
37 #include <sys/time.h>
38 #include <sys/resource.h>
39 #include <sys/wait.h>
40 #include <arpa/inet.h>
41 #include <sys/stat.h>
42 #include <sys/param.h>
43 
44 #define RDB_LOAD_NONE   0
45 #define RDB_LOAD_ENC    (1<<0)
46 #define RDB_LOAD_PLAIN  (1<<1)
47 
48 #define rdbExitReportCorruptRDB(...) rdbCheckThenExit(__LINE__,__VA_ARGS__)
49 
50 extern int rdbCheckMode;
51 void rdbCheckError(const char *fmt, ...);
52 void rdbCheckSetError(const char *fmt, ...);
53 
rdbCheckThenExit(int linenum,char * reason,...)54 void rdbCheckThenExit(int linenum, char *reason, ...) {
55     va_list ap;
56     char msg[1024];
57     int len;
58 
59     len = snprintf(msg,sizeof(msg),
60         "Internal error in RDB reading function at rdb.c:%d -> ", linenum);
61     va_start(ap,reason);
62     vsnprintf(msg+len,sizeof(msg)-len,reason,ap);
63     va_end(ap);
64 
65     if (!rdbCheckMode) {
66         serverLog(LL_WARNING, "%s", msg);
67         char *argv[2] = {"",server.rdb_filename};
68         redis_check_rdb_main(2,argv);
69     } else {
70         rdbCheckError("%s",msg);
71     }
72     exit(1);
73 }
74 
/* Write 'len' bytes starting at 'p' to 'rdb'.
 *
 * Returns 'len' on success, -1 when rioWrite() returns 0. If 'rdb' is NULL
 * nothing is written and 'len' is still returned: rdbSavedObjectLen() relies
 * on this to compute the serialized size without performing any I/O. */
static int rdbWriteRaw(rio *rdb, void *p, size_t len) {
    if (rdb != NULL) {
        int written = (rioWrite(rdb,p,len) != 0);
        if (!written) return -1;
    }
    return len;
}
80 
/* Write the single byte 'type' to 'rdb'. The return value is the one of
 * rdbWriteRaw(): -1 on write error, otherwise the number of bytes (1). */
int rdbSaveType(rio *rdb, unsigned char type) {
    unsigned char byte = type;

    return rdbWriteRaw(rdb,&byte,sizeof(byte));
}
84 
85 /* Load a "type" in RDB format, that is a one byte unsigned integer.
86  * This function is not only used to load object types, but also special
87  * "types" like the end-of-file type, the EXPIRE type, and so forth. */
rdbLoadType(rio * rdb)88 int rdbLoadType(rio *rdb) {
89     unsigned char type;
90     if (rioRead(rdb,&type,1) == 0) return -1;
91     return type;
92 }
93 
/* Read a 4 byte signed time value from 'rdb', exactly as it is laid out in
 * memory (no byte order conversion is applied here).
 * Returns the value converted to time_t, or -1 if the read fails. */
time_t rdbLoadTime(rio *rdb) {
    int32_t secs;

    if (rioRead(rdb,&secs,sizeof(secs)) != 0) return (time_t)secs;
    return -1;
}
99 
/* Write the millisecond time 't' to 'rdb' as an 8 byte signed integer, in
 * the in-memory representation of the host (no byte order conversion).
 * Returns -1 on write error, otherwise the value returned by rdbWriteRaw()
 * (the number of bytes, 8). */
int rdbSaveMillisecondTime(rio *rdb, long long t) {
    int64_t ms = (int64_t) t;

    return rdbWriteRaw(rdb,&ms,sizeof(ms));
}
104 
/* Read an 8 byte signed millisecond time from 'rdb', as written by
 * rdbSaveMillisecondTime(). Returns the value, or -1 if the read fails. */
long long rdbLoadMillisecondTime(rio *rdb) {
    int64_t ms;

    if (rioRead(rdb,&ms,sizeof(ms)) != 0) return (long long)ms;
    return -1;
}
110 
111 /* Saves an encoded length. The first two bits in the first byte are used to
112  * hold the encoding type. See the RDB_* definitions for more information
113  * on the types of encoding. */
rdbSaveLen(rio * rdb,uint32_t len)114 int rdbSaveLen(rio *rdb, uint32_t len) {
115     unsigned char buf[2];
116     size_t nwritten;
117 
118     if (len < (1<<6)) {
119         /* Save a 6 bit len */
120         buf[0] = (len&0xFF)|(RDB_6BITLEN<<6);
121         if (rdbWriteRaw(rdb,buf,1) == -1) return -1;
122         nwritten = 1;
123     } else if (len < (1<<14)) {
124         /* Save a 14 bit len */
125         buf[0] = ((len>>8)&0xFF)|(RDB_14BITLEN<<6);
126         buf[1] = len&0xFF;
127         if (rdbWriteRaw(rdb,buf,2) == -1) return -1;
128         nwritten = 2;
129     } else {
130         /* Save a 32 bit len */
131         buf[0] = (RDB_32BITLEN<<6);
132         if (rdbWriteRaw(rdb,buf,1) == -1) return -1;
133         len = htonl(len);
134         if (rdbWriteRaw(rdb,&len,4) == -1) return -1;
135         nwritten = 1+4;
136     }
137     return nwritten;
138 }
139 
140 /* Load an encoded length. The "isencoded" argument is set to 1 if the length
141  * is not actually a length but an "encoding type". See the RDB_ENC_*
142  * definitions in rdb.h for more information. */
rdbLoadLen(rio * rdb,int * isencoded)143 uint32_t rdbLoadLen(rio *rdb, int *isencoded) {
144     unsigned char buf[2];
145     uint32_t len;
146     int type;
147 
148     if (isencoded) *isencoded = 0;
149     if (rioRead(rdb,buf,1) == 0) return RDB_LENERR;
150     type = (buf[0]&0xC0)>>6;
151     if (type == RDB_ENCVAL) {
152         /* Read a 6 bit encoding type. */
153         if (isencoded) *isencoded = 1;
154         return buf[0]&0x3F;
155     } else if (type == RDB_6BITLEN) {
156         /* Read a 6 bit len. */
157         return buf[0]&0x3F;
158     } else if (type == RDB_14BITLEN) {
159         /* Read a 14 bit len. */
160         if (rioRead(rdb,buf+1,1) == 0) return RDB_LENERR;
161         return ((buf[0]&0x3F)<<8)|buf[1];
162     } else if (type == RDB_32BITLEN) {
163         /* Read a 32 bit len. */
164         if (rioRead(rdb,&len,4) == 0) return RDB_LENERR;
165         return ntohl(len);
166     } else {
167         rdbExitReportCorruptRDB(
168             "Unknown length encoding %d in rdbLoadLen()",type);
169         return -1; /* Never reached. */
170     }
171 }
172 
173 /* Encodes the "value" argument as integer when it fits in the supported ranges
174  * for encoded types. If the function successfully encodes the integer, the
175  * representation is stored in the buffer pointer to by "enc" and the string
176  * length is returned. Otherwise 0 is returned. */
rdbEncodeInteger(long long value,unsigned char * enc)177 int rdbEncodeInteger(long long value, unsigned char *enc) {
178     if (value >= -(1<<7) && value <= (1<<7)-1) {
179         enc[0] = (RDB_ENCVAL<<6)|RDB_ENC_INT8;
180         enc[1] = value&0xFF;
181         return 2;
182     } else if (value >= -(1<<15) && value <= (1<<15)-1) {
183         enc[0] = (RDB_ENCVAL<<6)|RDB_ENC_INT16;
184         enc[1] = value&0xFF;
185         enc[2] = (value>>8)&0xFF;
186         return 3;
187     } else if (value >= -((long long)1<<31) && value <= ((long long)1<<31)-1) {
188         enc[0] = (RDB_ENCVAL<<6)|RDB_ENC_INT32;
189         enc[1] = value&0xFF;
190         enc[2] = (value>>8)&0xFF;
191         enc[3] = (value>>16)&0xFF;
192         enc[4] = (value>>24)&0xFF;
193         return 5;
194     } else {
195         return 0;
196     }
197 }
198 
199 /* Loads an integer-encoded object with the specified encoding type "enctype".
200  * The returned value changes according to the flags, see
201  * rdbGenerincLoadStringObject() for more info. */
rdbLoadIntegerObject(rio * rdb,int enctype,int flags)202 void *rdbLoadIntegerObject(rio *rdb, int enctype, int flags) {
203     int plain = flags & RDB_LOAD_PLAIN;
204     int encode = flags & RDB_LOAD_ENC;
205     unsigned char enc[4];
206     long long val;
207 
208     if (enctype == RDB_ENC_INT8) {
209         if (rioRead(rdb,enc,1) == 0) return NULL;
210         val = (signed char)enc[0];
211     } else if (enctype == RDB_ENC_INT16) {
212         uint16_t v;
213         if (rioRead(rdb,enc,2) == 0) return NULL;
214         v = enc[0]|(enc[1]<<8);
215         val = (int16_t)v;
216     } else if (enctype == RDB_ENC_INT32) {
217         uint32_t v;
218         if (rioRead(rdb,enc,4) == 0) return NULL;
219         v = enc[0]|(enc[1]<<8)|(enc[2]<<16)|(enc[3]<<24);
220         val = (int32_t)v;
221     } else {
222         val = 0; /* anti-warning */
223         rdbExitReportCorruptRDB("Unknown RDB integer encoding type %d",enctype);
224     }
225     if (plain) {
226         char buf[LONG_STR_SIZE], *p;
227         int len = ll2string(buf,sizeof(buf),val);
228         p = zmalloc(len);
229         memcpy(p,buf,len);
230         return p;
231     } else if (encode) {
232         return createStringObjectFromLongLong(val);
233     } else {
234         return createObject(OBJ_STRING,sdsfromlonglong(val));
235     }
236 }
237 
/* String objects in the form "2391" "-100", without any space and with a
 * value fitting an 8, 16 or 32 bit signed integer, can be stored as encoded
 * integers to save space.
 *
 * The string is parsed with strtoll() and accepted only if converting the
 * parsed value back to a string yields exactly the 'len' bytes of 's'.
 * Since strtoll() is used, 's' is presumably expected to be null terminated
 * (TODO confirm against callers).
 *
 * Returns the number of bytes stored in 'enc' by rdbEncodeInteger(), or 0
 * if the string cannot be encoded as an integer. */
int rdbTryIntegerEncoding(char *s, size_t len, unsigned char *enc) {
    char roundtrip[32], *tail;
    long long parsed = strtoll(s, &tail, 10);

    /* Trailing garbage after the number: not encodable. */
    if (tail[0] != '\0') return 0;

    /* The canonical representation must match the input byte by byte,
     * otherwise decoding would not give back the original string. */
    ll2string(roundtrip,32,parsed);
    if (strlen(roundtrip) != len) return 0;
    if (memcmp(roundtrip,s,len) != 0) return 0;

    return rdbEncodeInteger(parsed,enc);
}
256 
/* Write an already LZF-compressed payload to 'rdb' as:
 *
 * [RDB_ENCVAL|RDB_ENC_LZF byte][compressed len][original len][data]
 *
 * 'data' points to 'compress_len' compressed bytes, 'original_len' is the
 * size of the uncompressed payload. Returns the total number of bytes
 * written, or -1 on write error. */
ssize_t rdbSaveLzfBlob(rio *rdb, void *data, size_t compress_len,
                       size_t original_len) {
    unsigned char marker = (RDB_ENCVAL<<6)|RDB_ENC_LZF;
    ssize_t total = 0, chunk;

    chunk = rdbWriteRaw(rdb,&marker,1);
    if (chunk == -1) return -1;
    total += chunk;

    chunk = rdbSaveLen(rdb,compress_len);
    if (chunk == -1) return -1;
    total += chunk;

    chunk = rdbSaveLen(rdb,original_len);
    if (chunk == -1) return -1;
    total += chunk;

    chunk = rdbWriteRaw(rdb,data,compress_len);
    if (chunk == -1) return -1;
    total += chunk;

    return total;
}
281 
/* Try to LZF-compress the 'len' bytes at 's' and write them to 'rdb' using
 * rdbSaveLzfBlob().
 *
 * Returns the number of bytes written, -1 on write error, or 0 when nothing
 * was written: the input is 4 bytes or shorter, the scratch buffer could not
 * be allocated, or lzf_compress() returned 0 for an output budget of len-4
 * bytes (i.e. compression would not save at least four bytes). */
ssize_t rdbSaveLzfStringObject(rio *rdb, unsigned char *s, size_t len) {
    ssize_t written = 0;
    size_t budget, packed;
    void *scratch;

    /* We require at least four bytes compression for this to be worth it */
    if (len <= 4) return 0;
    budget = len-4;

    scratch = zmalloc(budget+1);
    if (scratch == NULL) return 0;

    packed = lzf_compress(s, len, scratch, budget);
    if (packed != 0) written = rdbSaveLzfBlob(rdb, scratch, packed, len);
    zfree(scratch);
    return written;
}
299 
300 /* Load an LZF compressed string in RDB format. The returned value
301  * changes according to 'flags'. For more info check the
302  * rdbGenericLoadStringObject() function. */
rdbLoadLzfStringObject(rio * rdb,int flags)303 void *rdbLoadLzfStringObject(rio *rdb, int flags) {
304     int plain = flags & RDB_LOAD_PLAIN;
305     unsigned int len, clen;
306     unsigned char *c = NULL;
307     sds val = NULL;
308 
309     if ((clen = rdbLoadLen(rdb,NULL)) == RDB_LENERR) return NULL;
310     if ((len = rdbLoadLen(rdb,NULL)) == RDB_LENERR) return NULL;
311     if ((c = zmalloc(clen)) == NULL) goto err;
312 
313     /* Allocate our target according to the uncompressed size. */
314     if (plain) {
315         val = zmalloc(len);
316     } else {
317         if ((val = sdsnewlen(NULL,len)) == NULL) goto err;
318     }
319 
320     /* Load the compressed representation and uncompress it to target. */
321     if (rioRead(rdb,c,clen) == 0) goto err;
322     if (lzf_decompress(c,clen,val,len) == 0) {
323         if (rdbCheckMode) rdbCheckSetError("Invalid LZF compressed string");
324         goto err;
325     }
326     zfree(c);
327 
328     if (plain)
329         return val;
330     else
331         return createObject(OBJ_STRING,val);
332 err:
333     zfree(c);
334     if (plain)
335         zfree(val);
336     else
337         sdsfree(val);
338     return NULL;
339 }
340 
341 /* Save a string object as [len][data] on disk. If the object is a string
342  * representation of an integer value we try to save it in a special form */
rdbSaveRawString(rio * rdb,unsigned char * s,size_t len)343 ssize_t rdbSaveRawString(rio *rdb, unsigned char *s, size_t len) {
344     int enclen;
345     ssize_t n, nwritten = 0;
346 
347     /* Try integer encoding */
348     if (len <= 11) {
349         unsigned char buf[5];
350         if ((enclen = rdbTryIntegerEncoding((char*)s,len,buf)) > 0) {
351             if (rdbWriteRaw(rdb,buf,enclen) == -1) return -1;
352             return enclen;
353         }
354     }
355 
356     /* Try LZF compression - under 20 bytes it's unable to compress even
357      * aaaaaaaaaaaaaaaaaa so skip it */
358     if (server.rdb_compression && len > 20) {
359         n = rdbSaveLzfStringObject(rdb,s,len);
360         if (n == -1) return -1;
361         if (n > 0) return n;
362         /* Return value of 0 means data can't be compressed, save the old way */
363     }
364 
365     /* Store verbatim */
366     if ((n = rdbSaveLen(rdb,len)) == -1) return -1;
367     nwritten += n;
368     if (len > 0) {
369         if (rdbWriteRaw(rdb,s,len) == -1) return -1;
370         nwritten += len;
371     }
372     return nwritten;
373 }
374 
375 /* Save a long long value as either an encoded string or a string. */
rdbSaveLongLongAsStringObject(rio * rdb,long long value)376 ssize_t rdbSaveLongLongAsStringObject(rio *rdb, long long value) {
377     unsigned char buf[32];
378     ssize_t n, nwritten = 0;
379     int enclen = rdbEncodeInteger(value,buf);
380     if (enclen > 0) {
381         return rdbWriteRaw(rdb,buf,enclen);
382     } else {
383         /* Encode as string */
384         enclen = ll2string((char*)buf,32,value);
385         serverAssert(enclen < 32);
386         if ((n = rdbSaveLen(rdb,enclen)) == -1) return -1;
387         nwritten += n;
388         if ((n = rdbWriteRaw(rdb,buf,enclen)) == -1) return -1;
389         nwritten += n;
390     }
391     return nwritten;
392 }
393 
394 /* Like rdbSaveStringObjectRaw() but handle encoded objects */
rdbSaveStringObject(rio * rdb,robj * obj)395 int rdbSaveStringObject(rio *rdb, robj *obj) {
396     /* Avoid to decode the object, then encode it again, if the
397      * object is already integer encoded. */
398     if (obj->encoding == OBJ_ENCODING_INT) {
399         return rdbSaveLongLongAsStringObject(rdb,(long)obj->ptr);
400     } else {
401         serverAssertWithInfo(NULL,obj,sdsEncodedObject(obj));
402         return rdbSaveRawString(rdb,obj->ptr,sdslen(obj->ptr));
403     }
404 }
405 
406 /* Load a string object from an RDB file according to flags:
407  *
408  * RDB_LOAD_NONE (no flags): load an RDB object, unencoded.
409  * RDB_LOAD_ENC: If the returned type is a Redis object, try to
410  *               encode it in a special way to be more memory
411  *               efficient. When this flag is passed the function
412  *               no longer guarantees that obj->ptr is an SDS string.
413  * RDB_LOAD_PLAIN: Return a plain string allocated with zmalloc()
414  *                 instead of a Redis object with an sds in it.
415  * RDB_LOAD_SDS: Return an SDS string instead of a Redis object.
416  */
rdbGenericLoadStringObject(rio * rdb,int flags)417 void *rdbGenericLoadStringObject(rio *rdb, int flags) {
418     int encode = flags & RDB_LOAD_ENC;
419     int plain = flags & RDB_LOAD_PLAIN;
420     int isencoded;
421     uint32_t len;
422 
423     len = rdbLoadLen(rdb,&isencoded);
424     if (isencoded) {
425         switch(len) {
426         case RDB_ENC_INT8:
427         case RDB_ENC_INT16:
428         case RDB_ENC_INT32:
429             return rdbLoadIntegerObject(rdb,len,flags);
430         case RDB_ENC_LZF:
431             return rdbLoadLzfStringObject(rdb,flags);
432         default:
433             rdbExitReportCorruptRDB("Unknown RDB string encoding type %d",len);
434         }
435     }
436 
437     if (len == RDB_LENERR) return NULL;
438     if (!plain) {
439         robj *o = encode ? createStringObject(NULL,len) :
440                            createRawStringObject(NULL,len);
441         if (len && rioRead(rdb,o->ptr,len) == 0) {
442             decrRefCount(o);
443             return NULL;
444         }
445         return o;
446     } else {
447         void *buf = zmalloc(len);
448         if (len && rioRead(rdb,buf,len) == 0) {
449             zfree(buf);
450             return NULL;
451         }
452         return buf;
453     }
454 }
455 
rdbLoadStringObject(rio * rdb)456 robj *rdbLoadStringObject(rio *rdb) {
457     return rdbGenericLoadStringObject(rdb,RDB_LOAD_NONE);
458 }
459 
rdbLoadEncodedStringObject(rio * rdb)460 robj *rdbLoadEncodedStringObject(rio *rdb) {
461     return rdbGenericLoadStringObject(rdb,RDB_LOAD_ENC);
462 }
463 
464 /* Save a double value. Doubles are saved as strings prefixed by an unsigned
465  * 8 bit integer specifying the length of the representation.
466  * This 8 bit integer has special values in order to specify the following
467  * conditions:
468  * 253: not a number
469  * 254: + inf
470  * 255: - inf
471  */
rdbSaveDoubleValue(rio * rdb,double val)472 int rdbSaveDoubleValue(rio *rdb, double val) {
473     unsigned char buf[128];
474     int len;
475 
476     if (isnan(val)) {
477         buf[0] = 253;
478         len = 1;
479     } else if (!isfinite(val)) {
480         len = 1;
481         buf[0] = (val < 0) ? 255 : 254;
482     } else {
483 #if (DBL_MANT_DIG >= 52) && (LLONG_MAX == 0x7fffffffffffffffLL)
484         /* Check if the float is in a safe range to be casted into a
485          * long long. We are assuming that long long is 64 bit here.
486          * Also we are assuming that there are no implementations around where
487          * double has precision < 52 bit.
488          *
489          * Under this assumptions we test if a double is inside an interval
490          * where casting to long long is safe. Then using two castings we
491          * make sure the decimal part is zero. If all this is true we use
492          * integer printing function that is much faster. */
493         double min = -4503599627370495; /* (2^52)-1 */
494         double max = 4503599627370496; /* -(2^52) */
495         if (val > min && val < max && val == ((double)((long long)val)))
496             ll2string((char*)buf+1,sizeof(buf)-1,(long long)val);
497         else
498 #endif
499             snprintf((char*)buf+1,sizeof(buf)-1,"%.17g",val);
500         buf[0] = strlen((char*)buf+1);
501         len = buf[0]+1;
502     }
503     return rdbWriteRaw(rdb,buf,len);
504 }
505 
506 /* For information about double serialization check rdbSaveDoubleValue() */
rdbLoadDoubleValue(rio * rdb,double * val)507 int rdbLoadDoubleValue(rio *rdb, double *val) {
508     char buf[256];
509     unsigned char len;
510 
511     if (rioRead(rdb,&len,1) == 0) return -1;
512     switch(len) {
513     case 255: *val = R_NegInf; return 0;
514     case 254: *val = R_PosInf; return 0;
515     case 253: *val = R_Nan; return 0;
516     default:
517         if (rioRead(rdb,buf,len) == 0) return -1;
518         buf[len] = '\0';
519         sscanf(buf, "%lg", val);
520         return 0;
521     }
522 }
523 
524 /* Save the object type of object "o". */
rdbSaveObjectType(rio * rdb,robj * o)525 int rdbSaveObjectType(rio *rdb, robj *o) {
526     switch (o->type) {
527     case OBJ_STRING:
528         return rdbSaveType(rdb,RDB_TYPE_STRING);
529     case OBJ_LIST:
530         if (o->encoding == OBJ_ENCODING_QUICKLIST)
531             return rdbSaveType(rdb,RDB_TYPE_LIST_QUICKLIST);
532         else
533             serverPanic("Unknown list encoding");
534     case OBJ_SET:
535         if (o->encoding == OBJ_ENCODING_INTSET)
536             return rdbSaveType(rdb,RDB_TYPE_SET_INTSET);
537         else if (o->encoding == OBJ_ENCODING_HT)
538             return rdbSaveType(rdb,RDB_TYPE_SET);
539         else
540             serverPanic("Unknown set encoding");
541     case OBJ_ZSET:
542         if (o->encoding == OBJ_ENCODING_ZIPLIST)
543             return rdbSaveType(rdb,RDB_TYPE_ZSET_ZIPLIST);
544         else if (o->encoding == OBJ_ENCODING_SKIPLIST)
545             return rdbSaveType(rdb,RDB_TYPE_ZSET);
546         else
547             serverPanic("Unknown sorted set encoding");
548     case OBJ_HASH:
549         if (o->encoding == OBJ_ENCODING_ZIPLIST)
550             return rdbSaveType(rdb,RDB_TYPE_HASH_ZIPLIST);
551         else if (o->encoding == OBJ_ENCODING_HT)
552             return rdbSaveType(rdb,RDB_TYPE_HASH);
553         else
554             serverPanic("Unknown hash encoding");
555     default:
556         serverPanic("Unknown object type");
557     }
558     return -1; /* avoid warning */
559 }
560 
561 /* Use rdbLoadType() to load a TYPE in RDB format, but returns -1 if the
562  * type is not specifically a valid Object Type. */
rdbLoadObjectType(rio * rdb)563 int rdbLoadObjectType(rio *rdb) {
564     int type;
565     if ((type = rdbLoadType(rdb)) == -1) return -1;
566     if (!rdbIsObjectType(type)) return -1;
567     return type;
568 }
569 
570 /* Save a Redis object. Returns -1 on error, number of bytes written on success. */
rdbSaveObject(rio * rdb,robj * o)571 ssize_t rdbSaveObject(rio *rdb, robj *o) {
572     ssize_t n = 0, nwritten = 0;
573 
574     if (o->type == OBJ_STRING) {
575         /* Save a string value */
576         if ((n = rdbSaveStringObject(rdb,o)) == -1) return -1;
577         nwritten += n;
578     } else if (o->type == OBJ_LIST) {
579         /* Save a list value */
580         if (o->encoding == OBJ_ENCODING_QUICKLIST) {
581             quicklist *ql = o->ptr;
582             quicklistNode *node = ql->head;
583 
584             if ((n = rdbSaveLen(rdb,ql->len)) == -1) return -1;
585             nwritten += n;
586 
587             do {
588                 if (quicklistNodeIsCompressed(node)) {
589                     void *data;
590                     size_t compress_len = quicklistGetLzf(node, &data);
591                     if ((n = rdbSaveLzfBlob(rdb,data,compress_len,node->sz)) == -1) return -1;
592                     nwritten += n;
593                 } else {
594                     if ((n = rdbSaveRawString(rdb,node->zl,node->sz)) == -1) return -1;
595                     nwritten += n;
596                 }
597             } while ((node = node->next));
598         } else {
599             serverPanic("Unknown list encoding");
600         }
601     } else if (o->type == OBJ_SET) {
602         /* Save a set value */
603         if (o->encoding == OBJ_ENCODING_HT) {
604             dict *set = o->ptr;
605             dictIterator *di = dictGetIterator(set);
606             dictEntry *de;
607 
608             if ((n = rdbSaveLen(rdb,dictSize(set))) == -1) return -1;
609             nwritten += n;
610 
611             while((de = dictNext(di)) != NULL) {
612                 robj *eleobj = dictGetKey(de);
613                 if ((n = rdbSaveStringObject(rdb,eleobj)) == -1) return -1;
614                 nwritten += n;
615             }
616             dictReleaseIterator(di);
617         } else if (o->encoding == OBJ_ENCODING_INTSET) {
618             size_t l = intsetBlobLen((intset*)o->ptr);
619 
620             if ((n = rdbSaveRawString(rdb,o->ptr,l)) == -1) return -1;
621             nwritten += n;
622         } else {
623             serverPanic("Unknown set encoding");
624         }
625     } else if (o->type == OBJ_ZSET) {
626         /* Save a sorted set value */
627         if (o->encoding == OBJ_ENCODING_ZIPLIST) {
628             size_t l = ziplistBlobLen((unsigned char*)o->ptr);
629 
630             if ((n = rdbSaveRawString(rdb,o->ptr,l)) == -1) return -1;
631             nwritten += n;
632         } else if (o->encoding == OBJ_ENCODING_SKIPLIST) {
633             zset *zs = o->ptr;
634             dictIterator *di = dictGetIterator(zs->dict);
635             dictEntry *de;
636 
637             if ((n = rdbSaveLen(rdb,dictSize(zs->dict))) == -1) return -1;
638             nwritten += n;
639 
640             while((de = dictNext(di)) != NULL) {
641                 robj *eleobj = dictGetKey(de);
642                 double *score = dictGetVal(de);
643 
644                 if ((n = rdbSaveStringObject(rdb,eleobj)) == -1) return -1;
645                 nwritten += n;
646                 if ((n = rdbSaveDoubleValue(rdb,*score)) == -1) return -1;
647                 nwritten += n;
648             }
649             dictReleaseIterator(di);
650         } else {
651             serverPanic("Unknown sorted set encoding");
652         }
653     } else if (o->type == OBJ_HASH) {
654         /* Save a hash value */
655         if (o->encoding == OBJ_ENCODING_ZIPLIST) {
656             size_t l = ziplistBlobLen((unsigned char*)o->ptr);
657 
658             if ((n = rdbSaveRawString(rdb,o->ptr,l)) == -1) return -1;
659             nwritten += n;
660 
661         } else if (o->encoding == OBJ_ENCODING_HT) {
662             dictIterator *di = dictGetIterator(o->ptr);
663             dictEntry *de;
664 
665             if ((n = rdbSaveLen(rdb,dictSize((dict*)o->ptr))) == -1) return -1;
666             nwritten += n;
667 
668             while((de = dictNext(di)) != NULL) {
669                 robj *key = dictGetKey(de);
670                 robj *val = dictGetVal(de);
671 
672                 if ((n = rdbSaveStringObject(rdb,key)) == -1) return -1;
673                 nwritten += n;
674                 if ((n = rdbSaveStringObject(rdb,val)) == -1) return -1;
675                 nwritten += n;
676             }
677             dictReleaseIterator(di);
678 
679         } else {
680             serverPanic("Unknown hash encoding");
681         }
682 
683     } else {
684         serverPanic("Unknown object type");
685     }
686     return nwritten;
687 }
688 
689 /* Return the length the object will have on disk if saved with
690  * the rdbSaveObject() function. Currently we use a trick to get
691  * this length with very little changes to the code. In the future
692  * we could switch to a faster solution. */
rdbSavedObjectLen(robj * o)693 size_t rdbSavedObjectLen(robj *o) {
694     ssize_t len = rdbSaveObject(NULL,o);
695     serverAssertWithInfo(NULL,o,len != -1);
696     return len;
697 }
698 
699 /* Save a key-value pair, with expire time, type, key, value.
700  * On error -1 is returned.
701  * On success if the key was actually saved 1 is returned, otherwise 0
702  * is returned (the key was already expired). */
rdbSaveKeyValuePair(rio * rdb,robj * key,robj * val,long long expiretime,long long now)703 int rdbSaveKeyValuePair(rio *rdb, robj *key, robj *val,
704                         long long expiretime, long long now)
705 {
706     /* Save the expire time */
707     if (expiretime != -1) {
708         /* If this key is already expired skip it */
709         if (expiretime < now) return 0;
710         if (rdbSaveType(rdb,RDB_OPCODE_EXPIRETIME_MS) == -1) return -1;
711         if (rdbSaveMillisecondTime(rdb,expiretime) == -1) return -1;
712     }
713 
714     /* Save type, key, value */
715     if (rdbSaveObjectType(rdb,val) == -1) return -1;
716     if (rdbSaveStringObject(rdb,key) == -1) return -1;
717     if (rdbSaveObject(rdb,val) == -1) return -1;
718     return 1;
719 }
720 
721 /* Save an AUX field. */
rdbSaveAuxField(rio * rdb,void * key,size_t keylen,void * val,size_t vallen)722 int rdbSaveAuxField(rio *rdb, void *key, size_t keylen, void *val, size_t vallen) {
723     if (rdbSaveType(rdb,RDB_OPCODE_AUX) == -1) return -1;
724     if (rdbSaveRawString(rdb,key,keylen) == -1) return -1;
725     if (rdbSaveRawString(rdb,val,vallen) == -1) return -1;
726     return 1;
727 }
728 
729 /* Wrapper for rdbSaveAuxField() used when key/val length can be obtained
730  * with strlen(). */
rdbSaveAuxFieldStrStr(rio * rdb,char * key,char * val)731 int rdbSaveAuxFieldStrStr(rio *rdb, char *key, char *val) {
732     return rdbSaveAuxField(rdb,key,strlen(key),val,strlen(val));
733 }
734 
735 /* Wrapper for strlen(key) + integer type (up to long long range). */
rdbSaveAuxFieldStrInt(rio * rdb,char * key,long long val)736 int rdbSaveAuxFieldStrInt(rio *rdb, char *key, long long val) {
737     char buf[LONG_STR_SIZE];
738     int vlen = ll2string(buf,sizeof(buf),val);
739     return rdbSaveAuxField(rdb,key,strlen(key),buf,vlen);
740 }
741 
742 /* Save a few default AUX fields with information about the RDB generated. */
rdbSaveInfoAuxFields(rio * rdb)743 int rdbSaveInfoAuxFields(rio *rdb) {
744     int redis_bits = (sizeof(void*) == 8) ? 64 : 32;
745 
746     /* Add a few fields about the state when the RDB was created. */
747     if (rdbSaveAuxFieldStrStr(rdb,"redis-ver",REDIS_VERSION) == -1) return -1;
748     if (rdbSaveAuxFieldStrInt(rdb,"redis-bits",redis_bits) == -1) return -1;
749     if (rdbSaveAuxFieldStrInt(rdb,"ctime",time(NULL)) == -1) return -1;
750     if (rdbSaveAuxFieldStrInt(rdb,"used-mem",zmalloc_used_memory()) == -1) return -1;
751     return 1;
752 }
753 
754 /* Produces a dump of the database in RDB format sending it to the specified
755  * Redis I/O channel. On success C_OK is returned, otherwise C_ERR
756  * is returned and part of the output, or all the output, can be
757  * missing because of I/O errors.
758  *
759  * When the function returns C_ERR and if 'error' is not NULL, the
760  * integer pointed by 'error' is set to the value of errno just after the I/O
761  * error. */
rdbSaveRio(rio * rdb,int * error)762 int rdbSaveRio(rio *rdb, int *error) {
763     dictIterator *di = NULL;
764     dictEntry *de;
765     char magic[10];
766     int j;
767     long long now = mstime();
768     uint64_t cksum;
769 
770     if (server.rdb_checksum)
771         rdb->update_cksum = rioGenericUpdateChecksum;
772     snprintf(magic,sizeof(magic),"REDIS%04d",RDB_VERSION);
773     if (rdbWriteRaw(rdb,magic,9) == -1) goto werr;
774     if (rdbSaveInfoAuxFields(rdb) == -1) goto werr;
775 
776     for (j = 0; j < server.dbnum; j++) {
777         redisDb *db = server.db+j;
778         dict *d = db->dict;
779         if (dictSize(d) == 0) continue;
780         di = dictGetSafeIterator(d);
781         if (!di) return C_ERR;
782 
783         /* Write the SELECT DB opcode */
784         if (rdbSaveType(rdb,RDB_OPCODE_SELECTDB) == -1) goto werr;
785         if (rdbSaveLen(rdb,j) == -1) goto werr;
786 
787         /* Write the RESIZE DB opcode. We trim the size to UINT32_MAX, which
788          * is currently the largest type we are able to represent in RDB sizes.
789          * However this does not limit the actual size of the DB to load since
790          * these sizes are just hints to resize the hash tables. */
791         uint32_t db_size, expires_size;
792         db_size = (dictSize(db->dict) <= UINT32_MAX) ?
793                                 dictSize(db->dict) :
794                                 UINT32_MAX;
795         expires_size = (dictSize(db->expires) <= UINT32_MAX) ?
796                                 dictSize(db->expires) :
797                                 UINT32_MAX;
798         if (rdbSaveType(rdb,RDB_OPCODE_RESIZEDB) == -1) goto werr;
799         if (rdbSaveLen(rdb,db_size) == -1) goto werr;
800         if (rdbSaveLen(rdb,expires_size) == -1) goto werr;
801 
802         /* Iterate this DB writing every entry */
803         while((de = dictNext(di)) != NULL) {
804             sds keystr = dictGetKey(de);
805             robj key, *o = dictGetVal(de);
806             long long expire;
807 
808             initStaticStringObject(key,keystr);
809             expire = getExpire(db,&key);
810             if (rdbSaveKeyValuePair(rdb,&key,o,expire,now) == -1) goto werr;
811         }
812         dictReleaseIterator(di);
813     }
814     di = NULL; /* So that we don't release it again on error. */
815 
816     /* EOF opcode */
817     if (rdbSaveType(rdb,RDB_OPCODE_EOF) == -1) goto werr;
818 
819     /* CRC64 checksum. It will be zero if checksum computation is disabled, the
820      * loading code skips the check in this case. */
821     cksum = rdb->cksum;
822     memrev64ifbe(&cksum);
823     if (rioWrite(rdb,&cksum,8) == 0) goto werr;
824     return C_OK;
825 
826 werr:
827     if (error) *error = errno;
828     if (di) dictReleaseIterator(di);
829     return C_ERR;
830 }
831 
832 /* This is just a wrapper to rdbSaveRio() that additionally adds a prefix
833  * and a suffix to the generated RDB dump. The prefix is:
834  *
835  * $EOF:<40 bytes unguessable hex string>\r\n
836  *
837  * While the suffix is the 40 bytes hex string we announced in the prefix.
838  * This way processes receiving the payload can understand when it ends
839  * without doing any processing of the content. */
rdbSaveRioWithEOFMark(rio * rdb,int * error)840 int rdbSaveRioWithEOFMark(rio *rdb, int *error) {
841     char eofmark[RDB_EOF_MARK_SIZE];
842 
843     getRandomHexChars(eofmark,RDB_EOF_MARK_SIZE);
844     if (error) *error = 0;
845     if (rioWrite(rdb,"$EOF:",5) == 0) goto werr;
846     if (rioWrite(rdb,eofmark,RDB_EOF_MARK_SIZE) == 0) goto werr;
847     if (rioWrite(rdb,"\r\n",2) == 0) goto werr;
848     if (rdbSaveRio(rdb,error) == C_ERR) goto werr;
849     if (rioWrite(rdb,eofmark,RDB_EOF_MARK_SIZE) == 0) goto werr;
850     return C_OK;
851 
852 werr: /* Write error. */
853     /* Set 'error' only if not already set by rdbSaveRio() call. */
854     if (error && *error == 0) *error = errno;
855     return C_ERR;
856 }
857 
858 /* Save the DB on disk. Return C_ERR on error, C_OK on success. */
rdbSave(char * filename)859 int rdbSave(char *filename) {
860     char tmpfile[256];
861     char cwd[MAXPATHLEN]; /* Current working dir path for error messages. */
862     FILE *fp;
863     rio rdb;
864     int error = 0;
865 
866     snprintf(tmpfile,256,"temp-%d.rdb", (int) getpid());
867     fp = fopen(tmpfile,"w");
868     if (!fp) {
869         char *cwdp = getcwd(cwd,MAXPATHLEN);
870         serverLog(LL_WARNING,
871             "Failed opening the RDB file %s (in server root dir %s) "
872             "for saving: %s",
873             filename,
874             cwdp ? cwdp : "unknown",
875             strerror(errno));
876         return C_ERR;
877     }
878 
879     rioInitWithFile(&rdb,fp);
880     if (rdbSaveRio(&rdb,&error) == C_ERR) {
881         errno = error;
882         goto werr;
883     }
884 
885     /* Make sure data will not remain on the OS's output buffers */
886     if (fflush(fp) == EOF) goto werr;
887     if (fsync(fileno(fp)) == -1) goto werr;
888     if (fclose(fp) == EOF) goto werr;
889 
890     /* Use RENAME to make sure the DB file is changed atomically only
891      * if the generate DB file is ok. */
892     if (rename(tmpfile,filename) == -1) {
893         char *cwdp = getcwd(cwd,MAXPATHLEN);
894         serverLog(LL_WARNING,
895             "Error moving temp DB file %s on the final "
896             "destination %s (in server root dir %s): %s",
897             tmpfile,
898             filename,
899             cwdp ? cwdp : "unknown",
900             strerror(errno));
901         unlink(tmpfile);
902         return C_ERR;
903     }
904 
905     serverLog(LL_NOTICE,"DB saved on disk");
906     server.dirty = 0;
907     server.lastsave = time(NULL);
908     server.lastbgsave_status = C_OK;
909     return C_OK;
910 
911 werr:
912     serverLog(LL_WARNING,"Write error saving DB on disk: %s", strerror(errno));
913     fclose(fp);
914     unlink(tmpfile);
915     return C_ERR;
916 }
917 
rdbSaveBackground(char * filename)918 int rdbSaveBackground(char *filename) {
919     pid_t childpid;
920     long long start;
921 
922     if (server.aof_child_pid != -1 || server.rdb_child_pid != -1) return C_ERR;
923 
924     server.dirty_before_bgsave = server.dirty;
925     server.lastbgsave_try = time(NULL);
926 
927     start = ustime();
928     if ((childpid = fork()) == 0) {
929         int retval;
930 
931         /* Child */
932         closeListeningSockets(0);
933         redisSetProcTitle("redis-rdb-bgsave");
934         retval = rdbSave(filename);
935         if (retval == C_OK) {
936             size_t private_dirty = zmalloc_get_private_dirty();
937 
938             if (private_dirty) {
939                 serverLog(LL_NOTICE,
940                     "RDB: %zu MB of memory used by copy-on-write",
941                     private_dirty/(1024*1024));
942             }
943         }
944         exitFromChild((retval == C_OK) ? 0 : 1);
945     } else {
946         /* Parent */
947         server.stat_fork_time = ustime()-start;
948         server.stat_fork_rate = (double) zmalloc_used_memory() * 1000000 / server.stat_fork_time / (1024*1024*1024); /* GB per second. */
949         latencyAddSampleIfNeeded("fork",server.stat_fork_time/1000);
950         if (childpid == -1) {
951             server.lastbgsave_status = C_ERR;
952             serverLog(LL_WARNING,"Can't save in background: fork: %s",
953                 strerror(errno));
954             return C_ERR;
955         }
956         serverLog(LL_NOTICE,"Background saving started by pid %d",childpid);
957         server.rdb_save_time_start = time(NULL);
958         server.rdb_child_pid = childpid;
959         server.rdb_child_type = RDB_CHILD_TYPE_DISK;
960         updateDictResizePolicy();
961         return C_OK;
962     }
963     return C_OK; /* unreached */
964 }
965 
/* Remove the temporary dump file "temp-<childpid>.rdb" left by the saving
 * process with the given pid. The path is relative to the current working
 * directory. The result of unlink() is ignored, so a missing file is not
 * reported. */
void rdbRemoveTempFile(pid_t childpid) {
    char path[256];
    int pid = (int) childpid;

    snprintf(path,sizeof(path),"temp-%d.rdb",pid);
    unlink(path);
}
972 
973 /* Load a Redis object of the specified type from the specified file.
974  * On success a newly allocated object is returned, otherwise NULL. */
rdbLoadObject(int rdbtype,rio * rdb)975 robj *rdbLoadObject(int rdbtype, rio *rdb) {
976     robj *o = NULL, *ele, *dec;
977     size_t len;
978     unsigned int i;
979 
980     if (rdbtype == RDB_TYPE_STRING) {
981         /* Read string value */
982         if ((o = rdbLoadEncodedStringObject(rdb)) == NULL) return NULL;
983         o = tryObjectEncoding(o);
984     } else if (rdbtype == RDB_TYPE_LIST) {
985         /* Read list value */
986         if ((len = rdbLoadLen(rdb,NULL)) == RDB_LENERR) return NULL;
987 
988         o = createQuicklistObject();
989         quicklistSetOptions(o->ptr, server.list_max_ziplist_size,
990                             server.list_compress_depth);
991 
992         /* Load every single element of the list */
993         while(len--) {
994             if ((ele = rdbLoadEncodedStringObject(rdb)) == NULL) return NULL;
995             dec = getDecodedObject(ele);
996             size_t len = sdslen(dec->ptr);
997             quicklistPushTail(o->ptr, dec->ptr, len);
998             decrRefCount(dec);
999             decrRefCount(ele);
1000         }
1001     } else if (rdbtype == RDB_TYPE_SET) {
1002         /* Read list/set value */
1003         if ((len = rdbLoadLen(rdb,NULL)) == RDB_LENERR) return NULL;
1004 
1005         /* Use a regular set when there are too many entries. */
1006         if (len > server.set_max_intset_entries) {
1007             o = createSetObject();
1008             /* It's faster to expand the dict to the right size asap in order
1009              * to avoid rehashing */
1010             if (len > DICT_HT_INITIAL_SIZE)
1011                 dictExpand(o->ptr,len);
1012         } else {
1013             o = createIntsetObject();
1014         }
1015 
1016         /* Load every single element of the list/set */
1017         for (i = 0; i < len; i++) {
1018             long long llval;
1019             if ((ele = rdbLoadEncodedStringObject(rdb)) == NULL) return NULL;
1020             ele = tryObjectEncoding(ele);
1021 
1022             if (o->encoding == OBJ_ENCODING_INTSET) {
1023                 /* Fetch integer value from element */
1024                 if (isObjectRepresentableAsLongLong(ele,&llval) == C_OK) {
1025                     o->ptr = intsetAdd(o->ptr,llval,NULL);
1026                 } else {
1027                     setTypeConvert(o,OBJ_ENCODING_HT);
1028                     dictExpand(o->ptr,len);
1029                 }
1030             }
1031 
1032             /* This will also be called when the set was just converted
1033              * to a regular hash table encoded set */
1034             if (o->encoding == OBJ_ENCODING_HT) {
1035                 dictAdd((dict*)o->ptr,ele,NULL);
1036             } else {
1037                 decrRefCount(ele);
1038             }
1039         }
1040     } else if (rdbtype == RDB_TYPE_ZSET) {
1041         /* Read list/set value */
1042         size_t zsetlen;
1043         size_t maxelelen = 0;
1044         zset *zs;
1045 
1046         if ((zsetlen = rdbLoadLen(rdb,NULL)) == RDB_LENERR) return NULL;
1047         o = createZsetObject();
1048         zs = o->ptr;
1049 
1050         /* Load every single element of the list/set */
1051         while(zsetlen--) {
1052             robj *ele;
1053             double score;
1054             zskiplistNode *znode;
1055 
1056             if ((ele = rdbLoadEncodedStringObject(rdb)) == NULL) return NULL;
1057             ele = tryObjectEncoding(ele);
1058             if (rdbLoadDoubleValue(rdb,&score) == -1) return NULL;
1059 
1060             /* Don't care about integer-encoded strings. */
1061             if (sdsEncodedObject(ele) && sdslen(ele->ptr) > maxelelen)
1062                 maxelelen = sdslen(ele->ptr);
1063 
1064             znode = zslInsert(zs->zsl,score,ele);
1065             dictAdd(zs->dict,ele,&znode->score);
1066             incrRefCount(ele); /* added to skiplist */
1067         }
1068 
1069         /* Convert *after* loading, since sorted sets are not stored ordered. */
1070         if (zsetLength(o) <= server.zset_max_ziplist_entries &&
1071             maxelelen <= server.zset_max_ziplist_value)
1072                 zsetConvert(o,OBJ_ENCODING_ZIPLIST);
1073     } else if (rdbtype == RDB_TYPE_HASH) {
1074         size_t len;
1075         int ret;
1076 
1077         len = rdbLoadLen(rdb, NULL);
1078         if (len == RDB_LENERR) return NULL;
1079 
1080         o = createHashObject();
1081 
1082         /* Too many entries? Use a hash table. */
1083         if (len > server.hash_max_ziplist_entries)
1084             hashTypeConvert(o, OBJ_ENCODING_HT);
1085 
1086         /* Load every field and value into the ziplist */
1087         while (o->encoding == OBJ_ENCODING_ZIPLIST && len > 0) {
1088             robj *field, *value;
1089 
1090             len--;
1091             /* Load raw strings */
1092             field = rdbLoadStringObject(rdb);
1093             if (field == NULL) return NULL;
1094             serverAssert(sdsEncodedObject(field));
1095             value = rdbLoadStringObject(rdb);
1096             if (value == NULL) return NULL;
1097             serverAssert(sdsEncodedObject(value));
1098 
1099             /* Add pair to ziplist */
1100             o->ptr = ziplistPush(o->ptr, field->ptr, sdslen(field->ptr), ZIPLIST_TAIL);
1101             o->ptr = ziplistPush(o->ptr, value->ptr, sdslen(value->ptr), ZIPLIST_TAIL);
1102             /* Convert to hash table if size threshold is exceeded */
1103             if (sdslen(field->ptr) > server.hash_max_ziplist_value ||
1104                 sdslen(value->ptr) > server.hash_max_ziplist_value)
1105             {
1106                 decrRefCount(field);
1107                 decrRefCount(value);
1108                 hashTypeConvert(o, OBJ_ENCODING_HT);
1109                 break;
1110             }
1111             decrRefCount(field);
1112             decrRefCount(value);
1113         }
1114 
1115         /* Load remaining fields and values into the hash table */
1116         while (o->encoding == OBJ_ENCODING_HT && len > 0) {
1117             robj *field, *value;
1118 
1119             len--;
1120             /* Load encoded strings */
1121             field = rdbLoadEncodedStringObject(rdb);
1122             if (field == NULL) return NULL;
1123             value = rdbLoadEncodedStringObject(rdb);
1124             if (value == NULL) return NULL;
1125 
1126             field = tryObjectEncoding(field);
1127             value = tryObjectEncoding(value);
1128 
1129             /* Add pair to hash table */
1130             ret = dictAdd((dict*)o->ptr, field, value);
1131             if (ret == DICT_ERR) {
1132                 rdbExitReportCorruptRDB("Duplicate keys detected");
1133             }
1134         }
1135 
1136         /* All pairs should be read by now */
1137         serverAssert(len == 0);
1138     } else if (rdbtype == RDB_TYPE_LIST_QUICKLIST) {
1139         if ((len = rdbLoadLen(rdb,NULL)) == RDB_LENERR) return NULL;
1140         o = createQuicklistObject();
1141         quicklistSetOptions(o->ptr, server.list_max_ziplist_size,
1142                             server.list_compress_depth);
1143 
1144         while (len--) {
1145             unsigned char *zl = rdbGenericLoadStringObject(rdb,RDB_LOAD_PLAIN);
1146             if (zl == NULL) return NULL;
1147             quicklistAppendZiplist(o->ptr, zl);
1148         }
1149     } else if (rdbtype == RDB_TYPE_HASH_ZIPMAP  ||
1150                rdbtype == RDB_TYPE_LIST_ZIPLIST ||
1151                rdbtype == RDB_TYPE_SET_INTSET   ||
1152                rdbtype == RDB_TYPE_ZSET_ZIPLIST ||
1153                rdbtype == RDB_TYPE_HASH_ZIPLIST)
1154     {
1155         unsigned char *encoded = rdbGenericLoadStringObject(rdb,RDB_LOAD_PLAIN);
1156         if (encoded == NULL) return NULL;
1157         o = createObject(OBJ_STRING,encoded); /* Obj type fixed below. */
1158 
1159         /* Fix the object encoding, and make sure to convert the encoded
1160          * data type into the base type if accordingly to the current
1161          * configuration there are too many elements in the encoded data
1162          * type. Note that we only check the length and not max element
1163          * size as this is an O(N) scan. Eventually everything will get
1164          * converted. */
1165         switch(rdbtype) {
1166             case RDB_TYPE_HASH_ZIPMAP:
1167                 /* Convert to ziplist encoded hash. This must be deprecated
1168                  * when loading dumps created by Redis 2.4 gets deprecated. */
1169                 {
1170                     unsigned char *zl = ziplistNew();
1171                     unsigned char *zi = zipmapRewind(o->ptr);
1172                     unsigned char *fstr, *vstr;
1173                     unsigned int flen, vlen;
1174                     unsigned int maxlen = 0;
1175 
1176                     while ((zi = zipmapNext(zi, &fstr, &flen, &vstr, &vlen)) != NULL) {
1177                         if (flen > maxlen) maxlen = flen;
1178                         if (vlen > maxlen) maxlen = vlen;
1179                         zl = ziplistPush(zl, fstr, flen, ZIPLIST_TAIL);
1180                         zl = ziplistPush(zl, vstr, vlen, ZIPLIST_TAIL);
1181                     }
1182 
1183                     zfree(o->ptr);
1184                     o->ptr = zl;
1185                     o->type = OBJ_HASH;
1186                     o->encoding = OBJ_ENCODING_ZIPLIST;
1187 
1188                     if (hashTypeLength(o) > server.hash_max_ziplist_entries ||
1189                         maxlen > server.hash_max_ziplist_value)
1190                     {
1191                         hashTypeConvert(o, OBJ_ENCODING_HT);
1192                     }
1193                 }
1194                 break;
1195             case RDB_TYPE_LIST_ZIPLIST:
1196                 o->type = OBJ_LIST;
1197                 o->encoding = OBJ_ENCODING_ZIPLIST;
1198                 listTypeConvert(o,OBJ_ENCODING_QUICKLIST);
1199                 break;
1200             case RDB_TYPE_SET_INTSET:
1201                 o->type = OBJ_SET;
1202                 o->encoding = OBJ_ENCODING_INTSET;
1203                 if (intsetLen(o->ptr) > server.set_max_intset_entries)
1204                     setTypeConvert(o,OBJ_ENCODING_HT);
1205                 break;
1206             case RDB_TYPE_ZSET_ZIPLIST:
1207                 o->type = OBJ_ZSET;
1208                 o->encoding = OBJ_ENCODING_ZIPLIST;
1209                 if (zsetLength(o) > server.zset_max_ziplist_entries)
1210                     zsetConvert(o,OBJ_ENCODING_SKIPLIST);
1211                 break;
1212             case RDB_TYPE_HASH_ZIPLIST:
1213                 o->type = OBJ_HASH;
1214                 o->encoding = OBJ_ENCODING_ZIPLIST;
1215                 if (hashTypeLength(o) > server.hash_max_ziplist_entries)
1216                     hashTypeConvert(o, OBJ_ENCODING_HT);
1217                 break;
1218             default:
1219                 rdbExitReportCorruptRDB("Unknown RDB encoding type %d",rdbtype);
1220                 break;
1221         }
1222     } else {
1223         rdbExitReportCorruptRDB("Unknown RDB encoding type %d",rdbtype);
1224     }
1225     return o;
1226 }
1227 
1228 /* Mark that we are loading in the global state and setup the fields
1229  * needed to provide loading stats. */
startLoading(FILE * fp)1230 void startLoading(FILE *fp) {
1231     struct stat sb;
1232 
1233     /* Load the DB */
1234     server.loading = 1;
1235     server.loading_start_time = time(NULL);
1236     server.loading_loaded_bytes = 0;
1237     if (fstat(fileno(fp), &sb) == -1) {
1238         server.loading_total_bytes = 0;
1239     } else {
1240         server.loading_total_bytes = sb.st_size;
1241     }
1242 }
1243 
1244 /* Refresh the loading progress info */
loadingProgress(off_t pos)1245 void loadingProgress(off_t pos) {
1246     server.loading_loaded_bytes = pos;
1247     if (server.stat_peak_memory < zmalloc_used_memory())
1248         server.stat_peak_memory = zmalloc_used_memory();
1249 }
1250 
1251 /* Loading finished */
stopLoading(void)1252 void stopLoading(void) {
1253     server.loading = 0;
1254 }
1255 
1256 /* Track loading progress in order to serve client's from time to time
1257    and if needed calculate rdb checksum  */
rdbLoadProgressCallback(rio * r,const void * buf,size_t len)1258 void rdbLoadProgressCallback(rio *r, const void *buf, size_t len) {
1259     if (server.rdb_checksum)
1260         rioGenericUpdateChecksum(r, buf, len);
1261     if (server.loading_process_events_interval_bytes &&
1262         (r->processed_bytes + len)/server.loading_process_events_interval_bytes > r->processed_bytes/server.loading_process_events_interval_bytes)
1263     {
1264         /* The DB can take some non trivial amount of time to load. Update
1265          * our cached time since it is used to create and update the last
1266          * interaction time with clients and for other important things. */
1267         updateCachedTime();
1268         if (server.masterhost && server.repl_state == REPL_STATE_TRANSFER)
1269             replicationSendNewlineToMaster();
1270         loadingProgress(r->processed_bytes);
1271         processEventsWhileBlocked();
1272     }
1273 }
1274 
rdbLoad(char * filename)1275 int rdbLoad(char *filename) {
1276     uint32_t dbid;
1277     int type, rdbver;
1278     redisDb *db = server.db+0;
1279     char buf[1024];
1280     long long expiretime, now = mstime();
1281     FILE *fp;
1282     rio rdb;
1283 
1284     if ((fp = fopen(filename,"r")) == NULL) return C_ERR;
1285 
1286     rioInitWithFile(&rdb,fp);
1287     rdb.update_cksum = rdbLoadProgressCallback;
1288     rdb.max_processing_chunk = server.loading_process_events_interval_bytes;
1289     if (rioRead(&rdb,buf,9) == 0) goto eoferr;
1290     buf[9] = '\0';
1291     if (memcmp(buf,"REDIS",5) != 0) {
1292         fclose(fp);
1293         serverLog(LL_WARNING,"Wrong signature trying to load DB from file");
1294         errno = EINVAL;
1295         return C_ERR;
1296     }
1297     rdbver = atoi(buf+5);
1298     if (rdbver < 1 || rdbver > RDB_VERSION) {
1299         fclose(fp);
1300         serverLog(LL_WARNING,"Can't handle RDB format version %d",rdbver);
1301         errno = EINVAL;
1302         return C_ERR;
1303     }
1304 
1305     startLoading(fp);
1306     while(1) {
1307         robj *key, *val;
1308         expiretime = -1;
1309 
1310         /* Read type. */
1311         if ((type = rdbLoadType(&rdb)) == -1) goto eoferr;
1312 
1313         /* Handle special types. */
1314         if (type == RDB_OPCODE_EXPIRETIME) {
1315             /* EXPIRETIME: load an expire associated with the next key
1316              * to load. Note that after loading an expire we need to
1317              * load the actual type, and continue. */
1318             if ((expiretime = rdbLoadTime(&rdb)) == -1) goto eoferr;
1319             /* We read the time so we need to read the object type again. */
1320             if ((type = rdbLoadType(&rdb)) == -1) goto eoferr;
1321             /* the EXPIRETIME opcode specifies time in seconds, so convert
1322              * into milliseconds. */
1323             expiretime *= 1000;
1324         } else if (type == RDB_OPCODE_EXPIRETIME_MS) {
1325             /* EXPIRETIME_MS: milliseconds precision expire times introduced
1326              * with RDB v3. Like EXPIRETIME but no with more precision. */
1327             if ((expiretime = rdbLoadMillisecondTime(&rdb)) == -1) goto eoferr;
1328             /* We read the time so we need to read the object type again. */
1329             if ((type = rdbLoadType(&rdb)) == -1) goto eoferr;
1330         } else if (type == RDB_OPCODE_EOF) {
1331             /* EOF: End of file, exit the main loop. */
1332             break;
1333         } else if (type == RDB_OPCODE_SELECTDB) {
1334             /* SELECTDB: Select the specified database. */
1335             if ((dbid = rdbLoadLen(&rdb,NULL)) == RDB_LENERR)
1336                 goto eoferr;
1337             if (dbid >= (unsigned)server.dbnum) {
1338                 serverLog(LL_WARNING,
1339                     "FATAL: Data file was created with a Redis "
1340                     "server configured to handle more than %d "
1341                     "databases. Exiting\n", server.dbnum);
1342                 exit(1);
1343             }
1344             db = server.db+dbid;
1345             continue; /* Read type again. */
1346         } else if (type == RDB_OPCODE_RESIZEDB) {
1347             /* RESIZEDB: Hint about the size of the keys in the currently
1348              * selected data base, in order to avoid useless rehashing. */
1349             uint32_t db_size, expires_size;
1350             if ((db_size = rdbLoadLen(&rdb,NULL)) == RDB_LENERR)
1351                 goto eoferr;
1352             if ((expires_size = rdbLoadLen(&rdb,NULL)) == RDB_LENERR)
1353                 goto eoferr;
1354             dictExpand(db->dict,db_size);
1355             dictExpand(db->expires,expires_size);
1356             continue; /* Read type again. */
1357         } else if (type == RDB_OPCODE_AUX) {
1358             /* AUX: generic string-string fields. Use to add state to RDB
1359              * which is backward compatible. Implementations of RDB loading
1360              * are requierd to skip AUX fields they don't understand.
1361              *
1362              * An AUX field is composed of two strings: key and value. */
1363             robj *auxkey, *auxval;
1364             if ((auxkey = rdbLoadStringObject(&rdb)) == NULL) goto eoferr;
1365             if ((auxval = rdbLoadStringObject(&rdb)) == NULL) goto eoferr;
1366 
1367             if (((char*)auxkey->ptr)[0] == '%') {
1368                 /* All the fields with a name staring with '%' are considered
1369                  * information fields and are logged at startup with a log
1370                  * level of NOTICE. */
1371                 serverLog(LL_NOTICE,"RDB '%s': %s",
1372                     (char*)auxkey->ptr,
1373                     (char*)auxval->ptr);
1374             } else {
1375                 /* We ignore fields we don't understand, as by AUX field
1376                  * contract. */
1377                 serverLog(LL_DEBUG,"Unrecognized RDB AUX field: '%s'",
1378                     (char*)auxkey->ptr);
1379             }
1380 
1381             decrRefCount(auxkey);
1382             decrRefCount(auxval);
1383             continue; /* Read type again. */
1384         }
1385 
1386         /* Read key */
1387         if ((key = rdbLoadStringObject(&rdb)) == NULL) goto eoferr;
1388         /* Read value */
1389         if ((val = rdbLoadObject(type,&rdb)) == NULL) goto eoferr;
1390         /* Check if the key already expired. This function is used when loading
1391          * an RDB file from disk, either at startup, or when an RDB was
1392          * received from the master. In the latter case, the master is
1393          * responsible for key expiry. If we would expire keys here, the
1394          * snapshot taken by the master may not be reflected on the slave. */
1395         if (server.masterhost == NULL && expiretime != -1 && expiretime < now) {
1396             decrRefCount(key);
1397             decrRefCount(val);
1398             continue;
1399         }
1400         /* Add the new object in the hash table */
1401         dbAdd(db,key,val);
1402 
1403         /* Set the expire time if needed */
1404         if (expiretime != -1) setExpire(db,key,expiretime);
1405 
1406         decrRefCount(key);
1407     }
1408     /* Verify the checksum if RDB version is >= 5 */
1409     if (rdbver >= 5 && server.rdb_checksum) {
1410         uint64_t cksum, expected = rdb.cksum;
1411 
1412         if (rioRead(&rdb,&cksum,8) == 0) goto eoferr;
1413         memrev64ifbe(&cksum);
1414         if (cksum == 0) {
1415             serverLog(LL_WARNING,"RDB file was saved with checksum disabled: no check performed.");
1416         } else if (cksum != expected) {
1417             serverLog(LL_WARNING,"Wrong RDB checksum. Aborting now.");
1418             rdbExitReportCorruptRDB("RDB CRC error");
1419         }
1420     }
1421 
1422     fclose(fp);
1423     stopLoading();
1424     return C_OK;
1425 
1426 eoferr: /* unexpected end of file is handled here with a fatal exit */
1427     serverLog(LL_WARNING,"Short read or OOM loading DB. Unrecoverable error, aborting now.");
1428     rdbExitReportCorruptRDB("Unexpected EOF reading RDB file");
1429     return C_ERR; /* Just to avoid warning */
1430 }
1431 
1432 /* A background saving child (BGSAVE) terminated its work. Handle this.
1433  * This function covers the case of actual BGSAVEs. */
backgroundSaveDoneHandlerDisk(int exitcode,int bysignal)1434 void backgroundSaveDoneHandlerDisk(int exitcode, int bysignal) {
1435     if (!bysignal && exitcode == 0) {
1436         serverLog(LL_NOTICE,
1437             "Background saving terminated with success");
1438         server.dirty = server.dirty - server.dirty_before_bgsave;
1439         server.lastsave = time(NULL);
1440         server.lastbgsave_status = C_OK;
1441     } else if (!bysignal && exitcode != 0) {
1442         serverLog(LL_WARNING, "Background saving error");
1443         server.lastbgsave_status = C_ERR;
1444     } else {
1445         mstime_t latency;
1446 
1447         serverLog(LL_WARNING,
1448             "Background saving terminated by signal %d", bysignal);
1449         latencyStartMonitor(latency);
1450         rdbRemoveTempFile(server.rdb_child_pid);
1451         latencyEndMonitor(latency);
1452         latencyAddSampleIfNeeded("rdb-unlink-temp-file",latency);
1453         /* SIGUSR1 is whitelisted, so we have a way to kill a child without
1454          * tirggering an error conditon. */
1455         if (bysignal != SIGUSR1)
1456             server.lastbgsave_status = C_ERR;
1457     }
1458     server.rdb_child_pid = -1;
1459     server.rdb_child_type = RDB_CHILD_TYPE_NONE;
1460     server.rdb_save_time_last = time(NULL)-server.rdb_save_time_start;
1461     server.rdb_save_time_start = -1;
1462     /* Possibly there are slaves waiting for a BGSAVE in order to be served
1463      * (the first stage of SYNC is a bulk transfer of dump.rdb) */
1464     updateSlavesWaitingBgsave((!bysignal && exitcode == 0) ? C_OK : C_ERR, RDB_CHILD_TYPE_DISK);
1465 }
1466 
1467 /* A background saving child (BGSAVE) terminated its work. Handle this.
1468  * This function covers the case of RDB -> Salves socket transfers for
1469  * diskless replication. */
backgroundSaveDoneHandlerSocket(int exitcode,int bysignal)1470 void backgroundSaveDoneHandlerSocket(int exitcode, int bysignal) {
1471     uint64_t *ok_slaves;
1472 
1473     if (!bysignal && exitcode == 0) {
1474         serverLog(LL_NOTICE,
1475             "Background RDB transfer terminated with success");
1476     } else if (!bysignal && exitcode != 0) {
1477         serverLog(LL_WARNING, "Background transfer error");
1478     } else {
1479         serverLog(LL_WARNING,
1480             "Background transfer terminated by signal %d", bysignal);
1481     }
1482     server.rdb_child_pid = -1;
1483     server.rdb_child_type = RDB_CHILD_TYPE_NONE;
1484     server.rdb_save_time_start = -1;
1485 
1486     /* If the child returns an OK exit code, read the set of slave client
1487      * IDs and the associated status code. We'll terminate all the slaves
1488      * in error state.
1489      *
1490      * If the process returned an error, consider the list of slaves that
1491      * can continue to be emtpy, so that it's just a special case of the
1492      * normal code path. */
1493     ok_slaves = zmalloc(sizeof(uint64_t)); /* Make space for the count. */
1494     ok_slaves[0] = 0;
1495     if (!bysignal && exitcode == 0) {
1496         int readlen = sizeof(uint64_t);
1497 
1498         if (read(server.rdb_pipe_read_result_from_child, ok_slaves, readlen) ==
1499                  readlen)
1500         {
1501             readlen = ok_slaves[0]*sizeof(uint64_t)*2;
1502 
1503             /* Make space for enough elements as specified by the first
1504              * uint64_t element in the array. */
1505             ok_slaves = zrealloc(ok_slaves,sizeof(uint64_t)+readlen);
1506             if (readlen &&
1507                 read(server.rdb_pipe_read_result_from_child, ok_slaves+1,
1508                      readlen) != readlen)
1509             {
1510                 ok_slaves[0] = 0;
1511             }
1512         }
1513     }
1514 
1515     close(server.rdb_pipe_read_result_from_child);
1516     close(server.rdb_pipe_write_result_to_parent);
1517 
1518     /* We can continue the replication process with all the slaves that
1519      * correctly received the full payload. Others are terminated. */
1520     listNode *ln;
1521     listIter li;
1522 
1523     listRewind(server.slaves,&li);
1524     while((ln = listNext(&li))) {
1525         client *slave = ln->value;
1526 
1527         if (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_END) {
1528             uint64_t j;
1529             int errorcode = 0;
1530 
1531             /* Search for the slave ID in the reply. In order for a slave to
1532              * continue the replication process, we need to find it in the list,
1533              * and it must have an error code set to 0 (which means success). */
1534             for (j = 0; j < ok_slaves[0]; j++) {
1535                 if (slave->id == ok_slaves[2*j+1]) {
1536                     errorcode = ok_slaves[2*j+2];
1537                     break; /* Found in slaves list. */
1538                 }
1539             }
1540             if (j == ok_slaves[0] || errorcode != 0) {
1541                 serverLog(LL_WARNING,
1542                 "Closing slave %s: child->slave RDB transfer failed: %s",
1543                     replicationGetSlaveName(slave),
1544                     (errorcode == 0) ? "RDB transfer child aborted"
1545                                      : strerror(errorcode));
1546                 freeClient(slave);
1547             } else {
1548                 serverLog(LL_WARNING,
1549                 "Slave %s correctly received the streamed RDB file.",
1550                     replicationGetSlaveName(slave));
1551                 /* Restore the socket as non-blocking. */
1552                 anetNonBlock(NULL,slave->fd);
1553                 anetSendTimeout(NULL,slave->fd,0);
1554             }
1555         }
1556     }
1557     zfree(ok_slaves);
1558 
1559     updateSlavesWaitingBgsave((!bysignal && exitcode == 0) ? C_OK : C_ERR, RDB_CHILD_TYPE_SOCKET);
1560 }
1561 
1562 /* When a background RDB saving/transfer terminates, call the right handler. */
backgroundSaveDoneHandler(int exitcode,int bysignal)1563 void backgroundSaveDoneHandler(int exitcode, int bysignal) {
1564     switch(server.rdb_child_type) {
1565     case RDB_CHILD_TYPE_DISK:
1566         backgroundSaveDoneHandlerDisk(exitcode,bysignal);
1567         break;
1568     case RDB_CHILD_TYPE_SOCKET:
1569         backgroundSaveDoneHandlerSocket(exitcode,bysignal);
1570         break;
1571     default:
1572         serverPanic("Unknown RDB child type.");
1573         break;
1574     }
1575 }
1576 
1577 /* Spawn an RDB child that writes the RDB to the sockets of the slaves
1578  * that are currently in SLAVE_STATE_WAIT_BGSAVE_START state. */
rdbSaveToSlavesSockets(void)1579 int rdbSaveToSlavesSockets(void) {
1580     int *fds;
1581     uint64_t *clientids;
1582     int numfds;
1583     listNode *ln;
1584     listIter li;
1585     pid_t childpid;
1586     long long start;
1587     int pipefds[2];
1588 
1589     if (server.aof_child_pid != -1 || server.rdb_child_pid != -1) return C_ERR;
1590 
1591     /* Before to fork, create a pipe that will be used in order to
1592      * send back to the parent the IDs of the slaves that successfully
1593      * received all the writes. */
1594     if (pipe(pipefds) == -1) return C_ERR;
1595     server.rdb_pipe_read_result_from_child = pipefds[0];
1596     server.rdb_pipe_write_result_to_parent = pipefds[1];
1597 
1598     /* Collect the file descriptors of the slaves we want to transfer
1599      * the RDB to, which are i WAIT_BGSAVE_START state. */
1600     fds = zmalloc(sizeof(int)*listLength(server.slaves));
1601     /* We also allocate an array of corresponding client IDs. This will
1602      * be useful for the child process in order to build the report
1603      * (sent via unix pipe) that will be sent to the parent. */
1604     clientids = zmalloc(sizeof(uint64_t)*listLength(server.slaves));
1605     numfds = 0;
1606 
1607     listRewind(server.slaves,&li);
1608     while((ln = listNext(&li))) {
1609         client *slave = ln->value;
1610 
1611         if (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_START) {
1612             clientids[numfds] = slave->id;
1613             fds[numfds++] = slave->fd;
1614             replicationSetupSlaveForFullResync(slave,getPsyncInitialOffset());
1615             /* Put the socket in blocking mode to simplify RDB transfer.
1616              * We'll restore it when the children returns (since duped socket
1617              * will share the O_NONBLOCK attribute with the parent). */
1618             anetBlock(NULL,slave->fd);
1619             anetSendTimeout(NULL,slave->fd,server.repl_timeout*1000);
1620         }
1621     }
1622 
1623     /* Create the child process. */
1624     start = ustime();
1625     if ((childpid = fork()) == 0) {
1626         /* Child */
1627         int retval;
1628         rio slave_sockets;
1629 
1630         rioInitWithFdset(&slave_sockets,fds,numfds);
1631         zfree(fds);
1632 
1633         closeListeningSockets(0);
1634         redisSetProcTitle("redis-rdb-to-slaves");
1635 
1636         retval = rdbSaveRioWithEOFMark(&slave_sockets,NULL);
1637         if (retval == C_OK && rioFlush(&slave_sockets) == 0)
1638             retval = C_ERR;
1639 
1640         if (retval == C_OK) {
1641             size_t private_dirty = zmalloc_get_private_dirty();
1642 
1643             if (private_dirty) {
1644                 serverLog(LL_NOTICE,
1645                     "RDB: %zu MB of memory used by copy-on-write",
1646                     private_dirty/(1024*1024));
1647             }
1648 
1649             /* If we are returning OK, at least one slave was served
1650              * with the RDB file as expected, so we need to send a report
1651              * to the parent via the pipe. The format of the message is:
1652              *
1653              * <len> <slave[0].id> <slave[0].error> ...
1654              *
1655              * len, slave IDs, and slave errors, are all uint64_t integers,
1656              * so basically the reply is composed of 64 bits for the len field
1657              * plus 2 additional 64 bit integers for each entry, for a total
1658              * of 'len' entries.
1659              *
1660              * The 'id' represents the slave's client ID, so that the master
1661              * can match the report with a specific slave, and 'error' is
1662              * set to 0 if the replication process terminated with a success
1663              * or the error code if an error occurred. */
1664             void *msg = zmalloc(sizeof(uint64_t)*(1+2*numfds));
1665             uint64_t *len = msg;
1666             uint64_t *ids = len+1;
1667             int j, msglen;
1668 
1669             *len = numfds;
1670             for (j = 0; j < numfds; j++) {
1671                 *ids++ = clientids[j];
1672                 *ids++ = slave_sockets.io.fdset.state[j];
1673             }
1674 
1675             /* Write the message to the parent. If we have no good slaves or
1676              * we are unable to transfer the message to the parent, we exit
1677              * with an error so that the parent will abort the replication
1678              * process with all the childre that were waiting. */
1679             msglen = sizeof(uint64_t)*(1+2*numfds);
1680             if (*len == 0 ||
1681                 write(server.rdb_pipe_write_result_to_parent,msg,msglen)
1682                 != msglen)
1683             {
1684                 retval = C_ERR;
1685             }
1686             zfree(msg);
1687         }
1688         zfree(clientids);
1689         rioFreeFdset(&slave_sockets);
1690         exitFromChild((retval == C_OK) ? 0 : 1);
1691     } else {
1692         /* Parent */
1693         server.stat_fork_time = ustime()-start;
1694         server.stat_fork_rate = (double) zmalloc_used_memory() * 1000000 / server.stat_fork_time / (1024*1024*1024); /* GB per second. */
1695         latencyAddSampleIfNeeded("fork",server.stat_fork_time/1000);
1696         if (childpid == -1) {
1697             serverLog(LL_WARNING,"Can't save in background: fork: %s",
1698                 strerror(errno));
1699 
1700             /* Undo the state change. The caller will perform cleanup on
1701              * all the slaves in BGSAVE_START state, but an early call to
1702              * replicationSetupSlaveForFullResync() turned it into BGSAVE_END */
1703             listRewind(server.slaves,&li);
1704             while((ln = listNext(&li))) {
1705                 client *slave = ln->value;
1706                 int j;
1707 
1708                 for (j = 0; j < numfds; j++) {
1709                     if (slave->id == clientids[j]) {
1710                         slave->replstate = SLAVE_STATE_WAIT_BGSAVE_START;
1711                         break;
1712                     }
1713                 }
1714             }
1715             close(pipefds[0]);
1716             close(pipefds[1]);
1717         } else {
1718             serverLog(LL_NOTICE,"Background RDB transfer started by pid %d",
1719                 childpid);
1720             server.rdb_save_time_start = time(NULL);
1721             server.rdb_child_pid = childpid;
1722             server.rdb_child_type = RDB_CHILD_TYPE_SOCKET;
1723             updateDictResizePolicy();
1724         }
1725         zfree(clientids);
1726         zfree(fds);
1727         return (childpid == -1) ? C_ERR : C_OK;
1728     }
1729     return C_OK; /* Unreached. */
1730 }
1731 
/* SAVE
 *
 * Performs an RDB save of server.rdb_filename via rdbSave() and replies
 * with the shared OK object when it returns C_OK, or with the shared
 * error object otherwise. The command is refused with an error reply
 * while an RDB child is registered in server.rdb_child_pid. */
void saveCommand(client *c) {
    int saved;

    if (server.rdb_child_pid != -1) {
        addReplyError(c,"Background save already in progress");
        return;
    }

    saved = (rdbSave(server.rdb_filename) == C_OK);
    addReply(c, saved ? shared.ok : shared.err);
}
1743 
1744 /* BGSAVE [SCHEDULE] */
bgsaveCommand(client * c)1745 void bgsaveCommand(client *c) {
1746     int schedule = 0;
1747 
1748     /* The SCHEDULE option changes the behavior of BGSAVE when an AOF rewrite
1749      * is in progress. Instead of returning an error a BGSAVE gets scheduled. */
1750     if (c->argc > 1) {
1751         if (c->argc == 2 && !strcasecmp(c->argv[1]->ptr,"schedule")) {
1752             schedule = 1;
1753         } else {
1754             addReply(c,shared.syntaxerr);
1755             return;
1756         }
1757     }
1758 
1759     if (server.rdb_child_pid != -1) {
1760         addReplyError(c,"Background save already in progress");
1761     } else if (server.aof_child_pid != -1) {
1762         if (schedule) {
1763             server.rdb_bgsave_scheduled = 1;
1764             addReplyStatus(c,"Background saving scheduled");
1765         } else {
1766             addReplyError(c,
1767                 "An AOF log rewriting in progress: can't BGSAVE right now. "
1768                 "Use BGSAVE SCHEDULE in order to schedule a BGSAVE whenver "
1769                 "possible.");
1770         }
1771     } else if (rdbSaveBackground(server.rdb_filename) == C_OK) {
1772         addReplyStatus(c,"Background saving started");
1773     } else {
1774         addReply(c,shared.err);
1775     }
1776 }
1777