xref: /f-stack/app/redis-5.0.5/src/rdb.c (revision 572c4311)
1 /*
2  * Copyright (c) 2009-2012, Salvatore Sanfilippo <antirez at gmail dot com>
3  * All rights reserved.
4  *
5  * Redistribution and use in source and binary forms, with or without
6  * modification, are permitted provided that the following conditions are met:
7  *
8  *   * Redistributions of source code must retain the above copyright notice,
9  *     this list of conditions and the following disclaimer.
10  *   * Redistributions in binary form must reproduce the above copyright
11  *     notice, this list of conditions and the following disclaimer in the
12  *     documentation and/or other materials provided with the distribution.
13  *   * Neither the name of Redis nor the names of its contributors may be used
14  *     to endorse or promote products derived from this software without
15  *     specific prior written permission.
16  *
17  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
18  * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
19  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
20  * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
21  * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
22  * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
23  * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
24  * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
25  * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
26  * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27  * POSSIBILITY OF SUCH DAMAGE.
28  */
29 
30 #include "server.h"
31 #include "lzf.h"    /* LZF compression library */
32 #include "zipmap.h"
33 #include "endianconv.h"
34 #include "stream.h"
35 
36 #include <math.h>
37 #include <sys/types.h>
38 #include <sys/time.h>
39 #include <sys/resource.h>
40 #include <sys/wait.h>
41 #include <arpa/inet.h>
42 #include <sys/stat.h>
43 #include <sys/param.h>
44 
45 #define rdbExitReportCorruptRDB(...) rdbCheckThenExit(__LINE__,__VA_ARGS__)
46 
47 extern int rdbCheckMode;
48 void rdbCheckError(const char *fmt, ...);
49 void rdbCheckSetError(const char *fmt, ...);
50 
rdbCheckThenExit(int linenum,char * reason,...)51 void rdbCheckThenExit(int linenum, char *reason, ...) {
52     va_list ap;
53     char msg[1024];
54     int len;
55 
56     len = snprintf(msg,sizeof(msg),
57         "Internal error in RDB reading function at rdb.c:%d -> ", linenum);
58     va_start(ap,reason);
59     vsnprintf(msg+len,sizeof(msg)-len,reason,ap);
60     va_end(ap);
61 
62     if (!rdbCheckMode) {
63         serverLog(LL_WARNING, "%s", msg);
64         char *argv[2] = {"",server.rdb_filename};
65         redis_check_rdb_main(2,argv,NULL);
66     } else {
67         rdbCheckError("%s",msg);
68     }
69     exit(1);
70 }
71 
/* Write 'len' bytes from 'p' to 'rdb'.
 *
 * When 'rdb' is NULL nothing is written, but 'len' is still returned.
 *
 * Returns -1 if rioWrite() fails, otherwise 'len'. The result is an ssize_t
 * rather than an int: with an int, lengths above INT_MAX were truncated, and
 * a length such as 0xFFFFFFFF converted to -1 and was misreported as a
 * write error by callers that test for == -1. */
static ssize_t rdbWriteRaw(rio *rdb, void *p, size_t len) {
    if (rdb && rioWrite(rdb,p,len) == 0)
        return -1;
    return (ssize_t)len;
}
77 
78 /* This is just a wrapper for the low level function rioRead() that will
79  * automatically abort if it is not possible to read the specified amount
80  * of bytes. */
rdbLoadRaw(rio * rdb,void * buf,uint64_t len)81 void rdbLoadRaw(rio *rdb, void *buf, uint64_t len) {
82     if (rioRead(rdb,buf,len) == 0) {
83         rdbExitReportCorruptRDB(
84             "Impossible to read %llu bytes in rdbLoadRaw()",
85             (unsigned long long) len);
86         return; /* Not reached. */
87     }
88 }
89 
/* Write a one byte RDB type/opcode.
 * Returns -1 on write error, otherwise the number of bytes written (1). */
int rdbSaveType(rio *rdb, unsigned char type) {
    unsigned char byte = type;
    return rdbWriteRaw(rdb,&byte,sizeof(byte));
}
93 
94 /* Load a "type" in RDB format, that is a one byte unsigned integer.
95  * This function is not only used to load object types, but also special
96  * "types" like the end-of-file type, the EXPIRE type, and so forth. */
rdbLoadType(rio * rdb)97 int rdbLoadType(rio *rdb) {
98     unsigned char type;
99     if (rioRead(rdb,&type,1) == 0) return -1;
100     return type;
101 }
102 
103 /* This is only used to load old databases stored with the RDB_OPCODE_EXPIRETIME
104  * opcode. New versions of Redis store using the RDB_OPCODE_EXPIRETIME_MS
105  * opcode. */
rdbLoadTime(rio * rdb)106 time_t rdbLoadTime(rio *rdb) {
107     int32_t t32;
108     rdbLoadRaw(rdb,&t32,4);
109     return (time_t)t32;
110 }
111 
/* Write a millisecond timestamp as a 64 bit little endian integer.
 * Returns -1 on write error, otherwise the number of bytes written. */
int rdbSaveMillisecondTime(rio *rdb, long long t) {
    int64_t le = (int64_t) t;
    memrev64ifbe(&le); /* Store in little endian. */
    return rdbWriteRaw(rdb,&le,sizeof(le));
}
117 
118 /* This function loads a time from the RDB file. It gets the version of the
119  * RDB because, unfortunately, before Redis 5 (RDB version 9), the function
120  * failed to convert data to/from little endian, so RDB files with keys having
121  * expires could not be shared between big endian and little endian systems
122  * (because the expire time will be totally wrong). The fix for this is just
123  * to call memrev64ifbe(), however if we fix this for all the RDB versions,
124  * this call will introduce an incompatibility for big endian systems:
125  * after upgrading to Redis version 5 they will no longer be able to load their
126  * own old RDB files. Because of that, we instead fix the function only for new
127  * RDB versions, and load older RDB versions as we used to do in the past,
128  * allowing big endian systems to load their own old RDB files. */
rdbLoadMillisecondTime(rio * rdb,int rdbver)129 long long rdbLoadMillisecondTime(rio *rdb, int rdbver) {
130     int64_t t64;
131     rdbLoadRaw(rdb,&t64,8);
132     if (rdbver >= 9) /* Check the top comment of this function. */
133         memrev64ifbe(&t64); /* Convert in big endian if the system is BE. */
134     return (long long)t64;
135 }
136 
137 /* Saves an encoded length. The first two bits in the first byte are used to
138  * hold the encoding type. See the RDB_* definitions for more information
139  * on the types of encoding. */
rdbSaveLen(rio * rdb,uint64_t len)140 int rdbSaveLen(rio *rdb, uint64_t len) {
141     unsigned char buf[2];
142     size_t nwritten;
143 
144     if (len < (1<<6)) {
145         /* Save a 6 bit len */
146         buf[0] = (len&0xFF)|(RDB_6BITLEN<<6);
147         if (rdbWriteRaw(rdb,buf,1) == -1) return -1;
148         nwritten = 1;
149     } else if (len < (1<<14)) {
150         /* Save a 14 bit len */
151         buf[0] = ((len>>8)&0xFF)|(RDB_14BITLEN<<6);
152         buf[1] = len&0xFF;
153         if (rdbWriteRaw(rdb,buf,2) == -1) return -1;
154         nwritten = 2;
155     } else if (len <= UINT32_MAX) {
156         /* Save a 32 bit len */
157         buf[0] = RDB_32BITLEN;
158         if (rdbWriteRaw(rdb,buf,1) == -1) return -1;
159         uint32_t len32 = htonl(len);
160         if (rdbWriteRaw(rdb,&len32,4) == -1) return -1;
161         nwritten = 1+4;
162     } else {
163         /* Save a 64 bit len */
164         buf[0] = RDB_64BITLEN;
165         if (rdbWriteRaw(rdb,buf,1) == -1) return -1;
166         len = htonu64(len);
167         if (rdbWriteRaw(rdb,&len,8) == -1) return -1;
168         nwritten = 1+8;
169     }
170     return nwritten;
171 }
172 
173 
174 /* Load an encoded length. If the loaded length is a normal length as stored
175  * with rdbSaveLen(), the read length is set to '*lenptr'. If instead the
176  * loaded length describes a special encoding that follows, then '*isencoded'
177  * is set to 1 and the encoding format is stored at '*lenptr'.
178  *
179  * See the RDB_ENC_* definitions in rdb.h for more information on special
180  * encodings.
181  *
182  * The function returns -1 on error, 0 on success. */
rdbLoadLenByRef(rio * rdb,int * isencoded,uint64_t * lenptr)183 int rdbLoadLenByRef(rio *rdb, int *isencoded, uint64_t *lenptr) {
184     unsigned char buf[2];
185     int type;
186 
187     if (isencoded) *isencoded = 0;
188     if (rioRead(rdb,buf,1) == 0) return -1;
189     type = (buf[0]&0xC0)>>6;
190     if (type == RDB_ENCVAL) {
191         /* Read a 6 bit encoding type. */
192         if (isencoded) *isencoded = 1;
193         *lenptr = buf[0]&0x3F;
194     } else if (type == RDB_6BITLEN) {
195         /* Read a 6 bit len. */
196         *lenptr = buf[0]&0x3F;
197     } else if (type == RDB_14BITLEN) {
198         /* Read a 14 bit len. */
199         if (rioRead(rdb,buf+1,1) == 0) return -1;
200         *lenptr = ((buf[0]&0x3F)<<8)|buf[1];
201     } else if (buf[0] == RDB_32BITLEN) {
202         /* Read a 32 bit len. */
203         uint32_t len;
204         if (rioRead(rdb,&len,4) == 0) return -1;
205         *lenptr = ntohl(len);
206     } else if (buf[0] == RDB_64BITLEN) {
207         /* Read a 64 bit len. */
208         uint64_t len;
209         if (rioRead(rdb,&len,8) == 0) return -1;
210         *lenptr = ntohu64(len);
211     } else {
212         rdbExitReportCorruptRDB(
213             "Unknown length encoding %d in rdbLoadLen()",type);
214         return -1; /* Never reached. */
215     }
216     return 0;
217 }
218 
219 /* This is like rdbLoadLenByRef() but directly returns the value read
220  * from the RDB stream, signaling an error by returning RDB_LENERR
221  * (since it is a too large count to be applicable in any Redis data
222  * structure). */
rdbLoadLen(rio * rdb,int * isencoded)223 uint64_t rdbLoadLen(rio *rdb, int *isencoded) {
224     uint64_t len;
225 
226     if (rdbLoadLenByRef(rdb,isencoded,&len) == -1) return RDB_LENERR;
227     return len;
228 }
229 
230 /* Encodes the "value" argument as integer when it fits in the supported ranges
231  * for encoded types. If the function successfully encodes the integer, the
232  * representation is stored in the buffer pointer to by "enc" and the string
233  * length is returned. Otherwise 0 is returned. */
rdbEncodeInteger(long long value,unsigned char * enc)234 int rdbEncodeInteger(long long value, unsigned char *enc) {
235     if (value >= -(1<<7) && value <= (1<<7)-1) {
236         enc[0] = (RDB_ENCVAL<<6)|RDB_ENC_INT8;
237         enc[1] = value&0xFF;
238         return 2;
239     } else if (value >= -(1<<15) && value <= (1<<15)-1) {
240         enc[0] = (RDB_ENCVAL<<6)|RDB_ENC_INT16;
241         enc[1] = value&0xFF;
242         enc[2] = (value>>8)&0xFF;
243         return 3;
244     } else if (value >= -((long long)1<<31) && value <= ((long long)1<<31)-1) {
245         enc[0] = (RDB_ENCVAL<<6)|RDB_ENC_INT32;
246         enc[1] = value&0xFF;
247         enc[2] = (value>>8)&0xFF;
248         enc[3] = (value>>16)&0xFF;
249         enc[4] = (value>>24)&0xFF;
250         return 5;
251     } else {
252         return 0;
253     }
254 }
255 
256 /* Loads an integer-encoded object with the specified encoding type "enctype".
257  * The returned value changes according to the flags, see
258  * rdbGenerincLoadStringObject() for more info. */
rdbLoadIntegerObject(rio * rdb,int enctype,int flags,size_t * lenptr)259 void *rdbLoadIntegerObject(rio *rdb, int enctype, int flags, size_t *lenptr) {
260     int plain = flags & RDB_LOAD_PLAIN;
261     int sds = flags & RDB_LOAD_SDS;
262     int encode = flags & RDB_LOAD_ENC;
263     unsigned char enc[4];
264     long long val;
265 
266     if (enctype == RDB_ENC_INT8) {
267         if (rioRead(rdb,enc,1) == 0) return NULL;
268         val = (signed char)enc[0];
269     } else if (enctype == RDB_ENC_INT16) {
270         uint16_t v;
271         if (rioRead(rdb,enc,2) == 0) return NULL;
272         v = enc[0]|(enc[1]<<8);
273         val = (int16_t)v;
274     } else if (enctype == RDB_ENC_INT32) {
275         uint32_t v;
276         if (rioRead(rdb,enc,4) == 0) return NULL;
277         v = enc[0]|(enc[1]<<8)|(enc[2]<<16)|(enc[3]<<24);
278         val = (int32_t)v;
279     } else {
280         val = 0; /* anti-warning */
281         rdbExitReportCorruptRDB("Unknown RDB integer encoding type %d",enctype);
282     }
283     if (plain || sds) {
284         char buf[LONG_STR_SIZE], *p;
285         int len = ll2string(buf,sizeof(buf),val);
286         if (lenptr) *lenptr = len;
287         p = plain ? zmalloc(len) : sdsnewlen(SDS_NOINIT,len);
288         memcpy(p,buf,len);
289         return p;
290     } else if (encode) {
291         return createStringObjectFromLongLongForValue(val);
292     } else {
293         return createObject(OBJ_STRING,sdsfromlonglong(val));
294     }
295 }
296 
/* Strings such as "2391" or "-100" (no spaces, canonical decimal form) whose
 * value fits in an 8, 16 or 32 bit signed integer can be stored as integers
 * to save space.
 *
 * 's' is NOT required to be NUL-terminated: callers pass rax keys and raw
 * intset/ziplist blobs together with an explicit 'len'. The digits are
 * therefore copied into a bounded, NUL-terminated buffer before strtoll().
 * Parsing 's' directly read past 'len' looking for a terminator.
 *
 * Returns the encoded length written to 'enc' (see rdbEncodeInteger()), or 0
 * if the string cannot be integer-encoded. */
