1 #include "debug.h"
2 #include <string.h>
3 
4 #include "config.h"
5 #include "tcp_stream.h"
6 #include "fhash.h"
7 #include "tcp.h"
8 #include "tcp_in.h"
9 #include "tcp_out.h"
10 #include "tcp_ring_buffer.h"
11 #include "tcp_send_buffer.h"
12 #include "eventpoll.h"
13 #include "ip_out.h"
14 #include "timer.h"
15 #include "tcp_rb.h"
16 /*---------------------------------------------------------------------------*/
/*
 * Printable names of the TCP states. TCPStateToString() indexes this table
 * directly with stream->state, so the entry order is assumed to mirror the
 * TCP_ST_* enumeration (TODO: confirm against tcp_stream.h when adding states).
 */
char *state_str[] = {
	"TCP_ST_CLOSED",
	"TCP_ST_LISTEN",
	"TCP_ST_SYN_SENT",
	"TCP_ST_SYN_RCVD",
	"TCP_ST_ESTABLISHED",
	"TCP_ST_FIN_WAIT_1",
	"TCP_ST_FIN_WAIT_2",
	"TCP_ST_CLOSE_WAIT",
	"TCP_ST_CLOSING",
	"TCP_ST_LAST_ACK",
	"TCP_ST_TIME_WAIT",
	"TCP_ST_CLOSED_RSVD"
};
31 /*---------------------------------------------------------------------------*/
/*
 * Printable names of the stream close reasons, indexed by
 * stream->close_reason (see DumpStream()).
 */
char *close_reason_str[] = {
	[0] = "NOT_CLOSED",
	[1] = "CLOSE",
	[2] = "CLOSED",
	[3] = "CONN_FAIL",
	[4] = "CONN_LOST",
	[5] = "RESET",
	[6] = "NO_MEM",
	[7] = "DENIED",
	[8] = "TIMEDOUT",
};
43 /*---------------------------------------------------------------------------*/
/* Per-thread generator state; every thread starts from seed 1. */
static __thread unsigned long next = 1;

/*
 * Linear congruential generator taken from the sample rand()
 * implementation in POSIX.1-2001 (RAND_MAX is fixed at 32767).
 * Returns a value in [0, 32767].
 */
