xref: /mOS-networking-stack/core/src/tcp_in.c (revision 3f59ddac)
1 #include <assert.h>
2 
3 #include "mos_api.h"
4 #include "tcp_util.h"
5 #include "tcp_in.h"
6 #include "tcp_out.h"
7 #include "tcp_ring_buffer.h"
8 #include "eventpoll.h"
9 #include "debug.h"
10 #include "timer.h"
11 #include "ip_in.h"
12 #include "tcp_rb.h"
13 #include "config.h"
14 #include "scalable_event.h"
15 
/*
 * Two-way extrema. Each argument may be evaluated twice, so never pass
 * expressions with side effects (e.g. MAX(i++, j)).
 */
#define MAX(a, b) (((a) > (b)) ? (a) : (b))
#define MIN(a, b) (((a) < (b)) ? (a) : (b))

/*
 * Compile-time feature switches tested with `#if` further down in this file.
 * NOTE(review): relies on TRUE being a nonzero macro from the project
 * headers; an undefined TRUE would make `#if` treat it as 0.
 */
#define RECOVERY_AFTER_LOSS          TRUE
#define SELECTIVE_WRITE_EVENT_NOTIFY TRUE
21 /*----------------------------------------------------------------------------*/
22 static inline void
23 Handle_TCP_ST_ESTABLISHED (mtcp_manager_t mtcp, tcp_stream* cur_stream,
24 		struct pkt_ctx *pctx);
25 /*----------------------------------------------------------------------------*/
26 static inline int
27 FilterSYNPacket(mtcp_manager_t mtcp, uint32_t ip, uint16_t port)
28 {
29 	struct sockaddr_in *addr;
30 
31 	/* TODO: This listening logic should be revised */
32 
33 	/* if not listening, drop */
34 	if (!mtcp->listener) {
35 		return FALSE;
36 	}
37 
38 	/* if not the address we want, drop */
39 	addr = &mtcp->listener->socket->saddr;
40 	if (addr->sin_port == port) {
41 		if (addr->sin_addr.s_addr != INADDR_ANY) {
42 			if (ip == addr->sin_addr.s_addr) {
43 				return TRUE;
44 			}
45 			return FALSE;
46 		} else {
47 			int i;
48 
49 			for (i = 0; i < g_config.mos->netdev_table->num; i++) {
50 				if (ip == g_config.mos->netdev_table->ent[i]->ip_addr) {
51 					return TRUE;
52 				}
53 			}
54 			return FALSE;
55 		}
56 	}
57 
58 	return FALSE;
59 }
60 /*----------------------------------------------------------------------------*/
61 static inline int
62 HandleActiveOpen(mtcp_manager_t mtcp, tcp_stream *cur_stream,
63 		struct pkt_ctx *pctx)
64 {
65 	const struct tcphdr* tcph = pctx->p.tcph;
66 
67 	cur_stream->rcvvar->irs = pctx->p.seq;
68 	cur_stream->snd_nxt = pctx->p.ack_seq;
69 	cur_stream->sndvar->peer_wnd = pctx->p.window;
70 	cur_stream->rcvvar->snd_wl1 = cur_stream->rcvvar->irs - 1;
71 	cur_stream->rcv_nxt = cur_stream->rcvvar->irs + 1;
72 	cur_stream->rcvvar->last_ack_seq = pctx->p.ack_seq;
73 	ParseTCPOptions(cur_stream, pctx->p.cur_ts, (uint8_t *)tcph + TCP_HEADER_LEN,
74 			(tcph->doff << 2) - TCP_HEADER_LEN);
75 	cur_stream->sndvar->cwnd = ((cur_stream->sndvar->cwnd == 1)?
76 			(cur_stream->sndvar->mss * 2): cur_stream->sndvar->mss);
77 	cur_stream->sndvar->ssthresh = cur_stream->sndvar->mss * 10;
78 	if (HAS_STREAM_TYPE(cur_stream, MOS_SOCK_STREAM))
79 		UpdateRetransmissionTimer(mtcp, cur_stream, pctx->p.cur_ts);
80 
81 	return TRUE;
82 }
83 /*----------------------------------------------------------------------------*/
84 /* ValidateSequence: validates sequence number of the segment                 */
85 /* Return: TRUE if acceptable, FALSE if not acceptable                        */
86 /*----------------------------------------------------------------------------*/
87 static inline int
88 ValidateSequence(mtcp_manager_t mtcp, tcp_stream *cur_stream,
89 		struct pkt_ctx *pctx)
90 {
91 	const struct tcphdr* tcph = pctx->p.tcph;
92 
93 	/* Protect Against Wrapped Sequence number (PAWS) */
94 	if (!tcph->rst && cur_stream->saw_timestamp) {
95 		struct tcp_timestamp ts;
96 
97 		if (!ParseTCPTimestamp(cur_stream, &ts,
98 				(uint8_t *)tcph + TCP_HEADER_LEN,
99 				(tcph->doff << 2) - TCP_HEADER_LEN)) {
100 			/* if there is no timestamp */
101 			/* TODO: implement here */
102 			TRACE_DBG("No timestamp found.\n");
103 			return FALSE;
104 		}
105 
106 		/* RFC1323: if SEG.TSval < TS.Recent, drop and send ack */
107 		if (TCP_SEQ_LT(ts.ts_val, cur_stream->rcvvar->ts_recent)) {
108 			/* TODO: ts_recent should be invalidated
109 					 before timestamp wraparound for long idle flow */
110 			TRACE_DBG("PAWS Detect wrong timestamp. "
111 					"seq: %u, ts_val: %u, prev: %u\n",
112 					pctx->p.seq, ts.ts_val, cur_stream->rcvvar->ts_recent);
113 			cur_stream->actions |= MOS_ACT_SEND_ACK_NOW;
114 			return FALSE;
115 		} else {
116 			/* valid timestamp */
117 			if (TCP_SEQ_GT(ts.ts_val, cur_stream->rcvvar->ts_recent)) {
118 				TRACE_TSTAMP("Timestamp update. cur: %u, prior: %u "
119 					"(time diff: %uus)\n",
120 					ts.ts_val, cur_stream->rcvvar->ts_recent,
121 					TS_TO_USEC(pctx->p.cur_ts - cur_stream->rcvvar->ts_last_ts_upd));
122 				cur_stream->rcvvar->ts_last_ts_upd = pctx->p.cur_ts;
123 			}
124 
125 			cur_stream->rcvvar->ts_recent = ts.ts_val;
126 			cur_stream->rcvvar->ts_lastack_rcvd = ts.ts_ref;
127 		}
128 	}
129 
130 	/* TCP sequence validation */
131 	if (!TCP_SEQ_BETWEEN(pctx->p.seq + pctx->p.payloadlen, cur_stream->rcv_nxt,
132 				cur_stream->rcv_nxt + cur_stream->rcvvar->rcv_wnd)) {
133 
134 		/* if RST bit is set, ignore the segment */
135 		if (tcph->rst)
136 			return FALSE;
137 
138 		if (cur_stream->state == TCP_ST_ESTABLISHED) {
139 			/* check if it is to get window advertisement */
140 			if (pctx->p.seq + 1 == cur_stream->rcv_nxt) {
141 				TRACE_DBG("Window update request. (seq: %u, rcv_wnd: %u)\n",
142 						pctx->p.seq, cur_stream->rcvvar->rcv_wnd);
143 				cur_stream->actions |= MOS_ACT_SEND_ACK_AGG;
144 				return FALSE;
145 
146 			}
147 
148 			if (TCP_SEQ_LEQ(pctx->p.seq, cur_stream->rcv_nxt)) {
149 				cur_stream->actions |= MOS_ACT_SEND_ACK_AGG;
150 			} else {
151 				cur_stream->actions |= MOS_ACT_SEND_ACK_NOW;
152 			}
153 		} else {
154 			if (cur_stream->state == TCP_ST_TIME_WAIT) {
155 				TRACE_DBG("Stream %d: tw expire update to %u\n",
156 						cur_stream->id, cur_stream->rcvvar->ts_tw_expire);
157 				AddtoTimewaitList(mtcp, cur_stream, pctx->p.cur_ts);
158 			}
159 			cur_stream->actions |= MOS_ACT_SEND_CONTROL;
160 		}
161 		return FALSE;
162 	}
163 
164 	return TRUE;
165 }
166 /*----------------------------------------------------------------------------*/
167 static inline void
168 NotifyConnectionReset(mtcp_manager_t mtcp, tcp_stream *cur_stream)
169 {
170 	TRACE_DBG("Stream %d: Notifying connection reset.\n", cur_stream->id);
171 	/* TODO: implement this function */
172 	/* signal to user "connection reset" */
173 }
174 /*----------------------------------------------------------------------------*/
175 static inline int
176 ProcessRST(mtcp_manager_t mtcp, tcp_stream *cur_stream,
177 		struct pkt_ctx *pctx)
178 {
179 	/* TODO: we need reset validation logic */
180 	/* the sequence number of a RST should be inside window */
181 	/* (in SYN_SENT state, it should ack the previous SYN */
182 
183 	TRACE_DBG("Stream %d: TCP RESET (%s)\n",
184 			cur_stream->id, TCPStateToString(cur_stream));
185 #if DUMP_STREAM
186 	DumpStream(mtcp, cur_stream);
187 #endif
188 
189 	if (cur_stream->state <= TCP_ST_SYN_SENT) {
190 		/* not handled here */
191 		return FALSE;
192 	}
193 
194 	if (cur_stream->state == TCP_ST_SYN_RCVD) {
195 		/* ACK number of last sent ACK packet == rcv_nxt + 1*/
196 		if (pctx->p.seq == 0 ||
197 #ifdef BE_RESILIENT_TO_PACKET_DROP
198 			pctx->p.seq == cur_stream->rcv_nxt + 1 ||
199 #endif
200 			pctx->p.ack_seq == cur_stream->snd_nxt)
201 		{
202 			cur_stream->state = TCP_ST_CLOSED_RSVD;
203 			cur_stream->cb_events |= MOS_ON_TCP_STATE_CHANGE;
204 			cur_stream->close_reason = TCP_RESET;
205 			cur_stream->actions |= MOS_ACT_DESTROY;
206 		} else {
207 			RAISE_DEBUG_EVENT(mtcp, cur_stream,
208 				"(SYN_RCVD): Ignore invalid RST. "
209 				"ack_seq expected: %u, ack_seq rcvd: %u\n",
210 				cur_stream->rcv_nxt + 1, pctx->p.ack_seq);
211 		}
212 		return TRUE;
213 	}
214 
215 	/* if the application is already closed the connection,
216 	   just destroy the it */
217 	if (cur_stream->state == TCP_ST_FIN_WAIT_1 ||
218 			cur_stream->state == TCP_ST_FIN_WAIT_2 ||
219 			cur_stream->state == TCP_ST_LAST_ACK ||
220 			cur_stream->state == TCP_ST_CLOSING ||
221 			cur_stream->state == TCP_ST_TIME_WAIT) {
222 		cur_stream->state = TCP_ST_CLOSED_RSVD;
223 		cur_stream->close_reason = TCP_ACTIVE_CLOSE;
224 		cur_stream->cb_events |= MOS_ON_TCP_STATE_CHANGE;
225 		cur_stream->actions |= MOS_ACT_DESTROY;
226 		return TRUE;
227 	}
228 
229 	if (cur_stream->state >= TCP_ST_ESTABLISHED &&
230 			cur_stream->state <= TCP_ST_CLOSE_WAIT) {
231 		/* ESTABLISHED, FIN_WAIT_1, FIN_WAIT_2, CLOSE_WAIT */
232 		/* TODO: flush all the segment queues */
233 		//NotifyConnectionReset(mtcp, cur_stream);
234 	}
235 
236 	if (!(cur_stream->sndvar->on_closeq || cur_stream->sndvar->on_closeq_int ||
237 		  cur_stream->sndvar->on_resetq || cur_stream->sndvar->on_resetq_int)) {
238 		//cur_stream->state = TCP_ST_CLOSED_RSVD;
239 		//cur_stream->actions |= MOS_ACT_DESTROY;
240 		cur_stream->state = TCP_ST_CLOSED_RSVD;
241 		cur_stream->cb_events |= MOS_ON_TCP_STATE_CHANGE;
242 		cur_stream->close_reason = TCP_RESET;
243 		if (HAS_STREAM_TYPE(cur_stream, MOS_SOCK_STREAM))
244 			RaiseCloseEvent(mtcp, cur_stream);
245 	}
246 
247 	return TRUE;
248 }
249 /*----------------------------------------------------------------------------*/
250 inline void
251 EstimateRTT(mtcp_manager_t mtcp, tcp_stream *cur_stream, uint32_t mrtt)
252 {
253 	/* This function should be called for not retransmitted packets */
254 	/* TODO: determine tcp_rto_min */
255 #define TCP_RTO_MIN 0
256 	long m = mrtt;
257 	uint32_t tcp_rto_min = TCP_RTO_MIN;
258 	struct tcp_recv_vars *rcvvar = cur_stream->rcvvar;
259 
260 	if (m == 0) {
261 		m = 1;
262 	}
263 	if (rcvvar->srtt != 0) {
264 		/* rtt = 7/8 rtt + 1/8 new */
265 		m -= (rcvvar->srtt >> 3);
266 		rcvvar->srtt += m;
267 		if (m < 0) {
268 			m = -m;
269 			m -= (rcvvar->mdev >> 2);
270 			if (m > 0) {
271 				m >>= 3;
272 			}
273 		} else {
274 			m -= (rcvvar->mdev >> 2);
275 		}
276 		rcvvar->mdev += m;
277 		if (rcvvar->mdev > rcvvar->mdev_max) {
278 			rcvvar->mdev_max = rcvvar->mdev;
279 			if (rcvvar->mdev_max > rcvvar->rttvar) {
280 				rcvvar->rttvar = rcvvar->mdev_max;
281 			}
282 		}
283 		if (TCP_SEQ_GT(cur_stream->sndvar->snd_una, rcvvar->rtt_seq)) {
284 			if (rcvvar->mdev_max < rcvvar->rttvar) {
285 				rcvvar->rttvar -= (rcvvar->rttvar - rcvvar->mdev_max) >> 2;
286 			}
287 			rcvvar->rtt_seq = cur_stream->snd_nxt;
288 			rcvvar->mdev_max = tcp_rto_min;
289 		}
290 	} else {
291 		/* fresh measurement */
292 		rcvvar->srtt = m << 3;
293 		rcvvar->mdev = m << 1;
294 		rcvvar->mdev_max = rcvvar->rttvar = MAX(rcvvar->mdev, tcp_rto_min);
295 		rcvvar->rtt_seq = cur_stream->snd_nxt;
296 	}
297 
298 	TRACE_RTT("mrtt: %u (%uus), srtt: %u (%ums), mdev: %u, mdev_max: %u, "
299 			"rttvar: %u, rtt_seq: %u\n", mrtt, mrtt * TIME_TICK,
300 			rcvvar->srtt, TS_TO_MSEC((rcvvar->srtt) >> 3), rcvvar->mdev,
301 			rcvvar->mdev_max, rcvvar->rttvar, rcvvar->rtt_seq);
302 }
303 /*----------------------------------------------------------------------------*/
304 static inline void
305 ProcessACK(mtcp_manager_t mtcp, tcp_stream *cur_stream,
306 		struct pkt_ctx *pctx)
307 {
308 	const struct tcphdr* tcph = pctx->p.tcph;
309 	uint32_t seq = pctx->p.seq;
310 	uint32_t ack_seq = pctx->p.ack_seq;
311 	struct tcp_send_vars *sndvar = cur_stream->sndvar;
312 	uint32_t cwindow, cwindow_prev;
313 	uint32_t rmlen;
314 	uint32_t snd_wnd_prev;
315 	uint32_t right_wnd_edge;
316 	uint8_t dup;
317 
318 	cwindow = pctx->p.window;
319 	if (!tcph->syn) {
320 		cwindow = cwindow << sndvar->wscale_peer;
321 	}
322 	right_wnd_edge = sndvar->peer_wnd + cur_stream->rcvvar->snd_wl2;
323 
324 	/* If ack overs the sending buffer, return */
325 	if (cur_stream->state == TCP_ST_FIN_WAIT_1 ||
326 			cur_stream->state == TCP_ST_FIN_WAIT_2 ||
327 			cur_stream->state == TCP_ST_CLOSING ||
328 			cur_stream->state == TCP_ST_CLOSE_WAIT ||
329 			cur_stream->state == TCP_ST_LAST_ACK) {
330 		if (sndvar->is_fin_sent && ack_seq == sndvar->fss + 1) {
331 			ack_seq--;
332 		}
333 	}
334 
335 	if (TCP_SEQ_GT(ack_seq, sndvar->sndbuf->head_seq + sndvar->sndbuf->len)) {
336 		TRACE_DBG("Stream %d (%s): invalid acknologement. "
337 				"ack_seq: %u, possible max_ack_seq: %u\n", cur_stream->id,
338 				TCPStateToString(cur_stream), ack_seq,
339 				sndvar->sndbuf->head_seq + sndvar->sndbuf->len);
340 		return;
341 	}
342 
343 #ifdef BE_RESILIENT_TO_PACKET_DROP
344 	if (TCP_SEQ_GT(seq + pctx->p.payloadlen, cur_stream->rcv_nxt))
345 		cur_stream->rcv_nxt = seq + pctx->p.payloadlen;
346 #endif
347 
348 	/* Update window */
349 	if (TCP_SEQ_LT(cur_stream->rcvvar->snd_wl1, seq) ||
350 			(cur_stream->rcvvar->snd_wl1 == seq &&
351 			TCP_SEQ_LT(cur_stream->rcvvar->snd_wl2, ack_seq)) ||
352 			(cur_stream->rcvvar->snd_wl2 == ack_seq &&
353 			cwindow > sndvar->peer_wnd)) {
354 		cwindow_prev = sndvar->peer_wnd;
355 		sndvar->peer_wnd = cwindow;
356 		cur_stream->rcvvar->snd_wl1 = seq;
357 		cur_stream->rcvvar->snd_wl2 = ack_seq;
358 		TRACE_CLWND("Window update. "
359 				"ack: %u, peer_wnd: %u, snd_nxt-snd_una: %u\n",
360 				ack_seq, cwindow, cur_stream->snd_nxt - sndvar->snd_una);
361 		if (cwindow_prev < cur_stream->snd_nxt - sndvar->snd_una &&
362 				sndvar->peer_wnd >= cur_stream->snd_nxt - sndvar->snd_una) {
363 			TRACE_CLWND("%u Broadcasting client window update! "
364 					"ack_seq: %u, peer_wnd: %u (before: %u), "
365 					"(snd_nxt - snd_una: %u)\n",
366 					cur_stream->id, ack_seq, sndvar->peer_wnd, cwindow_prev,
367 					cur_stream->snd_nxt - sndvar->snd_una);
368 			if (HAS_STREAM_TYPE(cur_stream, MOS_SOCK_STREAM))
369 				RaiseWriteEvent(mtcp, cur_stream);
370 		}
371 	}
372 
373 	/* Check duplicated ack count */
374 	/* Duplicated ack if
375 	   1) ack_seq is old
376 	   2) payload length is 0.
377 	   3) advertised window not changed.
378 	   4) there is outstanding unacknowledged data
379 	   5) ack_seq == snd_una
380 	 */
381 
382 	dup = FALSE;
383 	if (TCP_SEQ_LT(ack_seq, cur_stream->snd_nxt)) {
384 		if (ack_seq == cur_stream->rcvvar->last_ack_seq && pctx->p.payloadlen == 0) {
385 			if (cur_stream->rcvvar->snd_wl2 + sndvar->peer_wnd == right_wnd_edge) {
386 				if (cur_stream->rcvvar->dup_acks + 1 > cur_stream->rcvvar->dup_acks) {
387 					cur_stream->rcvvar->dup_acks++;
388 				}
389 				dup = TRUE;
390 			}
391 		}
392 	}
393 	if (!dup) {
394 		cur_stream->rcvvar->dup_acks = 0;
395 		cur_stream->rcvvar->last_ack_seq = ack_seq;
396 	}
397 
398 	/* Fast retransmission */
399 	if (dup && cur_stream->rcvvar->dup_acks == 3) {
400 		TRACE_LOSS("Triple duplicated ACKs!! ack_seq: %u\n", ack_seq);
401 		if (TCP_SEQ_LT(ack_seq, cur_stream->snd_nxt)) {
402 			TRACE_LOSS("Reducing snd_nxt from %u to %u\n",
403 					cur_stream->snd_nxt, ack_seq);
404 #if RTM_STAT
405 			sndvar->rstat.tdp_ack_cnt++;
406 			sndvar->rstat.tdp_ack_bytes += (cur_stream->snd_nxt - ack_seq);
407 #endif
408 			if (ack_seq != sndvar->snd_una) {
409 				TRACE_DBG("ack_seq and snd_una mismatch on tdp ack. "
410 						"ack_seq: %u, snd_una: %u\n",
411 						ack_seq, sndvar->snd_una);
412 			}
413 			cur_stream->snd_nxt = ack_seq;
414 		}
415 
416 		/* update congestion control variables */
417 		/* ssthresh to half of min of cwnd and peer wnd */
418 		sndvar->ssthresh = MIN(sndvar->cwnd, sndvar->peer_wnd) / 2;
419 		if (sndvar->ssthresh < 2 * sndvar->mss) {
420 			sndvar->ssthresh = 2 * sndvar->mss;
421 		}
422 		sndvar->cwnd = sndvar->ssthresh + 3 * sndvar->mss;
423 		TRACE_CONG("Fast retransmission. cwnd: %u, ssthresh: %u\n",
424 				sndvar->cwnd, sndvar->ssthresh);
425 
426 		/* count number of retransmissions */
427 		if (sndvar->nrtx < TCP_MAX_RTX) {
428 			sndvar->nrtx++;
429 		} else {
430 			TRACE_DBG("Exceed MAX_RTX.\n");
431 		}
432 
433 		cur_stream->actions |= MOS_ACT_SEND_DATA;
434 
435 	} else if (cur_stream->rcvvar->dup_acks > 3) {
436 		/* Inflate congestion window until before overflow */
437 		if ((uint32_t)(sndvar->cwnd + sndvar->mss) > sndvar->cwnd) {
438 			sndvar->cwnd += sndvar->mss;
439 			TRACE_CONG("Dupack cwnd inflate. cwnd: %u, ssthresh: %u\n",
440 					sndvar->cwnd, sndvar->ssthresh);
441 		}
442 	}
443 
444 #if TCP_OPT_SACK_ENABLED
445 	ParseSACKOption(cur_stream, ack_seq, (uint8_t *)tcph + TCP_HEADER_LEN,
446 			(tcph->doff << 2) - TCP_HEADER_LEN);
447 #endif /* TCP_OPT_SACK_ENABLED */
448 
449 #if RECOVERY_AFTER_LOSS
450 	/* updating snd_nxt (when recovered from loss) */
451 	if (TCP_SEQ_GT(ack_seq, cur_stream->snd_nxt)) {
452 #if RTM_STAT
453 		sndvar->rstat.ack_upd_cnt++;
454 		sndvar->rstat.ack_upd_bytes += (ack_seq - cur_stream->snd_nxt);
455 #endif
456 		TRACE_LOSS("Updating snd_nxt from %u to %u\n",
457 				cur_stream->snd_nxt, ack_seq);
458 		cur_stream->snd_nxt = ack_seq;
459 		if (sndvar->sndbuf->len == 0) {
460 			if (HAS_STREAM_TYPE(cur_stream, MOS_SOCK_STREAM))
461 				RemoveFromSendList(mtcp, cur_stream);
462 		}
463 	}
464 #endif
465 
466 	/* If ack_seq is previously acked, return */
467 	if (TCP_SEQ_GEQ(sndvar->sndbuf->head_seq, ack_seq)) {
468 		return;
469 	}
470 
471 	/* Remove acked sequence from send buffer */
472 	rmlen = ack_seq - sndvar->sndbuf->head_seq;
473 	if (rmlen > 0) {
474 		/* Routine goes here only if there is new payload (not retransmitted) */
475 		uint16_t packets;
476 
477 		/* If acks new data */
478 		packets = rmlen / sndvar->eff_mss;
479 		if ((rmlen / sndvar->eff_mss) * sndvar->eff_mss > rmlen) {
480 			packets++;
481 		}
482 
483 		/* Estimate RTT and calculate rto */
484 		if (cur_stream->saw_timestamp) {
485 			EstimateRTT(mtcp, cur_stream,
486 					pctx->p.cur_ts - cur_stream->rcvvar->ts_lastack_rcvd);
487 			sndvar->rto = (cur_stream->rcvvar->srtt >> 3) + cur_stream->rcvvar->rttvar;
488 			assert(sndvar->rto > 0);
489 		} else {
490 			//TODO: Need to implement timestamp estimation without timestamp
491 			TRACE_RTT("NOT IMPLEMENTED.\n");
492 		}
493 
494 		/* Update congestion control variables */
495 		if (cur_stream->state >= TCP_ST_ESTABLISHED) {
496 			if (sndvar->cwnd < sndvar->ssthresh) {
497 				if ((sndvar->cwnd + sndvar->mss) > sndvar->cwnd) {
498 					sndvar->cwnd += (sndvar->mss * packets);
499 				}
500 				TRACE_CONG("slow start cwnd: %u, ssthresh: %u\n",
501 						sndvar->cwnd, sndvar->ssthresh);
502 			} else {
503 				uint32_t new_cwnd = sndvar->cwnd +
504 						packets * sndvar->mss * sndvar->mss /
505 						sndvar->cwnd;
506 				if (new_cwnd > sndvar->cwnd) {
507 					sndvar->cwnd = new_cwnd;
508 				}
509 				//TRACE_CONG("congestion avoidance cwnd: %u, ssthresh: %u\n",
510 				//		sndvar->cwnd, sndvar->ssthresh);
511 			}
512 		}
513 
514 		if (SBUF_LOCK(&sndvar->write_lock)) {
515 			if (errno == EDEADLK)
516 				perror("ProcessACK: write_lock blocked\n");
517 			assert(0);
518 		}
519 		if (HAS_STREAM_TYPE(cur_stream, MOS_SOCK_STREAM))
520 			SBRemove(mtcp->rbm_snd, sndvar->sndbuf, rmlen);
521 		sndvar->snd_una = ack_seq;
522 		snd_wnd_prev = sndvar->snd_wnd;
523 		sndvar->snd_wnd = sndvar->sndbuf->size - sndvar->sndbuf->len;
524 
525 		/* If there was no available sending window */
526 		/* notify the newly available window to application */
527 #if SELECTIVE_WRITE_EVENT_NOTIFY
528 		if (snd_wnd_prev <= 0) {
529 #endif /* SELECTIVE_WRITE_EVENT_NOTIFY */
530 			if (HAS_STREAM_TYPE(cur_stream, MOS_SOCK_STREAM))
531 				RaiseWriteEvent(mtcp, cur_stream);
532 #if SELECTIVE_WRITE_EVENT_NOTIFY
533 		}
534 #endif /* SELECTIVE_WRITE_EVENT_NOTIFY */
535 
536 		SBUF_UNLOCK(&sndvar->write_lock);
537 		if (HAS_STREAM_TYPE(cur_stream, MOS_SOCK_STREAM))
538 			UpdateRetransmissionTimer(mtcp, cur_stream, pctx->p.cur_ts);
539 	}
540 }
541 /*----------------------------------------------------------------------------*/
542 /* ProcessTCPPayload: merges TCP payload using receive ring buffer            */
543 /* Return: TRUE (1) in normal case, FALSE (0) if immediate ACK is required    */
544 /* CAUTION: should only be called at ESTABLISHED, FIN_WAIT_1, FIN_WAIT_2      */
545 /*----------------------------------------------------------------------------*/
546 static inline int
547 ProcessTCPPayload(mtcp_manager_t mtcp, tcp_stream *cur_stream,
548 				struct pkt_ctx *pctx)
549 {
550 	struct tcp_recv_vars *rcvvar = cur_stream->rcvvar;
551 	uint32_t prev_rcv_nxt;
552 	int ret = -1;
553 	bool read_lock;
554 	struct socket_map *walk;
555 
556 	if (!cur_stream->buffer_mgmt)
557 		return FALSE;
558 
559 	/* if seq and segment length is lower than rcv_nxt, ignore and send ack */
560 	if (TCP_SEQ_LT(pctx->p.seq + pctx->p.payloadlen, cur_stream->rcv_nxt)) {
561 		SOCKQ_FOREACH_START(walk, &cur_stream->msocks) {
562 			HandleCallback(mtcp, MOS_NULL, walk, cur_stream->side,
563 				       pctx, MOS_ON_ERROR);
564 		} SOCKQ_FOREACH_END;
565 		return FALSE;
566 	}
567 	/* if payload exceeds receiving buffer, drop and send ack */
568 	if (TCP_SEQ_GT(pctx->p.seq + pctx->p.payloadlen, cur_stream->rcv_nxt + rcvvar->rcv_wnd)) {
569 		SOCKQ_FOREACH_START(walk, &cur_stream->msocks) {
570 			HandleCallback(mtcp, MOS_NULL, walk, cur_stream->side,
571 				       pctx, MOS_ON_ERROR);
572 		} SOCKQ_FOREACH_END;
573 		return FALSE;
574 	}
575 
576 	/* allocate receive buffer if not exist */
577 	if (!rcvvar->rcvbuf) {
578 		rcvvar->rcvbuf = tcprb_new(mtcp->bufseg_pool, g_config.mos->rmem_size, cur_stream->buffer_mgmt);
579 		if (!rcvvar->rcvbuf) {
580 			TRACE_ERROR("Stream %d: Failed to allocate receive buffer.\n",
581 				    cur_stream->id);
582 			cur_stream->state = TCP_ST_CLOSED_RSVD;
583 			cur_stream->close_reason = TCP_NO_MEM;
584 			cur_stream->cb_events |= MOS_ON_TCP_STATE_CHANGE;
585 			if (HAS_STREAM_TYPE(cur_stream, MOS_SOCK_STREAM))
586 				RaiseErrorEvent(mtcp, cur_stream);
587 
588 			return ERROR;
589 		}
590 	}
591 
592 	read_lock = HAS_STREAM_TYPE(cur_stream, MOS_SOCK_STREAM);
593 
594 	if (read_lock && SBUF_LOCK(&rcvvar->read_lock)) {
595 		if (errno == EDEADLK)
596 			perror("ProcessTCPPayload: read_lock blocked\n");
597 		assert(0);
598 	}
599 
600 	prev_rcv_nxt = cur_stream->rcv_nxt;
601 
602 	tcprb_t *rb = rcvvar->rcvbuf;
603 	loff_t off = seq2loff(rb, pctx->p.seq, (rcvvar->irs + 1));
604 	if (off >= 0) {
605 		if (!HAS_STREAM_TYPE(cur_stream, MOS_SOCK_STREAM) &&
606 			HAS_STREAM_TYPE(cur_stream, MOS_SOCK_MONITOR_STREAM_ACTIVE))
607 			tcprb_setpile(rb, rb->pile + tcprb_cflen(rb));
608 		ret = tcprb_pwrite(rb, pctx->p.payload, pctx->p.payloadlen, off);
609 		if (ret < 0) {
610 			/* We try again after warning this result to the user. */
611 			SOCKQ_FOREACH_START(walk, &cur_stream->msocks) {
612 				HandleCallback(mtcp, MOS_NULL, walk, cur_stream->side,
613 						   pctx, MOS_ON_ERROR);
614 			} SOCKQ_FOREACH_END;
615 			ret = tcprb_pwrite(rb, pctx->p.payload, pctx->p.payloadlen, off);
616 		}
617 	}
618 	/* TODO: update monitor vars */
619 
620 	/*
621 	 * error can pop up due to disabled buffered management
622 	 * (only in monitor mode). In that case, ignore the warning
623 	 * message.
624 	 */
625 	if (ret < 0 && cur_stream->buffer_mgmt && mtcp->num_msp == 0)
626 		TRACE_ERROR("Cannot merge payload. reason: %d\n", ret);
627 
628 	/* discard the buffer if the state is FIN_WAIT_1 or FIN_WAIT_2,
629 	   meaning that the connection is already closed by the application */
630 	loff_t cftail = rb->pile + tcprb_cflen(rb);
631 	if (cur_stream->state == TCP_ST_FIN_WAIT_1 ||
632 	    cur_stream->state == TCP_ST_FIN_WAIT_2) {
633 		/* XXX: Do we really need to update recv vars? */
634 		tcprb_setpile(rb, cftail);
635 	}
636 	if (cftail > 0 && (rcvvar->irs + 1) + cftail > cur_stream->rcv_nxt) {
637 		RAISE_DEBUG_EVENT(mtcp, cur_stream,
638 				"Move rcv_nxt from %u to %u.\n",
639 				cur_stream->rcv_nxt, (rcvvar->irs + 1) + cftail);
640 		cur_stream->rcv_nxt = (rcvvar->irs + 1) + cftail;
641 	}
642 	assert(cftail - rb->pile >= 0);
643 	rcvvar->rcv_wnd = rb->len - (cftail - rb->pile);
644 
645 	if (read_lock)
646 		SBUF_UNLOCK(&rcvvar->read_lock);
647 
648 
649 	if (TCP_SEQ_LEQ(cur_stream->rcv_nxt, prev_rcv_nxt)) {
650 		/* There are some lost packets */
651 		return FALSE;
652 	}
653 
654 	TRACE_EPOLL("Stream %d data arrived. "
655 		    "len: %d, ET: %llu, IN: %llu, OUT: %llu\n",
656 		    cur_stream->id, pctx->p.payloadlen,
657 		    cur_stream->socket? (unsigned long long)cur_stream->socket->epoll & MOS_EPOLLET : 0,
658 		    cur_stream->socket? (unsigned long long)cur_stream->socket->epoll & MOS_EPOLLIN : 0,
659 		    cur_stream->socket? (unsigned long long)cur_stream->socket->epoll & MOS_EPOLLOUT : 0);
660 
661 	if (cur_stream->state == TCP_ST_ESTABLISHED)
662 		RaiseReadEvent(mtcp, cur_stream);
663 
664 	return TRUE;
665 }
666 /*----------------------------------------------------------------------------*/
667 static inline void
668 Handle_TCP_ST_LISTEN (mtcp_manager_t mtcp, tcp_stream* cur_stream,
669 		struct pkt_ctx *pctx)
670 {
671 
672 	const struct tcphdr* tcph = pctx->p.tcph;
673 
674 	if (tcph->syn) {
675 		if (cur_stream->state == TCP_ST_LISTEN)
676 			cur_stream->rcv_nxt++;
677 		cur_stream->state = TCP_ST_SYN_RCVD;
678 		cur_stream->cb_events |= MOS_ON_TCP_STATE_CHANGE | MOS_ON_CONN_START;
679 		TRACE_STATE("Stream %d: TCP_ST_SYN_RCVD\n", cur_stream->id);
680 		cur_stream->actions |= MOS_ACT_SEND_CONTROL;
681 		if (IS_STREAM_TYPE(cur_stream, MOS_SOCK_MONITOR_STREAM_ACTIVE)) {
682 			/**
683 			 * Passive stream context needs to initialize irs and rcv_nxt
684 			 * as it is not set neither during createserverstream or monitor
685 			 * creation.
686 			 */
687 			cur_stream->rcvvar->irs =
688 				cur_stream->rcv_nxt = pctx->p.seq;
689 		}
690 	} else {
691 		CTRACE_ERROR("Stream %d (TCP_ST_LISTEN): "
692 				"Packet without SYN.\n", cur_stream->id);
693 	}
694 
695 }
696 /*----------------------------------------------------------------------------*/
697 static inline void
698 Handle_TCP_ST_SYN_SENT (mtcp_manager_t mtcp, tcp_stream* cur_stream,
699 		struct pkt_ctx *pctx)
700 {
701 	const struct tcphdr* tcph = pctx->p.tcph;
702 
703 	/* when active open */
704 	if (tcph->ack) {
705 		/* filter the unacceptable acks */
706 		if (TCP_SEQ_LEQ(pctx->p.ack_seq, cur_stream->sndvar->iss)
707 #ifndef BE_RESILIENT_TO_PACKET_DROP
708 			|| TCP_SEQ_GT(pctx->p.ack_seq, cur_stream->snd_nxt)
709 #endif
710 				) {
711 			if (!tcph->rst) {
712 				cur_stream->actions |= MOS_ACT_SEND_RST;
713 			}
714 			return;
715 		}
716 		/* accept the ack */
717 		cur_stream->sndvar->snd_una++;
718 	}
719 
720 	if (tcph->rst) {
721 		if (tcph->ack) {
722 			cur_stream->state = TCP_ST_CLOSED_RSVD;
723 			cur_stream->close_reason = TCP_RESET;
724 			cur_stream->cb_events |= MOS_ON_TCP_STATE_CHANGE;
725 			if (cur_stream->socket) {
726 				if (HAS_STREAM_TYPE(cur_stream, MOS_SOCK_STREAM))
727 					RaiseErrorEvent(mtcp, cur_stream);
728 			} else {
729 				cur_stream->actions |= MOS_ACT_DESTROY;
730 			}
731 		}
732 		return;
733 	}
734 
735 	if (tcph->ack
736 #ifndef BE_RESILIENT_TO_PACKET_DROP
737 		/* If we already lost SYNACK, let the ACK packet do the SYNACK's role */
738 		&& tcph->syn
739 #endif
740 	   ) {
741 		int ret = HandleActiveOpen(mtcp, cur_stream, pctx);
742 		if (!ret) {
743 			return;
744 		}
745 
746 #ifdef BE_RESILIENT_TO_PACKET_DROP
747 		if (!tcph->syn) {
748 			RAISE_DEBUG_EVENT(mtcp, cur_stream,
749 					"We missed SYNACK. Replace it with an ACK packet.\n");
750 			/* correct some variables */
751 			cur_stream->rcv_nxt = pctx->p.seq;
752 		}
753 #endif
754 
755 		cur_stream->sndvar->nrtx = 0;
756 		if (HAS_STREAM_TYPE(cur_stream, MOS_SOCK_STREAM))
757 			RemoveFromRTOList(mtcp, cur_stream);
758 		cur_stream->state = TCP_ST_ESTABLISHED;
759 		cur_stream->cb_events |= /*MOS_ON_CONN_SETUP |*/ MOS_ON_TCP_STATE_CHANGE;
760 		TRACE_STATE("Stream %d: TCP_ST_ESTABLISHED\n", cur_stream->id);
761 
762 		if (cur_stream->socket) {
763 			if (HAS_STREAM_TYPE(cur_stream, MOS_SOCK_STREAM))
764 				RaiseWriteEvent(mtcp, cur_stream);
765 		} else {
766 			TRACE_STATE("Stream %d: ESTABLISHED, but no socket\n", cur_stream->id);
767 			cur_stream->close_reason = TCP_ACTIVE_CLOSE;
768 			cur_stream->actions |= MOS_ACT_SEND_RST;
769 			cur_stream->actions |= MOS_ACT_DESTROY;
770 		}
771 		cur_stream->actions |= MOS_ACT_SEND_CONTROL;
772 		if (g_config.mos->tcp_timeout > 0)
773 			if (HAS_STREAM_TYPE(cur_stream, MOS_SOCK_STREAM))
774 				AddtoTimeoutList(mtcp, cur_stream);
775 
776 #ifdef BE_RESILIENT_TO_PACKET_DROP
777 		/* Handle this ack packet */
778 		if (!tcph->syn)
779 			Handle_TCP_ST_ESTABLISHED(mtcp, cur_stream, pctx);
780 #endif
781 
782 	} else if (tcph->syn) {
783 		cur_stream->state = TCP_ST_SYN_RCVD;
784 		cur_stream->cb_events |= MOS_ON_TCP_STATE_CHANGE;
785 		TRACE_STATE("Stream %d: TCP_ST_SYN_RCVD\n", cur_stream->id);
786 		cur_stream->snd_nxt = cur_stream->sndvar->iss;
787 		cur_stream->actions |= MOS_ACT_SEND_CONTROL;
788 	}
789 }
790 /*----------------------------------------------------------------------------*/
791 static inline void
792 Handle_TCP_ST_SYN_RCVD (mtcp_manager_t mtcp, tcp_stream* cur_stream,
793 		struct pkt_ctx *pctx)
794 {
795 	const struct tcphdr* tcph = pctx->p.tcph;
796 	struct tcp_send_vars *sndvar = cur_stream->sndvar;
797 	int ret;
798 
799 	if (tcph->ack) {
800 		uint32_t prior_cwnd;
801 		/* NOTE: We do not validate the ack number because first few packets
802 		 * can also come out of order */
803 
804 		sndvar->snd_una++;
805 		cur_stream->snd_nxt = pctx->p.ack_seq;
806 		prior_cwnd = sndvar->cwnd;
807 		sndvar->cwnd = ((prior_cwnd == 1)?
808 				(sndvar->mss * 2): sndvar->mss);
809 
810 		//UpdateRetransmissionTimer(mtcp, cur_stream, cur_ts);
811 		sndvar->nrtx = 0;
812 		cur_stream->rcv_nxt = cur_stream->rcvvar->irs + 1;
813 		if (HAS_STREAM_TYPE(cur_stream, MOS_SOCK_STREAM))
814 			RemoveFromRTOList(mtcp, cur_stream);
815 
816 		cur_stream->state = TCP_ST_ESTABLISHED;
817 		cur_stream->cb_events |= /*MOS_ON_CONN_SETUP |*/ MOS_ON_TCP_STATE_CHANGE;
818 		TRACE_STATE("Stream %d: TCP_ST_ESTABLISHED\n", cur_stream->id);
819 
820 #ifdef BE_RESILIENT_TO_PACKET_DROP
821 		if (pctx->p.ack_seq != sndvar->iss + 1)
822 			Handle_TCP_ST_ESTABLISHED(mtcp, cur_stream, pctx);
823 #endif
824 
825 		/* update listening socket */
826 		if (HAS_STREAM_TYPE(cur_stream, MOS_SOCK_STREAM)) {
827 
828 			struct tcp_listener *listener = mtcp->listener;
829 
830 			ret = StreamEnqueue(listener->acceptq, cur_stream);
831 			if (ret < 0) {
832 				TRACE_ERROR("Stream %d: Failed to enqueue to "
833 						"the listen backlog!\n", cur_stream->id);
834 				cur_stream->close_reason = TCP_NOT_ACCEPTED;
835 				cur_stream->state = TCP_ST_CLOSED_RSVD;
836 				cur_stream->cb_events |= MOS_ON_TCP_STATE_CHANGE;
837 				TRACE_STATE("Stream %d: TCP_ST_CLOSED_RSVD\n", cur_stream->id);
838 				cur_stream->actions |= MOS_ACT_SEND_CONTROL;
839 			}
840 
841 			/* raise an event to the listening socket */
842 			if (listener->socket && (listener->socket->epoll & MOS_EPOLLIN)) {
843 				AddEpollEvent(mtcp->ep,
844 						MOS_EVENT_QUEUE, listener->socket, MOS_EPOLLIN);
845 			}
846 		}
847 
848 		//TRACE_DBG("Stream %d inserted into acceptq.\n", cur_stream->id);
849 		if (g_config.mos->tcp_timeout > 0)
850 			if (HAS_STREAM_TYPE(cur_stream, MOS_SOCK_STREAM))
851 				AddtoTimeoutList(mtcp, cur_stream);
852 
853 	} else {
854 		/* Handle retransmitted SYN packet */
855 		if (HAS_STREAM_TYPE(cur_stream, MOS_SOCK_MONITOR_STREAM_ACTIVE) &&
856 			tcph->syn) {
857 			if (pctx->p.seq == cur_stream->pair_stream->sndvar->iss) {
858 				TRACE_DBG("syn retransmit! (p.seq = %u / iss = %u)\n",
859 						  pctx->p.seq, cur_stream->pair_stream->sndvar->iss);
860 				cur_stream->cb_events |= MOS_ON_REXMIT;
861 			}
862 		}
863 
864 		TRACE_DBG("Stream %d (TCP_ST_SYN_RCVD): No ACK.\n",
865 				cur_stream->id);
866 		/* retransmit SYN/ACK */
867 		cur_stream->snd_nxt = sndvar->iss;
868 		cur_stream->actions |= MOS_ACT_SEND_CONTROL;
869 	}
870 }
871 /*----------------------------------------------------------------------------*/
872 static inline void
873 Handle_TCP_ST_ESTABLISHED (mtcp_manager_t mtcp, tcp_stream* cur_stream,
874 		struct pkt_ctx *pctx)
875 {
876 	const struct tcphdr* tcph = pctx->p.tcph;
877 
878 	if (tcph->syn) {
879 		/* Handle retransmitted SYNACK packet */
880 		if (HAS_STREAM_TYPE(cur_stream, MOS_SOCK_MONITOR_STREAM_ACTIVE) &&
881 			tcph->ack) {
882 			if (pctx->p.seq == cur_stream->pair_stream->sndvar->iss) {
883 				TRACE_DBG("syn/ack retransmit! (p.seq = %u / iss = %u)\n",
884 						pctx->p.seq, cur_stream->pair_stream->sndvar->iss);
885 				cur_stream->cb_events |= MOS_ON_REXMIT;
886 			}
887 		}
888 
889 		TRACE_DBG("Stream %d (TCP_ST_ESTABLISHED): weird SYN. "
890 				"seq: %u, expected: %u, ack_seq: %u, expected: %u\n",
891 				cur_stream->id, pctx->p.seq, cur_stream->rcv_nxt,
892 				pctx->p.ack_seq, cur_stream->snd_nxt);
893 		cur_stream->snd_nxt = pctx->p.ack_seq;
894 		cur_stream->actions |= MOS_ACT_SEND_CONTROL;
895 		return;
896 	}
897 
898 	if (pctx->p.payloadlen > 0) {
899 		if (ProcessTCPPayload(mtcp, cur_stream, pctx)) {
900 			/* if return is TRUE, send ACK */
901 			cur_stream->actions |= MOS_ACT_SEND_ACK_AGG;
902 		} else {
903 			cur_stream->actions |= MOS_ACT_SEND_ACK_NOW;
904 		}
905 	}
906 
907 	if (tcph->ack) {
908 		if (cur_stream->sndvar->sndbuf) {
909 			ProcessACK(mtcp, cur_stream, pctx);
910 		}
911 	}
912 
913 	if (tcph->fin) {
914 		/* process the FIN only if the sequence is valid */
915 		/* FIN packet is allowed to push payload (should we check for PSH flag)? */
916 		if (!cur_stream->buffer_mgmt ||
917 #ifdef BE_RESILIENT_TO_PACKET_DROP
918 			TCP_SEQ_GEQ(pctx->p.seq + pctx->p.payloadlen, cur_stream->rcv_nxt)
919 #else
920 			pctx->p.seq + pctx->p.payloadlen == cur_stream->rcv_nxt
921 #endif
922 			) {
923 #ifdef BE_RESILIENT_TO_PACKET_DROP
924 			cur_stream->rcv_nxt = pctx->p.seq + pctx->p.payloadlen;
925 #endif
926 			cur_stream->state = TCP_ST_CLOSE_WAIT;
927 			cur_stream->cb_events |= MOS_ON_TCP_STATE_CHANGE;
928 			TRACE_STATE("Stream %d: TCP_ST_CLOSE_WAIT\n", cur_stream->id);
929 			cur_stream->rcv_nxt++;
930 			cur_stream->actions |= MOS_ACT_SEND_CONTROL;
931 
932 			/* notify FIN to application */
933 			RaiseReadEvent(mtcp, cur_stream);
934 		} else {
935 			RAISE_DEBUG_EVENT(mtcp, cur_stream,
936 					"Expected %u, but received %u (= %u + %u)\n",
937 					cur_stream->rcv_nxt, pctx->p.seq + pctx->p.payloadlen,
938 					pctx->p.seq, pctx->p.payloadlen);
939 
940 			cur_stream->actions |= MOS_ACT_SEND_ACK_NOW;
941 			return;
942 		}
943 	}
944 }
945 /*----------------------------------------------------------------------------*/
946 static inline void
947 Handle_TCP_ST_CLOSE_WAIT (mtcp_manager_t mtcp, tcp_stream* cur_stream,
948 		struct pkt_ctx *pctx)
949 {
950 	if (TCP_SEQ_LT(pctx->p.seq, cur_stream->rcv_nxt)) {
951 		TRACE_DBG("Stream %d (TCP_ST_CLOSE_WAIT): "
952 				"weird seq: %u, expected: %u\n",
953 				cur_stream->id, pctx->p.seq, cur_stream->rcv_nxt);
954 		cur_stream->actions |= MOS_ACT_SEND_CONTROL;
955 		return;
956 	}
957 
958 	if (cur_stream->sndvar->sndbuf) {
959 		ProcessACK(mtcp, cur_stream, pctx);
960 	}
961 }
962 /*----------------------------------------------------------------------------*/
963 static inline void
964 Handle_TCP_ST_LAST_ACK (mtcp_manager_t mtcp, tcp_stream* cur_stream,
965 		struct pkt_ctx *pctx)
966 {
967 	const struct tcphdr* tcph = pctx->p.tcph;
968 
969 	if (TCP_SEQ_LT(pctx->p.seq, cur_stream->rcv_nxt)) {
970 		TRACE_DBG("Stream %d (TCP_ST_LAST_ACK): "
971 				"weird seq: %u, expected: %u\n",
972 				cur_stream->id, pctx->p.seq, cur_stream->rcv_nxt);
973 		return;
974 	}
975 
976 	if (tcph->ack) {
977 		if (cur_stream->sndvar->sndbuf) {
978 			ProcessACK(mtcp, cur_stream, pctx);
979 		}
980 
981 		if (!cur_stream->sndvar->is_fin_sent) {
982 			/* the case that FIN is not sent yet */
983 			/* this is not ack for FIN, ignore */
984 			TRACE_DBG("Stream %d (TCP_ST_LAST_ACK): "
985 					"No FIN sent yet.\n", cur_stream->id);
986 #ifdef DBGMSG
987 			DumpIPPacket(mtcp, pctx->p.iph, pctx->p.ip_len);
988 #endif
989 #if DUMP_STREAM
990 			DumpStream(mtcp, cur_stream);
991 			DumpControlList(mtcp, mtcp->n_sender[0]);
992 #endif
993 			return;
994 		}
995 
996 		/* check if ACK of FIN */
997 		if (pctx->p.ack_seq == cur_stream->sndvar->fss + 1) {
998 			cur_stream->sndvar->snd_una++;
999 			if (HAS_STREAM_TYPE(cur_stream, MOS_SOCK_STREAM))
1000 				UpdateRetransmissionTimer(mtcp, cur_stream, pctx->p.cur_ts);
1001 			cur_stream->state = TCP_ST_CLOSED_RSVD;
1002 			cur_stream->close_reason = TCP_PASSIVE_CLOSE;
1003 			cur_stream->cb_events |= MOS_ON_TCP_STATE_CHANGE;
1004 			TRACE_STATE("Stream %d: TCP_ST_CLOSED_RSVD\n",
1005 					cur_stream->id);
1006 			cur_stream->actions |= MOS_ACT_DESTROY;
1007 		} else {
1008 			TRACE_DBG("Stream %d (TCP_ST_LAST_ACK): Not ACK of FIN. "
1009 					"ack_seq: %u, expected: %u\n",
1010 					cur_stream->id, pctx->p.ack_seq, cur_stream->sndvar->fss + 1);
1011 			//cur_stream->snd_nxt = cur_stream->sndvar->fss;
1012 			cur_stream->actions |= MOS_ACT_SEND_CONTROL;
1013 		}
1014 	} else {
1015 		TRACE_DBG("Stream %d (TCP_ST_LAST_ACK): No ACK\n",
1016 			  cur_stream->id);
1017 		//cur_stream->snd_nxt = cur_stream->sndvar->fss;
1018 		cur_stream->actions |= MOS_ACT_SEND_CONTROL;
1019 	}
1020 }
1021 /*----------------------------------------------------------------------------*/
1022 static inline void
1023 Handle_TCP_ST_FIN_WAIT_1 (mtcp_manager_t mtcp, tcp_stream* cur_stream,
1024 		struct pkt_ctx *pctx)
1025 {
1026 	const struct tcphdr* tcph = pctx->p.tcph;
1027 
1028 	if (TCP_SEQ_LT(pctx->p.seq, cur_stream->rcv_nxt)) {
1029 		RAISE_DEBUG_EVENT(mtcp, cur_stream,
1030 				"Stream %d (FIN_WAIT_1): "
1031 				"weird seq: %u, expected: %u\n",
1032 				cur_stream->id, pctx->p.seq, cur_stream->rcv_nxt);
1033 		cur_stream->actions |= MOS_ACT_SEND_CONTROL;
1034 		return;
1035 	}
1036 
1037 	if (tcph->ack) {
1038 		if (cur_stream->sndvar->sndbuf) {
1039 			ProcessACK(mtcp, cur_stream, pctx);
1040 		}
1041 #if BE_RESILIENT_TO_PACKET_DROP
1042 		if (cur_stream->sndvar->is_fin_sent &&
1043 			((!HAS_STREAM_TYPE(cur_stream, MOS_SOCK_STREAM) &&
1044 			 pctx->p.ack_seq == cur_stream->sndvar->fss) ||
1045 			 pctx->p.ack_seq == cur_stream->sndvar->fss + 1)) {
1046 
1047 #else
1048 		if (cur_stream->sndvar->is_fin_sent &&
1049 			pctx->p.ack_seq == cur_stream->sndvar->fss + 1) {
1050 #endif
1051 			cur_stream->sndvar->snd_una = pctx->p.ack_seq;
1052 			if (TCP_SEQ_GT(pctx->p.ack_seq, cur_stream->snd_nxt)) {
1053 				TRACE_DBG("Stream %d: update snd_nxt to %u\n",
1054 						cur_stream->id, pctx->p.ack_seq);
1055 				cur_stream->snd_nxt = pctx->p.ack_seq;
1056 			}
1057 			//cur_stream->sndvar->snd_una++;
1058 			//UpdateRetransmissionTimer(mtcp, cur_stream, cur_ts);
1059 			cur_stream->sndvar->nrtx = 0;
1060 			if (HAS_STREAM_TYPE(cur_stream, MOS_SOCK_STREAM))
1061 				RemoveFromRTOList(mtcp, cur_stream);
1062 			cur_stream->state = TCP_ST_FIN_WAIT_2;
1063 			cur_stream->cb_events |= MOS_ON_TCP_STATE_CHANGE;
1064 			TRACE_STATE("Stream %d: TCP_ST_FIN_WAIT_2\n",
1065 					cur_stream->id);
1066 		} else {
1067 			RAISE_DEBUG_EVENT(mtcp, cur_stream,
1068 					"Failed to transit to FIN_WAIT_2, "
1069 					"is_fin_sent: %s, ack_seq: %u, sndvar->fss: %u\n",
1070 					cur_stream->sndvar->is_fin_sent ? "true" : "false",
1071 					pctx->p.ack_seq, cur_stream->sndvar->fss);
1072 		}
1073 
1074 	} else {
1075 		RAISE_DEBUG_EVENT(mtcp, cur_stream,
1076 				"Failed to transit to FIN_WAIT_2, "
1077 				"We got a %s%s%s%s%s%s packet.",
1078 				pctx->p.tcph->syn ? "S" : "",
1079 				pctx->p.tcph->fin ? "F" : "",
1080 				pctx->p.tcph->rst ? "R" : "",
1081 				pctx->p.tcph->psh ? "P" : "",
1082 				pctx->p.tcph->urg ? "U" : "",
1083 				pctx->p.tcph->ack ? "A" : "");
1084 
1085 		TRACE_DBG("Stream %d: does not contain an ack!\n",
1086 				cur_stream->id);
1087 		return;
1088 	}
1089 
1090 	if (pctx->p.payloadlen > 0) {
1091 		if (ProcessTCPPayload(mtcp, cur_stream, pctx)) {
1092 			/* if return is TRUE, send ACK */
1093 			cur_stream->actions |= MOS_ACT_SEND_ACK_AGG;
1094 		} else {
1095 			cur_stream->actions |= MOS_ACT_SEND_ACK_NOW;
1096 		}
1097 	}
1098 
1099 	if (tcph->fin) {
1100 		/* process the FIN only if the sequence is valid */
1101 		/* FIN packet is allowed to push payload (should we check for PSH flag)? */
1102 		if (!cur_stream->buffer_mgmt ||
1103 #ifdef BE_RESILIENT_TO_PACKET_DROP
1104 			TCP_SEQ_GEQ(pctx->p.seq + pctx->p.payloadlen, cur_stream->rcv_nxt)
1105 #else
1106 			pctx->p.seq + pctx->p.payloadlen == cur_stream->rcv_nxt
1107 #endif
1108 		   ) {
1109 			cur_stream->rcv_nxt++;
1110 
1111 			if (cur_stream->state == TCP_ST_FIN_WAIT_1) {
1112 				cur_stream->state = TCP_ST_CLOSING;
1113 				cur_stream->cb_events |= MOS_ON_TCP_STATE_CHANGE;
1114 				TRACE_STATE("Stream %d: TCP_ST_CLOSING\n", cur_stream->id);
1115 
1116 			} else if (cur_stream->state == TCP_ST_FIN_WAIT_2) {
1117 				cur_stream->state = TCP_ST_TIME_WAIT;
1118 				cur_stream->cb_events |= MOS_ON_TCP_STATE_CHANGE;
1119 				TRACE_STATE("Stream %d: TCP_ST_TIME_WAIT\n", cur_stream->id);
1120 				AddtoTimewaitList(mtcp, cur_stream, pctx->p.cur_ts);
1121 			}
1122 			cur_stream->actions |= MOS_ACT_SEND_CONTROL;
1123 		} else {
1124 			RAISE_DEBUG_EVENT(mtcp, cur_stream,
1125 					"Expected %u, but received %u (= %u + %u)\n",
1126 					cur_stream->rcv_nxt, pctx->p.seq + pctx->p.payloadlen,
1127 					pctx->p.seq, pctx->p.payloadlen);
1128 
1129 			cur_stream->actions |= MOS_ACT_SEND_ACK_NOW;
1130 			return;
1131 		}
1132 	}
1133 }
1134 /*----------------------------------------------------------------------------*/
1135 static inline void
1136 Handle_TCP_ST_FIN_WAIT_2 (mtcp_manager_t mtcp, tcp_stream* cur_stream,
1137 		struct pkt_ctx *pctx)
1138 {
1139 	const struct tcphdr* tcph = pctx->p.tcph;
1140 
1141 	if (tcph->ack) {
1142 		if (cur_stream->sndvar->sndbuf) {
1143 			ProcessACK(mtcp, cur_stream, pctx);
1144 		}
1145 	} else {
1146 		TRACE_DBG("Stream %d: does not contain an ack!\n",
1147 				cur_stream->id);
1148 		return;
1149 	}
1150 
1151 	if (pctx->p.payloadlen > 0) {
1152 		if (ProcessTCPPayload(mtcp, cur_stream, pctx)) {
1153 			/* if return is TRUE, send ACK */
1154 			cur_stream->actions |= MOS_ACT_SEND_ACK_AGG;
1155 		} else {
1156 			cur_stream->actions |= MOS_ACT_SEND_ACK_NOW;
1157 		}
1158 	}
1159 
1160 	if (tcph->fin) {
1161 		/* process the FIN only if the sequence is valid */
1162 		/* FIN packet is allowed to push payload (should we check for PSH flag)? */
1163 		if (!cur_stream->buffer_mgmt ||
1164 #ifdef BE_RESILIENT_TO_PACKET_DROP
1165 			TCP_SEQ_GEQ(pctx->p.seq + pctx->p.payloadlen, cur_stream->rcv_nxt)
1166 #else
1167 			pctx->p.seq + pctx->p.payloadlen == cur_stream->rcv_nxt
1168 #endif
1169 			) {
1170 			cur_stream->state = TCP_ST_TIME_WAIT;
1171 			cur_stream->cb_events |= MOS_ON_TCP_STATE_CHANGE;
1172 			cur_stream->rcv_nxt++;
1173 			TRACE_STATE("Stream %d: TCP_ST_TIME_WAIT\n", cur_stream->id);
1174 
1175 			AddtoTimewaitList(mtcp, cur_stream, pctx->p.cur_ts);
1176 			cur_stream->actions |= MOS_ACT_SEND_CONTROL;
1177 		}
1178 	} else {
1179 		TRACE_DBG("Stream %d (TCP_ST_FIN_WAIT_2): No FIN. "
1180 				"seq: %u, ack_seq: %u, snd_nxt: %u, snd_una: %u\n",
1181 				cur_stream->id, pctx->p.seq, pctx->p.ack_seq,
1182 				cur_stream->snd_nxt, cur_stream->sndvar->snd_una);
1183 #if DBGMSG
1184 		DumpIPPacket(mtcp, pctx->p.iph, pctx->p.ip_len);
1185 #endif
1186 	}
1187 
1188 }
1189 /*----------------------------------------------------------------------------*/
1190 static inline void
1191 Handle_TCP_ST_CLOSING (mtcp_manager_t mtcp, tcp_stream* cur_stream,
1192 		struct pkt_ctx *pctx)
1193 {
1194 	const struct tcphdr* tcph = pctx->p.tcph;
1195 
1196 	if (tcph->ack) {
1197 		if (cur_stream->sndvar->sndbuf) {
1198 			ProcessACK(mtcp, cur_stream, pctx);
1199 		}
1200 
1201 		if (!cur_stream->sndvar->is_fin_sent) {
1202 			TRACE_DBG("Stream %d (TCP_ST_CLOSING): "
1203 					"No FIN sent yet.\n", cur_stream->id);
1204 			return;
1205 		}
1206 
1207 		// check if ACK of FIN
1208 		if (pctx->p.ack_seq != cur_stream->sndvar->fss + 1) {
1209 #if DBGMSG
1210 			TRACE_DBG("Stream %d (TCP_ST_CLOSING): Not ACK of FIN. "
1211 				  "ack_seq: %u, snd_nxt: %u, snd_una: %u, fss: %u\n",
1212 				  cur_stream->id, pctx->p.ack_seq, cur_stream->snd_nxt,
1213 				  cur_stream->sndvar->snd_una, cur_stream->sndvar->fss);
1214 			DumpIPPacketToFile(stderr, pctx->p.iph, pctx->p.ip_len);
1215 			DumpStream(mtcp, cur_stream);
1216 #endif
1217 			//assert(0);
1218 			/* if the packet is not the ACK of FIN, ignore */
1219 			return;
1220 		}
1221 
1222 		cur_stream->sndvar->snd_una = pctx->p.ack_seq;
1223 		if (HAS_STREAM_TYPE(cur_stream, MOS_SOCK_STREAM))
1224 			UpdateRetransmissionTimer(mtcp, cur_stream, pctx->p.cur_ts);
1225 		cur_stream->state = TCP_ST_TIME_WAIT;
1226 		cur_stream->cb_events |= MOS_ON_TCP_STATE_CHANGE;
1227 		TRACE_STATE("Stream %d: TCP_ST_TIME_WAIT\n", cur_stream->id);
1228 
1229 		AddtoTimewaitList(mtcp, cur_stream, pctx->p.cur_ts);
1230 
1231 	} else {
1232 		TRACE_DBG("Stream %d (TCP_ST_CLOSING): Not ACK\n",
1233 			  cur_stream->id);
1234 		return;
1235 	}
1236 }
1237 /*----------------------------------------------------------------------------*/
1238 void
1239 UpdateRecvTCPContext(mtcp_manager_t mtcp, struct tcp_stream *cur_stream,
1240 		struct pkt_ctx *pctx)
1241 {
1242 	struct tcphdr* tcph = pctx->p.tcph;
1243 	int ret;
1244 
1245 	assert(cur_stream);
1246 
1247 	/* Validate sequence. if not valid, ignore the packet */
1248 	if (cur_stream->state > TCP_ST_SYN_RCVD) {
1249 
1250 		ret = ValidateSequence(mtcp, cur_stream, pctx);
1251 		if (!ret) {
1252 			TRACE_DBG("Stream %d: Unexpected sequence: %u, expected: %u\n",
1253 					cur_stream->id, pctx->p.seq, cur_stream->rcv_nxt);
1254 #ifdef DBGMSG
1255 			DumpIPPacket(mtcp, pctx->p.iph, pctx->p.ip_len);
1256 #endif
1257 #if DUMP_STREAM
1258 			DumpStream(mtcp, cur_stream);
1259 #endif
1260 			/* cur_stream->cb_events |= MOS_ON_ERROR; */
1261 		}
1262 	}
1263 	/* Update receive window size */
1264 	if (tcph->syn) {
1265 		cur_stream->sndvar->peer_wnd = pctx->p.window;
1266 	} else {
1267 		cur_stream->sndvar->peer_wnd =
1268 				(uint32_t)pctx->p.window << cur_stream->sndvar->wscale_peer;
1269 	}
1270 
1271 	cur_stream->last_active_ts = pctx->p.cur_ts;
1272 	if (HAS_STREAM_TYPE(cur_stream, MOS_SOCK_STREAM))
1273 		UpdateTimeoutList(mtcp, cur_stream);
1274 
1275 	/* Process RST: process here only if state > TCP_ST_SYN_SENT */
1276 	if (tcph->rst) {
1277 		cur_stream->have_reset = TRUE;
1278 		if (cur_stream->state > TCP_ST_SYN_SENT) {
1279 			if (ProcessRST(mtcp, cur_stream, pctx)) {
1280 				return;
1281 			}
1282 		}
1283 	}
1284 
1285 	if (HAS_STREAM_TYPE(cur_stream, MOS_SOCK_MONITOR_STREAM_ACTIVE) &&
1286 		pctx->p.tcph->fin) {
1287 		if (cur_stream->state == TCP_ST_CLOSE_WAIT ||
1288 			cur_stream->state == TCP_ST_LAST_ACK ||
1289 			cur_stream->state == TCP_ST_CLOSING ||
1290 			cur_stream->state == TCP_ST_TIME_WAIT) {
1291 			/* Handle retransmitted FIN packet */
1292 			if (pctx->p.seq == cur_stream->pair_stream->sndvar->fss) {
1293 				TRACE_DBG("FIN retransmit! (seq = %u / fss = %u)\n",
1294 						pctx->p.seq, cur_stream->pair_stream->sndvar->fss);
1295 				cur_stream->cb_events |= MOS_ON_REXMIT;
1296 			}
1297 		}
1298 	}
1299 
1300 	switch (cur_stream->state) {
1301 	case TCP_ST_LISTEN:
1302 		Handle_TCP_ST_LISTEN(mtcp, cur_stream, pctx);
1303 		break;
1304 
1305 	case TCP_ST_SYN_SENT:
1306 		Handle_TCP_ST_SYN_SENT(mtcp, cur_stream, pctx);
1307 		break;
1308 
1309 	case TCP_ST_SYN_RCVD:
1310 		/* SYN retransmit implies our SYN/ACK was lost. Resend */
1311 		if (tcph->syn && pctx->p.seq == cur_stream->rcvvar->irs)
1312 			Handle_TCP_ST_LISTEN(mtcp, cur_stream, pctx);
1313 		else {
1314 			Handle_TCP_ST_SYN_RCVD(mtcp, cur_stream, pctx);
1315 			if (pctx->p.payloadlen > 0 && cur_stream->state == TCP_ST_ESTABLISHED)
1316 				Handle_TCP_ST_ESTABLISHED(mtcp, cur_stream, pctx);
1317 		}
1318 		break;
1319 
1320 	case TCP_ST_ESTABLISHED:
1321 		Handle_TCP_ST_ESTABLISHED(mtcp, cur_stream, pctx);
1322 		break;
1323 
1324 	case TCP_ST_CLOSE_WAIT:
1325 		Handle_TCP_ST_CLOSE_WAIT(mtcp, cur_stream, pctx);
1326 		break;
1327 
1328 	case TCP_ST_LAST_ACK:
1329 		Handle_TCP_ST_LAST_ACK(mtcp, cur_stream, pctx);
1330 		break;
1331 
1332 	case TCP_ST_FIN_WAIT_1:
1333 		Handle_TCP_ST_FIN_WAIT_1(mtcp, cur_stream, pctx);
1334 		break;
1335 
1336 	case TCP_ST_FIN_WAIT_2:
1337 		Handle_TCP_ST_FIN_WAIT_2(mtcp, cur_stream, pctx);
1338 		break;
1339 
1340 	case TCP_ST_CLOSING:
1341 		Handle_TCP_ST_CLOSING(mtcp, cur_stream, pctx);
1342 		break;
1343 
1344 	case TCP_ST_TIME_WAIT:
1345 		/* the only thing that can arrive in this state is a retransmission
1346 		   of the remote FIN. Acknowledge it, and restart the 2 MSL timeout */
1347 		if (cur_stream->on_timewait_list) {
1348 			RemoveFromTimewaitList(mtcp, cur_stream);
1349 			AddtoTimewaitList(mtcp, cur_stream, pctx->p.cur_ts);
1350 		}
1351 		cur_stream->actions |= MOS_ACT_SEND_CONTROL;
1352 		break;
1353 
1354 	case TCP_ST_CLOSED:
1355 	case TCP_ST_CLOSED_RSVD:
1356 		break;
1357 
1358 	default:
1359 		break;
1360 	}
1361 
1362 	TRACE_STATE("Stream %d: Events: %0lx, Action: %0x\n",
1363 			cur_stream->id, cur_stream->cb_events, cur_stream->actions);
1364 	return;
1365 }
1366 /*----------------------------------------------------------------------------*/
1367 void
1368 DoActionEndTCPPacket(mtcp_manager_t mtcp, struct tcp_stream *cur_stream,
1369 		struct pkt_ctx *pctx)
1370 {
1371 	int i;
1372 
1373 	for (i = 1; i < MOS_ACT_CNT; i = i << 1) {
1374 
1375 		if (cur_stream->actions & i) {
1376 			switch(i) {
1377 			case MOS_ACT_SEND_DATA:
1378 				AddtoSendList(mtcp, cur_stream);
1379 				break;
1380 			case MOS_ACT_SEND_ACK_NOW:
1381 				EnqueueACK(mtcp, cur_stream, pctx->p.cur_ts, ACK_OPT_NOW);
1382 				break;
1383 			case MOS_ACT_SEND_ACK_AGG:
1384 				EnqueueACK(mtcp, cur_stream, pctx->p.cur_ts, ACK_OPT_AGGREGATE);
1385 				break;
1386 			case MOS_ACT_SEND_CONTROL:
1387 				AddtoControlList(mtcp, cur_stream, pctx->p.cur_ts);
1388 				break;
1389 			case MOS_ACT_SEND_RST:
1390 				if (cur_stream->state <= TCP_ST_SYN_SENT)
1391 					SendTCPPacketStandalone(mtcp,
1392 							pctx->p.iph->daddr, pctx->p.tcph->dest,
1393 							pctx->p.iph->saddr, pctx->p.tcph->source,
1394 							0, pctx->p.seq + 1, 0, TCP_FLAG_RST | TCP_FLAG_ACK,
1395 							NULL, 0, pctx->p.cur_ts, 0, 0, -1);
1396 				else
1397 					SendTCPPacketStandalone(mtcp,
1398 							pctx->p.iph->daddr, pctx->p.tcph->dest,
1399 							pctx->p.iph->saddr, pctx->p.tcph->source,
1400 							pctx->p.ack_seq, 0, 0, TCP_FLAG_RST | TCP_FLAG_ACK,
1401 							NULL, 0, pctx->p.cur_ts, 0, 0, -1);
1402 				break;
1403 			case MOS_ACT_DESTROY:
1404 				DestroyTCPStream(mtcp, cur_stream);
1405 				break;
1406 			default:
1407 				assert(1);
1408 				break;
1409 			}
1410 		}
1411 	}
1412 
1413 	cur_stream->actions = 0;
1414 }
1415 /*----------------------------------------------------------------------------*/
1416 /**
1417  * Called (when monitoring mode is enabled).. for every outgoing packet to the
1418  * NIC.
1419  */
1420 inline void
1421 UpdatePassiveRecvTCPContext(mtcp_manager_t mtcp, struct tcp_stream *cur_stream,
1422 			       struct pkt_ctx *pctx)
1423 {
1424 	UpdateRecvTCPContext(mtcp, cur_stream, pctx);
1425 }
1426 /*----------------------------------------------------------------------------*/
1427 /* NOTE TODO: This event prediction is additional overhaed of HK_RCV hook.
1428  * We can transparently optimize this by disabling prediction of events which
1429  * are not monitored by anyone. */
1430 inline void
1431 PreRecvTCPEventPrediction(mtcp_manager_t mtcp, struct pkt_ctx *pctx,
1432 			  struct tcp_stream *recvside_stream)
1433 {
1434 	tcp_rb_overlapchk(mtcp, pctx, recvside_stream);
1435 }
1436 /*----------------------------------------------------------------------------*/
1437