176404edcSAsim Jamshed #include <sys/queue.h> 276404edcSAsim Jamshed #include <sys/ioctl.h> 376404edcSAsim Jamshed #include <limits.h> 476404edcSAsim Jamshed #include <unistd.h> 576404edcSAsim Jamshed #include <assert.h> 676404edcSAsim Jamshed #include <string.h> 776404edcSAsim Jamshed 876404edcSAsim Jamshed #ifdef DARWIN 976404edcSAsim Jamshed #include <netinet/tcp.h> 1076404edcSAsim Jamshed #include <netinet/ip.h> 1176404edcSAsim Jamshed #include <netinet/if_ether.h> 1276404edcSAsim Jamshed #endif 1376404edcSAsim Jamshed 1476404edcSAsim Jamshed #include "mtcp.h" 1576404edcSAsim Jamshed #include "mtcp_api.h" 1676404edcSAsim Jamshed #include "tcp_in.h" 1776404edcSAsim Jamshed #include "tcp_stream.h" 1876404edcSAsim Jamshed #include "tcp_out.h" 1976404edcSAsim Jamshed #include "ip_out.h" 2076404edcSAsim Jamshed #include "eventpoll.h" 2176404edcSAsim Jamshed #include "pipe.h" 2276404edcSAsim Jamshed #include "fhash.h" 2376404edcSAsim Jamshed #include "addr_pool.h" 2476404edcSAsim Jamshed #include "rss.h" 2576404edcSAsim Jamshed #include "config.h" 2676404edcSAsim Jamshed #include "debug.h" 2776404edcSAsim Jamshed #include "eventpoll.h" 2876404edcSAsim Jamshed #include "mos_api.h" 2976404edcSAsim Jamshed #include "tcp_rb.h" 3076404edcSAsim Jamshed 3176404edcSAsim Jamshed #define MAX(a, b) ((a)>(b)?(a):(b)) 3276404edcSAsim Jamshed #define MIN(a, b) ((a)<(b)?(a):(b)) 3376404edcSAsim Jamshed 3476404edcSAsim Jamshed /*----------------------------------------------------------------------------*/ 3576404edcSAsim Jamshed /** Stop monitoring the socket! (function prototype) 3676404edcSAsim Jamshed * @param [in] mctx: mtcp context 3776404edcSAsim Jamshed * @param [in] sock: monitoring stream socket id 3876404edcSAsim Jamshed * @param [in] side: side of monitoring (client side, server side or both) 3976404edcSAsim Jamshed * 4076404edcSAsim Jamshed * This function is now DEPRECATED and is only used within mOS core... 4176404edcSAsim Jamshed */ 4276404edcSAsim Jamshed int 4376404edcSAsim Jamshed mtcp_cb_stop(mctx_t mctx, int sock, int side); 4476404edcSAsim Jamshed /*----------------------------------------------------------------------------*/ 4576404edcSAsim Jamshed /** Reset the connection (send RST packets to both sides) 4676404edcSAsim Jamshed * (We need to decide the API for this.) 4776404edcSAsim Jamshed */ 4876404edcSAsim Jamshed //int 4976404edcSAsim Jamshed //mtcp_cb_reset(mctx_t mctx, int sock, int side); 5076404edcSAsim Jamshed /*----------------------------------------------------------------------------*/ 5176404edcSAsim Jamshed inline mtcp_manager_t 5276404edcSAsim Jamshed GetMTCPManager(mctx_t mctx) 5376404edcSAsim Jamshed { 5476404edcSAsim Jamshed if (!mctx) { 5576404edcSAsim Jamshed errno = EACCES; 5676404edcSAsim Jamshed return NULL; 5776404edcSAsim Jamshed } 5876404edcSAsim Jamshed 5976404edcSAsim Jamshed if (mctx->cpu < 0 || mctx->cpu >= num_cpus) { 6076404edcSAsim Jamshed errno = EINVAL; 6176404edcSAsim Jamshed return NULL; 6276404edcSAsim Jamshed } 6376404edcSAsim Jamshed 644cb4e140SAsim Jamshed if (!g_mtcp[mctx->cpu] || g_mtcp[mctx->cpu]->ctx->done || g_mtcp[mctx->cpu]->ctx->exit) { 6576404edcSAsim Jamshed errno = EPERM; 6676404edcSAsim Jamshed return NULL; 6776404edcSAsim Jamshed } 6876404edcSAsim Jamshed 6976404edcSAsim Jamshed return g_mtcp[mctx->cpu]; 7076404edcSAsim Jamshed } 7176404edcSAsim Jamshed /*----------------------------------------------------------------------------*/ 72a5e1a556SAsim Jamshed static inline int 7376404edcSAsim Jamshed GetSocketError(socket_map_t socket, void *optval, socklen_t *optlen) 7476404edcSAsim Jamshed { 7576404edcSAsim Jamshed tcp_stream *cur_stream; 7676404edcSAsim Jamshed 7776404edcSAsim Jamshed if (!socket->stream) { 7876404edcSAsim Jamshed errno = EBADF; 7976404edcSAsim Jamshed return -1; 8076404edcSAsim Jamshed } 8176404edcSAsim Jamshed 8276404edcSAsim Jamshed cur_stream = socket->stream; 8376404edcSAsim Jamshed if (cur_stream->state == TCP_ST_CLOSED_RSVD) { 8476404edcSAsim Jamshed if (cur_stream->close_reason == TCP_TIMEDOUT || 8576404edcSAsim Jamshed cur_stream->close_reason == TCP_CONN_FAIL || 8676404edcSAsim Jamshed cur_stream->close_reason == TCP_CONN_LOST) { 8776404edcSAsim Jamshed *(int *)optval = ETIMEDOUT; 8876404edcSAsim Jamshed *optlen = sizeof(int); 8976404edcSAsim Jamshed 9076404edcSAsim Jamshed return 0; 9176404edcSAsim Jamshed } 9276404edcSAsim Jamshed } 9376404edcSAsim Jamshed 9476404edcSAsim Jamshed if (cur_stream->state == TCP_ST_CLOSE_WAIT || 9576404edcSAsim Jamshed cur_stream->state == TCP_ST_CLOSED_RSVD) { 9676404edcSAsim Jamshed if (cur_stream->close_reason == TCP_RESET) { 9776404edcSAsim Jamshed *(int *)optval = ECONNRESET; 9876404edcSAsim Jamshed *optlen = sizeof(int); 9976404edcSAsim Jamshed 10076404edcSAsim Jamshed return 0; 10176404edcSAsim Jamshed } 10276404edcSAsim Jamshed } 10376404edcSAsim Jamshed 104a5e1a556SAsim Jamshed if (cur_stream->state == TCP_ST_SYN_SENT && 105a5e1a556SAsim Jamshed errno == EINPROGRESS) { 106a5e1a556SAsim Jamshed *(int *)optval = errno; 107a5e1a556SAsim Jamshed *optlen = sizeof(int); 108a5e1a556SAsim Jamshed 109a5e1a556SAsim Jamshed return -1; 110a5e1a556SAsim Jamshed } 111a5e1a556SAsim Jamshed 112265cb675SAsim Jamshed /* 113265cb675SAsim Jamshed * `base case`: If socket sees no so_error, then 114265cb675SAsim Jamshed * this also means close_reason will always be 115265cb675SAsim Jamshed * TCP_NOT_CLOSED. 116265cb675SAsim Jamshed */ 117265cb675SAsim Jamshed if (cur_stream->close_reason == TCP_NOT_CLOSED) { 118265cb675SAsim Jamshed *(int *)optval = 0; 119265cb675SAsim Jamshed *optlen = sizeof(int); 120265cb675SAsim Jamshed 121265cb675SAsim Jamshed return 0; 122265cb675SAsim Jamshed } 123265cb675SAsim Jamshed 12476404edcSAsim Jamshed errno = ENOSYS; 12576404edcSAsim Jamshed return -1; 12676404edcSAsim Jamshed } 12776404edcSAsim Jamshed /*----------------------------------------------------------------------------*/ 12876404edcSAsim Jamshed int 129a5e1a556SAsim Jamshed mtcp_getsockname(mctx_t mctx, int sockid, struct sockaddr *addr, 130a5e1a556SAsim Jamshed socklen_t *addrlen) 131a5e1a556SAsim Jamshed { 132a5e1a556SAsim Jamshed mtcp_manager_t mtcp; 133a5e1a556SAsim Jamshed socket_map_t socket; 134a5e1a556SAsim Jamshed 135a5e1a556SAsim Jamshed mtcp = GetMTCPManager(mctx); 136a5e1a556SAsim Jamshed if (!mtcp) { 137a5e1a556SAsim Jamshed return -1; 138a5e1a556SAsim Jamshed } 139a5e1a556SAsim Jamshed 140a5e1a556SAsim Jamshed if (sockid < 0 || sockid >= g_config.mos->max_concurrency) { 141a5e1a556SAsim Jamshed TRACE_API("Socket id %d out of range.\n", sockid); 142a5e1a556SAsim Jamshed errno = EBADF; 143a5e1a556SAsim Jamshed return -1; 144a5e1a556SAsim Jamshed } 145a5e1a556SAsim Jamshed 146a5e1a556SAsim Jamshed socket = &mtcp->smap[sockid]; 147a5e1a556SAsim Jamshed if (socket->socktype == MOS_SOCK_UNUSED) { 148a5e1a556SAsim Jamshed TRACE_API("Invalid socket id: %d\n", sockid); 149a5e1a556SAsim Jamshed errno = EBADF; 150a5e1a556SAsim Jamshed return -1; 151a5e1a556SAsim Jamshed } 152a5e1a556SAsim Jamshed 153a5e1a556SAsim Jamshed if (*addrlen <= 0) { 154a5e1a556SAsim Jamshed TRACE_API("Invalid addrlen: %d\n", *addrlen); 155a5e1a556SAsim Jamshed errno = EINVAL; 156a5e1a556SAsim Jamshed return -1; 157a5e1a556SAsim Jamshed } 158a5e1a556SAsim Jamshed 159a5e1a556SAsim Jamshed if (socket->socktype != MOS_SOCK_STREAM_LISTEN && 160a5e1a556SAsim Jamshed socket->socktype != MOS_SOCK_STREAM) { 161a5e1a556SAsim Jamshed TRACE_API("Invalid socket id: %d\n", sockid); 162a5e1a556SAsim Jamshed errno = ENOTSOCK; 163a5e1a556SAsim Jamshed return -1; 164a5e1a556SAsim Jamshed } 165a5e1a556SAsim Jamshed 166a5e1a556SAsim Jamshed *(struct sockaddr_in *)addr = socket->saddr; 167a5e1a556SAsim Jamshed *addrlen = sizeof(socket->saddr); 168a5e1a556SAsim Jamshed 169a5e1a556SAsim Jamshed return 0; 170a5e1a556SAsim Jamshed } 171a5e1a556SAsim Jamshed /*----------------------------------------------------------------------------*/ 172a5e1a556SAsim Jamshed int 17376404edcSAsim Jamshed mtcp_getsockopt(mctx_t mctx, int sockid, int level, 17476404edcSAsim Jamshed int optname, void *optval, socklen_t *optlen) 17576404edcSAsim Jamshed { 17676404edcSAsim Jamshed mtcp_manager_t mtcp; 17776404edcSAsim Jamshed socket_map_t socket; 17876404edcSAsim Jamshed 17976404edcSAsim Jamshed mtcp = GetMTCPManager(mctx); 18076404edcSAsim Jamshed if (!mtcp) { 18176404edcSAsim Jamshed errno = EACCES; 18276404edcSAsim Jamshed return -1; 18376404edcSAsim Jamshed } 18476404edcSAsim Jamshed 18576404edcSAsim Jamshed if (sockid < 0 || sockid >= g_config.mos->max_concurrency) { 18676404edcSAsim Jamshed TRACE_API("Socket id %d out of range.\n", sockid); 18776404edcSAsim Jamshed errno = EBADF; 18876404edcSAsim Jamshed return -1; 18976404edcSAsim Jamshed } 19076404edcSAsim Jamshed 19176404edcSAsim Jamshed switch (level) { 19276404edcSAsim Jamshed case SOL_SOCKET: 19376404edcSAsim Jamshed socket = &mtcp->smap[sockid]; 19476404edcSAsim Jamshed if (socket->socktype == MOS_SOCK_UNUSED) { 19576404edcSAsim Jamshed TRACE_API("Invalid socket id: %d\n", sockid); 19676404edcSAsim Jamshed errno = EBADF; 19776404edcSAsim Jamshed return -1; 19876404edcSAsim Jamshed } 19976404edcSAsim Jamshed 20076404edcSAsim Jamshed if (socket->socktype != MOS_SOCK_STREAM_LISTEN && 20176404edcSAsim Jamshed socket->socktype != MOS_SOCK_STREAM) { 20276404edcSAsim Jamshed TRACE_API("Invalid socket id: %d\n", sockid); 20376404edcSAsim Jamshed errno = ENOTSOCK; 20476404edcSAsim Jamshed return -1; 20576404edcSAsim Jamshed } 20676404edcSAsim Jamshed 20776404edcSAsim Jamshed if (optname == SO_ERROR) { 20876404edcSAsim Jamshed if (socket->socktype == MOS_SOCK_STREAM) { 20976404edcSAsim Jamshed return GetSocketError(socket, optval, optlen); 21076404edcSAsim Jamshed } 21176404edcSAsim Jamshed } 21276404edcSAsim Jamshed break; 21376404edcSAsim Jamshed case SOL_MONSOCKET: 21476404edcSAsim Jamshed /* check if the calling thread is in MOS context */ 21576404edcSAsim Jamshed if (mtcp->ctx->thread != pthread_self()) { 21676404edcSAsim Jamshed errno = EPERM; 21776404edcSAsim Jamshed return -1; 21876404edcSAsim Jamshed } 21976404edcSAsim Jamshed /* 22076404edcSAsim Jamshed * All options will only work for active 22176404edcSAsim Jamshed * monitor stream sockets 22276404edcSAsim Jamshed */ 22376404edcSAsim Jamshed socket = &mtcp->msmap[sockid]; 22476404edcSAsim Jamshed if (socket->socktype != MOS_SOCK_MONITOR_STREAM_ACTIVE) { 22576404edcSAsim Jamshed TRACE_API("Invalid socket id: %d\n", sockid); 22676404edcSAsim Jamshed errno = ENOTSOCK; 22776404edcSAsim Jamshed return -1; 22876404edcSAsim Jamshed } 22976404edcSAsim Jamshed 23076404edcSAsim Jamshed switch (optname) { 23176404edcSAsim Jamshed case MOS_FRAGINFO_CLIBUF: 23276404edcSAsim Jamshed return GetFragInfo(socket, MOS_SIDE_CLI, optval, optlen); 23376404edcSAsim Jamshed case MOS_FRAGINFO_SVRBUF: 23476404edcSAsim Jamshed return GetFragInfo(socket, MOS_SIDE_SVR, optval, optlen); 23576404edcSAsim Jamshed case MOS_INFO_CLIBUF: 23676404edcSAsim Jamshed return GetBufInfo(socket, MOS_SIDE_CLI, optval, optlen); 23776404edcSAsim Jamshed case MOS_INFO_SVRBUF: 23876404edcSAsim Jamshed return GetBufInfo(socket, MOS_SIDE_SVR, optval, optlen); 23976404edcSAsim Jamshed case MOS_TCP_STATE_CLI: 24076404edcSAsim Jamshed return GetTCPState(socket->monitor_stream->stream, MOS_SIDE_CLI, 24176404edcSAsim Jamshed optval, optlen); 24276404edcSAsim Jamshed case MOS_TCP_STATE_SVR: 24376404edcSAsim Jamshed return GetTCPState(socket->monitor_stream->stream, MOS_SIDE_SVR, 24476404edcSAsim Jamshed optval, optlen); 24576404edcSAsim Jamshed case MOS_TIMESTAMP: 24676404edcSAsim Jamshed return GetLastTimestamp(socket->monitor_stream->stream, 24776404edcSAsim Jamshed (uint32_t *)optval, 24876404edcSAsim Jamshed optlen); 24976404edcSAsim Jamshed default: 25076404edcSAsim Jamshed TRACE_API("can't recognize the optname=%d\n", optname); 25176404edcSAsim Jamshed assert(0); 25276404edcSAsim Jamshed } 25376404edcSAsim Jamshed break; 25476404edcSAsim Jamshed } 25576404edcSAsim Jamshed errno = ENOSYS; 25676404edcSAsim Jamshed return -1; 25776404edcSAsim Jamshed } 25876404edcSAsim Jamshed /*----------------------------------------------------------------------------*/ 25976404edcSAsim Jamshed int 26076404edcSAsim Jamshed mtcp_setsockopt(mctx_t mctx, int sockid, int level, 26176404edcSAsim Jamshed int optname, const void *optval, socklen_t optlen) 26276404edcSAsim Jamshed { 26376404edcSAsim Jamshed mtcp_manager_t mtcp; 26476404edcSAsim Jamshed socket_map_t socket; 26576404edcSAsim Jamshed tcprb_t *rb; 26676404edcSAsim Jamshed 26776404edcSAsim Jamshed mtcp = GetMTCPManager(mctx); 26876404edcSAsim Jamshed if (!mtcp) { 26976404edcSAsim Jamshed errno = EACCES; 27076404edcSAsim Jamshed return -1; 27176404edcSAsim Jamshed } 27276404edcSAsim Jamshed 27376404edcSAsim Jamshed if (sockid < 0 || sockid >= g_config.mos->max_concurrency) { 27476404edcSAsim Jamshed TRACE_API("Socket id %d out of range.\n", sockid); 27576404edcSAsim Jamshed errno = EBADF; 27676404edcSAsim Jamshed return -1; 27776404edcSAsim Jamshed } 27876404edcSAsim Jamshed 27976404edcSAsim Jamshed switch (level) { 28076404edcSAsim Jamshed case SOL_SOCKET: 28176404edcSAsim Jamshed socket = &mtcp->smap[sockid]; 28276404edcSAsim Jamshed if (socket->socktype == MOS_SOCK_UNUSED) { 28376404edcSAsim Jamshed TRACE_API("Invalid socket id: %d\n", sockid); 28476404edcSAsim Jamshed errno = EBADF; 28576404edcSAsim Jamshed return -1; 28676404edcSAsim Jamshed } 28776404edcSAsim Jamshed 28876404edcSAsim Jamshed if (socket->socktype != MOS_SOCK_STREAM_LISTEN && 28976404edcSAsim Jamshed socket->socktype != MOS_SOCK_STREAM) { 29076404edcSAsim Jamshed TRACE_API("Invalid socket id: %d\n", sockid); 29176404edcSAsim Jamshed errno = ENOTSOCK; 29276404edcSAsim Jamshed return -1; 29376404edcSAsim Jamshed } 29476404edcSAsim Jamshed break; 29576404edcSAsim Jamshed case SOL_MONSOCKET: 29676404edcSAsim Jamshed socket = &mtcp->msmap[sockid]; 29776404edcSAsim Jamshed /* 29876404edcSAsim Jamshed * checking of calling thread to be in MOS context is 29976404edcSAsim Jamshed * disabled since both optnames can be called from 30076404edcSAsim Jamshed * `application' context (on passive sockets) 30176404edcSAsim Jamshed */ 30276404edcSAsim Jamshed /* 30376404edcSAsim Jamshed * if (mtcp->ctx->thread != pthread_self()) 30476404edcSAsim Jamshed * return -1; 30576404edcSAsim Jamshed */ 30676404edcSAsim Jamshed 30776404edcSAsim Jamshed switch (optname) { 30876404edcSAsim Jamshed case MOS_CLIBUF: 30976404edcSAsim Jamshed #if 0 31076404edcSAsim Jamshed if (socket->socktype != MOS_SOCK_MONITOR_STREAM_ACTIVE) { 31176404edcSAsim Jamshed errno = EBADF; 31276404edcSAsim Jamshed return -1; 31376404edcSAsim Jamshed } 31476404edcSAsim Jamshed #endif 31576404edcSAsim Jamshed #ifdef DISABLE_DYN_RESIZE 31676404edcSAsim Jamshed if (*(int *)optval != 0) 31776404edcSAsim Jamshed return -1; 31876404edcSAsim Jamshed if (socket->socktype == MOS_SOCK_MONITOR_STREAM_ACTIVE) { 31976404edcSAsim Jamshed rb = (socket->monitor_stream->stream->side == MOS_SIDE_CLI) ? 32076404edcSAsim Jamshed socket->monitor_stream->stream->rcvvar->rcvbuf : 32176404edcSAsim Jamshed socket->monitor_stream->stream->pair_stream->rcvvar->rcvbuf; 32276404edcSAsim Jamshed if (rb) { 32376404edcSAsim Jamshed tcprb_resize_meta(rb, 0); 32476404edcSAsim Jamshed tcprb_resize(rb, 0); 32576404edcSAsim Jamshed } 32676404edcSAsim Jamshed } 32776404edcSAsim Jamshed return DisableBuf(socket, MOS_SIDE_CLI); 32876404edcSAsim Jamshed #else 32976404edcSAsim Jamshed rb = (socket->monitor_stream->stream->side == MOS_SIDE_CLI) ? 33076404edcSAsim Jamshed socket->monitor_stream->stream->rcvvar->rcvbuf : 33176404edcSAsim Jamshed socket->monitor_stream->stream->pair_stream->rcvvar->rcvbuf; 33276404edcSAsim Jamshed if (tcprb_resize_meta(rb, *(int *)optval) < 0) 33376404edcSAsim Jamshed return -1; 33476404edcSAsim Jamshed return tcprb_resize(rb, 33576404edcSAsim Jamshed (((int)rb->metalen - 1) / UNITBUFSIZE + 1) * UNITBUFSIZE); 33676404edcSAsim Jamshed #endif 33776404edcSAsim Jamshed case MOS_SVRBUF: 33876404edcSAsim Jamshed #if 0 33976404edcSAsim Jamshed if (socket->socktype != MOS_SOCK_MONITOR_STREAM_ACTIVE) { 34076404edcSAsim Jamshed errno = EBADF; 34176404edcSAsim Jamshed return -1; 34276404edcSAsim Jamshed } 34376404edcSAsim Jamshed #endif 34476404edcSAsim Jamshed #ifdef DISABLE_DYN_RESIZE 34576404edcSAsim Jamshed if (*(int *)optval != 0) 34676404edcSAsim Jamshed return -1; 34776404edcSAsim Jamshed if (socket->socktype == MOS_SOCK_MONITOR_STREAM_ACTIVE) { 34876404edcSAsim Jamshed rb = (socket->monitor_stream->stream->side == MOS_SIDE_SVR) ? 34976404edcSAsim Jamshed socket->monitor_stream->stream->rcvvar->rcvbuf : 35076404edcSAsim Jamshed socket->monitor_stream->stream->pair_stream->rcvvar->rcvbuf; 35176404edcSAsim Jamshed if (rb) { 35276404edcSAsim Jamshed tcprb_resize_meta(rb, 0); 35376404edcSAsim Jamshed tcprb_resize(rb, 0); 35476404edcSAsim Jamshed } 35576404edcSAsim Jamshed } 35676404edcSAsim Jamshed return DisableBuf(socket, MOS_SIDE_SVR); 35776404edcSAsim Jamshed #else 35876404edcSAsim Jamshed rb = (socket->monitor_stream->stream->side == MOS_SIDE_SVR) ? 35976404edcSAsim Jamshed socket->monitor_stream->stream->rcvvar->rcvbuf : 36076404edcSAsim Jamshed socket->monitor_stream->stream->pair_stream->rcvvar->rcvbuf; 36176404edcSAsim Jamshed if (tcprb_resize_meta(rb, *(int *)optval) < 0) 36276404edcSAsim Jamshed return -1; 36376404edcSAsim Jamshed return tcprb_resize(rb, 36476404edcSAsim Jamshed (((int)rb->metalen - 1) / UNITBUFSIZE + 1) * UNITBUFSIZE); 36576404edcSAsim Jamshed #endif 36676404edcSAsim Jamshed case MOS_FRAG_CLIBUF: 36776404edcSAsim Jamshed #if 0 36876404edcSAsim Jamshed if (socket->socktype != MOS_SOCK_MONITOR_STREAM_ACTIVE) { 36976404edcSAsim Jamshed errno = EBADF; 37076404edcSAsim Jamshed return -1; 37176404edcSAsim Jamshed } 37276404edcSAsim Jamshed #endif 37376404edcSAsim Jamshed #ifdef DISABLE_DYN_RESIZE 37476404edcSAsim Jamshed if (*(int *)optval != 0) 37576404edcSAsim Jamshed return -1; 37676404edcSAsim Jamshed if (socket->socktype == MOS_SOCK_MONITOR_STREAM_ACTIVE) { 37776404edcSAsim Jamshed rb = (socket->monitor_stream->stream->side == MOS_SIDE_CLI) ? 37876404edcSAsim Jamshed socket->monitor_stream->stream->rcvvar->rcvbuf : 37976404edcSAsim Jamshed socket->monitor_stream->stream->pair_stream->rcvvar->rcvbuf; 38076404edcSAsim Jamshed if (rb) 38176404edcSAsim Jamshed tcprb_resize(rb, 0); 38276404edcSAsim Jamshed } 38376404edcSAsim Jamshed return 0; 38476404edcSAsim Jamshed #else 38576404edcSAsim Jamshed rb = (socket->monitor_stream->stream->side == MOS_SIDE_CLI) ? 38676404edcSAsim Jamshed socket->monitor_stream->stream->rcvvar->rcvbuf : 38776404edcSAsim Jamshed socket->monitor_stream->stream->pair_stream->rcvvar->rcvbuf; 38876404edcSAsim Jamshed if (rb->len == 0) 38976404edcSAsim Jamshed return tcprb_resize_meta(rb, *(int *)optval); 39076404edcSAsim Jamshed else 39176404edcSAsim Jamshed return -1; 39276404edcSAsim Jamshed #endif 39376404edcSAsim Jamshed case MOS_FRAG_SVRBUF: 39476404edcSAsim Jamshed #if 0 39576404edcSAsim Jamshed if (socket->socktype != MOS_SOCK_MONITOR_STREAM_ACTIVE) { 39676404edcSAsim Jamshed errno = EBADF; 39776404edcSAsim Jamshed return -1; 39876404edcSAsim Jamshed } 39976404edcSAsim Jamshed #endif 40076404edcSAsim Jamshed #ifdef DISABLE_DYN_RESIZE 40176404edcSAsim Jamshed if (*(int *)optval != 0) 40276404edcSAsim Jamshed return -1; 40376404edcSAsim Jamshed if (socket->socktype == MOS_SOCK_MONITOR_STREAM_ACTIVE) { 40476404edcSAsim Jamshed rb = (socket->monitor_stream->stream->side == MOS_SIDE_SVR) ? 40576404edcSAsim Jamshed socket->monitor_stream->stream->rcvvar->rcvbuf : 40676404edcSAsim Jamshed socket->monitor_stream->stream->pair_stream->rcvvar->rcvbuf; 40776404edcSAsim Jamshed if (rb) 40876404edcSAsim Jamshed tcprb_resize(rb, 0); 40976404edcSAsim Jamshed } 41076404edcSAsim Jamshed return 0; 41176404edcSAsim Jamshed #else 41276404edcSAsim Jamshed rb = (socket->monitor_stream->stream->side == MOS_SIDE_SVR) ? 41376404edcSAsim Jamshed socket->monitor_stream->stream->rcvvar->rcvbuf : 41476404edcSAsim Jamshed socket->monitor_stream->stream->pair_stream->rcvvar->rcvbuf; 41576404edcSAsim Jamshed if (rb->len == 0) 41676404edcSAsim Jamshed return tcprb_resize_meta(rb, *(int *)optval); 41776404edcSAsim Jamshed else 41876404edcSAsim Jamshed return -1; 41976404edcSAsim Jamshed #endif 42076404edcSAsim Jamshed case MOS_MONLEVEL: 42176404edcSAsim Jamshed #ifdef OLD_API 42276404edcSAsim Jamshed assert(*(int *)optval == MOS_NO_CLIBUF || 42376404edcSAsim Jamshed *(int *)optval == MOS_NO_SVRBUF); 42476404edcSAsim Jamshed return DisableBuf(socket, 42576404edcSAsim Jamshed (*(int *)optval == MOS_NO_CLIBUF) ? 42676404edcSAsim Jamshed MOS_SIDE_CLI : MOS_SIDE_SVR); 42776404edcSAsim Jamshed #endif 42876404edcSAsim Jamshed case MOS_SEQ_REMAP: 42976404edcSAsim Jamshed return TcpSeqChange(socket, 43076404edcSAsim Jamshed (uint32_t)((seq_remap_info *)optval)->seq_off, 43176404edcSAsim Jamshed ((seq_remap_info *)optval)->side, 43276404edcSAsim Jamshed mtcp->pctx->p.seq); 43376404edcSAsim Jamshed case MOS_STOP_MON: 43476404edcSAsim Jamshed return mtcp_cb_stop(mctx, sockid, *(int *)optval); 43576404edcSAsim Jamshed default: 43676404edcSAsim Jamshed TRACE_API("invalid optname=%d\n", optname); 43776404edcSAsim Jamshed assert(0); 43876404edcSAsim Jamshed } 43976404edcSAsim Jamshed break; 44076404edcSAsim Jamshed } 44176404edcSAsim Jamshed 44276404edcSAsim Jamshed errno = ENOSYS; 44376404edcSAsim Jamshed return -1; 44476404edcSAsim Jamshed } 44576404edcSAsim Jamshed /*----------------------------------------------------------------------------*/ 44676404edcSAsim Jamshed int 44776404edcSAsim Jamshed mtcp_setsock_nonblock(mctx_t mctx, int sockid) 44876404edcSAsim Jamshed { 44976404edcSAsim Jamshed mtcp_manager_t mtcp; 45076404edcSAsim Jamshed 45176404edcSAsim Jamshed mtcp = GetMTCPManager(mctx); 45276404edcSAsim Jamshed if (!mtcp) { 45376404edcSAsim Jamshed errno = EACCES; 45476404edcSAsim Jamshed return -1; 45576404edcSAsim Jamshed } 45676404edcSAsim Jamshed 45776404edcSAsim Jamshed if (sockid < 0 || sockid >= g_config.mos->max_concurrency) { 45876404edcSAsim Jamshed TRACE_API("Socket id %d out of range.\n", sockid); 45976404edcSAsim Jamshed errno = EBADF; 46076404edcSAsim Jamshed return -1; 46176404edcSAsim Jamshed } 46276404edcSAsim Jamshed 46376404edcSAsim Jamshed if (mtcp->smap[sockid].socktype == MOS_SOCK_UNUSED) { 46476404edcSAsim Jamshed TRACE_API("Invalid socket id: %d\n", sockid); 46576404edcSAsim Jamshed errno = EBADF; 46676404edcSAsim Jamshed return -1; 46776404edcSAsim Jamshed } 46876404edcSAsim Jamshed 46976404edcSAsim Jamshed mtcp->smap[sockid].opts |= MTCP_NONBLOCK; 47076404edcSAsim Jamshed 47176404edcSAsim Jamshed return 0; 47276404edcSAsim Jamshed } 47376404edcSAsim Jamshed /*----------------------------------------------------------------------------*/ 47476404edcSAsim Jamshed int 47576404edcSAsim Jamshed mtcp_ioctl(mctx_t mctx, int sockid, int request, void *argp) 47676404edcSAsim Jamshed { 47776404edcSAsim Jamshed mtcp_manager_t mtcp; 47876404edcSAsim Jamshed socket_map_t socket; 47976404edcSAsim Jamshed 48076404edcSAsim Jamshed mtcp = GetMTCPManager(mctx); 48176404edcSAsim Jamshed if (!mtcp) { 48276404edcSAsim Jamshed errno = EACCES; 48376404edcSAsim Jamshed return -1; 48476404edcSAsim Jamshed } 48576404edcSAsim Jamshed 48676404edcSAsim Jamshed if (sockid < 0 || sockid >= g_config.mos->max_concurrency) { 48776404edcSAsim Jamshed TRACE_API("Socket id %d out of range.\n", sockid); 48876404edcSAsim Jamshed errno = EBADF; 48976404edcSAsim Jamshed return -1; 49076404edcSAsim Jamshed } 49176404edcSAsim Jamshed 49276404edcSAsim Jamshed /* only support stream socket */ 49376404edcSAsim Jamshed socket = &mtcp->smap[sockid]; 49476404edcSAsim Jamshed 49576404edcSAsim Jamshed if (socket->socktype != MOS_SOCK_STREAM_LISTEN && 49676404edcSAsim Jamshed socket->socktype != MOS_SOCK_STREAM) { 49776404edcSAsim Jamshed TRACE_API("Invalid socket id: %d\n", sockid); 49876404edcSAsim Jamshed errno = EBADF; 49976404edcSAsim Jamshed return -1; 50076404edcSAsim Jamshed } 50176404edcSAsim Jamshed 50276404edcSAsim Jamshed if (!argp) { 50376404edcSAsim Jamshed errno = EFAULT; 50476404edcSAsim Jamshed return -1; 50576404edcSAsim Jamshed } 50676404edcSAsim Jamshed 50776404edcSAsim Jamshed if (request == FIONREAD) { 50876404edcSAsim Jamshed tcp_stream *cur_stream; 50976404edcSAsim Jamshed tcprb_t *rbuf; 51076404edcSAsim Jamshed 51176404edcSAsim Jamshed cur_stream = socket->stream; 51276404edcSAsim Jamshed if (!cur_stream) { 51376404edcSAsim Jamshed errno = EBADF; 51476404edcSAsim Jamshed return -1; 51576404edcSAsim Jamshed } 51676404edcSAsim Jamshed 51776404edcSAsim Jamshed rbuf = cur_stream->rcvvar->rcvbuf; 51876404edcSAsim Jamshed *(int *)argp = (rbuf) ? tcprb_cflen(rbuf) : 0; 51976404edcSAsim Jamshed 52076404edcSAsim Jamshed } else if (request == FIONBIO) { 52176404edcSAsim Jamshed /* 52276404edcSAsim Jamshed * sockets can only be set to blocking/non-blocking 52376404edcSAsim Jamshed * modes during initialization 52476404edcSAsim Jamshed */ 52576404edcSAsim Jamshed if ((*(int *)argp)) 52676404edcSAsim Jamshed mtcp->smap[sockid].opts |= MTCP_NONBLOCK; 52776404edcSAsim Jamshed else 52876404edcSAsim Jamshed mtcp->smap[sockid].opts &= ~MTCP_NONBLOCK; 52976404edcSAsim Jamshed } else { 53076404edcSAsim Jamshed errno = EINVAL; 53176404edcSAsim Jamshed return -1; 53276404edcSAsim Jamshed } 53376404edcSAsim Jamshed 53476404edcSAsim Jamshed return 0; 53576404edcSAsim Jamshed } 53676404edcSAsim Jamshed /*----------------------------------------------------------------------------*/ 53776404edcSAsim Jamshed static int 53876404edcSAsim Jamshed mtcp_monitor(mctx_t mctx, socket_map_t sock) 53976404edcSAsim Jamshed { 54076404edcSAsim Jamshed mtcp_manager_t mtcp; 54176404edcSAsim Jamshed struct mon_listener *monitor; 54276404edcSAsim Jamshed int sockid = sock->id; 54376404edcSAsim Jamshed 54476404edcSAsim Jamshed mtcp = GetMTCPManager(mctx); 54576404edcSAsim Jamshed if (!mtcp) { 54676404edcSAsim Jamshed errno = EACCES; 54776404edcSAsim Jamshed return -1; 54876404edcSAsim Jamshed } 54976404edcSAsim Jamshed 55076404edcSAsim Jamshed if (sockid < 0 || sockid >= g_config.mos->max_concurrency) { 55176404edcSAsim Jamshed TRACE_API("Socket id %d out of range.\n", sockid); 55276404edcSAsim Jamshed errno = EBADF; 55376404edcSAsim Jamshed return -1; 55476404edcSAsim Jamshed } 55576404edcSAsim Jamshed 55676404edcSAsim Jamshed if (mtcp->msmap[sockid].socktype == MOS_SOCK_UNUSED) { 55776404edcSAsim Jamshed TRACE_API("Invalid socket id: %d\n", sockid); 55876404edcSAsim Jamshed errno = EBADF; 55976404edcSAsim Jamshed return -1; 56076404edcSAsim Jamshed } 56176404edcSAsim Jamshed 56276404edcSAsim Jamshed if (!(mtcp->msmap[sockid].socktype == MOS_SOCK_MONITOR_STREAM || 56376404edcSAsim Jamshed mtcp->msmap[sockid].socktype == MOS_SOCK_MONITOR_RAW)) { 56476404edcSAsim Jamshed TRACE_API("Not a monitor socket. id: %d\n", sockid); 56576404edcSAsim Jamshed errno = ENOTSOCK; 56676404edcSAsim Jamshed return -1; 56776404edcSAsim Jamshed } 56876404edcSAsim Jamshed 56976404edcSAsim Jamshed monitor = (struct mon_listener *)calloc(1, sizeof(struct mon_listener)); 57076404edcSAsim Jamshed if (!monitor) { 57176404edcSAsim Jamshed /* errno set from the malloc() */ 57276404edcSAsim Jamshed errno = ENOMEM; 57376404edcSAsim Jamshed return -1; 57476404edcSAsim Jamshed } 57576404edcSAsim Jamshed 57676404edcSAsim Jamshed /* create a monitor-specific event queue */ 57776404edcSAsim Jamshed monitor->eq = CreateEventQueue(g_config.mos->max_concurrency); 57876404edcSAsim Jamshed if (!monitor->eq) { 57976404edcSAsim Jamshed TRACE_API("Can't create event queue (concurrency: %d) for " 58076404edcSAsim Jamshed "monitor read event registrations!\n", 58176404edcSAsim Jamshed g_config.mos->max_concurrency); 58276404edcSAsim Jamshed free(monitor); 58376404edcSAsim Jamshed errno = ENOMEM; 58476404edcSAsim Jamshed return -1; 58576404edcSAsim Jamshed } 58676404edcSAsim Jamshed 58776404edcSAsim Jamshed /* set monitor-related basic parameters */ 58876404edcSAsim Jamshed #ifndef NEWEV 58976404edcSAsim Jamshed monitor->ude_id = UDE_OFFSET; 59076404edcSAsim Jamshed #endif 59176404edcSAsim Jamshed monitor->socket = sock; 59276404edcSAsim Jamshed monitor->client_buf_mgmt = monitor->server_buf_mgmt = BUFMGMT_FULL; 59376404edcSAsim Jamshed 59476404edcSAsim Jamshed /* perform both sides monitoring by default */ 59576404edcSAsim Jamshed monitor->client_mon = monitor->server_mon = 1; 59676404edcSAsim Jamshed 59776404edcSAsim Jamshed /* add monitor socket to the monitor list */ 59876404edcSAsim Jamshed TAILQ_INSERT_TAIL(&mtcp->monitors, monitor, link); 59976404edcSAsim Jamshed 60076404edcSAsim Jamshed mtcp->msmap[sockid].monitor_listener = monitor; 60176404edcSAsim Jamshed 60276404edcSAsim Jamshed return 0; 60376404edcSAsim Jamshed } 60476404edcSAsim Jamshed /*----------------------------------------------------------------------------*/ 60576404edcSAsim Jamshed int 60676404edcSAsim Jamshed mtcp_socket(mctx_t mctx, int domain, int type, int protocol) 60776404edcSAsim Jamshed { 60876404edcSAsim Jamshed mtcp_manager_t mtcp; 60976404edcSAsim Jamshed socket_map_t socket; 61076404edcSAsim Jamshed 61176404edcSAsim Jamshed mtcp = GetMTCPManager(mctx); 61276404edcSAsim Jamshed if (!mtcp) { 61376404edcSAsim Jamshed errno = EACCES; 61476404edcSAsim Jamshed return -1; 61576404edcSAsim Jamshed } 61676404edcSAsim Jamshed 61776404edcSAsim Jamshed if (domain != AF_INET) { 61876404edcSAsim Jamshed errno = EAFNOSUPPORT; 61976404edcSAsim Jamshed return -1; 62076404edcSAsim Jamshed } 62176404edcSAsim Jamshed 62276404edcSAsim Jamshed if (type == SOCK_STREAM) { 62376404edcSAsim Jamshed type = MOS_SOCK_STREAM; 62476404edcSAsim Jamshed } else if (type == MOS_SOCK_MONITOR_STREAM || 62576404edcSAsim Jamshed type == MOS_SOCK_MONITOR_RAW) { 62676404edcSAsim Jamshed /* do nothing for the time being */ 62776404edcSAsim Jamshed } else { 62876404edcSAsim Jamshed /* Not supported type */ 62976404edcSAsim Jamshed errno = EINVAL; 63076404edcSAsim Jamshed return -1; 63176404edcSAsim Jamshed } 63276404edcSAsim Jamshed 63376404edcSAsim Jamshed socket = AllocateSocket(mctx, type); 63476404edcSAsim Jamshed if (!socket) { 63576404edcSAsim Jamshed errno = ENFILE; 63676404edcSAsim Jamshed return -1; 63776404edcSAsim Jamshed } 63876404edcSAsim Jamshed 63976404edcSAsim Jamshed if (type == MOS_SOCK_MONITOR_STREAM || 64076404edcSAsim Jamshed type == MOS_SOCK_MONITOR_RAW) { 64176404edcSAsim Jamshed mtcp_manager_t mtcp = GetMTCPManager(mctx); 64276404edcSAsim Jamshed if (!mtcp) { 64376404edcSAsim Jamshed errno = EACCES; 64476404edcSAsim Jamshed return -1; 64576404edcSAsim Jamshed } 64676404edcSAsim Jamshed mtcp_monitor(mctx, socket); 64776404edcSAsim Jamshed #ifdef NEWEV 64876404edcSAsim Jamshed socket->monitor_listener->stree_dontcare = NULL; 64976404edcSAsim Jamshed socket->monitor_listener->stree_pre_rcv = NULL; 65076404edcSAsim Jamshed socket->monitor_listener->stree_post_snd = NULL; 65176404edcSAsim Jamshed #else 65276404edcSAsim Jamshed InitEvB(mtcp, &socket->monitor_listener->dontcare_evb); 65376404edcSAsim Jamshed InitEvB(mtcp, &socket->monitor_listener->pre_tcp_evb); 65476404edcSAsim Jamshed InitEvB(mtcp, &socket->monitor_listener->post_tcp_evb); 65576404edcSAsim Jamshed #endif 65676404edcSAsim Jamshed } 65776404edcSAsim Jamshed 65876404edcSAsim Jamshed return socket->id; 65976404edcSAsim Jamshed } 66076404edcSAsim Jamshed /*----------------------------------------------------------------------------*/ 66176404edcSAsim Jamshed int 66276404edcSAsim Jamshed mtcp_bind(mctx_t mctx, int sockid, 66376404edcSAsim Jamshed const struct sockaddr *addr, socklen_t addrlen) 66476404edcSAsim Jamshed { 66576404edcSAsim Jamshed mtcp_manager_t mtcp; 66676404edcSAsim Jamshed struct sockaddr_in *addr_in; 66776404edcSAsim Jamshed 66876404edcSAsim Jamshed mtcp = GetMTCPManager(mctx); 66976404edcSAsim Jamshed if (!mtcp) { 67076404edcSAsim Jamshed errno = EACCES; 67176404edcSAsim Jamshed return -1; 67276404edcSAsim Jamshed } 67376404edcSAsim Jamshed 67476404edcSAsim Jamshed if (sockid < 0 || sockid >= g_config.mos->max_concurrency) { 67576404edcSAsim Jamshed TRACE_API("Socket id %d out of range.\n", sockid); 67676404edcSAsim Jamshed errno = EBADF; 67776404edcSAsim Jamshed return -1; 67876404edcSAsim Jamshed } 67976404edcSAsim Jamshed 68076404edcSAsim Jamshed if (mtcp->smap[sockid].socktype == MOS_SOCK_UNUSED) { 68176404edcSAsim Jamshed TRACE_API("Invalid socket id: %d\n", sockid); 68276404edcSAsim Jamshed errno = EBADF; 68376404edcSAsim Jamshed return -1; 68476404edcSAsim Jamshed } 68576404edcSAsim Jamshed 68676404edcSAsim Jamshed if (mtcp->smap[sockid].socktype != MOS_SOCK_STREAM && 68776404edcSAsim Jamshed mtcp->smap[sockid].socktype != MOS_SOCK_STREAM_LISTEN) { 68876404edcSAsim Jamshed TRACE_API("Not a stream socket id: %d\n", sockid); 68976404edcSAsim Jamshed errno = ENOTSOCK; 69076404edcSAsim Jamshed return -1; 69176404edcSAsim Jamshed } 69276404edcSAsim Jamshed 69376404edcSAsim Jamshed if (!addr) { 69476404edcSAsim Jamshed TRACE_API("Socket %d: empty address!\n", sockid); 69576404edcSAsim Jamshed errno = EINVAL; 69676404edcSAsim Jamshed return -1; 69776404edcSAsim Jamshed } 69876404edcSAsim Jamshed 69976404edcSAsim Jamshed if (mtcp->smap[sockid].opts & MTCP_ADDR_BIND) { 70076404edcSAsim Jamshed TRACE_API("Socket %d: adress already bind for this socket.\n", sockid); 70176404edcSAsim Jamshed errno = EINVAL; 70276404edcSAsim Jamshed return -1; 70376404edcSAsim Jamshed } 70476404edcSAsim Jamshed 70576404edcSAsim Jamshed /* we only allow bind() for AF_INET address */ 70676404edcSAsim Jamshed if (addr->sa_family != AF_INET || addrlen < sizeof(struct sockaddr_in)) { 70776404edcSAsim Jamshed TRACE_API("Socket %d: invalid argument!\n", sockid); 70876404edcSAsim Jamshed errno = EINVAL; 70976404edcSAsim Jamshed return -1; 71076404edcSAsim Jamshed } 71176404edcSAsim Jamshed 71276404edcSAsim Jamshed if (mtcp->listener) { 71376404edcSAsim Jamshed TRACE_API("Address already bound!\n"); 71476404edcSAsim Jamshed errno = EINVAL; 71576404edcSAsim Jamshed return -1; 71676404edcSAsim Jamshed } 71776404edcSAsim Jamshed addr_in = (struct sockaddr_in *)addr; 71876404edcSAsim Jamshed mtcp->smap[sockid].saddr = *addr_in; 71976404edcSAsim Jamshed mtcp->smap[sockid].opts |= MTCP_ADDR_BIND; 72076404edcSAsim Jamshed 72176404edcSAsim Jamshed return 0; 72276404edcSAsim Jamshed } 72376404edcSAsim Jamshed /*----------------------------------------------------------------------------*/ 72476404edcSAsim Jamshed int 72576404edcSAsim Jamshed mtcp_listen(mctx_t mctx, int sockid, int backlog) 72676404edcSAsim Jamshed { 72776404edcSAsim Jamshed mtcp_manager_t mtcp; 72876404edcSAsim Jamshed struct tcp_listener *listener; 72976404edcSAsim Jamshed 73076404edcSAsim Jamshed mtcp = GetMTCPManager(mctx); 73176404edcSAsim Jamshed if (!mtcp) { 73276404edcSAsim Jamshed errno = EACCES; 73376404edcSAsim Jamshed return -1; 73476404edcSAsim Jamshed } 73576404edcSAsim Jamshed 73676404edcSAsim Jamshed if (sockid < 0 || sockid >= g_config.mos->max_concurrency) { 73776404edcSAsim Jamshed TRACE_API("Socket id %d out of range.\n", sockid); 73876404edcSAsim Jamshed errno = EBADF; 73976404edcSAsim Jamshed return -1; 74076404edcSAsim Jamshed } 74176404edcSAsim Jamshed 74276404edcSAsim Jamshed if (mtcp->smap[sockid].socktype == MOS_SOCK_UNUSED) { 74376404edcSAsim Jamshed TRACE_API("Invalid socket id: %d\n", sockid); 74476404edcSAsim Jamshed errno = EBADF; 74576404edcSAsim Jamshed return -1; 74676404edcSAsim Jamshed } 74776404edcSAsim Jamshed 74876404edcSAsim Jamshed if (mtcp->smap[sockid].socktype == MOS_SOCK_STREAM) { 74976404edcSAsim Jamshed mtcp->smap[sockid].socktype = MOS_SOCK_STREAM_LISTEN; 75076404edcSAsim Jamshed } 75176404edcSAsim Jamshed 75276404edcSAsim Jamshed if (mtcp->smap[sockid].socktype != MOS_SOCK_STREAM_LISTEN) { 75376404edcSAsim Jamshed TRACE_API("Not a listening socket. id: %d\n", sockid); 75476404edcSAsim Jamshed errno = ENOTSOCK; 75576404edcSAsim Jamshed return -1; 75676404edcSAsim Jamshed } 75776404edcSAsim Jamshed 75876404edcSAsim Jamshed if (backlog <= 0 || backlog > g_config.mos->max_concurrency) { 75976404edcSAsim Jamshed errno = EINVAL; 76076404edcSAsim Jamshed return -1; 76176404edcSAsim Jamshed } 76276404edcSAsim Jamshed 76376404edcSAsim Jamshed listener = (struct tcp_listener *)calloc(1, sizeof(struct tcp_listener)); 76476404edcSAsim Jamshed if (!listener) { 76576404edcSAsim Jamshed /* errno set from the malloc() */ 76676404edcSAsim Jamshed errno = ENOMEM; 76776404edcSAsim Jamshed return -1; 76876404edcSAsim Jamshed } 76976404edcSAsim Jamshed 77076404edcSAsim Jamshed listener->sockid = sockid; 77176404edcSAsim Jamshed listener->backlog = backlog; 77276404edcSAsim Jamshed listener->socket = &mtcp->smap[sockid]; 77376404edcSAsim Jamshed 77476404edcSAsim Jamshed if (pthread_cond_init(&listener->accept_cond, NULL)) { 77576404edcSAsim Jamshed perror("pthread_cond_init of ctx->accept_cond\n"); 77676404edcSAsim Jamshed /* errno set by pthread_cond_init() */ 77776404edcSAsim Jamshed return -1; 77876404edcSAsim Jamshed } 77976404edcSAsim Jamshed if (pthread_mutex_init(&listener->accept_lock, NULL)) { 78076404edcSAsim Jamshed perror("pthread_mutex_init of ctx->accept_lock\n"); 78176404edcSAsim Jamshed /* errno set by pthread_mutex_init() */ 78276404edcSAsim Jamshed return -1; 78376404edcSAsim Jamshed } 78476404edcSAsim Jamshed 78576404edcSAsim Jamshed listener->acceptq = CreateStreamQueue(backlog); 78676404edcSAsim Jamshed if (!listener->acceptq) { 78776404edcSAsim Jamshed errno = ENOMEM; 78876404edcSAsim Jamshed return -1; 78976404edcSAsim Jamshed } 79076404edcSAsim Jamshed 79176404edcSAsim Jamshed mtcp->smap[sockid].listener = listener; 79276404edcSAsim Jamshed mtcp->listener = listener; 79376404edcSAsim Jamshed 79476404edcSAsim Jamshed return 0; 79576404edcSAsim Jamshed } 79676404edcSAsim Jamshed /*----------------------------------------------------------------------------*/ 79776404edcSAsim Jamshed int 79876404edcSAsim Jamshed mtcp_accept(mctx_t mctx, int sockid, struct sockaddr *addr, socklen_t *addrlen) 79976404edcSAsim Jamshed { 80076404edcSAsim Jamshed mtcp_manager_t mtcp; 80176404edcSAsim Jamshed struct tcp_listener *listener; 80276404edcSAsim Jamshed socket_map_t socket; 80376404edcSAsim Jamshed tcp_stream *accepted = NULL; 80476404edcSAsim Jamshed 80576404edcSAsim Jamshed mtcp = GetMTCPManager(mctx); 80676404edcSAsim Jamshed if (!mtcp) { 80776404edcSAsim Jamshed errno = EACCES; 80876404edcSAsim Jamshed return -1; 80976404edcSAsim Jamshed } 81076404edcSAsim Jamshed 81176404edcSAsim Jamshed if (sockid < 0 || sockid >= g_config.mos->max_concurrency) { 81276404edcSAsim Jamshed TRACE_API("Socket id %d out of range.\n", sockid); 81376404edcSAsim Jamshed errno = EBADF; 81476404edcSAsim Jamshed return -1; 81576404edcSAsim Jamshed } 81676404edcSAsim Jamshed 81776404edcSAsim Jamshed /* requires listening socket */ 81876404edcSAsim Jamshed if (mtcp->smap[sockid].socktype != MOS_SOCK_STREAM_LISTEN) { 81976404edcSAsim Jamshed errno = EINVAL; 82076404edcSAsim Jamshed return -1; 82176404edcSAsim Jamshed } 82276404edcSAsim Jamshed 82376404edcSAsim Jamshed listener = mtcp->smap[sockid].listener; 82476404edcSAsim Jamshed 82576404edcSAsim Jamshed /* dequeue from the acceptq without lock first */ 82676404edcSAsim Jamshed /* if nothing there, acquire lock and cond_wait */ 82776404edcSAsim Jamshed accepted = StreamDequeue(listener->acceptq); 82876404edcSAsim Jamshed if (!accepted) { 82976404edcSAsim Jamshed if (listener->socket->opts & MTCP_NONBLOCK) { 83076404edcSAsim Jamshed errno = EAGAIN; 83176404edcSAsim Jamshed return -1; 83276404edcSAsim Jamshed 83376404edcSAsim Jamshed } else { 83476404edcSAsim Jamshed pthread_mutex_lock(&listener->accept_lock); 83576404edcSAsim Jamshed while ((accepted = StreamDequeue(listener->acceptq)) == NULL) { 83676404edcSAsim Jamshed pthread_cond_wait(&listener->accept_cond, &listener->accept_lock); 83776404edcSAsim Jamshed 83876404edcSAsim Jamshed if (mtcp->ctx->done || mtcp->ctx->exit) { 83976404edcSAsim Jamshed pthread_mutex_unlock(&listener->accept_lock); 84076404edcSAsim Jamshed errno = EINTR; 84176404edcSAsim Jamshed return -1; 84276404edcSAsim Jamshed } 84376404edcSAsim Jamshed } 84476404edcSAsim Jamshed pthread_mutex_unlock(&listener->accept_lock); 84576404edcSAsim Jamshed } 84676404edcSAsim Jamshed } 84776404edcSAsim Jamshed 84876404edcSAsim Jamshed if (!accepted) { 84976404edcSAsim Jamshed TRACE_ERROR("[NEVER HAPPEN] Empty accept queue!\n"); 85076404edcSAsim Jamshed } 85176404edcSAsim Jamshed 85276404edcSAsim Jamshed if (!accepted->socket) { 85376404edcSAsim Jamshed socket = AllocateSocket(mctx, MOS_SOCK_STREAM); 85476404edcSAsim Jamshed if (!socket) { 85576404edcSAsim Jamshed TRACE_ERROR("Failed to create new socket!\n"); 85676404edcSAsim Jamshed /* TODO: destroy the stream */ 85776404edcSAsim Jamshed errno = ENFILE; 85876404edcSAsim Jamshed return -1; 85976404edcSAsim Jamshed } 86076404edcSAsim Jamshed socket->stream = accepted; 86176404edcSAsim Jamshed accepted->socket = socket; 862a5e1a556SAsim Jamshed 863a5e1a556SAsim Jamshed /* set socket addr parameters */ 864a5e1a556SAsim Jamshed socket->saddr.sin_family = AF_INET; 865a5e1a556SAsim Jamshed socket->saddr.sin_port = accepted->dport; 866a5e1a556SAsim Jamshed socket->saddr.sin_addr.s_addr = accepted->daddr; 867a5e1a556SAsim Jamshed 86876404edcSAsim Jamshed /* if monitor is enabled, complete the socket assignment */ 86976404edcSAsim Jamshed if (socket->stream->pair_stream != NULL) 87076404edcSAsim Jamshed socket->stream->pair_stream->socket = socket; 87176404edcSAsim Jamshed } 87276404edcSAsim Jamshed 873a5e1a556SAsim Jamshed if (!(listener->socket->epoll & MOS_EPOLLET) && 874a5e1a556SAsim Jamshed !StreamQueueIsEmpty(listener->acceptq)) 875a5e1a556SAsim Jamshed AddEpollEvent(mtcp->ep, 876a5e1a556SAsim Jamshed USR_SHADOW_EVENT_QUEUE, 877a5e1a556SAsim Jamshed listener->socket, MOS_EPOLLIN); 878a5e1a556SAsim Jamshed 87976404edcSAsim Jamshed TRACE_API("Stream %d accepted.\n", accepted->id); 88076404edcSAsim Jamshed 88176404edcSAsim Jamshed if (addr && addrlen) { 88276404edcSAsim Jamshed struct sockaddr_in *addr_in = (struct sockaddr_in *)addr; 88376404edcSAsim Jamshed addr_in->sin_family = AF_INET; 88476404edcSAsim Jamshed addr_in->sin_port = accepted->dport; 88576404edcSAsim Jamshed addr_in->sin_addr.s_addr = accepted->daddr; 88676404edcSAsim Jamshed *addrlen = sizeof(struct sockaddr_in); 88776404edcSAsim Jamshed } 88876404edcSAsim Jamshed 88976404edcSAsim Jamshed return accepted->socket->id; 89076404edcSAsim Jamshed } 89176404edcSAsim Jamshed /*----------------------------------------------------------------------------*/ 89276404edcSAsim Jamshed int 89376404edcSAsim Jamshed mtcp_init_rss(mctx_t mctx, in_addr_t saddr_base, int num_addr, 89476404edcSAsim Jamshed in_addr_t daddr, in_addr_t dport) 89576404edcSAsim Jamshed { 89676404edcSAsim Jamshed mtcp_manager_t mtcp; 89776404edcSAsim Jamshed addr_pool_t ap; 89876404edcSAsim Jamshed 89976404edcSAsim Jamshed mtcp = GetMTCPManager(mctx); 90076404edcSAsim Jamshed if (!mtcp) { 90176404edcSAsim Jamshed errno = EACCES; 90276404edcSAsim Jamshed return -1; 90376404edcSAsim Jamshed } 90476404edcSAsim Jamshed 90576404edcSAsim Jamshed if (saddr_base == INADDR_ANY) { 90676404edcSAsim Jamshed int nif_out; 90776404edcSAsim Jamshed 90876404edcSAsim Jamshed /* for the INADDR_ANY, find the output interface for the destination 90976404edcSAsim Jamshed and set the saddr_base as the ip address of the output interface */ 91076404edcSAsim Jamshed nif_out = GetOutputInterface(daddr); 91176404edcSAsim Jamshed saddr_base = g_config.mos->netdev_table->ent[nif_out]->ip_addr; 91276404edcSAsim Jamshed } 91376404edcSAsim Jamshed 91476404edcSAsim Jamshed ap = CreateAddressPoolPerCore(mctx->cpu, num_cpus, 91576404edcSAsim Jamshed saddr_base, num_addr, daddr, dport); 91676404edcSAsim Jamshed if (!ap) { 91776404edcSAsim Jamshed errno = ENOMEM; 91876404edcSAsim Jamshed return -1; 91976404edcSAsim Jamshed } 92076404edcSAsim Jamshed 92176404edcSAsim Jamshed mtcp->ap = ap; 92276404edcSAsim Jamshed 92376404edcSAsim Jamshed return 0; 92476404edcSAsim Jamshed } 92576404edcSAsim Jamshed /*----------------------------------------------------------------------------*/ 92676404edcSAsim Jamshed int 92776404edcSAsim Jamshed eval_bpf_5tuple(struct sfbpf_program fcode, 92876404edcSAsim Jamshed in_addr_t saddr, in_port_t sport, 92976404edcSAsim Jamshed in_addr_t daddr, in_port_t dport) { 93076404edcSAsim Jamshed uint8_t buf[TOTAL_TCP_HEADER_LEN]; 93176404edcSAsim Jamshed struct ethhdr *ethh; 93276404edcSAsim Jamshed struct iphdr *iph; 93376404edcSAsim Jamshed struct tcphdr *tcph; 93476404edcSAsim Jamshed 93576404edcSAsim Jamshed ethh = (struct ethhdr *)buf; 93676404edcSAsim Jamshed ethh->h_proto = htons(ETH_P_IP); 93776404edcSAsim Jamshed iph = (struct iphdr *)(ethh + 1); 93876404edcSAsim Jamshed iph->ihl = IP_HEADER_LEN >> 2; 93976404edcSAsim Jamshed iph->version = 4; 94076404edcSAsim Jamshed iph->tos = 0; 94176404edcSAsim Jamshed iph->tot_len = htons(IP_HEADER_LEN + TCP_HEADER_LEN); 94276404edcSAsim Jamshed iph->id = htons(0); 94376404edcSAsim Jamshed iph->protocol = IPPROTO_TCP; 94476404edcSAsim Jamshed iph->saddr = saddr; 94576404edcSAsim Jamshed iph->daddr = daddr; 94676404edcSAsim Jamshed iph->check = 0; 94776404edcSAsim Jamshed tcph = (struct tcphdr *)(iph + 1); 94876404edcSAsim Jamshed tcph->source = sport; 94976404edcSAsim Jamshed tcph->dest = dport; 95076404edcSAsim Jamshed 95176404edcSAsim Jamshed return EVAL_BPFFILTER(fcode, (uint8_t *)iph - sizeof(struct ethhdr), 95276404edcSAsim Jamshed TOTAL_TCP_HEADER_LEN); 95376404edcSAsim Jamshed } 95476404edcSAsim Jamshed /*----------------------------------------------------------------------------*/ 95576404edcSAsim Jamshed int 95676404edcSAsim Jamshed mtcp_connect(mctx_t mctx, int sockid, 95776404edcSAsim Jamshed const struct sockaddr *addr, socklen_t addrlen) 95876404edcSAsim Jamshed { 95976404edcSAsim Jamshed mtcp_manager_t mtcp; 96076404edcSAsim Jamshed socket_map_t socket; 96176404edcSAsim Jamshed tcp_stream *cur_stream; 96276404edcSAsim Jamshed struct sockaddr_in *addr_in; 96376404edcSAsim Jamshed in_addr_t dip; 96476404edcSAsim Jamshed in_port_t dport; 96576404edcSAsim Jamshed int is_dyn_bound = FALSE; 96676404edcSAsim Jamshed int ret; 96776404edcSAsim Jamshed int cnt_match = 0; 96876404edcSAsim Jamshed struct mon_listener *walk; 96976404edcSAsim Jamshed struct sfbpf_program fcode; 97076404edcSAsim Jamshed 97176404edcSAsim Jamshed cur_stream = NULL; 97276404edcSAsim Jamshed mtcp = GetMTCPManager(mctx); 97376404edcSAsim Jamshed if (!mtcp) { 97476404edcSAsim Jamshed errno = EACCES; 97576404edcSAsim Jamshed return -1; 97676404edcSAsim Jamshed } 97776404edcSAsim Jamshed 97876404edcSAsim Jamshed if (sockid < 0 || sockid >= g_config.mos->max_concurrency) { 97976404edcSAsim Jamshed TRACE_API("Socket id %d out of range.\n", sockid); 98076404edcSAsim Jamshed errno = EBADF; 98176404edcSAsim Jamshed return -1; 98276404edcSAsim Jamshed } 98376404edcSAsim Jamshed 98476404edcSAsim Jamshed if (mtcp->smap[sockid].socktype == MOS_SOCK_UNUSED) { 98576404edcSAsim Jamshed TRACE_API("Invalid socket id: %d\n", sockid); 98676404edcSAsim Jamshed errno = EBADF; 98776404edcSAsim Jamshed return -1; 98876404edcSAsim Jamshed } 98976404edcSAsim Jamshed 99076404edcSAsim Jamshed if (mtcp->smap[sockid].socktype != MOS_SOCK_STREAM) { 99176404edcSAsim Jamshed TRACE_API("Not an end socket. id: %d\n", sockid); 99276404edcSAsim Jamshed errno = ENOTSOCK; 99376404edcSAsim Jamshed return -1; 99476404edcSAsim Jamshed } 99576404edcSAsim Jamshed 99676404edcSAsim Jamshed if (!addr) { 99776404edcSAsim Jamshed TRACE_API("Socket %d: empty address!\n", sockid); 99876404edcSAsim Jamshed errno = EFAULT; 99976404edcSAsim Jamshed return -1; 100076404edcSAsim Jamshed } 100176404edcSAsim Jamshed 100276404edcSAsim Jamshed /* we only allow bind() for AF_INET address */ 100376404edcSAsim Jamshed if (addr->sa_family != AF_INET || addrlen < sizeof(struct sockaddr_in)) { 100476404edcSAsim Jamshed TRACE_API("Socket %d: invalid argument!\n", sockid); 100576404edcSAsim Jamshed errno = EAFNOSUPPORT; 100676404edcSAsim Jamshed return -1; 100776404edcSAsim Jamshed } 100876404edcSAsim Jamshed 100976404edcSAsim Jamshed socket = &mtcp->smap[sockid]; 101076404edcSAsim Jamshed if (socket->stream) { 101176404edcSAsim Jamshed TRACE_API("Socket %d: stream already exist!\n", sockid); 101276404edcSAsim Jamshed if (socket->stream->state >= TCP_ST_ESTABLISHED) { 101376404edcSAsim Jamshed errno = EISCONN; 101476404edcSAsim Jamshed } else { 101576404edcSAsim Jamshed errno = EALREADY; 101676404edcSAsim Jamshed } 101776404edcSAsim Jamshed return -1; 101876404edcSAsim Jamshed } 101976404edcSAsim Jamshed 102076404edcSAsim Jamshed addr_in = (struct sockaddr_in *)addr; 102176404edcSAsim Jamshed dip = addr_in->sin_addr.s_addr; 102276404edcSAsim Jamshed dport = addr_in->sin_port; 102376404edcSAsim Jamshed 102476404edcSAsim Jamshed /* address binding */ 102576404edcSAsim Jamshed if (socket->opts & MTCP_ADDR_BIND && 102676404edcSAsim Jamshed socket->saddr.sin_port != INPORT_ANY && 102776404edcSAsim Jamshed socket->saddr.sin_addr.s_addr != INADDR_ANY) { 102876404edcSAsim Jamshed int rss_core; 102976404edcSAsim Jamshed 103076404edcSAsim Jamshed rss_core = GetRSSCPUCore(socket->saddr.sin_addr.s_addr, dip, 103176404edcSAsim Jamshed socket->saddr.sin_port, dport, num_queues); 103276404edcSAsim Jamshed 103376404edcSAsim Jamshed if (rss_core != mctx->cpu) { 103476404edcSAsim Jamshed errno = EINVAL; 103576404edcSAsim Jamshed return -1; 103676404edcSAsim Jamshed } 103776404edcSAsim Jamshed } else { 103876404edcSAsim Jamshed if (mtcp->ap) { 103976404edcSAsim Jamshed ret = FetchAddress(mtcp->ap, 104076404edcSAsim Jamshed mctx->cpu, num_queues, addr_in, &socket->saddr); 104176404edcSAsim Jamshed } else { 1042a5e1a556SAsim Jamshed ret = FetchAddress(ap[GetOutputInterface(dip)], 104376404edcSAsim Jamshed mctx->cpu, num_queues, addr_in, &socket->saddr); 104476404edcSAsim Jamshed } 104576404edcSAsim Jamshed if (ret < 0) { 104676404edcSAsim Jamshed errno = EAGAIN; 104776404edcSAsim Jamshed return -1; 104876404edcSAsim Jamshed } 104976404edcSAsim Jamshed socket->opts |= MTCP_ADDR_BIND; 105076404edcSAsim Jamshed is_dyn_bound = TRUE; 105176404edcSAsim Jamshed } 105276404edcSAsim Jamshed 105376404edcSAsim Jamshed cnt_match = 0; 105476404edcSAsim Jamshed if (mtcp->num_msp > 0) { 105576404edcSAsim Jamshed TAILQ_FOREACH(walk, &mtcp->monitors, link) { 105676404edcSAsim Jamshed fcode = walk->stream_syn_fcode; 105776404edcSAsim Jamshed if (!(ISSET_BPFFILTER(fcode) && 105876404edcSAsim Jamshed eval_bpf_5tuple(fcode, socket->saddr.sin_addr.s_addr, 105976404edcSAsim Jamshed socket->saddr.sin_port, 106076404edcSAsim Jamshed dip, dport) == 0)) { 106176404edcSAsim Jamshed walk->is_stream_syn_filter_hit = 1; // set the 'filter hit' flag to 1 106276404edcSAsim Jamshed cnt_match++; 106376404edcSAsim Jamshed } 106476404edcSAsim Jamshed } 106576404edcSAsim Jamshed } 106676404edcSAsim Jamshed 106776404edcSAsim Jamshed if (mtcp->num_msp > 0 && cnt_match > 0) { 106876404edcSAsim Jamshed /* 150820 dhkim: XXX: embedded mode is not verified */ 106976404edcSAsim Jamshed #if 1 107076404edcSAsim Jamshed cur_stream = CreateClientTCPStream(mtcp, socket, 107176404edcSAsim Jamshed STREAM_TYPE(MOS_SOCK_STREAM) | 107276404edcSAsim Jamshed STREAM_TYPE(MOS_SOCK_MONITOR_STREAM_ACTIVE), 107376404edcSAsim Jamshed socket->saddr.sin_addr.s_addr, 107476404edcSAsim Jamshed socket->saddr.sin_port, dip, dport, NULL); 107576404edcSAsim Jamshed #else 107676404edcSAsim Jamshed cur_stream = CreateDualTCPStream(mtcp, socket, 107776404edcSAsim Jamshed STREAM_TYPE(MOS_SOCK_STREAM) | 107876404edcSAsim Jamshed STREAM_TYPE(MOS_SOCK_MONITOR_STREAM_ACTIVE), 107976404edcSAsim Jamshed socket->saddr.sin_addr.s_addr, 108076404edcSAsim Jamshed socket->saddr.sin_port, dip, dport, NULL); 108176404edcSAsim Jamshed #endif 108276404edcSAsim Jamshed } 108376404edcSAsim Jamshed else 108476404edcSAsim Jamshed cur_stream = CreateTCPStream(mtcp, socket, STREAM_TYPE(MOS_SOCK_STREAM), 108576404edcSAsim Jamshed socket->saddr.sin_addr.s_addr, 108676404edcSAsim Jamshed socket->saddr.sin_port, dip, dport, NULL); 108776404edcSAsim Jamshed if (!cur_stream) { 108876404edcSAsim Jamshed TRACE_ERROR("Socket %d: failed to create tcp_stream!\n", sockid); 108976404edcSAsim Jamshed errno = ENOMEM; 109076404edcSAsim Jamshed return -1; 109176404edcSAsim Jamshed } 109276404edcSAsim Jamshed 109376404edcSAsim Jamshed if (is_dyn_bound) 109476404edcSAsim Jamshed cur_stream->is_bound_addr = TRUE; 109576404edcSAsim Jamshed cur_stream->sndvar->cwnd = 1; 109676404edcSAsim Jamshed cur_stream->sndvar->ssthresh = cur_stream->sndvar->mss * 10; 109776404edcSAsim Jamshed cur_stream->side = MOS_SIDE_CLI; 109876404edcSAsim Jamshed /* if monitor is enabled, update the pair stream side as well */ 109976404edcSAsim Jamshed if (cur_stream->pair_stream) { 110076404edcSAsim Jamshed cur_stream->pair_stream->side = MOS_SIDE_SVR; 110176404edcSAsim Jamshed /* 110276404edcSAsim Jamshed * if buffer management is off, then disable 110376404edcSAsim Jamshed * monitoring tcp ring of server... 110476404edcSAsim Jamshed * if there is even a single monitor asking for 110576404edcSAsim Jamshed * buffer management, enable it (that's why the 110676404edcSAsim Jamshed * need for the loop) 110776404edcSAsim Jamshed */ 110876404edcSAsim Jamshed cur_stream->pair_stream->buffer_mgmt = BUFMGMT_OFF; 110976404edcSAsim Jamshed struct socket_map *walk; 111076404edcSAsim Jamshed SOCKQ_FOREACH_START(walk, &cur_stream->msocks) { 111176404edcSAsim Jamshed uint8_t bm = walk->monitor_stream->monitor_listener->server_buf_mgmt; 111276404edcSAsim Jamshed if (bm > cur_stream->pair_stream->buffer_mgmt) { 111376404edcSAsim Jamshed cur_stream->pair_stream->buffer_mgmt = bm; 111476404edcSAsim Jamshed break; 111576404edcSAsim Jamshed } 111676404edcSAsim Jamshed } SOCKQ_FOREACH_END; 111776404edcSAsim Jamshed } 111876404edcSAsim Jamshed 111976404edcSAsim Jamshed cur_stream->state = TCP_ST_SYN_SENT; 112076404edcSAsim Jamshed cur_stream->cb_events |= MOS_ON_TCP_STATE_CHANGE; 112176404edcSAsim Jamshed 112276404edcSAsim Jamshed TRACE_STATE("Stream %d: TCP_ST_SYN_SENT\n", cur_stream->id); 112376404edcSAsim Jamshed 112476404edcSAsim Jamshed SQ_LOCK(&mtcp->ctx->connect_lock); 112576404edcSAsim Jamshed ret = StreamEnqueue(mtcp->connectq, cur_stream); 112676404edcSAsim Jamshed SQ_UNLOCK(&mtcp->ctx->connect_lock); 112776404edcSAsim Jamshed mtcp->wakeup_flag = TRUE; 112876404edcSAsim Jamshed if (ret < 0) { 112976404edcSAsim Jamshed TRACE_ERROR("Socket %d: failed to enqueue to conenct queue!\n", sockid); 113076404edcSAsim Jamshed SQ_LOCK(&mtcp->ctx->destroyq_lock); 113176404edcSAsim Jamshed StreamEnqueue(mtcp->destroyq, cur_stream); 113276404edcSAsim Jamshed SQ_UNLOCK(&mtcp->ctx->destroyq_lock); 113376404edcSAsim Jamshed errno = EAGAIN; 113476404edcSAsim Jamshed return -1; 113576404edcSAsim Jamshed } 113676404edcSAsim Jamshed 113776404edcSAsim Jamshed /* if nonblocking socket, return EINPROGRESS */ 113876404edcSAsim Jamshed if (socket->opts & MTCP_NONBLOCK) { 113976404edcSAsim Jamshed errno = EINPROGRESS; 114076404edcSAsim Jamshed return -1; 114176404edcSAsim Jamshed 114276404edcSAsim Jamshed } else { 114376404edcSAsim Jamshed while (1) { 114476404edcSAsim Jamshed if (!cur_stream) { 114576404edcSAsim Jamshed TRACE_ERROR("STREAM DESTROYED\n"); 114676404edcSAsim Jamshed errno = ETIMEDOUT; 114776404edcSAsim Jamshed return -1; 114876404edcSAsim Jamshed } 114976404edcSAsim Jamshed if (cur_stream->state > TCP_ST_ESTABLISHED) { 115076404edcSAsim Jamshed TRACE_ERROR("Socket %d: weird state %s\n", 115176404edcSAsim Jamshed sockid, TCPStateToString(cur_stream)); 115276404edcSAsim Jamshed // TODO: how to handle this? 115376404edcSAsim Jamshed errno = ENOSYS; 115476404edcSAsim Jamshed return -1; 115576404edcSAsim Jamshed } 115676404edcSAsim Jamshed 115776404edcSAsim Jamshed if (cur_stream->state == TCP_ST_ESTABLISHED) { 115876404edcSAsim Jamshed break; 115976404edcSAsim Jamshed } 116076404edcSAsim Jamshed usleep(1000); 116176404edcSAsim Jamshed } 116276404edcSAsim Jamshed } 116376404edcSAsim Jamshed 116476404edcSAsim Jamshed return 0; 116576404edcSAsim Jamshed } 116676404edcSAsim Jamshed /*----------------------------------------------------------------------------*/ 116776404edcSAsim Jamshed static inline int 116876404edcSAsim Jamshed CloseStreamSocket(mctx_t mctx, int sockid) 116976404edcSAsim Jamshed { 117076404edcSAsim Jamshed mtcp_manager_t mtcp; 117176404edcSAsim Jamshed tcp_stream *cur_stream; 117276404edcSAsim Jamshed int ret; 117376404edcSAsim Jamshed 117476404edcSAsim Jamshed mtcp = GetMTCPManager(mctx); 117576404edcSAsim Jamshed if (!mtcp) { 117676404edcSAsim Jamshed errno = EACCES; 117776404edcSAsim Jamshed return -1; 117876404edcSAsim Jamshed } 117976404edcSAsim Jamshed 118076404edcSAsim Jamshed cur_stream = mtcp->smap[sockid].stream; 118176404edcSAsim Jamshed if (!cur_stream) { 118276404edcSAsim Jamshed TRACE_API("Socket %d: stream does not exist.\n", sockid); 118376404edcSAsim Jamshed errno = ENOTCONN; 118476404edcSAsim Jamshed return -1; 118576404edcSAsim Jamshed } 118676404edcSAsim Jamshed 118776404edcSAsim Jamshed if (cur_stream->closed) { 118876404edcSAsim Jamshed TRACE_API("Socket %d (Stream %u): already closed stream\n", 118976404edcSAsim Jamshed sockid, cur_stream->id); 119076404edcSAsim Jamshed return 0; 119176404edcSAsim Jamshed } 119276404edcSAsim Jamshed cur_stream->closed = TRUE; 119376404edcSAsim Jamshed 119476404edcSAsim Jamshed TRACE_API("Stream %d: closing the stream.\n", cur_stream->id); 119576404edcSAsim Jamshed 119676404edcSAsim Jamshed /* 141029 dhkim: Check this! */ 119776404edcSAsim Jamshed cur_stream->socket = NULL; 119876404edcSAsim Jamshed 119976404edcSAsim Jamshed if (cur_stream->state == TCP_ST_CLOSED_RSVD) { 120076404edcSAsim Jamshed TRACE_API("Stream %d at TCP_ST_CLOSED_RSVD. destroying the stream.\n", 120176404edcSAsim Jamshed cur_stream->id); 120276404edcSAsim Jamshed SQ_LOCK(&mtcp->ctx->destroyq_lock); 120376404edcSAsim Jamshed StreamEnqueue(mtcp->destroyq, cur_stream); 120476404edcSAsim Jamshed mtcp->wakeup_flag = TRUE; 120576404edcSAsim Jamshed SQ_UNLOCK(&mtcp->ctx->destroyq_lock); 120676404edcSAsim Jamshed return 0; 120776404edcSAsim Jamshed 120876404edcSAsim Jamshed } else if (cur_stream->state == TCP_ST_SYN_SENT) { 120976404edcSAsim Jamshed #if 1 121076404edcSAsim Jamshed SQ_LOCK(&mtcp->ctx->destroyq_lock); 121176404edcSAsim Jamshed StreamEnqueue(mtcp->destroyq, cur_stream); 121276404edcSAsim Jamshed SQ_UNLOCK(&mtcp->ctx->destroyq_lock); 121376404edcSAsim Jamshed mtcp->wakeup_flag = TRUE; 121476404edcSAsim Jamshed #endif 121576404edcSAsim Jamshed return -1; 121676404edcSAsim Jamshed 121776404edcSAsim Jamshed } else if (cur_stream->state != TCP_ST_ESTABLISHED && 121876404edcSAsim Jamshed cur_stream->state != TCP_ST_CLOSE_WAIT) { 121976404edcSAsim Jamshed TRACE_API("Stream %d at state %s\n", 122076404edcSAsim Jamshed cur_stream->id, TCPStateToString(cur_stream)); 122176404edcSAsim Jamshed errno = EBADF; 122276404edcSAsim Jamshed return -1; 122376404edcSAsim Jamshed } 122476404edcSAsim Jamshed 122576404edcSAsim Jamshed SQ_LOCK(&mtcp->ctx->close_lock); 122676404edcSAsim Jamshed cur_stream->sndvar->on_closeq = TRUE; 122776404edcSAsim Jamshed ret = StreamEnqueue(mtcp->closeq, cur_stream); 122876404edcSAsim Jamshed mtcp->wakeup_flag = TRUE; 122976404edcSAsim Jamshed SQ_UNLOCK(&mtcp->ctx->close_lock); 123076404edcSAsim Jamshed 123176404edcSAsim Jamshed if (ret < 0) { 123276404edcSAsim Jamshed TRACE_ERROR("(NEVER HAPPEN) Failed to enqueue the stream to close.\n"); 123376404edcSAsim Jamshed errno = EAGAIN; 123476404edcSAsim Jamshed return -1; 123576404edcSAsim Jamshed } 123676404edcSAsim Jamshed 123776404edcSAsim Jamshed return 0; 123876404edcSAsim Jamshed } 123976404edcSAsim Jamshed /*----------------------------------------------------------------------------*/ 124076404edcSAsim Jamshed static inline int 124176404edcSAsim Jamshed CloseListeningSocket(mctx_t mctx, int sockid) 124276404edcSAsim Jamshed { 124376404edcSAsim Jamshed mtcp_manager_t mtcp; 124476404edcSAsim Jamshed struct tcp_listener *listener; 124576404edcSAsim Jamshed 124676404edcSAsim Jamshed mtcp = GetMTCPManager(mctx); 124776404edcSAsim Jamshed if (!mtcp) { 124876404edcSAsim Jamshed errno = EACCES; 124976404edcSAsim Jamshed return -1; 125076404edcSAsim Jamshed } 125176404edcSAsim Jamshed 125276404edcSAsim Jamshed listener = mtcp->smap[sockid].listener; 125376404edcSAsim Jamshed if (!listener) { 125476404edcSAsim Jamshed errno = EINVAL; 125576404edcSAsim Jamshed return -1; 125676404edcSAsim Jamshed } 125776404edcSAsim Jamshed 125876404edcSAsim Jamshed if (listener->acceptq) { 125976404edcSAsim Jamshed DestroyStreamQueue(listener->acceptq); 126076404edcSAsim Jamshed listener->acceptq = NULL; 126176404edcSAsim Jamshed } 126276404edcSAsim Jamshed 126376404edcSAsim Jamshed pthread_mutex_lock(&listener->accept_lock); 126476404edcSAsim Jamshed pthread_cond_signal(&listener->accept_cond); 126576404edcSAsim Jamshed pthread_mutex_unlock(&listener->accept_lock); 126676404edcSAsim Jamshed 126776404edcSAsim Jamshed pthread_cond_destroy(&listener->accept_cond); 126876404edcSAsim Jamshed pthread_mutex_destroy(&listener->accept_lock); 126976404edcSAsim Jamshed 127076404edcSAsim Jamshed free(listener); 127176404edcSAsim Jamshed mtcp->smap[sockid].listener = NULL; 127276404edcSAsim Jamshed 127376404edcSAsim Jamshed return 0; 127476404edcSAsim Jamshed } 127576404edcSAsim Jamshed /*----------------------------------------------------------------------------*/ 127676404edcSAsim Jamshed int 127776404edcSAsim Jamshed mtcp_close(mctx_t mctx, int sockid) 127876404edcSAsim Jamshed { 127976404edcSAsim Jamshed mtcp_manager_t mtcp; 128076404edcSAsim Jamshed int ret; 128176404edcSAsim Jamshed 128276404edcSAsim Jamshed mtcp = GetMTCPManager(mctx); 128376404edcSAsim Jamshed if (!mtcp) { 128476404edcSAsim Jamshed errno = EACCES; 128576404edcSAsim Jamshed return -1; 128676404edcSAsim Jamshed } 128776404edcSAsim Jamshed 128876404edcSAsim Jamshed if (sockid < 0 || sockid >= g_config.mos->max_concurrency) { 128976404edcSAsim Jamshed TRACE_API("Socket id %d out of range.\n", sockid); 129076404edcSAsim Jamshed errno = EBADF; 129176404edcSAsim Jamshed return -1; 129276404edcSAsim Jamshed } 129376404edcSAsim Jamshed 129476404edcSAsim Jamshed if (mtcp->smap[sockid].socktype == MOS_SOCK_UNUSED) { 129576404edcSAsim Jamshed TRACE_API("Invalid socket id: %d\n", sockid); 129676404edcSAsim Jamshed errno = EBADF; 129776404edcSAsim Jamshed return -1; 129876404edcSAsim Jamshed } 129976404edcSAsim Jamshed 130076404edcSAsim Jamshed TRACE_API("Socket %d: mtcp_close called.\n", sockid); 130176404edcSAsim Jamshed 130276404edcSAsim Jamshed switch (mtcp->smap[sockid].socktype) { 130376404edcSAsim Jamshed case MOS_SOCK_STREAM: 130476404edcSAsim Jamshed ret = CloseStreamSocket(mctx, sockid); 130576404edcSAsim Jamshed break; 130676404edcSAsim Jamshed 130776404edcSAsim Jamshed case MOS_SOCK_STREAM_LISTEN: 130876404edcSAsim Jamshed ret = CloseListeningSocket(mctx, sockid); 130976404edcSAsim Jamshed break; 131076404edcSAsim Jamshed 131176404edcSAsim Jamshed case MOS_SOCK_EPOLL: 131276404edcSAsim Jamshed ret = CloseEpollSocket(mctx, sockid); 131376404edcSAsim Jamshed break; 131476404edcSAsim Jamshed 131576404edcSAsim Jamshed case MOS_SOCK_PIPE: 131676404edcSAsim Jamshed ret = PipeClose(mctx, sockid); 131776404edcSAsim Jamshed break; 131876404edcSAsim Jamshed 131976404edcSAsim Jamshed default: 132076404edcSAsim Jamshed errno = EINVAL; 132176404edcSAsim Jamshed ret = -1; 132276404edcSAsim Jamshed break; 132376404edcSAsim Jamshed } 132476404edcSAsim Jamshed 132576404edcSAsim Jamshed FreeSocket(mctx, sockid, mtcp->smap[sockid].socktype); 132676404edcSAsim Jamshed 132776404edcSAsim Jamshed return ret; 132876404edcSAsim Jamshed } 132976404edcSAsim Jamshed /*----------------------------------------------------------------------------*/ 133076404edcSAsim Jamshed int 133176404edcSAsim Jamshed mtcp_abort(mctx_t mctx, int sockid) 133276404edcSAsim Jamshed { 133376404edcSAsim Jamshed mtcp_manager_t mtcp; 133476404edcSAsim Jamshed tcp_stream *cur_stream; 133576404edcSAsim Jamshed int ret; 133676404edcSAsim Jamshed 133776404edcSAsim Jamshed mtcp = GetMTCPManager(mctx); 133876404edcSAsim Jamshed if (!mtcp) { 133976404edcSAsim Jamshed errno = EACCES; 134076404edcSAsim Jamshed return -1; 134176404edcSAsim Jamshed } 134276404edcSAsim Jamshed 134376404edcSAsim Jamshed if (sockid < 0 || sockid >= g_config.mos->max_concurrency) { 134476404edcSAsim Jamshed TRACE_API("Socket id %d out of range.\n", sockid); 134576404edcSAsim Jamshed errno = EBADF; 134676404edcSAsim Jamshed return -1; 134776404edcSAsim Jamshed } 134876404edcSAsim Jamshed 134976404edcSAsim Jamshed if (mtcp->smap[sockid].socktype == MOS_SOCK_UNUSED) { 135076404edcSAsim Jamshed TRACE_API("Invalid socket id: %d\n", sockid); 135176404edcSAsim Jamshed errno = EBADF; 135276404edcSAsim Jamshed return -1; 135376404edcSAsim Jamshed } 135476404edcSAsim Jamshed 135576404edcSAsim Jamshed if (mtcp->smap[sockid].socktype != MOS_SOCK_STREAM) { 135676404edcSAsim Jamshed TRACE_API("Not an end socket. id: %d\n", sockid); 135776404edcSAsim Jamshed errno = ENOTSOCK; 135876404edcSAsim Jamshed return -1; 135976404edcSAsim Jamshed } 136076404edcSAsim Jamshed 136176404edcSAsim Jamshed cur_stream = mtcp->smap[sockid].stream; 136276404edcSAsim Jamshed if (!cur_stream) { 136376404edcSAsim Jamshed TRACE_API("Stream %d: does not exist.\n", sockid); 136476404edcSAsim Jamshed errno = ENOTCONN; 136576404edcSAsim Jamshed return -1; 136676404edcSAsim Jamshed } 136776404edcSAsim Jamshed 136876404edcSAsim Jamshed TRACE_API("Socket %d: mtcp_abort()\n", sockid); 136976404edcSAsim Jamshed 137076404edcSAsim Jamshed FreeSocket(mctx, sockid, mtcp->smap[sockid].socktype); 137176404edcSAsim Jamshed cur_stream->socket = NULL; 137276404edcSAsim Jamshed 137376404edcSAsim Jamshed if (cur_stream->state == TCP_ST_CLOSED_RSVD) { 137476404edcSAsim Jamshed TRACE_API("Stream %d: connection already reset.\n", sockid); 137576404edcSAsim Jamshed return ERROR; 137676404edcSAsim Jamshed 137776404edcSAsim Jamshed } else if (cur_stream->state == TCP_ST_SYN_SENT) { 137876404edcSAsim Jamshed /* TODO: this should notify event failure to all 137976404edcSAsim Jamshed previous read() or write() calls */ 138076404edcSAsim Jamshed cur_stream->state = TCP_ST_CLOSED_RSVD; 138176404edcSAsim Jamshed cur_stream->close_reason = TCP_ACTIVE_CLOSE; 138276404edcSAsim Jamshed cur_stream->cb_events |= MOS_ON_TCP_STATE_CHANGE; 138376404edcSAsim Jamshed SQ_LOCK(&mtcp->ctx->destroyq_lock); 138476404edcSAsim Jamshed StreamEnqueue(mtcp->destroyq, cur_stream); 138576404edcSAsim Jamshed SQ_UNLOCK(&mtcp->ctx->destroyq_lock); 138676404edcSAsim Jamshed mtcp->wakeup_flag = TRUE; 138776404edcSAsim Jamshed return 0; 138876404edcSAsim Jamshed 138976404edcSAsim Jamshed } else if (cur_stream->state == TCP_ST_CLOSING || 139076404edcSAsim Jamshed cur_stream->state == TCP_ST_LAST_ACK || 139176404edcSAsim Jamshed cur_stream->state == TCP_ST_TIME_WAIT) { 139276404edcSAsim Jamshed cur_stream->state = TCP_ST_CLOSED_RSVD; 139376404edcSAsim Jamshed cur_stream->close_reason = TCP_ACTIVE_CLOSE; 139476404edcSAsim Jamshed cur_stream->cb_events |= MOS_ON_TCP_STATE_CHANGE; 139576404edcSAsim Jamshed SQ_LOCK(&mtcp->ctx->destroyq_lock); 139676404edcSAsim Jamshed StreamEnqueue(mtcp->destroyq, cur_stream); 139776404edcSAsim Jamshed SQ_UNLOCK(&mtcp->ctx->destroyq_lock); 139876404edcSAsim Jamshed mtcp->wakeup_flag = TRUE; 139976404edcSAsim Jamshed return 0; 140076404edcSAsim Jamshed } 140176404edcSAsim Jamshed 140276404edcSAsim Jamshed /* the stream structure will be destroyed after sending RST */ 140376404edcSAsim Jamshed if (cur_stream->sndvar->on_resetq) { 140476404edcSAsim Jamshed TRACE_ERROR("Stream %d: calling mtcp_abort() " 140576404edcSAsim Jamshed "when in reset queue.\n", sockid); 140676404edcSAsim Jamshed errno = ECONNRESET; 140776404edcSAsim Jamshed return -1; 140876404edcSAsim Jamshed } 140976404edcSAsim Jamshed SQ_LOCK(&mtcp->ctx->reset_lock); 141076404edcSAsim Jamshed cur_stream->sndvar->on_resetq = TRUE; 141176404edcSAsim Jamshed ret = StreamEnqueue(mtcp->resetq, cur_stream); 141276404edcSAsim Jamshed SQ_UNLOCK(&mtcp->ctx->reset_lock); 141376404edcSAsim Jamshed mtcp->wakeup_flag = TRUE; 141476404edcSAsim Jamshed 141576404edcSAsim Jamshed if (ret < 0) { 141676404edcSAsim Jamshed TRACE_ERROR("(NEVER HAPPEN) Failed to enqueue the stream to close.\n"); 141776404edcSAsim Jamshed errno = EAGAIN; 141876404edcSAsim Jamshed return -1; 141976404edcSAsim Jamshed } 142076404edcSAsim Jamshed 142176404edcSAsim Jamshed return 0; 142276404edcSAsim Jamshed } 142376404edcSAsim Jamshed /*----------------------------------------------------------------------------*/ 142476404edcSAsim Jamshed static inline int 1425*df3fae06SAsim Jamshed PeekForUser(mtcp_manager_t mtcp, tcp_stream *cur_stream, char *buf, int len) 1426*df3fae06SAsim Jamshed { 1427*df3fae06SAsim Jamshed struct tcp_recv_vars *rcvvar = cur_stream->rcvvar; 1428*df3fae06SAsim Jamshed int copylen; 1429*df3fae06SAsim Jamshed tcprb_t *rb = rcvvar->rcvbuf; 1430*df3fae06SAsim Jamshed 1431*df3fae06SAsim Jamshed if ((copylen = tcprb_ppeek(rb, (uint8_t *)buf, len, rb->pile)) <= 0) { 1432*df3fae06SAsim Jamshed errno = EAGAIN; 1433*df3fae06SAsim Jamshed return -1; 1434*df3fae06SAsim Jamshed } 1435*df3fae06SAsim Jamshed 1436*df3fae06SAsim Jamshed return copylen; 1437*df3fae06SAsim Jamshed } 1438*df3fae06SAsim Jamshed /*----------------------------------------------------------------------------*/ 1439*df3fae06SAsim Jamshed static inline int 144076404edcSAsim Jamshed CopyToUser(mtcp_manager_t mtcp, tcp_stream *cur_stream, char *buf, int len) 144176404edcSAsim Jamshed { 144276404edcSAsim Jamshed struct tcp_recv_vars *rcvvar = cur_stream->rcvvar; 144376404edcSAsim Jamshed int copylen; 144476404edcSAsim Jamshed tcprb_t *rb = rcvvar->rcvbuf; 144576404edcSAsim Jamshed if ((copylen = tcprb_ppeek(rb, (uint8_t *)buf, len, rb->pile)) <= 0) { 144676404edcSAsim Jamshed errno = EAGAIN; 144776404edcSAsim Jamshed return -1; 144876404edcSAsim Jamshed } 144976404edcSAsim Jamshed tcprb_setpile(rb, rb->pile + copylen); 145076404edcSAsim Jamshed 145176404edcSAsim Jamshed rcvvar->rcv_wnd = rb->len - tcprb_cflen(rb); 145276404edcSAsim Jamshed //printf("rcv_wnd: %d\n", rcvvar->rcv_wnd); 145376404edcSAsim Jamshed 145476404edcSAsim Jamshed /* Advertise newly freed receive buffer */ 145576404edcSAsim Jamshed if (cur_stream->need_wnd_adv) { 145676404edcSAsim Jamshed if (rcvvar->rcv_wnd > cur_stream->sndvar->eff_mss) { 145776404edcSAsim Jamshed if (!cur_stream->sndvar->on_ackq) { 145876404edcSAsim Jamshed SQ_LOCK(&mtcp->ctx->ackq_lock); 145976404edcSAsim Jamshed cur_stream->sndvar->on_ackq = TRUE; 146076404edcSAsim Jamshed StreamEnqueue(mtcp->ackq, cur_stream); /* this always success */ 146176404edcSAsim Jamshed SQ_UNLOCK(&mtcp->ctx->ackq_lock); 146276404edcSAsim Jamshed cur_stream->need_wnd_adv = FALSE; 146376404edcSAsim Jamshed mtcp->wakeup_flag = TRUE; 146476404edcSAsim Jamshed } 146576404edcSAsim Jamshed } 146676404edcSAsim Jamshed } 146776404edcSAsim Jamshed 146876404edcSAsim Jamshed return copylen; 146976404edcSAsim Jamshed } 147076404edcSAsim Jamshed /*----------------------------------------------------------------------------*/ 147176404edcSAsim Jamshed ssize_t 1472*df3fae06SAsim Jamshed mtcp_recv(mctx_t mctx, int sockid, char *buf, size_t len, int flags) 147376404edcSAsim Jamshed { 147476404edcSAsim Jamshed mtcp_manager_t mtcp; 147576404edcSAsim Jamshed socket_map_t socket; 147676404edcSAsim Jamshed tcp_stream *cur_stream; 147776404edcSAsim Jamshed struct tcp_recv_vars *rcvvar; 147876404edcSAsim Jamshed int event_remaining, merged_len; 147976404edcSAsim Jamshed int ret; 148076404edcSAsim Jamshed 148176404edcSAsim Jamshed mtcp = GetMTCPManager(mctx); 148276404edcSAsim Jamshed if (!mtcp) { 148376404edcSAsim Jamshed errno = EACCES; 148476404edcSAsim Jamshed return -1; 148576404edcSAsim Jamshed } 148676404edcSAsim Jamshed 148776404edcSAsim Jamshed if (sockid < 0 || sockid >= g_config.mos->max_concurrency) { 148876404edcSAsim Jamshed TRACE_API("Socket id %d out of range.\n", sockid); 148976404edcSAsim Jamshed errno = EBADF; 149076404edcSAsim Jamshed return -1; 149176404edcSAsim Jamshed } 149276404edcSAsim Jamshed 149376404edcSAsim Jamshed socket = &mtcp->smap[sockid]; 149476404edcSAsim Jamshed if (socket->socktype == MOS_SOCK_UNUSED) { 149576404edcSAsim Jamshed TRACE_API("Invalid socket id: %d\n", sockid); 149676404edcSAsim Jamshed errno = EBADF; 149776404edcSAsim Jamshed return -1; 149876404edcSAsim Jamshed } 149976404edcSAsim Jamshed 150076404edcSAsim Jamshed if (socket->socktype == MOS_SOCK_PIPE) { 150176404edcSAsim Jamshed return PipeRead(mctx, sockid, buf, len); 150276404edcSAsim Jamshed } 150376404edcSAsim Jamshed 150476404edcSAsim Jamshed if (socket->socktype != MOS_SOCK_STREAM) { 150576404edcSAsim Jamshed TRACE_API("Not an end socket. id: %d\n", sockid); 150676404edcSAsim Jamshed errno = ENOTSOCK; 150776404edcSAsim Jamshed return -1; 150876404edcSAsim Jamshed } 150976404edcSAsim Jamshed 151076404edcSAsim Jamshed /* stream should be in ESTABLISHED, FIN_WAIT_1, FIN_WAIT_2, CLOSE_WAIT */ 151176404edcSAsim Jamshed cur_stream = socket->stream; 151276404edcSAsim Jamshed if (!cur_stream || !cur_stream->rcvvar || !cur_stream->rcvvar->rcvbuf || 151376404edcSAsim Jamshed !(cur_stream->state >= TCP_ST_ESTABLISHED && 151476404edcSAsim Jamshed cur_stream->state <= TCP_ST_CLOSE_WAIT)) { 151576404edcSAsim Jamshed errno = ENOTCONN; 151676404edcSAsim Jamshed return -1; 151776404edcSAsim Jamshed } 151876404edcSAsim Jamshed 151976404edcSAsim Jamshed rcvvar = cur_stream->rcvvar; 152076404edcSAsim Jamshed 152176404edcSAsim Jamshed merged_len = tcprb_cflen(rcvvar->rcvbuf); 152276404edcSAsim Jamshed 152376404edcSAsim Jamshed /* if CLOSE_WAIT, return 0 if there is no payload */ 152476404edcSAsim Jamshed if (cur_stream->state == TCP_ST_CLOSE_WAIT) { 152576404edcSAsim Jamshed if (!rcvvar->rcvbuf) 152676404edcSAsim Jamshed return 0; 152776404edcSAsim Jamshed 152876404edcSAsim Jamshed if (merged_len == 0) 152976404edcSAsim Jamshed return 0; 153076404edcSAsim Jamshed } 153176404edcSAsim Jamshed 153276404edcSAsim Jamshed /* return EAGAIN if no receive buffer */ 153376404edcSAsim Jamshed if (socket->opts & MTCP_NONBLOCK) { 153476404edcSAsim Jamshed if (!rcvvar->rcvbuf || merged_len == 0) { 153576404edcSAsim Jamshed errno = EAGAIN; 153676404edcSAsim Jamshed return -1; 153776404edcSAsim Jamshed } 153876404edcSAsim Jamshed } 153976404edcSAsim Jamshed 154076404edcSAsim Jamshed SBUF_LOCK(&rcvvar->read_lock); 154176404edcSAsim Jamshed 1542*df3fae06SAsim Jamshed switch (flags) { 1543*df3fae06SAsim Jamshed case 0: 154476404edcSAsim Jamshed ret = CopyToUser(mtcp, cur_stream, buf, len); 1545*df3fae06SAsim Jamshed break; 1546*df3fae06SAsim Jamshed case MSG_PEEK: 1547*df3fae06SAsim Jamshed ret = PeekForUser(mtcp, cur_stream, buf, len); 1548*df3fae06SAsim Jamshed break; 1549*df3fae06SAsim Jamshed default: 1550*df3fae06SAsim Jamshed SBUF_UNLOCK(&rcvvar->read_lock); 1551*df3fae06SAsim Jamshed ret = -1; 1552*df3fae06SAsim Jamshed errno = EINVAL; 1553*df3fae06SAsim Jamshed return ret; 1554*df3fae06SAsim Jamshed } 155576404edcSAsim Jamshed 155676404edcSAsim Jamshed merged_len = tcprb_cflen(rcvvar->rcvbuf); 155776404edcSAsim Jamshed event_remaining = FALSE; 155876404edcSAsim Jamshed /* if there are remaining payload, generate EPOLLIN */ 155976404edcSAsim Jamshed /* (may due to insufficient user buffer) */ 156076404edcSAsim Jamshed if (socket->epoll & MOS_EPOLLIN) { 156176404edcSAsim Jamshed if (!(socket->epoll & MOS_EPOLLET) && merged_len > 0) { 156276404edcSAsim Jamshed event_remaining = TRUE; 156376404edcSAsim Jamshed } 156476404edcSAsim Jamshed } 156576404edcSAsim Jamshed /* if waiting for close, notify it if no remaining data */ 156676404edcSAsim Jamshed if (cur_stream->state == TCP_ST_CLOSE_WAIT && 156776404edcSAsim Jamshed merged_len == 0 && ret > 0) { 156876404edcSAsim Jamshed event_remaining = TRUE; 156976404edcSAsim Jamshed } 157076404edcSAsim Jamshed 157176404edcSAsim Jamshed SBUF_UNLOCK(&rcvvar->read_lock); 157276404edcSAsim Jamshed 157376404edcSAsim Jamshed if (event_remaining) { 157476404edcSAsim Jamshed if (socket->epoll) { 157576404edcSAsim Jamshed AddEpollEvent(mtcp->ep, 157676404edcSAsim Jamshed USR_SHADOW_EVENT_QUEUE, socket, MOS_EPOLLIN); 157776404edcSAsim Jamshed } 157876404edcSAsim Jamshed } 157976404edcSAsim Jamshed 1580*df3fae06SAsim Jamshed TRACE_API("Stream %d: mtcp_recv() returning %d\n", cur_stream->id, ret); 158176404edcSAsim Jamshed return ret; 158276404edcSAsim Jamshed } 158376404edcSAsim Jamshed /*----------------------------------------------------------------------------*/ 1584*df3fae06SAsim Jamshed inline ssize_t 1585*df3fae06SAsim Jamshed mtcp_read(mctx_t mctx, int sockid, char *buf, size_t len) 1586*df3fae06SAsim Jamshed { 1587*df3fae06SAsim Jamshed return mtcp_recv(mctx, sockid, buf, len, 0); 1588*df3fae06SAsim Jamshed } 1589*df3fae06SAsim Jamshed /*----------------------------------------------------------------------------*/ 159076404edcSAsim Jamshed ssize_t 1591a5e1a556SAsim Jamshed mtcp_readv(mctx_t mctx, int sockid, const struct iovec *iov, int numIOV) 159276404edcSAsim Jamshed { 159376404edcSAsim Jamshed mtcp_manager_t mtcp; 159476404edcSAsim Jamshed socket_map_t socket; 159576404edcSAsim Jamshed tcp_stream *cur_stream; 159676404edcSAsim Jamshed struct tcp_recv_vars *rcvvar; 159776404edcSAsim Jamshed int ret, bytes_read, i; 159876404edcSAsim Jamshed int event_remaining, merged_len; 159976404edcSAsim Jamshed 160076404edcSAsim Jamshed mtcp = GetMTCPManager(mctx); 160176404edcSAsim Jamshed if (!mtcp) { 160276404edcSAsim Jamshed errno = EACCES; 160376404edcSAsim Jamshed return -1; 160476404edcSAsim Jamshed } 160576404edcSAsim Jamshed 160676404edcSAsim Jamshed if (sockid < 0 || sockid >= g_config.mos->max_concurrency) { 160776404edcSAsim Jamshed TRACE_API("Socket id %d out of range.\n", sockid); 160876404edcSAsim Jamshed errno = EBADF; 160976404edcSAsim Jamshed return -1; 161076404edcSAsim Jamshed } 161176404edcSAsim Jamshed 161276404edcSAsim Jamshed socket = &mtcp->smap[sockid]; 161376404edcSAsim Jamshed if (socket->socktype == MOS_SOCK_UNUSED) { 161476404edcSAsim Jamshed TRACE_API("Invalid socket id: %d\n", sockid); 161576404edcSAsim Jamshed errno = EBADF; 161676404edcSAsim Jamshed return -1; 161776404edcSAsim Jamshed } 161876404edcSAsim Jamshed 161976404edcSAsim Jamshed if (socket->socktype != MOS_SOCK_STREAM) { 162076404edcSAsim Jamshed TRACE_API("Not an end socket. id: %d\n", sockid); 162176404edcSAsim Jamshed errno = ENOTSOCK; 162276404edcSAsim Jamshed return -1; 162376404edcSAsim Jamshed } 162476404edcSAsim Jamshed 162576404edcSAsim Jamshed /* stream should be in ESTABLISHED, FIN_WAIT_1, FIN_WAIT_2, CLOSE_WAIT */ 162676404edcSAsim Jamshed cur_stream = socket->stream; 162776404edcSAsim Jamshed if (!cur_stream || 162876404edcSAsim Jamshed !(cur_stream->state >= TCP_ST_ESTABLISHED && 162976404edcSAsim Jamshed cur_stream->state <= TCP_ST_CLOSE_WAIT)) { 163076404edcSAsim Jamshed errno = ENOTCONN; 163176404edcSAsim Jamshed return -1; 163276404edcSAsim Jamshed } 163376404edcSAsim Jamshed 163476404edcSAsim Jamshed rcvvar = cur_stream->rcvvar; 163576404edcSAsim Jamshed 163676404edcSAsim Jamshed merged_len = tcprb_cflen(rcvvar->rcvbuf); 163776404edcSAsim Jamshed 163876404edcSAsim Jamshed /* if CLOSE_WAIT, return 0 if there is no payload */ 163976404edcSAsim Jamshed if (cur_stream->state == TCP_ST_CLOSE_WAIT) { 164076404edcSAsim Jamshed if (!rcvvar->rcvbuf) 164176404edcSAsim Jamshed return 0; 164276404edcSAsim Jamshed 164376404edcSAsim Jamshed if (merged_len == 0) 164476404edcSAsim Jamshed return 0; 164576404edcSAsim Jamshed } 164676404edcSAsim Jamshed 164776404edcSAsim Jamshed /* return EAGAIN if no receive buffer */ 164876404edcSAsim Jamshed if (socket->opts & MTCP_NONBLOCK) { 164976404edcSAsim Jamshed if (!rcvvar->rcvbuf || merged_len == 0) { 165076404edcSAsim Jamshed errno = EAGAIN; 165176404edcSAsim Jamshed return -1; 165276404edcSAsim Jamshed } 165376404edcSAsim Jamshed } 165476404edcSAsim Jamshed 165576404edcSAsim Jamshed SBUF_LOCK(&rcvvar->read_lock); 165676404edcSAsim Jamshed 165776404edcSAsim Jamshed /* read and store the contents to the vectored buffers */ 165876404edcSAsim Jamshed bytes_read = 0; 165976404edcSAsim Jamshed for (i = 0; i < numIOV; i++) { 166076404edcSAsim Jamshed if (iov[i].iov_len <= 0) 166176404edcSAsim Jamshed continue; 166276404edcSAsim Jamshed 166376404edcSAsim Jamshed ret = CopyToUser(mtcp, cur_stream, iov[i].iov_base, iov[i].iov_len); 166476404edcSAsim Jamshed if (ret <= 0) 166576404edcSAsim Jamshed break; 166676404edcSAsim Jamshed 166776404edcSAsim Jamshed bytes_read += ret; 166876404edcSAsim Jamshed 166976404edcSAsim Jamshed if (ret < iov[i].iov_len) 167076404edcSAsim Jamshed break; 167176404edcSAsim Jamshed } 167276404edcSAsim Jamshed 167376404edcSAsim Jamshed merged_len = tcprb_cflen(rcvvar->rcvbuf); 167476404edcSAsim Jamshed 167576404edcSAsim Jamshed event_remaining = FALSE; 167676404edcSAsim Jamshed /* if there are remaining payload, generate read event */ 167776404edcSAsim Jamshed /* (may due to insufficient user buffer) */ 167876404edcSAsim Jamshed if (socket->epoll & MOS_EPOLLIN) { 167976404edcSAsim Jamshed if (!(socket->epoll & MOS_EPOLLET) && merged_len > 0) { 168076404edcSAsim Jamshed event_remaining = TRUE; 168176404edcSAsim Jamshed } 168276404edcSAsim Jamshed } 168376404edcSAsim Jamshed /* if waiting for close, notify it if no remaining data */ 168476404edcSAsim Jamshed if (cur_stream->state == TCP_ST_CLOSE_WAIT && 168576404edcSAsim Jamshed merged_len == 0 && bytes_read > 0) { 168676404edcSAsim Jamshed event_remaining = TRUE; 168776404edcSAsim Jamshed } 168876404edcSAsim Jamshed 168976404edcSAsim Jamshed SBUF_UNLOCK(&rcvvar->read_lock); 169076404edcSAsim Jamshed 169176404edcSAsim Jamshed if(event_remaining) { 169276404edcSAsim Jamshed if (socket->epoll & MOS_EPOLLIN && !(socket->epoll & MOS_EPOLLET)) { 169376404edcSAsim Jamshed AddEpollEvent(mtcp->ep, 169476404edcSAsim Jamshed USR_SHADOW_EVENT_QUEUE, socket, MOS_EPOLLIN); 169576404edcSAsim Jamshed } 169676404edcSAsim Jamshed } 169776404edcSAsim Jamshed 169876404edcSAsim Jamshed TRACE_API("Stream %d: mtcp_readv() returning %d\n", 169976404edcSAsim Jamshed cur_stream->id, bytes_read); 170076404edcSAsim Jamshed return bytes_read; 170176404edcSAsim Jamshed } 170276404edcSAsim Jamshed /*----------------------------------------------------------------------------*/ 170376404edcSAsim Jamshed static inline int 1704a5e1a556SAsim Jamshed CopyFromUser(mtcp_manager_t mtcp, tcp_stream *cur_stream, const char *buf, int len) 170576404edcSAsim Jamshed { 170676404edcSAsim Jamshed struct tcp_send_vars *sndvar = cur_stream->sndvar; 170776404edcSAsim Jamshed int sndlen; 170876404edcSAsim Jamshed int ret; 170976404edcSAsim Jamshed 171076404edcSAsim Jamshed sndlen = MIN((int)sndvar->snd_wnd, len); 171176404edcSAsim Jamshed if (sndlen <= 0) { 171276404edcSAsim Jamshed errno = EAGAIN; 171376404edcSAsim Jamshed return -1; 171476404edcSAsim Jamshed } 171576404edcSAsim Jamshed 171676404edcSAsim Jamshed /* allocate send buffer if not exist */ 171776404edcSAsim Jamshed if (!sndvar->sndbuf) { 171876404edcSAsim Jamshed sndvar->sndbuf = SBInit(mtcp->rbm_snd, sndvar->iss + 1); 171976404edcSAsim Jamshed if (!sndvar->sndbuf) { 172076404edcSAsim Jamshed cur_stream->close_reason = TCP_NO_MEM; 172176404edcSAsim Jamshed /* notification may not required due to -1 return */ 172276404edcSAsim Jamshed errno = ENOMEM; 172376404edcSAsim Jamshed return -1; 172476404edcSAsim Jamshed } 172576404edcSAsim Jamshed } 172676404edcSAsim Jamshed 172776404edcSAsim Jamshed ret = SBPut(mtcp->rbm_snd, sndvar->sndbuf, buf, sndlen); 172876404edcSAsim Jamshed assert(ret == sndlen); 172976404edcSAsim Jamshed sndvar->snd_wnd = sndvar->sndbuf->size - sndvar->sndbuf->len; 173076404edcSAsim Jamshed if (ret <= 0) { 173176404edcSAsim Jamshed TRACE_ERROR("SBPut failed. reason: %d (sndlen: %u, len: %u\n", 173276404edcSAsim Jamshed ret, sndlen, sndvar->sndbuf->len); 173376404edcSAsim Jamshed errno = EAGAIN; 173476404edcSAsim Jamshed return -1; 173576404edcSAsim Jamshed } 173676404edcSAsim Jamshed 173776404edcSAsim Jamshed if (sndvar->snd_wnd <= 0) { 173876404edcSAsim Jamshed TRACE_SNDBUF("%u Sending buffer became full!! snd_wnd: %u\n", 173976404edcSAsim Jamshed cur_stream->id, sndvar->snd_wnd); 174076404edcSAsim Jamshed } 174176404edcSAsim Jamshed 174276404edcSAsim Jamshed return ret; 174376404edcSAsim Jamshed } 174476404edcSAsim Jamshed /*----------------------------------------------------------------------------*/ 174576404edcSAsim Jamshed ssize_t 1746a5e1a556SAsim Jamshed mtcp_write(mctx_t mctx, int sockid, const char *buf, size_t len) 174776404edcSAsim Jamshed { 174876404edcSAsim Jamshed mtcp_manager_t mtcp; 174976404edcSAsim Jamshed socket_map_t socket; 175076404edcSAsim Jamshed tcp_stream *cur_stream; 175176404edcSAsim Jamshed struct tcp_send_vars *sndvar; 175276404edcSAsim Jamshed int ret; 175376404edcSAsim Jamshed 175476404edcSAsim Jamshed mtcp = GetMTCPManager(mctx); 175576404edcSAsim Jamshed if (!mtcp) { 175676404edcSAsim Jamshed errno = EACCES; 175776404edcSAsim Jamshed return -1; 175876404edcSAsim Jamshed } 175976404edcSAsim Jamshed 176076404edcSAsim Jamshed if (sockid < 0 || sockid >= g_config.mos->max_concurrency) { 176176404edcSAsim Jamshed TRACE_API("Socket id %d out of range.\n", sockid); 176276404edcSAsim Jamshed errno = EBADF; 176376404edcSAsim Jamshed return -1; 176476404edcSAsim Jamshed } 176576404edcSAsim Jamshed 176676404edcSAsim Jamshed socket = &mtcp->smap[sockid]; 176776404edcSAsim Jamshed if (socket->socktype == MOS_SOCK_UNUSED) { 176876404edcSAsim Jamshed TRACE_API("Invalid socket id: %d\n", sockid); 176976404edcSAsim Jamshed errno = EBADF; 177076404edcSAsim Jamshed return -1; 177176404edcSAsim Jamshed } 177276404edcSAsim Jamshed 177376404edcSAsim Jamshed if (socket->socktype == MOS_SOCK_PIPE) { 177476404edcSAsim Jamshed return PipeWrite(mctx, sockid, buf, len); 177576404edcSAsim Jamshed } 177676404edcSAsim Jamshed 177776404edcSAsim Jamshed if (socket->socktype != MOS_SOCK_STREAM) { 177876404edcSAsim Jamshed TRACE_API("Not an end socket. id: %d\n", sockid); 177976404edcSAsim Jamshed errno = ENOTSOCK; 178076404edcSAsim Jamshed return -1; 178176404edcSAsim Jamshed } 178276404edcSAsim Jamshed 178376404edcSAsim Jamshed cur_stream = socket->stream; 178476404edcSAsim Jamshed if (!cur_stream || 178576404edcSAsim Jamshed !(cur_stream->state == TCP_ST_ESTABLISHED || 178676404edcSAsim Jamshed cur_stream->state == TCP_ST_CLOSE_WAIT)) { 178776404edcSAsim Jamshed errno = ENOTCONN; 178876404edcSAsim Jamshed return -1; 178976404edcSAsim Jamshed } 179076404edcSAsim Jamshed 179176404edcSAsim Jamshed if (len <= 0) { 179276404edcSAsim Jamshed if (socket->opts & MTCP_NONBLOCK) { 179376404edcSAsim Jamshed errno = EAGAIN; 179476404edcSAsim Jamshed return -1; 179576404edcSAsim Jamshed } else { 179676404edcSAsim Jamshed return 0; 179776404edcSAsim Jamshed } 179876404edcSAsim Jamshed } 179976404edcSAsim Jamshed 180076404edcSAsim Jamshed sndvar = cur_stream->sndvar; 180176404edcSAsim Jamshed 180276404edcSAsim Jamshed SBUF_LOCK(&sndvar->write_lock); 180376404edcSAsim Jamshed ret = CopyFromUser(mtcp, cur_stream, buf, len); 180476404edcSAsim Jamshed 180576404edcSAsim Jamshed SBUF_UNLOCK(&sndvar->write_lock); 180676404edcSAsim Jamshed 180776404edcSAsim Jamshed if (ret > 0 && !(sndvar->on_sendq || sndvar->on_send_list)) { 180876404edcSAsim Jamshed SQ_LOCK(&mtcp->ctx->sendq_lock); 180976404edcSAsim Jamshed sndvar->on_sendq = TRUE; 181076404edcSAsim Jamshed StreamEnqueue(mtcp->sendq, cur_stream); /* this always success */ 181176404edcSAsim Jamshed SQ_UNLOCK(&mtcp->ctx->sendq_lock); 181276404edcSAsim Jamshed mtcp->wakeup_flag = TRUE; 181376404edcSAsim Jamshed } 181476404edcSAsim Jamshed 181576404edcSAsim Jamshed if (ret == 0 && (socket->opts & MTCP_NONBLOCK)) { 181676404edcSAsim Jamshed ret = -1; 181776404edcSAsim Jamshed errno = EAGAIN; 181876404edcSAsim Jamshed } 181976404edcSAsim Jamshed 182076404edcSAsim Jamshed /* if there are remaining sending buffer, generate write event */ 182176404edcSAsim Jamshed if (sndvar->snd_wnd > 0) { 182276404edcSAsim Jamshed if (socket->epoll & MOS_EPOLLOUT && !(socket->epoll & MOS_EPOLLET)) { 182376404edcSAsim Jamshed AddEpollEvent(mtcp->ep, 182476404edcSAsim Jamshed USR_SHADOW_EVENT_QUEUE, socket, MOS_EPOLLOUT); 182576404edcSAsim Jamshed } 182676404edcSAsim Jamshed } 182776404edcSAsim Jamshed 182876404edcSAsim Jamshed TRACE_API("Stream %d: mtcp_write() returning %d\n", cur_stream->id, ret); 182976404edcSAsim Jamshed return ret; 183076404edcSAsim Jamshed } 183176404edcSAsim Jamshed /*----------------------------------------------------------------------------*/ 183276404edcSAsim Jamshed ssize_t 1833a5e1a556SAsim Jamshed mtcp_writev(mctx_t mctx, int sockid, const struct iovec *iov, int numIOV) 183476404edcSAsim Jamshed { 183576404edcSAsim Jamshed mtcp_manager_t mtcp; 183676404edcSAsim Jamshed socket_map_t socket; 183776404edcSAsim Jamshed tcp_stream *cur_stream; 183876404edcSAsim Jamshed struct tcp_send_vars *sndvar; 183976404edcSAsim Jamshed int ret, to_write, i; 184076404edcSAsim Jamshed 184176404edcSAsim Jamshed mtcp = GetMTCPManager(mctx); 184276404edcSAsim Jamshed if (!mtcp) { 184376404edcSAsim Jamshed errno = EACCES; 184476404edcSAsim Jamshed return -1; 184576404edcSAsim Jamshed } 184676404edcSAsim Jamshed 184776404edcSAsim Jamshed if (sockid < 0 || sockid >= g_config.mos->max_concurrency) { 184876404edcSAsim Jamshed TRACE_API("Socket id %d out of range.\n", sockid); 184976404edcSAsim Jamshed errno = EBADF; 185076404edcSAsim Jamshed return -1; 185176404edcSAsim Jamshed } 185276404edcSAsim Jamshed 185376404edcSAsim Jamshed socket = &mtcp->smap[sockid]; 185476404edcSAsim Jamshed if (socket->socktype == MOS_SOCK_UNUSED) { 185576404edcSAsim Jamshed TRACE_API("Invalid socket id: %d\n", sockid); 185676404edcSAsim Jamshed errno = EBADF; 185776404edcSAsim Jamshed return -1; 185876404edcSAsim Jamshed } 185976404edcSAsim Jamshed 186076404edcSAsim Jamshed if (socket->socktype != MOS_SOCK_STREAM) { 186176404edcSAsim Jamshed TRACE_API("Not an end socket. id: %d\n", sockid); 186276404edcSAsim Jamshed errno = ENOTSOCK; 186376404edcSAsim Jamshed return -1; 186476404edcSAsim Jamshed } 186576404edcSAsim Jamshed 186676404edcSAsim Jamshed cur_stream = socket->stream; 186776404edcSAsim Jamshed if (!cur_stream || 186876404edcSAsim Jamshed !(cur_stream->state == TCP_ST_ESTABLISHED || 186976404edcSAsim Jamshed cur_stream->state == TCP_ST_CLOSE_WAIT)) { 187076404edcSAsim Jamshed errno = ENOTCONN; 187176404edcSAsim Jamshed return -1; 187276404edcSAsim Jamshed } 187376404edcSAsim Jamshed 187476404edcSAsim Jamshed sndvar = cur_stream->sndvar; 187576404edcSAsim Jamshed SBUF_LOCK(&sndvar->write_lock); 187676404edcSAsim Jamshed 187776404edcSAsim Jamshed /* write from the vectored buffers */ 187876404edcSAsim Jamshed to_write = 0; 187976404edcSAsim Jamshed for (i = 0; i < numIOV; i++) { 188076404edcSAsim Jamshed if (iov[i].iov_len <= 0) 188176404edcSAsim Jamshed continue; 188276404edcSAsim Jamshed 188376404edcSAsim Jamshed ret = CopyFromUser(mtcp, cur_stream, iov[i].iov_base, iov[i].iov_len); 188476404edcSAsim Jamshed if (ret <= 0) 188576404edcSAsim Jamshed break; 188676404edcSAsim Jamshed 188776404edcSAsim Jamshed to_write += ret; 188876404edcSAsim Jamshed 188976404edcSAsim Jamshed if (ret < iov[i].iov_len) 189076404edcSAsim Jamshed break; 189176404edcSAsim Jamshed } 189276404edcSAsim Jamshed SBUF_UNLOCK(&sndvar->write_lock); 189376404edcSAsim Jamshed 189476404edcSAsim Jamshed if (to_write > 0 && !(sndvar->on_sendq || sndvar->on_send_list)) { 189576404edcSAsim Jamshed SQ_LOCK(&mtcp->ctx->sendq_lock); 189676404edcSAsim Jamshed sndvar->on_sendq = TRUE; 189776404edcSAsim Jamshed StreamEnqueue(mtcp->sendq, cur_stream); /* this always success */ 189876404edcSAsim Jamshed SQ_UNLOCK(&mtcp->ctx->sendq_lock); 189976404edcSAsim Jamshed mtcp->wakeup_flag = TRUE; 190076404edcSAsim Jamshed } 190176404edcSAsim Jamshed 190276404edcSAsim Jamshed if (to_write == 0 && (socket->opts & MTCP_NONBLOCK)) { 190376404edcSAsim Jamshed to_write = -1; 190476404edcSAsim Jamshed errno = EAGAIN; 190576404edcSAsim Jamshed } 190676404edcSAsim Jamshed 190776404edcSAsim Jamshed /* if there are remaining sending buffer, generate write event */ 190876404edcSAsim Jamshed if (sndvar->snd_wnd > 0) { 190976404edcSAsim Jamshed if (socket->epoll & MOS_EPOLLOUT && !(socket->epoll & MOS_EPOLLET)) { 191076404edcSAsim Jamshed AddEpollEvent(mtcp->ep, 191176404edcSAsim Jamshed USR_SHADOW_EVENT_QUEUE, socket, MOS_EPOLLOUT); 191276404edcSAsim Jamshed } 191376404edcSAsim Jamshed } 191476404edcSAsim Jamshed 191576404edcSAsim Jamshed TRACE_API("Stream %d: mtcp_writev() returning %d\n", 191676404edcSAsim Jamshed cur_stream->id, to_write); 191776404edcSAsim Jamshed return to_write; 191876404edcSAsim Jamshed } 191976404edcSAsim Jamshed /*----------------------------------------------------------------------------*/ 192076404edcSAsim Jamshed uint32_t 192176404edcSAsim Jamshed mtcp_get_connection_cnt(mctx_t mctx) 192276404edcSAsim Jamshed { 192376404edcSAsim Jamshed mtcp_manager_t mtcp; 192476404edcSAsim Jamshed mtcp = GetMTCPManager(mctx); 192576404edcSAsim Jamshed if (!mtcp) { 192676404edcSAsim Jamshed errno = EACCES; 192776404edcSAsim Jamshed return -1; 192876404edcSAsim Jamshed } 192976404edcSAsim Jamshed 193076404edcSAsim Jamshed if (mtcp->num_msp > 0) 193176404edcSAsim Jamshed return mtcp->flow_cnt / 2; 193276404edcSAsim Jamshed else 193376404edcSAsim Jamshed return mtcp->flow_cnt; 193476404edcSAsim Jamshed } 193576404edcSAsim Jamshed /*----------------------------------------------------------------------------*/ 1936