xref: /mOS-networking-stack/core/src/api.c (revision 4cb4e140)
1 #include <sys/queue.h>
2 #include <sys/ioctl.h>
3 #include <limits.h>
4 #include <unistd.h>
5 #include <assert.h>
6 #include <string.h>
7 
8 #ifdef DARWIN
9 #include <netinet/tcp.h>
10 #include <netinet/ip.h>
11 #include <netinet/if_ether.h>
12 #endif
13 
14 #include "mtcp.h"
15 #include "mtcp_api.h"
16 #include "tcp_in.h"
17 #include "tcp_stream.h"
18 #include "tcp_out.h"
19 #include "ip_out.h"
20 #include "eventpoll.h"
21 #include "pipe.h"
22 #include "fhash.h"
23 #include "addr_pool.h"
24 #include "rss.h"
25 #include "config.h"
26 #include "debug.h"
27 #include "eventpoll.h"
28 #include "mos_api.h"
29 #include "tcp_rb.h"
30 
/* Ternary max/min helpers. Each argument may be evaluated more than once,
 * so do not pass expressions with side effects (e.g. MAX(i++, j)). */
#define MAX(a, b) ((a)>(b)?(a):(b))
#define MIN(a, b) ((a)<(b)?(a):(b))
33 
34 /*----------------------------------------------------------------------------*/
/** Stop monitoring the socket! (function prototype)
 * @param [in] mctx: mtcp context
 * @param [in] sock: monitoring stream socket id
 * @param [in] side: side of monitoring (client side, server side or both)
 *
 * This function is now DEPRECATED and is only used within mOS core...
 * It is forward-declared here because mtcp_setsockopt() forwards the
 * MOS_STOP_MON option to it and returns its int result unchanged.
 * The definition is not in this part of the file.
 */
int
mtcp_cb_stop(mctx_t mctx, int sock, int side);
/*----------------------------------------------------------------------------*/
/** Reset the connection (send RST packets to both sides)
 *  (We need to decide the API for this.)
 *  Not implemented: the prototype below is kept only as a placeholder.
 */
