1 #include "debug.h"
2 #include <string.h>
3
4 #include "config.h"
5 #include "tcp_stream.h"
6 #include "fhash.h"
7 #include "tcp.h"
8 #include "tcp_in.h"
9 #include "tcp_out.h"
10 #include "tcp_ring_buffer.h"
11 #include "tcp_send_buffer.h"
12 #include "eventpoll.h"
13 #include "ip_out.h"
14 #include "timer.h"
15 #include "tcp_rb.h"
16 /*---------------------------------------------------------------------------*/
/*
 * Printable names of the TCP states. The table is indexed directly by a
 * stream's state value (see TCPStateToString()), so the order of the
 * entries must match the numeric values of the TCP state constants.
 */
char *state_str[] = {
	"TCP_ST_CLOSED",
	"TCP_ST_LISTEN",
	"TCP_ST_SYN_SENT",
	"TCP_ST_SYN_RCVD",
	"TCP_ST_ESTABLISHED",
	"TCP_ST_FIN_WAIT_1",
	"TCP_ST_FIN_WAIT_2",
	"TCP_ST_CLOSE_WAIT",
	"TCP_ST_CLOSING",
	"TCP_ST_LAST_ACK",
	"TCP_ST_TIME_WAIT",
	"TCP_ST_CLOSED_RSVD"
};
31 /*---------------------------------------------------------------------------*/
/*
 * Printable names of the stream close reasons. Indexed directly by
 * stream->close_reason by the debug/dump code in this file, so the entry
 * order must follow the numeric close-reason values.
 */
char *close_reason_str[] = {
	"NOT_CLOSED", "CLOSE",     "CLOSED",
	"CONN_FAIL",  "CONN_LOST", "RESET",
	"NO_MEM",     "DENIED",    "TIMEDOUT"
};
43 /*---------------------------------------------------------------------------*/
/*
 * Per-thread state of the linear congruential generator that picks initial
 * send sequence numbers (see CreateTCPStream()).
 */
static __thread unsigned long next = 1;

/*
 * Sample rand() implementation from the POSIX.1-2001 standard.
 * Produces values in [0, 32767] (i.e. RAND_MAX is taken to be 32767).
 * The output is only 15 bits wide and fully predictable from the seed.
 */
static int
posix_seq_rand(void)
{
	const unsigned long multiplier = 1103515245UL;
	const unsigned long increment = 12345UL;

	next = next * multiplier + increment;
	return (int)((unsigned)(next / 65536) % 32768);
}
/*---------------------------------------------------------------------------*/
/*
 * Seed the calling thread's generator. Only seed % 32768 is retained.
 */
