xref: /mOS-networking-stack/core/src/api.c (revision 1c9bc629)
1 #include <sys/queue.h>
2 #include <sys/ioctl.h>
3 #include <limits.h>
4 #include <unistd.h>
5 #include <assert.h>
6 #include <string.h>
7 
8 #ifdef DARWIN
9 #include <netinet/tcp.h>
10 #include <netinet/ip.h>
11 #include <netinet/if_ether.h>
12 #endif
13 
14 #include "mtcp.h"
15 #include "mtcp_api.h"
16 #include "tcp_in.h"
17 #include "tcp_stream.h"
18 #include "tcp_out.h"
19 #include "ip_out.h"
20 #include "eventpoll.h"
21 #include "pipe.h"
22 #include "fhash.h"
23 #include "addr_pool.h"
24 #include "util.h"
25 #include "config.h"
26 #include "debug.h"
27 #include "eventpoll.h"
28 #include "mos_api.h"
29 #include "tcp_rb.h"
30 
/*
 * Two-argument maximum / minimum helpers.
 * NOTE: each argument can be evaluated twice, so never pass an
 * expression with side effects (e.g. MAX(i++, j)).
 */
#define MAX(a, b) ((a)>(b)?(a):(b))
#define MIN(a, b) ((a)<(b)?(a):(b))
33 
34 /*----------------------------------------------------------------------------*/
35 /** Stop monitoring the socket! (function prototype)
36  * @param [in] mctx: mtcp context
37  * @param [in] sock: monitoring stream socket id
38  * @param [in] side: side of monitoring (client side, server side or both)
39  *
40  * This function is now DEPRECATED and is only used within mOS core...
41  */
42 int
43 mtcp_cb_stop(mctx_t mctx, int sock, int side);
44 /*----------------------------------------------------------------------------*/
45 /** Reset the connection (send RST packets to both sides)
46  *  (We need to decide the API for this.)
47  */
48 //int
49 //mtcp_cb_reset(mctx_t mctx, int sock, int side);
50 /*----------------------------------------------------------------------------*/
51 inline mtcp_manager_t
52 GetMTCPManager(mctx_t mctx)
53 {
54 	if (!mctx) {
55 		errno = EACCES;
56 		return NULL;
57 	}
58 
59 	if (mctx->cpu < 0 || mctx->cpu >= num_cpus) {
60 		errno = EINVAL;
61 		return NULL;
62 	}
63 
64 	if (!g_mtcp[mctx->cpu] || g_mtcp[mctx->cpu]->ctx->done || g_mtcp[mctx->cpu]->ctx->exit) {
65 		errno = EPERM;
66 		return NULL;
67 	}
68 
69 	return g_mtcp[mctx->cpu];
70 }
71 /*----------------------------------------------------------------------------*/
72 static inline int
73 GetSocketError(socket_map_t socket, void *optval, socklen_t *optlen)
74 {
75 	tcp_stream *cur_stream;
76 
77 	if (!socket->stream) {
78 		errno = EBADF;
79 		return -1;
80 	}
81 
82 	cur_stream = socket->stream;
83 	if (cur_stream->state == TCP_ST_CLOSED_RSVD) {
84 		if (cur_stream->close_reason == TCP_TIMEDOUT ||
85 				cur_stream->close_reason == TCP_CONN_FAIL ||
86 				cur_stream->close_reason == TCP_CONN_LOST) {
87 			*(int *)optval = ETIMEDOUT;
88 			*optlen = sizeof(int);
89 
90 			return 0;
91 		}
92 	}
93 
94 	if (cur_stream->state == TCP_ST_CLOSE_WAIT ||
95 			cur_stream->state == TCP_ST_CLOSED_RSVD) {
96 		if (cur_stream->close_reason == TCP_RESET) {
97 			*(int *)optval = ECONNRESET;
98 			*optlen = sizeof(int);
99 
100 			return 0;
101 		}
102 	}
103 
104 	if (cur_stream->state == TCP_ST_SYN_SENT &&
105 	    errno == EINPROGRESS) {
106 		*(int *)optval = errno;
107 		*optlen = sizeof(int);
108 
109 		return -1;
110 	}
111 
112 	/*
113 	 * `base case`: If socket sees no so_error, then
114 	 * this also means close_reason will always be
115 	 * TCP_NOT_CLOSED.
116 	 */
117 	if (cur_stream->close_reason == TCP_NOT_CLOSED) {
118 		*(int *)optval = 0;
119 		*optlen = sizeof(int);
120 
121 		return 0;
122 	}
123 
124 	errno = ENOSYS;
125 	return -1;
126 }
127 /*----------------------------------------------------------------------------*/
128 int
129 mtcp_getsockname(mctx_t mctx, int sockid, struct sockaddr *addr,
130 		 socklen_t *addrlen)
131 {
132 	mtcp_manager_t mtcp;
133 	socket_map_t socket;
134 
135 	mtcp = GetMTCPManager(mctx);
136 	if (!mtcp) {
137 		return -1;
138 	}
139 
140 	if (sockid < 0 || sockid >= g_config.mos->max_concurrency) {
141 		TRACE_API("Socket id %d out of range.\n", sockid);
142 		errno = EBADF;
143 		return -1;
144 	}
145 
146 	socket = &mtcp->smap[sockid];
147 	if (socket->socktype == MOS_SOCK_UNUSED) {
148 		TRACE_API("Invalid socket id: %d\n", sockid);
149 		errno = EBADF;
150 		return -1;
151 	}
152 
153 	if (*addrlen <= 0) {
154 		TRACE_API("Invalid addrlen: %d\n", *addrlen);
155 		errno = EINVAL;
156 		return -1;
157 	}
158 
159 	if (socket->socktype != MOS_SOCK_STREAM_LISTEN &&
160 	    socket->socktype != MOS_SOCK_STREAM) {
161 		TRACE_API("Invalid socket id: %d\n", sockid);
162 		errno = ENOTSOCK;
163 		return -1;
164 	}
165 
166 	*(struct sockaddr_in *)addr = socket->saddr;
167         *addrlen = sizeof(socket->saddr);
168 
169 	return 0;
170 }
171 /*----------------------------------------------------------------------------*/
172 int
173 mtcp_getsockopt(mctx_t mctx, int sockid, int level,
174 		int optname, void *optval, socklen_t *optlen)
175 {
176 	mtcp_manager_t mtcp;
177 	socket_map_t socket;
178 
179 	mtcp = GetMTCPManager(mctx);
180 	if (!mtcp) {
181 		errno = EACCES;
182 		return -1;
183 	}
184 
185 	if (sockid < 0 || sockid >= g_config.mos->max_concurrency) {
186 		TRACE_API("Socket id %d out of range.\n", sockid);
187 		errno = EBADF;
188 		return -1;
189 	}
190 
191 	switch (level) {
192 	case SOL_SOCKET:
193 		socket = &mtcp->smap[sockid];
194 		if (socket->socktype == MOS_SOCK_UNUSED) {
195 			TRACE_API("Invalid socket id: %d\n", sockid);
196 			errno = EBADF;
197 			return -1;
198 		}
199 
200 		if (socket->socktype != MOS_SOCK_STREAM_LISTEN &&
201 		    socket->socktype != MOS_SOCK_STREAM) {
202 			TRACE_API("Invalid socket id: %d\n", sockid);
203 			errno = ENOTSOCK;
204 			return -1;
205 		}
206 
207 		if (optname == SO_ERROR) {
208 			if (socket->socktype == MOS_SOCK_STREAM) {
209 				return GetSocketError(socket, optval, optlen);
210 			}
211 		}
212 		break;
213 	case SOL_MONSOCKET:
214 		/* check if the calling thread is in MOS context */
215 		if (mtcp->ctx->thread != pthread_self()) {
216 			errno = EPERM;
217 			return -1;
218 		}
219 		/*
220 		 * All options will only work for active
221 		 * monitor stream sockets
222 		 */
223 		socket = &mtcp->msmap[sockid];
224 		if (socket->socktype != MOS_SOCK_MONITOR_STREAM_ACTIVE) {
225 			TRACE_API("Invalid socket id: %d\n", sockid);
226 			errno = ENOTSOCK;
227 			return -1;
228 		}
229 
230 		switch (optname) {
231 		case MOS_FRAGINFO_CLIBUF:
232 			return GetFragInfo(socket, MOS_SIDE_CLI, optval, optlen);
233 		case MOS_FRAGINFO_SVRBUF:
234 			return GetFragInfo(socket, MOS_SIDE_SVR, optval, optlen);
235 		case MOS_INFO_CLIBUF:
236 			return GetBufInfo(socket, MOS_SIDE_CLI, optval, optlen);
237 		case MOS_INFO_SVRBUF:
238 			return GetBufInfo(socket, MOS_SIDE_SVR, optval, optlen);
239 		case MOS_TCP_STATE_CLI:
240 			return GetTCPState(socket->monitor_stream->stream, MOS_SIDE_CLI,
241 							   optval, optlen);
242 		case MOS_TCP_STATE_SVR:
243 			return GetTCPState(socket->monitor_stream->stream, MOS_SIDE_SVR,
244 							   optval, optlen);
245 		case MOS_TIMESTAMP:
246 			return GetLastTimestamp(socket->monitor_stream->stream,
247 						(uint32_t *)optval,
248 						optlen);
249 		default:
250 		  TRACE_API("can't recognize the optname=%d\n", optname);
251 		  assert(0);
252 		}
253 		break;
254 	}
255 	errno = ENOSYS;
256 	return -1;
257 }
258 /*----------------------------------------------------------------------------*/
259 int
260 mtcp_setsockopt(mctx_t mctx, int sockid, int level,
261 		int optname, const void *optval, socklen_t optlen)
262 {
263 	mtcp_manager_t mtcp;
264 	socket_map_t socket;
265 	tcprb_t *rb;
266 
267 	mtcp = GetMTCPManager(mctx);
268 	if (!mtcp) {
269 		errno = EACCES;
270 		return -1;
271 	}
272 
273 	if (sockid < 0 || sockid >= g_config.mos->max_concurrency) {
274 		TRACE_API("Socket id %d out of range.\n", sockid);
275 		errno = EBADF;
276 		return -1;
277 	}
278 
279 	switch (level) {
280 	case SOL_SOCKET:
281 		socket = &mtcp->smap[sockid];
282 		if (socket->socktype == MOS_SOCK_UNUSED) {
283 			TRACE_API("Invalid socket id: %d\n", sockid);
284 			errno = EBADF;
285 			return -1;
286 		}
287 
288 		if (socket->socktype != MOS_SOCK_STREAM_LISTEN &&
289 		    socket->socktype != MOS_SOCK_STREAM) {
290 			TRACE_API("Invalid socket id: %d\n", sockid);
291 			errno = ENOTSOCK;
292 			return -1;
293 		}
294 		break;
295 	case SOL_MONSOCKET:
296 		socket = &mtcp->msmap[sockid];
297 		/*
298 		 * checking of calling thread to be in MOS context is
299 		 * disabled since both optnames can be called from
300 		 * `application' context (on passive sockets)
301 		 */
302 		/*
303 		 * if (mtcp->ctx->thread != pthread_self())
304 		 * return -1;
305 		 */
306 
307 		switch (optname) {
308 		case MOS_CLIOVERLAP:
309 			rb = (socket->monitor_stream->stream->side == MOS_SIDE_CLI) ?
310 				socket->monitor_stream->stream->rcvvar->rcvbuf :
311 			socket->monitor_stream->stream->pair_stream->rcvvar->rcvbuf;
312 			if (rb == NULL) {
313 				errno = EFAULT;
314 				return -1;
315 			}
316 			if (tcprb_setpolicy(rb, *(uint8_t *)optval) < 0) {
317 				errno = EINVAL;
318 				return -1;
319 			} else
320 				return 0;
321 			break;
322 		case MOS_SVROVERLAP:
323 			rb = (socket->monitor_stream->stream->side == MOS_SIDE_SVR) ?
324 				socket->monitor_stream->stream->rcvvar->rcvbuf :
325 			socket->monitor_stream->stream->pair_stream->rcvvar->rcvbuf;
326 			if (rb == NULL) {
327 				errno = EFAULT;
328 				return -1;
329 			}
330 			if (tcprb_setpolicy(rb, *(uint8_t *)optval) < 0) {
331 				errno = EINVAL;
332 				return -1;
333 			} else
334 				return 0;
335 			break;
336 		case MOS_CLIBUF:
337 #if 0
338 			if (socket->socktype != MOS_SOCK_MONITOR_STREAM_ACTIVE) {
339 				errno = EBADF;
340 				return -1;
341 			}
342 #endif
343 #ifdef DISABLE_DYN_RESIZE
344 			if (*(int *)optval != 0)
345 				return -1;
346 			if (socket->socktype == MOS_SOCK_MONITOR_STREAM_ACTIVE) {
347 				rb = (socket->monitor_stream->stream->side == MOS_SIDE_CLI) ?
348 					socket->monitor_stream->stream->rcvvar->rcvbuf :
349 					socket->monitor_stream->stream->pair_stream->rcvvar->rcvbuf;
350 				if (rb) {
351 					tcprb_resize_meta(rb, 0);
352 					tcprb_resize(rb, 0);
353 				}
354 			}
355 			return DisableBuf(socket, MOS_SIDE_CLI);
356 #else
357 			rb = (socket->monitor_stream->stream->side == MOS_SIDE_CLI) ?
358 				socket->monitor_stream->stream->rcvvar->rcvbuf :
359 				socket->monitor_stream->stream->pair_stream->rcvvar->rcvbuf;
360 			if (tcprb_resize_meta(rb, *(int *)optval) < 0)
361 				return -1;
362 			return tcprb_resize(rb,
363 					(((int)rb->metalen - 1) / UNITBUFSIZE + 1) * UNITBUFSIZE);
364 #endif
365 		case MOS_SVRBUF:
366 #if 0
367 			if (socket->socktype != MOS_SOCK_MONITOR_STREAM_ACTIVE) {
368 				errno = EBADF;
369 				return -1;
370 			}
371 #endif
372 #ifdef DISABLE_DYN_RESIZE
373 			if (*(int *)optval != 0)
374 				return -1;
375 			if (socket->socktype == MOS_SOCK_MONITOR_STREAM_ACTIVE) {
376 				rb = (socket->monitor_stream->stream->side == MOS_SIDE_SVR) ?
377 					socket->monitor_stream->stream->rcvvar->rcvbuf :
378 					socket->monitor_stream->stream->pair_stream->rcvvar->rcvbuf;
379 				if (rb) {
380 					tcprb_resize_meta(rb, 0);
381 					tcprb_resize(rb, 0);
382 				}
383 			}
384 			return DisableBuf(socket, MOS_SIDE_SVR);
385 #else
386 			rb = (socket->monitor_stream->stream->side == MOS_SIDE_SVR) ?
387 				socket->monitor_stream->stream->rcvvar->rcvbuf :
388 				socket->monitor_stream->stream->pair_stream->rcvvar->rcvbuf;
389 			if (tcprb_resize_meta(rb, *(int *)optval) < 0)
390 				return -1;
391 			return tcprb_resize(rb,
392 					(((int)rb->metalen - 1) / UNITBUFSIZE + 1) * UNITBUFSIZE);
393 #endif
394 		case MOS_FRAG_CLIBUF:
395 #if 0
396 			if (socket->socktype != MOS_SOCK_MONITOR_STREAM_ACTIVE) {
397 				errno = EBADF;
398 				return -1;
399 			}
400 #endif
401 #ifdef DISABLE_DYN_RESIZE
402 			if (*(int *)optval != 0)
403 				return -1;
404 			if (socket->socktype == MOS_SOCK_MONITOR_STREAM_ACTIVE) {
405 				rb = (socket->monitor_stream->stream->side == MOS_SIDE_CLI) ?
406 					socket->monitor_stream->stream->rcvvar->rcvbuf :
407 					socket->monitor_stream->stream->pair_stream->rcvvar->rcvbuf;
408 				if (rb)
409 					tcprb_resize(rb, 0);
410 			}
411 			return 0;
412 #else
413 			rb = (socket->monitor_stream->stream->side == MOS_SIDE_CLI) ?
414 				socket->monitor_stream->stream->rcvvar->rcvbuf :
415 				socket->monitor_stream->stream->pair_stream->rcvvar->rcvbuf;
416 			if (rb->len == 0)
417 				return tcprb_resize_meta(rb, *(int *)optval);
418 			else
419 				return -1;
420 #endif
421 		case MOS_FRAG_SVRBUF:
422 #if 0
423 			if (socket->socktype != MOS_SOCK_MONITOR_STREAM_ACTIVE) {
424 				errno = EBADF;
425 				return -1;
426 			}
427 #endif
428 #ifdef DISABLE_DYN_RESIZE
429 			if (*(int *)optval != 0)
430 				return -1;
431 			if (socket->socktype == MOS_SOCK_MONITOR_STREAM_ACTIVE) {
432 				rb = (socket->monitor_stream->stream->side == MOS_SIDE_SVR) ?
433 					socket->monitor_stream->stream->rcvvar->rcvbuf :
434 					socket->monitor_stream->stream->pair_stream->rcvvar->rcvbuf;
435 				if (rb)
436 					tcprb_resize(rb, 0);
437 			}
438 			return 0;
439 #else
440 			rb = (socket->monitor_stream->stream->side == MOS_SIDE_SVR) ?
441 				socket->monitor_stream->stream->rcvvar->rcvbuf :
442 				socket->monitor_stream->stream->pair_stream->rcvvar->rcvbuf;
443 			if (rb->len == 0)
444 				return tcprb_resize_meta(rb, *(int *)optval);
445 			else
446 				return -1;
447 #endif
448 		case MOS_SEQ_REMAP:
449 			break;
450 		case MOS_STOP_MON:
451 			return mtcp_cb_stop(mctx, sockid, *(int *)optval);
452 		default:
453 			TRACE_API("invalid optname=%d\n", optname);
454 			assert(0);
455 		}
456 		break;
457 	}
458 
459 	errno = ENOSYS;
460 	return -1;
461 }
462 /*----------------------------------------------------------------------------*/
463 int
464 mtcp_setsock_nonblock(mctx_t mctx, int sockid)
465 {
466 	mtcp_manager_t mtcp;
467 
468 	mtcp = GetMTCPManager(mctx);
469 	if (!mtcp) {
470 		errno = EACCES;
471 		return -1;
472 	}
473 
474 	if (sockid < 0 || sockid >= g_config.mos->max_concurrency) {
475 		TRACE_API("Socket id %d out of range.\n", sockid);
476 		errno = EBADF;
477 		return -1;
478 	}
479 
480 	if (mtcp->smap[sockid].socktype == MOS_SOCK_UNUSED) {
481 		TRACE_API("Invalid socket id: %d\n", sockid);
482 		errno = EBADF;
483 		return -1;
484 	}
485 
486 	mtcp->smap[sockid].opts |= MTCP_NONBLOCK;
487 
488 	return 0;
489 }
490 /*----------------------------------------------------------------------------*/
491 int
492 mtcp_ioctl(mctx_t mctx, int sockid, int request, void *argp)
493 {
494 	mtcp_manager_t mtcp;
495 	socket_map_t socket;
496 
497 	mtcp = GetMTCPManager(mctx);
498 	if (!mtcp) {
499 		errno = EACCES;
500 		return -1;
501 	}
502 
503 	if (sockid < 0 || sockid >= g_config.mos->max_concurrency) {
504 		TRACE_API("Socket id %d out of range.\n", sockid);
505 		errno = EBADF;
506 		return -1;
507 	}
508 
509 	/* only support stream socket */
510 	socket = &mtcp->smap[sockid];
511 
512 	if (socket->socktype != MOS_SOCK_STREAM_LISTEN &&
513 		socket->socktype != MOS_SOCK_STREAM) {
514 		TRACE_API("Invalid socket id: %d\n", sockid);
515 		errno = EBADF;
516 		return -1;
517 	}
518 
519 	if (!argp) {
520 		errno = EFAULT;
521 		return -1;
522 	}
523 
524 	if (request == FIONREAD) {
525 		tcp_stream *cur_stream;
526 		tcprb_t *rbuf;
527 
528 		cur_stream = socket->stream;
529 		if (!cur_stream) {
530 			errno = EBADF;
531 			return -1;
532 		}
533 
534 		rbuf = cur_stream->rcvvar->rcvbuf;
535 		*(int *)argp = (rbuf) ? tcprb_cflen(rbuf) : 0;
536 
537 	} else if (request == FIONBIO) {
538 		/*
539 		 * sockets can only be set to blocking/non-blocking
540 		 * modes during initialization
541 		 */
542 		if ((*(int *)argp))
543 			mtcp->smap[sockid].opts |= MTCP_NONBLOCK;
544 		else
545 			mtcp->smap[sockid].opts &= ~MTCP_NONBLOCK;
546 	} else {
547 		errno = EINVAL;
548 		return -1;
549 	}
550 
551 	return 0;
552 }
553 /*----------------------------------------------------------------------------*/
554 static int
555 mtcp_monitor(mctx_t mctx, socket_map_t sock)
556 {
557 	mtcp_manager_t mtcp;
558 	struct mon_listener *monitor;
559 	int sockid = sock->id;
560 
561 	mtcp = GetMTCPManager(mctx);
562 	if (!mtcp) {
563 		errno = EACCES;
564 		return -1;
565 	}
566 
567 	if (sockid < 0 || sockid >= g_config.mos->max_concurrency) {
568 		TRACE_API("Socket id %d out of range.\n", sockid);
569 		errno = EBADF;
570 		return -1;
571 	}
572 
573 	if (mtcp->msmap[sockid].socktype == MOS_SOCK_UNUSED) {
574 		TRACE_API("Invalid socket id: %d\n", sockid);
575 		errno = EBADF;
576 		return -1;
577 	}
578 
579 	if (!(mtcp->msmap[sockid].socktype == MOS_SOCK_MONITOR_STREAM ||
580 	      mtcp->msmap[sockid].socktype == MOS_SOCK_MONITOR_RAW)) {
581 		TRACE_API("Not a monitor socket. id: %d\n", sockid);
582 		errno = ENOTSOCK;
583 		return -1;
584 	}
585 
586 	monitor = (struct mon_listener *)calloc(1, sizeof(struct mon_listener));
587 	if (!monitor) {
588 		/* errno set from the malloc() */
589 		errno = ENOMEM;
590 		return -1;
591 	}
592 
593 	/* create a monitor-specific event queue */
594 	monitor->eq = CreateEventQueue(g_config.mos->max_concurrency);
595 	if (!monitor->eq) {
596 		TRACE_API("Can't create event queue (concurrency: %d) for "
597 			  "monitor read event registrations!\n",
598 			  g_config.mos->max_concurrency);
599 		free(monitor);
600 		errno = ENOMEM;
601 		return -1;
602 	}
603 
604 	/* set monitor-related basic parameters */
605 #ifndef NEWEV
606 	monitor->ude_id = UDE_OFFSET;
607 #endif
608 	monitor->socket = sock;
609 	monitor->client_buf_mgmt = monitor->server_buf_mgmt = BUFMGMT_FULL;
610 
611 	/* perform both sides monitoring by default */
612 	monitor->client_mon = monitor->server_mon = 1;
613 
614 	/* add monitor socket to the monitor list */
615 	TAILQ_INSERT_TAIL(&mtcp->monitors, monitor, link);
616 
617 	mtcp->msmap[sockid].monitor_listener = monitor;
618 
619 	return 0;
620 }
621 /*----------------------------------------------------------------------------*/
622 int
623 mtcp_socket(mctx_t mctx, int domain, int type, int protocol)
624 {
625 	mtcp_manager_t mtcp;
626 	socket_map_t socket;
627 
628 	mtcp = GetMTCPManager(mctx);
629 	if (!mtcp) {
630 		errno = EACCES;
631 		return -1;
632 	}
633 
634 	if (domain != AF_INET) {
635 		errno = EAFNOSUPPORT;
636 		return -1;
637 	}
638 
639 	if (type == (int)SOCK_STREAM) {
640 		type = MOS_SOCK_STREAM;
641 	} else if (type == MOS_SOCK_MONITOR_STREAM ||
642 		   type == MOS_SOCK_MONITOR_RAW) {
643 		/* do nothing for the time being */
644 	} else {
645 		/* Not supported type */
646 		errno = EINVAL;
647 		return -1;
648 	}
649 
650 	socket = AllocateSocket(mctx, type);
651 	if (!socket) {
652 		errno = ENFILE;
653 		return -1;
654 	}
655 
656 	if (type == MOS_SOCK_MONITOR_STREAM ||
657 	    type == MOS_SOCK_MONITOR_RAW) {
658 		mtcp_manager_t mtcp = GetMTCPManager(mctx);
659 		if (!mtcp) {
660 			errno = EACCES;
661 			return -1;
662 		}
663 		mtcp_monitor(mctx, socket);
664 #ifdef NEWEV
665 		socket->monitor_listener->stree_dontcare = NULL;
666 		socket->monitor_listener->stree_pre_rcv = NULL;
667 		socket->monitor_listener->stree_post_snd = NULL;
668 #else
669 		InitEvB(mtcp, &socket->monitor_listener->dontcare_evb);
670 		InitEvB(mtcp, &socket->monitor_listener->pre_tcp_evb);
671 		InitEvB(mtcp, &socket->monitor_listener->post_tcp_evb);
672 #endif
673 	}
674 
675 	return socket->id;
676 }
677 /*----------------------------------------------------------------------------*/
678 int
679 mtcp_bind(mctx_t mctx, int sockid,
680 		const struct sockaddr *addr, socklen_t addrlen)
681 {
682 	mtcp_manager_t mtcp;
683 	struct sockaddr_in *addr_in;
684 
685 	mtcp = GetMTCPManager(mctx);
686 	if (!mtcp) {
687 		errno = EACCES;
688 		return -1;
689 	}
690 
691 	if (sockid < 0 || sockid >= g_config.mos->max_concurrency) {
692 		TRACE_API("Socket id %d out of range.\n", sockid);
693 		errno = EBADF;
694 		return -1;
695 	}
696 
697 	if (mtcp->smap[sockid].socktype == MOS_SOCK_UNUSED) {
698 		TRACE_API("Invalid socket id: %d\n", sockid);
699 		errno = EBADF;
700 		return -1;
701 	}
702 
703 	if (mtcp->smap[sockid].socktype != MOS_SOCK_STREAM &&
704 			mtcp->smap[sockid].socktype != MOS_SOCK_STREAM_LISTEN) {
705 		TRACE_API("Not a stream socket id: %d\n", sockid);
706 		errno = ENOTSOCK;
707 		return -1;
708 	}
709 
710 	if (!addr) {
711 		TRACE_API("Socket %d: empty address!\n", sockid);
712 		errno = EINVAL;
713 		return -1;
714 	}
715 
716 	if (mtcp->smap[sockid].opts & MTCP_ADDR_BIND) {
717 		TRACE_API("Socket %d: adress already bind for this socket.\n", sockid);
718 		errno = EINVAL;
719 		return -1;
720 	}
721 
722 	/* we only allow bind() for AF_INET address */
723 	if (addr->sa_family != AF_INET || addrlen < sizeof(struct sockaddr_in)) {
724 		TRACE_API("Socket %d: invalid argument!\n", sockid);
725 		errno = EINVAL;
726 		return -1;
727 	}
728 
729 	if (mtcp->listener) {
730 		TRACE_API("Address already bound!\n");
731 		errno = EINVAL;
732 		return -1;
733 	}
734 	addr_in = (struct sockaddr_in *)addr;
735 	mtcp->smap[sockid].saddr = *addr_in;
736 	mtcp->smap[sockid].opts |= MTCP_ADDR_BIND;
737 
738 	return 0;
739 }
740 /*----------------------------------------------------------------------------*/
741 int
742 mtcp_listen(mctx_t mctx, int sockid, int backlog)
743 {
744 	mtcp_manager_t mtcp;
745 	struct tcp_listener *listener;
746 
747 	mtcp = GetMTCPManager(mctx);
748 	if (!mtcp) {
749 		errno = EACCES;
750 		return -1;
751 	}
752 
753 	if (sockid < 0 || sockid >= g_config.mos->max_concurrency) {
754 		TRACE_API("Socket id %d out of range.\n", sockid);
755 		errno = EBADF;
756 		return -1;
757 	}
758 
759 	if (mtcp->smap[sockid].socktype == MOS_SOCK_UNUSED) {
760 		TRACE_API("Invalid socket id: %d\n", sockid);
761 		errno = EBADF;
762 		return -1;
763 	}
764 
765 	if (mtcp->smap[sockid].socktype == MOS_SOCK_STREAM) {
766 		mtcp->smap[sockid].socktype = MOS_SOCK_STREAM_LISTEN;
767 	}
768 
769 	if (mtcp->smap[sockid].socktype != MOS_SOCK_STREAM_LISTEN) {
770 		TRACE_API("Not a listening socket. id: %d\n", sockid);
771 		errno = ENOTSOCK;
772 		return -1;
773 	}
774 
775 	if (backlog <= 0 || backlog > g_config.mos->max_concurrency) {
776 		errno = EINVAL;
777 		return -1;
778 	}
779 
780 	listener = (struct tcp_listener *)calloc(1, sizeof(struct tcp_listener));
781 	if (!listener) {
782 		/* errno set from the malloc() */
783 		errno = ENOMEM;
784 		return -1;
785 	}
786 
787 	listener->sockid = sockid;
788 	listener->backlog = backlog;
789 	listener->socket = &mtcp->smap[sockid];
790 
791 	if (pthread_cond_init(&listener->accept_cond, NULL)) {
792 		perror("pthread_cond_init of ctx->accept_cond\n");
793 		/* errno set by pthread_cond_init() */
794 		free(listener);
795 		return -1;
796 	}
797 	if (pthread_mutex_init(&listener->accept_lock, NULL)) {
798 		perror("pthread_mutex_init of ctx->accept_lock\n");
799 		/* errno set by pthread_mutex_init() */
800 		free(listener);
801 		return -1;
802 	}
803 
804 	listener->acceptq = CreateStreamQueue(backlog);
805 	if (!listener->acceptq) {
806 		free(listener);
807 		errno = ENOMEM;
808 		return -1;
809 	}
810 
811 	mtcp->smap[sockid].listener = listener;
812 	mtcp->listener = listener;
813 
814 	return 0;
815 }
816 /*----------------------------------------------------------------------------*/
817 int
818 mtcp_accept(mctx_t mctx, int sockid, struct sockaddr *addr, socklen_t *addrlen)
819 {
820 	mtcp_manager_t mtcp;
821 	struct tcp_listener *listener;
822 	socket_map_t socket;
823 	tcp_stream *accepted = NULL;
824 
825 	mtcp = GetMTCPManager(mctx);
826 	if (!mtcp) {
827 		errno = EACCES;
828 		return -1;
829 	}
830 
831 	if (sockid < 0 || sockid >= g_config.mos->max_concurrency) {
832 		TRACE_API("Socket id %d out of range.\n", sockid);
833 		errno = EBADF;
834 		return -1;
835 	}
836 
837 	/* requires listening socket */
838 	if (mtcp->smap[sockid].socktype != MOS_SOCK_STREAM_LISTEN) {
839 		errno = EINVAL;
840 		return -1;
841 	}
842 
843 	listener = mtcp->smap[sockid].listener;
844 
845 	/* dequeue from the acceptq without lock first */
846 	/* if nothing there, acquire lock and cond_wait */
847 	accepted = StreamDequeue(listener->acceptq);
848 	if (!accepted) {
849 		if (listener->socket->opts & MTCP_NONBLOCK) {
850 			errno = EAGAIN;
851 			return -1;
852 
853 		} else {
854 			pthread_mutex_lock(&listener->accept_lock);
855 			while ((accepted = StreamDequeue(listener->acceptq)) == NULL) {
856 				pthread_cond_wait(&listener->accept_cond, &listener->accept_lock);
857 
858 				if (mtcp->ctx->done || mtcp->ctx->exit) {
859 					pthread_mutex_unlock(&listener->accept_lock);
860 					errno = EINTR;
861 					return -1;
862 				}
863 			}
864 			pthread_mutex_unlock(&listener->accept_lock);
865 		}
866 	}
867 
868 	if (!accepted) {
869 		TRACE_ERROR("[NEVER HAPPEN] Empty accept queue!\n");
870 	}
871 
872 	if (!accepted->socket) {
873 		socket = AllocateSocket(mctx, MOS_SOCK_STREAM);
874 		if (!socket) {
875 			TRACE_ERROR("Failed to create new socket!\n");
876 			/* TODO: destroy the stream */
877 			errno = ENFILE;
878 			return -1;
879 		}
880 		socket->stream = accepted;
881 		accepted->socket = socket;
882 
883 		/* set socket addr parameters */
884 		socket->saddr.sin_family = AF_INET;
885 		socket->saddr.sin_port = accepted->dport;
886 		socket->saddr.sin_addr.s_addr = accepted->daddr;
887 
888 		/* if monitor is enabled, complete the socket assignment */
889 		if (socket->stream->pair_stream != NULL)
890 			socket->stream->pair_stream->socket = socket;
891 	}
892 
893 	if (!(listener->socket->epoll & MOS_EPOLLET) &&
894 	    !StreamQueueIsEmpty(listener->acceptq))
895 		AddEpollEvent(mtcp->ep,
896 			      USR_SHADOW_EVENT_QUEUE,
897 			      listener->socket, MOS_EPOLLIN);
898 
899 	TRACE_API("Stream %d accepted.\n", accepted->id);
900 
901 	if (addr && addrlen) {
902 		struct sockaddr_in *addr_in = (struct sockaddr_in *)addr;
903 		addr_in->sin_family = AF_INET;
904 		addr_in->sin_port = accepted->dport;
905 		addr_in->sin_addr.s_addr = accepted->daddr;
906 		*addrlen = sizeof(struct sockaddr_in);
907 	}
908 
909 	return accepted->socket->id;
910 }
911 /*----------------------------------------------------------------------------*/
912 int
913 mtcp_init_rss(mctx_t mctx, in_addr_t saddr_base, int num_addr,
914 		in_addr_t daddr, in_addr_t dport)
915 {
916 	mtcp_manager_t mtcp;
917 	addr_pool_t ap;
918 
919 	mtcp = GetMTCPManager(mctx);
920 	if (!mtcp) {
921 		errno = EACCES;
922 		return -1;
923 	}
924 
925 	if (saddr_base == INADDR_ANY) {
926 		int nif_out;
927 
928 		/* for the INADDR_ANY, find the output interface for the destination
929 		   and set the saddr_base as the ip address of the output interface */
930 		nif_out = GetOutputInterface(daddr);
931 		if (nif_out < 0) {
932 			TRACE_DBG("Could not determine nif idx!\n");
933 			errno = EINVAL;
934 			return -1;
935 		}
936 		saddr_base = g_config.mos->netdev_table->ent[nif_out]->ip_addr;
937 	}
938 
939 	ap = CreateAddressPoolPerCore(mctx->cpu, num_cpus,
940 			saddr_base, num_addr, daddr, dport);
941 	if (!ap) {
942 		errno = ENOMEM;
943 		return -1;
944 	}
945 
946 	mtcp->ap = ap;
947 
948 	return 0;
949 }
950 /*----------------------------------------------------------------------------*/
951 int
952 eval_bpf_5tuple(struct sfbpf_program fcode,
953 				in_addr_t saddr, in_port_t sport,
954 				in_addr_t daddr, in_port_t dport) {
955 	uint8_t buf[TOTAL_TCP_HEADER_LEN];
956 	struct ethhdr *ethh;
957 	struct iphdr *iph;
958 	struct tcphdr *tcph;
959 
960 	ethh = (struct ethhdr *)buf;
961 	ethh->h_proto = htons(ETH_P_IP);
962 	iph = (struct iphdr *)(ethh + 1);
963 	iph->ihl = IP_HEADER_LEN >> 2;
964 	iph->version = 4;
965 	iph->tos = 0;
966 	iph->tot_len = htons(IP_HEADER_LEN + TCP_HEADER_LEN);
967 	iph->id = htons(0);
968 	iph->protocol = IPPROTO_TCP;
969 	iph->saddr = saddr;
970 	iph->daddr = daddr;
971 	iph->check = 0;
972 	tcph = (struct tcphdr *)(iph + 1);
973 	tcph->source = sport;
974 	tcph->dest = dport;
975 
976 	return EVAL_BPFFILTER(fcode, (uint8_t *)iph - sizeof(struct ethhdr),
977 						 TOTAL_TCP_HEADER_LEN);
978 }
979 /*----------------------------------------------------------------------------*/
980 int
981 mtcp_connect(mctx_t mctx, int sockid,
982 		const struct sockaddr *addr, socklen_t addrlen)
983 {
984 	mtcp_manager_t mtcp;
985 	socket_map_t socket;
986 	tcp_stream *cur_stream;
987 	struct sockaddr_in *addr_in;
988 	in_addr_t dip;
989 	in_port_t dport;
990 	int is_dyn_bound = FALSE;
991 	int ret, nif, endian_check;
992 	int cnt_match = 0;
993 	struct mon_listener *walk;
994 	struct sfbpf_program fcode;
995 
996 	cur_stream = NULL;
997 	mtcp = GetMTCPManager(mctx);
998 	if (!mtcp) {
999 		errno = EACCES;
1000 		return -1;
1001 	}
1002 
1003 	if (sockid < 0 || sockid >= g_config.mos->max_concurrency) {
1004 		TRACE_API("Socket id %d out of range.\n", sockid);
1005 		errno = EBADF;
1006 		return -1;
1007 	}
1008 
1009 	if (mtcp->smap[sockid].socktype == MOS_SOCK_UNUSED) {
1010 		TRACE_API("Invalid socket id: %d\n", sockid);
1011 		errno = EBADF;
1012 		return -1;
1013 	}
1014 
1015 	if (mtcp->smap[sockid].socktype != MOS_SOCK_STREAM) {
1016 		TRACE_API("Not an end socket. id: %d\n", sockid);
1017 		errno = ENOTSOCK;
1018 		return -1;
1019 	}
1020 
1021 	if (!addr) {
1022 		TRACE_API("Socket %d: empty address!\n", sockid);
1023 		errno = EFAULT;
1024 		return -1;
1025 	}
1026 
1027 	/* we only allow bind() for AF_INET address */
1028 	if (addr->sa_family != AF_INET || addrlen < sizeof(struct sockaddr_in)) {
1029 		TRACE_API("Socket %d: invalid argument!\n", sockid);
1030 		errno = EAFNOSUPPORT;
1031 		return -1;
1032 	}
1033 
1034 	socket = &mtcp->smap[sockid];
1035 	if (socket->stream) {
1036 		TRACE_API("Socket %d: stream already exist!\n", sockid);
1037 		if (socket->stream->state >= TCP_ST_ESTABLISHED) {
1038 			errno = EISCONN;
1039 		} else {
1040 			errno = EALREADY;
1041 		}
1042 		return -1;
1043 	}
1044 
1045 	addr_in = (struct sockaddr_in *)addr;
1046 	dip = addr_in->sin_addr.s_addr;
1047 	dport = addr_in->sin_port;
1048 
1049 	/* address binding */
1050 	if (socket->opts & MTCP_ADDR_BIND &&
1051 	    socket->saddr.sin_port != INPORT_ANY &&
1052 	    socket->saddr.sin_addr.s_addr != INADDR_ANY) {
1053 		int rss_core;
1054 
1055 		endian_check = FetchEndianType();
1056 		rss_core = GetRSSCPUCore(socket->saddr.sin_addr.s_addr, dip,
1057 					 socket->saddr.sin_port, dport, num_queues,
1058 					 endian_check);
1059 
1060 		if (rss_core != mctx->cpu) {
1061 			errno = EINVAL;
1062 			return -1;
1063 		}
1064 	} else {
1065 		if (mtcp->ap) {
1066 			ret = FetchAddress(mtcp->ap,
1067 					mctx->cpu, num_queues, addr_in, &socket->saddr);
1068 		} else {
1069 			nif = GetOutputInterface(dip);
1070 			if (nif < 0) {
1071 				errno = EINVAL;
1072 				return -1;
1073 			}
1074 			ret = FetchAddress(ap[nif],
1075 					   mctx->cpu, num_queues, addr_in, &socket->saddr);
1076 		}
1077 		if (ret < 0) {
1078 			errno = EAGAIN;
1079 			return -1;
1080 		}
1081 		socket->opts |= MTCP_ADDR_BIND;
1082 		is_dyn_bound = TRUE;
1083 	}
1084 
1085 	cnt_match = 0;
1086 	if (mtcp->num_msp > 0) {
1087 		TAILQ_FOREACH(walk, &mtcp->monitors, link) {
1088 			fcode = walk->stream_syn_fcode;
1089 			if (!(ISSET_BPFFILTER(fcode) &&
1090 				  eval_bpf_5tuple(fcode, socket->saddr.sin_addr.s_addr,
1091 								  socket->saddr.sin_port,
1092 								  dip, dport) == 0)) {
1093 				walk->is_stream_syn_filter_hit = 1; // set the 'filter hit' flag to 1
1094 				cnt_match++;
1095 			}
1096 		}
1097 	}
1098 
1099 	if (mtcp->num_msp > 0 && cnt_match > 0) {
1100 		/* 150820 dhkim: XXX: embedded mode is not verified */
1101 #if 1
1102 		cur_stream = CreateClientTCPStream(mtcp, socket,
1103 						 STREAM_TYPE(MOS_SOCK_STREAM) |
1104 						 STREAM_TYPE(MOS_SOCK_MONITOR_STREAM_ACTIVE),
1105 						 socket->saddr.sin_addr.s_addr,
1106 						 socket->saddr.sin_port, dip, dport, NULL);
1107 #else
1108 		cur_stream = CreateDualTCPStream(mtcp, socket,
1109 						 STREAM_TYPE(MOS_SOCK_STREAM) |
1110 						 STREAM_TYPE(MOS_SOCK_MONITOR_STREAM_ACTIVE),
1111 						 socket->saddr.sin_addr.s_addr,
1112 						 socket->saddr.sin_port, dip, dport, NULL);
1113 #endif
1114 	}
1115 	else
1116 		cur_stream = CreateTCPStream(mtcp, socket, STREAM_TYPE(MOS_SOCK_STREAM),
1117 					     socket->saddr.sin_addr.s_addr,
1118 					     socket->saddr.sin_port, dip, dport, NULL);
1119 	if (!cur_stream) {
1120 		TRACE_ERROR("Socket %d: failed to create tcp_stream!\n", sockid);
1121 		errno = ENOMEM;
1122 		return -1;
1123 	}
1124 
1125 	if (is_dyn_bound)
1126 		cur_stream->is_bound_addr = TRUE;
1127 	cur_stream->sndvar->cwnd = 1;
1128 	cur_stream->sndvar->ssthresh = cur_stream->sndvar->mss * 10;
1129 	cur_stream->side = MOS_SIDE_CLI;
1130 	/* if monitor is enabled, update the pair stream side as well */
1131 	if (cur_stream->pair_stream) {
1132 		cur_stream->pair_stream->side = MOS_SIDE_SVR;
1133 		/*
1134 		 * if buffer management is off, then disable
1135 		 * monitoring tcp ring of server...
1136 		 * if there is even a single monitor asking for
1137 		 * buffer management, enable it (that's why the
1138 		 * need for the loop)
1139 		 */
1140 		cur_stream->pair_stream->buffer_mgmt = BUFMGMT_OFF;
1141 		struct socket_map *walk;
1142 		SOCKQ_FOREACH_START(walk, &cur_stream->msocks) {
1143 			uint8_t bm = walk->monitor_stream->monitor_listener->server_buf_mgmt;
1144 			if (bm > cur_stream->pair_stream->buffer_mgmt) {
1145 				cur_stream->pair_stream->buffer_mgmt = bm;
1146 				break;
1147 			}
1148 		} SOCKQ_FOREACH_END;
1149 	}
1150 
1151 	cur_stream->state = TCP_ST_SYN_SENT;
1152 	cur_stream->cb_events |= MOS_ON_TCP_STATE_CHANGE;
1153 
1154 	TRACE_STATE("Stream %d: TCP_ST_SYN_SENT\n", cur_stream->id);
1155 
1156 	SQ_LOCK(&mtcp->ctx->connect_lock);
1157 	ret = StreamEnqueue(mtcp->connectq, cur_stream);
1158 	SQ_UNLOCK(&mtcp->ctx->connect_lock);
1159 	mtcp->wakeup_flag = TRUE;
1160 	if (ret < 0) {
1161 		TRACE_ERROR("Socket %d: failed to enqueue to conenct queue!\n", sockid);
1162 		SQ_LOCK(&mtcp->ctx->destroyq_lock);
1163 		StreamEnqueue(mtcp->destroyq, cur_stream);
1164 		SQ_UNLOCK(&mtcp->ctx->destroyq_lock);
1165 		errno = EAGAIN;
1166 		return -1;
1167 	}
1168 
1169 	/* if nonblocking socket, return EINPROGRESS */
1170 	if (socket->opts & MTCP_NONBLOCK) {
1171 		errno = EINPROGRESS;
1172 		return -1;
1173 
1174 	} else {
1175 		while (1) {
1176 			if (!cur_stream) {
1177 				TRACE_ERROR("STREAM DESTROYED\n");
1178 				errno = ETIMEDOUT;
1179 				return -1;
1180 			}
1181 			if (cur_stream->state > TCP_ST_ESTABLISHED) {
1182 				TRACE_ERROR("Socket %d: weird state %s\n",
1183 						sockid, TCPStateToString(cur_stream));
1184 				// TODO: how to handle this?
1185 				errno = ENOSYS;
1186 				return -1;
1187 			}
1188 
1189 			if (cur_stream->state == TCP_ST_ESTABLISHED) {
1190 				break;
1191 			}
1192 			usleep(1000);
1193 		}
1194 	}
1195 
1196 	return 0;
1197 }
1198 /*----------------------------------------------------------------------------*/
1199 static inline int
1200 CloseStreamSocket(mctx_t mctx, int sockid)
1201 {
1202 	mtcp_manager_t mtcp;
1203 	tcp_stream *cur_stream;
1204 	int ret;
1205 
1206 	mtcp = GetMTCPManager(mctx);
1207 	if (!mtcp) {
1208 		errno = EACCES;
1209 		return -1;
1210 	}
1211 
1212 	cur_stream = mtcp->smap[sockid].stream;
1213 	if (!cur_stream) {
1214 		TRACE_API("Socket %d: stream does not exist.\n", sockid);
1215 		errno = ENOTCONN;
1216 		return -1;
1217 	}
1218 
1219 	if (cur_stream->closed) {
1220 		TRACE_API("Socket %d (Stream %u): already closed stream\n",
1221 				sockid, cur_stream->id);
1222 		return 0;
1223 	}
1224 	cur_stream->closed = TRUE;
1225 
1226 	TRACE_API("Stream %d: closing the stream.\n", cur_stream->id);
1227 
1228 	/* 141029 dhkim: Check this! */
1229 	cur_stream->socket = NULL;
1230 
1231 	if (cur_stream->state == TCP_ST_CLOSED_RSVD) {
1232 		TRACE_API("Stream %d at TCP_ST_CLOSED_RSVD. destroying the stream.\n",
1233 				cur_stream->id);
1234 		SQ_LOCK(&mtcp->ctx->destroyq_lock);
1235 		StreamEnqueue(mtcp->destroyq, cur_stream);
1236 		mtcp->wakeup_flag = TRUE;
1237 		SQ_UNLOCK(&mtcp->ctx->destroyq_lock);
1238 		return 0;
1239 
1240 	} else if (cur_stream->state == TCP_ST_SYN_SENT) {
1241 #if 1
1242 		SQ_LOCK(&mtcp->ctx->destroyq_lock);
1243 		StreamEnqueue(mtcp->destroyq, cur_stream);
1244 		SQ_UNLOCK(&mtcp->ctx->destroyq_lock);
1245 		mtcp->wakeup_flag = TRUE;
1246 #endif
1247 		return -1;
1248 
1249 	} else if (cur_stream->state != TCP_ST_ESTABLISHED &&
1250 			cur_stream->state != TCP_ST_CLOSE_WAIT) {
1251 		TRACE_API("Stream %d at state %s\n",
1252 				cur_stream->id, TCPStateToString(cur_stream));
1253 		errno = EBADF;
1254 		return -1;
1255 	}
1256 
1257 	SQ_LOCK(&mtcp->ctx->close_lock);
1258 	cur_stream->sndvar->on_closeq = TRUE;
1259 	ret = StreamEnqueue(mtcp->closeq, cur_stream);
1260 	mtcp->wakeup_flag = TRUE;
1261 	SQ_UNLOCK(&mtcp->ctx->close_lock);
1262 
1263 	if (ret < 0) {
1264 		TRACE_ERROR("(NEVER HAPPEN) Failed to enqueue the stream to close.\n");
1265 		errno = EAGAIN;
1266 		return -1;
1267 	}
1268 
1269 	return 0;
1270 }
1271 /*----------------------------------------------------------------------------*/
1272 static inline int
1273 CloseListeningSocket(mctx_t mctx, int sockid)
1274 {
1275 	mtcp_manager_t mtcp;
1276 	struct tcp_listener *listener;
1277 
1278 	mtcp = GetMTCPManager(mctx);
1279 	if (!mtcp) {
1280 		errno = EACCES;
1281 		return -1;
1282 	}
1283 
1284 	listener = mtcp->smap[sockid].listener;
1285 	if (!listener) {
1286 		errno = EINVAL;
1287 		return -1;
1288 	}
1289 
1290 	if (listener->acceptq) {
1291 		DestroyStreamQueue(listener->acceptq);
1292 		listener->acceptq = NULL;
1293 	}
1294 
1295 	pthread_mutex_lock(&listener->accept_lock);
1296 	pthread_cond_signal(&listener->accept_cond);
1297 	pthread_mutex_unlock(&listener->accept_lock);
1298 
1299 	pthread_cond_destroy(&listener->accept_cond);
1300 	pthread_mutex_destroy(&listener->accept_lock);
1301 
1302 	free(listener);
1303 	mtcp->smap[sockid].listener = NULL;
1304 
1305 	return 0;
1306 }
1307 /*----------------------------------------------------------------------------*/
1308 int
1309 mtcp_close(mctx_t mctx, int sockid)
1310 {
1311 	mtcp_manager_t mtcp;
1312 	int ret;
1313 
1314 	mtcp = GetMTCPManager(mctx);
1315 	if (!mtcp) {
1316 		errno = EACCES;
1317 		return -1;
1318 	}
1319 
1320 	if (sockid < 0 || sockid >= g_config.mos->max_concurrency) {
1321 		TRACE_API("Socket id %d out of range.\n", sockid);
1322 		errno = EBADF;
1323 		return -1;
1324 	}
1325 
1326 	if (mtcp->smap[sockid].socktype == MOS_SOCK_UNUSED) {
1327 		TRACE_API("Invalid socket id: %d\n", sockid);
1328 		errno = EBADF;
1329 		return -1;
1330 	}
1331 
1332 	TRACE_API("Socket %d: mtcp_close called.\n", sockid);
1333 
1334 	switch (mtcp->smap[sockid].socktype) {
1335 	case MOS_SOCK_STREAM:
1336 		ret = CloseStreamSocket(mctx, sockid);
1337 		break;
1338 
1339 	case MOS_SOCK_STREAM_LISTEN:
1340 		ret = CloseListeningSocket(mctx, sockid);
1341 		break;
1342 
1343 	case MOS_SOCK_EPOLL:
1344 		ret = CloseEpollSocket(mctx, sockid);
1345 		break;
1346 
1347 	case MOS_SOCK_PIPE:
1348 		ret = PipeClose(mctx, sockid);
1349 		break;
1350 
1351 	default:
1352 		errno = EINVAL;
1353 		ret = -1;
1354 		break;
1355 	}
1356 
1357 	FreeSocket(mctx, sockid, mtcp->smap[sockid].socktype);
1358 
1359 	return ret;
1360 }
1361 /*----------------------------------------------------------------------------*/
1362 int
1363 mtcp_abort(mctx_t mctx, int sockid)
1364 {
1365 	mtcp_manager_t mtcp;
1366 	tcp_stream *cur_stream;
1367 	int ret;
1368 
1369 	mtcp = GetMTCPManager(mctx);
1370 	if (!mtcp) {
1371 		errno = EACCES;
1372 		return -1;
1373 	}
1374 
1375 	if (sockid < 0 || sockid >= g_config.mos->max_concurrency) {
1376 		TRACE_API("Socket id %d out of range.\n", sockid);
1377 		errno = EBADF;
1378 		return -1;
1379 	}
1380 
1381 	if (mtcp->smap[sockid].socktype == MOS_SOCK_UNUSED) {
1382 		TRACE_API("Invalid socket id: %d\n", sockid);
1383 		errno = EBADF;
1384 		return -1;
1385 	}
1386 
1387 	if (mtcp->smap[sockid].socktype != MOS_SOCK_STREAM) {
1388 		TRACE_API("Not an end socket. id: %d\n", sockid);
1389 		errno = ENOTSOCK;
1390 		return -1;
1391 	}
1392 
1393 	cur_stream = mtcp->smap[sockid].stream;
1394 	if (!cur_stream) {
1395 		TRACE_API("Stream %d: does not exist.\n", sockid);
1396 		errno = ENOTCONN;
1397 		return -1;
1398 	}
1399 
1400 	TRACE_API("Socket %d: mtcp_abort()\n", sockid);
1401 
1402 	FreeSocket(mctx, sockid, mtcp->smap[sockid].socktype);
1403 	cur_stream->socket = NULL;
1404 
1405 	if (cur_stream->state == TCP_ST_CLOSED_RSVD) {
1406 		TRACE_API("Stream %d: connection already reset.\n", sockid);
1407 		return ERROR;
1408 
1409 	} else if (cur_stream->state == TCP_ST_SYN_SENT) {
1410 		/* TODO: this should notify event failure to all
1411 		   previous read() or write() calls */
1412 		cur_stream->state = TCP_ST_CLOSED_RSVD;
1413 		cur_stream->close_reason = TCP_ACTIVE_CLOSE;
1414 		cur_stream->cb_events |= MOS_ON_TCP_STATE_CHANGE;
1415 		SQ_LOCK(&mtcp->ctx->destroyq_lock);
1416 		StreamEnqueue(mtcp->destroyq, cur_stream);
1417 		SQ_UNLOCK(&mtcp->ctx->destroyq_lock);
1418 		mtcp->wakeup_flag = TRUE;
1419 		return 0;
1420 
1421 	} else if (cur_stream->state == TCP_ST_CLOSING ||
1422 			cur_stream->state == TCP_ST_LAST_ACK ||
1423 			cur_stream->state == TCP_ST_TIME_WAIT) {
1424 		cur_stream->state = TCP_ST_CLOSED_RSVD;
1425 		cur_stream->close_reason = TCP_ACTIVE_CLOSE;
1426 		cur_stream->cb_events |= MOS_ON_TCP_STATE_CHANGE;
1427 		SQ_LOCK(&mtcp->ctx->destroyq_lock);
1428 		StreamEnqueue(mtcp->destroyq, cur_stream);
1429 		SQ_UNLOCK(&mtcp->ctx->destroyq_lock);
1430 		mtcp->wakeup_flag = TRUE;
1431 		return 0;
1432 	}
1433 
1434 	/* the stream structure will be destroyed after sending RST */
1435 	if (cur_stream->sndvar->on_resetq) {
1436 		TRACE_ERROR("Stream %d: calling mtcp_abort() "
1437 				"when in reset queue.\n", sockid);
1438 		errno = ECONNRESET;
1439 		return -1;
1440 	}
1441 	SQ_LOCK(&mtcp->ctx->reset_lock);
1442 	cur_stream->sndvar->on_resetq = TRUE;
1443 	ret = StreamEnqueue(mtcp->resetq, cur_stream);
1444 	SQ_UNLOCK(&mtcp->ctx->reset_lock);
1445 	mtcp->wakeup_flag = TRUE;
1446 
1447 	if (ret < 0) {
1448 		TRACE_ERROR("(NEVER HAPPEN) Failed to enqueue the stream to close.\n");
1449 		errno = EAGAIN;
1450 		return -1;
1451 	}
1452 
1453 	return 0;
1454 }
1455 /*----------------------------------------------------------------------------*/
1456 static inline int
1457 PeekForUser(mtcp_manager_t mtcp, tcp_stream *cur_stream, char *buf, int len)
1458 {
1459 	struct tcp_recv_vars *rcvvar = cur_stream->rcvvar;
1460 	int copylen;
1461 	tcprb_t *rb = rcvvar->rcvbuf;
1462 
1463 	if ((copylen = tcprb_ppeek(rb, (uint8_t *)buf, len, rb->pile)) <= 0) {
1464 		errno = EAGAIN;
1465 		return -1;
1466 	}
1467 
1468 	return copylen;
1469 }
1470 /*----------------------------------------------------------------------------*/
1471 static inline int
1472 CopyToUser(mtcp_manager_t mtcp, tcp_stream *cur_stream, char *buf, int len)
1473 {
1474 	struct tcp_recv_vars *rcvvar = cur_stream->rcvvar;
1475 	int copylen;
1476 	tcprb_t *rb = rcvvar->rcvbuf;
1477 	if ((copylen = tcprb_ppeek(rb, (uint8_t *)buf, len, rb->pile)) <= 0) {
1478 		errno = EAGAIN;
1479 		return -1;
1480 	}
1481 	tcprb_setpile(rb, rb->pile + copylen);
1482 
1483 	rcvvar->rcv_wnd = rb->len - tcprb_cflen(rb);
1484 	//printf("rcv_wnd: %d\n", rcvvar->rcv_wnd);
1485 
1486 	/* Advertise newly freed receive buffer */
1487 	if (cur_stream->need_wnd_adv) {
1488 		if (rcvvar->rcv_wnd > cur_stream->sndvar->eff_mss) {
1489 			if (!cur_stream->sndvar->on_ackq) {
1490 				SQ_LOCK(&mtcp->ctx->ackq_lock);
1491 				cur_stream->sndvar->on_ackq = TRUE;
1492 				StreamEnqueue(mtcp->ackq, cur_stream); /* this always success */
1493 				SQ_UNLOCK(&mtcp->ctx->ackq_lock);
1494 				cur_stream->need_wnd_adv = FALSE;
1495 				mtcp->wakeup_flag = TRUE;
1496 			}
1497 		}
1498 	}
1499 
1500 	return copylen;
1501 }
1502 /*----------------------------------------------------------------------------*/
1503 ssize_t
1504 mtcp_recv(mctx_t mctx, int sockid, char *buf, size_t len, int flags)
1505 {
1506 	mtcp_manager_t mtcp;
1507 	socket_map_t socket;
1508 	tcp_stream *cur_stream;
1509 	struct tcp_recv_vars *rcvvar;
1510 	int event_remaining, merged_len;
1511 	int ret;
1512 
1513 	mtcp = GetMTCPManager(mctx);
1514 	if (!mtcp) {
1515 		errno = EACCES;
1516 		return -1;
1517 	}
1518 
1519 	if (sockid < 0 || sockid >= g_config.mos->max_concurrency) {
1520 		TRACE_API("Socket id %d out of range.\n", sockid);
1521 		errno = EBADF;
1522 		return -1;
1523 	}
1524 
1525 	socket = &mtcp->smap[sockid];
1526 	if (socket->socktype == MOS_SOCK_UNUSED) {
1527 		TRACE_API("Invalid socket id: %d\n", sockid);
1528 		errno = EBADF;
1529 		return -1;
1530 	}
1531 
1532 	if (socket->socktype == MOS_SOCK_PIPE) {
1533 		return PipeRead(mctx, sockid, buf, len);
1534 	}
1535 
1536 	if (socket->socktype != MOS_SOCK_STREAM) {
1537 		TRACE_API("Not an end socket. id: %d\n", sockid);
1538 		errno = ENOTSOCK;
1539 		return -1;
1540 	}
1541 
1542 	/* stream should be in ESTABLISHED, FIN_WAIT_1, FIN_WAIT_2, CLOSE_WAIT */
1543 	cur_stream = socket->stream;
1544 	if (!cur_stream || !cur_stream->rcvvar || !cur_stream->rcvvar->rcvbuf ||
1545 	    !(cur_stream->state >= TCP_ST_ESTABLISHED &&
1546 	      cur_stream->state <= TCP_ST_CLOSE_WAIT)) {
1547 		errno = ENOTCONN;
1548 		return -1;
1549 	}
1550 
1551 	rcvvar = cur_stream->rcvvar;
1552 
1553 	merged_len = tcprb_cflen(rcvvar->rcvbuf);
1554 
1555 	/* if CLOSE_WAIT, return 0 if there is no payload */
1556 	if (cur_stream->state == TCP_ST_CLOSE_WAIT) {
1557 		if (merged_len == 0)
1558 			return 0;
1559 	}
1560 
1561 	/* return EAGAIN if no receive buffer */
1562 	if (socket->opts & MTCP_NONBLOCK) {
1563 		if (merged_len == 0) {
1564 			errno = EAGAIN;
1565 			return -1;
1566 		}
1567 	}
1568 
1569 	SBUF_LOCK(&rcvvar->read_lock);
1570 
1571 	switch (flags) {
1572 	case 0:
1573 		ret = CopyToUser(mtcp, cur_stream, buf, len);
1574 		break;
1575 	case MSG_PEEK:
1576 		ret = PeekForUser(mtcp, cur_stream, buf, len);
1577 		break;
1578 	default:
1579 		SBUF_UNLOCK(&rcvvar->read_lock);
1580 		ret = -1;
1581 		errno = EINVAL;
1582 		return ret;
1583 	}
1584 
1585 	merged_len = tcprb_cflen(rcvvar->rcvbuf);
1586 	event_remaining = FALSE;
1587 	/* if there are remaining payload, generate EPOLLIN */
1588 	/* (may due to insufficient user buffer) */
1589 	if (socket->epoll & MOS_EPOLLIN) {
1590 		if (!(socket->epoll & MOS_EPOLLET) && merged_len > 0) {
1591 			event_remaining = TRUE;
1592 		}
1593 	}
1594 	/* if waiting for close, notify it if no remaining data */
1595 	if (cur_stream->state == TCP_ST_CLOSE_WAIT &&
1596 	    merged_len == 0 && ret > 0) {
1597 		event_remaining = TRUE;
1598 	}
1599 
1600 	SBUF_UNLOCK(&rcvvar->read_lock);
1601 
1602 	if (event_remaining) {
1603 		if (socket->epoll) {
1604 			AddEpollEvent(mtcp->ep,
1605 				      USR_SHADOW_EVENT_QUEUE, socket, MOS_EPOLLIN);
1606 		}
1607 	}
1608 
1609 	TRACE_API("Stream %d: mtcp_recv() returning %d\n", cur_stream->id, ret);
1610 	return ret;
1611 }
1612 /*----------------------------------------------------------------------------*/
1613 inline ssize_t
1614 mtcp_read(mctx_t mctx, int sockid, char *buf, size_t len)
1615 {
1616 	return mtcp_recv(mctx, sockid, buf, len, 0);
1617 }
1618 /*----------------------------------------------------------------------------*/
1619 ssize_t
1620 mtcp_readv(mctx_t mctx, int sockid, const struct iovec *iov, int numIOV)
1621 {
1622 	mtcp_manager_t mtcp;
1623 	socket_map_t socket;
1624 	tcp_stream *cur_stream;
1625 	struct tcp_recv_vars *rcvvar;
1626 	int ret, bytes_read, i;
1627 	int event_remaining, merged_len;
1628 
1629 	mtcp = GetMTCPManager(mctx);
1630 	if (!mtcp) {
1631 		errno = EACCES;
1632 		return -1;
1633 	}
1634 
1635 	if (sockid < 0 || sockid >= g_config.mos->max_concurrency) {
1636 		TRACE_API("Socket id %d out of range.\n", sockid);
1637 		errno = EBADF;
1638 		return -1;
1639 	}
1640 
1641 	socket = &mtcp->smap[sockid];
1642 	if (socket->socktype == MOS_SOCK_UNUSED) {
1643 		TRACE_API("Invalid socket id: %d\n", sockid);
1644 		errno = EBADF;
1645 		return -1;
1646 	}
1647 
1648 	if (socket->socktype != MOS_SOCK_STREAM) {
1649 		TRACE_API("Not an end socket. id: %d\n", sockid);
1650 		errno = ENOTSOCK;
1651 		return -1;
1652 	}
1653 
1654 	/* stream should be in ESTABLISHED, FIN_WAIT_1, FIN_WAIT_2, CLOSE_WAIT */
1655 	cur_stream = socket->stream;
1656 	if (!cur_stream || !cur_stream->rcvvar->rcvbuf ||
1657 			!(cur_stream->state >= TCP_ST_ESTABLISHED &&
1658 			  cur_stream->state <= TCP_ST_CLOSE_WAIT)) {
1659 		errno = ENOTCONN;
1660 		return -1;
1661 	}
1662 
1663 	rcvvar = cur_stream->rcvvar;
1664 
1665 	merged_len = tcprb_cflen(rcvvar->rcvbuf);
1666 
1667 	/* if CLOSE_WAIT, return 0 if there is no payload */
1668 	if (cur_stream->state == TCP_ST_CLOSE_WAIT) {
1669 		if (merged_len == 0)
1670 			return 0;
1671 	}
1672 
1673 	/* return EAGAIN if no receive buffer */
1674 	if (socket->opts & MTCP_NONBLOCK) {
1675 		if (merged_len == 0) {
1676 			errno = EAGAIN;
1677 			return -1;
1678 		}
1679 	}
1680 
1681 	SBUF_LOCK(&rcvvar->read_lock);
1682 
1683 	/* read and store the contents to the vectored buffers */
1684 	bytes_read = 0;
1685 	for (i = 0; i < numIOV; i++) {
1686 		if (iov[i].iov_len <= 0)
1687 			continue;
1688 
1689 		ret = CopyToUser(mtcp, cur_stream, iov[i].iov_base, iov[i].iov_len);
1690 		if (ret <= 0)
1691 			break;
1692 
1693 		bytes_read += ret;
1694 
1695 		if (ret < iov[i].iov_len)
1696 			break;
1697 	}
1698 
1699 	merged_len = tcprb_cflen(rcvvar->rcvbuf);
1700 
1701 	event_remaining = FALSE;
1702 	/* if there are remaining payload, generate read event */
1703 	/* (may due to insufficient user buffer) */
1704 	if (socket->epoll & MOS_EPOLLIN) {
1705 		if (!(socket->epoll & MOS_EPOLLET) && merged_len > 0) {
1706 			event_remaining = TRUE;
1707 		}
1708 	}
1709 	/* if waiting for close, notify it if no remaining data */
1710 	if (cur_stream->state == TCP_ST_CLOSE_WAIT &&
1711 			merged_len == 0 && bytes_read > 0) {
1712 		event_remaining = TRUE;
1713 	}
1714 
1715 	SBUF_UNLOCK(&rcvvar->read_lock);
1716 
1717 	if(event_remaining) {
1718 		if (socket->epoll & MOS_EPOLLIN && !(socket->epoll & MOS_EPOLLET)) {
1719 			AddEpollEvent(mtcp->ep,
1720 				      USR_SHADOW_EVENT_QUEUE, socket, MOS_EPOLLIN);
1721 		}
1722 	}
1723 
1724 	TRACE_API("Stream %d: mtcp_readv() returning %d\n",
1725 			cur_stream->id, bytes_read);
1726 	return bytes_read;
1727 }
1728 /*----------------------------------------------------------------------------*/
1729 static inline int
1730 CopyFromUser(mtcp_manager_t mtcp, tcp_stream *cur_stream, const char *buf, int len)
1731 {
1732 	struct tcp_send_vars *sndvar = cur_stream->sndvar;
1733 	int sndlen;
1734 	int ret;
1735 
1736 	sndlen = MIN((int)sndvar->snd_wnd, len);
1737 	if (sndlen <= 0) {
1738 		errno = EAGAIN;
1739 		return -1;
1740 	}
1741 
1742 	/* allocate send buffer if not exist */
1743 	if (!sndvar->sndbuf) {
1744 		sndvar->sndbuf = SBInit(mtcp->rbm_snd, sndvar->iss + 1);
1745 		if (!sndvar->sndbuf) {
1746 			cur_stream->close_reason = TCP_NO_MEM;
1747 			/* notification may not required due to -1 return */
1748 			errno = ENOMEM;
1749 			return -1;
1750 		}
1751 	}
1752 
1753 	ret = SBPut(mtcp->rbm_snd, sndvar->sndbuf, buf, sndlen);
1754 	assert(ret == sndlen);
1755 	sndvar->snd_wnd = sndvar->sndbuf->size - sndvar->sndbuf->len;
1756 	if (ret <= 0) {
1757 		TRACE_ERROR("SBPut failed. reason: %d (sndlen: %u, len: %u\n",
1758 				ret, sndlen, sndvar->sndbuf->len);
1759 		errno = EAGAIN;
1760 		return -1;
1761 	}
1762 
1763 	if (sndvar->snd_wnd <= 0) {
1764 		TRACE_SNDBUF("%u Sending buffer became full!! snd_wnd: %u\n",
1765 				cur_stream->id, sndvar->snd_wnd);
1766 	}
1767 
1768 	return ret;
1769 }
1770 /*----------------------------------------------------------------------------*/
1771 ssize_t
1772 mtcp_write(mctx_t mctx, int sockid, const char *buf, size_t len)
1773 {
1774 	mtcp_manager_t mtcp;
1775 	socket_map_t socket;
1776 	tcp_stream *cur_stream;
1777 	struct tcp_send_vars *sndvar;
1778 	int ret;
1779 
1780 	mtcp = GetMTCPManager(mctx);
1781 	if (!mtcp) {
1782 		errno = EACCES;
1783 		return -1;
1784 	}
1785 
1786 	if (sockid < 0 || sockid >= g_config.mos->max_concurrency) {
1787 		TRACE_API("Socket id %d out of range.\n", sockid);
1788 		errno = EBADF;
1789 		return -1;
1790 	}
1791 
1792 	socket = &mtcp->smap[sockid];
1793 	if (socket->socktype == MOS_SOCK_UNUSED) {
1794 		TRACE_API("Invalid socket id: %d\n", sockid);
1795 		errno = EBADF;
1796 		return -1;
1797 	}
1798 
1799 	if (socket->socktype == MOS_SOCK_PIPE) {
1800 		return PipeWrite(mctx, sockid, buf, len);
1801 	}
1802 
1803 	if (socket->socktype != MOS_SOCK_STREAM) {
1804 		TRACE_API("Not an end socket. id: %d\n", sockid);
1805 		errno = ENOTSOCK;
1806 		return -1;
1807 	}
1808 
1809 	cur_stream = socket->stream;
1810 	if (!cur_stream ||
1811 			!(cur_stream->state == TCP_ST_ESTABLISHED ||
1812 			  cur_stream->state == TCP_ST_CLOSE_WAIT)) {
1813 		errno = ENOTCONN;
1814 		return -1;
1815 	}
1816 
1817 	if (len <= 0) {
1818 		if (socket->opts & MTCP_NONBLOCK) {
1819 			errno = EAGAIN;
1820 			return -1;
1821 		} else {
1822 			return 0;
1823 		}
1824 	}
1825 
1826 	sndvar = cur_stream->sndvar;
1827 
1828 	SBUF_LOCK(&sndvar->write_lock);
1829 	ret = CopyFromUser(mtcp, cur_stream, buf, len);
1830 
1831 	SBUF_UNLOCK(&sndvar->write_lock);
1832 
1833 	if (ret > 0 && !(sndvar->on_sendq || sndvar->on_send_list)) {
1834 		SQ_LOCK(&mtcp->ctx->sendq_lock);
1835 		sndvar->on_sendq = TRUE;
1836 		StreamEnqueue(mtcp->sendq, cur_stream);		/* this always success */
1837 		SQ_UNLOCK(&mtcp->ctx->sendq_lock);
1838 		mtcp->wakeup_flag = TRUE;
1839 	}
1840 
1841 	if (ret == 0 && (socket->opts & MTCP_NONBLOCK)) {
1842 		ret = -1;
1843 		errno = EAGAIN;
1844 	}
1845 
1846 	/* if there are remaining sending buffer, generate write event */
1847 	if (sndvar->snd_wnd > 0) {
1848 		if (socket->epoll & MOS_EPOLLOUT && !(socket->epoll & MOS_EPOLLET)) {
1849 			AddEpollEvent(mtcp->ep,
1850 				      USR_SHADOW_EVENT_QUEUE, socket, MOS_EPOLLOUT);
1851 		}
1852 	}
1853 
1854 	TRACE_API("Stream %d: mtcp_write() returning %d\n", cur_stream->id, ret);
1855 	return ret;
1856 }
1857 /*----------------------------------------------------------------------------*/
1858 ssize_t
1859 mtcp_writev(mctx_t mctx, int sockid, const struct iovec *iov, int numIOV)
1860 {
1861 	mtcp_manager_t mtcp;
1862 	socket_map_t socket;
1863 	tcp_stream *cur_stream;
1864 	struct tcp_send_vars *sndvar;
1865 	int ret, to_write, i;
1866 
1867 	mtcp = GetMTCPManager(mctx);
1868 	if (!mtcp) {
1869 		errno = EACCES;
1870 		return -1;
1871 	}
1872 
1873 	if (sockid < 0 || sockid >= g_config.mos->max_concurrency) {
1874 		TRACE_API("Socket id %d out of range.\n", sockid);
1875 		errno = EBADF;
1876 		return -1;
1877 	}
1878 
1879 	socket = &mtcp->smap[sockid];
1880 	if (socket->socktype == MOS_SOCK_UNUSED) {
1881 		TRACE_API("Invalid socket id: %d\n", sockid);
1882 		errno = EBADF;
1883 		return -1;
1884 	}
1885 
1886 	if (socket->socktype != MOS_SOCK_STREAM) {
1887 		TRACE_API("Not an end socket. id: %d\n", sockid);
1888 		errno = ENOTSOCK;
1889 		return -1;
1890 	}
1891 
1892 	cur_stream = socket->stream;
1893 	if (!cur_stream ||
1894 			!(cur_stream->state == TCP_ST_ESTABLISHED ||
1895 			  cur_stream->state == TCP_ST_CLOSE_WAIT)) {
1896 		errno = ENOTCONN;
1897 		return -1;
1898 	}
1899 
1900 	sndvar = cur_stream->sndvar;
1901 	SBUF_LOCK(&sndvar->write_lock);
1902 
1903 	/* write from the vectored buffers */
1904 	to_write = 0;
1905 	for (i = 0; i < numIOV; i++) {
1906 		if (iov[i].iov_len <= 0)
1907 			continue;
1908 
1909 		ret = CopyFromUser(mtcp, cur_stream, iov[i].iov_base, iov[i].iov_len);
1910 		if (ret <= 0)
1911 			break;
1912 
1913 		to_write += ret;
1914 
1915 		if (ret < iov[i].iov_len)
1916 			break;
1917 	}
1918 	SBUF_UNLOCK(&sndvar->write_lock);
1919 
1920 	if (to_write > 0 && !(sndvar->on_sendq || sndvar->on_send_list)) {
1921 		SQ_LOCK(&mtcp->ctx->sendq_lock);
1922 		sndvar->on_sendq = TRUE;
1923 		StreamEnqueue(mtcp->sendq, cur_stream);		/* this always success */
1924 		SQ_UNLOCK(&mtcp->ctx->sendq_lock);
1925 		mtcp->wakeup_flag = TRUE;
1926 	}
1927 
1928 	if (to_write == 0 && (socket->opts & MTCP_NONBLOCK)) {
1929 		to_write = -1;
1930 		errno = EAGAIN;
1931 	}
1932 
1933 	/* if there are remaining sending buffer, generate write event */
1934 	if (sndvar->snd_wnd > 0) {
1935 		if (socket->epoll & MOS_EPOLLOUT && !(socket->epoll & MOS_EPOLLET)) {
1936 			AddEpollEvent(mtcp->ep,
1937 				      USR_SHADOW_EVENT_QUEUE, socket, MOS_EPOLLOUT);
1938 		}
1939 	}
1940 
1941 	TRACE_API("Stream %d: mtcp_writev() returning %d\n",
1942 			cur_stream->id, to_write);
1943 	return to_write;
1944 }
1945 /*----------------------------------------------------------------------------*/
1946 uint32_t
1947 mtcp_get_connection_cnt(mctx_t mctx)
1948 {
1949 	mtcp_manager_t mtcp;
1950 	mtcp = GetMTCPManager(mctx);
1951 	if (!mtcp) {
1952 		errno = EACCES;
1953 		return -1;
1954 	}
1955 
1956 	if (mtcp->num_msp > 0)
1957 		return mtcp->flow_cnt / 2;
1958 	else
1959 		return mtcp->flow_cnt;
1960 }
1961 /*----------------------------------------------------------------------------*/
1962