1 #include <sys/queue.h> 2 #include <sys/ioctl.h> 3 #include <limits.h> 4 #include <unistd.h> 5 #include <assert.h> 6 #include <string.h> 7 8 #ifdef DARWIN 9 #include <netinet/tcp.h> 10 #include <netinet/ip.h> 11 #include <netinet/if_ether.h> 12 #endif 13 14 #include "mtcp.h" 15 #include "mtcp_api.h" 16 #include "tcp_in.h" 17 #include "tcp_stream.h" 18 #include "tcp_out.h" 19 #include "ip_out.h" 20 #include "eventpoll.h" 21 #include "pipe.h" 22 #include "fhash.h" 23 #include "addr_pool.h" 24 #include "rss.h" 25 #include "config.h" 26 #include "debug.h" 27 #include "eventpoll.h" 28 #include "mos_api.h" 29 #include "tcp_rb.h" 30 31 #define MAX(a, b) ((a)>(b)?(a):(b)) 32 #define MIN(a, b) ((a)<(b)?(a):(b)) 33 34 /*----------------------------------------------------------------------------*/ 35 /** Stop monitoring the socket! (function prototype) 36 * @param [in] mctx: mtcp context 37 * @param [in] sock: monitoring stream socket id 38 * @param [in] side: side of monitoring (client side, server side or both) 39 * 40 * This function is now DEPRECATED and is only used within mOS core... 41 */ 42 int 43 mtcp_cb_stop(mctx_t mctx, int sock, int side); 44 /*----------------------------------------------------------------------------*/ 45 /** Reset the connection (send RST packets to both sides) 46 * (We need to decide the API for this.) 47 */ 48 //int 49 //mtcp_cb_reset(mctx_t mctx, int sock, int side); 50 /*----------------------------------------------------------------------------*/ 51 inline mtcp_manager_t 52 GetMTCPManager(mctx_t mctx) 53 { 54 if (!mctx) { 55 errno = EACCES; 56 return NULL; 57 } 58 59 if (mctx->cpu < 0 || mctx->cpu >= num_cpus) { 60 errno = EINVAL; 61 return NULL; 62 } 63 64 if (!g_mtcp[mctx->cpu] || g_mtcp[mctx->cpu]->ctx->done || g_mtcp[mctx->cpu]->ctx->exit) { 65 errno = EPERM; 66 return NULL; 67 } 68 69 return g_mtcp[mctx->cpu]; 70 } 71 /*----------------------------------------------------------------------------*/ 72 inline int 73 GetSocketError(socket_map_t socket, void *optval, socklen_t *optlen) 74 { 75 tcp_stream *cur_stream; 76 77 if (!socket->stream) { 78 errno = EBADF; 79 return -1; 80 } 81 82 cur_stream = socket->stream; 83 if (cur_stream->state == TCP_ST_CLOSED_RSVD) { 84 if (cur_stream->close_reason == TCP_TIMEDOUT || 85 cur_stream->close_reason == TCP_CONN_FAIL || 86 cur_stream->close_reason == TCP_CONN_LOST) { 87 *(int *)optval = ETIMEDOUT; 88 *optlen = sizeof(int); 89 90 return 0; 91 } 92 } 93 94 if (cur_stream->state == TCP_ST_CLOSE_WAIT || 95 cur_stream->state == TCP_ST_CLOSED_RSVD) { 96 if (cur_stream->close_reason == TCP_RESET) { 97 *(int *)optval = ECONNRESET; 98 *optlen = sizeof(int); 99 100 return 0; 101 } 102 } 103 104 /* 105 * `base case`: If socket sees no so_error, then 106 * this also means close_reason will always be 107 * TCP_NOT_CLOSED. 108 */ 109 if (cur_stream->close_reason == TCP_NOT_CLOSED) { 110 *(int *)optval = 0; 111 *optlen = sizeof(int); 112 113 return 0; 114 } 115 116 errno = ENOSYS; 117 return -1; 118 } 119 /*----------------------------------------------------------------------------*/ 120 int 121 mtcp_getsockopt(mctx_t mctx, int sockid, int level, 122 int optname, void *optval, socklen_t *optlen) 123 { 124 mtcp_manager_t mtcp; 125 socket_map_t socket; 126 127 mtcp = GetMTCPManager(mctx); 128 if (!mtcp) { 129 errno = EACCES; 130 return -1; 131 } 132 133 if (sockid < 0 || sockid >= g_config.mos->max_concurrency) { 134 TRACE_API("Socket id %d out of range.\n", sockid); 135 errno = EBADF; 136 return -1; 137 } 138 139 switch (level) { 140 case SOL_SOCKET: 141 socket = &mtcp->smap[sockid]; 142 if (socket->socktype == MOS_SOCK_UNUSED) { 143 TRACE_API("Invalid socket id: %d\n", sockid); 144 errno = EBADF; 145 return -1; 146 } 147 148 if (socket->socktype != MOS_SOCK_STREAM_LISTEN && 149 socket->socktype != MOS_SOCK_STREAM) { 150 TRACE_API("Invalid socket id: %d\n", sockid); 151 errno = ENOTSOCK; 152 return -1; 153 } 154 155 if (optname == SO_ERROR) { 156 if (socket->socktype == MOS_SOCK_STREAM) { 157 return GetSocketError(socket, optval, optlen); 158 } 159 } 160 break; 161 case SOL_MONSOCKET: 162 /* check if the calling thread is in MOS context */ 163 if (mtcp->ctx->thread != pthread_self()) { 164 errno = EPERM; 165 return -1; 166 } 167 /* 168 * All options will only work for active 169 * monitor stream sockets 170 */ 171 socket = &mtcp->msmap[sockid]; 172 if (socket->socktype != MOS_SOCK_MONITOR_STREAM_ACTIVE) { 173 TRACE_API("Invalid socket id: %d\n", sockid); 174 errno = ENOTSOCK; 175 return -1; 176 } 177 178 switch (optname) { 179 case MOS_FRAGINFO_CLIBUF: 180 return GetFragInfo(socket, MOS_SIDE_CLI, optval, optlen); 181 case MOS_FRAGINFO_SVRBUF: 182 return GetFragInfo(socket, MOS_SIDE_SVR, optval, optlen); 183 case MOS_INFO_CLIBUF: 184 return GetBufInfo(socket, MOS_SIDE_CLI, optval, optlen); 185 case MOS_INFO_SVRBUF: 186 return GetBufInfo(socket, MOS_SIDE_SVR, optval, optlen); 187 case MOS_TCP_STATE_CLI: 188 return GetTCPState(socket->monitor_stream->stream, MOS_SIDE_CLI, 189 optval, optlen); 190 case MOS_TCP_STATE_SVR: 191 return GetTCPState(socket->monitor_stream->stream, MOS_SIDE_SVR, 192 optval, optlen); 193 case MOS_TIMESTAMP: 194 return GetLastTimestamp(socket->monitor_stream->stream, 195 (uint32_t *)optval, 196 optlen); 197 default: 198 TRACE_API("can't recognize the optname=%d\n", optname); 199 assert(0); 200 } 201 break; 202 } 203 errno = ENOSYS; 204 return -1; 205 } 206 /*----------------------------------------------------------------------------*/ 207 int 208 mtcp_setsockopt(mctx_t mctx, int sockid, int level, 209 int optname, const void *optval, socklen_t optlen) 210 { 211 mtcp_manager_t mtcp; 212 socket_map_t socket; 213 tcprb_t *rb; 214 215 mtcp = GetMTCPManager(mctx); 216 if (!mtcp) { 217 errno = EACCES; 218 return -1; 219 } 220 221 if (sockid < 0 || sockid >= g_config.mos->max_concurrency) { 222 TRACE_API("Socket id %d out of range.\n", sockid); 223 errno = EBADF; 224 return -1; 225 } 226 227 switch (level) { 228 case SOL_SOCKET: 229 socket = &mtcp->smap[sockid]; 230 if (socket->socktype == MOS_SOCK_UNUSED) { 231 TRACE_API("Invalid socket id: %d\n", sockid); 232 errno = EBADF; 233 return -1; 234 } 235 236 if (socket->socktype != MOS_SOCK_STREAM_LISTEN && 237 socket->socktype != MOS_SOCK_STREAM) { 238 TRACE_API("Invalid socket id: %d\n", sockid); 239 errno = ENOTSOCK; 240 return -1; 241 } 242 break; 243 case SOL_MONSOCKET: 244 socket = &mtcp->msmap[sockid]; 245 /* 246 * checking of calling thread to be in MOS context is 247 * disabled since both optnames can be called from 248 * `application' context (on passive sockets) 249 */ 250 /* 251 * if (mtcp->ctx->thread != pthread_self()) 252 * return -1; 253 */ 254 255 switch (optname) { 256 case MOS_CLIBUF: 257 #if 0 258 if (socket->socktype != MOS_SOCK_MONITOR_STREAM_ACTIVE) { 259 errno = EBADF; 260 return -1; 261 } 262 #endif 263 #ifdef DISABLE_DYN_RESIZE 264 if (*(int *)optval != 0) 265 return -1; 266 if (socket->socktype == MOS_SOCK_MONITOR_STREAM_ACTIVE) { 267 rb = (socket->monitor_stream->stream->side == MOS_SIDE_CLI) ? 268 socket->monitor_stream->stream->rcvvar->rcvbuf : 269 socket->monitor_stream->stream->pair_stream->rcvvar->rcvbuf; 270 if (rb) { 271 tcprb_resize_meta(rb, 0); 272 tcprb_resize(rb, 0); 273 } 274 } 275 return DisableBuf(socket, MOS_SIDE_CLI); 276 #else 277 rb = (socket->monitor_stream->stream->side == MOS_SIDE_CLI) ? 278 socket->monitor_stream->stream->rcvvar->rcvbuf : 279 socket->monitor_stream->stream->pair_stream->rcvvar->rcvbuf; 280 if (tcprb_resize_meta(rb, *(int *)optval) < 0) 281 return -1; 282 return tcprb_resize(rb, 283 (((int)rb->metalen - 1) / UNITBUFSIZE + 1) * UNITBUFSIZE); 284 #endif 285 case MOS_SVRBUF: 286 #if 0 287 if (socket->socktype != MOS_SOCK_MONITOR_STREAM_ACTIVE) { 288 errno = EBADF; 289 return -1; 290 } 291 #endif 292 #ifdef DISABLE_DYN_RESIZE 293 if (*(int *)optval != 0) 294 return -1; 295 if (socket->socktype == MOS_SOCK_MONITOR_STREAM_ACTIVE) { 296 rb = (socket->monitor_stream->stream->side == MOS_SIDE_SVR) ? 297 socket->monitor_stream->stream->rcvvar->rcvbuf : 298 socket->monitor_stream->stream->pair_stream->rcvvar->rcvbuf; 299 if (rb) { 300 tcprb_resize_meta(rb, 0); 301 tcprb_resize(rb, 0); 302 } 303 } 304 return DisableBuf(socket, MOS_SIDE_SVR); 305 #else 306 rb = (socket->monitor_stream->stream->side == MOS_SIDE_SVR) ? 307 socket->monitor_stream->stream->rcvvar->rcvbuf : 308 socket->monitor_stream->stream->pair_stream->rcvvar->rcvbuf; 309 if (tcprb_resize_meta(rb, *(int *)optval) < 0) 310 return -1; 311 return tcprb_resize(rb, 312 (((int)rb->metalen - 1) / UNITBUFSIZE + 1) * UNITBUFSIZE); 313 #endif 314 case MOS_FRAG_CLIBUF: 315 #if 0 316 if (socket->socktype != MOS_SOCK_MONITOR_STREAM_ACTIVE) { 317 errno = EBADF; 318 return -1; 319 } 320 #endif 321 #ifdef DISABLE_DYN_RESIZE 322 if (*(int *)optval != 0) 323 return -1; 324 if (socket->socktype == MOS_SOCK_MONITOR_STREAM_ACTIVE) { 325 rb = (socket->monitor_stream->stream->side == MOS_SIDE_CLI) ? 326 socket->monitor_stream->stream->rcvvar->rcvbuf : 327 socket->monitor_stream->stream->pair_stream->rcvvar->rcvbuf; 328 if (rb) 329 tcprb_resize(rb, 0); 330 } 331 return 0; 332 #else 333 rb = (socket->monitor_stream->stream->side == MOS_SIDE_CLI) ? 334 socket->monitor_stream->stream->rcvvar->rcvbuf : 335 socket->monitor_stream->stream->pair_stream->rcvvar->rcvbuf; 336 if (rb->len == 0) 337 return tcprb_resize_meta(rb, *(int *)optval); 338 else 339 return -1; 340 #endif 341 case MOS_FRAG_SVRBUF: 342 #if 0 343 if (socket->socktype != MOS_SOCK_MONITOR_STREAM_ACTIVE) { 344 errno = EBADF; 345 return -1; 346 } 347 #endif 348 #ifdef DISABLE_DYN_RESIZE 349 if (*(int *)optval != 0) 350 return -1; 351 if (socket->socktype == MOS_SOCK_MONITOR_STREAM_ACTIVE) { 352 rb = (socket->monitor_stream->stream->side == MOS_SIDE_SVR) ? 353 socket->monitor_stream->stream->rcvvar->rcvbuf : 354 socket->monitor_stream->stream->pair_stream->rcvvar->rcvbuf; 355 if (rb) 356 tcprb_resize(rb, 0); 357 } 358 return 0; 359 #else 360 rb = (socket->monitor_stream->stream->side == MOS_SIDE_SVR) ? 361 socket->monitor_stream->stream->rcvvar->rcvbuf : 362 socket->monitor_stream->stream->pair_stream->rcvvar->rcvbuf; 363 if (rb->len == 0) 364 return tcprb_resize_meta(rb, *(int *)optval); 365 else 366 return -1; 367 #endif 368 case MOS_MONLEVEL: 369 #ifdef OLD_API 370 assert(*(int *)optval == MOS_NO_CLIBUF || 371 *(int *)optval == MOS_NO_SVRBUF); 372 return DisableBuf(socket, 373 (*(int *)optval == MOS_NO_CLIBUF) ? 374 MOS_SIDE_CLI : MOS_SIDE_SVR); 375 #endif 376 case MOS_SEQ_REMAP: 377 return TcpSeqChange(socket, 378 (uint32_t)((seq_remap_info *)optval)->seq_off, 379 ((seq_remap_info *)optval)->side, 380 mtcp->pctx->p.seq); 381 case MOS_STOP_MON: 382 return mtcp_cb_stop(mctx, sockid, *(int *)optval); 383 default: 384 TRACE_API("invalid optname=%d\n", optname); 385 assert(0); 386 } 387 break; 388 } 389 390 errno = ENOSYS; 391 return -1; 392 } 393 /*----------------------------------------------------------------------------*/ 394 int 395 mtcp_setsock_nonblock(mctx_t mctx, int sockid) 396 { 397 mtcp_manager_t mtcp; 398 399 mtcp = GetMTCPManager(mctx); 400 if (!mtcp) { 401 errno = EACCES; 402 return -1; 403 } 404 405 if (sockid < 0 || sockid >= g_config.mos->max_concurrency) { 406 TRACE_API("Socket id %d out of range.\n", sockid); 407 errno = EBADF; 408 return -1; 409 } 410 411 if (mtcp->smap[sockid].socktype == MOS_SOCK_UNUSED) { 412 TRACE_API("Invalid socket id: %d\n", sockid); 413 errno = EBADF; 414 return -1; 415 } 416 417 mtcp->smap[sockid].opts |= MTCP_NONBLOCK; 418 419 return 0; 420 } 421 /*----------------------------------------------------------------------------*/ 422 int 423 mtcp_ioctl(mctx_t mctx, int sockid, int request, void *argp) 424 { 425 mtcp_manager_t mtcp; 426 socket_map_t socket; 427 428 mtcp = GetMTCPManager(mctx); 429 if (!mtcp) { 430 errno = EACCES; 431 return -1; 432 } 433 434 if (sockid < 0 || sockid >= g_config.mos->max_concurrency) { 435 TRACE_API("Socket id %d out of range.\n", sockid); 436 errno = EBADF; 437 return -1; 438 } 439 440 /* only support stream socket */ 441 socket = &mtcp->smap[sockid]; 442 443 if (socket->socktype != MOS_SOCK_STREAM_LISTEN && 444 socket->socktype != MOS_SOCK_STREAM) { 445 TRACE_API("Invalid socket id: %d\n", sockid); 446 errno = EBADF; 447 return -1; 448 } 449 450 if (!argp) { 451 errno = EFAULT; 452 return -1; 453 } 454 455 if (request == FIONREAD) { 456 tcp_stream *cur_stream; 457 tcprb_t *rbuf; 458 459 cur_stream = socket->stream; 460 if (!cur_stream) { 461 errno = EBADF; 462 return -1; 463 } 464 465 rbuf = cur_stream->rcvvar->rcvbuf; 466 *(int *)argp = (rbuf) ? tcprb_cflen(rbuf) : 0; 467 468 } else if (request == FIONBIO) { 469 /* 470 * sockets can only be set to blocking/non-blocking 471 * modes during initialization 472 */ 473 if ((*(int *)argp)) 474 mtcp->smap[sockid].opts |= MTCP_NONBLOCK; 475 else 476 mtcp->smap[sockid].opts &= ~MTCP_NONBLOCK; 477 } else { 478 errno = EINVAL; 479 return -1; 480 } 481 482 return 0; 483 } 484 /*----------------------------------------------------------------------------*/ 485 static int 486 mtcp_monitor(mctx_t mctx, socket_map_t sock) 487 { 488 mtcp_manager_t mtcp; 489 struct mon_listener *monitor; 490 int sockid = sock->id; 491 492 mtcp = GetMTCPManager(mctx); 493 if (!mtcp) { 494 errno = EACCES; 495 return -1; 496 } 497 498 if (sockid < 0 || sockid >= g_config.mos->max_concurrency) { 499 TRACE_API("Socket id %d out of range.\n", sockid); 500 errno = EBADF; 501 return -1; 502 } 503 504 if (mtcp->msmap[sockid].socktype == MOS_SOCK_UNUSED) { 505 TRACE_API("Invalid socket id: %d\n", sockid); 506 errno = EBADF; 507 return -1; 508 } 509 510 if (!(mtcp->msmap[sockid].socktype == MOS_SOCK_MONITOR_STREAM || 511 mtcp->msmap[sockid].socktype == MOS_SOCK_MONITOR_RAW)) { 512 TRACE_API("Not a monitor socket. id: %d\n", sockid); 513 errno = ENOTSOCK; 514 return -1; 515 } 516 517 monitor = (struct mon_listener *)calloc(1, sizeof(struct mon_listener)); 518 if (!monitor) { 519 /* errno set from the malloc() */ 520 errno = ENOMEM; 521 return -1; 522 } 523 524 /* create a monitor-specific event queue */ 525 monitor->eq = CreateEventQueue(g_config.mos->max_concurrency); 526 if (!monitor->eq) { 527 TRACE_API("Can't create event queue (concurrency: %d) for " 528 "monitor read event registrations!\n", 529 g_config.mos->max_concurrency); 530 free(monitor); 531 errno = ENOMEM; 532 return -1; 533 } 534 535 /* set monitor-related basic parameters */ 536 #ifndef NEWEV 537 monitor->ude_id = UDE_OFFSET; 538 #endif 539 monitor->socket = sock; 540 monitor->client_buf_mgmt = monitor->server_buf_mgmt = BUFMGMT_FULL; 541 542 /* perform both sides monitoring by default */ 543 monitor->client_mon = monitor->server_mon = 1; 544 545 /* add monitor socket to the monitor list */ 546 TAILQ_INSERT_TAIL(&mtcp->monitors, monitor, link); 547 548 mtcp->msmap[sockid].monitor_listener = monitor; 549 550 return 0; 551 } 552 /*----------------------------------------------------------------------------*/ 553 int 554 mtcp_socket(mctx_t mctx, int domain, int type, int protocol) 555 { 556 mtcp_manager_t mtcp; 557 socket_map_t socket; 558 559 mtcp = GetMTCPManager(mctx); 560 if (!mtcp) { 561 errno = EACCES; 562 return -1; 563 } 564 565 if (domain != AF_INET) { 566 errno = EAFNOSUPPORT; 567 return -1; 568 } 569 570 if (type == SOCK_STREAM) { 571 type = MOS_SOCK_STREAM; 572 } else if (type == MOS_SOCK_MONITOR_STREAM || 573 type == MOS_SOCK_MONITOR_RAW) { 574 /* do nothing for the time being */ 575 } else { 576 /* Not supported type */ 577 errno = EINVAL; 578 return -1; 579 } 580 581 socket = AllocateSocket(mctx, type); 582 if (!socket) { 583 errno = ENFILE; 584 return -1; 585 } 586 587 if (type == MOS_SOCK_MONITOR_STREAM || 588 type == MOS_SOCK_MONITOR_RAW) { 589 mtcp_manager_t mtcp = GetMTCPManager(mctx); 590 if (!mtcp) { 591 errno = EACCES; 592 return -1; 593 } 594 mtcp_monitor(mctx, socket); 595 #ifdef NEWEV 596 socket->monitor_listener->stree_dontcare = NULL; 597 socket->monitor_listener->stree_pre_rcv = NULL; 598 socket->monitor_listener->stree_post_snd = NULL; 599 #else 600 InitEvB(mtcp, &socket->monitor_listener->dontcare_evb); 601 InitEvB(mtcp, &socket->monitor_listener->pre_tcp_evb); 602 InitEvB(mtcp, &socket->monitor_listener->post_tcp_evb); 603 #endif 604 } 605 606 return socket->id; 607 } 608 /*----------------------------------------------------------------------------*/ 609 int 610 mtcp_bind(mctx_t mctx, int sockid, 611 const struct sockaddr *addr, socklen_t addrlen) 612 { 613 mtcp_manager_t mtcp; 614 struct sockaddr_in *addr_in; 615 616 mtcp = GetMTCPManager(mctx); 617 if (!mtcp) { 618 errno = EACCES; 619 return -1; 620 } 621 622 if (sockid < 0 || sockid >= g_config.mos->max_concurrency) { 623 TRACE_API("Socket id %d out of range.\n", sockid); 624 errno = EBADF; 625 return -1; 626 } 627 628 if (mtcp->smap[sockid].socktype == MOS_SOCK_UNUSED) { 629 TRACE_API("Invalid socket id: %d\n", sockid); 630 errno = EBADF; 631 return -1; 632 } 633 634 if (mtcp->smap[sockid].socktype != MOS_SOCK_STREAM && 635 mtcp->smap[sockid].socktype != MOS_SOCK_STREAM_LISTEN) { 636 TRACE_API("Not a stream socket id: %d\n", sockid); 637 errno = ENOTSOCK; 638 return -1; 639 } 640 641 if (!addr) { 642 TRACE_API("Socket %d: empty address!\n", sockid); 643 errno = EINVAL; 644 return -1; 645 } 646 647 if (mtcp->smap[sockid].opts & MTCP_ADDR_BIND) { 648 TRACE_API("Socket %d: adress already bind for this socket.\n", sockid); 649 errno = EINVAL; 650 return -1; 651 } 652 653 /* we only allow bind() for AF_INET address */ 654 if (addr->sa_family != AF_INET || addrlen < sizeof(struct sockaddr_in)) { 655 TRACE_API("Socket %d: invalid argument!\n", sockid); 656 errno = EINVAL; 657 return -1; 658 } 659 660 if (mtcp->listener) { 661 TRACE_API("Address already bound!\n"); 662 errno = EINVAL; 663 return -1; 664 } 665 addr_in = (struct sockaddr_in *)addr; 666 mtcp->smap[sockid].saddr = *addr_in; 667 mtcp->smap[sockid].opts |= MTCP_ADDR_BIND; 668 669 return 0; 670 } 671 /*----------------------------------------------------------------------------*/ 672 int 673 mtcp_listen(mctx_t mctx, int sockid, int backlog) 674 { 675 mtcp_manager_t mtcp; 676 struct tcp_listener *listener; 677 678 mtcp = GetMTCPManager(mctx); 679 if (!mtcp) { 680 errno = EACCES; 681 return -1; 682 } 683 684 if (sockid < 0 || sockid >= g_config.mos->max_concurrency) { 685 TRACE_API("Socket id %d out of range.\n", sockid); 686 errno = EBADF; 687 return -1; 688 } 689 690 if (mtcp->smap[sockid].socktype == MOS_SOCK_UNUSED) { 691 TRACE_API("Invalid socket id: %d\n", sockid); 692 errno = EBADF; 693 return -1; 694 } 695 696 if (mtcp->smap[sockid].socktype == MOS_SOCK_STREAM) { 697 mtcp->smap[sockid].socktype = MOS_SOCK_STREAM_LISTEN; 698 } 699 700 if (mtcp->smap[sockid].socktype != MOS_SOCK_STREAM_LISTEN) { 701 TRACE_API("Not a listening socket. id: %d\n", sockid); 702 errno = ENOTSOCK; 703 return -1; 704 } 705 706 if (backlog <= 0 || backlog > g_config.mos->max_concurrency) { 707 errno = EINVAL; 708 return -1; 709 } 710 711 listener = (struct tcp_listener *)calloc(1, sizeof(struct tcp_listener)); 712 if (!listener) { 713 /* errno set from the malloc() */ 714 errno = ENOMEM; 715 return -1; 716 } 717 718 listener->sockid = sockid; 719 listener->backlog = backlog; 720 listener->socket = &mtcp->smap[sockid]; 721 722 if (pthread_cond_init(&listener->accept_cond, NULL)) { 723 perror("pthread_cond_init of ctx->accept_cond\n"); 724 /* errno set by pthread_cond_init() */ 725 return -1; 726 } 727 if (pthread_mutex_init(&listener->accept_lock, NULL)) { 728 perror("pthread_mutex_init of ctx->accept_lock\n"); 729 /* errno set by pthread_mutex_init() */ 730 return -1; 731 } 732 733 listener->acceptq = CreateStreamQueue(backlog); 734 if (!listener->acceptq) { 735 errno = ENOMEM; 736 return -1; 737 } 738 739 mtcp->smap[sockid].listener = listener; 740 mtcp->listener = listener; 741 742 return 0; 743 } 744 /*----------------------------------------------------------------------------*/ 745 int 746 mtcp_accept(mctx_t mctx, int sockid, struct sockaddr *addr, socklen_t *addrlen) 747 { 748 mtcp_manager_t mtcp; 749 struct tcp_listener *listener; 750 socket_map_t socket; 751 tcp_stream *accepted = NULL; 752 753 mtcp = GetMTCPManager(mctx); 754 if (!mtcp) { 755 errno = EACCES; 756 return -1; 757 } 758 759 if (sockid < 0 || sockid >= g_config.mos->max_concurrency) { 760 TRACE_API("Socket id %d out of range.\n", sockid); 761 errno = EBADF; 762 return -1; 763 } 764 765 /* requires listening socket */ 766 if (mtcp->smap[sockid].socktype != MOS_SOCK_STREAM_LISTEN) { 767 errno = EINVAL; 768 return -1; 769 } 770 771 listener = mtcp->smap[sockid].listener; 772 773 /* dequeue from the acceptq without lock first */ 774 /* if nothing there, acquire lock and cond_wait */ 775 accepted = StreamDequeue(listener->acceptq); 776 if (!accepted) { 777 if (listener->socket->opts & MTCP_NONBLOCK) { 778 errno = EAGAIN; 779 return -1; 780 781 } else { 782 pthread_mutex_lock(&listener->accept_lock); 783 while ((accepted = StreamDequeue(listener->acceptq)) == NULL) { 784 pthread_cond_wait(&listener->accept_cond, &listener->accept_lock); 785 786 if (mtcp->ctx->done || mtcp->ctx->exit) { 787 pthread_mutex_unlock(&listener->accept_lock); 788 errno = EINTR; 789 return -1; 790 } 791 } 792 pthread_mutex_unlock(&listener->accept_lock); 793 } 794 } 795 796 if (!accepted) { 797 TRACE_ERROR("[NEVER HAPPEN] Empty accept queue!\n"); 798 } 799 800 if (!accepted->socket) { 801 socket = AllocateSocket(mctx, MOS_SOCK_STREAM); 802 if (!socket) { 803 TRACE_ERROR("Failed to create new socket!\n"); 804 /* TODO: destroy the stream */ 805 errno = ENFILE; 806 return -1; 807 } 808 socket->stream = accepted; 809 accepted->socket = socket; 810 /* if monitor is enabled, complete the socket assignment */ 811 if (socket->stream->pair_stream != NULL) 812 socket->stream->pair_stream->socket = socket; 813 } 814 815 TRACE_API("Stream %d accepted.\n", accepted->id); 816 817 if (addr && addrlen) { 818 struct sockaddr_in *addr_in = (struct sockaddr_in *)addr; 819 addr_in->sin_family = AF_INET; 820 addr_in->sin_port = accepted->dport; 821 addr_in->sin_addr.s_addr = accepted->daddr; 822 *addrlen = sizeof(struct sockaddr_in); 823 } 824 825 return accepted->socket->id; 826 } 827 /*----------------------------------------------------------------------------*/ 828 int 829 mtcp_init_rss(mctx_t mctx, in_addr_t saddr_base, int num_addr, 830 in_addr_t daddr, in_addr_t dport) 831 { 832 mtcp_manager_t mtcp; 833 addr_pool_t ap; 834 835 mtcp = GetMTCPManager(mctx); 836 if (!mtcp) { 837 errno = EACCES; 838 return -1; 839 } 840 841 if (saddr_base == INADDR_ANY) { 842 int nif_out; 843 844 /* for the INADDR_ANY, find the output interface for the destination 845 and set the saddr_base as the ip address of the output interface */ 846 nif_out = GetOutputInterface(daddr); 847 saddr_base = g_config.mos->netdev_table->ent[nif_out]->ip_addr; 848 } 849 850 ap = CreateAddressPoolPerCore(mctx->cpu, num_cpus, 851 saddr_base, num_addr, daddr, dport); 852 if (!ap) { 853 errno = ENOMEM; 854 return -1; 855 } 856 857 mtcp->ap = ap; 858 859 return 0; 860 } 861 /*----------------------------------------------------------------------------*/ 862 int 863 eval_bpf_5tuple(struct sfbpf_program fcode, 864 in_addr_t saddr, in_port_t sport, 865 in_addr_t daddr, in_port_t dport) { 866 uint8_t buf[TOTAL_TCP_HEADER_LEN]; 867 struct ethhdr *ethh; 868 struct iphdr *iph; 869 struct tcphdr *tcph; 870 871 ethh = (struct ethhdr *)buf; 872 ethh->h_proto = htons(ETH_P_IP); 873 iph = (struct iphdr *)(ethh + 1); 874 iph->ihl = IP_HEADER_LEN >> 2; 875 iph->version = 4; 876 iph->tos = 0; 877 iph->tot_len = htons(IP_HEADER_LEN + TCP_HEADER_LEN); 878 iph->id = htons(0); 879 iph->protocol = IPPROTO_TCP; 880 iph->saddr = saddr; 881 iph->daddr = daddr; 882 iph->check = 0; 883 tcph = (struct tcphdr *)(iph + 1); 884 tcph->source = sport; 885 tcph->dest = dport; 886 887 return EVAL_BPFFILTER(fcode, (uint8_t *)iph - sizeof(struct ethhdr), 888 TOTAL_TCP_HEADER_LEN); 889 } 890 /*----------------------------------------------------------------------------*/ 891 int 892 mtcp_connect(mctx_t mctx, int sockid, 893 const struct sockaddr *addr, socklen_t addrlen) 894 { 895 mtcp_manager_t mtcp; 896 socket_map_t socket; 897 tcp_stream *cur_stream; 898 struct sockaddr_in *addr_in; 899 in_addr_t dip; 900 in_port_t dport; 901 int is_dyn_bound = FALSE; 902 int ret; 903 int cnt_match = 0; 904 struct mon_listener *walk; 905 struct sfbpf_program fcode; 906 907 cur_stream = NULL; 908 mtcp = GetMTCPManager(mctx); 909 if (!mtcp) { 910 errno = EACCES; 911 return -1; 912 } 913 914 if (sockid < 0 || sockid >= g_config.mos->max_concurrency) { 915 TRACE_API("Socket id %d out of range.\n", sockid); 916 errno = EBADF; 917 return -1; 918 } 919 920 if (mtcp->smap[sockid].socktype == MOS_SOCK_UNUSED) { 921 TRACE_API("Invalid socket id: %d\n", sockid); 922 errno = EBADF; 923 return -1; 924 } 925 926 if (mtcp->smap[sockid].socktype != MOS_SOCK_STREAM) { 927 TRACE_API("Not an end socket. id: %d\n", sockid); 928 errno = ENOTSOCK; 929 return -1; 930 } 931 932 if (!addr) { 933 TRACE_API("Socket %d: empty address!\n", sockid); 934 errno = EFAULT; 935 return -1; 936 } 937 938 /* we only allow bind() for AF_INET address */ 939 if (addr->sa_family != AF_INET || addrlen < sizeof(struct sockaddr_in)) { 940 TRACE_API("Socket %d: invalid argument!\n", sockid); 941 errno = EAFNOSUPPORT; 942 return -1; 943 } 944 945 socket = &mtcp->smap[sockid]; 946 if (socket->stream) { 947 TRACE_API("Socket %d: stream already exist!\n", sockid); 948 if (socket->stream->state >= TCP_ST_ESTABLISHED) { 949 errno = EISCONN; 950 } else { 951 errno = EALREADY; 952 } 953 return -1; 954 } 955 956 addr_in = (struct sockaddr_in *)addr; 957 dip = addr_in->sin_addr.s_addr; 958 dport = addr_in->sin_port; 959 960 /* address binding */ 961 if (socket->opts & MTCP_ADDR_BIND && 962 socket->saddr.sin_port != INPORT_ANY && 963 socket->saddr.sin_addr.s_addr != INADDR_ANY) { 964 int rss_core; 965 966 rss_core = GetRSSCPUCore(socket->saddr.sin_addr.s_addr, dip, 967 socket->saddr.sin_port, dport, num_queues); 968 969 if (rss_core != mctx->cpu) { 970 errno = EINVAL; 971 return -1; 972 } 973 } else { 974 if (mtcp->ap) { 975 ret = FetchAddress(mtcp->ap, 976 mctx->cpu, num_queues, addr_in, &socket->saddr); 977 } else { 978 ret = FetchAddress(ap, 979 mctx->cpu, num_queues, addr_in, &socket->saddr); 980 } 981 if (ret < 0) { 982 errno = EAGAIN; 983 return -1; 984 } 985 socket->opts |= MTCP_ADDR_BIND; 986 is_dyn_bound = TRUE; 987 } 988 989 cnt_match = 0; 990 if (mtcp->num_msp > 0) { 991 TAILQ_FOREACH(walk, &mtcp->monitors, link) { 992 fcode = walk->stream_syn_fcode; 993 if (!(ISSET_BPFFILTER(fcode) && 994 eval_bpf_5tuple(fcode, socket->saddr.sin_addr.s_addr, 995 socket->saddr.sin_port, 996 dip, dport) == 0)) { 997 walk->is_stream_syn_filter_hit = 1; // set the 'filter hit' flag to 1 998 cnt_match++; 999 } 1000 } 1001 } 1002 1003 if (mtcp->num_msp > 0 && cnt_match > 0) { 1004 /* 150820 dhkim: XXX: embedded mode is not verified */ 1005 #if 1 1006 cur_stream = CreateClientTCPStream(mtcp, socket, 1007 STREAM_TYPE(MOS_SOCK_STREAM) | 1008 STREAM_TYPE(MOS_SOCK_MONITOR_STREAM_ACTIVE), 1009 socket->saddr.sin_addr.s_addr, 1010 socket->saddr.sin_port, dip, dport, NULL); 1011 #else 1012 cur_stream = CreateDualTCPStream(mtcp, socket, 1013 STREAM_TYPE(MOS_SOCK_STREAM) | 1014 STREAM_TYPE(MOS_SOCK_MONITOR_STREAM_ACTIVE), 1015 socket->saddr.sin_addr.s_addr, 1016 socket->saddr.sin_port, dip, dport, NULL); 1017 #endif 1018 } 1019 else 1020 cur_stream = CreateTCPStream(mtcp, socket, STREAM_TYPE(MOS_SOCK_STREAM), 1021 socket->saddr.sin_addr.s_addr, 1022 socket->saddr.sin_port, dip, dport, NULL); 1023 if (!cur_stream) { 1024 TRACE_ERROR("Socket %d: failed to create tcp_stream!\n", sockid); 1025 errno = ENOMEM; 1026 return -1; 1027 } 1028 1029 if (is_dyn_bound) 1030 cur_stream->is_bound_addr = TRUE; 1031 cur_stream->sndvar->cwnd = 1; 1032 cur_stream->sndvar->ssthresh = cur_stream->sndvar->mss * 10; 1033 cur_stream->side = MOS_SIDE_CLI; 1034 /* if monitor is enabled, update the pair stream side as well */ 1035 if (cur_stream->pair_stream) { 1036 cur_stream->pair_stream->side = MOS_SIDE_SVR; 1037 /* 1038 * if buffer management is off, then disable 1039 * monitoring tcp ring of server... 1040 * if there is even a single monitor asking for 1041 * buffer management, enable it (that's why the 1042 * need for the loop) 1043 */ 1044 cur_stream->pair_stream->buffer_mgmt = BUFMGMT_OFF; 1045 struct socket_map *walk; 1046 SOCKQ_FOREACH_START(walk, &cur_stream->msocks) { 1047 uint8_t bm = walk->monitor_stream->monitor_listener->server_buf_mgmt; 1048 if (bm > cur_stream->pair_stream->buffer_mgmt) { 1049 cur_stream->pair_stream->buffer_mgmt = bm; 1050 break; 1051 } 1052 } SOCKQ_FOREACH_END; 1053 } 1054 1055 cur_stream->state = TCP_ST_SYN_SENT; 1056 cur_stream->cb_events |= MOS_ON_TCP_STATE_CHANGE; 1057 1058 TRACE_STATE("Stream %d: TCP_ST_SYN_SENT\n", cur_stream->id); 1059 1060 SQ_LOCK(&mtcp->ctx->connect_lock); 1061 ret = StreamEnqueue(mtcp->connectq, cur_stream); 1062 SQ_UNLOCK(&mtcp->ctx->connect_lock); 1063 mtcp->wakeup_flag = TRUE; 1064 if (ret < 0) { 1065 TRACE_ERROR("Socket %d: failed to enqueue to conenct queue!\n", sockid); 1066 SQ_LOCK(&mtcp->ctx->destroyq_lock); 1067 StreamEnqueue(mtcp->destroyq, cur_stream); 1068 SQ_UNLOCK(&mtcp->ctx->destroyq_lock); 1069 errno = EAGAIN; 1070 return -1; 1071 } 1072 1073 /* if nonblocking socket, return EINPROGRESS */ 1074 if (socket->opts & MTCP_NONBLOCK) { 1075 errno = EINPROGRESS; 1076 return -1; 1077 1078 } else { 1079 while (1) { 1080 if (!cur_stream) { 1081 TRACE_ERROR("STREAM DESTROYED\n"); 1082 errno = ETIMEDOUT; 1083 return -1; 1084 } 1085 if (cur_stream->state > TCP_ST_ESTABLISHED) { 1086 TRACE_ERROR("Socket %d: weird state %s\n", 1087 sockid, TCPStateToString(cur_stream)); 1088 // TODO: how to handle this? 1089 errno = ENOSYS; 1090 return -1; 1091 } 1092 1093 if (cur_stream->state == TCP_ST_ESTABLISHED) { 1094 break; 1095 } 1096 usleep(1000); 1097 } 1098 } 1099 1100 return 0; 1101 } 1102 /*----------------------------------------------------------------------------*/ 1103 static inline int 1104 CloseStreamSocket(mctx_t mctx, int sockid) 1105 { 1106 mtcp_manager_t mtcp; 1107 tcp_stream *cur_stream; 1108 int ret; 1109 1110 mtcp = GetMTCPManager(mctx); 1111 if (!mtcp) { 1112 errno = EACCES; 1113 return -1; 1114 } 1115 1116 cur_stream = mtcp->smap[sockid].stream; 1117 if (!cur_stream) { 1118 TRACE_API("Socket %d: stream does not exist.\n", sockid); 1119 errno = ENOTCONN; 1120 return -1; 1121 } 1122 1123 if (cur_stream->closed) { 1124 TRACE_API("Socket %d (Stream %u): already closed stream\n", 1125 sockid, cur_stream->id); 1126 return 0; 1127 } 1128 cur_stream->closed = TRUE; 1129 1130 TRACE_API("Stream %d: closing the stream.\n", cur_stream->id); 1131 1132 /* 141029 dhkim: Check this! */ 1133 cur_stream->socket = NULL; 1134 1135 if (cur_stream->state == TCP_ST_CLOSED_RSVD) { 1136 TRACE_API("Stream %d at TCP_ST_CLOSED_RSVD. destroying the stream.\n", 1137 cur_stream->id); 1138 SQ_LOCK(&mtcp->ctx->destroyq_lock); 1139 StreamEnqueue(mtcp->destroyq, cur_stream); 1140 mtcp->wakeup_flag = TRUE; 1141 SQ_UNLOCK(&mtcp->ctx->destroyq_lock); 1142 return 0; 1143 1144 } else if (cur_stream->state == TCP_ST_SYN_SENT) { 1145 #if 1 1146 SQ_LOCK(&mtcp->ctx->destroyq_lock); 1147 StreamEnqueue(mtcp->destroyq, cur_stream); 1148 SQ_UNLOCK(&mtcp->ctx->destroyq_lock); 1149 mtcp->wakeup_flag = TRUE; 1150 #endif 1151 return -1; 1152 1153 } else if (cur_stream->state != TCP_ST_ESTABLISHED && 1154 cur_stream->state != TCP_ST_CLOSE_WAIT) { 1155 TRACE_API("Stream %d at state %s\n", 1156 cur_stream->id, TCPStateToString(cur_stream)); 1157 errno = EBADF; 1158 return -1; 1159 } 1160 1161 SQ_LOCK(&mtcp->ctx->close_lock); 1162 cur_stream->sndvar->on_closeq = TRUE; 1163 ret = StreamEnqueue(mtcp->closeq, cur_stream); 1164 mtcp->wakeup_flag = TRUE; 1165 SQ_UNLOCK(&mtcp->ctx->close_lock); 1166 1167 if (ret < 0) { 1168 TRACE_ERROR("(NEVER HAPPEN) Failed to enqueue the stream to close.\n"); 1169 errno = EAGAIN; 1170 return -1; 1171 } 1172 1173 return 0; 1174 } 1175 /*----------------------------------------------------------------------------*/ 1176 static inline int 1177 CloseListeningSocket(mctx_t mctx, int sockid) 1178 { 1179 mtcp_manager_t mtcp; 1180 struct tcp_listener *listener; 1181 1182 mtcp = GetMTCPManager(mctx); 1183 if (!mtcp) { 1184 errno = EACCES; 1185 return -1; 1186 } 1187 1188 listener = mtcp->smap[sockid].listener; 1189 if (!listener) { 1190 errno = EINVAL; 1191 return -1; 1192 } 1193 1194 if (listener->acceptq) { 1195 DestroyStreamQueue(listener->acceptq); 1196 listener->acceptq = NULL; 1197 } 1198 1199 pthread_mutex_lock(&listener->accept_lock); 1200 pthread_cond_signal(&listener->accept_cond); 1201 pthread_mutex_unlock(&listener->accept_lock); 1202 1203 pthread_cond_destroy(&listener->accept_cond); 1204 pthread_mutex_destroy(&listener->accept_lock); 1205 1206 free(listener); 1207 mtcp->smap[sockid].listener = NULL; 1208 1209 return 0; 1210 } 1211 /*----------------------------------------------------------------------------*/ 1212 int 1213 mtcp_close(mctx_t mctx, int sockid) 1214 { 1215 mtcp_manager_t mtcp; 1216 int ret; 1217 1218 mtcp = GetMTCPManager(mctx); 1219 if (!mtcp) { 1220 errno = EACCES; 1221 return -1; 1222 } 1223 1224 if (sockid < 0 || sockid >= g_config.mos->max_concurrency) { 1225 TRACE_API("Socket id %d out of range.\n", sockid); 1226 errno = EBADF; 1227 return -1; 1228 } 1229 1230 if (mtcp->smap[sockid].socktype == MOS_SOCK_UNUSED) { 1231 TRACE_API("Invalid socket id: %d\n", sockid); 1232 errno = EBADF; 1233 return -1; 1234 } 1235 1236 TRACE_API("Socket %d: mtcp_close called.\n", sockid); 1237 1238 switch (mtcp->smap[sockid].socktype) { 1239 case MOS_SOCK_STREAM: 1240 ret = CloseStreamSocket(mctx, sockid); 1241 break; 1242 1243 case MOS_SOCK_STREAM_LISTEN: 1244 ret = CloseListeningSocket(mctx, sockid); 1245 break; 1246 1247 case MOS_SOCK_EPOLL: 1248 ret = CloseEpollSocket(mctx, sockid); 1249 break; 1250 1251 case MOS_SOCK_PIPE: 1252 ret = PipeClose(mctx, sockid); 1253 break; 1254 1255 default: 1256 errno = EINVAL; 1257 ret = -1; 1258 break; 1259 } 1260 1261 FreeSocket(mctx, sockid, mtcp->smap[sockid].socktype); 1262 1263 return ret; 1264 } 1265 /*----------------------------------------------------------------------------*/ 1266 int 1267 mtcp_abort(mctx_t mctx, int sockid) 1268 { 1269 mtcp_manager_t mtcp; 1270 tcp_stream *cur_stream; 1271 int ret; 1272 1273 mtcp = GetMTCPManager(mctx); 1274 if (!mtcp) { 1275 errno = EACCES; 1276 return -1; 1277 } 1278 1279 if (sockid < 0 || sockid >= g_config.mos->max_concurrency) { 1280 TRACE_API("Socket id %d out of range.\n", sockid); 1281 errno = EBADF; 1282 return -1; 1283 } 1284 1285 if (mtcp->smap[sockid].socktype == MOS_SOCK_UNUSED) { 1286 TRACE_API("Invalid socket id: %d\n", sockid); 1287 errno = EBADF; 1288 return -1; 1289 } 1290 1291 if (mtcp->smap[sockid].socktype != MOS_SOCK_STREAM) { 1292 TRACE_API("Not an end socket. id: %d\n", sockid); 1293 errno = ENOTSOCK; 1294 return -1; 1295 } 1296 1297 cur_stream = mtcp->smap[sockid].stream; 1298 if (!cur_stream) { 1299 TRACE_API("Stream %d: does not exist.\n", sockid); 1300 errno = ENOTCONN; 1301 return -1; 1302 } 1303 1304 TRACE_API("Socket %d: mtcp_abort()\n", sockid); 1305 1306 FreeSocket(mctx, sockid, mtcp->smap[sockid].socktype); 1307 cur_stream->socket = NULL; 1308 1309 if (cur_stream->state == TCP_ST_CLOSED_RSVD) { 1310 TRACE_API("Stream %d: connection already reset.\n", sockid); 1311 return ERROR; 1312 1313 } else if (cur_stream->state == TCP_ST_SYN_SENT) { 1314 /* TODO: this should notify event failure to all 1315 previous read() or write() calls */ 1316 cur_stream->state = TCP_ST_CLOSED_RSVD; 1317 cur_stream->close_reason = TCP_ACTIVE_CLOSE; 1318 cur_stream->cb_events |= MOS_ON_TCP_STATE_CHANGE; 1319 SQ_LOCK(&mtcp->ctx->destroyq_lock); 1320 StreamEnqueue(mtcp->destroyq, cur_stream); 1321 SQ_UNLOCK(&mtcp->ctx->destroyq_lock); 1322 mtcp->wakeup_flag = TRUE; 1323 return 0; 1324 1325 } else if (cur_stream->state == TCP_ST_CLOSING || 1326 cur_stream->state == TCP_ST_LAST_ACK || 1327 cur_stream->state == TCP_ST_TIME_WAIT) { 1328 cur_stream->state = TCP_ST_CLOSED_RSVD; 1329 cur_stream->close_reason = TCP_ACTIVE_CLOSE; 1330 cur_stream->cb_events |= MOS_ON_TCP_STATE_CHANGE; 1331 SQ_LOCK(&mtcp->ctx->destroyq_lock); 1332 StreamEnqueue(mtcp->destroyq, cur_stream); 1333 SQ_UNLOCK(&mtcp->ctx->destroyq_lock); 1334 mtcp->wakeup_flag = TRUE; 1335 return 0; 1336 } 1337 1338 /* the stream structure will be destroyed after sending RST */ 1339 if (cur_stream->sndvar->on_resetq) { 1340 TRACE_ERROR("Stream %d: calling mtcp_abort() " 1341 "when in reset queue.\n", sockid); 1342 errno = ECONNRESET; 1343 return -1; 1344 } 1345 SQ_LOCK(&mtcp->ctx->reset_lock); 1346 cur_stream->sndvar->on_resetq = TRUE; 1347 ret = StreamEnqueue(mtcp->resetq, cur_stream); 1348 SQ_UNLOCK(&mtcp->ctx->reset_lock); 1349 mtcp->wakeup_flag = TRUE; 1350 1351 if (ret < 0) { 1352 TRACE_ERROR("(NEVER HAPPEN) Failed to enqueue the stream to close.\n"); 1353 errno = EAGAIN; 1354 return -1; 1355 } 1356 1357 return 0; 1358 } 1359 /*----------------------------------------------------------------------------*/ 1360 static inline int 1361 CopyToUser(mtcp_manager_t mtcp, tcp_stream *cur_stream, char *buf, int len) 1362 { 1363 struct tcp_recv_vars *rcvvar = cur_stream->rcvvar; 1364 int copylen; 1365 tcprb_t *rb = rcvvar->rcvbuf; 1366 if ((copylen = tcprb_ppeek(rb, (uint8_t *)buf, len, rb->pile)) <= 0) { 1367 errno = EAGAIN; 1368 return -1; 1369 } 1370 tcprb_setpile(rb, rb->pile + copylen); 1371 1372 rcvvar->rcv_wnd = rb->len - tcprb_cflen(rb); 1373 //printf("rcv_wnd: %d\n", rcvvar->rcv_wnd); 1374 1375 /* Advertise newly freed receive buffer */ 1376 if (cur_stream->need_wnd_adv) { 1377 if (rcvvar->rcv_wnd > cur_stream->sndvar->eff_mss) { 1378 if (!cur_stream->sndvar->on_ackq) { 1379 SQ_LOCK(&mtcp->ctx->ackq_lock); 1380 cur_stream->sndvar->on_ackq = TRUE; 1381 StreamEnqueue(mtcp->ackq, cur_stream); /* this always success */ 1382 SQ_UNLOCK(&mtcp->ctx->ackq_lock); 1383 cur_stream->need_wnd_adv = FALSE; 1384 mtcp->wakeup_flag = TRUE; 1385 } 1386 } 1387 } 1388 1389 return copylen; 1390 } 1391 /*----------------------------------------------------------------------------*/ 1392 ssize_t 1393 mtcp_read(mctx_t mctx, int sockid, char *buf, size_t len) 1394 { 1395 mtcp_manager_t mtcp; 1396 socket_map_t socket; 1397 tcp_stream *cur_stream; 1398 struct tcp_recv_vars *rcvvar; 1399 int event_remaining, merged_len; 1400 int ret; 1401 1402 mtcp = GetMTCPManager(mctx); 1403 if (!mtcp) { 1404 errno = EACCES; 1405 return -1; 1406 } 1407 1408 if (sockid < 0 || sockid >= g_config.mos->max_concurrency) { 1409 TRACE_API("Socket id %d out of range.\n", sockid); 1410 errno = EBADF; 1411 return -1; 1412 } 1413 1414 socket = &mtcp->smap[sockid]; 1415 if (socket->socktype == MOS_SOCK_UNUSED) { 1416 TRACE_API("Invalid socket id: %d\n", sockid); 1417 errno = EBADF; 1418 return -1; 1419 } 1420 1421 if (socket->socktype == MOS_SOCK_PIPE) { 1422 return PipeRead(mctx, sockid, buf, len); 1423 } 1424 1425 if (socket->socktype != MOS_SOCK_STREAM) { 1426 TRACE_API("Not an end socket. id: %d\n", sockid); 1427 errno = ENOTSOCK; 1428 return -1; 1429 } 1430 1431 /* stream should be in ESTABLISHED, FIN_WAIT_1, FIN_WAIT_2, CLOSE_WAIT */ 1432 cur_stream = socket->stream; 1433 if (!cur_stream || !cur_stream->rcvvar || !cur_stream->rcvvar->rcvbuf || 1434 !(cur_stream->state >= TCP_ST_ESTABLISHED && 1435 cur_stream->state <= TCP_ST_CLOSE_WAIT)) { 1436 errno = ENOTCONN; 1437 return -1; 1438 } 1439 1440 rcvvar = cur_stream->rcvvar; 1441 1442 merged_len = tcprb_cflen(rcvvar->rcvbuf); 1443 1444 /* if CLOSE_WAIT, return 0 if there is no payload */ 1445 if (cur_stream->state == TCP_ST_CLOSE_WAIT) { 1446 if (!rcvvar->rcvbuf) 1447 return 0; 1448 1449 if (merged_len == 0) 1450 return 0; 1451 } 1452 1453 /* return EAGAIN if no receive buffer */ 1454 if (socket->opts & MTCP_NONBLOCK) { 1455 if (!rcvvar->rcvbuf || merged_len == 0) { 1456 errno = EAGAIN; 1457 return -1; 1458 } 1459 } 1460 1461 SBUF_LOCK(&rcvvar->read_lock); 1462 1463 ret = CopyToUser(mtcp, cur_stream, buf, len); 1464 1465 merged_len = tcprb_cflen(rcvvar->rcvbuf); 1466 event_remaining = FALSE; 1467 /* if there are remaining payload, generate EPOLLIN */ 1468 /* (may due to insufficient user buffer) */ 1469 if (socket->epoll & MOS_EPOLLIN) { 1470 if (!(socket->epoll & MOS_EPOLLET) && merged_len > 0) { 1471 event_remaining = TRUE; 1472 } 1473 } 1474 /* if waiting for close, notify it if no remaining data */ 1475 if (cur_stream->state == TCP_ST_CLOSE_WAIT && 1476 merged_len == 0 && ret > 0) { 1477 event_remaining = TRUE; 1478 } 1479 1480 SBUF_UNLOCK(&rcvvar->read_lock); 1481 1482 if (event_remaining) { 1483 if (socket->epoll) { 1484 AddEpollEvent(mtcp->ep, 1485 USR_SHADOW_EVENT_QUEUE, socket, MOS_EPOLLIN); 1486 } 1487 } 1488 1489 TRACE_API("Stream %d: mtcp_read() returning %d\n", cur_stream->id, ret); 1490 return ret; 1491 } 1492 /*----------------------------------------------------------------------------*/ 1493 ssize_t 1494 mtcp_readv(mctx_t mctx, int sockid, struct iovec *iov, int numIOV) 1495 { 1496 mtcp_manager_t mtcp; 1497 socket_map_t socket; 1498 tcp_stream *cur_stream; 1499 struct tcp_recv_vars *rcvvar; 1500 int ret, bytes_read, i; 1501 int event_remaining, merged_len; 1502 1503 mtcp = GetMTCPManager(mctx); 1504 if (!mtcp) { 1505 errno = EACCES; 1506 return -1; 1507 } 1508 1509 if (sockid < 0 || sockid >= g_config.mos->max_concurrency) { 1510 TRACE_API("Socket id %d out of range.\n", sockid); 1511 errno = EBADF; 1512 return -1; 1513 } 1514 1515 socket = &mtcp->smap[sockid]; 1516 if (socket->socktype == MOS_SOCK_UNUSED) { 1517 TRACE_API("Invalid socket id: %d\n", sockid); 1518 errno = EBADF; 1519 return -1; 1520 } 1521 1522 if (socket->socktype != MOS_SOCK_STREAM) { 1523 TRACE_API("Not an end socket. id: %d\n", sockid); 1524 errno = ENOTSOCK; 1525 return -1; 1526 } 1527 1528 /* stream should be in ESTABLISHED, FIN_WAIT_1, FIN_WAIT_2, CLOSE_WAIT */ 1529 cur_stream = socket->stream; 1530 if (!cur_stream || 1531 !(cur_stream->state >= TCP_ST_ESTABLISHED && 1532 cur_stream->state <= TCP_ST_CLOSE_WAIT)) { 1533 errno = ENOTCONN; 1534 return -1; 1535 } 1536 1537 rcvvar = cur_stream->rcvvar; 1538 1539 merged_len = tcprb_cflen(rcvvar->rcvbuf); 1540 1541 /* if CLOSE_WAIT, return 0 if there is no payload */ 1542 if (cur_stream->state == TCP_ST_CLOSE_WAIT) { 1543 if (!rcvvar->rcvbuf) 1544 return 0; 1545 1546 if (merged_len == 0) 1547 return 0; 1548 } 1549 1550 /* return EAGAIN if no receive buffer */ 1551 if (socket->opts & MTCP_NONBLOCK) { 1552 if (!rcvvar->rcvbuf || merged_len == 0) { 1553 errno = EAGAIN; 1554 return -1; 1555 } 1556 } 1557 1558 SBUF_LOCK(&rcvvar->read_lock); 1559 1560 /* read and store the contents to the vectored buffers */ 1561 bytes_read = 0; 1562 for (i = 0; i < numIOV; i++) { 1563 if (iov[i].iov_len <= 0) 1564 continue; 1565 1566 ret = CopyToUser(mtcp, cur_stream, iov[i].iov_base, iov[i].iov_len); 1567 if (ret <= 0) 1568 break; 1569 1570 bytes_read += ret; 1571 1572 if (ret < iov[i].iov_len) 1573 break; 1574 } 1575 1576 merged_len = tcprb_cflen(rcvvar->rcvbuf); 1577 1578 event_remaining = FALSE; 1579 /* if there are remaining payload, generate read event */ 1580 /* (may due to insufficient user buffer) */ 1581 if (socket->epoll & MOS_EPOLLIN) { 1582 if (!(socket->epoll & MOS_EPOLLET) && merged_len > 0) { 1583 event_remaining = TRUE; 1584 } 1585 } 1586 /* if waiting for close, notify it if no remaining data */ 1587 if (cur_stream->state == TCP_ST_CLOSE_WAIT && 1588 merged_len == 0 && bytes_read > 0) { 1589 event_remaining = TRUE; 1590 } 1591 1592 SBUF_UNLOCK(&rcvvar->read_lock); 1593 1594 if(event_remaining) { 1595 if (socket->epoll & MOS_EPOLLIN && !(socket->epoll & MOS_EPOLLET)) { 1596 AddEpollEvent(mtcp->ep, 1597 USR_SHADOW_EVENT_QUEUE, socket, MOS_EPOLLIN); 1598 } 1599 } 1600 1601 TRACE_API("Stream %d: mtcp_readv() returning %d\n", 1602 cur_stream->id, bytes_read); 1603 return bytes_read; 1604 } 1605 /*----------------------------------------------------------------------------*/ 1606 static inline int 1607 CopyFromUser(mtcp_manager_t mtcp, tcp_stream *cur_stream, char *buf, int len) 1608 { 1609 struct tcp_send_vars *sndvar = cur_stream->sndvar; 1610 int sndlen; 1611 int ret; 1612 1613 sndlen = MIN((int)sndvar->snd_wnd, len); 1614 if (sndlen <= 0) { 1615 errno = EAGAIN; 1616 return -1; 1617 } 1618 1619 /* allocate send buffer if not exist */ 1620 if (!sndvar->sndbuf) { 1621 sndvar->sndbuf = SBInit(mtcp->rbm_snd, sndvar->iss + 1); 1622 if (!sndvar->sndbuf) { 1623 cur_stream->close_reason = TCP_NO_MEM; 1624 /* notification may not required due to -1 return */ 1625 errno = ENOMEM; 1626 return -1; 1627 } 1628 } 1629 1630 ret = SBPut(mtcp->rbm_snd, sndvar->sndbuf, buf, sndlen); 1631 assert(ret == sndlen); 1632 sndvar->snd_wnd = sndvar->sndbuf->size - sndvar->sndbuf->len; 1633 if (ret <= 0) { 1634 TRACE_ERROR("SBPut failed. reason: %d (sndlen: %u, len: %u\n", 1635 ret, sndlen, sndvar->sndbuf->len); 1636 errno = EAGAIN; 1637 return -1; 1638 } 1639 1640 if (sndvar->snd_wnd <= 0) { 1641 TRACE_SNDBUF("%u Sending buffer became full!! snd_wnd: %u\n", 1642 cur_stream->id, sndvar->snd_wnd); 1643 } 1644 1645 return ret; 1646 } 1647 /*----------------------------------------------------------------------------*/ 1648 ssize_t 1649 mtcp_write(mctx_t mctx, int sockid, char *buf, size_t len) 1650 { 1651 mtcp_manager_t mtcp; 1652 socket_map_t socket; 1653 tcp_stream *cur_stream; 1654 struct tcp_send_vars *sndvar; 1655 int ret; 1656 1657 mtcp = GetMTCPManager(mctx); 1658 if (!mtcp) { 1659 errno = EACCES; 1660 return -1; 1661 } 1662 1663 if (sockid < 0 || sockid >= g_config.mos->max_concurrency) { 1664 TRACE_API("Socket id %d out of range.\n", sockid); 1665 errno = EBADF; 1666 return -1; 1667 } 1668 1669 socket = &mtcp->smap[sockid]; 1670 if (socket->socktype == MOS_SOCK_UNUSED) { 1671 TRACE_API("Invalid socket id: %d\n", sockid); 1672 errno = EBADF; 1673 return -1; 1674 } 1675 1676 if (socket->socktype == MOS_SOCK_PIPE) { 1677 return PipeWrite(mctx, sockid, buf, len); 1678 } 1679 1680 if (socket->socktype != MOS_SOCK_STREAM) { 1681 TRACE_API("Not an end socket. id: %d\n", sockid); 1682 errno = ENOTSOCK; 1683 return -1; 1684 } 1685 1686 cur_stream = socket->stream; 1687 if (!cur_stream || 1688 !(cur_stream->state == TCP_ST_ESTABLISHED || 1689 cur_stream->state == TCP_ST_CLOSE_WAIT)) { 1690 errno = ENOTCONN; 1691 return -1; 1692 } 1693 1694 if (len <= 0) { 1695 if (socket->opts & MTCP_NONBLOCK) { 1696 errno = EAGAIN; 1697 return -1; 1698 } else { 1699 return 0; 1700 } 1701 } 1702 1703 sndvar = cur_stream->sndvar; 1704 1705 SBUF_LOCK(&sndvar->write_lock); 1706 ret = CopyFromUser(mtcp, cur_stream, buf, len); 1707 1708 SBUF_UNLOCK(&sndvar->write_lock); 1709 1710 if (ret > 0 && !(sndvar->on_sendq || sndvar->on_send_list)) { 1711 SQ_LOCK(&mtcp->ctx->sendq_lock); 1712 sndvar->on_sendq = TRUE; 1713 StreamEnqueue(mtcp->sendq, cur_stream); /* this always success */ 1714 SQ_UNLOCK(&mtcp->ctx->sendq_lock); 1715 mtcp->wakeup_flag = TRUE; 1716 } 1717 1718 if (ret == 0 && (socket->opts & MTCP_NONBLOCK)) { 1719 ret = -1; 1720 errno = EAGAIN; 1721 } 1722 1723 /* if there are remaining sending buffer, generate write event */ 1724 if (sndvar->snd_wnd > 0) { 1725 if (socket->epoll & MOS_EPOLLOUT && !(socket->epoll & MOS_EPOLLET)) { 1726 AddEpollEvent(mtcp->ep, 1727 USR_SHADOW_EVENT_QUEUE, socket, MOS_EPOLLOUT); 1728 } 1729 } 1730 1731 TRACE_API("Stream %d: mtcp_write() returning %d\n", cur_stream->id, ret); 1732 return ret; 1733 } 1734 /*----------------------------------------------------------------------------*/ 1735 ssize_t 1736 mtcp_writev(mctx_t mctx, int sockid, struct iovec *iov, int numIOV) 1737 { 1738 mtcp_manager_t mtcp; 1739 socket_map_t socket; 1740 tcp_stream *cur_stream; 1741 struct tcp_send_vars *sndvar; 1742 int ret, to_write, i; 1743 1744 mtcp = GetMTCPManager(mctx); 1745 if (!mtcp) { 1746 errno = EACCES; 1747 return -1; 1748 } 1749 1750 if (sockid < 0 || sockid >= g_config.mos->max_concurrency) { 1751 TRACE_API("Socket id %d out of range.\n", sockid); 1752 errno = EBADF; 1753 return -1; 1754 } 1755 1756 socket = &mtcp->smap[sockid]; 1757 if (socket->socktype == MOS_SOCK_UNUSED) { 1758 TRACE_API("Invalid socket id: %d\n", sockid); 1759 errno = EBADF; 1760 return -1; 1761 } 1762 1763 if (socket->socktype != MOS_SOCK_STREAM) { 1764 TRACE_API("Not an end socket. id: %d\n", sockid); 1765 errno = ENOTSOCK; 1766 return -1; 1767 } 1768 1769 cur_stream = socket->stream; 1770 if (!cur_stream || 1771 !(cur_stream->state == TCP_ST_ESTABLISHED || 1772 cur_stream->state == TCP_ST_CLOSE_WAIT)) { 1773 errno = ENOTCONN; 1774 return -1; 1775 } 1776 1777 sndvar = cur_stream->sndvar; 1778 SBUF_LOCK(&sndvar->write_lock); 1779 1780 /* write from the vectored buffers */ 1781 to_write = 0; 1782 for (i = 0; i < numIOV; i++) { 1783 if (iov[i].iov_len <= 0) 1784 continue; 1785 1786 ret = CopyFromUser(mtcp, cur_stream, iov[i].iov_base, iov[i].iov_len); 1787 if (ret <= 0) 1788 break; 1789 1790 to_write += ret; 1791 1792 if (ret < iov[i].iov_len) 1793 break; 1794 } 1795 SBUF_UNLOCK(&sndvar->write_lock); 1796 1797 if (to_write > 0 && !(sndvar->on_sendq || sndvar->on_send_list)) { 1798 SQ_LOCK(&mtcp->ctx->sendq_lock); 1799 sndvar->on_sendq = TRUE; 1800 StreamEnqueue(mtcp->sendq, cur_stream); /* this always success */ 1801 SQ_UNLOCK(&mtcp->ctx->sendq_lock); 1802 mtcp->wakeup_flag = TRUE; 1803 } 1804 1805 if (to_write == 0 && (socket->opts & MTCP_NONBLOCK)) { 1806 to_write = -1; 1807 errno = EAGAIN; 1808 } 1809 1810 /* if there are remaining sending buffer, generate write event */ 1811 if (sndvar->snd_wnd > 0) { 1812 if (socket->epoll & MOS_EPOLLOUT && !(socket->epoll & MOS_EPOLLET)) { 1813 AddEpollEvent(mtcp->ep, 1814 USR_SHADOW_EVENT_QUEUE, socket, MOS_EPOLLOUT); 1815 } 1816 } 1817 1818 TRACE_API("Stream %d: mtcp_writev() returning %d\n", 1819 cur_stream->id, to_write); 1820 return to_write; 1821 } 1822 /*----------------------------------------------------------------------------*/ 1823 uint32_t 1824 mtcp_get_connection_cnt(mctx_t mctx) 1825 { 1826 mtcp_manager_t mtcp; 1827 mtcp = GetMTCPManager(mctx); 1828 if (!mtcp) { 1829 errno = EACCES; 1830 return -1; 1831 } 1832 1833 if (mtcp->num_msp > 0) 1834 return mtcp->flow_cnt / 2; 1835 else 1836 return mtcp->flow_cnt; 1837 } 1838 /*----------------------------------------------------------------------------*/ 1839