xref: /mOS-networking-stack/core/src/api.c (revision df3fae06)
176404edcSAsim Jamshed #include <sys/queue.h>
276404edcSAsim Jamshed #include <sys/ioctl.h>
376404edcSAsim Jamshed #include <limits.h>
476404edcSAsim Jamshed #include <unistd.h>
576404edcSAsim Jamshed #include <assert.h>
676404edcSAsim Jamshed #include <string.h>
776404edcSAsim Jamshed 
876404edcSAsim Jamshed #ifdef DARWIN
976404edcSAsim Jamshed #include <netinet/tcp.h>
1076404edcSAsim Jamshed #include <netinet/ip.h>
1176404edcSAsim Jamshed #include <netinet/if_ether.h>
1276404edcSAsim Jamshed #endif
1376404edcSAsim Jamshed 
1476404edcSAsim Jamshed #include "mtcp.h"
1576404edcSAsim Jamshed #include "mtcp_api.h"
1676404edcSAsim Jamshed #include "tcp_in.h"
1776404edcSAsim Jamshed #include "tcp_stream.h"
1876404edcSAsim Jamshed #include "tcp_out.h"
1976404edcSAsim Jamshed #include "ip_out.h"
2076404edcSAsim Jamshed #include "eventpoll.h"
2176404edcSAsim Jamshed #include "pipe.h"
2276404edcSAsim Jamshed #include "fhash.h"
2376404edcSAsim Jamshed #include "addr_pool.h"
2476404edcSAsim Jamshed #include "rss.h"
2576404edcSAsim Jamshed #include "config.h"
2676404edcSAsim Jamshed #include "debug.h"
2776404edcSAsim Jamshed #include "eventpoll.h"
2876404edcSAsim Jamshed #include "mos_api.h"
2976404edcSAsim Jamshed #include "tcp_rb.h"
3076404edcSAsim Jamshed 
3176404edcSAsim Jamshed #define MAX(a, b) ((a)>(b)?(a):(b))
3276404edcSAsim Jamshed #define MIN(a, b) ((a)<(b)?(a):(b))
3376404edcSAsim Jamshed 
3476404edcSAsim Jamshed /*----------------------------------------------------------------------------*/
3576404edcSAsim Jamshed /** Stop monitoring the socket! (function prototype)
3676404edcSAsim Jamshed  * @param [in] mctx: mtcp context
3776404edcSAsim Jamshed  * @param [in] sock: monitoring stream socket id
3876404edcSAsim Jamshed  * @param [in] side: side of monitoring (client side, server side or both)
3976404edcSAsim Jamshed  *
4076404edcSAsim Jamshed  * This function is now DEPRECATED and is only used within mOS core...
4176404edcSAsim Jamshed  */
4276404edcSAsim Jamshed int
4376404edcSAsim Jamshed mtcp_cb_stop(mctx_t mctx, int sock, int side);
4476404edcSAsim Jamshed /*----------------------------------------------------------------------------*/
4576404edcSAsim Jamshed /** Reset the connection (send RST packets to both sides)
4676404edcSAsim Jamshed  *  (We need to decide the API for this.)
4776404edcSAsim Jamshed  */
4876404edcSAsim Jamshed //int
4976404edcSAsim Jamshed //mtcp_cb_reset(mctx_t mctx, int sock, int side);
5076404edcSAsim Jamshed /*----------------------------------------------------------------------------*/
5176404edcSAsim Jamshed inline mtcp_manager_t
5276404edcSAsim Jamshed GetMTCPManager(mctx_t mctx)
5376404edcSAsim Jamshed {
5476404edcSAsim Jamshed 	if (!mctx) {
5576404edcSAsim Jamshed 		errno = EACCES;
5676404edcSAsim Jamshed 		return NULL;
5776404edcSAsim Jamshed 	}
5876404edcSAsim Jamshed 
5976404edcSAsim Jamshed 	if (mctx->cpu < 0 || mctx->cpu >= num_cpus) {
6076404edcSAsim Jamshed 		errno = EINVAL;
6176404edcSAsim Jamshed 		return NULL;
6276404edcSAsim Jamshed 	}
6376404edcSAsim Jamshed 
644cb4e140SAsim Jamshed 	if (!g_mtcp[mctx->cpu] || g_mtcp[mctx->cpu]->ctx->done || g_mtcp[mctx->cpu]->ctx->exit) {
6576404edcSAsim Jamshed 		errno = EPERM;
6676404edcSAsim Jamshed 		return NULL;
6776404edcSAsim Jamshed 	}
6876404edcSAsim Jamshed 
6976404edcSAsim Jamshed 	return g_mtcp[mctx->cpu];
7076404edcSAsim Jamshed }
7176404edcSAsim Jamshed /*----------------------------------------------------------------------------*/
72a5e1a556SAsim Jamshed static inline int
7376404edcSAsim Jamshed GetSocketError(socket_map_t socket, void *optval, socklen_t *optlen)
7476404edcSAsim Jamshed {
7576404edcSAsim Jamshed 	tcp_stream *cur_stream;
7676404edcSAsim Jamshed 
7776404edcSAsim Jamshed 	if (!socket->stream) {
7876404edcSAsim Jamshed 		errno = EBADF;
7976404edcSAsim Jamshed 		return -1;
8076404edcSAsim Jamshed 	}
8176404edcSAsim Jamshed 
8276404edcSAsim Jamshed 	cur_stream = socket->stream;
8376404edcSAsim Jamshed 	if (cur_stream->state == TCP_ST_CLOSED_RSVD) {
8476404edcSAsim Jamshed 		if (cur_stream->close_reason == TCP_TIMEDOUT ||
8576404edcSAsim Jamshed 				cur_stream->close_reason == TCP_CONN_FAIL ||
8676404edcSAsim Jamshed 				cur_stream->close_reason == TCP_CONN_LOST) {
8776404edcSAsim Jamshed 			*(int *)optval = ETIMEDOUT;
8876404edcSAsim Jamshed 			*optlen = sizeof(int);
8976404edcSAsim Jamshed 
9076404edcSAsim Jamshed 			return 0;
9176404edcSAsim Jamshed 		}
9276404edcSAsim Jamshed 	}
9376404edcSAsim Jamshed 
9476404edcSAsim Jamshed 	if (cur_stream->state == TCP_ST_CLOSE_WAIT ||
9576404edcSAsim Jamshed 			cur_stream->state == TCP_ST_CLOSED_RSVD) {
9676404edcSAsim Jamshed 		if (cur_stream->close_reason == TCP_RESET) {
9776404edcSAsim Jamshed 			*(int *)optval = ECONNRESET;
9876404edcSAsim Jamshed 			*optlen = sizeof(int);
9976404edcSAsim Jamshed 
10076404edcSAsim Jamshed 			return 0;
10176404edcSAsim Jamshed 		}
10276404edcSAsim Jamshed 	}
10376404edcSAsim Jamshed 
104a5e1a556SAsim Jamshed 	if (cur_stream->state == TCP_ST_SYN_SENT &&
105a5e1a556SAsim Jamshed 	    errno == EINPROGRESS) {
106a5e1a556SAsim Jamshed 		*(int *)optval = errno;
107a5e1a556SAsim Jamshed 		*optlen = sizeof(int);
108a5e1a556SAsim Jamshed 
109a5e1a556SAsim Jamshed 		return -1;
110a5e1a556SAsim Jamshed         }
111a5e1a556SAsim Jamshed 
112265cb675SAsim Jamshed 	/*
113265cb675SAsim Jamshed 	 * `base case`: If socket sees no so_error, then
114265cb675SAsim Jamshed 	 * this also means close_reason will always be
115265cb675SAsim Jamshed 	 * TCP_NOT_CLOSED.
116265cb675SAsim Jamshed 	 */
117265cb675SAsim Jamshed 	if (cur_stream->close_reason == TCP_NOT_CLOSED) {
118265cb675SAsim Jamshed 		*(int *)optval = 0;
119265cb675SAsim Jamshed 		*optlen = sizeof(int);
120265cb675SAsim Jamshed 
121265cb675SAsim Jamshed 		return 0;
122265cb675SAsim Jamshed 	}
123265cb675SAsim Jamshed 
12476404edcSAsim Jamshed 	errno = ENOSYS;
12576404edcSAsim Jamshed 	return -1;
12676404edcSAsim Jamshed }
12776404edcSAsim Jamshed /*----------------------------------------------------------------------------*/
12876404edcSAsim Jamshed int
129a5e1a556SAsim Jamshed mtcp_getsockname(mctx_t mctx, int sockid, struct sockaddr *addr,
130a5e1a556SAsim Jamshed 		 socklen_t *addrlen)
131a5e1a556SAsim Jamshed {
132a5e1a556SAsim Jamshed 	mtcp_manager_t mtcp;
133a5e1a556SAsim Jamshed 	socket_map_t socket;
134a5e1a556SAsim Jamshed 
135a5e1a556SAsim Jamshed 	mtcp = GetMTCPManager(mctx);
136a5e1a556SAsim Jamshed 	if (!mtcp) {
137a5e1a556SAsim Jamshed 		return -1;
138a5e1a556SAsim Jamshed 	}
139a5e1a556SAsim Jamshed 
140a5e1a556SAsim Jamshed 	if (sockid < 0 || sockid >= g_config.mos->max_concurrency) {
141a5e1a556SAsim Jamshed 		TRACE_API("Socket id %d out of range.\n", sockid);
142a5e1a556SAsim Jamshed 		errno = EBADF;
143a5e1a556SAsim Jamshed 		return -1;
144a5e1a556SAsim Jamshed 	}
145a5e1a556SAsim Jamshed 
146a5e1a556SAsim Jamshed 	socket = &mtcp->smap[sockid];
147a5e1a556SAsim Jamshed 	if (socket->socktype == MOS_SOCK_UNUSED) {
148a5e1a556SAsim Jamshed 		TRACE_API("Invalid socket id: %d\n", sockid);
149a5e1a556SAsim Jamshed 		errno = EBADF;
150a5e1a556SAsim Jamshed 		return -1;
151a5e1a556SAsim Jamshed 	}
152a5e1a556SAsim Jamshed 
153a5e1a556SAsim Jamshed 	if (*addrlen <= 0) {
154a5e1a556SAsim Jamshed 		TRACE_API("Invalid addrlen: %d\n", *addrlen);
155a5e1a556SAsim Jamshed 		errno = EINVAL;
156a5e1a556SAsim Jamshed 		return -1;
157a5e1a556SAsim Jamshed 	}
158a5e1a556SAsim Jamshed 
159a5e1a556SAsim Jamshed 	if (socket->socktype != MOS_SOCK_STREAM_LISTEN &&
160a5e1a556SAsim Jamshed 	    socket->socktype != MOS_SOCK_STREAM) {
161a5e1a556SAsim Jamshed 		TRACE_API("Invalid socket id: %d\n", sockid);
162a5e1a556SAsim Jamshed 		errno = ENOTSOCK;
163a5e1a556SAsim Jamshed 		return -1;
164a5e1a556SAsim Jamshed 	}
165a5e1a556SAsim Jamshed 
166a5e1a556SAsim Jamshed 	*(struct sockaddr_in *)addr = socket->saddr;
167a5e1a556SAsim Jamshed         *addrlen = sizeof(socket->saddr);
168a5e1a556SAsim Jamshed 
169a5e1a556SAsim Jamshed 	return 0;
170a5e1a556SAsim Jamshed }
171a5e1a556SAsim Jamshed /*----------------------------------------------------------------------------*/
172a5e1a556SAsim Jamshed int
17376404edcSAsim Jamshed mtcp_getsockopt(mctx_t mctx, int sockid, int level,
17476404edcSAsim Jamshed 		int optname, void *optval, socklen_t *optlen)
17576404edcSAsim Jamshed {
17676404edcSAsim Jamshed 	mtcp_manager_t mtcp;
17776404edcSAsim Jamshed 	socket_map_t socket;
17876404edcSAsim Jamshed 
17976404edcSAsim Jamshed 	mtcp = GetMTCPManager(mctx);
18076404edcSAsim Jamshed 	if (!mtcp) {
18176404edcSAsim Jamshed 		errno = EACCES;
18276404edcSAsim Jamshed 		return -1;
18376404edcSAsim Jamshed 	}
18476404edcSAsim Jamshed 
18576404edcSAsim Jamshed 	if (sockid < 0 || sockid >= g_config.mos->max_concurrency) {
18676404edcSAsim Jamshed 		TRACE_API("Socket id %d out of range.\n", sockid);
18776404edcSAsim Jamshed 		errno = EBADF;
18876404edcSAsim Jamshed 		return -1;
18976404edcSAsim Jamshed 	}
19076404edcSAsim Jamshed 
19176404edcSAsim Jamshed 	switch (level) {
19276404edcSAsim Jamshed 	case SOL_SOCKET:
19376404edcSAsim Jamshed 		socket = &mtcp->smap[sockid];
19476404edcSAsim Jamshed 		if (socket->socktype == MOS_SOCK_UNUSED) {
19576404edcSAsim Jamshed 			TRACE_API("Invalid socket id: %d\n", sockid);
19676404edcSAsim Jamshed 			errno = EBADF;
19776404edcSAsim Jamshed 			return -1;
19876404edcSAsim Jamshed 		}
19976404edcSAsim Jamshed 
20076404edcSAsim Jamshed 		if (socket->socktype != MOS_SOCK_STREAM_LISTEN &&
20176404edcSAsim Jamshed 		    socket->socktype != MOS_SOCK_STREAM) {
20276404edcSAsim Jamshed 			TRACE_API("Invalid socket id: %d\n", sockid);
20376404edcSAsim Jamshed 			errno = ENOTSOCK;
20476404edcSAsim Jamshed 			return -1;
20576404edcSAsim Jamshed 		}
20676404edcSAsim Jamshed 
20776404edcSAsim Jamshed 		if (optname == SO_ERROR) {
20876404edcSAsim Jamshed 			if (socket->socktype == MOS_SOCK_STREAM) {
20976404edcSAsim Jamshed 				return GetSocketError(socket, optval, optlen);
21076404edcSAsim Jamshed 			}
21176404edcSAsim Jamshed 		}
21276404edcSAsim Jamshed 		break;
21376404edcSAsim Jamshed 	case SOL_MONSOCKET:
21476404edcSAsim Jamshed 		/* check if the calling thread is in MOS context */
21576404edcSAsim Jamshed 		if (mtcp->ctx->thread != pthread_self()) {
21676404edcSAsim Jamshed 			errno = EPERM;
21776404edcSAsim Jamshed 			return -1;
21876404edcSAsim Jamshed 		}
21976404edcSAsim Jamshed 		/*
22076404edcSAsim Jamshed 		 * All options will only work for active
22176404edcSAsim Jamshed 		 * monitor stream sockets
22276404edcSAsim Jamshed 		 */
22376404edcSAsim Jamshed 		socket = &mtcp->msmap[sockid];
22476404edcSAsim Jamshed 		if (socket->socktype != MOS_SOCK_MONITOR_STREAM_ACTIVE) {
22576404edcSAsim Jamshed 			TRACE_API("Invalid socket id: %d\n", sockid);
22676404edcSAsim Jamshed 			errno = ENOTSOCK;
22776404edcSAsim Jamshed 			return -1;
22876404edcSAsim Jamshed 		}
22976404edcSAsim Jamshed 
23076404edcSAsim Jamshed 		switch (optname) {
23176404edcSAsim Jamshed 		case MOS_FRAGINFO_CLIBUF:
23276404edcSAsim Jamshed 			return GetFragInfo(socket, MOS_SIDE_CLI, optval, optlen);
23376404edcSAsim Jamshed 		case MOS_FRAGINFO_SVRBUF:
23476404edcSAsim Jamshed 			return GetFragInfo(socket, MOS_SIDE_SVR, optval, optlen);
23576404edcSAsim Jamshed 		case MOS_INFO_CLIBUF:
23676404edcSAsim Jamshed 			return GetBufInfo(socket, MOS_SIDE_CLI, optval, optlen);
23776404edcSAsim Jamshed 		case MOS_INFO_SVRBUF:
23876404edcSAsim Jamshed 			return GetBufInfo(socket, MOS_SIDE_SVR, optval, optlen);
23976404edcSAsim Jamshed 		case MOS_TCP_STATE_CLI:
24076404edcSAsim Jamshed 			return GetTCPState(socket->monitor_stream->stream, MOS_SIDE_CLI,
24176404edcSAsim Jamshed 							   optval, optlen);
24276404edcSAsim Jamshed 		case MOS_TCP_STATE_SVR:
24376404edcSAsim Jamshed 			return GetTCPState(socket->monitor_stream->stream, MOS_SIDE_SVR,
24476404edcSAsim Jamshed 							   optval, optlen);
24576404edcSAsim Jamshed 		case MOS_TIMESTAMP:
24676404edcSAsim Jamshed 			return GetLastTimestamp(socket->monitor_stream->stream,
24776404edcSAsim Jamshed 						(uint32_t *)optval,
24876404edcSAsim Jamshed 						optlen);
24976404edcSAsim Jamshed 		default:
25076404edcSAsim Jamshed 		  TRACE_API("can't recognize the optname=%d\n", optname);
25176404edcSAsim Jamshed 		  assert(0);
25276404edcSAsim Jamshed 		}
25376404edcSAsim Jamshed 		break;
25476404edcSAsim Jamshed 	}
25576404edcSAsim Jamshed 	errno = ENOSYS;
25676404edcSAsim Jamshed 	return -1;
25776404edcSAsim Jamshed }
25876404edcSAsim Jamshed /*----------------------------------------------------------------------------*/
25976404edcSAsim Jamshed int
26076404edcSAsim Jamshed mtcp_setsockopt(mctx_t mctx, int sockid, int level,
26176404edcSAsim Jamshed 		int optname, const void *optval, socklen_t optlen)
26276404edcSAsim Jamshed {
26376404edcSAsim Jamshed 	mtcp_manager_t mtcp;
26476404edcSAsim Jamshed 	socket_map_t socket;
26576404edcSAsim Jamshed 	tcprb_t *rb;
26676404edcSAsim Jamshed 
26776404edcSAsim Jamshed 	mtcp = GetMTCPManager(mctx);
26876404edcSAsim Jamshed 	if (!mtcp) {
26976404edcSAsim Jamshed 		errno = EACCES;
27076404edcSAsim Jamshed 		return -1;
27176404edcSAsim Jamshed 	}
27276404edcSAsim Jamshed 
27376404edcSAsim Jamshed 	if (sockid < 0 || sockid >= g_config.mos->max_concurrency) {
27476404edcSAsim Jamshed 		TRACE_API("Socket id %d out of range.\n", sockid);
27576404edcSAsim Jamshed 		errno = EBADF;
27676404edcSAsim Jamshed 		return -1;
27776404edcSAsim Jamshed 	}
27876404edcSAsim Jamshed 
27976404edcSAsim Jamshed 	switch (level) {
28076404edcSAsim Jamshed 	case SOL_SOCKET:
28176404edcSAsim Jamshed 		socket = &mtcp->smap[sockid];
28276404edcSAsim Jamshed 		if (socket->socktype == MOS_SOCK_UNUSED) {
28376404edcSAsim Jamshed 			TRACE_API("Invalid socket id: %d\n", sockid);
28476404edcSAsim Jamshed 			errno = EBADF;
28576404edcSAsim Jamshed 			return -1;
28676404edcSAsim Jamshed 		}
28776404edcSAsim Jamshed 
28876404edcSAsim Jamshed 		if (socket->socktype != MOS_SOCK_STREAM_LISTEN &&
28976404edcSAsim Jamshed 		    socket->socktype != MOS_SOCK_STREAM) {
29076404edcSAsim Jamshed 			TRACE_API("Invalid socket id: %d\n", sockid);
29176404edcSAsim Jamshed 			errno = ENOTSOCK;
29276404edcSAsim Jamshed 			return -1;
29376404edcSAsim Jamshed 		}
29476404edcSAsim Jamshed 		break;
29576404edcSAsim Jamshed 	case SOL_MONSOCKET:
29676404edcSAsim Jamshed 		socket = &mtcp->msmap[sockid];
29776404edcSAsim Jamshed 		/*
29876404edcSAsim Jamshed 		 * checking of calling thread to be in MOS context is
29976404edcSAsim Jamshed 		 * disabled since both optnames can be called from
30076404edcSAsim Jamshed 		 * `application' context (on passive sockets)
30176404edcSAsim Jamshed 		 */
30276404edcSAsim Jamshed 		/*
30376404edcSAsim Jamshed 		 * if (mtcp->ctx->thread != pthread_self())
30476404edcSAsim Jamshed 		 * return -1;
30576404edcSAsim Jamshed 		 */
30676404edcSAsim Jamshed 
30776404edcSAsim Jamshed 		switch (optname) {
30876404edcSAsim Jamshed 		case MOS_CLIBUF:
30976404edcSAsim Jamshed #if 0
31076404edcSAsim Jamshed 			if (socket->socktype != MOS_SOCK_MONITOR_STREAM_ACTIVE) {
31176404edcSAsim Jamshed 				errno = EBADF;
31276404edcSAsim Jamshed 				return -1;
31376404edcSAsim Jamshed 			}
31476404edcSAsim Jamshed #endif
31576404edcSAsim Jamshed #ifdef DISABLE_DYN_RESIZE
31676404edcSAsim Jamshed 			if (*(int *)optval != 0)
31776404edcSAsim Jamshed 				return -1;
31876404edcSAsim Jamshed 			if (socket->socktype == MOS_SOCK_MONITOR_STREAM_ACTIVE) {
31976404edcSAsim Jamshed 				rb = (socket->monitor_stream->stream->side == MOS_SIDE_CLI) ?
32076404edcSAsim Jamshed 					socket->monitor_stream->stream->rcvvar->rcvbuf :
32176404edcSAsim Jamshed 					socket->monitor_stream->stream->pair_stream->rcvvar->rcvbuf;
32276404edcSAsim Jamshed 				if (rb) {
32376404edcSAsim Jamshed 					tcprb_resize_meta(rb, 0);
32476404edcSAsim Jamshed 					tcprb_resize(rb, 0);
32576404edcSAsim Jamshed 				}
32676404edcSAsim Jamshed 			}
32776404edcSAsim Jamshed 			return DisableBuf(socket, MOS_SIDE_CLI);
32876404edcSAsim Jamshed #else
32976404edcSAsim Jamshed 			rb = (socket->monitor_stream->stream->side == MOS_SIDE_CLI) ?
33076404edcSAsim Jamshed 				socket->monitor_stream->stream->rcvvar->rcvbuf :
33176404edcSAsim Jamshed 				socket->monitor_stream->stream->pair_stream->rcvvar->rcvbuf;
33276404edcSAsim Jamshed 			if (tcprb_resize_meta(rb, *(int *)optval) < 0)
33376404edcSAsim Jamshed 				return -1;
33476404edcSAsim Jamshed 			return tcprb_resize(rb,
33576404edcSAsim Jamshed 					(((int)rb->metalen - 1) / UNITBUFSIZE + 1) * UNITBUFSIZE);
33676404edcSAsim Jamshed #endif
33776404edcSAsim Jamshed 		case MOS_SVRBUF:
33876404edcSAsim Jamshed #if 0
33976404edcSAsim Jamshed 			if (socket->socktype != MOS_SOCK_MONITOR_STREAM_ACTIVE) {
34076404edcSAsim Jamshed 				errno = EBADF;
34176404edcSAsim Jamshed 				return -1;
34276404edcSAsim Jamshed 			}
34376404edcSAsim Jamshed #endif
34476404edcSAsim Jamshed #ifdef DISABLE_DYN_RESIZE
34576404edcSAsim Jamshed 			if (*(int *)optval != 0)
34676404edcSAsim Jamshed 				return -1;
34776404edcSAsim Jamshed 			if (socket->socktype == MOS_SOCK_MONITOR_STREAM_ACTIVE) {
34876404edcSAsim Jamshed 				rb = (socket->monitor_stream->stream->side == MOS_SIDE_SVR) ?
34976404edcSAsim Jamshed 					socket->monitor_stream->stream->rcvvar->rcvbuf :
35076404edcSAsim Jamshed 					socket->monitor_stream->stream->pair_stream->rcvvar->rcvbuf;
35176404edcSAsim Jamshed 				if (rb) {
35276404edcSAsim Jamshed 					tcprb_resize_meta(rb, 0);
35376404edcSAsim Jamshed 					tcprb_resize(rb, 0);
35476404edcSAsim Jamshed 				}
35576404edcSAsim Jamshed 			}
35676404edcSAsim Jamshed 			return DisableBuf(socket, MOS_SIDE_SVR);
35776404edcSAsim Jamshed #else
35876404edcSAsim Jamshed 			rb = (socket->monitor_stream->stream->side == MOS_SIDE_SVR) ?
35976404edcSAsim Jamshed 				socket->monitor_stream->stream->rcvvar->rcvbuf :
36076404edcSAsim Jamshed 				socket->monitor_stream->stream->pair_stream->rcvvar->rcvbuf;
36176404edcSAsim Jamshed 			if (tcprb_resize_meta(rb, *(int *)optval) < 0)
36276404edcSAsim Jamshed 				return -1;
36376404edcSAsim Jamshed 			return tcprb_resize(rb,
36476404edcSAsim Jamshed 					(((int)rb->metalen - 1) / UNITBUFSIZE + 1) * UNITBUFSIZE);
36576404edcSAsim Jamshed #endif
36676404edcSAsim Jamshed 		case MOS_FRAG_CLIBUF:
36776404edcSAsim Jamshed #if 0
36876404edcSAsim Jamshed 			if (socket->socktype != MOS_SOCK_MONITOR_STREAM_ACTIVE) {
36976404edcSAsim Jamshed 				errno = EBADF;
37076404edcSAsim Jamshed 				return -1;
37176404edcSAsim Jamshed 			}
37276404edcSAsim Jamshed #endif
37376404edcSAsim Jamshed #ifdef DISABLE_DYN_RESIZE
37476404edcSAsim Jamshed 			if (*(int *)optval != 0)
37576404edcSAsim Jamshed 				return -1;
37676404edcSAsim Jamshed 			if (socket->socktype == MOS_SOCK_MONITOR_STREAM_ACTIVE) {
37776404edcSAsim Jamshed 				rb = (socket->monitor_stream->stream->side == MOS_SIDE_CLI) ?
37876404edcSAsim Jamshed 					socket->monitor_stream->stream->rcvvar->rcvbuf :
37976404edcSAsim Jamshed 					socket->monitor_stream->stream->pair_stream->rcvvar->rcvbuf;
38076404edcSAsim Jamshed 				if (rb)
38176404edcSAsim Jamshed 					tcprb_resize(rb, 0);
38276404edcSAsim Jamshed 			}
38376404edcSAsim Jamshed 			return 0;
38476404edcSAsim Jamshed #else
38576404edcSAsim Jamshed 			rb = (socket->monitor_stream->stream->side == MOS_SIDE_CLI) ?
38676404edcSAsim Jamshed 				socket->monitor_stream->stream->rcvvar->rcvbuf :
38776404edcSAsim Jamshed 				socket->monitor_stream->stream->pair_stream->rcvvar->rcvbuf;
38876404edcSAsim Jamshed 			if (rb->len == 0)
38976404edcSAsim Jamshed 				return tcprb_resize_meta(rb, *(int *)optval);
39076404edcSAsim Jamshed 			else
39176404edcSAsim Jamshed 				return -1;
39276404edcSAsim Jamshed #endif
39376404edcSAsim Jamshed 		case MOS_FRAG_SVRBUF:
39476404edcSAsim Jamshed #if 0
39576404edcSAsim Jamshed 			if (socket->socktype != MOS_SOCK_MONITOR_STREAM_ACTIVE) {
39676404edcSAsim Jamshed 				errno = EBADF;
39776404edcSAsim Jamshed 				return -1;
39876404edcSAsim Jamshed 			}
39976404edcSAsim Jamshed #endif
40076404edcSAsim Jamshed #ifdef DISABLE_DYN_RESIZE
40176404edcSAsim Jamshed 			if (*(int *)optval != 0)
40276404edcSAsim Jamshed 				return -1;
40376404edcSAsim Jamshed 			if (socket->socktype == MOS_SOCK_MONITOR_STREAM_ACTIVE) {
40476404edcSAsim Jamshed 				rb = (socket->monitor_stream->stream->side == MOS_SIDE_SVR) ?
40576404edcSAsim Jamshed 					socket->monitor_stream->stream->rcvvar->rcvbuf :
40676404edcSAsim Jamshed 					socket->monitor_stream->stream->pair_stream->rcvvar->rcvbuf;
40776404edcSAsim Jamshed 				if (rb)
40876404edcSAsim Jamshed 					tcprb_resize(rb, 0);
40976404edcSAsim Jamshed 			}
41076404edcSAsim Jamshed 			return 0;
41176404edcSAsim Jamshed #else
41276404edcSAsim Jamshed 			rb = (socket->monitor_stream->stream->side == MOS_SIDE_SVR) ?
41376404edcSAsim Jamshed 				socket->monitor_stream->stream->rcvvar->rcvbuf :
41476404edcSAsim Jamshed 				socket->monitor_stream->stream->pair_stream->rcvvar->rcvbuf;
41576404edcSAsim Jamshed 			if (rb->len == 0)
41676404edcSAsim Jamshed 				return tcprb_resize_meta(rb, *(int *)optval);
41776404edcSAsim Jamshed 			else
41876404edcSAsim Jamshed 				return -1;
41976404edcSAsim Jamshed #endif
42076404edcSAsim Jamshed 		case MOS_MONLEVEL:
42176404edcSAsim Jamshed #ifdef OLD_API
42276404edcSAsim Jamshed 			assert(*(int *)optval == MOS_NO_CLIBUF ||
42376404edcSAsim Jamshed 			       *(int *)optval == MOS_NO_SVRBUF);
42476404edcSAsim Jamshed 			return DisableBuf(socket,
42576404edcSAsim Jamshed 					  (*(int *)optval == MOS_NO_CLIBUF) ?
42676404edcSAsim Jamshed 					  MOS_SIDE_CLI : MOS_SIDE_SVR);
42776404edcSAsim Jamshed #endif
42876404edcSAsim Jamshed 		case MOS_SEQ_REMAP:
42976404edcSAsim Jamshed 			return TcpSeqChange(socket,
43076404edcSAsim Jamshed 					    (uint32_t)((seq_remap_info *)optval)->seq_off,
43176404edcSAsim Jamshed 					    ((seq_remap_info *)optval)->side,
43276404edcSAsim Jamshed 					    mtcp->pctx->p.seq);
43376404edcSAsim Jamshed 		case MOS_STOP_MON:
43476404edcSAsim Jamshed 			return mtcp_cb_stop(mctx, sockid, *(int *)optval);
43576404edcSAsim Jamshed 		default:
43676404edcSAsim Jamshed 			TRACE_API("invalid optname=%d\n", optname);
43776404edcSAsim Jamshed 			assert(0);
43876404edcSAsim Jamshed 		}
43976404edcSAsim Jamshed 		break;
44076404edcSAsim Jamshed 	}
44176404edcSAsim Jamshed 
44276404edcSAsim Jamshed 	errno = ENOSYS;
44376404edcSAsim Jamshed 	return -1;
44476404edcSAsim Jamshed }
44576404edcSAsim Jamshed /*----------------------------------------------------------------------------*/
44676404edcSAsim Jamshed int
44776404edcSAsim Jamshed mtcp_setsock_nonblock(mctx_t mctx, int sockid)
44876404edcSAsim Jamshed {
44976404edcSAsim Jamshed 	mtcp_manager_t mtcp;
45076404edcSAsim Jamshed 
45176404edcSAsim Jamshed 	mtcp = GetMTCPManager(mctx);
45276404edcSAsim Jamshed 	if (!mtcp) {
45376404edcSAsim Jamshed 		errno = EACCES;
45476404edcSAsim Jamshed 		return -1;
45576404edcSAsim Jamshed 	}
45676404edcSAsim Jamshed 
45776404edcSAsim Jamshed 	if (sockid < 0 || sockid >= g_config.mos->max_concurrency) {
45876404edcSAsim Jamshed 		TRACE_API("Socket id %d out of range.\n", sockid);
45976404edcSAsim Jamshed 		errno = EBADF;
46076404edcSAsim Jamshed 		return -1;
46176404edcSAsim Jamshed 	}
46276404edcSAsim Jamshed 
46376404edcSAsim Jamshed 	if (mtcp->smap[sockid].socktype == MOS_SOCK_UNUSED) {
46476404edcSAsim Jamshed 		TRACE_API("Invalid socket id: %d\n", sockid);
46576404edcSAsim Jamshed 		errno = EBADF;
46676404edcSAsim Jamshed 		return -1;
46776404edcSAsim Jamshed 	}
46876404edcSAsim Jamshed 
46976404edcSAsim Jamshed 	mtcp->smap[sockid].opts |= MTCP_NONBLOCK;
47076404edcSAsim Jamshed 
47176404edcSAsim Jamshed 	return 0;
47276404edcSAsim Jamshed }
47376404edcSAsim Jamshed /*----------------------------------------------------------------------------*/
47476404edcSAsim Jamshed int
47576404edcSAsim Jamshed mtcp_ioctl(mctx_t mctx, int sockid, int request, void *argp)
47676404edcSAsim Jamshed {
47776404edcSAsim Jamshed 	mtcp_manager_t mtcp;
47876404edcSAsim Jamshed 	socket_map_t socket;
47976404edcSAsim Jamshed 
48076404edcSAsim Jamshed 	mtcp = GetMTCPManager(mctx);
48176404edcSAsim Jamshed 	if (!mtcp) {
48276404edcSAsim Jamshed 		errno = EACCES;
48376404edcSAsim Jamshed 		return -1;
48476404edcSAsim Jamshed 	}
48576404edcSAsim Jamshed 
48676404edcSAsim Jamshed 	if (sockid < 0 || sockid >= g_config.mos->max_concurrency) {
48776404edcSAsim Jamshed 		TRACE_API("Socket id %d out of range.\n", sockid);
48876404edcSAsim Jamshed 		errno = EBADF;
48976404edcSAsim Jamshed 		return -1;
49076404edcSAsim Jamshed 	}
49176404edcSAsim Jamshed 
49276404edcSAsim Jamshed 	/* only support stream socket */
49376404edcSAsim Jamshed 	socket = &mtcp->smap[sockid];
49476404edcSAsim Jamshed 
49576404edcSAsim Jamshed 	if (socket->socktype != MOS_SOCK_STREAM_LISTEN &&
49676404edcSAsim Jamshed 		socket->socktype != MOS_SOCK_STREAM) {
49776404edcSAsim Jamshed 		TRACE_API("Invalid socket id: %d\n", sockid);
49876404edcSAsim Jamshed 		errno = EBADF;
49976404edcSAsim Jamshed 		return -1;
50076404edcSAsim Jamshed 	}
50176404edcSAsim Jamshed 
50276404edcSAsim Jamshed 	if (!argp) {
50376404edcSAsim Jamshed 		errno = EFAULT;
50476404edcSAsim Jamshed 		return -1;
50576404edcSAsim Jamshed 	}
50676404edcSAsim Jamshed 
50776404edcSAsim Jamshed 	if (request == FIONREAD) {
50876404edcSAsim Jamshed 		tcp_stream *cur_stream;
50976404edcSAsim Jamshed 		tcprb_t *rbuf;
51076404edcSAsim Jamshed 
51176404edcSAsim Jamshed 		cur_stream = socket->stream;
51276404edcSAsim Jamshed 		if (!cur_stream) {
51376404edcSAsim Jamshed 			errno = EBADF;
51476404edcSAsim Jamshed 			return -1;
51576404edcSAsim Jamshed 		}
51676404edcSAsim Jamshed 
51776404edcSAsim Jamshed 		rbuf = cur_stream->rcvvar->rcvbuf;
51876404edcSAsim Jamshed 		*(int *)argp = (rbuf) ? tcprb_cflen(rbuf) : 0;
51976404edcSAsim Jamshed 
52076404edcSAsim Jamshed 	} else if (request == FIONBIO) {
52176404edcSAsim Jamshed 		/*
52276404edcSAsim Jamshed 		 * sockets can only be set to blocking/non-blocking
52376404edcSAsim Jamshed 		 * modes during initialization
52476404edcSAsim Jamshed 		 */
52576404edcSAsim Jamshed 		if ((*(int *)argp))
52676404edcSAsim Jamshed 			mtcp->smap[sockid].opts |= MTCP_NONBLOCK;
52776404edcSAsim Jamshed 		else
52876404edcSAsim Jamshed 			mtcp->smap[sockid].opts &= ~MTCP_NONBLOCK;
52976404edcSAsim Jamshed 	} else {
53076404edcSAsim Jamshed 		errno = EINVAL;
53176404edcSAsim Jamshed 		return -1;
53276404edcSAsim Jamshed 	}
53376404edcSAsim Jamshed 
53476404edcSAsim Jamshed 	return 0;
53576404edcSAsim Jamshed }
53676404edcSAsim Jamshed /*----------------------------------------------------------------------------*/
53776404edcSAsim Jamshed static int
53876404edcSAsim Jamshed mtcp_monitor(mctx_t mctx, socket_map_t sock)
53976404edcSAsim Jamshed {
54076404edcSAsim Jamshed 	mtcp_manager_t mtcp;
54176404edcSAsim Jamshed 	struct mon_listener *monitor;
54276404edcSAsim Jamshed 	int sockid = sock->id;
54376404edcSAsim Jamshed 
54476404edcSAsim Jamshed 	mtcp = GetMTCPManager(mctx);
54576404edcSAsim Jamshed 	if (!mtcp) {
54676404edcSAsim Jamshed 		errno = EACCES;
54776404edcSAsim Jamshed 		return -1;
54876404edcSAsim Jamshed 	}
54976404edcSAsim Jamshed 
55076404edcSAsim Jamshed 	if (sockid < 0 || sockid >= g_config.mos->max_concurrency) {
55176404edcSAsim Jamshed 		TRACE_API("Socket id %d out of range.\n", sockid);
55276404edcSAsim Jamshed 		errno = EBADF;
55376404edcSAsim Jamshed 		return -1;
55476404edcSAsim Jamshed 	}
55576404edcSAsim Jamshed 
55676404edcSAsim Jamshed 	if (mtcp->msmap[sockid].socktype == MOS_SOCK_UNUSED) {
55776404edcSAsim Jamshed 		TRACE_API("Invalid socket id: %d\n", sockid);
55876404edcSAsim Jamshed 		errno = EBADF;
55976404edcSAsim Jamshed 		return -1;
56076404edcSAsim Jamshed 	}
56176404edcSAsim Jamshed 
56276404edcSAsim Jamshed 	if (!(mtcp->msmap[sockid].socktype == MOS_SOCK_MONITOR_STREAM ||
56376404edcSAsim Jamshed 	      mtcp->msmap[sockid].socktype == MOS_SOCK_MONITOR_RAW)) {
56476404edcSAsim Jamshed 		TRACE_API("Not a monitor socket. id: %d\n", sockid);
56576404edcSAsim Jamshed 		errno = ENOTSOCK;
56676404edcSAsim Jamshed 		return -1;
56776404edcSAsim Jamshed 	}
56876404edcSAsim Jamshed 
56976404edcSAsim Jamshed 	monitor = (struct mon_listener *)calloc(1, sizeof(struct mon_listener));
57076404edcSAsim Jamshed 	if (!monitor) {
57176404edcSAsim Jamshed 		/* errno set from the malloc() */
57276404edcSAsim Jamshed 		errno = ENOMEM;
57376404edcSAsim Jamshed 		return -1;
57476404edcSAsim Jamshed 	}
57576404edcSAsim Jamshed 
57676404edcSAsim Jamshed 	/* create a monitor-specific event queue */
57776404edcSAsim Jamshed 	monitor->eq = CreateEventQueue(g_config.mos->max_concurrency);
57876404edcSAsim Jamshed 	if (!monitor->eq) {
57976404edcSAsim Jamshed 		TRACE_API("Can't create event queue (concurrency: %d) for "
58076404edcSAsim Jamshed 			  "monitor read event registrations!\n",
58176404edcSAsim Jamshed 			  g_config.mos->max_concurrency);
58276404edcSAsim Jamshed 		free(monitor);
58376404edcSAsim Jamshed 		errno = ENOMEM;
58476404edcSAsim Jamshed 		return -1;
58576404edcSAsim Jamshed 	}
58676404edcSAsim Jamshed 
58776404edcSAsim Jamshed 	/* set monitor-related basic parameters */
58876404edcSAsim Jamshed #ifndef NEWEV
58976404edcSAsim Jamshed 	monitor->ude_id = UDE_OFFSET;
59076404edcSAsim Jamshed #endif
59176404edcSAsim Jamshed 	monitor->socket = sock;
59276404edcSAsim Jamshed 	monitor->client_buf_mgmt = monitor->server_buf_mgmt = BUFMGMT_FULL;
59376404edcSAsim Jamshed 
59476404edcSAsim Jamshed 	/* perform both sides monitoring by default */
59576404edcSAsim Jamshed 	monitor->client_mon = monitor->server_mon = 1;
59676404edcSAsim Jamshed 
59776404edcSAsim Jamshed 	/* add monitor socket to the monitor list */
59876404edcSAsim Jamshed 	TAILQ_INSERT_TAIL(&mtcp->monitors, monitor, link);
59976404edcSAsim Jamshed 
60076404edcSAsim Jamshed 	mtcp->msmap[sockid].monitor_listener = monitor;
60176404edcSAsim Jamshed 
60276404edcSAsim Jamshed 	return 0;
60376404edcSAsim Jamshed }
60476404edcSAsim Jamshed /*----------------------------------------------------------------------------*/
60576404edcSAsim Jamshed int
60676404edcSAsim Jamshed mtcp_socket(mctx_t mctx, int domain, int type, int protocol)
60776404edcSAsim Jamshed {
60876404edcSAsim Jamshed 	mtcp_manager_t mtcp;
60976404edcSAsim Jamshed 	socket_map_t socket;
61076404edcSAsim Jamshed 
61176404edcSAsim Jamshed 	mtcp = GetMTCPManager(mctx);
61276404edcSAsim Jamshed 	if (!mtcp) {
61376404edcSAsim Jamshed 		errno = EACCES;
61476404edcSAsim Jamshed 		return -1;
61576404edcSAsim Jamshed 	}
61676404edcSAsim Jamshed 
61776404edcSAsim Jamshed 	if (domain != AF_INET) {
61876404edcSAsim Jamshed 		errno = EAFNOSUPPORT;
61976404edcSAsim Jamshed 		return -1;
62076404edcSAsim Jamshed 	}
62176404edcSAsim Jamshed 
62276404edcSAsim Jamshed 	if (type == SOCK_STREAM) {
62376404edcSAsim Jamshed 		type = MOS_SOCK_STREAM;
62476404edcSAsim Jamshed 	} else if (type == MOS_SOCK_MONITOR_STREAM ||
62576404edcSAsim Jamshed 		   type == MOS_SOCK_MONITOR_RAW) {
62676404edcSAsim Jamshed 		/* do nothing for the time being */
62776404edcSAsim Jamshed 	} else {
62876404edcSAsim Jamshed 		/* Not supported type */
62976404edcSAsim Jamshed 		errno = EINVAL;
63076404edcSAsim Jamshed 		return -1;
63176404edcSAsim Jamshed 	}
63276404edcSAsim Jamshed 
63376404edcSAsim Jamshed 	socket = AllocateSocket(mctx, type);
63476404edcSAsim Jamshed 	if (!socket) {
63576404edcSAsim Jamshed 		errno = ENFILE;
63676404edcSAsim Jamshed 		return -1;
63776404edcSAsim Jamshed 	}
63876404edcSAsim Jamshed 
63976404edcSAsim Jamshed 	if (type == MOS_SOCK_MONITOR_STREAM ||
64076404edcSAsim Jamshed 	    type == MOS_SOCK_MONITOR_RAW) {
64176404edcSAsim Jamshed 		mtcp_manager_t mtcp = GetMTCPManager(mctx);
64276404edcSAsim Jamshed 		if (!mtcp) {
64376404edcSAsim Jamshed 			errno = EACCES;
64476404edcSAsim Jamshed 			return -1;
64576404edcSAsim Jamshed 		}
64676404edcSAsim Jamshed 		mtcp_monitor(mctx, socket);
64776404edcSAsim Jamshed #ifdef NEWEV
64876404edcSAsim Jamshed 		socket->monitor_listener->stree_dontcare = NULL;
64976404edcSAsim Jamshed 		socket->monitor_listener->stree_pre_rcv = NULL;
65076404edcSAsim Jamshed 		socket->monitor_listener->stree_post_snd = NULL;
65176404edcSAsim Jamshed #else
65276404edcSAsim Jamshed 		InitEvB(mtcp, &socket->monitor_listener->dontcare_evb);
65376404edcSAsim Jamshed 		InitEvB(mtcp, &socket->monitor_listener->pre_tcp_evb);
65476404edcSAsim Jamshed 		InitEvB(mtcp, &socket->monitor_listener->post_tcp_evb);
65576404edcSAsim Jamshed #endif
65676404edcSAsim Jamshed 	}
65776404edcSAsim Jamshed 
65876404edcSAsim Jamshed 	return socket->id;
65976404edcSAsim Jamshed }
66076404edcSAsim Jamshed /*----------------------------------------------------------------------------*/
66176404edcSAsim Jamshed int
66276404edcSAsim Jamshed mtcp_bind(mctx_t mctx, int sockid,
66376404edcSAsim Jamshed 		const struct sockaddr *addr, socklen_t addrlen)
66476404edcSAsim Jamshed {
66576404edcSAsim Jamshed 	mtcp_manager_t mtcp;
66676404edcSAsim Jamshed 	struct sockaddr_in *addr_in;
66776404edcSAsim Jamshed 
66876404edcSAsim Jamshed 	mtcp = GetMTCPManager(mctx);
66976404edcSAsim Jamshed 	if (!mtcp) {
67076404edcSAsim Jamshed 		errno = EACCES;
67176404edcSAsim Jamshed 		return -1;
67276404edcSAsim Jamshed 	}
67376404edcSAsim Jamshed 
67476404edcSAsim Jamshed 	if (sockid < 0 || sockid >= g_config.mos->max_concurrency) {
67576404edcSAsim Jamshed 		TRACE_API("Socket id %d out of range.\n", sockid);
67676404edcSAsim Jamshed 		errno = EBADF;
67776404edcSAsim Jamshed 		return -1;
67876404edcSAsim Jamshed 	}
67976404edcSAsim Jamshed 
68076404edcSAsim Jamshed 	if (mtcp->smap[sockid].socktype == MOS_SOCK_UNUSED) {
68176404edcSAsim Jamshed 		TRACE_API("Invalid socket id: %d\n", sockid);
68276404edcSAsim Jamshed 		errno = EBADF;
68376404edcSAsim Jamshed 		return -1;
68476404edcSAsim Jamshed 	}
68576404edcSAsim Jamshed 
68676404edcSAsim Jamshed 	if (mtcp->smap[sockid].socktype != MOS_SOCK_STREAM &&
68776404edcSAsim Jamshed 			mtcp->smap[sockid].socktype != MOS_SOCK_STREAM_LISTEN) {
68876404edcSAsim Jamshed 		TRACE_API("Not a stream socket id: %d\n", sockid);
68976404edcSAsim Jamshed 		errno = ENOTSOCK;
69076404edcSAsim Jamshed 		return -1;
69176404edcSAsim Jamshed 	}
69276404edcSAsim Jamshed 
69376404edcSAsim Jamshed 	if (!addr) {
69476404edcSAsim Jamshed 		TRACE_API("Socket %d: empty address!\n", sockid);
69576404edcSAsim Jamshed 		errno = EINVAL;
69676404edcSAsim Jamshed 		return -1;
69776404edcSAsim Jamshed 	}
69876404edcSAsim Jamshed 
69976404edcSAsim Jamshed 	if (mtcp->smap[sockid].opts & MTCP_ADDR_BIND) {
70076404edcSAsim Jamshed 		TRACE_API("Socket %d: adress already bind for this socket.\n", sockid);
70176404edcSAsim Jamshed 		errno = EINVAL;
70276404edcSAsim Jamshed 		return -1;
70376404edcSAsim Jamshed 	}
70476404edcSAsim Jamshed 
70576404edcSAsim Jamshed 	/* we only allow bind() for AF_INET address */
70676404edcSAsim Jamshed 	if (addr->sa_family != AF_INET || addrlen < sizeof(struct sockaddr_in)) {
70776404edcSAsim Jamshed 		TRACE_API("Socket %d: invalid argument!\n", sockid);
70876404edcSAsim Jamshed 		errno = EINVAL;
70976404edcSAsim Jamshed 		return -1;
71076404edcSAsim Jamshed 	}
71176404edcSAsim Jamshed 
71276404edcSAsim Jamshed 	if (mtcp->listener) {
71376404edcSAsim Jamshed 		TRACE_API("Address already bound!\n");
71476404edcSAsim Jamshed 		errno = EINVAL;
71576404edcSAsim Jamshed 		return -1;
71676404edcSAsim Jamshed 	}
71776404edcSAsim Jamshed 	addr_in = (struct sockaddr_in *)addr;
71876404edcSAsim Jamshed 	mtcp->smap[sockid].saddr = *addr_in;
71976404edcSAsim Jamshed 	mtcp->smap[sockid].opts |= MTCP_ADDR_BIND;
72076404edcSAsim Jamshed 
72176404edcSAsim Jamshed 	return 0;
72276404edcSAsim Jamshed }
72376404edcSAsim Jamshed /*----------------------------------------------------------------------------*/
72476404edcSAsim Jamshed int
72576404edcSAsim Jamshed mtcp_listen(mctx_t mctx, int sockid, int backlog)
72676404edcSAsim Jamshed {
72776404edcSAsim Jamshed 	mtcp_manager_t mtcp;
72876404edcSAsim Jamshed 	struct tcp_listener *listener;
72976404edcSAsim Jamshed 
73076404edcSAsim Jamshed 	mtcp = GetMTCPManager(mctx);
73176404edcSAsim Jamshed 	if (!mtcp) {
73276404edcSAsim Jamshed 		errno = EACCES;
73376404edcSAsim Jamshed 		return -1;
73476404edcSAsim Jamshed 	}
73576404edcSAsim Jamshed 
73676404edcSAsim Jamshed 	if (sockid < 0 || sockid >= g_config.mos->max_concurrency) {
73776404edcSAsim Jamshed 		TRACE_API("Socket id %d out of range.\n", sockid);
73876404edcSAsim Jamshed 		errno = EBADF;
73976404edcSAsim Jamshed 		return -1;
74076404edcSAsim Jamshed 	}
74176404edcSAsim Jamshed 
74276404edcSAsim Jamshed 	if (mtcp->smap[sockid].socktype == MOS_SOCK_UNUSED) {
74376404edcSAsim Jamshed 		TRACE_API("Invalid socket id: %d\n", sockid);
74476404edcSAsim Jamshed 		errno = EBADF;
74576404edcSAsim Jamshed 		return -1;
74676404edcSAsim Jamshed 	}
74776404edcSAsim Jamshed 
74876404edcSAsim Jamshed 	if (mtcp->smap[sockid].socktype == MOS_SOCK_STREAM) {
74976404edcSAsim Jamshed 		mtcp->smap[sockid].socktype = MOS_SOCK_STREAM_LISTEN;
75076404edcSAsim Jamshed 	}
75176404edcSAsim Jamshed 
75276404edcSAsim Jamshed 	if (mtcp->smap[sockid].socktype != MOS_SOCK_STREAM_LISTEN) {
75376404edcSAsim Jamshed 		TRACE_API("Not a listening socket. id: %d\n", sockid);
75476404edcSAsim Jamshed 		errno = ENOTSOCK;
75576404edcSAsim Jamshed 		return -1;
75676404edcSAsim Jamshed 	}
75776404edcSAsim Jamshed 
75876404edcSAsim Jamshed 	if (backlog <= 0 || backlog > g_config.mos->max_concurrency) {
75976404edcSAsim Jamshed 		errno = EINVAL;
76076404edcSAsim Jamshed 		return -1;
76176404edcSAsim Jamshed 	}
76276404edcSAsim Jamshed 
76376404edcSAsim Jamshed 	listener = (struct tcp_listener *)calloc(1, sizeof(struct tcp_listener));
76476404edcSAsim Jamshed 	if (!listener) {
76576404edcSAsim Jamshed 		/* errno set from the malloc() */
76676404edcSAsim Jamshed 		errno = ENOMEM;
76776404edcSAsim Jamshed 		return -1;
76876404edcSAsim Jamshed 	}
76976404edcSAsim Jamshed 
77076404edcSAsim Jamshed 	listener->sockid = sockid;
77176404edcSAsim Jamshed 	listener->backlog = backlog;
77276404edcSAsim Jamshed 	listener->socket = &mtcp->smap[sockid];
77376404edcSAsim Jamshed 
77476404edcSAsim Jamshed 	if (pthread_cond_init(&listener->accept_cond, NULL)) {
77576404edcSAsim Jamshed 		perror("pthread_cond_init of ctx->accept_cond\n");
77676404edcSAsim Jamshed 		/* errno set by pthread_cond_init() */
77776404edcSAsim Jamshed 		return -1;
77876404edcSAsim Jamshed 	}
77976404edcSAsim Jamshed 	if (pthread_mutex_init(&listener->accept_lock, NULL)) {
78076404edcSAsim Jamshed 		perror("pthread_mutex_init of ctx->accept_lock\n");
78176404edcSAsim Jamshed 		/* errno set by pthread_mutex_init() */
78276404edcSAsim Jamshed 		return -1;
78376404edcSAsim Jamshed 	}
78476404edcSAsim Jamshed 
78576404edcSAsim Jamshed 	listener->acceptq = CreateStreamQueue(backlog);
78676404edcSAsim Jamshed 	if (!listener->acceptq) {
78776404edcSAsim Jamshed 		errno = ENOMEM;
78876404edcSAsim Jamshed 		return -1;
78976404edcSAsim Jamshed 	}
79076404edcSAsim Jamshed 
79176404edcSAsim Jamshed 	mtcp->smap[sockid].listener = listener;
79276404edcSAsim Jamshed 	mtcp->listener = listener;
79376404edcSAsim Jamshed 
79476404edcSAsim Jamshed 	return 0;
79576404edcSAsim Jamshed }
79676404edcSAsim Jamshed /*----------------------------------------------------------------------------*/
79776404edcSAsim Jamshed int
79876404edcSAsim Jamshed mtcp_accept(mctx_t mctx, int sockid, struct sockaddr *addr, socklen_t *addrlen)
79976404edcSAsim Jamshed {
80076404edcSAsim Jamshed 	mtcp_manager_t mtcp;
80176404edcSAsim Jamshed 	struct tcp_listener *listener;
80276404edcSAsim Jamshed 	socket_map_t socket;
80376404edcSAsim Jamshed 	tcp_stream *accepted = NULL;
80476404edcSAsim Jamshed 
80576404edcSAsim Jamshed 	mtcp = GetMTCPManager(mctx);
80676404edcSAsim Jamshed 	if (!mtcp) {
80776404edcSAsim Jamshed 		errno = EACCES;
80876404edcSAsim Jamshed 		return -1;
80976404edcSAsim Jamshed 	}
81076404edcSAsim Jamshed 
81176404edcSAsim Jamshed 	if (sockid < 0 || sockid >= g_config.mos->max_concurrency) {
81276404edcSAsim Jamshed 		TRACE_API("Socket id %d out of range.\n", sockid);
81376404edcSAsim Jamshed 		errno = EBADF;
81476404edcSAsim Jamshed 		return -1;
81576404edcSAsim Jamshed 	}
81676404edcSAsim Jamshed 
81776404edcSAsim Jamshed 	/* requires listening socket */
81876404edcSAsim Jamshed 	if (mtcp->smap[sockid].socktype != MOS_SOCK_STREAM_LISTEN) {
81976404edcSAsim Jamshed 		errno = EINVAL;
82076404edcSAsim Jamshed 		return -1;
82176404edcSAsim Jamshed 	}
82276404edcSAsim Jamshed 
82376404edcSAsim Jamshed 	listener = mtcp->smap[sockid].listener;
82476404edcSAsim Jamshed 
82576404edcSAsim Jamshed 	/* dequeue from the acceptq without lock first */
82676404edcSAsim Jamshed 	/* if nothing there, acquire lock and cond_wait */
82776404edcSAsim Jamshed 	accepted = StreamDequeue(listener->acceptq);
82876404edcSAsim Jamshed 	if (!accepted) {
82976404edcSAsim Jamshed 		if (listener->socket->opts & MTCP_NONBLOCK) {
83076404edcSAsim Jamshed 			errno = EAGAIN;
83176404edcSAsim Jamshed 			return -1;
83276404edcSAsim Jamshed 
83376404edcSAsim Jamshed 		} else {
83476404edcSAsim Jamshed 			pthread_mutex_lock(&listener->accept_lock);
83576404edcSAsim Jamshed 			while ((accepted = StreamDequeue(listener->acceptq)) == NULL) {
83676404edcSAsim Jamshed 				pthread_cond_wait(&listener->accept_cond, &listener->accept_lock);
83776404edcSAsim Jamshed 
83876404edcSAsim Jamshed 				if (mtcp->ctx->done || mtcp->ctx->exit) {
83976404edcSAsim Jamshed 					pthread_mutex_unlock(&listener->accept_lock);
84076404edcSAsim Jamshed 					errno = EINTR;
84176404edcSAsim Jamshed 					return -1;
84276404edcSAsim Jamshed 				}
84376404edcSAsim Jamshed 			}
84476404edcSAsim Jamshed 			pthread_mutex_unlock(&listener->accept_lock);
84576404edcSAsim Jamshed 		}
84676404edcSAsim Jamshed 	}
84776404edcSAsim Jamshed 
84876404edcSAsim Jamshed 	if (!accepted) {
84976404edcSAsim Jamshed 		TRACE_ERROR("[NEVER HAPPEN] Empty accept queue!\n");
85076404edcSAsim Jamshed 	}
85176404edcSAsim Jamshed 
85276404edcSAsim Jamshed 	if (!accepted->socket) {
85376404edcSAsim Jamshed 		socket = AllocateSocket(mctx, MOS_SOCK_STREAM);
85476404edcSAsim Jamshed 		if (!socket) {
85576404edcSAsim Jamshed 			TRACE_ERROR("Failed to create new socket!\n");
85676404edcSAsim Jamshed 			/* TODO: destroy the stream */
85776404edcSAsim Jamshed 			errno = ENFILE;
85876404edcSAsim Jamshed 			return -1;
85976404edcSAsim Jamshed 		}
86076404edcSAsim Jamshed 		socket->stream = accepted;
86176404edcSAsim Jamshed 		accepted->socket = socket;
862a5e1a556SAsim Jamshed 
863a5e1a556SAsim Jamshed 		/* set socket addr parameters */
864a5e1a556SAsim Jamshed 		socket->saddr.sin_family = AF_INET;
865a5e1a556SAsim Jamshed 		socket->saddr.sin_port = accepted->dport;
866a5e1a556SAsim Jamshed 		socket->saddr.sin_addr.s_addr = accepted->daddr;
867a5e1a556SAsim Jamshed 
86876404edcSAsim Jamshed 		/* if monitor is enabled, complete the socket assignment */
86976404edcSAsim Jamshed 		if (socket->stream->pair_stream != NULL)
87076404edcSAsim Jamshed 			socket->stream->pair_stream->socket = socket;
87176404edcSAsim Jamshed 	}
87276404edcSAsim Jamshed 
873a5e1a556SAsim Jamshed 	if (!(listener->socket->epoll & MOS_EPOLLET) &&
874a5e1a556SAsim Jamshed 	    !StreamQueueIsEmpty(listener->acceptq))
875a5e1a556SAsim Jamshed 		AddEpollEvent(mtcp->ep,
876a5e1a556SAsim Jamshed 			      USR_SHADOW_EVENT_QUEUE,
877a5e1a556SAsim Jamshed 			      listener->socket, MOS_EPOLLIN);
878a5e1a556SAsim Jamshed 
87976404edcSAsim Jamshed 	TRACE_API("Stream %d accepted.\n", accepted->id);
88076404edcSAsim Jamshed 
88176404edcSAsim Jamshed 	if (addr && addrlen) {
88276404edcSAsim Jamshed 		struct sockaddr_in *addr_in = (struct sockaddr_in *)addr;
88376404edcSAsim Jamshed 		addr_in->sin_family = AF_INET;
88476404edcSAsim Jamshed 		addr_in->sin_port = accepted->dport;
88576404edcSAsim Jamshed 		addr_in->sin_addr.s_addr = accepted->daddr;
88676404edcSAsim Jamshed 		*addrlen = sizeof(struct sockaddr_in);
88776404edcSAsim Jamshed 	}
88876404edcSAsim Jamshed 
88976404edcSAsim Jamshed 	return accepted->socket->id;
89076404edcSAsim Jamshed }
89176404edcSAsim Jamshed /*----------------------------------------------------------------------------*/
89276404edcSAsim Jamshed int
89376404edcSAsim Jamshed mtcp_init_rss(mctx_t mctx, in_addr_t saddr_base, int num_addr,
89476404edcSAsim Jamshed 		in_addr_t daddr, in_addr_t dport)
89576404edcSAsim Jamshed {
89676404edcSAsim Jamshed 	mtcp_manager_t mtcp;
89776404edcSAsim Jamshed 	addr_pool_t ap;
89876404edcSAsim Jamshed 
89976404edcSAsim Jamshed 	mtcp = GetMTCPManager(mctx);
90076404edcSAsim Jamshed 	if (!mtcp) {
90176404edcSAsim Jamshed 		errno = EACCES;
90276404edcSAsim Jamshed 		return -1;
90376404edcSAsim Jamshed 	}
90476404edcSAsim Jamshed 
90576404edcSAsim Jamshed 	if (saddr_base == INADDR_ANY) {
90676404edcSAsim Jamshed 		int nif_out;
90776404edcSAsim Jamshed 
90876404edcSAsim Jamshed 		/* for the INADDR_ANY, find the output interface for the destination
90976404edcSAsim Jamshed 		   and set the saddr_base as the ip address of the output interface */
91076404edcSAsim Jamshed 		nif_out = GetOutputInterface(daddr);
91176404edcSAsim Jamshed 		saddr_base = g_config.mos->netdev_table->ent[nif_out]->ip_addr;
91276404edcSAsim Jamshed 	}
91376404edcSAsim Jamshed 
91476404edcSAsim Jamshed 	ap = CreateAddressPoolPerCore(mctx->cpu, num_cpus,
91576404edcSAsim Jamshed 			saddr_base, num_addr, daddr, dport);
91676404edcSAsim Jamshed 	if (!ap) {
91776404edcSAsim Jamshed 		errno = ENOMEM;
91876404edcSAsim Jamshed 		return -1;
91976404edcSAsim Jamshed 	}
92076404edcSAsim Jamshed 
92176404edcSAsim Jamshed 	mtcp->ap = ap;
92276404edcSAsim Jamshed 
92376404edcSAsim Jamshed 	return 0;
92476404edcSAsim Jamshed }
92576404edcSAsim Jamshed /*----------------------------------------------------------------------------*/
92676404edcSAsim Jamshed int
92776404edcSAsim Jamshed eval_bpf_5tuple(struct sfbpf_program fcode,
92876404edcSAsim Jamshed 				in_addr_t saddr, in_port_t sport,
92976404edcSAsim Jamshed 				in_addr_t daddr, in_port_t dport) {
93076404edcSAsim Jamshed 	uint8_t buf[TOTAL_TCP_HEADER_LEN];
93176404edcSAsim Jamshed 	struct ethhdr *ethh;
93276404edcSAsim Jamshed 	struct iphdr *iph;
93376404edcSAsim Jamshed 	struct tcphdr *tcph;
93476404edcSAsim Jamshed 
93576404edcSAsim Jamshed 	ethh = (struct ethhdr *)buf;
93676404edcSAsim Jamshed 	ethh->h_proto = htons(ETH_P_IP);
93776404edcSAsim Jamshed 	iph = (struct iphdr *)(ethh + 1);
93876404edcSAsim Jamshed 	iph->ihl = IP_HEADER_LEN >> 2;
93976404edcSAsim Jamshed 	iph->version = 4;
94076404edcSAsim Jamshed 	iph->tos = 0;
94176404edcSAsim Jamshed 	iph->tot_len = htons(IP_HEADER_LEN + TCP_HEADER_LEN);
94276404edcSAsim Jamshed 	iph->id = htons(0);
94376404edcSAsim Jamshed 	iph->protocol = IPPROTO_TCP;
94476404edcSAsim Jamshed 	iph->saddr = saddr;
94576404edcSAsim Jamshed 	iph->daddr = daddr;
94676404edcSAsim Jamshed 	iph->check = 0;
94776404edcSAsim Jamshed 	tcph = (struct tcphdr *)(iph + 1);
94876404edcSAsim Jamshed 	tcph->source = sport;
94976404edcSAsim Jamshed 	tcph->dest = dport;
95076404edcSAsim Jamshed 
95176404edcSAsim Jamshed 	return EVAL_BPFFILTER(fcode, (uint8_t *)iph - sizeof(struct ethhdr),
95276404edcSAsim Jamshed 						 TOTAL_TCP_HEADER_LEN);
95376404edcSAsim Jamshed }
95476404edcSAsim Jamshed /*----------------------------------------------------------------------------*/
95576404edcSAsim Jamshed int
95676404edcSAsim Jamshed mtcp_connect(mctx_t mctx, int sockid,
95776404edcSAsim Jamshed 		const struct sockaddr *addr, socklen_t addrlen)
95876404edcSAsim Jamshed {
95976404edcSAsim Jamshed 	mtcp_manager_t mtcp;
96076404edcSAsim Jamshed 	socket_map_t socket;
96176404edcSAsim Jamshed 	tcp_stream *cur_stream;
96276404edcSAsim Jamshed 	struct sockaddr_in *addr_in;
96376404edcSAsim Jamshed 	in_addr_t dip;
96476404edcSAsim Jamshed 	in_port_t dport;
96576404edcSAsim Jamshed 	int is_dyn_bound = FALSE;
96676404edcSAsim Jamshed 	int ret;
96776404edcSAsim Jamshed 	int cnt_match = 0;
96876404edcSAsim Jamshed 	struct mon_listener *walk;
96976404edcSAsim Jamshed 	struct sfbpf_program fcode;
97076404edcSAsim Jamshed 
97176404edcSAsim Jamshed 	cur_stream = NULL;
97276404edcSAsim Jamshed 	mtcp = GetMTCPManager(mctx);
97376404edcSAsim Jamshed 	if (!mtcp) {
97476404edcSAsim Jamshed 		errno = EACCES;
97576404edcSAsim Jamshed 		return -1;
97676404edcSAsim Jamshed 	}
97776404edcSAsim Jamshed 
97876404edcSAsim Jamshed 	if (sockid < 0 || sockid >= g_config.mos->max_concurrency) {
97976404edcSAsim Jamshed 		TRACE_API("Socket id %d out of range.\n", sockid);
98076404edcSAsim Jamshed 		errno = EBADF;
98176404edcSAsim Jamshed 		return -1;
98276404edcSAsim Jamshed 	}
98376404edcSAsim Jamshed 
98476404edcSAsim Jamshed 	if (mtcp->smap[sockid].socktype == MOS_SOCK_UNUSED) {
98576404edcSAsim Jamshed 		TRACE_API("Invalid socket id: %d\n", sockid);
98676404edcSAsim Jamshed 		errno = EBADF;
98776404edcSAsim Jamshed 		return -1;
98876404edcSAsim Jamshed 	}
98976404edcSAsim Jamshed 
99076404edcSAsim Jamshed 	if (mtcp->smap[sockid].socktype != MOS_SOCK_STREAM) {
99176404edcSAsim Jamshed 		TRACE_API("Not an end socket. id: %d\n", sockid);
99276404edcSAsim Jamshed 		errno = ENOTSOCK;
99376404edcSAsim Jamshed 		return -1;
99476404edcSAsim Jamshed 	}
99576404edcSAsim Jamshed 
99676404edcSAsim Jamshed 	if (!addr) {
99776404edcSAsim Jamshed 		TRACE_API("Socket %d: empty address!\n", sockid);
99876404edcSAsim Jamshed 		errno = EFAULT;
99976404edcSAsim Jamshed 		return -1;
100076404edcSAsim Jamshed 	}
100176404edcSAsim Jamshed 
100276404edcSAsim Jamshed 	/* we only allow bind() for AF_INET address */
100376404edcSAsim Jamshed 	if (addr->sa_family != AF_INET || addrlen < sizeof(struct sockaddr_in)) {
100476404edcSAsim Jamshed 		TRACE_API("Socket %d: invalid argument!\n", sockid);
100576404edcSAsim Jamshed 		errno = EAFNOSUPPORT;
100676404edcSAsim Jamshed 		return -1;
100776404edcSAsim Jamshed 	}
100876404edcSAsim Jamshed 
100976404edcSAsim Jamshed 	socket = &mtcp->smap[sockid];
101076404edcSAsim Jamshed 	if (socket->stream) {
101176404edcSAsim Jamshed 		TRACE_API("Socket %d: stream already exist!\n", sockid);
101276404edcSAsim Jamshed 		if (socket->stream->state >= TCP_ST_ESTABLISHED) {
101376404edcSAsim Jamshed 			errno = EISCONN;
101476404edcSAsim Jamshed 		} else {
101576404edcSAsim Jamshed 			errno = EALREADY;
101676404edcSAsim Jamshed 		}
101776404edcSAsim Jamshed 		return -1;
101876404edcSAsim Jamshed 	}
101976404edcSAsim Jamshed 
102076404edcSAsim Jamshed 	addr_in = (struct sockaddr_in *)addr;
102176404edcSAsim Jamshed 	dip = addr_in->sin_addr.s_addr;
102276404edcSAsim Jamshed 	dport = addr_in->sin_port;
102376404edcSAsim Jamshed 
102476404edcSAsim Jamshed 	/* address binding */
102576404edcSAsim Jamshed 	if (socket->opts & MTCP_ADDR_BIND &&
102676404edcSAsim Jamshed 	    socket->saddr.sin_port != INPORT_ANY &&
102776404edcSAsim Jamshed 	    socket->saddr.sin_addr.s_addr != INADDR_ANY) {
102876404edcSAsim Jamshed 		int rss_core;
102976404edcSAsim Jamshed 
103076404edcSAsim Jamshed 		rss_core = GetRSSCPUCore(socket->saddr.sin_addr.s_addr, dip,
103176404edcSAsim Jamshed 				socket->saddr.sin_port, dport, num_queues);
103276404edcSAsim Jamshed 
103376404edcSAsim Jamshed 		if (rss_core != mctx->cpu) {
103476404edcSAsim Jamshed 			errno = EINVAL;
103576404edcSAsim Jamshed 			return -1;
103676404edcSAsim Jamshed 		}
103776404edcSAsim Jamshed 	} else {
103876404edcSAsim Jamshed 		if (mtcp->ap) {
103976404edcSAsim Jamshed 			ret = FetchAddress(mtcp->ap,
104076404edcSAsim Jamshed 					mctx->cpu, num_queues, addr_in, &socket->saddr);
104176404edcSAsim Jamshed 		} else {
1042a5e1a556SAsim Jamshed 			ret = FetchAddress(ap[GetOutputInterface(dip)],
104376404edcSAsim Jamshed 					   mctx->cpu, num_queues, addr_in, &socket->saddr);
104476404edcSAsim Jamshed 		}
104576404edcSAsim Jamshed 		if (ret < 0) {
104676404edcSAsim Jamshed 			errno = EAGAIN;
104776404edcSAsim Jamshed 			return -1;
104876404edcSAsim Jamshed 		}
104976404edcSAsim Jamshed 		socket->opts |= MTCP_ADDR_BIND;
105076404edcSAsim Jamshed 		is_dyn_bound = TRUE;
105176404edcSAsim Jamshed 	}
105276404edcSAsim Jamshed 
105376404edcSAsim Jamshed 	cnt_match = 0;
105476404edcSAsim Jamshed 	if (mtcp->num_msp > 0) {
105576404edcSAsim Jamshed 		TAILQ_FOREACH(walk, &mtcp->monitors, link) {
105676404edcSAsim Jamshed 			fcode = walk->stream_syn_fcode;
105776404edcSAsim Jamshed 			if (!(ISSET_BPFFILTER(fcode) &&
105876404edcSAsim Jamshed 				  eval_bpf_5tuple(fcode, socket->saddr.sin_addr.s_addr,
105976404edcSAsim Jamshed 								  socket->saddr.sin_port,
106076404edcSAsim Jamshed 								  dip, dport) == 0)) {
106176404edcSAsim Jamshed 				walk->is_stream_syn_filter_hit = 1; // set the 'filter hit' flag to 1
106276404edcSAsim Jamshed 				cnt_match++;
106376404edcSAsim Jamshed 			}
106476404edcSAsim Jamshed 		}
106576404edcSAsim Jamshed 	}
106676404edcSAsim Jamshed 
106776404edcSAsim Jamshed 	if (mtcp->num_msp > 0 && cnt_match > 0) {
106876404edcSAsim Jamshed 		/* 150820 dhkim: XXX: embedded mode is not verified */
106976404edcSAsim Jamshed #if 1
107076404edcSAsim Jamshed 		cur_stream = CreateClientTCPStream(mtcp, socket,
107176404edcSAsim Jamshed 						 STREAM_TYPE(MOS_SOCK_STREAM) |
107276404edcSAsim Jamshed 						 STREAM_TYPE(MOS_SOCK_MONITOR_STREAM_ACTIVE),
107376404edcSAsim Jamshed 						 socket->saddr.sin_addr.s_addr,
107476404edcSAsim Jamshed 						 socket->saddr.sin_port, dip, dport, NULL);
107576404edcSAsim Jamshed #else
107676404edcSAsim Jamshed 		cur_stream = CreateDualTCPStream(mtcp, socket,
107776404edcSAsim Jamshed 						 STREAM_TYPE(MOS_SOCK_STREAM) |
107876404edcSAsim Jamshed 						 STREAM_TYPE(MOS_SOCK_MONITOR_STREAM_ACTIVE),
107976404edcSAsim Jamshed 						 socket->saddr.sin_addr.s_addr,
108076404edcSAsim Jamshed 						 socket->saddr.sin_port, dip, dport, NULL);
108176404edcSAsim Jamshed #endif
108276404edcSAsim Jamshed 	}
108376404edcSAsim Jamshed 	else
108476404edcSAsim Jamshed 		cur_stream = CreateTCPStream(mtcp, socket, STREAM_TYPE(MOS_SOCK_STREAM),
108576404edcSAsim Jamshed 					     socket->saddr.sin_addr.s_addr,
108676404edcSAsim Jamshed 					     socket->saddr.sin_port, dip, dport, NULL);
108776404edcSAsim Jamshed 	if (!cur_stream) {
108876404edcSAsim Jamshed 		TRACE_ERROR("Socket %d: failed to create tcp_stream!\n", sockid);
108976404edcSAsim Jamshed 		errno = ENOMEM;
109076404edcSAsim Jamshed 		return -1;
109176404edcSAsim Jamshed 	}
109276404edcSAsim Jamshed 
109376404edcSAsim Jamshed 	if (is_dyn_bound)
109476404edcSAsim Jamshed 		cur_stream->is_bound_addr = TRUE;
109576404edcSAsim Jamshed 	cur_stream->sndvar->cwnd = 1;
109676404edcSAsim Jamshed 	cur_stream->sndvar->ssthresh = cur_stream->sndvar->mss * 10;
109776404edcSAsim Jamshed 	cur_stream->side = MOS_SIDE_CLI;
109876404edcSAsim Jamshed 	/* if monitor is enabled, update the pair stream side as well */
109976404edcSAsim Jamshed 	if (cur_stream->pair_stream) {
110076404edcSAsim Jamshed 		cur_stream->pair_stream->side = MOS_SIDE_SVR;
110176404edcSAsim Jamshed 		/*
110276404edcSAsim Jamshed 		 * if buffer management is off, then disable
110376404edcSAsim Jamshed 		 * monitoring tcp ring of server...
110476404edcSAsim Jamshed 		 * if there is even a single monitor asking for
110576404edcSAsim Jamshed 		 * buffer management, enable it (that's why the
110676404edcSAsim Jamshed 		 * need for the loop)
110776404edcSAsim Jamshed 		 */
110876404edcSAsim Jamshed 		cur_stream->pair_stream->buffer_mgmt = BUFMGMT_OFF;
110976404edcSAsim Jamshed 		struct socket_map *walk;
111076404edcSAsim Jamshed 		SOCKQ_FOREACH_START(walk, &cur_stream->msocks) {
111176404edcSAsim Jamshed 			uint8_t bm = walk->monitor_stream->monitor_listener->server_buf_mgmt;
111276404edcSAsim Jamshed 			if (bm > cur_stream->pair_stream->buffer_mgmt) {
111376404edcSAsim Jamshed 				cur_stream->pair_stream->buffer_mgmt = bm;
111476404edcSAsim Jamshed 				break;
111576404edcSAsim Jamshed 			}
111676404edcSAsim Jamshed 		} SOCKQ_FOREACH_END;
111776404edcSAsim Jamshed 	}
111876404edcSAsim Jamshed 
111976404edcSAsim Jamshed 	cur_stream->state = TCP_ST_SYN_SENT;
112076404edcSAsim Jamshed 	cur_stream->cb_events |= MOS_ON_TCP_STATE_CHANGE;
112176404edcSAsim Jamshed 
112276404edcSAsim Jamshed 	TRACE_STATE("Stream %d: TCP_ST_SYN_SENT\n", cur_stream->id);
112376404edcSAsim Jamshed 
112476404edcSAsim Jamshed 	SQ_LOCK(&mtcp->ctx->connect_lock);
112576404edcSAsim Jamshed 	ret = StreamEnqueue(mtcp->connectq, cur_stream);
112676404edcSAsim Jamshed 	SQ_UNLOCK(&mtcp->ctx->connect_lock);
112776404edcSAsim Jamshed 	mtcp->wakeup_flag = TRUE;
112876404edcSAsim Jamshed 	if (ret < 0) {
112976404edcSAsim Jamshed 		TRACE_ERROR("Socket %d: failed to enqueue to conenct queue!\n", sockid);
113076404edcSAsim Jamshed 		SQ_LOCK(&mtcp->ctx->destroyq_lock);
113176404edcSAsim Jamshed 		StreamEnqueue(mtcp->destroyq, cur_stream);
113276404edcSAsim Jamshed 		SQ_UNLOCK(&mtcp->ctx->destroyq_lock);
113376404edcSAsim Jamshed 		errno = EAGAIN;
113476404edcSAsim Jamshed 		return -1;
113576404edcSAsim Jamshed 	}
113676404edcSAsim Jamshed 
113776404edcSAsim Jamshed 	/* if nonblocking socket, return EINPROGRESS */
113876404edcSAsim Jamshed 	if (socket->opts & MTCP_NONBLOCK) {
113976404edcSAsim Jamshed 		errno = EINPROGRESS;
114076404edcSAsim Jamshed 		return -1;
114176404edcSAsim Jamshed 
114276404edcSAsim Jamshed 	} else {
114376404edcSAsim Jamshed 		while (1) {
114476404edcSAsim Jamshed 			if (!cur_stream) {
114576404edcSAsim Jamshed 				TRACE_ERROR("STREAM DESTROYED\n");
114676404edcSAsim Jamshed 				errno = ETIMEDOUT;
114776404edcSAsim Jamshed 				return -1;
114876404edcSAsim Jamshed 			}
114976404edcSAsim Jamshed 			if (cur_stream->state > TCP_ST_ESTABLISHED) {
115076404edcSAsim Jamshed 				TRACE_ERROR("Socket %d: weird state %s\n",
115176404edcSAsim Jamshed 						sockid, TCPStateToString(cur_stream));
115276404edcSAsim Jamshed 				// TODO: how to handle this?
115376404edcSAsim Jamshed 				errno = ENOSYS;
115476404edcSAsim Jamshed 				return -1;
115576404edcSAsim Jamshed 			}
115676404edcSAsim Jamshed 
115776404edcSAsim Jamshed 			if (cur_stream->state == TCP_ST_ESTABLISHED) {
115876404edcSAsim Jamshed 				break;
115976404edcSAsim Jamshed 			}
116076404edcSAsim Jamshed 			usleep(1000);
116176404edcSAsim Jamshed 		}
116276404edcSAsim Jamshed 	}
116376404edcSAsim Jamshed 
116476404edcSAsim Jamshed 	return 0;
116576404edcSAsim Jamshed }
116676404edcSAsim Jamshed /*----------------------------------------------------------------------------*/
116776404edcSAsim Jamshed static inline int
116876404edcSAsim Jamshed CloseStreamSocket(mctx_t mctx, int sockid)
116976404edcSAsim Jamshed {
117076404edcSAsim Jamshed 	mtcp_manager_t mtcp;
117176404edcSAsim Jamshed 	tcp_stream *cur_stream;
117276404edcSAsim Jamshed 	int ret;
117376404edcSAsim Jamshed 
117476404edcSAsim Jamshed 	mtcp = GetMTCPManager(mctx);
117576404edcSAsim Jamshed 	if (!mtcp) {
117676404edcSAsim Jamshed 		errno = EACCES;
117776404edcSAsim Jamshed 		return -1;
117876404edcSAsim Jamshed 	}
117976404edcSAsim Jamshed 
118076404edcSAsim Jamshed 	cur_stream = mtcp->smap[sockid].stream;
118176404edcSAsim Jamshed 	if (!cur_stream) {
118276404edcSAsim Jamshed 		TRACE_API("Socket %d: stream does not exist.\n", sockid);
118376404edcSAsim Jamshed 		errno = ENOTCONN;
118476404edcSAsim Jamshed 		return -1;
118576404edcSAsim Jamshed 	}
118676404edcSAsim Jamshed 
118776404edcSAsim Jamshed 	if (cur_stream->closed) {
118876404edcSAsim Jamshed 		TRACE_API("Socket %d (Stream %u): already closed stream\n",
118976404edcSAsim Jamshed 				sockid, cur_stream->id);
119076404edcSAsim Jamshed 		return 0;
119176404edcSAsim Jamshed 	}
119276404edcSAsim Jamshed 	cur_stream->closed = TRUE;
119376404edcSAsim Jamshed 
119476404edcSAsim Jamshed 	TRACE_API("Stream %d: closing the stream.\n", cur_stream->id);
119576404edcSAsim Jamshed 
119676404edcSAsim Jamshed 	/* 141029 dhkim: Check this! */
119776404edcSAsim Jamshed 	cur_stream->socket = NULL;
119876404edcSAsim Jamshed 
119976404edcSAsim Jamshed 	if (cur_stream->state == TCP_ST_CLOSED_RSVD) {
120076404edcSAsim Jamshed 		TRACE_API("Stream %d at TCP_ST_CLOSED_RSVD. destroying the stream.\n",
120176404edcSAsim Jamshed 				cur_stream->id);
120276404edcSAsim Jamshed 		SQ_LOCK(&mtcp->ctx->destroyq_lock);
120376404edcSAsim Jamshed 		StreamEnqueue(mtcp->destroyq, cur_stream);
120476404edcSAsim Jamshed 		mtcp->wakeup_flag = TRUE;
120576404edcSAsim Jamshed 		SQ_UNLOCK(&mtcp->ctx->destroyq_lock);
120676404edcSAsim Jamshed 		return 0;
120776404edcSAsim Jamshed 
120876404edcSAsim Jamshed 	} else if (cur_stream->state == TCP_ST_SYN_SENT) {
120976404edcSAsim Jamshed #if 1
121076404edcSAsim Jamshed 		SQ_LOCK(&mtcp->ctx->destroyq_lock);
121176404edcSAsim Jamshed 		StreamEnqueue(mtcp->destroyq, cur_stream);
121276404edcSAsim Jamshed 		SQ_UNLOCK(&mtcp->ctx->destroyq_lock);
121376404edcSAsim Jamshed 		mtcp->wakeup_flag = TRUE;
121476404edcSAsim Jamshed #endif
121576404edcSAsim Jamshed 		return -1;
121676404edcSAsim Jamshed 
121776404edcSAsim Jamshed 	} else if (cur_stream->state != TCP_ST_ESTABLISHED &&
121876404edcSAsim Jamshed 			cur_stream->state != TCP_ST_CLOSE_WAIT) {
121976404edcSAsim Jamshed 		TRACE_API("Stream %d at state %s\n",
122076404edcSAsim Jamshed 				cur_stream->id, TCPStateToString(cur_stream));
122176404edcSAsim Jamshed 		errno = EBADF;
122276404edcSAsim Jamshed 		return -1;
122376404edcSAsim Jamshed 	}
122476404edcSAsim Jamshed 
122576404edcSAsim Jamshed 	SQ_LOCK(&mtcp->ctx->close_lock);
122676404edcSAsim Jamshed 	cur_stream->sndvar->on_closeq = TRUE;
122776404edcSAsim Jamshed 	ret = StreamEnqueue(mtcp->closeq, cur_stream);
122876404edcSAsim Jamshed 	mtcp->wakeup_flag = TRUE;
122976404edcSAsim Jamshed 	SQ_UNLOCK(&mtcp->ctx->close_lock);
123076404edcSAsim Jamshed 
123176404edcSAsim Jamshed 	if (ret < 0) {
123276404edcSAsim Jamshed 		TRACE_ERROR("(NEVER HAPPEN) Failed to enqueue the stream to close.\n");
123376404edcSAsim Jamshed 		errno = EAGAIN;
123476404edcSAsim Jamshed 		return -1;
123576404edcSAsim Jamshed 	}
123676404edcSAsim Jamshed 
123776404edcSAsim Jamshed 	return 0;
123876404edcSAsim Jamshed }
123976404edcSAsim Jamshed /*----------------------------------------------------------------------------*/
124076404edcSAsim Jamshed static inline int
124176404edcSAsim Jamshed CloseListeningSocket(mctx_t mctx, int sockid)
124276404edcSAsim Jamshed {
124376404edcSAsim Jamshed 	mtcp_manager_t mtcp;
124476404edcSAsim Jamshed 	struct tcp_listener *listener;
124576404edcSAsim Jamshed 
124676404edcSAsim Jamshed 	mtcp = GetMTCPManager(mctx);
124776404edcSAsim Jamshed 	if (!mtcp) {
124876404edcSAsim Jamshed 		errno = EACCES;
124976404edcSAsim Jamshed 		return -1;
125076404edcSAsim Jamshed 	}
125176404edcSAsim Jamshed 
125276404edcSAsim Jamshed 	listener = mtcp->smap[sockid].listener;
125376404edcSAsim Jamshed 	if (!listener) {
125476404edcSAsim Jamshed 		errno = EINVAL;
125576404edcSAsim Jamshed 		return -1;
125676404edcSAsim Jamshed 	}
125776404edcSAsim Jamshed 
125876404edcSAsim Jamshed 	if (listener->acceptq) {
125976404edcSAsim Jamshed 		DestroyStreamQueue(listener->acceptq);
126076404edcSAsim Jamshed 		listener->acceptq = NULL;
126176404edcSAsim Jamshed 	}
126276404edcSAsim Jamshed 
126376404edcSAsim Jamshed 	pthread_mutex_lock(&listener->accept_lock);
126476404edcSAsim Jamshed 	pthread_cond_signal(&listener->accept_cond);
126576404edcSAsim Jamshed 	pthread_mutex_unlock(&listener->accept_lock);
126676404edcSAsim Jamshed 
126776404edcSAsim Jamshed 	pthread_cond_destroy(&listener->accept_cond);
126876404edcSAsim Jamshed 	pthread_mutex_destroy(&listener->accept_lock);
126976404edcSAsim Jamshed 
127076404edcSAsim Jamshed 	free(listener);
127176404edcSAsim Jamshed 	mtcp->smap[sockid].listener = NULL;
127276404edcSAsim Jamshed 
127376404edcSAsim Jamshed 	return 0;
127476404edcSAsim Jamshed }
127576404edcSAsim Jamshed /*----------------------------------------------------------------------------*/
127676404edcSAsim Jamshed int
127776404edcSAsim Jamshed mtcp_close(mctx_t mctx, int sockid)
127876404edcSAsim Jamshed {
127976404edcSAsim Jamshed 	mtcp_manager_t mtcp;
128076404edcSAsim Jamshed 	int ret;
128176404edcSAsim Jamshed 
128276404edcSAsim Jamshed 	mtcp = GetMTCPManager(mctx);
128376404edcSAsim Jamshed 	if (!mtcp) {
128476404edcSAsim Jamshed 		errno = EACCES;
128576404edcSAsim Jamshed 		return -1;
128676404edcSAsim Jamshed 	}
128776404edcSAsim Jamshed 
128876404edcSAsim Jamshed 	if (sockid < 0 || sockid >= g_config.mos->max_concurrency) {
128976404edcSAsim Jamshed 		TRACE_API("Socket id %d out of range.\n", sockid);
129076404edcSAsim Jamshed 		errno = EBADF;
129176404edcSAsim Jamshed 		return -1;
129276404edcSAsim Jamshed 	}
129376404edcSAsim Jamshed 
129476404edcSAsim Jamshed 	if (mtcp->smap[sockid].socktype == MOS_SOCK_UNUSED) {
129576404edcSAsim Jamshed 		TRACE_API("Invalid socket id: %d\n", sockid);
129676404edcSAsim Jamshed 		errno = EBADF;
129776404edcSAsim Jamshed 		return -1;
129876404edcSAsim Jamshed 	}
129976404edcSAsim Jamshed 
130076404edcSAsim Jamshed 	TRACE_API("Socket %d: mtcp_close called.\n", sockid);
130176404edcSAsim Jamshed 
130276404edcSAsim Jamshed 	switch (mtcp->smap[sockid].socktype) {
130376404edcSAsim Jamshed 	case MOS_SOCK_STREAM:
130476404edcSAsim Jamshed 		ret = CloseStreamSocket(mctx, sockid);
130576404edcSAsim Jamshed 		break;
130676404edcSAsim Jamshed 
130776404edcSAsim Jamshed 	case MOS_SOCK_STREAM_LISTEN:
130876404edcSAsim Jamshed 		ret = CloseListeningSocket(mctx, sockid);
130976404edcSAsim Jamshed 		break;
131076404edcSAsim Jamshed 
131176404edcSAsim Jamshed 	case MOS_SOCK_EPOLL:
131276404edcSAsim Jamshed 		ret = CloseEpollSocket(mctx, sockid);
131376404edcSAsim Jamshed 		break;
131476404edcSAsim Jamshed 
131576404edcSAsim Jamshed 	case MOS_SOCK_PIPE:
131676404edcSAsim Jamshed 		ret = PipeClose(mctx, sockid);
131776404edcSAsim Jamshed 		break;
131876404edcSAsim Jamshed 
131976404edcSAsim Jamshed 	default:
132076404edcSAsim Jamshed 		errno = EINVAL;
132176404edcSAsim Jamshed 		ret = -1;
132276404edcSAsim Jamshed 		break;
132376404edcSAsim Jamshed 	}
132476404edcSAsim Jamshed 
132576404edcSAsim Jamshed 	FreeSocket(mctx, sockid, mtcp->smap[sockid].socktype);
132676404edcSAsim Jamshed 
132776404edcSAsim Jamshed 	return ret;
132876404edcSAsim Jamshed }
132976404edcSAsim Jamshed /*----------------------------------------------------------------------------*/
133076404edcSAsim Jamshed int
133176404edcSAsim Jamshed mtcp_abort(mctx_t mctx, int sockid)
133276404edcSAsim Jamshed {
133376404edcSAsim Jamshed 	mtcp_manager_t mtcp;
133476404edcSAsim Jamshed 	tcp_stream *cur_stream;
133576404edcSAsim Jamshed 	int ret;
133676404edcSAsim Jamshed 
133776404edcSAsim Jamshed 	mtcp = GetMTCPManager(mctx);
133876404edcSAsim Jamshed 	if (!mtcp) {
133976404edcSAsim Jamshed 		errno = EACCES;
134076404edcSAsim Jamshed 		return -1;
134176404edcSAsim Jamshed 	}
134276404edcSAsim Jamshed 
134376404edcSAsim Jamshed 	if (sockid < 0 || sockid >= g_config.mos->max_concurrency) {
134476404edcSAsim Jamshed 		TRACE_API("Socket id %d out of range.\n", sockid);
134576404edcSAsim Jamshed 		errno = EBADF;
134676404edcSAsim Jamshed 		return -1;
134776404edcSAsim Jamshed 	}
134876404edcSAsim Jamshed 
134976404edcSAsim Jamshed 	if (mtcp->smap[sockid].socktype == MOS_SOCK_UNUSED) {
135076404edcSAsim Jamshed 		TRACE_API("Invalid socket id: %d\n", sockid);
135176404edcSAsim Jamshed 		errno = EBADF;
135276404edcSAsim Jamshed 		return -1;
135376404edcSAsim Jamshed 	}
135476404edcSAsim Jamshed 
135576404edcSAsim Jamshed 	if (mtcp->smap[sockid].socktype != MOS_SOCK_STREAM) {
135676404edcSAsim Jamshed 		TRACE_API("Not an end socket. id: %d\n", sockid);
135776404edcSAsim Jamshed 		errno = ENOTSOCK;
135876404edcSAsim Jamshed 		return -1;
135976404edcSAsim Jamshed 	}
136076404edcSAsim Jamshed 
136176404edcSAsim Jamshed 	cur_stream = mtcp->smap[sockid].stream;
136276404edcSAsim Jamshed 	if (!cur_stream) {
136376404edcSAsim Jamshed 		TRACE_API("Stream %d: does not exist.\n", sockid);
136476404edcSAsim Jamshed 		errno = ENOTCONN;
136576404edcSAsim Jamshed 		return -1;
136676404edcSAsim Jamshed 	}
136776404edcSAsim Jamshed 
136876404edcSAsim Jamshed 	TRACE_API("Socket %d: mtcp_abort()\n", sockid);
136976404edcSAsim Jamshed 
137076404edcSAsim Jamshed 	FreeSocket(mctx, sockid, mtcp->smap[sockid].socktype);
137176404edcSAsim Jamshed 	cur_stream->socket = NULL;
137276404edcSAsim Jamshed 
137376404edcSAsim Jamshed 	if (cur_stream->state == TCP_ST_CLOSED_RSVD) {
137476404edcSAsim Jamshed 		TRACE_API("Stream %d: connection already reset.\n", sockid);
137576404edcSAsim Jamshed 		return ERROR;
137676404edcSAsim Jamshed 
137776404edcSAsim Jamshed 	} else if (cur_stream->state == TCP_ST_SYN_SENT) {
137876404edcSAsim Jamshed 		/* TODO: this should notify event failure to all
137976404edcSAsim Jamshed 		   previous read() or write() calls */
138076404edcSAsim Jamshed 		cur_stream->state = TCP_ST_CLOSED_RSVD;
138176404edcSAsim Jamshed 		cur_stream->close_reason = TCP_ACTIVE_CLOSE;
138276404edcSAsim Jamshed 		cur_stream->cb_events |= MOS_ON_TCP_STATE_CHANGE;
138376404edcSAsim Jamshed 		SQ_LOCK(&mtcp->ctx->destroyq_lock);
138476404edcSAsim Jamshed 		StreamEnqueue(mtcp->destroyq, cur_stream);
138576404edcSAsim Jamshed 		SQ_UNLOCK(&mtcp->ctx->destroyq_lock);
138676404edcSAsim Jamshed 		mtcp->wakeup_flag = TRUE;
138776404edcSAsim Jamshed 		return 0;
138876404edcSAsim Jamshed 
138976404edcSAsim Jamshed 	} else if (cur_stream->state == TCP_ST_CLOSING ||
139076404edcSAsim Jamshed 			cur_stream->state == TCP_ST_LAST_ACK ||
139176404edcSAsim Jamshed 			cur_stream->state == TCP_ST_TIME_WAIT) {
139276404edcSAsim Jamshed 		cur_stream->state = TCP_ST_CLOSED_RSVD;
139376404edcSAsim Jamshed 		cur_stream->close_reason = TCP_ACTIVE_CLOSE;
139476404edcSAsim Jamshed 		cur_stream->cb_events |= MOS_ON_TCP_STATE_CHANGE;
139576404edcSAsim Jamshed 		SQ_LOCK(&mtcp->ctx->destroyq_lock);
139676404edcSAsim Jamshed 		StreamEnqueue(mtcp->destroyq, cur_stream);
139776404edcSAsim Jamshed 		SQ_UNLOCK(&mtcp->ctx->destroyq_lock);
139876404edcSAsim Jamshed 		mtcp->wakeup_flag = TRUE;
139976404edcSAsim Jamshed 		return 0;
140076404edcSAsim Jamshed 	}
140176404edcSAsim Jamshed 
140276404edcSAsim Jamshed 	/* the stream structure will be destroyed after sending RST */
140376404edcSAsim Jamshed 	if (cur_stream->sndvar->on_resetq) {
140476404edcSAsim Jamshed 		TRACE_ERROR("Stream %d: calling mtcp_abort() "
140576404edcSAsim Jamshed 				"when in reset queue.\n", sockid);
140676404edcSAsim Jamshed 		errno = ECONNRESET;
140776404edcSAsim Jamshed 		return -1;
140876404edcSAsim Jamshed 	}
140976404edcSAsim Jamshed 	SQ_LOCK(&mtcp->ctx->reset_lock);
141076404edcSAsim Jamshed 	cur_stream->sndvar->on_resetq = TRUE;
141176404edcSAsim Jamshed 	ret = StreamEnqueue(mtcp->resetq, cur_stream);
141276404edcSAsim Jamshed 	SQ_UNLOCK(&mtcp->ctx->reset_lock);
141376404edcSAsim Jamshed 	mtcp->wakeup_flag = TRUE;
141476404edcSAsim Jamshed 
141576404edcSAsim Jamshed 	if (ret < 0) {
141676404edcSAsim Jamshed 		TRACE_ERROR("(NEVER HAPPEN) Failed to enqueue the stream to close.\n");
141776404edcSAsim Jamshed 		errno = EAGAIN;
141876404edcSAsim Jamshed 		return -1;
141976404edcSAsim Jamshed 	}
142076404edcSAsim Jamshed 
142176404edcSAsim Jamshed 	return 0;
142276404edcSAsim Jamshed }
142376404edcSAsim Jamshed /*----------------------------------------------------------------------------*/
142476404edcSAsim Jamshed static inline int
1425*df3fae06SAsim Jamshed PeekForUser(mtcp_manager_t mtcp, tcp_stream *cur_stream, char *buf, int len)
1426*df3fae06SAsim Jamshed {
1427*df3fae06SAsim Jamshed 	struct tcp_recv_vars *rcvvar = cur_stream->rcvvar;
1428*df3fae06SAsim Jamshed 	int copylen;
1429*df3fae06SAsim Jamshed 	tcprb_t *rb = rcvvar->rcvbuf;
1430*df3fae06SAsim Jamshed 
1431*df3fae06SAsim Jamshed 	if ((copylen = tcprb_ppeek(rb, (uint8_t *)buf, len, rb->pile)) <= 0) {
1432*df3fae06SAsim Jamshed 		errno = EAGAIN;
1433*df3fae06SAsim Jamshed 		return -1;
1434*df3fae06SAsim Jamshed 	}
1435*df3fae06SAsim Jamshed 
1436*df3fae06SAsim Jamshed 	return copylen;
1437*df3fae06SAsim Jamshed }
1438*df3fae06SAsim Jamshed /*----------------------------------------------------------------------------*/
1439*df3fae06SAsim Jamshed static inline int
144076404edcSAsim Jamshed CopyToUser(mtcp_manager_t mtcp, tcp_stream *cur_stream, char *buf, int len)
144176404edcSAsim Jamshed {
144276404edcSAsim Jamshed 	struct tcp_recv_vars *rcvvar = cur_stream->rcvvar;
144376404edcSAsim Jamshed 	int copylen;
144476404edcSAsim Jamshed 	tcprb_t *rb = rcvvar->rcvbuf;
144576404edcSAsim Jamshed 	if ((copylen = tcprb_ppeek(rb, (uint8_t *)buf, len, rb->pile)) <= 0) {
144676404edcSAsim Jamshed 		errno = EAGAIN;
144776404edcSAsim Jamshed 		return -1;
144876404edcSAsim Jamshed 	}
144976404edcSAsim Jamshed 	tcprb_setpile(rb, rb->pile + copylen);
145076404edcSAsim Jamshed 
145176404edcSAsim Jamshed 	rcvvar->rcv_wnd = rb->len - tcprb_cflen(rb);
145276404edcSAsim Jamshed 	//printf("rcv_wnd: %d\n", rcvvar->rcv_wnd);
145376404edcSAsim Jamshed 
145476404edcSAsim Jamshed 	/* Advertise newly freed receive buffer */
145576404edcSAsim Jamshed 	if (cur_stream->need_wnd_adv) {
145676404edcSAsim Jamshed 		if (rcvvar->rcv_wnd > cur_stream->sndvar->eff_mss) {
145776404edcSAsim Jamshed 			if (!cur_stream->sndvar->on_ackq) {
145876404edcSAsim Jamshed 				SQ_LOCK(&mtcp->ctx->ackq_lock);
145976404edcSAsim Jamshed 				cur_stream->sndvar->on_ackq = TRUE;
146076404edcSAsim Jamshed 				StreamEnqueue(mtcp->ackq, cur_stream); /* this always success */
146176404edcSAsim Jamshed 				SQ_UNLOCK(&mtcp->ctx->ackq_lock);
146276404edcSAsim Jamshed 				cur_stream->need_wnd_adv = FALSE;
146376404edcSAsim Jamshed 				mtcp->wakeup_flag = TRUE;
146476404edcSAsim Jamshed 			}
146576404edcSAsim Jamshed 		}
146676404edcSAsim Jamshed 	}
146776404edcSAsim Jamshed 
146876404edcSAsim Jamshed 	return copylen;
146976404edcSAsim Jamshed }
147076404edcSAsim Jamshed /*----------------------------------------------------------------------------*/
147176404edcSAsim Jamshed ssize_t
1472*df3fae06SAsim Jamshed mtcp_recv(mctx_t mctx, int sockid, char *buf, size_t len, int flags)
147376404edcSAsim Jamshed {
147476404edcSAsim Jamshed 	mtcp_manager_t mtcp;
147576404edcSAsim Jamshed 	socket_map_t socket;
147676404edcSAsim Jamshed 	tcp_stream *cur_stream;
147776404edcSAsim Jamshed 	struct tcp_recv_vars *rcvvar;
147876404edcSAsim Jamshed 	int event_remaining, merged_len;
147976404edcSAsim Jamshed 	int ret;
148076404edcSAsim Jamshed 
148176404edcSAsim Jamshed 	mtcp = GetMTCPManager(mctx);
148276404edcSAsim Jamshed 	if (!mtcp) {
148376404edcSAsim Jamshed 		errno = EACCES;
148476404edcSAsim Jamshed 		return -1;
148576404edcSAsim Jamshed 	}
148676404edcSAsim Jamshed 
148776404edcSAsim Jamshed 	if (sockid < 0 || sockid >= g_config.mos->max_concurrency) {
148876404edcSAsim Jamshed 		TRACE_API("Socket id %d out of range.\n", sockid);
148976404edcSAsim Jamshed 		errno = EBADF;
149076404edcSAsim Jamshed 		return -1;
149176404edcSAsim Jamshed 	}
149276404edcSAsim Jamshed 
149376404edcSAsim Jamshed 	socket = &mtcp->smap[sockid];
149476404edcSAsim Jamshed 	if (socket->socktype == MOS_SOCK_UNUSED) {
149576404edcSAsim Jamshed 		TRACE_API("Invalid socket id: %d\n", sockid);
149676404edcSAsim Jamshed 		errno = EBADF;
149776404edcSAsim Jamshed 		return -1;
149876404edcSAsim Jamshed 	}
149976404edcSAsim Jamshed 
150076404edcSAsim Jamshed 	if (socket->socktype == MOS_SOCK_PIPE) {
150176404edcSAsim Jamshed 		return PipeRead(mctx, sockid, buf, len);
150276404edcSAsim Jamshed 	}
150376404edcSAsim Jamshed 
150476404edcSAsim Jamshed 	if (socket->socktype != MOS_SOCK_STREAM) {
150576404edcSAsim Jamshed 		TRACE_API("Not an end socket. id: %d\n", sockid);
150676404edcSAsim Jamshed 		errno = ENOTSOCK;
150776404edcSAsim Jamshed 		return -1;
150876404edcSAsim Jamshed 	}
150976404edcSAsim Jamshed 
151076404edcSAsim Jamshed 	/* stream should be in ESTABLISHED, FIN_WAIT_1, FIN_WAIT_2, CLOSE_WAIT */
151176404edcSAsim Jamshed 	cur_stream = socket->stream;
151276404edcSAsim Jamshed 	if (!cur_stream || !cur_stream->rcvvar || !cur_stream->rcvvar->rcvbuf ||
151376404edcSAsim Jamshed 	    !(cur_stream->state >= TCP_ST_ESTABLISHED &&
151476404edcSAsim Jamshed 	      cur_stream->state <= TCP_ST_CLOSE_WAIT)) {
151576404edcSAsim Jamshed 		errno = ENOTCONN;
151676404edcSAsim Jamshed 		return -1;
151776404edcSAsim Jamshed 	}
151876404edcSAsim Jamshed 
151976404edcSAsim Jamshed 	rcvvar = cur_stream->rcvvar;
152076404edcSAsim Jamshed 
152176404edcSAsim Jamshed 	merged_len = tcprb_cflen(rcvvar->rcvbuf);
152276404edcSAsim Jamshed 
152376404edcSAsim Jamshed 	/* if CLOSE_WAIT, return 0 if there is no payload */
152476404edcSAsim Jamshed 	if (cur_stream->state == TCP_ST_CLOSE_WAIT) {
152576404edcSAsim Jamshed 		if (!rcvvar->rcvbuf)
152676404edcSAsim Jamshed 			return 0;
152776404edcSAsim Jamshed 
152876404edcSAsim Jamshed 		if (merged_len == 0)
152976404edcSAsim Jamshed 			return 0;
153076404edcSAsim Jamshed 	}
153176404edcSAsim Jamshed 
153276404edcSAsim Jamshed 	/* return EAGAIN if no receive buffer */
153376404edcSAsim Jamshed 	if (socket->opts & MTCP_NONBLOCK) {
153476404edcSAsim Jamshed 		if (!rcvvar->rcvbuf || merged_len == 0) {
153576404edcSAsim Jamshed 			errno = EAGAIN;
153676404edcSAsim Jamshed 			return -1;
153776404edcSAsim Jamshed 		}
153876404edcSAsim Jamshed 	}
153976404edcSAsim Jamshed 
154076404edcSAsim Jamshed 	SBUF_LOCK(&rcvvar->read_lock);
154176404edcSAsim Jamshed 
1542*df3fae06SAsim Jamshed 	switch (flags) {
1543*df3fae06SAsim Jamshed 	case 0:
154476404edcSAsim Jamshed 		ret = CopyToUser(mtcp, cur_stream, buf, len);
1545*df3fae06SAsim Jamshed 		break;
1546*df3fae06SAsim Jamshed 	case MSG_PEEK:
1547*df3fae06SAsim Jamshed 		ret = PeekForUser(mtcp, cur_stream, buf, len);
1548*df3fae06SAsim Jamshed 		break;
1549*df3fae06SAsim Jamshed 	default:
1550*df3fae06SAsim Jamshed 		SBUF_UNLOCK(&rcvvar->read_lock);
1551*df3fae06SAsim Jamshed 		ret = -1;
1552*df3fae06SAsim Jamshed 		errno = EINVAL;
1553*df3fae06SAsim Jamshed 		return ret;
1554*df3fae06SAsim Jamshed 	}
155576404edcSAsim Jamshed 
155676404edcSAsim Jamshed 	merged_len = tcprb_cflen(rcvvar->rcvbuf);
155776404edcSAsim Jamshed 	event_remaining = FALSE;
155876404edcSAsim Jamshed 	/* if there are remaining payload, generate EPOLLIN */
155976404edcSAsim Jamshed 	/* (may due to insufficient user buffer) */
156076404edcSAsim Jamshed 	if (socket->epoll & MOS_EPOLLIN) {
156176404edcSAsim Jamshed 		if (!(socket->epoll & MOS_EPOLLET) && merged_len > 0) {
156276404edcSAsim Jamshed 			event_remaining = TRUE;
156376404edcSAsim Jamshed 		}
156476404edcSAsim Jamshed 	}
156576404edcSAsim Jamshed 	/* if waiting for close, notify it if no remaining data */
156676404edcSAsim Jamshed 	if (cur_stream->state == TCP_ST_CLOSE_WAIT &&
156776404edcSAsim Jamshed 	    merged_len == 0 && ret > 0) {
156876404edcSAsim Jamshed 		event_remaining = TRUE;
156976404edcSAsim Jamshed 	}
157076404edcSAsim Jamshed 
157176404edcSAsim Jamshed 	SBUF_UNLOCK(&rcvvar->read_lock);
157276404edcSAsim Jamshed 
157376404edcSAsim Jamshed 	if (event_remaining) {
157476404edcSAsim Jamshed 		if (socket->epoll) {
157576404edcSAsim Jamshed 			AddEpollEvent(mtcp->ep,
157676404edcSAsim Jamshed 				      USR_SHADOW_EVENT_QUEUE, socket, MOS_EPOLLIN);
157776404edcSAsim Jamshed 		}
157876404edcSAsim Jamshed 	}
157976404edcSAsim Jamshed 
1580*df3fae06SAsim Jamshed 	TRACE_API("Stream %d: mtcp_recv() returning %d\n", cur_stream->id, ret);
158176404edcSAsim Jamshed 	return ret;
158276404edcSAsim Jamshed }
158376404edcSAsim Jamshed /*----------------------------------------------------------------------------*/
1584*df3fae06SAsim Jamshed inline ssize_t
1585*df3fae06SAsim Jamshed mtcp_read(mctx_t mctx, int sockid, char *buf, size_t len)
1586*df3fae06SAsim Jamshed {
1587*df3fae06SAsim Jamshed 	return mtcp_recv(mctx, sockid, buf, len, 0);
1588*df3fae06SAsim Jamshed }
1589*df3fae06SAsim Jamshed /*----------------------------------------------------------------------------*/
159076404edcSAsim Jamshed ssize_t
1591a5e1a556SAsim Jamshed mtcp_readv(mctx_t mctx, int sockid, const struct iovec *iov, int numIOV)
159276404edcSAsim Jamshed {
159376404edcSAsim Jamshed 	mtcp_manager_t mtcp;
159476404edcSAsim Jamshed 	socket_map_t socket;
159576404edcSAsim Jamshed 	tcp_stream *cur_stream;
159676404edcSAsim Jamshed 	struct tcp_recv_vars *rcvvar;
159776404edcSAsim Jamshed 	int ret, bytes_read, i;
159876404edcSAsim Jamshed 	int event_remaining, merged_len;
159976404edcSAsim Jamshed 
160076404edcSAsim Jamshed 	mtcp = GetMTCPManager(mctx);
160176404edcSAsim Jamshed 	if (!mtcp) {
160276404edcSAsim Jamshed 		errno = EACCES;
160376404edcSAsim Jamshed 		return -1;
160476404edcSAsim Jamshed 	}
160576404edcSAsim Jamshed 
160676404edcSAsim Jamshed 	if (sockid < 0 || sockid >= g_config.mos->max_concurrency) {
160776404edcSAsim Jamshed 		TRACE_API("Socket id %d out of range.\n", sockid);
160876404edcSAsim Jamshed 		errno = EBADF;
160976404edcSAsim Jamshed 		return -1;
161076404edcSAsim Jamshed 	}
161176404edcSAsim Jamshed 
161276404edcSAsim Jamshed 	socket = &mtcp->smap[sockid];
161376404edcSAsim Jamshed 	if (socket->socktype == MOS_SOCK_UNUSED) {
161476404edcSAsim Jamshed 		TRACE_API("Invalid socket id: %d\n", sockid);
161576404edcSAsim Jamshed 		errno = EBADF;
161676404edcSAsim Jamshed 		return -1;
161776404edcSAsim Jamshed 	}
161876404edcSAsim Jamshed 
161976404edcSAsim Jamshed 	if (socket->socktype != MOS_SOCK_STREAM) {
162076404edcSAsim Jamshed 		TRACE_API("Not an end socket. id: %d\n", sockid);
162176404edcSAsim Jamshed 		errno = ENOTSOCK;
162276404edcSAsim Jamshed 		return -1;
162376404edcSAsim Jamshed 	}
162476404edcSAsim Jamshed 
162576404edcSAsim Jamshed 	/* stream should be in ESTABLISHED, FIN_WAIT_1, FIN_WAIT_2, CLOSE_WAIT */
162676404edcSAsim Jamshed 	cur_stream = socket->stream;
162776404edcSAsim Jamshed 	if (!cur_stream ||
162876404edcSAsim Jamshed 			!(cur_stream->state >= TCP_ST_ESTABLISHED &&
162976404edcSAsim Jamshed 			  cur_stream->state <= TCP_ST_CLOSE_WAIT)) {
163076404edcSAsim Jamshed 		errno = ENOTCONN;
163176404edcSAsim Jamshed 		return -1;
163276404edcSAsim Jamshed 	}
163376404edcSAsim Jamshed 
163476404edcSAsim Jamshed 	rcvvar = cur_stream->rcvvar;
163576404edcSAsim Jamshed 
163676404edcSAsim Jamshed 	merged_len = tcprb_cflen(rcvvar->rcvbuf);
163776404edcSAsim Jamshed 
163876404edcSAsim Jamshed 	/* if CLOSE_WAIT, return 0 if there is no payload */
163976404edcSAsim Jamshed 	if (cur_stream->state == TCP_ST_CLOSE_WAIT) {
164076404edcSAsim Jamshed 		if (!rcvvar->rcvbuf)
164176404edcSAsim Jamshed 			return 0;
164276404edcSAsim Jamshed 
164376404edcSAsim Jamshed 		if (merged_len == 0)
164476404edcSAsim Jamshed 			return 0;
164576404edcSAsim Jamshed 	}
164676404edcSAsim Jamshed 
164776404edcSAsim Jamshed 	/* return EAGAIN if no receive buffer */
164876404edcSAsim Jamshed 	if (socket->opts & MTCP_NONBLOCK) {
164976404edcSAsim Jamshed 		if (!rcvvar->rcvbuf || merged_len == 0) {
165076404edcSAsim Jamshed 			errno = EAGAIN;
165176404edcSAsim Jamshed 			return -1;
165276404edcSAsim Jamshed 		}
165376404edcSAsim Jamshed 	}
165476404edcSAsim Jamshed 
165576404edcSAsim Jamshed 	SBUF_LOCK(&rcvvar->read_lock);
165676404edcSAsim Jamshed 
165776404edcSAsim Jamshed 	/* read and store the contents to the vectored buffers */
165876404edcSAsim Jamshed 	bytes_read = 0;
165976404edcSAsim Jamshed 	for (i = 0; i < numIOV; i++) {
166076404edcSAsim Jamshed 		if (iov[i].iov_len <= 0)
166176404edcSAsim Jamshed 			continue;
166276404edcSAsim Jamshed 
166376404edcSAsim Jamshed 		ret = CopyToUser(mtcp, cur_stream, iov[i].iov_base, iov[i].iov_len);
166476404edcSAsim Jamshed 		if (ret <= 0)
166576404edcSAsim Jamshed 			break;
166676404edcSAsim Jamshed 
166776404edcSAsim Jamshed 		bytes_read += ret;
166876404edcSAsim Jamshed 
166976404edcSAsim Jamshed 		if (ret < iov[i].iov_len)
167076404edcSAsim Jamshed 			break;
167176404edcSAsim Jamshed 	}
167276404edcSAsim Jamshed 
167376404edcSAsim Jamshed 	merged_len = tcprb_cflen(rcvvar->rcvbuf);
167476404edcSAsim Jamshed 
167576404edcSAsim Jamshed 	event_remaining = FALSE;
167676404edcSAsim Jamshed 	/* if there are remaining payload, generate read event */
167776404edcSAsim Jamshed 	/* (may due to insufficient user buffer) */
167876404edcSAsim Jamshed 	if (socket->epoll & MOS_EPOLLIN) {
167976404edcSAsim Jamshed 		if (!(socket->epoll & MOS_EPOLLET) && merged_len > 0) {
168076404edcSAsim Jamshed 			event_remaining = TRUE;
168176404edcSAsim Jamshed 		}
168276404edcSAsim Jamshed 	}
168376404edcSAsim Jamshed 	/* if waiting for close, notify it if no remaining data */
168476404edcSAsim Jamshed 	if (cur_stream->state == TCP_ST_CLOSE_WAIT &&
168576404edcSAsim Jamshed 			merged_len == 0 && bytes_read > 0) {
168676404edcSAsim Jamshed 		event_remaining = TRUE;
168776404edcSAsim Jamshed 	}
168876404edcSAsim Jamshed 
168976404edcSAsim Jamshed 	SBUF_UNLOCK(&rcvvar->read_lock);
169076404edcSAsim Jamshed 
169176404edcSAsim Jamshed 	if(event_remaining) {
169276404edcSAsim Jamshed 		if (socket->epoll & MOS_EPOLLIN && !(socket->epoll & MOS_EPOLLET)) {
169376404edcSAsim Jamshed 			AddEpollEvent(mtcp->ep,
169476404edcSAsim Jamshed 				      USR_SHADOW_EVENT_QUEUE, socket, MOS_EPOLLIN);
169576404edcSAsim Jamshed 		}
169676404edcSAsim Jamshed 	}
169776404edcSAsim Jamshed 
169876404edcSAsim Jamshed 	TRACE_API("Stream %d: mtcp_readv() returning %d\n",
169976404edcSAsim Jamshed 			cur_stream->id, bytes_read);
170076404edcSAsim Jamshed 	return bytes_read;
170176404edcSAsim Jamshed }
170276404edcSAsim Jamshed /*----------------------------------------------------------------------------*/
170376404edcSAsim Jamshed static inline int
1704a5e1a556SAsim Jamshed CopyFromUser(mtcp_manager_t mtcp, tcp_stream *cur_stream, const char *buf, int len)
170576404edcSAsim Jamshed {
170676404edcSAsim Jamshed 	struct tcp_send_vars *sndvar = cur_stream->sndvar;
170776404edcSAsim Jamshed 	int sndlen;
170876404edcSAsim Jamshed 	int ret;
170976404edcSAsim Jamshed 
171076404edcSAsim Jamshed 	sndlen = MIN((int)sndvar->snd_wnd, len);
171176404edcSAsim Jamshed 	if (sndlen <= 0) {
171276404edcSAsim Jamshed 		errno = EAGAIN;
171376404edcSAsim Jamshed 		return -1;
171476404edcSAsim Jamshed 	}
171576404edcSAsim Jamshed 
171676404edcSAsim Jamshed 	/* allocate send buffer if not exist */
171776404edcSAsim Jamshed 	if (!sndvar->sndbuf) {
171876404edcSAsim Jamshed 		sndvar->sndbuf = SBInit(mtcp->rbm_snd, sndvar->iss + 1);
171976404edcSAsim Jamshed 		if (!sndvar->sndbuf) {
172076404edcSAsim Jamshed 			cur_stream->close_reason = TCP_NO_MEM;
172176404edcSAsim Jamshed 			/* notification may not required due to -1 return */
172276404edcSAsim Jamshed 			errno = ENOMEM;
172376404edcSAsim Jamshed 			return -1;
172476404edcSAsim Jamshed 		}
172576404edcSAsim Jamshed 	}
172676404edcSAsim Jamshed 
172776404edcSAsim Jamshed 	ret = SBPut(mtcp->rbm_snd, sndvar->sndbuf, buf, sndlen);
172876404edcSAsim Jamshed 	assert(ret == sndlen);
172976404edcSAsim Jamshed 	sndvar->snd_wnd = sndvar->sndbuf->size - sndvar->sndbuf->len;
173076404edcSAsim Jamshed 	if (ret <= 0) {
173176404edcSAsim Jamshed 		TRACE_ERROR("SBPut failed. reason: %d (sndlen: %u, len: %u\n",
173276404edcSAsim Jamshed 				ret, sndlen, sndvar->sndbuf->len);
173376404edcSAsim Jamshed 		errno = EAGAIN;
173476404edcSAsim Jamshed 		return -1;
173576404edcSAsim Jamshed 	}
173676404edcSAsim Jamshed 
173776404edcSAsim Jamshed 	if (sndvar->snd_wnd <= 0) {
173876404edcSAsim Jamshed 		TRACE_SNDBUF("%u Sending buffer became full!! snd_wnd: %u\n",
173976404edcSAsim Jamshed 				cur_stream->id, sndvar->snd_wnd);
174076404edcSAsim Jamshed 	}
174176404edcSAsim Jamshed 
174276404edcSAsim Jamshed 	return ret;
174376404edcSAsim Jamshed }
174476404edcSAsim Jamshed /*----------------------------------------------------------------------------*/
174576404edcSAsim Jamshed ssize_t
1746a5e1a556SAsim Jamshed mtcp_write(mctx_t mctx, int sockid, const char *buf, size_t len)
174776404edcSAsim Jamshed {
174876404edcSAsim Jamshed 	mtcp_manager_t mtcp;
174976404edcSAsim Jamshed 	socket_map_t socket;
175076404edcSAsim Jamshed 	tcp_stream *cur_stream;
175176404edcSAsim Jamshed 	struct tcp_send_vars *sndvar;
175276404edcSAsim Jamshed 	int ret;
175376404edcSAsim Jamshed 
175476404edcSAsim Jamshed 	mtcp = GetMTCPManager(mctx);
175576404edcSAsim Jamshed 	if (!mtcp) {
175676404edcSAsim Jamshed 		errno = EACCES;
175776404edcSAsim Jamshed 		return -1;
175876404edcSAsim Jamshed 	}
175976404edcSAsim Jamshed 
176076404edcSAsim Jamshed 	if (sockid < 0 || sockid >= g_config.mos->max_concurrency) {
176176404edcSAsim Jamshed 		TRACE_API("Socket id %d out of range.\n", sockid);
176276404edcSAsim Jamshed 		errno = EBADF;
176376404edcSAsim Jamshed 		return -1;
176476404edcSAsim Jamshed 	}
176576404edcSAsim Jamshed 
176676404edcSAsim Jamshed 	socket = &mtcp->smap[sockid];
176776404edcSAsim Jamshed 	if (socket->socktype == MOS_SOCK_UNUSED) {
176876404edcSAsim Jamshed 		TRACE_API("Invalid socket id: %d\n", sockid);
176976404edcSAsim Jamshed 		errno = EBADF;
177076404edcSAsim Jamshed 		return -1;
177176404edcSAsim Jamshed 	}
177276404edcSAsim Jamshed 
177376404edcSAsim Jamshed 	if (socket->socktype == MOS_SOCK_PIPE) {
177476404edcSAsim Jamshed 		return PipeWrite(mctx, sockid, buf, len);
177576404edcSAsim Jamshed 	}
177676404edcSAsim Jamshed 
177776404edcSAsim Jamshed 	if (socket->socktype != MOS_SOCK_STREAM) {
177876404edcSAsim Jamshed 		TRACE_API("Not an end socket. id: %d\n", sockid);
177976404edcSAsim Jamshed 		errno = ENOTSOCK;
178076404edcSAsim Jamshed 		return -1;
178176404edcSAsim Jamshed 	}
178276404edcSAsim Jamshed 
178376404edcSAsim Jamshed 	cur_stream = socket->stream;
178476404edcSAsim Jamshed 	if (!cur_stream ||
178576404edcSAsim Jamshed 			!(cur_stream->state == TCP_ST_ESTABLISHED ||
178676404edcSAsim Jamshed 			  cur_stream->state == TCP_ST_CLOSE_WAIT)) {
178776404edcSAsim Jamshed 		errno = ENOTCONN;
178876404edcSAsim Jamshed 		return -1;
178976404edcSAsim Jamshed 	}
179076404edcSAsim Jamshed 
179176404edcSAsim Jamshed 	if (len <= 0) {
179276404edcSAsim Jamshed 		if (socket->opts & MTCP_NONBLOCK) {
179376404edcSAsim Jamshed 			errno = EAGAIN;
179476404edcSAsim Jamshed 			return -1;
179576404edcSAsim Jamshed 		} else {
179676404edcSAsim Jamshed 			return 0;
179776404edcSAsim Jamshed 		}
179876404edcSAsim Jamshed 	}
179976404edcSAsim Jamshed 
180076404edcSAsim Jamshed 	sndvar = cur_stream->sndvar;
180176404edcSAsim Jamshed 
180276404edcSAsim Jamshed 	SBUF_LOCK(&sndvar->write_lock);
180376404edcSAsim Jamshed 	ret = CopyFromUser(mtcp, cur_stream, buf, len);
180476404edcSAsim Jamshed 
180576404edcSAsim Jamshed 	SBUF_UNLOCK(&sndvar->write_lock);
180676404edcSAsim Jamshed 
180776404edcSAsim Jamshed 	if (ret > 0 && !(sndvar->on_sendq || sndvar->on_send_list)) {
180876404edcSAsim Jamshed 		SQ_LOCK(&mtcp->ctx->sendq_lock);
180976404edcSAsim Jamshed 		sndvar->on_sendq = TRUE;
181076404edcSAsim Jamshed 		StreamEnqueue(mtcp->sendq, cur_stream);		/* this always success */
181176404edcSAsim Jamshed 		SQ_UNLOCK(&mtcp->ctx->sendq_lock);
181276404edcSAsim Jamshed 		mtcp->wakeup_flag = TRUE;
181376404edcSAsim Jamshed 	}
181476404edcSAsim Jamshed 
181576404edcSAsim Jamshed 	if (ret == 0 && (socket->opts & MTCP_NONBLOCK)) {
181676404edcSAsim Jamshed 		ret = -1;
181776404edcSAsim Jamshed 		errno = EAGAIN;
181876404edcSAsim Jamshed 	}
181976404edcSAsim Jamshed 
182076404edcSAsim Jamshed 	/* if there are remaining sending buffer, generate write event */
182176404edcSAsim Jamshed 	if (sndvar->snd_wnd > 0) {
182276404edcSAsim Jamshed 		if (socket->epoll & MOS_EPOLLOUT && !(socket->epoll & MOS_EPOLLET)) {
182376404edcSAsim Jamshed 			AddEpollEvent(mtcp->ep,
182476404edcSAsim Jamshed 				      USR_SHADOW_EVENT_QUEUE, socket, MOS_EPOLLOUT);
182576404edcSAsim Jamshed 		}
182676404edcSAsim Jamshed 	}
182776404edcSAsim Jamshed 
182876404edcSAsim Jamshed 	TRACE_API("Stream %d: mtcp_write() returning %d\n", cur_stream->id, ret);
182976404edcSAsim Jamshed 	return ret;
183076404edcSAsim Jamshed }
183176404edcSAsim Jamshed /*----------------------------------------------------------------------------*/
183276404edcSAsim Jamshed ssize_t
1833a5e1a556SAsim Jamshed mtcp_writev(mctx_t mctx, int sockid, const struct iovec *iov, int numIOV)
183476404edcSAsim Jamshed {
183576404edcSAsim Jamshed 	mtcp_manager_t mtcp;
183676404edcSAsim Jamshed 	socket_map_t socket;
183776404edcSAsim Jamshed 	tcp_stream *cur_stream;
183876404edcSAsim Jamshed 	struct tcp_send_vars *sndvar;
183976404edcSAsim Jamshed 	int ret, to_write, i;
184076404edcSAsim Jamshed 
184176404edcSAsim Jamshed 	mtcp = GetMTCPManager(mctx);
184276404edcSAsim Jamshed 	if (!mtcp) {
184376404edcSAsim Jamshed 		errno = EACCES;
184476404edcSAsim Jamshed 		return -1;
184576404edcSAsim Jamshed 	}
184676404edcSAsim Jamshed 
184776404edcSAsim Jamshed 	if (sockid < 0 || sockid >= g_config.mos->max_concurrency) {
184876404edcSAsim Jamshed 		TRACE_API("Socket id %d out of range.\n", sockid);
184976404edcSAsim Jamshed 		errno = EBADF;
185076404edcSAsim Jamshed 		return -1;
185176404edcSAsim Jamshed 	}
185276404edcSAsim Jamshed 
185376404edcSAsim Jamshed 	socket = &mtcp->smap[sockid];
185476404edcSAsim Jamshed 	if (socket->socktype == MOS_SOCK_UNUSED) {
185576404edcSAsim Jamshed 		TRACE_API("Invalid socket id: %d\n", sockid);
185676404edcSAsim Jamshed 		errno = EBADF;
185776404edcSAsim Jamshed 		return -1;
185876404edcSAsim Jamshed 	}
185976404edcSAsim Jamshed 
186076404edcSAsim Jamshed 	if (socket->socktype != MOS_SOCK_STREAM) {
186176404edcSAsim Jamshed 		TRACE_API("Not an end socket. id: %d\n", sockid);
186276404edcSAsim Jamshed 		errno = ENOTSOCK;
186376404edcSAsim Jamshed 		return -1;
186476404edcSAsim Jamshed 	}
186576404edcSAsim Jamshed 
186676404edcSAsim Jamshed 	cur_stream = socket->stream;
186776404edcSAsim Jamshed 	if (!cur_stream ||
186876404edcSAsim Jamshed 			!(cur_stream->state == TCP_ST_ESTABLISHED ||
186976404edcSAsim Jamshed 			  cur_stream->state == TCP_ST_CLOSE_WAIT)) {
187076404edcSAsim Jamshed 		errno = ENOTCONN;
187176404edcSAsim Jamshed 		return -1;
187276404edcSAsim Jamshed 	}
187376404edcSAsim Jamshed 
187476404edcSAsim Jamshed 	sndvar = cur_stream->sndvar;
187576404edcSAsim Jamshed 	SBUF_LOCK(&sndvar->write_lock);
187676404edcSAsim Jamshed 
187776404edcSAsim Jamshed 	/* write from the vectored buffers */
187876404edcSAsim Jamshed 	to_write = 0;
187976404edcSAsim Jamshed 	for (i = 0; i < numIOV; i++) {
188076404edcSAsim Jamshed 		if (iov[i].iov_len <= 0)
188176404edcSAsim Jamshed 			continue;
188276404edcSAsim Jamshed 
188376404edcSAsim Jamshed 		ret = CopyFromUser(mtcp, cur_stream, iov[i].iov_base, iov[i].iov_len);
188476404edcSAsim Jamshed 		if (ret <= 0)
188576404edcSAsim Jamshed 			break;
188676404edcSAsim Jamshed 
188776404edcSAsim Jamshed 		to_write += ret;
188876404edcSAsim Jamshed 
188976404edcSAsim Jamshed 		if (ret < iov[i].iov_len)
189076404edcSAsim Jamshed 			break;
189176404edcSAsim Jamshed 	}
189276404edcSAsim Jamshed 	SBUF_UNLOCK(&sndvar->write_lock);
189376404edcSAsim Jamshed 
189476404edcSAsim Jamshed 	if (to_write > 0 && !(sndvar->on_sendq || sndvar->on_send_list)) {
189576404edcSAsim Jamshed 		SQ_LOCK(&mtcp->ctx->sendq_lock);
189676404edcSAsim Jamshed 		sndvar->on_sendq = TRUE;
189776404edcSAsim Jamshed 		StreamEnqueue(mtcp->sendq, cur_stream);		/* this always success */
189876404edcSAsim Jamshed 		SQ_UNLOCK(&mtcp->ctx->sendq_lock);
189976404edcSAsim Jamshed 		mtcp->wakeup_flag = TRUE;
190076404edcSAsim Jamshed 	}
190176404edcSAsim Jamshed 
190276404edcSAsim Jamshed 	if (to_write == 0 && (socket->opts & MTCP_NONBLOCK)) {
190376404edcSAsim Jamshed 		to_write = -1;
190476404edcSAsim Jamshed 		errno = EAGAIN;
190576404edcSAsim Jamshed 	}
190676404edcSAsim Jamshed 
190776404edcSAsim Jamshed 	/* if there are remaining sending buffer, generate write event */
190876404edcSAsim Jamshed 	if (sndvar->snd_wnd > 0) {
190976404edcSAsim Jamshed 		if (socket->epoll & MOS_EPOLLOUT && !(socket->epoll & MOS_EPOLLET)) {
191076404edcSAsim Jamshed 			AddEpollEvent(mtcp->ep,
191176404edcSAsim Jamshed 				      USR_SHADOW_EVENT_QUEUE, socket, MOS_EPOLLOUT);
191276404edcSAsim Jamshed 		}
191376404edcSAsim Jamshed 	}
191476404edcSAsim Jamshed 
191576404edcSAsim Jamshed 	TRACE_API("Stream %d: mtcp_writev() returning %d\n",
191676404edcSAsim Jamshed 			cur_stream->id, to_write);
191776404edcSAsim Jamshed 	return to_write;
191876404edcSAsim Jamshed }
191976404edcSAsim Jamshed /*----------------------------------------------------------------------------*/
192076404edcSAsim Jamshed uint32_t
192176404edcSAsim Jamshed mtcp_get_connection_cnt(mctx_t mctx)
192276404edcSAsim Jamshed {
192376404edcSAsim Jamshed 	mtcp_manager_t mtcp;
192476404edcSAsim Jamshed 	mtcp = GetMTCPManager(mctx);
192576404edcSAsim Jamshed 	if (!mtcp) {
192676404edcSAsim Jamshed 		errno = EACCES;
192776404edcSAsim Jamshed 		return -1;
192876404edcSAsim Jamshed 	}
192976404edcSAsim Jamshed 
193076404edcSAsim Jamshed 	if (mtcp->num_msp > 0)
193176404edcSAsim Jamshed 		return mtcp->flow_cnt / 2;
193276404edcSAsim Jamshed 	else
193376404edcSAsim Jamshed 		return mtcp->flow_cnt;
193476404edcSAsim Jamshed }
193576404edcSAsim Jamshed /*----------------------------------------------------------------------------*/
1936