void
posix_seq_srand(unsigned seed)
{
	next = (unsigned long)(seed % 32768u);
}
57 /*---------------------------------------------------------------------------*/
58 /**
59 * FYI: This is NOT a read-only return!
60 */
61 int
GetFragInfo(socket_map_t sock,int side,void * optval,socklen_t * len)62 GetFragInfo(socket_map_t sock, int side, void *optval, socklen_t *len)
63 {
64 struct tcp_stream *stream;
65
66 stream = NULL;
67 if (!*len || ( *len % sizeof(tcpfrag_t) != 0))
68 goto frag_info_error;
69
70 if (side != MOS_SIDE_CLI && side != MOS_SIDE_SVR) {
71 TRACE_ERROR("Invalid side requested!\n");
72 exit(EXIT_FAILURE);
73 return -1;
74 }
75
76 struct tcp_stream *mstrm = sock->monitor_stream->stream;
77 stream = (side == mstrm->side) ? mstrm : mstrm->pair_stream;
78
79 if (stream == NULL) goto frag_info_error;
80
81 /* First check if the tcp ring buffer even has anything */
82 if (stream->rcvvar != NULL &&
83 stream->rcvvar->rcvbuf != NULL) {
84 tcprb_t *rcvbuf = stream->rcvvar->rcvbuf;
85 struct tcp_ring_fragment *out = (struct tcp_ring_fragment *)optval;
86 int const maxout = *len;
87 *len = 0;
88 struct _tcpfrag_t *walk;
89 TAILQ_FOREACH(walk, &rcvbuf->frags, link) {
90 if (*len == maxout)
91 break;
92 out[*len].offset = walk->head;
93 out[*len].len = walk->tail - walk->head;
94 (*len)++;
95 }
96 if (*len != maxout) {
97 /* set zero sentinel */
98 out[*len].offset = 0;
99 out[*len].len = 0;
100 }
101 } else
102 goto frag_info_error;
103
104 return 0;
105
106 frag_info_error:
107 optval = NULL;
108 *len = 0;
109 return -1;
110 }
111 /*---------------------------------------------------------------------------*/
112 /**
113 * Comments later...
114 */
115 int
GetBufInfo(socket_map_t sock,int side,void * optval,socklen_t * len)116 GetBufInfo(socket_map_t sock, int side, void *optval, socklen_t *len)
117 {
118 struct tcp_stream *stream;
119 struct tcp_buf_info *tbi;
120
121 tbi = (struct tcp_buf_info *)optval;
122 memset(tbi, 0, sizeof(struct tcp_buf_info));
123 stream = NULL;
124
125 if (*len != sizeof(struct tcp_buf_info)) {
126 errno = EINVAL;
127 goto buf_info_error;
128 }
129
130 if (side != MOS_SIDE_CLI && side != MOS_SIDE_SVR) {
131 TRACE_ERROR("Invalid side requested!\n");
132 errno = EINVAL;
133 goto buf_info_error;
134 }
135
136 struct tcp_stream *mstrm = sock->monitor_stream->stream;
137 stream = (side == mstrm->side) ? mstrm : mstrm->pair_stream;
138
139 /* First check if the tcp ring buffer even has anything */
140 if (stream != NULL &&
141 stream->rcvvar != NULL &&
142 stream->rcvvar->rcvbuf != NULL) {
143 tcprb_t *rcvbuf = stream->rcvvar->rcvbuf;
144 tcpfrag_t *f = TAILQ_LAST(&rcvbuf->frags, flist);
145 tbi->tcpbi_init_seq = stream->rcvvar->irs + 1;
146 tbi->tcpbi_last_byte_read = rcvbuf->pile;
147 tbi->tcpbi_next_byte_expected = rcvbuf->pile + tcprb_cflen(rcvbuf);
148 tbi->tcpbi_last_byte_received = (f ? f->tail : rcvbuf->head);
149 } else {
150 errno = ENODATA;
151 goto buf_info_error;
152 }
153
154 return 0;
155
156 buf_info_error:
157 optval = NULL;
158 *len = 0;
159 return -1;
160 }
161 /*---------------------------------------------------------------------------*/
162 int
DisableBuf(socket_map_t sock,int side)163 DisableBuf(socket_map_t sock, int side)
164 {
165 #ifdef DBGMSG
166 __PREPARE_DBGLOGGING();
167 #endif
168 struct tcp_stream *stream;
169 int rc = 0;
170
171 switch (sock->socktype) {
172 case MOS_SOCK_MONITOR_STREAM:
173 if (side == MOS_SIDE_CLI)
174 sock->monitor_listener->client_buf_mgmt = 0;
175 else if (side == MOS_SIDE_SVR)
176 sock->monitor_listener->server_buf_mgmt = 0;
177 else {
178 assert(0);
179 TRACE_DBG("Invalid side!\n");
180 rc = -1;
181 }
182 break;
183 case MOS_SOCK_MONITOR_STREAM_ACTIVE:
184 stream = sock->monitor_stream->stream;
185 if (stream->side != side)
186 stream = stream->pair_stream;
187 assert(stream->side == side);
188 stream->buffer_mgmt = 0;
189 break;
190 default:
191 assert(0);
192 TRACE_DBG("Can't disable buf for invalid socket!\n");
193 rc = -1;
194 }
195
196 return rc;
197 }
198 /*---------------------------------------------------------------------------*/
199 int
GetLastTimestamp(struct tcp_stream * stream,uint32_t * usecs,socklen_t * len)200 GetLastTimestamp(struct tcp_stream *stream, uint32_t *usecs, socklen_t *len)
201 {
202 #ifdef DBGMSG
203 __PREPARE_DBGLOGGING();
204 #endif
205 if (*len < sizeof(uint32_t)) {
206 TRACE_DBG("Size passed is not >= sizeof(uint32_t)!\n");
207 return -1;
208 }
209
210 *usecs = (stream->last_active_ts >
211 stream->pair_stream->last_active_ts)
212 ?
213 TS_TO_USEC(stream->last_active_ts) :
214 TS_TO_USEC(stream->pair_stream->last_active_ts);
215
216 return 0;
217 }
218 /*---------------------------------------------------------------------------*/
219 inline int
GetTCPState(struct tcp_stream * stream,int side,void * optval,socklen_t * optlen)220 GetTCPState(struct tcp_stream *stream, int side,
221 void *optval, socklen_t *optlen)
222 {
223 if (!stream || !(stream = (side == stream->side) ? stream : stream->pair_stream))
224 return -1;
225 *(int *)optval = (int)((stream->state == TCP_ST_CLOSED_RSVD) ?
226 TCP_ST_CLOSED : stream->state);
227 return 0;
228 }
229 /*---------------------------------------------------------------------------*/
230 inline char *
TCPStateToString(const tcp_stream * stream)231 TCPStateToString(const tcp_stream *stream)
232 {
233 return (stream) ? state_str[stream->state] : NULL;
234 }
235 /*---------------------------------------------------------------------------*/
/*
 * Signal that @stream has data to read.
 *
 * - MOS_SOCK_STREAM streams: queue MOS_EPOLLIN on mtcp->ep if the stream
 *   has a socket that registered interest in MOS_EPOLLIN.
 * - Other streams: if the receive ring buffer exists and
 *   tcprb_cflen(rcvbuf) > 0, append an MOS_EPOLLIN event (data.ptr = @stream)
 *   to the event queue of each monitor socket's listener in stream->msocks,
 *   and mark the stream with MOS_ACT_READ_DATA.
 *
 * NOTE(review): MOS_ACT_READ_DATA is a per-stream flag that is tested inside
 * the per-socket loop, so after the first listener queue receives the event
 * the function returns; with several monitors on the same flow only one
 * queue is served per call. Confirm this is intended.
 */
