xref: /mOS-networking-stack/core/src/eventpoll.c (revision e5df9dc1)
1 #include "debug.h"
2 #include <sys/queue.h>
3 #include <unistd.h>
4 #include <time.h>
5 #include <signal.h>
6 #include <assert.h>
7 #include <string.h>
8 
9 #include "mtcp.h"
10 #include "tcp_stream.h"
11 #include "eventpoll.h"
12 #include "tcp_in.h"
13 #include "pipe.h"
14 #include "tcp_rb.h"
15 #include "config.h"
16 
/* Larger / smaller of two values. Each argument may be evaluated twice,
 * so do not pass expressions with side effects. */
#define MAX(a, b)	(((a) > (b)) ? (a) : (b))
#define MIN(a, b)	(((a) < (b)) ? (a) : (b))

/* When SPIN_BEFORE_SLEEP is true, mtcp_epoll_wait() busy-loops for at most
 * SPIN_THRESH iterations before taking the epoll lock. */
#define SPIN_BEFORE_SLEEP	FALSE
#define SPIN_THRESH		10000000
22 
23 /*----------------------------------------------------------------------------*/
24 char *event_str[] = {"NONE", "IN", "PRI", "OUT", "ERR", "HUP", "RDHUP"};
25 /*----------------------------------------------------------------------------*/
26 char *
27 EventToString(uint32_t event)
28 {
29 	switch (event) {
30 		case MOS_EPOLLNONE:
31 			return event_str[0];
32 			break;
33 		case MOS_EPOLLIN:
34 			return event_str[1];
35 			break;
36 		case MOS_EPOLLPRI:
37 			return event_str[2];
38 			break;
39 		case MOS_EPOLLOUT:
40 			return event_str[3];
41 			break;
42 		case MOS_EPOLLERR:
43 			return event_str[4];
44 			break;
45 		case MOS_EPOLLHUP:
46 			return event_str[5];
47 			break;
48 		case MOS_EPOLLRDHUP:
49 			return event_str[6];
50 			break;
51 		default:
52 			assert(0);
53 	}
54 
55 	assert(0);
56 	return NULL;
57 }
58 /*----------------------------------------------------------------------------*/
59 struct event_queue *
60 CreateEventQueue(int size)
61 {
62 	struct event_queue *eq;
63 
64 	eq = (struct event_queue *)calloc(1, sizeof(struct event_queue));
65 	if (!eq)
66 		return NULL;
67 
68 	eq->start = 0;
69 	eq->end = 0;
70 	eq->size = size;
71 	eq->events = (struct mtcp_epoll_event_int *)
72 			calloc(size, sizeof(struct mtcp_epoll_event_int));
73 	if (!eq->events) {
74 		free(eq);
75 		return NULL;
76 	}
77 	eq->num_events = 0;
78 
79 	return eq;
80 }
81 /*----------------------------------------------------------------------------*/
82 void
83 DestroyEventQueue(struct event_queue *eq)
84 {
85 	if (eq->events)
86 		free(eq->events);
87 
88 	free(eq);
89 }
90 /*----------------------------------------------------------------------------*/
91 int
92 mtcp_epoll_create(mctx_t mctx, int size)
93 {
94 	mtcp_manager_t mtcp = g_mtcp[mctx->cpu];
95 	struct mtcp_epoll *ep;
96 	socket_map_t epsocket;
97 
98 	if (size <= 0) {
99 		errno = EINVAL;
100 		return -1;
101 	}
102 
103 	epsocket = AllocateSocket(mctx, MOS_SOCK_EPOLL);
104 	if (!epsocket) {
105 		errno = ENFILE;
106 		return -1;
107 	}
108 
109 	ep = (struct mtcp_epoll *)calloc(1, sizeof(struct mtcp_epoll));
110 	if (!ep) {
111 		FreeSocket(mctx, epsocket->id, MOS_SOCK_EPOLL);
112 		return -1;
113 	}
114 
115 	/* create event queues */
116 	ep->usr_queue = CreateEventQueue(size);
117 	if (!ep->usr_queue) {
118 		FreeSocket(mctx, epsocket->id, FALSE);
119 		free(ep);
120 		return -1;
121 	}
122 
123 	ep->usr_shadow_queue = CreateEventQueue(size);
124 	if (!ep->usr_shadow_queue) {
125 		DestroyEventQueue(ep->usr_queue);
126 		FreeSocket(mctx, epsocket->id, FALSE);
127 		free(ep);
128 		return -1;
129 	}
130 
131 	ep->mtcp_queue = CreateEventQueue(size);
132 	if (!ep->mtcp_queue) {
133 		DestroyEventQueue(ep->usr_shadow_queue);
134 		DestroyEventQueue(ep->usr_queue);
135 		FreeSocket(mctx, epsocket->id, FALSE);
136 		free(ep);
137 		return -1;
138 	}
139 
140 	TRACE_EPOLL("epoll structure of size %d created.\n", size);
141 
142 	mtcp->ep = ep;
143 	epsocket->ep = ep;
144 
145 	if (pthread_mutex_init(&ep->epoll_lock, NULL)) {
146 		DestroyEventQueue(ep->mtcp_queue);
147 		DestroyEventQueue(ep->usr_shadow_queue);
148 		DestroyEventQueue(ep->usr_queue);
149 		FreeSocket(mctx, epsocket->id, FALSE);
150 		free(ep);
151 		return -1;
152 	}
153 	if (pthread_cond_init(&ep->epoll_cond, NULL)) {
154 		DestroyEventQueue(ep->mtcp_queue);
155 		DestroyEventQueue(ep->usr_shadow_queue);
156 		DestroyEventQueue(ep->usr_queue);
157 		FreeSocket(mctx, epsocket->id, FALSE);
158 		free(ep);
159 		return -1;
160 	}
161 
162 	return epsocket->id;
163 }
164 /*----------------------------------------------------------------------------*/
165 int
166 CloseEpollSocket(mctx_t mctx, int epid)
167 {
168 	mtcp_manager_t mtcp;
169 	struct mtcp_epoll *ep;
170 
171 	mtcp = GetMTCPManager(mctx);
172 	if (!mtcp) {
173 		return -1;
174 	}
175 
176 	ep = mtcp->smap[epid].ep;
177 	if (!ep) {
178 		errno = EINVAL;
179 		return -1;
180 	}
181 
182 	DestroyEventQueue(ep->usr_queue);
183 	DestroyEventQueue(ep->usr_shadow_queue);
184 	DestroyEventQueue(ep->mtcp_queue);
185 
186 	pthread_mutex_lock(&ep->epoll_lock);
187 	mtcp->ep = NULL;
188 	mtcp->smap[epid].ep = NULL;
189 	pthread_cond_signal(&ep->epoll_cond);
190 	pthread_mutex_unlock(&ep->epoll_lock);
191 
192 	pthread_cond_destroy(&ep->epoll_cond);
193 	pthread_mutex_destroy(&ep->epoll_lock);
194 	free(ep);
195 
196 	return 0;
197 }
198 /*----------------------------------------------------------------------------*/
199 static int
200 RaisePendingStreamEvents(mtcp_manager_t mtcp,
201 		struct mtcp_epoll *ep, socket_map_t socket)
202 {
203 	tcp_stream *stream = socket->stream;
204 
205 	if (!stream)
206 		return -1;
207 	if (stream->state < TCP_ST_ESTABLISHED)
208 		return -1;
209 
210 	TRACE_EPOLL("Stream %d at state %s\n",
211 			stream->id, TCPStateToString(stream));
212 	/* if there are payloads already read before epoll registration */
213 	/* generate read event */
214 	if (socket->epoll & MOS_EPOLLIN) {
215 		struct tcp_recv_vars *rcvvar = stream->rcvvar;
216 		if (rcvvar->rcvbuf && tcprb_cflen(rcvvar->rcvbuf) > 0) {
217 			TRACE_EPOLL("Socket %d: Has existing payloads\n", socket->id);
218 			AddEpollEvent(ep, USR_SHADOW_EVENT_QUEUE, socket, MOS_EPOLLIN);
219 		} else if (stream->state == TCP_ST_CLOSE_WAIT) {
220 			TRACE_EPOLL("Socket %d: Waiting for close\n", socket->id);
221 			AddEpollEvent(ep, USR_SHADOW_EVENT_QUEUE, socket, MOS_EPOLLIN);
222 		}
223 	}
224 
225 	/* same thing to the write event */
226 	if (socket->epoll & MOS_EPOLLOUT) {
227 		struct tcp_send_vars *sndvar = stream->sndvar;
228 		if (!sndvar->sndbuf ||
229 				(sndvar->sndbuf && sndvar->sndbuf->len < sndvar->snd_wnd)) {
230 			if (!(socket->events & MOS_EPOLLOUT)) {
231 				TRACE_EPOLL("Socket %d: Adding write event\n", socket->id);
232 				AddEpollEvent(ep, USR_SHADOW_EVENT_QUEUE, socket, MOS_EPOLLOUT);
233 			}
234 		}
235 	}
236 
237 	return 0;
238 }
239 /*----------------------------------------------------------------------------*/
240 int
241 mtcp_epoll_ctl(mctx_t mctx, int epid,
242 		int op, int sockid, struct mtcp_epoll_event *event)
243 {
244 	mtcp_manager_t mtcp;
245 	struct mtcp_epoll *ep;
246 	socket_map_t socket;
247 	uint32_t events;
248 
249 	mtcp = GetMTCPManager(mctx);
250 	if (!mtcp) {
251 		return -1;
252 	}
253 
254 	if (epid < 0 || epid >= g_config.mos->max_concurrency) {
255 		TRACE_API("Epoll id %d out of range.\n", epid);
256 		errno = EBADF;
257 		return -1;
258 	}
259 
260 	if (sockid < 0 || sockid >= g_config.mos->max_concurrency) {
261 		TRACE_API("Socket id %d out of range.\n", sockid);
262 		errno = EBADF;
263 		return -1;
264 	}
265 
266 	if (mtcp->smap[epid].socktype == MOS_SOCK_UNUSED) {
267 		errno = EBADF;
268 		return -1;
269 	}
270 
271 	if (mtcp->smap[epid].socktype != MOS_SOCK_EPOLL) {
272 		errno = EINVAL;
273 		return -1;
274 	}
275 
276 	ep = mtcp->smap[epid].ep;
277 	if (!ep || (!event && op != MOS_EPOLL_CTL_DEL)) {
278 		errno = EINVAL;
279 		return -1;
280 	}
281 	socket = &mtcp->smap[sockid];
282 
283 	if (op == MOS_EPOLL_CTL_ADD) {
284 		if (socket->epoll) {
285 			errno = EEXIST;
286 			return -1;
287 		}
288 
289 		/* EPOLLERR and EPOLLHUP are registered as default */
290 		events = event->events;
291 		events |= (MOS_EPOLLERR | MOS_EPOLLHUP);
292 		socket->ep_data = event->data;
293 		socket->epoll = events;
294 
295 		TRACE_EPOLL("Adding epoll socket %d(type %d) ET: %llu, IN: %llu, OUT: %llu\n",
296 			    socket->id, socket->socktype,
297 			    (unsigned long long)socket->epoll & MOS_EPOLLET,
298 			    (unsigned long long)socket->epoll & MOS_EPOLLIN,
299 			    (unsigned long long)socket->epoll & MOS_EPOLLOUT);
300 
301 		if (socket->socktype == MOS_SOCK_STREAM) {
302 			RaisePendingStreamEvents(mtcp, ep, socket);
303 		} else if (socket->socktype == MOS_SOCK_PIPE) {
304 			RaisePendingPipeEvents(mctx, epid, sockid);
305 		}
306 
307 	} else if (op == MOS_EPOLL_CTL_MOD) {
308 		if (!socket->epoll) {
309 			pthread_mutex_unlock(&ep->epoll_lock);
310 			errno = ENOENT;
311 			return -1;
312 		}
313 
314 		events = event->events;
315 		events |= (MOS_EPOLLERR | MOS_EPOLLHUP);
316 		socket->ep_data = event->data;
317 		socket->epoll = events;
318 
319 		if (socket->socktype == MOS_SOCK_STREAM) {
320 			RaisePendingStreamEvents(mtcp, ep, socket);
321 		} else if (socket->socktype == MOS_SOCK_PIPE) {
322 			RaisePendingPipeEvents(mctx, epid, sockid);
323 		}
324 
325 	} else if (op == MOS_EPOLL_CTL_DEL) {
326 		if (!socket->epoll) {
327 			errno = ENOENT;
328 			return -1;
329 		}
330 
331 		socket->epoll = MOS_EPOLLNONE;
332 	}
333 
334 	return 0;
335 }
336 /*----------------------------------------------------------------------------*/
337 int
338 mtcp_epoll_wait(mctx_t mctx, int epid,
339 		struct mtcp_epoll_event *events, int maxevents, int timeout)
340 {
341 	mtcp_manager_t mtcp;
342 	struct mtcp_epoll *ep;
343 	struct event_queue *eq;
344 	struct event_queue *eq_shadow;
345 	socket_map_t event_socket;
346 	int validity;
347 	int i, cnt, ret;
348 	int num_events;
349 
350 	mtcp = GetMTCPManager(mctx);
351 	if (!mtcp) {
352 		return -1;
353 	}
354 
355 	if (epid < 0 || epid >= g_config.mos->max_concurrency) {
356 		TRACE_API("Epoll id %d out of range.\n", epid);
357 		errno = EBADF;
358 		return -1;
359 	}
360 
361 	if (mtcp->smap[epid].socktype == MOS_SOCK_UNUSED) {
362 		errno = EBADF;
363 		return -1;
364 	}
365 
366 	if (mtcp->smap[epid].socktype != MOS_SOCK_EPOLL) {
367 		errno = EINVAL;
368 		return -1;
369 	}
370 
371 	ep = mtcp->smap[epid].ep;
372 	if (!ep || !events || maxevents <= 0) {
373 		errno = EINVAL;
374 		return -1;
375 	}
376 
377 	ep->stat.calls++;
378 
379 #if SPIN_BEFORE_SLEEP
380 	int spin = 0;
381 	while (ep->num_events == 0 && spin < SPIN_THRESH) {
382 		spin++;
383 	}
384 #endif /* SPIN_BEFORE_SLEEP */
385 
386 	if (pthread_mutex_lock(&ep->epoll_lock)) {
387 		if (errno == EDEADLK)
388 			perror("mtcp_epoll_wait: epoll_lock blocked\n");
389 		assert(0);
390 	}
391 
392 wait:
393 	eq = ep->usr_queue;
394 	eq_shadow = ep->usr_shadow_queue;
395 
396 	/* wait until event occurs */
397 	while (eq->num_events == 0 && eq_shadow->num_events == 0 && timeout != 0) {
398 
399 #if INTR_SLEEPING_MTCP
400 		/* signal to mtcp thread if it is sleeping */
401 		if (mtcp->wakeup_flag && mtcp->is_sleeping) {
402 			pthread_kill(mtcp->ctx->thread, SIGUSR1);
403 		}
404 #endif
405 		ep->stat.waits++;
406 		ep->waiting = TRUE;
407 		if (timeout > 0) {
408 			struct timespec deadline;
409 
410 			clock_gettime(CLOCK_REALTIME, &deadline);
411 			if (timeout >= 1000) {
412 				int sec;
413 				sec = timeout / 1000;
414 				deadline.tv_sec += sec;
415 				timeout -= sec * 1000;
416 			}
417 
418 			deadline.tv_nsec += timeout * 1000000;
419 
420 			if (deadline.tv_nsec >= 1000000000) {
421 				deadline.tv_sec++;
422 				deadline.tv_nsec -= 1000000000;
423 			}
424 
425 			//deadline.tv_sec = mtcp->cur_tv.tv_sec;
426 			//deadline.tv_nsec = (mtcp->cur_tv.tv_usec + timeout * 1000) * 1000;
427 			ret = pthread_cond_timedwait(&ep->epoll_cond,
428 					&ep->epoll_lock, &deadline);
429 			if (ret && ret != ETIMEDOUT) {
430 				/* errno set by pthread_cond_timedwait() */
431 				pthread_mutex_unlock(&ep->epoll_lock);
432 				TRACE_ERROR("pthread_cond_timedwait failed. ret: %d, error: %s\n",
433 						ret, strerror(errno));
434 				return -1;
435 			}
436 			timeout = 0;
437 		} else if (timeout < 0) {
438 			ret = pthread_cond_wait(&ep->epoll_cond, &ep->epoll_lock);
439 			if (ret) {
440 				/* errno set by pthread_cond_wait() */
441 				pthread_mutex_unlock(&ep->epoll_lock);
442 				TRACE_ERROR("pthread_cond_wait failed. ret: %d, error: %s\n",
443 						ret, strerror(errno));
444 				return -1;
445 			}
446 		}
447 		ep->waiting = FALSE;
448 
449 		if (mtcp->ctx->done || mtcp->ctx->exit || mtcp->ctx->interrupt) {
450 			mtcp->ctx->interrupt = FALSE;
451 			//ret = pthread_cond_signal(&ep->epoll_cond);
452 			pthread_mutex_unlock(&ep->epoll_lock);
453 			errno = EINTR;
454 			return -1;
455 		}
456 
457 	}
458 
459 	/* fetch events from the user event queue */
460 	cnt = 0;
461 	num_events = eq->num_events;
462 	for (i = 0; i < num_events && cnt < maxevents; i++) {
463 		event_socket = &mtcp->smap[eq->events[eq->start].sockid];
464 		validity = TRUE;
465 		if (event_socket->socktype == MOS_SOCK_UNUSED)
466 			validity = FALSE;
467 		if (!(event_socket->epoll & eq->events[eq->start].ev.events))
468 			validity = FALSE;
469 		if (!(event_socket->events & eq->events[eq->start].ev.events))
470 			validity = FALSE;
471 
472 		if (validity) {
473 			events[cnt++] = eq->events[eq->start].ev;
474 			assert(eq->events[eq->start].sockid >= 0);
475 
476 			TRACE_EPOLL("Socket %d: Handled event. event: %s, "
477 					"start: %u, end: %u, num: %u\n",
478 					event_socket->id,
479 					EventToString(eq->events[eq->start].ev.events),
480 					eq->start, eq->end, eq->num_events);
481 			ep->stat.handled++;
482 		} else {
483 			TRACE_EPOLL("Socket %d: event %s invalidated.\n",
484 					eq->events[eq->start].sockid,
485 					EventToString(eq->events[eq->start].ev.events));
486 			ep->stat.invalidated++;
487 		}
488 		event_socket->events &= (~eq->events[eq->start].ev.events);
489 
490 		eq->start++;
491 		eq->num_events--;
492 		if (eq->start >= eq->size) {
493 			eq->start = 0;
494 		}
495 	}
496 
497 	/* fetch eventes from user shadow event queue */
498 	eq = ep->usr_shadow_queue;
499 	num_events = eq->num_events;
500 	for (i = 0; i < num_events && cnt < maxevents; i++) {
501 		event_socket = &mtcp->smap[eq->events[eq->start].sockid];
502 		validity = TRUE;
503 		if (event_socket->socktype == MOS_SOCK_UNUSED)
504 			validity = FALSE;
505 		if (!(event_socket->epoll & eq->events[eq->start].ev.events))
506 			validity = FALSE;
507 		if (!(event_socket->events & eq->events[eq->start].ev.events))
508 			validity = FALSE;
509 
510 		if (validity) {
511 			events[cnt++] = eq->events[eq->start].ev;
512 			assert(eq->events[eq->start].sockid >= 0);
513 
514 			TRACE_EPOLL("Socket %d: Handled event. event: %s, "
515 					"start: %u, end: %u, num: %u\n",
516 					event_socket->id,
517 					EventToString(eq->events[eq->start].ev.events),
518 					eq->start, eq->end, eq->num_events);
519 			ep->stat.handled++;
520 		} else {
521 			TRACE_EPOLL("Socket %d: event %s invalidated.\n",
522 					eq->events[eq->start].sockid,
523 					EventToString(eq->events[eq->start].ev.events));
524 			ep->stat.invalidated++;
525 		}
526 		event_socket->events &= (~eq->events[eq->start].ev.events);
527 
528 		eq->start++;
529 		eq->num_events--;
530 		if (eq->start >= eq->size) {
531 			eq->start = 0;
532 		}
533 	}
534 
535 	if (cnt == 0 && timeout != 0)
536 		goto wait;
537 
538 	pthread_mutex_unlock(&ep->epoll_lock);
539 
540 	return cnt;
541 }
542 /*----------------------------------------------------------------------------*/
543 inline int
544 AddEpollEvent(struct mtcp_epoll *ep,
545 		int queue_type, socket_map_t socket, uint32_t event)
546 {
547 #ifdef DBGMSG
548 	__PREPARE_DBGLOGGING();
549 #endif
550 	struct event_queue *eq;
551 	int index;
552 
553 	if (!ep || !socket || !event)
554 		return -1;
555 
556 	ep->stat.issued++;
557 
558 	if (socket->events & event) {
559 		return 0;
560 	}
561 
562 	if (queue_type == MOS_EVENT_QUEUE) {
563 		eq = ep->mtcp_queue;
564 	} else if (queue_type == USR_EVENT_QUEUE) {
565 		eq = ep->usr_queue;
566 		pthread_mutex_lock(&ep->epoll_lock);
567 	} else if (queue_type == USR_SHADOW_EVENT_QUEUE) {
568 		eq = ep->usr_shadow_queue;
569 	} else {
570 		TRACE_ERROR("Non-existing event queue type!\n");
571 		return -1;
572 	}
573 
574 	if (eq->num_events >= eq->size) {
575 		TRACE_ERROR("Exceeded epoll event queue! num_events: %d, size: %d\n",
576 				eq->num_events, eq->size);
577 		if (queue_type == USR_EVENT_QUEUE)
578 			pthread_mutex_unlock(&ep->epoll_lock);
579 		return -1;
580 	}
581 
582 	index = eq->end++;
583 
584 	socket->events |= event;
585 	eq->events[index].sockid = socket->id;
586 	eq->events[index].ev.events = event;
587 	eq->events[index].ev.data = socket->ep_data;
588 
589 	if (eq->end >= eq->size) {
590 		eq->end = 0;
591 	}
592 	eq->num_events++;
593 
594 	TRACE_EPOLL("Socket %d New event: %s, start: %u, end: %u, num: %u\n",
595 			eq->events[index].sockid,
596 			EventToString(eq->events[index].ev.events),
597 			eq->start, eq->end, eq->num_events);
598 
599 	if (queue_type == USR_EVENT_QUEUE)
600 		pthread_mutex_unlock(&ep->epoll_lock);
601 
602 	ep->stat.registered++;
603 
604 	return 0;
605 }
606