1 #include "debug.h" 2 #include <string.h> 3 4 #include "config.h" 5 #include "tcp_stream.h" 6 #include "fhash.h" 7 #include "tcp.h" 8 #include "tcp_in.h" 9 #include "tcp_out.h" 10 #include "tcp_ring_buffer.h" 11 #include "tcp_send_buffer.h" 12 #include "eventpoll.h" 13 #include "ip_out.h" 14 #include "timer.h" 15 #include "tcp_rb.h" 16 /*---------------------------------------------------------------------------*/ 17 char *state_str[] = { 18 "TCP_ST_CLOSED", 19 "TCP_ST_LISTEN", 20 "TCP_ST_SYN_SENT", 21 "TCP_ST_SYN_RCVD", 22 "TCP_ST_ESTABILSHED", 23 "TCP_ST_FIN_WAIT_1", 24 "TCP_ST_FIN_WAIT_2", 25 "TCP_ST_CLOSE_WAIT", 26 "TCP_ST_CLOSING", 27 "TCP_ST_LAST_ACK", 28 "TCP_ST_TIME_WAIT", 29 "TCP_ST_CLOSED_RSVD" 30 }; 31 /*---------------------------------------------------------------------------*/ 32 char *close_reason_str[] = { 33 "NOT_CLOSED", 34 "CLOSE", 35 "CLOSED", 36 "CONN_FAIL", 37 "CONN_LOST", 38 "RESET", 39 "NO_MEM", 40 "DENIED", 41 "TIMEDOUT" 42 }; 43 /*---------------------------------------------------------------------------*/ 44 static __thread unsigned long next = 1; 45 /* Function retrieved from POSIX.1-2001 standard */ 46 /* RAND_MAX assumed to be 32767 */ 47 static int 48 posix_seq_rand(void) { 49 next = next * 1103515245 + 12345; 50 return ((unsigned)(next/65536) % 32768); 51 } 52 /*---------------------------------------------------------------------------*/ 53 void 54 posix_seq_srand(unsigned seed) { 55 next = seed % 32768; 56 } 57 /*---------------------------------------------------------------------------*/ 58 /** 59 * FYI: This is NOT a read-only return! 60 */ 61 int 62 GetFragInfo(socket_map_t sock, int side, void *optval, socklen_t *len) 63 { 64 struct tcp_stream *stream; 65 66 stream = NULL; 67 if (!*len || ( *len % sizeof(tcpfrag_t) != 0)) 68 goto frag_info_error; 69 70 if (side != MOS_SIDE_CLI && side != MOS_SIDE_SVR) { 71 TRACE_ERROR("Invalid side requested!\n"); 72 exit(EXIT_FAILURE); 73 return -1; 74 } 75 76 struct tcp_stream *mstrm = sock->monitor_stream->stream; 77 stream = (side == mstrm->side) ? mstrm : mstrm->pair_stream; 78 79 if (stream == NULL) goto frag_info_error; 80 81 /* First check if the tcp ring buffer even has anything */ 82 if (stream->rcvvar != NULL && 83 stream->rcvvar->rcvbuf != NULL) { 84 tcprb_t *rcvbuf = stream->rcvvar->rcvbuf; 85 struct tcp_ring_fragment *out = (struct tcp_ring_fragment *)optval; 86 int const maxout = *len; 87 *len = 0; 88 struct _tcpfrag_t *walk; 89 TAILQ_FOREACH(walk, &rcvbuf->frags, link) { 90 if (*len == maxout) 91 break; 92 out[*len].offset = walk->head; 93 out[*len].len = walk->tail - walk->head; 94 (*len)++; 95 } 96 if (*len != maxout) { 97 /* set zero sentinel */ 98 out[*len].offset = 0; 99 out[*len].len = 0; 100 } 101 } else 102 goto frag_info_error; 103 104 return 0; 105 106 frag_info_error: 107 optval = NULL; 108 *len = 0; 109 return -1; 110 } 111 /*---------------------------------------------------------------------------*/ 112 /** 113 * Comments later... 114 */ 115 int 116 GetBufInfo(socket_map_t sock, int side, void *optval, socklen_t *len) 117 { 118 struct tcp_stream *stream; 119 struct tcp_buf_info *tbi; 120 121 tbi = (struct tcp_buf_info *)optval; 122 memset(tbi, 0, sizeof(struct tcp_buf_info)); 123 stream = NULL; 124 125 if (*len != sizeof(struct tcp_buf_info)) { 126 errno = EINVAL; 127 goto buf_info_error; 128 } 129 130 if (side != MOS_SIDE_CLI && side != MOS_SIDE_SVR) { 131 TRACE_ERROR("Invalid side requested!\n"); 132 errno = EINVAL; 133 goto buf_info_error; 134 } 135 136 struct tcp_stream *mstrm = sock->monitor_stream->stream; 137 stream = (side == mstrm->side) ? mstrm : mstrm->pair_stream; 138 139 /* First check if the tcp ring buffer even has anything */ 140 if (stream != NULL && 141 stream->rcvvar != NULL && 142 stream->rcvvar->rcvbuf != NULL) { 143 tcprb_t *rcvbuf = stream->rcvvar->rcvbuf; 144 tcpfrag_t *f = TAILQ_LAST(&rcvbuf->frags, flist); 145 tbi->tcpbi_init_seq = stream->rcvvar->irs + 1; 146 tbi->tcpbi_last_byte_read = rcvbuf->pile; 147 tbi->tcpbi_next_byte_expected = rcvbuf->pile + tcprb_cflen(rcvbuf); 148 tbi->tcpbi_last_byte_received = (f ? f->tail : rcvbuf->head); 149 } else { 150 errno = ENODATA; 151 goto buf_info_error; 152 } 153 154 return 0; 155 156 buf_info_error: 157 optval = NULL; 158 *len = 0; 159 return -1; 160 } 161 /*---------------------------------------------------------------------------*/ 162 int 163 DisableBuf(socket_map_t sock, int side) 164 { 165 #ifdef DBGMSG 166 __PREPARE_DBGLOGGING(); 167 #endif 168 struct tcp_stream *stream; 169 int rc = 0; 170 171 switch (sock->socktype) { 172 case MOS_SOCK_MONITOR_STREAM: 173 if (side == MOS_SIDE_CLI) 174 sock->monitor_listener->client_buf_mgmt = 0; 175 else if (side == MOS_SIDE_SVR) 176 sock->monitor_listener->server_buf_mgmt = 0; 177 else { 178 assert(0); 179 TRACE_DBG("Invalid side!\n"); 180 rc = -1; 181 } 182 break; 183 case MOS_SOCK_MONITOR_STREAM_ACTIVE: 184 stream = sock->monitor_stream->stream; 185 if (stream->side != side) 186 stream = stream->pair_stream; 187 assert(stream->side == side); 188 stream->buffer_mgmt = 0; 189 break; 190 default: 191 assert(0); 192 TRACE_DBG("Can't disable buf for invalid socket!\n"); 193 rc = -1; 194 } 195 196 return rc; 197 } 198 /*---------------------------------------------------------------------------*/ 199 int 200 GetLastTimestamp(struct tcp_stream *stream, uint32_t *usecs, socklen_t *len) 201 { 202 #ifdef DBGMSG 203 __PREPARE_DBGLOGGING(); 204 #endif 205 if (*len < sizeof(uint32_t)) { 206 TRACE_DBG("Size passed is not >= sizeof(uint32_t)!\n"); 207 return -1; 208 } 209 210 *usecs = (stream->last_active_ts > 211 stream->pair_stream->last_active_ts) 212 ? 213 TS_TO_USEC(stream->last_active_ts) : 214 TS_TO_USEC(stream->pair_stream->last_active_ts); 215 216 return 0; 217 } 218 /*---------------------------------------------------------------------------*/ 219 inline int 220 GetTCPState(struct tcp_stream *stream, int side, 221 void *optval, socklen_t *optlen) 222 { 223 if (!stream || !(stream = (side == stream->side) ? stream : stream->pair_stream)) 224 return -1; 225 *(int *)optval = (int)((stream->state == TCP_ST_CLOSED_RSVD) ? 226 TCP_ST_CLOSED : stream->state); 227 return 0; 228 } 229 /*---------------------------------------------------------------------------*/ 230 inline char * 231 TCPStateToString(const tcp_stream *stream) 232 { 233 return (stream) ? state_str[stream->state] : NULL; 234 } 235 /*---------------------------------------------------------------------------*/ 236 inline void 237 RaiseReadEvent(mtcp_manager_t mtcp, tcp_stream *stream) 238 { 239 struct tcp_recv_vars *rcvvar; 240 241 rcvvar = stream->rcvvar; 242 243 if (HAS_STREAM_TYPE(stream, MOS_SOCK_STREAM)) { 244 if (stream->socket && (stream->socket->epoll & MOS_EPOLLIN)) 245 AddEpollEvent(mtcp->ep, MOS_EVENT_QUEUE, stream->socket, MOS_EPOLLIN); 246 } else if (rcvvar->rcvbuf && tcprb_cflen(rcvvar->rcvbuf) > 0) { 247 /* 248 * in case it is a monitoring socket, queue up the read events 249 * in the event_queue of only if the tcp_stream hasn't already 250 * been registered in the event queue 251 */ 252 int index; 253 struct event_queue *eq; 254 struct socket_map *walk; 255 256 SOCKQ_FOREACH_START(walk, &stream->msocks) { 257 assert(walk->socktype == MOS_SOCK_MONITOR_STREAM_ACTIVE); 258 eq = walk->monitor_stream->monitor_listener->eq; 259 260 /* if it already has read data register... then skip this step */ 261 if (stream->actions & MOS_ACT_READ_DATA) 262 return; 263 if (eq->num_events >= eq->size) { 264 TRACE_ERROR("Exceeded epoll event queue! num_events: %d, " 265 "size: %d\n", eq->num_events, eq->size); 266 return; 267 } 268 269 index = eq->end++; 270 eq->events[index].ev.events = MOS_EPOLLIN; 271 eq->events[index].ev.data.ptr = (void *)stream; 272 273 if (eq->end >= eq->size) { 274 eq->end = 0; 275 } 276 eq->num_events++; 277 stream->actions |= MOS_ACT_READ_DATA; 278 } SOCKQ_FOREACH_END; 279 } else { 280 TRACE_EPOLL("Stream %d: Raising read without a socket!\n", stream->id); 281 } 282 } 283 /*---------------------------------------------------------------------------*/ 284 inline void 285 RaiseWriteEvent(mtcp_manager_t mtcp, tcp_stream *stream) 286 { 287 if (stream->socket) { 288 if (stream->socket->epoll & MOS_EPOLLOUT) { 289 AddEpollEvent(mtcp->ep, 290 MOS_EVENT_QUEUE, stream->socket, MOS_EPOLLOUT); 291 } 292 } else { 293 TRACE_EPOLL("Stream %d: Raising write without a socket!\n", stream->id); 294 } 295 } 296 /*---------------------------------------------------------------------------*/ 297 inline void 298 RaiseCloseEvent(mtcp_manager_t mtcp, tcp_stream *stream) 299 { 300 if (stream->socket) { 301 if (stream->socket->epoll & MOS_EPOLLRDHUP) { 302 AddEpollEvent(mtcp->ep, 303 MOS_EVENT_QUEUE, stream->socket, MOS_EPOLLRDHUP); 304 } else if (stream->socket->epoll & MOS_EPOLLIN) { 305 AddEpollEvent(mtcp->ep, 306 MOS_EVENT_QUEUE, stream->socket, MOS_EPOLLIN); 307 } 308 } else { 309 TRACE_EPOLL("Stream %d: Raising close without a socket!\n", stream->id); 310 } 311 } 312 /*---------------------------------------------------------------------------*/ 313 inline int 314 RaiseErrorEvent(mtcp_manager_t mtcp, tcp_stream *stream) 315 { 316 if (stream->socket) { 317 if (stream->socket->epoll & MOS_EPOLLERR) { 318 /* passing closing reason for error notification */ 319 return AddEpollEvent(mtcp->ep, 320 MOS_EVENT_QUEUE, stream->socket, MOS_EPOLLERR); 321 } 322 } else { 323 TRACE_EPOLL("Stream %d: Raising error without a socket!\n", stream->id); 324 } 325 return -1; 326 } 327 /*----------------------------------------------------------------------------*/ 328 int 329 AddMonitorStreamSockets(mtcp_manager_t mtcp, struct tcp_stream *stream) 330 { 331 struct mtcp_context mctx; 332 int socktype; 333 334 mctx.cpu = mtcp->ctx->cpu; 335 struct mon_listener *walk; 336 337 // traverse the passive socket's list 338 TAILQ_FOREACH(walk, &mtcp->monitors, link) { 339 socktype = walk->socket->socktype; 340 341 if (socktype != MOS_SOCK_MONITOR_STREAM) 342 continue; 343 344 /* mtcp_bind_monitor_filter() 345 * - create an monitor active socket only for the filter-passed flows 346 * - we use the result (= tag) from DetectStreamType() to avoid 347 * evaluating the same BPF filter twice */ 348 if (!walk->is_stream_syn_filter_hit) { 349 continue; 350 } 351 352 struct socket_map *s = 353 AllocateSocket(&mctx, MOS_SOCK_MONITOR_STREAM_ACTIVE); 354 if (!s) 355 return -1; 356 357 s->monitor_stream->socket = s; 358 s->monitor_stream->stream = stream; 359 s->monitor_stream->monitor_listener = walk; 360 s->monitor_stream->client_buf_mgmt = walk->client_buf_mgmt; 361 s->monitor_stream->server_buf_mgmt = walk->server_buf_mgmt; 362 s->monitor_stream->client_mon = walk->client_mon; 363 s->monitor_stream->server_mon = walk->server_mon; 364 #ifdef NEWEV 365 s->monitor_stream->stree_dontcare = 366 s->monitor_stream->monitor_listener->stree_dontcare; 367 s->monitor_stream->stree_pre_rcv = 368 s->monitor_stream->monitor_listener->stree_pre_rcv; 369 s->monitor_stream->stree_post_snd = 370 s->monitor_stream->monitor_listener->stree_post_snd; 371 if (s->monitor_stream->stree_dontcare) 372 stree_inc_ref(s->monitor_stream->stree_dontcare); 373 if (s->monitor_stream->stree_pre_rcv) 374 stree_inc_ref(s->monitor_stream->stree_pre_rcv); 375 if (s->monitor_stream->stree_post_snd) 376 stree_inc_ref(s->monitor_stream->stree_post_snd); 377 #else 378 InitEvP(&s->monitor_stream->dontcare_evp, 379 &walk->dontcare_evb); 380 InitEvP(&s->monitor_stream->pre_tcp_evp, 381 &walk->pre_tcp_evb); 382 InitEvP(&s->monitor_stream->post_tcp_evp, 383 &walk->post_tcp_evb); 384 #endif 385 386 SOCKQ_INSERT_TAIL(&stream->msocks, s); 387 } 388 389 return 0; 390 } 391 /*----------------------------------------------------------------------------*/ 392 int 393 DestroyMonitorStreamSocket(mtcp_manager_t mtcp, socket_map_t msock) 394 { 395 struct mtcp_context mctx; 396 int socktype, sockid, rc; 397 398 if (msock == NULL) { 399 TRACE_DBG("Stream socket does not exist!\n"); 400 /* exit(-1); */ 401 return 0; 402 } 403 404 rc = 0; 405 mctx.cpu = mtcp->ctx->cpu; 406 socktype = msock->socktype; 407 sockid = msock->id; 408 409 switch (socktype) { 410 case MOS_SOCK_MONITOR_STREAM_ACTIVE: 411 FreeSocket(&mctx, sockid, socktype); 412 break; 413 case MOS_SOCK_MONITOR_RAW: 414 /* do nothing since all raw sockets point to the same socket */ 415 break; 416 default: 417 TRACE_DBG("Trying to destroy a monitor socket for an unsupported type!\n"); 418 rc = -1; 419 /* exit(-1); */ 420 break; 421 } 422 423 return rc; 424 } 425 /*---------------------------------------------------------------------------*/ 426 tcp_stream * 427 CreateTCPStream(mtcp_manager_t mtcp, socket_map_t socket, int type, 428 uint32_t saddr, uint16_t sport, uint32_t daddr, uint16_t dport, 429 unsigned int *hash) 430 { 431 tcp_stream *stream = NULL; 432 int ret; 433 /* stand-alone monitor does not need this since it is single-threaded */ 434 bool flow_lock = type & STREAM_TYPE(MOS_SOCK_STREAM); 435 //bool flow_lock = false; 436 437 if (flow_lock) 438 pthread_mutex_lock(&mtcp->ctx->flow_pool_lock); 439 440 stream = (tcp_stream *)MPAllocateChunk(mtcp->flow_pool); 441 if (!stream) { 442 TRACE_ERROR("Cannot allocate memory for the stream. " 443 "g_config.mos->max_concurrency: %d, concurrent: %u\n", 444 g_config.mos->max_concurrency, mtcp->flow_cnt); 445 if (flow_lock) 446 pthread_mutex_unlock(&mtcp->ctx->flow_pool_lock); 447 return NULL; 448 } 449 memset(stream, 0, sizeof(tcp_stream)); 450 451 stream->rcvvar = (struct tcp_recv_vars *)MPAllocateChunk(mtcp->rv_pool); 452 if (!stream->rcvvar) { 453 MPFreeChunk(mtcp->flow_pool, stream); 454 if (flow_lock) 455 pthread_mutex_unlock(&mtcp->ctx->flow_pool_lock); 456 return NULL; 457 } 458 memset(stream->rcvvar, 0, sizeof(struct tcp_recv_vars)); 459 460 /* stand-alone monitor does not need to do this */ 461 stream->sndvar = (struct tcp_send_vars *)MPAllocateChunk(mtcp->sv_pool); 462 if (!stream->sndvar) { 463 MPFreeChunk(mtcp->rv_pool, stream->rcvvar); 464 MPFreeChunk(mtcp->flow_pool, stream); 465 if (flow_lock) 466 pthread_mutex_unlock(&mtcp->ctx->flow_pool_lock); 467 return NULL; 468 } 469 //if (HAS_STREAM_TYPE(stream, MOS_SOCK_STREAM)) 470 memset(stream->sndvar, 0, sizeof(struct tcp_send_vars)); 471 472 stream->id = mtcp->g_id++; 473 stream->saddr = saddr; 474 stream->sport = sport; 475 stream->daddr = daddr; 476 stream->dport = dport; 477 478 ret = HTInsert(mtcp->tcp_flow_table, stream, hash); 479 if (ret < 0) { 480 TRACE_ERROR("Stream %d: " 481 "Failed to insert the stream into hash table.\n", stream->id); 482 MPFreeChunk(mtcp->flow_pool, stream); 483 if (flow_lock) 484 pthread_mutex_unlock(&mtcp->ctx->flow_pool_lock); 485 return NULL; 486 } 487 stream->on_hash_table = TRUE; 488 mtcp->flow_cnt++; 489 490 SOCKQ_INIT(&stream->msocks); 491 492 /* 493 * if an embedded monitor is attached... 494 * create monitor stream socket now! 495 * If socket type is raw.. then don't create it 496 */ 497 if ((mtcp->num_msp > 0) && 498 (type & STREAM_TYPE(MOS_SOCK_MONITOR_STREAM_ACTIVE))) 499 if (AddMonitorStreamSockets(mtcp, stream) < 0) 500 TRACE_DBG("Could not create monitor stream socket!\n"); 501 502 if (flow_lock) 503 pthread_mutex_unlock(&mtcp->ctx->flow_pool_lock); 504 505 if (socket) { 506 stream->socket = socket; 507 socket->stream = stream; 508 } 509 510 stream->stream_type = type; 511 stream->state = TCP_ST_LISTEN; 512 /* This is handled by core.c, tcp_in.c & tcp_out.c */ 513 /* stream->cb_events |= MOS_ON_TCP_STATE_CHANGE; */ 514 515 stream->on_rto_idx = -1; 516 517 /* stand-alone monitor does not need to do this */ 518 stream->sndvar->mss = TCP_DEFAULT_MSS; 519 stream->sndvar->wscale_mine = TCP_DEFAULT_WSCALE; 520 stream->sndvar->wscale_peer = 0; 521 522 if (HAS_STREAM_TYPE(stream, MOS_SOCK_STREAM)) { 523 stream->sndvar->ip_id = 0; 524 stream->sndvar->nif_out = GetOutputInterface(stream->daddr); 525 526 stream->sndvar->iss = posix_seq_rand() % TCP_MAX_SEQ; 527 //stream->sndvar->iss = 0; 528 stream->snd_nxt = stream->sndvar->iss; 529 stream->sndvar->snd_una = stream->sndvar->iss; 530 stream->sndvar->snd_wnd = g_config.mos->wmem_size; 531 stream->sndvar->rto = TCP_INITIAL_RTO; 532 #if USE_SPIN_LOCK 533 if (pthread_spin_init(&stream->sndvar->write_lock, PTHREAD_PROCESS_PRIVATE)) { 534 perror("pthread_spin_init of write_lock"); 535 pthread_spin_destroy(&stream->rcvvar->read_lock); 536 #else 537 if (pthread_mutex_init(&stream->sndvar->write_lock, NULL)) { 538 perror("pthread_mutex_init of write_lock"); 539 pthread_mutex_destroy(&stream->rcvvar->read_lock); 540 #endif 541 return NULL; 542 } 543 } 544 stream->rcvvar->irs = 0; 545 546 stream->rcv_nxt = 0; 547 stream->rcvvar->rcv_wnd = TCP_INITIAL_WINDOW; 548 549 stream->rcvvar->snd_wl1 = stream->rcvvar->irs - 1; 550 551 stream->buffer_mgmt = BUFMGMT_FULL; 552 553 /* needs state update by default */ 554 stream->status_mgmt = 1; 555 556 #if USE_SPIN_LOCK 557 if (pthread_spin_init(&stream->rcvvar->read_lock, PTHREAD_PROCESS_PRIVATE)) { 558 #else 559 if (pthread_mutex_init(&stream->rcvvar->read_lock, NULL)) { 560 #endif 561 perror("pthread_mutex_init of read_lock"); 562 return NULL; 563 } 564 565 #ifdef STREAM 566 uint8_t *sa; 567 uint8_t *da; 568 569 sa = (uint8_t *)&stream->saddr; 570 da = (uint8_t *)&stream->daddr; 571 TRACE_STREAM("CREATED NEW TCP STREAM %d: " 572 "%u.%u.%u.%u(%d) -> %u.%u.%u.%u(%d) (ISS: %u)\n", stream->id, 573 sa[0], sa[1], sa[2], sa[3], ntohs(stream->sport), 574 da[0], da[1], da[2], da[3], ntohs(stream->dport), 575 stream->sndvar->iss); 576 #endif 577 578 return stream; 579 } 580 /*----------------------------------------------------------------------------*/ 581 inline tcp_stream * 582 CreateDualTCPStream(mtcp_manager_t mtcp, socket_map_t socket, int type, uint32_t saddr, 583 uint16_t sport, uint32_t daddr, uint16_t dport, unsigned int *hash) 584 { 585 tcp_stream *cur_stream, *paired_stream; 586 struct socket_map *walk; 587 588 cur_stream = CreateTCPStream(mtcp, socket, type, 589 saddr, sport, daddr, dport, hash); 590 if (cur_stream == NULL) { 591 TRACE_ERROR("Can't create tcp_stream!\n"); 592 return NULL; 593 } 594 595 paired_stream = CreateTCPStream(mtcp, NULL, MOS_SOCK_UNUSED, 596 daddr, dport, saddr, sport, hash); 597 if (paired_stream == NULL) { 598 DestroyTCPStream(mtcp, cur_stream); 599 TRACE_ERROR("Can't create tcp_stream!\n"); 600 return NULL; 601 } 602 603 cur_stream->pair_stream = paired_stream; 604 paired_stream->pair_stream = cur_stream; 605 paired_stream->socket = socket; 606 SOCKQ_FOREACH_START(walk, &cur_stream->msocks) { 607 SOCKQ_INSERT_TAIL(&paired_stream->msocks, walk); 608 } SOCKQ_FOREACH_END; 609 paired_stream->stream_type = STREAM_TYPE(MOS_SOCK_MONITOR_STREAM_ACTIVE); 610 611 return cur_stream; 612 } 613 /*----------------------------------------------------------------------------*/ 614 inline tcp_stream * 615 CreateClientTCPStream(mtcp_manager_t mtcp, socket_map_t socket, int type, 616 uint32_t saddr, uint16_t sport, uint32_t daddr, uint16_t dport, 617 unsigned int *hash) 618 { 619 tcp_stream *cs; 620 struct socket_map *w; 621 622 cs = CreateTCPStream(mtcp, socket, type, daddr, dport, saddr, sport, hash); 623 if (cs == NULL) { 624 TRACE_ERROR("Can't create tcp_stream!\n"); 625 return NULL; 626 } 627 628 cs->side = MOS_SIDE_CLI; 629 cs->pair_stream = NULL; 630 631 /* if buffer management is off, then disable 632 * monitoring tcp ring of either streams (only if stream 633 * is just monitor stream active) 634 */ 635 if (IS_STREAM_TYPE(cs, MOS_SOCK_MONITOR_STREAM_ACTIVE)) { 636 cs->buffer_mgmt = BUFMGMT_OFF; 637 SOCKQ_FOREACH_START(w, &cs->msocks) { 638 uint8_t bm = w->monitor_stream->client_buf_mgmt; 639 if (bm > cs->buffer_mgmt) 640 cs->buffer_mgmt = bm; 641 if (w->monitor_stream->monitor_listener->client_mon == 1) 642 cs->status_mgmt = 1; 643 } SOCKQ_FOREACH_END; 644 } 645 646 return cs; 647 } 648 /*----------------------------------------------------------------------------*/ 649 inline tcp_stream * 650 AttachServerTCPStream(mtcp_manager_t mtcp, tcp_stream *cs, int type, 651 uint32_t saddr, uint16_t sport, uint32_t daddr, uint16_t dport) 652 { 653 tcp_stream *ss; 654 struct socket_map *w; 655 656 /* The 3rd arg is a temp hackk... FIXIT! TODO: XXX */ 657 ss = CreateTCPStream(mtcp, NULL, MOS_SOCK_UNUSED, saddr, sport, daddr, dport, NULL); 658 if (ss == NULL) { 659 TRACE_ERROR("Can't create tcp_stream!\n"); 660 return NULL; 661 } 662 663 ss->side = MOS_SIDE_SVR; 664 cs->pair_stream = ss; 665 ss->pair_stream = cs; 666 ss->socket = cs->socket; 667 SOCKQ_FOREACH_START(w, &cs->msocks) { 668 SOCKQ_INSERT_TAIL(&ss->msocks, w); 669 } SOCKQ_FOREACH_END; 670 ss->stream_type = STREAM_TYPE(MOS_SOCK_MONITOR_STREAM_ACTIVE); 671 672 if (IS_STREAM_TYPE(ss, MOS_SOCK_MONITOR_STREAM_ACTIVE)) { 673 ss->buffer_mgmt = BUFMGMT_OFF; 674 SOCKQ_FOREACH_START(w, &ss->msocks) { 675 uint8_t bm = w->monitor_stream->server_buf_mgmt; 676 if (bm > ss->buffer_mgmt) 677 ss->buffer_mgmt = bm; 678 if (w->monitor_stream->monitor_listener->server_mon == 1) 679 ss->status_mgmt = 1; 680 } SOCKQ_FOREACH_END; 681 } 682 683 return ss; 684 } 685 /*---------------------------------------------------------------------------*/ 686 static void 687 DestroySingleTCPStream(mtcp_manager_t mtcp, tcp_stream *stream) 688 { 689 struct sockaddr_in addr; 690 int bound_addr = FALSE; 691 int ret; 692 /* stand-alone monitor does not need this since it is single-threaded */ 693 bool flow_lock = HAS_STREAM_TYPE(stream, MOS_SOCK_STREAM); 694 695 struct socket_map *walk; 696 697 /* Set the stream state as CLOSED */ 698 stream->state = TCP_ST_CLOSED_RSVD; 699 700 SOCKQ_FOREACH_START(walk, &stream->msocks) { 701 HandleCallback(mtcp, MOS_HK_RCV, walk, stream->side, NULL, 702 MOS_ON_CONN_END | MOS_ON_TCP_STATE_CHANGE | stream->cb_events); 703 HandleCallback(mtcp, MOS_HK_SND, walk, stream->side, NULL, 704 MOS_ON_CONN_END | MOS_ON_TCP_STATE_CHANGE | stream->cb_events); 705 } SOCKQ_FOREACH_END; 706 707 #if 0 708 #ifdef DUMP_STREAM 709 if (stream->close_reason != TCP_ACTIVE_CLOSE && 710 stream->close_reason != TCP_PASSIVE_CLOSE) { 711 thread_printf(mtcp, mtcp->log_fp, 712 "Stream %d abnormally closed.\n", stream->id); 713 DumpStream(mtcp, stream); 714 DumpControlList(mtcp, mtcp->n_sender[0]); 715 } 716 #endif 717 718 #ifdef STREAM 719 uint8_t *sa, *da; 720 sa = (uint8_t *)&stream->saddr; 721 da = (uint8_t *)&stream->daddr; 722 TRACE_STREAM("DESTROY TCP STREAM %d: " 723 "%u.%u.%u.%u(%d) -> %u.%u.%u.%u(%d) (%s)\n", stream->id, 724 sa[0], sa[1], sa[2], sa[3], ntohs(stream->sport), 725 da[0], da[1], da[2], da[3], ntohs(stream->dport), 726 close_reason_str[stream->close_reason]); 727 #endif 728 729 if (stream->sndvar->sndbuf) { 730 TRACE_FSTAT("Stream %d: send buffer " 731 "cum_len: %lu, len: %u\n", stream->id, 732 stream->sndvar->sndbuf->cum_len, 733 stream->sndvar->sndbuf->len); 734 } 735 if (stream->rcvvar->rcvbuf) { 736 TRACE_FSTAT("Stream %d: recv buffer " 737 "cum_len: %lu, merged_len: %u, last_len: %u\n", stream->id, 738 stream->rcvvar->rcvbuf->cum_len, 739 stream->rcvvar->rcvbuf->merged_len, 740 stream->rcvvar->rcvbuf->last_len); 741 } 742 743 #if RTM_STAT 744 /* Triple duplicated ack stats */ 745 if (stream->sndvar->rstat.tdp_ack_cnt) { 746 TRACE_FSTAT("Stream %d: triple duplicated ack: %u, " 747 "retransmission bytes: %u, average rtm bytes/ack: %u\n", 748 stream->id, 749 stream->sndvar->rstat.tdp_ack_cnt, stream->sndvar->rstat.tdp_ack_bytes, 750 stream->sndvar->rstat.tdp_ack_bytes / stream->sndvar->rstat.tdp_ack_cnt); 751 } 752 753 /* Retransmission timeout stats */ 754 if (stream->sndvar->rstat.rto_cnt > 0) { 755 TRACE_FSTAT("Stream %d: timeout count: %u, bytes: %u\n", stream->id, 756 stream->sndvar->rstat.rto_cnt, stream->sndvar->rstat.rto_bytes); 757 } 758 759 /* Recovery stats */ 760 if (stream->sndvar->rstat.ack_upd_cnt) { 761 TRACE_FSTAT("Stream %d: snd_nxt update count: %u, " 762 "snd_nxt update bytes: %u, average update bytes/update: %u\n", 763 stream->id, 764 stream->sndvar->rstat.ack_upd_cnt, stream->sndvar->rstat.ack_upd_bytes, 765 stream->sndvar->rstat.ack_upd_bytes / stream->sndvar->rstat.ack_upd_cnt); 766 } 767 #if TCP_OPT_SACK_ENABLED 768 if (stream->sndvar->rstat.sack_cnt) { 769 TRACE_FSTAT("Selective ack count: %u, bytes: %u, " 770 "average bytes/ack: %u\n", 771 stream->sndvar->rstat.sack_cnt, stream->sndvar->rstat.sack_bytes, 772 stream->sndvar->rstat.sack_bytes / stream->sndvar->rstat.sack_cnt); 773 } else { 774 TRACE_FSTAT("Selective ack count: %u, bytes: %u\n", 775 stream->sndvar->rstat.sack_cnt, stream->sndvar->rstat.sack_bytes); 776 } 777 if (stream->sndvar->rstat.tdp_sack_cnt) { 778 TRACE_FSTAT("Selective tdp ack count: %u, bytes: %u, " 779 "average bytes/ack: %u\n", 780 stream->sndvar->rstat.tdp_sack_cnt, stream->sndvar->rstat.tdp_sack_bytes, 781 stream->sndvar->rstat.tdp_sack_bytes / stream->sndvar->rstat.tdp_sack_cnt); 782 } else { 783 TRACE_FSTAT("Selective ack count: %u, bytes: %u\n", 784 stream->sndvar->rstat.tdp_sack_cnt, stream->sndvar->rstat.tdp_sack_bytes); 785 } 786 #endif /* TCP_OPT_SACK_ENABLED */ 787 #endif /* RTM_STAT */ 788 #endif 789 790 if (HAS_STREAM_TYPE(stream, MOS_SOCK_STREAM)) { 791 /* stand-alone monitor does not need to do these */ 792 if (stream->is_bound_addr) { 793 bound_addr = TRUE; 794 addr.sin_addr.s_addr = stream->saddr; 795 addr.sin_port = stream->sport; 796 } 797 798 RemoveFromControlList(mtcp, stream); 799 RemoveFromSendList(mtcp, stream); 800 RemoveFromACKList(mtcp, stream); 801 802 if (stream->on_rto_idx >= 0) 803 RemoveFromRTOList(mtcp, stream); 804 805 SBUF_LOCK_DESTROY(&stream->rcvvar->read_lock); 806 SBUF_LOCK_DESTROY(&stream->sndvar->write_lock); 807 808 assert(stream->on_hash_table == TRUE); 809 810 /* free ring buffers */ 811 if (stream->sndvar->sndbuf) { 812 SBFree(mtcp->rbm_snd, stream->sndvar->sndbuf); 813 stream->sndvar->sndbuf = NULL; 814 } 815 } 816 817 if (stream->on_timewait_list) 818 RemoveFromTimewaitList(mtcp, stream); 819 820 if (g_config.mos->tcp_timeout > 0) 821 RemoveFromTimeoutList(mtcp, stream); 822 823 if (stream->rcvvar->rcvbuf) { 824 tcprb_del(stream->rcvvar->rcvbuf); 825 stream->rcvvar->rcvbuf = NULL; 826 } 827 828 if (flow_lock) 829 pthread_mutex_lock(&mtcp->ctx->flow_pool_lock); 830 831 /* remove from flow hash table */ 832 HTRemove(mtcp->tcp_flow_table, stream); 833 stream->on_hash_table = FALSE; 834 835 mtcp->flow_cnt--; 836 837 /* if there was a corresponding monitor stream socket opened 838 * then close it */ 839 SOCKQ_FOREACH_START(walk, &stream->msocks) { 840 SOCKQ_REMOVE(&stream->msocks, walk); 841 if (stream->pair_stream == NULL) 842 DestroyMonitorStreamSocket(mtcp, walk); 843 } SOCKQ_FOREACH_END; 844 845 if (stream->pair_stream != NULL) { 846 /* Nullify pointer to sibliing tcp_stream's pair_stream */ 847 stream->pair_stream->pair_stream = NULL; 848 } 849 850 MPFreeChunk(mtcp->rv_pool, stream->rcvvar); 851 MPFreeChunk(mtcp->sv_pool, stream->sndvar); 852 MPFreeChunk(mtcp->flow_pool, stream); 853 854 if (flow_lock) 855 /* stand-alone monitor does not need this since it is single-threaded */ 856 pthread_mutex_unlock(&mtcp->ctx->flow_pool_lock); 857 858 if (bound_addr) { 859 if (mtcp->ap) { 860 ret = FreeAddress(mtcp->ap, &addr); 861 } else { 862 int nif; 863 nif = GetOutputInterface(addr.sin_addr.s_addr); 864 if (nif < 0) { 865 TRACE_ERROR("Can't determine interface idx!\n"); 866 exit(EXIT_FAILURE); 867 } else { 868 ret = FreeAddress(ap[nif], &addr); 869 } 870 } 871 if (ret < 0) { 872 TRACE_ERROR("(NEVER HAPPEN) Failed to free address.\n"); 873 } 874 } 875 876 #ifdef NETSTAT 877 #if NETSTAT_PERTHREAD 878 TRACE_STREAM("Destroyed. Remaining flows: %u\n", mtcp->flow_cnt); 879 #endif /* NETSTAT_PERTHREAD */ 880 #endif /* NETSTAT */ 881 882 } 883 /*---------------------------------------------------------------------------*/ 884 void 885 DestroyTCPStream(mtcp_manager_t mtcp, tcp_stream *stream) 886 { 887 tcp_stream *pair_stream = stream->pair_stream; 888 889 DestroySingleTCPStream(mtcp, stream); 890 891 if (pair_stream) 892 DestroySingleTCPStream(mtcp, pair_stream); 893 } 894 /*---------------------------------------------------------------------------*/ 895 void 896 DumpStream(mtcp_manager_t mtcp, tcp_stream *stream) 897 { 898 uint8_t *sa, *da; 899 struct tcp_send_vars *sndvar = stream->sndvar; 900 struct tcp_recv_vars *rcvvar = stream->rcvvar; 901 902 sa = (uint8_t *)&stream->saddr; 903 da = (uint8_t *)&stream->daddr; 904 thread_printf(mtcp, mtcp->log_fp, "========== Stream %u: " 905 "%u.%u.%u.%u(%u) -> %u.%u.%u.%u(%u) ==========\n", stream->id, 906 sa[0], sa[1], sa[2], sa[3], ntohs(stream->sport), 907 da[0], da[1], da[2], da[3], ntohs(stream->dport)); 908 thread_printf(mtcp, mtcp->log_fp, 909 "Stream id: %u, type: %u, state: %s, close_reason: %s\n", 910 stream->id, stream->stream_type, 911 TCPStateToString(stream), close_reason_str[stream->close_reason]); 912 if (stream->socket) { 913 socket_map_t socket = stream->socket; 914 thread_printf(mtcp, mtcp->log_fp, "Socket id: %d, type: %d, opts: %u\n" 915 "epoll: %u (IN: %u, OUT: %u, ERR: %u, RDHUP: %u, ET: %u)\n" 916 "events: %u (IN: %u, OUT: %u, ERR: %u, RDHUP: %u, ET: %u)\n", 917 socket->id, socket->socktype, socket->opts, 918 socket->epoll, socket->epoll & MOS_EPOLLIN, 919 socket->epoll & MOS_EPOLLOUT, socket->epoll & MOS_EPOLLERR, 920 socket->epoll & MOS_EPOLLRDHUP, socket->epoll & MOS_EPOLLET, 921 socket->events, socket->events & MOS_EPOLLIN, 922 socket->events & MOS_EPOLLOUT, socket->events & MOS_EPOLLERR, 923 socket->events & MOS_EPOLLRDHUP, socket->events & MOS_EPOLLET); 924 } else { 925 thread_printf(mtcp, mtcp->log_fp, "Socket: (null)\n"); 926 } 927 928 thread_printf(mtcp, mtcp->log_fp, 929 "on_hash_table: %u, on_control_list: %u (wait: %u), on_send_list: %u, " 930 "on_ack_list: %u, is_wack: %u, ack_cnt: %u\n" 931 "on_rto_idx: %d, on_timewait_list: %u, on_timeout_list: %u, " 932 "on_rcv_br_list: %u, on_snd_br_list: %u\n" 933 "on_sendq: %u, on_ackq: %u, closed: %u, on_closeq: %u, " 934 "on_closeq_int: %u, on_resetq: %u, on_resetq_int: %u\n" 935 "have_reset: %u, is_fin_sent: %u, is_fin_ackd: %u, " 936 "saw_timestamp: %u, sack_permit: %u, " 937 "is_bound_addr: %u, need_wnd_adv: %u\n", stream->on_hash_table, 938 sndvar->on_control_list, stream->control_list_waiting, sndvar->on_send_list, 939 sndvar->on_ack_list, sndvar->is_wack, sndvar->ack_cnt, 940 stream->on_rto_idx, stream->on_timewait_list, stream->on_timeout_list, 941 stream->on_rcv_br_list, stream->on_snd_br_list, 942 sndvar->on_sendq, sndvar->on_ackq, 943 stream->closed, sndvar->on_closeq, sndvar->on_closeq_int, 944 sndvar->on_resetq, sndvar->on_resetq_int, 945 stream->have_reset, sndvar->is_fin_sent, 946 sndvar->is_fin_ackd, stream->saw_timestamp, stream->sack_permit, 947 stream->is_bound_addr, stream->need_wnd_adv); 948 949 thread_printf(mtcp, mtcp->log_fp, "========== Send variables ==========\n"); 950 thread_printf(mtcp, mtcp->log_fp, 951 "ip_id: %u, mss: %u, eff_mss: %u, wscale(me, peer): (%u, %u), nif_out: %d\n", 952 sndvar->ip_id, sndvar->mss, sndvar->eff_mss, 953 sndvar->wscale_mine, sndvar->wscale_peer, sndvar->nif_out); 954 thread_printf(mtcp, mtcp->log_fp, 955 "snd_nxt: %u, snd_una: %u, iss: %u, fss: %u\nsnd_wnd: %u, " 956 "peer_wnd: %u, cwnd: %u, ssthresh: %u\n", 957 stream->snd_nxt, sndvar->snd_una, sndvar->iss, sndvar->fss, 958 sndvar->snd_wnd, sndvar->peer_wnd, sndvar->cwnd, sndvar->ssthresh); 959 960 if (sndvar->sndbuf) { 961 thread_printf(mtcp, mtcp->log_fp, 962 "Send buffer: init_seq: %u, head_seq: %u, " 963 "len: %d, cum_len: %lu, size: %d\n", 964 sndvar->sndbuf->init_seq, sndvar->sndbuf->head_seq, 965 sndvar->sndbuf->len, sndvar->sndbuf->cum_len, sndvar->sndbuf->size); 966 } else { 967 thread_printf(mtcp, mtcp->log_fp, "Send buffer: (null)\n"); 968 } 969 thread_printf(mtcp, mtcp->log_fp, 970 "nrtx: %u, max_nrtx: %u, rto: %u, ts_rto: %u, " 971 "ts_lastack_sent: %u\n", sndvar->nrtx, sndvar->max_nrtx, 972 sndvar->rto, sndvar->ts_rto, sndvar->ts_lastack_sent); 973 974 thread_printf(mtcp, mtcp->log_fp, 975 "========== Receive variables ==========\n"); 976 thread_printf(mtcp, mtcp->log_fp, 977 "rcv_nxt: %u, irs: %u, rcv_wnd: %u, " 978 "snd_wl1: %u, snd_wl2: %u\n", 979 stream->rcv_nxt, rcvvar->irs, 980 rcvvar->rcv_wnd, rcvvar->snd_wl1, rcvvar->snd_wl2); 981 if (!rcvvar->rcvbuf) { 982 thread_printf(mtcp, mtcp->log_fp, "Receive buffer: (null)\n"); 983 } 984 thread_printf(mtcp, mtcp->log_fp, "last_ack_seq: %u, dup_acks: %u\n", 985 rcvvar->last_ack_seq, rcvvar->dup_acks); 986 thread_printf(mtcp, mtcp->log_fp, 987 "ts_recent: %u, ts_lastack_rcvd: %u, ts_last_ts_upd: %u, " 988 "ts_tw_expire: %u\n", rcvvar->ts_recent, rcvvar->ts_lastack_rcvd, 989 rcvvar->ts_last_ts_upd, rcvvar->ts_tw_expire); 990 thread_printf(mtcp, mtcp->log_fp, 991 "srtt: %u, mdev: %u, mdev_max: %u, rttvar: %u, rtt_seq: %u\n", 992 rcvvar->srtt, rcvvar->mdev, rcvvar->mdev_max, 993 rcvvar->rttvar, rcvvar->rtt_seq); 994 } 995 /*---------------------------------------------------------------------------*/ 996