xref: /mOS-networking-stack/core/src/api.c (revision a834ea89)
1 #include <sys/queue.h>
2 #include <sys/ioctl.h>
3 #include <limits.h>
4 #include <unistd.h>
5 #include <assert.h>
6 #include <string.h>
7 
8 #ifdef DARWIN
9 #include <netinet/tcp.h>
10 #include <netinet/ip.h>
11 #include <netinet/if_ether.h>
12 #endif
13 
14 #include "mtcp.h"
15 #include "mtcp_api.h"
16 #include "tcp_in.h"
17 #include "tcp_stream.h"
18 #include "tcp_out.h"
19 #include "ip_out.h"
20 #include "eventpoll.h"
21 #include "pipe.h"
22 #include "fhash.h"
23 #include "addr_pool.h"
24 #include "rss.h"
25 #include "config.h"
26 #include "debug.h"
27 #include "eventpoll.h"
28 #include "mos_api.h"
29 #include "tcp_rb.h"
30 
31 #define MAX(a, b) ((a)>(b)?(a):(b))
32 #define MIN(a, b) ((a)<(b)?(a):(b))
33 
34 /*----------------------------------------------------------------------------*/
35 /** Stop monitoring the socket! (function prototype)
36  * @param [in] mctx: mtcp context
37  * @param [in] sock: monitoring stream socket id
38  * @param [in] side: side of monitoring (client side, server side or both)
39  *
40  * This function is now DEPRECATED and is only used within mOS core...
41  */
42 int
43 mtcp_cb_stop(mctx_t mctx, int sock, int side);
44 /*----------------------------------------------------------------------------*/
45 /** Reset the connection (send RST packets to both sides)
46  *  (We need to decide the API for this.)
47  */
48 //int
49 //mtcp_cb_reset(mctx_t mctx, int sock, int side);
50 /*----------------------------------------------------------------------------*/
51 inline mtcp_manager_t
52 GetMTCPManager(mctx_t mctx)
53 {
54 	if (!mctx) {
55 		errno = EACCES;
56 		return NULL;
57 	}
58 
59 	if (mctx->cpu < 0 || mctx->cpu >= num_cpus) {
60 		errno = EINVAL;
61 		return NULL;
62 	}
63 
64 	if (!g_mtcp[mctx->cpu] || g_mtcp[mctx->cpu]->ctx->done || g_mtcp[mctx->cpu]->ctx->exit) {
65 		errno = EPERM;
66 		return NULL;
67 	}
68 
69 	return g_mtcp[mctx->cpu];
70 }
71 /*----------------------------------------------------------------------------*/
72 static inline int
73 GetSocketError(socket_map_t socket, void *optval, socklen_t *optlen)
74 {
75 	tcp_stream *cur_stream;
76 
77 	if (!socket->stream) {
78 		errno = EBADF;
79 		return -1;
80 	}
81 
82 	cur_stream = socket->stream;
83 	if (cur_stream->state == TCP_ST_CLOSED_RSVD) {
84 		if (cur_stream->close_reason == TCP_TIMEDOUT ||
85 				cur_stream->close_reason == TCP_CONN_FAIL ||
86 				cur_stream->close_reason == TCP_CONN_LOST) {
87 			*(int *)optval = ETIMEDOUT;
88 			*optlen = sizeof(int);
89 
90 			return 0;
91 		}
92 	}
93 
94 	if (cur_stream->state == TCP_ST_CLOSE_WAIT ||
95 			cur_stream->state == TCP_ST_CLOSED_RSVD) {
96 		if (cur_stream->close_reason == TCP_RESET) {
97 			*(int *)optval = ECONNRESET;
98 			*optlen = sizeof(int);
99 
100 			return 0;
101 		}
102 	}
103 
104 	if (cur_stream->state == TCP_ST_SYN_SENT &&
105 	    errno == EINPROGRESS) {
106 		*(int *)optval = errno;
107 		*optlen = sizeof(int);
108 
109 		return -1;
110 	}
111 
112 	/*
113 	 * `base case`: If socket sees no so_error, then
114 	 * this also means close_reason will always be
115 	 * TCP_NOT_CLOSED.
116 	 */
117 	if (cur_stream->close_reason == TCP_NOT_CLOSED) {
118 		*(int *)optval = 0;
119 		*optlen = sizeof(int);
120 
121 		return 0;
122 	}
123 
124 	errno = ENOSYS;
125 	return -1;
126 }
127 /*----------------------------------------------------------------------------*/
128 int
129 mtcp_getsockname(mctx_t mctx, int sockid, struct sockaddr *addr,
130 		 socklen_t *addrlen)
131 {
132 	mtcp_manager_t mtcp;
133 	socket_map_t socket;
134 
135 	mtcp = GetMTCPManager(mctx);
136 	if (!mtcp) {
137 		return -1;
138 	}
139 
140 	if (sockid < 0 || sockid >= g_config.mos->max_concurrency) {
141 		TRACE_API("Socket id %d out of range.\n", sockid);
142 		errno = EBADF;
143 		return -1;
144 	}
145 
146 	socket = &mtcp->smap[sockid];
147 	if (socket->socktype == MOS_SOCK_UNUSED) {
148 		TRACE_API("Invalid socket id: %d\n", sockid);
149 		errno = EBADF;
150 		return -1;
151 	}
152 
153 	if (*addrlen <= 0) {
154 		TRACE_API("Invalid addrlen: %d\n", *addrlen);
155 		errno = EINVAL;
156 		return -1;
157 	}
158 
159 	if (socket->socktype != MOS_SOCK_STREAM_LISTEN &&
160 	    socket->socktype != MOS_SOCK_STREAM) {
161 		TRACE_API("Invalid socket id: %d\n", sockid);
162 		errno = ENOTSOCK;
163 		return -1;
164 	}
165 
166 	*(struct sockaddr_in *)addr = socket->saddr;
167         *addrlen = sizeof(socket->saddr);
168 
169 	return 0;
170 }
171 /*----------------------------------------------------------------------------*/
172 int
173 mtcp_getsockopt(mctx_t mctx, int sockid, int level,
174 		int optname, void *optval, socklen_t *optlen)
175 {
176 	mtcp_manager_t mtcp;
177 	socket_map_t socket;
178 
179 	mtcp = GetMTCPManager(mctx);
180 	if (!mtcp) {
181 		errno = EACCES;
182 		return -1;
183 	}
184 
185 	if (sockid < 0 || sockid >= g_config.mos->max_concurrency) {
186 		TRACE_API("Socket id %d out of range.\n", sockid);
187 		errno = EBADF;
188 		return -1;
189 	}
190 
191 	switch (level) {
192 	case SOL_SOCKET:
193 		socket = &mtcp->smap[sockid];
194 		if (socket->socktype == MOS_SOCK_UNUSED) {
195 			TRACE_API("Invalid socket id: %d\n", sockid);
196 			errno = EBADF;
197 			return -1;
198 		}
199 
200 		if (socket->socktype != MOS_SOCK_STREAM_LISTEN &&
201 		    socket->socktype != MOS_SOCK_STREAM) {
202 			TRACE_API("Invalid socket id: %d\n", sockid);
203 			errno = ENOTSOCK;
204 			return -1;
205 		}
206 
207 		if (optname == SO_ERROR) {
208 			if (socket->socktype == MOS_SOCK_STREAM) {
209 				return GetSocketError(socket, optval, optlen);
210 			}
211 		}
212 		break;
213 	case SOL_MONSOCKET:
214 		/* check if the calling thread is in MOS context */
215 		if (mtcp->ctx->thread != pthread_self()) {
216 			errno = EPERM;
217 			return -1;
218 		}
219 		/*
220 		 * All options will only work for active
221 		 * monitor stream sockets
222 		 */
223 		socket = &mtcp->msmap[sockid];
224 		if (socket->socktype != MOS_SOCK_MONITOR_STREAM_ACTIVE) {
225 			TRACE_API("Invalid socket id: %d\n", sockid);
226 			errno = ENOTSOCK;
227 			return -1;
228 		}
229 
230 		switch (optname) {
231 		case MOS_FRAGINFO_CLIBUF:
232 			return GetFragInfo(socket, MOS_SIDE_CLI, optval, optlen);
233 		case MOS_FRAGINFO_SVRBUF:
234 			return GetFragInfo(socket, MOS_SIDE_SVR, optval, optlen);
235 		case MOS_INFO_CLIBUF:
236 			return GetBufInfo(socket, MOS_SIDE_CLI, optval, optlen);
237 		case MOS_INFO_SVRBUF:
238 			return GetBufInfo(socket, MOS_SIDE_SVR, optval, optlen);
239 		case MOS_TCP_STATE_CLI:
240 			return GetTCPState(socket->monitor_stream->stream, MOS_SIDE_CLI,
241 							   optval, optlen);
242 		case MOS_TCP_STATE_SVR:
243 			return GetTCPState(socket->monitor_stream->stream, MOS_SIDE_SVR,
244 							   optval, optlen);
245 		case MOS_TIMESTAMP:
246 			return GetLastTimestamp(socket->monitor_stream->stream,
247 						(uint32_t *)optval,
248 						optlen);
249 		default:
250 		  TRACE_API("can't recognize the optname=%d\n", optname);
251 		  assert(0);
252 		}
253 		break;
254 	}
255 	errno = ENOSYS;
256 	return -1;
257 }
258 /*----------------------------------------------------------------------------*/
259 int
260 mtcp_setsockopt(mctx_t mctx, int sockid, int level,
261 		int optname, const void *optval, socklen_t optlen)
262 {
263 	mtcp_manager_t mtcp;
264 	socket_map_t socket;
265 	tcprb_t *rb;
266 
267 	mtcp = GetMTCPManager(mctx);
268 	if (!mtcp) {
269 		errno = EACCES;
270 		return -1;
271 	}
272 
273 	if (sockid < 0 || sockid >= g_config.mos->max_concurrency) {
274 		TRACE_API("Socket id %d out of range.\n", sockid);
275 		errno = EBADF;
276 		return -1;
277 	}
278 
279 	switch (level) {
280 	case SOL_SOCKET:
281 		socket = &mtcp->smap[sockid];
282 		if (socket->socktype == MOS_SOCK_UNUSED) {
283 			TRACE_API("Invalid socket id: %d\n", sockid);
284 			errno = EBADF;
285 			return -1;
286 		}
287 
288 		if (socket->socktype != MOS_SOCK_STREAM_LISTEN &&
289 		    socket->socktype != MOS_SOCK_STREAM) {
290 			TRACE_API("Invalid socket id: %d\n", sockid);
291 			errno = ENOTSOCK;
292 			return -1;
293 		}
294 		break;
295 	case SOL_MONSOCKET:
296 		socket = &mtcp->msmap[sockid];
297 		/*
298 		 * checking of calling thread to be in MOS context is
299 		 * disabled since both optnames can be called from
300 		 * `application' context (on passive sockets)
301 		 */
302 		/*
303 		 * if (mtcp->ctx->thread != pthread_self())
304 		 * return -1;
305 		 */
306 
307 		switch (optname) {
308 		case MOS_CLIOVERLAP:
309 			rb = (socket->monitor_stream->stream->side == MOS_SIDE_CLI) ?
310 				socket->monitor_stream->stream->rcvvar->rcvbuf :
311 			socket->monitor_stream->stream->pair_stream->rcvvar->rcvbuf;
312 			if (rb == NULL) {
313 				errno = EFAULT;
314 				return -1;
315 			}
316 			if (tcprb_setpolicy(rb, *(uint8_t *)optval) < 0) {
317 				errno = EINVAL;
318 				return -1;
319 			} else
320 				return 0;
321 			break;
322 		case MOS_SVROVERLAP:
323 			rb = (socket->monitor_stream->stream->side == MOS_SIDE_SVR) ?
324 				socket->monitor_stream->stream->rcvvar->rcvbuf :
325 			socket->monitor_stream->stream->pair_stream->rcvvar->rcvbuf;
326 			if (rb == NULL) {
327 				errno = EFAULT;
328 				return -1;
329 			}
330 			if (tcprb_setpolicy(rb, *(uint8_t *)optval) < 0) {
331 				errno = EINVAL;
332 				return -1;
333 			} else
334 				return 0;
335 			break;
336 		case MOS_CLIBUF:
337 #if 0
338 			if (socket->socktype != MOS_SOCK_MONITOR_STREAM_ACTIVE) {
339 				errno = EBADF;
340 				return -1;
341 			}
342 #endif
343 #ifdef DISABLE_DYN_RESIZE
344 			if (*(int *)optval != 0)
345 				return -1;
346 			if (socket->socktype == MOS_SOCK_MONITOR_STREAM_ACTIVE) {
347 				rb = (socket->monitor_stream->stream->side == MOS_SIDE_CLI) ?
348 					socket->monitor_stream->stream->rcvvar->rcvbuf :
349 					socket->monitor_stream->stream->pair_stream->rcvvar->rcvbuf;
350 				if (rb) {
351 					tcprb_resize_meta(rb, 0);
352 					tcprb_resize(rb, 0);
353 				}
354 			}
355 			return DisableBuf(socket, MOS_SIDE_CLI);
356 #else
357 			rb = (socket->monitor_stream->stream->side == MOS_SIDE_CLI) ?
358 				socket->monitor_stream->stream->rcvvar->rcvbuf :
359 				socket->monitor_stream->stream->pair_stream->rcvvar->rcvbuf;
360 			if (tcprb_resize_meta(rb, *(int *)optval) < 0)
361 				return -1;
362 			return tcprb_resize(rb,
363 					(((int)rb->metalen - 1) / UNITBUFSIZE + 1) * UNITBUFSIZE);
364 #endif
365 		case MOS_SVRBUF:
366 #if 0
367 			if (socket->socktype != MOS_SOCK_MONITOR_STREAM_ACTIVE) {
368 				errno = EBADF;
369 				return -1;
370 			}
371 #endif
372 #ifdef DISABLE_DYN_RESIZE
373 			if (*(int *)optval != 0)
374 				return -1;
375 			if (socket->socktype == MOS_SOCK_MONITOR_STREAM_ACTIVE) {
376 				rb = (socket->monitor_stream->stream->side == MOS_SIDE_SVR) ?
377 					socket->monitor_stream->stream->rcvvar->rcvbuf :
378 					socket->monitor_stream->stream->pair_stream->rcvvar->rcvbuf;
379 				if (rb) {
380 					tcprb_resize_meta(rb, 0);
381 					tcprb_resize(rb, 0);
382 				}
383 			}
384 			return DisableBuf(socket, MOS_SIDE_SVR);
385 #else
386 			rb = (socket->monitor_stream->stream->side == MOS_SIDE_SVR) ?
387 				socket->monitor_stream->stream->rcvvar->rcvbuf :
388 				socket->monitor_stream->stream->pair_stream->rcvvar->rcvbuf;
389 			if (tcprb_resize_meta(rb, *(int *)optval) < 0)
390 				return -1;
391 			return tcprb_resize(rb,
392 					(((int)rb->metalen - 1) / UNITBUFSIZE + 1) * UNITBUFSIZE);
393 #endif
394 		case MOS_FRAG_CLIBUF:
395 #if 0
396 			if (socket->socktype != MOS_SOCK_MONITOR_STREAM_ACTIVE) {
397 				errno = EBADF;
398 				return -1;
399 			}
400 #endif
401 #ifdef DISABLE_DYN_RESIZE
402 			if (*(int *)optval != 0)
403 				return -1;
404 			if (socket->socktype == MOS_SOCK_MONITOR_STREAM_ACTIVE) {
405 				rb = (socket->monitor_stream->stream->side == MOS_SIDE_CLI) ?
406 					socket->monitor_stream->stream->rcvvar->rcvbuf :
407 					socket->monitor_stream->stream->pair_stream->rcvvar->rcvbuf;
408 				if (rb)
409 					tcprb_resize(rb, 0);
410 			}
411 			return 0;
412 #else
413 			rb = (socket->monitor_stream->stream->side == MOS_SIDE_CLI) ?
414 				socket->monitor_stream->stream->rcvvar->rcvbuf :
415 				socket->monitor_stream->stream->pair_stream->rcvvar->rcvbuf;
416 			if (rb->len == 0)
417 				return tcprb_resize_meta(rb, *(int *)optval);
418 			else
419 				return -1;
420 #endif
421 		case MOS_FRAG_SVRBUF:
422 #if 0
423 			if (socket->socktype != MOS_SOCK_MONITOR_STREAM_ACTIVE) {
424 				errno = EBADF;
425 				return -1;
426 			}
427 #endif
428 #ifdef DISABLE_DYN_RESIZE
429 			if (*(int *)optval != 0)
430 				return -1;
431 			if (socket->socktype == MOS_SOCK_MONITOR_STREAM_ACTIVE) {
432 				rb = (socket->monitor_stream->stream->side == MOS_SIDE_SVR) ?
433 					socket->monitor_stream->stream->rcvvar->rcvbuf :
434 					socket->monitor_stream->stream->pair_stream->rcvvar->rcvbuf;
435 				if (rb)
436 					tcprb_resize(rb, 0);
437 			}
438 			return 0;
439 #else
440 			rb = (socket->monitor_stream->stream->side == MOS_SIDE_SVR) ?
441 				socket->monitor_stream->stream->rcvvar->rcvbuf :
442 				socket->monitor_stream->stream->pair_stream->rcvvar->rcvbuf;
443 			if (rb->len == 0)
444 				return tcprb_resize_meta(rb, *(int *)optval);
445 			else
446 				return -1;
447 #endif
448 		case MOS_MONLEVEL:
449 #ifdef OLD_API
450 			assert(*(int *)optval == MOS_NO_CLIBUF ||
451 			       *(int *)optval == MOS_NO_SVRBUF);
452 			return DisableBuf(socket,
453 					  (*(int *)optval == MOS_NO_CLIBUF) ?
454 					  MOS_SIDE_CLI : MOS_SIDE_SVR);
455 #endif
456 		case MOS_SEQ_REMAP:
457 			return TcpSeqChange(socket,
458 					    (uint32_t)((seq_remap_info *)optval)->seq_off,
459 					    ((seq_remap_info *)optval)->side,
460 					    mtcp->pctx->p.seq);
461 		case MOS_STOP_MON:
462 			return mtcp_cb_stop(mctx, sockid, *(int *)optval);
463 		default:
464 			TRACE_API("invalid optname=%d\n", optname);
465 			assert(0);
466 		}
467 		break;
468 	}
469 
470 	errno = ENOSYS;
471 	return -1;
472 }
473 /*----------------------------------------------------------------------------*/
474 int
475 mtcp_setsock_nonblock(mctx_t mctx, int sockid)
476 {
477 	mtcp_manager_t mtcp;
478 
479 	mtcp = GetMTCPManager(mctx);
480 	if (!mtcp) {
481 		errno = EACCES;
482 		return -1;
483 	}
484 
485 	if (sockid < 0 || sockid >= g_config.mos->max_concurrency) {
486 		TRACE_API("Socket id %d out of range.\n", sockid);
487 		errno = EBADF;
488 		return -1;
489 	}
490 
491 	if (mtcp->smap[sockid].socktype == MOS_SOCK_UNUSED) {
492 		TRACE_API("Invalid socket id: %d\n", sockid);
493 		errno = EBADF;
494 		return -1;
495 	}
496 
497 	mtcp->smap[sockid].opts |= MTCP_NONBLOCK;
498 
499 	return 0;
500 }
501 /*----------------------------------------------------------------------------*/
502 int
503 mtcp_ioctl(mctx_t mctx, int sockid, int request, void *argp)
504 {
505 	mtcp_manager_t mtcp;
506 	socket_map_t socket;
507 
508 	mtcp = GetMTCPManager(mctx);
509 	if (!mtcp) {
510 		errno = EACCES;
511 		return -1;
512 	}
513 
514 	if (sockid < 0 || sockid >= g_config.mos->max_concurrency) {
515 		TRACE_API("Socket id %d out of range.\n", sockid);
516 		errno = EBADF;
517 		return -1;
518 	}
519 
520 	/* only support stream socket */
521 	socket = &mtcp->smap[sockid];
522 
523 	if (socket->socktype != MOS_SOCK_STREAM_LISTEN &&
524 		socket->socktype != MOS_SOCK_STREAM) {
525 		TRACE_API("Invalid socket id: %d\n", sockid);
526 		errno = EBADF;
527 		return -1;
528 	}
529 
530 	if (!argp) {
531 		errno = EFAULT;
532 		return -1;
533 	}
534 
535 	if (request == FIONREAD) {
536 		tcp_stream *cur_stream;
537 		tcprb_t *rbuf;
538 
539 		cur_stream = socket->stream;
540 		if (!cur_stream) {
541 			errno = EBADF;
542 			return -1;
543 		}
544 
545 		rbuf = cur_stream->rcvvar->rcvbuf;
546 		*(int *)argp = (rbuf) ? tcprb_cflen(rbuf) : 0;
547 
548 	} else if (request == FIONBIO) {
549 		/*
550 		 * sockets can only be set to blocking/non-blocking
551 		 * modes during initialization
552 		 */
553 		if ((*(int *)argp))
554 			mtcp->smap[sockid].opts |= MTCP_NONBLOCK;
555 		else
556 			mtcp->smap[sockid].opts &= ~MTCP_NONBLOCK;
557 	} else {
558 		errno = EINVAL;
559 		return -1;
560 	}
561 
562 	return 0;
563 }
564 /*----------------------------------------------------------------------------*/
565 static int
566 mtcp_monitor(mctx_t mctx, socket_map_t sock)
567 {
568 	mtcp_manager_t mtcp;
569 	struct mon_listener *monitor;
570 	int sockid = sock->id;
571 
572 	mtcp = GetMTCPManager(mctx);
573 	if (!mtcp) {
574 		errno = EACCES;
575 		return -1;
576 	}
577 
578 	if (sockid < 0 || sockid >= g_config.mos->max_concurrency) {
579 		TRACE_API("Socket id %d out of range.\n", sockid);
580 		errno = EBADF;
581 		return -1;
582 	}
583 
584 	if (mtcp->msmap[sockid].socktype == MOS_SOCK_UNUSED) {
585 		TRACE_API("Invalid socket id: %d\n", sockid);
586 		errno = EBADF;
587 		return -1;
588 	}
589 
590 	if (!(mtcp->msmap[sockid].socktype == MOS_SOCK_MONITOR_STREAM ||
591 	      mtcp->msmap[sockid].socktype == MOS_SOCK_MONITOR_RAW)) {
592 		TRACE_API("Not a monitor socket. id: %d\n", sockid);
593 		errno = ENOTSOCK;
594 		return -1;
595 	}
596 
597 	monitor = (struct mon_listener *)calloc(1, sizeof(struct mon_listener));
598 	if (!monitor) {
599 		/* errno set from the malloc() */
600 		errno = ENOMEM;
601 		return -1;
602 	}
603 
604 	/* create a monitor-specific event queue */
605 	monitor->eq = CreateEventQueue(g_config.mos->max_concurrency);
606 	if (!monitor->eq) {
607 		TRACE_API("Can't create event queue (concurrency: %d) for "
608 			  "monitor read event registrations!\n",
609 			  g_config.mos->max_concurrency);
610 		free(monitor);
611 		errno = ENOMEM;
612 		return -1;
613 	}
614 
615 	/* set monitor-related basic parameters */
616 #ifndef NEWEV
617 	monitor->ude_id = UDE_OFFSET;
618 #endif
619 	monitor->socket = sock;
620 	monitor->client_buf_mgmt = monitor->server_buf_mgmt = BUFMGMT_FULL;
621 
622 	/* perform both sides monitoring by default */
623 	monitor->client_mon = monitor->server_mon = 1;
624 
625 	/* add monitor socket to the monitor list */
626 	TAILQ_INSERT_TAIL(&mtcp->monitors, monitor, link);
627 
628 	mtcp->msmap[sockid].monitor_listener = monitor;
629 
630 	return 0;
631 }
632 /*----------------------------------------------------------------------------*/
633 int
634 mtcp_socket(mctx_t mctx, int domain, int type, int protocol)
635 {
636 	mtcp_manager_t mtcp;
637 	socket_map_t socket;
638 
639 	mtcp = GetMTCPManager(mctx);
640 	if (!mtcp) {
641 		errno = EACCES;
642 		return -1;
643 	}
644 
645 	if (domain != AF_INET) {
646 		errno = EAFNOSUPPORT;
647 		return -1;
648 	}
649 
650 	if (type == SOCK_STREAM) {
651 		type = MOS_SOCK_STREAM;
652 	} else if (type == MOS_SOCK_MONITOR_STREAM ||
653 		   type == MOS_SOCK_MONITOR_RAW) {
654 		/* do nothing for the time being */
655 	} else {
656 		/* Not supported type */
657 		errno = EINVAL;
658 		return -1;
659 	}
660 
661 	socket = AllocateSocket(mctx, type);
662 	if (!socket) {
663 		errno = ENFILE;
664 		return -1;
665 	}
666 
667 	if (type == MOS_SOCK_MONITOR_STREAM ||
668 	    type == MOS_SOCK_MONITOR_RAW) {
669 		mtcp_manager_t mtcp = GetMTCPManager(mctx);
670 		if (!mtcp) {
671 			errno = EACCES;
672 			return -1;
673 		}
674 		mtcp_monitor(mctx, socket);
675 #ifdef NEWEV
676 		socket->monitor_listener->stree_dontcare = NULL;
677 		socket->monitor_listener->stree_pre_rcv = NULL;
678 		socket->monitor_listener->stree_post_snd = NULL;
679 #else
680 		InitEvB(mtcp, &socket->monitor_listener->dontcare_evb);
681 		InitEvB(mtcp, &socket->monitor_listener->pre_tcp_evb);
682 		InitEvB(mtcp, &socket->monitor_listener->post_tcp_evb);
683 #endif
684 	}
685 
686 	return socket->id;
687 }
688 /*----------------------------------------------------------------------------*/
689 int
690 mtcp_bind(mctx_t mctx, int sockid,
691 		const struct sockaddr *addr, socklen_t addrlen)
692 {
693 	mtcp_manager_t mtcp;
694 	struct sockaddr_in *addr_in;
695 
696 	mtcp = GetMTCPManager(mctx);
697 	if (!mtcp) {
698 		errno = EACCES;
699 		return -1;
700 	}
701 
702 	if (sockid < 0 || sockid >= g_config.mos->max_concurrency) {
703 		TRACE_API("Socket id %d out of range.\n", sockid);
704 		errno = EBADF;
705 		return -1;
706 	}
707 
708 	if (mtcp->smap[sockid].socktype == MOS_SOCK_UNUSED) {
709 		TRACE_API("Invalid socket id: %d\n", sockid);
710 		errno = EBADF;
711 		return -1;
712 	}
713 
714 	if (mtcp->smap[sockid].socktype != MOS_SOCK_STREAM &&
715 			mtcp->smap[sockid].socktype != MOS_SOCK_STREAM_LISTEN) {
716 		TRACE_API("Not a stream socket id: %d\n", sockid);
717 		errno = ENOTSOCK;
718 		return -1;
719 	}
720 
721 	if (!addr) {
722 		TRACE_API("Socket %d: empty address!\n", sockid);
723 		errno = EINVAL;
724 		return -1;
725 	}
726 
727 	if (mtcp->smap[sockid].opts & MTCP_ADDR_BIND) {
728 		TRACE_API("Socket %d: adress already bind for this socket.\n", sockid);
729 		errno = EINVAL;
730 		return -1;
731 	}
732 
733 	/* we only allow bind() for AF_INET address */
734 	if (addr->sa_family != AF_INET || addrlen < sizeof(struct sockaddr_in)) {
735 		TRACE_API("Socket %d: invalid argument!\n", sockid);
736 		errno = EINVAL;
737 		return -1;
738 	}
739 
740 	if (mtcp->listener) {
741 		TRACE_API("Address already bound!\n");
742 		errno = EINVAL;
743 		return -1;
744 	}
745 	addr_in = (struct sockaddr_in *)addr;
746 	mtcp->smap[sockid].saddr = *addr_in;
747 	mtcp->smap[sockid].opts |= MTCP_ADDR_BIND;
748 
749 	return 0;
750 }
751 /*----------------------------------------------------------------------------*/
752 int
753 mtcp_listen(mctx_t mctx, int sockid, int backlog)
754 {
755 	mtcp_manager_t mtcp;
756 	struct tcp_listener *listener;
757 
758 	mtcp = GetMTCPManager(mctx);
759 	if (!mtcp) {
760 		errno = EACCES;
761 		return -1;
762 	}
763 
764 	if (sockid < 0 || sockid >= g_config.mos->max_concurrency) {
765 		TRACE_API("Socket id %d out of range.\n", sockid);
766 		errno = EBADF;
767 		return -1;
768 	}
769 
770 	if (mtcp->smap[sockid].socktype == MOS_SOCK_UNUSED) {
771 		TRACE_API("Invalid socket id: %d\n", sockid);
772 		errno = EBADF;
773 		return -1;
774 	}
775 
776 	if (mtcp->smap[sockid].socktype == MOS_SOCK_STREAM) {
777 		mtcp->smap[sockid].socktype = MOS_SOCK_STREAM_LISTEN;
778 	}
779 
780 	if (mtcp->smap[sockid].socktype != MOS_SOCK_STREAM_LISTEN) {
781 		TRACE_API("Not a listening socket. id: %d\n", sockid);
782 		errno = ENOTSOCK;
783 		return -1;
784 	}
785 
786 	if (backlog <= 0 || backlog > g_config.mos->max_concurrency) {
787 		errno = EINVAL;
788 		return -1;
789 	}
790 
791 	listener = (struct tcp_listener *)calloc(1, sizeof(struct tcp_listener));
792 	if (!listener) {
793 		/* errno set from the malloc() */
794 		errno = ENOMEM;
795 		return -1;
796 	}
797 
798 	listener->sockid = sockid;
799 	listener->backlog = backlog;
800 	listener->socket = &mtcp->smap[sockid];
801 
802 	if (pthread_cond_init(&listener->accept_cond, NULL)) {
803 		perror("pthread_cond_init of ctx->accept_cond\n");
804 		/* errno set by pthread_cond_init() */
805 		return -1;
806 	}
807 	if (pthread_mutex_init(&listener->accept_lock, NULL)) {
808 		perror("pthread_mutex_init of ctx->accept_lock\n");
809 		/* errno set by pthread_mutex_init() */
810 		return -1;
811 	}
812 
813 	listener->acceptq = CreateStreamQueue(backlog);
814 	if (!listener->acceptq) {
815 		errno = ENOMEM;
816 		return -1;
817 	}
818 
819 	mtcp->smap[sockid].listener = listener;
820 	mtcp->listener = listener;
821 
822 	return 0;
823 }
824 /*----------------------------------------------------------------------------*/
825 int
826 mtcp_accept(mctx_t mctx, int sockid, struct sockaddr *addr, socklen_t *addrlen)
827 {
828 	mtcp_manager_t mtcp;
829 	struct tcp_listener *listener;
830 	socket_map_t socket;
831 	tcp_stream *accepted = NULL;
832 
833 	mtcp = GetMTCPManager(mctx);
834 	if (!mtcp) {
835 		errno = EACCES;
836 		return -1;
837 	}
838 
839 	if (sockid < 0 || sockid >= g_config.mos->max_concurrency) {
840 		TRACE_API("Socket id %d out of range.\n", sockid);
841 		errno = EBADF;
842 		return -1;
843 	}
844 
845 	/* requires listening socket */
846 	if (mtcp->smap[sockid].socktype != MOS_SOCK_STREAM_LISTEN) {
847 		errno = EINVAL;
848 		return -1;
849 	}
850 
851 	listener = mtcp->smap[sockid].listener;
852 
853 	/* dequeue from the acceptq without lock first */
854 	/* if nothing there, acquire lock and cond_wait */
855 	accepted = StreamDequeue(listener->acceptq);
856 	if (!accepted) {
857 		if (listener->socket->opts & MTCP_NONBLOCK) {
858 			errno = EAGAIN;
859 			return -1;
860 
861 		} else {
862 			pthread_mutex_lock(&listener->accept_lock);
863 			while ((accepted = StreamDequeue(listener->acceptq)) == NULL) {
864 				pthread_cond_wait(&listener->accept_cond, &listener->accept_lock);
865 
866 				if (mtcp->ctx->done || mtcp->ctx->exit) {
867 					pthread_mutex_unlock(&listener->accept_lock);
868 					errno = EINTR;
869 					return -1;
870 				}
871 			}
872 			pthread_mutex_unlock(&listener->accept_lock);
873 		}
874 	}
875 
876 	if (!accepted) {
877 		TRACE_ERROR("[NEVER HAPPEN] Empty accept queue!\n");
878 	}
879 
880 	if (!accepted->socket) {
881 		socket = AllocateSocket(mctx, MOS_SOCK_STREAM);
882 		if (!socket) {
883 			TRACE_ERROR("Failed to create new socket!\n");
884 			/* TODO: destroy the stream */
885 			errno = ENFILE;
886 			return -1;
887 		}
888 		socket->stream = accepted;
889 		accepted->socket = socket;
890 
891 		/* set socket addr parameters */
892 		socket->saddr.sin_family = AF_INET;
893 		socket->saddr.sin_port = accepted->dport;
894 		socket->saddr.sin_addr.s_addr = accepted->daddr;
895 
896 		/* if monitor is enabled, complete the socket assignment */
897 		if (socket->stream->pair_stream != NULL)
898 			socket->stream->pair_stream->socket = socket;
899 	}
900 
901 	if (!(listener->socket->epoll & MOS_EPOLLET) &&
902 	    !StreamQueueIsEmpty(listener->acceptq))
903 		AddEpollEvent(mtcp->ep,
904 			      USR_SHADOW_EVENT_QUEUE,
905 			      listener->socket, MOS_EPOLLIN);
906 
907 	TRACE_API("Stream %d accepted.\n", accepted->id);
908 
909 	if (addr && addrlen) {
910 		struct sockaddr_in *addr_in = (struct sockaddr_in *)addr;
911 		addr_in->sin_family = AF_INET;
912 		addr_in->sin_port = accepted->dport;
913 		addr_in->sin_addr.s_addr = accepted->daddr;
914 		*addrlen = sizeof(struct sockaddr_in);
915 	}
916 
917 	return accepted->socket->id;
918 }
919 /*----------------------------------------------------------------------------*/
920 int
921 mtcp_init_rss(mctx_t mctx, in_addr_t saddr_base, int num_addr,
922 		in_addr_t daddr, in_addr_t dport)
923 {
924 	mtcp_manager_t mtcp;
925 	addr_pool_t ap;
926 
927 	mtcp = GetMTCPManager(mctx);
928 	if (!mtcp) {
929 		errno = EACCES;
930 		return -1;
931 	}
932 
933 	if (saddr_base == INADDR_ANY) {
934 		int nif_out;
935 
936 		/* for the INADDR_ANY, find the output interface for the destination
937 		   and set the saddr_base as the ip address of the output interface */
938 		nif_out = GetOutputInterface(daddr);
939 		saddr_base = g_config.mos->netdev_table->ent[nif_out]->ip_addr;
940 	}
941 
942 	ap = CreateAddressPoolPerCore(mctx->cpu, num_cpus,
943 			saddr_base, num_addr, daddr, dport);
944 	if (!ap) {
945 		errno = ENOMEM;
946 		return -1;
947 	}
948 
949 	mtcp->ap = ap;
950 
951 	return 0;
952 }
953 /*----------------------------------------------------------------------------*/
954 int
955 eval_bpf_5tuple(struct sfbpf_program fcode,
956 				in_addr_t saddr, in_port_t sport,
957 				in_addr_t daddr, in_port_t dport) {
958 	uint8_t buf[TOTAL_TCP_HEADER_LEN];
959 	struct ethhdr *ethh;
960 	struct iphdr *iph;
961 	struct tcphdr *tcph;
962 
963 	ethh = (struct ethhdr *)buf;
964 	ethh->h_proto = htons(ETH_P_IP);
965 	iph = (struct iphdr *)(ethh + 1);
966 	iph->ihl = IP_HEADER_LEN >> 2;
967 	iph->version = 4;
968 	iph->tos = 0;
969 	iph->tot_len = htons(IP_HEADER_LEN + TCP_HEADER_LEN);
970 	iph->id = htons(0);
971 	iph->protocol = IPPROTO_TCP;
972 	iph->saddr = saddr;
973 	iph->daddr = daddr;
974 	iph->check = 0;
975 	tcph = (struct tcphdr *)(iph + 1);
976 	tcph->source = sport;
977 	tcph->dest = dport;
978 
979 	return EVAL_BPFFILTER(fcode, (uint8_t *)iph - sizeof(struct ethhdr),
980 						 TOTAL_TCP_HEADER_LEN);
981 }
982 /*----------------------------------------------------------------------------*/
983 int
984 mtcp_connect(mctx_t mctx, int sockid,
985 		const struct sockaddr *addr, socklen_t addrlen)
986 {
987 	mtcp_manager_t mtcp;
988 	socket_map_t socket;
989 	tcp_stream *cur_stream;
990 	struct sockaddr_in *addr_in;
991 	in_addr_t dip;
992 	in_port_t dport;
993 	int is_dyn_bound = FALSE;
994 	int ret;
995 	int cnt_match = 0;
996 	struct mon_listener *walk;
997 	struct sfbpf_program fcode;
998 
999 	cur_stream = NULL;
1000 	mtcp = GetMTCPManager(mctx);
1001 	if (!mtcp) {
1002 		errno = EACCES;
1003 		return -1;
1004 	}
1005 
1006 	if (sockid < 0 || sockid >= g_config.mos->max_concurrency) {
1007 		TRACE_API("Socket id %d out of range.\n", sockid);
1008 		errno = EBADF;
1009 		return -1;
1010 	}
1011 
1012 	if (mtcp->smap[sockid].socktype == MOS_SOCK_UNUSED) {
1013 		TRACE_API("Invalid socket id: %d\n", sockid);
1014 		errno = EBADF;
1015 		return -1;
1016 	}
1017 
1018 	if (mtcp->smap[sockid].socktype != MOS_SOCK_STREAM) {
1019 		TRACE_API("Not an end socket. id: %d\n", sockid);
1020 		errno = ENOTSOCK;
1021 		return -1;
1022 	}
1023 
1024 	if (!addr) {
1025 		TRACE_API("Socket %d: empty address!\n", sockid);
1026 		errno = EFAULT;
1027 		return -1;
1028 	}
1029 
1030 	/* we only allow bind() for AF_INET address */
1031 	if (addr->sa_family != AF_INET || addrlen < sizeof(struct sockaddr_in)) {
1032 		TRACE_API("Socket %d: invalid argument!\n", sockid);
1033 		errno = EAFNOSUPPORT;
1034 		return -1;
1035 	}
1036 
1037 	socket = &mtcp->smap[sockid];
1038 	if (socket->stream) {
1039 		TRACE_API("Socket %d: stream already exist!\n", sockid);
1040 		if (socket->stream->state >= TCP_ST_ESTABLISHED) {
1041 			errno = EISCONN;
1042 		} else {
1043 			errno = EALREADY;
1044 		}
1045 		return -1;
1046 	}
1047 
1048 	addr_in = (struct sockaddr_in *)addr;
1049 	dip = addr_in->sin_addr.s_addr;
1050 	dport = addr_in->sin_port;
1051 
1052 	/* address binding */
1053 	if (socket->opts & MTCP_ADDR_BIND &&
1054 	    socket->saddr.sin_port != INPORT_ANY &&
1055 	    socket->saddr.sin_addr.s_addr != INADDR_ANY) {
1056 		int rss_core;
1057 
1058 		rss_core = GetRSSCPUCore(socket->saddr.sin_addr.s_addr, dip,
1059 				socket->saddr.sin_port, dport, num_queues);
1060 
1061 		if (rss_core != mctx->cpu) {
1062 			errno = EINVAL;
1063 			return -1;
1064 		}
1065 	} else {
1066 		if (mtcp->ap) {
1067 			ret = FetchAddress(mtcp->ap,
1068 					mctx->cpu, num_queues, addr_in, &socket->saddr);
1069 		} else {
1070 			ret = FetchAddress(ap[GetOutputInterface(dip)],
1071 					   mctx->cpu, num_queues, addr_in, &socket->saddr);
1072 		}
1073 		if (ret < 0) {
1074 			errno = EAGAIN;
1075 			return -1;
1076 		}
1077 		socket->opts |= MTCP_ADDR_BIND;
1078 		is_dyn_bound = TRUE;
1079 	}
1080 
1081 	cnt_match = 0;
1082 	if (mtcp->num_msp > 0) {
1083 		TAILQ_FOREACH(walk, &mtcp->monitors, link) {
1084 			fcode = walk->stream_syn_fcode;
1085 			if (!(ISSET_BPFFILTER(fcode) &&
1086 				  eval_bpf_5tuple(fcode, socket->saddr.sin_addr.s_addr,
1087 								  socket->saddr.sin_port,
1088 								  dip, dport) == 0)) {
1089 				walk->is_stream_syn_filter_hit = 1; // set the 'filter hit' flag to 1
1090 				cnt_match++;
1091 			}
1092 		}
1093 	}
1094 
1095 	if (mtcp->num_msp > 0 && cnt_match > 0) {
1096 		/* 150820 dhkim: XXX: embedded mode is not verified */
1097 #if 1
1098 		cur_stream = CreateClientTCPStream(mtcp, socket,
1099 						 STREAM_TYPE(MOS_SOCK_STREAM) |
1100 						 STREAM_TYPE(MOS_SOCK_MONITOR_STREAM_ACTIVE),
1101 						 socket->saddr.sin_addr.s_addr,
1102 						 socket->saddr.sin_port, dip, dport, NULL);
1103 #else
1104 		cur_stream = CreateDualTCPStream(mtcp, socket,
1105 						 STREAM_TYPE(MOS_SOCK_STREAM) |
1106 						 STREAM_TYPE(MOS_SOCK_MONITOR_STREAM_ACTIVE),
1107 						 socket->saddr.sin_addr.s_addr,
1108 						 socket->saddr.sin_port, dip, dport, NULL);
1109 #endif
1110 	}
1111 	else
1112 		cur_stream = CreateTCPStream(mtcp, socket, STREAM_TYPE(MOS_SOCK_STREAM),
1113 					     socket->saddr.sin_addr.s_addr,
1114 					     socket->saddr.sin_port, dip, dport, NULL);
1115 	if (!cur_stream) {
1116 		TRACE_ERROR("Socket %d: failed to create tcp_stream!\n", sockid);
1117 		errno = ENOMEM;
1118 		return -1;
1119 	}
1120 
1121 	if (is_dyn_bound)
1122 		cur_stream->is_bound_addr = TRUE;
1123 	cur_stream->sndvar->cwnd = 1;
1124 	cur_stream->sndvar->ssthresh = cur_stream->sndvar->mss * 10;
1125 	cur_stream->side = MOS_SIDE_CLI;
1126 	/* if monitor is enabled, update the pair stream side as well */
1127 	if (cur_stream->pair_stream) {
1128 		cur_stream->pair_stream->side = MOS_SIDE_SVR;
1129 		/*
1130 		 * if buffer management is off, then disable
1131 		 * monitoring tcp ring of server...
1132 		 * if there is even a single monitor asking for
1133 		 * buffer management, enable it (that's why the
1134 		 * need for the loop)
1135 		 */
1136 		cur_stream->pair_stream->buffer_mgmt = BUFMGMT_OFF;
1137 		struct socket_map *walk;
1138 		SOCKQ_FOREACH_START(walk, &cur_stream->msocks) {
1139 			uint8_t bm = walk->monitor_stream->monitor_listener->server_buf_mgmt;
1140 			if (bm > cur_stream->pair_stream->buffer_mgmt) {
1141 				cur_stream->pair_stream->buffer_mgmt = bm;
1142 				break;
1143 			}
1144 		} SOCKQ_FOREACH_END;
1145 	}
1146 
1147 	cur_stream->state = TCP_ST_SYN_SENT;
1148 	cur_stream->cb_events |= MOS_ON_TCP_STATE_CHANGE;
1149 
1150 	TRACE_STATE("Stream %d: TCP_ST_SYN_SENT\n", cur_stream->id);
1151 
1152 	SQ_LOCK(&mtcp->ctx->connect_lock);
1153 	ret = StreamEnqueue(mtcp->connectq, cur_stream);
1154 	SQ_UNLOCK(&mtcp->ctx->connect_lock);
1155 	mtcp->wakeup_flag = TRUE;
1156 	if (ret < 0) {
1157 		TRACE_ERROR("Socket %d: failed to enqueue to conenct queue!\n", sockid);
1158 		SQ_LOCK(&mtcp->ctx->destroyq_lock);
1159 		StreamEnqueue(mtcp->destroyq, cur_stream);
1160 		SQ_UNLOCK(&mtcp->ctx->destroyq_lock);
1161 		errno = EAGAIN;
1162 		return -1;
1163 	}
1164 
1165 	/* if nonblocking socket, return EINPROGRESS */
1166 	if (socket->opts & MTCP_NONBLOCK) {
1167 		errno = EINPROGRESS;
1168 		return -1;
1169 
1170 	} else {
1171 		while (1) {
1172 			if (!cur_stream) {
1173 				TRACE_ERROR("STREAM DESTROYED\n");
1174 				errno = ETIMEDOUT;
1175 				return -1;
1176 			}
1177 			if (cur_stream->state > TCP_ST_ESTABLISHED) {
1178 				TRACE_ERROR("Socket %d: weird state %s\n",
1179 						sockid, TCPStateToString(cur_stream));
1180 				// TODO: how to handle this?
1181 				errno = ENOSYS;
1182 				return -1;
1183 			}
1184 
1185 			if (cur_stream->state == TCP_ST_ESTABLISHED) {
1186 				break;
1187 			}
1188 			usleep(1000);
1189 		}
1190 	}
1191 
1192 	return 0;
1193 }
1194 /*----------------------------------------------------------------------------*/
1195 static inline int
1196 CloseStreamSocket(mctx_t mctx, int sockid)
1197 {
1198 	mtcp_manager_t mtcp;
1199 	tcp_stream *cur_stream;
1200 	int ret;
1201 
1202 	mtcp = GetMTCPManager(mctx);
1203 	if (!mtcp) {
1204 		errno = EACCES;
1205 		return -1;
1206 	}
1207 
1208 	cur_stream = mtcp->smap[sockid].stream;
1209 	if (!cur_stream) {
1210 		TRACE_API("Socket %d: stream does not exist.\n", sockid);
1211 		errno = ENOTCONN;
1212 		return -1;
1213 	}
1214 
1215 	if (cur_stream->closed) {
1216 		TRACE_API("Socket %d (Stream %u): already closed stream\n",
1217 				sockid, cur_stream->id);
1218 		return 0;
1219 	}
1220 	cur_stream->closed = TRUE;
1221 
1222 	TRACE_API("Stream %d: closing the stream.\n", cur_stream->id);
1223 
1224 	/* 141029 dhkim: Check this! */
1225 	cur_stream->socket = NULL;
1226 
1227 	if (cur_stream->state == TCP_ST_CLOSED_RSVD) {
1228 		TRACE_API("Stream %d at TCP_ST_CLOSED_RSVD. destroying the stream.\n",
1229 				cur_stream->id);
1230 		SQ_LOCK(&mtcp->ctx->destroyq_lock);
1231 		StreamEnqueue(mtcp->destroyq, cur_stream);
1232 		mtcp->wakeup_flag = TRUE;
1233 		SQ_UNLOCK(&mtcp->ctx->destroyq_lock);
1234 		return 0;
1235 
1236 	} else if (cur_stream->state == TCP_ST_SYN_SENT) {
1237 #if 1
1238 		SQ_LOCK(&mtcp->ctx->destroyq_lock);
1239 		StreamEnqueue(mtcp->destroyq, cur_stream);
1240 		SQ_UNLOCK(&mtcp->ctx->destroyq_lock);
1241 		mtcp->wakeup_flag = TRUE;
1242 #endif
1243 		return -1;
1244 
1245 	} else if (cur_stream->state != TCP_ST_ESTABLISHED &&
1246 			cur_stream->state != TCP_ST_CLOSE_WAIT) {
1247 		TRACE_API("Stream %d at state %s\n",
1248 				cur_stream->id, TCPStateToString(cur_stream));
1249 		errno = EBADF;
1250 		return -1;
1251 	}
1252 
1253 	SQ_LOCK(&mtcp->ctx->close_lock);
1254 	cur_stream->sndvar->on_closeq = TRUE;
1255 	ret = StreamEnqueue(mtcp->closeq, cur_stream);
1256 	mtcp->wakeup_flag = TRUE;
1257 	SQ_UNLOCK(&mtcp->ctx->close_lock);
1258 
1259 	if (ret < 0) {
1260 		TRACE_ERROR("(NEVER HAPPEN) Failed to enqueue the stream to close.\n");
1261 		errno = EAGAIN;
1262 		return -1;
1263 	}
1264 
1265 	return 0;
1266 }
1267 /*----------------------------------------------------------------------------*/
1268 static inline int
1269 CloseListeningSocket(mctx_t mctx, int sockid)
1270 {
1271 	mtcp_manager_t mtcp;
1272 	struct tcp_listener *listener;
1273 
1274 	mtcp = GetMTCPManager(mctx);
1275 	if (!mtcp) {
1276 		errno = EACCES;
1277 		return -1;
1278 	}
1279 
1280 	listener = mtcp->smap[sockid].listener;
1281 	if (!listener) {
1282 		errno = EINVAL;
1283 		return -1;
1284 	}
1285 
1286 	if (listener->acceptq) {
1287 		DestroyStreamQueue(listener->acceptq);
1288 		listener->acceptq = NULL;
1289 	}
1290 
1291 	pthread_mutex_lock(&listener->accept_lock);
1292 	pthread_cond_signal(&listener->accept_cond);
1293 	pthread_mutex_unlock(&listener->accept_lock);
1294 
1295 	pthread_cond_destroy(&listener->accept_cond);
1296 	pthread_mutex_destroy(&listener->accept_lock);
1297 
1298 	free(listener);
1299 	mtcp->smap[sockid].listener = NULL;
1300 
1301 	return 0;
1302 }
1303 /*----------------------------------------------------------------------------*/
1304 int
1305 mtcp_close(mctx_t mctx, int sockid)
1306 {
1307 	mtcp_manager_t mtcp;
1308 	int ret;
1309 
1310 	mtcp = GetMTCPManager(mctx);
1311 	if (!mtcp) {
1312 		errno = EACCES;
1313 		return -1;
1314 	}
1315 
1316 	if (sockid < 0 || sockid >= g_config.mos->max_concurrency) {
1317 		TRACE_API("Socket id %d out of range.\n", sockid);
1318 		errno = EBADF;
1319 		return -1;
1320 	}
1321 
1322 	if (mtcp->smap[sockid].socktype == MOS_SOCK_UNUSED) {
1323 		TRACE_API("Invalid socket id: %d\n", sockid);
1324 		errno = EBADF;
1325 		return -1;
1326 	}
1327 
1328 	TRACE_API("Socket %d: mtcp_close called.\n", sockid);
1329 
1330 	switch (mtcp->smap[sockid].socktype) {
1331 	case MOS_SOCK_STREAM:
1332 		ret = CloseStreamSocket(mctx, sockid);
1333 		break;
1334 
1335 	case MOS_SOCK_STREAM_LISTEN:
1336 		ret = CloseListeningSocket(mctx, sockid);
1337 		break;
1338 
1339 	case MOS_SOCK_EPOLL:
1340 		ret = CloseEpollSocket(mctx, sockid);
1341 		break;
1342 
1343 	case MOS_SOCK_PIPE:
1344 		ret = PipeClose(mctx, sockid);
1345 		break;
1346 
1347 	default:
1348 		errno = EINVAL;
1349 		ret = -1;
1350 		break;
1351 	}
1352 
1353 	FreeSocket(mctx, sockid, mtcp->smap[sockid].socktype);
1354 
1355 	return ret;
1356 }
1357 /*----------------------------------------------------------------------------*/
1358 int
1359 mtcp_abort(mctx_t mctx, int sockid)
1360 {
1361 	mtcp_manager_t mtcp;
1362 	tcp_stream *cur_stream;
1363 	int ret;
1364 
1365 	mtcp = GetMTCPManager(mctx);
1366 	if (!mtcp) {
1367 		errno = EACCES;
1368 		return -1;
1369 	}
1370 
1371 	if (sockid < 0 || sockid >= g_config.mos->max_concurrency) {
1372 		TRACE_API("Socket id %d out of range.\n", sockid);
1373 		errno = EBADF;
1374 		return -1;
1375 	}
1376 
1377 	if (mtcp->smap[sockid].socktype == MOS_SOCK_UNUSED) {
1378 		TRACE_API("Invalid socket id: %d\n", sockid);
1379 		errno = EBADF;
1380 		return -1;
1381 	}
1382 
1383 	if (mtcp->smap[sockid].socktype != MOS_SOCK_STREAM) {
1384 		TRACE_API("Not an end socket. id: %d\n", sockid);
1385 		errno = ENOTSOCK;
1386 		return -1;
1387 	}
1388 
1389 	cur_stream = mtcp->smap[sockid].stream;
1390 	if (!cur_stream) {
1391 		TRACE_API("Stream %d: does not exist.\n", sockid);
1392 		errno = ENOTCONN;
1393 		return -1;
1394 	}
1395 
1396 	TRACE_API("Socket %d: mtcp_abort()\n", sockid);
1397 
1398 	FreeSocket(mctx, sockid, mtcp->smap[sockid].socktype);
1399 	cur_stream->socket = NULL;
1400 
1401 	if (cur_stream->state == TCP_ST_CLOSED_RSVD) {
1402 		TRACE_API("Stream %d: connection already reset.\n", sockid);
1403 		return ERROR;
1404 
1405 	} else if (cur_stream->state == TCP_ST_SYN_SENT) {
1406 		/* TODO: this should notify event failure to all
1407 		   previous read() or write() calls */
1408 		cur_stream->state = TCP_ST_CLOSED_RSVD;
1409 		cur_stream->close_reason = TCP_ACTIVE_CLOSE;
1410 		cur_stream->cb_events |= MOS_ON_TCP_STATE_CHANGE;
1411 		SQ_LOCK(&mtcp->ctx->destroyq_lock);
1412 		StreamEnqueue(mtcp->destroyq, cur_stream);
1413 		SQ_UNLOCK(&mtcp->ctx->destroyq_lock);
1414 		mtcp->wakeup_flag = TRUE;
1415 		return 0;
1416 
1417 	} else if (cur_stream->state == TCP_ST_CLOSING ||
1418 			cur_stream->state == TCP_ST_LAST_ACK ||
1419 			cur_stream->state == TCP_ST_TIME_WAIT) {
1420 		cur_stream->state = TCP_ST_CLOSED_RSVD;
1421 		cur_stream->close_reason = TCP_ACTIVE_CLOSE;
1422 		cur_stream->cb_events |= MOS_ON_TCP_STATE_CHANGE;
1423 		SQ_LOCK(&mtcp->ctx->destroyq_lock);
1424 		StreamEnqueue(mtcp->destroyq, cur_stream);
1425 		SQ_UNLOCK(&mtcp->ctx->destroyq_lock);
1426 		mtcp->wakeup_flag = TRUE;
1427 		return 0;
1428 	}
1429 
1430 	/* the stream structure will be destroyed after sending RST */
1431 	if (cur_stream->sndvar->on_resetq) {
1432 		TRACE_ERROR("Stream %d: calling mtcp_abort() "
1433 				"when in reset queue.\n", sockid);
1434 		errno = ECONNRESET;
1435 		return -1;
1436 	}
1437 	SQ_LOCK(&mtcp->ctx->reset_lock);
1438 	cur_stream->sndvar->on_resetq = TRUE;
1439 	ret = StreamEnqueue(mtcp->resetq, cur_stream);
1440 	SQ_UNLOCK(&mtcp->ctx->reset_lock);
1441 	mtcp->wakeup_flag = TRUE;
1442 
1443 	if (ret < 0) {
1444 		TRACE_ERROR("(NEVER HAPPEN) Failed to enqueue the stream to close.\n");
1445 		errno = EAGAIN;
1446 		return -1;
1447 	}
1448 
1449 	return 0;
1450 }
1451 /*----------------------------------------------------------------------------*/
1452 static inline int
1453 PeekForUser(mtcp_manager_t mtcp, tcp_stream *cur_stream, char *buf, int len)
1454 {
1455 	struct tcp_recv_vars *rcvvar = cur_stream->rcvvar;
1456 	int copylen;
1457 	tcprb_t *rb = rcvvar->rcvbuf;
1458 
1459 	if ((copylen = tcprb_ppeek(rb, (uint8_t *)buf, len, rb->pile)) <= 0) {
1460 		errno = EAGAIN;
1461 		return -1;
1462 	}
1463 
1464 	return copylen;
1465 }
1466 /*----------------------------------------------------------------------------*/
1467 static inline int
1468 CopyToUser(mtcp_manager_t mtcp, tcp_stream *cur_stream, char *buf, int len)
1469 {
1470 	struct tcp_recv_vars *rcvvar = cur_stream->rcvvar;
1471 	int copylen;
1472 	tcprb_t *rb = rcvvar->rcvbuf;
1473 	if ((copylen = tcprb_ppeek(rb, (uint8_t *)buf, len, rb->pile)) <= 0) {
1474 		errno = EAGAIN;
1475 		return -1;
1476 	}
1477 	tcprb_setpile(rb, rb->pile + copylen);
1478 
1479 	rcvvar->rcv_wnd = rb->len - tcprb_cflen(rb);
1480 	//printf("rcv_wnd: %d\n", rcvvar->rcv_wnd);
1481 
1482 	/* Advertise newly freed receive buffer */
1483 	if (cur_stream->need_wnd_adv) {
1484 		if (rcvvar->rcv_wnd > cur_stream->sndvar->eff_mss) {
1485 			if (!cur_stream->sndvar->on_ackq) {
1486 				SQ_LOCK(&mtcp->ctx->ackq_lock);
1487 				cur_stream->sndvar->on_ackq = TRUE;
1488 				StreamEnqueue(mtcp->ackq, cur_stream); /* this always success */
1489 				SQ_UNLOCK(&mtcp->ctx->ackq_lock);
1490 				cur_stream->need_wnd_adv = FALSE;
1491 				mtcp->wakeup_flag = TRUE;
1492 			}
1493 		}
1494 	}
1495 
1496 	return copylen;
1497 }
1498 /*----------------------------------------------------------------------------*/
1499 ssize_t
1500 mtcp_recv(mctx_t mctx, int sockid, char *buf, size_t len, int flags)
1501 {
1502 	mtcp_manager_t mtcp;
1503 	socket_map_t socket;
1504 	tcp_stream *cur_stream;
1505 	struct tcp_recv_vars *rcvvar;
1506 	int event_remaining, merged_len;
1507 	int ret;
1508 
1509 	mtcp = GetMTCPManager(mctx);
1510 	if (!mtcp) {
1511 		errno = EACCES;
1512 		return -1;
1513 	}
1514 
1515 	if (sockid < 0 || sockid >= g_config.mos->max_concurrency) {
1516 		TRACE_API("Socket id %d out of range.\n", sockid);
1517 		errno = EBADF;
1518 		return -1;
1519 	}
1520 
1521 	socket = &mtcp->smap[sockid];
1522 	if (socket->socktype == MOS_SOCK_UNUSED) {
1523 		TRACE_API("Invalid socket id: %d\n", sockid);
1524 		errno = EBADF;
1525 		return -1;
1526 	}
1527 
1528 	if (socket->socktype == MOS_SOCK_PIPE) {
1529 		return PipeRead(mctx, sockid, buf, len);
1530 	}
1531 
1532 	if (socket->socktype != MOS_SOCK_STREAM) {
1533 		TRACE_API("Not an end socket. id: %d\n", sockid);
1534 		errno = ENOTSOCK;
1535 		return -1;
1536 	}
1537 
1538 	/* stream should be in ESTABLISHED, FIN_WAIT_1, FIN_WAIT_2, CLOSE_WAIT */
1539 	cur_stream = socket->stream;
1540 	if (!cur_stream || !cur_stream->rcvvar || !cur_stream->rcvvar->rcvbuf ||
1541 	    !(cur_stream->state >= TCP_ST_ESTABLISHED &&
1542 	      cur_stream->state <= TCP_ST_CLOSE_WAIT)) {
1543 		errno = ENOTCONN;
1544 		return -1;
1545 	}
1546 
1547 	rcvvar = cur_stream->rcvvar;
1548 
1549 	merged_len = tcprb_cflen(rcvvar->rcvbuf);
1550 
1551 	/* if CLOSE_WAIT, return 0 if there is no payload */
1552 	if (cur_stream->state == TCP_ST_CLOSE_WAIT) {
1553 		if (!rcvvar->rcvbuf)
1554 			return 0;
1555 
1556 		if (merged_len == 0)
1557 			return 0;
1558 	}
1559 
1560 	/* return EAGAIN if no receive buffer */
1561 	if (socket->opts & MTCP_NONBLOCK) {
1562 		if (!rcvvar->rcvbuf || merged_len == 0) {
1563 			errno = EAGAIN;
1564 			return -1;
1565 		}
1566 	}
1567 
1568 	SBUF_LOCK(&rcvvar->read_lock);
1569 
1570 	switch (flags) {
1571 	case 0:
1572 		ret = CopyToUser(mtcp, cur_stream, buf, len);
1573 		break;
1574 	case MSG_PEEK:
1575 		ret = PeekForUser(mtcp, cur_stream, buf, len);
1576 		break;
1577 	default:
1578 		SBUF_UNLOCK(&rcvvar->read_lock);
1579 		ret = -1;
1580 		errno = EINVAL;
1581 		return ret;
1582 	}
1583 
1584 	merged_len = tcprb_cflen(rcvvar->rcvbuf);
1585 	event_remaining = FALSE;
1586 	/* if there are remaining payload, generate EPOLLIN */
1587 	/* (may due to insufficient user buffer) */
1588 	if (socket->epoll & MOS_EPOLLIN) {
1589 		if (!(socket->epoll & MOS_EPOLLET) && merged_len > 0) {
1590 			event_remaining = TRUE;
1591 		}
1592 	}
1593 	/* if waiting for close, notify it if no remaining data */
1594 	if (cur_stream->state == TCP_ST_CLOSE_WAIT &&
1595 	    merged_len == 0 && ret > 0) {
1596 		event_remaining = TRUE;
1597 	}
1598 
1599 	SBUF_UNLOCK(&rcvvar->read_lock);
1600 
1601 	if (event_remaining) {
1602 		if (socket->epoll) {
1603 			AddEpollEvent(mtcp->ep,
1604 				      USR_SHADOW_EVENT_QUEUE, socket, MOS_EPOLLIN);
1605 		}
1606 	}
1607 
1608 	TRACE_API("Stream %d: mtcp_recv() returning %d\n", cur_stream->id, ret);
1609 	return ret;
1610 }
1611 /*----------------------------------------------------------------------------*/
1612 inline ssize_t
1613 mtcp_read(mctx_t mctx, int sockid, char *buf, size_t len)
1614 {
1615 	return mtcp_recv(mctx, sockid, buf, len, 0);
1616 }
1617 /*----------------------------------------------------------------------------*/
1618 ssize_t
1619 mtcp_readv(mctx_t mctx, int sockid, const struct iovec *iov, int numIOV)
1620 {
1621 	mtcp_manager_t mtcp;
1622 	socket_map_t socket;
1623 	tcp_stream *cur_stream;
1624 	struct tcp_recv_vars *rcvvar;
1625 	int ret, bytes_read, i;
1626 	int event_remaining, merged_len;
1627 
1628 	mtcp = GetMTCPManager(mctx);
1629 	if (!mtcp) {
1630 		errno = EACCES;
1631 		return -1;
1632 	}
1633 
1634 	if (sockid < 0 || sockid >= g_config.mos->max_concurrency) {
1635 		TRACE_API("Socket id %d out of range.\n", sockid);
1636 		errno = EBADF;
1637 		return -1;
1638 	}
1639 
1640 	socket = &mtcp->smap[sockid];
1641 	if (socket->socktype == MOS_SOCK_UNUSED) {
1642 		TRACE_API("Invalid socket id: %d\n", sockid);
1643 		errno = EBADF;
1644 		return -1;
1645 	}
1646 
1647 	if (socket->socktype != MOS_SOCK_STREAM) {
1648 		TRACE_API("Not an end socket. id: %d\n", sockid);
1649 		errno = ENOTSOCK;
1650 		return -1;
1651 	}
1652 
1653 	/* stream should be in ESTABLISHED, FIN_WAIT_1, FIN_WAIT_2, CLOSE_WAIT */
1654 	cur_stream = socket->stream;
1655 	if (!cur_stream ||
1656 			!(cur_stream->state >= TCP_ST_ESTABLISHED &&
1657 			  cur_stream->state <= TCP_ST_CLOSE_WAIT)) {
1658 		errno = ENOTCONN;
1659 		return -1;
1660 	}
1661 
1662 	rcvvar = cur_stream->rcvvar;
1663 
1664 	merged_len = tcprb_cflen(rcvvar->rcvbuf);
1665 
1666 	/* if CLOSE_WAIT, return 0 if there is no payload */
1667 	if (cur_stream->state == TCP_ST_CLOSE_WAIT) {
1668 		if (!rcvvar->rcvbuf)
1669 			return 0;
1670 
1671 		if (merged_len == 0)
1672 			return 0;
1673 	}
1674 
1675 	/* return EAGAIN if no receive buffer */
1676 	if (socket->opts & MTCP_NONBLOCK) {
1677 		if (!rcvvar->rcvbuf || merged_len == 0) {
1678 			errno = EAGAIN;
1679 			return -1;
1680 		}
1681 	}
1682 
1683 	SBUF_LOCK(&rcvvar->read_lock);
1684 
1685 	/* read and store the contents to the vectored buffers */
1686 	bytes_read = 0;
1687 	for (i = 0; i < numIOV; i++) {
1688 		if (iov[i].iov_len <= 0)
1689 			continue;
1690 
1691 		ret = CopyToUser(mtcp, cur_stream, iov[i].iov_base, iov[i].iov_len);
1692 		if (ret <= 0)
1693 			break;
1694 
1695 		bytes_read += ret;
1696 
1697 		if (ret < iov[i].iov_len)
1698 			break;
1699 	}
1700 
1701 	merged_len = tcprb_cflen(rcvvar->rcvbuf);
1702 
1703 	event_remaining = FALSE;
1704 	/* if there are remaining payload, generate read event */
1705 	/* (may due to insufficient user buffer) */
1706 	if (socket->epoll & MOS_EPOLLIN) {
1707 		if (!(socket->epoll & MOS_EPOLLET) && merged_len > 0) {
1708 			event_remaining = TRUE;
1709 		}
1710 	}
1711 	/* if waiting for close, notify it if no remaining data */
1712 	if (cur_stream->state == TCP_ST_CLOSE_WAIT &&
1713 			merged_len == 0 && bytes_read > 0) {
1714 		event_remaining = TRUE;
1715 	}
1716 
1717 	SBUF_UNLOCK(&rcvvar->read_lock);
1718 
1719 	if(event_remaining) {
1720 		if (socket->epoll & MOS_EPOLLIN && !(socket->epoll & MOS_EPOLLET)) {
1721 			AddEpollEvent(mtcp->ep,
1722 				      USR_SHADOW_EVENT_QUEUE, socket, MOS_EPOLLIN);
1723 		}
1724 	}
1725 
1726 	TRACE_API("Stream %d: mtcp_readv() returning %d\n",
1727 			cur_stream->id, bytes_read);
1728 	return bytes_read;
1729 }
1730 /*----------------------------------------------------------------------------*/
1731 static inline int
1732 CopyFromUser(mtcp_manager_t mtcp, tcp_stream *cur_stream, const char *buf, int len)
1733 {
1734 	struct tcp_send_vars *sndvar = cur_stream->sndvar;
1735 	int sndlen;
1736 	int ret;
1737 
1738 	sndlen = MIN((int)sndvar->snd_wnd, len);
1739 	if (sndlen <= 0) {
1740 		errno = EAGAIN;
1741 		return -1;
1742 	}
1743 
1744 	/* allocate send buffer if not exist */
1745 	if (!sndvar->sndbuf) {
1746 		sndvar->sndbuf = SBInit(mtcp->rbm_snd, sndvar->iss + 1);
1747 		if (!sndvar->sndbuf) {
1748 			cur_stream->close_reason = TCP_NO_MEM;
1749 			/* notification may not required due to -1 return */
1750 			errno = ENOMEM;
1751 			return -1;
1752 		}
1753 	}
1754 
1755 	ret = SBPut(mtcp->rbm_snd, sndvar->sndbuf, buf, sndlen);
1756 	assert(ret == sndlen);
1757 	sndvar->snd_wnd = sndvar->sndbuf->size - sndvar->sndbuf->len;
1758 	if (ret <= 0) {
1759 		TRACE_ERROR("SBPut failed. reason: %d (sndlen: %u, len: %u\n",
1760 				ret, sndlen, sndvar->sndbuf->len);
1761 		errno = EAGAIN;
1762 		return -1;
1763 	}
1764 
1765 	if (sndvar->snd_wnd <= 0) {
1766 		TRACE_SNDBUF("%u Sending buffer became full!! snd_wnd: %u\n",
1767 				cur_stream->id, sndvar->snd_wnd);
1768 	}
1769 
1770 	return ret;
1771 }
1772 /*----------------------------------------------------------------------------*/
1773 ssize_t
1774 mtcp_write(mctx_t mctx, int sockid, const char *buf, size_t len)
1775 {
1776 	mtcp_manager_t mtcp;
1777 	socket_map_t socket;
1778 	tcp_stream *cur_stream;
1779 	struct tcp_send_vars *sndvar;
1780 	int ret;
1781 
1782 	mtcp = GetMTCPManager(mctx);
1783 	if (!mtcp) {
1784 		errno = EACCES;
1785 		return -1;
1786 	}
1787 
1788 	if (sockid < 0 || sockid >= g_config.mos->max_concurrency) {
1789 		TRACE_API("Socket id %d out of range.\n", sockid);
1790 		errno = EBADF;
1791 		return -1;
1792 	}
1793 
1794 	socket = &mtcp->smap[sockid];
1795 	if (socket->socktype == MOS_SOCK_UNUSED) {
1796 		TRACE_API("Invalid socket id: %d\n", sockid);
1797 		errno = EBADF;
1798 		return -1;
1799 	}
1800 
1801 	if (socket->socktype == MOS_SOCK_PIPE) {
1802 		return PipeWrite(mctx, sockid, buf, len);
1803 	}
1804 
1805 	if (socket->socktype != MOS_SOCK_STREAM) {
1806 		TRACE_API("Not an end socket. id: %d\n", sockid);
1807 		errno = ENOTSOCK;
1808 		return -1;
1809 	}
1810 
1811 	cur_stream = socket->stream;
1812 	if (!cur_stream ||
1813 			!(cur_stream->state == TCP_ST_ESTABLISHED ||
1814 			  cur_stream->state == TCP_ST_CLOSE_WAIT)) {
1815 		errno = ENOTCONN;
1816 		return -1;
1817 	}
1818 
1819 	if (len <= 0) {
1820 		if (socket->opts & MTCP_NONBLOCK) {
1821 			errno = EAGAIN;
1822 			return -1;
1823 		} else {
1824 			return 0;
1825 		}
1826 	}
1827 
1828 	sndvar = cur_stream->sndvar;
1829 
1830 	SBUF_LOCK(&sndvar->write_lock);
1831 	ret = CopyFromUser(mtcp, cur_stream, buf, len);
1832 
1833 	SBUF_UNLOCK(&sndvar->write_lock);
1834 
1835 	if (ret > 0 && !(sndvar->on_sendq || sndvar->on_send_list)) {
1836 		SQ_LOCK(&mtcp->ctx->sendq_lock);
1837 		sndvar->on_sendq = TRUE;
1838 		StreamEnqueue(mtcp->sendq, cur_stream);		/* this always success */
1839 		SQ_UNLOCK(&mtcp->ctx->sendq_lock);
1840 		mtcp->wakeup_flag = TRUE;
1841 	}
1842 
1843 	if (ret == 0 && (socket->opts & MTCP_NONBLOCK)) {
1844 		ret = -1;
1845 		errno = EAGAIN;
1846 	}
1847 
1848 	/* if there are remaining sending buffer, generate write event */
1849 	if (sndvar->snd_wnd > 0) {
1850 		if (socket->epoll & MOS_EPOLLOUT && !(socket->epoll & MOS_EPOLLET)) {
1851 			AddEpollEvent(mtcp->ep,
1852 				      USR_SHADOW_EVENT_QUEUE, socket, MOS_EPOLLOUT);
1853 		}
1854 	}
1855 
1856 	TRACE_API("Stream %d: mtcp_write() returning %d\n", cur_stream->id, ret);
1857 	return ret;
1858 }
1859 /*----------------------------------------------------------------------------*/
1860 ssize_t
1861 mtcp_writev(mctx_t mctx, int sockid, const struct iovec *iov, int numIOV)
1862 {
1863 	mtcp_manager_t mtcp;
1864 	socket_map_t socket;
1865 	tcp_stream *cur_stream;
1866 	struct tcp_send_vars *sndvar;
1867 	int ret, to_write, i;
1868 
1869 	mtcp = GetMTCPManager(mctx);
1870 	if (!mtcp) {
1871 		errno = EACCES;
1872 		return -1;
1873 	}
1874 
1875 	if (sockid < 0 || sockid >= g_config.mos->max_concurrency) {
1876 		TRACE_API("Socket id %d out of range.\n", sockid);
1877 		errno = EBADF;
1878 		return -1;
1879 	}
1880 
1881 	socket = &mtcp->smap[sockid];
1882 	if (socket->socktype == MOS_SOCK_UNUSED) {
1883 		TRACE_API("Invalid socket id: %d\n", sockid);
1884 		errno = EBADF;
1885 		return -1;
1886 	}
1887 
1888 	if (socket->socktype != MOS_SOCK_STREAM) {
1889 		TRACE_API("Not an end socket. id: %d\n", sockid);
1890 		errno = ENOTSOCK;
1891 		return -1;
1892 	}
1893 
1894 	cur_stream = socket->stream;
1895 	if (!cur_stream ||
1896 			!(cur_stream->state == TCP_ST_ESTABLISHED ||
1897 			  cur_stream->state == TCP_ST_CLOSE_WAIT)) {
1898 		errno = ENOTCONN;
1899 		return -1;
1900 	}
1901 
1902 	sndvar = cur_stream->sndvar;
1903 	SBUF_LOCK(&sndvar->write_lock);
1904 
1905 	/* write from the vectored buffers */
1906 	to_write = 0;
1907 	for (i = 0; i < numIOV; i++) {
1908 		if (iov[i].iov_len <= 0)
1909 			continue;
1910 
1911 		ret = CopyFromUser(mtcp, cur_stream, iov[i].iov_base, iov[i].iov_len);
1912 		if (ret <= 0)
1913 			break;
1914 
1915 		to_write += ret;
1916 
1917 		if (ret < iov[i].iov_len)
1918 			break;
1919 	}
1920 	SBUF_UNLOCK(&sndvar->write_lock);
1921 
1922 	if (to_write > 0 && !(sndvar->on_sendq || sndvar->on_send_list)) {
1923 		SQ_LOCK(&mtcp->ctx->sendq_lock);
1924 		sndvar->on_sendq = TRUE;
1925 		StreamEnqueue(mtcp->sendq, cur_stream);		/* this always success */
1926 		SQ_UNLOCK(&mtcp->ctx->sendq_lock);
1927 		mtcp->wakeup_flag = TRUE;
1928 	}
1929 
1930 	if (to_write == 0 && (socket->opts & MTCP_NONBLOCK)) {
1931 		to_write = -1;
1932 		errno = EAGAIN;
1933 	}
1934 
1935 	/* if there are remaining sending buffer, generate write event */
1936 	if (sndvar->snd_wnd > 0) {
1937 		if (socket->epoll & MOS_EPOLLOUT && !(socket->epoll & MOS_EPOLLET)) {
1938 			AddEpollEvent(mtcp->ep,
1939 				      USR_SHADOW_EVENT_QUEUE, socket, MOS_EPOLLOUT);
1940 		}
1941 	}
1942 
1943 	TRACE_API("Stream %d: mtcp_writev() returning %d\n",
1944 			cur_stream->id, to_write);
1945 	return to_write;
1946 }
1947 /*----------------------------------------------------------------------------*/
1948 uint32_t
1949 mtcp_get_connection_cnt(mctx_t mctx)
1950 {
1951 	mtcp_manager_t mtcp;
1952 	mtcp = GetMTCPManager(mctx);
1953 	if (!mtcp) {
1954 		errno = EACCES;
1955 		return -1;
1956 	}
1957 
1958 	if (mtcp->num_msp > 0)
1959 		return mtcp->flow_cnt / 2;
1960 	else
1961 		return mtcp->flow_cnt;
1962 }
1963 /*----------------------------------------------------------------------------*/
1964