1 #include "debug.h" 2 #include <sys/queue.h> 3 #include <unistd.h> 4 #include <time.h> 5 #include <signal.h> 6 #include <assert.h> 7 #include <string.h> 8 9 #include "mtcp.h" 10 #include "tcp_stream.h" 11 #include "eventpoll.h" 12 #include "tcp_in.h" 13 #include "pipe.h" 14 #include "tcp_rb.h" 15 #include "config.h" 16 17 #define MAX(a, b) ((a)>(b)?(a):(b)) 18 #define MIN(a, b) ((a)<(b)?(a):(b)) 19 20 #define SPIN_BEFORE_SLEEP FALSE 21 #define SPIN_THRESH 10000000 22 23 /*----------------------------------------------------------------------------*/ 24 char *event_str[] = {"NONE", "IN", "PRI", "OUT", "ERR", "HUP", "RDHUP"}; 25 /*----------------------------------------------------------------------------*/ 26 char * 27 EventToString(uint32_t event) 28 { 29 switch (event) { 30 case MOS_EPOLLNONE: 31 return event_str[0]; 32 break; 33 case MOS_EPOLLIN: 34 return event_str[1]; 35 break; 36 case MOS_EPOLLPRI: 37 return event_str[2]; 38 break; 39 case MOS_EPOLLOUT: 40 return event_str[3]; 41 break; 42 case MOS_EPOLLERR: 43 return event_str[4]; 44 break; 45 case MOS_EPOLLHUP: 46 return event_str[5]; 47 break; 48 case MOS_EPOLLRDHUP: 49 return event_str[6]; 50 break; 51 default: 52 assert(0); 53 } 54 55 assert(0); 56 return NULL; 57 } 58 /*----------------------------------------------------------------------------*/ 59 struct event_queue * 60 CreateEventQueue(int size) 61 { 62 struct event_queue *eq; 63 64 eq = (struct event_queue *)calloc(1, sizeof(struct event_queue)); 65 if (!eq) 66 return NULL; 67 68 eq->start = 0; 69 eq->end = 0; 70 eq->size = size; 71 eq->events = (struct mtcp_epoll_event_int *) 72 calloc(size, sizeof(struct mtcp_epoll_event_int)); 73 if (!eq->events) { 74 free(eq); 75 return NULL; 76 } 77 eq->num_events = 0; 78 79 return eq; 80 } 81 /*----------------------------------------------------------------------------*/ 82 void 83 DestroyEventQueue(struct event_queue *eq) 84 { 85 if (eq->events) 86 free(eq->events); 87 88 free(eq); 89 } 90 /*----------------------------------------------------------------------------*/ 91 int 92 mtcp_epoll_create(mctx_t mctx, int size) 93 { 94 mtcp_manager_t mtcp = g_mtcp[mctx->cpu]; 95 struct mtcp_epoll *ep; 96 socket_map_t epsocket; 97 98 if (size <= 0) { 99 errno = EINVAL; 100 return -1; 101 } 102 103 epsocket = AllocateSocket(mctx, MOS_SOCK_EPOLL); 104 if (!epsocket) { 105 errno = ENFILE; 106 return -1; 107 } 108 109 ep = (struct mtcp_epoll *)calloc(1, sizeof(struct mtcp_epoll)); 110 if (!ep) { 111 FreeSocket(mctx, epsocket->id, MOS_SOCK_EPOLL); 112 return -1; 113 } 114 115 /* create event queues */ 116 ep->usr_queue = CreateEventQueue(size); 117 if (!ep->usr_queue) 118 return -1; 119 120 ep->usr_shadow_queue = CreateEventQueue(size); 121 if (!ep->usr_shadow_queue) { 122 DestroyEventQueue(ep->usr_queue); 123 return -1; 124 } 125 126 ep->mtcp_queue = CreateEventQueue(size); 127 if (!ep->mtcp_queue) { 128 DestroyEventQueue(ep->usr_queue); 129 DestroyEventQueue(ep->usr_shadow_queue); 130 return -1; 131 } 132 133 TRACE_EPOLL("epoll structure of size %d created.\n", size); 134 135 mtcp->ep = ep; 136 epsocket->ep = ep; 137 138 if (pthread_mutex_init(&ep->epoll_lock, NULL)) { 139 return -1; 140 } 141 if (pthread_cond_init(&ep->epoll_cond, NULL)) { 142 return -1; 143 } 144 145 return epsocket->id; 146 } 147 /*----------------------------------------------------------------------------*/ 148 int 149 CloseEpollSocket(mctx_t mctx, int epid) 150 { 151 mtcp_manager_t mtcp; 152 struct mtcp_epoll *ep; 153 154 mtcp = GetMTCPManager(mctx); 155 if (!mtcp) { 156 return -1; 157 } 158 159 ep = mtcp->smap[epid].ep; 160 if (!ep) { 161 errno = EINVAL; 162 return -1; 163 } 164 165 DestroyEventQueue(ep->usr_queue); 166 DestroyEventQueue(ep->usr_shadow_queue); 167 DestroyEventQueue(ep->mtcp_queue); 168 free(ep); 169 170 pthread_mutex_lock(&ep->epoll_lock); 171 mtcp->ep = NULL; 172 mtcp->smap[epid].ep = NULL; 173 pthread_cond_signal(&ep->epoll_cond); 174 pthread_mutex_unlock(&ep->epoll_lock); 175 176 pthread_cond_destroy(&ep->epoll_cond); 177 pthread_mutex_destroy(&ep->epoll_lock); 178 179 return 0; 180 } 181 /*----------------------------------------------------------------------------*/ 182 static int 183 RaisePendingStreamEvents(mtcp_manager_t mtcp, 184 struct mtcp_epoll *ep, socket_map_t socket) 185 { 186 tcp_stream *stream = socket->stream; 187 188 if (!stream) 189 return -1; 190 if (stream->state < TCP_ST_ESTABLISHED) 191 return -1; 192 193 TRACE_EPOLL("Stream %d at state %s\n", 194 stream->id, TCPStateToString(stream)); 195 /* if there are payloads already read before epoll registration */ 196 /* generate read event */ 197 if (socket->epoll & MOS_EPOLLIN) { 198 struct tcp_recv_vars *rcvvar = stream->rcvvar; 199 if (rcvvar->rcvbuf && tcprb_cflen(rcvvar->rcvbuf) > 0) { 200 TRACE_EPOLL("Socket %d: Has existing payloads\n", socket->id); 201 AddEpollEvent(ep, USR_SHADOW_EVENT_QUEUE, socket, MOS_EPOLLIN); 202 } else if (stream->state == TCP_ST_CLOSE_WAIT) { 203 TRACE_EPOLL("Socket %d: Waiting for close\n", socket->id); 204 AddEpollEvent(ep, USR_SHADOW_EVENT_QUEUE, socket, MOS_EPOLLIN); 205 } 206 } 207 208 /* same thing to the write event */ 209 if (socket->epoll & MOS_EPOLLOUT) { 210 struct tcp_send_vars *sndvar = stream->sndvar; 211 if (!sndvar->sndbuf || 212 (sndvar->sndbuf && sndvar->sndbuf->len < sndvar->snd_wnd)) { 213 if (!(socket->events & MOS_EPOLLOUT)) { 214 TRACE_EPOLL("Socket %d: Adding write event\n", socket->id); 215 AddEpollEvent(ep, USR_SHADOW_EVENT_QUEUE, socket, MOS_EPOLLOUT); 216 } 217 } 218 } 219 220 return 0; 221 } 222 /*----------------------------------------------------------------------------*/ 223 int 224 mtcp_epoll_ctl(mctx_t mctx, int epid, 225 int op, int sockid, struct mtcp_epoll_event *event) 226 { 227 mtcp_manager_t mtcp; 228 struct mtcp_epoll *ep; 229 socket_map_t socket; 230 uint32_t events; 231 232 mtcp = GetMTCPManager(mctx); 233 if (!mtcp) { 234 return -1; 235 } 236 237 if (epid < 0 || epid >= g_config.mos->max_concurrency) { 238 TRACE_API("Epoll id %d out of range.\n", epid); 239 errno = EBADF; 240 return -1; 241 } 242 243 if (sockid < 0 || sockid >= g_config.mos->max_concurrency) { 244 TRACE_API("Socket id %d out of range.\n", sockid); 245 errno = EBADF; 246 return -1; 247 } 248 249 if (mtcp->smap[epid].socktype == MOS_SOCK_UNUSED) { 250 errno = EBADF; 251 return -1; 252 } 253 254 if (mtcp->smap[epid].socktype != MOS_SOCK_EPOLL) { 255 errno = EINVAL; 256 return -1; 257 } 258 259 ep = mtcp->smap[epid].ep; 260 if (!ep || (!event && op != MOS_EPOLL_CTL_DEL)) { 261 errno = EINVAL; 262 return -1; 263 } 264 socket = &mtcp->smap[sockid]; 265 266 if (op == MOS_EPOLL_CTL_ADD) { 267 if (socket->epoll) { 268 errno = EEXIST; 269 return -1; 270 } 271 272 /* EPOLLERR and EPOLLHUP are registered as default */ 273 events = event->events; 274 events |= (MOS_EPOLLERR | MOS_EPOLLHUP); 275 socket->ep_data = event->data; 276 socket->epoll = events; 277 278 TRACE_EPOLL("Adding epoll socket %d(type %d) ET: %llu, IN: %llu, OUT: %llu\n", 279 socket->id, socket->socktype, 280 (unsigned long long)socket->epoll & MOS_EPOLLET, 281 (unsigned long long)socket->epoll & MOS_EPOLLIN, 282 (unsigned long long)socket->epoll & MOS_EPOLLOUT); 283 284 if (socket->socktype == MOS_SOCK_STREAM) { 285 RaisePendingStreamEvents(mtcp, ep, socket); 286 } else if (socket->socktype == MOS_SOCK_PIPE) { 287 RaisePendingPipeEvents(mctx, epid, sockid); 288 } 289 290 } else if (op == MOS_EPOLL_CTL_MOD) { 291 if (!socket->epoll) { 292 pthread_mutex_unlock(&ep->epoll_lock); 293 errno = ENOENT; 294 return -1; 295 } 296 297 events = event->events; 298 events |= (MOS_EPOLLERR | MOS_EPOLLHUP); 299 socket->ep_data = event->data; 300 socket->epoll = events; 301 302 if (socket->socktype == MOS_SOCK_STREAM) { 303 RaisePendingStreamEvents(mtcp, ep, socket); 304 } else if (socket->socktype == MOS_SOCK_PIPE) { 305 RaisePendingPipeEvents(mctx, epid, sockid); 306 } 307 308 } else if (op == MOS_EPOLL_CTL_DEL) { 309 if (!socket->epoll) { 310 errno = ENOENT; 311 return -1; 312 } 313 314 socket->epoll = MOS_EPOLLNONE; 315 } 316 317 return 0; 318 } 319 /*----------------------------------------------------------------------------*/ 320 int 321 mtcp_epoll_wait(mctx_t mctx, int epid, 322 struct mtcp_epoll_event *events, int maxevents, int timeout) 323 { 324 mtcp_manager_t mtcp; 325 struct mtcp_epoll *ep; 326 struct event_queue *eq; 327 struct event_queue *eq_shadow; 328 socket_map_t event_socket; 329 int validity; 330 int i, cnt, ret; 331 int num_events; 332 333 mtcp = GetMTCPManager(mctx); 334 if (!mtcp) { 335 return -1; 336 } 337 338 if (epid < 0 || epid >= g_config.mos->max_concurrency) { 339 TRACE_API("Epoll id %d out of range.\n", epid); 340 errno = EBADF; 341 return -1; 342 } 343 344 if (mtcp->smap[epid].socktype == MOS_SOCK_UNUSED) { 345 errno = EBADF; 346 return -1; 347 } 348 349 if (mtcp->smap[epid].socktype != MOS_SOCK_EPOLL) { 350 errno = EINVAL; 351 return -1; 352 } 353 354 ep = mtcp->smap[epid].ep; 355 if (!ep || !events || maxevents <= 0) { 356 errno = EINVAL; 357 return -1; 358 } 359 360 ep->stat.calls++; 361 362 #if SPIN_BEFORE_SLEEP 363 int spin = 0; 364 while (ep->num_events == 0 && spin < SPIN_THRESH) { 365 spin++; 366 } 367 #endif /* SPIN_BEFORE_SLEEP */ 368 369 if (pthread_mutex_lock(&ep->epoll_lock)) { 370 if (errno == EDEADLK) 371 perror("mtcp_epoll_wait: epoll_lock blocked\n"); 372 assert(0); 373 } 374 375 wait: 376 eq = ep->usr_queue; 377 eq_shadow = ep->usr_shadow_queue; 378 379 /* wait until event occurs */ 380 while (eq->num_events == 0 && eq_shadow->num_events == 0 && timeout != 0) { 381 382 #if INTR_SLEEPING_MTCP 383 /* signal to mtcp thread if it is sleeping */ 384 if (mtcp->wakeup_flag && mtcp->is_sleeping) { 385 pthread_kill(mtcp->ctx->thread, SIGUSR1); 386 } 387 #endif 388 ep->stat.waits++; 389 ep->waiting = TRUE; 390 if (timeout > 0) { 391 struct timespec deadline; 392 393 clock_gettime(CLOCK_REALTIME, &deadline); 394 if (timeout > 1000) { 395 int sec; 396 sec = timeout / 1000; 397 deadline.tv_sec += sec; 398 timeout -= sec * 1000; 399 } 400 401 if (deadline.tv_nsec >= 1000000000) { 402 deadline.tv_sec++; 403 deadline.tv_nsec -= 1000000000; 404 } 405 406 //deadline.tv_sec = mtcp->cur_tv.tv_sec; 407 //deadline.tv_nsec = (mtcp->cur_tv.tv_usec + timeout * 1000) * 1000; 408 ret = pthread_cond_timedwait(&ep->epoll_cond, 409 &ep->epoll_lock, &deadline); 410 if (ret && ret != ETIMEDOUT) { 411 /* errno set by pthread_cond_timedwait() */ 412 pthread_mutex_unlock(&ep->epoll_lock); 413 TRACE_ERROR("pthread_cond_timedwait failed. ret: %d, error: %s\n", 414 ret, strerror(errno)); 415 return -1; 416 } 417 timeout = 0; 418 } else if (timeout < 0) { 419 ret = pthread_cond_wait(&ep->epoll_cond, &ep->epoll_lock); 420 if (ret) { 421 /* errno set by pthread_cond_wait() */ 422 pthread_mutex_unlock(&ep->epoll_lock); 423 TRACE_ERROR("pthread_cond_wait failed. ret: %d, error: %s\n", 424 ret, strerror(errno)); 425 return -1; 426 } 427 } 428 ep->waiting = FALSE; 429 430 if (mtcp->ctx->done || mtcp->ctx->exit || mtcp->ctx->interrupt) { 431 mtcp->ctx->interrupt = FALSE; 432 //ret = pthread_cond_signal(&ep->epoll_cond); 433 pthread_mutex_unlock(&ep->epoll_lock); 434 errno = EINTR; 435 return -1; 436 } 437 438 } 439 440 /* fetch events from the user event queue */ 441 cnt = 0; 442 num_events = eq->num_events; 443 for (i = 0; i < num_events && cnt < maxevents; i++) { 444 event_socket = &mtcp->smap[eq->events[eq->start].sockid]; 445 validity = TRUE; 446 if (event_socket->socktype == MOS_SOCK_UNUSED) 447 validity = FALSE; 448 if (!(event_socket->epoll & eq->events[eq->start].ev.events)) 449 validity = FALSE; 450 if (!(event_socket->events & eq->events[eq->start].ev.events)) 451 validity = FALSE; 452 453 if (validity) { 454 events[cnt++] = eq->events[eq->start].ev; 455 assert(eq->events[eq->start].sockid >= 0); 456 457 TRACE_EPOLL("Socket %d: Handled event. event: %s, " 458 "start: %u, end: %u, num: %u\n", 459 event_socket->id, 460 EventToString(eq->events[eq->start].ev.events), 461 eq->start, eq->end, eq->num_events); 462 ep->stat.handled++; 463 } else { 464 TRACE_EPOLL("Socket %d: event %s invalidated.\n", 465 eq->events[eq->start].sockid, 466 EventToString(eq->events[eq->start].ev.events)); 467 ep->stat.invalidated++; 468 } 469 event_socket->events &= (~eq->events[eq->start].ev.events); 470 471 eq->start++; 472 eq->num_events--; 473 if (eq->start >= eq->size) { 474 eq->start = 0; 475 } 476 } 477 478 /* fetch eventes from user shadow event queue */ 479 eq = ep->usr_shadow_queue; 480 num_events = eq->num_events; 481 for (i = 0; i < num_events && cnt < maxevents; i++) { 482 event_socket = &mtcp->smap[eq->events[eq->start].sockid]; 483 validity = TRUE; 484 if (event_socket->socktype == MOS_SOCK_UNUSED) 485 validity = FALSE; 486 if (!(event_socket->epoll & eq->events[eq->start].ev.events)) 487 validity = FALSE; 488 if (!(event_socket->events & eq->events[eq->start].ev.events)) 489 validity = FALSE; 490 491 if (validity) { 492 events[cnt++] = eq->events[eq->start].ev; 493 assert(eq->events[eq->start].sockid >= 0); 494 495 TRACE_EPOLL("Socket %d: Handled event. event: %s, " 496 "start: %u, end: %u, num: %u\n", 497 event_socket->id, 498 EventToString(eq->events[eq->start].ev.events), 499 eq->start, eq->end, eq->num_events); 500 ep->stat.handled++; 501 } else { 502 TRACE_EPOLL("Socket %d: event %s invalidated.\n", 503 eq->events[eq->start].sockid, 504 EventToString(eq->events[eq->start].ev.events)); 505 ep->stat.invalidated++; 506 } 507 event_socket->events &= (~eq->events[eq->start].ev.events); 508 509 eq->start++; 510 eq->num_events--; 511 if (eq->start >= eq->size) { 512 eq->start = 0; 513 } 514 } 515 516 if (cnt == 0 && timeout != 0) 517 goto wait; 518 519 pthread_mutex_unlock(&ep->epoll_lock); 520 521 return cnt; 522 } 523 /*----------------------------------------------------------------------------*/ 524 inline int 525 AddEpollEvent(struct mtcp_epoll *ep, 526 int queue_type, socket_map_t socket, uint32_t event) 527 { 528 #ifdef DBGMSG 529 __PREPARE_DBGLOGGING(); 530 #endif 531 struct event_queue *eq; 532 int index; 533 534 if (!ep || !socket || !event) 535 return -1; 536 537 ep->stat.issued++; 538 539 if (socket->events & event) { 540 return 0; 541 } 542 543 if (queue_type == MOS_EVENT_QUEUE) { 544 eq = ep->mtcp_queue; 545 } else if (queue_type == USR_EVENT_QUEUE) { 546 eq = ep->usr_queue; 547 pthread_mutex_lock(&ep->epoll_lock); 548 } else if (queue_type == USR_SHADOW_EVENT_QUEUE) { 549 eq = ep->usr_shadow_queue; 550 } else { 551 TRACE_ERROR("Non-existing event queue type!\n"); 552 return -1; 553 } 554 555 if (eq->num_events >= eq->size) { 556 TRACE_ERROR("Exceeded epoll event queue! num_events: %d, size: %d\n", 557 eq->num_events, eq->size); 558 if (queue_type == USR_EVENT_QUEUE) 559 pthread_mutex_unlock(&ep->epoll_lock); 560 return -1; 561 } 562 563 index = eq->end++; 564 565 socket->events |= event; 566 eq->events[index].sockid = socket->id; 567 eq->events[index].ev.events = event; 568 eq->events[index].ev.data = socket->ep_data; 569 570 if (eq->end >= eq->size) { 571 eq->end = 0; 572 } 573 eq->num_events++; 574 575 TRACE_EPOLL("Socket %d New event: %s, start: %u, end: %u, num: %u\n", 576 eq->events[index].sockid, 577 EventToString(eq->events[index].ev.events), 578 eq->start, eq->end, eq->num_events); 579 580 if (queue_type == USR_EVENT_QUEUE) 581 pthread_mutex_unlock(&ep->epoll_lock); 582 583 ep->stat.registered++; 584 585 return 0; 586 } 587