xref: /f-stack/app/redis-5.0.5/src/ae.c (revision 572c4311)
1 /* A simple event-driven programming library. Originally I wrote this code
2  * for the Jim's event-loop (Jim is a Tcl interpreter) but later translated
3  * it in form of a library for easy reuse.
4  *
5  * Copyright (c) 2006-2010, Salvatore Sanfilippo <antirez at gmail dot com>
6  * All rights reserved.
7  *
8  * Redistribution and use in source and binary forms, with or without
9  * modification, are permitted provided that the following conditions are met:
10  *
11  *   * Redistributions of source code must retain the above copyright notice,
12  *     this list of conditions and the following disclaimer.
13  *   * Redistributions in binary form must reproduce the above copyright
14  *     notice, this list of conditions and the following disclaimer in the
15  *     documentation and/or other materials provided with the distribution.
16  *   * Neither the name of Redis nor the names of its contributors may be used
17  *     to endorse or promote products derived from this software without
18  *     specific prior written permission.
19  *
20  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
21  * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
22  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
23  * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
24  * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
25  * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
26  * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
27  * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
28  * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
29  * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
30  * POSSIBILITY OF SUCH DAMAGE.
31  */
32 
33 #include <stdio.h>
34 #include <sys/time.h>
35 #include <sys/types.h>
36 #include <unistd.h>
37 #include <stdlib.h>
38 #include <poll.h>
39 #include <string.h>
40 #include <time.h>
41 #include <errno.h>
42 
43 #include "ae.h"
44 #include "zmalloc.h"
45 #include "config.h"
46 
47 /* Include the best multiplexing layer supported by this system.
48  * The following should be ordered by performance, descending. */
49  #ifdef HAVE_FF_KQUEUE
50 #include "ae_ff_kqueue.c"
51 #else
52     #ifdef HAVE_EVPORT
53     #include "ae_evport.c"
54     #else
55         #ifdef HAVE_EPOLL
56         #include "ae_epoll.c"
57         #else
58             #ifdef HAVE_KQUEUE
59             #include "ae_kqueue.c"
60             #else
61             #include "ae_select.c"
62             #endif
63         #endif
64     #endif
65 #endif
66 
aeCreateEventLoop(int setsize)67 aeEventLoop *aeCreateEventLoop(int setsize) {
68     aeEventLoop *eventLoop;
69     int i;
70 
71     if ((eventLoop = zmalloc(sizeof(*eventLoop))) == NULL) goto err;
72     eventLoop->events = zmalloc(sizeof(aeFileEvent)*setsize);
73     eventLoop->fired = zmalloc(sizeof(aeFiredEvent)*setsize);
74     if (eventLoop->events == NULL || eventLoop->fired == NULL) goto err;
75     eventLoop->setsize = setsize;
76     eventLoop->lastTime = time(NULL);
77     eventLoop->timeEventHead = NULL;
78     eventLoop->timeEventNextId = 0;
79     eventLoop->stop = 0;
80     eventLoop->maxfd = -1;
81     eventLoop->beforesleep = NULL;
82     eventLoop->aftersleep = NULL;
83     if (aeApiCreate(eventLoop) == -1) goto err;
84     /* Events with mask == AE_NONE are not set. So let's initialize the
85      * vector with it. */
86     for (i = 0; i < setsize; i++)
87         eventLoop->events[i].mask = AE_NONE;
88     return eventLoop;
89 
90 err:
91     if (eventLoop) {
92         zfree(eventLoop->events);
93         zfree(eventLoop->fired);
94         zfree(eventLoop);
95     }
96     return NULL;
97 }
98 
99 /* Return the current set size. */
aeGetSetSize(aeEventLoop * eventLoop)100 int aeGetSetSize(aeEventLoop *eventLoop) {
101     return eventLoop->setsize;
102 }
103 
104 /* Resize the maximum set size of the event loop.
105  * If the requested set size is smaller than the current set size, but
106  * there is already a file descriptor in use that is >= the requested
107  * set size minus one, AE_ERR is returned and the operation is not
108  * performed at all.
109  *
110  * Otherwise AE_OK is returned and the operation is successful. */
aeResizeSetSize(aeEventLoop * eventLoop,int setsize)111 int aeResizeSetSize(aeEventLoop *eventLoop, int setsize) {
112     int i;
113 
114     if (setsize == eventLoop->setsize) return AE_OK;
115     if (eventLoop->maxfd >= setsize) return AE_ERR;
116     if (aeApiResize(eventLoop,setsize) == -1) return AE_ERR;
117 
118     eventLoop->events = zrealloc(eventLoop->events,sizeof(aeFileEvent)*setsize);
119     eventLoop->fired = zrealloc(eventLoop->fired,sizeof(aeFiredEvent)*setsize);
120     eventLoop->setsize = setsize;
121 
122     /* Make sure that if we created new slots, they are initialized with
123      * an AE_NONE mask. */
124     for (i = eventLoop->maxfd+1; i < setsize; i++)
125         eventLoop->events[i].mask = AE_NONE;
126     return AE_OK;
127 }
128 
/* Destroy an event loop created by aeCreateEventLoop().
 *
 * Releases the backend state (aeApiFree), every time event still linked in
 * timeEventHead, the events/fired tables and the loop itself. The time
 * events' finalizerProc callbacks are NOT invoked: the loop is going away and
 * running user callbacks here could double-release clientData owned
 * elsewhere. Previously the time-event nodes were leaked. */