int rdbTryIntegerEncoding(char *s, size_t len, unsigned char *enc) {
    long long value;
    char *endptr, buf[32], digits[32];

    /* The longest long long representation is 20 characters, so anything
     * this long can never round-trip below; reject it without parsing. */
    if (len >= sizeof(digits)) return 0;
    memcpy(digits,s,len);
    digits[len] = '\0';

    /* Check if it's possible to encode this value as a number */
    value = strtoll(digits, &endptr, 10);
    if (endptr[0] != '\0') return 0;
    ll2string(buf,sizeof(buf),value);

    /* If the number converted back into a string is not identical
     * then it's not possible to encode the string as integer */
    if (strlen(buf) != len || memcmp(buf,s,len)) return 0;

    return rdbEncodeInteger(value,enc);
}
315 
/* Write an already LZF-compressed payload as:
 *
 *   [RDB_ENC_LZF type byte][compressed len][original len][compressed data]
 *
 * Returns the total number of bytes written, or -1 on write error. */
ssize_t rdbSaveLzfBlob(rio *rdb, void *data, size_t compress_len,
                       size_t original_len) {
    unsigned char marker = (RDB_ENCVAL<<6)|RDB_ENC_LZF;
    ssize_t total = 0, written;

    written = rdbWriteRaw(rdb,&marker,1);
    if (written == -1) return -1;
    total += written;

    written = rdbSaveLen(rdb,compress_len);
    if (written == -1) return -1;
    total += written;

    written = rdbSaveLen(rdb,original_len);
    if (written == -1) return -1;
    total += written;

    written = rdbWriteRaw(rdb,data,compress_len);
    if (written == -1) return -1;
    return total + written;
}
340 
/* Try to LZF-compress the 'len' bytes at 's' and write them with
 * rdbSaveLzfBlob(). Compression is only accepted if it saves at least four
 * bytes, so inputs of 4 bytes or less are never compressed.
 *
 * Returns the number of bytes written, 0 if the data was not compressed
 * (the caller should store it verbatim), or -1 on write error. */
