1 #include <assert.h>
2 #include <string.h>
3
4 #include "mtcp.h"
5 #include "arp.h"
6 #include "socket.h"
7 #include "eth_out.h"
8 #include "ip_out.h"
9 #include "mos_api.h"
10 #include "tcp_util.h"
11 #include "tcp_in.h"
12 #include "tcp_out.h"
13 #include "tcp_ring_buffer.h"
14 #include "eventpoll.h"
15 #include "debug.h"
16 #include "timer.h"
17 #include "ip_in.h"
18 #include "config.h"
19
20 extern struct pkt_info *
21 ClonePacketCtx(struct pkt_info *to, unsigned char *frame, struct pkt_info *from);
22
23 #define VERIFY_RX_CHECKSUM TRUE
24 /*----------------------------------------------------------------------------*/
25 static inline uint32_t
DetectStreamType(mtcp_manager_t mtcp,struct pkt_ctx * pctx,uint32_t ip,uint16_t port)26 DetectStreamType(mtcp_manager_t mtcp, struct pkt_ctx *pctx,
27 uint32_t ip, uint16_t port)
28 {
29 /* To Do: We will extend this filter to check listeners for proxy as well */
30 struct sockaddr_in *addr;
31 int rc, cnt_match, socktype;
32 struct mon_listener *walk;
33 struct sfbpf_program fcode;
34
35 cnt_match = 0;
36 rc = 0;
37
38 if (mtcp->num_msp > 0) {
39 /* mtcp_bind_monitor_filter()
40 * - create MonitorTCPStream only when the filter of any of the existing
41 * passive sockets match the incoming flow */
42 TAILQ_FOREACH(walk, &mtcp->monitors, link) {
43 /* For every passive monitor sockets, */
44 socktype = walk->socket->socktype;
45 if (socktype != MOS_SOCK_MONITOR_STREAM)
46 continue; // XXX: can this happen??
47
48 /* if pctx hits the filter rule, handle the passive monitor socket */
49 fcode = walk->stream_syn_fcode;
50 if (!(ISSET_BPFFILTER(fcode) && pctx &&
51 EVAL_BPFFILTER(fcode, (uint8_t *)pctx->p.iph - sizeof(struct ethhdr),
52 pctx->p.ip_len + sizeof(struct ethhdr)) == 0)) {
53 walk->is_stream_syn_filter_hit = 1;// set the 'filter hit' flag to 1
54 cnt_match++; // count the number of matched sockets
55 }
56 }
57
58 /* if there's any passive monitoring socket whose filter is hit,
59 we should create monitor stream */
60 if (cnt_match > 0)
61 rc = STREAM_TYPE(MOS_SOCK_MONITOR_STREAM_ACTIVE);
62 }
63
64 if (mtcp->listener) {
65 /* Detect end TCP stack mode */
66 addr = &mtcp->listener->socket->saddr;
67 if (addr->sin_port == port) {
68 if (addr->sin_addr.s_addr != INADDR_ANY) {
69 if (ip == addr->sin_addr.s_addr) {
70 rc |= STREAM_TYPE(MOS_SOCK_STREAM);
71 }
72 } else {
73 int i;
74
75 for (i = 0; i < g_config.mos->netdev_table->num; i++) {
76 if (ip == g_config.mos->netdev_table->ent[i]->ip_addr) {
77 rc |= STREAM_TYPE(MOS_SOCK_STREAM);
78 }
79 }
80 }
81 }
82 }
83
84 return rc;
85 }
86 /*----------------------------------------------------------------------------*/
87 static inline tcp_stream *
CreateServerStream(mtcp_manager_t mtcp,int type,struct pkt_ctx * pctx)88 CreateServerStream(mtcp_manager_t mtcp, int type, struct pkt_ctx *pctx)
89 {
90 tcp_stream *cur_stream = NULL;
91
92 /* create new stream and add to flow hash table */
93 cur_stream = CreateTCPStream(mtcp, NULL, type,
94 pctx->p.iph->daddr, pctx->p.tcph->dest,
95 pctx->p.iph->saddr, pctx->p.tcph->source, NULL);
96 if (!cur_stream) {
97 TRACE_ERROR("INFO: Could not allocate tcp_stream!\n");
98 return FALSE;
99 }
100
101 cur_stream->rcvvar->irs = pctx->p.seq;
102 cur_stream->sndvar->peer_wnd = pctx->p.window;
103 cur_stream->rcv_nxt = cur_stream->rcvvar->irs;
104 cur_stream->sndvar->cwnd = 1;
105 ParseTCPOptions(cur_stream, pctx->p.cur_ts, (uint8_t *)pctx->p.tcph +
106 TCP_HEADER_LEN, (pctx->p.tcph->doff << 2) - TCP_HEADER_LEN);
107
108 return cur_stream;
109 }
110 /*----------------------------------------------------------------------------*/
111 static inline tcp_stream *
CreateMonitorStream(mtcp_manager_t mtcp,struct pkt_ctx * pctx,uint32_t stream_type,unsigned int * hash)112 CreateMonitorStream(mtcp_manager_t mtcp, struct pkt_ctx* pctx,
113 uint32_t stream_type, unsigned int *hash)
114 {
115 tcp_stream *stream = NULL;
116 struct socket_map *walk;
117 /* create a client stream context */
118 stream = CreateDualTCPStream(mtcp, NULL, stream_type, pctx->p.iph->daddr,
119 pctx->p.tcph->dest, pctx->p.iph->saddr,
120 pctx->p.tcph->source, NULL);
121 if (!stream)
122 return FALSE;
123
124 stream->side = MOS_SIDE_CLI;
125 stream->pair_stream->side = MOS_SIDE_SVR;
126 /* update recv context */
127 stream->rcvvar->irs = pctx->p.seq;
128 stream->sndvar->peer_wnd = pctx->p.window;
129 stream->rcv_nxt = stream->rcvvar->irs + 1;
130 stream->sndvar->cwnd = 1;
131
132 /*
133 * if buffer management is off, then disable
134 * monitoring tcp ring of either streams (only if stream
135 * is just monitor stream active)
136 */
137 if (IS_STREAM_TYPE(stream, MOS_SOCK_MONITOR_STREAM_ACTIVE)) {
138 assert(IS_STREAM_TYPE(stream->pair_stream,
139 MOS_SOCK_MONITOR_STREAM_ACTIVE));
140
141 stream->buffer_mgmt = FALSE;
142 stream->pair_stream->buffer_mgmt = FALSE;
143
144 /*
145 * if there is even a single monitor asking for
146 * buffer management, enable it (that's why the
147 * need for the loop)
148 */
149 uint8_t bm;
150 stream->status_mgmt = 0;
151 SOCKQ_FOREACH_START(walk, &stream->msocks) {
152 bm = walk->monitor_stream->monitor_listener->server_buf_mgmt;
153 if (bm > stream->buffer_mgmt) {
154 stream->buffer_mgmt = bm;
155 }
156 if (walk->monitor_stream->monitor_listener->server_mon == 1) {
157 stream->status_mgmt = 1;
158 }
159 } SOCKQ_FOREACH_END;
160
161 stream->pair_stream->status_mgmt = 0;
162 SOCKQ_FOREACH_START(walk, &stream->pair_stream->msocks) {
163 bm = walk->monitor_stream->monitor_listener->client_buf_mgmt;
164 if (bm > stream->pair_stream->buffer_mgmt) {
165 stream->pair_stream->buffer_mgmt = bm;
166 }
167 if (walk->monitor_stream->monitor_listener->client_mon == 1) {
168 stream->pair_stream->status_mgmt = 1;
169 }
170 } SOCKQ_FOREACH_END;
171 }
172
173 ParseTCPOptions(stream, pctx->p.cur_ts,
174 (uint8_t *)pctx->p.tcph + TCP_HEADER_LEN,
175 (pctx->p.tcph->doff << 2) - TCP_HEADER_LEN);
176
177 return stream;
178 }
179 /*----------------------------------------------------------------------------*/
180 static inline struct tcp_stream *
FindStream(mtcp_manager_t mtcp,struct pkt_ctx * pctx,unsigned int * hash)181 FindStream(mtcp_manager_t mtcp, struct pkt_ctx *pctx, unsigned int *hash)
182 {
183 struct tcp_stream temp_stream;
184
185 temp_stream.saddr = pctx->p.iph->daddr;
186 temp_stream.sport = pctx->p.tcph->dest;
187 temp_stream.daddr = pctx->p.iph->saddr;
188 temp_stream.dport = pctx->p.tcph->source;
189
190 return HTSearch(mtcp->tcp_flow_table, &temp_stream, hash);
191 }
192 /*----------------------------------------------------------------------------*/
193 /* Create new flow for new packet or return NULL */
194 /*----------------------------------------------------------------------------*/
195 static inline struct tcp_stream *
CreateStream(mtcp_manager_t mtcp,struct pkt_ctx * pctx,unsigned int * hash)196 CreateStream(mtcp_manager_t mtcp, struct pkt_ctx *pctx, unsigned int *hash)
197 {
198 tcp_stream *cur_stream = NULL;
199 uint32_t stream_type;
200 const struct iphdr *iph = pctx->p.iph;
201 const struct tcphdr* tcph = pctx->p.tcph;
202
203 if (tcph->syn && !tcph->ack) {
204 /* handle the SYN */
205
206 stream_type = DetectStreamType(mtcp, pctx, iph->daddr, tcph->dest);
207 if (!stream_type) {
208 TRACE_DBG("Refusing SYN packet.\n");
209 #ifdef DBGMSG
210 DumpIPPacket(mtcp, iph, pctx->p.ip_len);
211 #endif
212 return NULL;
213 }
214
215 /* if it is accepting connections only */
216 if (stream_type == STREAM_TYPE(MOS_SOCK_STREAM)) {
217 cur_stream = CreateServerStream(mtcp, stream_type, pctx);
218 if (!cur_stream) {
219 TRACE_DBG("No available space in flow pool.\n");
220 #ifdef DBGMSG
221 DumpIPPacket(mtcp, iph, pctx->p.ip_len);
222 #endif
223 }
224 } else if (stream_type & STREAM_TYPE(MOS_SOCK_MONITOR_STREAM_ACTIVE)) {
225 /*
226 * create both monitoring streams, and accept
227 * connection if it is set in embedded environment
228 */
229 #if 1
230 cur_stream = CreateClientTCPStream(mtcp, NULL, stream_type,
231 pctx->p.iph->saddr, pctx->p.tcph->source,
232 pctx->p.iph->daddr, pctx->p.tcph->dest,
233 hash);
234 #else
235 cur_stream = CreateMonitorStream(mtcp, pctx, stream_type, hash);
236 #endif
237 if (!cur_stream) {
238 TRACE_DBG("No available space in flow pool.\n");
239 #ifdef DBGMSG
240 DumpIPPacket(mtcp, iph, pctx->p.ip_len);
241 #endif
242 }
243 } else {
244 /* invalid stream type! */
245 }
246
247 return cur_stream;
248
249 } else {
250 TRACE_DBG("Weird packet comes.\n");
251 #ifdef DBGMSG
252 DumpIPPacket(mtcp, iph, pctx->p.ip_len);
253 #endif
254 return NULL;
255 }
256 }
257 /*----------------------------------------------------------------------------*/
258 inline void
FillPacketContextTCPInfo(struct pkt_ctx * pctx,struct tcphdr * tcph)259 FillPacketContextTCPInfo(struct pkt_ctx *pctx, struct tcphdr * tcph)
260 {
261 pctx->p.tcph = tcph;
262 pctx->p.payload = (uint8_t *)tcph + (tcph->doff << 2);
263 pctx->p.payloadlen = pctx->p.ip_len - (pctx->p.payload - (u_char *)pctx->p.iph);
264 pctx->p.seq = ntohl(tcph->seq);
265 pctx->p.ack_seq = ntohl(tcph->ack_seq);
266 pctx->p.window = ntohs(tcph->window);
267 pctx->p.offset = 0;
268
269 return ;
270 }
271 /*----------------------------------------------------------------------------*/
272 /**
273 * Called for every incoming packet from the NIC (when monitoring is disabled)
274 */
275 static void
HandleSockStream(mtcp_manager_t mtcp,struct tcp_stream * cur_stream,struct pkt_ctx * pctx)276 HandleSockStream(mtcp_manager_t mtcp, struct tcp_stream *cur_stream,
277 struct pkt_ctx *pctx)
278 {
279 UpdateRecvTCPContext(mtcp, cur_stream, pctx);
280 DoActionEndTCPPacket(mtcp, cur_stream, pctx);
281 }
282 /*----------------------------------------------------------------------------*/
283 void
UpdateMonitor(mtcp_manager_t mtcp,struct tcp_stream * sendside_stream,struct tcp_stream * recvside_stream,struct pkt_ctx * pctx,bool is_pkt_reception)284 UpdateMonitor(mtcp_manager_t mtcp, struct tcp_stream *sendside_stream,
285 struct tcp_stream *recvside_stream, struct pkt_ctx *pctx,
286 bool is_pkt_reception)
287 {
288 struct socket_map *walk;
289
290 assert(pctx);
291
292 #ifdef RECORDPKT_PER_STREAM
293 /* clone sendside_stream even if sender is disabled */
294 ClonePacketCtx(&sendside_stream->last_pctx.p,
295 sendside_stream->last_pkt_data, &(pctx.p));
296 #endif
297
298 /* update send stream context first */
299 if (sendside_stream->status_mgmt) {
300 sendside_stream->cb_events = MOS_ON_PKT_IN;
301
302 if (is_pkt_reception)
303 UpdatePassiveSendTCPContext(mtcp, sendside_stream, pctx);
304
305 sendside_stream->allow_pkt_modification = true;
306 /* POST hook of sender */
307 if (sendside_stream->side == MOS_SIDE_CLI) {
308 SOCKQ_FOREACH_START(walk, &sendside_stream->msocks) {
309 HandleCallback(mtcp, MOS_HK_SND, walk, sendside_stream->side,
310 pctx, sendside_stream->cb_events);
311 } SOCKQ_FOREACH_END;
312 } else { /* sendside_stream->side == MOS_SIDE_SVR */
313 SOCKQ_FOREACH_REVERSE(walk, &sendside_stream->msocks) {
314 HandleCallback(mtcp, MOS_HK_SND, walk, sendside_stream->side,
315 pctx, sendside_stream->cb_events);
316 } SOCKQ_FOREACH_END;
317 }
318 sendside_stream->allow_pkt_modification = false;
319 }
320
321 /* Attach Server-side stream */
322 if (recvside_stream == NULL) {
323 assert(sendside_stream->side == MOS_SIDE_CLI);
324 if ((recvside_stream = AttachServerTCPStream(mtcp, sendside_stream, 0,
325 pctx->p.iph->saddr, pctx->p.tcph->source,
326 pctx->p.iph->daddr, pctx->p.tcph->dest)) == NULL) {
327 DestroyTCPStream(mtcp, sendside_stream);
328 return;
329 }
330 /* update recv context */
331 recvside_stream->rcvvar->irs = pctx->p.seq;
332 recvside_stream->sndvar->peer_wnd = pctx->p.window;
333 recvside_stream->rcv_nxt = recvside_stream->rcvvar->irs + 1;
334 recvside_stream->sndvar->cwnd = 1;
335
336 ParseTCPOptions(recvside_stream, pctx->p.cur_ts,
337 (uint8_t *)pctx->p.tcph + TCP_HEADER_LEN,
338 (pctx->p.tcph->doff << 2) - TCP_HEADER_LEN);
339 }
340
341 /* Perform post-send tcp activities */
342 PostSendTCPAction(mtcp, pctx, recvside_stream, sendside_stream);
343
344 if (/*1*/recvside_stream->status_mgmt) {
345 recvside_stream->cb_events = MOS_ON_PKT_IN;
346
347 /* Predict events which may be raised prior to performing TCP processing */
348 PreRecvTCPEventPrediction(mtcp, pctx, recvside_stream);
349
350 /* retransmitted packet should avoid event simulation */
351 //if ((recvside_stream->cb_events & MOS_ON_REXMIT) == 0)
352 /* update receive stream context (recv_side stream) */
353 if (is_pkt_reception)
354 UpdateRecvTCPContext(mtcp, recvside_stream, pctx);
355 else
356 UpdatePassiveRecvTCPContext(mtcp, recvside_stream, pctx);
357
358 /* POST hook of receiver */
359 if (recvside_stream->side == MOS_SIDE_CLI) {
360 SOCKQ_FOREACH_REVERSE(walk, &recvside_stream->msocks) {
361 HandleCallback(mtcp, MOS_HK_RCV, walk, recvside_stream->side,
362 pctx, recvside_stream->cb_events);
363 } SOCKQ_FOREACH_END;
364 } else { /* recvside_stream->side == MOS_SIDE_SVR */
365 SOCKQ_FOREACH_START(walk, &recvside_stream->msocks) {
366 HandleCallback(mtcp, MOS_HK_RCV, walk, recvside_stream->side,
367 pctx, recvside_stream->cb_events);
368 } SOCKQ_FOREACH_END;
369 }
370 }
371
372 /* reset callback events counter */
373 recvside_stream->cb_events = 0;
374 sendside_stream->cb_events = 0;
375 }
376 /*----------------------------------------------------------------------------*/
377 static void
HandleMonitorStream(mtcp_manager_t mtcp,struct tcp_stream * sendside_stream,struct tcp_stream * recvside_stream,struct pkt_ctx * pctx)378 HandleMonitorStream(mtcp_manager_t mtcp, struct tcp_stream *sendside_stream,
379 struct tcp_stream *recvside_stream, struct pkt_ctx *pctx)
380 {
381 UpdateMonitor(mtcp, sendside_stream, recvside_stream, pctx, true);
382
383 recvside_stream = sendside_stream->pair_stream;
384
385 if (HAS_STREAM_TYPE(recvside_stream, MOS_SOCK_STREAM)) {
386 DoActionEndTCPPacket(mtcp, recvside_stream, pctx);
387 } else {
388 /* forward packets */
389 if (pctx->forward)
390 ForwardIPPacket(mtcp, pctx);
391
392 if (recvside_stream->stream_type == sendside_stream->stream_type &&
393 IS_STREAM_TYPE(recvside_stream, MOS_SOCK_MONITOR_STREAM_ACTIVE)) {
394 if (((recvside_stream->state == TCP_ST_TIME_WAIT &&
395 g_config.mos->tcp_tw_interval == 0) ||
396 recvside_stream->state == TCP_ST_CLOSED_RSVD ||
397 !recvside_stream->status_mgmt) &&
398 ((sendside_stream->state == TCP_ST_TIME_WAIT &&
399 g_config.mos->tcp_tw_interval == 0) ||
400 sendside_stream->state == TCP_ST_CLOSED_RSVD ||
401 !sendside_stream->status_mgmt))
402
403 DestroyTCPStream(mtcp, recvside_stream);
404 }
405 }
406 }
407 /*----------------------------------------------------------------------------*/
408 int
ProcessInTCPPacket(mtcp_manager_t mtcp,struct pkt_ctx * pctx)409 ProcessInTCPPacket(mtcp_manager_t mtcp, struct pkt_ctx *pctx)
410 {
411 uint64_t events = 0;
412 struct tcp_stream *cur_stream;
413 struct iphdr* iph;
414 struct tcphdr* tcph;
415 struct mon_listener *walk;
416 unsigned int hash = 0;
417
418 iph = pctx->p.iph;
419 tcph = (struct tcphdr *)((u_char *)pctx->p.iph + (pctx->p.iph->ihl << 2));
420
421 FillPacketContextTCPInfo(pctx, tcph);
422
423 /* callback for monitor raw socket */
424 TAILQ_FOREACH(walk, &mtcp->monitors, link)
425 if (walk->socket->socktype == MOS_SOCK_MONITOR_RAW)
426 HandleCallback(mtcp, MOS_NULL, walk->socket, MOS_SIDE_BOTH,
427 pctx, MOS_ON_PKT_IN);
428
429 if (pctx->p.ip_len < ((iph->ihl + tcph->doff) << 2))
430 return ERROR;
431
432 #if VERIFY_RX_CHECKSUM
433 if (TCPCalcChecksum((uint16_t *)pctx->p.tcph,
434 (tcph->doff << 2) + pctx->p.payloadlen,
435 iph->saddr, pctx->p.iph->daddr)) {
436 TRACE_DBG("Checksum Error: Original: 0x%04x, calculated: 0x%04x\n",
437 tcph->check, TCPCalcChecksum((uint16_t *)tcph,
438 (tcph->doff << 2) + pctx->p.payloadlen,
439 iph->saddr, iph->daddr));
440 if (pctx->forward && mtcp->num_msp)
441 ForwardIPPacket(mtcp, pctx);
442 return ERROR;
443 }
444 #endif
445 events |= MOS_ON_PKT_IN;
446
447 /* Check whether a packet is belong to any stream */
448 cur_stream = FindStream(mtcp, pctx, &hash);
449 if (!cur_stream) {
450 /*
451 * No need to create stream for monitor.
452 * But do create 1 for client case!
453 */
454 if (mtcp->listener == NULL && mtcp->num_msp == 0) {
455 //if (pctx->forward)
456 // ForwardIPPacket(mtcp, pctx);
457 return TRUE;
458 }
459 /* Create new flow for new packet or return NULL */
460 cur_stream = CreateStream(mtcp, pctx, &hash);
461 if (!cur_stream)
462 events = MOS_ON_ORPHAN;
463 }
464
465 if (cur_stream) {
466 cur_stream->cb_events = events;
467
468 if (cur_stream->rcvvar && cur_stream->rcvvar->rcvbuf)
469 pctx->p.offset = (uint64_t)seq2loff(cur_stream->rcvvar->rcvbuf,
470 pctx->p.seq, cur_stream->rcvvar->irs + 1);
471
472 if (IS_STREAM_TYPE(cur_stream, MOS_SOCK_STREAM))
473 HandleSockStream(mtcp, cur_stream, pctx);
474
475 else if (HAS_STREAM_TYPE(cur_stream, MOS_SOCK_MONITOR_STREAM_ACTIVE))
476 HandleMonitorStream(mtcp, cur_stream, cur_stream->pair_stream, pctx);
477 else
478 assert(0);
479 } else {
480 struct mon_listener *walk;
481 struct sfbpf_program fcode;
482 /*
483 * event callback for pkt_no_conn; MOS_SIDE_BOTH
484 * means that we can't judge sides here
485 */
486 TAILQ_FOREACH(walk, &mtcp->monitors, link) {
487 /* mtcp_bind_monitor_filter()
488 * - apply stream orphan filter to every pkt before raising ORPHAN event */
489 fcode = walk->stream_orphan_fcode;
490 if (!(ISSET_BPFFILTER(fcode) && pctx &&
491 EVAL_BPFFILTER(fcode, (uint8_t *)pctx->p.iph - sizeof(struct ethhdr),
492 pctx->p.ip_len + sizeof(struct ethhdr)) == 0)) {
493 HandleCallback(mtcp, MOS_NULL, walk->socket, MOS_SIDE_BOTH,
494 pctx, events);
495 }
496 }
497 if (mtcp->listener) {
498 /* RFC 793 (page 65) says
499 "An incoming segment containing a RST is discarded."
500 if the TCP state is CLOSED (= TCP stream does not exist). */
501 if (!tcph->rst)
502 /* Send RST if it is run as EndTCP only mode */
503 SendTCPPacketStandalone(mtcp,
504 iph->daddr, tcph->dest, iph->saddr, tcph->source,
505 0, pctx->p.seq + pctx->p.payloadlen + 1, 0,
506 TCP_FLAG_RST | TCP_FLAG_ACK,
507 NULL, 0, pctx->p.cur_ts, 0, 0, -1);
508 } else if (pctx->forward) {
509 /* Do forward or drop if it run as Monitor only mode */
510 ForwardIPPacket(mtcp, pctx);
511 }
512 }
513
514 return TRUE;
515 }
516 /*----------------------------------------------------------------------------*/
517