xref: /mOS-networking-stack/core/src/tcp.c (revision a5e1a556)
1 #include <assert.h>
2 #include <string.h>
3 
4 #include "mtcp.h"
5 #include "arp.h"
6 #include "socket.h"
7 #include "eth_out.h"
8 #include "ip_out.h"
9 #include "mos_api.h"
10 #include "tcp_util.h"
11 #include "tcp_in.h"
12 #include "tcp_out.h"
13 #include "tcp_ring_buffer.h"
14 #include "eventpoll.h"
15 #include "debug.h"
16 #include "timer.h"
17 #include "ip_in.h"
18 #include "config.h"
19 
/*
 * Declared locally rather than through one of the included headers; the
 * definition lives in another translation unit. UpdateMonitor() uses it to
 * snapshot the current packet context (and frame) into a stream.
 */
extern struct pkt_info *ClonePacketCtx(struct pkt_info *to,
				       unsigned char *frame,
				       struct pkt_ctx *from);

/* When true, ProcessInTCPPacket() verifies the TCP checksum of each segment. */
#define VERIFY_RX_CHECKSUM TRUE
24 /*----------------------------------------------------------------------------*/
25 static inline uint32_t
26 DetectStreamType(mtcp_manager_t mtcp, struct pkt_ctx *pctx,
27 				 uint32_t ip, uint16_t port)
28 {
29 	/* To Do: We will extend this filter to check listeners for proxy as well */
30 	struct sockaddr_in *addr;
31 	int rc, cnt_match, socktype;
32 	struct mon_listener *walk;
33 	struct sfbpf_program fcode;
34 
35 	cnt_match = 0;
36 	rc = 0;
37 
38 	if (mtcp->num_msp > 0) {
39 		/* mtcp_bind_monitor_filter()
40 		 * - create MonitorTCPStream only when the filter of any of the existing
41 		 *   passive sockets match the incoming flow */
42 		TAILQ_FOREACH(walk, &mtcp->monitors, link) {
43 			/* For every passive monitor sockets, */
44 			socktype = walk->socket->socktype;
45 			if (socktype != MOS_SOCK_MONITOR_STREAM)
46 				continue; // XXX: can this happen??
47 
48 			/* if pctx hits the filter rule, handle the passive monitor socket */
49 			fcode = walk->stream_syn_fcode;
50 			if (!(ISSET_BPFFILTER(fcode) && pctx &&
51 				EVAL_BPFFILTER(fcode, (uint8_t *)pctx->p.iph - sizeof(struct ethhdr),
52 							   pctx->p.ip_len + sizeof(struct ethhdr)) == 0)) {
53 				walk->is_stream_syn_filter_hit = 1;// set the 'filter hit' flag to 1
54 				cnt_match++; // count the number of matched sockets
55 			}
56 		}
57 
58 		/* if there's any passive monitoring socket whose filter is hit,
59 		   we should create monitor stream */
60 		if (cnt_match > 0)
61 			rc = STREAM_TYPE(MOS_SOCK_MONITOR_STREAM_ACTIVE);
62 	}
63 
64 	if (mtcp->listener) {
65 		/* Detect end TCP stack mode */
66 		addr = &mtcp->listener->socket->saddr;
67 		if (addr->sin_port == port) {
68 			if (addr->sin_addr.s_addr != INADDR_ANY) {
69 				if (ip == addr->sin_addr.s_addr) {
70 					rc |= STREAM_TYPE(MOS_SOCK_STREAM);
71 				}
72 			} else {
73 				int i;
74 
75 				for (i = 0; i < g_config.mos->netdev_table->num; i++) {
76 					if (ip == g_config.mos->netdev_table->ent[i]->ip_addr) {
77 						rc |= STREAM_TYPE(MOS_SOCK_STREAM);
78 					}
79 				}
80 			}
81 		}
82 	}
83 
84 	return rc;
85 }
86 /*----------------------------------------------------------------------------*/
87 static inline tcp_stream *
88 CreateServerStream(mtcp_manager_t mtcp, int type, struct pkt_ctx *pctx)
89 {
90 	tcp_stream *cur_stream = NULL;
91 
92 	/* create new stream and add to flow hash table */
93 	cur_stream = CreateTCPStream(mtcp, NULL, type,
94 			pctx->p.iph->daddr, pctx->p.tcph->dest,
95 			pctx->p.iph->saddr, pctx->p.tcph->source, NULL);
96 	if (!cur_stream) {
97 		TRACE_ERROR("INFO: Could not allocate tcp_stream!\n");
98 		return FALSE;
99 	}
100 
101 	cur_stream->rcvvar->irs = pctx->p.seq;
102 	cur_stream->sndvar->peer_wnd = pctx->p.window;
103 	cur_stream->rcv_nxt = cur_stream->rcvvar->irs;
104 	cur_stream->sndvar->cwnd = 1;
105 	ParseTCPOptions(cur_stream, pctx->p.cur_ts, (uint8_t *)pctx->p.tcph +
106 			TCP_HEADER_LEN, (pctx->p.tcph->doff << 2) - TCP_HEADER_LEN);
107 
108 	return cur_stream;
109 }
110 /*----------------------------------------------------------------------------*/
111 static inline tcp_stream *
112 CreateMonitorStream(mtcp_manager_t mtcp, struct pkt_ctx* pctx,
113 		    uint32_t stream_type, unsigned int *hash)
114 {
115 	tcp_stream *stream = NULL;
116 	struct socket_map *walk;
117 	/* create a client stream context */
118 	stream = CreateDualTCPStream(mtcp, NULL, stream_type, pctx->p.iph->daddr,
119 				     pctx->p.tcph->dest, pctx->p.iph->saddr,
120 				     pctx->p.tcph->source, NULL);
121 	if (!stream)
122 		return FALSE;
123 
124 	stream->side = MOS_SIDE_CLI;
125 	stream->pair_stream->side = MOS_SIDE_SVR;
126 	/* update recv context */
127 	stream->rcvvar->irs = pctx->p.seq;
128 	stream->sndvar->peer_wnd = pctx->p.window;
129 	stream->rcv_nxt = stream->rcvvar->irs + 1;
130 	stream->sndvar->cwnd = 1;
131 
132 	/*
133 	 * if buffer management is off, then disable
134 	 * monitoring tcp ring of either streams (only if stream
135 	 * is just monitor stream active)
136 	 */
137 	if (IS_STREAM_TYPE(stream, MOS_SOCK_MONITOR_STREAM_ACTIVE)) {
138 		assert(IS_STREAM_TYPE(stream->pair_stream,
139 				      MOS_SOCK_MONITOR_STREAM_ACTIVE));
140 
141 		stream->buffer_mgmt = FALSE;
142 		stream->pair_stream->buffer_mgmt = FALSE;
143 
144 		/*
145 		 * if there is even a single monitor asking for
146 		 * buffer management, enable it (that's why the
147 		 * need for the loop)
148 		 */
149 		uint8_t bm;
150 		stream->status_mgmt = 0;
151 		SOCKQ_FOREACH_START(walk, &stream->msocks) {
152 			bm = walk->monitor_stream->monitor_listener->server_buf_mgmt;
153 			if (bm > stream->buffer_mgmt) {
154 				stream->buffer_mgmt = bm;
155 			}
156 			if (walk->monitor_stream->monitor_listener->server_mon == 1) {
157 				stream->status_mgmt = 1;
158 			}
159 		} SOCKQ_FOREACH_END;
160 
161 		stream->pair_stream->status_mgmt = 0;
162 		SOCKQ_FOREACH_START(walk, &stream->pair_stream->msocks) {
163 			bm = walk->monitor_stream->monitor_listener->client_buf_mgmt;
164 			if (bm > stream->pair_stream->buffer_mgmt) {
165 				stream->pair_stream->buffer_mgmt = bm;
166 			}
167 			if (walk->monitor_stream->monitor_listener->client_mon == 1) {
168 				stream->pair_stream->status_mgmt = 1;
169 			}
170 		} SOCKQ_FOREACH_END;
171 	}
172 
173 	ParseTCPOptions(stream, pctx->p.cur_ts,
174 			(uint8_t *)pctx->p.tcph + TCP_HEADER_LEN,
175 			(pctx->p.tcph->doff << 2) - TCP_HEADER_LEN);
176 
177 	return stream;
178 }
179 /*----------------------------------------------------------------------------*/
180 static inline struct tcp_stream *
181 FindStream(mtcp_manager_t mtcp, struct pkt_ctx *pctx, unsigned int *hash)
182 {
183 	struct tcp_stream temp_stream;
184 
185 	temp_stream.saddr = pctx->p.iph->daddr;
186 	temp_stream.sport = pctx->p.tcph->dest;
187 	temp_stream.daddr = pctx->p.iph->saddr;
188 	temp_stream.dport = pctx->p.tcph->source;
189 
190 	return HTSearch(mtcp->tcp_flow_table, &temp_stream, hash);
191 }
192 /*----------------------------------------------------------------------------*/
193 /* Create new flow for new packet or return NULL */
194 /*----------------------------------------------------------------------------*/
195 static inline struct tcp_stream *
196 CreateStream(mtcp_manager_t mtcp, struct pkt_ctx *pctx, unsigned int *hash)
197 {
198 	tcp_stream *cur_stream = NULL;
199 	uint32_t stream_type;
200 	const struct iphdr *iph = pctx->p.iph;
201 	const struct tcphdr* tcph = pctx->p.tcph;
202 
203 	if (tcph->syn && !tcph->ack) {
204 		/* handle the SYN */
205 
206 		stream_type = DetectStreamType(mtcp, pctx, iph->daddr, tcph->dest);
207 		if (!stream_type) {
208 			TRACE_DBG("Refusing SYN packet.\n");
209 #ifdef DBGMSG
210 			DumpIPPacket(mtcp, iph, pctx->p.ip_len);
211 #endif
212 			return NULL;
213 		}
214 
215 		/* if it is accepting connections only */
216 		if (stream_type == STREAM_TYPE(MOS_SOCK_STREAM)) {
217 			cur_stream = CreateServerStream(mtcp, stream_type, pctx);
218 			if (!cur_stream) {
219 				TRACE_DBG("No available space in flow pool.\n");
220 #ifdef DBGMSG
221 				DumpIPPacket(mtcp, iph, pctx->p.ip_len);
222 #endif
223 			}
224 		} else if (stream_type & STREAM_TYPE(MOS_SOCK_MONITOR_STREAM_ACTIVE)) {
225 			/*
226 			 * create both monitoring streams, and accept
227 			 * connection if it is set in embedded environment
228 			 */
229 #if 1
230 			cur_stream = CreateClientTCPStream(mtcp, NULL, stream_type,
231 									pctx->p.iph->saddr, pctx->p.tcph->source,
232 									pctx->p.iph->daddr, pctx->p.tcph->dest,
233 									hash);
234 #else
235 			cur_stream = CreateMonitorStream(mtcp, pctx, stream_type, hash);
236 #endif
237 			if (!cur_stream) {
238 				TRACE_DBG("No available space in flow pool.\n");
239 #ifdef DBGMSG
240 				DumpIPPacket(mtcp, iph, pctx->p.ip_len);
241 #endif
242 			}
243 		}  else {
244 			/* invalid stream type! */
245 		}
246 
247 		return cur_stream;
248 
249 	} else {
250 		TRACE_DBG("Weird packet comes.\n");
251 #ifdef DBGMSG
252 		DumpIPPacket(mtcp, iph, pctx->p.ip_len);
253 #endif
254 		return NULL;
255 	}
256 }
257 /*----------------------------------------------------------------------------*/
258 inline void
259 FillPacketContextTCPInfo(struct pkt_ctx *pctx, struct tcphdr * tcph)
260 {
261 	pctx->p.tcph = tcph;
262 	pctx->p.payload    = (uint8_t *)tcph + (tcph->doff << 2);
263 	pctx->p.payloadlen = pctx->p.ip_len - (pctx->p.payload - (u_char *)pctx->p.iph);
264 	pctx->p.seq = ntohl(tcph->seq);
265 	pctx->p.ack_seq = ntohl(tcph->ack_seq);
266 	pctx->p.window = ntohs(tcph->window);
267 	pctx->p.offset = 0;
268 
269 	return ;
270 }
271 /*----------------------------------------------------------------------------*/
272 /**
273  * Called for every incoming packet from the NIC (when monitoring is disabled)
274  */
275 static void
276 HandleSockStream(mtcp_manager_t mtcp, struct tcp_stream *cur_stream,
277 				struct pkt_ctx *pctx)
278 {
279 	UpdateRecvTCPContext(mtcp, cur_stream, pctx);
280 	DoActionEndTCPPacket(mtcp, cur_stream, pctx);
281 }
282 /*----------------------------------------------------------------------------*/
283 void
284 UpdateMonitor(mtcp_manager_t mtcp, struct tcp_stream *sendside_stream,
285 			struct tcp_stream *recvside_stream, struct pkt_ctx *pctx,
286 			bool is_pkt_reception)
287 {
288 	struct socket_map *walk;
289 
290 	assert(pctx);
291 
292 	/* clone sendside_stream even if sender is disabled */
293 	ClonePacketCtx(&sendside_stream->last_pctx.p,
294 		       sendside_stream->last_pkt_data, pctx);
295 
296 	/* update send stream context first */
297 	if (sendside_stream->status_mgmt) {
298 		sendside_stream->cb_events = MOS_ON_PKT_IN;
299 
300 		if (is_pkt_reception)
301 			UpdatePassiveSendTCPContext(mtcp, sendside_stream, pctx);
302 
303 		sendside_stream->allow_pkt_modification = true;
304 		/* POST hook of sender */
305 		SOCKQ_FOREACH_START(walk, &sendside_stream->msocks) {
306 			HandleCallback(mtcp, MOS_HK_SND, walk, sendside_stream->side,
307 				       pctx, sendside_stream->cb_events);
308 		} SOCKQ_FOREACH_END;
309 		sendside_stream->allow_pkt_modification = false;
310 	}
311 
312 	/* Attach Server-side stream */
313 	if (recvside_stream == NULL) {
314 		assert(sendside_stream->side == MOS_SIDE_CLI);
315 		if ((recvside_stream = AttachServerTCPStream(mtcp, sendside_stream, 0,
316 				pctx->p.iph->saddr, pctx->p.tcph->source,
317 				pctx->p.iph->daddr, pctx->p.tcph->dest)) == NULL) {
318 			DestroyTCPStream(mtcp, sendside_stream);
319 			return;
320 		}
321 		/* update recv context */
322 		recvside_stream->rcvvar->irs = pctx->p.seq;
323 		recvside_stream->sndvar->peer_wnd = pctx->p.window;
324 		recvside_stream->rcv_nxt = recvside_stream->rcvvar->irs + 1;
325 		recvside_stream->sndvar->cwnd = 1;
326 
327 		ParseTCPOptions(recvside_stream, pctx->p.cur_ts,
328 				(uint8_t *)pctx->p.tcph + TCP_HEADER_LEN,
329 				(pctx->p.tcph->doff << 2) - TCP_HEADER_LEN);
330 	}
331 
332 	/* Perform post-send tcp activities */
333 	PostSendTCPAction(mtcp, pctx, recvside_stream, sendside_stream);
334 
335 	if (/*1*/recvside_stream->status_mgmt) {
336 		recvside_stream->cb_events = MOS_ON_PKT_IN;
337 
338 		/* Predict events which may be raised prior to performing TCP processing */
339 		PreRecvTCPEventPrediction(mtcp, pctx, recvside_stream);
340 
341 		/* retransmitted packet should avoid event simulation */
342 		//if ((recvside_stream->cb_events & MOS_ON_REXMIT) == 0)
343 			/* update receive stream context (recv_side stream) */
344 		if (is_pkt_reception)
345 			UpdateRecvTCPContext(mtcp, recvside_stream, pctx);
346 		else
347 			UpdatePassiveRecvTCPContext(mtcp, recvside_stream, pctx);
348 
349 		/* POST hook of receiver */
350 		SOCKQ_FOREACH_START(walk, &recvside_stream->msocks) {
351 			HandleCallback(mtcp, MOS_HK_RCV, walk, recvside_stream->side,
352 				       pctx, recvside_stream->cb_events);
353 		} SOCKQ_FOREACH_END;
354 	}
355 
356 	/* reset callback events counter */
357 	recvside_stream->cb_events = 0;
358 	sendside_stream->cb_events = 0;
359 }
360 /*----------------------------------------------------------------------------*/
361 static void
362 HandleMonitorStream(mtcp_manager_t mtcp, struct tcp_stream *sendside_stream,
363 			struct tcp_stream *recvside_stream, struct pkt_ctx *pctx)
364 {
365 	UpdateMonitor(mtcp, sendside_stream, recvside_stream, pctx, true);
366 
367 	recvside_stream = sendside_stream->pair_stream;
368 
369 	if (HAS_STREAM_TYPE(recvside_stream, MOS_SOCK_STREAM)) {
370 		DoActionEndTCPPacket(mtcp, recvside_stream, pctx);
371 	} else {
372 		/* forward packets */
373 		if (pctx->forward)
374 			ForwardIPPacket(mtcp, pctx);
375 
376 		if (recvside_stream->stream_type == sendside_stream->stream_type &&
377 		    IS_STREAM_TYPE(recvside_stream, MOS_SOCK_MONITOR_STREAM_ACTIVE)) {
378 			if (((recvside_stream->state == TCP_ST_TIME_WAIT &&
379 				  g_config.mos->tcp_tw_interval == 0) ||
380 			     recvside_stream->state == TCP_ST_CLOSED_RSVD ||
381 			     !recvside_stream->status_mgmt) &&
382 			    ((sendside_stream->state == TCP_ST_TIME_WAIT &&
383 				  g_config.mos->tcp_tw_interval == 0) ||
384 			     sendside_stream->state == TCP_ST_CLOSED_RSVD ||
385 			     !sendside_stream->status_mgmt))
386 
387 				DestroyTCPStream(mtcp, recvside_stream);
388 			}
389 	}
390 }
391 /*----------------------------------------------------------------------------*/
392 int
393 ProcessInTCPPacket(mtcp_manager_t mtcp, struct pkt_ctx *pctx)
394 {
395 	uint64_t events = 0;
396 	struct tcp_stream *cur_stream;
397 	struct iphdr* iph;
398 	struct tcphdr* tcph;
399 	struct mon_listener *walk;
400 	unsigned int hash = 0;
401 
402 	iph = pctx->p.iph;
403 	tcph = (struct tcphdr *)((u_char *)pctx->p.iph + (pctx->p.iph->ihl << 2));
404 
405 	FillPacketContextTCPInfo(pctx, tcph);
406 
407 	/* callback for monitor raw socket */
408 	TAILQ_FOREACH(walk, &mtcp->monitors, link)
409 		if (walk->socket->socktype == MOS_SOCK_MONITOR_RAW)
410 			HandleCallback(mtcp, MOS_NULL, walk->socket, MOS_SIDE_BOTH,
411 				       pctx, MOS_ON_PKT_IN);
412 
413 	if (pctx->p.ip_len < ((iph->ihl + tcph->doff) << 2))
414 		return ERROR;
415 
416 #if VERIFY_RX_CHECKSUM
417 	if (TCPCalcChecksum((uint16_t *)pctx->p.tcph,
418 						(tcph->doff << 2) + pctx->p.payloadlen,
419 						iph->saddr, pctx->p.iph->daddr)) {
420 		TRACE_DBG("Checksum Error: Original: 0x%04x, calculated: 0x%04x\n",
421 				tcph->check, TCPCalcChecksum((uint16_t *)tcph,
422 				(tcph->doff << 2) + pctx->p.payloadlen,
423 				iph->saddr, iph->daddr));
424 		if (pctx->forward && mtcp->num_msp)
425 			ForwardIPPacket(mtcp, pctx);
426 		return ERROR;
427 	}
428 #endif
429 	events |= MOS_ON_PKT_IN;
430 
431 	/* Check whether a packet is belong to any stream */
432 	cur_stream = FindStream(mtcp, pctx, &hash);
433 	if (!cur_stream) {
434 		/*
435 		 * No need to create stream for monitor.
436 		 *  But do create 1 for client case!
437 		 */
438 		if (mtcp->listener == NULL && mtcp->num_msp == 0) {
439 			//if (pctx->forward)
440 			//	ForwardIPPacket(mtcp, pctx);
441 			return TRUE;
442 		}
443 		/* Create new flow for new packet or return NULL */
444 		cur_stream = CreateStream(mtcp, pctx, &hash);
445 		if (!cur_stream)
446 			events = MOS_ON_ORPHAN;
447 	}
448 
449 	if (cur_stream) {
450 		cur_stream->cb_events = events;
451 
452 		if (cur_stream->rcvvar && cur_stream->rcvvar->rcvbuf)
453 			pctx->p.offset = (uint64_t)seq2loff(cur_stream->rcvvar->rcvbuf,
454 					pctx->p.seq, cur_stream->rcvvar->irs + 1);
455 
456 		if (IS_STREAM_TYPE(cur_stream, MOS_SOCK_STREAM))
457 			HandleSockStream(mtcp, cur_stream, pctx);
458 
459 		else if (HAS_STREAM_TYPE(cur_stream, MOS_SOCK_MONITOR_STREAM_ACTIVE))
460 			HandleMonitorStream(mtcp, cur_stream, cur_stream->pair_stream, pctx);
461 		else
462 			assert(0);
463 	} else {
464 		struct mon_listener *walk;
465 		struct sfbpf_program fcode;
466 		/*
467 		 * event callback for pkt_no_conn; MOS_SIDE_BOTH
468 		 * means that we can't judge sides here
469 		 */
470 		TAILQ_FOREACH(walk, &mtcp->monitors, link) {
471 			/* mtcp_bind_monitor_filter()
472 			 * - apply stream orphan filter to every pkt before raising ORPHAN event */
473 			fcode = walk->stream_orphan_fcode;
474 			if (!(ISSET_BPFFILTER(fcode) && pctx &&
475 				EVAL_BPFFILTER(fcode, (uint8_t *)pctx->p.iph - sizeof(struct ethhdr),
476 							   pctx->p.ip_len + sizeof(struct ethhdr)) == 0)) {
477 				HandleCallback(mtcp, MOS_NULL, walk->socket, MOS_SIDE_BOTH,
478 					       pctx, events);
479 			}
480 		}
481 		if (mtcp->listener) {
482 			/* Send RST if it is run as EndTCP only mode */
483 			SendTCPPacketStandalone(mtcp,
484 						iph->daddr, tcph->dest, iph->saddr, tcph->source,
485 						0, pctx->p.seq + pctx->p.payloadlen + 1, 0, TCP_FLAG_RST | TCP_FLAG_ACK,
486 						NULL, 0, pctx->p.cur_ts, 0);
487 		} else if (pctx->forward) {
488 			/* Do forward or drop if it run as Monitor only mode */
489 			ForwardIPPacket(mtcp, pctx);
490 		}
491 	}
492 
493 	return TRUE;
494 }
495 /*----------------------------------------------------------------------------*/
496