xref: /mOS-networking-stack/core/src/api.c (revision a5e1a556)
1 #include <sys/queue.h>
2 #include <sys/ioctl.h>
3 #include <limits.h>
4 #include <unistd.h>
5 #include <assert.h>
6 #include <string.h>
7 
8 #ifdef DARWIN
9 #include <netinet/tcp.h>
10 #include <netinet/ip.h>
11 #include <netinet/if_ether.h>
12 #endif
13 
14 #include "mtcp.h"
15 #include "mtcp_api.h"
16 #include "tcp_in.h"
17 #include "tcp_stream.h"
18 #include "tcp_out.h"
19 #include "ip_out.h"
20 #include "eventpoll.h"
21 #include "pipe.h"
22 #include "fhash.h"
23 #include "addr_pool.h"
24 #include "rss.h"
25 #include "config.h"
26 #include "debug.h"
27 #include "eventpoll.h"
28 #include "mos_api.h"
29 #include "tcp_rb.h"
30 
/* Generic max/min helpers. NOTE: each argument may be evaluated twice,
 * so never pass expressions with side effects (e.g. MAX(i++, j)). */
#define MAX(a, b)	(((a) > (b)) ? (a) : (b))
#define MIN(a, b)	(((a) < (b)) ? (a) : (b))
33 
34 /*----------------------------------------------------------------------------*/
/** Stop monitoring a monitor stream socket (forward declaration).
 * @param [in] mctx: mtcp context
 * @param [in] sock: monitoring stream socket id
 * @param [in] side: side of monitoring to stop (client side, server side or both)
 *
 * This function is DEPRECATED as a public API and is only used within the
 * mOS core. It is declared here so that mtcp_setsockopt() can dispatch the
 * MOS_STOP_MON option to it (mtcp_setsockopt returns its result directly).
 * NOTE(review): return convention presumably 0 on success and -1 with errno
 * set on failure, like the other mtcp_* calls — confirm at the definition.
 */
