1 #include <sys/queue.h> 2 #include <sys/ioctl.h> 3 #include <limits.h> 4 #include <unistd.h> 5 #include <assert.h> 6 #include <string.h> 7 8 #ifdef DARWIN 9 #include <netinet/tcp.h> 10 #include <netinet/ip.h> 11 #include <netinet/if_ether.h> 12 #endif 13 14 #include "mtcp.h" 15 #include "mtcp_api.h" 16 #include "tcp_in.h" 17 #include "tcp_stream.h" 18 #include "tcp_out.h" 19 #include "ip_out.h" 20 #include "eventpoll.h" 21 #include "pipe.h" 22 #include "fhash.h" 23 #include "addr_pool.h" 24 #include "util.h" 25 #include "config.h" 26 #include "debug.h" 27 #include "eventpoll.h" 28 #include "mos_api.h" 29 #include "tcp_rb.h" 30 31 #define MAX(a, b) ((a)>(b)?(a):(b)) 32 #define MIN(a, b) ((a)<(b)?(a):(b)) 33 34 /*----------------------------------------------------------------------------*/ 35 /** Stop monitoring the socket! (function prototype) 36 * @param [in] mctx: mtcp context 37 * @param [in] sock: monitoring stream socket id 38 * @param [in] side: side of monitoring (client side, server side or both) 39 * 40 * This function is now DEPRECATED and is only used within mOS core... 41 */ 42 int 43 mtcp_cb_stop(mctx_t mctx, int sock, int side); 44 /*----------------------------------------------------------------------------*/ 45 /** Reset the connection (send RST packets to both sides) 46 * (We need to decide the API for this.) 47 */ 48 //int 49 //mtcp_cb_reset(mctx_t mctx, int sock, int side); 50 /*----------------------------------------------------------------------------*/ 51 inline mtcp_manager_t 52 GetMTCPManager(mctx_t mctx) 53 { 54 if (!mctx) { 55 errno = EACCES; 56 return NULL; 57 } 58 59 if (mctx->cpu < 0 || mctx->cpu >= num_cpus) { 60 errno = EINVAL; 61 return NULL; 62 } 63 64 if (!g_mtcp[mctx->cpu] || g_mtcp[mctx->cpu]->ctx->done || g_mtcp[mctx->cpu]->ctx->exit) { 65 errno = EPERM; 66 return NULL; 67 } 68 69 return g_mtcp[mctx->cpu]; 70 } 71 /*----------------------------------------------------------------------------*/ 72 static inline int 73 GetSocketError(socket_map_t socket, void *optval, socklen_t *optlen) 74 { 75 tcp_stream *cur_stream; 76 77 if (!socket->stream) { 78 errno = EBADF; 79 return -1; 80 } 81 82 cur_stream = socket->stream; 83 if (cur_stream->state == TCP_ST_CLOSED_RSVD) { 84 if (cur_stream->close_reason == TCP_TIMEDOUT || 85 cur_stream->close_reason == TCP_CONN_FAIL || 86 cur_stream->close_reason == TCP_CONN_LOST) { 87 *(int *)optval = ETIMEDOUT; 88 *optlen = sizeof(int); 89 90 return 0; 91 } 92 } 93 94 if (cur_stream->state == TCP_ST_CLOSE_WAIT || 95 cur_stream->state == TCP_ST_CLOSED_RSVD) { 96 if (cur_stream->close_reason == TCP_RESET) { 97 *(int *)optval = ECONNRESET; 98 *optlen = sizeof(int); 99 100 return 0; 101 } 102 } 103 104 if (cur_stream->state == TCP_ST_SYN_SENT && 105 errno == EINPROGRESS) { 106 *(int *)optval = errno; 107 *optlen = sizeof(int); 108 109 return -1; 110 } 111 112 /* 113 * `base case`: If socket sees no so_error, then 114 * this also means close_reason will always be 115 * TCP_NOT_CLOSED. 116 */ 117 if (cur_stream->close_reason == TCP_NOT_CLOSED) { 118 *(int *)optval = 0; 119 *optlen = sizeof(int); 120 121 return 0; 122 } 123 124 errno = ENOSYS; 125 return -1; 126 } 127 /*----------------------------------------------------------------------------*/ 128 int 129 mtcp_getsockname(mctx_t mctx, int sockid, struct sockaddr *addr, 130 socklen_t *addrlen) 131 { 132 mtcp_manager_t mtcp; 133 socket_map_t socket; 134 135 mtcp = GetMTCPManager(mctx); 136 if (!mtcp) { 137 return -1; 138 } 139 140 if (sockid < 0 || sockid >= g_config.mos->max_concurrency) { 141 TRACE_API("Socket id %d out of range.\n", sockid); 142 errno = EBADF; 143 return -1; 144 } 145 146 socket = &mtcp->smap[sockid]; 147 if (socket->socktype == MOS_SOCK_UNUSED) { 148 TRACE_API("Invalid socket id: %d\n", sockid); 149 errno = EBADF; 150 return -1; 151 } 152 153 if (*addrlen <= 0) { 154 TRACE_API("Invalid addrlen: %d\n", *addrlen); 155 errno = EINVAL; 156 return -1; 157 } 158 159 if (socket->socktype != MOS_SOCK_STREAM_LISTEN && 160 socket->socktype != MOS_SOCK_STREAM) { 161 TRACE_API("Invalid socket id: %d\n", sockid); 162 errno = ENOTSOCK; 163 return -1; 164 } 165 166 *(struct sockaddr_in *)addr = socket->saddr; 167 *addrlen = sizeof(socket->saddr); 168 169 return 0; 170 } 171 /*----------------------------------------------------------------------------*/ 172 int 173 mtcp_getsockopt(mctx_t mctx, int sockid, int level, 174 int optname, void *optval, socklen_t *optlen) 175 { 176 mtcp_manager_t mtcp; 177 socket_map_t socket; 178 179 mtcp = GetMTCPManager(mctx); 180 if (!mtcp) { 181 errno = EACCES; 182 return -1; 183 } 184 185 if (sockid < 0 || sockid >= g_config.mos->max_concurrency) { 186 TRACE_API("Socket id %d out of range.\n", sockid); 187 errno = EBADF; 188 return -1; 189 } 190 191 switch (level) { 192 case SOL_SOCKET: 193 socket = &mtcp->smap[sockid]; 194 if (socket->socktype == MOS_SOCK_UNUSED) { 195 TRACE_API("Invalid socket id: %d\n", sockid); 196 errno = EBADF; 197 return -1; 198 } 199 200 if (socket->socktype != MOS_SOCK_STREAM_LISTEN && 201 socket->socktype != MOS_SOCK_STREAM) { 202 TRACE_API("Invalid socket id: %d\n", sockid); 203 errno = ENOTSOCK; 204 return -1; 205 } 206 207 if (optname == SO_ERROR) { 208 if (socket->socktype == MOS_SOCK_STREAM) { 209 return GetSocketError(socket, optval, optlen); 210 } 211 } 212 break; 213 case SOL_MONSOCKET: 214 /* check if the calling thread is in MOS context */ 215 if (mtcp->ctx->thread != pthread_self()) { 216 errno = EPERM; 217 return -1; 218 } 219 /* 220 * All options will only work for active 221 * monitor stream sockets 222 */ 223 socket = &mtcp->msmap[sockid]; 224 if (socket->socktype != MOS_SOCK_MONITOR_STREAM_ACTIVE) { 225 TRACE_API("Invalid socket id: %d\n", sockid); 226 errno = ENOTSOCK; 227 return -1; 228 } 229 230 switch (optname) { 231 case MOS_FRAGINFO_CLIBUF: 232 return GetFragInfo(socket, MOS_SIDE_CLI, optval, optlen); 233 case MOS_FRAGINFO_SVRBUF: 234 return GetFragInfo(socket, MOS_SIDE_SVR, optval, optlen); 235 case MOS_INFO_CLIBUF: 236 return GetBufInfo(socket, MOS_SIDE_CLI, optval, optlen); 237 case MOS_INFO_SVRBUF: 238 return GetBufInfo(socket, MOS_SIDE_SVR, optval, optlen); 239 case MOS_TCP_STATE_CLI: 240 return GetTCPState(socket->monitor_stream->stream, MOS_SIDE_CLI, 241 optval, optlen); 242 case MOS_TCP_STATE_SVR: 243 return GetTCPState(socket->monitor_stream->stream, MOS_SIDE_SVR, 244 optval, optlen); 245 case MOS_TIMESTAMP: 246 return GetLastTimestamp(socket->monitor_stream->stream, 247 (uint32_t *)optval, 248 optlen); 249 default: 250 TRACE_API("can't recognize the optname=%d\n", optname); 251 assert(0); 252 } 253 break; 254 } 255 errno = ENOSYS; 256 return -1; 257 } 258 /*----------------------------------------------------------------------------*/ 259 int 260 mtcp_setsockopt(mctx_t mctx, int sockid, int level, 261 int optname, const void *optval, socklen_t optlen) 262 { 263 mtcp_manager_t mtcp; 264 socket_map_t socket; 265 tcprb_t *rb; 266 267 mtcp = GetMTCPManager(mctx); 268 if (!mtcp) { 269 errno = EACCES; 270 return -1; 271 } 272 273 if (sockid < 0 || sockid >= g_config.mos->max_concurrency) { 274 TRACE_API("Socket id %d out of range.\n", sockid); 275 errno = EBADF; 276 return -1; 277 } 278 279 switch (level) { 280 case SOL_SOCKET: 281 socket = &mtcp->smap[sockid]; 282 if (socket->socktype == MOS_SOCK_UNUSED) { 283 TRACE_API("Invalid socket id: %d\n", sockid); 284 errno = EBADF; 285 return -1; 286 } 287 288 if (socket->socktype != MOS_SOCK_STREAM_LISTEN && 289 socket->socktype != MOS_SOCK_STREAM) { 290 TRACE_API("Invalid socket id: %d\n", sockid); 291 errno = ENOTSOCK; 292 return -1; 293 } 294 break; 295 case SOL_MONSOCKET: 296 socket = &mtcp->msmap[sockid]; 297 /* 298 * checking of calling thread to be in MOS context is 299 * disabled since both optnames can be called from 300 * `application' context (on passive sockets) 301 */ 302 /* 303 * if (mtcp->ctx->thread != pthread_self()) 304 * return -1; 305 */ 306 307 switch (optname) { 308 case MOS_CLIOVERLAP: 309 rb = (socket->monitor_stream->stream->side == MOS_SIDE_CLI) ? 310 socket->monitor_stream->stream->rcvvar->rcvbuf : 311 socket->monitor_stream->stream->pair_stream->rcvvar->rcvbuf; 312 if (rb == NULL) { 313 errno = EFAULT; 314 return -1; 315 } 316 if (tcprb_setpolicy(rb, *(uint8_t *)optval) < 0) { 317 errno = EINVAL; 318 return -1; 319 } else 320 return 0; 321 break; 322 case MOS_SVROVERLAP: 323 rb = (socket->monitor_stream->stream->side == MOS_SIDE_SVR) ? 324 socket->monitor_stream->stream->rcvvar->rcvbuf : 325 socket->monitor_stream->stream->pair_stream->rcvvar->rcvbuf; 326 if (rb == NULL) { 327 errno = EFAULT; 328 return -1; 329 } 330 if (tcprb_setpolicy(rb, *(uint8_t *)optval) < 0) { 331 errno = EINVAL; 332 return -1; 333 } else 334 return 0; 335 break; 336 case MOS_CLIBUF: 337 #if 0 338 if (socket->socktype != MOS_SOCK_MONITOR_STREAM_ACTIVE) { 339 errno = EBADF; 340 return -1; 341 } 342 #endif 343 #ifdef DISABLE_DYN_RESIZE 344 if (*(int *)optval != 0) 345 return -1; 346 if (socket->socktype == MOS_SOCK_MONITOR_STREAM_ACTIVE) { 347 rb = (socket->monitor_stream->stream->side == MOS_SIDE_CLI) ? 348 socket->monitor_stream->stream->rcvvar->rcvbuf : 349 socket->monitor_stream->stream->pair_stream->rcvvar->rcvbuf; 350 if (rb) { 351 tcprb_resize_meta(rb, 0); 352 tcprb_resize(rb, 0); 353 } 354 } 355 return DisableBuf(socket, MOS_SIDE_CLI); 356 #else 357 rb = (socket->monitor_stream->stream->side == MOS_SIDE_CLI) ? 358 socket->monitor_stream->stream->rcvvar->rcvbuf : 359 socket->monitor_stream->stream->pair_stream->rcvvar->rcvbuf; 360 if (tcprb_resize_meta(rb, *(int *)optval) < 0) 361 return -1; 362 return tcprb_resize(rb, 363 (((int)rb->metalen - 1) / UNITBUFSIZE + 1) * UNITBUFSIZE); 364 #endif 365 case MOS_SVRBUF: 366 #if 0 367 if (socket->socktype != MOS_SOCK_MONITOR_STREAM_ACTIVE) { 368 errno = EBADF; 369 return -1; 370 } 371 #endif 372 #ifdef DISABLE_DYN_RESIZE 373 if (*(int *)optval != 0) 374 return -1; 375 if (socket->socktype == MOS_SOCK_MONITOR_STREAM_ACTIVE) { 376 rb = (socket->monitor_stream->stream->side == MOS_SIDE_SVR) ? 377 socket->monitor_stream->stream->rcvvar->rcvbuf : 378 socket->monitor_stream->stream->pair_stream->rcvvar->rcvbuf; 379 if (rb) { 380 tcprb_resize_meta(rb, 0); 381 tcprb_resize(rb, 0); 382 } 383 } 384 return DisableBuf(socket, MOS_SIDE_SVR); 385 #else 386 rb = (socket->monitor_stream->stream->side == MOS_SIDE_SVR) ? 387 socket->monitor_stream->stream->rcvvar->rcvbuf : 388 socket->monitor_stream->stream->pair_stream->rcvvar->rcvbuf; 389 if (tcprb_resize_meta(rb, *(int *)optval) < 0) 390 return -1; 391 return tcprb_resize(rb, 392 (((int)rb->metalen - 1) / UNITBUFSIZE + 1) * UNITBUFSIZE); 393 #endif 394 case MOS_FRAG_CLIBUF: 395 #if 0 396 if (socket->socktype != MOS_SOCK_MONITOR_STREAM_ACTIVE) { 397 errno = EBADF; 398 return -1; 399 } 400 #endif 401 #ifdef DISABLE_DYN_RESIZE 402 if (*(int *)optval != 0) 403 return -1; 404 if (socket->socktype == MOS_SOCK_MONITOR_STREAM_ACTIVE) { 405 rb = (socket->monitor_stream->stream->side == MOS_SIDE_CLI) ? 406 socket->monitor_stream->stream->rcvvar->rcvbuf : 407 socket->monitor_stream->stream->pair_stream->rcvvar->rcvbuf; 408 if (rb) 409 tcprb_resize(rb, 0); 410 } 411 return 0; 412 #else 413 rb = (socket->monitor_stream->stream->side == MOS_SIDE_CLI) ? 414 socket->monitor_stream->stream->rcvvar->rcvbuf : 415 socket->monitor_stream->stream->pair_stream->rcvvar->rcvbuf; 416 if (rb->len == 0) 417 return tcprb_resize_meta(rb, *(int *)optval); 418 else 419 return -1; 420 #endif 421 case MOS_FRAG_SVRBUF: 422 #if 0 423 if (socket->socktype != MOS_SOCK_MONITOR_STREAM_ACTIVE) { 424 errno = EBADF; 425 return -1; 426 } 427 #endif 428 #ifdef DISABLE_DYN_RESIZE 429 if (*(int *)optval != 0) 430 return -1; 431 if (socket->socktype == MOS_SOCK_MONITOR_STREAM_ACTIVE) { 432 rb = (socket->monitor_stream->stream->side == MOS_SIDE_SVR) ? 433 socket->monitor_stream->stream->rcvvar->rcvbuf : 434 socket->monitor_stream->stream->pair_stream->rcvvar->rcvbuf; 435 if (rb) 436 tcprb_resize(rb, 0); 437 } 438 return 0; 439 #else 440 rb = (socket->monitor_stream->stream->side == MOS_SIDE_SVR) ? 441 socket->monitor_stream->stream->rcvvar->rcvbuf : 442 socket->monitor_stream->stream->pair_stream->rcvvar->rcvbuf; 443 if (rb->len == 0) 444 return tcprb_resize_meta(rb, *(int *)optval); 445 else 446 return -1; 447 #endif 448 case MOS_SEQ_REMAP: 449 break; 450 case MOS_STOP_MON: 451 return mtcp_cb_stop(mctx, sockid, *(int *)optval); 452 default: 453 TRACE_API("invalid optname=%d\n", optname); 454 assert(0); 455 } 456 break; 457 } 458 459 errno = ENOSYS; 460 return -1; 461 } 462 /*----------------------------------------------------------------------------*/ 463 int 464 mtcp_setsock_nonblock(mctx_t mctx, int sockid) 465 { 466 mtcp_manager_t mtcp; 467 468 mtcp = GetMTCPManager(mctx); 469 if (!mtcp) { 470 errno = EACCES; 471 return -1; 472 } 473 474 if (sockid < 0 || sockid >= g_config.mos->max_concurrency) { 475 TRACE_API("Socket id %d out of range.\n", sockid); 476 errno = EBADF; 477 return -1; 478 } 479 480 if (mtcp->smap[sockid].socktype == MOS_SOCK_UNUSED) { 481 TRACE_API("Invalid socket id: %d\n", sockid); 482 errno = EBADF; 483 return -1; 484 } 485 486 mtcp->smap[sockid].opts |= MTCP_NONBLOCK; 487 488 return 0; 489 } 490 /*----------------------------------------------------------------------------*/ 491 int 492 mtcp_ioctl(mctx_t mctx, int sockid, int request, void *argp) 493 { 494 mtcp_manager_t mtcp; 495 socket_map_t socket; 496 497 mtcp = GetMTCPManager(mctx); 498 if (!mtcp) { 499 errno = EACCES; 500 return -1; 501 } 502 503 if (sockid < 0 || sockid >= g_config.mos->max_concurrency) { 504 TRACE_API("Socket id %d out of range.\n", sockid); 505 errno = EBADF; 506 return -1; 507 } 508 509 /* only support stream socket */ 510 socket = &mtcp->smap[sockid]; 511 512 if (socket->socktype != MOS_SOCK_STREAM_LISTEN && 513 socket->socktype != MOS_SOCK_STREAM) { 514 TRACE_API("Invalid socket id: %d\n", sockid); 515 errno = EBADF; 516 return -1; 517 } 518 519 if (!argp) { 520 errno = EFAULT; 521 return -1; 522 } 523 524 if (request == FIONREAD) { 525 tcp_stream *cur_stream; 526 tcprb_t *rbuf; 527 528 cur_stream = socket->stream; 529 if (!cur_stream) { 530 errno = EBADF; 531 return -1; 532 } 533 534 rbuf = cur_stream->rcvvar->rcvbuf; 535 *(int *)argp = (rbuf) ? tcprb_cflen(rbuf) : 0; 536 537 } else if (request == FIONBIO) { 538 /* 539 * sockets can only be set to blocking/non-blocking 540 * modes during initialization 541 */ 542 if ((*(int *)argp)) 543 mtcp->smap[sockid].opts |= MTCP_NONBLOCK; 544 else 545 mtcp->smap[sockid].opts &= ~MTCP_NONBLOCK; 546 } else { 547 errno = EINVAL; 548 return -1; 549 } 550 551 return 0; 552 } 553 /*----------------------------------------------------------------------------*/ 554 static int 555 mtcp_monitor(mctx_t mctx, socket_map_t sock) 556 { 557 mtcp_manager_t mtcp; 558 struct mon_listener *monitor; 559 int sockid = sock->id; 560 561 mtcp = GetMTCPManager(mctx); 562 if (!mtcp) { 563 errno = EACCES; 564 return -1; 565 } 566 567 if (sockid < 0 || sockid >= g_config.mos->max_concurrency) { 568 TRACE_API("Socket id %d out of range.\n", sockid); 569 errno = EBADF; 570 return -1; 571 } 572 573 if (mtcp->msmap[sockid].socktype == MOS_SOCK_UNUSED) { 574 TRACE_API("Invalid socket id: %d\n", sockid); 575 errno = EBADF; 576 return -1; 577 } 578 579 if (!(mtcp->msmap[sockid].socktype == MOS_SOCK_MONITOR_STREAM || 580 mtcp->msmap[sockid].socktype == MOS_SOCK_MONITOR_RAW)) { 581 TRACE_API("Not a monitor socket. id: %d\n", sockid); 582 errno = ENOTSOCK; 583 return -1; 584 } 585 586 monitor = (struct mon_listener *)calloc(1, sizeof(struct mon_listener)); 587 if (!monitor) { 588 /* errno set from the malloc() */ 589 errno = ENOMEM; 590 return -1; 591 } 592 593 /* create a monitor-specific event queue */ 594 monitor->eq = CreateEventQueue(g_config.mos->max_concurrency); 595 if (!monitor->eq) { 596 TRACE_API("Can't create event queue (concurrency: %d) for " 597 "monitor read event registrations!\n", 598 g_config.mos->max_concurrency); 599 free(monitor); 600 errno = ENOMEM; 601 return -1; 602 } 603 604 /* set monitor-related basic parameters */ 605 #ifndef NEWEV 606 monitor->ude_id = UDE_OFFSET; 607 #endif 608 monitor->socket = sock; 609 monitor->client_buf_mgmt = monitor->server_buf_mgmt = BUFMGMT_FULL; 610 611 /* perform both sides monitoring by default */ 612 monitor->client_mon = monitor->server_mon = 1; 613 614 /* add monitor socket to the monitor list */ 615 TAILQ_INSERT_TAIL(&mtcp->monitors, monitor, link); 616 617 mtcp->msmap[sockid].monitor_listener = monitor; 618 619 return 0; 620 } 621 /*----------------------------------------------------------------------------*/ 622 int 623 mtcp_socket(mctx_t mctx, int domain, int type, int protocol) 624 { 625 mtcp_manager_t mtcp; 626 socket_map_t socket; 627 628 mtcp = GetMTCPManager(mctx); 629 if (!mtcp) { 630 errno = EACCES; 631 return -1; 632 } 633 634 if (domain != AF_INET) { 635 errno = EAFNOSUPPORT; 636 return -1; 637 } 638 639 if (type == (int)SOCK_STREAM) { 640 type = MOS_SOCK_STREAM; 641 } else if (type == MOS_SOCK_MONITOR_STREAM || 642 type == MOS_SOCK_MONITOR_RAW) { 643 /* do nothing for the time being */ 644 } else { 645 /* Not supported type */ 646 errno = EINVAL; 647 return -1; 648 } 649 650 socket = AllocateSocket(mctx, type); 651 if (!socket) { 652 errno = ENFILE; 653 return -1; 654 } 655 656 if (type == MOS_SOCK_MONITOR_STREAM || 657 type == MOS_SOCK_MONITOR_RAW) { 658 mtcp_manager_t mtcp = GetMTCPManager(mctx); 659 if (!mtcp) { 660 errno = EACCES; 661 return -1; 662 } 663 mtcp_monitor(mctx, socket); 664 #ifdef NEWEV 665 socket->monitor_listener->stree_dontcare = NULL; 666 socket->monitor_listener->stree_pre_rcv = NULL; 667 socket->monitor_listener->stree_post_snd = NULL; 668 #else 669 InitEvB(mtcp, &socket->monitor_listener->dontcare_evb); 670 InitEvB(mtcp, &socket->monitor_listener->pre_tcp_evb); 671 InitEvB(mtcp, &socket->monitor_listener->post_tcp_evb); 672 #endif 673 } 674 675 return socket->id; 676 } 677 /*----------------------------------------------------------------------------*/ 678 int 679 mtcp_bind(mctx_t mctx, int sockid, 680 const struct sockaddr *addr, socklen_t addrlen) 681 { 682 mtcp_manager_t mtcp; 683 struct sockaddr_in *addr_in; 684 685 mtcp = GetMTCPManager(mctx); 686 if (!mtcp) { 687 errno = EACCES; 688 return -1; 689 } 690 691 if (sockid < 0 || sockid >= g_config.mos->max_concurrency) { 692 TRACE_API("Socket id %d out of range.\n", sockid); 693 errno = EBADF; 694 return -1; 695 } 696 697 if (mtcp->smap[sockid].socktype == MOS_SOCK_UNUSED) { 698 TRACE_API("Invalid socket id: %d\n", sockid); 699 errno = EBADF; 700 return -1; 701 } 702 703 if (mtcp->smap[sockid].socktype != MOS_SOCK_STREAM && 704 mtcp->smap[sockid].socktype != MOS_SOCK_STREAM_LISTEN) { 705 TRACE_API("Not a stream socket id: %d\n", sockid); 706 errno = ENOTSOCK; 707 return -1; 708 } 709 710 if (!addr) { 711 TRACE_API("Socket %d: empty address!\n", sockid); 712 errno = EINVAL; 713 return -1; 714 } 715 716 if (mtcp->smap[sockid].opts & MTCP_ADDR_BIND) { 717 TRACE_API("Socket %d: adress already bind for this socket.\n", sockid); 718 errno = EINVAL; 719 return -1; 720 } 721 722 /* we only allow bind() for AF_INET address */ 723 if (addr->sa_family != AF_INET || addrlen < sizeof(struct sockaddr_in)) { 724 TRACE_API("Socket %d: invalid argument!\n", sockid); 725 errno = EINVAL; 726 return -1; 727 } 728 729 if (mtcp->listener) { 730 TRACE_API("Address already bound!\n"); 731 errno = EINVAL; 732 return -1; 733 } 734 addr_in = (struct sockaddr_in *)addr; 735 mtcp->smap[sockid].saddr = *addr_in; 736 mtcp->smap[sockid].opts |= MTCP_ADDR_BIND; 737 738 return 0; 739 } 740 /*----------------------------------------------------------------------------*/ 741 int 742 mtcp_listen(mctx_t mctx, int sockid, int backlog) 743 { 744 mtcp_manager_t mtcp; 745 struct tcp_listener *listener; 746 747 mtcp = GetMTCPManager(mctx); 748 if (!mtcp) { 749 errno = EACCES; 750 return -1; 751 } 752 753 if (sockid < 0 || sockid >= g_config.mos->max_concurrency) { 754 TRACE_API("Socket id %d out of range.\n", sockid); 755 errno = EBADF; 756 return -1; 757 } 758 759 if (mtcp->smap[sockid].socktype == MOS_SOCK_UNUSED) { 760 TRACE_API("Invalid socket id: %d\n", sockid); 761 errno = EBADF; 762 return -1; 763 } 764 765 if (mtcp->smap[sockid].socktype == MOS_SOCK_STREAM) { 766 mtcp->smap[sockid].socktype = MOS_SOCK_STREAM_LISTEN; 767 } 768 769 if (mtcp->smap[sockid].socktype != MOS_SOCK_STREAM_LISTEN) { 770 TRACE_API("Not a listening socket. id: %d\n", sockid); 771 errno = ENOTSOCK; 772 return -1; 773 } 774 775 if (backlog <= 0 || backlog > g_config.mos->max_concurrency) { 776 errno = EINVAL; 777 return -1; 778 } 779 780 listener = (struct tcp_listener *)calloc(1, sizeof(struct tcp_listener)); 781 if (!listener) { 782 /* errno set from the malloc() */ 783 errno = ENOMEM; 784 return -1; 785 } 786 787 listener->sockid = sockid; 788 listener->backlog = backlog; 789 listener->socket = &mtcp->smap[sockid]; 790 791 if (pthread_cond_init(&listener->accept_cond, NULL)) { 792 perror("pthread_cond_init of ctx->accept_cond\n"); 793 /* errno set by pthread_cond_init() */ 794 free(listener); 795 return -1; 796 } 797 if (pthread_mutex_init(&listener->accept_lock, NULL)) { 798 perror("pthread_mutex_init of ctx->accept_lock\n"); 799 /* errno set by pthread_mutex_init() */ 800 free(listener); 801 return -1; 802 } 803 804 listener->acceptq = CreateStreamQueue(backlog); 805 if (!listener->acceptq) { 806 free(listener); 807 errno = ENOMEM; 808 return -1; 809 } 810 811 mtcp->smap[sockid].listener = listener; 812 mtcp->listener = listener; 813 814 return 0; 815 } 816 /*----------------------------------------------------------------------------*/ 817 int 818 mtcp_accept(mctx_t mctx, int sockid, struct sockaddr *addr, socklen_t *addrlen) 819 { 820 mtcp_manager_t mtcp; 821 struct tcp_listener *listener; 822 socket_map_t socket; 823 tcp_stream *accepted = NULL; 824 825 mtcp = GetMTCPManager(mctx); 826 if (!mtcp) { 827 errno = EACCES; 828 return -1; 829 } 830 831 if (sockid < 0 || sockid >= g_config.mos->max_concurrency) { 832 TRACE_API("Socket id %d out of range.\n", sockid); 833 errno = EBADF; 834 return -1; 835 } 836 837 /* requires listening socket */ 838 if (mtcp->smap[sockid].socktype != MOS_SOCK_STREAM_LISTEN) { 839 errno = EINVAL; 840 return -1; 841 } 842 843 listener = mtcp->smap[sockid].listener; 844 845 /* dequeue from the acceptq without lock first */ 846 /* if nothing there, acquire lock and cond_wait */ 847 accepted = StreamDequeue(listener->acceptq); 848 if (!accepted) { 849 if (listener->socket->opts & MTCP_NONBLOCK) { 850 errno = EAGAIN; 851 return -1; 852 853 } else { 854 pthread_mutex_lock(&listener->accept_lock); 855 while ((accepted = StreamDequeue(listener->acceptq)) == NULL) { 856 pthread_cond_wait(&listener->accept_cond, &listener->accept_lock); 857 858 if (mtcp->ctx->done || mtcp->ctx->exit) { 859 pthread_mutex_unlock(&listener->accept_lock); 860 errno = EINTR; 861 return -1; 862 } 863 } 864 pthread_mutex_unlock(&listener->accept_lock); 865 } 866 } 867 868 if (!accepted) { 869 TRACE_ERROR("[NEVER HAPPEN] Empty accept queue!\n"); 870 } 871 872 if (!accepted->socket) { 873 socket = AllocateSocket(mctx, MOS_SOCK_STREAM); 874 if (!socket) { 875 TRACE_ERROR("Failed to create new socket!\n"); 876 /* TODO: destroy the stream */ 877 errno = ENFILE; 878 return -1; 879 } 880 socket->stream = accepted; 881 accepted->socket = socket; 882 883 /* set socket addr parameters */ 884 socket->saddr.sin_family = AF_INET; 885 socket->saddr.sin_port = accepted->dport; 886 socket->saddr.sin_addr.s_addr = accepted->daddr; 887 888 /* if monitor is enabled, complete the socket assignment */ 889 if (socket->stream->pair_stream != NULL) 890 socket->stream->pair_stream->socket = socket; 891 } 892 893 if (!(listener->socket->epoll & MOS_EPOLLET) && 894 !StreamQueueIsEmpty(listener->acceptq)) 895 AddEpollEvent(mtcp->ep, 896 USR_SHADOW_EVENT_QUEUE, 897 listener->socket, MOS_EPOLLIN); 898 899 TRACE_API("Stream %d accepted.\n", accepted->id); 900 901 if (addr && addrlen) { 902 struct sockaddr_in *addr_in = (struct sockaddr_in *)addr; 903 addr_in->sin_family = AF_INET; 904 addr_in->sin_port = accepted->dport; 905 addr_in->sin_addr.s_addr = accepted->daddr; 906 *addrlen = sizeof(struct sockaddr_in); 907 } 908 909 return accepted->socket->id; 910 } 911 /*----------------------------------------------------------------------------*/ 912 int 913 mtcp_init_rss(mctx_t mctx, in_addr_t saddr_base, int num_addr, 914 in_addr_t daddr, in_addr_t dport) 915 { 916 mtcp_manager_t mtcp; 917 addr_pool_t ap; 918 919 mtcp = GetMTCPManager(mctx); 920 if (!mtcp) { 921 errno = EACCES; 922 return -1; 923 } 924 925 if (saddr_base == INADDR_ANY) { 926 int nif_out; 927 928 /* for the INADDR_ANY, find the output interface for the destination 929 and set the saddr_base as the ip address of the output interface */ 930 nif_out = GetOutputInterface(daddr); 931 if (nif_out < 0) { 932 TRACE_DBG("Could not determine nif idx!\n"); 933 errno = EINVAL; 934 return -1; 935 } 936 saddr_base = g_config.mos->netdev_table->ent[nif_out]->ip_addr; 937 } 938 939 ap = CreateAddressPoolPerCore(mctx->cpu, num_cpus, 940 saddr_base, num_addr, daddr, dport); 941 if (!ap) { 942 errno = ENOMEM; 943 return -1; 944 } 945 946 mtcp->ap = ap; 947 948 return 0; 949 } 950 /*----------------------------------------------------------------------------*/ 951 int 952 eval_bpf_5tuple(struct sfbpf_program fcode, 953 in_addr_t saddr, in_port_t sport, 954 in_addr_t daddr, in_port_t dport) { 955 uint8_t buf[TOTAL_TCP_HEADER_LEN]; 956 struct ethhdr *ethh; 957 struct iphdr *iph; 958 struct tcphdr *tcph; 959 960 ethh = (struct ethhdr *)buf; 961 ethh->h_proto = htons(ETH_P_IP); 962 iph = (struct iphdr *)(ethh + 1); 963 iph->ihl = IP_HEADER_LEN >> 2; 964 iph->version = 4; 965 iph->tos = 0; 966 iph->tot_len = htons(IP_HEADER_LEN + TCP_HEADER_LEN); 967 iph->id = htons(0); 968 iph->protocol = IPPROTO_TCP; 969 iph->saddr = saddr; 970 iph->daddr = daddr; 971 iph->check = 0; 972 tcph = (struct tcphdr *)(iph + 1); 973 tcph->source = sport; 974 tcph->dest = dport; 975 976 return EVAL_BPFFILTER(fcode, (uint8_t *)iph - sizeof(struct ethhdr), 977 TOTAL_TCP_HEADER_LEN); 978 } 979 /*----------------------------------------------------------------------------*/ 980 int 981 mtcp_connect(mctx_t mctx, int sockid, 982 const struct sockaddr *addr, socklen_t addrlen) 983 { 984 mtcp_manager_t mtcp; 985 socket_map_t socket; 986 tcp_stream *cur_stream; 987 struct sockaddr_in *addr_in; 988 in_addr_t dip; 989 in_port_t dport; 990 int is_dyn_bound = FALSE; 991 int ret, nif; 992 int cnt_match = 0; 993 struct mon_listener *walk; 994 struct sfbpf_program fcode; 995 996 cur_stream = NULL; 997 mtcp = GetMTCPManager(mctx); 998 if (!mtcp) { 999 errno = EACCES; 1000 return -1; 1001 } 1002 1003 if (sockid < 0 || sockid >= g_config.mos->max_concurrency) { 1004 TRACE_API("Socket id %d out of range.\n", sockid); 1005 errno = EBADF; 1006 return -1; 1007 } 1008 1009 if (mtcp->smap[sockid].socktype == MOS_SOCK_UNUSED) { 1010 TRACE_API("Invalid socket id: %d\n", sockid); 1011 errno = EBADF; 1012 return -1; 1013 } 1014 1015 if (mtcp->smap[sockid].socktype != MOS_SOCK_STREAM) { 1016 TRACE_API("Not an end socket. id: %d\n", sockid); 1017 errno = ENOTSOCK; 1018 return -1; 1019 } 1020 1021 if (!addr) { 1022 TRACE_API("Socket %d: empty address!\n", sockid); 1023 errno = EFAULT; 1024 return -1; 1025 } 1026 1027 /* we only allow bind() for AF_INET address */ 1028 if (addr->sa_family != AF_INET || addrlen < sizeof(struct sockaddr_in)) { 1029 TRACE_API("Socket %d: invalid argument!\n", sockid); 1030 errno = EAFNOSUPPORT; 1031 return -1; 1032 } 1033 1034 socket = &mtcp->smap[sockid]; 1035 if (socket->stream) { 1036 TRACE_API("Socket %d: stream already exist!\n", sockid); 1037 if (socket->stream->state >= TCP_ST_ESTABLISHED) { 1038 errno = EISCONN; 1039 } else { 1040 errno = EALREADY; 1041 } 1042 return -1; 1043 } 1044 1045 addr_in = (struct sockaddr_in *)addr; 1046 dip = addr_in->sin_addr.s_addr; 1047 dport = addr_in->sin_port; 1048 1049 /* address binding */ 1050 if (socket->opts & MTCP_ADDR_BIND && 1051 socket->saddr.sin_port != INPORT_ANY && 1052 socket->saddr.sin_addr.s_addr != INADDR_ANY) { 1053 int rss_core; 1054 1055 rss_core = GetRSSCPUCore(socket->saddr.sin_addr.s_addr, dip, 1056 socket->saddr.sin_port, dport, num_queues); 1057 1058 if (rss_core != mctx->cpu) { 1059 errno = EINVAL; 1060 return -1; 1061 } 1062 } else { 1063 if (mtcp->ap) { 1064 ret = FetchAddressPerCore(mtcp->ap, 1065 mctx->cpu, num_queues, addr_in, &socket->saddr); 1066 } else { 1067 nif = GetOutputInterface(dip); 1068 if (nif < 0) { 1069 errno = EINVAL; 1070 return -1; 1071 } 1072 ret = FetchAddress(ap[nif], 1073 mctx->cpu, num_queues, addr_in, &socket->saddr); 1074 } 1075 if (ret < 0) { 1076 errno = EAGAIN; 1077 return -1; 1078 } 1079 socket->opts |= MTCP_ADDR_BIND; 1080 is_dyn_bound = TRUE; 1081 } 1082 1083 cnt_match = 0; 1084 if (mtcp->num_msp > 0) { 1085 TAILQ_FOREACH(walk, &mtcp->monitors, link) { 1086 fcode = walk->stream_syn_fcode; 1087 if (!(ISSET_BPFFILTER(fcode) && 1088 eval_bpf_5tuple(fcode, socket->saddr.sin_addr.s_addr, 1089 socket->saddr.sin_port, 1090 dip, dport) == 0)) { 1091 walk->is_stream_syn_filter_hit = 1; // set the 'filter hit' flag to 1 1092 cnt_match++; 1093 } 1094 } 1095 } 1096 1097 if (mtcp->num_msp > 0 && cnt_match > 0) { 1098 /* 150820 dhkim: XXX: embedded mode is not verified */ 1099 #if 1 1100 cur_stream = CreateClientTCPStream(mtcp, socket, 1101 STREAM_TYPE(MOS_SOCK_STREAM) | 1102 STREAM_TYPE(MOS_SOCK_MONITOR_STREAM_ACTIVE), 1103 socket->saddr.sin_addr.s_addr, 1104 socket->saddr.sin_port, dip, dport, NULL); 1105 #else 1106 cur_stream = CreateDualTCPStream(mtcp, socket, 1107 STREAM_TYPE(MOS_SOCK_STREAM) | 1108 STREAM_TYPE(MOS_SOCK_MONITOR_STREAM_ACTIVE), 1109 socket->saddr.sin_addr.s_addr, 1110 socket->saddr.sin_port, dip, dport, NULL); 1111 #endif 1112 } 1113 else 1114 cur_stream = CreateTCPStream(mtcp, socket, STREAM_TYPE(MOS_SOCK_STREAM), 1115 socket->saddr.sin_addr.s_addr, 1116 socket->saddr.sin_port, dip, dport, NULL); 1117 if (!cur_stream) { 1118 TRACE_ERROR("Socket %d: failed to create tcp_stream!\n", sockid); 1119 errno = ENOMEM; 1120 return -1; 1121 } 1122 1123 if (is_dyn_bound) 1124 cur_stream->is_bound_addr = TRUE; 1125 cur_stream->sndvar->cwnd = 1; 1126 cur_stream->sndvar->ssthresh = cur_stream->sndvar->mss * 10; 1127 cur_stream->side = MOS_SIDE_CLI; 1128 /* if monitor is enabled, update the pair stream side as well */ 1129 if (cur_stream->pair_stream) { 1130 cur_stream->pair_stream->side = MOS_SIDE_SVR; 1131 /* 1132 * if buffer management is off, then disable 1133 * monitoring tcp ring of server... 1134 * if there is even a single monitor asking for 1135 * buffer management, enable it (that's why the 1136 * need for the loop) 1137 */ 1138 cur_stream->pair_stream->buffer_mgmt = BUFMGMT_OFF; 1139 struct socket_map *walk; 1140 SOCKQ_FOREACH_START(walk, &cur_stream->msocks) { 1141 uint8_t bm = walk->monitor_stream->monitor_listener->server_buf_mgmt; 1142 if (bm > cur_stream->pair_stream->buffer_mgmt) { 1143 cur_stream->pair_stream->buffer_mgmt = bm; 1144 break; 1145 } 1146 } SOCKQ_FOREACH_END; 1147 } 1148 1149 cur_stream->state = TCP_ST_SYN_SENT; 1150 cur_stream->cb_events |= MOS_ON_TCP_STATE_CHANGE; 1151 1152 TRACE_STATE("Stream %d: TCP_ST_SYN_SENT\n", cur_stream->id); 1153 1154 SQ_LOCK(&mtcp->ctx->connect_lock); 1155 ret = StreamEnqueue(mtcp->connectq, cur_stream); 1156 SQ_UNLOCK(&mtcp->ctx->connect_lock); 1157 mtcp->wakeup_flag = TRUE; 1158 if (ret < 0) { 1159 TRACE_ERROR("Socket %d: failed to enqueue to conenct queue!\n", sockid); 1160 SQ_LOCK(&mtcp->ctx->destroyq_lock); 1161 StreamEnqueue(mtcp->destroyq, cur_stream); 1162 SQ_UNLOCK(&mtcp->ctx->destroyq_lock); 1163 errno = EAGAIN; 1164 return -1; 1165 } 1166 1167 /* if nonblocking socket, return EINPROGRESS */ 1168 if (socket->opts & MTCP_NONBLOCK) { 1169 errno = EINPROGRESS; 1170 return -1; 1171 1172 } else { 1173 while (1) { 1174 if (!cur_stream) { 1175 TRACE_ERROR("STREAM DESTROYED\n"); 1176 errno = ETIMEDOUT; 1177 return -1; 1178 } 1179 if (cur_stream->state > TCP_ST_ESTABLISHED) { 1180 TRACE_ERROR("Socket %d: weird state %s\n", 1181 sockid, TCPStateToString(cur_stream)); 1182 // TODO: how to handle this? 1183 errno = ENOSYS; 1184 return -1; 1185 } 1186 1187 if (cur_stream->state == TCP_ST_ESTABLISHED) { 1188 break; 1189 } 1190 usleep(1000); 1191 } 1192 } 1193 1194 return 0; 1195 } 1196 /*----------------------------------------------------------------------------*/ 1197 static inline int 1198 CloseStreamSocket(mctx_t mctx, int sockid) 1199 { 1200 mtcp_manager_t mtcp; 1201 tcp_stream *cur_stream; 1202 int ret; 1203 1204 mtcp = GetMTCPManager(mctx); 1205 if (!mtcp) { 1206 errno = EACCES; 1207 return -1; 1208 } 1209 1210 cur_stream = mtcp->smap[sockid].stream; 1211 if (!cur_stream) { 1212 TRACE_API("Socket %d: stream does not exist.\n", sockid); 1213 errno = ENOTCONN; 1214 return -1; 1215 } 1216 1217 if (cur_stream->closed) { 1218 TRACE_API("Socket %d (Stream %u): already closed stream\n", 1219 sockid, cur_stream->id); 1220 return 0; 1221 } 1222 cur_stream->closed = TRUE; 1223 1224 TRACE_API("Stream %d: closing the stream.\n", cur_stream->id); 1225 1226 /* 141029 dhkim: Check this! */ 1227 cur_stream->socket = NULL; 1228 1229 if (cur_stream->state == TCP_ST_CLOSED_RSVD) { 1230 TRACE_API("Stream %d at TCP_ST_CLOSED_RSVD. destroying the stream.\n", 1231 cur_stream->id); 1232 SQ_LOCK(&mtcp->ctx->destroyq_lock); 1233 StreamEnqueue(mtcp->destroyq, cur_stream); 1234 mtcp->wakeup_flag = TRUE; 1235 SQ_UNLOCK(&mtcp->ctx->destroyq_lock); 1236 return 0; 1237 1238 } else if (cur_stream->state == TCP_ST_SYN_SENT) { 1239 #if 1 1240 SQ_LOCK(&mtcp->ctx->destroyq_lock); 1241 StreamEnqueue(mtcp->destroyq, cur_stream); 1242 SQ_UNLOCK(&mtcp->ctx->destroyq_lock); 1243 mtcp->wakeup_flag = TRUE; 1244 #endif 1245 return -1; 1246 1247 } else if (cur_stream->state != TCP_ST_ESTABLISHED && 1248 cur_stream->state != TCP_ST_CLOSE_WAIT) { 1249 TRACE_API("Stream %d at state %s\n", 1250 cur_stream->id, TCPStateToString(cur_stream)); 1251 errno = EBADF; 1252 return -1; 1253 } 1254 1255 SQ_LOCK(&mtcp->ctx->close_lock); 1256 cur_stream->sndvar->on_closeq = TRUE; 1257 ret = StreamEnqueue(mtcp->closeq, cur_stream); 1258 mtcp->wakeup_flag = TRUE; 1259 SQ_UNLOCK(&mtcp->ctx->close_lock); 1260 1261 if (ret < 0) { 1262 TRACE_ERROR("(NEVER HAPPEN) Failed to enqueue the stream to close.\n"); 1263 errno = EAGAIN; 1264 return -1; 1265 } 1266 1267 return 0; 1268 } 1269 /*----------------------------------------------------------------------------*/ 1270 static inline int 1271 CloseListeningSocket(mctx_t mctx, int sockid) 1272 { 1273 mtcp_manager_t mtcp; 1274 struct tcp_listener *listener; 1275 1276 mtcp = GetMTCPManager(mctx); 1277 if (!mtcp) { 1278 errno = EACCES; 1279 return -1; 1280 } 1281 1282 listener = mtcp->smap[sockid].listener; 1283 if (!listener) { 1284 errno = EINVAL; 1285 return -1; 1286 } 1287 1288 if (listener->acceptq) { 1289 DestroyStreamQueue(listener->acceptq); 1290 listener->acceptq = NULL; 1291 } 1292 1293 pthread_mutex_lock(&listener->accept_lock); 1294 pthread_cond_signal(&listener->accept_cond); 1295 pthread_mutex_unlock(&listener->accept_lock); 1296 1297 pthread_cond_destroy(&listener->accept_cond); 1298 pthread_mutex_destroy(&listener->accept_lock); 1299 1300 free(listener); 1301 mtcp->smap[sockid].listener = NULL; 1302 1303 return 0; 1304 } 1305 /*----------------------------------------------------------------------------*/ 1306 int 1307 mtcp_close(mctx_t mctx, int sockid) 1308 { 1309 mtcp_manager_t mtcp; 1310 int ret; 1311 1312 mtcp = GetMTCPManager(mctx); 1313 if (!mtcp) { 1314 errno = EACCES; 1315 return -1; 1316 } 1317 1318 if (sockid < 0 || sockid >= g_config.mos->max_concurrency) { 1319 TRACE_API("Socket id %d out of range.\n", sockid); 1320 errno = EBADF; 1321 return -1; 1322 } 1323 1324 if (mtcp->smap[sockid].socktype == MOS_SOCK_UNUSED) { 1325 TRACE_API("Invalid socket id: %d\n", sockid); 1326 errno = EBADF; 1327 return -1; 1328 } 1329 1330 TRACE_API("Socket %d: mtcp_close called.\n", sockid); 1331 1332 switch (mtcp->smap[sockid].socktype) { 1333 case MOS_SOCK_STREAM: 1334 ret = CloseStreamSocket(mctx, sockid); 1335 break; 1336 1337 case MOS_SOCK_STREAM_LISTEN: 1338 ret = CloseListeningSocket(mctx, sockid); 1339 break; 1340 1341 case MOS_SOCK_EPOLL: 1342 ret = CloseEpollSocket(mctx, sockid); 1343 break; 1344 1345 case MOS_SOCK_PIPE: 1346 ret = PipeClose(mctx, sockid); 1347 break; 1348 1349 default: 1350 errno = EINVAL; 1351 ret = -1; 1352 break; 1353 } 1354 1355 FreeSocket(mctx, sockid, mtcp->smap[sockid].socktype); 1356 1357 return ret; 1358 } 1359 /*----------------------------------------------------------------------------*/ 1360 int 1361 mtcp_abort(mctx_t mctx, int sockid) 1362 { 1363 mtcp_manager_t mtcp; 1364 tcp_stream *cur_stream; 1365 int ret; 1366 1367 mtcp = GetMTCPManager(mctx); 1368 if (!mtcp) { 1369 errno = EACCES; 1370 return -1; 1371 } 1372 1373 if (sockid < 0 || sockid >= g_config.mos->max_concurrency) { 1374 TRACE_API("Socket id %d out of range.\n", sockid); 1375 errno = EBADF; 1376 return -1; 1377 } 1378 1379 if (mtcp->smap[sockid].socktype == MOS_SOCK_UNUSED) { 1380 TRACE_API("Invalid socket id: %d\n", sockid); 1381 errno = EBADF; 1382 return -1; 1383 } 1384 1385 if (mtcp->smap[sockid].socktype != MOS_SOCK_STREAM) { 1386 TRACE_API("Not an end socket. id: %d\n", sockid); 1387 errno = ENOTSOCK; 1388 return -1; 1389 } 1390 1391 cur_stream = mtcp->smap[sockid].stream; 1392 if (!cur_stream) { 1393 TRACE_API("Stream %d: does not exist.\n", sockid); 1394 errno = ENOTCONN; 1395 return -1; 1396 } 1397 1398 TRACE_API("Socket %d: mtcp_abort()\n", sockid); 1399 1400 FreeSocket(mctx, sockid, mtcp->smap[sockid].socktype); 1401 cur_stream->socket = NULL; 1402 1403 if (cur_stream->state == TCP_ST_CLOSED_RSVD) { 1404 TRACE_API("Stream %d: connection already reset.\n", sockid); 1405 return ERROR; 1406 1407 } else if (cur_stream->state == TCP_ST_SYN_SENT) { 1408 /* TODO: this should notify event failure to all 1409 previous read() or write() calls */ 1410 cur_stream->state = TCP_ST_CLOSED_RSVD; 1411 cur_stream->close_reason = TCP_ACTIVE_CLOSE; 1412 cur_stream->cb_events |= MOS_ON_TCP_STATE_CHANGE; 1413 SQ_LOCK(&mtcp->ctx->destroyq_lock); 1414 StreamEnqueue(mtcp->destroyq, cur_stream); 1415 SQ_UNLOCK(&mtcp->ctx->destroyq_lock); 1416 mtcp->wakeup_flag = TRUE; 1417 return 0; 1418 1419 } else if (cur_stream->state == TCP_ST_CLOSING || 1420 cur_stream->state == TCP_ST_LAST_ACK || 1421 cur_stream->state == TCP_ST_TIME_WAIT) { 1422 cur_stream->state = TCP_ST_CLOSED_RSVD; 1423 cur_stream->close_reason = TCP_ACTIVE_CLOSE; 1424 cur_stream->cb_events |= MOS_ON_TCP_STATE_CHANGE; 1425 SQ_LOCK(&mtcp->ctx->destroyq_lock); 1426 StreamEnqueue(mtcp->destroyq, cur_stream); 1427 SQ_UNLOCK(&mtcp->ctx->destroyq_lock); 1428 mtcp->wakeup_flag = TRUE; 1429 return 0; 1430 } 1431 1432 /* the stream structure will be destroyed after sending RST */ 1433 if (cur_stream->sndvar->on_resetq) { 1434 TRACE_ERROR("Stream %d: calling mtcp_abort() " 1435 "when in reset queue.\n", sockid); 1436 errno = ECONNRESET; 1437 return -1; 1438 } 1439 SQ_LOCK(&mtcp->ctx->reset_lock); 1440 cur_stream->sndvar->on_resetq = TRUE; 1441 ret = StreamEnqueue(mtcp->resetq, cur_stream); 1442 SQ_UNLOCK(&mtcp->ctx->reset_lock); 1443 mtcp->wakeup_flag = TRUE; 1444 1445 if (ret < 0) { 1446 TRACE_ERROR("(NEVER HAPPEN) Failed to enqueue the stream to close.\n"); 1447 errno = EAGAIN; 1448 return -1; 1449 } 1450 1451 return 0; 1452 } 1453 /*----------------------------------------------------------------------------*/ 1454 static inline int 1455 PeekForUser(mtcp_manager_t mtcp, tcp_stream *cur_stream, char *buf, int len) 1456 { 1457 struct tcp_recv_vars *rcvvar = cur_stream->rcvvar; 1458 int copylen; 1459 tcprb_t *rb = rcvvar->rcvbuf; 1460 1461 if ((copylen = tcprb_ppeek(rb, (uint8_t *)buf, len, rb->pile)) <= 0) { 1462 errno = EAGAIN; 1463 return -1; 1464 } 1465 1466 return copylen; 1467 } 1468 /*----------------------------------------------------------------------------*/ 1469 static inline int 1470 CopyToUser(mtcp_manager_t mtcp, tcp_stream *cur_stream, char *buf, int len) 1471 { 1472 struct tcp_recv_vars *rcvvar = cur_stream->rcvvar; 1473 int copylen; 1474 tcprb_t *rb = rcvvar->rcvbuf; 1475 if ((copylen = tcprb_ppeek(rb, (uint8_t *)buf, len, rb->pile)) <= 0) { 1476 errno = EAGAIN; 1477 return -1; 1478 } 1479 tcprb_setpile(rb, rb->pile + copylen); 1480 1481 rcvvar->rcv_wnd = rb->len - tcprb_cflen(rb); 1482 //printf("rcv_wnd: %d\n", rcvvar->rcv_wnd); 1483 1484 /* Advertise newly freed receive buffer */ 1485 if (cur_stream->need_wnd_adv) { 1486 if (rcvvar->rcv_wnd > cur_stream->sndvar->eff_mss) { 1487 if (!cur_stream->sndvar->on_ackq) { 1488 SQ_LOCK(&mtcp->ctx->ackq_lock); 1489 cur_stream->sndvar->on_ackq = TRUE; 1490 StreamEnqueue(mtcp->ackq, cur_stream); /* this always success */ 1491 SQ_UNLOCK(&mtcp->ctx->ackq_lock); 1492 cur_stream->need_wnd_adv = FALSE; 1493 mtcp->wakeup_flag = TRUE; 1494 } 1495 } 1496 } 1497 1498 return copylen; 1499 } 1500 /*----------------------------------------------------------------------------*/ 1501 ssize_t 1502 mtcp_recv(mctx_t mctx, int sockid, char *buf, size_t len, int flags) 1503 { 1504 mtcp_manager_t mtcp; 1505 socket_map_t socket; 1506 tcp_stream *cur_stream; 1507 struct tcp_recv_vars *rcvvar; 1508 int event_remaining, merged_len; 1509 int ret; 1510 1511 mtcp = GetMTCPManager(mctx); 1512 if (!mtcp) { 1513 errno = EACCES; 1514 return -1; 1515 } 1516 1517 if (sockid < 0 || sockid >= g_config.mos->max_concurrency) { 1518 TRACE_API("Socket id %d out of range.\n", sockid); 1519 errno = EBADF; 1520 return -1; 1521 } 1522 1523 socket = &mtcp->smap[sockid]; 1524 if (socket->socktype == MOS_SOCK_UNUSED) { 1525 TRACE_API("Invalid socket id: %d\n", sockid); 1526 errno = EBADF; 1527 return -1; 1528 } 1529 1530 if (socket->socktype == MOS_SOCK_PIPE) { 1531 return PipeRead(mctx, sockid, buf, len); 1532 } 1533 1534 if (socket->socktype != MOS_SOCK_STREAM) { 1535 TRACE_API("Not an end socket. id: %d\n", sockid); 1536 errno = ENOTSOCK; 1537 return -1; 1538 } 1539 1540 /* stream should be in ESTABLISHED, FIN_WAIT_1, FIN_WAIT_2, CLOSE_WAIT */ 1541 cur_stream = socket->stream; 1542 if (!cur_stream || !cur_stream->rcvvar || !cur_stream->rcvvar->rcvbuf || 1543 !(cur_stream->state >= TCP_ST_ESTABLISHED && 1544 cur_stream->state <= TCP_ST_CLOSE_WAIT)) { 1545 errno = ENOTCONN; 1546 return -1; 1547 } 1548 1549 rcvvar = cur_stream->rcvvar; 1550 1551 merged_len = tcprb_cflen(rcvvar->rcvbuf); 1552 1553 /* if CLOSE_WAIT, return 0 if there is no payload */ 1554 if (cur_stream->state == TCP_ST_CLOSE_WAIT) { 1555 if (merged_len == 0) 1556 return 0; 1557 } 1558 1559 /* return EAGAIN if no receive buffer */ 1560 if (socket->opts & MTCP_NONBLOCK) { 1561 if (merged_len == 0) { 1562 errno = EAGAIN; 1563 return -1; 1564 } 1565 } 1566 1567 SBUF_LOCK(&rcvvar->read_lock); 1568 1569 switch (flags) { 1570 case 0: 1571 ret = CopyToUser(mtcp, cur_stream, buf, len); 1572 break; 1573 case MSG_PEEK: 1574 ret = PeekForUser(mtcp, cur_stream, buf, len); 1575 break; 1576 default: 1577 SBUF_UNLOCK(&rcvvar->read_lock); 1578 ret = -1; 1579 errno = EINVAL; 1580 return ret; 1581 } 1582 1583 merged_len = tcprb_cflen(rcvvar->rcvbuf); 1584 event_remaining = FALSE; 1585 /* if there are remaining payload, generate EPOLLIN */ 1586 /* (may due to insufficient user buffer) */ 1587 if (socket->epoll & MOS_EPOLLIN) { 1588 if (!(socket->epoll & MOS_EPOLLET) && merged_len > 0) { 1589 event_remaining = TRUE; 1590 } 1591 } 1592 /* if waiting for close, notify it if no remaining data */ 1593 if (cur_stream->state == TCP_ST_CLOSE_WAIT && 1594 merged_len == 0 && ret > 0) { 1595 event_remaining = TRUE; 1596 } 1597 1598 SBUF_UNLOCK(&rcvvar->read_lock); 1599 1600 if (event_remaining) { 1601 if (socket->epoll) { 1602 AddEpollEvent(mtcp->ep, 1603 USR_SHADOW_EVENT_QUEUE, socket, MOS_EPOLLIN); 1604 } 1605 } 1606 1607 TRACE_API("Stream %d: mtcp_recv() returning %d\n", cur_stream->id, ret); 1608 return ret; 1609 } 1610 /*----------------------------------------------------------------------------*/ 1611 inline ssize_t 1612 mtcp_read(mctx_t mctx, int sockid, char *buf, size_t len) 1613 { 1614 return mtcp_recv(mctx, sockid, buf, len, 0); 1615 } 1616 /*----------------------------------------------------------------------------*/ 1617 ssize_t 1618 mtcp_readv(mctx_t mctx, int sockid, const struct iovec *iov, int numIOV) 1619 { 1620 mtcp_manager_t mtcp; 1621 socket_map_t socket; 1622 tcp_stream *cur_stream; 1623 struct tcp_recv_vars *rcvvar; 1624 int ret, bytes_read, i; 1625 int event_remaining, merged_len; 1626 1627 mtcp = GetMTCPManager(mctx); 1628 if (!mtcp) { 1629 errno = EACCES; 1630 return -1; 1631 } 1632 1633 if (sockid < 0 || sockid >= g_config.mos->max_concurrency) { 1634 TRACE_API("Socket id %d out of range.\n", sockid); 1635 errno = EBADF; 1636 return -1; 1637 } 1638 1639 socket = &mtcp->smap[sockid]; 1640 if (socket->socktype == MOS_SOCK_UNUSED) { 1641 TRACE_API("Invalid socket id: %d\n", sockid); 1642 errno = EBADF; 1643 return -1; 1644 } 1645 1646 if (socket->socktype != MOS_SOCK_STREAM) { 1647 TRACE_API("Not an end socket. id: %d\n", sockid); 1648 errno = ENOTSOCK; 1649 return -1; 1650 } 1651 1652 /* stream should be in ESTABLISHED, FIN_WAIT_1, FIN_WAIT_2, CLOSE_WAIT */ 1653 cur_stream = socket->stream; 1654 if (!cur_stream || !cur_stream->rcvvar->rcvbuf || 1655 !(cur_stream->state >= TCP_ST_ESTABLISHED && 1656 cur_stream->state <= TCP_ST_CLOSE_WAIT)) { 1657 errno = ENOTCONN; 1658 return -1; 1659 } 1660 1661 rcvvar = cur_stream->rcvvar; 1662 1663 merged_len = tcprb_cflen(rcvvar->rcvbuf); 1664 1665 /* if CLOSE_WAIT, return 0 if there is no payload */ 1666 if (cur_stream->state == TCP_ST_CLOSE_WAIT) { 1667 if (merged_len == 0) 1668 return 0; 1669 } 1670 1671 /* return EAGAIN if no receive buffer */ 1672 if (socket->opts & MTCP_NONBLOCK) { 1673 if (merged_len == 0) { 1674 errno = EAGAIN; 1675 return -1; 1676 } 1677 } 1678 1679 SBUF_LOCK(&rcvvar->read_lock); 1680 1681 /* read and store the contents to the vectored buffers */ 1682 bytes_read = 0; 1683 for (i = 0; i < numIOV; i++) { 1684 if (iov[i].iov_len <= 0) 1685 continue; 1686 1687 ret = CopyToUser(mtcp, cur_stream, iov[i].iov_base, iov[i].iov_len); 1688 if (ret <= 0) 1689 break; 1690 1691 bytes_read += ret; 1692 1693 if (ret < iov[i].iov_len) 1694 break; 1695 } 1696 1697 merged_len = tcprb_cflen(rcvvar->rcvbuf); 1698 1699 event_remaining = FALSE; 1700 /* if there are remaining payload, generate read event */ 1701 /* (may due to insufficient user buffer) */ 1702 if (socket->epoll & MOS_EPOLLIN) { 1703 if (!(socket->epoll & MOS_EPOLLET) && merged_len > 0) { 1704 event_remaining = TRUE; 1705 } 1706 } 1707 /* if waiting for close, notify it if no remaining data */ 1708 if (cur_stream->state == TCP_ST_CLOSE_WAIT && 1709 merged_len == 0 && bytes_read > 0) { 1710 event_remaining = TRUE; 1711 } 1712 1713 SBUF_UNLOCK(&rcvvar->read_lock); 1714 1715 if(event_remaining) { 1716 if (socket->epoll & MOS_EPOLLIN && !(socket->epoll & MOS_EPOLLET)) { 1717 AddEpollEvent(mtcp->ep, 1718 USR_SHADOW_EVENT_QUEUE, socket, MOS_EPOLLIN); 1719 } 1720 } 1721 1722 TRACE_API("Stream %d: mtcp_readv() returning %d\n", 1723 cur_stream->id, bytes_read); 1724 return bytes_read; 1725 } 1726 /*----------------------------------------------------------------------------*/ 1727 static inline int 1728 CopyFromUser(mtcp_manager_t mtcp, tcp_stream *cur_stream, const char *buf, int len) 1729 { 1730 struct tcp_send_vars *sndvar = cur_stream->sndvar; 1731 int sndlen; 1732 int ret; 1733 1734 sndlen = MIN((int)sndvar->snd_wnd, len); 1735 if (sndlen <= 0) { 1736 errno = EAGAIN; 1737 return -1; 1738 } 1739 1740 /* allocate send buffer if not exist */ 1741 if (!sndvar->sndbuf) { 1742 sndvar->sndbuf = SBInit(mtcp->rbm_snd, sndvar->iss + 1); 1743 if (!sndvar->sndbuf) { 1744 cur_stream->close_reason = TCP_NO_MEM; 1745 /* notification may not required due to -1 return */ 1746 errno = ENOMEM; 1747 return -1; 1748 } 1749 } 1750 1751 ret = SBPut(mtcp->rbm_snd, sndvar->sndbuf, buf, sndlen); 1752 assert(ret == sndlen); 1753 sndvar->snd_wnd = sndvar->sndbuf->size - sndvar->sndbuf->len; 1754 if (ret <= 0) { 1755 TRACE_ERROR("SBPut failed. reason: %d (sndlen: %u, len: %u\n", 1756 ret, sndlen, sndvar->sndbuf->len); 1757 errno = EAGAIN; 1758 return -1; 1759 } 1760 1761 if (sndvar->snd_wnd <= 0) { 1762 TRACE_SNDBUF("%u Sending buffer became full!! snd_wnd: %u\n", 1763 cur_stream->id, sndvar->snd_wnd); 1764 } 1765 1766 return ret; 1767 } 1768 /*----------------------------------------------------------------------------*/ 1769 ssize_t 1770 mtcp_write(mctx_t mctx, int sockid, const char *buf, size_t len) 1771 { 1772 mtcp_manager_t mtcp; 1773 socket_map_t socket; 1774 tcp_stream *cur_stream; 1775 struct tcp_send_vars *sndvar; 1776 int ret; 1777 1778 mtcp = GetMTCPManager(mctx); 1779 if (!mtcp) { 1780 errno = EACCES; 1781 return -1; 1782 } 1783 1784 if (sockid < 0 || sockid >= g_config.mos->max_concurrency) { 1785 TRACE_API("Socket id %d out of range.\n", sockid); 1786 errno = EBADF; 1787 return -1; 1788 } 1789 1790 socket = &mtcp->smap[sockid]; 1791 if (socket->socktype == MOS_SOCK_UNUSED) { 1792 TRACE_API("Invalid socket id: %d\n", sockid); 1793 errno = EBADF; 1794 return -1; 1795 } 1796 1797 if (socket->socktype == MOS_SOCK_PIPE) { 1798 return PipeWrite(mctx, sockid, buf, len); 1799 } 1800 1801 if (socket->socktype != MOS_SOCK_STREAM) { 1802 TRACE_API("Not an end socket. id: %d\n", sockid); 1803 errno = ENOTSOCK; 1804 return -1; 1805 } 1806 1807 cur_stream = socket->stream; 1808 if (!cur_stream || 1809 !(cur_stream->state == TCP_ST_ESTABLISHED || 1810 cur_stream->state == TCP_ST_CLOSE_WAIT)) { 1811 errno = ENOTCONN; 1812 return -1; 1813 } 1814 1815 if (len <= 0) { 1816 if (socket->opts & MTCP_NONBLOCK) { 1817 errno = EAGAIN; 1818 return -1; 1819 } else { 1820 return 0; 1821 } 1822 } 1823 1824 sndvar = cur_stream->sndvar; 1825 1826 SBUF_LOCK(&sndvar->write_lock); 1827 ret = CopyFromUser(mtcp, cur_stream, buf, len); 1828 1829 SBUF_UNLOCK(&sndvar->write_lock); 1830 1831 if (ret > 0 && !(sndvar->on_sendq || sndvar->on_send_list)) { 1832 SQ_LOCK(&mtcp->ctx->sendq_lock); 1833 sndvar->on_sendq = TRUE; 1834 StreamEnqueue(mtcp->sendq, cur_stream); /* this always success */ 1835 SQ_UNLOCK(&mtcp->ctx->sendq_lock); 1836 mtcp->wakeup_flag = TRUE; 1837 } 1838 1839 if (ret == 0 && (socket->opts & MTCP_NONBLOCK)) { 1840 ret = -1; 1841 errno = EAGAIN; 1842 } 1843 1844 /* if there are remaining sending buffer, generate write event */ 1845 if (sndvar->snd_wnd > 0) { 1846 if (socket->epoll & MOS_EPOLLOUT && !(socket->epoll & MOS_EPOLLET)) { 1847 AddEpollEvent(mtcp->ep, 1848 USR_SHADOW_EVENT_QUEUE, socket, MOS_EPOLLOUT); 1849 } 1850 } 1851 1852 TRACE_API("Stream %d: mtcp_write() returning %d\n", cur_stream->id, ret); 1853 return ret; 1854 } 1855 /*----------------------------------------------------------------------------*/ 1856 ssize_t 1857 mtcp_writev(mctx_t mctx, int sockid, const struct iovec *iov, int numIOV) 1858 { 1859 mtcp_manager_t mtcp; 1860 socket_map_t socket; 1861 tcp_stream *cur_stream; 1862 struct tcp_send_vars *sndvar; 1863 int ret, to_write, i; 1864 1865 mtcp = GetMTCPManager(mctx); 1866 if (!mtcp) { 1867 errno = EACCES; 1868 return -1; 1869 } 1870 1871 if (sockid < 0 || sockid >= g_config.mos->max_concurrency) { 1872 TRACE_API("Socket id %d out of range.\n", sockid); 1873 errno = EBADF; 1874 return -1; 1875 } 1876 1877 socket = &mtcp->smap[sockid]; 1878 if (socket->socktype == MOS_SOCK_UNUSED) { 1879 TRACE_API("Invalid socket id: %d\n", sockid); 1880 errno = EBADF; 1881 return -1; 1882 } 1883 1884 if (socket->socktype != MOS_SOCK_STREAM) { 1885 TRACE_API("Not an end socket. id: %d\n", sockid); 1886 errno = ENOTSOCK; 1887 return -1; 1888 } 1889 1890 cur_stream = socket->stream; 1891 if (!cur_stream || 1892 !(cur_stream->state == TCP_ST_ESTABLISHED || 1893 cur_stream->state == TCP_ST_CLOSE_WAIT)) { 1894 errno = ENOTCONN; 1895 return -1; 1896 } 1897 1898 sndvar = cur_stream->sndvar; 1899 SBUF_LOCK(&sndvar->write_lock); 1900 1901 /* write from the vectored buffers */ 1902 to_write = 0; 1903 for (i = 0; i < numIOV; i++) { 1904 if (iov[i].iov_len <= 0) 1905 continue; 1906 1907 ret = CopyFromUser(mtcp, cur_stream, iov[i].iov_base, iov[i].iov_len); 1908 if (ret <= 0) 1909 break; 1910 1911 to_write += ret; 1912 1913 if (ret < iov[i].iov_len) 1914 break; 1915 } 1916 SBUF_UNLOCK(&sndvar->write_lock); 1917 1918 if (to_write > 0 && !(sndvar->on_sendq || sndvar->on_send_list)) { 1919 SQ_LOCK(&mtcp->ctx->sendq_lock); 1920 sndvar->on_sendq = TRUE; 1921 StreamEnqueue(mtcp->sendq, cur_stream); /* this always success */ 1922 SQ_UNLOCK(&mtcp->ctx->sendq_lock); 1923 mtcp->wakeup_flag = TRUE; 1924 } 1925 1926 if (to_write == 0 && (socket->opts & MTCP_NONBLOCK)) { 1927 to_write = -1; 1928 errno = EAGAIN; 1929 } 1930 1931 /* if there are remaining sending buffer, generate write event */ 1932 if (sndvar->snd_wnd > 0) { 1933 if (socket->epoll & MOS_EPOLLOUT && !(socket->epoll & MOS_EPOLLET)) { 1934 AddEpollEvent(mtcp->ep, 1935 USR_SHADOW_EVENT_QUEUE, socket, MOS_EPOLLOUT); 1936 } 1937 } 1938 1939 TRACE_API("Stream %d: mtcp_writev() returning %d\n", 1940 cur_stream->id, to_write); 1941 return to_write; 1942 } 1943 /*----------------------------------------------------------------------------*/ 1944 uint32_t 1945 mtcp_get_connection_cnt(mctx_t mctx) 1946 { 1947 mtcp_manager_t mtcp; 1948 mtcp = GetMTCPManager(mctx); 1949 if (!mtcp) { 1950 errno = EACCES; 1951 return -1; 1952 } 1953 1954 if (mtcp->num_msp > 0) 1955 return mtcp->flow_cnt / 2; 1956 else 1957 return mtcp->flow_cnt; 1958 } 1959 /*----------------------------------------------------------------------------*/ 1960