xref: /mOS-networking-stack/core/src/eventpoll.c (revision 3b6b9ba6)
1 #include "debug.h"
2 #include <sys/queue.h>
3 #include <unistd.h>
4 #include <time.h>
5 #include <signal.h>
6 #include <assert.h>
7 #include <string.h>
8 
9 #include "mtcp.h"
10 #include "tcp_stream.h"
11 #include "eventpoll.h"
12 #include "tcp_in.h"
13 #include "pipe.h"
14 #include "tcp_rb.h"
15 #include "config.h"
16 
17 #define MAX(a, b) ((a)>(b)?(a):(b))
18 #define MIN(a, b) ((a)<(b)?(a):(b))
19 
20 #define SPIN_BEFORE_SLEEP FALSE
21 #define SPIN_THRESH 10000000
22 
23 /*----------------------------------------------------------------------------*/
24 char *event_str[] = {"NONE", "IN", "PRI", "OUT", "ERR", "HUP", "RDHUP"};
25 /*----------------------------------------------------------------------------*/
26 char *
27 EventToString(uint32_t event)
28 {
29 	switch (event) {
30 		case MOS_EPOLLNONE:
31 			return event_str[0];
32 			break;
33 		case MOS_EPOLLIN:
34 			return event_str[1];
35 			break;
36 		case MOS_EPOLLPRI:
37 			return event_str[2];
38 			break;
39 		case MOS_EPOLLOUT:
40 			return event_str[3];
41 			break;
42 		case MOS_EPOLLERR:
43 			return event_str[4];
44 			break;
45 		case MOS_EPOLLHUP:
46 			return event_str[5];
47 			break;
48 		case MOS_EPOLLRDHUP:
49 			return event_str[6];
50 			break;
51 		default:
52 			assert(0);
53 	}
54 
55 	assert(0);
56 	return NULL;
57 }
58 /*----------------------------------------------------------------------------*/
59 struct event_queue *
60 CreateEventQueue(int size)
61 {
62 	struct event_queue *eq;
63 
64 	eq = (struct event_queue *)calloc(1, sizeof(struct event_queue));
65 	if (!eq)
66 		return NULL;
67 
68 	eq->start = 0;
69 	eq->end = 0;
70 	eq->size = size;
71 	eq->events = (struct mtcp_epoll_event_int *)
72 			calloc(size, sizeof(struct mtcp_epoll_event_int));
73 	if (!eq->events) {
74 		free(eq);
75 		return NULL;
76 	}
77 	eq->num_events = 0;
78 
79 	return eq;
80 }
81 /*----------------------------------------------------------------------------*/
82 void
83 DestroyEventQueue(struct event_queue *eq)
84 {
85 	if (eq->events)
86 		free(eq->events);
87 
88 	free(eq);
89 }
90 /*----------------------------------------------------------------------------*/
91 int
92 mtcp_epoll_create(mctx_t mctx, int size)
93 {
94 	mtcp_manager_t mtcp = g_mtcp[mctx->cpu];
95 	struct mtcp_epoll *ep;
96 	socket_map_t epsocket;
97 
98 	if (size <= 0) {
99 		errno = EINVAL;
100 		return -1;
101 	}
102 
103 	epsocket = AllocateSocket(mctx, MOS_SOCK_EPOLL);
104 	if (!epsocket) {
105 		errno = ENFILE;
106 		return -1;
107 	}
108 
109 	ep = (struct mtcp_epoll *)calloc(1, sizeof(struct mtcp_epoll));
110 	if (!ep) {
111 		FreeSocket(mctx, epsocket->id, MOS_SOCK_EPOLL);
112 		return -1;
113 	}
114 
115 	/* create event queues */
116 	ep->usr_queue = CreateEventQueue(size);
117 	if (!ep->usr_queue)
118 		return -1;
119 
120 	ep->usr_shadow_queue = CreateEventQueue(size);
121 	if (!ep->usr_shadow_queue) {
122 		DestroyEventQueue(ep->usr_queue);
123 		return -1;
124 	}
125 
126 	ep->mtcp_queue = CreateEventQueue(size);
127 	if (!ep->mtcp_queue) {
128 		DestroyEventQueue(ep->usr_queue);
129 		DestroyEventQueue(ep->usr_shadow_queue);
130 		return -1;
131 	}
132 
133 	TRACE_EPOLL("epoll structure of size %d created.\n", size);
134 
135 	mtcp->ep = ep;
136 	epsocket->ep = ep;
137 
138 	if (pthread_mutex_init(&ep->epoll_lock, NULL)) {
139 		return -1;
140 	}
141 	if (pthread_cond_init(&ep->epoll_cond, NULL)) {
142 		return -1;
143 	}
144 
145 	return epsocket->id;
146 }
147 /*----------------------------------------------------------------------------*/
148 int
149 CloseEpollSocket(mctx_t mctx, int epid)
150 {
151 	mtcp_manager_t mtcp;
152 	struct mtcp_epoll *ep;
153 
154 	mtcp = GetMTCPManager(mctx);
155 	if (!mtcp) {
156 		return -1;
157 	}
158 
159 	ep = mtcp->smap[epid].ep;
160 	if (!ep) {
161 		errno = EINVAL;
162 		return -1;
163 	}
164 
165 	DestroyEventQueue(ep->usr_queue);
166 	DestroyEventQueue(ep->usr_shadow_queue);
167 	DestroyEventQueue(ep->mtcp_queue);
168 	free(ep);
169 
170 	pthread_mutex_lock(&ep->epoll_lock);
171 	mtcp->ep = NULL;
172 	mtcp->smap[epid].ep = NULL;
173 	pthread_cond_signal(&ep->epoll_cond);
174 	pthread_mutex_unlock(&ep->epoll_lock);
175 
176 	pthread_cond_destroy(&ep->epoll_cond);
177 	pthread_mutex_destroy(&ep->epoll_lock);
178 
179 	return 0;
180 }
181 /*----------------------------------------------------------------------------*/
182 static int
183 RaisePendingStreamEvents(mtcp_manager_t mtcp,
184 		struct mtcp_epoll *ep, socket_map_t socket)
185 {
186 	tcp_stream *stream = socket->stream;
187 
188 	if (!stream)
189 		return -1;
190 	if (stream->state < TCP_ST_ESTABLISHED)
191 		return -1;
192 
193 	TRACE_EPOLL("Stream %d at state %s\n",
194 			stream->id, TCPStateToString(stream));
195 	/* if there are payloads already read before epoll registration */
196 	/* generate read event */
197 	if (socket->epoll & MOS_EPOLLIN) {
198 		struct tcp_recv_vars *rcvvar = stream->rcvvar;
199 		if (rcvvar->rcvbuf && tcprb_cflen(rcvvar->rcvbuf) > 0) {
200 			TRACE_EPOLL("Socket %d: Has existing payloads\n", socket->id);
201 			AddEpollEvent(ep, USR_SHADOW_EVENT_QUEUE, socket, MOS_EPOLLIN);
202 		} else if (stream->state == TCP_ST_CLOSE_WAIT) {
203 			TRACE_EPOLL("Socket %d: Waiting for close\n", socket->id);
204 			AddEpollEvent(ep, USR_SHADOW_EVENT_QUEUE, socket, MOS_EPOLLIN);
205 		}
206 	}
207 
208 	/* same thing to the write event */
209 	if (socket->epoll & MOS_EPOLLOUT) {
210 		struct tcp_send_vars *sndvar = stream->sndvar;
211 		if (!sndvar->sndbuf ||
212 				(sndvar->sndbuf && sndvar->sndbuf->len < sndvar->snd_wnd)) {
213 			if (!(socket->events & MOS_EPOLLOUT)) {
214 				TRACE_EPOLL("Socket %d: Adding write event\n", socket->id);
215 				AddEpollEvent(ep, USR_SHADOW_EVENT_QUEUE, socket, MOS_EPOLLOUT);
216 			}
217 		}
218 	}
219 
220 	return 0;
221 }
222 /*----------------------------------------------------------------------------*/
223 int
224 mtcp_epoll_ctl(mctx_t mctx, int epid,
225 		int op, int sockid, struct mtcp_epoll_event *event)
226 {
227 	mtcp_manager_t mtcp;
228 	struct mtcp_epoll *ep;
229 	socket_map_t socket;
230 	uint32_t events;
231 
232 	mtcp = GetMTCPManager(mctx);
233 	if (!mtcp) {
234 		return -1;
235 	}
236 
237 	if (epid < 0 || epid >= g_config.mos->max_concurrency) {
238 		TRACE_API("Epoll id %d out of range.\n", epid);
239 		errno = EBADF;
240 		return -1;
241 	}
242 
243 	if (sockid < 0 || sockid >= g_config.mos->max_concurrency) {
244 		TRACE_API("Socket id %d out of range.\n", sockid);
245 		errno = EBADF;
246 		return -1;
247 	}
248 
249 	if (mtcp->smap[epid].socktype == MOS_SOCK_UNUSED) {
250 		errno = EBADF;
251 		return -1;
252 	}
253 
254 	if (mtcp->smap[epid].socktype != MOS_SOCK_EPOLL) {
255 		errno = EINVAL;
256 		return -1;
257 	}
258 
259 	ep = mtcp->smap[epid].ep;
260 	if (!ep || (!event && op != MOS_EPOLL_CTL_DEL)) {
261 		errno = EINVAL;
262 		return -1;
263 	}
264 	socket = &mtcp->smap[sockid];
265 
266 	if (op == MOS_EPOLL_CTL_ADD) {
267 		if (socket->epoll) {
268 			errno = EEXIST;
269 			return -1;
270 		}
271 
272 		/* EPOLLERR and EPOLLHUP are registered as default */
273 		events = event->events;
274 		events |= (MOS_EPOLLERR | MOS_EPOLLHUP);
275 		socket->ep_data = event->data;
276 		socket->epoll = events;
277 
278 		TRACE_EPOLL("Adding epoll socket %d(type %d) ET: %u, IN: %u, OUT: %u\n",
279 				socket->id, socket->socktype, socket->epoll & MOS_EPOLLET,
280 				socket->epoll & MOS_EPOLLIN, socket->epoll & MOS_EPOLLOUT);
281 
282 		if (socket->socktype == MOS_SOCK_STREAM) {
283 			RaisePendingStreamEvents(mtcp, ep, socket);
284 		} else if (socket->socktype == MOS_SOCK_PIPE) {
285 			RaisePendingPipeEvents(mctx, epid, sockid);
286 		}
287 
288 	} else if (op == MOS_EPOLL_CTL_MOD) {
289 		if (!socket->epoll) {
290 			pthread_mutex_unlock(&ep->epoll_lock);
291 			errno = ENOENT;
292 			return -1;
293 		}
294 
295 		events = event->events;
296 		events |= (MOS_EPOLLERR | MOS_EPOLLHUP);
297 		socket->ep_data = event->data;
298 		socket->epoll = events;
299 
300 		if (socket->socktype == MOS_SOCK_STREAM) {
301 			RaisePendingStreamEvents(mtcp, ep, socket);
302 		} else if (socket->socktype == MOS_SOCK_PIPE) {
303 			RaisePendingPipeEvents(mctx, epid, sockid);
304 		}
305 
306 	} else if (op == MOS_EPOLL_CTL_DEL) {
307 		if (!socket->epoll) {
308 			errno = ENOENT;
309 			return -1;
310 		}
311 
312 		socket->epoll = MOS_EPOLLNONE;
313 	}
314 
315 	return 0;
316 }
317 /*----------------------------------------------------------------------------*/
318 int
319 mtcp_epoll_wait(mctx_t mctx, int epid,
320 		struct mtcp_epoll_event *events, int maxevents, int timeout)
321 {
322 	mtcp_manager_t mtcp;
323 	struct mtcp_epoll *ep;
324 	struct event_queue *eq;
325 	struct event_queue *eq_shadow;
326 	socket_map_t event_socket;
327 	int validity;
328 	int i, cnt, ret;
329 	int num_events;
330 
331 	mtcp = GetMTCPManager(mctx);
332 	if (!mtcp) {
333 		return -1;
334 	}
335 
336 	if (epid < 0 || epid >= g_config.mos->max_concurrency) {
337 		TRACE_API("Epoll id %d out of range.\n", epid);
338 		errno = EBADF;
339 		return -1;
340 	}
341 
342 	if (mtcp->smap[epid].socktype == MOS_SOCK_UNUSED) {
343 		errno = EBADF;
344 		return -1;
345 	}
346 
347 	if (mtcp->smap[epid].socktype != MOS_SOCK_EPOLL) {
348 		errno = EINVAL;
349 		return -1;
350 	}
351 
352 	ep = mtcp->smap[epid].ep;
353 	if (!ep || !events || maxevents <= 0) {
354 		errno = EINVAL;
355 		return -1;
356 	}
357 
358 	ep->stat.calls++;
359 
360 #if SPIN_BEFORE_SLEEP
361 	int spin = 0;
362 	while (ep->num_events == 0 && spin < SPIN_THRESH) {
363 		spin++;
364 	}
365 #endif /* SPIN_BEFORE_SLEEP */
366 
367 	if (pthread_mutex_lock(&ep->epoll_lock)) {
368 		if (errno == EDEADLK)
369 			perror("mtcp_epoll_wait: epoll_lock blocked\n");
370 		assert(0);
371 	}
372 
373 wait:
374 	eq = ep->usr_queue;
375 	eq_shadow = ep->usr_shadow_queue;
376 
377 	/* wait until event occurs */
378 	while (eq->num_events == 0 && eq_shadow->num_events == 0 && timeout != 0) {
379 
380 #if INTR_SLEEPING_MTCP
381 		/* signal to mtcp thread if it is sleeping */
382 		if (mtcp->wakeup_flag && mtcp->is_sleeping) {
383 			pthread_kill(mtcp->ctx->thread, SIGUSR1);
384 		}
385 #endif
386 		ep->stat.waits++;
387 		ep->waiting = TRUE;
388 		if (timeout > 0) {
389 			struct timespec deadline;
390 
391 			clock_gettime(CLOCK_REALTIME, &deadline);
392 			if (timeout > 1000) {
393 				int sec;
394 				sec = timeout / 1000;
395 				deadline.tv_sec += sec;
396 				timeout -= sec * 1000;
397 			}
398 
399 			if (deadline.tv_nsec >= 1000000000) {
400 				deadline.tv_sec++;
401 				deadline.tv_nsec -= 1000000000;
402 			}
403 
404 			//deadline.tv_sec = mtcp->cur_tv.tv_sec;
405 			//deadline.tv_nsec = (mtcp->cur_tv.tv_usec + timeout * 1000) * 1000;
406 			ret = pthread_cond_timedwait(&ep->epoll_cond,
407 					&ep->epoll_lock, &deadline);
408 			if (ret && ret != ETIMEDOUT) {
409 				/* errno set by pthread_cond_timedwait() */
410 				pthread_mutex_unlock(&ep->epoll_lock);
411 				TRACE_ERROR("pthread_cond_timedwait failed. ret: %d, error: %s\n",
412 						ret, strerror(errno));
413 				return -1;
414 			}
415 			timeout = 0;
416 		} else if (timeout < 0) {
417 			ret = pthread_cond_wait(&ep->epoll_cond, &ep->epoll_lock);
418 			if (ret) {
419 				/* errno set by pthread_cond_wait() */
420 				pthread_mutex_unlock(&ep->epoll_lock);
421 				TRACE_ERROR("pthread_cond_wait failed. ret: %d, error: %s\n",
422 						ret, strerror(errno));
423 				return -1;
424 			}
425 		}
426 		ep->waiting = FALSE;
427 
428 		if (mtcp->ctx->done || mtcp->ctx->exit || mtcp->ctx->interrupt) {
429 			mtcp->ctx->interrupt = FALSE;
430 			//ret = pthread_cond_signal(&ep->epoll_cond);
431 			pthread_mutex_unlock(&ep->epoll_lock);
432 			errno = EINTR;
433 			return -1;
434 		}
435 
436 	}
437 
438 	/* fetch events from the user event queue */
439 	cnt = 0;
440 	num_events = eq->num_events;
441 	for (i = 0; i < num_events && cnt < maxevents; i++) {
442 		event_socket = &mtcp->smap[eq->events[eq->start].sockid];
443 		validity = TRUE;
444 		if (event_socket->socktype == MOS_SOCK_UNUSED)
445 			validity = FALSE;
446 		if (!(event_socket->epoll & eq->events[eq->start].ev.events))
447 			validity = FALSE;
448 		if (!(event_socket->events & eq->events[eq->start].ev.events))
449 			validity = FALSE;
450 
451 		if (validity) {
452 			events[cnt++] = eq->events[eq->start].ev;
453 			assert(eq->events[eq->start].sockid >= 0);
454 
455 			TRACE_EPOLL("Socket %d: Handled event. event: %s, "
456 					"start: %u, end: %u, num: %u\n",
457 					event_socket->id,
458 					EventToString(eq->events[eq->start].ev.events),
459 					eq->start, eq->end, eq->num_events);
460 			ep->stat.handled++;
461 		} else {
462 			TRACE_EPOLL("Socket %d: event %s invalidated.\n",
463 					eq->events[eq->start].sockid,
464 					EventToString(eq->events[eq->start].ev.events));
465 			ep->stat.invalidated++;
466 		}
467 		event_socket->events &= (~eq->events[eq->start].ev.events);
468 
469 		eq->start++;
470 		eq->num_events--;
471 		if (eq->start >= eq->size) {
472 			eq->start = 0;
473 		}
474 	}
475 
476 	/* fetch eventes from user shadow event queue */
477 	eq = ep->usr_shadow_queue;
478 	num_events = eq->num_events;
479 	for (i = 0; i < num_events && cnt < maxevents; i++) {
480 		event_socket = &mtcp->smap[eq->events[eq->start].sockid];
481 		validity = TRUE;
482 		if (event_socket->socktype == MOS_SOCK_UNUSED)
483 			validity = FALSE;
484 		if (!(event_socket->epoll & eq->events[eq->start].ev.events))
485 			validity = FALSE;
486 		if (!(event_socket->events & eq->events[eq->start].ev.events))
487 			validity = FALSE;
488 
489 		if (validity) {
490 			events[cnt++] = eq->events[eq->start].ev;
491 			assert(eq->events[eq->start].sockid >= 0);
492 
493 			TRACE_EPOLL("Socket %d: Handled event. event: %s, "
494 					"start: %u, end: %u, num: %u\n",
495 					event_socket->id,
496 					EventToString(eq->events[eq->start].ev.events),
497 					eq->start, eq->end, eq->num_events);
498 			ep->stat.handled++;
499 		} else {
500 			TRACE_EPOLL("Socket %d: event %s invalidated.\n",
501 					eq->events[eq->start].sockid,
502 					EventToString(eq->events[eq->start].ev.events));
503 			ep->stat.invalidated++;
504 		}
505 		event_socket->events &= (~eq->events[eq->start].ev.events);
506 
507 		eq->start++;
508 		eq->num_events--;
509 		if (eq->start >= eq->size) {
510 			eq->start = 0;
511 		}
512 	}
513 
514 	if (cnt == 0 && timeout != 0)
515 		goto wait;
516 
517 	pthread_mutex_unlock(&ep->epoll_lock);
518 
519 	return cnt;
520 }
521 /*----------------------------------------------------------------------------*/
522 inline int
523 AddEpollEvent(struct mtcp_epoll *ep,
524 		int queue_type, socket_map_t socket, uint32_t event)
525 {
526 #ifdef DBGMSG
527 	__PREPARE_DBGLOGGING();
528 #endif
529 	struct event_queue *eq;
530 	int index;
531 
532 	if (!ep || !socket || !event)
533 		return -1;
534 
535 	ep->stat.issued++;
536 
537 	if (socket->events & event) {
538 		return 0;
539 	}
540 
541 	if (queue_type == MOS_EVENT_QUEUE) {
542 		eq = ep->mtcp_queue;
543 	} else if (queue_type == USR_EVENT_QUEUE) {
544 		eq = ep->usr_queue;
545 		pthread_mutex_lock(&ep->epoll_lock);
546 	} else if (queue_type == USR_SHADOW_EVENT_QUEUE) {
547 		eq = ep->usr_shadow_queue;
548 	} else {
549 		TRACE_ERROR("Non-existing event queue type!\n");
550 		return -1;
551 	}
552 
553 	if (eq->num_events >= eq->size) {
554 		TRACE_ERROR("Exceeded epoll event queue! num_events: %d, size: %d\n",
555 				eq->num_events, eq->size);
556 		if (queue_type == USR_EVENT_QUEUE)
557 			pthread_mutex_unlock(&ep->epoll_lock);
558 		return -1;
559 	}
560 
561 	index = eq->end++;
562 
563 	socket->events |= event;
564 	eq->events[index].sockid = socket->id;
565 	eq->events[index].ev.events = event;
566 	eq->events[index].ev.data = socket->ep_data;
567 
568 	if (eq->end >= eq->size) {
569 		eq->end = 0;
570 	}
571 	eq->num_events++;
572 
573 	TRACE_EPOLL("Socket %d New event: %s, start: %u, end: %u, num: %u\n",
574 			eq->events[index].sockid,
575 			EventToString(eq->events[index].ev.events),
576 			eq->start, eq->end, eq->num_events);
577 
578 	if (queue_type == USR_EVENT_QUEUE)
579 		pthread_mutex_unlock(&ep->epoll_lock);
580 
581 	ep->stat.registered++;
582 
583 	return 0;
584 }
585