void aeDeleteEventLoop(aeEventLoop *eventLoop) {
    aeTimeEvent *te = eventLoop->timeEventHead;

    aeApiFree(eventLoop);

    /* Free the time events list. */
    while (te) {
        aeTimeEvent *next = te->next;
        zfree(te);
        te = next;
    }
    eventLoop->timeEventHead = NULL;

    zfree(eventLoop->events);
    zfree(eventLoop->fired);
    zfree(eventLoop);
}
135 
/* Raise the loop's stop flag (eventLoop->stop = 1).
 *
 * NOTE(review): aeMain() in this file runs a single iteration and never
 * reads eventLoop->stop, so this only matters to code that checks the flag
 * itself - confirm who consumes it. */
void aeStop(aeEventLoop *eventLoop)
{
    eventLoop->stop = 1;
}
139 
/* Start watching `fd` for the events in `mask` (bits such as AE_READABLE,
 * AE_WRITABLE, AE_BARRIER), dispatching to `proc` with `clientData`.
 *
 * The bits are OR-ed into any mask already registered for fd. `proc` becomes
 * the read and/or write handler according to the AE_READABLE / AE_WRITABLE
 * bits, and clientData is shared by both directions (it is overwritten).
 *
 * Returns AE_OK on success. Returns AE_ERR with errno = ERANGE when fd is
 * outside [0, setsize); negative fds used to index the table out of bounds.
 * Returns AE_ERR when the backend (aeApiAddEvent) refuses the registration,
 * in which case the loop's bookkeeping is left unchanged. */
int aeCreateFileEvent(aeEventLoop *eventLoop, int fd, int mask,
        aeFileProc *proc, void *clientData)
{
    if (fd < 0 || fd >= eventLoop->setsize) {
        errno = ERANGE;
        return AE_ERR;
    }
    aeFileEvent *fe = &eventLoop->events[fd];

    /* Register with the backend first so local state changes only on success. */
    if (aeApiAddEvent(eventLoop, fd, mask) == -1)
        return AE_ERR;
    fe->mask |= mask;
    if (mask & AE_READABLE) fe->rfileProc = proc;
    if (mask & AE_WRITABLE) fe->wfileProc = proc;
    fe->clientData = clientData;
    if (fd > eventLoop->maxfd)
        eventLoop->maxfd = fd;
    return AE_OK;
}
159 
/* Stop watching `fd` for the events in `mask`.
 *
 * Removing AE_WRITABLE always removes AE_BARRIER too. Calls for an fd outside
 * [0, setsize) are ignored (negative fds used to index the table out of
 * bounds), as are calls for an fd with no registered events. When the
 * highest registered fd loses its last event, maxfd is recomputed by
 * scanning downwards; it becomes -1 when no fd has events left. */
