1 /* A simple event-driven programming library. Originally I wrote this code
2 * for the Jim's event-loop (Jim is a Tcl interpreter) but later translated
3 * it in form of a library for easy reuse.
4 *
5 * Copyright (c) 2006-2010, Salvatore Sanfilippo <antirez at gmail dot com>
6 * All rights reserved.
7 *
8 * Redistribution and use in source and binary forms, with or without
9 * modification, are permitted provided that the following conditions are met:
10 *
11 * * Redistributions of source code must retain the above copyright notice,
12 * this list of conditions and the following disclaimer.
13 * * Redistributions in binary form must reproduce the above copyright
14 * notice, this list of conditions and the following disclaimer in the
15 * documentation and/or other materials provided with the distribution.
16 * * Neither the name of Redis nor the names of its contributors may be used
17 * to endorse or promote products derived from this software without
18 * specific prior written permission.
19 *
20 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
21 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
22 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
23 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
24 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
25 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
26 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
27 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
28 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
29 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
30 * POSSIBILITY OF SUCH DAMAGE.
31 */
32
33 #include <stdio.h>
34 #include <sys/time.h>
35 #include <sys/types.h>
36 #include <unistd.h>
37 #include <stdlib.h>
38 #include <poll.h>
39 #include <string.h>
40 #include <time.h>
41 #include <errno.h>
42
43 #include "ae.h"
44 #include "zmalloc.h"
45 #include "config.h"
46
47 /* Include the best multiplexing layer supported by this system.
48 * The following should be ordered by performances, descending. */
49 #ifdef HAVE_FF_KQUEUE
50 #include "ae_ff_kqueue.c"
51 #else
52 #ifdef HAVE_EVPORT
53 #include "ae_evport.c"
54 #else
55 #ifdef HAVE_EPOLL
56 #include "ae_epoll.c"
57 #else
58 #ifdef HAVE_KQUEUE
59 #include "ae_kqueue.c"
60 #else
61 #include "ae_select.c"
62 #endif
63 #endif
64 #endif
65 #endif
66
aeCreateEventLoop(int setsize)67 aeEventLoop *aeCreateEventLoop(int setsize) {
68 aeEventLoop *eventLoop;
69 int i;
70
71 if ((eventLoop = zmalloc(sizeof(*eventLoop))) == NULL) goto err;
72 eventLoop->events = zmalloc(sizeof(aeFileEvent)*setsize);
73 eventLoop->fired = zmalloc(sizeof(aeFiredEvent)*setsize);
74 if (eventLoop->events == NULL || eventLoop->fired == NULL) goto err;
75 eventLoop->setsize = setsize;
76 eventLoop->lastTime = time(NULL);
77 eventLoop->timeEventHead = NULL;
78 eventLoop->timeEventNextId = 0;
79 eventLoop->stop = 0;
80 eventLoop->maxfd = -1;
81 eventLoop->beforesleep = NULL;
82 eventLoop->aftersleep = NULL;
83 if (aeApiCreate(eventLoop) == -1) goto err;
84 /* Events with mask == AE_NONE are not set. So let's initialize the
85 * vector with it. */
86 for (i = 0; i < setsize; i++)
87 eventLoop->events[i].mask = AE_NONE;
88 return eventLoop;
89
90 err:
91 if (eventLoop) {
92 zfree(eventLoop->events);
93 zfree(eventLoop->fired);
94 zfree(eventLoop);
95 }
96 return NULL;
97 }
98
99 /* Return the current set size. */
aeGetSetSize(aeEventLoop * eventLoop)100 int aeGetSetSize(aeEventLoop *eventLoop) {
101 return eventLoop->setsize;
102 }
103
104 /* Resize the maximum set size of the event loop.
105 * If the requested set size is smaller than the current set size, but
106 * there is already a file descriptor in use that is >= the requested
107 * set size minus one, AE_ERR is returned and the operation is not
108 * performed at all.
109 *
110 * Otherwise AE_OK is returned and the operation is successful. */
aeResizeSetSize(aeEventLoop * eventLoop,int setsize)111 int aeResizeSetSize(aeEventLoop *eventLoop, int setsize) {
112 int i;
113
114 if (setsize == eventLoop->setsize) return AE_OK;
115 if (eventLoop->maxfd >= setsize) return AE_ERR;
116 if (aeApiResize(eventLoop,setsize) == -1) return AE_ERR;
117
118 eventLoop->events = zrealloc(eventLoop->events,sizeof(aeFileEvent)*setsize);
119 eventLoop->fired = zrealloc(eventLoop->fired,sizeof(aeFiredEvent)*setsize);
120 eventLoop->setsize = setsize;
121
122 /* Make sure that if we created new slots, they are initialized with
123 * an AE_NONE mask. */
124 for (i = eventLoop->maxfd+1; i < setsize; i++)
125 eventLoop->events[i].mask = AE_NONE;
126 return AE_OK;
127 }
128
/* Release an event loop and everything it owns: the multiplexing backend
 * state (via aeApiFree), the events and fired arrays, every time event
 * node still linked from timeEventHead, and finally the loop itself.
 *
 * Time event finalizers are NOT invoked here: only the list nodes are
 * freed, so callers that relied on finalizers not running at teardown
 * keep the same behavior. 'eventLoop' must not be used afterwards. */
