1 #include "debug.h" 2 #include <string.h> 3 4 #include "config.h" 5 #include "tcp_stream.h" 6 #include "fhash.h" 7 #include "tcp.h" 8 #include "tcp_in.h" 9 #include "tcp_out.h" 10 #include "tcp_ring_buffer.h" 11 #include "tcp_send_buffer.h" 12 #include "eventpoll.h" 13 #include "ip_out.h" 14 #include "timer.h" 15 #include "tcp_rb.h" 16 /*---------------------------------------------------------------------------*/ 17 char *state_str[] = { 18 "TCP_ST_CLOSED", 19 "TCP_ST_LISTEN", 20 "TCP_ST_SYN_SENT", 21 "TCP_ST_SYN_RCVD", 22 "TCP_ST_ESTABILSHED", 23 "TCP_ST_FIN_WAIT_1", 24 "TCP_ST_FIN_WAIT_2", 25 "TCP_ST_CLOSE_WAIT", 26 "TCP_ST_CLOSING", 27 "TCP_ST_LAST_ACK", 28 "TCP_ST_TIME_WAIT", 29 "TCP_ST_CLOSED_RSVD" 30 }; 31 /*---------------------------------------------------------------------------*/ 32 char *close_reason_str[] = { 33 "NOT_CLOSED", 34 "CLOSE", 35 "CLOSED", 36 "CONN_FAIL", 37 "CONN_LOST", 38 "RESET", 39 "NO_MEM", 40 "DENIED", 41 "TIMEDOUT" 42 }; 43 /*---------------------------------------------------------------------------*/ 44 static __thread unsigned long next = 1; 45 /* Function retrieved from POSIX.1-2001 standard */ 46 /* RAND_MAX assumed to be 32767 */ 47 static int 48 posix_seq_rand(void) { 49 next = next * 1103515245 + 12345; 50 return ((unsigned)(next/65536) % 32768); 51 } 52 /*---------------------------------------------------------------------------*/ 53 void 54 posix_seq_srand(unsigned seed) { 55 next = seed % 32768; 56 } 57 /*---------------------------------------------------------------------------*/ 58 uint32_t 59 FetchSeqDrift(struct tcp_stream *stream, uint32_t seq) 60 { 61 int i = 0; 62 uint8_t flag = 1; 63 int count; 64 65 i = stream->sndvar->sre_index - 1; 66 if (i == -1) i = SRE_MAX - 1; 67 count = 0; 68 69 while (flag) { 70 if (stream->sndvar->sre[i].seq_base == 0) 71 return 0; 72 else if (seq >= stream->sndvar->sre[i].seq_base) 73 return stream->sndvar->sre[i].seq_off; 74 75 i--; 76 if (i == -1) i = SRE_MAX - 1; 77 count++; 78 if (count == SRE_MAX) 79 flag = 1; 80 } 81 82 return 0; 83 } 84 /*---------------------------------------------------------------------------*/ 85 int 86 TcpSeqChange(socket_map_t socket, uint32_t seq_drift, int side, uint32_t seqno) 87 { 88 struct tcp_stream *mstrm, *stream; 89 90 if (side != MOS_SIDE_CLI && side != MOS_SIDE_SVR) { 91 TRACE_ERROR("Invalid side requested!\n"); 92 errno = EINVAL; 93 return -1; 94 } 95 96 mstrm = socket->monitor_stream->stream; 97 stream = (side == mstrm->side) ? mstrm : mstrm->pair_stream; 98 if (stream == NULL) { 99 TRACE_ERROR("Stream pointer for sockid: %u not found!\n", 100 socket->id); 101 errno = EBADF; 102 return -1; 103 } 104 105 stream->sndvar->sre[stream->sndvar->sre_index].seq_off = seq_drift; 106 stream->sndvar->sre[stream->sndvar->sre_index].seq_off += 107 (stream->sndvar->sre_index == 0) ? stream->sndvar->sre[SRE_MAX - 1].seq_off : 108 stream->sndvar->sre[stream->sndvar->sre_index - 1].seq_off; 109 stream->sndvar->sre[stream->sndvar->sre_index].seq_base = seqno; 110 stream->sndvar->sre_index = (stream->sndvar->sre_index + 1) & (SRE_MAX - 1); 111 112 return 0; 113 } 114 /*---------------------------------------------------------------------------*/ 115 /** 116 * FYI: This is NOT a read-only return! 117 */ 118 int 119 GetFragInfo(socket_map_t sock, int side, void *optval, socklen_t *len) 120 { 121 struct tcp_stream *stream; 122 123 stream = NULL; 124 if (!*len || ( *len % sizeof(tcpfrag_t) != 0)) 125 goto frag_info_error; 126 127 if (side != MOS_SIDE_CLI && side != MOS_SIDE_SVR) { 128 TRACE_ERROR("Invalid side requested!\n"); 129 exit(EXIT_FAILURE); 130 return -1; 131 } 132 133 struct tcp_stream *mstrm = sock->monitor_stream->stream; 134 stream = (side == mstrm->side) ? mstrm : mstrm->pair_stream; 135 136 if (stream == NULL) goto frag_info_error; 137 138 /* First check if the tcp ring buffer even has anything */ 139 if (stream->rcvvar != NULL && 140 stream->rcvvar->rcvbuf != NULL) { 141 tcprb_t *rcvbuf = stream->rcvvar->rcvbuf; 142 struct tcp_ring_fragment *out = (struct tcp_ring_fragment *)optval; 143 int const maxout = *len; 144 *len = 0; 145 struct _tcpfrag_t *walk; 146 TAILQ_FOREACH(walk, &rcvbuf->frags, link) { 147 if (*len == maxout) 148 break; 149 out[*len].offset = walk->head; 150 out[*len].len = walk->tail - walk->head; 151 (*len)++; 152 } 153 if (*len != maxout) { 154 /* set zero sentinel */ 155 out[*len].offset = 0; 156 out[*len].len = 0; 157 } 158 } else 159 goto frag_info_error; 160 161 return 0; 162 163 frag_info_error: 164 optval = NULL; 165 *len = 0; 166 return -1; 167 } 168 /*---------------------------------------------------------------------------*/ 169 /** 170 * Comments later... 171 */ 172 int 173 GetBufInfo(socket_map_t sock, int side, void *optval, socklen_t *len) 174 { 175 struct tcp_stream *stream; 176 struct tcp_buf_info *tbi; 177 178 tbi = (struct tcp_buf_info *)optval; 179 memset(tbi, 0, sizeof(struct tcp_buf_info)); 180 stream = NULL; 181 182 if (*len != sizeof(struct tcp_buf_info)) { 183 errno = EINVAL; 184 goto buf_info_error; 185 } 186 187 if (side != MOS_SIDE_CLI && side != MOS_SIDE_SVR) { 188 TRACE_ERROR("Invalid side requested!\n"); 189 errno = EINVAL; 190 goto buf_info_error; 191 } 192 193 struct tcp_stream *mstrm = sock->monitor_stream->stream; 194 stream = (side == mstrm->side) ? mstrm : mstrm->pair_stream; 195 196 /* First check if the tcp ring buffer even has anything */ 197 if (stream != NULL && 198 stream->rcvvar != NULL && 199 stream->rcvvar->rcvbuf != NULL) { 200 tcprb_t *rcvbuf = stream->rcvvar->rcvbuf; 201 tcpfrag_t *f = TAILQ_LAST(&rcvbuf->frags, flist); 202 tbi->tcpbi_init_seq = stream->rcvvar->irs + 1; 203 tbi->tcpbi_last_byte_read = rcvbuf->pile; 204 tbi->tcpbi_next_byte_expected = rcvbuf->pile + tcprb_cflen(rcvbuf); 205 tbi->tcpbi_last_byte_received = (f ? f->tail : rcvbuf->head); 206 } else { 207 errno = ENODATA; 208 goto buf_info_error; 209 } 210 211 return 0; 212 213 buf_info_error: 214 optval = NULL; 215 *len = 0; 216 return -1; 217 } 218 /*---------------------------------------------------------------------------*/ 219 int 220 DisableBuf(socket_map_t sock, int side) 221 { 222 #ifdef DBGMSG 223 __PREPARE_DBGLOGGING(); 224 #endif 225 struct tcp_stream *stream; 226 int rc = 0; 227 228 switch (sock->socktype) { 229 case MOS_SOCK_MONITOR_STREAM: 230 if (side == MOS_SIDE_CLI) 231 sock->monitor_listener->client_buf_mgmt = 0; 232 else if (side == MOS_SIDE_SVR) 233 sock->monitor_listener->server_buf_mgmt = 0; 234 else { 235 assert(0); 236 TRACE_DBG("Invalid side!\n"); 237 rc = -1; 238 } 239 break; 240 case MOS_SOCK_MONITOR_STREAM_ACTIVE: 241 stream = sock->monitor_stream->stream; 242 if (stream->side != side) 243 stream = stream->pair_stream; 244 assert(stream->side == side); 245 stream->buffer_mgmt = 0; 246 break; 247 default: 248 assert(0); 249 TRACE_DBG("Can't disable buf for invalid socket!\n"); 250 rc = -1; 251 } 252 253 return rc; 254 } 255 /*---------------------------------------------------------------------------*/ 256 int 257 GetLastTimestamp(struct tcp_stream *stream, uint32_t *usecs, socklen_t *len) 258 { 259 #ifdef DBGMSG 260 __PREPARE_DBGLOGGING(); 261 #endif 262 if (*len < sizeof(uint32_t)) { 263 TRACE_DBG("Size passed is not >= sizeof(uint32_t)!\n"); 264 return -1; 265 } 266 267 *usecs = (stream->last_active_ts > 268 stream->pair_stream->last_active_ts) 269 ? 270 TS_TO_USEC(stream->last_active_ts) : 271 TS_TO_USEC(stream->pair_stream->last_active_ts); 272 273 return 0; 274 } 275 /*---------------------------------------------------------------------------*/ 276 inline int 277 GetTCPState(struct tcp_stream *stream, int side, 278 void *optval, socklen_t *optlen) 279 { 280 if (!stream || !(stream = (side == stream->side) ? stream : stream->pair_stream)) 281 return -1; 282 *(int *)optval = (int)((stream->state == TCP_ST_CLOSED_RSVD) ? 283 TCP_ST_CLOSED : stream->state); 284 return 0; 285 } 286 /*---------------------------------------------------------------------------*/ 287 inline char * 288 TCPStateToString(const tcp_stream *stream) 289 { 290 return (stream) ? state_str[stream->state] : NULL; 291 } 292 /*---------------------------------------------------------------------------*/ 293 inline void 294 RaiseReadEvent(mtcp_manager_t mtcp, tcp_stream *stream) 295 { 296 struct tcp_recv_vars *rcvvar; 297 298 rcvvar = stream->rcvvar; 299 300 if (HAS_STREAM_TYPE(stream, MOS_SOCK_STREAM)) { 301 if (stream->socket && (stream->socket->epoll & MOS_EPOLLIN)) 302 AddEpollEvent(mtcp->ep, MOS_EVENT_QUEUE, stream->socket, MOS_EPOLLIN); 303 } else if (rcvvar->rcvbuf && tcprb_cflen(rcvvar->rcvbuf) > 0) { 304 /* 305 * in case it is a monitoring socket, queue up the read events 306 * in the event_queue of only if the tcp_stream hasn't already 307 * been registered in the event queue 308 */ 309 int index; 310 struct event_queue *eq; 311 struct socket_map *walk; 312 313 SOCKQ_FOREACH_START(walk, &stream->msocks) { 314 assert(walk->socktype == MOS_SOCK_MONITOR_STREAM_ACTIVE); 315 eq = walk->monitor_stream->monitor_listener->eq; 316 317 /* if it already has read data register... then skip this step */ 318 if (stream->actions & MOS_ACT_READ_DATA) 319 return; 320 if (eq->num_events >= eq->size) { 321 TRACE_ERROR("Exceeded epoll event queue! num_events: %d, " 322 "size: %d\n", eq->num_events, eq->size); 323 return; 324 } 325 326 index = eq->end++; 327 eq->events[index].ev.events = MOS_EPOLLIN; 328 eq->events[index].ev.data.ptr = (void *)stream; 329 330 if (eq->end >= eq->size) { 331 eq->end = 0; 332 } 333 eq->num_events++; 334 stream->actions |= MOS_ACT_READ_DATA; 335 } SOCKQ_FOREACH_END; 336 } else { 337 TRACE_EPOLL("Stream %d: Raising read without a socket!\n", stream->id); 338 } 339 } 340 /*---------------------------------------------------------------------------*/ 341 inline void 342 RaiseWriteEvent(mtcp_manager_t mtcp, tcp_stream *stream) 343 { 344 if (stream->socket) { 345 if (stream->socket->epoll & MOS_EPOLLOUT) { 346 AddEpollEvent(mtcp->ep, 347 MOS_EVENT_QUEUE, stream->socket, MOS_EPOLLOUT); 348 } 349 } else { 350 TRACE_EPOLL("Stream %d: Raising write without a socket!\n", stream->id); 351 } 352 } 353 /*---------------------------------------------------------------------------*/ 354 inline void 355 RaiseCloseEvent(mtcp_manager_t mtcp, tcp_stream *stream) 356 { 357 if (stream->socket) { 358 if (stream->socket->epoll & MOS_EPOLLRDHUP) { 359 AddEpollEvent(mtcp->ep, 360 MOS_EVENT_QUEUE, stream->socket, MOS_EPOLLRDHUP); 361 } else if (stream->socket->epoll & MOS_EPOLLIN) { 362 AddEpollEvent(mtcp->ep, 363 MOS_EVENT_QUEUE, stream->socket, MOS_EPOLLIN); 364 } 365 } else { 366 TRACE_EPOLL("Stream %d: Raising close without a socket!\n", stream->id); 367 } 368 } 369 /*---------------------------------------------------------------------------*/ 370 inline int 371 RaiseErrorEvent(mtcp_manager_t mtcp, tcp_stream *stream) 372 { 373 if (stream->socket) { 374 if (stream->socket->epoll & MOS_EPOLLERR) { 375 /* passing closing reason for error notification */ 376 return AddEpollEvent(mtcp->ep, 377 MOS_EVENT_QUEUE, stream->socket, MOS_EPOLLERR); 378 } 379 } else { 380 TRACE_EPOLL("Stream %d: Raising error without a socket!\n", stream->id); 381 } 382 return -1; 383 } 384 /*----------------------------------------------------------------------------*/ 385 int 386 AddMonitorStreamSockets(mtcp_manager_t mtcp, struct tcp_stream *stream) 387 { 388 struct mtcp_context mctx; 389 int socktype; 390 391 mctx.cpu = mtcp->ctx->cpu; 392 struct mon_listener *walk; 393 394 // traverse the passive socket's list 395 TAILQ_FOREACH(walk, &mtcp->monitors, link) { 396 socktype = walk->socket->socktype; 397 398 if (socktype != MOS_SOCK_MONITOR_STREAM) 399 continue; 400 401 /* mtcp_bind_monitor_filter() 402 * - create an monitor active socket only for the filter-passed flows 403 * - we use the result (= tag) from DetectStreamType() to avoid 404 * evaluating the same BPF filter twice */ 405 if (!walk->is_stream_syn_filter_hit) { 406 continue; 407 } 408 409 struct socket_map *s = 410 AllocateSocket(&mctx, MOS_SOCK_MONITOR_STREAM_ACTIVE); 411 if (!s) 412 return -1; 413 414 s->monitor_stream->socket = s; 415 s->monitor_stream->stream = stream; 416 s->monitor_stream->monitor_listener = walk; 417 s->monitor_stream->client_buf_mgmt = walk->client_buf_mgmt; 418 s->monitor_stream->server_buf_mgmt = walk->server_buf_mgmt; 419 s->monitor_stream->client_mon = walk->client_mon; 420 s->monitor_stream->server_mon = walk->server_mon; 421 #ifdef NEWEV 422 s->monitor_stream->stree_dontcare = 423 s->monitor_stream->monitor_listener->stree_dontcare; 424 s->monitor_stream->stree_pre_rcv = 425 s->monitor_stream->monitor_listener->stree_pre_rcv; 426 s->monitor_stream->stree_post_snd = 427 s->monitor_stream->monitor_listener->stree_post_snd; 428 if (s->monitor_stream->stree_dontcare) 429 stree_inc_ref(s->monitor_stream->stree_dontcare); 430 if (s->monitor_stream->stree_pre_rcv) 431 stree_inc_ref(s->monitor_stream->stree_pre_rcv); 432 if (s->monitor_stream->stree_post_snd) 433 stree_inc_ref(s->monitor_stream->stree_post_snd); 434 #else 435 InitEvP(&s->monitor_stream->dontcare_evp, 436 &walk->dontcare_evb); 437 InitEvP(&s->monitor_stream->pre_tcp_evp, 438 &walk->pre_tcp_evb); 439 InitEvP(&s->monitor_stream->post_tcp_evp, 440 &walk->post_tcp_evb); 441 #endif 442 443 SOCKQ_INSERT_TAIL(&stream->msocks, s); 444 } 445 446 return 0; 447 } 448 /*----------------------------------------------------------------------------*/ 449 int 450 DestroyMonitorStreamSocket(mtcp_manager_t mtcp, socket_map_t msock) 451 { 452 struct mtcp_context mctx; 453 int socktype, sockid, rc; 454 455 if (msock == NULL) { 456 TRACE_DBG("Stream socket does not exist!\n"); 457 /* exit(-1); */ 458 return 0; 459 } 460 461 rc = 0; 462 mctx.cpu = mtcp->ctx->cpu; 463 socktype = msock->socktype; 464 sockid = msock->id; 465 466 switch (socktype) { 467 case MOS_SOCK_MONITOR_STREAM_ACTIVE: 468 FreeSocket(&mctx, sockid, socktype); 469 break; 470 case MOS_SOCK_MONITOR_RAW: 471 /* do nothing since all raw sockets point to the same socket */ 472 break; 473 default: 474 TRACE_DBG("Trying to destroy a monitor socket for an unsupported type!\n"); 475 rc = -1; 476 /* exit(-1); */ 477 break; 478 } 479 480 return rc; 481 } 482 /*---------------------------------------------------------------------------*/ 483 tcp_stream * 484 CreateTCPStream(mtcp_manager_t mtcp, socket_map_t socket, int type, 485 uint32_t saddr, uint16_t sport, uint32_t daddr, uint16_t dport, 486 unsigned int *hash) 487 { 488 tcp_stream *stream = NULL; 489 int ret; 490 /* stand-alone monitor does not need this since it is single-threaded */ 491 bool flow_lock = type & STREAM_TYPE(MOS_SOCK_STREAM); 492 //bool flow_lock = false; 493 494 if (flow_lock) 495 pthread_mutex_lock(&mtcp->ctx->flow_pool_lock); 496 497 stream = (tcp_stream *)MPAllocateChunk(mtcp->flow_pool); 498 if (!stream) { 499 TRACE_ERROR("Cannot allocate memory for the stream. " 500 "g_config.mos->max_concurrency: %d, concurrent: %u\n", 501 g_config.mos->max_concurrency, mtcp->flow_cnt); 502 if (flow_lock) 503 pthread_mutex_unlock(&mtcp->ctx->flow_pool_lock); 504 return NULL; 505 } 506 memset(stream, 0, sizeof(tcp_stream)); 507 508 stream->rcvvar = (struct tcp_recv_vars *)MPAllocateChunk(mtcp->rv_pool); 509 if (!stream->rcvvar) { 510 MPFreeChunk(mtcp->flow_pool, stream); 511 if (flow_lock) 512 pthread_mutex_unlock(&mtcp->ctx->flow_pool_lock); 513 return NULL; 514 } 515 memset(stream->rcvvar, 0, sizeof(struct tcp_recv_vars)); 516 517 /* stand-alone monitor does not need to do this */ 518 stream->sndvar = (struct tcp_send_vars *)MPAllocateChunk(mtcp->sv_pool); 519 if (!stream->sndvar) { 520 MPFreeChunk(mtcp->rv_pool, stream->rcvvar); 521 MPFreeChunk(mtcp->flow_pool, stream); 522 if (flow_lock) 523 pthread_mutex_unlock(&mtcp->ctx->flow_pool_lock); 524 return NULL; 525 } 526 //if (HAS_STREAM_TYPE(stream, MOS_SOCK_STREAM)) 527 memset(stream->sndvar, 0, sizeof(struct tcp_send_vars)); 528 529 stream->id = mtcp->g_id++; 530 stream->saddr = saddr; 531 stream->sport = sport; 532 stream->daddr = daddr; 533 stream->dport = dport; 534 535 ret = HTInsert(mtcp->tcp_flow_table, stream, hash); 536 if (ret < 0) { 537 TRACE_ERROR("Stream %d: " 538 "Failed to insert the stream into hash table.\n", stream->id); 539 MPFreeChunk(mtcp->flow_pool, stream); 540 if (flow_lock) 541 pthread_mutex_unlock(&mtcp->ctx->flow_pool_lock); 542 return NULL; 543 } 544 stream->on_hash_table = TRUE; 545 mtcp->flow_cnt++; 546 547 SOCKQ_INIT(&stream->msocks); 548 549 /* 550 * if an embedded monitor is attached... 551 * create monitor stream socket now! 552 * If socket type is raw.. then don't create it 553 */ 554 if ((mtcp->num_msp > 0) && 555 (type & STREAM_TYPE(MOS_SOCK_MONITOR_STREAM_ACTIVE))) 556 if (AddMonitorStreamSockets(mtcp, stream) < 0) 557 TRACE_DBG("Could not create monitor stream socket!\n"); 558 559 if (flow_lock) 560 pthread_mutex_unlock(&mtcp->ctx->flow_pool_lock); 561 562 if (socket) { 563 stream->socket = socket; 564 socket->stream = stream; 565 } 566 567 stream->stream_type = type; 568 stream->state = TCP_ST_LISTEN; 569 /* This is handled by core.c, tcp_in.c & tcp_out.c */ 570 /* stream->cb_events |= MOS_ON_TCP_STATE_CHANGE; */ 571 572 stream->on_rto_idx = -1; 573 574 /* stand-alone monitor does not need to do this */ 575 stream->sndvar->mss = TCP_DEFAULT_MSS; 576 stream->sndvar->wscale_mine = TCP_DEFAULT_WSCALE; 577 stream->sndvar->wscale_peer = 0; 578 579 if (HAS_STREAM_TYPE(stream, MOS_SOCK_STREAM)) { 580 stream->sndvar->ip_id = 0; 581 stream->sndvar->nif_out = GetOutputInterface(stream->daddr); 582 583 stream->sndvar->iss = posix_seq_rand() % TCP_MAX_SEQ; 584 //stream->sndvar->iss = 0; 585 stream->snd_nxt = stream->sndvar->iss; 586 stream->sndvar->snd_una = stream->sndvar->iss; 587 stream->sndvar->snd_wnd = g_config.mos->wmem_size; 588 stream->sndvar->rto = TCP_INITIAL_RTO; 589 #if USE_SPIN_LOCK 590 if (pthread_spin_init(&stream->sndvar->write_lock, PTHREAD_PROCESS_PRIVATE)) { 591 perror("pthread_spin_init of write_lock"); 592 pthread_spin_destroy(&stream->rcvvar->read_lock); 593 #else 594 if (pthread_mutex_init(&stream->sndvar->write_lock, NULL)) { 595 perror("pthread_mutex_init of write_lock"); 596 pthread_mutex_destroy(&stream->rcvvar->read_lock); 597 #endif 598 return NULL; 599 } 600 } 601 stream->rcvvar->irs = 0; 602 603 stream->rcv_nxt = 0; 604 stream->rcvvar->rcv_wnd = TCP_INITIAL_WINDOW; 605 606 stream->rcvvar->snd_wl1 = stream->rcvvar->irs - 1; 607 608 stream->buffer_mgmt = BUFMGMT_FULL; 609 610 /* needs state update by default */ 611 stream->status_mgmt = 1; 612 613 #if USE_SPIN_LOCK 614 if (pthread_spin_init(&stream->rcvvar->read_lock, PTHREAD_PROCESS_PRIVATE)) { 615 #else 616 if (pthread_mutex_init(&stream->rcvvar->read_lock, NULL)) { 617 #endif 618 perror("pthread_mutex_init of read_lock"); 619 return NULL; 620 } 621 622 #ifdef STREAM 623 uint8_t *sa; 624 uint8_t *da; 625 626 sa = (uint8_t *)&stream->saddr; 627 da = (uint8_t *)&stream->daddr; 628 TRACE_STREAM("CREATED NEW TCP STREAM %d: " 629 "%u.%u.%u.%u(%d) -> %u.%u.%u.%u(%d) (ISS: %u)\n", stream->id, 630 sa[0], sa[1], sa[2], sa[3], ntohs(stream->sport), 631 da[0], da[1], da[2], da[3], ntohs(stream->dport), 632 stream->sndvar->iss); 633 #endif 634 635 return stream; 636 } 637 /*----------------------------------------------------------------------------*/ 638 inline tcp_stream * 639 CreateDualTCPStream(mtcp_manager_t mtcp, socket_map_t socket, int type, uint32_t saddr, 640 uint16_t sport, uint32_t daddr, uint16_t dport, unsigned int *hash) 641 { 642 tcp_stream *cur_stream, *paired_stream; 643 struct socket_map *walk; 644 645 cur_stream = CreateTCPStream(mtcp, socket, type, 646 saddr, sport, daddr, dport, hash); 647 if (cur_stream == NULL) { 648 TRACE_ERROR("Can't create tcp_stream!\n"); 649 return NULL; 650 } 651 652 paired_stream = CreateTCPStream(mtcp, NULL, MOS_SOCK_UNUSED, 653 daddr, dport, saddr, sport, hash); 654 if (paired_stream == NULL) { 655 DestroyTCPStream(mtcp, cur_stream); 656 TRACE_ERROR("Can't create tcp_stream!\n"); 657 return NULL; 658 } 659 660 cur_stream->pair_stream = paired_stream; 661 paired_stream->pair_stream = cur_stream; 662 paired_stream->socket = socket; 663 SOCKQ_FOREACH_START(walk, &cur_stream->msocks) { 664 SOCKQ_INSERT_TAIL(&paired_stream->msocks, walk); 665 } SOCKQ_FOREACH_END; 666 paired_stream->stream_type = STREAM_TYPE(MOS_SOCK_MONITOR_STREAM_ACTIVE); 667 668 return cur_stream; 669 } 670 /*----------------------------------------------------------------------------*/ 671 inline tcp_stream * 672 CreateClientTCPStream(mtcp_manager_t mtcp, socket_map_t socket, int type, 673 uint32_t saddr, uint16_t sport, uint32_t daddr, uint16_t dport, 674 unsigned int *hash) 675 { 676 tcp_stream *cs; 677 struct socket_map *w; 678 679 cs = CreateTCPStream(mtcp, socket, type, daddr, dport, saddr, sport, hash); 680 if (cs == NULL) { 681 TRACE_ERROR("Can't create tcp_stream!\n"); 682 return NULL; 683 } 684 685 cs->side = MOS_SIDE_CLI; 686 cs->pair_stream = NULL; 687 688 /* if buffer management is off, then disable 689 * monitoring tcp ring of either streams (only if stream 690 * is just monitor stream active) 691 */ 692 if (IS_STREAM_TYPE(cs, MOS_SOCK_MONITOR_STREAM_ACTIVE)) { 693 cs->buffer_mgmt = BUFMGMT_OFF; 694 SOCKQ_FOREACH_START(w, &cs->msocks) { 695 uint8_t bm = w->monitor_stream->client_buf_mgmt; 696 if (bm > cs->buffer_mgmt) 697 cs->buffer_mgmt = bm; 698 if (w->monitor_stream->monitor_listener->client_mon == 1) 699 cs->status_mgmt = 1; 700 } SOCKQ_FOREACH_END; 701 } 702 703 return cs; 704 } 705 /*----------------------------------------------------------------------------*/ 706 inline tcp_stream * 707 AttachServerTCPStream(mtcp_manager_t mtcp, tcp_stream *cs, int type, 708 uint32_t saddr, uint16_t sport, uint32_t daddr, uint16_t dport) 709 { 710 tcp_stream *ss; 711 struct socket_map *w; 712 713 /* The 3rd arg is a temp hackk... FIXIT! TODO: XXX */ 714 ss = CreateTCPStream(mtcp, NULL, MOS_SOCK_UNUSED, saddr, sport, daddr, dport, NULL); 715 if (ss == NULL) { 716 TRACE_ERROR("Can't create tcp_stream!\n"); 717 return NULL; 718 } 719 720 ss->side = MOS_SIDE_SVR; 721 cs->pair_stream = ss; 722 ss->pair_stream = cs; 723 ss->socket = cs->socket; 724 SOCKQ_FOREACH_START(w, &cs->msocks) { 725 SOCKQ_INSERT_TAIL(&ss->msocks, w); 726 } SOCKQ_FOREACH_END; 727 ss->stream_type = STREAM_TYPE(MOS_SOCK_MONITOR_STREAM_ACTIVE); 728 729 if (IS_STREAM_TYPE(ss, MOS_SOCK_MONITOR_STREAM_ACTIVE)) { 730 ss->buffer_mgmt = BUFMGMT_OFF; 731 SOCKQ_FOREACH_START(w, &ss->msocks) { 732 uint8_t bm = w->monitor_stream->server_buf_mgmt; 733 if (bm > ss->buffer_mgmt) 734 ss->buffer_mgmt = bm; 735 if (w->monitor_stream->monitor_listener->server_mon == 1) 736 ss->status_mgmt = 1; 737 } SOCKQ_FOREACH_END; 738 } 739 740 return ss; 741 } 742 /*---------------------------------------------------------------------------*/ 743 static void 744 DestroySingleTCPStream(mtcp_manager_t mtcp, tcp_stream *stream) 745 { 746 struct sockaddr_in addr; 747 int bound_addr = FALSE; 748 int ret; 749 /* stand-alone monitor does not need this since it is single-threaded */ 750 bool flow_lock = HAS_STREAM_TYPE(stream, MOS_SOCK_STREAM); 751 752 struct socket_map *walk; 753 754 /* Set the stream state as CLOSED */ 755 stream->state = TCP_ST_CLOSED_RSVD; 756 757 SOCKQ_FOREACH_START(walk, &stream->msocks) { 758 HandleCallback(mtcp, MOS_HK_RCV, walk, stream->side, NULL, 759 MOS_ON_CONN_END | MOS_ON_TCP_STATE_CHANGE | stream->cb_events); 760 HandleCallback(mtcp, MOS_HK_SND, walk, stream->side, NULL, 761 MOS_ON_CONN_END | MOS_ON_TCP_STATE_CHANGE | stream->cb_events); 762 } SOCKQ_FOREACH_END; 763 764 #if 0 765 #ifdef DUMP_STREAM 766 if (stream->close_reason != TCP_ACTIVE_CLOSE && 767 stream->close_reason != TCP_PASSIVE_CLOSE) { 768 thread_printf(mtcp, mtcp->log_fp, 769 "Stream %d abnormally closed.\n", stream->id); 770 DumpStream(mtcp, stream); 771 DumpControlList(mtcp, mtcp->n_sender[0]); 772 } 773 #endif 774 775 #ifdef STREAM 776 uint8_t *sa, *da; 777 sa = (uint8_t *)&stream->saddr; 778 da = (uint8_t *)&stream->daddr; 779 TRACE_STREAM("DESTROY TCP STREAM %d: " 780 "%u.%u.%u.%u(%d) -> %u.%u.%u.%u(%d) (%s)\n", stream->id, 781 sa[0], sa[1], sa[2], sa[3], ntohs(stream->sport), 782 da[0], da[1], da[2], da[3], ntohs(stream->dport), 783 close_reason_str[stream->close_reason]); 784 #endif 785 786 if (stream->sndvar->sndbuf) { 787 TRACE_FSTAT("Stream %d: send buffer " 788 "cum_len: %lu, len: %u\n", stream->id, 789 stream->sndvar->sndbuf->cum_len, 790 stream->sndvar->sndbuf->len); 791 } 792 if (stream->rcvvar->rcvbuf) { 793 TRACE_FSTAT("Stream %d: recv buffer " 794 "cum_len: %lu, merged_len: %u, last_len: %u\n", stream->id, 795 stream->rcvvar->rcvbuf->cum_len, 796 stream->rcvvar->rcvbuf->merged_len, 797 stream->rcvvar->rcvbuf->last_len); 798 } 799 800 #if RTM_STAT 801 /* Triple duplicated ack stats */ 802 if (stream->sndvar->rstat.tdp_ack_cnt) { 803 TRACE_FSTAT("Stream %d: triple duplicated ack: %u, " 804 "retransmission bytes: %u, average rtm bytes/ack: %u\n", 805 stream->id, 806 stream->sndvar->rstat.tdp_ack_cnt, stream->sndvar->rstat.tdp_ack_bytes, 807 stream->sndvar->rstat.tdp_ack_bytes / stream->sndvar->rstat.tdp_ack_cnt); 808 } 809 810 /* Retransmission timeout stats */ 811 if (stream->sndvar->rstat.rto_cnt > 0) { 812 TRACE_FSTAT("Stream %d: timeout count: %u, bytes: %u\n", stream->id, 813 stream->sndvar->rstat.rto_cnt, stream->sndvar->rstat.rto_bytes); 814 } 815 816 /* Recovery stats */ 817 if (stream->sndvar->rstat.ack_upd_cnt) { 818 TRACE_FSTAT("Stream %d: snd_nxt update count: %u, " 819 "snd_nxt update bytes: %u, average update bytes/update: %u\n", 820 stream->id, 821 stream->sndvar->rstat.ack_upd_cnt, stream->sndvar->rstat.ack_upd_bytes, 822 stream->sndvar->rstat.ack_upd_bytes / stream->sndvar->rstat.ack_upd_cnt); 823 } 824 #if TCP_OPT_SACK_ENABLED 825 if (stream->sndvar->rstat.sack_cnt) { 826 TRACE_FSTAT("Selective ack count: %u, bytes: %u, " 827 "average bytes/ack: %u\n", 828 stream->sndvar->rstat.sack_cnt, stream->sndvar->rstat.sack_bytes, 829 stream->sndvar->rstat.sack_bytes / stream->sndvar->rstat.sack_cnt); 830 } else { 831 TRACE_FSTAT("Selective ack count: %u, bytes: %u\n", 832 stream->sndvar->rstat.sack_cnt, stream->sndvar->rstat.sack_bytes); 833 } 834 if (stream->sndvar->rstat.tdp_sack_cnt) { 835 TRACE_FSTAT("Selective tdp ack count: %u, bytes: %u, " 836 "average bytes/ack: %u\n", 837 stream->sndvar->rstat.tdp_sack_cnt, stream->sndvar->rstat.tdp_sack_bytes, 838 stream->sndvar->rstat.tdp_sack_bytes / stream->sndvar->rstat.tdp_sack_cnt); 839 } else { 840 TRACE_FSTAT("Selective ack count: %u, bytes: %u\n", 841 stream->sndvar->rstat.tdp_sack_cnt, stream->sndvar->rstat.tdp_sack_bytes); 842 } 843 #endif /* TCP_OPT_SACK_ENABLED */ 844 #endif /* RTM_STAT */ 845 #endif 846 847 if (HAS_STREAM_TYPE(stream, MOS_SOCK_STREAM)) { 848 /* stand-alone monitor does not need to do these */ 849 if (stream->is_bound_addr) { 850 bound_addr = TRUE; 851 addr.sin_addr.s_addr = stream->saddr; 852 addr.sin_port = stream->sport; 853 } 854 855 RemoveFromControlList(mtcp, stream); 856 RemoveFromSendList(mtcp, stream); 857 RemoveFromACKList(mtcp, stream); 858 859 if (stream->on_rto_idx >= 0) 860 RemoveFromRTOList(mtcp, stream); 861 862 SBUF_LOCK_DESTROY(&stream->rcvvar->read_lock); 863 SBUF_LOCK_DESTROY(&stream->sndvar->write_lock); 864 865 assert(stream->on_hash_table == TRUE); 866 867 /* free ring buffers */ 868 if (stream->sndvar->sndbuf) { 869 SBFree(mtcp->rbm_snd, stream->sndvar->sndbuf); 870 stream->sndvar->sndbuf = NULL; 871 } 872 } 873 874 if (stream->on_timewait_list) 875 RemoveFromTimewaitList(mtcp, stream); 876 877 if (g_config.mos->tcp_timeout > 0) 878 RemoveFromTimeoutList(mtcp, stream); 879 880 if (stream->rcvvar->rcvbuf) { 881 tcprb_del(stream->rcvvar->rcvbuf); 882 stream->rcvvar->rcvbuf = NULL; 883 } 884 885 if (flow_lock) 886 pthread_mutex_lock(&mtcp->ctx->flow_pool_lock); 887 888 /* remove from flow hash table */ 889 HTRemove(mtcp->tcp_flow_table, stream); 890 stream->on_hash_table = FALSE; 891 892 mtcp->flow_cnt--; 893 894 /* if there was a corresponding monitor stream socket opened 895 * then close it */ 896 SOCKQ_FOREACH_START(walk, &stream->msocks) { 897 SOCKQ_REMOVE(&stream->msocks, walk); 898 if (stream->pair_stream == NULL) 899 DestroyMonitorStreamSocket(mtcp, walk); 900 } SOCKQ_FOREACH_END; 901 902 if (stream->pair_stream != NULL) { 903 /* Nullify pointer to sibliing tcp_stream's pair_stream */ 904 stream->pair_stream->pair_stream = NULL; 905 } 906 907 MPFreeChunk(mtcp->rv_pool, stream->rcvvar); 908 MPFreeChunk(mtcp->sv_pool, stream->sndvar); 909 MPFreeChunk(mtcp->flow_pool, stream); 910 911 if (flow_lock) 912 /* stand-alone monitor does not need this since it is single-threaded */ 913 pthread_mutex_unlock(&mtcp->ctx->flow_pool_lock); 914 915 if (bound_addr) { 916 if (mtcp->ap) { 917 ret = FreeAddress(mtcp->ap, &addr); 918 } else { 919 int nif; 920 nif = GetOutputInterface(addr.sin_addr.s_addr); 921 ret = FreeAddress(ap[nif], &addr); 922 } 923 if (ret < 0) { 924 TRACE_ERROR("(NEVER HAPPEN) Failed to free address.\n"); 925 } 926 } 927 928 #ifdef NETSTAT 929 #if NETSTAT_PERTHREAD 930 TRACE_STREAM("Destroyed. Remaining flows: %u\n", mtcp->flow_cnt); 931 #endif /* NETSTAT_PERTHREAD */ 932 #endif /* NETSTAT */ 933 934 } 935 /*---------------------------------------------------------------------------*/ 936 void 937 DestroyTCPStream(mtcp_manager_t mtcp, tcp_stream *stream) 938 { 939 tcp_stream *pair_stream = stream->pair_stream; 940 941 DestroySingleTCPStream(mtcp, stream); 942 943 if (pair_stream) 944 DestroySingleTCPStream(mtcp, pair_stream); 945 } 946 /*---------------------------------------------------------------------------*/ 947 void 948 DumpStream(mtcp_manager_t mtcp, tcp_stream *stream) 949 { 950 uint8_t *sa, *da; 951 struct tcp_send_vars *sndvar = stream->sndvar; 952 struct tcp_recv_vars *rcvvar = stream->rcvvar; 953 954 sa = (uint8_t *)&stream->saddr; 955 da = (uint8_t *)&stream->daddr; 956 thread_printf(mtcp, mtcp->log_fp, "========== Stream %u: " 957 "%u.%u.%u.%u(%u) -> %u.%u.%u.%u(%u) ==========\n", stream->id, 958 sa[0], sa[1], sa[2], sa[3], ntohs(stream->sport), 959 da[0], da[1], da[2], da[3], ntohs(stream->dport)); 960 thread_printf(mtcp, mtcp->log_fp, 961 "Stream id: %u, type: %u, state: %s, close_reason: %s\n", 962 stream->id, stream->stream_type, 963 TCPStateToString(stream), close_reason_str[stream->close_reason]); 964 if (stream->socket) { 965 socket_map_t socket = stream->socket; 966 thread_printf(mtcp, mtcp->log_fp, "Socket id: %d, type: %d, opts: %u\n" 967 "epoll: %u (IN: %u, OUT: %u, ERR: %u, RDHUP: %u, ET: %u)\n" 968 "events: %u (IN: %u, OUT: %u, ERR: %u, RDHUP: %u, ET: %u)\n", 969 socket->id, socket->socktype, socket->opts, 970 socket->epoll, socket->epoll & MOS_EPOLLIN, 971 socket->epoll & MOS_EPOLLOUT, socket->epoll & MOS_EPOLLERR, 972 socket->epoll & MOS_EPOLLRDHUP, socket->epoll & MOS_EPOLLET, 973 socket->events, socket->events & MOS_EPOLLIN, 974 socket->events & MOS_EPOLLOUT, socket->events & MOS_EPOLLERR, 975 socket->events & MOS_EPOLLRDHUP, socket->events & MOS_EPOLLET); 976 } else { 977 thread_printf(mtcp, mtcp->log_fp, "Socket: (null)\n"); 978 } 979 980 thread_printf(mtcp, mtcp->log_fp, 981 "on_hash_table: %u, on_control_list: %u (wait: %u), on_send_list: %u, " 982 "on_ack_list: %u, is_wack: %u, ack_cnt: %u\n" 983 "on_rto_idx: %d, on_timewait_list: %u, on_timeout_list: %u, " 984 "on_rcv_br_list: %u, on_snd_br_list: %u\n" 985 "on_sendq: %u, on_ackq: %u, closed: %u, on_closeq: %u, " 986 "on_closeq_int: %u, on_resetq: %u, on_resetq_int: %u\n" 987 "have_reset: %u, is_fin_sent: %u, is_fin_ackd: %u, " 988 "saw_timestamp: %u, sack_permit: %u, " 989 "is_bound_addr: %u, need_wnd_adv: %u\n", stream->on_hash_table, 990 sndvar->on_control_list, stream->control_list_waiting, sndvar->on_send_list, 991 sndvar->on_ack_list, sndvar->is_wack, sndvar->ack_cnt, 992 stream->on_rto_idx, stream->on_timewait_list, stream->on_timeout_list, 993 stream->on_rcv_br_list, stream->on_snd_br_list, 994 sndvar->on_sendq, sndvar->on_ackq, 995 stream->closed, sndvar->on_closeq, sndvar->on_closeq_int, 996 sndvar->on_resetq, sndvar->on_resetq_int, 997 stream->have_reset, sndvar->is_fin_sent, 998 sndvar->is_fin_ackd, stream->saw_timestamp, stream->sack_permit, 999 stream->is_bound_addr, stream->need_wnd_adv); 1000 1001 thread_printf(mtcp, mtcp->log_fp, "========== Send variables ==========\n"); 1002 thread_printf(mtcp, mtcp->log_fp, 1003 "ip_id: %u, mss: %u, eff_mss: %u, wscale(me, peer): (%u, %u), nif_out: %d\n", 1004 sndvar->ip_id, sndvar->mss, sndvar->eff_mss, 1005 sndvar->wscale_mine, sndvar->wscale_peer, sndvar->nif_out); 1006 thread_printf(mtcp, mtcp->log_fp, 1007 "snd_nxt: %u, snd_una: %u, iss: %u, fss: %u\nsnd_wnd: %u, " 1008 "peer_wnd: %u, cwnd: %u, ssthresh: %u\n", 1009 stream->snd_nxt, sndvar->snd_una, sndvar->iss, sndvar->fss, 1010 sndvar->snd_wnd, sndvar->peer_wnd, sndvar->cwnd, sndvar->ssthresh); 1011 1012 if (sndvar->sndbuf) { 1013 thread_printf(mtcp, mtcp->log_fp, 1014 "Send buffer: init_seq: %u, head_seq: %u, " 1015 "len: %d, cum_len: %lu, size: %d\n", 1016 sndvar->sndbuf->init_seq, sndvar->sndbuf->head_seq, 1017 sndvar->sndbuf->len, sndvar->sndbuf->cum_len, sndvar->sndbuf->size); 1018 } else { 1019 thread_printf(mtcp, mtcp->log_fp, "Send buffer: (null)\n"); 1020 } 1021 thread_printf(mtcp, mtcp->log_fp, 1022 "nrtx: %u, max_nrtx: %u, rto: %u, ts_rto: %u, " 1023 "ts_lastack_sent: %u\n", sndvar->nrtx, sndvar->max_nrtx, 1024 sndvar->rto, sndvar->ts_rto, sndvar->ts_lastack_sent); 1025 1026 thread_printf(mtcp, mtcp->log_fp, 1027 "========== Receive variables ==========\n"); 1028 thread_printf(mtcp, mtcp->log_fp, 1029 "rcv_nxt: %u, irs: %u, rcv_wnd: %u, " 1030 "snd_wl1: %u, snd_wl2: %u\n", 1031 stream->rcv_nxt, rcvvar->irs, 1032 rcvvar->rcv_wnd, rcvvar->snd_wl1, rcvvar->snd_wl2); 1033 if (!rcvvar->rcvbuf) { 1034 thread_printf(mtcp, mtcp->log_fp, "Receive buffer: (null)\n"); 1035 } 1036 thread_printf(mtcp, mtcp->log_fp, "last_ack_seq: %u, dup_acks: %u\n", 1037 rcvvar->last_ack_seq, rcvvar->dup_acks); 1038 thread_printf(mtcp, mtcp->log_fp, 1039 "ts_recent: %u, ts_lastack_rcvd: %u, ts_last_ts_upd: %u, " 1040 "ts_tw_expire: %u\n", rcvvar->ts_recent, rcvvar->ts_lastack_rcvd, 1041 rcvvar->ts_last_ts_upd, rcvvar->ts_tw_expire); 1042 thread_printf(mtcp, mtcp->log_fp, 1043 "srtt: %u, mdev: %u, mdev_max: %u, rttvar: %u, rtt_seq: %u\n", 1044 rcvvar->srtt, rcvvar->mdev, rcvvar->mdev_max, 1045 rcvvar->rttvar, rcvvar->rtt_seq); 1046 } 1047 /*---------------------------------------------------------------------------*/ 1048