/* xref: /mOS-networking-stack/core/src/tcp.c (revision 76404edc) */
1 #include <assert.h>
2 #include <string.h>
3 
4 #include "mtcp.h"
5 #include "arp.h"
6 #include "socket.h"
7 #include "eth_out.h"
8 #include "ip_out.h"
9 #include "mos_api.h"
10 #include "tcp_util.h"
11 #include "tcp_in.h"
12 #include "tcp_out.h"
13 #include "tcp_ring_buffer.h"
14 #include "eventpoll.h"
15 #include "debug.h"
16 #include "timer.h"
17 #include "ip_in.h"
18 #include "config.h"
19 
20 extern struct pkt_info *
21 ClonePacketCtx(struct pkt_info *to, unsigned char *frame, struct pkt_ctx *from);
22 
23 #define VERIFY_RX_CHECKSUM	TRUE
24 /*----------------------------------------------------------------------------*/
25 static inline uint32_t
26 DetectStreamType(mtcp_manager_t mtcp, struct pkt_ctx *pctx,
27 				 uint32_t ip, uint16_t port)
28 {
29 	/* To Do: We will extend this filter to check listeners for proxy as well */
30 	struct sockaddr_in *addr;
31 	int rc, cnt_match, socktype;
32 	struct mon_listener *walk;
33 	struct sfbpf_program fcode;
34 
35 	cnt_match = 0;
36 	rc = 0;
37 
38 	if (mtcp->num_msp > 0) {
39 		/* mtcp_bind_monitor_filter()
40 		 * - create MonitorTCPStream only when the filter of any of the existing
41 		 *   passive sockets match the incoming flow */
42 		TAILQ_FOREACH(walk, &mtcp->monitors, link) {
43 			/* For every passive monitor sockets, */
44 			socktype = walk->socket->socktype;
45 			if (socktype != MOS_SOCK_MONITOR_STREAM)
46 				continue; // XXX: can this happen??
47 
48 			/* if pctx hits the filter rule, handle the passive monitor socket */
49 			fcode = walk->stream_syn_fcode;
50 			if (!(ISSET_BPFFILTER(fcode) && pctx &&
51 				EVAL_BPFFILTER(fcode, (uint8_t *)pctx->p.iph - sizeof(struct ethhdr),
52 							   pctx->p.ip_len + sizeof(struct ethhdr)) == 0)) {
53 				walk->is_stream_syn_filter_hit = 1;// set the 'filter hit' flag to 1
54 				cnt_match++; // count the number of matched sockets
55 			}
56 		}
57 
58 		/* if there's any passive monitoring socket whose filter is hit,
59 		   we should create monitor stream */
60 		if (cnt_match > 0)
61 			rc = STREAM_TYPE(MOS_SOCK_MONITOR_STREAM_ACTIVE);
62 	}
63 
64 	if (mtcp->listener) {
65 		/* Detect end TCP stack mode */
66 		addr = &mtcp->listener->socket->saddr;
67 		if (addr->sin_port == port) {
68 			if (addr->sin_addr.s_addr != INADDR_ANY) {
69 				if (ip == addr->sin_addr.s_addr) {
70 					rc |= STREAM_TYPE(MOS_SOCK_STREAM);
71 				}
72 			} else {
73 				int i;
74 
75 				for (i = 0; i < g_config.mos->netdev_table->num; i++) {
76 					if (ip == g_config.mos->netdev_table->ent[i]->ip_addr) {
77 						rc |= STREAM_TYPE(MOS_SOCK_STREAM);
78 					}
79 				}
80 			}
81 		}
82 	}
83 
84 	return rc;
85 }
86 /*----------------------------------------------------------------------------*/
87 static inline tcp_stream *
88 CreateServerStream(mtcp_manager_t mtcp, int type, struct pkt_ctx *pctx)
89 {
90 	tcp_stream *cur_stream = NULL;
91 
92 	/* create new stream and add to flow hash table */
93 	cur_stream = CreateTCPStream(mtcp, NULL, type,
94 			pctx->p.iph->daddr, pctx->p.tcph->dest,
95 			pctx->p.iph->saddr, pctx->p.tcph->source, NULL);
96 	if (!cur_stream) {
97 		TRACE_ERROR("INFO: Could not allocate tcp_stream!\n");
98 		return FALSE;
99 	}
100 
101 	cur_stream->rcvvar->irs = pctx->p.seq;
102 	cur_stream->sndvar->peer_wnd = pctx->p.window;
103 	cur_stream->rcv_nxt = cur_stream->rcvvar->irs;
104 	cur_stream->sndvar->cwnd = 1;
105 	ParseTCPOptions(cur_stream, pctx->p.cur_ts, (uint8_t *)pctx->p.tcph +
106 			TCP_HEADER_LEN, (pctx->p.tcph->doff << 2) - TCP_HEADER_LEN);
107 
108 	return cur_stream;
109 }
110 /*----------------------------------------------------------------------------*/
111 static inline tcp_stream *
112 CreateMonitorStream(mtcp_manager_t mtcp, struct pkt_ctx* pctx,
113 		    uint32_t stream_type, unsigned int *hash)
114 {
115 	tcp_stream *stream = NULL;
116 	struct socket_map *walk;
117 	/* create a client stream context */
118 	stream = CreateDualTCPStream(mtcp, NULL, stream_type, pctx->p.iph->daddr,
119 				     pctx->p.tcph->dest, pctx->p.iph->saddr,
120 				     pctx->p.tcph->source, NULL);
121 	if (!stream)
122 		return FALSE;
123 
124 	stream->side = MOS_SIDE_CLI;
125 	stream->pair_stream->side = MOS_SIDE_SVR;
126 	/* update recv context */
127 	stream->rcvvar->irs = pctx->p.seq;
128 	stream->sndvar->peer_wnd = pctx->p.window;
129 	stream->rcv_nxt = stream->rcvvar->irs + 1;
130 	stream->sndvar->cwnd = 1;
131 
132 	/*
133 	 * if buffer management is off, then disable
134 	 * monitoring tcp ring of either streams (only if stream
135 	 * is just monitor stream active)
136 	 */
137 	if (IS_STREAM_TYPE(stream, MOS_SOCK_MONITOR_STREAM_ACTIVE)) {
138 		assert(IS_STREAM_TYPE(stream->pair_stream,
139 				      MOS_SOCK_MONITOR_STREAM_ACTIVE));
140 
141 		stream->buffer_mgmt = FALSE;
142 		stream->pair_stream->buffer_mgmt = FALSE;
143 
144 		/*
145 		 * if there is even a single monitor asking for
146 		 * buffer management, enable it (that's why the
147 		 * need for the loop)
148 		 */
149 #ifdef NEWRB
150 		uint8_t bm;
151 		stream->status_mgmt = 0;
152 		SOCKQ_FOREACH_START(walk, &stream->msocks) {
153 			bm = walk->monitor_stream->monitor_listener->server_buf_mgmt;
154 			if (bm > stream->buffer_mgmt) {
155 				stream->buffer_mgmt = bm;
156 			}
157 			if (walk->monitor_stream->monitor_listener->server_mon == 1) {
158 				stream->status_mgmt = 1;
159 			}
160 		} SOCKQ_FOREACH_END;
161 
162 		stream->pair_stream->status_mgmt = 0;
163 		SOCKQ_FOREACH_START(walk, &stream->pair_stream->msocks) {
164 			bm = walk->monitor_stream->monitor_listener->client_buf_mgmt;
165 			if (bm > stream->pair_stream->buffer_mgmt) {
166 				stream->pair_stream->buffer_mgmt = bm;
167 			}
168 			if (walk->monitor_stream->monitor_listener->client_mon == 1) {
169 				stream->pair_stream->status_mgmt = 1;
170 			}
171 		} SOCKQ_FOREACH_END;
172 #else
173 		SOCKQ_FOREACH_START(walk, &stream->msocks) {
174 			if (walk->monitor_stream->monitor_listener->server_buf_mgmt) {
175 				stream->buffer_mgmt = TRUE;
176 				break;
177 			}
178 		} SOCKQ_FOREACH_END;
179 
180 		SOCKQ_FOREACH_START(walk, &stream->pair_stream->msocks) {
181 			if (walk->monitor_stream->monitor_listener->client_buf_mgmt) {
182 				stream->pair_stream->buffer_mgmt = TRUE;
183 				break;
184 			}
185 		} SOCKQ_FOREACH_END;
186 #endif
187 	}
188 
189 	ParseTCPOptions(stream, pctx->p.cur_ts,
190 			(uint8_t *)pctx->p.tcph + TCP_HEADER_LEN,
191 			(pctx->p.tcph->doff << 2) - TCP_HEADER_LEN);
192 
193 	return stream;
194 }
195 /*----------------------------------------------------------------------------*/
196 static inline struct tcp_stream *
197 FindStream(mtcp_manager_t mtcp, struct pkt_ctx *pctx, unsigned int *hash)
198 {
199 	struct tcp_stream temp_stream;
200 
201 	temp_stream.saddr = pctx->p.iph->daddr;
202 	temp_stream.sport = pctx->p.tcph->dest;
203 	temp_stream.daddr = pctx->p.iph->saddr;
204 	temp_stream.dport = pctx->p.tcph->source;
205 
206 	return HTSearch(mtcp->tcp_flow_table, &temp_stream, hash);
207 }
208 /*----------------------------------------------------------------------------*/
209 /* Create new flow for new packet or return NULL */
210 /*----------------------------------------------------------------------------*/
211 static inline struct tcp_stream *
212 CreateStream(mtcp_manager_t mtcp, struct pkt_ctx *pctx, unsigned int *hash)
213 {
214 	tcp_stream *cur_stream = NULL;
215 	uint32_t stream_type;
216 	const struct iphdr *iph = pctx->p.iph;
217 	const struct tcphdr* tcph = pctx->p.tcph;
218 
219 	if (tcph->syn && !tcph->ack) {
220 		/* handle the SYN */
221 
222 		stream_type = DetectStreamType(mtcp, pctx, iph->daddr, tcph->dest);
223 		if (!stream_type) {
224 			TRACE_DBG("Refusing SYN packet.\n");
225 #ifdef DBGMSG
226 			DumpIPPacket(mtcp, iph, pctx->p.ip_len);
227 #endif
228 			return NULL;
229 		}
230 
231 		/* if it is accepting connections only */
232 		if (stream_type == STREAM_TYPE(MOS_SOCK_STREAM)) {
233 			cur_stream = CreateServerStream(mtcp, stream_type, pctx);
234 			if (!cur_stream) {
235 				TRACE_DBG("No available space in flow pool.\n");
236 #ifdef DBGMSG
237 				DumpIPPacket(mtcp, iph, pctx->p.ip_len);
238 #endif
239 			}
240 		} else if (stream_type & STREAM_TYPE(MOS_SOCK_MONITOR_STREAM_ACTIVE)) {
241 			/*
242 			 * create both monitoring streams, and accept
243 			 * connection if it is set in embedded environment
244 			 */
245 #if 1
246 			cur_stream = CreateClientTCPStream(mtcp, NULL, stream_type,
247 									pctx->p.iph->saddr, pctx->p.tcph->source,
248 									pctx->p.iph->daddr, pctx->p.tcph->dest,
249 									hash);
250 #else
251 			cur_stream = CreateMonitorStream(mtcp, pctx, stream_type, hash);
252 #endif
253 			if (!cur_stream) {
254 				TRACE_DBG("No available space in flow pool.\n");
255 #ifdef DBGMSG
256 				DumpIPPacket(mtcp, iph, pctx->p.ip_len);
257 #endif
258 			}
259 		}  else {
260 			/* invalid stream type! */
261 		}
262 
263 		return cur_stream;
264 
265 	} else {
266 		TRACE_DBG("Weird packet comes.\n");
267 #ifdef DBGMSG
268 		DumpIPPacket(mtcp, iph, pctx->p.ip_len);
269 #endif
270 		return NULL;
271 	}
272 }
273 /*----------------------------------------------------------------------------*/
274 inline void
275 FillPacketContextTCPInfo(struct pkt_ctx *pctx, struct tcphdr * tcph)
276 {
277 	pctx->p.tcph = tcph;
278 	pctx->p.payload    = (uint8_t *)tcph + (tcph->doff << 2);
279 	pctx->p.payloadlen = pctx->p.ip_len - (pctx->p.payload - (u_char *)pctx->p.iph);
280 	pctx->p.seq = ntohl(tcph->seq);
281 	pctx->p.ack_seq = ntohl(tcph->ack_seq);
282 	pctx->p.window = ntohs(tcph->window);
283 #ifdef NEWPPEEK
284 	pctx->p.offset = 0;
285 #endif
286 
287 	return ;
288 }
289 /*----------------------------------------------------------------------------*/
/**
 * Process a received packet on a stream whose type is exactly
 * MOS_SOCK_STREAM.  ProcessInTCPPacket() routes a stream here when
 * IS_STREAM_TYPE(cur_stream, MOS_SOCK_STREAM) holds.  That covers plain
 * end-host streams even while monitor sockets exist elsewhere.
 * NOTE(review): presumably no monitor is attached to such a stream; confirm
 * the IS_STREAM_TYPE vs HAS_STREAM_TYPE semantics.
 *
 * Passes the packet to UpdateRecvTCPContext(), then calls
 * DoActionEndTCPPacket() on the same stream.
 */