inline void
RaiseReadEvent(mtcp_manager_t mtcp, tcp_stream *stream)
{
	struct tcp_recv_vars *rcvvar;

	rcvvar = stream->rcvvar;

	if (HAS_STREAM_TYPE(stream, MOS_SOCK_STREAM)) {
		if (stream->socket && (stream->socket->epoll & MOS_EPOLLIN))
			AddEpollEvent(mtcp->ep, MOS_EVENT_QUEUE, stream->socket, MOS_EPOLLIN);
	} else if (rcvvar->rcvbuf && tcprb_cflen(rcvvar->rcvbuf) > 0) {
		/*
		 * Monitoring socket: queue the read event in the listener's
		 * event_queue, but only if this tcp_stream hasn't already been
		 * registered there (MOS_ACT_READ_DATA not yet set).
		 */
		int index;
		struct event_queue *eq;
		struct socket_map *walk;

		SOCKQ_FOREACH_START(walk, &stream->msocks) {
			assert(walk->socktype == MOS_SOCK_MONITOR_STREAM_ACTIVE);
			eq = walk->monitor_stream->monitor_listener->eq;

			/* if it already has read data register... then skip this step */
			if (stream->actions & MOS_ACT_READ_DATA)
				return;
			/* queue full: drop the event and report it */
			if (eq->num_events >= eq->size) {
				TRACE_ERROR("Exceeded epoll event queue! num_events: %d, "
						"size: %d\n", eq->num_events, eq->size);
				return;
			}

			/* append at eq->end, wrapping the ring index at eq->size */
			index = eq->end++;
			eq->events[index].ev.events = MOS_EPOLLIN;
			eq->events[index].ev.data.ptr = (void *)stream;

			if (eq->end >= eq->size) {
				eq->end = 0;
			}
			eq->num_events++;
			stream->actions |= MOS_ACT_READ_DATA;
		} SOCKQ_FOREACH_END;
	} else {
		/* Also reached by non-MOS_SOCK_STREAM streams whose receive
		 * buffer is missing or has nothing to report. */
		TRACE_EPOLL("Stream %d: Raising read without a socket!\n", stream->id);
	}
}
283 /*---------------------------------------------------------------------------*/
284 inline void
RaiseWriteEvent(mtcp_manager_t mtcp,tcp_stream * stream)285 RaiseWriteEvent(mtcp_manager_t mtcp, tcp_stream *stream)
286 {
287 if (stream->socket) {
288 if (stream->socket->epoll & MOS_EPOLLOUT) {
289 AddEpollEvent(mtcp->ep,
290 MOS_EVENT_QUEUE, stream->socket, MOS_EPOLLOUT);
291 }
292 } else {
293 TRACE_EPOLL("Stream %d: Raising write without a socket!\n", stream->id);
294 }
295 }
296 /*---------------------------------------------------------------------------*/
297 inline void
RaiseCloseEvent(mtcp_manager_t mtcp,tcp_stream * stream)298 RaiseCloseEvent(mtcp_manager_t mtcp, tcp_stream *stream)
299 {
300 if (stream->socket) {
301 if (stream->socket->epoll & MOS_EPOLLRDHUP) {
302 AddEpollEvent(mtcp->ep,
303 MOS_EVENT_QUEUE, stream->socket, MOS_EPOLLRDHUP);
304 } else if (stream->socket->epoll & MOS_EPOLLIN) {
305 AddEpollEvent(mtcp->ep,
306 MOS_EVENT_QUEUE, stream->socket, MOS_EPOLLIN);
307 }
308 } else {
309 TRACE_EPOLL("Stream %d: Raising close without a socket!\n", stream->id);
310 }
311 }
312 /*---------------------------------------------------------------------------*/
313 inline int
RaiseErrorEvent(mtcp_manager_t mtcp,tcp_stream * stream)314 RaiseErrorEvent(mtcp_manager_t mtcp, tcp_stream *stream)
315 {
316 if (stream->socket) {
317 if (stream->socket->epoll & MOS_EPOLLERR) {
318 /* passing closing reason for error notification */
319 return AddEpollEvent(mtcp->ep,
320 MOS_EVENT_QUEUE, stream->socket, MOS_EPOLLERR);
321 }
322 } else {
323 TRACE_EPOLL("Stream %d: Raising error without a socket!\n", stream->id);
324 }
325 return -1;
326 }
327 /*----------------------------------------------------------------------------*/
328 int
AddMonitorStreamSockets(mtcp_manager_t mtcp,struct tcp_stream * stream)329 AddMonitorStreamSockets(mtcp_manager_t mtcp, struct tcp_stream *stream)
330 {
331 struct mtcp_context mctx;
332 int socktype;
333
334 mctx.cpu = mtcp->ctx->cpu;
335 struct mon_listener *walk;
336
337 // traverse the passive socket's list
338 TAILQ_FOREACH(walk, &mtcp->monitors, link) {
339 socktype = walk->socket->socktype;
340
341 if (socktype != MOS_SOCK_MONITOR_STREAM)
342 continue;
343
344 /* mtcp_bind_monitor_filter()
345 * - create an monitor active socket only for the filter-passed flows
346 * - we use the result (= tag) from DetectStreamType() to avoid
347 * evaluating the same BPF filter twice */
348 if (!walk->is_stream_syn_filter_hit) {
349 continue;
350 }
351
352 struct socket_map *s =
353 AllocateSocket(&mctx, MOS_SOCK_MONITOR_STREAM_ACTIVE);
354 if (!s)
355 return -1;
356
357 s->monitor_stream->socket = s;
358 s->monitor_stream->stream = stream;
359 s->monitor_stream->monitor_listener = walk;
360 s->monitor_stream->client_buf_mgmt = walk->client_buf_mgmt;
361 s->monitor_stream->server_buf_mgmt = walk->server_buf_mgmt;
362 s->monitor_stream->client_mon = walk->client_mon;
363 s->monitor_stream->server_mon = walk->server_mon;
364 #ifdef NEWEV
365 s->monitor_stream->stree_dontcare =
366 s->monitor_stream->monitor_listener->stree_dontcare;
367 s->monitor_stream->stree_pre_rcv =
368 s->monitor_stream->monitor_listener->stree_pre_rcv;
369 s->monitor_stream->stree_post_snd =
370 s->monitor_stream->monitor_listener->stree_post_snd;
371 if (s->monitor_stream->stree_dontcare)
372 stree_inc_ref(s->monitor_stream->stree_dontcare);
373 if (s->monitor_stream->stree_pre_rcv)
374 stree_inc_ref(s->monitor_stream->stree_pre_rcv);
375 if (s->monitor_stream->stree_post_snd)
376 stree_inc_ref(s->monitor_stream->stree_post_snd);
377 #else
378 InitEvP(&s->monitor_stream->dontcare_evp,
379 &walk->dontcare_evb);
380 InitEvP(&s->monitor_stream->pre_tcp_evp,
381 &walk->pre_tcp_evb);
382 InitEvP(&s->monitor_stream->post_tcp_evp,
383 &walk->post_tcp_evb);
384 #endif
385
386 SOCKQ_INSERT_TAIL(&stream->msocks, s);
387 }
388
389 return 0;
390 }
391 /*----------------------------------------------------------------------------*/
392 int
DestroyMonitorStreamSocket(mtcp_manager_t mtcp,socket_map_t msock)393 DestroyMonitorStreamSocket(mtcp_manager_t mtcp, socket_map_t msock)
394 {
395 struct mtcp_context mctx;
396 int socktype, sockid, rc;
397
398 if (msock == NULL) {
399 TRACE_DBG("Stream socket does not exist!\n");
400 /* exit(-1); */
401 return 0;
402 }
403
404 rc = 0;
405 mctx.cpu = mtcp->ctx->cpu;
406 socktype = msock->socktype;
407 sockid = msock->id;
408
409 switch (socktype) {
410 case MOS_SOCK_MONITOR_STREAM_ACTIVE:
411 FreeSocket(&mctx, sockid, socktype);
412 break;
413 case MOS_SOCK_MONITOR_RAW:
414 /* do nothing since all raw sockets point to the same socket */
415 break;
416 default:
417 TRACE_DBG("Trying to destroy a monitor socket for an unsupported type!\n");
418 rc = -1;
419 /* exit(-1); */
420 break;
421 }
422
423 return rc;
424 }
425 /*---------------------------------------------------------------------------*/
426 tcp_stream *
CreateTCPStream(mtcp_manager_t mtcp,socket_map_t socket,int type,uint32_t saddr,uint16_t sport,uint32_t daddr,uint16_t dport,unsigned int * hash)427 CreateTCPStream(mtcp_manager_t mtcp, socket_map_t socket, int type,
428 uint32_t saddr, uint16_t sport, uint32_t daddr, uint16_t dport,
429 unsigned int *hash)
430 {
431 tcp_stream *stream = NULL;
432 int ret;
433 /* stand-alone monitor does not need this since it is single-threaded */
434 bool flow_lock = type & STREAM_TYPE(MOS_SOCK_STREAM);
435 //bool flow_lock = false;
436
437 if (flow_lock)
438 pthread_mutex_lock(&mtcp->ctx->flow_pool_lock);
439
440 stream = (tcp_stream *)MPAllocateChunk(mtcp->flow_pool);
441 if (!stream) {
442 TRACE_ERROR("Cannot allocate memory for the stream. "
443 "g_config.mos->max_concurrency: %d, concurrent: %u\n",
444 g_config.mos->max_concurrency, mtcp->flow_cnt);
445 if (flow_lock)
446 pthread_mutex_unlock(&mtcp->ctx->flow_pool_lock);
447 return NULL;
448 }
449 memset(stream, 0, sizeof(tcp_stream));
450
451 stream->rcvvar = (struct tcp_recv_vars *)MPAllocateChunk(mtcp->rv_pool);
452 if (!stream->rcvvar) {
453 MPFreeChunk(mtcp->flow_pool, stream);
454 if (flow_lock)
455 pthread_mutex_unlock(&mtcp->ctx->flow_pool_lock);
456 return NULL;
457 }
458 memset(stream->rcvvar, 0, sizeof(struct tcp_recv_vars));
459
460 /* stand-alone monitor does not need to do this */
461 stream->sndvar = (struct tcp_send_vars *)MPAllocateChunk(mtcp->sv_pool);
462 if (!stream->sndvar) {
463 MPFreeChunk(mtcp->rv_pool, stream->rcvvar);
464 MPFreeChunk(mtcp->flow_pool, stream);
465 if (flow_lock)
466 pthread_mutex_unlock(&mtcp->ctx->flow_pool_lock);
467 return NULL;
468 }
469 //if (HAS_STREAM_TYPE(stream, MOS_SOCK_STREAM))
470 memset(stream->sndvar, 0, sizeof(struct tcp_send_vars));
471
472 stream->id = mtcp->g_id++;
473 stream->saddr = saddr;
474 stream->sport = sport;
475 stream->daddr = daddr;
476 stream->dport = dport;
477
478 ret = HTInsert(mtcp->tcp_flow_table, stream, hash);
479 if (ret < 0) {
480 TRACE_ERROR("Stream %d: "
481 "Failed to insert the stream into hash table.\n", stream->id);
482 MPFreeChunk(mtcp->flow_pool, stream);
483 if (flow_lock)
484 pthread_mutex_unlock(&mtcp->ctx->flow_pool_lock);
485 return NULL;
486 }
487 stream->on_hash_table = TRUE;
488 mtcp->flow_cnt++;
489
490 SOCKQ_INIT(&stream->msocks);
491
492 /*
493 * if an embedded monitor is attached...
494 * create monitor stream socket now!
495 * If socket type is raw.. then don't create it
496 */
497 if ((mtcp->num_msp > 0) &&
498 (type & STREAM_TYPE(MOS_SOCK_MONITOR_STREAM_ACTIVE)))
499 if (AddMonitorStreamSockets(mtcp, stream) < 0)
500 TRACE_DBG("Could not create monitor stream socket!\n");
501
502 if (flow_lock)
503 pthread_mutex_unlock(&mtcp->ctx->flow_pool_lock);
504
505 if (socket) {
506 stream->socket = socket;
507 socket->stream = stream;
508 }
509
510 stream->stream_type = type;
511 stream->state = TCP_ST_LISTEN;
512 /* This is handled by core.c, tcp_in.c & tcp_out.c */
513 /* stream->cb_events |= MOS_ON_TCP_STATE_CHANGE; */
514
515 stream->on_rto_idx = -1;
516
517 /* stand-alone monitor does not need to do this */
518 stream->sndvar->mss = TCP_DEFAULT_MSS;
519 stream->sndvar->wscale_mine = TCP_DEFAULT_WSCALE;
520 stream->sndvar->wscale_peer = 0;
521
522 if (HAS_STREAM_TYPE(stream, MOS_SOCK_STREAM)) {
523 stream->sndvar->ip_id = 0;
524 stream->sndvar->nif_out = GetOutputInterface(stream->daddr);
525
526 stream->sndvar->iss = posix_seq_rand() % TCP_MAX_SEQ;
527 //stream->sndvar->iss = 0;
528 stream->snd_nxt = stream->sndvar->iss;
529 stream->sndvar->snd_una = stream->sndvar->iss;
530 stream->sndvar->snd_wnd = g_config.mos->wmem_size;
531 stream->sndvar->rto = TCP_INITIAL_RTO;
532 #if USE_SPIN_LOCK
533 if (pthread_spin_init(&stream->sndvar->write_lock, PTHREAD_PROCESS_PRIVATE)) {
534 perror("pthread_spin_init of write_lock");
535 pthread_spin_destroy(&stream->rcvvar->read_lock);
536 #else
537 if (pthread_mutex_init(&stream->sndvar->write_lock, NULL)) {
538 perror("pthread_mutex_init of write_lock");
539 pthread_mutex_destroy(&stream->rcvvar->read_lock);
540 #endif
541 return NULL;
542 }
543 }
544 stream->rcvvar->irs = 0;
545
546 stream->rcv_nxt = 0;
547 stream->rcvvar->rcv_wnd = TCP_INITIAL_WINDOW;
548
549 stream->rcvvar->snd_wl1 = stream->rcvvar->irs - 1;
550
551 stream->buffer_mgmt = BUFMGMT_FULL;
552
553 /* needs state update by default */
554 stream->status_mgmt = 1;
555
556 #if USE_SPIN_LOCK
557 if (pthread_spin_init(&stream->rcvvar->read_lock, PTHREAD_PROCESS_PRIVATE)) {
558 #else
559 if (pthread_mutex_init(&stream->rcvvar->read_lock, NULL)) {
560 #endif
561 perror("pthread_mutex_init of read_lock");
562 return NULL;
563 }
564
565 #ifdef STREAM
566 uint8_t *sa;
567 uint8_t *da;
568
569 sa = (uint8_t *)&stream->saddr;
570 da = (uint8_t *)&stream->daddr;
571 TRACE_STREAM("CREATED NEW TCP STREAM %d: "
572 "%u.%u.%u.%u(%d) -> %u.%u.%u.%u(%d) (ISS: %u)\n", stream->id,
573 sa[0], sa[1], sa[2], sa[3], ntohs(stream->sport),
574 da[0], da[1], da[2], da[3], ntohs(stream->dport),
575 stream->sndvar->iss);
576 #endif
577
578 return stream;
579 }
580 /*----------------------------------------------------------------------------*/
581 inline tcp_stream *
582 CreateDualTCPStream(mtcp_manager_t mtcp, socket_map_t socket, int type, uint32_t saddr,
583 uint16_t sport, uint32_t daddr, uint16_t dport, unsigned int *hash)
584 {
585 tcp_stream *cur_stream, *paired_stream;
586 struct socket_map *walk;
587
588 cur_stream = CreateTCPStream(mtcp, socket, type,
589 saddr, sport, daddr, dport, hash);
590 if (cur_stream == NULL) {
591 TRACE_ERROR("Can't create tcp_stream!\n");
592 return NULL;
593 }
594
595 paired_stream = CreateTCPStream(mtcp, NULL, MOS_SOCK_UNUSED,
596 daddr, dport, saddr, sport, hash);
597 if (paired_stream == NULL) {
598 DestroyTCPStream(mtcp, cur_stream);
599 TRACE_ERROR("Can't create tcp_stream!\n");
600 return NULL;
601 }
602
603 cur_stream->pair_stream = paired_stream;
604 paired_stream->pair_stream = cur_stream;
605 paired_stream->socket = socket;
606 SOCKQ_FOREACH_START(walk, &cur_stream->msocks) {
607 SOCKQ_INSERT_TAIL(&paired_stream->msocks, walk);
608 } SOCKQ_FOREACH_END;
609 paired_stream->stream_type = STREAM_TYPE(MOS_SOCK_MONITOR_STREAM_ACTIVE);
610
611 return cur_stream;
612 }
613 /*----------------------------------------------------------------------------*/
614 inline tcp_stream *
615 CreateClientTCPStream(mtcp_manager_t mtcp, socket_map_t socket, int type,
616 uint32_t saddr, uint16_t sport, uint32_t daddr, uint16_t dport,
617 unsigned int *hash)
618 {
619 tcp_stream *cs;
620 struct socket_map *w;
621
622 cs = CreateTCPStream(mtcp, socket, type, daddr, dport, saddr, sport, hash);
623 if (cs == NULL) {
624 TRACE_ERROR("Can't create tcp_stream!\n");
625 return NULL;
626 }
627
628 cs->side = MOS_SIDE_CLI;
629 cs->pair_stream = NULL;
630
631 /* if buffer management is off, then disable
632 * monitoring tcp ring of either streams (only if stream
633 * is just monitor stream active)
634 */
635 if (IS_STREAM_TYPE(cs, MOS_SOCK_MONITOR_STREAM_ACTIVE)) {
636 cs->buffer_mgmt = BUFMGMT_OFF;
637 SOCKQ_FOREACH_START(w, &cs->msocks) {
638 uint8_t bm = w->monitor_stream->client_buf_mgmt;
639 if (bm > cs->buffer_mgmt)
640 cs->buffer_mgmt = bm;
641 if (w->monitor_stream->monitor_listener->client_mon == 1)
642 cs->status_mgmt = 1;
643 } SOCKQ_FOREACH_END;
644 }
645
646 return cs;
647 }
648 /*----------------------------------------------------------------------------*/
649 inline tcp_stream *
650 AttachServerTCPStream(mtcp_manager_t mtcp, tcp_stream *cs, int type,
651 uint32_t saddr, uint16_t sport, uint32_t daddr, uint16_t dport)
652 {
653 tcp_stream *ss;
654 struct socket_map *w;
655
656 /* The 3rd arg is a temp hackk... FIXIT! TODO: XXX */
657 ss = CreateTCPStream(mtcp, NULL, MOS_SOCK_UNUSED, saddr, sport, daddr, dport, NULL);
658 if (ss == NULL) {
659 TRACE_ERROR("Can't create tcp_stream!\n");
660 return NULL;
661 }
662
663 ss->side = MOS_SIDE_SVR;
664 cs->pair_stream = ss;
665 ss->pair_stream = cs;
666 ss->socket = cs->socket;
667 SOCKQ_FOREACH_START(w, &cs->msocks) {
668 SOCKQ_INSERT_TAIL(&ss->msocks, w);
669 } SOCKQ_FOREACH_END;
670 ss->stream_type = STREAM_TYPE(MOS_SOCK_MONITOR_STREAM_ACTIVE);
671
672 if (IS_STREAM_TYPE(ss, MOS_SOCK_MONITOR_STREAM_ACTIVE)) {
673 ss->buffer_mgmt = BUFMGMT_OFF;
674 SOCKQ_FOREACH_START(w, &ss->msocks) {
675 uint8_t bm = w->monitor_stream->server_buf_mgmt;
676 if (bm > ss->buffer_mgmt)
677 ss->buffer_mgmt = bm;
678 if (w->monitor_stream->monitor_listener->server_mon == 1)
679 ss->status_mgmt = 1;
680 } SOCKQ_FOREACH_END;
681 }
682
683 return ss;
684 }
685 /*---------------------------------------------------------------------------*/
/*
 * Tear down one tcp_stream (one direction of a connection).
 *
 * Steps, in order:
 *  1. Mark the stream TCP_ST_CLOSED_RSVD.
 *  2. Fire MOS_ON_CONN_END | MOS_ON_TCP_STATE_CHANGE (plus any pending
 *     cb_events) on both MOS_HK_RCV and MOS_HK_SND for each monitor socket.
 *  3. For MOS_SOCK_STREAM streams only: remember a bound local address;
 *     unlink the stream from the control/send/ACK/RTO lists; destroy the
 *     buffer locks; free the send buffer.
 *  4. Unlink the stream from the timewait/timeout lists and free the
 *     receive ring buffer.
 *  5. Under the flow pool lock (MOS_SOCK_STREAM only): remove the stream
 *     from the flow table, detach its monitor sockets, unlink it from its
 *     pair, and return its pool chunks.
 *  6. Release the bound address, if any.
 *
 * NOTE(review): the process exits if the output interface for a bound
 * address cannot be determined.
 */
static void
DestroySingleTCPStream(mtcp_manager_t mtcp, tcp_stream *stream)
{
	struct sockaddr_in addr;
	int bound_addr = FALSE;
	int ret;
	/* stand-alone monitor does not need this since it is single-threaded */
	bool flow_lock = HAS_STREAM_TYPE(stream, MOS_SOCK_STREAM);

	struct socket_map *walk;

	/* Set the stream state as CLOSED */
	stream->state = TCP_ST_CLOSED_RSVD;

	/* Give every attached monitor a final connection-end notification. */
	SOCKQ_FOREACH_START(walk, &stream->msocks) {
		HandleCallback(mtcp, MOS_HK_RCV, walk, stream->side, NULL,
			       MOS_ON_CONN_END | MOS_ON_TCP_STATE_CHANGE | stream->cb_events);
		HandleCallback(mtcp, MOS_HK_SND, walk, stream->side, NULL,
			       MOS_ON_CONN_END | MOS_ON_TCP_STATE_CHANGE | stream->cb_events);
	} SOCKQ_FOREACH_END;

	/* Disabled diagnostics (stream dump and statistics), kept for reference. */
#if 0
#ifdef DUMP_STREAM
	if (stream->close_reason != TCP_ACTIVE_CLOSE &&
			stream->close_reason != TCP_PASSIVE_CLOSE) {
		thread_printf(mtcp, mtcp->log_fp,
				"Stream %d abnormally closed.\n", stream->id);
		DumpStream(mtcp, stream);
		DumpControlList(mtcp, mtcp->n_sender[0]);
	}
#endif

#ifdef STREAM
	uint8_t *sa, *da;
	sa = (uint8_t *)&stream->saddr;
	da = (uint8_t *)&stream->daddr;
	TRACE_STREAM("DESTROY TCP STREAM %d: "
			"%u.%u.%u.%u(%d) -> %u.%u.%u.%u(%d) (%s)\n", stream->id,
			sa[0], sa[1], sa[2], sa[3], ntohs(stream->sport),
			da[0], da[1], da[2], da[3], ntohs(stream->dport),
			close_reason_str[stream->close_reason]);
#endif

	if (stream->sndvar->sndbuf) {
		TRACE_FSTAT("Stream %d: send buffer "
				"cum_len: %lu, len: %u\n", stream->id,
				stream->sndvar->sndbuf->cum_len,
				stream->sndvar->sndbuf->len);
	}
	if (stream->rcvvar->rcvbuf) {
		TRACE_FSTAT("Stream %d: recv buffer "
				"cum_len: %lu, merged_len: %u, last_len: %u\n", stream->id,
				stream->rcvvar->rcvbuf->cum_len,
				stream->rcvvar->rcvbuf->merged_len,
				stream->rcvvar->rcvbuf->last_len);
	}

#if RTM_STAT
	/* Triple duplicated ack stats */
	if (stream->sndvar->rstat.tdp_ack_cnt) {
		TRACE_FSTAT("Stream %d: triple duplicated ack: %u, "
				"retransmission bytes: %u, average rtm bytes/ack: %u\n",
				stream->id,
				stream->sndvar->rstat.tdp_ack_cnt, stream->sndvar->rstat.tdp_ack_bytes,
				stream->sndvar->rstat.tdp_ack_bytes / stream->sndvar->rstat.tdp_ack_cnt);
	}

	/* Retransmission timeout stats */
	if (stream->sndvar->rstat.rto_cnt > 0) {
		TRACE_FSTAT("Stream %d: timeout count: %u, bytes: %u\n", stream->id,
				stream->sndvar->rstat.rto_cnt, stream->sndvar->rstat.rto_bytes);
	}

	/* Recovery stats */
	if (stream->sndvar->rstat.ack_upd_cnt) {
		TRACE_FSTAT("Stream %d: snd_nxt update count: %u, "
				"snd_nxt update bytes: %u, average update bytes/update: %u\n",
				stream->id,
				stream->sndvar->rstat.ack_upd_cnt, stream->sndvar->rstat.ack_upd_bytes,
				stream->sndvar->rstat.ack_upd_bytes / stream->sndvar->rstat.ack_upd_cnt);
	}
#if TCP_OPT_SACK_ENABLED
	if (stream->sndvar->rstat.sack_cnt) {
		TRACE_FSTAT("Selective ack count: %u, bytes: %u, "
				"average bytes/ack: %u\n",
				stream->sndvar->rstat.sack_cnt, stream->sndvar->rstat.sack_bytes,
				stream->sndvar->rstat.sack_bytes / stream->sndvar->rstat.sack_cnt);
	} else {
		TRACE_FSTAT("Selective ack count: %u, bytes: %u\n",
				stream->sndvar->rstat.sack_cnt, stream->sndvar->rstat.sack_bytes);
	}
	if (stream->sndvar->rstat.tdp_sack_cnt) {
		TRACE_FSTAT("Selective tdp ack count: %u, bytes: %u, "
				"average bytes/ack: %u\n",
				stream->sndvar->rstat.tdp_sack_cnt, stream->sndvar->rstat.tdp_sack_bytes,
				stream->sndvar->rstat.tdp_sack_bytes / stream->sndvar->rstat.tdp_sack_cnt);
	} else {
		TRACE_FSTAT("Selective ack count: %u, bytes: %u\n",
				stream->sndvar->rstat.tdp_sack_cnt, stream->sndvar->rstat.tdp_sack_bytes);
	}
#endif /* TCP_OPT_SACK_ENABLED */
#endif /* RTM_STAT */
#endif

	if (HAS_STREAM_TYPE(stream, MOS_SOCK_STREAM)) {
		/* stand-alone monitor does not need to do these */
		/* Remember the bound local address; it is released at the end,
		 * after the stream itself has been freed. */
		if (stream->is_bound_addr) {
			bound_addr = TRUE;
			addr.sin_addr.s_addr = stream->saddr;
			addr.sin_port = stream->sport;
		}

		RemoveFromControlList(mtcp, stream);
		RemoveFromSendList(mtcp, stream);
		RemoveFromACKList(mtcp, stream);

		if (stream->on_rto_idx >= 0)
			RemoveFromRTOList(mtcp, stream);

		SBUF_LOCK_DESTROY(&stream->rcvvar->read_lock);
		SBUF_LOCK_DESTROY(&stream->sndvar->write_lock);

		assert(stream->on_hash_table == TRUE);

		/* free ring buffers */
		if (stream->sndvar->sndbuf) {
			SBFree(mtcp->rbm_snd, stream->sndvar->sndbuf);
			stream->sndvar->sndbuf = NULL;
		}
	}

	if (stream->on_timewait_list)
		RemoveFromTimewaitList(mtcp, stream);

	if (g_config.mos->tcp_timeout > 0)
		RemoveFromTimeoutList(mtcp, stream);

	if (stream->rcvvar->rcvbuf) {
		tcprb_del(stream->rcvvar->rcvbuf);
		stream->rcvvar->rcvbuf = NULL;
	}

	if (flow_lock)
		pthread_mutex_lock(&mtcp->ctx->flow_pool_lock);

	/* remove from flow hash table */
	HTRemove(mtcp->tcp_flow_table, stream);
	stream->on_hash_table = FALSE;

	mtcp->flow_cnt--;

	/* if there was a corresponding monitor stream socket opened
	 * then close it */
	/* Monitor sockets are shared by both streams of a pair. The first
	 * stream destroyed clears its sibling's pair_stream (below), so the
	 * sockets are only freed when the last stream of the pair goes away. */
	SOCKQ_FOREACH_START(walk, &stream->msocks) {
		SOCKQ_REMOVE(&stream->msocks, walk);
		if (stream->pair_stream == NULL)
			DestroyMonitorStreamSocket(mtcp, walk);
	} SOCKQ_FOREACH_END;

	if (stream->pair_stream != NULL) {
		/* Nullify pointer to sibling tcp_stream's pair_stream */
		stream->pair_stream->pair_stream = NULL;
	}

	MPFreeChunk(mtcp->rv_pool, stream->rcvvar);
	MPFreeChunk(mtcp->sv_pool, stream->sndvar);
	MPFreeChunk(mtcp->flow_pool, stream);

	if (flow_lock)
		/* stand-alone monitor does not need this since it is single-threaded */
		pthread_mutex_unlock(&mtcp->ctx->flow_pool_lock);

	/* Return the bound local address to the pool it came from: the
	 * global mtcp->ap if set, otherwise the per-interface ap[nif]. */
	if (bound_addr) {
		if (mtcp->ap) {
			ret = FreeAddress(mtcp->ap, &addr);
		} else {
			int nif;
			nif = GetOutputInterface(addr.sin_addr.s_addr);
			if (nif < 0) {
				TRACE_ERROR("Can't determine interface idx!\n");
				exit(EXIT_FAILURE);
			} else {
				ret = FreeAddress(ap[nif], &addr);
			}
		}
		if (ret < 0) {
			TRACE_ERROR("(NEVER HAPPEN) Failed to free address.\n");
		}
	}

#ifdef NETSTAT
#if NETSTAT_PERTHREAD
	TRACE_STREAM("Destroyed. Remaining flows: %u\n", mtcp->flow_cnt);
#endif /* NETSTAT_PERTHREAD */
#endif /* NETSTAT */

}
883 /*---------------------------------------------------------------------------*/
884 void
885 DestroyTCPStream(mtcp_manager_t mtcp, tcp_stream *stream)
886 {
887 tcp_stream *pair_stream = stream->pair_stream;
888
889 DestroySingleTCPStream(mtcp, stream);
890
891 if (pair_stream)
892 DestroySingleTCPStream(mtcp, pair_stream);
893 }
894 /*---------------------------------------------------------------------------*/
/*
 * Write a human-readable dump of @stream to mtcp->log_fp via
 * thread_printf(). The dump covers:
 * - the address tuple, type, state and close reason;
 * - the socket's epoll interest/event bits;
 * - list and queue membership flags;
 * - the send variables;
 * - the receive variables.
 *
 * NOTE(review): TCPStateToString() returns NULL for a NULL stream and
 * close_reason_str[] is indexed without a bounds check; both feed "%s"
 * below. Confirm state/close_reason are always valid here.
 */
void
DumpStream(mtcp_manager_t mtcp, tcp_stream *stream)
{
	uint8_t *sa, *da;
	struct tcp_send_vars *sndvar = stream->sndvar;
	struct tcp_recv_vars *rcvvar = stream->rcvvar;

	/* addresses are printed byte by byte in memory order */
	sa = (uint8_t *)&stream->saddr;
	da = (uint8_t *)&stream->daddr;
	thread_printf(mtcp, mtcp->log_fp, "========== Stream %u: "
			"%u.%u.%u.%u(%u) -> %u.%u.%u.%u(%u) ==========\n", stream->id,
			sa[0], sa[1], sa[2], sa[3], ntohs(stream->sport),
			da[0], da[1], da[2], da[3], ntohs(stream->dport));
	thread_printf(mtcp, mtcp->log_fp,
			"Stream id: %u, type: %u, state: %s, close_reason: %s\n",
			stream->id, stream->stream_type,
			TCPStateToString(stream), close_reason_str[stream->close_reason]);
	/* socket registration (epoll) and pending event bits */
	if (stream->socket) {
		socket_map_t socket = stream->socket;
		thread_printf(mtcp, mtcp->log_fp, "Socket id: %d, type: %d, opts: %u\n"
				"epoll: %u (IN: %u, OUT: %u, ERR: %u, RDHUP: %u, ET: %u)\n"
				"events: %u (IN: %u, OUT: %u, ERR: %u, RDHUP: %u, ET: %u)\n",
				socket->id, socket->socktype, socket->opts,
				socket->epoll, socket->epoll & MOS_EPOLLIN,
				socket->epoll & MOS_EPOLLOUT, socket->epoll & MOS_EPOLLERR,
				socket->epoll & MOS_EPOLLRDHUP, socket->epoll & MOS_EPOLLET,
				socket->events, socket->events & MOS_EPOLLIN,
				socket->events & MOS_EPOLLOUT, socket->events & MOS_EPOLLERR,
				socket->events & MOS_EPOLLRDHUP, socket->events & MOS_EPOLLET);
	} else {
		thread_printf(mtcp, mtcp->log_fp, "Socket: (null)\n");
	}

	/* list/queue membership and connection flags */
	thread_printf(mtcp, mtcp->log_fp,
			"on_hash_table: %u, on_control_list: %u (wait: %u), on_send_list: %u, "
			"on_ack_list: %u, is_wack: %u, ack_cnt: %u\n"
			"on_rto_idx: %d, on_timewait_list: %u, on_timeout_list: %u, "
			"on_rcv_br_list: %u, on_snd_br_list: %u\n"
			"on_sendq: %u, on_ackq: %u, closed: %u, on_closeq: %u, "
			"on_closeq_int: %u, on_resetq: %u, on_resetq_int: %u\n"
			"have_reset: %u, is_fin_sent: %u, is_fin_ackd: %u, "
			"saw_timestamp: %u, sack_permit: %u, "
			"is_bound_addr: %u, need_wnd_adv: %u\n", stream->on_hash_table,
			sndvar->on_control_list, stream->control_list_waiting, sndvar->on_send_list,
			sndvar->on_ack_list, sndvar->is_wack, sndvar->ack_cnt,
			stream->on_rto_idx, stream->on_timewait_list, stream->on_timeout_list,
			stream->on_rcv_br_list, stream->on_snd_br_list,
			sndvar->on_sendq, sndvar->on_ackq,
			stream->closed, sndvar->on_closeq, sndvar->on_closeq_int,
			sndvar->on_resetq, sndvar->on_resetq_int,
			stream->have_reset, sndvar->is_fin_sent,
			sndvar->is_fin_ackd, stream->saw_timestamp, stream->sack_permit,
			stream->is_bound_addr, stream->need_wnd_adv);

	thread_printf(mtcp, mtcp->log_fp, "========== Send variables ==========\n");
	thread_printf(mtcp, mtcp->log_fp,
			"ip_id: %u, mss: %u, eff_mss: %u, wscale(me, peer): (%u, %u), nif_out: %d\n",
			sndvar->ip_id, sndvar->mss, sndvar->eff_mss,
			sndvar->wscale_mine, sndvar->wscale_peer, sndvar->nif_out);
	thread_printf(mtcp, mtcp->log_fp,
			"snd_nxt: %u, snd_una: %u, iss: %u, fss: %u\nsnd_wnd: %u, "
			"peer_wnd: %u, cwnd: %u, ssthresh: %u\n",
			stream->snd_nxt, sndvar->snd_una, sndvar->iss, sndvar->fss,
			sndvar->snd_wnd, sndvar->peer_wnd, sndvar->cwnd, sndvar->ssthresh);

	if (sndvar->sndbuf) {
		thread_printf(mtcp, mtcp->log_fp,
				"Send buffer: init_seq: %u, head_seq: %u, "
				"len: %d, cum_len: %lu, size: %d\n",
				sndvar->sndbuf->init_seq, sndvar->sndbuf->head_seq,
				sndvar->sndbuf->len, sndvar->sndbuf->cum_len, sndvar->sndbuf->size);
	} else {
		thread_printf(mtcp, mtcp->log_fp, "Send buffer: (null)\n");
	}
	thread_printf(mtcp, mtcp->log_fp,
			"nrtx: %u, max_nrtx: %u, rto: %u, ts_rto: %u, "
			"ts_lastack_sent: %u\n", sndvar->nrtx, sndvar->max_nrtx,
			sndvar->rto, sndvar->ts_rto, sndvar->ts_lastack_sent);

	thread_printf(mtcp, mtcp->log_fp,
			"========== Receive variables ==========\n");
	thread_printf(mtcp, mtcp->log_fp,
			"rcv_nxt: %u, irs: %u, rcv_wnd: %u, "
			"snd_wl1: %u, snd_wl2: %u\n",
			stream->rcv_nxt, rcvvar->irs,
			rcvvar->rcv_wnd, rcvvar->snd_wl1, rcvvar->snd_wl2);
	/* only a missing receive buffer is reported; its contents are not dumped */
	if (!rcvvar->rcvbuf) {
		thread_printf(mtcp, mtcp->log_fp, "Receive buffer: (null)\n");
	}
	thread_printf(mtcp, mtcp->log_fp, "last_ack_seq: %u, dup_acks: %u\n",
			rcvvar->last_ack_seq, rcvvar->dup_acks);
	thread_printf(mtcp, mtcp->log_fp,
			"ts_recent: %u, ts_lastack_rcvd: %u, ts_last_ts_upd: %u, "
			"ts_tw_expire: %u\n", rcvvar->ts_recent, rcvvar->ts_lastack_rcvd,
			rcvvar->ts_last_ts_upd, rcvvar->ts_tw_expire);
	thread_printf(mtcp, mtcp->log_fp,
			"srtt: %u, mdev: %u, mdev_max: %u, rttvar: %u, rtt_seq: %u\n",
			rcvvar->srtt, rcvvar->mdev, rcvvar->mdev_max,
			rcvvar->rttvar, rcvvar->rtt_seq);
}
995 /*---------------------------------------------------------------------------*/
996