xref: /mOS-networking-stack/core/src/api.c (revision 152f7c19)
1 #include <sys/queue.h>
2 #include <sys/ioctl.h>
3 #include <limits.h>
4 #include <unistd.h>
5 #include <assert.h>
6 #include <string.h>
7 
8 #ifdef DARWIN
9 #include <netinet/tcp.h>
10 #include <netinet/ip.h>
11 #include <netinet/if_ether.h>
12 #endif
13 
14 #include "mtcp.h"
15 #include "mtcp_api.h"
16 #include "tcp_in.h"
17 #include "tcp_stream.h"
18 #include "tcp_out.h"
19 #include "ip_out.h"
20 #include "eventpoll.h"
21 #include "pipe.h"
22 #include "fhash.h"
23 #include "addr_pool.h"
24 #include "util.h"
25 #include "config.h"
26 #include "debug.h"
27 #include "eventpoll.h"
28 #include "mos_api.h"
29 #include "tcp_rb.h"
30 
/*
 * Function-like max/min helpers. Each argument can be evaluated more than
 * once, so never pass expressions with side effects (e.g. i++ or calls).
 */
#define MAX(a, b) ((a)>(b)?(a):(b))
#define MIN(a, b) ((a)<(b)?(a):(b))
33 
34 /*----------------------------------------------------------------------------*/
/** Stop monitoring the socket! (function prototype)
 * @param [in] mctx: mtcp context
 * @param [in] sock: monitoring stream socket id
 * @param [in] side: side of monitoring (client side, server side or both)
 *
 * This function is now DEPRECATED and is only used within mOS core...
 * In this section it is reached from mtcp_setsockopt(MOS_STOP_MON), which
 * forwards the caller's integer option value as @p side. Its definition is
 * not in this section; this forward declaration only makes it callable here.
 */
int
mtcp_cb_stop(mctx_t mctx, int sock, int side);
/*----------------------------------------------------------------------------*/
/** Reset the connection (send RST packets to both sides)
 *  (We need to decide the API for this.)
 *  NOTE(review): not implemented; the prototype stays commented out until
 *  the API is decided.
 */
