1 #include "server.h"
2 #include "bio.h"
3 #include "atomicvar.h"
4 #include "cluster.h"
5
/* Number of objects queued for lazy freeing and not yet released.
 * In the code visible here it is only touched through the
 * atomicGet/atomicIncr/atomicDecr macros. */
static size_t lazyfree_objects = 0;
/* NOTE(review): never referenced directly in this file; presumably the
 * atomicvar.h macros use it as the fallback lock protecting
 * lazyfree_objects when native atomics are unavailable -- confirm against
 * atomicvar.h before renaming or removing it. */
pthread_mutex_t lazyfree_objects_mutex = PTHREAD_MUTEX_INITIALIZER;
8
9 /* Return the number of currently pending objects to free. */
lazyfreeGetPendingObjectsCount(void)10 size_t lazyfreeGetPendingObjectsCount(void) {
11 size_t aux;
12 atomicGet(lazyfree_objects,aux);
13 return aux;
14 }
15
16 /* Return the amount of work needed in order to free an object.
17 * The return value is not always the actual number of allocations the
18 * object is compoesd of, but a number proportional to it.
19 *
20 * For strings the function always returns 1.
21 *
22 * For aggregated objects represented by hash tables or other data structures
23 * the function just returns the number of elements the object is composed of.
24 *
25 * Objects composed of single allocations are always reported as having a
26 * single item even if they are actually logical composed of multiple
27 * elements.
28 *
29 * For lists the function returns the number of elements in the quicklist
30 * representing the list. */
lazyfreeGetFreeEffort(robj * obj)31 size_t lazyfreeGetFreeEffort(robj *obj) {
32 if (obj->type == OBJ_LIST) {
33 quicklist *ql = obj->ptr;
34 return ql->len;
35 } else if (obj->type == OBJ_SET && obj->encoding == OBJ_ENCODING_HT) {
36 dict *ht = obj->ptr;
37 return dictSize(ht);
38 } else if (obj->type == OBJ_ZSET && obj->encoding == OBJ_ENCODING_SKIPLIST){
39 zset *zs = obj->ptr;
40 return zs->zsl->length;
41 } else if (obj->type == OBJ_HASH && obj->encoding == OBJ_ENCODING_HT) {
42 dict *ht = obj->ptr;
43 return dictSize(ht);
44 } else {
45 return 1; /* Everything else is a single allocation. */
46 }
47 }
48
49 /* Delete a key, value, and associated expiration entry if any, from the DB.
50 * If there are enough allocations to free the value object may be put into
51 * a lazy free list instead of being freed synchronously. The lazy free list
52 * will be reclaimed in a different bio.c thread. */
53 #define LAZYFREE_THRESHOLD 64
dbAsyncDelete(redisDb * db,robj * key)54 int dbAsyncDelete(redisDb *db, robj *key) {
55 /* Deleting an entry from the expires dict will not free the sds of
56 * the key, because it is shared with the main dictionary. */
57 if (dictSize(db->expires) > 0) dictDelete(db->expires,key->ptr);
58
59 /* If the value is composed of a few allocations, to free in a lazy way
60 * is actually just slower... So under a certain limit we just free
61 * the object synchronously. */
62 dictEntry *de = dictUnlink(db->dict,key->ptr);
63 if (de) {
64 robj *val = dictGetVal(de);
65 size_t free_effort = lazyfreeGetFreeEffort(val);
66
67 /* If releasing the object is too much work, do it in the background
68 * by adding the object to the lazy free list.
69 * Note that if the object is shared, to reclaim it now it is not
70 * possible. This rarely happens, however sometimes the implementation
71 * of parts of the Redis core may call incrRefCount() to protect
72 * objects, and then call dbDelete(). In this case we'll fall
73 * through and reach the dictFreeUnlinkedEntry() call, that will be
74 * equivalent to just calling decrRefCount(). */
75 if (free_effort > LAZYFREE_THRESHOLD && val->refcount == 1) {
76 atomicIncr(lazyfree_objects,1);
77 bioCreateBackgroundJob(BIO_LAZY_FREE,val,NULL,NULL);
78 dictSetVal(db->dict,de,NULL);
79 }
80 }
81
82 /* Release the key-val pair, or just the key if we set the val
83 * field to NULL in order to lazy free it later. */
84 if (de) {
85 dictFreeUnlinkedEntry(db->dict,de);
86 if (server.cluster_enabled) slotToKeyDel(key);
87 return 1;
88 } else {
89 return 0;
90 }
91 }
92
93 /* Free an object, if the object is huge enough, free it in async way. */
freeObjAsync(robj * o)94 void freeObjAsync(robj *o) {
95 size_t free_effort = lazyfreeGetFreeEffort(o);
96 if (free_effort > LAZYFREE_THRESHOLD && o->refcount == 1) {
97 atomicIncr(lazyfree_objects,1);
98 bioCreateBackgroundJob(BIO_LAZY_FREE,o,NULL,NULL);
99 } else {
100 decrRefCount(o);
101 }
102 }
103
104 /* Empty a Redis DB asynchronously. What the function does actually is to
105 * create a new empty set of hash tables and scheduling the old ones for
106 * lazy freeing. */
emptyDbAsync(redisDb * db)107 void emptyDbAsync(redisDb *db) {
108 dict *oldht1 = db->dict, *oldht2 = db->expires;
109 db->dict = dictCreate(&dbDictType,NULL);
110 db->expires = dictCreate(&keyptrDictType,NULL);
111 atomicIncr(lazyfree_objects,dictSize(oldht1));
112 bioCreateBackgroundJob(BIO_LAZY_FREE,NULL,oldht1,oldht2);
113 }
114
115 /* Empty the slots-keys map of Redis CLuster by creating a new empty one
116 * and scheduiling the old for lazy freeing. */
slotToKeyFlushAsync(void)117 void slotToKeyFlushAsync(void) {
118 rax *old = server.cluster->slots_to_keys;
119
120 server.cluster->slots_to_keys = raxNew();
121 memset(server.cluster->slots_keys_count,0,
122 sizeof(server.cluster->slots_keys_count));
123 atomicIncr(lazyfree_objects,old->numele);
124 bioCreateBackgroundJob(BIO_LAZY_FREE,NULL,NULL,old);
125 }
126
/* Release an object from the lazyfree thread.
 *
 * This is just decrRefCount() followed by a decrement of the
 * pending-objects counter by one, mirroring the atomicIncr(...,1) done by
 * dbAsyncDelete() and freeObjAsync() when they queue an object. */
