1 #include "debug.h" 2 #include <sys/queue.h> 3 #include <unistd.h> 4 #include <time.h> 5 #include <signal.h> 6 #include <assert.h> 7 #include <string.h> 8 9 #include "mtcp.h" 10 #include "tcp_stream.h" 11 #include "eventpoll.h" 12 #include "tcp_in.h" 13 #include "pipe.h" 14 #include "tcp_rb.h" 15 #include "config.h" 16 17 #define MAX(a, b) ((a)>(b)?(a):(b)) 18 #define MIN(a, b) ((a)<(b)?(a):(b)) 19 20 #define SPIN_BEFORE_SLEEP FALSE 21 #define SPIN_THRESH 10000000 22 23 /*----------------------------------------------------------------------------*/ 24 char *event_str[] = {"NONE", "IN", "PRI", "OUT", "ERR", "HUP", "RDHUP"}; 25 /*----------------------------------------------------------------------------*/ 26 char * 27 EventToString(uint32_t event) 28 { 29 switch (event) { 30 case MOS_EPOLLNONE: 31 return event_str[0]; 32 break; 33 case MOS_EPOLLIN: 34 return event_str[1]; 35 break; 36 case MOS_EPOLLPRI: 37 return event_str[2]; 38 break; 39 case MOS_EPOLLOUT: 40 return event_str[3]; 41 break; 42 case MOS_EPOLLERR: 43 return event_str[4]; 44 break; 45 case MOS_EPOLLHUP: 46 return event_str[5]; 47 break; 48 case MOS_EPOLLRDHUP: 49 return event_str[6]; 50 break; 51 default: 52 assert(0); 53 } 54 55 assert(0); 56 return NULL; 57 } 58 /*----------------------------------------------------------------------------*/ 59 struct event_queue * 60 CreateEventQueue(int size) 61 { 62 struct event_queue *eq; 63 64 eq = (struct event_queue *)calloc(1, sizeof(struct event_queue)); 65 if (!eq) 66 return NULL; 67 68 eq->start = 0; 69 eq->end = 0; 70 eq->size = size; 71 eq->events = (struct mtcp_epoll_event_int *) 72 calloc(size, sizeof(struct mtcp_epoll_event_int)); 73 if (!eq->events) { 74 free(eq); 75 return NULL; 76 } 77 eq->num_events = 0; 78 79 return eq; 80 } 81 /*----------------------------------------------------------------------------*/ 82 void 83 DestroyEventQueue(struct event_queue *eq) 84 { 85 if (eq->events) 86 free(eq->events); 87 88 free(eq); 89 } 90 /*----------------------------------------------------------------------------*/ 91 int 92 mtcp_epoll_create(mctx_t mctx, int size) 93 { 94 mtcp_manager_t mtcp = g_mtcp[mctx->cpu]; 95 struct mtcp_epoll *ep; 96 socket_map_t epsocket; 97 98 if (size <= 0) { 99 errno = EINVAL; 100 return -1; 101 } 102 103 epsocket = AllocateSocket(mctx, MOS_SOCK_EPOLL); 104 if (!epsocket) { 105 errno = ENFILE; 106 return -1; 107 } 108 109 ep = (struct mtcp_epoll *)calloc(1, sizeof(struct mtcp_epoll)); 110 if (!ep) { 111 FreeSocket(mctx, epsocket->id, MOS_SOCK_EPOLL); 112 return -1; 113 } 114 115 /* create event queues */ 116 ep->usr_queue = CreateEventQueue(size); 117 if (!ep->usr_queue) { 118 FreeSocket(mctx, epsocket->id, FALSE); 119 free(ep); 120 return -1; 121 } 122 123 ep->usr_shadow_queue = CreateEventQueue(size); 124 if (!ep->usr_shadow_queue) { 125 DestroyEventQueue(ep->usr_queue); 126 FreeSocket(mctx, epsocket->id, FALSE); 127 free(ep); 128 return -1; 129 } 130 131 ep->mtcp_queue = CreateEventQueue(size); 132 if (!ep->mtcp_queue) { 133 DestroyEventQueue(ep->usr_shadow_queue); 134 DestroyEventQueue(ep->usr_queue); 135 FreeSocket(mctx, epsocket->id, FALSE); 136 free(ep); 137 return -1; 138 } 139 140 TRACE_EPOLL("epoll structure of size %d created.\n", size); 141 142 mtcp->ep = ep; 143 epsocket->ep = ep; 144 145 if (pthread_mutex_init(&ep->epoll_lock, NULL)) { 146 DestroyEventQueue(ep->mtcp_queue); 147 DestroyEventQueue(ep->usr_shadow_queue); 148 DestroyEventQueue(ep->usr_queue); 149 FreeSocket(mctx, epsocket->id, FALSE); 150 free(ep); 151 return -1; 152 } 153 if (pthread_cond_init(&ep->epoll_cond, NULL)) { 154 DestroyEventQueue(ep->mtcp_queue); 155 DestroyEventQueue(ep->usr_shadow_queue); 156 DestroyEventQueue(ep->usr_queue); 157 FreeSocket(mctx, epsocket->id, FALSE); 158 free(ep); 159 return -1; 160 } 161 162 return epsocket->id; 163 } 164 /*----------------------------------------------------------------------------*/ 165 int 166 CloseEpollSocket(mctx_t mctx, int epid) 167 { 168 mtcp_manager_t mtcp; 169 struct mtcp_epoll *ep; 170 171 mtcp = GetMTCPManager(mctx); 172 if (!mtcp) { 173 return -1; 174 } 175 176 ep = mtcp->smap[epid].ep; 177 if (!ep) { 178 errno = EINVAL; 179 return -1; 180 } 181 182 DestroyEventQueue(ep->usr_queue); 183 DestroyEventQueue(ep->usr_shadow_queue); 184 DestroyEventQueue(ep->mtcp_queue); 185 186 pthread_mutex_lock(&ep->epoll_lock); 187 mtcp->ep = NULL; 188 mtcp->smap[epid].ep = NULL; 189 pthread_cond_signal(&ep->epoll_cond); 190 pthread_mutex_unlock(&ep->epoll_lock); 191 192 pthread_cond_destroy(&ep->epoll_cond); 193 pthread_mutex_destroy(&ep->epoll_lock); 194 free(ep); 195 196 return 0; 197 } 198 /*----------------------------------------------------------------------------*/ 199 static int 200 RaisePendingStreamEvents(mtcp_manager_t mtcp, 201 struct mtcp_epoll *ep, socket_map_t socket) 202 { 203 tcp_stream *stream = socket->stream; 204 205 if (!stream) 206 return -1; 207 if (stream->state < TCP_ST_ESTABLISHED) 208 return -1; 209 210 TRACE_EPOLL("Stream %d at state %s\n", 211 stream->id, TCPStateToString(stream)); 212 /* if there are payloads already read before epoll registration */ 213 /* generate read event */ 214 if (socket->epoll & MOS_EPOLLIN) { 215 struct tcp_recv_vars *rcvvar = stream->rcvvar; 216 if (rcvvar->rcvbuf && tcprb_cflen(rcvvar->rcvbuf) > 0) { 217 TRACE_EPOLL("Socket %d: Has existing payloads\n", socket->id); 218 AddEpollEvent(ep, USR_SHADOW_EVENT_QUEUE, socket, MOS_EPOLLIN); 219 } else if (stream->state == TCP_ST_CLOSE_WAIT) { 220 TRACE_EPOLL("Socket %d: Waiting for close\n", socket->id); 221 AddEpollEvent(ep, USR_SHADOW_EVENT_QUEUE, socket, MOS_EPOLLIN); 222 } 223 } 224 225 /* same thing to the write event */ 226 if (socket->epoll & MOS_EPOLLOUT) { 227 struct tcp_send_vars *sndvar = stream->sndvar; 228 if (!sndvar->sndbuf || 229 (sndvar->sndbuf && sndvar->sndbuf->len < sndvar->snd_wnd)) { 230 if (!(socket->events & MOS_EPOLLOUT)) { 231 TRACE_EPOLL("Socket %d: Adding write event\n", socket->id); 232 AddEpollEvent(ep, USR_SHADOW_EVENT_QUEUE, socket, MOS_EPOLLOUT); 233 } 234 } 235 } 236 237 return 0; 238 } 239 /*----------------------------------------------------------------------------*/ 240 int 241 mtcp_epoll_ctl(mctx_t mctx, int epid, 242 int op, int sockid, struct mtcp_epoll_event *event) 243 { 244 mtcp_manager_t mtcp; 245 struct mtcp_epoll *ep; 246 socket_map_t socket; 247 uint32_t events; 248 249 mtcp = GetMTCPManager(mctx); 250 if (!mtcp) { 251 return -1; 252 } 253 254 if (epid < 0 || epid >= g_config.mos->max_concurrency) { 255 TRACE_API("Epoll id %d out of range.\n", epid); 256 errno = EBADF; 257 return -1; 258 } 259 260 if (sockid < 0 || sockid >= g_config.mos->max_concurrency) { 261 TRACE_API("Socket id %d out of range.\n", sockid); 262 errno = EBADF; 263 return -1; 264 } 265 266 if (mtcp->smap[epid].socktype == MOS_SOCK_UNUSED) { 267 errno = EBADF; 268 return -1; 269 } 270 271 if (mtcp->smap[epid].socktype != MOS_SOCK_EPOLL) { 272 errno = EINVAL; 273 return -1; 274 } 275 276 ep = mtcp->smap[epid].ep; 277 if (!ep || (!event && op != MOS_EPOLL_CTL_DEL)) { 278 errno = EINVAL; 279 return -1; 280 } 281 socket = &mtcp->smap[sockid]; 282 283 if (op == MOS_EPOLL_CTL_ADD) { 284 if (socket->epoll) { 285 errno = EEXIST; 286 return -1; 287 } 288 289 /* EPOLLERR and EPOLLHUP are registered as default */ 290 events = event->events; 291 events |= (MOS_EPOLLERR | MOS_EPOLLHUP); 292 socket->ep_data = event->data; 293 socket->epoll = events; 294 295 TRACE_EPOLL("Adding epoll socket %d(type %d) ET: %llu, IN: %llu, OUT: %llu\n", 296 socket->id, socket->socktype, 297 (unsigned long long)socket->epoll & MOS_EPOLLET, 298 (unsigned long long)socket->epoll & MOS_EPOLLIN, 299 (unsigned long long)socket->epoll & MOS_EPOLLOUT); 300 301 if (socket->socktype == MOS_SOCK_STREAM) { 302 RaisePendingStreamEvents(mtcp, ep, socket); 303 } else if (socket->socktype == MOS_SOCK_PIPE) { 304 RaisePendingPipeEvents(mctx, epid, sockid); 305 } 306 307 } else if (op == MOS_EPOLL_CTL_MOD) { 308 if (!socket->epoll) { 309 pthread_mutex_unlock(&ep->epoll_lock); 310 errno = ENOENT; 311 return -1; 312 } 313 314 events = event->events; 315 events |= (MOS_EPOLLERR | MOS_EPOLLHUP); 316 socket->ep_data = event->data; 317 socket->epoll = events; 318 319 if (socket->socktype == MOS_SOCK_STREAM) { 320 RaisePendingStreamEvents(mtcp, ep, socket); 321 } else if (socket->socktype == MOS_SOCK_PIPE) { 322 RaisePendingPipeEvents(mctx, epid, sockid); 323 } 324 325 } else if (op == MOS_EPOLL_CTL_DEL) { 326 if (!socket->epoll) { 327 errno = ENOENT; 328 return -1; 329 } 330 331 socket->epoll = MOS_EPOLLNONE; 332 } 333 334 return 0; 335 } 336 /*----------------------------------------------------------------------------*/ 337 int 338 mtcp_epoll_wait(mctx_t mctx, int epid, 339 struct mtcp_epoll_event *events, int maxevents, int timeout) 340 { 341 mtcp_manager_t mtcp; 342 struct mtcp_epoll *ep; 343 struct event_queue *eq; 344 struct event_queue *eq_shadow; 345 socket_map_t event_socket; 346 int validity; 347 int i, cnt, ret; 348 int num_events; 349 350 mtcp = GetMTCPManager(mctx); 351 if (!mtcp) { 352 return -1; 353 } 354 355 if (epid < 0 || epid >= g_config.mos->max_concurrency) { 356 TRACE_API("Epoll id %d out of range.\n", epid); 357 errno = EBADF; 358 return -1; 359 } 360 361 if (mtcp->smap[epid].socktype == MOS_SOCK_UNUSED) { 362 errno = EBADF; 363 return -1; 364 } 365 366 if (mtcp->smap[epid].socktype != MOS_SOCK_EPOLL) { 367 errno = EINVAL; 368 return -1; 369 } 370 371 ep = mtcp->smap[epid].ep; 372 if (!ep || !events || maxevents <= 0) { 373 errno = EINVAL; 374 return -1; 375 } 376 377 ep->stat.calls++; 378 379 #if SPIN_BEFORE_SLEEP 380 int spin = 0; 381 while (ep->num_events == 0 && spin < SPIN_THRESH) { 382 spin++; 383 } 384 #endif /* SPIN_BEFORE_SLEEP */ 385 386 if (pthread_mutex_lock(&ep->epoll_lock)) { 387 if (errno == EDEADLK) 388 perror("mtcp_epoll_wait: epoll_lock blocked\n"); 389 assert(0); 390 } 391 392 wait: 393 eq = ep->usr_queue; 394 eq_shadow = ep->usr_shadow_queue; 395 396 /* wait until event occurs */ 397 while (eq->num_events == 0 && eq_shadow->num_events == 0 && timeout != 0) { 398 399 #if INTR_SLEEPING_MTCP 400 /* signal to mtcp thread if it is sleeping */ 401 if (mtcp->wakeup_flag && mtcp->is_sleeping) { 402 pthread_kill(mtcp->ctx->thread, SIGUSR1); 403 } 404 #endif 405 ep->stat.waits++; 406 ep->waiting = TRUE; 407 if (timeout > 0) { 408 struct timespec deadline; 409 410 clock_gettime(CLOCK_REALTIME, &deadline); 411 if (timeout >= 1000) { 412 int sec; 413 sec = timeout / 1000; 414 deadline.tv_sec += sec; 415 timeout -= sec * 1000; 416 } 417 418 deadline.tv_nsec += timeout * 1000000; 419 420 if (deadline.tv_nsec >= 1000000000) { 421 deadline.tv_sec++; 422 deadline.tv_nsec -= 1000000000; 423 } 424 425 //deadline.tv_sec = mtcp->cur_tv.tv_sec; 426 //deadline.tv_nsec = (mtcp->cur_tv.tv_usec + timeout * 1000) * 1000; 427 ret = pthread_cond_timedwait(&ep->epoll_cond, 428 &ep->epoll_lock, &deadline); 429 if (ret && ret != ETIMEDOUT) { 430 /* errno set by pthread_cond_timedwait() */ 431 pthread_mutex_unlock(&ep->epoll_lock); 432 TRACE_ERROR("pthread_cond_timedwait failed. ret: %d, error: %s\n", 433 ret, strerror(errno)); 434 return -1; 435 } 436 timeout = 0; 437 } else if (timeout < 0) { 438 ret = pthread_cond_wait(&ep->epoll_cond, &ep->epoll_lock); 439 if (ret) { 440 /* errno set by pthread_cond_wait() */ 441 pthread_mutex_unlock(&ep->epoll_lock); 442 TRACE_ERROR("pthread_cond_wait failed. ret: %d, error: %s\n", 443 ret, strerror(errno)); 444 return -1; 445 } 446 } 447 ep->waiting = FALSE; 448 449 if (mtcp->ctx->done || mtcp->ctx->exit || mtcp->ctx->interrupt) { 450 mtcp->ctx->interrupt = FALSE; 451 //ret = pthread_cond_signal(&ep->epoll_cond); 452 pthread_mutex_unlock(&ep->epoll_lock); 453 errno = EINTR; 454 return -1; 455 } 456 457 } 458 459 /* fetch events from the user event queue */ 460 cnt = 0; 461 num_events = eq->num_events; 462 for (i = 0; i < num_events && cnt < maxevents; i++) { 463 event_socket = &mtcp->smap[eq->events[eq->start].sockid]; 464 validity = TRUE; 465 if (event_socket->socktype == MOS_SOCK_UNUSED) 466 validity = FALSE; 467 if (!(event_socket->epoll & eq->events[eq->start].ev.events)) 468 validity = FALSE; 469 if (!(event_socket->events & eq->events[eq->start].ev.events)) 470 validity = FALSE; 471 472 if (validity) { 473 events[cnt++] = eq->events[eq->start].ev; 474 assert(eq->events[eq->start].sockid >= 0); 475 476 TRACE_EPOLL("Socket %d: Handled event. event: %s, " 477 "start: %u, end: %u, num: %u\n", 478 event_socket->id, 479 EventToString(eq->events[eq->start].ev.events), 480 eq->start, eq->end, eq->num_events); 481 ep->stat.handled++; 482 } else { 483 TRACE_EPOLL("Socket %d: event %s invalidated.\n", 484 eq->events[eq->start].sockid, 485 EventToString(eq->events[eq->start].ev.events)); 486 ep->stat.invalidated++; 487 } 488 event_socket->events &= (~eq->events[eq->start].ev.events); 489 490 eq->start++; 491 eq->num_events--; 492 if (eq->start >= eq->size) { 493 eq->start = 0; 494 } 495 } 496 497 /* fetch eventes from user shadow event queue */ 498 eq = ep->usr_shadow_queue; 499 num_events = eq->num_events; 500 for (i = 0; i < num_events && cnt < maxevents; i++) { 501 event_socket = &mtcp->smap[eq->events[eq->start].sockid]; 502 validity = TRUE; 503 if (event_socket->socktype == MOS_SOCK_UNUSED) 504 validity = FALSE; 505 if (!(event_socket->epoll & eq->events[eq->start].ev.events)) 506 validity = FALSE; 507 if (!(event_socket->events & eq->events[eq->start].ev.events)) 508 validity = FALSE; 509 510 if (validity) { 511 events[cnt++] = eq->events[eq->start].ev; 512 assert(eq->events[eq->start].sockid >= 0); 513 514 TRACE_EPOLL("Socket %d: Handled event. event: %s, " 515 "start: %u, end: %u, num: %u\n", 516 event_socket->id, 517 EventToString(eq->events[eq->start].ev.events), 518 eq->start, eq->end, eq->num_events); 519 ep->stat.handled++; 520 } else { 521 TRACE_EPOLL("Socket %d: event %s invalidated.\n", 522 eq->events[eq->start].sockid, 523 EventToString(eq->events[eq->start].ev.events)); 524 ep->stat.invalidated++; 525 } 526 event_socket->events &= (~eq->events[eq->start].ev.events); 527 528 eq->start++; 529 eq->num_events--; 530 if (eq->start >= eq->size) { 531 eq->start = 0; 532 } 533 } 534 535 if (cnt == 0 && timeout != 0) 536 goto wait; 537 538 pthread_mutex_unlock(&ep->epoll_lock); 539 540 return cnt; 541 } 542 /*----------------------------------------------------------------------------*/ 543 inline int 544 AddEpollEvent(struct mtcp_epoll *ep, 545 int queue_type, socket_map_t socket, uint32_t event) 546 { 547 #ifdef DBGMSG 548 __PREPARE_DBGLOGGING(); 549 #endif 550 struct event_queue *eq; 551 int index; 552 553 if (!ep || !socket || !event) 554 return -1; 555 556 ep->stat.issued++; 557 558 if (socket->events & event) { 559 return 0; 560 } 561 562 if (queue_type == MOS_EVENT_QUEUE) { 563 eq = ep->mtcp_queue; 564 } else if (queue_type == USR_EVENT_QUEUE) { 565 eq = ep->usr_queue; 566 pthread_mutex_lock(&ep->epoll_lock); 567 } else if (queue_type == USR_SHADOW_EVENT_QUEUE) { 568 eq = ep->usr_shadow_queue; 569 } else { 570 TRACE_ERROR("Non-existing event queue type!\n"); 571 return -1; 572 } 573 574 if (eq->num_events >= eq->size) { 575 TRACE_ERROR("Exceeded epoll event queue! num_events: %d, size: %d\n", 576 eq->num_events, eq->size); 577 if (queue_type == USR_EVENT_QUEUE) 578 pthread_mutex_unlock(&ep->epoll_lock); 579 return -1; 580 } 581 582 index = eq->end++; 583 584 socket->events |= event; 585 eq->events[index].sockid = socket->id; 586 eq->events[index].ev.events = event; 587 eq->events[index].ev.data = socket->ep_data; 588 589 if (eq->end >= eq->size) { 590 eq->end = 0; 591 } 592 eq->num_events++; 593 594 TRACE_EPOLL("Socket %d New event: %s, start: %u, end: %u, num: %u\n", 595 eq->events[index].sockid, 596 EventToString(eq->events[index].ev.events), 597 eq->start, eq->end, eq->num_events); 598 599 if (queue_type == USR_EVENT_QUEUE) 600 pthread_mutex_unlock(&ep->epoll_lock); 601 602 ep->stat.registered++; 603 604 return 0; 605 } 606