//int
//mtcp_cb_reset(mctx_t mctx, int sock, int side);
50 /*----------------------------------------------------------------------------*/
51 inline mtcp_manager_t
52 GetMTCPManager(mctx_t mctx)
53 {
54 	if (!mctx) {
55 		errno = EACCES;
56 		return NULL;
57 	}
58 
59 	if (mctx->cpu < 0 || mctx->cpu >= num_cpus) {
60 		errno = EINVAL;
61 		return NULL;
62 	}
63 
64 	if (!g_mtcp[mctx->cpu] || g_mtcp[mctx->cpu]->ctx->done || g_mtcp[mctx->cpu]->ctx->exit) {
65 		errno = EPERM;
66 		return NULL;
67 	}
68 
69 	return g_mtcp[mctx->cpu];
70 }
71 /*----------------------------------------------------------------------------*/
72 static inline int
73 GetSocketError(socket_map_t socket, void *optval, socklen_t *optlen)
74 {
75 	tcp_stream *cur_stream;
76 
77 	if (!socket->stream) {
78 		errno = EBADF;
79 		return -1;
80 	}
81 
82 	cur_stream = socket->stream;
83 	if (cur_stream->state == TCP_ST_CLOSED_RSVD) {
84 		if (cur_stream->close_reason == TCP_TIMEDOUT ||
85 				cur_stream->close_reason == TCP_CONN_FAIL ||
86 				cur_stream->close_reason == TCP_CONN_LOST) {
87 			*(int *)optval = ETIMEDOUT;
88 			*optlen = sizeof(int);
89 
90 			return 0;
91 		}
92 	}
93 
94 	if (cur_stream->state == TCP_ST_CLOSE_WAIT ||
95 			cur_stream->state == TCP_ST_CLOSED_RSVD) {
96 		if (cur_stream->close_reason == TCP_RESET) {
97 			*(int *)optval = ECONNRESET;
98 			*optlen = sizeof(int);
99 
100 			return 0;
101 		}
102 	}
103 
104 	if (cur_stream->state == TCP_ST_SYN_SENT &&
105 	    errno == EINPROGRESS) {
106 		*(int *)optval = errno;
107 		*optlen = sizeof(int);
108 
109 		return -1;
110 	}
111 
112 	/*
113 	 * `base case`: If socket sees no so_error, then
114 	 * this also means close_reason will always be
115 	 * TCP_NOT_CLOSED.
116 	 */
117 	if (cur_stream->close_reason == TCP_NOT_CLOSED) {
118 		*(int *)optval = 0;
119 		*optlen = sizeof(int);
120 
121 		return 0;
122 	}
123 
124 	errno = ENOSYS;
125 	return -1;
126 }
127 /*----------------------------------------------------------------------------*/
128 int
129 mtcp_getsockname(mctx_t mctx, int sockid, struct sockaddr *addr,
130 		 socklen_t *addrlen)
131 {
132 	mtcp_manager_t mtcp;
133 	socket_map_t socket;
134 
135 	mtcp = GetMTCPManager(mctx);
136 	if (!mtcp) {
137 		return -1;
138 	}
139 
140 	if (sockid < 0 || sockid >= g_config.mos->max_concurrency) {
141 		TRACE_API("Socket id %d out of range.\n", sockid);
142 		errno = EBADF;
143 		return -1;
144 	}
145 
146 	socket = &mtcp->smap[sockid];
147 	if (socket->socktype == MOS_SOCK_UNUSED) {
148 		TRACE_API("Invalid socket id: %d\n", sockid);
149 		errno = EBADF;
150 		return -1;
151 	}
152 
153 	if (*addrlen <= 0) {
154 		TRACE_API("Invalid addrlen: %d\n", *addrlen);
155 		errno = EINVAL;
156 		return -1;
157 	}
158 
159 	if (socket->socktype != MOS_SOCK_STREAM_LISTEN &&
160 	    socket->socktype != MOS_SOCK_STREAM) {
161 		TRACE_API("Invalid socket id: %d\n", sockid);
162 		errno = ENOTSOCK;
163 		return -1;
164 	}
165 
166 	*(struct sockaddr_in *)addr = socket->saddr;
167         *addrlen = sizeof(socket->saddr);
168 
169 	return 0;
170 }
171 /*----------------------------------------------------------------------------*/
172 int
173 mtcp_getsockopt(mctx_t mctx, int sockid, int level,
174 		int optname, void *optval, socklen_t *optlen)
175 {
176 	mtcp_manager_t mtcp;
177 	socket_map_t socket;
178 
179 	mtcp = GetMTCPManager(mctx);
180 	if (!mtcp) {
181 		errno = EACCES;
182 		return -1;
183 	}
184 
185 	if (sockid < 0 || sockid >= g_config.mos->max_concurrency) {
186 		TRACE_API("Socket id %d out of range.\n", sockid);
187 		errno = EBADF;
188 		return -1;
189 	}
190 
191 	switch (level) {
192 	case SOL_SOCKET:
193 		socket = &mtcp->smap[sockid];
194 		if (socket->socktype == MOS_SOCK_UNUSED) {
195 			TRACE_API("Invalid socket id: %d\n", sockid);
196 			errno = EBADF;
197 			return -1;
198 		}
199 
200 		if (socket->socktype != MOS_SOCK_STREAM_LISTEN &&
201 		    socket->socktype != MOS_SOCK_STREAM) {
202 			TRACE_API("Invalid socket id: %d\n", sockid);
203 			errno = ENOTSOCK;
204 			return -1;
205 		}
206 
207 		if (optname == SO_ERROR) {
208 			if (socket->socktype == MOS_SOCK_STREAM) {
209 				return GetSocketError(socket, optval, optlen);
210 			}
211 		}
212 		break;
213 	case SOL_MONSOCKET:
214 		/* check if the calling thread is in MOS context */
215 		if (mtcp->ctx->thread != pthread_self()) {
216 			errno = EPERM;
217 			return -1;
218 		}
219 		/*
220 		 * All options will only work for active
221 		 * monitor stream sockets
222 		 */
223 		socket = &mtcp->msmap[sockid];
224 		if (socket->socktype != MOS_SOCK_MONITOR_STREAM_ACTIVE) {
225 			TRACE_API("Invalid socket id: %d\n", sockid);
226 			errno = ENOTSOCK;
227 			return -1;
228 		}
229 
230 		switch (optname) {
231 		case MOS_FRAGINFO_CLIBUF:
232 			return GetFragInfo(socket, MOS_SIDE_CLI, optval, optlen);
233 		case MOS_FRAGINFO_SVRBUF:
234 			return GetFragInfo(socket, MOS_SIDE_SVR, optval, optlen);
235 		case MOS_INFO_CLIBUF:
236 			return GetBufInfo(socket, MOS_SIDE_CLI, optval, optlen);
237 		case MOS_INFO_SVRBUF:
238 			return GetBufInfo(socket, MOS_SIDE_SVR, optval, optlen);
239 		case MOS_TCP_STATE_CLI:
240 			return GetTCPState(socket->monitor_stream->stream, MOS_SIDE_CLI,
241 							   optval, optlen);
242 		case MOS_TCP_STATE_SVR:
243 			return GetTCPState(socket->monitor_stream->stream, MOS_SIDE_SVR,
244 							   optval, optlen);
245 		case MOS_TIMESTAMP:
246 			return GetLastTimestamp(socket->monitor_stream->stream,
247 						(uint32_t *)optval,
248 						optlen);
249 		default:
250 		  TRACE_API("can't recognize the optname=%d\n", optname);
251 		  assert(0);
252 		}
253 		break;
254 	}
255 	errno = ENOSYS;
256 	return -1;
257 }
258 /*----------------------------------------------------------------------------*/
259 int
260 mtcp_setsockopt(mctx_t mctx, int sockid, int level,
261 		int optname, const void *optval, socklen_t optlen)
262 {
263 	mtcp_manager_t mtcp;
264 	socket_map_t socket;
265 	tcprb_t *rb;
266 
267 	mtcp = GetMTCPManager(mctx);
268 	if (!mtcp) {
269 		errno = EACCES;
270 		return -1;
271 	}
272 
273 	if (sockid < 0 || sockid >= g_config.mos->max_concurrency) {
274 		TRACE_API("Socket id %d out of range.\n", sockid);
275 		errno = EBADF;
276 		return -1;
277 	}
278 
279 	switch (level) {
280 	case SOL_SOCKET:
281 		socket = &mtcp->smap[sockid];
282 		if (socket->socktype == MOS_SOCK_UNUSED) {
283 			TRACE_API("Invalid socket id: %d\n", sockid);
284 			errno = EBADF;
285 			return -1;
286 		}
287 
288 		if (socket->socktype != MOS_SOCK_STREAM_LISTEN &&
289 		    socket->socktype != MOS_SOCK_STREAM) {
290 			TRACE_API("Invalid socket id: %d\n", sockid);
291 			errno = ENOTSOCK;
292 			return -1;
293 		}
294 		break;
295 	case SOL_MONSOCKET:
296 		socket = &mtcp->msmap[sockid];
297 		/*
298 		 * checking of calling thread to be in MOS context is
299 		 * disabled since both optnames can be called from
300 		 * `application' context (on passive sockets)
301 		 */
302 		/*
303 		 * if (mtcp->ctx->thread != pthread_self())
304 		 * return -1;
305 		 */
306 
307 		switch (optname) {
308 		case MOS_CLIOVERLAP:
309 			rb = (socket->monitor_stream->stream->side == MOS_SIDE_CLI) ?
310 				socket->monitor_stream->stream->rcvvar->rcvbuf :
311 			socket->monitor_stream->stream->pair_stream->rcvvar->rcvbuf;
312 			if (rb == NULL) {
313 				errno = EFAULT;
314 				return -1;
315 			}
316 			if (tcprb_setpolicy(rb, *(uint8_t *)optval) < 0) {
317 				errno = EINVAL;
318 				return -1;
319 			} else
320 				return 0;
321 			break;
322 		case MOS_SVROVERLAP:
323 			rb = (socket->monitor_stream->stream->side == MOS_SIDE_SVR) ?
324 				socket->monitor_stream->stream->rcvvar->rcvbuf :
325 			socket->monitor_stream->stream->pair_stream->rcvvar->rcvbuf;
326 			if (rb == NULL) {
327 				errno = EFAULT;
328 				return -1;
329 			}
330 			if (tcprb_setpolicy(rb, *(uint8_t *)optval) < 0) {
331 				errno = EINVAL;
332 				return -1;
333 			} else
334 				return 0;
335 			break;
336 		case MOS_CLIBUF:
337 #if 0
338 			if (socket->socktype != MOS_SOCK_MONITOR_STREAM_ACTIVE) {
339 				errno = EBADF;
340 				return -1;
341 			}
342 #endif
343 #ifdef DISABLE_DYN_RESIZE
344 			if (*(int *)optval != 0)
345 				return -1;
346 			if (socket->socktype == MOS_SOCK_MONITOR_STREAM_ACTIVE) {
347 				rb = (socket->monitor_stream->stream->side == MOS_SIDE_CLI) ?
348 					socket->monitor_stream->stream->rcvvar->rcvbuf :
349 					socket->monitor_stream->stream->pair_stream->rcvvar->rcvbuf;
350 				if (rb) {
351 					tcprb_resize_meta(rb, 0);
352 					tcprb_resize(rb, 0);
353 				}
354 			}
355 			return DisableBuf(socket, MOS_SIDE_CLI);
356 #else
357 			rb = (socket->monitor_stream->stream->side == MOS_SIDE_CLI) ?
358 				socket->monitor_stream->stream->rcvvar->rcvbuf :
359 				socket->monitor_stream->stream->pair_stream->rcvvar->rcvbuf;
360 			if (tcprb_resize_meta(rb, *(int *)optval) < 0)
361 				return -1;
362 			return tcprb_resize(rb,
363 					(((int)rb->metalen - 1) / UNITBUFSIZE + 1) * UNITBUFSIZE);
364 #endif
365 		case MOS_SVRBUF:
366 #if 0
367 			if (socket->socktype != MOS_SOCK_MONITOR_STREAM_ACTIVE) {
368 				errno = EBADF;
369 				return -1;
370 			}
371 #endif
372 #ifdef DISABLE_DYN_RESIZE
373 			if (*(int *)optval != 0)
374 				return -1;
375 			if (socket->socktype == MOS_SOCK_MONITOR_STREAM_ACTIVE) {
376 				rb = (socket->monitor_stream->stream->side == MOS_SIDE_SVR) ?
377 					socket->monitor_stream->stream->rcvvar->rcvbuf :
378 					socket->monitor_stream->stream->pair_stream->rcvvar->rcvbuf;
379 				if (rb) {
380 					tcprb_resize_meta(rb, 0);
381 					tcprb_resize(rb, 0);
382 				}
383 			}
384 			return DisableBuf(socket, MOS_SIDE_SVR);
385 #else
386 			rb = (socket->monitor_stream->stream->side == MOS_SIDE_SVR) ?
387 				socket->monitor_stream->stream->rcvvar->rcvbuf :
388 				socket->monitor_stream->stream->pair_stream->rcvvar->rcvbuf;
389 			if (tcprb_resize_meta(rb, *(int *)optval) < 0)
390 				return -1;
391 			return tcprb_resize(rb,
392 					(((int)rb->metalen - 1) / UNITBUFSIZE + 1) * UNITBUFSIZE);
393 #endif
394 		case MOS_FRAG_CLIBUF:
395 #if 0
396 			if (socket->socktype != MOS_SOCK_MONITOR_STREAM_ACTIVE) {
397 				errno = EBADF;
398 				return -1;
399 			}
400 #endif
401 #ifdef DISABLE_DYN_RESIZE
402 			if (*(int *)optval != 0)
403 				return -1;
404 			if (socket->socktype == MOS_SOCK_MONITOR_STREAM_ACTIVE) {
405 				rb = (socket->monitor_stream->stream->side == MOS_SIDE_CLI) ?
406 					socket->monitor_stream->stream->rcvvar->rcvbuf :
407 					socket->monitor_stream->stream->pair_stream->rcvvar->rcvbuf;
408 				if (rb)
409 					tcprb_resize(rb, 0);
410 			}
411 			return 0;
412 #else
413 			rb = (socket->monitor_stream->stream->side == MOS_SIDE_CLI) ?
414 				socket->monitor_stream->stream->rcvvar->rcvbuf :
415 				socket->monitor_stream->stream->pair_stream->rcvvar->rcvbuf;
416 			if (rb->len == 0)
417 				return tcprb_resize_meta(rb, *(int *)optval);
418 			else
419 				return -1;
420 #endif
421 		case MOS_FRAG_SVRBUF:
422 #if 0
423 			if (socket->socktype != MOS_SOCK_MONITOR_STREAM_ACTIVE) {
424 				errno = EBADF;
425 				return -1;
426 			}
427 #endif
428 #ifdef DISABLE_DYN_RESIZE
429 			if (*(int *)optval != 0)
430 				return -1;
431 			if (socket->socktype == MOS_SOCK_MONITOR_STREAM_ACTIVE) {
432 				rb = (socket->monitor_stream->stream->side == MOS_SIDE_SVR) ?
433 					socket->monitor_stream->stream->rcvvar->rcvbuf :
434 					socket->monitor_stream->stream->pair_stream->rcvvar->rcvbuf;
435 				if (rb)
436 					tcprb_resize(rb, 0);
437 			}
438 			return 0;
439 #else
440 			rb = (socket->monitor_stream->stream->side == MOS_SIDE_SVR) ?
441 				socket->monitor_stream->stream->rcvvar->rcvbuf :
442 				socket->monitor_stream->stream->pair_stream->rcvvar->rcvbuf;
443 			if (rb->len == 0)
444 				return tcprb_resize_meta(rb, *(int *)optval);
445 			else
446 				return -1;
447 #endif
448 		case MOS_SEQ_REMAP:
449 			break;
450 		case MOS_STOP_MON:
451 			return mtcp_cb_stop(mctx, sockid, *(int *)optval);
452 		default:
453 			TRACE_API("invalid optname=%d\n", optname);
454 			assert(0);
455 		}
456 		break;
457 	}
458 
459 	errno = ENOSYS;
460 	return -1;
461 }
462 /*----------------------------------------------------------------------------*/
463 int
464 mtcp_setsock_nonblock(mctx_t mctx, int sockid)
465 {
466 	mtcp_manager_t mtcp;
467 
468 	mtcp = GetMTCPManager(mctx);
469 	if (!mtcp) {
470 		errno = EACCES;
471 		return -1;
472 	}
473 
474 	if (sockid < 0 || sockid >= g_config.mos->max_concurrency) {
475 		TRACE_API("Socket id %d out of range.\n", sockid);
476 		errno = EBADF;
477 		return -1;
478 	}
479 
480 	if (mtcp->smap[sockid].socktype == MOS_SOCK_UNUSED) {
481 		TRACE_API("Invalid socket id: %d\n", sockid);
482 		errno = EBADF;
483 		return -1;
484 	}
485 
486 	mtcp->smap[sockid].opts |= MTCP_NONBLOCK;
487 
488 	return 0;
489 }
490 /*----------------------------------------------------------------------------*/
491 int
492 mtcp_ioctl(mctx_t mctx, int sockid, int request, void *argp)
493 {
494 	mtcp_manager_t mtcp;
495 	socket_map_t socket;
496 
497 	mtcp = GetMTCPManager(mctx);
498 	if (!mtcp) {
499 		errno = EACCES;
500 		return -1;
501 	}
502 
503 	if (sockid < 0 || sockid >= g_config.mos->max_concurrency) {
504 		TRACE_API("Socket id %d out of range.\n", sockid);
505 		errno = EBADF;
506 		return -1;
507 	}
508 
509 	/* only support stream socket */
510 	socket = &mtcp->smap[sockid];
511 
512 	if (socket->socktype != MOS_SOCK_STREAM_LISTEN &&
513 		socket->socktype != MOS_SOCK_STREAM) {
514 		TRACE_API("Invalid socket id: %d\n", sockid);
515 		errno = EBADF;
516 		return -1;
517 	}
518 
519 	if (!argp) {
520 		errno = EFAULT;
521 		return -1;
522 	}
523 
524 	if (request == FIONREAD) {
525 		tcp_stream *cur_stream;
526 		tcprb_t *rbuf;
527 
528 		cur_stream = socket->stream;
529 		if (!cur_stream) {
530 			errno = EBADF;
531 			return -1;
532 		}
533 
534 		rbuf = cur_stream->rcvvar->rcvbuf;
535 		*(int *)argp = (rbuf) ? tcprb_cflen(rbuf) : 0;
536 
537 	} else if (request == FIONBIO) {
538 		/*
539 		 * sockets can only be set to blocking/non-blocking
540 		 * modes during initialization
541 		 */
542 		if ((*(int *)argp))
543 			mtcp->smap[sockid].opts |= MTCP_NONBLOCK;
544 		else
545 			mtcp->smap[sockid].opts &= ~MTCP_NONBLOCK;
546 	} else {
547 		errno = EINVAL;
548 		return -1;
549 	}
550 
551 	return 0;
552 }
553 /*----------------------------------------------------------------------------*/
554 static int
555 mtcp_monitor(mctx_t mctx, socket_map_t sock)
556 {
557 	mtcp_manager_t mtcp;
558 	struct mon_listener *monitor;
559 	int sockid = sock->id;
560 
561 	mtcp = GetMTCPManager(mctx);
562 	if (!mtcp) {
563 		errno = EACCES;
564 		return -1;
565 	}
566 
567 	if (sockid < 0 || sockid >= g_config.mos->max_concurrency) {
568 		TRACE_API("Socket id %d out of range.\n", sockid);
569 		errno = EBADF;
570 		return -1;
571 	}
572 
573 	if (mtcp->msmap[sockid].socktype == MOS_SOCK_UNUSED) {
574 		TRACE_API("Invalid socket id: %d\n", sockid);
575 		errno = EBADF;
576 		return -1;
577 	}
578 
579 	if (!(mtcp->msmap[sockid].socktype == MOS_SOCK_MONITOR_STREAM ||
580 	      mtcp->msmap[sockid].socktype == MOS_SOCK_MONITOR_RAW)) {
581 		TRACE_API("Not a monitor socket. id: %d\n", sockid);
582 		errno = ENOTSOCK;
583 		return -1;
584 	}
585 
586 	monitor = (struct mon_listener *)calloc(1, sizeof(struct mon_listener));
587 	if (!monitor) {
588 		/* errno set from the malloc() */
589 		errno = ENOMEM;
590 		return -1;
591 	}
592 
593 	/* create a monitor-specific event queue */
594 	monitor->eq = CreateEventQueue(g_config.mos->max_concurrency);
595 	if (!monitor->eq) {
596 		TRACE_API("Can't create event queue (concurrency: %d) for "
597 			  "monitor read event registrations!\n",
598 			  g_config.mos->max_concurrency);
599 		free(monitor);
600 		errno = ENOMEM;
601 		return -1;
602 	}
603 
604 	/* set monitor-related basic parameters */
605 #ifndef NEWEV
606 	monitor->ude_id = UDE_OFFSET;
607 #endif
608 	monitor->socket = sock;
609 	monitor->client_buf_mgmt = monitor->server_buf_mgmt = BUFMGMT_FULL;
610 
611 	/* perform both sides monitoring by default */
612 	monitor->client_mon = monitor->server_mon = 1;
613 
614 	/* add monitor socket to the monitor list */
615 	TAILQ_INSERT_TAIL(&mtcp->monitors, monitor, link);
616 
617 	mtcp->msmap[sockid].monitor_listener = monitor;
618 
619 	return 0;
620 }
621 /*----------------------------------------------------------------------------*/
622 int
623 mtcp_socket(mctx_t mctx, int domain, int type, int protocol)
624 {
625 	mtcp_manager_t mtcp;
626 	socket_map_t socket;
627 
628 	mtcp = GetMTCPManager(mctx);
629 	if (!mtcp) {
630 		errno = EACCES;
631 		return -1;
632 	}
633 
634 	if (domain != AF_INET) {
635 		errno = EAFNOSUPPORT;
636 		return -1;
637 	}
638 
639 	if (type == (int)SOCK_STREAM) {
640 		type = MOS_SOCK_STREAM;
641 	} else if (type == MOS_SOCK_MONITOR_STREAM ||
642 		   type == MOS_SOCK_MONITOR_RAW) {
643 		/* do nothing for the time being */
644 	} else {
645 		/* Not supported type */
646 		errno = EINVAL;
647 		return -1;
648 	}
649 
650 	socket = AllocateSocket(mctx, type);
651 	if (!socket) {
652 		errno = ENFILE;
653 		return -1;
654 	}
655 
656 	if (type == MOS_SOCK_MONITOR_STREAM ||
657 	    type == MOS_SOCK_MONITOR_RAW) {
658 		mtcp_manager_t mtcp = GetMTCPManager(mctx);
659 		if (!mtcp) {
660 			errno = EACCES;
661 			return -1;
662 		}
663 		mtcp_monitor(mctx, socket);
664 #ifdef NEWEV
665 		socket->monitor_listener->stree_dontcare = NULL;
666 		socket->monitor_listener->stree_pre_rcv = NULL;
667 		socket->monitor_listener->stree_post_snd = NULL;
668 #else
669 		InitEvB(mtcp, &socket->monitor_listener->dontcare_evb);
670 		InitEvB(mtcp, &socket->monitor_listener->pre_tcp_evb);
671 		InitEvB(mtcp, &socket->monitor_listener->post_tcp_evb);
672 #endif
673 	}
674 
675 	return socket->id;
676 }
677 /*----------------------------------------------------------------------------*/
678 int
679 mtcp_bind(mctx_t mctx, int sockid,
680 		const struct sockaddr *addr, socklen_t addrlen)
681 {
682 	mtcp_manager_t mtcp;
683 	struct sockaddr_in *addr_in;
684 
685 	mtcp = GetMTCPManager(mctx);
686 	if (!mtcp) {
687 		errno = EACCES;
688 		return -1;
689 	}
690 
691 	if (sockid < 0 || sockid >= g_config.mos->max_concurrency) {
692 		TRACE_API("Socket id %d out of range.\n", sockid);
693 		errno = EBADF;
694 		return -1;
695 	}
696 
697 	if (mtcp->smap[sockid].socktype == MOS_SOCK_UNUSED) {
698 		TRACE_API("Invalid socket id: %d\n", sockid);
699 		errno = EBADF;
700 		return -1;
701 	}
702 
703 	if (mtcp->smap[sockid].socktype != MOS_SOCK_STREAM &&
704 			mtcp->smap[sockid].socktype != MOS_SOCK_STREAM_LISTEN) {
705 		TRACE_API("Not a stream socket id: %d\n", sockid);
706 		errno = ENOTSOCK;
707 		return -1;
708 	}
709 
710 	if (!addr) {
711 		TRACE_API("Socket %d: empty address!\n", sockid);
712 		errno = EINVAL;
713 		return -1;
714 	}
715 
716 	if (mtcp->smap[sockid].opts & MTCP_ADDR_BIND) {
717 		TRACE_API("Socket %d: adress already bind for this socket.\n", sockid);
718 		errno = EINVAL;
719 		return -1;
720 	}
721 
722 	/* we only allow bind() for AF_INET address */
723 	if (addr->sa_family != AF_INET || addrlen < sizeof(struct sockaddr_in)) {
724 		TRACE_API("Socket %d: invalid argument!\n", sockid);
725 		errno = EINVAL;
726 		return -1;
727 	}
728 
729 	if (mtcp->listener) {
730 		TRACE_API("Address already bound!\n");
731 		errno = EINVAL;
732 		return -1;
733 	}
734 	addr_in = (struct sockaddr_in *)addr;
735 	mtcp->smap[sockid].saddr = *addr_in;
736 	mtcp->smap[sockid].opts |= MTCP_ADDR_BIND;
737 
738 	return 0;
739 }
740 /*----------------------------------------------------------------------------*/
741 int
742 mtcp_listen(mctx_t mctx, int sockid, int backlog)
743 {
744 	mtcp_manager_t mtcp;
745 	struct tcp_listener *listener;
746 
747 	mtcp = GetMTCPManager(mctx);
748 	if (!mtcp) {
749 		errno = EACCES;
750 		return -1;
751 	}
752 
753 	if (sockid < 0 || sockid >= g_config.mos->max_concurrency) {
754 		TRACE_API("Socket id %d out of range.\n", sockid);
755 		errno = EBADF;
756 		return -1;
757 	}
758 
759 	if (mtcp->smap[sockid].socktype == MOS_SOCK_UNUSED) {
760 		TRACE_API("Invalid socket id: %d\n", sockid);
761 		errno = EBADF;
762 		return -1;
763 	}
764 
765 	if (mtcp->smap[sockid].socktype == MOS_SOCK_STREAM) {
766 		mtcp->smap[sockid].socktype = MOS_SOCK_STREAM_LISTEN;
767 	}
768 
769 	if (mtcp->smap[sockid].socktype != MOS_SOCK_STREAM_LISTEN) {
770 		TRACE_API("Not a listening socket. id: %d\n", sockid);
771 		errno = ENOTSOCK;
772 		return -1;
773 	}
774 
775 	if (backlog <= 0 || backlog > g_config.mos->max_concurrency) {
776 		errno = EINVAL;
777 		return -1;
778 	}
779 
780 	listener = (struct tcp_listener *)calloc(1, sizeof(struct tcp_listener));
781 	if (!listener) {
782 		/* errno set from the malloc() */
783 		errno = ENOMEM;
784 		return -1;
785 	}
786 
787 	listener->sockid = sockid;
788 	listener->backlog = backlog;
789 	listener->socket = &mtcp->smap[sockid];
790 
791 	if (pthread_cond_init(&listener->accept_cond, NULL)) {
792 		perror("pthread_cond_init of ctx->accept_cond\n");
793 		/* errno set by pthread_cond_init() */
794 		free(listener);
795 		return -1;
796 	}
797 	if (pthread_mutex_init(&listener->accept_lock, NULL)) {
798 		perror("pthread_mutex_init of ctx->accept_lock\n");
799 		/* errno set by pthread_mutex_init() */
800 		free(listener);
801 		return -1;
802 	}
803 
804 	listener->acceptq = CreateStreamQueue(backlog);
805 	if (!listener->acceptq) {
806 		free(listener);
807 		errno = ENOMEM;
808 		return -1;
809 	}
810 
811 	mtcp->smap[sockid].listener = listener;
812 	mtcp->listener = listener;
813 
814 	return 0;
815 }
816 /*----------------------------------------------------------------------------*/
817 int
818 mtcp_accept(mctx_t mctx, int sockid, struct sockaddr *addr, socklen_t *addrlen)
819 {
820 	mtcp_manager_t mtcp;
821 	struct tcp_listener *listener;
822 	socket_map_t socket;
823 	tcp_stream *accepted = NULL;
824 
825 	mtcp = GetMTCPManager(mctx);
826 	if (!mtcp) {
827 		errno = EACCES;
828 		return -1;
829 	}
830 
831 	if (sockid < 0 || sockid >= g_config.mos->max_concurrency) {
832 		TRACE_API("Socket id %d out of range.\n", sockid);
833 		errno = EBADF;
834 		return -1;
835 	}
836 
837 	/* requires listening socket */
838 	if (mtcp->smap[sockid].socktype != MOS_SOCK_STREAM_LISTEN) {
839 		errno = EINVAL;
840 		return -1;
841 	}
842 
843 	listener = mtcp->smap[sockid].listener;
844 
845 	/* dequeue from the acceptq without lock first */
846 	/* if nothing there, acquire lock and cond_wait */
847 	accepted = StreamDequeue(listener->acceptq);
848 	if (!accepted) {
849 		if (listener->socket->opts & MTCP_NONBLOCK) {
850 			errno = EAGAIN;
851 			return -1;
852 
853 		} else {
854 			pthread_mutex_lock(&listener->accept_lock);
855 			while ((accepted = StreamDequeue(listener->acceptq)) == NULL) {
856 				pthread_cond_wait(&listener->accept_cond, &listener->accept_lock);
857 
858 				if (mtcp->ctx->done || mtcp->ctx->exit) {
859 					pthread_mutex_unlock(&listener->accept_lock);
860 					errno = EINTR;
861 					return -1;
862 				}
863 			}
864 			pthread_mutex_unlock(&listener->accept_lock);
865 		}
866 	}
867 
868 	if (!accepted) {
869 		TRACE_ERROR("[NEVER HAPPEN] Empty accept queue!\n");
870 	}
871 
872 	if (!accepted->socket) {
873 		socket = AllocateSocket(mctx, MOS_SOCK_STREAM);
874 		if (!socket) {
875 			TRACE_ERROR("Failed to create new socket!\n");
876 			/* TODO: destroy the stream */
877 			errno = ENFILE;
878 			return -1;
879 		}
880 		socket->stream = accepted;
881 		accepted->socket = socket;
882 
883 		/* set socket addr parameters */
884 		socket->saddr.sin_family = AF_INET;
885 		socket->saddr.sin_port = accepted->dport;
886 		socket->saddr.sin_addr.s_addr = accepted->daddr;
887 
888 		/* if monitor is enabled, complete the socket assignment */
889 		if (socket->stream->pair_stream != NULL)
890 			socket->stream->pair_stream->socket = socket;
891 	}
892 
893 	if (!(listener->socket->epoll & MOS_EPOLLET) &&
894 	    !StreamQueueIsEmpty(listener->acceptq))
895 		AddEpollEvent(mtcp->ep,
896 			      USR_SHADOW_EVENT_QUEUE,
897 			      listener->socket, MOS_EPOLLIN);
898 
899 	TRACE_API("Stream %d accepted.\n", accepted->id);
900 
901 	if (addr && addrlen) {
902 		struct sockaddr_in *addr_in = (struct sockaddr_in *)addr;
903 		addr_in->sin_family = AF_INET;
904 		addr_in->sin_port = accepted->dport;
905 		addr_in->sin_addr.s_addr = accepted->daddr;
906 		*addrlen = sizeof(struct sockaddr_in);
907 	}
908 
909 	return accepted->socket->id;
910 }
911 /*----------------------------------------------------------------------------*/
912 int
913 mtcp_init_rss(mctx_t mctx, in_addr_t saddr_base, int num_addr,
914 		in_addr_t daddr, in_addr_t dport)
915 {
916 	mtcp_manager_t mtcp;
917 	addr_pool_t ap;
918 
919 	mtcp = GetMTCPManager(mctx);
920 	if (!mtcp) {
921 		errno = EACCES;
922 		return -1;
923 	}
924 
925 	if (saddr_base == INADDR_ANY) {
926 		int nif_out;
927 
928 		/* for the INADDR_ANY, find the output interface for the destination
929 		   and set the saddr_base as the ip address of the output interface */
930 		nif_out = GetOutputInterface(daddr);
931 		if (nif_out < 0) {
932 			TRACE_DBG("Could not determine nif idx!\n");
933 			errno = EINVAL;
934 			return -1;
935 		}
936 		saddr_base = g_config.mos->netdev_table->ent[nif_out]->ip_addr;
937 	}
938 
939 	ap = CreateAddressPoolPerCore(mctx->cpu, num_cpus,
940 			saddr_base, num_addr, daddr, dport);
941 	if (!ap) {
942 		errno = ENOMEM;
943 		return -1;
944 	}
945 
946 	mtcp->ap = ap;
947 
948 	return 0;
949 }
950 /*----------------------------------------------------------------------------*/
951 int
952 eval_bpf_5tuple(struct sfbpf_program fcode,
953 				in_addr_t saddr, in_port_t sport,
954 				in_addr_t daddr, in_port_t dport) {
955 	uint8_t buf[TOTAL_TCP_HEADER_LEN];
956 	struct ethhdr *ethh;
957 	struct iphdr *iph;
958 	struct tcphdr *tcph;
959 
960 	ethh = (struct ethhdr *)buf;
961 	ethh->h_proto = htons(ETH_P_IP);
962 	iph = (struct iphdr *)(ethh + 1);
963 	iph->ihl = IP_HEADER_LEN >> 2;
964 	iph->version = 4;
965 	iph->tos = 0;
966 	iph->tot_len = htons(IP_HEADER_LEN + TCP_HEADER_LEN);
967 	iph->id = htons(0);
968 	iph->protocol = IPPROTO_TCP;
969 	iph->saddr = saddr;
970 	iph->daddr = daddr;
971 	iph->check = 0;
972 	tcph = (struct tcphdr *)(iph + 1);
973 	tcph->source = sport;
974 	tcph->dest = dport;
975 
976 	return EVAL_BPFFILTER(fcode, (uint8_t *)iph - sizeof(struct ethhdr),
977 						 TOTAL_TCP_HEADER_LEN);
978 }
979 /*----------------------------------------------------------------------------*/
980 int
981 mtcp_connect(mctx_t mctx, int sockid,
982 		const struct sockaddr *addr, socklen_t addrlen)
983 {
984 	mtcp_manager_t mtcp;
985 	socket_map_t socket;
986 	tcp_stream *cur_stream;
987 	struct sockaddr_in *addr_in;
988 	in_addr_t dip;
989 	in_port_t dport;
990 	int is_dyn_bound = FALSE;
991 	int ret, nif;
992 	int cnt_match = 0;
993 	struct mon_listener *walk;
994 	struct sfbpf_program fcode;
995 
996 	cur_stream = NULL;
997 	mtcp = GetMTCPManager(mctx);
998 	if (!mtcp) {
999 		errno = EACCES;
1000 		return -1;
1001 	}
1002 
1003 	if (sockid < 0 || sockid >= g_config.mos->max_concurrency) {
1004 		TRACE_API("Socket id %d out of range.\n", sockid);
1005 		errno = EBADF;
1006 		return -1;
1007 	}
1008 
1009 	if (mtcp->smap[sockid].socktype == MOS_SOCK_UNUSED) {
1010 		TRACE_API("Invalid socket id: %d\n", sockid);
1011 		errno = EBADF;
1012 		return -1;
1013 	}
1014 
1015 	if (mtcp->smap[sockid].socktype != MOS_SOCK_STREAM) {
1016 		TRACE_API("Not an end socket. id: %d\n", sockid);
1017 		errno = ENOTSOCK;
1018 		return -1;
1019 	}
1020 
1021 	if (!addr) {
1022 		TRACE_API("Socket %d: empty address!\n", sockid);
1023 		errno = EFAULT;
1024 		return -1;
1025 	}
1026 
1027 	/* we only allow bind() for AF_INET address */
1028 	if (addr->sa_family != AF_INET || addrlen < sizeof(struct sockaddr_in)) {
1029 		TRACE_API("Socket %d: invalid argument!\n", sockid);
1030 		errno = EAFNOSUPPORT;
1031 		return -1;
1032 	}
1033 
1034 	socket = &mtcp->smap[sockid];
1035 	if (socket->stream) {
1036 		TRACE_API("Socket %d: stream already exist!\n", sockid);
1037 		if (socket->stream->state >= TCP_ST_ESTABLISHED) {
1038 			errno = EISCONN;
1039 		} else {
1040 			errno = EALREADY;
1041 		}
1042 		return -1;
1043 	}
1044 
1045 	addr_in = (struct sockaddr_in *)addr;
1046 	dip = addr_in->sin_addr.s_addr;
1047 	dport = addr_in->sin_port;
1048 
1049 	/* address binding */
1050 	if (socket->opts & MTCP_ADDR_BIND &&
1051 	    socket->saddr.sin_port != INPORT_ANY &&
1052 	    socket->saddr.sin_addr.s_addr != INADDR_ANY) {
1053 		int rss_core;
1054 
1055 		rss_core = GetRSSCPUCore(socket->saddr.sin_addr.s_addr, dip,
1056 				socket->saddr.sin_port, dport, num_queues);
1057 
1058 		if (rss_core != mctx->cpu) {
1059 			errno = EINVAL;
1060 			return -1;
1061 		}
1062 	} else {
1063 		if (mtcp->ap) {
1064 			ret = FetchAddress(mtcp->ap,
1065 					mctx->cpu, num_queues, addr_in, &socket->saddr);
1066 		} else {
1067 			nif = GetOutputInterface(dip);
1068 			if (nif < 0) {
1069 				errno = EINVAL;
1070 				return -1;
1071 			}
1072 			ret = FetchAddress(ap[nif],
1073 					   mctx->cpu, num_queues, addr_in, &socket->saddr);
1074 		}
1075 		if (ret < 0) {
1076 			errno = EAGAIN;
1077 			return -1;
1078 		}
1079 		socket->opts |= MTCP_ADDR_BIND;
1080 		is_dyn_bound = TRUE;
1081 	}
1082 
1083 	cnt_match = 0;
1084 	if (mtcp->num_msp > 0) {
1085 		TAILQ_FOREACH(walk, &mtcp->monitors, link) {
1086 			fcode = walk->stream_syn_fcode;
1087 			if (!(ISSET_BPFFILTER(fcode) &&
1088 				  eval_bpf_5tuple(fcode, socket->saddr.sin_addr.s_addr,
1089 								  socket->saddr.sin_port,
1090 								  dip, dport) == 0)) {
1091 				walk->is_stream_syn_filter_hit = 1; // set the 'filter hit' flag to 1
1092 				cnt_match++;
1093 			}
1094 		}
1095 	}
1096 
1097 	if (mtcp->num_msp > 0 && cnt_match > 0) {
1098 		/* 150820 dhkim: XXX: embedded mode is not verified */
1099 #if 1
1100 		cur_stream = CreateClientTCPStream(mtcp, socket,
1101 						 STREAM_TYPE(MOS_SOCK_STREAM) |
1102 						 STREAM_TYPE(MOS_SOCK_MONITOR_STREAM_ACTIVE),
1103 						 socket->saddr.sin_addr.s_addr,
1104 						 socket->saddr.sin_port, dip, dport, NULL);
1105 #else
1106 		cur_stream = CreateDualTCPStream(mtcp, socket,
1107 						 STREAM_TYPE(MOS_SOCK_STREAM) |
1108 						 STREAM_TYPE(MOS_SOCK_MONITOR_STREAM_ACTIVE),
1109 						 socket->saddr.sin_addr.s_addr,
1110 						 socket->saddr.sin_port, dip, dport, NULL);
1111 #endif
1112 	}
1113 	else
1114 		cur_stream = CreateTCPStream(mtcp, socket, STREAM_TYPE(MOS_SOCK_STREAM),
1115 					     socket->saddr.sin_addr.s_addr,
1116 					     socket->saddr.sin_port, dip, dport, NULL);
1117 	if (!cur_stream) {
1118 		TRACE_ERROR("Socket %d: failed to create tcp_stream!\n", sockid);
1119 		errno = ENOMEM;
1120 		return -1;
1121 	}
1122 
1123 	if (is_dyn_bound)
1124 		cur_stream->is_bound_addr = TRUE;
1125 	cur_stream->sndvar->cwnd = 1;
1126 	cur_stream->sndvar->ssthresh = cur_stream->sndvar->mss * 10;
1127 	cur_stream->side = MOS_SIDE_CLI;
1128 	/* if monitor is enabled, update the pair stream side as well */
1129 	if (cur_stream->pair_stream) {
1130 		cur_stream->pair_stream->side = MOS_SIDE_SVR;
1131 		/*
1132 		 * if buffer management is off, then disable
1133 		 * monitoring tcp ring of server...
1134 		 * if there is even a single monitor asking for
1135 		 * buffer management, enable it (that's why the
1136 		 * need for the loop)
1137 		 */
1138 		cur_stream->pair_stream->buffer_mgmt = BUFMGMT_OFF;
1139 		struct socket_map *walk;
1140 		SOCKQ_FOREACH_START(walk, &cur_stream->msocks) {
1141 			uint8_t bm = walk->monitor_stream->monitor_listener->server_buf_mgmt;
1142 			if (bm > cur_stream->pair_stream->buffer_mgmt) {
1143 				cur_stream->pair_stream->buffer_mgmt = bm;
1144 				break;
1145 			}
1146 		} SOCKQ_FOREACH_END;
1147 	}
1148 
1149 	cur_stream->state = TCP_ST_SYN_SENT;
1150 	cur_stream->cb_events |= MOS_ON_TCP_STATE_CHANGE;
1151 
1152 	TRACE_STATE("Stream %d: TCP_ST_SYN_SENT\n", cur_stream->id);
1153 
1154 	SQ_LOCK(&mtcp->ctx->connect_lock);
1155 	ret = StreamEnqueue(mtcp->connectq, cur_stream);
1156 	SQ_UNLOCK(&mtcp->ctx->connect_lock);
1157 	mtcp->wakeup_flag = TRUE;
1158 	if (ret < 0) {
1159 		TRACE_ERROR("Socket %d: failed to enqueue to conenct queue!\n", sockid);
1160 		SQ_LOCK(&mtcp->ctx->destroyq_lock);
1161 		StreamEnqueue(mtcp->destroyq, cur_stream);
1162 		SQ_UNLOCK(&mtcp->ctx->destroyq_lock);
1163 		errno = EAGAIN;
1164 		return -1;
1165 	}
1166 
1167 	/* if nonblocking socket, return EINPROGRESS */
1168 	if (socket->opts & MTCP_NONBLOCK) {
1169 		errno = EINPROGRESS;
1170 		return -1;
1171 
1172 	} else {
1173 		while (1) {
1174 			if (!cur_stream) {
1175 				TRACE_ERROR("STREAM DESTROYED\n");
1176 				errno = ETIMEDOUT;
1177 				return -1;
1178 			}
1179 			if (cur_stream->state > TCP_ST_ESTABLISHED) {
1180 				TRACE_ERROR("Socket %d: weird state %s\n",
1181 						sockid, TCPStateToString(cur_stream));
1182 				// TODO: how to handle this?
1183 				errno = ENOSYS;
1184 				return -1;
1185 			}
1186 
1187 			if (cur_stream->state == TCP_ST_ESTABLISHED) {
1188 				break;
1189 			}
1190 			usleep(1000);
1191 		}
1192 	}
1193 
1194 	return 0;
1195 }
1196 /*----------------------------------------------------------------------------*/
1197 static inline int
1198 CloseStreamSocket(mctx_t mctx, int sockid)
1199 {
1200 	mtcp_manager_t mtcp;
1201 	tcp_stream *cur_stream;
1202 	int ret;
1203 
1204 	mtcp = GetMTCPManager(mctx);
1205 	if (!mtcp) {
1206 		errno = EACCES;
1207 		return -1;
1208 	}
1209 
1210 	cur_stream = mtcp->smap[sockid].stream;
1211 	if (!cur_stream) {
1212 		TRACE_API("Socket %d: stream does not exist.\n", sockid);
1213 		errno = ENOTCONN;
1214 		return -1;
1215 	}
1216 
1217 	if (cur_stream->closed) {
1218 		TRACE_API("Socket %d (Stream %u): already closed stream\n",
1219 				sockid, cur_stream->id);
1220 		return 0;
1221 	}
1222 	cur_stream->closed = TRUE;
1223 
1224 	TRACE_API("Stream %d: closing the stream.\n", cur_stream->id);
1225 
1226 	/* 141029 dhkim: Check this! */
1227 	cur_stream->socket = NULL;
1228 
1229 	if (cur_stream->state == TCP_ST_CLOSED_RSVD) {
1230 		TRACE_API("Stream %d at TCP_ST_CLOSED_RSVD. destroying the stream.\n",
1231 				cur_stream->id);
1232 		SQ_LOCK(&mtcp->ctx->destroyq_lock);
1233 		StreamEnqueue(mtcp->destroyq, cur_stream);
1234 		mtcp->wakeup_flag = TRUE;
1235 		SQ_UNLOCK(&mtcp->ctx->destroyq_lock);
1236 		return 0;
1237 
1238 	} else if (cur_stream->state == TCP_ST_SYN_SENT) {
1239 #if 1
1240 		SQ_LOCK(&mtcp->ctx->destroyq_lock);
1241 		StreamEnqueue(mtcp->destroyq, cur_stream);
1242 		SQ_UNLOCK(&mtcp->ctx->destroyq_lock);
1243 		mtcp->wakeup_flag = TRUE;
1244 #endif
1245 		return -1;
1246 
1247 	} else if (cur_stream->state != TCP_ST_ESTABLISHED &&
1248 			cur_stream->state != TCP_ST_CLOSE_WAIT) {
1249 		TRACE_API("Stream %d at state %s\n",
1250 				cur_stream->id, TCPStateToString(cur_stream));
1251 		errno = EBADF;
1252 		return -1;
1253 	}
1254 
1255 	SQ_LOCK(&mtcp->ctx->close_lock);
1256 	cur_stream->sndvar->on_closeq = TRUE;
1257 	ret = StreamEnqueue(mtcp->closeq, cur_stream);
1258 	mtcp->wakeup_flag = TRUE;
1259 	SQ_UNLOCK(&mtcp->ctx->close_lock);
1260 
1261 	if (ret < 0) {
1262 		TRACE_ERROR("(NEVER HAPPEN) Failed to enqueue the stream to close.\n");
1263 		errno = EAGAIN;
1264 		return -1;
1265 	}
1266 
1267 	return 0;
1268 }
1269 /*----------------------------------------------------------------------------*/
1270 static inline int
1271 CloseListeningSocket(mctx_t mctx, int sockid)
1272 {
1273 	mtcp_manager_t mtcp;
1274 	struct tcp_listener *listener;
1275 
1276 	mtcp = GetMTCPManager(mctx);
1277 	if (!mtcp) {
1278 		errno = EACCES;
1279 		return -1;
1280 	}
1281 
1282 	listener = mtcp->smap[sockid].listener;
1283 	if (!listener) {
1284 		errno = EINVAL;
1285 		return -1;
1286 	}
1287 
1288 	if (listener->acceptq) {
1289 		DestroyStreamQueue(listener->acceptq);
1290 		listener->acceptq = NULL;
1291 	}
1292 
1293 	pthread_mutex_lock(&listener->accept_lock);
1294 	pthread_cond_signal(&listener->accept_cond);
1295 	pthread_mutex_unlock(&listener->accept_lock);
1296 
1297 	pthread_cond_destroy(&listener->accept_cond);
1298 	pthread_mutex_destroy(&listener->accept_lock);
1299 
1300 	free(listener);
1301 	mtcp->smap[sockid].listener = NULL;
1302 
1303 	return 0;
1304 }
1305 /*----------------------------------------------------------------------------*/
1306 int
1307 mtcp_close(mctx_t mctx, int sockid)
1308 {
1309 	mtcp_manager_t mtcp;
1310 	int ret;
1311 
1312 	mtcp = GetMTCPManager(mctx);
1313 	if (!mtcp) {
1314 		errno = EACCES;
1315 		return -1;
1316 	}
1317 
1318 	if (sockid < 0 || sockid >= g_config.mos->max_concurrency) {
1319 		TRACE_API("Socket id %d out of range.\n", sockid);
1320 		errno = EBADF;
1321 		return -1;
1322 	}
1323 
1324 	if (mtcp->smap[sockid].socktype == MOS_SOCK_UNUSED) {
1325 		TRACE_API("Invalid socket id: %d\n", sockid);
1326 		errno = EBADF;
1327 		return -1;
1328 	}
1329 
1330 	TRACE_API("Socket %d: mtcp_close called.\n", sockid);
1331 
1332 	switch (mtcp->smap[sockid].socktype) {
1333 	case MOS_SOCK_STREAM:
1334 		ret = CloseStreamSocket(mctx, sockid);
1335 		break;
1336 
1337 	case MOS_SOCK_STREAM_LISTEN:
1338 		ret = CloseListeningSocket(mctx, sockid);
1339 		break;
1340 
1341 	case MOS_SOCK_EPOLL:
1342 		ret = CloseEpollSocket(mctx, sockid);
1343 		break;
1344 
1345 	case MOS_SOCK_PIPE:
1346 		ret = PipeClose(mctx, sockid);
1347 		break;
1348 
1349 	default:
1350 		errno = EINVAL;
1351 		ret = -1;
1352 		break;
1353 	}
1354 
1355 	FreeSocket(mctx, sockid, mtcp->smap[sockid].socktype);
1356 
1357 	return ret;
1358 }
1359 /*----------------------------------------------------------------------------*/
1360 int
1361 mtcp_abort(mctx_t mctx, int sockid)
1362 {
1363 	mtcp_manager_t mtcp;
1364 	tcp_stream *cur_stream;
1365 	int ret;
1366 
1367 	mtcp = GetMTCPManager(mctx);
1368 	if (!mtcp) {
1369 		errno = EACCES;
1370 		return -1;
1371 	}
1372 
1373 	if (sockid < 0 || sockid >= g_config.mos->max_concurrency) {
1374 		TRACE_API("Socket id %d out of range.\n", sockid);
1375 		errno = EBADF;
1376 		return -1;
1377 	}
1378 
1379 	if (mtcp->smap[sockid].socktype == MOS_SOCK_UNUSED) {
1380 		TRACE_API("Invalid socket id: %d\n", sockid);
1381 		errno = EBADF;
1382 		return -1;
1383 	}
1384 
1385 	if (mtcp->smap[sockid].socktype != MOS_SOCK_STREAM) {
1386 		TRACE_API("Not an end socket. id: %d\n", sockid);
1387 		errno = ENOTSOCK;
1388 		return -1;
1389 	}
1390 
1391 	cur_stream = mtcp->smap[sockid].stream;
1392 	if (!cur_stream) {
1393 		TRACE_API("Stream %d: does not exist.\n", sockid);
1394 		errno = ENOTCONN;
1395 		return -1;
1396 	}
1397 
1398 	TRACE_API("Socket %d: mtcp_abort()\n", sockid);
1399 
1400 	FreeSocket(mctx, sockid, mtcp->smap[sockid].socktype);
1401 	cur_stream->socket = NULL;
1402 
1403 	if (cur_stream->state == TCP_ST_CLOSED_RSVD) {
1404 		TRACE_API("Stream %d: connection already reset.\n", sockid);
1405 		return ERROR;
1406 
1407 	} else if (cur_stream->state == TCP_ST_SYN_SENT) {
1408 		/* TODO: this should notify event failure to all
1409 		   previous read() or write() calls */
1410 		cur_stream->state = TCP_ST_CLOSED_RSVD;
1411 		cur_stream->close_reason = TCP_ACTIVE_CLOSE;
1412 		cur_stream->cb_events |= MOS_ON_TCP_STATE_CHANGE;
1413 		SQ_LOCK(&mtcp->ctx->destroyq_lock);
1414 		StreamEnqueue(mtcp->destroyq, cur_stream);
1415 		SQ_UNLOCK(&mtcp->ctx->destroyq_lock);
1416 		mtcp->wakeup_flag = TRUE;
1417 		return 0;
1418 
1419 	} else if (cur_stream->state == TCP_ST_CLOSING ||
1420 			cur_stream->state == TCP_ST_LAST_ACK ||
1421 			cur_stream->state == TCP_ST_TIME_WAIT) {
1422 		cur_stream->state = TCP_ST_CLOSED_RSVD;
1423 		cur_stream->close_reason = TCP_ACTIVE_CLOSE;
1424 		cur_stream->cb_events |= MOS_ON_TCP_STATE_CHANGE;
1425 		SQ_LOCK(&mtcp->ctx->destroyq_lock);
1426 		StreamEnqueue(mtcp->destroyq, cur_stream);
1427 		SQ_UNLOCK(&mtcp->ctx->destroyq_lock);
1428 		mtcp->wakeup_flag = TRUE;
1429 		return 0;
1430 	}
1431 
1432 	/* the stream structure will be destroyed after sending RST */
1433 	if (cur_stream->sndvar->on_resetq) {
1434 		TRACE_ERROR("Stream %d: calling mtcp_abort() "
1435 				"when in reset queue.\n", sockid);
1436 		errno = ECONNRESET;
1437 		return -1;
1438 	}
1439 	SQ_LOCK(&mtcp->ctx->reset_lock);
1440 	cur_stream->sndvar->on_resetq = TRUE;
1441 	ret = StreamEnqueue(mtcp->resetq, cur_stream);
1442 	SQ_UNLOCK(&mtcp->ctx->reset_lock);
1443 	mtcp->wakeup_flag = TRUE;
1444 
1445 	if (ret < 0) {
1446 		TRACE_ERROR("(NEVER HAPPEN) Failed to enqueue the stream to close.\n");
1447 		errno = EAGAIN;
1448 		return -1;
1449 	}
1450 
1451 	return 0;
1452 }
1453 /*----------------------------------------------------------------------------*/
1454 static inline int
1455 PeekForUser(mtcp_manager_t mtcp, tcp_stream *cur_stream, char *buf, int len)
1456 {
1457 	struct tcp_recv_vars *rcvvar = cur_stream->rcvvar;
1458 	int copylen;
1459 	tcprb_t *rb = rcvvar->rcvbuf;
1460 
1461 	if ((copylen = tcprb_ppeek(rb, (uint8_t *)buf, len, rb->pile)) <= 0) {
1462 		errno = EAGAIN;
1463 		return -1;
1464 	}
1465 
1466 	return copylen;
1467 }
1468 /*----------------------------------------------------------------------------*/
1469 static inline int
1470 CopyToUser(mtcp_manager_t mtcp, tcp_stream *cur_stream, char *buf, int len)
1471 {
1472 	struct tcp_recv_vars *rcvvar = cur_stream->rcvvar;
1473 	int copylen;
1474 	tcprb_t *rb = rcvvar->rcvbuf;
1475 	if ((copylen = tcprb_ppeek(rb, (uint8_t *)buf, len, rb->pile)) <= 0) {
1476 		errno = EAGAIN;
1477 		return -1;
1478 	}
1479 	tcprb_setpile(rb, rb->pile + copylen);
1480 
1481 	rcvvar->rcv_wnd = rb->len - tcprb_cflen(rb);
1482 	//printf("rcv_wnd: %d\n", rcvvar->rcv_wnd);
1483 
1484 	/* Advertise newly freed receive buffer */
1485 	if (cur_stream->need_wnd_adv) {
1486 		if (rcvvar->rcv_wnd > cur_stream->sndvar->eff_mss) {
1487 			if (!cur_stream->sndvar->on_ackq) {
1488 				SQ_LOCK(&mtcp->ctx->ackq_lock);
1489 				cur_stream->sndvar->on_ackq = TRUE;
1490 				StreamEnqueue(mtcp->ackq, cur_stream); /* this always success */
1491 				SQ_UNLOCK(&mtcp->ctx->ackq_lock);
1492 				cur_stream->need_wnd_adv = FALSE;
1493 				mtcp->wakeup_flag = TRUE;
1494 			}
1495 		}
1496 	}
1497 
1498 	return copylen;
1499 }
1500 /*----------------------------------------------------------------------------*/
1501 ssize_t
1502 mtcp_recv(mctx_t mctx, int sockid, char *buf, size_t len, int flags)
1503 {
1504 	mtcp_manager_t mtcp;
1505 	socket_map_t socket;
1506 	tcp_stream *cur_stream;
1507 	struct tcp_recv_vars *rcvvar;
1508 	int event_remaining, merged_len;
1509 	int ret;
1510 
1511 	mtcp = GetMTCPManager(mctx);
1512 	if (!mtcp) {
1513 		errno = EACCES;
1514 		return -1;
1515 	}
1516 
1517 	if (sockid < 0 || sockid >= g_config.mos->max_concurrency) {
1518 		TRACE_API("Socket id %d out of range.\n", sockid);
1519 		errno = EBADF;
1520 		return -1;
1521 	}
1522 
1523 	socket = &mtcp->smap[sockid];
1524 	if (socket->socktype == MOS_SOCK_UNUSED) {
1525 		TRACE_API("Invalid socket id: %d\n", sockid);
1526 		errno = EBADF;
1527 		return -1;
1528 	}
1529 
1530 	if (socket->socktype == MOS_SOCK_PIPE) {
1531 		return PipeRead(mctx, sockid, buf, len);
1532 	}
1533 
1534 	if (socket->socktype != MOS_SOCK_STREAM) {
1535 		TRACE_API("Not an end socket. id: %d\n", sockid);
1536 		errno = ENOTSOCK;
1537 		return -1;
1538 	}
1539 
1540 	/* stream should be in ESTABLISHED, FIN_WAIT_1, FIN_WAIT_2, CLOSE_WAIT */
1541 	cur_stream = socket->stream;
1542 	if (!cur_stream || !cur_stream->rcvvar || !cur_stream->rcvvar->rcvbuf ||
1543 	    !(cur_stream->state >= TCP_ST_ESTABLISHED &&
1544 	      cur_stream->state <= TCP_ST_CLOSE_WAIT)) {
1545 		errno = ENOTCONN;
1546 		return -1;
1547 	}
1548 
1549 	rcvvar = cur_stream->rcvvar;
1550 
1551 	merged_len = tcprb_cflen(rcvvar->rcvbuf);
1552 
1553 	/* if CLOSE_WAIT, return 0 if there is no payload */
1554 	if (cur_stream->state == TCP_ST_CLOSE_WAIT) {
1555 		if (merged_len == 0)
1556 			return 0;
1557 	}
1558 
1559 	/* return EAGAIN if no receive buffer */
1560 	if (socket->opts & MTCP_NONBLOCK) {
1561 		if (merged_len == 0) {
1562 			errno = EAGAIN;
1563 			return -1;
1564 		}
1565 	}
1566 
1567 	SBUF_LOCK(&rcvvar->read_lock);
1568 
1569 	switch (flags) {
1570 	case 0:
1571 		ret = CopyToUser(mtcp, cur_stream, buf, len);
1572 		break;
1573 	case MSG_PEEK:
1574 		ret = PeekForUser(mtcp, cur_stream, buf, len);
1575 		break;
1576 	default:
1577 		SBUF_UNLOCK(&rcvvar->read_lock);
1578 		ret = -1;
1579 		errno = EINVAL;
1580 		return ret;
1581 	}
1582 
1583 	merged_len = tcprb_cflen(rcvvar->rcvbuf);
1584 	event_remaining = FALSE;
1585 	/* if there are remaining payload, generate EPOLLIN */
1586 	/* (may due to insufficient user buffer) */
1587 	if (socket->epoll & MOS_EPOLLIN) {
1588 		if (!(socket->epoll & MOS_EPOLLET) && merged_len > 0) {
1589 			event_remaining = TRUE;
1590 		}
1591 	}
1592 	/* if waiting for close, notify it if no remaining data */
1593 	if (cur_stream->state == TCP_ST_CLOSE_WAIT &&
1594 	    merged_len == 0 && ret > 0) {
1595 		event_remaining = TRUE;
1596 	}
1597 
1598 	SBUF_UNLOCK(&rcvvar->read_lock);
1599 
1600 	if (event_remaining) {
1601 		if (socket->epoll) {
1602 			AddEpollEvent(mtcp->ep,
1603 				      USR_SHADOW_EVENT_QUEUE, socket, MOS_EPOLLIN);
1604 		}
1605 	}
1606 
1607 	TRACE_API("Stream %d: mtcp_recv() returning %d\n", cur_stream->id, ret);
1608 	return ret;
1609 }
1610 /*----------------------------------------------------------------------------*/
1611 inline ssize_t
1612 mtcp_read(mctx_t mctx, int sockid, char *buf, size_t len)
1613 {
1614 	return mtcp_recv(mctx, sockid, buf, len, 0);
1615 }
1616 /*----------------------------------------------------------------------------*/
1617 ssize_t
1618 mtcp_readv(mctx_t mctx, int sockid, const struct iovec *iov, int numIOV)
1619 {
1620 	mtcp_manager_t mtcp;
1621 	socket_map_t socket;
1622 	tcp_stream *cur_stream;
1623 	struct tcp_recv_vars *rcvvar;
1624 	int ret, bytes_read, i;
1625 	int event_remaining, merged_len;
1626 
1627 	mtcp = GetMTCPManager(mctx);
1628 	if (!mtcp) {
1629 		errno = EACCES;
1630 		return -1;
1631 	}
1632 
1633 	if (sockid < 0 || sockid >= g_config.mos->max_concurrency) {
1634 		TRACE_API("Socket id %d out of range.\n", sockid);
1635 		errno = EBADF;
1636 		return -1;
1637 	}
1638 
1639 	socket = &mtcp->smap[sockid];
1640 	if (socket->socktype == MOS_SOCK_UNUSED) {
1641 		TRACE_API("Invalid socket id: %d\n", sockid);
1642 		errno = EBADF;
1643 		return -1;
1644 	}
1645 
1646 	if (socket->socktype != MOS_SOCK_STREAM) {
1647 		TRACE_API("Not an end socket. id: %d\n", sockid);
1648 		errno = ENOTSOCK;
1649 		return -1;
1650 	}
1651 
1652 	/* stream should be in ESTABLISHED, FIN_WAIT_1, FIN_WAIT_2, CLOSE_WAIT */
1653 	cur_stream = socket->stream;
1654 	if (!cur_stream || !cur_stream->rcvvar->rcvbuf ||
1655 			!(cur_stream->state >= TCP_ST_ESTABLISHED &&
1656 			  cur_stream->state <= TCP_ST_CLOSE_WAIT)) {
1657 		errno = ENOTCONN;
1658 		return -1;
1659 	}
1660 
1661 	rcvvar = cur_stream->rcvvar;
1662 
1663 	merged_len = tcprb_cflen(rcvvar->rcvbuf);
1664 
1665 	/* if CLOSE_WAIT, return 0 if there is no payload */
1666 	if (cur_stream->state == TCP_ST_CLOSE_WAIT) {
1667 		if (merged_len == 0)
1668 			return 0;
1669 	}
1670 
1671 	/* return EAGAIN if no receive buffer */
1672 	if (socket->opts & MTCP_NONBLOCK) {
1673 		if (merged_len == 0) {
1674 			errno = EAGAIN;
1675 			return -1;
1676 		}
1677 	}
1678 
1679 	SBUF_LOCK(&rcvvar->read_lock);
1680 
1681 	/* read and store the contents to the vectored buffers */
1682 	bytes_read = 0;
1683 	for (i = 0; i < numIOV; i++) {
1684 		if (iov[i].iov_len <= 0)
1685 			continue;
1686 
1687 		ret = CopyToUser(mtcp, cur_stream, iov[i].iov_base, iov[i].iov_len);
1688 		if (ret <= 0)
1689 			break;
1690 
1691 		bytes_read += ret;
1692 
1693 		if (ret < iov[i].iov_len)
1694 			break;
1695 	}
1696 
1697 	merged_len = tcprb_cflen(rcvvar->rcvbuf);
1698 
1699 	event_remaining = FALSE;
1700 	/* if there are remaining payload, generate read event */
1701 	/* (may due to insufficient user buffer) */
1702 	if (socket->epoll & MOS_EPOLLIN) {
1703 		if (!(socket->epoll & MOS_EPOLLET) && merged_len > 0) {
1704 			event_remaining = TRUE;
1705 		}
1706 	}
1707 	/* if waiting for close, notify it if no remaining data */
1708 	if (cur_stream->state == TCP_ST_CLOSE_WAIT &&
1709 			merged_len == 0 && bytes_read > 0) {
1710 		event_remaining = TRUE;
1711 	}
1712 
1713 	SBUF_UNLOCK(&rcvvar->read_lock);
1714 
1715 	if(event_remaining) {
1716 		if (socket->epoll & MOS_EPOLLIN && !(socket->epoll & MOS_EPOLLET)) {
1717 			AddEpollEvent(mtcp->ep,
1718 				      USR_SHADOW_EVENT_QUEUE, socket, MOS_EPOLLIN);
1719 		}
1720 	}
1721 
1722 	TRACE_API("Stream %d: mtcp_readv() returning %d\n",
1723 			cur_stream->id, bytes_read);
1724 	return bytes_read;
1725 }
1726 /*----------------------------------------------------------------------------*/
1727 static inline int
1728 CopyFromUser(mtcp_manager_t mtcp, tcp_stream *cur_stream, const char *buf, int len)
1729 {
1730 	struct tcp_send_vars *sndvar = cur_stream->sndvar;
1731 	int sndlen;
1732 	int ret;
1733 
1734 	sndlen = MIN((int)sndvar->snd_wnd, len);
1735 	if (sndlen <= 0) {
1736 		errno = EAGAIN;
1737 		return -1;
1738 	}
1739 
1740 	/* allocate send buffer if not exist */
1741 	if (!sndvar->sndbuf) {
1742 		sndvar->sndbuf = SBInit(mtcp->rbm_snd, sndvar->iss + 1);
1743 		if (!sndvar->sndbuf) {
1744 			cur_stream->close_reason = TCP_NO_MEM;
1745 			/* notification may not required due to -1 return */
1746 			errno = ENOMEM;
1747 			return -1;
1748 		}
1749 	}
1750 
1751 	ret = SBPut(mtcp->rbm_snd, sndvar->sndbuf, buf, sndlen);
1752 	assert(ret == sndlen);
1753 	sndvar->snd_wnd = sndvar->sndbuf->size - sndvar->sndbuf->len;
1754 	if (ret <= 0) {
1755 		TRACE_ERROR("SBPut failed. reason: %d (sndlen: %u, len: %u\n",
1756 				ret, sndlen, sndvar->sndbuf->len);
1757 		errno = EAGAIN;
1758 		return -1;
1759 	}
1760 
1761 	if (sndvar->snd_wnd <= 0) {
1762 		TRACE_SNDBUF("%u Sending buffer became full!! snd_wnd: %u\n",
1763 				cur_stream->id, sndvar->snd_wnd);
1764 	}
1765 
1766 	return ret;
1767 }
1768 /*----------------------------------------------------------------------------*/
1769 ssize_t
1770 mtcp_write(mctx_t mctx, int sockid, const char *buf, size_t len)
1771 {
1772 	mtcp_manager_t mtcp;
1773 	socket_map_t socket;
1774 	tcp_stream *cur_stream;
1775 	struct tcp_send_vars *sndvar;
1776 	int ret;
1777 
1778 	mtcp = GetMTCPManager(mctx);
1779 	if (!mtcp) {
1780 		errno = EACCES;
1781 		return -1;
1782 	}
1783 
1784 	if (sockid < 0 || sockid >= g_config.mos->max_concurrency) {
1785 		TRACE_API("Socket id %d out of range.\n", sockid);
1786 		errno = EBADF;
1787 		return -1;
1788 	}
1789 
1790 	socket = &mtcp->smap[sockid];
1791 	if (socket->socktype == MOS_SOCK_UNUSED) {
1792 		TRACE_API("Invalid socket id: %d\n", sockid);
1793 		errno = EBADF;
1794 		return -1;
1795 	}
1796 
1797 	if (socket->socktype == MOS_SOCK_PIPE) {
1798 		return PipeWrite(mctx, sockid, buf, len);
1799 	}
1800 
1801 	if (socket->socktype != MOS_SOCK_STREAM) {
1802 		TRACE_API("Not an end socket. id: %d\n", sockid);
1803 		errno = ENOTSOCK;
1804 		return -1;
1805 	}
1806 
1807 	cur_stream = socket->stream;
1808 	if (!cur_stream ||
1809 			!(cur_stream->state == TCP_ST_ESTABLISHED ||
1810 			  cur_stream->state == TCP_ST_CLOSE_WAIT)) {
1811 		errno = ENOTCONN;
1812 		return -1;
1813 	}
1814 
1815 	if (len <= 0) {
1816 		if (socket->opts & MTCP_NONBLOCK) {
1817 			errno = EAGAIN;
1818 			return -1;
1819 		} else {
1820 			return 0;
1821 		}
1822 	}
1823 
1824 	sndvar = cur_stream->sndvar;
1825 
1826 	SBUF_LOCK(&sndvar->write_lock);
1827 	ret = CopyFromUser(mtcp, cur_stream, buf, len);
1828 
1829 	SBUF_UNLOCK(&sndvar->write_lock);
1830 
1831 	if (ret > 0 && !(sndvar->on_sendq || sndvar->on_send_list)) {
1832 		SQ_LOCK(&mtcp->ctx->sendq_lock);
1833 		sndvar->on_sendq = TRUE;
1834 		StreamEnqueue(mtcp->sendq, cur_stream);		/* this always success */
1835 		SQ_UNLOCK(&mtcp->ctx->sendq_lock);
1836 		mtcp->wakeup_flag = TRUE;
1837 	}
1838 
1839 	if (ret == 0 && (socket->opts & MTCP_NONBLOCK)) {
1840 		ret = -1;
1841 		errno = EAGAIN;
1842 	}
1843 
1844 	/* if there are remaining sending buffer, generate write event */
1845 	if (sndvar->snd_wnd > 0) {
1846 		if (socket->epoll & MOS_EPOLLOUT && !(socket->epoll & MOS_EPOLLET)) {
1847 			AddEpollEvent(mtcp->ep,
1848 				      USR_SHADOW_EVENT_QUEUE, socket, MOS_EPOLLOUT);
1849 		}
1850 	}
1851 
1852 	TRACE_API("Stream %d: mtcp_write() returning %d\n", cur_stream->id, ret);
1853 	return ret;
1854 }
1855 /*----------------------------------------------------------------------------*/
1856 ssize_t
1857 mtcp_writev(mctx_t mctx, int sockid, const struct iovec *iov, int numIOV)
1858 {
1859 	mtcp_manager_t mtcp;
1860 	socket_map_t socket;
1861 	tcp_stream *cur_stream;
1862 	struct tcp_send_vars *sndvar;
1863 	int ret, to_write, i;
1864 
1865 	mtcp = GetMTCPManager(mctx);
1866 	if (!mtcp) {
1867 		errno = EACCES;
1868 		return -1;
1869 	}
1870 
1871 	if (sockid < 0 || sockid >= g_config.mos->max_concurrency) {
1872 		TRACE_API("Socket id %d out of range.\n", sockid);
1873 		errno = EBADF;
1874 		return -1;
1875 	}
1876 
1877 	socket = &mtcp->smap[sockid];
1878 	if (socket->socktype == MOS_SOCK_UNUSED) {
1879 		TRACE_API("Invalid socket id: %d\n", sockid);
1880 		errno = EBADF;
1881 		return -1;
1882 	}
1883 
1884 	if (socket->socktype != MOS_SOCK_STREAM) {
1885 		TRACE_API("Not an end socket. id: %d\n", sockid);
1886 		errno = ENOTSOCK;
1887 		return -1;
1888 	}
1889 
1890 	cur_stream = socket->stream;
1891 	if (!cur_stream ||
1892 			!(cur_stream->state == TCP_ST_ESTABLISHED ||
1893 			  cur_stream->state == TCP_ST_CLOSE_WAIT)) {
1894 		errno = ENOTCONN;
1895 		return -1;
1896 	}
1897 
1898 	sndvar = cur_stream->sndvar;
1899 	SBUF_LOCK(&sndvar->write_lock);
1900 
1901 	/* write from the vectored buffers */
1902 	to_write = 0;
1903 	for (i = 0; i < numIOV; i++) {
1904 		if (iov[i].iov_len <= 0)
1905 			continue;
1906 
1907 		ret = CopyFromUser(mtcp, cur_stream, iov[i].iov_base, iov[i].iov_len);
1908 		if (ret <= 0)
1909 			break;
1910 
1911 		to_write += ret;
1912 
1913 		if (ret < iov[i].iov_len)
1914 			break;
1915 	}
1916 	SBUF_UNLOCK(&sndvar->write_lock);
1917 
1918 	if (to_write > 0 && !(sndvar->on_sendq || sndvar->on_send_list)) {
1919 		SQ_LOCK(&mtcp->ctx->sendq_lock);
1920 		sndvar->on_sendq = TRUE;
1921 		StreamEnqueue(mtcp->sendq, cur_stream);		/* this always success */
1922 		SQ_UNLOCK(&mtcp->ctx->sendq_lock);
1923 		mtcp->wakeup_flag = TRUE;
1924 	}
1925 
1926 	if (to_write == 0 && (socket->opts & MTCP_NONBLOCK)) {
1927 		to_write = -1;
1928 		errno = EAGAIN;
1929 	}
1930 
1931 	/* if there are remaining sending buffer, generate write event */
1932 	if (sndvar->snd_wnd > 0) {
1933 		if (socket->epoll & MOS_EPOLLOUT && !(socket->epoll & MOS_EPOLLET)) {
1934 			AddEpollEvent(mtcp->ep,
1935 				      USR_SHADOW_EVENT_QUEUE, socket, MOS_EPOLLOUT);
1936 		}
1937 	}
1938 
1939 	TRACE_API("Stream %d: mtcp_writev() returning %d\n",
1940 			cur_stream->id, to_write);
1941 	return to_write;
1942 }
1943 /*----------------------------------------------------------------------------*/
1944 uint32_t
1945 mtcp_get_connection_cnt(mctx_t mctx)
1946 {
1947 	mtcp_manager_t mtcp;
1948 	mtcp = GetMTCPManager(mctx);
1949 	if (!mtcp) {
1950 		errno = EACCES;
1951 		return -1;
1952 	}
1953 
1954 	if (mtcp->num_msp > 0)
1955 		return mtcp->flow_cnt / 2;
1956 	else
1957 		return mtcp->flow_cnt;
1958 }
1959 /*----------------------------------------------------------------------------*/
1960