1 #include <sys/queue.h> 2 #include <sys/ioctl.h> 3 #include <limits.h> 4 #include <unistd.h> 5 #include <assert.h> 6 #include <string.h> 7 8 #ifdef DARWIN 9 #include <netinet/tcp.h> 10 #include <netinet/ip.h> 11 #include <netinet/if_ether.h> 12 #endif 13 14 #include "mtcp.h" 15 #include "mtcp_api.h" 16 #include "tcp_in.h" 17 #include "tcp_stream.h" 18 #include "tcp_out.h" 19 #include "ip_out.h" 20 #include "eventpoll.h" 21 #include "pipe.h" 22 #include "fhash.h" 23 #include "addr_pool.h" 24 #include "rss.h" 25 #include "config.h" 26 #include "debug.h" 27 #include "eventpoll.h" 28 #include "mos_api.h" 29 #include "tcp_rb.h" 30 31 #define MAX(a, b) ((a)>(b)?(a):(b)) 32 #define MIN(a, b) ((a)<(b)?(a):(b)) 33 34 /*----------------------------------------------------------------------------*/ 35 /** Stop monitoring the socket! (function prototype) 36 * @param [in] mctx: mtcp context 37 * @param [in] sock: monitoring stream socket id 38 * @param [in] side: side of monitoring (client side, server side or both) 39 * 40 * This function is now DEPRECATED and is only used within mOS core... 41 */ 42 int 43 mtcp_cb_stop(mctx_t mctx, int sock, int side); 44 /*----------------------------------------------------------------------------*/ 45 /** Reset the connection (send RST packets to both sides) 46 * (We need to decide the API for this.) 47 */ 48 //int 49 //mtcp_cb_reset(mctx_t mctx, int sock, int side); 50 /*----------------------------------------------------------------------------*/ 51 inline mtcp_manager_t 52 GetMTCPManager(mctx_t mctx) 53 { 54 if (!mctx) { 55 errno = EACCES; 56 return NULL; 57 } 58 59 if (mctx->cpu < 0 || mctx->cpu >= num_cpus) { 60 errno = EINVAL; 61 return NULL; 62 } 63 64 if (!g_mtcp[mctx->cpu] || g_mtcp[mctx->cpu]->ctx->done || g_mtcp[mctx->cpu]->ctx->exit) { 65 errno = EPERM; 66 return NULL; 67 } 68 69 return g_mtcp[mctx->cpu]; 70 } 71 /*----------------------------------------------------------------------------*/ 72 static inline int 73 GetSocketError(socket_map_t socket, void *optval, socklen_t *optlen) 74 { 75 tcp_stream *cur_stream; 76 77 if (!socket->stream) { 78 errno = EBADF; 79 return -1; 80 } 81 82 cur_stream = socket->stream; 83 if (cur_stream->state == TCP_ST_CLOSED_RSVD) { 84 if (cur_stream->close_reason == TCP_TIMEDOUT || 85 cur_stream->close_reason == TCP_CONN_FAIL || 86 cur_stream->close_reason == TCP_CONN_LOST) { 87 *(int *)optval = ETIMEDOUT; 88 *optlen = sizeof(int); 89 90 return 0; 91 } 92 } 93 94 if (cur_stream->state == TCP_ST_CLOSE_WAIT || 95 cur_stream->state == TCP_ST_CLOSED_RSVD) { 96 if (cur_stream->close_reason == TCP_RESET) { 97 *(int *)optval = ECONNRESET; 98 *optlen = sizeof(int); 99 100 return 0; 101 } 102 } 103 104 if (cur_stream->state == TCP_ST_SYN_SENT && 105 errno == EINPROGRESS) { 106 *(int *)optval = errno; 107 *optlen = sizeof(int); 108 109 return -1; 110 } 111 112 /* 113 * `base case`: If socket sees no so_error, then 114 * this also means close_reason will always be 115 * TCP_NOT_CLOSED. 116 */ 117 if (cur_stream->close_reason == TCP_NOT_CLOSED) { 118 *(int *)optval = 0; 119 *optlen = sizeof(int); 120 121 return 0; 122 } 123 124 errno = ENOSYS; 125 return -1; 126 } 127 /*----------------------------------------------------------------------------*/ 128 int 129 mtcp_getsockname(mctx_t mctx, int sockid, struct sockaddr *addr, 130 socklen_t *addrlen) 131 { 132 mtcp_manager_t mtcp; 133 socket_map_t socket; 134 135 mtcp = GetMTCPManager(mctx); 136 if (!mtcp) { 137 return -1; 138 } 139 140 if (sockid < 0 || sockid >= g_config.mos->max_concurrency) { 141 TRACE_API("Socket id %d out of range.\n", sockid); 142 errno = EBADF; 143 return -1; 144 } 145 146 socket = &mtcp->smap[sockid]; 147 if (socket->socktype == MOS_SOCK_UNUSED) { 148 TRACE_API("Invalid socket id: %d\n", sockid); 149 errno = EBADF; 150 return -1; 151 } 152 153 if (*addrlen <= 0) { 154 TRACE_API("Invalid addrlen: %d\n", *addrlen); 155 errno = EINVAL; 156 return -1; 157 } 158 159 if (socket->socktype != MOS_SOCK_STREAM_LISTEN && 160 socket->socktype != MOS_SOCK_STREAM) { 161 TRACE_API("Invalid socket id: %d\n", sockid); 162 errno = ENOTSOCK; 163 return -1; 164 } 165 166 *(struct sockaddr_in *)addr = socket->saddr; 167 *addrlen = sizeof(socket->saddr); 168 169 return 0; 170 } 171 /*----------------------------------------------------------------------------*/ 172 int 173 mtcp_getsockopt(mctx_t mctx, int sockid, int level, 174 int optname, void *optval, socklen_t *optlen) 175 { 176 mtcp_manager_t mtcp; 177 socket_map_t socket; 178 179 mtcp = GetMTCPManager(mctx); 180 if (!mtcp) { 181 errno = EACCES; 182 return -1; 183 } 184 185 if (sockid < 0 || sockid >= g_config.mos->max_concurrency) { 186 TRACE_API("Socket id %d out of range.\n", sockid); 187 errno = EBADF; 188 return -1; 189 } 190 191 switch (level) { 192 case SOL_SOCKET: 193 socket = &mtcp->smap[sockid]; 194 if (socket->socktype == MOS_SOCK_UNUSED) { 195 TRACE_API("Invalid socket id: %d\n", sockid); 196 errno = EBADF; 197 return -1; 198 } 199 200 if (socket->socktype != MOS_SOCK_STREAM_LISTEN && 201 socket->socktype != MOS_SOCK_STREAM) { 202 TRACE_API("Invalid socket id: %d\n", sockid); 203 errno = ENOTSOCK; 204 return -1; 205 } 206 207 if (optname == SO_ERROR) { 208 if (socket->socktype == MOS_SOCK_STREAM) { 209 return GetSocketError(socket, optval, optlen); 210 } 211 } 212 break; 213 case SOL_MONSOCKET: 214 /* check if the calling thread is in MOS context */ 215 if (mtcp->ctx->thread != pthread_self()) { 216 errno = EPERM; 217 return -1; 218 } 219 /* 220 * All options will only work for active 221 * monitor stream sockets 222 */ 223 socket = &mtcp->msmap[sockid]; 224 if (socket->socktype != MOS_SOCK_MONITOR_STREAM_ACTIVE) { 225 TRACE_API("Invalid socket id: %d\n", sockid); 226 errno = ENOTSOCK; 227 return -1; 228 } 229 230 switch (optname) { 231 case MOS_FRAGINFO_CLIBUF: 232 return GetFragInfo(socket, MOS_SIDE_CLI, optval, optlen); 233 case MOS_FRAGINFO_SVRBUF: 234 return GetFragInfo(socket, MOS_SIDE_SVR, optval, optlen); 235 case MOS_INFO_CLIBUF: 236 return GetBufInfo(socket, MOS_SIDE_CLI, optval, optlen); 237 case MOS_INFO_SVRBUF: 238 return GetBufInfo(socket, MOS_SIDE_SVR, optval, optlen); 239 case MOS_TCP_STATE_CLI: 240 return GetTCPState(socket->monitor_stream->stream, MOS_SIDE_CLI, 241 optval, optlen); 242 case MOS_TCP_STATE_SVR: 243 return GetTCPState(socket->monitor_stream->stream, MOS_SIDE_SVR, 244 optval, optlen); 245 case MOS_TIMESTAMP: 246 return GetLastTimestamp(socket->monitor_stream->stream, 247 (uint32_t *)optval, 248 optlen); 249 default: 250 TRACE_API("can't recognize the optname=%d\n", optname); 251 assert(0); 252 } 253 break; 254 } 255 errno = ENOSYS; 256 return -1; 257 } 258 /*----------------------------------------------------------------------------*/ 259 int 260 mtcp_setsockopt(mctx_t mctx, int sockid, int level, 261 int optname, const void *optval, socklen_t optlen) 262 { 263 mtcp_manager_t mtcp; 264 socket_map_t socket; 265 tcprb_t *rb; 266 267 mtcp = GetMTCPManager(mctx); 268 if (!mtcp) { 269 errno = EACCES; 270 return -1; 271 } 272 273 if (sockid < 0 || sockid >= g_config.mos->max_concurrency) { 274 TRACE_API("Socket id %d out of range.\n", sockid); 275 errno = EBADF; 276 return -1; 277 } 278 279 switch (level) { 280 case SOL_SOCKET: 281 socket = &mtcp->smap[sockid]; 282 if (socket->socktype == MOS_SOCK_UNUSED) { 283 TRACE_API("Invalid socket id: %d\n", sockid); 284 errno = EBADF; 285 return -1; 286 } 287 288 if (socket->socktype != MOS_SOCK_STREAM_LISTEN && 289 socket->socktype != MOS_SOCK_STREAM) { 290 TRACE_API("Invalid socket id: %d\n", sockid); 291 errno = ENOTSOCK; 292 return -1; 293 } 294 break; 295 case SOL_MONSOCKET: 296 socket = &mtcp->msmap[sockid]; 297 /* 298 * checking of calling thread to be in MOS context is 299 * disabled since both optnames can be called from 300 * `application' context (on passive sockets) 301 */ 302 /* 303 * if (mtcp->ctx->thread != pthread_self()) 304 * return -1; 305 */ 306 307 switch (optname) { 308 case MOS_CLIOVERLAP: 309 rb = (socket->monitor_stream->stream->side == MOS_SIDE_CLI) ? 310 socket->monitor_stream->stream->rcvvar->rcvbuf : 311 socket->monitor_stream->stream->pair_stream->rcvvar->rcvbuf; 312 if (rb == NULL) { 313 errno = EFAULT; 314 return -1; 315 } 316 if (tcprb_setpolicy(rb, *(uint8_t *)optval) < 0) { 317 errno = EINVAL; 318 return -1; 319 } else 320 return 0; 321 break; 322 case MOS_SVROVERLAP: 323 rb = (socket->monitor_stream->stream->side == MOS_SIDE_SVR) ? 324 socket->monitor_stream->stream->rcvvar->rcvbuf : 325 socket->monitor_stream->stream->pair_stream->rcvvar->rcvbuf; 326 if (rb == NULL) { 327 errno = EFAULT; 328 return -1; 329 } 330 if (tcprb_setpolicy(rb, *(uint8_t *)optval) < 0) { 331 errno = EINVAL; 332 return -1; 333 } else 334 return 0; 335 break; 336 case MOS_CLIBUF: 337 #if 0 338 if (socket->socktype != MOS_SOCK_MONITOR_STREAM_ACTIVE) { 339 errno = EBADF; 340 return -1; 341 } 342 #endif 343 #ifdef DISABLE_DYN_RESIZE 344 if (*(int *)optval != 0) 345 return -1; 346 if (socket->socktype == MOS_SOCK_MONITOR_STREAM_ACTIVE) { 347 rb = (socket->monitor_stream->stream->side == MOS_SIDE_CLI) ? 348 socket->monitor_stream->stream->rcvvar->rcvbuf : 349 socket->monitor_stream->stream->pair_stream->rcvvar->rcvbuf; 350 if (rb) { 351 tcprb_resize_meta(rb, 0); 352 tcprb_resize(rb, 0); 353 } 354 } 355 return DisableBuf(socket, MOS_SIDE_CLI); 356 #else 357 rb = (socket->monitor_stream->stream->side == MOS_SIDE_CLI) ? 358 socket->monitor_stream->stream->rcvvar->rcvbuf : 359 socket->monitor_stream->stream->pair_stream->rcvvar->rcvbuf; 360 if (tcprb_resize_meta(rb, *(int *)optval) < 0) 361 return -1; 362 return tcprb_resize(rb, 363 (((int)rb->metalen - 1) / UNITBUFSIZE + 1) * UNITBUFSIZE); 364 #endif 365 case MOS_SVRBUF: 366 #if 0 367 if (socket->socktype != MOS_SOCK_MONITOR_STREAM_ACTIVE) { 368 errno = EBADF; 369 return -1; 370 } 371 #endif 372 #ifdef DISABLE_DYN_RESIZE 373 if (*(int *)optval != 0) 374 return -1; 375 if (socket->socktype == MOS_SOCK_MONITOR_STREAM_ACTIVE) { 376 rb = (socket->monitor_stream->stream->side == MOS_SIDE_SVR) ? 377 socket->monitor_stream->stream->rcvvar->rcvbuf : 378 socket->monitor_stream->stream->pair_stream->rcvvar->rcvbuf; 379 if (rb) { 380 tcprb_resize_meta(rb, 0); 381 tcprb_resize(rb, 0); 382 } 383 } 384 return DisableBuf(socket, MOS_SIDE_SVR); 385 #else 386 rb = (socket->monitor_stream->stream->side == MOS_SIDE_SVR) ? 387 socket->monitor_stream->stream->rcvvar->rcvbuf : 388 socket->monitor_stream->stream->pair_stream->rcvvar->rcvbuf; 389 if (tcprb_resize_meta(rb, *(int *)optval) < 0) 390 return -1; 391 return tcprb_resize(rb, 392 (((int)rb->metalen - 1) / UNITBUFSIZE + 1) * UNITBUFSIZE); 393 #endif 394 case MOS_FRAG_CLIBUF: 395 #if 0 396 if (socket->socktype != MOS_SOCK_MONITOR_STREAM_ACTIVE) { 397 errno = EBADF; 398 return -1; 399 } 400 #endif 401 #ifdef DISABLE_DYN_RESIZE 402 if (*(int *)optval != 0) 403 return -1; 404 if (socket->socktype == MOS_SOCK_MONITOR_STREAM_ACTIVE) { 405 rb = (socket->monitor_stream->stream->side == MOS_SIDE_CLI) ? 406 socket->monitor_stream->stream->rcvvar->rcvbuf : 407 socket->monitor_stream->stream->pair_stream->rcvvar->rcvbuf; 408 if (rb) 409 tcprb_resize(rb, 0); 410 } 411 return 0; 412 #else 413 rb = (socket->monitor_stream->stream->side == MOS_SIDE_CLI) ? 414 socket->monitor_stream->stream->rcvvar->rcvbuf : 415 socket->monitor_stream->stream->pair_stream->rcvvar->rcvbuf; 416 if (rb->len == 0) 417 return tcprb_resize_meta(rb, *(int *)optval); 418 else 419 return -1; 420 #endif 421 case MOS_FRAG_SVRBUF: 422 #if 0 423 if (socket->socktype != MOS_SOCK_MONITOR_STREAM_ACTIVE) { 424 errno = EBADF; 425 return -1; 426 } 427 #endif 428 #ifdef DISABLE_DYN_RESIZE 429 if (*(int *)optval != 0) 430 return -1; 431 if (socket->socktype == MOS_SOCK_MONITOR_STREAM_ACTIVE) { 432 rb = (socket->monitor_stream->stream->side == MOS_SIDE_SVR) ? 433 socket->monitor_stream->stream->rcvvar->rcvbuf : 434 socket->monitor_stream->stream->pair_stream->rcvvar->rcvbuf; 435 if (rb) 436 tcprb_resize(rb, 0); 437 } 438 return 0; 439 #else 440 rb = (socket->monitor_stream->stream->side == MOS_SIDE_SVR) ? 441 socket->monitor_stream->stream->rcvvar->rcvbuf : 442 socket->monitor_stream->stream->pair_stream->rcvvar->rcvbuf; 443 if (rb->len == 0) 444 return tcprb_resize_meta(rb, *(int *)optval); 445 else 446 return -1; 447 #endif 448 case MOS_MONLEVEL: 449 #ifdef OLD_API 450 assert(*(int *)optval == MOS_NO_CLIBUF || 451 *(int *)optval == MOS_NO_SVRBUF); 452 return DisableBuf(socket, 453 (*(int *)optval == MOS_NO_CLIBUF) ? 454 MOS_SIDE_CLI : MOS_SIDE_SVR); 455 #endif 456 case MOS_SEQ_REMAP: 457 return TcpSeqChange(socket, 458 (uint32_t)((seq_remap_info *)optval)->seq_off, 459 ((seq_remap_info *)optval)->side, 460 mtcp->pctx->p.seq); 461 case MOS_STOP_MON: 462 return mtcp_cb_stop(mctx, sockid, *(int *)optval); 463 default: 464 TRACE_API("invalid optname=%d\n", optname); 465 assert(0); 466 } 467 break; 468 } 469 470 errno = ENOSYS; 471 return -1; 472 } 473 /*----------------------------------------------------------------------------*/ 474 int 475 mtcp_setsock_nonblock(mctx_t mctx, int sockid) 476 { 477 mtcp_manager_t mtcp; 478 479 mtcp = GetMTCPManager(mctx); 480 if (!mtcp) { 481 errno = EACCES; 482 return -1; 483 } 484 485 if (sockid < 0 || sockid >= g_config.mos->max_concurrency) { 486 TRACE_API("Socket id %d out of range.\n", sockid); 487 errno = EBADF; 488 return -1; 489 } 490 491 if (mtcp->smap[sockid].socktype == MOS_SOCK_UNUSED) { 492 TRACE_API("Invalid socket id: %d\n", sockid); 493 errno = EBADF; 494 return -1; 495 } 496 497 mtcp->smap[sockid].opts |= MTCP_NONBLOCK; 498 499 return 0; 500 } 501 /*----------------------------------------------------------------------------*/ 502 int 503 mtcp_ioctl(mctx_t mctx, int sockid, int request, void *argp) 504 { 505 mtcp_manager_t mtcp; 506 socket_map_t socket; 507 508 mtcp = GetMTCPManager(mctx); 509 if (!mtcp) { 510 errno = EACCES; 511 return -1; 512 } 513 514 if (sockid < 0 || sockid >= g_config.mos->max_concurrency) { 515 TRACE_API("Socket id %d out of range.\n", sockid); 516 errno = EBADF; 517 return -1; 518 } 519 520 /* only support stream socket */ 521 socket = &mtcp->smap[sockid]; 522 523 if (socket->socktype != MOS_SOCK_STREAM_LISTEN && 524 socket->socktype != MOS_SOCK_STREAM) { 525 TRACE_API("Invalid socket id: %d\n", sockid); 526 errno = EBADF; 527 return -1; 528 } 529 530 if (!argp) { 531 errno = EFAULT; 532 return -1; 533 } 534 535 if (request == FIONREAD) { 536 tcp_stream *cur_stream; 537 tcprb_t *rbuf; 538 539 cur_stream = socket->stream; 540 if (!cur_stream) { 541 errno = EBADF; 542 return -1; 543 } 544 545 rbuf = cur_stream->rcvvar->rcvbuf; 546 *(int *)argp = (rbuf) ? tcprb_cflen(rbuf) : 0; 547 548 } else if (request == FIONBIO) { 549 /* 550 * sockets can only be set to blocking/non-blocking 551 * modes during initialization 552 */ 553 if ((*(int *)argp)) 554 mtcp->smap[sockid].opts |= MTCP_NONBLOCK; 555 else 556 mtcp->smap[sockid].opts &= ~MTCP_NONBLOCK; 557 } else { 558 errno = EINVAL; 559 return -1; 560 } 561 562 return 0; 563 } 564 /*----------------------------------------------------------------------------*/ 565 static int 566 mtcp_monitor(mctx_t mctx, socket_map_t sock) 567 { 568 mtcp_manager_t mtcp; 569 struct mon_listener *monitor; 570 int sockid = sock->id; 571 572 mtcp = GetMTCPManager(mctx); 573 if (!mtcp) { 574 errno = EACCES; 575 return -1; 576 } 577 578 if (sockid < 0 || sockid >= g_config.mos->max_concurrency) { 579 TRACE_API("Socket id %d out of range.\n", sockid); 580 errno = EBADF; 581 return -1; 582 } 583 584 if (mtcp->msmap[sockid].socktype == MOS_SOCK_UNUSED) { 585 TRACE_API("Invalid socket id: %d\n", sockid); 586 errno = EBADF; 587 return -1; 588 } 589 590 if (!(mtcp->msmap[sockid].socktype == MOS_SOCK_MONITOR_STREAM || 591 mtcp->msmap[sockid].socktype == MOS_SOCK_MONITOR_RAW)) { 592 TRACE_API("Not a monitor socket. id: %d\n", sockid); 593 errno = ENOTSOCK; 594 return -1; 595 } 596 597 monitor = (struct mon_listener *)calloc(1, sizeof(struct mon_listener)); 598 if (!monitor) { 599 /* errno set from the malloc() */ 600 errno = ENOMEM; 601 return -1; 602 } 603 604 /* create a monitor-specific event queue */ 605 monitor->eq = CreateEventQueue(g_config.mos->max_concurrency); 606 if (!monitor->eq) { 607 TRACE_API("Can't create event queue (concurrency: %d) for " 608 "monitor read event registrations!\n", 609 g_config.mos->max_concurrency); 610 free(monitor); 611 errno = ENOMEM; 612 return -1; 613 } 614 615 /* set monitor-related basic parameters */ 616 #ifndef NEWEV 617 monitor->ude_id = UDE_OFFSET; 618 #endif 619 monitor->socket = sock; 620 monitor->client_buf_mgmt = monitor->server_buf_mgmt = BUFMGMT_FULL; 621 622 /* perform both sides monitoring by default */ 623 monitor->client_mon = monitor->server_mon = 1; 624 625 /* add monitor socket to the monitor list */ 626 TAILQ_INSERT_TAIL(&mtcp->monitors, monitor, link); 627 628 mtcp->msmap[sockid].monitor_listener = monitor; 629 630 return 0; 631 } 632 /*----------------------------------------------------------------------------*/ 633 int 634 mtcp_socket(mctx_t mctx, int domain, int type, int protocol) 635 { 636 mtcp_manager_t mtcp; 637 socket_map_t socket; 638 639 mtcp = GetMTCPManager(mctx); 640 if (!mtcp) { 641 errno = EACCES; 642 return -1; 643 } 644 645 if (domain != AF_INET) { 646 errno = EAFNOSUPPORT; 647 return -1; 648 } 649 650 if (type == SOCK_STREAM) { 651 type = MOS_SOCK_STREAM; 652 } else if (type == MOS_SOCK_MONITOR_STREAM || 653 type == MOS_SOCK_MONITOR_RAW) { 654 /* do nothing for the time being */ 655 } else { 656 /* Not supported type */ 657 errno = EINVAL; 658 return -1; 659 } 660 661 socket = AllocateSocket(mctx, type); 662 if (!socket) { 663 errno = ENFILE; 664 return -1; 665 } 666 667 if (type == MOS_SOCK_MONITOR_STREAM || 668 type == MOS_SOCK_MONITOR_RAW) { 669 mtcp_manager_t mtcp = GetMTCPManager(mctx); 670 if (!mtcp) { 671 errno = EACCES; 672 return -1; 673 } 674 mtcp_monitor(mctx, socket); 675 #ifdef NEWEV 676 socket->monitor_listener->stree_dontcare = NULL; 677 socket->monitor_listener->stree_pre_rcv = NULL; 678 socket->monitor_listener->stree_post_snd = NULL; 679 #else 680 InitEvB(mtcp, &socket->monitor_listener->dontcare_evb); 681 InitEvB(mtcp, &socket->monitor_listener->pre_tcp_evb); 682 InitEvB(mtcp, &socket->monitor_listener->post_tcp_evb); 683 #endif 684 } 685 686 return socket->id; 687 } 688 /*----------------------------------------------------------------------------*/ 689 int 690 mtcp_bind(mctx_t mctx, int sockid, 691 const struct sockaddr *addr, socklen_t addrlen) 692 { 693 mtcp_manager_t mtcp; 694 struct sockaddr_in *addr_in; 695 696 mtcp = GetMTCPManager(mctx); 697 if (!mtcp) { 698 errno = EACCES; 699 return -1; 700 } 701 702 if (sockid < 0 || sockid >= g_config.mos->max_concurrency) { 703 TRACE_API("Socket id %d out of range.\n", sockid); 704 errno = EBADF; 705 return -1; 706 } 707 708 if (mtcp->smap[sockid].socktype == MOS_SOCK_UNUSED) { 709 TRACE_API("Invalid socket id: %d\n", sockid); 710 errno = EBADF; 711 return -1; 712 } 713 714 if (mtcp->smap[sockid].socktype != MOS_SOCK_STREAM && 715 mtcp->smap[sockid].socktype != MOS_SOCK_STREAM_LISTEN) { 716 TRACE_API("Not a stream socket id: %d\n", sockid); 717 errno = ENOTSOCK; 718 return -1; 719 } 720 721 if (!addr) { 722 TRACE_API("Socket %d: empty address!\n", sockid); 723 errno = EINVAL; 724 return -1; 725 } 726 727 if (mtcp->smap[sockid].opts & MTCP_ADDR_BIND) { 728 TRACE_API("Socket %d: adress already bind for this socket.\n", sockid); 729 errno = EINVAL; 730 return -1; 731 } 732 733 /* we only allow bind() for AF_INET address */ 734 if (addr->sa_family != AF_INET || addrlen < sizeof(struct sockaddr_in)) { 735 TRACE_API("Socket %d: invalid argument!\n", sockid); 736 errno = EINVAL; 737 return -1; 738 } 739 740 if (mtcp->listener) { 741 TRACE_API("Address already bound!\n"); 742 errno = EINVAL; 743 return -1; 744 } 745 addr_in = (struct sockaddr_in *)addr; 746 mtcp->smap[sockid].saddr = *addr_in; 747 mtcp->smap[sockid].opts |= MTCP_ADDR_BIND; 748 749 return 0; 750 } 751 /*----------------------------------------------------------------------------*/ 752 int 753 mtcp_listen(mctx_t mctx, int sockid, int backlog) 754 { 755 mtcp_manager_t mtcp; 756 struct tcp_listener *listener; 757 758 mtcp = GetMTCPManager(mctx); 759 if (!mtcp) { 760 errno = EACCES; 761 return -1; 762 } 763 764 if (sockid < 0 || sockid >= g_config.mos->max_concurrency) { 765 TRACE_API("Socket id %d out of range.\n", sockid); 766 errno = EBADF; 767 return -1; 768 } 769 770 if (mtcp->smap[sockid].socktype == MOS_SOCK_UNUSED) { 771 TRACE_API("Invalid socket id: %d\n", sockid); 772 errno = EBADF; 773 return -1; 774 } 775 776 if (mtcp->smap[sockid].socktype == MOS_SOCK_STREAM) { 777 mtcp->smap[sockid].socktype = MOS_SOCK_STREAM_LISTEN; 778 } 779 780 if (mtcp->smap[sockid].socktype != MOS_SOCK_STREAM_LISTEN) { 781 TRACE_API("Not a listening socket. id: %d\n", sockid); 782 errno = ENOTSOCK; 783 return -1; 784 } 785 786 if (backlog <= 0 || backlog > g_config.mos->max_concurrency) { 787 errno = EINVAL; 788 return -1; 789 } 790 791 listener = (struct tcp_listener *)calloc(1, sizeof(struct tcp_listener)); 792 if (!listener) { 793 /* errno set from the malloc() */ 794 errno = ENOMEM; 795 return -1; 796 } 797 798 listener->sockid = sockid; 799 listener->backlog = backlog; 800 listener->socket = &mtcp->smap[sockid]; 801 802 if (pthread_cond_init(&listener->accept_cond, NULL)) { 803 perror("pthread_cond_init of ctx->accept_cond\n"); 804 /* errno set by pthread_cond_init() */ 805 return -1; 806 } 807 if (pthread_mutex_init(&listener->accept_lock, NULL)) { 808 perror("pthread_mutex_init of ctx->accept_lock\n"); 809 /* errno set by pthread_mutex_init() */ 810 return -1; 811 } 812 813 listener->acceptq = CreateStreamQueue(backlog); 814 if (!listener->acceptq) { 815 errno = ENOMEM; 816 return -1; 817 } 818 819 mtcp->smap[sockid].listener = listener; 820 mtcp->listener = listener; 821 822 return 0; 823 } 824 /*----------------------------------------------------------------------------*/ 825 int 826 mtcp_accept(mctx_t mctx, int sockid, struct sockaddr *addr, socklen_t *addrlen) 827 { 828 mtcp_manager_t mtcp; 829 struct tcp_listener *listener; 830 socket_map_t socket; 831 tcp_stream *accepted = NULL; 832 833 mtcp = GetMTCPManager(mctx); 834 if (!mtcp) { 835 errno = EACCES; 836 return -1; 837 } 838 839 if (sockid < 0 || sockid >= g_config.mos->max_concurrency) { 840 TRACE_API("Socket id %d out of range.\n", sockid); 841 errno = EBADF; 842 return -1; 843 } 844 845 /* requires listening socket */ 846 if (mtcp->smap[sockid].socktype != MOS_SOCK_STREAM_LISTEN) { 847 errno = EINVAL; 848 return -1; 849 } 850 851 listener = mtcp->smap[sockid].listener; 852 853 /* dequeue from the acceptq without lock first */ 854 /* if nothing there, acquire lock and cond_wait */ 855 accepted = StreamDequeue(listener->acceptq); 856 if (!accepted) { 857 if (listener->socket->opts & MTCP_NONBLOCK) { 858 errno = EAGAIN; 859 return -1; 860 861 } else { 862 pthread_mutex_lock(&listener->accept_lock); 863 while ((accepted = StreamDequeue(listener->acceptq)) == NULL) { 864 pthread_cond_wait(&listener->accept_cond, &listener->accept_lock); 865 866 if (mtcp->ctx->done || mtcp->ctx->exit) { 867 pthread_mutex_unlock(&listener->accept_lock); 868 errno = EINTR; 869 return -1; 870 } 871 } 872 pthread_mutex_unlock(&listener->accept_lock); 873 } 874 } 875 876 if (!accepted) { 877 TRACE_ERROR("[NEVER HAPPEN] Empty accept queue!\n"); 878 } 879 880 if (!accepted->socket) { 881 socket = AllocateSocket(mctx, MOS_SOCK_STREAM); 882 if (!socket) { 883 TRACE_ERROR("Failed to create new socket!\n"); 884 /* TODO: destroy the stream */ 885 errno = ENFILE; 886 return -1; 887 } 888 socket->stream = accepted; 889 accepted->socket = socket; 890 891 /* set socket addr parameters */ 892 socket->saddr.sin_family = AF_INET; 893 socket->saddr.sin_port = accepted->dport; 894 socket->saddr.sin_addr.s_addr = accepted->daddr; 895 896 /* if monitor is enabled, complete the socket assignment */ 897 if (socket->stream->pair_stream != NULL) 898 socket->stream->pair_stream->socket = socket; 899 } 900 901 if (!(listener->socket->epoll & MOS_EPOLLET) && 902 !StreamQueueIsEmpty(listener->acceptq)) 903 AddEpollEvent(mtcp->ep, 904 USR_SHADOW_EVENT_QUEUE, 905 listener->socket, MOS_EPOLLIN); 906 907 TRACE_API("Stream %d accepted.\n", accepted->id); 908 909 if (addr && addrlen) { 910 struct sockaddr_in *addr_in = (struct sockaddr_in *)addr; 911 addr_in->sin_family = AF_INET; 912 addr_in->sin_port = accepted->dport; 913 addr_in->sin_addr.s_addr = accepted->daddr; 914 *addrlen = sizeof(struct sockaddr_in); 915 } 916 917 return accepted->socket->id; 918 } 919 /*----------------------------------------------------------------------------*/ 920 int 921 mtcp_init_rss(mctx_t mctx, in_addr_t saddr_base, int num_addr, 922 in_addr_t daddr, in_addr_t dport) 923 { 924 mtcp_manager_t mtcp; 925 addr_pool_t ap; 926 927 mtcp = GetMTCPManager(mctx); 928 if (!mtcp) { 929 errno = EACCES; 930 return -1; 931 } 932 933 if (saddr_base == INADDR_ANY) { 934 int nif_out; 935 936 /* for the INADDR_ANY, find the output interface for the destination 937 and set the saddr_base as the ip address of the output interface */ 938 nif_out = GetOutputInterface(daddr); 939 saddr_base = g_config.mos->netdev_table->ent[nif_out]->ip_addr; 940 } 941 942 ap = CreateAddressPoolPerCore(mctx->cpu, num_cpus, 943 saddr_base, num_addr, daddr, dport); 944 if (!ap) { 945 errno = ENOMEM; 946 return -1; 947 } 948 949 mtcp->ap = ap; 950 951 return 0; 952 } 953 /*----------------------------------------------------------------------------*/ 954 int 955 eval_bpf_5tuple(struct sfbpf_program fcode, 956 in_addr_t saddr, in_port_t sport, 957 in_addr_t daddr, in_port_t dport) { 958 uint8_t buf[TOTAL_TCP_HEADER_LEN]; 959 struct ethhdr *ethh; 960 struct iphdr *iph; 961 struct tcphdr *tcph; 962 963 ethh = (struct ethhdr *)buf; 964 ethh->h_proto = htons(ETH_P_IP); 965 iph = (struct iphdr *)(ethh + 1); 966 iph->ihl = IP_HEADER_LEN >> 2; 967 iph->version = 4; 968 iph->tos = 0; 969 iph->tot_len = htons(IP_HEADER_LEN + TCP_HEADER_LEN); 970 iph->id = htons(0); 971 iph->protocol = IPPROTO_TCP; 972 iph->saddr = saddr; 973 iph->daddr = daddr; 974 iph->check = 0; 975 tcph = (struct tcphdr *)(iph + 1); 976 tcph->source = sport; 977 tcph->dest = dport; 978 979 return EVAL_BPFFILTER(fcode, (uint8_t *)iph - sizeof(struct ethhdr), 980 TOTAL_TCP_HEADER_LEN); 981 } 982 /*----------------------------------------------------------------------------*/ 983 int 984 mtcp_connect(mctx_t mctx, int sockid, 985 const struct sockaddr *addr, socklen_t addrlen) 986 { 987 mtcp_manager_t mtcp; 988 socket_map_t socket; 989 tcp_stream *cur_stream; 990 struct sockaddr_in *addr_in; 991 in_addr_t dip; 992 in_port_t dport; 993 int is_dyn_bound = FALSE; 994 int ret; 995 int cnt_match = 0; 996 struct mon_listener *walk; 997 struct sfbpf_program fcode; 998 999 cur_stream = NULL; 1000 mtcp = GetMTCPManager(mctx); 1001 if (!mtcp) { 1002 errno = EACCES; 1003 return -1; 1004 } 1005 1006 if (sockid < 0 || sockid >= g_config.mos->max_concurrency) { 1007 TRACE_API("Socket id %d out of range.\n", sockid); 1008 errno = EBADF; 1009 return -1; 1010 } 1011 1012 if (mtcp->smap[sockid].socktype == MOS_SOCK_UNUSED) { 1013 TRACE_API("Invalid socket id: %d\n", sockid); 1014 errno = EBADF; 1015 return -1; 1016 } 1017 1018 if (mtcp->smap[sockid].socktype != MOS_SOCK_STREAM) { 1019 TRACE_API("Not an end socket. id: %d\n", sockid); 1020 errno = ENOTSOCK; 1021 return -1; 1022 } 1023 1024 if (!addr) { 1025 TRACE_API("Socket %d: empty address!\n", sockid); 1026 errno = EFAULT; 1027 return -1; 1028 } 1029 1030 /* we only allow bind() for AF_INET address */ 1031 if (addr->sa_family != AF_INET || addrlen < sizeof(struct sockaddr_in)) { 1032 TRACE_API("Socket %d: invalid argument!\n", sockid); 1033 errno = EAFNOSUPPORT; 1034 return -1; 1035 } 1036 1037 socket = &mtcp->smap[sockid]; 1038 if (socket->stream) { 1039 TRACE_API("Socket %d: stream already exist!\n", sockid); 1040 if (socket->stream->state >= TCP_ST_ESTABLISHED) { 1041 errno = EISCONN; 1042 } else { 1043 errno = EALREADY; 1044 } 1045 return -1; 1046 } 1047 1048 addr_in = (struct sockaddr_in *)addr; 1049 dip = addr_in->sin_addr.s_addr; 1050 dport = addr_in->sin_port; 1051 1052 /* address binding */ 1053 if (socket->opts & MTCP_ADDR_BIND && 1054 socket->saddr.sin_port != INPORT_ANY && 1055 socket->saddr.sin_addr.s_addr != INADDR_ANY) { 1056 int rss_core; 1057 1058 rss_core = GetRSSCPUCore(socket->saddr.sin_addr.s_addr, dip, 1059 socket->saddr.sin_port, dport, num_queues); 1060 1061 if (rss_core != mctx->cpu) { 1062 errno = EINVAL; 1063 return -1; 1064 } 1065 } else { 1066 if (mtcp->ap) { 1067 ret = FetchAddress(mtcp->ap, 1068 mctx->cpu, num_queues, addr_in, &socket->saddr); 1069 } else { 1070 ret = FetchAddress(ap[GetOutputInterface(dip)], 1071 mctx->cpu, num_queues, addr_in, &socket->saddr); 1072 } 1073 if (ret < 0) { 1074 errno = EAGAIN; 1075 return -1; 1076 } 1077 socket->opts |= MTCP_ADDR_BIND; 1078 is_dyn_bound = TRUE; 1079 } 1080 1081 cnt_match = 0; 1082 if (mtcp->num_msp > 0) { 1083 TAILQ_FOREACH(walk, &mtcp->monitors, link) { 1084 fcode = walk->stream_syn_fcode; 1085 if (!(ISSET_BPFFILTER(fcode) && 1086 eval_bpf_5tuple(fcode, socket->saddr.sin_addr.s_addr, 1087 socket->saddr.sin_port, 1088 dip, dport) == 0)) { 1089 walk->is_stream_syn_filter_hit = 1; // set the 'filter hit' flag to 1 1090 cnt_match++; 1091 } 1092 } 1093 } 1094 1095 if (mtcp->num_msp > 0 && cnt_match > 0) { 1096 /* 150820 dhkim: XXX: embedded mode is not verified */ 1097 #if 1 1098 cur_stream = CreateClientTCPStream(mtcp, socket, 1099 STREAM_TYPE(MOS_SOCK_STREAM) | 1100 STREAM_TYPE(MOS_SOCK_MONITOR_STREAM_ACTIVE), 1101 socket->saddr.sin_addr.s_addr, 1102 socket->saddr.sin_port, dip, dport, NULL); 1103 #else 1104 cur_stream = CreateDualTCPStream(mtcp, socket, 1105 STREAM_TYPE(MOS_SOCK_STREAM) | 1106 STREAM_TYPE(MOS_SOCK_MONITOR_STREAM_ACTIVE), 1107 socket->saddr.sin_addr.s_addr, 1108 socket->saddr.sin_port, dip, dport, NULL); 1109 #endif 1110 } 1111 else 1112 cur_stream = CreateTCPStream(mtcp, socket, STREAM_TYPE(MOS_SOCK_STREAM), 1113 socket->saddr.sin_addr.s_addr, 1114 socket->saddr.sin_port, dip, dport, NULL); 1115 if (!cur_stream) { 1116 TRACE_ERROR("Socket %d: failed to create tcp_stream!\n", sockid); 1117 errno = ENOMEM; 1118 return -1; 1119 } 1120 1121 if (is_dyn_bound) 1122 cur_stream->is_bound_addr = TRUE; 1123 cur_stream->sndvar->cwnd = 1; 1124 cur_stream->sndvar->ssthresh = cur_stream->sndvar->mss * 10; 1125 cur_stream->side = MOS_SIDE_CLI; 1126 /* if monitor is enabled, update the pair stream side as well */ 1127 if (cur_stream->pair_stream) { 1128 cur_stream->pair_stream->side = MOS_SIDE_SVR; 1129 /* 1130 * if buffer management is off, then disable 1131 * monitoring tcp ring of server... 1132 * if there is even a single monitor asking for 1133 * buffer management, enable it (that's why the 1134 * need for the loop) 1135 */ 1136 cur_stream->pair_stream->buffer_mgmt = BUFMGMT_OFF; 1137 struct socket_map *walk; 1138 SOCKQ_FOREACH_START(walk, &cur_stream->msocks) { 1139 uint8_t bm = walk->monitor_stream->monitor_listener->server_buf_mgmt; 1140 if (bm > cur_stream->pair_stream->buffer_mgmt) { 1141 cur_stream->pair_stream->buffer_mgmt = bm; 1142 break; 1143 } 1144 } SOCKQ_FOREACH_END; 1145 } 1146 1147 cur_stream->state = TCP_ST_SYN_SENT; 1148 cur_stream->cb_events |= MOS_ON_TCP_STATE_CHANGE; 1149 1150 TRACE_STATE("Stream %d: TCP_ST_SYN_SENT\n", cur_stream->id); 1151 1152 SQ_LOCK(&mtcp->ctx->connect_lock); 1153 ret = StreamEnqueue(mtcp->connectq, cur_stream); 1154 SQ_UNLOCK(&mtcp->ctx->connect_lock); 1155 mtcp->wakeup_flag = TRUE; 1156 if (ret < 0) { 1157 TRACE_ERROR("Socket %d: failed to enqueue to conenct queue!\n", sockid); 1158 SQ_LOCK(&mtcp->ctx->destroyq_lock); 1159 StreamEnqueue(mtcp->destroyq, cur_stream); 1160 SQ_UNLOCK(&mtcp->ctx->destroyq_lock); 1161 errno = EAGAIN; 1162 return -1; 1163 } 1164 1165 /* if nonblocking socket, return EINPROGRESS */ 1166 if (socket->opts & MTCP_NONBLOCK) { 1167 errno = EINPROGRESS; 1168 return -1; 1169 1170 } else { 1171 while (1) { 1172 if (!cur_stream) { 1173 TRACE_ERROR("STREAM DESTROYED\n"); 1174 errno = ETIMEDOUT; 1175 return -1; 1176 } 1177 if (cur_stream->state > TCP_ST_ESTABLISHED) { 1178 TRACE_ERROR("Socket %d: weird state %s\n", 1179 sockid, TCPStateToString(cur_stream)); 1180 // TODO: how to handle this? 1181 errno = ENOSYS; 1182 return -1; 1183 } 1184 1185 if (cur_stream->state == TCP_ST_ESTABLISHED) { 1186 break; 1187 } 1188 usleep(1000); 1189 } 1190 } 1191 1192 return 0; 1193 } 1194 /*----------------------------------------------------------------------------*/ 1195 static inline int 1196 CloseStreamSocket(mctx_t mctx, int sockid) 1197 { 1198 mtcp_manager_t mtcp; 1199 tcp_stream *cur_stream; 1200 int ret; 1201 1202 mtcp = GetMTCPManager(mctx); 1203 if (!mtcp) { 1204 errno = EACCES; 1205 return -1; 1206 } 1207 1208 cur_stream = mtcp->smap[sockid].stream; 1209 if (!cur_stream) { 1210 TRACE_API("Socket %d: stream does not exist.\n", sockid); 1211 errno = ENOTCONN; 1212 return -1; 1213 } 1214 1215 if (cur_stream->closed) { 1216 TRACE_API("Socket %d (Stream %u): already closed stream\n", 1217 sockid, cur_stream->id); 1218 return 0; 1219 } 1220 cur_stream->closed = TRUE; 1221 1222 TRACE_API("Stream %d: closing the stream.\n", cur_stream->id); 1223 1224 /* 141029 dhkim: Check this! */ 1225 cur_stream->socket = NULL; 1226 1227 if (cur_stream->state == TCP_ST_CLOSED_RSVD) { 1228 TRACE_API("Stream %d at TCP_ST_CLOSED_RSVD. destroying the stream.\n", 1229 cur_stream->id); 1230 SQ_LOCK(&mtcp->ctx->destroyq_lock); 1231 StreamEnqueue(mtcp->destroyq, cur_stream); 1232 mtcp->wakeup_flag = TRUE; 1233 SQ_UNLOCK(&mtcp->ctx->destroyq_lock); 1234 return 0; 1235 1236 } else if (cur_stream->state == TCP_ST_SYN_SENT) { 1237 #if 1 1238 SQ_LOCK(&mtcp->ctx->destroyq_lock); 1239 StreamEnqueue(mtcp->destroyq, cur_stream); 1240 SQ_UNLOCK(&mtcp->ctx->destroyq_lock); 1241 mtcp->wakeup_flag = TRUE; 1242 #endif 1243 return -1; 1244 1245 } else if (cur_stream->state != TCP_ST_ESTABLISHED && 1246 cur_stream->state != TCP_ST_CLOSE_WAIT) { 1247 TRACE_API("Stream %d at state %s\n", 1248 cur_stream->id, TCPStateToString(cur_stream)); 1249 errno = EBADF; 1250 return -1; 1251 } 1252 1253 SQ_LOCK(&mtcp->ctx->close_lock); 1254 cur_stream->sndvar->on_closeq = TRUE; 1255 ret = StreamEnqueue(mtcp->closeq, cur_stream); 1256 mtcp->wakeup_flag = TRUE; 1257 SQ_UNLOCK(&mtcp->ctx->close_lock); 1258 1259 if (ret < 0) { 1260 TRACE_ERROR("(NEVER HAPPEN) Failed to enqueue the stream to close.\n"); 1261 errno = EAGAIN; 1262 return -1; 1263 } 1264 1265 return 0; 1266 } 1267 /*----------------------------------------------------------------------------*/ 1268 static inline int 1269 CloseListeningSocket(mctx_t mctx, int sockid) 1270 { 1271 mtcp_manager_t mtcp; 1272 struct tcp_listener *listener; 1273 1274 mtcp = GetMTCPManager(mctx); 1275 if (!mtcp) { 1276 errno = EACCES; 1277 return -1; 1278 } 1279 1280 listener = mtcp->smap[sockid].listener; 1281 if (!listener) { 1282 errno = EINVAL; 1283 return -1; 1284 } 1285 1286 if (listener->acceptq) { 1287 DestroyStreamQueue(listener->acceptq); 1288 listener->acceptq = NULL; 1289 } 1290 1291 pthread_mutex_lock(&listener->accept_lock); 1292 pthread_cond_signal(&listener->accept_cond); 1293 pthread_mutex_unlock(&listener->accept_lock); 1294 1295 pthread_cond_destroy(&listener->accept_cond); 1296 pthread_mutex_destroy(&listener->accept_lock); 1297 1298 free(listener); 1299 mtcp->smap[sockid].listener = NULL; 1300 1301 return 0; 1302 } 1303 /*----------------------------------------------------------------------------*/ 1304 int 1305 mtcp_close(mctx_t mctx, int sockid) 1306 { 1307 mtcp_manager_t mtcp; 1308 int ret; 1309 1310 mtcp = GetMTCPManager(mctx); 1311 if (!mtcp) { 1312 errno = EACCES; 1313 return -1; 1314 } 1315 1316 if (sockid < 0 || sockid >= g_config.mos->max_concurrency) { 1317 TRACE_API("Socket id %d out of range.\n", sockid); 1318 errno = EBADF; 1319 return -1; 1320 } 1321 1322 if (mtcp->smap[sockid].socktype == MOS_SOCK_UNUSED) { 1323 TRACE_API("Invalid socket id: %d\n", sockid); 1324 errno = EBADF; 1325 return -1; 1326 } 1327 1328 TRACE_API("Socket %d: mtcp_close called.\n", sockid); 1329 1330 switch (mtcp->smap[sockid].socktype) { 1331 case MOS_SOCK_STREAM: 1332 ret = CloseStreamSocket(mctx, sockid); 1333 break; 1334 1335 case MOS_SOCK_STREAM_LISTEN: 1336 ret = CloseListeningSocket(mctx, sockid); 1337 break; 1338 1339 case MOS_SOCK_EPOLL: 1340 ret = CloseEpollSocket(mctx, sockid); 1341 break; 1342 1343 case MOS_SOCK_PIPE: 1344 ret = PipeClose(mctx, sockid); 1345 break; 1346 1347 default: 1348 errno = EINVAL; 1349 ret = -1; 1350 break; 1351 } 1352 1353 FreeSocket(mctx, sockid, mtcp->smap[sockid].socktype); 1354 1355 return ret; 1356 } 1357 /*----------------------------------------------------------------------------*/ 1358 int 1359 mtcp_abort(mctx_t mctx, int sockid) 1360 { 1361 mtcp_manager_t mtcp; 1362 tcp_stream *cur_stream; 1363 int ret; 1364 1365 mtcp = GetMTCPManager(mctx); 1366 if (!mtcp) { 1367 errno = EACCES; 1368 return -1; 1369 } 1370 1371 if (sockid < 0 || sockid >= g_config.mos->max_concurrency) { 1372 TRACE_API("Socket id %d out of range.\n", sockid); 1373 errno = EBADF; 1374 return -1; 1375 } 1376 1377 if (mtcp->smap[sockid].socktype == MOS_SOCK_UNUSED) { 1378 TRACE_API("Invalid socket id: %d\n", sockid); 1379 errno = EBADF; 1380 return -1; 1381 } 1382 1383 if (mtcp->smap[sockid].socktype != MOS_SOCK_STREAM) { 1384 TRACE_API("Not an end socket. id: %d\n", sockid); 1385 errno = ENOTSOCK; 1386 return -1; 1387 } 1388 1389 cur_stream = mtcp->smap[sockid].stream; 1390 if (!cur_stream) { 1391 TRACE_API("Stream %d: does not exist.\n", sockid); 1392 errno = ENOTCONN; 1393 return -1; 1394 } 1395 1396 TRACE_API("Socket %d: mtcp_abort()\n", sockid); 1397 1398 FreeSocket(mctx, sockid, mtcp->smap[sockid].socktype); 1399 cur_stream->socket = NULL; 1400 1401 if (cur_stream->state == TCP_ST_CLOSED_RSVD) { 1402 TRACE_API("Stream %d: connection already reset.\n", sockid); 1403 return ERROR; 1404 1405 } else if (cur_stream->state == TCP_ST_SYN_SENT) { 1406 /* TODO: this should notify event failure to all 1407 previous read() or write() calls */ 1408 cur_stream->state = TCP_ST_CLOSED_RSVD; 1409 cur_stream->close_reason = TCP_ACTIVE_CLOSE; 1410 cur_stream->cb_events |= MOS_ON_TCP_STATE_CHANGE; 1411 SQ_LOCK(&mtcp->ctx->destroyq_lock); 1412 StreamEnqueue(mtcp->destroyq, cur_stream); 1413 SQ_UNLOCK(&mtcp->ctx->destroyq_lock); 1414 mtcp->wakeup_flag = TRUE; 1415 return 0; 1416 1417 } else if (cur_stream->state == TCP_ST_CLOSING || 1418 cur_stream->state == TCP_ST_LAST_ACK || 1419 cur_stream->state == TCP_ST_TIME_WAIT) { 1420 cur_stream->state = TCP_ST_CLOSED_RSVD; 1421 cur_stream->close_reason = TCP_ACTIVE_CLOSE; 1422 cur_stream->cb_events |= MOS_ON_TCP_STATE_CHANGE; 1423 SQ_LOCK(&mtcp->ctx->destroyq_lock); 1424 StreamEnqueue(mtcp->destroyq, cur_stream); 1425 SQ_UNLOCK(&mtcp->ctx->destroyq_lock); 1426 mtcp->wakeup_flag = TRUE; 1427 return 0; 1428 } 1429 1430 /* the stream structure will be destroyed after sending RST */ 1431 if (cur_stream->sndvar->on_resetq) { 1432 TRACE_ERROR("Stream %d: calling mtcp_abort() " 1433 "when in reset queue.\n", sockid); 1434 errno = ECONNRESET; 1435 return -1; 1436 } 1437 SQ_LOCK(&mtcp->ctx->reset_lock); 1438 cur_stream->sndvar->on_resetq = TRUE; 1439 ret = StreamEnqueue(mtcp->resetq, cur_stream); 1440 SQ_UNLOCK(&mtcp->ctx->reset_lock); 1441 mtcp->wakeup_flag = TRUE; 1442 1443 if (ret < 0) { 1444 TRACE_ERROR("(NEVER HAPPEN) Failed to enqueue the stream to close.\n"); 1445 errno = EAGAIN; 1446 return -1; 1447 } 1448 1449 return 0; 1450 } 1451 /*----------------------------------------------------------------------------*/ 1452 static inline int 1453 PeekForUser(mtcp_manager_t mtcp, tcp_stream *cur_stream, char *buf, int len) 1454 { 1455 struct tcp_recv_vars *rcvvar = cur_stream->rcvvar; 1456 int copylen; 1457 tcprb_t *rb = rcvvar->rcvbuf; 1458 1459 if ((copylen = tcprb_ppeek(rb, (uint8_t *)buf, len, rb->pile)) <= 0) { 1460 errno = EAGAIN; 1461 return -1; 1462 } 1463 1464 return copylen; 1465 } 1466 /*----------------------------------------------------------------------------*/ 1467 static inline int 1468 CopyToUser(mtcp_manager_t mtcp, tcp_stream *cur_stream, char *buf, int len) 1469 { 1470 struct tcp_recv_vars *rcvvar = cur_stream->rcvvar; 1471 int copylen; 1472 tcprb_t *rb = rcvvar->rcvbuf; 1473 if ((copylen = tcprb_ppeek(rb, (uint8_t *)buf, len, rb->pile)) <= 0) { 1474 errno = EAGAIN; 1475 return -1; 1476 } 1477 tcprb_setpile(rb, rb->pile + copylen); 1478 1479 rcvvar->rcv_wnd = rb->len - tcprb_cflen(rb); 1480 //printf("rcv_wnd: %d\n", rcvvar->rcv_wnd); 1481 1482 /* Advertise newly freed receive buffer */ 1483 if (cur_stream->need_wnd_adv) { 1484 if (rcvvar->rcv_wnd > cur_stream->sndvar->eff_mss) { 1485 if (!cur_stream->sndvar->on_ackq) { 1486 SQ_LOCK(&mtcp->ctx->ackq_lock); 1487 cur_stream->sndvar->on_ackq = TRUE; 1488 StreamEnqueue(mtcp->ackq, cur_stream); /* this always success */ 1489 SQ_UNLOCK(&mtcp->ctx->ackq_lock); 1490 cur_stream->need_wnd_adv = FALSE; 1491 mtcp->wakeup_flag = TRUE; 1492 } 1493 } 1494 } 1495 1496 return copylen; 1497 } 1498 /*----------------------------------------------------------------------------*/ 1499 ssize_t 1500 mtcp_recv(mctx_t mctx, int sockid, char *buf, size_t len, int flags) 1501 { 1502 mtcp_manager_t mtcp; 1503 socket_map_t socket; 1504 tcp_stream *cur_stream; 1505 struct tcp_recv_vars *rcvvar; 1506 int event_remaining, merged_len; 1507 int ret; 1508 1509 mtcp = GetMTCPManager(mctx); 1510 if (!mtcp) { 1511 errno = EACCES; 1512 return -1; 1513 } 1514 1515 if (sockid < 0 || sockid >= g_config.mos->max_concurrency) { 1516 TRACE_API("Socket id %d out of range.\n", sockid); 1517 errno = EBADF; 1518 return -1; 1519 } 1520 1521 socket = &mtcp->smap[sockid]; 1522 if (socket->socktype == MOS_SOCK_UNUSED) { 1523 TRACE_API("Invalid socket id: %d\n", sockid); 1524 errno = EBADF; 1525 return -1; 1526 } 1527 1528 if (socket->socktype == MOS_SOCK_PIPE) { 1529 return PipeRead(mctx, sockid, buf, len); 1530 } 1531 1532 if (socket->socktype != MOS_SOCK_STREAM) { 1533 TRACE_API("Not an end socket. id: %d\n", sockid); 1534 errno = ENOTSOCK; 1535 return -1; 1536 } 1537 1538 /* stream should be in ESTABLISHED, FIN_WAIT_1, FIN_WAIT_2, CLOSE_WAIT */ 1539 cur_stream = socket->stream; 1540 if (!cur_stream || !cur_stream->rcvvar || !cur_stream->rcvvar->rcvbuf || 1541 !(cur_stream->state >= TCP_ST_ESTABLISHED && 1542 cur_stream->state <= TCP_ST_CLOSE_WAIT)) { 1543 errno = ENOTCONN; 1544 return -1; 1545 } 1546 1547 rcvvar = cur_stream->rcvvar; 1548 1549 merged_len = tcprb_cflen(rcvvar->rcvbuf); 1550 1551 /* if CLOSE_WAIT, return 0 if there is no payload */ 1552 if (cur_stream->state == TCP_ST_CLOSE_WAIT) { 1553 if (!rcvvar->rcvbuf) 1554 return 0; 1555 1556 if (merged_len == 0) 1557 return 0; 1558 } 1559 1560 /* return EAGAIN if no receive buffer */ 1561 if (socket->opts & MTCP_NONBLOCK) { 1562 if (!rcvvar->rcvbuf || merged_len == 0) { 1563 errno = EAGAIN; 1564 return -1; 1565 } 1566 } 1567 1568 SBUF_LOCK(&rcvvar->read_lock); 1569 1570 switch (flags) { 1571 case 0: 1572 ret = CopyToUser(mtcp, cur_stream, buf, len); 1573 break; 1574 case MSG_PEEK: 1575 ret = PeekForUser(mtcp, cur_stream, buf, len); 1576 break; 1577 default: 1578 SBUF_UNLOCK(&rcvvar->read_lock); 1579 ret = -1; 1580 errno = EINVAL; 1581 return ret; 1582 } 1583 1584 merged_len = tcprb_cflen(rcvvar->rcvbuf); 1585 event_remaining = FALSE; 1586 /* if there are remaining payload, generate EPOLLIN */ 1587 /* (may due to insufficient user buffer) */ 1588 if (socket->epoll & MOS_EPOLLIN) { 1589 if (!(socket->epoll & MOS_EPOLLET) && merged_len > 0) { 1590 event_remaining = TRUE; 1591 } 1592 } 1593 /* if waiting for close, notify it if no remaining data */ 1594 if (cur_stream->state == TCP_ST_CLOSE_WAIT && 1595 merged_len == 0 && ret > 0) { 1596 event_remaining = TRUE; 1597 } 1598 1599 SBUF_UNLOCK(&rcvvar->read_lock); 1600 1601 if (event_remaining) { 1602 if (socket->epoll) { 1603 AddEpollEvent(mtcp->ep, 1604 USR_SHADOW_EVENT_QUEUE, socket, MOS_EPOLLIN); 1605 } 1606 } 1607 1608 TRACE_API("Stream %d: mtcp_recv() returning %d\n", cur_stream->id, ret); 1609 return ret; 1610 } 1611 /*----------------------------------------------------------------------------*/ 1612 inline ssize_t 1613 mtcp_read(mctx_t mctx, int sockid, char *buf, size_t len) 1614 { 1615 return mtcp_recv(mctx, sockid, buf, len, 0); 1616 } 1617 /*----------------------------------------------------------------------------*/ 1618 ssize_t 1619 mtcp_readv(mctx_t mctx, int sockid, const struct iovec *iov, int numIOV) 1620 { 1621 mtcp_manager_t mtcp; 1622 socket_map_t socket; 1623 tcp_stream *cur_stream; 1624 struct tcp_recv_vars *rcvvar; 1625 int ret, bytes_read, i; 1626 int event_remaining, merged_len; 1627 1628 mtcp = GetMTCPManager(mctx); 1629 if (!mtcp) { 1630 errno = EACCES; 1631 return -1; 1632 } 1633 1634 if (sockid < 0 || sockid >= g_config.mos->max_concurrency) { 1635 TRACE_API("Socket id %d out of range.\n", sockid); 1636 errno = EBADF; 1637 return -1; 1638 } 1639 1640 socket = &mtcp->smap[sockid]; 1641 if (socket->socktype == MOS_SOCK_UNUSED) { 1642 TRACE_API("Invalid socket id: %d\n", sockid); 1643 errno = EBADF; 1644 return -1; 1645 } 1646 1647 if (socket->socktype != MOS_SOCK_STREAM) { 1648 TRACE_API("Not an end socket. id: %d\n", sockid); 1649 errno = ENOTSOCK; 1650 return -1; 1651 } 1652 1653 /* stream should be in ESTABLISHED, FIN_WAIT_1, FIN_WAIT_2, CLOSE_WAIT */ 1654 cur_stream = socket->stream; 1655 if (!cur_stream || 1656 !(cur_stream->state >= TCP_ST_ESTABLISHED && 1657 cur_stream->state <= TCP_ST_CLOSE_WAIT)) { 1658 errno = ENOTCONN; 1659 return -1; 1660 } 1661 1662 rcvvar = cur_stream->rcvvar; 1663 1664 merged_len = tcprb_cflen(rcvvar->rcvbuf); 1665 1666 /* if CLOSE_WAIT, return 0 if there is no payload */ 1667 if (cur_stream->state == TCP_ST_CLOSE_WAIT) { 1668 if (!rcvvar->rcvbuf) 1669 return 0; 1670 1671 if (merged_len == 0) 1672 return 0; 1673 } 1674 1675 /* return EAGAIN if no receive buffer */ 1676 if (socket->opts & MTCP_NONBLOCK) { 1677 if (!rcvvar->rcvbuf || merged_len == 0) { 1678 errno = EAGAIN; 1679 return -1; 1680 } 1681 } 1682 1683 SBUF_LOCK(&rcvvar->read_lock); 1684 1685 /* read and store the contents to the vectored buffers */ 1686 bytes_read = 0; 1687 for (i = 0; i < numIOV; i++) { 1688 if (iov[i].iov_len <= 0) 1689 continue; 1690 1691 ret = CopyToUser(mtcp, cur_stream, iov[i].iov_base, iov[i].iov_len); 1692 if (ret <= 0) 1693 break; 1694 1695 bytes_read += ret; 1696 1697 if (ret < iov[i].iov_len) 1698 break; 1699 } 1700 1701 merged_len = tcprb_cflen(rcvvar->rcvbuf); 1702 1703 event_remaining = FALSE; 1704 /* if there are remaining payload, generate read event */ 1705 /* (may due to insufficient user buffer) */ 1706 if (socket->epoll & MOS_EPOLLIN) { 1707 if (!(socket->epoll & MOS_EPOLLET) && merged_len > 0) { 1708 event_remaining = TRUE; 1709 } 1710 } 1711 /* if waiting for close, notify it if no remaining data */ 1712 if (cur_stream->state == TCP_ST_CLOSE_WAIT && 1713 merged_len == 0 && bytes_read > 0) { 1714 event_remaining = TRUE; 1715 } 1716 1717 SBUF_UNLOCK(&rcvvar->read_lock); 1718 1719 if(event_remaining) { 1720 if (socket->epoll & MOS_EPOLLIN && !(socket->epoll & MOS_EPOLLET)) { 1721 AddEpollEvent(mtcp->ep, 1722 USR_SHADOW_EVENT_QUEUE, socket, MOS_EPOLLIN); 1723 } 1724 } 1725 1726 TRACE_API("Stream %d: mtcp_readv() returning %d\n", 1727 cur_stream->id, bytes_read); 1728 return bytes_read; 1729 } 1730 /*----------------------------------------------------------------------------*/ 1731 static inline int 1732 CopyFromUser(mtcp_manager_t mtcp, tcp_stream *cur_stream, const char *buf, int len) 1733 { 1734 struct tcp_send_vars *sndvar = cur_stream->sndvar; 1735 int sndlen; 1736 int ret; 1737 1738 sndlen = MIN((int)sndvar->snd_wnd, len); 1739 if (sndlen <= 0) { 1740 errno = EAGAIN; 1741 return -1; 1742 } 1743 1744 /* allocate send buffer if not exist */ 1745 if (!sndvar->sndbuf) { 1746 sndvar->sndbuf = SBInit(mtcp->rbm_snd, sndvar->iss + 1); 1747 if (!sndvar->sndbuf) { 1748 cur_stream->close_reason = TCP_NO_MEM; 1749 /* notification may not required due to -1 return */ 1750 errno = ENOMEM; 1751 return -1; 1752 } 1753 } 1754 1755 ret = SBPut(mtcp->rbm_snd, sndvar->sndbuf, buf, sndlen); 1756 assert(ret == sndlen); 1757 sndvar->snd_wnd = sndvar->sndbuf->size - sndvar->sndbuf->len; 1758 if (ret <= 0) { 1759 TRACE_ERROR("SBPut failed. reason: %d (sndlen: %u, len: %u\n", 1760 ret, sndlen, sndvar->sndbuf->len); 1761 errno = EAGAIN; 1762 return -1; 1763 } 1764 1765 if (sndvar->snd_wnd <= 0) { 1766 TRACE_SNDBUF("%u Sending buffer became full!! snd_wnd: %u\n", 1767 cur_stream->id, sndvar->snd_wnd); 1768 } 1769 1770 return ret; 1771 } 1772 /*----------------------------------------------------------------------------*/ 1773 ssize_t 1774 mtcp_write(mctx_t mctx, int sockid, const char *buf, size_t len) 1775 { 1776 mtcp_manager_t mtcp; 1777 socket_map_t socket; 1778 tcp_stream *cur_stream; 1779 struct tcp_send_vars *sndvar; 1780 int ret; 1781 1782 mtcp = GetMTCPManager(mctx); 1783 if (!mtcp) { 1784 errno = EACCES; 1785 return -1; 1786 } 1787 1788 if (sockid < 0 || sockid >= g_config.mos->max_concurrency) { 1789 TRACE_API("Socket id %d out of range.\n", sockid); 1790 errno = EBADF; 1791 return -1; 1792 } 1793 1794 socket = &mtcp->smap[sockid]; 1795 if (socket->socktype == MOS_SOCK_UNUSED) { 1796 TRACE_API("Invalid socket id: %d\n", sockid); 1797 errno = EBADF; 1798 return -1; 1799 } 1800 1801 if (socket->socktype == MOS_SOCK_PIPE) { 1802 return PipeWrite(mctx, sockid, buf, len); 1803 } 1804 1805 if (socket->socktype != MOS_SOCK_STREAM) { 1806 TRACE_API("Not an end socket. id: %d\n", sockid); 1807 errno = ENOTSOCK; 1808 return -1; 1809 } 1810 1811 cur_stream = socket->stream; 1812 if (!cur_stream || 1813 !(cur_stream->state == TCP_ST_ESTABLISHED || 1814 cur_stream->state == TCP_ST_CLOSE_WAIT)) { 1815 errno = ENOTCONN; 1816 return -1; 1817 } 1818 1819 if (len <= 0) { 1820 if (socket->opts & MTCP_NONBLOCK) { 1821 errno = EAGAIN; 1822 return -1; 1823 } else { 1824 return 0; 1825 } 1826 } 1827 1828 sndvar = cur_stream->sndvar; 1829 1830 SBUF_LOCK(&sndvar->write_lock); 1831 ret = CopyFromUser(mtcp, cur_stream, buf, len); 1832 1833 SBUF_UNLOCK(&sndvar->write_lock); 1834 1835 if (ret > 0 && !(sndvar->on_sendq || sndvar->on_send_list)) { 1836 SQ_LOCK(&mtcp->ctx->sendq_lock); 1837 sndvar->on_sendq = TRUE; 1838 StreamEnqueue(mtcp->sendq, cur_stream); /* this always success */ 1839 SQ_UNLOCK(&mtcp->ctx->sendq_lock); 1840 mtcp->wakeup_flag = TRUE; 1841 } 1842 1843 if (ret == 0 && (socket->opts & MTCP_NONBLOCK)) { 1844 ret = -1; 1845 errno = EAGAIN; 1846 } 1847 1848 /* if there are remaining sending buffer, generate write event */ 1849 if (sndvar->snd_wnd > 0) { 1850 if (socket->epoll & MOS_EPOLLOUT && !(socket->epoll & MOS_EPOLLET)) { 1851 AddEpollEvent(mtcp->ep, 1852 USR_SHADOW_EVENT_QUEUE, socket, MOS_EPOLLOUT); 1853 } 1854 } 1855 1856 TRACE_API("Stream %d: mtcp_write() returning %d\n", cur_stream->id, ret); 1857 return ret; 1858 } 1859 /*----------------------------------------------------------------------------*/ 1860 ssize_t 1861 mtcp_writev(mctx_t mctx, int sockid, const struct iovec *iov, int numIOV) 1862 { 1863 mtcp_manager_t mtcp; 1864 socket_map_t socket; 1865 tcp_stream *cur_stream; 1866 struct tcp_send_vars *sndvar; 1867 int ret, to_write, i; 1868 1869 mtcp = GetMTCPManager(mctx); 1870 if (!mtcp) { 1871 errno = EACCES; 1872 return -1; 1873 } 1874 1875 if (sockid < 0 || sockid >= g_config.mos->max_concurrency) { 1876 TRACE_API("Socket id %d out of range.\n", sockid); 1877 errno = EBADF; 1878 return -1; 1879 } 1880 1881 socket = &mtcp->smap[sockid]; 1882 if (socket->socktype == MOS_SOCK_UNUSED) { 1883 TRACE_API("Invalid socket id: %d\n", sockid); 1884 errno = EBADF; 1885 return -1; 1886 } 1887 1888 if (socket->socktype != MOS_SOCK_STREAM) { 1889 TRACE_API("Not an end socket. id: %d\n", sockid); 1890 errno = ENOTSOCK; 1891 return -1; 1892 } 1893 1894 cur_stream = socket->stream; 1895 if (!cur_stream || 1896 !(cur_stream->state == TCP_ST_ESTABLISHED || 1897 cur_stream->state == TCP_ST_CLOSE_WAIT)) { 1898 errno = ENOTCONN; 1899 return -1; 1900 } 1901 1902 sndvar = cur_stream->sndvar; 1903 SBUF_LOCK(&sndvar->write_lock); 1904 1905 /* write from the vectored buffers */ 1906 to_write = 0; 1907 for (i = 0; i < numIOV; i++) { 1908 if (iov[i].iov_len <= 0) 1909 continue; 1910 1911 ret = CopyFromUser(mtcp, cur_stream, iov[i].iov_base, iov[i].iov_len); 1912 if (ret <= 0) 1913 break; 1914 1915 to_write += ret; 1916 1917 if (ret < iov[i].iov_len) 1918 break; 1919 } 1920 SBUF_UNLOCK(&sndvar->write_lock); 1921 1922 if (to_write > 0 && !(sndvar->on_sendq || sndvar->on_send_list)) { 1923 SQ_LOCK(&mtcp->ctx->sendq_lock); 1924 sndvar->on_sendq = TRUE; 1925 StreamEnqueue(mtcp->sendq, cur_stream); /* this always success */ 1926 SQ_UNLOCK(&mtcp->ctx->sendq_lock); 1927 mtcp->wakeup_flag = TRUE; 1928 } 1929 1930 if (to_write == 0 && (socket->opts & MTCP_NONBLOCK)) { 1931 to_write = -1; 1932 errno = EAGAIN; 1933 } 1934 1935 /* if there are remaining sending buffer, generate write event */ 1936 if (sndvar->snd_wnd > 0) { 1937 if (socket->epoll & MOS_EPOLLOUT && !(socket->epoll & MOS_EPOLLET)) { 1938 AddEpollEvent(mtcp->ep, 1939 USR_SHADOW_EVENT_QUEUE, socket, MOS_EPOLLOUT); 1940 } 1941 } 1942 1943 TRACE_API("Stream %d: mtcp_writev() returning %d\n", 1944 cur_stream->id, to_write); 1945 return to_write; 1946 } 1947 /*----------------------------------------------------------------------------*/ 1948 uint32_t 1949 mtcp_get_connection_cnt(mctx_t mctx) 1950 { 1951 mtcp_manager_t mtcp; 1952 mtcp = GetMTCPManager(mctx); 1953 if (!mtcp) { 1954 errno = EACCES; 1955 return -1; 1956 } 1957 1958 if (mtcp->num_msp > 0) 1959 return mtcp->flow_cnt / 2; 1960 else 1961 return mtcp->flow_cnt; 1962 } 1963 /*----------------------------------------------------------------------------*/ 1964