void aeDeleteEventLoop(aeEventLoop *eventLoop) {
    aeTimeEvent *te, *next_te;

    aeApiFree(eventLoop);
    zfree(eventLoop->events);
    zfree(eventLoop->fired);

    /* Free the time events list, otherwise every pending (or lazily
     * deleted but not yet reaped) timer node would be leaked. */
    te = eventLoop->timeEventHead;
    while (te) {
        next_te = te->next; /* Save the link before freeing the node. */
        zfree(te);
        te = next_te;
    }
    eventLoop->timeEventHead = NULL;

    zfree(eventLoop);
}
135
/* Raise the stop flag of the event loop by setting it to 1.
 * NOTE(review): nothing in the visible part of this file reads the flag;
 * presumably whoever drives the loop checks it -- confirm against the
 * caller. */
void aeStop(aeEventLoop *eventLoop)
{
    eventLoop->stop = 1;
}
139
/* Register 'proc' as the handler for the events in 'mask' (AE_READABLE
 * and/or AE_WRITABLE) on file descriptor 'fd'. 'clientData' is stored in
 * the slot and replaces any value stored by a previous registration.
 *
 * Returns AE_OK on success. Returns AE_ERR with errno set to ERANGE when
 * 'fd' is negative or not smaller than the set size, and AE_ERR when
 * aeApiAddEvent() fails (the slot is left untouched in both cases). */
int aeCreateFileEvent(aeEventLoop *eventLoop, int fd, int mask,
        aeFileProc *proc, void *clientData)
{
    /* The fd is used as an index into events[]: reject anything outside
     * [0, setsize), including negative descriptors. */
    if (fd < 0 || fd >= eventLoop->setsize) {
        errno = ERANGE;
        return AE_ERR;
    }
    aeFileEvent *fe = &eventLoop->events[fd];

    if (aeApiAddEvent(eventLoop, fd, mask) == -1)
        return AE_ERR;
    fe->mask |= mask;
    if (mask & AE_READABLE) fe->rfileProc = proc;
    if (mask & AE_WRITABLE) fe->wfileProc = proc;
    fe->clientData = clientData;
    /* Track the highest registered descriptor. */
    if (fd > eventLoop->maxfd)
        eventLoop->maxfd = fd;
    return AE_OK;
}
159
/* Stop monitoring the events in 'mask' on file descriptor 'fd'.
 *
 * The call is a no-op when 'fd' is negative, when it is not smaller than
 * the set size, or when the slot has no event registered. When the last
 * event of the highest registered descriptor is removed, maxfd is
 * recomputed (it becomes -1 if no slot is in use). */
