xref: /mOS-networking-stack/core/src/eventpoll.c (revision cafe7743)
1 #include "debug.h"
2 #include <sys/queue.h>
3 #include <unistd.h>
4 #include <time.h>
5 #include <signal.h>
6 #include <assert.h>
7 #include <string.h>
8 
9 #include "mtcp.h"
10 #include "tcp_stream.h"
11 #include "eventpoll.h"
12 #include "tcp_in.h"
13 #include "pipe.h"
14 #include "tcp_rb.h"
15 #include "config.h"
16 
/* Larger / smaller of two values. These are function-like macros, so an
 * argument may be evaluated more than once; avoid side effects in them. */
#define MAX(a, b) ((a)>(b)?(a):(b))
#define MIN(a, b) ((a)<(b)?(a):(b))

/* When non-zero, mtcp_epoll_wait() busy-spins while ep->num_events is zero
 * before it takes the epoll lock. */
#define SPIN_BEFORE_SLEEP FALSE
/* Upper bound on the number of iterations of that spin loop. */
#define SPIN_THRESH 10000000
22 
23 /*----------------------------------------------------------------------------*/
24 char *event_str[] = {"NONE", "IN", "PRI", "OUT", "ERR", "HUP", "RDHUP"};
25 /*----------------------------------------------------------------------------*/
26 char *
27 EventToString(uint32_t event)
28 {
29 	switch (event) {
30 		case MOS_EPOLLNONE:
31 			return event_str[0];
32 			break;
33 		case MOS_EPOLLIN:
34 			return event_str[1];
35 			break;
36 		case MOS_EPOLLPRI:
37 			return event_str[2];
38 			break;
39 		case MOS_EPOLLOUT:
40 			return event_str[3];
41 			break;
42 		case MOS_EPOLLERR:
43 			return event_str[4];
44 			break;
45 		case MOS_EPOLLHUP:
46 			return event_str[5];
47 			break;
48 		case MOS_EPOLLRDHUP:
49 			return event_str[6];
50 			break;
51 		default:
52 			assert(0);
53 	}
54 
55 	assert(0);
56 	return NULL;
57 }
58 /*----------------------------------------------------------------------------*/
59 struct event_queue *
60 CreateEventQueue(int size)
61 {
62 	struct event_queue *eq;
63 
64 	eq = (struct event_queue *)calloc(1, sizeof(struct event_queue));
65 	if (!eq)
66 		return NULL;
67 
68 	eq->start = 0;
69 	eq->end = 0;
70 	eq->size = size;
71 	eq->events = (struct mtcp_epoll_event_int *)
72 			calloc(size, sizeof(struct mtcp_epoll_event_int));
73 	if (!eq->events) {
74 		free(eq);
75 		return NULL;
76 	}
77 	eq->num_events = 0;
78 
79 	return eq;
80 }
81 /*----------------------------------------------------------------------------*/
82 void
83 DestroyEventQueue(struct event_queue *eq)
84 {
85 	if (eq->events)
86 		free(eq->events);
87 
88 	free(eq);
89 }
90 /*----------------------------------------------------------------------------*/
91 int
92 mtcp_epoll_create(mctx_t mctx, int size)
93 {
94 	mtcp_manager_t mtcp = g_mtcp[mctx->cpu];
95 	struct mtcp_epoll *ep;
96 	socket_map_t epsocket;
97 
98 	if (size <= 0) {
99 		errno = EINVAL;
100 		return -1;
101 	}
102 
103 	epsocket = AllocateSocket(mctx, MOS_SOCK_EPOLL);
104 	if (!epsocket) {
105 		errno = ENFILE;
106 		return -1;
107 	}
108 
109 	ep = (struct mtcp_epoll *)calloc(1, sizeof(struct mtcp_epoll));
110 	if (!ep) {
111 		FreeSocket(mctx, epsocket->id, MOS_SOCK_EPOLL);
112 		return -1;
113 	}
114 
115 	/* create event queues */
116 	ep->usr_queue = CreateEventQueue(size);
117 	if (!ep->usr_queue)
118 		return -1;
119 
120 	ep->usr_shadow_queue = CreateEventQueue(size);
121 	if (!ep->usr_shadow_queue) {
122 		DestroyEventQueue(ep->usr_queue);
123 		return -1;
124 	}
125 
126 	ep->mtcp_queue = CreateEventQueue(size);
127 	if (!ep->mtcp_queue) {
128 		DestroyEventQueue(ep->usr_queue);
129 		DestroyEventQueue(ep->usr_shadow_queue);
130 		return -1;
131 	}
132 
133 	TRACE_EPOLL("epoll structure of size %d created.\n", size);
134 
135 	mtcp->ep = ep;
136 	epsocket->ep = ep;
137 
138 	if (pthread_mutex_init(&ep->epoll_lock, NULL)) {
139 		return -1;
140 	}
141 	if (pthread_cond_init(&ep->epoll_cond, NULL)) {
142 		return -1;
143 	}
144 
145 	return epsocket->id;
146 }
147 /*----------------------------------------------------------------------------*/
148 int
149 CloseEpollSocket(mctx_t mctx, int epid)
150 {
151 	mtcp_manager_t mtcp;
152 	struct mtcp_epoll *ep;
153 
154 	mtcp = GetMTCPManager(mctx);
155 	if (!mtcp) {
156 		return -1;
157 	}
158 
159 	ep = mtcp->smap[epid].ep;
160 	if (!ep) {
161 		errno = EINVAL;
162 		return -1;
163 	}
164 
165 	DestroyEventQueue(ep->usr_queue);
166 	DestroyEventQueue(ep->usr_shadow_queue);
167 	DestroyEventQueue(ep->mtcp_queue);
168 	free(ep);
169 
170 	pthread_mutex_lock(&ep->epoll_lock);
171 	mtcp->ep = NULL;
172 	mtcp->smap[epid].ep = NULL;
173 	pthread_cond_signal(&ep->epoll_cond);
174 	pthread_mutex_unlock(&ep->epoll_lock);
175 
176 	pthread_cond_destroy(&ep->epoll_cond);
177 	pthread_mutex_destroy(&ep->epoll_lock);
178 
179 	return 0;
180 }
181 /*----------------------------------------------------------------------------*/
182 static int
183 RaisePendingStreamEvents(mtcp_manager_t mtcp,
184 		struct mtcp_epoll *ep, socket_map_t socket)
185 {
186 	tcp_stream *stream = socket->stream;
187 
188 	if (!stream)
189 		return -1;
190 	if (stream->state < TCP_ST_ESTABLISHED)
191 		return -1;
192 
193 	TRACE_EPOLL("Stream %d at state %s\n",
194 			stream->id, TCPStateToString(stream));
195 	/* if there are payloads already read before epoll registration */
196 	/* generate read event */
197 	if (socket->epoll & MOS_EPOLLIN) {
198 		struct tcp_recv_vars *rcvvar = stream->rcvvar;
199 #ifdef NEWRB
200 		if (rcvvar->rcvbuf && tcprb_cflen(rcvvar->rcvbuf) > 0) {
201 #else
202 		if (rcvvar->rcvbuf && rcvvar->rcvbuf->merged_len > 0) {
203 #endif
204 			TRACE_EPOLL("Socket %d: Has existing payloads\n", socket->id);
205 			AddEpollEvent(ep, USR_SHADOW_EVENT_QUEUE, socket, MOS_EPOLLIN);
206 		} else if (stream->state == TCP_ST_CLOSE_WAIT) {
207 			TRACE_EPOLL("Socket %d: Waiting for close\n", socket->id);
208 			AddEpollEvent(ep, USR_SHADOW_EVENT_QUEUE, socket, MOS_EPOLLIN);
209 		}
210 	}
211 
212 	/* same thing to the write event */
213 	if (socket->epoll & MOS_EPOLLOUT) {
214 		struct tcp_send_vars *sndvar = stream->sndvar;
215 		if (!sndvar->sndbuf ||
216 				(sndvar->sndbuf && sndvar->sndbuf->len < sndvar->snd_wnd)) {
217 			if (!(socket->events & MOS_EPOLLOUT)) {
218 				TRACE_EPOLL("Socket %d: Adding write event\n", socket->id);
219 				AddEpollEvent(ep, USR_SHADOW_EVENT_QUEUE, socket, MOS_EPOLLOUT);
220 			}
221 		}
222 	}
223 
224 	return 0;
225 }
226 /*----------------------------------------------------------------------------*/
227 int
228 mtcp_epoll_ctl(mctx_t mctx, int epid,
229 		int op, int sockid, struct mtcp_epoll_event *event)
230 {
231 	mtcp_manager_t mtcp;
232 	struct mtcp_epoll *ep;
233 	socket_map_t socket;
234 	uint32_t events;
235 
236 	mtcp = GetMTCPManager(mctx);
237 	if (!mtcp) {
238 		return -1;
239 	}
240 
241 	if (epid < 0 || epid >= g_config.mos->max_concurrency) {
242 		TRACE_API("Epoll id %d out of range.\n", epid);
243 		errno = EBADF;
244 		return -1;
245 	}
246 
247 	if (sockid < 0 || sockid >= g_config.mos->max_concurrency) {
248 		TRACE_API("Socket id %d out of range.\n", sockid);
249 		errno = EBADF;
250 		return -1;
251 	}
252 
253 	if (mtcp->smap[epid].socktype == MOS_SOCK_UNUSED) {
254 		errno = EBADF;
255 		return -1;
256 	}
257 
258 	if (mtcp->smap[epid].socktype != MOS_SOCK_EPOLL) {
259 		errno = EINVAL;
260 		return -1;
261 	}
262 
263 	ep = mtcp->smap[epid].ep;
264 	if (!ep || (!event && op != MOS_EPOLL_CTL_DEL)) {
265 		errno = EINVAL;
266 		return -1;
267 	}
268 	socket = &mtcp->smap[sockid];
269 
270 	if (op == MOS_EPOLL_CTL_ADD) {
271 		if (socket->epoll) {
272 			errno = EEXIST;
273 			return -1;
274 		}
275 
276 		/* EPOLLERR and EPOLLHUP are registered as default */
277 		events = event->events;
278 		events |= (MOS_EPOLLERR | MOS_EPOLLHUP);
279 		socket->ep_data = event->data;
280 		socket->epoll = events;
281 
282 		TRACE_EPOLL("Adding epoll socket %d(type %d) ET: %u, IN: %u, OUT: %u\n",
283 				socket->id, socket->socktype, socket->epoll & MOS_EPOLLET,
284 				socket->epoll & MOS_EPOLLIN, socket->epoll & MOS_EPOLLOUT);
285 
286 		if (socket->socktype == MOS_SOCK_STREAM) {
287 			RaisePendingStreamEvents(mtcp, ep, socket);
288 		} else if (socket->socktype == MOS_SOCK_PIPE) {
289 			RaisePendingPipeEvents(mctx, epid, sockid);
290 		}
291 
292 	} else if (op == MOS_EPOLL_CTL_MOD) {
293 		if (!socket->epoll) {
294 			pthread_mutex_unlock(&ep->epoll_lock);
295 			errno = ENOENT;
296 			return -1;
297 		}
298 
299 		events = event->events;
300 		events |= (MOS_EPOLLERR | MOS_EPOLLHUP);
301 		socket->ep_data = event->data;
302 		socket->epoll = events;
303 
304 		if (socket->socktype == MOS_SOCK_STREAM) {
305 			RaisePendingStreamEvents(mtcp, ep, socket);
306 		} else if (socket->socktype == MOS_SOCK_PIPE) {
307 			RaisePendingPipeEvents(mctx, epid, sockid);
308 		}
309 
310 	} else if (op == MOS_EPOLL_CTL_DEL) {
311 		if (!socket->epoll) {
312 			errno = ENOENT;
313 			return -1;
314 		}
315 
316 		socket->epoll = MOS_EPOLLNONE;
317 	}
318 
319 	return 0;
320 }
321 /*----------------------------------------------------------------------------*/
322 int
323 mtcp_epoll_wait(mctx_t mctx, int epid,
324 		struct mtcp_epoll_event *events, int maxevents, int timeout)
325 {
326 	mtcp_manager_t mtcp;
327 	struct mtcp_epoll *ep;
328 	struct event_queue *eq;
329 	struct event_queue *eq_shadow;
330 	socket_map_t event_socket;
331 	int validity;
332 	int i, cnt, ret;
333 	int num_events;
334 
335 	mtcp = GetMTCPManager(mctx);
336 	if (!mtcp) {
337 		return -1;
338 	}
339 
340 	if (epid < 0 || epid >= g_config.mos->max_concurrency) {
341 		TRACE_API("Epoll id %d out of range.\n", epid);
342 		errno = EBADF;
343 		return -1;
344 	}
345 
346 	if (mtcp->smap[epid].socktype == MOS_SOCK_UNUSED) {
347 		errno = EBADF;
348 		return -1;
349 	}
350 
351 	if (mtcp->smap[epid].socktype != MOS_SOCK_EPOLL) {
352 		errno = EINVAL;
353 		return -1;
354 	}
355 
356 	ep = mtcp->smap[epid].ep;
357 	if (!ep || !events || maxevents <= 0) {
358 		errno = EINVAL;
359 		return -1;
360 	}
361 
362 	ep->stat.calls++;
363 
364 #if SPIN_BEFORE_SLEEP
365 	int spin = 0;
366 	while (ep->num_events == 0 && spin < SPIN_THRESH) {
367 		spin++;
368 	}
369 #endif /* SPIN_BEFORE_SLEEP */
370 
371 	if (pthread_mutex_lock(&ep->epoll_lock)) {
372 		if (errno == EDEADLK)
373 			perror("mtcp_epoll_wait: epoll_lock blocked\n");
374 		assert(0);
375 	}
376 
377 wait:
378 	eq = ep->usr_queue;
379 	eq_shadow = ep->usr_shadow_queue;
380 
381 	/* wait until event occurs */
382 	while (eq->num_events == 0 && eq_shadow->num_events == 0 && timeout != 0) {
383 
384 #if INTR_SLEEPING_MTCP
385 		/* signal to mtcp thread if it is sleeping */
386 		if (mtcp->wakeup_flag && mtcp->is_sleeping) {
387 			pthread_kill(mtcp->ctx->thread, SIGUSR1);
388 		}
389 #endif
390 		ep->stat.waits++;
391 		ep->waiting = TRUE;
392 		if (timeout > 0) {
393 			struct timespec deadline;
394 
395 			clock_gettime(CLOCK_REALTIME, &deadline);
396 			if (timeout > 1000) {
397 				int sec;
398 				sec = timeout / 1000;
399 				deadline.tv_sec += sec;
400 				timeout -= sec * 1000;
401 			}
402 
403 			if (deadline.tv_nsec >= 1000000000) {
404 				deadline.tv_sec++;
405 				deadline.tv_nsec -= 1000000000;
406 			}
407 
408 			//deadline.tv_sec = mtcp->cur_tv.tv_sec;
409 			//deadline.tv_nsec = (mtcp->cur_tv.tv_usec + timeout * 1000) * 1000;
410 			ret = pthread_cond_timedwait(&ep->epoll_cond,
411 					&ep->epoll_lock, &deadline);
412 			if (ret && ret != ETIMEDOUT) {
413 				/* errno set by pthread_cond_timedwait() */
414 				pthread_mutex_unlock(&ep->epoll_lock);
415 				TRACE_ERROR("pthread_cond_timedwait failed. ret: %d, error: %s\n",
416 						ret, strerror(errno));
417 				return -1;
418 			}
419 			timeout = 0;
420 		} else if (timeout < 0) {
421 			ret = pthread_cond_wait(&ep->epoll_cond, &ep->epoll_lock);
422 			if (ret) {
423 				/* errno set by pthread_cond_wait() */
424 				pthread_mutex_unlock(&ep->epoll_lock);
425 				TRACE_ERROR("pthread_cond_wait failed. ret: %d, error: %s\n",
426 						ret, strerror(errno));
427 				return -1;
428 			}
429 		}
430 		ep->waiting = FALSE;
431 
432 		if (mtcp->ctx->done || mtcp->ctx->exit || mtcp->ctx->interrupt) {
433 			mtcp->ctx->interrupt = FALSE;
434 			//ret = pthread_cond_signal(&ep->epoll_cond);
435 			pthread_mutex_unlock(&ep->epoll_lock);
436 			errno = EINTR;
437 			return -1;
438 		}
439 
440 	}
441 
442 	/* fetch events from the user event queue */
443 	cnt = 0;
444 	num_events = eq->num_events;
445 	for (i = 0; i < num_events && cnt < maxevents; i++) {
446 		event_socket = &mtcp->smap[eq->events[eq->start].sockid];
447 		validity = TRUE;
448 		if (event_socket->socktype == MOS_SOCK_UNUSED)
449 			validity = FALSE;
450 		if (!(event_socket->epoll & eq->events[eq->start].ev.events))
451 			validity = FALSE;
452 		if (!(event_socket->events & eq->events[eq->start].ev.events))
453 			validity = FALSE;
454 
455 		if (validity) {
456 			events[cnt++] = eq->events[eq->start].ev;
457 			assert(eq->events[eq->start].sockid >= 0);
458 
459 			TRACE_EPOLL("Socket %d: Handled event. event: %s, "
460 					"start: %u, end: %u, num: %u\n",
461 					event_socket->id,
462 					EventToString(eq->events[eq->start].ev.events),
463 					eq->start, eq->end, eq->num_events);
464 			ep->stat.handled++;
465 		} else {
466 			TRACE_EPOLL("Socket %d: event %s invalidated.\n",
467 					eq->events[eq->start].sockid,
468 					EventToString(eq->events[eq->start].ev.events));
469 			ep->stat.invalidated++;
470 		}
471 		event_socket->events &= (~eq->events[eq->start].ev.events);
472 
473 		eq->start++;
474 		eq->num_events--;
475 		if (eq->start >= eq->size) {
476 			eq->start = 0;
477 		}
478 	}
479 
480 	/* fetch eventes from user shadow event queue */
481 	eq = ep->usr_shadow_queue;
482 	num_events = eq->num_events;
483 	for (i = 0; i < num_events && cnt < maxevents; i++) {
484 		event_socket = &mtcp->smap[eq->events[eq->start].sockid];
485 		validity = TRUE;
486 		if (event_socket->socktype == MOS_SOCK_UNUSED)
487 			validity = FALSE;
488 		if (!(event_socket->epoll & eq->events[eq->start].ev.events))
489 			validity = FALSE;
490 		if (!(event_socket->events & eq->events[eq->start].ev.events))
491 			validity = FALSE;
492 
493 		if (validity) {
494 			events[cnt++] = eq->events[eq->start].ev;
495 			assert(eq->events[eq->start].sockid >= 0);
496 
497 			TRACE_EPOLL("Socket %d: Handled event. event: %s, "
498 					"start: %u, end: %u, num: %u\n",
499 					event_socket->id,
500 					EventToString(eq->events[eq->start].ev.events),
501 					eq->start, eq->end, eq->num_events);
502 			ep->stat.handled++;
503 		} else {
504 			TRACE_EPOLL("Socket %d: event %s invalidated.\n",
505 					eq->events[eq->start].sockid,
506 					EventToString(eq->events[eq->start].ev.events));
507 			ep->stat.invalidated++;
508 		}
509 		event_socket->events &= (~eq->events[eq->start].ev.events);
510 
511 		eq->start++;
512 		eq->num_events--;
513 		if (eq->start >= eq->size) {
514 			eq->start = 0;
515 		}
516 	}
517 
518 	if (cnt == 0 && timeout != 0)
519 		goto wait;
520 
521 	pthread_mutex_unlock(&ep->epoll_lock);
522 
523 	return cnt;
524 }
525 /*----------------------------------------------------------------------------*/
526 inline int
527 AddEpollEvent(struct mtcp_epoll *ep,
528 		int queue_type, socket_map_t socket, uint32_t event)
529 {
530 #ifdef DBGMSG
531 	__PREPARE_DGBLOGGING();
532 #endif
533 	struct event_queue *eq;
534 	int index;
535 
536 	if (!ep || !socket || !event)
537 		return -1;
538 
539 	ep->stat.issued++;
540 
541 	if (socket->events & event) {
542 		return 0;
543 	}
544 
545 	if (queue_type == MOS_EVENT_QUEUE) {
546 		eq = ep->mtcp_queue;
547 	} else if (queue_type == USR_EVENT_QUEUE) {
548 		eq = ep->usr_queue;
549 		pthread_mutex_lock(&ep->epoll_lock);
550 	} else if (queue_type == USR_SHADOW_EVENT_QUEUE) {
551 		eq = ep->usr_shadow_queue;
552 	} else {
553 		TRACE_ERROR("Non-existing event queue type!\n");
554 		return -1;
555 	}
556 
557 	if (eq->num_events >= eq->size) {
558 		TRACE_ERROR("Exceeded epoll event queue! num_events: %d, size: %d\n",
559 				eq->num_events, eq->size);
560 		if (queue_type == USR_EVENT_QUEUE)
561 			pthread_mutex_unlock(&ep->epoll_lock);
562 		return -1;
563 	}
564 
565 	index = eq->end++;
566 
567 	socket->events |= event;
568 	eq->events[index].sockid = socket->id;
569 	eq->events[index].ev.events = event;
570 	eq->events[index].ev.data = socket->ep_data;
571 
572 	if (eq->end >= eq->size) {
573 		eq->end = 0;
574 	}
575 	eq->num_events++;
576 
577 	TRACE_EPOLL("Socket %d New event: %s, start: %u, end: %u, num: %u\n",
578 			eq->events[index].sockid,
579 			EventToString(eq->events[index].ev.events),
580 			eq->start, eq->end, eq->num_events);
581 
582 	if (queue_type == USR_EVENT_QUEUE)
583 		pthread_mutex_unlock(&ep->epoll_lock);
584 
585 	ep->stat.registered++;
586 
587 	return 0;
588 }
589