1 #include <assert.h> 2 #include <string.h> 3 4 #include "mtcp.h" 5 #include "arp.h" 6 #include "socket.h" 7 #include "eth_out.h" 8 #include "ip_out.h" 9 #include "mos_api.h" 10 #include "tcp_util.h" 11 #include "tcp_in.h" 12 #include "tcp_out.h" 13 #include "tcp_ring_buffer.h" 14 #include "eventpoll.h" 15 #include "debug.h" 16 #include "timer.h" 17 #include "ip_in.h" 18 #include "config.h" 19 20 extern struct pkt_info * 21 ClonePacketCtx(struct pkt_info *to, unsigned char *frame, struct pkt_info *from); 22 23 #define VERIFY_RX_CHECKSUM TRUE 24 /*----------------------------------------------------------------------------*/ 25 static inline uint32_t 26 DetectStreamType(mtcp_manager_t mtcp, struct pkt_ctx *pctx, 27 uint32_t ip, uint16_t port) 28 { 29 /* To Do: We will extend this filter to check listeners for proxy as well */ 30 struct sockaddr_in *addr; 31 int rc, cnt_match, socktype; 32 struct mon_listener *walk; 33 struct sfbpf_program fcode; 34 35 cnt_match = 0; 36 rc = 0; 37 38 if (mtcp->num_msp > 0) { 39 /* mtcp_bind_monitor_filter() 40 * - create MonitorTCPStream only when the filter of any of the existing 41 * passive sockets match the incoming flow */ 42 TAILQ_FOREACH(walk, &mtcp->monitors, link) { 43 /* For every passive monitor sockets, */ 44 socktype = walk->socket->socktype; 45 if (socktype != MOS_SOCK_MONITOR_STREAM) 46 continue; // XXX: can this happen?? 47 48 /* if pctx hits the filter rule, handle the passive monitor socket */ 49 fcode = walk->stream_syn_fcode; 50 if (!(ISSET_BPFFILTER(fcode) && pctx && 51 EVAL_BPFFILTER(fcode, (uint8_t *)pctx->p.iph - sizeof(struct ethhdr), 52 pctx->p.ip_len + sizeof(struct ethhdr)) == 0)) { 53 walk->is_stream_syn_filter_hit = 1;// set the 'filter hit' flag to 1 54 cnt_match++; // count the number of matched sockets 55 } 56 } 57 58 /* if there's any passive monitoring socket whose filter is hit, 59 we should create monitor stream */ 60 if (cnt_match > 0) 61 rc = STREAM_TYPE(MOS_SOCK_MONITOR_STREAM_ACTIVE); 62 } 63 64 if (mtcp->listener) { 65 /* Detect end TCP stack mode */ 66 addr = &mtcp->listener->socket->saddr; 67 if (addr->sin_port == port) { 68 if (addr->sin_addr.s_addr != INADDR_ANY) { 69 if (ip == addr->sin_addr.s_addr) { 70 rc |= STREAM_TYPE(MOS_SOCK_STREAM); 71 } 72 } else { 73 int i; 74 75 for (i = 0; i < g_config.mos->netdev_table->num; i++) { 76 if (ip == g_config.mos->netdev_table->ent[i]->ip_addr) { 77 rc |= STREAM_TYPE(MOS_SOCK_STREAM); 78 } 79 } 80 } 81 } 82 } 83 84 return rc; 85 } 86 /*----------------------------------------------------------------------------*/ 87 static inline tcp_stream * 88 CreateServerStream(mtcp_manager_t mtcp, int type, struct pkt_ctx *pctx) 89 { 90 tcp_stream *cur_stream = NULL; 91 92 /* create new stream and add to flow hash table */ 93 cur_stream = CreateTCPStream(mtcp, NULL, type, 94 pctx->p.iph->daddr, pctx->p.tcph->dest, 95 pctx->p.iph->saddr, pctx->p.tcph->source, NULL); 96 if (!cur_stream) { 97 TRACE_ERROR("INFO: Could not allocate tcp_stream!\n"); 98 return FALSE; 99 } 100 101 cur_stream->rcvvar->irs = pctx->p.seq; 102 cur_stream->sndvar->peer_wnd = pctx->p.window; 103 cur_stream->rcv_nxt = cur_stream->rcvvar->irs; 104 cur_stream->sndvar->cwnd = 1; 105 ParseTCPOptions(cur_stream, pctx->p.cur_ts, (uint8_t *)pctx->p.tcph + 106 TCP_HEADER_LEN, (pctx->p.tcph->doff << 2) - TCP_HEADER_LEN); 107 108 return cur_stream; 109 } 110 /*----------------------------------------------------------------------------*/ 111 static inline tcp_stream * 112 CreateMonitorStream(mtcp_manager_t mtcp, struct pkt_ctx* pctx, 113 uint32_t stream_type, unsigned int *hash) 114 { 115 tcp_stream *stream = NULL; 116 struct socket_map *walk; 117 /* create a client stream context */ 118 stream = CreateDualTCPStream(mtcp, NULL, stream_type, pctx->p.iph->daddr, 119 pctx->p.tcph->dest, pctx->p.iph->saddr, 120 pctx->p.tcph->source, NULL); 121 if (!stream) 122 return FALSE; 123 124 stream->side = MOS_SIDE_CLI; 125 stream->pair_stream->side = MOS_SIDE_SVR; 126 /* update recv context */ 127 stream->rcvvar->irs = pctx->p.seq; 128 stream->sndvar->peer_wnd = pctx->p.window; 129 stream->rcv_nxt = stream->rcvvar->irs + 1; 130 stream->sndvar->cwnd = 1; 131 132 /* 133 * if buffer management is off, then disable 134 * monitoring tcp ring of either streams (only if stream 135 * is just monitor stream active) 136 */ 137 if (IS_STREAM_TYPE(stream, MOS_SOCK_MONITOR_STREAM_ACTIVE)) { 138 assert(IS_STREAM_TYPE(stream->pair_stream, 139 MOS_SOCK_MONITOR_STREAM_ACTIVE)); 140 141 stream->buffer_mgmt = FALSE; 142 stream->pair_stream->buffer_mgmt = FALSE; 143 144 /* 145 * if there is even a single monitor asking for 146 * buffer management, enable it (that's why the 147 * need for the loop) 148 */ 149 uint8_t bm; 150 stream->status_mgmt = 0; 151 SOCKQ_FOREACH_START(walk, &stream->msocks) { 152 bm = walk->monitor_stream->monitor_listener->server_buf_mgmt; 153 if (bm > stream->buffer_mgmt) { 154 stream->buffer_mgmt = bm; 155 } 156 if (walk->monitor_stream->monitor_listener->server_mon == 1) { 157 stream->status_mgmt = 1; 158 } 159 } SOCKQ_FOREACH_END; 160 161 stream->pair_stream->status_mgmt = 0; 162 SOCKQ_FOREACH_START(walk, &stream->pair_stream->msocks) { 163 bm = walk->monitor_stream->monitor_listener->client_buf_mgmt; 164 if (bm > stream->pair_stream->buffer_mgmt) { 165 stream->pair_stream->buffer_mgmt = bm; 166 } 167 if (walk->monitor_stream->monitor_listener->client_mon == 1) { 168 stream->pair_stream->status_mgmt = 1; 169 } 170 } SOCKQ_FOREACH_END; 171 } 172 173 ParseTCPOptions(stream, pctx->p.cur_ts, 174 (uint8_t *)pctx->p.tcph + TCP_HEADER_LEN, 175 (pctx->p.tcph->doff << 2) - TCP_HEADER_LEN); 176 177 return stream; 178 } 179 /*----------------------------------------------------------------------------*/ 180 static inline struct tcp_stream * 181 FindStream(mtcp_manager_t mtcp, struct pkt_ctx *pctx, unsigned int *hash) 182 { 183 struct tcp_stream temp_stream; 184 185 temp_stream.saddr = pctx->p.iph->daddr; 186 temp_stream.sport = pctx->p.tcph->dest; 187 temp_stream.daddr = pctx->p.iph->saddr; 188 temp_stream.dport = pctx->p.tcph->source; 189 190 return HTSearch(mtcp->tcp_flow_table, &temp_stream, hash); 191 } 192 /*----------------------------------------------------------------------------*/ 193 /* Create new flow for new packet or return NULL */ 194 /*----------------------------------------------------------------------------*/ 195 static inline struct tcp_stream * 196 CreateStream(mtcp_manager_t mtcp, struct pkt_ctx *pctx, unsigned int *hash) 197 { 198 tcp_stream *cur_stream = NULL; 199 uint32_t stream_type; 200 const struct iphdr *iph = pctx->p.iph; 201 const struct tcphdr* tcph = pctx->p.tcph; 202 203 if (tcph->syn && !tcph->ack) { 204 /* handle the SYN */ 205 206 stream_type = DetectStreamType(mtcp, pctx, iph->daddr, tcph->dest); 207 if (!stream_type) { 208 TRACE_DBG("Refusing SYN packet.\n"); 209 #ifdef DBGMSG 210 DumpIPPacket(mtcp, iph, pctx->p.ip_len); 211 #endif 212 return NULL; 213 } 214 215 /* if it is accepting connections only */ 216 if (stream_type == STREAM_TYPE(MOS_SOCK_STREAM)) { 217 cur_stream = CreateServerStream(mtcp, stream_type, pctx); 218 if (!cur_stream) { 219 TRACE_DBG("No available space in flow pool.\n"); 220 #ifdef DBGMSG 221 DumpIPPacket(mtcp, iph, pctx->p.ip_len); 222 #endif 223 } 224 } else if (stream_type & STREAM_TYPE(MOS_SOCK_MONITOR_STREAM_ACTIVE)) { 225 /* 226 * create both monitoring streams, and accept 227 * connection if it is set in embedded environment 228 */ 229 #if 1 230 cur_stream = CreateClientTCPStream(mtcp, NULL, stream_type, 231 pctx->p.iph->saddr, pctx->p.tcph->source, 232 pctx->p.iph->daddr, pctx->p.tcph->dest, 233 hash); 234 #else 235 cur_stream = CreateMonitorStream(mtcp, pctx, stream_type, hash); 236 #endif 237 if (!cur_stream) { 238 TRACE_DBG("No available space in flow pool.\n"); 239 #ifdef DBGMSG 240 DumpIPPacket(mtcp, iph, pctx->p.ip_len); 241 #endif 242 } 243 } else { 244 /* invalid stream type! */ 245 } 246 247 return cur_stream; 248 249 } else { 250 TRACE_DBG("Weird packet comes.\n"); 251 #ifdef DBGMSG 252 DumpIPPacket(mtcp, iph, pctx->p.ip_len); 253 #endif 254 return NULL; 255 } 256 } 257 /*----------------------------------------------------------------------------*/ 258 inline void 259 FillPacketContextTCPInfo(struct pkt_ctx *pctx, struct tcphdr * tcph) 260 { 261 pctx->p.tcph = tcph; 262 pctx->p.payload = (uint8_t *)tcph + (tcph->doff << 2); 263 pctx->p.payloadlen = pctx->p.ip_len - (pctx->p.payload - (u_char *)pctx->p.iph); 264 pctx->p.seq = ntohl(tcph->seq); 265 pctx->p.ack_seq = ntohl(tcph->ack_seq); 266 pctx->p.window = ntohs(tcph->window); 267 pctx->p.offset = 0; 268 269 return ; 270 } 271 /*----------------------------------------------------------------------------*/ 272 /** 273 * Called for every incoming packet from the NIC (when monitoring is disabled) 274 */ 275 static void 276 HandleSockStream(mtcp_manager_t mtcp, struct tcp_stream *cur_stream, 277 struct pkt_ctx *pctx) 278 { 279 UpdateRecvTCPContext(mtcp, cur_stream, pctx); 280 DoActionEndTCPPacket(mtcp, cur_stream, pctx); 281 } 282 /*----------------------------------------------------------------------------*/ 283 void 284 UpdateMonitor(mtcp_manager_t mtcp, struct tcp_stream *sendside_stream, 285 struct tcp_stream *recvside_stream, struct pkt_ctx *pctx, 286 bool is_pkt_reception) 287 { 288 struct socket_map *walk; 289 290 assert(pctx); 291 292 #ifdef RECORDPKT_PER_STREAM 293 /* clone sendside_stream even if sender is disabled */ 294 ClonePacketCtx(&sendside_stream->last_pctx.p, 295 sendside_stream->last_pkt_data, &(pctx.p)); 296 #endif 297 298 /* update send stream context first */ 299 if (sendside_stream->status_mgmt) { 300 sendside_stream->cb_events = MOS_ON_PKT_IN; 301 302 if (is_pkt_reception) 303 UpdatePassiveSendTCPContext(mtcp, sendside_stream, pctx); 304 305 sendside_stream->allow_pkt_modification = true; 306 /* POST hook of sender */ 307 if (sendside_stream->side == MOS_SIDE_CLI) { 308 SOCKQ_FOREACH_START(walk, &sendside_stream->msocks) { 309 HandleCallback(mtcp, MOS_HK_SND, walk, sendside_stream->side, 310 pctx, sendside_stream->cb_events); 311 } SOCKQ_FOREACH_END; 312 } else { /* sendside_stream->side == MOS_SIDE_SVR */ 313 SOCKQ_FOREACH_REVERSE(walk, &sendside_stream->msocks) { 314 HandleCallback(mtcp, MOS_HK_SND, walk, sendside_stream->side, 315 pctx, sendside_stream->cb_events); 316 } SOCKQ_FOREACH_END; 317 } 318 sendside_stream->allow_pkt_modification = false; 319 } 320 321 /* Attach Server-side stream */ 322 if (recvside_stream == NULL) { 323 assert(sendside_stream->side == MOS_SIDE_CLI); 324 if ((recvside_stream = AttachServerTCPStream(mtcp, sendside_stream, 0, 325 pctx->p.iph->saddr, pctx->p.tcph->source, 326 pctx->p.iph->daddr, pctx->p.tcph->dest)) == NULL) { 327 DestroyTCPStream(mtcp, sendside_stream); 328 return; 329 } 330 /* update recv context */ 331 recvside_stream->rcvvar->irs = pctx->p.seq; 332 recvside_stream->sndvar->peer_wnd = pctx->p.window; 333 recvside_stream->rcv_nxt = recvside_stream->rcvvar->irs + 1; 334 recvside_stream->sndvar->cwnd = 1; 335 336 ParseTCPOptions(recvside_stream, pctx->p.cur_ts, 337 (uint8_t *)pctx->p.tcph + TCP_HEADER_LEN, 338 (pctx->p.tcph->doff << 2) - TCP_HEADER_LEN); 339 } 340 341 /* Perform post-send tcp activities */ 342 PostSendTCPAction(mtcp, pctx, recvside_stream, sendside_stream); 343 344 if (/*1*/recvside_stream->status_mgmt) { 345 recvside_stream->cb_events = MOS_ON_PKT_IN; 346 347 /* Predict events which may be raised prior to performing TCP processing */ 348 PreRecvTCPEventPrediction(mtcp, pctx, recvside_stream); 349 350 /* retransmitted packet should avoid event simulation */ 351 //if ((recvside_stream->cb_events & MOS_ON_REXMIT) == 0) 352 /* update receive stream context (recv_side stream) */ 353 if (is_pkt_reception) 354 UpdateRecvTCPContext(mtcp, recvside_stream, pctx); 355 else 356 UpdatePassiveRecvTCPContext(mtcp, recvside_stream, pctx); 357 358 /* POST hook of receiver */ 359 if (recvside_stream->side == MOS_SIDE_CLI) { 360 SOCKQ_FOREACH_REVERSE(walk, &recvside_stream->msocks) { 361 HandleCallback(mtcp, MOS_HK_RCV, walk, recvside_stream->side, 362 pctx, recvside_stream->cb_events); 363 } SOCKQ_FOREACH_END; 364 } else { /* recvside_stream->side == MOS_SIDE_SVR */ 365 SOCKQ_FOREACH_START(walk, &recvside_stream->msocks) { 366 HandleCallback(mtcp, MOS_HK_RCV, walk, recvside_stream->side, 367 pctx, recvside_stream->cb_events); 368 } SOCKQ_FOREACH_END; 369 } 370 } 371 372 /* reset callback events counter */ 373 recvside_stream->cb_events = 0; 374 sendside_stream->cb_events = 0; 375 } 376 /*----------------------------------------------------------------------------*/ 377 static void 378 HandleMonitorStream(mtcp_manager_t mtcp, struct tcp_stream *sendside_stream, 379 struct tcp_stream *recvside_stream, struct pkt_ctx *pctx) 380 { 381 UpdateMonitor(mtcp, sendside_stream, recvside_stream, pctx, true); 382 383 recvside_stream = sendside_stream->pair_stream; 384 385 if (HAS_STREAM_TYPE(recvside_stream, MOS_SOCK_STREAM)) { 386 DoActionEndTCPPacket(mtcp, recvside_stream, pctx); 387 } else { 388 /* forward packets */ 389 if (pctx->forward) 390 ForwardIPPacket(mtcp, pctx); 391 392 if (recvside_stream->stream_type == sendside_stream->stream_type && 393 IS_STREAM_TYPE(recvside_stream, MOS_SOCK_MONITOR_STREAM_ACTIVE)) { 394 if (((recvside_stream->state == TCP_ST_TIME_WAIT && 395 g_config.mos->tcp_tw_interval == 0) || 396 recvside_stream->state == TCP_ST_CLOSED_RSVD || 397 !recvside_stream->status_mgmt) && 398 ((sendside_stream->state == TCP_ST_TIME_WAIT && 399 g_config.mos->tcp_tw_interval == 0) || 400 sendside_stream->state == TCP_ST_CLOSED_RSVD || 401 !sendside_stream->status_mgmt)) 402 403 DestroyTCPStream(mtcp, recvside_stream); 404 } 405 } 406 } 407 /*----------------------------------------------------------------------------*/ 408 int 409 ProcessInTCPPacket(mtcp_manager_t mtcp, struct pkt_ctx *pctx) 410 { 411 uint64_t events = 0; 412 struct tcp_stream *cur_stream; 413 struct iphdr* iph; 414 struct tcphdr* tcph; 415 struct mon_listener *walk; 416 unsigned int hash = 0; 417 418 iph = pctx->p.iph; 419 tcph = (struct tcphdr *)((u_char *)pctx->p.iph + (pctx->p.iph->ihl << 2)); 420 421 FillPacketContextTCPInfo(pctx, tcph); 422 423 /* callback for monitor raw socket */ 424 TAILQ_FOREACH(walk, &mtcp->monitors, link) 425 if (walk->socket->socktype == MOS_SOCK_MONITOR_RAW) 426 HandleCallback(mtcp, MOS_NULL, walk->socket, MOS_SIDE_BOTH, 427 pctx, MOS_ON_PKT_IN); 428 429 if (pctx->p.ip_len < ((iph->ihl + tcph->doff) << 2)) 430 return ERROR; 431 432 #if VERIFY_RX_CHECKSUM 433 if (TCPCalcChecksum((uint16_t *)pctx->p.tcph, 434 (tcph->doff << 2) + pctx->p.payloadlen, 435 iph->saddr, pctx->p.iph->daddr)) { 436 TRACE_DBG("Checksum Error: Original: 0x%04x, calculated: 0x%04x\n", 437 tcph->check, TCPCalcChecksum((uint16_t *)tcph, 438 (tcph->doff << 2) + pctx->p.payloadlen, 439 iph->saddr, iph->daddr)); 440 if (pctx->forward && mtcp->num_msp) 441 ForwardIPPacket(mtcp, pctx); 442 return ERROR; 443 } 444 #endif 445 events |= MOS_ON_PKT_IN; 446 447 /* Check whether a packet is belong to any stream */ 448 cur_stream = FindStream(mtcp, pctx, &hash); 449 if (!cur_stream) { 450 /* 451 * No need to create stream for monitor. 452 * But do create 1 for client case! 453 */ 454 if (mtcp->listener == NULL && mtcp->num_msp == 0) { 455 //if (pctx->forward) 456 // ForwardIPPacket(mtcp, pctx); 457 return TRUE; 458 } 459 /* Create new flow for new packet or return NULL */ 460 cur_stream = CreateStream(mtcp, pctx, &hash); 461 if (!cur_stream) 462 events = MOS_ON_ORPHAN; 463 } 464 465 if (cur_stream) { 466 cur_stream->cb_events = events; 467 468 if (cur_stream->rcvvar && cur_stream->rcvvar->rcvbuf) 469 pctx->p.offset = (uint64_t)seq2loff(cur_stream->rcvvar->rcvbuf, 470 pctx->p.seq, cur_stream->rcvvar->irs + 1); 471 472 if (IS_STREAM_TYPE(cur_stream, MOS_SOCK_STREAM)) 473 HandleSockStream(mtcp, cur_stream, pctx); 474 475 else if (HAS_STREAM_TYPE(cur_stream, MOS_SOCK_MONITOR_STREAM_ACTIVE)) 476 HandleMonitorStream(mtcp, cur_stream, cur_stream->pair_stream, pctx); 477 else 478 assert(0); 479 } else { 480 struct mon_listener *walk; 481 struct sfbpf_program fcode; 482 /* 483 * event callback for pkt_no_conn; MOS_SIDE_BOTH 484 * means that we can't judge sides here 485 */ 486 TAILQ_FOREACH(walk, &mtcp->monitors, link) { 487 /* mtcp_bind_monitor_filter() 488 * - apply stream orphan filter to every pkt before raising ORPHAN event */ 489 fcode = walk->stream_orphan_fcode; 490 if (!(ISSET_BPFFILTER(fcode) && pctx && 491 EVAL_BPFFILTER(fcode, (uint8_t *)pctx->p.iph - sizeof(struct ethhdr), 492 pctx->p.ip_len + sizeof(struct ethhdr)) == 0)) { 493 HandleCallback(mtcp, MOS_NULL, walk->socket, MOS_SIDE_BOTH, 494 pctx, events); 495 } 496 } 497 if (mtcp->listener) { 498 /* RFC 793 (page 65) says 499 "An incoming segment containing a RST is discarded." 500 if the TCP state is CLOSED (= TCP stream does not exist). */ 501 if (!tcph->rst) 502 /* Send RST if it is run as EndTCP only mode */ 503 SendTCPPacketStandalone(mtcp, 504 iph->daddr, tcph->dest, iph->saddr, tcph->source, 505 0, pctx->p.seq + pctx->p.payloadlen + 1, 0, 506 TCP_FLAG_RST | TCP_FLAG_ACK, 507 NULL, 0, pctx->p.cur_ts, 0, 0, -1); 508 } else if (pctx->forward) { 509 /* Do forward or drop if it run as Monitor only mode */ 510 ForwardIPPacket(mtcp, pctx); 511 } 512 } 513 514 return TRUE; 515 } 516 /*----------------------------------------------------------------------------*/ 517