1 #include <sys/queue.h> 2 #include <sys/ioctl.h> 3 #include <limits.h> 4 #include <unistd.h> 5 #include <assert.h> 6 #include <string.h> 7 8 #ifdef DARWIN 9 #include <netinet/tcp.h> 10 #include <netinet/ip.h> 11 #include <netinet/if_ether.h> 12 #endif 13 14 #include "mtcp.h" 15 #include "mtcp_api.h" 16 #include "tcp_in.h" 17 #include "tcp_stream.h" 18 #include "tcp_out.h" 19 #include "ip_out.h" 20 #include "eventpoll.h" 21 #include "pipe.h" 22 #include "fhash.h" 23 #include "addr_pool.h" 24 #include "util.h" 25 #include "config.h" 26 #include "debug.h" 27 #include "eventpoll.h" 28 #include "mos_api.h" 29 #include "tcp_rb.h" 30 31 #define MAX(a, b) ((a)>(b)?(a):(b)) 32 #define MIN(a, b) ((a)<(b)?(a):(b)) 33 34 /*----------------------------------------------------------------------------*/ 35 /** Stop monitoring the socket! (function prototype) 36 * @param [in] mctx: mtcp context 37 * @param [in] sock: monitoring stream socket id 38 * @param [in] side: side of monitoring (client side, server side or both) 39 * 40 * This function is now DEPRECATED and is only used within mOS core... 41 */ 42 int 43 mtcp_cb_stop(mctx_t mctx, int sock, int side); 44 /*----------------------------------------------------------------------------*/ 45 /** Reset the connection (send RST packets to both sides) 46 * (We need to decide the API for this.) 47 */ 48 //int 49 //mtcp_cb_reset(mctx_t mctx, int sock, int side); 50 /*----------------------------------------------------------------------------*/ 51 inline mtcp_manager_t 52 GetMTCPManager(mctx_t mctx) 53 { 54 if (!mctx) { 55 errno = EACCES; 56 return NULL; 57 } 58 59 if (mctx->cpu < 0 || mctx->cpu >= num_cpus) { 60 errno = EINVAL; 61 return NULL; 62 } 63 64 if (!g_mtcp[mctx->cpu] || g_mtcp[mctx->cpu]->ctx->done || g_mtcp[mctx->cpu]->ctx->exit) { 65 errno = EPERM; 66 return NULL; 67 } 68 69 return g_mtcp[mctx->cpu]; 70 } 71 /*----------------------------------------------------------------------------*/ 72 static inline int 73 GetSocketError(socket_map_t socket, void *optval, socklen_t *optlen) 74 { 75 tcp_stream *cur_stream; 76 77 if (!socket->stream) { 78 errno = EBADF; 79 return -1; 80 } 81 82 cur_stream = socket->stream; 83 if (cur_stream->state == TCP_ST_CLOSED_RSVD) { 84 if (cur_stream->close_reason == TCP_TIMEDOUT || 85 cur_stream->close_reason == TCP_CONN_FAIL || 86 cur_stream->close_reason == TCP_CONN_LOST) { 87 *(int *)optval = ETIMEDOUT; 88 *optlen = sizeof(int); 89 90 return 0; 91 } 92 } 93 94 if (cur_stream->state == TCP_ST_CLOSE_WAIT || 95 cur_stream->state == TCP_ST_CLOSED_RSVD) { 96 if (cur_stream->close_reason == TCP_RESET) { 97 *(int *)optval = ECONNRESET; 98 *optlen = sizeof(int); 99 100 return 0; 101 } 102 } 103 104 if (cur_stream->state == TCP_ST_SYN_SENT && 105 errno == EINPROGRESS) { 106 *(int *)optval = errno; 107 *optlen = sizeof(int); 108 109 return -1; 110 } 111 112 /* 113 * `base case`: If socket sees no so_error, then 114 * this also means close_reason will always be 115 * TCP_NOT_CLOSED. 116 */ 117 if (cur_stream->close_reason == TCP_NOT_CLOSED) { 118 *(int *)optval = 0; 119 *optlen = sizeof(int); 120 121 return 0; 122 } 123 124 errno = ENOSYS; 125 return -1; 126 } 127 /*----------------------------------------------------------------------------*/ 128 int 129 mtcp_getsockname(mctx_t mctx, int sockid, struct sockaddr *addr, 130 socklen_t *addrlen) 131 { 132 mtcp_manager_t mtcp; 133 socket_map_t socket; 134 135 mtcp = GetMTCPManager(mctx); 136 if (!mtcp) { 137 return -1; 138 } 139 140 if (sockid < 0 || sockid >= g_config.mos->max_concurrency) { 141 TRACE_API("Socket id %d out of range.\n", sockid); 142 errno = EBADF; 143 return -1; 144 } 145 146 socket = &mtcp->smap[sockid]; 147 if (socket->socktype == MOS_SOCK_UNUSED) { 148 TRACE_API("Invalid socket id: %d\n", sockid); 149 errno = EBADF; 150 return -1; 151 } 152 153 if (*addrlen <= 0) { 154 TRACE_API("Invalid addrlen: %d\n", *addrlen); 155 errno = EINVAL; 156 return -1; 157 } 158 159 if (socket->socktype != MOS_SOCK_STREAM_LISTEN && 160 socket->socktype != MOS_SOCK_STREAM) { 161 TRACE_API("Invalid socket id: %d\n", sockid); 162 errno = ENOTSOCK; 163 return -1; 164 } 165 166 *(struct sockaddr_in *)addr = socket->saddr; 167 *addrlen = sizeof(socket->saddr); 168 169 return 0; 170 } 171 /*----------------------------------------------------------------------------*/ 172 int 173 mtcp_getsockopt(mctx_t mctx, int sockid, int level, 174 int optname, void *optval, socklen_t *optlen) 175 { 176 mtcp_manager_t mtcp; 177 socket_map_t socket; 178 179 mtcp = GetMTCPManager(mctx); 180 if (!mtcp) { 181 errno = EACCES; 182 return -1; 183 } 184 185 if (sockid < 0 || sockid >= g_config.mos->max_concurrency) { 186 TRACE_API("Socket id %d out of range.\n", sockid); 187 errno = EBADF; 188 return -1; 189 } 190 191 switch (level) { 192 case SOL_SOCKET: 193 socket = &mtcp->smap[sockid]; 194 if (socket->socktype == MOS_SOCK_UNUSED) { 195 TRACE_API("Invalid socket id: %d\n", sockid); 196 errno = EBADF; 197 return -1; 198 } 199 200 if (socket->socktype != MOS_SOCK_STREAM_LISTEN && 201 socket->socktype != MOS_SOCK_STREAM) { 202 TRACE_API("Invalid socket id: %d\n", sockid); 203 errno = ENOTSOCK; 204 return -1; 205 } 206 207 if (optname == SO_ERROR) { 208 if (socket->socktype == MOS_SOCK_STREAM) { 209 return GetSocketError(socket, optval, optlen); 210 } 211 } 212 break; 213 case SOL_MONSOCKET: 214 /* check if the calling thread is in MOS context */ 215 if (mtcp->ctx->thread != pthread_self()) { 216 errno = EPERM; 217 return -1; 218 } 219 /* 220 * All options will only work for active 221 * monitor stream sockets 222 */ 223 socket = &mtcp->msmap[sockid]; 224 if (socket->socktype != MOS_SOCK_MONITOR_STREAM_ACTIVE) { 225 TRACE_API("Invalid socket id: %d\n", sockid); 226 errno = ENOTSOCK; 227 return -1; 228 } 229 230 switch (optname) { 231 case MOS_FRAGINFO_CLIBUF: 232 return GetFragInfo(socket, MOS_SIDE_CLI, optval, optlen); 233 case MOS_FRAGINFO_SVRBUF: 234 return GetFragInfo(socket, MOS_SIDE_SVR, optval, optlen); 235 case MOS_INFO_CLIBUF: 236 return GetBufInfo(socket, MOS_SIDE_CLI, optval, optlen); 237 case MOS_INFO_SVRBUF: 238 return GetBufInfo(socket, MOS_SIDE_SVR, optval, optlen); 239 case MOS_TCP_STATE_CLI: 240 return GetTCPState(socket->monitor_stream->stream, MOS_SIDE_CLI, 241 optval, optlen); 242 case MOS_TCP_STATE_SVR: 243 return GetTCPState(socket->monitor_stream->stream, MOS_SIDE_SVR, 244 optval, optlen); 245 case MOS_TIMESTAMP: 246 return GetLastTimestamp(socket->monitor_stream->stream, 247 (uint32_t *)optval, 248 optlen); 249 default: 250 TRACE_API("can't recognize the optname=%d\n", optname); 251 assert(0); 252 } 253 break; 254 } 255 errno = ENOSYS; 256 return -1; 257 } 258 /*----------------------------------------------------------------------------*/ 259 int 260 mtcp_setsockopt(mctx_t mctx, int sockid, int level, 261 int optname, const void *optval, socklen_t optlen) 262 { 263 mtcp_manager_t mtcp; 264 socket_map_t socket; 265 tcprb_t *rb; 266 267 mtcp = GetMTCPManager(mctx); 268 if (!mtcp) { 269 errno = EACCES; 270 return -1; 271 } 272 273 if (sockid < 0 || sockid >= g_config.mos->max_concurrency) { 274 TRACE_API("Socket id %d out of range.\n", sockid); 275 errno = EBADF; 276 return -1; 277 } 278 279 switch (level) { 280 case SOL_SOCKET: 281 socket = &mtcp->smap[sockid]; 282 if (socket->socktype == MOS_SOCK_UNUSED) { 283 TRACE_API("Invalid socket id: %d\n", sockid); 284 errno = EBADF; 285 return -1; 286 } 287 288 if (socket->socktype != MOS_SOCK_STREAM_LISTEN && 289 socket->socktype != MOS_SOCK_STREAM) { 290 TRACE_API("Invalid socket id: %d\n", sockid); 291 errno = ENOTSOCK; 292 return -1; 293 } 294 break; 295 case SOL_MONSOCKET: 296 socket = &mtcp->msmap[sockid]; 297 /* 298 * checking of calling thread to be in MOS context is 299 * disabled since both optnames can be called from 300 * `application' context (on passive sockets) 301 */ 302 /* 303 * if (mtcp->ctx->thread != pthread_self()) 304 * return -1; 305 */ 306 307 switch (optname) { 308 case MOS_CLIOVERLAP: 309 rb = (socket->monitor_stream->stream->side == MOS_SIDE_CLI) ? 310 socket->monitor_stream->stream->rcvvar->rcvbuf : 311 socket->monitor_stream->stream->pair_stream->rcvvar->rcvbuf; 312 if (rb == NULL) { 313 errno = EFAULT; 314 return -1; 315 } 316 if (tcprb_setpolicy(rb, *(uint8_t *)optval) < 0) { 317 errno = EINVAL; 318 return -1; 319 } else 320 return 0; 321 break; 322 case MOS_SVROVERLAP: 323 rb = (socket->monitor_stream->stream->side == MOS_SIDE_SVR) ? 324 socket->monitor_stream->stream->rcvvar->rcvbuf : 325 socket->monitor_stream->stream->pair_stream->rcvvar->rcvbuf; 326 if (rb == NULL) { 327 errno = EFAULT; 328 return -1; 329 } 330 if (tcprb_setpolicy(rb, *(uint8_t *)optval) < 0) { 331 errno = EINVAL; 332 return -1; 333 } else 334 return 0; 335 break; 336 case MOS_CLIBUF: 337 #if 0 338 if (socket->socktype != MOS_SOCK_MONITOR_STREAM_ACTIVE) { 339 errno = EBADF; 340 return -1; 341 } 342 #endif 343 #ifdef DISABLE_DYN_RESIZE 344 if (*(int *)optval != 0) 345 return -1; 346 if (socket->socktype == MOS_SOCK_MONITOR_STREAM_ACTIVE) { 347 rb = (socket->monitor_stream->stream->side == MOS_SIDE_CLI) ? 348 socket->monitor_stream->stream->rcvvar->rcvbuf : 349 socket->monitor_stream->stream->pair_stream->rcvvar->rcvbuf; 350 if (rb) { 351 tcprb_resize_meta(rb, 0); 352 tcprb_resize(rb, 0); 353 } 354 } 355 return DisableBuf(socket, MOS_SIDE_CLI); 356 #else 357 rb = (socket->monitor_stream->stream->side == MOS_SIDE_CLI) ? 358 socket->monitor_stream->stream->rcvvar->rcvbuf : 359 socket->monitor_stream->stream->pair_stream->rcvvar->rcvbuf; 360 if (tcprb_resize_meta(rb, *(int *)optval) < 0) 361 return -1; 362 return tcprb_resize(rb, 363 (((int)rb->metalen - 1) / UNITBUFSIZE + 1) * UNITBUFSIZE); 364 #endif 365 case MOS_SVRBUF: 366 #if 0 367 if (socket->socktype != MOS_SOCK_MONITOR_STREAM_ACTIVE) { 368 errno = EBADF; 369 return -1; 370 } 371 #endif 372 #ifdef DISABLE_DYN_RESIZE 373 if (*(int *)optval != 0) 374 return -1; 375 if (socket->socktype == MOS_SOCK_MONITOR_STREAM_ACTIVE) { 376 rb = (socket->monitor_stream->stream->side == MOS_SIDE_SVR) ? 377 socket->monitor_stream->stream->rcvvar->rcvbuf : 378 socket->monitor_stream->stream->pair_stream->rcvvar->rcvbuf; 379 if (rb) { 380 tcprb_resize_meta(rb, 0); 381 tcprb_resize(rb, 0); 382 } 383 } 384 return DisableBuf(socket, MOS_SIDE_SVR); 385 #else 386 rb = (socket->monitor_stream->stream->side == MOS_SIDE_SVR) ? 387 socket->monitor_stream->stream->rcvvar->rcvbuf : 388 socket->monitor_stream->stream->pair_stream->rcvvar->rcvbuf; 389 if (tcprb_resize_meta(rb, *(int *)optval) < 0) 390 return -1; 391 return tcprb_resize(rb, 392 (((int)rb->metalen - 1) / UNITBUFSIZE + 1) * UNITBUFSIZE); 393 #endif 394 case MOS_FRAG_CLIBUF: 395 #if 0 396 if (socket->socktype != MOS_SOCK_MONITOR_STREAM_ACTIVE) { 397 errno = EBADF; 398 return -1; 399 } 400 #endif 401 #ifdef DISABLE_DYN_RESIZE 402 if (*(int *)optval != 0) 403 return -1; 404 if (socket->socktype == MOS_SOCK_MONITOR_STREAM_ACTIVE) { 405 rb = (socket->monitor_stream->stream->side == MOS_SIDE_CLI) ? 406 socket->monitor_stream->stream->rcvvar->rcvbuf : 407 socket->monitor_stream->stream->pair_stream->rcvvar->rcvbuf; 408 if (rb) 409 tcprb_resize(rb, 0); 410 } 411 return 0; 412 #else 413 rb = (socket->monitor_stream->stream->side == MOS_SIDE_CLI) ? 414 socket->monitor_stream->stream->rcvvar->rcvbuf : 415 socket->monitor_stream->stream->pair_stream->rcvvar->rcvbuf; 416 if (rb->len == 0) 417 return tcprb_resize_meta(rb, *(int *)optval); 418 else 419 return -1; 420 #endif 421 case MOS_FRAG_SVRBUF: 422 #if 0 423 if (socket->socktype != MOS_SOCK_MONITOR_STREAM_ACTIVE) { 424 errno = EBADF; 425 return -1; 426 } 427 #endif 428 #ifdef DISABLE_DYN_RESIZE 429 if (*(int *)optval != 0) 430 return -1; 431 if (socket->socktype == MOS_SOCK_MONITOR_STREAM_ACTIVE) { 432 rb = (socket->monitor_stream->stream->side == MOS_SIDE_SVR) ? 433 socket->monitor_stream->stream->rcvvar->rcvbuf : 434 socket->monitor_stream->stream->pair_stream->rcvvar->rcvbuf; 435 if (rb) 436 tcprb_resize(rb, 0); 437 } 438 return 0; 439 #else 440 rb = (socket->monitor_stream->stream->side == MOS_SIDE_SVR) ? 441 socket->monitor_stream->stream->rcvvar->rcvbuf : 442 socket->monitor_stream->stream->pair_stream->rcvvar->rcvbuf; 443 if (rb->len == 0) 444 return tcprb_resize_meta(rb, *(int *)optval); 445 else 446 return -1; 447 #endif 448 case MOS_SEQ_REMAP: 449 break; 450 case MOS_STOP_MON: 451 return mtcp_cb_stop(mctx, sockid, *(int *)optval); 452 default: 453 TRACE_API("invalid optname=%d\n", optname); 454 assert(0); 455 } 456 break; 457 } 458 459 errno = ENOSYS; 460 return -1; 461 } 462 /*----------------------------------------------------------------------------*/ 463 int 464 mtcp_setsock_nonblock(mctx_t mctx, int sockid) 465 { 466 mtcp_manager_t mtcp; 467 468 mtcp = GetMTCPManager(mctx); 469 if (!mtcp) { 470 errno = EACCES; 471 return -1; 472 } 473 474 if (sockid < 0 || sockid >= g_config.mos->max_concurrency) { 475 TRACE_API("Socket id %d out of range.\n", sockid); 476 errno = EBADF; 477 return -1; 478 } 479 480 if (mtcp->smap[sockid].socktype == MOS_SOCK_UNUSED) { 481 TRACE_API("Invalid socket id: %d\n", sockid); 482 errno = EBADF; 483 return -1; 484 } 485 486 mtcp->smap[sockid].opts |= MTCP_NONBLOCK; 487 488 return 0; 489 } 490 /*----------------------------------------------------------------------------*/ 491 int 492 mtcp_ioctl(mctx_t mctx, int sockid, int request, void *argp) 493 { 494 mtcp_manager_t mtcp; 495 socket_map_t socket; 496 497 mtcp = GetMTCPManager(mctx); 498 if (!mtcp) { 499 errno = EACCES; 500 return -1; 501 } 502 503 if (sockid < 0 || sockid >= g_config.mos->max_concurrency) { 504 TRACE_API("Socket id %d out of range.\n", sockid); 505 errno = EBADF; 506 return -1; 507 } 508 509 /* only support stream socket */ 510 socket = &mtcp->smap[sockid]; 511 512 if (socket->socktype != MOS_SOCK_STREAM_LISTEN && 513 socket->socktype != MOS_SOCK_STREAM) { 514 TRACE_API("Invalid socket id: %d\n", sockid); 515 errno = EBADF; 516 return -1; 517 } 518 519 if (!argp) { 520 errno = EFAULT; 521 return -1; 522 } 523 524 if (request == FIONREAD) { 525 tcp_stream *cur_stream; 526 tcprb_t *rbuf; 527 528 cur_stream = socket->stream; 529 if (!cur_stream) { 530 errno = EBADF; 531 return -1; 532 } 533 534 rbuf = cur_stream->rcvvar->rcvbuf; 535 *(int *)argp = (rbuf) ? tcprb_cflen(rbuf) : 0; 536 537 } else if (request == FIONBIO) { 538 /* 539 * sockets can only be set to blocking/non-blocking 540 * modes during initialization 541 */ 542 if ((*(int *)argp)) 543 mtcp->smap[sockid].opts |= MTCP_NONBLOCK; 544 else 545 mtcp->smap[sockid].opts &= ~MTCP_NONBLOCK; 546 } else { 547 errno = EINVAL; 548 return -1; 549 } 550 551 return 0; 552 } 553 /*----------------------------------------------------------------------------*/ 554 static int 555 mtcp_monitor(mctx_t mctx, socket_map_t sock) 556 { 557 mtcp_manager_t mtcp; 558 struct mon_listener *monitor; 559 int sockid = sock->id; 560 561 mtcp = GetMTCPManager(mctx); 562 if (!mtcp) { 563 errno = EACCES; 564 return -1; 565 } 566 567 if (sockid < 0 || sockid >= g_config.mos->max_concurrency) { 568 TRACE_API("Socket id %d out of range.\n", sockid); 569 errno = EBADF; 570 return -1; 571 } 572 573 if (mtcp->msmap[sockid].socktype == MOS_SOCK_UNUSED) { 574 TRACE_API("Invalid socket id: %d\n", sockid); 575 errno = EBADF; 576 return -1; 577 } 578 579 if (!(mtcp->msmap[sockid].socktype == MOS_SOCK_MONITOR_STREAM || 580 mtcp->msmap[sockid].socktype == MOS_SOCK_MONITOR_RAW)) { 581 TRACE_API("Not a monitor socket. id: %d\n", sockid); 582 errno = ENOTSOCK; 583 return -1; 584 } 585 586 monitor = (struct mon_listener *)calloc(1, sizeof(struct mon_listener)); 587 if (!monitor) { 588 /* errno set from the malloc() */ 589 errno = ENOMEM; 590 return -1; 591 } 592 593 /* create a monitor-specific event queue */ 594 monitor->eq = CreateEventQueue(g_config.mos->max_concurrency); 595 if (!monitor->eq) { 596 TRACE_API("Can't create event queue (concurrency: %d) for " 597 "monitor read event registrations!\n", 598 g_config.mos->max_concurrency); 599 free(monitor); 600 errno = ENOMEM; 601 return -1; 602 } 603 604 /* set monitor-related basic parameters */ 605 #ifndef NEWEV 606 monitor->ude_id = UDE_OFFSET; 607 #endif 608 monitor->socket = sock; 609 monitor->client_buf_mgmt = monitor->server_buf_mgmt = BUFMGMT_FULL; 610 611 /* perform both sides monitoring by default */ 612 monitor->client_mon = monitor->server_mon = 1; 613 614 /* add monitor socket to the monitor list */ 615 TAILQ_INSERT_TAIL(&mtcp->monitors, monitor, link); 616 617 mtcp->msmap[sockid].monitor_listener = monitor; 618 619 return 0; 620 } 621 /*----------------------------------------------------------------------------*/ 622 int 623 mtcp_socket(mctx_t mctx, int domain, int type, int protocol) 624 { 625 mtcp_manager_t mtcp; 626 socket_map_t socket; 627 628 mtcp = GetMTCPManager(mctx); 629 if (!mtcp) { 630 errno = EACCES; 631 return -1; 632 } 633 634 if (domain != AF_INET) { 635 errno = EAFNOSUPPORT; 636 return -1; 637 } 638 639 if (type == (int)SOCK_STREAM) { 640 type = MOS_SOCK_STREAM; 641 } else if (type == MOS_SOCK_MONITOR_STREAM || 642 type == MOS_SOCK_MONITOR_RAW) { 643 /* do nothing for the time being */ 644 } else { 645 /* Not supported type */ 646 errno = EINVAL; 647 return -1; 648 } 649 650 socket = AllocateSocket(mctx, type); 651 if (!socket) { 652 errno = ENFILE; 653 return -1; 654 } 655 656 if (type == MOS_SOCK_MONITOR_STREAM || 657 type == MOS_SOCK_MONITOR_RAW) { 658 mtcp_manager_t mtcp = GetMTCPManager(mctx); 659 if (!mtcp) { 660 errno = EACCES; 661 return -1; 662 } 663 mtcp_monitor(mctx, socket); 664 #ifdef NEWEV 665 socket->monitor_listener->stree_dontcare = NULL; 666 socket->monitor_listener->stree_pre_rcv = NULL; 667 socket->monitor_listener->stree_post_snd = NULL; 668 #else 669 InitEvB(mtcp, &socket->monitor_listener->dontcare_evb); 670 InitEvB(mtcp, &socket->monitor_listener->pre_tcp_evb); 671 InitEvB(mtcp, &socket->monitor_listener->post_tcp_evb); 672 #endif 673 } 674 675 return socket->id; 676 } 677 /*----------------------------------------------------------------------------*/ 678 int 679 mtcp_bind(mctx_t mctx, int sockid, 680 const struct sockaddr *addr, socklen_t addrlen) 681 { 682 mtcp_manager_t mtcp; 683 struct sockaddr_in *addr_in; 684 685 mtcp = GetMTCPManager(mctx); 686 if (!mtcp) { 687 errno = EACCES; 688 return -1; 689 } 690 691 if (sockid < 0 || sockid >= g_config.mos->max_concurrency) { 692 TRACE_API("Socket id %d out of range.\n", sockid); 693 errno = EBADF; 694 return -1; 695 } 696 697 if (mtcp->smap[sockid].socktype == MOS_SOCK_UNUSED) { 698 TRACE_API("Invalid socket id: %d\n", sockid); 699 errno = EBADF; 700 return -1; 701 } 702 703 if (mtcp->smap[sockid].socktype != MOS_SOCK_STREAM && 704 mtcp->smap[sockid].socktype != MOS_SOCK_STREAM_LISTEN) { 705 TRACE_API("Not a stream socket id: %d\n", sockid); 706 errno = ENOTSOCK; 707 return -1; 708 } 709 710 if (!addr) { 711 TRACE_API("Socket %d: empty address!\n", sockid); 712 errno = EINVAL; 713 return -1; 714 } 715 716 if (mtcp->smap[sockid].opts & MTCP_ADDR_BIND) { 717 TRACE_API("Socket %d: adress already bind for this socket.\n", sockid); 718 errno = EINVAL; 719 return -1; 720 } 721 722 /* we only allow bind() for AF_INET address */ 723 if (addr->sa_family != AF_INET || addrlen < sizeof(struct sockaddr_in)) { 724 TRACE_API("Socket %d: invalid argument!\n", sockid); 725 errno = EINVAL; 726 return -1; 727 } 728 729 if (mtcp->listener) { 730 TRACE_API("Address already bound!\n"); 731 errno = EINVAL; 732 return -1; 733 } 734 addr_in = (struct sockaddr_in *)addr; 735 mtcp->smap[sockid].saddr = *addr_in; 736 mtcp->smap[sockid].opts |= MTCP_ADDR_BIND; 737 738 return 0; 739 } 740 /*----------------------------------------------------------------------------*/ 741 int 742 mtcp_listen(mctx_t mctx, int sockid, int backlog) 743 { 744 mtcp_manager_t mtcp; 745 struct tcp_listener *listener; 746 747 mtcp = GetMTCPManager(mctx); 748 if (!mtcp) { 749 errno = EACCES; 750 return -1; 751 } 752 753 if (sockid < 0 || sockid >= g_config.mos->max_concurrency) { 754 TRACE_API("Socket id %d out of range.\n", sockid); 755 errno = EBADF; 756 return -1; 757 } 758 759 if (mtcp->smap[sockid].socktype == MOS_SOCK_UNUSED) { 760 TRACE_API("Invalid socket id: %d\n", sockid); 761 errno = EBADF; 762 return -1; 763 } 764 765 if (mtcp->smap[sockid].socktype == MOS_SOCK_STREAM) { 766 mtcp->smap[sockid].socktype = MOS_SOCK_STREAM_LISTEN; 767 } 768 769 if (mtcp->smap[sockid].socktype != MOS_SOCK_STREAM_LISTEN) { 770 TRACE_API("Not a listening socket. id: %d\n", sockid); 771 errno = ENOTSOCK; 772 return -1; 773 } 774 775 if (backlog <= 0 || backlog > g_config.mos->max_concurrency) { 776 errno = EINVAL; 777 return -1; 778 } 779 780 listener = (struct tcp_listener *)calloc(1, sizeof(struct tcp_listener)); 781 if (!listener) { 782 /* errno set from the malloc() */ 783 errno = ENOMEM; 784 return -1; 785 } 786 787 listener->sockid = sockid; 788 listener->backlog = backlog; 789 listener->socket = &mtcp->smap[sockid]; 790 791 if (pthread_cond_init(&listener->accept_cond, NULL)) { 792 perror("pthread_cond_init of ctx->accept_cond\n"); 793 /* errno set by pthread_cond_init() */ 794 free(listener); 795 return -1; 796 } 797 if (pthread_mutex_init(&listener->accept_lock, NULL)) { 798 perror("pthread_mutex_init of ctx->accept_lock\n"); 799 /* errno set by pthread_mutex_init() */ 800 free(listener); 801 return -1; 802 } 803 804 listener->acceptq = CreateStreamQueue(backlog); 805 if (!listener->acceptq) { 806 free(listener); 807 errno = ENOMEM; 808 return -1; 809 } 810 811 mtcp->smap[sockid].listener = listener; 812 mtcp->listener = listener; 813 814 return 0; 815 } 816 /*----------------------------------------------------------------------------*/ 817 int 818 mtcp_accept(mctx_t mctx, int sockid, struct sockaddr *addr, socklen_t *addrlen) 819 { 820 mtcp_manager_t mtcp; 821 struct tcp_listener *listener; 822 socket_map_t socket; 823 tcp_stream *accepted = NULL; 824 825 mtcp = GetMTCPManager(mctx); 826 if (!mtcp) { 827 errno = EACCES; 828 return -1; 829 } 830 831 if (sockid < 0 || sockid >= g_config.mos->max_concurrency) { 832 TRACE_API("Socket id %d out of range.\n", sockid); 833 errno = EBADF; 834 return -1; 835 } 836 837 /* requires listening socket */ 838 if (mtcp->smap[sockid].socktype != MOS_SOCK_STREAM_LISTEN) { 839 errno = EINVAL; 840 return -1; 841 } 842 843 listener = mtcp->smap[sockid].listener; 844 845 /* dequeue from the acceptq without lock first */ 846 /* if nothing there, acquire lock and cond_wait */ 847 accepted = StreamDequeue(listener->acceptq); 848 if (!accepted) { 849 if (listener->socket->opts & MTCP_NONBLOCK) { 850 errno = EAGAIN; 851 return -1; 852 853 } else { 854 pthread_mutex_lock(&listener->accept_lock); 855 while ((accepted = StreamDequeue(listener->acceptq)) == NULL) { 856 pthread_cond_wait(&listener->accept_cond, &listener->accept_lock); 857 858 if (mtcp->ctx->done || mtcp->ctx->exit) { 859 pthread_mutex_unlock(&listener->accept_lock); 860 errno = EINTR; 861 return -1; 862 } 863 } 864 pthread_mutex_unlock(&listener->accept_lock); 865 } 866 } 867 868 if (!accepted) { 869 TRACE_ERROR("[NEVER HAPPEN] Empty accept queue!\n"); 870 } 871 872 if (!accepted->socket) { 873 socket = AllocateSocket(mctx, MOS_SOCK_STREAM); 874 if (!socket) { 875 TRACE_ERROR("Failed to create new socket!\n"); 876 /* TODO: destroy the stream */ 877 errno = ENFILE; 878 return -1; 879 } 880 socket->stream = accepted; 881 accepted->socket = socket; 882 883 /* set socket addr parameters */ 884 socket->saddr.sin_family = AF_INET; 885 socket->saddr.sin_port = accepted->dport; 886 socket->saddr.sin_addr.s_addr = accepted->daddr; 887 888 /* if monitor is enabled, complete the socket assignment */ 889 if (socket->stream->pair_stream != NULL) 890 socket->stream->pair_stream->socket = socket; 891 } 892 893 if (!(listener->socket->epoll & MOS_EPOLLET) && 894 !StreamQueueIsEmpty(listener->acceptq)) 895 AddEpollEvent(mtcp->ep, 896 USR_SHADOW_EVENT_QUEUE, 897 listener->socket, MOS_EPOLLIN); 898 899 TRACE_API("Stream %d accepted.\n", accepted->id); 900 901 if (addr && addrlen) { 902 struct sockaddr_in *addr_in = (struct sockaddr_in *)addr; 903 addr_in->sin_family = AF_INET; 904 addr_in->sin_port = accepted->dport; 905 addr_in->sin_addr.s_addr = accepted->daddr; 906 *addrlen = sizeof(struct sockaddr_in); 907 } 908 909 return accepted->socket->id; 910 } 911 /*----------------------------------------------------------------------------*/ 912 int 913 mtcp_init_rss(mctx_t mctx, in_addr_t saddr_base, int num_addr, 914 in_addr_t daddr, in_addr_t dport) 915 { 916 mtcp_manager_t mtcp; 917 addr_pool_t ap; 918 919 mtcp = GetMTCPManager(mctx); 920 if (!mtcp) { 921 errno = EACCES; 922 return -1; 923 } 924 925 if (saddr_base == INADDR_ANY) { 926 int nif_out; 927 928 /* for the INADDR_ANY, find the output interface for the destination 929 and set the saddr_base as the ip address of the output interface */ 930 nif_out = GetOutputInterface(daddr); 931 if (nif_out < 0) { 932 TRACE_DBG("Could not determine nif idx!\n"); 933 errno = EINVAL; 934 return -1; 935 } 936 saddr_base = g_config.mos->netdev_table->ent[nif_out]->ip_addr; 937 } 938 939 ap = CreateAddressPoolPerCore(mctx->cpu, num_cpus, 940 saddr_base, num_addr, daddr, dport); 941 if (!ap) { 942 errno = ENOMEM; 943 return -1; 944 } 945 946 mtcp->ap = ap; 947 948 return 0; 949 } 950 /*----------------------------------------------------------------------------*/ 951 int 952 eval_bpf_5tuple(struct sfbpf_program fcode, 953 in_addr_t saddr, in_port_t sport, 954 in_addr_t daddr, in_port_t dport) { 955 uint8_t buf[TOTAL_TCP_HEADER_LEN]; 956 struct ethhdr *ethh; 957 struct iphdr *iph; 958 struct tcphdr *tcph; 959 960 ethh = (struct ethhdr *)buf; 961 ethh->h_proto = htons(ETH_P_IP); 962 iph = (struct iphdr *)(ethh + 1); 963 iph->ihl = IP_HEADER_LEN >> 2; 964 iph->version = 4; 965 iph->tos = 0; 966 iph->tot_len = htons(IP_HEADER_LEN + TCP_HEADER_LEN); 967 iph->id = htons(0); 968 iph->protocol = IPPROTO_TCP; 969 iph->saddr = saddr; 970 iph->daddr = daddr; 971 iph->check = 0; 972 tcph = (struct tcphdr *)(iph + 1); 973 tcph->source = sport; 974 tcph->dest = dport; 975 976 return EVAL_BPFFILTER(fcode, (uint8_t *)iph - sizeof(struct ethhdr), 977 TOTAL_TCP_HEADER_LEN); 978 } 979 /*----------------------------------------------------------------------------*/ 980 int 981 mtcp_connect(mctx_t mctx, int sockid, 982 const struct sockaddr *addr, socklen_t addrlen) 983 { 984 mtcp_manager_t mtcp; 985 socket_map_t socket; 986 tcp_stream *cur_stream; 987 struct sockaddr_in *addr_in; 988 in_addr_t dip; 989 in_port_t dport; 990 int is_dyn_bound = FALSE; 991 int ret, nif, endian_check; 992 int cnt_match = 0; 993 struct mon_listener *walk; 994 struct sfbpf_program fcode; 995 996 cur_stream = NULL; 997 mtcp = GetMTCPManager(mctx); 998 if (!mtcp) { 999 errno = EACCES; 1000 return -1; 1001 } 1002 1003 if (sockid < 0 || sockid >= g_config.mos->max_concurrency) { 1004 TRACE_API("Socket id %d out of range.\n", sockid); 1005 errno = EBADF; 1006 return -1; 1007 } 1008 1009 if (mtcp->smap[sockid].socktype == MOS_SOCK_UNUSED) { 1010 TRACE_API("Invalid socket id: %d\n", sockid); 1011 errno = EBADF; 1012 return -1; 1013 } 1014 1015 if (mtcp->smap[sockid].socktype != MOS_SOCK_STREAM) { 1016 TRACE_API("Not an end socket. id: %d\n", sockid); 1017 errno = ENOTSOCK; 1018 return -1; 1019 } 1020 1021 if (!addr) { 1022 TRACE_API("Socket %d: empty address!\n", sockid); 1023 errno = EFAULT; 1024 return -1; 1025 } 1026 1027 /* we only allow bind() for AF_INET address */ 1028 if (addr->sa_family != AF_INET || addrlen < sizeof(struct sockaddr_in)) { 1029 TRACE_API("Socket %d: invalid argument!\n", sockid); 1030 errno = EAFNOSUPPORT; 1031 return -1; 1032 } 1033 1034 socket = &mtcp->smap[sockid]; 1035 if (socket->stream) { 1036 TRACE_API("Socket %d: stream already exist!\n", sockid); 1037 if (socket->stream->state >= TCP_ST_ESTABLISHED) { 1038 errno = EISCONN; 1039 } else { 1040 errno = EALREADY; 1041 } 1042 return -1; 1043 } 1044 1045 addr_in = (struct sockaddr_in *)addr; 1046 dip = addr_in->sin_addr.s_addr; 1047 dport = addr_in->sin_port; 1048 1049 /* address binding */ 1050 if (socket->opts & MTCP_ADDR_BIND && 1051 socket->saddr.sin_port != INPORT_ANY && 1052 socket->saddr.sin_addr.s_addr != INADDR_ANY) { 1053 int rss_core; 1054 1055 endian_check = FetchEndianType(); 1056 rss_core = GetRSSCPUCore(socket->saddr.sin_addr.s_addr, dip, 1057 socket->saddr.sin_port, dport, num_queues, 1058 endian_check); 1059 1060 if (rss_core != mctx->cpu) { 1061 errno = EINVAL; 1062 return -1; 1063 } 1064 } else { 1065 if (mtcp->ap) { 1066 ret = FetchAddress(mtcp->ap, 1067 mctx->cpu, num_queues, addr_in, &socket->saddr); 1068 } else { 1069 nif = GetOutputInterface(dip); 1070 if (nif < 0) { 1071 errno = EINVAL; 1072 return -1; 1073 } 1074 ret = FetchAddress(ap[nif], 1075 mctx->cpu, num_queues, addr_in, &socket->saddr); 1076 } 1077 if (ret < 0) { 1078 errno = EAGAIN; 1079 return -1; 1080 } 1081 socket->opts |= MTCP_ADDR_BIND; 1082 is_dyn_bound = TRUE; 1083 } 1084 1085 cnt_match = 0; 1086 if (mtcp->num_msp > 0) { 1087 TAILQ_FOREACH(walk, &mtcp->monitors, link) { 1088 fcode = walk->stream_syn_fcode; 1089 if (!(ISSET_BPFFILTER(fcode) && 1090 eval_bpf_5tuple(fcode, socket->saddr.sin_addr.s_addr, 1091 socket->saddr.sin_port, 1092 dip, dport) == 0)) { 1093 walk->is_stream_syn_filter_hit = 1; // set the 'filter hit' flag to 1 1094 cnt_match++; 1095 } 1096 } 1097 } 1098 1099 if (mtcp->num_msp > 0 && cnt_match > 0) { 1100 /* 150820 dhkim: XXX: embedded mode is not verified */ 1101 #if 1 1102 cur_stream = CreateClientTCPStream(mtcp, socket, 1103 STREAM_TYPE(MOS_SOCK_STREAM) | 1104 STREAM_TYPE(MOS_SOCK_MONITOR_STREAM_ACTIVE), 1105 socket->saddr.sin_addr.s_addr, 1106 socket->saddr.sin_port, dip, dport, NULL); 1107 #else 1108 cur_stream = CreateDualTCPStream(mtcp, socket, 1109 STREAM_TYPE(MOS_SOCK_STREAM) | 1110 STREAM_TYPE(MOS_SOCK_MONITOR_STREAM_ACTIVE), 1111 socket->saddr.sin_addr.s_addr, 1112 socket->saddr.sin_port, dip, dport, NULL); 1113 #endif 1114 } 1115 else 1116 cur_stream = CreateTCPStream(mtcp, socket, STREAM_TYPE(MOS_SOCK_STREAM), 1117 socket->saddr.sin_addr.s_addr, 1118 socket->saddr.sin_port, dip, dport, NULL); 1119 if (!cur_stream) { 1120 TRACE_ERROR("Socket %d: failed to create tcp_stream!\n", sockid); 1121 errno = ENOMEM; 1122 return -1; 1123 } 1124 1125 if (is_dyn_bound) 1126 cur_stream->is_bound_addr = TRUE; 1127 cur_stream->sndvar->cwnd = 1; 1128 cur_stream->sndvar->ssthresh = cur_stream->sndvar->mss * 10; 1129 cur_stream->side = MOS_SIDE_CLI; 1130 /* if monitor is enabled, update the pair stream side as well */ 1131 if (cur_stream->pair_stream) { 1132 cur_stream->pair_stream->side = MOS_SIDE_SVR; 1133 /* 1134 * if buffer management is off, then disable 1135 * monitoring tcp ring of server... 1136 * if there is even a single monitor asking for 1137 * buffer management, enable it (that's why the 1138 * need for the loop) 1139 */ 1140 cur_stream->pair_stream->buffer_mgmt = BUFMGMT_OFF; 1141 struct socket_map *walk; 1142 SOCKQ_FOREACH_START(walk, &cur_stream->msocks) { 1143 uint8_t bm = walk->monitor_stream->monitor_listener->server_buf_mgmt; 1144 if (bm > cur_stream->pair_stream->buffer_mgmt) { 1145 cur_stream->pair_stream->buffer_mgmt = bm; 1146 break; 1147 } 1148 } SOCKQ_FOREACH_END; 1149 } 1150 1151 cur_stream->state = TCP_ST_SYN_SENT; 1152 cur_stream->cb_events |= MOS_ON_TCP_STATE_CHANGE; 1153 1154 TRACE_STATE("Stream %d: TCP_ST_SYN_SENT\n", cur_stream->id); 1155 1156 SQ_LOCK(&mtcp->ctx->connect_lock); 1157 ret = StreamEnqueue(mtcp->connectq, cur_stream); 1158 SQ_UNLOCK(&mtcp->ctx->connect_lock); 1159 mtcp->wakeup_flag = TRUE; 1160 if (ret < 0) { 1161 TRACE_ERROR("Socket %d: failed to enqueue to conenct queue!\n", sockid); 1162 SQ_LOCK(&mtcp->ctx->destroyq_lock); 1163 StreamEnqueue(mtcp->destroyq, cur_stream); 1164 SQ_UNLOCK(&mtcp->ctx->destroyq_lock); 1165 errno = EAGAIN; 1166 return -1; 1167 } 1168 1169 /* if nonblocking socket, return EINPROGRESS */ 1170 if (socket->opts & MTCP_NONBLOCK) { 1171 errno = EINPROGRESS; 1172 return -1; 1173 1174 } else { 1175 while (1) { 1176 if (!cur_stream) { 1177 TRACE_ERROR("STREAM DESTROYED\n"); 1178 errno = ETIMEDOUT; 1179 return -1; 1180 } 1181 if (cur_stream->state > TCP_ST_ESTABLISHED) { 1182 TRACE_ERROR("Socket %d: weird state %s\n", 1183 sockid, TCPStateToString(cur_stream)); 1184 // TODO: how to handle this? 1185 errno = ENOSYS; 1186 return -1; 1187 } 1188 1189 if (cur_stream->state == TCP_ST_ESTABLISHED) { 1190 break; 1191 } 1192 usleep(1000); 1193 } 1194 } 1195 1196 return 0; 1197 } 1198 /*----------------------------------------------------------------------------*/ 1199 static inline int 1200 CloseStreamSocket(mctx_t mctx, int sockid) 1201 { 1202 mtcp_manager_t mtcp; 1203 tcp_stream *cur_stream; 1204 int ret; 1205 1206 mtcp = GetMTCPManager(mctx); 1207 if (!mtcp) { 1208 errno = EACCES; 1209 return -1; 1210 } 1211 1212 cur_stream = mtcp->smap[sockid].stream; 1213 if (!cur_stream) { 1214 TRACE_API("Socket %d: stream does not exist.\n", sockid); 1215 errno = ENOTCONN; 1216 return -1; 1217 } 1218 1219 if (cur_stream->closed) { 1220 TRACE_API("Socket %d (Stream %u): already closed stream\n", 1221 sockid, cur_stream->id); 1222 return 0; 1223 } 1224 cur_stream->closed = TRUE; 1225 1226 TRACE_API("Stream %d: closing the stream.\n", cur_stream->id); 1227 1228 /* 141029 dhkim: Check this! */ 1229 cur_stream->socket = NULL; 1230 1231 if (cur_stream->state == TCP_ST_CLOSED_RSVD) { 1232 TRACE_API("Stream %d at TCP_ST_CLOSED_RSVD. destroying the stream.\n", 1233 cur_stream->id); 1234 SQ_LOCK(&mtcp->ctx->destroyq_lock); 1235 StreamEnqueue(mtcp->destroyq, cur_stream); 1236 mtcp->wakeup_flag = TRUE; 1237 SQ_UNLOCK(&mtcp->ctx->destroyq_lock); 1238 return 0; 1239 1240 } else if (cur_stream->state == TCP_ST_SYN_SENT) { 1241 #if 1 1242 SQ_LOCK(&mtcp->ctx->destroyq_lock); 1243 StreamEnqueue(mtcp->destroyq, cur_stream); 1244 SQ_UNLOCK(&mtcp->ctx->destroyq_lock); 1245 mtcp->wakeup_flag = TRUE; 1246 #endif 1247 return -1; 1248 1249 } else if (cur_stream->state != TCP_ST_ESTABLISHED && 1250 cur_stream->state != TCP_ST_CLOSE_WAIT) { 1251 TRACE_API("Stream %d at state %s\n", 1252 cur_stream->id, TCPStateToString(cur_stream)); 1253 errno = EBADF; 1254 return -1; 1255 } 1256 1257 SQ_LOCK(&mtcp->ctx->close_lock); 1258 cur_stream->sndvar->on_closeq = TRUE; 1259 ret = StreamEnqueue(mtcp->closeq, cur_stream); 1260 mtcp->wakeup_flag = TRUE; 1261 SQ_UNLOCK(&mtcp->ctx->close_lock); 1262 1263 if (ret < 0) { 1264 TRACE_ERROR("(NEVER HAPPEN) Failed to enqueue the stream to close.\n"); 1265 errno = EAGAIN; 1266 return -1; 1267 } 1268 1269 return 0; 1270 } 1271 /*----------------------------------------------------------------------------*/ 1272 static inline int 1273 CloseListeningSocket(mctx_t mctx, int sockid) 1274 { 1275 mtcp_manager_t mtcp; 1276 struct tcp_listener *listener; 1277 1278 mtcp = GetMTCPManager(mctx); 1279 if (!mtcp) { 1280 errno = EACCES; 1281 return -1; 1282 } 1283 1284 listener = mtcp->smap[sockid].listener; 1285 if (!listener) { 1286 errno = EINVAL; 1287 return -1; 1288 } 1289 1290 if (listener->acceptq) { 1291 DestroyStreamQueue(listener->acceptq); 1292 listener->acceptq = NULL; 1293 } 1294 1295 pthread_mutex_lock(&listener->accept_lock); 1296 pthread_cond_signal(&listener->accept_cond); 1297 pthread_mutex_unlock(&listener->accept_lock); 1298 1299 pthread_cond_destroy(&listener->accept_cond); 1300 pthread_mutex_destroy(&listener->accept_lock); 1301 1302 free(listener); 1303 mtcp->smap[sockid].listener = NULL; 1304 1305 return 0; 1306 } 1307 /*----------------------------------------------------------------------------*/ 1308 int 1309 mtcp_close(mctx_t mctx, int sockid) 1310 { 1311 mtcp_manager_t mtcp; 1312 int ret; 1313 1314 mtcp = GetMTCPManager(mctx); 1315 if (!mtcp) { 1316 errno = EACCES; 1317 return -1; 1318 } 1319 1320 if (sockid < 0 || sockid >= g_config.mos->max_concurrency) { 1321 TRACE_API("Socket id %d out of range.\n", sockid); 1322 errno = EBADF; 1323 return -1; 1324 } 1325 1326 if (mtcp->smap[sockid].socktype == MOS_SOCK_UNUSED) { 1327 TRACE_API("Invalid socket id: %d\n", sockid); 1328 errno = EBADF; 1329 return -1; 1330 } 1331 1332 TRACE_API("Socket %d: mtcp_close called.\n", sockid); 1333 1334 switch (mtcp->smap[sockid].socktype) { 1335 case MOS_SOCK_STREAM: 1336 ret = CloseStreamSocket(mctx, sockid); 1337 break; 1338 1339 case MOS_SOCK_STREAM_LISTEN: 1340 ret = CloseListeningSocket(mctx, sockid); 1341 break; 1342 1343 case MOS_SOCK_EPOLL: 1344 ret = CloseEpollSocket(mctx, sockid); 1345 break; 1346 1347 case MOS_SOCK_PIPE: 1348 ret = PipeClose(mctx, sockid); 1349 break; 1350 1351 default: 1352 errno = EINVAL; 1353 ret = -1; 1354 break; 1355 } 1356 1357 FreeSocket(mctx, sockid, mtcp->smap[sockid].socktype); 1358 1359 return ret; 1360 } 1361 /*----------------------------------------------------------------------------*/ 1362 int 1363 mtcp_abort(mctx_t mctx, int sockid) 1364 { 1365 mtcp_manager_t mtcp; 1366 tcp_stream *cur_stream; 1367 int ret; 1368 1369 mtcp = GetMTCPManager(mctx); 1370 if (!mtcp) { 1371 errno = EACCES; 1372 return -1; 1373 } 1374 1375 if (sockid < 0 || sockid >= g_config.mos->max_concurrency) { 1376 TRACE_API("Socket id %d out of range.\n", sockid); 1377 errno = EBADF; 1378 return -1; 1379 } 1380 1381 if (mtcp->smap[sockid].socktype == MOS_SOCK_UNUSED) { 1382 TRACE_API("Invalid socket id: %d\n", sockid); 1383 errno = EBADF; 1384 return -1; 1385 } 1386 1387 if (mtcp->smap[sockid].socktype != MOS_SOCK_STREAM) { 1388 TRACE_API("Not an end socket. id: %d\n", sockid); 1389 errno = ENOTSOCK; 1390 return -1; 1391 } 1392 1393 cur_stream = mtcp->smap[sockid].stream; 1394 if (!cur_stream) { 1395 TRACE_API("Stream %d: does not exist.\n", sockid); 1396 errno = ENOTCONN; 1397 return -1; 1398 } 1399 1400 TRACE_API("Socket %d: mtcp_abort()\n", sockid); 1401 1402 FreeSocket(mctx, sockid, mtcp->smap[sockid].socktype); 1403 cur_stream->socket = NULL; 1404 1405 if (cur_stream->state == TCP_ST_CLOSED_RSVD) { 1406 TRACE_API("Stream %d: connection already reset.\n", sockid); 1407 return ERROR; 1408 1409 } else if (cur_stream->state == TCP_ST_SYN_SENT) { 1410 /* TODO: this should notify event failure to all 1411 previous read() or write() calls */ 1412 cur_stream->state = TCP_ST_CLOSED_RSVD; 1413 cur_stream->close_reason = TCP_ACTIVE_CLOSE; 1414 cur_stream->cb_events |= MOS_ON_TCP_STATE_CHANGE; 1415 SQ_LOCK(&mtcp->ctx->destroyq_lock); 1416 StreamEnqueue(mtcp->destroyq, cur_stream); 1417 SQ_UNLOCK(&mtcp->ctx->destroyq_lock); 1418 mtcp->wakeup_flag = TRUE; 1419 return 0; 1420 1421 } else if (cur_stream->state == TCP_ST_CLOSING || 1422 cur_stream->state == TCP_ST_LAST_ACK || 1423 cur_stream->state == TCP_ST_TIME_WAIT) { 1424 cur_stream->state = TCP_ST_CLOSED_RSVD; 1425 cur_stream->close_reason = TCP_ACTIVE_CLOSE; 1426 cur_stream->cb_events |= MOS_ON_TCP_STATE_CHANGE; 1427 SQ_LOCK(&mtcp->ctx->destroyq_lock); 1428 StreamEnqueue(mtcp->destroyq, cur_stream); 1429 SQ_UNLOCK(&mtcp->ctx->destroyq_lock); 1430 mtcp->wakeup_flag = TRUE; 1431 return 0; 1432 } 1433 1434 /* the stream structure will be destroyed after sending RST */ 1435 if (cur_stream->sndvar->on_resetq) { 1436 TRACE_ERROR("Stream %d: calling mtcp_abort() " 1437 "when in reset queue.\n", sockid); 1438 errno = ECONNRESET; 1439 return -1; 1440 } 1441 SQ_LOCK(&mtcp->ctx->reset_lock); 1442 cur_stream->sndvar->on_resetq = TRUE; 1443 ret = StreamEnqueue(mtcp->resetq, cur_stream); 1444 SQ_UNLOCK(&mtcp->ctx->reset_lock); 1445 mtcp->wakeup_flag = TRUE; 1446 1447 if (ret < 0) { 1448 TRACE_ERROR("(NEVER HAPPEN) Failed to enqueue the stream to close.\n"); 1449 errno = EAGAIN; 1450 return -1; 1451 } 1452 1453 return 0; 1454 } 1455 /*----------------------------------------------------------------------------*/ 1456 static inline int 1457 PeekForUser(mtcp_manager_t mtcp, tcp_stream *cur_stream, char *buf, int len) 1458 { 1459 struct tcp_recv_vars *rcvvar = cur_stream->rcvvar; 1460 int copylen; 1461 tcprb_t *rb = rcvvar->rcvbuf; 1462 1463 if ((copylen = tcprb_ppeek(rb, (uint8_t *)buf, len, rb->pile)) <= 0) { 1464 errno = EAGAIN; 1465 return -1; 1466 } 1467 1468 return copylen; 1469 } 1470 /*----------------------------------------------------------------------------*/ 1471 static inline int 1472 CopyToUser(mtcp_manager_t mtcp, tcp_stream *cur_stream, char *buf, int len) 1473 { 1474 struct tcp_recv_vars *rcvvar = cur_stream->rcvvar; 1475 int copylen; 1476 tcprb_t *rb = rcvvar->rcvbuf; 1477 if ((copylen = tcprb_ppeek(rb, (uint8_t *)buf, len, rb->pile)) <= 0) { 1478 errno = EAGAIN; 1479 return -1; 1480 } 1481 tcprb_setpile(rb, rb->pile + copylen); 1482 1483 rcvvar->rcv_wnd = rb->len - tcprb_cflen(rb); 1484 //printf("rcv_wnd: %d\n", rcvvar->rcv_wnd); 1485 1486 /* Advertise newly freed receive buffer */ 1487 if (cur_stream->need_wnd_adv) { 1488 if (rcvvar->rcv_wnd > cur_stream->sndvar->eff_mss) { 1489 if (!cur_stream->sndvar->on_ackq) { 1490 SQ_LOCK(&mtcp->ctx->ackq_lock); 1491 cur_stream->sndvar->on_ackq = TRUE; 1492 StreamEnqueue(mtcp->ackq, cur_stream); /* this always success */ 1493 SQ_UNLOCK(&mtcp->ctx->ackq_lock); 1494 cur_stream->need_wnd_adv = FALSE; 1495 mtcp->wakeup_flag = TRUE; 1496 } 1497 } 1498 } 1499 1500 return copylen; 1501 } 1502 /*----------------------------------------------------------------------------*/ 1503 ssize_t 1504 mtcp_recv(mctx_t mctx, int sockid, char *buf, size_t len, int flags) 1505 { 1506 mtcp_manager_t mtcp; 1507 socket_map_t socket; 1508 tcp_stream *cur_stream; 1509 struct tcp_recv_vars *rcvvar; 1510 int event_remaining, merged_len; 1511 int ret; 1512 1513 mtcp = GetMTCPManager(mctx); 1514 if (!mtcp) { 1515 errno = EACCES; 1516 return -1; 1517 } 1518 1519 if (sockid < 0 || sockid >= g_config.mos->max_concurrency) { 1520 TRACE_API("Socket id %d out of range.\n", sockid); 1521 errno = EBADF; 1522 return -1; 1523 } 1524 1525 socket = &mtcp->smap[sockid]; 1526 if (socket->socktype == MOS_SOCK_UNUSED) { 1527 TRACE_API("Invalid socket id: %d\n", sockid); 1528 errno = EBADF; 1529 return -1; 1530 } 1531 1532 if (socket->socktype == MOS_SOCK_PIPE) { 1533 return PipeRead(mctx, sockid, buf, len); 1534 } 1535 1536 if (socket->socktype != MOS_SOCK_STREAM) { 1537 TRACE_API("Not an end socket. id: %d\n", sockid); 1538 errno = ENOTSOCK; 1539 return -1; 1540 } 1541 1542 /* stream should be in ESTABLISHED, FIN_WAIT_1, FIN_WAIT_2, CLOSE_WAIT */ 1543 cur_stream = socket->stream; 1544 if (!cur_stream || !cur_stream->rcvvar || !cur_stream->rcvvar->rcvbuf || 1545 !(cur_stream->state >= TCP_ST_ESTABLISHED && 1546 cur_stream->state <= TCP_ST_CLOSE_WAIT)) { 1547 errno = ENOTCONN; 1548 return -1; 1549 } 1550 1551 rcvvar = cur_stream->rcvvar; 1552 1553 merged_len = tcprb_cflen(rcvvar->rcvbuf); 1554 1555 /* if CLOSE_WAIT, return 0 if there is no payload */ 1556 if (cur_stream->state == TCP_ST_CLOSE_WAIT) { 1557 if (merged_len == 0) 1558 return 0; 1559 } 1560 1561 /* return EAGAIN if no receive buffer */ 1562 if (socket->opts & MTCP_NONBLOCK) { 1563 if (merged_len == 0) { 1564 errno = EAGAIN; 1565 return -1; 1566 } 1567 } 1568 1569 SBUF_LOCK(&rcvvar->read_lock); 1570 1571 switch (flags) { 1572 case 0: 1573 ret = CopyToUser(mtcp, cur_stream, buf, len); 1574 break; 1575 case MSG_PEEK: 1576 ret = PeekForUser(mtcp, cur_stream, buf, len); 1577 break; 1578 default: 1579 SBUF_UNLOCK(&rcvvar->read_lock); 1580 ret = -1; 1581 errno = EINVAL; 1582 return ret; 1583 } 1584 1585 merged_len = tcprb_cflen(rcvvar->rcvbuf); 1586 event_remaining = FALSE; 1587 /* if there are remaining payload, generate EPOLLIN */ 1588 /* (may due to insufficient user buffer) */ 1589 if (socket->epoll & MOS_EPOLLIN) { 1590 if (!(socket->epoll & MOS_EPOLLET) && merged_len > 0) { 1591 event_remaining = TRUE; 1592 } 1593 } 1594 /* if waiting for close, notify it if no remaining data */ 1595 if (cur_stream->state == TCP_ST_CLOSE_WAIT && 1596 merged_len == 0 && ret > 0) { 1597 event_remaining = TRUE; 1598 } 1599 1600 SBUF_UNLOCK(&rcvvar->read_lock); 1601 1602 if (event_remaining) { 1603 if (socket->epoll) { 1604 AddEpollEvent(mtcp->ep, 1605 USR_SHADOW_EVENT_QUEUE, socket, MOS_EPOLLIN); 1606 } 1607 } 1608 1609 TRACE_API("Stream %d: mtcp_recv() returning %d\n", cur_stream->id, ret); 1610 return ret; 1611 } 1612 /*----------------------------------------------------------------------------*/ 1613 inline ssize_t 1614 mtcp_read(mctx_t mctx, int sockid, char *buf, size_t len) 1615 { 1616 return mtcp_recv(mctx, sockid, buf, len, 0); 1617 } 1618 /*----------------------------------------------------------------------------*/ 1619 ssize_t 1620 mtcp_readv(mctx_t mctx, int sockid, const struct iovec *iov, int numIOV) 1621 { 1622 mtcp_manager_t mtcp; 1623 socket_map_t socket; 1624 tcp_stream *cur_stream; 1625 struct tcp_recv_vars *rcvvar; 1626 int ret, bytes_read, i; 1627 int event_remaining, merged_len; 1628 1629 mtcp = GetMTCPManager(mctx); 1630 if (!mtcp) { 1631 errno = EACCES; 1632 return -1; 1633 } 1634 1635 if (sockid < 0 || sockid >= g_config.mos->max_concurrency) { 1636 TRACE_API("Socket id %d out of range.\n", sockid); 1637 errno = EBADF; 1638 return -1; 1639 } 1640 1641 socket = &mtcp->smap[sockid]; 1642 if (socket->socktype == MOS_SOCK_UNUSED) { 1643 TRACE_API("Invalid socket id: %d\n", sockid); 1644 errno = EBADF; 1645 return -1; 1646 } 1647 1648 if (socket->socktype != MOS_SOCK_STREAM) { 1649 TRACE_API("Not an end socket. id: %d\n", sockid); 1650 errno = ENOTSOCK; 1651 return -1; 1652 } 1653 1654 /* stream should be in ESTABLISHED, FIN_WAIT_1, FIN_WAIT_2, CLOSE_WAIT */ 1655 cur_stream = socket->stream; 1656 if (!cur_stream || !cur_stream->rcvvar->rcvbuf || 1657 !(cur_stream->state >= TCP_ST_ESTABLISHED && 1658 cur_stream->state <= TCP_ST_CLOSE_WAIT)) { 1659 errno = ENOTCONN; 1660 return -1; 1661 } 1662 1663 rcvvar = cur_stream->rcvvar; 1664 1665 merged_len = tcprb_cflen(rcvvar->rcvbuf); 1666 1667 /* if CLOSE_WAIT, return 0 if there is no payload */ 1668 if (cur_stream->state == TCP_ST_CLOSE_WAIT) { 1669 if (merged_len == 0) 1670 return 0; 1671 } 1672 1673 /* return EAGAIN if no receive buffer */ 1674 if (socket->opts & MTCP_NONBLOCK) { 1675 if (merged_len == 0) { 1676 errno = EAGAIN; 1677 return -1; 1678 } 1679 } 1680 1681 SBUF_LOCK(&rcvvar->read_lock); 1682 1683 /* read and store the contents to the vectored buffers */ 1684 bytes_read = 0; 1685 for (i = 0; i < numIOV; i++) { 1686 if (iov[i].iov_len <= 0) 1687 continue; 1688 1689 ret = CopyToUser(mtcp, cur_stream, iov[i].iov_base, iov[i].iov_len); 1690 if (ret <= 0) 1691 break; 1692 1693 bytes_read += ret; 1694 1695 if (ret < iov[i].iov_len) 1696 break; 1697 } 1698 1699 merged_len = tcprb_cflen(rcvvar->rcvbuf); 1700 1701 event_remaining = FALSE; 1702 /* if there are remaining payload, generate read event */ 1703 /* (may due to insufficient user buffer) */ 1704 if (socket->epoll & MOS_EPOLLIN) { 1705 if (!(socket->epoll & MOS_EPOLLET) && merged_len > 0) { 1706 event_remaining = TRUE; 1707 } 1708 } 1709 /* if waiting for close, notify it if no remaining data */ 1710 if (cur_stream->state == TCP_ST_CLOSE_WAIT && 1711 merged_len == 0 && bytes_read > 0) { 1712 event_remaining = TRUE; 1713 } 1714 1715 SBUF_UNLOCK(&rcvvar->read_lock); 1716 1717 if(event_remaining) { 1718 if (socket->epoll & MOS_EPOLLIN && !(socket->epoll & MOS_EPOLLET)) { 1719 AddEpollEvent(mtcp->ep, 1720 USR_SHADOW_EVENT_QUEUE, socket, MOS_EPOLLIN); 1721 } 1722 } 1723 1724 TRACE_API("Stream %d: mtcp_readv() returning %d\n", 1725 cur_stream->id, bytes_read); 1726 return bytes_read; 1727 } 1728 /*----------------------------------------------------------------------------*/ 1729 static inline int 1730 CopyFromUser(mtcp_manager_t mtcp, tcp_stream *cur_stream, const char *buf, int len) 1731 { 1732 struct tcp_send_vars *sndvar = cur_stream->sndvar; 1733 int sndlen; 1734 int ret; 1735 1736 sndlen = MIN((int)sndvar->snd_wnd, len); 1737 if (sndlen <= 0) { 1738 errno = EAGAIN; 1739 return -1; 1740 } 1741 1742 /* allocate send buffer if not exist */ 1743 if (!sndvar->sndbuf) { 1744 sndvar->sndbuf = SBInit(mtcp->rbm_snd, sndvar->iss + 1); 1745 if (!sndvar->sndbuf) { 1746 cur_stream->close_reason = TCP_NO_MEM; 1747 /* notification may not required due to -1 return */ 1748 errno = ENOMEM; 1749 return -1; 1750 } 1751 } 1752 1753 ret = SBPut(mtcp->rbm_snd, sndvar->sndbuf, buf, sndlen); 1754 assert(ret == sndlen); 1755 sndvar->snd_wnd = sndvar->sndbuf->size - sndvar->sndbuf->len; 1756 if (ret <= 0) { 1757 TRACE_ERROR("SBPut failed. reason: %d (sndlen: %u, len: %u\n", 1758 ret, sndlen, sndvar->sndbuf->len); 1759 errno = EAGAIN; 1760 return -1; 1761 } 1762 1763 if (sndvar->snd_wnd <= 0) { 1764 TRACE_SNDBUF("%u Sending buffer became full!! snd_wnd: %u\n", 1765 cur_stream->id, sndvar->snd_wnd); 1766 } 1767 1768 return ret; 1769 } 1770 /*----------------------------------------------------------------------------*/ 1771 ssize_t 1772 mtcp_write(mctx_t mctx, int sockid, const char *buf, size_t len) 1773 { 1774 mtcp_manager_t mtcp; 1775 socket_map_t socket; 1776 tcp_stream *cur_stream; 1777 struct tcp_send_vars *sndvar; 1778 int ret; 1779 1780 mtcp = GetMTCPManager(mctx); 1781 if (!mtcp) { 1782 errno = EACCES; 1783 return -1; 1784 } 1785 1786 if (sockid < 0 || sockid >= g_config.mos->max_concurrency) { 1787 TRACE_API("Socket id %d out of range.\n", sockid); 1788 errno = EBADF; 1789 return -1; 1790 } 1791 1792 socket = &mtcp->smap[sockid]; 1793 if (socket->socktype == MOS_SOCK_UNUSED) { 1794 TRACE_API("Invalid socket id: %d\n", sockid); 1795 errno = EBADF; 1796 return -1; 1797 } 1798 1799 if (socket->socktype == MOS_SOCK_PIPE) { 1800 return PipeWrite(mctx, sockid, buf, len); 1801 } 1802 1803 if (socket->socktype != MOS_SOCK_STREAM) { 1804 TRACE_API("Not an end socket. id: %d\n", sockid); 1805 errno = ENOTSOCK; 1806 return -1; 1807 } 1808 1809 cur_stream = socket->stream; 1810 if (!cur_stream || 1811 !(cur_stream->state == TCP_ST_ESTABLISHED || 1812 cur_stream->state == TCP_ST_CLOSE_WAIT)) { 1813 errno = ENOTCONN; 1814 return -1; 1815 } 1816 1817 if (len <= 0) { 1818 if (socket->opts & MTCP_NONBLOCK) { 1819 errno = EAGAIN; 1820 return -1; 1821 } else { 1822 return 0; 1823 } 1824 } 1825 1826 sndvar = cur_stream->sndvar; 1827 1828 SBUF_LOCK(&sndvar->write_lock); 1829 ret = CopyFromUser(mtcp, cur_stream, buf, len); 1830 1831 SBUF_UNLOCK(&sndvar->write_lock); 1832 1833 if (ret > 0 && !(sndvar->on_sendq || sndvar->on_send_list)) { 1834 SQ_LOCK(&mtcp->ctx->sendq_lock); 1835 sndvar->on_sendq = TRUE; 1836 StreamEnqueue(mtcp->sendq, cur_stream); /* this always success */ 1837 SQ_UNLOCK(&mtcp->ctx->sendq_lock); 1838 mtcp->wakeup_flag = TRUE; 1839 } 1840 1841 if (ret == 0 && (socket->opts & MTCP_NONBLOCK)) { 1842 ret = -1; 1843 errno = EAGAIN; 1844 } 1845 1846 /* if there are remaining sending buffer, generate write event */ 1847 if (sndvar->snd_wnd > 0) { 1848 if (socket->epoll & MOS_EPOLLOUT && !(socket->epoll & MOS_EPOLLET)) { 1849 AddEpollEvent(mtcp->ep, 1850 USR_SHADOW_EVENT_QUEUE, socket, MOS_EPOLLOUT); 1851 } 1852 } 1853 1854 TRACE_API("Stream %d: mtcp_write() returning %d\n", cur_stream->id, ret); 1855 return ret; 1856 } 1857 /*----------------------------------------------------------------------------*/ 1858 ssize_t 1859 mtcp_writev(mctx_t mctx, int sockid, const struct iovec *iov, int numIOV) 1860 { 1861 mtcp_manager_t mtcp; 1862 socket_map_t socket; 1863 tcp_stream *cur_stream; 1864 struct tcp_send_vars *sndvar; 1865 int ret, to_write, i; 1866 1867 mtcp = GetMTCPManager(mctx); 1868 if (!mtcp) { 1869 errno = EACCES; 1870 return -1; 1871 } 1872 1873 if (sockid < 0 || sockid >= g_config.mos->max_concurrency) { 1874 TRACE_API("Socket id %d out of range.\n", sockid); 1875 errno = EBADF; 1876 return -1; 1877 } 1878 1879 socket = &mtcp->smap[sockid]; 1880 if (socket->socktype == MOS_SOCK_UNUSED) { 1881 TRACE_API("Invalid socket id: %d\n", sockid); 1882 errno = EBADF; 1883 return -1; 1884 } 1885 1886 if (socket->socktype != MOS_SOCK_STREAM) { 1887 TRACE_API("Not an end socket. id: %d\n", sockid); 1888 errno = ENOTSOCK; 1889 return -1; 1890 } 1891 1892 cur_stream = socket->stream; 1893 if (!cur_stream || 1894 !(cur_stream->state == TCP_ST_ESTABLISHED || 1895 cur_stream->state == TCP_ST_CLOSE_WAIT)) { 1896 errno = ENOTCONN; 1897 return -1; 1898 } 1899 1900 sndvar = cur_stream->sndvar; 1901 SBUF_LOCK(&sndvar->write_lock); 1902 1903 /* write from the vectored buffers */ 1904 to_write = 0; 1905 for (i = 0; i < numIOV; i++) { 1906 if (iov[i].iov_len <= 0) 1907 continue; 1908 1909 ret = CopyFromUser(mtcp, cur_stream, iov[i].iov_base, iov[i].iov_len); 1910 if (ret <= 0) 1911 break; 1912 1913 to_write += ret; 1914 1915 if (ret < iov[i].iov_len) 1916 break; 1917 } 1918 SBUF_UNLOCK(&sndvar->write_lock); 1919 1920 if (to_write > 0 && !(sndvar->on_sendq || sndvar->on_send_list)) { 1921 SQ_LOCK(&mtcp->ctx->sendq_lock); 1922 sndvar->on_sendq = TRUE; 1923 StreamEnqueue(mtcp->sendq, cur_stream); /* this always success */ 1924 SQ_UNLOCK(&mtcp->ctx->sendq_lock); 1925 mtcp->wakeup_flag = TRUE; 1926 } 1927 1928 if (to_write == 0 && (socket->opts & MTCP_NONBLOCK)) { 1929 to_write = -1; 1930 errno = EAGAIN; 1931 } 1932 1933 /* if there are remaining sending buffer, generate write event */ 1934 if (sndvar->snd_wnd > 0) { 1935 if (socket->epoll & MOS_EPOLLOUT && !(socket->epoll & MOS_EPOLLET)) { 1936 AddEpollEvent(mtcp->ep, 1937 USR_SHADOW_EVENT_QUEUE, socket, MOS_EPOLLOUT); 1938 } 1939 } 1940 1941 TRACE_API("Stream %d: mtcp_writev() returning %d\n", 1942 cur_stream->id, to_write); 1943 return to_write; 1944 } 1945 /*----------------------------------------------------------------------------*/ 1946 uint32_t 1947 mtcp_get_connection_cnt(mctx_t mctx) 1948 { 1949 mtcp_manager_t mtcp; 1950 mtcp = GetMTCPManager(mctx); 1951 if (!mtcp) { 1952 errno = EACCES; 1953 return -1; 1954 } 1955 1956 if (mtcp->num_msp > 0) 1957 return mtcp->flow_cnt / 2; 1958 else 1959 return mtcp->flow_cnt; 1960 } 1961 /*----------------------------------------------------------------------------*/ 1962