#include <assert.h>
#include <stdint.h>

#include "mos_api.h"
#include "tcp_util.h"
#include "tcp_in.h"
#include "tcp_out.h"
#include "tcp_ring_buffer.h"
#include "eventpoll.h"
#include "debug.h"
#include "timer.h"
#include "ip_in.h"
#include "tcp_rb.h"
#include "config.h"
#include "scalable_event.h"
15
/* Compile-time switches consumed by ProcessACK below:
 * - RECOVERY_AFTER_LOSS: move snd_nxt forward when an ACK acknowledges
 *   beyond it.
 * - SELECTIVE_WRITE_EVENT_NOTIFY: raise a write event only when the
 *   previous send window was exhausted. */
#define RECOVERY_AFTER_LOSS TRUE
#define SELECTIVE_WRITE_EVENT_NOTIFY TRUE

/* Larger / smaller of two values. Function-like macros: arguments may be
 * evaluated twice, so never pass expressions with side effects. The
 * replacement lists are kept token-identical so that an identical earlier
 * definition remains a benign redefinition. */
#define MAX(a, b) ((a)>(b)?(a):(b))
#define MIN(a, b) ((a)<(b)?(a):(b))
21 /*----------------------------------------------------------------------------*/
22 static inline void
23 Handle_TCP_ST_ESTABLISHED (mtcp_manager_t mtcp, tcp_stream* cur_stream,
24 struct pkt_ctx *pctx);
25 /*----------------------------------------------------------------------------*/
26 static inline int
FilterSYNPacket(mtcp_manager_t mtcp,uint32_t ip,uint16_t port)27 FilterSYNPacket(mtcp_manager_t mtcp, uint32_t ip, uint16_t port)
28 {
29 struct sockaddr_in *addr;
30
31 /* TODO: This listening logic should be revised */
32
33 /* if not listening, drop */
34 if (!mtcp->listener) {
35 return FALSE;
36 }
37
38 /* if not the address we want, drop */
39 addr = &mtcp->listener->socket->saddr;
40 if (addr->sin_port == port) {
41 if (addr->sin_addr.s_addr != INADDR_ANY) {
42 if (ip == addr->sin_addr.s_addr) {
43 return TRUE;
44 }
45 return FALSE;
46 } else {
47 int i;
48
49 for (i = 0; i < g_config.mos->netdev_table->num; i++) {
50 if (ip == g_config.mos->netdev_table->ent[i]->ip_addr) {
51 return TRUE;
52 }
53 }
54 return FALSE;
55 }
56 }
57
58 return FALSE;
59 }
60 /*----------------------------------------------------------------------------*/
61 static inline int
HandleActiveOpen(mtcp_manager_t mtcp,tcp_stream * cur_stream,struct pkt_ctx * pctx)62 HandleActiveOpen(mtcp_manager_t mtcp, tcp_stream *cur_stream,
63 struct pkt_ctx *pctx)
64 {
65 const struct tcphdr* tcph = pctx->p.tcph;
66
67 cur_stream->rcvvar->irs = pctx->p.seq;
68 cur_stream->snd_nxt = pctx->p.ack_seq;
69 cur_stream->sndvar->peer_wnd = pctx->p.window;
70 cur_stream->rcvvar->snd_wl1 = cur_stream->rcvvar->irs - 1;
71 cur_stream->rcv_nxt = cur_stream->rcvvar->irs + 1;
72 cur_stream->rcvvar->last_ack_seq = pctx->p.ack_seq;
73 ParseTCPOptions(cur_stream, pctx->p.cur_ts, (uint8_t *)tcph + TCP_HEADER_LEN,
74 (tcph->doff << 2) - TCP_HEADER_LEN);
75 cur_stream->sndvar->cwnd = ((cur_stream->sndvar->cwnd == 1)?
76 (cur_stream->sndvar->mss * 2): cur_stream->sndvar->mss);
77 cur_stream->sndvar->ssthresh = cur_stream->sndvar->mss * 10;
78 if (HAS_STREAM_TYPE(cur_stream, MOS_SOCK_STREAM))
79 UpdateRetransmissionTimer(mtcp, cur_stream, pctx->p.cur_ts);
80
81 return TRUE;
82 }
83 /*----------------------------------------------------------------------------*/
84 /* ValidateSequence: validates sequence number of the segment */
85 /* Return: TRUE if acceptable, FALSE if not acceptable */
86 /*----------------------------------------------------------------------------*/
87 static inline int
ValidateSequence(mtcp_manager_t mtcp,tcp_stream * cur_stream,struct pkt_ctx * pctx)88 ValidateSequence(mtcp_manager_t mtcp, tcp_stream *cur_stream,
89 struct pkt_ctx *pctx)
90 {
91 const struct tcphdr* tcph = pctx->p.tcph;
92
93 /* Protect Against Wrapped Sequence number (PAWS) */
94 if (!tcph->rst && cur_stream->saw_timestamp) {
95 struct tcp_timestamp ts;
96
97 if (!ParseTCPTimestamp(cur_stream, &ts,
98 (uint8_t *)tcph + TCP_HEADER_LEN,
99 (tcph->doff << 2) - TCP_HEADER_LEN)) {
100 /* if there is no timestamp */
101 /* TODO: implement here */
102 TRACE_DBG("No timestamp found.\n");
103 return FALSE;
104 }
105
106 /* RFC1323: if SEG.TSval < TS.Recent, drop and send ack */
107 if (TCP_SEQ_LT(ts.ts_val, cur_stream->rcvvar->ts_recent)) {
108 /* TODO: ts_recent should be invalidated
109 before timestamp wraparound for long idle flow */
110 TRACE_DBG("PAWS Detect wrong timestamp. "
111 "seq: %u, ts_val: %u, prev: %u\n",
112 pctx->p.seq, ts.ts_val, cur_stream->rcvvar->ts_recent);
113 cur_stream->actions |= MOS_ACT_SEND_ACK_NOW;
114 return FALSE;
115 } else {
116 /* valid timestamp */
117 if (TCP_SEQ_GT(ts.ts_val, cur_stream->rcvvar->ts_recent)) {
118 TRACE_TSTAMP("Timestamp update. cur: %u, prior: %u "
119 "(time diff: %uus)\n",
120 ts.ts_val, cur_stream->rcvvar->ts_recent,
121 TS_TO_USEC(pctx->p.cur_ts - cur_stream->rcvvar->ts_last_ts_upd));
122 cur_stream->rcvvar->ts_last_ts_upd = pctx->p.cur_ts;
123 }
124
125 cur_stream->rcvvar->ts_recent = ts.ts_val;
126 cur_stream->rcvvar->ts_lastack_rcvd = ts.ts_ref;
127 }
128 }
129
130 /* TCP sequence validation */
131 if (!TCP_SEQ_BETWEEN(pctx->p.seq + pctx->p.payloadlen, cur_stream->rcv_nxt,
132 cur_stream->rcv_nxt + cur_stream->rcvvar->rcv_wnd)) {
133
134 /* if RST bit is set, ignore the segment */
135 if (tcph->rst)
136 return FALSE;
137
138 if (cur_stream->state == TCP_ST_ESTABLISHED) {
139 /* check if it is to get window advertisement */
140 if (pctx->p.seq + 1 == cur_stream->rcv_nxt) {
141 TRACE_DBG("Window update request. (seq: %u, rcv_wnd: %u)\n",
142 pctx->p.seq, cur_stream->rcvvar->rcv_wnd);
143 cur_stream->actions |= MOS_ACT_SEND_ACK_AGG;
144 return FALSE;
145
146 }
147
148 if (TCP_SEQ_LEQ(pctx->p.seq, cur_stream->rcv_nxt)) {
149 cur_stream->actions |= MOS_ACT_SEND_ACK_AGG;
150 } else {
151 cur_stream->actions |= MOS_ACT_SEND_ACK_NOW;
152 }
153 } else {
154 if (cur_stream->state == TCP_ST_TIME_WAIT) {
155 TRACE_DBG("Stream %d: tw expire update to %u\n",
156 cur_stream->id, cur_stream->rcvvar->ts_tw_expire);
157 AddtoTimewaitList(mtcp, cur_stream, pctx->p.cur_ts);
158 }
159 cur_stream->actions |= MOS_ACT_SEND_CONTROL;
160 }
161 return FALSE;
162 }
163
164 return TRUE;
165 }
166 /*----------------------------------------------------------------------------*/
167 static inline void
NotifyConnectionReset(mtcp_manager_t mtcp,tcp_stream * cur_stream)168 NotifyConnectionReset(mtcp_manager_t mtcp, tcp_stream *cur_stream)
169 {
170 TRACE_DBG("Stream %d: Notifying connection reset.\n", cur_stream->id);
171 /* TODO: implement this function */
172 /* signal to user "connection reset" */
173 }
174 /*----------------------------------------------------------------------------*/
175 static inline int
ProcessRST(mtcp_manager_t mtcp,tcp_stream * cur_stream,struct pkt_ctx * pctx)176 ProcessRST(mtcp_manager_t mtcp, tcp_stream *cur_stream,
177 struct pkt_ctx *pctx)
178 {
179 /* TODO: we need reset validation logic */
180 /* the sequence number of a RST should be inside window */
181 /* (in SYN_SENT state, it should ack the previous SYN */
182
183 TRACE_DBG("Stream %d: TCP RESET (%s)\n",
184 cur_stream->id, TCPStateToString(cur_stream));
185 #if DUMP_STREAM
186 DumpStream(mtcp, cur_stream);
187 #endif
188
189 if (cur_stream->state <= TCP_ST_SYN_SENT) {
190 /* not handled here */
191 return FALSE;
192 }
193
194 if (cur_stream->state == TCP_ST_SYN_RCVD) {
195 /* ACK number of last sent ACK packet == rcv_nxt + 1*/
196 if (pctx->p.seq == 0 ||
197 #ifdef BE_RESILIENT_TO_PACKET_DROP
198 pctx->p.seq == cur_stream->rcv_nxt + 1 ||
199 #endif
200 pctx->p.ack_seq == cur_stream->snd_nxt)
201 {
202 cur_stream->state = TCP_ST_CLOSED_RSVD;
203 cur_stream->cb_events |= MOS_ON_TCP_STATE_CHANGE;
204 cur_stream->close_reason = TCP_RESET;
205 cur_stream->actions |= MOS_ACT_DESTROY;
206 } else {
207 RAISE_DEBUG_EVENT(mtcp, cur_stream,
208 "(SYN_RCVD): Ignore invalid RST. "
209 "ack_seq expected: %u, ack_seq rcvd: %u\n",
210 cur_stream->rcv_nxt + 1, pctx->p.ack_seq);
211 }
212 return TRUE;
213 }
214
215 /* if the application is already closed the connection,
216 just destroy the it */
217 if (cur_stream->state == TCP_ST_FIN_WAIT_1 ||
218 cur_stream->state == TCP_ST_FIN_WAIT_2 ||
219 cur_stream->state == TCP_ST_LAST_ACK ||
220 cur_stream->state == TCP_ST_CLOSING ||
221 cur_stream->state == TCP_ST_TIME_WAIT) {
222 cur_stream->state = TCP_ST_CLOSED_RSVD;
223 cur_stream->close_reason = TCP_ACTIVE_CLOSE;
224 cur_stream->cb_events |= MOS_ON_TCP_STATE_CHANGE;
225 cur_stream->actions |= MOS_ACT_DESTROY;
226 return TRUE;
227 }
228
229 if (cur_stream->state >= TCP_ST_ESTABLISHED &&
230 cur_stream->state <= TCP_ST_CLOSE_WAIT) {
231 /* ESTABLISHED, FIN_WAIT_1, FIN_WAIT_2, CLOSE_WAIT */
232 /* TODO: flush all the segment queues */
233 //NotifyConnectionReset(mtcp, cur_stream);
234 }
235
236 if (!(cur_stream->sndvar->on_closeq || cur_stream->sndvar->on_closeq_int ||
237 cur_stream->sndvar->on_resetq || cur_stream->sndvar->on_resetq_int)) {
238 //cur_stream->state = TCP_ST_CLOSED_RSVD;
239 //cur_stream->actions |= MOS_ACT_DESTROY;
240 cur_stream->state = TCP_ST_CLOSED_RSVD;
241 cur_stream->cb_events |= MOS_ON_TCP_STATE_CHANGE;
242 cur_stream->close_reason = TCP_RESET;
243 if (HAS_STREAM_TYPE(cur_stream, MOS_SOCK_STREAM))
244 RaiseCloseEvent(mtcp, cur_stream);
245 }
246
247 return TRUE;
248 }
249 /*----------------------------------------------------------------------------*/
250 inline void
EstimateRTT(mtcp_manager_t mtcp,tcp_stream * cur_stream,uint32_t mrtt)251 EstimateRTT(mtcp_manager_t mtcp, tcp_stream *cur_stream, uint32_t mrtt)
252 {
253 /* This function should be called for not retransmitted packets */
254 /* TODO: determine tcp_rto_min */
255 #define TCP_RTO_MIN 0
256 long m = mrtt;
257 uint32_t tcp_rto_min = TCP_RTO_MIN;
258 struct tcp_recv_vars *rcvvar = cur_stream->rcvvar;
259
260 if (m == 0) {
261 m = 1;
262 }
263 if (rcvvar->srtt != 0) {
264 /* rtt = 7/8 rtt + 1/8 new */
265 m -= (rcvvar->srtt >> 3);
266 rcvvar->srtt += m;
267 if (m < 0) {
268 m = -m;
269 m -= (rcvvar->mdev >> 2);
270 if (m > 0) {
271 m >>= 3;
272 }
273 } else {
274 m -= (rcvvar->mdev >> 2);
275 }
276 rcvvar->mdev += m;
277 if (rcvvar->mdev > rcvvar->mdev_max) {
278 rcvvar->mdev_max = rcvvar->mdev;
279 if (rcvvar->mdev_max > rcvvar->rttvar) {
280 rcvvar->rttvar = rcvvar->mdev_max;
281 }
282 }
283 if (TCP_SEQ_GT(cur_stream->sndvar->snd_una, rcvvar->rtt_seq)) {
284 if (rcvvar->mdev_max < rcvvar->rttvar) {
285 rcvvar->rttvar -= (rcvvar->rttvar - rcvvar->mdev_max) >> 2;
286 }
287 rcvvar->rtt_seq = cur_stream->snd_nxt;
288 rcvvar->mdev_max = tcp_rto_min;
289 }
290 } else {
291 /* fresh measurement */
292 rcvvar->srtt = m << 3;
293 rcvvar->mdev = m << 1;
294 rcvvar->mdev_max = rcvvar->rttvar = MAX(rcvvar->mdev, tcp_rto_min);
295 rcvvar->rtt_seq = cur_stream->snd_nxt;
296 }
297
298 TRACE_RTT("mrtt: %u (%uus), srtt: %u (%ums), mdev: %u, mdev_max: %u, "
299 "rttvar: %u, rtt_seq: %u\n", mrtt, mrtt * TIME_TICK,
300 rcvvar->srtt, TS_TO_MSEC((rcvvar->srtt) >> 3), rcvvar->mdev,
301 rcvvar->mdev_max, rcvvar->rttvar, rcvvar->rtt_seq);
302 }
303 /*----------------------------------------------------------------------------*/
304 static inline void
ProcessACK(mtcp_manager_t mtcp,tcp_stream * cur_stream,struct pkt_ctx * pctx)305 ProcessACK(mtcp_manager_t mtcp, tcp_stream *cur_stream,
306 struct pkt_ctx *pctx)
307 {
308 const struct tcphdr* tcph = pctx->p.tcph;
309 uint32_t seq = pctx->p.seq;
310 uint32_t ack_seq = pctx->p.ack_seq;
311 struct tcp_send_vars *sndvar = cur_stream->sndvar;
312 uint32_t cwindow, cwindow_prev;
313 uint32_t rmlen;
314 uint32_t snd_wnd_prev;
315 uint32_t right_wnd_edge;
316 uint8_t dup;
317
318 cwindow = pctx->p.window;
319 if (!tcph->syn) {
320 cwindow = cwindow << sndvar->wscale_peer;
321 }
322 right_wnd_edge = sndvar->peer_wnd + cur_stream->rcvvar->snd_wl2;
323
324 if (cur_stream->state == TCP_ST_FIN_WAIT_1 ||
325 cur_stream->state == TCP_ST_FIN_WAIT_2 ||
326 cur_stream->state == TCP_ST_CLOSING ||
327 cur_stream->state == TCP_ST_CLOSE_WAIT ||
328 cur_stream->state == TCP_ST_LAST_ACK) {
329 if (sndvar->is_fin_sent && ack_seq == sndvar->fss + 1) {
330 ack_seq--;
331 }
332 }
333
334 /* If ack overs the sending buffer, return */
335 if (TCP_SEQ_GT(ack_seq, sndvar->sndbuf->head_seq + sndvar->sndbuf->len)) {
336 TRACE_DBG("Stream %d (%s): invalid acknologement. "
337 "ack_seq: %u, possible max_ack_seq: %u\n", cur_stream->id,
338 TCPStateToString(cur_stream), ack_seq,
339 sndvar->sndbuf->head_seq + sndvar->sndbuf->len);
340 return;
341 }
342
343 #ifdef BE_RESILIENT_TO_PACKET_DROP
344 if (TCP_SEQ_GT(seq + pctx->p.payloadlen, cur_stream->rcv_nxt))
345 cur_stream->rcv_nxt = seq + pctx->p.payloadlen;
346 #endif
347
348 /* Update window */
349 if (TCP_SEQ_LT(cur_stream->rcvvar->snd_wl1, seq) ||
350 (cur_stream->rcvvar->snd_wl1 == seq &&
351 TCP_SEQ_LT(cur_stream->rcvvar->snd_wl2, ack_seq)) ||
352 (cur_stream->rcvvar->snd_wl2 == ack_seq &&
353 cwindow > sndvar->peer_wnd)) {
354 cwindow_prev = sndvar->peer_wnd;
355 sndvar->peer_wnd = cwindow;
356 cur_stream->rcvvar->snd_wl1 = seq;
357 cur_stream->rcvvar->snd_wl2 = ack_seq;
358 TRACE_CLWND("Window update. "
359 "ack: %u, peer_wnd: %u, snd_nxt-snd_una: %u\n",
360 ack_seq, cwindow, cur_stream->snd_nxt - sndvar->snd_una);
361 if (cwindow_prev < cur_stream->snd_nxt - sndvar->snd_una &&
362 sndvar->peer_wnd >= cur_stream->snd_nxt - sndvar->snd_una) {
363 TRACE_CLWND("%u Broadcasting client window update! "
364 "ack_seq: %u, peer_wnd: %u (before: %u), "
365 "(snd_nxt - snd_una: %u)\n",
366 cur_stream->id, ack_seq, sndvar->peer_wnd, cwindow_prev,
367 cur_stream->snd_nxt - sndvar->snd_una);
368 if (HAS_STREAM_TYPE(cur_stream, MOS_SOCK_STREAM))
369 RaiseWriteEvent(mtcp, cur_stream);
370 }
371 }
372
373 /* Check duplicated ack count */
374 /* Duplicated ack if
375 1) ack_seq is old
376 2) payload length is 0.
377 3) advertised window not changed.
378 4) there is outstanding unacknowledged data
379 5) ack_seq == snd_una
380 */
381
382 dup = FALSE;
383 if (TCP_SEQ_LT(ack_seq, cur_stream->snd_nxt)) {
384 if (ack_seq == cur_stream->rcvvar->last_ack_seq && pctx->p.payloadlen == 0) {
385 if (cur_stream->rcvvar->snd_wl2 + sndvar->peer_wnd == right_wnd_edge) {
386 if (cur_stream->rcvvar->dup_acks + 1 > cur_stream->rcvvar->dup_acks) {
387 cur_stream->rcvvar->dup_acks++;
388 }
389 dup = TRUE;
390 }
391 }
392 }
393 if (!dup) {
394 cur_stream->rcvvar->dup_acks = 0;
395 cur_stream->rcvvar->last_ack_seq = ack_seq;
396 }
397
398 /* Fast retransmission */
399 if (dup && cur_stream->rcvvar->dup_acks == 3) {
400 TRACE_LOSS("Triple duplicated ACKs!! ack_seq: %u\n", ack_seq);
401 if (TCP_SEQ_LT(ack_seq, cur_stream->snd_nxt)) {
402 TRACE_LOSS("Reducing snd_nxt from %u to %u\n",
403 cur_stream->snd_nxt, ack_seq);
404 #if RTM_STAT
405 sndvar->rstat.tdp_ack_cnt++;
406 sndvar->rstat.tdp_ack_bytes += (cur_stream->snd_nxt - ack_seq);
407 #endif
408 if (ack_seq != sndvar->snd_una) {
409 TRACE_DBG("ack_seq and snd_una mismatch on tdp ack. "
410 "ack_seq: %u, snd_una: %u\n",
411 ack_seq, sndvar->snd_una);
412 }
413 cur_stream->snd_nxt = ack_seq;
414 }
415
416 /* update congestion control variables */
417 /* ssthresh to half of min of cwnd and peer wnd */
418 sndvar->ssthresh = MIN(sndvar->cwnd, sndvar->peer_wnd) / 2;
419 if (sndvar->ssthresh < 2 * sndvar->mss) {
420 sndvar->ssthresh = 2 * sndvar->mss;
421 }
422 sndvar->cwnd = sndvar->ssthresh + 3 * sndvar->mss;
423 TRACE_CONG("Fast retransmission. cwnd: %u, ssthresh: %u\n",
424 sndvar->cwnd, sndvar->ssthresh);
425
426 /* count number of retransmissions */
427 if (sndvar->nrtx < TCP_MAX_RTX) {
428 sndvar->nrtx++;
429 } else {
430 TRACE_DBG("Exceed MAX_RTX.\n");
431 }
432
433 cur_stream->actions |= MOS_ACT_SEND_DATA;
434
435 } else if (cur_stream->rcvvar->dup_acks > 3) {
436 /* Inflate congestion window until before overflow */
437 if ((uint32_t)(sndvar->cwnd + sndvar->mss) > sndvar->cwnd) {
438 sndvar->cwnd += sndvar->mss;
439 TRACE_CONG("Dupack cwnd inflate. cwnd: %u, ssthresh: %u\n",
440 sndvar->cwnd, sndvar->ssthresh);
441 }
442 }
443
444 #if TCP_OPT_SACK_ENABLED
445 ParseSACKOption(cur_stream, ack_seq, (uint8_t *)tcph + TCP_HEADER_LEN,
446 (tcph->doff << 2) - TCP_HEADER_LEN);
447 #endif /* TCP_OPT_SACK_ENABLED */
448
449 #if RECOVERY_AFTER_LOSS
450 /* updating snd_nxt (when recovered from loss) */
451 if (TCP_SEQ_GT(ack_seq, cur_stream->snd_nxt)) {
452 #if RTM_STAT
453 sndvar->rstat.ack_upd_cnt++;
454 sndvar->rstat.ack_upd_bytes += (ack_seq - cur_stream->snd_nxt);
455 #endif
456 TRACE_LOSS("Updating snd_nxt from %u to %u\n",
457 cur_stream->snd_nxt, ack_seq);
458 cur_stream->snd_nxt = ack_seq;
459 if (sndvar->sndbuf->len == 0) {
460 if (HAS_STREAM_TYPE(cur_stream, MOS_SOCK_STREAM))
461 RemoveFromSendList(mtcp, cur_stream);
462 }
463 }
464 #endif
465
466 /* If ack_seq is previously acked, return */
467 if (TCP_SEQ_GEQ(sndvar->sndbuf->head_seq, ack_seq)) {
468 return;
469 }
470
471 /* Remove acked sequence from send buffer */
472 rmlen = ack_seq - sndvar->sndbuf->head_seq;
473 if (rmlen > 0) {
474 /* Routine goes here only if there is new payload (not retransmitted) */
475 uint16_t packets;
476
477 /* If acks new data */
478 packets = rmlen / sndvar->eff_mss;
479 if ((rmlen / sndvar->eff_mss) * sndvar->eff_mss > rmlen) {
480 packets++;
481 }
482
483 /* Estimate RTT and calculate rto */
484 if (cur_stream->saw_timestamp) {
485 EstimateRTT(mtcp, cur_stream,
486 pctx->p.cur_ts - cur_stream->rcvvar->ts_lastack_rcvd);
487 sndvar->rto = (cur_stream->rcvvar->srtt >> 3) + cur_stream->rcvvar->rttvar;
488 assert(sndvar->rto > 0);
489 } else {
490 //TODO: Need to implement timestamp estimation without timestamp
491 TRACE_RTT("NOT IMPLEMENTED.\n");
492 }
493
494 /* Update congestion control variables */
495 if (cur_stream->state >= TCP_ST_ESTABLISHED) {
496 if (sndvar->cwnd < sndvar->ssthresh) {
497 if ((sndvar->cwnd + sndvar->mss) > sndvar->cwnd) {
498 sndvar->cwnd += (sndvar->mss * packets);
499 }
500 TRACE_CONG("slow start cwnd: %u, ssthresh: %u\n",
501 sndvar->cwnd, sndvar->ssthresh);
502 } else {
503 uint32_t new_cwnd = sndvar->cwnd +
504 packets * sndvar->mss * sndvar->mss /
505 sndvar->cwnd;
506 if (new_cwnd > sndvar->cwnd) {
507 sndvar->cwnd = new_cwnd;
508 }
509 //TRACE_CONG("congestion avoidance cwnd: %u, ssthresh: %u\n",
510 // sndvar->cwnd, sndvar->ssthresh);
511 }
512 }
513
514 if (SBUF_LOCK(&sndvar->write_lock)) {
515 if (errno == EDEADLK)
516 perror("ProcessACK: write_lock blocked\n");
517 assert(0);
518 }
519 if (HAS_STREAM_TYPE(cur_stream, MOS_SOCK_STREAM))
520 SBRemove(mtcp->rbm_snd, sndvar->sndbuf, rmlen);
521 sndvar->snd_una = ack_seq;
522 snd_wnd_prev = sndvar->snd_wnd;
523 sndvar->snd_wnd = sndvar->sndbuf->size - sndvar->sndbuf->len;
524
525 /* If there was no available sending window */
526 /* notify the newly available window to application */
527 #if SELECTIVE_WRITE_EVENT_NOTIFY
528 if (snd_wnd_prev <= 0) {
529 #endif /* SELECTIVE_WRITE_EVENT_NOTIFY */
530 if (HAS_STREAM_TYPE(cur_stream, MOS_SOCK_STREAM))
531 RaiseWriteEvent(mtcp, cur_stream);
532 #if SELECTIVE_WRITE_EVENT_NOTIFY
533 }
534 #endif /* SELECTIVE_WRITE_EVENT_NOTIFY */
535
536 SBUF_UNLOCK(&sndvar->write_lock);
537 if (HAS_STREAM_TYPE(cur_stream, MOS_SOCK_STREAM))
538 UpdateRetransmissionTimer(mtcp, cur_stream, pctx->p.cur_ts);
539 }
540 }
541 /*----------------------------------------------------------------------------*/
542 /* ProcessTCPPayload: merges TCP payload using receive ring buffer */
543 /* Return: TRUE (1) in normal case, FALSE (0) if immediate ACK is required */
544 /* CAUTION: should only be called at ESTABLISHED, FIN_WAIT_1, FIN_WAIT_2 */
545 /*----------------------------------------------------------------------------*/
546 static inline int
ProcessTCPPayload(mtcp_manager_t mtcp,tcp_stream * cur_stream,struct pkt_ctx * pctx)547 ProcessTCPPayload(mtcp_manager_t mtcp, tcp_stream *cur_stream,
548 struct pkt_ctx *pctx)
549 {
550 struct tcp_recv_vars *rcvvar = cur_stream->rcvvar;
551 uint32_t prev_rcv_nxt;
552 int ret = -1;
553 bool read_lock;
554 struct socket_map *walk;
555
556 if (!cur_stream->buffer_mgmt)
557 return FALSE;
558
559 /* if seq and segment length is lower than rcv_nxt, ignore and send ack */
560 if (TCP_SEQ_LT(pctx->p.seq + pctx->p.payloadlen, cur_stream->rcv_nxt))
561 return FALSE;
562
563 /*
564 TRACE_DEBUG("pctx->p.seq = %u, pctx->p.payloadlen = %d / cur_stream->rcv_nxt = %u, "
565 "rcvvar->rcv_wnd = %u\n",
566 pctx->p.seq, pctx->p.payloadlen, cur_stream->rcv_nxt, rcvvar->rcv_wnd);
567 */
568
569 /* if payload exceeds receiving buffer, drop and send ack */
570 if (TCP_SEQ_GT(pctx->p.seq + pctx->p.payloadlen, cur_stream->rcv_nxt + rcvvar->rcv_wnd)) {
571 /* MOS_ON_ERROR: payload outside the window arrives */
572 if (cur_stream->side == MOS_SIDE_CLI) {
573 SOCKQ_FOREACH_REVERSE(walk, &cur_stream->msocks) {
574 HandleCallback(mtcp, MOS_NULL, walk, cur_stream->side,
575 pctx, MOS_ON_ERROR);
576 } SOCKQ_FOREACH_END;
577 } else { /* cur_stream->side == MOS_SIDE_SVR */
578 SOCKQ_FOREACH_START(walk, &cur_stream->msocks) {
579 HandleCallback(mtcp, MOS_NULL, walk, cur_stream->side,
580 pctx, MOS_ON_ERROR);
581 } SOCKQ_FOREACH_END;
582 }
583 return FALSE;
584 }
585
586 /* allocate receive buffer if not exist */
587 if (!rcvvar->rcvbuf) {
588 rcvvar->rcvbuf = tcprb_new(mtcp->bufseg_pool, g_config.mos->rmem_size, cur_stream->buffer_mgmt);
589 if (!rcvvar->rcvbuf) {
590 TRACE_ERROR("Stream %d: Failed to allocate receive buffer.\n",
591 cur_stream->id);
592 cur_stream->state = TCP_ST_CLOSED_RSVD;
593 cur_stream->close_reason = TCP_NO_MEM;
594 cur_stream->cb_events |= MOS_ON_TCP_STATE_CHANGE;
595 if (HAS_STREAM_TYPE(cur_stream, MOS_SOCK_STREAM))
596 RaiseErrorEvent(mtcp, cur_stream);
597
598 return ERROR;
599 }
600 }
601
602 read_lock = HAS_STREAM_TYPE(cur_stream, MOS_SOCK_STREAM);
603
604 if (read_lock && SBUF_LOCK(&rcvvar->read_lock)) {
605 if (errno == EDEADLK)
606 perror("ProcessTCPPayload: read_lock blocked\n");
607 assert(0);
608 }
609
610 prev_rcv_nxt = cur_stream->rcv_nxt;
611
612 tcprb_t *rb = rcvvar->rcvbuf;
613 loff_t off = seq2loff(rb, pctx->p.seq, (rcvvar->irs + 1));
614 if (off >= 0) {
615 if (!HAS_STREAM_TYPE(cur_stream, MOS_SOCK_STREAM) &&
616 HAS_STREAM_TYPE(cur_stream, MOS_SOCK_MONITOR_STREAM_ACTIVE))
617 tcprb_setpile(rb, rb->pile + tcprb_cflen(rb));
618
619 /* policy on the buffer outrun case
620 * (1) has MOS_SOCK_MONITOR_STREAM_ACTIVE -> raise MOS_ON_ERROR first
621 * and then overwrite payload
622 * (2) MOS_SOCK_STREAM only -> overwrite immediately
623 */
624 if (HAS_STREAM_TYPE(cur_stream, MOS_SOCK_MONITOR_STREAM_ACTIVE)) {
625 int fflen = tcprb_fflen(rb, pctx->p.payload, pctx->p.payloadlen, off);
626 if (fflen > 0) {
627 /* if we detect buffer outrun, raise a MOS_ON_ERROR event.
628 then, the application can either consume read buffer data,
629 or increase the read buffer size */
630 if (cur_stream->side == MOS_SIDE_CLI) {
631 SOCKQ_FOREACH_REVERSE(walk, &cur_stream->msocks) {
632 if (walk->monitor_stream->peek_offset[cur_stream->side]
633 < rb->head + fflen)
634 HandleCallback(mtcp, MOS_NULL, walk, cur_stream->side,
635 pctx, MOS_ON_ERROR);
636 } SOCKQ_FOREACH_END;
637 } else {
638 SOCKQ_FOREACH_START(walk, &cur_stream->msocks) {
639 if (walk->monitor_stream->peek_offset[cur_stream->side]
640 < rb->head + fflen)
641 HandleCallback(mtcp, MOS_NULL, walk, cur_stream->side,
642 pctx, MOS_ON_ERROR);
643 } SOCKQ_FOREACH_END;
644 }
645 }
646 }
647 /* try writing packet payload to buffer */
648 ret = tcprb_pwrite(rb, pctx->p.payload, pctx->p.payloadlen, off);
649 }
650 /* TODO: update monitor vars */
651
652 /*
653 * error can pop up due to disabled buffered management
654 * (only in monitor mode). In that case, ignore the warning
655 * message.
656 */
657 if (ret < 0 && cur_stream->buffer_mgmt && mtcp->num_msp == 0)
658 TRACE_ERROR("Cannot merge payload.\n");
659
660 /* discard the buffer if the state is FIN_WAIT_1 or FIN_WAIT_2,
661 meaning that the connection is already closed by the application */
662 loff_t cftail = rb->pile + tcprb_cflen(rb);
663 if (cur_stream->state == TCP_ST_FIN_WAIT_1 ||
664 cur_stream->state == TCP_ST_FIN_WAIT_2) {
665 /* XXX: Do we really need to update recv vars? */
666 tcprb_setpile(rb, cftail);
667 }
668 if (cftail > 0 && (rcvvar->irs + 1) + cftail > cur_stream->rcv_nxt) {
669 RAISE_DEBUG_EVENT(mtcp, cur_stream,
670 "Move rcv_nxt from %u to %u.\n",
671 cur_stream->rcv_nxt, (rcvvar->irs + 1) + cftail);
672 cur_stream->rcv_nxt = (rcvvar->irs + 1) + cftail;
673 }
674 assert(cftail - rb->pile >= 0);
675 rcvvar->rcv_wnd = rb->len - (cftail - rb->pile);
676
677 if (read_lock)
678 SBUF_UNLOCK(&rcvvar->read_lock);
679
680
681 if (TCP_SEQ_LEQ(cur_stream->rcv_nxt, prev_rcv_nxt)) {
682 /* There are some lost packets */
683 return FALSE;
684 }
685
686 TRACE_EPOLL("Stream %d data arrived. "
687 "len: %d, ET: %llu, IN: %llu, OUT: %llu\n",
688 cur_stream->id, pctx->p.payloadlen,
689 cur_stream->socket? (unsigned long long)cur_stream->socket->epoll & MOS_EPOLLET : 0,
690 cur_stream->socket? (unsigned long long)cur_stream->socket->epoll & MOS_EPOLLIN : 0,
691 cur_stream->socket? (unsigned long long)cur_stream->socket->epoll & MOS_EPOLLOUT : 0);
692
693 if (cur_stream->state == TCP_ST_ESTABLISHED)
694 RaiseReadEvent(mtcp, cur_stream);
695
696 return TRUE;
697 }
698 /*----------------------------------------------------------------------------*/
699 static inline void
Handle_TCP_ST_LISTEN(mtcp_manager_t mtcp,tcp_stream * cur_stream,struct pkt_ctx * pctx)700 Handle_TCP_ST_LISTEN (mtcp_manager_t mtcp, tcp_stream* cur_stream,
701 struct pkt_ctx *pctx)
702 {
703
704 const struct tcphdr* tcph = pctx->p.tcph;
705
706 if (tcph->syn) {
707 if (cur_stream->state == TCP_ST_LISTEN)
708 cur_stream->rcv_nxt++;
709 cur_stream->state = TCP_ST_SYN_RCVD;
710 cur_stream->cb_events |= MOS_ON_TCP_STATE_CHANGE | MOS_ON_CONN_START;
711 TRACE_STATE("Stream %d: TCP_ST_SYN_RCVD\n", cur_stream->id);
712 cur_stream->actions |= MOS_ACT_SEND_CONTROL;
713 if (IS_STREAM_TYPE(cur_stream, MOS_SOCK_MONITOR_STREAM_ACTIVE)) {
714 /**
715 * Passive stream context needs to initialize irs and rcv_nxt
716 * as it is not set neither during createserverstream or monitor
717 * creation.
718 */
719 cur_stream->rcvvar->irs =
720 cur_stream->rcv_nxt = pctx->p.seq;
721 }
722 } else {
723 CTRACE_ERROR("Stream %d (TCP_ST_LISTEN): "
724 "Packet without SYN.\n", cur_stream->id);
725 }
726
727 }
728 /*----------------------------------------------------------------------------*/
729 static inline void
Handle_TCP_ST_SYN_SENT(mtcp_manager_t mtcp,tcp_stream * cur_stream,struct pkt_ctx * pctx)730 Handle_TCP_ST_SYN_SENT (mtcp_manager_t mtcp, tcp_stream* cur_stream,
731 struct pkt_ctx *pctx)
732 {
733 const struct tcphdr* tcph = pctx->p.tcph;
734
735 /* when active open */
736 if (tcph->ack) {
737 /* filter the unacceptable acks */
738 if (TCP_SEQ_LEQ(pctx->p.ack_seq, cur_stream->sndvar->iss)
739 #ifndef BE_RESILIENT_TO_PACKET_DROP
740 || TCP_SEQ_GT(pctx->p.ack_seq, cur_stream->snd_nxt)
741 #endif
742 ) {
743 if (!tcph->rst) {
744 cur_stream->actions |= MOS_ACT_SEND_RST;
745 }
746 return;
747 }
748 /* accept the ack */
749 cur_stream->sndvar->snd_una++;
750 }
751
752 if (tcph->rst) {
753 if (tcph->ack) {
754 cur_stream->state = TCP_ST_CLOSED_RSVD;
755 cur_stream->close_reason = TCP_RESET;
756 cur_stream->cb_events |= MOS_ON_TCP_STATE_CHANGE;
757 if (cur_stream->socket) {
758 if (HAS_STREAM_TYPE(cur_stream, MOS_SOCK_STREAM))
759 RaiseErrorEvent(mtcp, cur_stream);
760 } else {
761 cur_stream->actions |= MOS_ACT_DESTROY;
762 }
763 }
764 return;
765 }
766
767 if (tcph->ack
768 #ifndef BE_RESILIENT_TO_PACKET_DROP
769 /* If we already lost SYNACK, let the ACK packet do the SYNACK's role */
770 && tcph->syn
771 #endif
772 ) {
773 int ret = HandleActiveOpen(mtcp, cur_stream, pctx);
774 if (!ret) {
775 return;
776 }
777
778 #ifdef BE_RESILIENT_TO_PACKET_DROP
779 if (!tcph->syn) {
780 RAISE_DEBUG_EVENT(mtcp, cur_stream,
781 "We missed SYNACK. Replace it with an ACK packet.\n");
782 /* correct some variables */
783 cur_stream->rcv_nxt = pctx->p.seq;
784 }
785 #endif
786
787 cur_stream->sndvar->nrtx = 0;
788 if (HAS_STREAM_TYPE(cur_stream, MOS_SOCK_STREAM))
789 RemoveFromRTOList(mtcp, cur_stream);
790 cur_stream->state = TCP_ST_ESTABLISHED;
791 cur_stream->cb_events |= /*MOS_ON_CONN_SETUP |*/ MOS_ON_TCP_STATE_CHANGE;
792 TRACE_STATE("Stream %d: TCP_ST_ESTABLISHED\n", cur_stream->id);
793
794 if (cur_stream->socket) {
795 if (HAS_STREAM_TYPE(cur_stream, MOS_SOCK_STREAM))
796 RaiseWriteEvent(mtcp, cur_stream);
797 } else {
798 TRACE_STATE("Stream %d: ESTABLISHED, but no socket\n", cur_stream->id);
799 cur_stream->close_reason = TCP_ACTIVE_CLOSE;
800 cur_stream->actions |= MOS_ACT_SEND_RST;
801 cur_stream->actions |= MOS_ACT_DESTROY;
802 }
803 cur_stream->actions |= MOS_ACT_SEND_CONTROL;
804 if (g_config.mos->tcp_timeout > 0)
805 if (HAS_STREAM_TYPE(cur_stream, MOS_SOCK_STREAM))
806 AddtoTimeoutList(mtcp, cur_stream);
807
808 #ifdef BE_RESILIENT_TO_PACKET_DROP
809 /* Handle this ack packet */
810 if (!tcph->syn)
811 Handle_TCP_ST_ESTABLISHED(mtcp, cur_stream, pctx);
812 #endif
813
814 } else if (tcph->syn) {
815 cur_stream->state = TCP_ST_SYN_RCVD;
816 cur_stream->cb_events |= MOS_ON_TCP_STATE_CHANGE;
817 TRACE_STATE("Stream %d: TCP_ST_SYN_RCVD\n", cur_stream->id);
818 cur_stream->snd_nxt = cur_stream->sndvar->iss;
819 cur_stream->actions |= MOS_ACT_SEND_CONTROL;
820 }
821 }
822 /*----------------------------------------------------------------------------*/
823 static inline void
Handle_TCP_ST_SYN_RCVD(mtcp_manager_t mtcp,tcp_stream * cur_stream,struct pkt_ctx * pctx)824 Handle_TCP_ST_SYN_RCVD (mtcp_manager_t mtcp, tcp_stream* cur_stream,
825 struct pkt_ctx *pctx)
826 {
827 const struct tcphdr* tcph = pctx->p.tcph;
828 struct tcp_send_vars *sndvar = cur_stream->sndvar;
829 int ret;
830
831 if (tcph->ack) {
832 uint32_t prior_cwnd;
833 /* NOTE: We do not validate the ack number because first few packets
834 * can also come out of order */
835
836 sndvar->snd_una++;
837 cur_stream->snd_nxt = pctx->p.ack_seq;
838 prior_cwnd = sndvar->cwnd;
839 sndvar->cwnd = ((prior_cwnd == 1)?
840 (sndvar->mss * 2): sndvar->mss);
841
842 //UpdateRetransmissionTimer(mtcp, cur_stream, cur_ts);
843 sndvar->nrtx = 0;
844 cur_stream->rcv_nxt = cur_stream->rcvvar->irs + 1;
845 if (HAS_STREAM_TYPE(cur_stream, MOS_SOCK_STREAM))
846 RemoveFromRTOList(mtcp, cur_stream);
847
848 cur_stream->state = TCP_ST_ESTABLISHED;
849 cur_stream->cb_events |= /*MOS_ON_CONN_SETUP |*/ MOS_ON_TCP_STATE_CHANGE;
850 TRACE_STATE("Stream %d: TCP_ST_ESTABLISHED\n", cur_stream->id);
851
852 #ifdef BE_RESILIENT_TO_PACKET_DROP
853 if (pctx->p.ack_seq != sndvar->iss + 1)
854 Handle_TCP_ST_ESTABLISHED(mtcp, cur_stream, pctx);
855 #endif
856
857 /* update listening socket */
858 if (HAS_STREAM_TYPE(cur_stream, MOS_SOCK_STREAM)) {
859
860 struct tcp_listener *listener = mtcp->listener;
861
862 ret = StreamEnqueue(listener->acceptq, cur_stream);
863 if (ret < 0) {
864 TRACE_ERROR("Stream %d: Failed to enqueue to "
865 "the listen backlog!\n", cur_stream->id);
866 cur_stream->close_reason = TCP_NOT_ACCEPTED;
867 cur_stream->state = TCP_ST_CLOSED_RSVD;
868 cur_stream->cb_events |= MOS_ON_TCP_STATE_CHANGE;
869 TRACE_STATE("Stream %d: TCP_ST_CLOSED_RSVD\n", cur_stream->id);
870 cur_stream->actions |= MOS_ACT_SEND_CONTROL;
871 }
872
873 /* raise an event to the listening socket */
874 if (listener->socket && (listener->socket->epoll & MOS_EPOLLIN)) {
875 AddEpollEvent(mtcp->ep,
876 MOS_EVENT_QUEUE, listener->socket, MOS_EPOLLIN);
877 }
878 }
879
880 //TRACE_DBG("Stream %d inserted into acceptq.\n", cur_stream->id);
881 if (g_config.mos->tcp_timeout > 0)
882 if (HAS_STREAM_TYPE(cur_stream, MOS_SOCK_STREAM))
883 AddtoTimeoutList(mtcp, cur_stream);
884
885 } else {
886 /* Handle retransmitted SYN packet */
887 if (HAS_STREAM_TYPE(cur_stream, MOS_SOCK_MONITOR_STREAM_ACTIVE) &&
888 tcph->syn) {
889 if (pctx->p.seq == cur_stream->pair_stream->sndvar->iss) {
890 TRACE_DBG("syn retransmit! (p.seq = %u / iss = %u)\n",
891 pctx->p.seq, cur_stream->pair_stream->sndvar->iss);
892 cur_stream->cb_events |= MOS_ON_REXMIT;
893 }
894 }
895
896 TRACE_DBG("Stream %d (TCP_ST_SYN_RCVD): No ACK.\n",
897 cur_stream->id);
898 /* retransmit SYN/ACK */
899 cur_stream->snd_nxt = sndvar->iss;
900 cur_stream->actions |= MOS_ACT_SEND_CONTROL;
901 }
902 }
903 /*----------------------------------------------------------------------------*/
904 static inline void
Handle_TCP_ST_ESTABLISHED(mtcp_manager_t mtcp,tcp_stream * cur_stream,struct pkt_ctx * pctx)905 Handle_TCP_ST_ESTABLISHED (mtcp_manager_t mtcp, tcp_stream* cur_stream,
906 struct pkt_ctx *pctx)
907 {
908 const struct tcphdr* tcph = pctx->p.tcph;
909
910 if (tcph->syn) {
911 /* Handle retransmitted SYNACK packet */
912 if (HAS_STREAM_TYPE(cur_stream, MOS_SOCK_MONITOR_STREAM_ACTIVE) &&
913 tcph->ack) {
914 if (pctx->p.seq == cur_stream->pair_stream->sndvar->iss) {
915 TRACE_DBG("syn/ack retransmit! (p.seq = %u / iss = %u)\n",
916 pctx->p.seq, cur_stream->pair_stream->sndvar->iss);
917 cur_stream->cb_events |= MOS_ON_REXMIT;
918 }
919 }
920
921 TRACE_DBG("Stream %d (TCP_ST_ESTABLISHED): weird SYN. "
922 "seq: %u, expected: %u, ack_seq: %u, expected: %u\n",
923 cur_stream->id, pctx->p.seq, cur_stream->rcv_nxt,
924 pctx->p.ack_seq, cur_stream->snd_nxt);
925 cur_stream->snd_nxt = pctx->p.ack_seq;
926 cur_stream->actions |= MOS_ACT_SEND_CONTROL;
927 return;
928 }
929
930 if (pctx->p.payloadlen > 0) {
931 if (ProcessTCPPayload(mtcp, cur_stream, pctx)) {
932 /* if return is TRUE, send ACK */
933 cur_stream->actions |= MOS_ACT_SEND_ACK_AGG;
934 } else {
935 cur_stream->actions |= MOS_ACT_SEND_ACK_NOW;
936 }
937 }
938
939 if (tcph->ack) {
940 if (cur_stream->sndvar->sndbuf) {
941 ProcessACK(mtcp, cur_stream, pctx);
942 }
943 }
944
945 if (tcph->fin) {
946 /* process the FIN only if the sequence is valid */
947 /* FIN packet is allowed to push payload (should we check for PSH flag)? */
948 if (!cur_stream->buffer_mgmt ||
949 #ifdef BE_RESILIENT_TO_PACKET_DROP
950 TCP_SEQ_GEQ(pctx->p.seq + pctx->p.payloadlen, cur_stream->rcv_nxt)
951 #else
952 pctx->p.seq + pctx->p.payloadlen == cur_stream->rcv_nxt
953 #endif
954 ) {
955 #ifdef BE_RESILIENT_TO_PACKET_DROP
956 cur_stream->rcv_nxt = pctx->p.seq + pctx->p.payloadlen;
957 #endif
958 cur_stream->state = TCP_ST_CLOSE_WAIT;
959 cur_stream->cb_events |= MOS_ON_TCP_STATE_CHANGE;
960 TRACE_STATE("Stream %d: TCP_ST_CLOSE_WAIT\n", cur_stream->id);
961 cur_stream->rcv_nxt++;
962 cur_stream->actions |= MOS_ACT_SEND_CONTROL;
963
964 /* notify FIN to application */
965 RaiseReadEvent(mtcp, cur_stream);
966 } else {
967 RAISE_DEBUG_EVENT(mtcp, cur_stream,
968 "Expected %u, but received %u (= %u + %u)\n",
969 cur_stream->rcv_nxt, pctx->p.seq + pctx->p.payloadlen,
970 pctx->p.seq, pctx->p.payloadlen);
971
972 cur_stream->actions |= MOS_ACT_SEND_ACK_NOW;
973 return;
974 }
975 }
976 }
977 /*----------------------------------------------------------------------------*/
978 static inline void
Handle_TCP_ST_CLOSE_WAIT(mtcp_manager_t mtcp,tcp_stream * cur_stream,struct pkt_ctx * pctx)979 Handle_TCP_ST_CLOSE_WAIT (mtcp_manager_t mtcp, tcp_stream* cur_stream,
980 struct pkt_ctx *pctx)
981 {
982 if (TCP_SEQ_LT(pctx->p.seq, cur_stream->rcv_nxt)) {
983 TRACE_DBG("Stream %d (TCP_ST_CLOSE_WAIT): "
984 "weird seq: %u, expected: %u\n",
985 cur_stream->id, pctx->p.seq, cur_stream->rcv_nxt);
986 cur_stream->actions |= MOS_ACT_SEND_CONTROL;
987 return;
988 }
989
990 if (cur_stream->sndvar->sndbuf) {
991 ProcessACK(mtcp, cur_stream, pctx);
992 }
993 }
994 /*----------------------------------------------------------------------------*/
995 static inline void
Handle_TCP_ST_LAST_ACK(mtcp_manager_t mtcp,tcp_stream * cur_stream,struct pkt_ctx * pctx)996 Handle_TCP_ST_LAST_ACK (mtcp_manager_t mtcp, tcp_stream* cur_stream,
997 struct pkt_ctx *pctx)
998 {
999 const struct tcphdr* tcph = pctx->p.tcph;
1000
1001 if (TCP_SEQ_LT(pctx->p.seq, cur_stream->rcv_nxt)) {
1002 TRACE_DBG("Stream %d (TCP_ST_LAST_ACK): "
1003 "weird seq: %u, expected: %u\n",
1004 cur_stream->id, pctx->p.seq, cur_stream->rcv_nxt);
1005 return;
1006 }
1007
1008 if (tcph->ack) {
1009 if (cur_stream->sndvar->sndbuf) {
1010 ProcessACK(mtcp, cur_stream, pctx);
1011 }
1012
1013 if (!cur_stream->sndvar->is_fin_sent) {
1014 /* the case that FIN is not sent yet */
1015 /* this is not ack for FIN, ignore */
1016 TRACE_DBG("Stream %d (TCP_ST_LAST_ACK): "
1017 "No FIN sent yet.\n", cur_stream->id);
1018 #ifdef DBGMSG
1019 DumpIPPacket(mtcp, pctx->p.iph, pctx->p.ip_len);
1020 #endif
1021 #if DUMP_STREAM
1022 DumpStream(mtcp, cur_stream);
1023 DumpControlList(mtcp, mtcp->n_sender[0]);
1024 #endif
1025 return;
1026 }
1027
1028 /* check if ACK of FIN */
1029 if (pctx->p.ack_seq == cur_stream->sndvar->fss + 1) {
1030 cur_stream->sndvar->snd_una++;
1031 if (HAS_STREAM_TYPE(cur_stream, MOS_SOCK_STREAM))
1032 UpdateRetransmissionTimer(mtcp, cur_stream, pctx->p.cur_ts);
1033 cur_stream->state = TCP_ST_CLOSED_RSVD;
1034 cur_stream->close_reason = TCP_PASSIVE_CLOSE;
1035 cur_stream->cb_events |= MOS_ON_TCP_STATE_CHANGE;
1036 TRACE_STATE("Stream %d: TCP_ST_CLOSED_RSVD\n",
1037 cur_stream->id);
1038 cur_stream->actions |= MOS_ACT_DESTROY;
1039 } else {
1040 TRACE_DBG("Stream %d (TCP_ST_LAST_ACK): Not ACK of FIN. "
1041 "ack_seq: %u, expected: %u\n",
1042 cur_stream->id, pctx->p.ack_seq, cur_stream->sndvar->fss + 1);
1043 //cur_stream->snd_nxt = cur_stream->sndvar->fss;
1044 cur_stream->actions |= MOS_ACT_SEND_CONTROL;
1045 }
1046 } else {
1047 TRACE_DBG("Stream %d (TCP_ST_LAST_ACK): No ACK\n",
1048 cur_stream->id);
1049 //cur_stream->snd_nxt = cur_stream->sndvar->fss;
1050 cur_stream->actions |= MOS_ACT_SEND_CONTROL;
1051 }
1052 }
1053 /*----------------------------------------------------------------------------*/
1054 static inline void
Handle_TCP_ST_FIN_WAIT_1(mtcp_manager_t mtcp,tcp_stream * cur_stream,struct pkt_ctx * pctx)1055 Handle_TCP_ST_FIN_WAIT_1 (mtcp_manager_t mtcp, tcp_stream* cur_stream,
1056 struct pkt_ctx *pctx)
1057 {
1058 const struct tcphdr* tcph = pctx->p.tcph;
1059
1060 if (TCP_SEQ_LT(pctx->p.seq, cur_stream->rcv_nxt)) {
1061 RAISE_DEBUG_EVENT(mtcp, cur_stream,
1062 "Stream %d (FIN_WAIT_1): "
1063 "weird seq: %u, expected: %u\n",
1064 cur_stream->id, pctx->p.seq, cur_stream->rcv_nxt);
1065 cur_stream->actions |= MOS_ACT_SEND_CONTROL;
1066 return;
1067 }
1068
1069 if (tcph->ack) {
1070 if (cur_stream->sndvar->sndbuf) {
1071 ProcessACK(mtcp, cur_stream, pctx);
1072 }
1073 #if BE_RESILIENT_TO_PACKET_DROP
1074 if (cur_stream->sndvar->is_fin_sent &&
1075 ((!HAS_STREAM_TYPE(cur_stream, MOS_SOCK_STREAM) &&
1076 pctx->p.ack_seq == cur_stream->sndvar->fss) ||
1077 pctx->p.ack_seq == cur_stream->sndvar->fss + 1)) {
1078
1079 #else
1080 if (cur_stream->sndvar->is_fin_sent &&
1081 pctx->p.ack_seq == cur_stream->sndvar->fss + 1) {
1082 #endif
1083 cur_stream->sndvar->snd_una = pctx->p.ack_seq;
1084 if (TCP_SEQ_GT(pctx->p.ack_seq, cur_stream->snd_nxt)) {
1085 TRACE_DBG("Stream %d: update snd_nxt to %u\n",
1086 cur_stream->id, pctx->p.ack_seq);
1087 cur_stream->snd_nxt = pctx->p.ack_seq;
1088 }
1089 //cur_stream->sndvar->snd_una++;
1090 //UpdateRetransmissionTimer(mtcp, cur_stream, cur_ts);
1091 cur_stream->sndvar->nrtx = 0;
1092 if (HAS_STREAM_TYPE(cur_stream, MOS_SOCK_STREAM))
1093 RemoveFromRTOList(mtcp, cur_stream);
1094 cur_stream->state = TCP_ST_FIN_WAIT_2;
1095 cur_stream->cb_events |= MOS_ON_TCP_STATE_CHANGE;
1096 TRACE_STATE("Stream %d: TCP_ST_FIN_WAIT_2\n",
1097 cur_stream->id);
1098 } else {
1099 RAISE_DEBUG_EVENT(mtcp, cur_stream,
1100 "Failed to transit to FIN_WAIT_2, "
1101 "is_fin_sent: %s, ack_seq: %u, sndvar->fss: %u\n",
1102 cur_stream->sndvar->is_fin_sent ? "true" : "false",
1103 pctx->p.ack_seq, cur_stream->sndvar->fss);
1104 }
1105
1106 } else {
1107 RAISE_DEBUG_EVENT(mtcp, cur_stream,
1108 "Failed to transit to FIN_WAIT_2, "
1109 "We got a %s%s%s%s%s%s packet.",
1110 pctx->p.tcph->syn ? "S" : "",
1111 pctx->p.tcph->fin ? "F" : "",
1112 pctx->p.tcph->rst ? "R" : "",
1113 pctx->p.tcph->psh ? "P" : "",
1114 pctx->p.tcph->urg ? "U" : "",
1115 pctx->p.tcph->ack ? "A" : "");
1116
1117 TRACE_DBG("Stream %d: does not contain an ack!\n",
1118 cur_stream->id);
1119 return;
1120 }
1121
1122 if (pctx->p.payloadlen > 0) {
1123 if (ProcessTCPPayload(mtcp, cur_stream, pctx)) {
1124 /* if return is TRUE, send ACK */
1125 cur_stream->actions |= MOS_ACT_SEND_ACK_AGG;
1126 } else {
1127 cur_stream->actions |= MOS_ACT_SEND_ACK_NOW;
1128 }
1129 }
1130
1131 if (tcph->fin) {
1132 /* process the FIN only if the sequence is valid */
1133 /* FIN packet is allowed to push payload (should we check for PSH flag)? */
1134 if (!cur_stream->buffer_mgmt ||
1135 #ifdef BE_RESILIENT_TO_PACKET_DROP
1136 TCP_SEQ_GEQ(pctx->p.seq + pctx->p.payloadlen, cur_stream->rcv_nxt)
1137 #else
1138 pctx->p.seq + pctx->p.payloadlen == cur_stream->rcv_nxt
1139 #endif
1140 ) {
1141 cur_stream->rcv_nxt++;
1142
1143 if (cur_stream->state == TCP_ST_FIN_WAIT_1) {
1144 cur_stream->state = TCP_ST_CLOSING;
1145 cur_stream->cb_events |= MOS_ON_TCP_STATE_CHANGE;
1146 TRACE_STATE("Stream %d: TCP_ST_CLOSING\n", cur_stream->id);
1147
1148 } else if (cur_stream->state == TCP_ST_FIN_WAIT_2) {
1149 cur_stream->state = TCP_ST_TIME_WAIT;
1150 cur_stream->cb_events |= MOS_ON_TCP_STATE_CHANGE;
1151 TRACE_STATE("Stream %d: TCP_ST_TIME_WAIT\n", cur_stream->id);
1152 AddtoTimewaitList(mtcp, cur_stream, pctx->p.cur_ts);
1153 }
1154 cur_stream->actions |= MOS_ACT_SEND_CONTROL;
1155 } else {
1156 RAISE_DEBUG_EVENT(mtcp, cur_stream,
1157 "Expected %u, but received %u (= %u + %u)\n",
1158 cur_stream->rcv_nxt, pctx->p.seq + pctx->p.payloadlen,
1159 pctx->p.seq, pctx->p.payloadlen);
1160
1161 cur_stream->actions |= MOS_ACT_SEND_ACK_NOW;
1162 return;
1163 }
1164 }
1165 }
1166 /*----------------------------------------------------------------------------*/
1167 static inline void
1168 Handle_TCP_ST_FIN_WAIT_2 (mtcp_manager_t mtcp, tcp_stream* cur_stream,
1169 struct pkt_ctx *pctx)
1170 {
1171 const struct tcphdr* tcph = pctx->p.tcph;
1172
1173 if (tcph->ack) {
1174 if (cur_stream->sndvar->sndbuf) {
1175 ProcessACK(mtcp, cur_stream, pctx);
1176 }
1177 } else {
1178 TRACE_DBG("Stream %d: does not contain an ack!\n",
1179 cur_stream->id);
1180 return;
1181 }
1182
1183 if (pctx->p.payloadlen > 0) {
1184 if (ProcessTCPPayload(mtcp, cur_stream, pctx)) {
1185 /* if return is TRUE, send ACK */
1186 cur_stream->actions |= MOS_ACT_SEND_ACK_AGG;
1187 } else {
1188 cur_stream->actions |= MOS_ACT_SEND_ACK_NOW;
1189 }
1190 }
1191
1192 if (tcph->fin) {
1193 /* process the FIN only if the sequence is valid */
1194 /* FIN packet is allowed to push payload (should we check for PSH flag)? */
1195 if (!cur_stream->buffer_mgmt ||
1196 #ifdef BE_RESILIENT_TO_PACKET_DROP
1197 TCP_SEQ_GEQ(pctx->p.seq + pctx->p.payloadlen, cur_stream->rcv_nxt)
1198 #else
1199 pctx->p.seq + pctx->p.payloadlen == cur_stream->rcv_nxt
1200 #endif
1201 ) {
1202 cur_stream->state = TCP_ST_TIME_WAIT;
1203 cur_stream->cb_events |= MOS_ON_TCP_STATE_CHANGE;
1204 cur_stream->rcv_nxt++;
1205 TRACE_STATE("Stream %d: TCP_ST_TIME_WAIT\n", cur_stream->id);
1206
1207 AddtoTimewaitList(mtcp, cur_stream, pctx->p.cur_ts);
1208 cur_stream->actions |= MOS_ACT_SEND_CONTROL;
1209 }
1210 } else {
1211 TRACE_DBG("Stream %d (TCP_ST_FIN_WAIT_2): No FIN. "
1212 "seq: %u, ack_seq: %u, snd_nxt: %u, snd_una: %u\n",
1213 cur_stream->id, pctx->p.seq, pctx->p.ack_seq,
1214 cur_stream->snd_nxt, cur_stream->sndvar->snd_una);
1215 #if DBGMSG
1216 DumpIPPacket(mtcp, pctx->p.iph, pctx->p.ip_len);
1217 #endif
1218 }
1219
1220 }
1221 /*----------------------------------------------------------------------------*/
1222 static inline void
1223 Handle_TCP_ST_CLOSING (mtcp_manager_t mtcp, tcp_stream* cur_stream,
1224 struct pkt_ctx *pctx)
1225 {
1226 const struct tcphdr* tcph = pctx->p.tcph;
1227
1228 if (tcph->ack) {
1229 if (cur_stream->sndvar->sndbuf) {
1230 ProcessACK(mtcp, cur_stream, pctx);
1231 }
1232
1233 if (!cur_stream->sndvar->is_fin_sent) {
1234 TRACE_DBG("Stream %d (TCP_ST_CLOSING): "
1235 "No FIN sent yet.\n", cur_stream->id);
1236 return;
1237 }
1238
1239 // check if ACK of FIN
1240 if (pctx->p.ack_seq != cur_stream->sndvar->fss + 1) {
1241 #if DBGMSG
1242 TRACE_DBG("Stream %d (TCP_ST_CLOSING): Not ACK of FIN. "
1243 "ack_seq: %u, snd_nxt: %u, snd_una: %u, fss: %u\n",
1244 cur_stream->id, pctx->p.ack_seq, cur_stream->snd_nxt,
1245 cur_stream->sndvar->snd_una, cur_stream->sndvar->fss);
1246 DumpIPPacketToFile(stderr, pctx->p.iph, pctx->p.ip_len);
1247 DumpStream(mtcp, cur_stream);
1248 #endif
1249 //assert(0);
1250 /* if the packet is not the ACK of FIN, ignore */
1251 return;
1252 }
1253
1254 cur_stream->sndvar->snd_una = pctx->p.ack_seq;
1255 if (HAS_STREAM_TYPE(cur_stream, MOS_SOCK_STREAM))
1256 UpdateRetransmissionTimer(mtcp, cur_stream, pctx->p.cur_ts);
1257 cur_stream->state = TCP_ST_TIME_WAIT;
1258 cur_stream->cb_events |= MOS_ON_TCP_STATE_CHANGE;
1259 TRACE_STATE("Stream %d: TCP_ST_TIME_WAIT\n", cur_stream->id);
1260
1261 AddtoTimewaitList(mtcp, cur_stream, pctx->p.cur_ts);
1262
1263 } else {
1264 TRACE_DBG("Stream %d (TCP_ST_CLOSING): Not ACK\n",
1265 cur_stream->id);
1266 return;
1267 }
1268 }
1269 /*----------------------------------------------------------------------------*/
/**
 * Update a stream's TCP receive-side state from one incoming packet.
 *
 * Steps, in order:
 *  1. For states past TCP_ST_SYN_RCVD, check the sequence number with
 *     ValidateSequence. A failed check is only traced (and optionally
 *     dumped); processing continues, because there is no early return.
 *  2. Store the peer's advertised window in sndvar->peer_wnd. SYN segments
 *     use the raw value; other segments are shifted left by wscale_peer.
 *  3. Refresh last_active_ts and, for MOS_SOCK_STREAM streams, the timeout
 *     list.
 *  4. On RST, set have_reset. For states past TCP_ST_SYN_SENT, call
 *     ProcessRST and stop if it returns non-zero.
 *  5. For active monitor streams in CLOSE_WAIT, LAST_ACK, CLOSING or
 *     TIME_WAIT, flag a FIN whose seq equals the pair stream's fss as a
 *     retransmission (MOS_ON_REXMIT).
 *  6. Dispatch to the per-state Handle_TCP_ST_* handler.
 *
 * @param mtcp        manager context, passed through to the handlers
 * @param cur_stream  stream the packet belongs to (asserted non-NULL)
 * @param pctx        packet context; header fields are read from pctx->p
 */
void
UpdateRecvTCPContext(mtcp_manager_t mtcp, struct tcp_stream *cur_stream,
		struct pkt_ctx *pctx)
{
	struct tcphdr* tcph = pctx->p.tcph;
	int ret;

	assert(cur_stream);

	/* Validate the sequence for states past SYN_RCVD. NOTE: an invalid
	 * sequence is only traced below; the packet is NOT dropped here and
	 * the rest of this function still runs. */
	if (cur_stream->state > TCP_ST_SYN_RCVD) {

		ret = ValidateSequence(mtcp, cur_stream, pctx);
		if (!ret) {
			TRACE_DBG("Stream %d: Unexpected sequence: %u, expected: %u\n",
					cur_stream->id, pctx->p.seq, cur_stream->rcv_nxt);
#ifdef DBGMSG
			DumpIPPacket(mtcp, pctx->p.iph, pctx->p.ip_len);
#endif
#if DUMP_STREAM
			DumpStream(mtcp, cur_stream);
#endif
			/* cur_stream->cb_events |= MOS_ON_ERROR; */
		}
	}
	/* Record the peer's advertised window. The window field of a SYN is
	 * never scaled (RFC 7323); other segments are shifted by the peer's
	 * window-scale factor. */
	if (tcph->syn) {
		cur_stream->sndvar->peer_wnd = pctx->p.window;
	} else {
		cur_stream->sndvar->peer_wnd =
				(uint32_t)pctx->p.window << cur_stream->sndvar->wscale_peer;
	}

	cur_stream->last_active_ts = pctx->p.cur_ts;
	if (HAS_STREAM_TYPE(cur_stream, MOS_SOCK_STREAM))
		UpdateTimeoutList(mtcp, cur_stream);

	/* Process RST: have_reset is always recorded, but ProcessRST runs only
	 * if state > TCP_ST_SYN_SENT; a non-zero result ends processing. */
	if (tcph->rst) {
		cur_stream->have_reset = TRUE;
		if (cur_stream->state > TCP_ST_SYN_SENT) {
			if (ProcessRST(mtcp, cur_stream, pctx)) {
				return;
			}
		}
	}

	/* Active monitor streams: detect a retransmitted FIN in the states
	 * reached after a FIN has been seen. */
	if (HAS_STREAM_TYPE(cur_stream, MOS_SOCK_MONITOR_STREAM_ACTIVE) &&
			pctx->p.tcph->fin) {
		if (cur_stream->state == TCP_ST_CLOSE_WAIT ||
				cur_stream->state == TCP_ST_LAST_ACK ||
				cur_stream->state == TCP_ST_CLOSING ||
				cur_stream->state == TCP_ST_TIME_WAIT) {
			/* Handle retransmitted FIN packet */
			if (pctx->p.seq == cur_stream->pair_stream->sndvar->fss) {
				TRACE_DBG("FIN retransmit! (seq = %u / fss = %u)\n",
						pctx->p.seq, cur_stream->pair_stream->sndvar->fss);
				cur_stream->cb_events |= MOS_ON_REXMIT;
			}
		}
	}

	/* Per-state processing */
	switch (cur_stream->state) {
	case TCP_ST_LISTEN:
		Handle_TCP_ST_LISTEN(mtcp, cur_stream, pctx);
		break;

	case TCP_ST_SYN_SENT:
		Handle_TCP_ST_SYN_SENT(mtcp, cur_stream, pctx);
		break;

	case TCP_ST_SYN_RCVD:
		/* SYN retransmit implies our SYN/ACK was lost. Resend */
		if (tcph->syn && pctx->p.seq == cur_stream->rcvvar->irs)
			Handle_TCP_ST_LISTEN(mtcp, cur_stream, pctx);
		else {
			Handle_TCP_ST_SYN_RCVD(mtcp, cur_stream, pctx);
			/* If this segment completed the handshake and also carries
			 * data, process it right away as an ESTABLISHED segment. */
			if (pctx->p.payloadlen > 0 && cur_stream->state == TCP_ST_ESTABLISHED)
				Handle_TCP_ST_ESTABLISHED(mtcp, cur_stream, pctx);
		}
		break;

	case TCP_ST_ESTABLISHED:
		Handle_TCP_ST_ESTABLISHED(mtcp, cur_stream, pctx);
		break;

	case TCP_ST_CLOSE_WAIT:
		Handle_TCP_ST_CLOSE_WAIT(mtcp, cur_stream, pctx);
		break;

	case TCP_ST_LAST_ACK:
		Handle_TCP_ST_LAST_ACK(mtcp, cur_stream, pctx);
		break;

	case TCP_ST_FIN_WAIT_1:
		Handle_TCP_ST_FIN_WAIT_1(mtcp, cur_stream, pctx);
		break;

	case TCP_ST_FIN_WAIT_2:
		Handle_TCP_ST_FIN_WAIT_2(mtcp, cur_stream, pctx);
		break;

	case TCP_ST_CLOSING:
		Handle_TCP_ST_CLOSING(mtcp, cur_stream, pctx);
		break;

	case TCP_ST_TIME_WAIT:
		/* The only thing expected in this state is a retransmission of the
		   remote FIN. Acknowledge it and, if the stream is on the time-wait
		   list, re-add it with the current timestamp (restarting the wait). */
		if (cur_stream->on_timewait_list) {
			RemoveFromTimewaitList(mtcp, cur_stream);
			AddtoTimewaitList(mtcp, cur_stream, pctx->p.cur_ts);
		}
		cur_stream->actions |= MOS_ACT_SEND_CONTROL;
		break;

	case TCP_ST_CLOSED:
	case TCP_ST_CLOSED_RSVD:
		/* nothing to do for closed streams */
		break;

	default:
		break;
	}

	TRACE_STATE("Stream %d: Events: %0lx, Action: %0x\n",
			cur_stream->id, cur_stream->cb_events, cur_stream->actions);
	return;
}
1398 /*----------------------------------------------------------------------------*/
1399 void
1400 DoActionEndTCPPacket(mtcp_manager_t mtcp, struct tcp_stream *cur_stream,
1401 struct pkt_ctx *pctx)
1402 {
1403 int i;
1404
1405 for (i = 1; i < MOS_ACT_CNT; i = i << 1) {
1406
1407 if (cur_stream->actions & i) {
1408 switch(i) {
1409 case MOS_ACT_SEND_DATA:
1410 AddtoSendList(mtcp, cur_stream);
1411 break;
1412 case MOS_ACT_SEND_ACK_NOW:
1413 EnqueueACK(mtcp, cur_stream, pctx->p.cur_ts, ACK_OPT_NOW);
1414 break;
1415 case MOS_ACT_SEND_ACK_AGG:
1416 EnqueueACK(mtcp, cur_stream, pctx->p.cur_ts, ACK_OPT_AGGREGATE);
1417 break;
1418 case MOS_ACT_SEND_CONTROL:
1419 AddtoControlList(mtcp, cur_stream, pctx->p.cur_ts);
1420 break;
1421 case MOS_ACT_SEND_RST:
1422 if (cur_stream->state <= TCP_ST_SYN_SENT)
1423 SendTCPPacketStandalone(mtcp,
1424 pctx->p.iph->daddr, pctx->p.tcph->dest,
1425 pctx->p.iph->saddr, pctx->p.tcph->source,
1426 0, pctx->p.seq + 1, 0, TCP_FLAG_RST | TCP_FLAG_ACK,
1427 NULL, 0, pctx->p.cur_ts, 0, 0, -1);
1428 else
1429 SendTCPPacketStandalone(mtcp,
1430 pctx->p.iph->daddr, pctx->p.tcph->dest,
1431 pctx->p.iph->saddr, pctx->p.tcph->source,
1432 pctx->p.ack_seq, 0, 0, TCP_FLAG_RST | TCP_FLAG_ACK,
1433 NULL, 0, pctx->p.cur_ts, 0, 0, -1);
1434 break;
1435 case MOS_ACT_DESTROY:
1436 DestroyTCPStream(mtcp, cur_stream);
1437 break;
1438 default:
1439 assert(1);
1440 break;
1441 }
1442 }
1443 }
1444
1445 cur_stream->actions = 0;
1446 }
1447 /*----------------------------------------------------------------------------*/
/**
 * Passive (monitor-side) entry point for receive-context updates.
 *
 * According to the original author, this is called when monitoring mode is
 * enabled, for every outgoing packet to the NIC. It currently forwards its
 * arguments unchanged to UpdateRecvTCPContext.
 *
 * NOTE(review): this is a non-static C99 `inline` definition. It only
 * yields an external symbol if some declaration in this translation unit
 * omits `inline` or says `extern` — presumably the header prototype does;
 * confirm.
 */
inline void
UpdatePassiveRecvTCPContext(mtcp_manager_t mtcp, struct tcp_stream *cur_stream,
		struct pkt_ctx *pctx)
{
	UpdateRecvTCPContext(mtcp, cur_stream, pctx);
}
1458 /*----------------------------------------------------------------------------*/
/* Run receive-side event prediction for recvside_stream before the packet
 * is processed; currently this only calls tcp_rb_overlapchk.
 *
 * NOTE TODO: This event prediction is additional overhead of the HK_RCV hook.
 * We can transparently optimize this by disabling prediction of events which
 * are not monitored by anyone. */
inline void
PreRecvTCPEventPrediction(mtcp_manager_t mtcp, struct pkt_ctx *pctx,
		struct tcp_stream *recvside_stream)
{
	tcp_rb_overlapchk(mtcp, pctx, recvside_stream);
}
1468 /*----------------------------------------------------------------------------*/
1469