1 #include <sys/queue.h> 2 #include <sys/ioctl.h> 3 #include <limits.h> 4 #include <unistd.h> 5 #include <assert.h> 6 #include <string.h> 7 8 #ifdef DARWIN 9 #include <netinet/tcp.h> 10 #include <netinet/ip.h> 11 #include <netinet/if_ether.h> 12 #endif 13 14 #include "mtcp.h" 15 #include "mtcp_api.h" 16 #include "tcp_in.h" 17 #include "tcp_stream.h" 18 #include "tcp_out.h" 19 #include "ip_out.h" 20 #include "eventpoll.h" 21 #include "pipe.h" 22 #include "fhash.h" 23 #include "addr_pool.h" 24 #include "rss.h" 25 #include "config.h" 26 #include "debug.h" 27 #include "eventpoll.h" 28 #include "mos_api.h" 29 #include "tcp_rb.h" 30 31 #define MAX(a, b) ((a)>(b)?(a):(b)) 32 #define MIN(a, b) ((a)<(b)?(a):(b)) 33 34 /*----------------------------------------------------------------------------*/ 35 /** Stop monitoring the socket! (function prototype) 36 * @param [in] mctx: mtcp context 37 * @param [in] sock: monitoring stream socket id 38 * @param [in] side: side of monitoring (client side, server side or both) 39 * 40 * This function is now DEPRECATED and is only used within mOS core... 41 */ 42 int 43 mtcp_cb_stop(mctx_t mctx, int sock, int side); 44 /*----------------------------------------------------------------------------*/ 45 /** Reset the connection (send RST packets to both sides) 46 * (We need to decide the API for this.) 47 */ 48 //int 49 //mtcp_cb_reset(mctx_t mctx, int sock, int side); 50 /*----------------------------------------------------------------------------*/ 51 inline mtcp_manager_t 52 GetMTCPManager(mctx_t mctx) 53 { 54 if (!mctx) { 55 errno = EACCES; 56 return NULL; 57 } 58 59 if (mctx->cpu < 0 || mctx->cpu >= num_cpus) { 60 errno = EINVAL; 61 return NULL; 62 } 63 64 if (g_mtcp[mctx->cpu]->ctx->done || g_mtcp[mctx->cpu]->ctx->exit) { 65 errno = EPERM; 66 return NULL; 67 } 68 69 return g_mtcp[mctx->cpu]; 70 } 71 /*----------------------------------------------------------------------------*/ 72 inline int 73 GetSocketError(socket_map_t socket, void *optval, socklen_t *optlen) 74 { 75 tcp_stream *cur_stream; 76 77 if (!socket->stream) { 78 errno = EBADF; 79 return -1; 80 } 81 82 cur_stream = socket->stream; 83 if (cur_stream->state == TCP_ST_CLOSED_RSVD) { 84 if (cur_stream->close_reason == TCP_TIMEDOUT || 85 cur_stream->close_reason == TCP_CONN_FAIL || 86 cur_stream->close_reason == TCP_CONN_LOST) { 87 *(int *)optval = ETIMEDOUT; 88 *optlen = sizeof(int); 89 90 return 0; 91 } 92 } 93 94 if (cur_stream->state == TCP_ST_CLOSE_WAIT || 95 cur_stream->state == TCP_ST_CLOSED_RSVD) { 96 if (cur_stream->close_reason == TCP_RESET) { 97 *(int *)optval = ECONNRESET; 98 *optlen = sizeof(int); 99 100 return 0; 101 } 102 } 103 104 /* 105 * `base case`: If socket sees no so_error, then 106 * this also means close_reason will always be 107 * TCP_NOT_CLOSED. 108 */ 109 if (cur_stream->close_reason == TCP_NOT_CLOSED) { 110 *(int *)optval = 0; 111 *optlen = sizeof(int); 112 113 return 0; 114 } 115 116 errno = ENOSYS; 117 return -1; 118 } 119 /*----------------------------------------------------------------------------*/ 120 int 121 mtcp_getsockopt(mctx_t mctx, int sockid, int level, 122 int optname, void *optval, socklen_t *optlen) 123 { 124 mtcp_manager_t mtcp; 125 socket_map_t socket; 126 127 mtcp = GetMTCPManager(mctx); 128 if (!mtcp) { 129 errno = EACCES; 130 return -1; 131 } 132 133 if (sockid < 0 || sockid >= g_config.mos->max_concurrency) { 134 TRACE_API("Socket id %d out of range.\n", sockid); 135 errno = EBADF; 136 return -1; 137 } 138 139 switch (level) { 140 case SOL_SOCKET: 141 socket = &mtcp->smap[sockid]; 142 if (socket->socktype == MOS_SOCK_UNUSED) { 143 TRACE_API("Invalid socket id: %d\n", sockid); 144 errno = EBADF; 145 return -1; 146 } 147 148 if (socket->socktype != MOS_SOCK_STREAM_LISTEN && 149 socket->socktype != MOS_SOCK_STREAM) { 150 TRACE_API("Invalid socket id: %d\n", sockid); 151 errno = ENOTSOCK; 152 return -1; 153 } 154 155 if (optname == SO_ERROR) { 156 if (socket->socktype == MOS_SOCK_STREAM) { 157 return GetSocketError(socket, optval, optlen); 158 } 159 } 160 break; 161 case SOL_MONSOCKET: 162 /* check if the calling thread is in MOS context */ 163 if (mtcp->ctx->thread != pthread_self()) { 164 errno = EPERM; 165 return -1; 166 } 167 /* 168 * All options will only work for active 169 * monitor stream sockets 170 */ 171 socket = &mtcp->msmap[sockid]; 172 if (socket->socktype != MOS_SOCK_MONITOR_STREAM_ACTIVE) { 173 TRACE_API("Invalid socket id: %d\n", sockid); 174 errno = ENOTSOCK; 175 return -1; 176 } 177 178 switch (optname) { 179 case MOS_FRAGINFO_CLIBUF: 180 return GetFragInfo(socket, MOS_SIDE_CLI, optval, optlen); 181 case MOS_FRAGINFO_SVRBUF: 182 return GetFragInfo(socket, MOS_SIDE_SVR, optval, optlen); 183 case MOS_INFO_CLIBUF: 184 return GetBufInfo(socket, MOS_SIDE_CLI, optval, optlen); 185 case MOS_INFO_SVRBUF: 186 return GetBufInfo(socket, MOS_SIDE_SVR, optval, optlen); 187 case MOS_TCP_STATE_CLI: 188 return GetTCPState(socket->monitor_stream->stream, MOS_SIDE_CLI, 189 optval, optlen); 190 case MOS_TCP_STATE_SVR: 191 return GetTCPState(socket->monitor_stream->stream, MOS_SIDE_SVR, 192 optval, optlen); 193 case MOS_TIMESTAMP: 194 return GetLastTimestamp(socket->monitor_stream->stream, 195 (uint32_t *)optval, 196 optlen); 197 default: 198 TRACE_API("can't recognize the optname=%d\n", optname); 199 assert(0); 200 } 201 break; 202 } 203 errno = ENOSYS; 204 return -1; 205 } 206 /*----------------------------------------------------------------------------*/ 207 int 208 mtcp_setsockopt(mctx_t mctx, int sockid, int level, 209 int optname, const void *optval, socklen_t optlen) 210 { 211 mtcp_manager_t mtcp; 212 socket_map_t socket; 213 tcprb_t *rb; 214 215 mtcp = GetMTCPManager(mctx); 216 if (!mtcp) { 217 errno = EACCES; 218 return -1; 219 } 220 221 if (sockid < 0 || sockid >= g_config.mos->max_concurrency) { 222 TRACE_API("Socket id %d out of range.\n", sockid); 223 errno = EBADF; 224 return -1; 225 } 226 227 switch (level) { 228 case SOL_SOCKET: 229 socket = &mtcp->smap[sockid]; 230 if (socket->socktype == MOS_SOCK_UNUSED) { 231 TRACE_API("Invalid socket id: %d\n", sockid); 232 errno = EBADF; 233 return -1; 234 } 235 236 if (socket->socktype != MOS_SOCK_STREAM_LISTEN && 237 socket->socktype != MOS_SOCK_STREAM) { 238 TRACE_API("Invalid socket id: %d\n", sockid); 239 errno = ENOTSOCK; 240 return -1; 241 } 242 break; 243 case SOL_MONSOCKET: 244 socket = &mtcp->msmap[sockid]; 245 /* 246 * checking of calling thread to be in MOS context is 247 * disabled since both optnames can be called from 248 * `application' context (on passive sockets) 249 */ 250 /* 251 * if (mtcp->ctx->thread != pthread_self()) 252 * return -1; 253 */ 254 255 switch (optname) { 256 case MOS_CLIBUF: 257 #if 0 258 if (socket->socktype != MOS_SOCK_MONITOR_STREAM_ACTIVE) { 259 errno = EBADF; 260 return -1; 261 } 262 #endif 263 #ifdef NEWRB 264 #ifdef DISABLE_DYN_RESIZE 265 if (*(int *)optval != 0) 266 return -1; 267 if (socket->socktype == MOS_SOCK_MONITOR_STREAM_ACTIVE) { 268 rb = (socket->monitor_stream->stream->side == MOS_SIDE_CLI) ? 269 socket->monitor_stream->stream->rcvvar->rcvbuf : 270 socket->monitor_stream->stream->pair_stream->rcvvar->rcvbuf; 271 if (rb) { 272 tcprb_resize_meta(rb, 0); 273 tcprb_resize(rb, 0); 274 } 275 } 276 return DisableBuf(socket, MOS_SIDE_CLI); 277 #else 278 rb = (socket->monitor_stream->stream->side == MOS_SIDE_CLI) ? 279 socket->monitor_stream->stream->rcvvar->rcvbuf : 280 socket->monitor_stream->stream->pair_stream->rcvvar->rcvbuf; 281 if (tcprb_resize_meta(rb, *(int *)optval) < 0) 282 return -1; 283 return tcprb_resize(rb, 284 (((int)rb->metalen - 1) / UNITBUFSIZE + 1) * UNITBUFSIZE); 285 #endif 286 #else 287 errno = EBADF; 288 return -1; 289 #endif 290 case MOS_SVRBUF: 291 #if 0 292 if (socket->socktype != MOS_SOCK_MONITOR_STREAM_ACTIVE) { 293 errno = EBADF; 294 return -1; 295 } 296 #endif 297 #ifdef NEWRB 298 #ifdef DISABLE_DYN_RESIZE 299 if (*(int *)optval != 0) 300 return -1; 301 if (socket->socktype == MOS_SOCK_MONITOR_STREAM_ACTIVE) { 302 rb = (socket->monitor_stream->stream->side == MOS_SIDE_SVR) ? 303 socket->monitor_stream->stream->rcvvar->rcvbuf : 304 socket->monitor_stream->stream->pair_stream->rcvvar->rcvbuf; 305 if (rb) { 306 tcprb_resize_meta(rb, 0); 307 tcprb_resize(rb, 0); 308 } 309 } 310 return DisableBuf(socket, MOS_SIDE_SVR); 311 #else 312 rb = (socket->monitor_stream->stream->side == MOS_SIDE_SVR) ? 313 socket->monitor_stream->stream->rcvvar->rcvbuf : 314 socket->monitor_stream->stream->pair_stream->rcvvar->rcvbuf; 315 if (tcprb_resize_meta(rb, *(int *)optval) < 0) 316 return -1; 317 return tcprb_resize(rb, 318 (((int)rb->metalen - 1) / UNITBUFSIZE + 1) * UNITBUFSIZE); 319 #endif 320 #else 321 errno = EBADF; 322 return -1; 323 #endif 324 case MOS_FRAG_CLIBUF: 325 #if 0 326 if (socket->socktype != MOS_SOCK_MONITOR_STREAM_ACTIVE) { 327 errno = EBADF; 328 return -1; 329 } 330 #endif 331 #ifdef NEWRB 332 #ifdef DISABLE_DYN_RESIZE 333 if (*(int *)optval != 0) 334 return -1; 335 if (socket->socktype == MOS_SOCK_MONITOR_STREAM_ACTIVE) { 336 rb = (socket->monitor_stream->stream->side == MOS_SIDE_CLI) ? 337 socket->monitor_stream->stream->rcvvar->rcvbuf : 338 socket->monitor_stream->stream->pair_stream->rcvvar->rcvbuf; 339 if (rb) 340 tcprb_resize(rb, 0); 341 } 342 return 0; 343 #else 344 rb = (socket->monitor_stream->stream->side == MOS_SIDE_CLI) ? 345 socket->monitor_stream->stream->rcvvar->rcvbuf : 346 socket->monitor_stream->stream->pair_stream->rcvvar->rcvbuf; 347 if (rb->len == 0) 348 return tcprb_resize_meta(rb, *(int *)optval); 349 else 350 return -1; 351 #endif 352 #else 353 errno = EBADF; 354 return -1; 355 #endif 356 case MOS_FRAG_SVRBUF: 357 #if 0 358 if (socket->socktype != MOS_SOCK_MONITOR_STREAM_ACTIVE) { 359 errno = EBADF; 360 return -1; 361 } 362 #endif 363 #ifdef NEWRB 364 #ifdef DISABLE_DYN_RESIZE 365 if (*(int *)optval != 0) 366 return -1; 367 if (socket->socktype == MOS_SOCK_MONITOR_STREAM_ACTIVE) { 368 rb = (socket->monitor_stream->stream->side == MOS_SIDE_SVR) ? 369 socket->monitor_stream->stream->rcvvar->rcvbuf : 370 socket->monitor_stream->stream->pair_stream->rcvvar->rcvbuf; 371 if (rb) 372 tcprb_resize(rb, 0); 373 } 374 return 0; 375 #else 376 rb = (socket->monitor_stream->stream->side == MOS_SIDE_SVR) ? 377 socket->monitor_stream->stream->rcvvar->rcvbuf : 378 socket->monitor_stream->stream->pair_stream->rcvvar->rcvbuf; 379 if (rb->len == 0) 380 return tcprb_resize_meta(rb, *(int *)optval); 381 else 382 return -1; 383 #endif 384 #else 385 errno = EBADF; 386 return -1; 387 #endif 388 case MOS_MONLEVEL: 389 #ifdef OLD_API 390 assert(*(int *)optval == MOS_NO_CLIBUF || 391 *(int *)optval == MOS_NO_SVRBUF); 392 return DisableBuf(socket, 393 (*(int *)optval == MOS_NO_CLIBUF) ? 394 MOS_SIDE_CLI : MOS_SIDE_SVR); 395 #endif 396 case MOS_SEQ_REMAP: 397 return TcpSeqChange(socket, 398 (uint32_t)((seq_remap_info *)optval)->seq_off, 399 ((seq_remap_info *)optval)->side, 400 mtcp->pctx->p.seq); 401 case MOS_STOP_MON: 402 return mtcp_cb_stop(mctx, sockid, *(int *)optval); 403 default: 404 TRACE_API("invalid optname=%d\n", optname); 405 assert(0); 406 } 407 break; 408 } 409 410 errno = ENOSYS; 411 return -1; 412 } 413 /*----------------------------------------------------------------------------*/ 414 int 415 mtcp_setsock_nonblock(mctx_t mctx, int sockid) 416 { 417 mtcp_manager_t mtcp; 418 419 mtcp = GetMTCPManager(mctx); 420 if (!mtcp) { 421 errno = EACCES; 422 return -1; 423 } 424 425 if (sockid < 0 || sockid >= g_config.mos->max_concurrency) { 426 TRACE_API("Socket id %d out of range.\n", sockid); 427 errno = EBADF; 428 return -1; 429 } 430 431 if (mtcp->smap[sockid].socktype == MOS_SOCK_UNUSED) { 432 TRACE_API("Invalid socket id: %d\n", sockid); 433 errno = EBADF; 434 return -1; 435 } 436 437 mtcp->smap[sockid].opts |= MTCP_NONBLOCK; 438 439 return 0; 440 } 441 /*----------------------------------------------------------------------------*/ 442 int 443 mtcp_ioctl(mctx_t mctx, int sockid, int request, void *argp) 444 { 445 mtcp_manager_t mtcp; 446 socket_map_t socket; 447 448 mtcp = GetMTCPManager(mctx); 449 if (!mtcp) { 450 errno = EACCES; 451 return -1; 452 } 453 454 if (sockid < 0 || sockid >= g_config.mos->max_concurrency) { 455 TRACE_API("Socket id %d out of range.\n", sockid); 456 errno = EBADF; 457 return -1; 458 } 459 460 /* only support stream socket */ 461 socket = &mtcp->smap[sockid]; 462 463 if (socket->socktype != MOS_SOCK_STREAM_LISTEN && 464 socket->socktype != MOS_SOCK_STREAM) { 465 TRACE_API("Invalid socket id: %d\n", sockid); 466 errno = EBADF; 467 return -1; 468 } 469 470 if (!argp) { 471 errno = EFAULT; 472 return -1; 473 } 474 475 if (request == FIONREAD) { 476 tcp_stream *cur_stream; 477 #ifdef NEWRB 478 tcprb_t *rbuf; 479 #else 480 struct tcp_ring_buffer *rbuf; 481 #endif 482 483 cur_stream = socket->stream; 484 if (!cur_stream) { 485 errno = EBADF; 486 return -1; 487 } 488 489 rbuf = cur_stream->rcvvar->rcvbuf; 490 #ifdef NEWRB 491 *(int *)argp = (rbuf) ? tcprb_cflen(rbuf) : 0; 492 #else 493 *(int *)argp = (rbuf) ? rbuf->merged_len : 0; 494 #endif 495 496 } else if (request == FIONBIO) { 497 /* 498 * sockets can only be set to blocking/non-blocking 499 * modes during initialization 500 */ 501 if ((*(int *)argp)) 502 mtcp->smap[sockid].opts |= MTCP_NONBLOCK; 503 else 504 mtcp->smap[sockid].opts &= ~MTCP_NONBLOCK; 505 } else { 506 errno = EINVAL; 507 return -1; 508 } 509 510 return 0; 511 } 512 /*----------------------------------------------------------------------------*/ 513 static int 514 mtcp_monitor(mctx_t mctx, socket_map_t sock) 515 { 516 mtcp_manager_t mtcp; 517 struct mon_listener *monitor; 518 int sockid = sock->id; 519 520 mtcp = GetMTCPManager(mctx); 521 if (!mtcp) { 522 errno = EACCES; 523 return -1; 524 } 525 526 if (sockid < 0 || sockid >= g_config.mos->max_concurrency) { 527 TRACE_API("Socket id %d out of range.\n", sockid); 528 errno = EBADF; 529 return -1; 530 } 531 532 if (mtcp->msmap[sockid].socktype == MOS_SOCK_UNUSED) { 533 TRACE_API("Invalid socket id: %d\n", sockid); 534 errno = EBADF; 535 return -1; 536 } 537 538 if (!(mtcp->msmap[sockid].socktype == MOS_SOCK_MONITOR_STREAM || 539 mtcp->msmap[sockid].socktype == MOS_SOCK_MONITOR_RAW)) { 540 TRACE_API("Not a monitor socket. id: %d\n", sockid); 541 errno = ENOTSOCK; 542 return -1; 543 } 544 545 monitor = (struct mon_listener *)calloc(1, sizeof(struct mon_listener)); 546 if (!monitor) { 547 /* errno set from the malloc() */ 548 errno = ENOMEM; 549 return -1; 550 } 551 552 /* create a monitor-specific event queue */ 553 monitor->eq = CreateEventQueue(g_config.mos->max_concurrency); 554 if (!monitor->eq) { 555 TRACE_API("Can't create event queue (concurrency: %d) for " 556 "monitor read event registrations!\n", 557 g_config.mos->max_concurrency); 558 free(monitor); 559 errno = ENOMEM; 560 return -1; 561 } 562 563 /* set monitor-related basic parameters */ 564 #ifndef NEWEV 565 monitor->ude_id = UDE_OFFSET; 566 #endif 567 monitor->socket = sock; 568 #ifdef NEWRB 569 monitor->client_buf_mgmt = monitor->server_buf_mgmt = BUFMGMT_FULL; 570 #else 571 monitor->client_buf_mgmt = monitor->server_buf_mgmt = 1; 572 #endif 573 574 /* perform both sides monitoring by default */ 575 monitor->client_mon = monitor->server_mon = 1; 576 577 /* add monitor socket to the monitor list */ 578 TAILQ_INSERT_TAIL(&mtcp->monitors, monitor, link); 579 580 mtcp->msmap[sockid].monitor_listener = monitor; 581 582 return 0; 583 } 584 /*----------------------------------------------------------------------------*/ 585 int 586 mtcp_socket(mctx_t mctx, int domain, int type, int protocol) 587 { 588 mtcp_manager_t mtcp; 589 socket_map_t socket; 590 591 mtcp = GetMTCPManager(mctx); 592 if (!mtcp) { 593 errno = EACCES; 594 return -1; 595 } 596 597 if (domain != AF_INET) { 598 errno = EAFNOSUPPORT; 599 return -1; 600 } 601 602 if (type == SOCK_STREAM) { 603 type = MOS_SOCK_STREAM; 604 } else if (type == MOS_SOCK_MONITOR_STREAM || 605 type == MOS_SOCK_MONITOR_RAW) { 606 /* do nothing for the time being */ 607 } else { 608 /* Not supported type */ 609 errno = EINVAL; 610 return -1; 611 } 612 613 socket = AllocateSocket(mctx, type); 614 if (!socket) { 615 errno = ENFILE; 616 return -1; 617 } 618 619 if (type == MOS_SOCK_MONITOR_STREAM || 620 type == MOS_SOCK_MONITOR_RAW) { 621 mtcp_manager_t mtcp = GetMTCPManager(mctx); 622 if (!mtcp) { 623 errno = EACCES; 624 return -1; 625 } 626 mtcp_monitor(mctx, socket); 627 #ifdef NEWEV 628 socket->monitor_listener->stree_dontcare = NULL; 629 socket->monitor_listener->stree_pre_rcv = NULL; 630 socket->monitor_listener->stree_post_snd = NULL; 631 #else 632 InitEvB(mtcp, &socket->monitor_listener->dontcare_evb); 633 InitEvB(mtcp, &socket->monitor_listener->pre_tcp_evb); 634 InitEvB(mtcp, &socket->monitor_listener->post_tcp_evb); 635 #endif 636 } 637 638 return socket->id; 639 } 640 /*----------------------------------------------------------------------------*/ 641 int 642 mtcp_bind(mctx_t mctx, int sockid, 643 const struct sockaddr *addr, socklen_t addrlen) 644 { 645 mtcp_manager_t mtcp; 646 struct sockaddr_in *addr_in; 647 648 mtcp = GetMTCPManager(mctx); 649 if (!mtcp) { 650 errno = EACCES; 651 return -1; 652 } 653 654 if (sockid < 0 || sockid >= g_config.mos->max_concurrency) { 655 TRACE_API("Socket id %d out of range.\n", sockid); 656 errno = EBADF; 657 return -1; 658 } 659 660 if (mtcp->smap[sockid].socktype == MOS_SOCK_UNUSED) { 661 TRACE_API("Invalid socket id: %d\n", sockid); 662 errno = EBADF; 663 return -1; 664 } 665 666 if (mtcp->smap[sockid].socktype != MOS_SOCK_STREAM && 667 mtcp->smap[sockid].socktype != MOS_SOCK_STREAM_LISTEN) { 668 TRACE_API("Not a stream socket id: %d\n", sockid); 669 errno = ENOTSOCK; 670 return -1; 671 } 672 673 if (!addr) { 674 TRACE_API("Socket %d: empty address!\n", sockid); 675 errno = EINVAL; 676 return -1; 677 } 678 679 if (mtcp->smap[sockid].opts & MTCP_ADDR_BIND) { 680 TRACE_API("Socket %d: adress already bind for this socket.\n", sockid); 681 errno = EINVAL; 682 return -1; 683 } 684 685 /* we only allow bind() for AF_INET address */ 686 if (addr->sa_family != AF_INET || addrlen < sizeof(struct sockaddr_in)) { 687 TRACE_API("Socket %d: invalid argument!\n", sockid); 688 errno = EINVAL; 689 return -1; 690 } 691 692 if (mtcp->listener) { 693 TRACE_API("Address already bound!\n"); 694 errno = EINVAL; 695 return -1; 696 } 697 addr_in = (struct sockaddr_in *)addr; 698 mtcp->smap[sockid].saddr = *addr_in; 699 mtcp->smap[sockid].opts |= MTCP_ADDR_BIND; 700 701 return 0; 702 } 703 /*----------------------------------------------------------------------------*/ 704 int 705 mtcp_listen(mctx_t mctx, int sockid, int backlog) 706 { 707 mtcp_manager_t mtcp; 708 struct tcp_listener *listener; 709 710 mtcp = GetMTCPManager(mctx); 711 if (!mtcp) { 712 errno = EACCES; 713 return -1; 714 } 715 716 if (sockid < 0 || sockid >= g_config.mos->max_concurrency) { 717 TRACE_API("Socket id %d out of range.\n", sockid); 718 errno = EBADF; 719 return -1; 720 } 721 722 if (mtcp->smap[sockid].socktype == MOS_SOCK_UNUSED) { 723 TRACE_API("Invalid socket id: %d\n", sockid); 724 errno = EBADF; 725 return -1; 726 } 727 728 if (mtcp->smap[sockid].socktype == MOS_SOCK_STREAM) { 729 mtcp->smap[sockid].socktype = MOS_SOCK_STREAM_LISTEN; 730 } 731 732 if (mtcp->smap[sockid].socktype != MOS_SOCK_STREAM_LISTEN) { 733 TRACE_API("Not a listening socket. id: %d\n", sockid); 734 errno = ENOTSOCK; 735 return -1; 736 } 737 738 if (backlog <= 0 || backlog > g_config.mos->max_concurrency) { 739 errno = EINVAL; 740 return -1; 741 } 742 743 listener = (struct tcp_listener *)calloc(1, sizeof(struct tcp_listener)); 744 if (!listener) { 745 /* errno set from the malloc() */ 746 errno = ENOMEM; 747 return -1; 748 } 749 750 listener->sockid = sockid; 751 listener->backlog = backlog; 752 listener->socket = &mtcp->smap[sockid]; 753 754 if (pthread_cond_init(&listener->accept_cond, NULL)) { 755 perror("pthread_cond_init of ctx->accept_cond\n"); 756 /* errno set by pthread_cond_init() */ 757 return -1; 758 } 759 if (pthread_mutex_init(&listener->accept_lock, NULL)) { 760 perror("pthread_mutex_init of ctx->accept_lock\n"); 761 /* errno set by pthread_mutex_init() */ 762 return -1; 763 } 764 765 listener->acceptq = CreateStreamQueue(backlog); 766 if (!listener->acceptq) { 767 errno = ENOMEM; 768 return -1; 769 } 770 771 mtcp->smap[sockid].listener = listener; 772 mtcp->listener = listener; 773 774 return 0; 775 } 776 /*----------------------------------------------------------------------------*/ 777 int 778 mtcp_accept(mctx_t mctx, int sockid, struct sockaddr *addr, socklen_t *addrlen) 779 { 780 mtcp_manager_t mtcp; 781 struct tcp_listener *listener; 782 socket_map_t socket; 783 tcp_stream *accepted = NULL; 784 785 mtcp = GetMTCPManager(mctx); 786 if (!mtcp) { 787 errno = EACCES; 788 return -1; 789 } 790 791 if (sockid < 0 || sockid >= g_config.mos->max_concurrency) { 792 TRACE_API("Socket id %d out of range.\n", sockid); 793 errno = EBADF; 794 return -1; 795 } 796 797 /* requires listening socket */ 798 if (mtcp->smap[sockid].socktype != MOS_SOCK_STREAM_LISTEN) { 799 errno = EINVAL; 800 return -1; 801 } 802 803 listener = mtcp->smap[sockid].listener; 804 805 /* dequeue from the acceptq without lock first */ 806 /* if nothing there, acquire lock and cond_wait */ 807 accepted = StreamDequeue(listener->acceptq); 808 if (!accepted) { 809 if (listener->socket->opts & MTCP_NONBLOCK) { 810 errno = EAGAIN; 811 return -1; 812 813 } else { 814 pthread_mutex_lock(&listener->accept_lock); 815 while ((accepted = StreamDequeue(listener->acceptq)) == NULL) { 816 pthread_cond_wait(&listener->accept_cond, &listener->accept_lock); 817 818 if (mtcp->ctx->done || mtcp->ctx->exit) { 819 pthread_mutex_unlock(&listener->accept_lock); 820 errno = EINTR; 821 return -1; 822 } 823 } 824 pthread_mutex_unlock(&listener->accept_lock); 825 } 826 } 827 828 if (!accepted) { 829 TRACE_ERROR("[NEVER HAPPEN] Empty accept queue!\n"); 830 } 831 832 if (!accepted->socket) { 833 socket = AllocateSocket(mctx, MOS_SOCK_STREAM); 834 if (!socket) { 835 TRACE_ERROR("Failed to create new socket!\n"); 836 /* TODO: destroy the stream */ 837 errno = ENFILE; 838 return -1; 839 } 840 socket->stream = accepted; 841 accepted->socket = socket; 842 /* if monitor is enabled, complete the socket assignment */ 843 if (socket->stream->pair_stream != NULL) 844 socket->stream->pair_stream->socket = socket; 845 } 846 847 TRACE_API("Stream %d accepted.\n", accepted->id); 848 849 if (addr && addrlen) { 850 struct sockaddr_in *addr_in = (struct sockaddr_in *)addr; 851 addr_in->sin_family = AF_INET; 852 addr_in->sin_port = accepted->dport; 853 addr_in->sin_addr.s_addr = accepted->daddr; 854 *addrlen = sizeof(struct sockaddr_in); 855 } 856 857 return accepted->socket->id; 858 } 859 /*----------------------------------------------------------------------------*/ 860 int 861 mtcp_init_rss(mctx_t mctx, in_addr_t saddr_base, int num_addr, 862 in_addr_t daddr, in_addr_t dport) 863 { 864 mtcp_manager_t mtcp; 865 addr_pool_t ap; 866 867 mtcp = GetMTCPManager(mctx); 868 if (!mtcp) { 869 errno = EACCES; 870 return -1; 871 } 872 873 if (saddr_base == INADDR_ANY) { 874 int nif_out; 875 876 /* for the INADDR_ANY, find the output interface for the destination 877 and set the saddr_base as the ip address of the output interface */ 878 nif_out = GetOutputInterface(daddr); 879 saddr_base = g_config.mos->netdev_table->ent[nif_out]->ip_addr; 880 } 881 882 ap = CreateAddressPoolPerCore(mctx->cpu, num_cpus, 883 saddr_base, num_addr, daddr, dport); 884 if (!ap) { 885 errno = ENOMEM; 886 return -1; 887 } 888 889 mtcp->ap = ap; 890 891 return 0; 892 } 893 /*----------------------------------------------------------------------------*/ 894 int 895 eval_bpf_5tuple(struct sfbpf_program fcode, 896 in_addr_t saddr, in_port_t sport, 897 in_addr_t daddr, in_port_t dport) { 898 uint8_t buf[TOTAL_TCP_HEADER_LEN]; 899 struct ethhdr *ethh; 900 struct iphdr *iph; 901 struct tcphdr *tcph; 902 903 ethh = (struct ethhdr *)buf; 904 ethh->h_proto = htons(ETH_P_IP); 905 iph = (struct iphdr *)(ethh + 1); 906 iph->ihl = IP_HEADER_LEN >> 2; 907 iph->version = 4; 908 iph->tos = 0; 909 iph->tot_len = htons(IP_HEADER_LEN + TCP_HEADER_LEN); 910 iph->id = htons(0); 911 iph->protocol = IPPROTO_TCP; 912 iph->saddr = saddr; 913 iph->daddr = daddr; 914 iph->check = 0; 915 tcph = (struct tcphdr *)(iph + 1); 916 tcph->source = sport; 917 tcph->dest = dport; 918 919 return EVAL_BPFFILTER(fcode, (uint8_t *)iph - sizeof(struct ethhdr), 920 TOTAL_TCP_HEADER_LEN); 921 } 922 /*----------------------------------------------------------------------------*/ 923 int 924 mtcp_connect(mctx_t mctx, int sockid, 925 const struct sockaddr *addr, socklen_t addrlen) 926 { 927 mtcp_manager_t mtcp; 928 socket_map_t socket; 929 tcp_stream *cur_stream; 930 struct sockaddr_in *addr_in; 931 in_addr_t dip; 932 in_port_t dport; 933 int is_dyn_bound = FALSE; 934 int ret; 935 int cnt_match = 0; 936 struct mon_listener *walk; 937 struct sfbpf_program fcode; 938 939 cur_stream = NULL; 940 mtcp = GetMTCPManager(mctx); 941 if (!mtcp) { 942 errno = EACCES; 943 return -1; 944 } 945 946 if (sockid < 0 || sockid >= g_config.mos->max_concurrency) { 947 TRACE_API("Socket id %d out of range.\n", sockid); 948 errno = EBADF; 949 return -1; 950 } 951 952 if (mtcp->smap[sockid].socktype == MOS_SOCK_UNUSED) { 953 TRACE_API("Invalid socket id: %d\n", sockid); 954 errno = EBADF; 955 return -1; 956 } 957 958 if (mtcp->smap[sockid].socktype != MOS_SOCK_STREAM) { 959 TRACE_API("Not an end socket. id: %d\n", sockid); 960 errno = ENOTSOCK; 961 return -1; 962 } 963 964 if (!addr) { 965 TRACE_API("Socket %d: empty address!\n", sockid); 966 errno = EFAULT; 967 return -1; 968 } 969 970 /* we only allow bind() for AF_INET address */ 971 if (addr->sa_family != AF_INET || addrlen < sizeof(struct sockaddr_in)) { 972 TRACE_API("Socket %d: invalid argument!\n", sockid); 973 errno = EAFNOSUPPORT; 974 return -1; 975 } 976 977 socket = &mtcp->smap[sockid]; 978 if (socket->stream) { 979 TRACE_API("Socket %d: stream already exist!\n", sockid); 980 if (socket->stream->state >= TCP_ST_ESTABLISHED) { 981 errno = EISCONN; 982 } else { 983 errno = EALREADY; 984 } 985 return -1; 986 } 987 988 addr_in = (struct sockaddr_in *)addr; 989 dip = addr_in->sin_addr.s_addr; 990 dport = addr_in->sin_port; 991 992 /* address binding */ 993 if (socket->opts & MTCP_ADDR_BIND && 994 socket->saddr.sin_port != INPORT_ANY && 995 socket->saddr.sin_addr.s_addr != INADDR_ANY) { 996 int rss_core; 997 998 rss_core = GetRSSCPUCore(socket->saddr.sin_addr.s_addr, dip, 999 socket->saddr.sin_port, dport, num_queues); 1000 1001 if (rss_core != mctx->cpu) { 1002 errno = EINVAL; 1003 return -1; 1004 } 1005 } else { 1006 if (mtcp->ap) { 1007 ret = FetchAddress(mtcp->ap, 1008 mctx->cpu, num_queues, addr_in, &socket->saddr); 1009 } else { 1010 ret = FetchAddress(ap, 1011 mctx->cpu, num_queues, addr_in, &socket->saddr); 1012 } 1013 if (ret < 0) { 1014 errno = EAGAIN; 1015 return -1; 1016 } 1017 socket->opts |= MTCP_ADDR_BIND; 1018 is_dyn_bound = TRUE; 1019 } 1020 1021 cnt_match = 0; 1022 if (mtcp->num_msp > 0) { 1023 TAILQ_FOREACH(walk, &mtcp->monitors, link) { 1024 fcode = walk->stream_syn_fcode; 1025 if (!(ISSET_BPFFILTER(fcode) && 1026 eval_bpf_5tuple(fcode, socket->saddr.sin_addr.s_addr, 1027 socket->saddr.sin_port, 1028 dip, dport) == 0)) { 1029 walk->is_stream_syn_filter_hit = 1; // set the 'filter hit' flag to 1 1030 cnt_match++; 1031 } 1032 } 1033 } 1034 1035 if (mtcp->num_msp > 0 && cnt_match > 0) { 1036 /* 150820 dhkim: XXX: embedded mode is not verified */ 1037 #if 1 1038 cur_stream = CreateClientTCPStream(mtcp, socket, 1039 STREAM_TYPE(MOS_SOCK_STREAM) | 1040 STREAM_TYPE(MOS_SOCK_MONITOR_STREAM_ACTIVE), 1041 socket->saddr.sin_addr.s_addr, 1042 socket->saddr.sin_port, dip, dport, NULL); 1043 #else 1044 cur_stream = CreateDualTCPStream(mtcp, socket, 1045 STREAM_TYPE(MOS_SOCK_STREAM) | 1046 STREAM_TYPE(MOS_SOCK_MONITOR_STREAM_ACTIVE), 1047 socket->saddr.sin_addr.s_addr, 1048 socket->saddr.sin_port, dip, dport, NULL); 1049 #endif 1050 } 1051 else 1052 cur_stream = CreateTCPStream(mtcp, socket, STREAM_TYPE(MOS_SOCK_STREAM), 1053 socket->saddr.sin_addr.s_addr, 1054 socket->saddr.sin_port, dip, dport, NULL); 1055 if (!cur_stream) { 1056 TRACE_ERROR("Socket %d: failed to create tcp_stream!\n", sockid); 1057 errno = ENOMEM; 1058 return -1; 1059 } 1060 1061 if (is_dyn_bound) 1062 cur_stream->is_bound_addr = TRUE; 1063 cur_stream->sndvar->cwnd = 1; 1064 cur_stream->sndvar->ssthresh = cur_stream->sndvar->mss * 10; 1065 cur_stream->side = MOS_SIDE_CLI; 1066 /* if monitor is enabled, update the pair stream side as well */ 1067 if (cur_stream->pair_stream) { 1068 cur_stream->pair_stream->side = MOS_SIDE_SVR; 1069 /* 1070 * if buffer management is off, then disable 1071 * monitoring tcp ring of server... 1072 * if there is even a single monitor asking for 1073 * buffer management, enable it (that's why the 1074 * need for the loop) 1075 */ 1076 cur_stream->pair_stream->buffer_mgmt = BUFMGMT_OFF; 1077 struct socket_map *walk; 1078 SOCKQ_FOREACH_START(walk, &cur_stream->msocks) { 1079 uint8_t bm = walk->monitor_stream->monitor_listener->server_buf_mgmt; 1080 if (bm > cur_stream->pair_stream->buffer_mgmt) { 1081 cur_stream->pair_stream->buffer_mgmt = bm; 1082 break; 1083 } 1084 } SOCKQ_FOREACH_END; 1085 } 1086 1087 cur_stream->state = TCP_ST_SYN_SENT; 1088 cur_stream->cb_events |= MOS_ON_TCP_STATE_CHANGE; 1089 1090 TRACE_STATE("Stream %d: TCP_ST_SYN_SENT\n", cur_stream->id); 1091 1092 SQ_LOCK(&mtcp->ctx->connect_lock); 1093 ret = StreamEnqueue(mtcp->connectq, cur_stream); 1094 SQ_UNLOCK(&mtcp->ctx->connect_lock); 1095 mtcp->wakeup_flag = TRUE; 1096 if (ret < 0) { 1097 TRACE_ERROR("Socket %d: failed to enqueue to conenct queue!\n", sockid); 1098 SQ_LOCK(&mtcp->ctx->destroyq_lock); 1099 StreamEnqueue(mtcp->destroyq, cur_stream); 1100 SQ_UNLOCK(&mtcp->ctx->destroyq_lock); 1101 errno = EAGAIN; 1102 return -1; 1103 } 1104 1105 /* if nonblocking socket, return EINPROGRESS */ 1106 if (socket->opts & MTCP_NONBLOCK) { 1107 errno = EINPROGRESS; 1108 return -1; 1109 1110 } else { 1111 while (1) { 1112 if (!cur_stream) { 1113 TRACE_ERROR("STREAM DESTROYED\n"); 1114 errno = ETIMEDOUT; 1115 return -1; 1116 } 1117 if (cur_stream->state > TCP_ST_ESTABLISHED) { 1118 TRACE_ERROR("Socket %d: weird state %s\n", 1119 sockid, TCPStateToString(cur_stream)); 1120 // TODO: how to handle this? 1121 errno = ENOSYS; 1122 return -1; 1123 } 1124 1125 if (cur_stream->state == TCP_ST_ESTABLISHED) { 1126 break; 1127 } 1128 usleep(1000); 1129 } 1130 } 1131 1132 return 0; 1133 } 1134 /*----------------------------------------------------------------------------*/ 1135 static inline int 1136 CloseStreamSocket(mctx_t mctx, int sockid) 1137 { 1138 mtcp_manager_t mtcp; 1139 tcp_stream *cur_stream; 1140 int ret; 1141 1142 mtcp = GetMTCPManager(mctx); 1143 if (!mtcp) { 1144 errno = EACCES; 1145 return -1; 1146 } 1147 1148 cur_stream = mtcp->smap[sockid].stream; 1149 if (!cur_stream) { 1150 TRACE_API("Socket %d: stream does not exist.\n", sockid); 1151 errno = ENOTCONN; 1152 return -1; 1153 } 1154 1155 if (cur_stream->closed) { 1156 TRACE_API("Socket %d (Stream %u): already closed stream\n", 1157 sockid, cur_stream->id); 1158 return 0; 1159 } 1160 cur_stream->closed = TRUE; 1161 1162 TRACE_API("Stream %d: closing the stream.\n", cur_stream->id); 1163 1164 /* 141029 dhkim: Check this! */ 1165 cur_stream->socket = NULL; 1166 1167 if (cur_stream->state == TCP_ST_CLOSED_RSVD) { 1168 TRACE_API("Stream %d at TCP_ST_CLOSED_RSVD. destroying the stream.\n", 1169 cur_stream->id); 1170 SQ_LOCK(&mtcp->ctx->destroyq_lock); 1171 StreamEnqueue(mtcp->destroyq, cur_stream); 1172 mtcp->wakeup_flag = TRUE; 1173 SQ_UNLOCK(&mtcp->ctx->destroyq_lock); 1174 return 0; 1175 1176 } else if (cur_stream->state == TCP_ST_SYN_SENT) { 1177 #if 1 1178 SQ_LOCK(&mtcp->ctx->destroyq_lock); 1179 StreamEnqueue(mtcp->destroyq, cur_stream); 1180 SQ_UNLOCK(&mtcp->ctx->destroyq_lock); 1181 mtcp->wakeup_flag = TRUE; 1182 #endif 1183 return -1; 1184 1185 } else if (cur_stream->state != TCP_ST_ESTABLISHED && 1186 cur_stream->state != TCP_ST_CLOSE_WAIT) { 1187 TRACE_API("Stream %d at state %s\n", 1188 cur_stream->id, TCPStateToString(cur_stream)); 1189 errno = EBADF; 1190 return -1; 1191 } 1192 1193 SQ_LOCK(&mtcp->ctx->close_lock); 1194 cur_stream->sndvar->on_closeq = TRUE; 1195 ret = StreamEnqueue(mtcp->closeq, cur_stream); 1196 mtcp->wakeup_flag = TRUE; 1197 SQ_UNLOCK(&mtcp->ctx->close_lock); 1198 1199 if (ret < 0) { 1200 TRACE_ERROR("(NEVER HAPPEN) Failed to enqueue the stream to close.\n"); 1201 errno = EAGAIN; 1202 return -1; 1203 } 1204 1205 return 0; 1206 } 1207 /*----------------------------------------------------------------------------*/ 1208 static inline int 1209 CloseListeningSocket(mctx_t mctx, int sockid) 1210 { 1211 mtcp_manager_t mtcp; 1212 struct tcp_listener *listener; 1213 1214 mtcp = GetMTCPManager(mctx); 1215 if (!mtcp) { 1216 errno = EACCES; 1217 return -1; 1218 } 1219 1220 listener = mtcp->smap[sockid].listener; 1221 if (!listener) { 1222 errno = EINVAL; 1223 return -1; 1224 } 1225 1226 if (listener->acceptq) { 1227 DestroyStreamQueue(listener->acceptq); 1228 listener->acceptq = NULL; 1229 } 1230 1231 pthread_mutex_lock(&listener->accept_lock); 1232 pthread_cond_signal(&listener->accept_cond); 1233 pthread_mutex_unlock(&listener->accept_lock); 1234 1235 pthread_cond_destroy(&listener->accept_cond); 1236 pthread_mutex_destroy(&listener->accept_lock); 1237 1238 free(listener); 1239 mtcp->smap[sockid].listener = NULL; 1240 1241 return 0; 1242 } 1243 /*----------------------------------------------------------------------------*/ 1244 int 1245 mtcp_close(mctx_t mctx, int sockid) 1246 { 1247 mtcp_manager_t mtcp; 1248 int ret; 1249 1250 mtcp = GetMTCPManager(mctx); 1251 if (!mtcp) { 1252 errno = EACCES; 1253 return -1; 1254 } 1255 1256 if (sockid < 0 || sockid >= g_config.mos->max_concurrency) { 1257 TRACE_API("Socket id %d out of range.\n", sockid); 1258 errno = EBADF; 1259 return -1; 1260 } 1261 1262 if (mtcp->smap[sockid].socktype == MOS_SOCK_UNUSED) { 1263 TRACE_API("Invalid socket id: %d\n", sockid); 1264 errno = EBADF; 1265 return -1; 1266 } 1267 1268 TRACE_API("Socket %d: mtcp_close called.\n", sockid); 1269 1270 switch (mtcp->smap[sockid].socktype) { 1271 case MOS_SOCK_STREAM: 1272 ret = CloseStreamSocket(mctx, sockid); 1273 break; 1274 1275 case MOS_SOCK_STREAM_LISTEN: 1276 ret = CloseListeningSocket(mctx, sockid); 1277 break; 1278 1279 case MOS_SOCK_EPOLL: 1280 ret = CloseEpollSocket(mctx, sockid); 1281 break; 1282 1283 case MOS_SOCK_PIPE: 1284 ret = PipeClose(mctx, sockid); 1285 break; 1286 1287 default: 1288 errno = EINVAL; 1289 ret = -1; 1290 break; 1291 } 1292 1293 FreeSocket(mctx, sockid, mtcp->smap[sockid].socktype); 1294 1295 return ret; 1296 } 1297 /*----------------------------------------------------------------------------*/ 1298 int 1299 mtcp_abort(mctx_t mctx, int sockid) 1300 { 1301 mtcp_manager_t mtcp; 1302 tcp_stream *cur_stream; 1303 int ret; 1304 1305 mtcp = GetMTCPManager(mctx); 1306 if (!mtcp) { 1307 errno = EACCES; 1308 return -1; 1309 } 1310 1311 if (sockid < 0 || sockid >= g_config.mos->max_concurrency) { 1312 TRACE_API("Socket id %d out of range.\n", sockid); 1313 errno = EBADF; 1314 return -1; 1315 } 1316 1317 if (mtcp->smap[sockid].socktype == MOS_SOCK_UNUSED) { 1318 TRACE_API("Invalid socket id: %d\n", sockid); 1319 errno = EBADF; 1320 return -1; 1321 } 1322 1323 if (mtcp->smap[sockid].socktype != MOS_SOCK_STREAM) { 1324 TRACE_API("Not an end socket. id: %d\n", sockid); 1325 errno = ENOTSOCK; 1326 return -1; 1327 } 1328 1329 cur_stream = mtcp->smap[sockid].stream; 1330 if (!cur_stream) { 1331 TRACE_API("Stream %d: does not exist.\n", sockid); 1332 errno = ENOTCONN; 1333 return -1; 1334 } 1335 1336 TRACE_API("Socket %d: mtcp_abort()\n", sockid); 1337 1338 FreeSocket(mctx, sockid, mtcp->smap[sockid].socktype); 1339 cur_stream->socket = NULL; 1340 1341 if (cur_stream->state == TCP_ST_CLOSED_RSVD) { 1342 TRACE_API("Stream %d: connection already reset.\n", sockid); 1343 return ERROR; 1344 1345 } else if (cur_stream->state == TCP_ST_SYN_SENT) { 1346 /* TODO: this should notify event failure to all 1347 previous read() or write() calls */ 1348 cur_stream->state = TCP_ST_CLOSED_RSVD; 1349 cur_stream->close_reason = TCP_ACTIVE_CLOSE; 1350 cur_stream->cb_events |= MOS_ON_TCP_STATE_CHANGE; 1351 SQ_LOCK(&mtcp->ctx->destroyq_lock); 1352 StreamEnqueue(mtcp->destroyq, cur_stream); 1353 SQ_UNLOCK(&mtcp->ctx->destroyq_lock); 1354 mtcp->wakeup_flag = TRUE; 1355 return 0; 1356 1357 } else if (cur_stream->state == TCP_ST_CLOSING || 1358 cur_stream->state == TCP_ST_LAST_ACK || 1359 cur_stream->state == TCP_ST_TIME_WAIT) { 1360 cur_stream->state = TCP_ST_CLOSED_RSVD; 1361 cur_stream->close_reason = TCP_ACTIVE_CLOSE; 1362 cur_stream->cb_events |= MOS_ON_TCP_STATE_CHANGE; 1363 SQ_LOCK(&mtcp->ctx->destroyq_lock); 1364 StreamEnqueue(mtcp->destroyq, cur_stream); 1365 SQ_UNLOCK(&mtcp->ctx->destroyq_lock); 1366 mtcp->wakeup_flag = TRUE; 1367 return 0; 1368 } 1369 1370 /* the stream structure will be destroyed after sending RST */ 1371 if (cur_stream->sndvar->on_resetq) { 1372 TRACE_ERROR("Stream %d: calling mtcp_abort() " 1373 "when in reset queue.\n", sockid); 1374 errno = ECONNRESET; 1375 return -1; 1376 } 1377 SQ_LOCK(&mtcp->ctx->reset_lock); 1378 cur_stream->sndvar->on_resetq = TRUE; 1379 ret = StreamEnqueue(mtcp->resetq, cur_stream); 1380 SQ_UNLOCK(&mtcp->ctx->reset_lock); 1381 mtcp->wakeup_flag = TRUE; 1382 1383 if (ret < 0) { 1384 TRACE_ERROR("(NEVER HAPPEN) Failed to enqueue the stream to close.\n"); 1385 errno = EAGAIN; 1386 return -1; 1387 } 1388 1389 return 0; 1390 } 1391 /*----------------------------------------------------------------------------*/ 1392 static inline int 1393 CopyToUser(mtcp_manager_t mtcp, tcp_stream *cur_stream, char *buf, int len) 1394 { 1395 struct tcp_recv_vars *rcvvar = cur_stream->rcvvar; 1396 int copylen; 1397 #ifdef NEWRB 1398 tcprb_t *rb = rcvvar->rcvbuf; 1399 if ((copylen = tcprb_ppeek(rb, (uint8_t *)buf, len, rb->pile)) <= 0) { 1400 errno = EAGAIN; 1401 return -1; 1402 } 1403 tcprb_setpile(rb, rb->pile + copylen); 1404 1405 rcvvar->rcv_wnd = rb->len - tcprb_cflen(rb); 1406 //printf("rcv_wnd: %d\n", rcvvar->rcv_wnd); 1407 #else 1408 copylen = MIN(rcvvar->rcvbuf->merged_len, len); 1409 if (copylen <= 0) { 1410 errno = EAGAIN; 1411 return -1; 1412 } 1413 1414 assert(rcvvar->rcvbuf->data); 1415 /* Copy data to user buffer and remove it from receiving buffer */ 1416 memcpy(buf, rcvvar->rcvbuf->head, copylen); 1417 RBRemove(mtcp->rbm_rcv, rcvvar->rcvbuf, copylen, AT_APP, cur_stream->buffer_mgmt); 1418 rcvvar->rcv_wnd = rcvvar->rcvbuf->size - rcvvar->rcvbuf->merged_len; 1419 #endif 1420 1421 /* Advertise newly freed receive buffer */ 1422 if (cur_stream->need_wnd_adv) { 1423 if (rcvvar->rcv_wnd > cur_stream->sndvar->eff_mss) { 1424 if (!cur_stream->sndvar->on_ackq) { 1425 SQ_LOCK(&mtcp->ctx->ackq_lock); 1426 cur_stream->sndvar->on_ackq = TRUE; 1427 StreamEnqueue(mtcp->ackq, cur_stream); /* this always success */ 1428 SQ_UNLOCK(&mtcp->ctx->ackq_lock); 1429 cur_stream->need_wnd_adv = FALSE; 1430 mtcp->wakeup_flag = TRUE; 1431 } 1432 } 1433 } 1434 1435 return copylen; 1436 } 1437 /*----------------------------------------------------------------------------*/ 1438 ssize_t 1439 mtcp_read(mctx_t mctx, int sockid, char *buf, size_t len) 1440 { 1441 mtcp_manager_t mtcp; 1442 socket_map_t socket; 1443 tcp_stream *cur_stream; 1444 struct tcp_recv_vars *rcvvar; 1445 int event_remaining, merged_len; 1446 int ret; 1447 1448 mtcp = GetMTCPManager(mctx); 1449 if (!mtcp) { 1450 errno = EACCES; 1451 return -1; 1452 } 1453 1454 if (sockid < 0 || sockid >= g_config.mos->max_concurrency) { 1455 TRACE_API("Socket id %d out of range.\n", sockid); 1456 errno = EBADF; 1457 return -1; 1458 } 1459 1460 socket = &mtcp->smap[sockid]; 1461 if (socket->socktype == MOS_SOCK_UNUSED) { 1462 TRACE_API("Invalid socket id: %d\n", sockid); 1463 errno = EBADF; 1464 return -1; 1465 } 1466 1467 if (socket->socktype == MOS_SOCK_PIPE) { 1468 return PipeRead(mctx, sockid, buf, len); 1469 } 1470 1471 if (socket->socktype != MOS_SOCK_STREAM) { 1472 TRACE_API("Not an end socket. id: %d\n", sockid); 1473 errno = ENOTSOCK; 1474 return -1; 1475 } 1476 1477 /* stream should be in ESTABLISHED, FIN_WAIT_1, FIN_WAIT_2, CLOSE_WAIT */ 1478 cur_stream = socket->stream; 1479 if (!cur_stream || !cur_stream->rcvvar || !cur_stream->rcvvar->rcvbuf || 1480 !(cur_stream->state >= TCP_ST_ESTABLISHED && 1481 cur_stream->state <= TCP_ST_CLOSE_WAIT)) { 1482 errno = ENOTCONN; 1483 return -1; 1484 } 1485 1486 rcvvar = cur_stream->rcvvar; 1487 1488 #ifdef NEWRB 1489 merged_len = tcprb_cflen(rcvvar->rcvbuf); 1490 #else 1491 merged_len = rcvvar->rcvbuf->merged_len; 1492 #endif 1493 1494 /* if CLOSE_WAIT, return 0 if there is no payload */ 1495 if (cur_stream->state == TCP_ST_CLOSE_WAIT) { 1496 if (!rcvvar->rcvbuf) 1497 return 0; 1498 1499 if (merged_len == 0) 1500 return 0; 1501 } 1502 1503 /* return EAGAIN if no receive buffer */ 1504 if (socket->opts & MTCP_NONBLOCK) { 1505 if (!rcvvar->rcvbuf || merged_len == 0) { 1506 errno = EAGAIN; 1507 return -1; 1508 } 1509 } 1510 1511 SBUF_LOCK(&rcvvar->read_lock); 1512 1513 ret = CopyToUser(mtcp, cur_stream, buf, len); 1514 1515 #ifdef NEWRB 1516 merged_len = tcprb_cflen(rcvvar->rcvbuf); 1517 #else 1518 merged_len = rcvvar->rcvbuf->merged_len; 1519 #endif 1520 1521 event_remaining = FALSE; 1522 /* if there are remaining payload, generate EPOLLIN */ 1523 /* (may due to insufficient user buffer) */ 1524 if (socket->epoll & MOS_EPOLLIN) { 1525 if (!(socket->epoll & MOS_EPOLLET) && merged_len > 0) { 1526 event_remaining = TRUE; 1527 } 1528 } 1529 /* if waiting for close, notify it if no remaining data */ 1530 if (cur_stream->state == TCP_ST_CLOSE_WAIT && 1531 merged_len == 0 && ret > 0) { 1532 event_remaining = TRUE; 1533 } 1534 1535 SBUF_UNLOCK(&rcvvar->read_lock); 1536 1537 if (event_remaining) { 1538 if (socket->epoll) { 1539 AddEpollEvent(mtcp->ep, 1540 USR_SHADOW_EVENT_QUEUE, socket, MOS_EPOLLIN); 1541 } 1542 } 1543 1544 TRACE_API("Stream %d: mtcp_read() returning %d\n", cur_stream->id, ret); 1545 return ret; 1546 } 1547 /*----------------------------------------------------------------------------*/ 1548 ssize_t 1549 mtcp_readv(mctx_t mctx, int sockid, struct iovec *iov, int numIOV) 1550 { 1551 mtcp_manager_t mtcp; 1552 socket_map_t socket; 1553 tcp_stream *cur_stream; 1554 struct tcp_recv_vars *rcvvar; 1555 int ret, bytes_read, i; 1556 int event_remaining, merged_len; 1557 1558 mtcp = GetMTCPManager(mctx); 1559 if (!mtcp) { 1560 errno = EACCES; 1561 return -1; 1562 } 1563 1564 if (sockid < 0 || sockid >= g_config.mos->max_concurrency) { 1565 TRACE_API("Socket id %d out of range.\n", sockid); 1566 errno = EBADF; 1567 return -1; 1568 } 1569 1570 socket = &mtcp->smap[sockid]; 1571 if (socket->socktype == MOS_SOCK_UNUSED) { 1572 TRACE_API("Invalid socket id: %d\n", sockid); 1573 errno = EBADF; 1574 return -1; 1575 } 1576 1577 if (socket->socktype != MOS_SOCK_STREAM) { 1578 TRACE_API("Not an end socket. id: %d\n", sockid); 1579 errno = ENOTSOCK; 1580 return -1; 1581 } 1582 1583 /* stream should be in ESTABLISHED, FIN_WAIT_1, FIN_WAIT_2, CLOSE_WAIT */ 1584 cur_stream = socket->stream; 1585 if (!cur_stream || 1586 !(cur_stream->state >= TCP_ST_ESTABLISHED && 1587 cur_stream->state <= TCP_ST_CLOSE_WAIT)) { 1588 errno = ENOTCONN; 1589 return -1; 1590 } 1591 1592 rcvvar = cur_stream->rcvvar; 1593 1594 #ifdef NEWRB 1595 merged_len = tcprb_cflen(rcvvar->rcvbuf); 1596 #else 1597 merged_len = rcvvar->rcvbuf->merged_len; 1598 #endif 1599 1600 /* if CLOSE_WAIT, return 0 if there is no payload */ 1601 if (cur_stream->state == TCP_ST_CLOSE_WAIT) { 1602 if (!rcvvar->rcvbuf) 1603 return 0; 1604 1605 if (merged_len == 0) 1606 return 0; 1607 } 1608 1609 /* return EAGAIN if no receive buffer */ 1610 if (socket->opts & MTCP_NONBLOCK) { 1611 if (!rcvvar->rcvbuf || merged_len == 0) { 1612 errno = EAGAIN; 1613 return -1; 1614 } 1615 } 1616 1617 SBUF_LOCK(&rcvvar->read_lock); 1618 1619 /* read and store the contents to the vectored buffers */ 1620 bytes_read = 0; 1621 for (i = 0; i < numIOV; i++) { 1622 if (iov[i].iov_len <= 0) 1623 continue; 1624 1625 ret = CopyToUser(mtcp, cur_stream, iov[i].iov_base, iov[i].iov_len); 1626 if (ret <= 0) 1627 break; 1628 1629 bytes_read += ret; 1630 1631 if (ret < iov[i].iov_len) 1632 break; 1633 } 1634 1635 #ifdef NEWRB 1636 merged_len = tcprb_cflen(rcvvar->rcvbuf); 1637 #else 1638 merged_len = rcvvar->rcvbuf->merged_len; 1639 #endif 1640 1641 event_remaining = FALSE; 1642 /* if there are remaining payload, generate read event */ 1643 /* (may due to insufficient user buffer) */ 1644 if (socket->epoll & MOS_EPOLLIN) { 1645 if (!(socket->epoll & MOS_EPOLLET) && merged_len > 0) { 1646 event_remaining = TRUE; 1647 } 1648 } 1649 /* if waiting for close, notify it if no remaining data */ 1650 if (cur_stream->state == TCP_ST_CLOSE_WAIT && 1651 merged_len == 0 && bytes_read > 0) { 1652 event_remaining = TRUE; 1653 } 1654 1655 SBUF_UNLOCK(&rcvvar->read_lock); 1656 1657 if(event_remaining) { 1658 if (socket->epoll & MOS_EPOLLIN && !(socket->epoll & MOS_EPOLLET)) { 1659 AddEpollEvent(mtcp->ep, 1660 USR_SHADOW_EVENT_QUEUE, socket, MOS_EPOLLIN); 1661 } 1662 } 1663 1664 TRACE_API("Stream %d: mtcp_readv() returning %d\n", 1665 cur_stream->id, bytes_read); 1666 return bytes_read; 1667 } 1668 /*----------------------------------------------------------------------------*/ 1669 static inline int 1670 CopyFromUser(mtcp_manager_t mtcp, tcp_stream *cur_stream, char *buf, int len) 1671 { 1672 struct tcp_send_vars *sndvar = cur_stream->sndvar; 1673 int sndlen; 1674 int ret; 1675 1676 sndlen = MIN((int)sndvar->snd_wnd, len); 1677 if (sndlen <= 0) { 1678 errno = EAGAIN; 1679 return -1; 1680 } 1681 1682 /* allocate send buffer if not exist */ 1683 if (!sndvar->sndbuf) { 1684 sndvar->sndbuf = SBInit(mtcp->rbm_snd, sndvar->iss + 1); 1685 if (!sndvar->sndbuf) { 1686 cur_stream->close_reason = TCP_NO_MEM; 1687 /* notification may not required due to -1 return */ 1688 errno = ENOMEM; 1689 return -1; 1690 } 1691 } 1692 1693 ret = SBPut(mtcp->rbm_snd, sndvar->sndbuf, buf, sndlen); 1694 assert(ret == sndlen); 1695 sndvar->snd_wnd = sndvar->sndbuf->size - sndvar->sndbuf->len; 1696 if (ret <= 0) { 1697 TRACE_ERROR("SBPut failed. reason: %d (sndlen: %u, len: %u\n", 1698 ret, sndlen, sndvar->sndbuf->len); 1699 errno = EAGAIN; 1700 return -1; 1701 } 1702 1703 if (sndvar->snd_wnd <= 0) { 1704 TRACE_SNDBUF("%u Sending buffer became full!! snd_wnd: %u\n", 1705 cur_stream->id, sndvar->snd_wnd); 1706 } 1707 1708 return ret; 1709 } 1710 /*----------------------------------------------------------------------------*/ 1711 ssize_t 1712 mtcp_write(mctx_t mctx, int sockid, char *buf, size_t len) 1713 { 1714 mtcp_manager_t mtcp; 1715 socket_map_t socket; 1716 tcp_stream *cur_stream; 1717 struct tcp_send_vars *sndvar; 1718 int ret; 1719 1720 mtcp = GetMTCPManager(mctx); 1721 if (!mtcp) { 1722 errno = EACCES; 1723 return -1; 1724 } 1725 1726 if (sockid < 0 || sockid >= g_config.mos->max_concurrency) { 1727 TRACE_API("Socket id %d out of range.\n", sockid); 1728 errno = EBADF; 1729 return -1; 1730 } 1731 1732 socket = &mtcp->smap[sockid]; 1733 if (socket->socktype == MOS_SOCK_UNUSED) { 1734 TRACE_API("Invalid socket id: %d\n", sockid); 1735 errno = EBADF; 1736 return -1; 1737 } 1738 1739 if (socket->socktype == MOS_SOCK_PIPE) { 1740 return PipeWrite(mctx, sockid, buf, len); 1741 } 1742 1743 if (socket->socktype != MOS_SOCK_STREAM) { 1744 TRACE_API("Not an end socket. id: %d\n", sockid); 1745 errno = ENOTSOCK; 1746 return -1; 1747 } 1748 1749 cur_stream = socket->stream; 1750 if (!cur_stream || 1751 !(cur_stream->state == TCP_ST_ESTABLISHED || 1752 cur_stream->state == TCP_ST_CLOSE_WAIT)) { 1753 errno = ENOTCONN; 1754 return -1; 1755 } 1756 1757 if (len <= 0) { 1758 if (socket->opts & MTCP_NONBLOCK) { 1759 errno = EAGAIN; 1760 return -1; 1761 } else { 1762 return 0; 1763 } 1764 } 1765 1766 sndvar = cur_stream->sndvar; 1767 1768 SBUF_LOCK(&sndvar->write_lock); 1769 ret = CopyFromUser(mtcp, cur_stream, buf, len); 1770 1771 SBUF_UNLOCK(&sndvar->write_lock); 1772 1773 if (ret > 0 && !(sndvar->on_sendq || sndvar->on_send_list)) { 1774 SQ_LOCK(&mtcp->ctx->sendq_lock); 1775 sndvar->on_sendq = TRUE; 1776 StreamEnqueue(mtcp->sendq, cur_stream); /* this always success */ 1777 SQ_UNLOCK(&mtcp->ctx->sendq_lock); 1778 mtcp->wakeup_flag = TRUE; 1779 } 1780 1781 if (ret == 0 && (socket->opts & MTCP_NONBLOCK)) { 1782 ret = -1; 1783 errno = EAGAIN; 1784 } 1785 1786 /* if there are remaining sending buffer, generate write event */ 1787 if (sndvar->snd_wnd > 0) { 1788 if (socket->epoll & MOS_EPOLLOUT && !(socket->epoll & MOS_EPOLLET)) { 1789 AddEpollEvent(mtcp->ep, 1790 USR_SHADOW_EVENT_QUEUE, socket, MOS_EPOLLOUT); 1791 } 1792 } 1793 1794 TRACE_API("Stream %d: mtcp_write() returning %d\n", cur_stream->id, ret); 1795 return ret; 1796 } 1797 /*----------------------------------------------------------------------------*/ 1798 ssize_t 1799 mtcp_writev(mctx_t mctx, int sockid, struct iovec *iov, int numIOV) 1800 { 1801 mtcp_manager_t mtcp; 1802 socket_map_t socket; 1803 tcp_stream *cur_stream; 1804 struct tcp_send_vars *sndvar; 1805 int ret, to_write, i; 1806 1807 mtcp = GetMTCPManager(mctx); 1808 if (!mtcp) { 1809 errno = EACCES; 1810 return -1; 1811 } 1812 1813 if (sockid < 0 || sockid >= g_config.mos->max_concurrency) { 1814 TRACE_API("Socket id %d out of range.\n", sockid); 1815 errno = EBADF; 1816 return -1; 1817 } 1818 1819 socket = &mtcp->smap[sockid]; 1820 if (socket->socktype == MOS_SOCK_UNUSED) { 1821 TRACE_API("Invalid socket id: %d\n", sockid); 1822 errno = EBADF; 1823 return -1; 1824 } 1825 1826 if (socket->socktype != MOS_SOCK_STREAM) { 1827 TRACE_API("Not an end socket. id: %d\n", sockid); 1828 errno = ENOTSOCK; 1829 return -1; 1830 } 1831 1832 cur_stream = socket->stream; 1833 if (!cur_stream || 1834 !(cur_stream->state == TCP_ST_ESTABLISHED || 1835 cur_stream->state == TCP_ST_CLOSE_WAIT)) { 1836 errno = ENOTCONN; 1837 return -1; 1838 } 1839 1840 sndvar = cur_stream->sndvar; 1841 SBUF_LOCK(&sndvar->write_lock); 1842 1843 /* write from the vectored buffers */ 1844 to_write = 0; 1845 for (i = 0; i < numIOV; i++) { 1846 if (iov[i].iov_len <= 0) 1847 continue; 1848 1849 ret = CopyFromUser(mtcp, cur_stream, iov[i].iov_base, iov[i].iov_len); 1850 if (ret <= 0) 1851 break; 1852 1853 to_write += ret; 1854 1855 if (ret < iov[i].iov_len) 1856 break; 1857 } 1858 SBUF_UNLOCK(&sndvar->write_lock); 1859 1860 if (to_write > 0 && !(sndvar->on_sendq || sndvar->on_send_list)) { 1861 SQ_LOCK(&mtcp->ctx->sendq_lock); 1862 sndvar->on_sendq = TRUE; 1863 StreamEnqueue(mtcp->sendq, cur_stream); /* this always success */ 1864 SQ_UNLOCK(&mtcp->ctx->sendq_lock); 1865 mtcp->wakeup_flag = TRUE; 1866 } 1867 1868 if (to_write == 0 && (socket->opts & MTCP_NONBLOCK)) { 1869 to_write = -1; 1870 errno = EAGAIN; 1871 } 1872 1873 /* if there are remaining sending buffer, generate write event */ 1874 if (sndvar->snd_wnd > 0) { 1875 if (socket->epoll & MOS_EPOLLOUT && !(socket->epoll & MOS_EPOLLET)) { 1876 AddEpollEvent(mtcp->ep, 1877 USR_SHADOW_EVENT_QUEUE, socket, MOS_EPOLLOUT); 1878 } 1879 } 1880 1881 TRACE_API("Stream %d: mtcp_writev() returning %d\n", 1882 cur_stream->id, to_write); 1883 return to_write; 1884 } 1885 /*----------------------------------------------------------------------------*/ 1886 uint32_t 1887 mtcp_get_connection_cnt(mctx_t mctx) 1888 { 1889 mtcp_manager_t mtcp; 1890 mtcp = GetMTCPManager(mctx); 1891 if (!mtcp) { 1892 errno = EACCES; 1893 return -1; 1894 } 1895 1896 if (mtcp->num_msp > 0) 1897 return mtcp->flow_cnt / 2; 1898 else 1899 return mtcp->flow_cnt; 1900 } 1901 /*----------------------------------------------------------------------------*/ 1902