1 /* A simple event-driven programming library. Originally I wrote this code 2 * for the Jim's event-loop (Jim is a Tcl interpreter) but later translated 3 * it in form of a library for easy reuse. 4 * 5 * Copyright (c) 2006-2010, Salvatore Sanfilippo <antirez at gmail dot com> 6 * All rights reserved. 7 * 8 * Redistribution and use in source and binary forms, with or without 9 * modification, are permitted provided that the following conditions are met: 10 * 11 * * Redistributions of source code must retain the above copyright notice, 12 * this list of conditions and the following disclaimer. 13 * * Redistributions in binary form must reproduce the above copyright 14 * notice, this list of conditions and the following disclaimer in the 15 * documentation and/or other materials provided with the distribution. 16 * * Neither the name of Redis nor the names of its contributors may be used 17 * to endorse or promote products derived from this software without 18 * specific prior written permission. 19 * 20 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" 21 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE 22 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE 23 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE 24 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR 25 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF 26 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS 27 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN 28 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) 29 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE 30 * POSSIBILITY OF SUCH DAMAGE. 31 */ 32 33 #include <stdio.h> 34 #include <sys/time.h> 35 #include <sys/types.h> 36 #include <unistd.h> 37 #include <stdlib.h> 38 #include <poll.h> 39 #include <string.h> 40 #include <time.h> 41 #include <errno.h> 42 43 #include "ae.h" 44 #include "zmalloc.h" 45 #include "config.h" 46 47 /* Include the best multiplexing layer supported by this system. 48 * The following should be ordered by performances, descending. */ 49 #ifdef HAVE_FF_KQUEUE 50 #include "ae_ff_kqueue.c" 51 #else 52 #ifdef HAVE_EVPORT 53 #include "ae_evport.c" 54 #else 55 #ifdef HAVE_EPOLL 56 #include "ae_epoll.c" 57 #else 58 #ifdef HAVE_KQUEUE 59 #include "ae_kqueue.c" 60 #else 61 #include "ae_select.c" 62 #endif 63 #endif 64 #endif 65 #endif 66 67 aeEventLoop *aeCreateEventLoop(int setsize) { 68 aeEventLoop *eventLoop; 69 int i; 70 71 if ((eventLoop = zmalloc(sizeof(*eventLoop))) == NULL) goto err; 72 eventLoop->events = zmalloc(sizeof(aeFileEvent)*setsize); 73 eventLoop->fired = zmalloc(sizeof(aeFiredEvent)*setsize); 74 if (eventLoop->events == NULL || eventLoop->fired == NULL) goto err; 75 eventLoop->setsize = setsize; 76 eventLoop->lastTime = time(NULL); 77 eventLoop->timeEventHead = NULL; 78 eventLoop->timeEventNextId = 0; 79 eventLoop->stop = 0; 80 eventLoop->maxfd = -1; 81 eventLoop->beforesleep = NULL; 82 eventLoop->aftersleep = NULL; 83 if (aeApiCreate(eventLoop) == -1) goto err; 84 /* Events with mask == AE_NONE are not set. So let's initialize the 85 * vector with it. */ 86 for (i = 0; i < setsize; i++) 87 eventLoop->events[i].mask = AE_NONE; 88 return eventLoop; 89 90 err: 91 if (eventLoop) { 92 zfree(eventLoop->events); 93 zfree(eventLoop->fired); 94 zfree(eventLoop); 95 } 96 return NULL; 97 } 98 99 /* Return the current set size. */ 100 int aeGetSetSize(aeEventLoop *eventLoop) { 101 return eventLoop->setsize; 102 } 103 104 /* Resize the maximum set size of the event loop. 105 * If the requested set size is smaller than the current set size, but 106 * there is already a file descriptor in use that is >= the requested 107 * set size minus one, AE_ERR is returned and the operation is not 108 * performed at all. 109 * 110 * Otherwise AE_OK is returned and the operation is successful. */ 111 int aeResizeSetSize(aeEventLoop *eventLoop, int setsize) { 112 int i; 113 114 if (setsize == eventLoop->setsize) return AE_OK; 115 if (eventLoop->maxfd >= setsize) return AE_ERR; 116 if (aeApiResize(eventLoop,setsize) == -1) return AE_ERR; 117 118 eventLoop->events = zrealloc(eventLoop->events,sizeof(aeFileEvent)*setsize); 119 eventLoop->fired = zrealloc(eventLoop->fired,sizeof(aeFiredEvent)*setsize); 120 eventLoop->setsize = setsize; 121 122 /* Make sure that if we created new slots, they are initialized with 123 * an AE_NONE mask. */ 124 for (i = eventLoop->maxfd+1; i < setsize; i++) 125 eventLoop->events[i].mask = AE_NONE; 126 return AE_OK; 127 } 128 129 void aeDeleteEventLoop(aeEventLoop *eventLoop) { 130 aeApiFree(eventLoop); 131 zfree(eventLoop->events); 132 zfree(eventLoop->fired); 133 zfree(eventLoop); 134 } 135 136 void aeStop(aeEventLoop *eventLoop) { 137 eventLoop->stop = 1; 138 } 139 140 int aeCreateFileEvent(aeEventLoop *eventLoop, int fd, int mask, 141 aeFileProc *proc, void *clientData) 142 { 143 if (fd >= eventLoop->setsize) { 144 errno = ERANGE; 145 return AE_ERR; 146 } 147 aeFileEvent *fe = &eventLoop->events[fd]; 148 149 if (aeApiAddEvent(eventLoop, fd, mask) == -1) 150 return AE_ERR; 151 fe->mask |= mask; 152 if (mask & AE_READABLE) fe->rfileProc = proc; 153 if (mask & AE_WRITABLE) fe->wfileProc = proc; 154 fe->clientData = clientData; 155 if (fd > eventLoop->maxfd) 156 eventLoop->maxfd = fd; 157 return AE_OK; 158 } 159 160 void aeDeleteFileEvent(aeEventLoop *eventLoop, int fd, int mask) 161 { 162 if (fd >= eventLoop->setsize) return; 163 aeFileEvent *fe = &eventLoop->events[fd]; 164 if (fe->mask == AE_NONE) return; 165 166 /* We want to always remove AE_BARRIER if set when AE_WRITABLE 167 * is removed. */ 168 if (mask & AE_WRITABLE) mask |= AE_BARRIER; 169 170 aeApiDelEvent(eventLoop, fd, mask); 171 fe->mask = fe->mask & (~mask); 172 if (fd == eventLoop->maxfd && fe->mask == AE_NONE) { 173 /* Update the max fd */ 174 int j; 175 176 for (j = eventLoop->maxfd-1; j >= 0; j--) 177 if (eventLoop->events[j].mask != AE_NONE) break; 178 eventLoop->maxfd = j; 179 } 180 } 181 182 int aeGetFileEvents(aeEventLoop *eventLoop, int fd) { 183 if (fd >= eventLoop->setsize) return 0; 184 aeFileEvent *fe = &eventLoop->events[fd]; 185 186 return fe->mask; 187 } 188 189 static void aeGetTime(long *seconds, long *milliseconds) 190 { 191 struct timeval tv; 192 193 #ifdef HAVE_FF_KQUEUE 194 ff_gettimeofday(&tv, NULL); 195 #else 196 gettimeofday(&tv, NULL); 197 #endif 198 *seconds = tv.tv_sec; 199 *milliseconds = tv.tv_usec/1000; 200 } 201 202 static void aeAddMillisecondsToNow(long long milliseconds, long *sec, long *ms) { 203 long cur_sec, cur_ms, when_sec, when_ms; 204 205 aeGetTime(&cur_sec, &cur_ms); 206 when_sec = cur_sec + milliseconds/1000; 207 when_ms = cur_ms + milliseconds%1000; 208 if (when_ms >= 1000) { 209 when_sec ++; 210 when_ms -= 1000; 211 } 212 *sec = when_sec; 213 *ms = when_ms; 214 } 215 216 long long aeCreateTimeEvent(aeEventLoop *eventLoop, long long milliseconds, 217 aeTimeProc *proc, void *clientData, 218 aeEventFinalizerProc *finalizerProc) 219 { 220 long long id = eventLoop->timeEventNextId++; 221 aeTimeEvent *te; 222 223 te = zmalloc(sizeof(*te)); 224 if (te == NULL) return AE_ERR; 225 te->id = id; 226 aeAddMillisecondsToNow(milliseconds,&te->when_sec,&te->when_ms); 227 te->timeProc = proc; 228 te->finalizerProc = finalizerProc; 229 te->clientData = clientData; 230 te->prev = NULL; 231 te->next = eventLoop->timeEventHead; 232 if (te->next) 233 te->next->prev = te; 234 eventLoop->timeEventHead = te; 235 return id; 236 } 237 238 int aeDeleteTimeEvent(aeEventLoop *eventLoop, long long id) 239 { 240 aeTimeEvent *te = eventLoop->timeEventHead; 241 while(te) { 242 if (te->id == id) { 243 te->id = AE_DELETED_EVENT_ID; 244 return AE_OK; 245 } 246 te = te->next; 247 } 248 return AE_ERR; /* NO event with the specified ID found */ 249 } 250 251 /* Search the first timer to fire. 252 * This operation is useful to know how many time the select can be 253 * put in sleep without to delay any event. 254 * If there are no timers NULL is returned. 255 * 256 * Note that's O(N) since time events are unsorted. 257 * Possible optimizations (not needed by Redis so far, but...): 258 * 1) Insert the event in order, so that the nearest is just the head. 259 * Much better but still insertion or deletion of timers is O(N). 260 * 2) Use a skiplist to have this operation as O(1) and insertion as O(log(N)). 261 */ 262 static aeTimeEvent *aeSearchNearestTimer(aeEventLoop *eventLoop) 263 { 264 aeTimeEvent *te = eventLoop->timeEventHead; 265 aeTimeEvent *nearest = NULL; 266 267 while(te) { 268 if (!nearest || te->when_sec < nearest->when_sec || 269 (te->when_sec == nearest->when_sec && 270 te->when_ms < nearest->when_ms)) 271 nearest = te; 272 te = te->next; 273 } 274 return nearest; 275 } 276 277 /* Process time events */ 278 static int processTimeEvents(aeEventLoop *eventLoop) { 279 int processed = 0; 280 aeTimeEvent *te; 281 long long maxId; 282 time_t now = time(NULL); 283 284 /* If the system clock is moved to the future, and then set back to the 285 * right value, time events may be delayed in a random way. Often this 286 * means that scheduled operations will not be performed soon enough. 287 * 288 * Here we try to detect system clock skews, and force all the time 289 * events to be processed ASAP when this happens: the idea is that 290 * processing events earlier is less dangerous than delaying them 291 * indefinitely, and practice suggests it is. */ 292 if (now < eventLoop->lastTime) { 293 te = eventLoop->timeEventHead; 294 while(te) { 295 te->when_sec = 0; 296 te = te->next; 297 } 298 } 299 eventLoop->lastTime = now; 300 301 te = eventLoop->timeEventHead; 302 maxId = eventLoop->timeEventNextId-1; 303 while(te) { 304 long now_sec, now_ms; 305 long long id; 306 307 /* Remove events scheduled for deletion. */ 308 if (te->id == AE_DELETED_EVENT_ID) { 309 aeTimeEvent *next = te->next; 310 if (te->prev) 311 te->prev->next = te->next; 312 else 313 eventLoop->timeEventHead = te->next; 314 if (te->next) 315 te->next->prev = te->prev; 316 if (te->finalizerProc) 317 te->finalizerProc(eventLoop, te->clientData); 318 zfree(te); 319 te = next; 320 continue; 321 } 322 323 /* Make sure we don't process time events created by time events in 324 * this iteration. Note that this check is currently useless: we always 325 * add new timers on the head, however if we change the implementation 326 * detail, this check may be useful again: we keep it here for future 327 * defense. */ 328 if (te->id > maxId) { 329 te = te->next; 330 continue; 331 } 332 aeGetTime(&now_sec, &now_ms); 333 if (now_sec > te->when_sec || 334 (now_sec == te->when_sec && now_ms >= te->when_ms)) 335 { 336 int retval; 337 338 id = te->id; 339 retval = te->timeProc(eventLoop, id, te->clientData); 340 processed++; 341 if (retval != AE_NOMORE) { 342 aeAddMillisecondsToNow(retval,&te->when_sec,&te->when_ms); 343 } else { 344 te->id = AE_DELETED_EVENT_ID; 345 } 346 } 347 te = te->next; 348 } 349 return processed; 350 } 351 352 /* Process every pending time event, then every pending file event 353 * (that may be registered by time event callbacks just processed). 354 * Without special flags the function sleeps until some file event 355 * fires, or when the next time event occurs (if any). 356 * 357 * If flags is 0, the function does nothing and returns. 358 * if flags has AE_ALL_EVENTS set, all the kind of events are processed. 359 * if flags has AE_FILE_EVENTS set, file events are processed. 360 * if flags has AE_TIME_EVENTS set, time events are processed. 361 * if flags has AE_DONT_WAIT set the function returns ASAP until all 362 * if flags has AE_CALL_AFTER_SLEEP set, the aftersleep callback is called. 363 * the events that's possible to process without to wait are processed. 364 * 365 * The function returns the number of events processed. */ 366 int aeProcessEvents(aeEventLoop *eventLoop, int flags) 367 { 368 int processed = 0, numevents; 369 370 /* Nothing to do? return ASAP */ 371 if (!(flags & AE_TIME_EVENTS) && !(flags & AE_FILE_EVENTS)) return 0; 372 373 /* Note that we want call select() even if there are no 374 * file events to process as long as we want to process time 375 * events, in order to sleep until the next time event is ready 376 * to fire. */ 377 if (eventLoop->maxfd != -1 || 378 ((flags & AE_TIME_EVENTS) && !(flags & AE_DONT_WAIT))) { 379 int j; 380 aeTimeEvent *shortest = NULL; 381 struct timeval tv, *tvp; 382 383 if (flags & AE_TIME_EVENTS && !(flags & AE_DONT_WAIT)) 384 shortest = aeSearchNearestTimer(eventLoop); 385 if (shortest) { 386 long now_sec, now_ms; 387 388 aeGetTime(&now_sec, &now_ms); 389 tvp = &tv; 390 391 /* How many milliseconds we need to wait for the next 392 * time event to fire? */ 393 long long ms = 394 (shortest->when_sec - now_sec)*1000 + 395 shortest->when_ms - now_ms; 396 397 if (ms > 0) { 398 tvp->tv_sec = ms/1000; 399 tvp->tv_usec = (ms % 1000)*1000; 400 } else { 401 tvp->tv_sec = 0; 402 tvp->tv_usec = 0; 403 } 404 } else { 405 /* If we have to check for events but need to return 406 * ASAP because of AE_DONT_WAIT we need to set the timeout 407 * to zero */ 408 if (flags & AE_DONT_WAIT) { 409 tv.tv_sec = tv.tv_usec = 0; 410 tvp = &tv; 411 } else { 412 /* Otherwise we can block */ 413 tvp = NULL; /* wait forever */ 414 } 415 } 416 417 /* Call the multiplexing API, will return only on timeout or when 418 * some event fires. */ 419 numevents = aeApiPoll(eventLoop, tvp); 420 421 /* After sleep callback. */ 422 if (eventLoop->aftersleep != NULL && flags & AE_CALL_AFTER_SLEEP) 423 eventLoop->aftersleep(eventLoop); 424 425 for (j = 0; j < numevents; j++) { 426 aeFileEvent *fe = &eventLoop->events[eventLoop->fired[j].fd]; 427 int mask = eventLoop->fired[j].mask; 428 int fd = eventLoop->fired[j].fd; 429 int fired = 0; /* Number of events fired for current fd. */ 430 431 /* Normally we execute the readable event first, and the writable 432 * event laster. This is useful as sometimes we may be able 433 * to serve the reply of a query immediately after processing the 434 * query. 435 * 436 * However if AE_BARRIER is set in the mask, our application is 437 * asking us to do the reverse: never fire the writable event 438 * after the readable. In such a case, we invert the calls. 439 * This is useful when, for instance, we want to do things 440 * in the beforeSleep() hook, like fsynching a file to disk, 441 * before replying to a client. */ 442 int invert = fe->mask & AE_BARRIER; 443 444 /* Note the "fe->mask & mask & ..." code: maybe an already 445 * processed event removed an element that fired and we still 446 * didn't processed, so we check if the event is still valid. 447 * 448 * Fire the readable event if the call sequence is not 449 * inverted. */ 450 if (!invert && fe->mask & mask & AE_READABLE) { 451 fe->rfileProc(eventLoop,fd,fe->clientData,mask); 452 fired++; 453 } 454 455 /* Fire the writable event. */ 456 if (fe->mask & mask & AE_WRITABLE) { 457 if (!fired || fe->wfileProc != fe->rfileProc) { 458 fe->wfileProc(eventLoop,fd,fe->clientData,mask); 459 fired++; 460 } 461 } 462 463 /* If we have to invert the call, fire the readable event now 464 * after the writable one. */ 465 if (invert && fe->mask & mask & AE_READABLE) { 466 if (!fired || fe->wfileProc != fe->rfileProc) { 467 fe->rfileProc(eventLoop,fd,fe->clientData,mask); 468 fired++; 469 } 470 } 471 472 processed++; 473 } 474 } 475 /* Check time events */ 476 if (flags & AE_TIME_EVENTS) 477 processed += processTimeEvents(eventLoop); 478 479 return processed; /* return the number of processed file/time events */ 480 } 481 482 /* Wait for milliseconds until the given file descriptor becomes 483 * writable/readable/exception */ 484 int aeWait(int fd, int mask, long long milliseconds) { 485 struct pollfd pfd; 486 int retmask = 0, retval; 487 488 memset(&pfd, 0, sizeof(pfd)); 489 pfd.fd = fd; 490 if (mask & AE_READABLE) pfd.events |= POLLIN; 491 if (mask & AE_WRITABLE) pfd.events |= POLLOUT; 492 493 if ((retval = poll(&pfd, 1, milliseconds))== 1) { 494 if (pfd.revents & POLLIN) retmask |= AE_READABLE; 495 if (pfd.revents & POLLOUT) retmask |= AE_WRITABLE; 496 if (pfd.revents & POLLERR) retmask |= AE_WRITABLE; 497 if (pfd.revents & POLLHUP) retmask |= AE_WRITABLE; 498 return retmask; 499 } else { 500 return retval; 501 } 502 } 503 504 void aeMain(aeEventLoop *eventLoop) { 505 if (eventLoop->beforesleep != NULL) 506 eventLoop->beforesleep(eventLoop); 507 aeProcessEvents(eventLoop, AE_ALL_EVENTS|AE_CALL_AFTER_SLEEP); 508 } 509 510 char *aeGetApiName(void) { 511 return aeApiName(); 512 } 513 514 void aeSetBeforeSleepProc(aeEventLoop *eventLoop, aeBeforeSleepProc *beforesleep) { 515 eventLoop->beforesleep = beforesleep; 516 } 517 518 void aeSetAfterSleepProc(aeEventLoop *eventLoop, aeBeforeSleepProc *aftersleep) { 519 eventLoop->aftersleep = aftersleep; 520 } 521