176404edcSAsim Jamshed #include <sys/queue.h> 276404edcSAsim Jamshed #include <sys/ioctl.h> 376404edcSAsim Jamshed #include <limits.h> 476404edcSAsim Jamshed #include <unistd.h> 576404edcSAsim Jamshed #include <assert.h> 676404edcSAsim Jamshed #include <string.h> 776404edcSAsim Jamshed 876404edcSAsim Jamshed #ifdef DARWIN 976404edcSAsim Jamshed #include <netinet/tcp.h> 1076404edcSAsim Jamshed #include <netinet/ip.h> 1176404edcSAsim Jamshed #include <netinet/if_ether.h> 1276404edcSAsim Jamshed #endif 1376404edcSAsim Jamshed 1476404edcSAsim Jamshed #include "mtcp.h" 1576404edcSAsim Jamshed #include "mtcp_api.h" 1676404edcSAsim Jamshed #include "tcp_in.h" 1776404edcSAsim Jamshed #include "tcp_stream.h" 1876404edcSAsim Jamshed #include "tcp_out.h" 1976404edcSAsim Jamshed #include "ip_out.h" 2076404edcSAsim Jamshed #include "eventpoll.h" 2176404edcSAsim Jamshed #include "pipe.h" 2276404edcSAsim Jamshed #include "fhash.h" 2376404edcSAsim Jamshed #include "addr_pool.h" 2476404edcSAsim Jamshed #include "rss.h" 2576404edcSAsim Jamshed #include "config.h" 2676404edcSAsim Jamshed #include "debug.h" 2776404edcSAsim Jamshed #include "eventpoll.h" 2876404edcSAsim Jamshed #include "mos_api.h" 2976404edcSAsim Jamshed #include "tcp_rb.h" 3076404edcSAsim Jamshed 3176404edcSAsim Jamshed #define MAX(a, b) ((a)>(b)?(a):(b)) 3276404edcSAsim Jamshed #define MIN(a, b) ((a)<(b)?(a):(b)) 3376404edcSAsim Jamshed 3476404edcSAsim Jamshed /*----------------------------------------------------------------------------*/ 3576404edcSAsim Jamshed /** Stop monitoring the socket! (function prototype) 3676404edcSAsim Jamshed * @param [in] mctx: mtcp context 3776404edcSAsim Jamshed * @param [in] sock: monitoring stream socket id 3876404edcSAsim Jamshed * @param [in] side: side of monitoring (client side, server side or both) 3976404edcSAsim Jamshed * 4076404edcSAsim Jamshed * This function is now DEPRECATED and is only used within mOS core... 
4176404edcSAsim Jamshed */ 4276404edcSAsim Jamshed int 4376404edcSAsim Jamshed mtcp_cb_stop(mctx_t mctx, int sock, int side); 4476404edcSAsim Jamshed /*----------------------------------------------------------------------------*/ 4576404edcSAsim Jamshed /** Reset the connection (send RST packets to both sides) 4676404edcSAsim Jamshed * (We need to decide the API for this.) 4776404edcSAsim Jamshed */ 4876404edcSAsim Jamshed //int 4976404edcSAsim Jamshed //mtcp_cb_reset(mctx_t mctx, int sock, int side); 5076404edcSAsim Jamshed /*----------------------------------------------------------------------------*/ 5176404edcSAsim Jamshed inline mtcp_manager_t 5276404edcSAsim Jamshed GetMTCPManager(mctx_t mctx) 5376404edcSAsim Jamshed { 5476404edcSAsim Jamshed if (!mctx) { 5576404edcSAsim Jamshed errno = EACCES; 5676404edcSAsim Jamshed return NULL; 5776404edcSAsim Jamshed } 5876404edcSAsim Jamshed 5976404edcSAsim Jamshed if (mctx->cpu < 0 || mctx->cpu >= num_cpus) { 6076404edcSAsim Jamshed errno = EINVAL; 6176404edcSAsim Jamshed return NULL; 6276404edcSAsim Jamshed } 6376404edcSAsim Jamshed 644cb4e140SAsim Jamshed if (!g_mtcp[mctx->cpu] || g_mtcp[mctx->cpu]->ctx->done || g_mtcp[mctx->cpu]->ctx->exit) { 6576404edcSAsim Jamshed errno = EPERM; 6676404edcSAsim Jamshed return NULL; 6776404edcSAsim Jamshed } 6876404edcSAsim Jamshed 6976404edcSAsim Jamshed return g_mtcp[mctx->cpu]; 7076404edcSAsim Jamshed } 7176404edcSAsim Jamshed /*----------------------------------------------------------------------------*/ 72a5e1a556SAsim Jamshed static inline int 7376404edcSAsim Jamshed GetSocketError(socket_map_t socket, void *optval, socklen_t *optlen) 7476404edcSAsim Jamshed { 7576404edcSAsim Jamshed tcp_stream *cur_stream; 7676404edcSAsim Jamshed 7776404edcSAsim Jamshed if (!socket->stream) { 7876404edcSAsim Jamshed errno = EBADF; 7976404edcSAsim Jamshed return -1; 8076404edcSAsim Jamshed } 8176404edcSAsim Jamshed 8276404edcSAsim Jamshed cur_stream = socket->stream; 8376404edcSAsim Jamshed 
if (cur_stream->state == TCP_ST_CLOSED_RSVD) { 8476404edcSAsim Jamshed if (cur_stream->close_reason == TCP_TIMEDOUT || 8576404edcSAsim Jamshed cur_stream->close_reason == TCP_CONN_FAIL || 8676404edcSAsim Jamshed cur_stream->close_reason == TCP_CONN_LOST) { 8776404edcSAsim Jamshed *(int *)optval = ETIMEDOUT; 8876404edcSAsim Jamshed *optlen = sizeof(int); 8976404edcSAsim Jamshed 9076404edcSAsim Jamshed return 0; 9176404edcSAsim Jamshed } 9276404edcSAsim Jamshed } 9376404edcSAsim Jamshed 9476404edcSAsim Jamshed if (cur_stream->state == TCP_ST_CLOSE_WAIT || 9576404edcSAsim Jamshed cur_stream->state == TCP_ST_CLOSED_RSVD) { 9676404edcSAsim Jamshed if (cur_stream->close_reason == TCP_RESET) { 9776404edcSAsim Jamshed *(int *)optval = ECONNRESET; 9876404edcSAsim Jamshed *optlen = sizeof(int); 9976404edcSAsim Jamshed 10076404edcSAsim Jamshed return 0; 10176404edcSAsim Jamshed } 10276404edcSAsim Jamshed } 10376404edcSAsim Jamshed 104a5e1a556SAsim Jamshed if (cur_stream->state == TCP_ST_SYN_SENT && 105a5e1a556SAsim Jamshed errno == EINPROGRESS) { 106a5e1a556SAsim Jamshed *(int *)optval = errno; 107a5e1a556SAsim Jamshed *optlen = sizeof(int); 108a5e1a556SAsim Jamshed 109a5e1a556SAsim Jamshed return -1; 110a5e1a556SAsim Jamshed } 111a5e1a556SAsim Jamshed 112265cb675SAsim Jamshed /* 113265cb675SAsim Jamshed * `base case`: If socket sees no so_error, then 114265cb675SAsim Jamshed * this also means close_reason will always be 115265cb675SAsim Jamshed * TCP_NOT_CLOSED. 
116265cb675SAsim Jamshed */ 117265cb675SAsim Jamshed if (cur_stream->close_reason == TCP_NOT_CLOSED) { 118265cb675SAsim Jamshed *(int *)optval = 0; 119265cb675SAsim Jamshed *optlen = sizeof(int); 120265cb675SAsim Jamshed 121265cb675SAsim Jamshed return 0; 122265cb675SAsim Jamshed } 123265cb675SAsim Jamshed 12476404edcSAsim Jamshed errno = ENOSYS; 12576404edcSAsim Jamshed return -1; 12676404edcSAsim Jamshed } 12776404edcSAsim Jamshed /*----------------------------------------------------------------------------*/ 12876404edcSAsim Jamshed int 129a5e1a556SAsim Jamshed mtcp_getsockname(mctx_t mctx, int sockid, struct sockaddr *addr, 130a5e1a556SAsim Jamshed socklen_t *addrlen) 131a5e1a556SAsim Jamshed { 132a5e1a556SAsim Jamshed mtcp_manager_t mtcp; 133a5e1a556SAsim Jamshed socket_map_t socket; 134a5e1a556SAsim Jamshed 135a5e1a556SAsim Jamshed mtcp = GetMTCPManager(mctx); 136a5e1a556SAsim Jamshed if (!mtcp) { 137a5e1a556SAsim Jamshed return -1; 138a5e1a556SAsim Jamshed } 139a5e1a556SAsim Jamshed 140a5e1a556SAsim Jamshed if (sockid < 0 || sockid >= g_config.mos->max_concurrency) { 141a5e1a556SAsim Jamshed TRACE_API("Socket id %d out of range.\n", sockid); 142a5e1a556SAsim Jamshed errno = EBADF; 143a5e1a556SAsim Jamshed return -1; 144a5e1a556SAsim Jamshed } 145a5e1a556SAsim Jamshed 146a5e1a556SAsim Jamshed socket = &mtcp->smap[sockid]; 147a5e1a556SAsim Jamshed if (socket->socktype == MOS_SOCK_UNUSED) { 148a5e1a556SAsim Jamshed TRACE_API("Invalid socket id: %d\n", sockid); 149a5e1a556SAsim Jamshed errno = EBADF; 150a5e1a556SAsim Jamshed return -1; 151a5e1a556SAsim Jamshed } 152a5e1a556SAsim Jamshed 153a5e1a556SAsim Jamshed if (*addrlen <= 0) { 154a5e1a556SAsim Jamshed TRACE_API("Invalid addrlen: %d\n", *addrlen); 155a5e1a556SAsim Jamshed errno = EINVAL; 156a5e1a556SAsim Jamshed return -1; 157a5e1a556SAsim Jamshed } 158a5e1a556SAsim Jamshed 159a5e1a556SAsim Jamshed if (socket->socktype != MOS_SOCK_STREAM_LISTEN && 160a5e1a556SAsim Jamshed socket->socktype != 
MOS_SOCK_STREAM) { 161a5e1a556SAsim Jamshed TRACE_API("Invalid socket id: %d\n", sockid); 162a5e1a556SAsim Jamshed errno = ENOTSOCK; 163a5e1a556SAsim Jamshed return -1; 164a5e1a556SAsim Jamshed } 165a5e1a556SAsim Jamshed 166a5e1a556SAsim Jamshed *(struct sockaddr_in *)addr = socket->saddr; 167a5e1a556SAsim Jamshed *addrlen = sizeof(socket->saddr); 168a5e1a556SAsim Jamshed 169a5e1a556SAsim Jamshed return 0; 170a5e1a556SAsim Jamshed } 171a5e1a556SAsim Jamshed /*----------------------------------------------------------------------------*/ 172a5e1a556SAsim Jamshed int 17376404edcSAsim Jamshed mtcp_getsockopt(mctx_t mctx, int sockid, int level, 17476404edcSAsim Jamshed int optname, void *optval, socklen_t *optlen) 17576404edcSAsim Jamshed { 17676404edcSAsim Jamshed mtcp_manager_t mtcp; 17776404edcSAsim Jamshed socket_map_t socket; 17876404edcSAsim Jamshed 17976404edcSAsim Jamshed mtcp = GetMTCPManager(mctx); 18076404edcSAsim Jamshed if (!mtcp) { 18176404edcSAsim Jamshed errno = EACCES; 18276404edcSAsim Jamshed return -1; 18376404edcSAsim Jamshed } 18476404edcSAsim Jamshed 18576404edcSAsim Jamshed if (sockid < 0 || sockid >= g_config.mos->max_concurrency) { 18676404edcSAsim Jamshed TRACE_API("Socket id %d out of range.\n", sockid); 18776404edcSAsim Jamshed errno = EBADF; 18876404edcSAsim Jamshed return -1; 18976404edcSAsim Jamshed } 19076404edcSAsim Jamshed 19176404edcSAsim Jamshed switch (level) { 19276404edcSAsim Jamshed case SOL_SOCKET: 19376404edcSAsim Jamshed socket = &mtcp->smap[sockid]; 19476404edcSAsim Jamshed if (socket->socktype == MOS_SOCK_UNUSED) { 19576404edcSAsim Jamshed TRACE_API("Invalid socket id: %d\n", sockid); 19676404edcSAsim Jamshed errno = EBADF; 19776404edcSAsim Jamshed return -1; 19876404edcSAsim Jamshed } 19976404edcSAsim Jamshed 20076404edcSAsim Jamshed if (socket->socktype != MOS_SOCK_STREAM_LISTEN && 20176404edcSAsim Jamshed socket->socktype != MOS_SOCK_STREAM) { 20276404edcSAsim Jamshed TRACE_API("Invalid socket id: %d\n", sockid); 
20376404edcSAsim Jamshed errno = ENOTSOCK; 20476404edcSAsim Jamshed return -1; 20576404edcSAsim Jamshed } 20676404edcSAsim Jamshed 20776404edcSAsim Jamshed if (optname == SO_ERROR) { 20876404edcSAsim Jamshed if (socket->socktype == MOS_SOCK_STREAM) { 20976404edcSAsim Jamshed return GetSocketError(socket, optval, optlen); 21076404edcSAsim Jamshed } 21176404edcSAsim Jamshed } 21276404edcSAsim Jamshed break; 21376404edcSAsim Jamshed case SOL_MONSOCKET: 21476404edcSAsim Jamshed /* check if the calling thread is in MOS context */ 21576404edcSAsim Jamshed if (mtcp->ctx->thread != pthread_self()) { 21676404edcSAsim Jamshed errno = EPERM; 21776404edcSAsim Jamshed return -1; 21876404edcSAsim Jamshed } 21976404edcSAsim Jamshed /* 22076404edcSAsim Jamshed * All options will only work for active 22176404edcSAsim Jamshed * monitor stream sockets 22276404edcSAsim Jamshed */ 22376404edcSAsim Jamshed socket = &mtcp->msmap[sockid]; 22476404edcSAsim Jamshed if (socket->socktype != MOS_SOCK_MONITOR_STREAM_ACTIVE) { 22576404edcSAsim Jamshed TRACE_API("Invalid socket id: %d\n", sockid); 22676404edcSAsim Jamshed errno = ENOTSOCK; 22776404edcSAsim Jamshed return -1; 22876404edcSAsim Jamshed } 22976404edcSAsim Jamshed 23076404edcSAsim Jamshed switch (optname) { 23176404edcSAsim Jamshed case MOS_FRAGINFO_CLIBUF: 23276404edcSAsim Jamshed return GetFragInfo(socket, MOS_SIDE_CLI, optval, optlen); 23376404edcSAsim Jamshed case MOS_FRAGINFO_SVRBUF: 23476404edcSAsim Jamshed return GetFragInfo(socket, MOS_SIDE_SVR, optval, optlen); 23576404edcSAsim Jamshed case MOS_INFO_CLIBUF: 23676404edcSAsim Jamshed return GetBufInfo(socket, MOS_SIDE_CLI, optval, optlen); 23776404edcSAsim Jamshed case MOS_INFO_SVRBUF: 23876404edcSAsim Jamshed return GetBufInfo(socket, MOS_SIDE_SVR, optval, optlen); 23976404edcSAsim Jamshed case MOS_TCP_STATE_CLI: 24076404edcSAsim Jamshed return GetTCPState(socket->monitor_stream->stream, MOS_SIDE_CLI, 24176404edcSAsim Jamshed optval, optlen); 24276404edcSAsim Jamshed case 
MOS_TCP_STATE_SVR: 24376404edcSAsim Jamshed return GetTCPState(socket->monitor_stream->stream, MOS_SIDE_SVR, 24476404edcSAsim Jamshed optval, optlen); 24576404edcSAsim Jamshed case MOS_TIMESTAMP: 24676404edcSAsim Jamshed return GetLastTimestamp(socket->monitor_stream->stream, 24776404edcSAsim Jamshed (uint32_t *)optval, 24876404edcSAsim Jamshed optlen); 24976404edcSAsim Jamshed default: 25076404edcSAsim Jamshed TRACE_API("can't recognize the optname=%d\n", optname); 25176404edcSAsim Jamshed assert(0); 25276404edcSAsim Jamshed } 25376404edcSAsim Jamshed break; 25476404edcSAsim Jamshed } 25576404edcSAsim Jamshed errno = ENOSYS; 25676404edcSAsim Jamshed return -1; 25776404edcSAsim Jamshed } 25876404edcSAsim Jamshed /*----------------------------------------------------------------------------*/ 25976404edcSAsim Jamshed int 26076404edcSAsim Jamshed mtcp_setsockopt(mctx_t mctx, int sockid, int level, 26176404edcSAsim Jamshed int optname, const void *optval, socklen_t optlen) 26276404edcSAsim Jamshed { 26376404edcSAsim Jamshed mtcp_manager_t mtcp; 26476404edcSAsim Jamshed socket_map_t socket; 26576404edcSAsim Jamshed tcprb_t *rb; 26676404edcSAsim Jamshed 26776404edcSAsim Jamshed mtcp = GetMTCPManager(mctx); 26876404edcSAsim Jamshed if (!mtcp) { 26976404edcSAsim Jamshed errno = EACCES; 27076404edcSAsim Jamshed return -1; 27176404edcSAsim Jamshed } 27276404edcSAsim Jamshed 27376404edcSAsim Jamshed if (sockid < 0 || sockid >= g_config.mos->max_concurrency) { 27476404edcSAsim Jamshed TRACE_API("Socket id %d out of range.\n", sockid); 27576404edcSAsim Jamshed errno = EBADF; 27676404edcSAsim Jamshed return -1; 27776404edcSAsim Jamshed } 27876404edcSAsim Jamshed 27976404edcSAsim Jamshed switch (level) { 28076404edcSAsim Jamshed case SOL_SOCKET: 28176404edcSAsim Jamshed socket = &mtcp->smap[sockid]; 28276404edcSAsim Jamshed if (socket->socktype == MOS_SOCK_UNUSED) { 28376404edcSAsim Jamshed TRACE_API("Invalid socket id: %d\n", sockid); 28476404edcSAsim Jamshed errno = EBADF; 
28576404edcSAsim Jamshed return -1; 28676404edcSAsim Jamshed } 28776404edcSAsim Jamshed 28876404edcSAsim Jamshed if (socket->socktype != MOS_SOCK_STREAM_LISTEN && 28976404edcSAsim Jamshed socket->socktype != MOS_SOCK_STREAM) { 29076404edcSAsim Jamshed TRACE_API("Invalid socket id: %d\n", sockid); 29176404edcSAsim Jamshed errno = ENOTSOCK; 29276404edcSAsim Jamshed return -1; 29376404edcSAsim Jamshed } 29476404edcSAsim Jamshed break; 29576404edcSAsim Jamshed case SOL_MONSOCKET: 29676404edcSAsim Jamshed socket = &mtcp->msmap[sockid]; 29776404edcSAsim Jamshed /* 29876404edcSAsim Jamshed * checking of calling thread to be in MOS context is 29976404edcSAsim Jamshed * disabled since both optnames can be called from 30076404edcSAsim Jamshed * `application' context (on passive sockets) 30176404edcSAsim Jamshed */ 30276404edcSAsim Jamshed /* 30376404edcSAsim Jamshed * if (mtcp->ctx->thread != pthread_self()) 30476404edcSAsim Jamshed * return -1; 30576404edcSAsim Jamshed */ 30676404edcSAsim Jamshed 30776404edcSAsim Jamshed switch (optname) { 308a834ea89SAsim Jamshed case MOS_CLIOVERLAP: 309a834ea89SAsim Jamshed rb = (socket->monitor_stream->stream->side == MOS_SIDE_CLI) ? 310a834ea89SAsim Jamshed socket->monitor_stream->stream->rcvvar->rcvbuf : 311a834ea89SAsim Jamshed socket->monitor_stream->stream->pair_stream->rcvvar->rcvbuf; 312a834ea89SAsim Jamshed if (rb == NULL) { 313a834ea89SAsim Jamshed errno = EFAULT; 314a834ea89SAsim Jamshed return -1; 315a834ea89SAsim Jamshed } 316a834ea89SAsim Jamshed if (tcprb_setpolicy(rb, *(uint8_t *)optval) < 0) { 317a834ea89SAsim Jamshed errno = EINVAL; 318a834ea89SAsim Jamshed return -1; 319a834ea89SAsim Jamshed } else 320a834ea89SAsim Jamshed return 0; 321a834ea89SAsim Jamshed break; 322a834ea89SAsim Jamshed case MOS_SVROVERLAP: 323a834ea89SAsim Jamshed rb = (socket->monitor_stream->stream->side == MOS_SIDE_SVR) ? 
324a834ea89SAsim Jamshed socket->monitor_stream->stream->rcvvar->rcvbuf : 325a834ea89SAsim Jamshed socket->monitor_stream->stream->pair_stream->rcvvar->rcvbuf; 326a834ea89SAsim Jamshed if (rb == NULL) { 327a834ea89SAsim Jamshed errno = EFAULT; 328a834ea89SAsim Jamshed return -1; 329a834ea89SAsim Jamshed } 330a834ea89SAsim Jamshed if (tcprb_setpolicy(rb, *(uint8_t *)optval) < 0) { 331a834ea89SAsim Jamshed errno = EINVAL; 332a834ea89SAsim Jamshed return -1; 333a834ea89SAsim Jamshed } else 334a834ea89SAsim Jamshed return 0; 335a834ea89SAsim Jamshed break; 33676404edcSAsim Jamshed case MOS_CLIBUF: 33776404edcSAsim Jamshed #if 0 33876404edcSAsim Jamshed if (socket->socktype != MOS_SOCK_MONITOR_STREAM_ACTIVE) { 33976404edcSAsim Jamshed errno = EBADF; 34076404edcSAsim Jamshed return -1; 34176404edcSAsim Jamshed } 34276404edcSAsim Jamshed #endif 34376404edcSAsim Jamshed #ifdef DISABLE_DYN_RESIZE 34476404edcSAsim Jamshed if (*(int *)optval != 0) 34576404edcSAsim Jamshed return -1; 34676404edcSAsim Jamshed if (socket->socktype == MOS_SOCK_MONITOR_STREAM_ACTIVE) { 34776404edcSAsim Jamshed rb = (socket->monitor_stream->stream->side == MOS_SIDE_CLI) ? 34876404edcSAsim Jamshed socket->monitor_stream->stream->rcvvar->rcvbuf : 34976404edcSAsim Jamshed socket->monitor_stream->stream->pair_stream->rcvvar->rcvbuf; 35076404edcSAsim Jamshed if (rb) { 35176404edcSAsim Jamshed tcprb_resize_meta(rb, 0); 35276404edcSAsim Jamshed tcprb_resize(rb, 0); 35376404edcSAsim Jamshed } 35476404edcSAsim Jamshed } 35576404edcSAsim Jamshed return DisableBuf(socket, MOS_SIDE_CLI); 35676404edcSAsim Jamshed #else 35776404edcSAsim Jamshed rb = (socket->monitor_stream->stream->side == MOS_SIDE_CLI) ? 
35876404edcSAsim Jamshed socket->monitor_stream->stream->rcvvar->rcvbuf : 35976404edcSAsim Jamshed socket->monitor_stream->stream->pair_stream->rcvvar->rcvbuf; 36076404edcSAsim Jamshed if (tcprb_resize_meta(rb, *(int *)optval) < 0) 36176404edcSAsim Jamshed return -1; 36276404edcSAsim Jamshed return tcprb_resize(rb, 36376404edcSAsim Jamshed (((int)rb->metalen - 1) / UNITBUFSIZE + 1) * UNITBUFSIZE); 36476404edcSAsim Jamshed #endif 36576404edcSAsim Jamshed case MOS_SVRBUF: 36676404edcSAsim Jamshed #if 0 36776404edcSAsim Jamshed if (socket->socktype != MOS_SOCK_MONITOR_STREAM_ACTIVE) { 36876404edcSAsim Jamshed errno = EBADF; 36976404edcSAsim Jamshed return -1; 37076404edcSAsim Jamshed } 37176404edcSAsim Jamshed #endif 37276404edcSAsim Jamshed #ifdef DISABLE_DYN_RESIZE 37376404edcSAsim Jamshed if (*(int *)optval != 0) 37476404edcSAsim Jamshed return -1; 37576404edcSAsim Jamshed if (socket->socktype == MOS_SOCK_MONITOR_STREAM_ACTIVE) { 37676404edcSAsim Jamshed rb = (socket->monitor_stream->stream->side == MOS_SIDE_SVR) ? 37776404edcSAsim Jamshed socket->monitor_stream->stream->rcvvar->rcvbuf : 37876404edcSAsim Jamshed socket->monitor_stream->stream->pair_stream->rcvvar->rcvbuf; 37976404edcSAsim Jamshed if (rb) { 38076404edcSAsim Jamshed tcprb_resize_meta(rb, 0); 38176404edcSAsim Jamshed tcprb_resize(rb, 0); 38276404edcSAsim Jamshed } 38376404edcSAsim Jamshed } 38476404edcSAsim Jamshed return DisableBuf(socket, MOS_SIDE_SVR); 38576404edcSAsim Jamshed #else 38676404edcSAsim Jamshed rb = (socket->monitor_stream->stream->side == MOS_SIDE_SVR) ? 
38776404edcSAsim Jamshed socket->monitor_stream->stream->rcvvar->rcvbuf : 38876404edcSAsim Jamshed socket->monitor_stream->stream->pair_stream->rcvvar->rcvbuf; 38976404edcSAsim Jamshed if (tcprb_resize_meta(rb, *(int *)optval) < 0) 39076404edcSAsim Jamshed return -1; 39176404edcSAsim Jamshed return tcprb_resize(rb, 39276404edcSAsim Jamshed (((int)rb->metalen - 1) / UNITBUFSIZE + 1) * UNITBUFSIZE); 39376404edcSAsim Jamshed #endif 39476404edcSAsim Jamshed case MOS_FRAG_CLIBUF: 39576404edcSAsim Jamshed #if 0 39676404edcSAsim Jamshed if (socket->socktype != MOS_SOCK_MONITOR_STREAM_ACTIVE) { 39776404edcSAsim Jamshed errno = EBADF; 39876404edcSAsim Jamshed return -1; 39976404edcSAsim Jamshed } 40076404edcSAsim Jamshed #endif 40176404edcSAsim Jamshed #ifdef DISABLE_DYN_RESIZE 40276404edcSAsim Jamshed if (*(int *)optval != 0) 40376404edcSAsim Jamshed return -1; 40476404edcSAsim Jamshed if (socket->socktype == MOS_SOCK_MONITOR_STREAM_ACTIVE) { 40576404edcSAsim Jamshed rb = (socket->monitor_stream->stream->side == MOS_SIDE_CLI) ? 40676404edcSAsim Jamshed socket->monitor_stream->stream->rcvvar->rcvbuf : 40776404edcSAsim Jamshed socket->monitor_stream->stream->pair_stream->rcvvar->rcvbuf; 40876404edcSAsim Jamshed if (rb) 40976404edcSAsim Jamshed tcprb_resize(rb, 0); 41076404edcSAsim Jamshed } 41176404edcSAsim Jamshed return 0; 41276404edcSAsim Jamshed #else 41376404edcSAsim Jamshed rb = (socket->monitor_stream->stream->side == MOS_SIDE_CLI) ? 
41476404edcSAsim Jamshed socket->monitor_stream->stream->rcvvar->rcvbuf : 41576404edcSAsim Jamshed socket->monitor_stream->stream->pair_stream->rcvvar->rcvbuf; 41676404edcSAsim Jamshed if (rb->len == 0) 41776404edcSAsim Jamshed return tcprb_resize_meta(rb, *(int *)optval); 41876404edcSAsim Jamshed else 41976404edcSAsim Jamshed return -1; 42076404edcSAsim Jamshed #endif 42176404edcSAsim Jamshed case MOS_FRAG_SVRBUF: 42276404edcSAsim Jamshed #if 0 42376404edcSAsim Jamshed if (socket->socktype != MOS_SOCK_MONITOR_STREAM_ACTIVE) { 42476404edcSAsim Jamshed errno = EBADF; 42576404edcSAsim Jamshed return -1; 42676404edcSAsim Jamshed } 42776404edcSAsim Jamshed #endif 42876404edcSAsim Jamshed #ifdef DISABLE_DYN_RESIZE 42976404edcSAsim Jamshed if (*(int *)optval != 0) 43076404edcSAsim Jamshed return -1; 43176404edcSAsim Jamshed if (socket->socktype == MOS_SOCK_MONITOR_STREAM_ACTIVE) { 43276404edcSAsim Jamshed rb = (socket->monitor_stream->stream->side == MOS_SIDE_SVR) ? 43376404edcSAsim Jamshed socket->monitor_stream->stream->rcvvar->rcvbuf : 43476404edcSAsim Jamshed socket->monitor_stream->stream->pair_stream->rcvvar->rcvbuf; 43576404edcSAsim Jamshed if (rb) 43676404edcSAsim Jamshed tcprb_resize(rb, 0); 43776404edcSAsim Jamshed } 43876404edcSAsim Jamshed return 0; 43976404edcSAsim Jamshed #else 44076404edcSAsim Jamshed rb = (socket->monitor_stream->stream->side == MOS_SIDE_SVR) ? 
44176404edcSAsim Jamshed socket->monitor_stream->stream->rcvvar->rcvbuf : 44276404edcSAsim Jamshed socket->monitor_stream->stream->pair_stream->rcvvar->rcvbuf; 44376404edcSAsim Jamshed if (rb->len == 0) 44476404edcSAsim Jamshed return tcprb_resize_meta(rb, *(int *)optval); 44576404edcSAsim Jamshed else 44676404edcSAsim Jamshed return -1; 44776404edcSAsim Jamshed #endif 44876404edcSAsim Jamshed case MOS_MONLEVEL: 44976404edcSAsim Jamshed #ifdef OLD_API 45076404edcSAsim Jamshed assert(*(int *)optval == MOS_NO_CLIBUF || 45176404edcSAsim Jamshed *(int *)optval == MOS_NO_SVRBUF); 45276404edcSAsim Jamshed return DisableBuf(socket, 45376404edcSAsim Jamshed (*(int *)optval == MOS_NO_CLIBUF) ? 45476404edcSAsim Jamshed MOS_SIDE_CLI : MOS_SIDE_SVR); 45576404edcSAsim Jamshed #endif 45676404edcSAsim Jamshed case MOS_SEQ_REMAP: 45776404edcSAsim Jamshed return TcpSeqChange(socket, 45876404edcSAsim Jamshed (uint32_t)((seq_remap_info *)optval)->seq_off, 45976404edcSAsim Jamshed ((seq_remap_info *)optval)->side, 46076404edcSAsim Jamshed mtcp->pctx->p.seq); 46176404edcSAsim Jamshed case MOS_STOP_MON: 46276404edcSAsim Jamshed return mtcp_cb_stop(mctx, sockid, *(int *)optval); 46376404edcSAsim Jamshed default: 46476404edcSAsim Jamshed TRACE_API("invalid optname=%d\n", optname); 46576404edcSAsim Jamshed assert(0); 46676404edcSAsim Jamshed } 46776404edcSAsim Jamshed break; 46876404edcSAsim Jamshed } 46976404edcSAsim Jamshed 47076404edcSAsim Jamshed errno = ENOSYS; 47176404edcSAsim Jamshed return -1; 47276404edcSAsim Jamshed } 47376404edcSAsim Jamshed /*----------------------------------------------------------------------------*/ 47476404edcSAsim Jamshed int 47576404edcSAsim Jamshed mtcp_setsock_nonblock(mctx_t mctx, int sockid) 47676404edcSAsim Jamshed { 47776404edcSAsim Jamshed mtcp_manager_t mtcp; 47876404edcSAsim Jamshed 47976404edcSAsim Jamshed mtcp = GetMTCPManager(mctx); 48076404edcSAsim Jamshed if (!mtcp) { 48176404edcSAsim Jamshed errno = EACCES; 48276404edcSAsim Jamshed return 
-1; 48376404edcSAsim Jamshed } 48476404edcSAsim Jamshed 48576404edcSAsim Jamshed if (sockid < 0 || sockid >= g_config.mos->max_concurrency) { 48676404edcSAsim Jamshed TRACE_API("Socket id %d out of range.\n", sockid); 48776404edcSAsim Jamshed errno = EBADF; 48876404edcSAsim Jamshed return -1; 48976404edcSAsim Jamshed } 49076404edcSAsim Jamshed 49176404edcSAsim Jamshed if (mtcp->smap[sockid].socktype == MOS_SOCK_UNUSED) { 49276404edcSAsim Jamshed TRACE_API("Invalid socket id: %d\n", sockid); 49376404edcSAsim Jamshed errno = EBADF; 49476404edcSAsim Jamshed return -1; 49576404edcSAsim Jamshed } 49676404edcSAsim Jamshed 49776404edcSAsim Jamshed mtcp->smap[sockid].opts |= MTCP_NONBLOCK; 49876404edcSAsim Jamshed 49976404edcSAsim Jamshed return 0; 50076404edcSAsim Jamshed } 50176404edcSAsim Jamshed /*----------------------------------------------------------------------------*/ 50276404edcSAsim Jamshed int 50376404edcSAsim Jamshed mtcp_ioctl(mctx_t mctx, int sockid, int request, void *argp) 50476404edcSAsim Jamshed { 50576404edcSAsim Jamshed mtcp_manager_t mtcp; 50676404edcSAsim Jamshed socket_map_t socket; 50776404edcSAsim Jamshed 50876404edcSAsim Jamshed mtcp = GetMTCPManager(mctx); 50976404edcSAsim Jamshed if (!mtcp) { 51076404edcSAsim Jamshed errno = EACCES; 51176404edcSAsim Jamshed return -1; 51276404edcSAsim Jamshed } 51376404edcSAsim Jamshed 51476404edcSAsim Jamshed if (sockid < 0 || sockid >= g_config.mos->max_concurrency) { 51576404edcSAsim Jamshed TRACE_API("Socket id %d out of range.\n", sockid); 51676404edcSAsim Jamshed errno = EBADF; 51776404edcSAsim Jamshed return -1; 51876404edcSAsim Jamshed } 51976404edcSAsim Jamshed 52076404edcSAsim Jamshed /* only support stream socket */ 52176404edcSAsim Jamshed socket = &mtcp->smap[sockid]; 52276404edcSAsim Jamshed 52376404edcSAsim Jamshed if (socket->socktype != MOS_SOCK_STREAM_LISTEN && 52476404edcSAsim Jamshed socket->socktype != MOS_SOCK_STREAM) { 52576404edcSAsim Jamshed TRACE_API("Invalid socket id: %d\n", 
sockid); 52676404edcSAsim Jamshed errno = EBADF; 52776404edcSAsim Jamshed return -1; 52876404edcSAsim Jamshed } 52976404edcSAsim Jamshed 53076404edcSAsim Jamshed if (!argp) { 53176404edcSAsim Jamshed errno = EFAULT; 53276404edcSAsim Jamshed return -1; 53376404edcSAsim Jamshed } 53476404edcSAsim Jamshed 53576404edcSAsim Jamshed if (request == FIONREAD) { 53676404edcSAsim Jamshed tcp_stream *cur_stream; 53776404edcSAsim Jamshed tcprb_t *rbuf; 53876404edcSAsim Jamshed 53976404edcSAsim Jamshed cur_stream = socket->stream; 54076404edcSAsim Jamshed if (!cur_stream) { 54176404edcSAsim Jamshed errno = EBADF; 54276404edcSAsim Jamshed return -1; 54376404edcSAsim Jamshed } 54476404edcSAsim Jamshed 54576404edcSAsim Jamshed rbuf = cur_stream->rcvvar->rcvbuf; 54676404edcSAsim Jamshed *(int *)argp = (rbuf) ? tcprb_cflen(rbuf) : 0; 54776404edcSAsim Jamshed 54876404edcSAsim Jamshed } else if (request == FIONBIO) { 54976404edcSAsim Jamshed /* 55076404edcSAsim Jamshed * sockets can only be set to blocking/non-blocking 55176404edcSAsim Jamshed * modes during initialization 55276404edcSAsim Jamshed */ 55376404edcSAsim Jamshed if ((*(int *)argp)) 55476404edcSAsim Jamshed mtcp->smap[sockid].opts |= MTCP_NONBLOCK; 55576404edcSAsim Jamshed else 55676404edcSAsim Jamshed mtcp->smap[sockid].opts &= ~MTCP_NONBLOCK; 55776404edcSAsim Jamshed } else { 55876404edcSAsim Jamshed errno = EINVAL; 55976404edcSAsim Jamshed return -1; 56076404edcSAsim Jamshed } 56176404edcSAsim Jamshed 56276404edcSAsim Jamshed return 0; 56376404edcSAsim Jamshed } 56476404edcSAsim Jamshed /*----------------------------------------------------------------------------*/ 56576404edcSAsim Jamshed static int 56676404edcSAsim Jamshed mtcp_monitor(mctx_t mctx, socket_map_t sock) 56776404edcSAsim Jamshed { 56876404edcSAsim Jamshed mtcp_manager_t mtcp; 56976404edcSAsim Jamshed struct mon_listener *monitor; 57076404edcSAsim Jamshed int sockid = sock->id; 57176404edcSAsim Jamshed 57276404edcSAsim Jamshed mtcp = GetMTCPManager(mctx); 
57376404edcSAsim Jamshed if (!mtcp) { 57476404edcSAsim Jamshed errno = EACCES; 57576404edcSAsim Jamshed return -1; 57676404edcSAsim Jamshed } 57776404edcSAsim Jamshed 57876404edcSAsim Jamshed if (sockid < 0 || sockid >= g_config.mos->max_concurrency) { 57976404edcSAsim Jamshed TRACE_API("Socket id %d out of range.\n", sockid); 58076404edcSAsim Jamshed errno = EBADF; 58176404edcSAsim Jamshed return -1; 58276404edcSAsim Jamshed } 58376404edcSAsim Jamshed 58476404edcSAsim Jamshed if (mtcp->msmap[sockid].socktype == MOS_SOCK_UNUSED) { 58576404edcSAsim Jamshed TRACE_API("Invalid socket id: %d\n", sockid); 58676404edcSAsim Jamshed errno = EBADF; 58776404edcSAsim Jamshed return -1; 58876404edcSAsim Jamshed } 58976404edcSAsim Jamshed 59076404edcSAsim Jamshed if (!(mtcp->msmap[sockid].socktype == MOS_SOCK_MONITOR_STREAM || 59176404edcSAsim Jamshed mtcp->msmap[sockid].socktype == MOS_SOCK_MONITOR_RAW)) { 59276404edcSAsim Jamshed TRACE_API("Not a monitor socket. id: %d\n", sockid); 59376404edcSAsim Jamshed errno = ENOTSOCK; 59476404edcSAsim Jamshed return -1; 59576404edcSAsim Jamshed } 59676404edcSAsim Jamshed 59776404edcSAsim Jamshed monitor = (struct mon_listener *)calloc(1, sizeof(struct mon_listener)); 59876404edcSAsim Jamshed if (!monitor) { 59976404edcSAsim Jamshed /* errno set from the malloc() */ 60076404edcSAsim Jamshed errno = ENOMEM; 60176404edcSAsim Jamshed return -1; 60276404edcSAsim Jamshed } 60376404edcSAsim Jamshed 60476404edcSAsim Jamshed /* create a monitor-specific event queue */ 60576404edcSAsim Jamshed monitor->eq = CreateEventQueue(g_config.mos->max_concurrency); 60676404edcSAsim Jamshed if (!monitor->eq) { 60776404edcSAsim Jamshed TRACE_API("Can't create event queue (concurrency: %d) for " 60876404edcSAsim Jamshed "monitor read event registrations!\n", 60976404edcSAsim Jamshed g_config.mos->max_concurrency); 61076404edcSAsim Jamshed free(monitor); 61176404edcSAsim Jamshed errno = ENOMEM; 61276404edcSAsim Jamshed return -1; 61376404edcSAsim Jamshed } 
61476404edcSAsim Jamshed 61576404edcSAsim Jamshed /* set monitor-related basic parameters */ 61676404edcSAsim Jamshed #ifndef NEWEV 61776404edcSAsim Jamshed monitor->ude_id = UDE_OFFSET; 61876404edcSAsim Jamshed #endif 61976404edcSAsim Jamshed monitor->socket = sock; 62076404edcSAsim Jamshed monitor->client_buf_mgmt = monitor->server_buf_mgmt = BUFMGMT_FULL; 62176404edcSAsim Jamshed 62276404edcSAsim Jamshed /* perform both sides monitoring by default */ 62376404edcSAsim Jamshed monitor->client_mon = monitor->server_mon = 1; 62476404edcSAsim Jamshed 62576404edcSAsim Jamshed /* add monitor socket to the monitor list */ 62676404edcSAsim Jamshed TAILQ_INSERT_TAIL(&mtcp->monitors, monitor, link); 62776404edcSAsim Jamshed 62876404edcSAsim Jamshed mtcp->msmap[sockid].monitor_listener = monitor; 62976404edcSAsim Jamshed 63076404edcSAsim Jamshed return 0; 63176404edcSAsim Jamshed }
/*----------------------------------------------------------------------------*/
/** Create a new socket in the given mtcp context.
 * @param [in] mctx: mtcp context
 * @param [in] domain: address family; only AF_INET is accepted
 * @param [in] type: SOCK_STREAM (mapped to MOS_SOCK_STREAM),
 *                   MOS_SOCK_MONITOR_STREAM or MOS_SOCK_MONITOR_RAW
 * @param [in] protocol: not examined by this function
 * @return the new socket id on success; -1 on failure with errno set to
 *         EACCES (no usable manager), EAFNOSUPPORT (domain), EINVAL (type)
 *         or ENFILE (socket allocation failed)
 *
 * For the two monitor types the socket is additionally registered through
 * mtcp_monitor() and its per-hook event state is initialized.
 */
int
mtcp_socket(mctx_t mctx, int domain, int type, int protocol)
{
	mtcp_manager_t mtcp;
	socket_map_t socket;

	mtcp = GetMTCPManager(mctx);
	if (!mtcp) {
		errno = EACCES;
		return -1;
	}

	if (domain != AF_INET) {
		errno = EAFNOSUPPORT;
		return -1;
	}

	if (type == (int)SOCK_STREAM) {
		type = MOS_SOCK_STREAM;
	} else if (type == MOS_SOCK_MONITOR_STREAM ||
		   type == MOS_SOCK_MONITOR_RAW) {
		/* do nothing for the time being */
	} else {
		/* Not supported type */
		errno = EINVAL;
		return -1;
	}

	socket = AllocateSocket(mctx, type);
	if (!socket) {
		errno = ENFILE;
		return -1;
	}

	if (type == MOS_SOCK_MONITOR_STREAM ||
	    type == MOS_SOCK_MONITOR_RAW) {
		/*
		 * The manager fetched above is reused here; the original code
		 * looked it up a second time in a shadowing local.
		 *
		 * mtcp_monitor() is what installs socket->monitor_listener, so
		 * its result must be checked before the listener is touched.
		 * NOTE(review): the socket slot is intentionally not handed to
		 * FreeSocket() on this path because its requirements for a
		 * monitor socket without a listener are not visible from here;
		 * errno is presumably set by mtcp_monitor() - confirm both.
		 */
		if (mtcp_monitor(mctx, socket) < 0)
			return -1;
#ifdef NEWEV
		socket->monitor_listener->stree_dontcare = NULL;
		socket->monitor_listener->stree_pre_rcv = NULL;
		socket->monitor_listener->stree_post_snd = NULL;
#else
		InitEvB(mtcp, &socket->monitor_listener->dontcare_evb);
		InitEvB(mtcp, &socket->monitor_listener->pre_tcp_evb);
		InitEvB(mtcp, &socket->monitor_listener->post_tcp_evb);
#endif
	}

	return socket->id;
}
/*----------------------------------------------------------------------------*/
/** Record the local address a stream socket should use.
 * @param [in] mctx: mtcp context
 * @param [in] sockid: id of a MOS_SOCK_STREAM or MOS_SOCK_STREAM_LISTEN socket
 * @param [in] addr: AF_INET address (struct sockaddr_in)
 * @param [in] addrlen: size of *addr; must be >= sizeof(struct sockaddr_in)
 * @return 0 on success; -1 on failure with errno set to EACCES, EBADF,
 *         ENOTSOCK or EINVAL
 *
 * The address is only stored in the socket map entry and the MTCP_ADDR_BIND
 * option is set. The call is rejected when the socket is already bound or
 * when the manager already has a listener (mtcp->listener is non-NULL).
 */
int
mtcp_bind(mctx_t mctx, int sockid,
	  const struct sockaddr *addr, socklen_t addrlen)
{
	mtcp_manager_t mtcp;
	socket_map_t sk;

	mtcp = GetMTCPManager(mctx);
	if (!mtcp) {
		errno = EACCES;
		return -1;
	}

	if (sockid < 0 || sockid >= g_config.mos->max_concurrency) {
		TRACE_API("Socket id %d out of range.\n", sockid);
		errno = EBADF;
		return -1;
	}

	sk = &mtcp->smap[sockid];

	if (sk->socktype == MOS_SOCK_UNUSED) {
		TRACE_API("Invalid socket id: %d\n", sockid);
		errno = EBADF;
		return -1;
	}

	if (sk->socktype != MOS_SOCK_STREAM &&
	    sk->socktype != MOS_SOCK_STREAM_LISTEN) {
		TRACE_API("Not a stream socket id: %d\n", sockid);
		errno = ENOTSOCK;
		return -1;
	}

	if (!addr) {
		TRACE_API("Socket %d: empty address!\n", sockid);
		errno = EINVAL;
		return -1;
	}

	if (sk->opts & MTCP_ADDR_BIND) {
		TRACE_API("Socket %d: adress already bind for this socket.\n", sockid);
		errno = EINVAL;
		return -1;
	}

	/* we only allow bind() for AF_INET address */
	if (addr->sa_family != AF_INET || addrlen < sizeof(struct sockaddr_in)) {
		TRACE_API("Socket %d: invalid argument!\n", sockid);
		errno = EINVAL;
		return -1;
	}

	/* a listener is already registered with this manager */
	if (mtcp->listener) {
		TRACE_API("Address already bound!\n");
		errno = EINVAL;
		return -1;
	}

	sk->saddr = *(const struct sockaddr_in *)addr;
	sk->opts |= MTCP_ADDR_BIND;

	return 0;
}
/*----------------------------------------------------------------------------*/
/** Turn a stream socket into a listening socket.
 * @param [in] mctx: mtcp context
 * @param [in] sockid: id of a MOS_SOCK_STREAM or MOS_SOCK_STREAM_LISTEN socket
 * @param [in] backlog: accept queue capacity, 1..max_concurrency
 * @return 0 on success; -1 on failure with errno set to EACCES, EBADF,
 *         ENOTSOCK, EINVAL, ENOMEM or the error number returned by the
 *         failing pthread initializer
 *
 * The socket type is switched to MOS_SOCK_STREAM_LISTEN only once the
 * listener has been fully constructed, so a failed call leaves the socket
 * exactly as it was (no LISTEN socket without a listener).
 */
int
mtcp_listen(mctx_t mctx, int sockid, int backlog)
{
	mtcp_manager_t mtcp;
	struct tcp_listener *listener;
	int rc;

	mtcp = GetMTCPManager(mctx);
	if (!mtcp) {
		errno = EACCES;
		return -1;
	}

	if (sockid < 0 || sockid >= g_config.mos->max_concurrency) {
		TRACE_API("Socket id %d out of range.\n", sockid);
		errno = EBADF;
		return -1;
	}

	if (mtcp->smap[sockid].socktype == MOS_SOCK_UNUSED) {
		TRACE_API("Invalid socket id: %d\n", sockid);
		errno = EBADF;
		return -1;
	}

	if (mtcp->smap[sockid].socktype != MOS_SOCK_STREAM &&
	    mtcp->smap[sockid].socktype != MOS_SOCK_STREAM_LISTEN) {
		TRACE_API("Not a listening socket. id: %d\n", sockid);
		errno = ENOTSOCK;
		return -1;
	}

	if (backlog <= 0 || backlog > g_config.mos->max_concurrency) {
		errno = EINVAL;
		return -1;
	}

	listener = (struct tcp_listener *)calloc(1, sizeof(struct tcp_listener));
	if (!listener) {
		errno = ENOMEM;
		return -1;
	}

	listener->sockid = sockid;
	listener->backlog = backlog;
	listener->socket = &mtcp->smap[sockid];

	/* pthread_*_init() report failure through their return value,
	 * not through errno, so propagate it explicitly */
	rc = pthread_cond_init(&listener->accept_cond, NULL);
	if (rc) {
		errno = rc;
		perror("pthread_cond_init of ctx->accept_cond\n");
		free(listener);
		errno = rc;
		return -1;
	}

	rc = pthread_mutex_init(&listener->accept_lock, NULL);
	if (rc) {
		errno = rc;
		perror("pthread_mutex_init of ctx->accept_lock\n");
		pthread_cond_destroy(&listener->accept_cond);
		free(listener);
		errno = rc;
		return -1;
	}

	listener->acceptq = CreateStreamQueue(backlog);
	if (!listener->acceptq) {
		pthread_mutex_destroy(&listener->accept_lock);
		pthread_cond_destroy(&listener->accept_cond);
		free(listener);
		errno = ENOMEM;
		return -1;
	}

	/* everything is in place: publish the listener */
	mtcp->smap[sockid].socktype = MOS_SOCK_STREAM_LISTEN;
	mtcp->smap[sockid].listener = listener;
	mtcp->listener = listener;

	return 0;
}
/*----------------------------------------------------------------------------*/
/** Take one established stream off a listening socket's accept queue.
 * @param [in] mctx: mtcp context
 * @param [in] sockid: id of a MOS_SOCK_STREAM_LISTEN socket
 * @param [out] addr: optional; receives the peer address (struct sockaddr_in)
 * @param [in,out] addrlen: optional; on input the size of *addr, on output
 *                 sizeof(struct sockaddr_in)
 * @return id of the socket attached to the accepted stream; -1 on failure
 *         with errno set to EACCES, EBADF, EINVAL, EAGAIN (non-blocking
 *         socket with an empty queue), EINTR (context is finishing) or
 *         ENFILE (no socket could be allocated for the stream)
 *
 * At most *addrlen bytes are written to addr; the caller's buffer is never
 * overrun when it is smaller than struct sockaddr_in.
 */
int
mtcp_accept(mctx_t mctx, int sockid, struct sockaddr *addr, socklen_t *addrlen)
{
	mtcp_manager_t mtcp;
	struct tcp_listener *listener;
	socket_map_t socket;
	tcp_stream *accepted = NULL;

	mtcp = GetMTCPManager(mctx);
	if (!mtcp) {
		errno = EACCES;
		return -1;
	}

	if (sockid < 0 || sockid >= g_config.mos->max_concurrency) {
		TRACE_API("Socket id %d out of range.\n", sockid);
		errno = EBADF;
		return -1;
	}

	/* requires listening socket */
	if (mtcp->smap[sockid].socktype != MOS_SOCK_STREAM_LISTEN) {
		errno = EINVAL;
		return -1;
	}

	listener = mtcp->smap[sockid].listener;
	if (!listener) {
		/* LISTEN type without a listener object: nothing to accept from */
		errno = EINVAL;
		return -1;
	}

	/* dequeue from the acceptq without lock first */
	/* if nothing there, acquire lock and cond_wait */
	accepted = StreamDequeue(listener->acceptq);
	if (!accepted) {
		if (listener->socket->opts & MTCP_NONBLOCK) {
			errno = EAGAIN;
			return -1;
		}

		pthread_mutex_lock(&listener->accept_lock);
		while ((accepted = StreamDequeue(listener->acceptq)) == NULL) {
			pthread_cond_wait(&listener->accept_cond, &listener->accept_lock);

			if (mtcp->ctx->done || mtcp->ctx->exit) {
				pthread_mutex_unlock(&listener->accept_lock);
				errno = EINTR;
				return -1;
			}
		}
		pthread_mutex_unlock(&listener->accept_lock);
	}

	if (!accepted) {
		/* bail out instead of dereferencing a NULL stream below */
		TRACE_ERROR("[NEVER HAPPEN] Empty accept queue!\n");
		errno = EAGAIN;
		return -1;
	}

	if (!accepted->socket) {
		socket = AllocateSocket(mctx, MOS_SOCK_STREAM);
		if (!socket) {
			TRACE_ERROR("Failed to create new socket!\n");
			/* TODO: destroy the stream */
			errno = ENFILE;
			return -1;
		}
		socket->stream = accepted;
		accepted->socket = socket;

		/* set socket addr parameters */
		socket->saddr.sin_family = AF_INET;
		socket->saddr.sin_port = accepted->dport;
		socket->saddr.sin_addr.s_addr = accepted->daddr;

		/* if monitor is enabled, complete the socket assignment */
		if (socket->stream->pair_stream != NULL)
			socket->stream->pair_stream->socket = socket;
	}

	/* listener is not edge-triggered and more streams are pending:
	 * queue another MOS_EPOLLIN event for it */
	if (!(listener->socket->epoll & MOS_EPOLLET) &&
	    !StreamQueueIsEmpty(listener->acceptq))
		AddEpollEvent(mtcp->ep,
			      USR_SHADOW_EVENT_QUEUE,
			      listener->socket, MOS_EPOLLIN);

	TRACE_API("Stream %d accepted.\n", accepted->id);

	if (addr && addrlen) {
		struct sockaddr_in peer;
		socklen_t copy_len;

		memset(&peer, 0, sizeof(peer));
		peer.sin_family = AF_INET;
		peer.sin_port = accepted->dport;
		peer.sin_addr.s_addr = accepted->daddr;

		/* truncate to the caller's buffer, report the full size */
		copy_len = MIN(*addrlen, (socklen_t)sizeof(peer));
		memcpy(addr, &peer, copy_len);
		*addrlen = sizeof(struct sockaddr_in);
	}

	return accepted->socket->id;
}
/*----------------------------------------------------------------------------*/
/** Create the per-core source address pool and attach it to the manager.
 * @param [in] mctx: mtcp context
 * @param [in] saddr_base: first source address, or INADDR_ANY to use the
 *                 address of the output interface chosen for daddr
 * @param [in] num_addr: number of source addresses
 * @param [in] daddr: destination address
 * @param [in] dport: destination port
 * @return 0 on success; -1 on failure with errno set to EACCES, EINVAL
 *         (no output interface for daddr) or ENOMEM (pool creation failed)
 */
int
mtcp_init_rss(mctx_t mctx, in_addr_t saddr_base, int num_addr,
	      in_addr_t daddr, in_addr_t dport)
{
	mtcp_manager_t mtcp;
	addr_pool_t pool;

	mtcp = GetMTCPManager(mctx);
	if (!mtcp) {
		errno = EACCES;
		return -1;
	}

	if (saddr_base == INADDR_ANY) {
		/* for the INADDR_ANY, find the output interface for the destination
		   and set the saddr_base as the ip address of the output interface */
		int nif_out = GetOutputInterface(daddr);

		if (nif_out < 0) {
			TRACE_DBG("Could not determine nif idx!\n");
			errno = EINVAL;
			return -1;
		}
		saddr_base = g_config.mos->netdev_table->ent[nif_out]->ip_addr;
	}

	pool = CreateAddressPoolPerCore(mctx->cpu, num_cpus,
					saddr_base, num_addr, daddr, dport);
	if (!pool) {
		errno = ENOMEM;
		return -1;
	}

	mtcp->ap = pool;

	return 0;
}
/*----------------------------------------------------------------------------*/
/** Evaluate a BPF program against a synthetic Ethernet/IPv4/TCP header
 *  built from the given 4-tuple.
 * @param [in] fcode: compiled filter program
 * @param [in] saddr: source address, stored into the IP header as given
 * @param [in] sport: source port, stored into the TCP header as given
 * @param [in] daddr: destination address, stored as given
 * @param [in] dport: destination port, stored as given
 * @return the value produced by EVAL_BPFFILTER() for the synthetic header
 *
 * The header buffer is zeroed first so that every field that is not set
 * explicitly (MAC addresses, fragment offset, TTL, TCP flags, ...) has a
 * defined value when the filter inspects it.
 */
int
eval_bpf_5tuple(struct sfbpf_program fcode,
		in_addr_t saddr, in_port_t sport,
		in_addr_t daddr, in_port_t dport)
{
	uint8_t buf[TOTAL_TCP_HEADER_LEN];
	struct ethhdr *ethh;
	struct iphdr *iph;
	struct tcphdr *tcph;

	memset(buf, 0, sizeof(buf));

	ethh = (struct ethhdr *)buf;
	ethh->h_proto = htons(ETH_P_IP);

	iph = (struct iphdr *)(ethh + 1);
	iph->ihl = IP_HEADER_LEN >> 2;
	iph->version = 4;
	iph->tos = 0;
	iph->tot_len = htons(IP_HEADER_LEN + TCP_HEADER_LEN);
	iph->id = htons(0);
	iph->protocol = IPPROTO_TCP;
	iph->saddr = saddr;
	iph->daddr = daddr;
	iph->check = 0;

	tcph = (struct tcphdr *)(iph + 1);
	tcph->source = sport;
	tcph->dest = dport;

	/* buf is the start of the Ethernet header that precedes iph */
	return EVAL_BPFFILTER(fcode, buf, TOTAL_TCP_HEADER_LEN);
}
/*----------------------------------------------------------------------------*/
/** Start an active open on a stream socket.
 * @param [in] mctx: mtcp context
 * @param [in] sockid: id of a MOS_SOCK_STREAM socket without a stream
 * @param [in] addr: AF_INET destination (struct sockaddr_in)
 * @param [in] addrlen: size of *addr; must be >= sizeof(struct sockaddr_in)
 * @return 0 once the stream reaches TCP_ST_ESTABLISHED (blocking sockets);
 *         -1 on failure with errno set. Non-blocking sockets always return
 *         -1 with errno EINPROGRESS after the stream has been queued.
 *
 * errno values set here: EACCES, EBADF, ENOTSOCK, EFAULT, EAFNOSUPPORT,
 * EISCONN/EALREADY (socket already has a stream), EINVAL (bound address does
 * not map to this core, or no output interface), EAGAIN (no source address
 * available, or connect queue full), ENOMEM (stream creation failed),
 * ENOSYS (stream moved past ESTABLISHED while waiting).
 */
int
mtcp_connect(mctx_t mctx, int sockid,
	     const struct sockaddr *addr, socklen_t addrlen)
{
	mtcp_manager_t mtcp;
	socket_map_t socket;
	tcp_stream *cur_stream;
	struct sockaddr_in *addr_in;
	in_addr_t dip;
	in_port_t dport;
	int is_dyn_bound = FALSE;
	int ret, nif;
	int cnt_match = 0;
	struct mon_listener *walk;
	struct sfbpf_program fcode;

	cur_stream = NULL;
	mtcp = GetMTCPManager(mctx);
	if (!mtcp) {
		errno = EACCES;
		return -1;
	}

	if (sockid < 0 || sockid >= g_config.mos->max_concurrency) {
		TRACE_API("Socket id %d out of range.\n", sockid);
		errno = EBADF;
		return -1;
	}

	if (mtcp->smap[sockid].socktype == MOS_SOCK_UNUSED) {
		TRACE_API("Invalid socket id: %d\n", sockid);
		errno = EBADF;
		return -1;
	}

	if (mtcp->smap[sockid].socktype != MOS_SOCK_STREAM) {
		TRACE_API("Not an end socket. id: %d\n", sockid);
		errno = ENOTSOCK;
		return -1;
	}

	if (!addr) {
		TRACE_API("Socket %d: empty address!\n", sockid);
		errno = EFAULT;
		return -1;
	}

	/* we only allow connect() to an AF_INET address */
	if (addr->sa_family != AF_INET || addrlen < sizeof(struct sockaddr_in)) {
		TRACE_API("Socket %d: invalid argument!\n", sockid);
		errno = EAFNOSUPPORT;
		return -1;
	}

	socket = &mtcp->smap[sockid];
	if (socket->stream) {
		TRACE_API("Socket %d: stream already exist!\n", sockid);
		if (socket->stream->state >= TCP_ST_ESTABLISHED) {
			errno = EISCONN;
		} else {
			errno = EALREADY;
		}
		return -1;
	}

	addr_in = (struct sockaddr_in *)addr;
	dip = addr_in->sin_addr.s_addr;
	dport = addr_in->sin_port;

	/* address binding */
	if (socket->opts & MTCP_ADDR_BIND &&
	    socket->saddr.sin_port != INPORT_ANY &&
	    socket->saddr.sin_addr.s_addr != INADDR_ANY) {
		/* fully specified local address: the resulting flow must be
		 * steered by RSS to the core that owns this context */
		int rss_core;

		rss_core = GetRSSCPUCore(socket->saddr.sin_addr.s_addr, dip,
					 socket->saddr.sin_port, dport, num_queues);

		if (rss_core != mctx->cpu) {
			errno = EINVAL;
			return -1;
		}
	} else {
		/* pick a source address from the context's own pool if it
		 * has one, otherwise from the output interface's pool */
		if (mtcp->ap) {
			ret = FetchAddress(mtcp->ap,
					   mctx->cpu, num_queues, addr_in, &socket->saddr);
		} else {
			nif = GetOutputInterface(dip);
			if (nif < 0) {
				errno = EINVAL;
				return -1;
			}
			ret = FetchAddress(ap[nif],
					   mctx->cpu, num_queues, addr_in, &socket->saddr);
		}
		if (ret < 0) {
			errno = EAGAIN;
			return -1;
		}
		socket->opts |= MTCP_ADDR_BIND;
		is_dyn_bound = TRUE;
	}

	/* count the monitors interested in this flow: a monitor matches
	 * unless it has a SYN filter set and that filter evaluates to 0 */
	cnt_match = 0;
	if (mtcp->num_msp > 0) {
		TAILQ_FOREACH(walk, &mtcp->monitors, link) {
			fcode = walk->stream_syn_fcode;
			if (!(ISSET_BPFFILTER(fcode) &&
			      eval_bpf_5tuple(fcode, socket->saddr.sin_addr.s_addr,
					      socket->saddr.sin_port,
					      dip, dport) == 0)) {
				walk->is_stream_syn_filter_hit = 1; // set the 'filter hit' flag to 1
				cnt_match++;
			}
		}
	}

	if (mtcp->num_msp > 0 && cnt_match > 0) {
		/* 150820 dhkim: XXX: embedded mode is not verified */
#if 1
		cur_stream = CreateClientTCPStream(mtcp, socket,
						   STREAM_TYPE(MOS_SOCK_STREAM) |
						   STREAM_TYPE(MOS_SOCK_MONITOR_STREAM_ACTIVE),
						   socket->saddr.sin_addr.s_addr,
						   socket->saddr.sin_port, dip, dport, NULL);
#else
		cur_stream = CreateDualTCPStream(mtcp, socket,
						 STREAM_TYPE(MOS_SOCK_STREAM) |
						 STREAM_TYPE(MOS_SOCK_MONITOR_STREAM_ACTIVE),
						 socket->saddr.sin_addr.s_addr,
						 socket->saddr.sin_port, dip, dport, NULL);
#endif
	}
	else
		cur_stream = CreateTCPStream(mtcp, socket, STREAM_TYPE(MOS_SOCK_STREAM),
					     socket->saddr.sin_addr.s_addr,
					     socket->saddr.sin_port, dip, dport, NULL);
	if (!cur_stream) {
		TRACE_ERROR("Socket %d: failed to create tcp_stream!\n", sockid);
		errno = ENOMEM;
		return -1;
	}

	if (is_dyn_bound)
		cur_stream->is_bound_addr = TRUE;
	cur_stream->sndvar->cwnd = 1;
	cur_stream->sndvar->ssthresh = cur_stream->sndvar->mss * 10;
	cur_stream->side = MOS_SIDE_CLI;
	/* if monitor is enabled, update the pair stream side as well */
	if (cur_stream->pair_stream) {
		struct socket_map *msock;

		cur_stream->pair_stream->side = MOS_SIDE_SVR;
		/*
		 * if buffer management is off, then disable
		 * monitoring tcp ring of server...
		 * if there is even a single monitor asking for
		 * buffer management, enable it (that's why the
		 * need for the loop)
		 */
		cur_stream->pair_stream->buffer_mgmt = BUFMGMT_OFF;
		SOCKQ_FOREACH_START(msock, &cur_stream->msocks) {
			uint8_t bm = msock->monitor_stream->monitor_listener->server_buf_mgmt;
			if (bm > cur_stream->pair_stream->buffer_mgmt) {
				cur_stream->pair_stream->buffer_mgmt = bm;
				break;
			}
		} SOCKQ_FOREACH_END;
	}

	cur_stream->state = TCP_ST_SYN_SENT;
	cur_stream->cb_events |= MOS_ON_TCP_STATE_CHANGE;

	TRACE_STATE("Stream %d: TCP_ST_SYN_SENT\n", cur_stream->id);

	/* hand the stream over through the connect queue */
	SQ_LOCK(&mtcp->ctx->connect_lock);
	ret = StreamEnqueue(mtcp->connectq, cur_stream);
	SQ_UNLOCK(&mtcp->ctx->connect_lock);
	mtcp->wakeup_flag = TRUE;
	if (ret < 0) {
		TRACE_ERROR("Socket %d: failed to enqueue to conenct queue!\n", sockid);
		SQ_LOCK(&mtcp->ctx->destroyq_lock);
		StreamEnqueue(mtcp->destroyq, cur_stream);
		SQ_UNLOCK(&mtcp->ctx->destroyq_lock);
		errno = EAGAIN;
		return -1;
	}

	/* if nonblocking socket, return EINPROGRESS */
	if (socket->opts & MTCP_NONBLOCK) {
		errno = EINPROGRESS;
		return -1;

	} else {
		/* blocking socket: poll the stream state once per millisecond */
		while (1) {
			if (!cur_stream) {
				TRACE_ERROR("STREAM DESTROYED\n");
				errno = ETIMEDOUT;
				return -1;
			}
			if (cur_stream->state > TCP_ST_ESTABLISHED) {
				TRACE_ERROR("Socket %d: weird state %s\n",
					    sockid, TCPStateToString(cur_stream));
				// TODO: how to handle this?
				errno = ENOSYS;
				return -1;
			}

			if (cur_stream->state == TCP_ST_ESTABLISHED) {
				break;
			}
			usleep(1000);
		}
	}

	return 0;
}
/*----------------------------------------------------------------------------*/
/** Close the stream attached to a MOS_SOCK_STREAM socket.
 * @param [in] mctx: mtcp context
 * @param [in] sockid: socket id; the caller is expected to have validated it
 * @return 0 when the stream was already closed or was queued for
 *         destroy/close; -1 on failure (errno: EACCES, ENOTCONN, EBADF,
 *         EAGAIN). A stream still in TCP_ST_SYN_SENT is queued for
 *         destruction and -1 is returned without errno being set here.
 */
static inline int
CloseStreamSocket(mctx_t mctx, int sockid)
{
	mtcp_manager_t mtcp;
	tcp_stream *cur_stream;
	int ret;

	mtcp = GetMTCPManager(mctx);
	if (!mtcp) {
		errno = EACCES;
		return -1;
	}

	cur_stream = mtcp->smap[sockid].stream;
	if (!cur_stream) {
		TRACE_API("Socket %d: stream does not exist.\n", sockid);
		errno = ENOTCONN;
		return -1;
	}

	if (cur_stream->closed) {
		TRACE_API("Socket %d (Stream %u): already closed stream\n",
			  sockid, cur_stream->id);
		return 0;
	}
	cur_stream->closed = TRUE;

	TRACE_API("Stream %d: closing the stream.\n", cur_stream->id);

	/* 141029 dhkim: Check this! */
	cur_stream->socket = NULL;

	if (cur_stream->state == TCP_ST_CLOSED_RSVD) {
		TRACE_API("Stream %d at TCP_ST_CLOSED_RSVD. destroying the stream.\n",
			  cur_stream->id);
		SQ_LOCK(&mtcp->ctx->destroyq_lock);
		StreamEnqueue(mtcp->destroyq, cur_stream);
		mtcp->wakeup_flag = TRUE;
		SQ_UNLOCK(&mtcp->ctx->destroyq_lock);
		return 0;

	} else if (cur_stream->state == TCP_ST_SYN_SENT) {
#if 1
		SQ_LOCK(&mtcp->ctx->destroyq_lock);
		StreamEnqueue(mtcp->destroyq, cur_stream);
		SQ_UNLOCK(&mtcp->ctx->destroyq_lock);
		mtcp->wakeup_flag = TRUE;
#endif
		return -1;

	} else if (cur_stream->state != TCP_ST_ESTABLISHED &&
		   cur_stream->state != TCP_ST_CLOSE_WAIT) {
		TRACE_API("Stream %d at state %s\n",
			  cur_stream->id, TCPStateToString(cur_stream));
		errno = EBADF;
		return -1;
	}

	/* ESTABLISHED or CLOSE_WAIT: queue the stream on the close queue */
	SQ_LOCK(&mtcp->ctx->close_lock);
	cur_stream->sndvar->on_closeq = TRUE;
	ret = StreamEnqueue(mtcp->closeq, cur_stream);
	mtcp->wakeup_flag = TRUE;
	SQ_UNLOCK(&mtcp->ctx->close_lock);

	if (ret < 0) {
		TRACE_ERROR("(NEVER HAPPEN) Failed to enqueue the stream to close.\n");
		errno = EAGAIN;
		return -1;
	}

	return 0;
}
/*----------------------------------------------------------------------------*/
/** Tear down the listener attached to a MOS_SOCK_STREAM_LISTEN socket.
 * @param [in] mctx: mtcp context
 * @param [in] sockid: socket id; the caller is expected to have validated it
 * @return 0 on success; -1 with errno EACCES (no manager) or EINVAL
 *         (socket has no listener)
 *
 * The manager-wide mtcp->listener pointer is cleared when it refers to the
 * listener being freed, so it never dangles and mtcp_bind() works again.
 * NOTE(review): the accept queue is destroyed and the listener freed while
 * a thread may still be blocked in mtcp_accept(); only a single waiter is
 * signalled. That ordering is kept as in the original.
 */
static inline int
CloseListeningSocket(mctx_t mctx, int sockid)
{
	mtcp_manager_t mtcp;
	struct tcp_listener *listener;

	mtcp = GetMTCPManager(mctx);
	if (!mtcp) {
		errno = EACCES;
		return -1;
	}

	listener = mtcp->smap[sockid].listener;
	if (!listener) {
		errno = EINVAL;
		return -1;
	}

	if (listener->acceptq) {
		DestroyStreamQueue(listener->acceptq);
		listener->acceptq = NULL;
	}

	pthread_mutex_lock(&listener->accept_lock);
	pthread_cond_signal(&listener->accept_cond);
	pthread_mutex_unlock(&listener->accept_lock);

	pthread_cond_destroy(&listener->accept_cond);
	pthread_mutex_destroy(&listener->accept_lock);

	/* drop the manager's reference before the memory goes away */
	if (mtcp->listener == listener)
		mtcp->listener = NULL;

	free(listener);
	mtcp->smap[sockid].listener = NULL;

	return 0;
}
/*----------------------------------------------------------------------------*/
/** Close a socket of any closable type and release its socket map entry.
 * @param [in] mctx: mtcp context
 * @param [in] sockid: socket id
 * @return result of the type-specific close routine; -1 with errno EACCES
 *         or EBADF when validation fails, or EINVAL for a socket type that
 *         has no close routine here
 *
 * Once validation has passed, FreeSocket() is called regardless of the
 * type-specific result.
 */
int
mtcp_close(mctx_t mctx, int sockid)
{
	mtcp_manager_t mtcp;
	int ret;

	mtcp = GetMTCPManager(mctx);
	if (!mtcp) {
		errno = EACCES;
		return -1;
	}

	if (sockid < 0 || sockid >= g_config.mos->max_concurrency) {
		TRACE_API("Socket id %d out of range.\n", sockid);
		errno = EBADF;
		return -1;
	}

	if (mtcp->smap[sockid].socktype == MOS_SOCK_UNUSED) {
		TRACE_API("Invalid socket id: %d\n", sockid);
		errno = EBADF;
		return -1;
	}

	TRACE_API("Socket %d: mtcp_close called.\n", sockid);

	switch (mtcp->smap[sockid].socktype) {
	case MOS_SOCK_STREAM:
		ret = CloseStreamSocket(mctx, sockid);
		break;

	case MOS_SOCK_STREAM_LISTEN:
		ret = CloseListeningSocket(mctx, sockid);
		break;

	case MOS_SOCK_EPOLL:
		ret = CloseEpollSocket(mctx, sockid);
		break;

	case MOS_SOCK_PIPE:
		ret = PipeClose(mctx, sockid);
		break;

	default:
		errno = EINVAL;
		ret = -1;
		break;
	}

	FreeSocket(mctx, sockid, mtcp->smap[sockid].socktype);

	return ret;
}
/*----------------------------------------------------------------------------*/
/** Abort the connection of a stream socket and release the socket.
 * @param [in] mctx: mtcp context
 * @param [in] sockid: id of a MOS_SOCK_STREAM socket that has a stream
 * @return 0 when the stream was queued for destruction or reset; ERROR when
 *         the stream is already in TCP_ST_CLOSED_RSVD; -1 on failure with
 *         errno set to EACCES, EBADF, ENOTSOCK, ENOTCONN, ECONNRESET
 *         (stream already on the reset queue) or EAGAIN
 *
 * The socket map entry is freed before the stream state is examined, so the
 * socket is gone even when a later step reports failure.
 */
int
mtcp_abort(mctx_t mctx, int sockid)
{
	mtcp_manager_t mtcp;
	tcp_stream *cur_stream;
	int ret;

	mtcp = GetMTCPManager(mctx);
	if (!mtcp) {
		errno = EACCES;
		return -1;
	}

	if (sockid < 0 || sockid >= g_config.mos->max_concurrency) {
		TRACE_API("Socket id %d out of range.\n", sockid);
		errno = EBADF;
		return -1;
	}

	if (mtcp->smap[sockid].socktype == MOS_SOCK_UNUSED) {
		TRACE_API("Invalid socket id: %d\n", sockid);
		errno = EBADF;
		return -1;
	}

	if (mtcp->smap[sockid].socktype != MOS_SOCK_STREAM) {
		TRACE_API("Not an end socket. id: %d\n", sockid);
		errno = ENOTSOCK;
		return -1;
	}

	cur_stream = mtcp->smap[sockid].stream;
	if (!cur_stream) {
		TRACE_API("Stream %d: does not exist.\n", sockid);
		errno = ENOTCONN;
		return -1;
	}

	TRACE_API("Socket %d: mtcp_abort()\n", sockid);

	FreeSocket(mctx, sockid, mtcp->smap[sockid].socktype);
	cur_stream->socket = NULL;

	if (cur_stream->state == TCP_ST_CLOSED_RSVD) {
		TRACE_API("Stream %d: connection already reset.\n", sockid);
		return ERROR;

	} else if (cur_stream->state == TCP_ST_SYN_SENT) {
		/* TODO: this should notify event failure to all
		   previous read() or write() calls */
		cur_stream->state = TCP_ST_CLOSED_RSVD;
		cur_stream->close_reason = TCP_ACTIVE_CLOSE;
		cur_stream->cb_events |= MOS_ON_TCP_STATE_CHANGE;
		SQ_LOCK(&mtcp->ctx->destroyq_lock);
		StreamEnqueue(mtcp->destroyq, cur_stream);
		SQ_UNLOCK(&mtcp->ctx->destroyq_lock);
		mtcp->wakeup_flag = TRUE;
		return 0;

	} else if (cur_stream->state == TCP_ST_CLOSING ||
		   cur_stream->state == TCP_ST_LAST_ACK ||
		   cur_stream->state == TCP_ST_TIME_WAIT) {
		cur_stream->state = TCP_ST_CLOSED_RSVD;
		cur_stream->close_reason = TCP_ACTIVE_CLOSE;
		cur_stream->cb_events |= MOS_ON_TCP_STATE_CHANGE;
		SQ_LOCK(&mtcp->ctx->destroyq_lock);
		StreamEnqueue(mtcp->destroyq, cur_stream);
		SQ_UNLOCK(&mtcp->ctx->destroyq_lock);
		mtcp->wakeup_flag = TRUE;
		return 0;
	}

	/* the stream structure will be destroyed after sending RST */
	if (cur_stream->sndvar->on_resetq) {
		TRACE_ERROR("Stream %d: calling mtcp_abort() "
			    "when in reset queue.\n", sockid);
		errno = ECONNRESET;
		return -1;
	}
	SQ_LOCK(&mtcp->ctx->reset_lock);
	cur_stream->sndvar->on_resetq = TRUE;
	ret = StreamEnqueue(mtcp->resetq, cur_stream);
	SQ_UNLOCK(&mtcp->ctx->reset_lock);
	mtcp->wakeup_flag = TRUE;

	if (ret < 0) {
		TRACE_ERROR("(NEVER HAPPEN) Failed to enqueue the stream to close.\n");
		errno = EAGAIN;
		return -1;
	}

	return 0;
}
/*----------------------------------------------------------------------------*/
146576404edcSAsim Jamshed static inline int 1466df3fae06SAsim Jamshed PeekForUser(mtcp_manager_t mtcp, tcp_stream *cur_stream, char *buf, int len) 1467df3fae06SAsim Jamshed { 1468df3fae06SAsim Jamshed struct tcp_recv_vars *rcvvar = cur_stream->rcvvar; 
1469df3fae06SAsim Jamshed int copylen; 1470df3fae06SAsim Jamshed tcprb_t *rb = rcvvar->rcvbuf; 1471df3fae06SAsim Jamshed 1472df3fae06SAsim Jamshed if ((copylen = tcprb_ppeek(rb, (uint8_t *)buf, len, rb->pile)) <= 0) { 1473df3fae06SAsim Jamshed errno = EAGAIN; 1474df3fae06SAsim Jamshed return -1; 1475df3fae06SAsim Jamshed } 1476df3fae06SAsim Jamshed 1477df3fae06SAsim Jamshed return copylen; 1478df3fae06SAsim Jamshed } 1479df3fae06SAsim Jamshed /*----------------------------------------------------------------------------*/ 1480df3fae06SAsim Jamshed static inline int 148176404edcSAsim Jamshed CopyToUser(mtcp_manager_t mtcp, tcp_stream *cur_stream, char *buf, int len) 148276404edcSAsim Jamshed { 148376404edcSAsim Jamshed struct tcp_recv_vars *rcvvar = cur_stream->rcvvar; 148476404edcSAsim Jamshed int copylen; 148576404edcSAsim Jamshed tcprb_t *rb = rcvvar->rcvbuf; 148676404edcSAsim Jamshed if ((copylen = tcprb_ppeek(rb, (uint8_t *)buf, len, rb->pile)) <= 0) { 148776404edcSAsim Jamshed errno = EAGAIN; 148876404edcSAsim Jamshed return -1; 148976404edcSAsim Jamshed } 149076404edcSAsim Jamshed tcprb_setpile(rb, rb->pile + copylen); 149176404edcSAsim Jamshed 149276404edcSAsim Jamshed rcvvar->rcv_wnd = rb->len - tcprb_cflen(rb); 149376404edcSAsim Jamshed //printf("rcv_wnd: %d\n", rcvvar->rcv_wnd); 149476404edcSAsim Jamshed 149576404edcSAsim Jamshed /* Advertise newly freed receive buffer */ 149676404edcSAsim Jamshed if (cur_stream->need_wnd_adv) { 149776404edcSAsim Jamshed if (rcvvar->rcv_wnd > cur_stream->sndvar->eff_mss) { 149876404edcSAsim Jamshed if (!cur_stream->sndvar->on_ackq) { 149976404edcSAsim Jamshed SQ_LOCK(&mtcp->ctx->ackq_lock); 150076404edcSAsim Jamshed cur_stream->sndvar->on_ackq = TRUE; 150176404edcSAsim Jamshed StreamEnqueue(mtcp->ackq, cur_stream); /* this always success */ 150276404edcSAsim Jamshed SQ_UNLOCK(&mtcp->ctx->ackq_lock); 150376404edcSAsim Jamshed cur_stream->need_wnd_adv = FALSE; 150476404edcSAsim Jamshed mtcp->wakeup_flag = TRUE; 
150576404edcSAsim Jamshed } 150676404edcSAsim Jamshed } 150776404edcSAsim Jamshed } 150876404edcSAsim Jamshed 150976404edcSAsim Jamshed return copylen; 151076404edcSAsim Jamshed } 151176404edcSAsim Jamshed /*----------------------------------------------------------------------------*/ 151276404edcSAsim Jamshed ssize_t 1513df3fae06SAsim Jamshed mtcp_recv(mctx_t mctx, int sockid, char *buf, size_t len, int flags) 151476404edcSAsim Jamshed { 151576404edcSAsim Jamshed mtcp_manager_t mtcp; 151676404edcSAsim Jamshed socket_map_t socket; 151776404edcSAsim Jamshed tcp_stream *cur_stream; 151876404edcSAsim Jamshed struct tcp_recv_vars *rcvvar; 151976404edcSAsim Jamshed int event_remaining, merged_len; 152076404edcSAsim Jamshed int ret; 152176404edcSAsim Jamshed 152276404edcSAsim Jamshed mtcp = GetMTCPManager(mctx); 152376404edcSAsim Jamshed if (!mtcp) { 152476404edcSAsim Jamshed errno = EACCES; 152576404edcSAsim Jamshed return -1; 152676404edcSAsim Jamshed } 152776404edcSAsim Jamshed 152876404edcSAsim Jamshed if (sockid < 0 || sockid >= g_config.mos->max_concurrency) { 152976404edcSAsim Jamshed TRACE_API("Socket id %d out of range.\n", sockid); 153076404edcSAsim Jamshed errno = EBADF; 153176404edcSAsim Jamshed return -1; 153276404edcSAsim Jamshed } 153376404edcSAsim Jamshed 153476404edcSAsim Jamshed socket = &mtcp->smap[sockid]; 153576404edcSAsim Jamshed if (socket->socktype == MOS_SOCK_UNUSED) { 153676404edcSAsim Jamshed TRACE_API("Invalid socket id: %d\n", sockid); 153776404edcSAsim Jamshed errno = EBADF; 153876404edcSAsim Jamshed return -1; 153976404edcSAsim Jamshed } 154076404edcSAsim Jamshed 154176404edcSAsim Jamshed if (socket->socktype == MOS_SOCK_PIPE) { 154276404edcSAsim Jamshed return PipeRead(mctx, sockid, buf, len); 154376404edcSAsim Jamshed } 154476404edcSAsim Jamshed 154576404edcSAsim Jamshed if (socket->socktype != MOS_SOCK_STREAM) { 154676404edcSAsim Jamshed TRACE_API("Not an end socket. 
id: %d\n", sockid); 154776404edcSAsim Jamshed errno = ENOTSOCK; 154876404edcSAsim Jamshed return -1; 154976404edcSAsim Jamshed } 155076404edcSAsim Jamshed 155176404edcSAsim Jamshed /* stream should be in ESTABLISHED, FIN_WAIT_1, FIN_WAIT_2, CLOSE_WAIT */ 155276404edcSAsim Jamshed cur_stream = socket->stream; 155376404edcSAsim Jamshed if (!cur_stream || !cur_stream->rcvvar || !cur_stream->rcvvar->rcvbuf || 155476404edcSAsim Jamshed !(cur_stream->state >= TCP_ST_ESTABLISHED && 155576404edcSAsim Jamshed cur_stream->state <= TCP_ST_CLOSE_WAIT)) { 155676404edcSAsim Jamshed errno = ENOTCONN; 155776404edcSAsim Jamshed return -1; 155876404edcSAsim Jamshed } 155976404edcSAsim Jamshed 156076404edcSAsim Jamshed rcvvar = cur_stream->rcvvar; 156176404edcSAsim Jamshed 156276404edcSAsim Jamshed merged_len = tcprb_cflen(rcvvar->rcvbuf); 156376404edcSAsim Jamshed 156476404edcSAsim Jamshed /* if CLOSE_WAIT, return 0 if there is no payload */ 156576404edcSAsim Jamshed if (cur_stream->state == TCP_ST_CLOSE_WAIT) { 156676404edcSAsim Jamshed if (merged_len == 0) 156776404edcSAsim Jamshed return 0; 156876404edcSAsim Jamshed } 156976404edcSAsim Jamshed 157076404edcSAsim Jamshed /* return EAGAIN if no receive buffer */ 157176404edcSAsim Jamshed if (socket->opts & MTCP_NONBLOCK) { 15721879243cSAsim Jamshed if (merged_len == 0) { 157376404edcSAsim Jamshed errno = EAGAIN; 157476404edcSAsim Jamshed return -1; 157576404edcSAsim Jamshed } 157676404edcSAsim Jamshed } 157776404edcSAsim Jamshed 157876404edcSAsim Jamshed SBUF_LOCK(&rcvvar->read_lock); 157976404edcSAsim Jamshed 1580df3fae06SAsim Jamshed switch (flags) { 1581df3fae06SAsim Jamshed case 0: 158276404edcSAsim Jamshed ret = CopyToUser(mtcp, cur_stream, buf, len); 1583df3fae06SAsim Jamshed break; 1584df3fae06SAsim Jamshed case MSG_PEEK: 1585df3fae06SAsim Jamshed ret = PeekForUser(mtcp, cur_stream, buf, len); 1586df3fae06SAsim Jamshed break; 1587df3fae06SAsim Jamshed default: 1588df3fae06SAsim Jamshed SBUF_UNLOCK(&rcvvar->read_lock); 
1589df3fae06SAsim Jamshed ret = -1; 1590df3fae06SAsim Jamshed errno = EINVAL; 1591df3fae06SAsim Jamshed return ret; 1592df3fae06SAsim Jamshed } 159376404edcSAsim Jamshed 159476404edcSAsim Jamshed merged_len = tcprb_cflen(rcvvar->rcvbuf); 159576404edcSAsim Jamshed event_remaining = FALSE; 159676404edcSAsim Jamshed /* if there are remaining payload, generate EPOLLIN */ 159776404edcSAsim Jamshed /* (may due to insufficient user buffer) */ 159876404edcSAsim Jamshed if (socket->epoll & MOS_EPOLLIN) { 159976404edcSAsim Jamshed if (!(socket->epoll & MOS_EPOLLET) && merged_len > 0) { 160076404edcSAsim Jamshed event_remaining = TRUE; 160176404edcSAsim Jamshed } 160276404edcSAsim Jamshed } 160376404edcSAsim Jamshed /* if waiting for close, notify it if no remaining data */ 160476404edcSAsim Jamshed if (cur_stream->state == TCP_ST_CLOSE_WAIT && 160576404edcSAsim Jamshed merged_len == 0 && ret > 0) { 160676404edcSAsim Jamshed event_remaining = TRUE; 160776404edcSAsim Jamshed } 160876404edcSAsim Jamshed 160976404edcSAsim Jamshed SBUF_UNLOCK(&rcvvar->read_lock); 161076404edcSAsim Jamshed 161176404edcSAsim Jamshed if (event_remaining) { 161276404edcSAsim Jamshed if (socket->epoll) { 161376404edcSAsim Jamshed AddEpollEvent(mtcp->ep, 161476404edcSAsim Jamshed USR_SHADOW_EVENT_QUEUE, socket, MOS_EPOLLIN); 161576404edcSAsim Jamshed } 161676404edcSAsim Jamshed } 161776404edcSAsim Jamshed 1618df3fae06SAsim Jamshed TRACE_API("Stream %d: mtcp_recv() returning %d\n", cur_stream->id, ret); 161976404edcSAsim Jamshed return ret; 162076404edcSAsim Jamshed } 162176404edcSAsim Jamshed /*----------------------------------------------------------------------------*/ 1622df3fae06SAsim Jamshed inline ssize_t 1623df3fae06SAsim Jamshed mtcp_read(mctx_t mctx, int sockid, char *buf, size_t len) 1624df3fae06SAsim Jamshed { 1625df3fae06SAsim Jamshed return mtcp_recv(mctx, sockid, buf, len, 0); 1626df3fae06SAsim Jamshed } 1627df3fae06SAsim Jamshed 
/*----------------------------------------------------------------------------*/ 162876404edcSAsim Jamshed ssize_t 1629a5e1a556SAsim Jamshed mtcp_readv(mctx_t mctx, int sockid, const struct iovec *iov, int numIOV) 163076404edcSAsim Jamshed { 163176404edcSAsim Jamshed mtcp_manager_t mtcp; 163276404edcSAsim Jamshed socket_map_t socket; 163376404edcSAsim Jamshed tcp_stream *cur_stream; 163476404edcSAsim Jamshed struct tcp_recv_vars *rcvvar; 163576404edcSAsim Jamshed int ret, bytes_read, i; 163676404edcSAsim Jamshed int event_remaining, merged_len; 163776404edcSAsim Jamshed 163876404edcSAsim Jamshed mtcp = GetMTCPManager(mctx); 163976404edcSAsim Jamshed if (!mtcp) { 164076404edcSAsim Jamshed errno = EACCES; 164176404edcSAsim Jamshed return -1; 164276404edcSAsim Jamshed } 164376404edcSAsim Jamshed 164476404edcSAsim Jamshed if (sockid < 0 || sockid >= g_config.mos->max_concurrency) { 164576404edcSAsim Jamshed TRACE_API("Socket id %d out of range.\n", sockid); 164676404edcSAsim Jamshed errno = EBADF; 164776404edcSAsim Jamshed return -1; 164876404edcSAsim Jamshed } 164976404edcSAsim Jamshed 165076404edcSAsim Jamshed socket = &mtcp->smap[sockid]; 165176404edcSAsim Jamshed if (socket->socktype == MOS_SOCK_UNUSED) { 165276404edcSAsim Jamshed TRACE_API("Invalid socket id: %d\n", sockid); 165376404edcSAsim Jamshed errno = EBADF; 165476404edcSAsim Jamshed return -1; 165576404edcSAsim Jamshed } 165676404edcSAsim Jamshed 165776404edcSAsim Jamshed if (socket->socktype != MOS_SOCK_STREAM) { 165876404edcSAsim Jamshed TRACE_API("Not an end socket. 
id: %d\n", sockid); 165976404edcSAsim Jamshed errno = ENOTSOCK; 166076404edcSAsim Jamshed return -1; 166176404edcSAsim Jamshed } 166276404edcSAsim Jamshed 166376404edcSAsim Jamshed /* stream should be in ESTABLISHED, FIN_WAIT_1, FIN_WAIT_2, CLOSE_WAIT */ 166476404edcSAsim Jamshed cur_stream = socket->stream; 16651879243cSAsim Jamshed if (!cur_stream || !cur_stream->rcvvar->rcvbuf || 166676404edcSAsim Jamshed !(cur_stream->state >= TCP_ST_ESTABLISHED && 166776404edcSAsim Jamshed cur_stream->state <= TCP_ST_CLOSE_WAIT)) { 166876404edcSAsim Jamshed errno = ENOTCONN; 166976404edcSAsim Jamshed return -1; 167076404edcSAsim Jamshed } 167176404edcSAsim Jamshed 167276404edcSAsim Jamshed rcvvar = cur_stream->rcvvar; 167376404edcSAsim Jamshed 167476404edcSAsim Jamshed merged_len = tcprb_cflen(rcvvar->rcvbuf); 167576404edcSAsim Jamshed 167676404edcSAsim Jamshed /* if CLOSE_WAIT, return 0 if there is no payload */ 167776404edcSAsim Jamshed if (cur_stream->state == TCP_ST_CLOSE_WAIT) { 167876404edcSAsim Jamshed if (merged_len == 0) 167976404edcSAsim Jamshed return 0; 168076404edcSAsim Jamshed } 168176404edcSAsim Jamshed 168276404edcSAsim Jamshed /* return EAGAIN if no receive buffer */ 168376404edcSAsim Jamshed if (socket->opts & MTCP_NONBLOCK) { 16841879243cSAsim Jamshed if (merged_len == 0) { 168576404edcSAsim Jamshed errno = EAGAIN; 168676404edcSAsim Jamshed return -1; 168776404edcSAsim Jamshed } 168876404edcSAsim Jamshed } 168976404edcSAsim Jamshed 169076404edcSAsim Jamshed SBUF_LOCK(&rcvvar->read_lock); 169176404edcSAsim Jamshed 169276404edcSAsim Jamshed /* read and store the contents to the vectored buffers */ 169376404edcSAsim Jamshed bytes_read = 0; 169476404edcSAsim Jamshed for (i = 0; i < numIOV; i++) { 169576404edcSAsim Jamshed if (iov[i].iov_len <= 0) 169676404edcSAsim Jamshed continue; 169776404edcSAsim Jamshed 169876404edcSAsim Jamshed ret = CopyToUser(mtcp, cur_stream, iov[i].iov_base, iov[i].iov_len); 169976404edcSAsim Jamshed if (ret <= 0) 170076404edcSAsim 
Jamshed break; 170176404edcSAsim Jamshed 170276404edcSAsim Jamshed bytes_read += ret; 170376404edcSAsim Jamshed 170476404edcSAsim Jamshed if (ret < iov[i].iov_len) 170576404edcSAsim Jamshed break; 170676404edcSAsim Jamshed } 170776404edcSAsim Jamshed 170876404edcSAsim Jamshed merged_len = tcprb_cflen(rcvvar->rcvbuf); 170976404edcSAsim Jamshed 171076404edcSAsim Jamshed event_remaining = FALSE; 171176404edcSAsim Jamshed /* if there are remaining payload, generate read event */ 171276404edcSAsim Jamshed /* (may due to insufficient user buffer) */ 171376404edcSAsim Jamshed if (socket->epoll & MOS_EPOLLIN) { 171476404edcSAsim Jamshed if (!(socket->epoll & MOS_EPOLLET) && merged_len > 0) { 171576404edcSAsim Jamshed event_remaining = TRUE; 171676404edcSAsim Jamshed } 171776404edcSAsim Jamshed } 171876404edcSAsim Jamshed /* if waiting for close, notify it if no remaining data */ 171976404edcSAsim Jamshed if (cur_stream->state == TCP_ST_CLOSE_WAIT && 172076404edcSAsim Jamshed merged_len == 0 && bytes_read > 0) { 172176404edcSAsim Jamshed event_remaining = TRUE; 172276404edcSAsim Jamshed } 172376404edcSAsim Jamshed 172476404edcSAsim Jamshed SBUF_UNLOCK(&rcvvar->read_lock); 172576404edcSAsim Jamshed 172676404edcSAsim Jamshed if(event_remaining) { 172776404edcSAsim Jamshed if (socket->epoll & MOS_EPOLLIN && !(socket->epoll & MOS_EPOLLET)) { 172876404edcSAsim Jamshed AddEpollEvent(mtcp->ep, 172976404edcSAsim Jamshed USR_SHADOW_EVENT_QUEUE, socket, MOS_EPOLLIN); 173076404edcSAsim Jamshed } 173176404edcSAsim Jamshed } 173276404edcSAsim Jamshed 173376404edcSAsim Jamshed TRACE_API("Stream %d: mtcp_readv() returning %d\n", 173476404edcSAsim Jamshed cur_stream->id, bytes_read); 173576404edcSAsim Jamshed return bytes_read; 173676404edcSAsim Jamshed } 173776404edcSAsim Jamshed /*----------------------------------------------------------------------------*/ 173876404edcSAsim Jamshed static inline int 1739a5e1a556SAsim Jamshed CopyFromUser(mtcp_manager_t mtcp, tcp_stream *cur_stream, 
const char *buf, int len) 174076404edcSAsim Jamshed { 174176404edcSAsim Jamshed struct tcp_send_vars *sndvar = cur_stream->sndvar; 174276404edcSAsim Jamshed int sndlen; 174376404edcSAsim Jamshed int ret; 174476404edcSAsim Jamshed 174576404edcSAsim Jamshed sndlen = MIN((int)sndvar->snd_wnd, len); 174676404edcSAsim Jamshed if (sndlen <= 0) { 174776404edcSAsim Jamshed errno = EAGAIN; 174876404edcSAsim Jamshed return -1; 174976404edcSAsim Jamshed } 175076404edcSAsim Jamshed 175176404edcSAsim Jamshed /* allocate send buffer if not exist */ 175276404edcSAsim Jamshed if (!sndvar->sndbuf) { 175376404edcSAsim Jamshed sndvar->sndbuf = SBInit(mtcp->rbm_snd, sndvar->iss + 1); 175476404edcSAsim Jamshed if (!sndvar->sndbuf) { 175576404edcSAsim Jamshed cur_stream->close_reason = TCP_NO_MEM; 175676404edcSAsim Jamshed /* notification may not required due to -1 return */ 175776404edcSAsim Jamshed errno = ENOMEM; 175876404edcSAsim Jamshed return -1; 175976404edcSAsim Jamshed } 176076404edcSAsim Jamshed } 176176404edcSAsim Jamshed 176276404edcSAsim Jamshed ret = SBPut(mtcp->rbm_snd, sndvar->sndbuf, buf, sndlen); 176376404edcSAsim Jamshed assert(ret == sndlen); 176476404edcSAsim Jamshed sndvar->snd_wnd = sndvar->sndbuf->size - sndvar->sndbuf->len; 176576404edcSAsim Jamshed if (ret <= 0) { 176676404edcSAsim Jamshed TRACE_ERROR("SBPut failed. reason: %d (sndlen: %u, len: %u\n", 176776404edcSAsim Jamshed ret, sndlen, sndvar->sndbuf->len); 176876404edcSAsim Jamshed errno = EAGAIN; 176976404edcSAsim Jamshed return -1; 177076404edcSAsim Jamshed } 177176404edcSAsim Jamshed 177276404edcSAsim Jamshed if (sndvar->snd_wnd <= 0) { 177376404edcSAsim Jamshed TRACE_SNDBUF("%u Sending buffer became full!! 
snd_wnd: %u\n", 177476404edcSAsim Jamshed cur_stream->id, sndvar->snd_wnd); 177576404edcSAsim Jamshed } 177676404edcSAsim Jamshed 177776404edcSAsim Jamshed return ret; 177876404edcSAsim Jamshed } 177976404edcSAsim Jamshed /*----------------------------------------------------------------------------*/ 178076404edcSAsim Jamshed ssize_t 1781a5e1a556SAsim Jamshed mtcp_write(mctx_t mctx, int sockid, const char *buf, size_t len) 178276404edcSAsim Jamshed { 178376404edcSAsim Jamshed mtcp_manager_t mtcp; 178476404edcSAsim Jamshed socket_map_t socket; 178576404edcSAsim Jamshed tcp_stream *cur_stream; 178676404edcSAsim Jamshed struct tcp_send_vars *sndvar; 178776404edcSAsim Jamshed int ret; 178876404edcSAsim Jamshed 178976404edcSAsim Jamshed mtcp = GetMTCPManager(mctx); 179076404edcSAsim Jamshed if (!mtcp) { 179176404edcSAsim Jamshed errno = EACCES; 179276404edcSAsim Jamshed return -1; 179376404edcSAsim Jamshed } 179476404edcSAsim Jamshed 179576404edcSAsim Jamshed if (sockid < 0 || sockid >= g_config.mos->max_concurrency) { 179676404edcSAsim Jamshed TRACE_API("Socket id %d out of range.\n", sockid); 179776404edcSAsim Jamshed errno = EBADF; 179876404edcSAsim Jamshed return -1; 179976404edcSAsim Jamshed } 180076404edcSAsim Jamshed 180176404edcSAsim Jamshed socket = &mtcp->smap[sockid]; 180276404edcSAsim Jamshed if (socket->socktype == MOS_SOCK_UNUSED) { 180376404edcSAsim Jamshed TRACE_API("Invalid socket id: %d\n", sockid); 180476404edcSAsim Jamshed errno = EBADF; 180576404edcSAsim Jamshed return -1; 180676404edcSAsim Jamshed } 180776404edcSAsim Jamshed 180876404edcSAsim Jamshed if (socket->socktype == MOS_SOCK_PIPE) { 180976404edcSAsim Jamshed return PipeWrite(mctx, sockid, buf, len); 181076404edcSAsim Jamshed } 181176404edcSAsim Jamshed 181276404edcSAsim Jamshed if (socket->socktype != MOS_SOCK_STREAM) { 181376404edcSAsim Jamshed TRACE_API("Not an end socket. 
id: %d\n", sockid); 181476404edcSAsim Jamshed errno = ENOTSOCK; 181576404edcSAsim Jamshed return -1; 181676404edcSAsim Jamshed } 181776404edcSAsim Jamshed 181876404edcSAsim Jamshed cur_stream = socket->stream; 181976404edcSAsim Jamshed if (!cur_stream || 182076404edcSAsim Jamshed !(cur_stream->state == TCP_ST_ESTABLISHED || 182176404edcSAsim Jamshed cur_stream->state == TCP_ST_CLOSE_WAIT)) { 182276404edcSAsim Jamshed errno = ENOTCONN; 182376404edcSAsim Jamshed return -1; 182476404edcSAsim Jamshed } 182576404edcSAsim Jamshed 182676404edcSAsim Jamshed if (len <= 0) { 182776404edcSAsim Jamshed if (socket->opts & MTCP_NONBLOCK) { 182876404edcSAsim Jamshed errno = EAGAIN; 182976404edcSAsim Jamshed return -1; 183076404edcSAsim Jamshed } else { 183176404edcSAsim Jamshed return 0; 183276404edcSAsim Jamshed } 183376404edcSAsim Jamshed } 183476404edcSAsim Jamshed 183576404edcSAsim Jamshed sndvar = cur_stream->sndvar; 183676404edcSAsim Jamshed 183776404edcSAsim Jamshed SBUF_LOCK(&sndvar->write_lock); 183876404edcSAsim Jamshed ret = CopyFromUser(mtcp, cur_stream, buf, len); 183976404edcSAsim Jamshed 184076404edcSAsim Jamshed SBUF_UNLOCK(&sndvar->write_lock); 184176404edcSAsim Jamshed 184276404edcSAsim Jamshed if (ret > 0 && !(sndvar->on_sendq || sndvar->on_send_list)) { 184376404edcSAsim Jamshed SQ_LOCK(&mtcp->ctx->sendq_lock); 184476404edcSAsim Jamshed sndvar->on_sendq = TRUE; 184576404edcSAsim Jamshed StreamEnqueue(mtcp->sendq, cur_stream); /* this always success */ 184676404edcSAsim Jamshed SQ_UNLOCK(&mtcp->ctx->sendq_lock); 184776404edcSAsim Jamshed mtcp->wakeup_flag = TRUE; 184876404edcSAsim Jamshed } 184976404edcSAsim Jamshed 185076404edcSAsim Jamshed if (ret == 0 && (socket->opts & MTCP_NONBLOCK)) { 185176404edcSAsim Jamshed ret = -1; 185276404edcSAsim Jamshed errno = EAGAIN; 185376404edcSAsim Jamshed } 185476404edcSAsim Jamshed 185576404edcSAsim Jamshed /* if there are remaining sending buffer, generate write event */ 185676404edcSAsim Jamshed if (sndvar->snd_wnd > 0) 
{ 185776404edcSAsim Jamshed if (socket->epoll & MOS_EPOLLOUT && !(socket->epoll & MOS_EPOLLET)) { 185876404edcSAsim Jamshed AddEpollEvent(mtcp->ep, 185976404edcSAsim Jamshed USR_SHADOW_EVENT_QUEUE, socket, MOS_EPOLLOUT); 186076404edcSAsim Jamshed } 186176404edcSAsim Jamshed } 186276404edcSAsim Jamshed 186376404edcSAsim Jamshed TRACE_API("Stream %d: mtcp_write() returning %d\n", cur_stream->id, ret); 186476404edcSAsim Jamshed return ret; 186576404edcSAsim Jamshed } 186676404edcSAsim Jamshed /*----------------------------------------------------------------------------*/ 186776404edcSAsim Jamshed ssize_t 1868a5e1a556SAsim Jamshed mtcp_writev(mctx_t mctx, int sockid, const struct iovec *iov, int numIOV) 186976404edcSAsim Jamshed { 187076404edcSAsim Jamshed mtcp_manager_t mtcp; 187176404edcSAsim Jamshed socket_map_t socket; 187276404edcSAsim Jamshed tcp_stream *cur_stream; 187376404edcSAsim Jamshed struct tcp_send_vars *sndvar; 187476404edcSAsim Jamshed int ret, to_write, i; 187576404edcSAsim Jamshed 187676404edcSAsim Jamshed mtcp = GetMTCPManager(mctx); 187776404edcSAsim Jamshed if (!mtcp) { 187876404edcSAsim Jamshed errno = EACCES; 187976404edcSAsim Jamshed return -1; 188076404edcSAsim Jamshed } 188176404edcSAsim Jamshed 188276404edcSAsim Jamshed if (sockid < 0 || sockid >= g_config.mos->max_concurrency) { 188376404edcSAsim Jamshed TRACE_API("Socket id %d out of range.\n", sockid); 188476404edcSAsim Jamshed errno = EBADF; 188576404edcSAsim Jamshed return -1; 188676404edcSAsim Jamshed } 188776404edcSAsim Jamshed 188876404edcSAsim Jamshed socket = &mtcp->smap[sockid]; 188976404edcSAsim Jamshed if (socket->socktype == MOS_SOCK_UNUSED) { 189076404edcSAsim Jamshed TRACE_API("Invalid socket id: %d\n", sockid); 189176404edcSAsim Jamshed errno = EBADF; 189276404edcSAsim Jamshed return -1; 189376404edcSAsim Jamshed } 189476404edcSAsim Jamshed 189576404edcSAsim Jamshed if (socket->socktype != MOS_SOCK_STREAM) { 189676404edcSAsim Jamshed TRACE_API("Not an end socket. 
id: %d\n", sockid); 189776404edcSAsim Jamshed errno = ENOTSOCK; 189876404edcSAsim Jamshed return -1; 189976404edcSAsim Jamshed } 190076404edcSAsim Jamshed 190176404edcSAsim Jamshed cur_stream = socket->stream; 190276404edcSAsim Jamshed if (!cur_stream || 190376404edcSAsim Jamshed !(cur_stream->state == TCP_ST_ESTABLISHED || 190476404edcSAsim Jamshed cur_stream->state == TCP_ST_CLOSE_WAIT)) { 190576404edcSAsim Jamshed errno = ENOTCONN; 190676404edcSAsim Jamshed return -1; 190776404edcSAsim Jamshed } 190876404edcSAsim Jamshed 190976404edcSAsim Jamshed sndvar = cur_stream->sndvar; 191076404edcSAsim Jamshed SBUF_LOCK(&sndvar->write_lock); 191176404edcSAsim Jamshed 191276404edcSAsim Jamshed /* write from the vectored buffers */ 191376404edcSAsim Jamshed to_write = 0; 191476404edcSAsim Jamshed for (i = 0; i < numIOV; i++) { 191576404edcSAsim Jamshed if (iov[i].iov_len <= 0) 191676404edcSAsim Jamshed continue; 191776404edcSAsim Jamshed 191876404edcSAsim Jamshed ret = CopyFromUser(mtcp, cur_stream, iov[i].iov_base, iov[i].iov_len); 191976404edcSAsim Jamshed if (ret <= 0) 192076404edcSAsim Jamshed break; 192176404edcSAsim Jamshed 192276404edcSAsim Jamshed to_write += ret; 192376404edcSAsim Jamshed 192476404edcSAsim Jamshed if (ret < iov[i].iov_len) 192576404edcSAsim Jamshed break; 192676404edcSAsim Jamshed } 192776404edcSAsim Jamshed SBUF_UNLOCK(&sndvar->write_lock); 192876404edcSAsim Jamshed 192976404edcSAsim Jamshed if (to_write > 0 && !(sndvar->on_sendq || sndvar->on_send_list)) { 193076404edcSAsim Jamshed SQ_LOCK(&mtcp->ctx->sendq_lock); 193176404edcSAsim Jamshed sndvar->on_sendq = TRUE; 193276404edcSAsim Jamshed StreamEnqueue(mtcp->sendq, cur_stream); /* this always success */ 193376404edcSAsim Jamshed SQ_UNLOCK(&mtcp->ctx->sendq_lock); 193476404edcSAsim Jamshed mtcp->wakeup_flag = TRUE; 193576404edcSAsim Jamshed } 193676404edcSAsim Jamshed 193776404edcSAsim Jamshed if (to_write == 0 && (socket->opts & MTCP_NONBLOCK)) { 193876404edcSAsim Jamshed to_write = -1; 
193976404edcSAsim Jamshed errno = EAGAIN; 194076404edcSAsim Jamshed } 194176404edcSAsim Jamshed 194276404edcSAsim Jamshed /* if there are remaining sending buffer, generate write event */ 194376404edcSAsim Jamshed if (sndvar->snd_wnd > 0) { 194476404edcSAsim Jamshed if (socket->epoll & MOS_EPOLLOUT && !(socket->epoll & MOS_EPOLLET)) { 194576404edcSAsim Jamshed AddEpollEvent(mtcp->ep, 194676404edcSAsim Jamshed USR_SHADOW_EVENT_QUEUE, socket, MOS_EPOLLOUT); 194776404edcSAsim Jamshed } 194876404edcSAsim Jamshed } 194976404edcSAsim Jamshed 195076404edcSAsim Jamshed TRACE_API("Stream %d: mtcp_writev() returning %d\n", 195176404edcSAsim Jamshed cur_stream->id, to_write); 195276404edcSAsim Jamshed return to_write; 195376404edcSAsim Jamshed } 195476404edcSAsim Jamshed /*----------------------------------------------------------------------------*/ 195576404edcSAsim Jamshed uint32_t 195676404edcSAsim Jamshed mtcp_get_connection_cnt(mctx_t mctx) 195776404edcSAsim Jamshed { 195876404edcSAsim Jamshed mtcp_manager_t mtcp; 195976404edcSAsim Jamshed mtcp = GetMTCPManager(mctx); 196076404edcSAsim Jamshed if (!mtcp) { 196176404edcSAsim Jamshed errno = EACCES; 196276404edcSAsim Jamshed return -1; 196376404edcSAsim Jamshed } 196476404edcSAsim Jamshed 196576404edcSAsim Jamshed if (mtcp->num_msp > 0) 196676404edcSAsim Jamshed return mtcp->flow_cnt / 2; 196776404edcSAsim Jamshed else 196876404edcSAsim Jamshed return mtcp->flow_cnt; 196976404edcSAsim Jamshed } 197076404edcSAsim Jamshed /*----------------------------------------------------------------------------*/ 1971