static int
posix_seq_rand(void) {
	unsigned upper;

	next = next * 1103515245 + 12345;
	upper = (unsigned)(next / 65536);
	return (int)(upper % 32768);
}
/*---------------------------------------------------------------------------*/
/* Reseed the calling thread's generator; the seed is folded into [0, 32767]. */
void
posix_seq_srand(unsigned seed) {
	next = seed % 32768;
}
57 /*---------------------------------------------------------------------------*/
58 uint32_t
59 FetchSeqDrift(struct tcp_stream *stream, uint32_t seq)
60 {
61 	int i = 0;
62 	uint8_t flag = 1;
63 	int count;
64 
65 	i = stream->sndvar->sre_index - 1;
66 	if (i == -1) i = SRE_MAX - 1;
67 	count = 0;
68 
69 	while (flag) {
70 		if (stream->sndvar->sre[i].seq_base == 0)
71 			return 0;
72 		else if (seq >= stream->sndvar->sre[i].seq_base)
73 			return stream->sndvar->sre[i].seq_off;
74 
75 		i--;
76 		if (i == -1) i = SRE_MAX - 1;
77 		count++;
78 		if (count == SRE_MAX)
79 			flag = 1;
80 	}
81 
82 	return 0;
83 }
84 /*---------------------------------------------------------------------------*/
85 int
86 TcpSeqChange(socket_map_t socket, uint32_t seq_drift, int side, uint32_t seqno)
87 {
88 	struct tcp_stream *mstrm, *stream;
89 
90 	if (side != MOS_SIDE_CLI && side != MOS_SIDE_SVR) {
91 		TRACE_ERROR("Invalid side requested!\n");
92 		errno = EINVAL;
93 		return -1;
94 	}
95 
96 	mstrm = socket->monitor_stream->stream;
97 	stream = (side == mstrm->side) ? mstrm : mstrm->pair_stream;
98 	if (stream == NULL) {
99 		TRACE_ERROR("Stream pointer for sockid: %u not found!\n",
100 			    socket->id);
101 		errno = EBADF;
102 		return -1;
103 	}
104 
105 	stream->sndvar->sre[stream->sndvar->sre_index].seq_off = seq_drift;
106 	stream->sndvar->sre[stream->sndvar->sre_index].seq_off +=
107 		(stream->sndvar->sre_index == 0) ? stream->sndvar->sre[SRE_MAX - 1].seq_off :
108 		stream->sndvar->sre[stream->sndvar->sre_index - 1].seq_off;
109 	stream->sndvar->sre[stream->sndvar->sre_index].seq_base = seqno;
110 	stream->sndvar->sre_index = (stream->sndvar->sre_index + 1) & (SRE_MAX - 1);
111 
112 	return 0;
113 }
114 /*---------------------------------------------------------------------------*/
115 /**
116  * FYI: This is NOT a read-only return!
117  */
118 int
119 GetFragInfo(socket_map_t sock, int side, void *optval, socklen_t *len)
120 {
121 	struct tcp_stream *stream;
122 
123 	stream = NULL;
124 	if (!*len || ( *len % sizeof(tcpfrag_t) != 0))
125 		goto frag_info_error;
126 
127 	if (side != MOS_SIDE_CLI && side != MOS_SIDE_SVR) {
128 		TRACE_ERROR("Invalid side requested!\n");
129 		exit(EXIT_FAILURE);
130 		return -1;
131 	}
132 
133 	struct tcp_stream *mstrm = sock->monitor_stream->stream;
134 	stream = (side == mstrm->side) ? mstrm : mstrm->pair_stream;
135 
136 	if (stream == NULL) goto frag_info_error;
137 
138 	/* First check if the tcp ring buffer even has anything */
139 	if (stream->rcvvar != NULL &&
140 	    stream->rcvvar->rcvbuf != NULL) {
141 		tcprb_t *rcvbuf = stream->rcvvar->rcvbuf;
142 		struct tcp_ring_fragment *out = (struct tcp_ring_fragment *)optval;
143 		int const maxout = *len;
144 		*len = 0;
145 		struct _tcpfrag_t *walk;
146 		TAILQ_FOREACH(walk, &rcvbuf->frags, link) {
147 			if (*len == maxout)
148 				break;
149 			out[*len].offset = walk->head;
150 			out[*len].len = walk->tail - walk->head;
151 			(*len)++;
152 		}
153 		if (*len != maxout) {
154 			/* set zero sentinel */
155 			out[*len].offset = 0;
156 			out[*len].len = 0;
157 		}
158 	} else
159 		goto frag_info_error;
160 
161 	return 0;
162 
163  frag_info_error:
164 	optval = NULL;
165 	*len = 0;
166 	return -1;
167 }
168 /*---------------------------------------------------------------------------*/
169 /**
170  * Comments later...
171  */
172 int
173 GetBufInfo(socket_map_t sock, int side, void *optval, socklen_t *len)
174 {
175 	struct tcp_stream *stream;
176 	struct tcp_buf_info *tbi;
177 
178 	tbi = (struct tcp_buf_info *)optval;
179 	memset(tbi, 0, sizeof(struct tcp_buf_info));
180 	stream = NULL;
181 
182 	if (*len != sizeof(struct tcp_buf_info)) {
183 		errno = EINVAL;
184 		goto buf_info_error;
185 	}
186 
187 	if (side != MOS_SIDE_CLI && side != MOS_SIDE_SVR) {
188 		TRACE_ERROR("Invalid side requested!\n");
189 		errno = EINVAL;
190 		goto buf_info_error;
191 	}
192 
193 	struct tcp_stream *mstrm = sock->monitor_stream->stream;
194 	stream = (side == mstrm->side) ? mstrm : mstrm->pair_stream;
195 
196 	/* First check if the tcp ring buffer even has anything */
197 	if (stream != NULL &&
198 	    stream->rcvvar != NULL &&
199 	    stream->rcvvar->rcvbuf != NULL) {
200 		tcprb_t *rcvbuf = stream->rcvvar->rcvbuf;
201 		tcpfrag_t *f = TAILQ_LAST(&rcvbuf->frags, flist);
202 		tbi->tcpbi_init_seq = stream->rcvvar->irs + 1;
203 		tbi->tcpbi_last_byte_read = rcvbuf->pile;
204 		tbi->tcpbi_next_byte_expected = rcvbuf->pile + tcprb_cflen(rcvbuf);
205 		tbi->tcpbi_last_byte_received = (f ? f->tail : rcvbuf->head);
206 	} else {
207 		errno = ENODATA;
208 		goto buf_info_error;
209 	}
210 
211 	return 0;
212 
213  buf_info_error:
214 	optval = NULL;
215 	*len = 0;
216 	return -1;
217 }
218 /*---------------------------------------------------------------------------*/
219 int
220 DisableBuf(socket_map_t sock, int side)
221 {
222 #ifdef DBGMSG
223 	__PREPARE_DBGLOGGING();
224 #endif
225 	struct tcp_stream *stream;
226 	int rc = 0;
227 
228 	switch (sock->socktype) {
229 	case MOS_SOCK_MONITOR_STREAM:
230 		if (side == MOS_SIDE_CLI)
231 			sock->monitor_listener->client_buf_mgmt = 0;
232 		else if (side == MOS_SIDE_SVR)
233 			sock->monitor_listener->server_buf_mgmt = 0;
234 		else {
235 			assert(0);
236 			TRACE_DBG("Invalid side!\n");
237 			rc = -1;
238 		}
239 		break;
240 	case MOS_SOCK_MONITOR_STREAM_ACTIVE:
241 		stream = sock->monitor_stream->stream;
242 		if (stream->side != side)
243 			stream = stream->pair_stream;
244 		assert(stream->side == side);
245 		stream->buffer_mgmt = 0;
246 		break;
247 	default:
248 		assert(0);
249 		TRACE_DBG("Can't disable buf for invalid socket!\n");
250 		rc = -1;
251 	}
252 
253 	return rc;
254 }
255 /*---------------------------------------------------------------------------*/
256 int
257 GetLastTimestamp(struct tcp_stream *stream, uint32_t *usecs, socklen_t *len)
258 {
259 #ifdef DBGMSG
260 	__PREPARE_DBGLOGGING();
261 #endif
262 	if (*len < sizeof(uint32_t)) {
263 		TRACE_DBG("Size passed is not >= sizeof(uint32_t)!\n");
264 		return -1;
265 	}
266 
267 	*usecs = (stream->last_active_ts >
268 		  stream->pair_stream->last_active_ts)
269 		?
270 		TS_TO_USEC(stream->last_active_ts) :
271 		TS_TO_USEC(stream->pair_stream->last_active_ts);
272 
273 	return 0;
274 }
275 /*---------------------------------------------------------------------------*/
276 inline int
277 GetTCPState(struct tcp_stream *stream, int side,
278 			void *optval, socklen_t *optlen)
279 {
280 	if (!stream || !(stream = (side == stream->side) ? stream : stream->pair_stream))
281 		return -1;
282 	*(int *)optval = (int)((stream->state == TCP_ST_CLOSED_RSVD) ?
283 						   TCP_ST_CLOSED : stream->state);
284 	return 0;
285 }
286 /*---------------------------------------------------------------------------*/
287 inline char *
288 TCPStateToString(const tcp_stream *stream)
289 {
290 	return (stream) ? state_str[stream->state] : NULL;
291 }
292 /*---------------------------------------------------------------------------*/
/*
 * Signal that @stream has data to read.
 *
 * - MOS_SOCK_STREAM streams: queue MOS_EPOLLIN on the stream's socket, but
 *   only if that socket registered interest in MOS_EPOLLIN.
 * - Other streams whose receive ring buffer reports tcprb_cflen() > 0:
 *   append a MOS_EPOLLIN entry, whose data.ptr is the stream, to the event
 *   queue of the monitor listener behind the attached monitor socket(s).
 *   Once queued, the stream is flagged with MOS_ACT_READ_DATA.
 * - Otherwise only a trace message is emitted.
 */
