1 #include "debug.h"
2 #include <sys/queue.h>
3 #include <unistd.h>
4 #include <time.h>
5 #include <signal.h>
6 #include <assert.h>
7 #include <string.h>
8
9 #include "mtcp.h"
10 #include "tcp_stream.h"
11 #include "eventpoll.h"
12 #include "tcp_in.h"
13 #include "pipe.h"
14 #include "tcp_rb.h"
15 #include "config.h"
16
17 #define MAX(a, b) ((a)>(b)?(a):(b))
18 #define MIN(a, b) ((a)<(b)?(a):(b))
19
20 #define SPIN_BEFORE_SLEEP FALSE
21 #define SPIN_THRESH 10000000
22
23 /*----------------------------------------------------------------------------*/
24 char *event_str[] = {"NONE", "IN", "PRI", "OUT", "ERR", "HUP", "RDHUP"};
25 /*----------------------------------------------------------------------------*/
26 char *
EventToString(uint32_t event)27 EventToString(uint32_t event)
28 {
29 switch (event) {
30 case MOS_EPOLLNONE:
31 return event_str[0];
32 break;
33 case MOS_EPOLLIN:
34 return event_str[1];
35 break;
36 case MOS_EPOLLPRI:
37 return event_str[2];
38 break;
39 case MOS_EPOLLOUT:
40 return event_str[3];
41 break;
42 case MOS_EPOLLERR:
43 return event_str[4];
44 break;
45 case MOS_EPOLLHUP:
46 return event_str[5];
47 break;
48 case MOS_EPOLLRDHUP:
49 return event_str[6];
50 break;
51 default:
52 assert(0);
53 }
54
55 assert(0);
56 return NULL;
57 }
58 /*----------------------------------------------------------------------------*/
59 struct event_queue *
CreateEventQueue(int size)60 CreateEventQueue(int size)
61 {
62 struct event_queue *eq;
63
64 eq = (struct event_queue *)calloc(1, sizeof(struct event_queue));
65 if (!eq)
66 return NULL;
67
68 eq->start = 0;
69 eq->end = 0;
70 eq->size = size;
71 eq->events = (struct mtcp_epoll_event_int *)
72 calloc(size, sizeof(struct mtcp_epoll_event_int));
73 if (!eq->events) {
74 free(eq);
75 return NULL;
76 }
77 eq->num_events = 0;
78
79 return eq;
80 }
81 /*----------------------------------------------------------------------------*/
82 void
DestroyEventQueue(struct event_queue * eq)83 DestroyEventQueue(struct event_queue *eq)
84 {
85 if (eq->events)
86 free(eq->events);
87
88 free(eq);
89 }
90 /*----------------------------------------------------------------------------*/
91 int
mtcp_epoll_create(mctx_t mctx,int size)92 mtcp_epoll_create(mctx_t mctx, int size)
93 {
94 mtcp_manager_t mtcp = g_mtcp[mctx->cpu];
95 struct mtcp_epoll *ep;
96 socket_map_t epsocket;
97
98 if (size <= 0) {
99 errno = EINVAL;
100 return -1;
101 }
102
103 epsocket = AllocateSocket(mctx, MOS_SOCK_EPOLL);
104 if (!epsocket) {
105 errno = ENFILE;
106 return -1;
107 }
108
109 ep = (struct mtcp_epoll *)calloc(1, sizeof(struct mtcp_epoll));
110 if (!ep) {
111 FreeSocket(mctx, epsocket->id, MOS_SOCK_EPOLL);
112 return -1;
113 }
114
115 /* create event queues */
116 ep->usr_queue = CreateEventQueue(size);
117 if (!ep->usr_queue) {
118 FreeSocket(mctx, epsocket->id, FALSE);
119 free(ep);
120 return -1;
121 }
122
123 ep->usr_shadow_queue = CreateEventQueue(size);
124 if (!ep->usr_shadow_queue) {
125 DestroyEventQueue(ep->usr_queue);
126 FreeSocket(mctx, epsocket->id, FALSE);
127 free(ep);
128 return -1;
129 }
130
131 ep->mtcp_queue = CreateEventQueue(size);
132 if (!ep->mtcp_queue) {
133 DestroyEventQueue(ep->usr_shadow_queue);
134 DestroyEventQueue(ep->usr_queue);
135 FreeSocket(mctx, epsocket->id, FALSE);
136 free(ep);
137 return -1;
138 }
139
140 TRACE_EPOLL("epoll structure of size %d created.\n", size);
141
142 mtcp->ep = ep;
143 epsocket->ep = ep;
144
145 if (pthread_mutex_init(&ep->epoll_lock, NULL)) {
146 DestroyEventQueue(ep->mtcp_queue);
147 DestroyEventQueue(ep->usr_shadow_queue);
148 DestroyEventQueue(ep->usr_queue);
149 FreeSocket(mctx, epsocket->id, FALSE);
150 free(ep);
151 return -1;
152 }
153 if (pthread_cond_init(&ep->epoll_cond, NULL)) {
154 DestroyEventQueue(ep->mtcp_queue);
155 DestroyEventQueue(ep->usr_shadow_queue);
156 DestroyEventQueue(ep->usr_queue);
157 FreeSocket(mctx, epsocket->id, FALSE);
158 free(ep);
159 return -1;
160 }
161
162 return epsocket->id;
163 }
164 /*----------------------------------------------------------------------------*/
/* Tear down the epoll instance bound to epoll socket `epid`: free its event
 * queues, detach it from the manager and the socket map, wake any thread
 * blocked on it, and release the synchronization objects and the structure
 * itself. Returns 0 on success, -1 on failure (errno EINVAL when `epid`
 * has no epoll instance). */
