1 #include "debug.h" 2 #include <sys/queue.h> 3 #include <unistd.h> 4 #include <time.h> 5 #include <signal.h> 6 #include <assert.h> 7 #include <string.h> 8 9 #include "mtcp.h" 10 #include "tcp_stream.h" 11 #include "eventpoll.h" 12 #include "tcp_in.h" 13 #include "pipe.h" 14 #include "tcp_rb.h" 15 #include "config.h" 16 17 #define MAX(a, b) ((a)>(b)?(a):(b)) 18 #define MIN(a, b) ((a)<(b)?(a):(b)) 19 20 #define SPIN_BEFORE_SLEEP FALSE 21 #define SPIN_THRESH 10000000 22 23 /*----------------------------------------------------------------------------*/ 24 char *event_str[] = {"NONE", "IN", "PRI", "OUT", "ERR", "HUP", "RDHUP"}; 25 /*----------------------------------------------------------------------------*/ 26 char * 27 EventToString(uint32_t event) 28 { 29 switch (event) { 30 case MOS_EPOLLNONE: 31 return event_str[0]; 32 break; 33 case MOS_EPOLLIN: 34 return event_str[1]; 35 break; 36 case MOS_EPOLLPRI: 37 return event_str[2]; 38 break; 39 case MOS_EPOLLOUT: 40 return event_str[3]; 41 break; 42 case MOS_EPOLLERR: 43 return event_str[4]; 44 break; 45 case MOS_EPOLLHUP: 46 return event_str[5]; 47 break; 48 case MOS_EPOLLRDHUP: 49 return event_str[6]; 50 break; 51 default: 52 assert(0); 53 } 54 55 assert(0); 56 return NULL; 57 } 58 /*----------------------------------------------------------------------------*/ 59 struct event_queue * 60 CreateEventQueue(int size) 61 { 62 struct event_queue *eq; 63 64 eq = (struct event_queue *)calloc(1, sizeof(struct event_queue)); 65 if (!eq) 66 return NULL; 67 68 eq->start = 0; 69 eq->end = 0; 70 eq->size = size; 71 eq->events = (struct mtcp_epoll_event_int *) 72 calloc(size, sizeof(struct mtcp_epoll_event_int)); 73 if (!eq->events) { 74 free(eq); 75 return NULL; 76 } 77 eq->num_events = 0; 78 79 return eq; 80 } 81 /*----------------------------------------------------------------------------*/ 82 void 83 DestroyEventQueue(struct event_queue *eq) 84 { 85 if (eq->events) 86 free(eq->events); 87 88 free(eq); 89 } 90 /*----------------------------------------------------------------------------*/ 91 int 92 mtcp_epoll_create(mctx_t mctx, int size) 93 { 94 mtcp_manager_t mtcp = g_mtcp[mctx->cpu]; 95 struct mtcp_epoll *ep; 96 socket_map_t epsocket; 97 98 if (size <= 0) { 99 errno = EINVAL; 100 return -1; 101 } 102 103 epsocket = AllocateSocket(mctx, MOS_SOCK_EPOLL); 104 if (!epsocket) { 105 errno = ENFILE; 106 return -1; 107 } 108 109 ep = (struct mtcp_epoll *)calloc(1, sizeof(struct mtcp_epoll)); 110 if (!ep) { 111 FreeSocket(mctx, epsocket->id, MOS_SOCK_EPOLL); 112 return -1; 113 } 114 115 /* create event queues */ 116 ep->usr_queue = CreateEventQueue(size); 117 if (!ep->usr_queue) 118 return -1; 119 120 ep->usr_shadow_queue = CreateEventQueue(size); 121 if (!ep->usr_shadow_queue) { 122 DestroyEventQueue(ep->usr_queue); 123 return -1; 124 } 125 126 ep->mtcp_queue = CreateEventQueue(size); 127 if (!ep->mtcp_queue) { 128 DestroyEventQueue(ep->usr_queue); 129 DestroyEventQueue(ep->usr_shadow_queue); 130 return -1; 131 } 132 133 TRACE_EPOLL("epoll structure of size %d created.\n", size); 134 135 mtcp->ep = ep; 136 epsocket->ep = ep; 137 138 if (pthread_mutex_init(&ep->epoll_lock, NULL)) { 139 return -1; 140 } 141 if (pthread_cond_init(&ep->epoll_cond, NULL)) { 142 return -1; 143 } 144 145 return epsocket->id; 146 } 147 /*----------------------------------------------------------------------------*/ 148 int 149 CloseEpollSocket(mctx_t mctx, int epid) 150 { 151 mtcp_manager_t mtcp; 152 struct mtcp_epoll *ep; 153 154 mtcp = GetMTCPManager(mctx); 155 if (!mtcp) { 156 return -1; 157 } 158 159 ep = mtcp->smap[epid].ep; 160 if (!ep) { 161 errno = EINVAL; 162 return -1; 163 } 164 165 DestroyEventQueue(ep->usr_queue); 166 DestroyEventQueue(ep->usr_shadow_queue); 167 DestroyEventQueue(ep->mtcp_queue); 168 free(ep); 169 170 pthread_mutex_lock(&ep->epoll_lock); 171 mtcp->ep = NULL; 172 mtcp->smap[epid].ep = NULL; 173 pthread_cond_signal(&ep->epoll_cond); 174 pthread_mutex_unlock(&ep->epoll_lock); 175 176 pthread_cond_destroy(&ep->epoll_cond); 177 pthread_mutex_destroy(&ep->epoll_lock); 178 179 return 0; 180 } 181 /*----------------------------------------------------------------------------*/ 182 static int 183 RaisePendingStreamEvents(mtcp_manager_t mtcp, 184 struct mtcp_epoll *ep, socket_map_t socket) 185 { 186 tcp_stream *stream = socket->stream; 187 188 if (!stream) 189 return -1; 190 if (stream->state < TCP_ST_ESTABLISHED) 191 return -1; 192 193 TRACE_EPOLL("Stream %d at state %s\n", 194 stream->id, TCPStateToString(stream)); 195 /* if there are payloads already read before epoll registration */ 196 /* generate read event */ 197 if (socket->epoll & MOS_EPOLLIN) { 198 struct tcp_recv_vars *rcvvar = stream->rcvvar; 199 #ifdef NEWRB 200 if (rcvvar->rcvbuf && tcprb_cflen(rcvvar->rcvbuf) > 0) { 201 #else 202 if (rcvvar->rcvbuf && rcvvar->rcvbuf->merged_len > 0) { 203 #endif 204 TRACE_EPOLL("Socket %d: Has existing payloads\n", socket->id); 205 AddEpollEvent(ep, USR_SHADOW_EVENT_QUEUE, socket, MOS_EPOLLIN); 206 } else if (stream->state == TCP_ST_CLOSE_WAIT) { 207 TRACE_EPOLL("Socket %d: Waiting for close\n", socket->id); 208 AddEpollEvent(ep, USR_SHADOW_EVENT_QUEUE, socket, MOS_EPOLLIN); 209 } 210 } 211 212 /* same thing to the write event */ 213 if (socket->epoll & MOS_EPOLLOUT) { 214 struct tcp_send_vars *sndvar = stream->sndvar; 215 if (!sndvar->sndbuf || 216 (sndvar->sndbuf && sndvar->sndbuf->len < sndvar->snd_wnd)) { 217 if (!(socket->events & MOS_EPOLLOUT)) { 218 TRACE_EPOLL("Socket %d: Adding write event\n", socket->id); 219 AddEpollEvent(ep, USR_SHADOW_EVENT_QUEUE, socket, MOS_EPOLLOUT); 220 } 221 } 222 } 223 224 return 0; 225 } 226 /*----------------------------------------------------------------------------*/ 227 int 228 mtcp_epoll_ctl(mctx_t mctx, int epid, 229 int op, int sockid, struct mtcp_epoll_event *event) 230 { 231 mtcp_manager_t mtcp; 232 struct mtcp_epoll *ep; 233 socket_map_t socket; 234 uint32_t events; 235 236 mtcp = GetMTCPManager(mctx); 237 if (!mtcp) { 238 return -1; 239 } 240 241 if (epid < 0 || epid >= g_config.mos->max_concurrency) { 242 TRACE_API("Epoll id %d out of range.\n", epid); 243 errno = EBADF; 244 return -1; 245 } 246 247 if (sockid < 0 || sockid >= g_config.mos->max_concurrency) { 248 TRACE_API("Socket id %d out of range.\n", sockid); 249 errno = EBADF; 250 return -1; 251 } 252 253 if (mtcp->smap[epid].socktype == MOS_SOCK_UNUSED) { 254 errno = EBADF; 255 return -1; 256 } 257 258 if (mtcp->smap[epid].socktype != MOS_SOCK_EPOLL) { 259 errno = EINVAL; 260 return -1; 261 } 262 263 ep = mtcp->smap[epid].ep; 264 if (!ep || (!event && op != MOS_EPOLL_CTL_DEL)) { 265 errno = EINVAL; 266 return -1; 267 } 268 socket = &mtcp->smap[sockid]; 269 270 if (op == MOS_EPOLL_CTL_ADD) { 271 if (socket->epoll) { 272 errno = EEXIST; 273 return -1; 274 } 275 276 /* EPOLLERR and EPOLLHUP are registered as default */ 277 events = event->events; 278 events |= (MOS_EPOLLERR | MOS_EPOLLHUP); 279 socket->ep_data = event->data; 280 socket->epoll = events; 281 282 TRACE_EPOLL("Adding epoll socket %d(type %d) ET: %u, IN: %u, OUT: %u\n", 283 socket->id, socket->socktype, socket->epoll & MOS_EPOLLET, 284 socket->epoll & MOS_EPOLLIN, socket->epoll & MOS_EPOLLOUT); 285 286 if (socket->socktype == MOS_SOCK_STREAM) { 287 RaisePendingStreamEvents(mtcp, ep, socket); 288 } else if (socket->socktype == MOS_SOCK_PIPE) { 289 RaisePendingPipeEvents(mctx, epid, sockid); 290 } 291 292 } else if (op == MOS_EPOLL_CTL_MOD) { 293 if (!socket->epoll) { 294 pthread_mutex_unlock(&ep->epoll_lock); 295 errno = ENOENT; 296 return -1; 297 } 298 299 events = event->events; 300 events |= (MOS_EPOLLERR | MOS_EPOLLHUP); 301 socket->ep_data = event->data; 302 socket->epoll = events; 303 304 if (socket->socktype == MOS_SOCK_STREAM) { 305 RaisePendingStreamEvents(mtcp, ep, socket); 306 } else if (socket->socktype == MOS_SOCK_PIPE) { 307 RaisePendingPipeEvents(mctx, epid, sockid); 308 } 309 310 } else if (op == MOS_EPOLL_CTL_DEL) { 311 if (!socket->epoll) { 312 errno = ENOENT; 313 return -1; 314 } 315 316 socket->epoll = MOS_EPOLLNONE; 317 } 318 319 return 0; 320 } 321 /*----------------------------------------------------------------------------*/ 322 int 323 mtcp_epoll_wait(mctx_t mctx, int epid, 324 struct mtcp_epoll_event *events, int maxevents, int timeout) 325 { 326 mtcp_manager_t mtcp; 327 struct mtcp_epoll *ep; 328 struct event_queue *eq; 329 struct event_queue *eq_shadow; 330 socket_map_t event_socket; 331 int validity; 332 int i, cnt, ret; 333 int num_events; 334 335 mtcp = GetMTCPManager(mctx); 336 if (!mtcp) { 337 return -1; 338 } 339 340 if (epid < 0 || epid >= g_config.mos->max_concurrency) { 341 TRACE_API("Epoll id %d out of range.\n", epid); 342 errno = EBADF; 343 return -1; 344 } 345 346 if (mtcp->smap[epid].socktype == MOS_SOCK_UNUSED) { 347 errno = EBADF; 348 return -1; 349 } 350 351 if (mtcp->smap[epid].socktype != MOS_SOCK_EPOLL) { 352 errno = EINVAL; 353 return -1; 354 } 355 356 ep = mtcp->smap[epid].ep; 357 if (!ep || !events || maxevents <= 0) { 358 errno = EINVAL; 359 return -1; 360 } 361 362 ep->stat.calls++; 363 364 #if SPIN_BEFORE_SLEEP 365 int spin = 0; 366 while (ep->num_events == 0 && spin < SPIN_THRESH) { 367 spin++; 368 } 369 #endif /* SPIN_BEFORE_SLEEP */ 370 371 if (pthread_mutex_lock(&ep->epoll_lock)) { 372 if (errno == EDEADLK) 373 perror("mtcp_epoll_wait: epoll_lock blocked\n"); 374 assert(0); 375 } 376 377 wait: 378 eq = ep->usr_queue; 379 eq_shadow = ep->usr_shadow_queue; 380 381 /* wait until event occurs */ 382 while (eq->num_events == 0 && eq_shadow->num_events == 0 && timeout != 0) { 383 384 #if INTR_SLEEPING_MTCP 385 /* signal to mtcp thread if it is sleeping */ 386 if (mtcp->wakeup_flag && mtcp->is_sleeping) { 387 pthread_kill(mtcp->ctx->thread, SIGUSR1); 388 } 389 #endif 390 ep->stat.waits++; 391 ep->waiting = TRUE; 392 if (timeout > 0) { 393 struct timespec deadline; 394 395 clock_gettime(CLOCK_REALTIME, &deadline); 396 if (timeout > 1000) { 397 int sec; 398 sec = timeout / 1000; 399 deadline.tv_sec += sec; 400 timeout -= sec * 1000; 401 } 402 403 if (deadline.tv_nsec >= 1000000000) { 404 deadline.tv_sec++; 405 deadline.tv_nsec -= 1000000000; 406 } 407 408 //deadline.tv_sec = mtcp->cur_tv.tv_sec; 409 //deadline.tv_nsec = (mtcp->cur_tv.tv_usec + timeout * 1000) * 1000; 410 ret = pthread_cond_timedwait(&ep->epoll_cond, 411 &ep->epoll_lock, &deadline); 412 if (ret && ret != ETIMEDOUT) { 413 /* errno set by pthread_cond_timedwait() */ 414 pthread_mutex_unlock(&ep->epoll_lock); 415 TRACE_ERROR("pthread_cond_timedwait failed. ret: %d, error: %s\n", 416 ret, strerror(errno)); 417 return -1; 418 } 419 timeout = 0; 420 } else if (timeout < 0) { 421 ret = pthread_cond_wait(&ep->epoll_cond, &ep->epoll_lock); 422 if (ret) { 423 /* errno set by pthread_cond_wait() */ 424 pthread_mutex_unlock(&ep->epoll_lock); 425 TRACE_ERROR("pthread_cond_wait failed. ret: %d, error: %s\n", 426 ret, strerror(errno)); 427 return -1; 428 } 429 } 430 ep->waiting = FALSE; 431 432 if (mtcp->ctx->done || mtcp->ctx->exit || mtcp->ctx->interrupt) { 433 mtcp->ctx->interrupt = FALSE; 434 //ret = pthread_cond_signal(&ep->epoll_cond); 435 pthread_mutex_unlock(&ep->epoll_lock); 436 errno = EINTR; 437 return -1; 438 } 439 440 } 441 442 /* fetch events from the user event queue */ 443 cnt = 0; 444 num_events = eq->num_events; 445 for (i = 0; i < num_events && cnt < maxevents; i++) { 446 event_socket = &mtcp->smap[eq->events[eq->start].sockid]; 447 validity = TRUE; 448 if (event_socket->socktype == MOS_SOCK_UNUSED) 449 validity = FALSE; 450 if (!(event_socket->epoll & eq->events[eq->start].ev.events)) 451 validity = FALSE; 452 if (!(event_socket->events & eq->events[eq->start].ev.events)) 453 validity = FALSE; 454 455 if (validity) { 456 events[cnt++] = eq->events[eq->start].ev; 457 assert(eq->events[eq->start].sockid >= 0); 458 459 TRACE_EPOLL("Socket %d: Handled event. event: %s, " 460 "start: %u, end: %u, num: %u\n", 461 event_socket->id, 462 EventToString(eq->events[eq->start].ev.events), 463 eq->start, eq->end, eq->num_events); 464 ep->stat.handled++; 465 } else { 466 TRACE_EPOLL("Socket %d: event %s invalidated.\n", 467 eq->events[eq->start].sockid, 468 EventToString(eq->events[eq->start].ev.events)); 469 ep->stat.invalidated++; 470 } 471 event_socket->events &= (~eq->events[eq->start].ev.events); 472 473 eq->start++; 474 eq->num_events--; 475 if (eq->start >= eq->size) { 476 eq->start = 0; 477 } 478 } 479 480 /* fetch eventes from user shadow event queue */ 481 eq = ep->usr_shadow_queue; 482 num_events = eq->num_events; 483 for (i = 0; i < num_events && cnt < maxevents; i++) { 484 event_socket = &mtcp->smap[eq->events[eq->start].sockid]; 485 validity = TRUE; 486 if (event_socket->socktype == MOS_SOCK_UNUSED) 487 validity = FALSE; 488 if (!(event_socket->epoll & eq->events[eq->start].ev.events)) 489 validity = FALSE; 490 if (!(event_socket->events & eq->events[eq->start].ev.events)) 491 validity = FALSE; 492 493 if (validity) { 494 events[cnt++] = eq->events[eq->start].ev; 495 assert(eq->events[eq->start].sockid >= 0); 496 497 TRACE_EPOLL("Socket %d: Handled event. event: %s, " 498 "start: %u, end: %u, num: %u\n", 499 event_socket->id, 500 EventToString(eq->events[eq->start].ev.events), 501 eq->start, eq->end, eq->num_events); 502 ep->stat.handled++; 503 } else { 504 TRACE_EPOLL("Socket %d: event %s invalidated.\n", 505 eq->events[eq->start].sockid, 506 EventToString(eq->events[eq->start].ev.events)); 507 ep->stat.invalidated++; 508 } 509 event_socket->events &= (~eq->events[eq->start].ev.events); 510 511 eq->start++; 512 eq->num_events--; 513 if (eq->start >= eq->size) { 514 eq->start = 0; 515 } 516 } 517 518 if (cnt == 0 && timeout != 0) 519 goto wait; 520 521 pthread_mutex_unlock(&ep->epoll_lock); 522 523 return cnt; 524 } 525 /*----------------------------------------------------------------------------*/ 526 inline int 527 AddEpollEvent(struct mtcp_epoll *ep, 528 int queue_type, socket_map_t socket, uint32_t event) 529 { 530 #ifdef DBGMSG 531 __PREPARE_DGBLOGGING(); 532 #endif 533 struct event_queue *eq; 534 int index; 535 536 if (!ep || !socket || !event) 537 return -1; 538 539 ep->stat.issued++; 540 541 if (socket->events & event) { 542 return 0; 543 } 544 545 if (queue_type == MOS_EVENT_QUEUE) { 546 eq = ep->mtcp_queue; 547 } else if (queue_type == USR_EVENT_QUEUE) { 548 eq = ep->usr_queue; 549 pthread_mutex_lock(&ep->epoll_lock); 550 } else if (queue_type == USR_SHADOW_EVENT_QUEUE) { 551 eq = ep->usr_shadow_queue; 552 } else { 553 TRACE_ERROR("Non-existing event queue type!\n"); 554 return -1; 555 } 556 557 if (eq->num_events >= eq->size) { 558 TRACE_ERROR("Exceeded epoll event queue! num_events: %d, size: %d\n", 559 eq->num_events, eq->size); 560 if (queue_type == USR_EVENT_QUEUE) 561 pthread_mutex_unlock(&ep->epoll_lock); 562 return -1; 563 } 564 565 index = eq->end++; 566 567 socket->events |= event; 568 eq->events[index].sockid = socket->id; 569 eq->events[index].ev.events = event; 570 eq->events[index].ev.data = socket->ep_data; 571 572 if (eq->end >= eq->size) { 573 eq->end = 0; 574 } 575 eq->num_events++; 576 577 TRACE_EPOLL("Socket %d New event: %s, start: %u, end: %u, num: %u\n", 578 eq->events[index].sockid, 579 EventToString(eq->events[index].ev.events), 580 eq->start, eq->end, eq->num_events); 581 582 if (queue_type == USR_EVENT_QUEUE) 583 pthread_mutex_unlock(&ep->epoll_lock); 584 585 ep->stat.registered++; 586 587 return 0; 588 } 589