inline void
RaiseReadEvent(mtcp_manager_t mtcp, tcp_stream *stream)
{
	struct tcp_recv_vars *rcvvar;

	rcvvar = stream->rcvvar;

	if (HAS_STREAM_TYPE(stream, MOS_SOCK_STREAM)) {
		if (stream->socket && (stream->socket->epoll & MOS_EPOLLIN))
			AddEpollEvent(mtcp->ep, MOS_EVENT_QUEUE, stream->socket, MOS_EPOLLIN);
	} else if (rcvvar->rcvbuf && tcprb_cflen(rcvvar->rcvbuf) > 0) {
		/*
		 * in case it is a monitoring socket, queue up the read event
		 * in the event_queue, but only if the tcp_stream hasn't already
		 * been registered in the event queue
		 */
		int index;
		struct event_queue *eq;
		struct socket_map *walk;

		SOCKQ_FOREACH_START(walk, &stream->msocks) {
			assert(walk->socktype == MOS_SOCK_MONITOR_STREAM_ACTIVE);
			eq = walk->monitor_stream->monitor_listener->eq;

			/* if it already has read data register... then skip this step */
			/*
			 * NOTE(review): this returns from the whole function, and
			 * MOS_ACT_READ_DATA is set right after the first successful
			 * enqueue below. So per call, at most one monitor socket's
			 * queue receives the event even when several are attached.
			 * Confirm that this is intended. Returning from inside
			 * SOCKQ_FOREACH_START also assumes the macro needs no
			 * cleanup at loop exit.
			 */
			if (stream->actions & MOS_ACT_READ_DATA)
				return;
			/* queue full: drop the event (error is only traced) */
			if (eq->num_events >= eq->size) {
				TRACE_ERROR("Exceeded epoll event queue! num_events: %d, "
						"size: %d\n", eq->num_events, eq->size);
				return;
			}

			index = eq->end++;
			eq->events[index].ev.events = MOS_EPOLLIN;
			eq->events[index].ev.data.ptr = (void *)stream;

			/* eq->events is used as a ring: wrap the write position */
			if (eq->end >= eq->size) {
				eq->end = 0;
			}
			eq->num_events++;
			stream->actions |= MOS_ACT_READ_DATA;
		} SOCKQ_FOREACH_END;
	} else {
		TRACE_EPOLL("Stream %d: Raising read without a socket!\n", stream->id);
	}
}
340 /*---------------------------------------------------------------------------*/
341 inline void
342 RaiseWriteEvent(mtcp_manager_t mtcp, tcp_stream *stream)
343 {
344 	if (stream->socket) {
345 		if (stream->socket->epoll & MOS_EPOLLOUT) {
346 			AddEpollEvent(mtcp->ep,
347 				      MOS_EVENT_QUEUE, stream->socket, MOS_EPOLLOUT);
348 		}
349 	} else {
350 		TRACE_EPOLL("Stream %d: Raising write without a socket!\n", stream->id);
351 	}
352 }
353 /*---------------------------------------------------------------------------*/
354 inline void
355 RaiseCloseEvent(mtcp_manager_t mtcp, tcp_stream *stream)
356 {
357 	if (stream->socket) {
358 		if (stream->socket->epoll & MOS_EPOLLRDHUP) {
359 			AddEpollEvent(mtcp->ep,
360 					MOS_EVENT_QUEUE, stream->socket, MOS_EPOLLRDHUP);
361 		} else if (stream->socket->epoll & MOS_EPOLLIN) {
362 			AddEpollEvent(mtcp->ep,
363 				      MOS_EVENT_QUEUE, stream->socket, MOS_EPOLLIN);
364 		}
365 	} else {
366 		TRACE_EPOLL("Stream %d: Raising close without a socket!\n", stream->id);
367 	}
368 }
369 /*---------------------------------------------------------------------------*/
370 inline int
371 RaiseErrorEvent(mtcp_manager_t mtcp, tcp_stream *stream)
372 {
373 	if (stream->socket) {
374 		if (stream->socket->epoll & MOS_EPOLLERR) {
375 			/* passing closing reason for error notification */
376 			return AddEpollEvent(mtcp->ep,
377 					     MOS_EVENT_QUEUE, stream->socket, MOS_EPOLLERR);
378 		}
379 	} else {
380 		TRACE_EPOLL("Stream %d: Raising error without a socket!\n", stream->id);
381 	}
382 	return -1;
383 }
384 /*----------------------------------------------------------------------------*/
/*
 * Attach one MOS_SOCK_MONITOR_STREAM_ACTIVE socket to @stream for every
 * monitor listener on mtcp->monitors that is a MOS_SOCK_MONITOR_STREAM
 * socket and whose is_stream_syn_filter_hit flag is set.
 *
 * Each new socket points back at @stream and its listener. It copies the
 * listener's buffer-management and monitoring flags, gets its event state
 * from the listener (shared stree objects under NEWEV, InitEvP() otherwise),
 * and is appended to stream->msocks.
 *
 * @return 0 on success, -1 if AllocateSocket() fails. Sockets already
 *         appended for earlier listeners stay on stream->msocks in that case.
 */
