xref: /mOS-networking-stack/core/src/tcp.c (revision dcdbbb98)
1 #include <assert.h>
2 #include <string.h>
3 
4 #include "mtcp.h"
5 #include "arp.h"
6 #include "socket.h"
7 #include "eth_out.h"
8 #include "ip_out.h"
9 #include "mos_api.h"
10 #include "tcp_util.h"
11 #include "tcp_in.h"
12 #include "tcp_out.h"
13 #include "tcp_ring_buffer.h"
14 #include "eventpoll.h"
15 #include "debug.h"
16 #include "timer.h"
17 #include "ip_in.h"
18 #include "config.h"
19 
/*
 * Copy the packet info `src` (and its frame) into `dst`/`frame`.
 * Defined elsewhere in the project; used here only by the
 * RECORDPKT_PER_STREAM path of the monitor update.
 */
extern struct pkt_info *
ClonePacketCtx(struct pkt_info *dst, unsigned char *frame, struct pkt_info *src);

/* When true, ProcessInTCPPacket() validates the TCP checksum of every segment. */
#define VERIFY_RX_CHECKSUM	TRUE
24 /*----------------------------------------------------------------------------*/
25 static inline uint32_t
26 DetectStreamType(mtcp_manager_t mtcp, struct pkt_ctx *pctx,
27 				 uint32_t ip, uint16_t port)
28 {
29 	/* To Do: We will extend this filter to check listeners for proxy as well */
30 	struct sockaddr_in *addr;
31 	int rc, cnt_match, socktype;
32 	struct mon_listener *walk;
33 	struct sfbpf_program fcode;
34 
35 	cnt_match = 0;
36 	rc = 0;
37 
38 	if (mtcp->num_msp > 0) {
39 		/* mtcp_bind_monitor_filter()
40 		 * - create MonitorTCPStream only when the filter of any of the existing
41 		 *   passive sockets match the incoming flow */
42 		TAILQ_FOREACH(walk, &mtcp->monitors, link) {
43 			/* For every passive monitor sockets, */
44 			socktype = walk->socket->socktype;
45 			if (socktype != MOS_SOCK_MONITOR_STREAM)
46 				continue; // XXX: can this happen??
47 
48 			/* if pctx hits the filter rule, handle the passive monitor socket */
49 			fcode = walk->stream_syn_fcode;
50 			if (!(ISSET_BPFFILTER(fcode) && pctx &&
51 				EVAL_BPFFILTER(fcode, (uint8_t *)pctx->p.iph - sizeof(struct ethhdr),
52 							   pctx->p.ip_len + sizeof(struct ethhdr)) == 0)) {
53 				walk->is_stream_syn_filter_hit = 1;// set the 'filter hit' flag to 1
54 				cnt_match++; // count the number of matched sockets
55 			}
56 		}
57 
58 		/* if there's any passive monitoring socket whose filter is hit,
59 		   we should create monitor stream */
60 		if (cnt_match > 0)
61 			rc = STREAM_TYPE(MOS_SOCK_MONITOR_STREAM_ACTIVE);
62 	}
63 
64 	if (mtcp->listener) {
65 		/* Detect end TCP stack mode */
66 		addr = &mtcp->listener->socket->saddr;
67 		if (addr->sin_port == port) {
68 			if (addr->sin_addr.s_addr != INADDR_ANY) {
69 				if (ip == addr->sin_addr.s_addr) {
70 					rc |= STREAM_TYPE(MOS_SOCK_STREAM);
71 				}
72 			} else {
73 				int i;
74 
75 				for (i = 0; i < g_config.mos->netdev_table->num; i++) {
76 					if (ip == g_config.mos->netdev_table->ent[i]->ip_addr) {
77 						rc |= STREAM_TYPE(MOS_SOCK_STREAM);
78 					}
79 				}
80 			}
81 		}
82 	}
83 
84 	return rc;
85 }
86 /*----------------------------------------------------------------------------*/
87 static inline tcp_stream *
88 CreateServerStream(mtcp_manager_t mtcp, int type, struct pkt_ctx *pctx)
89 {
90 	tcp_stream *cur_stream = NULL;
91 
92 	/* create new stream and add to flow hash table */
93 	cur_stream = CreateTCPStream(mtcp, NULL, type,
94 			pctx->p.iph->daddr, pctx->p.tcph->dest,
95 			pctx->p.iph->saddr, pctx->p.tcph->source, NULL);
96 	if (!cur_stream) {
97 		TRACE_ERROR("INFO: Could not allocate tcp_stream!\n");
98 		return FALSE;
99 	}
100 
101 	cur_stream->rcvvar->irs = pctx->p.seq;
102 	cur_stream->sndvar->peer_wnd = pctx->p.window;
103 	cur_stream->rcv_nxt = cur_stream->rcvvar->irs;
104 	cur_stream->sndvar->cwnd = 1;
105 	ParseTCPOptions(cur_stream, pctx->p.cur_ts, (uint8_t *)pctx->p.tcph +
106 			TCP_HEADER_LEN, (pctx->p.tcph->doff << 2) - TCP_HEADER_LEN);
107 
108 	return cur_stream;
109 }
110 /*----------------------------------------------------------------------------*/
111 static inline tcp_stream *
112 CreateMonitorStream(mtcp_manager_t mtcp, struct pkt_ctx* pctx,
113 		    uint32_t stream_type, unsigned int *hash)
114 {
115 	tcp_stream *stream = NULL;
116 	struct socket_map *walk;
117 	/* create a client stream context */
118 	stream = CreateDualTCPStream(mtcp, NULL, stream_type, pctx->p.iph->daddr,
119 				     pctx->p.tcph->dest, pctx->p.iph->saddr,
120 				     pctx->p.tcph->source, NULL);
121 	if (!stream)
122 		return FALSE;
123 
124 	stream->side = MOS_SIDE_CLI;
125 	stream->pair_stream->side = MOS_SIDE_SVR;
126 	/* update recv context */
127 	stream->rcvvar->irs = pctx->p.seq;
128 	stream->sndvar->peer_wnd = pctx->p.window;
129 	stream->rcv_nxt = stream->rcvvar->irs + 1;
130 	stream->sndvar->cwnd = 1;
131 
132 	/*
133 	 * if buffer management is off, then disable
134 	 * monitoring tcp ring of either streams (only if stream
135 	 * is just monitor stream active)
136 	 */
137 	if (IS_STREAM_TYPE(stream, MOS_SOCK_MONITOR_STREAM_ACTIVE)) {
138 		assert(IS_STREAM_TYPE(stream->pair_stream,
139 				      MOS_SOCK_MONITOR_STREAM_ACTIVE));
140 
141 		stream->buffer_mgmt = FALSE;
142 		stream->pair_stream->buffer_mgmt = FALSE;
143 
144 		/*
145 		 * if there is even a single monitor asking for
146 		 * buffer management, enable it (that's why the
147 		 * need for the loop)
148 		 */
149 		uint8_t bm;
150 		stream->status_mgmt = 0;
151 		SOCKQ_FOREACH_START(walk, &stream->msocks) {
152 			bm = walk->monitor_stream->monitor_listener->server_buf_mgmt;
153 			if (bm > stream->buffer_mgmt) {
154 				stream->buffer_mgmt = bm;
155 			}
156 			if (walk->monitor_stream->monitor_listener->server_mon == 1) {
157 				stream->status_mgmt = 1;
158 			}
159 		} SOCKQ_FOREACH_END;
160 
161 		stream->pair_stream->status_mgmt = 0;
162 		SOCKQ_FOREACH_START(walk, &stream->pair_stream->msocks) {
163 			bm = walk->monitor_stream->monitor_listener->client_buf_mgmt;
164 			if (bm > stream->pair_stream->buffer_mgmt) {
165 				stream->pair_stream->buffer_mgmt = bm;
166 			}
167 			if (walk->monitor_stream->monitor_listener->client_mon == 1) {
168 				stream->pair_stream->status_mgmt = 1;
169 			}
170 		} SOCKQ_FOREACH_END;
171 	}
172 
173 	ParseTCPOptions(stream, pctx->p.cur_ts,
174 			(uint8_t *)pctx->p.tcph + TCP_HEADER_LEN,
175 			(pctx->p.tcph->doff << 2) - TCP_HEADER_LEN);
176 
177 	return stream;
178 }
179 /*----------------------------------------------------------------------------*/
180 static inline struct tcp_stream *
181 FindStream(mtcp_manager_t mtcp, struct pkt_ctx *pctx, unsigned int *hash)
182 {
183 	struct tcp_stream temp_stream;
184 
185 	temp_stream.saddr = pctx->p.iph->daddr;
186 	temp_stream.sport = pctx->p.tcph->dest;
187 	temp_stream.daddr = pctx->p.iph->saddr;
188 	temp_stream.dport = pctx->p.tcph->source;
189 
190 	return HTSearch(mtcp->tcp_flow_table, &temp_stream, hash);
191 }
192 /*----------------------------------------------------------------------------*/
193 /* Create new flow for new packet or return NULL */
194 /*----------------------------------------------------------------------------*/
195 static inline struct tcp_stream *
196 CreateStream(mtcp_manager_t mtcp, struct pkt_ctx *pctx, unsigned int *hash)
197 {
198 	tcp_stream *cur_stream = NULL;
199 	uint32_t stream_type;
200 	const struct iphdr *iph = pctx->p.iph;
201 	const struct tcphdr* tcph = pctx->p.tcph;
202 
203 	if (tcph->syn && !tcph->ack) {
204 		/* handle the SYN */
205 
206 		stream_type = DetectStreamType(mtcp, pctx, iph->daddr, tcph->dest);
207 		if (!stream_type) {
208 			TRACE_DBG("Refusing SYN packet.\n");
209 #ifdef DBGMSG
210 			DumpIPPacket(mtcp, iph, pctx->p.ip_len);
211 #endif
212 			return NULL;
213 		}
214 
215 		/* if it is accepting connections only */
216 		if (stream_type == STREAM_TYPE(MOS_SOCK_STREAM)) {
217 			cur_stream = CreateServerStream(mtcp, stream_type, pctx);
218 			if (!cur_stream) {
219 				TRACE_DBG("No available space in flow pool.\n");
220 #ifdef DBGMSG
221 				DumpIPPacket(mtcp, iph, pctx->p.ip_len);
222 #endif
223 			}
224 		} else if (stream_type & STREAM_TYPE(MOS_SOCK_MONITOR_STREAM_ACTIVE)) {
225 			/*
226 			 * create both monitoring streams, and accept
227 			 * connection if it is set in embedded environment
228 			 */
229 #if 1
230 			cur_stream = CreateClientTCPStream(mtcp, NULL, stream_type,
231 									pctx->p.iph->saddr, pctx->p.tcph->source,
232 									pctx->p.iph->daddr, pctx->p.tcph->dest,
233 									hash);
234 #else
235 			cur_stream = CreateMonitorStream(mtcp, pctx, stream_type, hash);
236 #endif
237 			if (!cur_stream) {
238 				TRACE_DBG("No available space in flow pool.\n");
239 #ifdef DBGMSG
240 				DumpIPPacket(mtcp, iph, pctx->p.ip_len);
241 #endif
242 			}
243 		}  else {
244 			/* invalid stream type! */
245 		}
246 
247 		return cur_stream;
248 
249 	} else {
250 		TRACE_DBG("Weird packet comes.\n");
251 #ifdef DBGMSG
252 		DumpIPPacket(mtcp, iph, pctx->p.ip_len);
253 #endif
254 		return NULL;
255 	}
256 }
257 /*----------------------------------------------------------------------------*/
258 inline void
259 FillPacketContextTCPInfo(struct pkt_ctx *pctx, struct tcphdr * tcph)
260 {
261 	pctx->p.tcph = tcph;
262 	pctx->p.payload    = (uint8_t *)tcph + (tcph->doff << 2);
263 	pctx->p.payloadlen = pctx->p.ip_len - (pctx->p.payload - (u_char *)pctx->p.iph);
264 	pctx->p.seq = ntohl(tcph->seq);
265 	pctx->p.ack_seq = ntohl(tcph->ack_seq);
266 	pctx->p.window = ntohs(tcph->window);
267 	pctx->p.offset = 0;
268 
269 	return ;
270 }
271 /*----------------------------------------------------------------------------*/
272 /**
273  * Called for every incoming packet from the NIC (when monitoring is disabled)
274  */
275 static void
276 HandleSockStream(mtcp_manager_t mtcp, struct tcp_stream *cur_stream,
277 				struct pkt_ctx *pctx)
278 {
279 	UpdateRecvTCPContext(mtcp, cur_stream, pctx);
280 	DoActionEndTCPPacket(mtcp, cur_stream, pctx);
281 }
282 /*----------------------------------------------------------------------------*/
283 void
284 UpdateMonitor(mtcp_manager_t mtcp, struct tcp_stream *sendside_stream,
285 			struct tcp_stream *recvside_stream, struct pkt_ctx *pctx,
286 			bool is_pkt_reception)
287 {
288 	struct socket_map *walk;
289 
290 	assert(pctx);
291 
292 #ifdef RECORDPKT_PER_STREAM
293 	/* clone sendside_stream even if sender is disabled */
294 	ClonePacketCtx(&sendside_stream->last_pctx.p,
295 		       sendside_stream->last_pkt_data, &(pctx.p));
296 #endif
297 	/* update send stream context first */
298 	if (sendside_stream->status_mgmt) {
299 		sendside_stream->cb_events = MOS_ON_PKT_IN;
300 
301 		if (is_pkt_reception)
302 			UpdatePassiveSendTCPContext(mtcp, sendside_stream, pctx);
303 
304 		sendside_stream->allow_pkt_modification = true;
305 		/* POST hook of sender */
306 		SOCKQ_FOREACH_START(walk, &sendside_stream->msocks) {
307 			HandleCallback(mtcp, MOS_HK_SND, walk, sendside_stream->side,
308 				       pctx, sendside_stream->cb_events);
309 		} SOCKQ_FOREACH_END;
310 		sendside_stream->allow_pkt_modification = false;
311 	}
312 
313 	/* Attach Server-side stream */
314 	if (recvside_stream == NULL) {
315 		assert(sendside_stream->side == MOS_SIDE_CLI);
316 		if ((recvside_stream = AttachServerTCPStream(mtcp, sendside_stream, 0,
317 				pctx->p.iph->saddr, pctx->p.tcph->source,
318 				pctx->p.iph->daddr, pctx->p.tcph->dest)) == NULL) {
319 			DestroyTCPStream(mtcp, sendside_stream);
320 			return;
321 		}
322 		/* update recv context */
323 		recvside_stream->rcvvar->irs = pctx->p.seq;
324 		recvside_stream->sndvar->peer_wnd = pctx->p.window;
325 		recvside_stream->rcv_nxt = recvside_stream->rcvvar->irs + 1;
326 		recvside_stream->sndvar->cwnd = 1;
327 
328 		ParseTCPOptions(recvside_stream, pctx->p.cur_ts,
329 				(uint8_t *)pctx->p.tcph + TCP_HEADER_LEN,
330 				(pctx->p.tcph->doff << 2) - TCP_HEADER_LEN);
331 	}
332 
333 	/* Perform post-send tcp activities */
334 	PostSendTCPAction(mtcp, pctx, recvside_stream, sendside_stream);
335 
336 	if (/*1*/recvside_stream->status_mgmt) {
337 		recvside_stream->cb_events = MOS_ON_PKT_IN;
338 
339 		/* Predict events which may be raised prior to performing TCP processing */
340 		PreRecvTCPEventPrediction(mtcp, pctx, recvside_stream);
341 
342 		/* retransmitted packet should avoid event simulation */
343 		//if ((recvside_stream->cb_events & MOS_ON_REXMIT) == 0)
344 			/* update receive stream context (recv_side stream) */
345 		if (is_pkt_reception)
346 			UpdateRecvTCPContext(mtcp, recvside_stream, pctx);
347 		else
348 			UpdatePassiveRecvTCPContext(mtcp, recvside_stream, pctx);
349 
350 		/* POST hook of receiver */
351 		SOCKQ_FOREACH_START(walk, &recvside_stream->msocks) {
352 			HandleCallback(mtcp, MOS_HK_RCV, walk, recvside_stream->side,
353 				       pctx, recvside_stream->cb_events);
354 		} SOCKQ_FOREACH_END;
355 	}
356 
357 	/* reset callback events counter */
358 	recvside_stream->cb_events = 0;
359 	sendside_stream->cb_events = 0;
360 }
361 /*----------------------------------------------------------------------------*/
362 static void
363 HandleMonitorStream(mtcp_manager_t mtcp, struct tcp_stream *sendside_stream,
364 			struct tcp_stream *recvside_stream, struct pkt_ctx *pctx)
365 {
366 	UpdateMonitor(mtcp, sendside_stream, recvside_stream, pctx, true);
367 
368 	recvside_stream = sendside_stream->pair_stream;
369 
370 	if (HAS_STREAM_TYPE(recvside_stream, MOS_SOCK_STREAM)) {
371 		DoActionEndTCPPacket(mtcp, recvside_stream, pctx);
372 	} else {
373 		/* forward packets */
374 		if (pctx->forward)
375 			ForwardIPPacket(mtcp, pctx);
376 
377 		if (recvside_stream->stream_type == sendside_stream->stream_type &&
378 		    IS_STREAM_TYPE(recvside_stream, MOS_SOCK_MONITOR_STREAM_ACTIVE)) {
379 			if (((recvside_stream->state == TCP_ST_TIME_WAIT &&
380 				  g_config.mos->tcp_tw_interval == 0) ||
381 			     recvside_stream->state == TCP_ST_CLOSED_RSVD ||
382 			     !recvside_stream->status_mgmt) &&
383 			    ((sendside_stream->state == TCP_ST_TIME_WAIT &&
384 				  g_config.mos->tcp_tw_interval == 0) ||
385 			     sendside_stream->state == TCP_ST_CLOSED_RSVD ||
386 			     !sendside_stream->status_mgmt))
387 
388 				DestroyTCPStream(mtcp, recvside_stream);
389 			}
390 	}
391 }
392 /*----------------------------------------------------------------------------*/
393 int
394 ProcessInTCPPacket(mtcp_manager_t mtcp, struct pkt_ctx *pctx)
395 {
396 	uint64_t events = 0;
397 	struct tcp_stream *cur_stream;
398 	struct iphdr* iph;
399 	struct tcphdr* tcph;
400 	struct mon_listener *walk;
401 	unsigned int hash = 0;
402 
403 	iph = pctx->p.iph;
404 	tcph = (struct tcphdr *)((u_char *)pctx->p.iph + (pctx->p.iph->ihl << 2));
405 
406 	FillPacketContextTCPInfo(pctx, tcph);
407 
408 	/* callback for monitor raw socket */
409 	TAILQ_FOREACH(walk, &mtcp->monitors, link)
410 		if (walk->socket->socktype == MOS_SOCK_MONITOR_RAW)
411 			HandleCallback(mtcp, MOS_NULL, walk->socket, MOS_SIDE_BOTH,
412 				       pctx, MOS_ON_PKT_IN);
413 
414 	if (pctx->p.ip_len < ((iph->ihl + tcph->doff) << 2))
415 		return ERROR;
416 
417 #if VERIFY_RX_CHECKSUM
418 	if (TCPCalcChecksum((uint16_t *)pctx->p.tcph,
419 						(tcph->doff << 2) + pctx->p.payloadlen,
420 						iph->saddr, pctx->p.iph->daddr)) {
421 		TRACE_DBG("Checksum Error: Original: 0x%04x, calculated: 0x%04x\n",
422 				tcph->check, TCPCalcChecksum((uint16_t *)tcph,
423 				(tcph->doff << 2) + pctx->p.payloadlen,
424 				iph->saddr, iph->daddr));
425 		if (pctx->forward && mtcp->num_msp)
426 			ForwardIPPacket(mtcp, pctx);
427 		return ERROR;
428 	}
429 #endif
430 	events |= MOS_ON_PKT_IN;
431 
432 	/* Check whether a packet is belong to any stream */
433 	cur_stream = FindStream(mtcp, pctx, &hash);
434 	if (!cur_stream) {
435 		/*
436 		 * No need to create stream for monitor.
437 		 *  But do create 1 for client case!
438 		 */
439 		if (mtcp->listener == NULL && mtcp->num_msp == 0) {
440 			//if (pctx->forward)
441 			//	ForwardIPPacket(mtcp, pctx);
442 			return TRUE;
443 		}
444 		/* Create new flow for new packet or return NULL */
445 		cur_stream = CreateStream(mtcp, pctx, &hash);
446 		if (!cur_stream)
447 			events = MOS_ON_ORPHAN;
448 	}
449 
450 	if (cur_stream) {
451 		cur_stream->cb_events = events;
452 
453 		if (cur_stream->rcvvar && cur_stream->rcvvar->rcvbuf)
454 			pctx->p.offset = (uint64_t)seq2loff(cur_stream->rcvvar->rcvbuf,
455 					pctx->p.seq, cur_stream->rcvvar->irs + 1);
456 
457 		if (IS_STREAM_TYPE(cur_stream, MOS_SOCK_STREAM))
458 			HandleSockStream(mtcp, cur_stream, pctx);
459 
460 		else if (HAS_STREAM_TYPE(cur_stream, MOS_SOCK_MONITOR_STREAM_ACTIVE))
461 			HandleMonitorStream(mtcp, cur_stream, cur_stream->pair_stream, pctx);
462 		else
463 			assert(0);
464 	} else {
465 		struct mon_listener *walk;
466 		struct sfbpf_program fcode;
467 		/*
468 		 * event callback for pkt_no_conn; MOS_SIDE_BOTH
469 		 * means that we can't judge sides here
470 		 */
471 		TAILQ_FOREACH(walk, &mtcp->monitors, link) {
472 			/* mtcp_bind_monitor_filter()
473 			 * - apply stream orphan filter to every pkt before raising ORPHAN event */
474 			fcode = walk->stream_orphan_fcode;
475 			if (!(ISSET_BPFFILTER(fcode) && pctx &&
476 				EVAL_BPFFILTER(fcode, (uint8_t *)pctx->p.iph - sizeof(struct ethhdr),
477 							   pctx->p.ip_len + sizeof(struct ethhdr)) == 0)) {
478 				HandleCallback(mtcp, MOS_NULL, walk->socket, MOS_SIDE_BOTH,
479 					       pctx, events);
480 			}
481 		}
482 		if (mtcp->listener) {
483 			/* RFC 793 (page 65) says
484 			   "An incoming segment containing a RST is discarded."
485 			   if the TCP state is CLOSED (= TCP stream does not exist). */
486 			if (!tcph->rst)
487 				/* Send RST if it is run as EndTCP only mode */
488 				SendTCPPacketStandalone(mtcp,
489 							iph->daddr, tcph->dest, iph->saddr, tcph->source,
490 							0, pctx->p.seq + pctx->p.payloadlen + 1, 0, TCP_FLAG_RST | TCP_FLAG_ACK,
491 							NULL, 0, pctx->p.cur_ts, 0, 0, -1);
492 		} else if (pctx->forward) {
493 			/* Do forward or drop if it run as Monitor only mode */
494 			ForwardIPPacket(mtcp, pctx);
495 		}
496 	}
497 
498 	return TRUE;
499 }
500 /*----------------------------------------------------------------------------*/
501