void aeDeleteFileEvent(aeEventLoop *eventLoop, int fd, int mask)
{
    /* The fd indexes events[]: anything outside [0, setsize) is ignored,
     * including negative descriptors. */
    if (fd < 0 || fd >= eventLoop->setsize) return;
    aeFileEvent *fe = &eventLoop->events[fd];
    if (fe->mask == AE_NONE) return;

    /* We want to always remove AE_BARRIER if set when AE_WRITABLE
     * is removed. */
    if (mask & AE_WRITABLE) mask |= AE_BARRIER;

    aeApiDelEvent(eventLoop, fd, mask);
    fe->mask = fe->mask & (~mask);
    if (fd == eventLoop->maxfd && fe->mask == AE_NONE) {
        /* Update the max fd: scan downwards for the first slot that is
         * still in use. The loop leaves j == -1 when none is found. */
        int j;

        for (j = eventLoop->maxfd-1; j >= 0; j--)
            if (eventLoop->events[j].mask != AE_NONE) break;
        eventLoop->maxfd = j;
    }
}
181
/* Return the event mask currently registered for file descriptor 'fd'.
 * 0 is returned when 'fd' is negative or not smaller than the set size,
 * so the events array is never indexed out of bounds. */
int aeGetFileEvents(aeEventLoop *eventLoop, int fd) {
    if (fd < 0 || fd >= eventLoop->setsize) return 0;
    aeFileEvent *fe = &eventLoop->events[fd];

    return fe->mask;
}
188
/* Read the current time of day and store it split in two parts:
 * '*seconds' receives the whole seconds, '*milliseconds' the sub-second
 * part converted from microseconds to milliseconds (integer division).
 * When HAVE_FF_KQUEUE is defined the time is read with
 * ff_gettimeofday() instead of gettimeofday(). */
static void aeGetTime(long *seconds, long *milliseconds)
{
    struct timeval now;

#ifdef HAVE_FF_KQUEUE
    ff_gettimeofday(&now, NULL);
#else
    gettimeofday(&now, NULL);
#endif

    *seconds = now.tv_sec;
    *milliseconds = now.tv_usec/1000;
}
201
/* Compute the point in time that is 'milliseconds' after the current
 * time (as returned by aeGetTime) and store it in '*sec' / '*ms'.
 * A single carry from the milliseconds part into the seconds part is
 * applied when the sum of the millisecond parts reaches 1000. */
static void aeAddMillisecondsToNow(long long milliseconds, long *sec, long *ms) {
    long now_sec, now_ms;

    aeGetTime(&now_sec, &now_ms);

    long target_sec = now_sec + milliseconds/1000;
    long target_ms = now_ms + milliseconds%1000;

    if (target_ms >= 1000) {
        target_ms -= 1000;
        target_sec++;
    }
    *sec = target_sec;
    *ms = target_ms;
}
215
/* Create a time event that becomes due 'milliseconds' from now and link
 * it at the head of the event loop's time event list.
 *
 * 'proc' is the handler, 'finalizerProc' (may be NULL) is stored to be
 * called when the event is reaped, 'clientData' is handed to both.
 *
 * Returns the id assigned to the event, or AE_ERR if the allocation
 * fails. The id counter is advanced before the allocation, so an id is
 * consumed even on failure. */
long long aeCreateTimeEvent(aeEventLoop *eventLoop, long long milliseconds,
        aeTimeProc *proc, void *clientData,
        aeEventFinalizerProc *finalizerProc)
{
    long long new_id = eventLoop->timeEventNextId++;
    aeTimeEvent *ev = zmalloc(sizeof(*ev));

    if (ev == NULL) return AE_ERR;

    ev->id = new_id;
    aeAddMillisecondsToNow(milliseconds,&ev->when_sec,&ev->when_ms);
    ev->timeProc = proc;
    ev->finalizerProc = finalizerProc;
    ev->clientData = clientData;

    /* Push the node in front of the current head. */
    ev->prev = NULL;
    ev->next = eventLoop->timeEventHead;
    if (ev->next != NULL) ev->next->prev = ev;
    eventLoop->timeEventHead = ev;

    return new_id;
}
237
/* Schedule the time event with the given 'id' for deletion.
 *
 * The node is not unlinked here: its id is overwritten with
 * AE_DELETED_EVENT_ID, and processTimeEvents() removes nodes carrying
 * that id. Returns AE_OK if an event with that id was found, AE_ERR
 * otherwise. */
