xref: /mOS-networking-stack/core/src/api.c (revision 8a941c7e)
1 #include <sys/queue.h>
2 #include <sys/ioctl.h>
3 #include <limits.h>
4 #include <unistd.h>
5 #include <assert.h>
6 #include <string.h>
7 
8 #ifdef DARWIN
9 #include <netinet/tcp.h>
10 #include <netinet/ip.h>
11 #include <netinet/if_ether.h>
12 #endif
13 
14 #include "mtcp.h"
15 #include "mtcp_api.h"
16 #include "tcp_in.h"
17 #include "tcp_stream.h"
18 #include "tcp_out.h"
19 #include "ip_out.h"
20 #include "eventpoll.h"
21 #include "pipe.h"
22 #include "fhash.h"
23 #include "addr_pool.h"
24 #include "rss.h"
25 #include "config.h"
26 #include "debug.h"
27 #include "eventpoll.h"
28 #include "mos_api.h"
29 #include "tcp_rb.h"
30 
31 #define MAX(a, b) ((a)>(b)?(a):(b))
32 #define MIN(a, b) ((a)<(b)?(a):(b))
33 
34 /*----------------------------------------------------------------------------*/
35 /** Stop monitoring the socket! (function prototype)
36  * @param [in] mctx: mtcp context
37  * @param [in] sock: monitoring stream socket id
38  * @param [in] side: side of monitoring (client side, server side or both)
39  *
40  * This function is now DEPRECATED and is only used within mOS core...
41  */
42 int
43 mtcp_cb_stop(mctx_t mctx, int sock, int side);
44 /*----------------------------------------------------------------------------*/
45 /** Reset the connection (send RST packets to both sides)
46  *  (We need to decide the API for this.)
47  */
48 //int
49 //mtcp_cb_reset(mctx_t mctx, int sock, int side);
50 /*----------------------------------------------------------------------------*/
51 inline mtcp_manager_t
52 GetMTCPManager(mctx_t mctx)
53 {
54 	if (!mctx) {
55 		errno = EACCES;
56 		return NULL;
57 	}
58 
59 	if (mctx->cpu < 0 || mctx->cpu >= num_cpus) {
60 		errno = EINVAL;
61 		return NULL;
62 	}
63 
64 	if (!g_mtcp[mctx->cpu] || g_mtcp[mctx->cpu]->ctx->done || g_mtcp[mctx->cpu]->ctx->exit) {
65 		errno = EPERM;
66 		return NULL;
67 	}
68 
69 	return g_mtcp[mctx->cpu];
70 }
71 /*----------------------------------------------------------------------------*/
72 static inline int
73 GetSocketError(socket_map_t socket, void *optval, socklen_t *optlen)
74 {
75 	tcp_stream *cur_stream;
76 
77 	if (!socket->stream) {
78 		errno = EBADF;
79 		return -1;
80 	}
81 
82 	cur_stream = socket->stream;
83 	if (cur_stream->state == TCP_ST_CLOSED_RSVD) {
84 		if (cur_stream->close_reason == TCP_TIMEDOUT ||
85 				cur_stream->close_reason == TCP_CONN_FAIL ||
86 				cur_stream->close_reason == TCP_CONN_LOST) {
87 			*(int *)optval = ETIMEDOUT;
88 			*optlen = sizeof(int);
89 
90 			return 0;
91 		}
92 	}
93 
94 	if (cur_stream->state == TCP_ST_CLOSE_WAIT ||
95 			cur_stream->state == TCP_ST_CLOSED_RSVD) {
96 		if (cur_stream->close_reason == TCP_RESET) {
97 			*(int *)optval = ECONNRESET;
98 			*optlen = sizeof(int);
99 
100 			return 0;
101 		}
102 	}
103 
104 	if (cur_stream->state == TCP_ST_SYN_SENT &&
105 	    errno == EINPROGRESS) {
106 		*(int *)optval = errno;
107 		*optlen = sizeof(int);
108 
109 		return -1;
110 	}
111 
112 	/*
113 	 * `base case`: If socket sees no so_error, then
114 	 * this also means close_reason will always be
115 	 * TCP_NOT_CLOSED.
116 	 */
117 	if (cur_stream->close_reason == TCP_NOT_CLOSED) {
118 		*(int *)optval = 0;
119 		*optlen = sizeof(int);
120 
121 		return 0;
122 	}
123 
124 	errno = ENOSYS;
125 	return -1;
126 }
127 /*----------------------------------------------------------------------------*/
128 int
129 mtcp_getsockname(mctx_t mctx, int sockid, struct sockaddr *addr,
130 		 socklen_t *addrlen)
131 {
132 	mtcp_manager_t mtcp;
133 	socket_map_t socket;
134 
135 	mtcp = GetMTCPManager(mctx);
136 	if (!mtcp) {
137 		return -1;
138 	}
139 
140 	if (sockid < 0 || sockid >= g_config.mos->max_concurrency) {
141 		TRACE_API("Socket id %d out of range.\n", sockid);
142 		errno = EBADF;
143 		return -1;
144 	}
145 
146 	socket = &mtcp->smap[sockid];
147 	if (socket->socktype == MOS_SOCK_UNUSED) {
148 		TRACE_API("Invalid socket id: %d\n", sockid);
149 		errno = EBADF;
150 		return -1;
151 	}
152 
153 	if (*addrlen <= 0) {
154 		TRACE_API("Invalid addrlen: %d\n", *addrlen);
155 		errno = EINVAL;
156 		return -1;
157 	}
158 
159 	if (socket->socktype != MOS_SOCK_STREAM_LISTEN &&
160 	    socket->socktype != MOS_SOCK_STREAM) {
161 		TRACE_API("Invalid socket id: %d\n", sockid);
162 		errno = ENOTSOCK;
163 		return -1;
164 	}
165 
166 	*(struct sockaddr_in *)addr = socket->saddr;
167         *addrlen = sizeof(socket->saddr);
168 
169 	return 0;
170 }
171 /*----------------------------------------------------------------------------*/
172 int
173 mtcp_getsockopt(mctx_t mctx, int sockid, int level,
174 		int optname, void *optval, socklen_t *optlen)
175 {
176 	mtcp_manager_t mtcp;
177 	socket_map_t socket;
178 
179 	mtcp = GetMTCPManager(mctx);
180 	if (!mtcp) {
181 		errno = EACCES;
182 		return -1;
183 	}
184 
185 	if (sockid < 0 || sockid >= g_config.mos->max_concurrency) {
186 		TRACE_API("Socket id %d out of range.\n", sockid);
187 		errno = EBADF;
188 		return -1;
189 	}
190 
191 	switch (level) {
192 	case SOL_SOCKET:
193 		socket = &mtcp->smap[sockid];
194 		if (socket->socktype == MOS_SOCK_UNUSED) {
195 			TRACE_API("Invalid socket id: %d\n", sockid);
196 			errno = EBADF;
197 			return -1;
198 		}
199 
200 		if (socket->socktype != MOS_SOCK_STREAM_LISTEN &&
201 		    socket->socktype != MOS_SOCK_STREAM) {
202 			TRACE_API("Invalid socket id: %d\n", sockid);
203 			errno = ENOTSOCK;
204 			return -1;
205 		}
206 
207 		if (optname == SO_ERROR) {
208 			if (socket->socktype == MOS_SOCK_STREAM) {
209 				return GetSocketError(socket, optval, optlen);
210 			}
211 		}
212 		break;
213 	case SOL_MONSOCKET:
214 		/* check if the calling thread is in MOS context */
215 		if (mtcp->ctx->thread != pthread_self()) {
216 			errno = EPERM;
217 			return -1;
218 		}
219 		/*
220 		 * All options will only work for active
221 		 * monitor stream sockets
222 		 */
223 		socket = &mtcp->msmap[sockid];
224 		if (socket->socktype != MOS_SOCK_MONITOR_STREAM_ACTIVE) {
225 			TRACE_API("Invalid socket id: %d\n", sockid);
226 			errno = ENOTSOCK;
227 			return -1;
228 		}
229 
230 		switch (optname) {
231 		case MOS_FRAGINFO_CLIBUF:
232 			return GetFragInfo(socket, MOS_SIDE_CLI, optval, optlen);
233 		case MOS_FRAGINFO_SVRBUF:
234 			return GetFragInfo(socket, MOS_SIDE_SVR, optval, optlen);
235 		case MOS_INFO_CLIBUF:
236 			return GetBufInfo(socket, MOS_SIDE_CLI, optval, optlen);
237 		case MOS_INFO_SVRBUF:
238 			return GetBufInfo(socket, MOS_SIDE_SVR, optval, optlen);
239 		case MOS_TCP_STATE_CLI:
240 			return GetTCPState(socket->monitor_stream->stream, MOS_SIDE_CLI,
241 							   optval, optlen);
242 		case MOS_TCP_STATE_SVR:
243 			return GetTCPState(socket->monitor_stream->stream, MOS_SIDE_SVR,
244 							   optval, optlen);
245 		case MOS_TIMESTAMP:
246 			return GetLastTimestamp(socket->monitor_stream->stream,
247 						(uint32_t *)optval,
248 						optlen);
249 		default:
250 		  TRACE_API("can't recognize the optname=%d\n", optname);
251 		  assert(0);
252 		}
253 		break;
254 	}
255 	errno = ENOSYS;
256 	return -1;
257 }
258 /*----------------------------------------------------------------------------*/
259 int
260 mtcp_setsockopt(mctx_t mctx, int sockid, int level,
261 		int optname, const void *optval, socklen_t optlen)
262 {
263 	mtcp_manager_t mtcp;
264 	socket_map_t socket;
265 	tcprb_t *rb;
266 
267 	mtcp = GetMTCPManager(mctx);
268 	if (!mtcp) {
269 		errno = EACCES;
270 		return -1;
271 	}
272 
273 	if (sockid < 0 || sockid >= g_config.mos->max_concurrency) {
274 		TRACE_API("Socket id %d out of range.\n", sockid);
275 		errno = EBADF;
276 		return -1;
277 	}
278 
279 	switch (level) {
280 	case SOL_SOCKET:
281 		socket = &mtcp->smap[sockid];
282 		if (socket->socktype == MOS_SOCK_UNUSED) {
283 			TRACE_API("Invalid socket id: %d\n", sockid);
284 			errno = EBADF;
285 			return -1;
286 		}
287 
288 		if (socket->socktype != MOS_SOCK_STREAM_LISTEN &&
289 		    socket->socktype != MOS_SOCK_STREAM) {
290 			TRACE_API("Invalid socket id: %d\n", sockid);
291 			errno = ENOTSOCK;
292 			return -1;
293 		}
294 		break;
295 	case SOL_MONSOCKET:
296 		socket = &mtcp->msmap[sockid];
297 		/*
298 		 * checking of calling thread to be in MOS context is
299 		 * disabled since both optnames can be called from
300 		 * `application' context (on passive sockets)
301 		 */
302 		/*
303 		 * if (mtcp->ctx->thread != pthread_self())
304 		 * return -1;
305 		 */
306 
307 		switch (optname) {
308 		case MOS_CLIOVERLAP:
309 			rb = (socket->monitor_stream->stream->side == MOS_SIDE_CLI) ?
310 				socket->monitor_stream->stream->rcvvar->rcvbuf :
311 			socket->monitor_stream->stream->pair_stream->rcvvar->rcvbuf;
312 			if (rb == NULL) {
313 				errno = EFAULT;
314 				return -1;
315 			}
316 			if (tcprb_setpolicy(rb, *(uint8_t *)optval) < 0) {
317 				errno = EINVAL;
318 				return -1;
319 			} else
320 				return 0;
321 			break;
322 		case MOS_SVROVERLAP:
323 			rb = (socket->monitor_stream->stream->side == MOS_SIDE_SVR) ?
324 				socket->monitor_stream->stream->rcvvar->rcvbuf :
325 			socket->monitor_stream->stream->pair_stream->rcvvar->rcvbuf;
326 			if (rb == NULL) {
327 				errno = EFAULT;
328 				return -1;
329 			}
330 			if (tcprb_setpolicy(rb, *(uint8_t *)optval) < 0) {
331 				errno = EINVAL;
332 				return -1;
333 			} else
334 				return 0;
335 			break;
336 		case MOS_CLIBUF:
337 #if 0
338 			if (socket->socktype != MOS_SOCK_MONITOR_STREAM_ACTIVE) {
339 				errno = EBADF;
340 				return -1;
341 			}
342 #endif
343 #ifdef DISABLE_DYN_RESIZE
344 			if (*(int *)optval != 0)
345 				return -1;
346 			if (socket->socktype == MOS_SOCK_MONITOR_STREAM_ACTIVE) {
347 				rb = (socket->monitor_stream->stream->side == MOS_SIDE_CLI) ?
348 					socket->monitor_stream->stream->rcvvar->rcvbuf :
349 					socket->monitor_stream->stream->pair_stream->rcvvar->rcvbuf;
350 				if (rb) {
351 					tcprb_resize_meta(rb, 0);
352 					tcprb_resize(rb, 0);
353 				}
354 			}
355 			return DisableBuf(socket, MOS_SIDE_CLI);
356 #else
357 			rb = (socket->monitor_stream->stream->side == MOS_SIDE_CLI) ?
358 				socket->monitor_stream->stream->rcvvar->rcvbuf :
359 				socket->monitor_stream->stream->pair_stream->rcvvar->rcvbuf;
360 			if (tcprb_resize_meta(rb, *(int *)optval) < 0)
361 				return -1;
362 			return tcprb_resize(rb,
363 					(((int)rb->metalen - 1) / UNITBUFSIZE + 1) * UNITBUFSIZE);
364 #endif
365 		case MOS_SVRBUF:
366 #if 0
367 			if (socket->socktype != MOS_SOCK_MONITOR_STREAM_ACTIVE) {
368 				errno = EBADF;
369 				return -1;
370 			}
371 #endif
372 #ifdef DISABLE_DYN_RESIZE
373 			if (*(int *)optval != 0)
374 				return -1;
375 			if (socket->socktype == MOS_SOCK_MONITOR_STREAM_ACTIVE) {
376 				rb = (socket->monitor_stream->stream->side == MOS_SIDE_SVR) ?
377 					socket->monitor_stream->stream->rcvvar->rcvbuf :
378 					socket->monitor_stream->stream->pair_stream->rcvvar->rcvbuf;
379 				if (rb) {
380 					tcprb_resize_meta(rb, 0);
381 					tcprb_resize(rb, 0);
382 				}
383 			}
384 			return DisableBuf(socket, MOS_SIDE_SVR);
385 #else
386 			rb = (socket->monitor_stream->stream->side == MOS_SIDE_SVR) ?
387 				socket->monitor_stream->stream->rcvvar->rcvbuf :
388 				socket->monitor_stream->stream->pair_stream->rcvvar->rcvbuf;
389 			if (tcprb_resize_meta(rb, *(int *)optval) < 0)
390 				return -1;
391 			return tcprb_resize(rb,
392 					(((int)rb->metalen - 1) / UNITBUFSIZE + 1) * UNITBUFSIZE);
393 #endif
394 		case MOS_FRAG_CLIBUF:
395 #if 0
396 			if (socket->socktype != MOS_SOCK_MONITOR_STREAM_ACTIVE) {
397 				errno = EBADF;
398 				return -1;
399 			}
400 #endif
401 #ifdef DISABLE_DYN_RESIZE
402 			if (*(int *)optval != 0)
403 				return -1;
404 			if (socket->socktype == MOS_SOCK_MONITOR_STREAM_ACTIVE) {
405 				rb = (socket->monitor_stream->stream->side == MOS_SIDE_CLI) ?
406 					socket->monitor_stream->stream->rcvvar->rcvbuf :
407 					socket->monitor_stream->stream->pair_stream->rcvvar->rcvbuf;
408 				if (rb)
409 					tcprb_resize(rb, 0);
410 			}
411 			return 0;
412 #else
413 			rb = (socket->monitor_stream->stream->side == MOS_SIDE_CLI) ?
414 				socket->monitor_stream->stream->rcvvar->rcvbuf :
415 				socket->monitor_stream->stream->pair_stream->rcvvar->rcvbuf;
416 			if (rb->len == 0)
417 				return tcprb_resize_meta(rb, *(int *)optval);
418 			else
419 				return -1;
420 #endif
421 		case MOS_FRAG_SVRBUF:
422 #if 0
423 			if (socket->socktype != MOS_SOCK_MONITOR_STREAM_ACTIVE) {
424 				errno = EBADF;
425 				return -1;
426 			}
427 #endif
428 #ifdef DISABLE_DYN_RESIZE
429 			if (*(int *)optval != 0)
430 				return -1;
431 			if (socket->socktype == MOS_SOCK_MONITOR_STREAM_ACTIVE) {
432 				rb = (socket->monitor_stream->stream->side == MOS_SIDE_SVR) ?
433 					socket->monitor_stream->stream->rcvvar->rcvbuf :
434 					socket->monitor_stream->stream->pair_stream->rcvvar->rcvbuf;
435 				if (rb)
436 					tcprb_resize(rb, 0);
437 			}
438 			return 0;
439 #else
440 			rb = (socket->monitor_stream->stream->side == MOS_SIDE_SVR) ?
441 				socket->monitor_stream->stream->rcvvar->rcvbuf :
442 				socket->monitor_stream->stream->pair_stream->rcvvar->rcvbuf;
443 			if (rb->len == 0)
444 				return tcprb_resize_meta(rb, *(int *)optval);
445 			else
446 				return -1;
447 #endif
448 		case MOS_MONLEVEL:
449 #ifdef OLD_API
450 			assert(*(int *)optval == MOS_NO_CLIBUF ||
451 			       *(int *)optval == MOS_NO_SVRBUF);
452 			return DisableBuf(socket,
453 					  (*(int *)optval == MOS_NO_CLIBUF) ?
454 					  MOS_SIDE_CLI : MOS_SIDE_SVR);
455 #endif
456 		case MOS_SEQ_REMAP:
457 			return TcpSeqChange(socket,
458 					    (uint32_t)((seq_remap_info *)optval)->seq_off,
459 					    ((seq_remap_info *)optval)->side,
460 					    mtcp->pctx->p.seq);
461 		case MOS_STOP_MON:
462 			return mtcp_cb_stop(mctx, sockid, *(int *)optval);
463 		default:
464 			TRACE_API("invalid optname=%d\n", optname);
465 			assert(0);
466 		}
467 		break;
468 	}
469 
470 	errno = ENOSYS;
471 	return -1;
472 }
473 /*----------------------------------------------------------------------------*/
474 int
475 mtcp_setsock_nonblock(mctx_t mctx, int sockid)
476 {
477 	mtcp_manager_t mtcp;
478 
479 	mtcp = GetMTCPManager(mctx);
480 	if (!mtcp) {
481 		errno = EACCES;
482 		return -1;
483 	}
484 
485 	if (sockid < 0 || sockid >= g_config.mos->max_concurrency) {
486 		TRACE_API("Socket id %d out of range.\n", sockid);
487 		errno = EBADF;
488 		return -1;
489 	}
490 
491 	if (mtcp->smap[sockid].socktype == MOS_SOCK_UNUSED) {
492 		TRACE_API("Invalid socket id: %d\n", sockid);
493 		errno = EBADF;
494 		return -1;
495 	}
496 
497 	mtcp->smap[sockid].opts |= MTCP_NONBLOCK;
498 
499 	return 0;
500 }
501 /*----------------------------------------------------------------------------*/
502 int
503 mtcp_ioctl(mctx_t mctx, int sockid, int request, void *argp)
504 {
505 	mtcp_manager_t mtcp;
506 	socket_map_t socket;
507 
508 	mtcp = GetMTCPManager(mctx);
509 	if (!mtcp) {
510 		errno = EACCES;
511 		return -1;
512 	}
513 
514 	if (sockid < 0 || sockid >= g_config.mos->max_concurrency) {
515 		TRACE_API("Socket id %d out of range.\n", sockid);
516 		errno = EBADF;
517 		return -1;
518 	}
519 
520 	/* only support stream socket */
521 	socket = &mtcp->smap[sockid];
522 
523 	if (socket->socktype != MOS_SOCK_STREAM_LISTEN &&
524 		socket->socktype != MOS_SOCK_STREAM) {
525 		TRACE_API("Invalid socket id: %d\n", sockid);
526 		errno = EBADF;
527 		return -1;
528 	}
529 
530 	if (!argp) {
531 		errno = EFAULT;
532 		return -1;
533 	}
534 
535 	if (request == FIONREAD) {
536 		tcp_stream *cur_stream;
537 		tcprb_t *rbuf;
538 
539 		cur_stream = socket->stream;
540 		if (!cur_stream) {
541 			errno = EBADF;
542 			return -1;
543 		}
544 
545 		rbuf = cur_stream->rcvvar->rcvbuf;
546 		*(int *)argp = (rbuf) ? tcprb_cflen(rbuf) : 0;
547 
548 	} else if (request == FIONBIO) {
549 		/*
550 		 * sockets can only be set to blocking/non-blocking
551 		 * modes during initialization
552 		 */
553 		if ((*(int *)argp))
554 			mtcp->smap[sockid].opts |= MTCP_NONBLOCK;
555 		else
556 			mtcp->smap[sockid].opts &= ~MTCP_NONBLOCK;
557 	} else {
558 		errno = EINVAL;
559 		return -1;
560 	}
561 
562 	return 0;
563 }
564 /*----------------------------------------------------------------------------*/
565 static int
566 mtcp_monitor(mctx_t mctx, socket_map_t sock)
567 {
568 	mtcp_manager_t mtcp;
569 	struct mon_listener *monitor;
570 	int sockid = sock->id;
571 
572 	mtcp = GetMTCPManager(mctx);
573 	if (!mtcp) {
574 		errno = EACCES;
575 		return -1;
576 	}
577 
578 	if (sockid < 0 || sockid >= g_config.mos->max_concurrency) {
579 		TRACE_API("Socket id %d out of range.\n", sockid);
580 		errno = EBADF;
581 		return -1;
582 	}
583 
584 	if (mtcp->msmap[sockid].socktype == MOS_SOCK_UNUSED) {
585 		TRACE_API("Invalid socket id: %d\n", sockid);
586 		errno = EBADF;
587 		return -1;
588 	}
589 
590 	if (!(mtcp->msmap[sockid].socktype == MOS_SOCK_MONITOR_STREAM ||
591 	      mtcp->msmap[sockid].socktype == MOS_SOCK_MONITOR_RAW)) {
592 		TRACE_API("Not a monitor socket. id: %d\n", sockid);
593 		errno = ENOTSOCK;
594 		return -1;
595 	}
596 
597 	monitor = (struct mon_listener *)calloc(1, sizeof(struct mon_listener));
598 	if (!monitor) {
599 		/* errno set from the malloc() */
600 		errno = ENOMEM;
601 		return -1;
602 	}
603 
604 	/* create a monitor-specific event queue */
605 	monitor->eq = CreateEventQueue(g_config.mos->max_concurrency);
606 	if (!monitor->eq) {
607 		TRACE_API("Can't create event queue (concurrency: %d) for "
608 			  "monitor read event registrations!\n",
609 			  g_config.mos->max_concurrency);
610 		free(monitor);
611 		errno = ENOMEM;
612 		return -1;
613 	}
614 
615 	/* set monitor-related basic parameters */
616 #ifndef NEWEV
617 	monitor->ude_id = UDE_OFFSET;
618 #endif
619 	monitor->socket = sock;
620 	monitor->client_buf_mgmt = monitor->server_buf_mgmt = BUFMGMT_FULL;
621 
622 	/* perform both sides monitoring by default */
623 	monitor->client_mon = monitor->server_mon = 1;
624 
625 	/* add monitor socket to the monitor list */
626 	TAILQ_INSERT_TAIL(&mtcp->monitors, monitor, link);
627 
628 	mtcp->msmap[sockid].monitor_listener = monitor;
629 
630 	return 0;
631 }
632 /*----------------------------------------------------------------------------*/
633 int
634 mtcp_socket(mctx_t mctx, int domain, int type, int protocol)
635 {
636 	mtcp_manager_t mtcp;
637 	socket_map_t socket;
638 
639 	mtcp = GetMTCPManager(mctx);
640 	if (!mtcp) {
641 		errno = EACCES;
642 		return -1;
643 	}
644 
645 	if (domain != AF_INET) {
646 		errno = EAFNOSUPPORT;
647 		return -1;
648 	}
649 
650 	if (type == SOCK_STREAM) {
651 		type = MOS_SOCK_STREAM;
652 	} else if (type == MOS_SOCK_MONITOR_STREAM ||
653 		   type == MOS_SOCK_MONITOR_RAW) {
654 		/* do nothing for the time being */
655 	} else {
656 		/* Not supported type */
657 		errno = EINVAL;
658 		return -1;
659 	}
660 
661 	socket = AllocateSocket(mctx, type);
662 	if (!socket) {
663 		errno = ENFILE;
664 		return -1;
665 	}
666 
667 	if (type == MOS_SOCK_MONITOR_STREAM ||
668 	    type == MOS_SOCK_MONITOR_RAW) {
669 		mtcp_manager_t mtcp = GetMTCPManager(mctx);
670 		if (!mtcp) {
671 			errno = EACCES;
672 			return -1;
673 		}
674 		mtcp_monitor(mctx, socket);
675 #ifdef NEWEV
676 		socket->monitor_listener->stree_dontcare = NULL;
677 		socket->monitor_listener->stree_pre_rcv = NULL;
678 		socket->monitor_listener->stree_post_snd = NULL;
679 #else
680 		InitEvB(mtcp, &socket->monitor_listener->dontcare_evb);
681 		InitEvB(mtcp, &socket->monitor_listener->pre_tcp_evb);
682 		InitEvB(mtcp, &socket->monitor_listener->post_tcp_evb);
683 #endif
684 	}
685 
686 	return socket->id;
687 }
688 /*----------------------------------------------------------------------------*/
689 int
690 mtcp_bind(mctx_t mctx, int sockid,
691 		const struct sockaddr *addr, socklen_t addrlen)
692 {
693 	mtcp_manager_t mtcp;
694 	struct sockaddr_in *addr_in;
695 
696 	mtcp = GetMTCPManager(mctx);
697 	if (!mtcp) {
698 		errno = EACCES;
699 		return -1;
700 	}
701 
702 	if (sockid < 0 || sockid >= g_config.mos->max_concurrency) {
703 		TRACE_API("Socket id %d out of range.\n", sockid);
704 		errno = EBADF;
705 		return -1;
706 	}
707 
708 	if (mtcp->smap[sockid].socktype == MOS_SOCK_UNUSED) {
709 		TRACE_API("Invalid socket id: %d\n", sockid);
710 		errno = EBADF;
711 		return -1;
712 	}
713 
714 	if (mtcp->smap[sockid].socktype != MOS_SOCK_STREAM &&
715 			mtcp->smap[sockid].socktype != MOS_SOCK_STREAM_LISTEN) {
716 		TRACE_API("Not a stream socket id: %d\n", sockid);
717 		errno = ENOTSOCK;
718 		return -1;
719 	}
720 
721 	if (!addr) {
722 		TRACE_API("Socket %d: empty address!\n", sockid);
723 		errno = EINVAL;
724 		return -1;
725 	}
726 
727 	if (mtcp->smap[sockid].opts & MTCP_ADDR_BIND) {
728 		TRACE_API("Socket %d: adress already bind for this socket.\n", sockid);
729 		errno = EINVAL;
730 		return -1;
731 	}
732 
733 	/* we only allow bind() for AF_INET address */
734 	if (addr->sa_family != AF_INET || addrlen < sizeof(struct sockaddr_in)) {
735 		TRACE_API("Socket %d: invalid argument!\n", sockid);
736 		errno = EINVAL;
737 		return -1;
738 	}
739 
740 	if (mtcp->listener) {
741 		TRACE_API("Address already bound!\n");
742 		errno = EINVAL;
743 		return -1;
744 	}
745 	addr_in = (struct sockaddr_in *)addr;
746 	mtcp->smap[sockid].saddr = *addr_in;
747 	mtcp->smap[sockid].opts |= MTCP_ADDR_BIND;
748 
749 	return 0;
750 }
751 /*----------------------------------------------------------------------------*/
752 int
753 mtcp_listen(mctx_t mctx, int sockid, int backlog)
754 {
755 	mtcp_manager_t mtcp;
756 	struct tcp_listener *listener;
757 
758 	mtcp = GetMTCPManager(mctx);
759 	if (!mtcp) {
760 		errno = EACCES;
761 		return -1;
762 	}
763 
764 	if (sockid < 0 || sockid >= g_config.mos->max_concurrency) {
765 		TRACE_API("Socket id %d out of range.\n", sockid);
766 		errno = EBADF;
767 		return -1;
768 	}
769 
770 	if (mtcp->smap[sockid].socktype == MOS_SOCK_UNUSED) {
771 		TRACE_API("Invalid socket id: %d\n", sockid);
772 		errno = EBADF;
773 		return -1;
774 	}
775 
776 	if (mtcp->smap[sockid].socktype == MOS_SOCK_STREAM) {
777 		mtcp->smap[sockid].socktype = MOS_SOCK_STREAM_LISTEN;
778 	}
779 
780 	if (mtcp->smap[sockid].socktype != MOS_SOCK_STREAM_LISTEN) {
781 		TRACE_API("Not a listening socket. id: %d\n", sockid);
782 		errno = ENOTSOCK;
783 		return -1;
784 	}
785 
786 	if (backlog <= 0 || backlog > g_config.mos->max_concurrency) {
787 		errno = EINVAL;
788 		return -1;
789 	}
790 
791 	listener = (struct tcp_listener *)calloc(1, sizeof(struct tcp_listener));
792 	if (!listener) {
793 		/* errno set from the malloc() */
794 		errno = ENOMEM;
795 		return -1;
796 	}
797 
798 	listener->sockid = sockid;
799 	listener->backlog = backlog;
800 	listener->socket = &mtcp->smap[sockid];
801 
802 	if (pthread_cond_init(&listener->accept_cond, NULL)) {
803 		perror("pthread_cond_init of ctx->accept_cond\n");
804 		/* errno set by pthread_cond_init() */
805 		return -1;
806 	}
807 	if (pthread_mutex_init(&listener->accept_lock, NULL)) {
808 		perror("pthread_mutex_init of ctx->accept_lock\n");
809 		/* errno set by pthread_mutex_init() */
810 		return -1;
811 	}
812 
813 	listener->acceptq = CreateStreamQueue(backlog);
814 	if (!listener->acceptq) {
815 		free(listener);
816 		errno = ENOMEM;
817 		return -1;
818 	}
819 
820 	mtcp->smap[sockid].listener = listener;
821 	mtcp->listener = listener;
822 
823 	return 0;
824 }
825 /*----------------------------------------------------------------------------*/
826 int
827 mtcp_accept(mctx_t mctx, int sockid, struct sockaddr *addr, socklen_t *addrlen)
828 {
829 	mtcp_manager_t mtcp;
830 	struct tcp_listener *listener;
831 	socket_map_t socket;
832 	tcp_stream *accepted = NULL;
833 
834 	mtcp = GetMTCPManager(mctx);
835 	if (!mtcp) {
836 		errno = EACCES;
837 		return -1;
838 	}
839 
840 	if (sockid < 0 || sockid >= g_config.mos->max_concurrency) {
841 		TRACE_API("Socket id %d out of range.\n", sockid);
842 		errno = EBADF;
843 		return -1;
844 	}
845 
846 	/* requires listening socket */
847 	if (mtcp->smap[sockid].socktype != MOS_SOCK_STREAM_LISTEN) {
848 		errno = EINVAL;
849 		return -1;
850 	}
851 
852 	listener = mtcp->smap[sockid].listener;
853 
854 	/* dequeue from the acceptq without lock first */
855 	/* if nothing there, acquire lock and cond_wait */
856 	accepted = StreamDequeue(listener->acceptq);
857 	if (!accepted) {
858 		if (listener->socket->opts & MTCP_NONBLOCK) {
859 			errno = EAGAIN;
860 			return -1;
861 
862 		} else {
863 			pthread_mutex_lock(&listener->accept_lock);
864 			while ((accepted = StreamDequeue(listener->acceptq)) == NULL) {
865 				pthread_cond_wait(&listener->accept_cond, &listener->accept_lock);
866 
867 				if (mtcp->ctx->done || mtcp->ctx->exit) {
868 					pthread_mutex_unlock(&listener->accept_lock);
869 					errno = EINTR;
870 					return -1;
871 				}
872 			}
873 			pthread_mutex_unlock(&listener->accept_lock);
874 		}
875 	}
876 
877 	if (!accepted) {
878 		TRACE_ERROR("[NEVER HAPPEN] Empty accept queue!\n");
879 	}
880 
881 	if (!accepted->socket) {
882 		socket = AllocateSocket(mctx, MOS_SOCK_STREAM);
883 		if (!socket) {
884 			TRACE_ERROR("Failed to create new socket!\n");
885 			/* TODO: destroy the stream */
886 			errno = ENFILE;
887 			return -1;
888 		}
889 		socket->stream = accepted;
890 		accepted->socket = socket;
891 
892 		/* set socket addr parameters */
893 		socket->saddr.sin_family = AF_INET;
894 		socket->saddr.sin_port = accepted->dport;
895 		socket->saddr.sin_addr.s_addr = accepted->daddr;
896 
897 		/* if monitor is enabled, complete the socket assignment */
898 		if (socket->stream->pair_stream != NULL)
899 			socket->stream->pair_stream->socket = socket;
900 	}
901 
902 	if (!(listener->socket->epoll & MOS_EPOLLET) &&
903 	    !StreamQueueIsEmpty(listener->acceptq))
904 		AddEpollEvent(mtcp->ep,
905 			      USR_SHADOW_EVENT_QUEUE,
906 			      listener->socket, MOS_EPOLLIN);
907 
908 	TRACE_API("Stream %d accepted.\n", accepted->id);
909 
910 	if (addr && addrlen) {
911 		struct sockaddr_in *addr_in = (struct sockaddr_in *)addr;
912 		addr_in->sin_family = AF_INET;
913 		addr_in->sin_port = accepted->dport;
914 		addr_in->sin_addr.s_addr = accepted->daddr;
915 		*addrlen = sizeof(struct sockaddr_in);
916 	}
917 
918 	return accepted->socket->id;
919 }
920 /*----------------------------------------------------------------------------*/
921 int
922 mtcp_init_rss(mctx_t mctx, in_addr_t saddr_base, int num_addr,
923 		in_addr_t daddr, in_addr_t dport)
924 {
925 	mtcp_manager_t mtcp;
926 	addr_pool_t ap;
927 
928 	mtcp = GetMTCPManager(mctx);
929 	if (!mtcp) {
930 		errno = EACCES;
931 		return -1;
932 	}
933 
934 	if (saddr_base == INADDR_ANY) {
935 		int nif_out;
936 
937 		/* for the INADDR_ANY, find the output interface for the destination
938 		   and set the saddr_base as the ip address of the output interface */
939 		nif_out = GetOutputInterface(daddr);
940 		if (nif_out < 0) {
941 			TRACE_DBG("Could not determine nif idx!\n");
942 			errno = EINVAL;
943 			return -1;
944 		}
945 		saddr_base = g_config.mos->netdev_table->ent[nif_out]->ip_addr;
946 	}
947 
948 	ap = CreateAddressPoolPerCore(mctx->cpu, num_cpus,
949 			saddr_base, num_addr, daddr, dport);
950 	if (!ap) {
951 		errno = ENOMEM;
952 		return -1;
953 	}
954 
955 	mtcp->ap = ap;
956 
957 	return 0;
958 }
959 /*----------------------------------------------------------------------------*/
960 int
961 eval_bpf_5tuple(struct sfbpf_program fcode,
962 				in_addr_t saddr, in_port_t sport,
963 				in_addr_t daddr, in_port_t dport) {
964 	uint8_t buf[TOTAL_TCP_HEADER_LEN];
965 	struct ethhdr *ethh;
966 	struct iphdr *iph;
967 	struct tcphdr *tcph;
968 
969 	ethh = (struct ethhdr *)buf;
970 	ethh->h_proto = htons(ETH_P_IP);
971 	iph = (struct iphdr *)(ethh + 1);
972 	iph->ihl = IP_HEADER_LEN >> 2;
973 	iph->version = 4;
974 	iph->tos = 0;
975 	iph->tot_len = htons(IP_HEADER_LEN + TCP_HEADER_LEN);
976 	iph->id = htons(0);
977 	iph->protocol = IPPROTO_TCP;
978 	iph->saddr = saddr;
979 	iph->daddr = daddr;
980 	iph->check = 0;
981 	tcph = (struct tcphdr *)(iph + 1);
982 	tcph->source = sport;
983 	tcph->dest = dport;
984 
985 	return EVAL_BPFFILTER(fcode, (uint8_t *)iph - sizeof(struct ethhdr),
986 						 TOTAL_TCP_HEADER_LEN);
987 }
988 /*----------------------------------------------------------------------------*/
989 int
990 mtcp_connect(mctx_t mctx, int sockid,
991 		const struct sockaddr *addr, socklen_t addrlen)
992 {
993 	mtcp_manager_t mtcp;
994 	socket_map_t socket;
995 	tcp_stream *cur_stream;
996 	struct sockaddr_in *addr_in;
997 	in_addr_t dip;
998 	in_port_t dport;
999 	int is_dyn_bound = FALSE;
1000 	int ret, nif;
1001 	int cnt_match = 0;
1002 	struct mon_listener *walk;
1003 	struct sfbpf_program fcode;
1004 
1005 	cur_stream = NULL;
1006 	mtcp = GetMTCPManager(mctx);
1007 	if (!mtcp) {
1008 		errno = EACCES;
1009 		return -1;
1010 	}
1011 
1012 	if (sockid < 0 || sockid >= g_config.mos->max_concurrency) {
1013 		TRACE_API("Socket id %d out of range.\n", sockid);
1014 		errno = EBADF;
1015 		return -1;
1016 	}
1017 
1018 	if (mtcp->smap[sockid].socktype == MOS_SOCK_UNUSED) {
1019 		TRACE_API("Invalid socket id: %d\n", sockid);
1020 		errno = EBADF;
1021 		return -1;
1022 	}
1023 
1024 	if (mtcp->smap[sockid].socktype != MOS_SOCK_STREAM) {
1025 		TRACE_API("Not an end socket. id: %d\n", sockid);
1026 		errno = ENOTSOCK;
1027 		return -1;
1028 	}
1029 
1030 	if (!addr) {
1031 		TRACE_API("Socket %d: empty address!\n", sockid);
1032 		errno = EFAULT;
1033 		return -1;
1034 	}
1035 
1036 	/* we only allow bind() for AF_INET address */
1037 	if (addr->sa_family != AF_INET || addrlen < sizeof(struct sockaddr_in)) {
1038 		TRACE_API("Socket %d: invalid argument!\n", sockid);
1039 		errno = EAFNOSUPPORT;
1040 		return -1;
1041 	}
1042 
1043 	socket = &mtcp->smap[sockid];
1044 	if (socket->stream) {
1045 		TRACE_API("Socket %d: stream already exist!\n", sockid);
1046 		if (socket->stream->state >= TCP_ST_ESTABLISHED) {
1047 			errno = EISCONN;
1048 		} else {
1049 			errno = EALREADY;
1050 		}
1051 		return -1;
1052 	}
1053 
1054 	addr_in = (struct sockaddr_in *)addr;
1055 	dip = addr_in->sin_addr.s_addr;
1056 	dport = addr_in->sin_port;
1057 
1058 	/* address binding */
1059 	if (socket->opts & MTCP_ADDR_BIND &&
1060 	    socket->saddr.sin_port != INPORT_ANY &&
1061 	    socket->saddr.sin_addr.s_addr != INADDR_ANY) {
1062 		int rss_core;
1063 
1064 		rss_core = GetRSSCPUCore(socket->saddr.sin_addr.s_addr, dip,
1065 				socket->saddr.sin_port, dport, num_queues);
1066 
1067 		if (rss_core != mctx->cpu) {
1068 			errno = EINVAL;
1069 			return -1;
1070 		}
1071 	} else {
1072 		if (mtcp->ap) {
1073 			ret = FetchAddress(mtcp->ap,
1074 					mctx->cpu, num_queues, addr_in, &socket->saddr);
1075 		} else {
1076 			nif = GetOutputInterface(dip);
1077 			if (nif < 0) {
1078 				errno = EINVAL;
1079 				return -1;
1080 			}
1081 			ret = FetchAddress(ap[nif],
1082 					   mctx->cpu, num_queues, addr_in, &socket->saddr);
1083 		}
1084 		if (ret < 0) {
1085 			errno = EAGAIN;
1086 			return -1;
1087 		}
1088 		socket->opts |= MTCP_ADDR_BIND;
1089 		is_dyn_bound = TRUE;
1090 	}
1091 
1092 	cnt_match = 0;
1093 	if (mtcp->num_msp > 0) {
1094 		TAILQ_FOREACH(walk, &mtcp->monitors, link) {
1095 			fcode = walk->stream_syn_fcode;
1096 			if (!(ISSET_BPFFILTER(fcode) &&
1097 				  eval_bpf_5tuple(fcode, socket->saddr.sin_addr.s_addr,
1098 								  socket->saddr.sin_port,
1099 								  dip, dport) == 0)) {
1100 				walk->is_stream_syn_filter_hit = 1; // set the 'filter hit' flag to 1
1101 				cnt_match++;
1102 			}
1103 		}
1104 	}
1105 
1106 	if (mtcp->num_msp > 0 && cnt_match > 0) {
1107 		/* 150820 dhkim: XXX: embedded mode is not verified */
1108 #if 1
1109 		cur_stream = CreateClientTCPStream(mtcp, socket,
1110 						 STREAM_TYPE(MOS_SOCK_STREAM) |
1111 						 STREAM_TYPE(MOS_SOCK_MONITOR_STREAM_ACTIVE),
1112 						 socket->saddr.sin_addr.s_addr,
1113 						 socket->saddr.sin_port, dip, dport, NULL);
1114 #else
1115 		cur_stream = CreateDualTCPStream(mtcp, socket,
1116 						 STREAM_TYPE(MOS_SOCK_STREAM) |
1117 						 STREAM_TYPE(MOS_SOCK_MONITOR_STREAM_ACTIVE),
1118 						 socket->saddr.sin_addr.s_addr,
1119 						 socket->saddr.sin_port, dip, dport, NULL);
1120 #endif
1121 	}
1122 	else
1123 		cur_stream = CreateTCPStream(mtcp, socket, STREAM_TYPE(MOS_SOCK_STREAM),
1124 					     socket->saddr.sin_addr.s_addr,
1125 					     socket->saddr.sin_port, dip, dport, NULL);
1126 	if (!cur_stream) {
1127 		TRACE_ERROR("Socket %d: failed to create tcp_stream!\n", sockid);
1128 		errno = ENOMEM;
1129 		return -1;
1130 	}
1131 
1132 	if (is_dyn_bound)
1133 		cur_stream->is_bound_addr = TRUE;
1134 	cur_stream->sndvar->cwnd = 1;
1135 	cur_stream->sndvar->ssthresh = cur_stream->sndvar->mss * 10;
1136 	cur_stream->side = MOS_SIDE_CLI;
1137 	/* if monitor is enabled, update the pair stream side as well */
1138 	if (cur_stream->pair_stream) {
1139 		cur_stream->pair_stream->side = MOS_SIDE_SVR;
1140 		/*
1141 		 * if buffer management is off, then disable
1142 		 * monitoring tcp ring of server...
1143 		 * if there is even a single monitor asking for
1144 		 * buffer management, enable it (that's why the
1145 		 * need for the loop)
1146 		 */
1147 		cur_stream->pair_stream->buffer_mgmt = BUFMGMT_OFF;
1148 		struct socket_map *walk;
1149 		SOCKQ_FOREACH_START(walk, &cur_stream->msocks) {
1150 			uint8_t bm = walk->monitor_stream->monitor_listener->server_buf_mgmt;
1151 			if (bm > cur_stream->pair_stream->buffer_mgmt) {
1152 				cur_stream->pair_stream->buffer_mgmt = bm;
1153 				break;
1154 			}
1155 		} SOCKQ_FOREACH_END;
1156 	}
1157 
1158 	cur_stream->state = TCP_ST_SYN_SENT;
1159 	cur_stream->cb_events |= MOS_ON_TCP_STATE_CHANGE;
1160 
1161 	TRACE_STATE("Stream %d: TCP_ST_SYN_SENT\n", cur_stream->id);
1162 
1163 	SQ_LOCK(&mtcp->ctx->connect_lock);
1164 	ret = StreamEnqueue(mtcp->connectq, cur_stream);
1165 	SQ_UNLOCK(&mtcp->ctx->connect_lock);
1166 	mtcp->wakeup_flag = TRUE;
1167 	if (ret < 0) {
1168 		TRACE_ERROR("Socket %d: failed to enqueue to conenct queue!\n", sockid);
1169 		SQ_LOCK(&mtcp->ctx->destroyq_lock);
1170 		StreamEnqueue(mtcp->destroyq, cur_stream);
1171 		SQ_UNLOCK(&mtcp->ctx->destroyq_lock);
1172 		errno = EAGAIN;
1173 		return -1;
1174 	}
1175 
1176 	/* if nonblocking socket, return EINPROGRESS */
1177 	if (socket->opts & MTCP_NONBLOCK) {
1178 		errno = EINPROGRESS;
1179 		return -1;
1180 
1181 	} else {
1182 		while (1) {
1183 			if (!cur_stream) {
1184 				TRACE_ERROR("STREAM DESTROYED\n");
1185 				errno = ETIMEDOUT;
1186 				return -1;
1187 			}
1188 			if (cur_stream->state > TCP_ST_ESTABLISHED) {
1189 				TRACE_ERROR("Socket %d: weird state %s\n",
1190 						sockid, TCPStateToString(cur_stream));
1191 				// TODO: how to handle this?
1192 				errno = ENOSYS;
1193 				return -1;
1194 			}
1195 
1196 			if (cur_stream->state == TCP_ST_ESTABLISHED) {
1197 				break;
1198 			}
1199 			usleep(1000);
1200 		}
1201 	}
1202 
1203 	return 0;
1204 }
1205 /*----------------------------------------------------------------------------*/
1206 static inline int
1207 CloseStreamSocket(mctx_t mctx, int sockid)
1208 {
1209 	mtcp_manager_t mtcp;
1210 	tcp_stream *cur_stream;
1211 	int ret;
1212 
1213 	mtcp = GetMTCPManager(mctx);
1214 	if (!mtcp) {
1215 		errno = EACCES;
1216 		return -1;
1217 	}
1218 
1219 	cur_stream = mtcp->smap[sockid].stream;
1220 	if (!cur_stream) {
1221 		TRACE_API("Socket %d: stream does not exist.\n", sockid);
1222 		errno = ENOTCONN;
1223 		return -1;
1224 	}
1225 
1226 	if (cur_stream->closed) {
1227 		TRACE_API("Socket %d (Stream %u): already closed stream\n",
1228 				sockid, cur_stream->id);
1229 		return 0;
1230 	}
1231 	cur_stream->closed = TRUE;
1232 
1233 	TRACE_API("Stream %d: closing the stream.\n", cur_stream->id);
1234 
1235 	/* 141029 dhkim: Check this! */
1236 	cur_stream->socket = NULL;
1237 
1238 	if (cur_stream->state == TCP_ST_CLOSED_RSVD) {
1239 		TRACE_API("Stream %d at TCP_ST_CLOSED_RSVD. destroying the stream.\n",
1240 				cur_stream->id);
1241 		SQ_LOCK(&mtcp->ctx->destroyq_lock);
1242 		StreamEnqueue(mtcp->destroyq, cur_stream);
1243 		mtcp->wakeup_flag = TRUE;
1244 		SQ_UNLOCK(&mtcp->ctx->destroyq_lock);
1245 		return 0;
1246 
1247 	} else if (cur_stream->state == TCP_ST_SYN_SENT) {
1248 #if 1
1249 		SQ_LOCK(&mtcp->ctx->destroyq_lock);
1250 		StreamEnqueue(mtcp->destroyq, cur_stream);
1251 		SQ_UNLOCK(&mtcp->ctx->destroyq_lock);
1252 		mtcp->wakeup_flag = TRUE;
1253 #endif
1254 		return -1;
1255 
1256 	} else if (cur_stream->state != TCP_ST_ESTABLISHED &&
1257 			cur_stream->state != TCP_ST_CLOSE_WAIT) {
1258 		TRACE_API("Stream %d at state %s\n",
1259 				cur_stream->id, TCPStateToString(cur_stream));
1260 		errno = EBADF;
1261 		return -1;
1262 	}
1263 
1264 	SQ_LOCK(&mtcp->ctx->close_lock);
1265 	cur_stream->sndvar->on_closeq = TRUE;
1266 	ret = StreamEnqueue(mtcp->closeq, cur_stream);
1267 	mtcp->wakeup_flag = TRUE;
1268 	SQ_UNLOCK(&mtcp->ctx->close_lock);
1269 
1270 	if (ret < 0) {
1271 		TRACE_ERROR("(NEVER HAPPEN) Failed to enqueue the stream to close.\n");
1272 		errno = EAGAIN;
1273 		return -1;
1274 	}
1275 
1276 	return 0;
1277 }
1278 /*----------------------------------------------------------------------------*/
1279 static inline int
1280 CloseListeningSocket(mctx_t mctx, int sockid)
1281 {
1282 	mtcp_manager_t mtcp;
1283 	struct tcp_listener *listener;
1284 
1285 	mtcp = GetMTCPManager(mctx);
1286 	if (!mtcp) {
1287 		errno = EACCES;
1288 		return -1;
1289 	}
1290 
1291 	listener = mtcp->smap[sockid].listener;
1292 	if (!listener) {
1293 		errno = EINVAL;
1294 		return -1;
1295 	}
1296 
1297 	if (listener->acceptq) {
1298 		DestroyStreamQueue(listener->acceptq);
1299 		listener->acceptq = NULL;
1300 	}
1301 
1302 	pthread_mutex_lock(&listener->accept_lock);
1303 	pthread_cond_signal(&listener->accept_cond);
1304 	pthread_mutex_unlock(&listener->accept_lock);
1305 
1306 	pthread_cond_destroy(&listener->accept_cond);
1307 	pthread_mutex_destroy(&listener->accept_lock);
1308 
1309 	free(listener);
1310 	mtcp->smap[sockid].listener = NULL;
1311 
1312 	return 0;
1313 }
1314 /*----------------------------------------------------------------------------*/
1315 int
1316 mtcp_close(mctx_t mctx, int sockid)
1317 {
1318 	mtcp_manager_t mtcp;
1319 	int ret;
1320 
1321 	mtcp = GetMTCPManager(mctx);
1322 	if (!mtcp) {
1323 		errno = EACCES;
1324 		return -1;
1325 	}
1326 
1327 	if (sockid < 0 || sockid >= g_config.mos->max_concurrency) {
1328 		TRACE_API("Socket id %d out of range.\n", sockid);
1329 		errno = EBADF;
1330 		return -1;
1331 	}
1332 
1333 	if (mtcp->smap[sockid].socktype == MOS_SOCK_UNUSED) {
1334 		TRACE_API("Invalid socket id: %d\n", sockid);
1335 		errno = EBADF;
1336 		return -1;
1337 	}
1338 
1339 	TRACE_API("Socket %d: mtcp_close called.\n", sockid);
1340 
1341 	switch (mtcp->smap[sockid].socktype) {
1342 	case MOS_SOCK_STREAM:
1343 		ret = CloseStreamSocket(mctx, sockid);
1344 		break;
1345 
1346 	case MOS_SOCK_STREAM_LISTEN:
1347 		ret = CloseListeningSocket(mctx, sockid);
1348 		break;
1349 
1350 	case MOS_SOCK_EPOLL:
1351 		ret = CloseEpollSocket(mctx, sockid);
1352 		break;
1353 
1354 	case MOS_SOCK_PIPE:
1355 		ret = PipeClose(mctx, sockid);
1356 		break;
1357 
1358 	default:
1359 		errno = EINVAL;
1360 		ret = -1;
1361 		break;
1362 	}
1363 
1364 	FreeSocket(mctx, sockid, mtcp->smap[sockid].socktype);
1365 
1366 	return ret;
1367 }
1368 /*----------------------------------------------------------------------------*/
1369 int
1370 mtcp_abort(mctx_t mctx, int sockid)
1371 {
1372 	mtcp_manager_t mtcp;
1373 	tcp_stream *cur_stream;
1374 	int ret;
1375 
1376 	mtcp = GetMTCPManager(mctx);
1377 	if (!mtcp) {
1378 		errno = EACCES;
1379 		return -1;
1380 	}
1381 
1382 	if (sockid < 0 || sockid >= g_config.mos->max_concurrency) {
1383 		TRACE_API("Socket id %d out of range.\n", sockid);
1384 		errno = EBADF;
1385 		return -1;
1386 	}
1387 
1388 	if (mtcp->smap[sockid].socktype == MOS_SOCK_UNUSED) {
1389 		TRACE_API("Invalid socket id: %d\n", sockid);
1390 		errno = EBADF;
1391 		return -1;
1392 	}
1393 
1394 	if (mtcp->smap[sockid].socktype != MOS_SOCK_STREAM) {
1395 		TRACE_API("Not an end socket. id: %d\n", sockid);
1396 		errno = ENOTSOCK;
1397 		return -1;
1398 	}
1399 
1400 	cur_stream = mtcp->smap[sockid].stream;
1401 	if (!cur_stream) {
1402 		TRACE_API("Stream %d: does not exist.\n", sockid);
1403 		errno = ENOTCONN;
1404 		return -1;
1405 	}
1406 
1407 	TRACE_API("Socket %d: mtcp_abort()\n", sockid);
1408 
1409 	FreeSocket(mctx, sockid, mtcp->smap[sockid].socktype);
1410 	cur_stream->socket = NULL;
1411 
1412 	if (cur_stream->state == TCP_ST_CLOSED_RSVD) {
1413 		TRACE_API("Stream %d: connection already reset.\n", sockid);
1414 		return ERROR;
1415 
1416 	} else if (cur_stream->state == TCP_ST_SYN_SENT) {
1417 		/* TODO: this should notify event failure to all
1418 		   previous read() or write() calls */
1419 		cur_stream->state = TCP_ST_CLOSED_RSVD;
1420 		cur_stream->close_reason = TCP_ACTIVE_CLOSE;
1421 		cur_stream->cb_events |= MOS_ON_TCP_STATE_CHANGE;
1422 		SQ_LOCK(&mtcp->ctx->destroyq_lock);
1423 		StreamEnqueue(mtcp->destroyq, cur_stream);
1424 		SQ_UNLOCK(&mtcp->ctx->destroyq_lock);
1425 		mtcp->wakeup_flag = TRUE;
1426 		return 0;
1427 
1428 	} else if (cur_stream->state == TCP_ST_CLOSING ||
1429 			cur_stream->state == TCP_ST_LAST_ACK ||
1430 			cur_stream->state == TCP_ST_TIME_WAIT) {
1431 		cur_stream->state = TCP_ST_CLOSED_RSVD;
1432 		cur_stream->close_reason = TCP_ACTIVE_CLOSE;
1433 		cur_stream->cb_events |= MOS_ON_TCP_STATE_CHANGE;
1434 		SQ_LOCK(&mtcp->ctx->destroyq_lock);
1435 		StreamEnqueue(mtcp->destroyq, cur_stream);
1436 		SQ_UNLOCK(&mtcp->ctx->destroyq_lock);
1437 		mtcp->wakeup_flag = TRUE;
1438 		return 0;
1439 	}
1440 
1441 	/* the stream structure will be destroyed after sending RST */
1442 	if (cur_stream->sndvar->on_resetq) {
1443 		TRACE_ERROR("Stream %d: calling mtcp_abort() "
1444 				"when in reset queue.\n", sockid);
1445 		errno = ECONNRESET;
1446 		return -1;
1447 	}
1448 	SQ_LOCK(&mtcp->ctx->reset_lock);
1449 	cur_stream->sndvar->on_resetq = TRUE;
1450 	ret = StreamEnqueue(mtcp->resetq, cur_stream);
1451 	SQ_UNLOCK(&mtcp->ctx->reset_lock);
1452 	mtcp->wakeup_flag = TRUE;
1453 
1454 	if (ret < 0) {
1455 		TRACE_ERROR("(NEVER HAPPEN) Failed to enqueue the stream to close.\n");
1456 		errno = EAGAIN;
1457 		return -1;
1458 	}
1459 
1460 	return 0;
1461 }
1462 /*----------------------------------------------------------------------------*/
1463 static inline int
1464 PeekForUser(mtcp_manager_t mtcp, tcp_stream *cur_stream, char *buf, int len)
1465 {
1466 	struct tcp_recv_vars *rcvvar = cur_stream->rcvvar;
1467 	int copylen;
1468 	tcprb_t *rb = rcvvar->rcvbuf;
1469 
1470 	if ((copylen = tcprb_ppeek(rb, (uint8_t *)buf, len, rb->pile)) <= 0) {
1471 		errno = EAGAIN;
1472 		return -1;
1473 	}
1474 
1475 	return copylen;
1476 }
1477 /*----------------------------------------------------------------------------*/
1478 static inline int
1479 CopyToUser(mtcp_manager_t mtcp, tcp_stream *cur_stream, char *buf, int len)
1480 {
1481 	struct tcp_recv_vars *rcvvar = cur_stream->rcvvar;
1482 	int copylen;
1483 	tcprb_t *rb = rcvvar->rcvbuf;
1484 	if ((copylen = tcprb_ppeek(rb, (uint8_t *)buf, len, rb->pile)) <= 0) {
1485 		errno = EAGAIN;
1486 		return -1;
1487 	}
1488 	tcprb_setpile(rb, rb->pile + copylen);
1489 
1490 	rcvvar->rcv_wnd = rb->len - tcprb_cflen(rb);
1491 	//printf("rcv_wnd: %d\n", rcvvar->rcv_wnd);
1492 
1493 	/* Advertise newly freed receive buffer */
1494 	if (cur_stream->need_wnd_adv) {
1495 		if (rcvvar->rcv_wnd > cur_stream->sndvar->eff_mss) {
1496 			if (!cur_stream->sndvar->on_ackq) {
1497 				SQ_LOCK(&mtcp->ctx->ackq_lock);
1498 				cur_stream->sndvar->on_ackq = TRUE;
1499 				StreamEnqueue(mtcp->ackq, cur_stream); /* this always success */
1500 				SQ_UNLOCK(&mtcp->ctx->ackq_lock);
1501 				cur_stream->need_wnd_adv = FALSE;
1502 				mtcp->wakeup_flag = TRUE;
1503 			}
1504 		}
1505 	}
1506 
1507 	return copylen;
1508 }
1509 /*----------------------------------------------------------------------------*/
1510 ssize_t
1511 mtcp_recv(mctx_t mctx, int sockid, char *buf, size_t len, int flags)
1512 {
1513 	mtcp_manager_t mtcp;
1514 	socket_map_t socket;
1515 	tcp_stream *cur_stream;
1516 	struct tcp_recv_vars *rcvvar;
1517 	int event_remaining, merged_len;
1518 	int ret;
1519 
1520 	mtcp = GetMTCPManager(mctx);
1521 	if (!mtcp) {
1522 		errno = EACCES;
1523 		return -1;
1524 	}
1525 
1526 	if (sockid < 0 || sockid >= g_config.mos->max_concurrency) {
1527 		TRACE_API("Socket id %d out of range.\n", sockid);
1528 		errno = EBADF;
1529 		return -1;
1530 	}
1531 
1532 	socket = &mtcp->smap[sockid];
1533 	if (socket->socktype == MOS_SOCK_UNUSED) {
1534 		TRACE_API("Invalid socket id: %d\n", sockid);
1535 		errno = EBADF;
1536 		return -1;
1537 	}
1538 
1539 	if (socket->socktype == MOS_SOCK_PIPE) {
1540 		return PipeRead(mctx, sockid, buf, len);
1541 	}
1542 
1543 	if (socket->socktype != MOS_SOCK_STREAM) {
1544 		TRACE_API("Not an end socket. id: %d\n", sockid);
1545 		errno = ENOTSOCK;
1546 		return -1;
1547 	}
1548 
1549 	/* stream should be in ESTABLISHED, FIN_WAIT_1, FIN_WAIT_2, CLOSE_WAIT */
1550 	cur_stream = socket->stream;
1551 	if (!cur_stream || !cur_stream->rcvvar || !cur_stream->rcvvar->rcvbuf ||
1552 	    !(cur_stream->state >= TCP_ST_ESTABLISHED &&
1553 	      cur_stream->state <= TCP_ST_CLOSE_WAIT)) {
1554 		errno = ENOTCONN;
1555 		return -1;
1556 	}
1557 
1558 	rcvvar = cur_stream->rcvvar;
1559 
1560 	merged_len = tcprb_cflen(rcvvar->rcvbuf);
1561 
1562 	/* if CLOSE_WAIT, return 0 if there is no payload */
1563 	if (cur_stream->state == TCP_ST_CLOSE_WAIT) {
1564 		if (!rcvvar->rcvbuf)
1565 			return 0;
1566 
1567 		if (merged_len == 0)
1568 			return 0;
1569 	}
1570 
1571 	/* return EAGAIN if no receive buffer */
1572 	if (socket->opts & MTCP_NONBLOCK) {
1573 		if (!rcvvar->rcvbuf || merged_len == 0) {
1574 			errno = EAGAIN;
1575 			return -1;
1576 		}
1577 	}
1578 
1579 	SBUF_LOCK(&rcvvar->read_lock);
1580 
1581 	switch (flags) {
1582 	case 0:
1583 		ret = CopyToUser(mtcp, cur_stream, buf, len);
1584 		break;
1585 	case MSG_PEEK:
1586 		ret = PeekForUser(mtcp, cur_stream, buf, len);
1587 		break;
1588 	default:
1589 		SBUF_UNLOCK(&rcvvar->read_lock);
1590 		ret = -1;
1591 		errno = EINVAL;
1592 		return ret;
1593 	}
1594 
1595 	merged_len = tcprb_cflen(rcvvar->rcvbuf);
1596 	event_remaining = FALSE;
1597 	/* if there are remaining payload, generate EPOLLIN */
1598 	/* (may due to insufficient user buffer) */
1599 	if (socket->epoll & MOS_EPOLLIN) {
1600 		if (!(socket->epoll & MOS_EPOLLET) && merged_len > 0) {
1601 			event_remaining = TRUE;
1602 		}
1603 	}
1604 	/* if waiting for close, notify it if no remaining data */
1605 	if (cur_stream->state == TCP_ST_CLOSE_WAIT &&
1606 	    merged_len == 0 && ret > 0) {
1607 		event_remaining = TRUE;
1608 	}
1609 
1610 	SBUF_UNLOCK(&rcvvar->read_lock);
1611 
1612 	if (event_remaining) {
1613 		if (socket->epoll) {
1614 			AddEpollEvent(mtcp->ep,
1615 				      USR_SHADOW_EVENT_QUEUE, socket, MOS_EPOLLIN);
1616 		}
1617 	}
1618 
1619 	TRACE_API("Stream %d: mtcp_recv() returning %d\n", cur_stream->id, ret);
1620 	return ret;
1621 }
1622 /*----------------------------------------------------------------------------*/
1623 inline ssize_t
1624 mtcp_read(mctx_t mctx, int sockid, char *buf, size_t len)
1625 {
1626 	return mtcp_recv(mctx, sockid, buf, len, 0);
1627 }
1628 /*----------------------------------------------------------------------------*/
1629 ssize_t
1630 mtcp_readv(mctx_t mctx, int sockid, const struct iovec *iov, int numIOV)
1631 {
1632 	mtcp_manager_t mtcp;
1633 	socket_map_t socket;
1634 	tcp_stream *cur_stream;
1635 	struct tcp_recv_vars *rcvvar;
1636 	int ret, bytes_read, i;
1637 	int event_remaining, merged_len;
1638 
1639 	mtcp = GetMTCPManager(mctx);
1640 	if (!mtcp) {
1641 		errno = EACCES;
1642 		return -1;
1643 	}
1644 
1645 	if (sockid < 0 || sockid >= g_config.mos->max_concurrency) {
1646 		TRACE_API("Socket id %d out of range.\n", sockid);
1647 		errno = EBADF;
1648 		return -1;
1649 	}
1650 
1651 	socket = &mtcp->smap[sockid];
1652 	if (socket->socktype == MOS_SOCK_UNUSED) {
1653 		TRACE_API("Invalid socket id: %d\n", sockid);
1654 		errno = EBADF;
1655 		return -1;
1656 	}
1657 
1658 	if (socket->socktype != MOS_SOCK_STREAM) {
1659 		TRACE_API("Not an end socket. id: %d\n", sockid);
1660 		errno = ENOTSOCK;
1661 		return -1;
1662 	}
1663 
1664 	/* stream should be in ESTABLISHED, FIN_WAIT_1, FIN_WAIT_2, CLOSE_WAIT */
1665 	cur_stream = socket->stream;
1666 	if (!cur_stream ||
1667 			!(cur_stream->state >= TCP_ST_ESTABLISHED &&
1668 			  cur_stream->state <= TCP_ST_CLOSE_WAIT)) {
1669 		errno = ENOTCONN;
1670 		return -1;
1671 	}
1672 
1673 	rcvvar = cur_stream->rcvvar;
1674 
1675 	merged_len = tcprb_cflen(rcvvar->rcvbuf);
1676 
1677 	/* if CLOSE_WAIT, return 0 if there is no payload */
1678 	if (cur_stream->state == TCP_ST_CLOSE_WAIT) {
1679 		if (!rcvvar->rcvbuf)
1680 			return 0;
1681 
1682 		if (merged_len == 0)
1683 			return 0;
1684 	}
1685 
1686 	/* return EAGAIN if no receive buffer */
1687 	if (socket->opts & MTCP_NONBLOCK) {
1688 		if (!rcvvar->rcvbuf || merged_len == 0) {
1689 			errno = EAGAIN;
1690 			return -1;
1691 		}
1692 	}
1693 
1694 	SBUF_LOCK(&rcvvar->read_lock);
1695 
1696 	/* read and store the contents to the vectored buffers */
1697 	bytes_read = 0;
1698 	for (i = 0; i < numIOV; i++) {
1699 		if (iov[i].iov_len <= 0)
1700 			continue;
1701 
1702 		ret = CopyToUser(mtcp, cur_stream, iov[i].iov_base, iov[i].iov_len);
1703 		if (ret <= 0)
1704 			break;
1705 
1706 		bytes_read += ret;
1707 
1708 		if (ret < iov[i].iov_len)
1709 			break;
1710 	}
1711 
1712 	merged_len = tcprb_cflen(rcvvar->rcvbuf);
1713 
1714 	event_remaining = FALSE;
1715 	/* if there are remaining payload, generate read event */
1716 	/* (may due to insufficient user buffer) */
1717 	if (socket->epoll & MOS_EPOLLIN) {
1718 		if (!(socket->epoll & MOS_EPOLLET) && merged_len > 0) {
1719 			event_remaining = TRUE;
1720 		}
1721 	}
1722 	/* if waiting for close, notify it if no remaining data */
1723 	if (cur_stream->state == TCP_ST_CLOSE_WAIT &&
1724 			merged_len == 0 && bytes_read > 0) {
1725 		event_remaining = TRUE;
1726 	}
1727 
1728 	SBUF_UNLOCK(&rcvvar->read_lock);
1729 
1730 	if(event_remaining) {
1731 		if (socket->epoll & MOS_EPOLLIN && !(socket->epoll & MOS_EPOLLET)) {
1732 			AddEpollEvent(mtcp->ep,
1733 				      USR_SHADOW_EVENT_QUEUE, socket, MOS_EPOLLIN);
1734 		}
1735 	}
1736 
1737 	TRACE_API("Stream %d: mtcp_readv() returning %d\n",
1738 			cur_stream->id, bytes_read);
1739 	return bytes_read;
1740 }
1741 /*----------------------------------------------------------------------------*/
1742 static inline int
1743 CopyFromUser(mtcp_manager_t mtcp, tcp_stream *cur_stream, const char *buf, int len)
1744 {
1745 	struct tcp_send_vars *sndvar = cur_stream->sndvar;
1746 	int sndlen;
1747 	int ret;
1748 
1749 	sndlen = MIN((int)sndvar->snd_wnd, len);
1750 	if (sndlen <= 0) {
1751 		errno = EAGAIN;
1752 		return -1;
1753 	}
1754 
1755 	/* allocate send buffer if not exist */
1756 	if (!sndvar->sndbuf) {
1757 		sndvar->sndbuf = SBInit(mtcp->rbm_snd, sndvar->iss + 1);
1758 		if (!sndvar->sndbuf) {
1759 			cur_stream->close_reason = TCP_NO_MEM;
1760 			/* notification may not required due to -1 return */
1761 			errno = ENOMEM;
1762 			return -1;
1763 		}
1764 	}
1765 
1766 	ret = SBPut(mtcp->rbm_snd, sndvar->sndbuf, buf, sndlen);
1767 	assert(ret == sndlen);
1768 	sndvar->snd_wnd = sndvar->sndbuf->size - sndvar->sndbuf->len;
1769 	if (ret <= 0) {
1770 		TRACE_ERROR("SBPut failed. reason: %d (sndlen: %u, len: %u\n",
1771 				ret, sndlen, sndvar->sndbuf->len);
1772 		errno = EAGAIN;
1773 		return -1;
1774 	}
1775 
1776 	if (sndvar->snd_wnd <= 0) {
1777 		TRACE_SNDBUF("%u Sending buffer became full!! snd_wnd: %u\n",
1778 				cur_stream->id, sndvar->snd_wnd);
1779 	}
1780 
1781 	return ret;
1782 }
1783 /*----------------------------------------------------------------------------*/
1784 ssize_t
1785 mtcp_write(mctx_t mctx, int sockid, const char *buf, size_t len)
1786 {
1787 	mtcp_manager_t mtcp;
1788 	socket_map_t socket;
1789 	tcp_stream *cur_stream;
1790 	struct tcp_send_vars *sndvar;
1791 	int ret;
1792 
1793 	mtcp = GetMTCPManager(mctx);
1794 	if (!mtcp) {
1795 		errno = EACCES;
1796 		return -1;
1797 	}
1798 
1799 	if (sockid < 0 || sockid >= g_config.mos->max_concurrency) {
1800 		TRACE_API("Socket id %d out of range.\n", sockid);
1801 		errno = EBADF;
1802 		return -1;
1803 	}
1804 
1805 	socket = &mtcp->smap[sockid];
1806 	if (socket->socktype == MOS_SOCK_UNUSED) {
1807 		TRACE_API("Invalid socket id: %d\n", sockid);
1808 		errno = EBADF;
1809 		return -1;
1810 	}
1811 
1812 	if (socket->socktype == MOS_SOCK_PIPE) {
1813 		return PipeWrite(mctx, sockid, buf, len);
1814 	}
1815 
1816 	if (socket->socktype != MOS_SOCK_STREAM) {
1817 		TRACE_API("Not an end socket. id: %d\n", sockid);
1818 		errno = ENOTSOCK;
1819 		return -1;
1820 	}
1821 
1822 	cur_stream = socket->stream;
1823 	if (!cur_stream ||
1824 			!(cur_stream->state == TCP_ST_ESTABLISHED ||
1825 			  cur_stream->state == TCP_ST_CLOSE_WAIT)) {
1826 		errno = ENOTCONN;
1827 		return -1;
1828 	}
1829 
1830 	if (len <= 0) {
1831 		if (socket->opts & MTCP_NONBLOCK) {
1832 			errno = EAGAIN;
1833 			return -1;
1834 		} else {
1835 			return 0;
1836 		}
1837 	}
1838 
1839 	sndvar = cur_stream->sndvar;
1840 
1841 	SBUF_LOCK(&sndvar->write_lock);
1842 	ret = CopyFromUser(mtcp, cur_stream, buf, len);
1843 
1844 	SBUF_UNLOCK(&sndvar->write_lock);
1845 
1846 	if (ret > 0 && !(sndvar->on_sendq || sndvar->on_send_list)) {
1847 		SQ_LOCK(&mtcp->ctx->sendq_lock);
1848 		sndvar->on_sendq = TRUE;
1849 		StreamEnqueue(mtcp->sendq, cur_stream);		/* this always success */
1850 		SQ_UNLOCK(&mtcp->ctx->sendq_lock);
1851 		mtcp->wakeup_flag = TRUE;
1852 	}
1853 
1854 	if (ret == 0 && (socket->opts & MTCP_NONBLOCK)) {
1855 		ret = -1;
1856 		errno = EAGAIN;
1857 	}
1858 
1859 	/* if there are remaining sending buffer, generate write event */
1860 	if (sndvar->snd_wnd > 0) {
1861 		if (socket->epoll & MOS_EPOLLOUT && !(socket->epoll & MOS_EPOLLET)) {
1862 			AddEpollEvent(mtcp->ep,
1863 				      USR_SHADOW_EVENT_QUEUE, socket, MOS_EPOLLOUT);
1864 		}
1865 	}
1866 
1867 	TRACE_API("Stream %d: mtcp_write() returning %d\n", cur_stream->id, ret);
1868 	return ret;
1869 }
1870 /*----------------------------------------------------------------------------*/
1871 ssize_t
1872 mtcp_writev(mctx_t mctx, int sockid, const struct iovec *iov, int numIOV)
1873 {
1874 	mtcp_manager_t mtcp;
1875 	socket_map_t socket;
1876 	tcp_stream *cur_stream;
1877 	struct tcp_send_vars *sndvar;
1878 	int ret, to_write, i;
1879 
1880 	mtcp = GetMTCPManager(mctx);
1881 	if (!mtcp) {
1882 		errno = EACCES;
1883 		return -1;
1884 	}
1885 
1886 	if (sockid < 0 || sockid >= g_config.mos->max_concurrency) {
1887 		TRACE_API("Socket id %d out of range.\n", sockid);
1888 		errno = EBADF;
1889 		return -1;
1890 	}
1891 
1892 	socket = &mtcp->smap[sockid];
1893 	if (socket->socktype == MOS_SOCK_UNUSED) {
1894 		TRACE_API("Invalid socket id: %d\n", sockid);
1895 		errno = EBADF;
1896 		return -1;
1897 	}
1898 
1899 	if (socket->socktype != MOS_SOCK_STREAM) {
1900 		TRACE_API("Not an end socket. id: %d\n", sockid);
1901 		errno = ENOTSOCK;
1902 		return -1;
1903 	}
1904 
1905 	cur_stream = socket->stream;
1906 	if (!cur_stream ||
1907 			!(cur_stream->state == TCP_ST_ESTABLISHED ||
1908 			  cur_stream->state == TCP_ST_CLOSE_WAIT)) {
1909 		errno = ENOTCONN;
1910 		return -1;
1911 	}
1912 
1913 	sndvar = cur_stream->sndvar;
1914 	SBUF_LOCK(&sndvar->write_lock);
1915 
1916 	/* write from the vectored buffers */
1917 	to_write = 0;
1918 	for (i = 0; i < numIOV; i++) {
1919 		if (iov[i].iov_len <= 0)
1920 			continue;
1921 
1922 		ret = CopyFromUser(mtcp, cur_stream, iov[i].iov_base, iov[i].iov_len);
1923 		if (ret <= 0)
1924 			break;
1925 
1926 		to_write += ret;
1927 
1928 		if (ret < iov[i].iov_len)
1929 			break;
1930 	}
1931 	SBUF_UNLOCK(&sndvar->write_lock);
1932 
1933 	if (to_write > 0 && !(sndvar->on_sendq || sndvar->on_send_list)) {
1934 		SQ_LOCK(&mtcp->ctx->sendq_lock);
1935 		sndvar->on_sendq = TRUE;
1936 		StreamEnqueue(mtcp->sendq, cur_stream);		/* this always success */
1937 		SQ_UNLOCK(&mtcp->ctx->sendq_lock);
1938 		mtcp->wakeup_flag = TRUE;
1939 	}
1940 
1941 	if (to_write == 0 && (socket->opts & MTCP_NONBLOCK)) {
1942 		to_write = -1;
1943 		errno = EAGAIN;
1944 	}
1945 
1946 	/* if there are remaining sending buffer, generate write event */
1947 	if (sndvar->snd_wnd > 0) {
1948 		if (socket->epoll & MOS_EPOLLOUT && !(socket->epoll & MOS_EPOLLET)) {
1949 			AddEpollEvent(mtcp->ep,
1950 				      USR_SHADOW_EVENT_QUEUE, socket, MOS_EPOLLOUT);
1951 		}
1952 	}
1953 
1954 	TRACE_API("Stream %d: mtcp_writev() returning %d\n",
1955 			cur_stream->id, to_write);
1956 	return to_write;
1957 }
1958 /*----------------------------------------------------------------------------*/
1959 uint32_t
1960 mtcp_get_connection_cnt(mctx_t mctx)
1961 {
1962 	mtcp_manager_t mtcp;
1963 	mtcp = GetMTCPManager(mctx);
1964 	if (!mtcp) {
1965 		errno = EACCES;
1966 		return -1;
1967 	}
1968 
1969 	if (mtcp->num_msp > 0)
1970 		return mtcp->flow_cnt / 2;
1971 	else
1972 		return mtcp->flow_cnt;
1973 }
1974 /*----------------------------------------------------------------------------*/
1975