1 #include "debug.h" 2 #include <sys/queue.h> 3 #include <unistd.h> 4 #include <time.h> 5 #include <signal.h> 6 #include <assert.h> 7 #include <string.h> 8 9 #include "mtcp.h" 10 #include "tcp_stream.h" 11 #include "eventpoll.h" 12 #include "tcp_in.h" 13 #include "pipe.h" 14 #include "tcp_rb.h" 15 #include "config.h" 16 17 #define MAX(a, b) ((a)>(b)?(a):(b)) 18 #define MIN(a, b) ((a)<(b)?(a):(b)) 19 20 #define SPIN_BEFORE_SLEEP FALSE 21 #define SPIN_THRESH 10000000 22 23 /*----------------------------------------------------------------------------*/ 24 char *event_str[] = {"NONE", "IN", "PRI", "OUT", "ERR", "HUP", "RDHUP"}; 25 /*----------------------------------------------------------------------------*/ 26 char * 27 EventToString(uint32_t event) 28 { 29 switch (event) { 30 case MOS_EPOLLNONE: 31 return event_str[0]; 32 break; 33 case MOS_EPOLLIN: 34 return event_str[1]; 35 break; 36 case MOS_EPOLLPRI: 37 return event_str[2]; 38 break; 39 case MOS_EPOLLOUT: 40 return event_str[3]; 41 break; 42 case MOS_EPOLLERR: 43 return event_str[4]; 44 break; 45 case MOS_EPOLLHUP: 46 return event_str[5]; 47 break; 48 case MOS_EPOLLRDHUP: 49 return event_str[6]; 50 break; 51 default: 52 assert(0); 53 } 54 55 assert(0); 56 return NULL; 57 } 58 /*----------------------------------------------------------------------------*/ 59 struct event_queue * 60 CreateEventQueue(int size) 61 { 62 struct event_queue *eq; 63 64 eq = (struct event_queue *)calloc(1, sizeof(struct event_queue)); 65 if (!eq) 66 return NULL; 67 68 eq->start = 0; 69 eq->end = 0; 70 eq->size = size; 71 eq->events = (struct mtcp_epoll_event_int *) 72 calloc(size, sizeof(struct mtcp_epoll_event_int)); 73 if (!eq->events) { 74 free(eq); 75 return NULL; 76 } 77 eq->num_events = 0; 78 79 return eq; 80 } 81 /*----------------------------------------------------------------------------*/ 82 void 83 DestroyEventQueue(struct event_queue *eq) 84 { 85 if (eq->events) 86 free(eq->events); 87 88 free(eq); 89 } 90 /*----------------------------------------------------------------------------*/ 91 int 92 mtcp_epoll_create(mctx_t mctx, int size) 93 { 94 mtcp_manager_t mtcp = g_mtcp[mctx->cpu]; 95 struct mtcp_epoll *ep; 96 socket_map_t epsocket; 97 98 if (size <= 0) { 99 errno = EINVAL; 100 return -1; 101 } 102 103 epsocket = AllocateSocket(mctx, MOS_SOCK_EPOLL); 104 if (!epsocket) { 105 errno = ENFILE; 106 return -1; 107 } 108 109 ep = (struct mtcp_epoll *)calloc(1, sizeof(struct mtcp_epoll)); 110 if (!ep) { 111 FreeSocket(mctx, epsocket->id, MOS_SOCK_EPOLL); 112 return -1; 113 } 114 115 /* create event queues */ 116 ep->usr_queue = CreateEventQueue(size); 117 if (!ep->usr_queue) 118 return -1; 119 120 ep->usr_shadow_queue = CreateEventQueue(size); 121 if (!ep->usr_shadow_queue) { 122 DestroyEventQueue(ep->usr_queue); 123 return -1; 124 } 125 126 ep->mtcp_queue = CreateEventQueue(size); 127 if (!ep->mtcp_queue) { 128 DestroyEventQueue(ep->usr_queue); 129 DestroyEventQueue(ep->usr_shadow_queue); 130 return -1; 131 } 132 133 TRACE_EPOLL("epoll structure of size %d created.\n", size); 134 135 mtcp->ep = ep; 136 epsocket->ep = ep; 137 138 if (pthread_mutex_init(&ep->epoll_lock, NULL)) { 139 return -1; 140 } 141 if (pthread_cond_init(&ep->epoll_cond, NULL)) { 142 return -1; 143 } 144 145 return epsocket->id; 146 } 147 /*----------------------------------------------------------------------------*/ 148 int 149 CloseEpollSocket(mctx_t mctx, int epid) 150 { 151 mtcp_manager_t mtcp; 152 struct mtcp_epoll *ep; 153 154 mtcp = GetMTCPManager(mctx); 155 if (!mtcp) { 156 return -1; 157 } 158 159 ep = mtcp->smap[epid].ep; 160 if (!ep) { 161 errno = EINVAL; 162 return -1; 163 } 164 165 DestroyEventQueue(ep->usr_queue); 166 DestroyEventQueue(ep->usr_shadow_queue); 167 DestroyEventQueue(ep->mtcp_queue); 168 free(ep); 169 170 pthread_mutex_lock(&ep->epoll_lock); 171 mtcp->ep = NULL; 172 mtcp->smap[epid].ep = NULL; 173 pthread_cond_signal(&ep->epoll_cond); 174 pthread_mutex_unlock(&ep->epoll_lock); 175 176 pthread_cond_destroy(&ep->epoll_cond); 177 pthread_mutex_destroy(&ep->epoll_lock); 178 179 return 0; 180 } 181 /*----------------------------------------------------------------------------*/ 182 static int 183 RaisePendingStreamEvents(mtcp_manager_t mtcp, 184 struct mtcp_epoll *ep, socket_map_t socket) 185 { 186 tcp_stream *stream = socket->stream; 187 188 if (!stream) 189 return -1; 190 if (stream->state < TCP_ST_ESTABLISHED) 191 return -1; 192 193 TRACE_EPOLL("Stream %d at state %s\n", 194 stream->id, TCPStateToString(stream)); 195 /* if there are payloads already read before epoll registration */ 196 /* generate read event */ 197 if (socket->epoll & MOS_EPOLLIN) { 198 struct tcp_recv_vars *rcvvar = stream->rcvvar; 199 if (rcvvar->rcvbuf && tcprb_cflen(rcvvar->rcvbuf) > 0) { 200 TRACE_EPOLL("Socket %d: Has existing payloads\n", socket->id); 201 AddEpollEvent(ep, USR_SHADOW_EVENT_QUEUE, socket, MOS_EPOLLIN); 202 } else if (stream->state == TCP_ST_CLOSE_WAIT) { 203 TRACE_EPOLL("Socket %d: Waiting for close\n", socket->id); 204 AddEpollEvent(ep, USR_SHADOW_EVENT_QUEUE, socket, MOS_EPOLLIN); 205 } 206 } 207 208 /* same thing to the write event */ 209 if (socket->epoll & MOS_EPOLLOUT) { 210 struct tcp_send_vars *sndvar = stream->sndvar; 211 if (!sndvar->sndbuf || 212 (sndvar->sndbuf && sndvar->sndbuf->len < sndvar->snd_wnd)) { 213 if (!(socket->events & MOS_EPOLLOUT)) { 214 TRACE_EPOLL("Socket %d: Adding write event\n", socket->id); 215 AddEpollEvent(ep, USR_SHADOW_EVENT_QUEUE, socket, MOS_EPOLLOUT); 216 } 217 } 218 } 219 220 return 0; 221 } 222 /*----------------------------------------------------------------------------*/ 223 int 224 mtcp_epoll_ctl(mctx_t mctx, int epid, 225 int op, int sockid, struct mtcp_epoll_event *event) 226 { 227 mtcp_manager_t mtcp; 228 struct mtcp_epoll *ep; 229 socket_map_t socket; 230 uint32_t events; 231 232 mtcp = GetMTCPManager(mctx); 233 if (!mtcp) { 234 return -1; 235 } 236 237 if (epid < 0 || epid >= g_config.mos->max_concurrency) { 238 TRACE_API("Epoll id %d out of range.\n", epid); 239 errno = EBADF; 240 return -1; 241 } 242 243 if (sockid < 0 || sockid >= g_config.mos->max_concurrency) { 244 TRACE_API("Socket id %d out of range.\n", sockid); 245 errno = EBADF; 246 return -1; 247 } 248 249 if (mtcp->smap[epid].socktype == MOS_SOCK_UNUSED) { 250 errno = EBADF; 251 return -1; 252 } 253 254 if (mtcp->smap[epid].socktype != MOS_SOCK_EPOLL) { 255 errno = EINVAL; 256 return -1; 257 } 258 259 ep = mtcp->smap[epid].ep; 260 if (!ep || (!event && op != MOS_EPOLL_CTL_DEL)) { 261 errno = EINVAL; 262 return -1; 263 } 264 socket = &mtcp->smap[sockid]; 265 266 if (op == MOS_EPOLL_CTL_ADD) { 267 if (socket->epoll) { 268 errno = EEXIST; 269 return -1; 270 } 271 272 /* EPOLLERR and EPOLLHUP are registered as default */ 273 events = event->events; 274 events |= (MOS_EPOLLERR | MOS_EPOLLHUP); 275 socket->ep_data = event->data; 276 socket->epoll = events; 277 278 TRACE_EPOLL("Adding epoll socket %d(type %d) ET: %u, IN: %u, OUT: %u\n", 279 socket->id, socket->socktype, socket->epoll & MOS_EPOLLET, 280 socket->epoll & MOS_EPOLLIN, socket->epoll & MOS_EPOLLOUT); 281 282 if (socket->socktype == MOS_SOCK_STREAM) { 283 RaisePendingStreamEvents(mtcp, ep, socket); 284 } else if (socket->socktype == MOS_SOCK_PIPE) { 285 RaisePendingPipeEvents(mctx, epid, sockid); 286 } 287 288 } else if (op == MOS_EPOLL_CTL_MOD) { 289 if (!socket->epoll) { 290 pthread_mutex_unlock(&ep->epoll_lock); 291 errno = ENOENT; 292 return -1; 293 } 294 295 events = event->events; 296 events |= (MOS_EPOLLERR | MOS_EPOLLHUP); 297 socket->ep_data = event->data; 298 socket->epoll = events; 299 300 if (socket->socktype == MOS_SOCK_STREAM) { 301 RaisePendingStreamEvents(mtcp, ep, socket); 302 } else if (socket->socktype == MOS_SOCK_PIPE) { 303 RaisePendingPipeEvents(mctx, epid, sockid); 304 } 305 306 } else if (op == MOS_EPOLL_CTL_DEL) { 307 if (!socket->epoll) { 308 errno = ENOENT; 309 return -1; 310 } 311 312 socket->epoll = MOS_EPOLLNONE; 313 } 314 315 return 0; 316 } 317 /*----------------------------------------------------------------------------*/ 318 int 319 mtcp_epoll_wait(mctx_t mctx, int epid, 320 struct mtcp_epoll_event *events, int maxevents, int timeout) 321 { 322 mtcp_manager_t mtcp; 323 struct mtcp_epoll *ep; 324 struct event_queue *eq; 325 struct event_queue *eq_shadow; 326 socket_map_t event_socket; 327 int validity; 328 int i, cnt, ret; 329 int num_events; 330 331 mtcp = GetMTCPManager(mctx); 332 if (!mtcp) { 333 return -1; 334 } 335 336 if (epid < 0 || epid >= g_config.mos->max_concurrency) { 337 TRACE_API("Epoll id %d out of range.\n", epid); 338 errno = EBADF; 339 return -1; 340 } 341 342 if (mtcp->smap[epid].socktype == MOS_SOCK_UNUSED) { 343 errno = EBADF; 344 return -1; 345 } 346 347 if (mtcp->smap[epid].socktype != MOS_SOCK_EPOLL) { 348 errno = EINVAL; 349 return -1; 350 } 351 352 ep = mtcp->smap[epid].ep; 353 if (!ep || !events || maxevents <= 0) { 354 errno = EINVAL; 355 return -1; 356 } 357 358 ep->stat.calls++; 359 360 #if SPIN_BEFORE_SLEEP 361 int spin = 0; 362 while (ep->num_events == 0 && spin < SPIN_THRESH) { 363 spin++; 364 } 365 #endif /* SPIN_BEFORE_SLEEP */ 366 367 if (pthread_mutex_lock(&ep->epoll_lock)) { 368 if (errno == EDEADLK) 369 perror("mtcp_epoll_wait: epoll_lock blocked\n"); 370 assert(0); 371 } 372 373 wait: 374 eq = ep->usr_queue; 375 eq_shadow = ep->usr_shadow_queue; 376 377 /* wait until event occurs */ 378 while (eq->num_events == 0 && eq_shadow->num_events == 0 && timeout != 0) { 379 380 #if INTR_SLEEPING_MTCP 381 /* signal to mtcp thread if it is sleeping */ 382 if (mtcp->wakeup_flag && mtcp->is_sleeping) { 383 pthread_kill(mtcp->ctx->thread, SIGUSR1); 384 } 385 #endif 386 ep->stat.waits++; 387 ep->waiting = TRUE; 388 if (timeout > 0) { 389 struct timespec deadline; 390 391 clock_gettime(CLOCK_REALTIME, &deadline); 392 if (timeout > 1000) { 393 int sec; 394 sec = timeout / 1000; 395 deadline.tv_sec += sec; 396 timeout -= sec * 1000; 397 } 398 399 if (deadline.tv_nsec >= 1000000000) { 400 deadline.tv_sec++; 401 deadline.tv_nsec -= 1000000000; 402 } 403 404 //deadline.tv_sec = mtcp->cur_tv.tv_sec; 405 //deadline.tv_nsec = (mtcp->cur_tv.tv_usec + timeout * 1000) * 1000; 406 ret = pthread_cond_timedwait(&ep->epoll_cond, 407 &ep->epoll_lock, &deadline); 408 if (ret && ret != ETIMEDOUT) { 409 /* errno set by pthread_cond_timedwait() */ 410 pthread_mutex_unlock(&ep->epoll_lock); 411 TRACE_ERROR("pthread_cond_timedwait failed. ret: %d, error: %s\n", 412 ret, strerror(errno)); 413 return -1; 414 } 415 timeout = 0; 416 } else if (timeout < 0) { 417 ret = pthread_cond_wait(&ep->epoll_cond, &ep->epoll_lock); 418 if (ret) { 419 /* errno set by pthread_cond_wait() */ 420 pthread_mutex_unlock(&ep->epoll_lock); 421 TRACE_ERROR("pthread_cond_wait failed. ret: %d, error: %s\n", 422 ret, strerror(errno)); 423 return -1; 424 } 425 } 426 ep->waiting = FALSE; 427 428 if (mtcp->ctx->done || mtcp->ctx->exit || mtcp->ctx->interrupt) { 429 mtcp->ctx->interrupt = FALSE; 430 //ret = pthread_cond_signal(&ep->epoll_cond); 431 pthread_mutex_unlock(&ep->epoll_lock); 432 errno = EINTR; 433 return -1; 434 } 435 436 } 437 438 /* fetch events from the user event queue */ 439 cnt = 0; 440 num_events = eq->num_events; 441 for (i = 0; i < num_events && cnt < maxevents; i++) { 442 event_socket = &mtcp->smap[eq->events[eq->start].sockid]; 443 validity = TRUE; 444 if (event_socket->socktype == MOS_SOCK_UNUSED) 445 validity = FALSE; 446 if (!(event_socket->epoll & eq->events[eq->start].ev.events)) 447 validity = FALSE; 448 if (!(event_socket->events & eq->events[eq->start].ev.events)) 449 validity = FALSE; 450 451 if (validity) { 452 events[cnt++] = eq->events[eq->start].ev; 453 assert(eq->events[eq->start].sockid >= 0); 454 455 TRACE_EPOLL("Socket %d: Handled event. event: %s, " 456 "start: %u, end: %u, num: %u\n", 457 event_socket->id, 458 EventToString(eq->events[eq->start].ev.events), 459 eq->start, eq->end, eq->num_events); 460 ep->stat.handled++; 461 } else { 462 TRACE_EPOLL("Socket %d: event %s invalidated.\n", 463 eq->events[eq->start].sockid, 464 EventToString(eq->events[eq->start].ev.events)); 465 ep->stat.invalidated++; 466 } 467 event_socket->events &= (~eq->events[eq->start].ev.events); 468 469 eq->start++; 470 eq->num_events--; 471 if (eq->start >= eq->size) { 472 eq->start = 0; 473 } 474 } 475 476 /* fetch eventes from user shadow event queue */ 477 eq = ep->usr_shadow_queue; 478 num_events = eq->num_events; 479 for (i = 0; i < num_events && cnt < maxevents; i++) { 480 event_socket = &mtcp->smap[eq->events[eq->start].sockid]; 481 validity = TRUE; 482 if (event_socket->socktype == MOS_SOCK_UNUSED) 483 validity = FALSE; 484 if (!(event_socket->epoll & eq->events[eq->start].ev.events)) 485 validity = FALSE; 486 if (!(event_socket->events & eq->events[eq->start].ev.events)) 487 validity = FALSE; 488 489 if (validity) { 490 events[cnt++] = eq->events[eq->start].ev; 491 assert(eq->events[eq->start].sockid >= 0); 492 493 TRACE_EPOLL("Socket %d: Handled event. event: %s, " 494 "start: %u, end: %u, num: %u\n", 495 event_socket->id, 496 EventToString(eq->events[eq->start].ev.events), 497 eq->start, eq->end, eq->num_events); 498 ep->stat.handled++; 499 } else { 500 TRACE_EPOLL("Socket %d: event %s invalidated.\n", 501 eq->events[eq->start].sockid, 502 EventToString(eq->events[eq->start].ev.events)); 503 ep->stat.invalidated++; 504 } 505 event_socket->events &= (~eq->events[eq->start].ev.events); 506 507 eq->start++; 508 eq->num_events--; 509 if (eq->start >= eq->size) { 510 eq->start = 0; 511 } 512 } 513 514 if (cnt == 0 && timeout != 0) 515 goto wait; 516 517 pthread_mutex_unlock(&ep->epoll_lock); 518 519 return cnt; 520 } 521 /*----------------------------------------------------------------------------*/ 522 inline int 523 AddEpollEvent(struct mtcp_epoll *ep, 524 int queue_type, socket_map_t socket, uint32_t event) 525 { 526 #ifdef DBGMSG 527 __PREPARE_DBGLOGGING(); 528 #endif 529 struct event_queue *eq; 530 int index; 531 532 if (!ep || !socket || !event) 533 return -1; 534 535 ep->stat.issued++; 536 537 if (socket->events & event) { 538 return 0; 539 } 540 541 if (queue_type == MOS_EVENT_QUEUE) { 542 eq = ep->mtcp_queue; 543 } else if (queue_type == USR_EVENT_QUEUE) { 544 eq = ep->usr_queue; 545 pthread_mutex_lock(&ep->epoll_lock); 546 } else if (queue_type == USR_SHADOW_EVENT_QUEUE) { 547 eq = ep->usr_shadow_queue; 548 } else { 549 TRACE_ERROR("Non-existing event queue type!\n"); 550 return -1; 551 } 552 553 if (eq->num_events >= eq->size) { 554 TRACE_ERROR("Exceeded epoll event queue! num_events: %d, size: %d\n", 555 eq->num_events, eq->size); 556 if (queue_type == USR_EVENT_QUEUE) 557 pthread_mutex_unlock(&ep->epoll_lock); 558 return -1; 559 } 560 561 index = eq->end++; 562 563 socket->events |= event; 564 eq->events[index].sockid = socket->id; 565 eq->events[index].ev.events = event; 566 eq->events[index].ev.data = socket->ep_data; 567 568 if (eq->end >= eq->size) { 569 eq->end = 0; 570 } 571 eq->num_events++; 572 573 TRACE_EPOLL("Socket %d New event: %s, start: %u, end: %u, num: %u\n", 574 eq->events[index].sockid, 575 EventToString(eq->events[index].ev.events), 576 eq->start, eq->end, eq->num_events); 577 578 if (queue_type == USR_EVENT_QUEUE) 579 pthread_mutex_unlock(&ep->epoll_lock); 580 581 ep->stat.registered++; 582 583 return 0; 584 } 585