void aeDeleteFileEvent(aeEventLoop *eventLoop, int fd, int mask)
{
    if (fd < 0 || fd >= eventLoop->setsize) return;
    aeFileEvent *fe = &eventLoop->events[fd];
    if (fe->mask == AE_NONE) return;

    /* We want to always remove AE_BARRIER if set when AE_WRITABLE
     * is removed. */
    if (mask & AE_WRITABLE) mask |= AE_BARRIER;

    aeApiDelEvent(eventLoop, fd, mask);
    fe->mask = fe->mask & (~mask);
    if (fd == eventLoop->maxfd && fe->mask == AE_NONE) {
        /* Update the max fd */
        int j;

        for (j = eventLoop->maxfd-1; j >= 0; j--)
            if (eventLoop->events[j].mask != AE_NONE) break;
        eventLoop->maxfd = j;
    }
}
181 
/* Return the event mask currently registered for `fd`, or 0 when fd is
 * outside [0, setsize). Negative fds used to be read out of bounds. */
int aeGetFileEvents(aeEventLoop *eventLoop, int fd) {
    if (fd < 0 || fd >= eventLoop->setsize) return 0;
    return eventLoop->events[fd].mask;
}
188 
/* Read the wall clock and split it into whole seconds (*seconds) and the
 * millisecond part of the current second (*milliseconds, 0..999).
 * Under HAVE_FF_KQUEUE the f-stack ff_gettimeofday() variant is used
 * instead of gettimeofday(). */
static void aeGetTime(long *seconds, long *milliseconds)
{
    struct timeval now;

#ifdef HAVE_FF_KQUEUE
    ff_gettimeofday(&now, NULL);
#else
    gettimeofday(&now, NULL);
#endif
    *milliseconds = now.tv_usec / 1000;
    *seconds = now.tv_sec;
}
201 
/* Compute the absolute time `milliseconds` from now as a (seconds,
 * milliseconds) pair written to *sec / *ms. A millisecond part that reaches
 * 1000 or more is carried into the seconds. */
static void aeAddMillisecondsToNow(long long milliseconds, long *sec, long *ms) {
    long nowSec, nowMs;
    long targetSec, targetMs, carry;

    aeGetTime(&nowSec, &nowMs);
    targetSec = nowSec + milliseconds/1000;
    targetMs = nowMs + milliseconds%1000;

    carry = (targetMs >= 1000) ? 1 : 0;
    targetSec += carry;
    targetMs -= carry * 1000;

    *ms = targetMs;
    *sec = targetSec;
}
215 
/* Schedule `proc` to run `milliseconds` from now with `clientData`.
 * `finalizerProc` (may be NULL) is invoked by processTimeEvents() when it
 * unlinks the event. The new event is pushed at the head of the unsorted,
 * doubly linked timer list.
 *
 * Returns the new event id, or AE_ERR if zmalloc() returns NULL. The id
 * counter has already been advanced in that case. */
long long aeCreateTimeEvent(aeEventLoop *eventLoop, long long milliseconds,
        aeTimeProc *proc, void *clientData,
        aeEventFinalizerProc *finalizerProc)
{
    long long newId = eventLoop->timeEventNextId++;
    aeTimeEvent *ev = zmalloc(sizeof(*ev));
    aeTimeEvent *oldHead;

    if (ev == NULL) return AE_ERR;

    aeAddMillisecondsToNow(milliseconds,&ev->when_sec,&ev->when_ms);
    ev->id = newId;
    ev->timeProc = proc;
    ev->clientData = clientData;
    ev->finalizerProc = finalizerProc;

    /* Link in front of the current head. */
    oldHead = eventLoop->timeEventHead;
    ev->prev = NULL;
    ev->next = oldHead;
    if (oldHead != NULL)
        oldHead->prev = ev;
    eventLoop->timeEventHead = ev;
    return newId;
}
237 
/* Mark the time event with the given id for deletion by setting its id to
 * AE_DELETED_EVENT_ID. The node is not unlinked or freed here;
 * processTimeEvents() does that (running its finalizer) on a later pass.
 * Returns AE_OK if a matching event was found, AE_ERR otherwise. */
