xref: /mOS-networking-stack/core/src/api.c (revision 76404edc)
1 #include <sys/queue.h>
2 #include <sys/ioctl.h>
3 #include <limits.h>
4 #include <unistd.h>
5 #include <assert.h>
6 #include <string.h>
7 
8 #ifdef DARWIN
9 #include <netinet/tcp.h>
10 #include <netinet/ip.h>
11 #include <netinet/if_ether.h>
12 #endif
13 
14 #include "mtcp.h"
15 #include "mtcp_api.h"
16 #include "tcp_in.h"
17 #include "tcp_stream.h"
18 #include "tcp_out.h"
19 #include "ip_out.h"
20 #include "eventpoll.h"
21 #include "pipe.h"
22 #include "fhash.h"
23 #include "addr_pool.h"
24 #include "rss.h"
25 #include "config.h"
26 #include "debug.h"
27 #include "eventpoll.h"
28 #include "mos_api.h"
29 #include "tcp_rb.h"
30 
/* Larger / smaller of two values. Arguments may be evaluated twice. */
#define MAX(a, b) (((a) > (b)) ? (a) : (b))
#define MIN(a, b) (((a) < (b)) ? (a) : (b))
33 
34 /*----------------------------------------------------------------------------*/
/** Stop monitoring the socket (forward declaration).
 * @param [in] mctx: mtcp context
 * @param [in] sock: monitoring stream socket id
 * @param [in] side: side of monitoring (client side, server side or both)
 *
 * This function is now DEPRECATED and is only used within mOS core.
 * It is declared here because mtcp_setsockopt() dispatches the
 * SOL_MONSOCKET / MOS_STOP_MON option to it and passes its return value
 * straight back to the caller.
 * NOTE(review): the return convention is defined by the implementation,
 * which is not in view here.
 */