void lazyfreeFreeObjectFromBioThread(robj *o) {
    decrRefCount(o);
    /* Lower the counter only after the object has been released. */
    atomicDecr(lazyfree_objects,1);
}
133
134 /* Release a database from the lazyfree thread. The 'db' pointer is the
135 * database which was substitutied with a fresh one in the main thread
136 * when the database was logically deleted. 'sl' is a skiplist used by
137 * Redis Cluster in order to take the hash slots -> keys mapping. This
138 * may be NULL if Redis Cluster is disabled. */
lazyfreeFreeDatabaseFromBioThread(dict * ht1,dict * ht2)139 void lazyfreeFreeDatabaseFromBioThread(dict *ht1, dict *ht2) {
140 size_t numkeys = dictSize(ht1);
141 dictRelease(ht1);
142 dictRelease(ht2);
143 atomicDecr(lazyfree_objects,numkeys);
144 }
145
146 /* Release the skiplist mapping Redis Cluster keys to slots in the
147 * lazyfree thread. */
lazyfreeFreeSlotsMapFromBioThread(rax * rt)148 void lazyfreeFreeSlotsMapFromBioThread(rax *rt) {
149 size_t len = rt->numele;
150 raxFree(rt);
151 atomicDecr(lazyfree_objects,len);
152 }
153