ssize_t rdbSaveLzfStringObject(rio *rdb, unsigned char *s, size_t len) {
    if (len <= 4) return 0;

    size_t maxout = len-4;
    void *out = zmalloc(maxout+1);
    if (out == NULL) return 0;

    size_t comprlen = lzf_compress(s, len, out, maxout);
    ssize_t nwritten = (comprlen == 0) ? 0 :
                       rdbSaveLzfBlob(rdb, out, comprlen, len);
    zfree(out);
    return nwritten;
}
358 
359 /* Load an LZF compressed string in RDB format. The returned value
360  * changes according to 'flags'. For more info check the
361  * rdbGenericLoadStringObject() function. */
rdbLoadLzfStringObject(rio * rdb,int flags,size_t * lenptr)362 void *rdbLoadLzfStringObject(rio *rdb, int flags, size_t *lenptr) {
363     int plain = flags & RDB_LOAD_PLAIN;
364     int sds = flags & RDB_LOAD_SDS;
365     uint64_t len, clen;
366     unsigned char *c = NULL;
367     char *val = NULL;
368 
369     if ((clen = rdbLoadLen(rdb,NULL)) == RDB_LENERR) return NULL;
370     if ((len = rdbLoadLen(rdb,NULL)) == RDB_LENERR) return NULL;
371     if ((c = zmalloc(clen)) == NULL) goto err;
372 
373     /* Allocate our target according to the uncompressed size. */
374     if (plain) {
375         val = zmalloc(len);
376     } else {
377         val = sdsnewlen(SDS_NOINIT,len);
378     }
379     if (lenptr) *lenptr = len;
380 
381     /* Load the compressed representation and uncompress it to target. */
382     if (rioRead(rdb,c,clen) == 0) goto err;
383     if (lzf_decompress(c,clen,val,len) == 0) {
384         if (rdbCheckMode) rdbCheckSetError("Invalid LZF compressed string");
385         goto err;
386     }
387     zfree(c);
388 
389     if (plain || sds) {
390         return val;
391     } else {
392         return createObject(OBJ_STRING,val);
393     }
394 err:
395     zfree(c);
396     if (plain)
397         zfree(val);
398     else
399         sdsfree(val);
400     return NULL;
401 }
402 
403 /* Save a string object as [len][data] on disk. If the object is a string
404  * representation of an integer value we try to save it in a special form */
rdbSaveRawString(rio * rdb,unsigned char * s,size_t len)405 ssize_t rdbSaveRawString(rio *rdb, unsigned char *s, size_t len) {
406     int enclen;
407     ssize_t n, nwritten = 0;
408 
409     /* Try integer encoding */
410     if (len <= 11) {
411         unsigned char buf[5];
412         if ((enclen = rdbTryIntegerEncoding((char*)s,len,buf)) > 0) {
413             if (rdbWriteRaw(rdb,buf,enclen) == -1) return -1;
414             return enclen;
415         }
416     }
417 
418     /* Try LZF compression - under 20 bytes it's unable to compress even
419      * aaaaaaaaaaaaaaaaaa so skip it */
420     if (server.rdb_compression && len > 20) {
421         n = rdbSaveLzfStringObject(rdb,s,len);
422         if (n == -1) return -1;
423         if (n > 0) return n;
424         /* Return value of 0 means data can't be compressed, save the old way */
425     }
426 
427     /* Store verbatim */
428     if ((n = rdbSaveLen(rdb,len)) == -1) return -1;
429     nwritten += n;
430     if (len > 0) {
431         if (rdbWriteRaw(rdb,s,len) == -1) return -1;
432         nwritten += len;
433     }
434     return nwritten;
435 }
436 
437 /* Save a long long value as either an encoded string or a string. */
rdbSaveLongLongAsStringObject(rio * rdb,long long value)438 ssize_t rdbSaveLongLongAsStringObject(rio *rdb, long long value) {
439     unsigned char buf[32];
440     ssize_t n, nwritten = 0;
441     int enclen = rdbEncodeInteger(value,buf);
442     if (enclen > 0) {
443         return rdbWriteRaw(rdb,buf,enclen);
444     } else {
445         /* Encode as string */
446         enclen = ll2string((char*)buf,32,value);
447         serverAssert(enclen < 32);
448         if ((n = rdbSaveLen(rdb,enclen)) == -1) return -1;
449         nwritten += n;
450         if ((n = rdbWriteRaw(rdb,buf,enclen)) == -1) return -1;
451         nwritten += n;
452     }
453     return nwritten;
454 }
455 
456 /* Like rdbSaveRawString() gets a Redis object instead. */
rdbSaveStringObject(rio * rdb,robj * obj)457 ssize_t rdbSaveStringObject(rio *rdb, robj *obj) {
458     /* Avoid to decode the object, then encode it again, if the
459      * object is already integer encoded. */
460     if (obj->encoding == OBJ_ENCODING_INT) {
461         return rdbSaveLongLongAsStringObject(rdb,(long)obj->ptr);
462     } else {
463         serverAssertWithInfo(NULL,obj,sdsEncodedObject(obj));
464         return rdbSaveRawString(rdb,obj->ptr,sdslen(obj->ptr));
465     }
466 }
467 
468 /* Load a string object from an RDB file according to flags:
469  *
470  * RDB_LOAD_NONE (no flags): load an RDB object, unencoded.
471  * RDB_LOAD_ENC: If the returned type is a Redis object, try to
472  *               encode it in a special way to be more memory
473  *               efficient. When this flag is passed the function
474  *               no longer guarantees that obj->ptr is an SDS string.
475  * RDB_LOAD_PLAIN: Return a plain string allocated with zmalloc()
476  *                 instead of a Redis object with an sds in it.
477  * RDB_LOAD_SDS: Return an SDS string instead of a Redis object.
478  *
479  * On I/O error NULL is returned.
480  */
rdbGenericLoadStringObject(rio * rdb,int flags,size_t * lenptr)481 void *rdbGenericLoadStringObject(rio *rdb, int flags, size_t *lenptr) {
482     int encode = flags & RDB_LOAD_ENC;
483     int plain = flags & RDB_LOAD_PLAIN;
484     int sds = flags & RDB_LOAD_SDS;
485     int isencoded;
486     uint64_t len;
487 
488     len = rdbLoadLen(rdb,&isencoded);
489     if (isencoded) {
490         switch(len) {
491         case RDB_ENC_INT8:
492         case RDB_ENC_INT16:
493         case RDB_ENC_INT32:
494             return rdbLoadIntegerObject(rdb,len,flags,lenptr);
495         case RDB_ENC_LZF:
496             return rdbLoadLzfStringObject(rdb,flags,lenptr);
497         default:
498             rdbExitReportCorruptRDB("Unknown RDB string encoding type %d",len);
499         }
500     }
501 
502     if (len == RDB_LENERR) return NULL;
503     if (plain || sds) {
504         void *buf = plain ? zmalloc(len) : sdsnewlen(SDS_NOINIT,len);
505         if (lenptr) *lenptr = len;
506         if (len && rioRead(rdb,buf,len) == 0) {
507             if (plain)
508                 zfree(buf);
509             else
510                 sdsfree(buf);
511             return NULL;
512         }
513         return buf;
514     } else {
515         robj *o = encode ? createStringObject(SDS_NOINIT,len) :
516                            createRawStringObject(SDS_NOINIT,len);
517         if (len && rioRead(rdb,o->ptr,len) == 0) {
518             decrRefCount(o);
519             return NULL;
520         }
521         return o;
522     }
523 }
524 
rdbLoadStringObject(rio * rdb)525 robj *rdbLoadStringObject(rio *rdb) {
526     return rdbGenericLoadStringObject(rdb,RDB_LOAD_NONE,NULL);
527 }
528 
rdbLoadEncodedStringObject(rio * rdb)529 robj *rdbLoadEncodedStringObject(rio *rdb) {
530     return rdbGenericLoadStringObject(rdb,RDB_LOAD_ENC,NULL);
531 }
532 
533 /* Save a double value. Doubles are saved as strings prefixed by an unsigned
534  * 8 bit integer specifying the length of the representation.
535  * This 8 bit integer has special values in order to specify the following
536  * conditions:
537  * 253: not a number
538  * 254: + inf
539  * 255: - inf
540  */
rdbSaveDoubleValue(rio * rdb,double val)541 int rdbSaveDoubleValue(rio *rdb, double val) {
542     unsigned char buf[128];
543     int len;
544 
545     if (isnan(val)) {
546         buf[0] = 253;
547         len = 1;
548     } else if (!isfinite(val)) {
549         len = 1;
550         buf[0] = (val < 0) ? 255 : 254;
551     } else {
552 #if (DBL_MANT_DIG >= 52) && (LLONG_MAX == 0x7fffffffffffffffLL)
553         /* Check if the float is in a safe range to be casted into a
554          * long long. We are assuming that long long is 64 bit here.
555          * Also we are assuming that there are no implementations around where
556          * double has precision < 52 bit.
557          *
558          * Under this assumptions we test if a double is inside an interval
559          * where casting to long long is safe. Then using two castings we
560          * make sure the decimal part is zero. If all this is true we use
561          * integer printing function that is much faster. */
562         double min = -4503599627370495; /* (2^52)-1 */
563         double max = 4503599627370496; /* -(2^52) */
564         if (val > min && val < max && val == ((double)((long long)val)))
565             ll2string((char*)buf+1,sizeof(buf)-1,(long long)val);
566         else
567 #endif
568             snprintf((char*)buf+1,sizeof(buf)-1,"%.17g",val);
569         buf[0] = strlen((char*)buf+1);
570         len = buf[0]+1;
571     }
572     return rdbWriteRaw(rdb,buf,len);
573 }
574 
575 /* For information about double serialization check rdbSaveDoubleValue() */
rdbLoadDoubleValue(rio * rdb,double * val)576 int rdbLoadDoubleValue(rio *rdb, double *val) {
577     char buf[256];
578     unsigned char len;
579 
580     if (rioRead(rdb,&len,1) == 0) return -1;
581     switch(len) {
582     case 255: *val = R_NegInf; return 0;
583     case 254: *val = R_PosInf; return 0;
584     case 253: *val = R_Nan; return 0;
585     default:
586         if (rioRead(rdb,buf,len) == 0) return -1;
587         buf[len] = '\0';
588         sscanf(buf, "%lg", val);
589         return 0;
590     }
591 }
592 
593 /* Saves a double for RDB 8 or greater, where IE754 binary64 format is assumed.
594  * We just make sure the integer is always stored in little endian, otherwise
595  * the value is copied verbatim from memory to disk.
596  *
597  * Return -1 on error, the size of the serialized value on success. */
rdbSaveBinaryDoubleValue(rio * rdb,double val)598 int rdbSaveBinaryDoubleValue(rio *rdb, double val) {
599     memrev64ifbe(&val);
600     return rdbWriteRaw(rdb,&val,sizeof(val));
601 }
602 
603 /* Loads a double from RDB 8 or greater. See rdbSaveBinaryDoubleValue() for
604  * more info. On error -1 is returned, otherwise 0. */
rdbLoadBinaryDoubleValue(rio * rdb,double * val)605 int rdbLoadBinaryDoubleValue(rio *rdb, double *val) {
606     if (rioRead(rdb,val,sizeof(*val)) == 0) return -1;
607     memrev64ifbe(val);
608     return 0;
609 }
610 
611 /* Like rdbSaveBinaryDoubleValue() but single precision. */
rdbSaveBinaryFloatValue(rio * rdb,float val)612 int rdbSaveBinaryFloatValue(rio *rdb, float val) {
613     memrev32ifbe(&val);
614     return rdbWriteRaw(rdb,&val,sizeof(val));
615 }
616 
617 /* Like rdbLoadBinaryDoubleValue() but single precision. */
rdbLoadBinaryFloatValue(rio * rdb,float * val)618 int rdbLoadBinaryFloatValue(rio *rdb, float *val) {
619     if (rioRead(rdb,val,sizeof(*val)) == 0) return -1;
620     memrev32ifbe(val);
621     return 0;
622 }
623 
624 /* Save the object type of object "o". */
rdbSaveObjectType(rio * rdb,robj * o)625 int rdbSaveObjectType(rio *rdb, robj *o) {
626     switch (o->type) {
627     case OBJ_STRING:
628         return rdbSaveType(rdb,RDB_TYPE_STRING);
629     case OBJ_LIST:
630         if (o->encoding == OBJ_ENCODING_QUICKLIST)
631             return rdbSaveType(rdb,RDB_TYPE_LIST_QUICKLIST);
632         else
633             serverPanic("Unknown list encoding");
634     case OBJ_SET:
635         if (o->encoding == OBJ_ENCODING_INTSET)
636             return rdbSaveType(rdb,RDB_TYPE_SET_INTSET);
637         else if (o->encoding == OBJ_ENCODING_HT)
638             return rdbSaveType(rdb,RDB_TYPE_SET);
639         else
640             serverPanic("Unknown set encoding");
641     case OBJ_ZSET:
642         if (o->encoding == OBJ_ENCODING_ZIPLIST)
643             return rdbSaveType(rdb,RDB_TYPE_ZSET_ZIPLIST);
644         else if (o->encoding == OBJ_ENCODING_SKIPLIST)
645             return rdbSaveType(rdb,RDB_TYPE_ZSET_2);
646         else
647             serverPanic("Unknown sorted set encoding");
648     case OBJ_HASH:
649         if (o->encoding == OBJ_ENCODING_ZIPLIST)
650             return rdbSaveType(rdb,RDB_TYPE_HASH_ZIPLIST);
651         else if (o->encoding == OBJ_ENCODING_HT)
652             return rdbSaveType(rdb,RDB_TYPE_HASH);
653         else
654             serverPanic("Unknown hash encoding");
655     case OBJ_STREAM:
656         return rdbSaveType(rdb,RDB_TYPE_STREAM_LISTPACKS);
657     case OBJ_MODULE:
658         return rdbSaveType(rdb,RDB_TYPE_MODULE_2);
659     default:
660         serverPanic("Unknown object type");
661     }
662     return -1; /* avoid warning */
663 }
664 
665 /* Use rdbLoadType() to load a TYPE in RDB format, but returns -1 if the
666  * type is not specifically a valid Object Type. */
rdbLoadObjectType(rio * rdb)667 int rdbLoadObjectType(rio *rdb) {
668     int type;
669     if ((type = rdbLoadType(rdb)) == -1) return -1;
670     if (!rdbIsObjectType(type)) return -1;
671     return type;
672 }
673 
674 /* This helper function serializes a consumer group Pending Entries List (PEL)
675  * into the RDB file. The 'nacks' argument tells the function if also persist
676  * the informations about the not acknowledged message, or if to persist
677  * just the IDs: this is useful because for the global consumer group PEL
678  * we serialized the NACKs as well, but when serializing the local consumer
679  * PELs we just add the ID, that will be resolved inside the global PEL to
680  * put a reference to the same structure. */
rdbSaveStreamPEL(rio * rdb,rax * pel,int nacks)681 ssize_t rdbSaveStreamPEL(rio *rdb, rax *pel, int nacks) {
682     ssize_t n, nwritten = 0;
683 
684     /* Number of entries in the PEL. */
685     if ((n = rdbSaveLen(rdb,raxSize(pel))) == -1) return -1;
686     nwritten += n;
687 
688     /* Save each entry. */
689     raxIterator ri;
690     raxStart(&ri,pel);
691     raxSeek(&ri,"^",NULL,0);
692     while(raxNext(&ri)) {
693         /* We store IDs in raw form as 128 big big endian numbers, like
694          * they are inside the radix tree key. */
695         if ((n = rdbWriteRaw(rdb,ri.key,sizeof(streamID))) == -1) return -1;
696         nwritten += n;
697 
698         if (nacks) {
699             streamNACK *nack = ri.data;
700             if ((n = rdbSaveMillisecondTime(rdb,nack->delivery_time)) == -1)
701                 return -1;
702             nwritten += n;
703             if ((n = rdbSaveLen(rdb,nack->delivery_count)) == -1) return -1;
704             nwritten += n;
705             /* We don't save the consumer name: we'll save the pending IDs
706              * for each consumer in the consumer PEL, and resolve the consumer
707              * at loading time. */
708         }
709     }
710     raxStop(&ri);
711     return nwritten;
712 }
713 
714 /* Serialize the consumers of a stream consumer group into the RDB. Helper
715  * function for the stream data type serialization. What we do here is to
716  * persist the consumer metadata, and it's PEL, for each consumer. */
rdbSaveStreamConsumers(rio * rdb,streamCG * cg)717 size_t rdbSaveStreamConsumers(rio *rdb, streamCG *cg) {
718     ssize_t n, nwritten = 0;
719 
720     /* Number of consumers in this consumer group. */
721     if ((n = rdbSaveLen(rdb,raxSize(cg->consumers))) == -1) return -1;
722     nwritten += n;
723 
724     /* Save each consumer. */
725     raxIterator ri;
726     raxStart(&ri,cg->consumers);
727     raxSeek(&ri,"^",NULL,0);
728     while(raxNext(&ri)) {
729         streamConsumer *consumer = ri.data;
730 
731         /* Consumer name. */
732         if ((n = rdbSaveRawString(rdb,ri.key,ri.key_len)) == -1) return -1;
733         nwritten += n;
734 
735         /* Last seen time. */
736         if ((n = rdbSaveMillisecondTime(rdb,consumer->seen_time)) == -1)
737             return -1;
738         nwritten += n;
739 
740         /* Consumer PEL, without the ACKs (see last parameter of the function
741          * passed with value of 0), at loading time we'll lookup the ID
742          * in the consumer group global PEL and will put a reference in the
743          * consumer local PEL. */
744         if ((n = rdbSaveStreamPEL(rdb,consumer->pel,0)) == -1)
745             return -1;
746         nwritten += n;
747     }
748     raxStop(&ri);
749     return nwritten;
750 }
751 
752 /* Save a Redis object.
753  * Returns -1 on error, number of bytes written on success. */
rdbSaveObject(rio * rdb,robj * o,robj * key)754 ssize_t rdbSaveObject(rio *rdb, robj *o, robj *key) {
755     ssize_t n = 0, nwritten = 0;
756 
757     if (o->type == OBJ_STRING) {
758         /* Save a string value */
759         if ((n = rdbSaveStringObject(rdb,o)) == -1) return -1;
760         nwritten += n;
761     } else if (o->type == OBJ_LIST) {
762         /* Save a list value */
763         if (o->encoding == OBJ_ENCODING_QUICKLIST) {
764             quicklist *ql = o->ptr;
765             quicklistNode *node = ql->head;
766 
767             if ((n = rdbSaveLen(rdb,ql->len)) == -1) return -1;
768             nwritten += n;
769 
770             while(node) {
771                 if (quicklistNodeIsCompressed(node)) {
772                     void *data;
773                     size_t compress_len = quicklistGetLzf(node, &data);
774                     if ((n = rdbSaveLzfBlob(rdb,data,compress_len,node->sz)) == -1) return -1;
775                     nwritten += n;
776                 } else {
777                     if ((n = rdbSaveRawString(rdb,node->zl,node->sz)) == -1) return -1;
778                     nwritten += n;
779                 }
780                 node = node->next;
781             }
782         } else {
783             serverPanic("Unknown list encoding");
784         }
785     } else if (o->type == OBJ_SET) {
786         /* Save a set value */
787         if (o->encoding == OBJ_ENCODING_HT) {
788             dict *set = o->ptr;
789             dictIterator *di = dictGetIterator(set);
790             dictEntry *de;
791 
792             if ((n = rdbSaveLen(rdb,dictSize(set))) == -1) {
793                 dictReleaseIterator(di);
794                 return -1;
795             }
796             nwritten += n;
797 
798             while((de = dictNext(di)) != NULL) {
799                 sds ele = dictGetKey(de);
800                 if ((n = rdbSaveRawString(rdb,(unsigned char*)ele,sdslen(ele)))
801                     == -1)
802                 {
803                     dictReleaseIterator(di);
804                     return -1;
805                 }
806                 nwritten += n;
807             }
808             dictReleaseIterator(di);
809         } else if (o->encoding == OBJ_ENCODING_INTSET) {
810             size_t l = intsetBlobLen((intset*)o->ptr);
811 
812             if ((n = rdbSaveRawString(rdb,o->ptr,l)) == -1) return -1;
813             nwritten += n;
814         } else {
815             serverPanic("Unknown set encoding");
816         }
817     } else if (o->type == OBJ_ZSET) {
818         /* Save a sorted set value */
819         if (o->encoding == OBJ_ENCODING_ZIPLIST) {
820             size_t l = ziplistBlobLen((unsigned char*)o->ptr);
821 
822             if ((n = rdbSaveRawString(rdb,o->ptr,l)) == -1) return -1;
823             nwritten += n;
824         } else if (o->encoding == OBJ_ENCODING_SKIPLIST) {
825             zset *zs = o->ptr;
826             zskiplist *zsl = zs->zsl;
827 
828             if ((n = rdbSaveLen(rdb,zsl->length)) == -1) return -1;
829             nwritten += n;
830 
831             /* We save the skiplist elements from the greatest to the smallest
832              * (that's trivial since the elements are already ordered in the
833              * skiplist): this improves the load process, since the next loaded
834              * element will always be the smaller, so adding to the skiplist
835              * will always immediately stop at the head, making the insertion
836              * O(1) instead of O(log(N)). */
837             zskiplistNode *zn = zsl->tail;
838             while (zn != NULL) {
839                 if ((n = rdbSaveRawString(rdb,
840                     (unsigned char*)zn->ele,sdslen(zn->ele))) == -1)
841                 {
842                     return -1;
843                 }
844                 nwritten += n;
845                 if ((n = rdbSaveBinaryDoubleValue(rdb,zn->score)) == -1)
846                     return -1;
847                 nwritten += n;
848                 zn = zn->backward;
849             }
850         } else {
851             serverPanic("Unknown sorted set encoding");
852         }
853     } else if (o->type == OBJ_HASH) {
854         /* Save a hash value */
855         if (o->encoding == OBJ_ENCODING_ZIPLIST) {
856             size_t l = ziplistBlobLen((unsigned char*)o->ptr);
857 
858             if ((n = rdbSaveRawString(rdb,o->ptr,l)) == -1) return -1;
859             nwritten += n;
860 
861         } else if (o->encoding == OBJ_ENCODING_HT) {
862             dictIterator *di = dictGetIterator(o->ptr);
863             dictEntry *de;
864 
865             if ((n = rdbSaveLen(rdb,dictSize((dict*)o->ptr))) == -1) {
866                 dictReleaseIterator(di);
867                 return -1;
868             }
869             nwritten += n;
870 
871             while((de = dictNext(di)) != NULL) {
872                 sds field = dictGetKey(de);
873                 sds value = dictGetVal(de);
874 
875                 if ((n = rdbSaveRawString(rdb,(unsigned char*)field,
876                         sdslen(field))) == -1)
877                 {
878                     dictReleaseIterator(di);
879                     return -1;
880                 }
881                 nwritten += n;
882                 if ((n = rdbSaveRawString(rdb,(unsigned char*)value,
883                         sdslen(value))) == -1)
884                 {
885                     dictReleaseIterator(di);
886                     return -1;
887                 }
888                 nwritten += n;
889             }
890             dictReleaseIterator(di);
891         } else {
892             serverPanic("Unknown hash encoding");
893         }
894     } else if (o->type == OBJ_STREAM) {
895         /* Store how many listpacks we have inside the radix tree. */
896         stream *s = o->ptr;
897         rax *rax = s->rax;
898         if ((n = rdbSaveLen(rdb,raxSize(rax))) == -1) return -1;
899         nwritten += n;
900 
901         /* Serialize all the listpacks inside the radix tree as they are,
902          * when loading back, we'll use the first entry of each listpack
903          * to insert it back into the radix tree. */
904         raxIterator ri;
905         raxStart(&ri,rax);
906         raxSeek(&ri,"^",NULL,0);
907         while (raxNext(&ri)) {
908             unsigned char *lp = ri.data;
909             size_t lp_bytes = lpBytes(lp);
910             if ((n = rdbSaveRawString(rdb,ri.key,ri.key_len)) == -1) return -1;
911             nwritten += n;
912             if ((n = rdbSaveRawString(rdb,lp,lp_bytes)) == -1) return -1;
913             nwritten += n;
914         }
915         raxStop(&ri);
916 
917         /* Save the number of elements inside the stream. We cannot obtain
918          * this easily later, since our macro nodes should be checked for
919          * number of items: not a great CPU / space tradeoff. */
920         if ((n = rdbSaveLen(rdb,s->length)) == -1) return -1;
921         nwritten += n;
922         /* Save the last entry ID. */
923         if ((n = rdbSaveLen(rdb,s->last_id.ms)) == -1) return -1;
924         nwritten += n;
925         if ((n = rdbSaveLen(rdb,s->last_id.seq)) == -1) return -1;
926         nwritten += n;
927 
928         /* The consumer groups and their clients are part of the stream
929          * type, so serialize every consumer group. */
930 
931         /* Save the number of groups. */
932         size_t num_cgroups = s->cgroups ? raxSize(s->cgroups) : 0;
933         if ((n = rdbSaveLen(rdb,num_cgroups)) == -1) return -1;
934         nwritten += n;
935 
936         if (num_cgroups) {
937             /* Serialize each consumer group. */
938             raxStart(&ri,s->cgroups);
939             raxSeek(&ri,"^",NULL,0);
940             while(raxNext(&ri)) {
941                 streamCG *cg = ri.data;
942 
943                 /* Save the group name. */
944                 if ((n = rdbSaveRawString(rdb,ri.key,ri.key_len)) == -1)
945                     return -1;
946                 nwritten += n;
947 
948                 /* Last ID. */
949                 if ((n = rdbSaveLen(rdb,cg->last_id.ms)) == -1) return -1;
950                 nwritten += n;
951                 if ((n = rdbSaveLen(rdb,cg->last_id.seq)) == -1) return -1;
952                 nwritten += n;
953 
954                 /* Save the global PEL. */
955                 if ((n = rdbSaveStreamPEL(rdb,cg->pel,1)) == -1) return -1;
956                 nwritten += n;
957 
958                 /* Save the consumers of this group. */
959                 if ((n = rdbSaveStreamConsumers(rdb,cg)) == -1) return -1;
960                 nwritten += n;
961             }
962             raxStop(&ri);
963         }
964     } else if (o->type == OBJ_MODULE) {
965         /* Save a module-specific value. */
966         RedisModuleIO io;
967         moduleValue *mv = o->ptr;
968         moduleType *mt = mv->type;
969         moduleInitIOContext(io,mt,rdb,key);
970 
971         /* Write the "module" identifier as prefix, so that we'll be able
972          * to call the right module during loading. */
973         int retval = rdbSaveLen(rdb,mt->id);
974         if (retval == -1) return -1;
975         io.bytes += retval;
976 
977         /* Then write the module-specific representation + EOF marker. */
978         mt->rdb_save(&io,mv->value);
979         retval = rdbSaveLen(rdb,RDB_MODULE_OPCODE_EOF);
980         if (retval == -1) return -1;
981         io.bytes += retval;
982 
983         if (io.ctx) {
984             moduleFreeContext(io.ctx);
985             zfree(io.ctx);
986         }
987         return io.error ? -1 : (ssize_t)io.bytes;
988     } else {
989         serverPanic("Unknown object type");
990     }
991     return nwritten;
992 }
993 
994 /* Return the length the object will have on disk if saved with
995  * the rdbSaveObject() function. Currently we use a trick to get
996  * this length with very little changes to the code. In the future
997  * we could switch to a faster solution. */
rdbSavedObjectLen(robj * o)998 size_t rdbSavedObjectLen(robj *o) {
999     ssize_t len = rdbSaveObject(NULL,o,NULL);
1000     serverAssertWithInfo(NULL,o,len != -1);
1001     return len;
1002 }
1003 
1004 /* Save a key-value pair, with expire time, type, key, value.
1005  * On error -1 is returned.
1006  * On success if the key was actually saved 1 is returned, otherwise 0
1007  * is returned (the key was already expired). */
rdbSaveKeyValuePair(rio * rdb,robj * key,robj * val,long long expiretime)1008 int rdbSaveKeyValuePair(rio *rdb, robj *key, robj *val, long long expiretime) {
1009     int savelru = server.maxmemory_policy & MAXMEMORY_FLAG_LRU;
1010     int savelfu = server.maxmemory_policy & MAXMEMORY_FLAG_LFU;
1011 
1012     /* Save the expire time */
1013     if (expiretime != -1) {
1014         if (rdbSaveType(rdb,RDB_OPCODE_EXPIRETIME_MS) == -1) return -1;
1015         if (rdbSaveMillisecondTime(rdb,expiretime) == -1) return -1;
1016     }
1017 
1018     /* Save the LRU info. */
1019     if (savelru) {
1020         uint64_t idletime = estimateObjectIdleTime(val);
1021         idletime /= 1000; /* Using seconds is enough and requires less space.*/
1022         if (rdbSaveType(rdb,RDB_OPCODE_IDLE) == -1) return -1;
1023         if (rdbSaveLen(rdb,idletime) == -1) return -1;
1024     }
1025 
1026     /* Save the LFU info. */
1027     if (savelfu) {
1028         uint8_t buf[1];
1029         buf[0] = LFUDecrAndReturn(val);
1030         /* We can encode this in exactly two bytes: the opcode and an 8
1031          * bit counter, since the frequency is logarithmic with a 0-255 range.
1032          * Note that we do not store the halving time because to reset it
1033          * a single time when loading does not affect the frequency much. */
1034         if (rdbSaveType(rdb,RDB_OPCODE_FREQ) == -1) return -1;
1035         if (rdbWriteRaw(rdb,buf,1) == -1) return -1;
1036     }
1037 
1038     /* Save type, key, value */
1039     if (rdbSaveObjectType(rdb,val) == -1) return -1;
1040     if (rdbSaveStringObject(rdb,key) == -1) return -1;
1041     if (rdbSaveObject(rdb,val,key) == -1) return -1;
1042     return 1;
1043 }
1044 
1045 /* Save an AUX field. */
rdbSaveAuxField(rio * rdb,void * key,size_t keylen,void * val,size_t vallen)1046 ssize_t rdbSaveAuxField(rio *rdb, void *key, size_t keylen, void *val, size_t vallen) {
1047     ssize_t ret, len = 0;
1048     if ((ret = rdbSaveType(rdb,RDB_OPCODE_AUX)) == -1) return -1;
1049     len += ret;
1050     if ((ret = rdbSaveRawString(rdb,key,keylen)) == -1) return -1;
1051     len += ret;
1052     if ((ret = rdbSaveRawString(rdb,val,vallen)) == -1) return -1;
1053     len += ret;
1054     return len;
1055 }
1056 
1057 /* Wrapper for rdbSaveAuxField() used when key/val length can be obtained
1058  * with strlen(). */
rdbSaveAuxFieldStrStr(rio * rdb,char * key,char * val)1059 ssize_t rdbSaveAuxFieldStrStr(rio *rdb, char *key, char *val) {
1060     return rdbSaveAuxField(rdb,key,strlen(key),val,strlen(val));
1061 }
1062 
1063 /* Wrapper for strlen(key) + integer type (up to long long range). */
rdbSaveAuxFieldStrInt(rio * rdb,char * key,long long val)1064 ssize_t rdbSaveAuxFieldStrInt(rio *rdb, char *key, long long val) {
1065     char buf[LONG_STR_SIZE];
1066     int vlen = ll2string(buf,sizeof(buf),val);
1067     return rdbSaveAuxField(rdb,key,strlen(key),buf,vlen);
1068 }
1069 
1070 /* Save a few default AUX fields with information about the RDB generated. */
rdbSaveInfoAuxFields(rio * rdb,int flags,rdbSaveInfo * rsi)1071 int rdbSaveInfoAuxFields(rio *rdb, int flags, rdbSaveInfo *rsi) {
1072     int redis_bits = (sizeof(void*) == 8) ? 64 : 32;
1073     int aof_preamble = (flags & RDB_SAVE_AOF_PREAMBLE) != 0;
1074 
1075     /* Add a few fields about the state when the RDB was created. */
1076     if (rdbSaveAuxFieldStrStr(rdb,"redis-ver",REDIS_VERSION) == -1) return -1;
1077     if (rdbSaveAuxFieldStrInt(rdb,"redis-bits",redis_bits) == -1) return -1;
1078     if (rdbSaveAuxFieldStrInt(rdb,"ctime",time(NULL)) == -1) return -1;
1079     if (rdbSaveAuxFieldStrInt(rdb,"used-mem",zmalloc_used_memory()) == -1) return -1;
1080 
1081     /* Handle saving options that generate aux fields. */
1082     if (rsi) {
1083         if (rdbSaveAuxFieldStrInt(rdb,"repl-stream-db",rsi->repl_stream_db)
1084             == -1) return -1;
1085         if (rdbSaveAuxFieldStrStr(rdb,"repl-id",server.replid)
1086             == -1) return -1;
1087         if (rdbSaveAuxFieldStrInt(rdb,"repl-offset",server.master_repl_offset)
1088             == -1) return -1;
1089     }
1090     if (rdbSaveAuxFieldStrInt(rdb,"aof-preamble",aof_preamble) == -1) return -1;
1091     return 1;
1092 }
1093 
1094 /* Produces a dump of the database in RDB format sending it to the specified
1095  * Redis I/O channel. On success C_OK is returned, otherwise C_ERR
1096  * is returned and part of the output, or all the output, can be
1097  * missing because of I/O errors.
1098  *
1099  * When the function returns C_ERR and if 'error' is not NULL, the
1100  * integer pointed by 'error' is set to the value of errno just after the I/O
1101  * error. */
rdbSaveRio(rio * rdb,int * error,int flags,rdbSaveInfo * rsi)1102 int rdbSaveRio(rio *rdb, int *error, int flags, rdbSaveInfo *rsi) {
1103     dictIterator *di = NULL;
1104     dictEntry *de;
1105     char magic[10];
1106     int j;
1107     uint64_t cksum;
1108     size_t processed = 0;
1109 
1110     if (server.rdb_checksum)
1111         rdb->update_cksum = rioGenericUpdateChecksum;
1112     snprintf(magic,sizeof(magic),"REDIS%04d",RDB_VERSION);
1113     if (rdbWriteRaw(rdb,magic,9) == -1) goto werr;
1114     if (rdbSaveInfoAuxFields(rdb,flags,rsi) == -1) goto werr;
1115 
1116     for (j = 0; j < server.dbnum; j++) {
1117         redisDb *db = server.db+j;
1118         dict *d = db->dict;
1119         if (dictSize(d) == 0) continue;
1120         di = dictGetSafeIterator(d);
1121 
1122         /* Write the SELECT DB opcode */
1123         if (rdbSaveType(rdb,RDB_OPCODE_SELECTDB) == -1) goto werr;
1124         if (rdbSaveLen(rdb,j) == -1) goto werr;
1125 
1126         /* Write the RESIZE DB opcode. We trim the size to UINT32_MAX, which
1127          * is currently the largest type we are able to represent in RDB sizes.
1128          * However this does not limit the actual size of the DB to load since
1129          * these sizes are just hints to resize the hash tables. */
1130         uint64_t db_size, expires_size;
1131         db_size = dictSize(db->dict);
1132         expires_size = dictSize(db->expires);
1133         if (rdbSaveType(rdb,RDB_OPCODE_RESIZEDB) == -1) goto werr;
1134         if (rdbSaveLen(rdb,db_size) == -1) goto werr;
1135         if (rdbSaveLen(rdb,expires_size) == -1) goto werr;
1136 
1137         /* Iterate this DB writing every entry */
1138         while((de = dictNext(di)) != NULL) {
1139             sds keystr = dictGetKey(de);
1140             robj key, *o = dictGetVal(de);
1141             long long expire;
1142 
1143             initStaticStringObject(key,keystr);
1144             expire = getExpire(db,&key);
1145             if (rdbSaveKeyValuePair(rdb,&key,o,expire) == -1) goto werr;
1146 
1147             /* When this RDB is produced as part of an AOF rewrite, move
1148              * accumulated diff from parent to child while rewriting in
1149              * order to have a smaller final write. */
1150             if (flags & RDB_SAVE_AOF_PREAMBLE &&
1151                 rdb->processed_bytes > processed+AOF_READ_DIFF_INTERVAL_BYTES)
1152             {
1153                 processed = rdb->processed_bytes;
1154                 aofReadDiffFromParent();
1155             }
1156         }
1157         dictReleaseIterator(di);
1158         di = NULL; /* So that we don't release it again on error. */
1159     }
1160 
1161     /* If we are storing the replication information on disk, persist
1162      * the script cache as well: on successful PSYNC after a restart, we need
1163      * to be able to process any EVALSHA inside the replication backlog the
1164      * master will send us. */
1165     if (rsi && dictSize(server.lua_scripts)) {
1166         di = dictGetIterator(server.lua_scripts);
1167         while((de = dictNext(di)) != NULL) {
1168             robj *body = dictGetVal(de);
1169             if (rdbSaveAuxField(rdb,"lua",3,body->ptr,sdslen(body->ptr)) == -1)
1170                 goto werr;
1171         }
1172         dictReleaseIterator(di);
1173         di = NULL; /* So that we don't release it again on error. */
1174     }
1175 
1176     /* EOF opcode */
1177     if (rdbSaveType(rdb,RDB_OPCODE_EOF) == -1) goto werr;
1178 
1179     /* CRC64 checksum. It will be zero if checksum computation is disabled, the
1180      * loading code skips the check in this case. */
1181     cksum = rdb->cksum;
1182     memrev64ifbe(&cksum);
1183     if (rioWrite(rdb,&cksum,8) == 0) goto werr;
1184     return C_OK;
1185 
1186 werr:
1187     if (error) *error = errno;
1188     if (di) dictReleaseIterator(di);
1189     return C_ERR;
1190 }
1191 
1192 /* This is just a wrapper to rdbSaveRio() that additionally adds a prefix
1193  * and a suffix to the generated RDB dump. The prefix is:
1194  *
1195  * $EOF:<40 bytes unguessable hex string>\r\n
1196  *
1197  * While the suffix is the 40 bytes hex string we announced in the prefix.
1198  * This way processes receiving the payload can understand when it ends
1199  * without doing any processing of the content. */
rdbSaveRioWithEOFMark(rio * rdb,int * error,rdbSaveInfo * rsi)1200 int rdbSaveRioWithEOFMark(rio *rdb, int *error, rdbSaveInfo *rsi) {
1201     char eofmark[RDB_EOF_MARK_SIZE];
1202 
1203     getRandomHexChars(eofmark,RDB_EOF_MARK_SIZE);
1204     if (error) *error = 0;
1205     if (rioWrite(rdb,"$EOF:",5) == 0) goto werr;
1206     if (rioWrite(rdb,eofmark,RDB_EOF_MARK_SIZE) == 0) goto werr;
1207     if (rioWrite(rdb,"\r\n",2) == 0) goto werr;
1208     if (rdbSaveRio(rdb,error,RDB_SAVE_NONE,rsi) == C_ERR) goto werr;
1209     if (rioWrite(rdb,eofmark,RDB_EOF_MARK_SIZE) == 0) goto werr;
1210     return C_OK;
1211 
1212 werr: /* Write error. */
1213     /* Set 'error' only if not already set by rdbSaveRio() call. */
1214     if (error && *error == 0) *error = errno;
1215     return C_ERR;
1216 }
1217 
1218 /* Save the DB on disk. Return C_ERR on error, C_OK on success. */
rdbSave(char * filename,rdbSaveInfo * rsi)1219 int rdbSave(char *filename, rdbSaveInfo *rsi) {
1220     char tmpfile[256];
1221     char cwd[MAXPATHLEN]; /* Current working dir path for error messages. */
1222     FILE *fp;
1223     rio rdb;
1224     int error = 0;
1225 
1226     snprintf(tmpfile,256,"temp-%d.rdb", (int) getpid());
1227     fp = fopen(tmpfile,"w");
1228     if (!fp) {
1229         char *cwdp = getcwd(cwd,MAXPATHLEN);
1230         serverLog(LL_WARNING,
1231             "Failed opening the RDB file %s (in server root dir %s) "
1232             "for saving: %s",
1233             filename,
1234             cwdp ? cwdp : "unknown",
1235             strerror(errno));
1236         return C_ERR;
1237     }
1238 
1239     rioInitWithFile(&rdb,fp);
1240 
1241     if (server.rdb_save_incremental_fsync)
1242         rioSetAutoSync(&rdb,REDIS_AUTOSYNC_BYTES);
1243 
1244     if (rdbSaveRio(&rdb,&error,RDB_SAVE_NONE,rsi) == C_ERR) {
1245         errno = error;
1246         goto werr;
1247     }
1248 
1249     /* Make sure data will not remain on the OS's output buffers */
1250     if (fflush(fp) == EOF) goto werr;
1251     if (fsync(fileno(fp)) == -1) goto werr;
1252     if (fclose(fp) == EOF) goto werr;
1253 
1254     /* Use RENAME to make sure the DB file is changed atomically only
1255      * if the generate DB file is ok. */
1256     if (rename(tmpfile,filename) == -1) {
1257         char *cwdp = getcwd(cwd,MAXPATHLEN);
1258         serverLog(LL_WARNING,
1259             "Error moving temp DB file %s on the final "
1260             "destination %s (in server root dir %s): %s",
1261             tmpfile,
1262             filename,
1263             cwdp ? cwdp : "unknown",
1264             strerror(errno));
1265         unlink(tmpfile);
1266         return C_ERR;
1267     }
1268 
1269     serverLog(LL_NOTICE,"DB saved on disk");
1270     server.dirty = 0;
1271     server.lastsave = time(NULL);
1272     server.lastbgsave_status = C_OK;
1273     return C_OK;
1274 
1275 werr:
1276     serverLog(LL_WARNING,"Write error saving DB on disk: %s", strerror(errno));
1277     fclose(fp);
1278     unlink(tmpfile);
1279     return C_ERR;
1280 }
1281 
/* Start a background save of the dataset into 'filename'.
 *
 * The child process runs rdbSave(). On success it logs the copy-on-write
 * memory it used and sends it to the parent through the child info pipe.
 * It exits with status 0 on success and 1 on failure.
 *
 * The parent records fork statistics and, if fork() succeeded, stores the
 * child pid and returns without waiting.
 *
 * Returns C_ERR if an AOF or RDB child is already running or if fork()
 * fails, and C_OK once the child has been started. */