int
CloseEpollSocket(mctx_t mctx, int epid)
{
	mtcp_manager_t mtcp;
	struct mtcp_epoll *ep;

	mtcp = GetMTCPManager(mctx);
	if (!mtcp) {
		return -1;
	}

	ep = mtcp->smap[epid].ep;
	if (!ep) {
		errno = EINVAL;
		return -1;
	}

	/* NOTE(review): the queues are destroyed before the epoll lock is
	 * taken; a thread still inside mtcp_epoll_wait() could touch the freed
	 * queues before it observes the wakeup below — verify no waiter can be
	 * active when this is called */
	DestroyEventQueue(ep->usr_queue);
	DestroyEventQueue(ep->usr_shadow_queue);
	DestroyEventQueue(ep->mtcp_queue);

	/* clear references under the lock, then wake any blocked waiter */
	pthread_mutex_lock(&ep->epoll_lock);
	mtcp->ep = NULL;
	mtcp->smap[epid].ep = NULL;
	pthread_cond_signal(&ep->epoll_cond);
	pthread_mutex_unlock(&ep->epoll_lock);

	/* NOTE(review): destroying the cond/mutex right after signaling assumes
	 * the woken waiter no longer uses them — confirm shutdown ordering */
	pthread_cond_destroy(&ep->epoll_cond);
	pthread_mutex_destroy(&ep->epoll_lock);
	free(ep);

	return 0;
}
198 /*----------------------------------------------------------------------------*/
199 static int
RaisePendingStreamEvents(mtcp_manager_t mtcp,struct mtcp_epoll * ep,socket_map_t socket)200 RaisePendingStreamEvents(mtcp_manager_t mtcp,
201 struct mtcp_epoll *ep, socket_map_t socket)
202 {
203 tcp_stream *stream = socket->stream;
204
205 if (!stream)
206 return -1;
207 if (stream->state < TCP_ST_ESTABLISHED)
208 return -1;
209
210 TRACE_EPOLL("Stream %d at state %s\n",
211 stream->id, TCPStateToString(stream));
212 /* if there are payloads already read before epoll registration */
213 /* generate read event */
214 if (socket->epoll & MOS_EPOLLIN) {
215 struct tcp_recv_vars *rcvvar = stream->rcvvar;
216 if (rcvvar->rcvbuf && tcprb_cflen(rcvvar->rcvbuf) > 0) {
217 TRACE_EPOLL("Socket %d: Has existing payloads\n", socket->id);
218 AddEpollEvent(ep, USR_SHADOW_EVENT_QUEUE, socket, MOS_EPOLLIN);
219 } else if (stream->state == TCP_ST_CLOSE_WAIT) {
220 TRACE_EPOLL("Socket %d: Waiting for close\n", socket->id);
221 AddEpollEvent(ep, USR_SHADOW_EVENT_QUEUE, socket, MOS_EPOLLIN);
222 }
223 }
224
225 /* same thing to the write event */
226 if (socket->epoll & MOS_EPOLLOUT) {
227 struct tcp_send_vars *sndvar = stream->sndvar;
228 if (!sndvar->sndbuf ||
229 (sndvar->sndbuf && sndvar->sndbuf->len < sndvar->snd_wnd)) {
230 if (!(socket->events & MOS_EPOLLOUT)) {
231 TRACE_EPOLL("Socket %d: Adding write event\n", socket->id);
232 AddEpollEvent(ep, USR_SHADOW_EVENT_QUEUE, socket, MOS_EPOLLOUT);
233 }
234 }
235 }
236
237 return 0;
238 }
239 /*----------------------------------------------------------------------------*/
240 int
mtcp_epoll_ctl(mctx_t mctx,int epid,int op,int sockid,struct mtcp_epoll_event * event)241 mtcp_epoll_ctl(mctx_t mctx, int epid,
242 int op, int sockid, struct mtcp_epoll_event *event)
243 {
244 mtcp_manager_t mtcp;
245 struct mtcp_epoll *ep;
246 socket_map_t socket;
247 uint32_t events;
248
249 mtcp = GetMTCPManager(mctx);
250 if (!mtcp) {
251 return -1;
252 }
253
254 if (epid < 0 || epid >= g_config.mos->max_concurrency) {
255 TRACE_API("Epoll id %d out of range.\n", epid);
256 errno = EBADF;
257 return -1;
258 }
259
260 if (sockid < 0 || sockid >= g_config.mos->max_concurrency) {
261 TRACE_API("Socket id %d out of range.\n", sockid);
262 errno = EBADF;
263 return -1;
264 }
265
266 if (mtcp->smap[epid].socktype == MOS_SOCK_UNUSED) {
267 errno = EBADF;
268 return -1;
269 }
270
271 if (mtcp->smap[epid].socktype != MOS_SOCK_EPOLL) {
272 errno = EINVAL;
273 return -1;
274 }
275
276 ep = mtcp->smap[epid].ep;
277 if (!ep || (!event && op != MOS_EPOLL_CTL_DEL)) {
278 errno = EINVAL;
279 return -1;
280 }
281 socket = &mtcp->smap[sockid];
282
283 if (op == MOS_EPOLL_CTL_ADD) {
284 if (socket->epoll) {
285 errno = EEXIST;
286 return -1;
287 }
288
289 /* EPOLLERR and EPOLLHUP are registered as default */
290 events = event->events;
291 events |= (MOS_EPOLLERR | MOS_EPOLLHUP);
292 socket->ep_data = event->data;
293 socket->epoll = events;
294
295 TRACE_EPOLL("Adding epoll socket %d(type %d) ET: %llu, IN: %llu, OUT: %llu\n",
296 socket->id, socket->socktype,
297 (unsigned long long)socket->epoll & MOS_EPOLLET,
298 (unsigned long long)socket->epoll & MOS_EPOLLIN,
299 (unsigned long long)socket->epoll & MOS_EPOLLOUT);
300
301 if (socket->socktype == MOS_SOCK_STREAM) {
302 RaisePendingStreamEvents(mtcp, ep, socket);
303 } else if (socket->socktype == MOS_SOCK_PIPE) {
304 RaisePendingPipeEvents(mctx, epid, sockid);
305 }
306
307 } else if (op == MOS_EPOLL_CTL_MOD) {
308 if (!socket->epoll) {
309 pthread_mutex_unlock(&ep->epoll_lock);
310 errno = ENOENT;
311 return -1;
312 }
313
314 events = event->events;
315 events |= (MOS_EPOLLERR | MOS_EPOLLHUP);
316 socket->ep_data = event->data;
317 socket->epoll = events;
318
319 if (socket->socktype == MOS_SOCK_STREAM) {
320 RaisePendingStreamEvents(mtcp, ep, socket);
321 } else if (socket->socktype == MOS_SOCK_PIPE) {
322 RaisePendingPipeEvents(mctx, epid, sockid);
323 }
324
325 } else if (op == MOS_EPOLL_CTL_DEL) {
326 if (!socket->epoll) {
327 errno = ENOENT;
328 return -1;
329 }
330
331 socket->epoll = MOS_EPOLLNONE;
332 }
333
334 return 0;
335 }
336 /*----------------------------------------------------------------------------*/
337 int
mtcp_epoll_wait(mctx_t mctx,int epid,struct mtcp_epoll_event * events,int maxevents,int timeout)338 mtcp_epoll_wait(mctx_t mctx, int epid,
339 struct mtcp_epoll_event *events, int maxevents, int timeout)
340 {
341 mtcp_manager_t mtcp;
342 struct mtcp_epoll *ep;
343 struct event_queue *eq;
344 struct event_queue *eq_shadow;
345 socket_map_t event_socket;
346 int validity;
347 int i, cnt, ret;
348 int num_events;
349
350 mtcp = GetMTCPManager(mctx);
351 if (!mtcp) {
352 return -1;
353 }
354
355 if (epid < 0 || epid >= g_config.mos->max_concurrency) {
356 TRACE_API("Epoll id %d out of range.\n", epid);
357 errno = EBADF;
358 return -1;
359 }
360
361 if (mtcp->smap[epid].socktype == MOS_SOCK_UNUSED) {
362 errno = EBADF;
363 return -1;
364 }
365
366 if (mtcp->smap[epid].socktype != MOS_SOCK_EPOLL) {
367 errno = EINVAL;
368 return -1;
369 }
370
371 ep = mtcp->smap[epid].ep;
372 if (!ep || !events || maxevents <= 0) {
373 errno = EINVAL;
374 return -1;
375 }
376
377 ep->stat.calls++;
378
379 #if SPIN_BEFORE_SLEEP
380 int spin = 0;
381 while (ep->num_events == 0 && spin < SPIN_THRESH) {
382 spin++;
383 }
384 #endif /* SPIN_BEFORE_SLEEP */
385
386 if (pthread_mutex_lock(&ep->epoll_lock)) {
387 if (errno == EDEADLK)
388 perror("mtcp_epoll_wait: epoll_lock blocked\n");
389 assert(0);
390 }
391
392 wait:
393 eq = ep->usr_queue;
394 eq_shadow = ep->usr_shadow_queue;
395
396 /* wait until event occurs */
397 while (eq->num_events == 0 && eq_shadow->num_events == 0 && timeout != 0) {
398
399 #if INTR_SLEEPING_MTCP
400 /* signal to mtcp thread if it is sleeping */
401 if (mtcp->wakeup_flag && mtcp->is_sleeping) {
402 pthread_kill(mtcp->ctx->thread, SIGUSR1);
403 }
404 #endif
405 ep->stat.waits++;
406 ep->waiting = TRUE;
407 if (timeout > 0) {
408 struct timespec deadline;
409
410 clock_gettime(CLOCK_REALTIME, &deadline);
411 if (timeout >= 1000) {
412 int sec;
413 sec = timeout / 1000;
414 deadline.tv_sec += sec;
415 timeout -= sec * 1000;
416 }
417
418 deadline.tv_nsec += timeout * 1000000;
419
420 if (deadline.tv_nsec >= 1000000000) {
421 deadline.tv_sec++;
422 deadline.tv_nsec -= 1000000000;
423 }
424
425 //deadline.tv_sec = mtcp->cur_tv.tv_sec;
426 //deadline.tv_nsec = (mtcp->cur_tv.tv_usec + timeout * 1000) * 1000;
427 ret = pthread_cond_timedwait(&ep->epoll_cond,
428 &ep->epoll_lock, &deadline);
429 if (ret && ret != ETIMEDOUT) {
430 /* errno set by pthread_cond_timedwait() */
431 pthread_mutex_unlock(&ep->epoll_lock);
432 TRACE_ERROR("pthread_cond_timedwait failed. ret: %d, error: %s\n",
433 ret, strerror(errno));
434 return -1;
435 }
436 timeout = 0;
437 } else if (timeout < 0) {
438 ret = pthread_cond_wait(&ep->epoll_cond, &ep->epoll_lock);
439 if (ret) {
440 /* errno set by pthread_cond_wait() */
441 pthread_mutex_unlock(&ep->epoll_lock);
442 TRACE_ERROR("pthread_cond_wait failed. ret: %d, error: %s\n",
443 ret, strerror(errno));
444 return -1;
445 }
446 }
447 ep->waiting = FALSE;
448
449 if (mtcp->ctx->done || mtcp->ctx->exit || mtcp->ctx->interrupt) {
450 mtcp->ctx->interrupt = FALSE;
451 //ret = pthread_cond_signal(&ep->epoll_cond);
452 pthread_mutex_unlock(&ep->epoll_lock);
453 errno = EINTR;
454 return -1;
455 }
456
457 }
458
459 /* fetch events from the user event queue */
460 cnt = 0;
461 num_events = eq->num_events;
462 for (i = 0; i < num_events && cnt < maxevents; i++) {
463 event_socket = &mtcp->smap[eq->events[eq->start].sockid];
464 validity = TRUE;
465 if (event_socket->socktype == MOS_SOCK_UNUSED)
466 validity = FALSE;
467 if (!(event_socket->epoll & eq->events[eq->start].ev.events))
468 validity = FALSE;
469 if (!(event_socket->events & eq->events[eq->start].ev.events))
470 validity = FALSE;
471
472 if (validity) {
473 events[cnt++] = eq->events[eq->start].ev;
474 assert(eq->events[eq->start].sockid >= 0);
475
476 TRACE_EPOLL("Socket %d: Handled event. event: %s, "
477 "start: %u, end: %u, num: %u\n",
478 event_socket->id,
479 EventToString(eq->events[eq->start].ev.events),
480 eq->start, eq->end, eq->num_events);
481 ep->stat.handled++;
482 } else {
483 TRACE_EPOLL("Socket %d: event %s invalidated.\n",
484 eq->events[eq->start].sockid,
485 EventToString(eq->events[eq->start].ev.events));
486 ep->stat.invalidated++;
487 }
488 event_socket->events &= (~eq->events[eq->start].ev.events);
489
490 eq->start++;
491 eq->num_events--;
492 if (eq->start >= eq->size) {
493 eq->start = 0;
494 }
495 }
496
497 /* fetch eventes from user shadow event queue */
498 eq = ep->usr_shadow_queue;
499 num_events = eq->num_events;
500 for (i = 0; i < num_events && cnt < maxevents; i++) {
501 event_socket = &mtcp->smap[eq->events[eq->start].sockid];
502 validity = TRUE;
503 if (event_socket->socktype == MOS_SOCK_UNUSED)
504 validity = FALSE;
505 if (!(event_socket->epoll & eq->events[eq->start].ev.events))
506 validity = FALSE;
507 if (!(event_socket->events & eq->events[eq->start].ev.events))
508 validity = FALSE;
509
510 if (validity) {
511 events[cnt++] = eq->events[eq->start].ev;
512 assert(eq->events[eq->start].sockid >= 0);
513
514 TRACE_EPOLL("Socket %d: Handled event. event: %s, "
515 "start: %u, end: %u, num: %u\n",
516 event_socket->id,
517 EventToString(eq->events[eq->start].ev.events),
518 eq->start, eq->end, eq->num_events);
519 ep->stat.handled++;
520 } else {
521 TRACE_EPOLL("Socket %d: event %s invalidated.\n",
522 eq->events[eq->start].sockid,
523 EventToString(eq->events[eq->start].ev.events));
524 ep->stat.invalidated++;
525 }
526 event_socket->events &= (~eq->events[eq->start].ev.events);
527
528 eq->start++;
529 eq->num_events--;
530 if (eq->start >= eq->size) {
531 eq->start = 0;
532 }
533 }
534
535 if (cnt == 0 && timeout != 0)
536 goto wait;
537
538 pthread_mutex_unlock(&ep->epoll_lock);
539
540 return cnt;
541 }
542 /*----------------------------------------------------------------------------*/
/* Queue `event` for `socket` on one of the epoll instance's three queues
 * (mtcp-internal, user, or user-shadow). Event bits already pending on the
 * socket are silently dropped (returns 0 without queueing).
 * Returns 0 on success; -1 on bad arguments, an unknown queue type, or a
 * full queue. */
