1cafe7743SAsim Jamshed #include "debug.h"
276404edcSAsim Jamshed #include <sys/queue.h>
376404edcSAsim Jamshed #include <unistd.h>
476404edcSAsim Jamshed #include <time.h>
576404edcSAsim Jamshed #include <signal.h>
676404edcSAsim Jamshed #include <assert.h>
776404edcSAsim Jamshed #include <string.h>
876404edcSAsim Jamshed
976404edcSAsim Jamshed #include "mtcp.h"
1076404edcSAsim Jamshed #include "tcp_stream.h"
1176404edcSAsim Jamshed #include "eventpoll.h"
1276404edcSAsim Jamshed #include "tcp_in.h"
1376404edcSAsim Jamshed #include "pipe.h"
1476404edcSAsim Jamshed #include "tcp_rb.h"
1576404edcSAsim Jamshed #include "config.h"
1676404edcSAsim Jamshed
/* Min/max helpers. Each argument may be evaluated twice; avoid side effects. */
#define MAX(a, b) ((a) > (b) ? (a) : (b))
#define MIN(a, b) ((a) < (b) ? (a) : (b))

/* Optional busy-spin before sleeping in mtcp_epoll_wait (disabled). */
#define SPIN_BEFORE_SLEEP FALSE
#define SPIN_THRESH 10000000
2276404edcSAsim Jamshed
2376404edcSAsim Jamshed /*----------------------------------------------------------------------------*/
2476404edcSAsim Jamshed char *event_str[] = {"NONE", "IN", "PRI", "OUT", "ERR", "HUP", "RDHUP"};
2576404edcSAsim Jamshed /*----------------------------------------------------------------------------*/
2676404edcSAsim Jamshed char *
EventToString(uint32_t event)2776404edcSAsim Jamshed EventToString(uint32_t event)
2876404edcSAsim Jamshed {
2976404edcSAsim Jamshed switch (event) {
3076404edcSAsim Jamshed case MOS_EPOLLNONE:
3176404edcSAsim Jamshed return event_str[0];
3276404edcSAsim Jamshed break;
3376404edcSAsim Jamshed case MOS_EPOLLIN:
3476404edcSAsim Jamshed return event_str[1];
3576404edcSAsim Jamshed break;
3676404edcSAsim Jamshed case MOS_EPOLLPRI:
3776404edcSAsim Jamshed return event_str[2];
3876404edcSAsim Jamshed break;
3976404edcSAsim Jamshed case MOS_EPOLLOUT:
4076404edcSAsim Jamshed return event_str[3];
4176404edcSAsim Jamshed break;
4276404edcSAsim Jamshed case MOS_EPOLLERR:
4376404edcSAsim Jamshed return event_str[4];
4476404edcSAsim Jamshed break;
4576404edcSAsim Jamshed case MOS_EPOLLHUP:
4676404edcSAsim Jamshed return event_str[5];
4776404edcSAsim Jamshed break;
4876404edcSAsim Jamshed case MOS_EPOLLRDHUP:
4976404edcSAsim Jamshed return event_str[6];
5076404edcSAsim Jamshed break;
5176404edcSAsim Jamshed default:
5276404edcSAsim Jamshed assert(0);
5376404edcSAsim Jamshed }
5476404edcSAsim Jamshed
5576404edcSAsim Jamshed assert(0);
5676404edcSAsim Jamshed return NULL;
5776404edcSAsim Jamshed }
5876404edcSAsim Jamshed /*----------------------------------------------------------------------------*/
5976404edcSAsim Jamshed struct event_queue *
CreateEventQueue(int size)6076404edcSAsim Jamshed CreateEventQueue(int size)
6176404edcSAsim Jamshed {
6276404edcSAsim Jamshed struct event_queue *eq;
6376404edcSAsim Jamshed
6476404edcSAsim Jamshed eq = (struct event_queue *)calloc(1, sizeof(struct event_queue));
6576404edcSAsim Jamshed if (!eq)
6676404edcSAsim Jamshed return NULL;
6776404edcSAsim Jamshed
6876404edcSAsim Jamshed eq->start = 0;
6976404edcSAsim Jamshed eq->end = 0;
7076404edcSAsim Jamshed eq->size = size;
7176404edcSAsim Jamshed eq->events = (struct mtcp_epoll_event_int *)
7276404edcSAsim Jamshed calloc(size, sizeof(struct mtcp_epoll_event_int));
7376404edcSAsim Jamshed if (!eq->events) {
7476404edcSAsim Jamshed free(eq);
7576404edcSAsim Jamshed return NULL;
7676404edcSAsim Jamshed }
7776404edcSAsim Jamshed eq->num_events = 0;
7876404edcSAsim Jamshed
7976404edcSAsim Jamshed return eq;
8076404edcSAsim Jamshed }
8176404edcSAsim Jamshed /*----------------------------------------------------------------------------*/
8276404edcSAsim Jamshed void
DestroyEventQueue(struct event_queue * eq)8376404edcSAsim Jamshed DestroyEventQueue(struct event_queue *eq)
8476404edcSAsim Jamshed {
8576404edcSAsim Jamshed if (eq->events)
8676404edcSAsim Jamshed free(eq->events);
8776404edcSAsim Jamshed
8876404edcSAsim Jamshed free(eq);
8976404edcSAsim Jamshed }
9076404edcSAsim Jamshed /*----------------------------------------------------------------------------*/
9176404edcSAsim Jamshed int
mtcp_epoll_create(mctx_t mctx,int size)9276404edcSAsim Jamshed mtcp_epoll_create(mctx_t mctx, int size)
9376404edcSAsim Jamshed {
9476404edcSAsim Jamshed mtcp_manager_t mtcp = g_mtcp[mctx->cpu];
9576404edcSAsim Jamshed struct mtcp_epoll *ep;
9676404edcSAsim Jamshed socket_map_t epsocket;
9776404edcSAsim Jamshed
9876404edcSAsim Jamshed if (size <= 0) {
9976404edcSAsim Jamshed errno = EINVAL;
10076404edcSAsim Jamshed return -1;
10176404edcSAsim Jamshed }
10276404edcSAsim Jamshed
10376404edcSAsim Jamshed epsocket = AllocateSocket(mctx, MOS_SOCK_EPOLL);
10476404edcSAsim Jamshed if (!epsocket) {
10576404edcSAsim Jamshed errno = ENFILE;
10676404edcSAsim Jamshed return -1;
10776404edcSAsim Jamshed }
10876404edcSAsim Jamshed
10976404edcSAsim Jamshed ep = (struct mtcp_epoll *)calloc(1, sizeof(struct mtcp_epoll));
11076404edcSAsim Jamshed if (!ep) {
11176404edcSAsim Jamshed FreeSocket(mctx, epsocket->id, MOS_SOCK_EPOLL);
11276404edcSAsim Jamshed return -1;
11376404edcSAsim Jamshed }
11476404edcSAsim Jamshed
11576404edcSAsim Jamshed /* create event queues */
11676404edcSAsim Jamshed ep->usr_queue = CreateEventQueue(size);
117*8a941c7eSAsim Jamshed if (!ep->usr_queue) {
118*8a941c7eSAsim Jamshed FreeSocket(mctx, epsocket->id, FALSE);
119*8a941c7eSAsim Jamshed free(ep);
12076404edcSAsim Jamshed return -1;
121*8a941c7eSAsim Jamshed }
12276404edcSAsim Jamshed
12376404edcSAsim Jamshed ep->usr_shadow_queue = CreateEventQueue(size);
12476404edcSAsim Jamshed if (!ep->usr_shadow_queue) {
12576404edcSAsim Jamshed DestroyEventQueue(ep->usr_queue);
126*8a941c7eSAsim Jamshed FreeSocket(mctx, epsocket->id, FALSE);
127*8a941c7eSAsim Jamshed free(ep);
12876404edcSAsim Jamshed return -1;
12976404edcSAsim Jamshed }
13076404edcSAsim Jamshed
13176404edcSAsim Jamshed ep->mtcp_queue = CreateEventQueue(size);
13276404edcSAsim Jamshed if (!ep->mtcp_queue) {
13376404edcSAsim Jamshed DestroyEventQueue(ep->usr_shadow_queue);
134*8a941c7eSAsim Jamshed DestroyEventQueue(ep->usr_queue);
135*8a941c7eSAsim Jamshed FreeSocket(mctx, epsocket->id, FALSE);
136*8a941c7eSAsim Jamshed free(ep);
13776404edcSAsim Jamshed return -1;
13876404edcSAsim Jamshed }
13976404edcSAsim Jamshed
140e160edcaSAsim Jamshed TRACE_EPOLL("epoll structure of size %d created.\n", size);
14176404edcSAsim Jamshed
14276404edcSAsim Jamshed mtcp->ep = ep;
14376404edcSAsim Jamshed epsocket->ep = ep;
14476404edcSAsim Jamshed
14576404edcSAsim Jamshed if (pthread_mutex_init(&ep->epoll_lock, NULL)) {
146*8a941c7eSAsim Jamshed DestroyEventQueue(ep->mtcp_queue);
147*8a941c7eSAsim Jamshed DestroyEventQueue(ep->usr_shadow_queue);
148*8a941c7eSAsim Jamshed DestroyEventQueue(ep->usr_queue);
149*8a941c7eSAsim Jamshed FreeSocket(mctx, epsocket->id, FALSE);
150*8a941c7eSAsim Jamshed free(ep);
15176404edcSAsim Jamshed return -1;
15276404edcSAsim Jamshed }
15376404edcSAsim Jamshed if (pthread_cond_init(&ep->epoll_cond, NULL)) {
154*8a941c7eSAsim Jamshed DestroyEventQueue(ep->mtcp_queue);
155*8a941c7eSAsim Jamshed DestroyEventQueue(ep->usr_shadow_queue);
156*8a941c7eSAsim Jamshed DestroyEventQueue(ep->usr_queue);
157*8a941c7eSAsim Jamshed FreeSocket(mctx, epsocket->id, FALSE);
158*8a941c7eSAsim Jamshed free(ep);
15976404edcSAsim Jamshed return -1;
16076404edcSAsim Jamshed }
16176404edcSAsim Jamshed
16276404edcSAsim Jamshed return epsocket->id;
16376404edcSAsim Jamshed }
16476404edcSAsim Jamshed /*----------------------------------------------------------------------------*/
16576404edcSAsim Jamshed int
CloseEpollSocket(mctx_t mctx,int epid)16676404edcSAsim Jamshed CloseEpollSocket(mctx_t mctx, int epid)
16776404edcSAsim Jamshed {
16876404edcSAsim Jamshed mtcp_manager_t mtcp;
16976404edcSAsim Jamshed struct mtcp_epoll *ep;
17076404edcSAsim Jamshed
17176404edcSAsim Jamshed mtcp = GetMTCPManager(mctx);
17276404edcSAsim Jamshed if (!mtcp) {
17376404edcSAsim Jamshed return -1;
17476404edcSAsim Jamshed }
17576404edcSAsim Jamshed
17676404edcSAsim Jamshed ep = mtcp->smap[epid].ep;
17776404edcSAsim Jamshed if (!ep) {
17876404edcSAsim Jamshed errno = EINVAL;
17976404edcSAsim Jamshed return -1;
18076404edcSAsim Jamshed }
18176404edcSAsim Jamshed
18276404edcSAsim Jamshed DestroyEventQueue(ep->usr_queue);
18376404edcSAsim Jamshed DestroyEventQueue(ep->usr_shadow_queue);
18476404edcSAsim Jamshed DestroyEventQueue(ep->mtcp_queue);
18576404edcSAsim Jamshed
18676404edcSAsim Jamshed pthread_mutex_lock(&ep->epoll_lock);
18776404edcSAsim Jamshed mtcp->ep = NULL;
18876404edcSAsim Jamshed mtcp->smap[epid].ep = NULL;
18976404edcSAsim Jamshed pthread_cond_signal(&ep->epoll_cond);
19076404edcSAsim Jamshed pthread_mutex_unlock(&ep->epoll_lock);
19176404edcSAsim Jamshed
19276404edcSAsim Jamshed pthread_cond_destroy(&ep->epoll_cond);
19376404edcSAsim Jamshed pthread_mutex_destroy(&ep->epoll_lock);
194*8a941c7eSAsim Jamshed free(ep);
19576404edcSAsim Jamshed
19676404edcSAsim Jamshed return 0;
19776404edcSAsim Jamshed }
19876404edcSAsim Jamshed /*----------------------------------------------------------------------------*/
19976404edcSAsim Jamshed static int
RaisePendingStreamEvents(mtcp_manager_t mtcp,struct mtcp_epoll * ep,socket_map_t socket)20076404edcSAsim Jamshed RaisePendingStreamEvents(mtcp_manager_t mtcp,
20176404edcSAsim Jamshed struct mtcp_epoll *ep, socket_map_t socket)
20276404edcSAsim Jamshed {
20376404edcSAsim Jamshed tcp_stream *stream = socket->stream;
20476404edcSAsim Jamshed
20576404edcSAsim Jamshed if (!stream)
20676404edcSAsim Jamshed return -1;
20776404edcSAsim Jamshed if (stream->state < TCP_ST_ESTABLISHED)
20876404edcSAsim Jamshed return -1;
20976404edcSAsim Jamshed
21076404edcSAsim Jamshed TRACE_EPOLL("Stream %d at state %s\n",
21176404edcSAsim Jamshed stream->id, TCPStateToString(stream));
21276404edcSAsim Jamshed /* if there are payloads already read before epoll registration */
21376404edcSAsim Jamshed /* generate read event */
21476404edcSAsim Jamshed if (socket->epoll & MOS_EPOLLIN) {
21576404edcSAsim Jamshed struct tcp_recv_vars *rcvvar = stream->rcvvar;
21676404edcSAsim Jamshed if (rcvvar->rcvbuf && tcprb_cflen(rcvvar->rcvbuf) > 0) {
21776404edcSAsim Jamshed TRACE_EPOLL("Socket %d: Has existing payloads\n", socket->id);
21876404edcSAsim Jamshed AddEpollEvent(ep, USR_SHADOW_EVENT_QUEUE, socket, MOS_EPOLLIN);
21976404edcSAsim Jamshed } else if (stream->state == TCP_ST_CLOSE_WAIT) {
22076404edcSAsim Jamshed TRACE_EPOLL("Socket %d: Waiting for close\n", socket->id);
22176404edcSAsim Jamshed AddEpollEvent(ep, USR_SHADOW_EVENT_QUEUE, socket, MOS_EPOLLIN);
22276404edcSAsim Jamshed }
22376404edcSAsim Jamshed }
22476404edcSAsim Jamshed
22576404edcSAsim Jamshed /* same thing to the write event */
22676404edcSAsim Jamshed if (socket->epoll & MOS_EPOLLOUT) {
22776404edcSAsim Jamshed struct tcp_send_vars *sndvar = stream->sndvar;
22876404edcSAsim Jamshed if (!sndvar->sndbuf ||
22976404edcSAsim Jamshed (sndvar->sndbuf && sndvar->sndbuf->len < sndvar->snd_wnd)) {
23076404edcSAsim Jamshed if (!(socket->events & MOS_EPOLLOUT)) {
23176404edcSAsim Jamshed TRACE_EPOLL("Socket %d: Adding write event\n", socket->id);
23276404edcSAsim Jamshed AddEpollEvent(ep, USR_SHADOW_EVENT_QUEUE, socket, MOS_EPOLLOUT);
23376404edcSAsim Jamshed }
23476404edcSAsim Jamshed }
23576404edcSAsim Jamshed }
23676404edcSAsim Jamshed
23776404edcSAsim Jamshed return 0;
23876404edcSAsim Jamshed }
23976404edcSAsim Jamshed /*----------------------------------------------------------------------------*/
24076404edcSAsim Jamshed int
mtcp_epoll_ctl(mctx_t mctx,int epid,int op,int sockid,struct mtcp_epoll_event * event)24176404edcSAsim Jamshed mtcp_epoll_ctl(mctx_t mctx, int epid,
24276404edcSAsim Jamshed int op, int sockid, struct mtcp_epoll_event *event)
24376404edcSAsim Jamshed {
24476404edcSAsim Jamshed mtcp_manager_t mtcp;
24576404edcSAsim Jamshed struct mtcp_epoll *ep;
24676404edcSAsim Jamshed socket_map_t socket;
24776404edcSAsim Jamshed uint32_t events;
24876404edcSAsim Jamshed
24976404edcSAsim Jamshed mtcp = GetMTCPManager(mctx);
25076404edcSAsim Jamshed if (!mtcp) {
25176404edcSAsim Jamshed return -1;
25276404edcSAsim Jamshed }
25376404edcSAsim Jamshed
25476404edcSAsim Jamshed if (epid < 0 || epid >= g_config.mos->max_concurrency) {
25576404edcSAsim Jamshed TRACE_API("Epoll id %d out of range.\n", epid);
25676404edcSAsim Jamshed errno = EBADF;
25776404edcSAsim Jamshed return -1;
25876404edcSAsim Jamshed }
25976404edcSAsim Jamshed
26076404edcSAsim Jamshed if (sockid < 0 || sockid >= g_config.mos->max_concurrency) {
26176404edcSAsim Jamshed TRACE_API("Socket id %d out of range.\n", sockid);
26276404edcSAsim Jamshed errno = EBADF;
26376404edcSAsim Jamshed return -1;
26476404edcSAsim Jamshed }
26576404edcSAsim Jamshed
26676404edcSAsim Jamshed if (mtcp->smap[epid].socktype == MOS_SOCK_UNUSED) {
26776404edcSAsim Jamshed errno = EBADF;
26876404edcSAsim Jamshed return -1;
26976404edcSAsim Jamshed }
27076404edcSAsim Jamshed
27176404edcSAsim Jamshed if (mtcp->smap[epid].socktype != MOS_SOCK_EPOLL) {
27276404edcSAsim Jamshed errno = EINVAL;
27376404edcSAsim Jamshed return -1;
27476404edcSAsim Jamshed }
27576404edcSAsim Jamshed
27676404edcSAsim Jamshed ep = mtcp->smap[epid].ep;
27776404edcSAsim Jamshed if (!ep || (!event && op != MOS_EPOLL_CTL_DEL)) {
27876404edcSAsim Jamshed errno = EINVAL;
27976404edcSAsim Jamshed return -1;
28076404edcSAsim Jamshed }
28176404edcSAsim Jamshed socket = &mtcp->smap[sockid];
28276404edcSAsim Jamshed
28376404edcSAsim Jamshed if (op == MOS_EPOLL_CTL_ADD) {
28476404edcSAsim Jamshed if (socket->epoll) {
28576404edcSAsim Jamshed errno = EEXIST;
28676404edcSAsim Jamshed return -1;
28776404edcSAsim Jamshed }
28876404edcSAsim Jamshed
28976404edcSAsim Jamshed /* EPOLLERR and EPOLLHUP are registered as default */
29076404edcSAsim Jamshed events = event->events;
29176404edcSAsim Jamshed events |= (MOS_EPOLLERR | MOS_EPOLLHUP);
29276404edcSAsim Jamshed socket->ep_data = event->data;
29376404edcSAsim Jamshed socket->epoll = events;
29476404edcSAsim Jamshed
2954cb4e140SAsim Jamshed TRACE_EPOLL("Adding epoll socket %d(type %d) ET: %llu, IN: %llu, OUT: %llu\n",
2964cb4e140SAsim Jamshed socket->id, socket->socktype,
2974cb4e140SAsim Jamshed (unsigned long long)socket->epoll & MOS_EPOLLET,
2984cb4e140SAsim Jamshed (unsigned long long)socket->epoll & MOS_EPOLLIN,
2994cb4e140SAsim Jamshed (unsigned long long)socket->epoll & MOS_EPOLLOUT);
30076404edcSAsim Jamshed
30176404edcSAsim Jamshed if (socket->socktype == MOS_SOCK_STREAM) {
30276404edcSAsim Jamshed RaisePendingStreamEvents(mtcp, ep, socket);
30376404edcSAsim Jamshed } else if (socket->socktype == MOS_SOCK_PIPE) {
30476404edcSAsim Jamshed RaisePendingPipeEvents(mctx, epid, sockid);
30576404edcSAsim Jamshed }
30676404edcSAsim Jamshed
30776404edcSAsim Jamshed } else if (op == MOS_EPOLL_CTL_MOD) {
30876404edcSAsim Jamshed if (!socket->epoll) {
30976404edcSAsim Jamshed pthread_mutex_unlock(&ep->epoll_lock);
31076404edcSAsim Jamshed errno = ENOENT;
31176404edcSAsim Jamshed return -1;
31276404edcSAsim Jamshed }
31376404edcSAsim Jamshed
31476404edcSAsim Jamshed events = event->events;
31576404edcSAsim Jamshed events |= (MOS_EPOLLERR | MOS_EPOLLHUP);
31676404edcSAsim Jamshed socket->ep_data = event->data;
31776404edcSAsim Jamshed socket->epoll = events;
31876404edcSAsim Jamshed
31976404edcSAsim Jamshed if (socket->socktype == MOS_SOCK_STREAM) {
32076404edcSAsim Jamshed RaisePendingStreamEvents(mtcp, ep, socket);
32176404edcSAsim Jamshed } else if (socket->socktype == MOS_SOCK_PIPE) {
32276404edcSAsim Jamshed RaisePendingPipeEvents(mctx, epid, sockid);
32376404edcSAsim Jamshed }
32476404edcSAsim Jamshed
32576404edcSAsim Jamshed } else if (op == MOS_EPOLL_CTL_DEL) {
32676404edcSAsim Jamshed if (!socket->epoll) {
32776404edcSAsim Jamshed errno = ENOENT;
32876404edcSAsim Jamshed return -1;
32976404edcSAsim Jamshed }
33076404edcSAsim Jamshed
33176404edcSAsim Jamshed socket->epoll = MOS_EPOLLNONE;
33276404edcSAsim Jamshed }
33376404edcSAsim Jamshed
33476404edcSAsim Jamshed return 0;
33576404edcSAsim Jamshed }
33676404edcSAsim Jamshed /*----------------------------------------------------------------------------*/
33776404edcSAsim Jamshed int
mtcp_epoll_wait(mctx_t mctx,int epid,struct mtcp_epoll_event * events,int maxevents,int timeout)33876404edcSAsim Jamshed mtcp_epoll_wait(mctx_t mctx, int epid,
33976404edcSAsim Jamshed struct mtcp_epoll_event *events, int maxevents, int timeout)
34076404edcSAsim Jamshed {
34176404edcSAsim Jamshed mtcp_manager_t mtcp;
34276404edcSAsim Jamshed struct mtcp_epoll *ep;
34376404edcSAsim Jamshed struct event_queue *eq;
34476404edcSAsim Jamshed struct event_queue *eq_shadow;
34576404edcSAsim Jamshed socket_map_t event_socket;
34676404edcSAsim Jamshed int validity;
34776404edcSAsim Jamshed int i, cnt, ret;
34876404edcSAsim Jamshed int num_events;
34976404edcSAsim Jamshed
35076404edcSAsim Jamshed mtcp = GetMTCPManager(mctx);
35176404edcSAsim Jamshed if (!mtcp) {
35276404edcSAsim Jamshed return -1;
35376404edcSAsim Jamshed }
35476404edcSAsim Jamshed
35576404edcSAsim Jamshed if (epid < 0 || epid >= g_config.mos->max_concurrency) {
35676404edcSAsim Jamshed TRACE_API("Epoll id %d out of range.\n", epid);
35776404edcSAsim Jamshed errno = EBADF;
35876404edcSAsim Jamshed return -1;
35976404edcSAsim Jamshed }
36076404edcSAsim Jamshed
36176404edcSAsim Jamshed if (mtcp->smap[epid].socktype == MOS_SOCK_UNUSED) {
36276404edcSAsim Jamshed errno = EBADF;
36376404edcSAsim Jamshed return -1;
36476404edcSAsim Jamshed }
36576404edcSAsim Jamshed
36676404edcSAsim Jamshed if (mtcp->smap[epid].socktype != MOS_SOCK_EPOLL) {
36776404edcSAsim Jamshed errno = EINVAL;
36876404edcSAsim Jamshed return -1;
36976404edcSAsim Jamshed }
37076404edcSAsim Jamshed
37176404edcSAsim Jamshed ep = mtcp->smap[epid].ep;
37276404edcSAsim Jamshed if (!ep || !events || maxevents <= 0) {
37376404edcSAsim Jamshed errno = EINVAL;
37476404edcSAsim Jamshed return -1;
37576404edcSAsim Jamshed }
37676404edcSAsim Jamshed
37776404edcSAsim Jamshed ep->stat.calls++;
37876404edcSAsim Jamshed
37976404edcSAsim Jamshed #if SPIN_BEFORE_SLEEP
38076404edcSAsim Jamshed int spin = 0;
38176404edcSAsim Jamshed while (ep->num_events == 0 && spin < SPIN_THRESH) {
38276404edcSAsim Jamshed spin++;
38376404edcSAsim Jamshed }
38476404edcSAsim Jamshed #endif /* SPIN_BEFORE_SLEEP */
38576404edcSAsim Jamshed
38676404edcSAsim Jamshed if (pthread_mutex_lock(&ep->epoll_lock)) {
38776404edcSAsim Jamshed if (errno == EDEADLK)
38876404edcSAsim Jamshed perror("mtcp_epoll_wait: epoll_lock blocked\n");
38976404edcSAsim Jamshed assert(0);
39076404edcSAsim Jamshed }
39176404edcSAsim Jamshed
39276404edcSAsim Jamshed wait:
39376404edcSAsim Jamshed eq = ep->usr_queue;
39476404edcSAsim Jamshed eq_shadow = ep->usr_shadow_queue;
39576404edcSAsim Jamshed
39676404edcSAsim Jamshed /* wait until event occurs */
39776404edcSAsim Jamshed while (eq->num_events == 0 && eq_shadow->num_events == 0 && timeout != 0) {
39876404edcSAsim Jamshed
39976404edcSAsim Jamshed #if INTR_SLEEPING_MTCP
40076404edcSAsim Jamshed /* signal to mtcp thread if it is sleeping */
40176404edcSAsim Jamshed if (mtcp->wakeup_flag && mtcp->is_sleeping) {
40276404edcSAsim Jamshed pthread_kill(mtcp->ctx->thread, SIGUSR1);
40376404edcSAsim Jamshed }
40476404edcSAsim Jamshed #endif
40576404edcSAsim Jamshed ep->stat.waits++;
40676404edcSAsim Jamshed ep->waiting = TRUE;
40776404edcSAsim Jamshed if (timeout > 0) {
40876404edcSAsim Jamshed struct timespec deadline;
40976404edcSAsim Jamshed
41076404edcSAsim Jamshed clock_gettime(CLOCK_REALTIME, &deadline);
411a5e1a556SAsim Jamshed if (timeout >= 1000) {
41276404edcSAsim Jamshed int sec;
41376404edcSAsim Jamshed sec = timeout / 1000;
41476404edcSAsim Jamshed deadline.tv_sec += sec;
41576404edcSAsim Jamshed timeout -= sec * 1000;
41676404edcSAsim Jamshed }
41776404edcSAsim Jamshed
418a5e1a556SAsim Jamshed deadline.tv_nsec += timeout * 1000000;
419a5e1a556SAsim Jamshed
42076404edcSAsim Jamshed if (deadline.tv_nsec >= 1000000000) {
42176404edcSAsim Jamshed deadline.tv_sec++;
42276404edcSAsim Jamshed deadline.tv_nsec -= 1000000000;
42376404edcSAsim Jamshed }
42476404edcSAsim Jamshed
42576404edcSAsim Jamshed //deadline.tv_sec = mtcp->cur_tv.tv_sec;
42676404edcSAsim Jamshed //deadline.tv_nsec = (mtcp->cur_tv.tv_usec + timeout * 1000) * 1000;
42776404edcSAsim Jamshed ret = pthread_cond_timedwait(&ep->epoll_cond,
42876404edcSAsim Jamshed &ep->epoll_lock, &deadline);
42976404edcSAsim Jamshed if (ret && ret != ETIMEDOUT) {
43076404edcSAsim Jamshed /* errno set by pthread_cond_timedwait() */
43176404edcSAsim Jamshed pthread_mutex_unlock(&ep->epoll_lock);
43276404edcSAsim Jamshed TRACE_ERROR("pthread_cond_timedwait failed. ret: %d, error: %s\n",
43376404edcSAsim Jamshed ret, strerror(errno));
43476404edcSAsim Jamshed return -1;
43576404edcSAsim Jamshed }
43676404edcSAsim Jamshed timeout = 0;
43776404edcSAsim Jamshed } else if (timeout < 0) {
43876404edcSAsim Jamshed ret = pthread_cond_wait(&ep->epoll_cond, &ep->epoll_lock);
43976404edcSAsim Jamshed if (ret) {
44076404edcSAsim Jamshed /* errno set by pthread_cond_wait() */
44176404edcSAsim Jamshed pthread_mutex_unlock(&ep->epoll_lock);
44276404edcSAsim Jamshed TRACE_ERROR("pthread_cond_wait failed. ret: %d, error: %s\n",
44376404edcSAsim Jamshed ret, strerror(errno));
44476404edcSAsim Jamshed return -1;
44576404edcSAsim Jamshed }
44676404edcSAsim Jamshed }
44776404edcSAsim Jamshed ep->waiting = FALSE;
44876404edcSAsim Jamshed
44976404edcSAsim Jamshed if (mtcp->ctx->done || mtcp->ctx->exit || mtcp->ctx->interrupt) {
45076404edcSAsim Jamshed mtcp->ctx->interrupt = FALSE;
45176404edcSAsim Jamshed //ret = pthread_cond_signal(&ep->epoll_cond);
45276404edcSAsim Jamshed pthread_mutex_unlock(&ep->epoll_lock);
45376404edcSAsim Jamshed errno = EINTR;
45476404edcSAsim Jamshed return -1;
45576404edcSAsim Jamshed }
45676404edcSAsim Jamshed
45776404edcSAsim Jamshed }
45876404edcSAsim Jamshed
45976404edcSAsim Jamshed /* fetch events from the user event queue */
46076404edcSAsim Jamshed cnt = 0;
46176404edcSAsim Jamshed num_events = eq->num_events;
46276404edcSAsim Jamshed for (i = 0; i < num_events && cnt < maxevents; i++) {
46376404edcSAsim Jamshed event_socket = &mtcp->smap[eq->events[eq->start].sockid];
46476404edcSAsim Jamshed validity = TRUE;
46576404edcSAsim Jamshed if (event_socket->socktype == MOS_SOCK_UNUSED)
46676404edcSAsim Jamshed validity = FALSE;
46776404edcSAsim Jamshed if (!(event_socket->epoll & eq->events[eq->start].ev.events))
46876404edcSAsim Jamshed validity = FALSE;
46976404edcSAsim Jamshed if (!(event_socket->events & eq->events[eq->start].ev.events))
47076404edcSAsim Jamshed validity = FALSE;
47176404edcSAsim Jamshed
47276404edcSAsim Jamshed if (validity) {
47376404edcSAsim Jamshed events[cnt++] = eq->events[eq->start].ev;
47476404edcSAsim Jamshed assert(eq->events[eq->start].sockid >= 0);
47576404edcSAsim Jamshed
47676404edcSAsim Jamshed TRACE_EPOLL("Socket %d: Handled event. event: %s, "
47776404edcSAsim Jamshed "start: %u, end: %u, num: %u\n",
47876404edcSAsim Jamshed event_socket->id,
47976404edcSAsim Jamshed EventToString(eq->events[eq->start].ev.events),
48076404edcSAsim Jamshed eq->start, eq->end, eq->num_events);
48176404edcSAsim Jamshed ep->stat.handled++;
48276404edcSAsim Jamshed } else {
48376404edcSAsim Jamshed TRACE_EPOLL("Socket %d: event %s invalidated.\n",
48476404edcSAsim Jamshed eq->events[eq->start].sockid,
48576404edcSAsim Jamshed EventToString(eq->events[eq->start].ev.events));
48676404edcSAsim Jamshed ep->stat.invalidated++;
48776404edcSAsim Jamshed }
48876404edcSAsim Jamshed event_socket->events &= (~eq->events[eq->start].ev.events);
48976404edcSAsim Jamshed
49076404edcSAsim Jamshed eq->start++;
49176404edcSAsim Jamshed eq->num_events--;
49276404edcSAsim Jamshed if (eq->start >= eq->size) {
49376404edcSAsim Jamshed eq->start = 0;
49476404edcSAsim Jamshed }
49576404edcSAsim Jamshed }
49676404edcSAsim Jamshed
49776404edcSAsim Jamshed /* fetch eventes from user shadow event queue */
49876404edcSAsim Jamshed eq = ep->usr_shadow_queue;
49976404edcSAsim Jamshed num_events = eq->num_events;
50076404edcSAsim Jamshed for (i = 0; i < num_events && cnt < maxevents; i++) {
50176404edcSAsim Jamshed event_socket = &mtcp->smap[eq->events[eq->start].sockid];
50276404edcSAsim Jamshed validity = TRUE;
50376404edcSAsim Jamshed if (event_socket->socktype == MOS_SOCK_UNUSED)
50476404edcSAsim Jamshed validity = FALSE;
50576404edcSAsim Jamshed if (!(event_socket->epoll & eq->events[eq->start].ev.events))
50676404edcSAsim Jamshed validity = FALSE;
50776404edcSAsim Jamshed if (!(event_socket->events & eq->events[eq->start].ev.events))
50876404edcSAsim Jamshed validity = FALSE;
50976404edcSAsim Jamshed
51076404edcSAsim Jamshed if (validity) {
51176404edcSAsim Jamshed events[cnt++] = eq->events[eq->start].ev;
51276404edcSAsim Jamshed assert(eq->events[eq->start].sockid >= 0);
51376404edcSAsim Jamshed
51476404edcSAsim Jamshed TRACE_EPOLL("Socket %d: Handled event. event: %s, "
51576404edcSAsim Jamshed "start: %u, end: %u, num: %u\n",
51676404edcSAsim Jamshed event_socket->id,
51776404edcSAsim Jamshed EventToString(eq->events[eq->start].ev.events),
51876404edcSAsim Jamshed eq->start, eq->end, eq->num_events);
51976404edcSAsim Jamshed ep->stat.handled++;
52076404edcSAsim Jamshed } else {
52176404edcSAsim Jamshed TRACE_EPOLL("Socket %d: event %s invalidated.\n",
52276404edcSAsim Jamshed eq->events[eq->start].sockid,
52376404edcSAsim Jamshed EventToString(eq->events[eq->start].ev.events));
52476404edcSAsim Jamshed ep->stat.invalidated++;
52576404edcSAsim Jamshed }
52676404edcSAsim Jamshed event_socket->events &= (~eq->events[eq->start].ev.events);
52776404edcSAsim Jamshed
52876404edcSAsim Jamshed eq->start++;
52976404edcSAsim Jamshed eq->num_events--;
53076404edcSAsim Jamshed if (eq->start >= eq->size) {
53176404edcSAsim Jamshed eq->start = 0;
53276404edcSAsim Jamshed }
53376404edcSAsim Jamshed }
53476404edcSAsim Jamshed
53576404edcSAsim Jamshed if (cnt == 0 && timeout != 0)
53676404edcSAsim Jamshed goto wait;
53776404edcSAsim Jamshed
53876404edcSAsim Jamshed pthread_mutex_unlock(&ep->epoll_lock);
53976404edcSAsim Jamshed
54076404edcSAsim Jamshed return cnt;
54176404edcSAsim Jamshed }
54276404edcSAsim Jamshed /*----------------------------------------------------------------------------*/
54376404edcSAsim Jamshed inline int
AddEpollEvent(struct mtcp_epoll * ep,int queue_type,socket_map_t socket,uint32_t event)54476404edcSAsim Jamshed AddEpollEvent(struct mtcp_epoll *ep,
54576404edcSAsim Jamshed int queue_type, socket_map_t socket, uint32_t event)
54676404edcSAsim Jamshed {
547cafe7743SAsim Jamshed #ifdef DBGMSG
5483b6b9ba6SAsim Jamshed __PREPARE_DBGLOGGING();
549cafe7743SAsim Jamshed #endif
55076404edcSAsim Jamshed struct event_queue *eq;
55176404edcSAsim Jamshed int index;
55276404edcSAsim Jamshed
55376404edcSAsim Jamshed if (!ep || !socket || !event)
55476404edcSAsim Jamshed return -1;
55576404edcSAsim Jamshed
55676404edcSAsim Jamshed ep->stat.issued++;
55776404edcSAsim Jamshed
55876404edcSAsim Jamshed if (socket->events & event) {
55976404edcSAsim Jamshed return 0;
56076404edcSAsim Jamshed }
56176404edcSAsim Jamshed
56276404edcSAsim Jamshed if (queue_type == MOS_EVENT_QUEUE) {
56376404edcSAsim Jamshed eq = ep->mtcp_queue;
56476404edcSAsim Jamshed } else if (queue_type == USR_EVENT_QUEUE) {
56576404edcSAsim Jamshed eq = ep->usr_queue;
56676404edcSAsim Jamshed pthread_mutex_lock(&ep->epoll_lock);
56776404edcSAsim Jamshed } else if (queue_type == USR_SHADOW_EVENT_QUEUE) {
56876404edcSAsim Jamshed eq = ep->usr_shadow_queue;
56976404edcSAsim Jamshed } else {
57076404edcSAsim Jamshed TRACE_ERROR("Non-existing event queue type!\n");
57176404edcSAsim Jamshed return -1;
57276404edcSAsim Jamshed }
57376404edcSAsim Jamshed
57476404edcSAsim Jamshed if (eq->num_events >= eq->size) {
57576404edcSAsim Jamshed TRACE_ERROR("Exceeded epoll event queue! num_events: %d, size: %d\n",
57676404edcSAsim Jamshed eq->num_events, eq->size);
57776404edcSAsim Jamshed if (queue_type == USR_EVENT_QUEUE)
57876404edcSAsim Jamshed pthread_mutex_unlock(&ep->epoll_lock);
57976404edcSAsim Jamshed return -1;
58076404edcSAsim Jamshed }
58176404edcSAsim Jamshed
58276404edcSAsim Jamshed index = eq->end++;
58376404edcSAsim Jamshed
58476404edcSAsim Jamshed socket->events |= event;
58576404edcSAsim Jamshed eq->events[index].sockid = socket->id;
58676404edcSAsim Jamshed eq->events[index].ev.events = event;
58776404edcSAsim Jamshed eq->events[index].ev.data = socket->ep_data;
58876404edcSAsim Jamshed
58976404edcSAsim Jamshed if (eq->end >= eq->size) {
59076404edcSAsim Jamshed eq->end = 0;
59176404edcSAsim Jamshed }
59276404edcSAsim Jamshed eq->num_events++;
59376404edcSAsim Jamshed
59476404edcSAsim Jamshed TRACE_EPOLL("Socket %d New event: %s, start: %u, end: %u, num: %u\n",
595cafe7743SAsim Jamshed eq->events[index].sockid,
596cafe7743SAsim Jamshed EventToString(eq->events[index].ev.events),
597cafe7743SAsim Jamshed eq->start, eq->end, eq->num_events);
59876404edcSAsim Jamshed
59976404edcSAsim Jamshed if (queue_type == USR_EVENT_QUEUE)
60076404edcSAsim Jamshed pthread_mutex_unlock(&ep->epoll_lock);
60176404edcSAsim Jamshed
60276404edcSAsim Jamshed ep->stat.registered++;
60376404edcSAsim Jamshed
60476404edcSAsim Jamshed return 0;
60576404edcSAsim Jamshed }
606