1 #include <sys/queue.h> 2 #include <sys/ioctl.h> 3 #include <limits.h> 4 #include <unistd.h> 5 #include <assert.h> 6 #include <string.h> 7 8 #ifdef DARWIN 9 #include <netinet/tcp.h> 10 #include <netinet/ip.h> 11 #include <netinet/if_ether.h> 12 #endif 13 14 #include "mtcp.h" 15 #include "mtcp_api.h" 16 #include "tcp_in.h" 17 #include "tcp_stream.h" 18 #include "tcp_out.h" 19 #include "ip_out.h" 20 #include "eventpoll.h" 21 #include "pipe.h" 22 #include "fhash.h" 23 #include "addr_pool.h" 24 #include "rss.h" 25 #include "config.h" 26 #include "debug.h" 27 #include "eventpoll.h" 28 #include "mos_api.h" 29 #include "tcp_rb.h" 30 31 #define MAX(a, b) ((a)>(b)?(a):(b)) 32 #define MIN(a, b) ((a)<(b)?(a):(b)) 33 34 /*----------------------------------------------------------------------------*/ 35 /** Stop monitoring the socket! (function prototype) 36 * @param [in] mctx: mtcp context 37 * @param [in] sock: monitoring stream socket id 38 * @param [in] side: side of monitoring (client side, server side or both) 39 * 40 * This function is now DEPRECATED and is only used within mOS core... 41 */ 42 int 43 mtcp_cb_stop(mctx_t mctx, int sock, int side); 44 /*----------------------------------------------------------------------------*/ 45 /** Reset the connection (send RST packets to both sides) 46 * (We need to decide the API for this.) 47 */ 48 //int 49 //mtcp_cb_reset(mctx_t mctx, int sock, int side); 50 /*----------------------------------------------------------------------------*/ 51 inline mtcp_manager_t 52 GetMTCPManager(mctx_t mctx) 53 { 54 if (!mctx) { 55 errno = EACCES; 56 return NULL; 57 } 58 59 if (mctx->cpu < 0 || mctx->cpu >= num_cpus) { 60 errno = EINVAL; 61 return NULL; 62 } 63 64 if (!g_mtcp[mctx->cpu] || g_mtcp[mctx->cpu]->ctx->done || g_mtcp[mctx->cpu]->ctx->exit) { 65 errno = EPERM; 66 return NULL; 67 } 68 69 return g_mtcp[mctx->cpu]; 70 } 71 /*----------------------------------------------------------------------------*/ 72 static inline int 73 GetSocketError(socket_map_t socket, void *optval, socklen_t *optlen) 74 { 75 tcp_stream *cur_stream; 76 77 if (!socket->stream) { 78 errno = EBADF; 79 return -1; 80 } 81 82 cur_stream = socket->stream; 83 if (cur_stream->state == TCP_ST_CLOSED_RSVD) { 84 if (cur_stream->close_reason == TCP_TIMEDOUT || 85 cur_stream->close_reason == TCP_CONN_FAIL || 86 cur_stream->close_reason == TCP_CONN_LOST) { 87 *(int *)optval = ETIMEDOUT; 88 *optlen = sizeof(int); 89 90 return 0; 91 } 92 } 93 94 if (cur_stream->state == TCP_ST_CLOSE_WAIT || 95 cur_stream->state == TCP_ST_CLOSED_RSVD) { 96 if (cur_stream->close_reason == TCP_RESET) { 97 *(int *)optval = ECONNRESET; 98 *optlen = sizeof(int); 99 100 return 0; 101 } 102 } 103 104 if (cur_stream->state == TCP_ST_SYN_SENT && 105 errno == EINPROGRESS) { 106 *(int *)optval = errno; 107 *optlen = sizeof(int); 108 109 return -1; 110 } 111 112 /* 113 * `base case`: If socket sees no so_error, then 114 * this also means close_reason will always be 115 * TCP_NOT_CLOSED. 116 */ 117 if (cur_stream->close_reason == TCP_NOT_CLOSED) { 118 *(int *)optval = 0; 119 *optlen = sizeof(int); 120 121 return 0; 122 } 123 124 errno = ENOSYS; 125 return -1; 126 } 127 /*----------------------------------------------------------------------------*/ 128 int 129 mtcp_getsockname(mctx_t mctx, int sockid, struct sockaddr *addr, 130 socklen_t *addrlen) 131 { 132 mtcp_manager_t mtcp; 133 socket_map_t socket; 134 135 mtcp = GetMTCPManager(mctx); 136 if (!mtcp) { 137 return -1; 138 } 139 140 if (sockid < 0 || sockid >= g_config.mos->max_concurrency) { 141 TRACE_API("Socket id %d out of range.\n", sockid); 142 errno = EBADF; 143 return -1; 144 } 145 146 socket = &mtcp->smap[sockid]; 147 if (socket->socktype == MOS_SOCK_UNUSED) { 148 TRACE_API("Invalid socket id: %d\n", sockid); 149 errno = EBADF; 150 return -1; 151 } 152 153 if (*addrlen <= 0) { 154 TRACE_API("Invalid addrlen: %d\n", *addrlen); 155 errno = EINVAL; 156 return -1; 157 } 158 159 if (socket->socktype != MOS_SOCK_STREAM_LISTEN && 160 socket->socktype != MOS_SOCK_STREAM) { 161 TRACE_API("Invalid socket id: %d\n", sockid); 162 errno = ENOTSOCK; 163 return -1; 164 } 165 166 *(struct sockaddr_in *)addr = socket->saddr; 167 *addrlen = sizeof(socket->saddr); 168 169 return 0; 170 } 171 /*----------------------------------------------------------------------------*/ 172 int 173 mtcp_getsockopt(mctx_t mctx, int sockid, int level, 174 int optname, void *optval, socklen_t *optlen) 175 { 176 mtcp_manager_t mtcp; 177 socket_map_t socket; 178 179 mtcp = GetMTCPManager(mctx); 180 if (!mtcp) { 181 errno = EACCES; 182 return -1; 183 } 184 185 if (sockid < 0 || sockid >= g_config.mos->max_concurrency) { 186 TRACE_API("Socket id %d out of range.\n", sockid); 187 errno = EBADF; 188 return -1; 189 } 190 191 switch (level) { 192 case SOL_SOCKET: 193 socket = &mtcp->smap[sockid]; 194 if (socket->socktype == MOS_SOCK_UNUSED) { 195 TRACE_API("Invalid socket id: %d\n", sockid); 196 errno = EBADF; 197 return -1; 198 } 199 200 if (socket->socktype != MOS_SOCK_STREAM_LISTEN && 201 socket->socktype != MOS_SOCK_STREAM) { 202 TRACE_API("Invalid socket id: %d\n", sockid); 203 errno = ENOTSOCK; 204 return -1; 205 } 206 207 if (optname == SO_ERROR) { 208 if (socket->socktype == MOS_SOCK_STREAM) { 209 return GetSocketError(socket, optval, optlen); 210 } 211 } 212 break; 213 case SOL_MONSOCKET: 214 /* check if the calling thread is in MOS context */ 215 if (mtcp->ctx->thread != pthread_self()) { 216 errno = EPERM; 217 return -1; 218 } 219 /* 220 * All options will only work for active 221 * monitor stream sockets 222 */ 223 socket = &mtcp->msmap[sockid]; 224 if (socket->socktype != MOS_SOCK_MONITOR_STREAM_ACTIVE) { 225 TRACE_API("Invalid socket id: %d\n", sockid); 226 errno = ENOTSOCK; 227 return -1; 228 } 229 230 switch (optname) { 231 case MOS_FRAGINFO_CLIBUF: 232 return GetFragInfo(socket, MOS_SIDE_CLI, optval, optlen); 233 case MOS_FRAGINFO_SVRBUF: 234 return GetFragInfo(socket, MOS_SIDE_SVR, optval, optlen); 235 case MOS_INFO_CLIBUF: 236 return GetBufInfo(socket, MOS_SIDE_CLI, optval, optlen); 237 case MOS_INFO_SVRBUF: 238 return GetBufInfo(socket, MOS_SIDE_SVR, optval, optlen); 239 case MOS_TCP_STATE_CLI: 240 return GetTCPState(socket->monitor_stream->stream, MOS_SIDE_CLI, 241 optval, optlen); 242 case MOS_TCP_STATE_SVR: 243 return GetTCPState(socket->monitor_stream->stream, MOS_SIDE_SVR, 244 optval, optlen); 245 case MOS_TIMESTAMP: 246 return GetLastTimestamp(socket->monitor_stream->stream, 247 (uint32_t *)optval, 248 optlen); 249 default: 250 TRACE_API("can't recognize the optname=%d\n", optname); 251 assert(0); 252 } 253 break; 254 } 255 errno = ENOSYS; 256 return -1; 257 } 258 /*----------------------------------------------------------------------------*/ 259 int 260 mtcp_setsockopt(mctx_t mctx, int sockid, int level, 261 int optname, const void *optval, socklen_t optlen) 262 { 263 mtcp_manager_t mtcp; 264 socket_map_t socket; 265 tcprb_t *rb; 266 267 mtcp = GetMTCPManager(mctx); 268 if (!mtcp) { 269 errno = EACCES; 270 return -1; 271 } 272 273 if (sockid < 0 || sockid >= g_config.mos->max_concurrency) { 274 TRACE_API("Socket id %d out of range.\n", sockid); 275 errno = EBADF; 276 return -1; 277 } 278 279 switch (level) { 280 case SOL_SOCKET: 281 socket = &mtcp->smap[sockid]; 282 if (socket->socktype == MOS_SOCK_UNUSED) { 283 TRACE_API("Invalid socket id: %d\n", sockid); 284 errno = EBADF; 285 return -1; 286 } 287 288 if (socket->socktype != MOS_SOCK_STREAM_LISTEN && 289 socket->socktype != MOS_SOCK_STREAM) { 290 TRACE_API("Invalid socket id: %d\n", sockid); 291 errno = ENOTSOCK; 292 return -1; 293 } 294 break; 295 case SOL_MONSOCKET: 296 socket = &mtcp->msmap[sockid]; 297 /* 298 * checking of calling thread to be in MOS context is 299 * disabled since both optnames can be called from 300 * `application' context (on passive sockets) 301 */ 302 /* 303 * if (mtcp->ctx->thread != pthread_self()) 304 * return -1; 305 */ 306 307 switch (optname) { 308 case MOS_CLIOVERLAP: 309 rb = (socket->monitor_stream->stream->side == MOS_SIDE_CLI) ? 310 socket->monitor_stream->stream->rcvvar->rcvbuf : 311 socket->monitor_stream->stream->pair_stream->rcvvar->rcvbuf; 312 if (rb == NULL) { 313 errno = EFAULT; 314 return -1; 315 } 316 if (tcprb_setpolicy(rb, *(uint8_t *)optval) < 0) { 317 errno = EINVAL; 318 return -1; 319 } else 320 return 0; 321 break; 322 case MOS_SVROVERLAP: 323 rb = (socket->monitor_stream->stream->side == MOS_SIDE_SVR) ? 324 socket->monitor_stream->stream->rcvvar->rcvbuf : 325 socket->monitor_stream->stream->pair_stream->rcvvar->rcvbuf; 326 if (rb == NULL) { 327 errno = EFAULT; 328 return -1; 329 } 330 if (tcprb_setpolicy(rb, *(uint8_t *)optval) < 0) { 331 errno = EINVAL; 332 return -1; 333 } else 334 return 0; 335 break; 336 case MOS_CLIBUF: 337 #if 0 338 if (socket->socktype != MOS_SOCK_MONITOR_STREAM_ACTIVE) { 339 errno = EBADF; 340 return -1; 341 } 342 #endif 343 #ifdef DISABLE_DYN_RESIZE 344 if (*(int *)optval != 0) 345 return -1; 346 if (socket->socktype == MOS_SOCK_MONITOR_STREAM_ACTIVE) { 347 rb = (socket->monitor_stream->stream->side == MOS_SIDE_CLI) ? 348 socket->monitor_stream->stream->rcvvar->rcvbuf : 349 socket->monitor_stream->stream->pair_stream->rcvvar->rcvbuf; 350 if (rb) { 351 tcprb_resize_meta(rb, 0); 352 tcprb_resize(rb, 0); 353 } 354 } 355 return DisableBuf(socket, MOS_SIDE_CLI); 356 #else 357 rb = (socket->monitor_stream->stream->side == MOS_SIDE_CLI) ? 358 socket->monitor_stream->stream->rcvvar->rcvbuf : 359 socket->monitor_stream->stream->pair_stream->rcvvar->rcvbuf; 360 if (tcprb_resize_meta(rb, *(int *)optval) < 0) 361 return -1; 362 return tcprb_resize(rb, 363 (((int)rb->metalen - 1) / UNITBUFSIZE + 1) * UNITBUFSIZE); 364 #endif 365 case MOS_SVRBUF: 366 #if 0 367 if (socket->socktype != MOS_SOCK_MONITOR_STREAM_ACTIVE) { 368 errno = EBADF; 369 return -1; 370 } 371 #endif 372 #ifdef DISABLE_DYN_RESIZE 373 if (*(int *)optval != 0) 374 return -1; 375 if (socket->socktype == MOS_SOCK_MONITOR_STREAM_ACTIVE) { 376 rb = (socket->monitor_stream->stream->side == MOS_SIDE_SVR) ? 377 socket->monitor_stream->stream->rcvvar->rcvbuf : 378 socket->monitor_stream->stream->pair_stream->rcvvar->rcvbuf; 379 if (rb) { 380 tcprb_resize_meta(rb, 0); 381 tcprb_resize(rb, 0); 382 } 383 } 384 return DisableBuf(socket, MOS_SIDE_SVR); 385 #else 386 rb = (socket->monitor_stream->stream->side == MOS_SIDE_SVR) ? 387 socket->monitor_stream->stream->rcvvar->rcvbuf : 388 socket->monitor_stream->stream->pair_stream->rcvvar->rcvbuf; 389 if (tcprb_resize_meta(rb, *(int *)optval) < 0) 390 return -1; 391 return tcprb_resize(rb, 392 (((int)rb->metalen - 1) / UNITBUFSIZE + 1) * UNITBUFSIZE); 393 #endif 394 case MOS_FRAG_CLIBUF: 395 #if 0 396 if (socket->socktype != MOS_SOCK_MONITOR_STREAM_ACTIVE) { 397 errno = EBADF; 398 return -1; 399 } 400 #endif 401 #ifdef DISABLE_DYN_RESIZE 402 if (*(int *)optval != 0) 403 return -1; 404 if (socket->socktype == MOS_SOCK_MONITOR_STREAM_ACTIVE) { 405 rb = (socket->monitor_stream->stream->side == MOS_SIDE_CLI) ? 406 socket->monitor_stream->stream->rcvvar->rcvbuf : 407 socket->monitor_stream->stream->pair_stream->rcvvar->rcvbuf; 408 if (rb) 409 tcprb_resize(rb, 0); 410 } 411 return 0; 412 #else 413 rb = (socket->monitor_stream->stream->side == MOS_SIDE_CLI) ? 414 socket->monitor_stream->stream->rcvvar->rcvbuf : 415 socket->monitor_stream->stream->pair_stream->rcvvar->rcvbuf; 416 if (rb->len == 0) 417 return tcprb_resize_meta(rb, *(int *)optval); 418 else 419 return -1; 420 #endif 421 case MOS_FRAG_SVRBUF: 422 #if 0 423 if (socket->socktype != MOS_SOCK_MONITOR_STREAM_ACTIVE) { 424 errno = EBADF; 425 return -1; 426 } 427 #endif 428 #ifdef DISABLE_DYN_RESIZE 429 if (*(int *)optval != 0) 430 return -1; 431 if (socket->socktype == MOS_SOCK_MONITOR_STREAM_ACTIVE) { 432 rb = (socket->monitor_stream->stream->side == MOS_SIDE_SVR) ? 433 socket->monitor_stream->stream->rcvvar->rcvbuf : 434 socket->monitor_stream->stream->pair_stream->rcvvar->rcvbuf; 435 if (rb) 436 tcprb_resize(rb, 0); 437 } 438 return 0; 439 #else 440 rb = (socket->monitor_stream->stream->side == MOS_SIDE_SVR) ? 441 socket->monitor_stream->stream->rcvvar->rcvbuf : 442 socket->monitor_stream->stream->pair_stream->rcvvar->rcvbuf; 443 if (rb->len == 0) 444 return tcprb_resize_meta(rb, *(int *)optval); 445 else 446 return -1; 447 #endif 448 case MOS_MONLEVEL: 449 #ifdef OLD_API 450 assert(*(int *)optval == MOS_NO_CLIBUF || 451 *(int *)optval == MOS_NO_SVRBUF); 452 return DisableBuf(socket, 453 (*(int *)optval == MOS_NO_CLIBUF) ? 454 MOS_SIDE_CLI : MOS_SIDE_SVR); 455 #endif 456 case MOS_SEQ_REMAP: 457 return TcpSeqChange(socket, 458 (uint32_t)((seq_remap_info *)optval)->seq_off, 459 ((seq_remap_info *)optval)->side, 460 mtcp->pctx->p.seq); 461 case MOS_STOP_MON: 462 return mtcp_cb_stop(mctx, sockid, *(int *)optval); 463 default: 464 TRACE_API("invalid optname=%d\n", optname); 465 assert(0); 466 } 467 break; 468 } 469 470 errno = ENOSYS; 471 return -1; 472 } 473 /*----------------------------------------------------------------------------*/ 474 int 475 mtcp_setsock_nonblock(mctx_t mctx, int sockid) 476 { 477 mtcp_manager_t mtcp; 478 479 mtcp = GetMTCPManager(mctx); 480 if (!mtcp) { 481 errno = EACCES; 482 return -1; 483 } 484 485 if (sockid < 0 || sockid >= g_config.mos->max_concurrency) { 486 TRACE_API("Socket id %d out of range.\n", sockid); 487 errno = EBADF; 488 return -1; 489 } 490 491 if (mtcp->smap[sockid].socktype == MOS_SOCK_UNUSED) { 492 TRACE_API("Invalid socket id: %d\n", sockid); 493 errno = EBADF; 494 return -1; 495 } 496 497 mtcp->smap[sockid].opts |= MTCP_NONBLOCK; 498 499 return 0; 500 } 501 /*----------------------------------------------------------------------------*/ 502 int 503 mtcp_ioctl(mctx_t mctx, int sockid, int request, void *argp) 504 { 505 mtcp_manager_t mtcp; 506 socket_map_t socket; 507 508 mtcp = GetMTCPManager(mctx); 509 if (!mtcp) { 510 errno = EACCES; 511 return -1; 512 } 513 514 if (sockid < 0 || sockid >= g_config.mos->max_concurrency) { 515 TRACE_API("Socket id %d out of range.\n", sockid); 516 errno = EBADF; 517 return -1; 518 } 519 520 /* only support stream socket */ 521 socket = &mtcp->smap[sockid]; 522 523 if (socket->socktype != MOS_SOCK_STREAM_LISTEN && 524 socket->socktype != MOS_SOCK_STREAM) { 525 TRACE_API("Invalid socket id: %d\n", sockid); 526 errno = EBADF; 527 return -1; 528 } 529 530 if (!argp) { 531 errno = EFAULT; 532 return -1; 533 } 534 535 if (request == FIONREAD) { 536 tcp_stream *cur_stream; 537 tcprb_t *rbuf; 538 539 cur_stream = socket->stream; 540 if (!cur_stream) { 541 errno = EBADF; 542 return -1; 543 } 544 545 rbuf = cur_stream->rcvvar->rcvbuf; 546 *(int *)argp = (rbuf) ? tcprb_cflen(rbuf) : 0; 547 548 } else if (request == FIONBIO) { 549 /* 550 * sockets can only be set to blocking/non-blocking 551 * modes during initialization 552 */ 553 if ((*(int *)argp)) 554 mtcp->smap[sockid].opts |= MTCP_NONBLOCK; 555 else 556 mtcp->smap[sockid].opts &= ~MTCP_NONBLOCK; 557 } else { 558 errno = EINVAL; 559 return -1; 560 } 561 562 return 0; 563 } 564 /*----------------------------------------------------------------------------*/ 565 static int 566 mtcp_monitor(mctx_t mctx, socket_map_t sock) 567 { 568 mtcp_manager_t mtcp; 569 struct mon_listener *monitor; 570 int sockid = sock->id; 571 572 mtcp = GetMTCPManager(mctx); 573 if (!mtcp) { 574 errno = EACCES; 575 return -1; 576 } 577 578 if (sockid < 0 || sockid >= g_config.mos->max_concurrency) { 579 TRACE_API("Socket id %d out of range.\n", sockid); 580 errno = EBADF; 581 return -1; 582 } 583 584 if (mtcp->msmap[sockid].socktype == MOS_SOCK_UNUSED) { 585 TRACE_API("Invalid socket id: %d\n", sockid); 586 errno = EBADF; 587 return -1; 588 } 589 590 if (!(mtcp->msmap[sockid].socktype == MOS_SOCK_MONITOR_STREAM || 591 mtcp->msmap[sockid].socktype == MOS_SOCK_MONITOR_RAW)) { 592 TRACE_API("Not a monitor socket. id: %d\n", sockid); 593 errno = ENOTSOCK; 594 return -1; 595 } 596 597 monitor = (struct mon_listener *)calloc(1, sizeof(struct mon_listener)); 598 if (!monitor) { 599 /* errno set from the malloc() */ 600 errno = ENOMEM; 601 return -1; 602 } 603 604 /* create a monitor-specific event queue */ 605 monitor->eq = CreateEventQueue(g_config.mos->max_concurrency); 606 if (!monitor->eq) { 607 TRACE_API("Can't create event queue (concurrency: %d) for " 608 "monitor read event registrations!\n", 609 g_config.mos->max_concurrency); 610 free(monitor); 611 errno = ENOMEM; 612 return -1; 613 } 614 615 /* set monitor-related basic parameters */ 616 #ifndef NEWEV 617 monitor->ude_id = UDE_OFFSET; 618 #endif 619 monitor->socket = sock; 620 monitor->client_buf_mgmt = monitor->server_buf_mgmt = BUFMGMT_FULL; 621 622 /* perform both sides monitoring by default */ 623 monitor->client_mon = monitor->server_mon = 1; 624 625 /* add monitor socket to the monitor list */ 626 TAILQ_INSERT_TAIL(&mtcp->monitors, monitor, link); 627 628 mtcp->msmap[sockid].monitor_listener = monitor; 629 630 return 0; 631 } 632 /*----------------------------------------------------------------------------*/ 633 int 634 mtcp_socket(mctx_t mctx, int domain, int type, int protocol) 635 { 636 mtcp_manager_t mtcp; 637 socket_map_t socket; 638 639 mtcp = GetMTCPManager(mctx); 640 if (!mtcp) { 641 errno = EACCES; 642 return -1; 643 } 644 645 if (domain != AF_INET) { 646 errno = EAFNOSUPPORT; 647 return -1; 648 } 649 650 if (type == SOCK_STREAM) { 651 type = MOS_SOCK_STREAM; 652 } else if (type == MOS_SOCK_MONITOR_STREAM || 653 type == MOS_SOCK_MONITOR_RAW) { 654 /* do nothing for the time being */ 655 } else { 656 /* Not supported type */ 657 errno = EINVAL; 658 return -1; 659 } 660 661 socket = AllocateSocket(mctx, type); 662 if (!socket) { 663 errno = ENFILE; 664 return -1; 665 } 666 667 if (type == MOS_SOCK_MONITOR_STREAM || 668 type == MOS_SOCK_MONITOR_RAW) { 669 mtcp_manager_t mtcp = GetMTCPManager(mctx); 670 if (!mtcp) { 671 errno = EACCES; 672 return -1; 673 } 674 mtcp_monitor(mctx, socket); 675 #ifdef NEWEV 676 socket->monitor_listener->stree_dontcare = NULL; 677 socket->monitor_listener->stree_pre_rcv = NULL; 678 socket->monitor_listener->stree_post_snd = NULL; 679 #else 680 InitEvB(mtcp, &socket->monitor_listener->dontcare_evb); 681 InitEvB(mtcp, &socket->monitor_listener->pre_tcp_evb); 682 InitEvB(mtcp, &socket->monitor_listener->post_tcp_evb); 683 #endif 684 } 685 686 return socket->id; 687 } 688 /*----------------------------------------------------------------------------*/ 689 int 690 mtcp_bind(mctx_t mctx, int sockid, 691 const struct sockaddr *addr, socklen_t addrlen) 692 { 693 mtcp_manager_t mtcp; 694 struct sockaddr_in *addr_in; 695 696 mtcp = GetMTCPManager(mctx); 697 if (!mtcp) { 698 errno = EACCES; 699 return -1; 700 } 701 702 if (sockid < 0 || sockid >= g_config.mos->max_concurrency) { 703 TRACE_API("Socket id %d out of range.\n", sockid); 704 errno = EBADF; 705 return -1; 706 } 707 708 if (mtcp->smap[sockid].socktype == MOS_SOCK_UNUSED) { 709 TRACE_API("Invalid socket id: %d\n", sockid); 710 errno = EBADF; 711 return -1; 712 } 713 714 if (mtcp->smap[sockid].socktype != MOS_SOCK_STREAM && 715 mtcp->smap[sockid].socktype != MOS_SOCK_STREAM_LISTEN) { 716 TRACE_API("Not a stream socket id: %d\n", sockid); 717 errno = ENOTSOCK; 718 return -1; 719 } 720 721 if (!addr) { 722 TRACE_API("Socket %d: empty address!\n", sockid); 723 errno = EINVAL; 724 return -1; 725 } 726 727 if (mtcp->smap[sockid].opts & MTCP_ADDR_BIND) { 728 TRACE_API("Socket %d: adress already bind for this socket.\n", sockid); 729 errno = EINVAL; 730 return -1; 731 } 732 733 /* we only allow bind() for AF_INET address */ 734 if (addr->sa_family != AF_INET || addrlen < sizeof(struct sockaddr_in)) { 735 TRACE_API("Socket %d: invalid argument!\n", sockid); 736 errno = EINVAL; 737 return -1; 738 } 739 740 if (mtcp->listener) { 741 TRACE_API("Address already bound!\n"); 742 errno = EINVAL; 743 return -1; 744 } 745 addr_in = (struct sockaddr_in *)addr; 746 mtcp->smap[sockid].saddr = *addr_in; 747 mtcp->smap[sockid].opts |= MTCP_ADDR_BIND; 748 749 return 0; 750 } 751 /*----------------------------------------------------------------------------*/ 752 int 753 mtcp_listen(mctx_t mctx, int sockid, int backlog) 754 { 755 mtcp_manager_t mtcp; 756 struct tcp_listener *listener; 757 758 mtcp = GetMTCPManager(mctx); 759 if (!mtcp) { 760 errno = EACCES; 761 return -1; 762 } 763 764 if (sockid < 0 || sockid >= g_config.mos->max_concurrency) { 765 TRACE_API("Socket id %d out of range.\n", sockid); 766 errno = EBADF; 767 return -1; 768 } 769 770 if (mtcp->smap[sockid].socktype == MOS_SOCK_UNUSED) { 771 TRACE_API("Invalid socket id: %d\n", sockid); 772 errno = EBADF; 773 return -1; 774 } 775 776 if (mtcp->smap[sockid].socktype == MOS_SOCK_STREAM) { 777 mtcp->smap[sockid].socktype = MOS_SOCK_STREAM_LISTEN; 778 } 779 780 if (mtcp->smap[sockid].socktype != MOS_SOCK_STREAM_LISTEN) { 781 TRACE_API("Not a listening socket. id: %d\n", sockid); 782 errno = ENOTSOCK; 783 return -1; 784 } 785 786 if (backlog <= 0 || backlog > g_config.mos->max_concurrency) { 787 errno = EINVAL; 788 return -1; 789 } 790 791 listener = (struct tcp_listener *)calloc(1, sizeof(struct tcp_listener)); 792 if (!listener) { 793 /* errno set from the malloc() */ 794 errno = ENOMEM; 795 return -1; 796 } 797 798 listener->sockid = sockid; 799 listener->backlog = backlog; 800 listener->socket = &mtcp->smap[sockid]; 801 802 if (pthread_cond_init(&listener->accept_cond, NULL)) { 803 perror("pthread_cond_init of ctx->accept_cond\n"); 804 /* errno set by pthread_cond_init() */ 805 return -1; 806 } 807 if (pthread_mutex_init(&listener->accept_lock, NULL)) { 808 perror("pthread_mutex_init of ctx->accept_lock\n"); 809 /* errno set by pthread_mutex_init() */ 810 return -1; 811 } 812 813 listener->acceptq = CreateStreamQueue(backlog); 814 if (!listener->acceptq) { 815 free(listener); 816 errno = ENOMEM; 817 return -1; 818 } 819 820 mtcp->smap[sockid].listener = listener; 821 mtcp->listener = listener; 822 823 return 0; 824 } 825 /*----------------------------------------------------------------------------*/ 826 int 827 mtcp_accept(mctx_t mctx, int sockid, struct sockaddr *addr, socklen_t *addrlen) 828 { 829 mtcp_manager_t mtcp; 830 struct tcp_listener *listener; 831 socket_map_t socket; 832 tcp_stream *accepted = NULL; 833 834 mtcp = GetMTCPManager(mctx); 835 if (!mtcp) { 836 errno = EACCES; 837 return -1; 838 } 839 840 if (sockid < 0 || sockid >= g_config.mos->max_concurrency) { 841 TRACE_API("Socket id %d out of range.\n", sockid); 842 errno = EBADF; 843 return -1; 844 } 845 846 /* requires listening socket */ 847 if (mtcp->smap[sockid].socktype != MOS_SOCK_STREAM_LISTEN) { 848 errno = EINVAL; 849 return -1; 850 } 851 852 listener = mtcp->smap[sockid].listener; 853 854 /* dequeue from the acceptq without lock first */ 855 /* if nothing there, acquire lock and cond_wait */ 856 accepted = StreamDequeue(listener->acceptq); 857 if (!accepted) { 858 if (listener->socket->opts & MTCP_NONBLOCK) { 859 errno = EAGAIN; 860 return -1; 861 862 } else { 863 pthread_mutex_lock(&listener->accept_lock); 864 while ((accepted = StreamDequeue(listener->acceptq)) == NULL) { 865 pthread_cond_wait(&listener->accept_cond, &listener->accept_lock); 866 867 if (mtcp->ctx->done || mtcp->ctx->exit) { 868 pthread_mutex_unlock(&listener->accept_lock); 869 errno = EINTR; 870 return -1; 871 } 872 } 873 pthread_mutex_unlock(&listener->accept_lock); 874 } 875 } 876 877 if (!accepted) { 878 TRACE_ERROR("[NEVER HAPPEN] Empty accept queue!\n"); 879 } 880 881 if (!accepted->socket) { 882 socket = AllocateSocket(mctx, MOS_SOCK_STREAM); 883 if (!socket) { 884 TRACE_ERROR("Failed to create new socket!\n"); 885 /* TODO: destroy the stream */ 886 errno = ENFILE; 887 return -1; 888 } 889 socket->stream = accepted; 890 accepted->socket = socket; 891 892 /* set socket addr parameters */ 893 socket->saddr.sin_family = AF_INET; 894 socket->saddr.sin_port = accepted->dport; 895 socket->saddr.sin_addr.s_addr = accepted->daddr; 896 897 /* if monitor is enabled, complete the socket assignment */ 898 if (socket->stream->pair_stream != NULL) 899 socket->stream->pair_stream->socket = socket; 900 } 901 902 if (!(listener->socket->epoll & MOS_EPOLLET) && 903 !StreamQueueIsEmpty(listener->acceptq)) 904 AddEpollEvent(mtcp->ep, 905 USR_SHADOW_EVENT_QUEUE, 906 listener->socket, MOS_EPOLLIN); 907 908 TRACE_API("Stream %d accepted.\n", accepted->id); 909 910 if (addr && addrlen) { 911 struct sockaddr_in *addr_in = (struct sockaddr_in *)addr; 912 addr_in->sin_family = AF_INET; 913 addr_in->sin_port = accepted->dport; 914 addr_in->sin_addr.s_addr = accepted->daddr; 915 *addrlen = sizeof(struct sockaddr_in); 916 } 917 918 return accepted->socket->id; 919 } 920 /*----------------------------------------------------------------------------*/ 921 int 922 mtcp_init_rss(mctx_t mctx, in_addr_t saddr_base, int num_addr, 923 in_addr_t daddr, in_addr_t dport) 924 { 925 mtcp_manager_t mtcp; 926 addr_pool_t ap; 927 928 mtcp = GetMTCPManager(mctx); 929 if (!mtcp) { 930 errno = EACCES; 931 return -1; 932 } 933 934 if (saddr_base == INADDR_ANY) { 935 int nif_out; 936 937 /* for the INADDR_ANY, find the output interface for the destination 938 and set the saddr_base as the ip address of the output interface */ 939 nif_out = GetOutputInterface(daddr); 940 if (nif_out < 0) { 941 TRACE_DBG("Could not determine nif idx!\n"); 942 errno = EINVAL; 943 return -1; 944 } 945 saddr_base = g_config.mos->netdev_table->ent[nif_out]->ip_addr; 946 } 947 948 ap = CreateAddressPoolPerCore(mctx->cpu, num_cpus, 949 saddr_base, num_addr, daddr, dport); 950 if (!ap) { 951 errno = ENOMEM; 952 return -1; 953 } 954 955 mtcp->ap = ap; 956 957 return 0; 958 } 959 /*----------------------------------------------------------------------------*/ 960 int 961 eval_bpf_5tuple(struct sfbpf_program fcode, 962 in_addr_t saddr, in_port_t sport, 963 in_addr_t daddr, in_port_t dport) { 964 uint8_t buf[TOTAL_TCP_HEADER_LEN]; 965 struct ethhdr *ethh; 966 struct iphdr *iph; 967 struct tcphdr *tcph; 968 969 ethh = (struct ethhdr *)buf; 970 ethh->h_proto = htons(ETH_P_IP); 971 iph = (struct iphdr *)(ethh + 1); 972 iph->ihl = IP_HEADER_LEN >> 2; 973 iph->version = 4; 974 iph->tos = 0; 975 iph->tot_len = htons(IP_HEADER_LEN + TCP_HEADER_LEN); 976 iph->id = htons(0); 977 iph->protocol = IPPROTO_TCP; 978 iph->saddr = saddr; 979 iph->daddr = daddr; 980 iph->check = 0; 981 tcph = (struct tcphdr *)(iph + 1); 982 tcph->source = sport; 983 tcph->dest = dport; 984 985 return EVAL_BPFFILTER(fcode, (uint8_t *)iph - sizeof(struct ethhdr), 986 TOTAL_TCP_HEADER_LEN); 987 } 988 /*----------------------------------------------------------------------------*/ 989 int 990 mtcp_connect(mctx_t mctx, int sockid, 991 const struct sockaddr *addr, socklen_t addrlen) 992 { 993 mtcp_manager_t mtcp; 994 socket_map_t socket; 995 tcp_stream *cur_stream; 996 struct sockaddr_in *addr_in; 997 in_addr_t dip; 998 in_port_t dport; 999 int is_dyn_bound = FALSE; 1000 int ret, nif; 1001 int cnt_match = 0; 1002 struct mon_listener *walk; 1003 struct sfbpf_program fcode; 1004 1005 cur_stream = NULL; 1006 mtcp = GetMTCPManager(mctx); 1007 if (!mtcp) { 1008 errno = EACCES; 1009 return -1; 1010 } 1011 1012 if (sockid < 0 || sockid >= g_config.mos->max_concurrency) { 1013 TRACE_API("Socket id %d out of range.\n", sockid); 1014 errno = EBADF; 1015 return -1; 1016 } 1017 1018 if (mtcp->smap[sockid].socktype == MOS_SOCK_UNUSED) { 1019 TRACE_API("Invalid socket id: %d\n", sockid); 1020 errno = EBADF; 1021 return -1; 1022 } 1023 1024 if (mtcp->smap[sockid].socktype != MOS_SOCK_STREAM) { 1025 TRACE_API("Not an end socket. id: %d\n", sockid); 1026 errno = ENOTSOCK; 1027 return -1; 1028 } 1029 1030 if (!addr) { 1031 TRACE_API("Socket %d: empty address!\n", sockid); 1032 errno = EFAULT; 1033 return -1; 1034 } 1035 1036 /* we only allow bind() for AF_INET address */ 1037 if (addr->sa_family != AF_INET || addrlen < sizeof(struct sockaddr_in)) { 1038 TRACE_API("Socket %d: invalid argument!\n", sockid); 1039 errno = EAFNOSUPPORT; 1040 return -1; 1041 } 1042 1043 socket = &mtcp->smap[sockid]; 1044 if (socket->stream) { 1045 TRACE_API("Socket %d: stream already exist!\n", sockid); 1046 if (socket->stream->state >= TCP_ST_ESTABLISHED) { 1047 errno = EISCONN; 1048 } else { 1049 errno = EALREADY; 1050 } 1051 return -1; 1052 } 1053 1054 addr_in = (struct sockaddr_in *)addr; 1055 dip = addr_in->sin_addr.s_addr; 1056 dport = addr_in->sin_port; 1057 1058 /* address binding */ 1059 if (socket->opts & MTCP_ADDR_BIND && 1060 socket->saddr.sin_port != INPORT_ANY && 1061 socket->saddr.sin_addr.s_addr != INADDR_ANY) { 1062 int rss_core; 1063 1064 rss_core = GetRSSCPUCore(socket->saddr.sin_addr.s_addr, dip, 1065 socket->saddr.sin_port, dport, num_queues); 1066 1067 if (rss_core != mctx->cpu) { 1068 errno = EINVAL; 1069 return -1; 1070 } 1071 } else { 1072 if (mtcp->ap) { 1073 ret = FetchAddress(mtcp->ap, 1074 mctx->cpu, num_queues, addr_in, &socket->saddr); 1075 } else { 1076 nif = GetOutputInterface(dip); 1077 if (nif < 0) { 1078 errno = EINVAL; 1079 return -1; 1080 } 1081 ret = FetchAddress(ap[nif], 1082 mctx->cpu, num_queues, addr_in, &socket->saddr); 1083 } 1084 if (ret < 0) { 1085 errno = EAGAIN; 1086 return -1; 1087 } 1088 socket->opts |= MTCP_ADDR_BIND; 1089 is_dyn_bound = TRUE; 1090 } 1091 1092 cnt_match = 0; 1093 if (mtcp->num_msp > 0) { 1094 TAILQ_FOREACH(walk, &mtcp->monitors, link) { 1095 fcode = walk->stream_syn_fcode; 1096 if (!(ISSET_BPFFILTER(fcode) && 1097 eval_bpf_5tuple(fcode, socket->saddr.sin_addr.s_addr, 1098 socket->saddr.sin_port, 1099 dip, dport) == 0)) { 1100 walk->is_stream_syn_filter_hit = 1; // set the 'filter hit' flag to 1 1101 cnt_match++; 1102 } 1103 } 1104 } 1105 1106 if (mtcp->num_msp > 0 && cnt_match > 0) { 1107 /* 150820 dhkim: XXX: embedded mode is not verified */ 1108 #if 1 1109 cur_stream = CreateClientTCPStream(mtcp, socket, 1110 STREAM_TYPE(MOS_SOCK_STREAM) | 1111 STREAM_TYPE(MOS_SOCK_MONITOR_STREAM_ACTIVE), 1112 socket->saddr.sin_addr.s_addr, 1113 socket->saddr.sin_port, dip, dport, NULL); 1114 #else 1115 cur_stream = CreateDualTCPStream(mtcp, socket, 1116 STREAM_TYPE(MOS_SOCK_STREAM) | 1117 STREAM_TYPE(MOS_SOCK_MONITOR_STREAM_ACTIVE), 1118 socket->saddr.sin_addr.s_addr, 1119 socket->saddr.sin_port, dip, dport, NULL); 1120 #endif 1121 } 1122 else 1123 cur_stream = CreateTCPStream(mtcp, socket, STREAM_TYPE(MOS_SOCK_STREAM), 1124 socket->saddr.sin_addr.s_addr, 1125 socket->saddr.sin_port, dip, dport, NULL); 1126 if (!cur_stream) { 1127 TRACE_ERROR("Socket %d: failed to create tcp_stream!\n", sockid); 1128 errno = ENOMEM; 1129 return -1; 1130 } 1131 1132 if (is_dyn_bound) 1133 cur_stream->is_bound_addr = TRUE; 1134 cur_stream->sndvar->cwnd = 1; 1135 cur_stream->sndvar->ssthresh = cur_stream->sndvar->mss * 10; 1136 cur_stream->side = MOS_SIDE_CLI; 1137 /* if monitor is enabled, update the pair stream side as well */ 1138 if (cur_stream->pair_stream) { 1139 cur_stream->pair_stream->side = MOS_SIDE_SVR; 1140 /* 1141 * if buffer management is off, then disable 1142 * monitoring tcp ring of server... 1143 * if there is even a single monitor asking for 1144 * buffer management, enable it (that's why the 1145 * need for the loop) 1146 */ 1147 cur_stream->pair_stream->buffer_mgmt = BUFMGMT_OFF; 1148 struct socket_map *walk; 1149 SOCKQ_FOREACH_START(walk, &cur_stream->msocks) { 1150 uint8_t bm = walk->monitor_stream->monitor_listener->server_buf_mgmt; 1151 if (bm > cur_stream->pair_stream->buffer_mgmt) { 1152 cur_stream->pair_stream->buffer_mgmt = bm; 1153 break; 1154 } 1155 } SOCKQ_FOREACH_END; 1156 } 1157 1158 cur_stream->state = TCP_ST_SYN_SENT; 1159 cur_stream->cb_events |= MOS_ON_TCP_STATE_CHANGE; 1160 1161 TRACE_STATE("Stream %d: TCP_ST_SYN_SENT\n", cur_stream->id); 1162 1163 SQ_LOCK(&mtcp->ctx->connect_lock); 1164 ret = StreamEnqueue(mtcp->connectq, cur_stream); 1165 SQ_UNLOCK(&mtcp->ctx->connect_lock); 1166 mtcp->wakeup_flag = TRUE; 1167 if (ret < 0) { 1168 TRACE_ERROR("Socket %d: failed to enqueue to conenct queue!\n", sockid); 1169 SQ_LOCK(&mtcp->ctx->destroyq_lock); 1170 StreamEnqueue(mtcp->destroyq, cur_stream); 1171 SQ_UNLOCK(&mtcp->ctx->destroyq_lock); 1172 errno = EAGAIN; 1173 return -1; 1174 } 1175 1176 /* if nonblocking socket, return EINPROGRESS */ 1177 if (socket->opts & MTCP_NONBLOCK) { 1178 errno = EINPROGRESS; 1179 return -1; 1180 1181 } else { 1182 while (1) { 1183 if (!cur_stream) { 1184 TRACE_ERROR("STREAM DESTROYED\n"); 1185 errno = ETIMEDOUT; 1186 return -1; 1187 } 1188 if (cur_stream->state > TCP_ST_ESTABLISHED) { 1189 TRACE_ERROR("Socket %d: weird state %s\n", 1190 sockid, TCPStateToString(cur_stream)); 1191 // TODO: how to handle this? 1192 errno = ENOSYS; 1193 return -1; 1194 } 1195 1196 if (cur_stream->state == TCP_ST_ESTABLISHED) { 1197 break; 1198 } 1199 usleep(1000); 1200 } 1201 } 1202 1203 return 0; 1204 } 1205 /*----------------------------------------------------------------------------*/ 1206 static inline int 1207 CloseStreamSocket(mctx_t mctx, int sockid) 1208 { 1209 mtcp_manager_t mtcp; 1210 tcp_stream *cur_stream; 1211 int ret; 1212 1213 mtcp = GetMTCPManager(mctx); 1214 if (!mtcp) { 1215 errno = EACCES; 1216 return -1; 1217 } 1218 1219 cur_stream = mtcp->smap[sockid].stream; 1220 if (!cur_stream) { 1221 TRACE_API("Socket %d: stream does not exist.\n", sockid); 1222 errno = ENOTCONN; 1223 return -1; 1224 } 1225 1226 if (cur_stream->closed) { 1227 TRACE_API("Socket %d (Stream %u): already closed stream\n", 1228 sockid, cur_stream->id); 1229 return 0; 1230 } 1231 cur_stream->closed = TRUE; 1232 1233 TRACE_API("Stream %d: closing the stream.\n", cur_stream->id); 1234 1235 /* 141029 dhkim: Check this! */ 1236 cur_stream->socket = NULL; 1237 1238 if (cur_stream->state == TCP_ST_CLOSED_RSVD) { 1239 TRACE_API("Stream %d at TCP_ST_CLOSED_RSVD. destroying the stream.\n", 1240 cur_stream->id); 1241 SQ_LOCK(&mtcp->ctx->destroyq_lock); 1242 StreamEnqueue(mtcp->destroyq, cur_stream); 1243 mtcp->wakeup_flag = TRUE; 1244 SQ_UNLOCK(&mtcp->ctx->destroyq_lock); 1245 return 0; 1246 1247 } else if (cur_stream->state == TCP_ST_SYN_SENT) { 1248 #if 1 1249 SQ_LOCK(&mtcp->ctx->destroyq_lock); 1250 StreamEnqueue(mtcp->destroyq, cur_stream); 1251 SQ_UNLOCK(&mtcp->ctx->destroyq_lock); 1252 mtcp->wakeup_flag = TRUE; 1253 #endif 1254 return -1; 1255 1256 } else if (cur_stream->state != TCP_ST_ESTABLISHED && 1257 cur_stream->state != TCP_ST_CLOSE_WAIT) { 1258 TRACE_API("Stream %d at state %s\n", 1259 cur_stream->id, TCPStateToString(cur_stream)); 1260 errno = EBADF; 1261 return -1; 1262 } 1263 1264 SQ_LOCK(&mtcp->ctx->close_lock); 1265 cur_stream->sndvar->on_closeq = TRUE; 1266 ret = StreamEnqueue(mtcp->closeq, cur_stream); 1267 mtcp->wakeup_flag = TRUE; 1268 SQ_UNLOCK(&mtcp->ctx->close_lock); 1269 1270 if (ret < 0) { 1271 TRACE_ERROR("(NEVER HAPPEN) Failed to enqueue the stream to close.\n"); 1272 errno = EAGAIN; 1273 return -1; 1274 } 1275 1276 return 0; 1277 } 1278 /*----------------------------------------------------------------------------*/ 1279 static inline int 1280 CloseListeningSocket(mctx_t mctx, int sockid) 1281 { 1282 mtcp_manager_t mtcp; 1283 struct tcp_listener *listener; 1284 1285 mtcp = GetMTCPManager(mctx); 1286 if (!mtcp) { 1287 errno = EACCES; 1288 return -1; 1289 } 1290 1291 listener = mtcp->smap[sockid].listener; 1292 if (!listener) { 1293 errno = EINVAL; 1294 return -1; 1295 } 1296 1297 if (listener->acceptq) { 1298 DestroyStreamQueue(listener->acceptq); 1299 listener->acceptq = NULL; 1300 } 1301 1302 pthread_mutex_lock(&listener->accept_lock); 1303 pthread_cond_signal(&listener->accept_cond); 1304 pthread_mutex_unlock(&listener->accept_lock); 1305 1306 pthread_cond_destroy(&listener->accept_cond); 1307 pthread_mutex_destroy(&listener->accept_lock); 1308 1309 free(listener); 1310 mtcp->smap[sockid].listener = NULL; 1311 1312 return 0; 1313 } 1314 /*----------------------------------------------------------------------------*/ 1315 int 1316 mtcp_close(mctx_t mctx, int sockid) 1317 { 1318 mtcp_manager_t mtcp; 1319 int ret; 1320 1321 mtcp = GetMTCPManager(mctx); 1322 if (!mtcp) { 1323 errno = EACCES; 1324 return -1; 1325 } 1326 1327 if (sockid < 0 || sockid >= g_config.mos->max_concurrency) { 1328 TRACE_API("Socket id %d out of range.\n", sockid); 1329 errno = EBADF; 1330 return -1; 1331 } 1332 1333 if (mtcp->smap[sockid].socktype == MOS_SOCK_UNUSED) { 1334 TRACE_API("Invalid socket id: %d\n", sockid); 1335 errno = EBADF; 1336 return -1; 1337 } 1338 1339 TRACE_API("Socket %d: mtcp_close called.\n", sockid); 1340 1341 switch (mtcp->smap[sockid].socktype) { 1342 case MOS_SOCK_STREAM: 1343 ret = CloseStreamSocket(mctx, sockid); 1344 break; 1345 1346 case MOS_SOCK_STREAM_LISTEN: 1347 ret = CloseListeningSocket(mctx, sockid); 1348 break; 1349 1350 case MOS_SOCK_EPOLL: 1351 ret = CloseEpollSocket(mctx, sockid); 1352 break; 1353 1354 case MOS_SOCK_PIPE: 1355 ret = PipeClose(mctx, sockid); 1356 break; 1357 1358 default: 1359 errno = EINVAL; 1360 ret = -1; 1361 break; 1362 } 1363 1364 FreeSocket(mctx, sockid, mtcp->smap[sockid].socktype); 1365 1366 return ret; 1367 } 1368 /*----------------------------------------------------------------------------*/ 1369 int 1370 mtcp_abort(mctx_t mctx, int sockid) 1371 { 1372 mtcp_manager_t mtcp; 1373 tcp_stream *cur_stream; 1374 int ret; 1375 1376 mtcp = GetMTCPManager(mctx); 1377 if (!mtcp) { 1378 errno = EACCES; 1379 return -1; 1380 } 1381 1382 if (sockid < 0 || sockid >= g_config.mos->max_concurrency) { 1383 TRACE_API("Socket id %d out of range.\n", sockid); 1384 errno = EBADF; 1385 return -1; 1386 } 1387 1388 if (mtcp->smap[sockid].socktype == MOS_SOCK_UNUSED) { 1389 TRACE_API("Invalid socket id: %d\n", sockid); 1390 errno = EBADF; 1391 return -1; 1392 } 1393 1394 if (mtcp->smap[sockid].socktype != MOS_SOCK_STREAM) { 1395 TRACE_API("Not an end socket. id: %d\n", sockid); 1396 errno = ENOTSOCK; 1397 return -1; 1398 } 1399 1400 cur_stream = mtcp->smap[sockid].stream; 1401 if (!cur_stream) { 1402 TRACE_API("Stream %d: does not exist.\n", sockid); 1403 errno = ENOTCONN; 1404 return -1; 1405 } 1406 1407 TRACE_API("Socket %d: mtcp_abort()\n", sockid); 1408 1409 FreeSocket(mctx, sockid, mtcp->smap[sockid].socktype); 1410 cur_stream->socket = NULL; 1411 1412 if (cur_stream->state == TCP_ST_CLOSED_RSVD) { 1413 TRACE_API("Stream %d: connection already reset.\n", sockid); 1414 return ERROR; 1415 1416 } else if (cur_stream->state == TCP_ST_SYN_SENT) { 1417 /* TODO: this should notify event failure to all 1418 previous read() or write() calls */ 1419 cur_stream->state = TCP_ST_CLOSED_RSVD; 1420 cur_stream->close_reason = TCP_ACTIVE_CLOSE; 1421 cur_stream->cb_events |= MOS_ON_TCP_STATE_CHANGE; 1422 SQ_LOCK(&mtcp->ctx->destroyq_lock); 1423 StreamEnqueue(mtcp->destroyq, cur_stream); 1424 SQ_UNLOCK(&mtcp->ctx->destroyq_lock); 1425 mtcp->wakeup_flag = TRUE; 1426 return 0; 1427 1428 } else if (cur_stream->state == TCP_ST_CLOSING || 1429 cur_stream->state == TCP_ST_LAST_ACK || 1430 cur_stream->state == TCP_ST_TIME_WAIT) { 1431 cur_stream->state = TCP_ST_CLOSED_RSVD; 1432 cur_stream->close_reason = TCP_ACTIVE_CLOSE; 1433 cur_stream->cb_events |= MOS_ON_TCP_STATE_CHANGE; 1434 SQ_LOCK(&mtcp->ctx->destroyq_lock); 1435 StreamEnqueue(mtcp->destroyq, cur_stream); 1436 SQ_UNLOCK(&mtcp->ctx->destroyq_lock); 1437 mtcp->wakeup_flag = TRUE; 1438 return 0; 1439 } 1440 1441 /* the stream structure will be destroyed after sending RST */ 1442 if (cur_stream->sndvar->on_resetq) { 1443 TRACE_ERROR("Stream %d: calling mtcp_abort() " 1444 "when in reset queue.\n", sockid); 1445 errno = ECONNRESET; 1446 return -1; 1447 } 1448 SQ_LOCK(&mtcp->ctx->reset_lock); 1449 cur_stream->sndvar->on_resetq = TRUE; 1450 ret = StreamEnqueue(mtcp->resetq, cur_stream); 1451 SQ_UNLOCK(&mtcp->ctx->reset_lock); 1452 mtcp->wakeup_flag = TRUE; 1453 1454 if (ret < 0) { 1455 TRACE_ERROR("(NEVER HAPPEN) Failed to enqueue the stream to close.\n"); 1456 errno = EAGAIN; 1457 return -1; 1458 } 1459 1460 return 0; 1461 } 1462 /*----------------------------------------------------------------------------*/ 1463 static inline int 1464 PeekForUser(mtcp_manager_t mtcp, tcp_stream *cur_stream, char *buf, int len) 1465 { 1466 struct tcp_recv_vars *rcvvar = cur_stream->rcvvar; 1467 int copylen; 1468 tcprb_t *rb = rcvvar->rcvbuf; 1469 1470 if ((copylen = tcprb_ppeek(rb, (uint8_t *)buf, len, rb->pile)) <= 0) { 1471 errno = EAGAIN; 1472 return -1; 1473 } 1474 1475 return copylen; 1476 } 1477 /*----------------------------------------------------------------------------*/ 1478 static inline int 1479 CopyToUser(mtcp_manager_t mtcp, tcp_stream *cur_stream, char *buf, int len) 1480 { 1481 struct tcp_recv_vars *rcvvar = cur_stream->rcvvar; 1482 int copylen; 1483 tcprb_t *rb = rcvvar->rcvbuf; 1484 if ((copylen = tcprb_ppeek(rb, (uint8_t *)buf, len, rb->pile)) <= 0) { 1485 errno = EAGAIN; 1486 return -1; 1487 } 1488 tcprb_setpile(rb, rb->pile + copylen); 1489 1490 rcvvar->rcv_wnd = rb->len - tcprb_cflen(rb); 1491 //printf("rcv_wnd: %d\n", rcvvar->rcv_wnd); 1492 1493 /* Advertise newly freed receive buffer */ 1494 if (cur_stream->need_wnd_adv) { 1495 if (rcvvar->rcv_wnd > cur_stream->sndvar->eff_mss) { 1496 if (!cur_stream->sndvar->on_ackq) { 1497 SQ_LOCK(&mtcp->ctx->ackq_lock); 1498 cur_stream->sndvar->on_ackq = TRUE; 1499 StreamEnqueue(mtcp->ackq, cur_stream); /* this always success */ 1500 SQ_UNLOCK(&mtcp->ctx->ackq_lock); 1501 cur_stream->need_wnd_adv = FALSE; 1502 mtcp->wakeup_flag = TRUE; 1503 } 1504 } 1505 } 1506 1507 return copylen; 1508 } 1509 /*----------------------------------------------------------------------------*/ 1510 ssize_t 1511 mtcp_recv(mctx_t mctx, int sockid, char *buf, size_t len, int flags) 1512 { 1513 mtcp_manager_t mtcp; 1514 socket_map_t socket; 1515 tcp_stream *cur_stream; 1516 struct tcp_recv_vars *rcvvar; 1517 int event_remaining, merged_len; 1518 int ret; 1519 1520 mtcp = GetMTCPManager(mctx); 1521 if (!mtcp) { 1522 errno = EACCES; 1523 return -1; 1524 } 1525 1526 if (sockid < 0 || sockid >= g_config.mos->max_concurrency) { 1527 TRACE_API("Socket id %d out of range.\n", sockid); 1528 errno = EBADF; 1529 return -1; 1530 } 1531 1532 socket = &mtcp->smap[sockid]; 1533 if (socket->socktype == MOS_SOCK_UNUSED) { 1534 TRACE_API("Invalid socket id: %d\n", sockid); 1535 errno = EBADF; 1536 return -1; 1537 } 1538 1539 if (socket->socktype == MOS_SOCK_PIPE) { 1540 return PipeRead(mctx, sockid, buf, len); 1541 } 1542 1543 if (socket->socktype != MOS_SOCK_STREAM) { 1544 TRACE_API("Not an end socket. id: %d\n", sockid); 1545 errno = ENOTSOCK; 1546 return -1; 1547 } 1548 1549 /* stream should be in ESTABLISHED, FIN_WAIT_1, FIN_WAIT_2, CLOSE_WAIT */ 1550 cur_stream = socket->stream; 1551 if (!cur_stream || !cur_stream->rcvvar || !cur_stream->rcvvar->rcvbuf || 1552 !(cur_stream->state >= TCP_ST_ESTABLISHED && 1553 cur_stream->state <= TCP_ST_CLOSE_WAIT)) { 1554 errno = ENOTCONN; 1555 return -1; 1556 } 1557 1558 rcvvar = cur_stream->rcvvar; 1559 1560 merged_len = tcprb_cflen(rcvvar->rcvbuf); 1561 1562 /* if CLOSE_WAIT, return 0 if there is no payload */ 1563 if (cur_stream->state == TCP_ST_CLOSE_WAIT) { 1564 if (!rcvvar->rcvbuf) 1565 return 0; 1566 1567 if (merged_len == 0) 1568 return 0; 1569 } 1570 1571 /* return EAGAIN if no receive buffer */ 1572 if (socket->opts & MTCP_NONBLOCK) { 1573 if (!rcvvar->rcvbuf || merged_len == 0) { 1574 errno = EAGAIN; 1575 return -1; 1576 } 1577 } 1578 1579 SBUF_LOCK(&rcvvar->read_lock); 1580 1581 switch (flags) { 1582 case 0: 1583 ret = CopyToUser(mtcp, cur_stream, buf, len); 1584 break; 1585 case MSG_PEEK: 1586 ret = PeekForUser(mtcp, cur_stream, buf, len); 1587 break; 1588 default: 1589 SBUF_UNLOCK(&rcvvar->read_lock); 1590 ret = -1; 1591 errno = EINVAL; 1592 return ret; 1593 } 1594 1595 merged_len = tcprb_cflen(rcvvar->rcvbuf); 1596 event_remaining = FALSE; 1597 /* if there are remaining payload, generate EPOLLIN */ 1598 /* (may due to insufficient user buffer) */ 1599 if (socket->epoll & MOS_EPOLLIN) { 1600 if (!(socket->epoll & MOS_EPOLLET) && merged_len > 0) { 1601 event_remaining = TRUE; 1602 } 1603 } 1604 /* if waiting for close, notify it if no remaining data */ 1605 if (cur_stream->state == TCP_ST_CLOSE_WAIT && 1606 merged_len == 0 && ret > 0) { 1607 event_remaining = TRUE; 1608 } 1609 1610 SBUF_UNLOCK(&rcvvar->read_lock); 1611 1612 if (event_remaining) { 1613 if (socket->epoll) { 1614 AddEpollEvent(mtcp->ep, 1615 USR_SHADOW_EVENT_QUEUE, socket, MOS_EPOLLIN); 1616 } 1617 } 1618 1619 TRACE_API("Stream %d: mtcp_recv() returning %d\n", cur_stream->id, ret); 1620 return ret; 1621 } 1622 /*----------------------------------------------------------------------------*/ 1623 inline ssize_t 1624 mtcp_read(mctx_t mctx, int sockid, char *buf, size_t len) 1625 { 1626 return mtcp_recv(mctx, sockid, buf, len, 0); 1627 } 1628 /*----------------------------------------------------------------------------*/ 1629 ssize_t 1630 mtcp_readv(mctx_t mctx, int sockid, const struct iovec *iov, int numIOV) 1631 { 1632 mtcp_manager_t mtcp; 1633 socket_map_t socket; 1634 tcp_stream *cur_stream; 1635 struct tcp_recv_vars *rcvvar; 1636 int ret, bytes_read, i; 1637 int event_remaining, merged_len; 1638 1639 mtcp = GetMTCPManager(mctx); 1640 if (!mtcp) { 1641 errno = EACCES; 1642 return -1; 1643 } 1644 1645 if (sockid < 0 || sockid >= g_config.mos->max_concurrency) { 1646 TRACE_API("Socket id %d out of range.\n", sockid); 1647 errno = EBADF; 1648 return -1; 1649 } 1650 1651 socket = &mtcp->smap[sockid]; 1652 if (socket->socktype == MOS_SOCK_UNUSED) { 1653 TRACE_API("Invalid socket id: %d\n", sockid); 1654 errno = EBADF; 1655 return -1; 1656 } 1657 1658 if (socket->socktype != MOS_SOCK_STREAM) { 1659 TRACE_API("Not an end socket. id: %d\n", sockid); 1660 errno = ENOTSOCK; 1661 return -1; 1662 } 1663 1664 /* stream should be in ESTABLISHED, FIN_WAIT_1, FIN_WAIT_2, CLOSE_WAIT */ 1665 cur_stream = socket->stream; 1666 if (!cur_stream || 1667 !(cur_stream->state >= TCP_ST_ESTABLISHED && 1668 cur_stream->state <= TCP_ST_CLOSE_WAIT)) { 1669 errno = ENOTCONN; 1670 return -1; 1671 } 1672 1673 rcvvar = cur_stream->rcvvar; 1674 1675 merged_len = tcprb_cflen(rcvvar->rcvbuf); 1676 1677 /* if CLOSE_WAIT, return 0 if there is no payload */ 1678 if (cur_stream->state == TCP_ST_CLOSE_WAIT) { 1679 if (!rcvvar->rcvbuf) 1680 return 0; 1681 1682 if (merged_len == 0) 1683 return 0; 1684 } 1685 1686 /* return EAGAIN if no receive buffer */ 1687 if (socket->opts & MTCP_NONBLOCK) { 1688 if (!rcvvar->rcvbuf || merged_len == 0) { 1689 errno = EAGAIN; 1690 return -1; 1691 } 1692 } 1693 1694 SBUF_LOCK(&rcvvar->read_lock); 1695 1696 /* read and store the contents to the vectored buffers */ 1697 bytes_read = 0; 1698 for (i = 0; i < numIOV; i++) { 1699 if (iov[i].iov_len <= 0) 1700 continue; 1701 1702 ret = CopyToUser(mtcp, cur_stream, iov[i].iov_base, iov[i].iov_len); 1703 if (ret <= 0) 1704 break; 1705 1706 bytes_read += ret; 1707 1708 if (ret < iov[i].iov_len) 1709 break; 1710 } 1711 1712 merged_len = tcprb_cflen(rcvvar->rcvbuf); 1713 1714 event_remaining = FALSE; 1715 /* if there are remaining payload, generate read event */ 1716 /* (may due to insufficient user buffer) */ 1717 if (socket->epoll & MOS_EPOLLIN) { 1718 if (!(socket->epoll & MOS_EPOLLET) && merged_len > 0) { 1719 event_remaining = TRUE; 1720 } 1721 } 1722 /* if waiting for close, notify it if no remaining data */ 1723 if (cur_stream->state == TCP_ST_CLOSE_WAIT && 1724 merged_len == 0 && bytes_read > 0) { 1725 event_remaining = TRUE; 1726 } 1727 1728 SBUF_UNLOCK(&rcvvar->read_lock); 1729 1730 if(event_remaining) { 1731 if (socket->epoll & MOS_EPOLLIN && !(socket->epoll & MOS_EPOLLET)) { 1732 AddEpollEvent(mtcp->ep, 1733 USR_SHADOW_EVENT_QUEUE, socket, MOS_EPOLLIN); 1734 } 1735 } 1736 1737 TRACE_API("Stream %d: mtcp_readv() returning %d\n", 1738 cur_stream->id, bytes_read); 1739 return bytes_read; 1740 } 1741 /*----------------------------------------------------------------------------*/ 1742 static inline int 1743 CopyFromUser(mtcp_manager_t mtcp, tcp_stream *cur_stream, const char *buf, int len) 1744 { 1745 struct tcp_send_vars *sndvar = cur_stream->sndvar; 1746 int sndlen; 1747 int ret; 1748 1749 sndlen = MIN((int)sndvar->snd_wnd, len); 1750 if (sndlen <= 0) { 1751 errno = EAGAIN; 1752 return -1; 1753 } 1754 1755 /* allocate send buffer if not exist */ 1756 if (!sndvar->sndbuf) { 1757 sndvar->sndbuf = SBInit(mtcp->rbm_snd, sndvar->iss + 1); 1758 if (!sndvar->sndbuf) { 1759 cur_stream->close_reason = TCP_NO_MEM; 1760 /* notification may not required due to -1 return */ 1761 errno = ENOMEM; 1762 return -1; 1763 } 1764 } 1765 1766 ret = SBPut(mtcp->rbm_snd, sndvar->sndbuf, buf, sndlen); 1767 assert(ret == sndlen); 1768 sndvar->snd_wnd = sndvar->sndbuf->size - sndvar->sndbuf->len; 1769 if (ret <= 0) { 1770 TRACE_ERROR("SBPut failed. reason: %d (sndlen: %u, len: %u\n", 1771 ret, sndlen, sndvar->sndbuf->len); 1772 errno = EAGAIN; 1773 return -1; 1774 } 1775 1776 if (sndvar->snd_wnd <= 0) { 1777 TRACE_SNDBUF("%u Sending buffer became full!! snd_wnd: %u\n", 1778 cur_stream->id, sndvar->snd_wnd); 1779 } 1780 1781 return ret; 1782 } 1783 /*----------------------------------------------------------------------------*/ 1784 ssize_t 1785 mtcp_write(mctx_t mctx, int sockid, const char *buf, size_t len) 1786 { 1787 mtcp_manager_t mtcp; 1788 socket_map_t socket; 1789 tcp_stream *cur_stream; 1790 struct tcp_send_vars *sndvar; 1791 int ret; 1792 1793 mtcp = GetMTCPManager(mctx); 1794 if (!mtcp) { 1795 errno = EACCES; 1796 return -1; 1797 } 1798 1799 if (sockid < 0 || sockid >= g_config.mos->max_concurrency) { 1800 TRACE_API("Socket id %d out of range.\n", sockid); 1801 errno = EBADF; 1802 return -1; 1803 } 1804 1805 socket = &mtcp->smap[sockid]; 1806 if (socket->socktype == MOS_SOCK_UNUSED) { 1807 TRACE_API("Invalid socket id: %d\n", sockid); 1808 errno = EBADF; 1809 return -1; 1810 } 1811 1812 if (socket->socktype == MOS_SOCK_PIPE) { 1813 return PipeWrite(mctx, sockid, buf, len); 1814 } 1815 1816 if (socket->socktype != MOS_SOCK_STREAM) { 1817 TRACE_API("Not an end socket. id: %d\n", sockid); 1818 errno = ENOTSOCK; 1819 return -1; 1820 } 1821 1822 cur_stream = socket->stream; 1823 if (!cur_stream || 1824 !(cur_stream->state == TCP_ST_ESTABLISHED || 1825 cur_stream->state == TCP_ST_CLOSE_WAIT)) { 1826 errno = ENOTCONN; 1827 return -1; 1828 } 1829 1830 if (len <= 0) { 1831 if (socket->opts & MTCP_NONBLOCK) { 1832 errno = EAGAIN; 1833 return -1; 1834 } else { 1835 return 0; 1836 } 1837 } 1838 1839 sndvar = cur_stream->sndvar; 1840 1841 SBUF_LOCK(&sndvar->write_lock); 1842 ret = CopyFromUser(mtcp, cur_stream, buf, len); 1843 1844 SBUF_UNLOCK(&sndvar->write_lock); 1845 1846 if (ret > 0 && !(sndvar->on_sendq || sndvar->on_send_list)) { 1847 SQ_LOCK(&mtcp->ctx->sendq_lock); 1848 sndvar->on_sendq = TRUE; 1849 StreamEnqueue(mtcp->sendq, cur_stream); /* this always success */ 1850 SQ_UNLOCK(&mtcp->ctx->sendq_lock); 1851 mtcp->wakeup_flag = TRUE; 1852 } 1853 1854 if (ret == 0 && (socket->opts & MTCP_NONBLOCK)) { 1855 ret = -1; 1856 errno = EAGAIN; 1857 } 1858 1859 /* if there are remaining sending buffer, generate write event */ 1860 if (sndvar->snd_wnd > 0) { 1861 if (socket->epoll & MOS_EPOLLOUT && !(socket->epoll & MOS_EPOLLET)) { 1862 AddEpollEvent(mtcp->ep, 1863 USR_SHADOW_EVENT_QUEUE, socket, MOS_EPOLLOUT); 1864 } 1865 } 1866 1867 TRACE_API("Stream %d: mtcp_write() returning %d\n", cur_stream->id, ret); 1868 return ret; 1869 } 1870 /*----------------------------------------------------------------------------*/ 1871 ssize_t 1872 mtcp_writev(mctx_t mctx, int sockid, const struct iovec *iov, int numIOV) 1873 { 1874 mtcp_manager_t mtcp; 1875 socket_map_t socket; 1876 tcp_stream *cur_stream; 1877 struct tcp_send_vars *sndvar; 1878 int ret, to_write, i; 1879 1880 mtcp = GetMTCPManager(mctx); 1881 if (!mtcp) { 1882 errno = EACCES; 1883 return -1; 1884 } 1885 1886 if (sockid < 0 || sockid >= g_config.mos->max_concurrency) { 1887 TRACE_API("Socket id %d out of range.\n", sockid); 1888 errno = EBADF; 1889 return -1; 1890 } 1891 1892 socket = &mtcp->smap[sockid]; 1893 if (socket->socktype == MOS_SOCK_UNUSED) { 1894 TRACE_API("Invalid socket id: %d\n", sockid); 1895 errno = EBADF; 1896 return -1; 1897 } 1898 1899 if (socket->socktype != MOS_SOCK_STREAM) { 1900 TRACE_API("Not an end socket. id: %d\n", sockid); 1901 errno = ENOTSOCK; 1902 return -1; 1903 } 1904 1905 cur_stream = socket->stream; 1906 if (!cur_stream || 1907 !(cur_stream->state == TCP_ST_ESTABLISHED || 1908 cur_stream->state == TCP_ST_CLOSE_WAIT)) { 1909 errno = ENOTCONN; 1910 return -1; 1911 } 1912 1913 sndvar = cur_stream->sndvar; 1914 SBUF_LOCK(&sndvar->write_lock); 1915 1916 /* write from the vectored buffers */ 1917 to_write = 0; 1918 for (i = 0; i < numIOV; i++) { 1919 if (iov[i].iov_len <= 0) 1920 continue; 1921 1922 ret = CopyFromUser(mtcp, cur_stream, iov[i].iov_base, iov[i].iov_len); 1923 if (ret <= 0) 1924 break; 1925 1926 to_write += ret; 1927 1928 if (ret < iov[i].iov_len) 1929 break; 1930 } 1931 SBUF_UNLOCK(&sndvar->write_lock); 1932 1933 if (to_write > 0 && !(sndvar->on_sendq || sndvar->on_send_list)) { 1934 SQ_LOCK(&mtcp->ctx->sendq_lock); 1935 sndvar->on_sendq = TRUE; 1936 StreamEnqueue(mtcp->sendq, cur_stream); /* this always success */ 1937 SQ_UNLOCK(&mtcp->ctx->sendq_lock); 1938 mtcp->wakeup_flag = TRUE; 1939 } 1940 1941 if (to_write == 0 && (socket->opts & MTCP_NONBLOCK)) { 1942 to_write = -1; 1943 errno = EAGAIN; 1944 } 1945 1946 /* if there are remaining sending buffer, generate write event */ 1947 if (sndvar->snd_wnd > 0) { 1948 if (socket->epoll & MOS_EPOLLOUT && !(socket->epoll & MOS_EPOLLET)) { 1949 AddEpollEvent(mtcp->ep, 1950 USR_SHADOW_EVENT_QUEUE, socket, MOS_EPOLLOUT); 1951 } 1952 } 1953 1954 TRACE_API("Stream %d: mtcp_writev() returning %d\n", 1955 cur_stream->id, to_write); 1956 return to_write; 1957 } 1958 /*----------------------------------------------------------------------------*/ 1959 uint32_t 1960 mtcp_get_connection_cnt(mctx_t mctx) 1961 { 1962 mtcp_manager_t mtcp; 1963 mtcp = GetMTCPManager(mctx); 1964 if (!mtcp) { 1965 errno = EACCES; 1966 return -1; 1967 } 1968 1969 if (mtcp->num_msp > 0) 1970 return mtcp->flow_cnt / 2; 1971 else 1972 return mtcp->flow_cnt; 1973 } 1974 /*----------------------------------------------------------------------------*/ 1975