1 #include "debug.h" 2 #include <string.h> 3 4 #include "config.h" 5 #include "tcp_stream.h" 6 #include "fhash.h" 7 #include "tcp.h" 8 #include "tcp_in.h" 9 #include "tcp_out.h" 10 #include "tcp_ring_buffer.h" 11 #include "tcp_send_buffer.h" 12 #include "eventpoll.h" 13 #include "ip_out.h" 14 #include "timer.h" 15 #include "tcp_rb.h" 16 /*---------------------------------------------------------------------------*/ 17 char *state_str[] = { 18 "TCP_ST_CLOSED", 19 "TCP_ST_LISTEN", 20 "TCP_ST_SYN_SENT", 21 "TCP_ST_SYN_RCVD", 22 "TCP_ST_ESTABILSHED", 23 "TCP_ST_FIN_WAIT_1", 24 "TCP_ST_FIN_WAIT_2", 25 "TCP_ST_CLOSE_WAIT", 26 "TCP_ST_CLOSING", 27 "TCP_ST_LAST_ACK", 28 "TCP_ST_TIME_WAIT", 29 "TCP_ST_CLOSED_RSVD" 30 }; 31 /*---------------------------------------------------------------------------*/ 32 char *close_reason_str[] = { 33 "NOT_CLOSED", 34 "CLOSE", 35 "CLOSED", 36 "CONN_FAIL", 37 "CONN_LOST", 38 "RESET", 39 "NO_MEM", 40 "DENIED", 41 "TIMEDOUT" 42 }; 43 /*---------------------------------------------------------------------------*/ 44 static __thread unsigned long next = 1; 45 /* Function retrieved from POSIX.1-2001 standard */ 46 /* RAND_MAX assumed to be 32767 */ 47 static int 48 posix_seq_rand(void) { 49 next = next * 1103515245 + 12345; 50 return ((unsigned)(next/65536) % 32768); 51 } 52 /*---------------------------------------------------------------------------*/ 53 void 54 posix_seq_srand(unsigned seed) { 55 next = seed % 32768; 56 } 57 /*---------------------------------------------------------------------------*/ 58 uint32_t 59 FetchSeqDrift(struct tcp_stream *stream, uint32_t seq) 60 { 61 int i = 0; 62 uint8_t flag = 1; 63 int count; 64 65 i = stream->sndvar->sre_index - 1; 66 if (i == -1) i = SRE_MAX - 1; 67 count = 0; 68 69 while (flag) { 70 if (stream->sndvar->sre[i].seq_base == 0) 71 return 0; 72 else if (seq >= stream->sndvar->sre[i].seq_base) 73 return stream->sndvar->sre[i].seq_off; 74 75 i--; 76 if (i == -1) i = SRE_MAX - 1; 77 count++; 78 if (count == SRE_MAX) 79 flag = 1; 80 } 81 82 return 0; 83 } 84 /*---------------------------------------------------------------------------*/ 85 int 86 TcpSeqChange(socket_map_t socket, uint32_t seq_drift, int side, uint32_t seqno) 87 { 88 struct tcp_stream *mstrm, *stream; 89 90 if (side != MOS_SIDE_CLI && side != MOS_SIDE_SVR) { 91 TRACE_ERROR("Invalid side requested!\n"); 92 errno = EINVAL; 93 return -1; 94 } 95 96 mstrm = socket->monitor_stream->stream; 97 stream = (side == mstrm->side) ? mstrm : mstrm->pair_stream; 98 if (stream == NULL) { 99 TRACE_ERROR("Stream pointer for sockid: %u not found!\n", 100 socket->id); 101 errno = EBADF; 102 return -1; 103 } 104 105 stream->sndvar->sre[stream->sndvar->sre_index].seq_off = seq_drift; 106 stream->sndvar->sre[stream->sndvar->sre_index].seq_off += 107 (stream->sndvar->sre_index == 0) ? stream->sndvar->sre[SRE_MAX - 1].seq_off : 108 stream->sndvar->sre[stream->sndvar->sre_index - 1].seq_off; 109 stream->sndvar->sre[stream->sndvar->sre_index].seq_base = seqno; 110 stream->sndvar->sre_index = (stream->sndvar->sre_index + 1) & (SRE_MAX - 1); 111 112 return 0; 113 } 114 /*---------------------------------------------------------------------------*/ 115 /** 116 * FYI: This is NOT a read-only return! 117 */ 118 int 119 GetFragInfo(socket_map_t sock, int side, void *optval, socklen_t *len) 120 { 121 #ifdef NEWRB 122 #ifndef NEWPPEEK 123 /* TODO: Implement this */ 124 return -1; 125 #endif 126 #endif 127 struct tcp_stream *stream; 128 129 stream = NULL; 130 #ifdef NEWPPEEK 131 if (!*len || ( *len % sizeof(tcpfrag_t) != 0)) 132 goto frag_info_error; 133 #else 134 if (*len != sizeof(struct fragment_ctx)) 135 goto frag_info_error; 136 #endif 137 138 if (side != MOS_SIDE_CLI && side != MOS_SIDE_SVR) { 139 TRACE_ERROR("Invalid side requested!\n"); 140 exit(EXIT_FAILURE); 141 return -1; 142 } 143 144 struct tcp_stream *mstrm = sock->monitor_stream->stream; 145 stream = (side == mstrm->side) ? mstrm : mstrm->pair_stream; 146 147 if (stream == NULL) goto frag_info_error; 148 149 /* First check if the tcp ring buffer even has anything */ 150 if (stream->rcvvar != NULL && 151 stream->rcvvar->rcvbuf != NULL) { 152 #ifdef NEWRB 153 #ifdef NEWPPEEK 154 tcprb_t *rcvbuf = stream->rcvvar->rcvbuf; 155 struct tcp_ring_fragment *out = (struct tcp_ring_fragment *)optval; 156 int const maxout = *len; 157 *len = 0; 158 struct _tcpfrag_t *walk; 159 TAILQ_FOREACH(walk, &rcvbuf->frags, link) { 160 if (*len == maxout) 161 break; 162 out[*len].offset = walk->head; 163 out[*len].len = walk->tail - walk->head; 164 (*len)++; 165 } 166 if (*len != maxout) { 167 /* set zero sentinel */ 168 out[*len].offset = 0; 169 out[*len].len = 0; 170 } 171 #endif 172 #else 173 struct tcp_ring_buffer *rcvbuf = stream->rcvvar->rcvbuf; 174 if (rcvbuf->fctx == NULL) 175 goto frag_info_error; 176 else { 177 optval = (void *)rcvbuf->fctx; 178 *len = rcvbuf->head_seq + 179 (rcvbuf->tail_offset - 180 rcvbuf->head_offset); 181 } 182 #endif 183 } else 184 goto frag_info_error; 185 186 return 0; 187 188 frag_info_error: 189 optval = NULL; 190 *len = 0; 191 return -1; 192 } 193 /*---------------------------------------------------------------------------*/ 194 /** 195 * Comments later... 196 */ 197 int 198 GetBufInfo(socket_map_t sock, int side, void *optval, socklen_t *len) 199 { 200 struct tcp_stream *stream; 201 struct tcp_buf_info *tbi; 202 203 tbi = (struct tcp_buf_info *)optval; 204 memset(tbi, 0, sizeof(struct tcp_buf_info)); 205 stream = NULL; 206 207 if (*len != sizeof(struct tcp_buf_info)) { 208 errno = EINVAL; 209 goto buf_info_error; 210 } 211 212 if (side != MOS_SIDE_CLI && side != MOS_SIDE_SVR) { 213 TRACE_ERROR("Invalid side requested!\n"); 214 errno = EINVAL; 215 goto buf_info_error; 216 } 217 218 struct tcp_stream *mstrm = sock->monitor_stream->stream; 219 stream = (side == mstrm->side) ? mstrm : mstrm->pair_stream; 220 221 /* First check if the tcp ring buffer even has anything */ 222 if (stream != NULL && 223 stream->rcvvar != NULL && 224 stream->rcvvar->rcvbuf != NULL) { 225 #ifdef NEWRB 226 tcprb_t *rcvbuf = stream->rcvvar->rcvbuf; 227 tcpfrag_t *f = TAILQ_LAST(&rcvbuf->frags, flist); 228 tbi->tcpbi_init_seq = stream->rcvvar->irs + 1; 229 tbi->tcpbi_last_byte_read = rcvbuf->pile; 230 tbi->tcpbi_next_byte_expected = rcvbuf->pile + tcprb_cflen(rcvbuf); 231 tbi->tcpbi_last_byte_received = (f ? f->tail : rcvbuf->head); 232 #else 233 struct tcp_ring_buffer *rcvbuf = stream->rcvvar->rcvbuf; 234 tbi->tcpbi_init_seq = rcvbuf->init_seq; 235 tbi->tcpbi_last_byte_read = rcvbuf->head_seq; 236 tbi->tcpbi_next_byte_expected = rcvbuf->head_seq + rcvbuf->merged_len; 237 tbi->tcpbi_last_byte_received = rcvbuf->head_seq + rcvbuf->last_len; 238 #endif 239 } else { 240 errno = ENODATA; 241 goto buf_info_error; 242 } 243 244 return 0; 245 246 buf_info_error: 247 optval = NULL; 248 *len = 0; 249 return -1; 250 } 251 /*---------------------------------------------------------------------------*/ 252 int 253 DisableBuf(socket_map_t sock, int side) 254 { 255 #ifdef DBGMSG 256 __PREPARE_DGBLOGGING(); 257 #endif 258 struct tcp_stream *stream; 259 int rc = 0; 260 261 switch (sock->socktype) { 262 case MOS_SOCK_MONITOR_STREAM: 263 if (side == MOS_SIDE_CLI) 264 sock->monitor_listener->client_buf_mgmt = 0; 265 else if (side == MOS_SIDE_SVR) 266 sock->monitor_listener->server_buf_mgmt = 0; 267 else { 268 assert(0); 269 TRACE_DBG("Invalid side!\n"); 270 rc = -1; 271 } 272 break; 273 case MOS_SOCK_MONITOR_STREAM_ACTIVE: 274 stream = sock->monitor_stream->stream; 275 if (stream->side != side) 276 stream = stream->pair_stream; 277 assert(stream->side == side); 278 stream->buffer_mgmt = 0; 279 break; 280 default: 281 assert(0); 282 TRACE_DBG("Can't disable buf for invalid socket!\n"); 283 rc = -1; 284 } 285 286 return rc; 287 } 288 /*---------------------------------------------------------------------------*/ 289 int 290 GetLastTimestamp(struct tcp_stream *stream, uint32_t *usecs, socklen_t *len) 291 { 292 #ifdef DBGMSG 293 __PREPARE_DGBLOGGING(); 294 #endif 295 if (*len < sizeof(uint32_t)) { 296 TRACE_DBG("Size passed is not >= sizeof(uint32_t)!\n"); 297 return -1; 298 } 299 300 *usecs = (stream->last_active_ts > 301 stream->pair_stream->last_active_ts) 302 ? 303 TS_TO_USEC(stream->last_active_ts) : 304 TS_TO_USEC(stream->pair_stream->last_active_ts); 305 306 return 0; 307 } 308 /*---------------------------------------------------------------------------*/ 309 inline int 310 GetTCPState(struct tcp_stream *stream, int side, 311 void *optval, socklen_t *optlen) 312 { 313 if (!stream || !(stream = (side == stream->side) ? stream : stream->pair_stream)) 314 return -1; 315 *(int *)optval = (int)((stream->state == TCP_ST_CLOSED_RSVD) ? 316 TCP_ST_CLOSED : stream->state); 317 return 0; 318 } 319 /*---------------------------------------------------------------------------*/ 320 inline char * 321 TCPStateToString(const tcp_stream *stream) 322 { 323 return (stream) ? state_str[stream->state] : NULL; 324 } 325 /*---------------------------------------------------------------------------*/ 326 inline void 327 RaiseReadEvent(mtcp_manager_t mtcp, tcp_stream *stream) 328 { 329 struct tcp_recv_vars *rcvvar; 330 331 rcvvar = stream->rcvvar; 332 333 if (HAS_STREAM_TYPE(stream, MOS_SOCK_STREAM)) { 334 if (stream->socket && (stream->socket->epoll & MOS_EPOLLIN)) 335 AddEpollEvent(mtcp->ep, MOS_EVENT_QUEUE, stream->socket, MOS_EPOLLIN); 336 #ifdef NEWRB 337 } else if (rcvvar->rcvbuf && tcprb_cflen(rcvvar->rcvbuf) > 0) { 338 #else 339 } else if (rcvvar->rcvbuf && rcvvar->rcvbuf->merged_len) { 340 #endif 341 /* 342 * in case it is a monitoring socket, queue up the read events 343 * in the event_queue of only if the tcp_stream hasn't already 344 * been registered in the event queue 345 */ 346 int index; 347 struct event_queue *eq; 348 struct socket_map *walk; 349 350 SOCKQ_FOREACH_START(walk, &stream->msocks) { 351 assert(walk->socktype == MOS_SOCK_MONITOR_STREAM_ACTIVE); 352 eq = walk->monitor_stream->monitor_listener->eq; 353 354 /* if it already has read data register... then skip this step */ 355 if (stream->actions & MOS_ACT_READ_DATA) 356 return; 357 if (eq->num_events >= eq->size) { 358 TRACE_ERROR("Exceeded epoll event queue! num_events: %d, " 359 "size: %d\n", eq->num_events, eq->size); 360 return; 361 } 362 363 index = eq->end++; 364 eq->events[index].ev.events = MOS_EPOLLIN; 365 eq->events[index].ev.data.ptr = (void *)stream; 366 367 if (eq->end >= eq->size) { 368 eq->end = 0; 369 } 370 eq->num_events++; 371 stream->actions |= MOS_ACT_READ_DATA; 372 } SOCKQ_FOREACH_END; 373 } else { 374 TRACE_EPOLL("Stream %d: Raising read without a socket!\n", stream->id); 375 } 376 } 377 /*---------------------------------------------------------------------------*/ 378 inline void 379 RaiseWriteEvent(mtcp_manager_t mtcp, tcp_stream *stream) 380 { 381 if (stream->socket) { 382 if (stream->socket->epoll & MOS_EPOLLOUT) { 383 AddEpollEvent(mtcp->ep, 384 MOS_EVENT_QUEUE, stream->socket, MOS_EPOLLOUT); 385 } 386 } else { 387 TRACE_EPOLL("Stream %d: Raising write without a socket!\n", stream->id); 388 } 389 } 390 /*---------------------------------------------------------------------------*/ 391 inline void 392 RaiseCloseEvent(mtcp_manager_t mtcp, tcp_stream *stream) 393 { 394 if (stream->socket) { 395 if (stream->socket->epoll & MOS_EPOLLRDHUP) { 396 AddEpollEvent(mtcp->ep, 397 MOS_EVENT_QUEUE, stream->socket, MOS_EPOLLRDHUP); 398 } else if (stream->socket->epoll & MOS_EPOLLIN) { 399 AddEpollEvent(mtcp->ep, 400 MOS_EVENT_QUEUE, stream->socket, MOS_EPOLLIN); 401 } 402 } else { 403 TRACE_EPOLL("Stream %d: Raising close without a socket!\n", stream->id); 404 } 405 } 406 /*---------------------------------------------------------------------------*/ 407 inline int 408 RaiseErrorEvent(mtcp_manager_t mtcp, tcp_stream *stream) 409 { 410 if (stream->socket) { 411 if (stream->socket->epoll & MOS_EPOLLERR) { 412 /* passing closing reason for error notification */ 413 return AddEpollEvent(mtcp->ep, 414 MOS_EVENT_QUEUE, stream->socket, MOS_EPOLLERR); 415 } 416 } else { 417 TRACE_EPOLL("Stream %d: Raising error without a socket!\n", stream->id); 418 } 419 return -1; 420 } 421 /*----------------------------------------------------------------------------*/ 422 int 423 AddMonitorStreamSockets(mtcp_manager_t mtcp, struct tcp_stream *stream) 424 { 425 struct mtcp_context mctx; 426 int socktype; 427 428 mctx.cpu = mtcp->ctx->cpu; 429 struct mon_listener *walk; 430 431 // traverse the passive socket's list 432 TAILQ_FOREACH(walk, &mtcp->monitors, link) { 433 socktype = walk->socket->socktype; 434 435 if (socktype != MOS_SOCK_MONITOR_STREAM) 436 continue; 437 438 /* mtcp_bind_monitor_filter() 439 * - create an monitor active socket only for the filter-passed flows 440 * - we use the result (= tag) from DetectStreamType() to avoid 441 * evaluating the same BPF filter twice */ 442 if (!walk->is_stream_syn_filter_hit) { 443 continue; 444 } 445 446 struct socket_map *s = 447 AllocateSocket(&mctx, MOS_SOCK_MONITOR_STREAM_ACTIVE); 448 if (!s) 449 return -1; 450 451 s->monitor_stream->socket = s; 452 s->monitor_stream->stream = stream; 453 s->monitor_stream->monitor_listener = walk; 454 s->monitor_stream->client_buf_mgmt = walk->client_buf_mgmt; 455 s->monitor_stream->server_buf_mgmt = walk->server_buf_mgmt; 456 s->monitor_stream->client_mon = walk->client_mon; 457 s->monitor_stream->server_mon = walk->server_mon; 458 #ifdef NEWEV 459 s->monitor_stream->stree_dontcare = 460 s->monitor_stream->monitor_listener->stree_dontcare; 461 s->monitor_stream->stree_pre_rcv = 462 s->monitor_stream->monitor_listener->stree_pre_rcv; 463 s->monitor_stream->stree_post_snd = 464 s->monitor_stream->monitor_listener->stree_post_snd; 465 if (s->monitor_stream->stree_dontcare) 466 stree_inc_ref(s->monitor_stream->stree_dontcare); 467 if (s->monitor_stream->stree_pre_rcv) 468 stree_inc_ref(s->monitor_stream->stree_pre_rcv); 469 if (s->monitor_stream->stree_post_snd) 470 stree_inc_ref(s->monitor_stream->stree_post_snd); 471 #else 472 InitEvP(&s->monitor_stream->dontcare_evp, 473 &walk->dontcare_evb); 474 InitEvP(&s->monitor_stream->pre_tcp_evp, 475 &walk->pre_tcp_evb); 476 InitEvP(&s->monitor_stream->post_tcp_evp, 477 &walk->post_tcp_evb); 478 #endif 479 480 SOCKQ_INSERT_TAIL(&stream->msocks, s); 481 } 482 483 return 0; 484 } 485 /*----------------------------------------------------------------------------*/ 486 int 487 DestroyMonitorStreamSocket(mtcp_manager_t mtcp, socket_map_t msock) 488 { 489 struct mtcp_context mctx; 490 int socktype, sockid, rc; 491 492 if (msock == NULL) { 493 TRACE_DBG("Stream socket does not exist!\n"); 494 /* exit(-1); */ 495 return 0; 496 } 497 498 rc = 0; 499 mctx.cpu = mtcp->ctx->cpu; 500 socktype = msock->socktype; 501 sockid = msock->id; 502 503 switch (socktype) { 504 case MOS_SOCK_MONITOR_STREAM_ACTIVE: 505 FreeSocket(&mctx, sockid, socktype); 506 break; 507 case MOS_SOCK_MONITOR_RAW: 508 /* do nothing since all raw sockets point to the same socket */ 509 break; 510 default: 511 TRACE_DBG("Trying to destroy a monitor socket for an unsupported type!\n"); 512 rc = -1; 513 /* exit(-1); */ 514 break; 515 } 516 517 return rc; 518 } 519 /*---------------------------------------------------------------------------*/ 520 tcp_stream * 521 CreateTCPStream(mtcp_manager_t mtcp, socket_map_t socket, int type, 522 uint32_t saddr, uint16_t sport, uint32_t daddr, uint16_t dport, 523 unsigned int *hash) 524 { 525 tcp_stream *stream = NULL; 526 int ret; 527 /* stand-alone monitor does not need this since it is single-threaded */ 528 bool flow_lock = type & STREAM_TYPE(MOS_SOCK_STREAM); 529 //bool flow_lock = false; 530 531 if (flow_lock) 532 pthread_mutex_lock(&mtcp->ctx->flow_pool_lock); 533 534 stream = (tcp_stream *)MPAllocateChunk(mtcp->flow_pool); 535 if (!stream) { 536 TRACE_ERROR("Cannot allocate memory for the stream. " 537 "g_config.mos->max_concurrency: %d, concurrent: %u\n", 538 g_config.mos->max_concurrency, mtcp->flow_cnt); 539 if (flow_lock) 540 pthread_mutex_unlock(&mtcp->ctx->flow_pool_lock); 541 return NULL; 542 } 543 memset(stream, 0, sizeof(tcp_stream)); 544 545 stream->rcvvar = (struct tcp_recv_vars *)MPAllocateChunk(mtcp->rv_pool); 546 if (!stream->rcvvar) { 547 MPFreeChunk(mtcp->flow_pool, stream); 548 if (flow_lock) 549 pthread_mutex_unlock(&mtcp->ctx->flow_pool_lock); 550 return NULL; 551 } 552 memset(stream->rcvvar, 0, sizeof(struct tcp_recv_vars)); 553 554 /* stand-alone monitor does not need to do this */ 555 stream->sndvar = (struct tcp_send_vars *)MPAllocateChunk(mtcp->sv_pool); 556 if (!stream->sndvar) { 557 MPFreeChunk(mtcp->rv_pool, stream->rcvvar); 558 MPFreeChunk(mtcp->flow_pool, stream); 559 if (flow_lock) 560 pthread_mutex_unlock(&mtcp->ctx->flow_pool_lock); 561 return NULL; 562 } 563 //if (HAS_STREAM_TYPE(stream, MOS_SOCK_STREAM)) 564 memset(stream->sndvar, 0, sizeof(struct tcp_send_vars)); 565 566 stream->id = mtcp->g_id++; 567 stream->saddr = saddr; 568 stream->sport = sport; 569 stream->daddr = daddr; 570 stream->dport = dport; 571 572 ret = HTInsert(mtcp->tcp_flow_table, stream, hash); 573 if (ret < 0) { 574 TRACE_ERROR("Stream %d: " 575 "Failed to insert the stream into hash table.\n", stream->id); 576 MPFreeChunk(mtcp->flow_pool, stream); 577 if (flow_lock) 578 pthread_mutex_unlock(&mtcp->ctx->flow_pool_lock); 579 return NULL; 580 } 581 stream->on_hash_table = TRUE; 582 mtcp->flow_cnt++; 583 584 SOCKQ_INIT(&stream->msocks); 585 586 /* 587 * if an embedded monitor is attached... 588 * create monitor stream socket now! 589 * If socket type is raw.. then don't create it 590 */ 591 if ((mtcp->num_msp > 0) && 592 (type & STREAM_TYPE(MOS_SOCK_MONITOR_STREAM_ACTIVE))) 593 if (AddMonitorStreamSockets(mtcp, stream) < 0) 594 TRACE_DBG("Could not create monitor stream socket!\n"); 595 596 if (flow_lock) 597 pthread_mutex_unlock(&mtcp->ctx->flow_pool_lock); 598 599 if (socket) { 600 stream->socket = socket; 601 socket->stream = stream; 602 } 603 604 stream->stream_type = type; 605 stream->state = TCP_ST_LISTEN; 606 /* This is handled by core.c, tcp_in.c & tcp_out.c */ 607 /* stream->cb_events |= MOS_ON_TCP_STATE_CHANGE; */ 608 609 stream->on_rto_idx = -1; 610 611 /* stand-alone monitor does not need to do this */ 612 stream->sndvar->mss = TCP_DEFAULT_MSS; 613 stream->sndvar->wscale_mine = TCP_DEFAULT_WSCALE; 614 stream->sndvar->wscale_peer = 0; 615 616 if (HAS_STREAM_TYPE(stream, MOS_SOCK_STREAM)) { 617 stream->sndvar->ip_id = 0; 618 stream->sndvar->nif_out = GetOutputInterface(stream->daddr); 619 620 stream->sndvar->iss = posix_seq_rand() % TCP_MAX_SEQ; 621 //stream->sndvar->iss = 0; 622 stream->snd_nxt = stream->sndvar->iss; 623 stream->sndvar->snd_una = stream->sndvar->iss; 624 stream->sndvar->snd_wnd = g_config.mos->wmem_size; 625 stream->sndvar->rto = TCP_INITIAL_RTO; 626 #if USE_SPIN_LOCK 627 if (pthread_spin_init(&stream->sndvar->write_lock, PTHREAD_PROCESS_PRIVATE)) { 628 perror("pthread_spin_init of write_lock"); 629 pthread_spin_destroy(&stream->rcvvar->read_lock); 630 #else 631 if (pthread_mutex_init(&stream->sndvar->write_lock, NULL)) { 632 perror("pthread_mutex_init of write_lock"); 633 pthread_mutex_destroy(&stream->rcvvar->read_lock); 634 #endif 635 return NULL; 636 } 637 } 638 stream->rcvvar->irs = 0; 639 640 stream->rcv_nxt = 0; 641 stream->rcvvar->rcv_wnd = TCP_INITIAL_WINDOW; 642 643 stream->rcvvar->snd_wl1 = stream->rcvvar->irs - 1; 644 645 #ifdef NEWRB 646 stream->buffer_mgmt = BUFMGMT_FULL; 647 #else 648 /* set tcp_ring_buffer management to `on' by default */ 649 stream->buffer_mgmt = TRUE; 650 #endif 651 652 /* needs state update by default */ 653 stream->status_mgmt = 1; 654 655 #if USE_SPIN_LOCK 656 if (pthread_spin_init(&stream->rcvvar->read_lock, PTHREAD_PROCESS_PRIVATE)) { 657 #else 658 if (pthread_mutex_init(&stream->rcvvar->read_lock, NULL)) { 659 #endif 660 perror("pthread_mutex_init of read_lock"); 661 return NULL; 662 } 663 664 #ifdef STREAM 665 uint8_t *sa; 666 uint8_t *da; 667 668 sa = (uint8_t *)&stream->saddr; 669 da = (uint8_t *)&stream->daddr; 670 TRACE_STREAM("CREATED NEW TCP STREAM %d: " 671 "%u.%u.%u.%u(%d) -> %u.%u.%u.%u(%d) (ISS: %u)\n", stream->id, 672 sa[0], sa[1], sa[2], sa[3], ntohs(stream->sport), 673 da[0], da[1], da[2], da[3], ntohs(stream->dport), 674 stream->sndvar->iss); 675 #endif 676 677 return stream; 678 } 679 /*----------------------------------------------------------------------------*/ 680 inline tcp_stream * 681 CreateDualTCPStream(mtcp_manager_t mtcp, socket_map_t socket, int type, uint32_t saddr, 682 uint16_t sport, uint32_t daddr, uint16_t dport, unsigned int *hash) 683 { 684 tcp_stream *cur_stream, *paired_stream; 685 struct socket_map *walk; 686 687 cur_stream = CreateTCPStream(mtcp, socket, type, 688 saddr, sport, daddr, dport, hash); 689 if (cur_stream == NULL) { 690 TRACE_ERROR("Can't create tcp_stream!\n"); 691 return NULL; 692 } 693 694 paired_stream = CreateTCPStream(mtcp, NULL, MOS_SOCK_UNUSED, 695 daddr, dport, saddr, sport, hash); 696 if (paired_stream == NULL) { 697 DestroyTCPStream(mtcp, cur_stream); 698 TRACE_ERROR("Can't create tcp_stream!\n"); 699 return NULL; 700 } 701 702 cur_stream->pair_stream = paired_stream; 703 paired_stream->pair_stream = cur_stream; 704 paired_stream->socket = socket; 705 SOCKQ_FOREACH_START(walk, &cur_stream->msocks) { 706 SOCKQ_INSERT_TAIL(&paired_stream->msocks, walk); 707 } SOCKQ_FOREACH_END; 708 paired_stream->stream_type = STREAM_TYPE(MOS_SOCK_MONITOR_STREAM_ACTIVE); 709 710 return cur_stream; 711 } 712 /*----------------------------------------------------------------------------*/ 713 inline tcp_stream * 714 CreateClientTCPStream(mtcp_manager_t mtcp, socket_map_t socket, int type, 715 uint32_t saddr, uint16_t sport, uint32_t daddr, uint16_t dport, 716 unsigned int *hash) 717 { 718 tcp_stream *cs; 719 struct socket_map *w; 720 721 cs = CreateTCPStream(mtcp, socket, type, daddr, dport, saddr, sport, hash); 722 if (cs == NULL) { 723 TRACE_ERROR("Can't create tcp_stream!\n"); 724 return NULL; 725 } 726 727 cs->side = MOS_SIDE_CLI; 728 cs->pair_stream = NULL; 729 730 /* if buffer management is off, then disable 731 * monitoring tcp ring of either streams (only if stream 732 * is just monitor stream active) 733 */ 734 if (IS_STREAM_TYPE(cs, MOS_SOCK_MONITOR_STREAM_ACTIVE)) { 735 cs->buffer_mgmt = BUFMGMT_OFF; 736 SOCKQ_FOREACH_START(w, &cs->msocks) { 737 uint8_t bm = w->monitor_stream->client_buf_mgmt; 738 if (bm > cs->buffer_mgmt) 739 cs->buffer_mgmt = bm; 740 if (w->monitor_stream->monitor_listener->client_mon == 1) 741 cs->status_mgmt = 1; 742 } SOCKQ_FOREACH_END; 743 } 744 745 return cs; 746 } 747 /*----------------------------------------------------------------------------*/ 748 inline tcp_stream * 749 AttachServerTCPStream(mtcp_manager_t mtcp, tcp_stream *cs, int type, 750 uint32_t saddr, uint16_t sport, uint32_t daddr, uint16_t dport) 751 { 752 tcp_stream *ss; 753 struct socket_map *w; 754 755 /* The 3rd arg is a temp hackk... FIXIT! TODO: XXX */ 756 ss = CreateTCPStream(mtcp, NULL, MOS_SOCK_UNUSED, saddr, sport, daddr, dport, NULL); 757 if (ss == NULL) { 758 TRACE_ERROR("Can't create tcp_stream!\n"); 759 return NULL; 760 } 761 762 ss->side = MOS_SIDE_SVR; 763 cs->pair_stream = ss; 764 ss->pair_stream = cs; 765 ss->socket = cs->socket; 766 SOCKQ_FOREACH_START(w, &cs->msocks) { 767 SOCKQ_INSERT_TAIL(&ss->msocks, w); 768 } SOCKQ_FOREACH_END; 769 ss->stream_type = STREAM_TYPE(MOS_SOCK_MONITOR_STREAM_ACTIVE); 770 771 if (IS_STREAM_TYPE(ss, MOS_SOCK_MONITOR_STREAM_ACTIVE)) { 772 ss->buffer_mgmt = BUFMGMT_OFF; 773 SOCKQ_FOREACH_START(w, &ss->msocks) { 774 uint8_t bm = w->monitor_stream->server_buf_mgmt; 775 if (bm > ss->buffer_mgmt) 776 ss->buffer_mgmt = bm; 777 if (w->monitor_stream->monitor_listener->server_mon == 1) 778 ss->status_mgmt = 1; 779 } SOCKQ_FOREACH_END; 780 } 781 782 return ss; 783 } 784 /*---------------------------------------------------------------------------*/ 785 static void 786 DestroySingleTCPStream(mtcp_manager_t mtcp, tcp_stream *stream) 787 { 788 struct sockaddr_in addr; 789 int bound_addr = FALSE; 790 int ret; 791 /* stand-alone monitor does not need this since it is single-threaded */ 792 bool flow_lock = HAS_STREAM_TYPE(stream, MOS_SOCK_STREAM); 793 794 struct socket_map *walk; 795 796 /* Set the stream state as CLOSED */ 797 stream->state = TCP_ST_CLOSED_RSVD; 798 799 SOCKQ_FOREACH_START(walk, &stream->msocks) { 800 HandleCallback(mtcp, MOS_HK_RCV, walk, stream->side, NULL, 801 MOS_ON_CONN_END | MOS_ON_TCP_STATE_CHANGE | stream->cb_events); 802 HandleCallback(mtcp, MOS_HK_SND, walk, stream->side, NULL, 803 MOS_ON_CONN_END | MOS_ON_TCP_STATE_CHANGE | stream->cb_events); 804 } SOCKQ_FOREACH_END; 805 806 #if 0 807 #ifdef DUMP_STREAM 808 if (stream->close_reason != TCP_ACTIVE_CLOSE && 809 stream->close_reason != TCP_PASSIVE_CLOSE) { 810 thread_printf(mtcp, mtcp->log_fp, 811 "Stream %d abnormally closed.\n", stream->id); 812 DumpStream(mtcp, stream); 813 DumpControlList(mtcp, mtcp->n_sender[0]); 814 } 815 #endif 816 817 #ifdef STREAM 818 uint8_t *sa, *da; 819 sa = (uint8_t *)&stream->saddr; 820 da = (uint8_t *)&stream->daddr; 821 TRACE_STREAM("DESTROY TCP STREAM %d: " 822 "%u.%u.%u.%u(%d) -> %u.%u.%u.%u(%d) (%s)\n", stream->id, 823 sa[0], sa[1], sa[2], sa[3], ntohs(stream->sport), 824 da[0], da[1], da[2], da[3], ntohs(stream->dport), 825 close_reason_str[stream->close_reason]); 826 #endif 827 828 if (stream->sndvar->sndbuf) { 829 TRACE_FSTAT("Stream %d: send buffer " 830 "cum_len: %lu, len: %u\n", stream->id, 831 stream->sndvar->sndbuf->cum_len, 832 stream->sndvar->sndbuf->len); 833 } 834 if (stream->rcvvar->rcvbuf) { 835 TRACE_FSTAT("Stream %d: recv buffer " 836 "cum_len: %lu, merged_len: %u, last_len: %u\n", stream->id, 837 stream->rcvvar->rcvbuf->cum_len, 838 stream->rcvvar->rcvbuf->merged_len, 839 stream->rcvvar->rcvbuf->last_len); 840 } 841 842 #if RTM_STAT 843 /* Triple duplicated ack stats */ 844 if (stream->sndvar->rstat.tdp_ack_cnt) { 845 TRACE_FSTAT("Stream %d: triple duplicated ack: %u, " 846 "retransmission bytes: %u, average rtm bytes/ack: %u\n", 847 stream->id, 848 stream->sndvar->rstat.tdp_ack_cnt, stream->sndvar->rstat.tdp_ack_bytes, 849 stream->sndvar->rstat.tdp_ack_bytes / stream->sndvar->rstat.tdp_ack_cnt); 850 } 851 852 /* Retransmission timeout stats */ 853 if (stream->sndvar->rstat.rto_cnt > 0) { 854 TRACE_FSTAT("Stream %d: timeout count: %u, bytes: %u\n", stream->id, 855 stream->sndvar->rstat.rto_cnt, stream->sndvar->rstat.rto_bytes); 856 } 857 858 /* Recovery stats */ 859 if (stream->sndvar->rstat.ack_upd_cnt) { 860 TRACE_FSTAT("Stream %d: snd_nxt update count: %u, " 861 "snd_nxt update bytes: %u, average update bytes/update: %u\n", 862 stream->id, 863 stream->sndvar->rstat.ack_upd_cnt, stream->sndvar->rstat.ack_upd_bytes, 864 stream->sndvar->rstat.ack_upd_bytes / stream->sndvar->rstat.ack_upd_cnt); 865 } 866 #if TCP_OPT_SACK_ENABLED 867 if (stream->sndvar->rstat.sack_cnt) { 868 TRACE_FSTAT("Selective ack count: %u, bytes: %u, " 869 "average bytes/ack: %u\n", 870 stream->sndvar->rstat.sack_cnt, stream->sndvar->rstat.sack_bytes, 871 stream->sndvar->rstat.sack_bytes / stream->sndvar->rstat.sack_cnt); 872 } else { 873 TRACE_FSTAT("Selective ack count: %u, bytes: %u\n", 874 stream->sndvar->rstat.sack_cnt, stream->sndvar->rstat.sack_bytes); 875 } 876 if (stream->sndvar->rstat.tdp_sack_cnt) { 877 TRACE_FSTAT("Selective tdp ack count: %u, bytes: %u, " 878 "average bytes/ack: %u\n", 879 stream->sndvar->rstat.tdp_sack_cnt, stream->sndvar->rstat.tdp_sack_bytes, 880 stream->sndvar->rstat.tdp_sack_bytes / stream->sndvar->rstat.tdp_sack_cnt); 881 } else { 882 TRACE_FSTAT("Selective ack count: %u, bytes: %u\n", 883 stream->sndvar->rstat.tdp_sack_cnt, stream->sndvar->rstat.tdp_sack_bytes); 884 } 885 #endif /* TCP_OPT_SACK_ENABLED */ 886 #endif /* RTM_STAT */ 887 #endif 888 889 if (HAS_STREAM_TYPE(stream, MOS_SOCK_STREAM)) { 890 /* stand-alone monitor does not need to do these */ 891 if (stream->is_bound_addr) { 892 bound_addr = TRUE; 893 addr.sin_addr.s_addr = stream->saddr; 894 addr.sin_port = stream->sport; 895 } 896 897 RemoveFromControlList(mtcp, stream); 898 RemoveFromSendList(mtcp, stream); 899 RemoveFromACKList(mtcp, stream); 900 901 if (stream->on_rto_idx >= 0) 902 RemoveFromRTOList(mtcp, stream); 903 904 SBUF_LOCK_DESTROY(&stream->rcvvar->read_lock); 905 SBUF_LOCK_DESTROY(&stream->sndvar->write_lock); 906 907 assert(stream->on_hash_table == TRUE); 908 909 /* free ring buffers */ 910 if (stream->sndvar->sndbuf) { 911 SBFree(mtcp->rbm_snd, stream->sndvar->sndbuf); 912 stream->sndvar->sndbuf = NULL; 913 } 914 } 915 916 if (stream->on_timewait_list) 917 RemoveFromTimewaitList(mtcp, stream); 918 919 if (g_config.mos->tcp_timeout > 0) 920 RemoveFromTimeoutList(mtcp, stream); 921 922 if (stream->rcvvar->rcvbuf) { 923 #ifdef NEWRB 924 tcprb_del(stream->rcvvar->rcvbuf); 925 #else 926 RBFree(mtcp->rbm_rcv, stream->rcvvar->rcvbuf, 927 stream->buffer_mgmt); 928 #endif 929 stream->rcvvar->rcvbuf = NULL; 930 } 931 932 if (flow_lock) 933 pthread_mutex_lock(&mtcp->ctx->flow_pool_lock); 934 935 /* remove from flow hash table */ 936 HTRemove(mtcp->tcp_flow_table, stream); 937 stream->on_hash_table = FALSE; 938 939 mtcp->flow_cnt--; 940 941 /* if there was a corresponding monitor stream socket opened 942 * then close it */ 943 SOCKQ_FOREACH_START(walk, &stream->msocks) { 944 SOCKQ_REMOVE(&stream->msocks, walk); 945 if (stream->pair_stream == NULL) 946 DestroyMonitorStreamSocket(mtcp, walk); 947 } SOCKQ_FOREACH_END; 948 949 if (stream->pair_stream != NULL) { 950 /* Nullify pointer to sibliing tcp_stream's pair_stream */ 951 stream->pair_stream->pair_stream = NULL; 952 } 953 954 MPFreeChunk(mtcp->rv_pool, stream->rcvvar); 955 MPFreeChunk(mtcp->sv_pool, stream->sndvar); 956 MPFreeChunk(mtcp->flow_pool, stream); 957 958 if (flow_lock) 959 /* stand-alone monitor does not need this since it is single-threaded */ 960 pthread_mutex_unlock(&mtcp->ctx->flow_pool_lock); 961 962 if (bound_addr) { 963 if (mtcp->ap) { 964 ret = FreeAddress(mtcp->ap, &addr); 965 } else { 966 ret = FreeAddress(ap, &addr); 967 } 968 if (ret < 0) { 969 TRACE_ERROR("(NEVER HAPPEN) Failed to free address.\n"); 970 } 971 } 972 973 #ifdef NETSTAT 974 #if NETSTAT_PERTHREAD 975 TRACE_STREAM("Destroyed. Remaining flows: %u\n", mtcp->flow_cnt); 976 #endif /* NETSTAT_PERTHREAD */ 977 #endif /* NETSTAT */ 978 979 } 980 /*---------------------------------------------------------------------------*/ 981 void 982 DestroyTCPStream(mtcp_manager_t mtcp, tcp_stream *stream) 983 { 984 tcp_stream *pair_stream = stream->pair_stream; 985 986 DestroySingleTCPStream(mtcp, stream); 987 988 if (pair_stream) 989 DestroySingleTCPStream(mtcp, pair_stream); 990 } 991 /*---------------------------------------------------------------------------*/ 992 void 993 DumpStream(mtcp_manager_t mtcp, tcp_stream *stream) 994 { 995 uint8_t *sa, *da; 996 struct tcp_send_vars *sndvar = stream->sndvar; 997 struct tcp_recv_vars *rcvvar = stream->rcvvar; 998 999 sa = (uint8_t *)&stream->saddr; 1000 da = (uint8_t *)&stream->daddr; 1001 thread_printf(mtcp, mtcp->log_fp, "========== Stream %u: " 1002 "%u.%u.%u.%u(%u) -> %u.%u.%u.%u(%u) ==========\n", stream->id, 1003 sa[0], sa[1], sa[2], sa[3], ntohs(stream->sport), 1004 da[0], da[1], da[2], da[3], ntohs(stream->dport)); 1005 thread_printf(mtcp, mtcp->log_fp, 1006 "Stream id: %u, type: %u, state: %s, close_reason: %s\n", 1007 stream->id, stream->stream_type, 1008 TCPStateToString(stream), close_reason_str[stream->close_reason]); 1009 if (stream->socket) { 1010 socket_map_t socket = stream->socket; 1011 thread_printf(mtcp, mtcp->log_fp, "Socket id: %d, type: %d, opts: %u\n" 1012 "epoll: %u (IN: %u, OUT: %u, ERR: %u, RDHUP: %u, ET: %u)\n" 1013 "events: %u (IN: %u, OUT: %u, ERR: %u, RDHUP: %u, ET: %u)\n", 1014 socket->id, socket->socktype, socket->opts, 1015 socket->epoll, socket->epoll & MOS_EPOLLIN, 1016 socket->epoll & MOS_EPOLLOUT, socket->epoll & MOS_EPOLLERR, 1017 socket->epoll & MOS_EPOLLRDHUP, socket->epoll & MOS_EPOLLET, 1018 socket->events, socket->events & MOS_EPOLLIN, 1019 socket->events & MOS_EPOLLOUT, socket->events & MOS_EPOLLERR, 1020 socket->events & MOS_EPOLLRDHUP, socket->events & MOS_EPOLLET); 1021 } else { 1022 thread_printf(mtcp, mtcp->log_fp, "Socket: (null)\n"); 1023 } 1024 1025 thread_printf(mtcp, mtcp->log_fp, 1026 "on_hash_table: %u, on_control_list: %u (wait: %u), on_send_list: %u, " 1027 "on_ack_list: %u, is_wack: %u, ack_cnt: %u\n" 1028 "on_rto_idx: %d, on_timewait_list: %u, on_timeout_list: %u, " 1029 "on_rcv_br_list: %u, on_snd_br_list: %u\n" 1030 "on_sendq: %u, on_ackq: %u, closed: %u, on_closeq: %u, " 1031 "on_closeq_int: %u, on_resetq: %u, on_resetq_int: %u\n" 1032 "have_reset: %u, is_fin_sent: %u, is_fin_ackd: %u, " 1033 "saw_timestamp: %u, sack_permit: %u, " 1034 "is_bound_addr: %u, need_wnd_adv: %u\n", stream->on_hash_table, 1035 sndvar->on_control_list, stream->control_list_waiting, sndvar->on_send_list, 1036 sndvar->on_ack_list, sndvar->is_wack, sndvar->ack_cnt, 1037 stream->on_rto_idx, stream->on_timewait_list, stream->on_timeout_list, 1038 stream->on_rcv_br_list, stream->on_snd_br_list, 1039 sndvar->on_sendq, sndvar->on_ackq, 1040 stream->closed, sndvar->on_closeq, sndvar->on_closeq_int, 1041 sndvar->on_resetq, sndvar->on_resetq_int, 1042 stream->have_reset, sndvar->is_fin_sent, 1043 sndvar->is_fin_ackd, stream->saw_timestamp, stream->sack_permit, 1044 stream->is_bound_addr, stream->need_wnd_adv); 1045 1046 thread_printf(mtcp, mtcp->log_fp, "========== Send variables ==========\n"); 1047 thread_printf(mtcp, mtcp->log_fp, 1048 "ip_id: %u, mss: %u, eff_mss: %u, wscale(me, peer): (%u, %u), nif_out: %d\n", 1049 sndvar->ip_id, sndvar->mss, sndvar->eff_mss, 1050 sndvar->wscale_mine, sndvar->wscale_peer, sndvar->nif_out); 1051 thread_printf(mtcp, mtcp->log_fp, 1052 "snd_nxt: %u, snd_una: %u, iss: %u, fss: %u\nsnd_wnd: %u, " 1053 "peer_wnd: %u, cwnd: %u, ssthresh: %u\n", 1054 stream->snd_nxt, sndvar->snd_una, sndvar->iss, sndvar->fss, 1055 sndvar->snd_wnd, sndvar->peer_wnd, sndvar->cwnd, sndvar->ssthresh); 1056 1057 if (sndvar->sndbuf) { 1058 thread_printf(mtcp, mtcp->log_fp, 1059 "Send buffer: init_seq: %u, head_seq: %u, " 1060 "len: %d, cum_len: %lu, size: %d\n", 1061 sndvar->sndbuf->init_seq, sndvar->sndbuf->head_seq, 1062 sndvar->sndbuf->len, sndvar->sndbuf->cum_len, sndvar->sndbuf->size); 1063 } else { 1064 thread_printf(mtcp, mtcp->log_fp, "Send buffer: (null)\n"); 1065 } 1066 thread_printf(mtcp, mtcp->log_fp, 1067 "nrtx: %u, max_nrtx: %u, rto: %u, ts_rto: %u, " 1068 "ts_lastack_sent: %u\n", sndvar->nrtx, sndvar->max_nrtx, 1069 sndvar->rto, sndvar->ts_rto, sndvar->ts_lastack_sent); 1070 1071 thread_printf(mtcp, mtcp->log_fp, 1072 "========== Receive variables ==========\n"); 1073 thread_printf(mtcp, mtcp->log_fp, 1074 "rcv_nxt: %u, irs: %u, rcv_wnd: %u, " 1075 "snd_wl1: %u, snd_wl2: %u\n", 1076 stream->rcv_nxt, rcvvar->irs, 1077 rcvvar->rcv_wnd, rcvvar->snd_wl1, rcvvar->snd_wl2); 1078 if (rcvvar->rcvbuf) { 1079 #ifdef NEWRB 1080 /* TODO: Implement this */ 1081 #else 1082 thread_printf(mtcp, mtcp->log_fp, 1083 "Receive buffer: init_seq: %u, head_seq: %u, " 1084 "merged_len: %d, cum_len: %lu, last_len: %d, size: %d\n", 1085 rcvvar->rcvbuf->init_seq, rcvvar->rcvbuf->head_seq, 1086 rcvvar->rcvbuf->merged_len, rcvvar->rcvbuf->cum_len, 1087 rcvvar->rcvbuf->last_len, rcvvar->rcvbuf->size); 1088 #endif 1089 } else { 1090 thread_printf(mtcp, mtcp->log_fp, "Receive buffer: (null)\n"); 1091 } 1092 thread_printf(mtcp, mtcp->log_fp, "last_ack_seq: %u, dup_acks: %u\n", 1093 rcvvar->last_ack_seq, rcvvar->dup_acks); 1094 thread_printf(mtcp, mtcp->log_fp, 1095 "ts_recent: %u, ts_lastack_rcvd: %u, ts_last_ts_upd: %u, " 1096 "ts_tw_expire: %u\n", rcvvar->ts_recent, rcvvar->ts_lastack_rcvd, 1097 rcvvar->ts_last_ts_upd, rcvvar->ts_tw_expire); 1098 thread_printf(mtcp, mtcp->log_fp, 1099 "srtt: %u, mdev: %u, mdev_max: %u, rttvar: %u, rtt_seq: %u\n", 1100 rcvvar->srtt, rcvvar->mdev, rcvvar->mdev_max, 1101 rcvvar->rttvar, rcvvar->rtt_seq); 1102 } 1103 /*---------------------------------------------------------------------------*/ 1104