1 /* Background I/O service for Redis. 2 * 3 * This file implements operations that we need to perform in the background. 4 * Currently there is only a single operation, that is a background close(2) 5 * system call. This is needed as when the process is the last owner of a 6 * reference to a file closing it means unlinking it, and the deletion of the 7 * file is slow, blocking the server. 8 * 9 * In the future we'll either continue implementing new things we need or 10 * we'll switch to libeio. However there are probably long term uses for this 11 * file as we may want to put here Redis specific background tasks (for instance 12 * it is not impossible that we'll need a non blocking FLUSHDB/FLUSHALL 13 * implementation). 14 * 15 * DESIGN 16 * ------ 17 * 18 * The design is trivial, we have a structure representing a job to perform 19 * and a different thread and job queue for every job type. 20 * Every thread waits for new jobs in its queue, and process every job 21 * sequentially. 22 * 23 * Jobs of the same type are guaranteed to be processed from the least 24 * recently inserted to the most recently inserted (older jobs processed 25 * first). 26 * 27 * Currently there is no way for the creator of the job to be notified about 28 * the completion of the operation, this will only be added when/if needed. 29 * 30 * ---------------------------------------------------------------------------- 31 * 32 * Copyright (c) 2009-2012, Salvatore Sanfilippo <antirez at gmail dot com> 33 * All rights reserved. 34 * 35 * Redistribution and use in source and binary forms, with or without 36 * modification, are permitted provided that the following conditions are met: 37 * 38 * * Redistributions of source code must retain the above copyright notice, 39 * this list of conditions and the following disclaimer. 40 * * Redistributions in binary form must reproduce the above copyright 41 * notice, this list of conditions and the following disclaimer in the 42 * documentation and/or other materials provided with the distribution. 43 * * Neither the name of Redis nor the names of its contributors may be used 44 * to endorse or promote products derived from this software without 45 * specific prior written permission. 46 * 47 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" 48 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE 49 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE 50 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE 51 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR 52 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF 53 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS 54 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN 55 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) 56 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE 57 * POSSIBILITY OF SUCH DAMAGE. 58 */ 59 60 61 #include "server.h" 62 #include "bio.h" 63 64 static pthread_t bio_threads[BIO_NUM_OPS]; 65 static pthread_mutex_t bio_mutex[BIO_NUM_OPS]; 66 static pthread_cond_t bio_newjob_cond[BIO_NUM_OPS]; 67 static pthread_cond_t bio_step_cond[BIO_NUM_OPS]; 68 static list *bio_jobs[BIO_NUM_OPS]; 69 /* The following array is used to hold the number of pending jobs for every 70 * OP type. This allows us to export the bioPendingJobsOfType() API that is 71 * useful when the main thread wants to perform some operation that may involve 72 * objects shared with the background thread. The main thread will just wait 73 * that there are no longer jobs of this type to be executed before performing 74 * the sensible operation. This data is also useful for reporting. */ 75 static unsigned long long bio_pending[BIO_NUM_OPS]; 76 77 /* This structure represents a background Job. It is only used locally to this 78 * file as the API does not expose the internals at all. */ 79 struct bio_job { 80 time_t time; /* Time at which the job was created. */ 81 /* Job specific arguments pointers. If we need to pass more than three 82 * arguments we can just pass a pointer to a structure or alike. */ 83 void *arg1, *arg2, *arg3; 84 }; 85 86 void *bioProcessBackgroundJobs(void *arg); 87 void lazyfreeFreeObjectFromBioThread(robj *o); 88 void lazyfreeFreeDatabaseFromBioThread(dict *ht1, dict *ht2); 89 void lazyfreeFreeSlotsMapFromBioThread(zskiplist *sl); 90 91 /* Make sure we have enough stack to perform all the things we do in the 92 * main thread. */ 93 #define REDIS_THREAD_STACK_SIZE (1024*1024*4) 94 95 /* Initialize the background system, spawning the thread. */ 96 void bioInit(void) { 97 pthread_attr_t attr; 98 pthread_t thread; 99 size_t stacksize; 100 int j; 101 102 /* Initialization of state vars and objects */ 103 for (j = 0; j < BIO_NUM_OPS; j++) { 104 pthread_mutex_init(&bio_mutex[j],NULL); 105 pthread_cond_init(&bio_newjob_cond[j],NULL); 106 pthread_cond_init(&bio_step_cond[j],NULL); 107 bio_jobs[j] = listCreate(); 108 bio_pending[j] = 0; 109 } 110 111 /* Set the stack size as by default it may be small in some system */ 112 pthread_attr_init(&attr); 113 pthread_attr_getstacksize(&attr,&stacksize); 114 if (!stacksize) stacksize = 1; /* The world is full of Solaris Fixes */ 115 while (stacksize < REDIS_THREAD_STACK_SIZE) stacksize *= 2; 116 pthread_attr_setstacksize(&attr, stacksize); 117 118 /* Ready to spawn our threads. We use the single argument the thread 119 * function accepts in order to pass the job ID the thread is 120 * responsible of. */ 121 for (j = 0; j < BIO_NUM_OPS; j++) { 122 void *arg = (void*)(unsigned long) j; 123 if (pthread_create(&thread,&attr,bioProcessBackgroundJobs,arg) != 0) { 124 serverLog(LL_WARNING,"Fatal: Can't initialize Background Jobs."); 125 exit(1); 126 } 127 bio_threads[j] = thread; 128 } 129 } 130 131 void bioCreateBackgroundJob(int type, void *arg1, void *arg2, void *arg3) { 132 struct bio_job *job = zmalloc(sizeof(*job)); 133 134 job->time = time(NULL); 135 job->arg1 = arg1; 136 job->arg2 = arg2; 137 job->arg3 = arg3; 138 pthread_mutex_lock(&bio_mutex[type]); 139 listAddNodeTail(bio_jobs[type],job); 140 bio_pending[type]++; 141 pthread_cond_signal(&bio_newjob_cond[type]); 142 pthread_mutex_unlock(&bio_mutex[type]); 143 } 144 145 void *bioProcessBackgroundJobs(void *arg) { 146 struct bio_job *job; 147 unsigned long type = (unsigned long) arg; 148 sigset_t sigset; 149 150 /* Check that the type is within the right interval. */ 151 if (type >= BIO_NUM_OPS) { 152 serverLog(LL_WARNING, 153 "Warning: bio thread started with wrong type %lu",type); 154 return NULL; 155 } 156 157 /* Make the thread killable at any time, so that bioKillThreads() 158 * can work reliably. */ 159 pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL); 160 pthread_setcanceltype(PTHREAD_CANCEL_ASYNCHRONOUS, NULL); 161 162 pthread_mutex_lock(&bio_mutex[type]); 163 /* Block SIGALRM so we are sure that only the main thread will 164 * receive the watchdog signal. */ 165 sigemptyset(&sigset); 166 sigaddset(&sigset, SIGALRM); 167 if (pthread_sigmask(SIG_BLOCK, &sigset, NULL)) 168 serverLog(LL_WARNING, 169 "Warning: can't mask SIGALRM in bio.c thread: %s", strerror(errno)); 170 171 while(1) { 172 listNode *ln; 173 174 /* The loop always starts with the lock hold. */ 175 if (listLength(bio_jobs[type]) == 0) { 176 pthread_cond_wait(&bio_newjob_cond[type],&bio_mutex[type]); 177 continue; 178 } 179 /* Pop the job from the queue. */ 180 ln = listFirst(bio_jobs[type]); 181 job = ln->value; 182 /* It is now possible to unlock the background system as we know have 183 * a stand alone job structure to process.*/ 184 pthread_mutex_unlock(&bio_mutex[type]); 185 186 /* Process the job accordingly to its type. */ 187 if (type == BIO_CLOSE_FILE) { 188 close((long)job->arg1); 189 } else if (type == BIO_AOF_FSYNC) { 190 redis_fsync((long)job->arg1); 191 } else if (type == BIO_LAZY_FREE) { 192 /* What we free changes depending on what arguments are set: 193 * arg1 -> free the object at pointer. 194 * arg2 & arg3 -> free two dictionaries (a Redis DB). 195 * only arg3 -> free the skiplist. */ 196 if (job->arg1) 197 lazyfreeFreeObjectFromBioThread(job->arg1); 198 else if (job->arg2 && job->arg3) 199 lazyfreeFreeDatabaseFromBioThread(job->arg2,job->arg3); 200 else if (job->arg3) 201 lazyfreeFreeSlotsMapFromBioThread(job->arg3); 202 } else { 203 serverPanic("Wrong job type in bioProcessBackgroundJobs()."); 204 } 205 zfree(job); 206 207 /* Lock again before reiterating the loop, if there are no longer 208 * jobs to process we'll block again in pthread_cond_wait(). */ 209 pthread_mutex_lock(&bio_mutex[type]); 210 listDelNode(bio_jobs[type],ln); 211 bio_pending[type]--; 212 213 /* Unblock threads blocked on bioWaitStepOfType() if any. */ 214 pthread_cond_broadcast(&bio_step_cond[type]); 215 } 216 } 217 218 /* Return the number of pending jobs of the specified type. */ 219 unsigned long long bioPendingJobsOfType(int type) { 220 unsigned long long val; 221 pthread_mutex_lock(&bio_mutex[type]); 222 val = bio_pending[type]; 223 pthread_mutex_unlock(&bio_mutex[type]); 224 return val; 225 } 226 227 /* If there are pending jobs for the specified type, the function blocks 228 * and waits that the next job was processed. Otherwise the function 229 * does not block and returns ASAP. 230 * 231 * The function returns the number of jobs still to process of the 232 * requested type. 233 * 234 * This function is useful when from another thread, we want to wait 235 * a bio.c thread to do more work in a blocking way. 236 */ 237 unsigned long long bioWaitStepOfType(int type) { 238 unsigned long long val; 239 pthread_mutex_lock(&bio_mutex[type]); 240 val = bio_pending[type]; 241 if (val != 0) { 242 pthread_cond_wait(&bio_step_cond[type],&bio_mutex[type]); 243 val = bio_pending[type]; 244 } 245 pthread_mutex_unlock(&bio_mutex[type]); 246 return val; 247 } 248 249 /* Kill the running bio threads in an unclean way. This function should be 250 * used only when it's critical to stop the threads for some reason. 251 * Currently Redis does this only on crash (for instance on SIGSEGV) in order 252 * to perform a fast memory check without other threads messing with memory. */ 253 void bioKillThreads(void) { 254 int err, j; 255 256 for (j = 0; j < BIO_NUM_OPS; j++) { 257 if (pthread_cancel(bio_threads[j]) == 0) { 258 if ((err = pthread_join(bio_threads[j],NULL)) != 0) { 259 serverLog(LL_WARNING, 260 "Bio thread for job type #%d can be joined: %s", 261 j, strerror(err)); 262 } else { 263 serverLog(LL_WARNING, 264 "Bio thread for job type #%d terminated",j); 265 } 266 } 267 } 268 } 269