int rdbSaveBackground(char *filename, rdbSaveInfo *rsi) {
    pid_t childpid;
    long long start;

    if (server.aof_child_pid != -1 || server.rdb_child_pid != -1) return C_ERR;

    server.dirty_before_bgsave = server.dirty;
    server.lastbgsave_try = time(NULL);
    openChildInfoPipe();

    start = ustime();
    if ((childpid = fork()) == 0) {
        int retval;

        /* Child */
        closeListeningSockets(0);
        resetCpuAffinity("rdb-bgsave");
        redisSetProcTitle("redis-rdb-bgsave");
        retval = rdbSave(filename,rsi);
        if (retval == C_OK) {
            size_t private_dirty = zmalloc_get_private_dirty(-1);

            if (private_dirty) {
                serverLog(LL_NOTICE,
                    "RDB: %zu MB of memory used by copy-on-write",
                    private_dirty/(1024*1024));
            }

            server.child_info_data.cow_size = private_dirty;
            sendChildInfo(CHILD_INFO_TYPE_RDB);
        }
        exitFromChild((retval == C_OK) ? 0 : 1);
    } else {
        /* Parent. Save fork()'s errno right away: ustime(), the latency
         * sampling and closeChildInfoPipe() below may overwrite it before
         * it gets logged. */
        int fork_errno = errno;

        server.stat_fork_time = ustime()-start;
        server.stat_fork_rate = (double) zmalloc_used_memory() * 1000000 / server.stat_fork_time / (1024*1024*1024); /* GB per second. */
        latencyAddSampleIfNeeded("fork",server.stat_fork_time/1000);
        if (childpid == -1) {
            closeChildInfoPipe();
            server.lastbgsave_status = C_ERR;
            serverLog(LL_WARNING,"Can't save in background: fork: %s",
                strerror(fork_errno));
            return C_ERR;
        }
        serverLog(LL_NOTICE,"Background saving started by pid %d",childpid);
        server.rdb_save_time_start = time(NULL);
        server.rdb_child_pid = childpid;
        server.rdb_child_type = RDB_CHILD_TYPE_DISK;
        updateDictResizePolicy();
        return C_OK;
    }
    return C_OK; /* unreached */
}
1335 
/* Delete "temp-<childpid>.rdb", the temporary file that a saving process
 * with pid 'childpid' writes before renaming it. A failing unlink() (for
 * example because the file does not exist) is deliberately ignored. */
