1 #include <assert.h> 2 3 #include "mos_api.h" 4 #include "tcp_util.h" 5 #include "tcp_in.h" 6 #include "tcp_out.h" 7 #include "tcp_ring_buffer.h" 8 #include "eventpoll.h" 9 #include "debug.h" 10 #include "timer.h" 11 #include "ip_in.h" 12 #include "tcp_rb.h" 13 #include "config.h" 14 #include "scalable_event.h" 15 16 #define MAX(a, b) ((a)>(b)?(a):(b)) 17 #define MIN(a, b) ((a)<(b)?(a):(b)) 18 19 #define RECOVERY_AFTER_LOSS TRUE 20 #define SELECTIVE_WRITE_EVENT_NOTIFY TRUE 21 /*----------------------------------------------------------------------------*/ 22 static inline void 23 Handle_TCP_ST_ESTABLISHED (mtcp_manager_t mtcp, tcp_stream* cur_stream, 24 struct pkt_ctx *pctx); 25 /*----------------------------------------------------------------------------*/ 26 static inline int 27 FilterSYNPacket(mtcp_manager_t mtcp, uint32_t ip, uint16_t port) 28 { 29 struct sockaddr_in *addr; 30 31 /* TODO: This listening logic should be revised */ 32 33 /* if not listening, drop */ 34 if (!mtcp->listener) { 35 return FALSE; 36 } 37 38 /* if not the address we want, drop */ 39 addr = &mtcp->listener->socket->saddr; 40 if (addr->sin_port == port) { 41 if (addr->sin_addr.s_addr != INADDR_ANY) { 42 if (ip == addr->sin_addr.s_addr) { 43 return TRUE; 44 } 45 return FALSE; 46 } else { 47 int i; 48 49 for (i = 0; i < g_config.mos->netdev_table->num; i++) { 50 if (ip == g_config.mos->netdev_table->ent[i]->ip_addr) { 51 return TRUE; 52 } 53 } 54 return FALSE; 55 } 56 } 57 58 return FALSE; 59 } 60 /*----------------------------------------------------------------------------*/ 61 static inline int 62 HandleActiveOpen(mtcp_manager_t mtcp, tcp_stream *cur_stream, 63 struct pkt_ctx *pctx) 64 { 65 const struct tcphdr* tcph = pctx->p.tcph; 66 67 cur_stream->rcvvar->irs = pctx->p.seq; 68 cur_stream->snd_nxt = pctx->p.ack_seq; 69 cur_stream->sndvar->peer_wnd = pctx->p.window; 70 cur_stream->rcvvar->snd_wl1 = cur_stream->rcvvar->irs - 1; 71 cur_stream->rcv_nxt = cur_stream->rcvvar->irs + 1; 72 cur_stream->rcvvar->last_ack_seq = pctx->p.ack_seq; 73 ParseTCPOptions(cur_stream, pctx->p.cur_ts, (uint8_t *)tcph + TCP_HEADER_LEN, 74 (tcph->doff << 2) - TCP_HEADER_LEN); 75 cur_stream->sndvar->cwnd = ((cur_stream->sndvar->cwnd == 1)? 76 (cur_stream->sndvar->mss * 2): cur_stream->sndvar->mss); 77 cur_stream->sndvar->ssthresh = cur_stream->sndvar->mss * 10; 78 if (HAS_STREAM_TYPE(cur_stream, MOS_SOCK_STREAM)) 79 UpdateRetransmissionTimer(mtcp, cur_stream, pctx->p.cur_ts); 80 81 return TRUE; 82 } 83 /*----------------------------------------------------------------------------*/ 84 /* ValidateSequence: validates sequence number of the segment */ 85 /* Return: TRUE if acceptable, FALSE if not acceptable */ 86 /*----------------------------------------------------------------------------*/ 87 static inline int 88 ValidateSequence(mtcp_manager_t mtcp, tcp_stream *cur_stream, 89 struct pkt_ctx *pctx) 90 { 91 const struct tcphdr* tcph = pctx->p.tcph; 92 93 /* Protect Against Wrapped Sequence number (PAWS) */ 94 if (!tcph->rst && cur_stream->saw_timestamp) { 95 struct tcp_timestamp ts; 96 97 if (!ParseTCPTimestamp(cur_stream, &ts, 98 (uint8_t *)tcph + TCP_HEADER_LEN, 99 (tcph->doff << 2) - TCP_HEADER_LEN)) { 100 /* if there is no timestamp */ 101 /* TODO: implement here */ 102 TRACE_DBG("No timestamp found.\n"); 103 return FALSE; 104 } 105 106 /* RFC1323: if SEG.TSval < TS.Recent, drop and send ack */ 107 if (TCP_SEQ_LT(ts.ts_val, cur_stream->rcvvar->ts_recent)) { 108 /* TODO: ts_recent should be invalidated 109 before timestamp wraparound for long idle flow */ 110 TRACE_DBG("PAWS Detect wrong timestamp. " 111 "seq: %u, ts_val: %u, prev: %u\n", 112 pctx->p.seq, ts.ts_val, cur_stream->rcvvar->ts_recent); 113 cur_stream->actions |= MOS_ACT_SEND_ACK_NOW; 114 return FALSE; 115 } else { 116 /* valid timestamp */ 117 if (TCP_SEQ_GT(ts.ts_val, cur_stream->rcvvar->ts_recent)) { 118 TRACE_TSTAMP("Timestamp update. cur: %u, prior: %u " 119 "(time diff: %uus)\n", 120 ts.ts_val, cur_stream->rcvvar->ts_recent, 121 TS_TO_USEC(pctx->p.cur_ts - cur_stream->rcvvar->ts_last_ts_upd)); 122 cur_stream->rcvvar->ts_last_ts_upd = pctx->p.cur_ts; 123 } 124 125 cur_stream->rcvvar->ts_recent = ts.ts_val; 126 cur_stream->rcvvar->ts_lastack_rcvd = ts.ts_ref; 127 } 128 } 129 130 /* TCP sequence validation */ 131 if (!TCP_SEQ_BETWEEN(pctx->p.seq + pctx->p.payloadlen, cur_stream->rcv_nxt, 132 cur_stream->rcv_nxt + cur_stream->rcvvar->rcv_wnd)) { 133 134 /* if RST bit is set, ignore the segment */ 135 if (tcph->rst) 136 return FALSE; 137 138 if (cur_stream->state == TCP_ST_ESTABLISHED) { 139 /* check if it is to get window advertisement */ 140 if (pctx->p.seq + 1 == cur_stream->rcv_nxt) { 141 TRACE_DBG("Window update request. (seq: %u, rcv_wnd: %u)\n", 142 pctx->p.seq, cur_stream->rcvvar->rcv_wnd); 143 cur_stream->actions |= MOS_ACT_SEND_ACK_AGG; 144 return FALSE; 145 146 } 147 148 if (TCP_SEQ_LEQ(pctx->p.seq, cur_stream->rcv_nxt)) { 149 cur_stream->actions |= MOS_ACT_SEND_ACK_AGG; 150 } else { 151 cur_stream->actions |= MOS_ACT_SEND_ACK_NOW; 152 } 153 } else { 154 if (cur_stream->state == TCP_ST_TIME_WAIT) { 155 TRACE_DBG("Stream %d: tw expire update to %u\n", 156 cur_stream->id, cur_stream->rcvvar->ts_tw_expire); 157 AddtoTimewaitList(mtcp, cur_stream, pctx->p.cur_ts); 158 } 159 cur_stream->actions |= MOS_ACT_SEND_CONTROL; 160 } 161 return FALSE; 162 } 163 164 return TRUE; 165 } 166 /*----------------------------------------------------------------------------*/ 167 static inline void 168 NotifyConnectionReset(mtcp_manager_t mtcp, tcp_stream *cur_stream) 169 { 170 TRACE_DBG("Stream %d: Notifying connection reset.\n", cur_stream->id); 171 /* TODO: implement this function */ 172 /* signal to user "connection reset" */ 173 } 174 /*----------------------------------------------------------------------------*/ 175 static inline int 176 ProcessRST(mtcp_manager_t mtcp, tcp_stream *cur_stream, 177 struct pkt_ctx *pctx) 178 { 179 /* TODO: we need reset validation logic */ 180 /* the sequence number of a RST should be inside window */ 181 /* (in SYN_SENT state, it should ack the previous SYN */ 182 183 TRACE_DBG("Stream %d: TCP RESET (%s)\n", 184 cur_stream->id, TCPStateToString(cur_stream)); 185 #if DUMP_STREAM 186 DumpStream(mtcp, cur_stream); 187 #endif 188 189 if (cur_stream->state <= TCP_ST_SYN_SENT) { 190 /* not handled here */ 191 return FALSE; 192 } 193 194 if (cur_stream->state == TCP_ST_SYN_RCVD) { 195 /* ACK number of last sent ACK packet == rcv_nxt + 1*/ 196 if (pctx->p.seq == 0 || 197 #ifdef BE_RESILIENT_TO_PACKET_DROP 198 pctx->p.seq == cur_stream->rcv_nxt + 1 || 199 #endif 200 pctx->p.ack_seq == cur_stream->snd_nxt) 201 { 202 cur_stream->state = TCP_ST_CLOSED_RSVD; 203 cur_stream->cb_events |= MOS_ON_TCP_STATE_CHANGE; 204 cur_stream->close_reason = TCP_RESET; 205 cur_stream->actions |= MOS_ACT_DESTROY; 206 } else { 207 RAISE_DEBUG_EVENT(mtcp, cur_stream, 208 "(SYN_RCVD): Ignore invalid RST. " 209 "ack_seq expected: %u, ack_seq rcvd: %u\n", 210 cur_stream->rcv_nxt + 1, pctx->p.ack_seq); 211 } 212 return TRUE; 213 } 214 215 /* if the application is already closed the connection, 216 just destroy the it */ 217 if (cur_stream->state == TCP_ST_FIN_WAIT_1 || 218 cur_stream->state == TCP_ST_FIN_WAIT_2 || 219 cur_stream->state == TCP_ST_LAST_ACK || 220 cur_stream->state == TCP_ST_CLOSING || 221 cur_stream->state == TCP_ST_TIME_WAIT) { 222 cur_stream->state = TCP_ST_CLOSED_RSVD; 223 cur_stream->close_reason = TCP_ACTIVE_CLOSE; 224 cur_stream->cb_events |= MOS_ON_TCP_STATE_CHANGE; 225 cur_stream->actions |= MOS_ACT_DESTROY; 226 return TRUE; 227 } 228 229 if (cur_stream->state >= TCP_ST_ESTABLISHED && 230 cur_stream->state <= TCP_ST_CLOSE_WAIT) { 231 /* ESTABLISHED, FIN_WAIT_1, FIN_WAIT_2, CLOSE_WAIT */ 232 /* TODO: flush all the segment queues */ 233 //NotifyConnectionReset(mtcp, cur_stream); 234 } 235 236 if (!(cur_stream->sndvar->on_closeq || cur_stream->sndvar->on_closeq_int || 237 cur_stream->sndvar->on_resetq || cur_stream->sndvar->on_resetq_int)) { 238 //cur_stream->state = TCP_ST_CLOSED_RSVD; 239 //cur_stream->actions |= MOS_ACT_DESTROY; 240 cur_stream->state = TCP_ST_CLOSED_RSVD; 241 cur_stream->cb_events |= MOS_ON_TCP_STATE_CHANGE; 242 cur_stream->close_reason = TCP_RESET; 243 if (HAS_STREAM_TYPE(cur_stream, MOS_SOCK_STREAM)) 244 RaiseCloseEvent(mtcp, cur_stream); 245 } 246 247 return TRUE; 248 } 249 /*----------------------------------------------------------------------------*/ 250 inline void 251 EstimateRTT(mtcp_manager_t mtcp, tcp_stream *cur_stream, uint32_t mrtt) 252 { 253 /* This function should be called for not retransmitted packets */ 254 /* TODO: determine tcp_rto_min */ 255 #define TCP_RTO_MIN 0 256 long m = mrtt; 257 uint32_t tcp_rto_min = TCP_RTO_MIN; 258 struct tcp_recv_vars *rcvvar = cur_stream->rcvvar; 259 260 if (m == 0) { 261 m = 1; 262 } 263 if (rcvvar->srtt != 0) { 264 /* rtt = 7/8 rtt + 1/8 new */ 265 m -= (rcvvar->srtt >> 3); 266 rcvvar->srtt += m; 267 if (m < 0) { 268 m = -m; 269 m -= (rcvvar->mdev >> 2); 270 if (m > 0) { 271 m >>= 3; 272 } 273 } else { 274 m -= (rcvvar->mdev >> 2); 275 } 276 rcvvar->mdev += m; 277 if (rcvvar->mdev > rcvvar->mdev_max) { 278 rcvvar->mdev_max = rcvvar->mdev; 279 if (rcvvar->mdev_max > rcvvar->rttvar) { 280 rcvvar->rttvar = rcvvar->mdev_max; 281 } 282 } 283 if (TCP_SEQ_GT(cur_stream->sndvar->snd_una, rcvvar->rtt_seq)) { 284 if (rcvvar->mdev_max < rcvvar->rttvar) { 285 rcvvar->rttvar -= (rcvvar->rttvar - rcvvar->mdev_max) >> 2; 286 } 287 rcvvar->rtt_seq = cur_stream->snd_nxt; 288 rcvvar->mdev_max = tcp_rto_min; 289 } 290 } else { 291 /* fresh measurement */ 292 rcvvar->srtt = m << 3; 293 rcvvar->mdev = m << 1; 294 rcvvar->mdev_max = rcvvar->rttvar = MAX(rcvvar->mdev, tcp_rto_min); 295 rcvvar->rtt_seq = cur_stream->snd_nxt; 296 } 297 298 TRACE_RTT("mrtt: %u (%uus), srtt: %u (%ums), mdev: %u, mdev_max: %u, " 299 "rttvar: %u, rtt_seq: %u\n", mrtt, mrtt * TIME_TICK, 300 rcvvar->srtt, TS_TO_MSEC((rcvvar->srtt) >> 3), rcvvar->mdev, 301 rcvvar->mdev_max, rcvvar->rttvar, rcvvar->rtt_seq); 302 } 303 /*----------------------------------------------------------------------------*/ 304 static inline void 305 ProcessACK(mtcp_manager_t mtcp, tcp_stream *cur_stream, 306 struct pkt_ctx *pctx) 307 { 308 const struct tcphdr* tcph = pctx->p.tcph; 309 uint32_t seq = pctx->p.seq; 310 uint32_t ack_seq = pctx->p.ack_seq; 311 struct tcp_send_vars *sndvar = cur_stream->sndvar; 312 uint32_t cwindow, cwindow_prev; 313 uint32_t rmlen; 314 uint32_t snd_wnd_prev; 315 uint32_t right_wnd_edge; 316 uint8_t dup; 317 318 cwindow = pctx->p.window; 319 if (!tcph->syn) { 320 cwindow = cwindow << sndvar->wscale_peer; 321 } 322 right_wnd_edge = sndvar->peer_wnd + cur_stream->rcvvar->snd_wl2; 323 324 /* If ack overs the sending buffer, return */ 325 if (cur_stream->state == TCP_ST_FIN_WAIT_1 || 326 cur_stream->state == TCP_ST_FIN_WAIT_2 || 327 cur_stream->state == TCP_ST_CLOSING || 328 cur_stream->state == TCP_ST_CLOSE_WAIT || 329 cur_stream->state == TCP_ST_LAST_ACK) { 330 if (sndvar->is_fin_sent && ack_seq == sndvar->fss + 1) { 331 ack_seq--; 332 } 333 } 334 335 if (TCP_SEQ_GT(ack_seq, sndvar->sndbuf->head_seq + sndvar->sndbuf->len)) { 336 TRACE_DBG("Stream %d (%s): invalid acknologement. " 337 "ack_seq: %u, possible max_ack_seq: %u\n", cur_stream->id, 338 TCPStateToString(cur_stream), ack_seq, 339 sndvar->sndbuf->head_seq + sndvar->sndbuf->len); 340 return; 341 } 342 343 #ifdef BE_RESILIENT_TO_PACKET_DROP 344 if (TCP_SEQ_GT(seq + pctx->p.payloadlen, cur_stream->rcv_nxt)) 345 cur_stream->rcv_nxt = seq + pctx->p.payloadlen; 346 #endif 347 348 /* Update window */ 349 if (TCP_SEQ_LT(cur_stream->rcvvar->snd_wl1, seq) || 350 (cur_stream->rcvvar->snd_wl1 == seq && 351 TCP_SEQ_LT(cur_stream->rcvvar->snd_wl2, ack_seq)) || 352 (cur_stream->rcvvar->snd_wl2 == ack_seq && 353 cwindow > sndvar->peer_wnd)) { 354 cwindow_prev = sndvar->peer_wnd; 355 sndvar->peer_wnd = cwindow; 356 cur_stream->rcvvar->snd_wl1 = seq; 357 cur_stream->rcvvar->snd_wl2 = ack_seq; 358 TRACE_CLWND("Window update. " 359 "ack: %u, peer_wnd: %u, snd_nxt-snd_una: %u\n", 360 ack_seq, cwindow, cur_stream->snd_nxt - sndvar->snd_una); 361 if (cwindow_prev < cur_stream->snd_nxt - sndvar->snd_una && 362 sndvar->peer_wnd >= cur_stream->snd_nxt - sndvar->snd_una) { 363 TRACE_CLWND("%u Broadcasting client window update! " 364 "ack_seq: %u, peer_wnd: %u (before: %u), " 365 "(snd_nxt - snd_una: %u)\n", 366 cur_stream->id, ack_seq, sndvar->peer_wnd, cwindow_prev, 367 cur_stream->snd_nxt - sndvar->snd_una); 368 if (HAS_STREAM_TYPE(cur_stream, MOS_SOCK_STREAM)) 369 RaiseWriteEvent(mtcp, cur_stream); 370 } 371 } 372 373 /* Check duplicated ack count */ 374 /* Duplicated ack if 375 1) ack_seq is old 376 2) payload length is 0. 377 3) advertised window not changed. 378 4) there is outstanding unacknowledged data 379 5) ack_seq == snd_una 380 */ 381 382 dup = FALSE; 383 if (TCP_SEQ_LT(ack_seq, cur_stream->snd_nxt)) { 384 if (ack_seq == cur_stream->rcvvar->last_ack_seq && pctx->p.payloadlen == 0) { 385 if (cur_stream->rcvvar->snd_wl2 + sndvar->peer_wnd == right_wnd_edge) { 386 if (cur_stream->rcvvar->dup_acks + 1 > cur_stream->rcvvar->dup_acks) { 387 cur_stream->rcvvar->dup_acks++; 388 } 389 dup = TRUE; 390 } 391 } 392 } 393 if (!dup) { 394 cur_stream->rcvvar->dup_acks = 0; 395 cur_stream->rcvvar->last_ack_seq = ack_seq; 396 } 397 398 /* Fast retransmission */ 399 if (dup && cur_stream->rcvvar->dup_acks == 3) { 400 TRACE_LOSS("Triple duplicated ACKs!! ack_seq: %u\n", ack_seq); 401 if (TCP_SEQ_LT(ack_seq, cur_stream->snd_nxt)) { 402 TRACE_LOSS("Reducing snd_nxt from %u to %u\n", 403 cur_stream->snd_nxt, ack_seq); 404 #if RTM_STAT 405 sndvar->rstat.tdp_ack_cnt++; 406 sndvar->rstat.tdp_ack_bytes += (cur_stream->snd_nxt - ack_seq); 407 #endif 408 if (ack_seq != sndvar->snd_una) { 409 TRACE_DBG("ack_seq and snd_una mismatch on tdp ack. " 410 "ack_seq: %u, snd_una: %u\n", 411 ack_seq, sndvar->snd_una); 412 } 413 cur_stream->snd_nxt = ack_seq; 414 } 415 416 /* update congestion control variables */ 417 /* ssthresh to half of min of cwnd and peer wnd */ 418 sndvar->ssthresh = MIN(sndvar->cwnd, sndvar->peer_wnd) / 2; 419 if (sndvar->ssthresh < 2 * sndvar->mss) { 420 sndvar->ssthresh = 2 * sndvar->mss; 421 } 422 sndvar->cwnd = sndvar->ssthresh + 3 * sndvar->mss; 423 TRACE_CONG("Fast retransmission. cwnd: %u, ssthresh: %u\n", 424 sndvar->cwnd, sndvar->ssthresh); 425 426 /* count number of retransmissions */ 427 if (sndvar->nrtx < TCP_MAX_RTX) { 428 sndvar->nrtx++; 429 } else { 430 TRACE_DBG("Exceed MAX_RTX.\n"); 431 } 432 433 cur_stream->actions |= MOS_ACT_SEND_DATA; 434 435 } else if (cur_stream->rcvvar->dup_acks > 3) { 436 /* Inflate congestion window until before overflow */ 437 if ((uint32_t)(sndvar->cwnd + sndvar->mss) > sndvar->cwnd) { 438 sndvar->cwnd += sndvar->mss; 439 TRACE_CONG("Dupack cwnd inflate. cwnd: %u, ssthresh: %u\n", 440 sndvar->cwnd, sndvar->ssthresh); 441 } 442 } 443 444 #if TCP_OPT_SACK_ENABLED 445 ParseSACKOption(cur_stream, ack_seq, (uint8_t *)tcph + TCP_HEADER_LEN, 446 (tcph->doff << 2) - TCP_HEADER_LEN); 447 #endif /* TCP_OPT_SACK_ENABLED */ 448 449 #if RECOVERY_AFTER_LOSS 450 /* updating snd_nxt (when recovered from loss) */ 451 if (TCP_SEQ_GT(ack_seq, cur_stream->snd_nxt)) { 452 #if RTM_STAT 453 sndvar->rstat.ack_upd_cnt++; 454 sndvar->rstat.ack_upd_bytes += (ack_seq - cur_stream->snd_nxt); 455 #endif 456 TRACE_LOSS("Updating snd_nxt from %u to %u\n", 457 cur_stream->snd_nxt, ack_seq); 458 cur_stream->snd_nxt = ack_seq; 459 if (sndvar->sndbuf->len == 0) { 460 if (HAS_STREAM_TYPE(cur_stream, MOS_SOCK_STREAM)) 461 RemoveFromSendList(mtcp, cur_stream); 462 } 463 } 464 #endif 465 466 /* If ack_seq is previously acked, return */ 467 if (TCP_SEQ_GEQ(sndvar->sndbuf->head_seq, ack_seq)) { 468 return; 469 } 470 471 /* Remove acked sequence from send buffer */ 472 rmlen = ack_seq - sndvar->sndbuf->head_seq; 473 if (rmlen > 0) { 474 /* Routine goes here only if there is new payload (not retransmitted) */ 475 uint16_t packets; 476 477 /* If acks new data */ 478 packets = rmlen / sndvar->eff_mss; 479 if ((rmlen / sndvar->eff_mss) * sndvar->eff_mss > rmlen) { 480 packets++; 481 } 482 483 /* Estimate RTT and calculate rto */ 484 if (cur_stream->saw_timestamp) { 485 EstimateRTT(mtcp, cur_stream, 486 pctx->p.cur_ts - cur_stream->rcvvar->ts_lastack_rcvd); 487 sndvar->rto = (cur_stream->rcvvar->srtt >> 3) + cur_stream->rcvvar->rttvar; 488 assert(sndvar->rto > 0); 489 } else { 490 //TODO: Need to implement timestamp estimation without timestamp 491 TRACE_RTT("NOT IMPLEMENTED.\n"); 492 } 493 494 /* Update congestion control variables */ 495 if (cur_stream->state >= TCP_ST_ESTABLISHED) { 496 if (sndvar->cwnd < sndvar->ssthresh) { 497 if ((sndvar->cwnd + sndvar->mss) > sndvar->cwnd) { 498 sndvar->cwnd += (sndvar->mss * packets); 499 } 500 TRACE_CONG("slow start cwnd: %u, ssthresh: %u\n", 501 sndvar->cwnd, sndvar->ssthresh); 502 } else { 503 uint32_t new_cwnd = sndvar->cwnd + 504 packets * sndvar->mss * sndvar->mss / 505 sndvar->cwnd; 506 if (new_cwnd > sndvar->cwnd) { 507 sndvar->cwnd = new_cwnd; 508 } 509 //TRACE_CONG("congestion avoidance cwnd: %u, ssthresh: %u\n", 510 // sndvar->cwnd, sndvar->ssthresh); 511 } 512 } 513 514 if (SBUF_LOCK(&sndvar->write_lock)) { 515 if (errno == EDEADLK) 516 perror("ProcessACK: write_lock blocked\n"); 517 assert(0); 518 } 519 if (HAS_STREAM_TYPE(cur_stream, MOS_SOCK_STREAM)) 520 SBRemove(mtcp->rbm_snd, sndvar->sndbuf, rmlen); 521 sndvar->snd_una = ack_seq; 522 snd_wnd_prev = sndvar->snd_wnd; 523 sndvar->snd_wnd = sndvar->sndbuf->size - sndvar->sndbuf->len; 524 525 /* If there was no available sending window */ 526 /* notify the newly available window to application */ 527 #if SELECTIVE_WRITE_EVENT_NOTIFY 528 if (snd_wnd_prev <= 0) { 529 #endif /* SELECTIVE_WRITE_EVENT_NOTIFY */ 530 if (HAS_STREAM_TYPE(cur_stream, MOS_SOCK_STREAM)) 531 RaiseWriteEvent(mtcp, cur_stream); 532 #if SELECTIVE_WRITE_EVENT_NOTIFY 533 } 534 #endif /* SELECTIVE_WRITE_EVENT_NOTIFY */ 535 536 SBUF_UNLOCK(&sndvar->write_lock); 537 if (HAS_STREAM_TYPE(cur_stream, MOS_SOCK_STREAM)) 538 UpdateRetransmissionTimer(mtcp, cur_stream, pctx->p.cur_ts); 539 } 540 } 541 /*----------------------------------------------------------------------------*/ 542 /* ProcessTCPPayload: merges TCP payload using receive ring buffer */ 543 /* Return: TRUE (1) in normal case, FALSE (0) if immediate ACK is required */ 544 /* CAUTION: should only be called at ESTABLISHED, FIN_WAIT_1, FIN_WAIT_2 */ 545 /*----------------------------------------------------------------------------*/ 546 static inline int 547 ProcessTCPPayload(mtcp_manager_t mtcp, tcp_stream *cur_stream, 548 struct pkt_ctx *pctx) 549 { 550 struct tcp_recv_vars *rcvvar = cur_stream->rcvvar; 551 uint32_t prev_rcv_nxt; 552 int ret = -1; 553 bool read_lock; 554 struct socket_map *walk; 555 556 if (!cur_stream->buffer_mgmt) 557 return FALSE; 558 559 /* if seq and segment length is lower than rcv_nxt, ignore and send ack */ 560 if (TCP_SEQ_LT(pctx->p.seq + pctx->p.payloadlen, cur_stream->rcv_nxt)) { 561 SOCKQ_FOREACH_START(walk, &cur_stream->msocks) { 562 HandleCallback(mtcp, MOS_NULL, walk, cur_stream->side, 563 pctx, MOS_ON_ERROR); 564 } SOCKQ_FOREACH_END; 565 return FALSE; 566 } 567 /* if payload exceeds receiving buffer, drop and send ack */ 568 if (TCP_SEQ_GT(pctx->p.seq + pctx->p.payloadlen, cur_stream->rcv_nxt + rcvvar->rcv_wnd)) { 569 SOCKQ_FOREACH_START(walk, &cur_stream->msocks) { 570 HandleCallback(mtcp, MOS_NULL, walk, cur_stream->side, 571 pctx, MOS_ON_ERROR); 572 } SOCKQ_FOREACH_END; 573 return FALSE; 574 } 575 576 /* allocate receive buffer if not exist */ 577 if (!rcvvar->rcvbuf) { 578 rcvvar->rcvbuf = tcprb_new(mtcp->bufseg_pool, g_config.mos->rmem_size, cur_stream->buffer_mgmt); 579 if (!rcvvar->rcvbuf) { 580 TRACE_ERROR("Stream %d: Failed to allocate receive buffer.\n", 581 cur_stream->id); 582 cur_stream->state = TCP_ST_CLOSED_RSVD; 583 cur_stream->close_reason = TCP_NO_MEM; 584 cur_stream->cb_events |= MOS_ON_TCP_STATE_CHANGE; 585 if (HAS_STREAM_TYPE(cur_stream, MOS_SOCK_STREAM)) 586 RaiseErrorEvent(mtcp, cur_stream); 587 588 return ERROR; 589 } 590 } 591 592 read_lock = HAS_STREAM_TYPE(cur_stream, MOS_SOCK_STREAM); 593 594 if (read_lock && SBUF_LOCK(&rcvvar->read_lock)) { 595 if (errno == EDEADLK) 596 perror("ProcessTCPPayload: read_lock blocked\n"); 597 assert(0); 598 } 599 600 prev_rcv_nxt = cur_stream->rcv_nxt; 601 602 tcprb_t *rb = rcvvar->rcvbuf; 603 loff_t off = seq2loff(rb, pctx->p.seq, (rcvvar->irs + 1)); 604 if (off >= 0) { 605 if (!HAS_STREAM_TYPE(cur_stream, MOS_SOCK_STREAM) && 606 HAS_STREAM_TYPE(cur_stream, MOS_SOCK_MONITOR_STREAM_ACTIVE)) 607 tcprb_setpile(rb, rb->pile + tcprb_cflen(rb)); 608 ret = tcprb_pwrite(rb, pctx->p.payload, pctx->p.payloadlen, off); 609 if (ret < 0) { 610 /* We try again after warning this result to the user. */ 611 SOCKQ_FOREACH_START(walk, &cur_stream->msocks) { 612 HandleCallback(mtcp, MOS_NULL, walk, cur_stream->side, 613 pctx, MOS_ON_ERROR); 614 } SOCKQ_FOREACH_END; 615 ret = tcprb_pwrite(rb, pctx->p.payload, pctx->p.payloadlen, off); 616 } 617 } 618 /* TODO: update monitor vars */ 619 620 /* 621 * error can pop up due to disabled buffered management 622 * (only in monitor mode). In that case, ignore the warning 623 * message. 624 */ 625 if (ret < 0 && cur_stream->buffer_mgmt && mtcp->num_msp == 0) 626 TRACE_ERROR("Cannot merge payload. reason: %d\n", ret); 627 628 /* discard the buffer if the state is FIN_WAIT_1 or FIN_WAIT_2, 629 meaning that the connection is already closed by the application */ 630 loff_t cftail = rb->pile + tcprb_cflen(rb); 631 if (cur_stream->state == TCP_ST_FIN_WAIT_1 || 632 cur_stream->state == TCP_ST_FIN_WAIT_2) { 633 /* XXX: Do we really need to update recv vars? */ 634 tcprb_setpile(rb, cftail); 635 } 636 if (cftail > 0 && (rcvvar->irs + 1) + cftail > cur_stream->rcv_nxt) { 637 RAISE_DEBUG_EVENT(mtcp, cur_stream, 638 "Move rcv_nxt from %u to %u.\n", 639 cur_stream->rcv_nxt, (rcvvar->irs + 1) + cftail); 640 cur_stream->rcv_nxt = (rcvvar->irs + 1) + cftail; 641 } 642 assert(cftail - rb->pile >= 0); 643 rcvvar->rcv_wnd = rb->len - (cftail - rb->pile); 644 645 if (read_lock) 646 SBUF_UNLOCK(&rcvvar->read_lock); 647 648 649 if (TCP_SEQ_LEQ(cur_stream->rcv_nxt, prev_rcv_nxt)) { 650 /* There are some lost packets */ 651 return FALSE; 652 } 653 654 TRACE_EPOLL("Stream %d data arrived. " 655 "len: %d, ET: %llu, IN: %llu, OUT: %llu\n", 656 cur_stream->id, pctx->p.payloadlen, 657 cur_stream->socket? (unsigned long long)cur_stream->socket->epoll & MOS_EPOLLET : 0, 658 cur_stream->socket? (unsigned long long)cur_stream->socket->epoll & MOS_EPOLLIN : 0, 659 cur_stream->socket? (unsigned long long)cur_stream->socket->epoll & MOS_EPOLLOUT : 0); 660 661 if (cur_stream->state == TCP_ST_ESTABLISHED) 662 RaiseReadEvent(mtcp, cur_stream); 663 664 return TRUE; 665 } 666 /*----------------------------------------------------------------------------*/ 667 static inline void 668 Handle_TCP_ST_LISTEN (mtcp_manager_t mtcp, tcp_stream* cur_stream, 669 struct pkt_ctx *pctx) 670 { 671 672 const struct tcphdr* tcph = pctx->p.tcph; 673 674 if (tcph->syn) { 675 if (cur_stream->state == TCP_ST_LISTEN) 676 cur_stream->rcv_nxt++; 677 cur_stream->state = TCP_ST_SYN_RCVD; 678 cur_stream->cb_events |= MOS_ON_TCP_STATE_CHANGE | MOS_ON_CONN_START; 679 TRACE_STATE("Stream %d: TCP_ST_SYN_RCVD\n", cur_stream->id); 680 cur_stream->actions |= MOS_ACT_SEND_CONTROL; 681 if (IS_STREAM_TYPE(cur_stream, MOS_SOCK_MONITOR_STREAM_ACTIVE)) { 682 /** 683 * Passive stream context needs to initialize irs and rcv_nxt 684 * as it is not set neither during createserverstream or monitor 685 * creation. 686 */ 687 cur_stream->rcvvar->irs = 688 cur_stream->rcv_nxt = pctx->p.seq; 689 } 690 } else { 691 CTRACE_ERROR("Stream %d (TCP_ST_LISTEN): " 692 "Packet without SYN.\n", cur_stream->id); 693 } 694 695 } 696 /*----------------------------------------------------------------------------*/ 697 static inline void 698 Handle_TCP_ST_SYN_SENT (mtcp_manager_t mtcp, tcp_stream* cur_stream, 699 struct pkt_ctx *pctx) 700 { 701 const struct tcphdr* tcph = pctx->p.tcph; 702 703 /* when active open */ 704 if (tcph->ack) { 705 /* filter the unacceptable acks */ 706 if (TCP_SEQ_LEQ(pctx->p.ack_seq, cur_stream->sndvar->iss) 707 #ifndef BE_RESILIENT_TO_PACKET_DROP 708 || TCP_SEQ_GT(pctx->p.ack_seq, cur_stream->snd_nxt) 709 #endif 710 ) { 711 if (!tcph->rst) { 712 cur_stream->actions |= MOS_ACT_SEND_RST; 713 } 714 return; 715 } 716 /* accept the ack */ 717 cur_stream->sndvar->snd_una++; 718 } 719 720 if (tcph->rst) { 721 if (tcph->ack) { 722 cur_stream->state = TCP_ST_CLOSED_RSVD; 723 cur_stream->close_reason = TCP_RESET; 724 cur_stream->cb_events |= MOS_ON_TCP_STATE_CHANGE; 725 if (cur_stream->socket) { 726 if (HAS_STREAM_TYPE(cur_stream, MOS_SOCK_STREAM)) 727 RaiseErrorEvent(mtcp, cur_stream); 728 } else { 729 cur_stream->actions |= MOS_ACT_DESTROY; 730 } 731 } 732 return; 733 } 734 735 if (tcph->ack 736 #ifndef BE_RESILIENT_TO_PACKET_DROP 737 /* If we already lost SYNACK, let the ACK packet do the SYNACK's role */ 738 && tcph->syn 739 #endif 740 ) { 741 int ret = HandleActiveOpen(mtcp, cur_stream, pctx); 742 if (!ret) { 743 return; 744 } 745 746 #ifdef BE_RESILIENT_TO_PACKET_DROP 747 if (!tcph->syn) { 748 RAISE_DEBUG_EVENT(mtcp, cur_stream, 749 "We missed SYNACK. Replace it with an ACK packet.\n"); 750 /* correct some variables */ 751 cur_stream->rcv_nxt = pctx->p.seq; 752 } 753 #endif 754 755 cur_stream->sndvar->nrtx = 0; 756 if (HAS_STREAM_TYPE(cur_stream, MOS_SOCK_STREAM)) 757 RemoveFromRTOList(mtcp, cur_stream); 758 cur_stream->state = TCP_ST_ESTABLISHED; 759 cur_stream->cb_events |= /*MOS_ON_CONN_SETUP |*/ MOS_ON_TCP_STATE_CHANGE; 760 TRACE_STATE("Stream %d: TCP_ST_ESTABLISHED\n", cur_stream->id); 761 762 if (cur_stream->socket) { 763 if (HAS_STREAM_TYPE(cur_stream, MOS_SOCK_STREAM)) 764 RaiseWriteEvent(mtcp, cur_stream); 765 } else { 766 TRACE_STATE("Stream %d: ESTABLISHED, but no socket\n", cur_stream->id); 767 cur_stream->close_reason = TCP_ACTIVE_CLOSE; 768 cur_stream->actions |= MOS_ACT_SEND_RST; 769 cur_stream->actions |= MOS_ACT_DESTROY; 770 } 771 cur_stream->actions |= MOS_ACT_SEND_CONTROL; 772 if (g_config.mos->tcp_timeout > 0) 773 if (HAS_STREAM_TYPE(cur_stream, MOS_SOCK_STREAM)) 774 AddtoTimeoutList(mtcp, cur_stream); 775 776 #ifdef BE_RESILIENT_TO_PACKET_DROP 777 /* Handle this ack packet */ 778 if (!tcph->syn) 779 Handle_TCP_ST_ESTABLISHED(mtcp, cur_stream, pctx); 780 #endif 781 782 } else if (tcph->syn) { 783 cur_stream->state = TCP_ST_SYN_RCVD; 784 cur_stream->cb_events |= MOS_ON_TCP_STATE_CHANGE; 785 TRACE_STATE("Stream %d: TCP_ST_SYN_RCVD\n", cur_stream->id); 786 cur_stream->snd_nxt = cur_stream->sndvar->iss; 787 cur_stream->actions |= MOS_ACT_SEND_CONTROL; 788 } 789 } 790 /*----------------------------------------------------------------------------*/ 791 static inline void 792 Handle_TCP_ST_SYN_RCVD (mtcp_manager_t mtcp, tcp_stream* cur_stream, 793 struct pkt_ctx *pctx) 794 { 795 const struct tcphdr* tcph = pctx->p.tcph; 796 struct tcp_send_vars *sndvar = cur_stream->sndvar; 797 int ret; 798 799 if (tcph->ack) { 800 uint32_t prior_cwnd; 801 /* NOTE: We do not validate the ack number because first few packets 802 * can also come out of order */ 803 804 sndvar->snd_una++; 805 cur_stream->snd_nxt = pctx->p.ack_seq; 806 prior_cwnd = sndvar->cwnd; 807 sndvar->cwnd = ((prior_cwnd == 1)? 808 (sndvar->mss * 2): sndvar->mss); 809 810 //UpdateRetransmissionTimer(mtcp, cur_stream, cur_ts); 811 sndvar->nrtx = 0; 812 cur_stream->rcv_nxt = cur_stream->rcvvar->irs + 1; 813 if (HAS_STREAM_TYPE(cur_stream, MOS_SOCK_STREAM)) 814 RemoveFromRTOList(mtcp, cur_stream); 815 816 cur_stream->state = TCP_ST_ESTABLISHED; 817 cur_stream->cb_events |= /*MOS_ON_CONN_SETUP |*/ MOS_ON_TCP_STATE_CHANGE; 818 TRACE_STATE("Stream %d: TCP_ST_ESTABLISHED\n", cur_stream->id); 819 820 #ifdef BE_RESILIENT_TO_PACKET_DROP 821 if (pctx->p.ack_seq != sndvar->iss + 1) 822 Handle_TCP_ST_ESTABLISHED(mtcp, cur_stream, pctx); 823 #endif 824 825 /* update listening socket */ 826 if (HAS_STREAM_TYPE(cur_stream, MOS_SOCK_STREAM)) { 827 828 struct tcp_listener *listener = mtcp->listener; 829 830 ret = StreamEnqueue(listener->acceptq, cur_stream); 831 if (ret < 0) { 832 TRACE_ERROR("Stream %d: Failed to enqueue to " 833 "the listen backlog!\n", cur_stream->id); 834 cur_stream->close_reason = TCP_NOT_ACCEPTED; 835 cur_stream->state = TCP_ST_CLOSED_RSVD; 836 cur_stream->cb_events |= MOS_ON_TCP_STATE_CHANGE; 837 TRACE_STATE("Stream %d: TCP_ST_CLOSED_RSVD\n", cur_stream->id); 838 cur_stream->actions |= MOS_ACT_SEND_CONTROL; 839 } 840 841 /* raise an event to the listening socket */ 842 if (listener->socket && (listener->socket->epoll & MOS_EPOLLIN)) { 843 AddEpollEvent(mtcp->ep, 844 MOS_EVENT_QUEUE, listener->socket, MOS_EPOLLIN); 845 } 846 } 847 848 //TRACE_DBG("Stream %d inserted into acceptq.\n", cur_stream->id); 849 if (g_config.mos->tcp_timeout > 0) 850 if (HAS_STREAM_TYPE(cur_stream, MOS_SOCK_STREAM)) 851 AddtoTimeoutList(mtcp, cur_stream); 852 853 } else { 854 /* Handle retransmitted SYN packet */ 855 if (HAS_STREAM_TYPE(cur_stream, MOS_SOCK_MONITOR_STREAM_ACTIVE) && 856 tcph->syn) { 857 if (pctx->p.seq == cur_stream->pair_stream->sndvar->iss) { 858 TRACE_DBG("syn retransmit! (p.seq = %u / iss = %u)\n", 859 pctx->p.seq, cur_stream->pair_stream->sndvar->iss); 860 cur_stream->cb_events |= MOS_ON_REXMIT; 861 } 862 } 863 864 TRACE_DBG("Stream %d (TCP_ST_SYN_RCVD): No ACK.\n", 865 cur_stream->id); 866 /* retransmit SYN/ACK */ 867 cur_stream->snd_nxt = sndvar->iss; 868 cur_stream->actions |= MOS_ACT_SEND_CONTROL; 869 } 870 } 871 /*----------------------------------------------------------------------------*/ 872 static inline void 873 Handle_TCP_ST_ESTABLISHED (mtcp_manager_t mtcp, tcp_stream* cur_stream, 874 struct pkt_ctx *pctx) 875 { 876 const struct tcphdr* tcph = pctx->p.tcph; 877 878 if (tcph->syn) { 879 /* Handle retransmitted SYNACK packet */ 880 if (HAS_STREAM_TYPE(cur_stream, MOS_SOCK_MONITOR_STREAM_ACTIVE) && 881 tcph->ack) { 882 if (pctx->p.seq == cur_stream->pair_stream->sndvar->iss) { 883 TRACE_DBG("syn/ack retransmit! (p.seq = %u / iss = %u)\n", 884 pctx->p.seq, cur_stream->pair_stream->sndvar->iss); 885 cur_stream->cb_events |= MOS_ON_REXMIT; 886 } 887 } 888 889 TRACE_DBG("Stream %d (TCP_ST_ESTABLISHED): weird SYN. " 890 "seq: %u, expected: %u, ack_seq: %u, expected: %u\n", 891 cur_stream->id, pctx->p.seq, cur_stream->rcv_nxt, 892 pctx->p.ack_seq, cur_stream->snd_nxt); 893 cur_stream->snd_nxt = pctx->p.ack_seq; 894 cur_stream->actions |= MOS_ACT_SEND_CONTROL; 895 return; 896 } 897 898 if (pctx->p.payloadlen > 0) { 899 if (ProcessTCPPayload(mtcp, cur_stream, pctx)) { 900 /* if return is TRUE, send ACK */ 901 cur_stream->actions |= MOS_ACT_SEND_ACK_AGG; 902 } else { 903 cur_stream->actions |= MOS_ACT_SEND_ACK_NOW; 904 } 905 } 906 907 if (tcph->ack) { 908 if (cur_stream->sndvar->sndbuf) { 909 ProcessACK(mtcp, cur_stream, pctx); 910 } 911 } 912 913 if (tcph->fin) { 914 /* process the FIN only if the sequence is valid */ 915 /* FIN packet is allowed to push payload (should we check for PSH flag)? */ 916 if (!cur_stream->buffer_mgmt || 917 #ifdef BE_RESILIENT_TO_PACKET_DROP 918 TCP_SEQ_GEQ(pctx->p.seq + pctx->p.payloadlen, cur_stream->rcv_nxt) 919 #else 920 pctx->p.seq + pctx->p.payloadlen == cur_stream->rcv_nxt 921 #endif 922 ) { 923 #ifdef BE_RESILIENT_TO_PACKET_DROP 924 cur_stream->rcv_nxt = pctx->p.seq + pctx->p.payloadlen; 925 #endif 926 cur_stream->state = TCP_ST_CLOSE_WAIT; 927 cur_stream->cb_events |= MOS_ON_TCP_STATE_CHANGE; 928 TRACE_STATE("Stream %d: TCP_ST_CLOSE_WAIT\n", cur_stream->id); 929 cur_stream->rcv_nxt++; 930 cur_stream->actions |= MOS_ACT_SEND_CONTROL; 931 932 /* notify FIN to application */ 933 RaiseReadEvent(mtcp, cur_stream); 934 } else { 935 RAISE_DEBUG_EVENT(mtcp, cur_stream, 936 "Expected %u, but received %u (= %u + %u)\n", 937 cur_stream->rcv_nxt, pctx->p.seq + pctx->p.payloadlen, 938 pctx->p.seq, pctx->p.payloadlen); 939 940 cur_stream->actions |= MOS_ACT_SEND_ACK_NOW; 941 return; 942 } 943 } 944 } 945 /*----------------------------------------------------------------------------*/ 946 static inline void 947 Handle_TCP_ST_CLOSE_WAIT (mtcp_manager_t mtcp, tcp_stream* cur_stream, 948 struct pkt_ctx *pctx) 949 { 950 if (TCP_SEQ_LT(pctx->p.seq, cur_stream->rcv_nxt)) { 951 TRACE_DBG("Stream %d (TCP_ST_CLOSE_WAIT): " 952 "weird seq: %u, expected: %u\n", 953 cur_stream->id, pctx->p.seq, cur_stream->rcv_nxt); 954 cur_stream->actions |= MOS_ACT_SEND_CONTROL; 955 return; 956 } 957 958 if (cur_stream->sndvar->sndbuf) { 959 ProcessACK(mtcp, cur_stream, pctx); 960 } 961 } 962 /*----------------------------------------------------------------------------*/ 963 static inline void 964 Handle_TCP_ST_LAST_ACK (mtcp_manager_t mtcp, tcp_stream* cur_stream, 965 struct pkt_ctx *pctx) 966 { 967 const struct tcphdr* tcph = pctx->p.tcph; 968 969 if (TCP_SEQ_LT(pctx->p.seq, cur_stream->rcv_nxt)) { 970 TRACE_DBG("Stream %d (TCP_ST_LAST_ACK): " 971 "weird seq: %u, expected: %u\n", 972 cur_stream->id, pctx->p.seq, cur_stream->rcv_nxt); 973 return; 974 } 975 976 if (tcph->ack) { 977 if (cur_stream->sndvar->sndbuf) { 978 ProcessACK(mtcp, cur_stream, pctx); 979 } 980 981 if (!cur_stream->sndvar->is_fin_sent) { 982 /* the case that FIN is not sent yet */ 983 /* this is not ack for FIN, ignore */ 984 TRACE_DBG("Stream %d (TCP_ST_LAST_ACK): " 985 "No FIN sent yet.\n", cur_stream->id); 986 #ifdef DBGMSG 987 DumpIPPacket(mtcp, pctx->p.iph, pctx->p.ip_len); 988 #endif 989 #if DUMP_STREAM 990 DumpStream(mtcp, cur_stream); 991 DumpControlList(mtcp, mtcp->n_sender[0]); 992 #endif 993 return; 994 } 995 996 /* check if ACK of FIN */ 997 if (pctx->p.ack_seq == cur_stream->sndvar->fss + 1) { 998 cur_stream->sndvar->snd_una++; 999 if (HAS_STREAM_TYPE(cur_stream, MOS_SOCK_STREAM)) 1000 UpdateRetransmissionTimer(mtcp, cur_stream, pctx->p.cur_ts); 1001 cur_stream->state = TCP_ST_CLOSED_RSVD; 1002 cur_stream->close_reason = TCP_PASSIVE_CLOSE; 1003 cur_stream->cb_events |= MOS_ON_TCP_STATE_CHANGE; 1004 TRACE_STATE("Stream %d: TCP_ST_CLOSED_RSVD\n", 1005 cur_stream->id); 1006 cur_stream->actions |= MOS_ACT_DESTROY; 1007 } else { 1008 TRACE_DBG("Stream %d (TCP_ST_LAST_ACK): Not ACK of FIN. " 1009 "ack_seq: %u, expected: %u\n", 1010 cur_stream->id, pctx->p.ack_seq, cur_stream->sndvar->fss + 1); 1011 //cur_stream->snd_nxt = cur_stream->sndvar->fss; 1012 cur_stream->actions |= MOS_ACT_SEND_CONTROL; 1013 } 1014 } else { 1015 TRACE_DBG("Stream %d (TCP_ST_LAST_ACK): No ACK\n", 1016 cur_stream->id); 1017 //cur_stream->snd_nxt = cur_stream->sndvar->fss; 1018 cur_stream->actions |= MOS_ACT_SEND_CONTROL; 1019 } 1020 } 1021 /*----------------------------------------------------------------------------*/ 1022 static inline void 1023 Handle_TCP_ST_FIN_WAIT_1 (mtcp_manager_t mtcp, tcp_stream* cur_stream, 1024 struct pkt_ctx *pctx) 1025 { 1026 const struct tcphdr* tcph = pctx->p.tcph; 1027 1028 if (TCP_SEQ_LT(pctx->p.seq, cur_stream->rcv_nxt)) { 1029 RAISE_DEBUG_EVENT(mtcp, cur_stream, 1030 "Stream %d (FIN_WAIT_1): " 1031 "weird seq: %u, expected: %u\n", 1032 cur_stream->id, pctx->p.seq, cur_stream->rcv_nxt); 1033 cur_stream->actions |= MOS_ACT_SEND_CONTROL; 1034 return; 1035 } 1036 1037 if (tcph->ack) { 1038 if (cur_stream->sndvar->sndbuf) { 1039 ProcessACK(mtcp, cur_stream, pctx); 1040 } 1041 1042 if (cur_stream->sndvar->is_fin_sent && 1043 ((!HAS_STREAM_TYPE(cur_stream, MOS_SOCK_STREAM) && 1044 pctx->p.ack_seq == cur_stream->sndvar->fss) || 1045 pctx->p.ack_seq == cur_stream->sndvar->fss + 1)) { 1046 cur_stream->sndvar->snd_una = pctx->p.ack_seq; 1047 if (TCP_SEQ_GT(pctx->p.ack_seq, cur_stream->snd_nxt)) { 1048 TRACE_DBG("Stream %d: update snd_nxt to %u\n", 1049 cur_stream->id, pctx->p.ack_seq); 1050 cur_stream->snd_nxt = pctx->p.ack_seq; 1051 } 1052 //cur_stream->sndvar->snd_una++; 1053 //UpdateRetransmissionTimer(mtcp, cur_stream, cur_ts); 1054 cur_stream->sndvar->nrtx = 0; 1055 if (HAS_STREAM_TYPE(cur_stream, MOS_SOCK_STREAM)) 1056 RemoveFromRTOList(mtcp, cur_stream); 1057 cur_stream->state = TCP_ST_FIN_WAIT_2; 1058 cur_stream->cb_events |= MOS_ON_TCP_STATE_CHANGE; 1059 TRACE_STATE("Stream %d: TCP_ST_FIN_WAIT_2\n", 1060 cur_stream->id); 1061 } else { 1062 RAISE_DEBUG_EVENT(mtcp, cur_stream, 1063 "Failed to transit to FIN_WAIT_2, " 1064 "is_fin_sent: %s, ack_seq: %u, sndvar->fss: %u\n", 1065 cur_stream->sndvar->is_fin_sent ? "true" : "false", 1066 pctx->p.ack_seq, cur_stream->sndvar->fss); 1067 } 1068 1069 } else { 1070 RAISE_DEBUG_EVENT(mtcp, cur_stream, 1071 "Failed to transit to FIN_WAIT_2, " 1072 "We got a %s%s%s%s%s%s packet.", 1073 pctx->p.tcph->syn ? "S" : "", 1074 pctx->p.tcph->fin ? "F" : "", 1075 pctx->p.tcph->rst ? "R" : "", 1076 pctx->p.tcph->psh ? "P" : "", 1077 pctx->p.tcph->urg ? "U" : "", 1078 pctx->p.tcph->ack ? "A" : ""); 1079 1080 TRACE_DBG("Stream %d: does not contain an ack!\n", 1081 cur_stream->id); 1082 return; 1083 } 1084 1085 if (pctx->p.payloadlen > 0) { 1086 if (ProcessTCPPayload(mtcp, cur_stream, pctx)) { 1087 /* if return is TRUE, send ACK */ 1088 cur_stream->actions |= MOS_ACT_SEND_ACK_AGG; 1089 } else { 1090 cur_stream->actions |= MOS_ACT_SEND_ACK_NOW; 1091 } 1092 } 1093 1094 if (tcph->fin) { 1095 /* process the FIN only if the sequence is valid */ 1096 /* FIN packet is allowed to push payload (should we check for PSH flag)? */ 1097 if (!cur_stream->buffer_mgmt || 1098 #ifdef BE_RESILIENT_TO_PACKET_DROP 1099 TCP_SEQ_GEQ(pctx->p.seq + pctx->p.payloadlen, cur_stream->rcv_nxt) 1100 #else 1101 pctx->p.seq + pctx->p.payloadlen == cur_stream->rcv_nxt 1102 #endif 1103 ) { 1104 cur_stream->rcv_nxt++; 1105 1106 if (cur_stream->state == TCP_ST_FIN_WAIT_1) { 1107 cur_stream->state = TCP_ST_CLOSING; 1108 cur_stream->cb_events |= MOS_ON_TCP_STATE_CHANGE; 1109 TRACE_STATE("Stream %d: TCP_ST_CLOSING\n", cur_stream->id); 1110 1111 } else if (cur_stream->state == TCP_ST_FIN_WAIT_2) { 1112 cur_stream->state = TCP_ST_TIME_WAIT; 1113 cur_stream->cb_events |= MOS_ON_TCP_STATE_CHANGE; 1114 TRACE_STATE("Stream %d: TCP_ST_TIME_WAIT\n", cur_stream->id); 1115 AddtoTimewaitList(mtcp, cur_stream, pctx->p.cur_ts); 1116 } 1117 cur_stream->actions |= MOS_ACT_SEND_CONTROL; 1118 } else { 1119 RAISE_DEBUG_EVENT(mtcp, cur_stream, 1120 "Expected %u, but received %u (= %u + %u)\n", 1121 cur_stream->rcv_nxt, pctx->p.seq + pctx->p.payloadlen, 1122 pctx->p.seq, pctx->p.payloadlen); 1123 1124 cur_stream->actions |= MOS_ACT_SEND_ACK_NOW; 1125 return; 1126 } 1127 } 1128 } 1129 /*----------------------------------------------------------------------------*/ 1130 static inline void 1131 Handle_TCP_ST_FIN_WAIT_2 (mtcp_manager_t mtcp, tcp_stream* cur_stream, 1132 struct pkt_ctx *pctx) 1133 { 1134 const struct tcphdr* tcph = pctx->p.tcph; 1135 1136 if (tcph->ack) { 1137 if (cur_stream->sndvar->sndbuf) { 1138 ProcessACK(mtcp, cur_stream, pctx); 1139 } 1140 } else { 1141 TRACE_DBG("Stream %d: does not contain an ack!\n", 1142 cur_stream->id); 1143 return; 1144 } 1145 1146 if (pctx->p.payloadlen > 0) { 1147 if (ProcessTCPPayload(mtcp, cur_stream, pctx)) { 1148 /* if return is TRUE, send ACK */ 1149 cur_stream->actions |= MOS_ACT_SEND_ACK_AGG; 1150 } else { 1151 cur_stream->actions |= MOS_ACT_SEND_ACK_NOW; 1152 } 1153 } 1154 1155 if (tcph->fin) { 1156 /* process the FIN only if the sequence is valid */ 1157 /* FIN packet is allowed to push payload (should we check for PSH flag)? */ 1158 if (!cur_stream->buffer_mgmt || 1159 #ifdef BE_RESILIENT_TO_PACKET_DROP 1160 TCP_SEQ_GEQ(pctx->p.seq + pctx->p.payloadlen, cur_stream->rcv_nxt) 1161 #else 1162 pctx->p.seq + pctx->p.payloadlen == cur_stream->rcv_nxt 1163 #endif 1164 ) { 1165 cur_stream->state = TCP_ST_TIME_WAIT; 1166 cur_stream->cb_events |= MOS_ON_TCP_STATE_CHANGE; 1167 cur_stream->rcv_nxt++; 1168 TRACE_STATE("Stream %d: TCP_ST_TIME_WAIT\n", cur_stream->id); 1169 1170 AddtoTimewaitList(mtcp, cur_stream, pctx->p.cur_ts); 1171 cur_stream->actions |= MOS_ACT_SEND_CONTROL; 1172 } 1173 } else { 1174 TRACE_DBG("Stream %d (TCP_ST_FIN_WAIT_2): No FIN. " 1175 "seq: %u, ack_seq: %u, snd_nxt: %u, snd_una: %u\n", 1176 cur_stream->id, pctx->p.seq, pctx->p.ack_seq, 1177 cur_stream->snd_nxt, cur_stream->sndvar->snd_una); 1178 #if DBGMSG 1179 DumpIPPacket(mtcp, pctx->p.iph, pctx->p.ip_len); 1180 #endif 1181 } 1182 1183 } 1184 /*----------------------------------------------------------------------------*/ 1185 static inline void 1186 Handle_TCP_ST_CLOSING (mtcp_manager_t mtcp, tcp_stream* cur_stream, 1187 struct pkt_ctx *pctx) 1188 { 1189 const struct tcphdr* tcph = pctx->p.tcph; 1190 1191 if (tcph->ack) { 1192 if (cur_stream->sndvar->sndbuf) { 1193 ProcessACK(mtcp, cur_stream, pctx); 1194 } 1195 1196 if (!cur_stream->sndvar->is_fin_sent) { 1197 TRACE_DBG("Stream %d (TCP_ST_CLOSING): " 1198 "No FIN sent yet.\n", cur_stream->id); 1199 return; 1200 } 1201 1202 // check if ACK of FIN 1203 if (pctx->p.ack_seq != cur_stream->sndvar->fss + 1) { 1204 #if DBGMSG 1205 TRACE_DBG("Stream %d (TCP_ST_CLOSING): Not ACK of FIN. " 1206 "ack_seq: %u, snd_nxt: %u, snd_una: %u, fss: %u\n", 1207 cur_stream->id, pctx->p.ack_seq, cur_stream->snd_nxt, 1208 cur_stream->sndvar->snd_una, cur_stream->sndvar->fss); 1209 DumpIPPacketToFile(stderr, pctx->p.iph, pctx->p.ip_len); 1210 DumpStream(mtcp, cur_stream); 1211 #endif 1212 //assert(0); 1213 /* if the packet is not the ACK of FIN, ignore */ 1214 return; 1215 } 1216 1217 cur_stream->sndvar->snd_una = pctx->p.ack_seq; 1218 if (HAS_STREAM_TYPE(cur_stream, MOS_SOCK_STREAM)) 1219 UpdateRetransmissionTimer(mtcp, cur_stream, pctx->p.cur_ts); 1220 cur_stream->state = TCP_ST_TIME_WAIT; 1221 cur_stream->cb_events |= MOS_ON_TCP_STATE_CHANGE; 1222 TRACE_STATE("Stream %d: TCP_ST_TIME_WAIT\n", cur_stream->id); 1223 1224 AddtoTimewaitList(mtcp, cur_stream, pctx->p.cur_ts); 1225 1226 } else { 1227 TRACE_DBG("Stream %d (TCP_ST_CLOSING): Not ACK\n", 1228 cur_stream->id); 1229 return; 1230 } 1231 } 1232 /*----------------------------------------------------------------------------*/ 1233 void 1234 UpdateRecvTCPContext(mtcp_manager_t mtcp, struct tcp_stream *cur_stream, 1235 struct pkt_ctx *pctx) 1236 { 1237 struct tcphdr* tcph = pctx->p.tcph; 1238 int ret; 1239 1240 assert(cur_stream); 1241 1242 /* Validate sequence. if not valid, ignore the packet */ 1243 if (cur_stream->state > TCP_ST_SYN_RCVD) { 1244 1245 ret = ValidateSequence(mtcp, cur_stream, pctx); 1246 if (!ret) { 1247 TRACE_DBG("Stream %d: Unexpected sequence: %u, expected: %u\n", 1248 cur_stream->id, pctx->p.seq, cur_stream->rcv_nxt); 1249 #ifdef DBGMSG 1250 DumpIPPacket(mtcp, pctx->p.iph, pctx->p.ip_len); 1251 #endif 1252 #if DUMP_STREAM 1253 DumpStream(mtcp, cur_stream); 1254 #endif 1255 /* cur_stream->cb_events |= MOS_ON_ERROR; */ 1256 } 1257 } 1258 /* Update receive window size */ 1259 if (tcph->syn) { 1260 cur_stream->sndvar->peer_wnd = pctx->p.window; 1261 } else { 1262 cur_stream->sndvar->peer_wnd = 1263 (uint32_t)pctx->p.window << cur_stream->sndvar->wscale_peer; 1264 } 1265 1266 cur_stream->last_active_ts = pctx->p.cur_ts; 1267 if (HAS_STREAM_TYPE(cur_stream, MOS_SOCK_STREAM)) 1268 UpdateTimeoutList(mtcp, cur_stream); 1269 1270 /* Process RST: process here only if state > TCP_ST_SYN_SENT */ 1271 if (tcph->rst) { 1272 cur_stream->have_reset = TRUE; 1273 if (cur_stream->state > TCP_ST_SYN_SENT) { 1274 if (ProcessRST(mtcp, cur_stream, pctx)) { 1275 return; 1276 } 1277 } 1278 } 1279 1280 if (HAS_STREAM_TYPE(cur_stream, MOS_SOCK_MONITOR_STREAM_ACTIVE) && 1281 pctx->p.tcph->fin) { 1282 if (cur_stream->state == TCP_ST_CLOSE_WAIT || 1283 cur_stream->state == TCP_ST_LAST_ACK || 1284 cur_stream->state == TCP_ST_CLOSING || 1285 cur_stream->state == TCP_ST_TIME_WAIT) { 1286 /* Handle retransmitted FIN packet */ 1287 if (pctx->p.seq == cur_stream->pair_stream->sndvar->fss) { 1288 TRACE_DBG("FIN retransmit! (seq = %u / fss = %u)\n", 1289 pctx->p.seq, cur_stream->pair_stream->sndvar->fss); 1290 cur_stream->cb_events |= MOS_ON_REXMIT; 1291 } 1292 } 1293 } 1294 1295 switch (cur_stream->state) { 1296 case TCP_ST_LISTEN: 1297 Handle_TCP_ST_LISTEN(mtcp, cur_stream, pctx); 1298 break; 1299 1300 case TCP_ST_SYN_SENT: 1301 Handle_TCP_ST_SYN_SENT(mtcp, cur_stream, pctx); 1302 break; 1303 1304 case TCP_ST_SYN_RCVD: 1305 /* SYN retransmit implies our SYN/ACK was lost. Resend */ 1306 if (tcph->syn && pctx->p.seq == cur_stream->rcvvar->irs) 1307 Handle_TCP_ST_LISTEN(mtcp, cur_stream, pctx); 1308 else { 1309 Handle_TCP_ST_SYN_RCVD(mtcp, cur_stream, pctx); 1310 if (pctx->p.payloadlen > 0 && cur_stream->state == TCP_ST_ESTABLISHED) 1311 Handle_TCP_ST_ESTABLISHED(mtcp, cur_stream, pctx); 1312 } 1313 break; 1314 1315 case TCP_ST_ESTABLISHED: 1316 Handle_TCP_ST_ESTABLISHED(mtcp, cur_stream, pctx); 1317 break; 1318 1319 case TCP_ST_CLOSE_WAIT: 1320 Handle_TCP_ST_CLOSE_WAIT(mtcp, cur_stream, pctx); 1321 break; 1322 1323 case TCP_ST_LAST_ACK: 1324 Handle_TCP_ST_LAST_ACK(mtcp, cur_stream, pctx); 1325 break; 1326 1327 case TCP_ST_FIN_WAIT_1: 1328 Handle_TCP_ST_FIN_WAIT_1(mtcp, cur_stream, pctx); 1329 break; 1330 1331 case TCP_ST_FIN_WAIT_2: 1332 Handle_TCP_ST_FIN_WAIT_2(mtcp, cur_stream, pctx); 1333 break; 1334 1335 case TCP_ST_CLOSING: 1336 Handle_TCP_ST_CLOSING(mtcp, cur_stream, pctx); 1337 break; 1338 1339 case TCP_ST_TIME_WAIT: 1340 /* the only thing that can arrive in this state is a retransmission 1341 of the remote FIN. Acknowledge it, and restart the 2 MSL timeout */ 1342 if (cur_stream->on_timewait_list) { 1343 RemoveFromTimewaitList(mtcp, cur_stream); 1344 AddtoTimewaitList(mtcp, cur_stream, pctx->p.cur_ts); 1345 } 1346 cur_stream->actions |= MOS_ACT_SEND_CONTROL; 1347 break; 1348 1349 case TCP_ST_CLOSED: 1350 case TCP_ST_CLOSED_RSVD: 1351 break; 1352 1353 default: 1354 break; 1355 } 1356 1357 TRACE_STATE("Stream %d: Events: %0lx, Action: %0x\n", 1358 cur_stream->id, cur_stream->cb_events, cur_stream->actions); 1359 return; 1360 } 1361 /*----------------------------------------------------------------------------*/ 1362 void 1363 DoActionEndTCPPacket(mtcp_manager_t mtcp, struct tcp_stream *cur_stream, 1364 struct pkt_ctx *pctx) 1365 { 1366 int i; 1367 1368 for (i = 1; i < MOS_ACT_CNT; i = i << 1) { 1369 1370 if (cur_stream->actions & i) { 1371 switch(i) { 1372 case MOS_ACT_SEND_DATA: 1373 AddtoSendList(mtcp, cur_stream); 1374 break; 1375 case MOS_ACT_SEND_ACK_NOW: 1376 EnqueueACK(mtcp, cur_stream, pctx->p.cur_ts, ACK_OPT_NOW); 1377 break; 1378 case MOS_ACT_SEND_ACK_AGG: 1379 EnqueueACK(mtcp, cur_stream, pctx->p.cur_ts, ACK_OPT_AGGREGATE); 1380 break; 1381 case MOS_ACT_SEND_CONTROL: 1382 AddtoControlList(mtcp, cur_stream, pctx->p.cur_ts); 1383 break; 1384 case MOS_ACT_SEND_RST: 1385 if (cur_stream->state <= TCP_ST_SYN_SENT) 1386 SendTCPPacketStandalone(mtcp, 1387 pctx->p.iph->daddr, pctx->p.tcph->dest, 1388 pctx->p.iph->saddr, pctx->p.tcph->source, 1389 0, pctx->p.seq + 1, 0, TCP_FLAG_RST | TCP_FLAG_ACK, 1390 NULL, 0, pctx->p.cur_ts, 0, 0, -1); 1391 else 1392 SendTCPPacketStandalone(mtcp, 1393 pctx->p.iph->daddr, pctx->p.tcph->dest, 1394 pctx->p.iph->saddr, pctx->p.tcph->source, 1395 pctx->p.ack_seq, 0, 0, TCP_FLAG_RST | TCP_FLAG_ACK, 1396 NULL, 0, pctx->p.cur_ts, 0, 0, -1); 1397 break; 1398 case MOS_ACT_DESTROY: 1399 DestroyTCPStream(mtcp, cur_stream); 1400 break; 1401 default: 1402 assert(1); 1403 break; 1404 } 1405 } 1406 } 1407 1408 cur_stream->actions = 0; 1409 } 1410 /*----------------------------------------------------------------------------*/ 1411 /** 1412 * Called (when monitoring mode is enabled).. for every outgoing packet to the 1413 * NIC. 1414 */ 1415 inline void 1416 UpdatePassiveRecvTCPContext(mtcp_manager_t mtcp, struct tcp_stream *cur_stream, 1417 struct pkt_ctx *pctx) 1418 { 1419 UpdateRecvTCPContext(mtcp, cur_stream, pctx); 1420 } 1421 /*----------------------------------------------------------------------------*/ 1422 /* NOTE TODO: This event prediction is additional overhaed of HK_RCV hook. 1423 * We can transparently optimize this by disabling prediction of events which 1424 * are not monitored by anyone. */ 1425 inline void 1426 PreRecvTCPEventPrediction(mtcp_manager_t mtcp, struct pkt_ctx *pctx, 1427 struct tcp_stream *recvside_stream) 1428 { 1429 tcp_rb_overlapchk(mtcp, pctx, recvside_stream); 1430 } 1431 /*----------------------------------------------------------------------------*/ 1432