int
AddMonitorStreamSockets(mtcp_manager_t mtcp, struct tcp_stream *stream)
{
	struct mtcp_context mctx;
	int socktype;

	mctx.cpu = mtcp->ctx->cpu;
	struct mon_listener *walk;

	// traverse the passive socket's list
	TAILQ_FOREACH(walk, &mtcp->monitors, link) {
		socktype = walk->socket->socktype;

		if (socktype != MOS_SOCK_MONITOR_STREAM)
			continue;

		/* mtcp_bind_monitor_filter()
		 * - create an monitor active socket only for the filter-passed flows
		 * - we use the result (= tag) from DetectStreamType() to avoid
		 *   evaluating the same BPF filter twice */
		if (!walk->is_stream_syn_filter_hit) {
			continue;
		}

		struct socket_map *s =
			AllocateSocket(&mctx, MOS_SOCK_MONITOR_STREAM_ACTIVE);
		if (!s)
			return -1;

		/* bind the active socket to this stream and its listener, and
		 * inherit the listener's per-side settings */
		s->monitor_stream->socket = s;
		s->monitor_stream->stream = stream;
		s->monitor_stream->monitor_listener = walk;
		s->monitor_stream->client_buf_mgmt = walk->client_buf_mgmt;
		s->monitor_stream->server_buf_mgmt = walk->server_buf_mgmt;
		s->monitor_stream->client_mon = walk->client_mon;
		s->monitor_stream->server_mon = walk->server_mon;
#ifdef NEWEV
		/* share the listener's event trees; stree_inc_ref() presumably
		 * bumps a reference count so they outlive this socket — confirm */
		s->monitor_stream->stree_dontcare =
			s->monitor_stream->monitor_listener->stree_dontcare;
		s->monitor_stream->stree_pre_rcv =
			s->monitor_stream->monitor_listener->stree_pre_rcv;
		s->monitor_stream->stree_post_snd =
			s->monitor_stream->monitor_listener->stree_post_snd;
		if (s->monitor_stream->stree_dontcare)
			stree_inc_ref(s->monitor_stream->stree_dontcare);
		if (s->monitor_stream->stree_pre_rcv)
			stree_inc_ref(s->monitor_stream->stree_pre_rcv);
		if (s->monitor_stream->stree_post_snd)
			stree_inc_ref(s->monitor_stream->stree_post_snd);
#else
		/* initialize per-socket event state from the listener's event bases */
		InitEvP(&s->monitor_stream->dontcare_evp,
				&walk->dontcare_evb);
		InitEvP(&s->monitor_stream->pre_tcp_evp,
				&walk->pre_tcp_evb);
		InitEvP(&s->monitor_stream->post_tcp_evp,
				&walk->post_tcp_evb);
#endif

		SOCKQ_INSERT_TAIL(&stream->msocks, s);
	}

	return 0;
}
448 /*----------------------------------------------------------------------------*/
449 int
450 DestroyMonitorStreamSocket(mtcp_manager_t mtcp, socket_map_t msock)
451 {
452 	struct mtcp_context mctx;
453 	int socktype, sockid, rc;
454 
455 	if (msock == NULL) {
456 		TRACE_DBG("Stream socket does not exist!\n");
457 		/* exit(-1); */
458 		return 0;
459 	}
460 
461 	rc = 0;
462 	mctx.cpu = mtcp->ctx->cpu;
463 	socktype = msock->socktype;
464 	sockid = msock->id;
465 
466 	switch (socktype) {
467 	case MOS_SOCK_MONITOR_STREAM_ACTIVE:
468 		FreeSocket(&mctx, sockid, socktype);
469 		break;
470 	case MOS_SOCK_MONITOR_RAW:
471 		/* do nothing since all raw sockets point to the same socket */
472 		break;
473 	default:
474 		TRACE_DBG("Trying to destroy a monitor socket for an unsupported type!\n");
475 		rc = -1;
476 		/* exit(-1); */
477 		break;
478 	}
479 
480 	return rc;
481 }
482 /*---------------------------------------------------------------------------*/
483 tcp_stream *
484 CreateTCPStream(mtcp_manager_t mtcp, socket_map_t socket, int type,
485 		uint32_t saddr, uint16_t sport, uint32_t daddr, uint16_t dport,
486 		unsigned int *hash)
487 {
488 	tcp_stream *stream = NULL;
489 	int ret;
490 	/* stand-alone monitor does not need this since it is single-threaded */
491 	bool flow_lock = type & STREAM_TYPE(MOS_SOCK_STREAM);
492 	//bool flow_lock = false;
493 
494 	if (flow_lock)
495 		pthread_mutex_lock(&mtcp->ctx->flow_pool_lock);
496 
497 	stream = (tcp_stream *)MPAllocateChunk(mtcp->flow_pool);
498 	if (!stream) {
499 		TRACE_ERROR("Cannot allocate memory for the stream. "
500 				"g_config.mos->max_concurrency: %d, concurrent: %u\n",
501 				g_config.mos->max_concurrency, mtcp->flow_cnt);
502 		if (flow_lock)
503 			pthread_mutex_unlock(&mtcp->ctx->flow_pool_lock);
504 		return NULL;
505 	}
506 	memset(stream, 0, sizeof(tcp_stream));
507 
508 	stream->rcvvar = (struct tcp_recv_vars *)MPAllocateChunk(mtcp->rv_pool);
509 	if (!stream->rcvvar) {
510 		MPFreeChunk(mtcp->flow_pool, stream);
511 		if (flow_lock)
512 			pthread_mutex_unlock(&mtcp->ctx->flow_pool_lock);
513 		return NULL;
514 	}
515 	memset(stream->rcvvar, 0, sizeof(struct tcp_recv_vars));
516 
517 	/* stand-alone monitor does not need to do this */
518 	stream->sndvar = (struct tcp_send_vars *)MPAllocateChunk(mtcp->sv_pool);
519 	if (!stream->sndvar) {
520 		MPFreeChunk(mtcp->rv_pool, stream->rcvvar);
521 		MPFreeChunk(mtcp->flow_pool, stream);
522 		if (flow_lock)
523 			pthread_mutex_unlock(&mtcp->ctx->flow_pool_lock);
524 		return NULL;
525 	}
526 	//if (HAS_STREAM_TYPE(stream, MOS_SOCK_STREAM))
527 		memset(stream->sndvar, 0, sizeof(struct tcp_send_vars));
528 
529 	stream->id = mtcp->g_id++;
530 	stream->saddr = saddr;
531 	stream->sport = sport;
532 	stream->daddr = daddr;
533 	stream->dport = dport;
534 
535 	ret = HTInsert(mtcp->tcp_flow_table, stream, hash);
536 	if (ret < 0) {
537 		TRACE_ERROR("Stream %d: "
538 				"Failed to insert the stream into hash table.\n", stream->id);
539 		MPFreeChunk(mtcp->flow_pool, stream);
540 		if (flow_lock)
541 			pthread_mutex_unlock(&mtcp->ctx->flow_pool_lock);
542 		return NULL;
543 	}
544 	stream->on_hash_table = TRUE;
545 	mtcp->flow_cnt++;
546 
547 	SOCKQ_INIT(&stream->msocks);
548 
549 	/*
550 	 * if an embedded monitor is attached...
551 	 *  create monitor stream socket now!
552 	 * If socket type is raw.. then don't create it
553 	 */
554 	if ((mtcp->num_msp > 0) &&
555 		(type & STREAM_TYPE(MOS_SOCK_MONITOR_STREAM_ACTIVE)))
556 		if (AddMonitorStreamSockets(mtcp, stream) < 0)
557 			TRACE_DBG("Could not create monitor stream socket!\n");
558 
559 	if (flow_lock)
560 		pthread_mutex_unlock(&mtcp->ctx->flow_pool_lock);
561 
562 	if (socket) {
563 		stream->socket = socket;
564 		socket->stream = stream;
565 	}
566 
567 	stream->stream_type = type;
568 	stream->state = TCP_ST_LISTEN;
569 	/* This is handled by core.c, tcp_in.c & tcp_out.c */
570 	/* stream->cb_events |= MOS_ON_TCP_STATE_CHANGE; */
571 
572 	stream->on_rto_idx = -1;
573 
574 	/* stand-alone monitor does not need to do this */
575 	stream->sndvar->mss = TCP_DEFAULT_MSS;
576 	stream->sndvar->wscale_mine = TCP_DEFAULT_WSCALE;
577 	stream->sndvar->wscale_peer = 0;
578 
579 	if (HAS_STREAM_TYPE(stream, MOS_SOCK_STREAM)) {
580 		stream->sndvar->ip_id = 0;
581 		stream->sndvar->nif_out = GetOutputInterface(stream->daddr);
582 
583 		stream->sndvar->iss = posix_seq_rand() % TCP_MAX_SEQ;
584 		//stream->sndvar->iss = 0;
585 		stream->snd_nxt = stream->sndvar->iss;
586 		stream->sndvar->snd_una = stream->sndvar->iss;
587 		stream->sndvar->snd_wnd = g_config.mos->wmem_size;
588 		stream->sndvar->rto = TCP_INITIAL_RTO;
589 #if USE_SPIN_LOCK
590 		if (pthread_spin_init(&stream->sndvar->write_lock, PTHREAD_PROCESS_PRIVATE)) {
591 			perror("pthread_spin_init of write_lock");
592 			pthread_spin_destroy(&stream->rcvvar->read_lock);
593 #else
594 		if (pthread_mutex_init(&stream->sndvar->write_lock, NULL)) {
595 			perror("pthread_mutex_init of write_lock");
596 			pthread_mutex_destroy(&stream->rcvvar->read_lock);
597 #endif
598 			return NULL;
599 		}
600 	}
601 	stream->rcvvar->irs = 0;
602 
603 	stream->rcv_nxt = 0;
604 	stream->rcvvar->rcv_wnd = TCP_INITIAL_WINDOW;
605 
606 	stream->rcvvar->snd_wl1 = stream->rcvvar->irs - 1;
607 
608 	stream->buffer_mgmt = BUFMGMT_FULL;
609 
610 	/* needs state update by default */
611 	stream->status_mgmt = 1;
612 
613 #if USE_SPIN_LOCK
614 	if (pthread_spin_init(&stream->rcvvar->read_lock, PTHREAD_PROCESS_PRIVATE)) {
615 #else
616 	if (pthread_mutex_init(&stream->rcvvar->read_lock, NULL)) {
617 #endif
618 		perror("pthread_mutex_init of read_lock");
619 		return NULL;
620 	}
621 
622 #ifdef STREAM
623 	uint8_t *sa;
624 	uint8_t *da;
625 
626 	sa = (uint8_t *)&stream->saddr;
627 	da = (uint8_t *)&stream->daddr;
628 	TRACE_STREAM("CREATED NEW TCP STREAM %d: "
629 			"%u.%u.%u.%u(%d) -> %u.%u.%u.%u(%d) (ISS: %u)\n", stream->id,
630 			sa[0], sa[1], sa[2], sa[3], ntohs(stream->sport),
631 			da[0], da[1], da[2], da[3], ntohs(stream->dport),
632 			stream->sndvar->iss);
633 #endif
634 
635 	return stream;
636 }
637 /*----------------------------------------------------------------------------*/
638 inline tcp_stream *
639 CreateDualTCPStream(mtcp_manager_t mtcp, socket_map_t socket, int type, uint32_t saddr,
640 		    uint16_t sport, uint32_t daddr, uint16_t dport, unsigned int *hash)
641 {
642         tcp_stream *cur_stream, *paired_stream;
643         struct socket_map *walk;
644 
645         cur_stream = CreateTCPStream(mtcp, socket, type,
646 				     saddr, sport, daddr, dport, hash);
647         if (cur_stream == NULL) {
648                 TRACE_ERROR("Can't create tcp_stream!\n");
649                 return NULL;
650         }
651 
652 		paired_stream = CreateTCPStream(mtcp, NULL, MOS_SOCK_UNUSED,
653 						daddr, dport, saddr, sport, hash);
654         if (paired_stream == NULL) {
655                 DestroyTCPStream(mtcp, cur_stream);
656                 TRACE_ERROR("Can't create tcp_stream!\n");
657                 return NULL;
658         }
659 
660         cur_stream->pair_stream = paired_stream;
661         paired_stream->pair_stream = cur_stream;
662         paired_stream->socket = socket;
663         SOCKQ_FOREACH_START(walk, &cur_stream->msocks) {
664                 SOCKQ_INSERT_TAIL(&paired_stream->msocks, walk);
665         } SOCKQ_FOREACH_END;
666         paired_stream->stream_type = STREAM_TYPE(MOS_SOCK_MONITOR_STREAM_ACTIVE);
667 
668         return cur_stream;
669 }
670 /*----------------------------------------------------------------------------*/
671 inline tcp_stream *
672 CreateClientTCPStream(mtcp_manager_t mtcp, socket_map_t socket, int type,
673 			uint32_t saddr, uint16_t sport, uint32_t daddr, uint16_t dport,
674 			unsigned int *hash)
675 {
676 	tcp_stream *cs;
677 	struct socket_map *w;
678 
679 	cs = CreateTCPStream(mtcp, socket, type, daddr, dport, saddr, sport, hash);
680 	if (cs == NULL) {
681 		TRACE_ERROR("Can't create tcp_stream!\n");
682 		return NULL;
683 	}
684 
685 	cs->side = MOS_SIDE_CLI;
686 	cs->pair_stream = NULL;
687 
688 	/* if buffer management is off, then disable
689 	 * monitoring tcp ring of either streams (only if stream
690 	 * is just monitor stream active)
691 	 */
692 	if (IS_STREAM_TYPE(cs, MOS_SOCK_MONITOR_STREAM_ACTIVE)) {
693 		cs->buffer_mgmt = BUFMGMT_OFF;
694 		SOCKQ_FOREACH_START(w, &cs->msocks) {
695 			uint8_t bm = w->monitor_stream->client_buf_mgmt;
696 			if (bm > cs->buffer_mgmt)
697 				cs->buffer_mgmt = bm;
698 			if (w->monitor_stream->monitor_listener->client_mon == 1)
699 				cs->status_mgmt = 1;
700 		} SOCKQ_FOREACH_END;
701 	}
702 
703 	return cs;
704 }
705 /*----------------------------------------------------------------------------*/
706 inline tcp_stream *
707 AttachServerTCPStream(mtcp_manager_t mtcp, tcp_stream *cs, int type,
708 			uint32_t saddr, uint16_t sport, uint32_t daddr, uint16_t dport)
709 {
710 	tcp_stream *ss;
711 	struct socket_map *w;
712 
713 	/* The 3rd arg is a temp hackk... FIXIT! TODO: XXX */
714 	ss = CreateTCPStream(mtcp, NULL, MOS_SOCK_UNUSED, saddr, sport, daddr, dport, NULL);
715 	if (ss == NULL) {
716 		TRACE_ERROR("Can't create tcp_stream!\n");
717 		return NULL;
718 	}
719 
720 	ss->side = MOS_SIDE_SVR;
721 	cs->pair_stream = ss;
722 	ss->pair_stream = cs;
723 	ss->socket = cs->socket;
724 	SOCKQ_FOREACH_START(w, &cs->msocks) {
725 		SOCKQ_INSERT_TAIL(&ss->msocks, w);
726 	} SOCKQ_FOREACH_END;
727 	ss->stream_type = STREAM_TYPE(MOS_SOCK_MONITOR_STREAM_ACTIVE);
728 
729 	if (IS_STREAM_TYPE(ss, MOS_SOCK_MONITOR_STREAM_ACTIVE)) {
730 		ss->buffer_mgmt = BUFMGMT_OFF;
731 		SOCKQ_FOREACH_START(w, &ss->msocks) {
732 			uint8_t bm = w->monitor_stream->server_buf_mgmt;
733 			if (bm > ss->buffer_mgmt)
734 				ss->buffer_mgmt = bm;
735 			if (w->monitor_stream->monitor_listener->server_mon == 1)
736 				ss->status_mgmt = 1;
737 		} SOCKQ_FOREACH_END;
738 	}
739 
740 	return ss;
741 }
742 /*---------------------------------------------------------------------------*/
/*
 * Tear down one direction of a connection and return its memory to the pools.
 *
 * Order of operations:
 *  1. mark the stream TCP_ST_CLOSED_RSVD and fire MOS_ON_CONN_END |
 *     MOS_ON_TCP_STATE_CHANGE (plus pending cb_events) on the RCV and SND
 *     hooks of every attached monitor socket;
 *  2. for MOS_SOCK_STREAM streams: remember a bound local address, drop the
 *     stream from the control/send/ACK/RTO lists, destroy its locks and free
 *     the send buffer;
 *  3. leave the timewait/timeout lists and free the receive ring buffer;
 *  4. under flow_pool_lock (MOS_SOCK_STREAM only): remove from the flow hash
 *     table, unlink the monitor sockets, detach the sibling and free the
 *     stream chunks;
 *  5. release the bound address, if any.
 *
 * Monitor sockets are shared by both directions. They are actually destroyed
 * only when this stream has no pair_stream: destroying the first direction
 * clears the sibling's pair_stream, so the second call frees them.
 */
static void
DestroySingleTCPStream(mtcp_manager_t mtcp, tcp_stream *stream)
{
	struct sockaddr_in addr;
	int bound_addr = FALSE;
	int ret;
	/* stand-alone monitor does not need this since it is single-threaded */
	bool flow_lock = HAS_STREAM_TYPE(stream, MOS_SOCK_STREAM);

	struct socket_map *walk;

	/* Set the stream state as CLOSED */
	stream->state = TCP_ST_CLOSED_RSVD;

	SOCKQ_FOREACH_START(walk, &stream->msocks) {
		HandleCallback(mtcp, MOS_HK_RCV, walk, stream->side, NULL,
			   MOS_ON_CONN_END | MOS_ON_TCP_STATE_CHANGE | stream->cb_events);
		HandleCallback(mtcp, MOS_HK_SND, walk, stream->side, NULL,
			   MOS_ON_CONN_END | MOS_ON_TCP_STATE_CHANGE | stream->cb_events);
	} SOCKQ_FOREACH_END;

	/* diagnostics below are compiled out (#if 0) */
#if 0
#ifdef DUMP_STREAM
	if (stream->close_reason != TCP_ACTIVE_CLOSE &&
			stream->close_reason != TCP_PASSIVE_CLOSE) {
		thread_printf(mtcp, mtcp->log_fp,
				"Stream %d abnormally closed.\n", stream->id);
		DumpStream(mtcp, stream);
		DumpControlList(mtcp, mtcp->n_sender[0]);
	}
#endif

#ifdef STREAM
	uint8_t *sa, *da;
	sa = (uint8_t *)&stream->saddr;
	da = (uint8_t *)&stream->daddr;
	TRACE_STREAM("DESTROY TCP STREAM %d: "
			"%u.%u.%u.%u(%d) -> %u.%u.%u.%u(%d) (%s)\n", stream->id,
			sa[0], sa[1], sa[2], sa[3], ntohs(stream->sport),
			da[0], da[1], da[2], da[3], ntohs(stream->dport),
			close_reason_str[stream->close_reason]);
#endif

	if (stream->sndvar->sndbuf) {
		TRACE_FSTAT("Stream %d: send buffer "
				"cum_len: %lu, len: %u\n", stream->id,
				stream->sndvar->sndbuf->cum_len,
				stream->sndvar->sndbuf->len);
	}
	if (stream->rcvvar->rcvbuf) {
		TRACE_FSTAT("Stream %d: recv buffer "
				"cum_len: %lu, merged_len: %u, last_len: %u\n", stream->id,
				stream->rcvvar->rcvbuf->cum_len,
				stream->rcvvar->rcvbuf->merged_len,
				stream->rcvvar->rcvbuf->last_len);
	}

#if RTM_STAT
	/* Triple duplicated ack stats */
	if (stream->sndvar->rstat.tdp_ack_cnt) {
		TRACE_FSTAT("Stream %d: triple duplicated ack: %u, "
				"retransmission bytes: %u, average rtm bytes/ack: %u\n",
				stream->id,
				stream->sndvar->rstat.tdp_ack_cnt, stream->sndvar->rstat.tdp_ack_bytes,
				stream->sndvar->rstat.tdp_ack_bytes / stream->sndvar->rstat.tdp_ack_cnt);
	}

	/* Retransmission timeout stats */
	if (stream->sndvar->rstat.rto_cnt > 0) {
		TRACE_FSTAT("Stream %d: timeout count: %u, bytes: %u\n", stream->id,
				stream->sndvar->rstat.rto_cnt, stream->sndvar->rstat.rto_bytes);
	}

	/* Recovery stats */
	if (stream->sndvar->rstat.ack_upd_cnt) {
		TRACE_FSTAT("Stream %d: snd_nxt update count: %u, "
				"snd_nxt update bytes: %u, average update bytes/update: %u\n",
				stream->id,
				stream->sndvar->rstat.ack_upd_cnt, stream->sndvar->rstat.ack_upd_bytes,
				stream->sndvar->rstat.ack_upd_bytes / stream->sndvar->rstat.ack_upd_cnt);
	}
#if TCP_OPT_SACK_ENABLED
	if (stream->sndvar->rstat.sack_cnt) {
		TRACE_FSTAT("Selective ack count: %u, bytes: %u, "
				"average bytes/ack: %u\n",
				stream->sndvar->rstat.sack_cnt, stream->sndvar->rstat.sack_bytes,
				stream->sndvar->rstat.sack_bytes / stream->sndvar->rstat.sack_cnt);
	} else {
		TRACE_FSTAT("Selective ack count: %u, bytes: %u\n",
				stream->sndvar->rstat.sack_cnt, stream->sndvar->rstat.sack_bytes);
	}
	if (stream->sndvar->rstat.tdp_sack_cnt) {
		TRACE_FSTAT("Selective tdp ack count: %u, bytes: %u, "
				"average bytes/ack: %u\n",
				stream->sndvar->rstat.tdp_sack_cnt, stream->sndvar->rstat.tdp_sack_bytes,
				stream->sndvar->rstat.tdp_sack_bytes / stream->sndvar->rstat.tdp_sack_cnt);
	} else {
		TRACE_FSTAT("Selective ack count: %u, bytes: %u\n",
				stream->sndvar->rstat.tdp_sack_cnt, stream->sndvar->rstat.tdp_sack_bytes);
	}
#endif /* TCP_OPT_SACK_ENABLED */
#endif /* RTM_STAT */
#endif

	if (HAS_STREAM_TYPE(stream, MOS_SOCK_STREAM)) {
		/* stand-alone monitor does not need to do these */
		/* copy the bound address now; it is released after the stream
		 * memory is freed, at the end of this function */
		if (stream->is_bound_addr) {
			bound_addr = TRUE;
			addr.sin_addr.s_addr = stream->saddr;
			addr.sin_port = stream->sport;
		}

		RemoveFromControlList(mtcp, stream);
		RemoveFromSendList(mtcp, stream);
		RemoveFromACKList(mtcp, stream);

		if (stream->on_rto_idx >= 0)
			RemoveFromRTOList(mtcp, stream);

		/* NOTE(review): CreateTCPStream initializes read_lock for every
		 * stream type, but it is destroyed only here, for MOS_SOCK_STREAM
		 * streams — confirm this is harmless for monitor streams. */
		SBUF_LOCK_DESTROY(&stream->rcvvar->read_lock);
		SBUF_LOCK_DESTROY(&stream->sndvar->write_lock);

		assert(stream->on_hash_table == TRUE);

		/* free ring buffers */
		if (stream->sndvar->sndbuf) {
			SBFree(mtcp->rbm_snd, stream->sndvar->sndbuf);
			stream->sndvar->sndbuf = NULL;
		}
	}

	if (stream->on_timewait_list)
		RemoveFromTimewaitList(mtcp, stream);

	if (g_config.mos->tcp_timeout > 0)
		RemoveFromTimeoutList(mtcp, stream);

	if (stream->rcvvar->rcvbuf) {
		tcprb_del(stream->rcvvar->rcvbuf);
		stream->rcvvar->rcvbuf = NULL;
	}

	if (flow_lock)
		pthread_mutex_lock(&mtcp->ctx->flow_pool_lock);

	/* remove from flow hash table */
	HTRemove(mtcp->tcp_flow_table, stream);
	stream->on_hash_table = FALSE;

	mtcp->flow_cnt--;

	/* if there was a corresponding monitor stream socket opened
	 * then close it */
	/* NOTE(review): removes elements while iterating; presumably
	 * SOCKQ_FOREACH_START tolerates this — confirm against its definition. */
	SOCKQ_FOREACH_START(walk, &stream->msocks) {
		SOCKQ_REMOVE(&stream->msocks, walk);
		if (stream->pair_stream == NULL)
			DestroyMonitorStreamSocket(mtcp, walk);
	} SOCKQ_FOREACH_END;

	if (stream->pair_stream != NULL) {
		/* Nullify pointer to sibling tcp_stream's pair_stream */
		stream->pair_stream->pair_stream = NULL;
	}

	MPFreeChunk(mtcp->rv_pool, stream->rcvvar);
	MPFreeChunk(mtcp->sv_pool, stream->sndvar);
	MPFreeChunk(mtcp->flow_pool, stream);

	if (flow_lock)
		/* stand-alone monitor does not need this since it is single-threaded */
		pthread_mutex_unlock(&mtcp->ctx->flow_pool_lock);

	/* give the bound local address back to the pool it came from: the
	 * manager's own pool if present, else the per-interface pool */
	if (bound_addr) {
		if (mtcp->ap) {
			ret = FreeAddress(mtcp->ap, &addr);
		} else {
			int nif;
			nif = GetOutputInterface(addr.sin_addr.s_addr);
			ret = FreeAddress(ap[nif], &addr);
		}
		if (ret < 0) {
			TRACE_ERROR("(NEVER HAPPEN) Failed to free address.\n");
		}
	}

#ifdef NETSTAT
#if NETSTAT_PERTHREAD
	TRACE_STREAM("Destroyed. Remaining flows: %u\n", mtcp->flow_cnt);
#endif /* NETSTAT_PERTHREAD */
#endif /* NETSTAT */

}
935 /*---------------------------------------------------------------------------*/
936 void
937 DestroyTCPStream(mtcp_manager_t mtcp, tcp_stream *stream)
938 {
939 	tcp_stream *pair_stream = stream->pair_stream;
940 
941 	DestroySingleTCPStream(mtcp, stream);
942 
943 	if (pair_stream)
944 		DestroySingleTCPStream(mtcp, pair_stream);
945 }
946 /*---------------------------------------------------------------------------*/
947 void
948 DumpStream(mtcp_manager_t mtcp, tcp_stream *stream)
949 {
950 	uint8_t *sa, *da;
951 	struct tcp_send_vars *sndvar = stream->sndvar;
952 	struct tcp_recv_vars *rcvvar = stream->rcvvar;
953 
954 	sa = (uint8_t *)&stream->saddr;
955 	da = (uint8_t *)&stream->daddr;
956 	thread_printf(mtcp, mtcp->log_fp, "========== Stream %u: "
957 			"%u.%u.%u.%u(%u) -> %u.%u.%u.%u(%u) ==========\n", stream->id,
958 			sa[0], sa[1], sa[2], sa[3], ntohs(stream->sport),
959 			da[0], da[1], da[2], da[3], ntohs(stream->dport));
960 	thread_printf(mtcp, mtcp->log_fp,
961 			"Stream id: %u, type: %u, state: %s, close_reason: %s\n",
962 			stream->id, stream->stream_type,
963 			TCPStateToString(stream), close_reason_str[stream->close_reason]);
964 	if (stream->socket) {
965 		socket_map_t socket = stream->socket;
966 		thread_printf(mtcp, mtcp->log_fp, "Socket id: %d, type: %d, opts: %u\n"
967 				"epoll: %u (IN: %u, OUT: %u, ERR: %u, RDHUP: %u, ET: %u)\n"
968 				"events: %u (IN: %u, OUT: %u, ERR: %u, RDHUP: %u, ET: %u)\n",
969 				socket->id, socket->socktype, socket->opts,
970 				socket->epoll, socket->epoll & MOS_EPOLLIN,
971 				socket->epoll & MOS_EPOLLOUT, socket->epoll & MOS_EPOLLERR,
972 				socket->epoll & MOS_EPOLLRDHUP, socket->epoll & MOS_EPOLLET,
973 				socket->events, socket->events & MOS_EPOLLIN,
974 				socket->events & MOS_EPOLLOUT, socket->events & MOS_EPOLLERR,
975 				socket->events & MOS_EPOLLRDHUP, socket->events & MOS_EPOLLET);
976 	} else {
977 		thread_printf(mtcp, mtcp->log_fp, "Socket: (null)\n");
978 	}
979 
980 	thread_printf(mtcp, mtcp->log_fp,
981 			"on_hash_table: %u, on_control_list: %u (wait: %u), on_send_list: %u, "
982 			"on_ack_list: %u, is_wack: %u, ack_cnt: %u\n"
983 			"on_rto_idx: %d, on_timewait_list: %u, on_timeout_list: %u, "
984 			"on_rcv_br_list: %u, on_snd_br_list: %u\n"
985 			"on_sendq: %u, on_ackq: %u, closed: %u, on_closeq: %u, "
986 			"on_closeq_int: %u, on_resetq: %u, on_resetq_int: %u\n"
987 			"have_reset: %u, is_fin_sent: %u, is_fin_ackd: %u, "
988 			"saw_timestamp: %u, sack_permit: %u, "
989 			"is_bound_addr: %u, need_wnd_adv: %u\n", stream->on_hash_table,
990 			sndvar->on_control_list, stream->control_list_waiting, sndvar->on_send_list,
991 			sndvar->on_ack_list, sndvar->is_wack, sndvar->ack_cnt,
992 			stream->on_rto_idx, stream->on_timewait_list, stream->on_timeout_list,
993 			stream->on_rcv_br_list, stream->on_snd_br_list,
994 			sndvar->on_sendq, sndvar->on_ackq,
995 			stream->closed, sndvar->on_closeq, sndvar->on_closeq_int,
996 			sndvar->on_resetq, sndvar->on_resetq_int,
997 			stream->have_reset, sndvar->is_fin_sent,
998 			sndvar->is_fin_ackd, stream->saw_timestamp, stream->sack_permit,
999 			stream->is_bound_addr, stream->need_wnd_adv);
1000 
1001 	thread_printf(mtcp, mtcp->log_fp, "========== Send variables ==========\n");
1002 	thread_printf(mtcp, mtcp->log_fp,
1003 		      "ip_id: %u, mss: %u, eff_mss: %u, wscale(me, peer): (%u, %u), nif_out: %d\n",
1004 		      sndvar->ip_id, sndvar->mss, sndvar->eff_mss,
1005 		      sndvar->wscale_mine, sndvar->wscale_peer, sndvar->nif_out);
1006 	thread_printf(mtcp, mtcp->log_fp,
1007 			"snd_nxt: %u, snd_una: %u, iss: %u, fss: %u\nsnd_wnd: %u, "
1008 			"peer_wnd: %u, cwnd: %u, ssthresh: %u\n",
1009 			stream->snd_nxt, sndvar->snd_una, sndvar->iss, sndvar->fss,
1010 			sndvar->snd_wnd, sndvar->peer_wnd, sndvar->cwnd, sndvar->ssthresh);
1011 
1012 	if (sndvar->sndbuf) {
1013 		thread_printf(mtcp, mtcp->log_fp,
1014 				"Send buffer: init_seq: %u, head_seq: %u, "
1015 				"len: %d, cum_len: %lu, size: %d\n",
1016 				sndvar->sndbuf->init_seq, sndvar->sndbuf->head_seq,
1017 				sndvar->sndbuf->len, sndvar->sndbuf->cum_len, sndvar->sndbuf->size);
1018 	} else {
1019 		thread_printf(mtcp, mtcp->log_fp, "Send buffer: (null)\n");
1020 	}
1021 	thread_printf(mtcp, mtcp->log_fp,
1022 			"nrtx: %u, max_nrtx: %u, rto: %u, ts_rto: %u, "
1023 			"ts_lastack_sent: %u\n", sndvar->nrtx, sndvar->max_nrtx,
1024 			sndvar->rto, sndvar->ts_rto, sndvar->ts_lastack_sent);
1025 
1026 	thread_printf(mtcp, mtcp->log_fp,
1027 			"========== Receive variables ==========\n");
1028 	thread_printf(mtcp, mtcp->log_fp,
1029 			"rcv_nxt: %u, irs: %u, rcv_wnd: %u, "
1030 			"snd_wl1: %u, snd_wl2: %u\n",
1031 			stream->rcv_nxt, rcvvar->irs,
1032 			rcvvar->rcv_wnd, rcvvar->snd_wl1, rcvvar->snd_wl2);
1033 	if (!rcvvar->rcvbuf) {
1034 		thread_printf(mtcp, mtcp->log_fp, "Receive buffer: (null)\n");
1035 	}
1036 	thread_printf(mtcp, mtcp->log_fp, "last_ack_seq: %u, dup_acks: %u\n",
1037 			rcvvar->last_ack_seq, rcvvar->dup_acks);
1038 	thread_printf(mtcp, mtcp->log_fp,
1039 			"ts_recent: %u, ts_lastack_rcvd: %u, ts_last_ts_upd: %u, "
1040 			"ts_tw_expire: %u\n", rcvvar->ts_recent, rcvvar->ts_lastack_rcvd,
1041 			rcvvar->ts_last_ts_upd, rcvvar->ts_tw_expire);
1042 	thread_printf(mtcp, mtcp->log_fp,
1043 			"srtt: %u, mdev: %u, mdev_max: %u, rttvar: %u, rtt_seq: %u\n",
1044 			rcvvar->srtt, rcvvar->mdev, rcvvar->mdev_max,
1045 			rcvvar->rttvar, rcvvar->rtt_seq);
1046 }
1047 /*---------------------------------------------------------------------------*/
1048