176404edcSAsim Jamshed #include <sys/queue.h>
276404edcSAsim Jamshed #include <sys/ioctl.h>
376404edcSAsim Jamshed #include <limits.h>
476404edcSAsim Jamshed #include <unistd.h>
576404edcSAsim Jamshed #include <assert.h>
676404edcSAsim Jamshed #include <string.h>
776404edcSAsim Jamshed
876404edcSAsim Jamshed #ifdef DARWIN
976404edcSAsim Jamshed #include <netinet/tcp.h>
1076404edcSAsim Jamshed #include <netinet/ip.h>
1176404edcSAsim Jamshed #include <netinet/if_ether.h>
1276404edcSAsim Jamshed #endif
1376404edcSAsim Jamshed
1476404edcSAsim Jamshed #include "mtcp.h"
1576404edcSAsim Jamshed #include "mtcp_api.h"
1676404edcSAsim Jamshed #include "tcp_in.h"
1776404edcSAsim Jamshed #include "tcp_stream.h"
1876404edcSAsim Jamshed #include "tcp_out.h"
1976404edcSAsim Jamshed #include "ip_out.h"
2076404edcSAsim Jamshed #include "eventpoll.h"
2176404edcSAsim Jamshed #include "pipe.h"
2276404edcSAsim Jamshed #include "fhash.h"
2376404edcSAsim Jamshed #include "addr_pool.h"
24152f7c19SAsim Jamshed #include "util.h"
2576404edcSAsim Jamshed #include "config.h"
2676404edcSAsim Jamshed #include "debug.h"
2776404edcSAsim Jamshed #include "eventpoll.h"
2876404edcSAsim Jamshed #include "mos_api.h"
2976404edcSAsim Jamshed #include "tcp_rb.h"
3076404edcSAsim Jamshed
3176404edcSAsim Jamshed #define MAX(a, b) ((a)>(b)?(a):(b))
3276404edcSAsim Jamshed #define MIN(a, b) ((a)<(b)?(a):(b))
3376404edcSAsim Jamshed
3476404edcSAsim Jamshed /*----------------------------------------------------------------------------*/
3576404edcSAsim Jamshed /** Stop monitoring the socket! (function prototype)
3676404edcSAsim Jamshed * @param [in] mctx: mtcp context
3776404edcSAsim Jamshed * @param [in] sock: monitoring stream socket id
3876404edcSAsim Jamshed * @param [in] side: side of monitoring (client side, server side or both)
3976404edcSAsim Jamshed *
4076404edcSAsim Jamshed * This function is now DEPRECATED and is only used within mOS core...
4176404edcSAsim Jamshed */
4276404edcSAsim Jamshed int
4376404edcSAsim Jamshed mtcp_cb_stop(mctx_t mctx, int sock, int side);
4476404edcSAsim Jamshed /*----------------------------------------------------------------------------*/
4576404edcSAsim Jamshed /** Reset the connection (send RST packets to both sides)
4676404edcSAsim Jamshed * (We need to decide the API for this.)
4776404edcSAsim Jamshed */
4876404edcSAsim Jamshed //int
4976404edcSAsim Jamshed //mtcp_cb_reset(mctx_t mctx, int sock, int side);
5076404edcSAsim Jamshed /*----------------------------------------------------------------------------*/
5176404edcSAsim Jamshed inline mtcp_manager_t
GetMTCPManager(mctx_t mctx)5276404edcSAsim Jamshed GetMTCPManager(mctx_t mctx)
5376404edcSAsim Jamshed {
5476404edcSAsim Jamshed if (!mctx) {
5576404edcSAsim Jamshed errno = EACCES;
5676404edcSAsim Jamshed return NULL;
5776404edcSAsim Jamshed }
5876404edcSAsim Jamshed
5976404edcSAsim Jamshed if (mctx->cpu < 0 || mctx->cpu >= num_cpus) {
6076404edcSAsim Jamshed errno = EINVAL;
6176404edcSAsim Jamshed return NULL;
6276404edcSAsim Jamshed }
6376404edcSAsim Jamshed
644cb4e140SAsim Jamshed if (!g_mtcp[mctx->cpu] || g_mtcp[mctx->cpu]->ctx->done || g_mtcp[mctx->cpu]->ctx->exit) {
6576404edcSAsim Jamshed errno = EPERM;
6676404edcSAsim Jamshed return NULL;
6776404edcSAsim Jamshed }
6876404edcSAsim Jamshed
6976404edcSAsim Jamshed return g_mtcp[mctx->cpu];
7076404edcSAsim Jamshed }
7176404edcSAsim Jamshed /*----------------------------------------------------------------------------*/
72a5e1a556SAsim Jamshed static inline int
GetSocketError(socket_map_t socket,void * optval,socklen_t * optlen)7376404edcSAsim Jamshed GetSocketError(socket_map_t socket, void *optval, socklen_t *optlen)
7476404edcSAsim Jamshed {
7576404edcSAsim Jamshed tcp_stream *cur_stream;
7676404edcSAsim Jamshed
7776404edcSAsim Jamshed if (!socket->stream) {
7876404edcSAsim Jamshed errno = EBADF;
7976404edcSAsim Jamshed return -1;
8076404edcSAsim Jamshed }
8176404edcSAsim Jamshed
8276404edcSAsim Jamshed cur_stream = socket->stream;
8376404edcSAsim Jamshed if (cur_stream->state == TCP_ST_CLOSED_RSVD) {
8476404edcSAsim Jamshed if (cur_stream->close_reason == TCP_TIMEDOUT ||
8576404edcSAsim Jamshed cur_stream->close_reason == TCP_CONN_FAIL ||
8676404edcSAsim Jamshed cur_stream->close_reason == TCP_CONN_LOST) {
8776404edcSAsim Jamshed *(int *)optval = ETIMEDOUT;
8876404edcSAsim Jamshed *optlen = sizeof(int);
8976404edcSAsim Jamshed
9076404edcSAsim Jamshed return 0;
9176404edcSAsim Jamshed }
9276404edcSAsim Jamshed }
9376404edcSAsim Jamshed
9476404edcSAsim Jamshed if (cur_stream->state == TCP_ST_CLOSE_WAIT ||
9576404edcSAsim Jamshed cur_stream->state == TCP_ST_CLOSED_RSVD) {
9676404edcSAsim Jamshed if (cur_stream->close_reason == TCP_RESET) {
9776404edcSAsim Jamshed *(int *)optval = ECONNRESET;
9876404edcSAsim Jamshed *optlen = sizeof(int);
9976404edcSAsim Jamshed
10076404edcSAsim Jamshed return 0;
10176404edcSAsim Jamshed }
10276404edcSAsim Jamshed }
10376404edcSAsim Jamshed
104a5e1a556SAsim Jamshed if (cur_stream->state == TCP_ST_SYN_SENT &&
105a5e1a556SAsim Jamshed errno == EINPROGRESS) {
106a5e1a556SAsim Jamshed *(int *)optval = errno;
107a5e1a556SAsim Jamshed *optlen = sizeof(int);
108a5e1a556SAsim Jamshed
109a5e1a556SAsim Jamshed return -1;
110a5e1a556SAsim Jamshed }
111a5e1a556SAsim Jamshed
112265cb675SAsim Jamshed /*
113265cb675SAsim Jamshed * `base case`: If socket sees no so_error, then
114265cb675SAsim Jamshed * this also means close_reason will always be
115265cb675SAsim Jamshed * TCP_NOT_CLOSED.
116265cb675SAsim Jamshed */
117265cb675SAsim Jamshed if (cur_stream->close_reason == TCP_NOT_CLOSED) {
118265cb675SAsim Jamshed *(int *)optval = 0;
119265cb675SAsim Jamshed *optlen = sizeof(int);
120265cb675SAsim Jamshed
121265cb675SAsim Jamshed return 0;
122265cb675SAsim Jamshed }
123265cb675SAsim Jamshed
12476404edcSAsim Jamshed errno = ENOSYS;
12576404edcSAsim Jamshed return -1;
12676404edcSAsim Jamshed }
12776404edcSAsim Jamshed /*----------------------------------------------------------------------------*/
12876404edcSAsim Jamshed int
mtcp_getsockname(mctx_t mctx,int sockid,struct sockaddr * addr,socklen_t * addrlen)129a5e1a556SAsim Jamshed mtcp_getsockname(mctx_t mctx, int sockid, struct sockaddr *addr,
130a5e1a556SAsim Jamshed socklen_t *addrlen)
131a5e1a556SAsim Jamshed {
132a5e1a556SAsim Jamshed mtcp_manager_t mtcp;
133a5e1a556SAsim Jamshed socket_map_t socket;
134a5e1a556SAsim Jamshed
135a5e1a556SAsim Jamshed mtcp = GetMTCPManager(mctx);
136a5e1a556SAsim Jamshed if (!mtcp) {
137a5e1a556SAsim Jamshed return -1;
138a5e1a556SAsim Jamshed }
139a5e1a556SAsim Jamshed
140a5e1a556SAsim Jamshed if (sockid < 0 || sockid >= g_config.mos->max_concurrency) {
141a5e1a556SAsim Jamshed TRACE_API("Socket id %d out of range.\n", sockid);
142a5e1a556SAsim Jamshed errno = EBADF;
143a5e1a556SAsim Jamshed return -1;
144a5e1a556SAsim Jamshed }
145a5e1a556SAsim Jamshed
146a5e1a556SAsim Jamshed socket = &mtcp->smap[sockid];
147a5e1a556SAsim Jamshed if (socket->socktype == MOS_SOCK_UNUSED) {
148a5e1a556SAsim Jamshed TRACE_API("Invalid socket id: %d\n", sockid);
149a5e1a556SAsim Jamshed errno = EBADF;
150a5e1a556SAsim Jamshed return -1;
151a5e1a556SAsim Jamshed }
152a5e1a556SAsim Jamshed
153a5e1a556SAsim Jamshed if (*addrlen <= 0) {
154a5e1a556SAsim Jamshed TRACE_API("Invalid addrlen: %d\n", *addrlen);
155a5e1a556SAsim Jamshed errno = EINVAL;
156a5e1a556SAsim Jamshed return -1;
157a5e1a556SAsim Jamshed }
158a5e1a556SAsim Jamshed
159a5e1a556SAsim Jamshed if (socket->socktype != MOS_SOCK_STREAM_LISTEN &&
160a5e1a556SAsim Jamshed socket->socktype != MOS_SOCK_STREAM) {
161a5e1a556SAsim Jamshed TRACE_API("Invalid socket id: %d\n", sockid);
162a5e1a556SAsim Jamshed errno = ENOTSOCK;
163a5e1a556SAsim Jamshed return -1;
164a5e1a556SAsim Jamshed }
165a5e1a556SAsim Jamshed
166a5e1a556SAsim Jamshed *(struct sockaddr_in *)addr = socket->saddr;
167a5e1a556SAsim Jamshed *addrlen = sizeof(socket->saddr);
168a5e1a556SAsim Jamshed
169a5e1a556SAsim Jamshed return 0;
170a5e1a556SAsim Jamshed }
171a5e1a556SAsim Jamshed /*----------------------------------------------------------------------------*/
172a5e1a556SAsim Jamshed int
mtcp_getsockopt(mctx_t mctx,int sockid,int level,int optname,void * optval,socklen_t * optlen)17376404edcSAsim Jamshed mtcp_getsockopt(mctx_t mctx, int sockid, int level,
17476404edcSAsim Jamshed int optname, void *optval, socklen_t *optlen)
17576404edcSAsim Jamshed {
17676404edcSAsim Jamshed mtcp_manager_t mtcp;
17776404edcSAsim Jamshed socket_map_t socket;
17876404edcSAsim Jamshed
17976404edcSAsim Jamshed mtcp = GetMTCPManager(mctx);
18076404edcSAsim Jamshed if (!mtcp) {
18176404edcSAsim Jamshed errno = EACCES;
18276404edcSAsim Jamshed return -1;
18376404edcSAsim Jamshed }
18476404edcSAsim Jamshed
18576404edcSAsim Jamshed if (sockid < 0 || sockid >= g_config.mos->max_concurrency) {
18676404edcSAsim Jamshed TRACE_API("Socket id %d out of range.\n", sockid);
18776404edcSAsim Jamshed errno = EBADF;
18876404edcSAsim Jamshed return -1;
18976404edcSAsim Jamshed }
19076404edcSAsim Jamshed
19176404edcSAsim Jamshed switch (level) {
19276404edcSAsim Jamshed case SOL_SOCKET:
19376404edcSAsim Jamshed socket = &mtcp->smap[sockid];
19476404edcSAsim Jamshed if (socket->socktype == MOS_SOCK_UNUSED) {
19576404edcSAsim Jamshed TRACE_API("Invalid socket id: %d\n", sockid);
19676404edcSAsim Jamshed errno = EBADF;
19776404edcSAsim Jamshed return -1;
19876404edcSAsim Jamshed }
19976404edcSAsim Jamshed
20076404edcSAsim Jamshed if (socket->socktype != MOS_SOCK_STREAM_LISTEN &&
20176404edcSAsim Jamshed socket->socktype != MOS_SOCK_STREAM) {
20276404edcSAsim Jamshed TRACE_API("Invalid socket id: %d\n", sockid);
20376404edcSAsim Jamshed errno = ENOTSOCK;
20476404edcSAsim Jamshed return -1;
20576404edcSAsim Jamshed }
20676404edcSAsim Jamshed
20776404edcSAsim Jamshed if (optname == SO_ERROR) {
20876404edcSAsim Jamshed if (socket->socktype == MOS_SOCK_STREAM) {
20976404edcSAsim Jamshed return GetSocketError(socket, optval, optlen);
21076404edcSAsim Jamshed }
21176404edcSAsim Jamshed }
21276404edcSAsim Jamshed break;
21376404edcSAsim Jamshed case SOL_MONSOCKET:
21476404edcSAsim Jamshed /* check if the calling thread is in MOS context */
21576404edcSAsim Jamshed if (mtcp->ctx->thread != pthread_self()) {
21676404edcSAsim Jamshed errno = EPERM;
21776404edcSAsim Jamshed return -1;
21876404edcSAsim Jamshed }
21976404edcSAsim Jamshed /*
22076404edcSAsim Jamshed * All options will only work for active
22176404edcSAsim Jamshed * monitor stream sockets
22276404edcSAsim Jamshed */
22376404edcSAsim Jamshed socket = &mtcp->msmap[sockid];
22476404edcSAsim Jamshed if (socket->socktype != MOS_SOCK_MONITOR_STREAM_ACTIVE) {
22576404edcSAsim Jamshed TRACE_API("Invalid socket id: %d\n", sockid);
22676404edcSAsim Jamshed errno = ENOTSOCK;
22776404edcSAsim Jamshed return -1;
22876404edcSAsim Jamshed }
22976404edcSAsim Jamshed
23076404edcSAsim Jamshed switch (optname) {
23176404edcSAsim Jamshed case MOS_FRAGINFO_CLIBUF:
23276404edcSAsim Jamshed return GetFragInfo(socket, MOS_SIDE_CLI, optval, optlen);
23376404edcSAsim Jamshed case MOS_FRAGINFO_SVRBUF:
23476404edcSAsim Jamshed return GetFragInfo(socket, MOS_SIDE_SVR, optval, optlen);
23576404edcSAsim Jamshed case MOS_INFO_CLIBUF:
23676404edcSAsim Jamshed return GetBufInfo(socket, MOS_SIDE_CLI, optval, optlen);
23776404edcSAsim Jamshed case MOS_INFO_SVRBUF:
23876404edcSAsim Jamshed return GetBufInfo(socket, MOS_SIDE_SVR, optval, optlen);
23976404edcSAsim Jamshed case MOS_TCP_STATE_CLI:
24076404edcSAsim Jamshed return GetTCPState(socket->monitor_stream->stream, MOS_SIDE_CLI,
24176404edcSAsim Jamshed optval, optlen);
24276404edcSAsim Jamshed case MOS_TCP_STATE_SVR:
24376404edcSAsim Jamshed return GetTCPState(socket->monitor_stream->stream, MOS_SIDE_SVR,
24476404edcSAsim Jamshed optval, optlen);
24576404edcSAsim Jamshed case MOS_TIMESTAMP:
24676404edcSAsim Jamshed return GetLastTimestamp(socket->monitor_stream->stream,
24776404edcSAsim Jamshed (uint32_t *)optval,
24876404edcSAsim Jamshed optlen);
24976404edcSAsim Jamshed default:
25076404edcSAsim Jamshed TRACE_API("can't recognize the optname=%d\n", optname);
25176404edcSAsim Jamshed assert(0);
25276404edcSAsim Jamshed }
25376404edcSAsim Jamshed break;
25476404edcSAsim Jamshed }
25576404edcSAsim Jamshed errno = ENOSYS;
25676404edcSAsim Jamshed return -1;
25776404edcSAsim Jamshed }
25876404edcSAsim Jamshed /*----------------------------------------------------------------------------*/
25976404edcSAsim Jamshed int
mtcp_setsockopt(mctx_t mctx,int sockid,int level,int optname,const void * optval,socklen_t optlen)26076404edcSAsim Jamshed mtcp_setsockopt(mctx_t mctx, int sockid, int level,
26176404edcSAsim Jamshed int optname, const void *optval, socklen_t optlen)
26276404edcSAsim Jamshed {
26376404edcSAsim Jamshed mtcp_manager_t mtcp;
26476404edcSAsim Jamshed socket_map_t socket;
26576404edcSAsim Jamshed tcprb_t *rb;
26676404edcSAsim Jamshed
26776404edcSAsim Jamshed mtcp = GetMTCPManager(mctx);
26876404edcSAsim Jamshed if (!mtcp) {
26976404edcSAsim Jamshed errno = EACCES;
27076404edcSAsim Jamshed return -1;
27176404edcSAsim Jamshed }
27276404edcSAsim Jamshed
27376404edcSAsim Jamshed if (sockid < 0 || sockid >= g_config.mos->max_concurrency) {
27476404edcSAsim Jamshed TRACE_API("Socket id %d out of range.\n", sockid);
27576404edcSAsim Jamshed errno = EBADF;
27676404edcSAsim Jamshed return -1;
27776404edcSAsim Jamshed }
27876404edcSAsim Jamshed
27976404edcSAsim Jamshed switch (level) {
28076404edcSAsim Jamshed case SOL_SOCKET:
28176404edcSAsim Jamshed socket = &mtcp->smap[sockid];
28276404edcSAsim Jamshed if (socket->socktype == MOS_SOCK_UNUSED) {
28376404edcSAsim Jamshed TRACE_API("Invalid socket id: %d\n", sockid);
28476404edcSAsim Jamshed errno = EBADF;
28576404edcSAsim Jamshed return -1;
28676404edcSAsim Jamshed }
28776404edcSAsim Jamshed
28876404edcSAsim Jamshed if (socket->socktype != MOS_SOCK_STREAM_LISTEN &&
28976404edcSAsim Jamshed socket->socktype != MOS_SOCK_STREAM) {
29076404edcSAsim Jamshed TRACE_API("Invalid socket id: %d\n", sockid);
29176404edcSAsim Jamshed errno = ENOTSOCK;
29276404edcSAsim Jamshed return -1;
29376404edcSAsim Jamshed }
29476404edcSAsim Jamshed break;
29576404edcSAsim Jamshed case SOL_MONSOCKET:
29676404edcSAsim Jamshed socket = &mtcp->msmap[sockid];
29776404edcSAsim Jamshed /*
29876404edcSAsim Jamshed * checking of calling thread to be in MOS context is
29976404edcSAsim Jamshed * disabled since both optnames can be called from
30076404edcSAsim Jamshed * `application' context (on passive sockets)
30176404edcSAsim Jamshed */
30276404edcSAsim Jamshed /*
30376404edcSAsim Jamshed * if (mtcp->ctx->thread != pthread_self())
30476404edcSAsim Jamshed * return -1;
30576404edcSAsim Jamshed */
30676404edcSAsim Jamshed
30776404edcSAsim Jamshed switch (optname) {
308a834ea89SAsim Jamshed case MOS_CLIOVERLAP:
309a834ea89SAsim Jamshed rb = (socket->monitor_stream->stream->side == MOS_SIDE_CLI) ?
310a834ea89SAsim Jamshed socket->monitor_stream->stream->rcvvar->rcvbuf :
311a834ea89SAsim Jamshed socket->monitor_stream->stream->pair_stream->rcvvar->rcvbuf;
312a834ea89SAsim Jamshed if (rb == NULL) {
313a834ea89SAsim Jamshed errno = EFAULT;
314a834ea89SAsim Jamshed return -1;
315a834ea89SAsim Jamshed }
316a834ea89SAsim Jamshed if (tcprb_setpolicy(rb, *(uint8_t *)optval) < 0) {
317a834ea89SAsim Jamshed errno = EINVAL;
318a834ea89SAsim Jamshed return -1;
319a834ea89SAsim Jamshed } else
320a834ea89SAsim Jamshed return 0;
321a834ea89SAsim Jamshed break;
322a834ea89SAsim Jamshed case MOS_SVROVERLAP:
323a834ea89SAsim Jamshed rb = (socket->monitor_stream->stream->side == MOS_SIDE_SVR) ?
324a834ea89SAsim Jamshed socket->monitor_stream->stream->rcvvar->rcvbuf :
325a834ea89SAsim Jamshed socket->monitor_stream->stream->pair_stream->rcvvar->rcvbuf;
326a834ea89SAsim Jamshed if (rb == NULL) {
327a834ea89SAsim Jamshed errno = EFAULT;
328a834ea89SAsim Jamshed return -1;
329a834ea89SAsim Jamshed }
330a834ea89SAsim Jamshed if (tcprb_setpolicy(rb, *(uint8_t *)optval) < 0) {
331a834ea89SAsim Jamshed errno = EINVAL;
332a834ea89SAsim Jamshed return -1;
333a834ea89SAsim Jamshed } else
334a834ea89SAsim Jamshed return 0;
335a834ea89SAsim Jamshed break;
33676404edcSAsim Jamshed case MOS_CLIBUF:
33776404edcSAsim Jamshed #if 0
33876404edcSAsim Jamshed if (socket->socktype != MOS_SOCK_MONITOR_STREAM_ACTIVE) {
33976404edcSAsim Jamshed errno = EBADF;
34076404edcSAsim Jamshed return -1;
34176404edcSAsim Jamshed }
34276404edcSAsim Jamshed #endif
34376404edcSAsim Jamshed #ifdef DISABLE_DYN_RESIZE
34476404edcSAsim Jamshed if (*(int *)optval != 0)
34576404edcSAsim Jamshed return -1;
34676404edcSAsim Jamshed if (socket->socktype == MOS_SOCK_MONITOR_STREAM_ACTIVE) {
34776404edcSAsim Jamshed rb = (socket->monitor_stream->stream->side == MOS_SIDE_CLI) ?
34876404edcSAsim Jamshed socket->monitor_stream->stream->rcvvar->rcvbuf :
34976404edcSAsim Jamshed socket->monitor_stream->stream->pair_stream->rcvvar->rcvbuf;
35076404edcSAsim Jamshed if (rb) {
35176404edcSAsim Jamshed tcprb_resize_meta(rb, 0);
35276404edcSAsim Jamshed tcprb_resize(rb, 0);
35376404edcSAsim Jamshed }
35476404edcSAsim Jamshed }
35576404edcSAsim Jamshed return DisableBuf(socket, MOS_SIDE_CLI);
35676404edcSAsim Jamshed #else
35776404edcSAsim Jamshed rb = (socket->monitor_stream->stream->side == MOS_SIDE_CLI) ?
35876404edcSAsim Jamshed socket->monitor_stream->stream->rcvvar->rcvbuf :
35976404edcSAsim Jamshed socket->monitor_stream->stream->pair_stream->rcvvar->rcvbuf;
36076404edcSAsim Jamshed if (tcprb_resize_meta(rb, *(int *)optval) < 0)
36176404edcSAsim Jamshed return -1;
36276404edcSAsim Jamshed return tcprb_resize(rb,
36376404edcSAsim Jamshed (((int)rb->metalen - 1) / UNITBUFSIZE + 1) * UNITBUFSIZE);
36476404edcSAsim Jamshed #endif
36576404edcSAsim Jamshed case MOS_SVRBUF:
36676404edcSAsim Jamshed #if 0
36776404edcSAsim Jamshed if (socket->socktype != MOS_SOCK_MONITOR_STREAM_ACTIVE) {
36876404edcSAsim Jamshed errno = EBADF;
36976404edcSAsim Jamshed return -1;
37076404edcSAsim Jamshed }
37176404edcSAsim Jamshed #endif
37276404edcSAsim Jamshed #ifdef DISABLE_DYN_RESIZE
37376404edcSAsim Jamshed if (*(int *)optval != 0)
37476404edcSAsim Jamshed return -1;
37576404edcSAsim Jamshed if (socket->socktype == MOS_SOCK_MONITOR_STREAM_ACTIVE) {
37676404edcSAsim Jamshed rb = (socket->monitor_stream->stream->side == MOS_SIDE_SVR) ?
37776404edcSAsim Jamshed socket->monitor_stream->stream->rcvvar->rcvbuf :
37876404edcSAsim Jamshed socket->monitor_stream->stream->pair_stream->rcvvar->rcvbuf;
37976404edcSAsim Jamshed if (rb) {
38076404edcSAsim Jamshed tcprb_resize_meta(rb, 0);
38176404edcSAsim Jamshed tcprb_resize(rb, 0);
38276404edcSAsim Jamshed }
38376404edcSAsim Jamshed }
38476404edcSAsim Jamshed return DisableBuf(socket, MOS_SIDE_SVR);
38576404edcSAsim Jamshed #else
38676404edcSAsim Jamshed rb = (socket->monitor_stream->stream->side == MOS_SIDE_SVR) ?
38776404edcSAsim Jamshed socket->monitor_stream->stream->rcvvar->rcvbuf :
38876404edcSAsim Jamshed socket->monitor_stream->stream->pair_stream->rcvvar->rcvbuf;
38976404edcSAsim Jamshed if (tcprb_resize_meta(rb, *(int *)optval) < 0)
39076404edcSAsim Jamshed return -1;
39176404edcSAsim Jamshed return tcprb_resize(rb,
39276404edcSAsim Jamshed (((int)rb->metalen - 1) / UNITBUFSIZE + 1) * UNITBUFSIZE);
39376404edcSAsim Jamshed #endif
39476404edcSAsim Jamshed case MOS_FRAG_CLIBUF:
39576404edcSAsim Jamshed #if 0
39676404edcSAsim Jamshed if (socket->socktype != MOS_SOCK_MONITOR_STREAM_ACTIVE) {
39776404edcSAsim Jamshed errno = EBADF;
39876404edcSAsim Jamshed return -1;
39976404edcSAsim Jamshed }
40076404edcSAsim Jamshed #endif
40176404edcSAsim Jamshed #ifdef DISABLE_DYN_RESIZE
40276404edcSAsim Jamshed if (*(int *)optval != 0)
40376404edcSAsim Jamshed return -1;
40476404edcSAsim Jamshed if (socket->socktype == MOS_SOCK_MONITOR_STREAM_ACTIVE) {
40576404edcSAsim Jamshed rb = (socket->monitor_stream->stream->side == MOS_SIDE_CLI) ?
40676404edcSAsim Jamshed socket->monitor_stream->stream->rcvvar->rcvbuf :
40776404edcSAsim Jamshed socket->monitor_stream->stream->pair_stream->rcvvar->rcvbuf;
40876404edcSAsim Jamshed if (rb)
40976404edcSAsim Jamshed tcprb_resize(rb, 0);
41076404edcSAsim Jamshed }
41176404edcSAsim Jamshed return 0;
41276404edcSAsim Jamshed #else
41376404edcSAsim Jamshed rb = (socket->monitor_stream->stream->side == MOS_SIDE_CLI) ?
41476404edcSAsim Jamshed socket->monitor_stream->stream->rcvvar->rcvbuf :
41576404edcSAsim Jamshed socket->monitor_stream->stream->pair_stream->rcvvar->rcvbuf;
41676404edcSAsim Jamshed if (rb->len == 0)
41776404edcSAsim Jamshed return tcprb_resize_meta(rb, *(int *)optval);
41876404edcSAsim Jamshed else
41976404edcSAsim Jamshed return -1;
42076404edcSAsim Jamshed #endif
42176404edcSAsim Jamshed case MOS_FRAG_SVRBUF:
42276404edcSAsim Jamshed #if 0
42376404edcSAsim Jamshed if (socket->socktype != MOS_SOCK_MONITOR_STREAM_ACTIVE) {
42476404edcSAsim Jamshed errno = EBADF;
42576404edcSAsim Jamshed return -1;
42676404edcSAsim Jamshed }
42776404edcSAsim Jamshed #endif
42876404edcSAsim Jamshed #ifdef DISABLE_DYN_RESIZE
42976404edcSAsim Jamshed if (*(int *)optval != 0)
43076404edcSAsim Jamshed return -1;
43176404edcSAsim Jamshed if (socket->socktype == MOS_SOCK_MONITOR_STREAM_ACTIVE) {
43276404edcSAsim Jamshed rb = (socket->monitor_stream->stream->side == MOS_SIDE_SVR) ?
43376404edcSAsim Jamshed socket->monitor_stream->stream->rcvvar->rcvbuf :
43476404edcSAsim Jamshed socket->monitor_stream->stream->pair_stream->rcvvar->rcvbuf;
43576404edcSAsim Jamshed if (rb)
43676404edcSAsim Jamshed tcprb_resize(rb, 0);
43776404edcSAsim Jamshed }
43876404edcSAsim Jamshed return 0;
43976404edcSAsim Jamshed #else
44076404edcSAsim Jamshed rb = (socket->monitor_stream->stream->side == MOS_SIDE_SVR) ?
44176404edcSAsim Jamshed socket->monitor_stream->stream->rcvvar->rcvbuf :
44276404edcSAsim Jamshed socket->monitor_stream->stream->pair_stream->rcvvar->rcvbuf;
44376404edcSAsim Jamshed if (rb->len == 0)
44476404edcSAsim Jamshed return tcprb_resize_meta(rb, *(int *)optval);
44576404edcSAsim Jamshed else
44676404edcSAsim Jamshed return -1;
44776404edcSAsim Jamshed #endif
44876404edcSAsim Jamshed case MOS_SEQ_REMAP:
449861ea7dfSAsim Jamshed break;
45076404edcSAsim Jamshed case MOS_STOP_MON:
45176404edcSAsim Jamshed return mtcp_cb_stop(mctx, sockid, *(int *)optval);
45276404edcSAsim Jamshed default:
45376404edcSAsim Jamshed TRACE_API("invalid optname=%d\n", optname);
45476404edcSAsim Jamshed assert(0);
45576404edcSAsim Jamshed }
45676404edcSAsim Jamshed break;
45776404edcSAsim Jamshed }
45876404edcSAsim Jamshed
45976404edcSAsim Jamshed errno = ENOSYS;
46076404edcSAsim Jamshed return -1;
46176404edcSAsim Jamshed }
46276404edcSAsim Jamshed /*----------------------------------------------------------------------------*/
46376404edcSAsim Jamshed int
mtcp_setsock_nonblock(mctx_t mctx,int sockid)46476404edcSAsim Jamshed mtcp_setsock_nonblock(mctx_t mctx, int sockid)
46576404edcSAsim Jamshed {
46676404edcSAsim Jamshed mtcp_manager_t mtcp;
46776404edcSAsim Jamshed
46876404edcSAsim Jamshed mtcp = GetMTCPManager(mctx);
46976404edcSAsim Jamshed if (!mtcp) {
47076404edcSAsim Jamshed errno = EACCES;
47176404edcSAsim Jamshed return -1;
47276404edcSAsim Jamshed }
47376404edcSAsim Jamshed
47476404edcSAsim Jamshed if (sockid < 0 || sockid >= g_config.mos->max_concurrency) {
47576404edcSAsim Jamshed TRACE_API("Socket id %d out of range.\n", sockid);
47676404edcSAsim Jamshed errno = EBADF;
47776404edcSAsim Jamshed return -1;
47876404edcSAsim Jamshed }
47976404edcSAsim Jamshed
48076404edcSAsim Jamshed if (mtcp->smap[sockid].socktype == MOS_SOCK_UNUSED) {
48176404edcSAsim Jamshed TRACE_API("Invalid socket id: %d\n", sockid);
48276404edcSAsim Jamshed errno = EBADF;
48376404edcSAsim Jamshed return -1;
48476404edcSAsim Jamshed }
48576404edcSAsim Jamshed
48676404edcSAsim Jamshed mtcp->smap[sockid].opts |= MTCP_NONBLOCK;
48776404edcSAsim Jamshed
48876404edcSAsim Jamshed return 0;
48976404edcSAsim Jamshed }
49076404edcSAsim Jamshed /*----------------------------------------------------------------------------*/
49176404edcSAsim Jamshed int
mtcp_ioctl(mctx_t mctx,int sockid,int request,void * argp)49276404edcSAsim Jamshed mtcp_ioctl(mctx_t mctx, int sockid, int request, void *argp)
49376404edcSAsim Jamshed {
49476404edcSAsim Jamshed mtcp_manager_t mtcp;
49576404edcSAsim Jamshed socket_map_t socket;
49676404edcSAsim Jamshed
49776404edcSAsim Jamshed mtcp = GetMTCPManager(mctx);
49876404edcSAsim Jamshed if (!mtcp) {
49976404edcSAsim Jamshed errno = EACCES;
50076404edcSAsim Jamshed return -1;
50176404edcSAsim Jamshed }
50276404edcSAsim Jamshed
50376404edcSAsim Jamshed if (sockid < 0 || sockid >= g_config.mos->max_concurrency) {
50476404edcSAsim Jamshed TRACE_API("Socket id %d out of range.\n", sockid);
50576404edcSAsim Jamshed errno = EBADF;
50676404edcSAsim Jamshed return -1;
50776404edcSAsim Jamshed }
50876404edcSAsim Jamshed
50976404edcSAsim Jamshed /* only support stream socket */
51076404edcSAsim Jamshed socket = &mtcp->smap[sockid];
51176404edcSAsim Jamshed
51276404edcSAsim Jamshed if (socket->socktype != MOS_SOCK_STREAM_LISTEN &&
51376404edcSAsim Jamshed socket->socktype != MOS_SOCK_STREAM) {
51476404edcSAsim Jamshed TRACE_API("Invalid socket id: %d\n", sockid);
51576404edcSAsim Jamshed errno = EBADF;
51676404edcSAsim Jamshed return -1;
51776404edcSAsim Jamshed }
51876404edcSAsim Jamshed
51976404edcSAsim Jamshed if (!argp) {
52076404edcSAsim Jamshed errno = EFAULT;
52176404edcSAsim Jamshed return -1;
52276404edcSAsim Jamshed }
52376404edcSAsim Jamshed
52476404edcSAsim Jamshed if (request == FIONREAD) {
52576404edcSAsim Jamshed tcp_stream *cur_stream;
52676404edcSAsim Jamshed tcprb_t *rbuf;
52776404edcSAsim Jamshed
52876404edcSAsim Jamshed cur_stream = socket->stream;
52976404edcSAsim Jamshed if (!cur_stream) {
53076404edcSAsim Jamshed errno = EBADF;
53176404edcSAsim Jamshed return -1;
53276404edcSAsim Jamshed }
53376404edcSAsim Jamshed
53476404edcSAsim Jamshed rbuf = cur_stream->rcvvar->rcvbuf;
53576404edcSAsim Jamshed *(int *)argp = (rbuf) ? tcprb_cflen(rbuf) : 0;
53676404edcSAsim Jamshed
53776404edcSAsim Jamshed } else if (request == FIONBIO) {
53876404edcSAsim Jamshed /*
53976404edcSAsim Jamshed * sockets can only be set to blocking/non-blocking
54076404edcSAsim Jamshed * modes during initialization
54176404edcSAsim Jamshed */
54276404edcSAsim Jamshed if ((*(int *)argp))
54376404edcSAsim Jamshed mtcp->smap[sockid].opts |= MTCP_NONBLOCK;
54476404edcSAsim Jamshed else
54576404edcSAsim Jamshed mtcp->smap[sockid].opts &= ~MTCP_NONBLOCK;
54676404edcSAsim Jamshed } else {
54776404edcSAsim Jamshed errno = EINVAL;
54876404edcSAsim Jamshed return -1;
54976404edcSAsim Jamshed }
55076404edcSAsim Jamshed
55176404edcSAsim Jamshed return 0;
55276404edcSAsim Jamshed }
55376404edcSAsim Jamshed /*----------------------------------------------------------------------------*/
55476404edcSAsim Jamshed static int
mtcp_monitor(mctx_t mctx,socket_map_t sock)55576404edcSAsim Jamshed mtcp_monitor(mctx_t mctx, socket_map_t sock)
55676404edcSAsim Jamshed {
55776404edcSAsim Jamshed mtcp_manager_t mtcp;
55876404edcSAsim Jamshed struct mon_listener *monitor;
55976404edcSAsim Jamshed int sockid = sock->id;
56076404edcSAsim Jamshed
56176404edcSAsim Jamshed mtcp = GetMTCPManager(mctx);
56276404edcSAsim Jamshed if (!mtcp) {
56376404edcSAsim Jamshed errno = EACCES;
56476404edcSAsim Jamshed return -1;
56576404edcSAsim Jamshed }
56676404edcSAsim Jamshed
56776404edcSAsim Jamshed if (sockid < 0 || sockid >= g_config.mos->max_concurrency) {
56876404edcSAsim Jamshed TRACE_API("Socket id %d out of range.\n", sockid);
56976404edcSAsim Jamshed errno = EBADF;
57076404edcSAsim Jamshed return -1;
57176404edcSAsim Jamshed }
57276404edcSAsim Jamshed
57376404edcSAsim Jamshed if (mtcp->msmap[sockid].socktype == MOS_SOCK_UNUSED) {
57476404edcSAsim Jamshed TRACE_API("Invalid socket id: %d\n", sockid);
57576404edcSAsim Jamshed errno = EBADF;
57676404edcSAsim Jamshed return -1;
57776404edcSAsim Jamshed }
57876404edcSAsim Jamshed
57976404edcSAsim Jamshed if (!(mtcp->msmap[sockid].socktype == MOS_SOCK_MONITOR_STREAM ||
58076404edcSAsim Jamshed mtcp->msmap[sockid].socktype == MOS_SOCK_MONITOR_RAW)) {
58176404edcSAsim Jamshed TRACE_API("Not a monitor socket. id: %d\n", sockid);
58276404edcSAsim Jamshed errno = ENOTSOCK;
58376404edcSAsim Jamshed return -1;
58476404edcSAsim Jamshed }
58576404edcSAsim Jamshed
58676404edcSAsim Jamshed monitor = (struct mon_listener *)calloc(1, sizeof(struct mon_listener));
58776404edcSAsim Jamshed if (!monitor) {
58876404edcSAsim Jamshed /* errno set from the malloc() */
58976404edcSAsim Jamshed errno = ENOMEM;
59076404edcSAsim Jamshed return -1;
59176404edcSAsim Jamshed }
59276404edcSAsim Jamshed
59376404edcSAsim Jamshed /* create a monitor-specific event queue */
59476404edcSAsim Jamshed monitor->eq = CreateEventQueue(g_config.mos->max_concurrency);
59576404edcSAsim Jamshed if (!monitor->eq) {
59676404edcSAsim Jamshed TRACE_API("Can't create event queue (concurrency: %d) for "
59776404edcSAsim Jamshed "monitor read event registrations!\n",
59876404edcSAsim Jamshed g_config.mos->max_concurrency);
59976404edcSAsim Jamshed free(monitor);
60076404edcSAsim Jamshed errno = ENOMEM;
60176404edcSAsim Jamshed return -1;
60276404edcSAsim Jamshed }
60376404edcSAsim Jamshed
60476404edcSAsim Jamshed /* set monitor-related basic parameters */
60576404edcSAsim Jamshed #ifndef NEWEV
60676404edcSAsim Jamshed monitor->ude_id = UDE_OFFSET;
60776404edcSAsim Jamshed #endif
60876404edcSAsim Jamshed monitor->socket = sock;
60976404edcSAsim Jamshed monitor->client_buf_mgmt = monitor->server_buf_mgmt = BUFMGMT_FULL;
61076404edcSAsim Jamshed
61176404edcSAsim Jamshed /* perform both sides monitoring by default */
61276404edcSAsim Jamshed monitor->client_mon = monitor->server_mon = 1;
61376404edcSAsim Jamshed
61476404edcSAsim Jamshed /* add monitor socket to the monitor list */
61576404edcSAsim Jamshed TAILQ_INSERT_TAIL(&mtcp->monitors, monitor, link);
61676404edcSAsim Jamshed
61776404edcSAsim Jamshed mtcp->msmap[sockid].monitor_listener = monitor;
61876404edcSAsim Jamshed
61976404edcSAsim Jamshed return 0;
62076404edcSAsim Jamshed }
62176404edcSAsim Jamshed /*----------------------------------------------------------------------------*/
62276404edcSAsim Jamshed int
mtcp_socket(mctx_t mctx,int domain,int type,int protocol)62376404edcSAsim Jamshed mtcp_socket(mctx_t mctx, int domain, int type, int protocol)
62476404edcSAsim Jamshed {
62576404edcSAsim Jamshed mtcp_manager_t mtcp;
62676404edcSAsim Jamshed socket_map_t socket;
62776404edcSAsim Jamshed
62876404edcSAsim Jamshed mtcp = GetMTCPManager(mctx);
62976404edcSAsim Jamshed if (!mtcp) {
63076404edcSAsim Jamshed errno = EACCES;
63176404edcSAsim Jamshed return -1;
63276404edcSAsim Jamshed }
63376404edcSAsim Jamshed
63476404edcSAsim Jamshed if (domain != AF_INET) {
63576404edcSAsim Jamshed errno = EAFNOSUPPORT;
63676404edcSAsim Jamshed return -1;
63776404edcSAsim Jamshed }
63876404edcSAsim Jamshed
639dcdbbb98SAsim Jamshed if (type == (int)SOCK_STREAM) {
64076404edcSAsim Jamshed type = MOS_SOCK_STREAM;
64176404edcSAsim Jamshed } else if (type == MOS_SOCK_MONITOR_STREAM ||
64276404edcSAsim Jamshed type == MOS_SOCK_MONITOR_RAW) {
64376404edcSAsim Jamshed /* do nothing for the time being */
64476404edcSAsim Jamshed } else {
64576404edcSAsim Jamshed /* Not supported type */
64676404edcSAsim Jamshed errno = EINVAL;
64776404edcSAsim Jamshed return -1;
64876404edcSAsim Jamshed }
64976404edcSAsim Jamshed
65076404edcSAsim Jamshed socket = AllocateSocket(mctx, type);
65176404edcSAsim Jamshed if (!socket) {
65276404edcSAsim Jamshed errno = ENFILE;
65376404edcSAsim Jamshed return -1;
65476404edcSAsim Jamshed }
65576404edcSAsim Jamshed
65676404edcSAsim Jamshed if (type == MOS_SOCK_MONITOR_STREAM ||
65776404edcSAsim Jamshed type == MOS_SOCK_MONITOR_RAW) {
65876404edcSAsim Jamshed mtcp_manager_t mtcp = GetMTCPManager(mctx);
65976404edcSAsim Jamshed if (!mtcp) {
66076404edcSAsim Jamshed errno = EACCES;
66176404edcSAsim Jamshed return -1;
66276404edcSAsim Jamshed }
66376404edcSAsim Jamshed mtcp_monitor(mctx, socket);
66476404edcSAsim Jamshed #ifdef NEWEV
66576404edcSAsim Jamshed socket->monitor_listener->stree_dontcare = NULL;
66676404edcSAsim Jamshed socket->monitor_listener->stree_pre_rcv = NULL;
66776404edcSAsim Jamshed socket->monitor_listener->stree_post_snd = NULL;
66876404edcSAsim Jamshed #else
66976404edcSAsim Jamshed InitEvB(mtcp, &socket->monitor_listener->dontcare_evb);
67076404edcSAsim Jamshed InitEvB(mtcp, &socket->monitor_listener->pre_tcp_evb);
67176404edcSAsim Jamshed InitEvB(mtcp, &socket->monitor_listener->post_tcp_evb);
67276404edcSAsim Jamshed #endif
67376404edcSAsim Jamshed }
67476404edcSAsim Jamshed
67576404edcSAsim Jamshed return socket->id;
67676404edcSAsim Jamshed }
67776404edcSAsim Jamshed /*----------------------------------------------------------------------------*/
67876404edcSAsim Jamshed int
mtcp_bind(mctx_t mctx,int sockid,const struct sockaddr * addr,socklen_t addrlen)67976404edcSAsim Jamshed mtcp_bind(mctx_t mctx, int sockid,
68076404edcSAsim Jamshed const struct sockaddr *addr, socklen_t addrlen)
68176404edcSAsim Jamshed {
68276404edcSAsim Jamshed mtcp_manager_t mtcp;
68376404edcSAsim Jamshed struct sockaddr_in *addr_in;
68476404edcSAsim Jamshed
68576404edcSAsim Jamshed mtcp = GetMTCPManager(mctx);
68676404edcSAsim Jamshed if (!mtcp) {
68776404edcSAsim Jamshed errno = EACCES;
68876404edcSAsim Jamshed return -1;
68976404edcSAsim Jamshed }
69076404edcSAsim Jamshed
69176404edcSAsim Jamshed if (sockid < 0 || sockid >= g_config.mos->max_concurrency) {
69276404edcSAsim Jamshed TRACE_API("Socket id %d out of range.\n", sockid);
69376404edcSAsim Jamshed errno = EBADF;
69476404edcSAsim Jamshed return -1;
69576404edcSAsim Jamshed }
69676404edcSAsim Jamshed
69776404edcSAsim Jamshed if (mtcp->smap[sockid].socktype == MOS_SOCK_UNUSED) {
69876404edcSAsim Jamshed TRACE_API("Invalid socket id: %d\n", sockid);
69976404edcSAsim Jamshed errno = EBADF;
70076404edcSAsim Jamshed return -1;
70176404edcSAsim Jamshed }
70276404edcSAsim Jamshed
70376404edcSAsim Jamshed if (mtcp->smap[sockid].socktype != MOS_SOCK_STREAM &&
70476404edcSAsim Jamshed mtcp->smap[sockid].socktype != MOS_SOCK_STREAM_LISTEN) {
70576404edcSAsim Jamshed TRACE_API("Not a stream socket id: %d\n", sockid);
70676404edcSAsim Jamshed errno = ENOTSOCK;
70776404edcSAsim Jamshed return -1;
70876404edcSAsim Jamshed }
70976404edcSAsim Jamshed
71076404edcSAsim Jamshed if (!addr) {
71176404edcSAsim Jamshed TRACE_API("Socket %d: empty address!\n", sockid);
71276404edcSAsim Jamshed errno = EINVAL;
71376404edcSAsim Jamshed return -1;
71476404edcSAsim Jamshed }
71576404edcSAsim Jamshed
71676404edcSAsim Jamshed if (mtcp->smap[sockid].opts & MTCP_ADDR_BIND) {
71776404edcSAsim Jamshed TRACE_API("Socket %d: adress already bind for this socket.\n", sockid);
71876404edcSAsim Jamshed errno = EINVAL;
71976404edcSAsim Jamshed return -1;
72076404edcSAsim Jamshed }
72176404edcSAsim Jamshed
72276404edcSAsim Jamshed /* we only allow bind() for AF_INET address */
72376404edcSAsim Jamshed if (addr->sa_family != AF_INET || addrlen < sizeof(struct sockaddr_in)) {
72476404edcSAsim Jamshed TRACE_API("Socket %d: invalid argument!\n", sockid);
72576404edcSAsim Jamshed errno = EINVAL;
72676404edcSAsim Jamshed return -1;
72776404edcSAsim Jamshed }
72876404edcSAsim Jamshed
72976404edcSAsim Jamshed if (mtcp->listener) {
73076404edcSAsim Jamshed TRACE_API("Address already bound!\n");
73176404edcSAsim Jamshed errno = EINVAL;
73276404edcSAsim Jamshed return -1;
73376404edcSAsim Jamshed }
73476404edcSAsim Jamshed addr_in = (struct sockaddr_in *)addr;
73576404edcSAsim Jamshed mtcp->smap[sockid].saddr = *addr_in;
73676404edcSAsim Jamshed mtcp->smap[sockid].opts |= MTCP_ADDR_BIND;
73776404edcSAsim Jamshed
73876404edcSAsim Jamshed return 0;
73976404edcSAsim Jamshed }
74076404edcSAsim Jamshed /*----------------------------------------------------------------------------*/
74176404edcSAsim Jamshed int
mtcp_listen(mctx_t mctx,int sockid,int backlog)74276404edcSAsim Jamshed mtcp_listen(mctx_t mctx, int sockid, int backlog)
74376404edcSAsim Jamshed {
74476404edcSAsim Jamshed mtcp_manager_t mtcp;
74576404edcSAsim Jamshed struct tcp_listener *listener;
74676404edcSAsim Jamshed
74776404edcSAsim Jamshed mtcp = GetMTCPManager(mctx);
74876404edcSAsim Jamshed if (!mtcp) {
74976404edcSAsim Jamshed errno = EACCES;
75076404edcSAsim Jamshed return -1;
75176404edcSAsim Jamshed }
75276404edcSAsim Jamshed
75376404edcSAsim Jamshed if (sockid < 0 || sockid >= g_config.mos->max_concurrency) {
75476404edcSAsim Jamshed TRACE_API("Socket id %d out of range.\n", sockid);
75576404edcSAsim Jamshed errno = EBADF;
75676404edcSAsim Jamshed return -1;
75776404edcSAsim Jamshed }
75876404edcSAsim Jamshed
75976404edcSAsim Jamshed if (mtcp->smap[sockid].socktype == MOS_SOCK_UNUSED) {
76076404edcSAsim Jamshed TRACE_API("Invalid socket id: %d\n", sockid);
76176404edcSAsim Jamshed errno = EBADF;
76276404edcSAsim Jamshed return -1;
76376404edcSAsim Jamshed }
76476404edcSAsim Jamshed
76576404edcSAsim Jamshed if (mtcp->smap[sockid].socktype == MOS_SOCK_STREAM) {
76676404edcSAsim Jamshed mtcp->smap[sockid].socktype = MOS_SOCK_STREAM_LISTEN;
76776404edcSAsim Jamshed }
76876404edcSAsim Jamshed
76976404edcSAsim Jamshed if (mtcp->smap[sockid].socktype != MOS_SOCK_STREAM_LISTEN) {
77076404edcSAsim Jamshed TRACE_API("Not a listening socket. id: %d\n", sockid);
77176404edcSAsim Jamshed errno = ENOTSOCK;
77276404edcSAsim Jamshed return -1;
77376404edcSAsim Jamshed }
77476404edcSAsim Jamshed
77576404edcSAsim Jamshed if (backlog <= 0 || backlog > g_config.mos->max_concurrency) {
77676404edcSAsim Jamshed errno = EINVAL;
77776404edcSAsim Jamshed return -1;
77876404edcSAsim Jamshed }
77976404edcSAsim Jamshed
78076404edcSAsim Jamshed listener = (struct tcp_listener *)calloc(1, sizeof(struct tcp_listener));
78176404edcSAsim Jamshed if (!listener) {
78276404edcSAsim Jamshed /* errno set from the malloc() */
78376404edcSAsim Jamshed errno = ENOMEM;
78476404edcSAsim Jamshed return -1;
78576404edcSAsim Jamshed }
78676404edcSAsim Jamshed
78776404edcSAsim Jamshed listener->sockid = sockid;
78876404edcSAsim Jamshed listener->backlog = backlog;
78976404edcSAsim Jamshed listener->socket = &mtcp->smap[sockid];
79076404edcSAsim Jamshed
79176404edcSAsim Jamshed if (pthread_cond_init(&listener->accept_cond, NULL)) {
79276404edcSAsim Jamshed perror("pthread_cond_init of ctx->accept_cond\n");
79376404edcSAsim Jamshed /* errno set by pthread_cond_init() */
794c6a5549bSAsim Jamshed free(listener);
79576404edcSAsim Jamshed return -1;
79676404edcSAsim Jamshed }
79776404edcSAsim Jamshed if (pthread_mutex_init(&listener->accept_lock, NULL)) {
79876404edcSAsim Jamshed perror("pthread_mutex_init of ctx->accept_lock\n");
79976404edcSAsim Jamshed /* errno set by pthread_mutex_init() */
800c6a5549bSAsim Jamshed free(listener);
80176404edcSAsim Jamshed return -1;
80276404edcSAsim Jamshed }
80376404edcSAsim Jamshed
80476404edcSAsim Jamshed listener->acceptq = CreateStreamQueue(backlog);
80576404edcSAsim Jamshed if (!listener->acceptq) {
8068a941c7eSAsim Jamshed free(listener);
80776404edcSAsim Jamshed errno = ENOMEM;
80876404edcSAsim Jamshed return -1;
80976404edcSAsim Jamshed }
81076404edcSAsim Jamshed
81176404edcSAsim Jamshed mtcp->smap[sockid].listener = listener;
81276404edcSAsim Jamshed mtcp->listener = listener;
81376404edcSAsim Jamshed
81476404edcSAsim Jamshed return 0;
81576404edcSAsim Jamshed }
81676404edcSAsim Jamshed /*----------------------------------------------------------------------------*/
81776404edcSAsim Jamshed int
mtcp_accept(mctx_t mctx,int sockid,struct sockaddr * addr,socklen_t * addrlen)81876404edcSAsim Jamshed mtcp_accept(mctx_t mctx, int sockid, struct sockaddr *addr, socklen_t *addrlen)
81976404edcSAsim Jamshed {
82076404edcSAsim Jamshed mtcp_manager_t mtcp;
82176404edcSAsim Jamshed struct tcp_listener *listener;
82276404edcSAsim Jamshed socket_map_t socket;
82376404edcSAsim Jamshed tcp_stream *accepted = NULL;
82476404edcSAsim Jamshed
82576404edcSAsim Jamshed mtcp = GetMTCPManager(mctx);
82676404edcSAsim Jamshed if (!mtcp) {
82776404edcSAsim Jamshed errno = EACCES;
82876404edcSAsim Jamshed return -1;
82976404edcSAsim Jamshed }
83076404edcSAsim Jamshed
83176404edcSAsim Jamshed if (sockid < 0 || sockid >= g_config.mos->max_concurrency) {
83276404edcSAsim Jamshed TRACE_API("Socket id %d out of range.\n", sockid);
83376404edcSAsim Jamshed errno = EBADF;
83476404edcSAsim Jamshed return -1;
83576404edcSAsim Jamshed }
83676404edcSAsim Jamshed
83776404edcSAsim Jamshed /* requires listening socket */
83876404edcSAsim Jamshed if (mtcp->smap[sockid].socktype != MOS_SOCK_STREAM_LISTEN) {
83976404edcSAsim Jamshed errno = EINVAL;
84076404edcSAsim Jamshed return -1;
84176404edcSAsim Jamshed }
84276404edcSAsim Jamshed
84376404edcSAsim Jamshed listener = mtcp->smap[sockid].listener;
84476404edcSAsim Jamshed
84576404edcSAsim Jamshed /* dequeue from the acceptq without lock first */
84676404edcSAsim Jamshed /* if nothing there, acquire lock and cond_wait */
84776404edcSAsim Jamshed accepted = StreamDequeue(listener->acceptq);
84876404edcSAsim Jamshed if (!accepted) {
84976404edcSAsim Jamshed if (listener->socket->opts & MTCP_NONBLOCK) {
85076404edcSAsim Jamshed errno = EAGAIN;
85176404edcSAsim Jamshed return -1;
85276404edcSAsim Jamshed
85376404edcSAsim Jamshed } else {
85476404edcSAsim Jamshed pthread_mutex_lock(&listener->accept_lock);
85576404edcSAsim Jamshed while ((accepted = StreamDequeue(listener->acceptq)) == NULL) {
85676404edcSAsim Jamshed pthread_cond_wait(&listener->accept_cond, &listener->accept_lock);
85776404edcSAsim Jamshed
85876404edcSAsim Jamshed if (mtcp->ctx->done || mtcp->ctx->exit) {
85976404edcSAsim Jamshed pthread_mutex_unlock(&listener->accept_lock);
86076404edcSAsim Jamshed errno = EINTR;
86176404edcSAsim Jamshed return -1;
86276404edcSAsim Jamshed }
86376404edcSAsim Jamshed }
86476404edcSAsim Jamshed pthread_mutex_unlock(&listener->accept_lock);
86576404edcSAsim Jamshed }
86676404edcSAsim Jamshed }
86776404edcSAsim Jamshed
86876404edcSAsim Jamshed if (!accepted) {
86976404edcSAsim Jamshed TRACE_ERROR("[NEVER HAPPEN] Empty accept queue!\n");
87076404edcSAsim Jamshed }
87176404edcSAsim Jamshed
87276404edcSAsim Jamshed if (!accepted->socket) {
87376404edcSAsim Jamshed socket = AllocateSocket(mctx, MOS_SOCK_STREAM);
87476404edcSAsim Jamshed if (!socket) {
87576404edcSAsim Jamshed TRACE_ERROR("Failed to create new socket!\n");
87676404edcSAsim Jamshed /* TODO: destroy the stream */
87776404edcSAsim Jamshed errno = ENFILE;
87876404edcSAsim Jamshed return -1;
87976404edcSAsim Jamshed }
88076404edcSAsim Jamshed socket->stream = accepted;
88176404edcSAsim Jamshed accepted->socket = socket;
882a5e1a556SAsim Jamshed
883a5e1a556SAsim Jamshed /* set socket addr parameters */
884a5e1a556SAsim Jamshed socket->saddr.sin_family = AF_INET;
885a5e1a556SAsim Jamshed socket->saddr.sin_port = accepted->dport;
886a5e1a556SAsim Jamshed socket->saddr.sin_addr.s_addr = accepted->daddr;
887a5e1a556SAsim Jamshed
88876404edcSAsim Jamshed /* if monitor is enabled, complete the socket assignment */
88976404edcSAsim Jamshed if (socket->stream->pair_stream != NULL)
89076404edcSAsim Jamshed socket->stream->pair_stream->socket = socket;
89176404edcSAsim Jamshed }
89276404edcSAsim Jamshed
893a5e1a556SAsim Jamshed if (!(listener->socket->epoll & MOS_EPOLLET) &&
894a5e1a556SAsim Jamshed !StreamQueueIsEmpty(listener->acceptq))
895a5e1a556SAsim Jamshed AddEpollEvent(mtcp->ep,
896a5e1a556SAsim Jamshed USR_SHADOW_EVENT_QUEUE,
897a5e1a556SAsim Jamshed listener->socket, MOS_EPOLLIN);
898a5e1a556SAsim Jamshed
89976404edcSAsim Jamshed TRACE_API("Stream %d accepted.\n", accepted->id);
90076404edcSAsim Jamshed
90176404edcSAsim Jamshed if (addr && addrlen) {
90276404edcSAsim Jamshed struct sockaddr_in *addr_in = (struct sockaddr_in *)addr;
90376404edcSAsim Jamshed addr_in->sin_family = AF_INET;
90476404edcSAsim Jamshed addr_in->sin_port = accepted->dport;
90576404edcSAsim Jamshed addr_in->sin_addr.s_addr = accepted->daddr;
90676404edcSAsim Jamshed *addrlen = sizeof(struct sockaddr_in);
90776404edcSAsim Jamshed }
90876404edcSAsim Jamshed
90976404edcSAsim Jamshed return accepted->socket->id;
91076404edcSAsim Jamshed }
91176404edcSAsim Jamshed /*----------------------------------------------------------------------------*/
91276404edcSAsim Jamshed int
mtcp_init_rss(mctx_t mctx,in_addr_t saddr_base,int num_addr,in_addr_t daddr,in_addr_t dport)91376404edcSAsim Jamshed mtcp_init_rss(mctx_t mctx, in_addr_t saddr_base, int num_addr,
91476404edcSAsim Jamshed in_addr_t daddr, in_addr_t dport)
91576404edcSAsim Jamshed {
91676404edcSAsim Jamshed mtcp_manager_t mtcp;
91776404edcSAsim Jamshed addr_pool_t ap;
91876404edcSAsim Jamshed
91976404edcSAsim Jamshed mtcp = GetMTCPManager(mctx);
92076404edcSAsim Jamshed if (!mtcp) {
92176404edcSAsim Jamshed errno = EACCES;
92276404edcSAsim Jamshed return -1;
92376404edcSAsim Jamshed }
92476404edcSAsim Jamshed
92576404edcSAsim Jamshed if (saddr_base == INADDR_ANY) {
92676404edcSAsim Jamshed int nif_out;
92776404edcSAsim Jamshed
92876404edcSAsim Jamshed /* for the INADDR_ANY, find the output interface for the destination
92976404edcSAsim Jamshed and set the saddr_base as the ip address of the output interface */
93076404edcSAsim Jamshed nif_out = GetOutputInterface(daddr);
9318a941c7eSAsim Jamshed if (nif_out < 0) {
9328a941c7eSAsim Jamshed TRACE_DBG("Could not determine nif idx!\n");
9338a941c7eSAsim Jamshed errno = EINVAL;
9348a941c7eSAsim Jamshed return -1;
9358a941c7eSAsim Jamshed }
93676404edcSAsim Jamshed saddr_base = g_config.mos->netdev_table->ent[nif_out]->ip_addr;
93776404edcSAsim Jamshed }
93876404edcSAsim Jamshed
93976404edcSAsim Jamshed ap = CreateAddressPoolPerCore(mctx->cpu, num_cpus,
94076404edcSAsim Jamshed saddr_base, num_addr, daddr, dport);
94176404edcSAsim Jamshed if (!ap) {
94276404edcSAsim Jamshed errno = ENOMEM;
94376404edcSAsim Jamshed return -1;
94476404edcSAsim Jamshed }
94576404edcSAsim Jamshed
94676404edcSAsim Jamshed mtcp->ap = ap;
94776404edcSAsim Jamshed
94876404edcSAsim Jamshed return 0;
94976404edcSAsim Jamshed }
95076404edcSAsim Jamshed /*----------------------------------------------------------------------------*/
95176404edcSAsim Jamshed int
eval_bpf_5tuple(struct sfbpf_program fcode,in_addr_t saddr,in_port_t sport,in_addr_t daddr,in_port_t dport)95276404edcSAsim Jamshed eval_bpf_5tuple(struct sfbpf_program fcode,
95376404edcSAsim Jamshed in_addr_t saddr, in_port_t sport,
95476404edcSAsim Jamshed in_addr_t daddr, in_port_t dport) {
95576404edcSAsim Jamshed uint8_t buf[TOTAL_TCP_HEADER_LEN];
95676404edcSAsim Jamshed struct ethhdr *ethh;
95776404edcSAsim Jamshed struct iphdr *iph;
95876404edcSAsim Jamshed struct tcphdr *tcph;
95976404edcSAsim Jamshed
96076404edcSAsim Jamshed ethh = (struct ethhdr *)buf;
96176404edcSAsim Jamshed ethh->h_proto = htons(ETH_P_IP);
96276404edcSAsim Jamshed iph = (struct iphdr *)(ethh + 1);
96376404edcSAsim Jamshed iph->ihl = IP_HEADER_LEN >> 2;
96476404edcSAsim Jamshed iph->version = 4;
96576404edcSAsim Jamshed iph->tos = 0;
96676404edcSAsim Jamshed iph->tot_len = htons(IP_HEADER_LEN + TCP_HEADER_LEN);
96776404edcSAsim Jamshed iph->id = htons(0);
96876404edcSAsim Jamshed iph->protocol = IPPROTO_TCP;
96976404edcSAsim Jamshed iph->saddr = saddr;
97076404edcSAsim Jamshed iph->daddr = daddr;
97176404edcSAsim Jamshed iph->check = 0;
97276404edcSAsim Jamshed tcph = (struct tcphdr *)(iph + 1);
97376404edcSAsim Jamshed tcph->source = sport;
97476404edcSAsim Jamshed tcph->dest = dport;
97576404edcSAsim Jamshed
97676404edcSAsim Jamshed return EVAL_BPFFILTER(fcode, (uint8_t *)iph - sizeof(struct ethhdr),
97776404edcSAsim Jamshed TOTAL_TCP_HEADER_LEN);
97876404edcSAsim Jamshed }
97976404edcSAsim Jamshed /*----------------------------------------------------------------------------*/
98076404edcSAsim Jamshed int
mtcp_connect(mctx_t mctx,int sockid,const struct sockaddr * addr,socklen_t addrlen)98176404edcSAsim Jamshed mtcp_connect(mctx_t mctx, int sockid,
98276404edcSAsim Jamshed const struct sockaddr *addr, socklen_t addrlen)
98376404edcSAsim Jamshed {
98476404edcSAsim Jamshed mtcp_manager_t mtcp;
98576404edcSAsim Jamshed socket_map_t socket;
98676404edcSAsim Jamshed tcp_stream *cur_stream;
98776404edcSAsim Jamshed struct sockaddr_in *addr_in;
98876404edcSAsim Jamshed in_addr_t dip;
98976404edcSAsim Jamshed in_port_t dport;
99076404edcSAsim Jamshed int is_dyn_bound = FALSE;
991*8c9e1184SAsim Jamshed int ret, nif;
99276404edcSAsim Jamshed int cnt_match = 0;
99376404edcSAsim Jamshed struct mon_listener *walk;
99476404edcSAsim Jamshed struct sfbpf_program fcode;
99576404edcSAsim Jamshed
99676404edcSAsim Jamshed cur_stream = NULL;
99776404edcSAsim Jamshed mtcp = GetMTCPManager(mctx);
99876404edcSAsim Jamshed if (!mtcp) {
99976404edcSAsim Jamshed errno = EACCES;
100076404edcSAsim Jamshed return -1;
100176404edcSAsim Jamshed }
100276404edcSAsim Jamshed
100376404edcSAsim Jamshed if (sockid < 0 || sockid >= g_config.mos->max_concurrency) {
100476404edcSAsim Jamshed TRACE_API("Socket id %d out of range.\n", sockid);
100576404edcSAsim Jamshed errno = EBADF;
100676404edcSAsim Jamshed return -1;
100776404edcSAsim Jamshed }
100876404edcSAsim Jamshed
100976404edcSAsim Jamshed if (mtcp->smap[sockid].socktype == MOS_SOCK_UNUSED) {
101076404edcSAsim Jamshed TRACE_API("Invalid socket id: %d\n", sockid);
101176404edcSAsim Jamshed errno = EBADF;
101276404edcSAsim Jamshed return -1;
101376404edcSAsim Jamshed }
101476404edcSAsim Jamshed
101576404edcSAsim Jamshed if (mtcp->smap[sockid].socktype != MOS_SOCK_STREAM) {
101676404edcSAsim Jamshed TRACE_API("Not an end socket. id: %d\n", sockid);
101776404edcSAsim Jamshed errno = ENOTSOCK;
101876404edcSAsim Jamshed return -1;
101976404edcSAsim Jamshed }
102076404edcSAsim Jamshed
102176404edcSAsim Jamshed if (!addr) {
102276404edcSAsim Jamshed TRACE_API("Socket %d: empty address!\n", sockid);
102376404edcSAsim Jamshed errno = EFAULT;
102476404edcSAsim Jamshed return -1;
102576404edcSAsim Jamshed }
102676404edcSAsim Jamshed
102776404edcSAsim Jamshed /* we only allow bind() for AF_INET address */
102876404edcSAsim Jamshed if (addr->sa_family != AF_INET || addrlen < sizeof(struct sockaddr_in)) {
102976404edcSAsim Jamshed TRACE_API("Socket %d: invalid argument!\n", sockid);
103076404edcSAsim Jamshed errno = EAFNOSUPPORT;
103176404edcSAsim Jamshed return -1;
103276404edcSAsim Jamshed }
103376404edcSAsim Jamshed
103476404edcSAsim Jamshed socket = &mtcp->smap[sockid];
103576404edcSAsim Jamshed if (socket->stream) {
103676404edcSAsim Jamshed TRACE_API("Socket %d: stream already exist!\n", sockid);
103776404edcSAsim Jamshed if (socket->stream->state >= TCP_ST_ESTABLISHED) {
103876404edcSAsim Jamshed errno = EISCONN;
103976404edcSAsim Jamshed } else {
104076404edcSAsim Jamshed errno = EALREADY;
104176404edcSAsim Jamshed }
104276404edcSAsim Jamshed return -1;
104376404edcSAsim Jamshed }
104476404edcSAsim Jamshed
104576404edcSAsim Jamshed addr_in = (struct sockaddr_in *)addr;
104676404edcSAsim Jamshed dip = addr_in->sin_addr.s_addr;
104776404edcSAsim Jamshed dport = addr_in->sin_port;
104876404edcSAsim Jamshed
104976404edcSAsim Jamshed /* address binding */
105076404edcSAsim Jamshed if (socket->opts & MTCP_ADDR_BIND &&
105176404edcSAsim Jamshed socket->saddr.sin_port != INPORT_ANY &&
105276404edcSAsim Jamshed socket->saddr.sin_addr.s_addr != INADDR_ANY) {
105376404edcSAsim Jamshed int rss_core;
105476404edcSAsim Jamshed
105576404edcSAsim Jamshed rss_core = GetRSSCPUCore(socket->saddr.sin_addr.s_addr, dip,
1056*8c9e1184SAsim Jamshed socket->saddr.sin_port, dport, num_queues);
105776404edcSAsim Jamshed
105876404edcSAsim Jamshed if (rss_core != mctx->cpu) {
105976404edcSAsim Jamshed errno = EINVAL;
106076404edcSAsim Jamshed return -1;
106176404edcSAsim Jamshed }
106276404edcSAsim Jamshed } else {
106376404edcSAsim Jamshed if (mtcp->ap) {
10642efd908dSAsim Jamshed ret = FetchAddressPerCore(mtcp->ap,
106576404edcSAsim Jamshed mctx->cpu, num_queues, addr_in, &socket->saddr);
106676404edcSAsim Jamshed } else {
10678a941c7eSAsim Jamshed nif = GetOutputInterface(dip);
10688a941c7eSAsim Jamshed if (nif < 0) {
10698a941c7eSAsim Jamshed errno = EINVAL;
10708a941c7eSAsim Jamshed return -1;
10718a941c7eSAsim Jamshed }
10728a941c7eSAsim Jamshed ret = FetchAddress(ap[nif],
107376404edcSAsim Jamshed mctx->cpu, num_queues, addr_in, &socket->saddr);
107476404edcSAsim Jamshed }
107576404edcSAsim Jamshed if (ret < 0) {
107676404edcSAsim Jamshed errno = EAGAIN;
107776404edcSAsim Jamshed return -1;
107876404edcSAsim Jamshed }
107976404edcSAsim Jamshed socket->opts |= MTCP_ADDR_BIND;
108076404edcSAsim Jamshed is_dyn_bound = TRUE;
108176404edcSAsim Jamshed }
108276404edcSAsim Jamshed
108376404edcSAsim Jamshed cnt_match = 0;
108476404edcSAsim Jamshed if (mtcp->num_msp > 0) {
108576404edcSAsim Jamshed TAILQ_FOREACH(walk, &mtcp->monitors, link) {
108676404edcSAsim Jamshed fcode = walk->stream_syn_fcode;
108776404edcSAsim Jamshed if (!(ISSET_BPFFILTER(fcode) &&
108876404edcSAsim Jamshed eval_bpf_5tuple(fcode, socket->saddr.sin_addr.s_addr,
108976404edcSAsim Jamshed socket->saddr.sin_port,
109076404edcSAsim Jamshed dip, dport) == 0)) {
109176404edcSAsim Jamshed walk->is_stream_syn_filter_hit = 1; // set the 'filter hit' flag to 1
109276404edcSAsim Jamshed cnt_match++;
109376404edcSAsim Jamshed }
109476404edcSAsim Jamshed }
109576404edcSAsim Jamshed }
109676404edcSAsim Jamshed
109776404edcSAsim Jamshed if (mtcp->num_msp > 0 && cnt_match > 0) {
109876404edcSAsim Jamshed /* 150820 dhkim: XXX: embedded mode is not verified */
109976404edcSAsim Jamshed #if 1
110076404edcSAsim Jamshed cur_stream = CreateClientTCPStream(mtcp, socket,
110176404edcSAsim Jamshed STREAM_TYPE(MOS_SOCK_STREAM) |
110276404edcSAsim Jamshed STREAM_TYPE(MOS_SOCK_MONITOR_STREAM_ACTIVE),
110376404edcSAsim Jamshed socket->saddr.sin_addr.s_addr,
110476404edcSAsim Jamshed socket->saddr.sin_port, dip, dport, NULL);
110576404edcSAsim Jamshed #else
110676404edcSAsim Jamshed cur_stream = CreateDualTCPStream(mtcp, socket,
110776404edcSAsim Jamshed STREAM_TYPE(MOS_SOCK_STREAM) |
110876404edcSAsim Jamshed STREAM_TYPE(MOS_SOCK_MONITOR_STREAM_ACTIVE),
110976404edcSAsim Jamshed socket->saddr.sin_addr.s_addr,
111076404edcSAsim Jamshed socket->saddr.sin_port, dip, dport, NULL);
111176404edcSAsim Jamshed #endif
111276404edcSAsim Jamshed }
111376404edcSAsim Jamshed else
111476404edcSAsim Jamshed cur_stream = CreateTCPStream(mtcp, socket, STREAM_TYPE(MOS_SOCK_STREAM),
111576404edcSAsim Jamshed socket->saddr.sin_addr.s_addr,
111676404edcSAsim Jamshed socket->saddr.sin_port, dip, dport, NULL);
111776404edcSAsim Jamshed if (!cur_stream) {
111876404edcSAsim Jamshed TRACE_ERROR("Socket %d: failed to create tcp_stream!\n", sockid);
111976404edcSAsim Jamshed errno = ENOMEM;
112076404edcSAsim Jamshed return -1;
112176404edcSAsim Jamshed }
112276404edcSAsim Jamshed
112376404edcSAsim Jamshed if (is_dyn_bound)
112476404edcSAsim Jamshed cur_stream->is_bound_addr = TRUE;
112576404edcSAsim Jamshed cur_stream->sndvar->cwnd = 1;
112676404edcSAsim Jamshed cur_stream->sndvar->ssthresh = cur_stream->sndvar->mss * 10;
112776404edcSAsim Jamshed cur_stream->side = MOS_SIDE_CLI;
112876404edcSAsim Jamshed /* if monitor is enabled, update the pair stream side as well */
112976404edcSAsim Jamshed if (cur_stream->pair_stream) {
113076404edcSAsim Jamshed cur_stream->pair_stream->side = MOS_SIDE_SVR;
113176404edcSAsim Jamshed /*
113276404edcSAsim Jamshed * if buffer management is off, then disable
113376404edcSAsim Jamshed * monitoring tcp ring of server...
113476404edcSAsim Jamshed * if there is even a single monitor asking for
113576404edcSAsim Jamshed * buffer management, enable it (that's why the
113676404edcSAsim Jamshed * need for the loop)
113776404edcSAsim Jamshed */
113876404edcSAsim Jamshed cur_stream->pair_stream->buffer_mgmt = BUFMGMT_OFF;
113976404edcSAsim Jamshed struct socket_map *walk;
114076404edcSAsim Jamshed SOCKQ_FOREACH_START(walk, &cur_stream->msocks) {
114176404edcSAsim Jamshed uint8_t bm = walk->monitor_stream->monitor_listener->server_buf_mgmt;
114276404edcSAsim Jamshed if (bm > cur_stream->pair_stream->buffer_mgmt) {
114376404edcSAsim Jamshed cur_stream->pair_stream->buffer_mgmt = bm;
114476404edcSAsim Jamshed break;
114576404edcSAsim Jamshed }
114676404edcSAsim Jamshed } SOCKQ_FOREACH_END;
114776404edcSAsim Jamshed }
114876404edcSAsim Jamshed
114976404edcSAsim Jamshed cur_stream->state = TCP_ST_SYN_SENT;
115076404edcSAsim Jamshed cur_stream->cb_events |= MOS_ON_TCP_STATE_CHANGE;
115176404edcSAsim Jamshed
115276404edcSAsim Jamshed TRACE_STATE("Stream %d: TCP_ST_SYN_SENT\n", cur_stream->id);
115376404edcSAsim Jamshed
115476404edcSAsim Jamshed SQ_LOCK(&mtcp->ctx->connect_lock);
115576404edcSAsim Jamshed ret = StreamEnqueue(mtcp->connectq, cur_stream);
115676404edcSAsim Jamshed SQ_UNLOCK(&mtcp->ctx->connect_lock);
115776404edcSAsim Jamshed mtcp->wakeup_flag = TRUE;
115876404edcSAsim Jamshed if (ret < 0) {
115976404edcSAsim Jamshed TRACE_ERROR("Socket %d: failed to enqueue to conenct queue!\n", sockid);
116076404edcSAsim Jamshed SQ_LOCK(&mtcp->ctx->destroyq_lock);
116176404edcSAsim Jamshed StreamEnqueue(mtcp->destroyq, cur_stream);
116276404edcSAsim Jamshed SQ_UNLOCK(&mtcp->ctx->destroyq_lock);
116376404edcSAsim Jamshed errno = EAGAIN;
116476404edcSAsim Jamshed return -1;
116576404edcSAsim Jamshed }
116676404edcSAsim Jamshed
116776404edcSAsim Jamshed /* if nonblocking socket, return EINPROGRESS */
116876404edcSAsim Jamshed if (socket->opts & MTCP_NONBLOCK) {
116976404edcSAsim Jamshed errno = EINPROGRESS;
117076404edcSAsim Jamshed return -1;
117176404edcSAsim Jamshed
117276404edcSAsim Jamshed } else {
117376404edcSAsim Jamshed while (1) {
117476404edcSAsim Jamshed if (!cur_stream) {
117576404edcSAsim Jamshed TRACE_ERROR("STREAM DESTROYED\n");
117676404edcSAsim Jamshed errno = ETIMEDOUT;
117776404edcSAsim Jamshed return -1;
117876404edcSAsim Jamshed }
117976404edcSAsim Jamshed if (cur_stream->state > TCP_ST_ESTABLISHED) {
118076404edcSAsim Jamshed TRACE_ERROR("Socket %d: weird state %s\n",
118176404edcSAsim Jamshed sockid, TCPStateToString(cur_stream));
118276404edcSAsim Jamshed // TODO: how to handle this?
118376404edcSAsim Jamshed errno = ENOSYS;
118476404edcSAsim Jamshed return -1;
118576404edcSAsim Jamshed }
118676404edcSAsim Jamshed
118776404edcSAsim Jamshed if (cur_stream->state == TCP_ST_ESTABLISHED) {
118876404edcSAsim Jamshed break;
118976404edcSAsim Jamshed }
119076404edcSAsim Jamshed usleep(1000);
119176404edcSAsim Jamshed }
119276404edcSAsim Jamshed }
119376404edcSAsim Jamshed
119476404edcSAsim Jamshed return 0;
119576404edcSAsim Jamshed }
119676404edcSAsim Jamshed /*----------------------------------------------------------------------------*/
119776404edcSAsim Jamshed static inline int
CloseStreamSocket(mctx_t mctx,int sockid)119876404edcSAsim Jamshed CloseStreamSocket(mctx_t mctx, int sockid)
119976404edcSAsim Jamshed {
120076404edcSAsim Jamshed mtcp_manager_t mtcp;
120176404edcSAsim Jamshed tcp_stream *cur_stream;
120276404edcSAsim Jamshed int ret;
120376404edcSAsim Jamshed
120476404edcSAsim Jamshed mtcp = GetMTCPManager(mctx);
120576404edcSAsim Jamshed if (!mtcp) {
120676404edcSAsim Jamshed errno = EACCES;
120776404edcSAsim Jamshed return -1;
120876404edcSAsim Jamshed }
120976404edcSAsim Jamshed
121076404edcSAsim Jamshed cur_stream = mtcp->smap[sockid].stream;
121176404edcSAsim Jamshed if (!cur_stream) {
121276404edcSAsim Jamshed TRACE_API("Socket %d: stream does not exist.\n", sockid);
121376404edcSAsim Jamshed errno = ENOTCONN;
121476404edcSAsim Jamshed return -1;
121576404edcSAsim Jamshed }
121676404edcSAsim Jamshed
121776404edcSAsim Jamshed if (cur_stream->closed) {
121876404edcSAsim Jamshed TRACE_API("Socket %d (Stream %u): already closed stream\n",
121976404edcSAsim Jamshed sockid, cur_stream->id);
122076404edcSAsim Jamshed return 0;
122176404edcSAsim Jamshed }
122276404edcSAsim Jamshed cur_stream->closed = TRUE;
122376404edcSAsim Jamshed
122476404edcSAsim Jamshed TRACE_API("Stream %d: closing the stream.\n", cur_stream->id);
122576404edcSAsim Jamshed
122676404edcSAsim Jamshed /* 141029 dhkim: Check this! */
122776404edcSAsim Jamshed cur_stream->socket = NULL;
122876404edcSAsim Jamshed
122976404edcSAsim Jamshed if (cur_stream->state == TCP_ST_CLOSED_RSVD) {
123076404edcSAsim Jamshed TRACE_API("Stream %d at TCP_ST_CLOSED_RSVD. destroying the stream.\n",
123176404edcSAsim Jamshed cur_stream->id);
123276404edcSAsim Jamshed SQ_LOCK(&mtcp->ctx->destroyq_lock);
123376404edcSAsim Jamshed StreamEnqueue(mtcp->destroyq, cur_stream);
123476404edcSAsim Jamshed mtcp->wakeup_flag = TRUE;
123576404edcSAsim Jamshed SQ_UNLOCK(&mtcp->ctx->destroyq_lock);
123676404edcSAsim Jamshed return 0;
123776404edcSAsim Jamshed
123876404edcSAsim Jamshed } else if (cur_stream->state == TCP_ST_SYN_SENT) {
123976404edcSAsim Jamshed #if 1
124076404edcSAsim Jamshed SQ_LOCK(&mtcp->ctx->destroyq_lock);
124176404edcSAsim Jamshed StreamEnqueue(mtcp->destroyq, cur_stream);
124276404edcSAsim Jamshed SQ_UNLOCK(&mtcp->ctx->destroyq_lock);
124376404edcSAsim Jamshed mtcp->wakeup_flag = TRUE;
124476404edcSAsim Jamshed #endif
124576404edcSAsim Jamshed return -1;
124676404edcSAsim Jamshed
124776404edcSAsim Jamshed } else if (cur_stream->state != TCP_ST_ESTABLISHED &&
124876404edcSAsim Jamshed cur_stream->state != TCP_ST_CLOSE_WAIT) {
124976404edcSAsim Jamshed TRACE_API("Stream %d at state %s\n",
125076404edcSAsim Jamshed cur_stream->id, TCPStateToString(cur_stream));
125176404edcSAsim Jamshed errno = EBADF;
125276404edcSAsim Jamshed return -1;
125376404edcSAsim Jamshed }
125476404edcSAsim Jamshed
125576404edcSAsim Jamshed SQ_LOCK(&mtcp->ctx->close_lock);
125676404edcSAsim Jamshed cur_stream->sndvar->on_closeq = TRUE;
125776404edcSAsim Jamshed ret = StreamEnqueue(mtcp->closeq, cur_stream);
125876404edcSAsim Jamshed mtcp->wakeup_flag = TRUE;
125976404edcSAsim Jamshed SQ_UNLOCK(&mtcp->ctx->close_lock);
126076404edcSAsim Jamshed
126176404edcSAsim Jamshed if (ret < 0) {
126276404edcSAsim Jamshed TRACE_ERROR("(NEVER HAPPEN) Failed to enqueue the stream to close.\n");
126376404edcSAsim Jamshed errno = EAGAIN;
126476404edcSAsim Jamshed return -1;
126576404edcSAsim Jamshed }
126676404edcSAsim Jamshed
126776404edcSAsim Jamshed return 0;
126876404edcSAsim Jamshed }
126976404edcSAsim Jamshed /*----------------------------------------------------------------------------*/
127076404edcSAsim Jamshed static inline int
CloseListeningSocket(mctx_t mctx,int sockid)127176404edcSAsim Jamshed CloseListeningSocket(mctx_t mctx, int sockid)
127276404edcSAsim Jamshed {
127376404edcSAsim Jamshed mtcp_manager_t mtcp;
127476404edcSAsim Jamshed struct tcp_listener *listener;
127576404edcSAsim Jamshed
127676404edcSAsim Jamshed mtcp = GetMTCPManager(mctx);
127776404edcSAsim Jamshed if (!mtcp) {
127876404edcSAsim Jamshed errno = EACCES;
127976404edcSAsim Jamshed return -1;
128076404edcSAsim Jamshed }
128176404edcSAsim Jamshed
128276404edcSAsim Jamshed listener = mtcp->smap[sockid].listener;
128376404edcSAsim Jamshed if (!listener) {
128476404edcSAsim Jamshed errno = EINVAL;
128576404edcSAsim Jamshed return -1;
128676404edcSAsim Jamshed }
128776404edcSAsim Jamshed
128876404edcSAsim Jamshed if (listener->acceptq) {
128976404edcSAsim Jamshed DestroyStreamQueue(listener->acceptq);
129076404edcSAsim Jamshed listener->acceptq = NULL;
129176404edcSAsim Jamshed }
129276404edcSAsim Jamshed
129376404edcSAsim Jamshed pthread_mutex_lock(&listener->accept_lock);
129476404edcSAsim Jamshed pthread_cond_signal(&listener->accept_cond);
129576404edcSAsim Jamshed pthread_mutex_unlock(&listener->accept_lock);
129676404edcSAsim Jamshed
129776404edcSAsim Jamshed pthread_cond_destroy(&listener->accept_cond);
129876404edcSAsim Jamshed pthread_mutex_destroy(&listener->accept_lock);
129976404edcSAsim Jamshed
130076404edcSAsim Jamshed free(listener);
130176404edcSAsim Jamshed mtcp->smap[sockid].listener = NULL;
130276404edcSAsim Jamshed
130376404edcSAsim Jamshed return 0;
130476404edcSAsim Jamshed }
/*----------------------------------------------------------------------------*/
130676404edcSAsim Jamshed int
mtcp_close(mctx_t mctx,int sockid)130776404edcSAsim Jamshed mtcp_close(mctx_t mctx, int sockid)
130876404edcSAsim Jamshed {
130976404edcSAsim Jamshed mtcp_manager_t mtcp;
131076404edcSAsim Jamshed int ret;
131176404edcSAsim Jamshed
131276404edcSAsim Jamshed mtcp = GetMTCPManager(mctx);
131376404edcSAsim Jamshed if (!mtcp) {
131476404edcSAsim Jamshed errno = EACCES;
131576404edcSAsim Jamshed return -1;
131676404edcSAsim Jamshed }
131776404edcSAsim Jamshed
131876404edcSAsim Jamshed if (sockid < 0 || sockid >= g_config.mos->max_concurrency) {
131976404edcSAsim Jamshed TRACE_API("Socket id %d out of range.\n", sockid);
132076404edcSAsim Jamshed errno = EBADF;
132176404edcSAsim Jamshed return -1;
132276404edcSAsim Jamshed }
132376404edcSAsim Jamshed
132476404edcSAsim Jamshed if (mtcp->smap[sockid].socktype == MOS_SOCK_UNUSED) {
132576404edcSAsim Jamshed TRACE_API("Invalid socket id: %d\n", sockid);
132676404edcSAsim Jamshed errno = EBADF;
132776404edcSAsim Jamshed return -1;
132876404edcSAsim Jamshed }
132976404edcSAsim Jamshed
133076404edcSAsim Jamshed TRACE_API("Socket %d: mtcp_close called.\n", sockid);
133176404edcSAsim Jamshed
133276404edcSAsim Jamshed switch (mtcp->smap[sockid].socktype) {
133376404edcSAsim Jamshed case MOS_SOCK_STREAM:
133476404edcSAsim Jamshed ret = CloseStreamSocket(mctx, sockid);
133576404edcSAsim Jamshed break;
133676404edcSAsim Jamshed
133776404edcSAsim Jamshed case MOS_SOCK_STREAM_LISTEN:
133876404edcSAsim Jamshed ret = CloseListeningSocket(mctx, sockid);
133976404edcSAsim Jamshed break;
134076404edcSAsim Jamshed
134176404edcSAsim Jamshed case MOS_SOCK_EPOLL:
134276404edcSAsim Jamshed ret = CloseEpollSocket(mctx, sockid);
134376404edcSAsim Jamshed break;
134476404edcSAsim Jamshed
134576404edcSAsim Jamshed case MOS_SOCK_PIPE:
134676404edcSAsim Jamshed ret = PipeClose(mctx, sockid);
134776404edcSAsim Jamshed break;
134876404edcSAsim Jamshed
134976404edcSAsim Jamshed default:
135076404edcSAsim Jamshed errno = EINVAL;
135176404edcSAsim Jamshed ret = -1;
135276404edcSAsim Jamshed break;
135376404edcSAsim Jamshed }
135476404edcSAsim Jamshed
135576404edcSAsim Jamshed FreeSocket(mctx, sockid, mtcp->smap[sockid].socktype);
135676404edcSAsim Jamshed
135776404edcSAsim Jamshed return ret;
135876404edcSAsim Jamshed }
/*----------------------------------------------------------------------------*/
136076404edcSAsim Jamshed int
mtcp_abort(mctx_t mctx,int sockid)136176404edcSAsim Jamshed mtcp_abort(mctx_t mctx, int sockid)
136276404edcSAsim Jamshed {
136376404edcSAsim Jamshed mtcp_manager_t mtcp;
136476404edcSAsim Jamshed tcp_stream *cur_stream;
136576404edcSAsim Jamshed int ret;
136676404edcSAsim Jamshed
136776404edcSAsim Jamshed mtcp = GetMTCPManager(mctx);
136876404edcSAsim Jamshed if (!mtcp) {
136976404edcSAsim Jamshed errno = EACCES;
137076404edcSAsim Jamshed return -1;
137176404edcSAsim Jamshed }
137276404edcSAsim Jamshed
137376404edcSAsim Jamshed if (sockid < 0 || sockid >= g_config.mos->max_concurrency) {
137476404edcSAsim Jamshed TRACE_API("Socket id %d out of range.\n", sockid);
137576404edcSAsim Jamshed errno = EBADF;
137676404edcSAsim Jamshed return -1;
137776404edcSAsim Jamshed }
137876404edcSAsim Jamshed
137976404edcSAsim Jamshed if (mtcp->smap[sockid].socktype == MOS_SOCK_UNUSED) {
138076404edcSAsim Jamshed TRACE_API("Invalid socket id: %d\n", sockid);
138176404edcSAsim Jamshed errno = EBADF;
138276404edcSAsim Jamshed return -1;
138376404edcSAsim Jamshed }
138476404edcSAsim Jamshed
138576404edcSAsim Jamshed if (mtcp->smap[sockid].socktype != MOS_SOCK_STREAM) {
138676404edcSAsim Jamshed TRACE_API("Not an end socket. id: %d\n", sockid);
138776404edcSAsim Jamshed errno = ENOTSOCK;
138876404edcSAsim Jamshed return -1;
138976404edcSAsim Jamshed }
139076404edcSAsim Jamshed
139176404edcSAsim Jamshed cur_stream = mtcp->smap[sockid].stream;
139276404edcSAsim Jamshed if (!cur_stream) {
139376404edcSAsim Jamshed TRACE_API("Stream %d: does not exist.\n", sockid);
139476404edcSAsim Jamshed errno = ENOTCONN;
139576404edcSAsim Jamshed return -1;
139676404edcSAsim Jamshed }
139776404edcSAsim Jamshed
139876404edcSAsim Jamshed TRACE_API("Socket %d: mtcp_abort()\n", sockid);
139976404edcSAsim Jamshed
140076404edcSAsim Jamshed FreeSocket(mctx, sockid, mtcp->smap[sockid].socktype);
140176404edcSAsim Jamshed cur_stream->socket = NULL;
140276404edcSAsim Jamshed
140376404edcSAsim Jamshed if (cur_stream->state == TCP_ST_CLOSED_RSVD) {
140476404edcSAsim Jamshed TRACE_API("Stream %d: connection already reset.\n", sockid);
140576404edcSAsim Jamshed return ERROR;
140676404edcSAsim Jamshed
140776404edcSAsim Jamshed } else if (cur_stream->state == TCP_ST_SYN_SENT) {
140876404edcSAsim Jamshed /* TODO: this should notify event failure to all
140976404edcSAsim Jamshed previous read() or write() calls */
141076404edcSAsim Jamshed cur_stream->state = TCP_ST_CLOSED_RSVD;
141176404edcSAsim Jamshed cur_stream->close_reason = TCP_ACTIVE_CLOSE;
141276404edcSAsim Jamshed cur_stream->cb_events |= MOS_ON_TCP_STATE_CHANGE;
141376404edcSAsim Jamshed SQ_LOCK(&mtcp->ctx->destroyq_lock);
141476404edcSAsim Jamshed StreamEnqueue(mtcp->destroyq, cur_stream);
141576404edcSAsim Jamshed SQ_UNLOCK(&mtcp->ctx->destroyq_lock);
141676404edcSAsim Jamshed mtcp->wakeup_flag = TRUE;
141776404edcSAsim Jamshed return 0;
141876404edcSAsim Jamshed
141976404edcSAsim Jamshed } else if (cur_stream->state == TCP_ST_CLOSING ||
142076404edcSAsim Jamshed cur_stream->state == TCP_ST_LAST_ACK ||
142176404edcSAsim Jamshed cur_stream->state == TCP_ST_TIME_WAIT) {
142276404edcSAsim Jamshed cur_stream->state = TCP_ST_CLOSED_RSVD;
142376404edcSAsim Jamshed cur_stream->close_reason = TCP_ACTIVE_CLOSE;
142476404edcSAsim Jamshed cur_stream->cb_events |= MOS_ON_TCP_STATE_CHANGE;
142576404edcSAsim Jamshed SQ_LOCK(&mtcp->ctx->destroyq_lock);
142676404edcSAsim Jamshed StreamEnqueue(mtcp->destroyq, cur_stream);
142776404edcSAsim Jamshed SQ_UNLOCK(&mtcp->ctx->destroyq_lock);
142876404edcSAsim Jamshed mtcp->wakeup_flag = TRUE;
142976404edcSAsim Jamshed return 0;
143076404edcSAsim Jamshed }
143176404edcSAsim Jamshed
143276404edcSAsim Jamshed /* the stream structure will be destroyed after sending RST */
143376404edcSAsim Jamshed if (cur_stream->sndvar->on_resetq) {
143476404edcSAsim Jamshed TRACE_ERROR("Stream %d: calling mtcp_abort() "
143576404edcSAsim Jamshed "when in reset queue.\n", sockid);
143676404edcSAsim Jamshed errno = ECONNRESET;
143776404edcSAsim Jamshed return -1;
143876404edcSAsim Jamshed }
143976404edcSAsim Jamshed SQ_LOCK(&mtcp->ctx->reset_lock);
144076404edcSAsim Jamshed cur_stream->sndvar->on_resetq = TRUE;
144176404edcSAsim Jamshed ret = StreamEnqueue(mtcp->resetq, cur_stream);
144276404edcSAsim Jamshed SQ_UNLOCK(&mtcp->ctx->reset_lock);
144376404edcSAsim Jamshed mtcp->wakeup_flag = TRUE;
144476404edcSAsim Jamshed
144576404edcSAsim Jamshed if (ret < 0) {
144676404edcSAsim Jamshed TRACE_ERROR("(NEVER HAPPEN) Failed to enqueue the stream to close.\n");
144776404edcSAsim Jamshed errno = EAGAIN;
144876404edcSAsim Jamshed return -1;
144976404edcSAsim Jamshed }
145076404edcSAsim Jamshed
145176404edcSAsim Jamshed return 0;
145276404edcSAsim Jamshed }
/*----------------------------------------------------------------------------*/
145476404edcSAsim Jamshed static inline int
PeekForUser(mtcp_manager_t mtcp,tcp_stream * cur_stream,char * buf,int len)1455df3fae06SAsim Jamshed PeekForUser(mtcp_manager_t mtcp, tcp_stream *cur_stream, char *buf, int len)
1456df3fae06SAsim Jamshed {
1457df3fae06SAsim Jamshed struct tcp_recv_vars *rcvvar = cur_stream->rcvvar;
1458df3fae06SAsim Jamshed int copylen;
1459df3fae06SAsim Jamshed tcprb_t *rb = rcvvar->rcvbuf;
1460df3fae06SAsim Jamshed
1461df3fae06SAsim Jamshed if ((copylen = tcprb_ppeek(rb, (uint8_t *)buf, len, rb->pile)) <= 0) {
1462df3fae06SAsim Jamshed errno = EAGAIN;
1463df3fae06SAsim Jamshed return -1;
1464df3fae06SAsim Jamshed }
1465df3fae06SAsim Jamshed
1466df3fae06SAsim Jamshed return copylen;
1467df3fae06SAsim Jamshed }
/*----------------------------------------------------------------------------*/
1469df3fae06SAsim Jamshed static inline int
CopyToUser(mtcp_manager_t mtcp,tcp_stream * cur_stream,char * buf,int len)147076404edcSAsim Jamshed CopyToUser(mtcp_manager_t mtcp, tcp_stream *cur_stream, char *buf, int len)
147176404edcSAsim Jamshed {
147276404edcSAsim Jamshed struct tcp_recv_vars *rcvvar = cur_stream->rcvvar;
147376404edcSAsim Jamshed int copylen;
147476404edcSAsim Jamshed tcprb_t *rb = rcvvar->rcvbuf;
147576404edcSAsim Jamshed if ((copylen = tcprb_ppeek(rb, (uint8_t *)buf, len, rb->pile)) <= 0) {
147676404edcSAsim Jamshed errno = EAGAIN;
147776404edcSAsim Jamshed return -1;
147876404edcSAsim Jamshed }
147976404edcSAsim Jamshed tcprb_setpile(rb, rb->pile + copylen);
148076404edcSAsim Jamshed
148176404edcSAsim Jamshed rcvvar->rcv_wnd = rb->len - tcprb_cflen(rb);
148276404edcSAsim Jamshed //printf("rcv_wnd: %d\n", rcvvar->rcv_wnd);
148376404edcSAsim Jamshed
148476404edcSAsim Jamshed /* Advertise newly freed receive buffer */
148576404edcSAsim Jamshed if (cur_stream->need_wnd_adv) {
148676404edcSAsim Jamshed if (rcvvar->rcv_wnd > cur_stream->sndvar->eff_mss) {
148776404edcSAsim Jamshed if (!cur_stream->sndvar->on_ackq) {
148876404edcSAsim Jamshed SQ_LOCK(&mtcp->ctx->ackq_lock);
148976404edcSAsim Jamshed cur_stream->sndvar->on_ackq = TRUE;
149076404edcSAsim Jamshed StreamEnqueue(mtcp->ackq, cur_stream); /* this always success */
149176404edcSAsim Jamshed SQ_UNLOCK(&mtcp->ctx->ackq_lock);
149276404edcSAsim Jamshed cur_stream->need_wnd_adv = FALSE;
149376404edcSAsim Jamshed mtcp->wakeup_flag = TRUE;
149476404edcSAsim Jamshed }
149576404edcSAsim Jamshed }
149676404edcSAsim Jamshed }
149776404edcSAsim Jamshed
149876404edcSAsim Jamshed return copylen;
149976404edcSAsim Jamshed }
/*----------------------------------------------------------------------------*/
150176404edcSAsim Jamshed ssize_t
mtcp_recv(mctx_t mctx,int sockid,char * buf,size_t len,int flags)1502df3fae06SAsim Jamshed mtcp_recv(mctx_t mctx, int sockid, char *buf, size_t len, int flags)
150376404edcSAsim Jamshed {
150476404edcSAsim Jamshed mtcp_manager_t mtcp;
150576404edcSAsim Jamshed socket_map_t socket;
150676404edcSAsim Jamshed tcp_stream *cur_stream;
150776404edcSAsim Jamshed struct tcp_recv_vars *rcvvar;
150876404edcSAsim Jamshed int event_remaining, merged_len;
150976404edcSAsim Jamshed int ret;
151076404edcSAsim Jamshed
151176404edcSAsim Jamshed mtcp = GetMTCPManager(mctx);
151276404edcSAsim Jamshed if (!mtcp) {
151376404edcSAsim Jamshed errno = EACCES;
151476404edcSAsim Jamshed return -1;
151576404edcSAsim Jamshed }
151676404edcSAsim Jamshed
151776404edcSAsim Jamshed if (sockid < 0 || sockid >= g_config.mos->max_concurrency) {
151876404edcSAsim Jamshed TRACE_API("Socket id %d out of range.\n", sockid);
151976404edcSAsim Jamshed errno = EBADF;
152076404edcSAsim Jamshed return -1;
152176404edcSAsim Jamshed }
152276404edcSAsim Jamshed
152376404edcSAsim Jamshed socket = &mtcp->smap[sockid];
152476404edcSAsim Jamshed if (socket->socktype == MOS_SOCK_UNUSED) {
152576404edcSAsim Jamshed TRACE_API("Invalid socket id: %d\n", sockid);
152676404edcSAsim Jamshed errno = EBADF;
152776404edcSAsim Jamshed return -1;
152876404edcSAsim Jamshed }
152976404edcSAsim Jamshed
153076404edcSAsim Jamshed if (socket->socktype == MOS_SOCK_PIPE) {
153176404edcSAsim Jamshed return PipeRead(mctx, sockid, buf, len);
153276404edcSAsim Jamshed }
153376404edcSAsim Jamshed
153476404edcSAsim Jamshed if (socket->socktype != MOS_SOCK_STREAM) {
153576404edcSAsim Jamshed TRACE_API("Not an end socket. id: %d\n", sockid);
153676404edcSAsim Jamshed errno = ENOTSOCK;
153776404edcSAsim Jamshed return -1;
153876404edcSAsim Jamshed }
153976404edcSAsim Jamshed
154076404edcSAsim Jamshed /* stream should be in ESTABLISHED, FIN_WAIT_1, FIN_WAIT_2, CLOSE_WAIT */
154176404edcSAsim Jamshed cur_stream = socket->stream;
154276404edcSAsim Jamshed if (!cur_stream || !cur_stream->rcvvar || !cur_stream->rcvvar->rcvbuf ||
154376404edcSAsim Jamshed !(cur_stream->state >= TCP_ST_ESTABLISHED &&
154476404edcSAsim Jamshed cur_stream->state <= TCP_ST_CLOSE_WAIT)) {
154576404edcSAsim Jamshed errno = ENOTCONN;
154676404edcSAsim Jamshed return -1;
154776404edcSAsim Jamshed }
154876404edcSAsim Jamshed
154976404edcSAsim Jamshed rcvvar = cur_stream->rcvvar;
155076404edcSAsim Jamshed
155176404edcSAsim Jamshed merged_len = tcprb_cflen(rcvvar->rcvbuf);
155276404edcSAsim Jamshed
155376404edcSAsim Jamshed /* if CLOSE_WAIT, return 0 if there is no payload */
155476404edcSAsim Jamshed if (cur_stream->state == TCP_ST_CLOSE_WAIT) {
155576404edcSAsim Jamshed if (merged_len == 0)
155676404edcSAsim Jamshed return 0;
155776404edcSAsim Jamshed }
155876404edcSAsim Jamshed
155976404edcSAsim Jamshed /* return EAGAIN if no receive buffer */
156076404edcSAsim Jamshed if (socket->opts & MTCP_NONBLOCK) {
15611879243cSAsim Jamshed if (merged_len == 0) {
156276404edcSAsim Jamshed errno = EAGAIN;
156376404edcSAsim Jamshed return -1;
156476404edcSAsim Jamshed }
156576404edcSAsim Jamshed }
156676404edcSAsim Jamshed
156776404edcSAsim Jamshed SBUF_LOCK(&rcvvar->read_lock);
156876404edcSAsim Jamshed
1569df3fae06SAsim Jamshed switch (flags) {
1570df3fae06SAsim Jamshed case 0:
157176404edcSAsim Jamshed ret = CopyToUser(mtcp, cur_stream, buf, len);
1572df3fae06SAsim Jamshed break;
1573df3fae06SAsim Jamshed case MSG_PEEK:
1574df3fae06SAsim Jamshed ret = PeekForUser(mtcp, cur_stream, buf, len);
1575df3fae06SAsim Jamshed break;
1576df3fae06SAsim Jamshed default:
1577df3fae06SAsim Jamshed SBUF_UNLOCK(&rcvvar->read_lock);
1578df3fae06SAsim Jamshed ret = -1;
1579df3fae06SAsim Jamshed errno = EINVAL;
1580df3fae06SAsim Jamshed return ret;
1581df3fae06SAsim Jamshed }
158276404edcSAsim Jamshed
158376404edcSAsim Jamshed merged_len = tcprb_cflen(rcvvar->rcvbuf);
158476404edcSAsim Jamshed event_remaining = FALSE;
158576404edcSAsim Jamshed /* if there are remaining payload, generate EPOLLIN */
158676404edcSAsim Jamshed /* (may due to insufficient user buffer) */
158776404edcSAsim Jamshed if (socket->epoll & MOS_EPOLLIN) {
158876404edcSAsim Jamshed if (!(socket->epoll & MOS_EPOLLET) && merged_len > 0) {
158976404edcSAsim Jamshed event_remaining = TRUE;
159076404edcSAsim Jamshed }
159176404edcSAsim Jamshed }
159276404edcSAsim Jamshed /* if waiting for close, notify it if no remaining data */
159376404edcSAsim Jamshed if (cur_stream->state == TCP_ST_CLOSE_WAIT &&
159476404edcSAsim Jamshed merged_len == 0 && ret > 0) {
159576404edcSAsim Jamshed event_remaining = TRUE;
159676404edcSAsim Jamshed }
159776404edcSAsim Jamshed
159876404edcSAsim Jamshed SBUF_UNLOCK(&rcvvar->read_lock);
159976404edcSAsim Jamshed
160076404edcSAsim Jamshed if (event_remaining) {
160176404edcSAsim Jamshed if (socket->epoll) {
160276404edcSAsim Jamshed AddEpollEvent(mtcp->ep,
160376404edcSAsim Jamshed USR_SHADOW_EVENT_QUEUE, socket, MOS_EPOLLIN);
160476404edcSAsim Jamshed }
160576404edcSAsim Jamshed }
160676404edcSAsim Jamshed
1607df3fae06SAsim Jamshed TRACE_API("Stream %d: mtcp_recv() returning %d\n", cur_stream->id, ret);
160876404edcSAsim Jamshed return ret;
160976404edcSAsim Jamshed }
/*----------------------------------------------------------------------------*/
1611df3fae06SAsim Jamshed inline ssize_t
mtcp_read(mctx_t mctx,int sockid,char * buf,size_t len)1612df3fae06SAsim Jamshed mtcp_read(mctx_t mctx, int sockid, char *buf, size_t len)
1613df3fae06SAsim Jamshed {
1614df3fae06SAsim Jamshed return mtcp_recv(mctx, sockid, buf, len, 0);
1615df3fae06SAsim Jamshed }
/*----------------------------------------------------------------------------*/
161776404edcSAsim Jamshed ssize_t
mtcp_readv(mctx_t mctx,int sockid,const struct iovec * iov,int numIOV)1618a5e1a556SAsim Jamshed mtcp_readv(mctx_t mctx, int sockid, const struct iovec *iov, int numIOV)
161976404edcSAsim Jamshed {
162076404edcSAsim Jamshed mtcp_manager_t mtcp;
162176404edcSAsim Jamshed socket_map_t socket;
162276404edcSAsim Jamshed tcp_stream *cur_stream;
162376404edcSAsim Jamshed struct tcp_recv_vars *rcvvar;
162476404edcSAsim Jamshed int ret, bytes_read, i;
162576404edcSAsim Jamshed int event_remaining, merged_len;
162676404edcSAsim Jamshed
162776404edcSAsim Jamshed mtcp = GetMTCPManager(mctx);
162876404edcSAsim Jamshed if (!mtcp) {
162976404edcSAsim Jamshed errno = EACCES;
163076404edcSAsim Jamshed return -1;
163176404edcSAsim Jamshed }
163276404edcSAsim Jamshed
163376404edcSAsim Jamshed if (sockid < 0 || sockid >= g_config.mos->max_concurrency) {
163476404edcSAsim Jamshed TRACE_API("Socket id %d out of range.\n", sockid);
163576404edcSAsim Jamshed errno = EBADF;
163676404edcSAsim Jamshed return -1;
163776404edcSAsim Jamshed }
163876404edcSAsim Jamshed
163976404edcSAsim Jamshed socket = &mtcp->smap[sockid];
164076404edcSAsim Jamshed if (socket->socktype == MOS_SOCK_UNUSED) {
164176404edcSAsim Jamshed TRACE_API("Invalid socket id: %d\n", sockid);
164276404edcSAsim Jamshed errno = EBADF;
164376404edcSAsim Jamshed return -1;
164476404edcSAsim Jamshed }
164576404edcSAsim Jamshed
164676404edcSAsim Jamshed if (socket->socktype != MOS_SOCK_STREAM) {
164776404edcSAsim Jamshed TRACE_API("Not an end socket. id: %d\n", sockid);
164876404edcSAsim Jamshed errno = ENOTSOCK;
164976404edcSAsim Jamshed return -1;
165076404edcSAsim Jamshed }
165176404edcSAsim Jamshed
165276404edcSAsim Jamshed /* stream should be in ESTABLISHED, FIN_WAIT_1, FIN_WAIT_2, CLOSE_WAIT */
165376404edcSAsim Jamshed cur_stream = socket->stream;
16541879243cSAsim Jamshed if (!cur_stream || !cur_stream->rcvvar->rcvbuf ||
165576404edcSAsim Jamshed !(cur_stream->state >= TCP_ST_ESTABLISHED &&
165676404edcSAsim Jamshed cur_stream->state <= TCP_ST_CLOSE_WAIT)) {
165776404edcSAsim Jamshed errno = ENOTCONN;
165876404edcSAsim Jamshed return -1;
165976404edcSAsim Jamshed }
166076404edcSAsim Jamshed
166176404edcSAsim Jamshed rcvvar = cur_stream->rcvvar;
166276404edcSAsim Jamshed
166376404edcSAsim Jamshed merged_len = tcprb_cflen(rcvvar->rcvbuf);
166476404edcSAsim Jamshed
166576404edcSAsim Jamshed /* if CLOSE_WAIT, return 0 if there is no payload */
166676404edcSAsim Jamshed if (cur_stream->state == TCP_ST_CLOSE_WAIT) {
166776404edcSAsim Jamshed if (merged_len == 0)
166876404edcSAsim Jamshed return 0;
166976404edcSAsim Jamshed }
167076404edcSAsim Jamshed
167176404edcSAsim Jamshed /* return EAGAIN if no receive buffer */
167276404edcSAsim Jamshed if (socket->opts & MTCP_NONBLOCK) {
16731879243cSAsim Jamshed if (merged_len == 0) {
167476404edcSAsim Jamshed errno = EAGAIN;
167576404edcSAsim Jamshed return -1;
167676404edcSAsim Jamshed }
167776404edcSAsim Jamshed }
167876404edcSAsim Jamshed
167976404edcSAsim Jamshed SBUF_LOCK(&rcvvar->read_lock);
168076404edcSAsim Jamshed
168176404edcSAsim Jamshed /* read and store the contents to the vectored buffers */
168276404edcSAsim Jamshed bytes_read = 0;
168376404edcSAsim Jamshed for (i = 0; i < numIOV; i++) {
168476404edcSAsim Jamshed if (iov[i].iov_len <= 0)
168576404edcSAsim Jamshed continue;
168676404edcSAsim Jamshed
168776404edcSAsim Jamshed ret = CopyToUser(mtcp, cur_stream, iov[i].iov_base, iov[i].iov_len);
168876404edcSAsim Jamshed if (ret <= 0)
168976404edcSAsim Jamshed break;
169076404edcSAsim Jamshed
169176404edcSAsim Jamshed bytes_read += ret;
169276404edcSAsim Jamshed
169376404edcSAsim Jamshed if (ret < iov[i].iov_len)
169476404edcSAsim Jamshed break;
169576404edcSAsim Jamshed }
169676404edcSAsim Jamshed
169776404edcSAsim Jamshed merged_len = tcprb_cflen(rcvvar->rcvbuf);
169876404edcSAsim Jamshed
169976404edcSAsim Jamshed event_remaining = FALSE;
170076404edcSAsim Jamshed /* if there are remaining payload, generate read event */
170176404edcSAsim Jamshed /* (may due to insufficient user buffer) */
170276404edcSAsim Jamshed if (socket->epoll & MOS_EPOLLIN) {
170376404edcSAsim Jamshed if (!(socket->epoll & MOS_EPOLLET) && merged_len > 0) {
170476404edcSAsim Jamshed event_remaining = TRUE;
170576404edcSAsim Jamshed }
170676404edcSAsim Jamshed }
170776404edcSAsim Jamshed /* if waiting for close, notify it if no remaining data */
170876404edcSAsim Jamshed if (cur_stream->state == TCP_ST_CLOSE_WAIT &&
170976404edcSAsim Jamshed merged_len == 0 && bytes_read > 0) {
171076404edcSAsim Jamshed event_remaining = TRUE;
171176404edcSAsim Jamshed }
171276404edcSAsim Jamshed
171376404edcSAsim Jamshed SBUF_UNLOCK(&rcvvar->read_lock);
171476404edcSAsim Jamshed
171576404edcSAsim Jamshed if(event_remaining) {
171676404edcSAsim Jamshed if (socket->epoll & MOS_EPOLLIN && !(socket->epoll & MOS_EPOLLET)) {
171776404edcSAsim Jamshed AddEpollEvent(mtcp->ep,
171876404edcSAsim Jamshed USR_SHADOW_EVENT_QUEUE, socket, MOS_EPOLLIN);
171976404edcSAsim Jamshed }
172076404edcSAsim Jamshed }
172176404edcSAsim Jamshed
172276404edcSAsim Jamshed TRACE_API("Stream %d: mtcp_readv() returning %d\n",
172376404edcSAsim Jamshed cur_stream->id, bytes_read);
172476404edcSAsim Jamshed return bytes_read;
172576404edcSAsim Jamshed }
/*----------------------------------------------------------------------------*/
172776404edcSAsim Jamshed static inline int
CopyFromUser(mtcp_manager_t mtcp,tcp_stream * cur_stream,const char * buf,int len)1728a5e1a556SAsim Jamshed CopyFromUser(mtcp_manager_t mtcp, tcp_stream *cur_stream, const char *buf, int len)
172976404edcSAsim Jamshed {
173076404edcSAsim Jamshed struct tcp_send_vars *sndvar = cur_stream->sndvar;
173176404edcSAsim Jamshed int sndlen;
173276404edcSAsim Jamshed int ret;
173376404edcSAsim Jamshed
173476404edcSAsim Jamshed sndlen = MIN((int)sndvar->snd_wnd, len);
173576404edcSAsim Jamshed if (sndlen <= 0) {
173676404edcSAsim Jamshed errno = EAGAIN;
173776404edcSAsim Jamshed return -1;
173876404edcSAsim Jamshed }
173976404edcSAsim Jamshed
174076404edcSAsim Jamshed /* allocate send buffer if not exist */
174176404edcSAsim Jamshed if (!sndvar->sndbuf) {
174276404edcSAsim Jamshed sndvar->sndbuf = SBInit(mtcp->rbm_snd, sndvar->iss + 1);
174376404edcSAsim Jamshed if (!sndvar->sndbuf) {
174476404edcSAsim Jamshed cur_stream->close_reason = TCP_NO_MEM;
174576404edcSAsim Jamshed /* notification may not required due to -1 return */
174676404edcSAsim Jamshed errno = ENOMEM;
174776404edcSAsim Jamshed return -1;
174876404edcSAsim Jamshed }
174976404edcSAsim Jamshed }
175076404edcSAsim Jamshed
175176404edcSAsim Jamshed ret = SBPut(mtcp->rbm_snd, sndvar->sndbuf, buf, sndlen);
175276404edcSAsim Jamshed assert(ret == sndlen);
175376404edcSAsim Jamshed sndvar->snd_wnd = sndvar->sndbuf->size - sndvar->sndbuf->len;
175476404edcSAsim Jamshed if (ret <= 0) {
175576404edcSAsim Jamshed TRACE_ERROR("SBPut failed. reason: %d (sndlen: %u, len: %u\n",
175676404edcSAsim Jamshed ret, sndlen, sndvar->sndbuf->len);
175776404edcSAsim Jamshed errno = EAGAIN;
175876404edcSAsim Jamshed return -1;
175976404edcSAsim Jamshed }
176076404edcSAsim Jamshed
176176404edcSAsim Jamshed if (sndvar->snd_wnd <= 0) {
176276404edcSAsim Jamshed TRACE_SNDBUF("%u Sending buffer became full!! snd_wnd: %u\n",
176376404edcSAsim Jamshed cur_stream->id, sndvar->snd_wnd);
176476404edcSAsim Jamshed }
176576404edcSAsim Jamshed
176676404edcSAsim Jamshed return ret;
176776404edcSAsim Jamshed }
/*----------------------------------------------------------------------------*/
176976404edcSAsim Jamshed ssize_t
mtcp_write(mctx_t mctx,int sockid,const char * buf,size_t len)1770a5e1a556SAsim Jamshed mtcp_write(mctx_t mctx, int sockid, const char *buf, size_t len)
177176404edcSAsim Jamshed {
177276404edcSAsim Jamshed mtcp_manager_t mtcp;
177376404edcSAsim Jamshed socket_map_t socket;
177476404edcSAsim Jamshed tcp_stream *cur_stream;
177576404edcSAsim Jamshed struct tcp_send_vars *sndvar;
177676404edcSAsim Jamshed int ret;
177776404edcSAsim Jamshed
177876404edcSAsim Jamshed mtcp = GetMTCPManager(mctx);
177976404edcSAsim Jamshed if (!mtcp) {
178076404edcSAsim Jamshed errno = EACCES;
178176404edcSAsim Jamshed return -1;
178276404edcSAsim Jamshed }
178376404edcSAsim Jamshed
178476404edcSAsim Jamshed if (sockid < 0 || sockid >= g_config.mos->max_concurrency) {
178576404edcSAsim Jamshed TRACE_API("Socket id %d out of range.\n", sockid);
178676404edcSAsim Jamshed errno = EBADF;
178776404edcSAsim Jamshed return -1;
178876404edcSAsim Jamshed }
178976404edcSAsim Jamshed
179076404edcSAsim Jamshed socket = &mtcp->smap[sockid];
179176404edcSAsim Jamshed if (socket->socktype == MOS_SOCK_UNUSED) {
179276404edcSAsim Jamshed TRACE_API("Invalid socket id: %d\n", sockid);
179376404edcSAsim Jamshed errno = EBADF;
179476404edcSAsim Jamshed return -1;
179576404edcSAsim Jamshed }
179676404edcSAsim Jamshed
179776404edcSAsim Jamshed if (socket->socktype == MOS_SOCK_PIPE) {
179876404edcSAsim Jamshed return PipeWrite(mctx, sockid, buf, len);
179976404edcSAsim Jamshed }
180076404edcSAsim Jamshed
180176404edcSAsim Jamshed if (socket->socktype != MOS_SOCK_STREAM) {
180276404edcSAsim Jamshed TRACE_API("Not an end socket. id: %d\n", sockid);
180376404edcSAsim Jamshed errno = ENOTSOCK;
180476404edcSAsim Jamshed return -1;
180576404edcSAsim Jamshed }
180676404edcSAsim Jamshed
180776404edcSAsim Jamshed cur_stream = socket->stream;
180876404edcSAsim Jamshed if (!cur_stream ||
180976404edcSAsim Jamshed !(cur_stream->state == TCP_ST_ESTABLISHED ||
181076404edcSAsim Jamshed cur_stream->state == TCP_ST_CLOSE_WAIT)) {
181176404edcSAsim Jamshed errno = ENOTCONN;
181276404edcSAsim Jamshed return -1;
181376404edcSAsim Jamshed }
181476404edcSAsim Jamshed
181576404edcSAsim Jamshed if (len <= 0) {
181676404edcSAsim Jamshed if (socket->opts & MTCP_NONBLOCK) {
181776404edcSAsim Jamshed errno = EAGAIN;
181876404edcSAsim Jamshed return -1;
181976404edcSAsim Jamshed } else {
182076404edcSAsim Jamshed return 0;
182176404edcSAsim Jamshed }
182276404edcSAsim Jamshed }
182376404edcSAsim Jamshed
182476404edcSAsim Jamshed sndvar = cur_stream->sndvar;
182576404edcSAsim Jamshed
182676404edcSAsim Jamshed SBUF_LOCK(&sndvar->write_lock);
182776404edcSAsim Jamshed ret = CopyFromUser(mtcp, cur_stream, buf, len);
182876404edcSAsim Jamshed
182976404edcSAsim Jamshed SBUF_UNLOCK(&sndvar->write_lock);
183076404edcSAsim Jamshed
183176404edcSAsim Jamshed if (ret > 0 && !(sndvar->on_sendq || sndvar->on_send_list)) {
183276404edcSAsim Jamshed SQ_LOCK(&mtcp->ctx->sendq_lock);
183376404edcSAsim Jamshed sndvar->on_sendq = TRUE;
183476404edcSAsim Jamshed StreamEnqueue(mtcp->sendq, cur_stream); /* this always success */
183576404edcSAsim Jamshed SQ_UNLOCK(&mtcp->ctx->sendq_lock);
183676404edcSAsim Jamshed mtcp->wakeup_flag = TRUE;
183776404edcSAsim Jamshed }
183876404edcSAsim Jamshed
183976404edcSAsim Jamshed if (ret == 0 && (socket->opts & MTCP_NONBLOCK)) {
184076404edcSAsim Jamshed ret = -1;
184176404edcSAsim Jamshed errno = EAGAIN;
184276404edcSAsim Jamshed }
184376404edcSAsim Jamshed
184476404edcSAsim Jamshed /* if there are remaining sending buffer, generate write event */
184576404edcSAsim Jamshed if (sndvar->snd_wnd > 0) {
184676404edcSAsim Jamshed if (socket->epoll & MOS_EPOLLOUT && !(socket->epoll & MOS_EPOLLET)) {
184776404edcSAsim Jamshed AddEpollEvent(mtcp->ep,
184876404edcSAsim Jamshed USR_SHADOW_EVENT_QUEUE, socket, MOS_EPOLLOUT);
184976404edcSAsim Jamshed }
185076404edcSAsim Jamshed }
185176404edcSAsim Jamshed
185276404edcSAsim Jamshed TRACE_API("Stream %d: mtcp_write() returning %d\n", cur_stream->id, ret);
185376404edcSAsim Jamshed return ret;
185476404edcSAsim Jamshed }
/*----------------------------------------------------------------------------*/
185676404edcSAsim Jamshed ssize_t
mtcp_writev(mctx_t mctx,int sockid,const struct iovec * iov,int numIOV)1857a5e1a556SAsim Jamshed mtcp_writev(mctx_t mctx, int sockid, const struct iovec *iov, int numIOV)
185876404edcSAsim Jamshed {
185976404edcSAsim Jamshed mtcp_manager_t mtcp;
186076404edcSAsim Jamshed socket_map_t socket;
186176404edcSAsim Jamshed tcp_stream *cur_stream;
186276404edcSAsim Jamshed struct tcp_send_vars *sndvar;
186376404edcSAsim Jamshed int ret, to_write, i;
186476404edcSAsim Jamshed
186576404edcSAsim Jamshed mtcp = GetMTCPManager(mctx);
186676404edcSAsim Jamshed if (!mtcp) {
186776404edcSAsim Jamshed errno = EACCES;
186876404edcSAsim Jamshed return -1;
186976404edcSAsim Jamshed }
187076404edcSAsim Jamshed
187176404edcSAsim Jamshed if (sockid < 0 || sockid >= g_config.mos->max_concurrency) {
187276404edcSAsim Jamshed TRACE_API("Socket id %d out of range.\n", sockid);
187376404edcSAsim Jamshed errno = EBADF;
187476404edcSAsim Jamshed return -1;
187576404edcSAsim Jamshed }
187676404edcSAsim Jamshed
187776404edcSAsim Jamshed socket = &mtcp->smap[sockid];
187876404edcSAsim Jamshed if (socket->socktype == MOS_SOCK_UNUSED) {
187976404edcSAsim Jamshed TRACE_API("Invalid socket id: %d\n", sockid);
188076404edcSAsim Jamshed errno = EBADF;
188176404edcSAsim Jamshed return -1;
188276404edcSAsim Jamshed }
188376404edcSAsim Jamshed
188476404edcSAsim Jamshed if (socket->socktype != MOS_SOCK_STREAM) {
188576404edcSAsim Jamshed TRACE_API("Not an end socket. id: %d\n", sockid);
188676404edcSAsim Jamshed errno = ENOTSOCK;
188776404edcSAsim Jamshed return -1;
188876404edcSAsim Jamshed }
188976404edcSAsim Jamshed
189076404edcSAsim Jamshed cur_stream = socket->stream;
189176404edcSAsim Jamshed if (!cur_stream ||
189276404edcSAsim Jamshed !(cur_stream->state == TCP_ST_ESTABLISHED ||
189376404edcSAsim Jamshed cur_stream->state == TCP_ST_CLOSE_WAIT)) {
189476404edcSAsim Jamshed errno = ENOTCONN;
189576404edcSAsim Jamshed return -1;
189676404edcSAsim Jamshed }
189776404edcSAsim Jamshed
189876404edcSAsim Jamshed sndvar = cur_stream->sndvar;
189976404edcSAsim Jamshed SBUF_LOCK(&sndvar->write_lock);
190076404edcSAsim Jamshed
190176404edcSAsim Jamshed /* write from the vectored buffers */
190276404edcSAsim Jamshed to_write = 0;
190376404edcSAsim Jamshed for (i = 0; i < numIOV; i++) {
190476404edcSAsim Jamshed if (iov[i].iov_len <= 0)
190576404edcSAsim Jamshed continue;
190676404edcSAsim Jamshed
190776404edcSAsim Jamshed ret = CopyFromUser(mtcp, cur_stream, iov[i].iov_base, iov[i].iov_len);
190876404edcSAsim Jamshed if (ret <= 0)
190976404edcSAsim Jamshed break;
191076404edcSAsim Jamshed
191176404edcSAsim Jamshed to_write += ret;
191276404edcSAsim Jamshed
191376404edcSAsim Jamshed if (ret < iov[i].iov_len)
191476404edcSAsim Jamshed break;
191576404edcSAsim Jamshed }
191676404edcSAsim Jamshed SBUF_UNLOCK(&sndvar->write_lock);
191776404edcSAsim Jamshed
191876404edcSAsim Jamshed if (to_write > 0 && !(sndvar->on_sendq || sndvar->on_send_list)) {
191976404edcSAsim Jamshed SQ_LOCK(&mtcp->ctx->sendq_lock);
192076404edcSAsim Jamshed sndvar->on_sendq = TRUE;
192176404edcSAsim Jamshed StreamEnqueue(mtcp->sendq, cur_stream); /* this always success */
192276404edcSAsim Jamshed SQ_UNLOCK(&mtcp->ctx->sendq_lock);
192376404edcSAsim Jamshed mtcp->wakeup_flag = TRUE;
192476404edcSAsim Jamshed }
192576404edcSAsim Jamshed
192676404edcSAsim Jamshed if (to_write == 0 && (socket->opts & MTCP_NONBLOCK)) {
192776404edcSAsim Jamshed to_write = -1;
192876404edcSAsim Jamshed errno = EAGAIN;
192976404edcSAsim Jamshed }
193076404edcSAsim Jamshed
193176404edcSAsim Jamshed /* if there are remaining sending buffer, generate write event */
193276404edcSAsim Jamshed if (sndvar->snd_wnd > 0) {
193376404edcSAsim Jamshed if (socket->epoll & MOS_EPOLLOUT && !(socket->epoll & MOS_EPOLLET)) {
193476404edcSAsim Jamshed AddEpollEvent(mtcp->ep,
193576404edcSAsim Jamshed USR_SHADOW_EVENT_QUEUE, socket, MOS_EPOLLOUT);
193676404edcSAsim Jamshed }
193776404edcSAsim Jamshed }
193876404edcSAsim Jamshed
193976404edcSAsim Jamshed TRACE_API("Stream %d: mtcp_writev() returning %d\n",
194076404edcSAsim Jamshed cur_stream->id, to_write);
194176404edcSAsim Jamshed return to_write;
194276404edcSAsim Jamshed }
/*----------------------------------------------------------------------------*/
194476404edcSAsim Jamshed uint32_t
mtcp_get_connection_cnt(mctx_t mctx)194576404edcSAsim Jamshed mtcp_get_connection_cnt(mctx_t mctx)
194676404edcSAsim Jamshed {
194776404edcSAsim Jamshed mtcp_manager_t mtcp;
194876404edcSAsim Jamshed mtcp = GetMTCPManager(mctx);
194976404edcSAsim Jamshed if (!mtcp) {
195076404edcSAsim Jamshed errno = EACCES;
195176404edcSAsim Jamshed return -1;
195276404edcSAsim Jamshed }
195376404edcSAsim Jamshed
195476404edcSAsim Jamshed if (mtcp->num_msp > 0)
195576404edcSAsim Jamshed return mtcp->flow_cnt / 2;
195676404edcSAsim Jamshed else
195776404edcSAsim Jamshed return mtcp->flow_cnt;
195876404edcSAsim Jamshed }
/*----------------------------------------------------------------------------*/
1960