int aeDeleteTimeEvent(aeEventLoop *eventLoop, long long id)
{
    aeTimeEvent *cur;

    for (cur = eventLoop->timeEventHead; cur != NULL; cur = cur->next) {
        if (cur->id != id)
            continue;
        cur->id = AE_DELETED_EVENT_ID;
        return AE_OK;
    }
    return AE_ERR; /* NO event with the specified ID found */
}
250 
251 /* Search the first timer to fire.
252  * This operation is useful to know how many time the select can be
253  * put in sleep without to delay any event.
254  * If there are no timers NULL is returned.
255  *
256  * Note that's O(N) since time events are unsorted.
257  * Possible optimizations (not needed by Redis so far, but...):
258  * 1) Insert the event in order, so that the nearest is just the head.
259  *    Much better but still insertion or deletion of timers is O(N).
260  * 2) Use a skiplist to have this operation as O(1) and insertion as O(log(N)).
261  */
aeSearchNearestTimer(aeEventLoop * eventLoop)262 static aeTimeEvent *aeSearchNearestTimer(aeEventLoop *eventLoop)
263 {
264     aeTimeEvent *te = eventLoop->timeEventHead;
265     aeTimeEvent *nearest = NULL;
266 
267     while(te) {
268         if (!nearest || te->when_sec < nearest->when_sec ||
269                 (te->when_sec == nearest->when_sec &&
270                  te->when_ms < nearest->when_ms))
271             nearest = te;
272         te = te->next;
273     }
274     return nearest;
275 }
276 
277 /* Process time events */
processTimeEvents(aeEventLoop * eventLoop)278 static int processTimeEvents(aeEventLoop *eventLoop) {
279     int processed = 0;
280     aeTimeEvent *te;
281     long long maxId;
282     time_t now = time(NULL);
283 
284     /* If the system clock is moved to the future, and then set back to the
285      * right value, time events may be delayed in a random way. Often this
286      * means that scheduled operations will not be performed soon enough.
287      *
288      * Here we try to detect system clock skews, and force all the time
289      * events to be processed ASAP when this happens: the idea is that
290      * processing events earlier is less dangerous than delaying them
291      * indefinitely, and practice suggests it is. */
292     if (now < eventLoop->lastTime) {
293         te = eventLoop->timeEventHead;
294         while(te) {
295             te->when_sec = 0;
296             te = te->next;
297         }
298     }
299     eventLoop->lastTime = now;
300 
301     te = eventLoop->timeEventHead;
302     maxId = eventLoop->timeEventNextId-1;
303     while(te) {
304         long now_sec, now_ms;
305         long long id;
306 
307         /* Remove events scheduled for deletion. */
308         if (te->id == AE_DELETED_EVENT_ID) {
309             aeTimeEvent *next = te->next;
310             if (te->prev)
311                 te->prev->next = te->next;
312             else
313                 eventLoop->timeEventHead = te->next;
314             if (te->next)
315                 te->next->prev = te->prev;
316             if (te->finalizerProc)
317                 te->finalizerProc(eventLoop, te->clientData);
318             zfree(te);
319             te = next;
320             continue;
321         }
322 
323         /* Make sure we don't process time events created by time events in
324          * this iteration. Note that this check is currently useless: we always
325          * add new timers on the head, however if we change the implementation
326          * detail, this check may be useful again: we keep it here for future
327          * defense. */
328         if (te->id > maxId) {
329             te = te->next;
330             continue;
331         }
332         aeGetTime(&now_sec, &now_ms);
333         if (now_sec > te->when_sec ||
334             (now_sec == te->when_sec && now_ms >= te->when_ms))
335         {
336             int retval;
337 
338             id = te->id;
339             retval = te->timeProc(eventLoop, id, te->clientData);
340             processed++;
341             if (retval != AE_NOMORE) {
342                 aeAddMillisecondsToNow(retval,&te->when_sec,&te->when_ms);
343             } else {
344                 te->id = AE_DELETED_EVENT_ID;
345             }
346         }
347         te = te->next;
348     }
349     return processed;
350 }
351 
352 /* Process every pending time event, then every pending file event
353  * (that may be registered by time event callbacks just processed).
354  * Without special flags the function sleeps until some file event
355  * fires, or when the next time event occurs (if any).
356  *
357  * If flags is 0, the function does nothing and returns.
358  * if flags has AE_ALL_EVENTS set, all the kind of events are processed.
359  * if flags has AE_FILE_EVENTS set, file events are processed.
360  * if flags has AE_TIME_EVENTS set, time events are processed.
361  * if flags has AE_DONT_WAIT set the function returns ASAP until all
362  * if flags has AE_CALL_AFTER_SLEEP set, the aftersleep callback is called.
363  * the events that's possible to process without to wait are processed.
364  *
365  * The function returns the number of events processed. */
aeProcessEvents(aeEventLoop * eventLoop,int flags)366 int aeProcessEvents(aeEventLoop *eventLoop, int flags)
367 {
368     int processed = 0, numevents;
369 
370     /* Nothing to do? return ASAP */
371     if (!(flags & AE_TIME_EVENTS) && !(flags & AE_FILE_EVENTS)) return 0;
372 
373     /* Note that we want call select() even if there are no
374      * file events to process as long as we want to process time
375      * events, in order to sleep until the next time event is ready
376      * to fire. */
377     if (eventLoop->maxfd != -1 ||
378         ((flags & AE_TIME_EVENTS) && !(flags & AE_DONT_WAIT))) {
379         int j;
380         aeTimeEvent *shortest = NULL;
381         struct timeval tv, *tvp;
382 
383         if (flags & AE_TIME_EVENTS && !(flags & AE_DONT_WAIT))
384             shortest = aeSearchNearestTimer(eventLoop);
385         if (shortest) {
386             long now_sec, now_ms;
387 
388             aeGetTime(&now_sec, &now_ms);
389             tvp = &tv;
390 
391             /* How many milliseconds we need to wait for the next
392              * time event to fire? */
393             long long ms =
394                 (shortest->when_sec - now_sec)*1000 +
395                 shortest->when_ms - now_ms;
396 
397             if (ms > 0) {
398                 tvp->tv_sec = ms/1000;
399                 tvp->tv_usec = (ms % 1000)*1000;
400             } else {
401                 tvp->tv_sec = 0;
402                 tvp->tv_usec = 0;
403             }
404         } else {
405             /* If we have to check for events but need to return
406              * ASAP because of AE_DONT_WAIT we need to set the timeout
407              * to zero */
408             if (flags & AE_DONT_WAIT) {
409                 tv.tv_sec = tv.tv_usec = 0;
410                 tvp = &tv;
411             } else {
412                 /* Otherwise we can block */
413                 tvp = NULL; /* wait forever */
414             }
415         }
416 
417         /* Call the multiplexing API, will return only on timeout or when
418          * some event fires. */
419         numevents = aeApiPoll(eventLoop, tvp);
420 
421         /* After sleep callback. */
422         if (eventLoop->aftersleep != NULL && flags & AE_CALL_AFTER_SLEEP)
423             eventLoop->aftersleep(eventLoop);
424 
425         for (j = 0; j < numevents; j++) {
426             aeFileEvent *fe = &eventLoop->events[eventLoop->fired[j].fd];
427             int mask = eventLoop->fired[j].mask;
428             int fd = eventLoop->fired[j].fd;
429             int fired = 0; /* Number of events fired for current fd. */
430 
431             /* Normally we execute the readable event first, and the writable
432              * event laster. This is useful as sometimes we may be able
433              * to serve the reply of a query immediately after processing the
434              * query.
435              *
436              * However if AE_BARRIER is set in the mask, our application is
437              * asking us to do the reverse: never fire the writable event
438              * after the readable. In such a case, we invert the calls.
439              * This is useful when, for instance, we want to do things
440              * in the beforeSleep() hook, like fsynching a file to disk,
441              * before replying to a client. */
442             int invert = fe->mask & AE_BARRIER;
443 
444             /* Note the "fe->mask & mask & ..." code: maybe an already
445              * processed event removed an element that fired and we still
446              * didn't processed, so we check if the event is still valid.
447              *
448              * Fire the readable event if the call sequence is not
449              * inverted. */
450             if (!invert && fe->mask & mask & AE_READABLE) {
451                 fe->rfileProc(eventLoop,fd,fe->clientData,mask);
452                 fired++;
453             }
454 
455             /* Fire the writable event. */
456             if (fe->mask & mask & AE_WRITABLE) {
457                 if (!fired || fe->wfileProc != fe->rfileProc) {
458                     fe->wfileProc(eventLoop,fd,fe->clientData,mask);
459                     fired++;
460                 }
461             }
462 
463             /* If we have to invert the call, fire the readable event now
464              * after the writable one. */
465             if (invert && fe->mask & mask & AE_READABLE) {
466                 if (!fired || fe->wfileProc != fe->rfileProc) {
467                     fe->rfileProc(eventLoop,fd,fe->clientData,mask);
468                     fired++;
469                 }
470             }
471 
472             processed++;
473         }
474     }
475     /* Check time events */
476     if (flags & AE_TIME_EVENTS)
477         processed += processTimeEvents(eventLoop);
478 
479     return processed; /* return the number of processed file/time events */
480 }
481 
482 /* Wait for milliseconds until the given file descriptor becomes
483  * writable/readable/exception */
aeWait(int fd,int mask,long long milliseconds)484 int aeWait(int fd, int mask, long long milliseconds) {
485     struct pollfd pfd;
486     int retmask = 0, retval;
487 
488     memset(&pfd, 0, sizeof(pfd));
489     pfd.fd = fd;
490     if (mask & AE_READABLE) pfd.events |= POLLIN;
491     if (mask & AE_WRITABLE) pfd.events |= POLLOUT;
492 
493     if ((retval = poll(&pfd, 1, milliseconds))== 1) {
494         if (pfd.revents & POLLIN) retmask |= AE_READABLE;
495         if (pfd.revents & POLLOUT) retmask |= AE_WRITABLE;
496         if (pfd.revents & POLLERR) retmask |= AE_WRITABLE;
497         if (pfd.revents & POLLHUP) retmask |= AE_WRITABLE;
498         return retmask;
499     } else {
500         return retval;
501     }
502 }
503 
/* Run ONE iteration of the event loop: call the beforesleep hook (if set),
 * then process all file and time events with the aftersleep hook enabled.
 *
 * NOTE(review): this does not loop and never reads eventLoop->stop, so
 * aeStop() does not affect it. Presumably the caller drives it repeatedly
 * (e.g. from an f-stack ff_run() loop callback) - confirm before expecting
 * aeMain() to block until aeStop(). */