inline int
AddEpollEvent(struct mtcp_epoll *ep,
		int queue_type, socket_map_t socket, uint32_t event)
{
#ifdef DBGMSG
	__PREPARE_DBGLOGGING();
#endif
	struct event_queue *eq;
	int index;

	if (!ep || !socket || !event)
		return -1;

	ep->stat.issued++;

	/* already pending for this socket — nothing to add */
	if (socket->events & event) {
		return 0;
	}

	/* only the user queue is guarded by the epoll lock here; the other
	 * queues are presumably touched by a single thread only — see the
	 * matching unlocks below */
	if (queue_type == MOS_EVENT_QUEUE) {
		eq = ep->mtcp_queue;
	} else if (queue_type == USR_EVENT_QUEUE) {
		eq = ep->usr_queue;
		pthread_mutex_lock(&ep->epoll_lock);
	} else if (queue_type == USR_SHADOW_EVENT_QUEUE) {
		eq = ep->usr_shadow_queue;
	} else {
		TRACE_ERROR("Non-existing event queue type!\n");
		return -1;
	}

	if (eq->num_events >= eq->size) {
		TRACE_ERROR("Exceeded epoll event queue! num_events: %d, size: %d\n",
				eq->num_events, eq->size);
		/* release the lock taken above before bailing out */
		if (queue_type == USR_EVENT_QUEUE)
			pthread_mutex_unlock(&ep->epoll_lock);
		return -1;
	}

	/* append at the ring-buffer tail and mark the bits pending */
	index = eq->end++;

	socket->events |= event;
	eq->events[index].sockid = socket->id;
	eq->events[index].ev.events = event;
	eq->events[index].ev.data = socket->ep_data;

	/* wrap the tail index */
	if (eq->end >= eq->size) {
		eq->end = 0;
	}
	eq->num_events++;

	TRACE_EPOLL("Socket %d New event: %s, start: %u, end: %u, num: %u\n",
			eq->events[index].sockid,
			EventToString(eq->events[index].ev.events),
			eq->start, eq->end, eq->num_events);

	if (queue_type == USR_EVENT_QUEUE)
		pthread_mutex_unlock(&ep->epoll_lock);

	ep->stat.registered++;

	return 0;
}
606