static void
HandleSockStream(mtcp_manager_t mtcp, struct tcp_stream *cur_stream,
				struct pkt_ctx *pctx)
{
	UpdateRecvTCPContext(mtcp, cur_stream, pctx);
	DoActionEndTCPPacket(mtcp, cur_stream, pctx);
}
300 /*----------------------------------------------------------------------------*/
301 void
302 UpdateMonitor(mtcp_manager_t mtcp, struct tcp_stream *sendside_stream,
303 			struct tcp_stream *recvside_stream, struct pkt_ctx *pctx,
304 			bool is_pkt_reception)
305 {
306 	struct socket_map *walk;
307 
308 	assert(pctx);
309 
310 	/* clone sendside_stream even if sender is disabled */
311 	ClonePacketCtx(&sendside_stream->last_pctx.p,
312 		       sendside_stream->last_pkt_data, pctx);
313 
314 	/* update send stream context first */
315 	if (sendside_stream->status_mgmt) {
316 		sendside_stream->cb_events = MOS_ON_PKT_IN;
317 
318 		if (is_pkt_reception)
319 			UpdatePassiveSendTCPContext(mtcp, sendside_stream, pctx);
320 
321 		sendside_stream->allow_pkt_modification = true;
322 		/* POST hook of sender */
323 		SOCKQ_FOREACH_START(walk, &sendside_stream->msocks) {
324 			HandleCallback(mtcp, MOS_HK_SND, walk, sendside_stream->side,
325 				       pctx, sendside_stream->cb_events);
326 		} SOCKQ_FOREACH_END;
327 		sendside_stream->allow_pkt_modification = false;
328 	}
329 
330 	/* Attach Server-side stream */
331 	if (recvside_stream == NULL) {
332 		assert(sendside_stream->side == MOS_SIDE_CLI);
333 		if ((recvside_stream = AttachServerTCPStream(mtcp, sendside_stream, 0,
334 				pctx->p.iph->saddr, pctx->p.tcph->source,
335 				pctx->p.iph->daddr, pctx->p.tcph->dest)) == NULL) {
336 			DestroyTCPStream(mtcp, sendside_stream);
337 			return;
338 		}
339 		/* update recv context */
340 		recvside_stream->rcvvar->irs = pctx->p.seq;
341 		recvside_stream->sndvar->peer_wnd = pctx->p.window;
342 		recvside_stream->rcv_nxt = recvside_stream->rcvvar->irs + 1;
343 		recvside_stream->sndvar->cwnd = 1;
344 
345 		ParseTCPOptions(recvside_stream, pctx->p.cur_ts,
346 				(uint8_t *)pctx->p.tcph + TCP_HEADER_LEN,
347 				(pctx->p.tcph->doff << 2) - TCP_HEADER_LEN);
348 	}
349 
350 	/* Perform post-send tcp activities */
351 	PostSendTCPAction(mtcp, pctx, recvside_stream, sendside_stream);
352 
353 	if (/*1*/recvside_stream->status_mgmt) {
354 		recvside_stream->cb_events = MOS_ON_PKT_IN;
355 
356 		/* Predict events which may be raised prior to performing TCP processing */
357 		PreRecvTCPEventPrediction(mtcp, pctx, recvside_stream);
358 
359 		/* retransmitted packet should avoid event simulation */
360 		//if ((recvside_stream->cb_events & MOS_ON_REXMIT) == 0)
361 			/* update receive stream context (recv_side stream) */
362 		if (is_pkt_reception)
363 			UpdateRecvTCPContext(mtcp, recvside_stream, pctx);
364 		else
365 			UpdatePassiveRecvTCPContext(mtcp, recvside_stream, pctx);
366 
367 		/* POST hook of receiver */
368 		SOCKQ_FOREACH_START(walk, &recvside_stream->msocks) {
369 			HandleCallback(mtcp, MOS_HK_RCV, walk, recvside_stream->side,
370 				       pctx, recvside_stream->cb_events);
371 		} SOCKQ_FOREACH_END;
372 	}
373 
374 	/* reset callback events counter */
375 	recvside_stream->cb_events = 0;
376 	sendside_stream->cb_events = 0;
377 }
378 /*----------------------------------------------------------------------------*/
379 static void
380 HandleMonitorStream(mtcp_manager_t mtcp, struct tcp_stream *sendside_stream,
381 			struct tcp_stream *recvside_stream, struct pkt_ctx *pctx)
382 {
383 	UpdateMonitor(mtcp, sendside_stream, recvside_stream, pctx, true);
384 
385 	recvside_stream = sendside_stream->pair_stream;
386 
387 	if (HAS_STREAM_TYPE(recvside_stream, MOS_SOCK_STREAM)) {
388 		DoActionEndTCPPacket(mtcp, recvside_stream, pctx);
389 	} else {
390 		/* forward packets */
391 		if (pctx->forward)
392 			ForwardIPPacket(mtcp, pctx);
393 
394 		if (recvside_stream->stream_type == sendside_stream->stream_type &&
395 		    IS_STREAM_TYPE(recvside_stream, MOS_SOCK_MONITOR_STREAM_ACTIVE)) {
396 			if (((recvside_stream->state == TCP_ST_TIME_WAIT &&
397 				  g_config.mos->tcp_tw_interval == 0) ||
398 			     recvside_stream->state == TCP_ST_CLOSED_RSVD ||
399 			     !recvside_stream->status_mgmt) &&
400 			    ((sendside_stream->state == TCP_ST_TIME_WAIT &&
401 				  g_config.mos->tcp_tw_interval == 0) ||
402 			     sendside_stream->state == TCP_ST_CLOSED_RSVD ||
403 			     !sendside_stream->status_mgmt))
404 
405 				DestroyTCPStream(mtcp, recvside_stream);
406 			}
407 	}
408 }
409 /*----------------------------------------------------------------------------*/
410 int
411 ProcessInTCPPacket(mtcp_manager_t mtcp, struct pkt_ctx *pctx)
412 {
413 	uint64_t events = 0;
414 	struct tcp_stream *cur_stream;
415 	struct iphdr* iph;
416 	struct tcphdr* tcph;
417 	struct mon_listener *walk;
418 	unsigned int hash = 0;
419 
420 	iph = pctx->p.iph;
421 	tcph = (struct tcphdr *)((u_char *)pctx->p.iph + (pctx->p.iph->ihl << 2));
422 
423 	FillPacketContextTCPInfo(pctx, tcph);
424 
425 	/* callback for monitor raw socket */
426 	TAILQ_FOREACH(walk, &mtcp->monitors, link)
427 		if (walk->socket->socktype == MOS_SOCK_MONITOR_RAW)
428 			HandleCallback(mtcp, MOS_NULL, walk->socket, MOS_SIDE_BOTH,
429 				       pctx, MOS_ON_PKT_IN);
430 
431 	if (pctx->p.ip_len < ((iph->ihl + tcph->doff) << 2))
432 		return ERROR;
433 
434 #if VERIFY_RX_CHECKSUM
435 	if (TCPCalcChecksum((uint16_t *)pctx->p.tcph,
436 						(tcph->doff << 2) + pctx->p.payloadlen,
437 						iph->saddr, pctx->p.iph->daddr)) {
438 		TRACE_DBG("Checksum Error: Original: 0x%04x, calculated: 0x%04x\n",
439 				tcph->check, TCPCalcChecksum((uint16_t *)tcph,
440 				(tcph->doff << 2) + payloadlen,
441 				iph->saddr, iph->daddr));
442 		if (pctx->forward && mtcp->num_msp)
443 			ForwardIPPacket(mtcp, pctx);
444 		return ERROR;
445 	}
446 #endif
447 	events |= MOS_ON_PKT_IN;
448 
449 	/* Check whether a packet is belong to any stream */
450 	cur_stream = FindStream(mtcp, pctx, &hash);
451 	if (!cur_stream) {
452 		/*
453 		 * No need to create stream for monitor.
454 		 *  But do create 1 for client case!
455 		 */
456 		if (mtcp->listener == NULL && mtcp->num_msp == 0) {
457 			//if (pctx->forward)
458 			//	ForwardIPPacket(mtcp, pctx);
459 			return TRUE;
460 		}
461 		/* Create new flow for new packet or return NULL */
462 		cur_stream = CreateStream(mtcp, pctx, &hash);
463 		if (!cur_stream)
464 			events = MOS_ON_ORPHAN;
465 	}
466 
467 	if (cur_stream) {
468 		cur_stream->cb_events = events;
469 
470 #ifdef NEWPPEEK
471 		if (cur_stream->rcvvar && cur_stream->rcvvar->rcvbuf)
472 			pctx->p.offset = (uint64_t)seq2loff(cur_stream->rcvvar->rcvbuf,
473 					pctx->p.seq, cur_stream->rcvvar->irs + 1);
474 #endif
475 
476 		if (IS_STREAM_TYPE(cur_stream, MOS_SOCK_STREAM))
477 			HandleSockStream(mtcp, cur_stream, pctx);
478 
479 		else if (HAS_STREAM_TYPE(cur_stream, MOS_SOCK_MONITOR_STREAM_ACTIVE))
480 			HandleMonitorStream(mtcp, cur_stream, cur_stream->pair_stream, pctx);
481 		else
482 			assert(0);
483 	} else {
484 		struct mon_listener *walk;
485 		struct sfbpf_program fcode;
486 		/*
487 		 * event callback for pkt_no_conn; MOS_SIDE_BOTH
488 		 * means that we can't judge sides here
489 		 */
490 		TAILQ_FOREACH(walk, &mtcp->monitors, link) {
491 			/* mtcp_bind_monitor_filter()
492 			 * - apply stream orphan filter to every pkt before raising ORPHAN event */
493 			fcode = walk->stream_orphan_fcode;
494 			if (!(ISSET_BPFFILTER(fcode) && pctx &&
495 				EVAL_BPFFILTER(fcode, (uint8_t *)pctx->p.iph - sizeof(struct ethhdr),
496 							   pctx->p.ip_len + sizeof(struct ethhdr)) == 0)) {
497 				HandleCallback(mtcp, MOS_NULL, walk->socket, MOS_SIDE_BOTH,
498 					       pctx, events);
499 			}
500 		}
501 		if (mtcp->listener) {
502 			/* Send RST if it is run as EndTCP only mode */
503 			SendTCPPacketStandalone(mtcp,
504 						iph->daddr, tcph->dest, iph->saddr, tcph->source,
505 						0, pctx->p.seq + pctx->p.payloadlen + 1, 0, TCP_FLAG_RST | TCP_FLAG_ACK,
506 						NULL, 0, pctx->p.cur_ts, 0);
507 		} else if (pctx->forward) {
508 			/* Do forward or drop if it run as Monitor only mode */
509 			ForwardIPPacket(mtcp, pctx);
510 		}
511 	}
512 
513 	return TRUE;
514 }
515 /*----------------------------------------------------------------------------*/
516