xref: /mOS-networking-stack/core/src/tcp_in.c (revision 05e3289c)
1 #include <assert.h>
2 
3 #include "mos_api.h"
4 #include "tcp_util.h"
5 #include "tcp_in.h"
6 #include "tcp_out.h"
7 #include "tcp_ring_buffer.h"
8 #include "eventpoll.h"
9 #include "debug.h"
10 #include "timer.h"
11 #include "ip_in.h"
12 #include "tcp_rb.h"
13 #include "config.h"
14 #include "scalable_event.h"
15 
/* Two-argument maximum/minimum.
 * NOTE: either argument may be evaluated twice, so do not pass
 * expressions with side effects. */
#define MAX(a, b) (((a) > (b)) ? (a) : (b))
#define MIN(a, b) (((a) < (b)) ? (a) : (b))

/* Compile-time switches used by ProcessACK():
 * - RECOVERY_AFTER_LOSS: advance snd_nxt when an ACK passes it
 * - SELECTIVE_WRITE_EVENT_NOTIFY: raise the write event after an ACK only
 *   if the previous send window was zero */
#define RECOVERY_AFTER_LOSS TRUE
#define SELECTIVE_WRITE_EVENT_NOTIFY TRUE
21 /*----------------------------------------------------------------------------*/
22 static inline void
23 Handle_TCP_ST_ESTABLISHED (mtcp_manager_t mtcp, tcp_stream* cur_stream,
24 		struct pkt_ctx *pctx);
25 /*----------------------------------------------------------------------------*/
26 static inline int
FilterSYNPacket(mtcp_manager_t mtcp,uint32_t ip,uint16_t port)27 FilterSYNPacket(mtcp_manager_t mtcp, uint32_t ip, uint16_t port)
28 {
29 	struct sockaddr_in *addr;
30 
31 	/* TODO: This listening logic should be revised */
32 
33 	/* if not listening, drop */
34 	if (!mtcp->listener) {
35 		return FALSE;
36 	}
37 
38 	/* if not the address we want, drop */
39 	addr = &mtcp->listener->socket->saddr;
40 	if (addr->sin_port == port) {
41 		if (addr->sin_addr.s_addr != INADDR_ANY) {
42 			if (ip == addr->sin_addr.s_addr) {
43 				return TRUE;
44 			}
45 			return FALSE;
46 		} else {
47 			int i;
48 
49 			for (i = 0; i < g_config.mos->netdev_table->num; i++) {
50 				if (ip == g_config.mos->netdev_table->ent[i]->ip_addr) {
51 					return TRUE;
52 				}
53 			}
54 			return FALSE;
55 		}
56 	}
57 
58 	return FALSE;
59 }
60 /*----------------------------------------------------------------------------*/
61 static inline int
HandleActiveOpen(mtcp_manager_t mtcp,tcp_stream * cur_stream,struct pkt_ctx * pctx)62 HandleActiveOpen(mtcp_manager_t mtcp, tcp_stream *cur_stream,
63 		struct pkt_ctx *pctx)
64 {
65 	const struct tcphdr* tcph = pctx->p.tcph;
66 
67 	cur_stream->rcvvar->irs = pctx->p.seq;
68 	cur_stream->snd_nxt = pctx->p.ack_seq;
69 	cur_stream->sndvar->peer_wnd = pctx->p.window;
70 	cur_stream->rcvvar->snd_wl1 = cur_stream->rcvvar->irs - 1;
71 	cur_stream->rcv_nxt = cur_stream->rcvvar->irs + 1;
72 	cur_stream->rcvvar->last_ack_seq = pctx->p.ack_seq;
73 	ParseTCPOptions(cur_stream, pctx->p.cur_ts, (uint8_t *)tcph + TCP_HEADER_LEN,
74 			(tcph->doff << 2) - TCP_HEADER_LEN);
75 	cur_stream->sndvar->cwnd = ((cur_stream->sndvar->cwnd == 1)?
76 			(cur_stream->sndvar->mss * 2): cur_stream->sndvar->mss);
77 	cur_stream->sndvar->ssthresh = cur_stream->sndvar->mss * 10;
78 	if (HAS_STREAM_TYPE(cur_stream, MOS_SOCK_STREAM))
79 		UpdateRetransmissionTimer(mtcp, cur_stream, pctx->p.cur_ts);
80 
81 	return TRUE;
82 }
83 /*----------------------------------------------------------------------------*/
84 /* ValidateSequence: validates sequence number of the segment                 */
85 /* Return: TRUE if acceptable, FALSE if not acceptable                        */
86 /*----------------------------------------------------------------------------*/
87 static inline int
ValidateSequence(mtcp_manager_t mtcp,tcp_stream * cur_stream,struct pkt_ctx * pctx)88 ValidateSequence(mtcp_manager_t mtcp, tcp_stream *cur_stream,
89 		struct pkt_ctx *pctx)
90 {
91 	const struct tcphdr* tcph = pctx->p.tcph;
92 
93 	/* Protect Against Wrapped Sequence number (PAWS) */
94 	if (!tcph->rst && cur_stream->saw_timestamp) {
95 		struct tcp_timestamp ts;
96 
97 		if (!ParseTCPTimestamp(cur_stream, &ts,
98 				(uint8_t *)tcph + TCP_HEADER_LEN,
99 				(tcph->doff << 2) - TCP_HEADER_LEN)) {
100 			/* if there is no timestamp */
101 			/* TODO: implement here */
102 			TRACE_DBG("No timestamp found.\n");
103 			return FALSE;
104 		}
105 
106 		/* RFC1323: if SEG.TSval < TS.Recent, drop and send ack */
107 		if (TCP_SEQ_LT(ts.ts_val, cur_stream->rcvvar->ts_recent)) {
108 			/* TODO: ts_recent should be invalidated
109 					 before timestamp wraparound for long idle flow */
110 			TRACE_DBG("PAWS Detect wrong timestamp. "
111 					"seq: %u, ts_val: %u, prev: %u\n",
112 					pctx->p.seq, ts.ts_val, cur_stream->rcvvar->ts_recent);
113 			cur_stream->actions |= MOS_ACT_SEND_ACK_NOW;
114 			return FALSE;
115 		} else {
116 			/* valid timestamp */
117 			if (TCP_SEQ_GT(ts.ts_val, cur_stream->rcvvar->ts_recent)) {
118 				TRACE_TSTAMP("Timestamp update. cur: %u, prior: %u "
119 					"(time diff: %uus)\n",
120 					ts.ts_val, cur_stream->rcvvar->ts_recent,
121 					TS_TO_USEC(pctx->p.cur_ts - cur_stream->rcvvar->ts_last_ts_upd));
122 				cur_stream->rcvvar->ts_last_ts_upd = pctx->p.cur_ts;
123 			}
124 
125 			cur_stream->rcvvar->ts_recent = ts.ts_val;
126 			cur_stream->rcvvar->ts_lastack_rcvd = ts.ts_ref;
127 		}
128 	}
129 
130 	/* TCP sequence validation */
131 	if (!TCP_SEQ_BETWEEN(pctx->p.seq + pctx->p.payloadlen, cur_stream->rcv_nxt,
132 				cur_stream->rcv_nxt + cur_stream->rcvvar->rcv_wnd)) {
133 
134 		/* if RST bit is set, ignore the segment */
135 		if (tcph->rst)
136 			return FALSE;
137 
138 		if (cur_stream->state == TCP_ST_ESTABLISHED) {
139 			/* check if it is to get window advertisement */
140 			if (pctx->p.seq + 1 == cur_stream->rcv_nxt) {
141 				TRACE_DBG("Window update request. (seq: %u, rcv_wnd: %u)\n",
142 						pctx->p.seq, cur_stream->rcvvar->rcv_wnd);
143 				cur_stream->actions |= MOS_ACT_SEND_ACK_AGG;
144 				return FALSE;
145 
146 			}
147 
148 			if (TCP_SEQ_LEQ(pctx->p.seq, cur_stream->rcv_nxt)) {
149 				cur_stream->actions |= MOS_ACT_SEND_ACK_AGG;
150 			} else {
151 				cur_stream->actions |= MOS_ACT_SEND_ACK_NOW;
152 			}
153 		} else {
154 			if (cur_stream->state == TCP_ST_TIME_WAIT) {
155 				TRACE_DBG("Stream %d: tw expire update to %u\n",
156 						cur_stream->id, cur_stream->rcvvar->ts_tw_expire);
157 				AddtoTimewaitList(mtcp, cur_stream, pctx->p.cur_ts);
158 			}
159 			cur_stream->actions |= MOS_ACT_SEND_CONTROL;
160 		}
161 		return FALSE;
162 	}
163 
164 	return TRUE;
165 }
166 /*----------------------------------------------------------------------------*/
167 static inline void
NotifyConnectionReset(mtcp_manager_t mtcp,tcp_stream * cur_stream)168 NotifyConnectionReset(mtcp_manager_t mtcp, tcp_stream *cur_stream)
169 {
170 	TRACE_DBG("Stream %d: Notifying connection reset.\n", cur_stream->id);
171 	/* TODO: implement this function */
172 	/* signal to user "connection reset" */
173 }
174 /*----------------------------------------------------------------------------*/
175 static inline int
ProcessRST(mtcp_manager_t mtcp,tcp_stream * cur_stream,struct pkt_ctx * pctx)176 ProcessRST(mtcp_manager_t mtcp, tcp_stream *cur_stream,
177 		struct pkt_ctx *pctx)
178 {
179 	/* TODO: we need reset validation logic */
180 	/* the sequence number of a RST should be inside window */
181 	/* (in SYN_SENT state, it should ack the previous SYN */
182 
183 	TRACE_DBG("Stream %d: TCP RESET (%s)\n",
184 			cur_stream->id, TCPStateToString(cur_stream));
185 #if DUMP_STREAM
186 	DumpStream(mtcp, cur_stream);
187 #endif
188 
189 	if (cur_stream->state <= TCP_ST_SYN_SENT) {
190 		/* not handled here */
191 		return FALSE;
192 	}
193 
194 	if (cur_stream->state == TCP_ST_SYN_RCVD) {
195 		/* ACK number of last sent ACK packet == rcv_nxt + 1*/
196 		if (pctx->p.seq == 0 ||
197 #ifdef BE_RESILIENT_TO_PACKET_DROP
198 			pctx->p.seq == cur_stream->rcv_nxt + 1 ||
199 #endif
200 			pctx->p.ack_seq == cur_stream->snd_nxt)
201 		{
202 			cur_stream->state = TCP_ST_CLOSED_RSVD;
203 			cur_stream->cb_events |= MOS_ON_TCP_STATE_CHANGE;
204 			cur_stream->close_reason = TCP_RESET;
205 			cur_stream->actions |= MOS_ACT_DESTROY;
206 		} else {
207 			RAISE_DEBUG_EVENT(mtcp, cur_stream,
208 				"(SYN_RCVD): Ignore invalid RST. "
209 				"ack_seq expected: %u, ack_seq rcvd: %u\n",
210 				cur_stream->rcv_nxt + 1, pctx->p.ack_seq);
211 		}
212 		return TRUE;
213 	}
214 
215 	/* if the application is already closed the connection,
216 	   just destroy the it */
217 	if (cur_stream->state == TCP_ST_FIN_WAIT_1 ||
218 			cur_stream->state == TCP_ST_FIN_WAIT_2 ||
219 			cur_stream->state == TCP_ST_LAST_ACK ||
220 			cur_stream->state == TCP_ST_CLOSING ||
221 			cur_stream->state == TCP_ST_TIME_WAIT) {
222 		cur_stream->state = TCP_ST_CLOSED_RSVD;
223 		cur_stream->close_reason = TCP_ACTIVE_CLOSE;
224 		cur_stream->cb_events |= MOS_ON_TCP_STATE_CHANGE;
225 		cur_stream->actions |= MOS_ACT_DESTROY;
226 		return TRUE;
227 	}
228 
229 	if (cur_stream->state >= TCP_ST_ESTABLISHED &&
230 			cur_stream->state <= TCP_ST_CLOSE_WAIT) {
231 		/* ESTABLISHED, FIN_WAIT_1, FIN_WAIT_2, CLOSE_WAIT */
232 		/* TODO: flush all the segment queues */
233 		//NotifyConnectionReset(mtcp, cur_stream);
234 	}
235 
236 	if (!(cur_stream->sndvar->on_closeq || cur_stream->sndvar->on_closeq_int ||
237 		  cur_stream->sndvar->on_resetq || cur_stream->sndvar->on_resetq_int)) {
238 		//cur_stream->state = TCP_ST_CLOSED_RSVD;
239 		//cur_stream->actions |= MOS_ACT_DESTROY;
240 		cur_stream->state = TCP_ST_CLOSED_RSVD;
241 		cur_stream->cb_events |= MOS_ON_TCP_STATE_CHANGE;
242 		cur_stream->close_reason = TCP_RESET;
243 		if (HAS_STREAM_TYPE(cur_stream, MOS_SOCK_STREAM))
244 			RaiseCloseEvent(mtcp, cur_stream);
245 	}
246 
247 	return TRUE;
248 }
249 /*----------------------------------------------------------------------------*/
250 inline void
EstimateRTT(mtcp_manager_t mtcp,tcp_stream * cur_stream,uint32_t mrtt)251 EstimateRTT(mtcp_manager_t mtcp, tcp_stream *cur_stream, uint32_t mrtt)
252 {
253 	/* This function should be called for not retransmitted packets */
254 	/* TODO: determine tcp_rto_min */
255 #define TCP_RTO_MIN 0
256 	long m = mrtt;
257 	uint32_t tcp_rto_min = TCP_RTO_MIN;
258 	struct tcp_recv_vars *rcvvar = cur_stream->rcvvar;
259 
260 	if (m == 0) {
261 		m = 1;
262 	}
263 	if (rcvvar->srtt != 0) {
264 		/* rtt = 7/8 rtt + 1/8 new */
265 		m -= (rcvvar->srtt >> 3);
266 		rcvvar->srtt += m;
267 		if (m < 0) {
268 			m = -m;
269 			m -= (rcvvar->mdev >> 2);
270 			if (m > 0) {
271 				m >>= 3;
272 			}
273 		} else {
274 			m -= (rcvvar->mdev >> 2);
275 		}
276 		rcvvar->mdev += m;
277 		if (rcvvar->mdev > rcvvar->mdev_max) {
278 			rcvvar->mdev_max = rcvvar->mdev;
279 			if (rcvvar->mdev_max > rcvvar->rttvar) {
280 				rcvvar->rttvar = rcvvar->mdev_max;
281 			}
282 		}
283 		if (TCP_SEQ_GT(cur_stream->sndvar->snd_una, rcvvar->rtt_seq)) {
284 			if (rcvvar->mdev_max < rcvvar->rttvar) {
285 				rcvvar->rttvar -= (rcvvar->rttvar - rcvvar->mdev_max) >> 2;
286 			}
287 			rcvvar->rtt_seq = cur_stream->snd_nxt;
288 			rcvvar->mdev_max = tcp_rto_min;
289 		}
290 	} else {
291 		/* fresh measurement */
292 		rcvvar->srtt = m << 3;
293 		rcvvar->mdev = m << 1;
294 		rcvvar->mdev_max = rcvvar->rttvar = MAX(rcvvar->mdev, tcp_rto_min);
295 		rcvvar->rtt_seq = cur_stream->snd_nxt;
296 	}
297 
298 	TRACE_RTT("mrtt: %u (%uus), srtt: %u (%ums), mdev: %u, mdev_max: %u, "
299 			"rttvar: %u, rtt_seq: %u\n", mrtt, mrtt * TIME_TICK,
300 			rcvvar->srtt, TS_TO_MSEC((rcvvar->srtt) >> 3), rcvvar->mdev,
301 			rcvvar->mdev_max, rcvvar->rttvar, rcvvar->rtt_seq);
302 }
303 /*----------------------------------------------------------------------------*/
304 static inline void
ProcessACK(mtcp_manager_t mtcp,tcp_stream * cur_stream,struct pkt_ctx * pctx)305 ProcessACK(mtcp_manager_t mtcp, tcp_stream *cur_stream,
306 		struct pkt_ctx *pctx)
307 {
308 	const struct tcphdr* tcph = pctx->p.tcph;
309 	uint32_t seq = pctx->p.seq;
310 	uint32_t ack_seq = pctx->p.ack_seq;
311 	struct tcp_send_vars *sndvar = cur_stream->sndvar;
312 	uint32_t cwindow, cwindow_prev;
313 	uint32_t rmlen;
314 	uint32_t snd_wnd_prev;
315 	uint32_t right_wnd_edge;
316 	uint8_t dup;
317 
318 	cwindow = pctx->p.window;
319 	if (!tcph->syn) {
320 		cwindow = cwindow << sndvar->wscale_peer;
321 	}
322 	right_wnd_edge = sndvar->peer_wnd + cur_stream->rcvvar->snd_wl2;
323 
324 	if (cur_stream->state == TCP_ST_FIN_WAIT_1 ||
325 			cur_stream->state == TCP_ST_FIN_WAIT_2 ||
326 			cur_stream->state == TCP_ST_CLOSING ||
327 			cur_stream->state == TCP_ST_CLOSE_WAIT ||
328 			cur_stream->state == TCP_ST_LAST_ACK) {
329 		if (sndvar->is_fin_sent && ack_seq == sndvar->fss + 1) {
330 			ack_seq--;
331 		}
332 	}
333 
334 	/* If ack overs the sending buffer, return */
335 	if (TCP_SEQ_GT(ack_seq, sndvar->sndbuf->head_seq + sndvar->sndbuf->len)) {
336 		TRACE_DBG("Stream %d (%s): invalid acknologement. "
337 				"ack_seq: %u, possible max_ack_seq: %u\n", cur_stream->id,
338 				TCPStateToString(cur_stream), ack_seq,
339 				sndvar->sndbuf->head_seq + sndvar->sndbuf->len);
340 		return;
341 	}
342 
343 #ifdef BE_RESILIENT_TO_PACKET_DROP
344 	if (TCP_SEQ_GT(seq + pctx->p.payloadlen, cur_stream->rcv_nxt))
345 		cur_stream->rcv_nxt = seq + pctx->p.payloadlen;
346 #endif
347 
348 	/* Update window */
349 	if (TCP_SEQ_LT(cur_stream->rcvvar->snd_wl1, seq) ||
350 			(cur_stream->rcvvar->snd_wl1 == seq &&
351 			TCP_SEQ_LT(cur_stream->rcvvar->snd_wl2, ack_seq)) ||
352 			(cur_stream->rcvvar->snd_wl2 == ack_seq &&
353 			cwindow > sndvar->peer_wnd)) {
354 		cwindow_prev = sndvar->peer_wnd;
355 		sndvar->peer_wnd = cwindow;
356 		cur_stream->rcvvar->snd_wl1 = seq;
357 		cur_stream->rcvvar->snd_wl2 = ack_seq;
358 		TRACE_CLWND("Window update. "
359 				"ack: %u, peer_wnd: %u, snd_nxt-snd_una: %u\n",
360 				ack_seq, cwindow, cur_stream->snd_nxt - sndvar->snd_una);
361 		if (cwindow_prev < cur_stream->snd_nxt - sndvar->snd_una &&
362 				sndvar->peer_wnd >= cur_stream->snd_nxt - sndvar->snd_una) {
363 			TRACE_CLWND("%u Broadcasting client window update! "
364 					"ack_seq: %u, peer_wnd: %u (before: %u), "
365 					"(snd_nxt - snd_una: %u)\n",
366 					cur_stream->id, ack_seq, sndvar->peer_wnd, cwindow_prev,
367 					cur_stream->snd_nxt - sndvar->snd_una);
368 			if (HAS_STREAM_TYPE(cur_stream, MOS_SOCK_STREAM))
369 				RaiseWriteEvent(mtcp, cur_stream);
370 		}
371 	}
372 
373 	/* Check duplicated ack count */
374 	/* Duplicated ack if
375 	   1) ack_seq is old
376 	   2) payload length is 0.
377 	   3) advertised window not changed.
378 	   4) there is outstanding unacknowledged data
379 	   5) ack_seq == snd_una
380 	 */
381 
382 	dup = FALSE;
383 	if (TCP_SEQ_LT(ack_seq, cur_stream->snd_nxt)) {
384 		if (ack_seq == cur_stream->rcvvar->last_ack_seq && pctx->p.payloadlen == 0) {
385 			if (cur_stream->rcvvar->snd_wl2 + sndvar->peer_wnd == right_wnd_edge) {
386 				if (cur_stream->rcvvar->dup_acks + 1 > cur_stream->rcvvar->dup_acks) {
387 					cur_stream->rcvvar->dup_acks++;
388 				}
389 				dup = TRUE;
390 			}
391 		}
392 	}
393 	if (!dup) {
394 		cur_stream->rcvvar->dup_acks = 0;
395 		cur_stream->rcvvar->last_ack_seq = ack_seq;
396 	}
397 
398 	/* Fast retransmission */
399 	if (dup && cur_stream->rcvvar->dup_acks == 3) {
400 		TRACE_LOSS("Triple duplicated ACKs!! ack_seq: %u\n", ack_seq);
401 		if (TCP_SEQ_LT(ack_seq, cur_stream->snd_nxt)) {
402 			TRACE_LOSS("Reducing snd_nxt from %u to %u\n",
403 					cur_stream->snd_nxt, ack_seq);
404 #if RTM_STAT
405 			sndvar->rstat.tdp_ack_cnt++;
406 			sndvar->rstat.tdp_ack_bytes += (cur_stream->snd_nxt - ack_seq);
407 #endif
408 			if (ack_seq != sndvar->snd_una) {
409 				TRACE_DBG("ack_seq and snd_una mismatch on tdp ack. "
410 						"ack_seq: %u, snd_una: %u\n",
411 						ack_seq, sndvar->snd_una);
412 			}
413 			cur_stream->snd_nxt = ack_seq;
414 		}
415 
416 		/* update congestion control variables */
417 		/* ssthresh to half of min of cwnd and peer wnd */
418 		sndvar->ssthresh = MIN(sndvar->cwnd, sndvar->peer_wnd) / 2;
419 		if (sndvar->ssthresh < 2 * sndvar->mss) {
420 			sndvar->ssthresh = 2 * sndvar->mss;
421 		}
422 		sndvar->cwnd = sndvar->ssthresh + 3 * sndvar->mss;
423 		TRACE_CONG("Fast retransmission. cwnd: %u, ssthresh: %u\n",
424 				sndvar->cwnd, sndvar->ssthresh);
425 
426 		/* count number of retransmissions */
427 		if (sndvar->nrtx < TCP_MAX_RTX) {
428 			sndvar->nrtx++;
429 		} else {
430 			TRACE_DBG("Exceed MAX_RTX.\n");
431 		}
432 
433 		cur_stream->actions |= MOS_ACT_SEND_DATA;
434 
435 	} else if (cur_stream->rcvvar->dup_acks > 3) {
436 		/* Inflate congestion window until before overflow */
437 		if ((uint32_t)(sndvar->cwnd + sndvar->mss) > sndvar->cwnd) {
438 			sndvar->cwnd += sndvar->mss;
439 			TRACE_CONG("Dupack cwnd inflate. cwnd: %u, ssthresh: %u\n",
440 					sndvar->cwnd, sndvar->ssthresh);
441 		}
442 	}
443 
444 #if TCP_OPT_SACK_ENABLED
445 	ParseSACKOption(cur_stream, ack_seq, (uint8_t *)tcph + TCP_HEADER_LEN,
446 			(tcph->doff << 2) - TCP_HEADER_LEN);
447 #endif /* TCP_OPT_SACK_ENABLED */
448 
449 #if RECOVERY_AFTER_LOSS
450 	/* updating snd_nxt (when recovered from loss) */
451 	if (TCP_SEQ_GT(ack_seq, cur_stream->snd_nxt)) {
452 #if RTM_STAT
453 		sndvar->rstat.ack_upd_cnt++;
454 		sndvar->rstat.ack_upd_bytes += (ack_seq - cur_stream->snd_nxt);
455 #endif
456 		TRACE_LOSS("Updating snd_nxt from %u to %u\n",
457 				cur_stream->snd_nxt, ack_seq);
458 		cur_stream->snd_nxt = ack_seq;
459 		if (sndvar->sndbuf->len == 0) {
460 			if (HAS_STREAM_TYPE(cur_stream, MOS_SOCK_STREAM))
461 				RemoveFromSendList(mtcp, cur_stream);
462 		}
463 	}
464 #endif
465 
466 	/* If ack_seq is previously acked, return */
467 	if (TCP_SEQ_GEQ(sndvar->sndbuf->head_seq, ack_seq)) {
468 		return;
469 	}
470 
471 	/* Remove acked sequence from send buffer */
472 	rmlen = ack_seq - sndvar->sndbuf->head_seq;
473 	if (rmlen > 0) {
474 		/* Routine goes here only if there is new payload (not retransmitted) */
475 		uint16_t packets;
476 
477 		/* If acks new data */
478 		packets = rmlen / sndvar->eff_mss;
479 		if ((rmlen / sndvar->eff_mss) * sndvar->eff_mss > rmlen) {
480 			packets++;
481 		}
482 
483 		/* Estimate RTT and calculate rto */
484 		if (cur_stream->saw_timestamp) {
485 			EstimateRTT(mtcp, cur_stream,
486 					pctx->p.cur_ts - cur_stream->rcvvar->ts_lastack_rcvd);
487 			sndvar->rto = (cur_stream->rcvvar->srtt >> 3) + cur_stream->rcvvar->rttvar;
488 			assert(sndvar->rto > 0);
489 		} else {
490 			//TODO: Need to implement timestamp estimation without timestamp
491 			TRACE_RTT("NOT IMPLEMENTED.\n");
492 		}
493 
494 		/* Update congestion control variables */
495 		if (cur_stream->state >= TCP_ST_ESTABLISHED) {
496 			if (sndvar->cwnd < sndvar->ssthresh) {
497 				if ((sndvar->cwnd + sndvar->mss) > sndvar->cwnd) {
498 					sndvar->cwnd += (sndvar->mss * packets);
499 				}
500 				TRACE_CONG("slow start cwnd: %u, ssthresh: %u\n",
501 						sndvar->cwnd, sndvar->ssthresh);
502 			} else {
503 				uint32_t new_cwnd = sndvar->cwnd +
504 						packets * sndvar->mss * sndvar->mss /
505 						sndvar->cwnd;
506 				if (new_cwnd > sndvar->cwnd) {
507 					sndvar->cwnd = new_cwnd;
508 				}
509 				//TRACE_CONG("congestion avoidance cwnd: %u, ssthresh: %u\n",
510 				//		sndvar->cwnd, sndvar->ssthresh);
511 			}
512 		}
513 
514 		if (SBUF_LOCK(&sndvar->write_lock)) {
515 			if (errno == EDEADLK)
516 				perror("ProcessACK: write_lock blocked\n");
517 			assert(0);
518 		}
519 		if (HAS_STREAM_TYPE(cur_stream, MOS_SOCK_STREAM))
520 			SBRemove(mtcp->rbm_snd, sndvar->sndbuf, rmlen);
521 		sndvar->snd_una = ack_seq;
522 		snd_wnd_prev = sndvar->snd_wnd;
523 		sndvar->snd_wnd = sndvar->sndbuf->size - sndvar->sndbuf->len;
524 
525 		/* If there was no available sending window */
526 		/* notify the newly available window to application */
527 #if SELECTIVE_WRITE_EVENT_NOTIFY
528 		if (snd_wnd_prev <= 0) {
529 #endif /* SELECTIVE_WRITE_EVENT_NOTIFY */
530 			if (HAS_STREAM_TYPE(cur_stream, MOS_SOCK_STREAM))
531 				RaiseWriteEvent(mtcp, cur_stream);
532 #if SELECTIVE_WRITE_EVENT_NOTIFY
533 		}
534 #endif /* SELECTIVE_WRITE_EVENT_NOTIFY */
535 
536 		SBUF_UNLOCK(&sndvar->write_lock);
537 		if (HAS_STREAM_TYPE(cur_stream, MOS_SOCK_STREAM))
538 			UpdateRetransmissionTimer(mtcp, cur_stream, pctx->p.cur_ts);
539 	}
540 }
541 /*----------------------------------------------------------------------------*/
542 /* ProcessTCPPayload: merges TCP payload using receive ring buffer            */
543 /* Return: TRUE (1) in normal case, FALSE (0) if immediate ACK is required    */
544 /* CAUTION: should only be called at ESTABLISHED, FIN_WAIT_1, FIN_WAIT_2      */
545 /*----------------------------------------------------------------------------*/
546 static inline int
ProcessTCPPayload(mtcp_manager_t mtcp,tcp_stream * cur_stream,struct pkt_ctx * pctx)547 ProcessTCPPayload(mtcp_manager_t mtcp, tcp_stream *cur_stream,
548 				struct pkt_ctx *pctx)
549 {
550 	struct tcp_recv_vars *rcvvar = cur_stream->rcvvar;
551 	uint32_t prev_rcv_nxt;
552 	int ret = -1;
553 	bool read_lock;
554 	struct socket_map *walk;
555 
556 	if (!cur_stream->buffer_mgmt)
557 		return FALSE;
558 
559 	/* if seq and segment length is lower than rcv_nxt, ignore and send ack */
560 	if (TCP_SEQ_LT(pctx->p.seq + pctx->p.payloadlen, cur_stream->rcv_nxt))
561 		return FALSE;
562 
563 	/*
564 	TRACE_DEBUG("pctx->p.seq = %u, pctx->p.payloadlen = %d / cur_stream->rcv_nxt = %u, "
565 	            "rcvvar->rcv_wnd = %u\n",
566 				pctx->p.seq, pctx->p.payloadlen, cur_stream->rcv_nxt, rcvvar->rcv_wnd);
567 	*/
568 
569 	/* if payload exceeds receiving buffer, drop and send ack */
570 	if (TCP_SEQ_GT(pctx->p.seq + pctx->p.payloadlen, cur_stream->rcv_nxt + rcvvar->rcv_wnd)) {
571 		/* MOS_ON_ERROR: payload outside the window arrives */
572 		if (cur_stream->side == MOS_SIDE_CLI) {
573 			SOCKQ_FOREACH_REVERSE(walk, &cur_stream->msocks) {
574 				HandleCallback(mtcp, MOS_NULL, walk, cur_stream->side,
575 					       pctx, MOS_ON_ERROR);
576 			} SOCKQ_FOREACH_END;
577 		} else { /* cur_stream->side == MOS_SIDE_SVR */
578 			SOCKQ_FOREACH_START(walk, &cur_stream->msocks) {
579 				HandleCallback(mtcp, MOS_NULL, walk, cur_stream->side,
580 					       pctx, MOS_ON_ERROR);
581 			} SOCKQ_FOREACH_END;
582 		}
583 		return FALSE;
584 	}
585 
586 	/* allocate receive buffer if not exist */
587 	if (!rcvvar->rcvbuf) {
588 		rcvvar->rcvbuf = tcprb_new(mtcp->bufseg_pool, g_config.mos->rmem_size, cur_stream->buffer_mgmt);
589 		if (!rcvvar->rcvbuf) {
590 			TRACE_ERROR("Stream %d: Failed to allocate receive buffer.\n",
591 				    cur_stream->id);
592 			cur_stream->state = TCP_ST_CLOSED_RSVD;
593 			cur_stream->close_reason = TCP_NO_MEM;
594 			cur_stream->cb_events |= MOS_ON_TCP_STATE_CHANGE;
595 			if (HAS_STREAM_TYPE(cur_stream, MOS_SOCK_STREAM))
596 				RaiseErrorEvent(mtcp, cur_stream);
597 
598 			return ERROR;
599 		}
600 	}
601 
602 	read_lock = HAS_STREAM_TYPE(cur_stream, MOS_SOCK_STREAM);
603 
604 	if (read_lock && SBUF_LOCK(&rcvvar->read_lock)) {
605 		if (errno == EDEADLK)
606 			perror("ProcessTCPPayload: read_lock blocked\n");
607 		assert(0);
608 	}
609 
610 	prev_rcv_nxt = cur_stream->rcv_nxt;
611 
612 	tcprb_t *rb = rcvvar->rcvbuf;
613 	loff_t off = seq2loff(rb, pctx->p.seq, (rcvvar->irs + 1));
614 	if (off >= 0) {
615 		if (!HAS_STREAM_TYPE(cur_stream, MOS_SOCK_STREAM) &&
616 			HAS_STREAM_TYPE(cur_stream, MOS_SOCK_MONITOR_STREAM_ACTIVE))
617 			tcprb_setpile(rb, rb->pile + tcprb_cflen(rb));
618 
619 		/* policy on the buffer outrun case
620 		 * (1) has MOS_SOCK_MONITOR_STREAM_ACTIVE -> raise MOS_ON_ERROR first
621 		 *                                           and then overwrite payload
622 		 * (2) MOS_SOCK_STREAM only -> overwrite immediately
623 		 */
624 		if (HAS_STREAM_TYPE(cur_stream, MOS_SOCK_MONITOR_STREAM_ACTIVE)) {
625 			int fflen = tcprb_fflen(rb, pctx->p.payload, pctx->p.payloadlen, off);
626 			if (fflen > 0) {
627 				/* if we detect buffer outrun, raise a MOS_ON_ERROR event.
628 				   then, the application can either consume read buffer data,
629 				   or increase the read buffer size */
630 				if (cur_stream->side == MOS_SIDE_CLI) {
631 					SOCKQ_FOREACH_REVERSE(walk, &cur_stream->msocks) {
632 					if (walk->monitor_stream->peek_offset[cur_stream->side]
633 						< rb->head + fflen)
634 						HandleCallback(mtcp, MOS_NULL, walk, cur_stream->side,
635 									   pctx, MOS_ON_ERROR);
636 					} SOCKQ_FOREACH_END;
637 				} else {
638 					SOCKQ_FOREACH_START(walk, &cur_stream->msocks) {
639 						if (walk->monitor_stream->peek_offset[cur_stream->side]
640 							< rb->head + fflen)
641 						HandleCallback(mtcp, MOS_NULL, walk, cur_stream->side,
642 									   pctx, MOS_ON_ERROR);
643 					} SOCKQ_FOREACH_END;
644 				}
645 			}
646 		}
647 		/* try writing packet payload to buffer */
648 		ret = tcprb_pwrite(rb, pctx->p.payload, pctx->p.payloadlen, off);
649 	}
650 	/* TODO: update monitor vars */
651 
652 	/*
653 	 * error can pop up due to disabled buffered management
654 	 * (only in monitor mode). In that case, ignore the warning
655 	 * message.
656 	 */
657 	if (ret < 0 && cur_stream->buffer_mgmt && mtcp->num_msp == 0)
658 		TRACE_ERROR("Cannot merge payload.\n");
659 
660 	/* discard the buffer if the state is FIN_WAIT_1 or FIN_WAIT_2,
661 	   meaning that the connection is already closed by the application */
662 	loff_t cftail = rb->pile + tcprb_cflen(rb);
663 	if (cur_stream->state == TCP_ST_FIN_WAIT_1 ||
664 	    cur_stream->state == TCP_ST_FIN_WAIT_2) {
665 		/* XXX: Do we really need to update recv vars? */
666 		tcprb_setpile(rb, cftail);
667 	}
668 	if (cftail > 0 && (rcvvar->irs + 1) + cftail > cur_stream->rcv_nxt) {
669 		RAISE_DEBUG_EVENT(mtcp, cur_stream,
670 				"Move rcv_nxt from %u to %u.\n",
671 				cur_stream->rcv_nxt, (rcvvar->irs + 1) + cftail);
672 		cur_stream->rcv_nxt = (rcvvar->irs + 1) + cftail;
673 	}
674 	assert(cftail - rb->pile >= 0);
675 	rcvvar->rcv_wnd = rb->len - (cftail - rb->pile);
676 
677 	if (read_lock)
678 		SBUF_UNLOCK(&rcvvar->read_lock);
679 
680 
681 	if (TCP_SEQ_LEQ(cur_stream->rcv_nxt, prev_rcv_nxt)) {
682 		/* There are some lost packets */
683 		return FALSE;
684 	}
685 
686 	TRACE_EPOLL("Stream %d data arrived. "
687 		    "len: %d, ET: %llu, IN: %llu, OUT: %llu\n",
688 		    cur_stream->id, pctx->p.payloadlen,
689 		    cur_stream->socket? (unsigned long long)cur_stream->socket->epoll & MOS_EPOLLET : 0,
690 		    cur_stream->socket? (unsigned long long)cur_stream->socket->epoll & MOS_EPOLLIN : 0,
691 		    cur_stream->socket? (unsigned long long)cur_stream->socket->epoll & MOS_EPOLLOUT : 0);
692 
693 	if (cur_stream->state == TCP_ST_ESTABLISHED)
694 		RaiseReadEvent(mtcp, cur_stream);
695 
696 	return TRUE;
697 }
698 /*----------------------------------------------------------------------------*/
699 static inline void
Handle_TCP_ST_LISTEN(mtcp_manager_t mtcp,tcp_stream * cur_stream,struct pkt_ctx * pctx)700 Handle_TCP_ST_LISTEN (mtcp_manager_t mtcp, tcp_stream* cur_stream,
701 		struct pkt_ctx *pctx)
702 {
703 
704 	const struct tcphdr* tcph = pctx->p.tcph;
705 
706 	if (tcph->syn) {
707 		if (cur_stream->state == TCP_ST_LISTEN)
708 			cur_stream->rcv_nxt++;
709 		cur_stream->state = TCP_ST_SYN_RCVD;
710 		cur_stream->cb_events |= MOS_ON_TCP_STATE_CHANGE | MOS_ON_CONN_START;
711 		TRACE_STATE("Stream %d: TCP_ST_SYN_RCVD\n", cur_stream->id);
712 		cur_stream->actions |= MOS_ACT_SEND_CONTROL;
713 		if (IS_STREAM_TYPE(cur_stream, MOS_SOCK_MONITOR_STREAM_ACTIVE)) {
714 			/**
715 			 * Passive stream context needs to initialize irs and rcv_nxt
716 			 * as it is not set neither during createserverstream or monitor
717 			 * creation.
718 			 */
719 			cur_stream->rcvvar->irs =
720 				cur_stream->rcv_nxt = pctx->p.seq;
721 		}
722 	} else {
723 		CTRACE_ERROR("Stream %d (TCP_ST_LISTEN): "
724 				"Packet without SYN.\n", cur_stream->id);
725 	}
726 
727 }
728 /*----------------------------------------------------------------------------*/
729 static inline void
Handle_TCP_ST_SYN_SENT(mtcp_manager_t mtcp,tcp_stream * cur_stream,struct pkt_ctx * pctx)730 Handle_TCP_ST_SYN_SENT (mtcp_manager_t mtcp, tcp_stream* cur_stream,
731 		struct pkt_ctx *pctx)
732 {
733 	const struct tcphdr* tcph = pctx->p.tcph;
734 
735 	/* when active open */
736 	if (tcph->ack) {
737 		/* filter the unacceptable acks */
738 		if (TCP_SEQ_LEQ(pctx->p.ack_seq, cur_stream->sndvar->iss)
739 #ifndef BE_RESILIENT_TO_PACKET_DROP
740 			|| TCP_SEQ_GT(pctx->p.ack_seq, cur_stream->snd_nxt)
741 #endif
742 				) {
743 			if (!tcph->rst) {
744 				cur_stream->actions |= MOS_ACT_SEND_RST;
745 			}
746 			return;
747 		}
748 		/* accept the ack */
749 		cur_stream->sndvar->snd_una++;
750 	}
751 
752 	if (tcph->rst) {
753 		if (tcph->ack) {
754 			cur_stream->state = TCP_ST_CLOSED_RSVD;
755 			cur_stream->close_reason = TCP_RESET;
756 			cur_stream->cb_events |= MOS_ON_TCP_STATE_CHANGE;
757 			if (cur_stream->socket) {
758 				if (HAS_STREAM_TYPE(cur_stream, MOS_SOCK_STREAM))
759 					RaiseErrorEvent(mtcp, cur_stream);
760 			} else {
761 				cur_stream->actions |= MOS_ACT_DESTROY;
762 			}
763 		}
764 		return;
765 	}
766 
767 	if (tcph->ack
768 #ifndef BE_RESILIENT_TO_PACKET_DROP
769 		/* If we already lost SYNACK, let the ACK packet do the SYNACK's role */
770 		&& tcph->syn
771 #endif
772 	   ) {
773 		int ret = HandleActiveOpen(mtcp, cur_stream, pctx);
774 		if (!ret) {
775 			return;
776 		}
777 
778 #ifdef BE_RESILIENT_TO_PACKET_DROP
779 		if (!tcph->syn) {
780 			RAISE_DEBUG_EVENT(mtcp, cur_stream,
781 					"We missed SYNACK. Replace it with an ACK packet.\n");
782 			/* correct some variables */
783 			cur_stream->rcv_nxt = pctx->p.seq;
784 		}
785 #endif
786 
787 		cur_stream->sndvar->nrtx = 0;
788 		if (HAS_STREAM_TYPE(cur_stream, MOS_SOCK_STREAM))
789 			RemoveFromRTOList(mtcp, cur_stream);
790 		cur_stream->state = TCP_ST_ESTABLISHED;
791 		cur_stream->cb_events |= /*MOS_ON_CONN_SETUP |*/ MOS_ON_TCP_STATE_CHANGE;
792 		TRACE_STATE("Stream %d: TCP_ST_ESTABLISHED\n", cur_stream->id);
793 
794 		if (cur_stream->socket) {
795 			if (HAS_STREAM_TYPE(cur_stream, MOS_SOCK_STREAM))
796 				RaiseWriteEvent(mtcp, cur_stream);
797 		} else {
798 			TRACE_STATE("Stream %d: ESTABLISHED, but no socket\n", cur_stream->id);
799 			cur_stream->close_reason = TCP_ACTIVE_CLOSE;
800 			cur_stream->actions |= MOS_ACT_SEND_RST;
801 			cur_stream->actions |= MOS_ACT_DESTROY;
802 		}
803 		cur_stream->actions |= MOS_ACT_SEND_CONTROL;
804 		if (g_config.mos->tcp_timeout > 0)
805 			if (HAS_STREAM_TYPE(cur_stream, MOS_SOCK_STREAM))
806 				AddtoTimeoutList(mtcp, cur_stream);
807 
808 #ifdef BE_RESILIENT_TO_PACKET_DROP
809 		/* Handle this ack packet */
810 		if (!tcph->syn)
811 			Handle_TCP_ST_ESTABLISHED(mtcp, cur_stream, pctx);
812 #endif
813 
814 	} else if (tcph->syn) {
815 		cur_stream->state = TCP_ST_SYN_RCVD;
816 		cur_stream->cb_events |= MOS_ON_TCP_STATE_CHANGE;
817 		TRACE_STATE("Stream %d: TCP_ST_SYN_RCVD\n", cur_stream->id);
818 		cur_stream->snd_nxt = cur_stream->sndvar->iss;
819 		cur_stream->actions |= MOS_ACT_SEND_CONTROL;
820 	}
821 }
822 /*----------------------------------------------------------------------------*/
823 static inline void
Handle_TCP_ST_SYN_RCVD(mtcp_manager_t mtcp,tcp_stream * cur_stream,struct pkt_ctx * pctx)824 Handle_TCP_ST_SYN_RCVD (mtcp_manager_t mtcp, tcp_stream* cur_stream,
825 		struct pkt_ctx *pctx)
826 {
827 	const struct tcphdr* tcph = pctx->p.tcph;
828 	struct tcp_send_vars *sndvar = cur_stream->sndvar;
829 	int ret;
830 
831 	if (tcph->ack) {
832 		uint32_t prior_cwnd;
833 		/* NOTE: We do not validate the ack number because first few packets
834 		 * can also come out of order */
835 
836 		sndvar->snd_una++;
837 		cur_stream->snd_nxt = pctx->p.ack_seq;
838 		prior_cwnd = sndvar->cwnd;
839 		sndvar->cwnd = ((prior_cwnd == 1)?
840 				(sndvar->mss * 2): sndvar->mss);
841 
842 		//UpdateRetransmissionTimer(mtcp, cur_stream, cur_ts);
843 		sndvar->nrtx = 0;
844 		cur_stream->rcv_nxt = cur_stream->rcvvar->irs + 1;
845 		if (HAS_STREAM_TYPE(cur_stream, MOS_SOCK_STREAM))
846 			RemoveFromRTOList(mtcp, cur_stream);
847 
848 		cur_stream->state = TCP_ST_ESTABLISHED;
849 		cur_stream->cb_events |= /*MOS_ON_CONN_SETUP |*/ MOS_ON_TCP_STATE_CHANGE;
850 		TRACE_STATE("Stream %d: TCP_ST_ESTABLISHED\n", cur_stream->id);
851 
852 #ifdef BE_RESILIENT_TO_PACKET_DROP
853 		if (pctx->p.ack_seq != sndvar->iss + 1)
854 			Handle_TCP_ST_ESTABLISHED(mtcp, cur_stream, pctx);
855 #endif
856 
857 		/* update listening socket */
858 		if (HAS_STREAM_TYPE(cur_stream, MOS_SOCK_STREAM)) {
859 
860 			struct tcp_listener *listener = mtcp->listener;
861 
862 			ret = StreamEnqueue(listener->acceptq, cur_stream);
863 			if (ret < 0) {
864 				TRACE_ERROR("Stream %d: Failed to enqueue to "
865 						"the listen backlog!\n", cur_stream->id);
866 				cur_stream->close_reason = TCP_NOT_ACCEPTED;
867 				cur_stream->state = TCP_ST_CLOSED_RSVD;
868 				cur_stream->cb_events |= MOS_ON_TCP_STATE_CHANGE;
869 				TRACE_STATE("Stream %d: TCP_ST_CLOSED_RSVD\n", cur_stream->id);
870 				cur_stream->actions |= MOS_ACT_SEND_CONTROL;
871 			}
872 
873 			/* raise an event to the listening socket */
874 			if (listener->socket && (listener->socket->epoll & MOS_EPOLLIN)) {
875 				AddEpollEvent(mtcp->ep,
876 						MOS_EVENT_QUEUE, listener->socket, MOS_EPOLLIN);
877 			}
878 		}
879 
880 		//TRACE_DBG("Stream %d inserted into acceptq.\n", cur_stream->id);
881 		if (g_config.mos->tcp_timeout > 0)
882 			if (HAS_STREAM_TYPE(cur_stream, MOS_SOCK_STREAM))
883 				AddtoTimeoutList(mtcp, cur_stream);
884 
885 	} else {
886 		/* Handle retransmitted SYN packet */
887 		if (HAS_STREAM_TYPE(cur_stream, MOS_SOCK_MONITOR_STREAM_ACTIVE) &&
888 			tcph->syn) {
889 			if (pctx->p.seq == cur_stream->pair_stream->sndvar->iss) {
890 				TRACE_DBG("syn retransmit! (p.seq = %u / iss = %u)\n",
891 						  pctx->p.seq, cur_stream->pair_stream->sndvar->iss);
892 				cur_stream->cb_events |= MOS_ON_REXMIT;
893 			}
894 		}
895 
896 		TRACE_DBG("Stream %d (TCP_ST_SYN_RCVD): No ACK.\n",
897 				cur_stream->id);
898 		/* retransmit SYN/ACK */
899 		cur_stream->snd_nxt = sndvar->iss;
900 		cur_stream->actions |= MOS_ACT_SEND_CONTROL;
901 	}
902 }
903 /*----------------------------------------------------------------------------*/
904 static inline void
Handle_TCP_ST_ESTABLISHED(mtcp_manager_t mtcp,tcp_stream * cur_stream,struct pkt_ctx * pctx)905 Handle_TCP_ST_ESTABLISHED (mtcp_manager_t mtcp, tcp_stream* cur_stream,
906 		struct pkt_ctx *pctx)
907 {
908 	const struct tcphdr* tcph = pctx->p.tcph;
909 
910 	if (tcph->syn) {
911 		/* Handle retransmitted SYNACK packet */
912 		if (HAS_STREAM_TYPE(cur_stream, MOS_SOCK_MONITOR_STREAM_ACTIVE) &&
913 			tcph->ack) {
914 			if (pctx->p.seq == cur_stream->pair_stream->sndvar->iss) {
915 				TRACE_DBG("syn/ack retransmit! (p.seq = %u / iss = %u)\n",
916 						pctx->p.seq, cur_stream->pair_stream->sndvar->iss);
917 				cur_stream->cb_events |= MOS_ON_REXMIT;
918 			}
919 		}
920 
921 		TRACE_DBG("Stream %d (TCP_ST_ESTABLISHED): weird SYN. "
922 				"seq: %u, expected: %u, ack_seq: %u, expected: %u\n",
923 				cur_stream->id, pctx->p.seq, cur_stream->rcv_nxt,
924 				pctx->p.ack_seq, cur_stream->snd_nxt);
925 		cur_stream->snd_nxt = pctx->p.ack_seq;
926 		cur_stream->actions |= MOS_ACT_SEND_CONTROL;
927 		return;
928 	}
929 
930 	if (pctx->p.payloadlen > 0) {
931 		if (ProcessTCPPayload(mtcp, cur_stream, pctx)) {
932 			/* if return is TRUE, send ACK */
933 			cur_stream->actions |= MOS_ACT_SEND_ACK_AGG;
934 		} else {
935 			cur_stream->actions |= MOS_ACT_SEND_ACK_NOW;
936 		}
937 	}
938 
939 	if (tcph->ack) {
940 		if (cur_stream->sndvar->sndbuf) {
941 			ProcessACK(mtcp, cur_stream, pctx);
942 		}
943 	}
944 
945 	if (tcph->fin) {
946 		/* process the FIN only if the sequence is valid */
947 		/* FIN packet is allowed to push payload (should we check for PSH flag)? */
948 		if (!cur_stream->buffer_mgmt ||
949 #ifdef BE_RESILIENT_TO_PACKET_DROP
950 			TCP_SEQ_GEQ(pctx->p.seq + pctx->p.payloadlen, cur_stream->rcv_nxt)
951 #else
952 			pctx->p.seq + pctx->p.payloadlen == cur_stream->rcv_nxt
953 #endif
954 			) {
955 #ifdef BE_RESILIENT_TO_PACKET_DROP
956 			cur_stream->rcv_nxt = pctx->p.seq + pctx->p.payloadlen;
957 #endif
958 			cur_stream->state = TCP_ST_CLOSE_WAIT;
959 			cur_stream->cb_events |= MOS_ON_TCP_STATE_CHANGE;
960 			TRACE_STATE("Stream %d: TCP_ST_CLOSE_WAIT\n", cur_stream->id);
961 			cur_stream->rcv_nxt++;
962 			cur_stream->actions |= MOS_ACT_SEND_CONTROL;
963 
964 			/* notify FIN to application */
965 			RaiseReadEvent(mtcp, cur_stream);
966 		} else {
967 			RAISE_DEBUG_EVENT(mtcp, cur_stream,
968 					"Expected %u, but received %u (= %u + %u)\n",
969 					cur_stream->rcv_nxt, pctx->p.seq + pctx->p.payloadlen,
970 					pctx->p.seq, pctx->p.payloadlen);
971 
972 			cur_stream->actions |= MOS_ACT_SEND_ACK_NOW;
973 			return;
974 		}
975 	}
976 }
977 /*----------------------------------------------------------------------------*/
978 static inline void
Handle_TCP_ST_CLOSE_WAIT(mtcp_manager_t mtcp,tcp_stream * cur_stream,struct pkt_ctx * pctx)979 Handle_TCP_ST_CLOSE_WAIT (mtcp_manager_t mtcp, tcp_stream* cur_stream,
980 		struct pkt_ctx *pctx)
981 {
982 	if (TCP_SEQ_LT(pctx->p.seq, cur_stream->rcv_nxt)) {
983 		TRACE_DBG("Stream %d (TCP_ST_CLOSE_WAIT): "
984 				"weird seq: %u, expected: %u\n",
985 				cur_stream->id, pctx->p.seq, cur_stream->rcv_nxt);
986 		cur_stream->actions |= MOS_ACT_SEND_CONTROL;
987 		return;
988 	}
989 
990 	if (cur_stream->sndvar->sndbuf) {
991 		ProcessACK(mtcp, cur_stream, pctx);
992 	}
993 }
994 /*----------------------------------------------------------------------------*/
995 static inline void
Handle_TCP_ST_LAST_ACK(mtcp_manager_t mtcp,tcp_stream * cur_stream,struct pkt_ctx * pctx)996 Handle_TCP_ST_LAST_ACK (mtcp_manager_t mtcp, tcp_stream* cur_stream,
997 		struct pkt_ctx *pctx)
998 {
999 	const struct tcphdr* tcph = pctx->p.tcph;
1000 
1001 	if (TCP_SEQ_LT(pctx->p.seq, cur_stream->rcv_nxt)) {
1002 		TRACE_DBG("Stream %d (TCP_ST_LAST_ACK): "
1003 				"weird seq: %u, expected: %u\n",
1004 				cur_stream->id, pctx->p.seq, cur_stream->rcv_nxt);
1005 		return;
1006 	}
1007 
1008 	if (tcph->ack) {
1009 		if (cur_stream->sndvar->sndbuf) {
1010 			ProcessACK(mtcp, cur_stream, pctx);
1011 		}
1012 
1013 		if (!cur_stream->sndvar->is_fin_sent) {
1014 			/* the case that FIN is not sent yet */
1015 			/* this is not ack for FIN, ignore */
1016 			TRACE_DBG("Stream %d (TCP_ST_LAST_ACK): "
1017 					"No FIN sent yet.\n", cur_stream->id);
1018 #ifdef DBGMSG
1019 			DumpIPPacket(mtcp, pctx->p.iph, pctx->p.ip_len);
1020 #endif
1021 #if DUMP_STREAM
1022 			DumpStream(mtcp, cur_stream);
1023 			DumpControlList(mtcp, mtcp->n_sender[0]);
1024 #endif
1025 			return;
1026 		}
1027 
1028 		/* check if ACK of FIN */
1029 		if (pctx->p.ack_seq == cur_stream->sndvar->fss + 1) {
1030 			cur_stream->sndvar->snd_una++;
1031 			if (HAS_STREAM_TYPE(cur_stream, MOS_SOCK_STREAM))
1032 				UpdateRetransmissionTimer(mtcp, cur_stream, pctx->p.cur_ts);
1033 			cur_stream->state = TCP_ST_CLOSED_RSVD;
1034 			cur_stream->close_reason = TCP_PASSIVE_CLOSE;
1035 			cur_stream->cb_events |= MOS_ON_TCP_STATE_CHANGE;
1036 			TRACE_STATE("Stream %d: TCP_ST_CLOSED_RSVD\n",
1037 					cur_stream->id);
1038 			cur_stream->actions |= MOS_ACT_DESTROY;
1039 		} else {
1040 			TRACE_DBG("Stream %d (TCP_ST_LAST_ACK): Not ACK of FIN. "
1041 					"ack_seq: %u, expected: %u\n",
1042 					cur_stream->id, pctx->p.ack_seq, cur_stream->sndvar->fss + 1);
1043 			//cur_stream->snd_nxt = cur_stream->sndvar->fss;
1044 			cur_stream->actions |= MOS_ACT_SEND_CONTROL;
1045 		}
1046 	} else {
1047 		TRACE_DBG("Stream %d (TCP_ST_LAST_ACK): No ACK\n",
1048 			  cur_stream->id);
1049 		//cur_stream->snd_nxt = cur_stream->sndvar->fss;
1050 		cur_stream->actions |= MOS_ACT_SEND_CONTROL;
1051 	}
1052 }
1053 /*----------------------------------------------------------------------------*/
1054 static inline void
Handle_TCP_ST_FIN_WAIT_1(mtcp_manager_t mtcp,tcp_stream * cur_stream,struct pkt_ctx * pctx)1055 Handle_TCP_ST_FIN_WAIT_1 (mtcp_manager_t mtcp, tcp_stream* cur_stream,
1056 		struct pkt_ctx *pctx)
1057 {
1058 	const struct tcphdr* tcph = pctx->p.tcph;
1059 
1060 	if (TCP_SEQ_LT(pctx->p.seq, cur_stream->rcv_nxt)) {
1061 		RAISE_DEBUG_EVENT(mtcp, cur_stream,
1062 				"Stream %d (FIN_WAIT_1): "
1063 				"weird seq: %u, expected: %u\n",
1064 				cur_stream->id, pctx->p.seq, cur_stream->rcv_nxt);
1065 		cur_stream->actions |= MOS_ACT_SEND_CONTROL;
1066 		return;
1067 	}
1068 
1069 	if (tcph->ack) {
1070 		if (cur_stream->sndvar->sndbuf) {
1071 			ProcessACK(mtcp, cur_stream, pctx);
1072 		}
1073 #if BE_RESILIENT_TO_PACKET_DROP
1074 		if (cur_stream->sndvar->is_fin_sent &&
1075 			((!HAS_STREAM_TYPE(cur_stream, MOS_SOCK_STREAM) &&
1076 			 pctx->p.ack_seq == cur_stream->sndvar->fss) ||
1077 			 pctx->p.ack_seq == cur_stream->sndvar->fss + 1)) {
1078 
1079 #else
1080 		if (cur_stream->sndvar->is_fin_sent &&
1081 		    pctx->p.ack_seq == cur_stream->sndvar->fss + 1) {
1082 #endif
1083 			cur_stream->sndvar->snd_una = pctx->p.ack_seq;
1084 			if (TCP_SEQ_GT(pctx->p.ack_seq, cur_stream->snd_nxt)) {
1085 				TRACE_DBG("Stream %d: update snd_nxt to %u\n",
1086 						cur_stream->id, pctx->p.ack_seq);
1087 				cur_stream->snd_nxt = pctx->p.ack_seq;
1088 			}
1089 			//cur_stream->sndvar->snd_una++;
1090 			//UpdateRetransmissionTimer(mtcp, cur_stream, cur_ts);
1091 			cur_stream->sndvar->nrtx = 0;
1092 			if (HAS_STREAM_TYPE(cur_stream, MOS_SOCK_STREAM))
1093 				RemoveFromRTOList(mtcp, cur_stream);
1094 			cur_stream->state = TCP_ST_FIN_WAIT_2;
1095 			cur_stream->cb_events |= MOS_ON_TCP_STATE_CHANGE;
1096 			TRACE_STATE("Stream %d: TCP_ST_FIN_WAIT_2\n",
1097 					cur_stream->id);
1098 		} else {
1099 			RAISE_DEBUG_EVENT(mtcp, cur_stream,
1100 					"Failed to transit to FIN_WAIT_2, "
1101 					"is_fin_sent: %s, ack_seq: %u, sndvar->fss: %u\n",
1102 					cur_stream->sndvar->is_fin_sent ? "true" : "false",
1103 					pctx->p.ack_seq, cur_stream->sndvar->fss);
1104 		}
1105 
1106 	} else {
1107 		RAISE_DEBUG_EVENT(mtcp, cur_stream,
1108 				"Failed to transit to FIN_WAIT_2, "
1109 				"We got a %s%s%s%s%s%s packet.",
1110 				pctx->p.tcph->syn ? "S" : "",
1111 				pctx->p.tcph->fin ? "F" : "",
1112 				pctx->p.tcph->rst ? "R" : "",
1113 				pctx->p.tcph->psh ? "P" : "",
1114 				pctx->p.tcph->urg ? "U" : "",
1115 				pctx->p.tcph->ack ? "A" : "");
1116 
1117 		TRACE_DBG("Stream %d: does not contain an ack!\n",
1118 				cur_stream->id);
1119 		return;
1120 	}
1121 
1122 	if (pctx->p.payloadlen > 0) {
1123 		if (ProcessTCPPayload(mtcp, cur_stream, pctx)) {
1124 			/* if return is TRUE, send ACK */
1125 			cur_stream->actions |= MOS_ACT_SEND_ACK_AGG;
1126 		} else {
1127 			cur_stream->actions |= MOS_ACT_SEND_ACK_NOW;
1128 		}
1129 	}
1130 
1131 	if (tcph->fin) {
1132 		/* process the FIN only if the sequence is valid */
1133 		/* FIN packet is allowed to push payload (should we check for PSH flag)? */
1134 		if (!cur_stream->buffer_mgmt ||
1135 #ifdef BE_RESILIENT_TO_PACKET_DROP
1136 			TCP_SEQ_GEQ(pctx->p.seq + pctx->p.payloadlen, cur_stream->rcv_nxt)
1137 #else
1138 			pctx->p.seq + pctx->p.payloadlen == cur_stream->rcv_nxt
1139 #endif
1140 		   ) {
1141 			cur_stream->rcv_nxt++;
1142 
1143 			if (cur_stream->state == TCP_ST_FIN_WAIT_1) {
1144 				cur_stream->state = TCP_ST_CLOSING;
1145 				cur_stream->cb_events |= MOS_ON_TCP_STATE_CHANGE;
1146 				TRACE_STATE("Stream %d: TCP_ST_CLOSING\n", cur_stream->id);
1147 
1148 			} else if (cur_stream->state == TCP_ST_FIN_WAIT_2) {
1149 				cur_stream->state = TCP_ST_TIME_WAIT;
1150 				cur_stream->cb_events |= MOS_ON_TCP_STATE_CHANGE;
1151 				TRACE_STATE("Stream %d: TCP_ST_TIME_WAIT\n", cur_stream->id);
1152 				AddtoTimewaitList(mtcp, cur_stream, pctx->p.cur_ts);
1153 			}
1154 			cur_stream->actions |= MOS_ACT_SEND_CONTROL;
1155 		} else {
1156 			RAISE_DEBUG_EVENT(mtcp, cur_stream,
1157 					"Expected %u, but received %u (= %u + %u)\n",
1158 					cur_stream->rcv_nxt, pctx->p.seq + pctx->p.payloadlen,
1159 					pctx->p.seq, pctx->p.payloadlen);
1160 
1161 			cur_stream->actions |= MOS_ACT_SEND_ACK_NOW;
1162 			return;
1163 		}
1164 	}
1165 }
1166 /*----------------------------------------------------------------------------*/
1167 static inline void
1168 Handle_TCP_ST_FIN_WAIT_2 (mtcp_manager_t mtcp, tcp_stream* cur_stream,
1169 		struct pkt_ctx *pctx)
1170 {
1171 	const struct tcphdr* tcph = pctx->p.tcph;
1172 
1173 	if (tcph->ack) {
1174 		if (cur_stream->sndvar->sndbuf) {
1175 			ProcessACK(mtcp, cur_stream, pctx);
1176 		}
1177 	} else {
1178 		TRACE_DBG("Stream %d: does not contain an ack!\n",
1179 				cur_stream->id);
1180 		return;
1181 	}
1182 
1183 	if (pctx->p.payloadlen > 0) {
1184 		if (ProcessTCPPayload(mtcp, cur_stream, pctx)) {
1185 			/* if return is TRUE, send ACK */
1186 			cur_stream->actions |= MOS_ACT_SEND_ACK_AGG;
1187 		} else {
1188 			cur_stream->actions |= MOS_ACT_SEND_ACK_NOW;
1189 		}
1190 	}
1191 
1192 	if (tcph->fin) {
1193 		/* process the FIN only if the sequence is valid */
1194 		/* FIN packet is allowed to push payload (should we check for PSH flag)? */
1195 		if (!cur_stream->buffer_mgmt ||
1196 #ifdef BE_RESILIENT_TO_PACKET_DROP
1197 			TCP_SEQ_GEQ(pctx->p.seq + pctx->p.payloadlen, cur_stream->rcv_nxt)
1198 #else
1199 			pctx->p.seq + pctx->p.payloadlen == cur_stream->rcv_nxt
1200 #endif
1201 			) {
1202 			cur_stream->state = TCP_ST_TIME_WAIT;
1203 			cur_stream->cb_events |= MOS_ON_TCP_STATE_CHANGE;
1204 			cur_stream->rcv_nxt++;
1205 			TRACE_STATE("Stream %d: TCP_ST_TIME_WAIT\n", cur_stream->id);
1206 
1207 			AddtoTimewaitList(mtcp, cur_stream, pctx->p.cur_ts);
1208 			cur_stream->actions |= MOS_ACT_SEND_CONTROL;
1209 		}
1210 	} else {
1211 		TRACE_DBG("Stream %d (TCP_ST_FIN_WAIT_2): No FIN. "
1212 				"seq: %u, ack_seq: %u, snd_nxt: %u, snd_una: %u\n",
1213 				cur_stream->id, pctx->p.seq, pctx->p.ack_seq,
1214 				cur_stream->snd_nxt, cur_stream->sndvar->snd_una);
1215 #if DBGMSG
1216 		DumpIPPacket(mtcp, pctx->p.iph, pctx->p.ip_len);
1217 #endif
1218 	}
1219 
1220 }
1221 /*----------------------------------------------------------------------------*/
1222 static inline void
1223 Handle_TCP_ST_CLOSING (mtcp_manager_t mtcp, tcp_stream* cur_stream,
1224 		struct pkt_ctx *pctx)
1225 {
1226 	const struct tcphdr* tcph = pctx->p.tcph;
1227 
1228 	if (tcph->ack) {
1229 		if (cur_stream->sndvar->sndbuf) {
1230 			ProcessACK(mtcp, cur_stream, pctx);
1231 		}
1232 
1233 		if (!cur_stream->sndvar->is_fin_sent) {
1234 			TRACE_DBG("Stream %d (TCP_ST_CLOSING): "
1235 					"No FIN sent yet.\n", cur_stream->id);
1236 			return;
1237 		}
1238 
1239 		// check if ACK of FIN
1240 		if (pctx->p.ack_seq != cur_stream->sndvar->fss + 1) {
1241 #if DBGMSG
1242 			TRACE_DBG("Stream %d (TCP_ST_CLOSING): Not ACK of FIN. "
1243 				  "ack_seq: %u, snd_nxt: %u, snd_una: %u, fss: %u\n",
1244 				  cur_stream->id, pctx->p.ack_seq, cur_stream->snd_nxt,
1245 				  cur_stream->sndvar->snd_una, cur_stream->sndvar->fss);
1246 			DumpIPPacketToFile(stderr, pctx->p.iph, pctx->p.ip_len);
1247 			DumpStream(mtcp, cur_stream);
1248 #endif
1249 			//assert(0);
1250 			/* if the packet is not the ACK of FIN, ignore */
1251 			return;
1252 		}
1253 
1254 		cur_stream->sndvar->snd_una = pctx->p.ack_seq;
1255 		if (HAS_STREAM_TYPE(cur_stream, MOS_SOCK_STREAM))
1256 			UpdateRetransmissionTimer(mtcp, cur_stream, pctx->p.cur_ts);
1257 		cur_stream->state = TCP_ST_TIME_WAIT;
1258 		cur_stream->cb_events |= MOS_ON_TCP_STATE_CHANGE;
1259 		TRACE_STATE("Stream %d: TCP_ST_TIME_WAIT\n", cur_stream->id);
1260 
1261 		AddtoTimewaitList(mtcp, cur_stream, pctx->p.cur_ts);
1262 
1263 	} else {
1264 		TRACE_DBG("Stream %d (TCP_ST_CLOSING): Not ACK\n",
1265 			  cur_stream->id);
1266 		return;
1267 	}
1268 }
1269 /*----------------------------------------------------------------------------*/
1270 void
1271 UpdateRecvTCPContext(mtcp_manager_t mtcp, struct tcp_stream *cur_stream,
1272 		struct pkt_ctx *pctx)
1273 {
1274 	struct tcphdr* tcph = pctx->p.tcph;
1275 	int ret;
1276 
1277 	assert(cur_stream);
1278 
1279 	/* Validate sequence. if not valid, ignore the packet */
1280 	if (cur_stream->state > TCP_ST_SYN_RCVD) {
1281 
1282 		ret = ValidateSequence(mtcp, cur_stream, pctx);
1283 		if (!ret) {
1284 			TRACE_DBG("Stream %d: Unexpected sequence: %u, expected: %u\n",
1285 					cur_stream->id, pctx->p.seq, cur_stream->rcv_nxt);
1286 #ifdef DBGMSG
1287 			DumpIPPacket(mtcp, pctx->p.iph, pctx->p.ip_len);
1288 #endif
1289 #if DUMP_STREAM
1290 			DumpStream(mtcp, cur_stream);
1291 #endif
1292 			/* cur_stream->cb_events |= MOS_ON_ERROR; */
1293 		}
1294 	}
1295 	/* Update receive window size */
1296 	if (tcph->syn) {
1297 		cur_stream->sndvar->peer_wnd = pctx->p.window;
1298 	} else {
1299 		cur_stream->sndvar->peer_wnd =
1300 				(uint32_t)pctx->p.window << cur_stream->sndvar->wscale_peer;
1301 	}
1302 
1303 	cur_stream->last_active_ts = pctx->p.cur_ts;
1304 	if (HAS_STREAM_TYPE(cur_stream, MOS_SOCK_STREAM))
1305 		UpdateTimeoutList(mtcp, cur_stream);
1306 
1307 	/* Process RST: process here only if state > TCP_ST_SYN_SENT */
1308 	if (tcph->rst) {
1309 		cur_stream->have_reset = TRUE;
1310 		if (cur_stream->state > TCP_ST_SYN_SENT) {
1311 			if (ProcessRST(mtcp, cur_stream, pctx)) {
1312 				return;
1313 			}
1314 		}
1315 	}
1316 
1317 	if (HAS_STREAM_TYPE(cur_stream, MOS_SOCK_MONITOR_STREAM_ACTIVE) &&
1318 		pctx->p.tcph->fin) {
1319 		if (cur_stream->state == TCP_ST_CLOSE_WAIT ||
1320 			cur_stream->state == TCP_ST_LAST_ACK ||
1321 			cur_stream->state == TCP_ST_CLOSING ||
1322 			cur_stream->state == TCP_ST_TIME_WAIT) {
1323 			/* Handle retransmitted FIN packet */
1324 			if (pctx->p.seq == cur_stream->pair_stream->sndvar->fss) {
1325 				TRACE_DBG("FIN retransmit! (seq = %u / fss = %u)\n",
1326 						pctx->p.seq, cur_stream->pair_stream->sndvar->fss);
1327 				cur_stream->cb_events |= MOS_ON_REXMIT;
1328 			}
1329 		}
1330 	}
1331 
1332 	switch (cur_stream->state) {
1333 	case TCP_ST_LISTEN:
1334 		Handle_TCP_ST_LISTEN(mtcp, cur_stream, pctx);
1335 		break;
1336 
1337 	case TCP_ST_SYN_SENT:
1338 		Handle_TCP_ST_SYN_SENT(mtcp, cur_stream, pctx);
1339 		break;
1340 
1341 	case TCP_ST_SYN_RCVD:
1342 		/* SYN retransmit implies our SYN/ACK was lost. Resend */
1343 		if (tcph->syn && pctx->p.seq == cur_stream->rcvvar->irs)
1344 			Handle_TCP_ST_LISTEN(mtcp, cur_stream, pctx);
1345 		else {
1346 			Handle_TCP_ST_SYN_RCVD(mtcp, cur_stream, pctx);
1347 			if (pctx->p.payloadlen > 0 && cur_stream->state == TCP_ST_ESTABLISHED)
1348 				Handle_TCP_ST_ESTABLISHED(mtcp, cur_stream, pctx);
1349 		}
1350 		break;
1351 
1352 	case TCP_ST_ESTABLISHED:
1353 		Handle_TCP_ST_ESTABLISHED(mtcp, cur_stream, pctx);
1354 		break;
1355 
1356 	case TCP_ST_CLOSE_WAIT:
1357 		Handle_TCP_ST_CLOSE_WAIT(mtcp, cur_stream, pctx);
1358 		break;
1359 
1360 	case TCP_ST_LAST_ACK:
1361 		Handle_TCP_ST_LAST_ACK(mtcp, cur_stream, pctx);
1362 		break;
1363 
1364 	case TCP_ST_FIN_WAIT_1:
1365 		Handle_TCP_ST_FIN_WAIT_1(mtcp, cur_stream, pctx);
1366 		break;
1367 
1368 	case TCP_ST_FIN_WAIT_2:
1369 		Handle_TCP_ST_FIN_WAIT_2(mtcp, cur_stream, pctx);
1370 		break;
1371 
1372 	case TCP_ST_CLOSING:
1373 		Handle_TCP_ST_CLOSING(mtcp, cur_stream, pctx);
1374 		break;
1375 
1376 	case TCP_ST_TIME_WAIT:
1377 		/* the only thing that can arrive in this state is a retransmission
1378 		   of the remote FIN. Acknowledge it, and restart the 2 MSL timeout */
1379 		if (cur_stream->on_timewait_list) {
1380 			RemoveFromTimewaitList(mtcp, cur_stream);
1381 			AddtoTimewaitList(mtcp, cur_stream, pctx->p.cur_ts);
1382 		}
1383 		cur_stream->actions |= MOS_ACT_SEND_CONTROL;
1384 		break;
1385 
1386 	case TCP_ST_CLOSED:
1387 	case TCP_ST_CLOSED_RSVD:
1388 		break;
1389 
1390 	default:
1391 		break;
1392 	}
1393 
1394 	TRACE_STATE("Stream %d: Events: %0lx, Action: %0x\n",
1395 			cur_stream->id, cur_stream->cb_events, cur_stream->actions);
1396 	return;
1397 }
1398 /*----------------------------------------------------------------------------*/
1399 void
1400 DoActionEndTCPPacket(mtcp_manager_t mtcp, struct tcp_stream *cur_stream,
1401 		struct pkt_ctx *pctx)
1402 {
1403 	int i;
1404 
1405 	for (i = 1; i < MOS_ACT_CNT; i = i << 1) {
1406 
1407 		if (cur_stream->actions & i) {
1408 			switch(i) {
1409 			case MOS_ACT_SEND_DATA:
1410 				AddtoSendList(mtcp, cur_stream);
1411 				break;
1412 			case MOS_ACT_SEND_ACK_NOW:
1413 				EnqueueACK(mtcp, cur_stream, pctx->p.cur_ts, ACK_OPT_NOW);
1414 				break;
1415 			case MOS_ACT_SEND_ACK_AGG:
1416 				EnqueueACK(mtcp, cur_stream, pctx->p.cur_ts, ACK_OPT_AGGREGATE);
1417 				break;
1418 			case MOS_ACT_SEND_CONTROL:
1419 				AddtoControlList(mtcp, cur_stream, pctx->p.cur_ts);
1420 				break;
1421 			case MOS_ACT_SEND_RST:
1422 				if (cur_stream->state <= TCP_ST_SYN_SENT)
1423 					SendTCPPacketStandalone(mtcp,
1424 							pctx->p.iph->daddr, pctx->p.tcph->dest,
1425 							pctx->p.iph->saddr, pctx->p.tcph->source,
1426 							0, pctx->p.seq + 1, 0, TCP_FLAG_RST | TCP_FLAG_ACK,
1427 							NULL, 0, pctx->p.cur_ts, 0, 0, -1);
1428 				else
1429 					SendTCPPacketStandalone(mtcp,
1430 							pctx->p.iph->daddr, pctx->p.tcph->dest,
1431 							pctx->p.iph->saddr, pctx->p.tcph->source,
1432 							pctx->p.ack_seq, 0, 0, TCP_FLAG_RST | TCP_FLAG_ACK,
1433 							NULL, 0, pctx->p.cur_ts, 0, 0, -1);
1434 				break;
1435 			case MOS_ACT_DESTROY:
1436 				DestroyTCPStream(mtcp, cur_stream);
1437 				break;
1438 			default:
1439 				assert(1);
1440 				break;
1441 			}
1442 		}
1443 	}
1444 
1445 	cur_stream->actions = 0;
1446 }
1447 /*----------------------------------------------------------------------------*/
1448 /**
1449  * Called (when monitoring mode is enabled).. for every outgoing packet to the
1450  * NIC.
1451  */
1452 inline void
1453 UpdatePassiveRecvTCPContext(mtcp_manager_t mtcp, struct tcp_stream *cur_stream,
1454 			       struct pkt_ctx *pctx)
1455 {
1456 	UpdateRecvTCPContext(mtcp, cur_stream, pctx);
1457 }
1458 /*----------------------------------------------------------------------------*/
1459 /* NOTE TODO: This event prediction is additional overhaed of HK_RCV hook.
1460  * We can transparently optimize this by disabling prediction of events which
1461  * are not monitored by anyone. */
1462 inline void
1463 PreRecvTCPEventPrediction(mtcp_manager_t mtcp, struct pkt_ctx *pctx,
1464 			  struct tcp_stream *recvside_stream)
1465 {
1466 	tcp_rb_overlapchk(mtcp, pctx, recvside_stream);
1467 }
1468 /*----------------------------------------------------------------------------*/
1469