xref: /mOS-networking-stack/core/src/api.c (revision 91df013f)
1 #include <sys/queue.h>
2 #include <sys/ioctl.h>
3 #include <limits.h>
4 #include <unistd.h>
5 #include <assert.h>
6 #include <string.h>
7 
8 #ifdef DARWIN
9 #include <netinet/tcp.h>
10 #include <netinet/ip.h>
11 #include <netinet/if_ether.h>
12 #endif
13 
14 #include "mtcp.h"
15 #include "mtcp_api.h"
16 #include "tcp_in.h"
17 #include "tcp_stream.h"
18 #include "tcp_out.h"
19 #include "ip_out.h"
20 #include "eventpoll.h"
21 #include "pipe.h"
22 #include "fhash.h"
23 #include "addr_pool.h"
24 #include "rss.h"
25 #include "config.h"
26 #include "debug.h"
27 #include "eventpoll.h"
28 #include "mos_api.h"
29 #include "tcp_rb.h"
30 
31 #define MAX(a, b) ((a)>(b)?(a):(b))
32 #define MIN(a, b) ((a)<(b)?(a):(b))
33 
34 /*----------------------------------------------------------------------------*/
35 /** Stop monitoring the socket! (function prototype)
36  * @param [in] mctx: mtcp context
37  * @param [in] sock: monitoring stream socket id
38  * @param [in] side: side of monitoring (client side, server side or both)
39  *
40  * This function is now DEPRECATED and is only used within mOS core...
41  */
42 int
43 mtcp_cb_stop(mctx_t mctx, int sock, int side);
44 /*----------------------------------------------------------------------------*/
45 /** Reset the connection (send RST packets to both sides)
46  *  (We need to decide the API for this.)
47  */
48 //int
49 //mtcp_cb_reset(mctx_t mctx, int sock, int side);
50 /*----------------------------------------------------------------------------*/
51 inline mtcp_manager_t
52 GetMTCPManager(mctx_t mctx)
53 {
54 	if (!mctx) {
55 		errno = EACCES;
56 		return NULL;
57 	}
58 
59 	if (mctx->cpu < 0 || mctx->cpu >= num_cpus) {
60 		errno = EINVAL;
61 		return NULL;
62 	}
63 
64 	if (!g_mtcp[mctx->cpu] || g_mtcp[mctx->cpu]->ctx->done || g_mtcp[mctx->cpu]->ctx->exit) {
65 		errno = EPERM;
66 		return NULL;
67 	}
68 
69 	return g_mtcp[mctx->cpu];
70 }
71 /*----------------------------------------------------------------------------*/
72 static inline int
73 GetSocketError(socket_map_t socket, void *optval, socklen_t *optlen)
74 {
75 	tcp_stream *cur_stream;
76 
77 	if (!socket->stream) {
78 		errno = EBADF;
79 		return -1;
80 	}
81 
82 	cur_stream = socket->stream;
83 	if (cur_stream->state == TCP_ST_CLOSED_RSVD) {
84 		if (cur_stream->close_reason == TCP_TIMEDOUT ||
85 				cur_stream->close_reason == TCP_CONN_FAIL ||
86 				cur_stream->close_reason == TCP_CONN_LOST) {
87 			*(int *)optval = ETIMEDOUT;
88 			*optlen = sizeof(int);
89 
90 			return 0;
91 		}
92 	}
93 
94 	if (cur_stream->state == TCP_ST_CLOSE_WAIT ||
95 			cur_stream->state == TCP_ST_CLOSED_RSVD) {
96 		if (cur_stream->close_reason == TCP_RESET) {
97 			*(int *)optval = ECONNRESET;
98 			*optlen = sizeof(int);
99 
100 			return 0;
101 		}
102 	}
103 
104 	if (cur_stream->state == TCP_ST_SYN_SENT &&
105 	    errno == EINPROGRESS) {
106 		*(int *)optval = errno;
107 		*optlen = sizeof(int);
108 
109 		return -1;
110         }
111 
112 	/*
113 	 * `base case`: If socket sees no so_error, then
114 	 * this also means close_reason will always be
115 	 * TCP_NOT_CLOSED.
116 	 */
117 	if (cur_stream->close_reason == TCP_NOT_CLOSED) {
118 		*(int *)optval = 0;
119 		*optlen = sizeof(int);
120 
121 		return 0;
122 	}
123 
124 	errno = ENOSYS;
125 	return -1;
126 }
127 /*----------------------------------------------------------------------------*/
128 int
129 mtcp_getsockname(mctx_t mctx, int sockid, struct sockaddr *addr,
130 		 socklen_t *addrlen)
131 {
132 	mtcp_manager_t mtcp;
133 	socket_map_t socket;
134 
135 	mtcp = GetMTCPManager(mctx);
136 	if (!mtcp) {
137 		return -1;
138 	}
139 
140 	if (sockid < 0 || sockid >= g_config.mos->max_concurrency) {
141 		TRACE_API("Socket id %d out of range.\n", sockid);
142 		errno = EBADF;
143 		return -1;
144 	}
145 
146 	socket = &mtcp->smap[sockid];
147 	if (socket->socktype == MOS_SOCK_UNUSED) {
148 		TRACE_API("Invalid socket id: %d\n", sockid);
149 		errno = EBADF;
150 		return -1;
151 	}
152 
153 	if (*addrlen <= 0) {
154 		TRACE_API("Invalid addrlen: %d\n", *addrlen);
155 		errno = EINVAL;
156 		return -1;
157 	}
158 
159 	if (socket->socktype != MOS_SOCK_STREAM_LISTEN &&
160 	    socket->socktype != MOS_SOCK_STREAM) {
161 		TRACE_API("Invalid socket id: %d\n", sockid);
162 		errno = ENOTSOCK;
163 		return -1;
164 	}
165 
166 	*(struct sockaddr_in *)addr = socket->saddr;
167         *addrlen = sizeof(socket->saddr);
168 
169 	return 0;
170 }
171 /*----------------------------------------------------------------------------*/
172 int
173 mtcp_getsockopt(mctx_t mctx, int sockid, int level,
174 		int optname, void *optval, socklen_t *optlen)
175 {
176 	mtcp_manager_t mtcp;
177 	socket_map_t socket;
178 
179 	mtcp = GetMTCPManager(mctx);
180 	if (!mtcp) {
181 		errno = EACCES;
182 		return -1;
183 	}
184 
185 	if (sockid < 0 || sockid >= g_config.mos->max_concurrency) {
186 		TRACE_API("Socket id %d out of range.\n", sockid);
187 		errno = EBADF;
188 		return -1;
189 	}
190 
191 	switch (level) {
192 	case SOL_SOCKET:
193 		socket = &mtcp->smap[sockid];
194 		if (socket->socktype == MOS_SOCK_UNUSED) {
195 			TRACE_API("Invalid socket id: %d\n", sockid);
196 			errno = EBADF;
197 			return -1;
198 		}
199 
200 		if (socket->socktype != MOS_SOCK_STREAM_LISTEN &&
201 		    socket->socktype != MOS_SOCK_STREAM) {
202 			TRACE_API("Invalid socket id: %d\n", sockid);
203 			errno = ENOTSOCK;
204 			return -1;
205 		}
206 
207 		if (optname == SO_ERROR) {
208 			if (socket->socktype == MOS_SOCK_STREAM) {
209 				return GetSocketError(socket, optval, optlen);
210 			}
211 		}
212 		break;
213 	case SOL_MONSOCKET:
214 		/* check if the calling thread is in MOS context */
215 		if (mtcp->ctx->thread != pthread_self()) {
216 			errno = EPERM;
217 			return -1;
218 		}
219 		/*
220 		 * All options will only work for active
221 		 * monitor stream sockets
222 		 */
223 		socket = &mtcp->msmap[sockid];
224 		if (socket->socktype != MOS_SOCK_MONITOR_STREAM_ACTIVE) {
225 			TRACE_API("Invalid socket id: %d\n", sockid);
226 			errno = ENOTSOCK;
227 			return -1;
228 		}
229 
230 		switch (optname) {
231 		case MOS_FRAGINFO_CLIBUF:
232 			return GetFragInfo(socket, MOS_SIDE_CLI, optval, optlen);
233 		case MOS_FRAGINFO_SVRBUF:
234 			return GetFragInfo(socket, MOS_SIDE_SVR, optval, optlen);
235 		case MOS_INFO_CLIBUF:
236 			return GetBufInfo(socket, MOS_SIDE_CLI, optval, optlen);
237 		case MOS_INFO_SVRBUF:
238 			return GetBufInfo(socket, MOS_SIDE_SVR, optval, optlen);
239 		case MOS_TCP_STATE_CLI:
240 			return GetTCPState(socket->monitor_stream->stream, MOS_SIDE_CLI,
241 							   optval, optlen);
242 		case MOS_TCP_STATE_SVR:
243 			return GetTCPState(socket->monitor_stream->stream, MOS_SIDE_SVR,
244 							   optval, optlen);
245 		case MOS_TIMESTAMP:
246 			return GetLastTimestamp(socket->monitor_stream->stream,
247 						(uint32_t *)optval,
248 						optlen);
249 		default:
250 		  TRACE_API("can't recognize the optname=%d\n", optname);
251 		  assert(0);
252 		}
253 		break;
254 	}
255 	errno = ENOSYS;
256 	return -1;
257 }
258 /*----------------------------------------------------------------------------*/
259 int
260 mtcp_setsockopt(mctx_t mctx, int sockid, int level,
261 		int optname, const void *optval, socklen_t optlen)
262 {
263 	mtcp_manager_t mtcp;
264 	socket_map_t socket;
265 	tcprb_t *rb;
266 
267 	mtcp = GetMTCPManager(mctx);
268 	if (!mtcp) {
269 		errno = EACCES;
270 		return -1;
271 	}
272 
273 	if (sockid < 0 || sockid >= g_config.mos->max_concurrency) {
274 		TRACE_API("Socket id %d out of range.\n", sockid);
275 		errno = EBADF;
276 		return -1;
277 	}
278 
279 	switch (level) {
280 	case SOL_SOCKET:
281 		socket = &mtcp->smap[sockid];
282 		if (socket->socktype == MOS_SOCK_UNUSED) {
283 			TRACE_API("Invalid socket id: %d\n", sockid);
284 			errno = EBADF;
285 			return -1;
286 		}
287 
288 		if (socket->socktype != MOS_SOCK_STREAM_LISTEN &&
289 		    socket->socktype != MOS_SOCK_STREAM) {
290 			TRACE_API("Invalid socket id: %d\n", sockid);
291 			errno = ENOTSOCK;
292 			return -1;
293 		}
294 		break;
295 	case SOL_MONSOCKET:
296 		socket = &mtcp->msmap[sockid];
297 		/*
298 		 * checking of calling thread to be in MOS context is
299 		 * disabled since both optnames can be called from
300 		 * `application' context (on passive sockets)
301 		 */
302 		/*
303 		 * if (mtcp->ctx->thread != pthread_self())
304 		 * return -1;
305 		 */
306 
307 		switch (optname) {
308 		case MOS_CLIBUF:
309 #if 0
310 			if (socket->socktype != MOS_SOCK_MONITOR_STREAM_ACTIVE) {
311 				errno = EBADF;
312 				return -1;
313 			}
314 #endif
315 #ifdef DISABLE_DYN_RESIZE
316 			if (*(int *)optval != 0)
317 				return -1;
318 			if (socket->socktype == MOS_SOCK_MONITOR_STREAM_ACTIVE) {
319 				rb = (socket->monitor_stream->stream->side == MOS_SIDE_CLI) ?
320 					socket->monitor_stream->stream->rcvvar->rcvbuf :
321 					socket->monitor_stream->stream->pair_stream->rcvvar->rcvbuf;
322 				if (rb) {
323 					tcprb_resize_meta(rb, 0);
324 					tcprb_resize(rb, 0);
325 				}
326 			}
327 			return DisableBuf(socket, MOS_SIDE_CLI);
328 #else
329 			rb = (socket->monitor_stream->stream->side == MOS_SIDE_CLI) ?
330 				socket->monitor_stream->stream->rcvvar->rcvbuf :
331 				socket->monitor_stream->stream->pair_stream->rcvvar->rcvbuf;
332 			if (tcprb_resize_meta(rb, *(int *)optval) < 0)
333 				return -1;
334 			return tcprb_resize(rb,
335 					(((int)rb->metalen - 1) / UNITBUFSIZE + 1) * UNITBUFSIZE);
336 #endif
337 		case MOS_SVRBUF:
338 #if 0
339 			if (socket->socktype != MOS_SOCK_MONITOR_STREAM_ACTIVE) {
340 				errno = EBADF;
341 				return -1;
342 			}
343 #endif
344 #ifdef DISABLE_DYN_RESIZE
345 			if (*(int *)optval != 0)
346 				return -1;
347 			if (socket->socktype == MOS_SOCK_MONITOR_STREAM_ACTIVE) {
348 				rb = (socket->monitor_stream->stream->side == MOS_SIDE_SVR) ?
349 					socket->monitor_stream->stream->rcvvar->rcvbuf :
350 					socket->monitor_stream->stream->pair_stream->rcvvar->rcvbuf;
351 				if (rb) {
352 					tcprb_resize_meta(rb, 0);
353 					tcprb_resize(rb, 0);
354 				}
355 			}
356 			return DisableBuf(socket, MOS_SIDE_SVR);
357 #else
358 			rb = (socket->monitor_stream->stream->side == MOS_SIDE_SVR) ?
359 				socket->monitor_stream->stream->rcvvar->rcvbuf :
360 				socket->monitor_stream->stream->pair_stream->rcvvar->rcvbuf;
361 			if (tcprb_resize_meta(rb, *(int *)optval) < 0)
362 				return -1;
363 			return tcprb_resize(rb,
364 					(((int)rb->metalen - 1) / UNITBUFSIZE + 1) * UNITBUFSIZE);
365 #endif
366 		case MOS_FRAG_CLIBUF:
367 #if 0
368 			if (socket->socktype != MOS_SOCK_MONITOR_STREAM_ACTIVE) {
369 				errno = EBADF;
370 				return -1;
371 			}
372 #endif
373 #ifdef DISABLE_DYN_RESIZE
374 			if (*(int *)optval != 0)
375 				return -1;
376 			if (socket->socktype == MOS_SOCK_MONITOR_STREAM_ACTIVE) {
377 				rb = (socket->monitor_stream->stream->side == MOS_SIDE_CLI) ?
378 					socket->monitor_stream->stream->rcvvar->rcvbuf :
379 					socket->monitor_stream->stream->pair_stream->rcvvar->rcvbuf;
380 				if (rb)
381 					tcprb_resize(rb, 0);
382 			}
383 			return 0;
384 #else
385 			rb = (socket->monitor_stream->stream->side == MOS_SIDE_CLI) ?
386 				socket->monitor_stream->stream->rcvvar->rcvbuf :
387 				socket->monitor_stream->stream->pair_stream->rcvvar->rcvbuf;
388 			if (rb->len == 0)
389 				return tcprb_resize_meta(rb, *(int *)optval);
390 			else
391 				return -1;
392 #endif
393 		case MOS_FRAG_SVRBUF:
394 #if 0
395 			if (socket->socktype != MOS_SOCK_MONITOR_STREAM_ACTIVE) {
396 				errno = EBADF;
397 				return -1;
398 			}
399 #endif
400 #ifdef DISABLE_DYN_RESIZE
401 			if (*(int *)optval != 0)
402 				return -1;
403 			if (socket->socktype == MOS_SOCK_MONITOR_STREAM_ACTIVE) {
404 				rb = (socket->monitor_stream->stream->side == MOS_SIDE_SVR) ?
405 					socket->monitor_stream->stream->rcvvar->rcvbuf :
406 					socket->monitor_stream->stream->pair_stream->rcvvar->rcvbuf;
407 				if (rb)
408 					tcprb_resize(rb, 0);
409 			}
410 			return 0;
411 #else
412 			rb = (socket->monitor_stream->stream->side == MOS_SIDE_SVR) ?
413 				socket->monitor_stream->stream->rcvvar->rcvbuf :
414 				socket->monitor_stream->stream->pair_stream->rcvvar->rcvbuf;
415 			if (rb->len == 0)
416 				return tcprb_resize_meta(rb, *(int *)optval);
417 			else
418 				return -1;
419 #endif
420 		case MOS_MONLEVEL:
421 #ifdef OLD_API
422 			assert(*(int *)optval == MOS_NO_CLIBUF ||
423 			       *(int *)optval == MOS_NO_SVRBUF);
424 			return DisableBuf(socket,
425 					  (*(int *)optval == MOS_NO_CLIBUF) ?
426 					  MOS_SIDE_CLI : MOS_SIDE_SVR);
427 #endif
428 		case MOS_SEQ_REMAP:
429 			return TcpSeqChange(socket,
430 					    (uint32_t)((seq_remap_info *)optval)->seq_off,
431 					    ((seq_remap_info *)optval)->side,
432 					    mtcp->pctx->p.seq);
433 		case MOS_STOP_MON:
434 			return mtcp_cb_stop(mctx, sockid, *(int *)optval);
435 		default:
436 			TRACE_API("invalid optname=%d\n", optname);
437 			assert(0);
438 		}
439 		break;
440 	}
441 
442 	errno = ENOSYS;
443 	return -1;
444 }
445 /*----------------------------------------------------------------------------*/
446 int
447 mtcp_setsock_nonblock(mctx_t mctx, int sockid)
448 {
449 	mtcp_manager_t mtcp;
450 
451 	mtcp = GetMTCPManager(mctx);
452 	if (!mtcp) {
453 		errno = EACCES;
454 		return -1;
455 	}
456 
457 	if (sockid < 0 || sockid >= g_config.mos->max_concurrency) {
458 		TRACE_API("Socket id %d out of range.\n", sockid);
459 		errno = EBADF;
460 		return -1;
461 	}
462 
463 	if (mtcp->smap[sockid].socktype == MOS_SOCK_UNUSED) {
464 		TRACE_API("Invalid socket id: %d\n", sockid);
465 		errno = EBADF;
466 		return -1;
467 	}
468 
469 	mtcp->smap[sockid].opts |= MTCP_NONBLOCK;
470 
471 	return 0;
472 }
473 /*----------------------------------------------------------------------------*/
474 int
475 mtcp_ioctl(mctx_t mctx, int sockid, int request, void *argp)
476 {
477 	mtcp_manager_t mtcp;
478 	socket_map_t socket;
479 
480 	mtcp = GetMTCPManager(mctx);
481 	if (!mtcp) {
482 		errno = EACCES;
483 		return -1;
484 	}
485 
486 	if (sockid < 0 || sockid >= g_config.mos->max_concurrency) {
487 		TRACE_API("Socket id %d out of range.\n", sockid);
488 		errno = EBADF;
489 		return -1;
490 	}
491 
492 	/* only support stream socket */
493 	socket = &mtcp->smap[sockid];
494 
495 	if (socket->socktype != MOS_SOCK_STREAM_LISTEN &&
496 		socket->socktype != MOS_SOCK_STREAM) {
497 		TRACE_API("Invalid socket id: %d\n", sockid);
498 		errno = EBADF;
499 		return -1;
500 	}
501 
502 	if (!argp) {
503 		errno = EFAULT;
504 		return -1;
505 	}
506 
507 	if (request == FIONREAD) {
508 		tcp_stream *cur_stream;
509 		tcprb_t *rbuf;
510 
511 		cur_stream = socket->stream;
512 		if (!cur_stream) {
513 			errno = EBADF;
514 			return -1;
515 		}
516 
517 		rbuf = cur_stream->rcvvar->rcvbuf;
518 		*(int *)argp = (rbuf) ? tcprb_cflen(rbuf) : 0;
519 
520 	} else if (request == FIONBIO) {
521 		/*
522 		 * sockets can only be set to blocking/non-blocking
523 		 * modes during initialization
524 		 */
525 		if ((*(int *)argp))
526 			mtcp->smap[sockid].opts |= MTCP_NONBLOCK;
527 		else
528 			mtcp->smap[sockid].opts &= ~MTCP_NONBLOCK;
529 	} else {
530 		errno = EINVAL;
531 		return -1;
532 	}
533 
534 	return 0;
535 }
536 /*----------------------------------------------------------------------------*/
537 static int
538 mtcp_monitor(mctx_t mctx, socket_map_t sock)
539 {
540 	mtcp_manager_t mtcp;
541 	struct mon_listener *monitor;
542 	int sockid = sock->id;
543 
544 	mtcp = GetMTCPManager(mctx);
545 	if (!mtcp) {
546 		errno = EACCES;
547 		return -1;
548 	}
549 
550 	if (sockid < 0 || sockid >= g_config.mos->max_concurrency) {
551 		TRACE_API("Socket id %d out of range.\n", sockid);
552 		errno = EBADF;
553 		return -1;
554 	}
555 
556 	if (mtcp->msmap[sockid].socktype == MOS_SOCK_UNUSED) {
557 		TRACE_API("Invalid socket id: %d\n", sockid);
558 		errno = EBADF;
559 		return -1;
560 	}
561 
562 	if (!(mtcp->msmap[sockid].socktype == MOS_SOCK_MONITOR_STREAM ||
563 	      mtcp->msmap[sockid].socktype == MOS_SOCK_MONITOR_RAW)) {
564 		TRACE_API("Not a monitor socket. id: %d\n", sockid);
565 		errno = ENOTSOCK;
566 		return -1;
567 	}
568 
569 	monitor = (struct mon_listener *)calloc(1, sizeof(struct mon_listener));
570 	if (!monitor) {
571 		/* errno set from the malloc() */
572 		errno = ENOMEM;
573 		return -1;
574 	}
575 
576 	/* create a monitor-specific event queue */
577 	monitor->eq = CreateEventQueue(g_config.mos->max_concurrency);
578 	if (!monitor->eq) {
579 		TRACE_API("Can't create event queue (concurrency: %d) for "
580 			  "monitor read event registrations!\n",
581 			  g_config.mos->max_concurrency);
582 		free(monitor);
583 		errno = ENOMEM;
584 		return -1;
585 	}
586 
587 	/* set monitor-related basic parameters */
588 #ifndef NEWEV
589 	monitor->ude_id = UDE_OFFSET;
590 #endif
591 	monitor->socket = sock;
592 	monitor->client_buf_mgmt = monitor->server_buf_mgmt = BUFMGMT_FULL;
593 
594 	/* perform both sides monitoring by default */
595 	monitor->client_mon = monitor->server_mon = 1;
596 
597 	/* add monitor socket to the monitor list */
598 	TAILQ_INSERT_TAIL(&mtcp->monitors, monitor, link);
599 
600 	mtcp->msmap[sockid].monitor_listener = monitor;
601 
602 	return 0;
603 }
604 /*----------------------------------------------------------------------------*/
605 int
606 mtcp_socket(mctx_t mctx, int domain, int type, int protocol)
607 {
608 	mtcp_manager_t mtcp;
609 	socket_map_t socket;
610 
611 	mtcp = GetMTCPManager(mctx);
612 	if (!mtcp) {
613 		errno = EACCES;
614 		return -1;
615 	}
616 
617 	if (domain != AF_INET) {
618 		errno = EAFNOSUPPORT;
619 		return -1;
620 	}
621 
622 	if (type == SOCK_STREAM) {
623 		type = MOS_SOCK_STREAM;
624 	} else if (type == MOS_SOCK_MONITOR_STREAM ||
625 		   type == MOS_SOCK_MONITOR_RAW) {
626 		/* do nothing for the time being */
627 	} else {
628 		/* Not supported type */
629 		errno = EINVAL;
630 		return -1;
631 	}
632 
633 	socket = AllocateSocket(mctx, type);
634 	if (!socket) {
635 		errno = ENFILE;
636 		return -1;
637 	}
638 
639 	if (type == MOS_SOCK_MONITOR_STREAM ||
640 	    type == MOS_SOCK_MONITOR_RAW) {
641 		mtcp_manager_t mtcp = GetMTCPManager(mctx);
642 		if (!mtcp) {
643 			errno = EACCES;
644 			return -1;
645 		}
646 		mtcp_monitor(mctx, socket);
647 #ifdef NEWEV
648 		socket->monitor_listener->stree_dontcare = NULL;
649 		socket->monitor_listener->stree_pre_rcv = NULL;
650 		socket->monitor_listener->stree_post_snd = NULL;
651 #else
652 		InitEvB(mtcp, &socket->monitor_listener->dontcare_evb);
653 		InitEvB(mtcp, &socket->monitor_listener->pre_tcp_evb);
654 		InitEvB(mtcp, &socket->monitor_listener->post_tcp_evb);
655 #endif
656 	}
657 
658 	return socket->id;
659 }
660 /*----------------------------------------------------------------------------*/
661 int
662 mtcp_bind(mctx_t mctx, int sockid,
663 		const struct sockaddr *addr, socklen_t addrlen)
664 {
665 	mtcp_manager_t mtcp;
666 	struct sockaddr_in *addr_in;
667 
668 	mtcp = GetMTCPManager(mctx);
669 	if (!mtcp) {
670 		errno = EACCES;
671 		return -1;
672 	}
673 
674 	if (sockid < 0 || sockid >= g_config.mos->max_concurrency) {
675 		TRACE_API("Socket id %d out of range.\n", sockid);
676 		errno = EBADF;
677 		return -1;
678 	}
679 
680 	if (mtcp->smap[sockid].socktype == MOS_SOCK_UNUSED) {
681 		TRACE_API("Invalid socket id: %d\n", sockid);
682 		errno = EBADF;
683 		return -1;
684 	}
685 
686 	if (mtcp->smap[sockid].socktype != MOS_SOCK_STREAM &&
687 			mtcp->smap[sockid].socktype != MOS_SOCK_STREAM_LISTEN) {
688 		TRACE_API("Not a stream socket id: %d\n", sockid);
689 		errno = ENOTSOCK;
690 		return -1;
691 	}
692 
693 	if (!addr) {
694 		TRACE_API("Socket %d: empty address!\n", sockid);
695 		errno = EINVAL;
696 		return -1;
697 	}
698 
699 	if (mtcp->smap[sockid].opts & MTCP_ADDR_BIND) {
700 		TRACE_API("Socket %d: adress already bind for this socket.\n", sockid);
701 		errno = EINVAL;
702 		return -1;
703 	}
704 
705 	/* we only allow bind() for AF_INET address */
706 	if (addr->sa_family != AF_INET || addrlen < sizeof(struct sockaddr_in)) {
707 		TRACE_API("Socket %d: invalid argument!\n", sockid);
708 		errno = EINVAL;
709 		return -1;
710 	}
711 
712 	if (mtcp->listener) {
713 		TRACE_API("Address already bound!\n");
714 		errno = EINVAL;
715 		return -1;
716 	}
717 	addr_in = (struct sockaddr_in *)addr;
718 	mtcp->smap[sockid].saddr = *addr_in;
719 	mtcp->smap[sockid].opts |= MTCP_ADDR_BIND;
720 
721 	return 0;
722 }
723 /*----------------------------------------------------------------------------*/
724 int
725 mtcp_listen(mctx_t mctx, int sockid, int backlog)
726 {
727 	mtcp_manager_t mtcp;
728 	struct tcp_listener *listener;
729 
730 	mtcp = GetMTCPManager(mctx);
731 	if (!mtcp) {
732 		errno = EACCES;
733 		return -1;
734 	}
735 
736 	if (sockid < 0 || sockid >= g_config.mos->max_concurrency) {
737 		TRACE_API("Socket id %d out of range.\n", sockid);
738 		errno = EBADF;
739 		return -1;
740 	}
741 
742 	if (mtcp->smap[sockid].socktype == MOS_SOCK_UNUSED) {
743 		TRACE_API("Invalid socket id: %d\n", sockid);
744 		errno = EBADF;
745 		return -1;
746 	}
747 
748 	if (mtcp->smap[sockid].socktype == MOS_SOCK_STREAM) {
749 		mtcp->smap[sockid].socktype = MOS_SOCK_STREAM_LISTEN;
750 	}
751 
752 	if (mtcp->smap[sockid].socktype != MOS_SOCK_STREAM_LISTEN) {
753 		TRACE_API("Not a listening socket. id: %d\n", sockid);
754 		errno = ENOTSOCK;
755 		return -1;
756 	}
757 
758 	if (backlog <= 0 || backlog > g_config.mos->max_concurrency) {
759 		errno = EINVAL;
760 		return -1;
761 	}
762 
763 	listener = (struct tcp_listener *)calloc(1, sizeof(struct tcp_listener));
764 	if (!listener) {
765 		/* errno set from the malloc() */
766 		errno = ENOMEM;
767 		return -1;
768 	}
769 
770 	listener->sockid = sockid;
771 	listener->backlog = backlog;
772 	listener->socket = &mtcp->smap[sockid];
773 
774 	if (pthread_cond_init(&listener->accept_cond, NULL)) {
775 		perror("pthread_cond_init of ctx->accept_cond\n");
776 		/* errno set by pthread_cond_init() */
777 		return -1;
778 	}
779 	if (pthread_mutex_init(&listener->accept_lock, NULL)) {
780 		perror("pthread_mutex_init of ctx->accept_lock\n");
781 		/* errno set by pthread_mutex_init() */
782 		return -1;
783 	}
784 
785 	listener->acceptq = CreateStreamQueue(backlog);
786 	if (!listener->acceptq) {
787 		errno = ENOMEM;
788 		return -1;
789 	}
790 
791 	mtcp->smap[sockid].listener = listener;
792 	mtcp->listener = listener;
793 
794 	return 0;
795 }
796 /*----------------------------------------------------------------------------*/
797 int
798 mtcp_accept(mctx_t mctx, int sockid, struct sockaddr *addr, socklen_t *addrlen)
799 {
800 	mtcp_manager_t mtcp;
801 	struct tcp_listener *listener;
802 	socket_map_t socket;
803 	tcp_stream *accepted = NULL;
804 
805 	mtcp = GetMTCPManager(mctx);
806 	if (!mtcp) {
807 		errno = EACCES;
808 		return -1;
809 	}
810 
811 	if (sockid < 0 || sockid >= g_config.mos->max_concurrency) {
812 		TRACE_API("Socket id %d out of range.\n", sockid);
813 		errno = EBADF;
814 		return -1;
815 	}
816 
817 	/* requires listening socket */
818 	if (mtcp->smap[sockid].socktype != MOS_SOCK_STREAM_LISTEN) {
819 		errno = EINVAL;
820 		return -1;
821 	}
822 
823 	listener = mtcp->smap[sockid].listener;
824 
825 	/* dequeue from the acceptq without lock first */
826 	/* if nothing there, acquire lock and cond_wait */
827 	accepted = StreamDequeue(listener->acceptq);
828 	if (!accepted) {
829 		if (listener->socket->opts & MTCP_NONBLOCK) {
830 			errno = EAGAIN;
831 			return -1;
832 
833 		} else {
834 			pthread_mutex_lock(&listener->accept_lock);
835 			while ((accepted = StreamDequeue(listener->acceptq)) == NULL) {
836 				pthread_cond_wait(&listener->accept_cond, &listener->accept_lock);
837 
838 				if (mtcp->ctx->done || mtcp->ctx->exit) {
839 					pthread_mutex_unlock(&listener->accept_lock);
840 					errno = EINTR;
841 					return -1;
842 				}
843 			}
844 			pthread_mutex_unlock(&listener->accept_lock);
845 		}
846 	}
847 
848 	if (!accepted) {
849 		TRACE_ERROR("[NEVER HAPPEN] Empty accept queue!\n");
850 	}
851 
852 	if (!accepted->socket) {
853 		socket = AllocateSocket(mctx, MOS_SOCK_STREAM);
854 		if (!socket) {
855 			TRACE_ERROR("Failed to create new socket!\n");
856 			/* TODO: destroy the stream */
857 			errno = ENFILE;
858 			return -1;
859 		}
860 		socket->stream = accepted;
861 		accepted->socket = socket;
862 
863 		/* set socket addr parameters */
864 		socket->saddr.sin_family = AF_INET;
865 		socket->saddr.sin_port = accepted->dport;
866 		socket->saddr.sin_addr.s_addr = accepted->daddr;
867 
868 		/* if monitor is enabled, complete the socket assignment */
869 		if (socket->stream->pair_stream != NULL)
870 			socket->stream->pair_stream->socket = socket;
871 	}
872 
873 	if (!(listener->socket->epoll & MOS_EPOLLET) &&
874 	    !StreamQueueIsEmpty(listener->acceptq))
875 		AddEpollEvent(mtcp->ep,
876 			      USR_SHADOW_EVENT_QUEUE,
877 			      listener->socket, MOS_EPOLLIN);
878 
879 	TRACE_API("Stream %d accepted.\n", accepted->id);
880 
881 	if (addr && addrlen) {
882 		struct sockaddr_in *addr_in = (struct sockaddr_in *)addr;
883 		addr_in->sin_family = AF_INET;
884 		addr_in->sin_port = accepted->dport;
885 		addr_in->sin_addr.s_addr = accepted->daddr;
886 		*addrlen = sizeof(struct sockaddr_in);
887 	}
888 
889 	return accepted->socket->id;
890 }
891 /*----------------------------------------------------------------------------*/
892 int
893 mtcp_init_rss(mctx_t mctx, in_addr_t saddr_base, int num_addr,
894 		in_addr_t daddr, in_addr_t dport)
895 {
896 	mtcp_manager_t mtcp;
897 	addr_pool_t ap;
898 
899 	mtcp = GetMTCPManager(mctx);
900 	if (!mtcp) {
901 		errno = EACCES;
902 		return -1;
903 	}
904 
905 	if (saddr_base == INADDR_ANY) {
906 		int nif_out;
907 
908 		/* for the INADDR_ANY, find the output interface for the destination
909 		   and set the saddr_base as the ip address of the output interface */
910 		nif_out = GetOutputInterface(daddr);
911 		saddr_base = g_config.mos->netdev_table->ent[nif_out]->ip_addr;
912 	}
913 
914 	ap = CreateAddressPoolPerCore(mctx->cpu, num_cpus,
915 			saddr_base, num_addr, daddr, dport);
916 	if (!ap) {
917 		errno = ENOMEM;
918 		return -1;
919 	}
920 
921 	mtcp->ap = ap;
922 
923 	return 0;
924 }
925 /*----------------------------------------------------------------------------*/
926 int
927 eval_bpf_5tuple(struct sfbpf_program fcode,
928 				in_addr_t saddr, in_port_t sport,
929 				in_addr_t daddr, in_port_t dport) {
930 	uint8_t buf[TOTAL_TCP_HEADER_LEN];
931 	struct ethhdr *ethh;
932 	struct iphdr *iph;
933 	struct tcphdr *tcph;
934 
935 	ethh = (struct ethhdr *)buf;
936 	ethh->h_proto = htons(ETH_P_IP);
937 	iph = (struct iphdr *)(ethh + 1);
938 	iph->ihl = IP_HEADER_LEN >> 2;
939 	iph->version = 4;
940 	iph->tos = 0;
941 	iph->tot_len = htons(IP_HEADER_LEN + TCP_HEADER_LEN);
942 	iph->id = htons(0);
943 	iph->protocol = IPPROTO_TCP;
944 	iph->saddr = saddr;
945 	iph->daddr = daddr;
946 	iph->check = 0;
947 	tcph = (struct tcphdr *)(iph + 1);
948 	tcph->source = sport;
949 	tcph->dest = dport;
950 
951 	return EVAL_BPFFILTER(fcode, (uint8_t *)iph - sizeof(struct ethhdr),
952 						 TOTAL_TCP_HEADER_LEN);
953 }
954 /*----------------------------------------------------------------------------*/
955 int
956 mtcp_connect(mctx_t mctx, int sockid,
957 		const struct sockaddr *addr, socklen_t addrlen)
958 {
959 	mtcp_manager_t mtcp;
960 	socket_map_t socket;
961 	tcp_stream *cur_stream;
962 	struct sockaddr_in *addr_in;
963 	in_addr_t dip;
964 	in_port_t dport;
965 	int is_dyn_bound = FALSE;
966 	int ret;
967 	int cnt_match = 0;
968 	struct mon_listener *walk;
969 	struct sfbpf_program fcode;
970 
971 	cur_stream = NULL;
972 	mtcp = GetMTCPManager(mctx);
973 	if (!mtcp) {
974 		errno = EACCES;
975 		return -1;
976 	}
977 
978 	if (sockid < 0 || sockid >= g_config.mos->max_concurrency) {
979 		TRACE_API("Socket id %d out of range.\n", sockid);
980 		errno = EBADF;
981 		return -1;
982 	}
983 
984 	if (mtcp->smap[sockid].socktype == MOS_SOCK_UNUSED) {
985 		TRACE_API("Invalid socket id: %d\n", sockid);
986 		errno = EBADF;
987 		return -1;
988 	}
989 
990 	if (mtcp->smap[sockid].socktype != MOS_SOCK_STREAM) {
991 		TRACE_API("Not an end socket. id: %d\n", sockid);
992 		errno = ENOTSOCK;
993 		return -1;
994 	}
995 
996 	if (!addr) {
997 		TRACE_API("Socket %d: empty address!\n", sockid);
998 		errno = EFAULT;
999 		return -1;
1000 	}
1001 
1002 	/* we only allow bind() for AF_INET address */
1003 	if (addr->sa_family != AF_INET || addrlen < sizeof(struct sockaddr_in)) {
1004 		TRACE_API("Socket %d: invalid argument!\n", sockid);
1005 		errno = EAFNOSUPPORT;
1006 		return -1;
1007 	}
1008 
1009 	socket = &mtcp->smap[sockid];
1010 	if (socket->stream) {
1011 		TRACE_API("Socket %d: stream already exist!\n", sockid);
1012 		if (socket->stream->state >= TCP_ST_ESTABLISHED) {
1013 			errno = EISCONN;
1014 		} else {
1015 			errno = EALREADY;
1016 		}
1017 		return -1;
1018 	}
1019 
1020 	addr_in = (struct sockaddr_in *)addr;
1021 	dip = addr_in->sin_addr.s_addr;
1022 	dport = addr_in->sin_port;
1023 
1024 	/* address binding */
1025 	if (socket->opts & MTCP_ADDR_BIND &&
1026 	    socket->saddr.sin_port != INPORT_ANY &&
1027 	    socket->saddr.sin_addr.s_addr != INADDR_ANY) {
1028 		int rss_core;
1029 
1030 		rss_core = GetRSSCPUCore(socket->saddr.sin_addr.s_addr, dip,
1031 				socket->saddr.sin_port, dport, num_queues);
1032 
1033 		if (rss_core != mctx->cpu) {
1034 			errno = EINVAL;
1035 			return -1;
1036 		}
1037 	} else {
1038 		if (mtcp->ap) {
1039 			ret = FetchAddress(mtcp->ap,
1040 					mctx->cpu, num_queues, addr_in, &socket->saddr);
1041 		} else {
1042 			ret = FetchAddress(ap[GetOutputInterface(dip)],
1043 					   mctx->cpu, num_queues, addr_in, &socket->saddr);
1044 		}
1045 		if (ret < 0) {
1046 			errno = EAGAIN;
1047 			return -1;
1048 		}
1049 		socket->opts |= MTCP_ADDR_BIND;
1050 		is_dyn_bound = TRUE;
1051 	}
1052 
1053 	cnt_match = 0;
1054 	if (mtcp->num_msp > 0) {
1055 		TAILQ_FOREACH(walk, &mtcp->monitors, link) {
1056 			fcode = walk->stream_syn_fcode;
1057 			if (!(ISSET_BPFFILTER(fcode) &&
1058 				  eval_bpf_5tuple(fcode, socket->saddr.sin_addr.s_addr,
1059 								  socket->saddr.sin_port,
1060 								  dip, dport) == 0)) {
1061 				walk->is_stream_syn_filter_hit = 1; // set the 'filter hit' flag to 1
1062 				cnt_match++;
1063 			}
1064 		}
1065 	}
1066 
1067 	if (mtcp->num_msp > 0 && cnt_match > 0) {
1068 		/* 150820 dhkim: XXX: embedded mode is not verified */
1069 #if 1
1070 		cur_stream = CreateClientTCPStream(mtcp, socket,
1071 						 STREAM_TYPE(MOS_SOCK_STREAM) |
1072 						 STREAM_TYPE(MOS_SOCK_MONITOR_STREAM_ACTIVE),
1073 						 socket->saddr.sin_addr.s_addr,
1074 						 socket->saddr.sin_port, dip, dport, NULL);
1075 #else
1076 		cur_stream = CreateDualTCPStream(mtcp, socket,
1077 						 STREAM_TYPE(MOS_SOCK_STREAM) |
1078 						 STREAM_TYPE(MOS_SOCK_MONITOR_STREAM_ACTIVE),
1079 						 socket->saddr.sin_addr.s_addr,
1080 						 socket->saddr.sin_port, dip, dport, NULL);
1081 #endif
1082 	}
1083 	else
1084 		cur_stream = CreateTCPStream(mtcp, socket, STREAM_TYPE(MOS_SOCK_STREAM),
1085 					     socket->saddr.sin_addr.s_addr,
1086 					     socket->saddr.sin_port, dip, dport, NULL);
1087 	if (!cur_stream) {
1088 		TRACE_ERROR("Socket %d: failed to create tcp_stream!\n", sockid);
1089 		errno = ENOMEM;
1090 		return -1;
1091 	}
1092 
1093 	if (is_dyn_bound)
1094 		cur_stream->is_bound_addr = TRUE;
1095 	cur_stream->sndvar->cwnd = 1;
1096 	cur_stream->sndvar->ssthresh = cur_stream->sndvar->mss * 10;
1097 	cur_stream->side = MOS_SIDE_CLI;
1098 	/* if monitor is enabled, update the pair stream side as well */
1099 	if (cur_stream->pair_stream) {
1100 		cur_stream->pair_stream->side = MOS_SIDE_SVR;
1101 		/*
1102 		 * if buffer management is off, then disable
1103 		 * monitoring tcp ring of server...
1104 		 * if there is even a single monitor asking for
1105 		 * buffer management, enable it (that's why the
1106 		 * need for the loop)
1107 		 */
1108 		cur_stream->pair_stream->buffer_mgmt = BUFMGMT_OFF;
1109 		struct socket_map *walk;
1110 		SOCKQ_FOREACH_START(walk, &cur_stream->msocks) {
1111 			uint8_t bm = walk->monitor_stream->monitor_listener->server_buf_mgmt;
1112 			if (bm > cur_stream->pair_stream->buffer_mgmt) {
1113 				cur_stream->pair_stream->buffer_mgmt = bm;
1114 				break;
1115 			}
1116 		} SOCKQ_FOREACH_END;
1117 	}
1118 
1119 	cur_stream->state = TCP_ST_SYN_SENT;
1120 	cur_stream->cb_events |= MOS_ON_TCP_STATE_CHANGE;
1121 
1122 	TRACE_STATE("Stream %d: TCP_ST_SYN_SENT\n", cur_stream->id);
1123 
1124 	SQ_LOCK(&mtcp->ctx->connect_lock);
1125 	ret = StreamEnqueue(mtcp->connectq, cur_stream);
1126 	SQ_UNLOCK(&mtcp->ctx->connect_lock);
1127 	mtcp->wakeup_flag = TRUE;
1128 	if (ret < 0) {
1129 		TRACE_ERROR("Socket %d: failed to enqueue to conenct queue!\n", sockid);
1130 		SQ_LOCK(&mtcp->ctx->destroyq_lock);
1131 		StreamEnqueue(mtcp->destroyq, cur_stream);
1132 		SQ_UNLOCK(&mtcp->ctx->destroyq_lock);
1133 		errno = EAGAIN;
1134 		return -1;
1135 	}
1136 
1137 	/* if nonblocking socket, return EINPROGRESS */
1138 	if (socket->opts & MTCP_NONBLOCK) {
1139 		errno = EINPROGRESS;
1140 		return -1;
1141 
1142 	} else {
1143 		while (1) {
1144 			if (!cur_stream) {
1145 				TRACE_ERROR("STREAM DESTROYED\n");
1146 				errno = ETIMEDOUT;
1147 				return -1;
1148 			}
1149 			if (cur_stream->state > TCP_ST_ESTABLISHED) {
1150 				TRACE_ERROR("Socket %d: weird state %s\n",
1151 						sockid, TCPStateToString(cur_stream));
1152 				// TODO: how to handle this?
1153 				errno = ENOSYS;
1154 				return -1;
1155 			}
1156 
1157 			if (cur_stream->state == TCP_ST_ESTABLISHED) {
1158 				break;
1159 			}
1160 			usleep(1000);
1161 		}
1162 	}
1163 
1164 	return 0;
1165 }
1166 /*----------------------------------------------------------------------------*/
1167 static inline int
1168 CloseStreamSocket(mctx_t mctx, int sockid)
1169 {
1170 	mtcp_manager_t mtcp;
1171 	tcp_stream *cur_stream;
1172 	int ret;
1173 
1174 	mtcp = GetMTCPManager(mctx);
1175 	if (!mtcp) {
1176 		errno = EACCES;
1177 		return -1;
1178 	}
1179 
1180 	cur_stream = mtcp->smap[sockid].stream;
1181 	if (!cur_stream) {
1182 		TRACE_API("Socket %d: stream does not exist.\n", sockid);
1183 		errno = ENOTCONN;
1184 		return -1;
1185 	}
1186 
1187 	if (cur_stream->closed) {
1188 		TRACE_API("Socket %d (Stream %u): already closed stream\n",
1189 				sockid, cur_stream->id);
1190 		return 0;
1191 	}
1192 	cur_stream->closed = TRUE;
1193 
1194 	TRACE_API("Stream %d: closing the stream.\n", cur_stream->id);
1195 
1196 	/* 141029 dhkim: Check this! */
1197 	cur_stream->socket = NULL;
1198 
1199 	if (cur_stream->state == TCP_ST_CLOSED_RSVD) {
1200 		TRACE_API("Stream %d at TCP_ST_CLOSED_RSVD. destroying the stream.\n",
1201 				cur_stream->id);
1202 		SQ_LOCK(&mtcp->ctx->destroyq_lock);
1203 		StreamEnqueue(mtcp->destroyq, cur_stream);
1204 		mtcp->wakeup_flag = TRUE;
1205 		SQ_UNLOCK(&mtcp->ctx->destroyq_lock);
1206 		return 0;
1207 
1208 	} else if (cur_stream->state == TCP_ST_SYN_SENT) {
1209 #if 1
1210 		SQ_LOCK(&mtcp->ctx->destroyq_lock);
1211 		StreamEnqueue(mtcp->destroyq, cur_stream);
1212 		SQ_UNLOCK(&mtcp->ctx->destroyq_lock);
1213 		mtcp->wakeup_flag = TRUE;
1214 #endif
1215 		return -1;
1216 
1217 	} else if (cur_stream->state != TCP_ST_ESTABLISHED &&
1218 			cur_stream->state != TCP_ST_CLOSE_WAIT) {
1219 		TRACE_API("Stream %d at state %s\n",
1220 				cur_stream->id, TCPStateToString(cur_stream));
1221 		errno = EBADF;
1222 		return -1;
1223 	}
1224 
1225 	SQ_LOCK(&mtcp->ctx->close_lock);
1226 	cur_stream->sndvar->on_closeq = TRUE;
1227 	ret = StreamEnqueue(mtcp->closeq, cur_stream);
1228 	mtcp->wakeup_flag = TRUE;
1229 	SQ_UNLOCK(&mtcp->ctx->close_lock);
1230 
1231 	if (ret < 0) {
1232 		TRACE_ERROR("(NEVER HAPPEN) Failed to enqueue the stream to close.\n");
1233 		errno = EAGAIN;
1234 		return -1;
1235 	}
1236 
1237 	return 0;
1238 }
1239 /*----------------------------------------------------------------------------*/
1240 static inline int
1241 CloseListeningSocket(mctx_t mctx, int sockid)
1242 {
1243 	mtcp_manager_t mtcp;
1244 	struct tcp_listener *listener;
1245 
1246 	mtcp = GetMTCPManager(mctx);
1247 	if (!mtcp) {
1248 		errno = EACCES;
1249 		return -1;
1250 	}
1251 
1252 	listener = mtcp->smap[sockid].listener;
1253 	if (!listener) {
1254 		errno = EINVAL;
1255 		return -1;
1256 	}
1257 
1258 	if (listener->acceptq) {
1259 		DestroyStreamQueue(listener->acceptq);
1260 		listener->acceptq = NULL;
1261 	}
1262 
1263 	pthread_mutex_lock(&listener->accept_lock);
1264 	pthread_cond_signal(&listener->accept_cond);
1265 	pthread_mutex_unlock(&listener->accept_lock);
1266 
1267 	pthread_cond_destroy(&listener->accept_cond);
1268 	pthread_mutex_destroy(&listener->accept_lock);
1269 
1270 	free(listener);
1271 	mtcp->smap[sockid].listener = NULL;
1272 
1273 	return 0;
1274 }
1275 /*----------------------------------------------------------------------------*/
1276 int
1277 mtcp_close(mctx_t mctx, int sockid)
1278 {
1279 	mtcp_manager_t mtcp;
1280 	int ret;
1281 
1282 	mtcp = GetMTCPManager(mctx);
1283 	if (!mtcp) {
1284 		errno = EACCES;
1285 		return -1;
1286 	}
1287 
1288 	if (sockid < 0 || sockid >= g_config.mos->max_concurrency) {
1289 		TRACE_API("Socket id %d out of range.\n", sockid);
1290 		errno = EBADF;
1291 		return -1;
1292 	}
1293 
1294 	if (mtcp->smap[sockid].socktype == MOS_SOCK_UNUSED) {
1295 		TRACE_API("Invalid socket id: %d\n", sockid);
1296 		errno = EBADF;
1297 		return -1;
1298 	}
1299 
1300 	TRACE_API("Socket %d: mtcp_close called.\n", sockid);
1301 
1302 	switch (mtcp->smap[sockid].socktype) {
1303 	case MOS_SOCK_STREAM:
1304 		ret = CloseStreamSocket(mctx, sockid);
1305 		break;
1306 
1307 	case MOS_SOCK_STREAM_LISTEN:
1308 		ret = CloseListeningSocket(mctx, sockid);
1309 		break;
1310 
1311 	case MOS_SOCK_EPOLL:
1312 		ret = CloseEpollSocket(mctx, sockid);
1313 		break;
1314 
1315 	case MOS_SOCK_PIPE:
1316 		ret = PipeClose(mctx, sockid);
1317 		break;
1318 
1319 	default:
1320 		errno = EINVAL;
1321 		ret = -1;
1322 		break;
1323 	}
1324 
1325 	FreeSocket(mctx, sockid, mtcp->smap[sockid].socktype);
1326 
1327 	return ret;
1328 }
1329 /*----------------------------------------------------------------------------*/
1330 int
1331 mtcp_abort(mctx_t mctx, int sockid)
1332 {
1333 	mtcp_manager_t mtcp;
1334 	tcp_stream *cur_stream;
1335 	int ret;
1336 
1337 	mtcp = GetMTCPManager(mctx);
1338 	if (!mtcp) {
1339 		errno = EACCES;
1340 		return -1;
1341 	}
1342 
1343 	if (sockid < 0 || sockid >= g_config.mos->max_concurrency) {
1344 		TRACE_API("Socket id %d out of range.\n", sockid);
1345 		errno = EBADF;
1346 		return -1;
1347 	}
1348 
1349 	if (mtcp->smap[sockid].socktype == MOS_SOCK_UNUSED) {
1350 		TRACE_API("Invalid socket id: %d\n", sockid);
1351 		errno = EBADF;
1352 		return -1;
1353 	}
1354 
1355 	if (mtcp->smap[sockid].socktype != MOS_SOCK_STREAM) {
1356 		TRACE_API("Not an end socket. id: %d\n", sockid);
1357 		errno = ENOTSOCK;
1358 		return -1;
1359 	}
1360 
1361 	cur_stream = mtcp->smap[sockid].stream;
1362 	if (!cur_stream) {
1363 		TRACE_API("Stream %d: does not exist.\n", sockid);
1364 		errno = ENOTCONN;
1365 		return -1;
1366 	}
1367 
1368 	TRACE_API("Socket %d: mtcp_abort()\n", sockid);
1369 
1370 	FreeSocket(mctx, sockid, mtcp->smap[sockid].socktype);
1371 	cur_stream->socket = NULL;
1372 
1373 	if (cur_stream->state == TCP_ST_CLOSED_RSVD) {
1374 		TRACE_API("Stream %d: connection already reset.\n", sockid);
1375 		return ERROR;
1376 
1377 	} else if (cur_stream->state == TCP_ST_SYN_SENT) {
1378 		/* TODO: this should notify event failure to all
1379 		   previous read() or write() calls */
1380 		cur_stream->state = TCP_ST_CLOSED_RSVD;
1381 		cur_stream->close_reason = TCP_ACTIVE_CLOSE;
1382 		cur_stream->cb_events |= MOS_ON_TCP_STATE_CHANGE;
1383 		SQ_LOCK(&mtcp->ctx->destroyq_lock);
1384 		StreamEnqueue(mtcp->destroyq, cur_stream);
1385 		SQ_UNLOCK(&mtcp->ctx->destroyq_lock);
1386 		mtcp->wakeup_flag = TRUE;
1387 		return 0;
1388 
1389 	} else if (cur_stream->state == TCP_ST_CLOSING ||
1390 			cur_stream->state == TCP_ST_LAST_ACK ||
1391 			cur_stream->state == TCP_ST_TIME_WAIT) {
1392 		cur_stream->state = TCP_ST_CLOSED_RSVD;
1393 		cur_stream->close_reason = TCP_ACTIVE_CLOSE;
1394 		cur_stream->cb_events |= MOS_ON_TCP_STATE_CHANGE;
1395 		SQ_LOCK(&mtcp->ctx->destroyq_lock);
1396 		StreamEnqueue(mtcp->destroyq, cur_stream);
1397 		SQ_UNLOCK(&mtcp->ctx->destroyq_lock);
1398 		mtcp->wakeup_flag = TRUE;
1399 		return 0;
1400 	}
1401 
1402 	/* the stream structure will be destroyed after sending RST */
1403 	if (cur_stream->sndvar->on_resetq) {
1404 		TRACE_ERROR("Stream %d: calling mtcp_abort() "
1405 				"when in reset queue.\n", sockid);
1406 		errno = ECONNRESET;
1407 		return -1;
1408 	}
1409 	SQ_LOCK(&mtcp->ctx->reset_lock);
1410 	cur_stream->sndvar->on_resetq = TRUE;
1411 	ret = StreamEnqueue(mtcp->resetq, cur_stream);
1412 	SQ_UNLOCK(&mtcp->ctx->reset_lock);
1413 	mtcp->wakeup_flag = TRUE;
1414 
1415 	if (ret < 0) {
1416 		TRACE_ERROR("(NEVER HAPPEN) Failed to enqueue the stream to close.\n");
1417 		errno = EAGAIN;
1418 		return -1;
1419 	}
1420 
1421 	return 0;
1422 }
1423 /*----------------------------------------------------------------------------*/
1424 static inline int
1425 PeekForUser(mtcp_manager_t mtcp, tcp_stream *cur_stream, char *buf, int len)
1426 {
1427 	struct tcp_recv_vars *rcvvar = cur_stream->rcvvar;
1428 	int copylen;
1429 	tcprb_t *rb = rcvvar->rcvbuf;
1430 
1431 	if ((copylen = tcprb_ppeek(rb, (uint8_t *)buf, len, rb->pile)) <= 0) {
1432 		errno = EAGAIN;
1433 		return -1;
1434 	}
1435 
1436 	return copylen;
1437 }
1438 /*----------------------------------------------------------------------------*/
1439 static inline int
1440 CopyToUser(mtcp_manager_t mtcp, tcp_stream *cur_stream, char *buf, int len)
1441 {
1442 	struct tcp_recv_vars *rcvvar = cur_stream->rcvvar;
1443 	int copylen;
1444 	tcprb_t *rb = rcvvar->rcvbuf;
1445 	if ((copylen = tcprb_ppeek(rb, (uint8_t *)buf, len, rb->pile)) <= 0) {
1446 		errno = EAGAIN;
1447 		return -1;
1448 	}
1449 	tcprb_setpile(rb, rb->pile + copylen);
1450 
1451 	rcvvar->rcv_wnd = rb->len - tcprb_cflen(rb);
1452 	//printf("rcv_wnd: %d\n", rcvvar->rcv_wnd);
1453 
1454 	/* Advertise newly freed receive buffer */
1455 	if (cur_stream->need_wnd_adv) {
1456 		if (rcvvar->rcv_wnd > cur_stream->sndvar->eff_mss) {
1457 			if (!cur_stream->sndvar->on_ackq) {
1458 				SQ_LOCK(&mtcp->ctx->ackq_lock);
1459 				cur_stream->sndvar->on_ackq = TRUE;
1460 				StreamEnqueue(mtcp->ackq, cur_stream); /* this always success */
1461 				SQ_UNLOCK(&mtcp->ctx->ackq_lock);
1462 				cur_stream->need_wnd_adv = FALSE;
1463 				mtcp->wakeup_flag = TRUE;
1464 			}
1465 		}
1466 	}
1467 
1468 	return copylen;
1469 }
1470 /*----------------------------------------------------------------------------*/
1471 ssize_t
1472 mtcp_recv(mctx_t mctx, int sockid, char *buf, size_t len, int flags)
1473 {
1474 	mtcp_manager_t mtcp;
1475 	socket_map_t socket;
1476 	tcp_stream *cur_stream;
1477 	struct tcp_recv_vars *rcvvar;
1478 	int event_remaining, merged_len;
1479 	int ret;
1480 
1481 	mtcp = GetMTCPManager(mctx);
1482 	if (!mtcp) {
1483 		errno = EACCES;
1484 		return -1;
1485 	}
1486 
1487 	if (sockid < 0 || sockid >= g_config.mos->max_concurrency) {
1488 		TRACE_API("Socket id %d out of range.\n", sockid);
1489 		errno = EBADF;
1490 		return -1;
1491 	}
1492 
1493 	socket = &mtcp->smap[sockid];
1494 	if (socket->socktype == MOS_SOCK_UNUSED) {
1495 		TRACE_API("Invalid socket id: %d\n", sockid);
1496 		errno = EBADF;
1497 		return -1;
1498 	}
1499 
1500 	if (socket->socktype == MOS_SOCK_PIPE) {
1501 		return PipeRead(mctx, sockid, buf, len);
1502 	}
1503 
1504 	if (socket->socktype != MOS_SOCK_STREAM) {
1505 		TRACE_API("Not an end socket. id: %d\n", sockid);
1506 		errno = ENOTSOCK;
1507 		return -1;
1508 	}
1509 
1510 	/* stream should be in ESTABLISHED, FIN_WAIT_1, FIN_WAIT_2, CLOSE_WAIT */
1511 	cur_stream = socket->stream;
1512 	if (!cur_stream || !cur_stream->rcvvar || !cur_stream->rcvvar->rcvbuf ||
1513 	    !(cur_stream->state >= TCP_ST_ESTABLISHED &&
1514 	      cur_stream->state <= TCP_ST_CLOSE_WAIT)) {
1515 		errno = ENOTCONN;
1516 		return -1;
1517 	}
1518 
1519 	rcvvar = cur_stream->rcvvar;
1520 
1521 	merged_len = tcprb_cflen(rcvvar->rcvbuf);
1522 
1523 	/* if CLOSE_WAIT, return 0 if there is no payload */
1524 	if (cur_stream->state == TCP_ST_CLOSE_WAIT) {
1525 		if (!rcvvar->rcvbuf)
1526 			return 0;
1527 
1528 		if (merged_len == 0)
1529 			return 0;
1530 	}
1531 
1532 	/* return EAGAIN if no receive buffer */
1533 	if (socket->opts & MTCP_NONBLOCK) {
1534 		if (!rcvvar->rcvbuf || merged_len == 0) {
1535 			errno = EAGAIN;
1536 			return -1;
1537 		}
1538 	}
1539 
1540 	SBUF_LOCK(&rcvvar->read_lock);
1541 
1542 	switch (flags) {
1543 	case 0:
1544 		ret = CopyToUser(mtcp, cur_stream, buf, len);
1545 		break;
1546 	case MSG_PEEK:
1547 		ret = PeekForUser(mtcp, cur_stream, buf, len);
1548 		break;
1549 	default:
1550 		SBUF_UNLOCK(&rcvvar->read_lock);
1551 		ret = -1;
1552 		errno = EINVAL;
1553 		return ret;
1554 	}
1555 
1556 	merged_len = tcprb_cflen(rcvvar->rcvbuf);
1557 	event_remaining = FALSE;
1558 	/* if there are remaining payload, generate EPOLLIN */
1559 	/* (may due to insufficient user buffer) */
1560 	if (socket->epoll & MOS_EPOLLIN) {
1561 		if (!(socket->epoll & MOS_EPOLLET) && merged_len > 0) {
1562 			event_remaining = TRUE;
1563 		}
1564 	}
1565 	/* if waiting for close, notify it if no remaining data */
1566 	if (cur_stream->state == TCP_ST_CLOSE_WAIT &&
1567 	    merged_len == 0 && ret > 0) {
1568 		event_remaining = TRUE;
1569 	}
1570 
1571 	SBUF_UNLOCK(&rcvvar->read_lock);
1572 
1573 	if (event_remaining) {
1574 		if (socket->epoll) {
1575 			AddEpollEvent(mtcp->ep,
1576 				      USR_SHADOW_EVENT_QUEUE, socket, MOS_EPOLLIN);
1577 		}
1578 	}
1579 
1580 	TRACE_API("Stream %d: mtcp_recv() returning %d\n", cur_stream->id, ret);
1581 	return ret;
1582 }
1583 /*----------------------------------------------------------------------------*/
1584 inline ssize_t
1585 mtcp_read(mctx_t mctx, int sockid, char *buf, size_t len)
1586 {
1587 	return mtcp_recv(mctx, sockid, buf, len, 0);
1588 }
1589 /*----------------------------------------------------------------------------*/
1590 ssize_t
1591 mtcp_readv(mctx_t mctx, int sockid, const struct iovec *iov, int numIOV)
1592 {
1593 	mtcp_manager_t mtcp;
1594 	socket_map_t socket;
1595 	tcp_stream *cur_stream;
1596 	struct tcp_recv_vars *rcvvar;
1597 	int ret, bytes_read, i;
1598 	int event_remaining, merged_len;
1599 
1600 	mtcp = GetMTCPManager(mctx);
1601 	if (!mtcp) {
1602 		errno = EACCES;
1603 		return -1;
1604 	}
1605 
1606 	if (sockid < 0 || sockid >= g_config.mos->max_concurrency) {
1607 		TRACE_API("Socket id %d out of range.\n", sockid);
1608 		errno = EBADF;
1609 		return -1;
1610 	}
1611 
1612 	socket = &mtcp->smap[sockid];
1613 	if (socket->socktype == MOS_SOCK_UNUSED) {
1614 		TRACE_API("Invalid socket id: %d\n", sockid);
1615 		errno = EBADF;
1616 		return -1;
1617 	}
1618 
1619 	if (socket->socktype != MOS_SOCK_STREAM) {
1620 		TRACE_API("Not an end socket. id: %d\n", sockid);
1621 		errno = ENOTSOCK;
1622 		return -1;
1623 	}
1624 
1625 	/* stream should be in ESTABLISHED, FIN_WAIT_1, FIN_WAIT_2, CLOSE_WAIT */
1626 	cur_stream = socket->stream;
1627 	if (!cur_stream ||
1628 			!(cur_stream->state >= TCP_ST_ESTABLISHED &&
1629 			  cur_stream->state <= TCP_ST_CLOSE_WAIT)) {
1630 		errno = ENOTCONN;
1631 		return -1;
1632 	}
1633 
1634 	rcvvar = cur_stream->rcvvar;
1635 
1636 	merged_len = tcprb_cflen(rcvvar->rcvbuf);
1637 
1638 	/* if CLOSE_WAIT, return 0 if there is no payload */
1639 	if (cur_stream->state == TCP_ST_CLOSE_WAIT) {
1640 		if (!rcvvar->rcvbuf)
1641 			return 0;
1642 
1643 		if (merged_len == 0)
1644 			return 0;
1645 	}
1646 
1647 	/* return EAGAIN if no receive buffer */
1648 	if (socket->opts & MTCP_NONBLOCK) {
1649 		if (!rcvvar->rcvbuf || merged_len == 0) {
1650 			errno = EAGAIN;
1651 			return -1;
1652 		}
1653 	}
1654 
1655 	SBUF_LOCK(&rcvvar->read_lock);
1656 
1657 	/* read and store the contents to the vectored buffers */
1658 	bytes_read = 0;
1659 	for (i = 0; i < numIOV; i++) {
1660 		if (iov[i].iov_len <= 0)
1661 			continue;
1662 
1663 		ret = CopyToUser(mtcp, cur_stream, iov[i].iov_base, iov[i].iov_len);
1664 		if (ret <= 0)
1665 			break;
1666 
1667 		bytes_read += ret;
1668 
1669 		if (ret < iov[i].iov_len)
1670 			break;
1671 	}
1672 
1673 	merged_len = tcprb_cflen(rcvvar->rcvbuf);
1674 
1675 	event_remaining = FALSE;
1676 	/* if there are remaining payload, generate read event */
1677 	/* (may due to insufficient user buffer) */
1678 	if (socket->epoll & MOS_EPOLLIN) {
1679 		if (!(socket->epoll & MOS_EPOLLET) && merged_len > 0) {
1680 			event_remaining = TRUE;
1681 		}
1682 	}
1683 	/* if waiting for close, notify it if no remaining data */
1684 	if (cur_stream->state == TCP_ST_CLOSE_WAIT &&
1685 			merged_len == 0 && bytes_read > 0) {
1686 		event_remaining = TRUE;
1687 	}
1688 
1689 	SBUF_UNLOCK(&rcvvar->read_lock);
1690 
1691 	if(event_remaining) {
1692 		if (socket->epoll & MOS_EPOLLIN && !(socket->epoll & MOS_EPOLLET)) {
1693 			AddEpollEvent(mtcp->ep,
1694 				      USR_SHADOW_EVENT_QUEUE, socket, MOS_EPOLLIN);
1695 		}
1696 	}
1697 
1698 	TRACE_API("Stream %d: mtcp_readv() returning %d\n",
1699 			cur_stream->id, bytes_read);
1700 	return bytes_read;
1701 }
1702 /*----------------------------------------------------------------------------*/
1703 static inline int
1704 CopyFromUser(mtcp_manager_t mtcp, tcp_stream *cur_stream, const char *buf, int len)
1705 {
1706 	struct tcp_send_vars *sndvar = cur_stream->sndvar;
1707 	int sndlen;
1708 	int ret;
1709 
1710 	sndlen = MIN((int)sndvar->snd_wnd, len);
1711 	if (sndlen <= 0) {
1712 		errno = EAGAIN;
1713 		return -1;
1714 	}
1715 
1716 	/* allocate send buffer if not exist */
1717 	if (!sndvar->sndbuf) {
1718 		sndvar->sndbuf = SBInit(mtcp->rbm_snd, sndvar->iss + 1);
1719 		if (!sndvar->sndbuf) {
1720 			cur_stream->close_reason = TCP_NO_MEM;
1721 			/* notification may not required due to -1 return */
1722 			errno = ENOMEM;
1723 			return -1;
1724 		}
1725 	}
1726 
1727 	ret = SBPut(mtcp->rbm_snd, sndvar->sndbuf, buf, sndlen);
1728 	assert(ret == sndlen);
1729 	sndvar->snd_wnd = sndvar->sndbuf->size - sndvar->sndbuf->len;
1730 	if (ret <= 0) {
1731 		TRACE_ERROR("SBPut failed. reason: %d (sndlen: %u, len: %u\n",
1732 				ret, sndlen, sndvar->sndbuf->len);
1733 		errno = EAGAIN;
1734 		return -1;
1735 	}
1736 
1737 	if (sndvar->snd_wnd <= 0) {
1738 		TRACE_SNDBUF("%u Sending buffer became full!! snd_wnd: %u\n",
1739 				cur_stream->id, sndvar->snd_wnd);
1740 	}
1741 
1742 	return ret;
1743 }
1744 /*----------------------------------------------------------------------------*/
1745 ssize_t
1746 mtcp_write(mctx_t mctx, int sockid, const char *buf, size_t len)
1747 {
1748 	mtcp_manager_t mtcp;
1749 	socket_map_t socket;
1750 	tcp_stream *cur_stream;
1751 	struct tcp_send_vars *sndvar;
1752 	int ret;
1753 
1754 	mtcp = GetMTCPManager(mctx);
1755 	if (!mtcp) {
1756 		errno = EACCES;
1757 		return -1;
1758 	}
1759 
1760 	if (sockid < 0 || sockid >= g_config.mos->max_concurrency) {
1761 		TRACE_API("Socket id %d out of range.\n", sockid);
1762 		errno = EBADF;
1763 		return -1;
1764 	}
1765 
1766 	socket = &mtcp->smap[sockid];
1767 	if (socket->socktype == MOS_SOCK_UNUSED) {
1768 		TRACE_API("Invalid socket id: %d\n", sockid);
1769 		errno = EBADF;
1770 		return -1;
1771 	}
1772 
1773 	if (socket->socktype == MOS_SOCK_PIPE) {
1774 		return PipeWrite(mctx, sockid, buf, len);
1775 	}
1776 
1777 	if (socket->socktype != MOS_SOCK_STREAM) {
1778 		TRACE_API("Not an end socket. id: %d\n", sockid);
1779 		errno = ENOTSOCK;
1780 		return -1;
1781 	}
1782 
1783 	cur_stream = socket->stream;
1784 	if (!cur_stream ||
1785 			!(cur_stream->state == TCP_ST_ESTABLISHED ||
1786 			  cur_stream->state == TCP_ST_CLOSE_WAIT)) {
1787 		errno = ENOTCONN;
1788 		return -1;
1789 	}
1790 
1791 	if (len <= 0) {
1792 		if (socket->opts & MTCP_NONBLOCK) {
1793 			errno = EAGAIN;
1794 			return -1;
1795 		} else {
1796 			return 0;
1797 		}
1798 	}
1799 
1800 	sndvar = cur_stream->sndvar;
1801 
1802 	SBUF_LOCK(&sndvar->write_lock);
1803 	ret = CopyFromUser(mtcp, cur_stream, buf, len);
1804 
1805 	SBUF_UNLOCK(&sndvar->write_lock);
1806 
1807 	if (ret > 0 && !(sndvar->on_sendq || sndvar->on_send_list)) {
1808 		SQ_LOCK(&mtcp->ctx->sendq_lock);
1809 		sndvar->on_sendq = TRUE;
1810 		StreamEnqueue(mtcp->sendq, cur_stream);		/* this always success */
1811 		SQ_UNLOCK(&mtcp->ctx->sendq_lock);
1812 		mtcp->wakeup_flag = TRUE;
1813 	}
1814 
1815 	if (ret == 0 && (socket->opts & MTCP_NONBLOCK)) {
1816 		ret = -1;
1817 		errno = EAGAIN;
1818 	}
1819 
1820 	/* if there are remaining sending buffer, generate write event */
1821 	if (sndvar->snd_wnd > 0) {
1822 		if (socket->epoll & MOS_EPOLLOUT && !(socket->epoll & MOS_EPOLLET)) {
1823 			AddEpollEvent(mtcp->ep,
1824 				      USR_SHADOW_EVENT_QUEUE, socket, MOS_EPOLLOUT);
1825 		}
1826 	}
1827 
1828 	TRACE_API("Stream %d: mtcp_write() returning %d\n", cur_stream->id, ret);
1829 	return ret;
1830 }
1831 /*----------------------------------------------------------------------------*/
1832 ssize_t
1833 mtcp_writev(mctx_t mctx, int sockid, const struct iovec *iov, int numIOV)
1834 {
1835 	mtcp_manager_t mtcp;
1836 	socket_map_t socket;
1837 	tcp_stream *cur_stream;
1838 	struct tcp_send_vars *sndvar;
1839 	int ret, to_write, i;
1840 
1841 	mtcp = GetMTCPManager(mctx);
1842 	if (!mtcp) {
1843 		errno = EACCES;
1844 		return -1;
1845 	}
1846 
1847 	if (sockid < 0 || sockid >= g_config.mos->max_concurrency) {
1848 		TRACE_API("Socket id %d out of range.\n", sockid);
1849 		errno = EBADF;
1850 		return -1;
1851 	}
1852 
1853 	socket = &mtcp->smap[sockid];
1854 	if (socket->socktype == MOS_SOCK_UNUSED) {
1855 		TRACE_API("Invalid socket id: %d\n", sockid);
1856 		errno = EBADF;
1857 		return -1;
1858 	}
1859 
1860 	if (socket->socktype != MOS_SOCK_STREAM) {
1861 		TRACE_API("Not an end socket. id: %d\n", sockid);
1862 		errno = ENOTSOCK;
1863 		return -1;
1864 	}
1865 
1866 	cur_stream = socket->stream;
1867 	if (!cur_stream ||
1868 			!(cur_stream->state == TCP_ST_ESTABLISHED ||
1869 			  cur_stream->state == TCP_ST_CLOSE_WAIT)) {
1870 		errno = ENOTCONN;
1871 		return -1;
1872 	}
1873 
1874 	sndvar = cur_stream->sndvar;
1875 	SBUF_LOCK(&sndvar->write_lock);
1876 
1877 	/* write from the vectored buffers */
1878 	to_write = 0;
1879 	for (i = 0; i < numIOV; i++) {
1880 		if (iov[i].iov_len <= 0)
1881 			continue;
1882 
1883 		ret = CopyFromUser(mtcp, cur_stream, iov[i].iov_base, iov[i].iov_len);
1884 		if (ret <= 0)
1885 			break;
1886 
1887 		to_write += ret;
1888 
1889 		if (ret < iov[i].iov_len)
1890 			break;
1891 	}
1892 	SBUF_UNLOCK(&sndvar->write_lock);
1893 
1894 	if (to_write > 0 && !(sndvar->on_sendq || sndvar->on_send_list)) {
1895 		SQ_LOCK(&mtcp->ctx->sendq_lock);
1896 		sndvar->on_sendq = TRUE;
1897 		StreamEnqueue(mtcp->sendq, cur_stream);		/* this always success */
1898 		SQ_UNLOCK(&mtcp->ctx->sendq_lock);
1899 		mtcp->wakeup_flag = TRUE;
1900 	}
1901 
1902 	if (to_write == 0 && (socket->opts & MTCP_NONBLOCK)) {
1903 		to_write = -1;
1904 		errno = EAGAIN;
1905 	}
1906 
1907 	/* if there are remaining sending buffer, generate write event */
1908 	if (sndvar->snd_wnd > 0) {
1909 		if (socket->epoll & MOS_EPOLLOUT && !(socket->epoll & MOS_EPOLLET)) {
1910 			AddEpollEvent(mtcp->ep,
1911 				      USR_SHADOW_EVENT_QUEUE, socket, MOS_EPOLLOUT);
1912 		}
1913 	}
1914 
1915 	TRACE_API("Stream %d: mtcp_writev() returning %d\n",
1916 			cur_stream->id, to_write);
1917 	return to_write;
1918 }
1919 /*----------------------------------------------------------------------------*/
1920 uint32_t
1921 mtcp_get_connection_cnt(mctx_t mctx)
1922 {
1923 	mtcp_manager_t mtcp;
1924 	mtcp = GetMTCPManager(mctx);
1925 	if (!mtcp) {
1926 		errno = EACCES;
1927 		return -1;
1928 	}
1929 
1930 	if (mtcp->num_msp > 0)
1931 		return mtcp->flow_cnt / 2;
1932 	else
1933 		return mtcp->flow_cnt;
1934 }
1935 /*----------------------------------------------------------------------------*/
1936