int
mtcp_cb_stop(mctx_t mctx, int sock, int side);
44 /*----------------------------------------------------------------------------*/
45 /** Reset the connection (send RST packets to both sides)
46  *  (We need to decide the API for this.)
47  */
48 //int
49 //mtcp_cb_reset(mctx_t mctx, int sock, int side);
50 /*----------------------------------------------------------------------------*/
51 inline mtcp_manager_t
52 GetMTCPManager(mctx_t mctx)
53 {
54 	if (!mctx) {
55 		errno = EACCES;
56 		return NULL;
57 	}
58 
59 	if (mctx->cpu < 0 || mctx->cpu >= num_cpus) {
60 		errno = EINVAL;
61 		return NULL;
62 	}
63 
64 	if (!g_mtcp[mctx->cpu] || g_mtcp[mctx->cpu]->ctx->done || g_mtcp[mctx->cpu]->ctx->exit) {
65 		errno = EPERM;
66 		return NULL;
67 	}
68 
69 	return g_mtcp[mctx->cpu];
70 }
71 /*----------------------------------------------------------------------------*/
72 static inline int
73 GetSocketError(socket_map_t socket, void *optval, socklen_t *optlen)
74 {
75 	tcp_stream *cur_stream;
76 
77 	if (!socket->stream) {
78 		errno = EBADF;
79 		return -1;
80 	}
81 
82 	cur_stream = socket->stream;
83 	if (cur_stream->state == TCP_ST_CLOSED_RSVD) {
84 		if (cur_stream->close_reason == TCP_TIMEDOUT ||
85 				cur_stream->close_reason == TCP_CONN_FAIL ||
86 				cur_stream->close_reason == TCP_CONN_LOST) {
87 			*(int *)optval = ETIMEDOUT;
88 			*optlen = sizeof(int);
89 
90 			return 0;
91 		}
92 	}
93 
94 	if (cur_stream->state == TCP_ST_CLOSE_WAIT ||
95 			cur_stream->state == TCP_ST_CLOSED_RSVD) {
96 		if (cur_stream->close_reason == TCP_RESET) {
97 			*(int *)optval = ECONNRESET;
98 			*optlen = sizeof(int);
99 
100 			return 0;
101 		}
102 	}
103 
104 	if (cur_stream->state == TCP_ST_SYN_SENT &&
105 	    errno == EINPROGRESS) {
106 		*(int *)optval = errno;
107 		*optlen = sizeof(int);
108 
109 		return -1;
110         }
111 
112 	/*
113 	 * `base case`: If socket sees no so_error, then
114 	 * this also means close_reason will always be
115 	 * TCP_NOT_CLOSED.
116 	 */
117 	if (cur_stream->close_reason == TCP_NOT_CLOSED) {
118 		*(int *)optval = 0;
119 		*optlen = sizeof(int);
120 
121 		return 0;
122 	}
123 
124 	errno = ENOSYS;
125 	return -1;
126 }
127 /*----------------------------------------------------------------------------*/
128 int
129 mtcp_getsockname(mctx_t mctx, int sockid, struct sockaddr *addr,
130 		 socklen_t *addrlen)
131 {
132 	mtcp_manager_t mtcp;
133 	socket_map_t socket;
134 
135 	mtcp = GetMTCPManager(mctx);
136 	if (!mtcp) {
137 		return -1;
138 	}
139 
140 	if (sockid < 0 || sockid >= g_config.mos->max_concurrency) {
141 		TRACE_API("Socket id %d out of range.\n", sockid);
142 		errno = EBADF;
143 		return -1;
144 	}
145 
146 	socket = &mtcp->smap[sockid];
147 	if (socket->socktype == MOS_SOCK_UNUSED) {
148 		TRACE_API("Invalid socket id: %d\n", sockid);
149 		errno = EBADF;
150 		return -1;
151 	}
152 
153 	if (*addrlen <= 0) {
154 		TRACE_API("Invalid addrlen: %d\n", *addrlen);
155 		errno = EINVAL;
156 		return -1;
157 	}
158 
159 	if (socket->socktype != MOS_SOCK_STREAM_LISTEN &&
160 	    socket->socktype != MOS_SOCK_STREAM) {
161 		TRACE_API("Invalid socket id: %d\n", sockid);
162 		errno = ENOTSOCK;
163 		return -1;
164 	}
165 
166 	*(struct sockaddr_in *)addr = socket->saddr;
167         *addrlen = sizeof(socket->saddr);
168 
169 	return 0;
170 }
171 /*----------------------------------------------------------------------------*/
172 int
173 mtcp_getsockopt(mctx_t mctx, int sockid, int level,
174 		int optname, void *optval, socklen_t *optlen)
175 {
176 	mtcp_manager_t mtcp;
177 	socket_map_t socket;
178 
179 	mtcp = GetMTCPManager(mctx);
180 	if (!mtcp) {
181 		errno = EACCES;
182 		return -1;
183 	}
184 
185 	if (sockid < 0 || sockid >= g_config.mos->max_concurrency) {
186 		TRACE_API("Socket id %d out of range.\n", sockid);
187 		errno = EBADF;
188 		return -1;
189 	}
190 
191 	switch (level) {
192 	case SOL_SOCKET:
193 		socket = &mtcp->smap[sockid];
194 		if (socket->socktype == MOS_SOCK_UNUSED) {
195 			TRACE_API("Invalid socket id: %d\n", sockid);
196 			errno = EBADF;
197 			return -1;
198 		}
199 
200 		if (socket->socktype != MOS_SOCK_STREAM_LISTEN &&
201 		    socket->socktype != MOS_SOCK_STREAM) {
202 			TRACE_API("Invalid socket id: %d\n", sockid);
203 			errno = ENOTSOCK;
204 			return -1;
205 		}
206 
207 		if (optname == SO_ERROR) {
208 			if (socket->socktype == MOS_SOCK_STREAM) {
209 				return GetSocketError(socket, optval, optlen);
210 			}
211 		}
212 		break;
213 	case SOL_MONSOCKET:
214 		/* check if the calling thread is in MOS context */
215 		if (mtcp->ctx->thread != pthread_self()) {
216 			errno = EPERM;
217 			return -1;
218 		}
219 		/*
220 		 * All options will only work for active
221 		 * monitor stream sockets
222 		 */
223 		socket = &mtcp->msmap[sockid];
224 		if (socket->socktype != MOS_SOCK_MONITOR_STREAM_ACTIVE) {
225 			TRACE_API("Invalid socket id: %d\n", sockid);
226 			errno = ENOTSOCK;
227 			return -1;
228 		}
229 
230 		switch (optname) {
231 		case MOS_FRAGINFO_CLIBUF:
232 			return GetFragInfo(socket, MOS_SIDE_CLI, optval, optlen);
233 		case MOS_FRAGINFO_SVRBUF:
234 			return GetFragInfo(socket, MOS_SIDE_SVR, optval, optlen);
235 		case MOS_INFO_CLIBUF:
236 			return GetBufInfo(socket, MOS_SIDE_CLI, optval, optlen);
237 		case MOS_INFO_SVRBUF:
238 			return GetBufInfo(socket, MOS_SIDE_SVR, optval, optlen);
239 		case MOS_TCP_STATE_CLI:
240 			return GetTCPState(socket->monitor_stream->stream, MOS_SIDE_CLI,
241 							   optval, optlen);
242 		case MOS_TCP_STATE_SVR:
243 			return GetTCPState(socket->monitor_stream->stream, MOS_SIDE_SVR,
244 							   optval, optlen);
245 		case MOS_TIMESTAMP:
246 			return GetLastTimestamp(socket->monitor_stream->stream,
247 						(uint32_t *)optval,
248 						optlen);
249 		default:
250 		  TRACE_API("can't recognize the optname=%d\n", optname);
251 		  assert(0);
252 		}
253 		break;
254 	}
255 	errno = ENOSYS;
256 	return -1;
257 }
258 /*----------------------------------------------------------------------------*/
259 int
260 mtcp_setsockopt(mctx_t mctx, int sockid, int level,
261 		int optname, const void *optval, socklen_t optlen)
262 {
263 	mtcp_manager_t mtcp;
264 	socket_map_t socket;
265 	tcprb_t *rb;
266 
267 	mtcp = GetMTCPManager(mctx);
268 	if (!mtcp) {
269 		errno = EACCES;
270 		return -1;
271 	}
272 
273 	if (sockid < 0 || sockid >= g_config.mos->max_concurrency) {
274 		TRACE_API("Socket id %d out of range.\n", sockid);
275 		errno = EBADF;
276 		return -1;
277 	}
278 
279 	switch (level) {
280 	case SOL_SOCKET:
281 		socket = &mtcp->smap[sockid];
282 		if (socket->socktype == MOS_SOCK_UNUSED) {
283 			TRACE_API("Invalid socket id: %d\n", sockid);
284 			errno = EBADF;
285 			return -1;
286 		}
287 
288 		if (socket->socktype != MOS_SOCK_STREAM_LISTEN &&
289 		    socket->socktype != MOS_SOCK_STREAM) {
290 			TRACE_API("Invalid socket id: %d\n", sockid);
291 			errno = ENOTSOCK;
292 			return -1;
293 		}
294 		break;
295 	case SOL_MONSOCKET:
296 		socket = &mtcp->msmap[sockid];
297 		/*
298 		 * checking of calling thread to be in MOS context is
299 		 * disabled since both optnames can be called from
300 		 * `application' context (on passive sockets)
301 		 */
302 		/*
303 		 * if (mtcp->ctx->thread != pthread_self())
304 		 * return -1;
305 		 */
306 
307 		switch (optname) {
308 		case MOS_CLIBUF:
309 #if 0
310 			if (socket->socktype != MOS_SOCK_MONITOR_STREAM_ACTIVE) {
311 				errno = EBADF;
312 				return -1;
313 			}
314 #endif
315 #ifdef DISABLE_DYN_RESIZE
316 			if (*(int *)optval != 0)
317 				return -1;
318 			if (socket->socktype == MOS_SOCK_MONITOR_STREAM_ACTIVE) {
319 				rb = (socket->monitor_stream->stream->side == MOS_SIDE_CLI) ?
320 					socket->monitor_stream->stream->rcvvar->rcvbuf :
321 					socket->monitor_stream->stream->pair_stream->rcvvar->rcvbuf;
322 				if (rb) {
323 					tcprb_resize_meta(rb, 0);
324 					tcprb_resize(rb, 0);
325 				}
326 			}
327 			return DisableBuf(socket, MOS_SIDE_CLI);
328 #else
329 			rb = (socket->monitor_stream->stream->side == MOS_SIDE_CLI) ?
330 				socket->monitor_stream->stream->rcvvar->rcvbuf :
331 				socket->monitor_stream->stream->pair_stream->rcvvar->rcvbuf;
332 			if (tcprb_resize_meta(rb, *(int *)optval) < 0)
333 				return -1;
334 			return tcprb_resize(rb,
335 					(((int)rb->metalen - 1) / UNITBUFSIZE + 1) * UNITBUFSIZE);
336 #endif
337 		case MOS_SVRBUF:
338 #if 0
339 			if (socket->socktype != MOS_SOCK_MONITOR_STREAM_ACTIVE) {
340 				errno = EBADF;
341 				return -1;
342 			}
343 #endif
344 #ifdef DISABLE_DYN_RESIZE
345 			if (*(int *)optval != 0)
346 				return -1;
347 			if (socket->socktype == MOS_SOCK_MONITOR_STREAM_ACTIVE) {
348 				rb = (socket->monitor_stream->stream->side == MOS_SIDE_SVR) ?
349 					socket->monitor_stream->stream->rcvvar->rcvbuf :
350 					socket->monitor_stream->stream->pair_stream->rcvvar->rcvbuf;
351 				if (rb) {
352 					tcprb_resize_meta(rb, 0);
353 					tcprb_resize(rb, 0);
354 				}
355 			}
356 			return DisableBuf(socket, MOS_SIDE_SVR);
357 #else
358 			rb = (socket->monitor_stream->stream->side == MOS_SIDE_SVR) ?
359 				socket->monitor_stream->stream->rcvvar->rcvbuf :
360 				socket->monitor_stream->stream->pair_stream->rcvvar->rcvbuf;
361 			if (tcprb_resize_meta(rb, *(int *)optval) < 0)
362 				return -1;
363 			return tcprb_resize(rb,
364 					(((int)rb->metalen - 1) / UNITBUFSIZE + 1) * UNITBUFSIZE);
365 #endif
366 		case MOS_FRAG_CLIBUF:
367 #if 0
368 			if (socket->socktype != MOS_SOCK_MONITOR_STREAM_ACTIVE) {
369 				errno = EBADF;
370 				return -1;
371 			}
372 #endif
373 #ifdef DISABLE_DYN_RESIZE
374 			if (*(int *)optval != 0)
375 				return -1;
376 			if (socket->socktype == MOS_SOCK_MONITOR_STREAM_ACTIVE) {
377 				rb = (socket->monitor_stream->stream->side == MOS_SIDE_CLI) ?
378 					socket->monitor_stream->stream->rcvvar->rcvbuf :
379 					socket->monitor_stream->stream->pair_stream->rcvvar->rcvbuf;
380 				if (rb)
381 					tcprb_resize(rb, 0);
382 			}
383 			return 0;
384 #else
385 			rb = (socket->monitor_stream->stream->side == MOS_SIDE_CLI) ?
386 				socket->monitor_stream->stream->rcvvar->rcvbuf :
387 				socket->monitor_stream->stream->pair_stream->rcvvar->rcvbuf;
388 			if (rb->len == 0)
389 				return tcprb_resize_meta(rb, *(int *)optval);
390 			else
391 				return -1;
392 #endif
393 		case MOS_FRAG_SVRBUF:
394 #if 0
395 			if (socket->socktype != MOS_SOCK_MONITOR_STREAM_ACTIVE) {
396 				errno = EBADF;
397 				return -1;
398 			}
399 #endif
400 #ifdef DISABLE_DYN_RESIZE
401 			if (*(int *)optval != 0)
402 				return -1;
403 			if (socket->socktype == MOS_SOCK_MONITOR_STREAM_ACTIVE) {
404 				rb = (socket->monitor_stream->stream->side == MOS_SIDE_SVR) ?
405 					socket->monitor_stream->stream->rcvvar->rcvbuf :
406 					socket->monitor_stream->stream->pair_stream->rcvvar->rcvbuf;
407 				if (rb)
408 					tcprb_resize(rb, 0);
409 			}
410 			return 0;
411 #else
412 			rb = (socket->monitor_stream->stream->side == MOS_SIDE_SVR) ?
413 				socket->monitor_stream->stream->rcvvar->rcvbuf :
414 				socket->monitor_stream->stream->pair_stream->rcvvar->rcvbuf;
415 			if (rb->len == 0)
416 				return tcprb_resize_meta(rb, *(int *)optval);
417 			else
418 				return -1;
419 #endif
420 		case MOS_MONLEVEL:
421 #ifdef OLD_API
422 			assert(*(int *)optval == MOS_NO_CLIBUF ||
423 			       *(int *)optval == MOS_NO_SVRBUF);
424 			return DisableBuf(socket,
425 					  (*(int *)optval == MOS_NO_CLIBUF) ?
426 					  MOS_SIDE_CLI : MOS_SIDE_SVR);
427 #endif
428 		case MOS_SEQ_REMAP:
429 			return TcpSeqChange(socket,
430 					    (uint32_t)((seq_remap_info *)optval)->seq_off,
431 					    ((seq_remap_info *)optval)->side,
432 					    mtcp->pctx->p.seq);
433 		case MOS_STOP_MON:
434 			return mtcp_cb_stop(mctx, sockid, *(int *)optval);
435 		default:
436 			TRACE_API("invalid optname=%d\n", optname);
437 			assert(0);
438 		}
439 		break;
440 	}
441 
442 	errno = ENOSYS;
443 	return -1;
444 }
445 /*----------------------------------------------------------------------------*/
446 int
447 mtcp_setsock_nonblock(mctx_t mctx, int sockid)
448 {
449 	mtcp_manager_t mtcp;
450 
451 	mtcp = GetMTCPManager(mctx);
452 	if (!mtcp) {
453 		errno = EACCES;
454 		return -1;
455 	}
456 
457 	if (sockid < 0 || sockid >= g_config.mos->max_concurrency) {
458 		TRACE_API("Socket id %d out of range.\n", sockid);
459 		errno = EBADF;
460 		return -1;
461 	}
462 
463 	if (mtcp->smap[sockid].socktype == MOS_SOCK_UNUSED) {
464 		TRACE_API("Invalid socket id: %d\n", sockid);
465 		errno = EBADF;
466 		return -1;
467 	}
468 
469 	mtcp->smap[sockid].opts |= MTCP_NONBLOCK;
470 
471 	return 0;
472 }
473 /*----------------------------------------------------------------------------*/
474 int
475 mtcp_ioctl(mctx_t mctx, int sockid, int request, void *argp)
476 {
477 	mtcp_manager_t mtcp;
478 	socket_map_t socket;
479 
480 	mtcp = GetMTCPManager(mctx);
481 	if (!mtcp) {
482 		errno = EACCES;
483 		return -1;
484 	}
485 
486 	if (sockid < 0 || sockid >= g_config.mos->max_concurrency) {
487 		TRACE_API("Socket id %d out of range.\n", sockid);
488 		errno = EBADF;
489 		return -1;
490 	}
491 
492 	/* only support stream socket */
493 	socket = &mtcp->smap[sockid];
494 
495 	if (socket->socktype != MOS_SOCK_STREAM_LISTEN &&
496 		socket->socktype != MOS_SOCK_STREAM) {
497 		TRACE_API("Invalid socket id: %d\n", sockid);
498 		errno = EBADF;
499 		return -1;
500 	}
501 
502 	if (!argp) {
503 		errno = EFAULT;
504 		return -1;
505 	}
506 
507 	if (request == FIONREAD) {
508 		tcp_stream *cur_stream;
509 		tcprb_t *rbuf;
510 
511 		cur_stream = socket->stream;
512 		if (!cur_stream) {
513 			errno = EBADF;
514 			return -1;
515 		}
516 
517 		rbuf = cur_stream->rcvvar->rcvbuf;
518 		*(int *)argp = (rbuf) ? tcprb_cflen(rbuf) : 0;
519 
520 	} else if (request == FIONBIO) {
521 		/*
522 		 * sockets can only be set to blocking/non-blocking
523 		 * modes during initialization
524 		 */
525 		if ((*(int *)argp))
526 			mtcp->smap[sockid].opts |= MTCP_NONBLOCK;
527 		else
528 			mtcp->smap[sockid].opts &= ~MTCP_NONBLOCK;
529 	} else {
530 		errno = EINVAL;
531 		return -1;
532 	}
533 
534 	return 0;
535 }
536 /*----------------------------------------------------------------------------*/
537 static int
538 mtcp_monitor(mctx_t mctx, socket_map_t sock)
539 {
540 	mtcp_manager_t mtcp;
541 	struct mon_listener *monitor;
542 	int sockid = sock->id;
543 
544 	mtcp = GetMTCPManager(mctx);
545 	if (!mtcp) {
546 		errno = EACCES;
547 		return -1;
548 	}
549 
550 	if (sockid < 0 || sockid >= g_config.mos->max_concurrency) {
551 		TRACE_API("Socket id %d out of range.\n", sockid);
552 		errno = EBADF;
553 		return -1;
554 	}
555 
556 	if (mtcp->msmap[sockid].socktype == MOS_SOCK_UNUSED) {
557 		TRACE_API("Invalid socket id: %d\n", sockid);
558 		errno = EBADF;
559 		return -1;
560 	}
561 
562 	if (!(mtcp->msmap[sockid].socktype == MOS_SOCK_MONITOR_STREAM ||
563 	      mtcp->msmap[sockid].socktype == MOS_SOCK_MONITOR_RAW)) {
564 		TRACE_API("Not a monitor socket. id: %d\n", sockid);
565 		errno = ENOTSOCK;
566 		return -1;
567 	}
568 
569 	monitor = (struct mon_listener *)calloc(1, sizeof(struct mon_listener));
570 	if (!monitor) {
571 		/* errno set from the malloc() */
572 		errno = ENOMEM;
573 		return -1;
574 	}
575 
576 	/* create a monitor-specific event queue */
577 	monitor->eq = CreateEventQueue(g_config.mos->max_concurrency);
578 	if (!monitor->eq) {
579 		TRACE_API("Can't create event queue (concurrency: %d) for "
580 			  "monitor read event registrations!\n",
581 			  g_config.mos->max_concurrency);
582 		free(monitor);
583 		errno = ENOMEM;
584 		return -1;
585 	}
586 
587 	/* set monitor-related basic parameters */
588 #ifndef NEWEV
589 	monitor->ude_id = UDE_OFFSET;
590 #endif
591 	monitor->socket = sock;
592 	monitor->client_buf_mgmt = monitor->server_buf_mgmt = BUFMGMT_FULL;
593 
594 	/* perform both sides monitoring by default */
595 	monitor->client_mon = monitor->server_mon = 1;
596 
597 	/* add monitor socket to the monitor list */
598 	TAILQ_INSERT_TAIL(&mtcp->monitors, monitor, link);
599 
600 	mtcp->msmap[sockid].monitor_listener = monitor;
601 
602 	return 0;
603 }
604 /*----------------------------------------------------------------------------*/
605 int
606 mtcp_socket(mctx_t mctx, int domain, int type, int protocol)
607 {
608 	mtcp_manager_t mtcp;
609 	socket_map_t socket;
610 
611 	mtcp = GetMTCPManager(mctx);
612 	if (!mtcp) {
613 		errno = EACCES;
614 		return -1;
615 	}
616 
617 	if (domain != AF_INET) {
618 		errno = EAFNOSUPPORT;
619 		return -1;
620 	}
621 
622 	if (type == SOCK_STREAM) {
623 		type = MOS_SOCK_STREAM;
624 	} else if (type == MOS_SOCK_MONITOR_STREAM ||
625 		   type == MOS_SOCK_MONITOR_RAW) {
626 		/* do nothing for the time being */
627 	} else {
628 		/* Not supported type */
629 		errno = EINVAL;
630 		return -1;
631 	}
632 
633 	socket = AllocateSocket(mctx, type);
634 	if (!socket) {
635 		errno = ENFILE;
636 		return -1;
637 	}
638 
639 	if (type == MOS_SOCK_MONITOR_STREAM ||
640 	    type == MOS_SOCK_MONITOR_RAW) {
641 		mtcp_manager_t mtcp = GetMTCPManager(mctx);
642 		if (!mtcp) {
643 			errno = EACCES;
644 			return -1;
645 		}
646 		mtcp_monitor(mctx, socket);
647 #ifdef NEWEV
648 		socket->monitor_listener->stree_dontcare = NULL;
649 		socket->monitor_listener->stree_pre_rcv = NULL;
650 		socket->monitor_listener->stree_post_snd = NULL;
651 #else
652 		InitEvB(mtcp, &socket->monitor_listener->dontcare_evb);
653 		InitEvB(mtcp, &socket->monitor_listener->pre_tcp_evb);
654 		InitEvB(mtcp, &socket->monitor_listener->post_tcp_evb);
655 #endif
656 	}
657 
658 	return socket->id;
659 }
660 /*----------------------------------------------------------------------------*/
661 int
662 mtcp_bind(mctx_t mctx, int sockid,
663 		const struct sockaddr *addr, socklen_t addrlen)
664 {
665 	mtcp_manager_t mtcp;
666 	struct sockaddr_in *addr_in;
667 
668 	mtcp = GetMTCPManager(mctx);
669 	if (!mtcp) {
670 		errno = EACCES;
671 		return -1;
672 	}
673 
674 	if (sockid < 0 || sockid >= g_config.mos->max_concurrency) {
675 		TRACE_API("Socket id %d out of range.\n", sockid);
676 		errno = EBADF;
677 		return -1;
678 	}
679 
680 	if (mtcp->smap[sockid].socktype == MOS_SOCK_UNUSED) {
681 		TRACE_API("Invalid socket id: %d\n", sockid);
682 		errno = EBADF;
683 		return -1;
684 	}
685 
686 	if (mtcp->smap[sockid].socktype != MOS_SOCK_STREAM &&
687 			mtcp->smap[sockid].socktype != MOS_SOCK_STREAM_LISTEN) {
688 		TRACE_API("Not a stream socket id: %d\n", sockid);
689 		errno = ENOTSOCK;
690 		return -1;
691 	}
692 
693 	if (!addr) {
694 		TRACE_API("Socket %d: empty address!\n", sockid);
695 		errno = EINVAL;
696 		return -1;
697 	}
698 
699 	if (mtcp->smap[sockid].opts & MTCP_ADDR_BIND) {
700 		TRACE_API("Socket %d: adress already bind for this socket.\n", sockid);
701 		errno = EINVAL;
702 		return -1;
703 	}
704 
705 	/* we only allow bind() for AF_INET address */
706 	if (addr->sa_family != AF_INET || addrlen < sizeof(struct sockaddr_in)) {
707 		TRACE_API("Socket %d: invalid argument!\n", sockid);
708 		errno = EINVAL;
709 		return -1;
710 	}
711 
712 	if (mtcp->listener) {
713 		TRACE_API("Address already bound!\n");
714 		errno = EINVAL;
715 		return -1;
716 	}
717 	addr_in = (struct sockaddr_in *)addr;
718 	mtcp->smap[sockid].saddr = *addr_in;
719 	mtcp->smap[sockid].opts |= MTCP_ADDR_BIND;
720 
721 	return 0;
722 }
723 /*----------------------------------------------------------------------------*/
724 int
725 mtcp_listen(mctx_t mctx, int sockid, int backlog)
726 {
727 	mtcp_manager_t mtcp;
728 	struct tcp_listener *listener;
729 
730 	mtcp = GetMTCPManager(mctx);
731 	if (!mtcp) {
732 		errno = EACCES;
733 		return -1;
734 	}
735 
736 	if (sockid < 0 || sockid >= g_config.mos->max_concurrency) {
737 		TRACE_API("Socket id %d out of range.\n", sockid);
738 		errno = EBADF;
739 		return -1;
740 	}
741 
742 	if (mtcp->smap[sockid].socktype == MOS_SOCK_UNUSED) {
743 		TRACE_API("Invalid socket id: %d\n", sockid);
744 		errno = EBADF;
745 		return -1;
746 	}
747 
748 	if (mtcp->smap[sockid].socktype == MOS_SOCK_STREAM) {
749 		mtcp->smap[sockid].socktype = MOS_SOCK_STREAM_LISTEN;
750 	}
751 
752 	if (mtcp->smap[sockid].socktype != MOS_SOCK_STREAM_LISTEN) {
753 		TRACE_API("Not a listening socket. id: %d\n", sockid);
754 		errno = ENOTSOCK;
755 		return -1;
756 	}
757 
758 	if (backlog <= 0 || backlog > g_config.mos->max_concurrency) {
759 		errno = EINVAL;
760 		return -1;
761 	}
762 
763 	listener = (struct tcp_listener *)calloc(1, sizeof(struct tcp_listener));
764 	if (!listener) {
765 		/* errno set from the malloc() */
766 		errno = ENOMEM;
767 		return -1;
768 	}
769 
770 	listener->sockid = sockid;
771 	listener->backlog = backlog;
772 	listener->socket = &mtcp->smap[sockid];
773 
774 	if (pthread_cond_init(&listener->accept_cond, NULL)) {
775 		perror("pthread_cond_init of ctx->accept_cond\n");
776 		/* errno set by pthread_cond_init() */
777 		return -1;
778 	}
779 	if (pthread_mutex_init(&listener->accept_lock, NULL)) {
780 		perror("pthread_mutex_init of ctx->accept_lock\n");
781 		/* errno set by pthread_mutex_init() */
782 		return -1;
783 	}
784 
785 	listener->acceptq = CreateStreamQueue(backlog);
786 	if (!listener->acceptq) {
787 		errno = ENOMEM;
788 		return -1;
789 	}
790 
791 	mtcp->smap[sockid].listener = listener;
792 	mtcp->listener = listener;
793 
794 	return 0;
795 }
796 /*----------------------------------------------------------------------------*/
797 int
798 mtcp_accept(mctx_t mctx, int sockid, struct sockaddr *addr, socklen_t *addrlen)
799 {
800 	mtcp_manager_t mtcp;
801 	struct tcp_listener *listener;
802 	socket_map_t socket;
803 	tcp_stream *accepted = NULL;
804 
805 	mtcp = GetMTCPManager(mctx);
806 	if (!mtcp) {
807 		errno = EACCES;
808 		return -1;
809 	}
810 
811 	if (sockid < 0 || sockid >= g_config.mos->max_concurrency) {
812 		TRACE_API("Socket id %d out of range.\n", sockid);
813 		errno = EBADF;
814 		return -1;
815 	}
816 
817 	/* requires listening socket */
818 	if (mtcp->smap[sockid].socktype != MOS_SOCK_STREAM_LISTEN) {
819 		errno = EINVAL;
820 		return -1;
821 	}
822 
823 	listener = mtcp->smap[sockid].listener;
824 
825 	/* dequeue from the acceptq without lock first */
826 	/* if nothing there, acquire lock and cond_wait */
827 	accepted = StreamDequeue(listener->acceptq);
828 	if (!accepted) {
829 		if (listener->socket->opts & MTCP_NONBLOCK) {
830 			errno = EAGAIN;
831 			return -1;
832 
833 		} else {
834 			pthread_mutex_lock(&listener->accept_lock);
835 			while ((accepted = StreamDequeue(listener->acceptq)) == NULL) {
836 				pthread_cond_wait(&listener->accept_cond, &listener->accept_lock);
837 
838 				if (mtcp->ctx->done || mtcp->ctx->exit) {
839 					pthread_mutex_unlock(&listener->accept_lock);
840 					errno = EINTR;
841 					return -1;
842 				}
843 			}
844 			pthread_mutex_unlock(&listener->accept_lock);
845 		}
846 	}
847 
848 	if (!accepted) {
849 		TRACE_ERROR("[NEVER HAPPEN] Empty accept queue!\n");
850 	}
851 
852 	if (!accepted->socket) {
853 		socket = AllocateSocket(mctx, MOS_SOCK_STREAM);
854 		if (!socket) {
855 			TRACE_ERROR("Failed to create new socket!\n");
856 			/* TODO: destroy the stream */
857 			errno = ENFILE;
858 			return -1;
859 		}
860 		socket->stream = accepted;
861 		accepted->socket = socket;
862 
863 		/* set socket addr parameters */
864 		socket->saddr.sin_family = AF_INET;
865 		socket->saddr.sin_port = accepted->dport;
866 		socket->saddr.sin_addr.s_addr = accepted->daddr;
867 
868 		/* if monitor is enabled, complete the socket assignment */
869 		if (socket->stream->pair_stream != NULL)
870 			socket->stream->pair_stream->socket = socket;
871 	}
872 
873 	if (!(listener->socket->epoll & MOS_EPOLLET) &&
874 	    !StreamQueueIsEmpty(listener->acceptq))
875 		AddEpollEvent(mtcp->ep,
876 			      USR_SHADOW_EVENT_QUEUE,
877 			      listener->socket, MOS_EPOLLIN);
878 
879 	TRACE_API("Stream %d accepted.\n", accepted->id);
880 
881 	if (addr && addrlen) {
882 		struct sockaddr_in *addr_in = (struct sockaddr_in *)addr;
883 		addr_in->sin_family = AF_INET;
884 		addr_in->sin_port = accepted->dport;
885 		addr_in->sin_addr.s_addr = accepted->daddr;
886 		*addrlen = sizeof(struct sockaddr_in);
887 	}
888 
889 	return accepted->socket->id;
890 }
891 /*----------------------------------------------------------------------------*/
892 int
893 mtcp_init_rss(mctx_t mctx, in_addr_t saddr_base, int num_addr,
894 		in_addr_t daddr, in_addr_t dport)
895 {
896 	mtcp_manager_t mtcp;
897 	addr_pool_t ap;
898 
899 	mtcp = GetMTCPManager(mctx);
900 	if (!mtcp) {
901 		errno = EACCES;
902 		return -1;
903 	}
904 
905 	if (saddr_base == INADDR_ANY) {
906 		int nif_out;
907 
908 		/* for the INADDR_ANY, find the output interface for the destination
909 		   and set the saddr_base as the ip address of the output interface */
910 		nif_out = GetOutputInterface(daddr);
911 		saddr_base = g_config.mos->netdev_table->ent[nif_out]->ip_addr;
912 	}
913 
914 	ap = CreateAddressPoolPerCore(mctx->cpu, num_cpus,
915 			saddr_base, num_addr, daddr, dport);
916 	if (!ap) {
917 		errno = ENOMEM;
918 		return -1;
919 	}
920 
921 	mtcp->ap = ap;
922 
923 	return 0;
924 }
925 /*----------------------------------------------------------------------------*/
926 int
927 eval_bpf_5tuple(struct sfbpf_program fcode,
928 				in_addr_t saddr, in_port_t sport,
929 				in_addr_t daddr, in_port_t dport) {
930 	uint8_t buf[TOTAL_TCP_HEADER_LEN];
931 	struct ethhdr *ethh;
932 	struct iphdr *iph;
933 	struct tcphdr *tcph;
934 
935 	ethh = (struct ethhdr *)buf;
936 	ethh->h_proto = htons(ETH_P_IP);
937 	iph = (struct iphdr *)(ethh + 1);
938 	iph->ihl = IP_HEADER_LEN >> 2;
939 	iph->version = 4;
940 	iph->tos = 0;
941 	iph->tot_len = htons(IP_HEADER_LEN + TCP_HEADER_LEN);
942 	iph->id = htons(0);
943 	iph->protocol = IPPROTO_TCP;
944 	iph->saddr = saddr;
945 	iph->daddr = daddr;
946 	iph->check = 0;
947 	tcph = (struct tcphdr *)(iph + 1);
948 	tcph->source = sport;
949 	tcph->dest = dport;
950 
951 	return EVAL_BPFFILTER(fcode, (uint8_t *)iph - sizeof(struct ethhdr),
952 						 TOTAL_TCP_HEADER_LEN);
953 }
954 /*----------------------------------------------------------------------------*/
955 int
956 mtcp_connect(mctx_t mctx, int sockid,
957 		const struct sockaddr *addr, socklen_t addrlen)
958 {
959 	mtcp_manager_t mtcp;
960 	socket_map_t socket;
961 	tcp_stream *cur_stream;
962 	struct sockaddr_in *addr_in;
963 	in_addr_t dip;
964 	in_port_t dport;
965 	int is_dyn_bound = FALSE;
966 	int ret;
967 	int cnt_match = 0;
968 	struct mon_listener *walk;
969 	struct sfbpf_program fcode;
970 
971 	cur_stream = NULL;
972 	mtcp = GetMTCPManager(mctx);
973 	if (!mtcp) {
974 		errno = EACCES;
975 		return -1;
976 	}
977 
978 	if (sockid < 0 || sockid >= g_config.mos->max_concurrency) {
979 		TRACE_API("Socket id %d out of range.\n", sockid);
980 		errno = EBADF;
981 		return -1;
982 	}
983 
984 	if (mtcp->smap[sockid].socktype == MOS_SOCK_UNUSED) {
985 		TRACE_API("Invalid socket id: %d\n", sockid);
986 		errno = EBADF;
987 		return -1;
988 	}
989 
990 	if (mtcp->smap[sockid].socktype != MOS_SOCK_STREAM) {
991 		TRACE_API("Not an end socket. id: %d\n", sockid);
992 		errno = ENOTSOCK;
993 		return -1;
994 	}
995 
996 	if (!addr) {
997 		TRACE_API("Socket %d: empty address!\n", sockid);
998 		errno = EFAULT;
999 		return -1;
1000 	}
1001 
1002 	/* we only allow bind() for AF_INET address */
1003 	if (addr->sa_family != AF_INET || addrlen < sizeof(struct sockaddr_in)) {
1004 		TRACE_API("Socket %d: invalid argument!\n", sockid);
1005 		errno = EAFNOSUPPORT;
1006 		return -1;
1007 	}
1008 
1009 	socket = &mtcp->smap[sockid];
1010 	if (socket->stream) {
1011 		TRACE_API("Socket %d: stream already exist!\n", sockid);
1012 		if (socket->stream->state >= TCP_ST_ESTABLISHED) {
1013 			errno = EISCONN;
1014 		} else {
1015 			errno = EALREADY;
1016 		}
1017 		return -1;
1018 	}
1019 
1020 	addr_in = (struct sockaddr_in *)addr;
1021 	dip = addr_in->sin_addr.s_addr;
1022 	dport = addr_in->sin_port;
1023 
1024 	/* address binding */
1025 	if (socket->opts & MTCP_ADDR_BIND &&
1026 	    socket->saddr.sin_port != INPORT_ANY &&
1027 	    socket->saddr.sin_addr.s_addr != INADDR_ANY) {
1028 		int rss_core;
1029 
1030 		rss_core = GetRSSCPUCore(socket->saddr.sin_addr.s_addr, dip,
1031 				socket->saddr.sin_port, dport, num_queues);
1032 
1033 		if (rss_core != mctx->cpu) {
1034 			errno = EINVAL;
1035 			return -1;
1036 		}
1037 	} else {
1038 		if (mtcp->ap) {
1039 			ret = FetchAddress(mtcp->ap,
1040 					mctx->cpu, num_queues, addr_in, &socket->saddr);
1041 		} else {
1042 			ret = FetchAddress(ap[GetOutputInterface(dip)],
1043 					   mctx->cpu, num_queues, addr_in, &socket->saddr);
1044 		}
1045 		if (ret < 0) {
1046 			errno = EAGAIN;
1047 			return -1;
1048 		}
1049 		socket->opts |= MTCP_ADDR_BIND;
1050 		is_dyn_bound = TRUE;
1051 	}
1052 
1053 	cnt_match = 0;
1054 	if (mtcp->num_msp > 0) {
1055 		TAILQ_FOREACH(walk, &mtcp->monitors, link) {
1056 			fcode = walk->stream_syn_fcode;
1057 			if (!(ISSET_BPFFILTER(fcode) &&
1058 				  eval_bpf_5tuple(fcode, socket->saddr.sin_addr.s_addr,
1059 								  socket->saddr.sin_port,
1060 								  dip, dport) == 0)) {
1061 				walk->is_stream_syn_filter_hit = 1; // set the 'filter hit' flag to 1
1062 				cnt_match++;
1063 			}
1064 		}
1065 	}
1066 
1067 	if (mtcp->num_msp > 0 && cnt_match > 0) {
1068 		/* 150820 dhkim: XXX: embedded mode is not verified */
1069 #if 1
1070 		cur_stream = CreateClientTCPStream(mtcp, socket,
1071 						 STREAM_TYPE(MOS_SOCK_STREAM) |
1072 						 STREAM_TYPE(MOS_SOCK_MONITOR_STREAM_ACTIVE),
1073 						 socket->saddr.sin_addr.s_addr,
1074 						 socket->saddr.sin_port, dip, dport, NULL);
1075 #else
1076 		cur_stream = CreateDualTCPStream(mtcp, socket,
1077 						 STREAM_TYPE(MOS_SOCK_STREAM) |
1078 						 STREAM_TYPE(MOS_SOCK_MONITOR_STREAM_ACTIVE),
1079 						 socket->saddr.sin_addr.s_addr,
1080 						 socket->saddr.sin_port, dip, dport, NULL);
1081 #endif
1082 	}
1083 	else
1084 		cur_stream = CreateTCPStream(mtcp, socket, STREAM_TYPE(MOS_SOCK_STREAM),
1085 					     socket->saddr.sin_addr.s_addr,
1086 					     socket->saddr.sin_port, dip, dport, NULL);
1087 	if (!cur_stream) {
1088 		TRACE_ERROR("Socket %d: failed to create tcp_stream!\n", sockid);
1089 		errno = ENOMEM;
1090 		return -1;
1091 	}
1092 
1093 	if (is_dyn_bound)
1094 		cur_stream->is_bound_addr = TRUE;
1095 	cur_stream->sndvar->cwnd = 1;
1096 	cur_stream->sndvar->ssthresh = cur_stream->sndvar->mss * 10;
1097 	cur_stream->side = MOS_SIDE_CLI;
1098 	/* if monitor is enabled, update the pair stream side as well */
1099 	if (cur_stream->pair_stream) {
1100 		cur_stream->pair_stream->side = MOS_SIDE_SVR;
1101 		/*
1102 		 * if buffer management is off, then disable
1103 		 * monitoring tcp ring of server...
1104 		 * if there is even a single monitor asking for
1105 		 * buffer management, enable it (that's why the
1106 		 * need for the loop)
1107 		 */
1108 		cur_stream->pair_stream->buffer_mgmt = BUFMGMT_OFF;
1109 		struct socket_map *walk;
1110 		SOCKQ_FOREACH_START(walk, &cur_stream->msocks) {
1111 			uint8_t bm = walk->monitor_stream->monitor_listener->server_buf_mgmt;
1112 			if (bm > cur_stream->pair_stream->buffer_mgmt) {
1113 				cur_stream->pair_stream->buffer_mgmt = bm;
1114 				break;
1115 			}
1116 		} SOCKQ_FOREACH_END;
1117 	}
1118 
1119 	cur_stream->state = TCP_ST_SYN_SENT;
1120 	cur_stream->cb_events |= MOS_ON_TCP_STATE_CHANGE;
1121 
1122 	TRACE_STATE("Stream %d: TCP_ST_SYN_SENT\n", cur_stream->id);
1123 
1124 	SQ_LOCK(&mtcp->ctx->connect_lock);
1125 	ret = StreamEnqueue(mtcp->connectq, cur_stream);
1126 	SQ_UNLOCK(&mtcp->ctx->connect_lock);
1127 	mtcp->wakeup_flag = TRUE;
1128 	if (ret < 0) {
1129 		TRACE_ERROR("Socket %d: failed to enqueue to conenct queue!\n", sockid);
1130 		SQ_LOCK(&mtcp->ctx->destroyq_lock);
1131 		StreamEnqueue(mtcp->destroyq, cur_stream);
1132 		SQ_UNLOCK(&mtcp->ctx->destroyq_lock);
1133 		errno = EAGAIN;
1134 		return -1;
1135 	}
1136 
1137 	/* if nonblocking socket, return EINPROGRESS */
1138 	if (socket->opts & MTCP_NONBLOCK) {
1139 		errno = EINPROGRESS;
1140 		return -1;
1141 
1142 	} else {
1143 		while (1) {
1144 			if (!cur_stream) {
1145 				TRACE_ERROR("STREAM DESTROYED\n");
1146 				errno = ETIMEDOUT;
1147 				return -1;
1148 			}
1149 			if (cur_stream->state > TCP_ST_ESTABLISHED) {
1150 				TRACE_ERROR("Socket %d: weird state %s\n",
1151 						sockid, TCPStateToString(cur_stream));
1152 				// TODO: how to handle this?
1153 				errno = ENOSYS;
1154 				return -1;
1155 			}
1156 
1157 			if (cur_stream->state == TCP_ST_ESTABLISHED) {
1158 				break;
1159 			}
1160 			usleep(1000);
1161 		}
1162 	}
1163 
1164 	return 0;
1165 }
1166 /*----------------------------------------------------------------------------*/
1167 static inline int
1168 CloseStreamSocket(mctx_t mctx, int sockid)
1169 {
1170 	mtcp_manager_t mtcp;
1171 	tcp_stream *cur_stream;
1172 	int ret;
1173 
1174 	mtcp = GetMTCPManager(mctx);
1175 	if (!mtcp) {
1176 		errno = EACCES;
1177 		return -1;
1178 	}
1179 
1180 	cur_stream = mtcp->smap[sockid].stream;
1181 	if (!cur_stream) {
1182 		TRACE_API("Socket %d: stream does not exist.\n", sockid);
1183 		errno = ENOTCONN;
1184 		return -1;
1185 	}
1186 
1187 	if (cur_stream->closed) {
1188 		TRACE_API("Socket %d (Stream %u): already closed stream\n",
1189 				sockid, cur_stream->id);
1190 		return 0;
1191 	}
1192 	cur_stream->closed = TRUE;
1193 
1194 	TRACE_API("Stream %d: closing the stream.\n", cur_stream->id);
1195 
1196 	/* 141029 dhkim: Check this! */
1197 	cur_stream->socket = NULL;
1198 
1199 	if (cur_stream->state == TCP_ST_CLOSED_RSVD) {
1200 		TRACE_API("Stream %d at TCP_ST_CLOSED_RSVD. destroying the stream.\n",
1201 				cur_stream->id);
1202 		SQ_LOCK(&mtcp->ctx->destroyq_lock);
1203 		StreamEnqueue(mtcp->destroyq, cur_stream);
1204 		mtcp->wakeup_flag = TRUE;
1205 		SQ_UNLOCK(&mtcp->ctx->destroyq_lock);
1206 		return 0;
1207 
1208 	} else if (cur_stream->state == TCP_ST_SYN_SENT) {
1209 #if 1
1210 		SQ_LOCK(&mtcp->ctx->destroyq_lock);
1211 		StreamEnqueue(mtcp->destroyq, cur_stream);
1212 		SQ_UNLOCK(&mtcp->ctx->destroyq_lock);
1213 		mtcp->wakeup_flag = TRUE;
1214 #endif
1215 		return -1;
1216 
1217 	} else if (cur_stream->state != TCP_ST_ESTABLISHED &&
1218 			cur_stream->state != TCP_ST_CLOSE_WAIT) {
1219 		TRACE_API("Stream %d at state %s\n",
1220 				cur_stream->id, TCPStateToString(cur_stream));
1221 		errno = EBADF;
1222 		return -1;
1223 	}
1224 
1225 	SQ_LOCK(&mtcp->ctx->close_lock);
1226 	cur_stream->sndvar->on_closeq = TRUE;
1227 	ret = StreamEnqueue(mtcp->closeq, cur_stream);
1228 	mtcp->wakeup_flag = TRUE;
1229 	SQ_UNLOCK(&mtcp->ctx->close_lock);
1230 
1231 	if (ret < 0) {
1232 		TRACE_ERROR("(NEVER HAPPEN) Failed to enqueue the stream to close.\n");
1233 		errno = EAGAIN;
1234 		return -1;
1235 	}
1236 
1237 	return 0;
1238 }
1239 /*----------------------------------------------------------------------------*/
1240 static inline int
1241 CloseListeningSocket(mctx_t mctx, int sockid)
1242 {
1243 	mtcp_manager_t mtcp;
1244 	struct tcp_listener *listener;
1245 
1246 	mtcp = GetMTCPManager(mctx);
1247 	if (!mtcp) {
1248 		errno = EACCES;
1249 		return -1;
1250 	}
1251 
1252 	listener = mtcp->smap[sockid].listener;
1253 	if (!listener) {
1254 		errno = EINVAL;
1255 		return -1;
1256 	}
1257 
1258 	if (listener->acceptq) {
1259 		DestroyStreamQueue(listener->acceptq);
1260 		listener->acceptq = NULL;
1261 	}
1262 
1263 	pthread_mutex_lock(&listener->accept_lock);
1264 	pthread_cond_signal(&listener->accept_cond);
1265 	pthread_mutex_unlock(&listener->accept_lock);
1266 
1267 	pthread_cond_destroy(&listener->accept_cond);
1268 	pthread_mutex_destroy(&listener->accept_lock);
1269 
1270 	free(listener);
1271 	mtcp->smap[sockid].listener = NULL;
1272 
1273 	return 0;
1274 }
1275 /*----------------------------------------------------------------------------*/
1276 int
1277 mtcp_close(mctx_t mctx, int sockid)
1278 {
1279 	mtcp_manager_t mtcp;
1280 	int ret;
1281 
1282 	mtcp = GetMTCPManager(mctx);
1283 	if (!mtcp) {
1284 		errno = EACCES;
1285 		return -1;
1286 	}
1287 
1288 	if (sockid < 0 || sockid >= g_config.mos->max_concurrency) {
1289 		TRACE_API("Socket id %d out of range.\n", sockid);
1290 		errno = EBADF;
1291 		return -1;
1292 	}
1293 
1294 	if (mtcp->smap[sockid].socktype == MOS_SOCK_UNUSED) {
1295 		TRACE_API("Invalid socket id: %d\n", sockid);
1296 		errno = EBADF;
1297 		return -1;
1298 	}
1299 
1300 	TRACE_API("Socket %d: mtcp_close called.\n", sockid);
1301 
1302 	switch (mtcp->smap[sockid].socktype) {
1303 	case MOS_SOCK_STREAM:
1304 		ret = CloseStreamSocket(mctx, sockid);
1305 		break;
1306 
1307 	case MOS_SOCK_STREAM_LISTEN:
1308 		ret = CloseListeningSocket(mctx, sockid);
1309 		break;
1310 
1311 	case MOS_SOCK_EPOLL:
1312 		ret = CloseEpollSocket(mctx, sockid);
1313 		break;
1314 
1315 	case MOS_SOCK_PIPE:
1316 		ret = PipeClose(mctx, sockid);
1317 		break;
1318 
1319 	default:
1320 		errno = EINVAL;
1321 		ret = -1;
1322 		break;
1323 	}
1324 
1325 	FreeSocket(mctx, sockid, mtcp->smap[sockid].socktype);
1326 
1327 	return ret;
1328 }
1329 /*----------------------------------------------------------------------------*/
1330 int
1331 mtcp_abort(mctx_t mctx, int sockid)
1332 {
1333 	mtcp_manager_t mtcp;
1334 	tcp_stream *cur_stream;
1335 	int ret;
1336 
1337 	mtcp = GetMTCPManager(mctx);
1338 	if (!mtcp) {
1339 		errno = EACCES;
1340 		return -1;
1341 	}
1342 
1343 	if (sockid < 0 || sockid >= g_config.mos->max_concurrency) {
1344 		TRACE_API("Socket id %d out of range.\n", sockid);
1345 		errno = EBADF;
1346 		return -1;
1347 	}
1348 
1349 	if (mtcp->smap[sockid].socktype == MOS_SOCK_UNUSED) {
1350 		TRACE_API("Invalid socket id: %d\n", sockid);
1351 		errno = EBADF;
1352 		return -1;
1353 	}
1354 
1355 	if (mtcp->smap[sockid].socktype != MOS_SOCK_STREAM) {
1356 		TRACE_API("Not an end socket. id: %d\n", sockid);
1357 		errno = ENOTSOCK;
1358 		return -1;
1359 	}
1360 
1361 	cur_stream = mtcp->smap[sockid].stream;
1362 	if (!cur_stream) {
1363 		TRACE_API("Stream %d: does not exist.\n", sockid);
1364 		errno = ENOTCONN;
1365 		return -1;
1366 	}
1367 
1368 	TRACE_API("Socket %d: mtcp_abort()\n", sockid);
1369 
1370 	FreeSocket(mctx, sockid, mtcp->smap[sockid].socktype);
1371 	cur_stream->socket = NULL;
1372 
1373 	if (cur_stream->state == TCP_ST_CLOSED_RSVD) {
1374 		TRACE_API("Stream %d: connection already reset.\n", sockid);
1375 		return ERROR;
1376 
1377 	} else if (cur_stream->state == TCP_ST_SYN_SENT) {
1378 		/* TODO: this should notify event failure to all
1379 		   previous read() or write() calls */
1380 		cur_stream->state = TCP_ST_CLOSED_RSVD;
1381 		cur_stream->close_reason = TCP_ACTIVE_CLOSE;
1382 		cur_stream->cb_events |= MOS_ON_TCP_STATE_CHANGE;
1383 		SQ_LOCK(&mtcp->ctx->destroyq_lock);
1384 		StreamEnqueue(mtcp->destroyq, cur_stream);
1385 		SQ_UNLOCK(&mtcp->ctx->destroyq_lock);
1386 		mtcp->wakeup_flag = TRUE;
1387 		return 0;
1388 
1389 	} else if (cur_stream->state == TCP_ST_CLOSING ||
1390 			cur_stream->state == TCP_ST_LAST_ACK ||
1391 			cur_stream->state == TCP_ST_TIME_WAIT) {
1392 		cur_stream->state = TCP_ST_CLOSED_RSVD;
1393 		cur_stream->close_reason = TCP_ACTIVE_CLOSE;
1394 		cur_stream->cb_events |= MOS_ON_TCP_STATE_CHANGE;
1395 		SQ_LOCK(&mtcp->ctx->destroyq_lock);
1396 		StreamEnqueue(mtcp->destroyq, cur_stream);
1397 		SQ_UNLOCK(&mtcp->ctx->destroyq_lock);
1398 		mtcp->wakeup_flag = TRUE;
1399 		return 0;
1400 	}
1401 
1402 	/* the stream structure will be destroyed after sending RST */
1403 	if (cur_stream->sndvar->on_resetq) {
1404 		TRACE_ERROR("Stream %d: calling mtcp_abort() "
1405 				"when in reset queue.\n", sockid);
1406 		errno = ECONNRESET;
1407 		return -1;
1408 	}
1409 	SQ_LOCK(&mtcp->ctx->reset_lock);
1410 	cur_stream->sndvar->on_resetq = TRUE;
1411 	ret = StreamEnqueue(mtcp->resetq, cur_stream);
1412 	SQ_UNLOCK(&mtcp->ctx->reset_lock);
1413 	mtcp->wakeup_flag = TRUE;
1414 
1415 	if (ret < 0) {
1416 		TRACE_ERROR("(NEVER HAPPEN) Failed to enqueue the stream to close.\n");
1417 		errno = EAGAIN;
1418 		return -1;
1419 	}
1420 
1421 	return 0;
1422 }
1423 /*----------------------------------------------------------------------------*/
1424 static inline int
1425 CopyToUser(mtcp_manager_t mtcp, tcp_stream *cur_stream, char *buf, int len)
1426 {
1427 	struct tcp_recv_vars *rcvvar = cur_stream->rcvvar;
1428 	int copylen;
1429 	tcprb_t *rb = rcvvar->rcvbuf;
1430 	if ((copylen = tcprb_ppeek(rb, (uint8_t *)buf, len, rb->pile)) <= 0) {
1431 		errno = EAGAIN;
1432 		return -1;
1433 	}
1434 	tcprb_setpile(rb, rb->pile + copylen);
1435 
1436 	rcvvar->rcv_wnd = rb->len - tcprb_cflen(rb);
1437 	//printf("rcv_wnd: %d\n", rcvvar->rcv_wnd);
1438 
1439 	/* Advertise newly freed receive buffer */
1440 	if (cur_stream->need_wnd_adv) {
1441 		if (rcvvar->rcv_wnd > cur_stream->sndvar->eff_mss) {
1442 			if (!cur_stream->sndvar->on_ackq) {
1443 				SQ_LOCK(&mtcp->ctx->ackq_lock);
1444 				cur_stream->sndvar->on_ackq = TRUE;
1445 				StreamEnqueue(mtcp->ackq, cur_stream); /* this always success */
1446 				SQ_UNLOCK(&mtcp->ctx->ackq_lock);
1447 				cur_stream->need_wnd_adv = FALSE;
1448 				mtcp->wakeup_flag = TRUE;
1449 			}
1450 		}
1451 	}
1452 
1453 	return copylen;
1454 }
1455 /*----------------------------------------------------------------------------*/
1456 ssize_t
1457 mtcp_read(mctx_t mctx, int sockid, char *buf, size_t len)
1458 {
1459 	mtcp_manager_t mtcp;
1460 	socket_map_t socket;
1461 	tcp_stream *cur_stream;
1462 	struct tcp_recv_vars *rcvvar;
1463 	int event_remaining, merged_len;
1464 	int ret;
1465 
1466 	mtcp = GetMTCPManager(mctx);
1467 	if (!mtcp) {
1468 		errno = EACCES;
1469 		return -1;
1470 	}
1471 
1472 	if (sockid < 0 || sockid >= g_config.mos->max_concurrency) {
1473 		TRACE_API("Socket id %d out of range.\n", sockid);
1474 		errno = EBADF;
1475 		return -1;
1476 	}
1477 
1478 	socket = &mtcp->smap[sockid];
1479 	if (socket->socktype == MOS_SOCK_UNUSED) {
1480 		TRACE_API("Invalid socket id: %d\n", sockid);
1481 		errno = EBADF;
1482 		return -1;
1483 	}
1484 
1485 	if (socket->socktype == MOS_SOCK_PIPE) {
1486 		return PipeRead(mctx, sockid, buf, len);
1487 	}
1488 
1489 	if (socket->socktype != MOS_SOCK_STREAM) {
1490 		TRACE_API("Not an end socket. id: %d\n", sockid);
1491 		errno = ENOTSOCK;
1492 		return -1;
1493 	}
1494 
1495 	/* stream should be in ESTABLISHED, FIN_WAIT_1, FIN_WAIT_2, CLOSE_WAIT */
1496 	cur_stream = socket->stream;
1497 	if (!cur_stream || !cur_stream->rcvvar || !cur_stream->rcvvar->rcvbuf ||
1498 			!(cur_stream->state >= TCP_ST_ESTABLISHED &&
1499 			cur_stream->state <= TCP_ST_CLOSE_WAIT)) {
1500 		errno = ENOTCONN;
1501 		return -1;
1502 	}
1503 
1504 	rcvvar = cur_stream->rcvvar;
1505 
1506 	merged_len = tcprb_cflen(rcvvar->rcvbuf);
1507 
1508 	/* if CLOSE_WAIT, return 0 if there is no payload */
1509 	if (cur_stream->state == TCP_ST_CLOSE_WAIT) {
1510 		if (!rcvvar->rcvbuf)
1511 			return 0;
1512 
1513 		if (merged_len == 0)
1514 			return 0;
1515 	}
1516 
1517 	/* return EAGAIN if no receive buffer */
1518 	if (socket->opts & MTCP_NONBLOCK) {
1519 		if (!rcvvar->rcvbuf || merged_len == 0) {
1520 			errno = EAGAIN;
1521 			return -1;
1522 		}
1523 	}
1524 
1525 	SBUF_LOCK(&rcvvar->read_lock);
1526 
1527 	ret = CopyToUser(mtcp, cur_stream, buf, len);
1528 
1529 	merged_len = tcprb_cflen(rcvvar->rcvbuf);
1530 	event_remaining = FALSE;
1531 	/* if there are remaining payload, generate EPOLLIN */
1532 	/* (may due to insufficient user buffer) */
1533 	if (socket->epoll & MOS_EPOLLIN) {
1534 		if (!(socket->epoll & MOS_EPOLLET) && merged_len > 0) {
1535 			event_remaining = TRUE;
1536 		}
1537 	}
1538 	/* if waiting for close, notify it if no remaining data */
1539 	if (cur_stream->state == TCP_ST_CLOSE_WAIT &&
1540 			merged_len == 0 && ret > 0) {
1541 		event_remaining = TRUE;
1542 	}
1543 
1544 	SBUF_UNLOCK(&rcvvar->read_lock);
1545 
1546 	if (event_remaining) {
1547 		if (socket->epoll) {
1548 			AddEpollEvent(mtcp->ep,
1549 				      USR_SHADOW_EVENT_QUEUE, socket, MOS_EPOLLIN);
1550 		}
1551 	}
1552 
1553 	TRACE_API("Stream %d: mtcp_read() returning %d\n", cur_stream->id, ret);
1554 	return ret;
1555 }
1556 /*----------------------------------------------------------------------------*/
1557 ssize_t
1558 mtcp_readv(mctx_t mctx, int sockid, const struct iovec *iov, int numIOV)
1559 {
1560 	mtcp_manager_t mtcp;
1561 	socket_map_t socket;
1562 	tcp_stream *cur_stream;
1563 	struct tcp_recv_vars *rcvvar;
1564 	int ret, bytes_read, i;
1565 	int event_remaining, merged_len;
1566 
1567 	mtcp = GetMTCPManager(mctx);
1568 	if (!mtcp) {
1569 		errno = EACCES;
1570 		return -1;
1571 	}
1572 
1573 	if (sockid < 0 || sockid >= g_config.mos->max_concurrency) {
1574 		TRACE_API("Socket id %d out of range.\n", sockid);
1575 		errno = EBADF;
1576 		return -1;
1577 	}
1578 
1579 	socket = &mtcp->smap[sockid];
1580 	if (socket->socktype == MOS_SOCK_UNUSED) {
1581 		TRACE_API("Invalid socket id: %d\n", sockid);
1582 		errno = EBADF;
1583 		return -1;
1584 	}
1585 
1586 	if (socket->socktype != MOS_SOCK_STREAM) {
1587 		TRACE_API("Not an end socket. id: %d\n", sockid);
1588 		errno = ENOTSOCK;
1589 		return -1;
1590 	}
1591 
1592 	/* stream should be in ESTABLISHED, FIN_WAIT_1, FIN_WAIT_2, CLOSE_WAIT */
1593 	cur_stream = socket->stream;
1594 	if (!cur_stream ||
1595 			!(cur_stream->state >= TCP_ST_ESTABLISHED &&
1596 			  cur_stream->state <= TCP_ST_CLOSE_WAIT)) {
1597 		errno = ENOTCONN;
1598 		return -1;
1599 	}
1600 
1601 	rcvvar = cur_stream->rcvvar;
1602 
1603 	merged_len = tcprb_cflen(rcvvar->rcvbuf);
1604 
1605 	/* if CLOSE_WAIT, return 0 if there is no payload */
1606 	if (cur_stream->state == TCP_ST_CLOSE_WAIT) {
1607 		if (!rcvvar->rcvbuf)
1608 			return 0;
1609 
1610 		if (merged_len == 0)
1611 			return 0;
1612 	}
1613 
1614 	/* return EAGAIN if no receive buffer */
1615 	if (socket->opts & MTCP_NONBLOCK) {
1616 		if (!rcvvar->rcvbuf || merged_len == 0) {
1617 			errno = EAGAIN;
1618 			return -1;
1619 		}
1620 	}
1621 
1622 	SBUF_LOCK(&rcvvar->read_lock);
1623 
1624 	/* read and store the contents to the vectored buffers */
1625 	bytes_read = 0;
1626 	for (i = 0; i < numIOV; i++) {
1627 		if (iov[i].iov_len <= 0)
1628 			continue;
1629 
1630 		ret = CopyToUser(mtcp, cur_stream, iov[i].iov_base, iov[i].iov_len);
1631 		if (ret <= 0)
1632 			break;
1633 
1634 		bytes_read += ret;
1635 
1636 		if (ret < iov[i].iov_len)
1637 			break;
1638 	}
1639 
1640 	merged_len = tcprb_cflen(rcvvar->rcvbuf);
1641 
1642 	event_remaining = FALSE;
1643 	/* if there are remaining payload, generate read event */
1644 	/* (may due to insufficient user buffer) */
1645 	if (socket->epoll & MOS_EPOLLIN) {
1646 		if (!(socket->epoll & MOS_EPOLLET) && merged_len > 0) {
1647 			event_remaining = TRUE;
1648 		}
1649 	}
1650 	/* if waiting for close, notify it if no remaining data */
1651 	if (cur_stream->state == TCP_ST_CLOSE_WAIT &&
1652 			merged_len == 0 && bytes_read > 0) {
1653 		event_remaining = TRUE;
1654 	}
1655 
1656 	SBUF_UNLOCK(&rcvvar->read_lock);
1657 
1658 	if(event_remaining) {
1659 		if (socket->epoll & MOS_EPOLLIN && !(socket->epoll & MOS_EPOLLET)) {
1660 			AddEpollEvent(mtcp->ep,
1661 				      USR_SHADOW_EVENT_QUEUE, socket, MOS_EPOLLIN);
1662 		}
1663 	}
1664 
1665 	TRACE_API("Stream %d: mtcp_readv() returning %d\n",
1666 			cur_stream->id, bytes_read);
1667 	return bytes_read;
1668 }
1669 /*----------------------------------------------------------------------------*/
1670 static inline int
1671 CopyFromUser(mtcp_manager_t mtcp, tcp_stream *cur_stream, const char *buf, int len)
1672 {
1673 	struct tcp_send_vars *sndvar = cur_stream->sndvar;
1674 	int sndlen;
1675 	int ret;
1676 
1677 	sndlen = MIN((int)sndvar->snd_wnd, len);
1678 	if (sndlen <= 0) {
1679 		errno = EAGAIN;
1680 		return -1;
1681 	}
1682 
1683 	/* allocate send buffer if not exist */
1684 	if (!sndvar->sndbuf) {
1685 		sndvar->sndbuf = SBInit(mtcp->rbm_snd, sndvar->iss + 1);
1686 		if (!sndvar->sndbuf) {
1687 			cur_stream->close_reason = TCP_NO_MEM;
1688 			/* notification may not required due to -1 return */
1689 			errno = ENOMEM;
1690 			return -1;
1691 		}
1692 	}
1693 
1694 	ret = SBPut(mtcp->rbm_snd, sndvar->sndbuf, buf, sndlen);
1695 	assert(ret == sndlen);
1696 	sndvar->snd_wnd = sndvar->sndbuf->size - sndvar->sndbuf->len;
1697 	if (ret <= 0) {
1698 		TRACE_ERROR("SBPut failed. reason: %d (sndlen: %u, len: %u\n",
1699 				ret, sndlen, sndvar->sndbuf->len);
1700 		errno = EAGAIN;
1701 		return -1;
1702 	}
1703 
1704 	if (sndvar->snd_wnd <= 0) {
1705 		TRACE_SNDBUF("%u Sending buffer became full!! snd_wnd: %u\n",
1706 				cur_stream->id, sndvar->snd_wnd);
1707 	}
1708 
1709 	return ret;
1710 }
1711 /*----------------------------------------------------------------------------*/
1712 ssize_t
1713 mtcp_write(mctx_t mctx, int sockid, const char *buf, size_t len)
1714 {
1715 	mtcp_manager_t mtcp;
1716 	socket_map_t socket;
1717 	tcp_stream *cur_stream;
1718 	struct tcp_send_vars *sndvar;
1719 	int ret;
1720 
1721 	mtcp = GetMTCPManager(mctx);
1722 	if (!mtcp) {
1723 		errno = EACCES;
1724 		return -1;
1725 	}
1726 
1727 	if (sockid < 0 || sockid >= g_config.mos->max_concurrency) {
1728 		TRACE_API("Socket id %d out of range.\n", sockid);
1729 		errno = EBADF;
1730 		return -1;
1731 	}
1732 
1733 	socket = &mtcp->smap[sockid];
1734 	if (socket->socktype == MOS_SOCK_UNUSED) {
1735 		TRACE_API("Invalid socket id: %d\n", sockid);
1736 		errno = EBADF;
1737 		return -1;
1738 	}
1739 
1740 	if (socket->socktype == MOS_SOCK_PIPE) {
1741 		return PipeWrite(mctx, sockid, buf, len);
1742 	}
1743 
1744 	if (socket->socktype != MOS_SOCK_STREAM) {
1745 		TRACE_API("Not an end socket. id: %d\n", sockid);
1746 		errno = ENOTSOCK;
1747 		return -1;
1748 	}
1749 
1750 	cur_stream = socket->stream;
1751 	if (!cur_stream ||
1752 			!(cur_stream->state == TCP_ST_ESTABLISHED ||
1753 			  cur_stream->state == TCP_ST_CLOSE_WAIT)) {
1754 		errno = ENOTCONN;
1755 		return -1;
1756 	}
1757 
1758 	if (len <= 0) {
1759 		if (socket->opts & MTCP_NONBLOCK) {
1760 			errno = EAGAIN;
1761 			return -1;
1762 		} else {
1763 			return 0;
1764 		}
1765 	}
1766 
1767 	sndvar = cur_stream->sndvar;
1768 
1769 	SBUF_LOCK(&sndvar->write_lock);
1770 	ret = CopyFromUser(mtcp, cur_stream, buf, len);
1771 
1772 	SBUF_UNLOCK(&sndvar->write_lock);
1773 
1774 	if (ret > 0 && !(sndvar->on_sendq || sndvar->on_send_list)) {
1775 		SQ_LOCK(&mtcp->ctx->sendq_lock);
1776 		sndvar->on_sendq = TRUE;
1777 		StreamEnqueue(mtcp->sendq, cur_stream);		/* this always success */
1778 		SQ_UNLOCK(&mtcp->ctx->sendq_lock);
1779 		mtcp->wakeup_flag = TRUE;
1780 	}
1781 
1782 	if (ret == 0 && (socket->opts & MTCP_NONBLOCK)) {
1783 		ret = -1;
1784 		errno = EAGAIN;
1785 	}
1786 
1787 	/* if there are remaining sending buffer, generate write event */
1788 	if (sndvar->snd_wnd > 0) {
1789 		if (socket->epoll & MOS_EPOLLOUT && !(socket->epoll & MOS_EPOLLET)) {
1790 			AddEpollEvent(mtcp->ep,
1791 				      USR_SHADOW_EVENT_QUEUE, socket, MOS_EPOLLOUT);
1792 		}
1793 	}
1794 
1795 	TRACE_API("Stream %d: mtcp_write() returning %d\n", cur_stream->id, ret);
1796 	return ret;
1797 }
1798 /*----------------------------------------------------------------------------*/
1799 ssize_t
1800 mtcp_writev(mctx_t mctx, int sockid, const struct iovec *iov, int numIOV)
1801 {
1802 	mtcp_manager_t mtcp;
1803 	socket_map_t socket;
1804 	tcp_stream *cur_stream;
1805 	struct tcp_send_vars *sndvar;
1806 	int ret, to_write, i;
1807 
1808 	mtcp = GetMTCPManager(mctx);
1809 	if (!mtcp) {
1810 		errno = EACCES;
1811 		return -1;
1812 	}
1813 
1814 	if (sockid < 0 || sockid >= g_config.mos->max_concurrency) {
1815 		TRACE_API("Socket id %d out of range.\n", sockid);
1816 		errno = EBADF;
1817 		return -1;
1818 	}
1819 
1820 	socket = &mtcp->smap[sockid];
1821 	if (socket->socktype == MOS_SOCK_UNUSED) {
1822 		TRACE_API("Invalid socket id: %d\n", sockid);
1823 		errno = EBADF;
1824 		return -1;
1825 	}
1826 
1827 	if (socket->socktype != MOS_SOCK_STREAM) {
1828 		TRACE_API("Not an end socket. id: %d\n", sockid);
1829 		errno = ENOTSOCK;
1830 		return -1;
1831 	}
1832 
1833 	cur_stream = socket->stream;
1834 	if (!cur_stream ||
1835 			!(cur_stream->state == TCP_ST_ESTABLISHED ||
1836 			  cur_stream->state == TCP_ST_CLOSE_WAIT)) {
1837 		errno = ENOTCONN;
1838 		return -1;
1839 	}
1840 
1841 	sndvar = cur_stream->sndvar;
1842 	SBUF_LOCK(&sndvar->write_lock);
1843 
1844 	/* write from the vectored buffers */
1845 	to_write = 0;
1846 	for (i = 0; i < numIOV; i++) {
1847 		if (iov[i].iov_len <= 0)
1848 			continue;
1849 
1850 		ret = CopyFromUser(mtcp, cur_stream, iov[i].iov_base, iov[i].iov_len);
1851 		if (ret <= 0)
1852 			break;
1853 
1854 		to_write += ret;
1855 
1856 		if (ret < iov[i].iov_len)
1857 			break;
1858 	}
1859 	SBUF_UNLOCK(&sndvar->write_lock);
1860 
1861 	if (to_write > 0 && !(sndvar->on_sendq || sndvar->on_send_list)) {
1862 		SQ_LOCK(&mtcp->ctx->sendq_lock);
1863 		sndvar->on_sendq = TRUE;
1864 		StreamEnqueue(mtcp->sendq, cur_stream);		/* this always success */
1865 		SQ_UNLOCK(&mtcp->ctx->sendq_lock);
1866 		mtcp->wakeup_flag = TRUE;
1867 	}
1868 
1869 	if (to_write == 0 && (socket->opts & MTCP_NONBLOCK)) {
1870 		to_write = -1;
1871 		errno = EAGAIN;
1872 	}
1873 
1874 	/* if there are remaining sending buffer, generate write event */
1875 	if (sndvar->snd_wnd > 0) {
1876 		if (socket->epoll & MOS_EPOLLOUT && !(socket->epoll & MOS_EPOLLET)) {
1877 			AddEpollEvent(mtcp->ep,
1878 				      USR_SHADOW_EVENT_QUEUE, socket, MOS_EPOLLOUT);
1879 		}
1880 	}
1881 
1882 	TRACE_API("Stream %d: mtcp_writev() returning %d\n",
1883 			cur_stream->id, to_write);
1884 	return to_write;
1885 }
1886 /*----------------------------------------------------------------------------*/
1887 uint32_t
1888 mtcp_get_connection_cnt(mctx_t mctx)
1889 {
1890 	mtcp_manager_t mtcp;
1891 	mtcp = GetMTCPManager(mctx);
1892 	if (!mtcp) {
1893 		errno = EACCES;
1894 		return -1;
1895 	}
1896 
1897 	if (mtcp->num_msp > 0)
1898 		return mtcp->flow_cnt / 2;
1899 	else
1900 		return mtcp->flow_cnt;
1901 }
1902 /*----------------------------------------------------------------------------*/
1903