xref: /mOS-networking-stack/core/src/tcp_in.c (revision 152f7c19)
1 #include <assert.h>
2 
3 #include "mos_api.h"
4 #include "tcp_util.h"
5 #include "tcp_in.h"
6 #include "tcp_out.h"
7 #include "tcp_ring_buffer.h"
8 #include "eventpoll.h"
9 #include "debug.h"
10 #include "timer.h"
11 #include "ip_in.h"
12 #include "tcp_rb.h"
13 #include "config.h"
14 #include "scalable_event.h"
15 
/* Generic min/max helpers.
 * NOTE: each argument may be evaluated twice; never pass expressions with
 * side effects. */
#define MAX(a, b) (((a) > (b)) ? (a) : (b))
#define MIN(a, b) (((a) < (b)) ? (a) : (b))

/* Compile-time switches consulted by ProcessACK() */
#define RECOVERY_AFTER_LOSS TRUE
#define SELECTIVE_WRITE_EVENT_NOTIFY TRUE
21 /*----------------------------------------------------------------------------*/
22 static inline void
23 Handle_TCP_ST_ESTABLISHED (mtcp_manager_t mtcp, tcp_stream* cur_stream,
24 		struct pkt_ctx *pctx);
25 /*----------------------------------------------------------------------------*/
26 static inline int
27 FilterSYNPacket(mtcp_manager_t mtcp, uint32_t ip, uint16_t port)
28 {
29 	struct sockaddr_in *addr;
30 
31 	/* TODO: This listening logic should be revised */
32 
33 	/* if not listening, drop */
34 	if (!mtcp->listener) {
35 		return FALSE;
36 	}
37 
38 	/* if not the address we want, drop */
39 	addr = &mtcp->listener->socket->saddr;
40 	if (addr->sin_port == port) {
41 		if (addr->sin_addr.s_addr != INADDR_ANY) {
42 			if (ip == addr->sin_addr.s_addr) {
43 				return TRUE;
44 			}
45 			return FALSE;
46 		} else {
47 			int i;
48 
49 			for (i = 0; i < g_config.mos->netdev_table->num; i++) {
50 				if (ip == g_config.mos->netdev_table->ent[i]->ip_addr) {
51 					return TRUE;
52 				}
53 			}
54 			return FALSE;
55 		}
56 	}
57 
58 	return FALSE;
59 }
60 /*----------------------------------------------------------------------------*/
61 static inline int
62 HandleActiveOpen(mtcp_manager_t mtcp, tcp_stream *cur_stream,
63 		struct pkt_ctx *pctx)
64 {
65 	const struct tcphdr* tcph = pctx->p.tcph;
66 
67 	cur_stream->rcvvar->irs = pctx->p.seq;
68 	cur_stream->snd_nxt = pctx->p.ack_seq;
69 	cur_stream->sndvar->peer_wnd = pctx->p.window;
70 	cur_stream->rcvvar->snd_wl1 = cur_stream->rcvvar->irs - 1;
71 	cur_stream->rcv_nxt = cur_stream->rcvvar->irs + 1;
72 	cur_stream->rcvvar->last_ack_seq = pctx->p.ack_seq;
73 	ParseTCPOptions(cur_stream, pctx->p.cur_ts, (uint8_t *)tcph + TCP_HEADER_LEN,
74 			(tcph->doff << 2) - TCP_HEADER_LEN);
75 	cur_stream->sndvar->cwnd = ((cur_stream->sndvar->cwnd == 1)?
76 			(cur_stream->sndvar->mss * 2): cur_stream->sndvar->mss);
77 	cur_stream->sndvar->ssthresh = cur_stream->sndvar->mss * 10;
78 	if (HAS_STREAM_TYPE(cur_stream, MOS_SOCK_STREAM))
79 		UpdateRetransmissionTimer(mtcp, cur_stream, pctx->p.cur_ts);
80 
81 	return TRUE;
82 }
83 /*----------------------------------------------------------------------------*/
84 /* ValidateSequence: validates sequence number of the segment                 */
85 /* Return: TRUE if acceptable, FALSE if not acceptable                        */
86 /*----------------------------------------------------------------------------*/
87 static inline int
88 ValidateSequence(mtcp_manager_t mtcp, tcp_stream *cur_stream,
89 		struct pkt_ctx *pctx)
90 {
91 	const struct tcphdr* tcph = pctx->p.tcph;
92 
93 	/* Protect Against Wrapped Sequence number (PAWS) */
94 	if (!tcph->rst && cur_stream->saw_timestamp) {
95 		struct tcp_timestamp ts;
96 
97 		if (!ParseTCPTimestamp(cur_stream, &ts,
98 				(uint8_t *)tcph + TCP_HEADER_LEN,
99 				(tcph->doff << 2) - TCP_HEADER_LEN)) {
100 			/* if there is no timestamp */
101 			/* TODO: implement here */
102 			TRACE_DBG("No timestamp found.\n");
103 			return FALSE;
104 		}
105 
106 		/* RFC1323: if SEG.TSval < TS.Recent, drop and send ack */
107 		if (TCP_SEQ_LT(ts.ts_val, cur_stream->rcvvar->ts_recent)) {
108 			/* TODO: ts_recent should be invalidated
109 					 before timestamp wraparound for long idle flow */
110 			TRACE_DBG("PAWS Detect wrong timestamp. "
111 					"seq: %u, ts_val: %u, prev: %u\n",
112 					pctx->p.seq, ts.ts_val, cur_stream->rcvvar->ts_recent);
113 			cur_stream->actions |= MOS_ACT_SEND_ACK_NOW;
114 			return FALSE;
115 		} else {
116 			/* valid timestamp */
117 			if (TCP_SEQ_GT(ts.ts_val, cur_stream->rcvvar->ts_recent)) {
118 				TRACE_TSTAMP("Timestamp update. cur: %u, prior: %u "
119 					"(time diff: %uus)\n",
120 					ts.ts_val, cur_stream->rcvvar->ts_recent,
121 					TS_TO_USEC(pctx->p.cur_ts - cur_stream->rcvvar->ts_last_ts_upd));
122 				cur_stream->rcvvar->ts_last_ts_upd = pctx->p.cur_ts;
123 			}
124 
125 			cur_stream->rcvvar->ts_recent = ts.ts_val;
126 			cur_stream->rcvvar->ts_lastack_rcvd = ts.ts_ref;
127 		}
128 	}
129 
130 	/* TCP sequence validation */
131 	if (!TCP_SEQ_BETWEEN(pctx->p.seq + pctx->p.payloadlen, cur_stream->rcv_nxt,
132 				cur_stream->rcv_nxt + cur_stream->rcvvar->rcv_wnd)) {
133 
134 		/* if RST bit is set, ignore the segment */
135 		if (tcph->rst)
136 			return FALSE;
137 
138 		if (cur_stream->state == TCP_ST_ESTABLISHED) {
139 			/* check if it is to get window advertisement */
140 			if (pctx->p.seq + 1 == cur_stream->rcv_nxt) {
141 				TRACE_DBG("Window update request. (seq: %u, rcv_wnd: %u)\n",
142 						pctx->p.seq, cur_stream->rcvvar->rcv_wnd);
143 				cur_stream->actions |= MOS_ACT_SEND_ACK_AGG;
144 				return FALSE;
145 
146 			}
147 
148 			if (TCP_SEQ_LEQ(pctx->p.seq, cur_stream->rcv_nxt)) {
149 				cur_stream->actions |= MOS_ACT_SEND_ACK_AGG;
150 			} else {
151 				cur_stream->actions |= MOS_ACT_SEND_ACK_NOW;
152 			}
153 		} else {
154 			if (cur_stream->state == TCP_ST_TIME_WAIT) {
155 				TRACE_DBG("Stream %d: tw expire update to %u\n",
156 						cur_stream->id, cur_stream->rcvvar->ts_tw_expire);
157 				AddtoTimewaitList(mtcp, cur_stream, pctx->p.cur_ts);
158 			}
159 			cur_stream->actions |= MOS_ACT_SEND_CONTROL;
160 		}
161 		return FALSE;
162 	}
163 
164 	return TRUE;
165 }
166 /*----------------------------------------------------------------------------*/
167 static inline void
168 NotifyConnectionReset(mtcp_manager_t mtcp, tcp_stream *cur_stream)
169 {
170 	TRACE_DBG("Stream %d: Notifying connection reset.\n", cur_stream->id);
171 	/* TODO: implement this function */
172 	/* signal to user "connection reset" */
173 }
174 /*----------------------------------------------------------------------------*/
175 static inline int
176 ProcessRST(mtcp_manager_t mtcp, tcp_stream *cur_stream,
177 		struct pkt_ctx *pctx)
178 {
179 	/* TODO: we need reset validation logic */
180 	/* the sequence number of a RST should be inside window */
181 	/* (in SYN_SENT state, it should ack the previous SYN */
182 
183 	TRACE_DBG("Stream %d: TCP RESET (%s)\n",
184 			cur_stream->id, TCPStateToString(cur_stream));
185 #if DUMP_STREAM
186 	DumpStream(mtcp, cur_stream);
187 #endif
188 
189 	if (cur_stream->state <= TCP_ST_SYN_SENT) {
190 		/* not handled here */
191 		return FALSE;
192 	}
193 
194 	if (cur_stream->state == TCP_ST_SYN_RCVD) {
195 		/* ACK number of last sent ACK packet == rcv_nxt + 1*/
196 		if (pctx->p.seq == 0 ||
197 #ifdef BE_RESILIENT_TO_PACKET_DROP
198 			pctx->p.seq == cur_stream->rcv_nxt + 1 ||
199 #endif
200 			pctx->p.ack_seq == cur_stream->snd_nxt)
201 		{
202 			cur_stream->state = TCP_ST_CLOSED_RSVD;
203 			cur_stream->cb_events |= MOS_ON_TCP_STATE_CHANGE;
204 			cur_stream->close_reason = TCP_RESET;
205 			cur_stream->actions |= MOS_ACT_DESTROY;
206 		} else {
207 			RAISE_DEBUG_EVENT(mtcp, cur_stream,
208 				"(SYN_RCVD): Ignore invalid RST. "
209 				"ack_seq expected: %u, ack_seq rcvd: %u\n",
210 				cur_stream->rcv_nxt + 1, pctx->p.ack_seq);
211 		}
212 		return TRUE;
213 	}
214 
215 	/* if the application is already closed the connection,
216 	   just destroy the it */
217 	if (cur_stream->state == TCP_ST_FIN_WAIT_1 ||
218 			cur_stream->state == TCP_ST_FIN_WAIT_2 ||
219 			cur_stream->state == TCP_ST_LAST_ACK ||
220 			cur_stream->state == TCP_ST_CLOSING ||
221 			cur_stream->state == TCP_ST_TIME_WAIT) {
222 		cur_stream->state = TCP_ST_CLOSED_RSVD;
223 		cur_stream->close_reason = TCP_ACTIVE_CLOSE;
224 		cur_stream->cb_events |= MOS_ON_TCP_STATE_CHANGE;
225 		cur_stream->actions |= MOS_ACT_DESTROY;
226 		return TRUE;
227 	}
228 
229 	if (cur_stream->state >= TCP_ST_ESTABLISHED &&
230 			cur_stream->state <= TCP_ST_CLOSE_WAIT) {
231 		/* ESTABLISHED, FIN_WAIT_1, FIN_WAIT_2, CLOSE_WAIT */
232 		/* TODO: flush all the segment queues */
233 		//NotifyConnectionReset(mtcp, cur_stream);
234 	}
235 
236 	if (!(cur_stream->sndvar->on_closeq || cur_stream->sndvar->on_closeq_int ||
237 		  cur_stream->sndvar->on_resetq || cur_stream->sndvar->on_resetq_int)) {
238 		//cur_stream->state = TCP_ST_CLOSED_RSVD;
239 		//cur_stream->actions |= MOS_ACT_DESTROY;
240 		cur_stream->state = TCP_ST_CLOSED_RSVD;
241 		cur_stream->cb_events |= MOS_ON_TCP_STATE_CHANGE;
242 		cur_stream->close_reason = TCP_RESET;
243 		if (HAS_STREAM_TYPE(cur_stream, MOS_SOCK_STREAM))
244 			RaiseCloseEvent(mtcp, cur_stream);
245 	}
246 
247 	return TRUE;
248 }
249 /*----------------------------------------------------------------------------*/
250 inline void
251 EstimateRTT(mtcp_manager_t mtcp, tcp_stream *cur_stream, uint32_t mrtt)
252 {
253 	/* This function should be called for not retransmitted packets */
254 	/* TODO: determine tcp_rto_min */
255 #define TCP_RTO_MIN 0
256 	long m = mrtt;
257 	uint32_t tcp_rto_min = TCP_RTO_MIN;
258 	struct tcp_recv_vars *rcvvar = cur_stream->rcvvar;
259 
260 	if (m == 0) {
261 		m = 1;
262 	}
263 	if (rcvvar->srtt != 0) {
264 		/* rtt = 7/8 rtt + 1/8 new */
265 		m -= (rcvvar->srtt >> 3);
266 		rcvvar->srtt += m;
267 		if (m < 0) {
268 			m = -m;
269 			m -= (rcvvar->mdev >> 2);
270 			if (m > 0) {
271 				m >>= 3;
272 			}
273 		} else {
274 			m -= (rcvvar->mdev >> 2);
275 		}
276 		rcvvar->mdev += m;
277 		if (rcvvar->mdev > rcvvar->mdev_max) {
278 			rcvvar->mdev_max = rcvvar->mdev;
279 			if (rcvvar->mdev_max > rcvvar->rttvar) {
280 				rcvvar->rttvar = rcvvar->mdev_max;
281 			}
282 		}
283 		if (TCP_SEQ_GT(cur_stream->sndvar->snd_una, rcvvar->rtt_seq)) {
284 			if (rcvvar->mdev_max < rcvvar->rttvar) {
285 				rcvvar->rttvar -= (rcvvar->rttvar - rcvvar->mdev_max) >> 2;
286 			}
287 			rcvvar->rtt_seq = cur_stream->snd_nxt;
288 			rcvvar->mdev_max = tcp_rto_min;
289 		}
290 	} else {
291 		/* fresh measurement */
292 		rcvvar->srtt = m << 3;
293 		rcvvar->mdev = m << 1;
294 		rcvvar->mdev_max = rcvvar->rttvar = MAX(rcvvar->mdev, tcp_rto_min);
295 		rcvvar->rtt_seq = cur_stream->snd_nxt;
296 	}
297 
298 	TRACE_RTT("mrtt: %u (%uus), srtt: %u (%ums), mdev: %u, mdev_max: %u, "
299 			"rttvar: %u, rtt_seq: %u\n", mrtt, mrtt * TIME_TICK,
300 			rcvvar->srtt, TS_TO_MSEC((rcvvar->srtt) >> 3), rcvvar->mdev,
301 			rcvvar->mdev_max, rcvvar->rttvar, rcvvar->rtt_seq);
302 }
303 /*----------------------------------------------------------------------------*/
304 static inline void
305 ProcessACK(mtcp_manager_t mtcp, tcp_stream *cur_stream,
306 		struct pkt_ctx *pctx)
307 {
308 	const struct tcphdr* tcph = pctx->p.tcph;
309 	uint32_t seq = pctx->p.seq;
310 	uint32_t ack_seq = pctx->p.ack_seq;
311 	struct tcp_send_vars *sndvar = cur_stream->sndvar;
312 	uint32_t cwindow, cwindow_prev;
313 	uint32_t rmlen;
314 	uint32_t snd_wnd_prev;
315 	uint32_t right_wnd_edge;
316 	uint8_t dup;
317 
318 	cwindow = pctx->p.window;
319 	if (!tcph->syn) {
320 		cwindow = cwindow << sndvar->wscale_peer;
321 	}
322 	right_wnd_edge = sndvar->peer_wnd + cur_stream->rcvvar->snd_wl2;
323 
324 	/* If ack overs the sending buffer, return */
325 	if (cur_stream->state == TCP_ST_FIN_WAIT_1 ||
326 			cur_stream->state == TCP_ST_FIN_WAIT_2 ||
327 			cur_stream->state == TCP_ST_CLOSING ||
328 			cur_stream->state == TCP_ST_CLOSE_WAIT ||
329 			cur_stream->state == TCP_ST_LAST_ACK) {
330 		if (sndvar->is_fin_sent && ack_seq == sndvar->fss + 1) {
331 			ack_seq--;
332 		}
333 	}
334 
335 	if (TCP_SEQ_GT(ack_seq, sndvar->sndbuf->head_seq + sndvar->sndbuf->len)) {
336 		TRACE_DBG("Stream %d (%s): invalid acknologement. "
337 				"ack_seq: %u, possible max_ack_seq: %u\n", cur_stream->id,
338 				TCPStateToString(cur_stream), ack_seq,
339 				sndvar->sndbuf->head_seq + sndvar->sndbuf->len);
340 		return;
341 	}
342 
343 #ifdef BE_RESILIENT_TO_PACKET_DROP
344 	if (TCP_SEQ_GT(seq + pctx->p.payloadlen, cur_stream->rcv_nxt))
345 		cur_stream->rcv_nxt = seq + pctx->p.payloadlen;
346 #endif
347 
348 	/* Update window */
349 	if (TCP_SEQ_LT(cur_stream->rcvvar->snd_wl1, seq) ||
350 			(cur_stream->rcvvar->snd_wl1 == seq &&
351 			TCP_SEQ_LT(cur_stream->rcvvar->snd_wl2, ack_seq)) ||
352 			(cur_stream->rcvvar->snd_wl2 == ack_seq &&
353 			cwindow > sndvar->peer_wnd)) {
354 		cwindow_prev = sndvar->peer_wnd;
355 		sndvar->peer_wnd = cwindow;
356 		cur_stream->rcvvar->snd_wl1 = seq;
357 		cur_stream->rcvvar->snd_wl2 = ack_seq;
358 		TRACE_CLWND("Window update. "
359 				"ack: %u, peer_wnd: %u, snd_nxt-snd_una: %u\n",
360 				ack_seq, cwindow, cur_stream->snd_nxt - sndvar->snd_una);
361 		if (cwindow_prev < cur_stream->snd_nxt - sndvar->snd_una &&
362 				sndvar->peer_wnd >= cur_stream->snd_nxt - sndvar->snd_una) {
363 			TRACE_CLWND("%u Broadcasting client window update! "
364 					"ack_seq: %u, peer_wnd: %u (before: %u), "
365 					"(snd_nxt - snd_una: %u)\n",
366 					cur_stream->id, ack_seq, sndvar->peer_wnd, cwindow_prev,
367 					cur_stream->snd_nxt - sndvar->snd_una);
368 			if (HAS_STREAM_TYPE(cur_stream, MOS_SOCK_STREAM))
369 				RaiseWriteEvent(mtcp, cur_stream);
370 		}
371 	}
372 
373 	/* Check duplicated ack count */
374 	/* Duplicated ack if
375 	   1) ack_seq is old
376 	   2) payload length is 0.
377 	   3) advertised window not changed.
378 	   4) there is outstanding unacknowledged data
379 	   5) ack_seq == snd_una
380 	 */
381 
382 	dup = FALSE;
383 	if (TCP_SEQ_LT(ack_seq, cur_stream->snd_nxt)) {
384 		if (ack_seq == cur_stream->rcvvar->last_ack_seq && pctx->p.payloadlen == 0) {
385 			if (cur_stream->rcvvar->snd_wl2 + sndvar->peer_wnd == right_wnd_edge) {
386 				if (cur_stream->rcvvar->dup_acks + 1 > cur_stream->rcvvar->dup_acks) {
387 					cur_stream->rcvvar->dup_acks++;
388 				}
389 				dup = TRUE;
390 			}
391 		}
392 	}
393 	if (!dup) {
394 		cur_stream->rcvvar->dup_acks = 0;
395 		cur_stream->rcvvar->last_ack_seq = ack_seq;
396 	}
397 
398 	/* Fast retransmission */
399 	if (dup && cur_stream->rcvvar->dup_acks == 3) {
400 		TRACE_LOSS("Triple duplicated ACKs!! ack_seq: %u\n", ack_seq);
401 		if (TCP_SEQ_LT(ack_seq, cur_stream->snd_nxt)) {
402 			TRACE_LOSS("Reducing snd_nxt from %u to %u\n",
403 					cur_stream->snd_nxt, ack_seq);
404 #if RTM_STAT
405 			sndvar->rstat.tdp_ack_cnt++;
406 			sndvar->rstat.tdp_ack_bytes += (cur_stream->snd_nxt - ack_seq);
407 #endif
408 			if (ack_seq != sndvar->snd_una) {
409 				TRACE_DBG("ack_seq and snd_una mismatch on tdp ack. "
410 						"ack_seq: %u, snd_una: %u\n",
411 						ack_seq, sndvar->snd_una);
412 			}
413 			cur_stream->snd_nxt = ack_seq;
414 		}
415 
416 		/* update congestion control variables */
417 		/* ssthresh to half of min of cwnd and peer wnd */
418 		sndvar->ssthresh = MIN(sndvar->cwnd, sndvar->peer_wnd) / 2;
419 		if (sndvar->ssthresh < 2 * sndvar->mss) {
420 			sndvar->ssthresh = 2 * sndvar->mss;
421 		}
422 		sndvar->cwnd = sndvar->ssthresh + 3 * sndvar->mss;
423 		TRACE_CONG("Fast retransmission. cwnd: %u, ssthresh: %u\n",
424 				sndvar->cwnd, sndvar->ssthresh);
425 
426 		/* count number of retransmissions */
427 		if (sndvar->nrtx < TCP_MAX_RTX) {
428 			sndvar->nrtx++;
429 		} else {
430 			TRACE_DBG("Exceed MAX_RTX.\n");
431 		}
432 
433 		cur_stream->actions |= MOS_ACT_SEND_DATA;
434 
435 	} else if (cur_stream->rcvvar->dup_acks > 3) {
436 		/* Inflate congestion window until before overflow */
437 		if ((uint32_t)(sndvar->cwnd + sndvar->mss) > sndvar->cwnd) {
438 			sndvar->cwnd += sndvar->mss;
439 			TRACE_CONG("Dupack cwnd inflate. cwnd: %u, ssthresh: %u\n",
440 					sndvar->cwnd, sndvar->ssthresh);
441 		}
442 	}
443 
444 #if TCP_OPT_SACK_ENABLED
445 	ParseSACKOption(cur_stream, ack_seq, (uint8_t *)tcph + TCP_HEADER_LEN,
446 			(tcph->doff << 2) - TCP_HEADER_LEN);
447 #endif /* TCP_OPT_SACK_ENABLED */
448 
449 #if RECOVERY_AFTER_LOSS
450 	/* updating snd_nxt (when recovered from loss) */
451 	if (TCP_SEQ_GT(ack_seq, cur_stream->snd_nxt)) {
452 #if RTM_STAT
453 		sndvar->rstat.ack_upd_cnt++;
454 		sndvar->rstat.ack_upd_bytes += (ack_seq - cur_stream->snd_nxt);
455 #endif
456 		TRACE_LOSS("Updating snd_nxt from %u to %u\n",
457 				cur_stream->snd_nxt, ack_seq);
458 		cur_stream->snd_nxt = ack_seq;
459 		if (sndvar->sndbuf->len == 0) {
460 			if (HAS_STREAM_TYPE(cur_stream, MOS_SOCK_STREAM))
461 				RemoveFromSendList(mtcp, cur_stream);
462 		}
463 	}
464 #endif
465 
466 	/* If ack_seq is previously acked, return */
467 	if (TCP_SEQ_GEQ(sndvar->sndbuf->head_seq, ack_seq)) {
468 		return;
469 	}
470 
471 	/* Remove acked sequence from send buffer */
472 	rmlen = ack_seq - sndvar->sndbuf->head_seq;
473 	if (rmlen > 0) {
474 		/* Routine goes here only if there is new payload (not retransmitted) */
475 		uint16_t packets;
476 
477 		/* If acks new data */
478 		packets = rmlen / sndvar->eff_mss;
479 		if ((rmlen / sndvar->eff_mss) * sndvar->eff_mss > rmlen) {
480 			packets++;
481 		}
482 
483 		/* Estimate RTT and calculate rto */
484 		if (cur_stream->saw_timestamp) {
485 			EstimateRTT(mtcp, cur_stream,
486 					pctx->p.cur_ts - cur_stream->rcvvar->ts_lastack_rcvd);
487 			sndvar->rto = (cur_stream->rcvvar->srtt >> 3) + cur_stream->rcvvar->rttvar;
488 			assert(sndvar->rto > 0);
489 		} else {
490 			//TODO: Need to implement timestamp estimation without timestamp
491 			TRACE_RTT("NOT IMPLEMENTED.\n");
492 		}
493 
494 		/* Update congestion control variables */
495 		if (cur_stream->state >= TCP_ST_ESTABLISHED) {
496 			if (sndvar->cwnd < sndvar->ssthresh) {
497 				if ((sndvar->cwnd + sndvar->mss) > sndvar->cwnd) {
498 					sndvar->cwnd += (sndvar->mss * packets);
499 				}
500 				TRACE_CONG("slow start cwnd: %u, ssthresh: %u\n",
501 						sndvar->cwnd, sndvar->ssthresh);
502 			} else {
503 				uint32_t new_cwnd = sndvar->cwnd +
504 						packets * sndvar->mss * sndvar->mss /
505 						sndvar->cwnd;
506 				if (new_cwnd > sndvar->cwnd) {
507 					sndvar->cwnd = new_cwnd;
508 				}
509 				//TRACE_CONG("congestion avoidance cwnd: %u, ssthresh: %u\n",
510 				//		sndvar->cwnd, sndvar->ssthresh);
511 			}
512 		}
513 
514 		if (SBUF_LOCK(&sndvar->write_lock)) {
515 			if (errno == EDEADLK)
516 				perror("ProcessACK: write_lock blocked\n");
517 			assert(0);
518 		}
519 		if (HAS_STREAM_TYPE(cur_stream, MOS_SOCK_STREAM))
520 			SBRemove(mtcp->rbm_snd, sndvar->sndbuf, rmlen);
521 		sndvar->snd_una = ack_seq;
522 		snd_wnd_prev = sndvar->snd_wnd;
523 		sndvar->snd_wnd = sndvar->sndbuf->size - sndvar->sndbuf->len;
524 
525 		/* If there was no available sending window */
526 		/* notify the newly available window to application */
527 #if SELECTIVE_WRITE_EVENT_NOTIFY
528 		if (snd_wnd_prev <= 0) {
529 #endif /* SELECTIVE_WRITE_EVENT_NOTIFY */
530 			if (HAS_STREAM_TYPE(cur_stream, MOS_SOCK_STREAM))
531 				RaiseWriteEvent(mtcp, cur_stream);
532 #if SELECTIVE_WRITE_EVENT_NOTIFY
533 		}
534 #endif /* SELECTIVE_WRITE_EVENT_NOTIFY */
535 
536 		SBUF_UNLOCK(&sndvar->write_lock);
537 		if (HAS_STREAM_TYPE(cur_stream, MOS_SOCK_STREAM))
538 			UpdateRetransmissionTimer(mtcp, cur_stream, pctx->p.cur_ts);
539 	}
540 }
541 /*----------------------------------------------------------------------------*/
542 /* ProcessTCPPayload: merges TCP payload using receive ring buffer            */
543 /* Return: TRUE (1) in normal case, FALSE (0) if immediate ACK is required    */
544 /* CAUTION: should only be called at ESTABLISHED, FIN_WAIT_1, FIN_WAIT_2      */
545 /*----------------------------------------------------------------------------*/
546 static inline int
547 ProcessTCPPayload(mtcp_manager_t mtcp, tcp_stream *cur_stream,
548 				struct pkt_ctx *pctx)
549 {
550 	struct tcp_recv_vars *rcvvar = cur_stream->rcvvar;
551 	uint32_t prev_rcv_nxt;
552 	int ret = -1;
553 	bool read_lock;
554 	struct socket_map *walk;
555 
556 	if (!cur_stream->buffer_mgmt)
557 		return FALSE;
558 
559 	/* if seq and segment length is lower than rcv_nxt, ignore and send ack */
560 	if (TCP_SEQ_LT(pctx->p.seq + pctx->p.payloadlen, cur_stream->rcv_nxt)) {
561 		SOCKQ_FOREACH_START(walk, &cur_stream->msocks) {
562 			HandleCallback(mtcp, MOS_NULL, walk, cur_stream->side,
563 				       pctx, MOS_ON_ERROR);
564 		} SOCKQ_FOREACH_END;
565 		return FALSE;
566 	}
567 	/* if payload exceeds receiving buffer, drop and send ack */
568 	if (TCP_SEQ_GT(pctx->p.seq + pctx->p.payloadlen, cur_stream->rcv_nxt + rcvvar->rcv_wnd)) {
569 		SOCKQ_FOREACH_START(walk, &cur_stream->msocks) {
570 			HandleCallback(mtcp, MOS_NULL, walk, cur_stream->side,
571 				       pctx, MOS_ON_ERROR);
572 		} SOCKQ_FOREACH_END;
573 		return FALSE;
574 	}
575 
576 	/* allocate receive buffer if not exist */
577 	if (!rcvvar->rcvbuf) {
578 		rcvvar->rcvbuf = tcprb_new(mtcp->bufseg_pool, g_config.mos->rmem_size, cur_stream->buffer_mgmt);
579 		if (!rcvvar->rcvbuf) {
580 			TRACE_ERROR("Stream %d: Failed to allocate receive buffer.\n",
581 				    cur_stream->id);
582 			cur_stream->state = TCP_ST_CLOSED_RSVD;
583 			cur_stream->close_reason = TCP_NO_MEM;
584 			cur_stream->cb_events |= MOS_ON_TCP_STATE_CHANGE;
585 			if (HAS_STREAM_TYPE(cur_stream, MOS_SOCK_STREAM))
586 				RaiseErrorEvent(mtcp, cur_stream);
587 
588 			return ERROR;
589 		}
590 	}
591 
592 	read_lock = HAS_STREAM_TYPE(cur_stream, MOS_SOCK_STREAM);
593 
594 	if (read_lock && SBUF_LOCK(&rcvvar->read_lock)) {
595 		if (errno == EDEADLK)
596 			perror("ProcessTCPPayload: read_lock blocked\n");
597 		assert(0);
598 	}
599 
600 	prev_rcv_nxt = cur_stream->rcv_nxt;
601 
602 	tcprb_t *rb = rcvvar->rcvbuf;
603 	loff_t off = seq2loff(rb, pctx->p.seq, (rcvvar->irs + 1));
604 	if (off >= 0) {
605 		if (!HAS_STREAM_TYPE(cur_stream, MOS_SOCK_STREAM) &&
606 			HAS_STREAM_TYPE(cur_stream, MOS_SOCK_MONITOR_STREAM_ACTIVE))
607 			tcprb_setpile(rb, rb->pile + tcprb_cflen(rb));
608 		ret = tcprb_pwrite(rb, pctx->p.payload, pctx->p.payloadlen, off);
609 		if (ret < 0) {
610 			/* We try again after warning this result to the user. */
611 			SOCKQ_FOREACH_START(walk, &cur_stream->msocks) {
612 				HandleCallback(mtcp, MOS_NULL, walk, cur_stream->side,
613 						   pctx, MOS_ON_ERROR);
614 			} SOCKQ_FOREACH_END;
615 			ret = tcprb_pwrite(rb, pctx->p.payload, pctx->p.payloadlen, off);
616 		}
617 	}
618 	/* TODO: update monitor vars */
619 
620 	/*
621 	 * error can pop up due to disabled buffered management
622 	 * (only in monitor mode). In that case, ignore the warning
623 	 * message.
624 	 */
625 	if (ret < 0 && cur_stream->buffer_mgmt && mtcp->num_msp == 0)
626 		TRACE_ERROR("Cannot merge payload. reason: %d\n", ret);
627 
628 	/* discard the buffer if the state is FIN_WAIT_1 or FIN_WAIT_2,
629 	   meaning that the connection is already closed by the application */
630 	loff_t cftail = rb->pile + tcprb_cflen(rb);
631 	if (cur_stream->state == TCP_ST_FIN_WAIT_1 ||
632 	    cur_stream->state == TCP_ST_FIN_WAIT_2) {
633 		/* XXX: Do we really need to update recv vars? */
634 		tcprb_setpile(rb, cftail);
635 	}
636 	if (cftail > 0 && (rcvvar->irs + 1) + cftail > cur_stream->rcv_nxt) {
637 		RAISE_DEBUG_EVENT(mtcp, cur_stream,
638 				"Move rcv_nxt from %u to %u.\n",
639 				cur_stream->rcv_nxt, (rcvvar->irs + 1) + cftail);
640 		cur_stream->rcv_nxt = (rcvvar->irs + 1) + cftail;
641 	}
642 	assert(cftail - rb->pile >= 0);
643 	rcvvar->rcv_wnd = rb->len - (cftail - rb->pile);
644 
645 	if (read_lock)
646 		SBUF_UNLOCK(&rcvvar->read_lock);
647 
648 
649 	if (TCP_SEQ_LEQ(cur_stream->rcv_nxt, prev_rcv_nxt)) {
650 		/* There are some lost packets */
651 		return FALSE;
652 	}
653 
654 	TRACE_EPOLL("Stream %d data arrived. "
655 		    "len: %d, ET: %llu, IN: %llu, OUT: %llu\n",
656 		    cur_stream->id, pctx->p.payloadlen,
657 		    cur_stream->socket? (unsigned long long)cur_stream->socket->epoll & MOS_EPOLLET : 0,
658 		    cur_stream->socket? (unsigned long long)cur_stream->socket->epoll & MOS_EPOLLIN : 0,
659 		    cur_stream->socket? (unsigned long long)cur_stream->socket->epoll & MOS_EPOLLOUT : 0);
660 
661 	if (cur_stream->state == TCP_ST_ESTABLISHED)
662 		RaiseReadEvent(mtcp, cur_stream);
663 
664 	return TRUE;
665 }
666 /*----------------------------------------------------------------------------*/
667 static inline void
668 Handle_TCP_ST_LISTEN (mtcp_manager_t mtcp, tcp_stream* cur_stream,
669 		struct pkt_ctx *pctx)
670 {
671 
672 	const struct tcphdr* tcph = pctx->p.tcph;
673 
674 	if (tcph->syn) {
675 		if (cur_stream->state == TCP_ST_LISTEN)
676 			cur_stream->rcv_nxt++;
677 		cur_stream->state = TCP_ST_SYN_RCVD;
678 		cur_stream->cb_events |= MOS_ON_TCP_STATE_CHANGE | MOS_ON_CONN_START;
679 		TRACE_STATE("Stream %d: TCP_ST_SYN_RCVD\n", cur_stream->id);
680 		cur_stream->actions |= MOS_ACT_SEND_CONTROL;
681 		if (IS_STREAM_TYPE(cur_stream, MOS_SOCK_MONITOR_STREAM_ACTIVE)) {
682 			/**
683 			 * Passive stream context needs to initialize irs and rcv_nxt
684 			 * as it is not set neither during createserverstream or monitor
685 			 * creation.
686 			 */
687 			cur_stream->rcvvar->irs =
688 				cur_stream->rcv_nxt = pctx->p.seq;
689 		}
690 	} else {
691 		CTRACE_ERROR("Stream %d (TCP_ST_LISTEN): "
692 				"Packet without SYN.\n", cur_stream->id);
693 	}
694 
695 }
696 /*----------------------------------------------------------------------------*/
697 static inline void
698 Handle_TCP_ST_SYN_SENT (mtcp_manager_t mtcp, tcp_stream* cur_stream,
699 		struct pkt_ctx *pctx)
700 {
701 	const struct tcphdr* tcph = pctx->p.tcph;
702 
703 	/* when active open */
704 	if (tcph->ack) {
705 		/* filter the unacceptable acks */
706 		if (TCP_SEQ_LEQ(pctx->p.ack_seq, cur_stream->sndvar->iss)
707 #ifndef BE_RESILIENT_TO_PACKET_DROP
708 			|| TCP_SEQ_GT(pctx->p.ack_seq, cur_stream->snd_nxt)
709 #endif
710 				) {
711 			if (!tcph->rst) {
712 				cur_stream->actions |= MOS_ACT_SEND_RST;
713 			}
714 			return;
715 		}
716 		/* accept the ack */
717 		cur_stream->sndvar->snd_una++;
718 	}
719 
720 	if (tcph->rst) {
721 		if (tcph->ack) {
722 			cur_stream->state = TCP_ST_CLOSED_RSVD;
723 			cur_stream->close_reason = TCP_RESET;
724 			cur_stream->cb_events |= MOS_ON_TCP_STATE_CHANGE;
725 			if (cur_stream->socket) {
726 				if (HAS_STREAM_TYPE(cur_stream, MOS_SOCK_STREAM))
727 					RaiseErrorEvent(mtcp, cur_stream);
728 			} else {
729 				cur_stream->actions |= MOS_ACT_DESTROY;
730 			}
731 		}
732 		return;
733 	}
734 
735 	if (tcph->ack
736 #ifndef BE_RESILIENT_TO_PACKET_DROP
737 		/* If we already lost SYNACK, let the ACK packet do the SYNACK's role */
738 		&& tcph->syn
739 #endif
740 	   ) {
741 		int ret = HandleActiveOpen(mtcp, cur_stream, pctx);
742 		if (!ret) {
743 			return;
744 		}
745 
746 #ifdef BE_RESILIENT_TO_PACKET_DROP
747 		if (!tcph->syn) {
748 			RAISE_DEBUG_EVENT(mtcp, cur_stream,
749 					"We missed SYNACK. Replace it with an ACK packet.\n");
750 			/* correct some variables */
751 			cur_stream->rcv_nxt = pctx->p.seq;
752 		}
753 #endif
754 
755 		cur_stream->sndvar->nrtx = 0;
756 		if (HAS_STREAM_TYPE(cur_stream, MOS_SOCK_STREAM))
757 			RemoveFromRTOList(mtcp, cur_stream);
758 		cur_stream->state = TCP_ST_ESTABLISHED;
759 		cur_stream->cb_events |= /*MOS_ON_CONN_SETUP |*/ MOS_ON_TCP_STATE_CHANGE;
760 		TRACE_STATE("Stream %d: TCP_ST_ESTABLISHED\n", cur_stream->id);
761 
762 		if (cur_stream->socket) {
763 			if (HAS_STREAM_TYPE(cur_stream, MOS_SOCK_STREAM))
764 				RaiseWriteEvent(mtcp, cur_stream);
765 		} else {
766 			TRACE_STATE("Stream %d: ESTABLISHED, but no socket\n", cur_stream->id);
767 			cur_stream->close_reason = TCP_ACTIVE_CLOSE;
768 			cur_stream->actions |= MOS_ACT_SEND_RST;
769 			cur_stream->actions |= MOS_ACT_DESTROY;
770 		}
771 		cur_stream->actions |= MOS_ACT_SEND_CONTROL;
772 		if (g_config.mos->tcp_timeout > 0)
773 			if (HAS_STREAM_TYPE(cur_stream, MOS_SOCK_STREAM))
774 				AddtoTimeoutList(mtcp, cur_stream);
775 
776 #ifdef BE_RESILIENT_TO_PACKET_DROP
777 		/* Handle this ack packet */
778 		if (!tcph->syn)
779 			Handle_TCP_ST_ESTABLISHED(mtcp, cur_stream, pctx);
780 #endif
781 
782 	} else if (tcph->syn) {
783 		cur_stream->state = TCP_ST_SYN_RCVD;
784 		cur_stream->cb_events |= MOS_ON_TCP_STATE_CHANGE;
785 		TRACE_STATE("Stream %d: TCP_ST_SYN_RCVD\n", cur_stream->id);
786 		cur_stream->snd_nxt = cur_stream->sndvar->iss;
787 		cur_stream->actions |= MOS_ACT_SEND_CONTROL;
788 	}
789 }
790 /*----------------------------------------------------------------------------*/
791 static inline void
792 Handle_TCP_ST_SYN_RCVD (mtcp_manager_t mtcp, tcp_stream* cur_stream,
793 		struct pkt_ctx *pctx)
794 {
795 	const struct tcphdr* tcph = pctx->p.tcph;
796 	struct tcp_send_vars *sndvar = cur_stream->sndvar;
797 	int ret;
798 
799 	if (tcph->ack) {
800 		uint32_t prior_cwnd;
801 		/* NOTE: We do not validate the ack number because first few packets
802 		 * can also come out of order */
803 
804 		sndvar->snd_una++;
805 		cur_stream->snd_nxt = pctx->p.ack_seq;
806 		prior_cwnd = sndvar->cwnd;
807 		sndvar->cwnd = ((prior_cwnd == 1)?
808 				(sndvar->mss * 2): sndvar->mss);
809 
810 		//UpdateRetransmissionTimer(mtcp, cur_stream, cur_ts);
811 		sndvar->nrtx = 0;
812 		cur_stream->rcv_nxt = cur_stream->rcvvar->irs + 1;
813 		if (HAS_STREAM_TYPE(cur_stream, MOS_SOCK_STREAM))
814 			RemoveFromRTOList(mtcp, cur_stream);
815 
816 		cur_stream->state = TCP_ST_ESTABLISHED;
817 		cur_stream->cb_events |= /*MOS_ON_CONN_SETUP |*/ MOS_ON_TCP_STATE_CHANGE;
818 		TRACE_STATE("Stream %d: TCP_ST_ESTABLISHED\n", cur_stream->id);
819 
820 #ifdef BE_RESILIENT_TO_PACKET_DROP
821 		if (pctx->p.ack_seq != sndvar->iss + 1)
822 			Handle_TCP_ST_ESTABLISHED(mtcp, cur_stream, pctx);
823 #endif
824 
825 		/* update listening socket */
826 		if (HAS_STREAM_TYPE(cur_stream, MOS_SOCK_STREAM)) {
827 
828 			struct tcp_listener *listener = mtcp->listener;
829 
830 			ret = StreamEnqueue(listener->acceptq, cur_stream);
831 			if (ret < 0) {
832 				TRACE_ERROR("Stream %d: Failed to enqueue to "
833 						"the listen backlog!\n", cur_stream->id);
834 				cur_stream->close_reason = TCP_NOT_ACCEPTED;
835 				cur_stream->state = TCP_ST_CLOSED_RSVD;
836 				cur_stream->cb_events |= MOS_ON_TCP_STATE_CHANGE;
837 				TRACE_STATE("Stream %d: TCP_ST_CLOSED_RSVD\n", cur_stream->id);
838 				cur_stream->actions |= MOS_ACT_SEND_CONTROL;
839 			}
840 
841 			/* raise an event to the listening socket */
842 			if (listener->socket && (listener->socket->epoll & MOS_EPOLLIN)) {
843 				AddEpollEvent(mtcp->ep,
844 						MOS_EVENT_QUEUE, listener->socket, MOS_EPOLLIN);
845 			}
846 		}
847 
848 		//TRACE_DBG("Stream %d inserted into acceptq.\n", cur_stream->id);
849 		if (g_config.mos->tcp_timeout > 0)
850 			if (HAS_STREAM_TYPE(cur_stream, MOS_SOCK_STREAM))
851 				AddtoTimeoutList(mtcp, cur_stream);
852 
853 	} else {
854 		/* Handle retransmitted SYN packet */
855 		if (HAS_STREAM_TYPE(cur_stream, MOS_SOCK_MONITOR_STREAM_ACTIVE) &&
856 			tcph->syn) {
857 			if (pctx->p.seq == cur_stream->pair_stream->sndvar->iss) {
858 				TRACE_DBG("syn retransmit! (p.seq = %u / iss = %u)\n",
859 						  pctx->p.seq, cur_stream->pair_stream->sndvar->iss);
860 				cur_stream->cb_events |= MOS_ON_REXMIT;
861 			}
862 		}
863 
864 		TRACE_DBG("Stream %d (TCP_ST_SYN_RCVD): No ACK.\n",
865 				cur_stream->id);
866 		/* retransmit SYN/ACK */
867 		cur_stream->snd_nxt = sndvar->iss;
868 		cur_stream->actions |= MOS_ACT_SEND_CONTROL;
869 	}
870 }
871 /*----------------------------------------------------------------------------*/
872 static inline void
873 Handle_TCP_ST_ESTABLISHED (mtcp_manager_t mtcp, tcp_stream* cur_stream,
874 		struct pkt_ctx *pctx)
875 {
876 	const struct tcphdr* tcph = pctx->p.tcph;
877 
878 	if (tcph->syn) {
879 		/* Handle retransmitted SYNACK packet */
880 		if (HAS_STREAM_TYPE(cur_stream, MOS_SOCK_MONITOR_STREAM_ACTIVE) &&
881 			tcph->ack) {
882 			if (pctx->p.seq == cur_stream->pair_stream->sndvar->iss) {
883 				TRACE_DBG("syn/ack retransmit! (p.seq = %u / iss = %u)\n",
884 						pctx->p.seq, cur_stream->pair_stream->sndvar->iss);
885 				cur_stream->cb_events |= MOS_ON_REXMIT;
886 			}
887 		}
888 
889 		TRACE_DBG("Stream %d (TCP_ST_ESTABLISHED): weird SYN. "
890 				"seq: %u, expected: %u, ack_seq: %u, expected: %u\n",
891 				cur_stream->id, pctx->p.seq, cur_stream->rcv_nxt,
892 				pctx->p.ack_seq, cur_stream->snd_nxt);
893 		cur_stream->snd_nxt = pctx->p.ack_seq;
894 		cur_stream->actions |= MOS_ACT_SEND_CONTROL;
895 		return;
896 	}
897 
898 	if (pctx->p.payloadlen > 0) {
899 		if (ProcessTCPPayload(mtcp, cur_stream, pctx)) {
900 			/* if return is TRUE, send ACK */
901 			cur_stream->actions |= MOS_ACT_SEND_ACK_AGG;
902 		} else {
903 			cur_stream->actions |= MOS_ACT_SEND_ACK_NOW;
904 		}
905 	}
906 
907 	if (tcph->ack) {
908 		if (cur_stream->sndvar->sndbuf) {
909 			ProcessACK(mtcp, cur_stream, pctx);
910 		}
911 	}
912 
913 	if (tcph->fin) {
914 		/* process the FIN only if the sequence is valid */
915 		/* FIN packet is allowed to push payload (should we check for PSH flag)? */
916 		if (!cur_stream->buffer_mgmt ||
917 #ifdef BE_RESILIENT_TO_PACKET_DROP
918 			TCP_SEQ_GEQ(pctx->p.seq + pctx->p.payloadlen, cur_stream->rcv_nxt)
919 #else
920 			pctx->p.seq + pctx->p.payloadlen == cur_stream->rcv_nxt
921 #endif
922 			) {
923 #ifdef BE_RESILIENT_TO_PACKET_DROP
924 			cur_stream->rcv_nxt = pctx->p.seq + pctx->p.payloadlen;
925 #endif
926 			cur_stream->state = TCP_ST_CLOSE_WAIT;
927 			cur_stream->cb_events |= MOS_ON_TCP_STATE_CHANGE;
928 			TRACE_STATE("Stream %d: TCP_ST_CLOSE_WAIT\n", cur_stream->id);
929 			cur_stream->rcv_nxt++;
930 			cur_stream->actions |= MOS_ACT_SEND_CONTROL;
931 
932 			/* notify FIN to application */
933 			RaiseReadEvent(mtcp, cur_stream);
934 		} else {
935 			RAISE_DEBUG_EVENT(mtcp, cur_stream,
936 					"Expected %u, but received %u (= %u + %u)\n",
937 					cur_stream->rcv_nxt, pctx->p.seq + pctx->p.payloadlen,
938 					pctx->p.seq, pctx->p.payloadlen);
939 
940 			cur_stream->actions |= MOS_ACT_SEND_ACK_NOW;
941 			return;
942 		}
943 	}
944 }
945 /*----------------------------------------------------------------------------*/
946 static inline void
947 Handle_TCP_ST_CLOSE_WAIT (mtcp_manager_t mtcp, tcp_stream* cur_stream,
948 		struct pkt_ctx *pctx)
949 {
950 	if (TCP_SEQ_LT(pctx->p.seq, cur_stream->rcv_nxt)) {
951 		TRACE_DBG("Stream %d (TCP_ST_CLOSE_WAIT): "
952 				"weird seq: %u, expected: %u\n",
953 				cur_stream->id, pctx->p.seq, cur_stream->rcv_nxt);
954 		cur_stream->actions |= MOS_ACT_SEND_CONTROL;
955 		return;
956 	}
957 
958 	if (cur_stream->sndvar->sndbuf) {
959 		ProcessACK(mtcp, cur_stream, pctx);
960 	}
961 }
962 /*----------------------------------------------------------------------------*/
963 static inline void
964 Handle_TCP_ST_LAST_ACK (mtcp_manager_t mtcp, tcp_stream* cur_stream,
965 		struct pkt_ctx *pctx)
966 {
967 	const struct tcphdr* tcph = pctx->p.tcph;
968 
969 	if (TCP_SEQ_LT(pctx->p.seq, cur_stream->rcv_nxt)) {
970 		TRACE_DBG("Stream %d (TCP_ST_LAST_ACK): "
971 				"weird seq: %u, expected: %u\n",
972 				cur_stream->id, pctx->p.seq, cur_stream->rcv_nxt);
973 		return;
974 	}
975 
976 	if (tcph->ack) {
977 		if (cur_stream->sndvar->sndbuf) {
978 			ProcessACK(mtcp, cur_stream, pctx);
979 		}
980 
981 		if (!cur_stream->sndvar->is_fin_sent) {
982 			/* the case that FIN is not sent yet */
983 			/* this is not ack for FIN, ignore */
984 			TRACE_DBG("Stream %d (TCP_ST_LAST_ACK): "
985 					"No FIN sent yet.\n", cur_stream->id);
986 #ifdef DBGMSG
987 			DumpIPPacket(mtcp, pctx->p.iph, pctx->p.ip_len);
988 #endif
989 #if DUMP_STREAM
990 			DumpStream(mtcp, cur_stream);
991 			DumpControlList(mtcp, mtcp->n_sender[0]);
992 #endif
993 			return;
994 		}
995 
996 		/* check if ACK of FIN */
997 		if (pctx->p.ack_seq == cur_stream->sndvar->fss + 1) {
998 			cur_stream->sndvar->snd_una++;
999 			if (HAS_STREAM_TYPE(cur_stream, MOS_SOCK_STREAM))
1000 				UpdateRetransmissionTimer(mtcp, cur_stream, pctx->p.cur_ts);
1001 			cur_stream->state = TCP_ST_CLOSED_RSVD;
1002 			cur_stream->close_reason = TCP_PASSIVE_CLOSE;
1003 			cur_stream->cb_events |= MOS_ON_TCP_STATE_CHANGE;
1004 			TRACE_STATE("Stream %d: TCP_ST_CLOSED_RSVD\n",
1005 					cur_stream->id);
1006 			cur_stream->actions |= MOS_ACT_DESTROY;
1007 		} else {
1008 			TRACE_DBG("Stream %d (TCP_ST_LAST_ACK): Not ACK of FIN. "
1009 					"ack_seq: %u, expected: %u\n",
1010 					cur_stream->id, pctx->p.ack_seq, cur_stream->sndvar->fss + 1);
1011 			//cur_stream->snd_nxt = cur_stream->sndvar->fss;
1012 			cur_stream->actions |= MOS_ACT_SEND_CONTROL;
1013 		}
1014 	} else {
1015 		TRACE_DBG("Stream %d (TCP_ST_LAST_ACK): No ACK\n",
1016 			  cur_stream->id);
1017 		//cur_stream->snd_nxt = cur_stream->sndvar->fss;
1018 		cur_stream->actions |= MOS_ACT_SEND_CONTROL;
1019 	}
1020 }
1021 /*----------------------------------------------------------------------------*/
1022 static inline void
1023 Handle_TCP_ST_FIN_WAIT_1 (mtcp_manager_t mtcp, tcp_stream* cur_stream,
1024 		struct pkt_ctx *pctx)
1025 {
1026 	const struct tcphdr* tcph = pctx->p.tcph;
1027 
1028 	if (TCP_SEQ_LT(pctx->p.seq, cur_stream->rcv_nxt)) {
1029 		RAISE_DEBUG_EVENT(mtcp, cur_stream,
1030 				"Stream %d (FIN_WAIT_1): "
1031 				"weird seq: %u, expected: %u\n",
1032 				cur_stream->id, pctx->p.seq, cur_stream->rcv_nxt);
1033 		cur_stream->actions |= MOS_ACT_SEND_CONTROL;
1034 		return;
1035 	}
1036 
1037 	if (tcph->ack) {
1038 		if (cur_stream->sndvar->sndbuf) {
1039 			ProcessACK(mtcp, cur_stream, pctx);
1040 		}
1041 
1042 		if (cur_stream->sndvar->is_fin_sent &&
1043 			((!HAS_STREAM_TYPE(cur_stream, MOS_SOCK_STREAM) &&
1044 			  pctx->p.ack_seq == cur_stream->sndvar->fss) ||
1045 			 pctx->p.ack_seq == cur_stream->sndvar->fss + 1)) {
1046 			cur_stream->sndvar->snd_una = pctx->p.ack_seq;
1047 			if (TCP_SEQ_GT(pctx->p.ack_seq, cur_stream->snd_nxt)) {
1048 				TRACE_DBG("Stream %d: update snd_nxt to %u\n",
1049 						cur_stream->id, pctx->p.ack_seq);
1050 				cur_stream->snd_nxt = pctx->p.ack_seq;
1051 			}
1052 			//cur_stream->sndvar->snd_una++;
1053 			//UpdateRetransmissionTimer(mtcp, cur_stream, cur_ts);
1054 			cur_stream->sndvar->nrtx = 0;
1055 			if (HAS_STREAM_TYPE(cur_stream, MOS_SOCK_STREAM))
1056 				RemoveFromRTOList(mtcp, cur_stream);
1057 			cur_stream->state = TCP_ST_FIN_WAIT_2;
1058 			cur_stream->cb_events |= MOS_ON_TCP_STATE_CHANGE;
1059 			TRACE_STATE("Stream %d: TCP_ST_FIN_WAIT_2\n",
1060 					cur_stream->id);
1061 		} else {
1062 			RAISE_DEBUG_EVENT(mtcp, cur_stream,
1063 					"Failed to transit to FIN_WAIT_2, "
1064 					"is_fin_sent: %s, ack_seq: %u, sndvar->fss: %u\n",
1065 					cur_stream->sndvar->is_fin_sent ? "true" : "false",
1066 					pctx->p.ack_seq, cur_stream->sndvar->fss);
1067 		}
1068 
1069 	} else {
1070 		RAISE_DEBUG_EVENT(mtcp, cur_stream,
1071 				"Failed to transit to FIN_WAIT_2, "
1072 				"We got a %s%s%s%s%s%s packet.",
1073 				pctx->p.tcph->syn ? "S" : "",
1074 				pctx->p.tcph->fin ? "F" : "",
1075 				pctx->p.tcph->rst ? "R" : "",
1076 				pctx->p.tcph->psh ? "P" : "",
1077 				pctx->p.tcph->urg ? "U" : "",
1078 				pctx->p.tcph->ack ? "A" : "");
1079 
1080 		TRACE_DBG("Stream %d: does not contain an ack!\n",
1081 				cur_stream->id);
1082 		return;
1083 	}
1084 
1085 	if (pctx->p.payloadlen > 0) {
1086 		if (ProcessTCPPayload(mtcp, cur_stream, pctx)) {
1087 			/* if return is TRUE, send ACK */
1088 			cur_stream->actions |= MOS_ACT_SEND_ACK_AGG;
1089 		} else {
1090 			cur_stream->actions |= MOS_ACT_SEND_ACK_NOW;
1091 		}
1092 	}
1093 
1094 	if (tcph->fin) {
1095 		/* process the FIN only if the sequence is valid */
1096 		/* FIN packet is allowed to push payload (should we check for PSH flag)? */
1097 		if (!cur_stream->buffer_mgmt ||
1098 #ifdef BE_RESILIENT_TO_PACKET_DROP
1099 			TCP_SEQ_GEQ(pctx->p.seq + pctx->p.payloadlen, cur_stream->rcv_nxt)
1100 #else
1101 			pctx->p.seq + pctx->p.payloadlen == cur_stream->rcv_nxt
1102 #endif
1103 		   ) {
1104 			cur_stream->rcv_nxt++;
1105 
1106 			if (cur_stream->state == TCP_ST_FIN_WAIT_1) {
1107 				cur_stream->state = TCP_ST_CLOSING;
1108 				cur_stream->cb_events |= MOS_ON_TCP_STATE_CHANGE;
1109 				TRACE_STATE("Stream %d: TCP_ST_CLOSING\n", cur_stream->id);
1110 
1111 			} else if (cur_stream->state == TCP_ST_FIN_WAIT_2) {
1112 				cur_stream->state = TCP_ST_TIME_WAIT;
1113 				cur_stream->cb_events |= MOS_ON_TCP_STATE_CHANGE;
1114 				TRACE_STATE("Stream %d: TCP_ST_TIME_WAIT\n", cur_stream->id);
1115 				AddtoTimewaitList(mtcp, cur_stream, pctx->p.cur_ts);
1116 			}
1117 			cur_stream->actions |= MOS_ACT_SEND_CONTROL;
1118 		} else {
1119 			RAISE_DEBUG_EVENT(mtcp, cur_stream,
1120 					"Expected %u, but received %u (= %u + %u)\n",
1121 					cur_stream->rcv_nxt, pctx->p.seq + pctx->p.payloadlen,
1122 					pctx->p.seq, pctx->p.payloadlen);
1123 
1124 			cur_stream->actions |= MOS_ACT_SEND_ACK_NOW;
1125 			return;
1126 		}
1127 	}
1128 }
1129 /*----------------------------------------------------------------------------*/
1130 static inline void
1131 Handle_TCP_ST_FIN_WAIT_2 (mtcp_manager_t mtcp, tcp_stream* cur_stream,
1132 		struct pkt_ctx *pctx)
1133 {
1134 	const struct tcphdr* tcph = pctx->p.tcph;
1135 
1136 	if (tcph->ack) {
1137 		if (cur_stream->sndvar->sndbuf) {
1138 			ProcessACK(mtcp, cur_stream, pctx);
1139 		}
1140 	} else {
1141 		TRACE_DBG("Stream %d: does not contain an ack!\n",
1142 				cur_stream->id);
1143 		return;
1144 	}
1145 
1146 	if (pctx->p.payloadlen > 0) {
1147 		if (ProcessTCPPayload(mtcp, cur_stream, pctx)) {
1148 			/* if return is TRUE, send ACK */
1149 			cur_stream->actions |= MOS_ACT_SEND_ACK_AGG;
1150 		} else {
1151 			cur_stream->actions |= MOS_ACT_SEND_ACK_NOW;
1152 		}
1153 	}
1154 
1155 	if (tcph->fin) {
1156 		/* process the FIN only if the sequence is valid */
1157 		/* FIN packet is allowed to push payload (should we check for PSH flag)? */
1158 		if (!cur_stream->buffer_mgmt ||
1159 #ifdef BE_RESILIENT_TO_PACKET_DROP
1160 			TCP_SEQ_GEQ(pctx->p.seq + pctx->p.payloadlen, cur_stream->rcv_nxt)
1161 #else
1162 			pctx->p.seq + pctx->p.payloadlen == cur_stream->rcv_nxt
1163 #endif
1164 			) {
1165 			cur_stream->state = TCP_ST_TIME_WAIT;
1166 			cur_stream->cb_events |= MOS_ON_TCP_STATE_CHANGE;
1167 			cur_stream->rcv_nxt++;
1168 			TRACE_STATE("Stream %d: TCP_ST_TIME_WAIT\n", cur_stream->id);
1169 
1170 			AddtoTimewaitList(mtcp, cur_stream, pctx->p.cur_ts);
1171 			cur_stream->actions |= MOS_ACT_SEND_CONTROL;
1172 		}
1173 	} else {
1174 		TRACE_DBG("Stream %d (TCP_ST_FIN_WAIT_2): No FIN. "
1175 				"seq: %u, ack_seq: %u, snd_nxt: %u, snd_una: %u\n",
1176 				cur_stream->id, pctx->p.seq, pctx->p.ack_seq,
1177 				cur_stream->snd_nxt, cur_stream->sndvar->snd_una);
1178 #if DBGMSG
1179 		DumpIPPacket(mtcp, pctx->p.iph, pctx->p.ip_len);
1180 #endif
1181 	}
1182 
1183 }
1184 /*----------------------------------------------------------------------------*/
1185 static inline void
1186 Handle_TCP_ST_CLOSING (mtcp_manager_t mtcp, tcp_stream* cur_stream,
1187 		struct pkt_ctx *pctx)
1188 {
1189 	const struct tcphdr* tcph = pctx->p.tcph;
1190 
1191 	if (tcph->ack) {
1192 		if (cur_stream->sndvar->sndbuf) {
1193 			ProcessACK(mtcp, cur_stream, pctx);
1194 		}
1195 
1196 		if (!cur_stream->sndvar->is_fin_sent) {
1197 			TRACE_DBG("Stream %d (TCP_ST_CLOSING): "
1198 					"No FIN sent yet.\n", cur_stream->id);
1199 			return;
1200 		}
1201 
1202 		// check if ACK of FIN
1203 		if (pctx->p.ack_seq != cur_stream->sndvar->fss + 1) {
1204 #if DBGMSG
1205 			TRACE_DBG("Stream %d (TCP_ST_CLOSING): Not ACK of FIN. "
1206 				  "ack_seq: %u, snd_nxt: %u, snd_una: %u, fss: %u\n",
1207 				  cur_stream->id, pctx->p.ack_seq, cur_stream->snd_nxt,
1208 				  cur_stream->sndvar->snd_una, cur_stream->sndvar->fss);
1209 			DumpIPPacketToFile(stderr, pctx->p.iph, pctx->p.ip_len);
1210 			DumpStream(mtcp, cur_stream);
1211 #endif
1212 			//assert(0);
1213 			/* if the packet is not the ACK of FIN, ignore */
1214 			return;
1215 		}
1216 
1217 		cur_stream->sndvar->snd_una = pctx->p.ack_seq;
1218 		if (HAS_STREAM_TYPE(cur_stream, MOS_SOCK_STREAM))
1219 			UpdateRetransmissionTimer(mtcp, cur_stream, pctx->p.cur_ts);
1220 		cur_stream->state = TCP_ST_TIME_WAIT;
1221 		cur_stream->cb_events |= MOS_ON_TCP_STATE_CHANGE;
1222 		TRACE_STATE("Stream %d: TCP_ST_TIME_WAIT\n", cur_stream->id);
1223 
1224 		AddtoTimewaitList(mtcp, cur_stream, pctx->p.cur_ts);
1225 
1226 	} else {
1227 		TRACE_DBG("Stream %d (TCP_ST_CLOSING): Not ACK\n",
1228 			  cur_stream->id);
1229 		return;
1230 	}
1231 }
1232 /*----------------------------------------------------------------------------*/
1233 void
1234 UpdateRecvTCPContext(mtcp_manager_t mtcp, struct tcp_stream *cur_stream,
1235 		struct pkt_ctx *pctx)
1236 {
1237 	struct tcphdr* tcph = pctx->p.tcph;
1238 	int ret;
1239 
1240 	assert(cur_stream);
1241 
1242 	/* Validate sequence. if not valid, ignore the packet */
1243 	if (cur_stream->state > TCP_ST_SYN_RCVD) {
1244 
1245 		ret = ValidateSequence(mtcp, cur_stream, pctx);
1246 		if (!ret) {
1247 			TRACE_DBG("Stream %d: Unexpected sequence: %u, expected: %u\n",
1248 					cur_stream->id, pctx->p.seq, cur_stream->rcv_nxt);
1249 #ifdef DBGMSG
1250 			DumpIPPacket(mtcp, pctx->p.iph, pctx->p.ip_len);
1251 #endif
1252 #if DUMP_STREAM
1253 			DumpStream(mtcp, cur_stream);
1254 #endif
1255 			/* cur_stream->cb_events |= MOS_ON_ERROR; */
1256 		}
1257 	}
1258 	/* Update receive window size */
1259 	if (tcph->syn) {
1260 		cur_stream->sndvar->peer_wnd = pctx->p.window;
1261 	} else {
1262 		cur_stream->sndvar->peer_wnd =
1263 				(uint32_t)pctx->p.window << cur_stream->sndvar->wscale_peer;
1264 	}
1265 
1266 	cur_stream->last_active_ts = pctx->p.cur_ts;
1267 	if (HAS_STREAM_TYPE(cur_stream, MOS_SOCK_STREAM))
1268 		UpdateTimeoutList(mtcp, cur_stream);
1269 
1270 	/* Process RST: process here only if state > TCP_ST_SYN_SENT */
1271 	if (tcph->rst) {
1272 		cur_stream->have_reset = TRUE;
1273 		if (cur_stream->state > TCP_ST_SYN_SENT) {
1274 			if (ProcessRST(mtcp, cur_stream, pctx)) {
1275 				return;
1276 			}
1277 		}
1278 	}
1279 
1280 	if (HAS_STREAM_TYPE(cur_stream, MOS_SOCK_MONITOR_STREAM_ACTIVE) &&
1281 		pctx->p.tcph->fin) {
1282 		if (cur_stream->state == TCP_ST_CLOSE_WAIT ||
1283 			cur_stream->state == TCP_ST_LAST_ACK ||
1284 			cur_stream->state == TCP_ST_CLOSING ||
1285 			cur_stream->state == TCP_ST_TIME_WAIT) {
1286 			/* Handle retransmitted FIN packet */
1287 			if (pctx->p.seq == cur_stream->pair_stream->sndvar->fss) {
1288 				TRACE_DBG("FIN retransmit! (seq = %u / fss = %u)\n",
1289 						pctx->p.seq, cur_stream->pair_stream->sndvar->fss);
1290 				cur_stream->cb_events |= MOS_ON_REXMIT;
1291 			}
1292 		}
1293 	}
1294 
1295 	switch (cur_stream->state) {
1296 	case TCP_ST_LISTEN:
1297 		Handle_TCP_ST_LISTEN(mtcp, cur_stream, pctx);
1298 		break;
1299 
1300 	case TCP_ST_SYN_SENT:
1301 		Handle_TCP_ST_SYN_SENT(mtcp, cur_stream, pctx);
1302 		break;
1303 
1304 	case TCP_ST_SYN_RCVD:
1305 		/* SYN retransmit implies our SYN/ACK was lost. Resend */
1306 		if (tcph->syn && pctx->p.seq == cur_stream->rcvvar->irs)
1307 			Handle_TCP_ST_LISTEN(mtcp, cur_stream, pctx);
1308 		else {
1309 			Handle_TCP_ST_SYN_RCVD(mtcp, cur_stream, pctx);
1310 			if (pctx->p.payloadlen > 0 && cur_stream->state == TCP_ST_ESTABLISHED)
1311 				Handle_TCP_ST_ESTABLISHED(mtcp, cur_stream, pctx);
1312 		}
1313 		break;
1314 
1315 	case TCP_ST_ESTABLISHED:
1316 		Handle_TCP_ST_ESTABLISHED(mtcp, cur_stream, pctx);
1317 		break;
1318 
1319 	case TCP_ST_CLOSE_WAIT:
1320 		Handle_TCP_ST_CLOSE_WAIT(mtcp, cur_stream, pctx);
1321 		break;
1322 
1323 	case TCP_ST_LAST_ACK:
1324 		Handle_TCP_ST_LAST_ACK(mtcp, cur_stream, pctx);
1325 		break;
1326 
1327 	case TCP_ST_FIN_WAIT_1:
1328 		Handle_TCP_ST_FIN_WAIT_1(mtcp, cur_stream, pctx);
1329 		break;
1330 
1331 	case TCP_ST_FIN_WAIT_2:
1332 		Handle_TCP_ST_FIN_WAIT_2(mtcp, cur_stream, pctx);
1333 		break;
1334 
1335 	case TCP_ST_CLOSING:
1336 		Handle_TCP_ST_CLOSING(mtcp, cur_stream, pctx);
1337 		break;
1338 
1339 	case TCP_ST_TIME_WAIT:
1340 		/* the only thing that can arrive in this state is a retransmission
1341 		   of the remote FIN. Acknowledge it, and restart the 2 MSL timeout */
1342 		if (cur_stream->on_timewait_list) {
1343 			RemoveFromTimewaitList(mtcp, cur_stream);
1344 			AddtoTimewaitList(mtcp, cur_stream, pctx->p.cur_ts);
1345 		}
1346 		cur_stream->actions |= MOS_ACT_SEND_CONTROL;
1347 		break;
1348 
1349 	case TCP_ST_CLOSED:
1350 	case TCP_ST_CLOSED_RSVD:
1351 		break;
1352 
1353 	default:
1354 		break;
1355 	}
1356 
1357 	TRACE_STATE("Stream %d: Events: %0lx, Action: %0x\n",
1358 			cur_stream->id, cur_stream->cb_events, cur_stream->actions);
1359 	return;
1360 }
1361 /*----------------------------------------------------------------------------*/
1362 void
1363 DoActionEndTCPPacket(mtcp_manager_t mtcp, struct tcp_stream *cur_stream,
1364 		struct pkt_ctx *pctx)
1365 {
1366 	int i;
1367 
1368 	for (i = 1; i < MOS_ACT_CNT; i = i << 1) {
1369 
1370 		if (cur_stream->actions & i) {
1371 			switch(i) {
1372 			case MOS_ACT_SEND_DATA:
1373 				AddtoSendList(mtcp, cur_stream);
1374 				break;
1375 			case MOS_ACT_SEND_ACK_NOW:
1376 				EnqueueACK(mtcp, cur_stream, pctx->p.cur_ts, ACK_OPT_NOW);
1377 				break;
1378 			case MOS_ACT_SEND_ACK_AGG:
1379 				EnqueueACK(mtcp, cur_stream, pctx->p.cur_ts, ACK_OPT_AGGREGATE);
1380 				break;
1381 			case MOS_ACT_SEND_CONTROL:
1382 				AddtoControlList(mtcp, cur_stream, pctx->p.cur_ts);
1383 				break;
1384 			case MOS_ACT_SEND_RST:
1385 				if (cur_stream->state <= TCP_ST_SYN_SENT)
1386 					SendTCPPacketStandalone(mtcp,
1387 							pctx->p.iph->daddr, pctx->p.tcph->dest,
1388 							pctx->p.iph->saddr, pctx->p.tcph->source,
1389 							0, pctx->p.seq + 1, 0, TCP_FLAG_RST | TCP_FLAG_ACK,
1390 							NULL, 0, pctx->p.cur_ts, 0, 0, -1);
1391 				else
1392 					SendTCPPacketStandalone(mtcp,
1393 							pctx->p.iph->daddr, pctx->p.tcph->dest,
1394 							pctx->p.iph->saddr, pctx->p.tcph->source,
1395 							pctx->p.ack_seq, 0, 0, TCP_FLAG_RST | TCP_FLAG_ACK,
1396 							NULL, 0, pctx->p.cur_ts, 0, 0, -1);
1397 				break;
1398 			case MOS_ACT_DESTROY:
1399 				DestroyTCPStream(mtcp, cur_stream);
1400 				break;
1401 			default:
1402 				assert(1);
1403 				break;
1404 			}
1405 		}
1406 	}
1407 
1408 	cur_stream->actions = 0;
1409 }
1410 /*----------------------------------------------------------------------------*/
1411 /**
1412  * Called (when monitoring mode is enabled).. for every outgoing packet to the
1413  * NIC.
1414  */
1415 inline void
1416 UpdatePassiveRecvTCPContext(mtcp_manager_t mtcp, struct tcp_stream *cur_stream,
1417 			       struct pkt_ctx *pctx)
1418 {
1419 	UpdateRecvTCPContext(mtcp, cur_stream, pctx);
1420 }
1421 /*----------------------------------------------------------------------------*/
1422 /* NOTE TODO: This event prediction is additional overhaed of HK_RCV hook.
1423  * We can transparently optimize this by disabling prediction of events which
1424  * are not monitored by anyone. */
1425 inline void
1426 PreRecvTCPEventPrediction(mtcp_manager_t mtcp, struct pkt_ctx *pctx,
1427 			  struct tcp_stream *recvside_stream)
1428 {
1429 	tcp_rb_overlapchk(mtcp, pctx, recvside_stream);
1430 }
1431 /*----------------------------------------------------------------------------*/
1432