1 #include "debug.h"
2 #include <string.h>
3 
4 #include "config.h"
5 #include "tcp_stream.h"
6 #include "fhash.h"
7 #include "tcp.h"
8 #include "tcp_in.h"
9 #include "tcp_out.h"
10 #include "tcp_ring_buffer.h"
11 #include "tcp_send_buffer.h"
12 #include "eventpoll.h"
13 #include "ip_out.h"
14 #include "timer.h"
15 #include "tcp_rb.h"
16 /*---------------------------------------------------------------------------*/
17 char *state_str[] = {
18 	"TCP_ST_CLOSED",
19 	"TCP_ST_LISTEN",
20 	"TCP_ST_SYN_SENT",
21 	"TCP_ST_SYN_RCVD",
22 	"TCP_ST_ESTABILSHED",
23 	"TCP_ST_FIN_WAIT_1",
24 	"TCP_ST_FIN_WAIT_2",
25 	"TCP_ST_CLOSE_WAIT",
26 	"TCP_ST_CLOSING",
27 	"TCP_ST_LAST_ACK",
28 	"TCP_ST_TIME_WAIT",
29 	"TCP_ST_CLOSED_RSVD"
30 };
31 /*---------------------------------------------------------------------------*/
32 char *close_reason_str[] = {
33 	"NOT_CLOSED",
34 	"CLOSE",
35 	"CLOSED",
36 	"CONN_FAIL",
37 	"CONN_LOST",
38 	"RESET",
39 	"NO_MEM",
40 	"DENIED",
41 	"TIMEDOUT"
42 };
43 /*---------------------------------------------------------------------------*/
44 static __thread unsigned long next = 1;
45 /* Function retrieved from POSIX.1-2001 standard */
46 /* RAND_MAX assumed to be 32767 */
47 static int
48 posix_seq_rand(void) {
49 	next = next * 1103515245 + 12345;
50 	return ((unsigned)(next/65536) % 32768);
51 }
52 /*---------------------------------------------------------------------------*/
53 void
54 posix_seq_srand(unsigned seed) {
55 	next = seed % 32768;
56 }
57 /*---------------------------------------------------------------------------*/
58 /**
59  * FYI: This is NOT a read-only return!
60  */
61 int
62 GetFragInfo(socket_map_t sock, int side, void *optval, socklen_t *len)
63 {
64 	struct tcp_stream *stream;
65 
66 	stream = NULL;
67 	if (!*len || ( *len % sizeof(tcpfrag_t) != 0))
68 		goto frag_info_error;
69 
70 	if (side != MOS_SIDE_CLI && side != MOS_SIDE_SVR) {
71 		TRACE_ERROR("Invalid side requested!\n");
72 		exit(EXIT_FAILURE);
73 		return -1;
74 	}
75 
76 	struct tcp_stream *mstrm = sock->monitor_stream->stream;
77 	stream = (side == mstrm->side) ? mstrm : mstrm->pair_stream;
78 
79 	if (stream == NULL) goto frag_info_error;
80 
81 	/* First check if the tcp ring buffer even has anything */
82 	if (stream->rcvvar != NULL &&
83 	    stream->rcvvar->rcvbuf != NULL) {
84 		tcprb_t *rcvbuf = stream->rcvvar->rcvbuf;
85 		struct tcp_ring_fragment *out = (struct tcp_ring_fragment *)optval;
86 		int const maxout = *len;
87 		*len = 0;
88 		struct _tcpfrag_t *walk;
89 		TAILQ_FOREACH(walk, &rcvbuf->frags, link) {
90 			if (*len == maxout)
91 				break;
92 			out[*len].offset = walk->head;
93 			out[*len].len = walk->tail - walk->head;
94 			(*len)++;
95 		}
96 		if (*len != maxout) {
97 			/* set zero sentinel */
98 			out[*len].offset = 0;
99 			out[*len].len = 0;
100 		}
101 	} else
102 		goto frag_info_error;
103 
104 	return 0;
105 
106  frag_info_error:
107 	optval = NULL;
108 	*len = 0;
109 	return -1;
110 }
111 /*---------------------------------------------------------------------------*/
112 /**
113  * Comments later...
114  */
115 int
116 GetBufInfo(socket_map_t sock, int side, void *optval, socklen_t *len)
117 {
118 	struct tcp_stream *stream;
119 	struct tcp_buf_info *tbi;
120 
121 	tbi = (struct tcp_buf_info *)optval;
122 	memset(tbi, 0, sizeof(struct tcp_buf_info));
123 	stream = NULL;
124 
125 	if (*len != sizeof(struct tcp_buf_info)) {
126 		errno = EINVAL;
127 		goto buf_info_error;
128 	}
129 
130 	if (side != MOS_SIDE_CLI && side != MOS_SIDE_SVR) {
131 		TRACE_ERROR("Invalid side requested!\n");
132 		errno = EINVAL;
133 		goto buf_info_error;
134 	}
135 
136 	struct tcp_stream *mstrm = sock->monitor_stream->stream;
137 	stream = (side == mstrm->side) ? mstrm : mstrm->pair_stream;
138 
139 	/* First check if the tcp ring buffer even has anything */
140 	if (stream != NULL &&
141 	    stream->rcvvar != NULL &&
142 	    stream->rcvvar->rcvbuf != NULL) {
143 		tcprb_t *rcvbuf = stream->rcvvar->rcvbuf;
144 		tcpfrag_t *f = TAILQ_LAST(&rcvbuf->frags, flist);
145 		tbi->tcpbi_init_seq = stream->rcvvar->irs + 1;
146 		tbi->tcpbi_last_byte_read = rcvbuf->pile;
147 		tbi->tcpbi_next_byte_expected = rcvbuf->pile + tcprb_cflen(rcvbuf);
148 		tbi->tcpbi_last_byte_received = (f ? f->tail : rcvbuf->head);
149 	} else {
150 		errno = ENODATA;
151 		goto buf_info_error;
152 	}
153 
154 	return 0;
155 
156  buf_info_error:
157 	optval = NULL;
158 	*len = 0;
159 	return -1;
160 }
161 /*---------------------------------------------------------------------------*/
162 int
163 DisableBuf(socket_map_t sock, int side)
164 {
165 #ifdef DBGMSG
166 	__PREPARE_DBGLOGGING();
167 #endif
168 	struct tcp_stream *stream;
169 	int rc = 0;
170 
171 	switch (sock->socktype) {
172 	case MOS_SOCK_MONITOR_STREAM:
173 		if (side == MOS_SIDE_CLI)
174 			sock->monitor_listener->client_buf_mgmt = 0;
175 		else if (side == MOS_SIDE_SVR)
176 			sock->monitor_listener->server_buf_mgmt = 0;
177 		else {
178 			assert(0);
179 			TRACE_DBG("Invalid side!\n");
180 			rc = -1;
181 		}
182 		break;
183 	case MOS_SOCK_MONITOR_STREAM_ACTIVE:
184 		stream = sock->monitor_stream->stream;
185 		if (stream->side != side)
186 			stream = stream->pair_stream;
187 		assert(stream->side == side);
188 		stream->buffer_mgmt = 0;
189 		break;
190 	default:
191 		assert(0);
192 		TRACE_DBG("Can't disable buf for invalid socket!\n");
193 		rc = -1;
194 	}
195 
196 	return rc;
197 }
198 /*---------------------------------------------------------------------------*/
199 int
200 GetLastTimestamp(struct tcp_stream *stream, uint32_t *usecs, socklen_t *len)
201 {
202 #ifdef DBGMSG
203 	__PREPARE_DBGLOGGING();
204 #endif
205 	if (*len < sizeof(uint32_t)) {
206 		TRACE_DBG("Size passed is not >= sizeof(uint32_t)!\n");
207 		return -1;
208 	}
209 
210 	*usecs = (stream->last_active_ts >
211 		  stream->pair_stream->last_active_ts)
212 		?
213 		TS_TO_USEC(stream->last_active_ts) :
214 		TS_TO_USEC(stream->pair_stream->last_active_ts);
215 
216 	return 0;
217 }
218 /*---------------------------------------------------------------------------*/
219 inline int
220 GetTCPState(struct tcp_stream *stream, int side,
221 			void *optval, socklen_t *optlen)
222 {
223 	if (!stream || !(stream = (side == stream->side) ? stream : stream->pair_stream))
224 		return -1;
225 	*(int *)optval = (int)((stream->state == TCP_ST_CLOSED_RSVD) ?
226 						   TCP_ST_CLOSED : stream->state);
227 	return 0;
228 }
229 /*---------------------------------------------------------------------------*/
230 inline char *
231 TCPStateToString(const tcp_stream *stream)
232 {
233 	return (stream) ? state_str[stream->state] : NULL;
234 }
235 /*---------------------------------------------------------------------------*/
236 inline void
237 RaiseReadEvent(mtcp_manager_t mtcp, tcp_stream *stream)
238 {
239 	struct tcp_recv_vars *rcvvar;
240 
241 	rcvvar = stream->rcvvar;
242 
243 	if (HAS_STREAM_TYPE(stream, MOS_SOCK_STREAM)) {
244 		if (stream->socket && (stream->socket->epoll & MOS_EPOLLIN))
245 			AddEpollEvent(mtcp->ep, MOS_EVENT_QUEUE, stream->socket, MOS_EPOLLIN);
246 	} else if (rcvvar->rcvbuf && tcprb_cflen(rcvvar->rcvbuf) > 0) {
247 		/*
248 		 * in case it is a monitoring socket, queue up the read events
249 		 * in the event_queue of only if the tcp_stream hasn't already
250 		 * been registered in the event queue
251 		 */
252 		int index;
253 		struct event_queue *eq;
254 		struct socket_map *walk;
255 
256 		SOCKQ_FOREACH_START(walk, &stream->msocks) {
257 			assert(walk->socktype == MOS_SOCK_MONITOR_STREAM_ACTIVE);
258 			eq = walk->monitor_stream->monitor_listener->eq;
259 
260 			/* if it already has read data register... then skip this step */
261 			if (stream->actions & MOS_ACT_READ_DATA)
262 				return;
263 			if (eq->num_events >= eq->size) {
264 				TRACE_ERROR("Exceeded epoll event queue! num_events: %d, "
265 						"size: %d\n", eq->num_events, eq->size);
266 				return;
267 			}
268 
269 			index = eq->end++;
270 			eq->events[index].ev.events = MOS_EPOLLIN;
271 			eq->events[index].ev.data.ptr = (void *)stream;
272 
273 			if (eq->end >= eq->size) {
274 				eq->end = 0;
275 			}
276 			eq->num_events++;
277 			stream->actions |= MOS_ACT_READ_DATA;
278 		} SOCKQ_FOREACH_END;
279 	} else {
280 		TRACE_EPOLL("Stream %d: Raising read without a socket!\n", stream->id);
281 	}
282 }
283 /*---------------------------------------------------------------------------*/
284 inline void
285 RaiseWriteEvent(mtcp_manager_t mtcp, tcp_stream *stream)
286 {
287 	if (stream->socket) {
288 		if (stream->socket->epoll & MOS_EPOLLOUT) {
289 			AddEpollEvent(mtcp->ep,
290 				      MOS_EVENT_QUEUE, stream->socket, MOS_EPOLLOUT);
291 		}
292 	} else {
293 		TRACE_EPOLL("Stream %d: Raising write without a socket!\n", stream->id);
294 	}
295 }
296 /*---------------------------------------------------------------------------*/
297 inline void
298 RaiseCloseEvent(mtcp_manager_t mtcp, tcp_stream *stream)
299 {
300 	if (stream->socket) {
301 		if (stream->socket->epoll & MOS_EPOLLRDHUP) {
302 			AddEpollEvent(mtcp->ep,
303 					MOS_EVENT_QUEUE, stream->socket, MOS_EPOLLRDHUP);
304 		} else if (stream->socket->epoll & MOS_EPOLLIN) {
305 			AddEpollEvent(mtcp->ep,
306 				      MOS_EVENT_QUEUE, stream->socket, MOS_EPOLLIN);
307 		}
308 	} else {
309 		TRACE_EPOLL("Stream %d: Raising close without a socket!\n", stream->id);
310 	}
311 }
312 /*---------------------------------------------------------------------------*/
313 inline int
314 RaiseErrorEvent(mtcp_manager_t mtcp, tcp_stream *stream)
315 {
316 	if (stream->socket) {
317 		if (stream->socket->epoll & MOS_EPOLLERR) {
318 			/* passing closing reason for error notification */
319 			return AddEpollEvent(mtcp->ep,
320 					     MOS_EVENT_QUEUE, stream->socket, MOS_EPOLLERR);
321 		}
322 	} else {
323 		TRACE_EPOLL("Stream %d: Raising error without a socket!\n", stream->id);
324 	}
325 	return -1;
326 }
327 /*----------------------------------------------------------------------------*/
328 int
329 AddMonitorStreamSockets(mtcp_manager_t mtcp, struct tcp_stream *stream)
330 {
331 	struct mtcp_context mctx;
332 	int socktype;
333 
334 	mctx.cpu = mtcp->ctx->cpu;
335 	struct mon_listener *walk;
336 
337 	// traverse the passive socket's list
338 	TAILQ_FOREACH(walk, &mtcp->monitors, link) {
339 		socktype = walk->socket->socktype;
340 
341 		if (socktype != MOS_SOCK_MONITOR_STREAM)
342 			continue;
343 
344 		/* mtcp_bind_monitor_filter()
345 		 * - create an monitor active socket only for the filter-passed flows
346 		 * - we use the result (= tag) from DetectStreamType() to avoid
347 		 *   evaluating the same BPF filter twice */
348 		if (!walk->is_stream_syn_filter_hit) {
349 			continue;
350 		}
351 
352 		struct socket_map *s =
353 			AllocateSocket(&mctx, MOS_SOCK_MONITOR_STREAM_ACTIVE);
354 		if (!s)
355 			return -1;
356 
357 		s->monitor_stream->socket = s;
358 		s->monitor_stream->stream = stream;
359 		s->monitor_stream->monitor_listener = walk;
360 		s->monitor_stream->client_buf_mgmt = walk->client_buf_mgmt;
361 		s->monitor_stream->server_buf_mgmt = walk->server_buf_mgmt;
362 		s->monitor_stream->client_mon = walk->client_mon;
363 		s->monitor_stream->server_mon = walk->server_mon;
364 #ifdef NEWEV
365 		s->monitor_stream->stree_dontcare =
366 			s->monitor_stream->monitor_listener->stree_dontcare;
367 		s->monitor_stream->stree_pre_rcv =
368 			s->monitor_stream->monitor_listener->stree_pre_rcv;
369 		s->monitor_stream->stree_post_snd =
370 			s->monitor_stream->monitor_listener->stree_post_snd;
371 		if (s->monitor_stream->stree_dontcare)
372 			stree_inc_ref(s->monitor_stream->stree_dontcare);
373 		if (s->monitor_stream->stree_pre_rcv)
374 			stree_inc_ref(s->monitor_stream->stree_pre_rcv);
375 		if (s->monitor_stream->stree_post_snd)
376 			stree_inc_ref(s->monitor_stream->stree_post_snd);
377 #else
378 		InitEvP(&s->monitor_stream->dontcare_evp,
379 				&walk->dontcare_evb);
380 		InitEvP(&s->monitor_stream->pre_tcp_evp,
381 				&walk->pre_tcp_evb);
382 		InitEvP(&s->monitor_stream->post_tcp_evp,
383 				&walk->post_tcp_evb);
384 #endif
385 
386 		SOCKQ_INSERT_TAIL(&stream->msocks, s);
387 	}
388 
389 	return 0;
390 }
391 /*----------------------------------------------------------------------------*/
392 int
393 DestroyMonitorStreamSocket(mtcp_manager_t mtcp, socket_map_t msock)
394 {
395 	struct mtcp_context mctx;
396 	int socktype, sockid, rc;
397 
398 	if (msock == NULL) {
399 		TRACE_DBG("Stream socket does not exist!\n");
400 		/* exit(-1); */
401 		return 0;
402 	}
403 
404 	rc = 0;
405 	mctx.cpu = mtcp->ctx->cpu;
406 	socktype = msock->socktype;
407 	sockid = msock->id;
408 
409 	switch (socktype) {
410 	case MOS_SOCK_MONITOR_STREAM_ACTIVE:
411 		FreeSocket(&mctx, sockid, socktype);
412 		break;
413 	case MOS_SOCK_MONITOR_RAW:
414 		/* do nothing since all raw sockets point to the same socket */
415 		break;
416 	default:
417 		TRACE_DBG("Trying to destroy a monitor socket for an unsupported type!\n");
418 		rc = -1;
419 		/* exit(-1); */
420 		break;
421 	}
422 
423 	return rc;
424 }
425 /*---------------------------------------------------------------------------*/
426 tcp_stream *
427 CreateTCPStream(mtcp_manager_t mtcp, socket_map_t socket, int type,
428 		uint32_t saddr, uint16_t sport, uint32_t daddr, uint16_t dport,
429 		unsigned int *hash)
430 {
431 	tcp_stream *stream = NULL;
432 	int ret;
433 	/* stand-alone monitor does not need this since it is single-threaded */
434 	bool flow_lock = type & STREAM_TYPE(MOS_SOCK_STREAM);
435 	//bool flow_lock = false;
436 
437 	if (flow_lock)
438 		pthread_mutex_lock(&mtcp->ctx->flow_pool_lock);
439 
440 	stream = (tcp_stream *)MPAllocateChunk(mtcp->flow_pool);
441 	if (!stream) {
442 		TRACE_ERROR("Cannot allocate memory for the stream. "
443 				"g_config.mos->max_concurrency: %d, concurrent: %u\n",
444 				g_config.mos->max_concurrency, mtcp->flow_cnt);
445 		if (flow_lock)
446 			pthread_mutex_unlock(&mtcp->ctx->flow_pool_lock);
447 		return NULL;
448 	}
449 	memset(stream, 0, sizeof(tcp_stream));
450 
451 	stream->rcvvar = (struct tcp_recv_vars *)MPAllocateChunk(mtcp->rv_pool);
452 	if (!stream->rcvvar) {
453 		MPFreeChunk(mtcp->flow_pool, stream);
454 		if (flow_lock)
455 			pthread_mutex_unlock(&mtcp->ctx->flow_pool_lock);
456 		return NULL;
457 	}
458 	memset(stream->rcvvar, 0, sizeof(struct tcp_recv_vars));
459 
460 	/* stand-alone monitor does not need to do this */
461 	stream->sndvar = (struct tcp_send_vars *)MPAllocateChunk(mtcp->sv_pool);
462 	if (!stream->sndvar) {
463 		MPFreeChunk(mtcp->rv_pool, stream->rcvvar);
464 		MPFreeChunk(mtcp->flow_pool, stream);
465 		if (flow_lock)
466 			pthread_mutex_unlock(&mtcp->ctx->flow_pool_lock);
467 		return NULL;
468 	}
469 	//if (HAS_STREAM_TYPE(stream, MOS_SOCK_STREAM))
470 		memset(stream->sndvar, 0, sizeof(struct tcp_send_vars));
471 
472 	stream->id = mtcp->g_id++;
473 	stream->saddr = saddr;
474 	stream->sport = sport;
475 	stream->daddr = daddr;
476 	stream->dport = dport;
477 
478 	ret = HTInsert(mtcp->tcp_flow_table, stream, hash);
479 	if (ret < 0) {
480 		TRACE_ERROR("Stream %d: "
481 				"Failed to insert the stream into hash table.\n", stream->id);
482 		MPFreeChunk(mtcp->flow_pool, stream);
483 		if (flow_lock)
484 			pthread_mutex_unlock(&mtcp->ctx->flow_pool_lock);
485 		return NULL;
486 	}
487 	stream->on_hash_table = TRUE;
488 	mtcp->flow_cnt++;
489 
490 	SOCKQ_INIT(&stream->msocks);
491 
492 	/*
493 	 * if an embedded monitor is attached...
494 	 *  create monitor stream socket now!
495 	 * If socket type is raw.. then don't create it
496 	 */
497 	if ((mtcp->num_msp > 0) &&
498 		(type & STREAM_TYPE(MOS_SOCK_MONITOR_STREAM_ACTIVE)))
499 		if (AddMonitorStreamSockets(mtcp, stream) < 0)
500 			TRACE_DBG("Could not create monitor stream socket!\n");
501 
502 	if (flow_lock)
503 		pthread_mutex_unlock(&mtcp->ctx->flow_pool_lock);
504 
505 	if (socket) {
506 		stream->socket = socket;
507 		socket->stream = stream;
508 	}
509 
510 	stream->stream_type = type;
511 	stream->state = TCP_ST_LISTEN;
512 	/* This is handled by core.c, tcp_in.c & tcp_out.c */
513 	/* stream->cb_events |= MOS_ON_TCP_STATE_CHANGE; */
514 
515 	stream->on_rto_idx = -1;
516 
517 	/* stand-alone monitor does not need to do this */
518 	stream->sndvar->mss = TCP_DEFAULT_MSS;
519 	stream->sndvar->wscale_mine = TCP_DEFAULT_WSCALE;
520 	stream->sndvar->wscale_peer = 0;
521 
522 	if (HAS_STREAM_TYPE(stream, MOS_SOCK_STREAM)) {
523 		stream->sndvar->ip_id = 0;
524 		stream->sndvar->nif_out = GetOutputInterface(stream->daddr);
525 
526 		stream->sndvar->iss = posix_seq_rand() % TCP_MAX_SEQ;
527 		//stream->sndvar->iss = 0;
528 		stream->snd_nxt = stream->sndvar->iss;
529 		stream->sndvar->snd_una = stream->sndvar->iss;
530 		stream->sndvar->snd_wnd = g_config.mos->wmem_size;
531 		stream->sndvar->rto = TCP_INITIAL_RTO;
532 #if USE_SPIN_LOCK
533 		if (pthread_spin_init(&stream->sndvar->write_lock, PTHREAD_PROCESS_PRIVATE)) {
534 			perror("pthread_spin_init of write_lock");
535 			pthread_spin_destroy(&stream->rcvvar->read_lock);
536 #else
537 		if (pthread_mutex_init(&stream->sndvar->write_lock, NULL)) {
538 			perror("pthread_mutex_init of write_lock");
539 			pthread_mutex_destroy(&stream->rcvvar->read_lock);
540 #endif
541 			return NULL;
542 		}
543 	}
544 	stream->rcvvar->irs = 0;
545 
546 	stream->rcv_nxt = 0;
547 	stream->rcvvar->rcv_wnd = TCP_INITIAL_WINDOW;
548 
549 	stream->rcvvar->snd_wl1 = stream->rcvvar->irs - 1;
550 
551 	stream->buffer_mgmt = BUFMGMT_FULL;
552 
553 	/* needs state update by default */
554 	stream->status_mgmt = 1;
555 
556 #if USE_SPIN_LOCK
557 	if (pthread_spin_init(&stream->rcvvar->read_lock, PTHREAD_PROCESS_PRIVATE)) {
558 #else
559 	if (pthread_mutex_init(&stream->rcvvar->read_lock, NULL)) {
560 #endif
561 		perror("pthread_mutex_init of read_lock");
562 		return NULL;
563 	}
564 
565 #ifdef STREAM
566 	uint8_t *sa;
567 	uint8_t *da;
568 
569 	sa = (uint8_t *)&stream->saddr;
570 	da = (uint8_t *)&stream->daddr;
571 	TRACE_STREAM("CREATED NEW TCP STREAM %d: "
572 			"%u.%u.%u.%u(%d) -> %u.%u.%u.%u(%d) (ISS: %u)\n", stream->id,
573 			sa[0], sa[1], sa[2], sa[3], ntohs(stream->sport),
574 			da[0], da[1], da[2], da[3], ntohs(stream->dport),
575 			stream->sndvar->iss);
576 #endif
577 
578 	return stream;
579 }
580 /*----------------------------------------------------------------------------*/
581 inline tcp_stream *
582 CreateDualTCPStream(mtcp_manager_t mtcp, socket_map_t socket, int type, uint32_t saddr,
583 		    uint16_t sport, uint32_t daddr, uint16_t dport, unsigned int *hash)
584 {
585         tcp_stream *cur_stream, *paired_stream;
586         struct socket_map *walk;
587 
588         cur_stream = CreateTCPStream(mtcp, socket, type,
589 				     saddr, sport, daddr, dport, hash);
590         if (cur_stream == NULL) {
591                 TRACE_ERROR("Can't create tcp_stream!\n");
592                 return NULL;
593         }
594 
595 		paired_stream = CreateTCPStream(mtcp, NULL, MOS_SOCK_UNUSED,
596 						daddr, dport, saddr, sport, hash);
597         if (paired_stream == NULL) {
598                 DestroyTCPStream(mtcp, cur_stream);
599                 TRACE_ERROR("Can't create tcp_stream!\n");
600                 return NULL;
601         }
602 
603         cur_stream->pair_stream = paired_stream;
604         paired_stream->pair_stream = cur_stream;
605         paired_stream->socket = socket;
606         SOCKQ_FOREACH_START(walk, &cur_stream->msocks) {
607                 SOCKQ_INSERT_TAIL(&paired_stream->msocks, walk);
608         } SOCKQ_FOREACH_END;
609         paired_stream->stream_type = STREAM_TYPE(MOS_SOCK_MONITOR_STREAM_ACTIVE);
610 
611         return cur_stream;
612 }
613 /*----------------------------------------------------------------------------*/
614 inline tcp_stream *
615 CreateClientTCPStream(mtcp_manager_t mtcp, socket_map_t socket, int type,
616 			uint32_t saddr, uint16_t sport, uint32_t daddr, uint16_t dport,
617 			unsigned int *hash)
618 {
619 	tcp_stream *cs;
620 	struct socket_map *w;
621 
622 	cs = CreateTCPStream(mtcp, socket, type, daddr, dport, saddr, sport, hash);
623 	if (cs == NULL) {
624 		TRACE_ERROR("Can't create tcp_stream!\n");
625 		return NULL;
626 	}
627 
628 	cs->side = MOS_SIDE_CLI;
629 	cs->pair_stream = NULL;
630 
631 	/* if buffer management is off, then disable
632 	 * monitoring tcp ring of either streams (only if stream
633 	 * is just monitor stream active)
634 	 */
635 	if (IS_STREAM_TYPE(cs, MOS_SOCK_MONITOR_STREAM_ACTIVE)) {
636 		cs->buffer_mgmt = BUFMGMT_OFF;
637 		SOCKQ_FOREACH_START(w, &cs->msocks) {
638 			uint8_t bm = w->monitor_stream->client_buf_mgmt;
639 			if (bm > cs->buffer_mgmt)
640 				cs->buffer_mgmt = bm;
641 			if (w->monitor_stream->monitor_listener->client_mon == 1)
642 				cs->status_mgmt = 1;
643 		} SOCKQ_FOREACH_END;
644 	}
645 
646 	return cs;
647 }
648 /*----------------------------------------------------------------------------*/
649 inline tcp_stream *
650 AttachServerTCPStream(mtcp_manager_t mtcp, tcp_stream *cs, int type,
651 			uint32_t saddr, uint16_t sport, uint32_t daddr, uint16_t dport)
652 {
653 	tcp_stream *ss;
654 	struct socket_map *w;
655 
656 	/* The 3rd arg is a temp hackk... FIXIT! TODO: XXX */
657 	ss = CreateTCPStream(mtcp, NULL, MOS_SOCK_UNUSED, saddr, sport, daddr, dport, NULL);
658 	if (ss == NULL) {
659 		TRACE_ERROR("Can't create tcp_stream!\n");
660 		return NULL;
661 	}
662 
663 	ss->side = MOS_SIDE_SVR;
664 	cs->pair_stream = ss;
665 	ss->pair_stream = cs;
666 	ss->socket = cs->socket;
667 	SOCKQ_FOREACH_START(w, &cs->msocks) {
668 		SOCKQ_INSERT_TAIL(&ss->msocks, w);
669 	} SOCKQ_FOREACH_END;
670 	ss->stream_type = STREAM_TYPE(MOS_SOCK_MONITOR_STREAM_ACTIVE);
671 
672 	if (IS_STREAM_TYPE(ss, MOS_SOCK_MONITOR_STREAM_ACTIVE)) {
673 		ss->buffer_mgmt = BUFMGMT_OFF;
674 		SOCKQ_FOREACH_START(w, &ss->msocks) {
675 			uint8_t bm = w->monitor_stream->server_buf_mgmt;
676 			if (bm > ss->buffer_mgmt)
677 				ss->buffer_mgmt = bm;
678 			if (w->monitor_stream->monitor_listener->server_mon == 1)
679 				ss->status_mgmt = 1;
680 		} SOCKQ_FOREACH_END;
681 	}
682 
683 	return ss;
684 }
685 /*---------------------------------------------------------------------------*/
686 static void
687 DestroySingleTCPStream(mtcp_manager_t mtcp, tcp_stream *stream)
688 {
689 	struct sockaddr_in addr;
690 	int bound_addr = FALSE;
691 	int ret;
692 	/* stand-alone monitor does not need this since it is single-threaded */
693 	bool flow_lock = HAS_STREAM_TYPE(stream, MOS_SOCK_STREAM);
694 
695 	struct socket_map *walk;
696 
697 	/* Set the stream state as CLOSED */
698 	stream->state = TCP_ST_CLOSED_RSVD;
699 
700 	SOCKQ_FOREACH_START(walk, &stream->msocks) {
701 		HandleCallback(mtcp, MOS_HK_RCV, walk, stream->side, NULL,
702 			   MOS_ON_CONN_END | MOS_ON_TCP_STATE_CHANGE | stream->cb_events);
703 		HandleCallback(mtcp, MOS_HK_SND, walk, stream->side, NULL,
704 			   MOS_ON_CONN_END | MOS_ON_TCP_STATE_CHANGE | stream->cb_events);
705 	} SOCKQ_FOREACH_END;
706 
707 #if 0
708 #ifdef DUMP_STREAM
709 	if (stream->close_reason != TCP_ACTIVE_CLOSE &&
710 			stream->close_reason != TCP_PASSIVE_CLOSE) {
711 		thread_printf(mtcp, mtcp->log_fp,
712 				"Stream %d abnormally closed.\n", stream->id);
713 		DumpStream(mtcp, stream);
714 		DumpControlList(mtcp, mtcp->n_sender[0]);
715 	}
716 #endif
717 
718 #ifdef STREAM
719 	uint8_t *sa, *da;
720 	sa = (uint8_t *)&stream->saddr;
721 	da = (uint8_t *)&stream->daddr;
722 	TRACE_STREAM("DESTROY TCP STREAM %d: "
723 			"%u.%u.%u.%u(%d) -> %u.%u.%u.%u(%d) (%s)\n", stream->id,
724 			sa[0], sa[1], sa[2], sa[3], ntohs(stream->sport),
725 			da[0], da[1], da[2], da[3], ntohs(stream->dport),
726 			close_reason_str[stream->close_reason]);
727 #endif
728 
729 	if (stream->sndvar->sndbuf) {
730 		TRACE_FSTAT("Stream %d: send buffer "
731 				"cum_len: %lu, len: %u\n", stream->id,
732 				stream->sndvar->sndbuf->cum_len,
733 				stream->sndvar->sndbuf->len);
734 	}
735 	if (stream->rcvvar->rcvbuf) {
736 		TRACE_FSTAT("Stream %d: recv buffer "
737 				"cum_len: %lu, merged_len: %u, last_len: %u\n", stream->id,
738 				stream->rcvvar->rcvbuf->cum_len,
739 				stream->rcvvar->rcvbuf->merged_len,
740 				stream->rcvvar->rcvbuf->last_len);
741 	}
742 
743 #if RTM_STAT
744 	/* Triple duplicated ack stats */
745 	if (stream->sndvar->rstat.tdp_ack_cnt) {
746 		TRACE_FSTAT("Stream %d: triple duplicated ack: %u, "
747 				"retransmission bytes: %u, average rtm bytes/ack: %u\n",
748 				stream->id,
749 				stream->sndvar->rstat.tdp_ack_cnt, stream->sndvar->rstat.tdp_ack_bytes,
750 				stream->sndvar->rstat.tdp_ack_bytes / stream->sndvar->rstat.tdp_ack_cnt);
751 	}
752 
753 	/* Retransmission timeout stats */
754 	if (stream->sndvar->rstat.rto_cnt > 0) {
755 		TRACE_FSTAT("Stream %d: timeout count: %u, bytes: %u\n", stream->id,
756 				stream->sndvar->rstat.rto_cnt, stream->sndvar->rstat.rto_bytes);
757 	}
758 
759 	/* Recovery stats */
760 	if (stream->sndvar->rstat.ack_upd_cnt) {
761 		TRACE_FSTAT("Stream %d: snd_nxt update count: %u, "
762 				"snd_nxt update bytes: %u, average update bytes/update: %u\n",
763 				stream->id,
764 				stream->sndvar->rstat.ack_upd_cnt, stream->sndvar->rstat.ack_upd_bytes,
765 				stream->sndvar->rstat.ack_upd_bytes / stream->sndvar->rstat.ack_upd_cnt);
766 	}
767 #if TCP_OPT_SACK_ENABLED
768 	if (stream->sndvar->rstat.sack_cnt) {
769 		TRACE_FSTAT("Selective ack count: %u, bytes: %u, "
770 				"average bytes/ack: %u\n",
771 				stream->sndvar->rstat.sack_cnt, stream->sndvar->rstat.sack_bytes,
772 				stream->sndvar->rstat.sack_bytes / stream->sndvar->rstat.sack_cnt);
773 	} else {
774 		TRACE_FSTAT("Selective ack count: %u, bytes: %u\n",
775 				stream->sndvar->rstat.sack_cnt, stream->sndvar->rstat.sack_bytes);
776 	}
777 	if (stream->sndvar->rstat.tdp_sack_cnt) {
778 		TRACE_FSTAT("Selective tdp ack count: %u, bytes: %u, "
779 				"average bytes/ack: %u\n",
780 				stream->sndvar->rstat.tdp_sack_cnt, stream->sndvar->rstat.tdp_sack_bytes,
781 				stream->sndvar->rstat.tdp_sack_bytes / stream->sndvar->rstat.tdp_sack_cnt);
782 	} else {
783 		TRACE_FSTAT("Selective ack count: %u, bytes: %u\n",
784 				stream->sndvar->rstat.tdp_sack_cnt, stream->sndvar->rstat.tdp_sack_bytes);
785 	}
786 #endif /* TCP_OPT_SACK_ENABLED */
787 #endif /* RTM_STAT */
788 #endif
789 
790 	if (HAS_STREAM_TYPE(stream, MOS_SOCK_STREAM)) {
791 		/* stand-alone monitor does not need to do these */
792 		if (stream->is_bound_addr) {
793 			bound_addr = TRUE;
794 			addr.sin_addr.s_addr = stream->saddr;
795 			addr.sin_port = stream->sport;
796 		}
797 
798 		RemoveFromControlList(mtcp, stream);
799 		RemoveFromSendList(mtcp, stream);
800 		RemoveFromACKList(mtcp, stream);
801 
802 		if (stream->on_rto_idx >= 0)
803 			RemoveFromRTOList(mtcp, stream);
804 
805 		SBUF_LOCK_DESTROY(&stream->rcvvar->read_lock);
806 		SBUF_LOCK_DESTROY(&stream->sndvar->write_lock);
807 
808 		assert(stream->on_hash_table == TRUE);
809 
810 		/* free ring buffers */
811 		if (stream->sndvar->sndbuf) {
812 			SBFree(mtcp->rbm_snd, stream->sndvar->sndbuf);
813 			stream->sndvar->sndbuf = NULL;
814 		}
815 	}
816 
817 	if (stream->on_timewait_list)
818 		RemoveFromTimewaitList(mtcp, stream);
819 
820 	if (g_config.mos->tcp_timeout > 0)
821 		RemoveFromTimeoutList(mtcp, stream);
822 
823 	if (stream->rcvvar->rcvbuf) {
824 		tcprb_del(stream->rcvvar->rcvbuf);
825 		stream->rcvvar->rcvbuf = NULL;
826 	}
827 
828 	if (flow_lock)
829 		pthread_mutex_lock(&mtcp->ctx->flow_pool_lock);
830 
831 	/* remove from flow hash table */
832 	HTRemove(mtcp->tcp_flow_table, stream);
833 	stream->on_hash_table = FALSE;
834 
835 	mtcp->flow_cnt--;
836 
837 	/* if there was a corresponding monitor stream socket opened
838 	 * then close it */
839 	SOCKQ_FOREACH_START(walk, &stream->msocks) {
840 		SOCKQ_REMOVE(&stream->msocks, walk);
841 		if (stream->pair_stream == NULL)
842 			DestroyMonitorStreamSocket(mtcp, walk);
843 	} SOCKQ_FOREACH_END;
844 
845 	if (stream->pair_stream != NULL) {
846 		/* Nullify pointer to sibliing tcp_stream's pair_stream */
847 		stream->pair_stream->pair_stream = NULL;
848 	}
849 
850 	MPFreeChunk(mtcp->rv_pool, stream->rcvvar);
851 	MPFreeChunk(mtcp->sv_pool, stream->sndvar);
852 	MPFreeChunk(mtcp->flow_pool, stream);
853 
854 	if (flow_lock)
855 		/* stand-alone monitor does not need this since it is single-threaded */
856 		pthread_mutex_unlock(&mtcp->ctx->flow_pool_lock);
857 
858 	if (bound_addr) {
859 		if (mtcp->ap) {
860 			ret = FreeAddress(mtcp->ap, &addr);
861 		} else {
862 			int nif;
863 			nif = GetOutputInterface(addr.sin_addr.s_addr);
864 			if (nif < 0) {
865 				TRACE_ERROR("Can't determine interface idx!\n");
866 				exit(EXIT_FAILURE);
867 			} else {
868 				ret = FreeAddress(ap[nif], &addr);
869 			}
870 		}
871 		if (ret < 0) {
872 			TRACE_ERROR("(NEVER HAPPEN) Failed to free address.\n");
873 		}
874 	}
875 
876 #ifdef NETSTAT
877 #if NETSTAT_PERTHREAD
878 	TRACE_STREAM("Destroyed. Remaining flows: %u\n", mtcp->flow_cnt);
879 #endif /* NETSTAT_PERTHREAD */
880 #endif /* NETSTAT */
881 
882 }
883 /*---------------------------------------------------------------------------*/
884 void
885 DestroyTCPStream(mtcp_manager_t mtcp, tcp_stream *stream)
886 {
887 	tcp_stream *pair_stream = stream->pair_stream;
888 
889 	DestroySingleTCPStream(mtcp, stream);
890 
891 	if (pair_stream)
892 		DestroySingleTCPStream(mtcp, pair_stream);
893 }
894 /*---------------------------------------------------------------------------*/
895 void
896 DumpStream(mtcp_manager_t mtcp, tcp_stream *stream)
897 {
898 	uint8_t *sa, *da;
899 	struct tcp_send_vars *sndvar = stream->sndvar;
900 	struct tcp_recv_vars *rcvvar = stream->rcvvar;
901 
902 	sa = (uint8_t *)&stream->saddr;
903 	da = (uint8_t *)&stream->daddr;
904 	thread_printf(mtcp, mtcp->log_fp, "========== Stream %u: "
905 			"%u.%u.%u.%u(%u) -> %u.%u.%u.%u(%u) ==========\n", stream->id,
906 			sa[0], sa[1], sa[2], sa[3], ntohs(stream->sport),
907 			da[0], da[1], da[2], da[3], ntohs(stream->dport));
908 	thread_printf(mtcp, mtcp->log_fp,
909 			"Stream id: %u, type: %u, state: %s, close_reason: %s\n",
910 			stream->id, stream->stream_type,
911 			TCPStateToString(stream), close_reason_str[stream->close_reason]);
912 	if (stream->socket) {
913 		socket_map_t socket = stream->socket;
914 		thread_printf(mtcp, mtcp->log_fp, "Socket id: %d, type: %d, opts: %u\n"
915 				"epoll: %u (IN: %u, OUT: %u, ERR: %u, RDHUP: %u, ET: %u)\n"
916 				"events: %u (IN: %u, OUT: %u, ERR: %u, RDHUP: %u, ET: %u)\n",
917 				socket->id, socket->socktype, socket->opts,
918 				socket->epoll, socket->epoll & MOS_EPOLLIN,
919 				socket->epoll & MOS_EPOLLOUT, socket->epoll & MOS_EPOLLERR,
920 				socket->epoll & MOS_EPOLLRDHUP, socket->epoll & MOS_EPOLLET,
921 				socket->events, socket->events & MOS_EPOLLIN,
922 				socket->events & MOS_EPOLLOUT, socket->events & MOS_EPOLLERR,
923 				socket->events & MOS_EPOLLRDHUP, socket->events & MOS_EPOLLET);
924 	} else {
925 		thread_printf(mtcp, mtcp->log_fp, "Socket: (null)\n");
926 	}
927 
928 	thread_printf(mtcp, mtcp->log_fp,
929 			"on_hash_table: %u, on_control_list: %u (wait: %u), on_send_list: %u, "
930 			"on_ack_list: %u, is_wack: %u, ack_cnt: %u\n"
931 			"on_rto_idx: %d, on_timewait_list: %u, on_timeout_list: %u, "
932 			"on_rcv_br_list: %u, on_snd_br_list: %u\n"
933 			"on_sendq: %u, on_ackq: %u, closed: %u, on_closeq: %u, "
934 			"on_closeq_int: %u, on_resetq: %u, on_resetq_int: %u\n"
935 			"have_reset: %u, is_fin_sent: %u, is_fin_ackd: %u, "
936 			"saw_timestamp: %u, sack_permit: %u, "
937 			"is_bound_addr: %u, need_wnd_adv: %u\n", stream->on_hash_table,
938 			sndvar->on_control_list, stream->control_list_waiting, sndvar->on_send_list,
939 			sndvar->on_ack_list, sndvar->is_wack, sndvar->ack_cnt,
940 			stream->on_rto_idx, stream->on_timewait_list, stream->on_timeout_list,
941 			stream->on_rcv_br_list, stream->on_snd_br_list,
942 			sndvar->on_sendq, sndvar->on_ackq,
943 			stream->closed, sndvar->on_closeq, sndvar->on_closeq_int,
944 			sndvar->on_resetq, sndvar->on_resetq_int,
945 			stream->have_reset, sndvar->is_fin_sent,
946 			sndvar->is_fin_ackd, stream->saw_timestamp, stream->sack_permit,
947 			stream->is_bound_addr, stream->need_wnd_adv);
948 
949 	thread_printf(mtcp, mtcp->log_fp, "========== Send variables ==========\n");
950 	thread_printf(mtcp, mtcp->log_fp,
951 		      "ip_id: %u, mss: %u, eff_mss: %u, wscale(me, peer): (%u, %u), nif_out: %d\n",
952 		      sndvar->ip_id, sndvar->mss, sndvar->eff_mss,
953 		      sndvar->wscale_mine, sndvar->wscale_peer, sndvar->nif_out);
954 	thread_printf(mtcp, mtcp->log_fp,
955 			"snd_nxt: %u, snd_una: %u, iss: %u, fss: %u\nsnd_wnd: %u, "
956 			"peer_wnd: %u, cwnd: %u, ssthresh: %u\n",
957 			stream->snd_nxt, sndvar->snd_una, sndvar->iss, sndvar->fss,
958 			sndvar->snd_wnd, sndvar->peer_wnd, sndvar->cwnd, sndvar->ssthresh);
959 
960 	if (sndvar->sndbuf) {
961 		thread_printf(mtcp, mtcp->log_fp,
962 				"Send buffer: init_seq: %u, head_seq: %u, "
963 				"len: %d, cum_len: %lu, size: %d\n",
964 				sndvar->sndbuf->init_seq, sndvar->sndbuf->head_seq,
965 				sndvar->sndbuf->len, sndvar->sndbuf->cum_len, sndvar->sndbuf->size);
966 	} else {
967 		thread_printf(mtcp, mtcp->log_fp, "Send buffer: (null)\n");
968 	}
969 	thread_printf(mtcp, mtcp->log_fp,
970 			"nrtx: %u, max_nrtx: %u, rto: %u, ts_rto: %u, "
971 			"ts_lastack_sent: %u\n", sndvar->nrtx, sndvar->max_nrtx,
972 			sndvar->rto, sndvar->ts_rto, sndvar->ts_lastack_sent);
973 
974 	thread_printf(mtcp, mtcp->log_fp,
975 			"========== Receive variables ==========\n");
976 	thread_printf(mtcp, mtcp->log_fp,
977 			"rcv_nxt: %u, irs: %u, rcv_wnd: %u, "
978 			"snd_wl1: %u, snd_wl2: %u\n",
979 			stream->rcv_nxt, rcvvar->irs,
980 			rcvvar->rcv_wnd, rcvvar->snd_wl1, rcvvar->snd_wl2);
981 	if (!rcvvar->rcvbuf) {
982 		thread_printf(mtcp, mtcp->log_fp, "Receive buffer: (null)\n");
983 	}
984 	thread_printf(mtcp, mtcp->log_fp, "last_ack_seq: %u, dup_acks: %u\n",
985 			rcvvar->last_ack_seq, rcvvar->dup_acks);
986 	thread_printf(mtcp, mtcp->log_fp,
987 			"ts_recent: %u, ts_lastack_rcvd: %u, ts_last_ts_upd: %u, "
988 			"ts_tw_expire: %u\n", rcvvar->ts_recent, rcvvar->ts_lastack_rcvd,
989 			rcvvar->ts_last_ts_upd, rcvvar->ts_tw_expire);
990 	thread_printf(mtcp, mtcp->log_fp,
991 			"srtt: %u, mdev: %u, mdev_max: %u, rttvar: %u, rtt_seq: %u\n",
992 			rcvvar->srtt, rcvvar->mdev, rcvvar->mdev_max,
993 			rcvvar->rttvar, rcvvar->rtt_seq);
994 }
995 /*---------------------------------------------------------------------------*/
996