1 #include "debug.h" 2 #include <sys/queue.h> 3 #include <unistd.h> 4 #include <time.h> 5 #include <signal.h> 6 #include <assert.h> 7 #include <string.h> 8 9 #include "mtcp.h" 10 #include "tcp_stream.h" 11 #include "eventpoll.h" 12 #include "tcp_in.h" 13 #include "pipe.h" 14 #include "tcp_rb.h" 15 #include "config.h" 16 17 #define MAX(a, b) ((a)>(b)?(a):(b)) 18 #define MIN(a, b) ((a)<(b)?(a):(b)) 19 20 #define SPIN_BEFORE_SLEEP FALSE 21 #define SPIN_THRESH 10000000 22 23 /*----------------------------------------------------------------------------*/ 24 char *event_str[] = {"NONE", "IN", "PRI", "OUT", "ERR", "HUP", "RDHUP"}; 25 /*----------------------------------------------------------------------------*/ 26 char * 27 EventToString(uint32_t event) 28 { 29 switch (event) { 30 case MOS_EPOLLNONE: 31 return event_str[0]; 32 break; 33 case MOS_EPOLLIN: 34 return event_str[1]; 35 break; 36 case MOS_EPOLLPRI: 37 return event_str[2]; 38 break; 39 case MOS_EPOLLOUT: 40 return event_str[3]; 41 break; 42 case MOS_EPOLLERR: 43 return event_str[4]; 44 break; 45 case MOS_EPOLLHUP: 46 return event_str[5]; 47 break; 48 case MOS_EPOLLRDHUP: 49 return event_str[6]; 50 break; 51 default: 52 assert(0); 53 } 54 55 assert(0); 56 return NULL; 57 } 58 /*----------------------------------------------------------------------------*/ 59 struct event_queue * 60 CreateEventQueue(int size) 61 { 62 struct event_queue *eq; 63 64 eq = (struct event_queue *)calloc(1, sizeof(struct event_queue)); 65 if (!eq) 66 return NULL; 67 68 eq->start = 0; 69 eq->end = 0; 70 eq->size = size; 71 eq->events = (struct mtcp_epoll_event_int *) 72 calloc(size, sizeof(struct mtcp_epoll_event_int)); 73 if (!eq->events) { 74 free(eq); 75 return NULL; 76 } 77 eq->num_events = 0; 78 79 return eq; 80 } 81 /*----------------------------------------------------------------------------*/ 82 void 83 DestroyEventQueue(struct event_queue *eq) 84 { 85 if (eq->events) 86 free(eq->events); 87 88 free(eq); 89 } 90 /*----------------------------------------------------------------------------*/ 91 int 92 mtcp_epoll_create(mctx_t mctx, int size) 93 { 94 mtcp_manager_t mtcp = g_mtcp[mctx->cpu]; 95 struct mtcp_epoll *ep; 96 socket_map_t epsocket; 97 98 if (size <= 0) { 99 errno = EINVAL; 100 return -1; 101 } 102 103 epsocket = AllocateSocket(mctx, MOS_SOCK_EPOLL); 104 if (!epsocket) { 105 errno = ENFILE; 106 return -1; 107 } 108 109 ep = (struct mtcp_epoll *)calloc(1, sizeof(struct mtcp_epoll)); 110 if (!ep) { 111 FreeSocket(mctx, epsocket->id, MOS_SOCK_EPOLL); 112 return -1; 113 } 114 115 /* create event queues */ 116 ep->usr_queue = CreateEventQueue(size); 117 if (!ep->usr_queue) 118 return -1; 119 120 ep->usr_shadow_queue = CreateEventQueue(size); 121 if (!ep->usr_shadow_queue) { 122 DestroyEventQueue(ep->usr_queue); 123 return -1; 124 } 125 126 ep->mtcp_queue = CreateEventQueue(size); 127 if (!ep->mtcp_queue) { 128 DestroyEventQueue(ep->usr_queue); 129 DestroyEventQueue(ep->usr_shadow_queue); 130 return -1; 131 } 132 133 TRACE_EPOLL("epoll structure of size %d created.\n", size); 134 135 mtcp->ep = ep; 136 epsocket->ep = ep; 137 138 if (pthread_mutex_init(&ep->epoll_lock, NULL)) { 139 return -1; 140 } 141 if (pthread_cond_init(&ep->epoll_cond, NULL)) { 142 return -1; 143 } 144 145 return epsocket->id; 146 } 147 /*----------------------------------------------------------------------------*/ 148 int 149 CloseEpollSocket(mctx_t mctx, int epid) 150 { 151 mtcp_manager_t mtcp; 152 struct mtcp_epoll *ep; 153 154 mtcp = GetMTCPManager(mctx); 155 if (!mtcp) { 156 return -1; 157 } 158 159 ep = mtcp->smap[epid].ep; 160 if (!ep) { 161 errno = EINVAL; 162 return -1; 163 } 164 165 DestroyEventQueue(ep->usr_queue); 166 DestroyEventQueue(ep->usr_shadow_queue); 167 DestroyEventQueue(ep->mtcp_queue); 168 free(ep); 169 170 pthread_mutex_lock(&ep->epoll_lock); 171 mtcp->ep = NULL; 172 mtcp->smap[epid].ep = NULL; 173 pthread_cond_signal(&ep->epoll_cond); 174 pthread_mutex_unlock(&ep->epoll_lock); 175 176 pthread_cond_destroy(&ep->epoll_cond); 177 pthread_mutex_destroy(&ep->epoll_lock); 178 179 return 0; 180 } 181 /*----------------------------------------------------------------------------*/ 182 static int 183 RaisePendingStreamEvents(mtcp_manager_t mtcp, 184 struct mtcp_epoll *ep, socket_map_t socket) 185 { 186 tcp_stream *stream = socket->stream; 187 188 if (!stream) 189 return -1; 190 if (stream->state < TCP_ST_ESTABLISHED) 191 return -1; 192 193 TRACE_EPOLL("Stream %d at state %s\n", 194 stream->id, TCPStateToString(stream)); 195 /* if there are payloads already read before epoll registration */ 196 /* generate read event */ 197 if (socket->epoll & MOS_EPOLLIN) { 198 struct tcp_recv_vars *rcvvar = stream->rcvvar; 199 if (rcvvar->rcvbuf && tcprb_cflen(rcvvar->rcvbuf) > 0) { 200 TRACE_EPOLL("Socket %d: Has existing payloads\n", socket->id); 201 AddEpollEvent(ep, USR_SHADOW_EVENT_QUEUE, socket, MOS_EPOLLIN); 202 } else if (stream->state == TCP_ST_CLOSE_WAIT) { 203 TRACE_EPOLL("Socket %d: Waiting for close\n", socket->id); 204 AddEpollEvent(ep, USR_SHADOW_EVENT_QUEUE, socket, MOS_EPOLLIN); 205 } 206 } 207 208 /* same thing to the write event */ 209 if (socket->epoll & MOS_EPOLLOUT) { 210 struct tcp_send_vars *sndvar = stream->sndvar; 211 if (!sndvar->sndbuf || 212 (sndvar->sndbuf && sndvar->sndbuf->len < sndvar->snd_wnd)) { 213 if (!(socket->events & MOS_EPOLLOUT)) { 214 TRACE_EPOLL("Socket %d: Adding write event\n", socket->id); 215 AddEpollEvent(ep, USR_SHADOW_EVENT_QUEUE, socket, MOS_EPOLLOUT); 216 } 217 } 218 } 219 220 return 0; 221 } 222 /*----------------------------------------------------------------------------*/ 223 int 224 mtcp_epoll_ctl(mctx_t mctx, int epid, 225 int op, int sockid, struct mtcp_epoll_event *event) 226 { 227 mtcp_manager_t mtcp; 228 struct mtcp_epoll *ep; 229 socket_map_t socket; 230 uint32_t events; 231 232 mtcp = GetMTCPManager(mctx); 233 if (!mtcp) { 234 return -1; 235 } 236 237 if (epid < 0 || epid >= g_config.mos->max_concurrency) { 238 TRACE_API("Epoll id %d out of range.\n", epid); 239 errno = EBADF; 240 return -1; 241 } 242 243 if (sockid < 0 || sockid >= g_config.mos->max_concurrency) { 244 TRACE_API("Socket id %d out of range.\n", sockid); 245 errno = EBADF; 246 return -1; 247 } 248 249 if (mtcp->smap[epid].socktype == MOS_SOCK_UNUSED) { 250 errno = EBADF; 251 return -1; 252 } 253 254 if (mtcp->smap[epid].socktype != MOS_SOCK_EPOLL) { 255 errno = EINVAL; 256 return -1; 257 } 258 259 ep = mtcp->smap[epid].ep; 260 if (!ep || (!event && op != MOS_EPOLL_CTL_DEL)) { 261 errno = EINVAL; 262 return -1; 263 } 264 socket = &mtcp->smap[sockid]; 265 266 if (op == MOS_EPOLL_CTL_ADD) { 267 if (socket->epoll) { 268 errno = EEXIST; 269 return -1; 270 } 271 272 /* EPOLLERR and EPOLLHUP are registered as default */ 273 events = event->events; 274 events |= (MOS_EPOLLERR | MOS_EPOLLHUP); 275 socket->ep_data = event->data; 276 socket->epoll = events; 277 278 TRACE_EPOLL("Adding epoll socket %d(type %d) ET: %llu, IN: %llu, OUT: %llu\n", 279 socket->id, socket->socktype, 280 (unsigned long long)socket->epoll & MOS_EPOLLET, 281 (unsigned long long)socket->epoll & MOS_EPOLLIN, 282 (unsigned long long)socket->epoll & MOS_EPOLLOUT); 283 284 if (socket->socktype == MOS_SOCK_STREAM) { 285 RaisePendingStreamEvents(mtcp, ep, socket); 286 } else if (socket->socktype == MOS_SOCK_PIPE) { 287 RaisePendingPipeEvents(mctx, epid, sockid); 288 } 289 290 } else if (op == MOS_EPOLL_CTL_MOD) { 291 if (!socket->epoll) { 292 pthread_mutex_unlock(&ep->epoll_lock); 293 errno = ENOENT; 294 return -1; 295 } 296 297 events = event->events; 298 events |= (MOS_EPOLLERR | MOS_EPOLLHUP); 299 socket->ep_data = event->data; 300 socket->epoll = events; 301 302 if (socket->socktype == MOS_SOCK_STREAM) { 303 RaisePendingStreamEvents(mtcp, ep, socket); 304 } else if (socket->socktype == MOS_SOCK_PIPE) { 305 RaisePendingPipeEvents(mctx, epid, sockid); 306 } 307 308 } else if (op == MOS_EPOLL_CTL_DEL) { 309 if (!socket->epoll) { 310 errno = ENOENT; 311 return -1; 312 } 313 314 socket->epoll = MOS_EPOLLNONE; 315 } 316 317 return 0; 318 } 319 /*----------------------------------------------------------------------------*/ 320 int 321 mtcp_epoll_wait(mctx_t mctx, int epid, 322 struct mtcp_epoll_event *events, int maxevents, int timeout) 323 { 324 mtcp_manager_t mtcp; 325 struct mtcp_epoll *ep; 326 struct event_queue *eq; 327 struct event_queue *eq_shadow; 328 socket_map_t event_socket; 329 int validity; 330 int i, cnt, ret; 331 int num_events; 332 333 mtcp = GetMTCPManager(mctx); 334 if (!mtcp) { 335 return -1; 336 } 337 338 if (epid < 0 || epid >= g_config.mos->max_concurrency) { 339 TRACE_API("Epoll id %d out of range.\n", epid); 340 errno = EBADF; 341 return -1; 342 } 343 344 if (mtcp->smap[epid].socktype == MOS_SOCK_UNUSED) { 345 errno = EBADF; 346 return -1; 347 } 348 349 if (mtcp->smap[epid].socktype != MOS_SOCK_EPOLL) { 350 errno = EINVAL; 351 return -1; 352 } 353 354 ep = mtcp->smap[epid].ep; 355 if (!ep || !events || maxevents <= 0) { 356 errno = EINVAL; 357 return -1; 358 } 359 360 ep->stat.calls++; 361 362 #if SPIN_BEFORE_SLEEP 363 int spin = 0; 364 while (ep->num_events == 0 && spin < SPIN_THRESH) { 365 spin++; 366 } 367 #endif /* SPIN_BEFORE_SLEEP */ 368 369 if (pthread_mutex_lock(&ep->epoll_lock)) { 370 if (errno == EDEADLK) 371 perror("mtcp_epoll_wait: epoll_lock blocked\n"); 372 assert(0); 373 } 374 375 wait: 376 eq = ep->usr_queue; 377 eq_shadow = ep->usr_shadow_queue; 378 379 /* wait until event occurs */ 380 while (eq->num_events == 0 && eq_shadow->num_events == 0 && timeout != 0) { 381 382 #if INTR_SLEEPING_MTCP 383 /* signal to mtcp thread if it is sleeping */ 384 if (mtcp->wakeup_flag && mtcp->is_sleeping) { 385 pthread_kill(mtcp->ctx->thread, SIGUSR1); 386 } 387 #endif 388 ep->stat.waits++; 389 ep->waiting = TRUE; 390 if (timeout > 0) { 391 struct timespec deadline; 392 393 clock_gettime(CLOCK_REALTIME, &deadline); 394 if (timeout >= 1000) { 395 int sec; 396 sec = timeout / 1000; 397 deadline.tv_sec += sec; 398 timeout -= sec * 1000; 399 } 400 401 deadline.tv_nsec += timeout * 1000000; 402 403 if (deadline.tv_nsec >= 1000000000) { 404 deadline.tv_sec++; 405 deadline.tv_nsec -= 1000000000; 406 } 407 408 //deadline.tv_sec = mtcp->cur_tv.tv_sec; 409 //deadline.tv_nsec = (mtcp->cur_tv.tv_usec + timeout * 1000) * 1000; 410 ret = pthread_cond_timedwait(&ep->epoll_cond, 411 &ep->epoll_lock, &deadline); 412 if (ret && ret != ETIMEDOUT) { 413 /* errno set by pthread_cond_timedwait() */ 414 pthread_mutex_unlock(&ep->epoll_lock); 415 TRACE_ERROR("pthread_cond_timedwait failed. ret: %d, error: %s\n", 416 ret, strerror(errno)); 417 return -1; 418 } 419 timeout = 0; 420 } else if (timeout < 0) { 421 ret = pthread_cond_wait(&ep->epoll_cond, &ep->epoll_lock); 422 if (ret) { 423 /* errno set by pthread_cond_wait() */ 424 pthread_mutex_unlock(&ep->epoll_lock); 425 TRACE_ERROR("pthread_cond_wait failed. ret: %d, error: %s\n", 426 ret, strerror(errno)); 427 return -1; 428 } 429 } 430 ep->waiting = FALSE; 431 432 if (mtcp->ctx->done || mtcp->ctx->exit || mtcp->ctx->interrupt) { 433 mtcp->ctx->interrupt = FALSE; 434 //ret = pthread_cond_signal(&ep->epoll_cond); 435 pthread_mutex_unlock(&ep->epoll_lock); 436 errno = EINTR; 437 return -1; 438 } 439 440 } 441 442 /* fetch events from the user event queue */ 443 cnt = 0; 444 num_events = eq->num_events; 445 for (i = 0; i < num_events && cnt < maxevents; i++) { 446 event_socket = &mtcp->smap[eq->events[eq->start].sockid]; 447 validity = TRUE; 448 if (event_socket->socktype == MOS_SOCK_UNUSED) 449 validity = FALSE; 450 if (!(event_socket->epoll & eq->events[eq->start].ev.events)) 451 validity = FALSE; 452 if (!(event_socket->events & eq->events[eq->start].ev.events)) 453 validity = FALSE; 454 455 if (validity) { 456 events[cnt++] = eq->events[eq->start].ev; 457 assert(eq->events[eq->start].sockid >= 0); 458 459 TRACE_EPOLL("Socket %d: Handled event. event: %s, " 460 "start: %u, end: %u, num: %u\n", 461 event_socket->id, 462 EventToString(eq->events[eq->start].ev.events), 463 eq->start, eq->end, eq->num_events); 464 ep->stat.handled++; 465 } else { 466 TRACE_EPOLL("Socket %d: event %s invalidated.\n", 467 eq->events[eq->start].sockid, 468 EventToString(eq->events[eq->start].ev.events)); 469 ep->stat.invalidated++; 470 } 471 event_socket->events &= (~eq->events[eq->start].ev.events); 472 473 eq->start++; 474 eq->num_events--; 475 if (eq->start >= eq->size) { 476 eq->start = 0; 477 } 478 } 479 480 /* fetch eventes from user shadow event queue */ 481 eq = ep->usr_shadow_queue; 482 num_events = eq->num_events; 483 for (i = 0; i < num_events && cnt < maxevents; i++) { 484 event_socket = &mtcp->smap[eq->events[eq->start].sockid]; 485 validity = TRUE; 486 if (event_socket->socktype == MOS_SOCK_UNUSED) 487 validity = FALSE; 488 if (!(event_socket->epoll & eq->events[eq->start].ev.events)) 489 validity = FALSE; 490 if (!(event_socket->events & eq->events[eq->start].ev.events)) 491 validity = FALSE; 492 493 if (validity) { 494 events[cnt++] = eq->events[eq->start].ev; 495 assert(eq->events[eq->start].sockid >= 0); 496 497 TRACE_EPOLL("Socket %d: Handled event. event: %s, " 498 "start: %u, end: %u, num: %u\n", 499 event_socket->id, 500 EventToString(eq->events[eq->start].ev.events), 501 eq->start, eq->end, eq->num_events); 502 ep->stat.handled++; 503 } else { 504 TRACE_EPOLL("Socket %d: event %s invalidated.\n", 505 eq->events[eq->start].sockid, 506 EventToString(eq->events[eq->start].ev.events)); 507 ep->stat.invalidated++; 508 } 509 event_socket->events &= (~eq->events[eq->start].ev.events); 510 511 eq->start++; 512 eq->num_events--; 513 if (eq->start >= eq->size) { 514 eq->start = 0; 515 } 516 } 517 518 if (cnt == 0 && timeout != 0) 519 goto wait; 520 521 pthread_mutex_unlock(&ep->epoll_lock); 522 523 return cnt; 524 } 525 /*----------------------------------------------------------------------------*/ 526 inline int 527 AddEpollEvent(struct mtcp_epoll *ep, 528 int queue_type, socket_map_t socket, uint32_t event) 529 { 530 #ifdef DBGMSG 531 __PREPARE_DBGLOGGING(); 532 #endif 533 struct event_queue *eq; 534 int index; 535 536 if (!ep || !socket || !event) 537 return -1; 538 539 ep->stat.issued++; 540 541 if (socket->events & event) { 542 return 0; 543 } 544 545 if (queue_type == MOS_EVENT_QUEUE) { 546 eq = ep->mtcp_queue; 547 } else if (queue_type == USR_EVENT_QUEUE) { 548 eq = ep->usr_queue; 549 pthread_mutex_lock(&ep->epoll_lock); 550 } else if (queue_type == USR_SHADOW_EVENT_QUEUE) { 551 eq = ep->usr_shadow_queue; 552 } else { 553 TRACE_ERROR("Non-existing event queue type!\n"); 554 return -1; 555 } 556 557 if (eq->num_events >= eq->size) { 558 TRACE_ERROR("Exceeded epoll event queue! num_events: %d, size: %d\n", 559 eq->num_events, eq->size); 560 if (queue_type == USR_EVENT_QUEUE) 561 pthread_mutex_unlock(&ep->epoll_lock); 562 return -1; 563 } 564 565 index = eq->end++; 566 567 socket->events |= event; 568 eq->events[index].sockid = socket->id; 569 eq->events[index].ev.events = event; 570 eq->events[index].ev.data = socket->ep_data; 571 572 if (eq->end >= eq->size) { 573 eq->end = 0; 574 } 575 eq->num_events++; 576 577 TRACE_EPOLL("Socket %d New event: %s, start: %u, end: %u, num: %u\n", 578 eq->events[index].sockid, 579 EventToString(eq->events[index].ev.events), 580 eq->start, eq->end, eq->num_events); 581 582 if (queue_type == USR_EVENT_QUEUE) 583 pthread_mutex_unlock(&ep->epoll_lock); 584 585 ep->stat.registered++; 586 587 return 0; 588 } 589