1 #include <assert.h> 2 3 #include "mos_api.h" 4 #include "tcp_util.h" 5 #include "tcp_in.h" 6 #include "tcp_out.h" 7 #include "tcp_ring_buffer.h" 8 #include "eventpoll.h" 9 #include "debug.h" 10 #include "timer.h" 11 #include "ip_in.h" 12 #include "tcp_rb.h" 13 #include "config.h" 14 #include "scalable_event.h" 15 16 #define MAX(a, b) ((a)>(b)?(a):(b)) 17 #define MIN(a, b) ((a)<(b)?(a):(b)) 18 19 #define RECOVERY_AFTER_LOSS TRUE 20 #define SELECTIVE_WRITE_EVENT_NOTIFY TRUE 21 /*----------------------------------------------------------------------------*/ 22 static inline void 23 Handle_TCP_ST_ESTABLISHED (mtcp_manager_t mtcp, tcp_stream* cur_stream, 24 struct pkt_ctx *pctx); 25 /*----------------------------------------------------------------------------*/ 26 static inline int 27 FilterSYNPacket(mtcp_manager_t mtcp, uint32_t ip, uint16_t port) 28 { 29 struct sockaddr_in *addr; 30 31 /* TODO: This listening logic should be revised */ 32 33 /* if not listening, drop */ 34 if (!mtcp->listener) { 35 return FALSE; 36 } 37 38 /* if not the address we want, drop */ 39 addr = &mtcp->listener->socket->saddr; 40 if (addr->sin_port == port) { 41 if (addr->sin_addr.s_addr != INADDR_ANY) { 42 if (ip == addr->sin_addr.s_addr) { 43 return TRUE; 44 } 45 return FALSE; 46 } else { 47 int i; 48 49 for (i = 0; i < g_config.mos->netdev_table->num; i++) { 50 if (ip == g_config.mos->netdev_table->ent[i]->ip_addr) { 51 return TRUE; 52 } 53 } 54 return FALSE; 55 } 56 } 57 58 return FALSE; 59 } 60 /*----------------------------------------------------------------------------*/ 61 static inline int 62 HandleActiveOpen(mtcp_manager_t mtcp, tcp_stream *cur_stream, 63 struct pkt_ctx *pctx) 64 { 65 const struct tcphdr* tcph = pctx->p.tcph; 66 67 cur_stream->rcvvar->irs = pctx->p.seq; 68 cur_stream->snd_nxt = pctx->p.ack_seq; 69 cur_stream->sndvar->peer_wnd = pctx->p.window; 70 cur_stream->rcvvar->snd_wl1 = cur_stream->rcvvar->irs - 1; 71 cur_stream->rcv_nxt = cur_stream->rcvvar->irs + 1; 72 cur_stream->rcvvar->last_ack_seq = pctx->p.ack_seq; 73 ParseTCPOptions(cur_stream, pctx->p.cur_ts, (uint8_t *)tcph + TCP_HEADER_LEN, 74 (tcph->doff << 2) - TCP_HEADER_LEN); 75 cur_stream->sndvar->cwnd = ((cur_stream->sndvar->cwnd == 1)? 76 (cur_stream->sndvar->mss * 2): cur_stream->sndvar->mss); 77 cur_stream->sndvar->ssthresh = cur_stream->sndvar->mss * 10; 78 if (HAS_STREAM_TYPE(cur_stream, MOS_SOCK_STREAM)) 79 UpdateRetransmissionTimer(mtcp, cur_stream, pctx->p.cur_ts); 80 81 return TRUE; 82 } 83 /*----------------------------------------------------------------------------*/ 84 /* ValidateSequence: validates sequence number of the segment */ 85 /* Return: TRUE if acceptable, FALSE if not acceptable */ 86 /*----------------------------------------------------------------------------*/ 87 static inline int 88 ValidateSequence(mtcp_manager_t mtcp, tcp_stream *cur_stream, 89 struct pkt_ctx *pctx) 90 { 91 const struct tcphdr* tcph = pctx->p.tcph; 92 93 /* Protect Against Wrapped Sequence number (PAWS) */ 94 if (!tcph->rst && cur_stream->saw_timestamp) { 95 struct tcp_timestamp ts; 96 97 if (!ParseTCPTimestamp(cur_stream, &ts, 98 (uint8_t *)tcph + TCP_HEADER_LEN, 99 (tcph->doff << 2) - TCP_HEADER_LEN)) { 100 /* if there is no timestamp */ 101 /* TODO: implement here */ 102 TRACE_DBG("No timestamp found.\n"); 103 return FALSE; 104 } 105 106 /* RFC1323: if SEG.TSval < TS.Recent, drop and send ack */ 107 if (TCP_SEQ_LT(ts.ts_val, cur_stream->rcvvar->ts_recent)) { 108 /* TODO: ts_recent should be invalidated 109 before timestamp wraparound for long idle flow */ 110 TRACE_DBG("PAWS Detect wrong timestamp. " 111 "seq: %u, ts_val: %u, prev: %u\n", 112 pctx->p.seq, ts.ts_val, cur_stream->rcvvar->ts_recent); 113 cur_stream->actions |= MOS_ACT_SEND_ACK_NOW; 114 return FALSE; 115 } else { 116 /* valid timestamp */ 117 if (TCP_SEQ_GT(ts.ts_val, cur_stream->rcvvar->ts_recent)) { 118 TRACE_TSTAMP("Timestamp update. cur: %u, prior: %u " 119 "(time diff: %uus)\n", 120 ts.ts_val, cur_stream->rcvvar->ts_recent, 121 TS_TO_USEC(pctx->p.cur_ts - cur_stream->rcvvar->ts_last_ts_upd)); 122 cur_stream->rcvvar->ts_last_ts_upd = pctx->p.cur_ts; 123 } 124 125 cur_stream->rcvvar->ts_recent = ts.ts_val; 126 cur_stream->rcvvar->ts_lastack_rcvd = ts.ts_ref; 127 } 128 } 129 130 /* TCP sequence validation */ 131 if (!TCP_SEQ_BETWEEN(pctx->p.seq + pctx->p.payloadlen, cur_stream->rcv_nxt, 132 cur_stream->rcv_nxt + cur_stream->rcvvar->rcv_wnd)) { 133 134 /* if RST bit is set, ignore the segment */ 135 if (tcph->rst) 136 return FALSE; 137 138 if (cur_stream->state == TCP_ST_ESTABLISHED) { 139 /* check if it is to get window advertisement */ 140 if (pctx->p.seq + 1 == cur_stream->rcv_nxt) { 141 TRACE_DBG("Window update request. (seq: %u, rcv_wnd: %u)\n", 142 pctx->p.seq, cur_stream->rcvvar->rcv_wnd); 143 cur_stream->actions |= MOS_ACT_SEND_ACK_AGG; 144 return FALSE; 145 146 } 147 148 if (TCP_SEQ_LEQ(pctx->p.seq, cur_stream->rcv_nxt)) { 149 cur_stream->actions |= MOS_ACT_SEND_ACK_AGG; 150 } else { 151 cur_stream->actions |= MOS_ACT_SEND_ACK_NOW; 152 } 153 } else { 154 if (cur_stream->state == TCP_ST_TIME_WAIT) { 155 TRACE_DBG("Stream %d: tw expire update to %u\n", 156 cur_stream->id, cur_stream->rcvvar->ts_tw_expire); 157 AddtoTimewaitList(mtcp, cur_stream, pctx->p.cur_ts); 158 } 159 cur_stream->actions |= MOS_ACT_SEND_CONTROL; 160 } 161 return FALSE; 162 } 163 164 return TRUE; 165 } 166 /*----------------------------------------------------------------------------*/ 167 static inline void 168 NotifyConnectionReset(mtcp_manager_t mtcp, tcp_stream *cur_stream) 169 { 170 TRACE_DBG("Stream %d: Notifying connection reset.\n", cur_stream->id); 171 /* TODO: implement this function */ 172 /* signal to user "connection reset" */ 173 } 174 /*----------------------------------------------------------------------------*/ 175 static inline int 176 ProcessRST(mtcp_manager_t mtcp, tcp_stream *cur_stream, 177 struct pkt_ctx *pctx) 178 { 179 /* TODO: we need reset validation logic */ 180 /* the sequence number of a RST should be inside window */ 181 /* (in SYN_SENT state, it should ack the previous SYN */ 182 183 TRACE_DBG("Stream %d: TCP RESET (%s)\n", 184 cur_stream->id, TCPStateToString(cur_stream)); 185 #if DUMP_STREAM 186 DumpStream(mtcp, cur_stream); 187 #endif 188 189 if (cur_stream->state <= TCP_ST_SYN_SENT) { 190 /* not handled here */ 191 return FALSE; 192 } 193 194 if (cur_stream->state == TCP_ST_SYN_RCVD) { 195 /* ACK number of last sent ACK packet == rcv_nxt + 1*/ 196 if (pctx->p.seq == 0 || 197 #ifdef BE_RESILIENT_TO_PACKET_DROP 198 pctx->p.seq == cur_stream->rcv_nxt + 1 || 199 #endif 200 pctx->p.ack_seq == cur_stream->snd_nxt) 201 { 202 cur_stream->state = TCP_ST_CLOSED_RSVD; 203 cur_stream->cb_events |= MOS_ON_TCP_STATE_CHANGE; 204 cur_stream->close_reason = TCP_RESET; 205 cur_stream->actions |= MOS_ACT_DESTROY; 206 } else { 207 RAISE_DEBUG_EVENT(mtcp, cur_stream, 208 "(SYN_RCVD): Ignore invalid RST. " 209 "ack_seq expected: %u, ack_seq rcvd: %u\n", 210 cur_stream->rcv_nxt + 1, pctx->p.ack_seq); 211 } 212 return TRUE; 213 } 214 215 /* if the application is already closed the connection, 216 just destroy the it */ 217 if (cur_stream->state == TCP_ST_FIN_WAIT_1 || 218 cur_stream->state == TCP_ST_FIN_WAIT_2 || 219 cur_stream->state == TCP_ST_LAST_ACK || 220 cur_stream->state == TCP_ST_CLOSING || 221 cur_stream->state == TCP_ST_TIME_WAIT) { 222 cur_stream->state = TCP_ST_CLOSED_RSVD; 223 cur_stream->close_reason = TCP_ACTIVE_CLOSE; 224 cur_stream->cb_events |= MOS_ON_TCP_STATE_CHANGE; 225 cur_stream->actions |= MOS_ACT_DESTROY; 226 return TRUE; 227 } 228 229 if (cur_stream->state >= TCP_ST_ESTABLISHED && 230 cur_stream->state <= TCP_ST_CLOSE_WAIT) { 231 /* ESTABLISHED, FIN_WAIT_1, FIN_WAIT_2, CLOSE_WAIT */ 232 /* TODO: flush all the segment queues */ 233 //NotifyConnectionReset(mtcp, cur_stream); 234 } 235 236 if (!(cur_stream->sndvar->on_closeq || cur_stream->sndvar->on_closeq_int || 237 cur_stream->sndvar->on_resetq || cur_stream->sndvar->on_resetq_int)) { 238 //cur_stream->state = TCP_ST_CLOSED_RSVD; 239 //cur_stream->actions |= MOS_ACT_DESTROY; 240 cur_stream->state = TCP_ST_CLOSED_RSVD; 241 cur_stream->cb_events |= MOS_ON_TCP_STATE_CHANGE; 242 cur_stream->close_reason = TCP_RESET; 243 if (HAS_STREAM_TYPE(cur_stream, MOS_SOCK_STREAM)) 244 RaiseCloseEvent(mtcp, cur_stream); 245 } 246 247 return TRUE; 248 } 249 /*----------------------------------------------------------------------------*/ 250 inline void 251 EstimateRTT(mtcp_manager_t mtcp, tcp_stream *cur_stream, uint32_t mrtt) 252 { 253 /* This function should be called for not retransmitted packets */ 254 /* TODO: determine tcp_rto_min */ 255 #define TCP_RTO_MIN 0 256 long m = mrtt; 257 uint32_t tcp_rto_min = TCP_RTO_MIN; 258 struct tcp_recv_vars *rcvvar = cur_stream->rcvvar; 259 260 if (m == 0) { 261 m = 1; 262 } 263 if (rcvvar->srtt != 0) { 264 /* rtt = 7/8 rtt + 1/8 new */ 265 m -= (rcvvar->srtt >> 3); 266 rcvvar->srtt += m; 267 if (m < 0) { 268 m = -m; 269 m -= (rcvvar->mdev >> 2); 270 if (m > 0) { 271 m >>= 3; 272 } 273 } else { 274 m -= (rcvvar->mdev >> 2); 275 } 276 rcvvar->mdev += m; 277 if (rcvvar->mdev > rcvvar->mdev_max) { 278 rcvvar->mdev_max = rcvvar->mdev; 279 if (rcvvar->mdev_max > rcvvar->rttvar) { 280 rcvvar->rttvar = rcvvar->mdev_max; 281 } 282 } 283 if (TCP_SEQ_GT(cur_stream->sndvar->snd_una, rcvvar->rtt_seq)) { 284 if (rcvvar->mdev_max < rcvvar->rttvar) { 285 rcvvar->rttvar -= (rcvvar->rttvar - rcvvar->mdev_max) >> 2; 286 } 287 rcvvar->rtt_seq = cur_stream->snd_nxt; 288 rcvvar->mdev_max = tcp_rto_min; 289 } 290 } else { 291 /* fresh measurement */ 292 rcvvar->srtt = m << 3; 293 rcvvar->mdev = m << 1; 294 rcvvar->mdev_max = rcvvar->rttvar = MAX(rcvvar->mdev, tcp_rto_min); 295 rcvvar->rtt_seq = cur_stream->snd_nxt; 296 } 297 298 TRACE_RTT("mrtt: %u (%uus), srtt: %u (%ums), mdev: %u, mdev_max: %u, " 299 "rttvar: %u, rtt_seq: %u\n", mrtt, mrtt * TIME_TICK, 300 rcvvar->srtt, TS_TO_MSEC((rcvvar->srtt) >> 3), rcvvar->mdev, 301 rcvvar->mdev_max, rcvvar->rttvar, rcvvar->rtt_seq); 302 } 303 /*----------------------------------------------------------------------------*/ 304 static inline void 305 ProcessACK(mtcp_manager_t mtcp, tcp_stream *cur_stream, 306 struct pkt_ctx *pctx) 307 { 308 const struct tcphdr* tcph = pctx->p.tcph; 309 uint32_t seq = pctx->p.seq; 310 uint32_t ack_seq = pctx->p.ack_seq; 311 struct tcp_send_vars *sndvar = cur_stream->sndvar; 312 uint32_t cwindow, cwindow_prev; 313 uint32_t rmlen; 314 uint32_t snd_wnd_prev; 315 uint32_t right_wnd_edge; 316 uint8_t dup; 317 318 cwindow = pctx->p.window; 319 if (!tcph->syn) { 320 cwindow = cwindow << sndvar->wscale_peer; 321 } 322 right_wnd_edge = sndvar->peer_wnd + cur_stream->rcvvar->snd_wl2; 323 324 /* If ack overs the sending buffer, return */ 325 if (cur_stream->state == TCP_ST_FIN_WAIT_1 || 326 cur_stream->state == TCP_ST_FIN_WAIT_2 || 327 cur_stream->state == TCP_ST_CLOSING || 328 cur_stream->state == TCP_ST_CLOSE_WAIT || 329 cur_stream->state == TCP_ST_LAST_ACK) { 330 if (sndvar->is_fin_sent && ack_seq == sndvar->fss + 1) { 331 ack_seq--; 332 } 333 } 334 335 if (TCP_SEQ_GT(ack_seq, sndvar->sndbuf->head_seq + sndvar->sndbuf->len)) { 336 TRACE_DBG("Stream %d (%s): invalid acknologement. " 337 "ack_seq: %u, possible max_ack_seq: %u\n", cur_stream->id, 338 TCPStateToString(cur_stream), ack_seq, 339 sndvar->sndbuf->head_seq + sndvar->sndbuf->len); 340 return; 341 } 342 343 #ifdef BE_RESILIENT_TO_PACKET_DROP 344 if (TCP_SEQ_GT(seq + pctx->p.payloadlen, cur_stream->rcv_nxt)) 345 cur_stream->rcv_nxt = seq + pctx->p.payloadlen; 346 #endif 347 348 /* Update window */ 349 if (TCP_SEQ_LT(cur_stream->rcvvar->snd_wl1, seq) || 350 (cur_stream->rcvvar->snd_wl1 == seq && 351 TCP_SEQ_LT(cur_stream->rcvvar->snd_wl2, ack_seq)) || 352 (cur_stream->rcvvar->snd_wl2 == ack_seq && 353 cwindow > sndvar->peer_wnd)) { 354 cwindow_prev = sndvar->peer_wnd; 355 sndvar->peer_wnd = cwindow; 356 cur_stream->rcvvar->snd_wl1 = seq; 357 cur_stream->rcvvar->snd_wl2 = ack_seq; 358 TRACE_CLWND("Window update. " 359 "ack: %u, peer_wnd: %u, snd_nxt-snd_una: %u\n", 360 ack_seq, cwindow, cur_stream->snd_nxt - sndvar->snd_una); 361 if (cwindow_prev < cur_stream->snd_nxt - sndvar->snd_una && 362 sndvar->peer_wnd >= cur_stream->snd_nxt - sndvar->snd_una) { 363 TRACE_CLWND("%u Broadcasting client window update! " 364 "ack_seq: %u, peer_wnd: %u (before: %u), " 365 "(snd_nxt - snd_una: %u)\n", 366 cur_stream->id, ack_seq, sndvar->peer_wnd, cwindow_prev, 367 cur_stream->snd_nxt - sndvar->snd_una); 368 if (HAS_STREAM_TYPE(cur_stream, MOS_SOCK_STREAM)) 369 RaiseWriteEvent(mtcp, cur_stream); 370 } 371 } 372 373 /* Check duplicated ack count */ 374 /* Duplicated ack if 375 1) ack_seq is old 376 2) payload length is 0. 377 3) advertised window not changed. 378 4) there is outstanding unacknowledged data 379 5) ack_seq == snd_una 380 */ 381 382 dup = FALSE; 383 if (TCP_SEQ_LT(ack_seq, cur_stream->snd_nxt)) { 384 if (ack_seq == cur_stream->rcvvar->last_ack_seq && pctx->p.payloadlen == 0) { 385 if (cur_stream->rcvvar->snd_wl2 + sndvar->peer_wnd == right_wnd_edge) { 386 if (cur_stream->rcvvar->dup_acks + 1 > cur_stream->rcvvar->dup_acks) { 387 cur_stream->rcvvar->dup_acks++; 388 } 389 dup = TRUE; 390 } 391 } 392 } 393 if (!dup) { 394 cur_stream->rcvvar->dup_acks = 0; 395 cur_stream->rcvvar->last_ack_seq = ack_seq; 396 } 397 398 /* Fast retransmission */ 399 if (dup && cur_stream->rcvvar->dup_acks == 3) { 400 TRACE_LOSS("Triple duplicated ACKs!! ack_seq: %u\n", ack_seq); 401 if (TCP_SEQ_LT(ack_seq, cur_stream->snd_nxt)) { 402 TRACE_LOSS("Reducing snd_nxt from %u to %u\n", 403 cur_stream->snd_nxt, ack_seq); 404 #if RTM_STAT 405 sndvar->rstat.tdp_ack_cnt++; 406 sndvar->rstat.tdp_ack_bytes += (cur_stream->snd_nxt - ack_seq); 407 #endif 408 if (ack_seq != sndvar->snd_una) { 409 TRACE_DBG("ack_seq and snd_una mismatch on tdp ack. " 410 "ack_seq: %u, snd_una: %u\n", 411 ack_seq, sndvar->snd_una); 412 } 413 cur_stream->snd_nxt = ack_seq; 414 } 415 416 /* update congestion control variables */ 417 /* ssthresh to half of min of cwnd and peer wnd */ 418 sndvar->ssthresh = MIN(sndvar->cwnd, sndvar->peer_wnd) / 2; 419 if (sndvar->ssthresh < 2 * sndvar->mss) { 420 sndvar->ssthresh = 2 * sndvar->mss; 421 } 422 sndvar->cwnd = sndvar->ssthresh + 3 * sndvar->mss; 423 TRACE_CONG("Fast retransmission. cwnd: %u, ssthresh: %u\n", 424 sndvar->cwnd, sndvar->ssthresh); 425 426 /* count number of retransmissions */ 427 if (sndvar->nrtx < TCP_MAX_RTX) { 428 sndvar->nrtx++; 429 } else { 430 TRACE_DBG("Exceed MAX_RTX.\n"); 431 } 432 433 cur_stream->actions |= MOS_ACT_SEND_DATA; 434 435 } else if (cur_stream->rcvvar->dup_acks > 3) { 436 /* Inflate congestion window until before overflow */ 437 if ((uint32_t)(sndvar->cwnd + sndvar->mss) > sndvar->cwnd) { 438 sndvar->cwnd += sndvar->mss; 439 TRACE_CONG("Dupack cwnd inflate. cwnd: %u, ssthresh: %u\n", 440 sndvar->cwnd, sndvar->ssthresh); 441 } 442 } 443 444 #if TCP_OPT_SACK_ENABLED 445 ParseSACKOption(cur_stream, ack_seq, (uint8_t *)tcph + TCP_HEADER_LEN, 446 (tcph->doff << 2) - TCP_HEADER_LEN); 447 #endif /* TCP_OPT_SACK_ENABLED */ 448 449 #if RECOVERY_AFTER_LOSS 450 /* updating snd_nxt (when recovered from loss) */ 451 if (TCP_SEQ_GT(ack_seq, cur_stream->snd_nxt)) { 452 #if RTM_STAT 453 sndvar->rstat.ack_upd_cnt++; 454 sndvar->rstat.ack_upd_bytes += (ack_seq - cur_stream->snd_nxt); 455 #endif 456 TRACE_LOSS("Updating snd_nxt from %u to %u\n", 457 cur_stream->snd_nxt, ack_seq); 458 cur_stream->snd_nxt = ack_seq; 459 if (sndvar->sndbuf->len == 0) { 460 if (HAS_STREAM_TYPE(cur_stream, MOS_SOCK_STREAM)) 461 RemoveFromSendList(mtcp, cur_stream); 462 } 463 } 464 #endif 465 466 /* If ack_seq is previously acked, return */ 467 if (TCP_SEQ_GEQ(sndvar->sndbuf->head_seq, ack_seq)) { 468 return; 469 } 470 471 /* Remove acked sequence from send buffer */ 472 rmlen = ack_seq - sndvar->sndbuf->head_seq; 473 if (rmlen > 0) { 474 /* Routine goes here only if there is new payload (not retransmitted) */ 475 uint16_t packets; 476 477 /* If acks new data */ 478 packets = rmlen / sndvar->eff_mss; 479 if ((rmlen / sndvar->eff_mss) * sndvar->eff_mss > rmlen) { 480 packets++; 481 } 482 483 /* Estimate RTT and calculate rto */ 484 if (cur_stream->saw_timestamp) { 485 EstimateRTT(mtcp, cur_stream, 486 pctx->p.cur_ts - cur_stream->rcvvar->ts_lastack_rcvd); 487 sndvar->rto = (cur_stream->rcvvar->srtt >> 3) + cur_stream->rcvvar->rttvar; 488 assert(sndvar->rto > 0); 489 } else { 490 //TODO: Need to implement timestamp estimation without timestamp 491 TRACE_RTT("NOT IMPLEMENTED.\n"); 492 } 493 494 /* Update congestion control variables */ 495 if (cur_stream->state >= TCP_ST_ESTABLISHED) { 496 if (sndvar->cwnd < sndvar->ssthresh) { 497 if ((sndvar->cwnd + sndvar->mss) > sndvar->cwnd) { 498 sndvar->cwnd += (sndvar->mss * packets); 499 } 500 TRACE_CONG("slow start cwnd: %u, ssthresh: %u\n", 501 sndvar->cwnd, sndvar->ssthresh); 502 } else { 503 uint32_t new_cwnd = sndvar->cwnd + 504 packets * sndvar->mss * sndvar->mss / 505 sndvar->cwnd; 506 if (new_cwnd > sndvar->cwnd) { 507 sndvar->cwnd = new_cwnd; 508 } 509 //TRACE_CONG("congestion avoidance cwnd: %u, ssthresh: %u\n", 510 // sndvar->cwnd, sndvar->ssthresh); 511 } 512 } 513 514 if (SBUF_LOCK(&sndvar->write_lock)) { 515 if (errno == EDEADLK) 516 perror("ProcessACK: write_lock blocked\n"); 517 assert(0); 518 } 519 if (HAS_STREAM_TYPE(cur_stream, MOS_SOCK_STREAM)) 520 SBRemove(mtcp->rbm_snd, sndvar->sndbuf, rmlen); 521 sndvar->snd_una = ack_seq; 522 snd_wnd_prev = sndvar->snd_wnd; 523 sndvar->snd_wnd = sndvar->sndbuf->size - sndvar->sndbuf->len; 524 525 /* If there was no available sending window */ 526 /* notify the newly available window to application */ 527 #if SELECTIVE_WRITE_EVENT_NOTIFY 528 if (snd_wnd_prev <= 0) { 529 #endif /* SELECTIVE_WRITE_EVENT_NOTIFY */ 530 if (HAS_STREAM_TYPE(cur_stream, MOS_SOCK_STREAM)) 531 RaiseWriteEvent(mtcp, cur_stream); 532 #if SELECTIVE_WRITE_EVENT_NOTIFY 533 } 534 #endif /* SELECTIVE_WRITE_EVENT_NOTIFY */ 535 536 SBUF_UNLOCK(&sndvar->write_lock); 537 if (HAS_STREAM_TYPE(cur_stream, MOS_SOCK_STREAM)) 538 UpdateRetransmissionTimer(mtcp, cur_stream, pctx->p.cur_ts); 539 } 540 } 541 /*----------------------------------------------------------------------------*/ 542 /* ProcessTCPPayload: merges TCP payload using receive ring buffer */ 543 /* Return: TRUE (1) in normal case, FALSE (0) if immediate ACK is required */ 544 /* CAUTION: should only be called at ESTABLISHED, FIN_WAIT_1, FIN_WAIT_2 */ 545 /*----------------------------------------------------------------------------*/ 546 static inline int 547 ProcessTCPPayload(mtcp_manager_t mtcp, tcp_stream *cur_stream, 548 struct pkt_ctx *pctx) 549 { 550 struct tcp_recv_vars *rcvvar = cur_stream->rcvvar; 551 uint32_t prev_rcv_nxt; 552 int ret = -1; 553 bool read_lock; 554 struct socket_map *walk; 555 556 if (!cur_stream->buffer_mgmt) 557 return FALSE; 558 559 /* if seq and segment length is lower than rcv_nxt, ignore and send ack */ 560 if (TCP_SEQ_LT(pctx->p.seq + pctx->p.payloadlen, cur_stream->rcv_nxt)) { 561 SOCKQ_FOREACH_START(walk, &cur_stream->msocks) { 562 HandleCallback(mtcp, MOS_NULL, walk, cur_stream->side, 563 pctx, MOS_ON_ERROR); 564 } SOCKQ_FOREACH_END; 565 return FALSE; 566 } 567 /* if payload exceeds receiving buffer, drop and send ack */ 568 if (TCP_SEQ_GT(pctx->p.seq + pctx->p.payloadlen, cur_stream->rcv_nxt + rcvvar->rcv_wnd)) { 569 SOCKQ_FOREACH_START(walk, &cur_stream->msocks) { 570 HandleCallback(mtcp, MOS_NULL, walk, cur_stream->side, 571 pctx, MOS_ON_ERROR); 572 } SOCKQ_FOREACH_END; 573 return FALSE; 574 } 575 576 /* allocate receive buffer if not exist */ 577 if (!rcvvar->rcvbuf) { 578 rcvvar->rcvbuf = tcprb_new(mtcp->bufseg_pool, g_config.mos->rmem_size, cur_stream->buffer_mgmt); 579 if (!rcvvar->rcvbuf) { 580 TRACE_ERROR("Stream %d: Failed to allocate receive buffer.\n", 581 cur_stream->id); 582 cur_stream->state = TCP_ST_CLOSED_RSVD; 583 cur_stream->close_reason = TCP_NO_MEM; 584 cur_stream->cb_events |= MOS_ON_TCP_STATE_CHANGE; 585 if (HAS_STREAM_TYPE(cur_stream, MOS_SOCK_STREAM)) 586 RaiseErrorEvent(mtcp, cur_stream); 587 588 return ERROR; 589 } 590 } 591 592 read_lock = HAS_STREAM_TYPE(cur_stream, MOS_SOCK_STREAM); 593 594 if (read_lock && SBUF_LOCK(&rcvvar->read_lock)) { 595 if (errno == EDEADLK) 596 perror("ProcessTCPPayload: read_lock blocked\n"); 597 assert(0); 598 } 599 600 prev_rcv_nxt = cur_stream->rcv_nxt; 601 602 tcprb_t *rb = rcvvar->rcvbuf; 603 loff_t off = seq2loff(rb, pctx->p.seq, (rcvvar->irs + 1)); 604 if (off >= 0) { 605 if (!HAS_STREAM_TYPE(cur_stream, MOS_SOCK_STREAM) && 606 HAS_STREAM_TYPE(cur_stream, MOS_SOCK_MONITOR_STREAM_ACTIVE)) 607 tcprb_setpile(rb, rb->pile + tcprb_cflen(rb)); 608 ret = tcprb_pwrite(rb, pctx->p.payload, pctx->p.payloadlen, off); 609 if (ret < 0) { 610 /* We try again after warning this result to the user. */ 611 SOCKQ_FOREACH_START(walk, &cur_stream->msocks) { 612 HandleCallback(mtcp, MOS_NULL, walk, cur_stream->side, 613 pctx, MOS_ON_ERROR); 614 } SOCKQ_FOREACH_END; 615 ret = tcprb_pwrite(rb, pctx->p.payload, pctx->p.payloadlen, off); 616 } 617 } 618 /* TODO: update monitor vars */ 619 620 /* 621 * error can pop up due to disabled buffered management 622 * (only in monitor mode). In that case, ignore the warning 623 * message. 624 */ 625 if (ret < 0 && cur_stream->buffer_mgmt && mtcp->num_msp == 0) 626 TRACE_ERROR("Cannot merge payload. reason: %d\n", ret); 627 628 /* discard the buffer if the state is FIN_WAIT_1 or FIN_WAIT_2, 629 meaning that the connection is already closed by the application */ 630 loff_t cftail = rb->pile + tcprb_cflen(rb); 631 if (cur_stream->state == TCP_ST_FIN_WAIT_1 || 632 cur_stream->state == TCP_ST_FIN_WAIT_2) { 633 /* XXX: Do we really need to update recv vars? */ 634 tcprb_setpile(rb, cftail); 635 } 636 if (cftail > 0 && (rcvvar->irs + 1) + cftail > cur_stream->rcv_nxt) { 637 RAISE_DEBUG_EVENT(mtcp, cur_stream, 638 "Move rcv_nxt from %u to %u.\n", 639 cur_stream->rcv_nxt, (rcvvar->irs + 1) + cftail); 640 cur_stream->rcv_nxt = (rcvvar->irs + 1) + cftail; 641 } 642 assert(cftail - rb->pile >= 0); 643 rcvvar->rcv_wnd = rb->len - (cftail - rb->pile); 644 645 if (read_lock) 646 SBUF_UNLOCK(&rcvvar->read_lock); 647 648 649 if (TCP_SEQ_LEQ(cur_stream->rcv_nxt, prev_rcv_nxt)) { 650 /* There are some lost packets */ 651 return FALSE; 652 } 653 654 TRACE_EPOLL("Stream %d data arrived. " 655 "len: %d, ET: %llu, IN: %llu, OUT: %llu\n", 656 cur_stream->id, pctx->p.payloadlen, 657 cur_stream->socket? (unsigned long long)cur_stream->socket->epoll & MOS_EPOLLET : 0, 658 cur_stream->socket? (unsigned long long)cur_stream->socket->epoll & MOS_EPOLLIN : 0, 659 cur_stream->socket? (unsigned long long)cur_stream->socket->epoll & MOS_EPOLLOUT : 0); 660 661 if (cur_stream->state == TCP_ST_ESTABLISHED) 662 RaiseReadEvent(mtcp, cur_stream); 663 664 return TRUE; 665 } 666 /*----------------------------------------------------------------------------*/ 667 static inline void 668 Handle_TCP_ST_LISTEN (mtcp_manager_t mtcp, tcp_stream* cur_stream, 669 struct pkt_ctx *pctx) 670 { 671 672 const struct tcphdr* tcph = pctx->p.tcph; 673 674 if (tcph->syn) { 675 if (cur_stream->state == TCP_ST_LISTEN) 676 cur_stream->rcv_nxt++; 677 cur_stream->state = TCP_ST_SYN_RCVD; 678 cur_stream->cb_events |= MOS_ON_TCP_STATE_CHANGE | MOS_ON_CONN_START; 679 TRACE_STATE("Stream %d: TCP_ST_SYN_RCVD\n", cur_stream->id); 680 cur_stream->actions |= MOS_ACT_SEND_CONTROL; 681 if (IS_STREAM_TYPE(cur_stream, MOS_SOCK_MONITOR_STREAM_ACTIVE)) { 682 /** 683 * Passive stream context needs to initialize irs and rcv_nxt 684 * as it is not set neither during createserverstream or monitor 685 * creation. 686 */ 687 cur_stream->rcvvar->irs = 688 cur_stream->rcv_nxt = pctx->p.seq; 689 } 690 } else { 691 CTRACE_ERROR("Stream %d (TCP_ST_LISTEN): " 692 "Packet without SYN.\n", cur_stream->id); 693 } 694 695 } 696 /*----------------------------------------------------------------------------*/ 697 static inline void 698 Handle_TCP_ST_SYN_SENT (mtcp_manager_t mtcp, tcp_stream* cur_stream, 699 struct pkt_ctx *pctx) 700 { 701 const struct tcphdr* tcph = pctx->p.tcph; 702 703 /* when active open */ 704 if (tcph->ack) { 705 /* filter the unacceptable acks */ 706 if (TCP_SEQ_LEQ(pctx->p.ack_seq, cur_stream->sndvar->iss) 707 #ifndef BE_RESILIENT_TO_PACKET_DROP 708 || TCP_SEQ_GT(pctx->p.ack_seq, cur_stream->snd_nxt) 709 #endif 710 ) { 711 if (!tcph->rst) { 712 cur_stream->actions |= MOS_ACT_SEND_RST; 713 } 714 return; 715 } 716 /* accept the ack */ 717 cur_stream->sndvar->snd_una++; 718 } 719 720 if (tcph->rst) { 721 if (tcph->ack) { 722 cur_stream->state = TCP_ST_CLOSED_RSVD; 723 cur_stream->close_reason = TCP_RESET; 724 cur_stream->cb_events |= MOS_ON_TCP_STATE_CHANGE; 725 if (cur_stream->socket) { 726 if (HAS_STREAM_TYPE(cur_stream, MOS_SOCK_STREAM)) 727 RaiseErrorEvent(mtcp, cur_stream); 728 } else { 729 cur_stream->actions |= MOS_ACT_DESTROY; 730 } 731 } 732 return; 733 } 734 735 if (tcph->ack 736 #ifndef BE_RESILIENT_TO_PACKET_DROP 737 /* If we already lost SYNACK, let the ACK packet do the SYNACK's role */ 738 && tcph->syn 739 #endif 740 ) { 741 int ret = HandleActiveOpen(mtcp, cur_stream, pctx); 742 if (!ret) { 743 return; 744 } 745 746 #ifdef BE_RESILIENT_TO_PACKET_DROP 747 if (!tcph->syn) { 748 RAISE_DEBUG_EVENT(mtcp, cur_stream, 749 "We missed SYNACK. Replace it with an ACK packet.\n"); 750 /* correct some variables */ 751 cur_stream->rcv_nxt = pctx->p.seq; 752 } 753 #endif 754 755 cur_stream->sndvar->nrtx = 0; 756 if (HAS_STREAM_TYPE(cur_stream, MOS_SOCK_STREAM)) 757 RemoveFromRTOList(mtcp, cur_stream); 758 cur_stream->state = TCP_ST_ESTABLISHED; 759 cur_stream->cb_events |= /*MOS_ON_CONN_SETUP |*/ MOS_ON_TCP_STATE_CHANGE; 760 TRACE_STATE("Stream %d: TCP_ST_ESTABLISHED\n", cur_stream->id); 761 762 if (cur_stream->socket) { 763 if (HAS_STREAM_TYPE(cur_stream, MOS_SOCK_STREAM)) 764 RaiseWriteEvent(mtcp, cur_stream); 765 } else { 766 TRACE_STATE("Stream %d: ESTABLISHED, but no socket\n", cur_stream->id); 767 cur_stream->close_reason = TCP_ACTIVE_CLOSE; 768 cur_stream->actions |= MOS_ACT_SEND_RST; 769 cur_stream->actions |= MOS_ACT_DESTROY; 770 } 771 cur_stream->actions |= MOS_ACT_SEND_CONTROL; 772 if (g_config.mos->tcp_timeout > 0) 773 if (HAS_STREAM_TYPE(cur_stream, MOS_SOCK_STREAM)) 774 AddtoTimeoutList(mtcp, cur_stream); 775 776 #ifdef BE_RESILIENT_TO_PACKET_DROP 777 /* Handle this ack packet */ 778 if (!tcph->syn) 779 Handle_TCP_ST_ESTABLISHED(mtcp, cur_stream, pctx); 780 #endif 781 782 } else if (tcph->syn) { 783 cur_stream->state = TCP_ST_SYN_RCVD; 784 cur_stream->cb_events |= MOS_ON_TCP_STATE_CHANGE; 785 TRACE_STATE("Stream %d: TCP_ST_SYN_RCVD\n", cur_stream->id); 786 cur_stream->snd_nxt = cur_stream->sndvar->iss; 787 cur_stream->actions |= MOS_ACT_SEND_CONTROL; 788 } 789 } 790 /*----------------------------------------------------------------------------*/ 791 static inline void 792 Handle_TCP_ST_SYN_RCVD (mtcp_manager_t mtcp, tcp_stream* cur_stream, 793 struct pkt_ctx *pctx) 794 { 795 const struct tcphdr* tcph = pctx->p.tcph; 796 struct tcp_send_vars *sndvar = cur_stream->sndvar; 797 int ret; 798 799 if (tcph->ack) { 800 uint32_t prior_cwnd; 801 /* NOTE: We do not validate the ack number because first few packets 802 * can also come out of order */ 803 804 sndvar->snd_una++; 805 cur_stream->snd_nxt = pctx->p.ack_seq; 806 prior_cwnd = sndvar->cwnd; 807 sndvar->cwnd = ((prior_cwnd == 1)? 808 (sndvar->mss * 2): sndvar->mss); 809 810 //UpdateRetransmissionTimer(mtcp, cur_stream, cur_ts); 811 sndvar->nrtx = 0; 812 cur_stream->rcv_nxt = cur_stream->rcvvar->irs + 1; 813 if (HAS_STREAM_TYPE(cur_stream, MOS_SOCK_STREAM)) 814 RemoveFromRTOList(mtcp, cur_stream); 815 816 cur_stream->state = TCP_ST_ESTABLISHED; 817 cur_stream->cb_events |= /*MOS_ON_CONN_SETUP |*/ MOS_ON_TCP_STATE_CHANGE; 818 TRACE_STATE("Stream %d: TCP_ST_ESTABLISHED\n", cur_stream->id); 819 820 #ifdef BE_RESILIENT_TO_PACKET_DROP 821 if (pctx->p.ack_seq != sndvar->iss + 1) 822 Handle_TCP_ST_ESTABLISHED(mtcp, cur_stream, pctx); 823 #endif 824 825 /* update listening socket */ 826 if (HAS_STREAM_TYPE(cur_stream, MOS_SOCK_STREAM)) { 827 828 struct tcp_listener *listener = mtcp->listener; 829 830 ret = StreamEnqueue(listener->acceptq, cur_stream); 831 if (ret < 0) { 832 TRACE_ERROR("Stream %d: Failed to enqueue to " 833 "the listen backlog!\n", cur_stream->id); 834 cur_stream->close_reason = TCP_NOT_ACCEPTED; 835 cur_stream->state = TCP_ST_CLOSED_RSVD; 836 cur_stream->cb_events |= MOS_ON_TCP_STATE_CHANGE; 837 TRACE_STATE("Stream %d: TCP_ST_CLOSED_RSVD\n", cur_stream->id); 838 cur_stream->actions |= MOS_ACT_SEND_CONTROL; 839 } 840 841 /* raise an event to the listening socket */ 842 if (listener->socket && (listener->socket->epoll & MOS_EPOLLIN)) { 843 AddEpollEvent(mtcp->ep, 844 MOS_EVENT_QUEUE, listener->socket, MOS_EPOLLIN); 845 } 846 } 847 848 //TRACE_DBG("Stream %d inserted into acceptq.\n", cur_stream->id); 849 if (g_config.mos->tcp_timeout > 0) 850 if (HAS_STREAM_TYPE(cur_stream, MOS_SOCK_STREAM)) 851 AddtoTimeoutList(mtcp, cur_stream); 852 853 } else { 854 /* Handle retransmitted SYN packet */ 855 if (HAS_STREAM_TYPE(cur_stream, MOS_SOCK_MONITOR_STREAM_ACTIVE) && 856 tcph->syn) { 857 if (pctx->p.seq == cur_stream->pair_stream->sndvar->iss) { 858 TRACE_DBG("syn retransmit! (p.seq = %u / iss = %u)\n", 859 pctx->p.seq, cur_stream->pair_stream->sndvar->iss); 860 cur_stream->cb_events |= MOS_ON_REXMIT; 861 } 862 } 863 864 TRACE_DBG("Stream %d (TCP_ST_SYN_RCVD): No ACK.\n", 865 cur_stream->id); 866 /* retransmit SYN/ACK */ 867 cur_stream->snd_nxt = sndvar->iss; 868 cur_stream->actions |= MOS_ACT_SEND_CONTROL; 869 } 870 } 871 /*----------------------------------------------------------------------------*/ 872 static inline void 873 Handle_TCP_ST_ESTABLISHED (mtcp_manager_t mtcp, tcp_stream* cur_stream, 874 struct pkt_ctx *pctx) 875 { 876 const struct tcphdr* tcph = pctx->p.tcph; 877 878 if (tcph->syn) { 879 /* Handle retransmitted SYNACK packet */ 880 if (HAS_STREAM_TYPE(cur_stream, MOS_SOCK_MONITOR_STREAM_ACTIVE) && 881 tcph->ack) { 882 if (pctx->p.seq == cur_stream->pair_stream->sndvar->iss) { 883 TRACE_DBG("syn/ack retransmit! (p.seq = %u / iss = %u)\n", 884 pctx->p.seq, cur_stream->pair_stream->sndvar->iss); 885 cur_stream->cb_events |= MOS_ON_REXMIT; 886 } 887 } 888 889 TRACE_DBG("Stream %d (TCP_ST_ESTABLISHED): weird SYN. " 890 "seq: %u, expected: %u, ack_seq: %u, expected: %u\n", 891 cur_stream->id, pctx->p.seq, cur_stream->rcv_nxt, 892 pctx->p.ack_seq, cur_stream->snd_nxt); 893 cur_stream->snd_nxt = pctx->p.ack_seq; 894 cur_stream->actions |= MOS_ACT_SEND_CONTROL; 895 return; 896 } 897 898 if (pctx->p.payloadlen > 0) { 899 if (ProcessTCPPayload(mtcp, cur_stream, pctx)) { 900 /* if return is TRUE, send ACK */ 901 cur_stream->actions |= MOS_ACT_SEND_ACK_AGG; 902 } else { 903 cur_stream->actions |= MOS_ACT_SEND_ACK_NOW; 904 } 905 } 906 907 if (tcph->ack) { 908 if (cur_stream->sndvar->sndbuf) { 909 ProcessACK(mtcp, cur_stream, pctx); 910 } 911 } 912 913 if (tcph->fin) { 914 /* process the FIN only if the sequence is valid */ 915 /* FIN packet is allowed to push payload (should we check for PSH flag)? */ 916 if (!cur_stream->buffer_mgmt || 917 #ifdef BE_RESILIENT_TO_PACKET_DROP 918 TCP_SEQ_GEQ(pctx->p.seq + pctx->p.payloadlen, cur_stream->rcv_nxt) 919 #else 920 pctx->p.seq + pctx->p.payloadlen == cur_stream->rcv_nxt 921 #endif 922 ) { 923 #ifdef BE_RESILIENT_TO_PACKET_DROP 924 cur_stream->rcv_nxt = pctx->p.seq + pctx->p.payloadlen; 925 #endif 926 cur_stream->state = TCP_ST_CLOSE_WAIT; 927 cur_stream->cb_events |= MOS_ON_TCP_STATE_CHANGE; 928 TRACE_STATE("Stream %d: TCP_ST_CLOSE_WAIT\n", cur_stream->id); 929 cur_stream->rcv_nxt++; 930 cur_stream->actions |= MOS_ACT_SEND_CONTROL; 931 932 /* notify FIN to application */ 933 RaiseReadEvent(mtcp, cur_stream); 934 } else { 935 RAISE_DEBUG_EVENT(mtcp, cur_stream, 936 "Expected %u, but received %u (= %u + %u)\n", 937 cur_stream->rcv_nxt, pctx->p.seq + pctx->p.payloadlen, 938 pctx->p.seq, pctx->p.payloadlen); 939 940 cur_stream->actions |= MOS_ACT_SEND_ACK_NOW; 941 return; 942 } 943 } 944 } 945 /*----------------------------------------------------------------------------*/ 946 static inline void 947 Handle_TCP_ST_CLOSE_WAIT (mtcp_manager_t mtcp, tcp_stream* cur_stream, 948 struct pkt_ctx *pctx) 949 { 950 if (TCP_SEQ_LT(pctx->p.seq, cur_stream->rcv_nxt)) { 951 TRACE_DBG("Stream %d (TCP_ST_CLOSE_WAIT): " 952 "weird seq: %u, expected: %u\n", 953 cur_stream->id, pctx->p.seq, cur_stream->rcv_nxt); 954 cur_stream->actions |= MOS_ACT_SEND_CONTROL; 955 return; 956 } 957 958 if (cur_stream->sndvar->sndbuf) { 959 ProcessACK(mtcp, cur_stream, pctx); 960 } 961 } 962 /*----------------------------------------------------------------------------*/ 963 static inline void 964 Handle_TCP_ST_LAST_ACK (mtcp_manager_t mtcp, tcp_stream* cur_stream, 965 struct pkt_ctx *pctx) 966 { 967 const struct tcphdr* tcph = pctx->p.tcph; 968 969 if (TCP_SEQ_LT(pctx->p.seq, cur_stream->rcv_nxt)) { 970 TRACE_DBG("Stream %d (TCP_ST_LAST_ACK): " 971 "weird seq: %u, expected: %u\n", 972 cur_stream->id, pctx->p.seq, cur_stream->rcv_nxt); 973 return; 974 } 975 976 if (tcph->ack) { 977 if (cur_stream->sndvar->sndbuf) { 978 ProcessACK(mtcp, cur_stream, pctx); 979 } 980 981 if (!cur_stream->sndvar->is_fin_sent) { 982 /* the case that FIN is not sent yet */ 983 /* this is not ack for FIN, ignore */ 984 TRACE_DBG("Stream %d (TCP_ST_LAST_ACK): " 985 "No FIN sent yet.\n", cur_stream->id); 986 #ifdef DBGMSG 987 DumpIPPacket(mtcp, pctx->p.iph, pctx->p.ip_len); 988 #endif 989 #if DUMP_STREAM 990 DumpStream(mtcp, cur_stream); 991 DumpControlList(mtcp, mtcp->n_sender[0]); 992 #endif 993 return; 994 } 995 996 /* check if ACK of FIN */ 997 if (pctx->p.ack_seq == cur_stream->sndvar->fss + 1) { 998 cur_stream->sndvar->snd_una++; 999 if (HAS_STREAM_TYPE(cur_stream, MOS_SOCK_STREAM)) 1000 UpdateRetransmissionTimer(mtcp, cur_stream, pctx->p.cur_ts); 1001 cur_stream->state = TCP_ST_CLOSED_RSVD; 1002 cur_stream->close_reason = TCP_PASSIVE_CLOSE; 1003 cur_stream->cb_events |= MOS_ON_TCP_STATE_CHANGE; 1004 TRACE_STATE("Stream %d: TCP_ST_CLOSED_RSVD\n", 1005 cur_stream->id); 1006 cur_stream->actions |= MOS_ACT_DESTROY; 1007 } else { 1008 TRACE_DBG("Stream %d (TCP_ST_LAST_ACK): Not ACK of FIN. " 1009 "ack_seq: %u, expected: %u\n", 1010 cur_stream->id, pctx->p.ack_seq, cur_stream->sndvar->fss + 1); 1011 //cur_stream->snd_nxt = cur_stream->sndvar->fss; 1012 cur_stream->actions |= MOS_ACT_SEND_CONTROL; 1013 } 1014 } else { 1015 TRACE_DBG("Stream %d (TCP_ST_LAST_ACK): No ACK\n", 1016 cur_stream->id); 1017 //cur_stream->snd_nxt = cur_stream->sndvar->fss; 1018 cur_stream->actions |= MOS_ACT_SEND_CONTROL; 1019 } 1020 } 1021 /*----------------------------------------------------------------------------*/ 1022 static inline void 1023 Handle_TCP_ST_FIN_WAIT_1 (mtcp_manager_t mtcp, tcp_stream* cur_stream, 1024 struct pkt_ctx *pctx) 1025 { 1026 const struct tcphdr* tcph = pctx->p.tcph; 1027 1028 if (TCP_SEQ_LT(pctx->p.seq, cur_stream->rcv_nxt)) { 1029 RAISE_DEBUG_EVENT(mtcp, cur_stream, 1030 "Stream %d (FIN_WAIT_1): " 1031 "weird seq: %u, expected: %u\n", 1032 cur_stream->id, pctx->p.seq, cur_stream->rcv_nxt); 1033 cur_stream->actions |= MOS_ACT_SEND_CONTROL; 1034 return; 1035 } 1036 1037 if (tcph->ack) { 1038 if (cur_stream->sndvar->sndbuf) { 1039 ProcessACK(mtcp, cur_stream, pctx); 1040 } 1041 #if BE_RESILIENT_TO_PACKET_DROP 1042 if (cur_stream->sndvar->is_fin_sent && 1043 ((!HAS_STREAM_TYPE(cur_stream, MOS_SOCK_STREAM) && 1044 pctx->p.ack_seq == cur_stream->sndvar->fss) || 1045 pctx->p.ack_seq == cur_stream->sndvar->fss + 1)) { 1046 1047 #else 1048 if (cur_stream->sndvar->is_fin_sent && 1049 pctx->p.ack_seq == cur_stream->sndvar->fss + 1) { 1050 #endif 1051 cur_stream->sndvar->snd_una = pctx->p.ack_seq; 1052 if (TCP_SEQ_GT(pctx->p.ack_seq, cur_stream->snd_nxt)) { 1053 TRACE_DBG("Stream %d: update snd_nxt to %u\n", 1054 cur_stream->id, pctx->p.ack_seq); 1055 cur_stream->snd_nxt = pctx->p.ack_seq; 1056 } 1057 //cur_stream->sndvar->snd_una++; 1058 //UpdateRetransmissionTimer(mtcp, cur_stream, cur_ts); 1059 cur_stream->sndvar->nrtx = 0; 1060 if (HAS_STREAM_TYPE(cur_stream, MOS_SOCK_STREAM)) 1061 RemoveFromRTOList(mtcp, cur_stream); 1062 cur_stream->state = TCP_ST_FIN_WAIT_2; 1063 cur_stream->cb_events |= MOS_ON_TCP_STATE_CHANGE; 1064 TRACE_STATE("Stream %d: TCP_ST_FIN_WAIT_2\n", 1065 cur_stream->id); 1066 } else { 1067 RAISE_DEBUG_EVENT(mtcp, cur_stream, 1068 "Failed to transit to FIN_WAIT_2, " 1069 "is_fin_sent: %s, ack_seq: %u, sndvar->fss: %u\n", 1070 cur_stream->sndvar->is_fin_sent ? "true" : "false", 1071 pctx->p.ack_seq, cur_stream->sndvar->fss); 1072 } 1073 1074 } else { 1075 RAISE_DEBUG_EVENT(mtcp, cur_stream, 1076 "Failed to transit to FIN_WAIT_2, " 1077 "We got a %s%s%s%s%s%s packet.", 1078 pctx->p.tcph->syn ? "S" : "", 1079 pctx->p.tcph->fin ? "F" : "", 1080 pctx->p.tcph->rst ? "R" : "", 1081 pctx->p.tcph->psh ? "P" : "", 1082 pctx->p.tcph->urg ? "U" : "", 1083 pctx->p.tcph->ack ? "A" : ""); 1084 1085 TRACE_DBG("Stream %d: does not contain an ack!\n", 1086 cur_stream->id); 1087 return; 1088 } 1089 1090 if (pctx->p.payloadlen > 0) { 1091 if (ProcessTCPPayload(mtcp, cur_stream, pctx)) { 1092 /* if return is TRUE, send ACK */ 1093 cur_stream->actions |= MOS_ACT_SEND_ACK_AGG; 1094 } else { 1095 cur_stream->actions |= MOS_ACT_SEND_ACK_NOW; 1096 } 1097 } 1098 1099 if (tcph->fin) { 1100 /* process the FIN only if the sequence is valid */ 1101 /* FIN packet is allowed to push payload (should we check for PSH flag)? */ 1102 if (!cur_stream->buffer_mgmt || 1103 #ifdef BE_RESILIENT_TO_PACKET_DROP 1104 TCP_SEQ_GEQ(pctx->p.seq + pctx->p.payloadlen, cur_stream->rcv_nxt) 1105 #else 1106 pctx->p.seq + pctx->p.payloadlen == cur_stream->rcv_nxt 1107 #endif 1108 ) { 1109 cur_stream->rcv_nxt++; 1110 1111 if (cur_stream->state == TCP_ST_FIN_WAIT_1) { 1112 cur_stream->state = TCP_ST_CLOSING; 1113 cur_stream->cb_events |= MOS_ON_TCP_STATE_CHANGE; 1114 TRACE_STATE("Stream %d: TCP_ST_CLOSING\n", cur_stream->id); 1115 1116 } else if (cur_stream->state == TCP_ST_FIN_WAIT_2) { 1117 cur_stream->state = TCP_ST_TIME_WAIT; 1118 cur_stream->cb_events |= MOS_ON_TCP_STATE_CHANGE; 1119 TRACE_STATE("Stream %d: TCP_ST_TIME_WAIT\n", cur_stream->id); 1120 AddtoTimewaitList(mtcp, cur_stream, pctx->p.cur_ts); 1121 } 1122 cur_stream->actions |= MOS_ACT_SEND_CONTROL; 1123 } else { 1124 RAISE_DEBUG_EVENT(mtcp, cur_stream, 1125 "Expected %u, but received %u (= %u + %u)\n", 1126 cur_stream->rcv_nxt, pctx->p.seq + pctx->p.payloadlen, 1127 pctx->p.seq, pctx->p.payloadlen); 1128 1129 cur_stream->actions |= MOS_ACT_SEND_ACK_NOW; 1130 return; 1131 } 1132 } 1133 } 1134 /*----------------------------------------------------------------------------*/ 1135 static inline void 1136 Handle_TCP_ST_FIN_WAIT_2 (mtcp_manager_t mtcp, tcp_stream* cur_stream, 1137 struct pkt_ctx *pctx) 1138 { 1139 const struct tcphdr* tcph = pctx->p.tcph; 1140 1141 if (tcph->ack) { 1142 if (cur_stream->sndvar->sndbuf) { 1143 ProcessACK(mtcp, cur_stream, pctx); 1144 } 1145 } else { 1146 TRACE_DBG("Stream %d: does not contain an ack!\n", 1147 cur_stream->id); 1148 return; 1149 } 1150 1151 if (pctx->p.payloadlen > 0) { 1152 if (ProcessTCPPayload(mtcp, cur_stream, pctx)) { 1153 /* if return is TRUE, send ACK */ 1154 cur_stream->actions |= MOS_ACT_SEND_ACK_AGG; 1155 } else { 1156 cur_stream->actions |= MOS_ACT_SEND_ACK_NOW; 1157 } 1158 } 1159 1160 if (tcph->fin) { 1161 /* process the FIN only if the sequence is valid */ 1162 /* FIN packet is allowed to push payload (should we check for PSH flag)? */ 1163 if (!cur_stream->buffer_mgmt || 1164 #ifdef BE_RESILIENT_TO_PACKET_DROP 1165 TCP_SEQ_GEQ(pctx->p.seq + pctx->p.payloadlen, cur_stream->rcv_nxt) 1166 #else 1167 pctx->p.seq + pctx->p.payloadlen == cur_stream->rcv_nxt 1168 #endif 1169 ) { 1170 cur_stream->state = TCP_ST_TIME_WAIT; 1171 cur_stream->cb_events |= MOS_ON_TCP_STATE_CHANGE; 1172 cur_stream->rcv_nxt++; 1173 TRACE_STATE("Stream %d: TCP_ST_TIME_WAIT\n", cur_stream->id); 1174 1175 AddtoTimewaitList(mtcp, cur_stream, pctx->p.cur_ts); 1176 cur_stream->actions |= MOS_ACT_SEND_CONTROL; 1177 } 1178 } else { 1179 TRACE_DBG("Stream %d (TCP_ST_FIN_WAIT_2): No FIN. " 1180 "seq: %u, ack_seq: %u, snd_nxt: %u, snd_una: %u\n", 1181 cur_stream->id, pctx->p.seq, pctx->p.ack_seq, 1182 cur_stream->snd_nxt, cur_stream->sndvar->snd_una); 1183 #if DBGMSG 1184 DumpIPPacket(mtcp, pctx->p.iph, pctx->p.ip_len); 1185 #endif 1186 } 1187 1188 } 1189 /*----------------------------------------------------------------------------*/ 1190 static inline void 1191 Handle_TCP_ST_CLOSING (mtcp_manager_t mtcp, tcp_stream* cur_stream, 1192 struct pkt_ctx *pctx) 1193 { 1194 const struct tcphdr* tcph = pctx->p.tcph; 1195 1196 if (tcph->ack) { 1197 if (cur_stream->sndvar->sndbuf) { 1198 ProcessACK(mtcp, cur_stream, pctx); 1199 } 1200 1201 if (!cur_stream->sndvar->is_fin_sent) { 1202 TRACE_DBG("Stream %d (TCP_ST_CLOSING): " 1203 "No FIN sent yet.\n", cur_stream->id); 1204 return; 1205 } 1206 1207 // check if ACK of FIN 1208 if (pctx->p.ack_seq != cur_stream->sndvar->fss + 1) { 1209 #if DBGMSG 1210 TRACE_DBG("Stream %d (TCP_ST_CLOSING): Not ACK of FIN. " 1211 "ack_seq: %u, snd_nxt: %u, snd_una: %u, fss: %u\n", 1212 cur_stream->id, pctx->p.ack_seq, cur_stream->snd_nxt, 1213 cur_stream->sndvar->snd_una, cur_stream->sndvar->fss); 1214 DumpIPPacketToFile(stderr, pctx->p.iph, pctx->p.ip_len); 1215 DumpStream(mtcp, cur_stream); 1216 #endif 1217 //assert(0); 1218 /* if the packet is not the ACK of FIN, ignore */ 1219 return; 1220 } 1221 1222 cur_stream->sndvar->snd_una = pctx->p.ack_seq; 1223 if (HAS_STREAM_TYPE(cur_stream, MOS_SOCK_STREAM)) 1224 UpdateRetransmissionTimer(mtcp, cur_stream, pctx->p.cur_ts); 1225 cur_stream->state = TCP_ST_TIME_WAIT; 1226 cur_stream->cb_events |= MOS_ON_TCP_STATE_CHANGE; 1227 TRACE_STATE("Stream %d: TCP_ST_TIME_WAIT\n", cur_stream->id); 1228 1229 AddtoTimewaitList(mtcp, cur_stream, pctx->p.cur_ts); 1230 1231 } else { 1232 TRACE_DBG("Stream %d (TCP_ST_CLOSING): Not ACK\n", 1233 cur_stream->id); 1234 return; 1235 } 1236 } 1237 /*----------------------------------------------------------------------------*/ 1238 void 1239 UpdateRecvTCPContext(mtcp_manager_t mtcp, struct tcp_stream *cur_stream, 1240 struct pkt_ctx *pctx) 1241 { 1242 struct tcphdr* tcph = pctx->p.tcph; 1243 int ret; 1244 1245 assert(cur_stream); 1246 1247 /* Validate sequence. if not valid, ignore the packet */ 1248 if (cur_stream->state > TCP_ST_SYN_RCVD) { 1249 1250 ret = ValidateSequence(mtcp, cur_stream, pctx); 1251 if (!ret) { 1252 TRACE_DBG("Stream %d: Unexpected sequence: %u, expected: %u\n", 1253 cur_stream->id, pctx->p.seq, cur_stream->rcv_nxt); 1254 #ifdef DBGMSG 1255 DumpIPPacket(mtcp, pctx->p.iph, pctx->p.ip_len); 1256 #endif 1257 #if DUMP_STREAM 1258 DumpStream(mtcp, cur_stream); 1259 #endif 1260 /* cur_stream->cb_events |= MOS_ON_ERROR; */ 1261 } 1262 } 1263 /* Update receive window size */ 1264 if (tcph->syn) { 1265 cur_stream->sndvar->peer_wnd = pctx->p.window; 1266 } else { 1267 cur_stream->sndvar->peer_wnd = 1268 (uint32_t)pctx->p.window << cur_stream->sndvar->wscale_peer; 1269 } 1270 1271 cur_stream->last_active_ts = pctx->p.cur_ts; 1272 if (HAS_STREAM_TYPE(cur_stream, MOS_SOCK_STREAM)) 1273 UpdateTimeoutList(mtcp, cur_stream); 1274 1275 /* Process RST: process here only if state > TCP_ST_SYN_SENT */ 1276 if (tcph->rst) { 1277 cur_stream->have_reset = TRUE; 1278 if (cur_stream->state > TCP_ST_SYN_SENT) { 1279 if (ProcessRST(mtcp, cur_stream, pctx)) { 1280 return; 1281 } 1282 } 1283 } 1284 1285 if (HAS_STREAM_TYPE(cur_stream, MOS_SOCK_MONITOR_STREAM_ACTIVE) && 1286 pctx->p.tcph->fin) { 1287 if (cur_stream->state == TCP_ST_CLOSE_WAIT || 1288 cur_stream->state == TCP_ST_LAST_ACK || 1289 cur_stream->state == TCP_ST_CLOSING || 1290 cur_stream->state == TCP_ST_TIME_WAIT) { 1291 /* Handle retransmitted FIN packet */ 1292 if (pctx->p.seq == cur_stream->pair_stream->sndvar->fss) { 1293 TRACE_DBG("FIN retransmit! (seq = %u / fss = %u)\n", 1294 pctx->p.seq, cur_stream->pair_stream->sndvar->fss); 1295 cur_stream->cb_events |= MOS_ON_REXMIT; 1296 } 1297 } 1298 } 1299 1300 switch (cur_stream->state) { 1301 case TCP_ST_LISTEN: 1302 Handle_TCP_ST_LISTEN(mtcp, cur_stream, pctx); 1303 break; 1304 1305 case TCP_ST_SYN_SENT: 1306 Handle_TCP_ST_SYN_SENT(mtcp, cur_stream, pctx); 1307 break; 1308 1309 case TCP_ST_SYN_RCVD: 1310 /* SYN retransmit implies our SYN/ACK was lost. Resend */ 1311 if (tcph->syn && pctx->p.seq == cur_stream->rcvvar->irs) 1312 Handle_TCP_ST_LISTEN(mtcp, cur_stream, pctx); 1313 else { 1314 Handle_TCP_ST_SYN_RCVD(mtcp, cur_stream, pctx); 1315 if (pctx->p.payloadlen > 0 && cur_stream->state == TCP_ST_ESTABLISHED) 1316 Handle_TCP_ST_ESTABLISHED(mtcp, cur_stream, pctx); 1317 } 1318 break; 1319 1320 case TCP_ST_ESTABLISHED: 1321 Handle_TCP_ST_ESTABLISHED(mtcp, cur_stream, pctx); 1322 break; 1323 1324 case TCP_ST_CLOSE_WAIT: 1325 Handle_TCP_ST_CLOSE_WAIT(mtcp, cur_stream, pctx); 1326 break; 1327 1328 case TCP_ST_LAST_ACK: 1329 Handle_TCP_ST_LAST_ACK(mtcp, cur_stream, pctx); 1330 break; 1331 1332 case TCP_ST_FIN_WAIT_1: 1333 Handle_TCP_ST_FIN_WAIT_1(mtcp, cur_stream, pctx); 1334 break; 1335 1336 case TCP_ST_FIN_WAIT_2: 1337 Handle_TCP_ST_FIN_WAIT_2(mtcp, cur_stream, pctx); 1338 break; 1339 1340 case TCP_ST_CLOSING: 1341 Handle_TCP_ST_CLOSING(mtcp, cur_stream, pctx); 1342 break; 1343 1344 case TCP_ST_TIME_WAIT: 1345 /* the only thing that can arrive in this state is a retransmission 1346 of the remote FIN. Acknowledge it, and restart the 2 MSL timeout */ 1347 if (cur_stream->on_timewait_list) { 1348 RemoveFromTimewaitList(mtcp, cur_stream); 1349 AddtoTimewaitList(mtcp, cur_stream, pctx->p.cur_ts); 1350 } 1351 cur_stream->actions |= MOS_ACT_SEND_CONTROL; 1352 break; 1353 1354 case TCP_ST_CLOSED: 1355 case TCP_ST_CLOSED_RSVD: 1356 break; 1357 1358 default: 1359 break; 1360 } 1361 1362 TRACE_STATE("Stream %d: Events: %0lx, Action: %0x\n", 1363 cur_stream->id, cur_stream->cb_events, cur_stream->actions); 1364 return; 1365 } 1366 /*----------------------------------------------------------------------------*/ 1367 void 1368 DoActionEndTCPPacket(mtcp_manager_t mtcp, struct tcp_stream *cur_stream, 1369 struct pkt_ctx *pctx) 1370 { 1371 int i; 1372 1373 for (i = 1; i < MOS_ACT_CNT; i = i << 1) { 1374 1375 if (cur_stream->actions & i) { 1376 switch(i) { 1377 case MOS_ACT_SEND_DATA: 1378 AddtoSendList(mtcp, cur_stream); 1379 break; 1380 case MOS_ACT_SEND_ACK_NOW: 1381 EnqueueACK(mtcp, cur_stream, pctx->p.cur_ts, ACK_OPT_NOW); 1382 break; 1383 case MOS_ACT_SEND_ACK_AGG: 1384 EnqueueACK(mtcp, cur_stream, pctx->p.cur_ts, ACK_OPT_AGGREGATE); 1385 break; 1386 case MOS_ACT_SEND_CONTROL: 1387 AddtoControlList(mtcp, cur_stream, pctx->p.cur_ts); 1388 break; 1389 case MOS_ACT_SEND_RST: 1390 if (cur_stream->state <= TCP_ST_SYN_SENT) 1391 SendTCPPacketStandalone(mtcp, 1392 pctx->p.iph->daddr, pctx->p.tcph->dest, 1393 pctx->p.iph->saddr, pctx->p.tcph->source, 1394 0, pctx->p.seq + 1, 0, TCP_FLAG_RST | TCP_FLAG_ACK, 1395 NULL, 0, pctx->p.cur_ts, 0, 0, -1); 1396 else 1397 SendTCPPacketStandalone(mtcp, 1398 pctx->p.iph->daddr, pctx->p.tcph->dest, 1399 pctx->p.iph->saddr, pctx->p.tcph->source, 1400 pctx->p.ack_seq, 0, 0, TCP_FLAG_RST | TCP_FLAG_ACK, 1401 NULL, 0, pctx->p.cur_ts, 0, 0, -1); 1402 break; 1403 case MOS_ACT_DESTROY: 1404 DestroyTCPStream(mtcp, cur_stream); 1405 break; 1406 default: 1407 assert(1); 1408 break; 1409 } 1410 } 1411 } 1412 1413 cur_stream->actions = 0; 1414 } 1415 /*----------------------------------------------------------------------------*/ 1416 /** 1417 * Called (when monitoring mode is enabled).. for every outgoing packet to the 1418 * NIC. 1419 */ 1420 inline void 1421 UpdatePassiveRecvTCPContext(mtcp_manager_t mtcp, struct tcp_stream *cur_stream, 1422 struct pkt_ctx *pctx) 1423 { 1424 UpdateRecvTCPContext(mtcp, cur_stream, pctx); 1425 } 1426 /*----------------------------------------------------------------------------*/ 1427 /* NOTE TODO: This event prediction is additional overhaed of HK_RCV hook. 1428 * We can transparently optimize this by disabling prediction of events which 1429 * are not monitored by anyone. */ 1430 inline void 1431 PreRecvTCPEventPrediction(mtcp_manager_t mtcp, struct pkt_ctx *pctx, 1432 struct tcp_stream *recvside_stream) 1433 { 1434 tcp_rb_overlapchk(mtcp, pctx, recvside_stream); 1435 } 1436 /*----------------------------------------------------------------------------*/ 1437