1 #include <assert.h> 2 #include <string.h> 3 4 #include "mtcp.h" 5 #include "arp.h" 6 #include "socket.h" 7 #include "eth_out.h" 8 #include "ip_out.h" 9 #include "mos_api.h" 10 #include "tcp_util.h" 11 #include "tcp_in.h" 12 #include "tcp_out.h" 13 #include "tcp_ring_buffer.h" 14 #include "eventpoll.h" 15 #include "debug.h" 16 #include "timer.h" 17 #include "ip_in.h" 18 #include "config.h" 19 20 extern struct pkt_info * 21 ClonePacketCtx(struct pkt_info *to, unsigned char *frame, struct pkt_ctx *from); 22 23 #define VERIFY_RX_CHECKSUM TRUE 24 /*----------------------------------------------------------------------------*/ 25 static inline uint32_t 26 DetectStreamType(mtcp_manager_t mtcp, struct pkt_ctx *pctx, 27 uint32_t ip, uint16_t port) 28 { 29 /* To Do: We will extend this filter to check listeners for proxy as well */ 30 struct sockaddr_in *addr; 31 int rc, cnt_match, socktype; 32 struct mon_listener *walk; 33 struct sfbpf_program fcode; 34 35 cnt_match = 0; 36 rc = 0; 37 38 if (mtcp->num_msp > 0) { 39 /* mtcp_bind_monitor_filter() 40 * - create MonitorTCPStream only when the filter of any of the existing 41 * passive sockets match the incoming flow */ 42 TAILQ_FOREACH(walk, &mtcp->monitors, link) { 43 /* For every passive monitor sockets, */ 44 socktype = walk->socket->socktype; 45 if (socktype != MOS_SOCK_MONITOR_STREAM) 46 continue; // XXX: can this happen?? 47 48 /* if pctx hits the filter rule, handle the passive monitor socket */ 49 fcode = walk->stream_syn_fcode; 50 if (!(ISSET_BPFFILTER(fcode) && pctx && 51 EVAL_BPFFILTER(fcode, (uint8_t *)pctx->p.iph - sizeof(struct ethhdr), 52 pctx->p.ip_len + sizeof(struct ethhdr)) == 0)) { 53 walk->is_stream_syn_filter_hit = 1;// set the 'filter hit' flag to 1 54 cnt_match++; // count the number of matched sockets 55 } 56 } 57 58 /* if there's any passive monitoring socket whose filter is hit, 59 we should create monitor stream */ 60 if (cnt_match > 0) 61 rc = STREAM_TYPE(MOS_SOCK_MONITOR_STREAM_ACTIVE); 62 } 63 64 if (mtcp->listener) { 65 /* Detect end TCP stack mode */ 66 addr = &mtcp->listener->socket->saddr; 67 if (addr->sin_port == port) { 68 if (addr->sin_addr.s_addr != INADDR_ANY) { 69 if (ip == addr->sin_addr.s_addr) { 70 rc |= STREAM_TYPE(MOS_SOCK_STREAM); 71 } 72 } else { 73 int i; 74 75 for (i = 0; i < g_config.mos->netdev_table->num; i++) { 76 if (ip == g_config.mos->netdev_table->ent[i]->ip_addr) { 77 rc |= STREAM_TYPE(MOS_SOCK_STREAM); 78 } 79 } 80 } 81 } 82 } 83 84 return rc; 85 } 86 /*----------------------------------------------------------------------------*/ 87 static inline tcp_stream * 88 CreateServerStream(mtcp_manager_t mtcp, int type, struct pkt_ctx *pctx) 89 { 90 tcp_stream *cur_stream = NULL; 91 92 /* create new stream and add to flow hash table */ 93 cur_stream = CreateTCPStream(mtcp, NULL, type, 94 pctx->p.iph->daddr, pctx->p.tcph->dest, 95 pctx->p.iph->saddr, pctx->p.tcph->source, NULL); 96 if (!cur_stream) { 97 TRACE_ERROR("INFO: Could not allocate tcp_stream!\n"); 98 return FALSE; 99 } 100 101 cur_stream->rcvvar->irs = pctx->p.seq; 102 cur_stream->sndvar->peer_wnd = pctx->p.window; 103 cur_stream->rcv_nxt = cur_stream->rcvvar->irs; 104 cur_stream->sndvar->cwnd = 1; 105 ParseTCPOptions(cur_stream, pctx->p.cur_ts, (uint8_t *)pctx->p.tcph + 106 TCP_HEADER_LEN, (pctx->p.tcph->doff << 2) - TCP_HEADER_LEN); 107 108 return cur_stream; 109 } 110 /*----------------------------------------------------------------------------*/ 111 static inline tcp_stream * 112 CreateMonitorStream(mtcp_manager_t mtcp, struct pkt_ctx* pctx, 113 uint32_t stream_type, unsigned int *hash) 114 { 115 tcp_stream *stream = NULL; 116 struct socket_map *walk; 117 /* create a client stream context */ 118 stream = CreateDualTCPStream(mtcp, NULL, stream_type, pctx->p.iph->daddr, 119 pctx->p.tcph->dest, pctx->p.iph->saddr, 120 pctx->p.tcph->source, NULL); 121 if (!stream) 122 return FALSE; 123 124 stream->side = MOS_SIDE_CLI; 125 stream->pair_stream->side = MOS_SIDE_SVR; 126 /* update recv context */ 127 stream->rcvvar->irs = pctx->p.seq; 128 stream->sndvar->peer_wnd = pctx->p.window; 129 stream->rcv_nxt = stream->rcvvar->irs + 1; 130 stream->sndvar->cwnd = 1; 131 132 /* 133 * if buffer management is off, then disable 134 * monitoring tcp ring of either streams (only if stream 135 * is just monitor stream active) 136 */ 137 if (IS_STREAM_TYPE(stream, MOS_SOCK_MONITOR_STREAM_ACTIVE)) { 138 assert(IS_STREAM_TYPE(stream->pair_stream, 139 MOS_SOCK_MONITOR_STREAM_ACTIVE)); 140 141 stream->buffer_mgmt = FALSE; 142 stream->pair_stream->buffer_mgmt = FALSE; 143 144 /* 145 * if there is even a single monitor asking for 146 * buffer management, enable it (that's why the 147 * need for the loop) 148 */ 149 uint8_t bm; 150 stream->status_mgmt = 0; 151 SOCKQ_FOREACH_START(walk, &stream->msocks) { 152 bm = walk->monitor_stream->monitor_listener->server_buf_mgmt; 153 if (bm > stream->buffer_mgmt) { 154 stream->buffer_mgmt = bm; 155 } 156 if (walk->monitor_stream->monitor_listener->server_mon == 1) { 157 stream->status_mgmt = 1; 158 } 159 } SOCKQ_FOREACH_END; 160 161 stream->pair_stream->status_mgmt = 0; 162 SOCKQ_FOREACH_START(walk, &stream->pair_stream->msocks) { 163 bm = walk->monitor_stream->monitor_listener->client_buf_mgmt; 164 if (bm > stream->pair_stream->buffer_mgmt) { 165 stream->pair_stream->buffer_mgmt = bm; 166 } 167 if (walk->monitor_stream->monitor_listener->client_mon == 1) { 168 stream->pair_stream->status_mgmt = 1; 169 } 170 } SOCKQ_FOREACH_END; 171 } 172 173 ParseTCPOptions(stream, pctx->p.cur_ts, 174 (uint8_t *)pctx->p.tcph + TCP_HEADER_LEN, 175 (pctx->p.tcph->doff << 2) - TCP_HEADER_LEN); 176 177 return stream; 178 } 179 /*----------------------------------------------------------------------------*/ 180 static inline struct tcp_stream * 181 FindStream(mtcp_manager_t mtcp, struct pkt_ctx *pctx, unsigned int *hash) 182 { 183 struct tcp_stream temp_stream; 184 185 temp_stream.saddr = pctx->p.iph->daddr; 186 temp_stream.sport = pctx->p.tcph->dest; 187 temp_stream.daddr = pctx->p.iph->saddr; 188 temp_stream.dport = pctx->p.tcph->source; 189 190 return HTSearch(mtcp->tcp_flow_table, &temp_stream, hash); 191 } 192 /*----------------------------------------------------------------------------*/ 193 /* Create new flow for new packet or return NULL */ 194 /*----------------------------------------------------------------------------*/ 195 static inline struct tcp_stream * 196 CreateStream(mtcp_manager_t mtcp, struct pkt_ctx *pctx, unsigned int *hash) 197 { 198 tcp_stream *cur_stream = NULL; 199 uint32_t stream_type; 200 const struct iphdr *iph = pctx->p.iph; 201 const struct tcphdr* tcph = pctx->p.tcph; 202 203 if (tcph->syn && !tcph->ack) { 204 /* handle the SYN */ 205 206 stream_type = DetectStreamType(mtcp, pctx, iph->daddr, tcph->dest); 207 if (!stream_type) { 208 TRACE_DBG("Refusing SYN packet.\n"); 209 #ifdef DBGMSG 210 DumpIPPacket(mtcp, iph, pctx->p.ip_len); 211 #endif 212 return NULL; 213 } 214 215 /* if it is accepting connections only */ 216 if (stream_type == STREAM_TYPE(MOS_SOCK_STREAM)) { 217 cur_stream = CreateServerStream(mtcp, stream_type, pctx); 218 if (!cur_stream) { 219 TRACE_DBG("No available space in flow pool.\n"); 220 #ifdef DBGMSG 221 DumpIPPacket(mtcp, iph, pctx->p.ip_len); 222 #endif 223 } 224 } else if (stream_type & STREAM_TYPE(MOS_SOCK_MONITOR_STREAM_ACTIVE)) { 225 /* 226 * create both monitoring streams, and accept 227 * connection if it is set in embedded environment 228 */ 229 #if 1 230 cur_stream = CreateClientTCPStream(mtcp, NULL, stream_type, 231 pctx->p.iph->saddr, pctx->p.tcph->source, 232 pctx->p.iph->daddr, pctx->p.tcph->dest, 233 hash); 234 #else 235 cur_stream = CreateMonitorStream(mtcp, pctx, stream_type, hash); 236 #endif 237 if (!cur_stream) { 238 TRACE_DBG("No available space in flow pool.\n"); 239 #ifdef DBGMSG 240 DumpIPPacket(mtcp, iph, pctx->p.ip_len); 241 #endif 242 } 243 } else { 244 /* invalid stream type! */ 245 } 246 247 return cur_stream; 248 249 } else { 250 TRACE_DBG("Weird packet comes.\n"); 251 #ifdef DBGMSG 252 DumpIPPacket(mtcp, iph, pctx->p.ip_len); 253 #endif 254 return NULL; 255 } 256 } 257 /*----------------------------------------------------------------------------*/ 258 inline void 259 FillPacketContextTCPInfo(struct pkt_ctx *pctx, struct tcphdr * tcph) 260 { 261 pctx->p.tcph = tcph; 262 pctx->p.payload = (uint8_t *)tcph + (tcph->doff << 2); 263 pctx->p.payloadlen = pctx->p.ip_len - (pctx->p.payload - (u_char *)pctx->p.iph); 264 pctx->p.seq = ntohl(tcph->seq); 265 pctx->p.ack_seq = ntohl(tcph->ack_seq); 266 pctx->p.window = ntohs(tcph->window); 267 pctx->p.offset = 0; 268 269 return ; 270 } 271 /*----------------------------------------------------------------------------*/ 272 /** 273 * Called for every incoming packet from the NIC (when monitoring is disabled) 274 */ 275 static void 276 HandleSockStream(mtcp_manager_t mtcp, struct tcp_stream *cur_stream, 277 struct pkt_ctx *pctx) 278 { 279 UpdateRecvTCPContext(mtcp, cur_stream, pctx); 280 DoActionEndTCPPacket(mtcp, cur_stream, pctx); 281 } 282 /*----------------------------------------------------------------------------*/ 283 void 284 UpdateMonitor(mtcp_manager_t mtcp, struct tcp_stream *sendside_stream, 285 struct tcp_stream *recvside_stream, struct pkt_ctx *pctx, 286 bool is_pkt_reception) 287 { 288 struct socket_map *walk; 289 290 assert(pctx); 291 292 /* clone sendside_stream even if sender is disabled */ 293 ClonePacketCtx(&sendside_stream->last_pctx.p, 294 sendside_stream->last_pkt_data, pctx); 295 296 /* update send stream context first */ 297 if (sendside_stream->status_mgmt) { 298 sendside_stream->cb_events = MOS_ON_PKT_IN; 299 300 if (is_pkt_reception) 301 UpdatePassiveSendTCPContext(mtcp, sendside_stream, pctx); 302 303 sendside_stream->allow_pkt_modification = true; 304 /* POST hook of sender */ 305 SOCKQ_FOREACH_START(walk, &sendside_stream->msocks) { 306 HandleCallback(mtcp, MOS_HK_SND, walk, sendside_stream->side, 307 pctx, sendside_stream->cb_events); 308 } SOCKQ_FOREACH_END; 309 sendside_stream->allow_pkt_modification = false; 310 } 311 312 /* Attach Server-side stream */ 313 if (recvside_stream == NULL) { 314 assert(sendside_stream->side == MOS_SIDE_CLI); 315 if ((recvside_stream = AttachServerTCPStream(mtcp, sendside_stream, 0, 316 pctx->p.iph->saddr, pctx->p.tcph->source, 317 pctx->p.iph->daddr, pctx->p.tcph->dest)) == NULL) { 318 DestroyTCPStream(mtcp, sendside_stream); 319 return; 320 } 321 /* update recv context */ 322 recvside_stream->rcvvar->irs = pctx->p.seq; 323 recvside_stream->sndvar->peer_wnd = pctx->p.window; 324 recvside_stream->rcv_nxt = recvside_stream->rcvvar->irs + 1; 325 recvside_stream->sndvar->cwnd = 1; 326 327 ParseTCPOptions(recvside_stream, pctx->p.cur_ts, 328 (uint8_t *)pctx->p.tcph + TCP_HEADER_LEN, 329 (pctx->p.tcph->doff << 2) - TCP_HEADER_LEN); 330 } 331 332 /* Perform post-send tcp activities */ 333 PostSendTCPAction(mtcp, pctx, recvside_stream, sendside_stream); 334 335 if (/*1*/recvside_stream->status_mgmt) { 336 recvside_stream->cb_events = MOS_ON_PKT_IN; 337 338 /* Predict events which may be raised prior to performing TCP processing */ 339 PreRecvTCPEventPrediction(mtcp, pctx, recvside_stream); 340 341 /* retransmitted packet should avoid event simulation */ 342 //if ((recvside_stream->cb_events & MOS_ON_REXMIT) == 0) 343 /* update receive stream context (recv_side stream) */ 344 if (is_pkt_reception) 345 UpdateRecvTCPContext(mtcp, recvside_stream, pctx); 346 else 347 UpdatePassiveRecvTCPContext(mtcp, recvside_stream, pctx); 348 349 /* POST hook of receiver */ 350 SOCKQ_FOREACH_START(walk, &recvside_stream->msocks) { 351 HandleCallback(mtcp, MOS_HK_RCV, walk, recvside_stream->side, 352 pctx, recvside_stream->cb_events); 353 } SOCKQ_FOREACH_END; 354 } 355 356 /* reset callback events counter */ 357 recvside_stream->cb_events = 0; 358 sendside_stream->cb_events = 0; 359 } 360 /*----------------------------------------------------------------------------*/ 361 static void 362 HandleMonitorStream(mtcp_manager_t mtcp, struct tcp_stream *sendside_stream, 363 struct tcp_stream *recvside_stream, struct pkt_ctx *pctx) 364 { 365 UpdateMonitor(mtcp, sendside_stream, recvside_stream, pctx, true); 366 367 recvside_stream = sendside_stream->pair_stream; 368 369 if (HAS_STREAM_TYPE(recvside_stream, MOS_SOCK_STREAM)) { 370 DoActionEndTCPPacket(mtcp, recvside_stream, pctx); 371 } else { 372 /* forward packets */ 373 if (pctx->forward) 374 ForwardIPPacket(mtcp, pctx); 375 376 if (recvside_stream->stream_type == sendside_stream->stream_type && 377 IS_STREAM_TYPE(recvside_stream, MOS_SOCK_MONITOR_STREAM_ACTIVE)) { 378 if (((recvside_stream->state == TCP_ST_TIME_WAIT && 379 g_config.mos->tcp_tw_interval == 0) || 380 recvside_stream->state == TCP_ST_CLOSED_RSVD || 381 !recvside_stream->status_mgmt) && 382 ((sendside_stream->state == TCP_ST_TIME_WAIT && 383 g_config.mos->tcp_tw_interval == 0) || 384 sendside_stream->state == TCP_ST_CLOSED_RSVD || 385 !sendside_stream->status_mgmt)) 386 387 DestroyTCPStream(mtcp, recvside_stream); 388 } 389 } 390 } 391 /*----------------------------------------------------------------------------*/ 392 int 393 ProcessInTCPPacket(mtcp_manager_t mtcp, struct pkt_ctx *pctx) 394 { 395 uint64_t events = 0; 396 struct tcp_stream *cur_stream; 397 struct iphdr* iph; 398 struct tcphdr* tcph; 399 struct mon_listener *walk; 400 unsigned int hash = 0; 401 402 iph = pctx->p.iph; 403 tcph = (struct tcphdr *)((u_char *)pctx->p.iph + (pctx->p.iph->ihl << 2)); 404 405 FillPacketContextTCPInfo(pctx, tcph); 406 407 /* callback for monitor raw socket */ 408 TAILQ_FOREACH(walk, &mtcp->monitors, link) 409 if (walk->socket->socktype == MOS_SOCK_MONITOR_RAW) 410 HandleCallback(mtcp, MOS_NULL, walk->socket, MOS_SIDE_BOTH, 411 pctx, MOS_ON_PKT_IN); 412 413 if (pctx->p.ip_len < ((iph->ihl + tcph->doff) << 2)) 414 return ERROR; 415 416 #if VERIFY_RX_CHECKSUM 417 if (TCPCalcChecksum((uint16_t *)pctx->p.tcph, 418 (tcph->doff << 2) + pctx->p.payloadlen, 419 iph->saddr, pctx->p.iph->daddr)) { 420 TRACE_DBG("Checksum Error: Original: 0x%04x, calculated: 0x%04x\n", 421 tcph->check, TCPCalcChecksum((uint16_t *)tcph, 422 (tcph->doff << 2) + pctx->p.payloadlen, 423 iph->saddr, iph->daddr)); 424 if (pctx->forward && mtcp->num_msp) 425 ForwardIPPacket(mtcp, pctx); 426 return ERROR; 427 } 428 #endif 429 events |= MOS_ON_PKT_IN; 430 431 /* Check whether a packet is belong to any stream */ 432 cur_stream = FindStream(mtcp, pctx, &hash); 433 if (!cur_stream) { 434 /* 435 * No need to create stream for monitor. 436 * But do create 1 for client case! 437 */ 438 if (mtcp->listener == NULL && mtcp->num_msp == 0) { 439 //if (pctx->forward) 440 // ForwardIPPacket(mtcp, pctx); 441 return TRUE; 442 } 443 /* Create new flow for new packet or return NULL */ 444 cur_stream = CreateStream(mtcp, pctx, &hash); 445 if (!cur_stream) 446 events = MOS_ON_ORPHAN; 447 } 448 449 if (cur_stream) { 450 cur_stream->cb_events = events; 451 452 if (cur_stream->rcvvar && cur_stream->rcvvar->rcvbuf) 453 pctx->p.offset = (uint64_t)seq2loff(cur_stream->rcvvar->rcvbuf, 454 pctx->p.seq, cur_stream->rcvvar->irs + 1); 455 456 if (IS_STREAM_TYPE(cur_stream, MOS_SOCK_STREAM)) 457 HandleSockStream(mtcp, cur_stream, pctx); 458 459 else if (HAS_STREAM_TYPE(cur_stream, MOS_SOCK_MONITOR_STREAM_ACTIVE)) 460 HandleMonitorStream(mtcp, cur_stream, cur_stream->pair_stream, pctx); 461 else 462 assert(0); 463 } else { 464 struct mon_listener *walk; 465 struct sfbpf_program fcode; 466 /* 467 * event callback for pkt_no_conn; MOS_SIDE_BOTH 468 * means that we can't judge sides here 469 */ 470 TAILQ_FOREACH(walk, &mtcp->monitors, link) { 471 /* mtcp_bind_monitor_filter() 472 * - apply stream orphan filter to every pkt before raising ORPHAN event */ 473 fcode = walk->stream_orphan_fcode; 474 if (!(ISSET_BPFFILTER(fcode) && pctx && 475 EVAL_BPFFILTER(fcode, (uint8_t *)pctx->p.iph - sizeof(struct ethhdr), 476 pctx->p.ip_len + sizeof(struct ethhdr)) == 0)) { 477 HandleCallback(mtcp, MOS_NULL, walk->socket, MOS_SIDE_BOTH, 478 pctx, events); 479 } 480 } 481 if (mtcp->listener) { 482 /* RFC 793 (page 65) says 483 "An incoming segment containing a RST is discarded." 484 if the TCP state is CLOSED (= TCP stream does not exist). */ 485 if (!tcph->rst) 486 /* Send RST if it is run as EndTCP only mode */ 487 SendTCPPacketStandalone(mtcp, 488 iph->daddr, tcph->dest, iph->saddr, tcph->source, 489 0, pctx->p.seq + pctx->p.payloadlen + 1, 0, TCP_FLAG_RST | TCP_FLAG_ACK, 490 NULL, 0, pctx->p.cur_ts, 0, 0); 491 } else if (pctx->forward) { 492 /* Do forward or drop if it run as Monitor only mode */ 493 ForwardIPPacket(mtcp, pctx); 494 } 495 } 496 497 return TRUE; 498 } 499 /*----------------------------------------------------------------------------*/ 500