int
mtcp_cb_stop(mctx_t mctx, int sock, int side);
44 /*----------------------------------------------------------------------------*/
45 /** Reset the connection (send RST packets to both sides)
46  *  (We need to decide the API for this.)
47  */
48 //int
49 //mtcp_cb_reset(mctx_t mctx, int sock, int side);
50 /*----------------------------------------------------------------------------*/
51 inline mtcp_manager_t
52 GetMTCPManager(mctx_t mctx)
53 {
54 	if (!mctx) {
55 		errno = EACCES;
56 		return NULL;
57 	}
58 
59 	if (mctx->cpu < 0 || mctx->cpu >= num_cpus) {
60 		errno = EINVAL;
61 		return NULL;
62 	}
63 
64 	if (g_mtcp[mctx->cpu]->ctx->done || g_mtcp[mctx->cpu]->ctx->exit) {
65 		errno = EPERM;
66 		return NULL;
67 	}
68 
69 	return g_mtcp[mctx->cpu];
70 }
71 /*----------------------------------------------------------------------------*/
72 inline int
73 GetSocketError(socket_map_t socket, void *optval, socklen_t *optlen)
74 {
75 	tcp_stream *cur_stream;
76 
77 	if (!socket->stream) {
78 		errno = EBADF;
79 		return -1;
80 	}
81 
82 	cur_stream = socket->stream;
83 	if (cur_stream->state == TCP_ST_CLOSED_RSVD) {
84 		if (cur_stream->close_reason == TCP_TIMEDOUT ||
85 				cur_stream->close_reason == TCP_CONN_FAIL ||
86 				cur_stream->close_reason == TCP_CONN_LOST) {
87 			*(int *)optval = ETIMEDOUT;
88 			*optlen = sizeof(int);
89 
90 			return 0;
91 		}
92 	}
93 
94 	if (cur_stream->state == TCP_ST_CLOSE_WAIT ||
95 			cur_stream->state == TCP_ST_CLOSED_RSVD) {
96 		if (cur_stream->close_reason == TCP_RESET) {
97 			*(int *)optval = ECONNRESET;
98 			*optlen = sizeof(int);
99 
100 			return 0;
101 		}
102 	}
103 
104 	errno = ENOSYS;
105 	return -1;
106 }
107 /*----------------------------------------------------------------------------*/
108 int
109 mtcp_getsockopt(mctx_t mctx, int sockid, int level,
110 		int optname, void *optval, socklen_t *optlen)
111 {
112 	mtcp_manager_t mtcp;
113 	socket_map_t socket;
114 
115 	mtcp = GetMTCPManager(mctx);
116 	if (!mtcp) {
117 		errno = EACCES;
118 		return -1;
119 	}
120 
121 	if (sockid < 0 || sockid >= g_config.mos->max_concurrency) {
122 		TRACE_API("Socket id %d out of range.\n", sockid);
123 		errno = EBADF;
124 		return -1;
125 	}
126 
127 	switch (level) {
128 	case SOL_SOCKET:
129 		socket = &mtcp->smap[sockid];
130 		if (socket->socktype == MOS_SOCK_UNUSED) {
131 			TRACE_API("Invalid socket id: %d\n", sockid);
132 			errno = EBADF;
133 			return -1;
134 		}
135 
136 		if (socket->socktype != MOS_SOCK_STREAM_LISTEN &&
137 		    socket->socktype != MOS_SOCK_STREAM) {
138 			TRACE_API("Invalid socket id: %d\n", sockid);
139 			errno = ENOTSOCK;
140 			return -1;
141 		}
142 
143 		if (optname == SO_ERROR) {
144 			if (socket->socktype == MOS_SOCK_STREAM) {
145 				return GetSocketError(socket, optval, optlen);
146 			}
147 		}
148 		break;
149 	case SOL_MONSOCKET:
150 		/* check if the calling thread is in MOS context */
151 		if (mtcp->ctx->thread != pthread_self()) {
152 			errno = EPERM;
153 			return -1;
154 		}
155 		/*
156 		 * All options will only work for active
157 		 * monitor stream sockets
158 		 */
159 		socket = &mtcp->msmap[sockid];
160 		if (socket->socktype != MOS_SOCK_MONITOR_STREAM_ACTIVE) {
161 			TRACE_API("Invalid socket id: %d\n", sockid);
162 			errno = ENOTSOCK;
163 			return -1;
164 		}
165 
166 		switch (optname) {
167 		case MOS_FRAGINFO_CLIBUF:
168 			return GetFragInfo(socket, MOS_SIDE_CLI, optval, optlen);
169 		case MOS_FRAGINFO_SVRBUF:
170 			return GetFragInfo(socket, MOS_SIDE_SVR, optval, optlen);
171 		case MOS_INFO_CLIBUF:
172 			return GetBufInfo(socket, MOS_SIDE_CLI, optval, optlen);
173 		case MOS_INFO_SVRBUF:
174 			return GetBufInfo(socket, MOS_SIDE_SVR, optval, optlen);
175 		case MOS_TCP_STATE_CLI:
176 			return GetTCPState(socket->monitor_stream->stream, MOS_SIDE_CLI,
177 							   optval, optlen);
178 		case MOS_TCP_STATE_SVR:
179 			return GetTCPState(socket->monitor_stream->stream, MOS_SIDE_SVR,
180 							   optval, optlen);
181 		case MOS_TIMESTAMP:
182 			return GetLastTimestamp(socket->monitor_stream->stream,
183 						(uint32_t *)optval,
184 						optlen);
185 		default:
186 		  TRACE_API("can't recognize the optname=%d\n", optname);
187 		  assert(0);
188 		}
189 		break;
190 	}
191 	errno = ENOSYS;
192 	return -1;
193 }
194 /*----------------------------------------------------------------------------*/
195 int
196 mtcp_setsockopt(mctx_t mctx, int sockid, int level,
197 		int optname, const void *optval, socklen_t optlen)
198 {
199 	mtcp_manager_t mtcp;
200 	socket_map_t socket;
201 	tcprb_t *rb;
202 
203 	mtcp = GetMTCPManager(mctx);
204 	if (!mtcp) {
205 		errno = EACCES;
206 		return -1;
207 	}
208 
209 	if (sockid < 0 || sockid >= g_config.mos->max_concurrency) {
210 		TRACE_API("Socket id %d out of range.\n", sockid);
211 		errno = EBADF;
212 		return -1;
213 	}
214 
215 	switch (level) {
216 	case SOL_SOCKET:
217 		socket = &mtcp->smap[sockid];
218 		if (socket->socktype == MOS_SOCK_UNUSED) {
219 			TRACE_API("Invalid socket id: %d\n", sockid);
220 			errno = EBADF;
221 			return -1;
222 		}
223 
224 		if (socket->socktype != MOS_SOCK_STREAM_LISTEN &&
225 		    socket->socktype != MOS_SOCK_STREAM) {
226 			TRACE_API("Invalid socket id: %d\n", sockid);
227 			errno = ENOTSOCK;
228 			return -1;
229 		}
230 		break;
231 	case SOL_MONSOCKET:
232 		socket = &mtcp->msmap[sockid];
233 		/*
234 		 * checking of calling thread to be in MOS context is
235 		 * disabled since both optnames can be called from
236 		 * `application' context (on passive sockets)
237 		 */
238 		/*
239 		 * if (mtcp->ctx->thread != pthread_self())
240 		 * return -1;
241 		 */
242 
243 		switch (optname) {
244 		case MOS_CLIBUF:
245 #if 0
246 			if (socket->socktype != MOS_SOCK_MONITOR_STREAM_ACTIVE) {
247 				errno = EBADF;
248 				return -1;
249 			}
250 #endif
251 #ifdef NEWRB
252 #ifdef DISABLE_DYN_RESIZE
253 			if (*(int *)optval != 0)
254 				return -1;
255 			if (socket->socktype == MOS_SOCK_MONITOR_STREAM_ACTIVE) {
256 				rb = (socket->monitor_stream->stream->side == MOS_SIDE_CLI) ?
257 					socket->monitor_stream->stream->rcvvar->rcvbuf :
258 					socket->monitor_stream->stream->pair_stream->rcvvar->rcvbuf;
259 				if (rb) {
260 					tcprb_resize_meta(rb, 0);
261 					tcprb_resize(rb, 0);
262 				}
263 			}
264 			return DisableBuf(socket, MOS_SIDE_CLI);
265 #else
266 			rb = (socket->monitor_stream->stream->side == MOS_SIDE_CLI) ?
267 				socket->monitor_stream->stream->rcvvar->rcvbuf :
268 				socket->monitor_stream->stream->pair_stream->rcvvar->rcvbuf;
269 			if (tcprb_resize_meta(rb, *(int *)optval) < 0)
270 				return -1;
271 			return tcprb_resize(rb,
272 					(((int)rb->metalen - 1) / UNITBUFSIZE + 1) * UNITBUFSIZE);
273 #endif
274 #else
275 			errno = EBADF;
276 			return -1;
277 #endif
278 		case MOS_SVRBUF:
279 #if 0
280 			if (socket->socktype != MOS_SOCK_MONITOR_STREAM_ACTIVE) {
281 				errno = EBADF;
282 				return -1;
283 			}
284 #endif
285 #ifdef NEWRB
286 #ifdef DISABLE_DYN_RESIZE
287 			if (*(int *)optval != 0)
288 				return -1;
289 			if (socket->socktype == MOS_SOCK_MONITOR_STREAM_ACTIVE) {
290 				rb = (socket->monitor_stream->stream->side == MOS_SIDE_SVR) ?
291 					socket->monitor_stream->stream->rcvvar->rcvbuf :
292 					socket->monitor_stream->stream->pair_stream->rcvvar->rcvbuf;
293 				if (rb) {
294 					tcprb_resize_meta(rb, 0);
295 					tcprb_resize(rb, 0);
296 				}
297 			}
298 			return DisableBuf(socket, MOS_SIDE_SVR);
299 #else
300 			rb = (socket->monitor_stream->stream->side == MOS_SIDE_SVR) ?
301 				socket->monitor_stream->stream->rcvvar->rcvbuf :
302 				socket->monitor_stream->stream->pair_stream->rcvvar->rcvbuf;
303 			if (tcprb_resize_meta(rb, *(int *)optval) < 0)
304 				return -1;
305 			return tcprb_resize(rb,
306 					(((int)rb->metalen - 1) / UNITBUFSIZE + 1) * UNITBUFSIZE);
307 #endif
308 #else
309 			errno = EBADF;
310 			return -1;
311 #endif
312 		case MOS_FRAG_CLIBUF:
313 #if 0
314 			if (socket->socktype != MOS_SOCK_MONITOR_STREAM_ACTIVE) {
315 				errno = EBADF;
316 				return -1;
317 			}
318 #endif
319 #ifdef NEWRB
320 #ifdef DISABLE_DYN_RESIZE
321 			if (*(int *)optval != 0)
322 				return -1;
323 			if (socket->socktype == MOS_SOCK_MONITOR_STREAM_ACTIVE) {
324 				rb = (socket->monitor_stream->stream->side == MOS_SIDE_CLI) ?
325 					socket->monitor_stream->stream->rcvvar->rcvbuf :
326 					socket->monitor_stream->stream->pair_stream->rcvvar->rcvbuf;
327 				if (rb)
328 					tcprb_resize(rb, 0);
329 			}
330 			return 0;
331 #else
332 			rb = (socket->monitor_stream->stream->side == MOS_SIDE_CLI) ?
333 				socket->monitor_stream->stream->rcvvar->rcvbuf :
334 				socket->monitor_stream->stream->pair_stream->rcvvar->rcvbuf;
335 			if (rb->len == 0)
336 				return tcprb_resize_meta(rb, *(int *)optval);
337 			else
338 				return -1;
339 #endif
340 #else
341 			errno = EBADF;
342 			return -1;
343 #endif
344 		case MOS_FRAG_SVRBUF:
345 #if 0
346 			if (socket->socktype != MOS_SOCK_MONITOR_STREAM_ACTIVE) {
347 				errno = EBADF;
348 				return -1;
349 			}
350 #endif
351 #ifdef NEWRB
352 #ifdef DISABLE_DYN_RESIZE
353 			if (*(int *)optval != 0)
354 				return -1;
355 			if (socket->socktype == MOS_SOCK_MONITOR_STREAM_ACTIVE) {
356 				rb = (socket->monitor_stream->stream->side == MOS_SIDE_SVR) ?
357 					socket->monitor_stream->stream->rcvvar->rcvbuf :
358 					socket->monitor_stream->stream->pair_stream->rcvvar->rcvbuf;
359 				if (rb)
360 					tcprb_resize(rb, 0);
361 			}
362 			return 0;
363 #else
364 			rb = (socket->monitor_stream->stream->side == MOS_SIDE_SVR) ?
365 				socket->monitor_stream->stream->rcvvar->rcvbuf :
366 				socket->monitor_stream->stream->pair_stream->rcvvar->rcvbuf;
367 			if (rb->len == 0)
368 				return tcprb_resize_meta(rb, *(int *)optval);
369 			else
370 				return -1;
371 #endif
372 #else
373 			errno = EBADF;
374 			return -1;
375 #endif
376 		case MOS_MONLEVEL:
377 #ifdef OLD_API
378 			assert(*(int *)optval == MOS_NO_CLIBUF ||
379 			       *(int *)optval == MOS_NO_SVRBUF);
380 			return DisableBuf(socket,
381 					  (*(int *)optval == MOS_NO_CLIBUF) ?
382 					  MOS_SIDE_CLI : MOS_SIDE_SVR);
383 #endif
384 		case MOS_SEQ_REMAP:
385 			return TcpSeqChange(socket,
386 					    (uint32_t)((seq_remap_info *)optval)->seq_off,
387 					    ((seq_remap_info *)optval)->side,
388 					    mtcp->pctx->p.seq);
389 		case MOS_STOP_MON:
390 			return mtcp_cb_stop(mctx, sockid, *(int *)optval);
391 		default:
392 			TRACE_API("invalid optname=%d\n", optname);
393 			assert(0);
394 		}
395 		break;
396 	}
397 
398 	errno = ENOSYS;
399 	return -1;
400 }
401 /*----------------------------------------------------------------------------*/
402 int
403 mtcp_setsock_nonblock(mctx_t mctx, int sockid)
404 {
405 	mtcp_manager_t mtcp;
406 
407 	mtcp = GetMTCPManager(mctx);
408 	if (!mtcp) {
409 		errno = EACCES;
410 		return -1;
411 	}
412 
413 	if (sockid < 0 || sockid >= g_config.mos->max_concurrency) {
414 		TRACE_API("Socket id %d out of range.\n", sockid);
415 		errno = EBADF;
416 		return -1;
417 	}
418 
419 	if (mtcp->smap[sockid].socktype == MOS_SOCK_UNUSED) {
420 		TRACE_API("Invalid socket id: %d\n", sockid);
421 		errno = EBADF;
422 		return -1;
423 	}
424 
425 	mtcp->smap[sockid].opts |= MTCP_NONBLOCK;
426 
427 	return 0;
428 }
429 /*----------------------------------------------------------------------------*/
430 int
431 mtcp_ioctl(mctx_t mctx, int sockid, int request, void *argp)
432 {
433 	mtcp_manager_t mtcp;
434 	socket_map_t socket;
435 
436 	mtcp = GetMTCPManager(mctx);
437 	if (!mtcp) {
438 		errno = EACCES;
439 		return -1;
440 	}
441 
442 	if (sockid < 0 || sockid >= g_config.mos->max_concurrency) {
443 		TRACE_API("Socket id %d out of range.\n", sockid);
444 		errno = EBADF;
445 		return -1;
446 	}
447 
448 	/* only support stream socket */
449 	socket = &mtcp->smap[sockid];
450 
451 	if (socket->socktype != MOS_SOCK_STREAM_LISTEN &&
452 		socket->socktype != MOS_SOCK_STREAM) {
453 		TRACE_API("Invalid socket id: %d\n", sockid);
454 		errno = EBADF;
455 		return -1;
456 	}
457 
458 	if (!argp) {
459 		errno = EFAULT;
460 		return -1;
461 	}
462 
463 	if (request == FIONREAD) {
464 		tcp_stream *cur_stream;
465 #ifdef NEWRB
466 		tcprb_t *rbuf;
467 #else
468 		struct tcp_ring_buffer *rbuf;
469 #endif
470 
471 		cur_stream = socket->stream;
472 		if (!cur_stream) {
473 			errno = EBADF;
474 			return -1;
475 		}
476 
477 		rbuf = cur_stream->rcvvar->rcvbuf;
478 #ifdef NEWRB
479 		*(int *)argp = (rbuf) ? tcprb_cflen(rbuf) : 0;
480 #else
481 		*(int *)argp = (rbuf) ? rbuf->merged_len : 0;
482 #endif
483 
484 	} else if (request == FIONBIO) {
485 		/*
486 		 * sockets can only be set to blocking/non-blocking
487 		 * modes during initialization
488 		 */
489 		if ((*(int *)argp))
490 			mtcp->smap[sockid].opts |= MTCP_NONBLOCK;
491 		else
492 			mtcp->smap[sockid].opts &= ~MTCP_NONBLOCK;
493 	} else {
494 		errno = EINVAL;
495 		return -1;
496 	}
497 
498 	return 0;
499 }
500 /*----------------------------------------------------------------------------*/
501 static int
502 mtcp_monitor(mctx_t mctx, socket_map_t sock)
503 {
504 	mtcp_manager_t mtcp;
505 	struct mon_listener *monitor;
506 	int sockid = sock->id;
507 
508 	mtcp = GetMTCPManager(mctx);
509 	if (!mtcp) {
510 		errno = EACCES;
511 		return -1;
512 	}
513 
514 	if (sockid < 0 || sockid >= g_config.mos->max_concurrency) {
515 		TRACE_API("Socket id %d out of range.\n", sockid);
516 		errno = EBADF;
517 		return -1;
518 	}
519 
520 	if (mtcp->msmap[sockid].socktype == MOS_SOCK_UNUSED) {
521 		TRACE_API("Invalid socket id: %d\n", sockid);
522 		errno = EBADF;
523 		return -1;
524 	}
525 
526 	if (!(mtcp->msmap[sockid].socktype == MOS_SOCK_MONITOR_STREAM ||
527 	      mtcp->msmap[sockid].socktype == MOS_SOCK_MONITOR_RAW)) {
528 		TRACE_API("Not a monitor socket. id: %d\n", sockid);
529 		errno = ENOTSOCK;
530 		return -1;
531 	}
532 
533 	monitor = (struct mon_listener *)calloc(1, sizeof(struct mon_listener));
534 	if (!monitor) {
535 		/* errno set from the malloc() */
536 		errno = ENOMEM;
537 		return -1;
538 	}
539 
540 	/* create a monitor-specific event queue */
541 	monitor->eq = CreateEventQueue(g_config.mos->max_concurrency);
542 	if (!monitor->eq) {
543 		TRACE_API("Can't create event queue (concurrency: %d) for "
544 			  "monitor read event registrations!\n",
545 			  g_config.mos->max_concurrency);
546 		free(monitor);
547 		errno = ENOMEM;
548 		return -1;
549 	}
550 
551 	/* set monitor-related basic parameters */
552 #ifndef NEWEV
553 	monitor->ude_id = UDE_OFFSET;
554 #endif
555 	monitor->socket = sock;
556 #ifdef NEWRB
557 	monitor->client_buf_mgmt = monitor->server_buf_mgmt = BUFMGMT_FULL;
558 #else
559 	monitor->client_buf_mgmt = monitor->server_buf_mgmt = 1;
560 #endif
561 
562 	/* perform both sides monitoring by default */
563 	monitor->client_mon = monitor->server_mon = 1;
564 
565 	/* add monitor socket to the monitor list */
566 	TAILQ_INSERT_TAIL(&mtcp->monitors, monitor, link);
567 
568 	mtcp->msmap[sockid].monitor_listener = monitor;
569 
570 	return 0;
571 }
572 /*----------------------------------------------------------------------------*/
573 int
574 mtcp_socket(mctx_t mctx, int domain, int type, int protocol)
575 {
576 	mtcp_manager_t mtcp;
577 	socket_map_t socket;
578 
579 	mtcp = GetMTCPManager(mctx);
580 	if (!mtcp) {
581 		errno = EACCES;
582 		return -1;
583 	}
584 
585 	if (domain != AF_INET) {
586 		errno = EAFNOSUPPORT;
587 		return -1;
588 	}
589 
590 	if (type == SOCK_STREAM) {
591 		type = MOS_SOCK_STREAM;
592 	} else if (type == MOS_SOCK_MONITOR_STREAM ||
593 		   type == MOS_SOCK_MONITOR_RAW) {
594 		/* do nothing for the time being */
595 	} else {
596 		/* Not supported type */
597 		errno = EINVAL;
598 		return -1;
599 	}
600 
601 	socket = AllocateSocket(mctx, type);
602 	if (!socket) {
603 		errno = ENFILE;
604 		return -1;
605 	}
606 
607 	if (type == MOS_SOCK_MONITOR_STREAM ||
608 	    type == MOS_SOCK_MONITOR_RAW) {
609 		mtcp_manager_t mtcp = GetMTCPManager(mctx);
610 		if (!mtcp) {
611 			errno = EACCES;
612 			return -1;
613 		}
614 		mtcp_monitor(mctx, socket);
615 #ifdef NEWEV
616 		socket->monitor_listener->stree_dontcare = NULL;
617 		socket->monitor_listener->stree_pre_rcv = NULL;
618 		socket->monitor_listener->stree_post_snd = NULL;
619 #else
620 		InitEvB(mtcp, &socket->monitor_listener->dontcare_evb);
621 		InitEvB(mtcp, &socket->monitor_listener->pre_tcp_evb);
622 		InitEvB(mtcp, &socket->monitor_listener->post_tcp_evb);
623 #endif
624 	}
625 
626 	return socket->id;
627 }
628 /*----------------------------------------------------------------------------*/
629 int
630 mtcp_bind(mctx_t mctx, int sockid,
631 		const struct sockaddr *addr, socklen_t addrlen)
632 {
633 	mtcp_manager_t mtcp;
634 	struct sockaddr_in *addr_in;
635 
636 	mtcp = GetMTCPManager(mctx);
637 	if (!mtcp) {
638 		errno = EACCES;
639 		return -1;
640 	}
641 
642 	if (sockid < 0 || sockid >= g_config.mos->max_concurrency) {
643 		TRACE_API("Socket id %d out of range.\n", sockid);
644 		errno = EBADF;
645 		return -1;
646 	}
647 
648 	if (mtcp->smap[sockid].socktype == MOS_SOCK_UNUSED) {
649 		TRACE_API("Invalid socket id: %d\n", sockid);
650 		errno = EBADF;
651 		return -1;
652 	}
653 
654 	if (mtcp->smap[sockid].socktype != MOS_SOCK_STREAM &&
655 			mtcp->smap[sockid].socktype != MOS_SOCK_STREAM_LISTEN) {
656 		TRACE_API("Not a stream socket id: %d\n", sockid);
657 		errno = ENOTSOCK;
658 		return -1;
659 	}
660 
661 	if (!addr) {
662 		TRACE_API("Socket %d: empty address!\n", sockid);
663 		errno = EINVAL;
664 		return -1;
665 	}
666 
667 	if (mtcp->smap[sockid].opts & MTCP_ADDR_BIND) {
668 		TRACE_API("Socket %d: adress already bind for this socket.\n", sockid);
669 		errno = EINVAL;
670 		return -1;
671 	}
672 
673 	/* we only allow bind() for AF_INET address */
674 	if (addr->sa_family != AF_INET || addrlen < sizeof(struct sockaddr_in)) {
675 		TRACE_API("Socket %d: invalid argument!\n", sockid);
676 		errno = EINVAL;
677 		return -1;
678 	}
679 
680 	if (mtcp->listener) {
681 		TRACE_API("Address already bound!\n");
682 		errno = EINVAL;
683 		return -1;
684 	}
685 	addr_in = (struct sockaddr_in *)addr;
686 	mtcp->smap[sockid].saddr = *addr_in;
687 	mtcp->smap[sockid].opts |= MTCP_ADDR_BIND;
688 
689 	return 0;
690 }
691 /*----------------------------------------------------------------------------*/
692 int
693 mtcp_listen(mctx_t mctx, int sockid, int backlog)
694 {
695 	mtcp_manager_t mtcp;
696 	struct tcp_listener *listener;
697 
698 	mtcp = GetMTCPManager(mctx);
699 	if (!mtcp) {
700 		errno = EACCES;
701 		return -1;
702 	}
703 
704 	if (sockid < 0 || sockid >= g_config.mos->max_concurrency) {
705 		TRACE_API("Socket id %d out of range.\n", sockid);
706 		errno = EBADF;
707 		return -1;
708 	}
709 
710 	if (mtcp->smap[sockid].socktype == MOS_SOCK_UNUSED) {
711 		TRACE_API("Invalid socket id: %d\n", sockid);
712 		errno = EBADF;
713 		return -1;
714 	}
715 
716 	if (mtcp->smap[sockid].socktype == MOS_SOCK_STREAM) {
717 		mtcp->smap[sockid].socktype = MOS_SOCK_STREAM_LISTEN;
718 	}
719 
720 	if (mtcp->smap[sockid].socktype != MOS_SOCK_STREAM_LISTEN) {
721 		TRACE_API("Not a listening socket. id: %d\n", sockid);
722 		errno = ENOTSOCK;
723 		return -1;
724 	}
725 
726 	if (backlog <= 0 || backlog > g_config.mos->max_concurrency) {
727 		errno = EINVAL;
728 		return -1;
729 	}
730 
731 	listener = (struct tcp_listener *)calloc(1, sizeof(struct tcp_listener));
732 	if (!listener) {
733 		/* errno set from the malloc() */
734 		errno = ENOMEM;
735 		return -1;
736 	}
737 
738 	listener->sockid = sockid;
739 	listener->backlog = backlog;
740 	listener->socket = &mtcp->smap[sockid];
741 
742 	if (pthread_cond_init(&listener->accept_cond, NULL)) {
743 		perror("pthread_cond_init of ctx->accept_cond\n");
744 		/* errno set by pthread_cond_init() */
745 		return -1;
746 	}
747 	if (pthread_mutex_init(&listener->accept_lock, NULL)) {
748 		perror("pthread_mutex_init of ctx->accept_lock\n");
749 		/* errno set by pthread_mutex_init() */
750 		return -1;
751 	}
752 
753 	listener->acceptq = CreateStreamQueue(backlog);
754 	if (!listener->acceptq) {
755 		errno = ENOMEM;
756 		return -1;
757 	}
758 
759 	mtcp->smap[sockid].listener = listener;
760 	mtcp->listener = listener;
761 
762 	return 0;
763 }
764 /*----------------------------------------------------------------------------*/
765 int
766 mtcp_accept(mctx_t mctx, int sockid, struct sockaddr *addr, socklen_t *addrlen)
767 {
768 	mtcp_manager_t mtcp;
769 	struct tcp_listener *listener;
770 	socket_map_t socket;
771 	tcp_stream *accepted = NULL;
772 
773 	mtcp = GetMTCPManager(mctx);
774 	if (!mtcp) {
775 		errno = EACCES;
776 		return -1;
777 	}
778 
779 	if (sockid < 0 || sockid >= g_config.mos->max_concurrency) {
780 		TRACE_API("Socket id %d out of range.\n", sockid);
781 		errno = EBADF;
782 		return -1;
783 	}
784 
785 	/* requires listening socket */
786 	if (mtcp->smap[sockid].socktype != MOS_SOCK_STREAM_LISTEN) {
787 		errno = EINVAL;
788 		return -1;
789 	}
790 
791 	listener = mtcp->smap[sockid].listener;
792 
793 	/* dequeue from the acceptq without lock first */
794 	/* if nothing there, acquire lock and cond_wait */
795 	accepted = StreamDequeue(listener->acceptq);
796 	if (!accepted) {
797 		if (listener->socket->opts & MTCP_NONBLOCK) {
798 			errno = EAGAIN;
799 			return -1;
800 
801 		} else {
802 			pthread_mutex_lock(&listener->accept_lock);
803 			while ((accepted = StreamDequeue(listener->acceptq)) == NULL) {
804 				pthread_cond_wait(&listener->accept_cond, &listener->accept_lock);
805 
806 				if (mtcp->ctx->done || mtcp->ctx->exit) {
807 					pthread_mutex_unlock(&listener->accept_lock);
808 					errno = EINTR;
809 					return -1;
810 				}
811 			}
812 			pthread_mutex_unlock(&listener->accept_lock);
813 		}
814 	}
815 
816 	if (!accepted) {
817 		TRACE_ERROR("[NEVER HAPPEN] Empty accept queue!\n");
818 	}
819 
820 	if (!accepted->socket) {
821 		socket = AllocateSocket(mctx, MOS_SOCK_STREAM);
822 		if (!socket) {
823 			TRACE_ERROR("Failed to create new socket!\n");
824 			/* TODO: destroy the stream */
825 			errno = ENFILE;
826 			return -1;
827 		}
828 		socket->stream = accepted;
829 		accepted->socket = socket;
830 		/* if monitor is enabled, complete the socket assignment */
831 		if (socket->stream->pair_stream != NULL)
832 			socket->stream->pair_stream->socket = socket;
833 	}
834 
835 	TRACE_API("Stream %d accepted.\n", accepted->id);
836 
837 	if (addr && addrlen) {
838 		struct sockaddr_in *addr_in = (struct sockaddr_in *)addr;
839 		addr_in->sin_family = AF_INET;
840 		addr_in->sin_port = accepted->dport;
841 		addr_in->sin_addr.s_addr = accepted->daddr;
842 		*addrlen = sizeof(struct sockaddr_in);
843 	}
844 
845 	return accepted->socket->id;
846 }
847 /*----------------------------------------------------------------------------*/
848 int
849 mtcp_init_rss(mctx_t mctx, in_addr_t saddr_base, int num_addr,
850 		in_addr_t daddr, in_addr_t dport)
851 {
852 	mtcp_manager_t mtcp;
853 	addr_pool_t ap;
854 
855 	mtcp = GetMTCPManager(mctx);
856 	if (!mtcp) {
857 		errno = EACCES;
858 		return -1;
859 	}
860 
861 	if (saddr_base == INADDR_ANY) {
862 		int nif_out;
863 
864 		/* for the INADDR_ANY, find the output interface for the destination
865 		   and set the saddr_base as the ip address of the output interface */
866 		nif_out = GetOutputInterface(daddr);
867 		saddr_base = g_config.mos->netdev_table->ent[nif_out]->ip_addr;
868 	}
869 
870 	ap = CreateAddressPoolPerCore(mctx->cpu, num_cpus,
871 			saddr_base, num_addr, daddr, dport);
872 	if (!ap) {
873 		errno = ENOMEM;
874 		return -1;
875 	}
876 
877 	mtcp->ap = ap;
878 
879 	return 0;
880 }
881 /*----------------------------------------------------------------------------*/
882 int
883 eval_bpf_5tuple(struct sfbpf_program fcode,
884 				in_addr_t saddr, in_port_t sport,
885 				in_addr_t daddr, in_port_t dport) {
886 	uint8_t buf[TOTAL_TCP_HEADER_LEN];
887 	struct ethhdr *ethh;
888 	struct iphdr *iph;
889 	struct tcphdr *tcph;
890 
891 	ethh = (struct ethhdr *)buf;
892 	ethh->h_proto = htons(ETH_P_IP);
893 	iph = (struct iphdr *)(ethh + 1);
894 	iph->ihl = IP_HEADER_LEN >> 2;
895 	iph->version = 4;
896 	iph->tos = 0;
897 	iph->tot_len = htons(IP_HEADER_LEN + TCP_HEADER_LEN);
898 	iph->id = htons(0);
899 	iph->protocol = IPPROTO_TCP;
900 	iph->saddr = saddr;
901 	iph->daddr = daddr;
902 	iph->check = 0;
903 	tcph = (struct tcphdr *)(iph + 1);
904 	tcph->source = sport;
905 	tcph->dest = dport;
906 
907 	return EVAL_BPFFILTER(fcode, (uint8_t *)iph - sizeof(struct ethhdr),
908 						 TOTAL_TCP_HEADER_LEN);
909 }
910 /*----------------------------------------------------------------------------*/
911 int
912 mtcp_connect(mctx_t mctx, int sockid,
913 		const struct sockaddr *addr, socklen_t addrlen)
914 {
915 	mtcp_manager_t mtcp;
916 	socket_map_t socket;
917 	tcp_stream *cur_stream;
918 	struct sockaddr_in *addr_in;
919 	in_addr_t dip;
920 	in_port_t dport;
921 	int is_dyn_bound = FALSE;
922 	int ret;
923 	int cnt_match = 0;
924 	struct mon_listener *walk;
925 	struct sfbpf_program fcode;
926 
927 	cur_stream = NULL;
928 	mtcp = GetMTCPManager(mctx);
929 	if (!mtcp) {
930 		errno = EACCES;
931 		return -1;
932 	}
933 
934 	if (sockid < 0 || sockid >= g_config.mos->max_concurrency) {
935 		TRACE_API("Socket id %d out of range.\n", sockid);
936 		errno = EBADF;
937 		return -1;
938 	}
939 
940 	if (mtcp->smap[sockid].socktype == MOS_SOCK_UNUSED) {
941 		TRACE_API("Invalid socket id: %d\n", sockid);
942 		errno = EBADF;
943 		return -1;
944 	}
945 
946 	if (mtcp->smap[sockid].socktype != MOS_SOCK_STREAM) {
947 		TRACE_API("Not an end socket. id: %d\n", sockid);
948 		errno = ENOTSOCK;
949 		return -1;
950 	}
951 
952 	if (!addr) {
953 		TRACE_API("Socket %d: empty address!\n", sockid);
954 		errno = EFAULT;
955 		return -1;
956 	}
957 
958 	/* we only allow bind() for AF_INET address */
959 	if (addr->sa_family != AF_INET || addrlen < sizeof(struct sockaddr_in)) {
960 		TRACE_API("Socket %d: invalid argument!\n", sockid);
961 		errno = EAFNOSUPPORT;
962 		return -1;
963 	}
964 
965 	socket = &mtcp->smap[sockid];
966 	if (socket->stream) {
967 		TRACE_API("Socket %d: stream already exist!\n", sockid);
968 		if (socket->stream->state >= TCP_ST_ESTABLISHED) {
969 			errno = EISCONN;
970 		} else {
971 			errno = EALREADY;
972 		}
973 		return -1;
974 	}
975 
976 	addr_in = (struct sockaddr_in *)addr;
977 	dip = addr_in->sin_addr.s_addr;
978 	dport = addr_in->sin_port;
979 
980 	/* address binding */
981 	if (socket->opts & MTCP_ADDR_BIND &&
982 	    socket->saddr.sin_port != INPORT_ANY &&
983 	    socket->saddr.sin_addr.s_addr != INADDR_ANY) {
984 		int rss_core;
985 
986 		rss_core = GetRSSCPUCore(socket->saddr.sin_addr.s_addr, dip,
987 				socket->saddr.sin_port, dport, num_queues);
988 
989 		if (rss_core != mctx->cpu) {
990 			errno = EINVAL;
991 			return -1;
992 		}
993 	} else {
994 		if (mtcp->ap) {
995 			ret = FetchAddress(mtcp->ap,
996 					mctx->cpu, num_queues, addr_in, &socket->saddr);
997 		} else {
998 			ret = FetchAddress(ap,
999 					mctx->cpu, num_queues, addr_in, &socket->saddr);
1000 		}
1001 		if (ret < 0) {
1002 			errno = EAGAIN;
1003 			return -1;
1004 		}
1005 		socket->opts |= MTCP_ADDR_BIND;
1006 		is_dyn_bound = TRUE;
1007 	}
1008 
1009 	cnt_match = 0;
1010 	if (mtcp->num_msp > 0) {
1011 		TAILQ_FOREACH(walk, &mtcp->monitors, link) {
1012 			fcode = walk->stream_syn_fcode;
1013 			if (!(ISSET_BPFFILTER(fcode) &&
1014 				  eval_bpf_5tuple(fcode, socket->saddr.sin_addr.s_addr,
1015 								  socket->saddr.sin_port,
1016 								  dip, dport) == 0)) {
1017 				walk->is_stream_syn_filter_hit = 1; // set the 'filter hit' flag to 1
1018 				cnt_match++;
1019 			}
1020 		}
1021 	}
1022 
1023 	if (mtcp->num_msp > 0 && cnt_match > 0) {
1024 		/* 150820 dhkim: XXX: embedded mode is not verified */
1025 #if 1
1026 		cur_stream = CreateClientTCPStream(mtcp, socket,
1027 						 STREAM_TYPE(MOS_SOCK_STREAM) |
1028 						 STREAM_TYPE(MOS_SOCK_MONITOR_STREAM_ACTIVE),
1029 						 socket->saddr.sin_addr.s_addr,
1030 						 socket->saddr.sin_port, dip, dport, NULL);
1031 #else
1032 		cur_stream = CreateDualTCPStream(mtcp, socket,
1033 						 STREAM_TYPE(MOS_SOCK_STREAM) |
1034 						 STREAM_TYPE(MOS_SOCK_MONITOR_STREAM_ACTIVE),
1035 						 socket->saddr.sin_addr.s_addr,
1036 						 socket->saddr.sin_port, dip, dport, NULL);
1037 #endif
1038 	}
1039 	else
1040 		cur_stream = CreateTCPStream(mtcp, socket, STREAM_TYPE(MOS_SOCK_STREAM),
1041 					     socket->saddr.sin_addr.s_addr,
1042 					     socket->saddr.sin_port, dip, dport, NULL);
1043 	if (!cur_stream) {
1044 		TRACE_ERROR("Socket %d: failed to create tcp_stream!\n", sockid);
1045 		errno = ENOMEM;
1046 		return -1;
1047 	}
1048 
1049 	if (is_dyn_bound)
1050 		cur_stream->is_bound_addr = TRUE;
1051 	cur_stream->sndvar->cwnd = 1;
1052 	cur_stream->sndvar->ssthresh = cur_stream->sndvar->mss * 10;
1053 	cur_stream->side = MOS_SIDE_CLI;
1054 	/* if monitor is enabled, update the pair stream side as well */
1055 	if (cur_stream->pair_stream) {
1056 		cur_stream->pair_stream->side = MOS_SIDE_SVR;
1057 		/*
1058 		 * if buffer management is off, then disable
1059 		 * monitoring tcp ring of server...
1060 		 * if there is even a single monitor asking for
1061 		 * buffer management, enable it (that's why the
1062 		 * need for the loop)
1063 		 */
1064 		cur_stream->pair_stream->buffer_mgmt = BUFMGMT_OFF;
1065 		struct socket_map *walk;
1066 		SOCKQ_FOREACH_START(walk, &cur_stream->msocks) {
1067 			uint8_t bm = walk->monitor_stream->monitor_listener->server_buf_mgmt;
1068 			if (bm > cur_stream->pair_stream->buffer_mgmt) {
1069 				cur_stream->pair_stream->buffer_mgmt = bm;
1070 				break;
1071 			}
1072 		} SOCKQ_FOREACH_END;
1073 	}
1074 
1075 	cur_stream->state = TCP_ST_SYN_SENT;
1076 	cur_stream->cb_events |= MOS_ON_TCP_STATE_CHANGE;
1077 
1078 	TRACE_STATE("Stream %d: TCP_ST_SYN_SENT\n", cur_stream->id);
1079 
1080 	SQ_LOCK(&mtcp->ctx->connect_lock);
1081 	ret = StreamEnqueue(mtcp->connectq, cur_stream);
1082 	SQ_UNLOCK(&mtcp->ctx->connect_lock);
1083 	mtcp->wakeup_flag = TRUE;
1084 	if (ret < 0) {
1085 		TRACE_ERROR("Socket %d: failed to enqueue to conenct queue!\n", sockid);
1086 		SQ_LOCK(&mtcp->ctx->destroyq_lock);
1087 		StreamEnqueue(mtcp->destroyq, cur_stream);
1088 		SQ_UNLOCK(&mtcp->ctx->destroyq_lock);
1089 		errno = EAGAIN;
1090 		return -1;
1091 	}
1092 
1093 	/* if nonblocking socket, return EINPROGRESS */
1094 	if (socket->opts & MTCP_NONBLOCK) {
1095 		errno = EINPROGRESS;
1096 		return -1;
1097 
1098 	} else {
1099 		while (1) {
1100 			if (!cur_stream) {
1101 				TRACE_ERROR("STREAM DESTROYED\n");
1102 				errno = ETIMEDOUT;
1103 				return -1;
1104 			}
1105 			if (cur_stream->state > TCP_ST_ESTABLISHED) {
1106 				TRACE_ERROR("Socket %d: weird state %s\n",
1107 						sockid, TCPStateToString(cur_stream));
1108 				// TODO: how to handle this?
1109 				errno = ENOSYS;
1110 				return -1;
1111 			}
1112 
1113 			if (cur_stream->state == TCP_ST_ESTABLISHED) {
1114 				break;
1115 			}
1116 			usleep(1000);
1117 		}
1118 	}
1119 
1120 	return 0;
1121 }
1122 /*----------------------------------------------------------------------------*/
1123 static inline int
1124 CloseStreamSocket(mctx_t mctx, int sockid)
1125 {
1126 	mtcp_manager_t mtcp;
1127 	tcp_stream *cur_stream;
1128 	int ret;
1129 
1130 	mtcp = GetMTCPManager(mctx);
1131 	if (!mtcp) {
1132 		errno = EACCES;
1133 		return -1;
1134 	}
1135 
1136 	cur_stream = mtcp->smap[sockid].stream;
1137 	if (!cur_stream) {
1138 		TRACE_API("Socket %d: stream does not exist.\n", sockid);
1139 		errno = ENOTCONN;
1140 		return -1;
1141 	}
1142 
1143 	if (cur_stream->closed) {
1144 		TRACE_API("Socket %d (Stream %u): already closed stream\n",
1145 				sockid, cur_stream->id);
1146 		return 0;
1147 	}
1148 	cur_stream->closed = TRUE;
1149 
1150 	TRACE_API("Stream %d: closing the stream.\n", cur_stream->id);
1151 
1152 	/* 141029 dhkim: Check this! */
1153 	cur_stream->socket = NULL;
1154 
1155 	if (cur_stream->state == TCP_ST_CLOSED_RSVD) {
1156 		TRACE_API("Stream %d at TCP_ST_CLOSED_RSVD. destroying the stream.\n",
1157 				cur_stream->id);
1158 		SQ_LOCK(&mtcp->ctx->destroyq_lock);
1159 		StreamEnqueue(mtcp->destroyq, cur_stream);
1160 		mtcp->wakeup_flag = TRUE;
1161 		SQ_UNLOCK(&mtcp->ctx->destroyq_lock);
1162 		return 0;
1163 
1164 	} else if (cur_stream->state == TCP_ST_SYN_SENT) {
1165 #if 1
1166 		SQ_LOCK(&mtcp->ctx->destroyq_lock);
1167 		StreamEnqueue(mtcp->destroyq, cur_stream);
1168 		SQ_UNLOCK(&mtcp->ctx->destroyq_lock);
1169 		mtcp->wakeup_flag = TRUE;
1170 #endif
1171 		return -1;
1172 
1173 	} else if (cur_stream->state != TCP_ST_ESTABLISHED &&
1174 			cur_stream->state != TCP_ST_CLOSE_WAIT) {
1175 		TRACE_API("Stream %d at state %s\n",
1176 				cur_stream->id, TCPStateToString(cur_stream));
1177 		errno = EBADF;
1178 		return -1;
1179 	}
1180 
1181 	SQ_LOCK(&mtcp->ctx->close_lock);
1182 	cur_stream->sndvar->on_closeq = TRUE;
1183 	ret = StreamEnqueue(mtcp->closeq, cur_stream);
1184 	mtcp->wakeup_flag = TRUE;
1185 	SQ_UNLOCK(&mtcp->ctx->close_lock);
1186 
1187 	if (ret < 0) {
1188 		TRACE_ERROR("(NEVER HAPPEN) Failed to enqueue the stream to close.\n");
1189 		errno = EAGAIN;
1190 		return -1;
1191 	}
1192 
1193 	return 0;
1194 }
1195 /*----------------------------------------------------------------------------*/
/**
 * Close a listening (MOS_SOCK_STREAM_LISTEN) socket on behalf of mtcp_close().
 *
 * Destroys the listener's accept queue, signals accept_cond once under
 * accept_lock, destroys the condition variable and mutex, frees the
 * listener and clears smap[sockid].listener. The socket slot itself is
 * released by the caller (mtcp_close calls FreeSocket afterwards).
 *
 * NOTE(review): if threads can block on accept_cond (for example a blocking
 * accept path, which is not visible here; confirm), this teardown order looks
 * unsafe:
 *  - pthread_cond_signal wakes at most one waiter.
 *  - The condition variable is destroyed while other waiters may still be
 *    blocked on it, which POSIX leaves undefined.
 *  - A woken waiter may access `listener` after free().
 * Also confirm nothing else (e.g. a listener lookup table) still references
 * the listener after it is freed here.
 *
 * @param [in] mctx:   mtcp context
 * @param [in] sockid: listening socket id (range-checked by the caller)
 * @return 0 on success; -1 with errno = EACCES if the mtcp context is
 *         unusable, or EINVAL if no listener is attached to the socket.
 */
