1 #include <string.h> 2 3 #include "config.h" 4 #include "tcp_stream.h" 5 #include "fhash.h" 6 #include "tcp.h" 7 #include "tcp_in.h" 8 #include "tcp_out.h" 9 #include "tcp_ring_buffer.h" 10 #include "tcp_send_buffer.h" 11 #include "eventpoll.h" 12 #include "ip_out.h" 13 #include "timer.h" 14 #include "debug.h" 15 #include "tcp_rb.h" 16 17 /*---------------------------------------------------------------------------*/ 18 char *state_str[] = { 19 "TCP_ST_CLOSED", 20 "TCP_ST_LISTEN", 21 "TCP_ST_SYN_SENT", 22 "TCP_ST_SYN_RCVD", 23 "TCP_ST_ESTABILSHED", 24 "TCP_ST_FIN_WAIT_1", 25 "TCP_ST_FIN_WAIT_2", 26 "TCP_ST_CLOSE_WAIT", 27 "TCP_ST_CLOSING", 28 "TCP_ST_LAST_ACK", 29 "TCP_ST_TIME_WAIT", 30 "TCP_ST_CLOSED_RSVD" 31 }; 32 /*---------------------------------------------------------------------------*/ 33 char *close_reason_str[] = { 34 "NOT_CLOSED", 35 "CLOSE", 36 "CLOSED", 37 "CONN_FAIL", 38 "CONN_LOST", 39 "RESET", 40 "NO_MEM", 41 "DENIED", 42 "TIMEDOUT" 43 }; 44 /*---------------------------------------------------------------------------*/ 45 static __thread unsigned long next = 1; 46 /* Function retrieved from POSIX.1-2001 standard */ 47 /* RAND_MAX assumed to be 32767 */ 48 static int 49 posix_seq_rand(void) { 50 next = next * 1103515245 + 12345; 51 return ((unsigned)(next/65536) % 32768); 52 } 53 /*---------------------------------------------------------------------------*/ 54 void 55 posix_seq_srand(unsigned seed) { 56 next = seed % 32768; 57 } 58 /*---------------------------------------------------------------------------*/ 59 uint32_t 60 FetchSeqDrift(struct tcp_stream *stream, uint32_t seq) 61 { 62 int i = 0; 63 uint8_t flag = 1; 64 int count; 65 66 i = stream->sndvar->sre_index - 1; 67 if (i == -1) i = SRE_MAX - 1; 68 count = 0; 69 70 while (flag) { 71 if (stream->sndvar->sre[i].seq_base == 0) 72 return 0; 73 else if (seq >= stream->sndvar->sre[i].seq_base) 74 return stream->sndvar->sre[i].seq_off; 75 76 i--; 77 if (i == -1) i = SRE_MAX - 1; 78 count++; 79 
if (count == SRE_MAX) 80 flag = 1; 81 } 82 83 return 0; 84 } 85 /*---------------------------------------------------------------------------*/ 86 int 87 TcpSeqChange(socket_map_t socket, uint32_t seq_drift, int side, uint32_t seqno) 88 { 89 struct tcp_stream *mstrm, *stream; 90 91 if (side != MOS_SIDE_CLI && side != MOS_SIDE_SVR) { 92 TRACE_ERROR("Invalid side requested!\n"); 93 errno = EINVAL; 94 return -1; 95 } 96 97 mstrm = socket->monitor_stream->stream; 98 stream = (side == mstrm->side) ? mstrm : mstrm->pair_stream; 99 if (stream == NULL) { 100 TRACE_ERROR("Stream pointer for sockid: %u not found!\n", 101 socket->id); 102 errno = EBADF; 103 return -1; 104 } 105 106 stream->sndvar->sre[stream->sndvar->sre_index].seq_off = seq_drift; 107 stream->sndvar->sre[stream->sndvar->sre_index].seq_off += 108 (stream->sndvar->sre_index == 0) ? stream->sndvar->sre[SRE_MAX - 1].seq_off : 109 stream->sndvar->sre[stream->sndvar->sre_index - 1].seq_off; 110 stream->sndvar->sre[stream->sndvar->sre_index].seq_base = seqno; 111 stream->sndvar->sre_index = (stream->sndvar->sre_index + 1) & (SRE_MAX - 1); 112 113 return 0; 114 } 115 /*---------------------------------------------------------------------------*/ 116 /** 117 * FYI: This is NOT a read-only return! 118 */ 119 int 120 GetFragInfo(socket_map_t sock, int side, void *optval, socklen_t *len) 121 { 122 #ifdef NEWRB 123 #ifndef NEWPPEEK 124 /* TODO: Implement this */ 125 return -1; 126 #endif 127 #endif 128 struct tcp_stream *stream; 129 130 stream = NULL; 131 #ifdef NEWPPEEK 132 if (!*len || ( *len % sizeof(tcpfrag_t) != 0)) 133 goto frag_info_error; 134 #else 135 if (*len != sizeof(struct fragment_ctx)) 136 goto frag_info_error; 137 #endif 138 139 if (side != MOS_SIDE_CLI && side != MOS_SIDE_SVR) { 140 TRACE_ERROR("Invalid side requested!\n"); 141 exit(EXIT_FAILURE); 142 return -1; 143 } 144 145 struct tcp_stream *mstrm = sock->monitor_stream->stream; 146 stream = (side == mstrm->side) ? 
mstrm : mstrm->pair_stream; 147 148 if (stream == NULL) goto frag_info_error; 149 150 /* First check if the tcp ring buffer even has anything */ 151 if (stream->rcvvar != NULL && 152 stream->rcvvar->rcvbuf != NULL) { 153 #ifdef NEWRB 154 #ifdef NEWPPEEK 155 tcprb_t *rcvbuf = stream->rcvvar->rcvbuf; 156 struct tcp_ring_fragment *out = (struct tcp_ring_fragment *)optval; 157 int const maxout = *len; 158 *len = 0; 159 struct _tcpfrag_t *walk; 160 TAILQ_FOREACH(walk, &rcvbuf->frags, link) { 161 if (*len == maxout) 162 break; 163 out[*len].offset = walk->head; 164 out[*len].len = walk->tail - walk->head; 165 (*len)++; 166 } 167 if (*len != maxout) { 168 /* set zero sentinel */ 169 out[*len].offset = 0; 170 out[*len].len = 0; 171 } 172 #endif 173 #else 174 struct tcp_ring_buffer *rcvbuf = stream->rcvvar->rcvbuf; 175 if (rcvbuf->fctx == NULL) 176 goto frag_info_error; 177 else { 178 optval = (void *)rcvbuf->fctx; 179 *len = rcvbuf->head_seq + 180 (rcvbuf->tail_offset - 181 rcvbuf->head_offset); 182 } 183 #endif 184 } else 185 goto frag_info_error; 186 187 return 0; 188 189 frag_info_error: 190 optval = NULL; 191 *len = 0; 192 return -1; 193 } 194 /*---------------------------------------------------------------------------*/ 195 /** 196 * Comments later... 197 */ 198 int 199 GetBufInfo(socket_map_t sock, int side, void *optval, socklen_t *len) 200 { 201 struct tcp_stream *stream; 202 struct tcp_buf_info *tbi; 203 204 tbi = (struct tcp_buf_info *)optval; 205 memset(tbi, 0, sizeof(struct tcp_buf_info)); 206 stream = NULL; 207 208 if (*len != sizeof(struct tcp_buf_info)) { 209 errno = EINVAL; 210 goto buf_info_error; 211 } 212 213 if (side != MOS_SIDE_CLI && side != MOS_SIDE_SVR) { 214 TRACE_ERROR("Invalid side requested!\n"); 215 errno = EINVAL; 216 goto buf_info_error; 217 } 218 219 struct tcp_stream *mstrm = sock->monitor_stream->stream; 220 stream = (side == mstrm->side) ? 
mstrm : mstrm->pair_stream; 221 222 /* First check if the tcp ring buffer even has anything */ 223 if (stream != NULL && 224 stream->rcvvar != NULL && 225 stream->rcvvar->rcvbuf != NULL) { 226 #ifdef NEWRB 227 tcprb_t *rcvbuf = stream->rcvvar->rcvbuf; 228 tcpfrag_t *f = TAILQ_LAST(&rcvbuf->frags, flist); 229 tbi->tcpbi_init_seq = stream->rcvvar->irs + 1; 230 tbi->tcpbi_last_byte_read = rcvbuf->pile; 231 tbi->tcpbi_next_byte_expected = rcvbuf->pile + tcprb_cflen(rcvbuf); 232 tbi->tcpbi_last_byte_received = (f ? f->tail : rcvbuf->head); 233 #else 234 struct tcp_ring_buffer *rcvbuf = stream->rcvvar->rcvbuf; 235 tbi->tcpbi_init_seq = rcvbuf->init_seq; 236 tbi->tcpbi_last_byte_read = rcvbuf->head_seq; 237 tbi->tcpbi_next_byte_expected = rcvbuf->head_seq + rcvbuf->merged_len; 238 tbi->tcpbi_last_byte_received = rcvbuf->head_seq + rcvbuf->last_len; 239 #endif 240 } else { 241 errno = ENODATA; 242 goto buf_info_error; 243 } 244 245 return 0; 246 247 buf_info_error: 248 optval = NULL; 249 *len = 0; 250 return -1; 251 } 252 /*---------------------------------------------------------------------------*/ 253 int 254 DisableBuf(socket_map_t sock, int side) 255 { 256 struct tcp_stream *stream; 257 int rc = 0; 258 259 switch (sock->socktype) { 260 case MOS_SOCK_MONITOR_STREAM: 261 if (side == MOS_SIDE_CLI) 262 sock->monitor_listener->client_buf_mgmt = 0; 263 else if (side == MOS_SIDE_SVR) 264 sock->monitor_listener->server_buf_mgmt = 0; 265 else { 266 assert(0); 267 TRACE_DBG("Invalid side!\n"); 268 rc = -1; 269 } 270 break; 271 case MOS_SOCK_MONITOR_STREAM_ACTIVE: 272 stream = sock->monitor_stream->stream; 273 if (stream->side != side) 274 stream = stream->pair_stream; 275 assert(stream->side == side); 276 stream->buffer_mgmt = 0; 277 break; 278 default: 279 assert(0); 280 TRACE_DBG("Can't disable buf for invalid socket!\n"); 281 rc = -1; 282 } 283 284 return rc; 285 } 286 /*---------------------------------------------------------------------------*/ 287 int 288 
GetLastTimestamp(struct tcp_stream *stream, uint32_t *usecs, socklen_t *len) 289 { 290 if (*len < sizeof(uint32_t)) { 291 TRACE_DBG("Size passed is not >= sizeof(uint32_t)!\n"); 292 return -1; 293 } 294 295 *usecs = (stream->last_active_ts > 296 stream->pair_stream->last_active_ts) 297 ? 298 TS_TO_USEC(stream->last_active_ts) : 299 TS_TO_USEC(stream->pair_stream->last_active_ts); 300 301 return 0; 302 } 303 /*---------------------------------------------------------------------------*/ 304 inline int 305 GetTCPState(struct tcp_stream *stream, int side, 306 void *optval, socklen_t *optlen) 307 { 308 if (!stream || !(stream = (side == stream->side) ? stream : stream->pair_stream)) 309 return -1; 310 *(int *)optval = (int)((stream->state == TCP_ST_CLOSED_RSVD) ? 311 TCP_ST_CLOSED : stream->state); 312 return 0; 313 } 314 /*---------------------------------------------------------------------------*/ 315 inline char * 316 TCPStateToString(const tcp_stream *stream) 317 { 318 return (stream) ? 
state_str[stream->state] : NULL; 319 } 320 /*---------------------------------------------------------------------------*/ 321 inline void 322 RaiseReadEvent(mtcp_manager_t mtcp, tcp_stream *stream) 323 { 324 struct tcp_recv_vars *rcvvar; 325 326 rcvvar = stream->rcvvar; 327 328 if (HAS_STREAM_TYPE(stream, MOS_SOCK_STREAM)) { 329 if (stream->socket && (stream->socket->epoll & MOS_EPOLLIN)) 330 AddEpollEvent(mtcp->ep, MOS_EVENT_QUEUE, stream->socket, MOS_EPOLLIN); 331 #ifdef NEWRB 332 } else if (rcvvar->rcvbuf && tcprb_cflen(rcvvar->rcvbuf) > 0) { 333 #else 334 } else if (rcvvar->rcvbuf && rcvvar->rcvbuf->merged_len) { 335 #endif 336 /* 337 * in case it is a monitoring socket, queue up the read events 338 * in the event_queue of only if the tcp_stream hasn't already 339 * been registered in the event queue 340 */ 341 int index; 342 struct event_queue *eq; 343 struct socket_map *walk; 344 345 SOCKQ_FOREACH_START(walk, &stream->msocks) { 346 assert(walk->socktype == MOS_SOCK_MONITOR_STREAM_ACTIVE); 347 eq = walk->monitor_stream->monitor_listener->eq; 348 349 /* if it already has read data register... then skip this step */ 350 if (stream->actions & MOS_ACT_READ_DATA) 351 return; 352 if (eq->num_events >= eq->size) { 353 TRACE_ERROR("Exceeded epoll event queue! 
num_events: %d, " 354 "size: %d\n", eq->num_events, eq->size); 355 return; 356 } 357 358 index = eq->end++; 359 eq->events[index].ev.events = MOS_EPOLLIN; 360 eq->events[index].ev.data.ptr = (void *)stream; 361 362 if (eq->end >= eq->size) { 363 eq->end = 0; 364 } 365 eq->num_events++; 366 stream->actions |= MOS_ACT_READ_DATA; 367 } SOCKQ_FOREACH_END; 368 } else { 369 TRACE_EPOLL("Stream %d: Raising read without a socket!\n", stream->id); 370 } 371 } 372 /*---------------------------------------------------------------------------*/ 373 inline void 374 RaiseWriteEvent(mtcp_manager_t mtcp, tcp_stream *stream) 375 { 376 if (stream->socket) { 377 if (stream->socket->epoll & MOS_EPOLLOUT) { 378 AddEpollEvent(mtcp->ep, 379 MOS_EVENT_QUEUE, stream->socket, MOS_EPOLLOUT); 380 } 381 } else { 382 TRACE_EPOLL("Stream %d: Raising write without a socket!\n", stream->id); 383 } 384 } 385 /*---------------------------------------------------------------------------*/ 386 inline void 387 RaiseCloseEvent(mtcp_manager_t mtcp, tcp_stream *stream) 388 { 389 if (stream->socket) { 390 if (stream->socket->epoll & MOS_EPOLLRDHUP) { 391 AddEpollEvent(mtcp->ep, 392 MOS_EVENT_QUEUE, stream->socket, MOS_EPOLLRDHUP); 393 } else if (stream->socket->epoll & MOS_EPOLLIN) { 394 AddEpollEvent(mtcp->ep, 395 MOS_EVENT_QUEUE, stream->socket, MOS_EPOLLIN); 396 } 397 } else { 398 TRACE_EPOLL("Stream %d: Raising close without a socket!\n", stream->id); 399 } 400 } 401 /*---------------------------------------------------------------------------*/ 402 inline int 403 RaiseErrorEvent(mtcp_manager_t mtcp, tcp_stream *stream) 404 { 405 if (stream->socket) { 406 if (stream->socket->epoll & MOS_EPOLLERR) { 407 /* passing closing reason for error notification */ 408 return AddEpollEvent(mtcp->ep, 409 MOS_EVENT_QUEUE, stream->socket, MOS_EPOLLERR); 410 } 411 } else { 412 TRACE_EPOLL("Stream %d: Raising error without a socket!\n", stream->id); 413 } 414 return -1; 415 } 416 
/*----------------------------------------------------------------------------*/ 417 int 418 AddMonitorStreamSockets(mtcp_manager_t mtcp, struct tcp_stream *stream) 419 { 420 struct mtcp_context mctx; 421 int socktype; 422 423 mctx.cpu = mtcp->ctx->cpu; 424 struct mon_listener *walk; 425 426 // traverse the passive socket's list 427 TAILQ_FOREACH(walk, &mtcp->monitors, link) { 428 socktype = walk->socket->socktype; 429 430 if (socktype != MOS_SOCK_MONITOR_STREAM) 431 continue; 432 433 /* mtcp_bind_monitor_filter() 434 * - create an monitor active socket only for the filter-passed flows 435 * - we use the result (= tag) from DetectStreamType() to avoid 436 * evaluating the same BPF filter twice */ 437 if (!walk->is_stream_syn_filter_hit) { 438 continue; 439 } 440 441 struct socket_map *s = 442 AllocateSocket(&mctx, MOS_SOCK_MONITOR_STREAM_ACTIVE); 443 if (!s) 444 return -1; 445 446 s->monitor_stream->socket = s; 447 s->monitor_stream->stream = stream; 448 s->monitor_stream->monitor_listener = walk; 449 s->monitor_stream->client_buf_mgmt = walk->client_buf_mgmt; 450 s->monitor_stream->server_buf_mgmt = walk->server_buf_mgmt; 451 s->monitor_stream->client_mon = walk->client_mon; 452 s->monitor_stream->server_mon = walk->server_mon; 453 #ifdef NEWEV 454 s->monitor_stream->stree_dontcare = 455 s->monitor_stream->monitor_listener->stree_dontcare; 456 s->monitor_stream->stree_pre_rcv = 457 s->monitor_stream->monitor_listener->stree_pre_rcv; 458 s->monitor_stream->stree_post_snd = 459 s->monitor_stream->monitor_listener->stree_post_snd; 460 if (s->monitor_stream->stree_dontcare) 461 stree_inc_ref(s->monitor_stream->stree_dontcare); 462 if (s->monitor_stream->stree_pre_rcv) 463 stree_inc_ref(s->monitor_stream->stree_pre_rcv); 464 if (s->monitor_stream->stree_post_snd) 465 stree_inc_ref(s->monitor_stream->stree_post_snd); 466 #else 467 InitEvP(&s->monitor_stream->dontcare_evp, 468 &walk->dontcare_evb); 469 InitEvP(&s->monitor_stream->pre_tcp_evp, 470 &walk->pre_tcp_evb); 471 
InitEvP(&s->monitor_stream->post_tcp_evp, 472 &walk->post_tcp_evb); 473 #endif 474 475 SOCKQ_INSERT_TAIL(&stream->msocks, s); 476 } 477 478 return 0; 479 } 480 /*----------------------------------------------------------------------------*/ 481 int 482 DestroyMonitorStreamSocket(mtcp_manager_t mtcp, socket_map_t msock) 483 { 484 struct mtcp_context mctx; 485 int socktype, sockid, rc; 486 487 if (msock == NULL) { 488 TRACE_DBG("Stream socket does not exist!\n"); 489 /* exit(-1); */ 490 return 0; 491 } 492 493 rc = 0; 494 mctx.cpu = mtcp->ctx->cpu; 495 socktype = msock->socktype; 496 sockid = msock->id; 497 498 switch (socktype) { 499 case MOS_SOCK_MONITOR_STREAM_ACTIVE: 500 FreeSocket(&mctx, sockid, socktype); 501 break; 502 case MOS_SOCK_MONITOR_RAW: 503 /* do nothing since all raw sockets point to the same socket */ 504 break; 505 default: 506 TRACE_DBG("Trying to destroy a monitor socket for an unsupported type!\n"); 507 rc = -1; 508 /* exit(-1); */ 509 break; 510 } 511 512 return rc; 513 } 514 /*---------------------------------------------------------------------------*/ 515 tcp_stream * 516 CreateTCPStream(mtcp_manager_t mtcp, socket_map_t socket, int type, 517 uint32_t saddr, uint16_t sport, uint32_t daddr, uint16_t dport, 518 unsigned int *hash) 519 { 520 tcp_stream *stream = NULL; 521 int ret; 522 /* stand-alone monitor does not need this since it is single-threaded */ 523 bool flow_lock = type & STREAM_TYPE(MOS_SOCK_STREAM); 524 //bool flow_lock = false; 525 526 if (flow_lock) 527 pthread_mutex_lock(&mtcp->ctx->flow_pool_lock); 528 529 stream = (tcp_stream *)MPAllocateChunk(mtcp->flow_pool); 530 if (!stream) { 531 TRACE_ERROR("Cannot allocate memory for the stream. 
" 532 "g_config.mos->max_concurrency: %d, concurrent: %u\n", 533 g_config.mos->max_concurrency, mtcp->flow_cnt); 534 if (flow_lock) 535 pthread_mutex_unlock(&mtcp->ctx->flow_pool_lock); 536 return NULL; 537 } 538 memset(stream, 0, sizeof(tcp_stream)); 539 540 stream->rcvvar = (struct tcp_recv_vars *)MPAllocateChunk(mtcp->rv_pool); 541 if (!stream->rcvvar) { 542 MPFreeChunk(mtcp->flow_pool, stream); 543 if (flow_lock) 544 pthread_mutex_unlock(&mtcp->ctx->flow_pool_lock); 545 return NULL; 546 } 547 memset(stream->rcvvar, 0, sizeof(struct tcp_recv_vars)); 548 549 /* stand-alone monitor does not need to do this */ 550 stream->sndvar = (struct tcp_send_vars *)MPAllocateChunk(mtcp->sv_pool); 551 if (!stream->sndvar) { 552 MPFreeChunk(mtcp->rv_pool, stream->rcvvar); 553 MPFreeChunk(mtcp->flow_pool, stream); 554 if (flow_lock) 555 pthread_mutex_unlock(&mtcp->ctx->flow_pool_lock); 556 return NULL; 557 } 558 //if (HAS_STREAM_TYPE(stream, MOS_SOCK_STREAM)) 559 memset(stream->sndvar, 0, sizeof(struct tcp_send_vars)); 560 561 stream->id = mtcp->g_id++; 562 stream->saddr = saddr; 563 stream->sport = sport; 564 stream->daddr = daddr; 565 stream->dport = dport; 566 567 ret = HTInsert(mtcp->tcp_flow_table, stream, hash); 568 if (ret < 0) { 569 TRACE_ERROR("Stream %d: " 570 "Failed to insert the stream into hash table.\n", stream->id); 571 MPFreeChunk(mtcp->flow_pool, stream); 572 if (flow_lock) 573 pthread_mutex_unlock(&mtcp->ctx->flow_pool_lock); 574 return NULL; 575 } 576 stream->on_hash_table = TRUE; 577 mtcp->flow_cnt++; 578 579 SOCKQ_INIT(&stream->msocks); 580 581 /* 582 * if an embedded monitor is attached... 583 * create monitor stream socket now! 584 * If socket type is raw.. 
then don't create it 585 */ 586 if ((mtcp->num_msp > 0) && 587 (type & STREAM_TYPE(MOS_SOCK_MONITOR_STREAM_ACTIVE))) 588 if (AddMonitorStreamSockets(mtcp, stream) < 0) 589 TRACE_DBG("Could not create monitor stream socket!\n"); 590 591 if (flow_lock) 592 pthread_mutex_unlock(&mtcp->ctx->flow_pool_lock); 593 594 if (socket) { 595 stream->socket = socket; 596 socket->stream = stream; 597 } 598 599 stream->stream_type = type; 600 stream->state = TCP_ST_LISTEN; 601 /* This is handled by core.c, tcp_in.c & tcp_out.c */ 602 /* stream->cb_events |= MOS_ON_TCP_STATE_CHANGE; */ 603 604 stream->on_rto_idx = -1; 605 606 /* stand-alone monitor does not need to do this */ 607 stream->sndvar->mss = TCP_DEFAULT_MSS; 608 stream->sndvar->wscale_mine = TCP_DEFAULT_WSCALE; 609 stream->sndvar->wscale_peer = 0; 610 611 if (HAS_STREAM_TYPE(stream, MOS_SOCK_STREAM)) { 612 stream->sndvar->ip_id = 0; 613 stream->sndvar->nif_out = GetOutputInterface(stream->daddr); 614 615 stream->sndvar->iss = posix_seq_rand() % TCP_MAX_SEQ; 616 //stream->sndvar->iss = 0; 617 stream->snd_nxt = stream->sndvar->iss; 618 stream->sndvar->snd_una = stream->sndvar->iss; 619 stream->sndvar->snd_wnd = g_config.mos->wmem_size; 620 stream->sndvar->rto = TCP_INITIAL_RTO; 621 #if USE_SPIN_LOCK 622 if (pthread_spin_init(&stream->sndvar->write_lock, PTHREAD_PROCESS_PRIVATE)) { 623 perror("pthread_spin_init of write_lock"); 624 pthread_spin_destroy(&stream->rcvvar->read_lock); 625 #else 626 if (pthread_mutex_init(&stream->sndvar->write_lock, NULL)) { 627 perror("pthread_mutex_init of write_lock"); 628 pthread_mutex_destroy(&stream->rcvvar->read_lock); 629 #endif 630 return NULL; 631 } 632 } 633 stream->rcvvar->irs = 0; 634 635 stream->rcv_nxt = 0; 636 stream->rcvvar->rcv_wnd = TCP_INITIAL_WINDOW; 637 638 stream->rcvvar->snd_wl1 = stream->rcvvar->irs - 1; 639 640 #ifdef NEWRB 641 stream->buffer_mgmt = BUFMGMT_FULL; 642 #else 643 /* set tcp_ring_buffer management to `on' by default */ 644 stream->buffer_mgmt = TRUE; 645 
#endif 646 647 /* needs state update by default */ 648 stream->status_mgmt = 1; 649 650 #if USE_SPIN_LOCK 651 if (pthread_spin_init(&stream->rcvvar->read_lock, PTHREAD_PROCESS_PRIVATE)) { 652 #else 653 if (pthread_mutex_init(&stream->rcvvar->read_lock, NULL)) { 654 #endif 655 perror("pthread_mutex_init of read_lock"); 656 return NULL; 657 } 658 659 #ifdef STREAM 660 uint8_t *sa; 661 uint8_t *da; 662 663 sa = (uint8_t *)&stream->saddr; 664 da = (uint8_t *)&stream->daddr; 665 TRACE_STREAM("CREATED NEW TCP STREAM %d: " 666 "%u.%u.%u.%u(%d) -> %u.%u.%u.%u(%d) (ISS: %u)\n", stream->id, 667 sa[0], sa[1], sa[2], sa[3], ntohs(stream->sport), 668 da[0], da[1], da[2], da[3], ntohs(stream->dport), 669 stream->sndvar->iss); 670 #endif 671 672 return stream; 673 } 674 /*----------------------------------------------------------------------------*/ 675 inline tcp_stream * 676 CreateDualTCPStream(mtcp_manager_t mtcp, socket_map_t socket, int type, uint32_t saddr, 677 uint16_t sport, uint32_t daddr, uint16_t dport, unsigned int *hash) 678 { 679 tcp_stream *cur_stream, *paired_stream; 680 struct socket_map *walk; 681 682 cur_stream = CreateTCPStream(mtcp, socket, type, 683 saddr, sport, daddr, dport, hash); 684 if (cur_stream == NULL) { 685 TRACE_ERROR("Can't create tcp_stream!\n"); 686 return NULL; 687 } 688 689 paired_stream = CreateTCPStream(mtcp, NULL, MOS_SOCK_UNUSED, 690 daddr, dport, saddr, sport, hash); 691 if (paired_stream == NULL) { 692 DestroyTCPStream(mtcp, cur_stream); 693 TRACE_ERROR("Can't create tcp_stream!\n"); 694 return NULL; 695 } 696 697 cur_stream->pair_stream = paired_stream; 698 paired_stream->pair_stream = cur_stream; 699 paired_stream->socket = socket; 700 SOCKQ_FOREACH_START(walk, &cur_stream->msocks) { 701 SOCKQ_INSERT_TAIL(&paired_stream->msocks, walk); 702 } SOCKQ_FOREACH_END; 703 paired_stream->stream_type = STREAM_TYPE(MOS_SOCK_MONITOR_STREAM_ACTIVE); 704 705 return cur_stream; 706 } 707 
/*----------------------------------------------------------------------------*/
/**
 * Create the client-side (MOS_SIDE_CLI) stream of a monitored flow.
 *
 * pair_stream is left NULL here; AttachServerTCPStream() is what links a
 * server-side stream to it. For monitor-only streams
 * (stream_type == MOS_SOCK_MONITOR_STREAM_ACTIVE) buffer_mgmt becomes the
 * largest client_buf_mgmt among the attached monitor sockets (starting from
 * BUFMGMT_OFF), and status_mgmt is set if any listener has client_mon == 1.
 *
 * Returns the new stream, or NULL if CreateTCPStream() failed.
 */
inline tcp_stream *
CreateClientTCPStream(mtcp_manager_t mtcp, socket_map_t socket, int type,
		uint32_t saddr, uint16_t sport, uint32_t daddr, uint16_t dport,
		unsigned int *hash)
{
	tcp_stream *cs;
	struct socket_map *w;

	/* NOTE(review): the address/port pairs are handed to CreateTCPStream
	 * reversed (daddr/dport become the stream's source), whereas
	 * AttachServerTCPStream passes them unswapped; presumably deliberate,
	 * confirm against the callers. */
	cs = CreateTCPStream(mtcp, socket, type, daddr, dport, saddr, sport, hash);
	if (cs == NULL) {
		TRACE_ERROR("Can't create tcp_stream!\n");
		return NULL;
	}

	cs->side = MOS_SIDE_CLI;
	cs->pair_stream = NULL;

	/* if buffer management is off, then disable
	 * monitoring tcp ring of either streams (only if stream
	 * is just monitor stream active)
	 */
	if (IS_STREAM_TYPE(cs, MOS_SOCK_MONITOR_STREAM_ACTIVE)) {
		cs->buffer_mgmt = BUFMGMT_OFF;
		SOCKQ_FOREACH_START(w, &cs->msocks) {
			uint8_t bm = w->monitor_stream->client_buf_mgmt;
			if (bm > cs->buffer_mgmt)
				cs->buffer_mgmt = bm;
			if (w->monitor_stream->monitor_listener->client_mon == 1)
				cs->status_mgmt = 1;
		} SOCKQ_FOREACH_END;
	}

	return cs;
}
/*----------------------------------------------------------------------------*/
/**
 * Create the server-side (MOS_SIDE_SVR) stream for client stream @cs and
 * link the two as pair streams.
 *
 * The new stream shares cs->socket and cs's monitor sockets and is typed
 * MOS_SOCK_MONITOR_STREAM_ACTIVE. Its buffer_mgmt becomes the largest
 * server_buf_mgmt among those sockets (starting from BUFMGMT_OFF), and
 * status_mgmt is set if any listener has server_mon == 1. @type is not used.
 *
 * Returns the new stream, or NULL if CreateTCPStream() failed.
 */
inline tcp_stream *
AttachServerTCPStream(mtcp_manager_t mtcp, tcp_stream *cs, int type,
		uint32_t saddr, uint16_t sport, uint32_t daddr, uint16_t dport)
{
	tcp_stream *ss;
	struct socket_map *w;

	/* The 3rd arg (MOS_SOCK_UNUSED as the stream type) is a temporary hack;
	 * stream_type is overwritten below. FIXIT! TODO: XXX */
	ss = CreateTCPStream(mtcp, NULL, MOS_SOCK_UNUSED, saddr, sport, daddr, dport, NULL);
	if (ss == NULL) {
		TRACE_ERROR("Can't create tcp_stream!\n");
		return NULL;
	}

	ss->side = MOS_SIDE_SVR;
	cs->pair_stream = ss;
	ss->pair_stream = cs;
	ss->socket = cs->socket;
	SOCKQ_FOREACH_START(w, &cs->msocks) {
		SOCKQ_INSERT_TAIL(&ss->msocks, w);
	} SOCKQ_FOREACH_END;
	ss->stream_type = STREAM_TYPE(MOS_SOCK_MONITOR_STREAM_ACTIVE);

	if (IS_STREAM_TYPE(ss, MOS_SOCK_MONITOR_STREAM_ACTIVE)) {
		ss->buffer_mgmt = BUFMGMT_OFF;
		SOCKQ_FOREACH_START(w, &ss->msocks) {
			uint8_t bm = w->monitor_stream->server_buf_mgmt;
			if (bm > ss->buffer_mgmt)
				ss->buffer_mgmt = bm;
			if (w->monitor_stream->monitor_listener->server_mon == 1)
				ss->status_mgmt = 1;
		} SOCKQ_FOREACH_END;
	}

	return ss;
}
/*---------------------------------------------------------------------------*/
/**
 * Tear down and free a single tcp_stream (not its pair).
 *
 * In order:
 *   1. mark it TCP_ST_CLOSED_RSVD and fire MOS_ON_CONN_END |
 *      MOS_ON_TCP_STATE_CHANGE | cb_events on both hooks for every monitor
 *      socket;
 *   2. for end-host streams (MOS_SOCK_STREAM), remember a bound address,
 *      drop the stream from the control/send/ACK/RTO lists, destroy its
 *      buffer locks and free its send buffer;
 *   3. remove it from the timewait/timeout lists and free its receive buffer;
 *   4. under flow_pool_lock (end-host streams only), remove it from the flow
 *      table and detach its monitor sockets. The sockets are destroyed only
 *      when no pair stream remains, so the second stream of a pair to be
 *      destroyed frees them. The pair's back pointer is cleared, and the
 *      chunks are returned to their pools;
 *   5. finally release the bound address, if any.
 *
 * NOTE(review): read_lock is initialised by CreateTCPStream for every
 * stream, but it is only destroyed here for MOS_SOCK_STREAM streams.
 */
static void
DestroySingleTCPStream(mtcp_manager_t mtcp, tcp_stream *stream)
{
	struct sockaddr_in addr;
	int bound_addr = FALSE;
	int ret;
	/* stand-alone monitor does not need this since it is single-threaded */
	bool flow_lock = HAS_STREAM_TYPE(stream, MOS_SOCK_STREAM);

	struct socket_map *walk;

	/* Set the stream state as CLOSED */
	stream->state = TCP_ST_CLOSED_RSVD;

	SOCKQ_FOREACH_START(walk, &stream->msocks) {
		HandleCallback(mtcp, MOS_HK_RCV, walk, stream->side, NULL,
			       MOS_ON_CONN_END | MOS_ON_TCP_STATE_CHANGE | stream->cb_events);
		HandleCallback(mtcp, MOS_HK_SND, walk, stream->side, NULL,
			       MOS_ON_CONN_END | MOS_ON_TCP_STATE_CHANGE | stream->cb_events);
	} SOCKQ_FOREACH_END;

	/* Disabled diagnostics and statistics dump, kept for reference. */
#if 0
#ifdef DUMP_STREAM
	if (stream->close_reason != TCP_ACTIVE_CLOSE &&
	    stream->close_reason != TCP_PASSIVE_CLOSE) {
		thread_printf(mtcp, mtcp->log_fp,
			      "Stream %d abnormally closed.\n", stream->id);
		DumpStream(mtcp, stream);
		DumpControlList(mtcp, mtcp->n_sender[0]);
	}
#endif

#ifdef STREAM
	uint8_t *sa, *da;
	sa = (uint8_t *)&stream->saddr;
	da = (uint8_t *)&stream->daddr;
	TRACE_STREAM("DESTROY TCP STREAM %d: "
		     "%u.%u.%u.%u(%d) -> %u.%u.%u.%u(%d) (%s)\n", stream->id,
		     sa[0], sa[1], sa[2], sa[3], ntohs(stream->sport),
		     da[0], da[1], da[2], da[3], ntohs(stream->dport),
		     close_reason_str[stream->close_reason]);
#endif

	if (stream->sndvar->sndbuf) {
		TRACE_FSTAT("Stream %d: send buffer "
			    "cum_len: %lu, len: %u\n", stream->id,
			    stream->sndvar->sndbuf->cum_len,
			    stream->sndvar->sndbuf->len);
	}
	if (stream->rcvvar->rcvbuf) {
		TRACE_FSTAT("Stream %d: recv buffer "
			    "cum_len: %lu, merged_len: %u, last_len: %u\n", stream->id,
			    stream->rcvvar->rcvbuf->cum_len,
			    stream->rcvvar->rcvbuf->merged_len,
			    stream->rcvvar->rcvbuf->last_len);
	}

#if RTM_STAT
	/* Triple duplicated ack stats */
	if (stream->sndvar->rstat.tdp_ack_cnt) {
		TRACE_FSTAT("Stream %d: triple duplicated ack: %u, "
			    "retransmission bytes: %u, average rtm bytes/ack: %u\n",
			    stream->id,
			    stream->sndvar->rstat.tdp_ack_cnt, stream->sndvar->rstat.tdp_ack_bytes,
			    stream->sndvar->rstat.tdp_ack_bytes / stream->sndvar->rstat.tdp_ack_cnt);
	}

	/* Retransmission timeout stats */
	if (stream->sndvar->rstat.rto_cnt > 0) {
		TRACE_FSTAT("Stream %d: timeout count: %u, bytes: %u\n", stream->id,
			    stream->sndvar->rstat.rto_cnt, stream->sndvar->rstat.rto_bytes);
	}

	/* Recovery stats */
	if (stream->sndvar->rstat.ack_upd_cnt) {
		TRACE_FSTAT("Stream %d: snd_nxt update count: %u, "
			    "snd_nxt update bytes: %u, average update bytes/update: %u\n",
			    stream->id,
			    stream->sndvar->rstat.ack_upd_cnt, stream->sndvar->rstat.ack_upd_bytes,
			    stream->sndvar->rstat.ack_upd_bytes / stream->sndvar->rstat.ack_upd_cnt);
	}
#if TCP_OPT_SACK_ENABLED
	if (stream->sndvar->rstat.sack_cnt) {
		TRACE_FSTAT("Selective ack count: %u, bytes: %u, "
			    "average bytes/ack: %u\n",
			    stream->sndvar->rstat.sack_cnt, stream->sndvar->rstat.sack_bytes,
			    stream->sndvar->rstat.sack_bytes / stream->sndvar->rstat.sack_cnt);
	} else {
		TRACE_FSTAT("Selective ack count: %u, bytes: %u\n",
			    stream->sndvar->rstat.sack_cnt, stream->sndvar->rstat.sack_bytes);
	}
	if (stream->sndvar->rstat.tdp_sack_cnt) {
		TRACE_FSTAT("Selective tdp ack count: %u, bytes: %u, "
			    "average bytes/ack: %u\n",
			    stream->sndvar->rstat.tdp_sack_cnt, stream->sndvar->rstat.tdp_sack_bytes,
			    stream->sndvar->rstat.tdp_sack_bytes / stream->sndvar->rstat.tdp_sack_cnt);
	} else {
		TRACE_FSTAT("Selective ack count: %u, bytes: %u\n",
			    stream->sndvar->rstat.tdp_sack_cnt, stream->sndvar->rstat.tdp_sack_bytes);
	}
#endif /* TCP_OPT_SACK_ENABLED */
#endif /* RTM_STAT */
#endif

	if (HAS_STREAM_TYPE(stream, MOS_SOCK_STREAM)) {
		/* stand-alone monitor does not need to do these */
		if (stream->is_bound_addr) {
			/* saved now because the stream is freed before the address
			 * is released at the end of this function.
			 * NOTE(review): addr.sin_family is never set; presumably
			 * FreeAddress only compares address and port — confirm. */
			bound_addr = TRUE;
			addr.sin_addr.s_addr = stream->saddr;
			addr.sin_port = stream->sport;
		}

		RemoveFromControlList(mtcp, stream);
		RemoveFromSendList(mtcp, stream);
		RemoveFromACKList(mtcp, stream);

		if (stream->on_rto_idx >= 0)
			RemoveFromRTOList(mtcp, stream);

		SBUF_LOCK_DESTROY(&stream->rcvvar->read_lock);
		SBUF_LOCK_DESTROY(&stream->sndvar->write_lock);

		assert(stream->on_hash_table == TRUE);

		/* free ring buffers */
		if (stream->sndvar->sndbuf) {
			SBFree(mtcp->rbm_snd, stream->sndvar->sndbuf);
			stream->sndvar->sndbuf = NULL;
		}
	}

	if (stream->on_timewait_list)
		RemoveFromTimewaitList(mtcp, stream);

	if (g_config.mos->tcp_timeout > 0)
		RemoveFromTimeoutList(mtcp, stream);

	if (stream->rcvvar->rcvbuf) {
#ifdef NEWRB
		tcprb_del(stream->rcvvar->rcvbuf);
#else
		RBFree(mtcp->rbm_rcv, stream->rcvvar->rcvbuf,
		       stream->buffer_mgmt);
#endif
		stream->rcvvar->rcvbuf = NULL;
	}

	if (flow_lock)
		pthread_mutex_lock(&mtcp->ctx->flow_pool_lock);

	/* remove from flow hash table */
	HTRemove(mtcp->tcp_flow_table, stream);
	stream->on_hash_table = FALSE;

	mtcp->flow_cnt--;

	/* if there was a corresponding monitor stream socket opened
	 * then close it (only once the pair stream is gone, since both
	 * directions share the same monitor sockets) */
	SOCKQ_FOREACH_START(walk, &stream->msocks) {
		SOCKQ_REMOVE(&stream->msocks, walk);
		if (stream->pair_stream == NULL)
			DestroyMonitorStreamSocket(mtcp, walk);
	} SOCKQ_FOREACH_END;

	if (stream->pair_stream != NULL) {
		/* Nullify the sibling tcp_stream's pointer back to this stream */
		stream->pair_stream->pair_stream = NULL;
	}

	MPFreeChunk(mtcp->rv_pool, stream->rcvvar);
	MPFreeChunk(mtcp->sv_pool, stream->sndvar);
	MPFreeChunk(mtcp->flow_pool, stream);

	if (flow_lock)
		/* stand-alone monitor does not need this since it is single-threaded */
		pthread_mutex_unlock(&mtcp->ctx->flow_pool_lock);

	if (bound_addr) {
		/* use the per-core address pool if present, else the global one */
		if (mtcp->ap) {
			ret = FreeAddress(mtcp->ap, &addr);
		} else {
			ret = FreeAddress(ap, &addr);
		}
		if (ret < 0) {
			TRACE_ERROR("(NEVER HAPPEN) Failed to free address.\n");
		}
	}

#ifdef NETSTAT
#if NETSTAT_PERTHREAD
	TRACE_STREAM("Destroyed. Remaining flows: %u\n", mtcp->flow_cnt);
#endif /* NETSTAT_PERTHREAD */
#endif /* NETSTAT */

}
/*---------------------------------------------------------------------------*/
/**
 * Destroy @stream and, if it has one, its pair stream.
 * The pair pointer is read before @stream is freed.
 */
void
DestroyTCPStream(mtcp_manager_t mtcp, tcp_stream *stream)
{
	tcp_stream *pair_stream = stream->pair_stream;

	DestroySingleTCPStream(mtcp, stream);

	if (pair_stream)
		DestroySingleTCPStream(mtcp, pair_stream);
}
/*---------------------------------------------------------------------------*/
/**
 * Print a human-readable dump of @stream's socket, list membership, send
 * variables and receive variables to mtcp->log_fp via thread_printf().
 * The receive-buffer line is not printed in NEWRB builds (not implemented).
 */
void
DumpStream(mtcp_manager_t mtcp, tcp_stream *stream)
{
	uint8_t *sa, *da;
	struct tcp_send_vars *sndvar = stream->sndvar;
	struct tcp_recv_vars *rcvvar = stream->rcvvar;

	sa = (uint8_t *)&stream->saddr;
	da = (uint8_t *)&stream->daddr;
	thread_printf(mtcp, mtcp->log_fp, "========== Stream %u: "
		      "%u.%u.%u.%u(%u) -> %u.%u.%u.%u(%u) ==========\n", stream->id,
		      sa[0], sa[1], sa[2], sa[3], ntohs(stream->sport),
		      da[0], da[1], da[2], da[3], ntohs(stream->dport));
	thread_printf(mtcp, mtcp->log_fp,
		      "Stream id: %u, type: %u, state: %s, close_reason: %s\n",
		      stream->id, stream->stream_type,
		      TCPStateToString(stream), close_reason_str[stream->close_reason]);
	if (stream->socket) {
		socket_map_t socket = stream->socket;
		thread_printf(mtcp, mtcp->log_fp, "Socket id: %d, type: %d, opts: %u\n"
			      "epoll: %u (IN: %u, OUT: %u, ERR: %u, RDHUP: %u, ET: %u)\n"
			      "events: %u (IN: %u, OUT: %u, ERR: %u, RDHUP: %u, ET: %u)\n",
			      socket->id, socket->socktype, socket->opts,
			      socket->epoll, socket->epoll & MOS_EPOLLIN,
			      socket->epoll & MOS_EPOLLOUT, socket->epoll & MOS_EPOLLERR,
			      socket->epoll & MOS_EPOLLRDHUP, socket->epoll & MOS_EPOLLET,
			      socket->events, socket->events & MOS_EPOLLIN,
			      socket->events & MOS_EPOLLOUT, socket->events & MOS_EPOLLERR,
			      socket->events & MOS_EPOLLRDHUP, socket->events & MOS_EPOLLET);
	} else {
		thread_printf(mtcp, mtcp->log_fp, "Socket: (null)\n");
	}

	/* list/queue membership flags */
	thread_printf(mtcp, mtcp->log_fp,
		      "on_hash_table: %u, on_control_list: %u (wait: %u), on_send_list: %u, "
		      "on_ack_list: %u, is_wack: %u, ack_cnt: %u\n"
		      "on_rto_idx: %d, on_timewait_list: %u, on_timeout_list: %u, "
		      "on_rcv_br_list: %u, on_snd_br_list: %u\n"
		      "on_sendq: %u, on_ackq: %u, closed: %u, on_closeq: %u, "
		      "on_closeq_int: %u, on_resetq: %u, on_resetq_int: %u\n"
		      "have_reset: %u, is_fin_sent: %u, is_fin_ackd: %u, "
		      "saw_timestamp: %u, sack_permit: %u, "
		      "is_bound_addr: %u, need_wnd_adv: %u\n", stream->on_hash_table,
		      sndvar->on_control_list, stream->control_list_waiting, sndvar->on_send_list,
		      sndvar->on_ack_list, sndvar->is_wack, sndvar->ack_cnt,
		      stream->on_rto_idx, stream->on_timewait_list, stream->on_timeout_list,
		      stream->on_rcv_br_list, stream->on_snd_br_list,
		      sndvar->on_sendq, sndvar->on_ackq,
		      stream->closed, sndvar->on_closeq, sndvar->on_closeq_int,
		      sndvar->on_resetq, sndvar->on_resetq_int,
		      stream->have_reset, sndvar->is_fin_sent,
		      sndvar->is_fin_ackd, stream->saw_timestamp, stream->sack_permit,
		      stream->is_bound_addr, stream->need_wnd_adv);

	thread_printf(mtcp, mtcp->log_fp, "========== Send variables ==========\n");
	thread_printf(mtcp, mtcp->log_fp,
		      "ip_id: %u, mss: %u, eff_mss: %u, wscale(me, peer): (%u, %u), nif_out: %d\n",
		      sndvar->ip_id, sndvar->mss, sndvar->eff_mss,
		      sndvar->wscale_mine, sndvar->wscale_peer, sndvar->nif_out);
	thread_printf(mtcp, mtcp->log_fp,
		      "snd_nxt: %u, snd_una: %u, iss: %u, fss: %u\nsnd_wnd: %u, "
		      "peer_wnd: %u, cwnd: %u, ssthresh: %u\n",
		      stream->snd_nxt, sndvar->snd_una, sndvar->iss, sndvar->fss,
		      sndvar->snd_wnd, sndvar->peer_wnd, sndvar->cwnd, sndvar->ssthresh);

	if (sndvar->sndbuf) {
		thread_printf(mtcp, mtcp->log_fp,
			      "Send buffer: init_seq: %u, head_seq: %u, "
			      "len: %d, cum_len: %lu, size: %d\n",
			      sndvar->sndbuf->init_seq, sndvar->sndbuf->head_seq,
			      sndvar->sndbuf->len, sndvar->sndbuf->cum_len, sndvar->sndbuf->size);
	} else {
		thread_printf(mtcp, mtcp->log_fp, "Send buffer: (null)\n");
	}
	thread_printf(mtcp, mtcp->log_fp,
		      "nrtx: %u, max_nrtx: %u, rto: %u, ts_rto: %u, "
		      "ts_lastack_sent: %u\n", sndvar->nrtx, sndvar->max_nrtx,
		      sndvar->rto, sndvar->ts_rto, sndvar->ts_lastack_sent);

	thread_printf(mtcp, mtcp->log_fp,
		      "========== Receive variables ==========\n");
	thread_printf(mtcp, mtcp->log_fp,
		      "rcv_nxt: %u, irs: %u, rcv_wnd: %u, "
		      "snd_wl1: %u, snd_wl2: %u\n",
		      stream->rcv_nxt, rcvvar->irs,
		      rcvvar->rcv_wnd, rcvvar->snd_wl1, rcvvar->snd_wl2);
	if (rcvvar->rcvbuf) {
#ifdef NEWRB
		/* TODO: Implement this */
#else
		thread_printf(mtcp, mtcp->log_fp,
			      "Receive buffer: init_seq: %u, head_seq: %u, "
			      "merged_len: %d, cum_len: %lu, last_len: %d, size: %d\n",
			      rcvvar->rcvbuf->init_seq, rcvvar->rcvbuf->head_seq,
			      rcvvar->rcvbuf->merged_len, rcvvar->rcvbuf->cum_len,
			      rcvvar->rcvbuf->last_len, rcvvar->rcvbuf->size);
#endif
	} else {
		thread_printf(mtcp, mtcp->log_fp, "Receive buffer: (null)\n");
	}
	thread_printf(mtcp, mtcp->log_fp, "last_ack_seq: %u, dup_acks: %u\n",
		      rcvvar->last_ack_seq, rcvvar->dup_acks);
	thread_printf(mtcp, mtcp->log_fp,
		      "ts_recent: %u, ts_lastack_rcvd: %u, ts_last_ts_upd: %u, "
		      "ts_tw_expire: %u\n", rcvvar->ts_recent, rcvvar->ts_lastack_rcvd,
		      rcvvar->ts_last_ts_upd, rcvvar->ts_tw_expire);
	thread_printf(mtcp, mtcp->log_fp,
		      "srtt: %u, mdev: %u, mdev_max: %u, rttvar: %u, rtt_seq: %u\n",
		      rcvvar->srtt, rcvvar->mdev, rcvvar->mdev_max,
		      rcvvar->rttvar, rcvvar->rtt_seq);
}
/*---------------------------------------------------------------------------*/