176404edcSAsim Jamshed #include <sys/queue.h> 276404edcSAsim Jamshed #include <sys/ioctl.h> 376404edcSAsim Jamshed #include <limits.h> 476404edcSAsim Jamshed #include <unistd.h> 576404edcSAsim Jamshed #include <assert.h> 676404edcSAsim Jamshed #include <string.h> 776404edcSAsim Jamshed 876404edcSAsim Jamshed #ifdef DARWIN 976404edcSAsim Jamshed #include <netinet/tcp.h> 1076404edcSAsim Jamshed #include <netinet/ip.h> 1176404edcSAsim Jamshed #include <netinet/if_ether.h> 1276404edcSAsim Jamshed #endif 1376404edcSAsim Jamshed 1476404edcSAsim Jamshed #include "mtcp.h" 1576404edcSAsim Jamshed #include "mtcp_api.h" 1676404edcSAsim Jamshed #include "tcp_in.h" 1776404edcSAsim Jamshed #include "tcp_stream.h" 1876404edcSAsim Jamshed #include "tcp_out.h" 1976404edcSAsim Jamshed #include "ip_out.h" 2076404edcSAsim Jamshed #include "eventpoll.h" 2176404edcSAsim Jamshed #include "pipe.h" 2276404edcSAsim Jamshed #include "fhash.h" 2376404edcSAsim Jamshed #include "addr_pool.h" 2476404edcSAsim Jamshed #include "rss.h" 2576404edcSAsim Jamshed #include "config.h" 2676404edcSAsim Jamshed #include "debug.h" 2776404edcSAsim Jamshed #include "eventpoll.h" 2876404edcSAsim Jamshed #include "mos_api.h" 2976404edcSAsim Jamshed #include "tcp_rb.h" 3076404edcSAsim Jamshed 3176404edcSAsim Jamshed #define MAX(a, b) ((a)>(b)?(a):(b)) 3276404edcSAsim Jamshed #define MIN(a, b) ((a)<(b)?(a):(b)) 3376404edcSAsim Jamshed 3476404edcSAsim Jamshed /*----------------------------------------------------------------------------*/ 3576404edcSAsim Jamshed /** Stop monitoring the socket! (function prototype) 3676404edcSAsim Jamshed * @param [in] mctx: mtcp context 3776404edcSAsim Jamshed * @param [in] sock: monitoring stream socket id 3876404edcSAsim Jamshed * @param [in] side: side of monitoring (client side, server side or both) 3976404edcSAsim Jamshed * 4076404edcSAsim Jamshed * This function is now DEPRECATED and is only used within mOS core... 
4176404edcSAsim Jamshed */ 4276404edcSAsim Jamshed int 4376404edcSAsim Jamshed mtcp_cb_stop(mctx_t mctx, int sock, int side); 4476404edcSAsim Jamshed /*----------------------------------------------------------------------------*/ 4576404edcSAsim Jamshed /** Reset the connection (send RST packets to both sides) 4676404edcSAsim Jamshed * (We need to decide the API for this.) 4776404edcSAsim Jamshed */ 4876404edcSAsim Jamshed //int 4976404edcSAsim Jamshed //mtcp_cb_reset(mctx_t mctx, int sock, int side); 5076404edcSAsim Jamshed /*----------------------------------------------------------------------------*/ 5176404edcSAsim Jamshed inline mtcp_manager_t 5276404edcSAsim Jamshed GetMTCPManager(mctx_t mctx) 5376404edcSAsim Jamshed { 5476404edcSAsim Jamshed if (!mctx) { 5576404edcSAsim Jamshed errno = EACCES; 5676404edcSAsim Jamshed return NULL; 5776404edcSAsim Jamshed } 5876404edcSAsim Jamshed 5976404edcSAsim Jamshed if (mctx->cpu < 0 || mctx->cpu >= num_cpus) { 6076404edcSAsim Jamshed errno = EINVAL; 6176404edcSAsim Jamshed return NULL; 6276404edcSAsim Jamshed } 6376404edcSAsim Jamshed 64*4cb4e140SAsim Jamshed if (!g_mtcp[mctx->cpu] || g_mtcp[mctx->cpu]->ctx->done || g_mtcp[mctx->cpu]->ctx->exit) { 6576404edcSAsim Jamshed errno = EPERM; 6676404edcSAsim Jamshed return NULL; 6776404edcSAsim Jamshed } 6876404edcSAsim Jamshed 6976404edcSAsim Jamshed return g_mtcp[mctx->cpu]; 7076404edcSAsim Jamshed } 7176404edcSAsim Jamshed /*----------------------------------------------------------------------------*/ 7276404edcSAsim Jamshed inline int 7376404edcSAsim Jamshed GetSocketError(socket_map_t socket, void *optval, socklen_t *optlen) 7476404edcSAsim Jamshed { 7576404edcSAsim Jamshed tcp_stream *cur_stream; 7676404edcSAsim Jamshed 7776404edcSAsim Jamshed if (!socket->stream) { 7876404edcSAsim Jamshed errno = EBADF; 7976404edcSAsim Jamshed return -1; 8076404edcSAsim Jamshed } 8176404edcSAsim Jamshed 8276404edcSAsim Jamshed cur_stream = socket->stream; 8376404edcSAsim Jamshed if 
(cur_stream->state == TCP_ST_CLOSED_RSVD) { 8476404edcSAsim Jamshed if (cur_stream->close_reason == TCP_TIMEDOUT || 8576404edcSAsim Jamshed cur_stream->close_reason == TCP_CONN_FAIL || 8676404edcSAsim Jamshed cur_stream->close_reason == TCP_CONN_LOST) { 8776404edcSAsim Jamshed *(int *)optval = ETIMEDOUT; 8876404edcSAsim Jamshed *optlen = sizeof(int); 8976404edcSAsim Jamshed 9076404edcSAsim Jamshed return 0; 9176404edcSAsim Jamshed } 9276404edcSAsim Jamshed } 9376404edcSAsim Jamshed 9476404edcSAsim Jamshed if (cur_stream->state == TCP_ST_CLOSE_WAIT || 9576404edcSAsim Jamshed cur_stream->state == TCP_ST_CLOSED_RSVD) { 9676404edcSAsim Jamshed if (cur_stream->close_reason == TCP_RESET) { 9776404edcSAsim Jamshed *(int *)optval = ECONNRESET; 9876404edcSAsim Jamshed *optlen = sizeof(int); 9976404edcSAsim Jamshed 10076404edcSAsim Jamshed return 0; 10176404edcSAsim Jamshed } 10276404edcSAsim Jamshed } 10376404edcSAsim Jamshed 104265cb675SAsim Jamshed /* 105265cb675SAsim Jamshed * `base case`: If socket sees no so_error, then 106265cb675SAsim Jamshed * this also means close_reason will always be 107265cb675SAsim Jamshed * TCP_NOT_CLOSED. 
108265cb675SAsim Jamshed */ 109265cb675SAsim Jamshed if (cur_stream->close_reason == TCP_NOT_CLOSED) { 110265cb675SAsim Jamshed *(int *)optval = 0; 111265cb675SAsim Jamshed *optlen = sizeof(int); 112265cb675SAsim Jamshed 113265cb675SAsim Jamshed return 0; 114265cb675SAsim Jamshed } 115265cb675SAsim Jamshed 11676404edcSAsim Jamshed errno = ENOSYS; 11776404edcSAsim Jamshed return -1; 11876404edcSAsim Jamshed } 11976404edcSAsim Jamshed /*----------------------------------------------------------------------------*/ 12076404edcSAsim Jamshed int 12176404edcSAsim Jamshed mtcp_getsockopt(mctx_t mctx, int sockid, int level, 12276404edcSAsim Jamshed int optname, void *optval, socklen_t *optlen) 12376404edcSAsim Jamshed { 12476404edcSAsim Jamshed mtcp_manager_t mtcp; 12576404edcSAsim Jamshed socket_map_t socket; 12676404edcSAsim Jamshed 12776404edcSAsim Jamshed mtcp = GetMTCPManager(mctx); 12876404edcSAsim Jamshed if (!mtcp) { 12976404edcSAsim Jamshed errno = EACCES; 13076404edcSAsim Jamshed return -1; 13176404edcSAsim Jamshed } 13276404edcSAsim Jamshed 13376404edcSAsim Jamshed if (sockid < 0 || sockid >= g_config.mos->max_concurrency) { 13476404edcSAsim Jamshed TRACE_API("Socket id %d out of range.\n", sockid); 13576404edcSAsim Jamshed errno = EBADF; 13676404edcSAsim Jamshed return -1; 13776404edcSAsim Jamshed } 13876404edcSAsim Jamshed 13976404edcSAsim Jamshed switch (level) { 14076404edcSAsim Jamshed case SOL_SOCKET: 14176404edcSAsim Jamshed socket = &mtcp->smap[sockid]; 14276404edcSAsim Jamshed if (socket->socktype == MOS_SOCK_UNUSED) { 14376404edcSAsim Jamshed TRACE_API("Invalid socket id: %d\n", sockid); 14476404edcSAsim Jamshed errno = EBADF; 14576404edcSAsim Jamshed return -1; 14676404edcSAsim Jamshed } 14776404edcSAsim Jamshed 14876404edcSAsim Jamshed if (socket->socktype != MOS_SOCK_STREAM_LISTEN && 14976404edcSAsim Jamshed socket->socktype != MOS_SOCK_STREAM) { 15076404edcSAsim Jamshed TRACE_API("Invalid socket id: %d\n", sockid); 15176404edcSAsim Jamshed errno = 
ENOTSOCK; 15276404edcSAsim Jamshed return -1; 15376404edcSAsim Jamshed } 15476404edcSAsim Jamshed 15576404edcSAsim Jamshed if (optname == SO_ERROR) { 15676404edcSAsim Jamshed if (socket->socktype == MOS_SOCK_STREAM) { 15776404edcSAsim Jamshed return GetSocketError(socket, optval, optlen); 15876404edcSAsim Jamshed } 15976404edcSAsim Jamshed } 16076404edcSAsim Jamshed break; 16176404edcSAsim Jamshed case SOL_MONSOCKET: 16276404edcSAsim Jamshed /* check if the calling thread is in MOS context */ 16376404edcSAsim Jamshed if (mtcp->ctx->thread != pthread_self()) { 16476404edcSAsim Jamshed errno = EPERM; 16576404edcSAsim Jamshed return -1; 16676404edcSAsim Jamshed } 16776404edcSAsim Jamshed /* 16876404edcSAsim Jamshed * All options will only work for active 16976404edcSAsim Jamshed * monitor stream sockets 17076404edcSAsim Jamshed */ 17176404edcSAsim Jamshed socket = &mtcp->msmap[sockid]; 17276404edcSAsim Jamshed if (socket->socktype != MOS_SOCK_MONITOR_STREAM_ACTIVE) { 17376404edcSAsim Jamshed TRACE_API("Invalid socket id: %d\n", sockid); 17476404edcSAsim Jamshed errno = ENOTSOCK; 17576404edcSAsim Jamshed return -1; 17676404edcSAsim Jamshed } 17776404edcSAsim Jamshed 17876404edcSAsim Jamshed switch (optname) { 17976404edcSAsim Jamshed case MOS_FRAGINFO_CLIBUF: 18076404edcSAsim Jamshed return GetFragInfo(socket, MOS_SIDE_CLI, optval, optlen); 18176404edcSAsim Jamshed case MOS_FRAGINFO_SVRBUF: 18276404edcSAsim Jamshed return GetFragInfo(socket, MOS_SIDE_SVR, optval, optlen); 18376404edcSAsim Jamshed case MOS_INFO_CLIBUF: 18476404edcSAsim Jamshed return GetBufInfo(socket, MOS_SIDE_CLI, optval, optlen); 18576404edcSAsim Jamshed case MOS_INFO_SVRBUF: 18676404edcSAsim Jamshed return GetBufInfo(socket, MOS_SIDE_SVR, optval, optlen); 18776404edcSAsim Jamshed case MOS_TCP_STATE_CLI: 18876404edcSAsim Jamshed return GetTCPState(socket->monitor_stream->stream, MOS_SIDE_CLI, 18976404edcSAsim Jamshed optval, optlen); 19076404edcSAsim Jamshed case MOS_TCP_STATE_SVR: 19176404edcSAsim 
Jamshed return GetTCPState(socket->monitor_stream->stream, MOS_SIDE_SVR, 19276404edcSAsim Jamshed optval, optlen); 19376404edcSAsim Jamshed case MOS_TIMESTAMP: 19476404edcSAsim Jamshed return GetLastTimestamp(socket->monitor_stream->stream, 19576404edcSAsim Jamshed (uint32_t *)optval, 19676404edcSAsim Jamshed optlen); 19776404edcSAsim Jamshed default: 19876404edcSAsim Jamshed TRACE_API("can't recognize the optname=%d\n", optname); 19976404edcSAsim Jamshed assert(0); 20076404edcSAsim Jamshed } 20176404edcSAsim Jamshed break; 20276404edcSAsim Jamshed } 20376404edcSAsim Jamshed errno = ENOSYS; 20476404edcSAsim Jamshed return -1; 20576404edcSAsim Jamshed } 20676404edcSAsim Jamshed /*----------------------------------------------------------------------------*/ 20776404edcSAsim Jamshed int 20876404edcSAsim Jamshed mtcp_setsockopt(mctx_t mctx, int sockid, int level, 20976404edcSAsim Jamshed int optname, const void *optval, socklen_t optlen) 21076404edcSAsim Jamshed { 21176404edcSAsim Jamshed mtcp_manager_t mtcp; 21276404edcSAsim Jamshed socket_map_t socket; 21376404edcSAsim Jamshed tcprb_t *rb; 21476404edcSAsim Jamshed 21576404edcSAsim Jamshed mtcp = GetMTCPManager(mctx); 21676404edcSAsim Jamshed if (!mtcp) { 21776404edcSAsim Jamshed errno = EACCES; 21876404edcSAsim Jamshed return -1; 21976404edcSAsim Jamshed } 22076404edcSAsim Jamshed 22176404edcSAsim Jamshed if (sockid < 0 || sockid >= g_config.mos->max_concurrency) { 22276404edcSAsim Jamshed TRACE_API("Socket id %d out of range.\n", sockid); 22376404edcSAsim Jamshed errno = EBADF; 22476404edcSAsim Jamshed return -1; 22576404edcSAsim Jamshed } 22676404edcSAsim Jamshed 22776404edcSAsim Jamshed switch (level) { 22876404edcSAsim Jamshed case SOL_SOCKET: 22976404edcSAsim Jamshed socket = &mtcp->smap[sockid]; 23076404edcSAsim Jamshed if (socket->socktype == MOS_SOCK_UNUSED) { 23176404edcSAsim Jamshed TRACE_API("Invalid socket id: %d\n", sockid); 23276404edcSAsim Jamshed errno = EBADF; 23376404edcSAsim Jamshed return -1; 
23476404edcSAsim Jamshed } 23576404edcSAsim Jamshed 23676404edcSAsim Jamshed if (socket->socktype != MOS_SOCK_STREAM_LISTEN && 23776404edcSAsim Jamshed socket->socktype != MOS_SOCK_STREAM) { 23876404edcSAsim Jamshed TRACE_API("Invalid socket id: %d\n", sockid); 23976404edcSAsim Jamshed errno = ENOTSOCK; 24076404edcSAsim Jamshed return -1; 24176404edcSAsim Jamshed } 24276404edcSAsim Jamshed break; 24376404edcSAsim Jamshed case SOL_MONSOCKET: 24476404edcSAsim Jamshed socket = &mtcp->msmap[sockid]; 24576404edcSAsim Jamshed /* 24676404edcSAsim Jamshed * checking of calling thread to be in MOS context is 24776404edcSAsim Jamshed * disabled since both optnames can be called from 24876404edcSAsim Jamshed * `application' context (on passive sockets) 24976404edcSAsim Jamshed */ 25076404edcSAsim Jamshed /* 25176404edcSAsim Jamshed * if (mtcp->ctx->thread != pthread_self()) 25276404edcSAsim Jamshed * return -1; 25376404edcSAsim Jamshed */ 25476404edcSAsim Jamshed 25576404edcSAsim Jamshed switch (optname) { 25676404edcSAsim Jamshed case MOS_CLIBUF: 25776404edcSAsim Jamshed #if 0 25876404edcSAsim Jamshed if (socket->socktype != MOS_SOCK_MONITOR_STREAM_ACTIVE) { 25976404edcSAsim Jamshed errno = EBADF; 26076404edcSAsim Jamshed return -1; 26176404edcSAsim Jamshed } 26276404edcSAsim Jamshed #endif 26376404edcSAsim Jamshed #ifdef DISABLE_DYN_RESIZE 26476404edcSAsim Jamshed if (*(int *)optval != 0) 26576404edcSAsim Jamshed return -1; 26676404edcSAsim Jamshed if (socket->socktype == MOS_SOCK_MONITOR_STREAM_ACTIVE) { 26776404edcSAsim Jamshed rb = (socket->monitor_stream->stream->side == MOS_SIDE_CLI) ? 
26876404edcSAsim Jamshed socket->monitor_stream->stream->rcvvar->rcvbuf : 26976404edcSAsim Jamshed socket->monitor_stream->stream->pair_stream->rcvvar->rcvbuf; 27076404edcSAsim Jamshed if (rb) { 27176404edcSAsim Jamshed tcprb_resize_meta(rb, 0); 27276404edcSAsim Jamshed tcprb_resize(rb, 0); 27376404edcSAsim Jamshed } 27476404edcSAsim Jamshed } 27576404edcSAsim Jamshed return DisableBuf(socket, MOS_SIDE_CLI); 27676404edcSAsim Jamshed #else 27776404edcSAsim Jamshed rb = (socket->monitor_stream->stream->side == MOS_SIDE_CLI) ? 27876404edcSAsim Jamshed socket->monitor_stream->stream->rcvvar->rcvbuf : 27976404edcSAsim Jamshed socket->monitor_stream->stream->pair_stream->rcvvar->rcvbuf; 28076404edcSAsim Jamshed if (tcprb_resize_meta(rb, *(int *)optval) < 0) 28176404edcSAsim Jamshed return -1; 28276404edcSAsim Jamshed return tcprb_resize(rb, 28376404edcSAsim Jamshed (((int)rb->metalen - 1) / UNITBUFSIZE + 1) * UNITBUFSIZE); 28476404edcSAsim Jamshed #endif 28576404edcSAsim Jamshed case MOS_SVRBUF: 28676404edcSAsim Jamshed #if 0 28776404edcSAsim Jamshed if (socket->socktype != MOS_SOCK_MONITOR_STREAM_ACTIVE) { 28876404edcSAsim Jamshed errno = EBADF; 28976404edcSAsim Jamshed return -1; 29076404edcSAsim Jamshed } 29176404edcSAsim Jamshed #endif 29276404edcSAsim Jamshed #ifdef DISABLE_DYN_RESIZE 29376404edcSAsim Jamshed if (*(int *)optval != 0) 29476404edcSAsim Jamshed return -1; 29576404edcSAsim Jamshed if (socket->socktype == MOS_SOCK_MONITOR_STREAM_ACTIVE) { 29676404edcSAsim Jamshed rb = (socket->monitor_stream->stream->side == MOS_SIDE_SVR) ? 
29776404edcSAsim Jamshed socket->monitor_stream->stream->rcvvar->rcvbuf : 29876404edcSAsim Jamshed socket->monitor_stream->stream->pair_stream->rcvvar->rcvbuf; 29976404edcSAsim Jamshed if (rb) { 30076404edcSAsim Jamshed tcprb_resize_meta(rb, 0); 30176404edcSAsim Jamshed tcprb_resize(rb, 0); 30276404edcSAsim Jamshed } 30376404edcSAsim Jamshed } 30476404edcSAsim Jamshed return DisableBuf(socket, MOS_SIDE_SVR); 30576404edcSAsim Jamshed #else 30676404edcSAsim Jamshed rb = (socket->monitor_stream->stream->side == MOS_SIDE_SVR) ? 30776404edcSAsim Jamshed socket->monitor_stream->stream->rcvvar->rcvbuf : 30876404edcSAsim Jamshed socket->monitor_stream->stream->pair_stream->rcvvar->rcvbuf; 30976404edcSAsim Jamshed if (tcprb_resize_meta(rb, *(int *)optval) < 0) 31076404edcSAsim Jamshed return -1; 31176404edcSAsim Jamshed return tcprb_resize(rb, 31276404edcSAsim Jamshed (((int)rb->metalen - 1) / UNITBUFSIZE + 1) * UNITBUFSIZE); 31376404edcSAsim Jamshed #endif 31476404edcSAsim Jamshed case MOS_FRAG_CLIBUF: 31576404edcSAsim Jamshed #if 0 31676404edcSAsim Jamshed if (socket->socktype != MOS_SOCK_MONITOR_STREAM_ACTIVE) { 31776404edcSAsim Jamshed errno = EBADF; 31876404edcSAsim Jamshed return -1; 31976404edcSAsim Jamshed } 32076404edcSAsim Jamshed #endif 32176404edcSAsim Jamshed #ifdef DISABLE_DYN_RESIZE 32276404edcSAsim Jamshed if (*(int *)optval != 0) 32376404edcSAsim Jamshed return -1; 32476404edcSAsim Jamshed if (socket->socktype == MOS_SOCK_MONITOR_STREAM_ACTIVE) { 32576404edcSAsim Jamshed rb = (socket->monitor_stream->stream->side == MOS_SIDE_CLI) ? 32676404edcSAsim Jamshed socket->monitor_stream->stream->rcvvar->rcvbuf : 32776404edcSAsim Jamshed socket->monitor_stream->stream->pair_stream->rcvvar->rcvbuf; 32876404edcSAsim Jamshed if (rb) 32976404edcSAsim Jamshed tcprb_resize(rb, 0); 33076404edcSAsim Jamshed } 33176404edcSAsim Jamshed return 0; 33276404edcSAsim Jamshed #else 33376404edcSAsim Jamshed rb = (socket->monitor_stream->stream->side == MOS_SIDE_CLI) ? 
33476404edcSAsim Jamshed socket->monitor_stream->stream->rcvvar->rcvbuf : 33576404edcSAsim Jamshed socket->monitor_stream->stream->pair_stream->rcvvar->rcvbuf; 33676404edcSAsim Jamshed if (rb->len == 0) 33776404edcSAsim Jamshed return tcprb_resize_meta(rb, *(int *)optval); 33876404edcSAsim Jamshed else 33976404edcSAsim Jamshed return -1; 34076404edcSAsim Jamshed #endif 34176404edcSAsim Jamshed case MOS_FRAG_SVRBUF: 34276404edcSAsim Jamshed #if 0 34376404edcSAsim Jamshed if (socket->socktype != MOS_SOCK_MONITOR_STREAM_ACTIVE) { 34476404edcSAsim Jamshed errno = EBADF; 34576404edcSAsim Jamshed return -1; 34676404edcSAsim Jamshed } 34776404edcSAsim Jamshed #endif 34876404edcSAsim Jamshed #ifdef DISABLE_DYN_RESIZE 34976404edcSAsim Jamshed if (*(int *)optval != 0) 35076404edcSAsim Jamshed return -1; 35176404edcSAsim Jamshed if (socket->socktype == MOS_SOCK_MONITOR_STREAM_ACTIVE) { 35276404edcSAsim Jamshed rb = (socket->monitor_stream->stream->side == MOS_SIDE_SVR) ? 35376404edcSAsim Jamshed socket->monitor_stream->stream->rcvvar->rcvbuf : 35476404edcSAsim Jamshed socket->monitor_stream->stream->pair_stream->rcvvar->rcvbuf; 35576404edcSAsim Jamshed if (rb) 35676404edcSAsim Jamshed tcprb_resize(rb, 0); 35776404edcSAsim Jamshed } 35876404edcSAsim Jamshed return 0; 35976404edcSAsim Jamshed #else 36076404edcSAsim Jamshed rb = (socket->monitor_stream->stream->side == MOS_SIDE_SVR) ? 
36176404edcSAsim Jamshed socket->monitor_stream->stream->rcvvar->rcvbuf : 36276404edcSAsim Jamshed socket->monitor_stream->stream->pair_stream->rcvvar->rcvbuf; 36376404edcSAsim Jamshed if (rb->len == 0) 36476404edcSAsim Jamshed return tcprb_resize_meta(rb, *(int *)optval); 36576404edcSAsim Jamshed else 36676404edcSAsim Jamshed return -1; 36776404edcSAsim Jamshed #endif 36876404edcSAsim Jamshed case MOS_MONLEVEL: 36976404edcSAsim Jamshed #ifdef OLD_API 37076404edcSAsim Jamshed assert(*(int *)optval == MOS_NO_CLIBUF || 37176404edcSAsim Jamshed *(int *)optval == MOS_NO_SVRBUF); 37276404edcSAsim Jamshed return DisableBuf(socket, 37376404edcSAsim Jamshed (*(int *)optval == MOS_NO_CLIBUF) ? 37476404edcSAsim Jamshed MOS_SIDE_CLI : MOS_SIDE_SVR); 37576404edcSAsim Jamshed #endif 37676404edcSAsim Jamshed case MOS_SEQ_REMAP: 37776404edcSAsim Jamshed return TcpSeqChange(socket, 37876404edcSAsim Jamshed (uint32_t)((seq_remap_info *)optval)->seq_off, 37976404edcSAsim Jamshed ((seq_remap_info *)optval)->side, 38076404edcSAsim Jamshed mtcp->pctx->p.seq); 38176404edcSAsim Jamshed case MOS_STOP_MON: 38276404edcSAsim Jamshed return mtcp_cb_stop(mctx, sockid, *(int *)optval); 38376404edcSAsim Jamshed default: 38476404edcSAsim Jamshed TRACE_API("invalid optname=%d\n", optname); 38576404edcSAsim Jamshed assert(0); 38676404edcSAsim Jamshed } 38776404edcSAsim Jamshed break; 38876404edcSAsim Jamshed } 38976404edcSAsim Jamshed 39076404edcSAsim Jamshed errno = ENOSYS; 39176404edcSAsim Jamshed return -1; 39276404edcSAsim Jamshed } 39376404edcSAsim Jamshed /*----------------------------------------------------------------------------*/ 39476404edcSAsim Jamshed int 39576404edcSAsim Jamshed mtcp_setsock_nonblock(mctx_t mctx, int sockid) 39676404edcSAsim Jamshed { 39776404edcSAsim Jamshed mtcp_manager_t mtcp; 39876404edcSAsim Jamshed 39976404edcSAsim Jamshed mtcp = GetMTCPManager(mctx); 40076404edcSAsim Jamshed if (!mtcp) { 40176404edcSAsim Jamshed errno = EACCES; 40276404edcSAsim Jamshed return 
-1; 40376404edcSAsim Jamshed } 40476404edcSAsim Jamshed 40576404edcSAsim Jamshed if (sockid < 0 || sockid >= g_config.mos->max_concurrency) { 40676404edcSAsim Jamshed TRACE_API("Socket id %d out of range.\n", sockid); 40776404edcSAsim Jamshed errno = EBADF; 40876404edcSAsim Jamshed return -1; 40976404edcSAsim Jamshed } 41076404edcSAsim Jamshed 41176404edcSAsim Jamshed if (mtcp->smap[sockid].socktype == MOS_SOCK_UNUSED) { 41276404edcSAsim Jamshed TRACE_API("Invalid socket id: %d\n", sockid); 41376404edcSAsim Jamshed errno = EBADF; 41476404edcSAsim Jamshed return -1; 41576404edcSAsim Jamshed } 41676404edcSAsim Jamshed 41776404edcSAsim Jamshed mtcp->smap[sockid].opts |= MTCP_NONBLOCK; 41876404edcSAsim Jamshed 41976404edcSAsim Jamshed return 0; 42076404edcSAsim Jamshed } 42176404edcSAsim Jamshed /*----------------------------------------------------------------------------*/ 42276404edcSAsim Jamshed int 42376404edcSAsim Jamshed mtcp_ioctl(mctx_t mctx, int sockid, int request, void *argp) 42476404edcSAsim Jamshed { 42576404edcSAsim Jamshed mtcp_manager_t mtcp; 42676404edcSAsim Jamshed socket_map_t socket; 42776404edcSAsim Jamshed 42876404edcSAsim Jamshed mtcp = GetMTCPManager(mctx); 42976404edcSAsim Jamshed if (!mtcp) { 43076404edcSAsim Jamshed errno = EACCES; 43176404edcSAsim Jamshed return -1; 43276404edcSAsim Jamshed } 43376404edcSAsim Jamshed 43476404edcSAsim Jamshed if (sockid < 0 || sockid >= g_config.mos->max_concurrency) { 43576404edcSAsim Jamshed TRACE_API("Socket id %d out of range.\n", sockid); 43676404edcSAsim Jamshed errno = EBADF; 43776404edcSAsim Jamshed return -1; 43876404edcSAsim Jamshed } 43976404edcSAsim Jamshed 44076404edcSAsim Jamshed /* only support stream socket */ 44176404edcSAsim Jamshed socket = &mtcp->smap[sockid]; 44276404edcSAsim Jamshed 44376404edcSAsim Jamshed if (socket->socktype != MOS_SOCK_STREAM_LISTEN && 44476404edcSAsim Jamshed socket->socktype != MOS_SOCK_STREAM) { 44576404edcSAsim Jamshed TRACE_API("Invalid socket id: %d\n", 
sockid); 44676404edcSAsim Jamshed errno = EBADF; 44776404edcSAsim Jamshed return -1; 44876404edcSAsim Jamshed } 44976404edcSAsim Jamshed 45076404edcSAsim Jamshed if (!argp) { 45176404edcSAsim Jamshed errno = EFAULT; 45276404edcSAsim Jamshed return -1; 45376404edcSAsim Jamshed } 45476404edcSAsim Jamshed 45576404edcSAsim Jamshed if (request == FIONREAD) { 45676404edcSAsim Jamshed tcp_stream *cur_stream; 45776404edcSAsim Jamshed tcprb_t *rbuf; 45876404edcSAsim Jamshed 45976404edcSAsim Jamshed cur_stream = socket->stream; 46076404edcSAsim Jamshed if (!cur_stream) { 46176404edcSAsim Jamshed errno = EBADF; 46276404edcSAsim Jamshed return -1; 46376404edcSAsim Jamshed } 46476404edcSAsim Jamshed 46576404edcSAsim Jamshed rbuf = cur_stream->rcvvar->rcvbuf; 46676404edcSAsim Jamshed *(int *)argp = (rbuf) ? tcprb_cflen(rbuf) : 0; 46776404edcSAsim Jamshed 46876404edcSAsim Jamshed } else if (request == FIONBIO) { 46976404edcSAsim Jamshed /* 47076404edcSAsim Jamshed * sockets can only be set to blocking/non-blocking 47176404edcSAsim Jamshed * modes during initialization 47276404edcSAsim Jamshed */ 47376404edcSAsim Jamshed if ((*(int *)argp)) 47476404edcSAsim Jamshed mtcp->smap[sockid].opts |= MTCP_NONBLOCK; 47576404edcSAsim Jamshed else 47676404edcSAsim Jamshed mtcp->smap[sockid].opts &= ~MTCP_NONBLOCK; 47776404edcSAsim Jamshed } else { 47876404edcSAsim Jamshed errno = EINVAL; 47976404edcSAsim Jamshed return -1; 48076404edcSAsim Jamshed } 48176404edcSAsim Jamshed 48276404edcSAsim Jamshed return 0; 48376404edcSAsim Jamshed } 48476404edcSAsim Jamshed /*----------------------------------------------------------------------------*/ 48576404edcSAsim Jamshed static int 48676404edcSAsim Jamshed mtcp_monitor(mctx_t mctx, socket_map_t sock) 48776404edcSAsim Jamshed { 48876404edcSAsim Jamshed mtcp_manager_t mtcp; 48976404edcSAsim Jamshed struct mon_listener *monitor; 49076404edcSAsim Jamshed int sockid = sock->id; 49176404edcSAsim Jamshed 49276404edcSAsim Jamshed mtcp = GetMTCPManager(mctx); 
49376404edcSAsim Jamshed if (!mtcp) { 49476404edcSAsim Jamshed errno = EACCES; 49576404edcSAsim Jamshed return -1; 49676404edcSAsim Jamshed } 49776404edcSAsim Jamshed 49876404edcSAsim Jamshed if (sockid < 0 || sockid >= g_config.mos->max_concurrency) { 49976404edcSAsim Jamshed TRACE_API("Socket id %d out of range.\n", sockid); 50076404edcSAsim Jamshed errno = EBADF; 50176404edcSAsim Jamshed return -1; 50276404edcSAsim Jamshed } 50376404edcSAsim Jamshed 50476404edcSAsim Jamshed if (mtcp->msmap[sockid].socktype == MOS_SOCK_UNUSED) { 50576404edcSAsim Jamshed TRACE_API("Invalid socket id: %d\n", sockid); 50676404edcSAsim Jamshed errno = EBADF; 50776404edcSAsim Jamshed return -1; 50876404edcSAsim Jamshed } 50976404edcSAsim Jamshed 51076404edcSAsim Jamshed if (!(mtcp->msmap[sockid].socktype == MOS_SOCK_MONITOR_STREAM || 51176404edcSAsim Jamshed mtcp->msmap[sockid].socktype == MOS_SOCK_MONITOR_RAW)) { 51276404edcSAsim Jamshed TRACE_API("Not a monitor socket. id: %d\n", sockid); 51376404edcSAsim Jamshed errno = ENOTSOCK; 51476404edcSAsim Jamshed return -1; 51576404edcSAsim Jamshed } 51676404edcSAsim Jamshed 51776404edcSAsim Jamshed monitor = (struct mon_listener *)calloc(1, sizeof(struct mon_listener)); 51876404edcSAsim Jamshed if (!monitor) { 51976404edcSAsim Jamshed /* errno set from the malloc() */ 52076404edcSAsim Jamshed errno = ENOMEM; 52176404edcSAsim Jamshed return -1; 52276404edcSAsim Jamshed } 52376404edcSAsim Jamshed 52476404edcSAsim Jamshed /* create a monitor-specific event queue */ 52576404edcSAsim Jamshed monitor->eq = CreateEventQueue(g_config.mos->max_concurrency); 52676404edcSAsim Jamshed if (!monitor->eq) { 52776404edcSAsim Jamshed TRACE_API("Can't create event queue (concurrency: %d) for " 52876404edcSAsim Jamshed "monitor read event registrations!\n", 52976404edcSAsim Jamshed g_config.mos->max_concurrency); 53076404edcSAsim Jamshed free(monitor); 53176404edcSAsim Jamshed errno = ENOMEM; 53276404edcSAsim Jamshed return -1; 53376404edcSAsim Jamshed } 
53476404edcSAsim Jamshed 53576404edcSAsim Jamshed /* set monitor-related basic parameters */ 53676404edcSAsim Jamshed #ifndef NEWEV 53776404edcSAsim Jamshed monitor->ude_id = UDE_OFFSET; 53876404edcSAsim Jamshed #endif 53976404edcSAsim Jamshed monitor->socket = sock; 54076404edcSAsim Jamshed monitor->client_buf_mgmt = monitor->server_buf_mgmt = BUFMGMT_FULL; 54176404edcSAsim Jamshed 54276404edcSAsim Jamshed /* perform both sides monitoring by default */ 54376404edcSAsim Jamshed monitor->client_mon = monitor->server_mon = 1; 54476404edcSAsim Jamshed 54576404edcSAsim Jamshed /* add monitor socket to the monitor list */ 54676404edcSAsim Jamshed TAILQ_INSERT_TAIL(&mtcp->monitors, monitor, link); 54776404edcSAsim Jamshed 54876404edcSAsim Jamshed mtcp->msmap[sockid].monitor_listener = monitor; 54976404edcSAsim Jamshed 55076404edcSAsim Jamshed return 0; 55176404edcSAsim Jamshed } 55276404edcSAsim Jamshed /*----------------------------------------------------------------------------*/ 55376404edcSAsim Jamshed int 55476404edcSAsim Jamshed mtcp_socket(mctx_t mctx, int domain, int type, int protocol) 55576404edcSAsim Jamshed { 55676404edcSAsim Jamshed mtcp_manager_t mtcp; 55776404edcSAsim Jamshed socket_map_t socket; 55876404edcSAsim Jamshed 55976404edcSAsim Jamshed mtcp = GetMTCPManager(mctx); 56076404edcSAsim Jamshed if (!mtcp) { 56176404edcSAsim Jamshed errno = EACCES; 56276404edcSAsim Jamshed return -1; 56376404edcSAsim Jamshed } 56476404edcSAsim Jamshed 56576404edcSAsim Jamshed if (domain != AF_INET) { 56676404edcSAsim Jamshed errno = EAFNOSUPPORT; 56776404edcSAsim Jamshed return -1; 56876404edcSAsim Jamshed } 56976404edcSAsim Jamshed 57076404edcSAsim Jamshed if (type == SOCK_STREAM) { 57176404edcSAsim Jamshed type = MOS_SOCK_STREAM; 57276404edcSAsim Jamshed } else if (type == MOS_SOCK_MONITOR_STREAM || 57376404edcSAsim Jamshed type == MOS_SOCK_MONITOR_RAW) { 57476404edcSAsim Jamshed /* do nothing for the time being */ 57576404edcSAsim Jamshed } else { 57676404edcSAsim 
Jamshed /* Not supported type */ 57776404edcSAsim Jamshed errno = EINVAL; 57876404edcSAsim Jamshed return -1; 57976404edcSAsim Jamshed } 58076404edcSAsim Jamshed 58176404edcSAsim Jamshed socket = AllocateSocket(mctx, type); 58276404edcSAsim Jamshed if (!socket) { 58376404edcSAsim Jamshed errno = ENFILE; 58476404edcSAsim Jamshed return -1; 58576404edcSAsim Jamshed } 58676404edcSAsim Jamshed 58776404edcSAsim Jamshed if (type == MOS_SOCK_MONITOR_STREAM || 58876404edcSAsim Jamshed type == MOS_SOCK_MONITOR_RAW) { 58976404edcSAsim Jamshed mtcp_manager_t mtcp = GetMTCPManager(mctx); 59076404edcSAsim Jamshed if (!mtcp) { 59176404edcSAsim Jamshed errno = EACCES; 59276404edcSAsim Jamshed return -1; 59376404edcSAsim Jamshed } 59476404edcSAsim Jamshed mtcp_monitor(mctx, socket); 59576404edcSAsim Jamshed #ifdef NEWEV 59676404edcSAsim Jamshed socket->monitor_listener->stree_dontcare = NULL; 59776404edcSAsim Jamshed socket->monitor_listener->stree_pre_rcv = NULL; 59876404edcSAsim Jamshed socket->monitor_listener->stree_post_snd = NULL; 59976404edcSAsim Jamshed #else 60076404edcSAsim Jamshed InitEvB(mtcp, &socket->monitor_listener->dontcare_evb); 60176404edcSAsim Jamshed InitEvB(mtcp, &socket->monitor_listener->pre_tcp_evb); 60276404edcSAsim Jamshed InitEvB(mtcp, &socket->monitor_listener->post_tcp_evb); 60376404edcSAsim Jamshed #endif 60476404edcSAsim Jamshed } 60576404edcSAsim Jamshed 60676404edcSAsim Jamshed return socket->id; 60776404edcSAsim Jamshed } 60876404edcSAsim Jamshed /*----------------------------------------------------------------------------*/ 60976404edcSAsim Jamshed int 61076404edcSAsim Jamshed mtcp_bind(mctx_t mctx, int sockid, 61176404edcSAsim Jamshed const struct sockaddr *addr, socklen_t addrlen) 61276404edcSAsim Jamshed { 61376404edcSAsim Jamshed mtcp_manager_t mtcp; 61476404edcSAsim Jamshed struct sockaddr_in *addr_in; 61576404edcSAsim Jamshed 61676404edcSAsim Jamshed mtcp = GetMTCPManager(mctx); 61776404edcSAsim Jamshed if (!mtcp) { 61876404edcSAsim 
Jamshed errno = EACCES; 61976404edcSAsim Jamshed return -1; 62076404edcSAsim Jamshed } 62176404edcSAsim Jamshed 62276404edcSAsim Jamshed if (sockid < 0 || sockid >= g_config.mos->max_concurrency) { 62376404edcSAsim Jamshed TRACE_API("Socket id %d out of range.\n", sockid); 62476404edcSAsim Jamshed errno = EBADF; 62576404edcSAsim Jamshed return -1; 62676404edcSAsim Jamshed } 62776404edcSAsim Jamshed 62876404edcSAsim Jamshed if (mtcp->smap[sockid].socktype == MOS_SOCK_UNUSED) { 62976404edcSAsim Jamshed TRACE_API("Invalid socket id: %d\n", sockid); 63076404edcSAsim Jamshed errno = EBADF; 63176404edcSAsim Jamshed return -1; 63276404edcSAsim Jamshed } 63376404edcSAsim Jamshed 63476404edcSAsim Jamshed if (mtcp->smap[sockid].socktype != MOS_SOCK_STREAM && 63576404edcSAsim Jamshed mtcp->smap[sockid].socktype != MOS_SOCK_STREAM_LISTEN) { 63676404edcSAsim Jamshed TRACE_API("Not a stream socket id: %d\n", sockid); 63776404edcSAsim Jamshed errno = ENOTSOCK; 63876404edcSAsim Jamshed return -1; 63976404edcSAsim Jamshed } 64076404edcSAsim Jamshed 64176404edcSAsim Jamshed if (!addr) { 64276404edcSAsim Jamshed TRACE_API("Socket %d: empty address!\n", sockid); 64376404edcSAsim Jamshed errno = EINVAL; 64476404edcSAsim Jamshed return -1; 64576404edcSAsim Jamshed } 64676404edcSAsim Jamshed 64776404edcSAsim Jamshed if (mtcp->smap[sockid].opts & MTCP_ADDR_BIND) { 64876404edcSAsim Jamshed TRACE_API("Socket %d: adress already bind for this socket.\n", sockid); 64976404edcSAsim Jamshed errno = EINVAL; 65076404edcSAsim Jamshed return -1; 65176404edcSAsim Jamshed } 65276404edcSAsim Jamshed 65376404edcSAsim Jamshed /* we only allow bind() for AF_INET address */ 65476404edcSAsim Jamshed if (addr->sa_family != AF_INET || addrlen < sizeof(struct sockaddr_in)) { 65576404edcSAsim Jamshed TRACE_API("Socket %d: invalid argument!\n", sockid); 65676404edcSAsim Jamshed errno = EINVAL; 65776404edcSAsim Jamshed return -1; 65876404edcSAsim Jamshed } 65976404edcSAsim Jamshed 66076404edcSAsim Jamshed if 
(mtcp->listener) { 66176404edcSAsim Jamshed TRACE_API("Address already bound!\n"); 66276404edcSAsim Jamshed errno = EINVAL; 66376404edcSAsim Jamshed return -1; 66476404edcSAsim Jamshed } 66576404edcSAsim Jamshed addr_in = (struct sockaddr_in *)addr; 66676404edcSAsim Jamshed mtcp->smap[sockid].saddr = *addr_in; 66776404edcSAsim Jamshed mtcp->smap[sockid].opts |= MTCP_ADDR_BIND; 66876404edcSAsim Jamshed 66976404edcSAsim Jamshed return 0; 67076404edcSAsim Jamshed } 67176404edcSAsim Jamshed /*----------------------------------------------------------------------------*/ 67276404edcSAsim Jamshed int 67376404edcSAsim Jamshed mtcp_listen(mctx_t mctx, int sockid, int backlog) 67476404edcSAsim Jamshed { 67576404edcSAsim Jamshed mtcp_manager_t mtcp; 67676404edcSAsim Jamshed struct tcp_listener *listener; 67776404edcSAsim Jamshed 67876404edcSAsim Jamshed mtcp = GetMTCPManager(mctx); 67976404edcSAsim Jamshed if (!mtcp) { 68076404edcSAsim Jamshed errno = EACCES; 68176404edcSAsim Jamshed return -1; 68276404edcSAsim Jamshed } 68376404edcSAsim Jamshed 68476404edcSAsim Jamshed if (sockid < 0 || sockid >= g_config.mos->max_concurrency) { 68576404edcSAsim Jamshed TRACE_API("Socket id %d out of range.\n", sockid); 68676404edcSAsim Jamshed errno = EBADF; 68776404edcSAsim Jamshed return -1; 68876404edcSAsim Jamshed } 68976404edcSAsim Jamshed 69076404edcSAsim Jamshed if (mtcp->smap[sockid].socktype == MOS_SOCK_UNUSED) { 69176404edcSAsim Jamshed TRACE_API("Invalid socket id: %d\n", sockid); 69276404edcSAsim Jamshed errno = EBADF; 69376404edcSAsim Jamshed return -1; 69476404edcSAsim Jamshed } 69576404edcSAsim Jamshed 69676404edcSAsim Jamshed if (mtcp->smap[sockid].socktype == MOS_SOCK_STREAM) { 69776404edcSAsim Jamshed mtcp->smap[sockid].socktype = MOS_SOCK_STREAM_LISTEN; 69876404edcSAsim Jamshed } 69976404edcSAsim Jamshed 70076404edcSAsim Jamshed if (mtcp->smap[sockid].socktype != MOS_SOCK_STREAM_LISTEN) { 70176404edcSAsim Jamshed TRACE_API("Not a listening socket. 
id: %d\n", sockid); 70276404edcSAsim Jamshed errno = ENOTSOCK; 70376404edcSAsim Jamshed return -1; 70476404edcSAsim Jamshed } 70576404edcSAsim Jamshed 70676404edcSAsim Jamshed if (backlog <= 0 || backlog > g_config.mos->max_concurrency) { 70776404edcSAsim Jamshed errno = EINVAL; 70876404edcSAsim Jamshed return -1; 70976404edcSAsim Jamshed } 71076404edcSAsim Jamshed 71176404edcSAsim Jamshed listener = (struct tcp_listener *)calloc(1, sizeof(struct tcp_listener)); 71276404edcSAsim Jamshed if (!listener) { 71376404edcSAsim Jamshed /* errno set from the malloc() */ 71476404edcSAsim Jamshed errno = ENOMEM; 71576404edcSAsim Jamshed return -1; 71676404edcSAsim Jamshed } 71776404edcSAsim Jamshed 71876404edcSAsim Jamshed listener->sockid = sockid; 71976404edcSAsim Jamshed listener->backlog = backlog; 72076404edcSAsim Jamshed listener->socket = &mtcp->smap[sockid]; 72176404edcSAsim Jamshed 72276404edcSAsim Jamshed if (pthread_cond_init(&listener->accept_cond, NULL)) { 72376404edcSAsim Jamshed perror("pthread_cond_init of ctx->accept_cond\n"); 72476404edcSAsim Jamshed /* errno set by pthread_cond_init() */ 72576404edcSAsim Jamshed return -1; 72676404edcSAsim Jamshed } 72776404edcSAsim Jamshed if (pthread_mutex_init(&listener->accept_lock, NULL)) { 72876404edcSAsim Jamshed perror("pthread_mutex_init of ctx->accept_lock\n"); 72976404edcSAsim Jamshed /* errno set by pthread_mutex_init() */ 73076404edcSAsim Jamshed return -1; 73176404edcSAsim Jamshed } 73276404edcSAsim Jamshed 73376404edcSAsim Jamshed listener->acceptq = CreateStreamQueue(backlog); 73476404edcSAsim Jamshed if (!listener->acceptq) { 73576404edcSAsim Jamshed errno = ENOMEM; 73676404edcSAsim Jamshed return -1; 73776404edcSAsim Jamshed } 73876404edcSAsim Jamshed 73976404edcSAsim Jamshed mtcp->smap[sockid].listener = listener; 74076404edcSAsim Jamshed mtcp->listener = listener; 74176404edcSAsim Jamshed 74276404edcSAsim Jamshed return 0; 74376404edcSAsim Jamshed } 74476404edcSAsim Jamshed 
/*----------------------------------------------------------------------------*/ 74576404edcSAsim Jamshed int 74676404edcSAsim Jamshed mtcp_accept(mctx_t mctx, int sockid, struct sockaddr *addr, socklen_t *addrlen) 74776404edcSAsim Jamshed { 74876404edcSAsim Jamshed mtcp_manager_t mtcp; 74976404edcSAsim Jamshed struct tcp_listener *listener; 75076404edcSAsim Jamshed socket_map_t socket; 75176404edcSAsim Jamshed tcp_stream *accepted = NULL; 75276404edcSAsim Jamshed 75376404edcSAsim Jamshed mtcp = GetMTCPManager(mctx); 75476404edcSAsim Jamshed if (!mtcp) { 75576404edcSAsim Jamshed errno = EACCES; 75676404edcSAsim Jamshed return -1; 75776404edcSAsim Jamshed } 75876404edcSAsim Jamshed 75976404edcSAsim Jamshed if (sockid < 0 || sockid >= g_config.mos->max_concurrency) { 76076404edcSAsim Jamshed TRACE_API("Socket id %d out of range.\n", sockid); 76176404edcSAsim Jamshed errno = EBADF; 76276404edcSAsim Jamshed return -1; 76376404edcSAsim Jamshed } 76476404edcSAsim Jamshed 76576404edcSAsim Jamshed /* requires listening socket */ 76676404edcSAsim Jamshed if (mtcp->smap[sockid].socktype != MOS_SOCK_STREAM_LISTEN) { 76776404edcSAsim Jamshed errno = EINVAL; 76876404edcSAsim Jamshed return -1; 76976404edcSAsim Jamshed } 77076404edcSAsim Jamshed 77176404edcSAsim Jamshed listener = mtcp->smap[sockid].listener; 77276404edcSAsim Jamshed 77376404edcSAsim Jamshed /* dequeue from the acceptq without lock first */ 77476404edcSAsim Jamshed /* if nothing there, acquire lock and cond_wait */ 77576404edcSAsim Jamshed accepted = StreamDequeue(listener->acceptq); 77676404edcSAsim Jamshed if (!accepted) { 77776404edcSAsim Jamshed if (listener->socket->opts & MTCP_NONBLOCK) { 77876404edcSAsim Jamshed errno = EAGAIN; 77976404edcSAsim Jamshed return -1; 78076404edcSAsim Jamshed 78176404edcSAsim Jamshed } else { 78276404edcSAsim Jamshed pthread_mutex_lock(&listener->accept_lock); 78376404edcSAsim Jamshed while ((accepted = StreamDequeue(listener->acceptq)) == NULL) { 78476404edcSAsim Jamshed 
pthread_cond_wait(&listener->accept_cond, &listener->accept_lock); 78576404edcSAsim Jamshed 78676404edcSAsim Jamshed if (mtcp->ctx->done || mtcp->ctx->exit) { 78776404edcSAsim Jamshed pthread_mutex_unlock(&listener->accept_lock); 78876404edcSAsim Jamshed errno = EINTR; 78976404edcSAsim Jamshed return -1; 79076404edcSAsim Jamshed } 79176404edcSAsim Jamshed } 79276404edcSAsim Jamshed pthread_mutex_unlock(&listener->accept_lock); 79376404edcSAsim Jamshed } 79476404edcSAsim Jamshed } 79576404edcSAsim Jamshed 79676404edcSAsim Jamshed if (!accepted) { 79776404edcSAsim Jamshed TRACE_ERROR("[NEVER HAPPEN] Empty accept queue!\n"); 79876404edcSAsim Jamshed } 79976404edcSAsim Jamshed 80076404edcSAsim Jamshed if (!accepted->socket) { 80176404edcSAsim Jamshed socket = AllocateSocket(mctx, MOS_SOCK_STREAM); 80276404edcSAsim Jamshed if (!socket) { 80376404edcSAsim Jamshed TRACE_ERROR("Failed to create new socket!\n"); 80476404edcSAsim Jamshed /* TODO: destroy the stream */ 80576404edcSAsim Jamshed errno = ENFILE; 80676404edcSAsim Jamshed return -1; 80776404edcSAsim Jamshed } 80876404edcSAsim Jamshed socket->stream = accepted; 80976404edcSAsim Jamshed accepted->socket = socket; 81076404edcSAsim Jamshed /* if monitor is enabled, complete the socket assignment */ 81176404edcSAsim Jamshed if (socket->stream->pair_stream != NULL) 81276404edcSAsim Jamshed socket->stream->pair_stream->socket = socket; 81376404edcSAsim Jamshed } 81476404edcSAsim Jamshed 81576404edcSAsim Jamshed TRACE_API("Stream %d accepted.\n", accepted->id); 81676404edcSAsim Jamshed 81776404edcSAsim Jamshed if (addr && addrlen) { 81876404edcSAsim Jamshed struct sockaddr_in *addr_in = (struct sockaddr_in *)addr; 81976404edcSAsim Jamshed addr_in->sin_family = AF_INET; 82076404edcSAsim Jamshed addr_in->sin_port = accepted->dport; 82176404edcSAsim Jamshed addr_in->sin_addr.s_addr = accepted->daddr; 82276404edcSAsim Jamshed *addrlen = sizeof(struct sockaddr_in); 82376404edcSAsim Jamshed } 82476404edcSAsim Jamshed 
82576404edcSAsim Jamshed return accepted->socket->id; 82676404edcSAsim Jamshed } 82776404edcSAsim Jamshed /*----------------------------------------------------------------------------*/ 82876404edcSAsim Jamshed int 82976404edcSAsim Jamshed mtcp_init_rss(mctx_t mctx, in_addr_t saddr_base, int num_addr, 83076404edcSAsim Jamshed in_addr_t daddr, in_addr_t dport) 83176404edcSAsim Jamshed { 83276404edcSAsim Jamshed mtcp_manager_t mtcp; 83376404edcSAsim Jamshed addr_pool_t ap; 83476404edcSAsim Jamshed 83576404edcSAsim Jamshed mtcp = GetMTCPManager(mctx); 83676404edcSAsim Jamshed if (!mtcp) { 83776404edcSAsim Jamshed errno = EACCES; 83876404edcSAsim Jamshed return -1; 83976404edcSAsim Jamshed } 84076404edcSAsim Jamshed 84176404edcSAsim Jamshed if (saddr_base == INADDR_ANY) { 84276404edcSAsim Jamshed int nif_out; 84376404edcSAsim Jamshed 84476404edcSAsim Jamshed /* for the INADDR_ANY, find the output interface for the destination 84576404edcSAsim Jamshed and set the saddr_base as the ip address of the output interface */ 84676404edcSAsim Jamshed nif_out = GetOutputInterface(daddr); 84776404edcSAsim Jamshed saddr_base = g_config.mos->netdev_table->ent[nif_out]->ip_addr; 84876404edcSAsim Jamshed } 84976404edcSAsim Jamshed 85076404edcSAsim Jamshed ap = CreateAddressPoolPerCore(mctx->cpu, num_cpus, 85176404edcSAsim Jamshed saddr_base, num_addr, daddr, dport); 85276404edcSAsim Jamshed if (!ap) { 85376404edcSAsim Jamshed errno = ENOMEM; 85476404edcSAsim Jamshed return -1; 85576404edcSAsim Jamshed } 85676404edcSAsim Jamshed 85776404edcSAsim Jamshed mtcp->ap = ap; 85876404edcSAsim Jamshed 85976404edcSAsim Jamshed return 0; 86076404edcSAsim Jamshed } 86176404edcSAsim Jamshed /*----------------------------------------------------------------------------*/ 86276404edcSAsim Jamshed int 86376404edcSAsim Jamshed eval_bpf_5tuple(struct sfbpf_program fcode, 86476404edcSAsim Jamshed in_addr_t saddr, in_port_t sport, 86576404edcSAsim Jamshed in_addr_t daddr, in_port_t dport) { 
86676404edcSAsim Jamshed uint8_t buf[TOTAL_TCP_HEADER_LEN]; 86776404edcSAsim Jamshed struct ethhdr *ethh; 86876404edcSAsim Jamshed struct iphdr *iph; 86976404edcSAsim Jamshed struct tcphdr *tcph; 87076404edcSAsim Jamshed 87176404edcSAsim Jamshed ethh = (struct ethhdr *)buf; 87276404edcSAsim Jamshed ethh->h_proto = htons(ETH_P_IP); 87376404edcSAsim Jamshed iph = (struct iphdr *)(ethh + 1); 87476404edcSAsim Jamshed iph->ihl = IP_HEADER_LEN >> 2; 87576404edcSAsim Jamshed iph->version = 4; 87676404edcSAsim Jamshed iph->tos = 0; 87776404edcSAsim Jamshed iph->tot_len = htons(IP_HEADER_LEN + TCP_HEADER_LEN); 87876404edcSAsim Jamshed iph->id = htons(0); 87976404edcSAsim Jamshed iph->protocol = IPPROTO_TCP; 88076404edcSAsim Jamshed iph->saddr = saddr; 88176404edcSAsim Jamshed iph->daddr = daddr; 88276404edcSAsim Jamshed iph->check = 0; 88376404edcSAsim Jamshed tcph = (struct tcphdr *)(iph + 1); 88476404edcSAsim Jamshed tcph->source = sport; 88576404edcSAsim Jamshed tcph->dest = dport; 88676404edcSAsim Jamshed 88776404edcSAsim Jamshed return EVAL_BPFFILTER(fcode, (uint8_t *)iph - sizeof(struct ethhdr), 88876404edcSAsim Jamshed TOTAL_TCP_HEADER_LEN); 88976404edcSAsim Jamshed } 89076404edcSAsim Jamshed /*----------------------------------------------------------------------------*/ 89176404edcSAsim Jamshed int 89276404edcSAsim Jamshed mtcp_connect(mctx_t mctx, int sockid, 89376404edcSAsim Jamshed const struct sockaddr *addr, socklen_t addrlen) 89476404edcSAsim Jamshed { 89576404edcSAsim Jamshed mtcp_manager_t mtcp; 89676404edcSAsim Jamshed socket_map_t socket; 89776404edcSAsim Jamshed tcp_stream *cur_stream; 89876404edcSAsim Jamshed struct sockaddr_in *addr_in; 89976404edcSAsim Jamshed in_addr_t dip; 90076404edcSAsim Jamshed in_port_t dport; 90176404edcSAsim Jamshed int is_dyn_bound = FALSE; 90276404edcSAsim Jamshed int ret; 90376404edcSAsim Jamshed int cnt_match = 0; 90476404edcSAsim Jamshed struct mon_listener *walk; 90576404edcSAsim Jamshed struct sfbpf_program fcode; 
90676404edcSAsim Jamshed 90776404edcSAsim Jamshed cur_stream = NULL; 90876404edcSAsim Jamshed mtcp = GetMTCPManager(mctx); 90976404edcSAsim Jamshed if (!mtcp) { 91076404edcSAsim Jamshed errno = EACCES; 91176404edcSAsim Jamshed return -1; 91276404edcSAsim Jamshed } 91376404edcSAsim Jamshed 91476404edcSAsim Jamshed if (sockid < 0 || sockid >= g_config.mos->max_concurrency) { 91576404edcSAsim Jamshed TRACE_API("Socket id %d out of range.\n", sockid); 91676404edcSAsim Jamshed errno = EBADF; 91776404edcSAsim Jamshed return -1; 91876404edcSAsim Jamshed } 91976404edcSAsim Jamshed 92076404edcSAsim Jamshed if (mtcp->smap[sockid].socktype == MOS_SOCK_UNUSED) { 92176404edcSAsim Jamshed TRACE_API("Invalid socket id: %d\n", sockid); 92276404edcSAsim Jamshed errno = EBADF; 92376404edcSAsim Jamshed return -1; 92476404edcSAsim Jamshed } 92576404edcSAsim Jamshed 92676404edcSAsim Jamshed if (mtcp->smap[sockid].socktype != MOS_SOCK_STREAM) { 92776404edcSAsim Jamshed TRACE_API("Not an end socket. id: %d\n", sockid); 92876404edcSAsim Jamshed errno = ENOTSOCK; 92976404edcSAsim Jamshed return -1; 93076404edcSAsim Jamshed } 93176404edcSAsim Jamshed 93276404edcSAsim Jamshed if (!addr) { 93376404edcSAsim Jamshed TRACE_API("Socket %d: empty address!\n", sockid); 93476404edcSAsim Jamshed errno = EFAULT; 93576404edcSAsim Jamshed return -1; 93676404edcSAsim Jamshed } 93776404edcSAsim Jamshed 93876404edcSAsim Jamshed /* we only allow bind() for AF_INET address */ 93976404edcSAsim Jamshed if (addr->sa_family != AF_INET || addrlen < sizeof(struct sockaddr_in)) { 94076404edcSAsim Jamshed TRACE_API("Socket %d: invalid argument!\n", sockid); 94176404edcSAsim Jamshed errno = EAFNOSUPPORT; 94276404edcSAsim Jamshed return -1; 94376404edcSAsim Jamshed } 94476404edcSAsim Jamshed 94576404edcSAsim Jamshed socket = &mtcp->smap[sockid]; 94676404edcSAsim Jamshed if (socket->stream) { 94776404edcSAsim Jamshed TRACE_API("Socket %d: stream already exist!\n", sockid); 94876404edcSAsim Jamshed if 
(socket->stream->state >= TCP_ST_ESTABLISHED) { 94976404edcSAsim Jamshed errno = EISCONN; 95076404edcSAsim Jamshed } else { 95176404edcSAsim Jamshed errno = EALREADY; 95276404edcSAsim Jamshed } 95376404edcSAsim Jamshed return -1; 95476404edcSAsim Jamshed } 95576404edcSAsim Jamshed 95676404edcSAsim Jamshed addr_in = (struct sockaddr_in *)addr; 95776404edcSAsim Jamshed dip = addr_in->sin_addr.s_addr; 95876404edcSAsim Jamshed dport = addr_in->sin_port; 95976404edcSAsim Jamshed 96076404edcSAsim Jamshed /* address binding */ 96176404edcSAsim Jamshed if (socket->opts & MTCP_ADDR_BIND && 96276404edcSAsim Jamshed socket->saddr.sin_port != INPORT_ANY && 96376404edcSAsim Jamshed socket->saddr.sin_addr.s_addr != INADDR_ANY) { 96476404edcSAsim Jamshed int rss_core; 96576404edcSAsim Jamshed 96676404edcSAsim Jamshed rss_core = GetRSSCPUCore(socket->saddr.sin_addr.s_addr, dip, 96776404edcSAsim Jamshed socket->saddr.sin_port, dport, num_queues); 96876404edcSAsim Jamshed 96976404edcSAsim Jamshed if (rss_core != mctx->cpu) { 97076404edcSAsim Jamshed errno = EINVAL; 97176404edcSAsim Jamshed return -1; 97276404edcSAsim Jamshed } 97376404edcSAsim Jamshed } else { 97476404edcSAsim Jamshed if (mtcp->ap) { 97576404edcSAsim Jamshed ret = FetchAddress(mtcp->ap, 97676404edcSAsim Jamshed mctx->cpu, num_queues, addr_in, &socket->saddr); 97776404edcSAsim Jamshed } else { 97876404edcSAsim Jamshed ret = FetchAddress(ap, 97976404edcSAsim Jamshed mctx->cpu, num_queues, addr_in, &socket->saddr); 98076404edcSAsim Jamshed } 98176404edcSAsim Jamshed if (ret < 0) { 98276404edcSAsim Jamshed errno = EAGAIN; 98376404edcSAsim Jamshed return -1; 98476404edcSAsim Jamshed } 98576404edcSAsim Jamshed socket->opts |= MTCP_ADDR_BIND; 98676404edcSAsim Jamshed is_dyn_bound = TRUE; 98776404edcSAsim Jamshed } 98876404edcSAsim Jamshed 98976404edcSAsim Jamshed cnt_match = 0; 99076404edcSAsim Jamshed if (mtcp->num_msp > 0) { 99176404edcSAsim Jamshed TAILQ_FOREACH(walk, &mtcp->monitors, link) { 99276404edcSAsim Jamshed 
fcode = walk->stream_syn_fcode; 99376404edcSAsim Jamshed if (!(ISSET_BPFFILTER(fcode) && 99476404edcSAsim Jamshed eval_bpf_5tuple(fcode, socket->saddr.sin_addr.s_addr, 99576404edcSAsim Jamshed socket->saddr.sin_port, 99676404edcSAsim Jamshed dip, dport) == 0)) { 99776404edcSAsim Jamshed walk->is_stream_syn_filter_hit = 1; // set the 'filter hit' flag to 1 99876404edcSAsim Jamshed cnt_match++; 99976404edcSAsim Jamshed } 100076404edcSAsim Jamshed } 100176404edcSAsim Jamshed } 100276404edcSAsim Jamshed 100376404edcSAsim Jamshed if (mtcp->num_msp > 0 && cnt_match > 0) { 100476404edcSAsim Jamshed /* 150820 dhkim: XXX: embedded mode is not verified */ 100576404edcSAsim Jamshed #if 1 100676404edcSAsim Jamshed cur_stream = CreateClientTCPStream(mtcp, socket, 100776404edcSAsim Jamshed STREAM_TYPE(MOS_SOCK_STREAM) | 100876404edcSAsim Jamshed STREAM_TYPE(MOS_SOCK_MONITOR_STREAM_ACTIVE), 100976404edcSAsim Jamshed socket->saddr.sin_addr.s_addr, 101076404edcSAsim Jamshed socket->saddr.sin_port, dip, dport, NULL); 101176404edcSAsim Jamshed #else 101276404edcSAsim Jamshed cur_stream = CreateDualTCPStream(mtcp, socket, 101376404edcSAsim Jamshed STREAM_TYPE(MOS_SOCK_STREAM) | 101476404edcSAsim Jamshed STREAM_TYPE(MOS_SOCK_MONITOR_STREAM_ACTIVE), 101576404edcSAsim Jamshed socket->saddr.sin_addr.s_addr, 101676404edcSAsim Jamshed socket->saddr.sin_port, dip, dport, NULL); 101776404edcSAsim Jamshed #endif 101876404edcSAsim Jamshed } 101976404edcSAsim Jamshed else 102076404edcSAsim Jamshed cur_stream = CreateTCPStream(mtcp, socket, STREAM_TYPE(MOS_SOCK_STREAM), 102176404edcSAsim Jamshed socket->saddr.sin_addr.s_addr, 102276404edcSAsim Jamshed socket->saddr.sin_port, dip, dport, NULL); 102376404edcSAsim Jamshed if (!cur_stream) { 102476404edcSAsim Jamshed TRACE_ERROR("Socket %d: failed to create tcp_stream!\n", sockid); 102576404edcSAsim Jamshed errno = ENOMEM; 102676404edcSAsim Jamshed return -1; 102776404edcSAsim Jamshed } 102876404edcSAsim Jamshed 102976404edcSAsim Jamshed if 
(is_dyn_bound) 103076404edcSAsim Jamshed cur_stream->is_bound_addr = TRUE; 103176404edcSAsim Jamshed cur_stream->sndvar->cwnd = 1; 103276404edcSAsim Jamshed cur_stream->sndvar->ssthresh = cur_stream->sndvar->mss * 10; 103376404edcSAsim Jamshed cur_stream->side = MOS_SIDE_CLI; 103476404edcSAsim Jamshed /* if monitor is enabled, update the pair stream side as well */ 103576404edcSAsim Jamshed if (cur_stream->pair_stream) { 103676404edcSAsim Jamshed cur_stream->pair_stream->side = MOS_SIDE_SVR; 103776404edcSAsim Jamshed /* 103876404edcSAsim Jamshed * if buffer management is off, then disable 103976404edcSAsim Jamshed * monitoring tcp ring of server... 104076404edcSAsim Jamshed * if there is even a single monitor asking for 104176404edcSAsim Jamshed * buffer management, enable it (that's why the 104276404edcSAsim Jamshed * need for the loop) 104376404edcSAsim Jamshed */ 104476404edcSAsim Jamshed cur_stream->pair_stream->buffer_mgmt = BUFMGMT_OFF; 104576404edcSAsim Jamshed struct socket_map *walk; 104676404edcSAsim Jamshed SOCKQ_FOREACH_START(walk, &cur_stream->msocks) { 104776404edcSAsim Jamshed uint8_t bm = walk->monitor_stream->monitor_listener->server_buf_mgmt; 104876404edcSAsim Jamshed if (bm > cur_stream->pair_stream->buffer_mgmt) { 104976404edcSAsim Jamshed cur_stream->pair_stream->buffer_mgmt = bm; 105076404edcSAsim Jamshed break; 105176404edcSAsim Jamshed } 105276404edcSAsim Jamshed } SOCKQ_FOREACH_END; 105376404edcSAsim Jamshed } 105476404edcSAsim Jamshed 105576404edcSAsim Jamshed cur_stream->state = TCP_ST_SYN_SENT; 105676404edcSAsim Jamshed cur_stream->cb_events |= MOS_ON_TCP_STATE_CHANGE; 105776404edcSAsim Jamshed 105876404edcSAsim Jamshed TRACE_STATE("Stream %d: TCP_ST_SYN_SENT\n", cur_stream->id); 105976404edcSAsim Jamshed 106076404edcSAsim Jamshed SQ_LOCK(&mtcp->ctx->connect_lock); 106176404edcSAsim Jamshed ret = StreamEnqueue(mtcp->connectq, cur_stream); 106276404edcSAsim Jamshed SQ_UNLOCK(&mtcp->ctx->connect_lock); 106376404edcSAsim Jamshed 
mtcp->wakeup_flag = TRUE; 106476404edcSAsim Jamshed if (ret < 0) { 106576404edcSAsim Jamshed TRACE_ERROR("Socket %d: failed to enqueue to conenct queue!\n", sockid); 106676404edcSAsim Jamshed SQ_LOCK(&mtcp->ctx->destroyq_lock); 106776404edcSAsim Jamshed StreamEnqueue(mtcp->destroyq, cur_stream); 106876404edcSAsim Jamshed SQ_UNLOCK(&mtcp->ctx->destroyq_lock); 106976404edcSAsim Jamshed errno = EAGAIN; 107076404edcSAsim Jamshed return -1; 107176404edcSAsim Jamshed } 107276404edcSAsim Jamshed 107376404edcSAsim Jamshed /* if nonblocking socket, return EINPROGRESS */ 107476404edcSAsim Jamshed if (socket->opts & MTCP_NONBLOCK) { 107576404edcSAsim Jamshed errno = EINPROGRESS; 107676404edcSAsim Jamshed return -1; 107776404edcSAsim Jamshed 107876404edcSAsim Jamshed } else { 107976404edcSAsim Jamshed while (1) { 108076404edcSAsim Jamshed if (!cur_stream) { 108176404edcSAsim Jamshed TRACE_ERROR("STREAM DESTROYED\n"); 108276404edcSAsim Jamshed errno = ETIMEDOUT; 108376404edcSAsim Jamshed return -1; 108476404edcSAsim Jamshed } 108576404edcSAsim Jamshed if (cur_stream->state > TCP_ST_ESTABLISHED) { 108676404edcSAsim Jamshed TRACE_ERROR("Socket %d: weird state %s\n", 108776404edcSAsim Jamshed sockid, TCPStateToString(cur_stream)); 108876404edcSAsim Jamshed // TODO: how to handle this? 
108976404edcSAsim Jamshed errno = ENOSYS; 109076404edcSAsim Jamshed return -1; 109176404edcSAsim Jamshed } 109276404edcSAsim Jamshed 109376404edcSAsim Jamshed if (cur_stream->state == TCP_ST_ESTABLISHED) { 109476404edcSAsim Jamshed break; 109576404edcSAsim Jamshed } 109676404edcSAsim Jamshed usleep(1000); 109776404edcSAsim Jamshed } 109876404edcSAsim Jamshed } 109976404edcSAsim Jamshed 110076404edcSAsim Jamshed return 0; 110176404edcSAsim Jamshed } 110276404edcSAsim Jamshed /*----------------------------------------------------------------------------*/ 110376404edcSAsim Jamshed static inline int 110476404edcSAsim Jamshed CloseStreamSocket(mctx_t mctx, int sockid) 110576404edcSAsim Jamshed { 110676404edcSAsim Jamshed mtcp_manager_t mtcp; 110776404edcSAsim Jamshed tcp_stream *cur_stream; 110876404edcSAsim Jamshed int ret; 110976404edcSAsim Jamshed 111076404edcSAsim Jamshed mtcp = GetMTCPManager(mctx); 111176404edcSAsim Jamshed if (!mtcp) { 111276404edcSAsim Jamshed errno = EACCES; 111376404edcSAsim Jamshed return -1; 111476404edcSAsim Jamshed } 111576404edcSAsim Jamshed 111676404edcSAsim Jamshed cur_stream = mtcp->smap[sockid].stream; 111776404edcSAsim Jamshed if (!cur_stream) { 111876404edcSAsim Jamshed TRACE_API("Socket %d: stream does not exist.\n", sockid); 111976404edcSAsim Jamshed errno = ENOTCONN; 112076404edcSAsim Jamshed return -1; 112176404edcSAsim Jamshed } 112276404edcSAsim Jamshed 112376404edcSAsim Jamshed if (cur_stream->closed) { 112476404edcSAsim Jamshed TRACE_API("Socket %d (Stream %u): already closed stream\n", 112576404edcSAsim Jamshed sockid, cur_stream->id); 112676404edcSAsim Jamshed return 0; 112776404edcSAsim Jamshed } 112876404edcSAsim Jamshed cur_stream->closed = TRUE; 112976404edcSAsim Jamshed 113076404edcSAsim Jamshed TRACE_API("Stream %d: closing the stream.\n", cur_stream->id); 113176404edcSAsim Jamshed 113276404edcSAsim Jamshed /* 141029 dhkim: Check this! 
*/ 113376404edcSAsim Jamshed cur_stream->socket = NULL; 113476404edcSAsim Jamshed 113576404edcSAsim Jamshed if (cur_stream->state == TCP_ST_CLOSED_RSVD) { 113676404edcSAsim Jamshed TRACE_API("Stream %d at TCP_ST_CLOSED_RSVD. destroying the stream.\n", 113776404edcSAsim Jamshed cur_stream->id); 113876404edcSAsim Jamshed SQ_LOCK(&mtcp->ctx->destroyq_lock); 113976404edcSAsim Jamshed StreamEnqueue(mtcp->destroyq, cur_stream); 114076404edcSAsim Jamshed mtcp->wakeup_flag = TRUE; 114176404edcSAsim Jamshed SQ_UNLOCK(&mtcp->ctx->destroyq_lock); 114276404edcSAsim Jamshed return 0; 114376404edcSAsim Jamshed 114476404edcSAsim Jamshed } else if (cur_stream->state == TCP_ST_SYN_SENT) { 114576404edcSAsim Jamshed #if 1 114676404edcSAsim Jamshed SQ_LOCK(&mtcp->ctx->destroyq_lock); 114776404edcSAsim Jamshed StreamEnqueue(mtcp->destroyq, cur_stream); 114876404edcSAsim Jamshed SQ_UNLOCK(&mtcp->ctx->destroyq_lock); 114976404edcSAsim Jamshed mtcp->wakeup_flag = TRUE; 115076404edcSAsim Jamshed #endif 115176404edcSAsim Jamshed return -1; 115276404edcSAsim Jamshed 115376404edcSAsim Jamshed } else if (cur_stream->state != TCP_ST_ESTABLISHED && 115476404edcSAsim Jamshed cur_stream->state != TCP_ST_CLOSE_WAIT) { 115576404edcSAsim Jamshed TRACE_API("Stream %d at state %s\n", 115676404edcSAsim Jamshed cur_stream->id, TCPStateToString(cur_stream)); 115776404edcSAsim Jamshed errno = EBADF; 115876404edcSAsim Jamshed return -1; 115976404edcSAsim Jamshed } 116076404edcSAsim Jamshed 116176404edcSAsim Jamshed SQ_LOCK(&mtcp->ctx->close_lock); 116276404edcSAsim Jamshed cur_stream->sndvar->on_closeq = TRUE; 116376404edcSAsim Jamshed ret = StreamEnqueue(mtcp->closeq, cur_stream); 116476404edcSAsim Jamshed mtcp->wakeup_flag = TRUE; 116576404edcSAsim Jamshed SQ_UNLOCK(&mtcp->ctx->close_lock); 116676404edcSAsim Jamshed 116776404edcSAsim Jamshed if (ret < 0) { 116876404edcSAsim Jamshed TRACE_ERROR("(NEVER HAPPEN) Failed to enqueue the stream to close.\n"); 116976404edcSAsim Jamshed errno = EAGAIN; 
117076404edcSAsim Jamshed return -1; 117176404edcSAsim Jamshed } 117276404edcSAsim Jamshed 117376404edcSAsim Jamshed return 0; 117476404edcSAsim Jamshed } 117576404edcSAsim Jamshed /*----------------------------------------------------------------------------*/ 117676404edcSAsim Jamshed static inline int 117776404edcSAsim Jamshed CloseListeningSocket(mctx_t mctx, int sockid) 117876404edcSAsim Jamshed { 117976404edcSAsim Jamshed mtcp_manager_t mtcp; 118076404edcSAsim Jamshed struct tcp_listener *listener; 118176404edcSAsim Jamshed 118276404edcSAsim Jamshed mtcp = GetMTCPManager(mctx); 118376404edcSAsim Jamshed if (!mtcp) { 118476404edcSAsim Jamshed errno = EACCES; 118576404edcSAsim Jamshed return -1; 118676404edcSAsim Jamshed } 118776404edcSAsim Jamshed 118876404edcSAsim Jamshed listener = mtcp->smap[sockid].listener; 118976404edcSAsim Jamshed if (!listener) { 119076404edcSAsim Jamshed errno = EINVAL; 119176404edcSAsim Jamshed return -1; 119276404edcSAsim Jamshed } 119376404edcSAsim Jamshed 119476404edcSAsim Jamshed if (listener->acceptq) { 119576404edcSAsim Jamshed DestroyStreamQueue(listener->acceptq); 119676404edcSAsim Jamshed listener->acceptq = NULL; 119776404edcSAsim Jamshed } 119876404edcSAsim Jamshed 119976404edcSAsim Jamshed pthread_mutex_lock(&listener->accept_lock); 120076404edcSAsim Jamshed pthread_cond_signal(&listener->accept_cond); 120176404edcSAsim Jamshed pthread_mutex_unlock(&listener->accept_lock); 120276404edcSAsim Jamshed 120376404edcSAsim Jamshed pthread_cond_destroy(&listener->accept_cond); 120476404edcSAsim Jamshed pthread_mutex_destroy(&listener->accept_lock); 120576404edcSAsim Jamshed 120676404edcSAsim Jamshed free(listener); 120776404edcSAsim Jamshed mtcp->smap[sockid].listener = NULL; 120876404edcSAsim Jamshed 120976404edcSAsim Jamshed return 0; 121076404edcSAsim Jamshed } 121176404edcSAsim Jamshed /*----------------------------------------------------------------------------*/ 121276404edcSAsim Jamshed int 121376404edcSAsim Jamshed 
mtcp_close(mctx_t mctx, int sockid) 121476404edcSAsim Jamshed { 121576404edcSAsim Jamshed mtcp_manager_t mtcp; 121676404edcSAsim Jamshed int ret; 121776404edcSAsim Jamshed 121876404edcSAsim Jamshed mtcp = GetMTCPManager(mctx); 121976404edcSAsim Jamshed if (!mtcp) { 122076404edcSAsim Jamshed errno = EACCES; 122176404edcSAsim Jamshed return -1; 122276404edcSAsim Jamshed } 122376404edcSAsim Jamshed 122476404edcSAsim Jamshed if (sockid < 0 || sockid >= g_config.mos->max_concurrency) { 122576404edcSAsim Jamshed TRACE_API("Socket id %d out of range.\n", sockid); 122676404edcSAsim Jamshed errno = EBADF; 122776404edcSAsim Jamshed return -1; 122876404edcSAsim Jamshed } 122976404edcSAsim Jamshed 123076404edcSAsim Jamshed if (mtcp->smap[sockid].socktype == MOS_SOCK_UNUSED) { 123176404edcSAsim Jamshed TRACE_API("Invalid socket id: %d\n", sockid); 123276404edcSAsim Jamshed errno = EBADF; 123376404edcSAsim Jamshed return -1; 123476404edcSAsim Jamshed } 123576404edcSAsim Jamshed 123676404edcSAsim Jamshed TRACE_API("Socket %d: mtcp_close called.\n", sockid); 123776404edcSAsim Jamshed 123876404edcSAsim Jamshed switch (mtcp->smap[sockid].socktype) { 123976404edcSAsim Jamshed case MOS_SOCK_STREAM: 124076404edcSAsim Jamshed ret = CloseStreamSocket(mctx, sockid); 124176404edcSAsim Jamshed break; 124276404edcSAsim Jamshed 124376404edcSAsim Jamshed case MOS_SOCK_STREAM_LISTEN: 124476404edcSAsim Jamshed ret = CloseListeningSocket(mctx, sockid); 124576404edcSAsim Jamshed break; 124676404edcSAsim Jamshed 124776404edcSAsim Jamshed case MOS_SOCK_EPOLL: 124876404edcSAsim Jamshed ret = CloseEpollSocket(mctx, sockid); 124976404edcSAsim Jamshed break; 125076404edcSAsim Jamshed 125176404edcSAsim Jamshed case MOS_SOCK_PIPE: 125276404edcSAsim Jamshed ret = PipeClose(mctx, sockid); 125376404edcSAsim Jamshed break; 125476404edcSAsim Jamshed 125576404edcSAsim Jamshed default: 125676404edcSAsim Jamshed errno = EINVAL; 125776404edcSAsim Jamshed ret = -1; 125876404edcSAsim Jamshed break; 125976404edcSAsim 
Jamshed } 126076404edcSAsim Jamshed 126176404edcSAsim Jamshed FreeSocket(mctx, sockid, mtcp->smap[sockid].socktype); 126276404edcSAsim Jamshed 126376404edcSAsim Jamshed return ret; 126476404edcSAsim Jamshed } 126576404edcSAsim Jamshed /*----------------------------------------------------------------------------*/ 126676404edcSAsim Jamshed int 126776404edcSAsim Jamshed mtcp_abort(mctx_t mctx, int sockid) 126876404edcSAsim Jamshed { 126976404edcSAsim Jamshed mtcp_manager_t mtcp; 127076404edcSAsim Jamshed tcp_stream *cur_stream; 127176404edcSAsim Jamshed int ret; 127276404edcSAsim Jamshed 127376404edcSAsim Jamshed mtcp = GetMTCPManager(mctx); 127476404edcSAsim Jamshed if (!mtcp) { 127576404edcSAsim Jamshed errno = EACCES; 127676404edcSAsim Jamshed return -1; 127776404edcSAsim Jamshed } 127876404edcSAsim Jamshed 127976404edcSAsim Jamshed if (sockid < 0 || sockid >= g_config.mos->max_concurrency) { 128076404edcSAsim Jamshed TRACE_API("Socket id %d out of range.\n", sockid); 128176404edcSAsim Jamshed errno = EBADF; 128276404edcSAsim Jamshed return -1; 128376404edcSAsim Jamshed } 128476404edcSAsim Jamshed 128576404edcSAsim Jamshed if (mtcp->smap[sockid].socktype == MOS_SOCK_UNUSED) { 128676404edcSAsim Jamshed TRACE_API("Invalid socket id: %d\n", sockid); 128776404edcSAsim Jamshed errno = EBADF; 128876404edcSAsim Jamshed return -1; 128976404edcSAsim Jamshed } 129076404edcSAsim Jamshed 129176404edcSAsim Jamshed if (mtcp->smap[sockid].socktype != MOS_SOCK_STREAM) { 129276404edcSAsim Jamshed TRACE_API("Not an end socket. 
id: %d\n", sockid); 129376404edcSAsim Jamshed errno = ENOTSOCK; 129476404edcSAsim Jamshed return -1; 129576404edcSAsim Jamshed } 129676404edcSAsim Jamshed 129776404edcSAsim Jamshed cur_stream = mtcp->smap[sockid].stream; 129876404edcSAsim Jamshed if (!cur_stream) { 129976404edcSAsim Jamshed TRACE_API("Stream %d: does not exist.\n", sockid); 130076404edcSAsim Jamshed errno = ENOTCONN; 130176404edcSAsim Jamshed return -1; 130276404edcSAsim Jamshed } 130376404edcSAsim Jamshed 130476404edcSAsim Jamshed TRACE_API("Socket %d: mtcp_abort()\n", sockid); 130576404edcSAsim Jamshed 130676404edcSAsim Jamshed FreeSocket(mctx, sockid, mtcp->smap[sockid].socktype); 130776404edcSAsim Jamshed cur_stream->socket = NULL; 130876404edcSAsim Jamshed 130976404edcSAsim Jamshed if (cur_stream->state == TCP_ST_CLOSED_RSVD) { 131076404edcSAsim Jamshed TRACE_API("Stream %d: connection already reset.\n", sockid); 131176404edcSAsim Jamshed return ERROR; 131276404edcSAsim Jamshed 131376404edcSAsim Jamshed } else if (cur_stream->state == TCP_ST_SYN_SENT) { 131476404edcSAsim Jamshed /* TODO: this should notify event failure to all 131576404edcSAsim Jamshed previous read() or write() calls */ 131676404edcSAsim Jamshed cur_stream->state = TCP_ST_CLOSED_RSVD; 131776404edcSAsim Jamshed cur_stream->close_reason = TCP_ACTIVE_CLOSE; 131876404edcSAsim Jamshed cur_stream->cb_events |= MOS_ON_TCP_STATE_CHANGE; 131976404edcSAsim Jamshed SQ_LOCK(&mtcp->ctx->destroyq_lock); 132076404edcSAsim Jamshed StreamEnqueue(mtcp->destroyq, cur_stream); 132176404edcSAsim Jamshed SQ_UNLOCK(&mtcp->ctx->destroyq_lock); 132276404edcSAsim Jamshed mtcp->wakeup_flag = TRUE; 132376404edcSAsim Jamshed return 0; 132476404edcSAsim Jamshed 132576404edcSAsim Jamshed } else if (cur_stream->state == TCP_ST_CLOSING || 132676404edcSAsim Jamshed cur_stream->state == TCP_ST_LAST_ACK || 132776404edcSAsim Jamshed cur_stream->state == TCP_ST_TIME_WAIT) { 132876404edcSAsim Jamshed cur_stream->state = TCP_ST_CLOSED_RSVD; 132976404edcSAsim 
Jamshed cur_stream->close_reason = TCP_ACTIVE_CLOSE; 133076404edcSAsim Jamshed cur_stream->cb_events |= MOS_ON_TCP_STATE_CHANGE; 133176404edcSAsim Jamshed SQ_LOCK(&mtcp->ctx->destroyq_lock); 133276404edcSAsim Jamshed StreamEnqueue(mtcp->destroyq, cur_stream); 133376404edcSAsim Jamshed SQ_UNLOCK(&mtcp->ctx->destroyq_lock); 133476404edcSAsim Jamshed mtcp->wakeup_flag = TRUE; 133576404edcSAsim Jamshed return 0; 133676404edcSAsim Jamshed } 133776404edcSAsim Jamshed 133876404edcSAsim Jamshed /* the stream structure will be destroyed after sending RST */ 133976404edcSAsim Jamshed if (cur_stream->sndvar->on_resetq) { 134076404edcSAsim Jamshed TRACE_ERROR("Stream %d: calling mtcp_abort() " 134176404edcSAsim Jamshed "when in reset queue.\n", sockid); 134276404edcSAsim Jamshed errno = ECONNRESET; 134376404edcSAsim Jamshed return -1; 134476404edcSAsim Jamshed } 134576404edcSAsim Jamshed SQ_LOCK(&mtcp->ctx->reset_lock); 134676404edcSAsim Jamshed cur_stream->sndvar->on_resetq = TRUE; 134776404edcSAsim Jamshed ret = StreamEnqueue(mtcp->resetq, cur_stream); 134876404edcSAsim Jamshed SQ_UNLOCK(&mtcp->ctx->reset_lock); 134976404edcSAsim Jamshed mtcp->wakeup_flag = TRUE; 135076404edcSAsim Jamshed 135176404edcSAsim Jamshed if (ret < 0) { 135276404edcSAsim Jamshed TRACE_ERROR("(NEVER HAPPEN) Failed to enqueue the stream to close.\n"); 135376404edcSAsim Jamshed errno = EAGAIN; 135476404edcSAsim Jamshed return -1; 135576404edcSAsim Jamshed } 135676404edcSAsim Jamshed 135776404edcSAsim Jamshed return 0; 135876404edcSAsim Jamshed }
/*----------------------------------------------------------------------------*/
/** Copy buffered payload of a stream into a user buffer.
 * @param [in] mtcp: mtcp manager owning the stream
 * @param [in] cur_stream: stream whose receive buffer is read
 * @param [out] buf: destination buffer
 * @param [in] len: capacity of buf in bytes
 * @return number of bytes copied (> 0), or -1 with errno = EAGAIN when
 *         tcprb_ppeek() yields nothing
 *
 * The copied bytes are consumed by advancing the buffer's pile offset and
 * rcv_wnd is recomputed. If the stream is flagged need_wnd_adv and the window
 * now exceeds eff_mss, the stream is put on the ACK queue.
 * mtcp_read() and mtcp_readv() call this with rcvvar->read_lock held.
 */
static inline int
CopyToUser(mtcp_manager_t mtcp, tcp_stream *cur_stream, char *buf, int len)
{
	struct tcp_recv_vars *rcvvar = cur_stream->rcvvar;
	tcprb_t *rb = rcvvar->rcvbuf;
	int copylen;

	copylen = tcprb_ppeek(rb, (uint8_t *)buf, len, rb->pile);
	if (copylen <= 0) {
		errno = EAGAIN;
		return -1;
	}
	/* consume what was just copied */
	tcprb_setpile(rb, rb->pile + copylen);

	rcvvar->rcv_wnd = rb->len - tcprb_cflen(rb);

	/* Advertise newly freed receive buffer */
	if (cur_stream->need_wnd_adv) {
		if (rcvvar->rcv_wnd > cur_stream->sndvar->eff_mss) {
			if (!cur_stream->sndvar->on_ackq) {
				SQ_LOCK(&mtcp->ctx->ackq_lock);
				cur_stream->sndvar->on_ackq = TRUE;
				StreamEnqueue(mtcp->ackq, cur_stream); /* this always success */
				SQ_UNLOCK(&mtcp->ctx->ackq_lock);
				cur_stream->need_wnd_adv = FALSE;
				mtcp->wakeup_flag = TRUE;
			}
		}
	}

	return copylen;
}
/*----------------------------------------------------------------------------*/
/** Read payload from a stream (or pipe) socket.
 * @param [in] mctx: mtcp context
 * @param [in] sockid: socket id
 * @param [out] buf: destination buffer
 * @param [in] len: capacity of buf in bytes
 * @return bytes read (> 0); 0 when the stream is in CLOSE_WAIT and has no
 *         buffered payload; -1 on error with errno set to
 *         EACCES (no usable manager), EBADF (bad/unused socket id),
 *         ENOTSOCK (neither stream nor pipe), ENOTCONN (no stream, no
 *         receive buffer, or state outside ESTABLISHED..CLOSE_WAIT) or
 *         EAGAIN (nothing to read)
 *
 * Pipe sockets are forwarded to PipeRead().
 */
ssize_t
mtcp_read(mctx_t mctx, int sockid, char *buf, size_t len)
{
	mtcp_manager_t mtcp;
	socket_map_t socket;
	tcp_stream *cur_stream;
	struct tcp_recv_vars *rcvvar;
	int event_remaining, merged_len;
	int ret;

	mtcp = GetMTCPManager(mctx);
	if (!mtcp) {
		errno = EACCES;
		return -1;
	}

	if (sockid < 0 || sockid >= g_config.mos->max_concurrency) {
		TRACE_API("Socket id %d out of range.\n", sockid);
		errno = EBADF;
		return -1;
	}

	socket = &mtcp->smap[sockid];
	if (socket->socktype == MOS_SOCK_UNUSED) {
		TRACE_API("Invalid socket id: %d\n", sockid);
		errno = EBADF;
		return -1;
	}

	if (socket->socktype == MOS_SOCK_PIPE) {
		return PipeRead(mctx, sockid, buf, len);
	}

	if (socket->socktype != MOS_SOCK_STREAM) {
		TRACE_API("Not an end socket. id: %d\n", sockid);
		errno = ENOTSOCK;
		return -1;
	}

	/* stream should be in ESTABLISHED, FIN_WAIT_1, FIN_WAIT_2, CLOSE_WAIT */
	cur_stream = socket->stream;
	if (!cur_stream || !cur_stream->rcvvar || !cur_stream->rcvvar->rcvbuf ||
	    !(cur_stream->state >= TCP_ST_ESTABLISHED &&
	      cur_stream->state <= TCP_ST_CLOSE_WAIT)) {
		errno = ENOTCONN;
		return -1;
	}

	rcvvar = cur_stream->rcvvar;

	/* rcvbuf is known to be non-NULL from the check above */
	merged_len = tcprb_cflen(rcvvar->rcvbuf);

	/* if CLOSE_WAIT, return 0 if there is no payload */
	if (cur_stream->state == TCP_ST_CLOSE_WAIT && merged_len == 0)
		return 0;

	/* non-blocking socket with nothing buffered: return EAGAIN */
	if ((socket->opts & MTCP_NONBLOCK) && merged_len == 0) {
		errno = EAGAIN;
		return -1;
	}

	SBUF_LOCK(&rcvvar->read_lock);

	/* CopyToUser() takes an int length: clamp so that a size_t larger
	 * than INT_MAX is not narrowed into a negative or wrapped value */
	ret = CopyToUser(mtcp, cur_stream, buf, (int)MIN(len, (size_t)INT_MAX));

	merged_len = tcprb_cflen(rcvvar->rcvbuf);
	event_remaining = FALSE;
	/* if there are remaining payload, generate EPOLLIN */
	/* (may due to insufficient user buffer) */
	if (socket->epoll & MOS_EPOLLIN) {
		if (!(socket->epoll & MOS_EPOLLET) && merged_len > 0) {
			event_remaining = TRUE;
		}
	}
	/* if waiting for close, notify it if no remaining data */
	if (cur_stream->state == TCP_ST_CLOSE_WAIT &&
	    merged_len == 0 && ret > 0) {
		event_remaining = TRUE;
	}

	SBUF_UNLOCK(&rcvvar->read_lock);

	if (event_remaining) {
		if (socket->epoll) {
			AddEpollEvent(mtcp->ep,
				      USR_SHADOW_EVENT_QUEUE, socket, MOS_EPOLLIN);
		}
	}

	TRACE_API("Stream %d: mtcp_read() returning %d\n", cur_stream->id, ret);
	return ret;
}
/*----------------------------------------------------------------------------*/
/** Scatter-read payload from a stream socket into vectored buffers.
 * @param [in] mctx: mtcp context
 * @param [in] sockid: socket id
 * @param [in] iov: array of destination buffers
 * @param [in] numIOV: number of entries in iov
 * @return total bytes read; 0 when the stream is in CLOSE_WAIT with no
 *         buffered payload, or when nothing could be copied; -1 on error
 *         with errno set to EACCES, EBADF, ENOTSOCK, ENOTCONN or EAGAIN
 *         (non-blocking socket with nothing buffered)
 *
 * Buffers are filled in order; filling stops at the first buffer that
 * CopyToUser() cannot fill completely. Zero-length entries are skipped.
 */
ssize_t
mtcp_readv(mctx_t mctx, int sockid, struct iovec *iov, int numIOV)
{
	mtcp_manager_t mtcp;
	socket_map_t socket;
	tcp_stream *cur_stream;
	struct tcp_recv_vars *rcvvar;
	int ret, bytes_read, i;
	int event_remaining, merged_len;

	mtcp = GetMTCPManager(mctx);
	if (!mtcp) {
		errno = EACCES;
		return -1;
	}

	if (sockid < 0 || sockid >= g_config.mos->max_concurrency) {
		TRACE_API("Socket id %d out of range.\n", sockid);
		errno = EBADF;
		return -1;
	}

	socket = &mtcp->smap[sockid];
	if (socket->socktype == MOS_SOCK_UNUSED) {
		TRACE_API("Invalid socket id: %d\n", sockid);
		errno = EBADF;
		return -1;
	}

	if (socket->socktype != MOS_SOCK_STREAM) {
		TRACE_API("Not an end socket. id: %d\n", sockid);
		errno = ENOTSOCK;
		return -1;
	}

	/* stream should be in ESTABLISHED, FIN_WAIT_1, FIN_WAIT_2, CLOSE_WAIT.
	 * As in mtcp_read(), rcvvar and rcvbuf are validated here, before
	 * rcvbuf is handed to tcprb_cflen() below. */
	cur_stream = socket->stream;
	if (!cur_stream || !cur_stream->rcvvar || !cur_stream->rcvvar->rcvbuf ||
	    !(cur_stream->state >= TCP_ST_ESTABLISHED &&
	      cur_stream->state <= TCP_ST_CLOSE_WAIT)) {
		errno = ENOTCONN;
		return -1;
	}

	rcvvar = cur_stream->rcvvar;

	merged_len = tcprb_cflen(rcvvar->rcvbuf);

	/* if CLOSE_WAIT, return 0 if there is no payload */
	if (cur_stream->state == TCP_ST_CLOSE_WAIT && merged_len == 0)
		return 0;

	/* non-blocking socket with nothing buffered: return EAGAIN */
	if ((socket->opts & MTCP_NONBLOCK) && merged_len == 0) {
		errno = EAGAIN;
		return -1;
	}

	SBUF_LOCK(&rcvvar->read_lock);

	/* read and store the contents to the vectored buffers */
	bytes_read = 0;
	for (i = 0; i < numIOV; i++) {
		if (iov[i].iov_len == 0)
			continue;

		/* CopyToUser() takes an int length: clamp oversized entries
		 * instead of letting the size_t wrap when narrowed */
		ret = CopyToUser(mtcp, cur_stream, iov[i].iov_base,
				 (int)MIN(iov[i].iov_len, (size_t)INT_MAX));
		if (ret <= 0)
			break;

		bytes_read += ret;

		/* short read: this buffer was not filled, so stop here */
		if ((size_t)ret < iov[i].iov_len)
			break;
	}

	merged_len = tcprb_cflen(rcvvar->rcvbuf);

	event_remaining = FALSE;
	/* if there are remaining payload, generate read event */
	/* (may due to insufficient user buffer) */
	if (socket->epoll & MOS_EPOLLIN) {
		if (!(socket->epoll & MOS_EPOLLET) && merged_len > 0) {
			event_remaining = TRUE;
		}
	}
	/* if waiting for close, notify it if no remaining data */
	if (cur_stream->state == TCP_ST_CLOSE_WAIT &&
	    merged_len == 0 && bytes_read > 0) {
		event_remaining = TRUE;
	}

	SBUF_UNLOCK(&rcvvar->read_lock);

	if (event_remaining) {
		if (socket->epoll & MOS_EPOLLIN && !(socket->epoll & MOS_EPOLLET)) {
			AddEpollEvent(mtcp->ep,
				      USR_SHADOW_EVENT_QUEUE, socket, MOS_EPOLLIN);
		}
	}

	TRACE_API("Stream %d: mtcp_readv() returning %d\n",
		  cur_stream->id, bytes_read);
	return bytes_read;
}
/*----------------------------------------------------------------------------*/
/** Copy user data into the send buffer of a stream.
 * @param [in] mtcp: mtcp manager owning the stream
 * @param [in] cur_stream: stream to write to
 * @param [in] buf: source data
 * @param [in] len: number of bytes offered
 * @return bytes accepted (> 0), or -1 with errno = EAGAIN (no send window,
 *         or SBPut() stored nothing) or ENOMEM (send buffer allocation
 *         failed; close_reason is set to TCP_NO_MEM)
 *
 * At most snd_wnd bytes are accepted. The send buffer is created on first
 * use, and snd_wnd is refreshed from the buffer's size and len afterwards.
 * mtcp_write() and mtcp_writev() call this with sndvar->write_lock held.
 */
static inline int
CopyFromUser(mtcp_manager_t mtcp, tcp_stream *cur_stream, char *buf, int len)
{
	struct tcp_send_vars *sndvar = cur_stream->sndvar;
	int sndlen;
	int ret;

	sndlen = MIN((int)sndvar->snd_wnd, len);
	if (sndlen <= 0) {
		errno = EAGAIN;
		return -1;
	}

	/* allocate send buffer if not exist */
	if (!sndvar->sndbuf) {
		sndvar->sndbuf = SBInit(mtcp->rbm_snd, sndvar->iss + 1);
		if (!sndvar->sndbuf) {
			cur_stream->close_reason = TCP_NO_MEM;
			/* notification may not required due to -1 return */
			errno = ENOMEM;
			return -1;
		}
	}

	ret = SBPut(mtcp->rbm_snd, sndvar->sndbuf, buf, sndlen);
	/* NOTE(review): with assertions enabled this fires before the
	 * ret <= 0 handling below can run; kept in the original order */
	assert(ret == sndlen);
	sndvar->snd_wnd = sndvar->sndbuf->size - sndvar->sndbuf->len;
	if (ret <= 0) {
		TRACE_ERROR("SBPut failed. reason: %d (sndlen: %u, len: %u\n",
			    ret, sndlen, sndvar->sndbuf->len);
		errno = EAGAIN;
		return -1;
	}

	if (sndvar->snd_wnd <= 0) {
		TRACE_SNDBUF("%u Sending buffer became full!! snd_wnd: %u\n",
			     cur_stream->id, sndvar->snd_wnd);
	}

	return ret;
}
/*----------------------------------------------------------------------------*/
/** Write data to a stream (or pipe) socket.
 * @param [in] mctx: mtcp context
 * @param [in] sockid: socket id
 * @param [in] buf: data to send
 * @param [in] len: number of bytes in buf
 * @return bytes accepted into the send buffer (may be fewer than len);
 *         0 for a zero-length write on a blocking socket; -1 on error with
 *         errno set to EACCES, EBADF, ENOTSOCK, ENOTCONN (no stream, or
 *         state is neither ESTABLISHED nor CLOSE_WAIT), EAGAIN or ENOMEM
 *
 * Pipe sockets are forwarded to PipeWrite(). On success the stream is put
 * on the send queue unless it is already queued for sending.
 */
ssize_t
mtcp_write(mctx_t mctx, int sockid, char *buf, size_t len)
{
	mtcp_manager_t mtcp;
	socket_map_t socket;
	tcp_stream *cur_stream;
	struct tcp_send_vars *sndvar;
	int ret;

	mtcp = GetMTCPManager(mctx);
	if (!mtcp) {
		errno = EACCES;
		return -1;
	}

	if (sockid < 0 || sockid >= g_config.mos->max_concurrency) {
		TRACE_API("Socket id %d out of range.\n", sockid);
		errno = EBADF;
		return -1;
	}

	socket = &mtcp->smap[sockid];
	if (socket->socktype == MOS_SOCK_UNUSED) {
		TRACE_API("Invalid socket id: %d\n", sockid);
		errno = EBADF;
		return -1;
	}

	if (socket->socktype == MOS_SOCK_PIPE) {
		return PipeWrite(mctx, sockid, buf, len);
	}

	if (socket->socktype != MOS_SOCK_STREAM) {
		TRACE_API("Not an end socket. id: %d\n", sockid);
		errno = ENOTSOCK;
		return -1;
	}

	cur_stream = socket->stream;
	if (!cur_stream ||
	    !(cur_stream->state == TCP_ST_ESTABLISHED ||
	      cur_stream->state == TCP_ST_CLOSE_WAIT)) {
		errno = ENOTCONN;
		return -1;
	}

	/* len is unsigned, so "nothing to write" means exactly zero */
	if (len == 0) {
		if (socket->opts & MTCP_NONBLOCK) {
			errno = EAGAIN;
			return -1;
		} else {
			return 0;
		}
	}

	sndvar = cur_stream->sndvar;

	SBUF_LOCK(&sndvar->write_lock);
	/* CopyFromUser() takes an int length: clamp so that a size_t larger
	 * than INT_MAX is not narrowed into a negative or wrapped value */
	ret = CopyFromUser(mtcp, cur_stream, buf,
			   (int)MIN(len, (size_t)INT_MAX));

	SBUF_UNLOCK(&sndvar->write_lock);

	if (ret > 0 && !(sndvar->on_sendq || sndvar->on_send_list)) {
		SQ_LOCK(&mtcp->ctx->sendq_lock);
		sndvar->on_sendq = TRUE;
		StreamEnqueue(mtcp->sendq, cur_stream); /* this always success */
		SQ_UNLOCK(&mtcp->ctx->sendq_lock);
		mtcp->wakeup_flag = TRUE;
	}

	if (ret == 0 && (socket->opts & MTCP_NONBLOCK)) {
		ret = -1;
		errno = EAGAIN;
	}

	/* if there are remaining sending buffer, generate write event */
	if (sndvar->snd_wnd > 0) {
		if (socket->epoll & MOS_EPOLLOUT && !(socket->epoll & MOS_EPOLLET)) {
			AddEpollEvent(mtcp->ep,
				      USR_SHADOW_EVENT_QUEUE, socket, MOS_EPOLLOUT);
		}
	}

	TRACE_API("Stream %d: mtcp_write() returning %d\n", cur_stream->id, ret);
	return ret;
}
/*----------------------------------------------------------------------------*/
/** Gather-write vectored buffers to a stream socket.
 * @param [in] mctx: mtcp context
 * @param [in] sockid: socket id
 * @param [in] iov: array of source buffers
 * @param [in] numIOV: number of entries in iov
 * @return total bytes accepted into the send buffer; 0 when nothing was
 *         accepted on a blocking socket; -1 on error with errno set to
 *         EACCES, EBADF, ENOTSOCK, ENOTCONN, or EAGAIN (non-blocking socket
 *         and nothing was accepted)
 *
 * Buffers are consumed in order; writing stops at the first buffer that
 * CopyFromUser() cannot accept completely. Zero-length entries are skipped.
 */
ssize_t
mtcp_writev(mctx_t mctx, int sockid, struct iovec *iov, int numIOV)
{
	mtcp_manager_t mtcp;
	socket_map_t socket;
	tcp_stream *cur_stream;
	struct tcp_send_vars *sndvar;
	int ret, to_write, i;

	mtcp = GetMTCPManager(mctx);
	if (!mtcp) {
		errno = EACCES;
		return -1;
	}

	if (sockid < 0 || sockid >= g_config.mos->max_concurrency) {
		TRACE_API("Socket id %d out of range.\n", sockid);
		errno = EBADF;
		return -1;
	}

	socket = &mtcp->smap[sockid];
	if (socket->socktype == MOS_SOCK_UNUSED) {
		TRACE_API("Invalid socket id: %d\n", sockid);
		errno = EBADF;
		return -1;
	}

	if (socket->socktype != MOS_SOCK_STREAM) {
		TRACE_API("Not an end socket. id: %d\n", sockid);
		errno = ENOTSOCK;
		return -1;
	}

	cur_stream = socket->stream;
	if (!cur_stream ||
	    !(cur_stream->state == TCP_ST_ESTABLISHED ||
	      cur_stream->state == TCP_ST_CLOSE_WAIT)) {
		errno = ENOTCONN;
		return -1;
	}

	sndvar = cur_stream->sndvar;
	SBUF_LOCK(&sndvar->write_lock);

	/* write from the vectored buffers */
	to_write = 0;
	for (i = 0; i < numIOV; i++) {
		if (iov[i].iov_len == 0)
			continue;

		/* CopyFromUser() takes an int length: clamp oversized entries
		 * instead of letting the size_t wrap when narrowed */
		ret = CopyFromUser(mtcp, cur_stream, iov[i].iov_base,
				   (int)MIN(iov[i].iov_len, (size_t)INT_MAX));
		if (ret <= 0)
			break;

		to_write += ret;

		/* short write: the send buffer is full, so stop here */
		if ((size_t)ret < iov[i].iov_len)
			break;
	}
	SBUF_UNLOCK(&sndvar->write_lock);

	if (to_write > 0 && !(sndvar->on_sendq || sndvar->on_send_list)) {
		SQ_LOCK(&mtcp->ctx->sendq_lock);
		sndvar->on_sendq = TRUE;
		StreamEnqueue(mtcp->sendq, cur_stream); /* this always success */
		SQ_UNLOCK(&mtcp->ctx->sendq_lock);
		mtcp->wakeup_flag = TRUE;
	}

	if (to_write == 0 && (socket->opts & MTCP_NONBLOCK)) {
		to_write = -1;
		errno = EAGAIN;
	}

	/* if there are remaining sending buffer, generate write event */
	if (sndvar->snd_wnd > 0) {
		if (socket->epoll & MOS_EPOLLOUT && !(socket->epoll & MOS_EPOLLET)) {
			AddEpollEvent(mtcp->ep,
				      USR_SHADOW_EVENT_QUEUE, socket, MOS_EPOLLOUT);
		}
	}

	TRACE_API("Stream %d: mtcp_writev() returning %d\n",
		  cur_stream->id, to_write);
	return to_write;
}
/*----------------------------------------------------------------------------*/
/** Report the number of connections tracked by this mtcp context.
 * @param [in] mctx: mtcp context
 * @return flow_cnt, halved when num_msp > 0 (presumably because a monitored
 *         connection is counted as two flows -- TODO confirm); on failure
 *         errno is set to EACCES and (uint32_t)-1 is returned, since the
 *         return type is unsigned
 */
uint32_t
mtcp_get_connection_cnt(mctx_t mctx)
{
	mtcp_manager_t mtcp;

	mtcp = GetMTCPManager(mctx);
	if (!mtcp) {
		errno = EACCES;
		return (uint32_t)-1;
	}

	if (mtcp->num_msp > 0)
		return mtcp->flow_cnt / 2;
	else
		return mtcp->flow_cnt;
}
/*----------------------------------------------------------------------------*/ 1839