static inline int
CloseListeningSocket(mctx_t mctx, int sockid)
{
	mtcp_manager_t mtcp;
	struct tcp_listener *listener;

	mtcp = GetMTCPManager(mctx);
	if (!mtcp) {
		errno = EACCES;
		return -1;
	}

	listener = mtcp->smap[sockid].listener;
	if (!listener) {
		errno = EINVAL;
		return -1;
	}

	/* drop the queue of not-yet-accepted streams first; a waiter woken
	 * below will therefore find acceptq == NULL (see NOTE above) */
	if (listener->acceptq) {
		DestroyStreamQueue(listener->acceptq);
		listener->acceptq = NULL;
	}

	/* wake (one) thread waiting on accept_cond before tearing it down */
	pthread_mutex_lock(&listener->accept_lock);
	pthread_cond_signal(&listener->accept_cond);
	pthread_mutex_unlock(&listener->accept_lock);

	pthread_cond_destroy(&listener->accept_cond);
	pthread_mutex_destroy(&listener->accept_lock);

	free(listener);
	mtcp->smap[sockid].listener = NULL;

	return 0;
}
1231 /*----------------------------------------------------------------------------*/
1232 int
1233 mtcp_close(mctx_t mctx, int sockid)
1234 {
1235 	mtcp_manager_t mtcp;
1236 	int ret;
1237 
1238 	mtcp = GetMTCPManager(mctx);
1239 	if (!mtcp) {
1240 		errno = EACCES;
1241 		return -1;
1242 	}
1243 
1244 	if (sockid < 0 || sockid >= g_config.mos->max_concurrency) {
1245 		TRACE_API("Socket id %d out of range.\n", sockid);
1246 		errno = EBADF;
1247 		return -1;
1248 	}
1249 
1250 	if (mtcp->smap[sockid].socktype == MOS_SOCK_UNUSED) {
1251 		TRACE_API("Invalid socket id: %d\n", sockid);
1252 		errno = EBADF;
1253 		return -1;
1254 	}
1255 
1256 	TRACE_API("Socket %d: mtcp_close called.\n", sockid);
1257 
1258 	switch (mtcp->smap[sockid].socktype) {
1259 	case MOS_SOCK_STREAM:
1260 		ret = CloseStreamSocket(mctx, sockid);
1261 		break;
1262 
1263 	case MOS_SOCK_STREAM_LISTEN:
1264 		ret = CloseListeningSocket(mctx, sockid);
1265 		break;
1266 
1267 	case MOS_SOCK_EPOLL:
1268 		ret = CloseEpollSocket(mctx, sockid);
1269 		break;
1270 
1271 	case MOS_SOCK_PIPE:
1272 		ret = PipeClose(mctx, sockid);
1273 		break;
1274 
1275 	default:
1276 		errno = EINVAL;
1277 		ret = -1;
1278 		break;
1279 	}
1280 
1281 	FreeSocket(mctx, sockid, mtcp->smap[sockid].socktype);
1282 
1283 	return ret;
1284 }
1285 /*----------------------------------------------------------------------------*/
1286 int
1287 mtcp_abort(mctx_t mctx, int sockid)
1288 {
1289 	mtcp_manager_t mtcp;
1290 	tcp_stream *cur_stream;
1291 	int ret;
1292 
1293 	mtcp = GetMTCPManager(mctx);
1294 	if (!mtcp) {
1295 		errno = EACCES;
1296 		return -1;
1297 	}
1298 
1299 	if (sockid < 0 || sockid >= g_config.mos->max_concurrency) {
1300 		TRACE_API("Socket id %d out of range.\n", sockid);
1301 		errno = EBADF;
1302 		return -1;
1303 	}
1304 
1305 	if (mtcp->smap[sockid].socktype == MOS_SOCK_UNUSED) {
1306 		TRACE_API("Invalid socket id: %d\n", sockid);
1307 		errno = EBADF;
1308 		return -1;
1309 	}
1310 
1311 	if (mtcp->smap[sockid].socktype != MOS_SOCK_STREAM) {
1312 		TRACE_API("Not an end socket. id: %d\n", sockid);
1313 		errno = ENOTSOCK;
1314 		return -1;
1315 	}
1316 
1317 	cur_stream = mtcp->smap[sockid].stream;
1318 	if (!cur_stream) {
1319 		TRACE_API("Stream %d: does not exist.\n", sockid);
1320 		errno = ENOTCONN;
1321 		return -1;
1322 	}
1323 
1324 	TRACE_API("Socket %d: mtcp_abort()\n", sockid);
1325 
1326 	FreeSocket(mctx, sockid, mtcp->smap[sockid].socktype);
1327 	cur_stream->socket = NULL;
1328 
1329 	if (cur_stream->state == TCP_ST_CLOSED_RSVD) {
1330 		TRACE_API("Stream %d: connection already reset.\n", sockid);
1331 		return ERROR;
1332 
1333 	} else if (cur_stream->state == TCP_ST_SYN_SENT) {
1334 		/* TODO: this should notify event failure to all
1335 		   previous read() or write() calls */
1336 		cur_stream->state = TCP_ST_CLOSED_RSVD;
1337 		cur_stream->close_reason = TCP_ACTIVE_CLOSE;
1338 		cur_stream->cb_events |= MOS_ON_TCP_STATE_CHANGE;
1339 		SQ_LOCK(&mtcp->ctx->destroyq_lock);
1340 		StreamEnqueue(mtcp->destroyq, cur_stream);
1341 		SQ_UNLOCK(&mtcp->ctx->destroyq_lock);
1342 		mtcp->wakeup_flag = TRUE;
1343 		return 0;
1344 
1345 	} else if (cur_stream->state == TCP_ST_CLOSING ||
1346 			cur_stream->state == TCP_ST_LAST_ACK ||
1347 			cur_stream->state == TCP_ST_TIME_WAIT) {
1348 		cur_stream->state = TCP_ST_CLOSED_RSVD;
1349 		cur_stream->close_reason = TCP_ACTIVE_CLOSE;
1350 		cur_stream->cb_events |= MOS_ON_TCP_STATE_CHANGE;
1351 		SQ_LOCK(&mtcp->ctx->destroyq_lock);
1352 		StreamEnqueue(mtcp->destroyq, cur_stream);
1353 		SQ_UNLOCK(&mtcp->ctx->destroyq_lock);
1354 		mtcp->wakeup_flag = TRUE;
1355 		return 0;
1356 	}
1357 
1358 	/* the stream structure will be destroyed after sending RST */
1359 	if (cur_stream->sndvar->on_resetq) {
1360 		TRACE_ERROR("Stream %d: calling mtcp_abort() "
1361 				"when in reset queue.\n", sockid);
1362 		errno = ECONNRESET;
1363 		return -1;
1364 	}
1365 	SQ_LOCK(&mtcp->ctx->reset_lock);
1366 	cur_stream->sndvar->on_resetq = TRUE;
1367 	ret = StreamEnqueue(mtcp->resetq, cur_stream);
1368 	SQ_UNLOCK(&mtcp->ctx->reset_lock);
1369 	mtcp->wakeup_flag = TRUE;
1370 
1371 	if (ret < 0) {
1372 		TRACE_ERROR("(NEVER HAPPEN) Failed to enqueue the stream to close.\n");
1373 		errno = EAGAIN;
1374 		return -1;
1375 	}
1376 
1377 	return 0;
1378 }
1379 /*----------------------------------------------------------------------------*/
1380 static inline int
1381 CopyToUser(mtcp_manager_t mtcp, tcp_stream *cur_stream, char *buf, int len)
1382 {
1383 	struct tcp_recv_vars *rcvvar = cur_stream->rcvvar;
1384 	int copylen;
1385 #ifdef NEWRB
1386 	tcprb_t *rb = rcvvar->rcvbuf;
1387 	if ((copylen = tcprb_ppeek(rb, (uint8_t *)buf, len, rb->pile)) <= 0) {
1388 		errno = EAGAIN;
1389 		return -1;
1390 	}
1391 	tcprb_setpile(rb, rb->pile + copylen);
1392 
1393 	rcvvar->rcv_wnd = rb->len - tcprb_cflen(rb);
1394 	//printf("rcv_wnd: %d\n", rcvvar->rcv_wnd);
1395 #else
1396 	copylen = MIN(rcvvar->rcvbuf->merged_len, len);
1397 	if (copylen <= 0) {
1398 		errno = EAGAIN;
1399 		return -1;
1400 	}
1401 
1402 	assert(rcvvar->rcvbuf->data);
1403 	/* Copy data to user buffer and remove it from receiving buffer */
1404 	memcpy(buf, rcvvar->rcvbuf->head, copylen);
1405 	RBRemove(mtcp->rbm_rcv, rcvvar->rcvbuf, copylen, AT_APP, cur_stream->buffer_mgmt);
1406 	rcvvar->rcv_wnd = rcvvar->rcvbuf->size - rcvvar->rcvbuf->merged_len;
1407 #endif
1408 
1409 	/* Advertise newly freed receive buffer */
1410 	if (cur_stream->need_wnd_adv) {
1411 		if (rcvvar->rcv_wnd > cur_stream->sndvar->eff_mss) {
1412 			if (!cur_stream->sndvar->on_ackq) {
1413 				SQ_LOCK(&mtcp->ctx->ackq_lock);
1414 				cur_stream->sndvar->on_ackq = TRUE;
1415 				StreamEnqueue(mtcp->ackq, cur_stream); /* this always success */
1416 				SQ_UNLOCK(&mtcp->ctx->ackq_lock);
1417 				cur_stream->need_wnd_adv = FALSE;
1418 				mtcp->wakeup_flag = TRUE;
1419 			}
1420 		}
1421 	}
1422 
1423 	return copylen;
1424 }
1425 /*----------------------------------------------------------------------------*/
1426 ssize_t
1427 mtcp_read(mctx_t mctx, int sockid, char *buf, size_t len)
1428 {
1429 	mtcp_manager_t mtcp;
1430 	socket_map_t socket;
1431 	tcp_stream *cur_stream;
1432 	struct tcp_recv_vars *rcvvar;
1433 	int event_remaining, merged_len;
1434 	int ret;
1435 
1436 	mtcp = GetMTCPManager(mctx);
1437 	if (!mtcp) {
1438 		errno = EACCES;
1439 		return -1;
1440 	}
1441 
1442 	if (sockid < 0 || sockid >= g_config.mos->max_concurrency) {
1443 		TRACE_API("Socket id %d out of range.\n", sockid);
1444 		errno = EBADF;
1445 		return -1;
1446 	}
1447 
1448 	socket = &mtcp->smap[sockid];
1449 	if (socket->socktype == MOS_SOCK_UNUSED) {
1450 		TRACE_API("Invalid socket id: %d\n", sockid);
1451 		errno = EBADF;
1452 		return -1;
1453 	}
1454 
1455 	if (socket->socktype == MOS_SOCK_PIPE) {
1456 		return PipeRead(mctx, sockid, buf, len);
1457 	}
1458 
1459 	if (socket->socktype != MOS_SOCK_STREAM) {
1460 		TRACE_API("Not an end socket. id: %d\n", sockid);
1461 		errno = ENOTSOCK;
1462 		return -1;
1463 	}
1464 
1465 	/* stream should be in ESTABLISHED, FIN_WAIT_1, FIN_WAIT_2, CLOSE_WAIT */
1466 	cur_stream = socket->stream;
1467 	if (!cur_stream || !cur_stream->rcvvar || !cur_stream->rcvvar->rcvbuf ||
1468 			!(cur_stream->state >= TCP_ST_ESTABLISHED &&
1469 			cur_stream->state <= TCP_ST_CLOSE_WAIT)) {
1470 		errno = ENOTCONN;
1471 		return -1;
1472 	}
1473 
1474 	rcvvar = cur_stream->rcvvar;
1475 
1476 #ifdef NEWRB
1477 	merged_len = tcprb_cflen(rcvvar->rcvbuf);
1478 #else
1479 	merged_len = rcvvar->rcvbuf->merged_len;
1480 #endif
1481 
1482 	/* if CLOSE_WAIT, return 0 if there is no payload */
1483 	if (cur_stream->state == TCP_ST_CLOSE_WAIT) {
1484 		if (!rcvvar->rcvbuf)
1485 			return 0;
1486 
1487 		if (merged_len == 0)
1488 			return 0;
1489 	}
1490 
1491 	/* return EAGAIN if no receive buffer */
1492 	if (socket->opts & MTCP_NONBLOCK) {
1493 		if (!rcvvar->rcvbuf || merged_len == 0) {
1494 			errno = EAGAIN;
1495 			return -1;
1496 		}
1497 	}
1498 
1499 	SBUF_LOCK(&rcvvar->read_lock);
1500 
1501 	ret = CopyToUser(mtcp, cur_stream, buf, len);
1502 
1503 #ifdef NEWRB
1504 	merged_len = tcprb_cflen(rcvvar->rcvbuf);
1505 #else
1506 	merged_len = rcvvar->rcvbuf->merged_len;
1507 #endif
1508 
1509 	event_remaining = FALSE;
1510 	/* if there are remaining payload, generate EPOLLIN */
1511 	/* (may due to insufficient user buffer) */
1512 	if (socket->epoll & MOS_EPOLLIN) {
1513 		if (!(socket->epoll & MOS_EPOLLET) && merged_len > 0) {
1514 			event_remaining = TRUE;
1515 		}
1516 	}
1517 	/* if waiting for close, notify it if no remaining data */
1518 	if (cur_stream->state == TCP_ST_CLOSE_WAIT &&
1519 			merged_len == 0 && ret > 0) {
1520 		event_remaining = TRUE;
1521 	}
1522 
1523 	SBUF_UNLOCK(&rcvvar->read_lock);
1524 
1525 	if (event_remaining) {
1526 		if (socket->epoll) {
1527 			AddEpollEvent(mtcp->ep,
1528 				      USR_SHADOW_EVENT_QUEUE, socket, MOS_EPOLLIN);
1529 		}
1530 	}
1531 
1532 	TRACE_API("Stream %d: mtcp_read() returning %d\n", cur_stream->id, ret);
1533 	return ret;
1534 }
1535 /*----------------------------------------------------------------------------*/
1536 ssize_t
1537 mtcp_readv(mctx_t mctx, int sockid, struct iovec *iov, int numIOV)
1538 {
1539 	mtcp_manager_t mtcp;
1540 	socket_map_t socket;
1541 	tcp_stream *cur_stream;
1542 	struct tcp_recv_vars *rcvvar;
1543 	int ret, bytes_read, i;
1544 	int event_remaining, merged_len;
1545 
1546 	mtcp = GetMTCPManager(mctx);
1547 	if (!mtcp) {
1548 		errno = EACCES;
1549 		return -1;
1550 	}
1551 
1552 	if (sockid < 0 || sockid >= g_config.mos->max_concurrency) {
1553 		TRACE_API("Socket id %d out of range.\n", sockid);
1554 		errno = EBADF;
1555 		return -1;
1556 	}
1557 
1558 	socket = &mtcp->smap[sockid];
1559 	if (socket->socktype == MOS_SOCK_UNUSED) {
1560 		TRACE_API("Invalid socket id: %d\n", sockid);
1561 		errno = EBADF;
1562 		return -1;
1563 	}
1564 
1565 	if (socket->socktype != MOS_SOCK_STREAM) {
1566 		TRACE_API("Not an end socket. id: %d\n", sockid);
1567 		errno = ENOTSOCK;
1568 		return -1;
1569 	}
1570 
1571 	/* stream should be in ESTABLISHED, FIN_WAIT_1, FIN_WAIT_2, CLOSE_WAIT */
1572 	cur_stream = socket->stream;
1573 	if (!cur_stream ||
1574 			!(cur_stream->state >= TCP_ST_ESTABLISHED &&
1575 			  cur_stream->state <= TCP_ST_CLOSE_WAIT)) {
1576 		errno = ENOTCONN;
1577 		return -1;
1578 	}
1579 
1580 	rcvvar = cur_stream->rcvvar;
1581 
1582 #ifdef NEWRB
1583 	merged_len = tcprb_cflen(rcvvar->rcvbuf);
1584 #else
1585 	merged_len = rcvvar->rcvbuf->merged_len;
1586 #endif
1587 
1588 	/* if CLOSE_WAIT, return 0 if there is no payload */
1589 	if (cur_stream->state == TCP_ST_CLOSE_WAIT) {
1590 		if (!rcvvar->rcvbuf)
1591 			return 0;
1592 
1593 		if (merged_len == 0)
1594 			return 0;
1595 	}
1596 
1597 	/* return EAGAIN if no receive buffer */
1598 	if (socket->opts & MTCP_NONBLOCK) {
1599 		if (!rcvvar->rcvbuf || merged_len == 0) {
1600 			errno = EAGAIN;
1601 			return -1;
1602 		}
1603 	}
1604 
1605 	SBUF_LOCK(&rcvvar->read_lock);
1606 
1607 	/* read and store the contents to the vectored buffers */
1608 	bytes_read = 0;
1609 	for (i = 0; i < numIOV; i++) {
1610 		if (iov[i].iov_len <= 0)
1611 			continue;
1612 
1613 		ret = CopyToUser(mtcp, cur_stream, iov[i].iov_base, iov[i].iov_len);
1614 		if (ret <= 0)
1615 			break;
1616 
1617 		bytes_read += ret;
1618 
1619 		if (ret < iov[i].iov_len)
1620 			break;
1621 	}
1622 
1623 #ifdef NEWRB
1624 	merged_len = tcprb_cflen(rcvvar->rcvbuf);
1625 #else
1626 	merged_len = rcvvar->rcvbuf->merged_len;
1627 #endif
1628 
1629 	event_remaining = FALSE;
1630 	/* if there are remaining payload, generate read event */
1631 	/* (may due to insufficient user buffer) */
1632 	if (socket->epoll & MOS_EPOLLIN) {
1633 		if (!(socket->epoll & MOS_EPOLLET) && merged_len > 0) {
1634 			event_remaining = TRUE;
1635 		}
1636 	}
1637 	/* if waiting for close, notify it if no remaining data */
1638 	if (cur_stream->state == TCP_ST_CLOSE_WAIT &&
1639 			merged_len == 0 && bytes_read > 0) {
1640 		event_remaining = TRUE;
1641 	}
1642 
1643 	SBUF_UNLOCK(&rcvvar->read_lock);
1644 
1645 	if(event_remaining) {
1646 		if (socket->epoll & MOS_EPOLLIN && !(socket->epoll & MOS_EPOLLET)) {
1647 			AddEpollEvent(mtcp->ep,
1648 				      USR_SHADOW_EVENT_QUEUE, socket, MOS_EPOLLIN);
1649 		}
1650 	}
1651 
1652 	TRACE_API("Stream %d: mtcp_readv() returning %d\n",
1653 			cur_stream->id, bytes_read);
1654 	return bytes_read;
1655 }
1656 /*----------------------------------------------------------------------------*/
1657 static inline int
1658 CopyFromUser(mtcp_manager_t mtcp, tcp_stream *cur_stream, char *buf, int len)
1659 {
1660 	struct tcp_send_vars *sndvar = cur_stream->sndvar;
1661 	int sndlen;
1662 	int ret;
1663 
1664 	sndlen = MIN((int)sndvar->snd_wnd, len);
1665 	if (sndlen <= 0) {
1666 		errno = EAGAIN;
1667 		return -1;
1668 	}
1669 
1670 	/* allocate send buffer if not exist */
1671 	if (!sndvar->sndbuf) {
1672 		sndvar->sndbuf = SBInit(mtcp->rbm_snd, sndvar->iss + 1);
1673 		if (!sndvar->sndbuf) {
1674 			cur_stream->close_reason = TCP_NO_MEM;
1675 			/* notification may not required due to -1 return */
1676 			errno = ENOMEM;
1677 			return -1;
1678 		}
1679 	}
1680 
1681 	ret = SBPut(mtcp->rbm_snd, sndvar->sndbuf, buf, sndlen);
1682 	assert(ret == sndlen);
1683 	sndvar->snd_wnd = sndvar->sndbuf->size - sndvar->sndbuf->len;
1684 	if (ret <= 0) {
1685 		TRACE_ERROR("SBPut failed. reason: %d (sndlen: %u, len: %u\n",
1686 				ret, sndlen, sndvar->sndbuf->len);
1687 		errno = EAGAIN;
1688 		return -1;
1689 	}
1690 
1691 	if (sndvar->snd_wnd <= 0) {
1692 		TRACE_SNDBUF("%u Sending buffer became full!! snd_wnd: %u\n",
1693 				cur_stream->id, sndvar->snd_wnd);
1694 	}
1695 
1696 	return ret;
1697 }
1698 /*----------------------------------------------------------------------------*/
1699 ssize_t
1700 mtcp_write(mctx_t mctx, int sockid, char *buf, size_t len)
1701 {
1702 	mtcp_manager_t mtcp;
1703 	socket_map_t socket;
1704 	tcp_stream *cur_stream;
1705 	struct tcp_send_vars *sndvar;
1706 	int ret;
1707 
1708 	mtcp = GetMTCPManager(mctx);
1709 	if (!mtcp) {
1710 		errno = EACCES;
1711 		return -1;
1712 	}
1713 
1714 	if (sockid < 0 || sockid >= g_config.mos->max_concurrency) {
1715 		TRACE_API("Socket id %d out of range.\n", sockid);
1716 		errno = EBADF;
1717 		return -1;
1718 	}
1719 
1720 	socket = &mtcp->smap[sockid];
1721 	if (socket->socktype == MOS_SOCK_UNUSED) {
1722 		TRACE_API("Invalid socket id: %d\n", sockid);
1723 		errno = EBADF;
1724 		return -1;
1725 	}
1726 
1727 	if (socket->socktype == MOS_SOCK_PIPE) {
1728 		return PipeWrite(mctx, sockid, buf, len);
1729 	}
1730 
1731 	if (socket->socktype != MOS_SOCK_STREAM) {
1732 		TRACE_API("Not an end socket. id: %d\n", sockid);
1733 		errno = ENOTSOCK;
1734 		return -1;
1735 	}
1736 
1737 	cur_stream = socket->stream;
1738 	if (!cur_stream ||
1739 			!(cur_stream->state == TCP_ST_ESTABLISHED ||
1740 			  cur_stream->state == TCP_ST_CLOSE_WAIT)) {
1741 		errno = ENOTCONN;
1742 		return -1;
1743 	}
1744 
1745 	if (len <= 0) {
1746 		if (socket->opts & MTCP_NONBLOCK) {
1747 			errno = EAGAIN;
1748 			return -1;
1749 		} else {
1750 			return 0;
1751 		}
1752 	}
1753 
1754 	sndvar = cur_stream->sndvar;
1755 
1756 	SBUF_LOCK(&sndvar->write_lock);
1757 	ret = CopyFromUser(mtcp, cur_stream, buf, len);
1758 
1759 	SBUF_UNLOCK(&sndvar->write_lock);
1760 
1761 	if (ret > 0 && !(sndvar->on_sendq || sndvar->on_send_list)) {
1762 		SQ_LOCK(&mtcp->ctx->sendq_lock);
1763 		sndvar->on_sendq = TRUE;
1764 		StreamEnqueue(mtcp->sendq, cur_stream);		/* this always success */
1765 		SQ_UNLOCK(&mtcp->ctx->sendq_lock);
1766 		mtcp->wakeup_flag = TRUE;
1767 	}
1768 
1769 	if (ret == 0 && (socket->opts & MTCP_NONBLOCK)) {
1770 		ret = -1;
1771 		errno = EAGAIN;
1772 	}
1773 
1774 	/* if there are remaining sending buffer, generate write event */
1775 	if (sndvar->snd_wnd > 0) {
1776 		if (socket->epoll & MOS_EPOLLOUT && !(socket->epoll & MOS_EPOLLET)) {
1777 			AddEpollEvent(mtcp->ep,
1778 				      USR_SHADOW_EVENT_QUEUE, socket, MOS_EPOLLOUT);
1779 		}
1780 	}
1781 
1782 	TRACE_API("Stream %d: mtcp_write() returning %d\n", cur_stream->id, ret);
1783 	return ret;
1784 }
1785 /*----------------------------------------------------------------------------*/
1786 ssize_t
1787 mtcp_writev(mctx_t mctx, int sockid, struct iovec *iov, int numIOV)
1788 {
1789 	mtcp_manager_t mtcp;
1790 	socket_map_t socket;
1791 	tcp_stream *cur_stream;
1792 	struct tcp_send_vars *sndvar;
1793 	int ret, to_write, i;
1794 
1795 	mtcp = GetMTCPManager(mctx);
1796 	if (!mtcp) {
1797 		errno = EACCES;
1798 		return -1;
1799 	}
1800 
1801 	if (sockid < 0 || sockid >= g_config.mos->max_concurrency) {
1802 		TRACE_API("Socket id %d out of range.\n", sockid);
1803 		errno = EBADF;
1804 		return -1;
1805 	}
1806 
1807 	socket = &mtcp->smap[sockid];
1808 	if (socket->socktype == MOS_SOCK_UNUSED) {
1809 		TRACE_API("Invalid socket id: %d\n", sockid);
1810 		errno = EBADF;
1811 		return -1;
1812 	}
1813 
1814 	if (socket->socktype != MOS_SOCK_STREAM) {
1815 		TRACE_API("Not an end socket. id: %d\n", sockid);
1816 		errno = ENOTSOCK;
1817 		return -1;
1818 	}
1819 
1820 	cur_stream = socket->stream;
1821 	if (!cur_stream ||
1822 			!(cur_stream->state == TCP_ST_ESTABLISHED ||
1823 			  cur_stream->state == TCP_ST_CLOSE_WAIT)) {
1824 		errno = ENOTCONN;
1825 		return -1;
1826 	}
1827 
1828 	sndvar = cur_stream->sndvar;
1829 	SBUF_LOCK(&sndvar->write_lock);
1830 
1831 	/* write from the vectored buffers */
1832 	to_write = 0;
1833 	for (i = 0; i < numIOV; i++) {
1834 		if (iov[i].iov_len <= 0)
1835 			continue;
1836 
1837 		ret = CopyFromUser(mtcp, cur_stream, iov[i].iov_base, iov[i].iov_len);
1838 		if (ret <= 0)
1839 			break;
1840 
1841 		to_write += ret;
1842 
1843 		if (ret < iov[i].iov_len)
1844 			break;
1845 	}
1846 	SBUF_UNLOCK(&sndvar->write_lock);
1847 
1848 	if (to_write > 0 && !(sndvar->on_sendq || sndvar->on_send_list)) {
1849 		SQ_LOCK(&mtcp->ctx->sendq_lock);
1850 		sndvar->on_sendq = TRUE;
1851 		StreamEnqueue(mtcp->sendq, cur_stream);		/* this always success */
1852 		SQ_UNLOCK(&mtcp->ctx->sendq_lock);
1853 		mtcp->wakeup_flag = TRUE;
1854 	}
1855 
1856 	if (to_write == 0 && (socket->opts & MTCP_NONBLOCK)) {
1857 		to_write = -1;
1858 		errno = EAGAIN;
1859 	}
1860 
1861 	/* if there are remaining sending buffer, generate write event */
1862 	if (sndvar->snd_wnd > 0) {
1863 		if (socket->epoll & MOS_EPOLLOUT && !(socket->epoll & MOS_EPOLLET)) {
1864 			AddEpollEvent(mtcp->ep,
1865 				      USR_SHADOW_EVENT_QUEUE, socket, MOS_EPOLLOUT);
1866 		}
1867 	}
1868 
1869 	TRACE_API("Stream %d: mtcp_writev() returning %d\n",
1870 			cur_stream->id, to_write);
1871 	return to_write;
1872 }
1873 /*----------------------------------------------------------------------------*/
1874 uint32_t
1875 mtcp_get_connection_cnt(mctx_t mctx)
1876 {
1877 	mtcp_manager_t mtcp;
1878 	mtcp = GetMTCPManager(mctx);
1879 	if (!mtcp) {
1880 		errno = EACCES;
1881 		return -1;
1882 	}
1883 
1884 	if (mtcp->num_msp > 0)
1885 		return mtcp->flow_cnt / 2;
1886 	else
1887 		return mtcp->flow_cnt;
1888 }
1889 /*----------------------------------------------------------------------------*/
1890