1 #include <sys/queue.h> 2 #include <sys/ioctl.h> 3 #include <limits.h> 4 #include <unistd.h> 5 #include <assert.h> 6 #include <string.h> 7 8 #ifdef DARWIN 9 #include <netinet/tcp.h> 10 #include <netinet/ip.h> 11 #include <netinet/if_ether.h> 12 #endif 13 14 #include "mtcp.h" 15 #include "mtcp_api.h" 16 #include "tcp_in.h" 17 #include "tcp_stream.h" 18 #include "tcp_out.h" 19 #include "ip_out.h" 20 #include "eventpoll.h" 21 #include "pipe.h" 22 #include "fhash.h" 23 #include "addr_pool.h" 24 #include "rss.h" 25 #include "config.h" 26 #include "debug.h" 27 #include "eventpoll.h" 28 #include "mos_api.h" 29 #include "tcp_rb.h" 30 31 #define MAX(a, b) ((a)>(b)?(a):(b)) 32 #define MIN(a, b) ((a)<(b)?(a):(b)) 33 34 /*----------------------------------------------------------------------------*/ 35 /** Stop monitoring the socket! (function prototype) 36 * @param [in] mctx: mtcp context 37 * @param [in] sock: monitoring stream socket id 38 * @param [in] side: side of monitoring (client side, server side or both) 39 * 40 * This function is now DEPRECATED and is only used within mOS core... 41 */ 42 int 43 mtcp_cb_stop(mctx_t mctx, int sock, int side); 44 /*----------------------------------------------------------------------------*/ 45 /** Reset the connection (send RST packets to both sides) 46 * (We need to decide the API for this.) 47 */ 48 //int 49 //mtcp_cb_reset(mctx_t mctx, int sock, int side); 50 /*----------------------------------------------------------------------------*/ 51 inline mtcp_manager_t 52 GetMTCPManager(mctx_t mctx) 53 { 54 if (!mctx) { 55 errno = EACCES; 56 return NULL; 57 } 58 59 if (mctx->cpu < 0 || mctx->cpu >= num_cpus) { 60 errno = EINVAL; 61 return NULL; 62 } 63 64 if (!g_mtcp[mctx->cpu] || g_mtcp[mctx->cpu]->ctx->done || g_mtcp[mctx->cpu]->ctx->exit) { 65 errno = EPERM; 66 return NULL; 67 } 68 69 return g_mtcp[mctx->cpu]; 70 } 71 /*----------------------------------------------------------------------------*/ 72 static inline int 73 GetSocketError(socket_map_t socket, void *optval, socklen_t *optlen) 74 { 75 tcp_stream *cur_stream; 76 77 if (!socket->stream) { 78 errno = EBADF; 79 return -1; 80 } 81 82 cur_stream = socket->stream; 83 if (cur_stream->state == TCP_ST_CLOSED_RSVD) { 84 if (cur_stream->close_reason == TCP_TIMEDOUT || 85 cur_stream->close_reason == TCP_CONN_FAIL || 86 cur_stream->close_reason == TCP_CONN_LOST) { 87 *(int *)optval = ETIMEDOUT; 88 *optlen = sizeof(int); 89 90 return 0; 91 } 92 } 93 94 if (cur_stream->state == TCP_ST_CLOSE_WAIT || 95 cur_stream->state == TCP_ST_CLOSED_RSVD) { 96 if (cur_stream->close_reason == TCP_RESET) { 97 *(int *)optval = ECONNRESET; 98 *optlen = sizeof(int); 99 100 return 0; 101 } 102 } 103 104 if (cur_stream->state == TCP_ST_SYN_SENT && 105 errno == EINPROGRESS) { 106 *(int *)optval = errno; 107 *optlen = sizeof(int); 108 109 return -1; 110 } 111 112 /* 113 * `base case`: If socket sees no so_error, then 114 * this also means close_reason will always be 115 * TCP_NOT_CLOSED. 116 */ 117 if (cur_stream->close_reason == TCP_NOT_CLOSED) { 118 *(int *)optval = 0; 119 *optlen = sizeof(int); 120 121 return 0; 122 } 123 124 errno = ENOSYS; 125 return -1; 126 } 127 /*----------------------------------------------------------------------------*/ 128 int 129 mtcp_getsockname(mctx_t mctx, int sockid, struct sockaddr *addr, 130 socklen_t *addrlen) 131 { 132 mtcp_manager_t mtcp; 133 socket_map_t socket; 134 135 mtcp = GetMTCPManager(mctx); 136 if (!mtcp) { 137 return -1; 138 } 139 140 if (sockid < 0 || sockid >= g_config.mos->max_concurrency) { 141 TRACE_API("Socket id %d out of range.\n", sockid); 142 errno = EBADF; 143 return -1; 144 } 145 146 socket = &mtcp->smap[sockid]; 147 if (socket->socktype == MOS_SOCK_UNUSED) { 148 TRACE_API("Invalid socket id: %d\n", sockid); 149 errno = EBADF; 150 return -1; 151 } 152 153 if (*addrlen <= 0) { 154 TRACE_API("Invalid addrlen: %d\n", *addrlen); 155 errno = EINVAL; 156 return -1; 157 } 158 159 if (socket->socktype != MOS_SOCK_STREAM_LISTEN && 160 socket->socktype != MOS_SOCK_STREAM) { 161 TRACE_API("Invalid socket id: %d\n", sockid); 162 errno = ENOTSOCK; 163 return -1; 164 } 165 166 *(struct sockaddr_in *)addr = socket->saddr; 167 *addrlen = sizeof(socket->saddr); 168 169 return 0; 170 } 171 /*----------------------------------------------------------------------------*/ 172 int 173 mtcp_getsockopt(mctx_t mctx, int sockid, int level, 174 int optname, void *optval, socklen_t *optlen) 175 { 176 mtcp_manager_t mtcp; 177 socket_map_t socket; 178 179 mtcp = GetMTCPManager(mctx); 180 if (!mtcp) { 181 errno = EACCES; 182 return -1; 183 } 184 185 if (sockid < 0 || sockid >= g_config.mos->max_concurrency) { 186 TRACE_API("Socket id %d out of range.\n", sockid); 187 errno = EBADF; 188 return -1; 189 } 190 191 switch (level) { 192 case SOL_SOCKET: 193 socket = &mtcp->smap[sockid]; 194 if (socket->socktype == MOS_SOCK_UNUSED) { 195 TRACE_API("Invalid socket id: %d\n", sockid); 196 errno = EBADF; 197 return -1; 198 } 199 200 if (socket->socktype != MOS_SOCK_STREAM_LISTEN && 201 socket->socktype != MOS_SOCK_STREAM) { 202 TRACE_API("Invalid socket id: %d\n", sockid); 203 errno = ENOTSOCK; 204 return -1; 205 } 206 207 if (optname == SO_ERROR) { 208 if (socket->socktype == MOS_SOCK_STREAM) { 209 return GetSocketError(socket, optval, optlen); 210 } 211 } 212 break; 213 case SOL_MONSOCKET: 214 /* check if the calling thread is in MOS context */ 215 if (mtcp->ctx->thread != pthread_self()) { 216 errno = EPERM; 217 return -1; 218 } 219 /* 220 * All options will only work for active 221 * monitor stream sockets 222 */ 223 socket = &mtcp->msmap[sockid]; 224 if (socket->socktype != MOS_SOCK_MONITOR_STREAM_ACTIVE) { 225 TRACE_API("Invalid socket id: %d\n", sockid); 226 errno = ENOTSOCK; 227 return -1; 228 } 229 230 switch (optname) { 231 case MOS_FRAGINFO_CLIBUF: 232 return GetFragInfo(socket, MOS_SIDE_CLI, optval, optlen); 233 case MOS_FRAGINFO_SVRBUF: 234 return GetFragInfo(socket, MOS_SIDE_SVR, optval, optlen); 235 case MOS_INFO_CLIBUF: 236 return GetBufInfo(socket, MOS_SIDE_CLI, optval, optlen); 237 case MOS_INFO_SVRBUF: 238 return GetBufInfo(socket, MOS_SIDE_SVR, optval, optlen); 239 case MOS_TCP_STATE_CLI: 240 return GetTCPState(socket->monitor_stream->stream, MOS_SIDE_CLI, 241 optval, optlen); 242 case MOS_TCP_STATE_SVR: 243 return GetTCPState(socket->monitor_stream->stream, MOS_SIDE_SVR, 244 optval, optlen); 245 case MOS_TIMESTAMP: 246 return GetLastTimestamp(socket->monitor_stream->stream, 247 (uint32_t *)optval, 248 optlen); 249 default: 250 TRACE_API("can't recognize the optname=%d\n", optname); 251 assert(0); 252 } 253 break; 254 } 255 errno = ENOSYS; 256 return -1; 257 } 258 /*----------------------------------------------------------------------------*/ 259 int 260 mtcp_setsockopt(mctx_t mctx, int sockid, int level, 261 int optname, const void *optval, socklen_t optlen) 262 { 263 mtcp_manager_t mtcp; 264 socket_map_t socket; 265 tcprb_t *rb; 266 267 mtcp = GetMTCPManager(mctx); 268 if (!mtcp) { 269 errno = EACCES; 270 return -1; 271 } 272 273 if (sockid < 0 || sockid >= g_config.mos->max_concurrency) { 274 TRACE_API("Socket id %d out of range.\n", sockid); 275 errno = EBADF; 276 return -1; 277 } 278 279 switch (level) { 280 case SOL_SOCKET: 281 socket = &mtcp->smap[sockid]; 282 if (socket->socktype == MOS_SOCK_UNUSED) { 283 TRACE_API("Invalid socket id: %d\n", sockid); 284 errno = EBADF; 285 return -1; 286 } 287 288 if (socket->socktype != MOS_SOCK_STREAM_LISTEN && 289 socket->socktype != MOS_SOCK_STREAM) { 290 TRACE_API("Invalid socket id: %d\n", sockid); 291 errno = ENOTSOCK; 292 return -1; 293 } 294 break; 295 case SOL_MONSOCKET: 296 socket = &mtcp->msmap[sockid]; 297 /* 298 * checking of calling thread to be in MOS context is 299 * disabled since both optnames can be called from 300 * `application' context (on passive sockets) 301 */ 302 /* 303 * if (mtcp->ctx->thread != pthread_self()) 304 * return -1; 305 */ 306 307 switch (optname) { 308 case MOS_CLIBUF: 309 #if 0 310 if (socket->socktype != MOS_SOCK_MONITOR_STREAM_ACTIVE) { 311 errno = EBADF; 312 return -1; 313 } 314 #endif 315 #ifdef DISABLE_DYN_RESIZE 316 if (*(int *)optval != 0) 317 return -1; 318 if (socket->socktype == MOS_SOCK_MONITOR_STREAM_ACTIVE) { 319 rb = (socket->monitor_stream->stream->side == MOS_SIDE_CLI) ? 320 socket->monitor_stream->stream->rcvvar->rcvbuf : 321 socket->monitor_stream->stream->pair_stream->rcvvar->rcvbuf; 322 if (rb) { 323 tcprb_resize_meta(rb, 0); 324 tcprb_resize(rb, 0); 325 } 326 } 327 return DisableBuf(socket, MOS_SIDE_CLI); 328 #else 329 rb = (socket->monitor_stream->stream->side == MOS_SIDE_CLI) ? 330 socket->monitor_stream->stream->rcvvar->rcvbuf : 331 socket->monitor_stream->stream->pair_stream->rcvvar->rcvbuf; 332 if (tcprb_resize_meta(rb, *(int *)optval) < 0) 333 return -1; 334 return tcprb_resize(rb, 335 (((int)rb->metalen - 1) / UNITBUFSIZE + 1) * UNITBUFSIZE); 336 #endif 337 case MOS_SVRBUF: 338 #if 0 339 if (socket->socktype != MOS_SOCK_MONITOR_STREAM_ACTIVE) { 340 errno = EBADF; 341 return -1; 342 } 343 #endif 344 #ifdef DISABLE_DYN_RESIZE 345 if (*(int *)optval != 0) 346 return -1; 347 if (socket->socktype == MOS_SOCK_MONITOR_STREAM_ACTIVE) { 348 rb = (socket->monitor_stream->stream->side == MOS_SIDE_SVR) ? 349 socket->monitor_stream->stream->rcvvar->rcvbuf : 350 socket->monitor_stream->stream->pair_stream->rcvvar->rcvbuf; 351 if (rb) { 352 tcprb_resize_meta(rb, 0); 353 tcprb_resize(rb, 0); 354 } 355 } 356 return DisableBuf(socket, MOS_SIDE_SVR); 357 #else 358 rb = (socket->monitor_stream->stream->side == MOS_SIDE_SVR) ? 359 socket->monitor_stream->stream->rcvvar->rcvbuf : 360 socket->monitor_stream->stream->pair_stream->rcvvar->rcvbuf; 361 if (tcprb_resize_meta(rb, *(int *)optval) < 0) 362 return -1; 363 return tcprb_resize(rb, 364 (((int)rb->metalen - 1) / UNITBUFSIZE + 1) * UNITBUFSIZE); 365 #endif 366 case MOS_FRAG_CLIBUF: 367 #if 0 368 if (socket->socktype != MOS_SOCK_MONITOR_STREAM_ACTIVE) { 369 errno = EBADF; 370 return -1; 371 } 372 #endif 373 #ifdef DISABLE_DYN_RESIZE 374 if (*(int *)optval != 0) 375 return -1; 376 if (socket->socktype == MOS_SOCK_MONITOR_STREAM_ACTIVE) { 377 rb = (socket->monitor_stream->stream->side == MOS_SIDE_CLI) ? 378 socket->monitor_stream->stream->rcvvar->rcvbuf : 379 socket->monitor_stream->stream->pair_stream->rcvvar->rcvbuf; 380 if (rb) 381 tcprb_resize(rb, 0); 382 } 383 return 0; 384 #else 385 rb = (socket->monitor_stream->stream->side == MOS_SIDE_CLI) ? 386 socket->monitor_stream->stream->rcvvar->rcvbuf : 387 socket->monitor_stream->stream->pair_stream->rcvvar->rcvbuf; 388 if (rb->len == 0) 389 return tcprb_resize_meta(rb, *(int *)optval); 390 else 391 return -1; 392 #endif 393 case MOS_FRAG_SVRBUF: 394 #if 0 395 if (socket->socktype != MOS_SOCK_MONITOR_STREAM_ACTIVE) { 396 errno = EBADF; 397 return -1; 398 } 399 #endif 400 #ifdef DISABLE_DYN_RESIZE 401 if (*(int *)optval != 0) 402 return -1; 403 if (socket->socktype == MOS_SOCK_MONITOR_STREAM_ACTIVE) { 404 rb = (socket->monitor_stream->stream->side == MOS_SIDE_SVR) ? 405 socket->monitor_stream->stream->rcvvar->rcvbuf : 406 socket->monitor_stream->stream->pair_stream->rcvvar->rcvbuf; 407 if (rb) 408 tcprb_resize(rb, 0); 409 } 410 return 0; 411 #else 412 rb = (socket->monitor_stream->stream->side == MOS_SIDE_SVR) ? 413 socket->monitor_stream->stream->rcvvar->rcvbuf : 414 socket->monitor_stream->stream->pair_stream->rcvvar->rcvbuf; 415 if (rb->len == 0) 416 return tcprb_resize_meta(rb, *(int *)optval); 417 else 418 return -1; 419 #endif 420 case MOS_MONLEVEL: 421 #ifdef OLD_API 422 assert(*(int *)optval == MOS_NO_CLIBUF || 423 *(int *)optval == MOS_NO_SVRBUF); 424 return DisableBuf(socket, 425 (*(int *)optval == MOS_NO_CLIBUF) ? 426 MOS_SIDE_CLI : MOS_SIDE_SVR); 427 #endif 428 case MOS_SEQ_REMAP: 429 return TcpSeqChange(socket, 430 (uint32_t)((seq_remap_info *)optval)->seq_off, 431 ((seq_remap_info *)optval)->side, 432 mtcp->pctx->p.seq); 433 case MOS_STOP_MON: 434 return mtcp_cb_stop(mctx, sockid, *(int *)optval); 435 default: 436 TRACE_API("invalid optname=%d\n", optname); 437 assert(0); 438 } 439 break; 440 } 441 442 errno = ENOSYS; 443 return -1; 444 } 445 /*----------------------------------------------------------------------------*/ 446 int 447 mtcp_setsock_nonblock(mctx_t mctx, int sockid) 448 { 449 mtcp_manager_t mtcp; 450 451 mtcp = GetMTCPManager(mctx); 452 if (!mtcp) { 453 errno = EACCES; 454 return -1; 455 } 456 457 if (sockid < 0 || sockid >= g_config.mos->max_concurrency) { 458 TRACE_API("Socket id %d out of range.\n", sockid); 459 errno = EBADF; 460 return -1; 461 } 462 463 if (mtcp->smap[sockid].socktype == MOS_SOCK_UNUSED) { 464 TRACE_API("Invalid socket id: %d\n", sockid); 465 errno = EBADF; 466 return -1; 467 } 468 469 mtcp->smap[sockid].opts |= MTCP_NONBLOCK; 470 471 return 0; 472 } 473 /*----------------------------------------------------------------------------*/ 474 int 475 mtcp_ioctl(mctx_t mctx, int sockid, int request, void *argp) 476 { 477 mtcp_manager_t mtcp; 478 socket_map_t socket; 479 480 mtcp = GetMTCPManager(mctx); 481 if (!mtcp) { 482 errno = EACCES; 483 return -1; 484 } 485 486 if (sockid < 0 || sockid >= g_config.mos->max_concurrency) { 487 TRACE_API("Socket id %d out of range.\n", sockid); 488 errno = EBADF; 489 return -1; 490 } 491 492 /* only support stream socket */ 493 socket = &mtcp->smap[sockid]; 494 495 if (socket->socktype != MOS_SOCK_STREAM_LISTEN && 496 socket->socktype != MOS_SOCK_STREAM) { 497 TRACE_API("Invalid socket id: %d\n", sockid); 498 errno = EBADF; 499 return -1; 500 } 501 502 if (!argp) { 503 errno = EFAULT; 504 return -1; 505 } 506 507 if (request == FIONREAD) { 508 tcp_stream *cur_stream; 509 tcprb_t *rbuf; 510 511 cur_stream = socket->stream; 512 if (!cur_stream) { 513 errno = EBADF; 514 return -1; 515 } 516 517 rbuf = cur_stream->rcvvar->rcvbuf; 518 *(int *)argp = (rbuf) ? tcprb_cflen(rbuf) : 0; 519 520 } else if (request == FIONBIO) { 521 /* 522 * sockets can only be set to blocking/non-blocking 523 * modes during initialization 524 */ 525 if ((*(int *)argp)) 526 mtcp->smap[sockid].opts |= MTCP_NONBLOCK; 527 else 528 mtcp->smap[sockid].opts &= ~MTCP_NONBLOCK; 529 } else { 530 errno = EINVAL; 531 return -1; 532 } 533 534 return 0; 535 } 536 /*----------------------------------------------------------------------------*/ 537 static int 538 mtcp_monitor(mctx_t mctx, socket_map_t sock) 539 { 540 mtcp_manager_t mtcp; 541 struct mon_listener *monitor; 542 int sockid = sock->id; 543 544 mtcp = GetMTCPManager(mctx); 545 if (!mtcp) { 546 errno = EACCES; 547 return -1; 548 } 549 550 if (sockid < 0 || sockid >= g_config.mos->max_concurrency) { 551 TRACE_API("Socket id %d out of range.\n", sockid); 552 errno = EBADF; 553 return -1; 554 } 555 556 if (mtcp->msmap[sockid].socktype == MOS_SOCK_UNUSED) { 557 TRACE_API("Invalid socket id: %d\n", sockid); 558 errno = EBADF; 559 return -1; 560 } 561 562 if (!(mtcp->msmap[sockid].socktype == MOS_SOCK_MONITOR_STREAM || 563 mtcp->msmap[sockid].socktype == MOS_SOCK_MONITOR_RAW)) { 564 TRACE_API("Not a monitor socket. id: %d\n", sockid); 565 errno = ENOTSOCK; 566 return -1; 567 } 568 569 monitor = (struct mon_listener *)calloc(1, sizeof(struct mon_listener)); 570 if (!monitor) { 571 /* errno set from the malloc() */ 572 errno = ENOMEM; 573 return -1; 574 } 575 576 /* create a monitor-specific event queue */ 577 monitor->eq = CreateEventQueue(g_config.mos->max_concurrency); 578 if (!monitor->eq) { 579 TRACE_API("Can't create event queue (concurrency: %d) for " 580 "monitor read event registrations!\n", 581 g_config.mos->max_concurrency); 582 free(monitor); 583 errno = ENOMEM; 584 return -1; 585 } 586 587 /* set monitor-related basic parameters */ 588 #ifndef NEWEV 589 monitor->ude_id = UDE_OFFSET; 590 #endif 591 monitor->socket = sock; 592 monitor->client_buf_mgmt = monitor->server_buf_mgmt = BUFMGMT_FULL; 593 594 /* perform both sides monitoring by default */ 595 monitor->client_mon = monitor->server_mon = 1; 596 597 /* add monitor socket to the monitor list */ 598 TAILQ_INSERT_TAIL(&mtcp->monitors, monitor, link); 599 600 mtcp->msmap[sockid].monitor_listener = monitor; 601 602 return 0; 603 } 604 /*----------------------------------------------------------------------------*/ 605 int 606 mtcp_socket(mctx_t mctx, int domain, int type, int protocol) 607 { 608 mtcp_manager_t mtcp; 609 socket_map_t socket; 610 611 mtcp = GetMTCPManager(mctx); 612 if (!mtcp) { 613 errno = EACCES; 614 return -1; 615 } 616 617 if (domain != AF_INET) { 618 errno = EAFNOSUPPORT; 619 return -1; 620 } 621 622 if (type == SOCK_STREAM) { 623 type = MOS_SOCK_STREAM; 624 } else if (type == MOS_SOCK_MONITOR_STREAM || 625 type == MOS_SOCK_MONITOR_RAW) { 626 /* do nothing for the time being */ 627 } else { 628 /* Not supported type */ 629 errno = EINVAL; 630 return -1; 631 } 632 633 socket = AllocateSocket(mctx, type); 634 if (!socket) { 635 errno = ENFILE; 636 return -1; 637 } 638 639 if (type == MOS_SOCK_MONITOR_STREAM || 640 type == MOS_SOCK_MONITOR_RAW) { 641 mtcp_manager_t mtcp = GetMTCPManager(mctx); 642 if (!mtcp) { 643 errno = EACCES; 644 return -1; 645 } 646 mtcp_monitor(mctx, socket); 647 #ifdef NEWEV 648 socket->monitor_listener->stree_dontcare = NULL; 649 socket->monitor_listener->stree_pre_rcv = NULL; 650 socket->monitor_listener->stree_post_snd = NULL; 651 #else 652 InitEvB(mtcp, &socket->monitor_listener->dontcare_evb); 653 InitEvB(mtcp, &socket->monitor_listener->pre_tcp_evb); 654 InitEvB(mtcp, &socket->monitor_listener->post_tcp_evb); 655 #endif 656 } 657 658 return socket->id; 659 } 660 /*----------------------------------------------------------------------------*/ 661 int 662 mtcp_bind(mctx_t mctx, int sockid, 663 const struct sockaddr *addr, socklen_t addrlen) 664 { 665 mtcp_manager_t mtcp; 666 struct sockaddr_in *addr_in; 667 668 mtcp = GetMTCPManager(mctx); 669 if (!mtcp) { 670 errno = EACCES; 671 return -1; 672 } 673 674 if (sockid < 0 || sockid >= g_config.mos->max_concurrency) { 675 TRACE_API("Socket id %d out of range.\n", sockid); 676 errno = EBADF; 677 return -1; 678 } 679 680 if (mtcp->smap[sockid].socktype == MOS_SOCK_UNUSED) { 681 TRACE_API("Invalid socket id: %d\n", sockid); 682 errno = EBADF; 683 return -1; 684 } 685 686 if (mtcp->smap[sockid].socktype != MOS_SOCK_STREAM && 687 mtcp->smap[sockid].socktype != MOS_SOCK_STREAM_LISTEN) { 688 TRACE_API("Not a stream socket id: %d\n", sockid); 689 errno = ENOTSOCK; 690 return -1; 691 } 692 693 if (!addr) { 694 TRACE_API("Socket %d: empty address!\n", sockid); 695 errno = EINVAL; 696 return -1; 697 } 698 699 if (mtcp->smap[sockid].opts & MTCP_ADDR_BIND) { 700 TRACE_API("Socket %d: adress already bind for this socket.\n", sockid); 701 errno = EINVAL; 702 return -1; 703 } 704 705 /* we only allow bind() for AF_INET address */ 706 if (addr->sa_family != AF_INET || addrlen < sizeof(struct sockaddr_in)) { 707 TRACE_API("Socket %d: invalid argument!\n", sockid); 708 errno = EINVAL; 709 return -1; 710 } 711 712 if (mtcp->listener) { 713 TRACE_API("Address already bound!\n"); 714 errno = EINVAL; 715 return -1; 716 } 717 addr_in = (struct sockaddr_in *)addr; 718 mtcp->smap[sockid].saddr = *addr_in; 719 mtcp->smap[sockid].opts |= MTCP_ADDR_BIND; 720 721 return 0; 722 } 723 /*----------------------------------------------------------------------------*/ 724 int 725 mtcp_listen(mctx_t mctx, int sockid, int backlog) 726 { 727 mtcp_manager_t mtcp; 728 struct tcp_listener *listener; 729 730 mtcp = GetMTCPManager(mctx); 731 if (!mtcp) { 732 errno = EACCES; 733 return -1; 734 } 735 736 if (sockid < 0 || sockid >= g_config.mos->max_concurrency) { 737 TRACE_API("Socket id %d out of range.\n", sockid); 738 errno = EBADF; 739 return -1; 740 } 741 742 if (mtcp->smap[sockid].socktype == MOS_SOCK_UNUSED) { 743 TRACE_API("Invalid socket id: %d\n", sockid); 744 errno = EBADF; 745 return -1; 746 } 747 748 if (mtcp->smap[sockid].socktype == MOS_SOCK_STREAM) { 749 mtcp->smap[sockid].socktype = MOS_SOCK_STREAM_LISTEN; 750 } 751 752 if (mtcp->smap[sockid].socktype != MOS_SOCK_STREAM_LISTEN) { 753 TRACE_API("Not a listening socket. id: %d\n", sockid); 754 errno = ENOTSOCK; 755 return -1; 756 } 757 758 if (backlog <= 0 || backlog > g_config.mos->max_concurrency) { 759 errno = EINVAL; 760 return -1; 761 } 762 763 listener = (struct tcp_listener *)calloc(1, sizeof(struct tcp_listener)); 764 if (!listener) { 765 /* errno set from the malloc() */ 766 errno = ENOMEM; 767 return -1; 768 } 769 770 listener->sockid = sockid; 771 listener->backlog = backlog; 772 listener->socket = &mtcp->smap[sockid]; 773 774 if (pthread_cond_init(&listener->accept_cond, NULL)) { 775 perror("pthread_cond_init of ctx->accept_cond\n"); 776 /* errno set by pthread_cond_init() */ 777 return -1; 778 } 779 if (pthread_mutex_init(&listener->accept_lock, NULL)) { 780 perror("pthread_mutex_init of ctx->accept_lock\n"); 781 /* errno set by pthread_mutex_init() */ 782 return -1; 783 } 784 785 listener->acceptq = CreateStreamQueue(backlog); 786 if (!listener->acceptq) { 787 errno = ENOMEM; 788 return -1; 789 } 790 791 mtcp->smap[sockid].listener = listener; 792 mtcp->listener = listener; 793 794 return 0; 795 } 796 /*----------------------------------------------------------------------------*/ 797 int 798 mtcp_accept(mctx_t mctx, int sockid, struct sockaddr *addr, socklen_t *addrlen) 799 { 800 mtcp_manager_t mtcp; 801 struct tcp_listener *listener; 802 socket_map_t socket; 803 tcp_stream *accepted = NULL; 804 805 mtcp = GetMTCPManager(mctx); 806 if (!mtcp) { 807 errno = EACCES; 808 return -1; 809 } 810 811 if (sockid < 0 || sockid >= g_config.mos->max_concurrency) { 812 TRACE_API("Socket id %d out of range.\n", sockid); 813 errno = EBADF; 814 return -1; 815 } 816 817 /* requires listening socket */ 818 if (mtcp->smap[sockid].socktype != MOS_SOCK_STREAM_LISTEN) { 819 errno = EINVAL; 820 return -1; 821 } 822 823 listener = mtcp->smap[sockid].listener; 824 825 /* dequeue from the acceptq without lock first */ 826 /* if nothing there, acquire lock and cond_wait */ 827 accepted = StreamDequeue(listener->acceptq); 828 if (!accepted) { 829 if (listener->socket->opts & MTCP_NONBLOCK) { 830 errno = EAGAIN; 831 return -1; 832 833 } else { 834 pthread_mutex_lock(&listener->accept_lock); 835 while ((accepted = StreamDequeue(listener->acceptq)) == NULL) { 836 pthread_cond_wait(&listener->accept_cond, &listener->accept_lock); 837 838 if (mtcp->ctx->done || mtcp->ctx->exit) { 839 pthread_mutex_unlock(&listener->accept_lock); 840 errno = EINTR; 841 return -1; 842 } 843 } 844 pthread_mutex_unlock(&listener->accept_lock); 845 } 846 } 847 848 if (!accepted) { 849 TRACE_ERROR("[NEVER HAPPEN] Empty accept queue!\n"); 850 } 851 852 if (!accepted->socket) { 853 socket = AllocateSocket(mctx, MOS_SOCK_STREAM); 854 if (!socket) { 855 TRACE_ERROR("Failed to create new socket!\n"); 856 /* TODO: destroy the stream */ 857 errno = ENFILE; 858 return -1; 859 } 860 socket->stream = accepted; 861 accepted->socket = socket; 862 863 /* set socket addr parameters */ 864 socket->saddr.sin_family = AF_INET; 865 socket->saddr.sin_port = accepted->dport; 866 socket->saddr.sin_addr.s_addr = accepted->daddr; 867 868 /* if monitor is enabled, complete the socket assignment */ 869 if (socket->stream->pair_stream != NULL) 870 socket->stream->pair_stream->socket = socket; 871 } 872 873 if (!(listener->socket->epoll & MOS_EPOLLET) && 874 !StreamQueueIsEmpty(listener->acceptq)) 875 AddEpollEvent(mtcp->ep, 876 USR_SHADOW_EVENT_QUEUE, 877 listener->socket, MOS_EPOLLIN); 878 879 TRACE_API("Stream %d accepted.\n", accepted->id); 880 881 if (addr && addrlen) { 882 struct sockaddr_in *addr_in = (struct sockaddr_in *)addr; 883 addr_in->sin_family = AF_INET; 884 addr_in->sin_port = accepted->dport; 885 addr_in->sin_addr.s_addr = accepted->daddr; 886 *addrlen = sizeof(struct sockaddr_in); 887 } 888 889 return accepted->socket->id; 890 } 891 /*----------------------------------------------------------------------------*/ 892 int 893 mtcp_init_rss(mctx_t mctx, in_addr_t saddr_base, int num_addr, 894 in_addr_t daddr, in_addr_t dport) 895 { 896 mtcp_manager_t mtcp; 897 addr_pool_t ap; 898 899 mtcp = GetMTCPManager(mctx); 900 if (!mtcp) { 901 errno = EACCES; 902 return -1; 903 } 904 905 if (saddr_base == INADDR_ANY) { 906 int nif_out; 907 908 /* for the INADDR_ANY, find the output interface for the destination 909 and set the saddr_base as the ip address of the output interface */ 910 nif_out = GetOutputInterface(daddr); 911 saddr_base = g_config.mos->netdev_table->ent[nif_out]->ip_addr; 912 } 913 914 ap = CreateAddressPoolPerCore(mctx->cpu, num_cpus, 915 saddr_base, num_addr, daddr, dport); 916 if (!ap) { 917 errno = ENOMEM; 918 return -1; 919 } 920 921 mtcp->ap = ap; 922 923 return 0; 924 } 925 /*----------------------------------------------------------------------------*/ 926 int 927 eval_bpf_5tuple(struct sfbpf_program fcode, 928 in_addr_t saddr, in_port_t sport, 929 in_addr_t daddr, in_port_t dport) { 930 uint8_t buf[TOTAL_TCP_HEADER_LEN]; 931 struct ethhdr *ethh; 932 struct iphdr *iph; 933 struct tcphdr *tcph; 934 935 ethh = (struct ethhdr *)buf; 936 ethh->h_proto = htons(ETH_P_IP); 937 iph = (struct iphdr *)(ethh + 1); 938 iph->ihl = IP_HEADER_LEN >> 2; 939 iph->version = 4; 940 iph->tos = 0; 941 iph->tot_len = htons(IP_HEADER_LEN + TCP_HEADER_LEN); 942 iph->id = htons(0); 943 iph->protocol = IPPROTO_TCP; 944 iph->saddr = saddr; 945 iph->daddr = daddr; 946 iph->check = 0; 947 tcph = (struct tcphdr *)(iph + 1); 948 tcph->source = sport; 949 tcph->dest = dport; 950 951 return EVAL_BPFFILTER(fcode, (uint8_t *)iph - sizeof(struct ethhdr), 952 TOTAL_TCP_HEADER_LEN); 953 } 954 /*----------------------------------------------------------------------------*/ 955 int 956 mtcp_connect(mctx_t mctx, int sockid, 957 const struct sockaddr *addr, socklen_t addrlen) 958 { 959 mtcp_manager_t mtcp; 960 socket_map_t socket; 961 tcp_stream *cur_stream; 962 struct sockaddr_in *addr_in; 963 in_addr_t dip; 964 in_port_t dport; 965 int is_dyn_bound = FALSE; 966 int ret; 967 int cnt_match = 0; 968 struct mon_listener *walk; 969 struct sfbpf_program fcode; 970 971 cur_stream = NULL; 972 mtcp = GetMTCPManager(mctx); 973 if (!mtcp) { 974 errno = EACCES; 975 return -1; 976 } 977 978 if (sockid < 0 || sockid >= g_config.mos->max_concurrency) { 979 TRACE_API("Socket id %d out of range.\n", sockid); 980 errno = EBADF; 981 return -1; 982 } 983 984 if (mtcp->smap[sockid].socktype == MOS_SOCK_UNUSED) { 985 TRACE_API("Invalid socket id: %d\n", sockid); 986 errno = EBADF; 987 return -1; 988 } 989 990 if (mtcp->smap[sockid].socktype != MOS_SOCK_STREAM) { 991 TRACE_API("Not an end socket. id: %d\n", sockid); 992 errno = ENOTSOCK; 993 return -1; 994 } 995 996 if (!addr) { 997 TRACE_API("Socket %d: empty address!\n", sockid); 998 errno = EFAULT; 999 return -1; 1000 } 1001 1002 /* we only allow bind() for AF_INET address */ 1003 if (addr->sa_family != AF_INET || addrlen < sizeof(struct sockaddr_in)) { 1004 TRACE_API("Socket %d: invalid argument!\n", sockid); 1005 errno = EAFNOSUPPORT; 1006 return -1; 1007 } 1008 1009 socket = &mtcp->smap[sockid]; 1010 if (socket->stream) { 1011 TRACE_API("Socket %d: stream already exist!\n", sockid); 1012 if (socket->stream->state >= TCP_ST_ESTABLISHED) { 1013 errno = EISCONN; 1014 } else { 1015 errno = EALREADY; 1016 } 1017 return -1; 1018 } 1019 1020 addr_in = (struct sockaddr_in *)addr; 1021 dip = addr_in->sin_addr.s_addr; 1022 dport = addr_in->sin_port; 1023 1024 /* address binding */ 1025 if (socket->opts & MTCP_ADDR_BIND && 1026 socket->saddr.sin_port != INPORT_ANY && 1027 socket->saddr.sin_addr.s_addr != INADDR_ANY) { 1028 int rss_core; 1029 1030 rss_core = GetRSSCPUCore(socket->saddr.sin_addr.s_addr, dip, 1031 socket->saddr.sin_port, dport, num_queues); 1032 1033 if (rss_core != mctx->cpu) { 1034 errno = EINVAL; 1035 return -1; 1036 } 1037 } else { 1038 if (mtcp->ap) { 1039 ret = FetchAddress(mtcp->ap, 1040 mctx->cpu, num_queues, addr_in, &socket->saddr); 1041 } else { 1042 ret = FetchAddress(ap[GetOutputInterface(dip)], 1043 mctx->cpu, num_queues, addr_in, &socket->saddr); 1044 } 1045 if (ret < 0) { 1046 errno = EAGAIN; 1047 return -1; 1048 } 1049 socket->opts |= MTCP_ADDR_BIND; 1050 is_dyn_bound = TRUE; 1051 } 1052 1053 cnt_match = 0; 1054 if (mtcp->num_msp > 0) { 1055 TAILQ_FOREACH(walk, &mtcp->monitors, link) { 1056 fcode = walk->stream_syn_fcode; 1057 if (!(ISSET_BPFFILTER(fcode) && 1058 eval_bpf_5tuple(fcode, socket->saddr.sin_addr.s_addr, 1059 socket->saddr.sin_port, 1060 dip, dport) == 0)) { 1061 walk->is_stream_syn_filter_hit = 1; // set the 'filter hit' flag to 1 1062 cnt_match++; 1063 } 1064 } 1065 } 1066 1067 if (mtcp->num_msp > 0 && cnt_match > 0) { 1068 /* 150820 dhkim: XXX: embedded mode is not verified */ 1069 #if 1 1070 cur_stream = CreateClientTCPStream(mtcp, socket, 1071 STREAM_TYPE(MOS_SOCK_STREAM) | 1072 STREAM_TYPE(MOS_SOCK_MONITOR_STREAM_ACTIVE), 1073 socket->saddr.sin_addr.s_addr, 1074 socket->saddr.sin_port, dip, dport, NULL); 1075 #else 1076 cur_stream = CreateDualTCPStream(mtcp, socket, 1077 STREAM_TYPE(MOS_SOCK_STREAM) | 1078 STREAM_TYPE(MOS_SOCK_MONITOR_STREAM_ACTIVE), 1079 socket->saddr.sin_addr.s_addr, 1080 socket->saddr.sin_port, dip, dport, NULL); 1081 #endif 1082 } 1083 else 1084 cur_stream = CreateTCPStream(mtcp, socket, STREAM_TYPE(MOS_SOCK_STREAM), 1085 socket->saddr.sin_addr.s_addr, 1086 socket->saddr.sin_port, dip, dport, NULL); 1087 if (!cur_stream) { 1088 TRACE_ERROR("Socket %d: failed to create tcp_stream!\n", sockid); 1089 errno = ENOMEM; 1090 return -1; 1091 } 1092 1093 if (is_dyn_bound) 1094 cur_stream->is_bound_addr = TRUE; 1095 cur_stream->sndvar->cwnd = 1; 1096 cur_stream->sndvar->ssthresh = cur_stream->sndvar->mss * 10; 1097 cur_stream->side = MOS_SIDE_CLI; 1098 /* if monitor is enabled, update the pair stream side as well */ 1099 if (cur_stream->pair_stream) { 1100 cur_stream->pair_stream->side = MOS_SIDE_SVR; 1101 /* 1102 * if buffer management is off, then disable 1103 * monitoring tcp ring of server... 1104 * if there is even a single monitor asking for 1105 * buffer management, enable it (that's why the 1106 * need for the loop) 1107 */ 1108 cur_stream->pair_stream->buffer_mgmt = BUFMGMT_OFF; 1109 struct socket_map *walk; 1110 SOCKQ_FOREACH_START(walk, &cur_stream->msocks) { 1111 uint8_t bm = walk->monitor_stream->monitor_listener->server_buf_mgmt; 1112 if (bm > cur_stream->pair_stream->buffer_mgmt) { 1113 cur_stream->pair_stream->buffer_mgmt = bm; 1114 break; 1115 } 1116 } SOCKQ_FOREACH_END; 1117 } 1118 1119 cur_stream->state = TCP_ST_SYN_SENT; 1120 cur_stream->cb_events |= MOS_ON_TCP_STATE_CHANGE; 1121 1122 TRACE_STATE("Stream %d: TCP_ST_SYN_SENT\n", cur_stream->id); 1123 1124 SQ_LOCK(&mtcp->ctx->connect_lock); 1125 ret = StreamEnqueue(mtcp->connectq, cur_stream); 1126 SQ_UNLOCK(&mtcp->ctx->connect_lock); 1127 mtcp->wakeup_flag = TRUE; 1128 if (ret < 0) { 1129 TRACE_ERROR("Socket %d: failed to enqueue to conenct queue!\n", sockid); 1130 SQ_LOCK(&mtcp->ctx->destroyq_lock); 1131 StreamEnqueue(mtcp->destroyq, cur_stream); 1132 SQ_UNLOCK(&mtcp->ctx->destroyq_lock); 1133 errno = EAGAIN; 1134 return -1; 1135 } 1136 1137 /* if nonblocking socket, return EINPROGRESS */ 1138 if (socket->opts & MTCP_NONBLOCK) { 1139 errno = EINPROGRESS; 1140 return -1; 1141 1142 } else { 1143 while (1) { 1144 if (!cur_stream) { 1145 TRACE_ERROR("STREAM DESTROYED\n"); 1146 errno = ETIMEDOUT; 1147 return -1; 1148 } 1149 if (cur_stream->state > TCP_ST_ESTABLISHED) { 1150 TRACE_ERROR("Socket %d: weird state %s\n", 1151 sockid, TCPStateToString(cur_stream)); 1152 // TODO: how to handle this? 1153 errno = ENOSYS; 1154 return -1; 1155 } 1156 1157 if (cur_stream->state == TCP_ST_ESTABLISHED) { 1158 break; 1159 } 1160 usleep(1000); 1161 } 1162 } 1163 1164 return 0; 1165 } 1166 /*----------------------------------------------------------------------------*/ 1167 static inline int 1168 CloseStreamSocket(mctx_t mctx, int sockid) 1169 { 1170 mtcp_manager_t mtcp; 1171 tcp_stream *cur_stream; 1172 int ret; 1173 1174 mtcp = GetMTCPManager(mctx); 1175 if (!mtcp) { 1176 errno = EACCES; 1177 return -1; 1178 } 1179 1180 cur_stream = mtcp->smap[sockid].stream; 1181 if (!cur_stream) { 1182 TRACE_API("Socket %d: stream does not exist.\n", sockid); 1183 errno = ENOTCONN; 1184 return -1; 1185 } 1186 1187 if (cur_stream->closed) { 1188 TRACE_API("Socket %d (Stream %u): already closed stream\n", 1189 sockid, cur_stream->id); 1190 return 0; 1191 } 1192 cur_stream->closed = TRUE; 1193 1194 TRACE_API("Stream %d: closing the stream.\n", cur_stream->id); 1195 1196 /* 141029 dhkim: Check this! */ 1197 cur_stream->socket = NULL; 1198 1199 if (cur_stream->state == TCP_ST_CLOSED_RSVD) { 1200 TRACE_API("Stream %d at TCP_ST_CLOSED_RSVD. destroying the stream.\n", 1201 cur_stream->id); 1202 SQ_LOCK(&mtcp->ctx->destroyq_lock); 1203 StreamEnqueue(mtcp->destroyq, cur_stream); 1204 mtcp->wakeup_flag = TRUE; 1205 SQ_UNLOCK(&mtcp->ctx->destroyq_lock); 1206 return 0; 1207 1208 } else if (cur_stream->state == TCP_ST_SYN_SENT) { 1209 #if 1 1210 SQ_LOCK(&mtcp->ctx->destroyq_lock); 1211 StreamEnqueue(mtcp->destroyq, cur_stream); 1212 SQ_UNLOCK(&mtcp->ctx->destroyq_lock); 1213 mtcp->wakeup_flag = TRUE; 1214 #endif 1215 return -1; 1216 1217 } else if (cur_stream->state != TCP_ST_ESTABLISHED && 1218 cur_stream->state != TCP_ST_CLOSE_WAIT) { 1219 TRACE_API("Stream %d at state %s\n", 1220 cur_stream->id, TCPStateToString(cur_stream)); 1221 errno = EBADF; 1222 return -1; 1223 } 1224 1225 SQ_LOCK(&mtcp->ctx->close_lock); 1226 cur_stream->sndvar->on_closeq = TRUE; 1227 ret = StreamEnqueue(mtcp->closeq, cur_stream); 1228 mtcp->wakeup_flag = TRUE; 1229 SQ_UNLOCK(&mtcp->ctx->close_lock); 1230 1231 if (ret < 0) { 1232 TRACE_ERROR("(NEVER HAPPEN) Failed to enqueue the stream to close.\n"); 1233 errno = EAGAIN; 1234 return -1; 1235 } 1236 1237 return 0; 1238 } 1239 /*----------------------------------------------------------------------------*/ 1240 static inline int 1241 CloseListeningSocket(mctx_t mctx, int sockid) 1242 { 1243 mtcp_manager_t mtcp; 1244 struct tcp_listener *listener; 1245 1246 mtcp = GetMTCPManager(mctx); 1247 if (!mtcp) { 1248 errno = EACCES; 1249 return -1; 1250 } 1251 1252 listener = mtcp->smap[sockid].listener; 1253 if (!listener) { 1254 errno = EINVAL; 1255 return -1; 1256 } 1257 1258 if (listener->acceptq) { 1259 DestroyStreamQueue(listener->acceptq); 1260 listener->acceptq = NULL; 1261 } 1262 1263 pthread_mutex_lock(&listener->accept_lock); 1264 pthread_cond_signal(&listener->accept_cond); 1265 pthread_mutex_unlock(&listener->accept_lock); 1266 1267 pthread_cond_destroy(&listener->accept_cond); 1268 pthread_mutex_destroy(&listener->accept_lock); 1269 1270 free(listener); 1271 mtcp->smap[sockid].listener = NULL; 1272 1273 return 0; 1274 } 1275 /*----------------------------------------------------------------------------*/ 1276 int 1277 mtcp_close(mctx_t mctx, int sockid) 1278 { 1279 mtcp_manager_t mtcp; 1280 int ret; 1281 1282 mtcp = GetMTCPManager(mctx); 1283 if (!mtcp) { 1284 errno = EACCES; 1285 return -1; 1286 } 1287 1288 if (sockid < 0 || sockid >= g_config.mos->max_concurrency) { 1289 TRACE_API("Socket id %d out of range.\n", sockid); 1290 errno = EBADF; 1291 return -1; 1292 } 1293 1294 if (mtcp->smap[sockid].socktype == MOS_SOCK_UNUSED) { 1295 TRACE_API("Invalid socket id: %d\n", sockid); 1296 errno = EBADF; 1297 return -1; 1298 } 1299 1300 TRACE_API("Socket %d: mtcp_close called.\n", sockid); 1301 1302 switch (mtcp->smap[sockid].socktype) { 1303 case MOS_SOCK_STREAM: 1304 ret = CloseStreamSocket(mctx, sockid); 1305 break; 1306 1307 case MOS_SOCK_STREAM_LISTEN: 1308 ret = CloseListeningSocket(mctx, sockid); 1309 break; 1310 1311 case MOS_SOCK_EPOLL: 1312 ret = CloseEpollSocket(mctx, sockid); 1313 break; 1314 1315 case MOS_SOCK_PIPE: 1316 ret = PipeClose(mctx, sockid); 1317 break; 1318 1319 default: 1320 errno = EINVAL; 1321 ret = -1; 1322 break; 1323 } 1324 1325 FreeSocket(mctx, sockid, mtcp->smap[sockid].socktype); 1326 1327 return ret; 1328 } 1329 /*----------------------------------------------------------------------------*/ 1330 int 1331 mtcp_abort(mctx_t mctx, int sockid) 1332 { 1333 mtcp_manager_t mtcp; 1334 tcp_stream *cur_stream; 1335 int ret; 1336 1337 mtcp = GetMTCPManager(mctx); 1338 if (!mtcp) { 1339 errno = EACCES; 1340 return -1; 1341 } 1342 1343 if (sockid < 0 || sockid >= g_config.mos->max_concurrency) { 1344 TRACE_API("Socket id %d out of range.\n", sockid); 1345 errno = EBADF; 1346 return -1; 1347 } 1348 1349 if (mtcp->smap[sockid].socktype == MOS_SOCK_UNUSED) { 1350 TRACE_API("Invalid socket id: %d\n", sockid); 1351 errno = EBADF; 1352 return -1; 1353 } 1354 1355 if (mtcp->smap[sockid].socktype != MOS_SOCK_STREAM) { 1356 TRACE_API("Not an end socket. id: %d\n", sockid); 1357 errno = ENOTSOCK; 1358 return -1; 1359 } 1360 1361 cur_stream = mtcp->smap[sockid].stream; 1362 if (!cur_stream) { 1363 TRACE_API("Stream %d: does not exist.\n", sockid); 1364 errno = ENOTCONN; 1365 return -1; 1366 } 1367 1368 TRACE_API("Socket %d: mtcp_abort()\n", sockid); 1369 1370 FreeSocket(mctx, sockid, mtcp->smap[sockid].socktype); 1371 cur_stream->socket = NULL; 1372 1373 if (cur_stream->state == TCP_ST_CLOSED_RSVD) { 1374 TRACE_API("Stream %d: connection already reset.\n", sockid); 1375 return ERROR; 1376 1377 } else if (cur_stream->state == TCP_ST_SYN_SENT) { 1378 /* TODO: this should notify event failure to all 1379 previous read() or write() calls */ 1380 cur_stream->state = TCP_ST_CLOSED_RSVD; 1381 cur_stream->close_reason = TCP_ACTIVE_CLOSE; 1382 cur_stream->cb_events |= MOS_ON_TCP_STATE_CHANGE; 1383 SQ_LOCK(&mtcp->ctx->destroyq_lock); 1384 StreamEnqueue(mtcp->destroyq, cur_stream); 1385 SQ_UNLOCK(&mtcp->ctx->destroyq_lock); 1386 mtcp->wakeup_flag = TRUE; 1387 return 0; 1388 1389 } else if (cur_stream->state == TCP_ST_CLOSING || 1390 cur_stream->state == TCP_ST_LAST_ACK || 1391 cur_stream->state == TCP_ST_TIME_WAIT) { 1392 cur_stream->state = TCP_ST_CLOSED_RSVD; 1393 cur_stream->close_reason = TCP_ACTIVE_CLOSE; 1394 cur_stream->cb_events |= MOS_ON_TCP_STATE_CHANGE; 1395 SQ_LOCK(&mtcp->ctx->destroyq_lock); 1396 StreamEnqueue(mtcp->destroyq, cur_stream); 1397 SQ_UNLOCK(&mtcp->ctx->destroyq_lock); 1398 mtcp->wakeup_flag = TRUE; 1399 return 0; 1400 } 1401 1402 /* the stream structure will be destroyed after sending RST */ 1403 if (cur_stream->sndvar->on_resetq) { 1404 TRACE_ERROR("Stream %d: calling mtcp_abort() " 1405 "when in reset queue.\n", sockid); 1406 errno = ECONNRESET; 1407 return -1; 1408 } 1409 SQ_LOCK(&mtcp->ctx->reset_lock); 1410 cur_stream->sndvar->on_resetq = TRUE; 1411 ret = StreamEnqueue(mtcp->resetq, cur_stream); 1412 SQ_UNLOCK(&mtcp->ctx->reset_lock); 1413 mtcp->wakeup_flag = TRUE; 1414 1415 if (ret < 0) { 1416 TRACE_ERROR("(NEVER HAPPEN) Failed to enqueue the stream to close.\n"); 1417 errno = EAGAIN; 1418 return -1; 1419 } 1420 1421 return 0; 1422 } 1423 /*----------------------------------------------------------------------------*/ 1424 static inline int 1425 CopyToUser(mtcp_manager_t mtcp, tcp_stream *cur_stream, char *buf, int len) 1426 { 1427 struct tcp_recv_vars *rcvvar = cur_stream->rcvvar; 1428 int copylen; 1429 tcprb_t *rb = rcvvar->rcvbuf; 1430 if ((copylen = tcprb_ppeek(rb, (uint8_t *)buf, len, rb->pile)) <= 0) { 1431 errno = EAGAIN; 1432 return -1; 1433 } 1434 tcprb_setpile(rb, rb->pile + copylen); 1435 1436 rcvvar->rcv_wnd = rb->len - tcprb_cflen(rb); 1437 //printf("rcv_wnd: %d\n", rcvvar->rcv_wnd); 1438 1439 /* Advertise newly freed receive buffer */ 1440 if (cur_stream->need_wnd_adv) { 1441 if (rcvvar->rcv_wnd > cur_stream->sndvar->eff_mss) { 1442 if (!cur_stream->sndvar->on_ackq) { 1443 SQ_LOCK(&mtcp->ctx->ackq_lock); 1444 cur_stream->sndvar->on_ackq = TRUE; 1445 StreamEnqueue(mtcp->ackq, cur_stream); /* this always success */ 1446 SQ_UNLOCK(&mtcp->ctx->ackq_lock); 1447 cur_stream->need_wnd_adv = FALSE; 1448 mtcp->wakeup_flag = TRUE; 1449 } 1450 } 1451 } 1452 1453 return copylen; 1454 } 1455 /*----------------------------------------------------------------------------*/ 1456 ssize_t 1457 mtcp_read(mctx_t mctx, int sockid, char *buf, size_t len) 1458 { 1459 mtcp_manager_t mtcp; 1460 socket_map_t socket; 1461 tcp_stream *cur_stream; 1462 struct tcp_recv_vars *rcvvar; 1463 int event_remaining, merged_len; 1464 int ret; 1465 1466 mtcp = GetMTCPManager(mctx); 1467 if (!mtcp) { 1468 errno = EACCES; 1469 return -1; 1470 } 1471 1472 if (sockid < 0 || sockid >= g_config.mos->max_concurrency) { 1473 TRACE_API("Socket id %d out of range.\n", sockid); 1474 errno = EBADF; 1475 return -1; 1476 } 1477 1478 socket = &mtcp->smap[sockid]; 1479 if (socket->socktype == MOS_SOCK_UNUSED) { 1480 TRACE_API("Invalid socket id: %d\n", sockid); 1481 errno = EBADF; 1482 return -1; 1483 } 1484 1485 if (socket->socktype == MOS_SOCK_PIPE) { 1486 return PipeRead(mctx, sockid, buf, len); 1487 } 1488 1489 if (socket->socktype != MOS_SOCK_STREAM) { 1490 TRACE_API("Not an end socket. id: %d\n", sockid); 1491 errno = ENOTSOCK; 1492 return -1; 1493 } 1494 1495 /* stream should be in ESTABLISHED, FIN_WAIT_1, FIN_WAIT_2, CLOSE_WAIT */ 1496 cur_stream = socket->stream; 1497 if (!cur_stream || !cur_stream->rcvvar || !cur_stream->rcvvar->rcvbuf || 1498 !(cur_stream->state >= TCP_ST_ESTABLISHED && 1499 cur_stream->state <= TCP_ST_CLOSE_WAIT)) { 1500 errno = ENOTCONN; 1501 return -1; 1502 } 1503 1504 rcvvar = cur_stream->rcvvar; 1505 1506 merged_len = tcprb_cflen(rcvvar->rcvbuf); 1507 1508 /* if CLOSE_WAIT, return 0 if there is no payload */ 1509 if (cur_stream->state == TCP_ST_CLOSE_WAIT) { 1510 if (!rcvvar->rcvbuf) 1511 return 0; 1512 1513 if (merged_len == 0) 1514 return 0; 1515 } 1516 1517 /* return EAGAIN if no receive buffer */ 1518 if (socket->opts & MTCP_NONBLOCK) { 1519 if (!rcvvar->rcvbuf || merged_len == 0) { 1520 errno = EAGAIN; 1521 return -1; 1522 } 1523 } 1524 1525 SBUF_LOCK(&rcvvar->read_lock); 1526 1527 ret = CopyToUser(mtcp, cur_stream, buf, len); 1528 1529 merged_len = tcprb_cflen(rcvvar->rcvbuf); 1530 event_remaining = FALSE; 1531 /* if there are remaining payload, generate EPOLLIN */ 1532 /* (may due to insufficient user buffer) */ 1533 if (socket->epoll & MOS_EPOLLIN) { 1534 if (!(socket->epoll & MOS_EPOLLET) && merged_len > 0) { 1535 event_remaining = TRUE; 1536 } 1537 } 1538 /* if waiting for close, notify it if no remaining data */ 1539 if (cur_stream->state == TCP_ST_CLOSE_WAIT && 1540 merged_len == 0 && ret > 0) { 1541 event_remaining = TRUE; 1542 } 1543 1544 SBUF_UNLOCK(&rcvvar->read_lock); 1545 1546 if (event_remaining) { 1547 if (socket->epoll) { 1548 AddEpollEvent(mtcp->ep, 1549 USR_SHADOW_EVENT_QUEUE, socket, MOS_EPOLLIN); 1550 } 1551 } 1552 1553 TRACE_API("Stream %d: mtcp_read() returning %d\n", cur_stream->id, ret); 1554 return ret; 1555 } 1556 /*----------------------------------------------------------------------------*/ 1557 ssize_t 1558 mtcp_readv(mctx_t mctx, int sockid, const struct iovec *iov, int numIOV) 1559 { 1560 mtcp_manager_t mtcp; 1561 socket_map_t socket; 1562 tcp_stream *cur_stream; 1563 struct tcp_recv_vars *rcvvar; 1564 int ret, bytes_read, i; 1565 int event_remaining, merged_len; 1566 1567 mtcp = GetMTCPManager(mctx); 1568 if (!mtcp) { 1569 errno = EACCES; 1570 return -1; 1571 } 1572 1573 if (sockid < 0 || sockid >= g_config.mos->max_concurrency) { 1574 TRACE_API("Socket id %d out of range.\n", sockid); 1575 errno = EBADF; 1576 return -1; 1577 } 1578 1579 socket = &mtcp->smap[sockid]; 1580 if (socket->socktype == MOS_SOCK_UNUSED) { 1581 TRACE_API("Invalid socket id: %d\n", sockid); 1582 errno = EBADF; 1583 return -1; 1584 } 1585 1586 if (socket->socktype != MOS_SOCK_STREAM) { 1587 TRACE_API("Not an end socket. id: %d\n", sockid); 1588 errno = ENOTSOCK; 1589 return -1; 1590 } 1591 1592 /* stream should be in ESTABLISHED, FIN_WAIT_1, FIN_WAIT_2, CLOSE_WAIT */ 1593 cur_stream = socket->stream; 1594 if (!cur_stream || 1595 !(cur_stream->state >= TCP_ST_ESTABLISHED && 1596 cur_stream->state <= TCP_ST_CLOSE_WAIT)) { 1597 errno = ENOTCONN; 1598 return -1; 1599 } 1600 1601 rcvvar = cur_stream->rcvvar; 1602 1603 merged_len = tcprb_cflen(rcvvar->rcvbuf); 1604 1605 /* if CLOSE_WAIT, return 0 if there is no payload */ 1606 if (cur_stream->state == TCP_ST_CLOSE_WAIT) { 1607 if (!rcvvar->rcvbuf) 1608 return 0; 1609 1610 if (merged_len == 0) 1611 return 0; 1612 } 1613 1614 /* return EAGAIN if no receive buffer */ 1615 if (socket->opts & MTCP_NONBLOCK) { 1616 if (!rcvvar->rcvbuf || merged_len == 0) { 1617 errno = EAGAIN; 1618 return -1; 1619 } 1620 } 1621 1622 SBUF_LOCK(&rcvvar->read_lock); 1623 1624 /* read and store the contents to the vectored buffers */ 1625 bytes_read = 0; 1626 for (i = 0; i < numIOV; i++) { 1627 if (iov[i].iov_len <= 0) 1628 continue; 1629 1630 ret = CopyToUser(mtcp, cur_stream, iov[i].iov_base, iov[i].iov_len); 1631 if (ret <= 0) 1632 break; 1633 1634 bytes_read += ret; 1635 1636 if (ret < iov[i].iov_len) 1637 break; 1638 } 1639 1640 merged_len = tcprb_cflen(rcvvar->rcvbuf); 1641 1642 event_remaining = FALSE; 1643 /* if there are remaining payload, generate read event */ 1644 /* (may due to insufficient user buffer) */ 1645 if (socket->epoll & MOS_EPOLLIN) { 1646 if (!(socket->epoll & MOS_EPOLLET) && merged_len > 0) { 1647 event_remaining = TRUE; 1648 } 1649 } 1650 /* if waiting for close, notify it if no remaining data */ 1651 if (cur_stream->state == TCP_ST_CLOSE_WAIT && 1652 merged_len == 0 && bytes_read > 0) { 1653 event_remaining = TRUE; 1654 } 1655 1656 SBUF_UNLOCK(&rcvvar->read_lock); 1657 1658 if(event_remaining) { 1659 if (socket->epoll & MOS_EPOLLIN && !(socket->epoll & MOS_EPOLLET)) { 1660 AddEpollEvent(mtcp->ep, 1661 USR_SHADOW_EVENT_QUEUE, socket, MOS_EPOLLIN); 1662 } 1663 } 1664 1665 TRACE_API("Stream %d: mtcp_readv() returning %d\n", 1666 cur_stream->id, bytes_read); 1667 return bytes_read; 1668 } 1669 /*----------------------------------------------------------------------------*/ 1670 static inline int 1671 CopyFromUser(mtcp_manager_t mtcp, tcp_stream *cur_stream, const char *buf, int len) 1672 { 1673 struct tcp_send_vars *sndvar = cur_stream->sndvar; 1674 int sndlen; 1675 int ret; 1676 1677 sndlen = MIN((int)sndvar->snd_wnd, len); 1678 if (sndlen <= 0) { 1679 errno = EAGAIN; 1680 return -1; 1681 } 1682 1683 /* allocate send buffer if not exist */ 1684 if (!sndvar->sndbuf) { 1685 sndvar->sndbuf = SBInit(mtcp->rbm_snd, sndvar->iss + 1); 1686 if (!sndvar->sndbuf) { 1687 cur_stream->close_reason = TCP_NO_MEM; 1688 /* notification may not required due to -1 return */ 1689 errno = ENOMEM; 1690 return -1; 1691 } 1692 } 1693 1694 ret = SBPut(mtcp->rbm_snd, sndvar->sndbuf, buf, sndlen); 1695 assert(ret == sndlen); 1696 sndvar->snd_wnd = sndvar->sndbuf->size - sndvar->sndbuf->len; 1697 if (ret <= 0) { 1698 TRACE_ERROR("SBPut failed. reason: %d (sndlen: %u, len: %u\n", 1699 ret, sndlen, sndvar->sndbuf->len); 1700 errno = EAGAIN; 1701 return -1; 1702 } 1703 1704 if (sndvar->snd_wnd <= 0) { 1705 TRACE_SNDBUF("%u Sending buffer became full!! snd_wnd: %u\n", 1706 cur_stream->id, sndvar->snd_wnd); 1707 } 1708 1709 return ret; 1710 } 1711 /*----------------------------------------------------------------------------*/ 1712 ssize_t 1713 mtcp_write(mctx_t mctx, int sockid, const char *buf, size_t len) 1714 { 1715 mtcp_manager_t mtcp; 1716 socket_map_t socket; 1717 tcp_stream *cur_stream; 1718 struct tcp_send_vars *sndvar; 1719 int ret; 1720 1721 mtcp = GetMTCPManager(mctx); 1722 if (!mtcp) { 1723 errno = EACCES; 1724 return -1; 1725 } 1726 1727 if (sockid < 0 || sockid >= g_config.mos->max_concurrency) { 1728 TRACE_API("Socket id %d out of range.\n", sockid); 1729 errno = EBADF; 1730 return -1; 1731 } 1732 1733 socket = &mtcp->smap[sockid]; 1734 if (socket->socktype == MOS_SOCK_UNUSED) { 1735 TRACE_API("Invalid socket id: %d\n", sockid); 1736 errno = EBADF; 1737 return -1; 1738 } 1739 1740 if (socket->socktype == MOS_SOCK_PIPE) { 1741 return PipeWrite(mctx, sockid, buf, len); 1742 } 1743 1744 if (socket->socktype != MOS_SOCK_STREAM) { 1745 TRACE_API("Not an end socket. id: %d\n", sockid); 1746 errno = ENOTSOCK; 1747 return -1; 1748 } 1749 1750 cur_stream = socket->stream; 1751 if (!cur_stream || 1752 !(cur_stream->state == TCP_ST_ESTABLISHED || 1753 cur_stream->state == TCP_ST_CLOSE_WAIT)) { 1754 errno = ENOTCONN; 1755 return -1; 1756 } 1757 1758 if (len <= 0) { 1759 if (socket->opts & MTCP_NONBLOCK) { 1760 errno = EAGAIN; 1761 return -1; 1762 } else { 1763 return 0; 1764 } 1765 } 1766 1767 sndvar = cur_stream->sndvar; 1768 1769 SBUF_LOCK(&sndvar->write_lock); 1770 ret = CopyFromUser(mtcp, cur_stream, buf, len); 1771 1772 SBUF_UNLOCK(&sndvar->write_lock); 1773 1774 if (ret > 0 && !(sndvar->on_sendq || sndvar->on_send_list)) { 1775 SQ_LOCK(&mtcp->ctx->sendq_lock); 1776 sndvar->on_sendq = TRUE; 1777 StreamEnqueue(mtcp->sendq, cur_stream); /* this always success */ 1778 SQ_UNLOCK(&mtcp->ctx->sendq_lock); 1779 mtcp->wakeup_flag = TRUE; 1780 } 1781 1782 if (ret == 0 && (socket->opts & MTCP_NONBLOCK)) { 1783 ret = -1; 1784 errno = EAGAIN; 1785 } 1786 1787 /* if there are remaining sending buffer, generate write event */ 1788 if (sndvar->snd_wnd > 0) { 1789 if (socket->epoll & MOS_EPOLLOUT && !(socket->epoll & MOS_EPOLLET)) { 1790 AddEpollEvent(mtcp->ep, 1791 USR_SHADOW_EVENT_QUEUE, socket, MOS_EPOLLOUT); 1792 } 1793 } 1794 1795 TRACE_API("Stream %d: mtcp_write() returning %d\n", cur_stream->id, ret); 1796 return ret; 1797 } 1798 /*----------------------------------------------------------------------------*/ 1799 ssize_t 1800 mtcp_writev(mctx_t mctx, int sockid, const struct iovec *iov, int numIOV) 1801 { 1802 mtcp_manager_t mtcp; 1803 socket_map_t socket; 1804 tcp_stream *cur_stream; 1805 struct tcp_send_vars *sndvar; 1806 int ret, to_write, i; 1807 1808 mtcp = GetMTCPManager(mctx); 1809 if (!mtcp) { 1810 errno = EACCES; 1811 return -1; 1812 } 1813 1814 if (sockid < 0 || sockid >= g_config.mos->max_concurrency) { 1815 TRACE_API("Socket id %d out of range.\n", sockid); 1816 errno = EBADF; 1817 return -1; 1818 } 1819 1820 socket = &mtcp->smap[sockid]; 1821 if (socket->socktype == MOS_SOCK_UNUSED) { 1822 TRACE_API("Invalid socket id: %d\n", sockid); 1823 errno = EBADF; 1824 return -1; 1825 } 1826 1827 if (socket->socktype != MOS_SOCK_STREAM) { 1828 TRACE_API("Not an end socket. id: %d\n", sockid); 1829 errno = ENOTSOCK; 1830 return -1; 1831 } 1832 1833 cur_stream = socket->stream; 1834 if (!cur_stream || 1835 !(cur_stream->state == TCP_ST_ESTABLISHED || 1836 cur_stream->state == TCP_ST_CLOSE_WAIT)) { 1837 errno = ENOTCONN; 1838 return -1; 1839 } 1840 1841 sndvar = cur_stream->sndvar; 1842 SBUF_LOCK(&sndvar->write_lock); 1843 1844 /* write from the vectored buffers */ 1845 to_write = 0; 1846 for (i = 0; i < numIOV; i++) { 1847 if (iov[i].iov_len <= 0) 1848 continue; 1849 1850 ret = CopyFromUser(mtcp, cur_stream, iov[i].iov_base, iov[i].iov_len); 1851 if (ret <= 0) 1852 break; 1853 1854 to_write += ret; 1855 1856 if (ret < iov[i].iov_len) 1857 break; 1858 } 1859 SBUF_UNLOCK(&sndvar->write_lock); 1860 1861 if (to_write > 0 && !(sndvar->on_sendq || sndvar->on_send_list)) { 1862 SQ_LOCK(&mtcp->ctx->sendq_lock); 1863 sndvar->on_sendq = TRUE; 1864 StreamEnqueue(mtcp->sendq, cur_stream); /* this always success */ 1865 SQ_UNLOCK(&mtcp->ctx->sendq_lock); 1866 mtcp->wakeup_flag = TRUE; 1867 } 1868 1869 if (to_write == 0 && (socket->opts & MTCP_NONBLOCK)) { 1870 to_write = -1; 1871 errno = EAGAIN; 1872 } 1873 1874 /* if there are remaining sending buffer, generate write event */ 1875 if (sndvar->snd_wnd > 0) { 1876 if (socket->epoll & MOS_EPOLLOUT && !(socket->epoll & MOS_EPOLLET)) { 1877 AddEpollEvent(mtcp->ep, 1878 USR_SHADOW_EVENT_QUEUE, socket, MOS_EPOLLOUT); 1879 } 1880 } 1881 1882 TRACE_API("Stream %d: mtcp_writev() returning %d\n", 1883 cur_stream->id, to_write); 1884 return to_write; 1885 } 1886 /*----------------------------------------------------------------------------*/ 1887 uint32_t 1888 mtcp_get_connection_cnt(mctx_t mctx) 1889 { 1890 mtcp_manager_t mtcp; 1891 mtcp = GetMTCPManager(mctx); 1892 if (!mtcp) { 1893 errno = EACCES; 1894 return -1; 1895 } 1896 1897 if (mtcp->num_msp > 0) 1898 return mtcp->flow_cnt / 2; 1899 else 1900 return mtcp->flow_cnt; 1901 } 1902 /*----------------------------------------------------------------------------*/ 1903