void aeMain(aeEventLoop *eventLoop) {
    aeBeforeSleepProc *before = eventLoop->beforesleep;

    if (before)
        before(eventLoop);
    aeProcessEvents(eventLoop, AE_CALL_AFTER_SLEEP|AE_ALL_EVENTS);
}
509 
/* Name of the multiplexing backend chosen at compile time by the #include
 * chain at the top of this file, as reported by that backend's aeApiName(). */
char *aeGetApiName(void)
{
    char *backend = aeApiName();
    return backend;
}
513 
/* Install (or clear, with NULL) the hook aeMain() calls before each
 * event-processing pass. */
void aeSetBeforeSleepProc(aeEventLoop *eventLoop, aeBeforeSleepProc *beforesleep)
{
    eventLoop->beforesleep = beforesleep;
}
517 
/* Install (or clear, with NULL) the hook aeProcessEvents() calls right after
 * polling when AE_CALL_AFTER_SLEEP is set.
 * NOTE(review): the parameter is typed aeBeforeSleepProc even though it sets
 * the after-sleep hook; kept as-is for interface compatibility. */
void aeSetAfterSleepProc(aeEventLoop *eventLoop, aeBeforeSleepProc *aftersleep)
{
    eventLoop->aftersleep = aftersleep;
}
521