void rdbRemoveTempFile(pid_t childpid) {
    char path[256];

    snprintf(path,sizeof path,"temp-%d.rdb",(int)childpid);
    (void)unlink(path);
}
1342 
1343 /* This function is called by rdbLoadObject() when the code is in RDB-check
1344  * mode and we find a module value of type 2 that can be parsed without
1345  * the need of the actual module. The value is parsed for errors, finally
1346  * a dummy redis object is returned just to conform to the API. */
rdbLoadCheckModuleValue(rio * rdb,char * modulename)1347 robj *rdbLoadCheckModuleValue(rio *rdb, char *modulename) {
1348     uint64_t opcode;
1349     while((opcode = rdbLoadLen(rdb,NULL)) != RDB_MODULE_OPCODE_EOF) {
1350         if (opcode == RDB_MODULE_OPCODE_SINT ||
1351             opcode == RDB_MODULE_OPCODE_UINT)
1352         {
1353             uint64_t len;
1354             if (rdbLoadLenByRef(rdb,NULL,&len) == -1) {
1355                 rdbExitReportCorruptRDB(
1356                     "Error reading integer from module %s value", modulename);
1357             }
1358         } else if (opcode == RDB_MODULE_OPCODE_STRING) {
1359             robj *o = rdbGenericLoadStringObject(rdb,RDB_LOAD_NONE,NULL);
1360             if (o == NULL) {
1361                 rdbExitReportCorruptRDB(
1362                     "Error reading string from module %s value", modulename);
1363             }
1364             decrRefCount(o);
1365         } else if (opcode == RDB_MODULE_OPCODE_FLOAT) {
1366             float val;
1367             if (rdbLoadBinaryFloatValue(rdb,&val) == -1) {
1368                 rdbExitReportCorruptRDB(
1369                     "Error reading float from module %s value", modulename);
1370             }
1371         } else if (opcode == RDB_MODULE_OPCODE_DOUBLE) {
1372             double val;
1373             if (rdbLoadBinaryDoubleValue(rdb,&val) == -1) {
1374                 rdbExitReportCorruptRDB(
1375                     "Error reading double from module %s value", modulename);
1376             }
1377         }
1378     }
1379     return createStringObject("module-dummy-value",18);
1380 }
1381 
1382 /* Load a Redis object of the specified type from the specified file.
1383  * On success a newly allocated object is returned, otherwise NULL. */
rdbLoadObject(int rdbtype,rio * rdb,robj * key)1384 robj *rdbLoadObject(int rdbtype, rio *rdb, robj *key) {
1385     robj *o = NULL, *ele, *dec;
1386     uint64_t len;
1387     unsigned int i;
1388 
1389     if (rdbtype == RDB_TYPE_STRING) {
1390         /* Read string value */
1391         if ((o = rdbLoadEncodedStringObject(rdb)) == NULL) return NULL;
1392         o = tryObjectEncoding(o);
1393     } else if (rdbtype == RDB_TYPE_LIST) {
1394         /* Read list value */
1395         if ((len = rdbLoadLen(rdb,NULL)) == RDB_LENERR) return NULL;
1396 
1397         o = createQuicklistObject();
1398         quicklistSetOptions(o->ptr, server.list_max_ziplist_size,
1399                             server.list_compress_depth);
1400 
1401         /* Load every single element of the list */
1402         while(len--) {
1403             if ((ele = rdbLoadEncodedStringObject(rdb)) == NULL) return NULL;
1404             dec = getDecodedObject(ele);
1405             size_t len = sdslen(dec->ptr);
1406             quicklistPushTail(o->ptr, dec->ptr, len);
1407             decrRefCount(dec);
1408             decrRefCount(ele);
1409         }
1410     } else if (rdbtype == RDB_TYPE_SET) {
1411         /* Read Set value */
1412         if ((len = rdbLoadLen(rdb,NULL)) == RDB_LENERR) return NULL;
1413 
1414         /* Use a regular set when there are too many entries. */
1415         if (len > server.set_max_intset_entries) {
1416             o = createSetObject();
1417             /* It's faster to expand the dict to the right size asap in order
1418              * to avoid rehashing */
1419             if (len > DICT_HT_INITIAL_SIZE)
1420                 dictExpand(o->ptr,len);
1421         } else {
1422             o = createIntsetObject();
1423         }
1424 
1425         /* Load every single element of the set */
1426         for (i = 0; i < len; i++) {
1427             long long llval;
1428             sds sdsele;
1429 
1430             if ((sdsele = rdbGenericLoadStringObject(rdb,RDB_LOAD_SDS,NULL))
1431                 == NULL) return NULL;
1432 
1433             if (o->encoding == OBJ_ENCODING_INTSET) {
1434                 /* Fetch integer value from element. */
1435                 if (isSdsRepresentableAsLongLong(sdsele,&llval) == C_OK) {
1436                     o->ptr = intsetAdd(o->ptr,llval,NULL);
1437                 } else {
1438                     setTypeConvert(o,OBJ_ENCODING_HT);
1439                     dictExpand(o->ptr,len);
1440                 }
1441             }
1442 
1443             /* This will also be called when the set was just converted
1444              * to a regular hash table encoded set. */
1445             if (o->encoding == OBJ_ENCODING_HT) {
1446                 dictAdd((dict*)o->ptr,sdsele,NULL);
1447             } else {
1448                 sdsfree(sdsele);
1449             }
1450         }
1451     } else if (rdbtype == RDB_TYPE_ZSET_2 || rdbtype == RDB_TYPE_ZSET) {
1452         /* Read list/set value. */
1453         uint64_t zsetlen;
1454         size_t maxelelen = 0;
1455         zset *zs;
1456 
1457         if ((zsetlen = rdbLoadLen(rdb,NULL)) == RDB_LENERR) return NULL;
1458         o = createZsetObject();
1459         zs = o->ptr;
1460 
1461         if (zsetlen > DICT_HT_INITIAL_SIZE)
1462             dictExpand(zs->dict,zsetlen);
1463 
1464         /* Load every single element of the sorted set. */
1465         while(zsetlen--) {
1466             sds sdsele;
1467             double score;
1468             zskiplistNode *znode;
1469 
1470             if ((sdsele = rdbGenericLoadStringObject(rdb,RDB_LOAD_SDS,NULL))
1471                 == NULL) return NULL;
1472 
1473             if (rdbtype == RDB_TYPE_ZSET_2) {
1474                 if (rdbLoadBinaryDoubleValue(rdb,&score) == -1) return NULL;
1475             } else {
1476                 if (rdbLoadDoubleValue(rdb,&score) == -1) return NULL;
1477             }
1478 
1479             /* Don't care about integer-encoded strings. */
1480             if (sdslen(sdsele) > maxelelen) maxelelen = sdslen(sdsele);
1481 
1482             znode = zslInsert(zs->zsl,score,sdsele);
1483             dictAdd(zs->dict,sdsele,&znode->score);
1484         }
1485 
1486         /* Convert *after* loading, since sorted sets are not stored ordered. */
1487         if (zsetLength(o) <= server.zset_max_ziplist_entries &&
1488             maxelelen <= server.zset_max_ziplist_value)
1489                 zsetConvert(o,OBJ_ENCODING_ZIPLIST);
1490     } else if (rdbtype == RDB_TYPE_HASH) {
1491         uint64_t len;
1492         int ret;
1493         sds field, value;
1494 
1495         len = rdbLoadLen(rdb, NULL);
1496         if (len == RDB_LENERR) return NULL;
1497 
1498         o = createHashObject();
1499 
1500         /* Too many entries? Use a hash table. */
1501         if (len > server.hash_max_ziplist_entries)
1502             hashTypeConvert(o, OBJ_ENCODING_HT);
1503 
1504         /* Load every field and value into the ziplist */
1505         while (o->encoding == OBJ_ENCODING_ZIPLIST && len > 0) {
1506             len--;
1507             /* Load raw strings */
1508             if ((field = rdbGenericLoadStringObject(rdb,RDB_LOAD_SDS,NULL))
1509                 == NULL) return NULL;
1510             if ((value = rdbGenericLoadStringObject(rdb,RDB_LOAD_SDS,NULL))
1511                 == NULL) return NULL;
1512 
1513             /* Add pair to ziplist */
1514             o->ptr = ziplistPush(o->ptr, (unsigned char*)field,
1515                     sdslen(field), ZIPLIST_TAIL);
1516             o->ptr = ziplistPush(o->ptr, (unsigned char*)value,
1517                     sdslen(value), ZIPLIST_TAIL);
1518 
1519             /* Convert to hash table if size threshold is exceeded */
1520             if (sdslen(field) > server.hash_max_ziplist_value ||
1521                 sdslen(value) > server.hash_max_ziplist_value)
1522             {
1523                 sdsfree(field);
1524                 sdsfree(value);
1525                 hashTypeConvert(o, OBJ_ENCODING_HT);
1526                 break;
1527             }
1528             sdsfree(field);
1529             sdsfree(value);
1530         }
1531 
1532         if (o->encoding == OBJ_ENCODING_HT && len > DICT_HT_INITIAL_SIZE)
1533             dictExpand(o->ptr,len);
1534 
1535         /* Load remaining fields and values into the hash table */
1536         while (o->encoding == OBJ_ENCODING_HT && len > 0) {
1537             len--;
1538             /* Load encoded strings */
1539             if ((field = rdbGenericLoadStringObject(rdb,RDB_LOAD_SDS,NULL))
1540                 == NULL) return NULL;
1541             if ((value = rdbGenericLoadStringObject(rdb,RDB_LOAD_SDS,NULL))
1542                 == NULL) return NULL;
1543 
1544             /* Add pair to hash table */
1545             ret = dictAdd((dict*)o->ptr, field, value);
1546             if (ret == DICT_ERR) {
1547                 rdbExitReportCorruptRDB("Duplicate keys detected");
1548             }
1549         }
1550 
1551         /* All pairs should be read by now */
1552         serverAssert(len == 0);
1553     } else if (rdbtype == RDB_TYPE_LIST_QUICKLIST) {
1554         if ((len = rdbLoadLen(rdb,NULL)) == RDB_LENERR) return NULL;
1555         o = createQuicklistObject();
1556         quicklistSetOptions(o->ptr, server.list_max_ziplist_size,
1557                             server.list_compress_depth);
1558 
1559         while (len--) {
1560             unsigned char *zl =
1561                 rdbGenericLoadStringObject(rdb,RDB_LOAD_PLAIN,NULL);
1562             if (zl == NULL) return NULL;
1563             quicklistAppendZiplist(o->ptr, zl);
1564         }
1565     } else if (rdbtype == RDB_TYPE_HASH_ZIPMAP  ||
1566                rdbtype == RDB_TYPE_LIST_ZIPLIST ||
1567                rdbtype == RDB_TYPE_SET_INTSET   ||
1568                rdbtype == RDB_TYPE_ZSET_ZIPLIST ||
1569                rdbtype == RDB_TYPE_HASH_ZIPLIST)
1570     {
1571         unsigned char *encoded =
1572             rdbGenericLoadStringObject(rdb,RDB_LOAD_PLAIN,NULL);
1573         if (encoded == NULL) return NULL;
1574         o = createObject(OBJ_STRING,encoded); /* Obj type fixed below. */
1575 
1576         /* Fix the object encoding, and make sure to convert the encoded
1577          * data type into the base type if accordingly to the current
1578          * configuration there are too many elements in the encoded data
1579          * type. Note that we only check the length and not max element
1580          * size as this is an O(N) scan. Eventually everything will get
1581          * converted. */
1582         switch(rdbtype) {
1583             case RDB_TYPE_HASH_ZIPMAP:
1584                 /* Convert to ziplist encoded hash. This must be deprecated
1585                  * when loading dumps created by Redis 2.4 gets deprecated. */
1586                 {
1587                     unsigned char *zl = ziplistNew();
1588                     unsigned char *zi = zipmapRewind(o->ptr);
1589                     unsigned char *fstr, *vstr;
1590                     unsigned int flen, vlen;
1591                     unsigned int maxlen = 0;
1592 
1593                     while ((zi = zipmapNext(zi, &fstr, &flen, &vstr, &vlen)) != NULL) {
1594                         if (flen > maxlen) maxlen = flen;
1595                         if (vlen > maxlen) maxlen = vlen;
1596                         zl = ziplistPush(zl, fstr, flen, ZIPLIST_TAIL);
1597                         zl = ziplistPush(zl, vstr, vlen, ZIPLIST_TAIL);
1598                     }
1599 
1600                     zfree(o->ptr);
1601                     o->ptr = zl;
1602                     o->type = OBJ_HASH;
1603                     o->encoding = OBJ_ENCODING_ZIPLIST;
1604 
1605                     if (hashTypeLength(o) > server.hash_max_ziplist_entries ||
1606                         maxlen > server.hash_max_ziplist_value)
1607                     {
1608                         hashTypeConvert(o, OBJ_ENCODING_HT);
1609                     }
1610                 }
1611                 break;
1612             case RDB_TYPE_LIST_ZIPLIST:
1613                 o->type = OBJ_LIST;
1614                 o->encoding = OBJ_ENCODING_ZIPLIST;
1615                 listTypeConvert(o,OBJ_ENCODING_QUICKLIST);
1616                 break;
1617             case RDB_TYPE_SET_INTSET:
1618                 o->type = OBJ_SET;
1619                 o->encoding = OBJ_ENCODING_INTSET;
1620                 if (intsetLen(o->ptr) > server.set_max_intset_entries)
1621                     setTypeConvert(o,OBJ_ENCODING_HT);
1622                 break;
1623             case RDB_TYPE_ZSET_ZIPLIST:
1624                 o->type = OBJ_ZSET;
1625                 o->encoding = OBJ_ENCODING_ZIPLIST;
1626                 if (zsetLength(o) > server.zset_max_ziplist_entries)
1627                     zsetConvert(o,OBJ_ENCODING_SKIPLIST);
1628                 break;
1629             case RDB_TYPE_HASH_ZIPLIST:
1630                 o->type = OBJ_HASH;
1631                 o->encoding = OBJ_ENCODING_ZIPLIST;
1632                 if (hashTypeLength(o) > server.hash_max_ziplist_entries)
1633                     hashTypeConvert(o, OBJ_ENCODING_HT);
1634                 break;
1635             default:
1636                 rdbExitReportCorruptRDB("Unknown RDB encoding type %d",rdbtype);
1637                 break;
1638         }
1639     } else if (rdbtype == RDB_TYPE_STREAM_LISTPACKS) {
1640         o = createStreamObject();
1641         stream *s = o->ptr;
1642         uint64_t listpacks = rdbLoadLen(rdb,NULL);
1643 
1644         while(listpacks--) {
1645             /* Get the master ID, the one we'll use as key of the radix tree
1646              * node: the entries inside the listpack itself are delta-encoded
1647              * relatively to this ID. */
1648             sds nodekey = rdbGenericLoadStringObject(rdb,RDB_LOAD_SDS,NULL);
1649             if (nodekey == NULL) {
1650                 rdbExitReportCorruptRDB("Stream master ID loading failed: invalid encoding or I/O error.");
1651             }
1652             if (sdslen(nodekey) != sizeof(streamID)) {
1653                 rdbExitReportCorruptRDB("Stream node key entry is not the "
1654                                         "size of a stream ID");
1655             }
1656 
1657             /* Load the listpack. */
1658             unsigned char *lp =
1659                 rdbGenericLoadStringObject(rdb,RDB_LOAD_PLAIN,NULL);
1660             if (lp == NULL) return NULL;
1661             unsigned char *first = lpFirst(lp);
1662             if (first == NULL) {
1663                 /* Serialized listpacks should never be empty, since on
1664                  * deletion we should remove the radix tree key if the
1665                  * resulting listpack is empty. */
1666                 rdbExitReportCorruptRDB("Empty listpack inside stream");
1667             }
1668 
1669             /* Insert the key in the radix tree. */
1670             int retval = raxInsert(s->rax,
1671                 (unsigned char*)nodekey,sizeof(streamID),lp,NULL);
1672             sdsfree(nodekey);
1673             if (!retval)
1674                 rdbExitReportCorruptRDB("Listpack re-added with existing key");
1675         }
1676         /* Load total number of items inside the stream. */
1677         s->length = rdbLoadLen(rdb,NULL);
1678         /* Load the last entry ID. */
1679         s->last_id.ms = rdbLoadLen(rdb,NULL);
1680         s->last_id.seq = rdbLoadLen(rdb,NULL);
1681 
1682         /* Consumer groups loading */
1683         size_t cgroups_count = rdbLoadLen(rdb,NULL);
1684         while(cgroups_count--) {
1685             /* Get the consumer group name and ID. We can then create the
1686              * consumer group ASAP and populate its structure as
1687              * we read more data. */
1688             streamID cg_id;
1689             sds cgname = rdbGenericLoadStringObject(rdb,RDB_LOAD_SDS,NULL);
1690             if (cgname == NULL) {
1691                 rdbExitReportCorruptRDB(
1692                     "Error reading the consumer group name from Stream");
1693             }
1694             cg_id.ms = rdbLoadLen(rdb,NULL);
1695             cg_id.seq = rdbLoadLen(rdb,NULL);
1696             streamCG *cgroup = streamCreateCG(s,cgname,sdslen(cgname),&cg_id);
1697             if (cgroup == NULL)
1698                 rdbExitReportCorruptRDB("Duplicated consumer group name %s",
1699                                          cgname);
1700             sdsfree(cgname);
1701 
1702             /* Load the global PEL for this consumer group, however we'll
1703              * not yet populate the NACK structures with the message
1704              * owner, since consumers for this group and their messages will
1705              * be read as a next step. So for now leave them not resolved
1706              * and later populate it. */
1707             size_t pel_size = rdbLoadLen(rdb,NULL);
1708             while(pel_size--) {
1709                 unsigned char rawid[sizeof(streamID)];
1710                 rdbLoadRaw(rdb,rawid,sizeof(rawid));
1711                 streamNACK *nack = streamCreateNACK(NULL);
1712                 nack->delivery_time = rdbLoadMillisecondTime(rdb,RDB_VERSION);
1713                 nack->delivery_count = rdbLoadLen(rdb,NULL);
1714                 if (!raxInsert(cgroup->pel,rawid,sizeof(rawid),nack,NULL))
1715                     rdbExitReportCorruptRDB("Duplicated gobal PEL entry "
1716                                             "loading stream consumer group");
1717             }
1718 
1719             /* Now that we loaded our global PEL, we need to load the
1720              * consumers and their local PELs. */
1721             size_t consumers_num = rdbLoadLen(rdb,NULL);
1722             while(consumers_num--) {
1723                 sds cname = rdbGenericLoadStringObject(rdb,RDB_LOAD_SDS,NULL);
1724                 if (cname == NULL) {
1725                     rdbExitReportCorruptRDB(
1726                         "Error reading the consumer name from Stream group");
1727                 }
1728                 streamConsumer *consumer = streamLookupConsumer(cgroup,cname,
1729                                            1);
1730                 sdsfree(cname);
1731                 consumer->seen_time = rdbLoadMillisecondTime(rdb,RDB_VERSION);
1732 
1733                 /* Load the PEL about entries owned by this specific
1734                  * consumer. */
1735                 pel_size = rdbLoadLen(rdb,NULL);
1736                 while(pel_size--) {
1737                     unsigned char rawid[sizeof(streamID)];
1738                     rdbLoadRaw(rdb,rawid,sizeof(rawid));
1739                     streamNACK *nack = raxFind(cgroup->pel,rawid,sizeof(rawid));
1740                     if (nack == raxNotFound)
1741                         rdbExitReportCorruptRDB("Consumer entry not found in "
1742                                                 "group global PEL");
1743 
1744                     /* Set the NACK consumer, that was left to NULL when
1745                      * loading the global PEL. Then set the same shared
1746                      * NACK structure also in the consumer-specific PEL. */
1747                     nack->consumer = consumer;
1748                     if (!raxInsert(consumer->pel,rawid,sizeof(rawid),nack,NULL))
1749                         rdbExitReportCorruptRDB("Duplicated consumer PEL entry "
1750                                                 " loading a stream consumer "
1751                                                 "group");
1752                 }
1753             }
1754         }
1755     } else if (rdbtype == RDB_TYPE_MODULE || rdbtype == RDB_TYPE_MODULE_2) {
1756         uint64_t moduleid = rdbLoadLen(rdb,NULL);
1757         moduleType *mt = moduleTypeLookupModuleByID(moduleid);
1758         char name[10];
1759 
1760         if (rdbCheckMode && rdbtype == RDB_TYPE_MODULE_2) {
1761             moduleTypeNameByID(name,moduleid);
1762             return rdbLoadCheckModuleValue(rdb,name);
1763         }
1764 
1765         if (mt == NULL) {
1766             moduleTypeNameByID(name,moduleid);
1767             serverLog(LL_WARNING,"The RDB file contains module data I can't load: no matching module '%s'", name);
1768             exit(1);
1769         }
1770         RedisModuleIO io;
1771         moduleInitIOContext(io,mt,rdb,key);
1772         io.ver = (rdbtype == RDB_TYPE_MODULE) ? 1 : 2;
1773         /* Call the rdb_load method of the module providing the 10 bit
1774          * encoding version in the lower 10 bits of the module ID. */
1775         void *ptr = mt->rdb_load(&io,moduleid&1023);
1776         if (io.ctx) {
1777             moduleFreeContext(io.ctx);
1778             zfree(io.ctx);
1779         }
1780 
1781         /* Module v2 serialization has an EOF mark at the end. */
1782         if (io.ver == 2) {
1783             uint64_t eof = rdbLoadLen(rdb,NULL);
1784             if (eof != RDB_MODULE_OPCODE_EOF) {
1785                 serverLog(LL_WARNING,"The RDB file contains module data for the module '%s' that is not terminated by the proper module value EOF marker", name);
1786                 exit(1);
1787             }
1788         }
1789 
1790         if (ptr == NULL) {
1791             moduleTypeNameByID(name,moduleid);
1792             serverLog(LL_WARNING,"The RDB file contains module data for the module type '%s', that the responsible module is not able to load. Check for modules log above for additional clues.", name);
1793             exit(1);
1794         }
1795         o = createModuleObject(mt,ptr);
1796     } else {
1797         rdbExitReportCorruptRDB("Unknown RDB encoding type %d",rdbtype);
1798     }
1799     return o;
1800 }
1801 
1802 /* Mark that we are loading in the global state and setup the fields
1803  * needed to provide loading stats. */
startLoading(FILE * fp)1804 void startLoading(FILE *fp) {
1805     struct stat sb;
1806 
1807     /* Load the DB */
1808     server.loading = 1;
1809     server.loading_start_time = time(NULL);
1810     server.loading_loaded_bytes = 0;
1811     if (fstat(fileno(fp), &sb) == -1) {
1812         server.loading_total_bytes = 0;
1813     } else {
1814         server.loading_total_bytes = sb.st_size;
1815     }
1816 }
1817 
1818 /* Refresh the loading progress info */
loadingProgress(off_t pos)1819 void loadingProgress(off_t pos) {
1820     server.loading_loaded_bytes = pos;
1821     if (server.stat_peak_memory < zmalloc_used_memory())
1822         server.stat_peak_memory = zmalloc_used_memory();
1823 }
1824 
1825 /* Loading finished */
stopLoading(void)1826 void stopLoading(void) {
1827     server.loading = 0;
1828 }
1829 
1830 /* Track loading progress in order to serve client's from time to time
1831    and if needed calculate rdb checksum  */
rdbLoadProgressCallback(rio * r,const void * buf,size_t len)1832 void rdbLoadProgressCallback(rio *r, const void *buf, size_t len) {
1833     if (server.rdb_checksum)
1834         rioGenericUpdateChecksum(r, buf, len);
1835     if (server.loading_process_events_interval_bytes &&
1836         (r->processed_bytes + len)/server.loading_process_events_interval_bytes > r->processed_bytes/server.loading_process_events_interval_bytes)
1837     {
1838         /* The DB can take some non trivial amount of time to load. Update
1839          * our cached time since it is used to create and update the last
1840          * interaction time with clients and for other important things. */
1841         updateCachedTime();
1842         if (server.masterhost && server.repl_state == REPL_STATE_TRANSFER)
1843             replicationSendNewlineToMaster();
1844         loadingProgress(r->processed_bytes);
1845         processEventsWhileBlocked();
1846     }
1847 }
1848 
1849 /* Load an RDB file from the rio stream 'rdb'. On success C_OK is returned,
1850  * otherwise C_ERR is returned and 'errno' is set accordingly. */
rdbLoadRio(rio * rdb,rdbSaveInfo * rsi,int loading_aof)1851 int rdbLoadRio(rio *rdb, rdbSaveInfo *rsi, int loading_aof) {
1852     uint64_t dbid;
1853     int type, rdbver;
1854     redisDb *db = server.db+0;
1855     char buf[1024];
1856 
1857     rdb->update_cksum = rdbLoadProgressCallback;
1858     rdb->max_processing_chunk = server.loading_process_events_interval_bytes;
1859     if (rioRead(rdb,buf,9) == 0) goto eoferr;
1860     buf[9] = '\0';
1861     if (memcmp(buf,"REDIS",5) != 0) {
1862         serverLog(LL_WARNING,"Wrong signature trying to load DB from file");
1863         errno = EINVAL;
1864         return C_ERR;
1865     }
1866     rdbver = atoi(buf+5);
1867     if (rdbver < 1 || rdbver > RDB_VERSION) {
1868         serverLog(LL_WARNING,"Can't handle RDB format version %d",rdbver);
1869         errno = EINVAL;
1870         return C_ERR;
1871     }
1872 
1873     /* Key-specific attributes, set by opcodes before the key type. */
1874     long long lru_idle = -1, lfu_freq = -1, expiretime = -1, now = mstime();
1875     long long lru_clock = LRU_CLOCK();
1876 
1877     while(1) {
1878         robj *key, *val;
1879 
1880         /* Read type. */
1881         if ((type = rdbLoadType(rdb)) == -1) goto eoferr;
1882 
1883         /* Handle special types. */
1884         if (type == RDB_OPCODE_EXPIRETIME) {
1885             /* EXPIRETIME: load an expire associated with the next key
1886              * to load. Note that after loading an expire we need to
1887              * load the actual type, and continue. */
1888             expiretime = rdbLoadTime(rdb);
1889             expiretime *= 1000;
1890             continue; /* Read next opcode. */
1891         } else if (type == RDB_OPCODE_EXPIRETIME_MS) {
1892             /* EXPIRETIME_MS: milliseconds precision expire times introduced
1893              * with RDB v3. Like EXPIRETIME but no with more precision. */
1894             expiretime = rdbLoadMillisecondTime(rdb,rdbver);
1895             continue; /* Read next opcode. */
1896         } else if (type == RDB_OPCODE_FREQ) {
1897             /* FREQ: LFU frequency. */
1898             uint8_t byte;
1899             if (rioRead(rdb,&byte,1) == 0) goto eoferr;
1900             lfu_freq = byte;
1901             continue; /* Read next opcode. */
1902         } else if (type == RDB_OPCODE_IDLE) {
1903             /* IDLE: LRU idle time. */
1904             uint64_t qword;
1905             if ((qword = rdbLoadLen(rdb,NULL)) == RDB_LENERR) goto eoferr;
1906             lru_idle = qword;
1907             continue; /* Read next opcode. */
1908         } else if (type == RDB_OPCODE_EOF) {
1909             /* EOF: End of file, exit the main loop. */
1910             break;
1911         } else if (type == RDB_OPCODE_SELECTDB) {
1912             /* SELECTDB: Select the specified database. */
1913             if ((dbid = rdbLoadLen(rdb,NULL)) == RDB_LENERR) goto eoferr;
1914             if (dbid >= (unsigned)server.dbnum) {
1915                 serverLog(LL_WARNING,
1916                     "FATAL: Data file was created with a Redis "
1917                     "server configured to handle more than %d "
1918                     "databases. Exiting\n", server.dbnum);
1919                 exit(1);
1920             }
1921             db = server.db+dbid;
1922             continue; /* Read next opcode. */
1923         } else if (type == RDB_OPCODE_RESIZEDB) {
1924             /* RESIZEDB: Hint about the size of the keys in the currently
1925              * selected data base, in order to avoid useless rehashing. */
1926             uint64_t db_size, expires_size;
1927             if ((db_size = rdbLoadLen(rdb,NULL)) == RDB_LENERR)
1928                 goto eoferr;
1929             if ((expires_size = rdbLoadLen(rdb,NULL)) == RDB_LENERR)
1930                 goto eoferr;
1931             dictExpand(db->dict,db_size);
1932             dictExpand(db->expires,expires_size);
1933             continue; /* Read next opcode. */
1934         } else if (type == RDB_OPCODE_AUX) {
1935             /* AUX: generic string-string fields. Use to add state to RDB
1936              * which is backward compatible. Implementations of RDB loading
1937              * are requierd to skip AUX fields they don't understand.
1938              *
1939              * An AUX field is composed of two strings: key and value. */
1940             robj *auxkey, *auxval;
1941             if ((auxkey = rdbLoadStringObject(rdb)) == NULL) goto eoferr;
1942             if ((auxval = rdbLoadStringObject(rdb)) == NULL) goto eoferr;
1943 
1944             if (((char*)auxkey->ptr)[0] == '%') {
1945                 /* All the fields with a name staring with '%' are considered
1946                  * information fields and are logged at startup with a log
1947                  * level of NOTICE. */
1948                 serverLog(LL_NOTICE,"RDB '%s': %s",
1949                     (char*)auxkey->ptr,
1950                     (char*)auxval->ptr);
1951             } else if (!strcasecmp(auxkey->ptr,"repl-stream-db")) {
1952                 if (rsi) rsi->repl_stream_db = atoi(auxval->ptr);
1953             } else if (!strcasecmp(auxkey->ptr,"repl-id")) {
1954                 if (rsi && sdslen(auxval->ptr) == CONFIG_RUN_ID_SIZE) {
1955                     memcpy(rsi->repl_id,auxval->ptr,CONFIG_RUN_ID_SIZE+1);
1956                     rsi->repl_id_is_set = 1;
1957                 }
1958             } else if (!strcasecmp(auxkey->ptr,"repl-offset")) {
1959                 if (rsi) rsi->repl_offset = strtoll(auxval->ptr,NULL,10);
1960             } else if (!strcasecmp(auxkey->ptr,"lua")) {
1961                 /* Load the script back in memory. */
1962                 if (luaCreateFunction(NULL,server.lua,auxval) == NULL) {
1963                     rdbExitReportCorruptRDB(
1964                         "Can't load Lua script from RDB file! "
1965                         "BODY: %s", auxval->ptr);
1966                 }
1967             } else {
1968                 /* We ignore fields we don't understand, as by AUX field
1969                  * contract. */
1970                 serverLog(LL_DEBUG,"Unrecognized RDB AUX field: '%s'",
1971                     (char*)auxkey->ptr);
1972             }
1973 
1974             decrRefCount(auxkey);
1975             decrRefCount(auxval);
1976             continue; /* Read type again. */
1977         } else if (type == RDB_OPCODE_MODULE_AUX) {
1978             /* This is just for compatibility with the future: we have plans
1979              * to add the ability for modules to store anything in the RDB
1980              * file, like data that is not related to the Redis key space.
1981              * Such data will potentially be stored both before and after the
1982              * RDB keys-values section. For this reason since RDB version 9,
1983              * we have the ability to read a MODULE_AUX opcode followed by an
1984              * identifier of the module, and a serialized value in "MODULE V2"
1985              * format. */
1986             uint64_t moduleid = rdbLoadLen(rdb,NULL);
1987             moduleType *mt = moduleTypeLookupModuleByID(moduleid);
1988             char name[10];
1989             moduleTypeNameByID(name,moduleid);
1990 
1991             if (!rdbCheckMode && mt == NULL) {
1992                 /* Unknown module. */
1993                 serverLog(LL_WARNING,"The RDB file contains AUX module data I can't load: no matching module '%s'", name);
1994                 exit(1);
1995             } else if (!rdbCheckMode && mt != NULL) {
1996                 /* This version of Redis actually does not know what to do
1997                  * with modules AUX data... */
1998                 serverLog(LL_WARNING,"The RDB file contains AUX module data I can't load for the module '%s'. Probably you want to use a newer version of Redis which implements aux data callbacks", name);
1999                 exit(1);
2000             } else {
2001                 /* RDB check mode. */
2002                 robj *aux = rdbLoadCheckModuleValue(rdb,name);
2003                 decrRefCount(aux);
2004             }
2005         }
2006 
2007         /* Read key */
2008         if ((key = rdbLoadStringObject(rdb)) == NULL) goto eoferr;
2009         /* Read value */
2010         if ((val = rdbLoadObject(type,rdb,key)) == NULL) goto eoferr;
2011         /* Check if the key already expired. This function is used when loading
2012          * an RDB file from disk, either at startup, or when an RDB was
2013          * received from the master. In the latter case, the master is
2014          * responsible for key expiry. If we would expire keys here, the
2015          * snapshot taken by the master may not be reflected on the slave. */
2016         if (server.masterhost == NULL && !loading_aof && expiretime != -1 && expiretime < now) {
2017             decrRefCount(key);
2018             decrRefCount(val);
2019         } else {
2020             /* Add the new object in the hash table */
2021             dbAdd(db,key,val);
2022 
2023             /* Set the expire time if needed */
2024             if (expiretime != -1) setExpire(NULL,db,key,expiretime);
2025 
2026             /* Set usage information (for eviction). */
2027             objectSetLRUOrLFU(val,lfu_freq,lru_idle,lru_clock);
2028 
2029             /* Decrement the key refcount since dbAdd() will take its
2030              * own reference. */
2031             decrRefCount(key);
2032         }
2033 
2034         /* Reset the state that is key-specified and is populated by
2035          * opcodes before the key, so that we start from scratch again. */
2036         expiretime = -1;
2037         lfu_freq = -1;
2038         lru_idle = -1;
2039     }
2040     /* Verify the checksum if RDB version is >= 5 */
2041     if (rdbver >= 5) {
2042         uint64_t cksum, expected = rdb->cksum;
2043 
2044         if (rioRead(rdb,&cksum,8) == 0) goto eoferr;
2045         if (server.rdb_checksum) {
2046             memrev64ifbe(&cksum);
2047             if (cksum == 0) {
2048                 serverLog(LL_WARNING,"RDB file was saved with checksum disabled: no check performed.");
2049             } else if (cksum != expected) {
2050                 serverLog(LL_WARNING,"Wrong RDB checksum. Aborting now.");
2051                 rdbExitReportCorruptRDB("RDB CRC error");
2052             }
2053         }
2054     }
2055     return C_OK;
2056 
2057 eoferr: /* unexpected end of file is handled here with a fatal exit */
2058     serverLog(LL_WARNING,"Short read or OOM loading DB. Unrecoverable error, aborting now.");
2059     rdbExitReportCorruptRDB("Unexpected EOF reading RDB file");
2060     return C_ERR; /* Just to avoid warning */
2061 }
2062 
2063 /* Like rdbLoadRio() but takes a filename instead of a rio stream. The
2064  * filename is open for reading and a rio stream object created in order
2065  * to do the actual loading. Moreover the ETA displayed in the INFO
2066  * output is initialized and finalized.
2067  *
2068  * If you pass an 'rsi' structure initialied with RDB_SAVE_OPTION_INIT, the
2069  * loading code will fiil the information fields in the structure. */
rdbLoad(char * filename,rdbSaveInfo * rsi)2070 int rdbLoad(char *filename, rdbSaveInfo *rsi) {
2071     FILE *fp;
2072     rio rdb;
2073     int retval;
2074 
2075     if ((fp = fopen(filename,"r")) == NULL) return C_ERR;
2076     startLoading(fp);
2077     rioInitWithFile(&rdb,fp);
2078     retval = rdbLoadRio(&rdb,rsi,0);
2079     fclose(fp);
2080     stopLoading();
2081     return retval;
2082 }
2083 
2084 /* A background saving child (BGSAVE) terminated its work. Handle this.
2085  * This function covers the case of actual BGSAVEs. */
backgroundSaveDoneHandlerDisk(int exitcode,int bysignal)2086 void backgroundSaveDoneHandlerDisk(int exitcode, int bysignal) {
2087     if (!bysignal && exitcode == 0) {
2088         serverLog(LL_NOTICE,
2089             "Background saving terminated with success");
2090         server.dirty = server.dirty - server.dirty_before_bgsave;
2091         server.lastsave = time(NULL);
2092         server.lastbgsave_status = C_OK;
2093     } else if (!bysignal && exitcode != 0) {
2094         serverLog(LL_WARNING, "Background saving error");
2095         server.lastbgsave_status = C_ERR;
2096     } else {
2097         mstime_t latency;
2098 
2099         serverLog(LL_WARNING,
2100             "Background saving terminated by signal %d", bysignal);
2101         latencyStartMonitor(latency);
2102         rdbRemoveTempFile(server.rdb_child_pid);
2103         latencyEndMonitor(latency);
2104         latencyAddSampleIfNeeded("rdb-unlink-temp-file",latency);
2105         /* SIGUSR1 is whitelisted, so we have a way to kill a child without
2106          * tirggering an error condition. */
2107         if (bysignal != SIGUSR1)
2108             server.lastbgsave_status = C_ERR;
2109     }
2110     server.rdb_child_pid = -1;
2111     server.rdb_child_type = RDB_CHILD_TYPE_NONE;
2112     server.rdb_save_time_last = time(NULL)-server.rdb_save_time_start;
2113     server.rdb_save_time_start = -1;
2114     /* Possibly there are slaves waiting for a BGSAVE in order to be served
2115      * (the first stage of SYNC is a bulk transfer of dump.rdb) */
2116     updateSlavesWaitingBgsave((!bysignal && exitcode == 0) ? C_OK : C_ERR, RDB_CHILD_TYPE_DISK);
2117 }
2118 
2119 /* A background saving child (BGSAVE) terminated its work. Handle this.
2120  * This function covers the case of RDB -> Salves socket transfers for
2121  * diskless replication. */
backgroundSaveDoneHandlerSocket(int exitcode,int bysignal)2122 void backgroundSaveDoneHandlerSocket(int exitcode, int bysignal) {
2123     uint64_t *ok_slaves;
2124 
2125     if (!bysignal && exitcode == 0) {
2126         serverLog(LL_NOTICE,
2127             "Background RDB transfer terminated with success");
2128     } else if (!bysignal && exitcode != 0) {
2129         serverLog(LL_WARNING, "Background transfer error");
2130     } else {
2131         serverLog(LL_WARNING,
2132             "Background transfer terminated by signal %d", bysignal);
2133     }
2134     server.rdb_child_pid = -1;
2135     server.rdb_child_type = RDB_CHILD_TYPE_NONE;
2136     server.rdb_save_time_start = -1;
2137 
2138     /* If the child returns an OK exit code, read the set of slave client
2139      * IDs and the associated status code. We'll terminate all the slaves
2140      * in error state.
2141      *
2142      * If the process returned an error, consider the list of slaves that
2143      * can continue to be empty, so that it's just a special case of the
2144      * normal code path. */
2145     ok_slaves = zmalloc(sizeof(uint64_t)); /* Make space for the count. */
2146     ok_slaves[0] = 0;
2147     if (!bysignal && exitcode == 0) {
2148         int readlen = sizeof(uint64_t);
2149 
2150         if (read(server.rdb_pipe_read_result_from_child, ok_slaves, readlen) ==
2151                  readlen)
2152         {
2153             readlen = ok_slaves[0]*sizeof(uint64_t)*2;
2154 
2155             /* Make space for enough elements as specified by the first
2156              * uint64_t element in the array. */
2157             ok_slaves = zrealloc(ok_slaves,sizeof(uint64_t)+readlen);
2158             if (readlen &&
2159                 read(server.rdb_pipe_read_result_from_child, ok_slaves+1,
2160                      readlen) != readlen)
2161             {
2162                 ok_slaves[0] = 0;
2163             }
2164         }
2165     }
2166 
2167     close(server.rdb_pipe_read_result_from_child);
2168     close(server.rdb_pipe_write_result_to_parent);
2169 
2170     /* We can continue the replication process with all the slaves that
2171      * correctly received the full payload. Others are terminated. */
2172     listNode *ln;
2173     listIter li;
2174 
2175     listRewind(server.slaves,&li);
2176     while((ln = listNext(&li))) {
2177         client *slave = ln->value;
2178 
2179         if (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_END) {
2180             uint64_t j;
2181             int errorcode = 0;
2182 
2183             /* Search for the slave ID in the reply. In order for a slave to
2184              * continue the replication process, we need to find it in the list,
2185              * and it must have an error code set to 0 (which means success). */
2186             for (j = 0; j < ok_slaves[0]; j++) {
2187                 if (slave->id == ok_slaves[2*j+1]) {
2188                     errorcode = ok_slaves[2*j+2];
2189                     break; /* Found in slaves list. */
2190                 }
2191             }
2192             if (j == ok_slaves[0] || errorcode != 0) {
2193                 serverLog(LL_WARNING,
2194                 "Closing slave %s: child->slave RDB transfer failed: %s",
2195                     replicationGetSlaveName(slave),
2196                     (errorcode == 0) ? "RDB transfer child aborted"
2197                                      : strerror(errorcode));
2198                 freeClient(slave);
2199             } else {
2200                 serverLog(LL_WARNING,
2201                 "Slave %s correctly received the streamed RDB file.",
2202                     replicationGetSlaveName(slave));
2203                 /* Restore the socket as non-blocking. */
2204                 anetNonBlock(NULL,slave->fd);
2205                 anetSendTimeout(NULL,slave->fd,0);
2206             }
2207         }
2208     }
2209     zfree(ok_slaves);
2210 
2211     updateSlavesWaitingBgsave((!bysignal && exitcode == 0) ? C_OK : C_ERR, RDB_CHILD_TYPE_SOCKET);
2212 }
2213 
2214 /* When a background RDB saving/transfer terminates, call the right handler. */
backgroundSaveDoneHandler(int exitcode,int bysignal)2215 void backgroundSaveDoneHandler(int exitcode, int bysignal) {
2216     switch(server.rdb_child_type) {
2217     case RDB_CHILD_TYPE_DISK:
2218         backgroundSaveDoneHandlerDisk(exitcode,bysignal);
2219         break;
2220     case RDB_CHILD_TYPE_SOCKET:
2221         backgroundSaveDoneHandlerSocket(exitcode,bysignal);
2222         break;
2223     default:
2224         serverPanic("Unknown RDB child type.");
2225         break;
2226     }
2227 }
2228 
2229 /* Spawn an RDB child that writes the RDB to the sockets of the slaves
2230  * that are currently in SLAVE_STATE_WAIT_BGSAVE_START state. */
rdbSaveToSlavesSockets(rdbSaveInfo * rsi)2231 int rdbSaveToSlavesSockets(rdbSaveInfo *rsi) {
2232     int *fds;
2233     uint64_t *clientids;
2234     int numfds;
2235     listNode *ln;
2236     listIter li;
2237     pid_t childpid;
2238     long long start;
2239     int pipefds[2];
2240 
2241     if (server.aof_child_pid != -1 || server.rdb_child_pid != -1) return C_ERR;
2242 
2243     /* Before to fork, create a pipe that will be used in order to
2244      * send back to the parent the IDs of the slaves that successfully
2245      * received all the writes. */
2246     if (pipe(pipefds) == -1) return C_ERR;
2247     server.rdb_pipe_read_result_from_child = pipefds[0];
2248     server.rdb_pipe_write_result_to_parent = pipefds[1];
2249 
2250     /* Collect the file descriptors of the slaves we want to transfer
2251      * the RDB to, which are i WAIT_BGSAVE_START state. */
2252     fds = zmalloc(sizeof(int)*listLength(server.slaves));
2253     /* We also allocate an array of corresponding client IDs. This will
2254      * be useful for the child process in order to build the report
2255      * (sent via unix pipe) that will be sent to the parent. */
2256     clientids = zmalloc(sizeof(uint64_t)*listLength(server.slaves));
2257     numfds = 0;
2258 
2259     listRewind(server.slaves,&li);
2260     while((ln = listNext(&li))) {
2261         client *slave = ln->value;
2262 
2263         if (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_START) {
2264             clientids[numfds] = slave->id;
2265             fds[numfds++] = slave->fd;
2266             replicationSetupSlaveForFullResync(slave,getPsyncInitialOffset());
2267             /* Put the socket in blocking mode to simplify RDB transfer.
2268              * We'll restore it when the children returns (since duped socket
2269              * will share the O_NONBLOCK attribute with the parent). */
2270             anetBlock(NULL,slave->fd);
2271             anetSendTimeout(NULL,slave->fd,server.repl_timeout*1000);
2272         }
2273     }
2274 
2275     /* Create the child process. */
2276     openChildInfoPipe();
2277     start = ustime();
2278     if ((childpid = fork()) == 0) {
2279         /* Child */
2280         int retval;
2281         rio slave_sockets;
2282 
2283         rioInitWithFdset(&slave_sockets,fds,numfds);
2284         zfree(fds);
2285 
2286         closeListeningSockets(0);
2287         resetCpuAffinity("rdb2slave");
2288         redisSetProcTitle("redis-rdb-to-slaves");
2289 
2290         retval = rdbSaveRioWithEOFMark(&slave_sockets,NULL,rsi);
2291         if (retval == C_OK && rioFlush(&slave_sockets) == 0)
2292             retval = C_ERR;
2293 
2294         if (retval == C_OK) {
2295             size_t private_dirty = zmalloc_get_private_dirty(-1);
2296 
2297             if (private_dirty) {
2298                 serverLog(LL_NOTICE,
2299                     "RDB: %zu MB of memory used by copy-on-write",
2300                     private_dirty/(1024*1024));
2301             }
2302 
2303             server.child_info_data.cow_size = private_dirty;
2304             sendChildInfo(CHILD_INFO_TYPE_RDB);
2305 
2306             /* If we are returning OK, at least one slave was served
2307              * with the RDB file as expected, so we need to send a report
2308              * to the parent via the pipe. The format of the message is:
2309              *
2310              * <len> <slave[0].id> <slave[0].error> ...
2311              *
2312              * len, slave IDs, and slave errors, are all uint64_t integers,
2313              * so basically the reply is composed of 64 bits for the len field
2314              * plus 2 additional 64 bit integers for each entry, for a total
2315              * of 'len' entries.
2316              *
2317              * The 'id' represents the slave's client ID, so that the master
2318              * can match the report with a specific slave, and 'error' is
2319              * set to 0 if the replication process terminated with a success
2320              * or the error code if an error occurred. */
2321             void *msg = zmalloc(sizeof(uint64_t)*(1+2*numfds));
2322             uint64_t *len = msg;
2323             uint64_t *ids = len+1;
2324             int j, msglen;
2325 
2326             *len = numfds;
2327             for (j = 0; j < numfds; j++) {
2328                 *ids++ = clientids[j];
2329                 *ids++ = slave_sockets.io.fdset.state[j];
2330             }
2331 
2332             /* Write the message to the parent. If we have no good slaves or
2333              * we are unable to transfer the message to the parent, we exit
2334              * with an error so that the parent will abort the replication
2335              * process with all the childre that were waiting. */
2336             msglen = sizeof(uint64_t)*(1+2*numfds);
2337             if (*len == 0 ||
2338                 write(server.rdb_pipe_write_result_to_parent,msg,msglen)
2339                 != msglen)
2340             {
2341                 retval = C_ERR;
2342             }
2343             zfree(msg);
2344         }
2345         zfree(clientids);
2346         rioFreeFdset(&slave_sockets);
2347         exitFromChild((retval == C_OK) ? 0 : 1);
2348     } else {
2349         /* Parent */
2350         if (childpid == -1) {
2351             serverLog(LL_WARNING,"Can't save in background: fork: %s",
2352                 strerror(errno));
2353 
2354             /* Undo the state change. The caller will perform cleanup on
2355              * all the slaves in BGSAVE_START state, but an early call to
2356              * replicationSetupSlaveForFullResync() turned it into BGSAVE_END */
2357             listRewind(server.slaves,&li);
2358             while((ln = listNext(&li))) {
2359                 client *slave = ln->value;
2360                 int j;
2361 
2362                 for (j = 0; j < numfds; j++) {
2363                     if (slave->id == clientids[j]) {
2364                         slave->replstate = SLAVE_STATE_WAIT_BGSAVE_START;
2365                         break;
2366                     }
2367                 }
2368             }
2369             close(pipefds[0]);
2370             close(pipefds[1]);
2371             closeChildInfoPipe();
2372         } else {
2373             server.stat_fork_time = ustime()-start;
2374             server.stat_fork_rate = (double) zmalloc_used_memory() * 1000000 / server.stat_fork_time / (1024*1024*1024); /* GB per second. */
2375             latencyAddSampleIfNeeded("fork",server.stat_fork_time/1000);
2376 
2377             serverLog(LL_NOTICE,"Background RDB transfer started by pid %d",
2378                 childpid);
2379             server.rdb_save_time_start = time(NULL);
2380             server.rdb_child_pid = childpid;
2381             server.rdb_child_type = RDB_CHILD_TYPE_SOCKET;
2382             updateDictResizePolicy();
2383         }
2384         zfree(clientids);
2385         zfree(fds);
2386         return (childpid == -1) ? C_ERR : C_OK;
2387     }
2388     return C_OK; /* Unreached. */
2389 }
2390 
/* SAVE: write the dataset to server.rdb_filename synchronously, in the
 * calling process. Refused while a background save child is running.
 * Replies +OK when rdbSave() succeeds and the shared error reply otherwise. */
