1 #include <sys/queue.h> 2 #include <sys/ioctl.h> 3 #include <limits.h> 4 #include <unistd.h> 5 #include <assert.h> 6 #include <string.h> 7 8 #ifdef DARWIN 9 #include <netinet/tcp.h> 10 #include <netinet/ip.h> 11 #include <netinet/if_ether.h> 12 #endif 13 14 #include "mtcp.h" 15 #include "mtcp_api.h" 16 #include "tcp_in.h" 17 #include "tcp_stream.h" 18 #include "tcp_out.h" 19 #include "ip_out.h" 20 #include "eventpoll.h" 21 #include "pipe.h" 22 #include "fhash.h" 23 #include "addr_pool.h" 24 #include "rss.h" 25 #include "config.h" 26 #include "debug.h" 27 #include "eventpoll.h" 28 #include "mos_api.h" 29 #include "tcp_rb.h" 30 31 #define MAX(a, b) ((a)>(b)?(a):(b)) 32 #define MIN(a, b) ((a)<(b)?(a):(b)) 33 34 /*----------------------------------------------------------------------------*/ 35 /** Stop monitoring the socket! (function prototype) 36 * @param [in] mctx: mtcp context 37 * @param [in] sock: monitoring stream socket id 38 * @param [in] side: side of monitoring (client side, server side or both) 39 * 40 * This function is now DEPRECATED and is only used within mOS core... 41 */ 42 int 43 mtcp_cb_stop(mctx_t mctx, int sock, int side); 44 /*----------------------------------------------------------------------------*/ 45 /** Reset the connection (send RST packets to both sides) 46 * (We need to decide the API for this.) 47 */ 48 //int 49 //mtcp_cb_reset(mctx_t mctx, int sock, int side); 50 /*----------------------------------------------------------------------------*/ 51 inline mtcp_manager_t 52 GetMTCPManager(mctx_t mctx) 53 { 54 if (!mctx) { 55 errno = EACCES; 56 return NULL; 57 } 58 59 if (mctx->cpu < 0 || mctx->cpu >= num_cpus) { 60 errno = EINVAL; 61 return NULL; 62 } 63 64 if (!g_mtcp[mctx->cpu] || g_mtcp[mctx->cpu]->ctx->done || g_mtcp[mctx->cpu]->ctx->exit) { 65 errno = EPERM; 66 return NULL; 67 } 68 69 return g_mtcp[mctx->cpu]; 70 } 71 /*----------------------------------------------------------------------------*/ 72 static inline int 73 GetSocketError(socket_map_t socket, void *optval, socklen_t *optlen) 74 { 75 tcp_stream *cur_stream; 76 77 if (!socket->stream) { 78 errno = EBADF; 79 return -1; 80 } 81 82 cur_stream = socket->stream; 83 if (cur_stream->state == TCP_ST_CLOSED_RSVD) { 84 if (cur_stream->close_reason == TCP_TIMEDOUT || 85 cur_stream->close_reason == TCP_CONN_FAIL || 86 cur_stream->close_reason == TCP_CONN_LOST) { 87 *(int *)optval = ETIMEDOUT; 88 *optlen = sizeof(int); 89 90 return 0; 91 } 92 } 93 94 if (cur_stream->state == TCP_ST_CLOSE_WAIT || 95 cur_stream->state == TCP_ST_CLOSED_RSVD) { 96 if (cur_stream->close_reason == TCP_RESET) { 97 *(int *)optval = ECONNRESET; 98 *optlen = sizeof(int); 99 100 return 0; 101 } 102 } 103 104 if (cur_stream->state == TCP_ST_SYN_SENT && 105 errno == EINPROGRESS) { 106 *(int *)optval = errno; 107 *optlen = sizeof(int); 108 109 return -1; 110 } 111 112 /* 113 * `base case`: If socket sees no so_error, then 114 * this also means close_reason will always be 115 * TCP_NOT_CLOSED. 116 */ 117 if (cur_stream->close_reason == TCP_NOT_CLOSED) { 118 *(int *)optval = 0; 119 *optlen = sizeof(int); 120 121 return 0; 122 } 123 124 errno = ENOSYS; 125 return -1; 126 } 127 /*----------------------------------------------------------------------------*/ 128 int 129 mtcp_getsockname(mctx_t mctx, int sockid, struct sockaddr *addr, 130 socklen_t *addrlen) 131 { 132 mtcp_manager_t mtcp; 133 socket_map_t socket; 134 135 mtcp = GetMTCPManager(mctx); 136 if (!mtcp) { 137 return -1; 138 } 139 140 if (sockid < 0 || sockid >= g_config.mos->max_concurrency) { 141 TRACE_API("Socket id %d out of range.\n", sockid); 142 errno = EBADF; 143 return -1; 144 } 145 146 socket = &mtcp->smap[sockid]; 147 if (socket->socktype == MOS_SOCK_UNUSED) { 148 TRACE_API("Invalid socket id: %d\n", sockid); 149 errno = EBADF; 150 return -1; 151 } 152 153 if (*addrlen <= 0) { 154 TRACE_API("Invalid addrlen: %d\n", *addrlen); 155 errno = EINVAL; 156 return -1; 157 } 158 159 if (socket->socktype != MOS_SOCK_STREAM_LISTEN && 160 socket->socktype != MOS_SOCK_STREAM) { 161 TRACE_API("Invalid socket id: %d\n", sockid); 162 errno = ENOTSOCK; 163 return -1; 164 } 165 166 *(struct sockaddr_in *)addr = socket->saddr; 167 *addrlen = sizeof(socket->saddr); 168 169 return 0; 170 } 171 /*----------------------------------------------------------------------------*/ 172 int 173 mtcp_getsockopt(mctx_t mctx, int sockid, int level, 174 int optname, void *optval, socklen_t *optlen) 175 { 176 mtcp_manager_t mtcp; 177 socket_map_t socket; 178 179 mtcp = GetMTCPManager(mctx); 180 if (!mtcp) { 181 errno = EACCES; 182 return -1; 183 } 184 185 if (sockid < 0 || sockid >= g_config.mos->max_concurrency) { 186 TRACE_API("Socket id %d out of range.\n", sockid); 187 errno = EBADF; 188 return -1; 189 } 190 191 switch (level) { 192 case SOL_SOCKET: 193 socket = &mtcp->smap[sockid]; 194 if (socket->socktype == MOS_SOCK_UNUSED) { 195 TRACE_API("Invalid socket id: %d\n", sockid); 196 errno = EBADF; 197 return -1; 198 } 199 200 if (socket->socktype != MOS_SOCK_STREAM_LISTEN && 201 socket->socktype != MOS_SOCK_STREAM) { 202 TRACE_API("Invalid socket id: %d\n", sockid); 203 errno = ENOTSOCK; 204 return -1; 205 } 206 207 if (optname == SO_ERROR) { 208 if (socket->socktype == MOS_SOCK_STREAM) { 209 return GetSocketError(socket, optval, optlen); 210 } 211 } 212 break; 213 case SOL_MONSOCKET: 214 /* check if the calling thread is in MOS context */ 215 if (mtcp->ctx->thread != pthread_self()) { 216 errno = EPERM; 217 return -1; 218 } 219 /* 220 * All options will only work for active 221 * monitor stream sockets 222 */ 223 socket = &mtcp->msmap[sockid]; 224 if (socket->socktype != MOS_SOCK_MONITOR_STREAM_ACTIVE) { 225 TRACE_API("Invalid socket id: %d\n", sockid); 226 errno = ENOTSOCK; 227 return -1; 228 } 229 230 switch (optname) { 231 case MOS_FRAGINFO_CLIBUF: 232 return GetFragInfo(socket, MOS_SIDE_CLI, optval, optlen); 233 case MOS_FRAGINFO_SVRBUF: 234 return GetFragInfo(socket, MOS_SIDE_SVR, optval, optlen); 235 case MOS_INFO_CLIBUF: 236 return GetBufInfo(socket, MOS_SIDE_CLI, optval, optlen); 237 case MOS_INFO_SVRBUF: 238 return GetBufInfo(socket, MOS_SIDE_SVR, optval, optlen); 239 case MOS_TCP_STATE_CLI: 240 return GetTCPState(socket->monitor_stream->stream, MOS_SIDE_CLI, 241 optval, optlen); 242 case MOS_TCP_STATE_SVR: 243 return GetTCPState(socket->monitor_stream->stream, MOS_SIDE_SVR, 244 optval, optlen); 245 case MOS_TIMESTAMP: 246 return GetLastTimestamp(socket->monitor_stream->stream, 247 (uint32_t *)optval, 248 optlen); 249 default: 250 TRACE_API("can't recognize the optname=%d\n", optname); 251 assert(0); 252 } 253 break; 254 } 255 errno = ENOSYS; 256 return -1; 257 } 258 /*----------------------------------------------------------------------------*/ 259 int 260 mtcp_setsockopt(mctx_t mctx, int sockid, int level, 261 int optname, const void *optval, socklen_t optlen) 262 { 263 mtcp_manager_t mtcp; 264 socket_map_t socket; 265 tcprb_t *rb; 266 267 mtcp = GetMTCPManager(mctx); 268 if (!mtcp) { 269 errno = EACCES; 270 return -1; 271 } 272 273 if (sockid < 0 || sockid >= g_config.mos->max_concurrency) { 274 TRACE_API("Socket id %d out of range.\n", sockid); 275 errno = EBADF; 276 return -1; 277 } 278 279 switch (level) { 280 case SOL_SOCKET: 281 socket = &mtcp->smap[sockid]; 282 if (socket->socktype == MOS_SOCK_UNUSED) { 283 TRACE_API("Invalid socket id: %d\n", sockid); 284 errno = EBADF; 285 return -1; 286 } 287 288 if (socket->socktype != MOS_SOCK_STREAM_LISTEN && 289 socket->socktype != MOS_SOCK_STREAM) { 290 TRACE_API("Invalid socket id: %d\n", sockid); 291 errno = ENOTSOCK; 292 return -1; 293 } 294 break; 295 case SOL_MONSOCKET: 296 socket = &mtcp->msmap[sockid]; 297 /* 298 * checking of calling thread to be in MOS context is 299 * disabled since both optnames can be called from 300 * `application' context (on passive sockets) 301 */ 302 /* 303 * if (mtcp->ctx->thread != pthread_self()) 304 * return -1; 305 */ 306 307 switch (optname) { 308 case MOS_CLIOVERLAP: 309 rb = (socket->monitor_stream->stream->side == MOS_SIDE_CLI) ? 310 socket->monitor_stream->stream->rcvvar->rcvbuf : 311 socket->monitor_stream->stream->pair_stream->rcvvar->rcvbuf; 312 if (rb == NULL) { 313 errno = EFAULT; 314 return -1; 315 } 316 if (tcprb_setpolicy(rb, *(uint8_t *)optval) < 0) { 317 errno = EINVAL; 318 return -1; 319 } else 320 return 0; 321 break; 322 case MOS_SVROVERLAP: 323 rb = (socket->monitor_stream->stream->side == MOS_SIDE_SVR) ? 324 socket->monitor_stream->stream->rcvvar->rcvbuf : 325 socket->monitor_stream->stream->pair_stream->rcvvar->rcvbuf; 326 if (rb == NULL) { 327 errno = EFAULT; 328 return -1; 329 } 330 if (tcprb_setpolicy(rb, *(uint8_t *)optval) < 0) { 331 errno = EINVAL; 332 return -1; 333 } else 334 return 0; 335 break; 336 case MOS_CLIBUF: 337 #if 0 338 if (socket->socktype != MOS_SOCK_MONITOR_STREAM_ACTIVE) { 339 errno = EBADF; 340 return -1; 341 } 342 #endif 343 #ifdef DISABLE_DYN_RESIZE 344 if (*(int *)optval != 0) 345 return -1; 346 if (socket->socktype == MOS_SOCK_MONITOR_STREAM_ACTIVE) { 347 rb = (socket->monitor_stream->stream->side == MOS_SIDE_CLI) ? 348 socket->monitor_stream->stream->rcvvar->rcvbuf : 349 socket->monitor_stream->stream->pair_stream->rcvvar->rcvbuf; 350 if (rb) { 351 tcprb_resize_meta(rb, 0); 352 tcprb_resize(rb, 0); 353 } 354 } 355 return DisableBuf(socket, MOS_SIDE_CLI); 356 #else 357 rb = (socket->monitor_stream->stream->side == MOS_SIDE_CLI) ? 358 socket->monitor_stream->stream->rcvvar->rcvbuf : 359 socket->monitor_stream->stream->pair_stream->rcvvar->rcvbuf; 360 if (tcprb_resize_meta(rb, *(int *)optval) < 0) 361 return -1; 362 return tcprb_resize(rb, 363 (((int)rb->metalen - 1) / UNITBUFSIZE + 1) * UNITBUFSIZE); 364 #endif 365 case MOS_SVRBUF: 366 #if 0 367 if (socket->socktype != MOS_SOCK_MONITOR_STREAM_ACTIVE) { 368 errno = EBADF; 369 return -1; 370 } 371 #endif 372 #ifdef DISABLE_DYN_RESIZE 373 if (*(int *)optval != 0) 374 return -1; 375 if (socket->socktype == MOS_SOCK_MONITOR_STREAM_ACTIVE) { 376 rb = (socket->monitor_stream->stream->side == MOS_SIDE_SVR) ? 377 socket->monitor_stream->stream->rcvvar->rcvbuf : 378 socket->monitor_stream->stream->pair_stream->rcvvar->rcvbuf; 379 if (rb) { 380 tcprb_resize_meta(rb, 0); 381 tcprb_resize(rb, 0); 382 } 383 } 384 return DisableBuf(socket, MOS_SIDE_SVR); 385 #else 386 rb = (socket->monitor_stream->stream->side == MOS_SIDE_SVR) ? 387 socket->monitor_stream->stream->rcvvar->rcvbuf : 388 socket->monitor_stream->stream->pair_stream->rcvvar->rcvbuf; 389 if (tcprb_resize_meta(rb, *(int *)optval) < 0) 390 return -1; 391 return tcprb_resize(rb, 392 (((int)rb->metalen - 1) / UNITBUFSIZE + 1) * UNITBUFSIZE); 393 #endif 394 case MOS_FRAG_CLIBUF: 395 #if 0 396 if (socket->socktype != MOS_SOCK_MONITOR_STREAM_ACTIVE) { 397 errno = EBADF; 398 return -1; 399 } 400 #endif 401 #ifdef DISABLE_DYN_RESIZE 402 if (*(int *)optval != 0) 403 return -1; 404 if (socket->socktype == MOS_SOCK_MONITOR_STREAM_ACTIVE) { 405 rb = (socket->monitor_stream->stream->side == MOS_SIDE_CLI) ? 406 socket->monitor_stream->stream->rcvvar->rcvbuf : 407 socket->monitor_stream->stream->pair_stream->rcvvar->rcvbuf; 408 if (rb) 409 tcprb_resize(rb, 0); 410 } 411 return 0; 412 #else 413 rb = (socket->monitor_stream->stream->side == MOS_SIDE_CLI) ? 414 socket->monitor_stream->stream->rcvvar->rcvbuf : 415 socket->monitor_stream->stream->pair_stream->rcvvar->rcvbuf; 416 if (rb->len == 0) 417 return tcprb_resize_meta(rb, *(int *)optval); 418 else 419 return -1; 420 #endif 421 case MOS_FRAG_SVRBUF: 422 #if 0 423 if (socket->socktype != MOS_SOCK_MONITOR_STREAM_ACTIVE) { 424 errno = EBADF; 425 return -1; 426 } 427 #endif 428 #ifdef DISABLE_DYN_RESIZE 429 if (*(int *)optval != 0) 430 return -1; 431 if (socket->socktype == MOS_SOCK_MONITOR_STREAM_ACTIVE) { 432 rb = (socket->monitor_stream->stream->side == MOS_SIDE_SVR) ? 433 socket->monitor_stream->stream->rcvvar->rcvbuf : 434 socket->monitor_stream->stream->pair_stream->rcvvar->rcvbuf; 435 if (rb) 436 tcprb_resize(rb, 0); 437 } 438 return 0; 439 #else 440 rb = (socket->monitor_stream->stream->side == MOS_SIDE_SVR) ? 441 socket->monitor_stream->stream->rcvvar->rcvbuf : 442 socket->monitor_stream->stream->pair_stream->rcvvar->rcvbuf; 443 if (rb->len == 0) 444 return tcprb_resize_meta(rb, *(int *)optval); 445 else 446 return -1; 447 #endif 448 case MOS_MONLEVEL: 449 #ifdef OLD_API 450 assert(*(int *)optval == MOS_NO_CLIBUF || 451 *(int *)optval == MOS_NO_SVRBUF); 452 return DisableBuf(socket, 453 (*(int *)optval == MOS_NO_CLIBUF) ? 454 MOS_SIDE_CLI : MOS_SIDE_SVR); 455 #endif 456 case MOS_SEQ_REMAP: 457 return TcpSeqChange(socket, 458 (uint32_t)((seq_remap_info *)optval)->seq_off, 459 ((seq_remap_info *)optval)->side, 460 mtcp->pctx->p.seq); 461 case MOS_STOP_MON: 462 return mtcp_cb_stop(mctx, sockid, *(int *)optval); 463 default: 464 TRACE_API("invalid optname=%d\n", optname); 465 assert(0); 466 } 467 break; 468 } 469 470 errno = ENOSYS; 471 return -1; 472 } 473 /*----------------------------------------------------------------------------*/ 474 int 475 mtcp_setsock_nonblock(mctx_t mctx, int sockid) 476 { 477 mtcp_manager_t mtcp; 478 479 mtcp = GetMTCPManager(mctx); 480 if (!mtcp) { 481 errno = EACCES; 482 return -1; 483 } 484 485 if (sockid < 0 || sockid >= g_config.mos->max_concurrency) { 486 TRACE_API("Socket id %d out of range.\n", sockid); 487 errno = EBADF; 488 return -1; 489 } 490 491 if (mtcp->smap[sockid].socktype == MOS_SOCK_UNUSED) { 492 TRACE_API("Invalid socket id: %d\n", sockid); 493 errno = EBADF; 494 return -1; 495 } 496 497 mtcp->smap[sockid].opts |= MTCP_NONBLOCK; 498 499 return 0; 500 } 501 /*----------------------------------------------------------------------------*/ 502 int 503 mtcp_ioctl(mctx_t mctx, int sockid, int request, void *argp) 504 { 505 mtcp_manager_t mtcp; 506 socket_map_t socket; 507 508 mtcp = GetMTCPManager(mctx); 509 if (!mtcp) { 510 errno = EACCES; 511 return -1; 512 } 513 514 if (sockid < 0 || sockid >= g_config.mos->max_concurrency) { 515 TRACE_API("Socket id %d out of range.\n", sockid); 516 errno = EBADF; 517 return -1; 518 } 519 520 /* only support stream socket */ 521 socket = &mtcp->smap[sockid]; 522 523 if (socket->socktype != MOS_SOCK_STREAM_LISTEN && 524 socket->socktype != MOS_SOCK_STREAM) { 525 TRACE_API("Invalid socket id: %d\n", sockid); 526 errno = EBADF; 527 return -1; 528 } 529 530 if (!argp) { 531 errno = EFAULT; 532 return -1; 533 } 534 535 if (request == FIONREAD) { 536 tcp_stream *cur_stream; 537 tcprb_t *rbuf; 538 539 cur_stream = socket->stream; 540 if (!cur_stream) { 541 errno = EBADF; 542 return -1; 543 } 544 545 rbuf = cur_stream->rcvvar->rcvbuf; 546 *(int *)argp = (rbuf) ? tcprb_cflen(rbuf) : 0; 547 548 } else if (request == FIONBIO) { 549 /* 550 * sockets can only be set to blocking/non-blocking 551 * modes during initialization 552 */ 553 if ((*(int *)argp)) 554 mtcp->smap[sockid].opts |= MTCP_NONBLOCK; 555 else 556 mtcp->smap[sockid].opts &= ~MTCP_NONBLOCK; 557 } else { 558 errno = EINVAL; 559 return -1; 560 } 561 562 return 0; 563 } 564 /*----------------------------------------------------------------------------*/ 565 static int 566 mtcp_monitor(mctx_t mctx, socket_map_t sock) 567 { 568 mtcp_manager_t mtcp; 569 struct mon_listener *monitor; 570 int sockid = sock->id; 571 572 mtcp = GetMTCPManager(mctx); 573 if (!mtcp) { 574 errno = EACCES; 575 return -1; 576 } 577 578 if (sockid < 0 || sockid >= g_config.mos->max_concurrency) { 579 TRACE_API("Socket id %d out of range.\n", sockid); 580 errno = EBADF; 581 return -1; 582 } 583 584 if (mtcp->msmap[sockid].socktype == MOS_SOCK_UNUSED) { 585 TRACE_API("Invalid socket id: %d\n", sockid); 586 errno = EBADF; 587 return -1; 588 } 589 590 if (!(mtcp->msmap[sockid].socktype == MOS_SOCK_MONITOR_STREAM || 591 mtcp->msmap[sockid].socktype == MOS_SOCK_MONITOR_RAW)) { 592 TRACE_API("Not a monitor socket. id: %d\n", sockid); 593 errno = ENOTSOCK; 594 return -1; 595 } 596 597 monitor = (struct mon_listener *)calloc(1, sizeof(struct mon_listener)); 598 if (!monitor) { 599 /* errno set from the malloc() */ 600 errno = ENOMEM; 601 return -1; 602 } 603 604 /* create a monitor-specific event queue */ 605 monitor->eq = CreateEventQueue(g_config.mos->max_concurrency); 606 if (!monitor->eq) { 607 TRACE_API("Can't create event queue (concurrency: %d) for " 608 "monitor read event registrations!\n", 609 g_config.mos->max_concurrency); 610 free(monitor); 611 errno = ENOMEM; 612 return -1; 613 } 614 615 /* set monitor-related basic parameters */ 616 #ifndef NEWEV 617 monitor->ude_id = UDE_OFFSET; 618 #endif 619 monitor->socket = sock; 620 monitor->client_buf_mgmt = monitor->server_buf_mgmt = BUFMGMT_FULL; 621 622 /* perform both sides monitoring by default */ 623 monitor->client_mon = monitor->server_mon = 1; 624 625 /* add monitor socket to the monitor list */ 626 TAILQ_INSERT_TAIL(&mtcp->monitors, monitor, link); 627 628 mtcp->msmap[sockid].monitor_listener = monitor; 629 630 return 0; 631 } 632 /*----------------------------------------------------------------------------*/ 633 int 634 mtcp_socket(mctx_t mctx, int domain, int type, int protocol) 635 { 636 mtcp_manager_t mtcp; 637 socket_map_t socket; 638 639 mtcp = GetMTCPManager(mctx); 640 if (!mtcp) { 641 errno = EACCES; 642 return -1; 643 } 644 645 if (domain != AF_INET) { 646 errno = EAFNOSUPPORT; 647 return -1; 648 } 649 650 if (type == (int)SOCK_STREAM) { 651 type = MOS_SOCK_STREAM; 652 } else if (type == MOS_SOCK_MONITOR_STREAM || 653 type == MOS_SOCK_MONITOR_RAW) { 654 /* do nothing for the time being */ 655 } else { 656 /* Not supported type */ 657 errno = EINVAL; 658 return -1; 659 } 660 661 socket = AllocateSocket(mctx, type); 662 if (!socket) { 663 errno = ENFILE; 664 return -1; 665 } 666 667 if (type == MOS_SOCK_MONITOR_STREAM || 668 type == MOS_SOCK_MONITOR_RAW) { 669 mtcp_manager_t mtcp = GetMTCPManager(mctx); 670 if (!mtcp) { 671 errno = EACCES; 672 return -1; 673 } 674 mtcp_monitor(mctx, socket); 675 #ifdef NEWEV 676 socket->monitor_listener->stree_dontcare = NULL; 677 socket->monitor_listener->stree_pre_rcv = NULL; 678 socket->monitor_listener->stree_post_snd = NULL; 679 #else 680 InitEvB(mtcp, &socket->monitor_listener->dontcare_evb); 681 InitEvB(mtcp, &socket->monitor_listener->pre_tcp_evb); 682 InitEvB(mtcp, &socket->monitor_listener->post_tcp_evb); 683 #endif 684 } 685 686 return socket->id; 687 } 688 /*----------------------------------------------------------------------------*/ 689 int 690 mtcp_bind(mctx_t mctx, int sockid, 691 const struct sockaddr *addr, socklen_t addrlen) 692 { 693 mtcp_manager_t mtcp; 694 struct sockaddr_in *addr_in; 695 696 mtcp = GetMTCPManager(mctx); 697 if (!mtcp) { 698 errno = EACCES; 699 return -1; 700 } 701 702 if (sockid < 0 || sockid >= g_config.mos->max_concurrency) { 703 TRACE_API("Socket id %d out of range.\n", sockid); 704 errno = EBADF; 705 return -1; 706 } 707 708 if (mtcp->smap[sockid].socktype == MOS_SOCK_UNUSED) { 709 TRACE_API("Invalid socket id: %d\n", sockid); 710 errno = EBADF; 711 return -1; 712 } 713 714 if (mtcp->smap[sockid].socktype != MOS_SOCK_STREAM && 715 mtcp->smap[sockid].socktype != MOS_SOCK_STREAM_LISTEN) { 716 TRACE_API("Not a stream socket id: %d\n", sockid); 717 errno = ENOTSOCK; 718 return -1; 719 } 720 721 if (!addr) { 722 TRACE_API("Socket %d: empty address!\n", sockid); 723 errno = EINVAL; 724 return -1; 725 } 726 727 if (mtcp->smap[sockid].opts & MTCP_ADDR_BIND) { 728 TRACE_API("Socket %d: adress already bind for this socket.\n", sockid); 729 errno = EINVAL; 730 return -1; 731 } 732 733 /* we only allow bind() for AF_INET address */ 734 if (addr->sa_family != AF_INET || addrlen < sizeof(struct sockaddr_in)) { 735 TRACE_API("Socket %d: invalid argument!\n", sockid); 736 errno = EINVAL; 737 return -1; 738 } 739 740 if (mtcp->listener) { 741 TRACE_API("Address already bound!\n"); 742 errno = EINVAL; 743 return -1; 744 } 745 addr_in = (struct sockaddr_in *)addr; 746 mtcp->smap[sockid].saddr = *addr_in; 747 mtcp->smap[sockid].opts |= MTCP_ADDR_BIND; 748 749 return 0; 750 } 751 /*----------------------------------------------------------------------------*/ 752 int 753 mtcp_listen(mctx_t mctx, int sockid, int backlog) 754 { 755 mtcp_manager_t mtcp; 756 struct tcp_listener *listener; 757 758 mtcp = GetMTCPManager(mctx); 759 if (!mtcp) { 760 errno = EACCES; 761 return -1; 762 } 763 764 if (sockid < 0 || sockid >= g_config.mos->max_concurrency) { 765 TRACE_API("Socket id %d out of range.\n", sockid); 766 errno = EBADF; 767 return -1; 768 } 769 770 if (mtcp->smap[sockid].socktype == MOS_SOCK_UNUSED) { 771 TRACE_API("Invalid socket id: %d\n", sockid); 772 errno = EBADF; 773 return -1; 774 } 775 776 if (mtcp->smap[sockid].socktype == MOS_SOCK_STREAM) { 777 mtcp->smap[sockid].socktype = MOS_SOCK_STREAM_LISTEN; 778 } 779 780 if (mtcp->smap[sockid].socktype != MOS_SOCK_STREAM_LISTEN) { 781 TRACE_API("Not a listening socket. id: %d\n", sockid); 782 errno = ENOTSOCK; 783 return -1; 784 } 785 786 if (backlog <= 0 || backlog > g_config.mos->max_concurrency) { 787 errno = EINVAL; 788 return -1; 789 } 790 791 listener = (struct tcp_listener *)calloc(1, sizeof(struct tcp_listener)); 792 if (!listener) { 793 /* errno set from the malloc() */ 794 errno = ENOMEM; 795 return -1; 796 } 797 798 listener->sockid = sockid; 799 listener->backlog = backlog; 800 listener->socket = &mtcp->smap[sockid]; 801 802 if (pthread_cond_init(&listener->accept_cond, NULL)) { 803 perror("pthread_cond_init of ctx->accept_cond\n"); 804 /* errno set by pthread_cond_init() */ 805 free(listener); 806 return -1; 807 } 808 if (pthread_mutex_init(&listener->accept_lock, NULL)) { 809 perror("pthread_mutex_init of ctx->accept_lock\n"); 810 /* errno set by pthread_mutex_init() */ 811 free(listener); 812 return -1; 813 } 814 815 listener->acceptq = CreateStreamQueue(backlog); 816 if (!listener->acceptq) { 817 free(listener); 818 errno = ENOMEM; 819 return -1; 820 } 821 822 mtcp->smap[sockid].listener = listener; 823 mtcp->listener = listener; 824 825 return 0; 826 } 827 /*----------------------------------------------------------------------------*/ 828 int 829 mtcp_accept(mctx_t mctx, int sockid, struct sockaddr *addr, socklen_t *addrlen) 830 { 831 mtcp_manager_t mtcp; 832 struct tcp_listener *listener; 833 socket_map_t socket; 834 tcp_stream *accepted = NULL; 835 836 mtcp = GetMTCPManager(mctx); 837 if (!mtcp) { 838 errno = EACCES; 839 return -1; 840 } 841 842 if (sockid < 0 || sockid >= g_config.mos->max_concurrency) { 843 TRACE_API("Socket id %d out of range.\n", sockid); 844 errno = EBADF; 845 return -1; 846 } 847 848 /* requires listening socket */ 849 if (mtcp->smap[sockid].socktype != MOS_SOCK_STREAM_LISTEN) { 850 errno = EINVAL; 851 return -1; 852 } 853 854 listener = mtcp->smap[sockid].listener; 855 856 /* dequeue from the acceptq without lock first */ 857 /* if nothing there, acquire lock and cond_wait */ 858 accepted = StreamDequeue(listener->acceptq); 859 if (!accepted) { 860 if (listener->socket->opts & MTCP_NONBLOCK) { 861 errno = EAGAIN; 862 return -1; 863 864 } else { 865 pthread_mutex_lock(&listener->accept_lock); 866 while ((accepted = StreamDequeue(listener->acceptq)) == NULL) { 867 pthread_cond_wait(&listener->accept_cond, &listener->accept_lock); 868 869 if (mtcp->ctx->done || mtcp->ctx->exit) { 870 pthread_mutex_unlock(&listener->accept_lock); 871 errno = EINTR; 872 return -1; 873 } 874 } 875 pthread_mutex_unlock(&listener->accept_lock); 876 } 877 } 878 879 if (!accepted) { 880 TRACE_ERROR("[NEVER HAPPEN] Empty accept queue!\n"); 881 } 882 883 if (!accepted->socket) { 884 socket = AllocateSocket(mctx, MOS_SOCK_STREAM); 885 if (!socket) { 886 TRACE_ERROR("Failed to create new socket!\n"); 887 /* TODO: destroy the stream */ 888 errno = ENFILE; 889 return -1; 890 } 891 socket->stream = accepted; 892 accepted->socket = socket; 893 894 /* set socket addr parameters */ 895 socket->saddr.sin_family = AF_INET; 896 socket->saddr.sin_port = accepted->dport; 897 socket->saddr.sin_addr.s_addr = accepted->daddr; 898 899 /* if monitor is enabled, complete the socket assignment */ 900 if (socket->stream->pair_stream != NULL) 901 socket->stream->pair_stream->socket = socket; 902 } 903 904 if (!(listener->socket->epoll & MOS_EPOLLET) && 905 !StreamQueueIsEmpty(listener->acceptq)) 906 AddEpollEvent(mtcp->ep, 907 USR_SHADOW_EVENT_QUEUE, 908 listener->socket, MOS_EPOLLIN); 909 910 TRACE_API("Stream %d accepted.\n", accepted->id); 911 912 if (addr && addrlen) { 913 struct sockaddr_in *addr_in = (struct sockaddr_in *)addr; 914 addr_in->sin_family = AF_INET; 915 addr_in->sin_port = accepted->dport; 916 addr_in->sin_addr.s_addr = accepted->daddr; 917 *addrlen = sizeof(struct sockaddr_in); 918 } 919 920 return accepted->socket->id; 921 } 922 /*----------------------------------------------------------------------------*/ 923 int 924 mtcp_init_rss(mctx_t mctx, in_addr_t saddr_base, int num_addr, 925 in_addr_t daddr, in_addr_t dport) 926 { 927 mtcp_manager_t mtcp; 928 addr_pool_t ap; 929 930 mtcp = GetMTCPManager(mctx); 931 if (!mtcp) { 932 errno = EACCES; 933 return -1; 934 } 935 936 if (saddr_base == INADDR_ANY) { 937 int nif_out; 938 939 /* for the INADDR_ANY, find the output interface for the destination 940 and set the saddr_base as the ip address of the output interface */ 941 nif_out = GetOutputInterface(daddr); 942 if (nif_out < 0) { 943 TRACE_DBG("Could not determine nif idx!\n"); 944 errno = EINVAL; 945 return -1; 946 } 947 saddr_base = g_config.mos->netdev_table->ent[nif_out]->ip_addr; 948 } 949 950 ap = CreateAddressPoolPerCore(mctx->cpu, num_cpus, 951 saddr_base, num_addr, daddr, dport); 952 if (!ap) { 953 errno = ENOMEM; 954 return -1; 955 } 956 957 mtcp->ap = ap; 958 959 return 0; 960 } 961 /*----------------------------------------------------------------------------*/ 962 int 963 eval_bpf_5tuple(struct sfbpf_program fcode, 964 in_addr_t saddr, in_port_t sport, 965 in_addr_t daddr, in_port_t dport) { 966 uint8_t buf[TOTAL_TCP_HEADER_LEN]; 967 struct ethhdr *ethh; 968 struct iphdr *iph; 969 struct tcphdr *tcph; 970 971 ethh = (struct ethhdr *)buf; 972 ethh->h_proto = htons(ETH_P_IP); 973 iph = (struct iphdr *)(ethh + 1); 974 iph->ihl = IP_HEADER_LEN >> 2; 975 iph->version = 4; 976 iph->tos = 0; 977 iph->tot_len = htons(IP_HEADER_LEN + TCP_HEADER_LEN); 978 iph->id = htons(0); 979 iph->protocol = IPPROTO_TCP; 980 iph->saddr = saddr; 981 iph->daddr = daddr; 982 iph->check = 0; 983 tcph = (struct tcphdr *)(iph + 1); 984 tcph->source = sport; 985 tcph->dest = dport; 986 987 return EVAL_BPFFILTER(fcode, (uint8_t *)iph - sizeof(struct ethhdr), 988 TOTAL_TCP_HEADER_LEN); 989 } 990 /*----------------------------------------------------------------------------*/ 991 int 992 mtcp_connect(mctx_t mctx, int sockid, 993 const struct sockaddr *addr, socklen_t addrlen) 994 { 995 mtcp_manager_t mtcp; 996 socket_map_t socket; 997 tcp_stream *cur_stream; 998 struct sockaddr_in *addr_in; 999 in_addr_t dip; 1000 in_port_t dport; 1001 int is_dyn_bound = FALSE; 1002 int ret, nif; 1003 int cnt_match = 0; 1004 struct mon_listener *walk; 1005 struct sfbpf_program fcode; 1006 1007 cur_stream = NULL; 1008 mtcp = GetMTCPManager(mctx); 1009 if (!mtcp) { 1010 errno = EACCES; 1011 return -1; 1012 } 1013 1014 if (sockid < 0 || sockid >= g_config.mos->max_concurrency) { 1015 TRACE_API("Socket id %d out of range.\n", sockid); 1016 errno = EBADF; 1017 return -1; 1018 } 1019 1020 if (mtcp->smap[sockid].socktype == MOS_SOCK_UNUSED) { 1021 TRACE_API("Invalid socket id: %d\n", sockid); 1022 errno = EBADF; 1023 return -1; 1024 } 1025 1026 if (mtcp->smap[sockid].socktype != MOS_SOCK_STREAM) { 1027 TRACE_API("Not an end socket. id: %d\n", sockid); 1028 errno = ENOTSOCK; 1029 return -1; 1030 } 1031 1032 if (!addr) { 1033 TRACE_API("Socket %d: empty address!\n", sockid); 1034 errno = EFAULT; 1035 return -1; 1036 } 1037 1038 /* we only allow bind() for AF_INET address */ 1039 if (addr->sa_family != AF_INET || addrlen < sizeof(struct sockaddr_in)) { 1040 TRACE_API("Socket %d: invalid argument!\n", sockid); 1041 errno = EAFNOSUPPORT; 1042 return -1; 1043 } 1044 1045 socket = &mtcp->smap[sockid]; 1046 if (socket->stream) { 1047 TRACE_API("Socket %d: stream already exist!\n", sockid); 1048 if (socket->stream->state >= TCP_ST_ESTABLISHED) { 1049 errno = EISCONN; 1050 } else { 1051 errno = EALREADY; 1052 } 1053 return -1; 1054 } 1055 1056 addr_in = (struct sockaddr_in *)addr; 1057 dip = addr_in->sin_addr.s_addr; 1058 dport = addr_in->sin_port; 1059 1060 /* address binding */ 1061 if (socket->opts & MTCP_ADDR_BIND && 1062 socket->saddr.sin_port != INPORT_ANY && 1063 socket->saddr.sin_addr.s_addr != INADDR_ANY) { 1064 int rss_core; 1065 1066 rss_core = GetRSSCPUCore(socket->saddr.sin_addr.s_addr, dip, 1067 socket->saddr.sin_port, dport, num_queues); 1068 1069 if (rss_core != mctx->cpu) { 1070 errno = EINVAL; 1071 return -1; 1072 } 1073 } else { 1074 if (mtcp->ap) { 1075 ret = FetchAddress(mtcp->ap, 1076 mctx->cpu, num_queues, addr_in, &socket->saddr); 1077 } else { 1078 nif = GetOutputInterface(dip); 1079 if (nif < 0) { 1080 errno = EINVAL; 1081 return -1; 1082 } 1083 ret = FetchAddress(ap[nif], 1084 mctx->cpu, num_queues, addr_in, &socket->saddr); 1085 } 1086 if (ret < 0) { 1087 errno = EAGAIN; 1088 return -1; 1089 } 1090 socket->opts |= MTCP_ADDR_BIND; 1091 is_dyn_bound = TRUE; 1092 } 1093 1094 cnt_match = 0; 1095 if (mtcp->num_msp > 0) { 1096 TAILQ_FOREACH(walk, &mtcp->monitors, link) { 1097 fcode = walk->stream_syn_fcode; 1098 if (!(ISSET_BPFFILTER(fcode) && 1099 eval_bpf_5tuple(fcode, socket->saddr.sin_addr.s_addr, 1100 socket->saddr.sin_port, 1101 dip, dport) == 0)) { 1102 walk->is_stream_syn_filter_hit = 1; // set the 'filter hit' flag to 1 1103 cnt_match++; 1104 } 1105 } 1106 } 1107 1108 if (mtcp->num_msp > 0 && cnt_match > 0) { 1109 /* 150820 dhkim: XXX: embedded mode is not verified */ 1110 #if 1 1111 cur_stream = CreateClientTCPStream(mtcp, socket, 1112 STREAM_TYPE(MOS_SOCK_STREAM) | 1113 STREAM_TYPE(MOS_SOCK_MONITOR_STREAM_ACTIVE), 1114 socket->saddr.sin_addr.s_addr, 1115 socket->saddr.sin_port, dip, dport, NULL); 1116 #else 1117 cur_stream = CreateDualTCPStream(mtcp, socket, 1118 STREAM_TYPE(MOS_SOCK_STREAM) | 1119 STREAM_TYPE(MOS_SOCK_MONITOR_STREAM_ACTIVE), 1120 socket->saddr.sin_addr.s_addr, 1121 socket->saddr.sin_port, dip, dport, NULL); 1122 #endif 1123 } 1124 else 1125 cur_stream = CreateTCPStream(mtcp, socket, STREAM_TYPE(MOS_SOCK_STREAM), 1126 socket->saddr.sin_addr.s_addr, 1127 socket->saddr.sin_port, dip, dport, NULL); 1128 if (!cur_stream) { 1129 TRACE_ERROR("Socket %d: failed to create tcp_stream!\n", sockid); 1130 errno = ENOMEM; 1131 return -1; 1132 } 1133 1134 if (is_dyn_bound) 1135 cur_stream->is_bound_addr = TRUE; 1136 cur_stream->sndvar->cwnd = 1; 1137 cur_stream->sndvar->ssthresh = cur_stream->sndvar->mss * 10; 1138 cur_stream->side = MOS_SIDE_CLI; 1139 /* if monitor is enabled, update the pair stream side as well */ 1140 if (cur_stream->pair_stream) { 1141 cur_stream->pair_stream->side = MOS_SIDE_SVR; 1142 /* 1143 * if buffer management is off, then disable 1144 * monitoring tcp ring of server... 1145 * if there is even a single monitor asking for 1146 * buffer management, enable it (that's why the 1147 * need for the loop) 1148 */ 1149 cur_stream->pair_stream->buffer_mgmt = BUFMGMT_OFF; 1150 struct socket_map *walk; 1151 SOCKQ_FOREACH_START(walk, &cur_stream->msocks) { 1152 uint8_t bm = walk->monitor_stream->monitor_listener->server_buf_mgmt; 1153 if (bm > cur_stream->pair_stream->buffer_mgmt) { 1154 cur_stream->pair_stream->buffer_mgmt = bm; 1155 break; 1156 } 1157 } SOCKQ_FOREACH_END; 1158 } 1159 1160 cur_stream->state = TCP_ST_SYN_SENT; 1161 cur_stream->cb_events |= MOS_ON_TCP_STATE_CHANGE; 1162 1163 TRACE_STATE("Stream %d: TCP_ST_SYN_SENT\n", cur_stream->id); 1164 1165 SQ_LOCK(&mtcp->ctx->connect_lock); 1166 ret = StreamEnqueue(mtcp->connectq, cur_stream); 1167 SQ_UNLOCK(&mtcp->ctx->connect_lock); 1168 mtcp->wakeup_flag = TRUE; 1169 if (ret < 0) { 1170 TRACE_ERROR("Socket %d: failed to enqueue to conenct queue!\n", sockid); 1171 SQ_LOCK(&mtcp->ctx->destroyq_lock); 1172 StreamEnqueue(mtcp->destroyq, cur_stream); 1173 SQ_UNLOCK(&mtcp->ctx->destroyq_lock); 1174 errno = EAGAIN; 1175 return -1; 1176 } 1177 1178 /* if nonblocking socket, return EINPROGRESS */ 1179 if (socket->opts & MTCP_NONBLOCK) { 1180 errno = EINPROGRESS; 1181 return -1; 1182 1183 } else { 1184 while (1) { 1185 if (!cur_stream) { 1186 TRACE_ERROR("STREAM DESTROYED\n"); 1187 errno = ETIMEDOUT; 1188 return -1; 1189 } 1190 if (cur_stream->state > TCP_ST_ESTABLISHED) { 1191 TRACE_ERROR("Socket %d: weird state %s\n", 1192 sockid, TCPStateToString(cur_stream)); 1193 // TODO: how to handle this? 1194 errno = ENOSYS; 1195 return -1; 1196 } 1197 1198 if (cur_stream->state == TCP_ST_ESTABLISHED) { 1199 break; 1200 } 1201 usleep(1000); 1202 } 1203 } 1204 1205 return 0; 1206 } 1207 /*----------------------------------------------------------------------------*/ 1208 static inline int 1209 CloseStreamSocket(mctx_t mctx, int sockid) 1210 { 1211 mtcp_manager_t mtcp; 1212 tcp_stream *cur_stream; 1213 int ret; 1214 1215 mtcp = GetMTCPManager(mctx); 1216 if (!mtcp) { 1217 errno = EACCES; 1218 return -1; 1219 } 1220 1221 cur_stream = mtcp->smap[sockid].stream; 1222 if (!cur_stream) { 1223 TRACE_API("Socket %d: stream does not exist.\n", sockid); 1224 errno = ENOTCONN; 1225 return -1; 1226 } 1227 1228 if (cur_stream->closed) { 1229 TRACE_API("Socket %d (Stream %u): already closed stream\n", 1230 sockid, cur_stream->id); 1231 return 0; 1232 } 1233 cur_stream->closed = TRUE; 1234 1235 TRACE_API("Stream %d: closing the stream.\n", cur_stream->id); 1236 1237 /* 141029 dhkim: Check this! */ 1238 cur_stream->socket = NULL; 1239 1240 if (cur_stream->state == TCP_ST_CLOSED_RSVD) { 1241 TRACE_API("Stream %d at TCP_ST_CLOSED_RSVD. destroying the stream.\n", 1242 cur_stream->id); 1243 SQ_LOCK(&mtcp->ctx->destroyq_lock); 1244 StreamEnqueue(mtcp->destroyq, cur_stream); 1245 mtcp->wakeup_flag = TRUE; 1246 SQ_UNLOCK(&mtcp->ctx->destroyq_lock); 1247 return 0; 1248 1249 } else if (cur_stream->state == TCP_ST_SYN_SENT) { 1250 #if 1 1251 SQ_LOCK(&mtcp->ctx->destroyq_lock); 1252 StreamEnqueue(mtcp->destroyq, cur_stream); 1253 SQ_UNLOCK(&mtcp->ctx->destroyq_lock); 1254 mtcp->wakeup_flag = TRUE; 1255 #endif 1256 return -1; 1257 1258 } else if (cur_stream->state != TCP_ST_ESTABLISHED && 1259 cur_stream->state != TCP_ST_CLOSE_WAIT) { 1260 TRACE_API("Stream %d at state %s\n", 1261 cur_stream->id, TCPStateToString(cur_stream)); 1262 errno = EBADF; 1263 return -1; 1264 } 1265 1266 SQ_LOCK(&mtcp->ctx->close_lock); 1267 cur_stream->sndvar->on_closeq = TRUE; 1268 ret = StreamEnqueue(mtcp->closeq, cur_stream); 1269 mtcp->wakeup_flag = TRUE; 1270 SQ_UNLOCK(&mtcp->ctx->close_lock); 1271 1272 if (ret < 0) { 1273 TRACE_ERROR("(NEVER HAPPEN) Failed to enqueue the stream to close.\n"); 1274 errno = EAGAIN; 1275 return -1; 1276 } 1277 1278 return 0; 1279 } 1280 /*----------------------------------------------------------------------------*/ 1281 static inline int 1282 CloseListeningSocket(mctx_t mctx, int sockid) 1283 { 1284 mtcp_manager_t mtcp; 1285 struct tcp_listener *listener; 1286 1287 mtcp = GetMTCPManager(mctx); 1288 if (!mtcp) { 1289 errno = EACCES; 1290 return -1; 1291 } 1292 1293 listener = mtcp->smap[sockid].listener; 1294 if (!listener) { 1295 errno = EINVAL; 1296 return -1; 1297 } 1298 1299 if (listener->acceptq) { 1300 DestroyStreamQueue(listener->acceptq); 1301 listener->acceptq = NULL; 1302 } 1303 1304 pthread_mutex_lock(&listener->accept_lock); 1305 pthread_cond_signal(&listener->accept_cond); 1306 pthread_mutex_unlock(&listener->accept_lock); 1307 1308 pthread_cond_destroy(&listener->accept_cond); 1309 pthread_mutex_destroy(&listener->accept_lock); 1310 1311 free(listener); 1312 mtcp->smap[sockid].listener = NULL; 1313 1314 return 0; 1315 } 1316 /*----------------------------------------------------------------------------*/ 1317 int 1318 mtcp_close(mctx_t mctx, int sockid) 1319 { 1320 mtcp_manager_t mtcp; 1321 int ret; 1322 1323 mtcp = GetMTCPManager(mctx); 1324 if (!mtcp) { 1325 errno = EACCES; 1326 return -1; 1327 } 1328 1329 if (sockid < 0 || sockid >= g_config.mos->max_concurrency) { 1330 TRACE_API("Socket id %d out of range.\n", sockid); 1331 errno = EBADF; 1332 return -1; 1333 } 1334 1335 if (mtcp->smap[sockid].socktype == MOS_SOCK_UNUSED) { 1336 TRACE_API("Invalid socket id: %d\n", sockid); 1337 errno = EBADF; 1338 return -1; 1339 } 1340 1341 TRACE_API("Socket %d: mtcp_close called.\n", sockid); 1342 1343 switch (mtcp->smap[sockid].socktype) { 1344 case MOS_SOCK_STREAM: 1345 ret = CloseStreamSocket(mctx, sockid); 1346 break; 1347 1348 case MOS_SOCK_STREAM_LISTEN: 1349 ret = CloseListeningSocket(mctx, sockid); 1350 break; 1351 1352 case MOS_SOCK_EPOLL: 1353 ret = CloseEpollSocket(mctx, sockid); 1354 break; 1355 1356 case MOS_SOCK_PIPE: 1357 ret = PipeClose(mctx, sockid); 1358 break; 1359 1360 default: 1361 errno = EINVAL; 1362 ret = -1; 1363 break; 1364 } 1365 1366 FreeSocket(mctx, sockid, mtcp->smap[sockid].socktype); 1367 1368 return ret; 1369 } 1370 /*----------------------------------------------------------------------------*/ 1371 int 1372 mtcp_abort(mctx_t mctx, int sockid) 1373 { 1374 mtcp_manager_t mtcp; 1375 tcp_stream *cur_stream; 1376 int ret; 1377 1378 mtcp = GetMTCPManager(mctx); 1379 if (!mtcp) { 1380 errno = EACCES; 1381 return -1; 1382 } 1383 1384 if (sockid < 0 || sockid >= g_config.mos->max_concurrency) { 1385 TRACE_API("Socket id %d out of range.\n", sockid); 1386 errno = EBADF; 1387 return -1; 1388 } 1389 1390 if (mtcp->smap[sockid].socktype == MOS_SOCK_UNUSED) { 1391 TRACE_API("Invalid socket id: %d\n", sockid); 1392 errno = EBADF; 1393 return -1; 1394 } 1395 1396 if (mtcp->smap[sockid].socktype != MOS_SOCK_STREAM) { 1397 TRACE_API("Not an end socket. id: %d\n", sockid); 1398 errno = ENOTSOCK; 1399 return -1; 1400 } 1401 1402 cur_stream = mtcp->smap[sockid].stream; 1403 if (!cur_stream) { 1404 TRACE_API("Stream %d: does not exist.\n", sockid); 1405 errno = ENOTCONN; 1406 return -1; 1407 } 1408 1409 TRACE_API("Socket %d: mtcp_abort()\n", sockid); 1410 1411 FreeSocket(mctx, sockid, mtcp->smap[sockid].socktype); 1412 cur_stream->socket = NULL; 1413 1414 if (cur_stream->state == TCP_ST_CLOSED_RSVD) { 1415 TRACE_API("Stream %d: connection already reset.\n", sockid); 1416 return ERROR; 1417 1418 } else if (cur_stream->state == TCP_ST_SYN_SENT) { 1419 /* TODO: this should notify event failure to all 1420 previous read() or write() calls */ 1421 cur_stream->state = TCP_ST_CLOSED_RSVD; 1422 cur_stream->close_reason = TCP_ACTIVE_CLOSE; 1423 cur_stream->cb_events |= MOS_ON_TCP_STATE_CHANGE; 1424 SQ_LOCK(&mtcp->ctx->destroyq_lock); 1425 StreamEnqueue(mtcp->destroyq, cur_stream); 1426 SQ_UNLOCK(&mtcp->ctx->destroyq_lock); 1427 mtcp->wakeup_flag = TRUE; 1428 return 0; 1429 1430 } else if (cur_stream->state == TCP_ST_CLOSING || 1431 cur_stream->state == TCP_ST_LAST_ACK || 1432 cur_stream->state == TCP_ST_TIME_WAIT) { 1433 cur_stream->state = TCP_ST_CLOSED_RSVD; 1434 cur_stream->close_reason = TCP_ACTIVE_CLOSE; 1435 cur_stream->cb_events |= MOS_ON_TCP_STATE_CHANGE; 1436 SQ_LOCK(&mtcp->ctx->destroyq_lock); 1437 StreamEnqueue(mtcp->destroyq, cur_stream); 1438 SQ_UNLOCK(&mtcp->ctx->destroyq_lock); 1439 mtcp->wakeup_flag = TRUE; 1440 return 0; 1441 } 1442 1443 /* the stream structure will be destroyed after sending RST */ 1444 if (cur_stream->sndvar->on_resetq) { 1445 TRACE_ERROR("Stream %d: calling mtcp_abort() " 1446 "when in reset queue.\n", sockid); 1447 errno = ECONNRESET; 1448 return -1; 1449 } 1450 SQ_LOCK(&mtcp->ctx->reset_lock); 1451 cur_stream->sndvar->on_resetq = TRUE; 1452 ret = StreamEnqueue(mtcp->resetq, cur_stream); 1453 SQ_UNLOCK(&mtcp->ctx->reset_lock); 1454 mtcp->wakeup_flag = TRUE; 1455 1456 if (ret < 0) { 1457 TRACE_ERROR("(NEVER HAPPEN) Failed to enqueue the stream to close.\n"); 1458 errno = EAGAIN; 1459 return -1; 1460 } 1461 1462 return 0; 1463 } 1464 /*----------------------------------------------------------------------------*/ 1465 static inline int 1466 PeekForUser(mtcp_manager_t mtcp, tcp_stream *cur_stream, char *buf, int len) 1467 { 1468 struct tcp_recv_vars *rcvvar = cur_stream->rcvvar; 1469 int copylen; 1470 tcprb_t *rb = rcvvar->rcvbuf; 1471 1472 if ((copylen = tcprb_ppeek(rb, (uint8_t *)buf, len, rb->pile)) <= 0) { 1473 errno = EAGAIN; 1474 return -1; 1475 } 1476 1477 return copylen; 1478 } 1479 /*----------------------------------------------------------------------------*/ 1480 static inline int 1481 CopyToUser(mtcp_manager_t mtcp, tcp_stream *cur_stream, char *buf, int len) 1482 { 1483 struct tcp_recv_vars *rcvvar = cur_stream->rcvvar; 1484 int copylen; 1485 tcprb_t *rb = rcvvar->rcvbuf; 1486 if ((copylen = tcprb_ppeek(rb, (uint8_t *)buf, len, rb->pile)) <= 0) { 1487 errno = EAGAIN; 1488 return -1; 1489 } 1490 tcprb_setpile(rb, rb->pile + copylen); 1491 1492 rcvvar->rcv_wnd = rb->len - tcprb_cflen(rb); 1493 //printf("rcv_wnd: %d\n", rcvvar->rcv_wnd); 1494 1495 /* Advertise newly freed receive buffer */ 1496 if (cur_stream->need_wnd_adv) { 1497 if (rcvvar->rcv_wnd > cur_stream->sndvar->eff_mss) { 1498 if (!cur_stream->sndvar->on_ackq) { 1499 SQ_LOCK(&mtcp->ctx->ackq_lock); 1500 cur_stream->sndvar->on_ackq = TRUE; 1501 StreamEnqueue(mtcp->ackq, cur_stream); /* this always success */ 1502 SQ_UNLOCK(&mtcp->ctx->ackq_lock); 1503 cur_stream->need_wnd_adv = FALSE; 1504 mtcp->wakeup_flag = TRUE; 1505 } 1506 } 1507 } 1508 1509 return copylen; 1510 } 1511 /*----------------------------------------------------------------------------*/ 1512 ssize_t 1513 mtcp_recv(mctx_t mctx, int sockid, char *buf, size_t len, int flags) 1514 { 1515 mtcp_manager_t mtcp; 1516 socket_map_t socket; 1517 tcp_stream *cur_stream; 1518 struct tcp_recv_vars *rcvvar; 1519 int event_remaining, merged_len; 1520 int ret; 1521 1522 mtcp = GetMTCPManager(mctx); 1523 if (!mtcp) { 1524 errno = EACCES; 1525 return -1; 1526 } 1527 1528 if (sockid < 0 || sockid >= g_config.mos->max_concurrency) { 1529 TRACE_API("Socket id %d out of range.\n", sockid); 1530 errno = EBADF; 1531 return -1; 1532 } 1533 1534 socket = &mtcp->smap[sockid]; 1535 if (socket->socktype == MOS_SOCK_UNUSED) { 1536 TRACE_API("Invalid socket id: %d\n", sockid); 1537 errno = EBADF; 1538 return -1; 1539 } 1540 1541 if (socket->socktype == MOS_SOCK_PIPE) { 1542 return PipeRead(mctx, sockid, buf, len); 1543 } 1544 1545 if (socket->socktype != MOS_SOCK_STREAM) { 1546 TRACE_API("Not an end socket. id: %d\n", sockid); 1547 errno = ENOTSOCK; 1548 return -1; 1549 } 1550 1551 /* stream should be in ESTABLISHED, FIN_WAIT_1, FIN_WAIT_2, CLOSE_WAIT */ 1552 cur_stream = socket->stream; 1553 if (!cur_stream || !cur_stream->rcvvar || !cur_stream->rcvvar->rcvbuf || 1554 !(cur_stream->state >= TCP_ST_ESTABLISHED && 1555 cur_stream->state <= TCP_ST_CLOSE_WAIT)) { 1556 errno = ENOTCONN; 1557 return -1; 1558 } 1559 1560 rcvvar = cur_stream->rcvvar; 1561 1562 merged_len = tcprb_cflen(rcvvar->rcvbuf); 1563 1564 /* if CLOSE_WAIT, return 0 if there is no payload */ 1565 if (cur_stream->state == TCP_ST_CLOSE_WAIT) { 1566 if (merged_len == 0) 1567 return 0; 1568 } 1569 1570 /* return EAGAIN if no receive buffer */ 1571 if (socket->opts & MTCP_NONBLOCK) { 1572 if (merged_len == 0) { 1573 errno = EAGAIN; 1574 return -1; 1575 } 1576 } 1577 1578 SBUF_LOCK(&rcvvar->read_lock); 1579 1580 switch (flags) { 1581 case 0: 1582 ret = CopyToUser(mtcp, cur_stream, buf, len); 1583 break; 1584 case MSG_PEEK: 1585 ret = PeekForUser(mtcp, cur_stream, buf, len); 1586 break; 1587 default: 1588 SBUF_UNLOCK(&rcvvar->read_lock); 1589 ret = -1; 1590 errno = EINVAL; 1591 return ret; 1592 } 1593 1594 merged_len = tcprb_cflen(rcvvar->rcvbuf); 1595 event_remaining = FALSE; 1596 /* if there are remaining payload, generate EPOLLIN */ 1597 /* (may due to insufficient user buffer) */ 1598 if (socket->epoll & MOS_EPOLLIN) { 1599 if (!(socket->epoll & MOS_EPOLLET) && merged_len > 0) { 1600 event_remaining = TRUE; 1601 } 1602 } 1603 /* if waiting for close, notify it if no remaining data */ 1604 if (cur_stream->state == TCP_ST_CLOSE_WAIT && 1605 merged_len == 0 && ret > 0) { 1606 event_remaining = TRUE; 1607 } 1608 1609 SBUF_UNLOCK(&rcvvar->read_lock); 1610 1611 if (event_remaining) { 1612 if (socket->epoll) { 1613 AddEpollEvent(mtcp->ep, 1614 USR_SHADOW_EVENT_QUEUE, socket, MOS_EPOLLIN); 1615 } 1616 } 1617 1618 TRACE_API("Stream %d: mtcp_recv() returning %d\n", cur_stream->id, ret); 1619 return ret; 1620 } 1621 /*----------------------------------------------------------------------------*/ 1622 inline ssize_t 1623 mtcp_read(mctx_t mctx, int sockid, char *buf, size_t len) 1624 { 1625 return mtcp_recv(mctx, sockid, buf, len, 0); 1626 } 1627 /*----------------------------------------------------------------------------*/ 1628 ssize_t 1629 mtcp_readv(mctx_t mctx, int sockid, const struct iovec *iov, int numIOV) 1630 { 1631 mtcp_manager_t mtcp; 1632 socket_map_t socket; 1633 tcp_stream *cur_stream; 1634 struct tcp_recv_vars *rcvvar; 1635 int ret, bytes_read, i; 1636 int event_remaining, merged_len; 1637 1638 mtcp = GetMTCPManager(mctx); 1639 if (!mtcp) { 1640 errno = EACCES; 1641 return -1; 1642 } 1643 1644 if (sockid < 0 || sockid >= g_config.mos->max_concurrency) { 1645 TRACE_API("Socket id %d out of range.\n", sockid); 1646 errno = EBADF; 1647 return -1; 1648 } 1649 1650 socket = &mtcp->smap[sockid]; 1651 if (socket->socktype == MOS_SOCK_UNUSED) { 1652 TRACE_API("Invalid socket id: %d\n", sockid); 1653 errno = EBADF; 1654 return -1; 1655 } 1656 1657 if (socket->socktype != MOS_SOCK_STREAM) { 1658 TRACE_API("Not an end socket. id: %d\n", sockid); 1659 errno = ENOTSOCK; 1660 return -1; 1661 } 1662 1663 /* stream should be in ESTABLISHED, FIN_WAIT_1, FIN_WAIT_2, CLOSE_WAIT */ 1664 cur_stream = socket->stream; 1665 if (!cur_stream || !cur_stream->rcvvar->rcvbuf || 1666 !(cur_stream->state >= TCP_ST_ESTABLISHED && 1667 cur_stream->state <= TCP_ST_CLOSE_WAIT)) { 1668 errno = ENOTCONN; 1669 return -1; 1670 } 1671 1672 rcvvar = cur_stream->rcvvar; 1673 1674 merged_len = tcprb_cflen(rcvvar->rcvbuf); 1675 1676 /* if CLOSE_WAIT, return 0 if there is no payload */ 1677 if (cur_stream->state == TCP_ST_CLOSE_WAIT) { 1678 if (merged_len == 0) 1679 return 0; 1680 } 1681 1682 /* return EAGAIN if no receive buffer */ 1683 if (socket->opts & MTCP_NONBLOCK) { 1684 if (merged_len == 0) { 1685 errno = EAGAIN; 1686 return -1; 1687 } 1688 } 1689 1690 SBUF_LOCK(&rcvvar->read_lock); 1691 1692 /* read and store the contents to the vectored buffers */ 1693 bytes_read = 0; 1694 for (i = 0; i < numIOV; i++) { 1695 if (iov[i].iov_len <= 0) 1696 continue; 1697 1698 ret = CopyToUser(mtcp, cur_stream, iov[i].iov_base, iov[i].iov_len); 1699 if (ret <= 0) 1700 break; 1701 1702 bytes_read += ret; 1703 1704 if (ret < iov[i].iov_len) 1705 break; 1706 } 1707 1708 merged_len = tcprb_cflen(rcvvar->rcvbuf); 1709 1710 event_remaining = FALSE; 1711 /* if there are remaining payload, generate read event */ 1712 /* (may due to insufficient user buffer) */ 1713 if (socket->epoll & MOS_EPOLLIN) { 1714 if (!(socket->epoll & MOS_EPOLLET) && merged_len > 0) { 1715 event_remaining = TRUE; 1716 } 1717 } 1718 /* if waiting for close, notify it if no remaining data */ 1719 if (cur_stream->state == TCP_ST_CLOSE_WAIT && 1720 merged_len == 0 && bytes_read > 0) { 1721 event_remaining = TRUE; 1722 } 1723 1724 SBUF_UNLOCK(&rcvvar->read_lock); 1725 1726 if(event_remaining) { 1727 if (socket->epoll & MOS_EPOLLIN && !(socket->epoll & MOS_EPOLLET)) { 1728 AddEpollEvent(mtcp->ep, 1729 USR_SHADOW_EVENT_QUEUE, socket, MOS_EPOLLIN); 1730 } 1731 } 1732 1733 TRACE_API("Stream %d: mtcp_readv() returning %d\n", 1734 cur_stream->id, bytes_read); 1735 return bytes_read; 1736 } 1737 /*----------------------------------------------------------------------------*/ 1738 static inline int 1739 CopyFromUser(mtcp_manager_t mtcp, tcp_stream *cur_stream, const char *buf, int len) 1740 { 1741 struct tcp_send_vars *sndvar = cur_stream->sndvar; 1742 int sndlen; 1743 int ret; 1744 1745 sndlen = MIN((int)sndvar->snd_wnd, len); 1746 if (sndlen <= 0) { 1747 errno = EAGAIN; 1748 return -1; 1749 } 1750 1751 /* allocate send buffer if not exist */ 1752 if (!sndvar->sndbuf) { 1753 sndvar->sndbuf = SBInit(mtcp->rbm_snd, sndvar->iss + 1); 1754 if (!sndvar->sndbuf) { 1755 cur_stream->close_reason = TCP_NO_MEM; 1756 /* notification may not required due to -1 return */ 1757 errno = ENOMEM; 1758 return -1; 1759 } 1760 } 1761 1762 ret = SBPut(mtcp->rbm_snd, sndvar->sndbuf, buf, sndlen); 1763 assert(ret == sndlen); 1764 sndvar->snd_wnd = sndvar->sndbuf->size - sndvar->sndbuf->len; 1765 if (ret <= 0) { 1766 TRACE_ERROR("SBPut failed. reason: %d (sndlen: %u, len: %u\n", 1767 ret, sndlen, sndvar->sndbuf->len); 1768 errno = EAGAIN; 1769 return -1; 1770 } 1771 1772 if (sndvar->snd_wnd <= 0) { 1773 TRACE_SNDBUF("%u Sending buffer became full!! snd_wnd: %u\n", 1774 cur_stream->id, sndvar->snd_wnd); 1775 } 1776 1777 return ret; 1778 } 1779 /*----------------------------------------------------------------------------*/ 1780 ssize_t 1781 mtcp_write(mctx_t mctx, int sockid, const char *buf, size_t len) 1782 { 1783 mtcp_manager_t mtcp; 1784 socket_map_t socket; 1785 tcp_stream *cur_stream; 1786 struct tcp_send_vars *sndvar; 1787 int ret; 1788 1789 mtcp = GetMTCPManager(mctx); 1790 if (!mtcp) { 1791 errno = EACCES; 1792 return -1; 1793 } 1794 1795 if (sockid < 0 || sockid >= g_config.mos->max_concurrency) { 1796 TRACE_API("Socket id %d out of range.\n", sockid); 1797 errno = EBADF; 1798 return -1; 1799 } 1800 1801 socket = &mtcp->smap[sockid]; 1802 if (socket->socktype == MOS_SOCK_UNUSED) { 1803 TRACE_API("Invalid socket id: %d\n", sockid); 1804 errno = EBADF; 1805 return -1; 1806 } 1807 1808 if (socket->socktype == MOS_SOCK_PIPE) { 1809 return PipeWrite(mctx, sockid, buf, len); 1810 } 1811 1812 if (socket->socktype != MOS_SOCK_STREAM) { 1813 TRACE_API("Not an end socket. id: %d\n", sockid); 1814 errno = ENOTSOCK; 1815 return -1; 1816 } 1817 1818 cur_stream = socket->stream; 1819 if (!cur_stream || 1820 !(cur_stream->state == TCP_ST_ESTABLISHED || 1821 cur_stream->state == TCP_ST_CLOSE_WAIT)) { 1822 errno = ENOTCONN; 1823 return -1; 1824 } 1825 1826 if (len <= 0) { 1827 if (socket->opts & MTCP_NONBLOCK) { 1828 errno = EAGAIN; 1829 return -1; 1830 } else { 1831 return 0; 1832 } 1833 } 1834 1835 sndvar = cur_stream->sndvar; 1836 1837 SBUF_LOCK(&sndvar->write_lock); 1838 ret = CopyFromUser(mtcp, cur_stream, buf, len); 1839 1840 SBUF_UNLOCK(&sndvar->write_lock); 1841 1842 if (ret > 0 && !(sndvar->on_sendq || sndvar->on_send_list)) { 1843 SQ_LOCK(&mtcp->ctx->sendq_lock); 1844 sndvar->on_sendq = TRUE; 1845 StreamEnqueue(mtcp->sendq, cur_stream); /* this always success */ 1846 SQ_UNLOCK(&mtcp->ctx->sendq_lock); 1847 mtcp->wakeup_flag = TRUE; 1848 } 1849 1850 if (ret == 0 && (socket->opts & MTCP_NONBLOCK)) { 1851 ret = -1; 1852 errno = EAGAIN; 1853 } 1854 1855 /* if there are remaining sending buffer, generate write event */ 1856 if (sndvar->snd_wnd > 0) { 1857 if (socket->epoll & MOS_EPOLLOUT && !(socket->epoll & MOS_EPOLLET)) { 1858 AddEpollEvent(mtcp->ep, 1859 USR_SHADOW_EVENT_QUEUE, socket, MOS_EPOLLOUT); 1860 } 1861 } 1862 1863 TRACE_API("Stream %d: mtcp_write() returning %d\n", cur_stream->id, ret); 1864 return ret; 1865 } 1866 /*----------------------------------------------------------------------------*/ 1867 ssize_t 1868 mtcp_writev(mctx_t mctx, int sockid, const struct iovec *iov, int numIOV) 1869 { 1870 mtcp_manager_t mtcp; 1871 socket_map_t socket; 1872 tcp_stream *cur_stream; 1873 struct tcp_send_vars *sndvar; 1874 int ret, to_write, i; 1875 1876 mtcp = GetMTCPManager(mctx); 1877 if (!mtcp) { 1878 errno = EACCES; 1879 return -1; 1880 } 1881 1882 if (sockid < 0 || sockid >= g_config.mos->max_concurrency) { 1883 TRACE_API("Socket id %d out of range.\n", sockid); 1884 errno = EBADF; 1885 return -1; 1886 } 1887 1888 socket = &mtcp->smap[sockid]; 1889 if (socket->socktype == MOS_SOCK_UNUSED) { 1890 TRACE_API("Invalid socket id: %d\n", sockid); 1891 errno = EBADF; 1892 return -1; 1893 } 1894 1895 if (socket->socktype != MOS_SOCK_STREAM) { 1896 TRACE_API("Not an end socket. id: %d\n", sockid); 1897 errno = ENOTSOCK; 1898 return -1; 1899 } 1900 1901 cur_stream = socket->stream; 1902 if (!cur_stream || 1903 !(cur_stream->state == TCP_ST_ESTABLISHED || 1904 cur_stream->state == TCP_ST_CLOSE_WAIT)) { 1905 errno = ENOTCONN; 1906 return -1; 1907 } 1908 1909 sndvar = cur_stream->sndvar; 1910 SBUF_LOCK(&sndvar->write_lock); 1911 1912 /* write from the vectored buffers */ 1913 to_write = 0; 1914 for (i = 0; i < numIOV; i++) { 1915 if (iov[i].iov_len <= 0) 1916 continue; 1917 1918 ret = CopyFromUser(mtcp, cur_stream, iov[i].iov_base, iov[i].iov_len); 1919 if (ret <= 0) 1920 break; 1921 1922 to_write += ret; 1923 1924 if (ret < iov[i].iov_len) 1925 break; 1926 } 1927 SBUF_UNLOCK(&sndvar->write_lock); 1928 1929 if (to_write > 0 && !(sndvar->on_sendq || sndvar->on_send_list)) { 1930 SQ_LOCK(&mtcp->ctx->sendq_lock); 1931 sndvar->on_sendq = TRUE; 1932 StreamEnqueue(mtcp->sendq, cur_stream); /* this always success */ 1933 SQ_UNLOCK(&mtcp->ctx->sendq_lock); 1934 mtcp->wakeup_flag = TRUE; 1935 } 1936 1937 if (to_write == 0 && (socket->opts & MTCP_NONBLOCK)) { 1938 to_write = -1; 1939 errno = EAGAIN; 1940 } 1941 1942 /* if there are remaining sending buffer, generate write event */ 1943 if (sndvar->snd_wnd > 0) { 1944 if (socket->epoll & MOS_EPOLLOUT && !(socket->epoll & MOS_EPOLLET)) { 1945 AddEpollEvent(mtcp->ep, 1946 USR_SHADOW_EVENT_QUEUE, socket, MOS_EPOLLOUT); 1947 } 1948 } 1949 1950 TRACE_API("Stream %d: mtcp_writev() returning %d\n", 1951 cur_stream->id, to_write); 1952 return to_write; 1953 } 1954 /*----------------------------------------------------------------------------*/ 1955 uint32_t 1956 mtcp_get_connection_cnt(mctx_t mctx) 1957 { 1958 mtcp_manager_t mtcp; 1959 mtcp = GetMTCPManager(mctx); 1960 if (!mtcp) { 1961 errno = EACCES; 1962 return -1; 1963 } 1964 1965 if (mtcp->num_msp > 0) 1966 return mtcp->flow_cnt / 2; 1967 else 1968 return mtcp->flow_cnt; 1969 } 1970 /*----------------------------------------------------------------------------*/ 1971