1 #include <assert.h> 2 #include <string.h> 3 4 #include "mtcp.h" 5 #include "arp.h" 6 #include "socket.h" 7 #include "eth_out.h" 8 #include "ip_out.h" 9 #include "mos_api.h" 10 #include "tcp_util.h" 11 #include "tcp_in.h" 12 #include "tcp_out.h" 13 #include "tcp_ring_buffer.h" 14 #include "eventpoll.h" 15 #include "debug.h" 16 #include "timer.h" 17 #include "ip_in.h" 18 #include "config.h" 19 20 extern struct pkt_info * 21 ClonePacketCtx(struct pkt_info *to, unsigned char *frame, struct pkt_ctx *from); 22 23 #define VERIFY_RX_CHECKSUM TRUE 24 /*----------------------------------------------------------------------------*/ 25 static inline uint32_t 26 DetectStreamType(mtcp_manager_t mtcp, struct pkt_ctx *pctx, 27 uint32_t ip, uint16_t port) 28 { 29 /* To Do: We will extend this filter to check listeners for proxy as well */ 30 struct sockaddr_in *addr; 31 int rc, cnt_match, socktype; 32 struct mon_listener *walk; 33 struct sfbpf_program fcode; 34 35 cnt_match = 0; 36 rc = 0; 37 38 if (mtcp->num_msp > 0) { 39 /* mtcp_bind_monitor_filter() 40 * - create MonitorTCPStream only when the filter of any of the existing 41 * passive sockets match the incoming flow */ 42 TAILQ_FOREACH(walk, &mtcp->monitors, link) { 43 /* For every passive monitor sockets, */ 44 socktype = walk->socket->socktype; 45 if (socktype != MOS_SOCK_MONITOR_STREAM) 46 continue; // XXX: can this happen?? 47 48 /* if pctx hits the filter rule, handle the passive monitor socket */ 49 fcode = walk->stream_syn_fcode; 50 if (!(ISSET_BPFFILTER(fcode) && pctx && 51 EVAL_BPFFILTER(fcode, (uint8_t *)pctx->p.iph - sizeof(struct ethhdr), 52 pctx->p.ip_len + sizeof(struct ethhdr)) == 0)) { 53 walk->is_stream_syn_filter_hit = 1;// set the 'filter hit' flag to 1 54 cnt_match++; // count the number of matched sockets 55 } 56 } 57 58 /* if there's any passive monitoring socket whose filter is hit, 59 we should create monitor stream */ 60 if (cnt_match > 0) 61 rc = STREAM_TYPE(MOS_SOCK_MONITOR_STREAM_ACTIVE); 62 } 63 64 if (mtcp->listener) { 65 /* Detect end TCP stack mode */ 66 addr = &mtcp->listener->socket->saddr; 67 if (addr->sin_port == port) { 68 if (addr->sin_addr.s_addr != INADDR_ANY) { 69 if (ip == addr->sin_addr.s_addr) { 70 rc |= STREAM_TYPE(MOS_SOCK_STREAM); 71 } 72 } else { 73 int i; 74 75 for (i = 0; i < g_config.mos->netdev_table->num; i++) { 76 if (ip == g_config.mos->netdev_table->ent[i]->ip_addr) { 77 rc |= STREAM_TYPE(MOS_SOCK_STREAM); 78 } 79 } 80 } 81 } 82 } 83 84 return rc; 85 } 86 /*----------------------------------------------------------------------------*/ 87 static inline tcp_stream * 88 CreateServerStream(mtcp_manager_t mtcp, int type, struct pkt_ctx *pctx) 89 { 90 tcp_stream *cur_stream = NULL; 91 92 /* create new stream and add to flow hash table */ 93 cur_stream = CreateTCPStream(mtcp, NULL, type, 94 pctx->p.iph->daddr, pctx->p.tcph->dest, 95 pctx->p.iph->saddr, pctx->p.tcph->source, NULL); 96 if (!cur_stream) { 97 TRACE_ERROR("INFO: Could not allocate tcp_stream!\n"); 98 return FALSE; 99 } 100 101 cur_stream->rcvvar->irs = pctx->p.seq; 102 cur_stream->sndvar->peer_wnd = pctx->p.window; 103 cur_stream->rcv_nxt = cur_stream->rcvvar->irs; 104 cur_stream->sndvar->cwnd = 1; 105 ParseTCPOptions(cur_stream, pctx->p.cur_ts, (uint8_t *)pctx->p.tcph + 106 TCP_HEADER_LEN, (pctx->p.tcph->doff << 2) - TCP_HEADER_LEN); 107 108 return cur_stream; 109 } 110 /*----------------------------------------------------------------------------*/ 111 static inline tcp_stream * 112 CreateMonitorStream(mtcp_manager_t mtcp, struct pkt_ctx* pctx, 113 uint32_t stream_type, unsigned int *hash) 114 { 115 tcp_stream *stream = NULL; 116 struct socket_map *walk; 117 /* create a client stream context */ 118 stream = CreateDualTCPStream(mtcp, NULL, stream_type, pctx->p.iph->daddr, 119 pctx->p.tcph->dest, pctx->p.iph->saddr, 120 pctx->p.tcph->source, NULL); 121 if (!stream) 122 return FALSE; 123 124 stream->side = MOS_SIDE_CLI; 125 stream->pair_stream->side = MOS_SIDE_SVR; 126 /* update recv context */ 127 stream->rcvvar->irs = pctx->p.seq; 128 stream->sndvar->peer_wnd = pctx->p.window; 129 stream->rcv_nxt = stream->rcvvar->irs + 1; 130 stream->sndvar->cwnd = 1; 131 132 /* 133 * if buffer management is off, then disable 134 * monitoring tcp ring of either streams (only if stream 135 * is just monitor stream active) 136 */ 137 if (IS_STREAM_TYPE(stream, MOS_SOCK_MONITOR_STREAM_ACTIVE)) { 138 assert(IS_STREAM_TYPE(stream->pair_stream, 139 MOS_SOCK_MONITOR_STREAM_ACTIVE)); 140 141 stream->buffer_mgmt = FALSE; 142 stream->pair_stream->buffer_mgmt = FALSE; 143 144 /* 145 * if there is even a single monitor asking for 146 * buffer management, enable it (that's why the 147 * need for the loop) 148 */ 149 #ifdef NEWRB 150 uint8_t bm; 151 stream->status_mgmt = 0; 152 SOCKQ_FOREACH_START(walk, &stream->msocks) { 153 bm = walk->monitor_stream->monitor_listener->server_buf_mgmt; 154 if (bm > stream->buffer_mgmt) { 155 stream->buffer_mgmt = bm; 156 } 157 if (walk->monitor_stream->monitor_listener->server_mon == 1) { 158 stream->status_mgmt = 1; 159 } 160 } SOCKQ_FOREACH_END; 161 162 stream->pair_stream->status_mgmt = 0; 163 SOCKQ_FOREACH_START(walk, &stream->pair_stream->msocks) { 164 bm = walk->monitor_stream->monitor_listener->client_buf_mgmt; 165 if (bm > stream->pair_stream->buffer_mgmt) { 166 stream->pair_stream->buffer_mgmt = bm; 167 } 168 if (walk->monitor_stream->monitor_listener->client_mon == 1) { 169 stream->pair_stream->status_mgmt = 1; 170 } 171 } SOCKQ_FOREACH_END; 172 #else 173 SOCKQ_FOREACH_START(walk, &stream->msocks) { 174 if (walk->monitor_stream->monitor_listener->server_buf_mgmt) { 175 stream->buffer_mgmt = TRUE; 176 break; 177 } 178 } SOCKQ_FOREACH_END; 179 180 SOCKQ_FOREACH_START(walk, &stream->pair_stream->msocks) { 181 if (walk->monitor_stream->monitor_listener->client_buf_mgmt) { 182 stream->pair_stream->buffer_mgmt = TRUE; 183 break; 184 } 185 } SOCKQ_FOREACH_END; 186 #endif 187 } 188 189 ParseTCPOptions(stream, pctx->p.cur_ts, 190 (uint8_t *)pctx->p.tcph + TCP_HEADER_LEN, 191 (pctx->p.tcph->doff << 2) - TCP_HEADER_LEN); 192 193 return stream; 194 } 195 /*----------------------------------------------------------------------------*/ 196 static inline struct tcp_stream * 197 FindStream(mtcp_manager_t mtcp, struct pkt_ctx *pctx, unsigned int *hash) 198 { 199 struct tcp_stream temp_stream; 200 201 temp_stream.saddr = pctx->p.iph->daddr; 202 temp_stream.sport = pctx->p.tcph->dest; 203 temp_stream.daddr = pctx->p.iph->saddr; 204 temp_stream.dport = pctx->p.tcph->source; 205 206 return HTSearch(mtcp->tcp_flow_table, &temp_stream, hash); 207 } 208 /*----------------------------------------------------------------------------*/ 209 /* Create new flow for new packet or return NULL */ 210 /*----------------------------------------------------------------------------*/ 211 static inline struct tcp_stream * 212 CreateStream(mtcp_manager_t mtcp, struct pkt_ctx *pctx, unsigned int *hash) 213 { 214 tcp_stream *cur_stream = NULL; 215 uint32_t stream_type; 216 const struct iphdr *iph = pctx->p.iph; 217 const struct tcphdr* tcph = pctx->p.tcph; 218 219 if (tcph->syn && !tcph->ack) { 220 /* handle the SYN */ 221 222 stream_type = DetectStreamType(mtcp, pctx, iph->daddr, tcph->dest); 223 if (!stream_type) { 224 TRACE_DBG("Refusing SYN packet.\n"); 225 #ifdef DBGMSG 226 DumpIPPacket(mtcp, iph, pctx->p.ip_len); 227 #endif 228 return NULL; 229 } 230 231 /* if it is accepting connections only */ 232 if (stream_type == STREAM_TYPE(MOS_SOCK_STREAM)) { 233 cur_stream = CreateServerStream(mtcp, stream_type, pctx); 234 if (!cur_stream) { 235 TRACE_DBG("No available space in flow pool.\n"); 236 #ifdef DBGMSG 237 DumpIPPacket(mtcp, iph, pctx->p.ip_len); 238 #endif 239 } 240 } else if (stream_type & STREAM_TYPE(MOS_SOCK_MONITOR_STREAM_ACTIVE)) { 241 /* 242 * create both monitoring streams, and accept 243 * connection if it is set in embedded environment 244 */ 245 #if 1 246 cur_stream = CreateClientTCPStream(mtcp, NULL, stream_type, 247 pctx->p.iph->saddr, pctx->p.tcph->source, 248 pctx->p.iph->daddr, pctx->p.tcph->dest, 249 hash); 250 #else 251 cur_stream = CreateMonitorStream(mtcp, pctx, stream_type, hash); 252 #endif 253 if (!cur_stream) { 254 TRACE_DBG("No available space in flow pool.\n"); 255 #ifdef DBGMSG 256 DumpIPPacket(mtcp, iph, pctx->p.ip_len); 257 #endif 258 } 259 } else { 260 /* invalid stream type! */ 261 } 262 263 return cur_stream; 264 265 } else { 266 TRACE_DBG("Weird packet comes.\n"); 267 #ifdef DBGMSG 268 DumpIPPacket(mtcp, iph, pctx->p.ip_len); 269 #endif 270 return NULL; 271 } 272 } 273 /*----------------------------------------------------------------------------*/ 274 inline void 275 FillPacketContextTCPInfo(struct pkt_ctx *pctx, struct tcphdr * tcph) 276 { 277 pctx->p.tcph = tcph; 278 pctx->p.payload = (uint8_t *)tcph + (tcph->doff << 2); 279 pctx->p.payloadlen = pctx->p.ip_len - (pctx->p.payload - (u_char *)pctx->p.iph); 280 pctx->p.seq = ntohl(tcph->seq); 281 pctx->p.ack_seq = ntohl(tcph->ack_seq); 282 pctx->p.window = ntohs(tcph->window); 283 #ifdef NEWPPEEK 284 pctx->p.offset = 0; 285 #endif 286 287 return ; 288 } 289 /*----------------------------------------------------------------------------*/ 290 /** 291 * Called for every incoming packet from the NIC (when monitoring is disabled) 292 */ 293 static void 294 HandleSockStream(mtcp_manager_t mtcp, struct tcp_stream *cur_stream, 295 struct pkt_ctx *pctx) 296 { 297 UpdateRecvTCPContext(mtcp, cur_stream, pctx); 298 DoActionEndTCPPacket(mtcp, cur_stream, pctx); 299 } 300 /*----------------------------------------------------------------------------*/ 301 void 302 UpdateMonitor(mtcp_manager_t mtcp, struct tcp_stream *sendside_stream, 303 struct tcp_stream *recvside_stream, struct pkt_ctx *pctx, 304 bool is_pkt_reception) 305 { 306 struct socket_map *walk; 307 308 assert(pctx); 309 310 /* clone sendside_stream even if sender is disabled */ 311 ClonePacketCtx(&sendside_stream->last_pctx.p, 312 sendside_stream->last_pkt_data, pctx); 313 314 /* update send stream context first */ 315 if (sendside_stream->status_mgmt) { 316 sendside_stream->cb_events = MOS_ON_PKT_IN; 317 318 if (is_pkt_reception) 319 UpdatePassiveSendTCPContext(mtcp, sendside_stream, pctx); 320 321 sendside_stream->allow_pkt_modification = true; 322 /* POST hook of sender */ 323 SOCKQ_FOREACH_START(walk, &sendside_stream->msocks) { 324 HandleCallback(mtcp, MOS_HK_SND, walk, sendside_stream->side, 325 pctx, sendside_stream->cb_events); 326 } SOCKQ_FOREACH_END; 327 sendside_stream->allow_pkt_modification = false; 328 } 329 330 /* Attach Server-side stream */ 331 if (recvside_stream == NULL) { 332 assert(sendside_stream->side == MOS_SIDE_CLI); 333 if ((recvside_stream = AttachServerTCPStream(mtcp, sendside_stream, 0, 334 pctx->p.iph->saddr, pctx->p.tcph->source, 335 pctx->p.iph->daddr, pctx->p.tcph->dest)) == NULL) { 336 DestroyTCPStream(mtcp, sendside_stream); 337 return; 338 } 339 /* update recv context */ 340 recvside_stream->rcvvar->irs = pctx->p.seq; 341 recvside_stream->sndvar->peer_wnd = pctx->p.window; 342 recvside_stream->rcv_nxt = recvside_stream->rcvvar->irs + 1; 343 recvside_stream->sndvar->cwnd = 1; 344 345 ParseTCPOptions(recvside_stream, pctx->p.cur_ts, 346 (uint8_t *)pctx->p.tcph + TCP_HEADER_LEN, 347 (pctx->p.tcph->doff << 2) - TCP_HEADER_LEN); 348 } 349 350 /* Perform post-send tcp activities */ 351 PostSendTCPAction(mtcp, pctx, recvside_stream, sendside_stream); 352 353 if (/*1*/recvside_stream->status_mgmt) { 354 recvside_stream->cb_events = MOS_ON_PKT_IN; 355 356 /* Predict events which may be raised prior to performing TCP processing */ 357 PreRecvTCPEventPrediction(mtcp, pctx, recvside_stream); 358 359 /* retransmitted packet should avoid event simulation */ 360 //if ((recvside_stream->cb_events & MOS_ON_REXMIT) == 0) 361 /* update receive stream context (recv_side stream) */ 362 if (is_pkt_reception) 363 UpdateRecvTCPContext(mtcp, recvside_stream, pctx); 364 else 365 UpdatePassiveRecvTCPContext(mtcp, recvside_stream, pctx); 366 367 /* POST hook of receiver */ 368 SOCKQ_FOREACH_START(walk, &recvside_stream->msocks) { 369 HandleCallback(mtcp, MOS_HK_RCV, walk, recvside_stream->side, 370 pctx, recvside_stream->cb_events); 371 } SOCKQ_FOREACH_END; 372 } 373 374 /* reset callback events counter */ 375 recvside_stream->cb_events = 0; 376 sendside_stream->cb_events = 0; 377 } 378 /*----------------------------------------------------------------------------*/ 379 static void 380 HandleMonitorStream(mtcp_manager_t mtcp, struct tcp_stream *sendside_stream, 381 struct tcp_stream *recvside_stream, struct pkt_ctx *pctx) 382 { 383 UpdateMonitor(mtcp, sendside_stream, recvside_stream, pctx, true); 384 385 recvside_stream = sendside_stream->pair_stream; 386 387 if (HAS_STREAM_TYPE(recvside_stream, MOS_SOCK_STREAM)) { 388 DoActionEndTCPPacket(mtcp, recvside_stream, pctx); 389 } else { 390 /* forward packets */ 391 if (pctx->forward) 392 ForwardIPPacket(mtcp, pctx); 393 394 if (recvside_stream->stream_type == sendside_stream->stream_type && 395 IS_STREAM_TYPE(recvside_stream, MOS_SOCK_MONITOR_STREAM_ACTIVE)) { 396 if (((recvside_stream->state == TCP_ST_TIME_WAIT && 397 g_config.mos->tcp_tw_interval == 0) || 398 recvside_stream->state == TCP_ST_CLOSED_RSVD || 399 !recvside_stream->status_mgmt) && 400 ((sendside_stream->state == TCP_ST_TIME_WAIT && 401 g_config.mos->tcp_tw_interval == 0) || 402 sendside_stream->state == TCP_ST_CLOSED_RSVD || 403 !sendside_stream->status_mgmt)) 404 405 DestroyTCPStream(mtcp, recvside_stream); 406 } 407 } 408 } 409 /*----------------------------------------------------------------------------*/ 410 int 411 ProcessInTCPPacket(mtcp_manager_t mtcp, struct pkt_ctx *pctx) 412 { 413 uint64_t events = 0; 414 struct tcp_stream *cur_stream; 415 struct iphdr* iph; 416 struct tcphdr* tcph; 417 struct mon_listener *walk; 418 unsigned int hash = 0; 419 420 iph = pctx->p.iph; 421 tcph = (struct tcphdr *)((u_char *)pctx->p.iph + (pctx->p.iph->ihl << 2)); 422 423 FillPacketContextTCPInfo(pctx, tcph); 424 425 /* callback for monitor raw socket */ 426 TAILQ_FOREACH(walk, &mtcp->monitors, link) 427 if (walk->socket->socktype == MOS_SOCK_MONITOR_RAW) 428 HandleCallback(mtcp, MOS_NULL, walk->socket, MOS_SIDE_BOTH, 429 pctx, MOS_ON_PKT_IN); 430 431 if (pctx->p.ip_len < ((iph->ihl + tcph->doff) << 2)) 432 return ERROR; 433 434 #if VERIFY_RX_CHECKSUM 435 if (TCPCalcChecksum((uint16_t *)pctx->p.tcph, 436 (tcph->doff << 2) + pctx->p.payloadlen, 437 iph->saddr, pctx->p.iph->daddr)) { 438 TRACE_DBG("Checksum Error: Original: 0x%04x, calculated: 0x%04x\n", 439 tcph->check, TCPCalcChecksum((uint16_t *)tcph, 440 (tcph->doff << 2) + payloadlen, 441 iph->saddr, iph->daddr)); 442 if (pctx->forward && mtcp->num_msp) 443 ForwardIPPacket(mtcp, pctx); 444 return ERROR; 445 } 446 #endif 447 events |= MOS_ON_PKT_IN; 448 449 /* Check whether a packet is belong to any stream */ 450 cur_stream = FindStream(mtcp, pctx, &hash); 451 if (!cur_stream) { 452 /* 453 * No need to create stream for monitor. 454 * But do create 1 for client case! 455 */ 456 if (mtcp->listener == NULL && mtcp->num_msp == 0) { 457 //if (pctx->forward) 458 // ForwardIPPacket(mtcp, pctx); 459 return TRUE; 460 } 461 /* Create new flow for new packet or return NULL */ 462 cur_stream = CreateStream(mtcp, pctx, &hash); 463 if (!cur_stream) 464 events = MOS_ON_ORPHAN; 465 } 466 467 if (cur_stream) { 468 cur_stream->cb_events = events; 469 470 #ifdef NEWPPEEK 471 if (cur_stream->rcvvar && cur_stream->rcvvar->rcvbuf) 472 pctx->p.offset = (uint64_t)seq2loff(cur_stream->rcvvar->rcvbuf, 473 pctx->p.seq, cur_stream->rcvvar->irs + 1); 474 #endif 475 476 if (IS_STREAM_TYPE(cur_stream, MOS_SOCK_STREAM)) 477 HandleSockStream(mtcp, cur_stream, pctx); 478 479 else if (HAS_STREAM_TYPE(cur_stream, MOS_SOCK_MONITOR_STREAM_ACTIVE)) 480 HandleMonitorStream(mtcp, cur_stream, cur_stream->pair_stream, pctx); 481 else 482 assert(0); 483 } else { 484 struct mon_listener *walk; 485 struct sfbpf_program fcode; 486 /* 487 * event callback for pkt_no_conn; MOS_SIDE_BOTH 488 * means that we can't judge sides here 489 */ 490 TAILQ_FOREACH(walk, &mtcp->monitors, link) { 491 /* mtcp_bind_monitor_filter() 492 * - apply stream orphan filter to every pkt before raising ORPHAN event */ 493 fcode = walk->stream_orphan_fcode; 494 if (!(ISSET_BPFFILTER(fcode) && pctx && 495 EVAL_BPFFILTER(fcode, (uint8_t *)pctx->p.iph - sizeof(struct ethhdr), 496 pctx->p.ip_len + sizeof(struct ethhdr)) == 0)) { 497 HandleCallback(mtcp, MOS_NULL, walk->socket, MOS_SIDE_BOTH, 498 pctx, events); 499 } 500 } 501 if (mtcp->listener) { 502 /* Send RST if it is run as EndTCP only mode */ 503 SendTCPPacketStandalone(mtcp, 504 iph->daddr, tcph->dest, iph->saddr, tcph->source, 505 0, pctx->p.seq + pctx->p.payloadlen + 1, 0, TCP_FLAG_RST | TCP_FLAG_ACK, 506 NULL, 0, pctx->p.cur_ts, 0); 507 } else if (pctx->forward) { 508 /* Do forward or drop if it run as Monitor only mode */ 509 ForwardIPPacket(mtcp, pctx); 510 } 511 } 512 513 return TRUE; 514 } 515 /*----------------------------------------------------------------------------*/ 516