int aeDeleteTimeEvent(aeEventLoop *eventLoop, long long id)
{
    aeTimeEvent *cur;

    for (cur = eventLoop->timeEventHead; cur != NULL; cur = cur->next) {
        if (cur->id != id) continue;
        cur->id = AE_DELETED_EVENT_ID;
        return AE_OK;
    }
    return AE_ERR; /* NO event with the specified ID found */
}
250
251 /* Search the first timer to fire.
252 * This operation is useful to know how many time the select can be
253 * put in sleep without to delay any event.
254 * If there are no timers NULL is returned.
255 *
256 * Note that's O(N) since time events are unsorted.
257 * Possible optimizations (not needed by Redis so far, but...):
258 * 1) Insert the event in order, so that the nearest is just the head.
259 * Much better but still insertion or deletion of timers is O(N).
260 * 2) Use a skiplist to have this operation as O(1) and insertion as O(log(N)).
261 */
aeSearchNearestTimer(aeEventLoop * eventLoop)262 static aeTimeEvent *aeSearchNearestTimer(aeEventLoop *eventLoop)
263 {
264 aeTimeEvent *te = eventLoop->timeEventHead;
265 aeTimeEvent *nearest = NULL;
266
267 while(te) {
268 if (!nearest || te->when_sec < nearest->when_sec ||
269 (te->when_sec == nearest->when_sec &&
270 te->when_ms < nearest->when_ms))
271 nearest = te;
272 te = te->next;
273 }
274 return nearest;
275 }
276
277 /* Process time events */
processTimeEvents(aeEventLoop * eventLoop)278 static int processTimeEvents(aeEventLoop *eventLoop) {
279 int processed = 0;
280 aeTimeEvent *te;
281 long long maxId;
282 time_t now = time(NULL);
283
284 /* If the system clock is moved to the future, and then set back to the
285 * right value, time events may be delayed in a random way. Often this
286 * means that scheduled operations will not be performed soon enough.
287 *
288 * Here we try to detect system clock skews, and force all the time
289 * events to be processed ASAP when this happens: the idea is that
290 * processing events earlier is less dangerous than delaying them
291 * indefinitely, and practice suggests it is. */
292 if (now < eventLoop->lastTime) {
293 te = eventLoop->timeEventHead;
294 while(te) {
295 te->when_sec = 0;
296 te = te->next;
297 }
298 }
299 eventLoop->lastTime = now;
300
301 te = eventLoop->timeEventHead;
302 maxId = eventLoop->timeEventNextId-1;
303 while(te) {
304 long now_sec, now_ms;
305 long long id;
306
307 /* Remove events scheduled for deletion. */
308 if (te->id == AE_DELETED_EVENT_ID) {
309 aeTimeEvent *next = te->next;
310 if (te->prev)
311 te->prev->next = te->next;
312 else
313 eventLoop->timeEventHead = te->next;
314 if (te->next)
315 te->next->prev = te->prev;
316 if (te->finalizerProc)
317 te->finalizerProc(eventLoop, te->clientData);
318 zfree(te);
319 te = next;
320 continue;
321 }
322
323 /* Make sure we don't process time events created by time events in
324 * this iteration. Note that this check is currently useless: we always
325 * add new timers on the head, however if we change the implementation
326 * detail, this check may be useful again: we keep it here for future
327 * defense. */
328 if (te->id > maxId) {
329 te = te->next;
330 continue;
331 }
332 aeGetTime(&now_sec, &now_ms);
333 if (now_sec > te->when_sec ||
334 (now_sec == te->when_sec && now_ms >= te->when_ms))
335 {
336 int retval;
337
338 id = te->id;
339 retval = te->timeProc(eventLoop, id, te->clientData);
340 processed++;
341 if (retval != AE_NOMORE) {
342 aeAddMillisecondsToNow(retval,&te->when_sec,&te->when_ms);
343 } else {
344 te->id = AE_DELETED_EVENT_ID;
345 }
346 }
347 te = te->next;
348 }
349 return processed;
350 }
351
352 /* Process every pending time event, then every pending file event
353 * (that may be registered by time event callbacks just processed).
354 * Without special flags the function sleeps until some file event
355 * fires, or when the next time event occurs (if any).
356 *
357 * If flags is 0, the function does nothing and returns.
358 * if flags has AE_ALL_EVENTS set, all the kind of events are processed.
359 * if flags has AE_FILE_EVENTS set, file events are processed.
360 * if flags has AE_TIME_EVENTS set, time events are processed.
361 * if flags has AE_DONT_WAIT set the function returns ASAP until all
362 * if flags has AE_CALL_AFTER_SLEEP set, the aftersleep callback is called.
363 * the events that's possible to process without to wait are processed.
364 *
365 * The function returns the number of events processed. */
aeProcessEvents(aeEventLoop * eventLoop,int flags)366 int aeProcessEvents(aeEventLoop *eventLoop, int flags)
367 {
368 int processed = 0, numevents;
369
370 /* Nothing to do? return ASAP */
371 if (!(flags & AE_TIME_EVENTS) && !(flags & AE_FILE_EVENTS)) return 0;
372
373 /* Note that we want call select() even if there are no
374 * file events to process as long as we want to process time
375 * events, in order to sleep until the next time event is ready
376 * to fire. */
377 if (eventLoop->maxfd != -1 ||
378 ((flags & AE_TIME_EVENTS) && !(flags & AE_DONT_WAIT))) {
379 int j;
380 aeTimeEvent *shortest = NULL;
381 struct timeval tv, *tvp;
382
383 if (flags & AE_TIME_EVENTS && !(flags & AE_DONT_WAIT))
384 shortest = aeSearchNearestTimer(eventLoop);
385 if (shortest) {
386 long now_sec, now_ms;
387
388 aeGetTime(&now_sec, &now_ms);
389 tvp = &tv;
390
391 /* How many milliseconds we need to wait for the next
392 * time event to fire? */
393 long long ms =
394 (shortest->when_sec - now_sec)*1000 +
395 shortest->when_ms - now_ms;
396
397 if (ms > 0) {
398 tvp->tv_sec = ms/1000;
399 tvp->tv_usec = (ms % 1000)*1000;
400 } else {
401 tvp->tv_sec = 0;
402 tvp->tv_usec = 0;
403 }
404 } else {
405 /* If we have to check for events but need to return
406 * ASAP because of AE_DONT_WAIT we need to set the timeout
407 * to zero */
408 if (flags & AE_DONT_WAIT) {
409 tv.tv_sec = tv.tv_usec = 0;
410 tvp = &tv;
411 } else {
412 /* Otherwise we can block */
413 tvp = NULL; /* wait forever */
414 }
415 }
416
417 /* Call the multiplexing API, will return only on timeout or when
418 * some event fires. */
419 numevents = aeApiPoll(eventLoop, tvp);
420
421 /* After sleep callback. */
422 if (eventLoop->aftersleep != NULL && flags & AE_CALL_AFTER_SLEEP)
423 eventLoop->aftersleep(eventLoop);
424
425 for (j = 0; j < numevents; j++) {
426 aeFileEvent *fe = &eventLoop->events[eventLoop->fired[j].fd];
427 int mask = eventLoop->fired[j].mask;
428 int fd = eventLoop->fired[j].fd;
429 int fired = 0; /* Number of events fired for current fd. */
430
431 /* Normally we execute the readable event first, and the writable
432 * event laster. This is useful as sometimes we may be able
433 * to serve the reply of a query immediately after processing the
434 * query.
435 *
436 * However if AE_BARRIER is set in the mask, our application is
437 * asking us to do the reverse: never fire the writable event
438 * after the readable. In such a case, we invert the calls.
439 * This is useful when, for instance, we want to do things
440 * in the beforeSleep() hook, like fsynching a file to disk,
441 * before replying to a client. */
442 int invert = fe->mask & AE_BARRIER;
443
444 /* Note the "fe->mask & mask & ..." code: maybe an already
445 * processed event removed an element that fired and we still
446 * didn't processed, so we check if the event is still valid.
447 *
448 * Fire the readable event if the call sequence is not
449 * inverted. */
450 if (!invert && fe->mask & mask & AE_READABLE) {
451 fe->rfileProc(eventLoop,fd,fe->clientData,mask);
452 fired++;
453 }
454
455 /* Fire the writable event. */
456 if (fe->mask & mask & AE_WRITABLE) {
457 if (!fired || fe->wfileProc != fe->rfileProc) {
458 fe->wfileProc(eventLoop,fd,fe->clientData,mask);
459 fired++;
460 }
461 }
462
463 /* If we have to invert the call, fire the readable event now
464 * after the writable one. */
465 if (invert && fe->mask & mask & AE_READABLE) {
466 if (!fired || fe->wfileProc != fe->rfileProc) {
467 fe->rfileProc(eventLoop,fd,fe->clientData,mask);
468 fired++;
469 }
470 }
471
472 processed++;
473 }
474 }
475 /* Check time events */
476 if (flags & AE_TIME_EVENTS)
477 processed += processTimeEvents(eventLoop);
478
479 return processed; /* return the number of processed file/time events */
480 }
481
482 /* Wait for milliseconds until the given file descriptor becomes
483 * writable/readable/exception */
aeWait(int fd,int mask,long long milliseconds)484 int aeWait(int fd, int mask, long long milliseconds) {
485 struct pollfd pfd;
486 int retmask = 0, retval;
487
488 memset(&pfd, 0, sizeof(pfd));
489 pfd.fd = fd;
490 if (mask & AE_READABLE) pfd.events |= POLLIN;
491 if (mask & AE_WRITABLE) pfd.events |= POLLOUT;
492
493 if ((retval = poll(&pfd, 1, milliseconds))== 1) {
494 if (pfd.revents & POLLIN) retmask |= AE_READABLE;
495 if (pfd.revents & POLLOUT) retmask |= AE_WRITABLE;
496 if (pfd.revents & POLLERR) retmask |= AE_WRITABLE;
497 if (pfd.revents & POLLHUP) retmask |= AE_WRITABLE;
498 return retmask;
499 } else {
500 return retval;
501 }
502 }
503
/* Run a single iteration of the event loop: call the beforesleep hook
 * when one is installed, then process all kinds of events with the
 * aftersleep callback enabled.
 * NOTE(review): there is no loop here and the stop flag is not checked;
 * presumably the caller invokes this function repeatedly -- confirm. */
void aeMain(aeEventLoop *eventLoop)
{
    if (eventLoop->beforesleep != NULL) {
        eventLoop->beforesleep(eventLoop);
    }
    aeProcessEvents(eventLoop, AE_ALL_EVENTS|AE_CALL_AFTER_SLEEP);
}
509
/* Return the string provided by aeApiName(), which is defined by the
 * multiplexing layer included at the top of this file. */
char *aeGetApiName(void)
{
    char *name = aeApiName();

    return name;
}
513
/* Install 'beforesleep' as the hook that aeMain() calls before
 * processing events. Passing NULL removes the hook. */
void aeSetBeforeSleepProc(aeEventLoop *eventLoop, aeBeforeSleepProc *beforesleep)
{
    eventLoop->beforesleep = beforesleep;
}
517
/* Install 'aftersleep' as the hook that aeProcessEvents() calls right
 * after the multiplexing API returns, when AE_CALL_AFTER_SLEEP is among
 * the flags. Passing NULL removes the hook. */
void aeSetAfterSleepProc(aeEventLoop *eventLoop, aeBeforeSleepProc *aftersleep)
{
    eventLoop->aftersleep = aftersleep;
}
521