176404edcSAsim Jamshed #include <sys/queue.h> 276404edcSAsim Jamshed #include <sys/ioctl.h> 376404edcSAsim Jamshed #include <limits.h> 476404edcSAsim Jamshed #include <unistd.h> 576404edcSAsim Jamshed #include <assert.h> 676404edcSAsim Jamshed #include <string.h> 776404edcSAsim Jamshed 876404edcSAsim Jamshed #ifdef DARWIN 976404edcSAsim Jamshed #include <netinet/tcp.h> 1076404edcSAsim Jamshed #include <netinet/ip.h> 1176404edcSAsim Jamshed #include <netinet/if_ether.h> 1276404edcSAsim Jamshed #endif 1376404edcSAsim Jamshed 1476404edcSAsim Jamshed #include "mtcp.h" 1576404edcSAsim Jamshed #include "mtcp_api.h" 1676404edcSAsim Jamshed #include "tcp_in.h" 1776404edcSAsim Jamshed #include "tcp_stream.h" 1876404edcSAsim Jamshed #include "tcp_out.h" 1976404edcSAsim Jamshed #include "ip_out.h" 2076404edcSAsim Jamshed #include "eventpoll.h" 2176404edcSAsim Jamshed #include "pipe.h" 2276404edcSAsim Jamshed #include "fhash.h" 2376404edcSAsim Jamshed #include "addr_pool.h" 24*152f7c19SAsim Jamshed #include "util.h" 2576404edcSAsim Jamshed #include "config.h" 2676404edcSAsim Jamshed #include "debug.h" 2776404edcSAsim Jamshed #include "eventpoll.h" 2876404edcSAsim Jamshed #include "mos_api.h" 2976404edcSAsim Jamshed #include "tcp_rb.h" 3076404edcSAsim Jamshed 3176404edcSAsim Jamshed #define MAX(a, b) ((a)>(b)?(a):(b)) 3276404edcSAsim Jamshed #define MIN(a, b) ((a)<(b)?(a):(b)) 3376404edcSAsim Jamshed 3476404edcSAsim Jamshed /*----------------------------------------------------------------------------*/ 3576404edcSAsim Jamshed /** Stop monitoring the socket! (function prototype) 3676404edcSAsim Jamshed * @param [in] mctx: mtcp context 3776404edcSAsim Jamshed * @param [in] sock: monitoring stream socket id 3876404edcSAsim Jamshed * @param [in] side: side of monitoring (client side, server side or both) 3976404edcSAsim Jamshed * 4076404edcSAsim Jamshed * This function is now DEPRECATED and is only used within mOS core... 4176404edcSAsim Jamshed */ 4276404edcSAsim Jamshed int 4376404edcSAsim Jamshed mtcp_cb_stop(mctx_t mctx, int sock, int side); 4476404edcSAsim Jamshed /*----------------------------------------------------------------------------*/ 4576404edcSAsim Jamshed /** Reset the connection (send RST packets to both sides) 4676404edcSAsim Jamshed * (We need to decide the API for this.) 4776404edcSAsim Jamshed */ 4876404edcSAsim Jamshed //int 4976404edcSAsim Jamshed //mtcp_cb_reset(mctx_t mctx, int sock, int side); 5076404edcSAsim Jamshed /*----------------------------------------------------------------------------*/ 5176404edcSAsim Jamshed inline mtcp_manager_t 5276404edcSAsim Jamshed GetMTCPManager(mctx_t mctx) 5376404edcSAsim Jamshed { 5476404edcSAsim Jamshed if (!mctx) { 5576404edcSAsim Jamshed errno = EACCES; 5676404edcSAsim Jamshed return NULL; 5776404edcSAsim Jamshed } 5876404edcSAsim Jamshed 5976404edcSAsim Jamshed if (mctx->cpu < 0 || mctx->cpu >= num_cpus) { 6076404edcSAsim Jamshed errno = EINVAL; 6176404edcSAsim Jamshed return NULL; 6276404edcSAsim Jamshed } 6376404edcSAsim Jamshed 644cb4e140SAsim Jamshed if (!g_mtcp[mctx->cpu] || g_mtcp[mctx->cpu]->ctx->done || g_mtcp[mctx->cpu]->ctx->exit) { 6576404edcSAsim Jamshed errno = EPERM; 6676404edcSAsim Jamshed return NULL; 6776404edcSAsim Jamshed } 6876404edcSAsim Jamshed 6976404edcSAsim Jamshed return g_mtcp[mctx->cpu]; 7076404edcSAsim Jamshed } 7176404edcSAsim Jamshed /*----------------------------------------------------------------------------*/ 72a5e1a556SAsim Jamshed static inline int 7376404edcSAsim Jamshed GetSocketError(socket_map_t socket, void *optval, socklen_t *optlen) 7476404edcSAsim Jamshed { 7576404edcSAsim Jamshed tcp_stream *cur_stream; 7676404edcSAsim Jamshed 7776404edcSAsim Jamshed if (!socket->stream) { 7876404edcSAsim Jamshed errno = EBADF; 7976404edcSAsim Jamshed return -1; 8076404edcSAsim Jamshed } 8176404edcSAsim Jamshed 8276404edcSAsim Jamshed cur_stream = socket->stream; 8376404edcSAsim Jamshed if (cur_stream->state == TCP_ST_CLOSED_RSVD) { 8476404edcSAsim Jamshed if (cur_stream->close_reason == TCP_TIMEDOUT || 8576404edcSAsim Jamshed cur_stream->close_reason == TCP_CONN_FAIL || 8676404edcSAsim Jamshed cur_stream->close_reason == TCP_CONN_LOST) { 8776404edcSAsim Jamshed *(int *)optval = ETIMEDOUT; 8876404edcSAsim Jamshed *optlen = sizeof(int); 8976404edcSAsim Jamshed 9076404edcSAsim Jamshed return 0; 9176404edcSAsim Jamshed } 9276404edcSAsim Jamshed } 9376404edcSAsim Jamshed 9476404edcSAsim Jamshed if (cur_stream->state == TCP_ST_CLOSE_WAIT || 9576404edcSAsim Jamshed cur_stream->state == TCP_ST_CLOSED_RSVD) { 9676404edcSAsim Jamshed if (cur_stream->close_reason == TCP_RESET) { 9776404edcSAsim Jamshed *(int *)optval = ECONNRESET; 9876404edcSAsim Jamshed *optlen = sizeof(int); 9976404edcSAsim Jamshed 10076404edcSAsim Jamshed return 0; 10176404edcSAsim Jamshed } 10276404edcSAsim Jamshed } 10376404edcSAsim Jamshed 104a5e1a556SAsim Jamshed if (cur_stream->state == TCP_ST_SYN_SENT && 105a5e1a556SAsim Jamshed errno == EINPROGRESS) { 106a5e1a556SAsim Jamshed *(int *)optval = errno; 107a5e1a556SAsim Jamshed *optlen = sizeof(int); 108a5e1a556SAsim Jamshed 109a5e1a556SAsim Jamshed return -1; 110a5e1a556SAsim Jamshed } 111a5e1a556SAsim Jamshed 112265cb675SAsim Jamshed /* 113265cb675SAsim Jamshed * `base case`: If socket sees no so_error, then 114265cb675SAsim Jamshed * this also means close_reason will always be 115265cb675SAsim Jamshed * TCP_NOT_CLOSED. 116265cb675SAsim Jamshed */ 117265cb675SAsim Jamshed if (cur_stream->close_reason == TCP_NOT_CLOSED) { 118265cb675SAsim Jamshed *(int *)optval = 0; 119265cb675SAsim Jamshed *optlen = sizeof(int); 120265cb675SAsim Jamshed 121265cb675SAsim Jamshed return 0; 122265cb675SAsim Jamshed } 123265cb675SAsim Jamshed 12476404edcSAsim Jamshed errno = ENOSYS; 12576404edcSAsim Jamshed return -1; 12676404edcSAsim Jamshed } 12776404edcSAsim Jamshed /*----------------------------------------------------------------------------*/ 12876404edcSAsim Jamshed int 129a5e1a556SAsim Jamshed mtcp_getsockname(mctx_t mctx, int sockid, struct sockaddr *addr, 130a5e1a556SAsim Jamshed socklen_t *addrlen) 131a5e1a556SAsim Jamshed { 132a5e1a556SAsim Jamshed mtcp_manager_t mtcp; 133a5e1a556SAsim Jamshed socket_map_t socket; 134a5e1a556SAsim Jamshed 135a5e1a556SAsim Jamshed mtcp = GetMTCPManager(mctx); 136a5e1a556SAsim Jamshed if (!mtcp) { 137a5e1a556SAsim Jamshed return -1; 138a5e1a556SAsim Jamshed } 139a5e1a556SAsim Jamshed 140a5e1a556SAsim Jamshed if (sockid < 0 || sockid >= g_config.mos->max_concurrency) { 141a5e1a556SAsim Jamshed TRACE_API("Socket id %d out of range.\n", sockid); 142a5e1a556SAsim Jamshed errno = EBADF; 143a5e1a556SAsim Jamshed return -1; 144a5e1a556SAsim Jamshed } 145a5e1a556SAsim Jamshed 146a5e1a556SAsim Jamshed socket = &mtcp->smap[sockid]; 147a5e1a556SAsim Jamshed if (socket->socktype == MOS_SOCK_UNUSED) { 148a5e1a556SAsim Jamshed TRACE_API("Invalid socket id: %d\n", sockid); 149a5e1a556SAsim Jamshed errno = EBADF; 150a5e1a556SAsim Jamshed return -1; 151a5e1a556SAsim Jamshed } 152a5e1a556SAsim Jamshed 153a5e1a556SAsim Jamshed if (*addrlen <= 0) { 154a5e1a556SAsim Jamshed TRACE_API("Invalid addrlen: %d\n", *addrlen); 155a5e1a556SAsim Jamshed errno = EINVAL; 156a5e1a556SAsim Jamshed return -1; 157a5e1a556SAsim Jamshed } 158a5e1a556SAsim Jamshed 159a5e1a556SAsim Jamshed if (socket->socktype != MOS_SOCK_STREAM_LISTEN && 160a5e1a556SAsim Jamshed socket->socktype != MOS_SOCK_STREAM) { 161a5e1a556SAsim Jamshed TRACE_API("Invalid socket id: %d\n", sockid); 162a5e1a556SAsim Jamshed errno = ENOTSOCK; 163a5e1a556SAsim Jamshed return -1; 164a5e1a556SAsim Jamshed } 165a5e1a556SAsim Jamshed 166a5e1a556SAsim Jamshed *(struct sockaddr_in *)addr = socket->saddr; 167a5e1a556SAsim Jamshed *addrlen = sizeof(socket->saddr); 168a5e1a556SAsim Jamshed 169a5e1a556SAsim Jamshed return 0; 170a5e1a556SAsim Jamshed } 171a5e1a556SAsim Jamshed /*----------------------------------------------------------------------------*/ 172a5e1a556SAsim Jamshed int 17376404edcSAsim Jamshed mtcp_getsockopt(mctx_t mctx, int sockid, int level, 17476404edcSAsim Jamshed int optname, void *optval, socklen_t *optlen) 17576404edcSAsim Jamshed { 17676404edcSAsim Jamshed mtcp_manager_t mtcp; 17776404edcSAsim Jamshed socket_map_t socket; 17876404edcSAsim Jamshed 17976404edcSAsim Jamshed mtcp = GetMTCPManager(mctx); 18076404edcSAsim Jamshed if (!mtcp) { 18176404edcSAsim Jamshed errno = EACCES; 18276404edcSAsim Jamshed return -1; 18376404edcSAsim Jamshed } 18476404edcSAsim Jamshed 18576404edcSAsim Jamshed if (sockid < 0 || sockid >= g_config.mos->max_concurrency) { 18676404edcSAsim Jamshed TRACE_API("Socket id %d out of range.\n", sockid); 18776404edcSAsim Jamshed errno = EBADF; 18876404edcSAsim Jamshed return -1; 18976404edcSAsim Jamshed } 19076404edcSAsim Jamshed 19176404edcSAsim Jamshed switch (level) { 19276404edcSAsim Jamshed case SOL_SOCKET: 19376404edcSAsim Jamshed socket = &mtcp->smap[sockid]; 19476404edcSAsim Jamshed if (socket->socktype == MOS_SOCK_UNUSED) { 19576404edcSAsim Jamshed TRACE_API("Invalid socket id: %d\n", sockid); 19676404edcSAsim Jamshed errno = EBADF; 19776404edcSAsim Jamshed return -1; 19876404edcSAsim Jamshed } 19976404edcSAsim Jamshed 20076404edcSAsim Jamshed if (socket->socktype != MOS_SOCK_STREAM_LISTEN && 20176404edcSAsim Jamshed socket->socktype != MOS_SOCK_STREAM) { 20276404edcSAsim Jamshed TRACE_API("Invalid socket id: %d\n", sockid); 20376404edcSAsim Jamshed errno = ENOTSOCK; 20476404edcSAsim Jamshed return -1; 20576404edcSAsim Jamshed } 20676404edcSAsim Jamshed 20776404edcSAsim Jamshed if (optname == SO_ERROR) { 20876404edcSAsim Jamshed if (socket->socktype == MOS_SOCK_STREAM) { 20976404edcSAsim Jamshed return GetSocketError(socket, optval, optlen); 21076404edcSAsim Jamshed } 21176404edcSAsim Jamshed } 21276404edcSAsim Jamshed break; 21376404edcSAsim Jamshed case SOL_MONSOCKET: 21476404edcSAsim Jamshed /* check if the calling thread is in MOS context */ 21576404edcSAsim Jamshed if (mtcp->ctx->thread != pthread_self()) { 21676404edcSAsim Jamshed errno = EPERM; 21776404edcSAsim Jamshed return -1; 21876404edcSAsim Jamshed } 21976404edcSAsim Jamshed /* 22076404edcSAsim Jamshed * All options will only work for active 22176404edcSAsim Jamshed * monitor stream sockets 22276404edcSAsim Jamshed */ 22376404edcSAsim Jamshed socket = &mtcp->msmap[sockid]; 22476404edcSAsim Jamshed if (socket->socktype != MOS_SOCK_MONITOR_STREAM_ACTIVE) { 22576404edcSAsim Jamshed TRACE_API("Invalid socket id: %d\n", sockid); 22676404edcSAsim Jamshed errno = ENOTSOCK; 22776404edcSAsim Jamshed return -1; 22876404edcSAsim Jamshed } 22976404edcSAsim Jamshed 23076404edcSAsim Jamshed switch (optname) { 23176404edcSAsim Jamshed case MOS_FRAGINFO_CLIBUF: 23276404edcSAsim Jamshed return GetFragInfo(socket, MOS_SIDE_CLI, optval, optlen); 23376404edcSAsim Jamshed case MOS_FRAGINFO_SVRBUF: 23476404edcSAsim Jamshed return GetFragInfo(socket, MOS_SIDE_SVR, optval, optlen); 23576404edcSAsim Jamshed case MOS_INFO_CLIBUF: 23676404edcSAsim Jamshed return GetBufInfo(socket, MOS_SIDE_CLI, optval, optlen); 23776404edcSAsim Jamshed case MOS_INFO_SVRBUF: 23876404edcSAsim Jamshed return GetBufInfo(socket, MOS_SIDE_SVR, optval, optlen); 23976404edcSAsim Jamshed case MOS_TCP_STATE_CLI: 24076404edcSAsim Jamshed return GetTCPState(socket->monitor_stream->stream, MOS_SIDE_CLI, 24176404edcSAsim Jamshed optval, optlen); 24276404edcSAsim Jamshed case MOS_TCP_STATE_SVR: 24376404edcSAsim Jamshed return GetTCPState(socket->monitor_stream->stream, MOS_SIDE_SVR, 24476404edcSAsim Jamshed optval, optlen); 24576404edcSAsim Jamshed case MOS_TIMESTAMP: 24676404edcSAsim Jamshed return GetLastTimestamp(socket->monitor_stream->stream, 24776404edcSAsim Jamshed (uint32_t *)optval, 24876404edcSAsim Jamshed optlen); 24976404edcSAsim Jamshed default: 25076404edcSAsim Jamshed TRACE_API("can't recognize the optname=%d\n", optname); 25176404edcSAsim Jamshed assert(0); 25276404edcSAsim Jamshed } 25376404edcSAsim Jamshed break; 25476404edcSAsim Jamshed } 25576404edcSAsim Jamshed errno = ENOSYS; 25676404edcSAsim Jamshed return -1; 25776404edcSAsim Jamshed } 25876404edcSAsim Jamshed /*----------------------------------------------------------------------------*/ 25976404edcSAsim Jamshed int 26076404edcSAsim Jamshed mtcp_setsockopt(mctx_t mctx, int sockid, int level, 26176404edcSAsim Jamshed int optname, const void *optval, socklen_t optlen) 26276404edcSAsim Jamshed { 26376404edcSAsim Jamshed mtcp_manager_t mtcp; 26476404edcSAsim Jamshed socket_map_t socket; 26576404edcSAsim Jamshed tcprb_t *rb; 26676404edcSAsim Jamshed 26776404edcSAsim Jamshed mtcp = GetMTCPManager(mctx); 26876404edcSAsim Jamshed if (!mtcp) { 26976404edcSAsim Jamshed errno = EACCES; 27076404edcSAsim Jamshed return -1; 27176404edcSAsim Jamshed } 27276404edcSAsim Jamshed 27376404edcSAsim Jamshed if (sockid < 0 || sockid >= g_config.mos->max_concurrency) { 27476404edcSAsim Jamshed TRACE_API("Socket id %d out of range.\n", sockid); 27576404edcSAsim Jamshed errno = EBADF; 27676404edcSAsim Jamshed return -1; 27776404edcSAsim Jamshed } 27876404edcSAsim Jamshed 27976404edcSAsim Jamshed switch (level) { 28076404edcSAsim Jamshed case SOL_SOCKET: 28176404edcSAsim Jamshed socket = &mtcp->smap[sockid]; 28276404edcSAsim Jamshed if (socket->socktype == MOS_SOCK_UNUSED) { 28376404edcSAsim Jamshed TRACE_API("Invalid socket id: %d\n", sockid); 28476404edcSAsim Jamshed errno = EBADF; 28576404edcSAsim Jamshed return -1; 28676404edcSAsim Jamshed } 28776404edcSAsim Jamshed 28876404edcSAsim Jamshed if (socket->socktype != MOS_SOCK_STREAM_LISTEN && 28976404edcSAsim Jamshed socket->socktype != MOS_SOCK_STREAM) { 29076404edcSAsim Jamshed TRACE_API("Invalid socket id: %d\n", sockid); 29176404edcSAsim Jamshed errno = ENOTSOCK; 29276404edcSAsim Jamshed return -1; 29376404edcSAsim Jamshed } 29476404edcSAsim Jamshed break; 29576404edcSAsim Jamshed case SOL_MONSOCKET: 29676404edcSAsim Jamshed socket = &mtcp->msmap[sockid]; 29776404edcSAsim Jamshed /* 29876404edcSAsim Jamshed * checking of calling thread to be in MOS context is 29976404edcSAsim Jamshed * disabled since both optnames can be called from 30076404edcSAsim Jamshed * `application' context (on passive sockets) 30176404edcSAsim Jamshed */ 30276404edcSAsim Jamshed /* 30376404edcSAsim Jamshed * if (mtcp->ctx->thread != pthread_self()) 30476404edcSAsim Jamshed * return -1; 30576404edcSAsim Jamshed */ 30676404edcSAsim Jamshed 30776404edcSAsim Jamshed switch (optname) { 308a834ea89SAsim Jamshed case MOS_CLIOVERLAP: 309a834ea89SAsim Jamshed rb = (socket->monitor_stream->stream->side == MOS_SIDE_CLI) ? 310a834ea89SAsim Jamshed socket->monitor_stream->stream->rcvvar->rcvbuf : 311a834ea89SAsim Jamshed socket->monitor_stream->stream->pair_stream->rcvvar->rcvbuf; 312a834ea89SAsim Jamshed if (rb == NULL) { 313a834ea89SAsim Jamshed errno = EFAULT; 314a834ea89SAsim Jamshed return -1; 315a834ea89SAsim Jamshed } 316a834ea89SAsim Jamshed if (tcprb_setpolicy(rb, *(uint8_t *)optval) < 0) { 317a834ea89SAsim Jamshed errno = EINVAL; 318a834ea89SAsim Jamshed return -1; 319a834ea89SAsim Jamshed } else 320a834ea89SAsim Jamshed return 0; 321a834ea89SAsim Jamshed break; 322a834ea89SAsim Jamshed case MOS_SVROVERLAP: 323a834ea89SAsim Jamshed rb = (socket->monitor_stream->stream->side == MOS_SIDE_SVR) ? 324a834ea89SAsim Jamshed socket->monitor_stream->stream->rcvvar->rcvbuf : 325a834ea89SAsim Jamshed socket->monitor_stream->stream->pair_stream->rcvvar->rcvbuf; 326a834ea89SAsim Jamshed if (rb == NULL) { 327a834ea89SAsim Jamshed errno = EFAULT; 328a834ea89SAsim Jamshed return -1; 329a834ea89SAsim Jamshed } 330a834ea89SAsim Jamshed if (tcprb_setpolicy(rb, *(uint8_t *)optval) < 0) { 331a834ea89SAsim Jamshed errno = EINVAL; 332a834ea89SAsim Jamshed return -1; 333a834ea89SAsim Jamshed } else 334a834ea89SAsim Jamshed return 0; 335a834ea89SAsim Jamshed break; 33676404edcSAsim Jamshed case MOS_CLIBUF: 33776404edcSAsim Jamshed #if 0 33876404edcSAsim Jamshed if (socket->socktype != MOS_SOCK_MONITOR_STREAM_ACTIVE) { 33976404edcSAsim Jamshed errno = EBADF; 34076404edcSAsim Jamshed return -1; 34176404edcSAsim Jamshed } 34276404edcSAsim Jamshed #endif 34376404edcSAsim Jamshed #ifdef DISABLE_DYN_RESIZE 34476404edcSAsim Jamshed if (*(int *)optval != 0) 34576404edcSAsim Jamshed return -1; 34676404edcSAsim Jamshed if (socket->socktype == MOS_SOCK_MONITOR_STREAM_ACTIVE) { 34776404edcSAsim Jamshed rb = (socket->monitor_stream->stream->side == MOS_SIDE_CLI) ? 34876404edcSAsim Jamshed socket->monitor_stream->stream->rcvvar->rcvbuf : 34976404edcSAsim Jamshed socket->monitor_stream->stream->pair_stream->rcvvar->rcvbuf; 35076404edcSAsim Jamshed if (rb) { 35176404edcSAsim Jamshed tcprb_resize_meta(rb, 0); 35276404edcSAsim Jamshed tcprb_resize(rb, 0); 35376404edcSAsim Jamshed } 35476404edcSAsim Jamshed } 35576404edcSAsim Jamshed return DisableBuf(socket, MOS_SIDE_CLI); 35676404edcSAsim Jamshed #else 35776404edcSAsim Jamshed rb = (socket->monitor_stream->stream->side == MOS_SIDE_CLI) ? 35876404edcSAsim Jamshed socket->monitor_stream->stream->rcvvar->rcvbuf : 35976404edcSAsim Jamshed socket->monitor_stream->stream->pair_stream->rcvvar->rcvbuf; 36076404edcSAsim Jamshed if (tcprb_resize_meta(rb, *(int *)optval) < 0) 36176404edcSAsim Jamshed return -1; 36276404edcSAsim Jamshed return tcprb_resize(rb, 36376404edcSAsim Jamshed (((int)rb->metalen - 1) / UNITBUFSIZE + 1) * UNITBUFSIZE); 36476404edcSAsim Jamshed #endif 36576404edcSAsim Jamshed case MOS_SVRBUF: 36676404edcSAsim Jamshed #if 0 36776404edcSAsim Jamshed if (socket->socktype != MOS_SOCK_MONITOR_STREAM_ACTIVE) { 36876404edcSAsim Jamshed errno = EBADF; 36976404edcSAsim Jamshed return -1; 37076404edcSAsim Jamshed } 37176404edcSAsim Jamshed #endif 37276404edcSAsim Jamshed #ifdef DISABLE_DYN_RESIZE 37376404edcSAsim Jamshed if (*(int *)optval != 0) 37476404edcSAsim Jamshed return -1; 37576404edcSAsim Jamshed if (socket->socktype == MOS_SOCK_MONITOR_STREAM_ACTIVE) { 37676404edcSAsim Jamshed rb = (socket->monitor_stream->stream->side == MOS_SIDE_SVR) ? 37776404edcSAsim Jamshed socket->monitor_stream->stream->rcvvar->rcvbuf : 37876404edcSAsim Jamshed socket->monitor_stream->stream->pair_stream->rcvvar->rcvbuf; 37976404edcSAsim Jamshed if (rb) { 38076404edcSAsim Jamshed tcprb_resize_meta(rb, 0); 38176404edcSAsim Jamshed tcprb_resize(rb, 0); 38276404edcSAsim Jamshed } 38376404edcSAsim Jamshed } 38476404edcSAsim Jamshed return DisableBuf(socket, MOS_SIDE_SVR); 38576404edcSAsim Jamshed #else 38676404edcSAsim Jamshed rb = (socket->monitor_stream->stream->side == MOS_SIDE_SVR) ? 38776404edcSAsim Jamshed socket->monitor_stream->stream->rcvvar->rcvbuf : 38876404edcSAsim Jamshed socket->monitor_stream->stream->pair_stream->rcvvar->rcvbuf; 38976404edcSAsim Jamshed if (tcprb_resize_meta(rb, *(int *)optval) < 0) 39076404edcSAsim Jamshed return -1; 39176404edcSAsim Jamshed return tcprb_resize(rb, 39276404edcSAsim Jamshed (((int)rb->metalen - 1) / UNITBUFSIZE + 1) * UNITBUFSIZE); 39376404edcSAsim Jamshed #endif 39476404edcSAsim Jamshed case MOS_FRAG_CLIBUF: 39576404edcSAsim Jamshed #if 0 39676404edcSAsim Jamshed if (socket->socktype != MOS_SOCK_MONITOR_STREAM_ACTIVE) { 39776404edcSAsim Jamshed errno = EBADF; 39876404edcSAsim Jamshed return -1; 39976404edcSAsim Jamshed } 40076404edcSAsim Jamshed #endif 40176404edcSAsim Jamshed #ifdef DISABLE_DYN_RESIZE 40276404edcSAsim Jamshed if (*(int *)optval != 0) 40376404edcSAsim Jamshed return -1; 40476404edcSAsim Jamshed if (socket->socktype == MOS_SOCK_MONITOR_STREAM_ACTIVE) { 40576404edcSAsim Jamshed rb = (socket->monitor_stream->stream->side == MOS_SIDE_CLI) ? 40676404edcSAsim Jamshed socket->monitor_stream->stream->rcvvar->rcvbuf : 40776404edcSAsim Jamshed socket->monitor_stream->stream->pair_stream->rcvvar->rcvbuf; 40876404edcSAsim Jamshed if (rb) 40976404edcSAsim Jamshed tcprb_resize(rb, 0); 41076404edcSAsim Jamshed } 41176404edcSAsim Jamshed return 0; 41276404edcSAsim Jamshed #else 41376404edcSAsim Jamshed rb = (socket->monitor_stream->stream->side == MOS_SIDE_CLI) ? 41476404edcSAsim Jamshed socket->monitor_stream->stream->rcvvar->rcvbuf : 41576404edcSAsim Jamshed socket->monitor_stream->stream->pair_stream->rcvvar->rcvbuf; 41676404edcSAsim Jamshed if (rb->len == 0) 41776404edcSAsim Jamshed return tcprb_resize_meta(rb, *(int *)optval); 41876404edcSAsim Jamshed else 41976404edcSAsim Jamshed return -1; 42076404edcSAsim Jamshed #endif 42176404edcSAsim Jamshed case MOS_FRAG_SVRBUF: 42276404edcSAsim Jamshed #if 0 42376404edcSAsim Jamshed if (socket->socktype != MOS_SOCK_MONITOR_STREAM_ACTIVE) { 42476404edcSAsim Jamshed errno = EBADF; 42576404edcSAsim Jamshed return -1; 42676404edcSAsim Jamshed } 42776404edcSAsim Jamshed #endif 42876404edcSAsim Jamshed #ifdef DISABLE_DYN_RESIZE 42976404edcSAsim Jamshed if (*(int *)optval != 0) 43076404edcSAsim Jamshed return -1; 43176404edcSAsim Jamshed if (socket->socktype == MOS_SOCK_MONITOR_STREAM_ACTIVE) { 43276404edcSAsim Jamshed rb = (socket->monitor_stream->stream->side == MOS_SIDE_SVR) ? 43376404edcSAsim Jamshed socket->monitor_stream->stream->rcvvar->rcvbuf : 43476404edcSAsim Jamshed socket->monitor_stream->stream->pair_stream->rcvvar->rcvbuf; 43576404edcSAsim Jamshed if (rb) 43676404edcSAsim Jamshed tcprb_resize(rb, 0); 43776404edcSAsim Jamshed } 43876404edcSAsim Jamshed return 0; 43976404edcSAsim Jamshed #else 44076404edcSAsim Jamshed rb = (socket->monitor_stream->stream->side == MOS_SIDE_SVR) ? 44176404edcSAsim Jamshed socket->monitor_stream->stream->rcvvar->rcvbuf : 44276404edcSAsim Jamshed socket->monitor_stream->stream->pair_stream->rcvvar->rcvbuf; 44376404edcSAsim Jamshed if (rb->len == 0) 44476404edcSAsim Jamshed return tcprb_resize_meta(rb, *(int *)optval); 44576404edcSAsim Jamshed else 44676404edcSAsim Jamshed return -1; 44776404edcSAsim Jamshed #endif 44876404edcSAsim Jamshed case MOS_SEQ_REMAP: 449861ea7dfSAsim Jamshed break; 45076404edcSAsim Jamshed case MOS_STOP_MON: 45176404edcSAsim Jamshed return mtcp_cb_stop(mctx, sockid, *(int *)optval); 45276404edcSAsim Jamshed default: 45376404edcSAsim Jamshed TRACE_API("invalid optname=%d\n", optname); 45476404edcSAsim Jamshed assert(0); 45576404edcSAsim Jamshed } 45676404edcSAsim Jamshed break; 45776404edcSAsim Jamshed } 45876404edcSAsim Jamshed 45976404edcSAsim Jamshed errno = ENOSYS; 46076404edcSAsim Jamshed return -1; 46176404edcSAsim Jamshed } 46276404edcSAsim Jamshed /*----------------------------------------------------------------------------*/ 46376404edcSAsim Jamshed int 46476404edcSAsim Jamshed mtcp_setsock_nonblock(mctx_t mctx, int sockid) 46576404edcSAsim Jamshed { 46676404edcSAsim Jamshed mtcp_manager_t mtcp; 46776404edcSAsim Jamshed 46876404edcSAsim Jamshed mtcp = GetMTCPManager(mctx); 46976404edcSAsim Jamshed if (!mtcp) { 47076404edcSAsim Jamshed errno = EACCES; 47176404edcSAsim Jamshed return -1; 47276404edcSAsim Jamshed } 47376404edcSAsim Jamshed 47476404edcSAsim Jamshed if (sockid < 0 || sockid >= g_config.mos->max_concurrency) { 47576404edcSAsim Jamshed TRACE_API("Socket id %d out of range.\n", sockid); 47676404edcSAsim Jamshed errno = EBADF; 47776404edcSAsim Jamshed return -1; 47876404edcSAsim Jamshed } 47976404edcSAsim Jamshed 48076404edcSAsim Jamshed if (mtcp->smap[sockid].socktype == MOS_SOCK_UNUSED) { 48176404edcSAsim Jamshed TRACE_API("Invalid socket id: %d\n", sockid); 48276404edcSAsim Jamshed errno = EBADF; 48376404edcSAsim Jamshed return -1; 48476404edcSAsim Jamshed } 48576404edcSAsim Jamshed 48676404edcSAsim Jamshed mtcp->smap[sockid].opts |= MTCP_NONBLOCK; 48776404edcSAsim Jamshed 48876404edcSAsim Jamshed return 0; 48976404edcSAsim Jamshed } 49076404edcSAsim Jamshed /*----------------------------------------------------------------------------*/ 49176404edcSAsim Jamshed int 49276404edcSAsim Jamshed mtcp_ioctl(mctx_t mctx, int sockid, int request, void *argp) 49376404edcSAsim Jamshed { 49476404edcSAsim Jamshed mtcp_manager_t mtcp; 49576404edcSAsim Jamshed socket_map_t socket; 49676404edcSAsim Jamshed 49776404edcSAsim Jamshed mtcp = GetMTCPManager(mctx); 49876404edcSAsim Jamshed if (!mtcp) { 49976404edcSAsim Jamshed errno = EACCES; 50076404edcSAsim Jamshed return -1; 50176404edcSAsim Jamshed } 50276404edcSAsim Jamshed 50376404edcSAsim Jamshed if (sockid < 0 || sockid >= g_config.mos->max_concurrency) { 50476404edcSAsim Jamshed TRACE_API("Socket id %d out of range.\n", sockid); 50576404edcSAsim Jamshed errno = EBADF; 50676404edcSAsim Jamshed return -1; 50776404edcSAsim Jamshed } 50876404edcSAsim Jamshed 50976404edcSAsim Jamshed /* only support stream socket */ 51076404edcSAsim Jamshed socket = &mtcp->smap[sockid]; 51176404edcSAsim Jamshed 51276404edcSAsim Jamshed if (socket->socktype != MOS_SOCK_STREAM_LISTEN && 51376404edcSAsim Jamshed socket->socktype != MOS_SOCK_STREAM) { 51476404edcSAsim Jamshed TRACE_API("Invalid socket id: %d\n", sockid); 51576404edcSAsim Jamshed errno = EBADF; 51676404edcSAsim Jamshed return -1; 51776404edcSAsim Jamshed } 51876404edcSAsim Jamshed 51976404edcSAsim Jamshed if (!argp) { 52076404edcSAsim Jamshed errno = EFAULT; 52176404edcSAsim Jamshed return -1; 52276404edcSAsim Jamshed } 52376404edcSAsim Jamshed 52476404edcSAsim Jamshed if (request == FIONREAD) { 52576404edcSAsim Jamshed tcp_stream *cur_stream; 52676404edcSAsim Jamshed tcprb_t *rbuf; 52776404edcSAsim Jamshed 52876404edcSAsim Jamshed cur_stream = socket->stream; 52976404edcSAsim Jamshed if (!cur_stream) { 53076404edcSAsim Jamshed errno = EBADF; 53176404edcSAsim Jamshed return -1; 53276404edcSAsim Jamshed } 53376404edcSAsim Jamshed 53476404edcSAsim Jamshed rbuf = cur_stream->rcvvar->rcvbuf; 53576404edcSAsim Jamshed *(int *)argp = (rbuf) ? tcprb_cflen(rbuf) : 0; 53676404edcSAsim Jamshed 53776404edcSAsim Jamshed } else if (request == FIONBIO) { 53876404edcSAsim Jamshed /* 53976404edcSAsim Jamshed * sockets can only be set to blocking/non-blocking 54076404edcSAsim Jamshed * modes during initialization 54176404edcSAsim Jamshed */ 54276404edcSAsim Jamshed if ((*(int *)argp)) 54376404edcSAsim Jamshed mtcp->smap[sockid].opts |= MTCP_NONBLOCK; 54476404edcSAsim Jamshed else 54576404edcSAsim Jamshed mtcp->smap[sockid].opts &= ~MTCP_NONBLOCK; 54676404edcSAsim Jamshed } else { 54776404edcSAsim Jamshed errno = EINVAL; 54876404edcSAsim Jamshed return -1; 54976404edcSAsim Jamshed } 55076404edcSAsim Jamshed 55176404edcSAsim Jamshed return 0; 55276404edcSAsim Jamshed } 55376404edcSAsim Jamshed /*----------------------------------------------------------------------------*/ 55476404edcSAsim Jamshed static int 55576404edcSAsim Jamshed mtcp_monitor(mctx_t mctx, socket_map_t sock) 55676404edcSAsim Jamshed { 55776404edcSAsim Jamshed mtcp_manager_t mtcp; 55876404edcSAsim Jamshed struct mon_listener *monitor; 55976404edcSAsim Jamshed int sockid = sock->id; 56076404edcSAsim Jamshed 56176404edcSAsim Jamshed mtcp = GetMTCPManager(mctx); 56276404edcSAsim Jamshed if (!mtcp) { 56376404edcSAsim Jamshed errno = EACCES; 56476404edcSAsim Jamshed return -1; 56576404edcSAsim Jamshed } 56676404edcSAsim Jamshed 56776404edcSAsim Jamshed if (sockid < 0 || sockid >= g_config.mos->max_concurrency) { 56876404edcSAsim Jamshed TRACE_API("Socket id %d out of range.\n", sockid); 56976404edcSAsim Jamshed errno = EBADF; 57076404edcSAsim Jamshed return -1; 57176404edcSAsim Jamshed } 57276404edcSAsim Jamshed 57376404edcSAsim Jamshed if (mtcp->msmap[sockid].socktype == MOS_SOCK_UNUSED) { 57476404edcSAsim Jamshed TRACE_API("Invalid socket id: %d\n", sockid); 57576404edcSAsim Jamshed errno = EBADF; 57676404edcSAsim Jamshed return -1; 57776404edcSAsim Jamshed } 57876404edcSAsim Jamshed 57976404edcSAsim Jamshed if (!(mtcp->msmap[sockid].socktype == MOS_SOCK_MONITOR_STREAM || 58076404edcSAsim Jamshed mtcp->msmap[sockid].socktype == MOS_SOCK_MONITOR_RAW)) { 58176404edcSAsim Jamshed TRACE_API("Not a monitor socket. id: %d\n", sockid); 58276404edcSAsim Jamshed errno = ENOTSOCK; 58376404edcSAsim Jamshed return -1; 58476404edcSAsim Jamshed } 58576404edcSAsim Jamshed 58676404edcSAsim Jamshed monitor = (struct mon_listener *)calloc(1, sizeof(struct mon_listener)); 58776404edcSAsim Jamshed if (!monitor) { 58876404edcSAsim Jamshed /* errno set from the malloc() */ 58976404edcSAsim Jamshed errno = ENOMEM; 59076404edcSAsim Jamshed return -1; 59176404edcSAsim Jamshed } 59276404edcSAsim Jamshed 59376404edcSAsim Jamshed /* create a monitor-specific event queue */ 59476404edcSAsim Jamshed monitor->eq = CreateEventQueue(g_config.mos->max_concurrency); 59576404edcSAsim Jamshed if (!monitor->eq) { 59676404edcSAsim Jamshed TRACE_API("Can't create event queue (concurrency: %d) for " 59776404edcSAsim Jamshed "monitor read event registrations!\n", 59876404edcSAsim Jamshed g_config.mos->max_concurrency); 59976404edcSAsim Jamshed free(monitor); 60076404edcSAsim Jamshed errno = ENOMEM; 60176404edcSAsim Jamshed return -1; 60276404edcSAsim Jamshed } 60376404edcSAsim Jamshed 60476404edcSAsim Jamshed /* set monitor-related basic parameters */ 60576404edcSAsim Jamshed #ifndef NEWEV 60676404edcSAsim Jamshed monitor->ude_id = UDE_OFFSET; 60776404edcSAsim Jamshed #endif 60876404edcSAsim Jamshed monitor->socket = sock; 60976404edcSAsim Jamshed monitor->client_buf_mgmt = monitor->server_buf_mgmt = BUFMGMT_FULL; 61076404edcSAsim Jamshed 61176404edcSAsim Jamshed /* perform both sides monitoring by default */ 61276404edcSAsim Jamshed monitor->client_mon = monitor->server_mon = 1; 61376404edcSAsim Jamshed 61476404edcSAsim Jamshed /* add monitor socket to the monitor list */ 61576404edcSAsim Jamshed TAILQ_INSERT_TAIL(&mtcp->monitors, monitor, link); 61676404edcSAsim Jamshed 61776404edcSAsim Jamshed mtcp->msmap[sockid].monitor_listener = monitor; 61876404edcSAsim Jamshed 61976404edcSAsim Jamshed return 0; 62076404edcSAsim Jamshed } 62176404edcSAsim Jamshed /*----------------------------------------------------------------------------*/ 62276404edcSAsim Jamshed int 62376404edcSAsim Jamshed mtcp_socket(mctx_t mctx, int domain, int type, int protocol) 62476404edcSAsim Jamshed { 62576404edcSAsim Jamshed mtcp_manager_t mtcp; 62676404edcSAsim Jamshed socket_map_t socket; 62776404edcSAsim Jamshed 62876404edcSAsim Jamshed mtcp = GetMTCPManager(mctx); 62976404edcSAsim Jamshed if (!mtcp) { 63076404edcSAsim Jamshed errno = EACCES; 63176404edcSAsim Jamshed return -1; 63276404edcSAsim Jamshed } 63376404edcSAsim Jamshed 63476404edcSAsim Jamshed if (domain != AF_INET) { 63576404edcSAsim Jamshed errno = EAFNOSUPPORT; 63676404edcSAsim Jamshed return -1; 63776404edcSAsim Jamshed } 63876404edcSAsim Jamshed 639dcdbbb98SAsim Jamshed if (type == (int)SOCK_STREAM) { 64076404edcSAsim Jamshed type = MOS_SOCK_STREAM; 64176404edcSAsim Jamshed } else if (type == MOS_SOCK_MONITOR_STREAM || 64276404edcSAsim Jamshed type == MOS_SOCK_MONITOR_RAW) { 64376404edcSAsim Jamshed /* do nothing for the time being */ 64476404edcSAsim Jamshed } else { 64576404edcSAsim Jamshed /* Not supported type */ 64676404edcSAsim Jamshed errno = EINVAL; 64776404edcSAsim Jamshed return -1; 64876404edcSAsim Jamshed } 64976404edcSAsim Jamshed 65076404edcSAsim Jamshed socket = AllocateSocket(mctx, type); 65176404edcSAsim Jamshed if (!socket) { 65276404edcSAsim Jamshed errno = ENFILE; 65376404edcSAsim Jamshed return -1; 65476404edcSAsim Jamshed } 65576404edcSAsim Jamshed 65676404edcSAsim Jamshed if (type == MOS_SOCK_MONITOR_STREAM || 65776404edcSAsim Jamshed type == MOS_SOCK_MONITOR_RAW) { 65876404edcSAsim Jamshed mtcp_manager_t mtcp = GetMTCPManager(mctx); 65976404edcSAsim Jamshed if (!mtcp) { 66076404edcSAsim Jamshed errno = EACCES; 66176404edcSAsim Jamshed return -1; 66276404edcSAsim Jamshed } 66376404edcSAsim Jamshed mtcp_monitor(mctx, socket); 66476404edcSAsim Jamshed #ifdef NEWEV 66576404edcSAsim Jamshed socket->monitor_listener->stree_dontcare = NULL; 66676404edcSAsim Jamshed socket->monitor_listener->stree_pre_rcv = NULL; 66776404edcSAsim Jamshed socket->monitor_listener->stree_post_snd = NULL; 66876404edcSAsim Jamshed #else 66976404edcSAsim Jamshed InitEvB(mtcp, &socket->monitor_listener->dontcare_evb); 67076404edcSAsim Jamshed InitEvB(mtcp, &socket->monitor_listener->pre_tcp_evb); 67176404edcSAsim Jamshed InitEvB(mtcp, &socket->monitor_listener->post_tcp_evb); 67276404edcSAsim Jamshed #endif 67376404edcSAsim Jamshed } 67476404edcSAsim Jamshed 67576404edcSAsim Jamshed return socket->id; 67676404edcSAsim Jamshed } 67776404edcSAsim Jamshed /*----------------------------------------------------------------------------*/ 67876404edcSAsim Jamshed int 67976404edcSAsim Jamshed mtcp_bind(mctx_t mctx, int sockid, 68076404edcSAsim Jamshed const struct sockaddr *addr, socklen_t addrlen) 68176404edcSAsim Jamshed { 68276404edcSAsim Jamshed mtcp_manager_t mtcp; 68376404edcSAsim Jamshed struct sockaddr_in *addr_in; 68476404edcSAsim Jamshed 68576404edcSAsim Jamshed mtcp = GetMTCPManager(mctx); 68676404edcSAsim Jamshed if (!mtcp) { 68776404edcSAsim Jamshed errno = EACCES; 68876404edcSAsim Jamshed return -1; 68976404edcSAsim Jamshed } 69076404edcSAsim Jamshed 69176404edcSAsim Jamshed if (sockid < 0 || sockid >= g_config.mos->max_concurrency) { 69276404edcSAsim Jamshed TRACE_API("Socket id %d out of range.\n", sockid); 69376404edcSAsim Jamshed errno = EBADF; 69476404edcSAsim Jamshed return -1; 69576404edcSAsim Jamshed } 69676404edcSAsim Jamshed 69776404edcSAsim Jamshed if (mtcp->smap[sockid].socktype == MOS_SOCK_UNUSED) { 69876404edcSAsim Jamshed TRACE_API("Invalid socket id: %d\n", sockid); 69976404edcSAsim Jamshed errno = EBADF; 70076404edcSAsim Jamshed return -1; 70176404edcSAsim Jamshed } 70276404edcSAsim Jamshed 70376404edcSAsim Jamshed if (mtcp->smap[sockid].socktype != MOS_SOCK_STREAM && 70476404edcSAsim Jamshed mtcp->smap[sockid].socktype != MOS_SOCK_STREAM_LISTEN) { 70576404edcSAsim Jamshed TRACE_API("Not a stream socket id: %d\n", sockid); 70676404edcSAsim Jamshed errno = ENOTSOCK; 70776404edcSAsim Jamshed return -1; 70876404edcSAsim Jamshed } 70976404edcSAsim Jamshed 71076404edcSAsim Jamshed if (!addr) { 71176404edcSAsim Jamshed TRACE_API("Socket %d: empty address!\n", sockid); 71276404edcSAsim Jamshed errno = EINVAL; 71376404edcSAsim Jamshed return -1; 71476404edcSAsim Jamshed } 71576404edcSAsim Jamshed 71676404edcSAsim Jamshed if (mtcp->smap[sockid].opts & MTCP_ADDR_BIND) { 71776404edcSAsim Jamshed TRACE_API("Socket %d: adress already bind for this socket.\n", sockid); 71876404edcSAsim Jamshed errno = EINVAL; 71976404edcSAsim Jamshed return -1; 72076404edcSAsim Jamshed } 72176404edcSAsim Jamshed 72276404edcSAsim Jamshed /* we only allow bind() for AF_INET address */ 72376404edcSAsim Jamshed if (addr->sa_family != AF_INET || addrlen < sizeof(struct sockaddr_in)) { 72476404edcSAsim Jamshed TRACE_API("Socket %d: invalid argument!\n", sockid); 72576404edcSAsim Jamshed errno = EINVAL; 72676404edcSAsim Jamshed return -1; 72776404edcSAsim Jamshed } 72876404edcSAsim Jamshed 72976404edcSAsim Jamshed if (mtcp->listener) { 73076404edcSAsim Jamshed TRACE_API("Address already bound!\n"); 73176404edcSAsim Jamshed errno = EINVAL; 73276404edcSAsim Jamshed return -1; 73376404edcSAsim Jamshed } 73476404edcSAsim Jamshed addr_in = (struct sockaddr_in *)addr; 73576404edcSAsim Jamshed mtcp->smap[sockid].saddr = *addr_in; 73676404edcSAsim Jamshed mtcp->smap[sockid].opts |= MTCP_ADDR_BIND; 73776404edcSAsim Jamshed 73876404edcSAsim Jamshed return 0; 73976404edcSAsim Jamshed } 74076404edcSAsim Jamshed /*----------------------------------------------------------------------------*/ 74176404edcSAsim Jamshed int 74276404edcSAsim Jamshed mtcp_listen(mctx_t mctx, int sockid, int backlog) 74376404edcSAsim Jamshed { 74476404edcSAsim Jamshed mtcp_manager_t mtcp; 74576404edcSAsim Jamshed struct tcp_listener *listener; 74676404edcSAsim Jamshed 74776404edcSAsim Jamshed mtcp = GetMTCPManager(mctx); 74876404edcSAsim Jamshed if (!mtcp) { 74976404edcSAsim Jamshed errno = EACCES; 75076404edcSAsim Jamshed return -1; 75176404edcSAsim Jamshed } 75276404edcSAsim Jamshed 75376404edcSAsim Jamshed if (sockid < 0 || sockid >= g_config.mos->max_concurrency) { 75476404edcSAsim Jamshed TRACE_API("Socket id %d out of range.\n", sockid); 75576404edcSAsim Jamshed errno = EBADF; 75676404edcSAsim Jamshed return -1; 75776404edcSAsim Jamshed } 75876404edcSAsim Jamshed 75976404edcSAsim Jamshed if (mtcp->smap[sockid].socktype == MOS_SOCK_UNUSED) { 76076404edcSAsim Jamshed TRACE_API("Invalid socket id: %d\n", sockid); 76176404edcSAsim Jamshed errno = EBADF; 76276404edcSAsim Jamshed return -1; 76376404edcSAsim Jamshed } 76476404edcSAsim Jamshed 76576404edcSAsim Jamshed if (mtcp->smap[sockid].socktype == MOS_SOCK_STREAM) { 76676404edcSAsim Jamshed mtcp->smap[sockid].socktype = MOS_SOCK_STREAM_LISTEN; 76776404edcSAsim Jamshed } 76876404edcSAsim Jamshed 76976404edcSAsim Jamshed if (mtcp->smap[sockid].socktype != MOS_SOCK_STREAM_LISTEN) { 77076404edcSAsim Jamshed TRACE_API("Not a listening socket. id: %d\n", sockid); 77176404edcSAsim Jamshed errno = ENOTSOCK; 77276404edcSAsim Jamshed return -1; 77376404edcSAsim Jamshed } 77476404edcSAsim Jamshed 77576404edcSAsim Jamshed if (backlog <= 0 || backlog > g_config.mos->max_concurrency) { 77676404edcSAsim Jamshed errno = EINVAL; 77776404edcSAsim Jamshed return -1; 77876404edcSAsim Jamshed } 77976404edcSAsim Jamshed 78076404edcSAsim Jamshed listener = (struct tcp_listener *)calloc(1, sizeof(struct tcp_listener)); 78176404edcSAsim Jamshed if (!listener) { 78276404edcSAsim Jamshed /* errno set from the malloc() */ 78376404edcSAsim Jamshed errno = ENOMEM; 78476404edcSAsim Jamshed return -1; 78576404edcSAsim Jamshed } 78676404edcSAsim Jamshed 78776404edcSAsim Jamshed listener->sockid = sockid; 78876404edcSAsim Jamshed listener->backlog = backlog; 78976404edcSAsim Jamshed listener->socket = &mtcp->smap[sockid]; 79076404edcSAsim Jamshed 79176404edcSAsim Jamshed if (pthread_cond_init(&listener->accept_cond, NULL)) { 79276404edcSAsim Jamshed perror("pthread_cond_init of ctx->accept_cond\n"); 79376404edcSAsim Jamshed /* errno set by pthread_cond_init() */ 794c6a5549bSAsim Jamshed free(listener); 79576404edcSAsim Jamshed return -1; 79676404edcSAsim Jamshed } 79776404edcSAsim Jamshed if (pthread_mutex_init(&listener->accept_lock, NULL)) { 79876404edcSAsim Jamshed perror("pthread_mutex_init of ctx->accept_lock\n"); 79976404edcSAsim Jamshed /* errno set by pthread_mutex_init() */ 800c6a5549bSAsim Jamshed free(listener); 80176404edcSAsim Jamshed return -1; 80276404edcSAsim Jamshed } 80376404edcSAsim Jamshed 80476404edcSAsim Jamshed listener->acceptq = CreateStreamQueue(backlog); 80576404edcSAsim Jamshed if (!listener->acceptq) { 8068a941c7eSAsim Jamshed free(listener); 80776404edcSAsim Jamshed errno = ENOMEM; 80876404edcSAsim Jamshed return -1; 80976404edcSAsim Jamshed } 81076404edcSAsim Jamshed 81176404edcSAsim Jamshed mtcp->smap[sockid].listener = listener; 81276404edcSAsim Jamshed mtcp->listener = listener; 81376404edcSAsim Jamshed 81476404edcSAsim Jamshed return 0; 81576404edcSAsim Jamshed } 81676404edcSAsim Jamshed /*----------------------------------------------------------------------------*/ 81776404edcSAsim Jamshed int 81876404edcSAsim Jamshed mtcp_accept(mctx_t mctx, int sockid, struct sockaddr *addr, socklen_t *addrlen) 81976404edcSAsim Jamshed { 82076404edcSAsim Jamshed mtcp_manager_t mtcp; 82176404edcSAsim Jamshed struct tcp_listener *listener; 82276404edcSAsim Jamshed socket_map_t socket; 82376404edcSAsim Jamshed tcp_stream *accepted = NULL; 82476404edcSAsim Jamshed 82576404edcSAsim Jamshed mtcp = GetMTCPManager(mctx); 82676404edcSAsim Jamshed if (!mtcp) { 82776404edcSAsim Jamshed errno = EACCES; 82876404edcSAsim Jamshed return -1; 82976404edcSAsim Jamshed } 83076404edcSAsim Jamshed 83176404edcSAsim Jamshed if (sockid < 0 || sockid >= g_config.mos->max_concurrency) { 83276404edcSAsim Jamshed TRACE_API("Socket id %d out of range.\n", sockid); 83376404edcSAsim Jamshed errno = EBADF; 83476404edcSAsim Jamshed return -1; 83576404edcSAsim Jamshed } 83676404edcSAsim Jamshed 83776404edcSAsim Jamshed /* requires listening socket */ 83876404edcSAsim Jamshed if (mtcp->smap[sockid].socktype != MOS_SOCK_STREAM_LISTEN) { 83976404edcSAsim Jamshed errno = EINVAL; 84076404edcSAsim Jamshed return -1; 84176404edcSAsim Jamshed } 84276404edcSAsim Jamshed 84376404edcSAsim Jamshed listener = mtcp->smap[sockid].listener; 84476404edcSAsim Jamshed 84576404edcSAsim Jamshed /* dequeue from the acceptq without lock first */ 84676404edcSAsim Jamshed /* if nothing there, acquire lock and cond_wait */ 84776404edcSAsim Jamshed accepted = StreamDequeue(listener->acceptq); 84876404edcSAsim Jamshed if (!accepted) { 84976404edcSAsim Jamshed if (listener->socket->opts & MTCP_NONBLOCK) { 85076404edcSAsim Jamshed errno = EAGAIN; 85176404edcSAsim Jamshed return -1; 85276404edcSAsim Jamshed 85376404edcSAsim Jamshed } else { 85476404edcSAsim Jamshed pthread_mutex_lock(&listener->accept_lock); 85576404edcSAsim Jamshed while ((accepted = StreamDequeue(listener->acceptq)) == NULL) { 85676404edcSAsim Jamshed pthread_cond_wait(&listener->accept_cond, &listener->accept_lock); 85776404edcSAsim Jamshed 85876404edcSAsim Jamshed if (mtcp->ctx->done || mtcp->ctx->exit) { 85976404edcSAsim Jamshed pthread_mutex_unlock(&listener->accept_lock); 86076404edcSAsim Jamshed errno = EINTR; 86176404edcSAsim Jamshed return -1; 86276404edcSAsim Jamshed } 86376404edcSAsim Jamshed } 86476404edcSAsim Jamshed pthread_mutex_unlock(&listener->accept_lock); 86576404edcSAsim Jamshed } 86676404edcSAsim Jamshed } 86776404edcSAsim Jamshed 86876404edcSAsim Jamshed if (!accepted) { 86976404edcSAsim Jamshed TRACE_ERROR("[NEVER HAPPEN] Empty accept queue!\n"); 87076404edcSAsim Jamshed } 87176404edcSAsim Jamshed 87276404edcSAsim Jamshed if (!accepted->socket) { 87376404edcSAsim Jamshed socket = AllocateSocket(mctx, MOS_SOCK_STREAM); 87476404edcSAsim Jamshed if (!socket) { 87576404edcSAsim Jamshed TRACE_ERROR("Failed to create new socket!\n"); 87676404edcSAsim Jamshed /* TODO: destroy the stream */ 87776404edcSAsim Jamshed errno = ENFILE; 87876404edcSAsim Jamshed return -1; 87976404edcSAsim Jamshed } 88076404edcSAsim Jamshed socket->stream = accepted; 88176404edcSAsim Jamshed accepted->socket = socket; 882a5e1a556SAsim Jamshed 883a5e1a556SAsim Jamshed /* set socket addr parameters */ 884a5e1a556SAsim Jamshed socket->saddr.sin_family = AF_INET; 885a5e1a556SAsim Jamshed socket->saddr.sin_port = accepted->dport; 886a5e1a556SAsim Jamshed socket->saddr.sin_addr.s_addr = accepted->daddr; 887a5e1a556SAsim Jamshed 88876404edcSAsim Jamshed /* if monitor is enabled, complete the socket assignment */ 88976404edcSAsim Jamshed if (socket->stream->pair_stream != NULL) 89076404edcSAsim Jamshed socket->stream->pair_stream->socket = socket; 89176404edcSAsim Jamshed } 89276404edcSAsim Jamshed 893a5e1a556SAsim Jamshed if (!(listener->socket->epoll & MOS_EPOLLET) && 894a5e1a556SAsim Jamshed !StreamQueueIsEmpty(listener->acceptq)) 895a5e1a556SAsim Jamshed AddEpollEvent(mtcp->ep, 896a5e1a556SAsim Jamshed USR_SHADOW_EVENT_QUEUE, 897a5e1a556SAsim Jamshed listener->socket, MOS_EPOLLIN); 898a5e1a556SAsim Jamshed 89976404edcSAsim Jamshed TRACE_API("Stream %d accepted.\n", accepted->id); 90076404edcSAsim Jamshed 90176404edcSAsim Jamshed if (addr && addrlen) { 90276404edcSAsim Jamshed struct sockaddr_in *addr_in = (struct sockaddr_in *)addr; 90376404edcSAsim Jamshed addr_in->sin_family = AF_INET; 90476404edcSAsim Jamshed addr_in->sin_port = accepted->dport; 90576404edcSAsim Jamshed addr_in->sin_addr.s_addr = accepted->daddr; 90676404edcSAsim Jamshed *addrlen = sizeof(struct sockaddr_in); 90776404edcSAsim Jamshed } 90876404edcSAsim Jamshed 90976404edcSAsim Jamshed return accepted->socket->id; 91076404edcSAsim Jamshed } 91176404edcSAsim Jamshed /*----------------------------------------------------------------------------*/ 91276404edcSAsim Jamshed int 91376404edcSAsim Jamshed mtcp_init_rss(mctx_t mctx, in_addr_t saddr_base, int num_addr, 91476404edcSAsim Jamshed in_addr_t daddr, in_addr_t dport) 91576404edcSAsim Jamshed { 91676404edcSAsim Jamshed mtcp_manager_t mtcp; 91776404edcSAsim Jamshed addr_pool_t ap; 91876404edcSAsim Jamshed 91976404edcSAsim Jamshed mtcp = GetMTCPManager(mctx); 92076404edcSAsim Jamshed if (!mtcp) { 92176404edcSAsim Jamshed errno = EACCES; 92276404edcSAsim Jamshed return -1; 92376404edcSAsim Jamshed } 92476404edcSAsim Jamshed 92576404edcSAsim Jamshed if (saddr_base == INADDR_ANY) { 92676404edcSAsim Jamshed int nif_out; 92776404edcSAsim Jamshed 92876404edcSAsim Jamshed /* for the INADDR_ANY, find the output interface for the destination 92976404edcSAsim Jamshed and set the saddr_base as the ip address of the output interface */ 93076404edcSAsim Jamshed nif_out = GetOutputInterface(daddr); 9318a941c7eSAsim Jamshed if (nif_out < 0) { 9328a941c7eSAsim Jamshed TRACE_DBG("Could not determine nif idx!\n"); 9338a941c7eSAsim Jamshed errno = EINVAL; 9348a941c7eSAsim Jamshed return -1; 9358a941c7eSAsim Jamshed } 93676404edcSAsim Jamshed saddr_base = g_config.mos->netdev_table->ent[nif_out]->ip_addr; 93776404edcSAsim Jamshed } 93876404edcSAsim Jamshed 93976404edcSAsim Jamshed ap = CreateAddressPoolPerCore(mctx->cpu, num_cpus, 94076404edcSAsim Jamshed saddr_base, num_addr, daddr, dport); 94176404edcSAsim Jamshed if (!ap) { 94276404edcSAsim Jamshed errno = ENOMEM; 94376404edcSAsim Jamshed return -1; 94476404edcSAsim Jamshed } 94576404edcSAsim Jamshed 94676404edcSAsim Jamshed mtcp->ap = ap; 94776404edcSAsim Jamshed 94876404edcSAsim Jamshed return 0; 94976404edcSAsim Jamshed } 95076404edcSAsim Jamshed /*----------------------------------------------------------------------------*/ 95176404edcSAsim Jamshed int 95276404edcSAsim Jamshed eval_bpf_5tuple(struct sfbpf_program fcode, 95376404edcSAsim Jamshed in_addr_t saddr, in_port_t sport, 95476404edcSAsim Jamshed in_addr_t daddr, in_port_t dport) { 95576404edcSAsim Jamshed uint8_t buf[TOTAL_TCP_HEADER_LEN]; 95676404edcSAsim Jamshed struct ethhdr *ethh; 95776404edcSAsim Jamshed struct iphdr *iph; 95876404edcSAsim Jamshed struct tcphdr *tcph; 95976404edcSAsim Jamshed 96076404edcSAsim Jamshed ethh = (struct ethhdr *)buf; 96176404edcSAsim Jamshed ethh->h_proto = htons(ETH_P_IP); 96276404edcSAsim Jamshed iph = (struct iphdr *)(ethh + 1); 96376404edcSAsim Jamshed iph->ihl = IP_HEADER_LEN >> 2; 96476404edcSAsim Jamshed iph->version = 4; 96576404edcSAsim Jamshed iph->tos = 0; 96676404edcSAsim Jamshed iph->tot_len = htons(IP_HEADER_LEN + TCP_HEADER_LEN); 96776404edcSAsim Jamshed iph->id = htons(0); 96876404edcSAsim Jamshed iph->protocol = IPPROTO_TCP; 96976404edcSAsim Jamshed iph->saddr = saddr; 97076404edcSAsim Jamshed iph->daddr = daddr; 97176404edcSAsim Jamshed iph->check = 0; 97276404edcSAsim Jamshed tcph = (struct tcphdr *)(iph + 1); 97376404edcSAsim Jamshed tcph->source = sport; 97476404edcSAsim Jamshed tcph->dest = dport; 97576404edcSAsim Jamshed 97676404edcSAsim Jamshed return EVAL_BPFFILTER(fcode, (uint8_t *)iph - sizeof(struct ethhdr), 97776404edcSAsim Jamshed TOTAL_TCP_HEADER_LEN); 97876404edcSAsim Jamshed } 97976404edcSAsim Jamshed /*----------------------------------------------------------------------------*/ 98076404edcSAsim Jamshed int 98176404edcSAsim Jamshed mtcp_connect(mctx_t mctx, int sockid, 98276404edcSAsim Jamshed const struct sockaddr *addr, socklen_t addrlen) 98376404edcSAsim Jamshed { 98476404edcSAsim Jamshed mtcp_manager_t mtcp; 98576404edcSAsim Jamshed socket_map_t socket; 98676404edcSAsim Jamshed tcp_stream *cur_stream; 98776404edcSAsim Jamshed struct sockaddr_in *addr_in; 98876404edcSAsim Jamshed in_addr_t dip; 98976404edcSAsim Jamshed in_port_t dport; 99076404edcSAsim Jamshed int is_dyn_bound = FALSE; 9918a941c7eSAsim Jamshed int ret, nif; 99276404edcSAsim Jamshed int cnt_match = 0; 99376404edcSAsim Jamshed struct mon_listener *walk; 99476404edcSAsim Jamshed struct sfbpf_program fcode; 99576404edcSAsim Jamshed 99676404edcSAsim Jamshed cur_stream = NULL; 99776404edcSAsim Jamshed mtcp = GetMTCPManager(mctx); 99876404edcSAsim Jamshed if (!mtcp) { 99976404edcSAsim Jamshed errno = EACCES; 100076404edcSAsim Jamshed return -1; 100176404edcSAsim Jamshed } 100276404edcSAsim Jamshed 100376404edcSAsim Jamshed if (sockid < 0 || sockid >= g_config.mos->max_concurrency) { 100476404edcSAsim Jamshed TRACE_API("Socket id %d out of range.\n", sockid); 100576404edcSAsim Jamshed errno = EBADF; 100676404edcSAsim Jamshed return -1; 100776404edcSAsim Jamshed } 100876404edcSAsim Jamshed 100976404edcSAsim Jamshed if (mtcp->smap[sockid].socktype == MOS_SOCK_UNUSED) { 101076404edcSAsim Jamshed TRACE_API("Invalid socket id: %d\n", sockid); 101176404edcSAsim Jamshed errno = EBADF; 101276404edcSAsim Jamshed return -1; 101376404edcSAsim Jamshed } 101476404edcSAsim Jamshed 101576404edcSAsim Jamshed if (mtcp->smap[sockid].socktype != MOS_SOCK_STREAM) { 101676404edcSAsim Jamshed TRACE_API("Not an end socket. id: %d\n", sockid); 101776404edcSAsim Jamshed errno = ENOTSOCK; 101876404edcSAsim Jamshed return -1; 101976404edcSAsim Jamshed } 102076404edcSAsim Jamshed 102176404edcSAsim Jamshed if (!addr) { 102276404edcSAsim Jamshed TRACE_API("Socket %d: empty address!\n", sockid); 102376404edcSAsim Jamshed errno = EFAULT; 102476404edcSAsim Jamshed return -1; 102576404edcSAsim Jamshed } 102676404edcSAsim Jamshed 102776404edcSAsim Jamshed /* we only allow bind() for AF_INET address */ 102876404edcSAsim Jamshed if (addr->sa_family != AF_INET || addrlen < sizeof(struct sockaddr_in)) { 102976404edcSAsim Jamshed TRACE_API("Socket %d: invalid argument!\n", sockid); 103076404edcSAsim Jamshed errno = EAFNOSUPPORT; 103176404edcSAsim Jamshed return -1; 103276404edcSAsim Jamshed } 103376404edcSAsim Jamshed 103476404edcSAsim Jamshed socket = &mtcp->smap[sockid]; 103576404edcSAsim Jamshed if (socket->stream) { 103676404edcSAsim Jamshed TRACE_API("Socket %d: stream already exist!\n", sockid); 103776404edcSAsim Jamshed if (socket->stream->state >= TCP_ST_ESTABLISHED) { 103876404edcSAsim Jamshed errno = EISCONN; 103976404edcSAsim Jamshed } else { 104076404edcSAsim Jamshed errno = EALREADY; 104176404edcSAsim Jamshed } 104276404edcSAsim Jamshed return -1; 104376404edcSAsim Jamshed } 104476404edcSAsim Jamshed 104576404edcSAsim Jamshed addr_in = (struct sockaddr_in *)addr; 104676404edcSAsim Jamshed dip = addr_in->sin_addr.s_addr; 104776404edcSAsim Jamshed dport = addr_in->sin_port; 104876404edcSAsim Jamshed 104976404edcSAsim Jamshed /* address binding */ 105076404edcSAsim Jamshed if (socket->opts & MTCP_ADDR_BIND && 105176404edcSAsim Jamshed socket->saddr.sin_port != INPORT_ANY && 105276404edcSAsim Jamshed socket->saddr.sin_addr.s_addr != INADDR_ANY) { 105376404edcSAsim Jamshed int rss_core; 105476404edcSAsim Jamshed 105576404edcSAsim Jamshed rss_core = GetRSSCPUCore(socket->saddr.sin_addr.s_addr, dip, 105676404edcSAsim Jamshed socket->saddr.sin_port, dport, num_queues); 105776404edcSAsim Jamshed 105876404edcSAsim Jamshed if (rss_core != mctx->cpu) { 105976404edcSAsim Jamshed errno = EINVAL; 106076404edcSAsim Jamshed return -1; 106176404edcSAsim Jamshed } 106276404edcSAsim Jamshed } else { 106376404edcSAsim Jamshed if (mtcp->ap) { 106476404edcSAsim Jamshed ret = FetchAddress(mtcp->ap, 106576404edcSAsim Jamshed mctx->cpu, num_queues, addr_in, &socket->saddr); 106676404edcSAsim Jamshed } else { 10678a941c7eSAsim Jamshed nif = GetOutputInterface(dip); 10688a941c7eSAsim Jamshed if (nif < 0) { 10698a941c7eSAsim Jamshed errno = EINVAL; 10708a941c7eSAsim Jamshed return -1; 10718a941c7eSAsim Jamshed } 10728a941c7eSAsim Jamshed ret = FetchAddress(ap[nif], 107376404edcSAsim Jamshed mctx->cpu, num_queues, addr_in, &socket->saddr); 107476404edcSAsim Jamshed } 107576404edcSAsim Jamshed if (ret < 0) { 107676404edcSAsim Jamshed errno = EAGAIN; 107776404edcSAsim Jamshed return -1; 107876404edcSAsim Jamshed } 107976404edcSAsim Jamshed socket->opts |= MTCP_ADDR_BIND; 108076404edcSAsim Jamshed is_dyn_bound = TRUE; 108176404edcSAsim Jamshed } 108276404edcSAsim Jamshed 108376404edcSAsim Jamshed cnt_match = 0; 108476404edcSAsim Jamshed if (mtcp->num_msp > 0) { 108576404edcSAsim Jamshed TAILQ_FOREACH(walk, &mtcp->monitors, link) { 108676404edcSAsim Jamshed fcode = walk->stream_syn_fcode; 108776404edcSAsim Jamshed if (!(ISSET_BPFFILTER(fcode) && 108876404edcSAsim Jamshed eval_bpf_5tuple(fcode, socket->saddr.sin_addr.s_addr, 108976404edcSAsim Jamshed socket->saddr.sin_port, 109076404edcSAsim Jamshed dip, dport) == 0)) { 109176404edcSAsim Jamshed walk->is_stream_syn_filter_hit = 1; // set the 'filter hit' flag to 1 109276404edcSAsim Jamshed cnt_match++; 109376404edcSAsim Jamshed } 109476404edcSAsim Jamshed } 109576404edcSAsim Jamshed } 109676404edcSAsim Jamshed 109776404edcSAsim Jamshed if (mtcp->num_msp > 0 && cnt_match > 0) { 109876404edcSAsim Jamshed /* 150820 dhkim: XXX: embedded mode is not verified */ 109976404edcSAsim Jamshed #if 1 110076404edcSAsim Jamshed cur_stream = CreateClientTCPStream(mtcp, socket, 110176404edcSAsim Jamshed STREAM_TYPE(MOS_SOCK_STREAM) | 110276404edcSAsim Jamshed STREAM_TYPE(MOS_SOCK_MONITOR_STREAM_ACTIVE), 110376404edcSAsim Jamshed socket->saddr.sin_addr.s_addr, 110476404edcSAsim Jamshed socket->saddr.sin_port, dip, dport, NULL); 110576404edcSAsim Jamshed #else 110676404edcSAsim Jamshed cur_stream = CreateDualTCPStream(mtcp, socket, 110776404edcSAsim Jamshed STREAM_TYPE(MOS_SOCK_STREAM) | 110876404edcSAsim Jamshed STREAM_TYPE(MOS_SOCK_MONITOR_STREAM_ACTIVE), 110976404edcSAsim Jamshed socket->saddr.sin_addr.s_addr, 111076404edcSAsim Jamshed socket->saddr.sin_port, dip, dport, NULL); 111176404edcSAsim Jamshed #endif 111276404edcSAsim Jamshed } 111376404edcSAsim Jamshed else 111476404edcSAsim Jamshed cur_stream = CreateTCPStream(mtcp, socket, STREAM_TYPE(MOS_SOCK_STREAM), 111576404edcSAsim Jamshed socket->saddr.sin_addr.s_addr, 111676404edcSAsim Jamshed socket->saddr.sin_port, dip, dport, NULL); 111776404edcSAsim Jamshed if (!cur_stream) { 111876404edcSAsim Jamshed TRACE_ERROR("Socket %d: failed to create tcp_stream!\n", sockid); 111976404edcSAsim Jamshed errno = ENOMEM; 112076404edcSAsim Jamshed return -1; 112176404edcSAsim Jamshed } 112276404edcSAsim Jamshed 112376404edcSAsim Jamshed if (is_dyn_bound) 112476404edcSAsim Jamshed cur_stream->is_bound_addr = TRUE; 112576404edcSAsim Jamshed cur_stream->sndvar->cwnd = 1; 112676404edcSAsim Jamshed cur_stream->sndvar->ssthresh = cur_stream->sndvar->mss * 10; 112776404edcSAsim Jamshed cur_stream->side = MOS_SIDE_CLI; 112876404edcSAsim Jamshed /* if monitor is enabled, update the pair stream side as well */ 112976404edcSAsim Jamshed if (cur_stream->pair_stream) { 113076404edcSAsim Jamshed cur_stream->pair_stream->side = MOS_SIDE_SVR; 113176404edcSAsim Jamshed /* 113276404edcSAsim Jamshed * if buffer management is off, then disable 113376404edcSAsim Jamshed * monitoring tcp ring of server... 113476404edcSAsim Jamshed * if there is even a single monitor asking for 113576404edcSAsim Jamshed * buffer management, enable it (that's why the 113676404edcSAsim Jamshed * need for the loop) 113776404edcSAsim Jamshed */ 113876404edcSAsim Jamshed cur_stream->pair_stream->buffer_mgmt = BUFMGMT_OFF; 113976404edcSAsim Jamshed struct socket_map *walk; 114076404edcSAsim Jamshed SOCKQ_FOREACH_START(walk, &cur_stream->msocks) { 114176404edcSAsim Jamshed uint8_t bm = walk->monitor_stream->monitor_listener->server_buf_mgmt; 114276404edcSAsim Jamshed if (bm > cur_stream->pair_stream->buffer_mgmt) { 114376404edcSAsim Jamshed cur_stream->pair_stream->buffer_mgmt = bm; 114476404edcSAsim Jamshed break; 114576404edcSAsim Jamshed } 114676404edcSAsim Jamshed } SOCKQ_FOREACH_END; 114776404edcSAsim Jamshed } 114876404edcSAsim Jamshed 114976404edcSAsim Jamshed cur_stream->state = TCP_ST_SYN_SENT; 115076404edcSAsim Jamshed cur_stream->cb_events |= MOS_ON_TCP_STATE_CHANGE; 115176404edcSAsim Jamshed 115276404edcSAsim Jamshed TRACE_STATE("Stream %d: TCP_ST_SYN_SENT\n", cur_stream->id); 115376404edcSAsim Jamshed 115476404edcSAsim Jamshed SQ_LOCK(&mtcp->ctx->connect_lock); 115576404edcSAsim Jamshed ret = StreamEnqueue(mtcp->connectq, cur_stream); 115676404edcSAsim Jamshed SQ_UNLOCK(&mtcp->ctx->connect_lock); 115776404edcSAsim Jamshed mtcp->wakeup_flag = TRUE; 115876404edcSAsim Jamshed if (ret < 0) { 115976404edcSAsim Jamshed TRACE_ERROR("Socket %d: failed to enqueue to conenct queue!\n", sockid); 116076404edcSAsim Jamshed SQ_LOCK(&mtcp->ctx->destroyq_lock); 116176404edcSAsim Jamshed StreamEnqueue(mtcp->destroyq, cur_stream); 116276404edcSAsim Jamshed SQ_UNLOCK(&mtcp->ctx->destroyq_lock); 116376404edcSAsim Jamshed errno = EAGAIN; 116476404edcSAsim Jamshed return -1; 116576404edcSAsim Jamshed } 116676404edcSAsim Jamshed 116776404edcSAsim Jamshed /* if nonblocking socket, return EINPROGRESS */ 116876404edcSAsim Jamshed if (socket->opts & MTCP_NONBLOCK) { 116976404edcSAsim Jamshed errno = EINPROGRESS; 117076404edcSAsim Jamshed return -1; 117176404edcSAsim Jamshed 117276404edcSAsim Jamshed } else { 117376404edcSAsim Jamshed while (1) { 117476404edcSAsim Jamshed if (!cur_stream) { 117576404edcSAsim Jamshed TRACE_ERROR("STREAM DESTROYED\n"); 117676404edcSAsim Jamshed errno = ETIMEDOUT; 117776404edcSAsim Jamshed return -1; 117876404edcSAsim Jamshed } 117976404edcSAsim Jamshed if (cur_stream->state > TCP_ST_ESTABLISHED) { 118076404edcSAsim Jamshed TRACE_ERROR("Socket %d: weird state %s\n", 118176404edcSAsim Jamshed sockid, TCPStateToString(cur_stream)); 118276404edcSAsim Jamshed // TODO: how to handle this? 118376404edcSAsim Jamshed errno = ENOSYS; 118476404edcSAsim Jamshed return -1; 118576404edcSAsim Jamshed } 118676404edcSAsim Jamshed 118776404edcSAsim Jamshed if (cur_stream->state == TCP_ST_ESTABLISHED) { 118876404edcSAsim Jamshed break; 118976404edcSAsim Jamshed } 119076404edcSAsim Jamshed usleep(1000); 119176404edcSAsim Jamshed } 119276404edcSAsim Jamshed } 119376404edcSAsim Jamshed 119476404edcSAsim Jamshed return 0; 119576404edcSAsim Jamshed } 119676404edcSAsim Jamshed /*----------------------------------------------------------------------------*/ 119776404edcSAsim Jamshed static inline int 119876404edcSAsim Jamshed CloseStreamSocket(mctx_t mctx, int sockid) 119976404edcSAsim Jamshed { 120076404edcSAsim Jamshed mtcp_manager_t mtcp; 120176404edcSAsim Jamshed tcp_stream *cur_stream; 120276404edcSAsim Jamshed int ret; 120376404edcSAsim Jamshed 120476404edcSAsim Jamshed mtcp = GetMTCPManager(mctx); 120576404edcSAsim Jamshed if (!mtcp) { 120676404edcSAsim Jamshed errno = EACCES; 120776404edcSAsim Jamshed return -1; 120876404edcSAsim Jamshed } 120976404edcSAsim Jamshed 121076404edcSAsim Jamshed cur_stream = mtcp->smap[sockid].stream; 121176404edcSAsim Jamshed if (!cur_stream) { 121276404edcSAsim Jamshed TRACE_API("Socket %d: stream does not exist.\n", sockid); 121376404edcSAsim Jamshed errno = ENOTCONN; 121476404edcSAsim Jamshed return -1; 121576404edcSAsim Jamshed } 121676404edcSAsim Jamshed 121776404edcSAsim Jamshed if (cur_stream->closed) { 121876404edcSAsim Jamshed TRACE_API("Socket %d (Stream %u): already closed stream\n", 121976404edcSAsim Jamshed sockid, cur_stream->id); 122076404edcSAsim Jamshed return 0; 122176404edcSAsim Jamshed } 122276404edcSAsim Jamshed cur_stream->closed = TRUE; 122376404edcSAsim Jamshed 122476404edcSAsim Jamshed TRACE_API("Stream %d: closing the stream.\n", cur_stream->id); 122576404edcSAsim Jamshed 122676404edcSAsim Jamshed /* 141029 dhkim: Check this! */ 122776404edcSAsim Jamshed cur_stream->socket = NULL; 122876404edcSAsim Jamshed 122976404edcSAsim Jamshed if (cur_stream->state == TCP_ST_CLOSED_RSVD) { 123076404edcSAsim Jamshed TRACE_API("Stream %d at TCP_ST_CLOSED_RSVD. destroying the stream.\n", 123176404edcSAsim Jamshed cur_stream->id); 123276404edcSAsim Jamshed SQ_LOCK(&mtcp->ctx->destroyq_lock); 123376404edcSAsim Jamshed StreamEnqueue(mtcp->destroyq, cur_stream); 123476404edcSAsim Jamshed mtcp->wakeup_flag = TRUE; 123576404edcSAsim Jamshed SQ_UNLOCK(&mtcp->ctx->destroyq_lock); 123676404edcSAsim Jamshed return 0; 123776404edcSAsim Jamshed 123876404edcSAsim Jamshed } else if (cur_stream->state == TCP_ST_SYN_SENT) { 123976404edcSAsim Jamshed #if 1 124076404edcSAsim Jamshed SQ_LOCK(&mtcp->ctx->destroyq_lock); 124176404edcSAsim Jamshed StreamEnqueue(mtcp->destroyq, cur_stream); 124276404edcSAsim Jamshed SQ_UNLOCK(&mtcp->ctx->destroyq_lock); 124376404edcSAsim Jamshed mtcp->wakeup_flag = TRUE; 124476404edcSAsim Jamshed #endif 124576404edcSAsim Jamshed return -1; 124676404edcSAsim Jamshed 124776404edcSAsim Jamshed } else if (cur_stream->state != TCP_ST_ESTABLISHED && 124876404edcSAsim Jamshed cur_stream->state != TCP_ST_CLOSE_WAIT) { 124976404edcSAsim Jamshed TRACE_API("Stream %d at state %s\n", 125076404edcSAsim Jamshed cur_stream->id, TCPStateToString(cur_stream)); 125176404edcSAsim Jamshed errno = EBADF; 125276404edcSAsim Jamshed return -1; 125376404edcSAsim Jamshed } 125476404edcSAsim Jamshed 125576404edcSAsim Jamshed SQ_LOCK(&mtcp->ctx->close_lock); 125676404edcSAsim Jamshed cur_stream->sndvar->on_closeq = TRUE; 125776404edcSAsim Jamshed ret = StreamEnqueue(mtcp->closeq, cur_stream); 125876404edcSAsim Jamshed mtcp->wakeup_flag = TRUE; 125976404edcSAsim Jamshed SQ_UNLOCK(&mtcp->ctx->close_lock); 126076404edcSAsim Jamshed 126176404edcSAsim Jamshed if (ret < 0) { 126276404edcSAsim Jamshed TRACE_ERROR("(NEVER HAPPEN) Failed to enqueue the stream to close.\n"); 126376404edcSAsim Jamshed errno = EAGAIN; 126476404edcSAsim Jamshed return -1; 126576404edcSAsim Jamshed } 126676404edcSAsim Jamshed 126776404edcSAsim Jamshed return 0; 126876404edcSAsim Jamshed } 126976404edcSAsim Jamshed /*----------------------------------------------------------------------------*/ 127076404edcSAsim Jamshed static inline int 127176404edcSAsim Jamshed CloseListeningSocket(mctx_t mctx, int sockid) 127276404edcSAsim Jamshed { 127376404edcSAsim Jamshed mtcp_manager_t mtcp; 127476404edcSAsim Jamshed struct tcp_listener *listener; 127576404edcSAsim Jamshed 127676404edcSAsim Jamshed mtcp = GetMTCPManager(mctx); 127776404edcSAsim Jamshed if (!mtcp) { 127876404edcSAsim Jamshed errno = EACCES; 127976404edcSAsim Jamshed return -1; 128076404edcSAsim Jamshed } 128176404edcSAsim Jamshed 128276404edcSAsim Jamshed listener = mtcp->smap[sockid].listener; 128376404edcSAsim Jamshed if (!listener) { 128476404edcSAsim Jamshed errno = EINVAL; 128576404edcSAsim Jamshed return -1; 128676404edcSAsim Jamshed } 128776404edcSAsim Jamshed 128876404edcSAsim Jamshed if (listener->acceptq) { 128976404edcSAsim Jamshed DestroyStreamQueue(listener->acceptq); 129076404edcSAsim Jamshed listener->acceptq = NULL; 129176404edcSAsim Jamshed } 129276404edcSAsim Jamshed 129376404edcSAsim Jamshed pthread_mutex_lock(&listener->accept_lock); 129476404edcSAsim Jamshed pthread_cond_signal(&listener->accept_cond); 129576404edcSAsim Jamshed pthread_mutex_unlock(&listener->accept_lock); 129676404edcSAsim Jamshed 129776404edcSAsim Jamshed pthread_cond_destroy(&listener->accept_cond); 129876404edcSAsim Jamshed pthread_mutex_destroy(&listener->accept_lock); 129976404edcSAsim Jamshed 130076404edcSAsim Jamshed free(listener); 130176404edcSAsim Jamshed mtcp->smap[sockid].listener = NULL; 130276404edcSAsim Jamshed 130376404edcSAsim Jamshed return 0; 130476404edcSAsim Jamshed } 130576404edcSAsim Jamshed /*----------------------------------------------------------------------------*/ 130676404edcSAsim Jamshed int 130776404edcSAsim Jamshed mtcp_close(mctx_t mctx, int sockid) 130876404edcSAsim Jamshed { 130976404edcSAsim Jamshed mtcp_manager_t mtcp; 131076404edcSAsim Jamshed int ret; 131176404edcSAsim Jamshed 131276404edcSAsim Jamshed mtcp = GetMTCPManager(mctx); 131376404edcSAsim Jamshed if (!mtcp) { 131476404edcSAsim Jamshed errno = EACCES; 131576404edcSAsim Jamshed return -1; 131676404edcSAsim Jamshed } 131776404edcSAsim Jamshed 131876404edcSAsim Jamshed if (sockid < 0 || sockid >= g_config.mos->max_concurrency) { 131976404edcSAsim Jamshed TRACE_API("Socket id %d out of range.\n", sockid); 132076404edcSAsim Jamshed errno = EBADF; 132176404edcSAsim Jamshed return -1; 132276404edcSAsim Jamshed } 132376404edcSAsim Jamshed 132476404edcSAsim Jamshed if (mtcp->smap[sockid].socktype == MOS_SOCK_UNUSED) { 132576404edcSAsim Jamshed TRACE_API("Invalid socket id: %d\n", sockid); 132676404edcSAsim Jamshed errno = EBADF; 132776404edcSAsim Jamshed return -1; 132876404edcSAsim Jamshed } 132976404edcSAsim Jamshed 133076404edcSAsim Jamshed TRACE_API("Socket %d: mtcp_close called.\n", sockid); 133176404edcSAsim Jamshed 133276404edcSAsim Jamshed switch (mtcp->smap[sockid].socktype) { 133376404edcSAsim Jamshed case MOS_SOCK_STREAM: 133476404edcSAsim Jamshed ret = CloseStreamSocket(mctx, sockid); 133576404edcSAsim Jamshed break; 133676404edcSAsim Jamshed 133776404edcSAsim Jamshed case MOS_SOCK_STREAM_LISTEN: 133876404edcSAsim Jamshed ret = CloseListeningSocket(mctx, sockid); 133976404edcSAsim Jamshed break; 134076404edcSAsim Jamshed 134176404edcSAsim Jamshed case MOS_SOCK_EPOLL: 134276404edcSAsim Jamshed ret = CloseEpollSocket(mctx, sockid); 134376404edcSAsim Jamshed break; 134476404edcSAsim Jamshed 134576404edcSAsim Jamshed case MOS_SOCK_PIPE: 134676404edcSAsim Jamshed ret = PipeClose(mctx, sockid); 134776404edcSAsim Jamshed break; 134876404edcSAsim Jamshed 134976404edcSAsim Jamshed default: 135076404edcSAsim Jamshed errno = EINVAL; 135176404edcSAsim Jamshed ret = -1; 135276404edcSAsim Jamshed break; 135376404edcSAsim Jamshed } 135476404edcSAsim Jamshed 135576404edcSAsim Jamshed FreeSocket(mctx, sockid, mtcp->smap[sockid].socktype); 135676404edcSAsim Jamshed 135776404edcSAsim Jamshed return ret; 135876404edcSAsim Jamshed } 135976404edcSAsim Jamshed /*----------------------------------------------------------------------------*/ 136076404edcSAsim Jamshed int 136176404edcSAsim Jamshed mtcp_abort(mctx_t mctx, int sockid) 136276404edcSAsim Jamshed { 136376404edcSAsim Jamshed mtcp_manager_t mtcp; 136476404edcSAsim Jamshed tcp_stream *cur_stream; 136576404edcSAsim Jamshed int ret; 136676404edcSAsim Jamshed 136776404edcSAsim Jamshed mtcp = GetMTCPManager(mctx); 136876404edcSAsim Jamshed if (!mtcp) { 136976404edcSAsim Jamshed errno = EACCES; 137076404edcSAsim Jamshed return -1; 137176404edcSAsim Jamshed } 137276404edcSAsim Jamshed 137376404edcSAsim Jamshed if (sockid < 0 || sockid >= g_config.mos->max_concurrency) { 137476404edcSAsim Jamshed TRACE_API("Socket id %d out of range.\n", sockid); 137576404edcSAsim Jamshed errno = EBADF; 137676404edcSAsim Jamshed return -1; 137776404edcSAsim Jamshed } 137876404edcSAsim Jamshed 137976404edcSAsim Jamshed if (mtcp->smap[sockid].socktype == MOS_SOCK_UNUSED) { 138076404edcSAsim Jamshed TRACE_API("Invalid socket id: %d\n", sockid); 138176404edcSAsim Jamshed errno = EBADF; 138276404edcSAsim Jamshed return -1; 138376404edcSAsim Jamshed } 138476404edcSAsim Jamshed 138576404edcSAsim Jamshed if (mtcp->smap[sockid].socktype != MOS_SOCK_STREAM) { 138676404edcSAsim Jamshed TRACE_API("Not an end socket. id: %d\n", sockid); 138776404edcSAsim Jamshed errno = ENOTSOCK; 138876404edcSAsim Jamshed return -1; 138976404edcSAsim Jamshed } 139076404edcSAsim Jamshed 139176404edcSAsim Jamshed cur_stream = mtcp->smap[sockid].stream; 139276404edcSAsim Jamshed if (!cur_stream) { 139376404edcSAsim Jamshed TRACE_API("Stream %d: does not exist.\n", sockid); 139476404edcSAsim Jamshed errno = ENOTCONN; 139576404edcSAsim Jamshed return -1; 139676404edcSAsim Jamshed } 139776404edcSAsim Jamshed 139876404edcSAsim Jamshed TRACE_API("Socket %d: mtcp_abort()\n", sockid); 139976404edcSAsim Jamshed 140076404edcSAsim Jamshed FreeSocket(mctx, sockid, mtcp->smap[sockid].socktype); 140176404edcSAsim Jamshed cur_stream->socket = NULL; 140276404edcSAsim Jamshed 140376404edcSAsim Jamshed if (cur_stream->state == TCP_ST_CLOSED_RSVD) { 140476404edcSAsim Jamshed TRACE_API("Stream %d: connection already reset.\n", sockid); 140576404edcSAsim Jamshed return ERROR; 140676404edcSAsim Jamshed 140776404edcSAsim Jamshed } else if (cur_stream->state == TCP_ST_SYN_SENT) { 140876404edcSAsim Jamshed /* TODO: this should notify event failure to all 140976404edcSAsim Jamshed previous read() or write() calls */ 141076404edcSAsim Jamshed cur_stream->state = TCP_ST_CLOSED_RSVD; 141176404edcSAsim Jamshed cur_stream->close_reason = TCP_ACTIVE_CLOSE; 141276404edcSAsim Jamshed cur_stream->cb_events |= MOS_ON_TCP_STATE_CHANGE; 141376404edcSAsim Jamshed SQ_LOCK(&mtcp->ctx->destroyq_lock); 141476404edcSAsim Jamshed StreamEnqueue(mtcp->destroyq, cur_stream); 141576404edcSAsim Jamshed SQ_UNLOCK(&mtcp->ctx->destroyq_lock); 141676404edcSAsim Jamshed mtcp->wakeup_flag = TRUE; 141776404edcSAsim Jamshed return 0; 141876404edcSAsim Jamshed 141976404edcSAsim Jamshed } else if (cur_stream->state == TCP_ST_CLOSING || 142076404edcSAsim Jamshed cur_stream->state == TCP_ST_LAST_ACK || 142176404edcSAsim Jamshed cur_stream->state == TCP_ST_TIME_WAIT) { 142276404edcSAsim Jamshed cur_stream->state = TCP_ST_CLOSED_RSVD; 142376404edcSAsim Jamshed cur_stream->close_reason = TCP_ACTIVE_CLOSE; 142476404edcSAsim Jamshed cur_stream->cb_events |= MOS_ON_TCP_STATE_CHANGE; 142576404edcSAsim Jamshed SQ_LOCK(&mtcp->ctx->destroyq_lock); 142676404edcSAsim Jamshed StreamEnqueue(mtcp->destroyq, cur_stream); 142776404edcSAsim Jamshed SQ_UNLOCK(&mtcp->ctx->destroyq_lock); 142876404edcSAsim Jamshed mtcp->wakeup_flag = TRUE; 142976404edcSAsim Jamshed return 0; 143076404edcSAsim Jamshed } 143176404edcSAsim Jamshed 143276404edcSAsim Jamshed /* the stream structure will be destroyed after sending RST */ 143376404edcSAsim Jamshed if (cur_stream->sndvar->on_resetq) { 143476404edcSAsim Jamshed TRACE_ERROR("Stream %d: calling mtcp_abort() " 143576404edcSAsim Jamshed "when in reset queue.\n", sockid); 143676404edcSAsim Jamshed errno = ECONNRESET; 143776404edcSAsim Jamshed return -1; 143876404edcSAsim Jamshed } 143976404edcSAsim Jamshed SQ_LOCK(&mtcp->ctx->reset_lock); 144076404edcSAsim Jamshed cur_stream->sndvar->on_resetq = TRUE; 144176404edcSAsim Jamshed ret = StreamEnqueue(mtcp->resetq, cur_stream); 144276404edcSAsim Jamshed SQ_UNLOCK(&mtcp->ctx->reset_lock); 144376404edcSAsim Jamshed mtcp->wakeup_flag = TRUE; 144476404edcSAsim Jamshed 144576404edcSAsim Jamshed if (ret < 0) { 144676404edcSAsim Jamshed TRACE_ERROR("(NEVER HAPPEN) Failed to enqueue the stream to close.\n"); 144776404edcSAsim Jamshed errno = EAGAIN; 144876404edcSAsim Jamshed return -1; 144976404edcSAsim Jamshed } 145076404edcSAsim Jamshed 145176404edcSAsim Jamshed return 0; 145276404edcSAsim Jamshed } 145376404edcSAsim Jamshed /*----------------------------------------------------------------------------*/ 145476404edcSAsim Jamshed static inline int 1455df3fae06SAsim Jamshed PeekForUser(mtcp_manager_t mtcp, tcp_stream *cur_stream, char *buf, int len) 1456df3fae06SAsim Jamshed { 1457df3fae06SAsim Jamshed struct tcp_recv_vars *rcvvar = cur_stream->rcvvar; 1458df3fae06SAsim Jamshed int copylen; 1459df3fae06SAsim Jamshed tcprb_t *rb = rcvvar->rcvbuf; 1460df3fae06SAsim Jamshed 1461df3fae06SAsim Jamshed if ((copylen = tcprb_ppeek(rb, (uint8_t *)buf, len, rb->pile)) <= 0) { 1462df3fae06SAsim Jamshed errno = EAGAIN; 1463df3fae06SAsim Jamshed return -1; 1464df3fae06SAsim Jamshed } 1465df3fae06SAsim Jamshed 1466df3fae06SAsim Jamshed return copylen; 1467df3fae06SAsim Jamshed } 1468df3fae06SAsim Jamshed /*----------------------------------------------------------------------------*/ 1469df3fae06SAsim Jamshed static inline int 147076404edcSAsim Jamshed CopyToUser(mtcp_manager_t mtcp, tcp_stream *cur_stream, char *buf, int len) 147176404edcSAsim Jamshed { 147276404edcSAsim Jamshed struct tcp_recv_vars *rcvvar = cur_stream->rcvvar; 147376404edcSAsim Jamshed int copylen; 147476404edcSAsim Jamshed tcprb_t *rb = rcvvar->rcvbuf; 147576404edcSAsim Jamshed if ((copylen = tcprb_ppeek(rb, (uint8_t *)buf, len, rb->pile)) <= 0) { 147676404edcSAsim Jamshed errno = EAGAIN; 147776404edcSAsim Jamshed return -1; 147876404edcSAsim Jamshed } 147976404edcSAsim Jamshed tcprb_setpile(rb, rb->pile + copylen); 148076404edcSAsim Jamshed 148176404edcSAsim Jamshed rcvvar->rcv_wnd = rb->len - tcprb_cflen(rb); 148276404edcSAsim Jamshed //printf("rcv_wnd: %d\n", rcvvar->rcv_wnd); 148376404edcSAsim Jamshed 148476404edcSAsim Jamshed /* Advertise newly freed receive buffer */ 148576404edcSAsim Jamshed if (cur_stream->need_wnd_adv) { 148676404edcSAsim Jamshed if (rcvvar->rcv_wnd > cur_stream->sndvar->eff_mss) { 148776404edcSAsim Jamshed if (!cur_stream->sndvar->on_ackq) { 148876404edcSAsim Jamshed SQ_LOCK(&mtcp->ctx->ackq_lock); 148976404edcSAsim Jamshed cur_stream->sndvar->on_ackq = TRUE; 149076404edcSAsim Jamshed StreamEnqueue(mtcp->ackq, cur_stream); /* this always success */ 149176404edcSAsim Jamshed SQ_UNLOCK(&mtcp->ctx->ackq_lock); 149276404edcSAsim Jamshed cur_stream->need_wnd_adv = FALSE; 149376404edcSAsim Jamshed mtcp->wakeup_flag = TRUE; 149476404edcSAsim Jamshed } 149576404edcSAsim Jamshed } 149676404edcSAsim Jamshed } 149776404edcSAsim Jamshed 149876404edcSAsim Jamshed return copylen; 149976404edcSAsim Jamshed } 150076404edcSAsim Jamshed /*----------------------------------------------------------------------------*/ 150176404edcSAsim Jamshed ssize_t 1502df3fae06SAsim Jamshed mtcp_recv(mctx_t mctx, int sockid, char *buf, size_t len, int flags) 150376404edcSAsim Jamshed { 150476404edcSAsim Jamshed mtcp_manager_t mtcp; 150576404edcSAsim Jamshed socket_map_t socket; 150676404edcSAsim Jamshed tcp_stream *cur_stream; 150776404edcSAsim Jamshed struct tcp_recv_vars *rcvvar; 150876404edcSAsim Jamshed int event_remaining, merged_len; 150976404edcSAsim Jamshed int ret; 151076404edcSAsim Jamshed 151176404edcSAsim Jamshed mtcp = GetMTCPManager(mctx); 151276404edcSAsim Jamshed if (!mtcp) { 151376404edcSAsim Jamshed errno = EACCES; 151476404edcSAsim Jamshed return -1; 151576404edcSAsim Jamshed } 151676404edcSAsim Jamshed 151776404edcSAsim Jamshed if (sockid < 0 || sockid >= g_config.mos->max_concurrency) { 151876404edcSAsim Jamshed TRACE_API("Socket id %d out of range.\n", sockid); 151976404edcSAsim Jamshed errno = EBADF; 152076404edcSAsim Jamshed return -1; 152176404edcSAsim Jamshed } 152276404edcSAsim Jamshed 152376404edcSAsim Jamshed socket = &mtcp->smap[sockid]; 152476404edcSAsim Jamshed if (socket->socktype == MOS_SOCK_UNUSED) { 152576404edcSAsim Jamshed TRACE_API("Invalid socket id: %d\n", sockid); 152676404edcSAsim Jamshed errno = EBADF; 152776404edcSAsim Jamshed return -1; 152876404edcSAsim Jamshed } 152976404edcSAsim Jamshed 153076404edcSAsim Jamshed if (socket->socktype == MOS_SOCK_PIPE) { 153176404edcSAsim Jamshed return PipeRead(mctx, sockid, buf, len); 153276404edcSAsim Jamshed } 153376404edcSAsim Jamshed 153476404edcSAsim Jamshed if (socket->socktype != MOS_SOCK_STREAM) { 153576404edcSAsim Jamshed TRACE_API("Not an end socket. id: %d\n", sockid); 153676404edcSAsim Jamshed errno = ENOTSOCK; 153776404edcSAsim Jamshed return -1; 153876404edcSAsim Jamshed } 153976404edcSAsim Jamshed 154076404edcSAsim Jamshed /* stream should be in ESTABLISHED, FIN_WAIT_1, FIN_WAIT_2, CLOSE_WAIT */ 154176404edcSAsim Jamshed cur_stream = socket->stream; 154276404edcSAsim Jamshed if (!cur_stream || !cur_stream->rcvvar || !cur_stream->rcvvar->rcvbuf || 154376404edcSAsim Jamshed !(cur_stream->state >= TCP_ST_ESTABLISHED && 154476404edcSAsim Jamshed cur_stream->state <= TCP_ST_CLOSE_WAIT)) { 154576404edcSAsim Jamshed errno = ENOTCONN; 154676404edcSAsim Jamshed return -1; 154776404edcSAsim Jamshed } 154876404edcSAsim Jamshed 154976404edcSAsim Jamshed rcvvar = cur_stream->rcvvar; 155076404edcSAsim Jamshed 155176404edcSAsim Jamshed merged_len = tcprb_cflen(rcvvar->rcvbuf); 155276404edcSAsim Jamshed 155376404edcSAsim Jamshed /* if CLOSE_WAIT, return 0 if there is no payload */ 155476404edcSAsim Jamshed if (cur_stream->state == TCP_ST_CLOSE_WAIT) { 155576404edcSAsim Jamshed if (merged_len == 0) 155676404edcSAsim Jamshed return 0; 155776404edcSAsim Jamshed } 155876404edcSAsim Jamshed 155976404edcSAsim Jamshed /* return EAGAIN if no receive buffer */ 156076404edcSAsim Jamshed if (socket->opts & MTCP_NONBLOCK) { 15611879243cSAsim Jamshed if (merged_len == 0) { 156276404edcSAsim Jamshed errno = EAGAIN; 156376404edcSAsim Jamshed return -1; 156476404edcSAsim Jamshed } 156576404edcSAsim Jamshed } 156676404edcSAsim Jamshed 156776404edcSAsim Jamshed SBUF_LOCK(&rcvvar->read_lock); 156876404edcSAsim Jamshed 1569df3fae06SAsim Jamshed switch (flags) { 1570df3fae06SAsim Jamshed case 0: 157176404edcSAsim Jamshed ret = CopyToUser(mtcp, cur_stream, buf, len); 1572df3fae06SAsim Jamshed break; 1573df3fae06SAsim Jamshed case MSG_PEEK: 1574df3fae06SAsim Jamshed ret = PeekForUser(mtcp, cur_stream, buf, len); 1575df3fae06SAsim Jamshed break; 1576df3fae06SAsim Jamshed default: 1577df3fae06SAsim Jamshed SBUF_UNLOCK(&rcvvar->read_lock); 1578df3fae06SAsim Jamshed ret = -1; 1579df3fae06SAsim Jamshed errno = EINVAL; 1580df3fae06SAsim Jamshed return ret; 1581df3fae06SAsim Jamshed } 158276404edcSAsim Jamshed 158376404edcSAsim Jamshed merged_len = tcprb_cflen(rcvvar->rcvbuf); 158476404edcSAsim Jamshed event_remaining = FALSE; 158576404edcSAsim Jamshed /* if there are remaining payload, generate EPOLLIN */ 158676404edcSAsim Jamshed /* (may due to insufficient user buffer) */ 158776404edcSAsim Jamshed if (socket->epoll & MOS_EPOLLIN) { 158876404edcSAsim Jamshed if (!(socket->epoll & MOS_EPOLLET) && merged_len > 0) { 158976404edcSAsim Jamshed event_remaining = TRUE; 159076404edcSAsim Jamshed } 159176404edcSAsim Jamshed } 159276404edcSAsim Jamshed /* if waiting for close, notify it if no remaining data */ 159376404edcSAsim Jamshed if (cur_stream->state == TCP_ST_CLOSE_WAIT && 159476404edcSAsim Jamshed merged_len == 0 && ret > 0) { 159576404edcSAsim Jamshed event_remaining = TRUE; 159676404edcSAsim Jamshed } 159776404edcSAsim Jamshed 159876404edcSAsim Jamshed SBUF_UNLOCK(&rcvvar->read_lock); 159976404edcSAsim Jamshed 160076404edcSAsim Jamshed if (event_remaining) { 160176404edcSAsim Jamshed if (socket->epoll) { 160276404edcSAsim Jamshed AddEpollEvent(mtcp->ep, 160376404edcSAsim Jamshed USR_SHADOW_EVENT_QUEUE, socket, MOS_EPOLLIN); 160476404edcSAsim Jamshed } 160576404edcSAsim Jamshed } 160676404edcSAsim Jamshed 1607df3fae06SAsim Jamshed TRACE_API("Stream %d: mtcp_recv() returning %d\n", cur_stream->id, ret); 160876404edcSAsim Jamshed return ret; 160976404edcSAsim Jamshed } 161076404edcSAsim Jamshed /*----------------------------------------------------------------------------*/ 1611df3fae06SAsim Jamshed inline ssize_t 1612df3fae06SAsim Jamshed mtcp_read(mctx_t mctx, int sockid, char *buf, size_t len) 1613df3fae06SAsim Jamshed { 1614df3fae06SAsim Jamshed return mtcp_recv(mctx, sockid, buf, len, 0); 1615df3fae06SAsim Jamshed } 1616df3fae06SAsim Jamshed /*----------------------------------------------------------------------------*/ 161776404edcSAsim Jamshed ssize_t 1618a5e1a556SAsim Jamshed mtcp_readv(mctx_t mctx, int sockid, const struct iovec *iov, int numIOV) 161976404edcSAsim Jamshed { 162076404edcSAsim Jamshed mtcp_manager_t mtcp; 162176404edcSAsim Jamshed socket_map_t socket; 162276404edcSAsim Jamshed tcp_stream *cur_stream; 162376404edcSAsim Jamshed struct tcp_recv_vars *rcvvar; 162476404edcSAsim Jamshed int ret, bytes_read, i; 162576404edcSAsim Jamshed int event_remaining, merged_len; 162676404edcSAsim Jamshed 162776404edcSAsim Jamshed mtcp = GetMTCPManager(mctx); 162876404edcSAsim Jamshed if (!mtcp) { 162976404edcSAsim Jamshed errno = EACCES; 163076404edcSAsim Jamshed return -1; 163176404edcSAsim Jamshed } 163276404edcSAsim Jamshed 163376404edcSAsim Jamshed if (sockid < 0 || sockid >= g_config.mos->max_concurrency) { 163476404edcSAsim Jamshed TRACE_API("Socket id %d out of range.\n", sockid); 163576404edcSAsim Jamshed errno = EBADF; 163676404edcSAsim Jamshed return -1; 163776404edcSAsim Jamshed } 163876404edcSAsim Jamshed 163976404edcSAsim Jamshed socket = &mtcp->smap[sockid]; 164076404edcSAsim Jamshed if (socket->socktype == MOS_SOCK_UNUSED) { 164176404edcSAsim Jamshed TRACE_API("Invalid socket id: %d\n", sockid); 164276404edcSAsim Jamshed errno = EBADF; 164376404edcSAsim Jamshed return -1; 164476404edcSAsim Jamshed } 164576404edcSAsim Jamshed 164676404edcSAsim Jamshed if (socket->socktype != MOS_SOCK_STREAM) { 164776404edcSAsim Jamshed TRACE_API("Not an end socket. id: %d\n", sockid); 164876404edcSAsim Jamshed errno = ENOTSOCK; 164976404edcSAsim Jamshed return -1; 165076404edcSAsim Jamshed } 165176404edcSAsim Jamshed 165276404edcSAsim Jamshed /* stream should be in ESTABLISHED, FIN_WAIT_1, FIN_WAIT_2, CLOSE_WAIT */ 165376404edcSAsim Jamshed cur_stream = socket->stream; 16541879243cSAsim Jamshed if (!cur_stream || !cur_stream->rcvvar->rcvbuf || 165576404edcSAsim Jamshed !(cur_stream->state >= TCP_ST_ESTABLISHED && 165676404edcSAsim Jamshed cur_stream->state <= TCP_ST_CLOSE_WAIT)) { 165776404edcSAsim Jamshed errno = ENOTCONN; 165876404edcSAsim Jamshed return -1; 165976404edcSAsim Jamshed } 166076404edcSAsim Jamshed 166176404edcSAsim Jamshed rcvvar = cur_stream->rcvvar; 166276404edcSAsim Jamshed 166376404edcSAsim Jamshed merged_len = tcprb_cflen(rcvvar->rcvbuf); 166476404edcSAsim Jamshed 166576404edcSAsim Jamshed /* if CLOSE_WAIT, return 0 if there is no payload */ 166676404edcSAsim Jamshed if (cur_stream->state == TCP_ST_CLOSE_WAIT) { 166776404edcSAsim Jamshed if (merged_len == 0) 166876404edcSAsim Jamshed return 0; 166976404edcSAsim Jamshed } 167076404edcSAsim Jamshed 167176404edcSAsim Jamshed /* return EAGAIN if no receive buffer */ 167276404edcSAsim Jamshed if (socket->opts & MTCP_NONBLOCK) { 16731879243cSAsim Jamshed if (merged_len == 0) { 167476404edcSAsim Jamshed errno = EAGAIN; 167576404edcSAsim Jamshed return -1; 167676404edcSAsim Jamshed } 167776404edcSAsim Jamshed } 167876404edcSAsim Jamshed 167976404edcSAsim Jamshed SBUF_LOCK(&rcvvar->read_lock); 168076404edcSAsim Jamshed 168176404edcSAsim Jamshed /* read and store the contents to the vectored buffers */ 168276404edcSAsim Jamshed bytes_read = 0; 168376404edcSAsim Jamshed for (i = 0; i < numIOV; i++) { 168476404edcSAsim Jamshed if (iov[i].iov_len <= 0) 168576404edcSAsim Jamshed continue; 168676404edcSAsim Jamshed 168776404edcSAsim Jamshed ret = CopyToUser(mtcp, cur_stream, iov[i].iov_base, iov[i].iov_len); 168876404edcSAsim Jamshed if (ret <= 0) 168976404edcSAsim Jamshed break; 169076404edcSAsim Jamshed 169176404edcSAsim Jamshed bytes_read += ret; 169276404edcSAsim Jamshed 169376404edcSAsim Jamshed if (ret < iov[i].iov_len) 169476404edcSAsim Jamshed break; 169576404edcSAsim Jamshed } 169676404edcSAsim Jamshed 169776404edcSAsim Jamshed merged_len = tcprb_cflen(rcvvar->rcvbuf); 169876404edcSAsim Jamshed 169976404edcSAsim Jamshed event_remaining = FALSE; 170076404edcSAsim Jamshed /* if there are remaining payload, generate read event */ 170176404edcSAsim Jamshed /* (may due to insufficient user buffer) */ 170276404edcSAsim Jamshed if (socket->epoll & MOS_EPOLLIN) { 170376404edcSAsim Jamshed if (!(socket->epoll & MOS_EPOLLET) && merged_len > 0) { 170476404edcSAsim Jamshed event_remaining = TRUE; 170576404edcSAsim Jamshed } 170676404edcSAsim Jamshed } 170776404edcSAsim Jamshed /* if waiting for close, notify it if no remaining data */ 170876404edcSAsim Jamshed if (cur_stream->state == TCP_ST_CLOSE_WAIT && 170976404edcSAsim Jamshed merged_len == 0 && bytes_read > 0) { 171076404edcSAsim Jamshed event_remaining = TRUE; 171176404edcSAsim Jamshed } 171276404edcSAsim Jamshed 171376404edcSAsim Jamshed SBUF_UNLOCK(&rcvvar->read_lock); 171476404edcSAsim Jamshed 171576404edcSAsim Jamshed if(event_remaining) { 171676404edcSAsim Jamshed if (socket->epoll & MOS_EPOLLIN && !(socket->epoll & MOS_EPOLLET)) { 171776404edcSAsim Jamshed AddEpollEvent(mtcp->ep, 171876404edcSAsim Jamshed USR_SHADOW_EVENT_QUEUE, socket, MOS_EPOLLIN); 171976404edcSAsim Jamshed } 172076404edcSAsim Jamshed } 172176404edcSAsim Jamshed 172276404edcSAsim Jamshed TRACE_API("Stream %d: mtcp_readv() returning %d\n", 172376404edcSAsim Jamshed cur_stream->id, bytes_read); 172476404edcSAsim Jamshed return bytes_read; 172576404edcSAsim Jamshed } 172676404edcSAsim Jamshed /*----------------------------------------------------------------------------*/ 172776404edcSAsim Jamshed static inline int 1728a5e1a556SAsim Jamshed CopyFromUser(mtcp_manager_t mtcp, tcp_stream *cur_stream, const char *buf, int len) 172976404edcSAsim Jamshed { 173076404edcSAsim Jamshed struct tcp_send_vars *sndvar = cur_stream->sndvar; 173176404edcSAsim Jamshed int sndlen; 173276404edcSAsim Jamshed int ret; 173376404edcSAsim Jamshed 173476404edcSAsim Jamshed sndlen = MIN((int)sndvar->snd_wnd, len); 173576404edcSAsim Jamshed if (sndlen <= 0) { 173676404edcSAsim Jamshed errno = EAGAIN; 173776404edcSAsim Jamshed return -1; 173876404edcSAsim Jamshed } 173976404edcSAsim Jamshed 174076404edcSAsim Jamshed /* allocate send buffer if not exist */ 174176404edcSAsim Jamshed if (!sndvar->sndbuf) { 174276404edcSAsim Jamshed sndvar->sndbuf = SBInit(mtcp->rbm_snd, sndvar->iss + 1); 174376404edcSAsim Jamshed if (!sndvar->sndbuf) { 174476404edcSAsim Jamshed cur_stream->close_reason = TCP_NO_MEM; 174576404edcSAsim Jamshed /* notification may not required due to -1 return */ 174676404edcSAsim Jamshed errno = ENOMEM; 174776404edcSAsim Jamshed return -1; 174876404edcSAsim Jamshed } 174976404edcSAsim Jamshed } 175076404edcSAsim Jamshed 175176404edcSAsim Jamshed ret = SBPut(mtcp->rbm_snd, sndvar->sndbuf, buf, sndlen); 175276404edcSAsim Jamshed assert(ret == sndlen); 175376404edcSAsim Jamshed sndvar->snd_wnd = sndvar->sndbuf->size - sndvar->sndbuf->len; 175476404edcSAsim Jamshed if (ret <= 0) { 175576404edcSAsim Jamshed TRACE_ERROR("SBPut failed. reason: %d (sndlen: %u, len: %u\n", 175676404edcSAsim Jamshed ret, sndlen, sndvar->sndbuf->len); 175776404edcSAsim Jamshed errno = EAGAIN; 175876404edcSAsim Jamshed return -1; 175976404edcSAsim Jamshed } 176076404edcSAsim Jamshed 176176404edcSAsim Jamshed if (sndvar->snd_wnd <= 0) { 176276404edcSAsim Jamshed TRACE_SNDBUF("%u Sending buffer became full!! snd_wnd: %u\n", 176376404edcSAsim Jamshed cur_stream->id, sndvar->snd_wnd); 176476404edcSAsim Jamshed } 176576404edcSAsim Jamshed 176676404edcSAsim Jamshed return ret; 176776404edcSAsim Jamshed } 176876404edcSAsim Jamshed /*----------------------------------------------------------------------------*/ 176976404edcSAsim Jamshed ssize_t 1770a5e1a556SAsim Jamshed mtcp_write(mctx_t mctx, int sockid, const char *buf, size_t len) 177176404edcSAsim Jamshed { 177276404edcSAsim Jamshed mtcp_manager_t mtcp; 177376404edcSAsim Jamshed socket_map_t socket; 177476404edcSAsim Jamshed tcp_stream *cur_stream; 177576404edcSAsim Jamshed struct tcp_send_vars *sndvar; 177676404edcSAsim Jamshed int ret; 177776404edcSAsim Jamshed 177876404edcSAsim Jamshed mtcp = GetMTCPManager(mctx); 177976404edcSAsim Jamshed if (!mtcp) { 178076404edcSAsim Jamshed errno = EACCES; 178176404edcSAsim Jamshed return -1; 178276404edcSAsim Jamshed } 178376404edcSAsim Jamshed 178476404edcSAsim Jamshed if (sockid < 0 || sockid >= g_config.mos->max_concurrency) { 178576404edcSAsim Jamshed TRACE_API("Socket id %d out of range.\n", sockid); 178676404edcSAsim Jamshed errno = EBADF; 178776404edcSAsim Jamshed return -1; 178876404edcSAsim Jamshed } 178976404edcSAsim Jamshed 179076404edcSAsim Jamshed socket = &mtcp->smap[sockid]; 179176404edcSAsim Jamshed if (socket->socktype == MOS_SOCK_UNUSED) { 179276404edcSAsim Jamshed TRACE_API("Invalid socket id: %d\n", sockid); 179376404edcSAsim Jamshed errno = EBADF; 179476404edcSAsim Jamshed return -1; 179576404edcSAsim Jamshed } 179676404edcSAsim Jamshed 179776404edcSAsim Jamshed if (socket->socktype == MOS_SOCK_PIPE) { 179876404edcSAsim Jamshed return PipeWrite(mctx, sockid, buf, len); 179976404edcSAsim Jamshed } 180076404edcSAsim Jamshed 180176404edcSAsim Jamshed if (socket->socktype != MOS_SOCK_STREAM) { 180276404edcSAsim Jamshed TRACE_API("Not an end socket. id: %d\n", sockid); 180376404edcSAsim Jamshed errno = ENOTSOCK; 180476404edcSAsim Jamshed return -1; 180576404edcSAsim Jamshed } 180676404edcSAsim Jamshed 180776404edcSAsim Jamshed cur_stream = socket->stream; 180876404edcSAsim Jamshed if (!cur_stream || 180976404edcSAsim Jamshed !(cur_stream->state == TCP_ST_ESTABLISHED || 181076404edcSAsim Jamshed cur_stream->state == TCP_ST_CLOSE_WAIT)) { 181176404edcSAsim Jamshed errno = ENOTCONN; 181276404edcSAsim Jamshed return -1; 181376404edcSAsim Jamshed } 181476404edcSAsim Jamshed 181576404edcSAsim Jamshed if (len <= 0) { 181676404edcSAsim Jamshed if (socket->opts & MTCP_NONBLOCK) { 181776404edcSAsim Jamshed errno = EAGAIN; 181876404edcSAsim Jamshed return -1; 181976404edcSAsim Jamshed } else { 182076404edcSAsim Jamshed return 0; 182176404edcSAsim Jamshed } 182276404edcSAsim Jamshed } 182376404edcSAsim Jamshed 182476404edcSAsim Jamshed sndvar = cur_stream->sndvar; 182576404edcSAsim Jamshed 182676404edcSAsim Jamshed SBUF_LOCK(&sndvar->write_lock); 182776404edcSAsim Jamshed ret = CopyFromUser(mtcp, cur_stream, buf, len); 182876404edcSAsim Jamshed 182976404edcSAsim Jamshed SBUF_UNLOCK(&sndvar->write_lock); 183076404edcSAsim Jamshed 183176404edcSAsim Jamshed if (ret > 0 && !(sndvar->on_sendq || sndvar->on_send_list)) { 183276404edcSAsim Jamshed SQ_LOCK(&mtcp->ctx->sendq_lock); 183376404edcSAsim Jamshed sndvar->on_sendq = TRUE; 183476404edcSAsim Jamshed StreamEnqueue(mtcp->sendq, cur_stream); /* this always success */ 183576404edcSAsim Jamshed SQ_UNLOCK(&mtcp->ctx->sendq_lock); 183676404edcSAsim Jamshed mtcp->wakeup_flag = TRUE; 183776404edcSAsim Jamshed } 183876404edcSAsim Jamshed 183976404edcSAsim Jamshed if (ret == 0 && (socket->opts & MTCP_NONBLOCK)) { 184076404edcSAsim Jamshed ret = -1; 184176404edcSAsim Jamshed errno = EAGAIN; 184276404edcSAsim Jamshed } 184376404edcSAsim Jamshed 184476404edcSAsim Jamshed /* if there are remaining sending buffer, generate write event */ 184576404edcSAsim Jamshed if (sndvar->snd_wnd > 0) { 184676404edcSAsim Jamshed if (socket->epoll & MOS_EPOLLOUT && !(socket->epoll & MOS_EPOLLET)) { 184776404edcSAsim Jamshed AddEpollEvent(mtcp->ep, 184876404edcSAsim Jamshed USR_SHADOW_EVENT_QUEUE, socket, MOS_EPOLLOUT); 184976404edcSAsim Jamshed } 185076404edcSAsim Jamshed } 185176404edcSAsim Jamshed 185276404edcSAsim Jamshed TRACE_API("Stream %d: mtcp_write() returning %d\n", cur_stream->id, ret); 185376404edcSAsim Jamshed return ret; 185476404edcSAsim Jamshed } 185576404edcSAsim Jamshed /*----------------------------------------------------------------------------*/ 185676404edcSAsim Jamshed ssize_t 1857a5e1a556SAsim Jamshed mtcp_writev(mctx_t mctx, int sockid, const struct iovec *iov, int numIOV) 185876404edcSAsim Jamshed { 185976404edcSAsim Jamshed mtcp_manager_t mtcp; 186076404edcSAsim Jamshed socket_map_t socket; 186176404edcSAsim Jamshed tcp_stream *cur_stream; 186276404edcSAsim Jamshed struct tcp_send_vars *sndvar; 186376404edcSAsim Jamshed int ret, to_write, i; 186476404edcSAsim Jamshed 186576404edcSAsim Jamshed mtcp = GetMTCPManager(mctx); 186676404edcSAsim Jamshed if (!mtcp) { 186776404edcSAsim Jamshed errno = EACCES; 186876404edcSAsim Jamshed return -1; 186976404edcSAsim Jamshed } 187076404edcSAsim Jamshed 187176404edcSAsim Jamshed if (sockid < 0 || sockid >= g_config.mos->max_concurrency) { 187276404edcSAsim Jamshed TRACE_API("Socket id %d out of range.\n", sockid); 187376404edcSAsim Jamshed errno = EBADF; 187476404edcSAsim Jamshed return -1; 187576404edcSAsim Jamshed } 187676404edcSAsim Jamshed 187776404edcSAsim Jamshed socket = &mtcp->smap[sockid]; 187876404edcSAsim Jamshed if (socket->socktype == MOS_SOCK_UNUSED) { 187976404edcSAsim Jamshed TRACE_API("Invalid socket id: %d\n", sockid); 188076404edcSAsim Jamshed errno = EBADF; 188176404edcSAsim Jamshed return -1; 188276404edcSAsim Jamshed } 188376404edcSAsim Jamshed 188476404edcSAsim Jamshed if (socket->socktype != MOS_SOCK_STREAM) { 188576404edcSAsim Jamshed TRACE_API("Not an end socket. id: %d\n", sockid); 188676404edcSAsim Jamshed errno = ENOTSOCK; 188776404edcSAsim Jamshed return -1; 188876404edcSAsim Jamshed } 188976404edcSAsim Jamshed 189076404edcSAsim Jamshed cur_stream = socket->stream; 189176404edcSAsim Jamshed if (!cur_stream || 189276404edcSAsim Jamshed !(cur_stream->state == TCP_ST_ESTABLISHED || 189376404edcSAsim Jamshed cur_stream->state == TCP_ST_CLOSE_WAIT)) { 189476404edcSAsim Jamshed errno = ENOTCONN; 189576404edcSAsim Jamshed return -1; 189676404edcSAsim Jamshed } 189776404edcSAsim Jamshed 189876404edcSAsim Jamshed sndvar = cur_stream->sndvar; 189976404edcSAsim Jamshed SBUF_LOCK(&sndvar->write_lock); 190076404edcSAsim Jamshed 190176404edcSAsim Jamshed /* write from the vectored buffers */ 190276404edcSAsim Jamshed to_write = 0; 190376404edcSAsim Jamshed for (i = 0; i < numIOV; i++) { 190476404edcSAsim Jamshed if (iov[i].iov_len <= 0) 190576404edcSAsim Jamshed continue; 190676404edcSAsim Jamshed 190776404edcSAsim Jamshed ret = CopyFromUser(mtcp, cur_stream, iov[i].iov_base, iov[i].iov_len); 190876404edcSAsim Jamshed if (ret <= 0) 190976404edcSAsim Jamshed break; 191076404edcSAsim Jamshed 191176404edcSAsim Jamshed to_write += ret; 191276404edcSAsim Jamshed 191376404edcSAsim Jamshed if (ret < iov[i].iov_len) 191476404edcSAsim Jamshed break; 191576404edcSAsim Jamshed } 191676404edcSAsim Jamshed SBUF_UNLOCK(&sndvar->write_lock); 191776404edcSAsim Jamshed 191876404edcSAsim Jamshed if (to_write > 0 && !(sndvar->on_sendq || sndvar->on_send_list)) { 191976404edcSAsim Jamshed SQ_LOCK(&mtcp->ctx->sendq_lock); 192076404edcSAsim Jamshed sndvar->on_sendq = TRUE; 192176404edcSAsim Jamshed StreamEnqueue(mtcp->sendq, cur_stream); /* this always success */ 192276404edcSAsim Jamshed SQ_UNLOCK(&mtcp->ctx->sendq_lock); 192376404edcSAsim Jamshed mtcp->wakeup_flag = TRUE; 192476404edcSAsim Jamshed } 192576404edcSAsim Jamshed 192676404edcSAsim Jamshed if (to_write == 0 && (socket->opts & MTCP_NONBLOCK)) { 192776404edcSAsim Jamshed to_write = -1; 192876404edcSAsim Jamshed errno = EAGAIN; 192976404edcSAsim Jamshed } 193076404edcSAsim Jamshed 193176404edcSAsim Jamshed /* if there are remaining sending buffer, generate write event */ 193276404edcSAsim Jamshed if (sndvar->snd_wnd > 0) { 193376404edcSAsim Jamshed if (socket->epoll & MOS_EPOLLOUT && !(socket->epoll & MOS_EPOLLET)) { 193476404edcSAsim Jamshed AddEpollEvent(mtcp->ep, 193576404edcSAsim Jamshed USR_SHADOW_EVENT_QUEUE, socket, MOS_EPOLLOUT); 193676404edcSAsim Jamshed } 193776404edcSAsim Jamshed } 193876404edcSAsim Jamshed 193976404edcSAsim Jamshed TRACE_API("Stream %d: mtcp_writev() returning %d\n", 194076404edcSAsim Jamshed cur_stream->id, to_write); 194176404edcSAsim Jamshed return to_write; 194276404edcSAsim Jamshed } 194376404edcSAsim Jamshed /*----------------------------------------------------------------------------*/ 194476404edcSAsim Jamshed uint32_t 194576404edcSAsim Jamshed mtcp_get_connection_cnt(mctx_t mctx) 194676404edcSAsim Jamshed { 194776404edcSAsim Jamshed mtcp_manager_t mtcp; 194876404edcSAsim Jamshed mtcp = GetMTCPManager(mctx); 194976404edcSAsim Jamshed if (!mtcp) { 195076404edcSAsim Jamshed errno = EACCES; 195176404edcSAsim Jamshed return -1; 195276404edcSAsim Jamshed } 195376404edcSAsim Jamshed 195476404edcSAsim Jamshed if (mtcp->num_msp > 0) 195576404edcSAsim Jamshed return mtcp->flow_cnt / 2; 195676404edcSAsim Jamshed else 195776404edcSAsim Jamshed return mtcp->flow_cnt; 195876404edcSAsim Jamshed } 195976404edcSAsim Jamshed /*----------------------------------------------------------------------------*/ 1960