1 #include <sys/queue.h> 2 #include <sys/ioctl.h> 3 #include <limits.h> 4 #include <unistd.h> 5 #include <assert.h> 6 #include <string.h> 7 8 #ifdef DARWIN 9 #include <netinet/tcp.h> 10 #include <netinet/ip.h> 11 #include <netinet/if_ether.h> 12 #endif 13 14 #include "mtcp.h" 15 #include "mtcp_api.h" 16 #include "tcp_in.h" 17 #include "tcp_stream.h" 18 #include "tcp_out.h" 19 #include "ip_out.h" 20 #include "eventpoll.h" 21 #include "pipe.h" 22 #include "fhash.h" 23 #include "addr_pool.h" 24 #include "rss.h" 25 #include "config.h" 26 #include "debug.h" 27 #include "eventpoll.h" 28 #include "mos_api.h" 29 #include "tcp_rb.h" 30 31 #define MAX(a, b) ((a)>(b)?(a):(b)) 32 #define MIN(a, b) ((a)<(b)?(a):(b)) 33 34 /*----------------------------------------------------------------------------*/ 35 /** Stop monitoring the socket! (function prototype) 36 * @param [in] mctx: mtcp context 37 * @param [in] sock: monitoring stream socket id 38 * @param [in] side: side of monitoring (client side, server side or both) 39 * 40 * This function is now DEPRECATED and is only used within mOS core... 41 */ 42 int 43 mtcp_cb_stop(mctx_t mctx, int sock, int side); 44 /*----------------------------------------------------------------------------*/ 45 /** Reset the connection (send RST packets to both sides) 46 * (We need to decide the API for this.) 47 */ 48 //int 49 //mtcp_cb_reset(mctx_t mctx, int sock, int side); 50 /*----------------------------------------------------------------------------*/ 51 inline mtcp_manager_t 52 GetMTCPManager(mctx_t mctx) 53 { 54 if (!mctx) { 55 errno = EACCES; 56 return NULL; 57 } 58 59 if (mctx->cpu < 0 || mctx->cpu >= num_cpus) { 60 errno = EINVAL; 61 return NULL; 62 } 63 64 if (g_mtcp[mctx->cpu]->ctx->done || g_mtcp[mctx->cpu]->ctx->exit) { 65 errno = EPERM; 66 return NULL; 67 } 68 69 return g_mtcp[mctx->cpu]; 70 } 71 /*----------------------------------------------------------------------------*/ 72 inline int 73 GetSocketError(socket_map_t socket, void *optval, socklen_t *optlen) 74 { 75 tcp_stream *cur_stream; 76 77 if (!socket->stream) { 78 errno = EBADF; 79 return -1; 80 } 81 82 cur_stream = socket->stream; 83 if (cur_stream->state == TCP_ST_CLOSED_RSVD) { 84 if (cur_stream->close_reason == TCP_TIMEDOUT || 85 cur_stream->close_reason == TCP_CONN_FAIL || 86 cur_stream->close_reason == TCP_CONN_LOST) { 87 *(int *)optval = ETIMEDOUT; 88 *optlen = sizeof(int); 89 90 return 0; 91 } 92 } 93 94 if (cur_stream->state == TCP_ST_CLOSE_WAIT || 95 cur_stream->state == TCP_ST_CLOSED_RSVD) { 96 if (cur_stream->close_reason == TCP_RESET) { 97 *(int *)optval = ECONNRESET; 98 *optlen = sizeof(int); 99 100 return 0; 101 } 102 } 103 104 errno = ENOSYS; 105 return -1; 106 } 107 /*----------------------------------------------------------------------------*/ 108 int 109 mtcp_getsockopt(mctx_t mctx, int sockid, int level, 110 int optname, void *optval, socklen_t *optlen) 111 { 112 mtcp_manager_t mtcp; 113 socket_map_t socket; 114 115 mtcp = GetMTCPManager(mctx); 116 if (!mtcp) { 117 errno = EACCES; 118 return -1; 119 } 120 121 if (sockid < 0 || sockid >= g_config.mos->max_concurrency) { 122 TRACE_API("Socket id %d out of range.\n", sockid); 123 errno = EBADF; 124 return -1; 125 } 126 127 switch (level) { 128 case SOL_SOCKET: 129 socket = &mtcp->smap[sockid]; 130 if (socket->socktype == MOS_SOCK_UNUSED) { 131 TRACE_API("Invalid socket id: %d\n", sockid); 132 errno = EBADF; 133 return -1; 134 } 135 136 if (socket->socktype != MOS_SOCK_STREAM_LISTEN && 137 socket->socktype != MOS_SOCK_STREAM) { 138 TRACE_API("Invalid socket id: %d\n", sockid); 139 errno = ENOTSOCK; 140 return -1; 141 } 142 143 if (optname == SO_ERROR) { 144 if (socket->socktype == MOS_SOCK_STREAM) { 145 return GetSocketError(socket, optval, optlen); 146 } 147 } 148 break; 149 case SOL_MONSOCKET: 150 /* check if the calling thread is in MOS context */ 151 if (mtcp->ctx->thread != pthread_self()) { 152 errno = EPERM; 153 return -1; 154 } 155 /* 156 * All options will only work for active 157 * monitor stream sockets 158 */ 159 socket = &mtcp->msmap[sockid]; 160 if (socket->socktype != MOS_SOCK_MONITOR_STREAM_ACTIVE) { 161 TRACE_API("Invalid socket id: %d\n", sockid); 162 errno = ENOTSOCK; 163 return -1; 164 } 165 166 switch (optname) { 167 case MOS_FRAGINFO_CLIBUF: 168 return GetFragInfo(socket, MOS_SIDE_CLI, optval, optlen); 169 case MOS_FRAGINFO_SVRBUF: 170 return GetFragInfo(socket, MOS_SIDE_SVR, optval, optlen); 171 case MOS_INFO_CLIBUF: 172 return GetBufInfo(socket, MOS_SIDE_CLI, optval, optlen); 173 case MOS_INFO_SVRBUF: 174 return GetBufInfo(socket, MOS_SIDE_SVR, optval, optlen); 175 case MOS_TCP_STATE_CLI: 176 return GetTCPState(socket->monitor_stream->stream, MOS_SIDE_CLI, 177 optval, optlen); 178 case MOS_TCP_STATE_SVR: 179 return GetTCPState(socket->monitor_stream->stream, MOS_SIDE_SVR, 180 optval, optlen); 181 case MOS_TIMESTAMP: 182 return GetLastTimestamp(socket->monitor_stream->stream, 183 (uint32_t *)optval, 184 optlen); 185 default: 186 TRACE_API("can't recognize the optname=%d\n", optname); 187 assert(0); 188 } 189 break; 190 } 191 errno = ENOSYS; 192 return -1; 193 } 194 /*----------------------------------------------------------------------------*/ 195 int 196 mtcp_setsockopt(mctx_t mctx, int sockid, int level, 197 int optname, const void *optval, socklen_t optlen) 198 { 199 mtcp_manager_t mtcp; 200 socket_map_t socket; 201 tcprb_t *rb; 202 203 mtcp = GetMTCPManager(mctx); 204 if (!mtcp) { 205 errno = EACCES; 206 return -1; 207 } 208 209 if (sockid < 0 || sockid >= g_config.mos->max_concurrency) { 210 TRACE_API("Socket id %d out of range.\n", sockid); 211 errno = EBADF; 212 return -1; 213 } 214 215 switch (level) { 216 case SOL_SOCKET: 217 socket = &mtcp->smap[sockid]; 218 if (socket->socktype == MOS_SOCK_UNUSED) { 219 TRACE_API("Invalid socket id: %d\n", sockid); 220 errno = EBADF; 221 return -1; 222 } 223 224 if (socket->socktype != MOS_SOCK_STREAM_LISTEN && 225 socket->socktype != MOS_SOCK_STREAM) { 226 TRACE_API("Invalid socket id: %d\n", sockid); 227 errno = ENOTSOCK; 228 return -1; 229 } 230 break; 231 case SOL_MONSOCKET: 232 socket = &mtcp->msmap[sockid]; 233 /* 234 * checking of calling thread to be in MOS context is 235 * disabled since both optnames can be called from 236 * `application' context (on passive sockets) 237 */ 238 /* 239 * if (mtcp->ctx->thread != pthread_self()) 240 * return -1; 241 */ 242 243 switch (optname) { 244 case MOS_CLIBUF: 245 #if 0 246 if (socket->socktype != MOS_SOCK_MONITOR_STREAM_ACTIVE) { 247 errno = EBADF; 248 return -1; 249 } 250 #endif 251 #ifdef NEWRB 252 #ifdef DISABLE_DYN_RESIZE 253 if (*(int *)optval != 0) 254 return -1; 255 if (socket->socktype == MOS_SOCK_MONITOR_STREAM_ACTIVE) { 256 rb = (socket->monitor_stream->stream->side == MOS_SIDE_CLI) ? 257 socket->monitor_stream->stream->rcvvar->rcvbuf : 258 socket->monitor_stream->stream->pair_stream->rcvvar->rcvbuf; 259 if (rb) { 260 tcprb_resize_meta(rb, 0); 261 tcprb_resize(rb, 0); 262 } 263 } 264 return DisableBuf(socket, MOS_SIDE_CLI); 265 #else 266 rb = (socket->monitor_stream->stream->side == MOS_SIDE_CLI) ? 267 socket->monitor_stream->stream->rcvvar->rcvbuf : 268 socket->monitor_stream->stream->pair_stream->rcvvar->rcvbuf; 269 if (tcprb_resize_meta(rb, *(int *)optval) < 0) 270 return -1; 271 return tcprb_resize(rb, 272 (((int)rb->metalen - 1) / UNITBUFSIZE + 1) * UNITBUFSIZE); 273 #endif 274 #else 275 errno = EBADF; 276 return -1; 277 #endif 278 case MOS_SVRBUF: 279 #if 0 280 if (socket->socktype != MOS_SOCK_MONITOR_STREAM_ACTIVE) { 281 errno = EBADF; 282 return -1; 283 } 284 #endif 285 #ifdef NEWRB 286 #ifdef DISABLE_DYN_RESIZE 287 if (*(int *)optval != 0) 288 return -1; 289 if (socket->socktype == MOS_SOCK_MONITOR_STREAM_ACTIVE) { 290 rb = (socket->monitor_stream->stream->side == MOS_SIDE_SVR) ? 291 socket->monitor_stream->stream->rcvvar->rcvbuf : 292 socket->monitor_stream->stream->pair_stream->rcvvar->rcvbuf; 293 if (rb) { 294 tcprb_resize_meta(rb, 0); 295 tcprb_resize(rb, 0); 296 } 297 } 298 return DisableBuf(socket, MOS_SIDE_SVR); 299 #else 300 rb = (socket->monitor_stream->stream->side == MOS_SIDE_SVR) ? 301 socket->monitor_stream->stream->rcvvar->rcvbuf : 302 socket->monitor_stream->stream->pair_stream->rcvvar->rcvbuf; 303 if (tcprb_resize_meta(rb, *(int *)optval) < 0) 304 return -1; 305 return tcprb_resize(rb, 306 (((int)rb->metalen - 1) / UNITBUFSIZE + 1) * UNITBUFSIZE); 307 #endif 308 #else 309 errno = EBADF; 310 return -1; 311 #endif 312 case MOS_FRAG_CLIBUF: 313 #if 0 314 if (socket->socktype != MOS_SOCK_MONITOR_STREAM_ACTIVE) { 315 errno = EBADF; 316 return -1; 317 } 318 #endif 319 #ifdef NEWRB 320 #ifdef DISABLE_DYN_RESIZE 321 if (*(int *)optval != 0) 322 return -1; 323 if (socket->socktype == MOS_SOCK_MONITOR_STREAM_ACTIVE) { 324 rb = (socket->monitor_stream->stream->side == MOS_SIDE_CLI) ? 325 socket->monitor_stream->stream->rcvvar->rcvbuf : 326 socket->monitor_stream->stream->pair_stream->rcvvar->rcvbuf; 327 if (rb) 328 tcprb_resize(rb, 0); 329 } 330 return 0; 331 #else 332 rb = (socket->monitor_stream->stream->side == MOS_SIDE_CLI) ? 333 socket->monitor_stream->stream->rcvvar->rcvbuf : 334 socket->monitor_stream->stream->pair_stream->rcvvar->rcvbuf; 335 if (rb->len == 0) 336 return tcprb_resize_meta(rb, *(int *)optval); 337 else 338 return -1; 339 #endif 340 #else 341 errno = EBADF; 342 return -1; 343 #endif 344 case MOS_FRAG_SVRBUF: 345 #if 0 346 if (socket->socktype != MOS_SOCK_MONITOR_STREAM_ACTIVE) { 347 errno = EBADF; 348 return -1; 349 } 350 #endif 351 #ifdef NEWRB 352 #ifdef DISABLE_DYN_RESIZE 353 if (*(int *)optval != 0) 354 return -1; 355 if (socket->socktype == MOS_SOCK_MONITOR_STREAM_ACTIVE) { 356 rb = (socket->monitor_stream->stream->side == MOS_SIDE_SVR) ? 357 socket->monitor_stream->stream->rcvvar->rcvbuf : 358 socket->monitor_stream->stream->pair_stream->rcvvar->rcvbuf; 359 if (rb) 360 tcprb_resize(rb, 0); 361 } 362 return 0; 363 #else 364 rb = (socket->monitor_stream->stream->side == MOS_SIDE_SVR) ? 365 socket->monitor_stream->stream->rcvvar->rcvbuf : 366 socket->monitor_stream->stream->pair_stream->rcvvar->rcvbuf; 367 if (rb->len == 0) 368 return tcprb_resize_meta(rb, *(int *)optval); 369 else 370 return -1; 371 #endif 372 #else 373 errno = EBADF; 374 return -1; 375 #endif 376 case MOS_MONLEVEL: 377 #ifdef OLD_API 378 assert(*(int *)optval == MOS_NO_CLIBUF || 379 *(int *)optval == MOS_NO_SVRBUF); 380 return DisableBuf(socket, 381 (*(int *)optval == MOS_NO_CLIBUF) ? 382 MOS_SIDE_CLI : MOS_SIDE_SVR); 383 #endif 384 case MOS_SEQ_REMAP: 385 return TcpSeqChange(socket, 386 (uint32_t)((seq_remap_info *)optval)->seq_off, 387 ((seq_remap_info *)optval)->side, 388 mtcp->pctx->p.seq); 389 case MOS_STOP_MON: 390 return mtcp_cb_stop(mctx, sockid, *(int *)optval); 391 default: 392 TRACE_API("invalid optname=%d\n", optname); 393 assert(0); 394 } 395 break; 396 } 397 398 errno = ENOSYS; 399 return -1; 400 } 401 /*----------------------------------------------------------------------------*/ 402 int 403 mtcp_setsock_nonblock(mctx_t mctx, int sockid) 404 { 405 mtcp_manager_t mtcp; 406 407 mtcp = GetMTCPManager(mctx); 408 if (!mtcp) { 409 errno = EACCES; 410 return -1; 411 } 412 413 if (sockid < 0 || sockid >= g_config.mos->max_concurrency) { 414 TRACE_API("Socket id %d out of range.\n", sockid); 415 errno = EBADF; 416 return -1; 417 } 418 419 if (mtcp->smap[sockid].socktype == MOS_SOCK_UNUSED) { 420 TRACE_API("Invalid socket id: %d\n", sockid); 421 errno = EBADF; 422 return -1; 423 } 424 425 mtcp->smap[sockid].opts |= MTCP_NONBLOCK; 426 427 return 0; 428 } 429 /*----------------------------------------------------------------------------*/ 430 int 431 mtcp_ioctl(mctx_t mctx, int sockid, int request, void *argp) 432 { 433 mtcp_manager_t mtcp; 434 socket_map_t socket; 435 436 mtcp = GetMTCPManager(mctx); 437 if (!mtcp) { 438 errno = EACCES; 439 return -1; 440 } 441 442 if (sockid < 0 || sockid >= g_config.mos->max_concurrency) { 443 TRACE_API("Socket id %d out of range.\n", sockid); 444 errno = EBADF; 445 return -1; 446 } 447 448 /* only support stream socket */ 449 socket = &mtcp->smap[sockid]; 450 451 if (socket->socktype != MOS_SOCK_STREAM_LISTEN && 452 socket->socktype != MOS_SOCK_STREAM) { 453 TRACE_API("Invalid socket id: %d\n", sockid); 454 errno = EBADF; 455 return -1; 456 } 457 458 if (!argp) { 459 errno = EFAULT; 460 return -1; 461 } 462 463 if (request == FIONREAD) { 464 tcp_stream *cur_stream; 465 #ifdef NEWRB 466 tcprb_t *rbuf; 467 #else 468 struct tcp_ring_buffer *rbuf; 469 #endif 470 471 cur_stream = socket->stream; 472 if (!cur_stream) { 473 errno = EBADF; 474 return -1; 475 } 476 477 rbuf = cur_stream->rcvvar->rcvbuf; 478 #ifdef NEWRB 479 *(int *)argp = (rbuf) ? tcprb_cflen(rbuf) : 0; 480 #else 481 *(int *)argp = (rbuf) ? rbuf->merged_len : 0; 482 #endif 483 484 } else if (request == FIONBIO) { 485 /* 486 * sockets can only be set to blocking/non-blocking 487 * modes during initialization 488 */ 489 if ((*(int *)argp)) 490 mtcp->smap[sockid].opts |= MTCP_NONBLOCK; 491 else 492 mtcp->smap[sockid].opts &= ~MTCP_NONBLOCK; 493 } else { 494 errno = EINVAL; 495 return -1; 496 } 497 498 return 0; 499 } 500 /*----------------------------------------------------------------------------*/ 501 static int 502 mtcp_monitor(mctx_t mctx, socket_map_t sock) 503 { 504 mtcp_manager_t mtcp; 505 struct mon_listener *monitor; 506 int sockid = sock->id; 507 508 mtcp = GetMTCPManager(mctx); 509 if (!mtcp) { 510 errno = EACCES; 511 return -1; 512 } 513 514 if (sockid < 0 || sockid >= g_config.mos->max_concurrency) { 515 TRACE_API("Socket id %d out of range.\n", sockid); 516 errno = EBADF; 517 return -1; 518 } 519 520 if (mtcp->msmap[sockid].socktype == MOS_SOCK_UNUSED) { 521 TRACE_API("Invalid socket id: %d\n", sockid); 522 errno = EBADF; 523 return -1; 524 } 525 526 if (!(mtcp->msmap[sockid].socktype == MOS_SOCK_MONITOR_STREAM || 527 mtcp->msmap[sockid].socktype == MOS_SOCK_MONITOR_RAW)) { 528 TRACE_API("Not a monitor socket. id: %d\n", sockid); 529 errno = ENOTSOCK; 530 return -1; 531 } 532 533 monitor = (struct mon_listener *)calloc(1, sizeof(struct mon_listener)); 534 if (!monitor) { 535 /* errno set from the malloc() */ 536 errno = ENOMEM; 537 return -1; 538 } 539 540 /* create a monitor-specific event queue */ 541 monitor->eq = CreateEventQueue(g_config.mos->max_concurrency); 542 if (!monitor->eq) { 543 TRACE_API("Can't create event queue (concurrency: %d) for " 544 "monitor read event registrations!\n", 545 g_config.mos->max_concurrency); 546 free(monitor); 547 errno = ENOMEM; 548 return -1; 549 } 550 551 /* set monitor-related basic parameters */ 552 #ifndef NEWEV 553 monitor->ude_id = UDE_OFFSET; 554 #endif 555 monitor->socket = sock; 556 #ifdef NEWRB 557 monitor->client_buf_mgmt = monitor->server_buf_mgmt = BUFMGMT_FULL; 558 #else 559 monitor->client_buf_mgmt = monitor->server_buf_mgmt = 1; 560 #endif 561 562 /* perform both sides monitoring by default */ 563 monitor->client_mon = monitor->server_mon = 1; 564 565 /* add monitor socket to the monitor list */ 566 TAILQ_INSERT_TAIL(&mtcp->monitors, monitor, link); 567 568 mtcp->msmap[sockid].monitor_listener = monitor; 569 570 return 0; 571 } 572 /*----------------------------------------------------------------------------*/ 573 int 574 mtcp_socket(mctx_t mctx, int domain, int type, int protocol) 575 { 576 mtcp_manager_t mtcp; 577 socket_map_t socket; 578 579 mtcp = GetMTCPManager(mctx); 580 if (!mtcp) { 581 errno = EACCES; 582 return -1; 583 } 584 585 if (domain != AF_INET) { 586 errno = EAFNOSUPPORT; 587 return -1; 588 } 589 590 if (type == SOCK_STREAM) { 591 type = MOS_SOCK_STREAM; 592 } else if (type == MOS_SOCK_MONITOR_STREAM || 593 type == MOS_SOCK_MONITOR_RAW) { 594 /* do nothing for the time being */ 595 } else { 596 /* Not supported type */ 597 errno = EINVAL; 598 return -1; 599 } 600 601 socket = AllocateSocket(mctx, type); 602 if (!socket) { 603 errno = ENFILE; 604 return -1; 605 } 606 607 if (type == MOS_SOCK_MONITOR_STREAM || 608 type == MOS_SOCK_MONITOR_RAW) { 609 mtcp_manager_t mtcp = GetMTCPManager(mctx); 610 if (!mtcp) { 611 errno = EACCES; 612 return -1; 613 } 614 mtcp_monitor(mctx, socket); 615 #ifdef NEWEV 616 socket->monitor_listener->stree_dontcare = NULL; 617 socket->monitor_listener->stree_pre_rcv = NULL; 618 socket->monitor_listener->stree_post_snd = NULL; 619 #else 620 InitEvB(mtcp, &socket->monitor_listener->dontcare_evb); 621 InitEvB(mtcp, &socket->monitor_listener->pre_tcp_evb); 622 InitEvB(mtcp, &socket->monitor_listener->post_tcp_evb); 623 #endif 624 } 625 626 return socket->id; 627 } 628 /*----------------------------------------------------------------------------*/ 629 int 630 mtcp_bind(mctx_t mctx, int sockid, 631 const struct sockaddr *addr, socklen_t addrlen) 632 { 633 mtcp_manager_t mtcp; 634 struct sockaddr_in *addr_in; 635 636 mtcp = GetMTCPManager(mctx); 637 if (!mtcp) { 638 errno = EACCES; 639 return -1; 640 } 641 642 if (sockid < 0 || sockid >= g_config.mos->max_concurrency) { 643 TRACE_API("Socket id %d out of range.\n", sockid); 644 errno = EBADF; 645 return -1; 646 } 647 648 if (mtcp->smap[sockid].socktype == MOS_SOCK_UNUSED) { 649 TRACE_API("Invalid socket id: %d\n", sockid); 650 errno = EBADF; 651 return -1; 652 } 653 654 if (mtcp->smap[sockid].socktype != MOS_SOCK_STREAM && 655 mtcp->smap[sockid].socktype != MOS_SOCK_STREAM_LISTEN) { 656 TRACE_API("Not a stream socket id: %d\n", sockid); 657 errno = ENOTSOCK; 658 return -1; 659 } 660 661 if (!addr) { 662 TRACE_API("Socket %d: empty address!\n", sockid); 663 errno = EINVAL; 664 return -1; 665 } 666 667 if (mtcp->smap[sockid].opts & MTCP_ADDR_BIND) { 668 TRACE_API("Socket %d: adress already bind for this socket.\n", sockid); 669 errno = EINVAL; 670 return -1; 671 } 672 673 /* we only allow bind() for AF_INET address */ 674 if (addr->sa_family != AF_INET || addrlen < sizeof(struct sockaddr_in)) { 675 TRACE_API("Socket %d: invalid argument!\n", sockid); 676 errno = EINVAL; 677 return -1; 678 } 679 680 if (mtcp->listener) { 681 TRACE_API("Address already bound!\n"); 682 errno = EINVAL; 683 return -1; 684 } 685 addr_in = (struct sockaddr_in *)addr; 686 mtcp->smap[sockid].saddr = *addr_in; 687 mtcp->smap[sockid].opts |= MTCP_ADDR_BIND; 688 689 return 0; 690 } 691 /*----------------------------------------------------------------------------*/ 692 int 693 mtcp_listen(mctx_t mctx, int sockid, int backlog) 694 { 695 mtcp_manager_t mtcp; 696 struct tcp_listener *listener; 697 698 mtcp = GetMTCPManager(mctx); 699 if (!mtcp) { 700 errno = EACCES; 701 return -1; 702 } 703 704 if (sockid < 0 || sockid >= g_config.mos->max_concurrency) { 705 TRACE_API("Socket id %d out of range.\n", sockid); 706 errno = EBADF; 707 return -1; 708 } 709 710 if (mtcp->smap[sockid].socktype == MOS_SOCK_UNUSED) { 711 TRACE_API("Invalid socket id: %d\n", sockid); 712 errno = EBADF; 713 return -1; 714 } 715 716 if (mtcp->smap[sockid].socktype == MOS_SOCK_STREAM) { 717 mtcp->smap[sockid].socktype = MOS_SOCK_STREAM_LISTEN; 718 } 719 720 if (mtcp->smap[sockid].socktype != MOS_SOCK_STREAM_LISTEN) { 721 TRACE_API("Not a listening socket. id: %d\n", sockid); 722 errno = ENOTSOCK; 723 return -1; 724 } 725 726 if (backlog <= 0 || backlog > g_config.mos->max_concurrency) { 727 errno = EINVAL; 728 return -1; 729 } 730 731 listener = (struct tcp_listener *)calloc(1, sizeof(struct tcp_listener)); 732 if (!listener) { 733 /* errno set from the malloc() */ 734 errno = ENOMEM; 735 return -1; 736 } 737 738 listener->sockid = sockid; 739 listener->backlog = backlog; 740 listener->socket = &mtcp->smap[sockid]; 741 742 if (pthread_cond_init(&listener->accept_cond, NULL)) { 743 perror("pthread_cond_init of ctx->accept_cond\n"); 744 /* errno set by pthread_cond_init() */ 745 return -1; 746 } 747 if (pthread_mutex_init(&listener->accept_lock, NULL)) { 748 perror("pthread_mutex_init of ctx->accept_lock\n"); 749 /* errno set by pthread_mutex_init() */ 750 return -1; 751 } 752 753 listener->acceptq = CreateStreamQueue(backlog); 754 if (!listener->acceptq) { 755 errno = ENOMEM; 756 return -1; 757 } 758 759 mtcp->smap[sockid].listener = listener; 760 mtcp->listener = listener; 761 762 return 0; 763 } 764 /*----------------------------------------------------------------------------*/ 765 int 766 mtcp_accept(mctx_t mctx, int sockid, struct sockaddr *addr, socklen_t *addrlen) 767 { 768 mtcp_manager_t mtcp; 769 struct tcp_listener *listener; 770 socket_map_t socket; 771 tcp_stream *accepted = NULL; 772 773 mtcp = GetMTCPManager(mctx); 774 if (!mtcp) { 775 errno = EACCES; 776 return -1; 777 } 778 779 if (sockid < 0 || sockid >= g_config.mos->max_concurrency) { 780 TRACE_API("Socket id %d out of range.\n", sockid); 781 errno = EBADF; 782 return -1; 783 } 784 785 /* requires listening socket */ 786 if (mtcp->smap[sockid].socktype != MOS_SOCK_STREAM_LISTEN) { 787 errno = EINVAL; 788 return -1; 789 } 790 791 listener = mtcp->smap[sockid].listener; 792 793 /* dequeue from the acceptq without lock first */ 794 /* if nothing there, acquire lock and cond_wait */ 795 accepted = StreamDequeue(listener->acceptq); 796 if (!accepted) { 797 if (listener->socket->opts & MTCP_NONBLOCK) { 798 errno = EAGAIN; 799 return -1; 800 801 } else { 802 pthread_mutex_lock(&listener->accept_lock); 803 while ((accepted = StreamDequeue(listener->acceptq)) == NULL) { 804 pthread_cond_wait(&listener->accept_cond, &listener->accept_lock); 805 806 if (mtcp->ctx->done || mtcp->ctx->exit) { 807 pthread_mutex_unlock(&listener->accept_lock); 808 errno = EINTR; 809 return -1; 810 } 811 } 812 pthread_mutex_unlock(&listener->accept_lock); 813 } 814 } 815 816 if (!accepted) { 817 TRACE_ERROR("[NEVER HAPPEN] Empty accept queue!\n"); 818 } 819 820 if (!accepted->socket) { 821 socket = AllocateSocket(mctx, MOS_SOCK_STREAM); 822 if (!socket) { 823 TRACE_ERROR("Failed to create new socket!\n"); 824 /* TODO: destroy the stream */ 825 errno = ENFILE; 826 return -1; 827 } 828 socket->stream = accepted; 829 accepted->socket = socket; 830 /* if monitor is enabled, complete the socket assignment */ 831 if (socket->stream->pair_stream != NULL) 832 socket->stream->pair_stream->socket = socket; 833 } 834 835 TRACE_API("Stream %d accepted.\n", accepted->id); 836 837 if (addr && addrlen) { 838 struct sockaddr_in *addr_in = (struct sockaddr_in *)addr; 839 addr_in->sin_family = AF_INET; 840 addr_in->sin_port = accepted->dport; 841 addr_in->sin_addr.s_addr = accepted->daddr; 842 *addrlen = sizeof(struct sockaddr_in); 843 } 844 845 return accepted->socket->id; 846 } 847 /*----------------------------------------------------------------------------*/ 848 int 849 mtcp_init_rss(mctx_t mctx, in_addr_t saddr_base, int num_addr, 850 in_addr_t daddr, in_addr_t dport) 851 { 852 mtcp_manager_t mtcp; 853 addr_pool_t ap; 854 855 mtcp = GetMTCPManager(mctx); 856 if (!mtcp) { 857 errno = EACCES; 858 return -1; 859 } 860 861 if (saddr_base == INADDR_ANY) { 862 int nif_out; 863 864 /* for the INADDR_ANY, find the output interface for the destination 865 and set the saddr_base as the ip address of the output interface */ 866 nif_out = GetOutputInterface(daddr); 867 saddr_base = g_config.mos->netdev_table->ent[nif_out]->ip_addr; 868 } 869 870 ap = CreateAddressPoolPerCore(mctx->cpu, num_cpus, 871 saddr_base, num_addr, daddr, dport); 872 if (!ap) { 873 errno = ENOMEM; 874 return -1; 875 } 876 877 mtcp->ap = ap; 878 879 return 0; 880 } 881 /*----------------------------------------------------------------------------*/ 882 int 883 eval_bpf_5tuple(struct sfbpf_program fcode, 884 in_addr_t saddr, in_port_t sport, 885 in_addr_t daddr, in_port_t dport) { 886 uint8_t buf[TOTAL_TCP_HEADER_LEN]; 887 struct ethhdr *ethh; 888 struct iphdr *iph; 889 struct tcphdr *tcph; 890 891 ethh = (struct ethhdr *)buf; 892 ethh->h_proto = htons(ETH_P_IP); 893 iph = (struct iphdr *)(ethh + 1); 894 iph->ihl = IP_HEADER_LEN >> 2; 895 iph->version = 4; 896 iph->tos = 0; 897 iph->tot_len = htons(IP_HEADER_LEN + TCP_HEADER_LEN); 898 iph->id = htons(0); 899 iph->protocol = IPPROTO_TCP; 900 iph->saddr = saddr; 901 iph->daddr = daddr; 902 iph->check = 0; 903 tcph = (struct tcphdr *)(iph + 1); 904 tcph->source = sport; 905 tcph->dest = dport; 906 907 return EVAL_BPFFILTER(fcode, (uint8_t *)iph - sizeof(struct ethhdr), 908 TOTAL_TCP_HEADER_LEN); 909 } 910 /*----------------------------------------------------------------------------*/ 911 int 912 mtcp_connect(mctx_t mctx, int sockid, 913 const struct sockaddr *addr, socklen_t addrlen) 914 { 915 mtcp_manager_t mtcp; 916 socket_map_t socket; 917 tcp_stream *cur_stream; 918 struct sockaddr_in *addr_in; 919 in_addr_t dip; 920 in_port_t dport; 921 int is_dyn_bound = FALSE; 922 int ret; 923 int cnt_match = 0; 924 struct mon_listener *walk; 925 struct sfbpf_program fcode; 926 927 cur_stream = NULL; 928 mtcp = GetMTCPManager(mctx); 929 if (!mtcp) { 930 errno = EACCES; 931 return -1; 932 } 933 934 if (sockid < 0 || sockid >= g_config.mos->max_concurrency) { 935 TRACE_API("Socket id %d out of range.\n", sockid); 936 errno = EBADF; 937 return -1; 938 } 939 940 if (mtcp->smap[sockid].socktype == MOS_SOCK_UNUSED) { 941 TRACE_API("Invalid socket id: %d\n", sockid); 942 errno = EBADF; 943 return -1; 944 } 945 946 if (mtcp->smap[sockid].socktype != MOS_SOCK_STREAM) { 947 TRACE_API("Not an end socket. id: %d\n", sockid); 948 errno = ENOTSOCK; 949 return -1; 950 } 951 952 if (!addr) { 953 TRACE_API("Socket %d: empty address!\n", sockid); 954 errno = EFAULT; 955 return -1; 956 } 957 958 /* we only allow bind() for AF_INET address */ 959 if (addr->sa_family != AF_INET || addrlen < sizeof(struct sockaddr_in)) { 960 TRACE_API("Socket %d: invalid argument!\n", sockid); 961 errno = EAFNOSUPPORT; 962 return -1; 963 } 964 965 socket = &mtcp->smap[sockid]; 966 if (socket->stream) { 967 TRACE_API("Socket %d: stream already exist!\n", sockid); 968 if (socket->stream->state >= TCP_ST_ESTABLISHED) { 969 errno = EISCONN; 970 } else { 971 errno = EALREADY; 972 } 973 return -1; 974 } 975 976 addr_in = (struct sockaddr_in *)addr; 977 dip = addr_in->sin_addr.s_addr; 978 dport = addr_in->sin_port; 979 980 /* address binding */ 981 if (socket->opts & MTCP_ADDR_BIND && 982 socket->saddr.sin_port != INPORT_ANY && 983 socket->saddr.sin_addr.s_addr != INADDR_ANY) { 984 int rss_core; 985 986 rss_core = GetRSSCPUCore(socket->saddr.sin_addr.s_addr, dip, 987 socket->saddr.sin_port, dport, num_queues); 988 989 if (rss_core != mctx->cpu) { 990 errno = EINVAL; 991 return -1; 992 } 993 } else { 994 if (mtcp->ap) { 995 ret = FetchAddress(mtcp->ap, 996 mctx->cpu, num_queues, addr_in, &socket->saddr); 997 } else { 998 ret = FetchAddress(ap, 999 mctx->cpu, num_queues, addr_in, &socket->saddr); 1000 } 1001 if (ret < 0) { 1002 errno = EAGAIN; 1003 return -1; 1004 } 1005 socket->opts |= MTCP_ADDR_BIND; 1006 is_dyn_bound = TRUE; 1007 } 1008 1009 cnt_match = 0; 1010 if (mtcp->num_msp > 0) { 1011 TAILQ_FOREACH(walk, &mtcp->monitors, link) { 1012 fcode = walk->stream_syn_fcode; 1013 if (!(ISSET_BPFFILTER(fcode) && 1014 eval_bpf_5tuple(fcode, socket->saddr.sin_addr.s_addr, 1015 socket->saddr.sin_port, 1016 dip, dport) == 0)) { 1017 walk->is_stream_syn_filter_hit = 1; // set the 'filter hit' flag to 1 1018 cnt_match++; 1019 } 1020 } 1021 } 1022 1023 if (mtcp->num_msp > 0 && cnt_match > 0) { 1024 /* 150820 dhkim: XXX: embedded mode is not verified */ 1025 #if 1 1026 cur_stream = CreateClientTCPStream(mtcp, socket, 1027 STREAM_TYPE(MOS_SOCK_STREAM) | 1028 STREAM_TYPE(MOS_SOCK_MONITOR_STREAM_ACTIVE), 1029 socket->saddr.sin_addr.s_addr, 1030 socket->saddr.sin_port, dip, dport, NULL); 1031 #else 1032 cur_stream = CreateDualTCPStream(mtcp, socket, 1033 STREAM_TYPE(MOS_SOCK_STREAM) | 1034 STREAM_TYPE(MOS_SOCK_MONITOR_STREAM_ACTIVE), 1035 socket->saddr.sin_addr.s_addr, 1036 socket->saddr.sin_port, dip, dport, NULL); 1037 #endif 1038 } 1039 else 1040 cur_stream = CreateTCPStream(mtcp, socket, STREAM_TYPE(MOS_SOCK_STREAM), 1041 socket->saddr.sin_addr.s_addr, 1042 socket->saddr.sin_port, dip, dport, NULL); 1043 if (!cur_stream) { 1044 TRACE_ERROR("Socket %d: failed to create tcp_stream!\n", sockid); 1045 errno = ENOMEM; 1046 return -1; 1047 } 1048 1049 if (is_dyn_bound) 1050 cur_stream->is_bound_addr = TRUE; 1051 cur_stream->sndvar->cwnd = 1; 1052 cur_stream->sndvar->ssthresh = cur_stream->sndvar->mss * 10; 1053 cur_stream->side = MOS_SIDE_CLI; 1054 /* if monitor is enabled, update the pair stream side as well */ 1055 if (cur_stream->pair_stream) { 1056 cur_stream->pair_stream->side = MOS_SIDE_SVR; 1057 /* 1058 * if buffer management is off, then disable 1059 * monitoring tcp ring of server... 1060 * if there is even a single monitor asking for 1061 * buffer management, enable it (that's why the 1062 * need for the loop) 1063 */ 1064 cur_stream->pair_stream->buffer_mgmt = BUFMGMT_OFF; 1065 struct socket_map *walk; 1066 SOCKQ_FOREACH_START(walk, &cur_stream->msocks) { 1067 uint8_t bm = walk->monitor_stream->monitor_listener->server_buf_mgmt; 1068 if (bm > cur_stream->pair_stream->buffer_mgmt) { 1069 cur_stream->pair_stream->buffer_mgmt = bm; 1070 break; 1071 } 1072 } SOCKQ_FOREACH_END; 1073 } 1074 1075 cur_stream->state = TCP_ST_SYN_SENT; 1076 cur_stream->cb_events |= MOS_ON_TCP_STATE_CHANGE; 1077 1078 TRACE_STATE("Stream %d: TCP_ST_SYN_SENT\n", cur_stream->id); 1079 1080 SQ_LOCK(&mtcp->ctx->connect_lock); 1081 ret = StreamEnqueue(mtcp->connectq, cur_stream); 1082 SQ_UNLOCK(&mtcp->ctx->connect_lock); 1083 mtcp->wakeup_flag = TRUE; 1084 if (ret < 0) { 1085 TRACE_ERROR("Socket %d: failed to enqueue to conenct queue!\n", sockid); 1086 SQ_LOCK(&mtcp->ctx->destroyq_lock); 1087 StreamEnqueue(mtcp->destroyq, cur_stream); 1088 SQ_UNLOCK(&mtcp->ctx->destroyq_lock); 1089 errno = EAGAIN; 1090 return -1; 1091 } 1092 1093 /* if nonblocking socket, return EINPROGRESS */ 1094 if (socket->opts & MTCP_NONBLOCK) { 1095 errno = EINPROGRESS; 1096 return -1; 1097 1098 } else { 1099 while (1) { 1100 if (!cur_stream) { 1101 TRACE_ERROR("STREAM DESTROYED\n"); 1102 errno = ETIMEDOUT; 1103 return -1; 1104 } 1105 if (cur_stream->state > TCP_ST_ESTABLISHED) { 1106 TRACE_ERROR("Socket %d: weird state %s\n", 1107 sockid, TCPStateToString(cur_stream)); 1108 // TODO: how to handle this? 1109 errno = ENOSYS; 1110 return -1; 1111 } 1112 1113 if (cur_stream->state == TCP_ST_ESTABLISHED) { 1114 break; 1115 } 1116 usleep(1000); 1117 } 1118 } 1119 1120 return 0; 1121 } 1122 /*----------------------------------------------------------------------------*/ 1123 static inline int 1124 CloseStreamSocket(mctx_t mctx, int sockid) 1125 { 1126 mtcp_manager_t mtcp; 1127 tcp_stream *cur_stream; 1128 int ret; 1129 1130 mtcp = GetMTCPManager(mctx); 1131 if (!mtcp) { 1132 errno = EACCES; 1133 return -1; 1134 } 1135 1136 cur_stream = mtcp->smap[sockid].stream; 1137 if (!cur_stream) { 1138 TRACE_API("Socket %d: stream does not exist.\n", sockid); 1139 errno = ENOTCONN; 1140 return -1; 1141 } 1142 1143 if (cur_stream->closed) { 1144 TRACE_API("Socket %d (Stream %u): already closed stream\n", 1145 sockid, cur_stream->id); 1146 return 0; 1147 } 1148 cur_stream->closed = TRUE; 1149 1150 TRACE_API("Stream %d: closing the stream.\n", cur_stream->id); 1151 1152 /* 141029 dhkim: Check this! */ 1153 cur_stream->socket = NULL; 1154 1155 if (cur_stream->state == TCP_ST_CLOSED_RSVD) { 1156 TRACE_API("Stream %d at TCP_ST_CLOSED_RSVD. destroying the stream.\n", 1157 cur_stream->id); 1158 SQ_LOCK(&mtcp->ctx->destroyq_lock); 1159 StreamEnqueue(mtcp->destroyq, cur_stream); 1160 mtcp->wakeup_flag = TRUE; 1161 SQ_UNLOCK(&mtcp->ctx->destroyq_lock); 1162 return 0; 1163 1164 } else if (cur_stream->state == TCP_ST_SYN_SENT) { 1165 #if 1 1166 SQ_LOCK(&mtcp->ctx->destroyq_lock); 1167 StreamEnqueue(mtcp->destroyq, cur_stream); 1168 SQ_UNLOCK(&mtcp->ctx->destroyq_lock); 1169 mtcp->wakeup_flag = TRUE; 1170 #endif 1171 return -1; 1172 1173 } else if (cur_stream->state != TCP_ST_ESTABLISHED && 1174 cur_stream->state != TCP_ST_CLOSE_WAIT) { 1175 TRACE_API("Stream %d at state %s\n", 1176 cur_stream->id, TCPStateToString(cur_stream)); 1177 errno = EBADF; 1178 return -1; 1179 } 1180 1181 SQ_LOCK(&mtcp->ctx->close_lock); 1182 cur_stream->sndvar->on_closeq = TRUE; 1183 ret = StreamEnqueue(mtcp->closeq, cur_stream); 1184 mtcp->wakeup_flag = TRUE; 1185 SQ_UNLOCK(&mtcp->ctx->close_lock); 1186 1187 if (ret < 0) { 1188 TRACE_ERROR("(NEVER HAPPEN) Failed to enqueue the stream to close.\n"); 1189 errno = EAGAIN; 1190 return -1; 1191 } 1192 1193 return 0; 1194 } 1195 /*----------------------------------------------------------------------------*/ 1196 static inline int 1197 CloseListeningSocket(mctx_t mctx, int sockid) 1198 { 1199 mtcp_manager_t mtcp; 1200 struct tcp_listener *listener; 1201 1202 mtcp = GetMTCPManager(mctx); 1203 if (!mtcp) { 1204 errno = EACCES; 1205 return -1; 1206 } 1207 1208 listener = mtcp->smap[sockid].listener; 1209 if (!listener) { 1210 errno = EINVAL; 1211 return -1; 1212 } 1213 1214 if (listener->acceptq) { 1215 DestroyStreamQueue(listener->acceptq); 1216 listener->acceptq = NULL; 1217 } 1218 1219 pthread_mutex_lock(&listener->accept_lock); 1220 pthread_cond_signal(&listener->accept_cond); 1221 pthread_mutex_unlock(&listener->accept_lock); 1222 1223 pthread_cond_destroy(&listener->accept_cond); 1224 pthread_mutex_destroy(&listener->accept_lock); 1225 1226 free(listener); 1227 mtcp->smap[sockid].listener = NULL; 1228 1229 return 0; 1230 } 1231 /*----------------------------------------------------------------------------*/ 1232 int 1233 mtcp_close(mctx_t mctx, int sockid) 1234 { 1235 mtcp_manager_t mtcp; 1236 int ret; 1237 1238 mtcp = GetMTCPManager(mctx); 1239 if (!mtcp) { 1240 errno = EACCES; 1241 return -1; 1242 } 1243 1244 if (sockid < 0 || sockid >= g_config.mos->max_concurrency) { 1245 TRACE_API("Socket id %d out of range.\n", sockid); 1246 errno = EBADF; 1247 return -1; 1248 } 1249 1250 if (mtcp->smap[sockid].socktype == MOS_SOCK_UNUSED) { 1251 TRACE_API("Invalid socket id: %d\n", sockid); 1252 errno = EBADF; 1253 return -1; 1254 } 1255 1256 TRACE_API("Socket %d: mtcp_close called.\n", sockid); 1257 1258 switch (mtcp->smap[sockid].socktype) { 1259 case MOS_SOCK_STREAM: 1260 ret = CloseStreamSocket(mctx, sockid); 1261 break; 1262 1263 case MOS_SOCK_STREAM_LISTEN: 1264 ret = CloseListeningSocket(mctx, sockid); 1265 break; 1266 1267 case MOS_SOCK_EPOLL: 1268 ret = CloseEpollSocket(mctx, sockid); 1269 break; 1270 1271 case MOS_SOCK_PIPE: 1272 ret = PipeClose(mctx, sockid); 1273 break; 1274 1275 default: 1276 errno = EINVAL; 1277 ret = -1; 1278 break; 1279 } 1280 1281 FreeSocket(mctx, sockid, mtcp->smap[sockid].socktype); 1282 1283 return ret; 1284 } 1285 /*----------------------------------------------------------------------------*/ 1286 int 1287 mtcp_abort(mctx_t mctx, int sockid) 1288 { 1289 mtcp_manager_t mtcp; 1290 tcp_stream *cur_stream; 1291 int ret; 1292 1293 mtcp = GetMTCPManager(mctx); 1294 if (!mtcp) { 1295 errno = EACCES; 1296 return -1; 1297 } 1298 1299 if (sockid < 0 || sockid >= g_config.mos->max_concurrency) { 1300 TRACE_API("Socket id %d out of range.\n", sockid); 1301 errno = EBADF; 1302 return -1; 1303 } 1304 1305 if (mtcp->smap[sockid].socktype == MOS_SOCK_UNUSED) { 1306 TRACE_API("Invalid socket id: %d\n", sockid); 1307 errno = EBADF; 1308 return -1; 1309 } 1310 1311 if (mtcp->smap[sockid].socktype != MOS_SOCK_STREAM) { 1312 TRACE_API("Not an end socket. id: %d\n", sockid); 1313 errno = ENOTSOCK; 1314 return -1; 1315 } 1316 1317 cur_stream = mtcp->smap[sockid].stream; 1318 if (!cur_stream) { 1319 TRACE_API("Stream %d: does not exist.\n", sockid); 1320 errno = ENOTCONN; 1321 return -1; 1322 } 1323 1324 TRACE_API("Socket %d: mtcp_abort()\n", sockid); 1325 1326 FreeSocket(mctx, sockid, mtcp->smap[sockid].socktype); 1327 cur_stream->socket = NULL; 1328 1329 if (cur_stream->state == TCP_ST_CLOSED_RSVD) { 1330 TRACE_API("Stream %d: connection already reset.\n", sockid); 1331 return ERROR; 1332 1333 } else if (cur_stream->state == TCP_ST_SYN_SENT) { 1334 /* TODO: this should notify event failure to all 1335 previous read() or write() calls */ 1336 cur_stream->state = TCP_ST_CLOSED_RSVD; 1337 cur_stream->close_reason = TCP_ACTIVE_CLOSE; 1338 cur_stream->cb_events |= MOS_ON_TCP_STATE_CHANGE; 1339 SQ_LOCK(&mtcp->ctx->destroyq_lock); 1340 StreamEnqueue(mtcp->destroyq, cur_stream); 1341 SQ_UNLOCK(&mtcp->ctx->destroyq_lock); 1342 mtcp->wakeup_flag = TRUE; 1343 return 0; 1344 1345 } else if (cur_stream->state == TCP_ST_CLOSING || 1346 cur_stream->state == TCP_ST_LAST_ACK || 1347 cur_stream->state == TCP_ST_TIME_WAIT) { 1348 cur_stream->state = TCP_ST_CLOSED_RSVD; 1349 cur_stream->close_reason = TCP_ACTIVE_CLOSE; 1350 cur_stream->cb_events |= MOS_ON_TCP_STATE_CHANGE; 1351 SQ_LOCK(&mtcp->ctx->destroyq_lock); 1352 StreamEnqueue(mtcp->destroyq, cur_stream); 1353 SQ_UNLOCK(&mtcp->ctx->destroyq_lock); 1354 mtcp->wakeup_flag = TRUE; 1355 return 0; 1356 } 1357 1358 /* the stream structure will be destroyed after sending RST */ 1359 if (cur_stream->sndvar->on_resetq) { 1360 TRACE_ERROR("Stream %d: calling mtcp_abort() " 1361 "when in reset queue.\n", sockid); 1362 errno = ECONNRESET; 1363 return -1; 1364 } 1365 SQ_LOCK(&mtcp->ctx->reset_lock); 1366 cur_stream->sndvar->on_resetq = TRUE; 1367 ret = StreamEnqueue(mtcp->resetq, cur_stream); 1368 SQ_UNLOCK(&mtcp->ctx->reset_lock); 1369 mtcp->wakeup_flag = TRUE; 1370 1371 if (ret < 0) { 1372 TRACE_ERROR("(NEVER HAPPEN) Failed to enqueue the stream to close.\n"); 1373 errno = EAGAIN; 1374 return -1; 1375 } 1376 1377 return 0; 1378 } 1379 /*----------------------------------------------------------------------------*/ 1380 static inline int 1381 CopyToUser(mtcp_manager_t mtcp, tcp_stream *cur_stream, char *buf, int len) 1382 { 1383 struct tcp_recv_vars *rcvvar = cur_stream->rcvvar; 1384 int copylen; 1385 #ifdef NEWRB 1386 tcprb_t *rb = rcvvar->rcvbuf; 1387 if ((copylen = tcprb_ppeek(rb, (uint8_t *)buf, len, rb->pile)) <= 0) { 1388 errno = EAGAIN; 1389 return -1; 1390 } 1391 tcprb_setpile(rb, rb->pile + copylen); 1392 1393 rcvvar->rcv_wnd = rb->len - tcprb_cflen(rb); 1394 //printf("rcv_wnd: %d\n", rcvvar->rcv_wnd); 1395 #else 1396 copylen = MIN(rcvvar->rcvbuf->merged_len, len); 1397 if (copylen <= 0) { 1398 errno = EAGAIN; 1399 return -1; 1400 } 1401 1402 assert(rcvvar->rcvbuf->data); 1403 /* Copy data to user buffer and remove it from receiving buffer */ 1404 memcpy(buf, rcvvar->rcvbuf->head, copylen); 1405 RBRemove(mtcp->rbm_rcv, rcvvar->rcvbuf, copylen, AT_APP, cur_stream->buffer_mgmt); 1406 rcvvar->rcv_wnd = rcvvar->rcvbuf->size - rcvvar->rcvbuf->merged_len; 1407 #endif 1408 1409 /* Advertise newly freed receive buffer */ 1410 if (cur_stream->need_wnd_adv) { 1411 if (rcvvar->rcv_wnd > cur_stream->sndvar->eff_mss) { 1412 if (!cur_stream->sndvar->on_ackq) { 1413 SQ_LOCK(&mtcp->ctx->ackq_lock); 1414 cur_stream->sndvar->on_ackq = TRUE; 1415 StreamEnqueue(mtcp->ackq, cur_stream); /* this always success */ 1416 SQ_UNLOCK(&mtcp->ctx->ackq_lock); 1417 cur_stream->need_wnd_adv = FALSE; 1418 mtcp->wakeup_flag = TRUE; 1419 } 1420 } 1421 } 1422 1423 return copylen; 1424 } 1425 /*----------------------------------------------------------------------------*/ 1426 ssize_t 1427 mtcp_read(mctx_t mctx, int sockid, char *buf, size_t len) 1428 { 1429 mtcp_manager_t mtcp; 1430 socket_map_t socket; 1431 tcp_stream *cur_stream; 1432 struct tcp_recv_vars *rcvvar; 1433 int event_remaining, merged_len; 1434 int ret; 1435 1436 mtcp = GetMTCPManager(mctx); 1437 if (!mtcp) { 1438 errno = EACCES; 1439 return -1; 1440 } 1441 1442 if (sockid < 0 || sockid >= g_config.mos->max_concurrency) { 1443 TRACE_API("Socket id %d out of range.\n", sockid); 1444 errno = EBADF; 1445 return -1; 1446 } 1447 1448 socket = &mtcp->smap[sockid]; 1449 if (socket->socktype == MOS_SOCK_UNUSED) { 1450 TRACE_API("Invalid socket id: %d\n", sockid); 1451 errno = EBADF; 1452 return -1; 1453 } 1454 1455 if (socket->socktype == MOS_SOCK_PIPE) { 1456 return PipeRead(mctx, sockid, buf, len); 1457 } 1458 1459 if (socket->socktype != MOS_SOCK_STREAM) { 1460 TRACE_API("Not an end socket. id: %d\n", sockid); 1461 errno = ENOTSOCK; 1462 return -1; 1463 } 1464 1465 /* stream should be in ESTABLISHED, FIN_WAIT_1, FIN_WAIT_2, CLOSE_WAIT */ 1466 cur_stream = socket->stream; 1467 if (!cur_stream || !cur_stream->rcvvar || !cur_stream->rcvvar->rcvbuf || 1468 !(cur_stream->state >= TCP_ST_ESTABLISHED && 1469 cur_stream->state <= TCP_ST_CLOSE_WAIT)) { 1470 errno = ENOTCONN; 1471 return -1; 1472 } 1473 1474 rcvvar = cur_stream->rcvvar; 1475 1476 #ifdef NEWRB 1477 merged_len = tcprb_cflen(rcvvar->rcvbuf); 1478 #else 1479 merged_len = rcvvar->rcvbuf->merged_len; 1480 #endif 1481 1482 /* if CLOSE_WAIT, return 0 if there is no payload */ 1483 if (cur_stream->state == TCP_ST_CLOSE_WAIT) { 1484 if (!rcvvar->rcvbuf) 1485 return 0; 1486 1487 if (merged_len == 0) 1488 return 0; 1489 } 1490 1491 /* return EAGAIN if no receive buffer */ 1492 if (socket->opts & MTCP_NONBLOCK) { 1493 if (!rcvvar->rcvbuf || merged_len == 0) { 1494 errno = EAGAIN; 1495 return -1; 1496 } 1497 } 1498 1499 SBUF_LOCK(&rcvvar->read_lock); 1500 1501 ret = CopyToUser(mtcp, cur_stream, buf, len); 1502 1503 #ifdef NEWRB 1504 merged_len = tcprb_cflen(rcvvar->rcvbuf); 1505 #else 1506 merged_len = rcvvar->rcvbuf->merged_len; 1507 #endif 1508 1509 event_remaining = FALSE; 1510 /* if there are remaining payload, generate EPOLLIN */ 1511 /* (may due to insufficient user buffer) */ 1512 if (socket->epoll & MOS_EPOLLIN) { 1513 if (!(socket->epoll & MOS_EPOLLET) && merged_len > 0) { 1514 event_remaining = TRUE; 1515 } 1516 } 1517 /* if waiting for close, notify it if no remaining data */ 1518 if (cur_stream->state == TCP_ST_CLOSE_WAIT && 1519 merged_len == 0 && ret > 0) { 1520 event_remaining = TRUE; 1521 } 1522 1523 SBUF_UNLOCK(&rcvvar->read_lock); 1524 1525 if (event_remaining) { 1526 if (socket->epoll) { 1527 AddEpollEvent(mtcp->ep, 1528 USR_SHADOW_EVENT_QUEUE, socket, MOS_EPOLLIN); 1529 } 1530 } 1531 1532 TRACE_API("Stream %d: mtcp_read() returning %d\n", cur_stream->id, ret); 1533 return ret; 1534 } 1535 /*----------------------------------------------------------------------------*/ 1536 ssize_t 1537 mtcp_readv(mctx_t mctx, int sockid, struct iovec *iov, int numIOV) 1538 { 1539 mtcp_manager_t mtcp; 1540 socket_map_t socket; 1541 tcp_stream *cur_stream; 1542 struct tcp_recv_vars *rcvvar; 1543 int ret, bytes_read, i; 1544 int event_remaining, merged_len; 1545 1546 mtcp = GetMTCPManager(mctx); 1547 if (!mtcp) { 1548 errno = EACCES; 1549 return -1; 1550 } 1551 1552 if (sockid < 0 || sockid >= g_config.mos->max_concurrency) { 1553 TRACE_API("Socket id %d out of range.\n", sockid); 1554 errno = EBADF; 1555 return -1; 1556 } 1557 1558 socket = &mtcp->smap[sockid]; 1559 if (socket->socktype == MOS_SOCK_UNUSED) { 1560 TRACE_API("Invalid socket id: %d\n", sockid); 1561 errno = EBADF; 1562 return -1; 1563 } 1564 1565 if (socket->socktype != MOS_SOCK_STREAM) { 1566 TRACE_API("Not an end socket. id: %d\n", sockid); 1567 errno = ENOTSOCK; 1568 return -1; 1569 } 1570 1571 /* stream should be in ESTABLISHED, FIN_WAIT_1, FIN_WAIT_2, CLOSE_WAIT */ 1572 cur_stream = socket->stream; 1573 if (!cur_stream || 1574 !(cur_stream->state >= TCP_ST_ESTABLISHED && 1575 cur_stream->state <= TCP_ST_CLOSE_WAIT)) { 1576 errno = ENOTCONN; 1577 return -1; 1578 } 1579 1580 rcvvar = cur_stream->rcvvar; 1581 1582 #ifdef NEWRB 1583 merged_len = tcprb_cflen(rcvvar->rcvbuf); 1584 #else 1585 merged_len = rcvvar->rcvbuf->merged_len; 1586 #endif 1587 1588 /* if CLOSE_WAIT, return 0 if there is no payload */ 1589 if (cur_stream->state == TCP_ST_CLOSE_WAIT) { 1590 if (!rcvvar->rcvbuf) 1591 return 0; 1592 1593 if (merged_len == 0) 1594 return 0; 1595 } 1596 1597 /* return EAGAIN if no receive buffer */ 1598 if (socket->opts & MTCP_NONBLOCK) { 1599 if (!rcvvar->rcvbuf || merged_len == 0) { 1600 errno = EAGAIN; 1601 return -1; 1602 } 1603 } 1604 1605 SBUF_LOCK(&rcvvar->read_lock); 1606 1607 /* read and store the contents to the vectored buffers */ 1608 bytes_read = 0; 1609 for (i = 0; i < numIOV; i++) { 1610 if (iov[i].iov_len <= 0) 1611 continue; 1612 1613 ret = CopyToUser(mtcp, cur_stream, iov[i].iov_base, iov[i].iov_len); 1614 if (ret <= 0) 1615 break; 1616 1617 bytes_read += ret; 1618 1619 if (ret < iov[i].iov_len) 1620 break; 1621 } 1622 1623 #ifdef NEWRB 1624 merged_len = tcprb_cflen(rcvvar->rcvbuf); 1625 #else 1626 merged_len = rcvvar->rcvbuf->merged_len; 1627 #endif 1628 1629 event_remaining = FALSE; 1630 /* if there are remaining payload, generate read event */ 1631 /* (may due to insufficient user buffer) */ 1632 if (socket->epoll & MOS_EPOLLIN) { 1633 if (!(socket->epoll & MOS_EPOLLET) && merged_len > 0) { 1634 event_remaining = TRUE; 1635 } 1636 } 1637 /* if waiting for close, notify it if no remaining data */ 1638 if (cur_stream->state == TCP_ST_CLOSE_WAIT && 1639 merged_len == 0 && bytes_read > 0) { 1640 event_remaining = TRUE; 1641 } 1642 1643 SBUF_UNLOCK(&rcvvar->read_lock); 1644 1645 if(event_remaining) { 1646 if (socket->epoll & MOS_EPOLLIN && !(socket->epoll & MOS_EPOLLET)) { 1647 AddEpollEvent(mtcp->ep, 1648 USR_SHADOW_EVENT_QUEUE, socket, MOS_EPOLLIN); 1649 } 1650 } 1651 1652 TRACE_API("Stream %d: mtcp_readv() returning %d\n", 1653 cur_stream->id, bytes_read); 1654 return bytes_read; 1655 } 1656 /*----------------------------------------------------------------------------*/ 1657 static inline int 1658 CopyFromUser(mtcp_manager_t mtcp, tcp_stream *cur_stream, char *buf, int len) 1659 { 1660 struct tcp_send_vars *sndvar = cur_stream->sndvar; 1661 int sndlen; 1662 int ret; 1663 1664 sndlen = MIN((int)sndvar->snd_wnd, len); 1665 if (sndlen <= 0) { 1666 errno = EAGAIN; 1667 return -1; 1668 } 1669 1670 /* allocate send buffer if not exist */ 1671 if (!sndvar->sndbuf) { 1672 sndvar->sndbuf = SBInit(mtcp->rbm_snd, sndvar->iss + 1); 1673 if (!sndvar->sndbuf) { 1674 cur_stream->close_reason = TCP_NO_MEM; 1675 /* notification may not required due to -1 return */ 1676 errno = ENOMEM; 1677 return -1; 1678 } 1679 } 1680 1681 ret = SBPut(mtcp->rbm_snd, sndvar->sndbuf, buf, sndlen); 1682 assert(ret == sndlen); 1683 sndvar->snd_wnd = sndvar->sndbuf->size - sndvar->sndbuf->len; 1684 if (ret <= 0) { 1685 TRACE_ERROR("SBPut failed. reason: %d (sndlen: %u, len: %u\n", 1686 ret, sndlen, sndvar->sndbuf->len); 1687 errno = EAGAIN; 1688 return -1; 1689 } 1690 1691 if (sndvar->snd_wnd <= 0) { 1692 TRACE_SNDBUF("%u Sending buffer became full!! snd_wnd: %u\n", 1693 cur_stream->id, sndvar->snd_wnd); 1694 } 1695 1696 return ret; 1697 } 1698 /*----------------------------------------------------------------------------*/ 1699 ssize_t 1700 mtcp_write(mctx_t mctx, int sockid, char *buf, size_t len) 1701 { 1702 mtcp_manager_t mtcp; 1703 socket_map_t socket; 1704 tcp_stream *cur_stream; 1705 struct tcp_send_vars *sndvar; 1706 int ret; 1707 1708 mtcp = GetMTCPManager(mctx); 1709 if (!mtcp) { 1710 errno = EACCES; 1711 return -1; 1712 } 1713 1714 if (sockid < 0 || sockid >= g_config.mos->max_concurrency) { 1715 TRACE_API("Socket id %d out of range.\n", sockid); 1716 errno = EBADF; 1717 return -1; 1718 } 1719 1720 socket = &mtcp->smap[sockid]; 1721 if (socket->socktype == MOS_SOCK_UNUSED) { 1722 TRACE_API("Invalid socket id: %d\n", sockid); 1723 errno = EBADF; 1724 return -1; 1725 } 1726 1727 if (socket->socktype == MOS_SOCK_PIPE) { 1728 return PipeWrite(mctx, sockid, buf, len); 1729 } 1730 1731 if (socket->socktype != MOS_SOCK_STREAM) { 1732 TRACE_API("Not an end socket. id: %d\n", sockid); 1733 errno = ENOTSOCK; 1734 return -1; 1735 } 1736 1737 cur_stream = socket->stream; 1738 if (!cur_stream || 1739 !(cur_stream->state == TCP_ST_ESTABLISHED || 1740 cur_stream->state == TCP_ST_CLOSE_WAIT)) { 1741 errno = ENOTCONN; 1742 return -1; 1743 } 1744 1745 if (len <= 0) { 1746 if (socket->opts & MTCP_NONBLOCK) { 1747 errno = EAGAIN; 1748 return -1; 1749 } else { 1750 return 0; 1751 } 1752 } 1753 1754 sndvar = cur_stream->sndvar; 1755 1756 SBUF_LOCK(&sndvar->write_lock); 1757 ret = CopyFromUser(mtcp, cur_stream, buf, len); 1758 1759 SBUF_UNLOCK(&sndvar->write_lock); 1760 1761 if (ret > 0 && !(sndvar->on_sendq || sndvar->on_send_list)) { 1762 SQ_LOCK(&mtcp->ctx->sendq_lock); 1763 sndvar->on_sendq = TRUE; 1764 StreamEnqueue(mtcp->sendq, cur_stream); /* this always success */ 1765 SQ_UNLOCK(&mtcp->ctx->sendq_lock); 1766 mtcp->wakeup_flag = TRUE; 1767 } 1768 1769 if (ret == 0 && (socket->opts & MTCP_NONBLOCK)) { 1770 ret = -1; 1771 errno = EAGAIN; 1772 } 1773 1774 /* if there are remaining sending buffer, generate write event */ 1775 if (sndvar->snd_wnd > 0) { 1776 if (socket->epoll & MOS_EPOLLOUT && !(socket->epoll & MOS_EPOLLET)) { 1777 AddEpollEvent(mtcp->ep, 1778 USR_SHADOW_EVENT_QUEUE, socket, MOS_EPOLLOUT); 1779 } 1780 } 1781 1782 TRACE_API("Stream %d: mtcp_write() returning %d\n", cur_stream->id, ret); 1783 return ret; 1784 } 1785 /*----------------------------------------------------------------------------*/ 1786 ssize_t 1787 mtcp_writev(mctx_t mctx, int sockid, struct iovec *iov, int numIOV) 1788 { 1789 mtcp_manager_t mtcp; 1790 socket_map_t socket; 1791 tcp_stream *cur_stream; 1792 struct tcp_send_vars *sndvar; 1793 int ret, to_write, i; 1794 1795 mtcp = GetMTCPManager(mctx); 1796 if (!mtcp) { 1797 errno = EACCES; 1798 return -1; 1799 } 1800 1801 if (sockid < 0 || sockid >= g_config.mos->max_concurrency) { 1802 TRACE_API("Socket id %d out of range.\n", sockid); 1803 errno = EBADF; 1804 return -1; 1805 } 1806 1807 socket = &mtcp->smap[sockid]; 1808 if (socket->socktype == MOS_SOCK_UNUSED) { 1809 TRACE_API("Invalid socket id: %d\n", sockid); 1810 errno = EBADF; 1811 return -1; 1812 } 1813 1814 if (socket->socktype != MOS_SOCK_STREAM) { 1815 TRACE_API("Not an end socket. id: %d\n", sockid); 1816 errno = ENOTSOCK; 1817 return -1; 1818 } 1819 1820 cur_stream = socket->stream; 1821 if (!cur_stream || 1822 !(cur_stream->state == TCP_ST_ESTABLISHED || 1823 cur_stream->state == TCP_ST_CLOSE_WAIT)) { 1824 errno = ENOTCONN; 1825 return -1; 1826 } 1827 1828 sndvar = cur_stream->sndvar; 1829 SBUF_LOCK(&sndvar->write_lock); 1830 1831 /* write from the vectored buffers */ 1832 to_write = 0; 1833 for (i = 0; i < numIOV; i++) { 1834 if (iov[i].iov_len <= 0) 1835 continue; 1836 1837 ret = CopyFromUser(mtcp, cur_stream, iov[i].iov_base, iov[i].iov_len); 1838 if (ret <= 0) 1839 break; 1840 1841 to_write += ret; 1842 1843 if (ret < iov[i].iov_len) 1844 break; 1845 } 1846 SBUF_UNLOCK(&sndvar->write_lock); 1847 1848 if (to_write > 0 && !(sndvar->on_sendq || sndvar->on_send_list)) { 1849 SQ_LOCK(&mtcp->ctx->sendq_lock); 1850 sndvar->on_sendq = TRUE; 1851 StreamEnqueue(mtcp->sendq, cur_stream); /* this always success */ 1852 SQ_UNLOCK(&mtcp->ctx->sendq_lock); 1853 mtcp->wakeup_flag = TRUE; 1854 } 1855 1856 if (to_write == 0 && (socket->opts & MTCP_NONBLOCK)) { 1857 to_write = -1; 1858 errno = EAGAIN; 1859 } 1860 1861 /* if there are remaining sending buffer, generate write event */ 1862 if (sndvar->snd_wnd > 0) { 1863 if (socket->epoll & MOS_EPOLLOUT && !(socket->epoll & MOS_EPOLLET)) { 1864 AddEpollEvent(mtcp->ep, 1865 USR_SHADOW_EVENT_QUEUE, socket, MOS_EPOLLOUT); 1866 } 1867 } 1868 1869 TRACE_API("Stream %d: mtcp_writev() returning %d\n", 1870 cur_stream->id, to_write); 1871 return to_write; 1872 } 1873 /*----------------------------------------------------------------------------*/ 1874 uint32_t 1875 mtcp_get_connection_cnt(mctx_t mctx) 1876 { 1877 mtcp_manager_t mtcp; 1878 mtcp = GetMTCPManager(mctx); 1879 if (!mtcp) { 1880 errno = EACCES; 1881 return -1; 1882 } 1883 1884 if (mtcp->num_msp > 0) 1885 return mtcp->flow_cnt / 2; 1886 else 1887 return mtcp->flow_cnt; 1888 } 1889 /*----------------------------------------------------------------------------*/ 1890