xref: /mOS-networking-stack/core/src/api.c (revision 76404edc)
1*76404edcSAsim Jamshed #include <sys/queue.h>
2*76404edcSAsim Jamshed #include <sys/ioctl.h>
3*76404edcSAsim Jamshed #include <limits.h>
4*76404edcSAsim Jamshed #include <unistd.h>
5*76404edcSAsim Jamshed #include <assert.h>
6*76404edcSAsim Jamshed #include <string.h>
7*76404edcSAsim Jamshed 
8*76404edcSAsim Jamshed #ifdef DARWIN
9*76404edcSAsim Jamshed #include <netinet/tcp.h>
10*76404edcSAsim Jamshed #include <netinet/ip.h>
11*76404edcSAsim Jamshed #include <netinet/if_ether.h>
12*76404edcSAsim Jamshed #endif
13*76404edcSAsim Jamshed 
14*76404edcSAsim Jamshed #include "mtcp.h"
15*76404edcSAsim Jamshed #include "mtcp_api.h"
16*76404edcSAsim Jamshed #include "tcp_in.h"
17*76404edcSAsim Jamshed #include "tcp_stream.h"
18*76404edcSAsim Jamshed #include "tcp_out.h"
19*76404edcSAsim Jamshed #include "ip_out.h"
20*76404edcSAsim Jamshed #include "eventpoll.h"
21*76404edcSAsim Jamshed #include "pipe.h"
22*76404edcSAsim Jamshed #include "fhash.h"
23*76404edcSAsim Jamshed #include "addr_pool.h"
24*76404edcSAsim Jamshed #include "rss.h"
25*76404edcSAsim Jamshed #include "config.h"
26*76404edcSAsim Jamshed #include "debug.h"
27*76404edcSAsim Jamshed #include "eventpoll.h"
28*76404edcSAsim Jamshed #include "mos_api.h"
29*76404edcSAsim Jamshed #include "tcp_rb.h"
30*76404edcSAsim Jamshed 
31*76404edcSAsim Jamshed #define MAX(a, b) ((a)>(b)?(a):(b))
32*76404edcSAsim Jamshed #define MIN(a, b) ((a)<(b)?(a):(b))
33*76404edcSAsim Jamshed 
34*76404edcSAsim Jamshed /*----------------------------------------------------------------------------*/
35*76404edcSAsim Jamshed /** Stop monitoring the socket! (function prototype)
36*76404edcSAsim Jamshed  * @param [in] mctx: mtcp context
37*76404edcSAsim Jamshed  * @param [in] sock: monitoring stream socket id
38*76404edcSAsim Jamshed  * @param [in] side: side of monitoring (client side, server side or both)
39*76404edcSAsim Jamshed  *
40*76404edcSAsim Jamshed  * This function is now DEPRECATED and is only used within mOS core...
41*76404edcSAsim Jamshed  */
42*76404edcSAsim Jamshed int
43*76404edcSAsim Jamshed mtcp_cb_stop(mctx_t mctx, int sock, int side);
44*76404edcSAsim Jamshed /*----------------------------------------------------------------------------*/
45*76404edcSAsim Jamshed /** Reset the connection (send RST packets to both sides)
46*76404edcSAsim Jamshed  *  (We need to decide the API for this.)
47*76404edcSAsim Jamshed  */
48*76404edcSAsim Jamshed //int
49*76404edcSAsim Jamshed //mtcp_cb_reset(mctx_t mctx, int sock, int side);
50*76404edcSAsim Jamshed /*----------------------------------------------------------------------------*/
51*76404edcSAsim Jamshed inline mtcp_manager_t
52*76404edcSAsim Jamshed GetMTCPManager(mctx_t mctx)
53*76404edcSAsim Jamshed {
54*76404edcSAsim Jamshed 	if (!mctx) {
55*76404edcSAsim Jamshed 		errno = EACCES;
56*76404edcSAsim Jamshed 		return NULL;
57*76404edcSAsim Jamshed 	}
58*76404edcSAsim Jamshed 
59*76404edcSAsim Jamshed 	if (mctx->cpu < 0 || mctx->cpu >= num_cpus) {
60*76404edcSAsim Jamshed 		errno = EINVAL;
61*76404edcSAsim Jamshed 		return NULL;
62*76404edcSAsim Jamshed 	}
63*76404edcSAsim Jamshed 
64*76404edcSAsim Jamshed 	if (g_mtcp[mctx->cpu]->ctx->done || g_mtcp[mctx->cpu]->ctx->exit) {
65*76404edcSAsim Jamshed 		errno = EPERM;
66*76404edcSAsim Jamshed 		return NULL;
67*76404edcSAsim Jamshed 	}
68*76404edcSAsim Jamshed 
69*76404edcSAsim Jamshed 	return g_mtcp[mctx->cpu];
70*76404edcSAsim Jamshed }
71*76404edcSAsim Jamshed /*----------------------------------------------------------------------------*/
72*76404edcSAsim Jamshed inline int
73*76404edcSAsim Jamshed GetSocketError(socket_map_t socket, void *optval, socklen_t *optlen)
74*76404edcSAsim Jamshed {
75*76404edcSAsim Jamshed 	tcp_stream *cur_stream;
76*76404edcSAsim Jamshed 
77*76404edcSAsim Jamshed 	if (!socket->stream) {
78*76404edcSAsim Jamshed 		errno = EBADF;
79*76404edcSAsim Jamshed 		return -1;
80*76404edcSAsim Jamshed 	}
81*76404edcSAsim Jamshed 
82*76404edcSAsim Jamshed 	cur_stream = socket->stream;
83*76404edcSAsim Jamshed 	if (cur_stream->state == TCP_ST_CLOSED_RSVD) {
84*76404edcSAsim Jamshed 		if (cur_stream->close_reason == TCP_TIMEDOUT ||
85*76404edcSAsim Jamshed 				cur_stream->close_reason == TCP_CONN_FAIL ||
86*76404edcSAsim Jamshed 				cur_stream->close_reason == TCP_CONN_LOST) {
87*76404edcSAsim Jamshed 			*(int *)optval = ETIMEDOUT;
88*76404edcSAsim Jamshed 			*optlen = sizeof(int);
89*76404edcSAsim Jamshed 
90*76404edcSAsim Jamshed 			return 0;
91*76404edcSAsim Jamshed 		}
92*76404edcSAsim Jamshed 	}
93*76404edcSAsim Jamshed 
94*76404edcSAsim Jamshed 	if (cur_stream->state == TCP_ST_CLOSE_WAIT ||
95*76404edcSAsim Jamshed 			cur_stream->state == TCP_ST_CLOSED_RSVD) {
96*76404edcSAsim Jamshed 		if (cur_stream->close_reason == TCP_RESET) {
97*76404edcSAsim Jamshed 			*(int *)optval = ECONNRESET;
98*76404edcSAsim Jamshed 			*optlen = sizeof(int);
99*76404edcSAsim Jamshed 
100*76404edcSAsim Jamshed 			return 0;
101*76404edcSAsim Jamshed 		}
102*76404edcSAsim Jamshed 	}
103*76404edcSAsim Jamshed 
104*76404edcSAsim Jamshed 	errno = ENOSYS;
105*76404edcSAsim Jamshed 	return -1;
106*76404edcSAsim Jamshed }
107*76404edcSAsim Jamshed /*----------------------------------------------------------------------------*/
108*76404edcSAsim Jamshed int
109*76404edcSAsim Jamshed mtcp_getsockopt(mctx_t mctx, int sockid, int level,
110*76404edcSAsim Jamshed 		int optname, void *optval, socklen_t *optlen)
111*76404edcSAsim Jamshed {
112*76404edcSAsim Jamshed 	mtcp_manager_t mtcp;
113*76404edcSAsim Jamshed 	socket_map_t socket;
114*76404edcSAsim Jamshed 
115*76404edcSAsim Jamshed 	mtcp = GetMTCPManager(mctx);
116*76404edcSAsim Jamshed 	if (!mtcp) {
117*76404edcSAsim Jamshed 		errno = EACCES;
118*76404edcSAsim Jamshed 		return -1;
119*76404edcSAsim Jamshed 	}
120*76404edcSAsim Jamshed 
121*76404edcSAsim Jamshed 	if (sockid < 0 || sockid >= g_config.mos->max_concurrency) {
122*76404edcSAsim Jamshed 		TRACE_API("Socket id %d out of range.\n", sockid);
123*76404edcSAsim Jamshed 		errno = EBADF;
124*76404edcSAsim Jamshed 		return -1;
125*76404edcSAsim Jamshed 	}
126*76404edcSAsim Jamshed 
127*76404edcSAsim Jamshed 	switch (level) {
128*76404edcSAsim Jamshed 	case SOL_SOCKET:
129*76404edcSAsim Jamshed 		socket = &mtcp->smap[sockid];
130*76404edcSAsim Jamshed 		if (socket->socktype == MOS_SOCK_UNUSED) {
131*76404edcSAsim Jamshed 			TRACE_API("Invalid socket id: %d\n", sockid);
132*76404edcSAsim Jamshed 			errno = EBADF;
133*76404edcSAsim Jamshed 			return -1;
134*76404edcSAsim Jamshed 		}
135*76404edcSAsim Jamshed 
136*76404edcSAsim Jamshed 		if (socket->socktype != MOS_SOCK_STREAM_LISTEN &&
137*76404edcSAsim Jamshed 		    socket->socktype != MOS_SOCK_STREAM) {
138*76404edcSAsim Jamshed 			TRACE_API("Invalid socket id: %d\n", sockid);
139*76404edcSAsim Jamshed 			errno = ENOTSOCK;
140*76404edcSAsim Jamshed 			return -1;
141*76404edcSAsim Jamshed 		}
142*76404edcSAsim Jamshed 
143*76404edcSAsim Jamshed 		if (optname == SO_ERROR) {
144*76404edcSAsim Jamshed 			if (socket->socktype == MOS_SOCK_STREAM) {
145*76404edcSAsim Jamshed 				return GetSocketError(socket, optval, optlen);
146*76404edcSAsim Jamshed 			}
147*76404edcSAsim Jamshed 		}
148*76404edcSAsim Jamshed 		break;
149*76404edcSAsim Jamshed 	case SOL_MONSOCKET:
150*76404edcSAsim Jamshed 		/* check if the calling thread is in MOS context */
151*76404edcSAsim Jamshed 		if (mtcp->ctx->thread != pthread_self()) {
152*76404edcSAsim Jamshed 			errno = EPERM;
153*76404edcSAsim Jamshed 			return -1;
154*76404edcSAsim Jamshed 		}
155*76404edcSAsim Jamshed 		/*
156*76404edcSAsim Jamshed 		 * All options will only work for active
157*76404edcSAsim Jamshed 		 * monitor stream sockets
158*76404edcSAsim Jamshed 		 */
159*76404edcSAsim Jamshed 		socket = &mtcp->msmap[sockid];
160*76404edcSAsim Jamshed 		if (socket->socktype != MOS_SOCK_MONITOR_STREAM_ACTIVE) {
161*76404edcSAsim Jamshed 			TRACE_API("Invalid socket id: %d\n", sockid);
162*76404edcSAsim Jamshed 			errno = ENOTSOCK;
163*76404edcSAsim Jamshed 			return -1;
164*76404edcSAsim Jamshed 		}
165*76404edcSAsim Jamshed 
166*76404edcSAsim Jamshed 		switch (optname) {
167*76404edcSAsim Jamshed 		case MOS_FRAGINFO_CLIBUF:
168*76404edcSAsim Jamshed 			return GetFragInfo(socket, MOS_SIDE_CLI, optval, optlen);
169*76404edcSAsim Jamshed 		case MOS_FRAGINFO_SVRBUF:
170*76404edcSAsim Jamshed 			return GetFragInfo(socket, MOS_SIDE_SVR, optval, optlen);
171*76404edcSAsim Jamshed 		case MOS_INFO_CLIBUF:
172*76404edcSAsim Jamshed 			return GetBufInfo(socket, MOS_SIDE_CLI, optval, optlen);
173*76404edcSAsim Jamshed 		case MOS_INFO_SVRBUF:
174*76404edcSAsim Jamshed 			return GetBufInfo(socket, MOS_SIDE_SVR, optval, optlen);
175*76404edcSAsim Jamshed 		case MOS_TCP_STATE_CLI:
176*76404edcSAsim Jamshed 			return GetTCPState(socket->monitor_stream->stream, MOS_SIDE_CLI,
177*76404edcSAsim Jamshed 							   optval, optlen);
178*76404edcSAsim Jamshed 		case MOS_TCP_STATE_SVR:
179*76404edcSAsim Jamshed 			return GetTCPState(socket->monitor_stream->stream, MOS_SIDE_SVR,
180*76404edcSAsim Jamshed 							   optval, optlen);
181*76404edcSAsim Jamshed 		case MOS_TIMESTAMP:
182*76404edcSAsim Jamshed 			return GetLastTimestamp(socket->monitor_stream->stream,
183*76404edcSAsim Jamshed 						(uint32_t *)optval,
184*76404edcSAsim Jamshed 						optlen);
185*76404edcSAsim Jamshed 		default:
186*76404edcSAsim Jamshed 		  TRACE_API("can't recognize the optname=%d\n", optname);
187*76404edcSAsim Jamshed 		  assert(0);
188*76404edcSAsim Jamshed 		}
189*76404edcSAsim Jamshed 		break;
190*76404edcSAsim Jamshed 	}
191*76404edcSAsim Jamshed 	errno = ENOSYS;
192*76404edcSAsim Jamshed 	return -1;
193*76404edcSAsim Jamshed }
194*76404edcSAsim Jamshed /*----------------------------------------------------------------------------*/
195*76404edcSAsim Jamshed int
196*76404edcSAsim Jamshed mtcp_setsockopt(mctx_t mctx, int sockid, int level,
197*76404edcSAsim Jamshed 		int optname, const void *optval, socklen_t optlen)
198*76404edcSAsim Jamshed {
199*76404edcSAsim Jamshed 	mtcp_manager_t mtcp;
200*76404edcSAsim Jamshed 	socket_map_t socket;
201*76404edcSAsim Jamshed 	tcprb_t *rb;
202*76404edcSAsim Jamshed 
203*76404edcSAsim Jamshed 	mtcp = GetMTCPManager(mctx);
204*76404edcSAsim Jamshed 	if (!mtcp) {
205*76404edcSAsim Jamshed 		errno = EACCES;
206*76404edcSAsim Jamshed 		return -1;
207*76404edcSAsim Jamshed 	}
208*76404edcSAsim Jamshed 
209*76404edcSAsim Jamshed 	if (sockid < 0 || sockid >= g_config.mos->max_concurrency) {
210*76404edcSAsim Jamshed 		TRACE_API("Socket id %d out of range.\n", sockid);
211*76404edcSAsim Jamshed 		errno = EBADF;
212*76404edcSAsim Jamshed 		return -1;
213*76404edcSAsim Jamshed 	}
214*76404edcSAsim Jamshed 
215*76404edcSAsim Jamshed 	switch (level) {
216*76404edcSAsim Jamshed 	case SOL_SOCKET:
217*76404edcSAsim Jamshed 		socket = &mtcp->smap[sockid];
218*76404edcSAsim Jamshed 		if (socket->socktype == MOS_SOCK_UNUSED) {
219*76404edcSAsim Jamshed 			TRACE_API("Invalid socket id: %d\n", sockid);
220*76404edcSAsim Jamshed 			errno = EBADF;
221*76404edcSAsim Jamshed 			return -1;
222*76404edcSAsim Jamshed 		}
223*76404edcSAsim Jamshed 
224*76404edcSAsim Jamshed 		if (socket->socktype != MOS_SOCK_STREAM_LISTEN &&
225*76404edcSAsim Jamshed 		    socket->socktype != MOS_SOCK_STREAM) {
226*76404edcSAsim Jamshed 			TRACE_API("Invalid socket id: %d\n", sockid);
227*76404edcSAsim Jamshed 			errno = ENOTSOCK;
228*76404edcSAsim Jamshed 			return -1;
229*76404edcSAsim Jamshed 		}
230*76404edcSAsim Jamshed 		break;
231*76404edcSAsim Jamshed 	case SOL_MONSOCKET:
232*76404edcSAsim Jamshed 		socket = &mtcp->msmap[sockid];
233*76404edcSAsim Jamshed 		/*
234*76404edcSAsim Jamshed 		 * checking of calling thread to be in MOS context is
235*76404edcSAsim Jamshed 		 * disabled since both optnames can be called from
236*76404edcSAsim Jamshed 		 * `application' context (on passive sockets)
237*76404edcSAsim Jamshed 		 */
238*76404edcSAsim Jamshed 		/*
239*76404edcSAsim Jamshed 		 * if (mtcp->ctx->thread != pthread_self())
240*76404edcSAsim Jamshed 		 * return -1;
241*76404edcSAsim Jamshed 		 */
242*76404edcSAsim Jamshed 
243*76404edcSAsim Jamshed 		switch (optname) {
244*76404edcSAsim Jamshed 		case MOS_CLIBUF:
245*76404edcSAsim Jamshed #if 0
246*76404edcSAsim Jamshed 			if (socket->socktype != MOS_SOCK_MONITOR_STREAM_ACTIVE) {
247*76404edcSAsim Jamshed 				errno = EBADF;
248*76404edcSAsim Jamshed 				return -1;
249*76404edcSAsim Jamshed 			}
250*76404edcSAsim Jamshed #endif
251*76404edcSAsim Jamshed #ifdef NEWRB
252*76404edcSAsim Jamshed #ifdef DISABLE_DYN_RESIZE
253*76404edcSAsim Jamshed 			if (*(int *)optval != 0)
254*76404edcSAsim Jamshed 				return -1;
255*76404edcSAsim Jamshed 			if (socket->socktype == MOS_SOCK_MONITOR_STREAM_ACTIVE) {
256*76404edcSAsim Jamshed 				rb = (socket->monitor_stream->stream->side == MOS_SIDE_CLI) ?
257*76404edcSAsim Jamshed 					socket->monitor_stream->stream->rcvvar->rcvbuf :
258*76404edcSAsim Jamshed 					socket->monitor_stream->stream->pair_stream->rcvvar->rcvbuf;
259*76404edcSAsim Jamshed 				if (rb) {
260*76404edcSAsim Jamshed 					tcprb_resize_meta(rb, 0);
261*76404edcSAsim Jamshed 					tcprb_resize(rb, 0);
262*76404edcSAsim Jamshed 				}
263*76404edcSAsim Jamshed 			}
264*76404edcSAsim Jamshed 			return DisableBuf(socket, MOS_SIDE_CLI);
265*76404edcSAsim Jamshed #else
266*76404edcSAsim Jamshed 			rb = (socket->monitor_stream->stream->side == MOS_SIDE_CLI) ?
267*76404edcSAsim Jamshed 				socket->monitor_stream->stream->rcvvar->rcvbuf :
268*76404edcSAsim Jamshed 				socket->monitor_stream->stream->pair_stream->rcvvar->rcvbuf;
269*76404edcSAsim Jamshed 			if (tcprb_resize_meta(rb, *(int *)optval) < 0)
270*76404edcSAsim Jamshed 				return -1;
271*76404edcSAsim Jamshed 			return tcprb_resize(rb,
272*76404edcSAsim Jamshed 					(((int)rb->metalen - 1) / UNITBUFSIZE + 1) * UNITBUFSIZE);
273*76404edcSAsim Jamshed #endif
274*76404edcSAsim Jamshed #else
275*76404edcSAsim Jamshed 			errno = EBADF;
276*76404edcSAsim Jamshed 			return -1;
277*76404edcSAsim Jamshed #endif
278*76404edcSAsim Jamshed 		case MOS_SVRBUF:
279*76404edcSAsim Jamshed #if 0
280*76404edcSAsim Jamshed 			if (socket->socktype != MOS_SOCK_MONITOR_STREAM_ACTIVE) {
281*76404edcSAsim Jamshed 				errno = EBADF;
282*76404edcSAsim Jamshed 				return -1;
283*76404edcSAsim Jamshed 			}
284*76404edcSAsim Jamshed #endif
285*76404edcSAsim Jamshed #ifdef NEWRB
286*76404edcSAsim Jamshed #ifdef DISABLE_DYN_RESIZE
287*76404edcSAsim Jamshed 			if (*(int *)optval != 0)
288*76404edcSAsim Jamshed 				return -1;
289*76404edcSAsim Jamshed 			if (socket->socktype == MOS_SOCK_MONITOR_STREAM_ACTIVE) {
290*76404edcSAsim Jamshed 				rb = (socket->monitor_stream->stream->side == MOS_SIDE_SVR) ?
291*76404edcSAsim Jamshed 					socket->monitor_stream->stream->rcvvar->rcvbuf :
292*76404edcSAsim Jamshed 					socket->monitor_stream->stream->pair_stream->rcvvar->rcvbuf;
293*76404edcSAsim Jamshed 				if (rb) {
294*76404edcSAsim Jamshed 					tcprb_resize_meta(rb, 0);
295*76404edcSAsim Jamshed 					tcprb_resize(rb, 0);
296*76404edcSAsim Jamshed 				}
297*76404edcSAsim Jamshed 			}
298*76404edcSAsim Jamshed 			return DisableBuf(socket, MOS_SIDE_SVR);
299*76404edcSAsim Jamshed #else
300*76404edcSAsim Jamshed 			rb = (socket->monitor_stream->stream->side == MOS_SIDE_SVR) ?
301*76404edcSAsim Jamshed 				socket->monitor_stream->stream->rcvvar->rcvbuf :
302*76404edcSAsim Jamshed 				socket->monitor_stream->stream->pair_stream->rcvvar->rcvbuf;
303*76404edcSAsim Jamshed 			if (tcprb_resize_meta(rb, *(int *)optval) < 0)
304*76404edcSAsim Jamshed 				return -1;
305*76404edcSAsim Jamshed 			return tcprb_resize(rb,
306*76404edcSAsim Jamshed 					(((int)rb->metalen - 1) / UNITBUFSIZE + 1) * UNITBUFSIZE);
307*76404edcSAsim Jamshed #endif
308*76404edcSAsim Jamshed #else
309*76404edcSAsim Jamshed 			errno = EBADF;
310*76404edcSAsim Jamshed 			return -1;
311*76404edcSAsim Jamshed #endif
312*76404edcSAsim Jamshed 		case MOS_FRAG_CLIBUF:
313*76404edcSAsim Jamshed #if 0
314*76404edcSAsim Jamshed 			if (socket->socktype != MOS_SOCK_MONITOR_STREAM_ACTIVE) {
315*76404edcSAsim Jamshed 				errno = EBADF;
316*76404edcSAsim Jamshed 				return -1;
317*76404edcSAsim Jamshed 			}
318*76404edcSAsim Jamshed #endif
319*76404edcSAsim Jamshed #ifdef NEWRB
320*76404edcSAsim Jamshed #ifdef DISABLE_DYN_RESIZE
321*76404edcSAsim Jamshed 			if (*(int *)optval != 0)
322*76404edcSAsim Jamshed 				return -1;
323*76404edcSAsim Jamshed 			if (socket->socktype == MOS_SOCK_MONITOR_STREAM_ACTIVE) {
324*76404edcSAsim Jamshed 				rb = (socket->monitor_stream->stream->side == MOS_SIDE_CLI) ?
325*76404edcSAsim Jamshed 					socket->monitor_stream->stream->rcvvar->rcvbuf :
326*76404edcSAsim Jamshed 					socket->monitor_stream->stream->pair_stream->rcvvar->rcvbuf;
327*76404edcSAsim Jamshed 				if (rb)
328*76404edcSAsim Jamshed 					tcprb_resize(rb, 0);
329*76404edcSAsim Jamshed 			}
330*76404edcSAsim Jamshed 			return 0;
331*76404edcSAsim Jamshed #else
332*76404edcSAsim Jamshed 			rb = (socket->monitor_stream->stream->side == MOS_SIDE_CLI) ?
333*76404edcSAsim Jamshed 				socket->monitor_stream->stream->rcvvar->rcvbuf :
334*76404edcSAsim Jamshed 				socket->monitor_stream->stream->pair_stream->rcvvar->rcvbuf;
335*76404edcSAsim Jamshed 			if (rb->len == 0)
336*76404edcSAsim Jamshed 				return tcprb_resize_meta(rb, *(int *)optval);
337*76404edcSAsim Jamshed 			else
338*76404edcSAsim Jamshed 				return -1;
339*76404edcSAsim Jamshed #endif
340*76404edcSAsim Jamshed #else
341*76404edcSAsim Jamshed 			errno = EBADF;
342*76404edcSAsim Jamshed 			return -1;
343*76404edcSAsim Jamshed #endif
344*76404edcSAsim Jamshed 		case MOS_FRAG_SVRBUF:
345*76404edcSAsim Jamshed #if 0
346*76404edcSAsim Jamshed 			if (socket->socktype != MOS_SOCK_MONITOR_STREAM_ACTIVE) {
347*76404edcSAsim Jamshed 				errno = EBADF;
348*76404edcSAsim Jamshed 				return -1;
349*76404edcSAsim Jamshed 			}
350*76404edcSAsim Jamshed #endif
351*76404edcSAsim Jamshed #ifdef NEWRB
352*76404edcSAsim Jamshed #ifdef DISABLE_DYN_RESIZE
353*76404edcSAsim Jamshed 			if (*(int *)optval != 0)
354*76404edcSAsim Jamshed 				return -1;
355*76404edcSAsim Jamshed 			if (socket->socktype == MOS_SOCK_MONITOR_STREAM_ACTIVE) {
356*76404edcSAsim Jamshed 				rb = (socket->monitor_stream->stream->side == MOS_SIDE_SVR) ?
357*76404edcSAsim Jamshed 					socket->monitor_stream->stream->rcvvar->rcvbuf :
358*76404edcSAsim Jamshed 					socket->monitor_stream->stream->pair_stream->rcvvar->rcvbuf;
359*76404edcSAsim Jamshed 				if (rb)
360*76404edcSAsim Jamshed 					tcprb_resize(rb, 0);
361*76404edcSAsim Jamshed 			}
362*76404edcSAsim Jamshed 			return 0;
363*76404edcSAsim Jamshed #else
364*76404edcSAsim Jamshed 			rb = (socket->monitor_stream->stream->side == MOS_SIDE_SVR) ?
365*76404edcSAsim Jamshed 				socket->monitor_stream->stream->rcvvar->rcvbuf :
366*76404edcSAsim Jamshed 				socket->monitor_stream->stream->pair_stream->rcvvar->rcvbuf;
367*76404edcSAsim Jamshed 			if (rb->len == 0)
368*76404edcSAsim Jamshed 				return tcprb_resize_meta(rb, *(int *)optval);
369*76404edcSAsim Jamshed 			else
370*76404edcSAsim Jamshed 				return -1;
371*76404edcSAsim Jamshed #endif
372*76404edcSAsim Jamshed #else
373*76404edcSAsim Jamshed 			errno = EBADF;
374*76404edcSAsim Jamshed 			return -1;
375*76404edcSAsim Jamshed #endif
376*76404edcSAsim Jamshed 		case MOS_MONLEVEL:
377*76404edcSAsim Jamshed #ifdef OLD_API
378*76404edcSAsim Jamshed 			assert(*(int *)optval == MOS_NO_CLIBUF ||
379*76404edcSAsim Jamshed 			       *(int *)optval == MOS_NO_SVRBUF);
380*76404edcSAsim Jamshed 			return DisableBuf(socket,
381*76404edcSAsim Jamshed 					  (*(int *)optval == MOS_NO_CLIBUF) ?
382*76404edcSAsim Jamshed 					  MOS_SIDE_CLI : MOS_SIDE_SVR);
383*76404edcSAsim Jamshed #endif
384*76404edcSAsim Jamshed 		case MOS_SEQ_REMAP:
385*76404edcSAsim Jamshed 			return TcpSeqChange(socket,
386*76404edcSAsim Jamshed 					    (uint32_t)((seq_remap_info *)optval)->seq_off,
387*76404edcSAsim Jamshed 					    ((seq_remap_info *)optval)->side,
388*76404edcSAsim Jamshed 					    mtcp->pctx->p.seq);
389*76404edcSAsim Jamshed 		case MOS_STOP_MON:
390*76404edcSAsim Jamshed 			return mtcp_cb_stop(mctx, sockid, *(int *)optval);
391*76404edcSAsim Jamshed 		default:
392*76404edcSAsim Jamshed 			TRACE_API("invalid optname=%d\n", optname);
393*76404edcSAsim Jamshed 			assert(0);
394*76404edcSAsim Jamshed 		}
395*76404edcSAsim Jamshed 		break;
396*76404edcSAsim Jamshed 	}
397*76404edcSAsim Jamshed 
398*76404edcSAsim Jamshed 	errno = ENOSYS;
399*76404edcSAsim Jamshed 	return -1;
400*76404edcSAsim Jamshed }
401*76404edcSAsim Jamshed /*----------------------------------------------------------------------------*/
402*76404edcSAsim Jamshed int
403*76404edcSAsim Jamshed mtcp_setsock_nonblock(mctx_t mctx, int sockid)
404*76404edcSAsim Jamshed {
405*76404edcSAsim Jamshed 	mtcp_manager_t mtcp;
406*76404edcSAsim Jamshed 
407*76404edcSAsim Jamshed 	mtcp = GetMTCPManager(mctx);
408*76404edcSAsim Jamshed 	if (!mtcp) {
409*76404edcSAsim Jamshed 		errno = EACCES;
410*76404edcSAsim Jamshed 		return -1;
411*76404edcSAsim Jamshed 	}
412*76404edcSAsim Jamshed 
413*76404edcSAsim Jamshed 	if (sockid < 0 || sockid >= g_config.mos->max_concurrency) {
414*76404edcSAsim Jamshed 		TRACE_API("Socket id %d out of range.\n", sockid);
415*76404edcSAsim Jamshed 		errno = EBADF;
416*76404edcSAsim Jamshed 		return -1;
417*76404edcSAsim Jamshed 	}
418*76404edcSAsim Jamshed 
419*76404edcSAsim Jamshed 	if (mtcp->smap[sockid].socktype == MOS_SOCK_UNUSED) {
420*76404edcSAsim Jamshed 		TRACE_API("Invalid socket id: %d\n", sockid);
421*76404edcSAsim Jamshed 		errno = EBADF;
422*76404edcSAsim Jamshed 		return -1;
423*76404edcSAsim Jamshed 	}
424*76404edcSAsim Jamshed 
425*76404edcSAsim Jamshed 	mtcp->smap[sockid].opts |= MTCP_NONBLOCK;
426*76404edcSAsim Jamshed 
427*76404edcSAsim Jamshed 	return 0;
428*76404edcSAsim Jamshed }
429*76404edcSAsim Jamshed /*----------------------------------------------------------------------------*/
430*76404edcSAsim Jamshed int
431*76404edcSAsim Jamshed mtcp_ioctl(mctx_t mctx, int sockid, int request, void *argp)
432*76404edcSAsim Jamshed {
433*76404edcSAsim Jamshed 	mtcp_manager_t mtcp;
434*76404edcSAsim Jamshed 	socket_map_t socket;
435*76404edcSAsim Jamshed 
436*76404edcSAsim Jamshed 	mtcp = GetMTCPManager(mctx);
437*76404edcSAsim Jamshed 	if (!mtcp) {
438*76404edcSAsim Jamshed 		errno = EACCES;
439*76404edcSAsim Jamshed 		return -1;
440*76404edcSAsim Jamshed 	}
441*76404edcSAsim Jamshed 
442*76404edcSAsim Jamshed 	if (sockid < 0 || sockid >= g_config.mos->max_concurrency) {
443*76404edcSAsim Jamshed 		TRACE_API("Socket id %d out of range.\n", sockid);
444*76404edcSAsim Jamshed 		errno = EBADF;
445*76404edcSAsim Jamshed 		return -1;
446*76404edcSAsim Jamshed 	}
447*76404edcSAsim Jamshed 
448*76404edcSAsim Jamshed 	/* only support stream socket */
449*76404edcSAsim Jamshed 	socket = &mtcp->smap[sockid];
450*76404edcSAsim Jamshed 
451*76404edcSAsim Jamshed 	if (socket->socktype != MOS_SOCK_STREAM_LISTEN &&
452*76404edcSAsim Jamshed 		socket->socktype != MOS_SOCK_STREAM) {
453*76404edcSAsim Jamshed 		TRACE_API("Invalid socket id: %d\n", sockid);
454*76404edcSAsim Jamshed 		errno = EBADF;
455*76404edcSAsim Jamshed 		return -1;
456*76404edcSAsim Jamshed 	}
457*76404edcSAsim Jamshed 
458*76404edcSAsim Jamshed 	if (!argp) {
459*76404edcSAsim Jamshed 		errno = EFAULT;
460*76404edcSAsim Jamshed 		return -1;
461*76404edcSAsim Jamshed 	}
462*76404edcSAsim Jamshed 
463*76404edcSAsim Jamshed 	if (request == FIONREAD) {
464*76404edcSAsim Jamshed 		tcp_stream *cur_stream;
465*76404edcSAsim Jamshed #ifdef NEWRB
466*76404edcSAsim Jamshed 		tcprb_t *rbuf;
467*76404edcSAsim Jamshed #else
468*76404edcSAsim Jamshed 		struct tcp_ring_buffer *rbuf;
469*76404edcSAsim Jamshed #endif
470*76404edcSAsim Jamshed 
471*76404edcSAsim Jamshed 		cur_stream = socket->stream;
472*76404edcSAsim Jamshed 		if (!cur_stream) {
473*76404edcSAsim Jamshed 			errno = EBADF;
474*76404edcSAsim Jamshed 			return -1;
475*76404edcSAsim Jamshed 		}
476*76404edcSAsim Jamshed 
477*76404edcSAsim Jamshed 		rbuf = cur_stream->rcvvar->rcvbuf;
478*76404edcSAsim Jamshed #ifdef NEWRB
479*76404edcSAsim Jamshed 		*(int *)argp = (rbuf) ? tcprb_cflen(rbuf) : 0;
480*76404edcSAsim Jamshed #else
481*76404edcSAsim Jamshed 		*(int *)argp = (rbuf) ? rbuf->merged_len : 0;
482*76404edcSAsim Jamshed #endif
483*76404edcSAsim Jamshed 
484*76404edcSAsim Jamshed 	} else if (request == FIONBIO) {
485*76404edcSAsim Jamshed 		/*
486*76404edcSAsim Jamshed 		 * sockets can only be set to blocking/non-blocking
487*76404edcSAsim Jamshed 		 * modes during initialization
488*76404edcSAsim Jamshed 		 */
489*76404edcSAsim Jamshed 		if ((*(int *)argp))
490*76404edcSAsim Jamshed 			mtcp->smap[sockid].opts |= MTCP_NONBLOCK;
491*76404edcSAsim Jamshed 		else
492*76404edcSAsim Jamshed 			mtcp->smap[sockid].opts &= ~MTCP_NONBLOCK;
493*76404edcSAsim Jamshed 	} else {
494*76404edcSAsim Jamshed 		errno = EINVAL;
495*76404edcSAsim Jamshed 		return -1;
496*76404edcSAsim Jamshed 	}
497*76404edcSAsim Jamshed 
498*76404edcSAsim Jamshed 	return 0;
499*76404edcSAsim Jamshed }
500*76404edcSAsim Jamshed /*----------------------------------------------------------------------------*/
501*76404edcSAsim Jamshed static int
502*76404edcSAsim Jamshed mtcp_monitor(mctx_t mctx, socket_map_t sock)
503*76404edcSAsim Jamshed {
504*76404edcSAsim Jamshed 	mtcp_manager_t mtcp;
505*76404edcSAsim Jamshed 	struct mon_listener *monitor;
506*76404edcSAsim Jamshed 	int sockid = sock->id;
507*76404edcSAsim Jamshed 
508*76404edcSAsim Jamshed 	mtcp = GetMTCPManager(mctx);
509*76404edcSAsim Jamshed 	if (!mtcp) {
510*76404edcSAsim Jamshed 		errno = EACCES;
511*76404edcSAsim Jamshed 		return -1;
512*76404edcSAsim Jamshed 	}
513*76404edcSAsim Jamshed 
514*76404edcSAsim Jamshed 	if (sockid < 0 || sockid >= g_config.mos->max_concurrency) {
515*76404edcSAsim Jamshed 		TRACE_API("Socket id %d out of range.\n", sockid);
516*76404edcSAsim Jamshed 		errno = EBADF;
517*76404edcSAsim Jamshed 		return -1;
518*76404edcSAsim Jamshed 	}
519*76404edcSAsim Jamshed 
520*76404edcSAsim Jamshed 	if (mtcp->msmap[sockid].socktype == MOS_SOCK_UNUSED) {
521*76404edcSAsim Jamshed 		TRACE_API("Invalid socket id: %d\n", sockid);
522*76404edcSAsim Jamshed 		errno = EBADF;
523*76404edcSAsim Jamshed 		return -1;
524*76404edcSAsim Jamshed 	}
525*76404edcSAsim Jamshed 
526*76404edcSAsim Jamshed 	if (!(mtcp->msmap[sockid].socktype == MOS_SOCK_MONITOR_STREAM ||
527*76404edcSAsim Jamshed 	      mtcp->msmap[sockid].socktype == MOS_SOCK_MONITOR_RAW)) {
528*76404edcSAsim Jamshed 		TRACE_API("Not a monitor socket. id: %d\n", sockid);
529*76404edcSAsim Jamshed 		errno = ENOTSOCK;
530*76404edcSAsim Jamshed 		return -1;
531*76404edcSAsim Jamshed 	}
532*76404edcSAsim Jamshed 
533*76404edcSAsim Jamshed 	monitor = (struct mon_listener *)calloc(1, sizeof(struct mon_listener));
534*76404edcSAsim Jamshed 	if (!monitor) {
535*76404edcSAsim Jamshed 		/* errno set from the malloc() */
536*76404edcSAsim Jamshed 		errno = ENOMEM;
537*76404edcSAsim Jamshed 		return -1;
538*76404edcSAsim Jamshed 	}
539*76404edcSAsim Jamshed 
540*76404edcSAsim Jamshed 	/* create a monitor-specific event queue */
541*76404edcSAsim Jamshed 	monitor->eq = CreateEventQueue(g_config.mos->max_concurrency);
542*76404edcSAsim Jamshed 	if (!monitor->eq) {
543*76404edcSAsim Jamshed 		TRACE_API("Can't create event queue (concurrency: %d) for "
544*76404edcSAsim Jamshed 			  "monitor read event registrations!\n",
545*76404edcSAsim Jamshed 			  g_config.mos->max_concurrency);
546*76404edcSAsim Jamshed 		free(monitor);
547*76404edcSAsim Jamshed 		errno = ENOMEM;
548*76404edcSAsim Jamshed 		return -1;
549*76404edcSAsim Jamshed 	}
550*76404edcSAsim Jamshed 
551*76404edcSAsim Jamshed 	/* set monitor-related basic parameters */
552*76404edcSAsim Jamshed #ifndef NEWEV
553*76404edcSAsim Jamshed 	monitor->ude_id = UDE_OFFSET;
554*76404edcSAsim Jamshed #endif
555*76404edcSAsim Jamshed 	monitor->socket = sock;
556*76404edcSAsim Jamshed #ifdef NEWRB
557*76404edcSAsim Jamshed 	monitor->client_buf_mgmt = monitor->server_buf_mgmt = BUFMGMT_FULL;
558*76404edcSAsim Jamshed #else
559*76404edcSAsim Jamshed 	monitor->client_buf_mgmt = monitor->server_buf_mgmt = 1;
560*76404edcSAsim Jamshed #endif
561*76404edcSAsim Jamshed 
562*76404edcSAsim Jamshed 	/* perform both sides monitoring by default */
563*76404edcSAsim Jamshed 	monitor->client_mon = monitor->server_mon = 1;
564*76404edcSAsim Jamshed 
565*76404edcSAsim Jamshed 	/* add monitor socket to the monitor list */
566*76404edcSAsim Jamshed 	TAILQ_INSERT_TAIL(&mtcp->monitors, monitor, link);
567*76404edcSAsim Jamshed 
568*76404edcSAsim Jamshed 	mtcp->msmap[sockid].monitor_listener = monitor;
569*76404edcSAsim Jamshed 
570*76404edcSAsim Jamshed 	return 0;
571*76404edcSAsim Jamshed }
572*76404edcSAsim Jamshed /*----------------------------------------------------------------------------*/
573*76404edcSAsim Jamshed int
574*76404edcSAsim Jamshed mtcp_socket(mctx_t mctx, int domain, int type, int protocol)
575*76404edcSAsim Jamshed {
576*76404edcSAsim Jamshed 	mtcp_manager_t mtcp;
577*76404edcSAsim Jamshed 	socket_map_t socket;
578*76404edcSAsim Jamshed 
579*76404edcSAsim Jamshed 	mtcp = GetMTCPManager(mctx);
580*76404edcSAsim Jamshed 	if (!mtcp) {
581*76404edcSAsim Jamshed 		errno = EACCES;
582*76404edcSAsim Jamshed 		return -1;
583*76404edcSAsim Jamshed 	}
584*76404edcSAsim Jamshed 
585*76404edcSAsim Jamshed 	if (domain != AF_INET) {
586*76404edcSAsim Jamshed 		errno = EAFNOSUPPORT;
587*76404edcSAsim Jamshed 		return -1;
588*76404edcSAsim Jamshed 	}
589*76404edcSAsim Jamshed 
590*76404edcSAsim Jamshed 	if (type == SOCK_STREAM) {
591*76404edcSAsim Jamshed 		type = MOS_SOCK_STREAM;
592*76404edcSAsim Jamshed 	} else if (type == MOS_SOCK_MONITOR_STREAM ||
593*76404edcSAsim Jamshed 		   type == MOS_SOCK_MONITOR_RAW) {
594*76404edcSAsim Jamshed 		/* do nothing for the time being */
595*76404edcSAsim Jamshed 	} else {
596*76404edcSAsim Jamshed 		/* Not supported type */
597*76404edcSAsim Jamshed 		errno = EINVAL;
598*76404edcSAsim Jamshed 		return -1;
599*76404edcSAsim Jamshed 	}
600*76404edcSAsim Jamshed 
601*76404edcSAsim Jamshed 	socket = AllocateSocket(mctx, type);
602*76404edcSAsim Jamshed 	if (!socket) {
603*76404edcSAsim Jamshed 		errno = ENFILE;
604*76404edcSAsim Jamshed 		return -1;
605*76404edcSAsim Jamshed 	}
606*76404edcSAsim Jamshed 
607*76404edcSAsim Jamshed 	if (type == MOS_SOCK_MONITOR_STREAM ||
608*76404edcSAsim Jamshed 	    type == MOS_SOCK_MONITOR_RAW) {
609*76404edcSAsim Jamshed 		mtcp_manager_t mtcp = GetMTCPManager(mctx);
610*76404edcSAsim Jamshed 		if (!mtcp) {
611*76404edcSAsim Jamshed 			errno = EACCES;
612*76404edcSAsim Jamshed 			return -1;
613*76404edcSAsim Jamshed 		}
614*76404edcSAsim Jamshed 		mtcp_monitor(mctx, socket);
615*76404edcSAsim Jamshed #ifdef NEWEV
616*76404edcSAsim Jamshed 		socket->monitor_listener->stree_dontcare = NULL;
617*76404edcSAsim Jamshed 		socket->monitor_listener->stree_pre_rcv = NULL;
618*76404edcSAsim Jamshed 		socket->monitor_listener->stree_post_snd = NULL;
619*76404edcSAsim Jamshed #else
620*76404edcSAsim Jamshed 		InitEvB(mtcp, &socket->monitor_listener->dontcare_evb);
621*76404edcSAsim Jamshed 		InitEvB(mtcp, &socket->monitor_listener->pre_tcp_evb);
622*76404edcSAsim Jamshed 		InitEvB(mtcp, &socket->monitor_listener->post_tcp_evb);
623*76404edcSAsim Jamshed #endif
624*76404edcSAsim Jamshed 	}
625*76404edcSAsim Jamshed 
626*76404edcSAsim Jamshed 	return socket->id;
627*76404edcSAsim Jamshed }
628*76404edcSAsim Jamshed /*----------------------------------------------------------------------------*/
629*76404edcSAsim Jamshed int
630*76404edcSAsim Jamshed mtcp_bind(mctx_t mctx, int sockid,
631*76404edcSAsim Jamshed 		const struct sockaddr *addr, socklen_t addrlen)
632*76404edcSAsim Jamshed {
633*76404edcSAsim Jamshed 	mtcp_manager_t mtcp;
634*76404edcSAsim Jamshed 	struct sockaddr_in *addr_in;
635*76404edcSAsim Jamshed 
636*76404edcSAsim Jamshed 	mtcp = GetMTCPManager(mctx);
637*76404edcSAsim Jamshed 	if (!mtcp) {
638*76404edcSAsim Jamshed 		errno = EACCES;
639*76404edcSAsim Jamshed 		return -1;
640*76404edcSAsim Jamshed 	}
641*76404edcSAsim Jamshed 
642*76404edcSAsim Jamshed 	if (sockid < 0 || sockid >= g_config.mos->max_concurrency) {
643*76404edcSAsim Jamshed 		TRACE_API("Socket id %d out of range.\n", sockid);
644*76404edcSAsim Jamshed 		errno = EBADF;
645*76404edcSAsim Jamshed 		return -1;
646*76404edcSAsim Jamshed 	}
647*76404edcSAsim Jamshed 
648*76404edcSAsim Jamshed 	if (mtcp->smap[sockid].socktype == MOS_SOCK_UNUSED) {
649*76404edcSAsim Jamshed 		TRACE_API("Invalid socket id: %d\n", sockid);
650*76404edcSAsim Jamshed 		errno = EBADF;
651*76404edcSAsim Jamshed 		return -1;
652*76404edcSAsim Jamshed 	}
653*76404edcSAsim Jamshed 
654*76404edcSAsim Jamshed 	if (mtcp->smap[sockid].socktype != MOS_SOCK_STREAM &&
655*76404edcSAsim Jamshed 			mtcp->smap[sockid].socktype != MOS_SOCK_STREAM_LISTEN) {
656*76404edcSAsim Jamshed 		TRACE_API("Not a stream socket id: %d\n", sockid);
657*76404edcSAsim Jamshed 		errno = ENOTSOCK;
658*76404edcSAsim Jamshed 		return -1;
659*76404edcSAsim Jamshed 	}
660*76404edcSAsim Jamshed 
661*76404edcSAsim Jamshed 	if (!addr) {
662*76404edcSAsim Jamshed 		TRACE_API("Socket %d: empty address!\n", sockid);
663*76404edcSAsim Jamshed 		errno = EINVAL;
664*76404edcSAsim Jamshed 		return -1;
665*76404edcSAsim Jamshed 	}
666*76404edcSAsim Jamshed 
667*76404edcSAsim Jamshed 	if (mtcp->smap[sockid].opts & MTCP_ADDR_BIND) {
668*76404edcSAsim Jamshed 		TRACE_API("Socket %d: adress already bind for this socket.\n", sockid);
669*76404edcSAsim Jamshed 		errno = EINVAL;
670*76404edcSAsim Jamshed 		return -1;
671*76404edcSAsim Jamshed 	}
672*76404edcSAsim Jamshed 
673*76404edcSAsim Jamshed 	/* we only allow bind() for AF_INET address */
674*76404edcSAsim Jamshed 	if (addr->sa_family != AF_INET || addrlen < sizeof(struct sockaddr_in)) {
675*76404edcSAsim Jamshed 		TRACE_API("Socket %d: invalid argument!\n", sockid);
676*76404edcSAsim Jamshed 		errno = EINVAL;
677*76404edcSAsim Jamshed 		return -1;
678*76404edcSAsim Jamshed 	}
679*76404edcSAsim Jamshed 
680*76404edcSAsim Jamshed 	if (mtcp->listener) {
681*76404edcSAsim Jamshed 		TRACE_API("Address already bound!\n");
682*76404edcSAsim Jamshed 		errno = EINVAL;
683*76404edcSAsim Jamshed 		return -1;
684*76404edcSAsim Jamshed 	}
685*76404edcSAsim Jamshed 	addr_in = (struct sockaddr_in *)addr;
686*76404edcSAsim Jamshed 	mtcp->smap[sockid].saddr = *addr_in;
687*76404edcSAsim Jamshed 	mtcp->smap[sockid].opts |= MTCP_ADDR_BIND;
688*76404edcSAsim Jamshed 
689*76404edcSAsim Jamshed 	return 0;
690*76404edcSAsim Jamshed }
691*76404edcSAsim Jamshed /*----------------------------------------------------------------------------*/
692*76404edcSAsim Jamshed int
693*76404edcSAsim Jamshed mtcp_listen(mctx_t mctx, int sockid, int backlog)
694*76404edcSAsim Jamshed {
695*76404edcSAsim Jamshed 	mtcp_manager_t mtcp;
696*76404edcSAsim Jamshed 	struct tcp_listener *listener;
697*76404edcSAsim Jamshed 
698*76404edcSAsim Jamshed 	mtcp = GetMTCPManager(mctx);
699*76404edcSAsim Jamshed 	if (!mtcp) {
700*76404edcSAsim Jamshed 		errno = EACCES;
701*76404edcSAsim Jamshed 		return -1;
702*76404edcSAsim Jamshed 	}
703*76404edcSAsim Jamshed 
704*76404edcSAsim Jamshed 	if (sockid < 0 || sockid >= g_config.mos->max_concurrency) {
705*76404edcSAsim Jamshed 		TRACE_API("Socket id %d out of range.\n", sockid);
706*76404edcSAsim Jamshed 		errno = EBADF;
707*76404edcSAsim Jamshed 		return -1;
708*76404edcSAsim Jamshed 	}
709*76404edcSAsim Jamshed 
710*76404edcSAsim Jamshed 	if (mtcp->smap[sockid].socktype == MOS_SOCK_UNUSED) {
711*76404edcSAsim Jamshed 		TRACE_API("Invalid socket id: %d\n", sockid);
712*76404edcSAsim Jamshed 		errno = EBADF;
713*76404edcSAsim Jamshed 		return -1;
714*76404edcSAsim Jamshed 	}
715*76404edcSAsim Jamshed 
716*76404edcSAsim Jamshed 	if (mtcp->smap[sockid].socktype == MOS_SOCK_STREAM) {
717*76404edcSAsim Jamshed 		mtcp->smap[sockid].socktype = MOS_SOCK_STREAM_LISTEN;
718*76404edcSAsim Jamshed 	}
719*76404edcSAsim Jamshed 
720*76404edcSAsim Jamshed 	if (mtcp->smap[sockid].socktype != MOS_SOCK_STREAM_LISTEN) {
721*76404edcSAsim Jamshed 		TRACE_API("Not a listening socket. id: %d\n", sockid);
722*76404edcSAsim Jamshed 		errno = ENOTSOCK;
723*76404edcSAsim Jamshed 		return -1;
724*76404edcSAsim Jamshed 	}
725*76404edcSAsim Jamshed 
726*76404edcSAsim Jamshed 	if (backlog <= 0 || backlog > g_config.mos->max_concurrency) {
727*76404edcSAsim Jamshed 		errno = EINVAL;
728*76404edcSAsim Jamshed 		return -1;
729*76404edcSAsim Jamshed 	}
730*76404edcSAsim Jamshed 
731*76404edcSAsim Jamshed 	listener = (struct tcp_listener *)calloc(1, sizeof(struct tcp_listener));
732*76404edcSAsim Jamshed 	if (!listener) {
733*76404edcSAsim Jamshed 		/* errno set from the malloc() */
734*76404edcSAsim Jamshed 		errno = ENOMEM;
735*76404edcSAsim Jamshed 		return -1;
736*76404edcSAsim Jamshed 	}
737*76404edcSAsim Jamshed 
738*76404edcSAsim Jamshed 	listener->sockid = sockid;
739*76404edcSAsim Jamshed 	listener->backlog = backlog;
740*76404edcSAsim Jamshed 	listener->socket = &mtcp->smap[sockid];
741*76404edcSAsim Jamshed 
742*76404edcSAsim Jamshed 	if (pthread_cond_init(&listener->accept_cond, NULL)) {
743*76404edcSAsim Jamshed 		perror("pthread_cond_init of ctx->accept_cond\n");
744*76404edcSAsim Jamshed 		/* errno set by pthread_cond_init() */
745*76404edcSAsim Jamshed 		return -1;
746*76404edcSAsim Jamshed 	}
747*76404edcSAsim Jamshed 	if (pthread_mutex_init(&listener->accept_lock, NULL)) {
748*76404edcSAsim Jamshed 		perror("pthread_mutex_init of ctx->accept_lock\n");
749*76404edcSAsim Jamshed 		/* errno set by pthread_mutex_init() */
750*76404edcSAsim Jamshed 		return -1;
751*76404edcSAsim Jamshed 	}
752*76404edcSAsim Jamshed 
753*76404edcSAsim Jamshed 	listener->acceptq = CreateStreamQueue(backlog);
754*76404edcSAsim Jamshed 	if (!listener->acceptq) {
755*76404edcSAsim Jamshed 		errno = ENOMEM;
756*76404edcSAsim Jamshed 		return -1;
757*76404edcSAsim Jamshed 	}
758*76404edcSAsim Jamshed 
759*76404edcSAsim Jamshed 	mtcp->smap[sockid].listener = listener;
760*76404edcSAsim Jamshed 	mtcp->listener = listener;
761*76404edcSAsim Jamshed 
762*76404edcSAsim Jamshed 	return 0;
763*76404edcSAsim Jamshed }
764*76404edcSAsim Jamshed /*----------------------------------------------------------------------------*/
765*76404edcSAsim Jamshed int
766*76404edcSAsim Jamshed mtcp_accept(mctx_t mctx, int sockid, struct sockaddr *addr, socklen_t *addrlen)
767*76404edcSAsim Jamshed {
768*76404edcSAsim Jamshed 	mtcp_manager_t mtcp;
769*76404edcSAsim Jamshed 	struct tcp_listener *listener;
770*76404edcSAsim Jamshed 	socket_map_t socket;
771*76404edcSAsim Jamshed 	tcp_stream *accepted = NULL;
772*76404edcSAsim Jamshed 
773*76404edcSAsim Jamshed 	mtcp = GetMTCPManager(mctx);
774*76404edcSAsim Jamshed 	if (!mtcp) {
775*76404edcSAsim Jamshed 		errno = EACCES;
776*76404edcSAsim Jamshed 		return -1;
777*76404edcSAsim Jamshed 	}
778*76404edcSAsim Jamshed 
779*76404edcSAsim Jamshed 	if (sockid < 0 || sockid >= g_config.mos->max_concurrency) {
780*76404edcSAsim Jamshed 		TRACE_API("Socket id %d out of range.\n", sockid);
781*76404edcSAsim Jamshed 		errno = EBADF;
782*76404edcSAsim Jamshed 		return -1;
783*76404edcSAsim Jamshed 	}
784*76404edcSAsim Jamshed 
785*76404edcSAsim Jamshed 	/* requires listening socket */
786*76404edcSAsim Jamshed 	if (mtcp->smap[sockid].socktype != MOS_SOCK_STREAM_LISTEN) {
787*76404edcSAsim Jamshed 		errno = EINVAL;
788*76404edcSAsim Jamshed 		return -1;
789*76404edcSAsim Jamshed 	}
790*76404edcSAsim Jamshed 
791*76404edcSAsim Jamshed 	listener = mtcp->smap[sockid].listener;
792*76404edcSAsim Jamshed 
793*76404edcSAsim Jamshed 	/* dequeue from the acceptq without lock first */
794*76404edcSAsim Jamshed 	/* if nothing there, acquire lock and cond_wait */
795*76404edcSAsim Jamshed 	accepted = StreamDequeue(listener->acceptq);
796*76404edcSAsim Jamshed 	if (!accepted) {
797*76404edcSAsim Jamshed 		if (listener->socket->opts & MTCP_NONBLOCK) {
798*76404edcSAsim Jamshed 			errno = EAGAIN;
799*76404edcSAsim Jamshed 			return -1;
800*76404edcSAsim Jamshed 
801*76404edcSAsim Jamshed 		} else {
802*76404edcSAsim Jamshed 			pthread_mutex_lock(&listener->accept_lock);
803*76404edcSAsim Jamshed 			while ((accepted = StreamDequeue(listener->acceptq)) == NULL) {
804*76404edcSAsim Jamshed 				pthread_cond_wait(&listener->accept_cond, &listener->accept_lock);
805*76404edcSAsim Jamshed 
806*76404edcSAsim Jamshed 				if (mtcp->ctx->done || mtcp->ctx->exit) {
807*76404edcSAsim Jamshed 					pthread_mutex_unlock(&listener->accept_lock);
808*76404edcSAsim Jamshed 					errno = EINTR;
809*76404edcSAsim Jamshed 					return -1;
810*76404edcSAsim Jamshed 				}
811*76404edcSAsim Jamshed 			}
812*76404edcSAsim Jamshed 			pthread_mutex_unlock(&listener->accept_lock);
813*76404edcSAsim Jamshed 		}
814*76404edcSAsim Jamshed 	}
815*76404edcSAsim Jamshed 
816*76404edcSAsim Jamshed 	if (!accepted) {
817*76404edcSAsim Jamshed 		TRACE_ERROR("[NEVER HAPPEN] Empty accept queue!\n");
818*76404edcSAsim Jamshed 	}
819*76404edcSAsim Jamshed 
820*76404edcSAsim Jamshed 	if (!accepted->socket) {
821*76404edcSAsim Jamshed 		socket = AllocateSocket(mctx, MOS_SOCK_STREAM);
822*76404edcSAsim Jamshed 		if (!socket) {
823*76404edcSAsim Jamshed 			TRACE_ERROR("Failed to create new socket!\n");
824*76404edcSAsim Jamshed 			/* TODO: destroy the stream */
825*76404edcSAsim Jamshed 			errno = ENFILE;
826*76404edcSAsim Jamshed 			return -1;
827*76404edcSAsim Jamshed 		}
828*76404edcSAsim Jamshed 		socket->stream = accepted;
829*76404edcSAsim Jamshed 		accepted->socket = socket;
830*76404edcSAsim Jamshed 		/* if monitor is enabled, complete the socket assignment */
831*76404edcSAsim Jamshed 		if (socket->stream->pair_stream != NULL)
832*76404edcSAsim Jamshed 			socket->stream->pair_stream->socket = socket;
833*76404edcSAsim Jamshed 	}
834*76404edcSAsim Jamshed 
835*76404edcSAsim Jamshed 	TRACE_API("Stream %d accepted.\n", accepted->id);
836*76404edcSAsim Jamshed 
837*76404edcSAsim Jamshed 	if (addr && addrlen) {
838*76404edcSAsim Jamshed 		struct sockaddr_in *addr_in = (struct sockaddr_in *)addr;
839*76404edcSAsim Jamshed 		addr_in->sin_family = AF_INET;
840*76404edcSAsim Jamshed 		addr_in->sin_port = accepted->dport;
841*76404edcSAsim Jamshed 		addr_in->sin_addr.s_addr = accepted->daddr;
842*76404edcSAsim Jamshed 		*addrlen = sizeof(struct sockaddr_in);
843*76404edcSAsim Jamshed 	}
844*76404edcSAsim Jamshed 
845*76404edcSAsim Jamshed 	return accepted->socket->id;
846*76404edcSAsim Jamshed }
847*76404edcSAsim Jamshed /*----------------------------------------------------------------------------*/
848*76404edcSAsim Jamshed int
849*76404edcSAsim Jamshed mtcp_init_rss(mctx_t mctx, in_addr_t saddr_base, int num_addr,
850*76404edcSAsim Jamshed 		in_addr_t daddr, in_addr_t dport)
851*76404edcSAsim Jamshed {
852*76404edcSAsim Jamshed 	mtcp_manager_t mtcp;
853*76404edcSAsim Jamshed 	addr_pool_t ap;
854*76404edcSAsim Jamshed 
855*76404edcSAsim Jamshed 	mtcp = GetMTCPManager(mctx);
856*76404edcSAsim Jamshed 	if (!mtcp) {
857*76404edcSAsim Jamshed 		errno = EACCES;
858*76404edcSAsim Jamshed 		return -1;
859*76404edcSAsim Jamshed 	}
860*76404edcSAsim Jamshed 
861*76404edcSAsim Jamshed 	if (saddr_base == INADDR_ANY) {
862*76404edcSAsim Jamshed 		int nif_out;
863*76404edcSAsim Jamshed 
864*76404edcSAsim Jamshed 		/* for the INADDR_ANY, find the output interface for the destination
865*76404edcSAsim Jamshed 		   and set the saddr_base as the ip address of the output interface */
866*76404edcSAsim Jamshed 		nif_out = GetOutputInterface(daddr);
867*76404edcSAsim Jamshed 		saddr_base = g_config.mos->netdev_table->ent[nif_out]->ip_addr;
868*76404edcSAsim Jamshed 	}
869*76404edcSAsim Jamshed 
870*76404edcSAsim Jamshed 	ap = CreateAddressPoolPerCore(mctx->cpu, num_cpus,
871*76404edcSAsim Jamshed 			saddr_base, num_addr, daddr, dport);
872*76404edcSAsim Jamshed 	if (!ap) {
873*76404edcSAsim Jamshed 		errno = ENOMEM;
874*76404edcSAsim Jamshed 		return -1;
875*76404edcSAsim Jamshed 	}
876*76404edcSAsim Jamshed 
877*76404edcSAsim Jamshed 	mtcp->ap = ap;
878*76404edcSAsim Jamshed 
879*76404edcSAsim Jamshed 	return 0;
880*76404edcSAsim Jamshed }
881*76404edcSAsim Jamshed /*----------------------------------------------------------------------------*/
882*76404edcSAsim Jamshed int
883*76404edcSAsim Jamshed eval_bpf_5tuple(struct sfbpf_program fcode,
884*76404edcSAsim Jamshed 				in_addr_t saddr, in_port_t sport,
885*76404edcSAsim Jamshed 				in_addr_t daddr, in_port_t dport) {
886*76404edcSAsim Jamshed 	uint8_t buf[TOTAL_TCP_HEADER_LEN];
887*76404edcSAsim Jamshed 	struct ethhdr *ethh;
888*76404edcSAsim Jamshed 	struct iphdr *iph;
889*76404edcSAsim Jamshed 	struct tcphdr *tcph;
890*76404edcSAsim Jamshed 
891*76404edcSAsim Jamshed 	ethh = (struct ethhdr *)buf;
892*76404edcSAsim Jamshed 	ethh->h_proto = htons(ETH_P_IP);
893*76404edcSAsim Jamshed 	iph = (struct iphdr *)(ethh + 1);
894*76404edcSAsim Jamshed 	iph->ihl = IP_HEADER_LEN >> 2;
895*76404edcSAsim Jamshed 	iph->version = 4;
896*76404edcSAsim Jamshed 	iph->tos = 0;
897*76404edcSAsim Jamshed 	iph->tot_len = htons(IP_HEADER_LEN + TCP_HEADER_LEN);
898*76404edcSAsim Jamshed 	iph->id = htons(0);
899*76404edcSAsim Jamshed 	iph->protocol = IPPROTO_TCP;
900*76404edcSAsim Jamshed 	iph->saddr = saddr;
901*76404edcSAsim Jamshed 	iph->daddr = daddr;
902*76404edcSAsim Jamshed 	iph->check = 0;
903*76404edcSAsim Jamshed 	tcph = (struct tcphdr *)(iph + 1);
904*76404edcSAsim Jamshed 	tcph->source = sport;
905*76404edcSAsim Jamshed 	tcph->dest = dport;
906*76404edcSAsim Jamshed 
907*76404edcSAsim Jamshed 	return EVAL_BPFFILTER(fcode, (uint8_t *)iph - sizeof(struct ethhdr),
908*76404edcSAsim Jamshed 						 TOTAL_TCP_HEADER_LEN);
909*76404edcSAsim Jamshed }
910*76404edcSAsim Jamshed /*----------------------------------------------------------------------------*/
911*76404edcSAsim Jamshed int
912*76404edcSAsim Jamshed mtcp_connect(mctx_t mctx, int sockid,
913*76404edcSAsim Jamshed 		const struct sockaddr *addr, socklen_t addrlen)
914*76404edcSAsim Jamshed {
915*76404edcSAsim Jamshed 	mtcp_manager_t mtcp;
916*76404edcSAsim Jamshed 	socket_map_t socket;
917*76404edcSAsim Jamshed 	tcp_stream *cur_stream;
918*76404edcSAsim Jamshed 	struct sockaddr_in *addr_in;
919*76404edcSAsim Jamshed 	in_addr_t dip;
920*76404edcSAsim Jamshed 	in_port_t dport;
921*76404edcSAsim Jamshed 	int is_dyn_bound = FALSE;
922*76404edcSAsim Jamshed 	int ret;
923*76404edcSAsim Jamshed 	int cnt_match = 0;
924*76404edcSAsim Jamshed 	struct mon_listener *walk;
925*76404edcSAsim Jamshed 	struct sfbpf_program fcode;
926*76404edcSAsim Jamshed 
927*76404edcSAsim Jamshed 	cur_stream = NULL;
928*76404edcSAsim Jamshed 	mtcp = GetMTCPManager(mctx);
929*76404edcSAsim Jamshed 	if (!mtcp) {
930*76404edcSAsim Jamshed 		errno = EACCES;
931*76404edcSAsim Jamshed 		return -1;
932*76404edcSAsim Jamshed 	}
933*76404edcSAsim Jamshed 
934*76404edcSAsim Jamshed 	if (sockid < 0 || sockid >= g_config.mos->max_concurrency) {
935*76404edcSAsim Jamshed 		TRACE_API("Socket id %d out of range.\n", sockid);
936*76404edcSAsim Jamshed 		errno = EBADF;
937*76404edcSAsim Jamshed 		return -1;
938*76404edcSAsim Jamshed 	}
939*76404edcSAsim Jamshed 
940*76404edcSAsim Jamshed 	if (mtcp->smap[sockid].socktype == MOS_SOCK_UNUSED) {
941*76404edcSAsim Jamshed 		TRACE_API("Invalid socket id: %d\n", sockid);
942*76404edcSAsim Jamshed 		errno = EBADF;
943*76404edcSAsim Jamshed 		return -1;
944*76404edcSAsim Jamshed 	}
945*76404edcSAsim Jamshed 
946*76404edcSAsim Jamshed 	if (mtcp->smap[sockid].socktype != MOS_SOCK_STREAM) {
947*76404edcSAsim Jamshed 		TRACE_API("Not an end socket. id: %d\n", sockid);
948*76404edcSAsim Jamshed 		errno = ENOTSOCK;
949*76404edcSAsim Jamshed 		return -1;
950*76404edcSAsim Jamshed 	}
951*76404edcSAsim Jamshed 
952*76404edcSAsim Jamshed 	if (!addr) {
953*76404edcSAsim Jamshed 		TRACE_API("Socket %d: empty address!\n", sockid);
954*76404edcSAsim Jamshed 		errno = EFAULT;
955*76404edcSAsim Jamshed 		return -1;
956*76404edcSAsim Jamshed 	}
957*76404edcSAsim Jamshed 
958*76404edcSAsim Jamshed 	/* we only allow bind() for AF_INET address */
959*76404edcSAsim Jamshed 	if (addr->sa_family != AF_INET || addrlen < sizeof(struct sockaddr_in)) {
960*76404edcSAsim Jamshed 		TRACE_API("Socket %d: invalid argument!\n", sockid);
961*76404edcSAsim Jamshed 		errno = EAFNOSUPPORT;
962*76404edcSAsim Jamshed 		return -1;
963*76404edcSAsim Jamshed 	}
964*76404edcSAsim Jamshed 
965*76404edcSAsim Jamshed 	socket = &mtcp->smap[sockid];
966*76404edcSAsim Jamshed 	if (socket->stream) {
967*76404edcSAsim Jamshed 		TRACE_API("Socket %d: stream already exist!\n", sockid);
968*76404edcSAsim Jamshed 		if (socket->stream->state >= TCP_ST_ESTABLISHED) {
969*76404edcSAsim Jamshed 			errno = EISCONN;
970*76404edcSAsim Jamshed 		} else {
971*76404edcSAsim Jamshed 			errno = EALREADY;
972*76404edcSAsim Jamshed 		}
973*76404edcSAsim Jamshed 		return -1;
974*76404edcSAsim Jamshed 	}
975*76404edcSAsim Jamshed 
976*76404edcSAsim Jamshed 	addr_in = (struct sockaddr_in *)addr;
977*76404edcSAsim Jamshed 	dip = addr_in->sin_addr.s_addr;
978*76404edcSAsim Jamshed 	dport = addr_in->sin_port;
979*76404edcSAsim Jamshed 
980*76404edcSAsim Jamshed 	/* address binding */
981*76404edcSAsim Jamshed 	if (socket->opts & MTCP_ADDR_BIND &&
982*76404edcSAsim Jamshed 	    socket->saddr.sin_port != INPORT_ANY &&
983*76404edcSAsim Jamshed 	    socket->saddr.sin_addr.s_addr != INADDR_ANY) {
984*76404edcSAsim Jamshed 		int rss_core;
985*76404edcSAsim Jamshed 
986*76404edcSAsim Jamshed 		rss_core = GetRSSCPUCore(socket->saddr.sin_addr.s_addr, dip,
987*76404edcSAsim Jamshed 				socket->saddr.sin_port, dport, num_queues);
988*76404edcSAsim Jamshed 
989*76404edcSAsim Jamshed 		if (rss_core != mctx->cpu) {
990*76404edcSAsim Jamshed 			errno = EINVAL;
991*76404edcSAsim Jamshed 			return -1;
992*76404edcSAsim Jamshed 		}
993*76404edcSAsim Jamshed 	} else {
994*76404edcSAsim Jamshed 		if (mtcp->ap) {
995*76404edcSAsim Jamshed 			ret = FetchAddress(mtcp->ap,
996*76404edcSAsim Jamshed 					mctx->cpu, num_queues, addr_in, &socket->saddr);
997*76404edcSAsim Jamshed 		} else {
998*76404edcSAsim Jamshed 			ret = FetchAddress(ap,
999*76404edcSAsim Jamshed 					mctx->cpu, num_queues, addr_in, &socket->saddr);
1000*76404edcSAsim Jamshed 		}
1001*76404edcSAsim Jamshed 		if (ret < 0) {
1002*76404edcSAsim Jamshed 			errno = EAGAIN;
1003*76404edcSAsim Jamshed 			return -1;
1004*76404edcSAsim Jamshed 		}
1005*76404edcSAsim Jamshed 		socket->opts |= MTCP_ADDR_BIND;
1006*76404edcSAsim Jamshed 		is_dyn_bound = TRUE;
1007*76404edcSAsim Jamshed 	}
1008*76404edcSAsim Jamshed 
1009*76404edcSAsim Jamshed 	cnt_match = 0;
1010*76404edcSAsim Jamshed 	if (mtcp->num_msp > 0) {
1011*76404edcSAsim Jamshed 		TAILQ_FOREACH(walk, &mtcp->monitors, link) {
1012*76404edcSAsim Jamshed 			fcode = walk->stream_syn_fcode;
1013*76404edcSAsim Jamshed 			if (!(ISSET_BPFFILTER(fcode) &&
1014*76404edcSAsim Jamshed 				  eval_bpf_5tuple(fcode, socket->saddr.sin_addr.s_addr,
1015*76404edcSAsim Jamshed 								  socket->saddr.sin_port,
1016*76404edcSAsim Jamshed 								  dip, dport) == 0)) {
1017*76404edcSAsim Jamshed 				walk->is_stream_syn_filter_hit = 1; // set the 'filter hit' flag to 1
1018*76404edcSAsim Jamshed 				cnt_match++;
1019*76404edcSAsim Jamshed 			}
1020*76404edcSAsim Jamshed 		}
1021*76404edcSAsim Jamshed 	}
1022*76404edcSAsim Jamshed 
1023*76404edcSAsim Jamshed 	if (mtcp->num_msp > 0 && cnt_match > 0) {
1024*76404edcSAsim Jamshed 		/* 150820 dhkim: XXX: embedded mode is not verified */
1025*76404edcSAsim Jamshed #if 1
1026*76404edcSAsim Jamshed 		cur_stream = CreateClientTCPStream(mtcp, socket,
1027*76404edcSAsim Jamshed 						 STREAM_TYPE(MOS_SOCK_STREAM) |
1028*76404edcSAsim Jamshed 						 STREAM_TYPE(MOS_SOCK_MONITOR_STREAM_ACTIVE),
1029*76404edcSAsim Jamshed 						 socket->saddr.sin_addr.s_addr,
1030*76404edcSAsim Jamshed 						 socket->saddr.sin_port, dip, dport, NULL);
1031*76404edcSAsim Jamshed #else
1032*76404edcSAsim Jamshed 		cur_stream = CreateDualTCPStream(mtcp, socket,
1033*76404edcSAsim Jamshed 						 STREAM_TYPE(MOS_SOCK_STREAM) |
1034*76404edcSAsim Jamshed 						 STREAM_TYPE(MOS_SOCK_MONITOR_STREAM_ACTIVE),
1035*76404edcSAsim Jamshed 						 socket->saddr.sin_addr.s_addr,
1036*76404edcSAsim Jamshed 						 socket->saddr.sin_port, dip, dport, NULL);
1037*76404edcSAsim Jamshed #endif
1038*76404edcSAsim Jamshed 	}
1039*76404edcSAsim Jamshed 	else
1040*76404edcSAsim Jamshed 		cur_stream = CreateTCPStream(mtcp, socket, STREAM_TYPE(MOS_SOCK_STREAM),
1041*76404edcSAsim Jamshed 					     socket->saddr.sin_addr.s_addr,
1042*76404edcSAsim Jamshed 					     socket->saddr.sin_port, dip, dport, NULL);
1043*76404edcSAsim Jamshed 	if (!cur_stream) {
1044*76404edcSAsim Jamshed 		TRACE_ERROR("Socket %d: failed to create tcp_stream!\n", sockid);
1045*76404edcSAsim Jamshed 		errno = ENOMEM;
1046*76404edcSAsim Jamshed 		return -1;
1047*76404edcSAsim Jamshed 	}
1048*76404edcSAsim Jamshed 
1049*76404edcSAsim Jamshed 	if (is_dyn_bound)
1050*76404edcSAsim Jamshed 		cur_stream->is_bound_addr = TRUE;
1051*76404edcSAsim Jamshed 	cur_stream->sndvar->cwnd = 1;
1052*76404edcSAsim Jamshed 	cur_stream->sndvar->ssthresh = cur_stream->sndvar->mss * 10;
1053*76404edcSAsim Jamshed 	cur_stream->side = MOS_SIDE_CLI;
1054*76404edcSAsim Jamshed 	/* if monitor is enabled, update the pair stream side as well */
1055*76404edcSAsim Jamshed 	if (cur_stream->pair_stream) {
1056*76404edcSAsim Jamshed 		cur_stream->pair_stream->side = MOS_SIDE_SVR;
1057*76404edcSAsim Jamshed 		/*
1058*76404edcSAsim Jamshed 		 * if buffer management is off, then disable
1059*76404edcSAsim Jamshed 		 * monitoring tcp ring of server...
1060*76404edcSAsim Jamshed 		 * if there is even a single monitor asking for
1061*76404edcSAsim Jamshed 		 * buffer management, enable it (that's why the
1062*76404edcSAsim Jamshed 		 * need for the loop)
1063*76404edcSAsim Jamshed 		 */
1064*76404edcSAsim Jamshed 		cur_stream->pair_stream->buffer_mgmt = BUFMGMT_OFF;
1065*76404edcSAsim Jamshed 		struct socket_map *walk;
1066*76404edcSAsim Jamshed 		SOCKQ_FOREACH_START(walk, &cur_stream->msocks) {
1067*76404edcSAsim Jamshed 			uint8_t bm = walk->monitor_stream->monitor_listener->server_buf_mgmt;
1068*76404edcSAsim Jamshed 			if (bm > cur_stream->pair_stream->buffer_mgmt) {
1069*76404edcSAsim Jamshed 				cur_stream->pair_stream->buffer_mgmt = bm;
1070*76404edcSAsim Jamshed 				break;
1071*76404edcSAsim Jamshed 			}
1072*76404edcSAsim Jamshed 		} SOCKQ_FOREACH_END;
1073*76404edcSAsim Jamshed 	}
1074*76404edcSAsim Jamshed 
1075*76404edcSAsim Jamshed 	cur_stream->state = TCP_ST_SYN_SENT;
1076*76404edcSAsim Jamshed 	cur_stream->cb_events |= MOS_ON_TCP_STATE_CHANGE;
1077*76404edcSAsim Jamshed 
1078*76404edcSAsim Jamshed 	TRACE_STATE("Stream %d: TCP_ST_SYN_SENT\n", cur_stream->id);
1079*76404edcSAsim Jamshed 
1080*76404edcSAsim Jamshed 	SQ_LOCK(&mtcp->ctx->connect_lock);
1081*76404edcSAsim Jamshed 	ret = StreamEnqueue(mtcp->connectq, cur_stream);
1082*76404edcSAsim Jamshed 	SQ_UNLOCK(&mtcp->ctx->connect_lock);
1083*76404edcSAsim Jamshed 	mtcp->wakeup_flag = TRUE;
1084*76404edcSAsim Jamshed 	if (ret < 0) {
1085*76404edcSAsim Jamshed 		TRACE_ERROR("Socket %d: failed to enqueue to conenct queue!\n", sockid);
1086*76404edcSAsim Jamshed 		SQ_LOCK(&mtcp->ctx->destroyq_lock);
1087*76404edcSAsim Jamshed 		StreamEnqueue(mtcp->destroyq, cur_stream);
1088*76404edcSAsim Jamshed 		SQ_UNLOCK(&mtcp->ctx->destroyq_lock);
1089*76404edcSAsim Jamshed 		errno = EAGAIN;
1090*76404edcSAsim Jamshed 		return -1;
1091*76404edcSAsim Jamshed 	}
1092*76404edcSAsim Jamshed 
1093*76404edcSAsim Jamshed 	/* if nonblocking socket, return EINPROGRESS */
1094*76404edcSAsim Jamshed 	if (socket->opts & MTCP_NONBLOCK) {
1095*76404edcSAsim Jamshed 		errno = EINPROGRESS;
1096*76404edcSAsim Jamshed 		return -1;
1097*76404edcSAsim Jamshed 
1098*76404edcSAsim Jamshed 	} else {
1099*76404edcSAsim Jamshed 		while (1) {
1100*76404edcSAsim Jamshed 			if (!cur_stream) {
1101*76404edcSAsim Jamshed 				TRACE_ERROR("STREAM DESTROYED\n");
1102*76404edcSAsim Jamshed 				errno = ETIMEDOUT;
1103*76404edcSAsim Jamshed 				return -1;
1104*76404edcSAsim Jamshed 			}
1105*76404edcSAsim Jamshed 			if (cur_stream->state > TCP_ST_ESTABLISHED) {
1106*76404edcSAsim Jamshed 				TRACE_ERROR("Socket %d: weird state %s\n",
1107*76404edcSAsim Jamshed 						sockid, TCPStateToString(cur_stream));
1108*76404edcSAsim Jamshed 				// TODO: how to handle this?
1109*76404edcSAsim Jamshed 				errno = ENOSYS;
1110*76404edcSAsim Jamshed 				return -1;
1111*76404edcSAsim Jamshed 			}
1112*76404edcSAsim Jamshed 
1113*76404edcSAsim Jamshed 			if (cur_stream->state == TCP_ST_ESTABLISHED) {
1114*76404edcSAsim Jamshed 				break;
1115*76404edcSAsim Jamshed 			}
1116*76404edcSAsim Jamshed 			usleep(1000);
1117*76404edcSAsim Jamshed 		}
1118*76404edcSAsim Jamshed 	}
1119*76404edcSAsim Jamshed 
1120*76404edcSAsim Jamshed 	return 0;
1121*76404edcSAsim Jamshed }
1122*76404edcSAsim Jamshed /*----------------------------------------------------------------------------*/
1123*76404edcSAsim Jamshed static inline int
1124*76404edcSAsim Jamshed CloseStreamSocket(mctx_t mctx, int sockid)
1125*76404edcSAsim Jamshed {
1126*76404edcSAsim Jamshed 	mtcp_manager_t mtcp;
1127*76404edcSAsim Jamshed 	tcp_stream *cur_stream;
1128*76404edcSAsim Jamshed 	int ret;
1129*76404edcSAsim Jamshed 
1130*76404edcSAsim Jamshed 	mtcp = GetMTCPManager(mctx);
1131*76404edcSAsim Jamshed 	if (!mtcp) {
1132*76404edcSAsim Jamshed 		errno = EACCES;
1133*76404edcSAsim Jamshed 		return -1;
1134*76404edcSAsim Jamshed 	}
1135*76404edcSAsim Jamshed 
1136*76404edcSAsim Jamshed 	cur_stream = mtcp->smap[sockid].stream;
1137*76404edcSAsim Jamshed 	if (!cur_stream) {
1138*76404edcSAsim Jamshed 		TRACE_API("Socket %d: stream does not exist.\n", sockid);
1139*76404edcSAsim Jamshed 		errno = ENOTCONN;
1140*76404edcSAsim Jamshed 		return -1;
1141*76404edcSAsim Jamshed 	}
1142*76404edcSAsim Jamshed 
1143*76404edcSAsim Jamshed 	if (cur_stream->closed) {
1144*76404edcSAsim Jamshed 		TRACE_API("Socket %d (Stream %u): already closed stream\n",
1145*76404edcSAsim Jamshed 				sockid, cur_stream->id);
1146*76404edcSAsim Jamshed 		return 0;
1147*76404edcSAsim Jamshed 	}
1148*76404edcSAsim Jamshed 	cur_stream->closed = TRUE;
1149*76404edcSAsim Jamshed 
1150*76404edcSAsim Jamshed 	TRACE_API("Stream %d: closing the stream.\n", cur_stream->id);
1151*76404edcSAsim Jamshed 
1152*76404edcSAsim Jamshed 	/* 141029 dhkim: Check this! */
1153*76404edcSAsim Jamshed 	cur_stream->socket = NULL;
1154*76404edcSAsim Jamshed 
1155*76404edcSAsim Jamshed 	if (cur_stream->state == TCP_ST_CLOSED_RSVD) {
1156*76404edcSAsim Jamshed 		TRACE_API("Stream %d at TCP_ST_CLOSED_RSVD. destroying the stream.\n",
1157*76404edcSAsim Jamshed 				cur_stream->id);
1158*76404edcSAsim Jamshed 		SQ_LOCK(&mtcp->ctx->destroyq_lock);
1159*76404edcSAsim Jamshed 		StreamEnqueue(mtcp->destroyq, cur_stream);
1160*76404edcSAsim Jamshed 		mtcp->wakeup_flag = TRUE;
1161*76404edcSAsim Jamshed 		SQ_UNLOCK(&mtcp->ctx->destroyq_lock);
1162*76404edcSAsim Jamshed 		return 0;
1163*76404edcSAsim Jamshed 
1164*76404edcSAsim Jamshed 	} else if (cur_stream->state == TCP_ST_SYN_SENT) {
1165*76404edcSAsim Jamshed #if 1
1166*76404edcSAsim Jamshed 		SQ_LOCK(&mtcp->ctx->destroyq_lock);
1167*76404edcSAsim Jamshed 		StreamEnqueue(mtcp->destroyq, cur_stream);
1168*76404edcSAsim Jamshed 		SQ_UNLOCK(&mtcp->ctx->destroyq_lock);
1169*76404edcSAsim Jamshed 		mtcp->wakeup_flag = TRUE;
1170*76404edcSAsim Jamshed #endif
1171*76404edcSAsim Jamshed 		return -1;
1172*76404edcSAsim Jamshed 
1173*76404edcSAsim Jamshed 	} else if (cur_stream->state != TCP_ST_ESTABLISHED &&
1174*76404edcSAsim Jamshed 			cur_stream->state != TCP_ST_CLOSE_WAIT) {
1175*76404edcSAsim Jamshed 		TRACE_API("Stream %d at state %s\n",
1176*76404edcSAsim Jamshed 				cur_stream->id, TCPStateToString(cur_stream));
1177*76404edcSAsim Jamshed 		errno = EBADF;
1178*76404edcSAsim Jamshed 		return -1;
1179*76404edcSAsim Jamshed 	}
1180*76404edcSAsim Jamshed 
1181*76404edcSAsim Jamshed 	SQ_LOCK(&mtcp->ctx->close_lock);
1182*76404edcSAsim Jamshed 	cur_stream->sndvar->on_closeq = TRUE;
1183*76404edcSAsim Jamshed 	ret = StreamEnqueue(mtcp->closeq, cur_stream);
1184*76404edcSAsim Jamshed 	mtcp->wakeup_flag = TRUE;
1185*76404edcSAsim Jamshed 	SQ_UNLOCK(&mtcp->ctx->close_lock);
1186*76404edcSAsim Jamshed 
1187*76404edcSAsim Jamshed 	if (ret < 0) {
1188*76404edcSAsim Jamshed 		TRACE_ERROR("(NEVER HAPPEN) Failed to enqueue the stream to close.\n");
1189*76404edcSAsim Jamshed 		errno = EAGAIN;
1190*76404edcSAsim Jamshed 		return -1;
1191*76404edcSAsim Jamshed 	}
1192*76404edcSAsim Jamshed 
1193*76404edcSAsim Jamshed 	return 0;
1194*76404edcSAsim Jamshed }
1195*76404edcSAsim Jamshed /*----------------------------------------------------------------------------*/
1196*76404edcSAsim Jamshed static inline int
1197*76404edcSAsim Jamshed CloseListeningSocket(mctx_t mctx, int sockid)
1198*76404edcSAsim Jamshed {
1199*76404edcSAsim Jamshed 	mtcp_manager_t mtcp;
1200*76404edcSAsim Jamshed 	struct tcp_listener *listener;
1201*76404edcSAsim Jamshed 
1202*76404edcSAsim Jamshed 	mtcp = GetMTCPManager(mctx);
1203*76404edcSAsim Jamshed 	if (!mtcp) {
1204*76404edcSAsim Jamshed 		errno = EACCES;
1205*76404edcSAsim Jamshed 		return -1;
1206*76404edcSAsim Jamshed 	}
1207*76404edcSAsim Jamshed 
1208*76404edcSAsim Jamshed 	listener = mtcp->smap[sockid].listener;
1209*76404edcSAsim Jamshed 	if (!listener) {
1210*76404edcSAsim Jamshed 		errno = EINVAL;
1211*76404edcSAsim Jamshed 		return -1;
1212*76404edcSAsim Jamshed 	}
1213*76404edcSAsim Jamshed 
1214*76404edcSAsim Jamshed 	if (listener->acceptq) {
1215*76404edcSAsim Jamshed 		DestroyStreamQueue(listener->acceptq);
1216*76404edcSAsim Jamshed 		listener->acceptq = NULL;
1217*76404edcSAsim Jamshed 	}
1218*76404edcSAsim Jamshed 
1219*76404edcSAsim Jamshed 	pthread_mutex_lock(&listener->accept_lock);
1220*76404edcSAsim Jamshed 	pthread_cond_signal(&listener->accept_cond);
1221*76404edcSAsim Jamshed 	pthread_mutex_unlock(&listener->accept_lock);
1222*76404edcSAsim Jamshed 
1223*76404edcSAsim Jamshed 	pthread_cond_destroy(&listener->accept_cond);
1224*76404edcSAsim Jamshed 	pthread_mutex_destroy(&listener->accept_lock);
1225*76404edcSAsim Jamshed 
1226*76404edcSAsim Jamshed 	free(listener);
1227*76404edcSAsim Jamshed 	mtcp->smap[sockid].listener = NULL;
1228*76404edcSAsim Jamshed 
1229*76404edcSAsim Jamshed 	return 0;
1230*76404edcSAsim Jamshed }
1231*76404edcSAsim Jamshed /*----------------------------------------------------------------------------*/
1232*76404edcSAsim Jamshed int
1233*76404edcSAsim Jamshed mtcp_close(mctx_t mctx, int sockid)
1234*76404edcSAsim Jamshed {
1235*76404edcSAsim Jamshed 	mtcp_manager_t mtcp;
1236*76404edcSAsim Jamshed 	int ret;
1237*76404edcSAsim Jamshed 
1238*76404edcSAsim Jamshed 	mtcp = GetMTCPManager(mctx);
1239*76404edcSAsim Jamshed 	if (!mtcp) {
1240*76404edcSAsim Jamshed 		errno = EACCES;
1241*76404edcSAsim Jamshed 		return -1;
1242*76404edcSAsim Jamshed 	}
1243*76404edcSAsim Jamshed 
1244*76404edcSAsim Jamshed 	if (sockid < 0 || sockid >= g_config.mos->max_concurrency) {
1245*76404edcSAsim Jamshed 		TRACE_API("Socket id %d out of range.\n", sockid);
1246*76404edcSAsim Jamshed 		errno = EBADF;
1247*76404edcSAsim Jamshed 		return -1;
1248*76404edcSAsim Jamshed 	}
1249*76404edcSAsim Jamshed 
1250*76404edcSAsim Jamshed 	if (mtcp->smap[sockid].socktype == MOS_SOCK_UNUSED) {
1251*76404edcSAsim Jamshed 		TRACE_API("Invalid socket id: %d\n", sockid);
1252*76404edcSAsim Jamshed 		errno = EBADF;
1253*76404edcSAsim Jamshed 		return -1;
1254*76404edcSAsim Jamshed 	}
1255*76404edcSAsim Jamshed 
1256*76404edcSAsim Jamshed 	TRACE_API("Socket %d: mtcp_close called.\n", sockid);
1257*76404edcSAsim Jamshed 
1258*76404edcSAsim Jamshed 	switch (mtcp->smap[sockid].socktype) {
1259*76404edcSAsim Jamshed 	case MOS_SOCK_STREAM:
1260*76404edcSAsim Jamshed 		ret = CloseStreamSocket(mctx, sockid);
1261*76404edcSAsim Jamshed 		break;
1262*76404edcSAsim Jamshed 
1263*76404edcSAsim Jamshed 	case MOS_SOCK_STREAM_LISTEN:
1264*76404edcSAsim Jamshed 		ret = CloseListeningSocket(mctx, sockid);
1265*76404edcSAsim Jamshed 		break;
1266*76404edcSAsim Jamshed 
1267*76404edcSAsim Jamshed 	case MOS_SOCK_EPOLL:
1268*76404edcSAsim Jamshed 		ret = CloseEpollSocket(mctx, sockid);
1269*76404edcSAsim Jamshed 		break;
1270*76404edcSAsim Jamshed 
1271*76404edcSAsim Jamshed 	case MOS_SOCK_PIPE:
1272*76404edcSAsim Jamshed 		ret = PipeClose(mctx, sockid);
1273*76404edcSAsim Jamshed 		break;
1274*76404edcSAsim Jamshed 
1275*76404edcSAsim Jamshed 	default:
1276*76404edcSAsim Jamshed 		errno = EINVAL;
1277*76404edcSAsim Jamshed 		ret = -1;
1278*76404edcSAsim Jamshed 		break;
1279*76404edcSAsim Jamshed 	}
1280*76404edcSAsim Jamshed 
1281*76404edcSAsim Jamshed 	FreeSocket(mctx, sockid, mtcp->smap[sockid].socktype);
1282*76404edcSAsim Jamshed 
1283*76404edcSAsim Jamshed 	return ret;
1284*76404edcSAsim Jamshed }
1285*76404edcSAsim Jamshed /*----------------------------------------------------------------------------*/
1286*76404edcSAsim Jamshed int
1287*76404edcSAsim Jamshed mtcp_abort(mctx_t mctx, int sockid)
1288*76404edcSAsim Jamshed {
1289*76404edcSAsim Jamshed 	mtcp_manager_t mtcp;
1290*76404edcSAsim Jamshed 	tcp_stream *cur_stream;
1291*76404edcSAsim Jamshed 	int ret;
1292*76404edcSAsim Jamshed 
1293*76404edcSAsim Jamshed 	mtcp = GetMTCPManager(mctx);
1294*76404edcSAsim Jamshed 	if (!mtcp) {
1295*76404edcSAsim Jamshed 		errno = EACCES;
1296*76404edcSAsim Jamshed 		return -1;
1297*76404edcSAsim Jamshed 	}
1298*76404edcSAsim Jamshed 
1299*76404edcSAsim Jamshed 	if (sockid < 0 || sockid >= g_config.mos->max_concurrency) {
1300*76404edcSAsim Jamshed 		TRACE_API("Socket id %d out of range.\n", sockid);
1301*76404edcSAsim Jamshed 		errno = EBADF;
1302*76404edcSAsim Jamshed 		return -1;
1303*76404edcSAsim Jamshed 	}
1304*76404edcSAsim Jamshed 
1305*76404edcSAsim Jamshed 	if (mtcp->smap[sockid].socktype == MOS_SOCK_UNUSED) {
1306*76404edcSAsim Jamshed 		TRACE_API("Invalid socket id: %d\n", sockid);
1307*76404edcSAsim Jamshed 		errno = EBADF;
1308*76404edcSAsim Jamshed 		return -1;
1309*76404edcSAsim Jamshed 	}
1310*76404edcSAsim Jamshed 
1311*76404edcSAsim Jamshed 	if (mtcp->smap[sockid].socktype != MOS_SOCK_STREAM) {
1312*76404edcSAsim Jamshed 		TRACE_API("Not an end socket. id: %d\n", sockid);
1313*76404edcSAsim Jamshed 		errno = ENOTSOCK;
1314*76404edcSAsim Jamshed 		return -1;
1315*76404edcSAsim Jamshed 	}
1316*76404edcSAsim Jamshed 
1317*76404edcSAsim Jamshed 	cur_stream = mtcp->smap[sockid].stream;
1318*76404edcSAsim Jamshed 	if (!cur_stream) {
1319*76404edcSAsim Jamshed 		TRACE_API("Stream %d: does not exist.\n", sockid);
1320*76404edcSAsim Jamshed 		errno = ENOTCONN;
1321*76404edcSAsim Jamshed 		return -1;
1322*76404edcSAsim Jamshed 	}
1323*76404edcSAsim Jamshed 
1324*76404edcSAsim Jamshed 	TRACE_API("Socket %d: mtcp_abort()\n", sockid);
1325*76404edcSAsim Jamshed 
1326*76404edcSAsim Jamshed 	FreeSocket(mctx, sockid, mtcp->smap[sockid].socktype);
1327*76404edcSAsim Jamshed 	cur_stream->socket = NULL;
1328*76404edcSAsim Jamshed 
1329*76404edcSAsim Jamshed 	if (cur_stream->state == TCP_ST_CLOSED_RSVD) {
1330*76404edcSAsim Jamshed 		TRACE_API("Stream %d: connection already reset.\n", sockid);
1331*76404edcSAsim Jamshed 		return ERROR;
1332*76404edcSAsim Jamshed 
1333*76404edcSAsim Jamshed 	} else if (cur_stream->state == TCP_ST_SYN_SENT) {
1334*76404edcSAsim Jamshed 		/* TODO: this should notify event failure to all
1335*76404edcSAsim Jamshed 		   previous read() or write() calls */
1336*76404edcSAsim Jamshed 		cur_stream->state = TCP_ST_CLOSED_RSVD;
1337*76404edcSAsim Jamshed 		cur_stream->close_reason = TCP_ACTIVE_CLOSE;
1338*76404edcSAsim Jamshed 		cur_stream->cb_events |= MOS_ON_TCP_STATE_CHANGE;
1339*76404edcSAsim Jamshed 		SQ_LOCK(&mtcp->ctx->destroyq_lock);
1340*76404edcSAsim Jamshed 		StreamEnqueue(mtcp->destroyq, cur_stream);
1341*76404edcSAsim Jamshed 		SQ_UNLOCK(&mtcp->ctx->destroyq_lock);
1342*76404edcSAsim Jamshed 		mtcp->wakeup_flag = TRUE;
1343*76404edcSAsim Jamshed 		return 0;
1344*76404edcSAsim Jamshed 
1345*76404edcSAsim Jamshed 	} else if (cur_stream->state == TCP_ST_CLOSING ||
1346*76404edcSAsim Jamshed 			cur_stream->state == TCP_ST_LAST_ACK ||
1347*76404edcSAsim Jamshed 			cur_stream->state == TCP_ST_TIME_WAIT) {
1348*76404edcSAsim Jamshed 		cur_stream->state = TCP_ST_CLOSED_RSVD;
1349*76404edcSAsim Jamshed 		cur_stream->close_reason = TCP_ACTIVE_CLOSE;
1350*76404edcSAsim Jamshed 		cur_stream->cb_events |= MOS_ON_TCP_STATE_CHANGE;
1351*76404edcSAsim Jamshed 		SQ_LOCK(&mtcp->ctx->destroyq_lock);
1352*76404edcSAsim Jamshed 		StreamEnqueue(mtcp->destroyq, cur_stream);
1353*76404edcSAsim Jamshed 		SQ_UNLOCK(&mtcp->ctx->destroyq_lock);
1354*76404edcSAsim Jamshed 		mtcp->wakeup_flag = TRUE;
1355*76404edcSAsim Jamshed 		return 0;
1356*76404edcSAsim Jamshed 	}
1357*76404edcSAsim Jamshed 
1358*76404edcSAsim Jamshed 	/* the stream structure will be destroyed after sending RST */
1359*76404edcSAsim Jamshed 	if (cur_stream->sndvar->on_resetq) {
1360*76404edcSAsim Jamshed 		TRACE_ERROR("Stream %d: calling mtcp_abort() "
1361*76404edcSAsim Jamshed 				"when in reset queue.\n", sockid);
1362*76404edcSAsim Jamshed 		errno = ECONNRESET;
1363*76404edcSAsim Jamshed 		return -1;
1364*76404edcSAsim Jamshed 	}
1365*76404edcSAsim Jamshed 	SQ_LOCK(&mtcp->ctx->reset_lock);
1366*76404edcSAsim Jamshed 	cur_stream->sndvar->on_resetq = TRUE;
1367*76404edcSAsim Jamshed 	ret = StreamEnqueue(mtcp->resetq, cur_stream);
1368*76404edcSAsim Jamshed 	SQ_UNLOCK(&mtcp->ctx->reset_lock);
1369*76404edcSAsim Jamshed 	mtcp->wakeup_flag = TRUE;
1370*76404edcSAsim Jamshed 
1371*76404edcSAsim Jamshed 	if (ret < 0) {
1372*76404edcSAsim Jamshed 		TRACE_ERROR("(NEVER HAPPEN) Failed to enqueue the stream to close.\n");
1373*76404edcSAsim Jamshed 		errno = EAGAIN;
1374*76404edcSAsim Jamshed 		return -1;
1375*76404edcSAsim Jamshed 	}
1376*76404edcSAsim Jamshed 
1377*76404edcSAsim Jamshed 	return 0;
1378*76404edcSAsim Jamshed }
1379*76404edcSAsim Jamshed /*----------------------------------------------------------------------------*/
1380*76404edcSAsim Jamshed static inline int
1381*76404edcSAsim Jamshed CopyToUser(mtcp_manager_t mtcp, tcp_stream *cur_stream, char *buf, int len)
1382*76404edcSAsim Jamshed {
1383*76404edcSAsim Jamshed 	struct tcp_recv_vars *rcvvar = cur_stream->rcvvar;
1384*76404edcSAsim Jamshed 	int copylen;
1385*76404edcSAsim Jamshed #ifdef NEWRB
1386*76404edcSAsim Jamshed 	tcprb_t *rb = rcvvar->rcvbuf;
1387*76404edcSAsim Jamshed 	if ((copylen = tcprb_ppeek(rb, (uint8_t *)buf, len, rb->pile)) <= 0) {
1388*76404edcSAsim Jamshed 		errno = EAGAIN;
1389*76404edcSAsim Jamshed 		return -1;
1390*76404edcSAsim Jamshed 	}
1391*76404edcSAsim Jamshed 	tcprb_setpile(rb, rb->pile + copylen);
1392*76404edcSAsim Jamshed 
1393*76404edcSAsim Jamshed 	rcvvar->rcv_wnd = rb->len - tcprb_cflen(rb);
1394*76404edcSAsim Jamshed 	//printf("rcv_wnd: %d\n", rcvvar->rcv_wnd);
1395*76404edcSAsim Jamshed #else
1396*76404edcSAsim Jamshed 	copylen = MIN(rcvvar->rcvbuf->merged_len, len);
1397*76404edcSAsim Jamshed 	if (copylen <= 0) {
1398*76404edcSAsim Jamshed 		errno = EAGAIN;
1399*76404edcSAsim Jamshed 		return -1;
1400*76404edcSAsim Jamshed 	}
1401*76404edcSAsim Jamshed 
1402*76404edcSAsim Jamshed 	assert(rcvvar->rcvbuf->data);
1403*76404edcSAsim Jamshed 	/* Copy data to user buffer and remove it from receiving buffer */
1404*76404edcSAsim Jamshed 	memcpy(buf, rcvvar->rcvbuf->head, copylen);
1405*76404edcSAsim Jamshed 	RBRemove(mtcp->rbm_rcv, rcvvar->rcvbuf, copylen, AT_APP, cur_stream->buffer_mgmt);
1406*76404edcSAsim Jamshed 	rcvvar->rcv_wnd = rcvvar->rcvbuf->size - rcvvar->rcvbuf->merged_len;
1407*76404edcSAsim Jamshed #endif
1408*76404edcSAsim Jamshed 
1409*76404edcSAsim Jamshed 	/* Advertise newly freed receive buffer */
1410*76404edcSAsim Jamshed 	if (cur_stream->need_wnd_adv) {
1411*76404edcSAsim Jamshed 		if (rcvvar->rcv_wnd > cur_stream->sndvar->eff_mss) {
1412*76404edcSAsim Jamshed 			if (!cur_stream->sndvar->on_ackq) {
1413*76404edcSAsim Jamshed 				SQ_LOCK(&mtcp->ctx->ackq_lock);
1414*76404edcSAsim Jamshed 				cur_stream->sndvar->on_ackq = TRUE;
1415*76404edcSAsim Jamshed 				StreamEnqueue(mtcp->ackq, cur_stream); /* this always success */
1416*76404edcSAsim Jamshed 				SQ_UNLOCK(&mtcp->ctx->ackq_lock);
1417*76404edcSAsim Jamshed 				cur_stream->need_wnd_adv = FALSE;
1418*76404edcSAsim Jamshed 				mtcp->wakeup_flag = TRUE;
1419*76404edcSAsim Jamshed 			}
1420*76404edcSAsim Jamshed 		}
1421*76404edcSAsim Jamshed 	}
1422*76404edcSAsim Jamshed 
1423*76404edcSAsim Jamshed 	return copylen;
1424*76404edcSAsim Jamshed }
1425*76404edcSAsim Jamshed /*----------------------------------------------------------------------------*/
1426*76404edcSAsim Jamshed ssize_t
1427*76404edcSAsim Jamshed mtcp_read(mctx_t mctx, int sockid, char *buf, size_t len)
1428*76404edcSAsim Jamshed {
1429*76404edcSAsim Jamshed 	mtcp_manager_t mtcp;
1430*76404edcSAsim Jamshed 	socket_map_t socket;
1431*76404edcSAsim Jamshed 	tcp_stream *cur_stream;
1432*76404edcSAsim Jamshed 	struct tcp_recv_vars *rcvvar;
1433*76404edcSAsim Jamshed 	int event_remaining, merged_len;
1434*76404edcSAsim Jamshed 	int ret;
1435*76404edcSAsim Jamshed 
1436*76404edcSAsim Jamshed 	mtcp = GetMTCPManager(mctx);
1437*76404edcSAsim Jamshed 	if (!mtcp) {
1438*76404edcSAsim Jamshed 		errno = EACCES;
1439*76404edcSAsim Jamshed 		return -1;
1440*76404edcSAsim Jamshed 	}
1441*76404edcSAsim Jamshed 
1442*76404edcSAsim Jamshed 	if (sockid < 0 || sockid >= g_config.mos->max_concurrency) {
1443*76404edcSAsim Jamshed 		TRACE_API("Socket id %d out of range.\n", sockid);
1444*76404edcSAsim Jamshed 		errno = EBADF;
1445*76404edcSAsim Jamshed 		return -1;
1446*76404edcSAsim Jamshed 	}
1447*76404edcSAsim Jamshed 
1448*76404edcSAsim Jamshed 	socket = &mtcp->smap[sockid];
1449*76404edcSAsim Jamshed 	if (socket->socktype == MOS_SOCK_UNUSED) {
1450*76404edcSAsim Jamshed 		TRACE_API("Invalid socket id: %d\n", sockid);
1451*76404edcSAsim Jamshed 		errno = EBADF;
1452*76404edcSAsim Jamshed 		return -1;
1453*76404edcSAsim Jamshed 	}
1454*76404edcSAsim Jamshed 
1455*76404edcSAsim Jamshed 	if (socket->socktype == MOS_SOCK_PIPE) {
1456*76404edcSAsim Jamshed 		return PipeRead(mctx, sockid, buf, len);
1457*76404edcSAsim Jamshed 	}
1458*76404edcSAsim Jamshed 
1459*76404edcSAsim Jamshed 	if (socket->socktype != MOS_SOCK_STREAM) {
1460*76404edcSAsim Jamshed 		TRACE_API("Not an end socket. id: %d\n", sockid);
1461*76404edcSAsim Jamshed 		errno = ENOTSOCK;
1462*76404edcSAsim Jamshed 		return -1;
1463*76404edcSAsim Jamshed 	}
1464*76404edcSAsim Jamshed 
1465*76404edcSAsim Jamshed 	/* stream should be in ESTABLISHED, FIN_WAIT_1, FIN_WAIT_2, CLOSE_WAIT */
1466*76404edcSAsim Jamshed 	cur_stream = socket->stream;
1467*76404edcSAsim Jamshed 	if (!cur_stream || !cur_stream->rcvvar || !cur_stream->rcvvar->rcvbuf ||
1468*76404edcSAsim Jamshed 			!(cur_stream->state >= TCP_ST_ESTABLISHED &&
1469*76404edcSAsim Jamshed 			cur_stream->state <= TCP_ST_CLOSE_WAIT)) {
1470*76404edcSAsim Jamshed 		errno = ENOTCONN;
1471*76404edcSAsim Jamshed 		return -1;
1472*76404edcSAsim Jamshed 	}
1473*76404edcSAsim Jamshed 
1474*76404edcSAsim Jamshed 	rcvvar = cur_stream->rcvvar;
1475*76404edcSAsim Jamshed 
1476*76404edcSAsim Jamshed #ifdef NEWRB
1477*76404edcSAsim Jamshed 	merged_len = tcprb_cflen(rcvvar->rcvbuf);
1478*76404edcSAsim Jamshed #else
1479*76404edcSAsim Jamshed 	merged_len = rcvvar->rcvbuf->merged_len;
1480*76404edcSAsim Jamshed #endif
1481*76404edcSAsim Jamshed 
1482*76404edcSAsim Jamshed 	/* if CLOSE_WAIT, return 0 if there is no payload */
1483*76404edcSAsim Jamshed 	if (cur_stream->state == TCP_ST_CLOSE_WAIT) {
1484*76404edcSAsim Jamshed 		if (!rcvvar->rcvbuf)
1485*76404edcSAsim Jamshed 			return 0;
1486*76404edcSAsim Jamshed 
1487*76404edcSAsim Jamshed 		if (merged_len == 0)
1488*76404edcSAsim Jamshed 			return 0;
1489*76404edcSAsim Jamshed 	}
1490*76404edcSAsim Jamshed 
1491*76404edcSAsim Jamshed 	/* return EAGAIN if no receive buffer */
1492*76404edcSAsim Jamshed 	if (socket->opts & MTCP_NONBLOCK) {
1493*76404edcSAsim Jamshed 		if (!rcvvar->rcvbuf || merged_len == 0) {
1494*76404edcSAsim Jamshed 			errno = EAGAIN;
1495*76404edcSAsim Jamshed 			return -1;
1496*76404edcSAsim Jamshed 		}
1497*76404edcSAsim Jamshed 	}
1498*76404edcSAsim Jamshed 
1499*76404edcSAsim Jamshed 	SBUF_LOCK(&rcvvar->read_lock);
1500*76404edcSAsim Jamshed 
1501*76404edcSAsim Jamshed 	ret = CopyToUser(mtcp, cur_stream, buf, len);
1502*76404edcSAsim Jamshed 
1503*76404edcSAsim Jamshed #ifdef NEWRB
1504*76404edcSAsim Jamshed 	merged_len = tcprb_cflen(rcvvar->rcvbuf);
1505*76404edcSAsim Jamshed #else
1506*76404edcSAsim Jamshed 	merged_len = rcvvar->rcvbuf->merged_len;
1507*76404edcSAsim Jamshed #endif
1508*76404edcSAsim Jamshed 
1509*76404edcSAsim Jamshed 	event_remaining = FALSE;
1510*76404edcSAsim Jamshed 	/* if there are remaining payload, generate EPOLLIN */
1511*76404edcSAsim Jamshed 	/* (may due to insufficient user buffer) */
1512*76404edcSAsim Jamshed 	if (socket->epoll & MOS_EPOLLIN) {
1513*76404edcSAsim Jamshed 		if (!(socket->epoll & MOS_EPOLLET) && merged_len > 0) {
1514*76404edcSAsim Jamshed 			event_remaining = TRUE;
1515*76404edcSAsim Jamshed 		}
1516*76404edcSAsim Jamshed 	}
1517*76404edcSAsim Jamshed 	/* if waiting for close, notify it if no remaining data */
1518*76404edcSAsim Jamshed 	if (cur_stream->state == TCP_ST_CLOSE_WAIT &&
1519*76404edcSAsim Jamshed 			merged_len == 0 && ret > 0) {
1520*76404edcSAsim Jamshed 		event_remaining = TRUE;
1521*76404edcSAsim Jamshed 	}
1522*76404edcSAsim Jamshed 
1523*76404edcSAsim Jamshed 	SBUF_UNLOCK(&rcvvar->read_lock);
1524*76404edcSAsim Jamshed 
1525*76404edcSAsim Jamshed 	if (event_remaining) {
1526*76404edcSAsim Jamshed 		if (socket->epoll) {
1527*76404edcSAsim Jamshed 			AddEpollEvent(mtcp->ep,
1528*76404edcSAsim Jamshed 				      USR_SHADOW_EVENT_QUEUE, socket, MOS_EPOLLIN);
1529*76404edcSAsim Jamshed 		}
1530*76404edcSAsim Jamshed 	}
1531*76404edcSAsim Jamshed 
1532*76404edcSAsim Jamshed 	TRACE_API("Stream %d: mtcp_read() returning %d\n", cur_stream->id, ret);
1533*76404edcSAsim Jamshed 	return ret;
1534*76404edcSAsim Jamshed }
1535*76404edcSAsim Jamshed /*----------------------------------------------------------------------------*/
1536*76404edcSAsim Jamshed ssize_t
1537*76404edcSAsim Jamshed mtcp_readv(mctx_t mctx, int sockid, struct iovec *iov, int numIOV)
1538*76404edcSAsim Jamshed {
1539*76404edcSAsim Jamshed 	mtcp_manager_t mtcp;
1540*76404edcSAsim Jamshed 	socket_map_t socket;
1541*76404edcSAsim Jamshed 	tcp_stream *cur_stream;
1542*76404edcSAsim Jamshed 	struct tcp_recv_vars *rcvvar;
1543*76404edcSAsim Jamshed 	int ret, bytes_read, i;
1544*76404edcSAsim Jamshed 	int event_remaining, merged_len;
1545*76404edcSAsim Jamshed 
1546*76404edcSAsim Jamshed 	mtcp = GetMTCPManager(mctx);
1547*76404edcSAsim Jamshed 	if (!mtcp) {
1548*76404edcSAsim Jamshed 		errno = EACCES;
1549*76404edcSAsim Jamshed 		return -1;
1550*76404edcSAsim Jamshed 	}
1551*76404edcSAsim Jamshed 
1552*76404edcSAsim Jamshed 	if (sockid < 0 || sockid >= g_config.mos->max_concurrency) {
1553*76404edcSAsim Jamshed 		TRACE_API("Socket id %d out of range.\n", sockid);
1554*76404edcSAsim Jamshed 		errno = EBADF;
1555*76404edcSAsim Jamshed 		return -1;
1556*76404edcSAsim Jamshed 	}
1557*76404edcSAsim Jamshed 
1558*76404edcSAsim Jamshed 	socket = &mtcp->smap[sockid];
1559*76404edcSAsim Jamshed 	if (socket->socktype == MOS_SOCK_UNUSED) {
1560*76404edcSAsim Jamshed 		TRACE_API("Invalid socket id: %d\n", sockid);
1561*76404edcSAsim Jamshed 		errno = EBADF;
1562*76404edcSAsim Jamshed 		return -1;
1563*76404edcSAsim Jamshed 	}
1564*76404edcSAsim Jamshed 
1565*76404edcSAsim Jamshed 	if (socket->socktype != MOS_SOCK_STREAM) {
1566*76404edcSAsim Jamshed 		TRACE_API("Not an end socket. id: %d\n", sockid);
1567*76404edcSAsim Jamshed 		errno = ENOTSOCK;
1568*76404edcSAsim Jamshed 		return -1;
1569*76404edcSAsim Jamshed 	}
1570*76404edcSAsim Jamshed 
1571*76404edcSAsim Jamshed 	/* stream should be in ESTABLISHED, FIN_WAIT_1, FIN_WAIT_2, CLOSE_WAIT */
1572*76404edcSAsim Jamshed 	cur_stream = socket->stream;
1573*76404edcSAsim Jamshed 	if (!cur_stream ||
1574*76404edcSAsim Jamshed 			!(cur_stream->state >= TCP_ST_ESTABLISHED &&
1575*76404edcSAsim Jamshed 			  cur_stream->state <= TCP_ST_CLOSE_WAIT)) {
1576*76404edcSAsim Jamshed 		errno = ENOTCONN;
1577*76404edcSAsim Jamshed 		return -1;
1578*76404edcSAsim Jamshed 	}
1579*76404edcSAsim Jamshed 
1580*76404edcSAsim Jamshed 	rcvvar = cur_stream->rcvvar;
1581*76404edcSAsim Jamshed 
1582*76404edcSAsim Jamshed #ifdef NEWRB
1583*76404edcSAsim Jamshed 	merged_len = tcprb_cflen(rcvvar->rcvbuf);
1584*76404edcSAsim Jamshed #else
1585*76404edcSAsim Jamshed 	merged_len = rcvvar->rcvbuf->merged_len;
1586*76404edcSAsim Jamshed #endif
1587*76404edcSAsim Jamshed 
1588*76404edcSAsim Jamshed 	/* if CLOSE_WAIT, return 0 if there is no payload */
1589*76404edcSAsim Jamshed 	if (cur_stream->state == TCP_ST_CLOSE_WAIT) {
1590*76404edcSAsim Jamshed 		if (!rcvvar->rcvbuf)
1591*76404edcSAsim Jamshed 			return 0;
1592*76404edcSAsim Jamshed 
1593*76404edcSAsim Jamshed 		if (merged_len == 0)
1594*76404edcSAsim Jamshed 			return 0;
1595*76404edcSAsim Jamshed 	}
1596*76404edcSAsim Jamshed 
1597*76404edcSAsim Jamshed 	/* return EAGAIN if no receive buffer */
1598*76404edcSAsim Jamshed 	if (socket->opts & MTCP_NONBLOCK) {
1599*76404edcSAsim Jamshed 		if (!rcvvar->rcvbuf || merged_len == 0) {
1600*76404edcSAsim Jamshed 			errno = EAGAIN;
1601*76404edcSAsim Jamshed 			return -1;
1602*76404edcSAsim Jamshed 		}
1603*76404edcSAsim Jamshed 	}
1604*76404edcSAsim Jamshed 
1605*76404edcSAsim Jamshed 	SBUF_LOCK(&rcvvar->read_lock);
1606*76404edcSAsim Jamshed 
1607*76404edcSAsim Jamshed 	/* read and store the contents to the vectored buffers */
1608*76404edcSAsim Jamshed 	bytes_read = 0;
1609*76404edcSAsim Jamshed 	for (i = 0; i < numIOV; i++) {
1610*76404edcSAsim Jamshed 		if (iov[i].iov_len <= 0)
1611*76404edcSAsim Jamshed 			continue;
1612*76404edcSAsim Jamshed 
1613*76404edcSAsim Jamshed 		ret = CopyToUser(mtcp, cur_stream, iov[i].iov_base, iov[i].iov_len);
1614*76404edcSAsim Jamshed 		if (ret <= 0)
1615*76404edcSAsim Jamshed 			break;
1616*76404edcSAsim Jamshed 
1617*76404edcSAsim Jamshed 		bytes_read += ret;
1618*76404edcSAsim Jamshed 
1619*76404edcSAsim Jamshed 		if (ret < iov[i].iov_len)
1620*76404edcSAsim Jamshed 			break;
1621*76404edcSAsim Jamshed 	}
1622*76404edcSAsim Jamshed 
1623*76404edcSAsim Jamshed #ifdef NEWRB
1624*76404edcSAsim Jamshed 	merged_len = tcprb_cflen(rcvvar->rcvbuf);
1625*76404edcSAsim Jamshed #else
1626*76404edcSAsim Jamshed 	merged_len = rcvvar->rcvbuf->merged_len;
1627*76404edcSAsim Jamshed #endif
1628*76404edcSAsim Jamshed 
1629*76404edcSAsim Jamshed 	event_remaining = FALSE;
1630*76404edcSAsim Jamshed 	/* if there are remaining payload, generate read event */
1631*76404edcSAsim Jamshed 	/* (may due to insufficient user buffer) */
1632*76404edcSAsim Jamshed 	if (socket->epoll & MOS_EPOLLIN) {
1633*76404edcSAsim Jamshed 		if (!(socket->epoll & MOS_EPOLLET) && merged_len > 0) {
1634*76404edcSAsim Jamshed 			event_remaining = TRUE;
1635*76404edcSAsim Jamshed 		}
1636*76404edcSAsim Jamshed 	}
1637*76404edcSAsim Jamshed 	/* if waiting for close, notify it if no remaining data */
1638*76404edcSAsim Jamshed 	if (cur_stream->state == TCP_ST_CLOSE_WAIT &&
1639*76404edcSAsim Jamshed 			merged_len == 0 && bytes_read > 0) {
1640*76404edcSAsim Jamshed 		event_remaining = TRUE;
1641*76404edcSAsim Jamshed 	}
1642*76404edcSAsim Jamshed 
1643*76404edcSAsim Jamshed 	SBUF_UNLOCK(&rcvvar->read_lock);
1644*76404edcSAsim Jamshed 
1645*76404edcSAsim Jamshed 	if(event_remaining) {
1646*76404edcSAsim Jamshed 		if (socket->epoll & MOS_EPOLLIN && !(socket->epoll & MOS_EPOLLET)) {
1647*76404edcSAsim Jamshed 			AddEpollEvent(mtcp->ep,
1648*76404edcSAsim Jamshed 				      USR_SHADOW_EVENT_QUEUE, socket, MOS_EPOLLIN);
1649*76404edcSAsim Jamshed 		}
1650*76404edcSAsim Jamshed 	}
1651*76404edcSAsim Jamshed 
1652*76404edcSAsim Jamshed 	TRACE_API("Stream %d: mtcp_readv() returning %d\n",
1653*76404edcSAsim Jamshed 			cur_stream->id, bytes_read);
1654*76404edcSAsim Jamshed 	return bytes_read;
1655*76404edcSAsim Jamshed }
1656*76404edcSAsim Jamshed /*----------------------------------------------------------------------------*/
1657*76404edcSAsim Jamshed static inline int
1658*76404edcSAsim Jamshed CopyFromUser(mtcp_manager_t mtcp, tcp_stream *cur_stream, char *buf, int len)
1659*76404edcSAsim Jamshed {
1660*76404edcSAsim Jamshed 	struct tcp_send_vars *sndvar = cur_stream->sndvar;
1661*76404edcSAsim Jamshed 	int sndlen;
1662*76404edcSAsim Jamshed 	int ret;
1663*76404edcSAsim Jamshed 
1664*76404edcSAsim Jamshed 	sndlen = MIN((int)sndvar->snd_wnd, len);
1665*76404edcSAsim Jamshed 	if (sndlen <= 0) {
1666*76404edcSAsim Jamshed 		errno = EAGAIN;
1667*76404edcSAsim Jamshed 		return -1;
1668*76404edcSAsim Jamshed 	}
1669*76404edcSAsim Jamshed 
1670*76404edcSAsim Jamshed 	/* allocate send buffer if not exist */
1671*76404edcSAsim Jamshed 	if (!sndvar->sndbuf) {
1672*76404edcSAsim Jamshed 		sndvar->sndbuf = SBInit(mtcp->rbm_snd, sndvar->iss + 1);
1673*76404edcSAsim Jamshed 		if (!sndvar->sndbuf) {
1674*76404edcSAsim Jamshed 			cur_stream->close_reason = TCP_NO_MEM;
1675*76404edcSAsim Jamshed 			/* notification may not required due to -1 return */
1676*76404edcSAsim Jamshed 			errno = ENOMEM;
1677*76404edcSAsim Jamshed 			return -1;
1678*76404edcSAsim Jamshed 		}
1679*76404edcSAsim Jamshed 	}
1680*76404edcSAsim Jamshed 
1681*76404edcSAsim Jamshed 	ret = SBPut(mtcp->rbm_snd, sndvar->sndbuf, buf, sndlen);
1682*76404edcSAsim Jamshed 	assert(ret == sndlen);
1683*76404edcSAsim Jamshed 	sndvar->snd_wnd = sndvar->sndbuf->size - sndvar->sndbuf->len;
1684*76404edcSAsim Jamshed 	if (ret <= 0) {
1685*76404edcSAsim Jamshed 		TRACE_ERROR("SBPut failed. reason: %d (sndlen: %u, len: %u\n",
1686*76404edcSAsim Jamshed 				ret, sndlen, sndvar->sndbuf->len);
1687*76404edcSAsim Jamshed 		errno = EAGAIN;
1688*76404edcSAsim Jamshed 		return -1;
1689*76404edcSAsim Jamshed 	}
1690*76404edcSAsim Jamshed 
1691*76404edcSAsim Jamshed 	if (sndvar->snd_wnd <= 0) {
1692*76404edcSAsim Jamshed 		TRACE_SNDBUF("%u Sending buffer became full!! snd_wnd: %u\n",
1693*76404edcSAsim Jamshed 				cur_stream->id, sndvar->snd_wnd);
1694*76404edcSAsim Jamshed 	}
1695*76404edcSAsim Jamshed 
1696*76404edcSAsim Jamshed 	return ret;
1697*76404edcSAsim Jamshed }
1698*76404edcSAsim Jamshed /*----------------------------------------------------------------------------*/
1699*76404edcSAsim Jamshed ssize_t
1700*76404edcSAsim Jamshed mtcp_write(mctx_t mctx, int sockid, char *buf, size_t len)
1701*76404edcSAsim Jamshed {
1702*76404edcSAsim Jamshed 	mtcp_manager_t mtcp;
1703*76404edcSAsim Jamshed 	socket_map_t socket;
1704*76404edcSAsim Jamshed 	tcp_stream *cur_stream;
1705*76404edcSAsim Jamshed 	struct tcp_send_vars *sndvar;
1706*76404edcSAsim Jamshed 	int ret;
1707*76404edcSAsim Jamshed 
1708*76404edcSAsim Jamshed 	mtcp = GetMTCPManager(mctx);
1709*76404edcSAsim Jamshed 	if (!mtcp) {
1710*76404edcSAsim Jamshed 		errno = EACCES;
1711*76404edcSAsim Jamshed 		return -1;
1712*76404edcSAsim Jamshed 	}
1713*76404edcSAsim Jamshed 
1714*76404edcSAsim Jamshed 	if (sockid < 0 || sockid >= g_config.mos->max_concurrency) {
1715*76404edcSAsim Jamshed 		TRACE_API("Socket id %d out of range.\n", sockid);
1716*76404edcSAsim Jamshed 		errno = EBADF;
1717*76404edcSAsim Jamshed 		return -1;
1718*76404edcSAsim Jamshed 	}
1719*76404edcSAsim Jamshed 
1720*76404edcSAsim Jamshed 	socket = &mtcp->smap[sockid];
1721*76404edcSAsim Jamshed 	if (socket->socktype == MOS_SOCK_UNUSED) {
1722*76404edcSAsim Jamshed 		TRACE_API("Invalid socket id: %d\n", sockid);
1723*76404edcSAsim Jamshed 		errno = EBADF;
1724*76404edcSAsim Jamshed 		return -1;
1725*76404edcSAsim Jamshed 	}
1726*76404edcSAsim Jamshed 
1727*76404edcSAsim Jamshed 	if (socket->socktype == MOS_SOCK_PIPE) {
1728*76404edcSAsim Jamshed 		return PipeWrite(mctx, sockid, buf, len);
1729*76404edcSAsim Jamshed 	}
1730*76404edcSAsim Jamshed 
1731*76404edcSAsim Jamshed 	if (socket->socktype != MOS_SOCK_STREAM) {
1732*76404edcSAsim Jamshed 		TRACE_API("Not an end socket. id: %d\n", sockid);
1733*76404edcSAsim Jamshed 		errno = ENOTSOCK;
1734*76404edcSAsim Jamshed 		return -1;
1735*76404edcSAsim Jamshed 	}
1736*76404edcSAsim Jamshed 
1737*76404edcSAsim Jamshed 	cur_stream = socket->stream;
1738*76404edcSAsim Jamshed 	if (!cur_stream ||
1739*76404edcSAsim Jamshed 			!(cur_stream->state == TCP_ST_ESTABLISHED ||
1740*76404edcSAsim Jamshed 			  cur_stream->state == TCP_ST_CLOSE_WAIT)) {
1741*76404edcSAsim Jamshed 		errno = ENOTCONN;
1742*76404edcSAsim Jamshed 		return -1;
1743*76404edcSAsim Jamshed 	}
1744*76404edcSAsim Jamshed 
1745*76404edcSAsim Jamshed 	if (len <= 0) {
1746*76404edcSAsim Jamshed 		if (socket->opts & MTCP_NONBLOCK) {
1747*76404edcSAsim Jamshed 			errno = EAGAIN;
1748*76404edcSAsim Jamshed 			return -1;
1749*76404edcSAsim Jamshed 		} else {
1750*76404edcSAsim Jamshed 			return 0;
1751*76404edcSAsim Jamshed 		}
1752*76404edcSAsim Jamshed 	}
1753*76404edcSAsim Jamshed 
1754*76404edcSAsim Jamshed 	sndvar = cur_stream->sndvar;
1755*76404edcSAsim Jamshed 
1756*76404edcSAsim Jamshed 	SBUF_LOCK(&sndvar->write_lock);
1757*76404edcSAsim Jamshed 	ret = CopyFromUser(mtcp, cur_stream, buf, len);
1758*76404edcSAsim Jamshed 
1759*76404edcSAsim Jamshed 	SBUF_UNLOCK(&sndvar->write_lock);
1760*76404edcSAsim Jamshed 
1761*76404edcSAsim Jamshed 	if (ret > 0 && !(sndvar->on_sendq || sndvar->on_send_list)) {
1762*76404edcSAsim Jamshed 		SQ_LOCK(&mtcp->ctx->sendq_lock);
1763*76404edcSAsim Jamshed 		sndvar->on_sendq = TRUE;
1764*76404edcSAsim Jamshed 		StreamEnqueue(mtcp->sendq, cur_stream);		/* this always success */
1765*76404edcSAsim Jamshed 		SQ_UNLOCK(&mtcp->ctx->sendq_lock);
1766*76404edcSAsim Jamshed 		mtcp->wakeup_flag = TRUE;
1767*76404edcSAsim Jamshed 	}
1768*76404edcSAsim Jamshed 
1769*76404edcSAsim Jamshed 	if (ret == 0 && (socket->opts & MTCP_NONBLOCK)) {
1770*76404edcSAsim Jamshed 		ret = -1;
1771*76404edcSAsim Jamshed 		errno = EAGAIN;
1772*76404edcSAsim Jamshed 	}
1773*76404edcSAsim Jamshed 
1774*76404edcSAsim Jamshed 	/* if there are remaining sending buffer, generate write event */
1775*76404edcSAsim Jamshed 	if (sndvar->snd_wnd > 0) {
1776*76404edcSAsim Jamshed 		if (socket->epoll & MOS_EPOLLOUT && !(socket->epoll & MOS_EPOLLET)) {
1777*76404edcSAsim Jamshed 			AddEpollEvent(mtcp->ep,
1778*76404edcSAsim Jamshed 				      USR_SHADOW_EVENT_QUEUE, socket, MOS_EPOLLOUT);
1779*76404edcSAsim Jamshed 		}
1780*76404edcSAsim Jamshed 	}
1781*76404edcSAsim Jamshed 
1782*76404edcSAsim Jamshed 	TRACE_API("Stream %d: mtcp_write() returning %d\n", cur_stream->id, ret);
1783*76404edcSAsim Jamshed 	return ret;
1784*76404edcSAsim Jamshed }
1785*76404edcSAsim Jamshed /*----------------------------------------------------------------------------*/
1786*76404edcSAsim Jamshed ssize_t
1787*76404edcSAsim Jamshed mtcp_writev(mctx_t mctx, int sockid, struct iovec *iov, int numIOV)
1788*76404edcSAsim Jamshed {
1789*76404edcSAsim Jamshed 	mtcp_manager_t mtcp;
1790*76404edcSAsim Jamshed 	socket_map_t socket;
1791*76404edcSAsim Jamshed 	tcp_stream *cur_stream;
1792*76404edcSAsim Jamshed 	struct tcp_send_vars *sndvar;
1793*76404edcSAsim Jamshed 	int ret, to_write, i;
1794*76404edcSAsim Jamshed 
1795*76404edcSAsim Jamshed 	mtcp = GetMTCPManager(mctx);
1796*76404edcSAsim Jamshed 	if (!mtcp) {
1797*76404edcSAsim Jamshed 		errno = EACCES;
1798*76404edcSAsim Jamshed 		return -1;
1799*76404edcSAsim Jamshed 	}
1800*76404edcSAsim Jamshed 
1801*76404edcSAsim Jamshed 	if (sockid < 0 || sockid >= g_config.mos->max_concurrency) {
1802*76404edcSAsim Jamshed 		TRACE_API("Socket id %d out of range.\n", sockid);
1803*76404edcSAsim Jamshed 		errno = EBADF;
1804*76404edcSAsim Jamshed 		return -1;
1805*76404edcSAsim Jamshed 	}
1806*76404edcSAsim Jamshed 
1807*76404edcSAsim Jamshed 	socket = &mtcp->smap[sockid];
1808*76404edcSAsim Jamshed 	if (socket->socktype == MOS_SOCK_UNUSED) {
1809*76404edcSAsim Jamshed 		TRACE_API("Invalid socket id: %d\n", sockid);
1810*76404edcSAsim Jamshed 		errno = EBADF;
1811*76404edcSAsim Jamshed 		return -1;
1812*76404edcSAsim Jamshed 	}
1813*76404edcSAsim Jamshed 
1814*76404edcSAsim Jamshed 	if (socket->socktype != MOS_SOCK_STREAM) {
1815*76404edcSAsim Jamshed 		TRACE_API("Not an end socket. id: %d\n", sockid);
1816*76404edcSAsim Jamshed 		errno = ENOTSOCK;
1817*76404edcSAsim Jamshed 		return -1;
1818*76404edcSAsim Jamshed 	}
1819*76404edcSAsim Jamshed 
1820*76404edcSAsim Jamshed 	cur_stream = socket->stream;
1821*76404edcSAsim Jamshed 	if (!cur_stream ||
1822*76404edcSAsim Jamshed 			!(cur_stream->state == TCP_ST_ESTABLISHED ||
1823*76404edcSAsim Jamshed 			  cur_stream->state == TCP_ST_CLOSE_WAIT)) {
1824*76404edcSAsim Jamshed 		errno = ENOTCONN;
1825*76404edcSAsim Jamshed 		return -1;
1826*76404edcSAsim Jamshed 	}
1827*76404edcSAsim Jamshed 
1828*76404edcSAsim Jamshed 	sndvar = cur_stream->sndvar;
1829*76404edcSAsim Jamshed 	SBUF_LOCK(&sndvar->write_lock);
1830*76404edcSAsim Jamshed 
1831*76404edcSAsim Jamshed 	/* write from the vectored buffers */
1832*76404edcSAsim Jamshed 	to_write = 0;
1833*76404edcSAsim Jamshed 	for (i = 0; i < numIOV; i++) {
1834*76404edcSAsim Jamshed 		if (iov[i].iov_len <= 0)
1835*76404edcSAsim Jamshed 			continue;
1836*76404edcSAsim Jamshed 
1837*76404edcSAsim Jamshed 		ret = CopyFromUser(mtcp, cur_stream, iov[i].iov_base, iov[i].iov_len);
1838*76404edcSAsim Jamshed 		if (ret <= 0)
1839*76404edcSAsim Jamshed 			break;
1840*76404edcSAsim Jamshed 
1841*76404edcSAsim Jamshed 		to_write += ret;
1842*76404edcSAsim Jamshed 
1843*76404edcSAsim Jamshed 		if (ret < iov[i].iov_len)
1844*76404edcSAsim Jamshed 			break;
1845*76404edcSAsim Jamshed 	}
1846*76404edcSAsim Jamshed 	SBUF_UNLOCK(&sndvar->write_lock);
1847*76404edcSAsim Jamshed 
1848*76404edcSAsim Jamshed 	if (to_write > 0 && !(sndvar->on_sendq || sndvar->on_send_list)) {
1849*76404edcSAsim Jamshed 		SQ_LOCK(&mtcp->ctx->sendq_lock);
1850*76404edcSAsim Jamshed 		sndvar->on_sendq = TRUE;
1851*76404edcSAsim Jamshed 		StreamEnqueue(mtcp->sendq, cur_stream);		/* this always success */
1852*76404edcSAsim Jamshed 		SQ_UNLOCK(&mtcp->ctx->sendq_lock);
1853*76404edcSAsim Jamshed 		mtcp->wakeup_flag = TRUE;
1854*76404edcSAsim Jamshed 	}
1855*76404edcSAsim Jamshed 
1856*76404edcSAsim Jamshed 	if (to_write == 0 && (socket->opts & MTCP_NONBLOCK)) {
1857*76404edcSAsim Jamshed 		to_write = -1;
1858*76404edcSAsim Jamshed 		errno = EAGAIN;
1859*76404edcSAsim Jamshed 	}
1860*76404edcSAsim Jamshed 
1861*76404edcSAsim Jamshed 	/* if there are remaining sending buffer, generate write event */
1862*76404edcSAsim Jamshed 	if (sndvar->snd_wnd > 0) {
1863*76404edcSAsim Jamshed 		if (socket->epoll & MOS_EPOLLOUT && !(socket->epoll & MOS_EPOLLET)) {
1864*76404edcSAsim Jamshed 			AddEpollEvent(mtcp->ep,
1865*76404edcSAsim Jamshed 				      USR_SHADOW_EVENT_QUEUE, socket, MOS_EPOLLOUT);
1866*76404edcSAsim Jamshed 		}
1867*76404edcSAsim Jamshed 	}
1868*76404edcSAsim Jamshed 
1869*76404edcSAsim Jamshed 	TRACE_API("Stream %d: mtcp_writev() returning %d\n",
1870*76404edcSAsim Jamshed 			cur_stream->id, to_write);
1871*76404edcSAsim Jamshed 	return to_write;
1872*76404edcSAsim Jamshed }
1873*76404edcSAsim Jamshed /*----------------------------------------------------------------------------*/
1874*76404edcSAsim Jamshed uint32_t
1875*76404edcSAsim Jamshed mtcp_get_connection_cnt(mctx_t mctx)
1876*76404edcSAsim Jamshed {
1877*76404edcSAsim Jamshed 	mtcp_manager_t mtcp;
1878*76404edcSAsim Jamshed 	mtcp = GetMTCPManager(mctx);
1879*76404edcSAsim Jamshed 	if (!mtcp) {
1880*76404edcSAsim Jamshed 		errno = EACCES;
1881*76404edcSAsim Jamshed 		return -1;
1882*76404edcSAsim Jamshed 	}
1883*76404edcSAsim Jamshed 
1884*76404edcSAsim Jamshed 	if (mtcp->num_msp > 0)
1885*76404edcSAsim Jamshed 		return mtcp->flow_cnt / 2;
1886*76404edcSAsim Jamshed 	else
1887*76404edcSAsim Jamshed 		return mtcp->flow_cnt;
1888*76404edcSAsim Jamshed }
1889*76404edcSAsim Jamshed /*----------------------------------------------------------------------------*/
1890