//int
//mtcp_cb_reset(mctx_t mctx, int sock, int side);
50 /*----------------------------------------------------------------------------*/
51 inline mtcp_manager_t
52 GetMTCPManager(mctx_t mctx)
53 {
54 	if (!mctx) {
55 		errno = EACCES;
56 		return NULL;
57 	}
58 
59 	if (mctx->cpu < 0 || mctx->cpu >= num_cpus) {
60 		errno = EINVAL;
61 		return NULL;
62 	}
63 
64 	if (!g_mtcp[mctx->cpu] || g_mtcp[mctx->cpu]->ctx->done || g_mtcp[mctx->cpu]->ctx->exit) {
65 		errno = EPERM;
66 		return NULL;
67 	}
68 
69 	return g_mtcp[mctx->cpu];
70 }
71 /*----------------------------------------------------------------------------*/
72 inline int
73 GetSocketError(socket_map_t socket, void *optval, socklen_t *optlen)
74 {
75 	tcp_stream *cur_stream;
76 
77 	if (!socket->stream) {
78 		errno = EBADF;
79 		return -1;
80 	}
81 
82 	cur_stream = socket->stream;
83 	if (cur_stream->state == TCP_ST_CLOSED_RSVD) {
84 		if (cur_stream->close_reason == TCP_TIMEDOUT ||
85 				cur_stream->close_reason == TCP_CONN_FAIL ||
86 				cur_stream->close_reason == TCP_CONN_LOST) {
87 			*(int *)optval = ETIMEDOUT;
88 			*optlen = sizeof(int);
89 
90 			return 0;
91 		}
92 	}
93 
94 	if (cur_stream->state == TCP_ST_CLOSE_WAIT ||
95 			cur_stream->state == TCP_ST_CLOSED_RSVD) {
96 		if (cur_stream->close_reason == TCP_RESET) {
97 			*(int *)optval = ECONNRESET;
98 			*optlen = sizeof(int);
99 
100 			return 0;
101 		}
102 	}
103 
104 	/*
105 	 * `base case`: If socket sees no so_error, then
106 	 * this also means close_reason will always be
107 	 * TCP_NOT_CLOSED.
108 	 */
109 	if (cur_stream->close_reason == TCP_NOT_CLOSED) {
110 		*(int *)optval = 0;
111 		*optlen = sizeof(int);
112 
113 		return 0;
114 	}
115 
116 	errno = ENOSYS;
117 	return -1;
118 }
119 /*----------------------------------------------------------------------------*/
120 int
121 mtcp_getsockopt(mctx_t mctx, int sockid, int level,
122 		int optname, void *optval, socklen_t *optlen)
123 {
124 	mtcp_manager_t mtcp;
125 	socket_map_t socket;
126 
127 	mtcp = GetMTCPManager(mctx);
128 	if (!mtcp) {
129 		errno = EACCES;
130 		return -1;
131 	}
132 
133 	if (sockid < 0 || sockid >= g_config.mos->max_concurrency) {
134 		TRACE_API("Socket id %d out of range.\n", sockid);
135 		errno = EBADF;
136 		return -1;
137 	}
138 
139 	switch (level) {
140 	case SOL_SOCKET:
141 		socket = &mtcp->smap[sockid];
142 		if (socket->socktype == MOS_SOCK_UNUSED) {
143 			TRACE_API("Invalid socket id: %d\n", sockid);
144 			errno = EBADF;
145 			return -1;
146 		}
147 
148 		if (socket->socktype != MOS_SOCK_STREAM_LISTEN &&
149 		    socket->socktype != MOS_SOCK_STREAM) {
150 			TRACE_API("Invalid socket id: %d\n", sockid);
151 			errno = ENOTSOCK;
152 			return -1;
153 		}
154 
155 		if (optname == SO_ERROR) {
156 			if (socket->socktype == MOS_SOCK_STREAM) {
157 				return GetSocketError(socket, optval, optlen);
158 			}
159 		}
160 		break;
161 	case SOL_MONSOCKET:
162 		/* check if the calling thread is in MOS context */
163 		if (mtcp->ctx->thread != pthread_self()) {
164 			errno = EPERM;
165 			return -1;
166 		}
167 		/*
168 		 * All options will only work for active
169 		 * monitor stream sockets
170 		 */
171 		socket = &mtcp->msmap[sockid];
172 		if (socket->socktype != MOS_SOCK_MONITOR_STREAM_ACTIVE) {
173 			TRACE_API("Invalid socket id: %d\n", sockid);
174 			errno = ENOTSOCK;
175 			return -1;
176 		}
177 
178 		switch (optname) {
179 		case MOS_FRAGINFO_CLIBUF:
180 			return GetFragInfo(socket, MOS_SIDE_CLI, optval, optlen);
181 		case MOS_FRAGINFO_SVRBUF:
182 			return GetFragInfo(socket, MOS_SIDE_SVR, optval, optlen);
183 		case MOS_INFO_CLIBUF:
184 			return GetBufInfo(socket, MOS_SIDE_CLI, optval, optlen);
185 		case MOS_INFO_SVRBUF:
186 			return GetBufInfo(socket, MOS_SIDE_SVR, optval, optlen);
187 		case MOS_TCP_STATE_CLI:
188 			return GetTCPState(socket->monitor_stream->stream, MOS_SIDE_CLI,
189 							   optval, optlen);
190 		case MOS_TCP_STATE_SVR:
191 			return GetTCPState(socket->monitor_stream->stream, MOS_SIDE_SVR,
192 							   optval, optlen);
193 		case MOS_TIMESTAMP:
194 			return GetLastTimestamp(socket->monitor_stream->stream,
195 						(uint32_t *)optval,
196 						optlen);
197 		default:
198 		  TRACE_API("can't recognize the optname=%d\n", optname);
199 		  assert(0);
200 		}
201 		break;
202 	}
203 	errno = ENOSYS;
204 	return -1;
205 }
206 /*----------------------------------------------------------------------------*/
207 int
208 mtcp_setsockopt(mctx_t mctx, int sockid, int level,
209 		int optname, const void *optval, socklen_t optlen)
210 {
211 	mtcp_manager_t mtcp;
212 	socket_map_t socket;
213 	tcprb_t *rb;
214 
215 	mtcp = GetMTCPManager(mctx);
216 	if (!mtcp) {
217 		errno = EACCES;
218 		return -1;
219 	}
220 
221 	if (sockid < 0 || sockid >= g_config.mos->max_concurrency) {
222 		TRACE_API("Socket id %d out of range.\n", sockid);
223 		errno = EBADF;
224 		return -1;
225 	}
226 
227 	switch (level) {
228 	case SOL_SOCKET:
229 		socket = &mtcp->smap[sockid];
230 		if (socket->socktype == MOS_SOCK_UNUSED) {
231 			TRACE_API("Invalid socket id: %d\n", sockid);
232 			errno = EBADF;
233 			return -1;
234 		}
235 
236 		if (socket->socktype != MOS_SOCK_STREAM_LISTEN &&
237 		    socket->socktype != MOS_SOCK_STREAM) {
238 			TRACE_API("Invalid socket id: %d\n", sockid);
239 			errno = ENOTSOCK;
240 			return -1;
241 		}
242 		break;
243 	case SOL_MONSOCKET:
244 		socket = &mtcp->msmap[sockid];
245 		/*
246 		 * checking of calling thread to be in MOS context is
247 		 * disabled since both optnames can be called from
248 		 * `application' context (on passive sockets)
249 		 */
250 		/*
251 		 * if (mtcp->ctx->thread != pthread_self())
252 		 * return -1;
253 		 */
254 
255 		switch (optname) {
256 		case MOS_CLIBUF:
257 #if 0
258 			if (socket->socktype != MOS_SOCK_MONITOR_STREAM_ACTIVE) {
259 				errno = EBADF;
260 				return -1;
261 			}
262 #endif
263 #ifdef DISABLE_DYN_RESIZE
264 			if (*(int *)optval != 0)
265 				return -1;
266 			if (socket->socktype == MOS_SOCK_MONITOR_STREAM_ACTIVE) {
267 				rb = (socket->monitor_stream->stream->side == MOS_SIDE_CLI) ?
268 					socket->monitor_stream->stream->rcvvar->rcvbuf :
269 					socket->monitor_stream->stream->pair_stream->rcvvar->rcvbuf;
270 				if (rb) {
271 					tcprb_resize_meta(rb, 0);
272 					tcprb_resize(rb, 0);
273 				}
274 			}
275 			return DisableBuf(socket, MOS_SIDE_CLI);
276 #else
277 			rb = (socket->monitor_stream->stream->side == MOS_SIDE_CLI) ?
278 				socket->monitor_stream->stream->rcvvar->rcvbuf :
279 				socket->monitor_stream->stream->pair_stream->rcvvar->rcvbuf;
280 			if (tcprb_resize_meta(rb, *(int *)optval) < 0)
281 				return -1;
282 			return tcprb_resize(rb,
283 					(((int)rb->metalen - 1) / UNITBUFSIZE + 1) * UNITBUFSIZE);
284 #endif
285 		case MOS_SVRBUF:
286 #if 0
287 			if (socket->socktype != MOS_SOCK_MONITOR_STREAM_ACTIVE) {
288 				errno = EBADF;
289 				return -1;
290 			}
291 #endif
292 #ifdef DISABLE_DYN_RESIZE
293 			if (*(int *)optval != 0)
294 				return -1;
295 			if (socket->socktype == MOS_SOCK_MONITOR_STREAM_ACTIVE) {
296 				rb = (socket->monitor_stream->stream->side == MOS_SIDE_SVR) ?
297 					socket->monitor_stream->stream->rcvvar->rcvbuf :
298 					socket->monitor_stream->stream->pair_stream->rcvvar->rcvbuf;
299 				if (rb) {
300 					tcprb_resize_meta(rb, 0);
301 					tcprb_resize(rb, 0);
302 				}
303 			}
304 			return DisableBuf(socket, MOS_SIDE_SVR);
305 #else
306 			rb = (socket->monitor_stream->stream->side == MOS_SIDE_SVR) ?
307 				socket->monitor_stream->stream->rcvvar->rcvbuf :
308 				socket->monitor_stream->stream->pair_stream->rcvvar->rcvbuf;
309 			if (tcprb_resize_meta(rb, *(int *)optval) < 0)
310 				return -1;
311 			return tcprb_resize(rb,
312 					(((int)rb->metalen - 1) / UNITBUFSIZE + 1) * UNITBUFSIZE);
313 #endif
314 		case MOS_FRAG_CLIBUF:
315 #if 0
316 			if (socket->socktype != MOS_SOCK_MONITOR_STREAM_ACTIVE) {
317 				errno = EBADF;
318 				return -1;
319 			}
320 #endif
321 #ifdef DISABLE_DYN_RESIZE
322 			if (*(int *)optval != 0)
323 				return -1;
324 			if (socket->socktype == MOS_SOCK_MONITOR_STREAM_ACTIVE) {
325 				rb = (socket->monitor_stream->stream->side == MOS_SIDE_CLI) ?
326 					socket->monitor_stream->stream->rcvvar->rcvbuf :
327 					socket->monitor_stream->stream->pair_stream->rcvvar->rcvbuf;
328 				if (rb)
329 					tcprb_resize(rb, 0);
330 			}
331 			return 0;
332 #else
333 			rb = (socket->monitor_stream->stream->side == MOS_SIDE_CLI) ?
334 				socket->monitor_stream->stream->rcvvar->rcvbuf :
335 				socket->monitor_stream->stream->pair_stream->rcvvar->rcvbuf;
336 			if (rb->len == 0)
337 				return tcprb_resize_meta(rb, *(int *)optval);
338 			else
339 				return -1;
340 #endif
341 		case MOS_FRAG_SVRBUF:
342 #if 0
343 			if (socket->socktype != MOS_SOCK_MONITOR_STREAM_ACTIVE) {
344 				errno = EBADF;
345 				return -1;
346 			}
347 #endif
348 #ifdef DISABLE_DYN_RESIZE
349 			if (*(int *)optval != 0)
350 				return -1;
351 			if (socket->socktype == MOS_SOCK_MONITOR_STREAM_ACTIVE) {
352 				rb = (socket->monitor_stream->stream->side == MOS_SIDE_SVR) ?
353 					socket->monitor_stream->stream->rcvvar->rcvbuf :
354 					socket->monitor_stream->stream->pair_stream->rcvvar->rcvbuf;
355 				if (rb)
356 					tcprb_resize(rb, 0);
357 			}
358 			return 0;
359 #else
360 			rb = (socket->monitor_stream->stream->side == MOS_SIDE_SVR) ?
361 				socket->monitor_stream->stream->rcvvar->rcvbuf :
362 				socket->monitor_stream->stream->pair_stream->rcvvar->rcvbuf;
363 			if (rb->len == 0)
364 				return tcprb_resize_meta(rb, *(int *)optval);
365 			else
366 				return -1;
367 #endif
368 		case MOS_MONLEVEL:
369 #ifdef OLD_API
370 			assert(*(int *)optval == MOS_NO_CLIBUF ||
371 			       *(int *)optval == MOS_NO_SVRBUF);
372 			return DisableBuf(socket,
373 					  (*(int *)optval == MOS_NO_CLIBUF) ?
374 					  MOS_SIDE_CLI : MOS_SIDE_SVR);
375 #endif
376 		case MOS_SEQ_REMAP:
377 			return TcpSeqChange(socket,
378 					    (uint32_t)((seq_remap_info *)optval)->seq_off,
379 					    ((seq_remap_info *)optval)->side,
380 					    mtcp->pctx->p.seq);
381 		case MOS_STOP_MON:
382 			return mtcp_cb_stop(mctx, sockid, *(int *)optval);
383 		default:
384 			TRACE_API("invalid optname=%d\n", optname);
385 			assert(0);
386 		}
387 		break;
388 	}
389 
390 	errno = ENOSYS;
391 	return -1;
392 }
393 /*----------------------------------------------------------------------------*/
394 int
395 mtcp_setsock_nonblock(mctx_t mctx, int sockid)
396 {
397 	mtcp_manager_t mtcp;
398 
399 	mtcp = GetMTCPManager(mctx);
400 	if (!mtcp) {
401 		errno = EACCES;
402 		return -1;
403 	}
404 
405 	if (sockid < 0 || sockid >= g_config.mos->max_concurrency) {
406 		TRACE_API("Socket id %d out of range.\n", sockid);
407 		errno = EBADF;
408 		return -1;
409 	}
410 
411 	if (mtcp->smap[sockid].socktype == MOS_SOCK_UNUSED) {
412 		TRACE_API("Invalid socket id: %d\n", sockid);
413 		errno = EBADF;
414 		return -1;
415 	}
416 
417 	mtcp->smap[sockid].opts |= MTCP_NONBLOCK;
418 
419 	return 0;
420 }
421 /*----------------------------------------------------------------------------*/
422 int
423 mtcp_ioctl(mctx_t mctx, int sockid, int request, void *argp)
424 {
425 	mtcp_manager_t mtcp;
426 	socket_map_t socket;
427 
428 	mtcp = GetMTCPManager(mctx);
429 	if (!mtcp) {
430 		errno = EACCES;
431 		return -1;
432 	}
433 
434 	if (sockid < 0 || sockid >= g_config.mos->max_concurrency) {
435 		TRACE_API("Socket id %d out of range.\n", sockid);
436 		errno = EBADF;
437 		return -1;
438 	}
439 
440 	/* only support stream socket */
441 	socket = &mtcp->smap[sockid];
442 
443 	if (socket->socktype != MOS_SOCK_STREAM_LISTEN &&
444 		socket->socktype != MOS_SOCK_STREAM) {
445 		TRACE_API("Invalid socket id: %d\n", sockid);
446 		errno = EBADF;
447 		return -1;
448 	}
449 
450 	if (!argp) {
451 		errno = EFAULT;
452 		return -1;
453 	}
454 
455 	if (request == FIONREAD) {
456 		tcp_stream *cur_stream;
457 		tcprb_t *rbuf;
458 
459 		cur_stream = socket->stream;
460 		if (!cur_stream) {
461 			errno = EBADF;
462 			return -1;
463 		}
464 
465 		rbuf = cur_stream->rcvvar->rcvbuf;
466 		*(int *)argp = (rbuf) ? tcprb_cflen(rbuf) : 0;
467 
468 	} else if (request == FIONBIO) {
469 		/*
470 		 * sockets can only be set to blocking/non-blocking
471 		 * modes during initialization
472 		 */
473 		if ((*(int *)argp))
474 			mtcp->smap[sockid].opts |= MTCP_NONBLOCK;
475 		else
476 			mtcp->smap[sockid].opts &= ~MTCP_NONBLOCK;
477 	} else {
478 		errno = EINVAL;
479 		return -1;
480 	}
481 
482 	return 0;
483 }
484 /*----------------------------------------------------------------------------*/
485 static int
486 mtcp_monitor(mctx_t mctx, socket_map_t sock)
487 {
488 	mtcp_manager_t mtcp;
489 	struct mon_listener *monitor;
490 	int sockid = sock->id;
491 
492 	mtcp = GetMTCPManager(mctx);
493 	if (!mtcp) {
494 		errno = EACCES;
495 		return -1;
496 	}
497 
498 	if (sockid < 0 || sockid >= g_config.mos->max_concurrency) {
499 		TRACE_API("Socket id %d out of range.\n", sockid);
500 		errno = EBADF;
501 		return -1;
502 	}
503 
504 	if (mtcp->msmap[sockid].socktype == MOS_SOCK_UNUSED) {
505 		TRACE_API("Invalid socket id: %d\n", sockid);
506 		errno = EBADF;
507 		return -1;
508 	}
509 
510 	if (!(mtcp->msmap[sockid].socktype == MOS_SOCK_MONITOR_STREAM ||
511 	      mtcp->msmap[sockid].socktype == MOS_SOCK_MONITOR_RAW)) {
512 		TRACE_API("Not a monitor socket. id: %d\n", sockid);
513 		errno = ENOTSOCK;
514 		return -1;
515 	}
516 
517 	monitor = (struct mon_listener *)calloc(1, sizeof(struct mon_listener));
518 	if (!monitor) {
519 		/* errno set from the malloc() */
520 		errno = ENOMEM;
521 		return -1;
522 	}
523 
524 	/* create a monitor-specific event queue */
525 	monitor->eq = CreateEventQueue(g_config.mos->max_concurrency);
526 	if (!monitor->eq) {
527 		TRACE_API("Can't create event queue (concurrency: %d) for "
528 			  "monitor read event registrations!\n",
529 			  g_config.mos->max_concurrency);
530 		free(monitor);
531 		errno = ENOMEM;
532 		return -1;
533 	}
534 
535 	/* set monitor-related basic parameters */
536 #ifndef NEWEV
537 	monitor->ude_id = UDE_OFFSET;
538 #endif
539 	monitor->socket = sock;
540 	monitor->client_buf_mgmt = monitor->server_buf_mgmt = BUFMGMT_FULL;
541 
542 	/* perform both sides monitoring by default */
543 	monitor->client_mon = monitor->server_mon = 1;
544 
545 	/* add monitor socket to the monitor list */
546 	TAILQ_INSERT_TAIL(&mtcp->monitors, monitor, link);
547 
548 	mtcp->msmap[sockid].monitor_listener = monitor;
549 
550 	return 0;
551 }
552 /*----------------------------------------------------------------------------*/
553 int
554 mtcp_socket(mctx_t mctx, int domain, int type, int protocol)
555 {
556 	mtcp_manager_t mtcp;
557 	socket_map_t socket;
558 
559 	mtcp = GetMTCPManager(mctx);
560 	if (!mtcp) {
561 		errno = EACCES;
562 		return -1;
563 	}
564 
565 	if (domain != AF_INET) {
566 		errno = EAFNOSUPPORT;
567 		return -1;
568 	}
569 
570 	if (type == SOCK_STREAM) {
571 		type = MOS_SOCK_STREAM;
572 	} else if (type == MOS_SOCK_MONITOR_STREAM ||
573 		   type == MOS_SOCK_MONITOR_RAW) {
574 		/* do nothing for the time being */
575 	} else {
576 		/* Not supported type */
577 		errno = EINVAL;
578 		return -1;
579 	}
580 
581 	socket = AllocateSocket(mctx, type);
582 	if (!socket) {
583 		errno = ENFILE;
584 		return -1;
585 	}
586 
587 	if (type == MOS_SOCK_MONITOR_STREAM ||
588 	    type == MOS_SOCK_MONITOR_RAW) {
589 		mtcp_manager_t mtcp = GetMTCPManager(mctx);
590 		if (!mtcp) {
591 			errno = EACCES;
592 			return -1;
593 		}
594 		mtcp_monitor(mctx, socket);
595 #ifdef NEWEV
596 		socket->monitor_listener->stree_dontcare = NULL;
597 		socket->monitor_listener->stree_pre_rcv = NULL;
598 		socket->monitor_listener->stree_post_snd = NULL;
599 #else
600 		InitEvB(mtcp, &socket->monitor_listener->dontcare_evb);
601 		InitEvB(mtcp, &socket->monitor_listener->pre_tcp_evb);
602 		InitEvB(mtcp, &socket->monitor_listener->post_tcp_evb);
603 #endif
604 	}
605 
606 	return socket->id;
607 }
608 /*----------------------------------------------------------------------------*/
609 int
610 mtcp_bind(mctx_t mctx, int sockid,
611 		const struct sockaddr *addr, socklen_t addrlen)
612 {
613 	mtcp_manager_t mtcp;
614 	struct sockaddr_in *addr_in;
615 
616 	mtcp = GetMTCPManager(mctx);
617 	if (!mtcp) {
618 		errno = EACCES;
619 		return -1;
620 	}
621 
622 	if (sockid < 0 || sockid >= g_config.mos->max_concurrency) {
623 		TRACE_API("Socket id %d out of range.\n", sockid);
624 		errno = EBADF;
625 		return -1;
626 	}
627 
628 	if (mtcp->smap[sockid].socktype == MOS_SOCK_UNUSED) {
629 		TRACE_API("Invalid socket id: %d\n", sockid);
630 		errno = EBADF;
631 		return -1;
632 	}
633 
634 	if (mtcp->smap[sockid].socktype != MOS_SOCK_STREAM &&
635 			mtcp->smap[sockid].socktype != MOS_SOCK_STREAM_LISTEN) {
636 		TRACE_API("Not a stream socket id: %d\n", sockid);
637 		errno = ENOTSOCK;
638 		return -1;
639 	}
640 
641 	if (!addr) {
642 		TRACE_API("Socket %d: empty address!\n", sockid);
643 		errno = EINVAL;
644 		return -1;
645 	}
646 
647 	if (mtcp->smap[sockid].opts & MTCP_ADDR_BIND) {
648 		TRACE_API("Socket %d: adress already bind for this socket.\n", sockid);
649 		errno = EINVAL;
650 		return -1;
651 	}
652 
653 	/* we only allow bind() for AF_INET address */
654 	if (addr->sa_family != AF_INET || addrlen < sizeof(struct sockaddr_in)) {
655 		TRACE_API("Socket %d: invalid argument!\n", sockid);
656 		errno = EINVAL;
657 		return -1;
658 	}
659 
660 	if (mtcp->listener) {
661 		TRACE_API("Address already bound!\n");
662 		errno = EINVAL;
663 		return -1;
664 	}
665 	addr_in = (struct sockaddr_in *)addr;
666 	mtcp->smap[sockid].saddr = *addr_in;
667 	mtcp->smap[sockid].opts |= MTCP_ADDR_BIND;
668 
669 	return 0;
670 }
671 /*----------------------------------------------------------------------------*/
672 int
673 mtcp_listen(mctx_t mctx, int sockid, int backlog)
674 {
675 	mtcp_manager_t mtcp;
676 	struct tcp_listener *listener;
677 
678 	mtcp = GetMTCPManager(mctx);
679 	if (!mtcp) {
680 		errno = EACCES;
681 		return -1;
682 	}
683 
684 	if (sockid < 0 || sockid >= g_config.mos->max_concurrency) {
685 		TRACE_API("Socket id %d out of range.\n", sockid);
686 		errno = EBADF;
687 		return -1;
688 	}
689 
690 	if (mtcp->smap[sockid].socktype == MOS_SOCK_UNUSED) {
691 		TRACE_API("Invalid socket id: %d\n", sockid);
692 		errno = EBADF;
693 		return -1;
694 	}
695 
696 	if (mtcp->smap[sockid].socktype == MOS_SOCK_STREAM) {
697 		mtcp->smap[sockid].socktype = MOS_SOCK_STREAM_LISTEN;
698 	}
699 
700 	if (mtcp->smap[sockid].socktype != MOS_SOCK_STREAM_LISTEN) {
701 		TRACE_API("Not a listening socket. id: %d\n", sockid);
702 		errno = ENOTSOCK;
703 		return -1;
704 	}
705 
706 	if (backlog <= 0 || backlog > g_config.mos->max_concurrency) {
707 		errno = EINVAL;
708 		return -1;
709 	}
710 
711 	listener = (struct tcp_listener *)calloc(1, sizeof(struct tcp_listener));
712 	if (!listener) {
713 		/* errno set from the malloc() */
714 		errno = ENOMEM;
715 		return -1;
716 	}
717 
718 	listener->sockid = sockid;
719 	listener->backlog = backlog;
720 	listener->socket = &mtcp->smap[sockid];
721 
722 	if (pthread_cond_init(&listener->accept_cond, NULL)) {
723 		perror("pthread_cond_init of ctx->accept_cond\n");
724 		/* errno set by pthread_cond_init() */
725 		return -1;
726 	}
727 	if (pthread_mutex_init(&listener->accept_lock, NULL)) {
728 		perror("pthread_mutex_init of ctx->accept_lock\n");
729 		/* errno set by pthread_mutex_init() */
730 		return -1;
731 	}
732 
733 	listener->acceptq = CreateStreamQueue(backlog);
734 	if (!listener->acceptq) {
735 		errno = ENOMEM;
736 		return -1;
737 	}
738 
739 	mtcp->smap[sockid].listener = listener;
740 	mtcp->listener = listener;
741 
742 	return 0;
743 }
744 /*----------------------------------------------------------------------------*/
745 int
746 mtcp_accept(mctx_t mctx, int sockid, struct sockaddr *addr, socklen_t *addrlen)
747 {
748 	mtcp_manager_t mtcp;
749 	struct tcp_listener *listener;
750 	socket_map_t socket;
751 	tcp_stream *accepted = NULL;
752 
753 	mtcp = GetMTCPManager(mctx);
754 	if (!mtcp) {
755 		errno = EACCES;
756 		return -1;
757 	}
758 
759 	if (sockid < 0 || sockid >= g_config.mos->max_concurrency) {
760 		TRACE_API("Socket id %d out of range.\n", sockid);
761 		errno = EBADF;
762 		return -1;
763 	}
764 
765 	/* requires listening socket */
766 	if (mtcp->smap[sockid].socktype != MOS_SOCK_STREAM_LISTEN) {
767 		errno = EINVAL;
768 		return -1;
769 	}
770 
771 	listener = mtcp->smap[sockid].listener;
772 
773 	/* dequeue from the acceptq without lock first */
774 	/* if nothing there, acquire lock and cond_wait */
775 	accepted = StreamDequeue(listener->acceptq);
776 	if (!accepted) {
777 		if (listener->socket->opts & MTCP_NONBLOCK) {
778 			errno = EAGAIN;
779 			return -1;
780 
781 		} else {
782 			pthread_mutex_lock(&listener->accept_lock);
783 			while ((accepted = StreamDequeue(listener->acceptq)) == NULL) {
784 				pthread_cond_wait(&listener->accept_cond, &listener->accept_lock);
785 
786 				if (mtcp->ctx->done || mtcp->ctx->exit) {
787 					pthread_mutex_unlock(&listener->accept_lock);
788 					errno = EINTR;
789 					return -1;
790 				}
791 			}
792 			pthread_mutex_unlock(&listener->accept_lock);
793 		}
794 	}
795 
796 	if (!accepted) {
797 		TRACE_ERROR("[NEVER HAPPEN] Empty accept queue!\n");
798 	}
799 
800 	if (!accepted->socket) {
801 		socket = AllocateSocket(mctx, MOS_SOCK_STREAM);
802 		if (!socket) {
803 			TRACE_ERROR("Failed to create new socket!\n");
804 			/* TODO: destroy the stream */
805 			errno = ENFILE;
806 			return -1;
807 		}
808 		socket->stream = accepted;
809 		accepted->socket = socket;
810 		/* if monitor is enabled, complete the socket assignment */
811 		if (socket->stream->pair_stream != NULL)
812 			socket->stream->pair_stream->socket = socket;
813 	}
814 
815 	TRACE_API("Stream %d accepted.\n", accepted->id);
816 
817 	if (addr && addrlen) {
818 		struct sockaddr_in *addr_in = (struct sockaddr_in *)addr;
819 		addr_in->sin_family = AF_INET;
820 		addr_in->sin_port = accepted->dport;
821 		addr_in->sin_addr.s_addr = accepted->daddr;
822 		*addrlen = sizeof(struct sockaddr_in);
823 	}
824 
825 	return accepted->socket->id;
826 }
827 /*----------------------------------------------------------------------------*/
828 int
829 mtcp_init_rss(mctx_t mctx, in_addr_t saddr_base, int num_addr,
830 		in_addr_t daddr, in_addr_t dport)
831 {
832 	mtcp_manager_t mtcp;
833 	addr_pool_t ap;
834 
835 	mtcp = GetMTCPManager(mctx);
836 	if (!mtcp) {
837 		errno = EACCES;
838 		return -1;
839 	}
840 
841 	if (saddr_base == INADDR_ANY) {
842 		int nif_out;
843 
844 		/* for the INADDR_ANY, find the output interface for the destination
845 		   and set the saddr_base as the ip address of the output interface */
846 		nif_out = GetOutputInterface(daddr);
847 		saddr_base = g_config.mos->netdev_table->ent[nif_out]->ip_addr;
848 	}
849 
850 	ap = CreateAddressPoolPerCore(mctx->cpu, num_cpus,
851 			saddr_base, num_addr, daddr, dport);
852 	if (!ap) {
853 		errno = ENOMEM;
854 		return -1;
855 	}
856 
857 	mtcp->ap = ap;
858 
859 	return 0;
860 }
861 /*----------------------------------------------------------------------------*/
862 int
863 eval_bpf_5tuple(struct sfbpf_program fcode,
864 				in_addr_t saddr, in_port_t sport,
865 				in_addr_t daddr, in_port_t dport) {
866 	uint8_t buf[TOTAL_TCP_HEADER_LEN];
867 	struct ethhdr *ethh;
868 	struct iphdr *iph;
869 	struct tcphdr *tcph;
870 
871 	ethh = (struct ethhdr *)buf;
872 	ethh->h_proto = htons(ETH_P_IP);
873 	iph = (struct iphdr *)(ethh + 1);
874 	iph->ihl = IP_HEADER_LEN >> 2;
875 	iph->version = 4;
876 	iph->tos = 0;
877 	iph->tot_len = htons(IP_HEADER_LEN + TCP_HEADER_LEN);
878 	iph->id = htons(0);
879 	iph->protocol = IPPROTO_TCP;
880 	iph->saddr = saddr;
881 	iph->daddr = daddr;
882 	iph->check = 0;
883 	tcph = (struct tcphdr *)(iph + 1);
884 	tcph->source = sport;
885 	tcph->dest = dport;
886 
887 	return EVAL_BPFFILTER(fcode, (uint8_t *)iph - sizeof(struct ethhdr),
888 						 TOTAL_TCP_HEADER_LEN);
889 }
890 /*----------------------------------------------------------------------------*/
891 int
892 mtcp_connect(mctx_t mctx, int sockid,
893 		const struct sockaddr *addr, socklen_t addrlen)
894 {
895 	mtcp_manager_t mtcp;
896 	socket_map_t socket;
897 	tcp_stream *cur_stream;
898 	struct sockaddr_in *addr_in;
899 	in_addr_t dip;
900 	in_port_t dport;
901 	int is_dyn_bound = FALSE;
902 	int ret;
903 	int cnt_match = 0;
904 	struct mon_listener *walk;
905 	struct sfbpf_program fcode;
906 
907 	cur_stream = NULL;
908 	mtcp = GetMTCPManager(mctx);
909 	if (!mtcp) {
910 		errno = EACCES;
911 		return -1;
912 	}
913 
914 	if (sockid < 0 || sockid >= g_config.mos->max_concurrency) {
915 		TRACE_API("Socket id %d out of range.\n", sockid);
916 		errno = EBADF;
917 		return -1;
918 	}
919 
920 	if (mtcp->smap[sockid].socktype == MOS_SOCK_UNUSED) {
921 		TRACE_API("Invalid socket id: %d\n", sockid);
922 		errno = EBADF;
923 		return -1;
924 	}
925 
926 	if (mtcp->smap[sockid].socktype != MOS_SOCK_STREAM) {
927 		TRACE_API("Not an end socket. id: %d\n", sockid);
928 		errno = ENOTSOCK;
929 		return -1;
930 	}
931 
932 	if (!addr) {
933 		TRACE_API("Socket %d: empty address!\n", sockid);
934 		errno = EFAULT;
935 		return -1;
936 	}
937 
938 	/* we only allow bind() for AF_INET address */
939 	if (addr->sa_family != AF_INET || addrlen < sizeof(struct sockaddr_in)) {
940 		TRACE_API("Socket %d: invalid argument!\n", sockid);
941 		errno = EAFNOSUPPORT;
942 		return -1;
943 	}
944 
945 	socket = &mtcp->smap[sockid];
946 	if (socket->stream) {
947 		TRACE_API("Socket %d: stream already exist!\n", sockid);
948 		if (socket->stream->state >= TCP_ST_ESTABLISHED) {
949 			errno = EISCONN;
950 		} else {
951 			errno = EALREADY;
952 		}
953 		return -1;
954 	}
955 
956 	addr_in = (struct sockaddr_in *)addr;
957 	dip = addr_in->sin_addr.s_addr;
958 	dport = addr_in->sin_port;
959 
960 	/* address binding */
961 	if (socket->opts & MTCP_ADDR_BIND &&
962 	    socket->saddr.sin_port != INPORT_ANY &&
963 	    socket->saddr.sin_addr.s_addr != INADDR_ANY) {
964 		int rss_core;
965 
966 		rss_core = GetRSSCPUCore(socket->saddr.sin_addr.s_addr, dip,
967 				socket->saddr.sin_port, dport, num_queues);
968 
969 		if (rss_core != mctx->cpu) {
970 			errno = EINVAL;
971 			return -1;
972 		}
973 	} else {
974 		if (mtcp->ap) {
975 			ret = FetchAddress(mtcp->ap,
976 					mctx->cpu, num_queues, addr_in, &socket->saddr);
977 		} else {
978 			ret = FetchAddress(ap,
979 					mctx->cpu, num_queues, addr_in, &socket->saddr);
980 		}
981 		if (ret < 0) {
982 			errno = EAGAIN;
983 			return -1;
984 		}
985 		socket->opts |= MTCP_ADDR_BIND;
986 		is_dyn_bound = TRUE;
987 	}
988 
989 	cnt_match = 0;
990 	if (mtcp->num_msp > 0) {
991 		TAILQ_FOREACH(walk, &mtcp->monitors, link) {
992 			fcode = walk->stream_syn_fcode;
993 			if (!(ISSET_BPFFILTER(fcode) &&
994 				  eval_bpf_5tuple(fcode, socket->saddr.sin_addr.s_addr,
995 								  socket->saddr.sin_port,
996 								  dip, dport) == 0)) {
997 				walk->is_stream_syn_filter_hit = 1; // set the 'filter hit' flag to 1
998 				cnt_match++;
999 			}
1000 		}
1001 	}
1002 
1003 	if (mtcp->num_msp > 0 && cnt_match > 0) {
1004 		/* 150820 dhkim: XXX: embedded mode is not verified */
1005 #if 1
1006 		cur_stream = CreateClientTCPStream(mtcp, socket,
1007 						 STREAM_TYPE(MOS_SOCK_STREAM) |
1008 						 STREAM_TYPE(MOS_SOCK_MONITOR_STREAM_ACTIVE),
1009 						 socket->saddr.sin_addr.s_addr,
1010 						 socket->saddr.sin_port, dip, dport, NULL);
1011 #else
1012 		cur_stream = CreateDualTCPStream(mtcp, socket,
1013 						 STREAM_TYPE(MOS_SOCK_STREAM) |
1014 						 STREAM_TYPE(MOS_SOCK_MONITOR_STREAM_ACTIVE),
1015 						 socket->saddr.sin_addr.s_addr,
1016 						 socket->saddr.sin_port, dip, dport, NULL);
1017 #endif
1018 	}
1019 	else
1020 		cur_stream = CreateTCPStream(mtcp, socket, STREAM_TYPE(MOS_SOCK_STREAM),
1021 					     socket->saddr.sin_addr.s_addr,
1022 					     socket->saddr.sin_port, dip, dport, NULL);
1023 	if (!cur_stream) {
1024 		TRACE_ERROR("Socket %d: failed to create tcp_stream!\n", sockid);
1025 		errno = ENOMEM;
1026 		return -1;
1027 	}
1028 
1029 	if (is_dyn_bound)
1030 		cur_stream->is_bound_addr = TRUE;
1031 	cur_stream->sndvar->cwnd = 1;
1032 	cur_stream->sndvar->ssthresh = cur_stream->sndvar->mss * 10;
1033 	cur_stream->side = MOS_SIDE_CLI;
1034 	/* if monitor is enabled, update the pair stream side as well */
1035 	if (cur_stream->pair_stream) {
1036 		cur_stream->pair_stream->side = MOS_SIDE_SVR;
1037 		/*
1038 		 * if buffer management is off, then disable
1039 		 * monitoring tcp ring of server...
1040 		 * if there is even a single monitor asking for
1041 		 * buffer management, enable it (that's why the
1042 		 * need for the loop)
1043 		 */
1044 		cur_stream->pair_stream->buffer_mgmt = BUFMGMT_OFF;
1045 		struct socket_map *walk;
1046 		SOCKQ_FOREACH_START(walk, &cur_stream->msocks) {
1047 			uint8_t bm = walk->monitor_stream->monitor_listener->server_buf_mgmt;
1048 			if (bm > cur_stream->pair_stream->buffer_mgmt) {
1049 				cur_stream->pair_stream->buffer_mgmt = bm;
1050 				break;
1051 			}
1052 		} SOCKQ_FOREACH_END;
1053 	}
1054 
1055 	cur_stream->state = TCP_ST_SYN_SENT;
1056 	cur_stream->cb_events |= MOS_ON_TCP_STATE_CHANGE;
1057 
1058 	TRACE_STATE("Stream %d: TCP_ST_SYN_SENT\n", cur_stream->id);
1059 
1060 	SQ_LOCK(&mtcp->ctx->connect_lock);
1061 	ret = StreamEnqueue(mtcp->connectq, cur_stream);
1062 	SQ_UNLOCK(&mtcp->ctx->connect_lock);
1063 	mtcp->wakeup_flag = TRUE;
1064 	if (ret < 0) {
1065 		TRACE_ERROR("Socket %d: failed to enqueue to conenct queue!\n", sockid);
1066 		SQ_LOCK(&mtcp->ctx->destroyq_lock);
1067 		StreamEnqueue(mtcp->destroyq, cur_stream);
1068 		SQ_UNLOCK(&mtcp->ctx->destroyq_lock);
1069 		errno = EAGAIN;
1070 		return -1;
1071 	}
1072 
1073 	/* if nonblocking socket, return EINPROGRESS */
1074 	if (socket->opts & MTCP_NONBLOCK) {
1075 		errno = EINPROGRESS;
1076 		return -1;
1077 
1078 	} else {
1079 		while (1) {
1080 			if (!cur_stream) {
1081 				TRACE_ERROR("STREAM DESTROYED\n");
1082 				errno = ETIMEDOUT;
1083 				return -1;
1084 			}
1085 			if (cur_stream->state > TCP_ST_ESTABLISHED) {
1086 				TRACE_ERROR("Socket %d: weird state %s\n",
1087 						sockid, TCPStateToString(cur_stream));
1088 				// TODO: how to handle this?
1089 				errno = ENOSYS;
1090 				return -1;
1091 			}
1092 
1093 			if (cur_stream->state == TCP_ST_ESTABLISHED) {
1094 				break;
1095 			}
1096 			usleep(1000);
1097 		}
1098 	}
1099 
1100 	return 0;
1101 }
1102 /*----------------------------------------------------------------------------*/
1103 static inline int
1104 CloseStreamSocket(mctx_t mctx, int sockid)
1105 {
1106 	mtcp_manager_t mtcp;
1107 	tcp_stream *cur_stream;
1108 	int ret;
1109 
1110 	mtcp = GetMTCPManager(mctx);
1111 	if (!mtcp) {
1112 		errno = EACCES;
1113 		return -1;
1114 	}
1115 
1116 	cur_stream = mtcp->smap[sockid].stream;
1117 	if (!cur_stream) {
1118 		TRACE_API("Socket %d: stream does not exist.\n", sockid);
1119 		errno = ENOTCONN;
1120 		return -1;
1121 	}
1122 
1123 	if (cur_stream->closed) {
1124 		TRACE_API("Socket %d (Stream %u): already closed stream\n",
1125 				sockid, cur_stream->id);
1126 		return 0;
1127 	}
1128 	cur_stream->closed = TRUE;
1129 
1130 	TRACE_API("Stream %d: closing the stream.\n", cur_stream->id);
1131 
1132 	/* 141029 dhkim: Check this! */
1133 	cur_stream->socket = NULL;
1134 
1135 	if (cur_stream->state == TCP_ST_CLOSED_RSVD) {
1136 		TRACE_API("Stream %d at TCP_ST_CLOSED_RSVD. destroying the stream.\n",
1137 				cur_stream->id);
1138 		SQ_LOCK(&mtcp->ctx->destroyq_lock);
1139 		StreamEnqueue(mtcp->destroyq, cur_stream);
1140 		mtcp->wakeup_flag = TRUE;
1141 		SQ_UNLOCK(&mtcp->ctx->destroyq_lock);
1142 		return 0;
1143 
1144 	} else if (cur_stream->state == TCP_ST_SYN_SENT) {
1145 #if 1
1146 		SQ_LOCK(&mtcp->ctx->destroyq_lock);
1147 		StreamEnqueue(mtcp->destroyq, cur_stream);
1148 		SQ_UNLOCK(&mtcp->ctx->destroyq_lock);
1149 		mtcp->wakeup_flag = TRUE;
1150 #endif
1151 		return -1;
1152 
1153 	} else if (cur_stream->state != TCP_ST_ESTABLISHED &&
1154 			cur_stream->state != TCP_ST_CLOSE_WAIT) {
1155 		TRACE_API("Stream %d at state %s\n",
1156 				cur_stream->id, TCPStateToString(cur_stream));
1157 		errno = EBADF;
1158 		return -1;
1159 	}
1160 
1161 	SQ_LOCK(&mtcp->ctx->close_lock);
1162 	cur_stream->sndvar->on_closeq = TRUE;
1163 	ret = StreamEnqueue(mtcp->closeq, cur_stream);
1164 	mtcp->wakeup_flag = TRUE;
1165 	SQ_UNLOCK(&mtcp->ctx->close_lock);
1166 
1167 	if (ret < 0) {
1168 		TRACE_ERROR("(NEVER HAPPEN) Failed to enqueue the stream to close.\n");
1169 		errno = EAGAIN;
1170 		return -1;
1171 	}
1172 
1173 	return 0;
1174 }
1175 /*----------------------------------------------------------------------------*/
/*
 * Tear down the tcp_listener attached to a listening socket.
 * It destroys the accept queue, signals accept_cond, and destroys the
 * accept lock and condition variable. It then frees the listener and
 * clears the socket-map slot.
 * sockid is not range-checked here; mtcp_close() validates it first.
 * @return 0 on success, or -1 with errno EACCES (no mtcp manager) or
 *         EINVAL (no listener attached to sockid).
 */
