xref: /mOS-networking-stack/core/src/eventpoll.c (revision a5e1a556)
1 #include "debug.h"
2 #include <sys/queue.h>
3 #include <unistd.h>
4 #include <time.h>
5 #include <signal.h>
6 #include <assert.h>
7 #include <string.h>
8 
9 #include "mtcp.h"
10 #include "tcp_stream.h"
11 #include "eventpoll.h"
12 #include "tcp_in.h"
13 #include "pipe.h"
14 #include "tcp_rb.h"
15 #include "config.h"
16 
17 #define MAX(a, b) ((a)>(b)?(a):(b))
18 #define MIN(a, b) ((a)<(b)?(a):(b))
19 
20 #define SPIN_BEFORE_SLEEP FALSE
21 #define SPIN_THRESH 10000000
22 
23 /*----------------------------------------------------------------------------*/
/* Event names indexed to match the cases in EventToString() below. */
char *event_str[] = {"NONE", "IN", "PRI", "OUT", "ERR", "HUP", "RDHUP"};
25 /*----------------------------------------------------------------------------*/
26 char *
27 EventToString(uint32_t event)
28 {
29 	switch (event) {
30 		case MOS_EPOLLNONE:
31 			return event_str[0];
32 			break;
33 		case MOS_EPOLLIN:
34 			return event_str[1];
35 			break;
36 		case MOS_EPOLLPRI:
37 			return event_str[2];
38 			break;
39 		case MOS_EPOLLOUT:
40 			return event_str[3];
41 			break;
42 		case MOS_EPOLLERR:
43 			return event_str[4];
44 			break;
45 		case MOS_EPOLLHUP:
46 			return event_str[5];
47 			break;
48 		case MOS_EPOLLRDHUP:
49 			return event_str[6];
50 			break;
51 		default:
52 			assert(0);
53 	}
54 
55 	assert(0);
56 	return NULL;
57 }
58 /*----------------------------------------------------------------------------*/
59 struct event_queue *
60 CreateEventQueue(int size)
61 {
62 	struct event_queue *eq;
63 
64 	eq = (struct event_queue *)calloc(1, sizeof(struct event_queue));
65 	if (!eq)
66 		return NULL;
67 
68 	eq->start = 0;
69 	eq->end = 0;
70 	eq->size = size;
71 	eq->events = (struct mtcp_epoll_event_int *)
72 			calloc(size, sizeof(struct mtcp_epoll_event_int));
73 	if (!eq->events) {
74 		free(eq);
75 		return NULL;
76 	}
77 	eq->num_events = 0;
78 
79 	return eq;
80 }
81 /*----------------------------------------------------------------------------*/
82 void
83 DestroyEventQueue(struct event_queue *eq)
84 {
85 	if (eq->events)
86 		free(eq->events);
87 
88 	free(eq);
89 }
90 /*----------------------------------------------------------------------------*/
91 int
92 mtcp_epoll_create(mctx_t mctx, int size)
93 {
94 	mtcp_manager_t mtcp = g_mtcp[mctx->cpu];
95 	struct mtcp_epoll *ep;
96 	socket_map_t epsocket;
97 
98 	if (size <= 0) {
99 		errno = EINVAL;
100 		return -1;
101 	}
102 
103 	epsocket = AllocateSocket(mctx, MOS_SOCK_EPOLL);
104 	if (!epsocket) {
105 		errno = ENFILE;
106 		return -1;
107 	}
108 
109 	ep = (struct mtcp_epoll *)calloc(1, sizeof(struct mtcp_epoll));
110 	if (!ep) {
111 		FreeSocket(mctx, epsocket->id, MOS_SOCK_EPOLL);
112 		return -1;
113 	}
114 
115 	/* create event queues */
116 	ep->usr_queue = CreateEventQueue(size);
117 	if (!ep->usr_queue)
118 		return -1;
119 
120 	ep->usr_shadow_queue = CreateEventQueue(size);
121 	if (!ep->usr_shadow_queue) {
122 		DestroyEventQueue(ep->usr_queue);
123 		return -1;
124 	}
125 
126 	ep->mtcp_queue = CreateEventQueue(size);
127 	if (!ep->mtcp_queue) {
128 		DestroyEventQueue(ep->usr_queue);
129 		DestroyEventQueue(ep->usr_shadow_queue);
130 		return -1;
131 	}
132 
133 	TRACE_EPOLL("epoll structure of size %d created.\n", size);
134 
135 	mtcp->ep = ep;
136 	epsocket->ep = ep;
137 
138 	if (pthread_mutex_init(&ep->epoll_lock, NULL)) {
139 		return -1;
140 	}
141 	if (pthread_cond_init(&ep->epoll_cond, NULL)) {
142 		return -1;
143 	}
144 
145 	return epsocket->id;
146 }
147 /*----------------------------------------------------------------------------*/
148 int
149 CloseEpollSocket(mctx_t mctx, int epid)
150 {
151 	mtcp_manager_t mtcp;
152 	struct mtcp_epoll *ep;
153 
154 	mtcp = GetMTCPManager(mctx);
155 	if (!mtcp) {
156 		return -1;
157 	}
158 
159 	ep = mtcp->smap[epid].ep;
160 	if (!ep) {
161 		errno = EINVAL;
162 		return -1;
163 	}
164 
165 	DestroyEventQueue(ep->usr_queue);
166 	DestroyEventQueue(ep->usr_shadow_queue);
167 	DestroyEventQueue(ep->mtcp_queue);
168 	free(ep);
169 
170 	pthread_mutex_lock(&ep->epoll_lock);
171 	mtcp->ep = NULL;
172 	mtcp->smap[epid].ep = NULL;
173 	pthread_cond_signal(&ep->epoll_cond);
174 	pthread_mutex_unlock(&ep->epoll_lock);
175 
176 	pthread_cond_destroy(&ep->epoll_cond);
177 	pthread_mutex_destroy(&ep->epoll_lock);
178 
179 	return 0;
180 }
181 /*----------------------------------------------------------------------------*/
182 static int
183 RaisePendingStreamEvents(mtcp_manager_t mtcp,
184 		struct mtcp_epoll *ep, socket_map_t socket)
185 {
186 	tcp_stream *stream = socket->stream;
187 
188 	if (!stream)
189 		return -1;
190 	if (stream->state < TCP_ST_ESTABLISHED)
191 		return -1;
192 
193 	TRACE_EPOLL("Stream %d at state %s\n",
194 			stream->id, TCPStateToString(stream));
195 	/* if there are payloads already read before epoll registration */
196 	/* generate read event */
197 	if (socket->epoll & MOS_EPOLLIN) {
198 		struct tcp_recv_vars *rcvvar = stream->rcvvar;
199 		if (rcvvar->rcvbuf && tcprb_cflen(rcvvar->rcvbuf) > 0) {
200 			TRACE_EPOLL("Socket %d: Has existing payloads\n", socket->id);
201 			AddEpollEvent(ep, USR_SHADOW_EVENT_QUEUE, socket, MOS_EPOLLIN);
202 		} else if (stream->state == TCP_ST_CLOSE_WAIT) {
203 			TRACE_EPOLL("Socket %d: Waiting for close\n", socket->id);
204 			AddEpollEvent(ep, USR_SHADOW_EVENT_QUEUE, socket, MOS_EPOLLIN);
205 		}
206 	}
207 
208 	/* same thing to the write event */
209 	if (socket->epoll & MOS_EPOLLOUT) {
210 		struct tcp_send_vars *sndvar = stream->sndvar;
211 		if (!sndvar->sndbuf ||
212 				(sndvar->sndbuf && sndvar->sndbuf->len < sndvar->snd_wnd)) {
213 			if (!(socket->events & MOS_EPOLLOUT)) {
214 				TRACE_EPOLL("Socket %d: Adding write event\n", socket->id);
215 				AddEpollEvent(ep, USR_SHADOW_EVENT_QUEUE, socket, MOS_EPOLLOUT);
216 			}
217 		}
218 	}
219 
220 	return 0;
221 }
222 /*----------------------------------------------------------------------------*/
223 int
224 mtcp_epoll_ctl(mctx_t mctx, int epid,
225 		int op, int sockid, struct mtcp_epoll_event *event)
226 {
227 	mtcp_manager_t mtcp;
228 	struct mtcp_epoll *ep;
229 	socket_map_t socket;
230 	uint32_t events;
231 
232 	mtcp = GetMTCPManager(mctx);
233 	if (!mtcp) {
234 		return -1;
235 	}
236 
237 	if (epid < 0 || epid >= g_config.mos->max_concurrency) {
238 		TRACE_API("Epoll id %d out of range.\n", epid);
239 		errno = EBADF;
240 		return -1;
241 	}
242 
243 	if (sockid < 0 || sockid >= g_config.mos->max_concurrency) {
244 		TRACE_API("Socket id %d out of range.\n", sockid);
245 		errno = EBADF;
246 		return -1;
247 	}
248 
249 	if (mtcp->smap[epid].socktype == MOS_SOCK_UNUSED) {
250 		errno = EBADF;
251 		return -1;
252 	}
253 
254 	if (mtcp->smap[epid].socktype != MOS_SOCK_EPOLL) {
255 		errno = EINVAL;
256 		return -1;
257 	}
258 
259 	ep = mtcp->smap[epid].ep;
260 	if (!ep || (!event && op != MOS_EPOLL_CTL_DEL)) {
261 		errno = EINVAL;
262 		return -1;
263 	}
264 	socket = &mtcp->smap[sockid];
265 
266 	if (op == MOS_EPOLL_CTL_ADD) {
267 		if (socket->epoll) {
268 			errno = EEXIST;
269 			return -1;
270 		}
271 
272 		/* EPOLLERR and EPOLLHUP are registered as default */
273 		events = event->events;
274 		events |= (MOS_EPOLLERR | MOS_EPOLLHUP);
275 		socket->ep_data = event->data;
276 		socket->epoll = events;
277 
278 		TRACE_EPOLL("Adding epoll socket %d(type %d) ET: %llu, IN: %llu, OUT: %llu\n",
279 			    socket->id, socket->socktype,
280 			    (unsigned long long)socket->epoll & MOS_EPOLLET,
281 			    (unsigned long long)socket->epoll & MOS_EPOLLIN,
282 			    (unsigned long long)socket->epoll & MOS_EPOLLOUT);
283 
284 		if (socket->socktype == MOS_SOCK_STREAM) {
285 			RaisePendingStreamEvents(mtcp, ep, socket);
286 		} else if (socket->socktype == MOS_SOCK_PIPE) {
287 			RaisePendingPipeEvents(mctx, epid, sockid);
288 		}
289 
290 	} else if (op == MOS_EPOLL_CTL_MOD) {
291 		if (!socket->epoll) {
292 			pthread_mutex_unlock(&ep->epoll_lock);
293 			errno = ENOENT;
294 			return -1;
295 		}
296 
297 		events = event->events;
298 		events |= (MOS_EPOLLERR | MOS_EPOLLHUP);
299 		socket->ep_data = event->data;
300 		socket->epoll = events;
301 
302 		if (socket->socktype == MOS_SOCK_STREAM) {
303 			RaisePendingStreamEvents(mtcp, ep, socket);
304 		} else if (socket->socktype == MOS_SOCK_PIPE) {
305 			RaisePendingPipeEvents(mctx, epid, sockid);
306 		}
307 
308 	} else if (op == MOS_EPOLL_CTL_DEL) {
309 		if (!socket->epoll) {
310 			errno = ENOENT;
311 			return -1;
312 		}
313 
314 		socket->epoll = MOS_EPOLLNONE;
315 	}
316 
317 	return 0;
318 }
319 /*----------------------------------------------------------------------------*/
320 int
321 mtcp_epoll_wait(mctx_t mctx, int epid,
322 		struct mtcp_epoll_event *events, int maxevents, int timeout)
323 {
324 	mtcp_manager_t mtcp;
325 	struct mtcp_epoll *ep;
326 	struct event_queue *eq;
327 	struct event_queue *eq_shadow;
328 	socket_map_t event_socket;
329 	int validity;
330 	int i, cnt, ret;
331 	int num_events;
332 
333 	mtcp = GetMTCPManager(mctx);
334 	if (!mtcp) {
335 		return -1;
336 	}
337 
338 	if (epid < 0 || epid >= g_config.mos->max_concurrency) {
339 		TRACE_API("Epoll id %d out of range.\n", epid);
340 		errno = EBADF;
341 		return -1;
342 	}
343 
344 	if (mtcp->smap[epid].socktype == MOS_SOCK_UNUSED) {
345 		errno = EBADF;
346 		return -1;
347 	}
348 
349 	if (mtcp->smap[epid].socktype != MOS_SOCK_EPOLL) {
350 		errno = EINVAL;
351 		return -1;
352 	}
353 
354 	ep = mtcp->smap[epid].ep;
355 	if (!ep || !events || maxevents <= 0) {
356 		errno = EINVAL;
357 		return -1;
358 	}
359 
360 	ep->stat.calls++;
361 
362 #if SPIN_BEFORE_SLEEP
363 	int spin = 0;
364 	while (ep->num_events == 0 && spin < SPIN_THRESH) {
365 		spin++;
366 	}
367 #endif /* SPIN_BEFORE_SLEEP */
368 
369 	if (pthread_mutex_lock(&ep->epoll_lock)) {
370 		if (errno == EDEADLK)
371 			perror("mtcp_epoll_wait: epoll_lock blocked\n");
372 		assert(0);
373 	}
374 
375 wait:
376 	eq = ep->usr_queue;
377 	eq_shadow = ep->usr_shadow_queue;
378 
379 	/* wait until event occurs */
380 	while (eq->num_events == 0 && eq_shadow->num_events == 0 && timeout != 0) {
381 
382 #if INTR_SLEEPING_MTCP
383 		/* signal to mtcp thread if it is sleeping */
384 		if (mtcp->wakeup_flag && mtcp->is_sleeping) {
385 			pthread_kill(mtcp->ctx->thread, SIGUSR1);
386 		}
387 #endif
388 		ep->stat.waits++;
389 		ep->waiting = TRUE;
390 		if (timeout > 0) {
391 			struct timespec deadline;
392 
393 			clock_gettime(CLOCK_REALTIME, &deadline);
394 			if (timeout >= 1000) {
395 				int sec;
396 				sec = timeout / 1000;
397 				deadline.tv_sec += sec;
398 				timeout -= sec * 1000;
399 			}
400 
401 			deadline.tv_nsec += timeout * 1000000;
402 
403 			if (deadline.tv_nsec >= 1000000000) {
404 				deadline.tv_sec++;
405 				deadline.tv_nsec -= 1000000000;
406 			}
407 
408 			//deadline.tv_sec = mtcp->cur_tv.tv_sec;
409 			//deadline.tv_nsec = (mtcp->cur_tv.tv_usec + timeout * 1000) * 1000;
410 			ret = pthread_cond_timedwait(&ep->epoll_cond,
411 					&ep->epoll_lock, &deadline);
412 			if (ret && ret != ETIMEDOUT) {
413 				/* errno set by pthread_cond_timedwait() */
414 				pthread_mutex_unlock(&ep->epoll_lock);
415 				TRACE_ERROR("pthread_cond_timedwait failed. ret: %d, error: %s\n",
416 						ret, strerror(errno));
417 				return -1;
418 			}
419 			timeout = 0;
420 		} else if (timeout < 0) {
421 			ret = pthread_cond_wait(&ep->epoll_cond, &ep->epoll_lock);
422 			if (ret) {
423 				/* errno set by pthread_cond_wait() */
424 				pthread_mutex_unlock(&ep->epoll_lock);
425 				TRACE_ERROR("pthread_cond_wait failed. ret: %d, error: %s\n",
426 						ret, strerror(errno));
427 				return -1;
428 			}
429 		}
430 		ep->waiting = FALSE;
431 
432 		if (mtcp->ctx->done || mtcp->ctx->exit || mtcp->ctx->interrupt) {
433 			mtcp->ctx->interrupt = FALSE;
434 			//ret = pthread_cond_signal(&ep->epoll_cond);
435 			pthread_mutex_unlock(&ep->epoll_lock);
436 			errno = EINTR;
437 			return -1;
438 		}
439 
440 	}
441 
442 	/* fetch events from the user event queue */
443 	cnt = 0;
444 	num_events = eq->num_events;
445 	for (i = 0; i < num_events && cnt < maxevents; i++) {
446 		event_socket = &mtcp->smap[eq->events[eq->start].sockid];
447 		validity = TRUE;
448 		if (event_socket->socktype == MOS_SOCK_UNUSED)
449 			validity = FALSE;
450 		if (!(event_socket->epoll & eq->events[eq->start].ev.events))
451 			validity = FALSE;
452 		if (!(event_socket->events & eq->events[eq->start].ev.events))
453 			validity = FALSE;
454 
455 		if (validity) {
456 			events[cnt++] = eq->events[eq->start].ev;
457 			assert(eq->events[eq->start].sockid >= 0);
458 
459 			TRACE_EPOLL("Socket %d: Handled event. event: %s, "
460 					"start: %u, end: %u, num: %u\n",
461 					event_socket->id,
462 					EventToString(eq->events[eq->start].ev.events),
463 					eq->start, eq->end, eq->num_events);
464 			ep->stat.handled++;
465 		} else {
466 			TRACE_EPOLL("Socket %d: event %s invalidated.\n",
467 					eq->events[eq->start].sockid,
468 					EventToString(eq->events[eq->start].ev.events));
469 			ep->stat.invalidated++;
470 		}
471 		event_socket->events &= (~eq->events[eq->start].ev.events);
472 
473 		eq->start++;
474 		eq->num_events--;
475 		if (eq->start >= eq->size) {
476 			eq->start = 0;
477 		}
478 	}
479 
480 	/* fetch eventes from user shadow event queue */
481 	eq = ep->usr_shadow_queue;
482 	num_events = eq->num_events;
483 	for (i = 0; i < num_events && cnt < maxevents; i++) {
484 		event_socket = &mtcp->smap[eq->events[eq->start].sockid];
485 		validity = TRUE;
486 		if (event_socket->socktype == MOS_SOCK_UNUSED)
487 			validity = FALSE;
488 		if (!(event_socket->epoll & eq->events[eq->start].ev.events))
489 			validity = FALSE;
490 		if (!(event_socket->events & eq->events[eq->start].ev.events))
491 			validity = FALSE;
492 
493 		if (validity) {
494 			events[cnt++] = eq->events[eq->start].ev;
495 			assert(eq->events[eq->start].sockid >= 0);
496 
497 			TRACE_EPOLL("Socket %d: Handled event. event: %s, "
498 					"start: %u, end: %u, num: %u\n",
499 					event_socket->id,
500 					EventToString(eq->events[eq->start].ev.events),
501 					eq->start, eq->end, eq->num_events);
502 			ep->stat.handled++;
503 		} else {
504 			TRACE_EPOLL("Socket %d: event %s invalidated.\n",
505 					eq->events[eq->start].sockid,
506 					EventToString(eq->events[eq->start].ev.events));
507 			ep->stat.invalidated++;
508 		}
509 		event_socket->events &= (~eq->events[eq->start].ev.events);
510 
511 		eq->start++;
512 		eq->num_events--;
513 		if (eq->start >= eq->size) {
514 			eq->start = 0;
515 		}
516 	}
517 
518 	if (cnt == 0 && timeout != 0)
519 		goto wait;
520 
521 	pthread_mutex_unlock(&ep->epoll_lock);
522 
523 	return cnt;
524 }
525 /*----------------------------------------------------------------------------*/
526 inline int
527 AddEpollEvent(struct mtcp_epoll *ep,
528 		int queue_type, socket_map_t socket, uint32_t event)
529 {
530 #ifdef DBGMSG
531 	__PREPARE_DBGLOGGING();
532 #endif
533 	struct event_queue *eq;
534 	int index;
535 
536 	if (!ep || !socket || !event)
537 		return -1;
538 
539 	ep->stat.issued++;
540 
541 	if (socket->events & event) {
542 		return 0;
543 	}
544 
545 	if (queue_type == MOS_EVENT_QUEUE) {
546 		eq = ep->mtcp_queue;
547 	} else if (queue_type == USR_EVENT_QUEUE) {
548 		eq = ep->usr_queue;
549 		pthread_mutex_lock(&ep->epoll_lock);
550 	} else if (queue_type == USR_SHADOW_EVENT_QUEUE) {
551 		eq = ep->usr_shadow_queue;
552 	} else {
553 		TRACE_ERROR("Non-existing event queue type!\n");
554 		return -1;
555 	}
556 
557 	if (eq->num_events >= eq->size) {
558 		TRACE_ERROR("Exceeded epoll event queue! num_events: %d, size: %d\n",
559 				eq->num_events, eq->size);
560 		if (queue_type == USR_EVENT_QUEUE)
561 			pthread_mutex_unlock(&ep->epoll_lock);
562 		return -1;
563 	}
564 
565 	index = eq->end++;
566 
567 	socket->events |= event;
568 	eq->events[index].sockid = socket->id;
569 	eq->events[index].ev.events = event;
570 	eq->events[index].ev.data = socket->ep_data;
571 
572 	if (eq->end >= eq->size) {
573 		eq->end = 0;
574 	}
575 	eq->num_events++;
576 
577 	TRACE_EPOLL("Socket %d New event: %s, start: %u, end: %u, num: %u\n",
578 			eq->events[index].sockid,
579 			EventToString(eq->events[index].ev.events),
580 			eq->start, eq->end, eq->num_events);
581 
582 	if (queue_type == USR_EVENT_QUEUE)
583 		pthread_mutex_unlock(&ep->epoll_lock);
584 
585 	ep->stat.registered++;
586 
587 	return 0;
588 }
589