void saveCommand(client *c) {
    rdbSaveInfo rsi, *rsiptr;
    int saved;

    if (server.rdb_child_pid != -1) {
        addReplyError(c,"Background save already in progress");
        return;
    }

    rsiptr = rdbPopulateSaveInfo(&rsi);
    saved = (rdbSave(server.rdb_filename,rsiptr) == C_OK);
    addReply(c, saved ? shared.ok : shared.err);
}
2404 
2405 /* BGSAVE [SCHEDULE] */
bgsaveCommand(client * c)2406 void bgsaveCommand(client *c) {
2407     int schedule = 0;
2408 
2409     /* The SCHEDULE option changes the behavior of BGSAVE when an AOF rewrite
2410      * is in progress. Instead of returning an error a BGSAVE gets scheduled. */
2411     if (c->argc > 1) {
2412         if (c->argc == 2 && !strcasecmp(c->argv[1]->ptr,"schedule")) {
2413             schedule = 1;
2414         } else {
2415             addReply(c,shared.syntaxerr);
2416             return;
2417         }
2418     }
2419 
2420     rdbSaveInfo rsi, *rsiptr;
2421     rsiptr = rdbPopulateSaveInfo(&rsi);
2422 
2423     if (server.rdb_child_pid != -1) {
2424         addReplyError(c,"Background save already in progress");
2425     } else if (server.aof_child_pid != -1) {
2426         if (schedule) {
2427             server.rdb_bgsave_scheduled = 1;
2428             addReplyStatus(c,"Background saving scheduled");
2429         } else {
2430             addReplyError(c,
2431                 "An AOF log rewriting in progress: can't BGSAVE right now. "
2432                 "Use BGSAVE SCHEDULE in order to schedule a BGSAVE whenever "
2433                 "possible.");
2434         }
2435     } else if (rdbSaveBackground(server.rdb_filename,rsiptr) == C_OK) {
2436         addReplyStatus(c,"Background saving started");
2437     } else {
2438         addReply(c,shared.err);
2439     }
2440 }
2441 
2442 /* Populate the rdbSaveInfo structure used to persist the replication
2443  * information inside the RDB file. Currently the structure explicitly
2444  * contains just the currently selected DB from the master stream, however
2445  * if the rdbSave*() family functions receive a NULL rsi structure also
2446  * the Replication ID/offset is not saved. The function popultes 'rsi'
2447  * that is normally stack-allocated in the caller, returns the populated
2448  * pointer if the instance has a valid master client, otherwise NULL
2449  * is returned, and the RDB saving will not persist any replication related
2450  * information. */
rdbPopulateSaveInfo(rdbSaveInfo * rsi)2451 rdbSaveInfo *rdbPopulateSaveInfo(rdbSaveInfo *rsi) {
2452     rdbSaveInfo rsi_init = RDB_SAVE_INFO_INIT;
2453     *rsi = rsi_init;
2454 
2455     /* If the instance is a master, we can populate the replication info
2456      * only when repl_backlog is not NULL. If the repl_backlog is NULL,
2457      * it means that the instance isn't in any replication chains. In this
2458      * scenario the replication info is useless, because when a slave
2459      * connects to us, the NULL repl_backlog will trigger a full
2460      * synchronization, at the same time we will use a new replid and clear
2461      * replid2. */
2462     if (!server.masterhost && server.repl_backlog) {
2463         /* Note that when server.slaveseldb is -1, it means that this master
2464          * didn't apply any write commands after a full synchronization.
2465          * So we can let repl_stream_db be 0, this allows a restarted slave
2466          * to reload replication ID/offset, it's safe because the next write
2467          * command must generate a SELECT statement. */
2468         rsi->repl_stream_db = server.slaveseldb == -1 ? 0 : server.slaveseldb;
2469         return rsi;
2470     }
2471 
2472     /* If the instance is a slave we need a connected master
2473      * in order to fetch the currently selected DB. */
2474     if (server.master) {
2475         rsi->repl_stream_db = server.master->db->id;
2476         return rsi;
2477     }
2478 
2479     /* If we have a cached master we can use it in order to populate the
2480      * replication selected DB info inside the RDB file: the slave can
2481      * increment the master_repl_offset only from data arriving from the
2482      * master, so if we are disconnected the offset in the cached master
2483      * is valid. */
2484     if (server.cached_master) {
2485         rsi->repl_stream_db = server.cached_master->db->id;
2486         return rsi;
2487     }
2488     return NULL;
2489 }
2490