static inline int
CloseListeningSocket(mctx_t mctx, int sockid)
{
	mtcp_manager_t mtcp;
	struct tcp_listener *listener;

	mtcp = GetMTCPManager(mctx);
	if (!mtcp) {
		errno = EACCES;
		return -1;
	}

	listener = mtcp->smap[sockid].listener;
	if (!listener) {
		errno = EINVAL;
		return -1;
	}

	if (listener->acceptq) {
		DestroyStreamQueue(listener->acceptq);
		listener->acceptq = NULL;
	}

	/* Wake a thread that may be blocked waiting on accept_cond.
	 * NOTE(review): pthread_cond_signal wakes at most one waiter, and the
	 * lock/condvar are destroyed and the listener freed immediately below.
	 * A woken waiter that touches `listener` afterwards would be a
	 * use-after-free. Confirm that no concurrent accept can be blocked here. */
	pthread_mutex_lock(&listener->accept_lock);
	pthread_cond_signal(&listener->accept_cond);
	pthread_mutex_unlock(&listener->accept_lock);

	pthread_cond_destroy(&listener->accept_cond);
	pthread_mutex_destroy(&listener->accept_lock);

	free(listener);
	/* clear the slot so later lookups see no listener */
	mtcp->smap[sockid].listener = NULL;

	return 0;
}
1211 /*----------------------------------------------------------------------------*/
1212 int
1213 mtcp_close(mctx_t mctx, int sockid)
1214 {
1215 	mtcp_manager_t mtcp;
1216 	int ret;
1217 
1218 	mtcp = GetMTCPManager(mctx);
1219 	if (!mtcp) {
1220 		errno = EACCES;
1221 		return -1;
1222 	}
1223 
1224 	if (sockid < 0 || sockid >= g_config.mos->max_concurrency) {
1225 		TRACE_API("Socket id %d out of range.\n", sockid);
1226 		errno = EBADF;
1227 		return -1;
1228 	}
1229 
1230 	if (mtcp->smap[sockid].socktype == MOS_SOCK_UNUSED) {
1231 		TRACE_API("Invalid socket id: %d\n", sockid);
1232 		errno = EBADF;
1233 		return -1;
1234 	}
1235 
1236 	TRACE_API("Socket %d: mtcp_close called.\n", sockid);
1237 
1238 	switch (mtcp->smap[sockid].socktype) {
1239 	case MOS_SOCK_STREAM:
1240 		ret = CloseStreamSocket(mctx, sockid);
1241 		break;
1242 
1243 	case MOS_SOCK_STREAM_LISTEN:
1244 		ret = CloseListeningSocket(mctx, sockid);
1245 		break;
1246 
1247 	case MOS_SOCK_EPOLL:
1248 		ret = CloseEpollSocket(mctx, sockid);
1249 		break;
1250 
1251 	case MOS_SOCK_PIPE:
1252 		ret = PipeClose(mctx, sockid);
1253 		break;
1254 
1255 	default:
1256 		errno = EINVAL;
1257 		ret = -1;
1258 		break;
1259 	}
1260 
1261 	FreeSocket(mctx, sockid, mtcp->smap[sockid].socktype);
1262 
1263 	return ret;
1264 }
1265 /*----------------------------------------------------------------------------*/
1266 int
1267 mtcp_abort(mctx_t mctx, int sockid)
1268 {
1269 	mtcp_manager_t mtcp;
1270 	tcp_stream *cur_stream;
1271 	int ret;
1272 
1273 	mtcp = GetMTCPManager(mctx);
1274 	if (!mtcp) {
1275 		errno = EACCES;
1276 		return -1;
1277 	}
1278 
1279 	if (sockid < 0 || sockid >= g_config.mos->max_concurrency) {
1280 		TRACE_API("Socket id %d out of range.\n", sockid);
1281 		errno = EBADF;
1282 		return -1;
1283 	}
1284 
1285 	if (mtcp->smap[sockid].socktype == MOS_SOCK_UNUSED) {
1286 		TRACE_API("Invalid socket id: %d\n", sockid);
1287 		errno = EBADF;
1288 		return -1;
1289 	}
1290 
1291 	if (mtcp->smap[sockid].socktype != MOS_SOCK_STREAM) {
1292 		TRACE_API("Not an end socket. id: %d\n", sockid);
1293 		errno = ENOTSOCK;
1294 		return -1;
1295 	}
1296 
1297 	cur_stream = mtcp->smap[sockid].stream;
1298 	if (!cur_stream) {
1299 		TRACE_API("Stream %d: does not exist.\n", sockid);
1300 		errno = ENOTCONN;
1301 		return -1;
1302 	}
1303 
1304 	TRACE_API("Socket %d: mtcp_abort()\n", sockid);
1305 
1306 	FreeSocket(mctx, sockid, mtcp->smap[sockid].socktype);
1307 	cur_stream->socket = NULL;
1308 
1309 	if (cur_stream->state == TCP_ST_CLOSED_RSVD) {
1310 		TRACE_API("Stream %d: connection already reset.\n", sockid);
1311 		return ERROR;
1312 
1313 	} else if (cur_stream->state == TCP_ST_SYN_SENT) {
1314 		/* TODO: this should notify event failure to all
1315 		   previous read() or write() calls */
1316 		cur_stream->state = TCP_ST_CLOSED_RSVD;
1317 		cur_stream->close_reason = TCP_ACTIVE_CLOSE;
1318 		cur_stream->cb_events |= MOS_ON_TCP_STATE_CHANGE;
1319 		SQ_LOCK(&mtcp->ctx->destroyq_lock);
1320 		StreamEnqueue(mtcp->destroyq, cur_stream);
1321 		SQ_UNLOCK(&mtcp->ctx->destroyq_lock);
1322 		mtcp->wakeup_flag = TRUE;
1323 		return 0;
1324 
1325 	} else if (cur_stream->state == TCP_ST_CLOSING ||
1326 			cur_stream->state == TCP_ST_LAST_ACK ||
1327 			cur_stream->state == TCP_ST_TIME_WAIT) {
1328 		cur_stream->state = TCP_ST_CLOSED_RSVD;
1329 		cur_stream->close_reason = TCP_ACTIVE_CLOSE;
1330 		cur_stream->cb_events |= MOS_ON_TCP_STATE_CHANGE;
1331 		SQ_LOCK(&mtcp->ctx->destroyq_lock);
1332 		StreamEnqueue(mtcp->destroyq, cur_stream);
1333 		SQ_UNLOCK(&mtcp->ctx->destroyq_lock);
1334 		mtcp->wakeup_flag = TRUE;
1335 		return 0;
1336 	}
1337 
1338 	/* the stream structure will be destroyed after sending RST */
1339 	if (cur_stream->sndvar->on_resetq) {
1340 		TRACE_ERROR("Stream %d: calling mtcp_abort() "
1341 				"when in reset queue.\n", sockid);
1342 		errno = ECONNRESET;
1343 		return -1;
1344 	}
1345 	SQ_LOCK(&mtcp->ctx->reset_lock);
1346 	cur_stream->sndvar->on_resetq = TRUE;
1347 	ret = StreamEnqueue(mtcp->resetq, cur_stream);
1348 	SQ_UNLOCK(&mtcp->ctx->reset_lock);
1349 	mtcp->wakeup_flag = TRUE;
1350 
1351 	if (ret < 0) {
1352 		TRACE_ERROR("(NEVER HAPPEN) Failed to enqueue the stream to close.\n");
1353 		errno = EAGAIN;
1354 		return -1;
1355 	}
1356 
1357 	return 0;
1358 }
1359 /*----------------------------------------------------------------------------*/
1360 static inline int
1361 CopyToUser(mtcp_manager_t mtcp, tcp_stream *cur_stream, char *buf, int len)
1362 {
1363 	struct tcp_recv_vars *rcvvar = cur_stream->rcvvar;
1364 	int copylen;
1365 	tcprb_t *rb = rcvvar->rcvbuf;
1366 	if ((copylen = tcprb_ppeek(rb, (uint8_t *)buf, len, rb->pile)) <= 0) {
1367 		errno = EAGAIN;
1368 		return -1;
1369 	}
1370 	tcprb_setpile(rb, rb->pile + copylen);
1371 
1372 	rcvvar->rcv_wnd = rb->len - tcprb_cflen(rb);
1373 	//printf("rcv_wnd: %d\n", rcvvar->rcv_wnd);
1374 
1375 	/* Advertise newly freed receive buffer */
1376 	if (cur_stream->need_wnd_adv) {
1377 		if (rcvvar->rcv_wnd > cur_stream->sndvar->eff_mss) {
1378 			if (!cur_stream->sndvar->on_ackq) {
1379 				SQ_LOCK(&mtcp->ctx->ackq_lock);
1380 				cur_stream->sndvar->on_ackq = TRUE;
1381 				StreamEnqueue(mtcp->ackq, cur_stream); /* this always success */
1382 				SQ_UNLOCK(&mtcp->ctx->ackq_lock);
1383 				cur_stream->need_wnd_adv = FALSE;
1384 				mtcp->wakeup_flag = TRUE;
1385 			}
1386 		}
1387 	}
1388 
1389 	return copylen;
1390 }
1391 /*----------------------------------------------------------------------------*/
1392 ssize_t
1393 mtcp_read(mctx_t mctx, int sockid, char *buf, size_t len)
1394 {
1395 	mtcp_manager_t mtcp;
1396 	socket_map_t socket;
1397 	tcp_stream *cur_stream;
1398 	struct tcp_recv_vars *rcvvar;
1399 	int event_remaining, merged_len;
1400 	int ret;
1401 
1402 	mtcp = GetMTCPManager(mctx);
1403 	if (!mtcp) {
1404 		errno = EACCES;
1405 		return -1;
1406 	}
1407 
1408 	if (sockid < 0 || sockid >= g_config.mos->max_concurrency) {
1409 		TRACE_API("Socket id %d out of range.\n", sockid);
1410 		errno = EBADF;
1411 		return -1;
1412 	}
1413 
1414 	socket = &mtcp->smap[sockid];
1415 	if (socket->socktype == MOS_SOCK_UNUSED) {
1416 		TRACE_API("Invalid socket id: %d\n", sockid);
1417 		errno = EBADF;
1418 		return -1;
1419 	}
1420 
1421 	if (socket->socktype == MOS_SOCK_PIPE) {
1422 		return PipeRead(mctx, sockid, buf, len);
1423 	}
1424 
1425 	if (socket->socktype != MOS_SOCK_STREAM) {
1426 		TRACE_API("Not an end socket. id: %d\n", sockid);
1427 		errno = ENOTSOCK;
1428 		return -1;
1429 	}
1430 
1431 	/* stream should be in ESTABLISHED, FIN_WAIT_1, FIN_WAIT_2, CLOSE_WAIT */
1432 	cur_stream = socket->stream;
1433 	if (!cur_stream || !cur_stream->rcvvar || !cur_stream->rcvvar->rcvbuf ||
1434 			!(cur_stream->state >= TCP_ST_ESTABLISHED &&
1435 			cur_stream->state <= TCP_ST_CLOSE_WAIT)) {
1436 		errno = ENOTCONN;
1437 		return -1;
1438 	}
1439 
1440 	rcvvar = cur_stream->rcvvar;
1441 
1442 	merged_len = tcprb_cflen(rcvvar->rcvbuf);
1443 
1444 	/* if CLOSE_WAIT, return 0 if there is no payload */
1445 	if (cur_stream->state == TCP_ST_CLOSE_WAIT) {
1446 		if (!rcvvar->rcvbuf)
1447 			return 0;
1448 
1449 		if (merged_len == 0)
1450 			return 0;
1451 	}
1452 
1453 	/* return EAGAIN if no receive buffer */
1454 	if (socket->opts & MTCP_NONBLOCK) {
1455 		if (!rcvvar->rcvbuf || merged_len == 0) {
1456 			errno = EAGAIN;
1457 			return -1;
1458 		}
1459 	}
1460 
1461 	SBUF_LOCK(&rcvvar->read_lock);
1462 
1463 	ret = CopyToUser(mtcp, cur_stream, buf, len);
1464 
1465 	merged_len = tcprb_cflen(rcvvar->rcvbuf);
1466 	event_remaining = FALSE;
1467 	/* if there are remaining payload, generate EPOLLIN */
1468 	/* (may due to insufficient user buffer) */
1469 	if (socket->epoll & MOS_EPOLLIN) {
1470 		if (!(socket->epoll & MOS_EPOLLET) && merged_len > 0) {
1471 			event_remaining = TRUE;
1472 		}
1473 	}
1474 	/* if waiting for close, notify it if no remaining data */
1475 	if (cur_stream->state == TCP_ST_CLOSE_WAIT &&
1476 			merged_len == 0 && ret > 0) {
1477 		event_remaining = TRUE;
1478 	}
1479 
1480 	SBUF_UNLOCK(&rcvvar->read_lock);
1481 
1482 	if (event_remaining) {
1483 		if (socket->epoll) {
1484 			AddEpollEvent(mtcp->ep,
1485 				      USR_SHADOW_EVENT_QUEUE, socket, MOS_EPOLLIN);
1486 		}
1487 	}
1488 
1489 	TRACE_API("Stream %d: mtcp_read() returning %d\n", cur_stream->id, ret);
1490 	return ret;
1491 }
1492 /*----------------------------------------------------------------------------*/
1493 ssize_t
1494 mtcp_readv(mctx_t mctx, int sockid, struct iovec *iov, int numIOV)
1495 {
1496 	mtcp_manager_t mtcp;
1497 	socket_map_t socket;
1498 	tcp_stream *cur_stream;
1499 	struct tcp_recv_vars *rcvvar;
1500 	int ret, bytes_read, i;
1501 	int event_remaining, merged_len;
1502 
1503 	mtcp = GetMTCPManager(mctx);
1504 	if (!mtcp) {
1505 		errno = EACCES;
1506 		return -1;
1507 	}
1508 
1509 	if (sockid < 0 || sockid >= g_config.mos->max_concurrency) {
1510 		TRACE_API("Socket id %d out of range.\n", sockid);
1511 		errno = EBADF;
1512 		return -1;
1513 	}
1514 
1515 	socket = &mtcp->smap[sockid];
1516 	if (socket->socktype == MOS_SOCK_UNUSED) {
1517 		TRACE_API("Invalid socket id: %d\n", sockid);
1518 		errno = EBADF;
1519 		return -1;
1520 	}
1521 
1522 	if (socket->socktype != MOS_SOCK_STREAM) {
1523 		TRACE_API("Not an end socket. id: %d\n", sockid);
1524 		errno = ENOTSOCK;
1525 		return -1;
1526 	}
1527 
1528 	/* stream should be in ESTABLISHED, FIN_WAIT_1, FIN_WAIT_2, CLOSE_WAIT */
1529 	cur_stream = socket->stream;
1530 	if (!cur_stream ||
1531 			!(cur_stream->state >= TCP_ST_ESTABLISHED &&
1532 			  cur_stream->state <= TCP_ST_CLOSE_WAIT)) {
1533 		errno = ENOTCONN;
1534 		return -1;
1535 	}
1536 
1537 	rcvvar = cur_stream->rcvvar;
1538 
1539 	merged_len = tcprb_cflen(rcvvar->rcvbuf);
1540 
1541 	/* if CLOSE_WAIT, return 0 if there is no payload */
1542 	if (cur_stream->state == TCP_ST_CLOSE_WAIT) {
1543 		if (!rcvvar->rcvbuf)
1544 			return 0;
1545 
1546 		if (merged_len == 0)
1547 			return 0;
1548 	}
1549 
1550 	/* return EAGAIN if no receive buffer */
1551 	if (socket->opts & MTCP_NONBLOCK) {
1552 		if (!rcvvar->rcvbuf || merged_len == 0) {
1553 			errno = EAGAIN;
1554 			return -1;
1555 		}
1556 	}
1557 
1558 	SBUF_LOCK(&rcvvar->read_lock);
1559 
1560 	/* read and store the contents to the vectored buffers */
1561 	bytes_read = 0;
1562 	for (i = 0; i < numIOV; i++) {
1563 		if (iov[i].iov_len <= 0)
1564 			continue;
1565 
1566 		ret = CopyToUser(mtcp, cur_stream, iov[i].iov_base, iov[i].iov_len);
1567 		if (ret <= 0)
1568 			break;
1569 
1570 		bytes_read += ret;
1571 
1572 		if (ret < iov[i].iov_len)
1573 			break;
1574 	}
1575 
1576 	merged_len = tcprb_cflen(rcvvar->rcvbuf);
1577 
1578 	event_remaining = FALSE;
1579 	/* if there are remaining payload, generate read event */
1580 	/* (may due to insufficient user buffer) */
1581 	if (socket->epoll & MOS_EPOLLIN) {
1582 		if (!(socket->epoll & MOS_EPOLLET) && merged_len > 0) {
1583 			event_remaining = TRUE;
1584 		}
1585 	}
1586 	/* if waiting for close, notify it if no remaining data */
1587 	if (cur_stream->state == TCP_ST_CLOSE_WAIT &&
1588 			merged_len == 0 && bytes_read > 0) {
1589 		event_remaining = TRUE;
1590 	}
1591 
1592 	SBUF_UNLOCK(&rcvvar->read_lock);
1593 
1594 	if(event_remaining) {
1595 		if (socket->epoll & MOS_EPOLLIN && !(socket->epoll & MOS_EPOLLET)) {
1596 			AddEpollEvent(mtcp->ep,
1597 				      USR_SHADOW_EVENT_QUEUE, socket, MOS_EPOLLIN);
1598 		}
1599 	}
1600 
1601 	TRACE_API("Stream %d: mtcp_readv() returning %d\n",
1602 			cur_stream->id, bytes_read);
1603 	return bytes_read;
1604 }
1605 /*----------------------------------------------------------------------------*/
1606 static inline int
1607 CopyFromUser(mtcp_manager_t mtcp, tcp_stream *cur_stream, char *buf, int len)
1608 {
1609 	struct tcp_send_vars *sndvar = cur_stream->sndvar;
1610 	int sndlen;
1611 	int ret;
1612 
1613 	sndlen = MIN((int)sndvar->snd_wnd, len);
1614 	if (sndlen <= 0) {
1615 		errno = EAGAIN;
1616 		return -1;
1617 	}
1618 
1619 	/* allocate send buffer if not exist */
1620 	if (!sndvar->sndbuf) {
1621 		sndvar->sndbuf = SBInit(mtcp->rbm_snd, sndvar->iss + 1);
1622 		if (!sndvar->sndbuf) {
1623 			cur_stream->close_reason = TCP_NO_MEM;
1624 			/* notification may not required due to -1 return */
1625 			errno = ENOMEM;
1626 			return -1;
1627 		}
1628 	}
1629 
1630 	ret = SBPut(mtcp->rbm_snd, sndvar->sndbuf, buf, sndlen);
1631 	assert(ret == sndlen);
1632 	sndvar->snd_wnd = sndvar->sndbuf->size - sndvar->sndbuf->len;
1633 	if (ret <= 0) {
1634 		TRACE_ERROR("SBPut failed. reason: %d (sndlen: %u, len: %u\n",
1635 				ret, sndlen, sndvar->sndbuf->len);
1636 		errno = EAGAIN;
1637 		return -1;
1638 	}
1639 
1640 	if (sndvar->snd_wnd <= 0) {
1641 		TRACE_SNDBUF("%u Sending buffer became full!! snd_wnd: %u\n",
1642 				cur_stream->id, sndvar->snd_wnd);
1643 	}
1644 
1645 	return ret;
1646 }
1647 /*----------------------------------------------------------------------------*/
1648 ssize_t
1649 mtcp_write(mctx_t mctx, int sockid, char *buf, size_t len)
1650 {
1651 	mtcp_manager_t mtcp;
1652 	socket_map_t socket;
1653 	tcp_stream *cur_stream;
1654 	struct tcp_send_vars *sndvar;
1655 	int ret;
1656 
1657 	mtcp = GetMTCPManager(mctx);
1658 	if (!mtcp) {
1659 		errno = EACCES;
1660 		return -1;
1661 	}
1662 
1663 	if (sockid < 0 || sockid >= g_config.mos->max_concurrency) {
1664 		TRACE_API("Socket id %d out of range.\n", sockid);
1665 		errno = EBADF;
1666 		return -1;
1667 	}
1668 
1669 	socket = &mtcp->smap[sockid];
1670 	if (socket->socktype == MOS_SOCK_UNUSED) {
1671 		TRACE_API("Invalid socket id: %d\n", sockid);
1672 		errno = EBADF;
1673 		return -1;
1674 	}
1675 
1676 	if (socket->socktype == MOS_SOCK_PIPE) {
1677 		return PipeWrite(mctx, sockid, buf, len);
1678 	}
1679 
1680 	if (socket->socktype != MOS_SOCK_STREAM) {
1681 		TRACE_API("Not an end socket. id: %d\n", sockid);
1682 		errno = ENOTSOCK;
1683 		return -1;
1684 	}
1685 
1686 	cur_stream = socket->stream;
1687 	if (!cur_stream ||
1688 			!(cur_stream->state == TCP_ST_ESTABLISHED ||
1689 			  cur_stream->state == TCP_ST_CLOSE_WAIT)) {
1690 		errno = ENOTCONN;
1691 		return -1;
1692 	}
1693 
1694 	if (len <= 0) {
1695 		if (socket->opts & MTCP_NONBLOCK) {
1696 			errno = EAGAIN;
1697 			return -1;
1698 		} else {
1699 			return 0;
1700 		}
1701 	}
1702 
1703 	sndvar = cur_stream->sndvar;
1704 
1705 	SBUF_LOCK(&sndvar->write_lock);
1706 	ret = CopyFromUser(mtcp, cur_stream, buf, len);
1707 
1708 	SBUF_UNLOCK(&sndvar->write_lock);
1709 
1710 	if (ret > 0 && !(sndvar->on_sendq || sndvar->on_send_list)) {
1711 		SQ_LOCK(&mtcp->ctx->sendq_lock);
1712 		sndvar->on_sendq = TRUE;
1713 		StreamEnqueue(mtcp->sendq, cur_stream);		/* this always success */
1714 		SQ_UNLOCK(&mtcp->ctx->sendq_lock);
1715 		mtcp->wakeup_flag = TRUE;
1716 	}
1717 
1718 	if (ret == 0 && (socket->opts & MTCP_NONBLOCK)) {
1719 		ret = -1;
1720 		errno = EAGAIN;
1721 	}
1722 
1723 	/* if there are remaining sending buffer, generate write event */
1724 	if (sndvar->snd_wnd > 0) {
1725 		if (socket->epoll & MOS_EPOLLOUT && !(socket->epoll & MOS_EPOLLET)) {
1726 			AddEpollEvent(mtcp->ep,
1727 				      USR_SHADOW_EVENT_QUEUE, socket, MOS_EPOLLOUT);
1728 		}
1729 	}
1730 
1731 	TRACE_API("Stream %d: mtcp_write() returning %d\n", cur_stream->id, ret);
1732 	return ret;
1733 }
1734 /*----------------------------------------------------------------------------*/
1735 ssize_t
1736 mtcp_writev(mctx_t mctx, int sockid, struct iovec *iov, int numIOV)
1737 {
1738 	mtcp_manager_t mtcp;
1739 	socket_map_t socket;
1740 	tcp_stream *cur_stream;
1741 	struct tcp_send_vars *sndvar;
1742 	int ret, to_write, i;
1743 
1744 	mtcp = GetMTCPManager(mctx);
1745 	if (!mtcp) {
1746 		errno = EACCES;
1747 		return -1;
1748 	}
1749 
1750 	if (sockid < 0 || sockid >= g_config.mos->max_concurrency) {
1751 		TRACE_API("Socket id %d out of range.\n", sockid);
1752 		errno = EBADF;
1753 		return -1;
1754 	}
1755 
1756 	socket = &mtcp->smap[sockid];
1757 	if (socket->socktype == MOS_SOCK_UNUSED) {
1758 		TRACE_API("Invalid socket id: %d\n", sockid);
1759 		errno = EBADF;
1760 		return -1;
1761 	}
1762 
1763 	if (socket->socktype != MOS_SOCK_STREAM) {
1764 		TRACE_API("Not an end socket. id: %d\n", sockid);
1765 		errno = ENOTSOCK;
1766 		return -1;
1767 	}
1768 
1769 	cur_stream = socket->stream;
1770 	if (!cur_stream ||
1771 			!(cur_stream->state == TCP_ST_ESTABLISHED ||
1772 			  cur_stream->state == TCP_ST_CLOSE_WAIT)) {
1773 		errno = ENOTCONN;
1774 		return -1;
1775 	}
1776 
1777 	sndvar = cur_stream->sndvar;
1778 	SBUF_LOCK(&sndvar->write_lock);
1779 
1780 	/* write from the vectored buffers */
1781 	to_write = 0;
1782 	for (i = 0; i < numIOV; i++) {
1783 		if (iov[i].iov_len <= 0)
1784 			continue;
1785 
1786 		ret = CopyFromUser(mtcp, cur_stream, iov[i].iov_base, iov[i].iov_len);
1787 		if (ret <= 0)
1788 			break;
1789 
1790 		to_write += ret;
1791 
1792 		if (ret < iov[i].iov_len)
1793 			break;
1794 	}
1795 	SBUF_UNLOCK(&sndvar->write_lock);
1796 
1797 	if (to_write > 0 && !(sndvar->on_sendq || sndvar->on_send_list)) {
1798 		SQ_LOCK(&mtcp->ctx->sendq_lock);
1799 		sndvar->on_sendq = TRUE;
1800 		StreamEnqueue(mtcp->sendq, cur_stream);		/* this always success */
1801 		SQ_UNLOCK(&mtcp->ctx->sendq_lock);
1802 		mtcp->wakeup_flag = TRUE;
1803 	}
1804 
1805 	if (to_write == 0 && (socket->opts & MTCP_NONBLOCK)) {
1806 		to_write = -1;
1807 		errno = EAGAIN;
1808 	}
1809 
1810 	/* if there are remaining sending buffer, generate write event */
1811 	if (sndvar->snd_wnd > 0) {
1812 		if (socket->epoll & MOS_EPOLLOUT && !(socket->epoll & MOS_EPOLLET)) {
1813 			AddEpollEvent(mtcp->ep,
1814 				      USR_SHADOW_EVENT_QUEUE, socket, MOS_EPOLLOUT);
1815 		}
1816 	}
1817 
1818 	TRACE_API("Stream %d: mtcp_writev() returning %d\n",
1819 			cur_stream->id, to_write);
1820 	return to_write;
1821 }
1822 /*----------------------------------------------------------------------------*/
1823 uint32_t
1824 mtcp_get_connection_cnt(mctx_t mctx)
1825 {
1826 	mtcp_manager_t mtcp;
1827 	mtcp = GetMTCPManager(mctx);
1828 	if (!mtcp) {
1829 		errno = EACCES;
1830 		return -1;
1831 	}
1832 
1833 	if (mtcp->num_msp > 0)
1834 		return mtcp->flow_cnt / 2;
1835 	else
1836 		return mtcp->flow_cnt;
1837 }
1838 /*----------------------------------------------------------------------------*/
1839