xref: /mOS-networking-stack/core/src/api.c (revision 265cb675)
1 #include <sys/queue.h>
2 #include <sys/ioctl.h>
3 #include <limits.h>
4 #include <unistd.h>
5 #include <assert.h>
6 #include <string.h>
7 
8 #ifdef DARWIN
9 #include <netinet/tcp.h>
10 #include <netinet/ip.h>
11 #include <netinet/if_ether.h>
12 #endif
13 
14 #include "mtcp.h"
15 #include "mtcp_api.h"
16 #include "tcp_in.h"
17 #include "tcp_stream.h"
18 #include "tcp_out.h"
19 #include "ip_out.h"
20 #include "eventpoll.h"
21 #include "pipe.h"
22 #include "fhash.h"
23 #include "addr_pool.h"
24 #include "rss.h"
25 #include "config.h"
26 #include "debug.h"
27 #include "eventpoll.h"
28 #include "mos_api.h"
29 #include "tcp_rb.h"
30 
/*
 * Generic max/min helpers. Each argument may be evaluated twice, so never
 * pass expressions with side effects. Guarded because system headers (e.g.
 * <sys/param.h>) may already provide equivalent macros.
 */
#ifndef MAX
#define MAX(a, b) ((a) > (b) ? (a) : (b))
#endif
#ifndef MIN
#define MIN(a, b) ((a) < (b) ? (a) : (b))
#endif
33 
34 /*----------------------------------------------------------------------------*/
35 /** Stop monitoring the socket! (function prototype)
36  * @param [in] mctx: mtcp context
37  * @param [in] sock: monitoring stream socket id
38  * @param [in] side: side of monitoring (client side, server side or both)
39  *
40  * This function is now DEPRECATED and is only used within mOS core...
41  */
42 int
43 mtcp_cb_stop(mctx_t mctx, int sock, int side);
44 /*----------------------------------------------------------------------------*/
45 /** Reset the connection (send RST packets to both sides)
46  *  (We need to decide the API for this.)
47  */
48 //int
49 //mtcp_cb_reset(mctx_t mctx, int sock, int side);
50 /*----------------------------------------------------------------------------*/
51 inline mtcp_manager_t
52 GetMTCPManager(mctx_t mctx)
53 {
54 	if (!mctx) {
55 		errno = EACCES;
56 		return NULL;
57 	}
58 
59 	if (mctx->cpu < 0 || mctx->cpu >= num_cpus) {
60 		errno = EINVAL;
61 		return NULL;
62 	}
63 
64 	if (g_mtcp[mctx->cpu]->ctx->done || g_mtcp[mctx->cpu]->ctx->exit) {
65 		errno = EPERM;
66 		return NULL;
67 	}
68 
69 	return g_mtcp[mctx->cpu];
70 }
71 /*----------------------------------------------------------------------------*/
72 inline int
73 GetSocketError(socket_map_t socket, void *optval, socklen_t *optlen)
74 {
75 	tcp_stream *cur_stream;
76 
77 	if (!socket->stream) {
78 		errno = EBADF;
79 		return -1;
80 	}
81 
82 	cur_stream = socket->stream;
83 	if (cur_stream->state == TCP_ST_CLOSED_RSVD) {
84 		if (cur_stream->close_reason == TCP_TIMEDOUT ||
85 				cur_stream->close_reason == TCP_CONN_FAIL ||
86 				cur_stream->close_reason == TCP_CONN_LOST) {
87 			*(int *)optval = ETIMEDOUT;
88 			*optlen = sizeof(int);
89 
90 			return 0;
91 		}
92 	}
93 
94 	if (cur_stream->state == TCP_ST_CLOSE_WAIT ||
95 			cur_stream->state == TCP_ST_CLOSED_RSVD) {
96 		if (cur_stream->close_reason == TCP_RESET) {
97 			*(int *)optval = ECONNRESET;
98 			*optlen = sizeof(int);
99 
100 			return 0;
101 		}
102 	}
103 
104 	/*
105 	 * `base case`: If socket sees no so_error, then
106 	 * this also means close_reason will always be
107 	 * TCP_NOT_CLOSED.
108 	 */
109 	if (cur_stream->close_reason == TCP_NOT_CLOSED) {
110 		*(int *)optval = 0;
111 		*optlen = sizeof(int);
112 
113 		return 0;
114 	}
115 
116 	errno = ENOSYS;
117 	return -1;
118 }
119 /*----------------------------------------------------------------------------*/
120 int
121 mtcp_getsockopt(mctx_t mctx, int sockid, int level,
122 		int optname, void *optval, socklen_t *optlen)
123 {
124 	mtcp_manager_t mtcp;
125 	socket_map_t socket;
126 
127 	mtcp = GetMTCPManager(mctx);
128 	if (!mtcp) {
129 		errno = EACCES;
130 		return -1;
131 	}
132 
133 	if (sockid < 0 || sockid >= g_config.mos->max_concurrency) {
134 		TRACE_API("Socket id %d out of range.\n", sockid);
135 		errno = EBADF;
136 		return -1;
137 	}
138 
139 	switch (level) {
140 	case SOL_SOCKET:
141 		socket = &mtcp->smap[sockid];
142 		if (socket->socktype == MOS_SOCK_UNUSED) {
143 			TRACE_API("Invalid socket id: %d\n", sockid);
144 			errno = EBADF;
145 			return -1;
146 		}
147 
148 		if (socket->socktype != MOS_SOCK_STREAM_LISTEN &&
149 		    socket->socktype != MOS_SOCK_STREAM) {
150 			TRACE_API("Invalid socket id: %d\n", sockid);
151 			errno = ENOTSOCK;
152 			return -1;
153 		}
154 
155 		if (optname == SO_ERROR) {
156 			if (socket->socktype == MOS_SOCK_STREAM) {
157 				return GetSocketError(socket, optval, optlen);
158 			}
159 		}
160 		break;
161 	case SOL_MONSOCKET:
162 		/* check if the calling thread is in MOS context */
163 		if (mtcp->ctx->thread != pthread_self()) {
164 			errno = EPERM;
165 			return -1;
166 		}
167 		/*
168 		 * All options will only work for active
169 		 * monitor stream sockets
170 		 */
171 		socket = &mtcp->msmap[sockid];
172 		if (socket->socktype != MOS_SOCK_MONITOR_STREAM_ACTIVE) {
173 			TRACE_API("Invalid socket id: %d\n", sockid);
174 			errno = ENOTSOCK;
175 			return -1;
176 		}
177 
178 		switch (optname) {
179 		case MOS_FRAGINFO_CLIBUF:
180 			return GetFragInfo(socket, MOS_SIDE_CLI, optval, optlen);
181 		case MOS_FRAGINFO_SVRBUF:
182 			return GetFragInfo(socket, MOS_SIDE_SVR, optval, optlen);
183 		case MOS_INFO_CLIBUF:
184 			return GetBufInfo(socket, MOS_SIDE_CLI, optval, optlen);
185 		case MOS_INFO_SVRBUF:
186 			return GetBufInfo(socket, MOS_SIDE_SVR, optval, optlen);
187 		case MOS_TCP_STATE_CLI:
188 			return GetTCPState(socket->monitor_stream->stream, MOS_SIDE_CLI,
189 							   optval, optlen);
190 		case MOS_TCP_STATE_SVR:
191 			return GetTCPState(socket->monitor_stream->stream, MOS_SIDE_SVR,
192 							   optval, optlen);
193 		case MOS_TIMESTAMP:
194 			return GetLastTimestamp(socket->monitor_stream->stream,
195 						(uint32_t *)optval,
196 						optlen);
197 		default:
198 		  TRACE_API("can't recognize the optname=%d\n", optname);
199 		  assert(0);
200 		}
201 		break;
202 	}
203 	errno = ENOSYS;
204 	return -1;
205 }
206 /*----------------------------------------------------------------------------*/
207 int
208 mtcp_setsockopt(mctx_t mctx, int sockid, int level,
209 		int optname, const void *optval, socklen_t optlen)
210 {
211 	mtcp_manager_t mtcp;
212 	socket_map_t socket;
213 	tcprb_t *rb;
214 
215 	mtcp = GetMTCPManager(mctx);
216 	if (!mtcp) {
217 		errno = EACCES;
218 		return -1;
219 	}
220 
221 	if (sockid < 0 || sockid >= g_config.mos->max_concurrency) {
222 		TRACE_API("Socket id %d out of range.\n", sockid);
223 		errno = EBADF;
224 		return -1;
225 	}
226 
227 	switch (level) {
228 	case SOL_SOCKET:
229 		socket = &mtcp->smap[sockid];
230 		if (socket->socktype == MOS_SOCK_UNUSED) {
231 			TRACE_API("Invalid socket id: %d\n", sockid);
232 			errno = EBADF;
233 			return -1;
234 		}
235 
236 		if (socket->socktype != MOS_SOCK_STREAM_LISTEN &&
237 		    socket->socktype != MOS_SOCK_STREAM) {
238 			TRACE_API("Invalid socket id: %d\n", sockid);
239 			errno = ENOTSOCK;
240 			return -1;
241 		}
242 		break;
243 	case SOL_MONSOCKET:
244 		socket = &mtcp->msmap[sockid];
245 		/*
246 		 * checking of calling thread to be in MOS context is
247 		 * disabled since both optnames can be called from
248 		 * `application' context (on passive sockets)
249 		 */
250 		/*
251 		 * if (mtcp->ctx->thread != pthread_self())
252 		 * return -1;
253 		 */
254 
255 		switch (optname) {
256 		case MOS_CLIBUF:
257 #if 0
258 			if (socket->socktype != MOS_SOCK_MONITOR_STREAM_ACTIVE) {
259 				errno = EBADF;
260 				return -1;
261 			}
262 #endif
263 #ifdef NEWRB
264 #ifdef DISABLE_DYN_RESIZE
265 			if (*(int *)optval != 0)
266 				return -1;
267 			if (socket->socktype == MOS_SOCK_MONITOR_STREAM_ACTIVE) {
268 				rb = (socket->monitor_stream->stream->side == MOS_SIDE_CLI) ?
269 					socket->monitor_stream->stream->rcvvar->rcvbuf :
270 					socket->monitor_stream->stream->pair_stream->rcvvar->rcvbuf;
271 				if (rb) {
272 					tcprb_resize_meta(rb, 0);
273 					tcprb_resize(rb, 0);
274 				}
275 			}
276 			return DisableBuf(socket, MOS_SIDE_CLI);
277 #else
278 			rb = (socket->monitor_stream->stream->side == MOS_SIDE_CLI) ?
279 				socket->monitor_stream->stream->rcvvar->rcvbuf :
280 				socket->monitor_stream->stream->pair_stream->rcvvar->rcvbuf;
281 			if (tcprb_resize_meta(rb, *(int *)optval) < 0)
282 				return -1;
283 			return tcprb_resize(rb,
284 					(((int)rb->metalen - 1) / UNITBUFSIZE + 1) * UNITBUFSIZE);
285 #endif
286 #else
287 			errno = EBADF;
288 			return -1;
289 #endif
290 		case MOS_SVRBUF:
291 #if 0
292 			if (socket->socktype != MOS_SOCK_MONITOR_STREAM_ACTIVE) {
293 				errno = EBADF;
294 				return -1;
295 			}
296 #endif
297 #ifdef NEWRB
298 #ifdef DISABLE_DYN_RESIZE
299 			if (*(int *)optval != 0)
300 				return -1;
301 			if (socket->socktype == MOS_SOCK_MONITOR_STREAM_ACTIVE) {
302 				rb = (socket->monitor_stream->stream->side == MOS_SIDE_SVR) ?
303 					socket->monitor_stream->stream->rcvvar->rcvbuf :
304 					socket->monitor_stream->stream->pair_stream->rcvvar->rcvbuf;
305 				if (rb) {
306 					tcprb_resize_meta(rb, 0);
307 					tcprb_resize(rb, 0);
308 				}
309 			}
310 			return DisableBuf(socket, MOS_SIDE_SVR);
311 #else
312 			rb = (socket->monitor_stream->stream->side == MOS_SIDE_SVR) ?
313 				socket->monitor_stream->stream->rcvvar->rcvbuf :
314 				socket->monitor_stream->stream->pair_stream->rcvvar->rcvbuf;
315 			if (tcprb_resize_meta(rb, *(int *)optval) < 0)
316 				return -1;
317 			return tcprb_resize(rb,
318 					(((int)rb->metalen - 1) / UNITBUFSIZE + 1) * UNITBUFSIZE);
319 #endif
320 #else
321 			errno = EBADF;
322 			return -1;
323 #endif
324 		case MOS_FRAG_CLIBUF:
325 #if 0
326 			if (socket->socktype != MOS_SOCK_MONITOR_STREAM_ACTIVE) {
327 				errno = EBADF;
328 				return -1;
329 			}
330 #endif
331 #ifdef NEWRB
332 #ifdef DISABLE_DYN_RESIZE
333 			if (*(int *)optval != 0)
334 				return -1;
335 			if (socket->socktype == MOS_SOCK_MONITOR_STREAM_ACTIVE) {
336 				rb = (socket->monitor_stream->stream->side == MOS_SIDE_CLI) ?
337 					socket->monitor_stream->stream->rcvvar->rcvbuf :
338 					socket->monitor_stream->stream->pair_stream->rcvvar->rcvbuf;
339 				if (rb)
340 					tcprb_resize(rb, 0);
341 			}
342 			return 0;
343 #else
344 			rb = (socket->monitor_stream->stream->side == MOS_SIDE_CLI) ?
345 				socket->monitor_stream->stream->rcvvar->rcvbuf :
346 				socket->monitor_stream->stream->pair_stream->rcvvar->rcvbuf;
347 			if (rb->len == 0)
348 				return tcprb_resize_meta(rb, *(int *)optval);
349 			else
350 				return -1;
351 #endif
352 #else
353 			errno = EBADF;
354 			return -1;
355 #endif
356 		case MOS_FRAG_SVRBUF:
357 #if 0
358 			if (socket->socktype != MOS_SOCK_MONITOR_STREAM_ACTIVE) {
359 				errno = EBADF;
360 				return -1;
361 			}
362 #endif
363 #ifdef NEWRB
364 #ifdef DISABLE_DYN_RESIZE
365 			if (*(int *)optval != 0)
366 				return -1;
367 			if (socket->socktype == MOS_SOCK_MONITOR_STREAM_ACTIVE) {
368 				rb = (socket->monitor_stream->stream->side == MOS_SIDE_SVR) ?
369 					socket->monitor_stream->stream->rcvvar->rcvbuf :
370 					socket->monitor_stream->stream->pair_stream->rcvvar->rcvbuf;
371 				if (rb)
372 					tcprb_resize(rb, 0);
373 			}
374 			return 0;
375 #else
376 			rb = (socket->monitor_stream->stream->side == MOS_SIDE_SVR) ?
377 				socket->monitor_stream->stream->rcvvar->rcvbuf :
378 				socket->monitor_stream->stream->pair_stream->rcvvar->rcvbuf;
379 			if (rb->len == 0)
380 				return tcprb_resize_meta(rb, *(int *)optval);
381 			else
382 				return -1;
383 #endif
384 #else
385 			errno = EBADF;
386 			return -1;
387 #endif
388 		case MOS_MONLEVEL:
389 #ifdef OLD_API
390 			assert(*(int *)optval == MOS_NO_CLIBUF ||
391 			       *(int *)optval == MOS_NO_SVRBUF);
392 			return DisableBuf(socket,
393 					  (*(int *)optval == MOS_NO_CLIBUF) ?
394 					  MOS_SIDE_CLI : MOS_SIDE_SVR);
395 #endif
396 		case MOS_SEQ_REMAP:
397 			return TcpSeqChange(socket,
398 					    (uint32_t)((seq_remap_info *)optval)->seq_off,
399 					    ((seq_remap_info *)optval)->side,
400 					    mtcp->pctx->p.seq);
401 		case MOS_STOP_MON:
402 			return mtcp_cb_stop(mctx, sockid, *(int *)optval);
403 		default:
404 			TRACE_API("invalid optname=%d\n", optname);
405 			assert(0);
406 		}
407 		break;
408 	}
409 
410 	errno = ENOSYS;
411 	return -1;
412 }
413 /*----------------------------------------------------------------------------*/
414 int
415 mtcp_setsock_nonblock(mctx_t mctx, int sockid)
416 {
417 	mtcp_manager_t mtcp;
418 
419 	mtcp = GetMTCPManager(mctx);
420 	if (!mtcp) {
421 		errno = EACCES;
422 		return -1;
423 	}
424 
425 	if (sockid < 0 || sockid >= g_config.mos->max_concurrency) {
426 		TRACE_API("Socket id %d out of range.\n", sockid);
427 		errno = EBADF;
428 		return -1;
429 	}
430 
431 	if (mtcp->smap[sockid].socktype == MOS_SOCK_UNUSED) {
432 		TRACE_API("Invalid socket id: %d\n", sockid);
433 		errno = EBADF;
434 		return -1;
435 	}
436 
437 	mtcp->smap[sockid].opts |= MTCP_NONBLOCK;
438 
439 	return 0;
440 }
441 /*----------------------------------------------------------------------------*/
442 int
443 mtcp_ioctl(mctx_t mctx, int sockid, int request, void *argp)
444 {
445 	mtcp_manager_t mtcp;
446 	socket_map_t socket;
447 
448 	mtcp = GetMTCPManager(mctx);
449 	if (!mtcp) {
450 		errno = EACCES;
451 		return -1;
452 	}
453 
454 	if (sockid < 0 || sockid >= g_config.mos->max_concurrency) {
455 		TRACE_API("Socket id %d out of range.\n", sockid);
456 		errno = EBADF;
457 		return -1;
458 	}
459 
460 	/* only support stream socket */
461 	socket = &mtcp->smap[sockid];
462 
463 	if (socket->socktype != MOS_SOCK_STREAM_LISTEN &&
464 		socket->socktype != MOS_SOCK_STREAM) {
465 		TRACE_API("Invalid socket id: %d\n", sockid);
466 		errno = EBADF;
467 		return -1;
468 	}
469 
470 	if (!argp) {
471 		errno = EFAULT;
472 		return -1;
473 	}
474 
475 	if (request == FIONREAD) {
476 		tcp_stream *cur_stream;
477 #ifdef NEWRB
478 		tcprb_t *rbuf;
479 #else
480 		struct tcp_ring_buffer *rbuf;
481 #endif
482 
483 		cur_stream = socket->stream;
484 		if (!cur_stream) {
485 			errno = EBADF;
486 			return -1;
487 		}
488 
489 		rbuf = cur_stream->rcvvar->rcvbuf;
490 #ifdef NEWRB
491 		*(int *)argp = (rbuf) ? tcprb_cflen(rbuf) : 0;
492 #else
493 		*(int *)argp = (rbuf) ? rbuf->merged_len : 0;
494 #endif
495 
496 	} else if (request == FIONBIO) {
497 		/*
498 		 * sockets can only be set to blocking/non-blocking
499 		 * modes during initialization
500 		 */
501 		if ((*(int *)argp))
502 			mtcp->smap[sockid].opts |= MTCP_NONBLOCK;
503 		else
504 			mtcp->smap[sockid].opts &= ~MTCP_NONBLOCK;
505 	} else {
506 		errno = EINVAL;
507 		return -1;
508 	}
509 
510 	return 0;
511 }
512 /*----------------------------------------------------------------------------*/
513 static int
514 mtcp_monitor(mctx_t mctx, socket_map_t sock)
515 {
516 	mtcp_manager_t mtcp;
517 	struct mon_listener *monitor;
518 	int sockid = sock->id;
519 
520 	mtcp = GetMTCPManager(mctx);
521 	if (!mtcp) {
522 		errno = EACCES;
523 		return -1;
524 	}
525 
526 	if (sockid < 0 || sockid >= g_config.mos->max_concurrency) {
527 		TRACE_API("Socket id %d out of range.\n", sockid);
528 		errno = EBADF;
529 		return -1;
530 	}
531 
532 	if (mtcp->msmap[sockid].socktype == MOS_SOCK_UNUSED) {
533 		TRACE_API("Invalid socket id: %d\n", sockid);
534 		errno = EBADF;
535 		return -1;
536 	}
537 
538 	if (!(mtcp->msmap[sockid].socktype == MOS_SOCK_MONITOR_STREAM ||
539 	      mtcp->msmap[sockid].socktype == MOS_SOCK_MONITOR_RAW)) {
540 		TRACE_API("Not a monitor socket. id: %d\n", sockid);
541 		errno = ENOTSOCK;
542 		return -1;
543 	}
544 
545 	monitor = (struct mon_listener *)calloc(1, sizeof(struct mon_listener));
546 	if (!monitor) {
547 		/* errno set from the malloc() */
548 		errno = ENOMEM;
549 		return -1;
550 	}
551 
552 	/* create a monitor-specific event queue */
553 	monitor->eq = CreateEventQueue(g_config.mos->max_concurrency);
554 	if (!monitor->eq) {
555 		TRACE_API("Can't create event queue (concurrency: %d) for "
556 			  "monitor read event registrations!\n",
557 			  g_config.mos->max_concurrency);
558 		free(monitor);
559 		errno = ENOMEM;
560 		return -1;
561 	}
562 
563 	/* set monitor-related basic parameters */
564 #ifndef NEWEV
565 	monitor->ude_id = UDE_OFFSET;
566 #endif
567 	monitor->socket = sock;
568 #ifdef NEWRB
569 	monitor->client_buf_mgmt = monitor->server_buf_mgmt = BUFMGMT_FULL;
570 #else
571 	monitor->client_buf_mgmt = monitor->server_buf_mgmt = 1;
572 #endif
573 
574 	/* perform both sides monitoring by default */
575 	monitor->client_mon = monitor->server_mon = 1;
576 
577 	/* add monitor socket to the monitor list */
578 	TAILQ_INSERT_TAIL(&mtcp->monitors, monitor, link);
579 
580 	mtcp->msmap[sockid].monitor_listener = monitor;
581 
582 	return 0;
583 }
584 /*----------------------------------------------------------------------------*/
585 int
586 mtcp_socket(mctx_t mctx, int domain, int type, int protocol)
587 {
588 	mtcp_manager_t mtcp;
589 	socket_map_t socket;
590 
591 	mtcp = GetMTCPManager(mctx);
592 	if (!mtcp) {
593 		errno = EACCES;
594 		return -1;
595 	}
596 
597 	if (domain != AF_INET) {
598 		errno = EAFNOSUPPORT;
599 		return -1;
600 	}
601 
602 	if (type == SOCK_STREAM) {
603 		type = MOS_SOCK_STREAM;
604 	} else if (type == MOS_SOCK_MONITOR_STREAM ||
605 		   type == MOS_SOCK_MONITOR_RAW) {
606 		/* do nothing for the time being */
607 	} else {
608 		/* Not supported type */
609 		errno = EINVAL;
610 		return -1;
611 	}
612 
613 	socket = AllocateSocket(mctx, type);
614 	if (!socket) {
615 		errno = ENFILE;
616 		return -1;
617 	}
618 
619 	if (type == MOS_SOCK_MONITOR_STREAM ||
620 	    type == MOS_SOCK_MONITOR_RAW) {
621 		mtcp_manager_t mtcp = GetMTCPManager(mctx);
622 		if (!mtcp) {
623 			errno = EACCES;
624 			return -1;
625 		}
626 		mtcp_monitor(mctx, socket);
627 #ifdef NEWEV
628 		socket->monitor_listener->stree_dontcare = NULL;
629 		socket->monitor_listener->stree_pre_rcv = NULL;
630 		socket->monitor_listener->stree_post_snd = NULL;
631 #else
632 		InitEvB(mtcp, &socket->monitor_listener->dontcare_evb);
633 		InitEvB(mtcp, &socket->monitor_listener->pre_tcp_evb);
634 		InitEvB(mtcp, &socket->monitor_listener->post_tcp_evb);
635 #endif
636 	}
637 
638 	return socket->id;
639 }
640 /*----------------------------------------------------------------------------*/
641 int
642 mtcp_bind(mctx_t mctx, int sockid,
643 		const struct sockaddr *addr, socklen_t addrlen)
644 {
645 	mtcp_manager_t mtcp;
646 	struct sockaddr_in *addr_in;
647 
648 	mtcp = GetMTCPManager(mctx);
649 	if (!mtcp) {
650 		errno = EACCES;
651 		return -1;
652 	}
653 
654 	if (sockid < 0 || sockid >= g_config.mos->max_concurrency) {
655 		TRACE_API("Socket id %d out of range.\n", sockid);
656 		errno = EBADF;
657 		return -1;
658 	}
659 
660 	if (mtcp->smap[sockid].socktype == MOS_SOCK_UNUSED) {
661 		TRACE_API("Invalid socket id: %d\n", sockid);
662 		errno = EBADF;
663 		return -1;
664 	}
665 
666 	if (mtcp->smap[sockid].socktype != MOS_SOCK_STREAM &&
667 			mtcp->smap[sockid].socktype != MOS_SOCK_STREAM_LISTEN) {
668 		TRACE_API("Not a stream socket id: %d\n", sockid);
669 		errno = ENOTSOCK;
670 		return -1;
671 	}
672 
673 	if (!addr) {
674 		TRACE_API("Socket %d: empty address!\n", sockid);
675 		errno = EINVAL;
676 		return -1;
677 	}
678 
679 	if (mtcp->smap[sockid].opts & MTCP_ADDR_BIND) {
680 		TRACE_API("Socket %d: adress already bind for this socket.\n", sockid);
681 		errno = EINVAL;
682 		return -1;
683 	}
684 
685 	/* we only allow bind() for AF_INET address */
686 	if (addr->sa_family != AF_INET || addrlen < sizeof(struct sockaddr_in)) {
687 		TRACE_API("Socket %d: invalid argument!\n", sockid);
688 		errno = EINVAL;
689 		return -1;
690 	}
691 
692 	if (mtcp->listener) {
693 		TRACE_API("Address already bound!\n");
694 		errno = EINVAL;
695 		return -1;
696 	}
697 	addr_in = (struct sockaddr_in *)addr;
698 	mtcp->smap[sockid].saddr = *addr_in;
699 	mtcp->smap[sockid].opts |= MTCP_ADDR_BIND;
700 
701 	return 0;
702 }
703 /*----------------------------------------------------------------------------*/
704 int
705 mtcp_listen(mctx_t mctx, int sockid, int backlog)
706 {
707 	mtcp_manager_t mtcp;
708 	struct tcp_listener *listener;
709 
710 	mtcp = GetMTCPManager(mctx);
711 	if (!mtcp) {
712 		errno = EACCES;
713 		return -1;
714 	}
715 
716 	if (sockid < 0 || sockid >= g_config.mos->max_concurrency) {
717 		TRACE_API("Socket id %d out of range.\n", sockid);
718 		errno = EBADF;
719 		return -1;
720 	}
721 
722 	if (mtcp->smap[sockid].socktype == MOS_SOCK_UNUSED) {
723 		TRACE_API("Invalid socket id: %d\n", sockid);
724 		errno = EBADF;
725 		return -1;
726 	}
727 
728 	if (mtcp->smap[sockid].socktype == MOS_SOCK_STREAM) {
729 		mtcp->smap[sockid].socktype = MOS_SOCK_STREAM_LISTEN;
730 	}
731 
732 	if (mtcp->smap[sockid].socktype != MOS_SOCK_STREAM_LISTEN) {
733 		TRACE_API("Not a listening socket. id: %d\n", sockid);
734 		errno = ENOTSOCK;
735 		return -1;
736 	}
737 
738 	if (backlog <= 0 || backlog > g_config.mos->max_concurrency) {
739 		errno = EINVAL;
740 		return -1;
741 	}
742 
743 	listener = (struct tcp_listener *)calloc(1, sizeof(struct tcp_listener));
744 	if (!listener) {
745 		/* errno set from the malloc() */
746 		errno = ENOMEM;
747 		return -1;
748 	}
749 
750 	listener->sockid = sockid;
751 	listener->backlog = backlog;
752 	listener->socket = &mtcp->smap[sockid];
753 
754 	if (pthread_cond_init(&listener->accept_cond, NULL)) {
755 		perror("pthread_cond_init of ctx->accept_cond\n");
756 		/* errno set by pthread_cond_init() */
757 		return -1;
758 	}
759 	if (pthread_mutex_init(&listener->accept_lock, NULL)) {
760 		perror("pthread_mutex_init of ctx->accept_lock\n");
761 		/* errno set by pthread_mutex_init() */
762 		return -1;
763 	}
764 
765 	listener->acceptq = CreateStreamQueue(backlog);
766 	if (!listener->acceptq) {
767 		errno = ENOMEM;
768 		return -1;
769 	}
770 
771 	mtcp->smap[sockid].listener = listener;
772 	mtcp->listener = listener;
773 
774 	return 0;
775 }
776 /*----------------------------------------------------------------------------*/
777 int
778 mtcp_accept(mctx_t mctx, int sockid, struct sockaddr *addr, socklen_t *addrlen)
779 {
780 	mtcp_manager_t mtcp;
781 	struct tcp_listener *listener;
782 	socket_map_t socket;
783 	tcp_stream *accepted = NULL;
784 
785 	mtcp = GetMTCPManager(mctx);
786 	if (!mtcp) {
787 		errno = EACCES;
788 		return -1;
789 	}
790 
791 	if (sockid < 0 || sockid >= g_config.mos->max_concurrency) {
792 		TRACE_API("Socket id %d out of range.\n", sockid);
793 		errno = EBADF;
794 		return -1;
795 	}
796 
797 	/* requires listening socket */
798 	if (mtcp->smap[sockid].socktype != MOS_SOCK_STREAM_LISTEN) {
799 		errno = EINVAL;
800 		return -1;
801 	}
802 
803 	listener = mtcp->smap[sockid].listener;
804 
805 	/* dequeue from the acceptq without lock first */
806 	/* if nothing there, acquire lock and cond_wait */
807 	accepted = StreamDequeue(listener->acceptq);
808 	if (!accepted) {
809 		if (listener->socket->opts & MTCP_NONBLOCK) {
810 			errno = EAGAIN;
811 			return -1;
812 
813 		} else {
814 			pthread_mutex_lock(&listener->accept_lock);
815 			while ((accepted = StreamDequeue(listener->acceptq)) == NULL) {
816 				pthread_cond_wait(&listener->accept_cond, &listener->accept_lock);
817 
818 				if (mtcp->ctx->done || mtcp->ctx->exit) {
819 					pthread_mutex_unlock(&listener->accept_lock);
820 					errno = EINTR;
821 					return -1;
822 				}
823 			}
824 			pthread_mutex_unlock(&listener->accept_lock);
825 		}
826 	}
827 
828 	if (!accepted) {
829 		TRACE_ERROR("[NEVER HAPPEN] Empty accept queue!\n");
830 	}
831 
832 	if (!accepted->socket) {
833 		socket = AllocateSocket(mctx, MOS_SOCK_STREAM);
834 		if (!socket) {
835 			TRACE_ERROR("Failed to create new socket!\n");
836 			/* TODO: destroy the stream */
837 			errno = ENFILE;
838 			return -1;
839 		}
840 		socket->stream = accepted;
841 		accepted->socket = socket;
842 		/* if monitor is enabled, complete the socket assignment */
843 		if (socket->stream->pair_stream != NULL)
844 			socket->stream->pair_stream->socket = socket;
845 	}
846 
847 	TRACE_API("Stream %d accepted.\n", accepted->id);
848 
849 	if (addr && addrlen) {
850 		struct sockaddr_in *addr_in = (struct sockaddr_in *)addr;
851 		addr_in->sin_family = AF_INET;
852 		addr_in->sin_port = accepted->dport;
853 		addr_in->sin_addr.s_addr = accepted->daddr;
854 		*addrlen = sizeof(struct sockaddr_in);
855 	}
856 
857 	return accepted->socket->id;
858 }
859 /*----------------------------------------------------------------------------*/
860 int
861 mtcp_init_rss(mctx_t mctx, in_addr_t saddr_base, int num_addr,
862 		in_addr_t daddr, in_addr_t dport)
863 {
864 	mtcp_manager_t mtcp;
865 	addr_pool_t ap;
866 
867 	mtcp = GetMTCPManager(mctx);
868 	if (!mtcp) {
869 		errno = EACCES;
870 		return -1;
871 	}
872 
873 	if (saddr_base == INADDR_ANY) {
874 		int nif_out;
875 
876 		/* for the INADDR_ANY, find the output interface for the destination
877 		   and set the saddr_base as the ip address of the output interface */
878 		nif_out = GetOutputInterface(daddr);
879 		saddr_base = g_config.mos->netdev_table->ent[nif_out]->ip_addr;
880 	}
881 
882 	ap = CreateAddressPoolPerCore(mctx->cpu, num_cpus,
883 			saddr_base, num_addr, daddr, dport);
884 	if (!ap) {
885 		errno = ENOMEM;
886 		return -1;
887 	}
888 
889 	mtcp->ap = ap;
890 
891 	return 0;
892 }
893 /*----------------------------------------------------------------------------*/
894 int
895 eval_bpf_5tuple(struct sfbpf_program fcode,
896 				in_addr_t saddr, in_port_t sport,
897 				in_addr_t daddr, in_port_t dport) {
898 	uint8_t buf[TOTAL_TCP_HEADER_LEN];
899 	struct ethhdr *ethh;
900 	struct iphdr *iph;
901 	struct tcphdr *tcph;
902 
903 	ethh = (struct ethhdr *)buf;
904 	ethh->h_proto = htons(ETH_P_IP);
905 	iph = (struct iphdr *)(ethh + 1);
906 	iph->ihl = IP_HEADER_LEN >> 2;
907 	iph->version = 4;
908 	iph->tos = 0;
909 	iph->tot_len = htons(IP_HEADER_LEN + TCP_HEADER_LEN);
910 	iph->id = htons(0);
911 	iph->protocol = IPPROTO_TCP;
912 	iph->saddr = saddr;
913 	iph->daddr = daddr;
914 	iph->check = 0;
915 	tcph = (struct tcphdr *)(iph + 1);
916 	tcph->source = sport;
917 	tcph->dest = dport;
918 
919 	return EVAL_BPFFILTER(fcode, (uint8_t *)iph - sizeof(struct ethhdr),
920 						 TOTAL_TCP_HEADER_LEN);
921 }
922 /*----------------------------------------------------------------------------*/
923 int
924 mtcp_connect(mctx_t mctx, int sockid,
925 		const struct sockaddr *addr, socklen_t addrlen)
926 {
927 	mtcp_manager_t mtcp;
928 	socket_map_t socket;
929 	tcp_stream *cur_stream;
930 	struct sockaddr_in *addr_in;
931 	in_addr_t dip;
932 	in_port_t dport;
933 	int is_dyn_bound = FALSE;
934 	int ret;
935 	int cnt_match = 0;
936 	struct mon_listener *walk;
937 	struct sfbpf_program fcode;
938 
939 	cur_stream = NULL;
940 	mtcp = GetMTCPManager(mctx);
941 	if (!mtcp) {
942 		errno = EACCES;
943 		return -1;
944 	}
945 
946 	if (sockid < 0 || sockid >= g_config.mos->max_concurrency) {
947 		TRACE_API("Socket id %d out of range.\n", sockid);
948 		errno = EBADF;
949 		return -1;
950 	}
951 
952 	if (mtcp->smap[sockid].socktype == MOS_SOCK_UNUSED) {
953 		TRACE_API("Invalid socket id: %d\n", sockid);
954 		errno = EBADF;
955 		return -1;
956 	}
957 
958 	if (mtcp->smap[sockid].socktype != MOS_SOCK_STREAM) {
959 		TRACE_API("Not an end socket. id: %d\n", sockid);
960 		errno = ENOTSOCK;
961 		return -1;
962 	}
963 
964 	if (!addr) {
965 		TRACE_API("Socket %d: empty address!\n", sockid);
966 		errno = EFAULT;
967 		return -1;
968 	}
969 
970 	/* we only allow bind() for AF_INET address */
971 	if (addr->sa_family != AF_INET || addrlen < sizeof(struct sockaddr_in)) {
972 		TRACE_API("Socket %d: invalid argument!\n", sockid);
973 		errno = EAFNOSUPPORT;
974 		return -1;
975 	}
976 
977 	socket = &mtcp->smap[sockid];
978 	if (socket->stream) {
979 		TRACE_API("Socket %d: stream already exist!\n", sockid);
980 		if (socket->stream->state >= TCP_ST_ESTABLISHED) {
981 			errno = EISCONN;
982 		} else {
983 			errno = EALREADY;
984 		}
985 		return -1;
986 	}
987 
988 	addr_in = (struct sockaddr_in *)addr;
989 	dip = addr_in->sin_addr.s_addr;
990 	dport = addr_in->sin_port;
991 
992 	/* address binding */
993 	if (socket->opts & MTCP_ADDR_BIND &&
994 	    socket->saddr.sin_port != INPORT_ANY &&
995 	    socket->saddr.sin_addr.s_addr != INADDR_ANY) {
996 		int rss_core;
997 
998 		rss_core = GetRSSCPUCore(socket->saddr.sin_addr.s_addr, dip,
999 				socket->saddr.sin_port, dport, num_queues);
1000 
1001 		if (rss_core != mctx->cpu) {
1002 			errno = EINVAL;
1003 			return -1;
1004 		}
1005 	} else {
1006 		if (mtcp->ap) {
1007 			ret = FetchAddress(mtcp->ap,
1008 					mctx->cpu, num_queues, addr_in, &socket->saddr);
1009 		} else {
1010 			ret = FetchAddress(ap,
1011 					mctx->cpu, num_queues, addr_in, &socket->saddr);
1012 		}
1013 		if (ret < 0) {
1014 			errno = EAGAIN;
1015 			return -1;
1016 		}
1017 		socket->opts |= MTCP_ADDR_BIND;
1018 		is_dyn_bound = TRUE;
1019 	}
1020 
1021 	cnt_match = 0;
1022 	if (mtcp->num_msp > 0) {
1023 		TAILQ_FOREACH(walk, &mtcp->monitors, link) {
1024 			fcode = walk->stream_syn_fcode;
1025 			if (!(ISSET_BPFFILTER(fcode) &&
1026 				  eval_bpf_5tuple(fcode, socket->saddr.sin_addr.s_addr,
1027 								  socket->saddr.sin_port,
1028 								  dip, dport) == 0)) {
1029 				walk->is_stream_syn_filter_hit = 1; // set the 'filter hit' flag to 1
1030 				cnt_match++;
1031 			}
1032 		}
1033 	}
1034 
1035 	if (mtcp->num_msp > 0 && cnt_match > 0) {
1036 		/* 150820 dhkim: XXX: embedded mode is not verified */
1037 #if 1
1038 		cur_stream = CreateClientTCPStream(mtcp, socket,
1039 						 STREAM_TYPE(MOS_SOCK_STREAM) |
1040 						 STREAM_TYPE(MOS_SOCK_MONITOR_STREAM_ACTIVE),
1041 						 socket->saddr.sin_addr.s_addr,
1042 						 socket->saddr.sin_port, dip, dport, NULL);
1043 #else
1044 		cur_stream = CreateDualTCPStream(mtcp, socket,
1045 						 STREAM_TYPE(MOS_SOCK_STREAM) |
1046 						 STREAM_TYPE(MOS_SOCK_MONITOR_STREAM_ACTIVE),
1047 						 socket->saddr.sin_addr.s_addr,
1048 						 socket->saddr.sin_port, dip, dport, NULL);
1049 #endif
1050 	}
1051 	else
1052 		cur_stream = CreateTCPStream(mtcp, socket, STREAM_TYPE(MOS_SOCK_STREAM),
1053 					     socket->saddr.sin_addr.s_addr,
1054 					     socket->saddr.sin_port, dip, dport, NULL);
1055 	if (!cur_stream) {
1056 		TRACE_ERROR("Socket %d: failed to create tcp_stream!\n", sockid);
1057 		errno = ENOMEM;
1058 		return -1;
1059 	}
1060 
1061 	if (is_dyn_bound)
1062 		cur_stream->is_bound_addr = TRUE;
1063 	cur_stream->sndvar->cwnd = 1;
1064 	cur_stream->sndvar->ssthresh = cur_stream->sndvar->mss * 10;
1065 	cur_stream->side = MOS_SIDE_CLI;
1066 	/* if monitor is enabled, update the pair stream side as well */
1067 	if (cur_stream->pair_stream) {
1068 		cur_stream->pair_stream->side = MOS_SIDE_SVR;
1069 		/*
1070 		 * if buffer management is off, then disable
1071 		 * monitoring tcp ring of server...
1072 		 * if there is even a single monitor asking for
1073 		 * buffer management, enable it (that's why the
1074 		 * need for the loop)
1075 		 */
1076 		cur_stream->pair_stream->buffer_mgmt = BUFMGMT_OFF;
1077 		struct socket_map *walk;
1078 		SOCKQ_FOREACH_START(walk, &cur_stream->msocks) {
1079 			uint8_t bm = walk->monitor_stream->monitor_listener->server_buf_mgmt;
1080 			if (bm > cur_stream->pair_stream->buffer_mgmt) {
1081 				cur_stream->pair_stream->buffer_mgmt = bm;
1082 				break;
1083 			}
1084 		} SOCKQ_FOREACH_END;
1085 	}
1086 
1087 	cur_stream->state = TCP_ST_SYN_SENT;
1088 	cur_stream->cb_events |= MOS_ON_TCP_STATE_CHANGE;
1089 
1090 	TRACE_STATE("Stream %d: TCP_ST_SYN_SENT\n", cur_stream->id);
1091 
1092 	SQ_LOCK(&mtcp->ctx->connect_lock);
1093 	ret = StreamEnqueue(mtcp->connectq, cur_stream);
1094 	SQ_UNLOCK(&mtcp->ctx->connect_lock);
1095 	mtcp->wakeup_flag = TRUE;
1096 	if (ret < 0) {
1097 		TRACE_ERROR("Socket %d: failed to enqueue to conenct queue!\n", sockid);
1098 		SQ_LOCK(&mtcp->ctx->destroyq_lock);
1099 		StreamEnqueue(mtcp->destroyq, cur_stream);
1100 		SQ_UNLOCK(&mtcp->ctx->destroyq_lock);
1101 		errno = EAGAIN;
1102 		return -1;
1103 	}
1104 
1105 	/* if nonblocking socket, return EINPROGRESS */
1106 	if (socket->opts & MTCP_NONBLOCK) {
1107 		errno = EINPROGRESS;
1108 		return -1;
1109 
1110 	} else {
1111 		while (1) {
1112 			if (!cur_stream) {
1113 				TRACE_ERROR("STREAM DESTROYED\n");
1114 				errno = ETIMEDOUT;
1115 				return -1;
1116 			}
1117 			if (cur_stream->state > TCP_ST_ESTABLISHED) {
1118 				TRACE_ERROR("Socket %d: weird state %s\n",
1119 						sockid, TCPStateToString(cur_stream));
1120 				// TODO: how to handle this?
1121 				errno = ENOSYS;
1122 				return -1;
1123 			}
1124 
1125 			if (cur_stream->state == TCP_ST_ESTABLISHED) {
1126 				break;
1127 			}
1128 			usleep(1000);
1129 		}
1130 	}
1131 
1132 	return 0;
1133 }
1134 /*----------------------------------------------------------------------------*/
1135 static inline int
1136 CloseStreamSocket(mctx_t mctx, int sockid)
1137 {
1138 	mtcp_manager_t mtcp;
1139 	tcp_stream *cur_stream;
1140 	int ret;
1141 
1142 	mtcp = GetMTCPManager(mctx);
1143 	if (!mtcp) {
1144 		errno = EACCES;
1145 		return -1;
1146 	}
1147 
1148 	cur_stream = mtcp->smap[sockid].stream;
1149 	if (!cur_stream) {
1150 		TRACE_API("Socket %d: stream does not exist.\n", sockid);
1151 		errno = ENOTCONN;
1152 		return -1;
1153 	}
1154 
1155 	if (cur_stream->closed) {
1156 		TRACE_API("Socket %d (Stream %u): already closed stream\n",
1157 				sockid, cur_stream->id);
1158 		return 0;
1159 	}
1160 	cur_stream->closed = TRUE;
1161 
1162 	TRACE_API("Stream %d: closing the stream.\n", cur_stream->id);
1163 
1164 	/* 141029 dhkim: Check this! */
1165 	cur_stream->socket = NULL;
1166 
1167 	if (cur_stream->state == TCP_ST_CLOSED_RSVD) {
1168 		TRACE_API("Stream %d at TCP_ST_CLOSED_RSVD. destroying the stream.\n",
1169 				cur_stream->id);
1170 		SQ_LOCK(&mtcp->ctx->destroyq_lock);
1171 		StreamEnqueue(mtcp->destroyq, cur_stream);
1172 		mtcp->wakeup_flag = TRUE;
1173 		SQ_UNLOCK(&mtcp->ctx->destroyq_lock);
1174 		return 0;
1175 
1176 	} else if (cur_stream->state == TCP_ST_SYN_SENT) {
1177 #if 1
1178 		SQ_LOCK(&mtcp->ctx->destroyq_lock);
1179 		StreamEnqueue(mtcp->destroyq, cur_stream);
1180 		SQ_UNLOCK(&mtcp->ctx->destroyq_lock);
1181 		mtcp->wakeup_flag = TRUE;
1182 #endif
1183 		return -1;
1184 
1185 	} else if (cur_stream->state != TCP_ST_ESTABLISHED &&
1186 			cur_stream->state != TCP_ST_CLOSE_WAIT) {
1187 		TRACE_API("Stream %d at state %s\n",
1188 				cur_stream->id, TCPStateToString(cur_stream));
1189 		errno = EBADF;
1190 		return -1;
1191 	}
1192 
1193 	SQ_LOCK(&mtcp->ctx->close_lock);
1194 	cur_stream->sndvar->on_closeq = TRUE;
1195 	ret = StreamEnqueue(mtcp->closeq, cur_stream);
1196 	mtcp->wakeup_flag = TRUE;
1197 	SQ_UNLOCK(&mtcp->ctx->close_lock);
1198 
1199 	if (ret < 0) {
1200 		TRACE_ERROR("(NEVER HAPPEN) Failed to enqueue the stream to close.\n");
1201 		errno = EAGAIN;
1202 		return -1;
1203 	}
1204 
1205 	return 0;
1206 }
1207 /*----------------------------------------------------------------------------*/
1208 static inline int
1209 CloseListeningSocket(mctx_t mctx, int sockid)
1210 {
1211 	mtcp_manager_t mtcp;
1212 	struct tcp_listener *listener;
1213 
1214 	mtcp = GetMTCPManager(mctx);
1215 	if (!mtcp) {
1216 		errno = EACCES;
1217 		return -1;
1218 	}
1219 
1220 	listener = mtcp->smap[sockid].listener;
1221 	if (!listener) {
1222 		errno = EINVAL;
1223 		return -1;
1224 	}
1225 
1226 	if (listener->acceptq) {
1227 		DestroyStreamQueue(listener->acceptq);
1228 		listener->acceptq = NULL;
1229 	}
1230 
1231 	pthread_mutex_lock(&listener->accept_lock);
1232 	pthread_cond_signal(&listener->accept_cond);
1233 	pthread_mutex_unlock(&listener->accept_lock);
1234 
1235 	pthread_cond_destroy(&listener->accept_cond);
1236 	pthread_mutex_destroy(&listener->accept_lock);
1237 
1238 	free(listener);
1239 	mtcp->smap[sockid].listener = NULL;
1240 
1241 	return 0;
1242 }
1243 /*----------------------------------------------------------------------------*/
1244 int
1245 mtcp_close(mctx_t mctx, int sockid)
1246 {
1247 	mtcp_manager_t mtcp;
1248 	int ret;
1249 
1250 	mtcp = GetMTCPManager(mctx);
1251 	if (!mtcp) {
1252 		errno = EACCES;
1253 		return -1;
1254 	}
1255 
1256 	if (sockid < 0 || sockid >= g_config.mos->max_concurrency) {
1257 		TRACE_API("Socket id %d out of range.\n", sockid);
1258 		errno = EBADF;
1259 		return -1;
1260 	}
1261 
1262 	if (mtcp->smap[sockid].socktype == MOS_SOCK_UNUSED) {
1263 		TRACE_API("Invalid socket id: %d\n", sockid);
1264 		errno = EBADF;
1265 		return -1;
1266 	}
1267 
1268 	TRACE_API("Socket %d: mtcp_close called.\n", sockid);
1269 
1270 	switch (mtcp->smap[sockid].socktype) {
1271 	case MOS_SOCK_STREAM:
1272 		ret = CloseStreamSocket(mctx, sockid);
1273 		break;
1274 
1275 	case MOS_SOCK_STREAM_LISTEN:
1276 		ret = CloseListeningSocket(mctx, sockid);
1277 		break;
1278 
1279 	case MOS_SOCK_EPOLL:
1280 		ret = CloseEpollSocket(mctx, sockid);
1281 		break;
1282 
1283 	case MOS_SOCK_PIPE:
1284 		ret = PipeClose(mctx, sockid);
1285 		break;
1286 
1287 	default:
1288 		errno = EINVAL;
1289 		ret = -1;
1290 		break;
1291 	}
1292 
1293 	FreeSocket(mctx, sockid, mtcp->smap[sockid].socktype);
1294 
1295 	return ret;
1296 }
1297 /*----------------------------------------------------------------------------*/
1298 int
1299 mtcp_abort(mctx_t mctx, int sockid)
1300 {
1301 	mtcp_manager_t mtcp;
1302 	tcp_stream *cur_stream;
1303 	int ret;
1304 
1305 	mtcp = GetMTCPManager(mctx);
1306 	if (!mtcp) {
1307 		errno = EACCES;
1308 		return -1;
1309 	}
1310 
1311 	if (sockid < 0 || sockid >= g_config.mos->max_concurrency) {
1312 		TRACE_API("Socket id %d out of range.\n", sockid);
1313 		errno = EBADF;
1314 		return -1;
1315 	}
1316 
1317 	if (mtcp->smap[sockid].socktype == MOS_SOCK_UNUSED) {
1318 		TRACE_API("Invalid socket id: %d\n", sockid);
1319 		errno = EBADF;
1320 		return -1;
1321 	}
1322 
1323 	if (mtcp->smap[sockid].socktype != MOS_SOCK_STREAM) {
1324 		TRACE_API("Not an end socket. id: %d\n", sockid);
1325 		errno = ENOTSOCK;
1326 		return -1;
1327 	}
1328 
1329 	cur_stream = mtcp->smap[sockid].stream;
1330 	if (!cur_stream) {
1331 		TRACE_API("Stream %d: does not exist.\n", sockid);
1332 		errno = ENOTCONN;
1333 		return -1;
1334 	}
1335 
1336 	TRACE_API("Socket %d: mtcp_abort()\n", sockid);
1337 
1338 	FreeSocket(mctx, sockid, mtcp->smap[sockid].socktype);
1339 	cur_stream->socket = NULL;
1340 
1341 	if (cur_stream->state == TCP_ST_CLOSED_RSVD) {
1342 		TRACE_API("Stream %d: connection already reset.\n", sockid);
1343 		return ERROR;
1344 
1345 	} else if (cur_stream->state == TCP_ST_SYN_SENT) {
1346 		/* TODO: this should notify event failure to all
1347 		   previous read() or write() calls */
1348 		cur_stream->state = TCP_ST_CLOSED_RSVD;
1349 		cur_stream->close_reason = TCP_ACTIVE_CLOSE;
1350 		cur_stream->cb_events |= MOS_ON_TCP_STATE_CHANGE;
1351 		SQ_LOCK(&mtcp->ctx->destroyq_lock);
1352 		StreamEnqueue(mtcp->destroyq, cur_stream);
1353 		SQ_UNLOCK(&mtcp->ctx->destroyq_lock);
1354 		mtcp->wakeup_flag = TRUE;
1355 		return 0;
1356 
1357 	} else if (cur_stream->state == TCP_ST_CLOSING ||
1358 			cur_stream->state == TCP_ST_LAST_ACK ||
1359 			cur_stream->state == TCP_ST_TIME_WAIT) {
1360 		cur_stream->state = TCP_ST_CLOSED_RSVD;
1361 		cur_stream->close_reason = TCP_ACTIVE_CLOSE;
1362 		cur_stream->cb_events |= MOS_ON_TCP_STATE_CHANGE;
1363 		SQ_LOCK(&mtcp->ctx->destroyq_lock);
1364 		StreamEnqueue(mtcp->destroyq, cur_stream);
1365 		SQ_UNLOCK(&mtcp->ctx->destroyq_lock);
1366 		mtcp->wakeup_flag = TRUE;
1367 		return 0;
1368 	}
1369 
1370 	/* the stream structure will be destroyed after sending RST */
1371 	if (cur_stream->sndvar->on_resetq) {
1372 		TRACE_ERROR("Stream %d: calling mtcp_abort() "
1373 				"when in reset queue.\n", sockid);
1374 		errno = ECONNRESET;
1375 		return -1;
1376 	}
1377 	SQ_LOCK(&mtcp->ctx->reset_lock);
1378 	cur_stream->sndvar->on_resetq = TRUE;
1379 	ret = StreamEnqueue(mtcp->resetq, cur_stream);
1380 	SQ_UNLOCK(&mtcp->ctx->reset_lock);
1381 	mtcp->wakeup_flag = TRUE;
1382 
1383 	if (ret < 0) {
1384 		TRACE_ERROR("(NEVER HAPPEN) Failed to enqueue the stream to close.\n");
1385 		errno = EAGAIN;
1386 		return -1;
1387 	}
1388 
1389 	return 0;
1390 }
1391 /*----------------------------------------------------------------------------*/
1392 static inline int
1393 CopyToUser(mtcp_manager_t mtcp, tcp_stream *cur_stream, char *buf, int len)
1394 {
1395 	struct tcp_recv_vars *rcvvar = cur_stream->rcvvar;
1396 	int copylen;
1397 #ifdef NEWRB
1398 	tcprb_t *rb = rcvvar->rcvbuf;
1399 	if ((copylen = tcprb_ppeek(rb, (uint8_t *)buf, len, rb->pile)) <= 0) {
1400 		errno = EAGAIN;
1401 		return -1;
1402 	}
1403 	tcprb_setpile(rb, rb->pile + copylen);
1404 
1405 	rcvvar->rcv_wnd = rb->len - tcprb_cflen(rb);
1406 	//printf("rcv_wnd: %d\n", rcvvar->rcv_wnd);
1407 #else
1408 	copylen = MIN(rcvvar->rcvbuf->merged_len, len);
1409 	if (copylen <= 0) {
1410 		errno = EAGAIN;
1411 		return -1;
1412 	}
1413 
1414 	assert(rcvvar->rcvbuf->data);
1415 	/* Copy data to user buffer and remove it from receiving buffer */
1416 	memcpy(buf, rcvvar->rcvbuf->head, copylen);
1417 	RBRemove(mtcp->rbm_rcv, rcvvar->rcvbuf, copylen, AT_APP, cur_stream->buffer_mgmt);
1418 	rcvvar->rcv_wnd = rcvvar->rcvbuf->size - rcvvar->rcvbuf->merged_len;
1419 #endif
1420 
1421 	/* Advertise newly freed receive buffer */
1422 	if (cur_stream->need_wnd_adv) {
1423 		if (rcvvar->rcv_wnd > cur_stream->sndvar->eff_mss) {
1424 			if (!cur_stream->sndvar->on_ackq) {
1425 				SQ_LOCK(&mtcp->ctx->ackq_lock);
1426 				cur_stream->sndvar->on_ackq = TRUE;
1427 				StreamEnqueue(mtcp->ackq, cur_stream); /* this always success */
1428 				SQ_UNLOCK(&mtcp->ctx->ackq_lock);
1429 				cur_stream->need_wnd_adv = FALSE;
1430 				mtcp->wakeup_flag = TRUE;
1431 			}
1432 		}
1433 	}
1434 
1435 	return copylen;
1436 }
1437 /*----------------------------------------------------------------------------*/
1438 ssize_t
1439 mtcp_read(mctx_t mctx, int sockid, char *buf, size_t len)
1440 {
1441 	mtcp_manager_t mtcp;
1442 	socket_map_t socket;
1443 	tcp_stream *cur_stream;
1444 	struct tcp_recv_vars *rcvvar;
1445 	int event_remaining, merged_len;
1446 	int ret;
1447 
1448 	mtcp = GetMTCPManager(mctx);
1449 	if (!mtcp) {
1450 		errno = EACCES;
1451 		return -1;
1452 	}
1453 
1454 	if (sockid < 0 || sockid >= g_config.mos->max_concurrency) {
1455 		TRACE_API("Socket id %d out of range.\n", sockid);
1456 		errno = EBADF;
1457 		return -1;
1458 	}
1459 
1460 	socket = &mtcp->smap[sockid];
1461 	if (socket->socktype == MOS_SOCK_UNUSED) {
1462 		TRACE_API("Invalid socket id: %d\n", sockid);
1463 		errno = EBADF;
1464 		return -1;
1465 	}
1466 
1467 	if (socket->socktype == MOS_SOCK_PIPE) {
1468 		return PipeRead(mctx, sockid, buf, len);
1469 	}
1470 
1471 	if (socket->socktype != MOS_SOCK_STREAM) {
1472 		TRACE_API("Not an end socket. id: %d\n", sockid);
1473 		errno = ENOTSOCK;
1474 		return -1;
1475 	}
1476 
1477 	/* stream should be in ESTABLISHED, FIN_WAIT_1, FIN_WAIT_2, CLOSE_WAIT */
1478 	cur_stream = socket->stream;
1479 	if (!cur_stream || !cur_stream->rcvvar || !cur_stream->rcvvar->rcvbuf ||
1480 			!(cur_stream->state >= TCP_ST_ESTABLISHED &&
1481 			cur_stream->state <= TCP_ST_CLOSE_WAIT)) {
1482 		errno = ENOTCONN;
1483 		return -1;
1484 	}
1485 
1486 	rcvvar = cur_stream->rcvvar;
1487 
1488 #ifdef NEWRB
1489 	merged_len = tcprb_cflen(rcvvar->rcvbuf);
1490 #else
1491 	merged_len = rcvvar->rcvbuf->merged_len;
1492 #endif
1493 
1494 	/* if CLOSE_WAIT, return 0 if there is no payload */
1495 	if (cur_stream->state == TCP_ST_CLOSE_WAIT) {
1496 		if (!rcvvar->rcvbuf)
1497 			return 0;
1498 
1499 		if (merged_len == 0)
1500 			return 0;
1501 	}
1502 
1503 	/* return EAGAIN if no receive buffer */
1504 	if (socket->opts & MTCP_NONBLOCK) {
1505 		if (!rcvvar->rcvbuf || merged_len == 0) {
1506 			errno = EAGAIN;
1507 			return -1;
1508 		}
1509 	}
1510 
1511 	SBUF_LOCK(&rcvvar->read_lock);
1512 
1513 	ret = CopyToUser(mtcp, cur_stream, buf, len);
1514 
1515 #ifdef NEWRB
1516 	merged_len = tcprb_cflen(rcvvar->rcvbuf);
1517 #else
1518 	merged_len = rcvvar->rcvbuf->merged_len;
1519 #endif
1520 
1521 	event_remaining = FALSE;
1522 	/* if there are remaining payload, generate EPOLLIN */
1523 	/* (may due to insufficient user buffer) */
1524 	if (socket->epoll & MOS_EPOLLIN) {
1525 		if (!(socket->epoll & MOS_EPOLLET) && merged_len > 0) {
1526 			event_remaining = TRUE;
1527 		}
1528 	}
1529 	/* if waiting for close, notify it if no remaining data */
1530 	if (cur_stream->state == TCP_ST_CLOSE_WAIT &&
1531 			merged_len == 0 && ret > 0) {
1532 		event_remaining = TRUE;
1533 	}
1534 
1535 	SBUF_UNLOCK(&rcvvar->read_lock);
1536 
1537 	if (event_remaining) {
1538 		if (socket->epoll) {
1539 			AddEpollEvent(mtcp->ep,
1540 				      USR_SHADOW_EVENT_QUEUE, socket, MOS_EPOLLIN);
1541 		}
1542 	}
1543 
1544 	TRACE_API("Stream %d: mtcp_read() returning %d\n", cur_stream->id, ret);
1545 	return ret;
1546 }
1547 /*----------------------------------------------------------------------------*/
1548 ssize_t
1549 mtcp_readv(mctx_t mctx, int sockid, struct iovec *iov, int numIOV)
1550 {
1551 	mtcp_manager_t mtcp;
1552 	socket_map_t socket;
1553 	tcp_stream *cur_stream;
1554 	struct tcp_recv_vars *rcvvar;
1555 	int ret, bytes_read, i;
1556 	int event_remaining, merged_len;
1557 
1558 	mtcp = GetMTCPManager(mctx);
1559 	if (!mtcp) {
1560 		errno = EACCES;
1561 		return -1;
1562 	}
1563 
1564 	if (sockid < 0 || sockid >= g_config.mos->max_concurrency) {
1565 		TRACE_API("Socket id %d out of range.\n", sockid);
1566 		errno = EBADF;
1567 		return -1;
1568 	}
1569 
1570 	socket = &mtcp->smap[sockid];
1571 	if (socket->socktype == MOS_SOCK_UNUSED) {
1572 		TRACE_API("Invalid socket id: %d\n", sockid);
1573 		errno = EBADF;
1574 		return -1;
1575 	}
1576 
1577 	if (socket->socktype != MOS_SOCK_STREAM) {
1578 		TRACE_API("Not an end socket. id: %d\n", sockid);
1579 		errno = ENOTSOCK;
1580 		return -1;
1581 	}
1582 
1583 	/* stream should be in ESTABLISHED, FIN_WAIT_1, FIN_WAIT_2, CLOSE_WAIT */
1584 	cur_stream = socket->stream;
1585 	if (!cur_stream ||
1586 			!(cur_stream->state >= TCP_ST_ESTABLISHED &&
1587 			  cur_stream->state <= TCP_ST_CLOSE_WAIT)) {
1588 		errno = ENOTCONN;
1589 		return -1;
1590 	}
1591 
1592 	rcvvar = cur_stream->rcvvar;
1593 
1594 #ifdef NEWRB
1595 	merged_len = tcprb_cflen(rcvvar->rcvbuf);
1596 #else
1597 	merged_len = rcvvar->rcvbuf->merged_len;
1598 #endif
1599 
1600 	/* if CLOSE_WAIT, return 0 if there is no payload */
1601 	if (cur_stream->state == TCP_ST_CLOSE_WAIT) {
1602 		if (!rcvvar->rcvbuf)
1603 			return 0;
1604 
1605 		if (merged_len == 0)
1606 			return 0;
1607 	}
1608 
1609 	/* return EAGAIN if no receive buffer */
1610 	if (socket->opts & MTCP_NONBLOCK) {
1611 		if (!rcvvar->rcvbuf || merged_len == 0) {
1612 			errno = EAGAIN;
1613 			return -1;
1614 		}
1615 	}
1616 
1617 	SBUF_LOCK(&rcvvar->read_lock);
1618 
1619 	/* read and store the contents to the vectored buffers */
1620 	bytes_read = 0;
1621 	for (i = 0; i < numIOV; i++) {
1622 		if (iov[i].iov_len <= 0)
1623 			continue;
1624 
1625 		ret = CopyToUser(mtcp, cur_stream, iov[i].iov_base, iov[i].iov_len);
1626 		if (ret <= 0)
1627 			break;
1628 
1629 		bytes_read += ret;
1630 
1631 		if (ret < iov[i].iov_len)
1632 			break;
1633 	}
1634 
1635 #ifdef NEWRB
1636 	merged_len = tcprb_cflen(rcvvar->rcvbuf);
1637 #else
1638 	merged_len = rcvvar->rcvbuf->merged_len;
1639 #endif
1640 
1641 	event_remaining = FALSE;
1642 	/* if there are remaining payload, generate read event */
1643 	/* (may due to insufficient user buffer) */
1644 	if (socket->epoll & MOS_EPOLLIN) {
1645 		if (!(socket->epoll & MOS_EPOLLET) && merged_len > 0) {
1646 			event_remaining = TRUE;
1647 		}
1648 	}
1649 	/* if waiting for close, notify it if no remaining data */
1650 	if (cur_stream->state == TCP_ST_CLOSE_WAIT &&
1651 			merged_len == 0 && bytes_read > 0) {
1652 		event_remaining = TRUE;
1653 	}
1654 
1655 	SBUF_UNLOCK(&rcvvar->read_lock);
1656 
1657 	if(event_remaining) {
1658 		if (socket->epoll & MOS_EPOLLIN && !(socket->epoll & MOS_EPOLLET)) {
1659 			AddEpollEvent(mtcp->ep,
1660 				      USR_SHADOW_EVENT_QUEUE, socket, MOS_EPOLLIN);
1661 		}
1662 	}
1663 
1664 	TRACE_API("Stream %d: mtcp_readv() returning %d\n",
1665 			cur_stream->id, bytes_read);
1666 	return bytes_read;
1667 }
1668 /*----------------------------------------------------------------------------*/
1669 static inline int
1670 CopyFromUser(mtcp_manager_t mtcp, tcp_stream *cur_stream, char *buf, int len)
1671 {
1672 	struct tcp_send_vars *sndvar = cur_stream->sndvar;
1673 	int sndlen;
1674 	int ret;
1675 
1676 	sndlen = MIN((int)sndvar->snd_wnd, len);
1677 	if (sndlen <= 0) {
1678 		errno = EAGAIN;
1679 		return -1;
1680 	}
1681 
1682 	/* allocate send buffer if not exist */
1683 	if (!sndvar->sndbuf) {
1684 		sndvar->sndbuf = SBInit(mtcp->rbm_snd, sndvar->iss + 1);
1685 		if (!sndvar->sndbuf) {
1686 			cur_stream->close_reason = TCP_NO_MEM;
1687 			/* notification may not required due to -1 return */
1688 			errno = ENOMEM;
1689 			return -1;
1690 		}
1691 	}
1692 
1693 	ret = SBPut(mtcp->rbm_snd, sndvar->sndbuf, buf, sndlen);
1694 	assert(ret == sndlen);
1695 	sndvar->snd_wnd = sndvar->sndbuf->size - sndvar->sndbuf->len;
1696 	if (ret <= 0) {
1697 		TRACE_ERROR("SBPut failed. reason: %d (sndlen: %u, len: %u\n",
1698 				ret, sndlen, sndvar->sndbuf->len);
1699 		errno = EAGAIN;
1700 		return -1;
1701 	}
1702 
1703 	if (sndvar->snd_wnd <= 0) {
1704 		TRACE_SNDBUF("%u Sending buffer became full!! snd_wnd: %u\n",
1705 				cur_stream->id, sndvar->snd_wnd);
1706 	}
1707 
1708 	return ret;
1709 }
1710 /*----------------------------------------------------------------------------*/
1711 ssize_t
1712 mtcp_write(mctx_t mctx, int sockid, char *buf, size_t len)
1713 {
1714 	mtcp_manager_t mtcp;
1715 	socket_map_t socket;
1716 	tcp_stream *cur_stream;
1717 	struct tcp_send_vars *sndvar;
1718 	int ret;
1719 
1720 	mtcp = GetMTCPManager(mctx);
1721 	if (!mtcp) {
1722 		errno = EACCES;
1723 		return -1;
1724 	}
1725 
1726 	if (sockid < 0 || sockid >= g_config.mos->max_concurrency) {
1727 		TRACE_API("Socket id %d out of range.\n", sockid);
1728 		errno = EBADF;
1729 		return -1;
1730 	}
1731 
1732 	socket = &mtcp->smap[sockid];
1733 	if (socket->socktype == MOS_SOCK_UNUSED) {
1734 		TRACE_API("Invalid socket id: %d\n", sockid);
1735 		errno = EBADF;
1736 		return -1;
1737 	}
1738 
1739 	if (socket->socktype == MOS_SOCK_PIPE) {
1740 		return PipeWrite(mctx, sockid, buf, len);
1741 	}
1742 
1743 	if (socket->socktype != MOS_SOCK_STREAM) {
1744 		TRACE_API("Not an end socket. id: %d\n", sockid);
1745 		errno = ENOTSOCK;
1746 		return -1;
1747 	}
1748 
1749 	cur_stream = socket->stream;
1750 	if (!cur_stream ||
1751 			!(cur_stream->state == TCP_ST_ESTABLISHED ||
1752 			  cur_stream->state == TCP_ST_CLOSE_WAIT)) {
1753 		errno = ENOTCONN;
1754 		return -1;
1755 	}
1756 
1757 	if (len <= 0) {
1758 		if (socket->opts & MTCP_NONBLOCK) {
1759 			errno = EAGAIN;
1760 			return -1;
1761 		} else {
1762 			return 0;
1763 		}
1764 	}
1765 
1766 	sndvar = cur_stream->sndvar;
1767 
1768 	SBUF_LOCK(&sndvar->write_lock);
1769 	ret = CopyFromUser(mtcp, cur_stream, buf, len);
1770 
1771 	SBUF_UNLOCK(&sndvar->write_lock);
1772 
1773 	if (ret > 0 && !(sndvar->on_sendq || sndvar->on_send_list)) {
1774 		SQ_LOCK(&mtcp->ctx->sendq_lock);
1775 		sndvar->on_sendq = TRUE;
1776 		StreamEnqueue(mtcp->sendq, cur_stream);		/* this always success */
1777 		SQ_UNLOCK(&mtcp->ctx->sendq_lock);
1778 		mtcp->wakeup_flag = TRUE;
1779 	}
1780 
1781 	if (ret == 0 && (socket->opts & MTCP_NONBLOCK)) {
1782 		ret = -1;
1783 		errno = EAGAIN;
1784 	}
1785 
1786 	/* if there are remaining sending buffer, generate write event */
1787 	if (sndvar->snd_wnd > 0) {
1788 		if (socket->epoll & MOS_EPOLLOUT && !(socket->epoll & MOS_EPOLLET)) {
1789 			AddEpollEvent(mtcp->ep,
1790 				      USR_SHADOW_EVENT_QUEUE, socket, MOS_EPOLLOUT);
1791 		}
1792 	}
1793 
1794 	TRACE_API("Stream %d: mtcp_write() returning %d\n", cur_stream->id, ret);
1795 	return ret;
1796 }
1797 /*----------------------------------------------------------------------------*/
1798 ssize_t
1799 mtcp_writev(mctx_t mctx, int sockid, struct iovec *iov, int numIOV)
1800 {
1801 	mtcp_manager_t mtcp;
1802 	socket_map_t socket;
1803 	tcp_stream *cur_stream;
1804 	struct tcp_send_vars *sndvar;
1805 	int ret, to_write, i;
1806 
1807 	mtcp = GetMTCPManager(mctx);
1808 	if (!mtcp) {
1809 		errno = EACCES;
1810 		return -1;
1811 	}
1812 
1813 	if (sockid < 0 || sockid >= g_config.mos->max_concurrency) {
1814 		TRACE_API("Socket id %d out of range.\n", sockid);
1815 		errno = EBADF;
1816 		return -1;
1817 	}
1818 
1819 	socket = &mtcp->smap[sockid];
1820 	if (socket->socktype == MOS_SOCK_UNUSED) {
1821 		TRACE_API("Invalid socket id: %d\n", sockid);
1822 		errno = EBADF;
1823 		return -1;
1824 	}
1825 
1826 	if (socket->socktype != MOS_SOCK_STREAM) {
1827 		TRACE_API("Not an end socket. id: %d\n", sockid);
1828 		errno = ENOTSOCK;
1829 		return -1;
1830 	}
1831 
1832 	cur_stream = socket->stream;
1833 	if (!cur_stream ||
1834 			!(cur_stream->state == TCP_ST_ESTABLISHED ||
1835 			  cur_stream->state == TCP_ST_CLOSE_WAIT)) {
1836 		errno = ENOTCONN;
1837 		return -1;
1838 	}
1839 
1840 	sndvar = cur_stream->sndvar;
1841 	SBUF_LOCK(&sndvar->write_lock);
1842 
1843 	/* write from the vectored buffers */
1844 	to_write = 0;
1845 	for (i = 0; i < numIOV; i++) {
1846 		if (iov[i].iov_len <= 0)
1847 			continue;
1848 
1849 		ret = CopyFromUser(mtcp, cur_stream, iov[i].iov_base, iov[i].iov_len);
1850 		if (ret <= 0)
1851 			break;
1852 
1853 		to_write += ret;
1854 
1855 		if (ret < iov[i].iov_len)
1856 			break;
1857 	}
1858 	SBUF_UNLOCK(&sndvar->write_lock);
1859 
1860 	if (to_write > 0 && !(sndvar->on_sendq || sndvar->on_send_list)) {
1861 		SQ_LOCK(&mtcp->ctx->sendq_lock);
1862 		sndvar->on_sendq = TRUE;
1863 		StreamEnqueue(mtcp->sendq, cur_stream);		/* this always success */
1864 		SQ_UNLOCK(&mtcp->ctx->sendq_lock);
1865 		mtcp->wakeup_flag = TRUE;
1866 	}
1867 
1868 	if (to_write == 0 && (socket->opts & MTCP_NONBLOCK)) {
1869 		to_write = -1;
1870 		errno = EAGAIN;
1871 	}
1872 
1873 	/* if there are remaining sending buffer, generate write event */
1874 	if (sndvar->snd_wnd > 0) {
1875 		if (socket->epoll & MOS_EPOLLOUT && !(socket->epoll & MOS_EPOLLET)) {
1876 			AddEpollEvent(mtcp->ep,
1877 				      USR_SHADOW_EVENT_QUEUE, socket, MOS_EPOLLOUT);
1878 		}
1879 	}
1880 
1881 	TRACE_API("Stream %d: mtcp_writev() returning %d\n",
1882 			cur_stream->id, to_write);
1883 	return to_write;
1884 }
1885 /*----------------------------------------------------------------------------*/
1886 uint32_t
1887 mtcp_get_connection_cnt(mctx_t mctx)
1888 {
1889 	mtcp_manager_t mtcp;
1890 	mtcp = GetMTCPManager(mctx);
1891 	if (!mtcp) {
1892 		errno = EACCES;
1893 		return -1;
1894 	}
1895 
1896 	if (mtcp->num_msp > 0)
1897 		return mtcp->flow_cnt / 2;
1898 	else
1899 		return mtcp->flow_cnt;
1900 }
1901 /*----------------------------------------------------------------------------*/
1902