xref: /mOS-networking-stack/core/src/api.c (revision dcdbbb98)
1 #include <sys/queue.h>
2 #include <sys/ioctl.h>
3 #include <limits.h>
4 #include <unistd.h>
5 #include <assert.h>
6 #include <string.h>
7 
8 #ifdef DARWIN
9 #include <netinet/tcp.h>
10 #include <netinet/ip.h>
11 #include <netinet/if_ether.h>
12 #endif
13 
14 #include "mtcp.h"
15 #include "mtcp_api.h"
16 #include "tcp_in.h"
17 #include "tcp_stream.h"
18 #include "tcp_out.h"
19 #include "ip_out.h"
20 #include "eventpoll.h"
21 #include "pipe.h"
22 #include "fhash.h"
23 #include "addr_pool.h"
24 #include "rss.h"
25 #include "config.h"
26 #include "debug.h"
27 #include "eventpoll.h"
28 #include "mos_api.h"
29 #include "tcp_rb.h"
30 
31 #define MAX(a, b) ((a)>(b)?(a):(b))
32 #define MIN(a, b) ((a)<(b)?(a):(b))
33 
34 /*----------------------------------------------------------------------------*/
35 /** Stop monitoring the socket! (function prototype)
36  * @param [in] mctx: mtcp context
37  * @param [in] sock: monitoring stream socket id
38  * @param [in] side: side of monitoring (client side, server side or both)
39  *
40  * This function is now DEPRECATED and is only used within mOS core...
41  */
42 int
43 mtcp_cb_stop(mctx_t mctx, int sock, int side);
44 /*----------------------------------------------------------------------------*/
45 /** Reset the connection (send RST packets to both sides)
46  *  (We need to decide the API for this.)
47  */
48 //int
49 //mtcp_cb_reset(mctx_t mctx, int sock, int side);
50 /*----------------------------------------------------------------------------*/
51 inline mtcp_manager_t
52 GetMTCPManager(mctx_t mctx)
53 {
54 	if (!mctx) {
55 		errno = EACCES;
56 		return NULL;
57 	}
58 
59 	if (mctx->cpu < 0 || mctx->cpu >= num_cpus) {
60 		errno = EINVAL;
61 		return NULL;
62 	}
63 
64 	if (!g_mtcp[mctx->cpu] || g_mtcp[mctx->cpu]->ctx->done || g_mtcp[mctx->cpu]->ctx->exit) {
65 		errno = EPERM;
66 		return NULL;
67 	}
68 
69 	return g_mtcp[mctx->cpu];
70 }
71 /*----------------------------------------------------------------------------*/
72 static inline int
73 GetSocketError(socket_map_t socket, void *optval, socklen_t *optlen)
74 {
75 	tcp_stream *cur_stream;
76 
77 	if (!socket->stream) {
78 		errno = EBADF;
79 		return -1;
80 	}
81 
82 	cur_stream = socket->stream;
83 	if (cur_stream->state == TCP_ST_CLOSED_RSVD) {
84 		if (cur_stream->close_reason == TCP_TIMEDOUT ||
85 				cur_stream->close_reason == TCP_CONN_FAIL ||
86 				cur_stream->close_reason == TCP_CONN_LOST) {
87 			*(int *)optval = ETIMEDOUT;
88 			*optlen = sizeof(int);
89 
90 			return 0;
91 		}
92 	}
93 
94 	if (cur_stream->state == TCP_ST_CLOSE_WAIT ||
95 			cur_stream->state == TCP_ST_CLOSED_RSVD) {
96 		if (cur_stream->close_reason == TCP_RESET) {
97 			*(int *)optval = ECONNRESET;
98 			*optlen = sizeof(int);
99 
100 			return 0;
101 		}
102 	}
103 
104 	if (cur_stream->state == TCP_ST_SYN_SENT &&
105 	    errno == EINPROGRESS) {
106 		*(int *)optval = errno;
107 		*optlen = sizeof(int);
108 
109 		return -1;
110 	}
111 
112 	/*
113 	 * `base case`: If socket sees no so_error, then
114 	 * this also means close_reason will always be
115 	 * TCP_NOT_CLOSED.
116 	 */
117 	if (cur_stream->close_reason == TCP_NOT_CLOSED) {
118 		*(int *)optval = 0;
119 		*optlen = sizeof(int);
120 
121 		return 0;
122 	}
123 
124 	errno = ENOSYS;
125 	return -1;
126 }
127 /*----------------------------------------------------------------------------*/
128 int
129 mtcp_getsockname(mctx_t mctx, int sockid, struct sockaddr *addr,
130 		 socklen_t *addrlen)
131 {
132 	mtcp_manager_t mtcp;
133 	socket_map_t socket;
134 
135 	mtcp = GetMTCPManager(mctx);
136 	if (!mtcp) {
137 		return -1;
138 	}
139 
140 	if (sockid < 0 || sockid >= g_config.mos->max_concurrency) {
141 		TRACE_API("Socket id %d out of range.\n", sockid);
142 		errno = EBADF;
143 		return -1;
144 	}
145 
146 	socket = &mtcp->smap[sockid];
147 	if (socket->socktype == MOS_SOCK_UNUSED) {
148 		TRACE_API("Invalid socket id: %d\n", sockid);
149 		errno = EBADF;
150 		return -1;
151 	}
152 
153 	if (*addrlen <= 0) {
154 		TRACE_API("Invalid addrlen: %d\n", *addrlen);
155 		errno = EINVAL;
156 		return -1;
157 	}
158 
159 	if (socket->socktype != MOS_SOCK_STREAM_LISTEN &&
160 	    socket->socktype != MOS_SOCK_STREAM) {
161 		TRACE_API("Invalid socket id: %d\n", sockid);
162 		errno = ENOTSOCK;
163 		return -1;
164 	}
165 
166 	*(struct sockaddr_in *)addr = socket->saddr;
167         *addrlen = sizeof(socket->saddr);
168 
169 	return 0;
170 }
171 /*----------------------------------------------------------------------------*/
172 int
173 mtcp_getsockopt(mctx_t mctx, int sockid, int level,
174 		int optname, void *optval, socklen_t *optlen)
175 {
176 	mtcp_manager_t mtcp;
177 	socket_map_t socket;
178 
179 	mtcp = GetMTCPManager(mctx);
180 	if (!mtcp) {
181 		errno = EACCES;
182 		return -1;
183 	}
184 
185 	if (sockid < 0 || sockid >= g_config.mos->max_concurrency) {
186 		TRACE_API("Socket id %d out of range.\n", sockid);
187 		errno = EBADF;
188 		return -1;
189 	}
190 
191 	switch (level) {
192 	case SOL_SOCKET:
193 		socket = &mtcp->smap[sockid];
194 		if (socket->socktype == MOS_SOCK_UNUSED) {
195 			TRACE_API("Invalid socket id: %d\n", sockid);
196 			errno = EBADF;
197 			return -1;
198 		}
199 
200 		if (socket->socktype != MOS_SOCK_STREAM_LISTEN &&
201 		    socket->socktype != MOS_SOCK_STREAM) {
202 			TRACE_API("Invalid socket id: %d\n", sockid);
203 			errno = ENOTSOCK;
204 			return -1;
205 		}
206 
207 		if (optname == SO_ERROR) {
208 			if (socket->socktype == MOS_SOCK_STREAM) {
209 				return GetSocketError(socket, optval, optlen);
210 			}
211 		}
212 		break;
213 	case SOL_MONSOCKET:
214 		/* check if the calling thread is in MOS context */
215 		if (mtcp->ctx->thread != pthread_self()) {
216 			errno = EPERM;
217 			return -1;
218 		}
219 		/*
220 		 * All options will only work for active
221 		 * monitor stream sockets
222 		 */
223 		socket = &mtcp->msmap[sockid];
224 		if (socket->socktype != MOS_SOCK_MONITOR_STREAM_ACTIVE) {
225 			TRACE_API("Invalid socket id: %d\n", sockid);
226 			errno = ENOTSOCK;
227 			return -1;
228 		}
229 
230 		switch (optname) {
231 		case MOS_FRAGINFO_CLIBUF:
232 			return GetFragInfo(socket, MOS_SIDE_CLI, optval, optlen);
233 		case MOS_FRAGINFO_SVRBUF:
234 			return GetFragInfo(socket, MOS_SIDE_SVR, optval, optlen);
235 		case MOS_INFO_CLIBUF:
236 			return GetBufInfo(socket, MOS_SIDE_CLI, optval, optlen);
237 		case MOS_INFO_SVRBUF:
238 			return GetBufInfo(socket, MOS_SIDE_SVR, optval, optlen);
239 		case MOS_TCP_STATE_CLI:
240 			return GetTCPState(socket->monitor_stream->stream, MOS_SIDE_CLI,
241 							   optval, optlen);
242 		case MOS_TCP_STATE_SVR:
243 			return GetTCPState(socket->monitor_stream->stream, MOS_SIDE_SVR,
244 							   optval, optlen);
245 		case MOS_TIMESTAMP:
246 			return GetLastTimestamp(socket->monitor_stream->stream,
247 						(uint32_t *)optval,
248 						optlen);
249 		default:
250 		  TRACE_API("can't recognize the optname=%d\n", optname);
251 		  assert(0);
252 		}
253 		break;
254 	}
255 	errno = ENOSYS;
256 	return -1;
257 }
258 /*----------------------------------------------------------------------------*/
259 int
260 mtcp_setsockopt(mctx_t mctx, int sockid, int level,
261 		int optname, const void *optval, socklen_t optlen)
262 {
263 	mtcp_manager_t mtcp;
264 	socket_map_t socket;
265 	tcprb_t *rb;
266 
267 	mtcp = GetMTCPManager(mctx);
268 	if (!mtcp) {
269 		errno = EACCES;
270 		return -1;
271 	}
272 
273 	if (sockid < 0 || sockid >= g_config.mos->max_concurrency) {
274 		TRACE_API("Socket id %d out of range.\n", sockid);
275 		errno = EBADF;
276 		return -1;
277 	}
278 
279 	switch (level) {
280 	case SOL_SOCKET:
281 		socket = &mtcp->smap[sockid];
282 		if (socket->socktype == MOS_SOCK_UNUSED) {
283 			TRACE_API("Invalid socket id: %d\n", sockid);
284 			errno = EBADF;
285 			return -1;
286 		}
287 
288 		if (socket->socktype != MOS_SOCK_STREAM_LISTEN &&
289 		    socket->socktype != MOS_SOCK_STREAM) {
290 			TRACE_API("Invalid socket id: %d\n", sockid);
291 			errno = ENOTSOCK;
292 			return -1;
293 		}
294 		break;
295 	case SOL_MONSOCKET:
296 		socket = &mtcp->msmap[sockid];
297 		/*
298 		 * checking of calling thread to be in MOS context is
299 		 * disabled since both optnames can be called from
300 		 * `application' context (on passive sockets)
301 		 */
302 		/*
303 		 * if (mtcp->ctx->thread != pthread_self())
304 		 * return -1;
305 		 */
306 
307 		switch (optname) {
308 		case MOS_CLIOVERLAP:
309 			rb = (socket->monitor_stream->stream->side == MOS_SIDE_CLI) ?
310 				socket->monitor_stream->stream->rcvvar->rcvbuf :
311 			socket->monitor_stream->stream->pair_stream->rcvvar->rcvbuf;
312 			if (rb == NULL) {
313 				errno = EFAULT;
314 				return -1;
315 			}
316 			if (tcprb_setpolicy(rb, *(uint8_t *)optval) < 0) {
317 				errno = EINVAL;
318 				return -1;
319 			} else
320 				return 0;
321 			break;
322 		case MOS_SVROVERLAP:
323 			rb = (socket->monitor_stream->stream->side == MOS_SIDE_SVR) ?
324 				socket->monitor_stream->stream->rcvvar->rcvbuf :
325 			socket->monitor_stream->stream->pair_stream->rcvvar->rcvbuf;
326 			if (rb == NULL) {
327 				errno = EFAULT;
328 				return -1;
329 			}
330 			if (tcprb_setpolicy(rb, *(uint8_t *)optval) < 0) {
331 				errno = EINVAL;
332 				return -1;
333 			} else
334 				return 0;
335 			break;
336 		case MOS_CLIBUF:
337 #if 0
338 			if (socket->socktype != MOS_SOCK_MONITOR_STREAM_ACTIVE) {
339 				errno = EBADF;
340 				return -1;
341 			}
342 #endif
343 #ifdef DISABLE_DYN_RESIZE
344 			if (*(int *)optval != 0)
345 				return -1;
346 			if (socket->socktype == MOS_SOCK_MONITOR_STREAM_ACTIVE) {
347 				rb = (socket->monitor_stream->stream->side == MOS_SIDE_CLI) ?
348 					socket->monitor_stream->stream->rcvvar->rcvbuf :
349 					socket->monitor_stream->stream->pair_stream->rcvvar->rcvbuf;
350 				if (rb) {
351 					tcprb_resize_meta(rb, 0);
352 					tcprb_resize(rb, 0);
353 				}
354 			}
355 			return DisableBuf(socket, MOS_SIDE_CLI);
356 #else
357 			rb = (socket->monitor_stream->stream->side == MOS_SIDE_CLI) ?
358 				socket->monitor_stream->stream->rcvvar->rcvbuf :
359 				socket->monitor_stream->stream->pair_stream->rcvvar->rcvbuf;
360 			if (tcprb_resize_meta(rb, *(int *)optval) < 0)
361 				return -1;
362 			return tcprb_resize(rb,
363 					(((int)rb->metalen - 1) / UNITBUFSIZE + 1) * UNITBUFSIZE);
364 #endif
365 		case MOS_SVRBUF:
366 #if 0
367 			if (socket->socktype != MOS_SOCK_MONITOR_STREAM_ACTIVE) {
368 				errno = EBADF;
369 				return -1;
370 			}
371 #endif
372 #ifdef DISABLE_DYN_RESIZE
373 			if (*(int *)optval != 0)
374 				return -1;
375 			if (socket->socktype == MOS_SOCK_MONITOR_STREAM_ACTIVE) {
376 				rb = (socket->monitor_stream->stream->side == MOS_SIDE_SVR) ?
377 					socket->monitor_stream->stream->rcvvar->rcvbuf :
378 					socket->monitor_stream->stream->pair_stream->rcvvar->rcvbuf;
379 				if (rb) {
380 					tcprb_resize_meta(rb, 0);
381 					tcprb_resize(rb, 0);
382 				}
383 			}
384 			return DisableBuf(socket, MOS_SIDE_SVR);
385 #else
386 			rb = (socket->monitor_stream->stream->side == MOS_SIDE_SVR) ?
387 				socket->monitor_stream->stream->rcvvar->rcvbuf :
388 				socket->monitor_stream->stream->pair_stream->rcvvar->rcvbuf;
389 			if (tcprb_resize_meta(rb, *(int *)optval) < 0)
390 				return -1;
391 			return tcprb_resize(rb,
392 					(((int)rb->metalen - 1) / UNITBUFSIZE + 1) * UNITBUFSIZE);
393 #endif
394 		case MOS_FRAG_CLIBUF:
395 #if 0
396 			if (socket->socktype != MOS_SOCK_MONITOR_STREAM_ACTIVE) {
397 				errno = EBADF;
398 				return -1;
399 			}
400 #endif
401 #ifdef DISABLE_DYN_RESIZE
402 			if (*(int *)optval != 0)
403 				return -1;
404 			if (socket->socktype == MOS_SOCK_MONITOR_STREAM_ACTIVE) {
405 				rb = (socket->monitor_stream->stream->side == MOS_SIDE_CLI) ?
406 					socket->monitor_stream->stream->rcvvar->rcvbuf :
407 					socket->monitor_stream->stream->pair_stream->rcvvar->rcvbuf;
408 				if (rb)
409 					tcprb_resize(rb, 0);
410 			}
411 			return 0;
412 #else
413 			rb = (socket->monitor_stream->stream->side == MOS_SIDE_CLI) ?
414 				socket->monitor_stream->stream->rcvvar->rcvbuf :
415 				socket->monitor_stream->stream->pair_stream->rcvvar->rcvbuf;
416 			if (rb->len == 0)
417 				return tcprb_resize_meta(rb, *(int *)optval);
418 			else
419 				return -1;
420 #endif
421 		case MOS_FRAG_SVRBUF:
422 #if 0
423 			if (socket->socktype != MOS_SOCK_MONITOR_STREAM_ACTIVE) {
424 				errno = EBADF;
425 				return -1;
426 			}
427 #endif
428 #ifdef DISABLE_DYN_RESIZE
429 			if (*(int *)optval != 0)
430 				return -1;
431 			if (socket->socktype == MOS_SOCK_MONITOR_STREAM_ACTIVE) {
432 				rb = (socket->monitor_stream->stream->side == MOS_SIDE_SVR) ?
433 					socket->monitor_stream->stream->rcvvar->rcvbuf :
434 					socket->monitor_stream->stream->pair_stream->rcvvar->rcvbuf;
435 				if (rb)
436 					tcprb_resize(rb, 0);
437 			}
438 			return 0;
439 #else
440 			rb = (socket->monitor_stream->stream->side == MOS_SIDE_SVR) ?
441 				socket->monitor_stream->stream->rcvvar->rcvbuf :
442 				socket->monitor_stream->stream->pair_stream->rcvvar->rcvbuf;
443 			if (rb->len == 0)
444 				return tcprb_resize_meta(rb, *(int *)optval);
445 			else
446 				return -1;
447 #endif
448 		case MOS_MONLEVEL:
449 #ifdef OLD_API
450 			assert(*(int *)optval == MOS_NO_CLIBUF ||
451 			       *(int *)optval == MOS_NO_SVRBUF);
452 			return DisableBuf(socket,
453 					  (*(int *)optval == MOS_NO_CLIBUF) ?
454 					  MOS_SIDE_CLI : MOS_SIDE_SVR);
455 #endif
456 		case MOS_SEQ_REMAP:
457 			return TcpSeqChange(socket,
458 					    (uint32_t)((seq_remap_info *)optval)->seq_off,
459 					    ((seq_remap_info *)optval)->side,
460 					    mtcp->pctx->p.seq);
461 		case MOS_STOP_MON:
462 			return mtcp_cb_stop(mctx, sockid, *(int *)optval);
463 		default:
464 			TRACE_API("invalid optname=%d\n", optname);
465 			assert(0);
466 		}
467 		break;
468 	}
469 
470 	errno = ENOSYS;
471 	return -1;
472 }
473 /*----------------------------------------------------------------------------*/
474 int
475 mtcp_setsock_nonblock(mctx_t mctx, int sockid)
476 {
477 	mtcp_manager_t mtcp;
478 
479 	mtcp = GetMTCPManager(mctx);
480 	if (!mtcp) {
481 		errno = EACCES;
482 		return -1;
483 	}
484 
485 	if (sockid < 0 || sockid >= g_config.mos->max_concurrency) {
486 		TRACE_API("Socket id %d out of range.\n", sockid);
487 		errno = EBADF;
488 		return -1;
489 	}
490 
491 	if (mtcp->smap[sockid].socktype == MOS_SOCK_UNUSED) {
492 		TRACE_API("Invalid socket id: %d\n", sockid);
493 		errno = EBADF;
494 		return -1;
495 	}
496 
497 	mtcp->smap[sockid].opts |= MTCP_NONBLOCK;
498 
499 	return 0;
500 }
501 /*----------------------------------------------------------------------------*/
502 int
503 mtcp_ioctl(mctx_t mctx, int sockid, int request, void *argp)
504 {
505 	mtcp_manager_t mtcp;
506 	socket_map_t socket;
507 
508 	mtcp = GetMTCPManager(mctx);
509 	if (!mtcp) {
510 		errno = EACCES;
511 		return -1;
512 	}
513 
514 	if (sockid < 0 || sockid >= g_config.mos->max_concurrency) {
515 		TRACE_API("Socket id %d out of range.\n", sockid);
516 		errno = EBADF;
517 		return -1;
518 	}
519 
520 	/* only support stream socket */
521 	socket = &mtcp->smap[sockid];
522 
523 	if (socket->socktype != MOS_SOCK_STREAM_LISTEN &&
524 		socket->socktype != MOS_SOCK_STREAM) {
525 		TRACE_API("Invalid socket id: %d\n", sockid);
526 		errno = EBADF;
527 		return -1;
528 	}
529 
530 	if (!argp) {
531 		errno = EFAULT;
532 		return -1;
533 	}
534 
535 	if (request == FIONREAD) {
536 		tcp_stream *cur_stream;
537 		tcprb_t *rbuf;
538 
539 		cur_stream = socket->stream;
540 		if (!cur_stream) {
541 			errno = EBADF;
542 			return -1;
543 		}
544 
545 		rbuf = cur_stream->rcvvar->rcvbuf;
546 		*(int *)argp = (rbuf) ? tcprb_cflen(rbuf) : 0;
547 
548 	} else if (request == FIONBIO) {
549 		/*
550 		 * sockets can only be set to blocking/non-blocking
551 		 * modes during initialization
552 		 */
553 		if ((*(int *)argp))
554 			mtcp->smap[sockid].opts |= MTCP_NONBLOCK;
555 		else
556 			mtcp->smap[sockid].opts &= ~MTCP_NONBLOCK;
557 	} else {
558 		errno = EINVAL;
559 		return -1;
560 	}
561 
562 	return 0;
563 }
564 /*----------------------------------------------------------------------------*/
565 static int
566 mtcp_monitor(mctx_t mctx, socket_map_t sock)
567 {
568 	mtcp_manager_t mtcp;
569 	struct mon_listener *monitor;
570 	int sockid = sock->id;
571 
572 	mtcp = GetMTCPManager(mctx);
573 	if (!mtcp) {
574 		errno = EACCES;
575 		return -1;
576 	}
577 
578 	if (sockid < 0 || sockid >= g_config.mos->max_concurrency) {
579 		TRACE_API("Socket id %d out of range.\n", sockid);
580 		errno = EBADF;
581 		return -1;
582 	}
583 
584 	if (mtcp->msmap[sockid].socktype == MOS_SOCK_UNUSED) {
585 		TRACE_API("Invalid socket id: %d\n", sockid);
586 		errno = EBADF;
587 		return -1;
588 	}
589 
590 	if (!(mtcp->msmap[sockid].socktype == MOS_SOCK_MONITOR_STREAM ||
591 	      mtcp->msmap[sockid].socktype == MOS_SOCK_MONITOR_RAW)) {
592 		TRACE_API("Not a monitor socket. id: %d\n", sockid);
593 		errno = ENOTSOCK;
594 		return -1;
595 	}
596 
597 	monitor = (struct mon_listener *)calloc(1, sizeof(struct mon_listener));
598 	if (!monitor) {
599 		/* errno set from the malloc() */
600 		errno = ENOMEM;
601 		return -1;
602 	}
603 
604 	/* create a monitor-specific event queue */
605 	monitor->eq = CreateEventQueue(g_config.mos->max_concurrency);
606 	if (!monitor->eq) {
607 		TRACE_API("Can't create event queue (concurrency: %d) for "
608 			  "monitor read event registrations!\n",
609 			  g_config.mos->max_concurrency);
610 		free(monitor);
611 		errno = ENOMEM;
612 		return -1;
613 	}
614 
615 	/* set monitor-related basic parameters */
616 #ifndef NEWEV
617 	monitor->ude_id = UDE_OFFSET;
618 #endif
619 	monitor->socket = sock;
620 	monitor->client_buf_mgmt = monitor->server_buf_mgmt = BUFMGMT_FULL;
621 
622 	/* perform both sides monitoring by default */
623 	monitor->client_mon = monitor->server_mon = 1;
624 
625 	/* add monitor socket to the monitor list */
626 	TAILQ_INSERT_TAIL(&mtcp->monitors, monitor, link);
627 
628 	mtcp->msmap[sockid].monitor_listener = monitor;
629 
630 	return 0;
631 }
632 /*----------------------------------------------------------------------------*/
633 int
634 mtcp_socket(mctx_t mctx, int domain, int type, int protocol)
635 {
636 	mtcp_manager_t mtcp;
637 	socket_map_t socket;
638 
639 	mtcp = GetMTCPManager(mctx);
640 	if (!mtcp) {
641 		errno = EACCES;
642 		return -1;
643 	}
644 
645 	if (domain != AF_INET) {
646 		errno = EAFNOSUPPORT;
647 		return -1;
648 	}
649 
650 	if (type == (int)SOCK_STREAM) {
651 		type = MOS_SOCK_STREAM;
652 	} else if (type == MOS_SOCK_MONITOR_STREAM ||
653 		   type == MOS_SOCK_MONITOR_RAW) {
654 		/* do nothing for the time being */
655 	} else {
656 		/* Not supported type */
657 		errno = EINVAL;
658 		return -1;
659 	}
660 
661 	socket = AllocateSocket(mctx, type);
662 	if (!socket) {
663 		errno = ENFILE;
664 		return -1;
665 	}
666 
667 	if (type == MOS_SOCK_MONITOR_STREAM ||
668 	    type == MOS_SOCK_MONITOR_RAW) {
669 		mtcp_manager_t mtcp = GetMTCPManager(mctx);
670 		if (!mtcp) {
671 			errno = EACCES;
672 			return -1;
673 		}
674 		mtcp_monitor(mctx, socket);
675 #ifdef NEWEV
676 		socket->monitor_listener->stree_dontcare = NULL;
677 		socket->monitor_listener->stree_pre_rcv = NULL;
678 		socket->monitor_listener->stree_post_snd = NULL;
679 #else
680 		InitEvB(mtcp, &socket->monitor_listener->dontcare_evb);
681 		InitEvB(mtcp, &socket->monitor_listener->pre_tcp_evb);
682 		InitEvB(mtcp, &socket->monitor_listener->post_tcp_evb);
683 #endif
684 	}
685 
686 	return socket->id;
687 }
688 /*----------------------------------------------------------------------------*/
689 int
690 mtcp_bind(mctx_t mctx, int sockid,
691 		const struct sockaddr *addr, socklen_t addrlen)
692 {
693 	mtcp_manager_t mtcp;
694 	struct sockaddr_in *addr_in;
695 
696 	mtcp = GetMTCPManager(mctx);
697 	if (!mtcp) {
698 		errno = EACCES;
699 		return -1;
700 	}
701 
702 	if (sockid < 0 || sockid >= g_config.mos->max_concurrency) {
703 		TRACE_API("Socket id %d out of range.\n", sockid);
704 		errno = EBADF;
705 		return -1;
706 	}
707 
708 	if (mtcp->smap[sockid].socktype == MOS_SOCK_UNUSED) {
709 		TRACE_API("Invalid socket id: %d\n", sockid);
710 		errno = EBADF;
711 		return -1;
712 	}
713 
714 	if (mtcp->smap[sockid].socktype != MOS_SOCK_STREAM &&
715 			mtcp->smap[sockid].socktype != MOS_SOCK_STREAM_LISTEN) {
716 		TRACE_API("Not a stream socket id: %d\n", sockid);
717 		errno = ENOTSOCK;
718 		return -1;
719 	}
720 
721 	if (!addr) {
722 		TRACE_API("Socket %d: empty address!\n", sockid);
723 		errno = EINVAL;
724 		return -1;
725 	}
726 
727 	if (mtcp->smap[sockid].opts & MTCP_ADDR_BIND) {
728 		TRACE_API("Socket %d: adress already bind for this socket.\n", sockid);
729 		errno = EINVAL;
730 		return -1;
731 	}
732 
733 	/* we only allow bind() for AF_INET address */
734 	if (addr->sa_family != AF_INET || addrlen < sizeof(struct sockaddr_in)) {
735 		TRACE_API("Socket %d: invalid argument!\n", sockid);
736 		errno = EINVAL;
737 		return -1;
738 	}
739 
740 	if (mtcp->listener) {
741 		TRACE_API("Address already bound!\n");
742 		errno = EINVAL;
743 		return -1;
744 	}
745 	addr_in = (struct sockaddr_in *)addr;
746 	mtcp->smap[sockid].saddr = *addr_in;
747 	mtcp->smap[sockid].opts |= MTCP_ADDR_BIND;
748 
749 	return 0;
750 }
751 /*----------------------------------------------------------------------------*/
752 int
753 mtcp_listen(mctx_t mctx, int sockid, int backlog)
754 {
755 	mtcp_manager_t mtcp;
756 	struct tcp_listener *listener;
757 
758 	mtcp = GetMTCPManager(mctx);
759 	if (!mtcp) {
760 		errno = EACCES;
761 		return -1;
762 	}
763 
764 	if (sockid < 0 || sockid >= g_config.mos->max_concurrency) {
765 		TRACE_API("Socket id %d out of range.\n", sockid);
766 		errno = EBADF;
767 		return -1;
768 	}
769 
770 	if (mtcp->smap[sockid].socktype == MOS_SOCK_UNUSED) {
771 		TRACE_API("Invalid socket id: %d\n", sockid);
772 		errno = EBADF;
773 		return -1;
774 	}
775 
776 	if (mtcp->smap[sockid].socktype == MOS_SOCK_STREAM) {
777 		mtcp->smap[sockid].socktype = MOS_SOCK_STREAM_LISTEN;
778 	}
779 
780 	if (mtcp->smap[sockid].socktype != MOS_SOCK_STREAM_LISTEN) {
781 		TRACE_API("Not a listening socket. id: %d\n", sockid);
782 		errno = ENOTSOCK;
783 		return -1;
784 	}
785 
786 	if (backlog <= 0 || backlog > g_config.mos->max_concurrency) {
787 		errno = EINVAL;
788 		return -1;
789 	}
790 
791 	listener = (struct tcp_listener *)calloc(1, sizeof(struct tcp_listener));
792 	if (!listener) {
793 		/* errno set from the malloc() */
794 		errno = ENOMEM;
795 		return -1;
796 	}
797 
798 	listener->sockid = sockid;
799 	listener->backlog = backlog;
800 	listener->socket = &mtcp->smap[sockid];
801 
802 	if (pthread_cond_init(&listener->accept_cond, NULL)) {
803 		perror("pthread_cond_init of ctx->accept_cond\n");
804 		/* errno set by pthread_cond_init() */
805 		free(listener);
806 		return -1;
807 	}
808 	if (pthread_mutex_init(&listener->accept_lock, NULL)) {
809 		perror("pthread_mutex_init of ctx->accept_lock\n");
810 		/* errno set by pthread_mutex_init() */
811 		free(listener);
812 		return -1;
813 	}
814 
815 	listener->acceptq = CreateStreamQueue(backlog);
816 	if (!listener->acceptq) {
817 		free(listener);
818 		errno = ENOMEM;
819 		return -1;
820 	}
821 
822 	mtcp->smap[sockid].listener = listener;
823 	mtcp->listener = listener;
824 
825 	return 0;
826 }
827 /*----------------------------------------------------------------------------*/
828 int
829 mtcp_accept(mctx_t mctx, int sockid, struct sockaddr *addr, socklen_t *addrlen)
830 {
831 	mtcp_manager_t mtcp;
832 	struct tcp_listener *listener;
833 	socket_map_t socket;
834 	tcp_stream *accepted = NULL;
835 
836 	mtcp = GetMTCPManager(mctx);
837 	if (!mtcp) {
838 		errno = EACCES;
839 		return -1;
840 	}
841 
842 	if (sockid < 0 || sockid >= g_config.mos->max_concurrency) {
843 		TRACE_API("Socket id %d out of range.\n", sockid);
844 		errno = EBADF;
845 		return -1;
846 	}
847 
848 	/* requires listening socket */
849 	if (mtcp->smap[sockid].socktype != MOS_SOCK_STREAM_LISTEN) {
850 		errno = EINVAL;
851 		return -1;
852 	}
853 
854 	listener = mtcp->smap[sockid].listener;
855 
856 	/* dequeue from the acceptq without lock first */
857 	/* if nothing there, acquire lock and cond_wait */
858 	accepted = StreamDequeue(listener->acceptq);
859 	if (!accepted) {
860 		if (listener->socket->opts & MTCP_NONBLOCK) {
861 			errno = EAGAIN;
862 			return -1;
863 
864 		} else {
865 			pthread_mutex_lock(&listener->accept_lock);
866 			while ((accepted = StreamDequeue(listener->acceptq)) == NULL) {
867 				pthread_cond_wait(&listener->accept_cond, &listener->accept_lock);
868 
869 				if (mtcp->ctx->done || mtcp->ctx->exit) {
870 					pthread_mutex_unlock(&listener->accept_lock);
871 					errno = EINTR;
872 					return -1;
873 				}
874 			}
875 			pthread_mutex_unlock(&listener->accept_lock);
876 		}
877 	}
878 
879 	if (!accepted) {
880 		TRACE_ERROR("[NEVER HAPPEN] Empty accept queue!\n");
881 	}
882 
883 	if (!accepted->socket) {
884 		socket = AllocateSocket(mctx, MOS_SOCK_STREAM);
885 		if (!socket) {
886 			TRACE_ERROR("Failed to create new socket!\n");
887 			/* TODO: destroy the stream */
888 			errno = ENFILE;
889 			return -1;
890 		}
891 		socket->stream = accepted;
892 		accepted->socket = socket;
893 
894 		/* set socket addr parameters */
895 		socket->saddr.sin_family = AF_INET;
896 		socket->saddr.sin_port = accepted->dport;
897 		socket->saddr.sin_addr.s_addr = accepted->daddr;
898 
899 		/* if monitor is enabled, complete the socket assignment */
900 		if (socket->stream->pair_stream != NULL)
901 			socket->stream->pair_stream->socket = socket;
902 	}
903 
904 	if (!(listener->socket->epoll & MOS_EPOLLET) &&
905 	    !StreamQueueIsEmpty(listener->acceptq))
906 		AddEpollEvent(mtcp->ep,
907 			      USR_SHADOW_EVENT_QUEUE,
908 			      listener->socket, MOS_EPOLLIN);
909 
910 	TRACE_API("Stream %d accepted.\n", accepted->id);
911 
912 	if (addr && addrlen) {
913 		struct sockaddr_in *addr_in = (struct sockaddr_in *)addr;
914 		addr_in->sin_family = AF_INET;
915 		addr_in->sin_port = accepted->dport;
916 		addr_in->sin_addr.s_addr = accepted->daddr;
917 		*addrlen = sizeof(struct sockaddr_in);
918 	}
919 
920 	return accepted->socket->id;
921 }
922 /*----------------------------------------------------------------------------*/
923 int
924 mtcp_init_rss(mctx_t mctx, in_addr_t saddr_base, int num_addr,
925 		in_addr_t daddr, in_addr_t dport)
926 {
927 	mtcp_manager_t mtcp;
928 	addr_pool_t ap;
929 
930 	mtcp = GetMTCPManager(mctx);
931 	if (!mtcp) {
932 		errno = EACCES;
933 		return -1;
934 	}
935 
936 	if (saddr_base == INADDR_ANY) {
937 		int nif_out;
938 
939 		/* for the INADDR_ANY, find the output interface for the destination
940 		   and set the saddr_base as the ip address of the output interface */
941 		nif_out = GetOutputInterface(daddr);
942 		if (nif_out < 0) {
943 			TRACE_DBG("Could not determine nif idx!\n");
944 			errno = EINVAL;
945 			return -1;
946 		}
947 		saddr_base = g_config.mos->netdev_table->ent[nif_out]->ip_addr;
948 	}
949 
950 	ap = CreateAddressPoolPerCore(mctx->cpu, num_cpus,
951 			saddr_base, num_addr, daddr, dport);
952 	if (!ap) {
953 		errno = ENOMEM;
954 		return -1;
955 	}
956 
957 	mtcp->ap = ap;
958 
959 	return 0;
960 }
961 /*----------------------------------------------------------------------------*/
962 int
963 eval_bpf_5tuple(struct sfbpf_program fcode,
964 				in_addr_t saddr, in_port_t sport,
965 				in_addr_t daddr, in_port_t dport) {
966 	uint8_t buf[TOTAL_TCP_HEADER_LEN];
967 	struct ethhdr *ethh;
968 	struct iphdr *iph;
969 	struct tcphdr *tcph;
970 
971 	ethh = (struct ethhdr *)buf;
972 	ethh->h_proto = htons(ETH_P_IP);
973 	iph = (struct iphdr *)(ethh + 1);
974 	iph->ihl = IP_HEADER_LEN >> 2;
975 	iph->version = 4;
976 	iph->tos = 0;
977 	iph->tot_len = htons(IP_HEADER_LEN + TCP_HEADER_LEN);
978 	iph->id = htons(0);
979 	iph->protocol = IPPROTO_TCP;
980 	iph->saddr = saddr;
981 	iph->daddr = daddr;
982 	iph->check = 0;
983 	tcph = (struct tcphdr *)(iph + 1);
984 	tcph->source = sport;
985 	tcph->dest = dport;
986 
987 	return EVAL_BPFFILTER(fcode, (uint8_t *)iph - sizeof(struct ethhdr),
988 						 TOTAL_TCP_HEADER_LEN);
989 }
990 /*----------------------------------------------------------------------------*/
991 int
992 mtcp_connect(mctx_t mctx, int sockid,
993 		const struct sockaddr *addr, socklen_t addrlen)
994 {
995 	mtcp_manager_t mtcp;
996 	socket_map_t socket;
997 	tcp_stream *cur_stream;
998 	struct sockaddr_in *addr_in;
999 	in_addr_t dip;
1000 	in_port_t dport;
1001 	int is_dyn_bound = FALSE;
1002 	int ret, nif;
1003 	int cnt_match = 0;
1004 	struct mon_listener *walk;
1005 	struct sfbpf_program fcode;
1006 
1007 	cur_stream = NULL;
1008 	mtcp = GetMTCPManager(mctx);
1009 	if (!mtcp) {
1010 		errno = EACCES;
1011 		return -1;
1012 	}
1013 
1014 	if (sockid < 0 || sockid >= g_config.mos->max_concurrency) {
1015 		TRACE_API("Socket id %d out of range.\n", sockid);
1016 		errno = EBADF;
1017 		return -1;
1018 	}
1019 
1020 	if (mtcp->smap[sockid].socktype == MOS_SOCK_UNUSED) {
1021 		TRACE_API("Invalid socket id: %d\n", sockid);
1022 		errno = EBADF;
1023 		return -1;
1024 	}
1025 
1026 	if (mtcp->smap[sockid].socktype != MOS_SOCK_STREAM) {
1027 		TRACE_API("Not an end socket. id: %d\n", sockid);
1028 		errno = ENOTSOCK;
1029 		return -1;
1030 	}
1031 
1032 	if (!addr) {
1033 		TRACE_API("Socket %d: empty address!\n", sockid);
1034 		errno = EFAULT;
1035 		return -1;
1036 	}
1037 
1038 	/* we only allow bind() for AF_INET address */
1039 	if (addr->sa_family != AF_INET || addrlen < sizeof(struct sockaddr_in)) {
1040 		TRACE_API("Socket %d: invalid argument!\n", sockid);
1041 		errno = EAFNOSUPPORT;
1042 		return -1;
1043 	}
1044 
1045 	socket = &mtcp->smap[sockid];
1046 	if (socket->stream) {
1047 		TRACE_API("Socket %d: stream already exist!\n", sockid);
1048 		if (socket->stream->state >= TCP_ST_ESTABLISHED) {
1049 			errno = EISCONN;
1050 		} else {
1051 			errno = EALREADY;
1052 		}
1053 		return -1;
1054 	}
1055 
1056 	addr_in = (struct sockaddr_in *)addr;
1057 	dip = addr_in->sin_addr.s_addr;
1058 	dport = addr_in->sin_port;
1059 
1060 	/* address binding */
1061 	if (socket->opts & MTCP_ADDR_BIND &&
1062 	    socket->saddr.sin_port != INPORT_ANY &&
1063 	    socket->saddr.sin_addr.s_addr != INADDR_ANY) {
1064 		int rss_core;
1065 
1066 		rss_core = GetRSSCPUCore(socket->saddr.sin_addr.s_addr, dip,
1067 				socket->saddr.sin_port, dport, num_queues);
1068 
1069 		if (rss_core != mctx->cpu) {
1070 			errno = EINVAL;
1071 			return -1;
1072 		}
1073 	} else {
1074 		if (mtcp->ap) {
1075 			ret = FetchAddress(mtcp->ap,
1076 					mctx->cpu, num_queues, addr_in, &socket->saddr);
1077 		} else {
1078 			nif = GetOutputInterface(dip);
1079 			if (nif < 0) {
1080 				errno = EINVAL;
1081 				return -1;
1082 			}
1083 			ret = FetchAddress(ap[nif],
1084 					   mctx->cpu, num_queues, addr_in, &socket->saddr);
1085 		}
1086 		if (ret < 0) {
1087 			errno = EAGAIN;
1088 			return -1;
1089 		}
1090 		socket->opts |= MTCP_ADDR_BIND;
1091 		is_dyn_bound = TRUE;
1092 	}
1093 
1094 	cnt_match = 0;
1095 	if (mtcp->num_msp > 0) {
1096 		TAILQ_FOREACH(walk, &mtcp->monitors, link) {
1097 			fcode = walk->stream_syn_fcode;
1098 			if (!(ISSET_BPFFILTER(fcode) &&
1099 				  eval_bpf_5tuple(fcode, socket->saddr.sin_addr.s_addr,
1100 								  socket->saddr.sin_port,
1101 								  dip, dport) == 0)) {
1102 				walk->is_stream_syn_filter_hit = 1; // set the 'filter hit' flag to 1
1103 				cnt_match++;
1104 			}
1105 		}
1106 	}
1107 
1108 	if (mtcp->num_msp > 0 && cnt_match > 0) {
1109 		/* 150820 dhkim: XXX: embedded mode is not verified */
1110 #if 1
1111 		cur_stream = CreateClientTCPStream(mtcp, socket,
1112 						 STREAM_TYPE(MOS_SOCK_STREAM) |
1113 						 STREAM_TYPE(MOS_SOCK_MONITOR_STREAM_ACTIVE),
1114 						 socket->saddr.sin_addr.s_addr,
1115 						 socket->saddr.sin_port, dip, dport, NULL);
1116 #else
1117 		cur_stream = CreateDualTCPStream(mtcp, socket,
1118 						 STREAM_TYPE(MOS_SOCK_STREAM) |
1119 						 STREAM_TYPE(MOS_SOCK_MONITOR_STREAM_ACTIVE),
1120 						 socket->saddr.sin_addr.s_addr,
1121 						 socket->saddr.sin_port, dip, dport, NULL);
1122 #endif
1123 	}
1124 	else
1125 		cur_stream = CreateTCPStream(mtcp, socket, STREAM_TYPE(MOS_SOCK_STREAM),
1126 					     socket->saddr.sin_addr.s_addr,
1127 					     socket->saddr.sin_port, dip, dport, NULL);
1128 	if (!cur_stream) {
1129 		TRACE_ERROR("Socket %d: failed to create tcp_stream!\n", sockid);
1130 		errno = ENOMEM;
1131 		return -1;
1132 	}
1133 
1134 	if (is_dyn_bound)
1135 		cur_stream->is_bound_addr = TRUE;
1136 	cur_stream->sndvar->cwnd = 1;
1137 	cur_stream->sndvar->ssthresh = cur_stream->sndvar->mss * 10;
1138 	cur_stream->side = MOS_SIDE_CLI;
1139 	/* if monitor is enabled, update the pair stream side as well */
1140 	if (cur_stream->pair_stream) {
1141 		cur_stream->pair_stream->side = MOS_SIDE_SVR;
1142 		/*
1143 		 * if buffer management is off, then disable
1144 		 * monitoring tcp ring of server...
1145 		 * if there is even a single monitor asking for
1146 		 * buffer management, enable it (that's why the
1147 		 * need for the loop)
1148 		 */
1149 		cur_stream->pair_stream->buffer_mgmt = BUFMGMT_OFF;
1150 		struct socket_map *walk;
1151 		SOCKQ_FOREACH_START(walk, &cur_stream->msocks) {
1152 			uint8_t bm = walk->monitor_stream->monitor_listener->server_buf_mgmt;
1153 			if (bm > cur_stream->pair_stream->buffer_mgmt) {
1154 				cur_stream->pair_stream->buffer_mgmt = bm;
1155 				break;
1156 			}
1157 		} SOCKQ_FOREACH_END;
1158 	}
1159 
1160 	cur_stream->state = TCP_ST_SYN_SENT;
1161 	cur_stream->cb_events |= MOS_ON_TCP_STATE_CHANGE;
1162 
1163 	TRACE_STATE("Stream %d: TCP_ST_SYN_SENT\n", cur_stream->id);
1164 
1165 	SQ_LOCK(&mtcp->ctx->connect_lock);
1166 	ret = StreamEnqueue(mtcp->connectq, cur_stream);
1167 	SQ_UNLOCK(&mtcp->ctx->connect_lock);
1168 	mtcp->wakeup_flag = TRUE;
1169 	if (ret < 0) {
1170 		TRACE_ERROR("Socket %d: failed to enqueue to conenct queue!\n", sockid);
1171 		SQ_LOCK(&mtcp->ctx->destroyq_lock);
1172 		StreamEnqueue(mtcp->destroyq, cur_stream);
1173 		SQ_UNLOCK(&mtcp->ctx->destroyq_lock);
1174 		errno = EAGAIN;
1175 		return -1;
1176 	}
1177 
1178 	/* if nonblocking socket, return EINPROGRESS */
1179 	if (socket->opts & MTCP_NONBLOCK) {
1180 		errno = EINPROGRESS;
1181 		return -1;
1182 
1183 	} else {
1184 		while (1) {
1185 			if (!cur_stream) {
1186 				TRACE_ERROR("STREAM DESTROYED\n");
1187 				errno = ETIMEDOUT;
1188 				return -1;
1189 			}
1190 			if (cur_stream->state > TCP_ST_ESTABLISHED) {
1191 				TRACE_ERROR("Socket %d: weird state %s\n",
1192 						sockid, TCPStateToString(cur_stream));
1193 				// TODO: how to handle this?
1194 				errno = ENOSYS;
1195 				return -1;
1196 			}
1197 
1198 			if (cur_stream->state == TCP_ST_ESTABLISHED) {
1199 				break;
1200 			}
1201 			usleep(1000);
1202 		}
1203 	}
1204 
1205 	return 0;
1206 }
1207 /*----------------------------------------------------------------------------*/
1208 static inline int
1209 CloseStreamSocket(mctx_t mctx, int sockid)
1210 {
1211 	mtcp_manager_t mtcp;
1212 	tcp_stream *cur_stream;
1213 	int ret;
1214 
1215 	mtcp = GetMTCPManager(mctx);
1216 	if (!mtcp) {
1217 		errno = EACCES;
1218 		return -1;
1219 	}
1220 
1221 	cur_stream = mtcp->smap[sockid].stream;
1222 	if (!cur_stream) {
1223 		TRACE_API("Socket %d: stream does not exist.\n", sockid);
1224 		errno = ENOTCONN;
1225 		return -1;
1226 	}
1227 
1228 	if (cur_stream->closed) {
1229 		TRACE_API("Socket %d (Stream %u): already closed stream\n",
1230 				sockid, cur_stream->id);
1231 		return 0;
1232 	}
1233 	cur_stream->closed = TRUE;
1234 
1235 	TRACE_API("Stream %d: closing the stream.\n", cur_stream->id);
1236 
1237 	/* 141029 dhkim: Check this! */
1238 	cur_stream->socket = NULL;
1239 
1240 	if (cur_stream->state == TCP_ST_CLOSED_RSVD) {
1241 		TRACE_API("Stream %d at TCP_ST_CLOSED_RSVD. destroying the stream.\n",
1242 				cur_stream->id);
1243 		SQ_LOCK(&mtcp->ctx->destroyq_lock);
1244 		StreamEnqueue(mtcp->destroyq, cur_stream);
1245 		mtcp->wakeup_flag = TRUE;
1246 		SQ_UNLOCK(&mtcp->ctx->destroyq_lock);
1247 		return 0;
1248 
1249 	} else if (cur_stream->state == TCP_ST_SYN_SENT) {
1250 #if 1
1251 		SQ_LOCK(&mtcp->ctx->destroyq_lock);
1252 		StreamEnqueue(mtcp->destroyq, cur_stream);
1253 		SQ_UNLOCK(&mtcp->ctx->destroyq_lock);
1254 		mtcp->wakeup_flag = TRUE;
1255 #endif
1256 		return -1;
1257 
1258 	} else if (cur_stream->state != TCP_ST_ESTABLISHED &&
1259 			cur_stream->state != TCP_ST_CLOSE_WAIT) {
1260 		TRACE_API("Stream %d at state %s\n",
1261 				cur_stream->id, TCPStateToString(cur_stream));
1262 		errno = EBADF;
1263 		return -1;
1264 	}
1265 
1266 	SQ_LOCK(&mtcp->ctx->close_lock);
1267 	cur_stream->sndvar->on_closeq = TRUE;
1268 	ret = StreamEnqueue(mtcp->closeq, cur_stream);
1269 	mtcp->wakeup_flag = TRUE;
1270 	SQ_UNLOCK(&mtcp->ctx->close_lock);
1271 
1272 	if (ret < 0) {
1273 		TRACE_ERROR("(NEVER HAPPEN) Failed to enqueue the stream to close.\n");
1274 		errno = EAGAIN;
1275 		return -1;
1276 	}
1277 
1278 	return 0;
1279 }
1280 /*----------------------------------------------------------------------------*/
1281 static inline int
1282 CloseListeningSocket(mctx_t mctx, int sockid)
1283 {
1284 	mtcp_manager_t mtcp;
1285 	struct tcp_listener *listener;
1286 
1287 	mtcp = GetMTCPManager(mctx);
1288 	if (!mtcp) {
1289 		errno = EACCES;
1290 		return -1;
1291 	}
1292 
1293 	listener = mtcp->smap[sockid].listener;
1294 	if (!listener) {
1295 		errno = EINVAL;
1296 		return -1;
1297 	}
1298 
1299 	if (listener->acceptq) {
1300 		DestroyStreamQueue(listener->acceptq);
1301 		listener->acceptq = NULL;
1302 	}
1303 
1304 	pthread_mutex_lock(&listener->accept_lock);
1305 	pthread_cond_signal(&listener->accept_cond);
1306 	pthread_mutex_unlock(&listener->accept_lock);
1307 
1308 	pthread_cond_destroy(&listener->accept_cond);
1309 	pthread_mutex_destroy(&listener->accept_lock);
1310 
1311 	free(listener);
1312 	mtcp->smap[sockid].listener = NULL;
1313 
1314 	return 0;
1315 }
1316 /*----------------------------------------------------------------------------*/
1317 int
1318 mtcp_close(mctx_t mctx, int sockid)
1319 {
1320 	mtcp_manager_t mtcp;
1321 	int ret;
1322 
1323 	mtcp = GetMTCPManager(mctx);
1324 	if (!mtcp) {
1325 		errno = EACCES;
1326 		return -1;
1327 	}
1328 
1329 	if (sockid < 0 || sockid >= g_config.mos->max_concurrency) {
1330 		TRACE_API("Socket id %d out of range.\n", sockid);
1331 		errno = EBADF;
1332 		return -1;
1333 	}
1334 
1335 	if (mtcp->smap[sockid].socktype == MOS_SOCK_UNUSED) {
1336 		TRACE_API("Invalid socket id: %d\n", sockid);
1337 		errno = EBADF;
1338 		return -1;
1339 	}
1340 
1341 	TRACE_API("Socket %d: mtcp_close called.\n", sockid);
1342 
1343 	switch (mtcp->smap[sockid].socktype) {
1344 	case MOS_SOCK_STREAM:
1345 		ret = CloseStreamSocket(mctx, sockid);
1346 		break;
1347 
1348 	case MOS_SOCK_STREAM_LISTEN:
1349 		ret = CloseListeningSocket(mctx, sockid);
1350 		break;
1351 
1352 	case MOS_SOCK_EPOLL:
1353 		ret = CloseEpollSocket(mctx, sockid);
1354 		break;
1355 
1356 	case MOS_SOCK_PIPE:
1357 		ret = PipeClose(mctx, sockid);
1358 		break;
1359 
1360 	default:
1361 		errno = EINVAL;
1362 		ret = -1;
1363 		break;
1364 	}
1365 
1366 	FreeSocket(mctx, sockid, mtcp->smap[sockid].socktype);
1367 
1368 	return ret;
1369 }
1370 /*----------------------------------------------------------------------------*/
1371 int
1372 mtcp_abort(mctx_t mctx, int sockid)
1373 {
1374 	mtcp_manager_t mtcp;
1375 	tcp_stream *cur_stream;
1376 	int ret;
1377 
1378 	mtcp = GetMTCPManager(mctx);
1379 	if (!mtcp) {
1380 		errno = EACCES;
1381 		return -1;
1382 	}
1383 
1384 	if (sockid < 0 || sockid >= g_config.mos->max_concurrency) {
1385 		TRACE_API("Socket id %d out of range.\n", sockid);
1386 		errno = EBADF;
1387 		return -1;
1388 	}
1389 
1390 	if (mtcp->smap[sockid].socktype == MOS_SOCK_UNUSED) {
1391 		TRACE_API("Invalid socket id: %d\n", sockid);
1392 		errno = EBADF;
1393 		return -1;
1394 	}
1395 
1396 	if (mtcp->smap[sockid].socktype != MOS_SOCK_STREAM) {
1397 		TRACE_API("Not an end socket. id: %d\n", sockid);
1398 		errno = ENOTSOCK;
1399 		return -1;
1400 	}
1401 
1402 	cur_stream = mtcp->smap[sockid].stream;
1403 	if (!cur_stream) {
1404 		TRACE_API("Stream %d: does not exist.\n", sockid);
1405 		errno = ENOTCONN;
1406 		return -1;
1407 	}
1408 
1409 	TRACE_API("Socket %d: mtcp_abort()\n", sockid);
1410 
1411 	FreeSocket(mctx, sockid, mtcp->smap[sockid].socktype);
1412 	cur_stream->socket = NULL;
1413 
1414 	if (cur_stream->state == TCP_ST_CLOSED_RSVD) {
1415 		TRACE_API("Stream %d: connection already reset.\n", sockid);
1416 		return ERROR;
1417 
1418 	} else if (cur_stream->state == TCP_ST_SYN_SENT) {
1419 		/* TODO: this should notify event failure to all
1420 		   previous read() or write() calls */
1421 		cur_stream->state = TCP_ST_CLOSED_RSVD;
1422 		cur_stream->close_reason = TCP_ACTIVE_CLOSE;
1423 		cur_stream->cb_events |= MOS_ON_TCP_STATE_CHANGE;
1424 		SQ_LOCK(&mtcp->ctx->destroyq_lock);
1425 		StreamEnqueue(mtcp->destroyq, cur_stream);
1426 		SQ_UNLOCK(&mtcp->ctx->destroyq_lock);
1427 		mtcp->wakeup_flag = TRUE;
1428 		return 0;
1429 
1430 	} else if (cur_stream->state == TCP_ST_CLOSING ||
1431 			cur_stream->state == TCP_ST_LAST_ACK ||
1432 			cur_stream->state == TCP_ST_TIME_WAIT) {
1433 		cur_stream->state = TCP_ST_CLOSED_RSVD;
1434 		cur_stream->close_reason = TCP_ACTIVE_CLOSE;
1435 		cur_stream->cb_events |= MOS_ON_TCP_STATE_CHANGE;
1436 		SQ_LOCK(&mtcp->ctx->destroyq_lock);
1437 		StreamEnqueue(mtcp->destroyq, cur_stream);
1438 		SQ_UNLOCK(&mtcp->ctx->destroyq_lock);
1439 		mtcp->wakeup_flag = TRUE;
1440 		return 0;
1441 	}
1442 
1443 	/* the stream structure will be destroyed after sending RST */
1444 	if (cur_stream->sndvar->on_resetq) {
1445 		TRACE_ERROR("Stream %d: calling mtcp_abort() "
1446 				"when in reset queue.\n", sockid);
1447 		errno = ECONNRESET;
1448 		return -1;
1449 	}
1450 	SQ_LOCK(&mtcp->ctx->reset_lock);
1451 	cur_stream->sndvar->on_resetq = TRUE;
1452 	ret = StreamEnqueue(mtcp->resetq, cur_stream);
1453 	SQ_UNLOCK(&mtcp->ctx->reset_lock);
1454 	mtcp->wakeup_flag = TRUE;
1455 
1456 	if (ret < 0) {
1457 		TRACE_ERROR("(NEVER HAPPEN) Failed to enqueue the stream to close.\n");
1458 		errno = EAGAIN;
1459 		return -1;
1460 	}
1461 
1462 	return 0;
1463 }
1464 /*----------------------------------------------------------------------------*/
1465 static inline int
1466 PeekForUser(mtcp_manager_t mtcp, tcp_stream *cur_stream, char *buf, int len)
1467 {
1468 	struct tcp_recv_vars *rcvvar = cur_stream->rcvvar;
1469 	int copylen;
1470 	tcprb_t *rb = rcvvar->rcvbuf;
1471 
1472 	if ((copylen = tcprb_ppeek(rb, (uint8_t *)buf, len, rb->pile)) <= 0) {
1473 		errno = EAGAIN;
1474 		return -1;
1475 	}
1476 
1477 	return copylen;
1478 }
1479 /*----------------------------------------------------------------------------*/
1480 static inline int
1481 CopyToUser(mtcp_manager_t mtcp, tcp_stream *cur_stream, char *buf, int len)
1482 {
1483 	struct tcp_recv_vars *rcvvar = cur_stream->rcvvar;
1484 	int copylen;
1485 	tcprb_t *rb = rcvvar->rcvbuf;
1486 	if ((copylen = tcprb_ppeek(rb, (uint8_t *)buf, len, rb->pile)) <= 0) {
1487 		errno = EAGAIN;
1488 		return -1;
1489 	}
1490 	tcprb_setpile(rb, rb->pile + copylen);
1491 
1492 	rcvvar->rcv_wnd = rb->len - tcprb_cflen(rb);
1493 	//printf("rcv_wnd: %d\n", rcvvar->rcv_wnd);
1494 
1495 	/* Advertise newly freed receive buffer */
1496 	if (cur_stream->need_wnd_adv) {
1497 		if (rcvvar->rcv_wnd > cur_stream->sndvar->eff_mss) {
1498 			if (!cur_stream->sndvar->on_ackq) {
1499 				SQ_LOCK(&mtcp->ctx->ackq_lock);
1500 				cur_stream->sndvar->on_ackq = TRUE;
1501 				StreamEnqueue(mtcp->ackq, cur_stream); /* this always success */
1502 				SQ_UNLOCK(&mtcp->ctx->ackq_lock);
1503 				cur_stream->need_wnd_adv = FALSE;
1504 				mtcp->wakeup_flag = TRUE;
1505 			}
1506 		}
1507 	}
1508 
1509 	return copylen;
1510 }
1511 /*----------------------------------------------------------------------------*/
1512 ssize_t
1513 mtcp_recv(mctx_t mctx, int sockid, char *buf, size_t len, int flags)
1514 {
1515 	mtcp_manager_t mtcp;
1516 	socket_map_t socket;
1517 	tcp_stream *cur_stream;
1518 	struct tcp_recv_vars *rcvvar;
1519 	int event_remaining, merged_len;
1520 	int ret;
1521 
1522 	mtcp = GetMTCPManager(mctx);
1523 	if (!mtcp) {
1524 		errno = EACCES;
1525 		return -1;
1526 	}
1527 
1528 	if (sockid < 0 || sockid >= g_config.mos->max_concurrency) {
1529 		TRACE_API("Socket id %d out of range.\n", sockid);
1530 		errno = EBADF;
1531 		return -1;
1532 	}
1533 
1534 	socket = &mtcp->smap[sockid];
1535 	if (socket->socktype == MOS_SOCK_UNUSED) {
1536 		TRACE_API("Invalid socket id: %d\n", sockid);
1537 		errno = EBADF;
1538 		return -1;
1539 	}
1540 
1541 	if (socket->socktype == MOS_SOCK_PIPE) {
1542 		return PipeRead(mctx, sockid, buf, len);
1543 	}
1544 
1545 	if (socket->socktype != MOS_SOCK_STREAM) {
1546 		TRACE_API("Not an end socket. id: %d\n", sockid);
1547 		errno = ENOTSOCK;
1548 		return -1;
1549 	}
1550 
1551 	/* stream should be in ESTABLISHED, FIN_WAIT_1, FIN_WAIT_2, CLOSE_WAIT */
1552 	cur_stream = socket->stream;
1553 	if (!cur_stream || !cur_stream->rcvvar || !cur_stream->rcvvar->rcvbuf ||
1554 	    !(cur_stream->state >= TCP_ST_ESTABLISHED &&
1555 	      cur_stream->state <= TCP_ST_CLOSE_WAIT)) {
1556 		errno = ENOTCONN;
1557 		return -1;
1558 	}
1559 
1560 	rcvvar = cur_stream->rcvvar;
1561 
1562 	merged_len = tcprb_cflen(rcvvar->rcvbuf);
1563 
1564 	/* if CLOSE_WAIT, return 0 if there is no payload */
1565 	if (cur_stream->state == TCP_ST_CLOSE_WAIT) {
1566 		if (merged_len == 0)
1567 			return 0;
1568 	}
1569 
1570 	/* return EAGAIN if no receive buffer */
1571 	if (socket->opts & MTCP_NONBLOCK) {
1572 		if (merged_len == 0) {
1573 			errno = EAGAIN;
1574 			return -1;
1575 		}
1576 	}
1577 
1578 	SBUF_LOCK(&rcvvar->read_lock);
1579 
1580 	switch (flags) {
1581 	case 0:
1582 		ret = CopyToUser(mtcp, cur_stream, buf, len);
1583 		break;
1584 	case MSG_PEEK:
1585 		ret = PeekForUser(mtcp, cur_stream, buf, len);
1586 		break;
1587 	default:
1588 		SBUF_UNLOCK(&rcvvar->read_lock);
1589 		ret = -1;
1590 		errno = EINVAL;
1591 		return ret;
1592 	}
1593 
1594 	merged_len = tcprb_cflen(rcvvar->rcvbuf);
1595 	event_remaining = FALSE;
1596 	/* if there are remaining payload, generate EPOLLIN */
1597 	/* (may due to insufficient user buffer) */
1598 	if (socket->epoll & MOS_EPOLLIN) {
1599 		if (!(socket->epoll & MOS_EPOLLET) && merged_len > 0) {
1600 			event_remaining = TRUE;
1601 		}
1602 	}
1603 	/* if waiting for close, notify it if no remaining data */
1604 	if (cur_stream->state == TCP_ST_CLOSE_WAIT &&
1605 	    merged_len == 0 && ret > 0) {
1606 		event_remaining = TRUE;
1607 	}
1608 
1609 	SBUF_UNLOCK(&rcvvar->read_lock);
1610 
1611 	if (event_remaining) {
1612 		if (socket->epoll) {
1613 			AddEpollEvent(mtcp->ep,
1614 				      USR_SHADOW_EVENT_QUEUE, socket, MOS_EPOLLIN);
1615 		}
1616 	}
1617 
1618 	TRACE_API("Stream %d: mtcp_recv() returning %d\n", cur_stream->id, ret);
1619 	return ret;
1620 }
1621 /*----------------------------------------------------------------------------*/
1622 inline ssize_t
1623 mtcp_read(mctx_t mctx, int sockid, char *buf, size_t len)
1624 {
1625 	return mtcp_recv(mctx, sockid, buf, len, 0);
1626 }
1627 /*----------------------------------------------------------------------------*/
1628 ssize_t
1629 mtcp_readv(mctx_t mctx, int sockid, const struct iovec *iov, int numIOV)
1630 {
1631 	mtcp_manager_t mtcp;
1632 	socket_map_t socket;
1633 	tcp_stream *cur_stream;
1634 	struct tcp_recv_vars *rcvvar;
1635 	int ret, bytes_read, i;
1636 	int event_remaining, merged_len;
1637 
1638 	mtcp = GetMTCPManager(mctx);
1639 	if (!mtcp) {
1640 		errno = EACCES;
1641 		return -1;
1642 	}
1643 
1644 	if (sockid < 0 || sockid >= g_config.mos->max_concurrency) {
1645 		TRACE_API("Socket id %d out of range.\n", sockid);
1646 		errno = EBADF;
1647 		return -1;
1648 	}
1649 
1650 	socket = &mtcp->smap[sockid];
1651 	if (socket->socktype == MOS_SOCK_UNUSED) {
1652 		TRACE_API("Invalid socket id: %d\n", sockid);
1653 		errno = EBADF;
1654 		return -1;
1655 	}
1656 
1657 	if (socket->socktype != MOS_SOCK_STREAM) {
1658 		TRACE_API("Not an end socket. id: %d\n", sockid);
1659 		errno = ENOTSOCK;
1660 		return -1;
1661 	}
1662 
1663 	/* stream should be in ESTABLISHED, FIN_WAIT_1, FIN_WAIT_2, CLOSE_WAIT */
1664 	cur_stream = socket->stream;
1665 	if (!cur_stream || !cur_stream->rcvvar->rcvbuf ||
1666 			!(cur_stream->state >= TCP_ST_ESTABLISHED &&
1667 			  cur_stream->state <= TCP_ST_CLOSE_WAIT)) {
1668 		errno = ENOTCONN;
1669 		return -1;
1670 	}
1671 
1672 	rcvvar = cur_stream->rcvvar;
1673 
1674 	merged_len = tcprb_cflen(rcvvar->rcvbuf);
1675 
1676 	/* if CLOSE_WAIT, return 0 if there is no payload */
1677 	if (cur_stream->state == TCP_ST_CLOSE_WAIT) {
1678 		if (merged_len == 0)
1679 			return 0;
1680 	}
1681 
1682 	/* return EAGAIN if no receive buffer */
1683 	if (socket->opts & MTCP_NONBLOCK) {
1684 		if (merged_len == 0) {
1685 			errno = EAGAIN;
1686 			return -1;
1687 		}
1688 	}
1689 
1690 	SBUF_LOCK(&rcvvar->read_lock);
1691 
1692 	/* read and store the contents to the vectored buffers */
1693 	bytes_read = 0;
1694 	for (i = 0; i < numIOV; i++) {
1695 		if (iov[i].iov_len <= 0)
1696 			continue;
1697 
1698 		ret = CopyToUser(mtcp, cur_stream, iov[i].iov_base, iov[i].iov_len);
1699 		if (ret <= 0)
1700 			break;
1701 
1702 		bytes_read += ret;
1703 
1704 		if (ret < iov[i].iov_len)
1705 			break;
1706 	}
1707 
1708 	merged_len = tcprb_cflen(rcvvar->rcvbuf);
1709 
1710 	event_remaining = FALSE;
1711 	/* if there are remaining payload, generate read event */
1712 	/* (may due to insufficient user buffer) */
1713 	if (socket->epoll & MOS_EPOLLIN) {
1714 		if (!(socket->epoll & MOS_EPOLLET) && merged_len > 0) {
1715 			event_remaining = TRUE;
1716 		}
1717 	}
1718 	/* if waiting for close, notify it if no remaining data */
1719 	if (cur_stream->state == TCP_ST_CLOSE_WAIT &&
1720 			merged_len == 0 && bytes_read > 0) {
1721 		event_remaining = TRUE;
1722 	}
1723 
1724 	SBUF_UNLOCK(&rcvvar->read_lock);
1725 
1726 	if(event_remaining) {
1727 		if (socket->epoll & MOS_EPOLLIN && !(socket->epoll & MOS_EPOLLET)) {
1728 			AddEpollEvent(mtcp->ep,
1729 				      USR_SHADOW_EVENT_QUEUE, socket, MOS_EPOLLIN);
1730 		}
1731 	}
1732 
1733 	TRACE_API("Stream %d: mtcp_readv() returning %d\n",
1734 			cur_stream->id, bytes_read);
1735 	return bytes_read;
1736 }
1737 /*----------------------------------------------------------------------------*/
1738 static inline int
1739 CopyFromUser(mtcp_manager_t mtcp, tcp_stream *cur_stream, const char *buf, int len)
1740 {
1741 	struct tcp_send_vars *sndvar = cur_stream->sndvar;
1742 	int sndlen;
1743 	int ret;
1744 
1745 	sndlen = MIN((int)sndvar->snd_wnd, len);
1746 	if (sndlen <= 0) {
1747 		errno = EAGAIN;
1748 		return -1;
1749 	}
1750 
1751 	/* allocate send buffer if not exist */
1752 	if (!sndvar->sndbuf) {
1753 		sndvar->sndbuf = SBInit(mtcp->rbm_snd, sndvar->iss + 1);
1754 		if (!sndvar->sndbuf) {
1755 			cur_stream->close_reason = TCP_NO_MEM;
1756 			/* notification may not required due to -1 return */
1757 			errno = ENOMEM;
1758 			return -1;
1759 		}
1760 	}
1761 
1762 	ret = SBPut(mtcp->rbm_snd, sndvar->sndbuf, buf, sndlen);
1763 	assert(ret == sndlen);
1764 	sndvar->snd_wnd = sndvar->sndbuf->size - sndvar->sndbuf->len;
1765 	if (ret <= 0) {
1766 		TRACE_ERROR("SBPut failed. reason: %d (sndlen: %u, len: %u\n",
1767 				ret, sndlen, sndvar->sndbuf->len);
1768 		errno = EAGAIN;
1769 		return -1;
1770 	}
1771 
1772 	if (sndvar->snd_wnd <= 0) {
1773 		TRACE_SNDBUF("%u Sending buffer became full!! snd_wnd: %u\n",
1774 				cur_stream->id, sndvar->snd_wnd);
1775 	}
1776 
1777 	return ret;
1778 }
1779 /*----------------------------------------------------------------------------*/
1780 ssize_t
1781 mtcp_write(mctx_t mctx, int sockid, const char *buf, size_t len)
1782 {
1783 	mtcp_manager_t mtcp;
1784 	socket_map_t socket;
1785 	tcp_stream *cur_stream;
1786 	struct tcp_send_vars *sndvar;
1787 	int ret;
1788 
1789 	mtcp = GetMTCPManager(mctx);
1790 	if (!mtcp) {
1791 		errno = EACCES;
1792 		return -1;
1793 	}
1794 
1795 	if (sockid < 0 || sockid >= g_config.mos->max_concurrency) {
1796 		TRACE_API("Socket id %d out of range.\n", sockid);
1797 		errno = EBADF;
1798 		return -1;
1799 	}
1800 
1801 	socket = &mtcp->smap[sockid];
1802 	if (socket->socktype == MOS_SOCK_UNUSED) {
1803 		TRACE_API("Invalid socket id: %d\n", sockid);
1804 		errno = EBADF;
1805 		return -1;
1806 	}
1807 
1808 	if (socket->socktype == MOS_SOCK_PIPE) {
1809 		return PipeWrite(mctx, sockid, buf, len);
1810 	}
1811 
1812 	if (socket->socktype != MOS_SOCK_STREAM) {
1813 		TRACE_API("Not an end socket. id: %d\n", sockid);
1814 		errno = ENOTSOCK;
1815 		return -1;
1816 	}
1817 
1818 	cur_stream = socket->stream;
1819 	if (!cur_stream ||
1820 			!(cur_stream->state == TCP_ST_ESTABLISHED ||
1821 			  cur_stream->state == TCP_ST_CLOSE_WAIT)) {
1822 		errno = ENOTCONN;
1823 		return -1;
1824 	}
1825 
1826 	if (len <= 0) {
1827 		if (socket->opts & MTCP_NONBLOCK) {
1828 			errno = EAGAIN;
1829 			return -1;
1830 		} else {
1831 			return 0;
1832 		}
1833 	}
1834 
1835 	sndvar = cur_stream->sndvar;
1836 
1837 	SBUF_LOCK(&sndvar->write_lock);
1838 	ret = CopyFromUser(mtcp, cur_stream, buf, len);
1839 
1840 	SBUF_UNLOCK(&sndvar->write_lock);
1841 
1842 	if (ret > 0 && !(sndvar->on_sendq || sndvar->on_send_list)) {
1843 		SQ_LOCK(&mtcp->ctx->sendq_lock);
1844 		sndvar->on_sendq = TRUE;
1845 		StreamEnqueue(mtcp->sendq, cur_stream);		/* this always success */
1846 		SQ_UNLOCK(&mtcp->ctx->sendq_lock);
1847 		mtcp->wakeup_flag = TRUE;
1848 	}
1849 
1850 	if (ret == 0 && (socket->opts & MTCP_NONBLOCK)) {
1851 		ret = -1;
1852 		errno = EAGAIN;
1853 	}
1854 
1855 	/* if there are remaining sending buffer, generate write event */
1856 	if (sndvar->snd_wnd > 0) {
1857 		if (socket->epoll & MOS_EPOLLOUT && !(socket->epoll & MOS_EPOLLET)) {
1858 			AddEpollEvent(mtcp->ep,
1859 				      USR_SHADOW_EVENT_QUEUE, socket, MOS_EPOLLOUT);
1860 		}
1861 	}
1862 
1863 	TRACE_API("Stream %d: mtcp_write() returning %d\n", cur_stream->id, ret);
1864 	return ret;
1865 }
1866 /*----------------------------------------------------------------------------*/
1867 ssize_t
1868 mtcp_writev(mctx_t mctx, int sockid, const struct iovec *iov, int numIOV)
1869 {
1870 	mtcp_manager_t mtcp;
1871 	socket_map_t socket;
1872 	tcp_stream *cur_stream;
1873 	struct tcp_send_vars *sndvar;
1874 	int ret, to_write, i;
1875 
1876 	mtcp = GetMTCPManager(mctx);
1877 	if (!mtcp) {
1878 		errno = EACCES;
1879 		return -1;
1880 	}
1881 
1882 	if (sockid < 0 || sockid >= g_config.mos->max_concurrency) {
1883 		TRACE_API("Socket id %d out of range.\n", sockid);
1884 		errno = EBADF;
1885 		return -1;
1886 	}
1887 
1888 	socket = &mtcp->smap[sockid];
1889 	if (socket->socktype == MOS_SOCK_UNUSED) {
1890 		TRACE_API("Invalid socket id: %d\n", sockid);
1891 		errno = EBADF;
1892 		return -1;
1893 	}
1894 
1895 	if (socket->socktype != MOS_SOCK_STREAM) {
1896 		TRACE_API("Not an end socket. id: %d\n", sockid);
1897 		errno = ENOTSOCK;
1898 		return -1;
1899 	}
1900 
1901 	cur_stream = socket->stream;
1902 	if (!cur_stream ||
1903 			!(cur_stream->state == TCP_ST_ESTABLISHED ||
1904 			  cur_stream->state == TCP_ST_CLOSE_WAIT)) {
1905 		errno = ENOTCONN;
1906 		return -1;
1907 	}
1908 
1909 	sndvar = cur_stream->sndvar;
1910 	SBUF_LOCK(&sndvar->write_lock);
1911 
1912 	/* write from the vectored buffers */
1913 	to_write = 0;
1914 	for (i = 0; i < numIOV; i++) {
1915 		if (iov[i].iov_len <= 0)
1916 			continue;
1917 
1918 		ret = CopyFromUser(mtcp, cur_stream, iov[i].iov_base, iov[i].iov_len);
1919 		if (ret <= 0)
1920 			break;
1921 
1922 		to_write += ret;
1923 
1924 		if (ret < iov[i].iov_len)
1925 			break;
1926 	}
1927 	SBUF_UNLOCK(&sndvar->write_lock);
1928 
1929 	if (to_write > 0 && !(sndvar->on_sendq || sndvar->on_send_list)) {
1930 		SQ_LOCK(&mtcp->ctx->sendq_lock);
1931 		sndvar->on_sendq = TRUE;
1932 		StreamEnqueue(mtcp->sendq, cur_stream);		/* this always success */
1933 		SQ_UNLOCK(&mtcp->ctx->sendq_lock);
1934 		mtcp->wakeup_flag = TRUE;
1935 	}
1936 
1937 	if (to_write == 0 && (socket->opts & MTCP_NONBLOCK)) {
1938 		to_write = -1;
1939 		errno = EAGAIN;
1940 	}
1941 
1942 	/* if there are remaining sending buffer, generate write event */
1943 	if (sndvar->snd_wnd > 0) {
1944 		if (socket->epoll & MOS_EPOLLOUT && !(socket->epoll & MOS_EPOLLET)) {
1945 			AddEpollEvent(mtcp->ep,
1946 				      USR_SHADOW_EVENT_QUEUE, socket, MOS_EPOLLOUT);
1947 		}
1948 	}
1949 
1950 	TRACE_API("Stream %d: mtcp_writev() returning %d\n",
1951 			cur_stream->id, to_write);
1952 	return to_write;
1953 }
1954 /*----------------------------------------------------------------------------*/
1955 uint32_t
1956 mtcp_get_connection_cnt(mctx_t mctx)
1957 {
1958 	mtcp_manager_t mtcp;
1959 	mtcp = GetMTCPManager(mctx);
1960 	if (!mtcp) {
1961 		errno = EACCES;
1962 		return -1;
1963 	}
1964 
1965 	if (mtcp->num_msp > 0)
1966 		return mtcp->flow_cnt / 2;
1967 	else
1968 		return mtcp->flow_cnt;
1969 }
1970 /*----------------------------------------------------------------------------*/
1971