1*76404edcSAsim Jamshed #include <sys/queue.h> 2*76404edcSAsim Jamshed #include <sys/ioctl.h> 3*76404edcSAsim Jamshed #include <limits.h> 4*76404edcSAsim Jamshed #include <unistd.h> 5*76404edcSAsim Jamshed #include <assert.h> 6*76404edcSAsim Jamshed #include <string.h> 7*76404edcSAsim Jamshed 8*76404edcSAsim Jamshed #ifdef DARWIN 9*76404edcSAsim Jamshed #include <netinet/tcp.h> 10*76404edcSAsim Jamshed #include <netinet/ip.h> 11*76404edcSAsim Jamshed #include <netinet/if_ether.h> 12*76404edcSAsim Jamshed #endif 13*76404edcSAsim Jamshed 14*76404edcSAsim Jamshed #include "mtcp.h" 15*76404edcSAsim Jamshed #include "mtcp_api.h" 16*76404edcSAsim Jamshed #include "tcp_in.h" 17*76404edcSAsim Jamshed #include "tcp_stream.h" 18*76404edcSAsim Jamshed #include "tcp_out.h" 19*76404edcSAsim Jamshed #include "ip_out.h" 20*76404edcSAsim Jamshed #include "eventpoll.h" 21*76404edcSAsim Jamshed #include "pipe.h" 22*76404edcSAsim Jamshed #include "fhash.h" 23*76404edcSAsim Jamshed #include "addr_pool.h" 24*76404edcSAsim Jamshed #include "rss.h" 25*76404edcSAsim Jamshed #include "config.h" 26*76404edcSAsim Jamshed #include "debug.h" 27*76404edcSAsim Jamshed #include "eventpoll.h" 28*76404edcSAsim Jamshed #include "mos_api.h" 29*76404edcSAsim Jamshed #include "tcp_rb.h" 30*76404edcSAsim Jamshed 31*76404edcSAsim Jamshed #define MAX(a, b) ((a)>(b)?(a):(b)) 32*76404edcSAsim Jamshed #define MIN(a, b) ((a)<(b)?(a):(b)) 33*76404edcSAsim Jamshed 34*76404edcSAsim Jamshed /*----------------------------------------------------------------------------*/ 35*76404edcSAsim Jamshed /** Stop monitoring the socket! (function prototype) 36*76404edcSAsim Jamshed * @param [in] mctx: mtcp context 37*76404edcSAsim Jamshed * @param [in] sock: monitoring stream socket id 38*76404edcSAsim Jamshed * @param [in] side: side of monitoring (client side, server side or both) 39*76404edcSAsim Jamshed * 40*76404edcSAsim Jamshed * This function is now DEPRECATED and is only used within mOS core... 
41*76404edcSAsim Jamshed */ 42*76404edcSAsim Jamshed int 43*76404edcSAsim Jamshed mtcp_cb_stop(mctx_t mctx, int sock, int side); 44*76404edcSAsim Jamshed /*----------------------------------------------------------------------------*/ 45*76404edcSAsim Jamshed /** Reset the connection (send RST packets to both sides) 46*76404edcSAsim Jamshed * (We need to decide the API for this.) 47*76404edcSAsim Jamshed */ 48*76404edcSAsim Jamshed //int 49*76404edcSAsim Jamshed //mtcp_cb_reset(mctx_t mctx, int sock, int side); 50*76404edcSAsim Jamshed /*----------------------------------------------------------------------------*/ 51*76404edcSAsim Jamshed inline mtcp_manager_t 52*76404edcSAsim Jamshed GetMTCPManager(mctx_t mctx) 53*76404edcSAsim Jamshed { 54*76404edcSAsim Jamshed if (!mctx) { 55*76404edcSAsim Jamshed errno = EACCES; 56*76404edcSAsim Jamshed return NULL; 57*76404edcSAsim Jamshed } 58*76404edcSAsim Jamshed 59*76404edcSAsim Jamshed if (mctx->cpu < 0 || mctx->cpu >= num_cpus) { 60*76404edcSAsim Jamshed errno = EINVAL; 61*76404edcSAsim Jamshed return NULL; 62*76404edcSAsim Jamshed } 63*76404edcSAsim Jamshed 64*76404edcSAsim Jamshed if (g_mtcp[mctx->cpu]->ctx->done || g_mtcp[mctx->cpu]->ctx->exit) { 65*76404edcSAsim Jamshed errno = EPERM; 66*76404edcSAsim Jamshed return NULL; 67*76404edcSAsim Jamshed } 68*76404edcSAsim Jamshed 69*76404edcSAsim Jamshed return g_mtcp[mctx->cpu]; 70*76404edcSAsim Jamshed } 71*76404edcSAsim Jamshed /*----------------------------------------------------------------------------*/ 72*76404edcSAsim Jamshed inline int 73*76404edcSAsim Jamshed GetSocketError(socket_map_t socket, void *optval, socklen_t *optlen) 74*76404edcSAsim Jamshed { 75*76404edcSAsim Jamshed tcp_stream *cur_stream; 76*76404edcSAsim Jamshed 77*76404edcSAsim Jamshed if (!socket->stream) { 78*76404edcSAsim Jamshed errno = EBADF; 79*76404edcSAsim Jamshed return -1; 80*76404edcSAsim Jamshed } 81*76404edcSAsim Jamshed 82*76404edcSAsim Jamshed cur_stream = socket->stream; 
83*76404edcSAsim Jamshed if (cur_stream->state == TCP_ST_CLOSED_RSVD) { 84*76404edcSAsim Jamshed if (cur_stream->close_reason == TCP_TIMEDOUT || 85*76404edcSAsim Jamshed cur_stream->close_reason == TCP_CONN_FAIL || 86*76404edcSAsim Jamshed cur_stream->close_reason == TCP_CONN_LOST) { 87*76404edcSAsim Jamshed *(int *)optval = ETIMEDOUT; 88*76404edcSAsim Jamshed *optlen = sizeof(int); 89*76404edcSAsim Jamshed 90*76404edcSAsim Jamshed return 0; 91*76404edcSAsim Jamshed } 92*76404edcSAsim Jamshed } 93*76404edcSAsim Jamshed 94*76404edcSAsim Jamshed if (cur_stream->state == TCP_ST_CLOSE_WAIT || 95*76404edcSAsim Jamshed cur_stream->state == TCP_ST_CLOSED_RSVD) { 96*76404edcSAsim Jamshed if (cur_stream->close_reason == TCP_RESET) { 97*76404edcSAsim Jamshed *(int *)optval = ECONNRESET; 98*76404edcSAsim Jamshed *optlen = sizeof(int); 99*76404edcSAsim Jamshed 100*76404edcSAsim Jamshed return 0; 101*76404edcSAsim Jamshed } 102*76404edcSAsim Jamshed } 103*76404edcSAsim Jamshed 104*76404edcSAsim Jamshed errno = ENOSYS; 105*76404edcSAsim Jamshed return -1; 106*76404edcSAsim Jamshed } 107*76404edcSAsim Jamshed /*----------------------------------------------------------------------------*/ 108*76404edcSAsim Jamshed int 109*76404edcSAsim Jamshed mtcp_getsockopt(mctx_t mctx, int sockid, int level, 110*76404edcSAsim Jamshed int optname, void *optval, socklen_t *optlen) 111*76404edcSAsim Jamshed { 112*76404edcSAsim Jamshed mtcp_manager_t mtcp; 113*76404edcSAsim Jamshed socket_map_t socket; 114*76404edcSAsim Jamshed 115*76404edcSAsim Jamshed mtcp = GetMTCPManager(mctx); 116*76404edcSAsim Jamshed if (!mtcp) { 117*76404edcSAsim Jamshed errno = EACCES; 118*76404edcSAsim Jamshed return -1; 119*76404edcSAsim Jamshed } 120*76404edcSAsim Jamshed 121*76404edcSAsim Jamshed if (sockid < 0 || sockid >= g_config.mos->max_concurrency) { 122*76404edcSAsim Jamshed TRACE_API("Socket id %d out of range.\n", sockid); 123*76404edcSAsim Jamshed errno = EBADF; 124*76404edcSAsim Jamshed return -1; 
125*76404edcSAsim Jamshed } 126*76404edcSAsim Jamshed 127*76404edcSAsim Jamshed switch (level) { 128*76404edcSAsim Jamshed case SOL_SOCKET: 129*76404edcSAsim Jamshed socket = &mtcp->smap[sockid]; 130*76404edcSAsim Jamshed if (socket->socktype == MOS_SOCK_UNUSED) { 131*76404edcSAsim Jamshed TRACE_API("Invalid socket id: %d\n", sockid); 132*76404edcSAsim Jamshed errno = EBADF; 133*76404edcSAsim Jamshed return -1; 134*76404edcSAsim Jamshed } 135*76404edcSAsim Jamshed 136*76404edcSAsim Jamshed if (socket->socktype != MOS_SOCK_STREAM_LISTEN && 137*76404edcSAsim Jamshed socket->socktype != MOS_SOCK_STREAM) { 138*76404edcSAsim Jamshed TRACE_API("Invalid socket id: %d\n", sockid); 139*76404edcSAsim Jamshed errno = ENOTSOCK; 140*76404edcSAsim Jamshed return -1; 141*76404edcSAsim Jamshed } 142*76404edcSAsim Jamshed 143*76404edcSAsim Jamshed if (optname == SO_ERROR) { 144*76404edcSAsim Jamshed if (socket->socktype == MOS_SOCK_STREAM) { 145*76404edcSAsim Jamshed return GetSocketError(socket, optval, optlen); 146*76404edcSAsim Jamshed } 147*76404edcSAsim Jamshed } 148*76404edcSAsim Jamshed break; 149*76404edcSAsim Jamshed case SOL_MONSOCKET: 150*76404edcSAsim Jamshed /* check if the calling thread is in MOS context */ 151*76404edcSAsim Jamshed if (mtcp->ctx->thread != pthread_self()) { 152*76404edcSAsim Jamshed errno = EPERM; 153*76404edcSAsim Jamshed return -1; 154*76404edcSAsim Jamshed } 155*76404edcSAsim Jamshed /* 156*76404edcSAsim Jamshed * All options will only work for active 157*76404edcSAsim Jamshed * monitor stream sockets 158*76404edcSAsim Jamshed */ 159*76404edcSAsim Jamshed socket = &mtcp->msmap[sockid]; 160*76404edcSAsim Jamshed if (socket->socktype != MOS_SOCK_MONITOR_STREAM_ACTIVE) { 161*76404edcSAsim Jamshed TRACE_API("Invalid socket id: %d\n", sockid); 162*76404edcSAsim Jamshed errno = ENOTSOCK; 163*76404edcSAsim Jamshed return -1; 164*76404edcSAsim Jamshed } 165*76404edcSAsim Jamshed 166*76404edcSAsim Jamshed switch (optname) { 167*76404edcSAsim Jamshed case 
MOS_FRAGINFO_CLIBUF: 168*76404edcSAsim Jamshed return GetFragInfo(socket, MOS_SIDE_CLI, optval, optlen); 169*76404edcSAsim Jamshed case MOS_FRAGINFO_SVRBUF: 170*76404edcSAsim Jamshed return GetFragInfo(socket, MOS_SIDE_SVR, optval, optlen); 171*76404edcSAsim Jamshed case MOS_INFO_CLIBUF: 172*76404edcSAsim Jamshed return GetBufInfo(socket, MOS_SIDE_CLI, optval, optlen); 173*76404edcSAsim Jamshed case MOS_INFO_SVRBUF: 174*76404edcSAsim Jamshed return GetBufInfo(socket, MOS_SIDE_SVR, optval, optlen); 175*76404edcSAsim Jamshed case MOS_TCP_STATE_CLI: 176*76404edcSAsim Jamshed return GetTCPState(socket->monitor_stream->stream, MOS_SIDE_CLI, 177*76404edcSAsim Jamshed optval, optlen); 178*76404edcSAsim Jamshed case MOS_TCP_STATE_SVR: 179*76404edcSAsim Jamshed return GetTCPState(socket->monitor_stream->stream, MOS_SIDE_SVR, 180*76404edcSAsim Jamshed optval, optlen); 181*76404edcSAsim Jamshed case MOS_TIMESTAMP: 182*76404edcSAsim Jamshed return GetLastTimestamp(socket->monitor_stream->stream, 183*76404edcSAsim Jamshed (uint32_t *)optval, 184*76404edcSAsim Jamshed optlen); 185*76404edcSAsim Jamshed default: 186*76404edcSAsim Jamshed TRACE_API("can't recognize the optname=%d\n", optname); 187*76404edcSAsim Jamshed assert(0); 188*76404edcSAsim Jamshed } 189*76404edcSAsim Jamshed break; 190*76404edcSAsim Jamshed } 191*76404edcSAsim Jamshed errno = ENOSYS; 192*76404edcSAsim Jamshed return -1; 193*76404edcSAsim Jamshed } 194*76404edcSAsim Jamshed /*----------------------------------------------------------------------------*/ 195*76404edcSAsim Jamshed int 196*76404edcSAsim Jamshed mtcp_setsockopt(mctx_t mctx, int sockid, int level, 197*76404edcSAsim Jamshed int optname, const void *optval, socklen_t optlen) 198*76404edcSAsim Jamshed { 199*76404edcSAsim Jamshed mtcp_manager_t mtcp; 200*76404edcSAsim Jamshed socket_map_t socket; 201*76404edcSAsim Jamshed tcprb_t *rb; 202*76404edcSAsim Jamshed 203*76404edcSAsim Jamshed mtcp = GetMTCPManager(mctx); 204*76404edcSAsim Jamshed if 
(!mtcp) { 205*76404edcSAsim Jamshed errno = EACCES; 206*76404edcSAsim Jamshed return -1; 207*76404edcSAsim Jamshed } 208*76404edcSAsim Jamshed 209*76404edcSAsim Jamshed if (sockid < 0 || sockid >= g_config.mos->max_concurrency) { 210*76404edcSAsim Jamshed TRACE_API("Socket id %d out of range.\n", sockid); 211*76404edcSAsim Jamshed errno = EBADF; 212*76404edcSAsim Jamshed return -1; 213*76404edcSAsim Jamshed } 214*76404edcSAsim Jamshed 215*76404edcSAsim Jamshed switch (level) { 216*76404edcSAsim Jamshed case SOL_SOCKET: 217*76404edcSAsim Jamshed socket = &mtcp->smap[sockid]; 218*76404edcSAsim Jamshed if (socket->socktype == MOS_SOCK_UNUSED) { 219*76404edcSAsim Jamshed TRACE_API("Invalid socket id: %d\n", sockid); 220*76404edcSAsim Jamshed errno = EBADF; 221*76404edcSAsim Jamshed return -1; 222*76404edcSAsim Jamshed } 223*76404edcSAsim Jamshed 224*76404edcSAsim Jamshed if (socket->socktype != MOS_SOCK_STREAM_LISTEN && 225*76404edcSAsim Jamshed socket->socktype != MOS_SOCK_STREAM) { 226*76404edcSAsim Jamshed TRACE_API("Invalid socket id: %d\n", sockid); 227*76404edcSAsim Jamshed errno = ENOTSOCK; 228*76404edcSAsim Jamshed return -1; 229*76404edcSAsim Jamshed } 230*76404edcSAsim Jamshed break; 231*76404edcSAsim Jamshed case SOL_MONSOCKET: 232*76404edcSAsim Jamshed socket = &mtcp->msmap[sockid]; 233*76404edcSAsim Jamshed /* 234*76404edcSAsim Jamshed * checking of calling thread to be in MOS context is 235*76404edcSAsim Jamshed * disabled since both optnames can be called from 236*76404edcSAsim Jamshed * `application' context (on passive sockets) 237*76404edcSAsim Jamshed */ 238*76404edcSAsim Jamshed /* 239*76404edcSAsim Jamshed * if (mtcp->ctx->thread != pthread_self()) 240*76404edcSAsim Jamshed * return -1; 241*76404edcSAsim Jamshed */ 242*76404edcSAsim Jamshed 243*76404edcSAsim Jamshed switch (optname) { 244*76404edcSAsim Jamshed case MOS_CLIBUF: 245*76404edcSAsim Jamshed #if 0 246*76404edcSAsim Jamshed if (socket->socktype != MOS_SOCK_MONITOR_STREAM_ACTIVE) { 
247*76404edcSAsim Jamshed errno = EBADF; 248*76404edcSAsim Jamshed return -1; 249*76404edcSAsim Jamshed } 250*76404edcSAsim Jamshed #endif 251*76404edcSAsim Jamshed #ifdef NEWRB 252*76404edcSAsim Jamshed #ifdef DISABLE_DYN_RESIZE 253*76404edcSAsim Jamshed if (*(int *)optval != 0) 254*76404edcSAsim Jamshed return -1; 255*76404edcSAsim Jamshed if (socket->socktype == MOS_SOCK_MONITOR_STREAM_ACTIVE) { 256*76404edcSAsim Jamshed rb = (socket->monitor_stream->stream->side == MOS_SIDE_CLI) ? 257*76404edcSAsim Jamshed socket->monitor_stream->stream->rcvvar->rcvbuf : 258*76404edcSAsim Jamshed socket->monitor_stream->stream->pair_stream->rcvvar->rcvbuf; 259*76404edcSAsim Jamshed if (rb) { 260*76404edcSAsim Jamshed tcprb_resize_meta(rb, 0); 261*76404edcSAsim Jamshed tcprb_resize(rb, 0); 262*76404edcSAsim Jamshed } 263*76404edcSAsim Jamshed } 264*76404edcSAsim Jamshed return DisableBuf(socket, MOS_SIDE_CLI); 265*76404edcSAsim Jamshed #else 266*76404edcSAsim Jamshed rb = (socket->monitor_stream->stream->side == MOS_SIDE_CLI) ? 
267*76404edcSAsim Jamshed socket->monitor_stream->stream->rcvvar->rcvbuf : 268*76404edcSAsim Jamshed socket->monitor_stream->stream->pair_stream->rcvvar->rcvbuf; 269*76404edcSAsim Jamshed if (tcprb_resize_meta(rb, *(int *)optval) < 0) 270*76404edcSAsim Jamshed return -1; 271*76404edcSAsim Jamshed return tcprb_resize(rb, 272*76404edcSAsim Jamshed (((int)rb->metalen - 1) / UNITBUFSIZE + 1) * UNITBUFSIZE); 273*76404edcSAsim Jamshed #endif 274*76404edcSAsim Jamshed #else 275*76404edcSAsim Jamshed errno = EBADF; 276*76404edcSAsim Jamshed return -1; 277*76404edcSAsim Jamshed #endif 278*76404edcSAsim Jamshed case MOS_SVRBUF: 279*76404edcSAsim Jamshed #if 0 280*76404edcSAsim Jamshed if (socket->socktype != MOS_SOCK_MONITOR_STREAM_ACTIVE) { 281*76404edcSAsim Jamshed errno = EBADF; 282*76404edcSAsim Jamshed return -1; 283*76404edcSAsim Jamshed } 284*76404edcSAsim Jamshed #endif 285*76404edcSAsim Jamshed #ifdef NEWRB 286*76404edcSAsim Jamshed #ifdef DISABLE_DYN_RESIZE 287*76404edcSAsim Jamshed if (*(int *)optval != 0) 288*76404edcSAsim Jamshed return -1; 289*76404edcSAsim Jamshed if (socket->socktype == MOS_SOCK_MONITOR_STREAM_ACTIVE) { 290*76404edcSAsim Jamshed rb = (socket->monitor_stream->stream->side == MOS_SIDE_SVR) ? 291*76404edcSAsim Jamshed socket->monitor_stream->stream->rcvvar->rcvbuf : 292*76404edcSAsim Jamshed socket->monitor_stream->stream->pair_stream->rcvvar->rcvbuf; 293*76404edcSAsim Jamshed if (rb) { 294*76404edcSAsim Jamshed tcprb_resize_meta(rb, 0); 295*76404edcSAsim Jamshed tcprb_resize(rb, 0); 296*76404edcSAsim Jamshed } 297*76404edcSAsim Jamshed } 298*76404edcSAsim Jamshed return DisableBuf(socket, MOS_SIDE_SVR); 299*76404edcSAsim Jamshed #else 300*76404edcSAsim Jamshed rb = (socket->monitor_stream->stream->side == MOS_SIDE_SVR) ? 
301*76404edcSAsim Jamshed socket->monitor_stream->stream->rcvvar->rcvbuf : 302*76404edcSAsim Jamshed socket->monitor_stream->stream->pair_stream->rcvvar->rcvbuf; 303*76404edcSAsim Jamshed if (tcprb_resize_meta(rb, *(int *)optval) < 0) 304*76404edcSAsim Jamshed return -1; 305*76404edcSAsim Jamshed return tcprb_resize(rb, 306*76404edcSAsim Jamshed (((int)rb->metalen - 1) / UNITBUFSIZE + 1) * UNITBUFSIZE); 307*76404edcSAsim Jamshed #endif 308*76404edcSAsim Jamshed #else 309*76404edcSAsim Jamshed errno = EBADF; 310*76404edcSAsim Jamshed return -1; 311*76404edcSAsim Jamshed #endif 312*76404edcSAsim Jamshed case MOS_FRAG_CLIBUF: 313*76404edcSAsim Jamshed #if 0 314*76404edcSAsim Jamshed if (socket->socktype != MOS_SOCK_MONITOR_STREAM_ACTIVE) { 315*76404edcSAsim Jamshed errno = EBADF; 316*76404edcSAsim Jamshed return -1; 317*76404edcSAsim Jamshed } 318*76404edcSAsim Jamshed #endif 319*76404edcSAsim Jamshed #ifdef NEWRB 320*76404edcSAsim Jamshed #ifdef DISABLE_DYN_RESIZE 321*76404edcSAsim Jamshed if (*(int *)optval != 0) 322*76404edcSAsim Jamshed return -1; 323*76404edcSAsim Jamshed if (socket->socktype == MOS_SOCK_MONITOR_STREAM_ACTIVE) { 324*76404edcSAsim Jamshed rb = (socket->monitor_stream->stream->side == MOS_SIDE_CLI) ? 325*76404edcSAsim Jamshed socket->monitor_stream->stream->rcvvar->rcvbuf : 326*76404edcSAsim Jamshed socket->monitor_stream->stream->pair_stream->rcvvar->rcvbuf; 327*76404edcSAsim Jamshed if (rb) 328*76404edcSAsim Jamshed tcprb_resize(rb, 0); 329*76404edcSAsim Jamshed } 330*76404edcSAsim Jamshed return 0; 331*76404edcSAsim Jamshed #else 332*76404edcSAsim Jamshed rb = (socket->monitor_stream->stream->side == MOS_SIDE_CLI) ? 
333*76404edcSAsim Jamshed socket->monitor_stream->stream->rcvvar->rcvbuf : 334*76404edcSAsim Jamshed socket->monitor_stream->stream->pair_stream->rcvvar->rcvbuf; 335*76404edcSAsim Jamshed if (rb->len == 0) 336*76404edcSAsim Jamshed return tcprb_resize_meta(rb, *(int *)optval); 337*76404edcSAsim Jamshed else 338*76404edcSAsim Jamshed return -1; 339*76404edcSAsim Jamshed #endif 340*76404edcSAsim Jamshed #else 341*76404edcSAsim Jamshed errno = EBADF; 342*76404edcSAsim Jamshed return -1; 343*76404edcSAsim Jamshed #endif 344*76404edcSAsim Jamshed case MOS_FRAG_SVRBUF: 345*76404edcSAsim Jamshed #if 0 346*76404edcSAsim Jamshed if (socket->socktype != MOS_SOCK_MONITOR_STREAM_ACTIVE) { 347*76404edcSAsim Jamshed errno = EBADF; 348*76404edcSAsim Jamshed return -1; 349*76404edcSAsim Jamshed } 350*76404edcSAsim Jamshed #endif 351*76404edcSAsim Jamshed #ifdef NEWRB 352*76404edcSAsim Jamshed #ifdef DISABLE_DYN_RESIZE 353*76404edcSAsim Jamshed if (*(int *)optval != 0) 354*76404edcSAsim Jamshed return -1; 355*76404edcSAsim Jamshed if (socket->socktype == MOS_SOCK_MONITOR_STREAM_ACTIVE) { 356*76404edcSAsim Jamshed rb = (socket->monitor_stream->stream->side == MOS_SIDE_SVR) ? 357*76404edcSAsim Jamshed socket->monitor_stream->stream->rcvvar->rcvbuf : 358*76404edcSAsim Jamshed socket->monitor_stream->stream->pair_stream->rcvvar->rcvbuf; 359*76404edcSAsim Jamshed if (rb) 360*76404edcSAsim Jamshed tcprb_resize(rb, 0); 361*76404edcSAsim Jamshed } 362*76404edcSAsim Jamshed return 0; 363*76404edcSAsim Jamshed #else 364*76404edcSAsim Jamshed rb = (socket->monitor_stream->stream->side == MOS_SIDE_SVR) ? 
365*76404edcSAsim Jamshed socket->monitor_stream->stream->rcvvar->rcvbuf : 366*76404edcSAsim Jamshed socket->monitor_stream->stream->pair_stream->rcvvar->rcvbuf; 367*76404edcSAsim Jamshed if (rb->len == 0) 368*76404edcSAsim Jamshed return tcprb_resize_meta(rb, *(int *)optval); 369*76404edcSAsim Jamshed else 370*76404edcSAsim Jamshed return -1; 371*76404edcSAsim Jamshed #endif 372*76404edcSAsim Jamshed #else 373*76404edcSAsim Jamshed errno = EBADF; 374*76404edcSAsim Jamshed return -1; 375*76404edcSAsim Jamshed #endif 376*76404edcSAsim Jamshed case MOS_MONLEVEL: 377*76404edcSAsim Jamshed #ifdef OLD_API 378*76404edcSAsim Jamshed assert(*(int *)optval == MOS_NO_CLIBUF || 379*76404edcSAsim Jamshed *(int *)optval == MOS_NO_SVRBUF); 380*76404edcSAsim Jamshed return DisableBuf(socket, 381*76404edcSAsim Jamshed (*(int *)optval == MOS_NO_CLIBUF) ? 382*76404edcSAsim Jamshed MOS_SIDE_CLI : MOS_SIDE_SVR); 383*76404edcSAsim Jamshed #endif 384*76404edcSAsim Jamshed case MOS_SEQ_REMAP: 385*76404edcSAsim Jamshed return TcpSeqChange(socket, 386*76404edcSAsim Jamshed (uint32_t)((seq_remap_info *)optval)->seq_off, 387*76404edcSAsim Jamshed ((seq_remap_info *)optval)->side, 388*76404edcSAsim Jamshed mtcp->pctx->p.seq); 389*76404edcSAsim Jamshed case MOS_STOP_MON: 390*76404edcSAsim Jamshed return mtcp_cb_stop(mctx, sockid, *(int *)optval); 391*76404edcSAsim Jamshed default: 392*76404edcSAsim Jamshed TRACE_API("invalid optname=%d\n", optname); 393*76404edcSAsim Jamshed assert(0); 394*76404edcSAsim Jamshed } 395*76404edcSAsim Jamshed break; 396*76404edcSAsim Jamshed } 397*76404edcSAsim Jamshed 398*76404edcSAsim Jamshed errno = ENOSYS; 399*76404edcSAsim Jamshed return -1; 400*76404edcSAsim Jamshed } 401*76404edcSAsim Jamshed /*----------------------------------------------------------------------------*/ 402*76404edcSAsim Jamshed int 403*76404edcSAsim Jamshed mtcp_setsock_nonblock(mctx_t mctx, int sockid) 404*76404edcSAsim Jamshed { 405*76404edcSAsim Jamshed mtcp_manager_t mtcp; 
406*76404edcSAsim Jamshed 407*76404edcSAsim Jamshed mtcp = GetMTCPManager(mctx); 408*76404edcSAsim Jamshed if (!mtcp) { 409*76404edcSAsim Jamshed errno = EACCES; 410*76404edcSAsim Jamshed return -1; 411*76404edcSAsim Jamshed } 412*76404edcSAsim Jamshed 413*76404edcSAsim Jamshed if (sockid < 0 || sockid >= g_config.mos->max_concurrency) { 414*76404edcSAsim Jamshed TRACE_API("Socket id %d out of range.\n", sockid); 415*76404edcSAsim Jamshed errno = EBADF; 416*76404edcSAsim Jamshed return -1; 417*76404edcSAsim Jamshed } 418*76404edcSAsim Jamshed 419*76404edcSAsim Jamshed if (mtcp->smap[sockid].socktype == MOS_SOCK_UNUSED) { 420*76404edcSAsim Jamshed TRACE_API("Invalid socket id: %d\n", sockid); 421*76404edcSAsim Jamshed errno = EBADF; 422*76404edcSAsim Jamshed return -1; 423*76404edcSAsim Jamshed } 424*76404edcSAsim Jamshed 425*76404edcSAsim Jamshed mtcp->smap[sockid].opts |= MTCP_NONBLOCK; 426*76404edcSAsim Jamshed 427*76404edcSAsim Jamshed return 0; 428*76404edcSAsim Jamshed } 429*76404edcSAsim Jamshed /*----------------------------------------------------------------------------*/ 430*76404edcSAsim Jamshed int 431*76404edcSAsim Jamshed mtcp_ioctl(mctx_t mctx, int sockid, int request, void *argp) 432*76404edcSAsim Jamshed { 433*76404edcSAsim Jamshed mtcp_manager_t mtcp; 434*76404edcSAsim Jamshed socket_map_t socket; 435*76404edcSAsim Jamshed 436*76404edcSAsim Jamshed mtcp = GetMTCPManager(mctx); 437*76404edcSAsim Jamshed if (!mtcp) { 438*76404edcSAsim Jamshed errno = EACCES; 439*76404edcSAsim Jamshed return -1; 440*76404edcSAsim Jamshed } 441*76404edcSAsim Jamshed 442*76404edcSAsim Jamshed if (sockid < 0 || sockid >= g_config.mos->max_concurrency) { 443*76404edcSAsim Jamshed TRACE_API("Socket id %d out of range.\n", sockid); 444*76404edcSAsim Jamshed errno = EBADF; 445*76404edcSAsim Jamshed return -1; 446*76404edcSAsim Jamshed } 447*76404edcSAsim Jamshed 448*76404edcSAsim Jamshed /* only support stream socket */ 449*76404edcSAsim Jamshed socket = 
&mtcp->smap[sockid]; 450*76404edcSAsim Jamshed 451*76404edcSAsim Jamshed if (socket->socktype != MOS_SOCK_STREAM_LISTEN && 452*76404edcSAsim Jamshed socket->socktype != MOS_SOCK_STREAM) { 453*76404edcSAsim Jamshed TRACE_API("Invalid socket id: %d\n", sockid); 454*76404edcSAsim Jamshed errno = EBADF; 455*76404edcSAsim Jamshed return -1; 456*76404edcSAsim Jamshed } 457*76404edcSAsim Jamshed 458*76404edcSAsim Jamshed if (!argp) { 459*76404edcSAsim Jamshed errno = EFAULT; 460*76404edcSAsim Jamshed return -1; 461*76404edcSAsim Jamshed } 462*76404edcSAsim Jamshed 463*76404edcSAsim Jamshed if (request == FIONREAD) { 464*76404edcSAsim Jamshed tcp_stream *cur_stream; 465*76404edcSAsim Jamshed #ifdef NEWRB 466*76404edcSAsim Jamshed tcprb_t *rbuf; 467*76404edcSAsim Jamshed #else 468*76404edcSAsim Jamshed struct tcp_ring_buffer *rbuf; 469*76404edcSAsim Jamshed #endif 470*76404edcSAsim Jamshed 471*76404edcSAsim Jamshed cur_stream = socket->stream; 472*76404edcSAsim Jamshed if (!cur_stream) { 473*76404edcSAsim Jamshed errno = EBADF; 474*76404edcSAsim Jamshed return -1; 475*76404edcSAsim Jamshed } 476*76404edcSAsim Jamshed 477*76404edcSAsim Jamshed rbuf = cur_stream->rcvvar->rcvbuf; 478*76404edcSAsim Jamshed #ifdef NEWRB 479*76404edcSAsim Jamshed *(int *)argp = (rbuf) ? tcprb_cflen(rbuf) : 0; 480*76404edcSAsim Jamshed #else 481*76404edcSAsim Jamshed *(int *)argp = (rbuf) ? 
rbuf->merged_len : 0; 482*76404edcSAsim Jamshed #endif 483*76404edcSAsim Jamshed 484*76404edcSAsim Jamshed } else if (request == FIONBIO) { 485*76404edcSAsim Jamshed /* 486*76404edcSAsim Jamshed * sockets can only be set to blocking/non-blocking 487*76404edcSAsim Jamshed * modes during initialization 488*76404edcSAsim Jamshed */ 489*76404edcSAsim Jamshed if ((*(int *)argp)) 490*76404edcSAsim Jamshed mtcp->smap[sockid].opts |= MTCP_NONBLOCK; 491*76404edcSAsim Jamshed else 492*76404edcSAsim Jamshed mtcp->smap[sockid].opts &= ~MTCP_NONBLOCK; 493*76404edcSAsim Jamshed } else { 494*76404edcSAsim Jamshed errno = EINVAL; 495*76404edcSAsim Jamshed return -1; 496*76404edcSAsim Jamshed } 497*76404edcSAsim Jamshed 498*76404edcSAsim Jamshed return 0; 499*76404edcSAsim Jamshed } 500*76404edcSAsim Jamshed /*----------------------------------------------------------------------------*/ 501*76404edcSAsim Jamshed static int 502*76404edcSAsim Jamshed mtcp_monitor(mctx_t mctx, socket_map_t sock) 503*76404edcSAsim Jamshed { 504*76404edcSAsim Jamshed mtcp_manager_t mtcp; 505*76404edcSAsim Jamshed struct mon_listener *monitor; 506*76404edcSAsim Jamshed int sockid = sock->id; 507*76404edcSAsim Jamshed 508*76404edcSAsim Jamshed mtcp = GetMTCPManager(mctx); 509*76404edcSAsim Jamshed if (!mtcp) { 510*76404edcSAsim Jamshed errno = EACCES; 511*76404edcSAsim Jamshed return -1; 512*76404edcSAsim Jamshed } 513*76404edcSAsim Jamshed 514*76404edcSAsim Jamshed if (sockid < 0 || sockid >= g_config.mos->max_concurrency) { 515*76404edcSAsim Jamshed TRACE_API("Socket id %d out of range.\n", sockid); 516*76404edcSAsim Jamshed errno = EBADF; 517*76404edcSAsim Jamshed return -1; 518*76404edcSAsim Jamshed } 519*76404edcSAsim Jamshed 520*76404edcSAsim Jamshed if (mtcp->msmap[sockid].socktype == MOS_SOCK_UNUSED) { 521*76404edcSAsim Jamshed TRACE_API("Invalid socket id: %d\n", sockid); 522*76404edcSAsim Jamshed errno = EBADF; 523*76404edcSAsim Jamshed return -1; 524*76404edcSAsim Jamshed } 525*76404edcSAsim 
Jamshed 526*76404edcSAsim Jamshed if (!(mtcp->msmap[sockid].socktype == MOS_SOCK_MONITOR_STREAM || 527*76404edcSAsim Jamshed mtcp->msmap[sockid].socktype == MOS_SOCK_MONITOR_RAW)) { 528*76404edcSAsim Jamshed TRACE_API("Not a monitor socket. id: %d\n", sockid); 529*76404edcSAsim Jamshed errno = ENOTSOCK; 530*76404edcSAsim Jamshed return -1; 531*76404edcSAsim Jamshed } 532*76404edcSAsim Jamshed 533*76404edcSAsim Jamshed monitor = (struct mon_listener *)calloc(1, sizeof(struct mon_listener)); 534*76404edcSAsim Jamshed if (!monitor) { 535*76404edcSAsim Jamshed /* errno set from the malloc() */ 536*76404edcSAsim Jamshed errno = ENOMEM; 537*76404edcSAsim Jamshed return -1; 538*76404edcSAsim Jamshed } 539*76404edcSAsim Jamshed 540*76404edcSAsim Jamshed /* create a monitor-specific event queue */ 541*76404edcSAsim Jamshed monitor->eq = CreateEventQueue(g_config.mos->max_concurrency); 542*76404edcSAsim Jamshed if (!monitor->eq) { 543*76404edcSAsim Jamshed TRACE_API("Can't create event queue (concurrency: %d) for " 544*76404edcSAsim Jamshed "monitor read event registrations!\n", 545*76404edcSAsim Jamshed g_config.mos->max_concurrency); 546*76404edcSAsim Jamshed free(monitor); 547*76404edcSAsim Jamshed errno = ENOMEM; 548*76404edcSAsim Jamshed return -1; 549*76404edcSAsim Jamshed } 550*76404edcSAsim Jamshed 551*76404edcSAsim Jamshed /* set monitor-related basic parameters */ 552*76404edcSAsim Jamshed #ifndef NEWEV 553*76404edcSAsim Jamshed monitor->ude_id = UDE_OFFSET; 554*76404edcSAsim Jamshed #endif 555*76404edcSAsim Jamshed monitor->socket = sock; 556*76404edcSAsim Jamshed #ifdef NEWRB 557*76404edcSAsim Jamshed monitor->client_buf_mgmt = monitor->server_buf_mgmt = BUFMGMT_FULL; 558*76404edcSAsim Jamshed #else 559*76404edcSAsim Jamshed monitor->client_buf_mgmt = monitor->server_buf_mgmt = 1; 560*76404edcSAsim Jamshed #endif 561*76404edcSAsim Jamshed 562*76404edcSAsim Jamshed /* perform both sides monitoring by default */ 563*76404edcSAsim Jamshed monitor->client_mon = 
monitor->server_mon = 1; 564*76404edcSAsim Jamshed 565*76404edcSAsim Jamshed /* add monitor socket to the monitor list */ 566*76404edcSAsim Jamshed TAILQ_INSERT_TAIL(&mtcp->monitors, monitor, link); 567*76404edcSAsim Jamshed 568*76404edcSAsim Jamshed mtcp->msmap[sockid].monitor_listener = monitor; 569*76404edcSAsim Jamshed 570*76404edcSAsim Jamshed return 0; 571*76404edcSAsim Jamshed } 572*76404edcSAsim Jamshed /*----------------------------------------------------------------------------*/ 573*76404edcSAsim Jamshed int 574*76404edcSAsim Jamshed mtcp_socket(mctx_t mctx, int domain, int type, int protocol) 575*76404edcSAsim Jamshed { 576*76404edcSAsim Jamshed mtcp_manager_t mtcp; 577*76404edcSAsim Jamshed socket_map_t socket; 578*76404edcSAsim Jamshed 579*76404edcSAsim Jamshed mtcp = GetMTCPManager(mctx); 580*76404edcSAsim Jamshed if (!mtcp) { 581*76404edcSAsim Jamshed errno = EACCES; 582*76404edcSAsim Jamshed return -1; 583*76404edcSAsim Jamshed } 584*76404edcSAsim Jamshed 585*76404edcSAsim Jamshed if (domain != AF_INET) { 586*76404edcSAsim Jamshed errno = EAFNOSUPPORT; 587*76404edcSAsim Jamshed return -1; 588*76404edcSAsim Jamshed } 589*76404edcSAsim Jamshed 590*76404edcSAsim Jamshed if (type == SOCK_STREAM) { 591*76404edcSAsim Jamshed type = MOS_SOCK_STREAM; 592*76404edcSAsim Jamshed } else if (type == MOS_SOCK_MONITOR_STREAM || 593*76404edcSAsim Jamshed type == MOS_SOCK_MONITOR_RAW) { 594*76404edcSAsim Jamshed /* do nothing for the time being */ 595*76404edcSAsim Jamshed } else { 596*76404edcSAsim Jamshed /* Not supported type */ 597*76404edcSAsim Jamshed errno = EINVAL; 598*76404edcSAsim Jamshed return -1; 599*76404edcSAsim Jamshed } 600*76404edcSAsim Jamshed 601*76404edcSAsim Jamshed socket = AllocateSocket(mctx, type); 602*76404edcSAsim Jamshed if (!socket) { 603*76404edcSAsim Jamshed errno = ENFILE; 604*76404edcSAsim Jamshed return -1; 605*76404edcSAsim Jamshed } 606*76404edcSAsim Jamshed 607*76404edcSAsim Jamshed if (type == MOS_SOCK_MONITOR_STREAM || 
608*76404edcSAsim Jamshed type == MOS_SOCK_MONITOR_RAW) { 609*76404edcSAsim Jamshed mtcp_manager_t mtcp = GetMTCPManager(mctx); 610*76404edcSAsim Jamshed if (!mtcp) { 611*76404edcSAsim Jamshed errno = EACCES; 612*76404edcSAsim Jamshed return -1; 613*76404edcSAsim Jamshed } 614*76404edcSAsim Jamshed mtcp_monitor(mctx, socket); 615*76404edcSAsim Jamshed #ifdef NEWEV 616*76404edcSAsim Jamshed socket->monitor_listener->stree_dontcare = NULL; 617*76404edcSAsim Jamshed socket->monitor_listener->stree_pre_rcv = NULL; 618*76404edcSAsim Jamshed socket->monitor_listener->stree_post_snd = NULL; 619*76404edcSAsim Jamshed #else 620*76404edcSAsim Jamshed InitEvB(mtcp, &socket->monitor_listener->dontcare_evb); 621*76404edcSAsim Jamshed InitEvB(mtcp, &socket->monitor_listener->pre_tcp_evb); 622*76404edcSAsim Jamshed InitEvB(mtcp, &socket->monitor_listener->post_tcp_evb); 623*76404edcSAsim Jamshed #endif 624*76404edcSAsim Jamshed } 625*76404edcSAsim Jamshed 626*76404edcSAsim Jamshed return socket->id; 627*76404edcSAsim Jamshed } 628*76404edcSAsim Jamshed /*----------------------------------------------------------------------------*/
/** Record the local address that a stream socket will use.
 * @param [in] mctx: mtcp context
 * @param [in] sockid: id of a MOS_SOCK_STREAM / MOS_SOCK_STREAM_LISTEN socket
 * @param [in] addr: AF_INET address to bind to
 * @param [in] addrlen: size of *addr, at least sizeof(struct sockaddr_in)
 * @return 0 on success, -1 on failure with errno set:
 *         EACCES   - no usable mtcp manager for this context
 *         EBADF    - sockid out of range or socket slot unused
 *         ENOTSOCK - socket is neither a stream nor a listening socket
 *         EINVAL   - addr is NULL, socket is already bound, address is not
 *                    AF_INET / too short, or this context already owns a
 *                    listener
 *
 * Only stores the address and sets MTCP_ADDR_BIND in the socket options.
 */
int
mtcp_bind(mctx_t mctx, int sockid,
		const struct sockaddr *addr, socklen_t addrlen)
{
	mtcp_manager_t mtcp;
	socket_map_t socket;
	const struct sockaddr_in *addr_in;

	mtcp = GetMTCPManager(mctx);
	if (!mtcp) {
		errno = EACCES;
		return -1;
	}

	if (sockid < 0 || sockid >= g_config.mos->max_concurrency) {
		TRACE_API("Socket id %d out of range.\n", sockid);
		errno = EBADF;
		return -1;
	}

	socket = &mtcp->smap[sockid];

	if (socket->socktype == MOS_SOCK_UNUSED) {
		TRACE_API("Invalid socket id: %d\n", sockid);
		errno = EBADF;
		return -1;
	}

	if (socket->socktype != MOS_SOCK_STREAM &&
	    socket->socktype != MOS_SOCK_STREAM_LISTEN) {
		TRACE_API("Not a stream socket id: %d\n", sockid);
		errno = ENOTSOCK;
		return -1;
	}

	if (!addr) {
		TRACE_API("Socket %d: empty address!\n", sockid);
		errno = EINVAL;
		return -1;
	}

	if (socket->opts & MTCP_ADDR_BIND) {
		TRACE_API("Socket %d: adress already bind for this socket.\n", sockid);
		errno = EINVAL;
		return -1;
	}

	/* we only allow bind() for AF_INET address */
	if (addr->sa_family != AF_INET || addrlen < sizeof(struct sockaddr_in)) {
		TRACE_API("Socket %d: invalid argument!\n", sockid);
		errno = EINVAL;
		return -1;
	}

	/* a context that already has a listener refuses any further bind */
	if (mtcp->listener) {
		TRACE_API("Address already bound!\n");
		errno = EINVAL;
		return -1;
	}

	addr_in = (const struct sockaddr_in *)addr;
	socket->saddr = *addr_in;
	socket->opts |= MTCP_ADDR_BIND;

	return 0;
}
/*----------------------------------------------------------------------------*/
/** Turn a stream socket into a listening socket.
 * @param [in] mctx: mtcp context
 * @param [in] sockid: id of the stream socket
 * @param [in] backlog: accept queue capacity, 1..max_concurrency
 * @return 0 on success, -1 on failure with errno set:
 *         EACCES   - no usable mtcp manager for this context
 *         EBADF    - sockid out of range or socket slot unused
 *         ENOTSOCK - socket is neither a stream nor a listening socket
 *         EINVAL   - backlog out of range
 *         ENOMEM   - listener or accept queue could not be allocated
 *         (other)  - error number returned by pthread_cond_init() or
 *                    pthread_mutex_init()
 *
 * The socket type is switched to MOS_SOCK_STREAM_LISTEN only once the
 * listener is fully constructed, so a failed call leaves the socket as it was
 * and releases everything it allocated.
 *
 * NOTE(review): calling this on a socket that already has a listener replaces
 * the pointer without releasing the previous listener - confirm whether that
 * case can occur.
 */
int
mtcp_listen(mctx_t mctx, int sockid, int backlog)
{
	mtcp_manager_t mtcp;
	socket_map_t socket;
	struct tcp_listener *listener;
	int rc;

	mtcp = GetMTCPManager(mctx);
	if (!mtcp) {
		errno = EACCES;
		return -1;
	}

	if (sockid < 0 || sockid >= g_config.mos->max_concurrency) {
		TRACE_API("Socket id %d out of range.\n", sockid);
		errno = EBADF;
		return -1;
	}

	socket = &mtcp->smap[sockid];

	if (socket->socktype == MOS_SOCK_UNUSED) {
		TRACE_API("Invalid socket id: %d\n", sockid);
		errno = EBADF;
		return -1;
	}

	if (socket->socktype != MOS_SOCK_STREAM &&
	    socket->socktype != MOS_SOCK_STREAM_LISTEN) {
		TRACE_API("Not a listening socket. id: %d\n", sockid);
		errno = ENOTSOCK;
		return -1;
	}

	if (backlog <= 0 || backlog > g_config.mos->max_concurrency) {
		errno = EINVAL;
		return -1;
	}

	listener = calloc(1, sizeof(*listener));
	if (!listener) {
		errno = ENOMEM;
		return -1;
	}

	listener->sockid = sockid;
	listener->backlog = backlog;
	listener->socket = socket;

	/* pthread_*_init() report failure through the return value, not errno */
	rc = pthread_cond_init(&listener->accept_cond, NULL);
	if (rc) {
		errno = rc;
		perror("pthread_cond_init of ctx->accept_cond\n");
		goto err_free;
	}

	rc = pthread_mutex_init(&listener->accept_lock, NULL);
	if (rc) {
		errno = rc;
		perror("pthread_mutex_init of ctx->accept_lock\n");
		goto err_cond;
	}

	listener->acceptq = CreateStreamQueue(backlog);
	if (!listener->acceptq) {
		rc = ENOMEM;
		goto err_mutex;
	}

	/* publish the listener only after it is completely initialised */
	socket->socktype = MOS_SOCK_STREAM_LISTEN;
	socket->listener = listener;
	mtcp->listener = listener;

	return 0;

err_mutex:
	pthread_mutex_destroy(&listener->accept_lock);
err_cond:
	pthread_cond_destroy(&listener->accept_cond);
err_free:
	free(listener);
	/* restore the error code; perror()/destroy calls may have changed errno */
	errno = rc;
	return -1;
}
764*76404edcSAsim Jamshed /*----------------------------------------------------------------------------*/ 765*76404edcSAsim Jamshed int 766*76404edcSAsim Jamshed mtcp_accept(mctx_t mctx, int sockid, struct sockaddr *addr, socklen_t *addrlen) 767*76404edcSAsim Jamshed { 768*76404edcSAsim Jamshed mtcp_manager_t mtcp; 769*76404edcSAsim Jamshed struct tcp_listener *listener; 770*76404edcSAsim Jamshed socket_map_t socket; 771*76404edcSAsim Jamshed tcp_stream *accepted = NULL; 772*76404edcSAsim Jamshed 773*76404edcSAsim Jamshed mtcp = GetMTCPManager(mctx); 774*76404edcSAsim Jamshed if (!mtcp) { 775*76404edcSAsim Jamshed errno = EACCES; 776*76404edcSAsim Jamshed return -1; 777*76404edcSAsim Jamshed } 778*76404edcSAsim Jamshed 779*76404edcSAsim Jamshed if (sockid < 0 || sockid >= g_config.mos->max_concurrency) { 780*76404edcSAsim Jamshed TRACE_API("Socket id %d out of range.\n", sockid); 781*76404edcSAsim Jamshed errno = EBADF; 782*76404edcSAsim Jamshed return -1; 783*76404edcSAsim Jamshed } 784*76404edcSAsim Jamshed 785*76404edcSAsim Jamshed /* requires listening socket */ 786*76404edcSAsim Jamshed if (mtcp->smap[sockid].socktype != MOS_SOCK_STREAM_LISTEN) { 787*76404edcSAsim Jamshed errno = EINVAL; 788*76404edcSAsim Jamshed return -1; 789*76404edcSAsim Jamshed } 790*76404edcSAsim Jamshed 791*76404edcSAsim Jamshed listener = mtcp->smap[sockid].listener; 792*76404edcSAsim Jamshed 793*76404edcSAsim Jamshed /* dequeue from the acceptq without lock first */ 794*76404edcSAsim Jamshed /* if nothing there, acquire lock and cond_wait */ 795*76404edcSAsim Jamshed accepted = StreamDequeue(listener->acceptq); 796*76404edcSAsim Jamshed if (!accepted) { 797*76404edcSAsim Jamshed if (listener->socket->opts & MTCP_NONBLOCK) { 798*76404edcSAsim Jamshed errno = EAGAIN; 799*76404edcSAsim Jamshed return -1; 800*76404edcSAsim Jamshed 801*76404edcSAsim Jamshed } else { 802*76404edcSAsim Jamshed pthread_mutex_lock(&listener->accept_lock); 803*76404edcSAsim Jamshed while ((accepted = 
StreamDequeue(listener->acceptq)) == NULL) { 804*76404edcSAsim Jamshed pthread_cond_wait(&listener->accept_cond, &listener->accept_lock); 805*76404edcSAsim Jamshed 806*76404edcSAsim Jamshed if (mtcp->ctx->done || mtcp->ctx->exit) { 807*76404edcSAsim Jamshed pthread_mutex_unlock(&listener->accept_lock); 808*76404edcSAsim Jamshed errno = EINTR; 809*76404edcSAsim Jamshed return -1; 810*76404edcSAsim Jamshed } 811*76404edcSAsim Jamshed } 812*76404edcSAsim Jamshed pthread_mutex_unlock(&listener->accept_lock); 813*76404edcSAsim Jamshed } 814*76404edcSAsim Jamshed } 815*76404edcSAsim Jamshed 816*76404edcSAsim Jamshed if (!accepted) { 817*76404edcSAsim Jamshed TRACE_ERROR("[NEVER HAPPEN] Empty accept queue!\n"); 818*76404edcSAsim Jamshed } 819*76404edcSAsim Jamshed 820*76404edcSAsim Jamshed if (!accepted->socket) { 821*76404edcSAsim Jamshed socket = AllocateSocket(mctx, MOS_SOCK_STREAM); 822*76404edcSAsim Jamshed if (!socket) { 823*76404edcSAsim Jamshed TRACE_ERROR("Failed to create new socket!\n"); 824*76404edcSAsim Jamshed /* TODO: destroy the stream */ 825*76404edcSAsim Jamshed errno = ENFILE; 826*76404edcSAsim Jamshed return -1; 827*76404edcSAsim Jamshed } 828*76404edcSAsim Jamshed socket->stream = accepted; 829*76404edcSAsim Jamshed accepted->socket = socket; 830*76404edcSAsim Jamshed /* if monitor is enabled, complete the socket assignment */ 831*76404edcSAsim Jamshed if (socket->stream->pair_stream != NULL) 832*76404edcSAsim Jamshed socket->stream->pair_stream->socket = socket; 833*76404edcSAsim Jamshed } 834*76404edcSAsim Jamshed 835*76404edcSAsim Jamshed TRACE_API("Stream %d accepted.\n", accepted->id); 836*76404edcSAsim Jamshed 837*76404edcSAsim Jamshed if (addr && addrlen) { 838*76404edcSAsim Jamshed struct sockaddr_in *addr_in = (struct sockaddr_in *)addr; 839*76404edcSAsim Jamshed addr_in->sin_family = AF_INET; 840*76404edcSAsim Jamshed addr_in->sin_port = accepted->dport; 841*76404edcSAsim Jamshed addr_in->sin_addr.s_addr = accepted->daddr; 842*76404edcSAsim 
Jamshed *addrlen = sizeof(struct sockaddr_in); 843*76404edcSAsim Jamshed } 844*76404edcSAsim Jamshed 845*76404edcSAsim Jamshed return accepted->socket->id; 846*76404edcSAsim Jamshed } 847*76404edcSAsim Jamshed /*----------------------------------------------------------------------------*/ 848*76404edcSAsim Jamshed int 849*76404edcSAsim Jamshed mtcp_init_rss(mctx_t mctx, in_addr_t saddr_base, int num_addr, 850*76404edcSAsim Jamshed in_addr_t daddr, in_addr_t dport) 851*76404edcSAsim Jamshed { 852*76404edcSAsim Jamshed mtcp_manager_t mtcp; 853*76404edcSAsim Jamshed addr_pool_t ap; 854*76404edcSAsim Jamshed 855*76404edcSAsim Jamshed mtcp = GetMTCPManager(mctx); 856*76404edcSAsim Jamshed if (!mtcp) { 857*76404edcSAsim Jamshed errno = EACCES; 858*76404edcSAsim Jamshed return -1; 859*76404edcSAsim Jamshed } 860*76404edcSAsim Jamshed 861*76404edcSAsim Jamshed if (saddr_base == INADDR_ANY) { 862*76404edcSAsim Jamshed int nif_out; 863*76404edcSAsim Jamshed 864*76404edcSAsim Jamshed /* for the INADDR_ANY, find the output interface for the destination 865*76404edcSAsim Jamshed and set the saddr_base as the ip address of the output interface */ 866*76404edcSAsim Jamshed nif_out = GetOutputInterface(daddr); 867*76404edcSAsim Jamshed saddr_base = g_config.mos->netdev_table->ent[nif_out]->ip_addr; 868*76404edcSAsim Jamshed } 869*76404edcSAsim Jamshed 870*76404edcSAsim Jamshed ap = CreateAddressPoolPerCore(mctx->cpu, num_cpus, 871*76404edcSAsim Jamshed saddr_base, num_addr, daddr, dport); 872*76404edcSAsim Jamshed if (!ap) { 873*76404edcSAsim Jamshed errno = ENOMEM; 874*76404edcSAsim Jamshed return -1; 875*76404edcSAsim Jamshed } 876*76404edcSAsim Jamshed 877*76404edcSAsim Jamshed mtcp->ap = ap; 878*76404edcSAsim Jamshed 879*76404edcSAsim Jamshed return 0; 880*76404edcSAsim Jamshed } 881*76404edcSAsim Jamshed /*----------------------------------------------------------------------------*/ 882*76404edcSAsim Jamshed int 883*76404edcSAsim Jamshed eval_bpf_5tuple(struct sfbpf_program 
fcode, 884*76404edcSAsim Jamshed in_addr_t saddr, in_port_t sport, 885*76404edcSAsim Jamshed in_addr_t daddr, in_port_t dport) { 886*76404edcSAsim Jamshed uint8_t buf[TOTAL_TCP_HEADER_LEN]; 887*76404edcSAsim Jamshed struct ethhdr *ethh; 888*76404edcSAsim Jamshed struct iphdr *iph; 889*76404edcSAsim Jamshed struct tcphdr *tcph; 890*76404edcSAsim Jamshed 891*76404edcSAsim Jamshed ethh = (struct ethhdr *)buf; 892*76404edcSAsim Jamshed ethh->h_proto = htons(ETH_P_IP); 893*76404edcSAsim Jamshed iph = (struct iphdr *)(ethh + 1); 894*76404edcSAsim Jamshed iph->ihl = IP_HEADER_LEN >> 2; 895*76404edcSAsim Jamshed iph->version = 4; 896*76404edcSAsim Jamshed iph->tos = 0; 897*76404edcSAsim Jamshed iph->tot_len = htons(IP_HEADER_LEN + TCP_HEADER_LEN); 898*76404edcSAsim Jamshed iph->id = htons(0); 899*76404edcSAsim Jamshed iph->protocol = IPPROTO_TCP; 900*76404edcSAsim Jamshed iph->saddr = saddr; 901*76404edcSAsim Jamshed iph->daddr = daddr; 902*76404edcSAsim Jamshed iph->check = 0; 903*76404edcSAsim Jamshed tcph = (struct tcphdr *)(iph + 1); 904*76404edcSAsim Jamshed tcph->source = sport; 905*76404edcSAsim Jamshed tcph->dest = dport; 906*76404edcSAsim Jamshed 907*76404edcSAsim Jamshed return EVAL_BPFFILTER(fcode, (uint8_t *)iph - sizeof(struct ethhdr), 908*76404edcSAsim Jamshed TOTAL_TCP_HEADER_LEN); 909*76404edcSAsim Jamshed } 910*76404edcSAsim Jamshed /*----------------------------------------------------------------------------*/ 911*76404edcSAsim Jamshed int 912*76404edcSAsim Jamshed mtcp_connect(mctx_t mctx, int sockid, 913*76404edcSAsim Jamshed const struct sockaddr *addr, socklen_t addrlen) 914*76404edcSAsim Jamshed { 915*76404edcSAsim Jamshed mtcp_manager_t mtcp; 916*76404edcSAsim Jamshed socket_map_t socket; 917*76404edcSAsim Jamshed tcp_stream *cur_stream; 918*76404edcSAsim Jamshed struct sockaddr_in *addr_in; 919*76404edcSAsim Jamshed in_addr_t dip; 920*76404edcSAsim Jamshed in_port_t dport; 921*76404edcSAsim Jamshed int is_dyn_bound = FALSE; 922*76404edcSAsim 
Jamshed int ret; 923*76404edcSAsim Jamshed int cnt_match = 0; 924*76404edcSAsim Jamshed struct mon_listener *walk; 925*76404edcSAsim Jamshed struct sfbpf_program fcode; 926*76404edcSAsim Jamshed 927*76404edcSAsim Jamshed cur_stream = NULL; 928*76404edcSAsim Jamshed mtcp = GetMTCPManager(mctx); 929*76404edcSAsim Jamshed if (!mtcp) { 930*76404edcSAsim Jamshed errno = EACCES; 931*76404edcSAsim Jamshed return -1; 932*76404edcSAsim Jamshed } 933*76404edcSAsim Jamshed 934*76404edcSAsim Jamshed if (sockid < 0 || sockid >= g_config.mos->max_concurrency) { 935*76404edcSAsim Jamshed TRACE_API("Socket id %d out of range.\n", sockid); 936*76404edcSAsim Jamshed errno = EBADF; 937*76404edcSAsim Jamshed return -1; 938*76404edcSAsim Jamshed } 939*76404edcSAsim Jamshed 940*76404edcSAsim Jamshed if (mtcp->smap[sockid].socktype == MOS_SOCK_UNUSED) { 941*76404edcSAsim Jamshed TRACE_API("Invalid socket id: %d\n", sockid); 942*76404edcSAsim Jamshed errno = EBADF; 943*76404edcSAsim Jamshed return -1; 944*76404edcSAsim Jamshed } 945*76404edcSAsim Jamshed 946*76404edcSAsim Jamshed if (mtcp->smap[sockid].socktype != MOS_SOCK_STREAM) { 947*76404edcSAsim Jamshed TRACE_API("Not an end socket. 
id: %d\n", sockid); 948*76404edcSAsim Jamshed errno = ENOTSOCK; 949*76404edcSAsim Jamshed return -1; 950*76404edcSAsim Jamshed } 951*76404edcSAsim Jamshed 952*76404edcSAsim Jamshed if (!addr) { 953*76404edcSAsim Jamshed TRACE_API("Socket %d: empty address!\n", sockid); 954*76404edcSAsim Jamshed errno = EFAULT; 955*76404edcSAsim Jamshed return -1; 956*76404edcSAsim Jamshed } 957*76404edcSAsim Jamshed 958*76404edcSAsim Jamshed /* we only allow bind() for AF_INET address */ 959*76404edcSAsim Jamshed if (addr->sa_family != AF_INET || addrlen < sizeof(struct sockaddr_in)) { 960*76404edcSAsim Jamshed TRACE_API("Socket %d: invalid argument!\n", sockid); 961*76404edcSAsim Jamshed errno = EAFNOSUPPORT; 962*76404edcSAsim Jamshed return -1; 963*76404edcSAsim Jamshed } 964*76404edcSAsim Jamshed 965*76404edcSAsim Jamshed socket = &mtcp->smap[sockid]; 966*76404edcSAsim Jamshed if (socket->stream) { 967*76404edcSAsim Jamshed TRACE_API("Socket %d: stream already exist!\n", sockid); 968*76404edcSAsim Jamshed if (socket->stream->state >= TCP_ST_ESTABLISHED) { 969*76404edcSAsim Jamshed errno = EISCONN; 970*76404edcSAsim Jamshed } else { 971*76404edcSAsim Jamshed errno = EALREADY; 972*76404edcSAsim Jamshed } 973*76404edcSAsim Jamshed return -1; 974*76404edcSAsim Jamshed } 975*76404edcSAsim Jamshed 976*76404edcSAsim Jamshed addr_in = (struct sockaddr_in *)addr; 977*76404edcSAsim Jamshed dip = addr_in->sin_addr.s_addr; 978*76404edcSAsim Jamshed dport = addr_in->sin_port; 979*76404edcSAsim Jamshed 980*76404edcSAsim Jamshed /* address binding */ 981*76404edcSAsim Jamshed if (socket->opts & MTCP_ADDR_BIND && 982*76404edcSAsim Jamshed socket->saddr.sin_port != INPORT_ANY && 983*76404edcSAsim Jamshed socket->saddr.sin_addr.s_addr != INADDR_ANY) { 984*76404edcSAsim Jamshed int rss_core; 985*76404edcSAsim Jamshed 986*76404edcSAsim Jamshed rss_core = GetRSSCPUCore(socket->saddr.sin_addr.s_addr, dip, 987*76404edcSAsim Jamshed socket->saddr.sin_port, dport, num_queues); 988*76404edcSAsim Jamshed 
989*76404edcSAsim Jamshed if (rss_core != mctx->cpu) { 990*76404edcSAsim Jamshed errno = EINVAL; 991*76404edcSAsim Jamshed return -1; 992*76404edcSAsim Jamshed } 993*76404edcSAsim Jamshed } else { 994*76404edcSAsim Jamshed if (mtcp->ap) { 995*76404edcSAsim Jamshed ret = FetchAddress(mtcp->ap, 996*76404edcSAsim Jamshed mctx->cpu, num_queues, addr_in, &socket->saddr); 997*76404edcSAsim Jamshed } else { 998*76404edcSAsim Jamshed ret = FetchAddress(ap, 999*76404edcSAsim Jamshed mctx->cpu, num_queues, addr_in, &socket->saddr); 1000*76404edcSAsim Jamshed } 1001*76404edcSAsim Jamshed if (ret < 0) { 1002*76404edcSAsim Jamshed errno = EAGAIN; 1003*76404edcSAsim Jamshed return -1; 1004*76404edcSAsim Jamshed } 1005*76404edcSAsim Jamshed socket->opts |= MTCP_ADDR_BIND; 1006*76404edcSAsim Jamshed is_dyn_bound = TRUE; 1007*76404edcSAsim Jamshed } 1008*76404edcSAsim Jamshed 1009*76404edcSAsim Jamshed cnt_match = 0; 1010*76404edcSAsim Jamshed if (mtcp->num_msp > 0) { 1011*76404edcSAsim Jamshed TAILQ_FOREACH(walk, &mtcp->monitors, link) { 1012*76404edcSAsim Jamshed fcode = walk->stream_syn_fcode; 1013*76404edcSAsim Jamshed if (!(ISSET_BPFFILTER(fcode) && 1014*76404edcSAsim Jamshed eval_bpf_5tuple(fcode, socket->saddr.sin_addr.s_addr, 1015*76404edcSAsim Jamshed socket->saddr.sin_port, 1016*76404edcSAsim Jamshed dip, dport) == 0)) { 1017*76404edcSAsim Jamshed walk->is_stream_syn_filter_hit = 1; // set the 'filter hit' flag to 1 1018*76404edcSAsim Jamshed cnt_match++; 1019*76404edcSAsim Jamshed } 1020*76404edcSAsim Jamshed } 1021*76404edcSAsim Jamshed } 1022*76404edcSAsim Jamshed 1023*76404edcSAsim Jamshed if (mtcp->num_msp > 0 && cnt_match > 0) { 1024*76404edcSAsim Jamshed /* 150820 dhkim: XXX: embedded mode is not verified */ 1025*76404edcSAsim Jamshed #if 1 1026*76404edcSAsim Jamshed cur_stream = CreateClientTCPStream(mtcp, socket, 1027*76404edcSAsim Jamshed STREAM_TYPE(MOS_SOCK_STREAM) | 1028*76404edcSAsim Jamshed STREAM_TYPE(MOS_SOCK_MONITOR_STREAM_ACTIVE), 1029*76404edcSAsim 
Jamshed socket->saddr.sin_addr.s_addr, 1030*76404edcSAsim Jamshed socket->saddr.sin_port, dip, dport, NULL); 1031*76404edcSAsim Jamshed #else 1032*76404edcSAsim Jamshed cur_stream = CreateDualTCPStream(mtcp, socket, 1033*76404edcSAsim Jamshed STREAM_TYPE(MOS_SOCK_STREAM) | 1034*76404edcSAsim Jamshed STREAM_TYPE(MOS_SOCK_MONITOR_STREAM_ACTIVE), 1035*76404edcSAsim Jamshed socket->saddr.sin_addr.s_addr, 1036*76404edcSAsim Jamshed socket->saddr.sin_port, dip, dport, NULL); 1037*76404edcSAsim Jamshed #endif 1038*76404edcSAsim Jamshed } 1039*76404edcSAsim Jamshed else 1040*76404edcSAsim Jamshed cur_stream = CreateTCPStream(mtcp, socket, STREAM_TYPE(MOS_SOCK_STREAM), 1041*76404edcSAsim Jamshed socket->saddr.sin_addr.s_addr, 1042*76404edcSAsim Jamshed socket->saddr.sin_port, dip, dport, NULL); 1043*76404edcSAsim Jamshed if (!cur_stream) { 1044*76404edcSAsim Jamshed TRACE_ERROR("Socket %d: failed to create tcp_stream!\n", sockid); 1045*76404edcSAsim Jamshed errno = ENOMEM; 1046*76404edcSAsim Jamshed return -1; 1047*76404edcSAsim Jamshed } 1048*76404edcSAsim Jamshed 1049*76404edcSAsim Jamshed if (is_dyn_bound) 1050*76404edcSAsim Jamshed cur_stream->is_bound_addr = TRUE; 1051*76404edcSAsim Jamshed cur_stream->sndvar->cwnd = 1; 1052*76404edcSAsim Jamshed cur_stream->sndvar->ssthresh = cur_stream->sndvar->mss * 10; 1053*76404edcSAsim Jamshed cur_stream->side = MOS_SIDE_CLI; 1054*76404edcSAsim Jamshed /* if monitor is enabled, update the pair stream side as well */ 1055*76404edcSAsim Jamshed if (cur_stream->pair_stream) { 1056*76404edcSAsim Jamshed cur_stream->pair_stream->side = MOS_SIDE_SVR; 1057*76404edcSAsim Jamshed /* 1058*76404edcSAsim Jamshed * if buffer management is off, then disable 1059*76404edcSAsim Jamshed * monitoring tcp ring of server... 
1060*76404edcSAsim Jamshed * if there is even a single monitor asking for 1061*76404edcSAsim Jamshed * buffer management, enable it (that's why the 1062*76404edcSAsim Jamshed * need for the loop) 1063*76404edcSAsim Jamshed */ 1064*76404edcSAsim Jamshed cur_stream->pair_stream->buffer_mgmt = BUFMGMT_OFF; 1065*76404edcSAsim Jamshed struct socket_map *walk; 1066*76404edcSAsim Jamshed SOCKQ_FOREACH_START(walk, &cur_stream->msocks) { 1067*76404edcSAsim Jamshed uint8_t bm = walk->monitor_stream->monitor_listener->server_buf_mgmt; 1068*76404edcSAsim Jamshed if (bm > cur_stream->pair_stream->buffer_mgmt) { 1069*76404edcSAsim Jamshed cur_stream->pair_stream->buffer_mgmt = bm; 1070*76404edcSAsim Jamshed break; 1071*76404edcSAsim Jamshed } 1072*76404edcSAsim Jamshed } SOCKQ_FOREACH_END; 1073*76404edcSAsim Jamshed } 1074*76404edcSAsim Jamshed 1075*76404edcSAsim Jamshed cur_stream->state = TCP_ST_SYN_SENT; 1076*76404edcSAsim Jamshed cur_stream->cb_events |= MOS_ON_TCP_STATE_CHANGE; 1077*76404edcSAsim Jamshed 1078*76404edcSAsim Jamshed TRACE_STATE("Stream %d: TCP_ST_SYN_SENT\n", cur_stream->id); 1079*76404edcSAsim Jamshed 1080*76404edcSAsim Jamshed SQ_LOCK(&mtcp->ctx->connect_lock); 1081*76404edcSAsim Jamshed ret = StreamEnqueue(mtcp->connectq, cur_stream); 1082*76404edcSAsim Jamshed SQ_UNLOCK(&mtcp->ctx->connect_lock); 1083*76404edcSAsim Jamshed mtcp->wakeup_flag = TRUE; 1084*76404edcSAsim Jamshed if (ret < 0) { 1085*76404edcSAsim Jamshed TRACE_ERROR("Socket %d: failed to enqueue to conenct queue!\n", sockid); 1086*76404edcSAsim Jamshed SQ_LOCK(&mtcp->ctx->destroyq_lock); 1087*76404edcSAsim Jamshed StreamEnqueue(mtcp->destroyq, cur_stream); 1088*76404edcSAsim Jamshed SQ_UNLOCK(&mtcp->ctx->destroyq_lock); 1089*76404edcSAsim Jamshed errno = EAGAIN; 1090*76404edcSAsim Jamshed return -1; 1091*76404edcSAsim Jamshed } 1092*76404edcSAsim Jamshed 1093*76404edcSAsim Jamshed /* if nonblocking socket, return EINPROGRESS */ 1094*76404edcSAsim Jamshed if (socket->opts & MTCP_NONBLOCK) { 
1095*76404edcSAsim Jamshed errno = EINPROGRESS; 1096*76404edcSAsim Jamshed return -1; 1097*76404edcSAsim Jamshed 1098*76404edcSAsim Jamshed } else { 1099*76404edcSAsim Jamshed while (1) { 1100*76404edcSAsim Jamshed if (!cur_stream) { 1101*76404edcSAsim Jamshed TRACE_ERROR("STREAM DESTROYED\n"); 1102*76404edcSAsim Jamshed errno = ETIMEDOUT; 1103*76404edcSAsim Jamshed return -1; 1104*76404edcSAsim Jamshed } 1105*76404edcSAsim Jamshed if (cur_stream->state > TCP_ST_ESTABLISHED) { 1106*76404edcSAsim Jamshed TRACE_ERROR("Socket %d: weird state %s\n", 1107*76404edcSAsim Jamshed sockid, TCPStateToString(cur_stream)); 1108*76404edcSAsim Jamshed // TODO: how to handle this? 1109*76404edcSAsim Jamshed errno = ENOSYS; 1110*76404edcSAsim Jamshed return -1; 1111*76404edcSAsim Jamshed } 1112*76404edcSAsim Jamshed 1113*76404edcSAsim Jamshed if (cur_stream->state == TCP_ST_ESTABLISHED) { 1114*76404edcSAsim Jamshed break; 1115*76404edcSAsim Jamshed } 1116*76404edcSAsim Jamshed usleep(1000); 1117*76404edcSAsim Jamshed } 1118*76404edcSAsim Jamshed } 1119*76404edcSAsim Jamshed 1120*76404edcSAsim Jamshed return 0; 1121*76404edcSAsim Jamshed } 1122*76404edcSAsim Jamshed /*----------------------------------------------------------------------------*/ 1123*76404edcSAsim Jamshed static inline int 1124*76404edcSAsim Jamshed CloseStreamSocket(mctx_t mctx, int sockid) 1125*76404edcSAsim Jamshed { 1126*76404edcSAsim Jamshed mtcp_manager_t mtcp; 1127*76404edcSAsim Jamshed tcp_stream *cur_stream; 1128*76404edcSAsim Jamshed int ret; 1129*76404edcSAsim Jamshed 1130*76404edcSAsim Jamshed mtcp = GetMTCPManager(mctx); 1131*76404edcSAsim Jamshed if (!mtcp) { 1132*76404edcSAsim Jamshed errno = EACCES; 1133*76404edcSAsim Jamshed return -1; 1134*76404edcSAsim Jamshed } 1135*76404edcSAsim Jamshed 1136*76404edcSAsim Jamshed cur_stream = mtcp->smap[sockid].stream; 1137*76404edcSAsim Jamshed if (!cur_stream) { 1138*76404edcSAsim Jamshed TRACE_API("Socket %d: stream does not exist.\n", sockid); 
1139*76404edcSAsim Jamshed errno = ENOTCONN; 1140*76404edcSAsim Jamshed return -1; 1141*76404edcSAsim Jamshed } 1142*76404edcSAsim Jamshed 1143*76404edcSAsim Jamshed if (cur_stream->closed) { 1144*76404edcSAsim Jamshed TRACE_API("Socket %d (Stream %u): already closed stream\n", 1145*76404edcSAsim Jamshed sockid, cur_stream->id); 1146*76404edcSAsim Jamshed return 0; 1147*76404edcSAsim Jamshed } 1148*76404edcSAsim Jamshed cur_stream->closed = TRUE; 1149*76404edcSAsim Jamshed 1150*76404edcSAsim Jamshed TRACE_API("Stream %d: closing the stream.\n", cur_stream->id); 1151*76404edcSAsim Jamshed 1152*76404edcSAsim Jamshed /* 141029 dhkim: Check this! */ 1153*76404edcSAsim Jamshed cur_stream->socket = NULL; 1154*76404edcSAsim Jamshed 1155*76404edcSAsim Jamshed if (cur_stream->state == TCP_ST_CLOSED_RSVD) { 1156*76404edcSAsim Jamshed TRACE_API("Stream %d at TCP_ST_CLOSED_RSVD. destroying the stream.\n", 1157*76404edcSAsim Jamshed cur_stream->id); 1158*76404edcSAsim Jamshed SQ_LOCK(&mtcp->ctx->destroyq_lock); 1159*76404edcSAsim Jamshed StreamEnqueue(mtcp->destroyq, cur_stream); 1160*76404edcSAsim Jamshed mtcp->wakeup_flag = TRUE; 1161*76404edcSAsim Jamshed SQ_UNLOCK(&mtcp->ctx->destroyq_lock); 1162*76404edcSAsim Jamshed return 0; 1163*76404edcSAsim Jamshed 1164*76404edcSAsim Jamshed } else if (cur_stream->state == TCP_ST_SYN_SENT) { 1165*76404edcSAsim Jamshed #if 1 1166*76404edcSAsim Jamshed SQ_LOCK(&mtcp->ctx->destroyq_lock); 1167*76404edcSAsim Jamshed StreamEnqueue(mtcp->destroyq, cur_stream); 1168*76404edcSAsim Jamshed SQ_UNLOCK(&mtcp->ctx->destroyq_lock); 1169*76404edcSAsim Jamshed mtcp->wakeup_flag = TRUE; 1170*76404edcSAsim Jamshed #endif 1171*76404edcSAsim Jamshed return -1; 1172*76404edcSAsim Jamshed 1173*76404edcSAsim Jamshed } else if (cur_stream->state != TCP_ST_ESTABLISHED && 1174*76404edcSAsim Jamshed cur_stream->state != TCP_ST_CLOSE_WAIT) { 1175*76404edcSAsim Jamshed TRACE_API("Stream %d at state %s\n", 1176*76404edcSAsim Jamshed cur_stream->id, 
TCPStateToString(cur_stream)); 1177*76404edcSAsim Jamshed errno = EBADF; 1178*76404edcSAsim Jamshed return -1; 1179*76404edcSAsim Jamshed } 1180*76404edcSAsim Jamshed 1181*76404edcSAsim Jamshed SQ_LOCK(&mtcp->ctx->close_lock); 1182*76404edcSAsim Jamshed cur_stream->sndvar->on_closeq = TRUE; 1183*76404edcSAsim Jamshed ret = StreamEnqueue(mtcp->closeq, cur_stream); 1184*76404edcSAsim Jamshed mtcp->wakeup_flag = TRUE; 1185*76404edcSAsim Jamshed SQ_UNLOCK(&mtcp->ctx->close_lock); 1186*76404edcSAsim Jamshed 1187*76404edcSAsim Jamshed if (ret < 0) { 1188*76404edcSAsim Jamshed TRACE_ERROR("(NEVER HAPPEN) Failed to enqueue the stream to close.\n"); 1189*76404edcSAsim Jamshed errno = EAGAIN; 1190*76404edcSAsim Jamshed return -1; 1191*76404edcSAsim Jamshed } 1192*76404edcSAsim Jamshed 1193*76404edcSAsim Jamshed return 0; 1194*76404edcSAsim Jamshed } 1195*76404edcSAsim Jamshed /*----------------------------------------------------------------------------*/ 1196*76404edcSAsim Jamshed static inline int 1197*76404edcSAsim Jamshed CloseListeningSocket(mctx_t mctx, int sockid) 1198*76404edcSAsim Jamshed { 1199*76404edcSAsim Jamshed mtcp_manager_t mtcp; 1200*76404edcSAsim Jamshed struct tcp_listener *listener; 1201*76404edcSAsim Jamshed 1202*76404edcSAsim Jamshed mtcp = GetMTCPManager(mctx); 1203*76404edcSAsim Jamshed if (!mtcp) { 1204*76404edcSAsim Jamshed errno = EACCES; 1205*76404edcSAsim Jamshed return -1; 1206*76404edcSAsim Jamshed } 1207*76404edcSAsim Jamshed 1208*76404edcSAsim Jamshed listener = mtcp->smap[sockid].listener; 1209*76404edcSAsim Jamshed if (!listener) { 1210*76404edcSAsim Jamshed errno = EINVAL; 1211*76404edcSAsim Jamshed return -1; 1212*76404edcSAsim Jamshed } 1213*76404edcSAsim Jamshed 1214*76404edcSAsim Jamshed if (listener->acceptq) { 1215*76404edcSAsim Jamshed DestroyStreamQueue(listener->acceptq); 1216*76404edcSAsim Jamshed listener->acceptq = NULL; 1217*76404edcSAsim Jamshed } 1218*76404edcSAsim Jamshed 1219*76404edcSAsim Jamshed 
pthread_mutex_lock(&listener->accept_lock); 1220*76404edcSAsim Jamshed pthread_cond_signal(&listener->accept_cond); 1221*76404edcSAsim Jamshed pthread_mutex_unlock(&listener->accept_lock); 1222*76404edcSAsim Jamshed 1223*76404edcSAsim Jamshed pthread_cond_destroy(&listener->accept_cond); 1224*76404edcSAsim Jamshed pthread_mutex_destroy(&listener->accept_lock); 1225*76404edcSAsim Jamshed 1226*76404edcSAsim Jamshed free(listener); 1227*76404edcSAsim Jamshed mtcp->smap[sockid].listener = NULL; 1228*76404edcSAsim Jamshed 1229*76404edcSAsim Jamshed return 0; 1230*76404edcSAsim Jamshed } 1231*76404edcSAsim Jamshed /*----------------------------------------------------------------------------*/ 1232*76404edcSAsim Jamshed int 1233*76404edcSAsim Jamshed mtcp_close(mctx_t mctx, int sockid) 1234*76404edcSAsim Jamshed { 1235*76404edcSAsim Jamshed mtcp_manager_t mtcp; 1236*76404edcSAsim Jamshed int ret; 1237*76404edcSAsim Jamshed 1238*76404edcSAsim Jamshed mtcp = GetMTCPManager(mctx); 1239*76404edcSAsim Jamshed if (!mtcp) { 1240*76404edcSAsim Jamshed errno = EACCES; 1241*76404edcSAsim Jamshed return -1; 1242*76404edcSAsim Jamshed } 1243*76404edcSAsim Jamshed 1244*76404edcSAsim Jamshed if (sockid < 0 || sockid >= g_config.mos->max_concurrency) { 1245*76404edcSAsim Jamshed TRACE_API("Socket id %d out of range.\n", sockid); 1246*76404edcSAsim Jamshed errno = EBADF; 1247*76404edcSAsim Jamshed return -1; 1248*76404edcSAsim Jamshed } 1249*76404edcSAsim Jamshed 1250*76404edcSAsim Jamshed if (mtcp->smap[sockid].socktype == MOS_SOCK_UNUSED) { 1251*76404edcSAsim Jamshed TRACE_API("Invalid socket id: %d\n", sockid); 1252*76404edcSAsim Jamshed errno = EBADF; 1253*76404edcSAsim Jamshed return -1; 1254*76404edcSAsim Jamshed } 1255*76404edcSAsim Jamshed 1256*76404edcSAsim Jamshed TRACE_API("Socket %d: mtcp_close called.\n", sockid); 1257*76404edcSAsim Jamshed 1258*76404edcSAsim Jamshed switch (mtcp->smap[sockid].socktype) { 1259*76404edcSAsim Jamshed case MOS_SOCK_STREAM: 1260*76404edcSAsim 
Jamshed ret = CloseStreamSocket(mctx, sockid); 1261*76404edcSAsim Jamshed break; 1262*76404edcSAsim Jamshed 1263*76404edcSAsim Jamshed case MOS_SOCK_STREAM_LISTEN: 1264*76404edcSAsim Jamshed ret = CloseListeningSocket(mctx, sockid); 1265*76404edcSAsim Jamshed break; 1266*76404edcSAsim Jamshed 1267*76404edcSAsim Jamshed case MOS_SOCK_EPOLL: 1268*76404edcSAsim Jamshed ret = CloseEpollSocket(mctx, sockid); 1269*76404edcSAsim Jamshed break; 1270*76404edcSAsim Jamshed 1271*76404edcSAsim Jamshed case MOS_SOCK_PIPE: 1272*76404edcSAsim Jamshed ret = PipeClose(mctx, sockid); 1273*76404edcSAsim Jamshed break; 1274*76404edcSAsim Jamshed 1275*76404edcSAsim Jamshed default: 1276*76404edcSAsim Jamshed errno = EINVAL; 1277*76404edcSAsim Jamshed ret = -1; 1278*76404edcSAsim Jamshed break; 1279*76404edcSAsim Jamshed } 1280*76404edcSAsim Jamshed 1281*76404edcSAsim Jamshed FreeSocket(mctx, sockid, mtcp->smap[sockid].socktype); 1282*76404edcSAsim Jamshed 1283*76404edcSAsim Jamshed return ret; 1284*76404edcSAsim Jamshed } 1285*76404edcSAsim Jamshed /*----------------------------------------------------------------------------*/ 1286*76404edcSAsim Jamshed int 1287*76404edcSAsim Jamshed mtcp_abort(mctx_t mctx, int sockid) 1288*76404edcSAsim Jamshed { 1289*76404edcSAsim Jamshed mtcp_manager_t mtcp; 1290*76404edcSAsim Jamshed tcp_stream *cur_stream; 1291*76404edcSAsim Jamshed int ret; 1292*76404edcSAsim Jamshed 1293*76404edcSAsim Jamshed mtcp = GetMTCPManager(mctx); 1294*76404edcSAsim Jamshed if (!mtcp) { 1295*76404edcSAsim Jamshed errno = EACCES; 1296*76404edcSAsim Jamshed return -1; 1297*76404edcSAsim Jamshed } 1298*76404edcSAsim Jamshed 1299*76404edcSAsim Jamshed if (sockid < 0 || sockid >= g_config.mos->max_concurrency) { 1300*76404edcSAsim Jamshed TRACE_API("Socket id %d out of range.\n", sockid); 1301*76404edcSAsim Jamshed errno = EBADF; 1302*76404edcSAsim Jamshed return -1; 1303*76404edcSAsim Jamshed } 1304*76404edcSAsim Jamshed 1305*76404edcSAsim Jamshed if 
(mtcp->smap[sockid].socktype == MOS_SOCK_UNUSED) { 1306*76404edcSAsim Jamshed TRACE_API("Invalid socket id: %d\n", sockid); 1307*76404edcSAsim Jamshed errno = EBADF; 1308*76404edcSAsim Jamshed return -1; 1309*76404edcSAsim Jamshed } 1310*76404edcSAsim Jamshed 1311*76404edcSAsim Jamshed if (mtcp->smap[sockid].socktype != MOS_SOCK_STREAM) { 1312*76404edcSAsim Jamshed TRACE_API("Not an end socket. id: %d\n", sockid); 1313*76404edcSAsim Jamshed errno = ENOTSOCK; 1314*76404edcSAsim Jamshed return -1; 1315*76404edcSAsim Jamshed } 1316*76404edcSAsim Jamshed 1317*76404edcSAsim Jamshed cur_stream = mtcp->smap[sockid].stream; 1318*76404edcSAsim Jamshed if (!cur_stream) { 1319*76404edcSAsim Jamshed TRACE_API("Stream %d: does not exist.\n", sockid); 1320*76404edcSAsim Jamshed errno = ENOTCONN; 1321*76404edcSAsim Jamshed return -1; 1322*76404edcSAsim Jamshed } 1323*76404edcSAsim Jamshed 1324*76404edcSAsim Jamshed TRACE_API("Socket %d: mtcp_abort()\n", sockid); 1325*76404edcSAsim Jamshed 1326*76404edcSAsim Jamshed FreeSocket(mctx, sockid, mtcp->smap[sockid].socktype); 1327*76404edcSAsim Jamshed cur_stream->socket = NULL; 1328*76404edcSAsim Jamshed 1329*76404edcSAsim Jamshed if (cur_stream->state == TCP_ST_CLOSED_RSVD) { 1330*76404edcSAsim Jamshed TRACE_API("Stream %d: connection already reset.\n", sockid); 1331*76404edcSAsim Jamshed return ERROR; 1332*76404edcSAsim Jamshed 1333*76404edcSAsim Jamshed } else if (cur_stream->state == TCP_ST_SYN_SENT) { 1334*76404edcSAsim Jamshed /* TODO: this should notify event failure to all 1335*76404edcSAsim Jamshed previous read() or write() calls */ 1336*76404edcSAsim Jamshed cur_stream->state = TCP_ST_CLOSED_RSVD; 1337*76404edcSAsim Jamshed cur_stream->close_reason = TCP_ACTIVE_CLOSE; 1338*76404edcSAsim Jamshed cur_stream->cb_events |= MOS_ON_TCP_STATE_CHANGE; 1339*76404edcSAsim Jamshed SQ_LOCK(&mtcp->ctx->destroyq_lock); 1340*76404edcSAsim Jamshed StreamEnqueue(mtcp->destroyq, cur_stream); 1341*76404edcSAsim Jamshed 
SQ_UNLOCK(&mtcp->ctx->destroyq_lock); 1342*76404edcSAsim Jamshed mtcp->wakeup_flag = TRUE; 1343*76404edcSAsim Jamshed return 0; 1344*76404edcSAsim Jamshed 1345*76404edcSAsim Jamshed } else if (cur_stream->state == TCP_ST_CLOSING || 1346*76404edcSAsim Jamshed cur_stream->state == TCP_ST_LAST_ACK || 1347*76404edcSAsim Jamshed cur_stream->state == TCP_ST_TIME_WAIT) { 1348*76404edcSAsim Jamshed cur_stream->state = TCP_ST_CLOSED_RSVD; 1349*76404edcSAsim Jamshed cur_stream->close_reason = TCP_ACTIVE_CLOSE; 1350*76404edcSAsim Jamshed cur_stream->cb_events |= MOS_ON_TCP_STATE_CHANGE; 1351*76404edcSAsim Jamshed SQ_LOCK(&mtcp->ctx->destroyq_lock); 1352*76404edcSAsim Jamshed StreamEnqueue(mtcp->destroyq, cur_stream); 1353*76404edcSAsim Jamshed SQ_UNLOCK(&mtcp->ctx->destroyq_lock); 1354*76404edcSAsim Jamshed mtcp->wakeup_flag = TRUE; 1355*76404edcSAsim Jamshed return 0; 1356*76404edcSAsim Jamshed } 1357*76404edcSAsim Jamshed 1358*76404edcSAsim Jamshed /* the stream structure will be destroyed after sending RST */ 1359*76404edcSAsim Jamshed if (cur_stream->sndvar->on_resetq) { 1360*76404edcSAsim Jamshed TRACE_ERROR("Stream %d: calling mtcp_abort() " 1361*76404edcSAsim Jamshed "when in reset queue.\n", sockid); 1362*76404edcSAsim Jamshed errno = ECONNRESET; 1363*76404edcSAsim Jamshed return -1; 1364*76404edcSAsim Jamshed } 1365*76404edcSAsim Jamshed SQ_LOCK(&mtcp->ctx->reset_lock); 1366*76404edcSAsim Jamshed cur_stream->sndvar->on_resetq = TRUE; 1367*76404edcSAsim Jamshed ret = StreamEnqueue(mtcp->resetq, cur_stream); 1368*76404edcSAsim Jamshed SQ_UNLOCK(&mtcp->ctx->reset_lock); 1369*76404edcSAsim Jamshed mtcp->wakeup_flag = TRUE; 1370*76404edcSAsim Jamshed 1371*76404edcSAsim Jamshed if (ret < 0) { 1372*76404edcSAsim Jamshed TRACE_ERROR("(NEVER HAPPEN) Failed to enqueue the stream to close.\n"); 1373*76404edcSAsim Jamshed errno = EAGAIN; 1374*76404edcSAsim Jamshed return -1; 1375*76404edcSAsim Jamshed } 1376*76404edcSAsim Jamshed 1377*76404edcSAsim Jamshed return 0; 
1378*76404edcSAsim Jamshed } 1379*76404edcSAsim Jamshed /*----------------------------------------------------------------------------*/ 1380*76404edcSAsim Jamshed static inline int 1381*76404edcSAsim Jamshed CopyToUser(mtcp_manager_t mtcp, tcp_stream *cur_stream, char *buf, int len) 1382*76404edcSAsim Jamshed { 1383*76404edcSAsim Jamshed struct tcp_recv_vars *rcvvar = cur_stream->rcvvar; 1384*76404edcSAsim Jamshed int copylen; 1385*76404edcSAsim Jamshed #ifdef NEWRB 1386*76404edcSAsim Jamshed tcprb_t *rb = rcvvar->rcvbuf; 1387*76404edcSAsim Jamshed if ((copylen = tcprb_ppeek(rb, (uint8_t *)buf, len, rb->pile)) <= 0) { 1388*76404edcSAsim Jamshed errno = EAGAIN; 1389*76404edcSAsim Jamshed return -1; 1390*76404edcSAsim Jamshed } 1391*76404edcSAsim Jamshed tcprb_setpile(rb, rb->pile + copylen); 1392*76404edcSAsim Jamshed 1393*76404edcSAsim Jamshed rcvvar->rcv_wnd = rb->len - tcprb_cflen(rb); 1394*76404edcSAsim Jamshed //printf("rcv_wnd: %d\n", rcvvar->rcv_wnd); 1395*76404edcSAsim Jamshed #else 1396*76404edcSAsim Jamshed copylen = MIN(rcvvar->rcvbuf->merged_len, len); 1397*76404edcSAsim Jamshed if (copylen <= 0) { 1398*76404edcSAsim Jamshed errno = EAGAIN; 1399*76404edcSAsim Jamshed return -1; 1400*76404edcSAsim Jamshed } 1401*76404edcSAsim Jamshed 1402*76404edcSAsim Jamshed assert(rcvvar->rcvbuf->data); 1403*76404edcSAsim Jamshed /* Copy data to user buffer and remove it from receiving buffer */ 1404*76404edcSAsim Jamshed memcpy(buf, rcvvar->rcvbuf->head, copylen); 1405*76404edcSAsim Jamshed RBRemove(mtcp->rbm_rcv, rcvvar->rcvbuf, copylen, AT_APP, cur_stream->buffer_mgmt); 1406*76404edcSAsim Jamshed rcvvar->rcv_wnd = rcvvar->rcvbuf->size - rcvvar->rcvbuf->merged_len; 1407*76404edcSAsim Jamshed #endif 1408*76404edcSAsim Jamshed 1409*76404edcSAsim Jamshed /* Advertise newly freed receive buffer */ 1410*76404edcSAsim Jamshed if (cur_stream->need_wnd_adv) { 1411*76404edcSAsim Jamshed if (rcvvar->rcv_wnd > cur_stream->sndvar->eff_mss) { 1412*76404edcSAsim Jamshed if 
(!cur_stream->sndvar->on_ackq) { 1413*76404edcSAsim Jamshed SQ_LOCK(&mtcp->ctx->ackq_lock); 1414*76404edcSAsim Jamshed cur_stream->sndvar->on_ackq = TRUE; 1415*76404edcSAsim Jamshed StreamEnqueue(mtcp->ackq, cur_stream); /* this always success */ 1416*76404edcSAsim Jamshed SQ_UNLOCK(&mtcp->ctx->ackq_lock); 1417*76404edcSAsim Jamshed cur_stream->need_wnd_adv = FALSE; 1418*76404edcSAsim Jamshed mtcp->wakeup_flag = TRUE; 1419*76404edcSAsim Jamshed } 1420*76404edcSAsim Jamshed } 1421*76404edcSAsim Jamshed } 1422*76404edcSAsim Jamshed 1423*76404edcSAsim Jamshed return copylen; 1424*76404edcSAsim Jamshed } 1425*76404edcSAsim Jamshed /*----------------------------------------------------------------------------*/ 1426*76404edcSAsim Jamshed ssize_t 1427*76404edcSAsim Jamshed mtcp_read(mctx_t mctx, int sockid, char *buf, size_t len) 1428*76404edcSAsim Jamshed { 1429*76404edcSAsim Jamshed mtcp_manager_t mtcp; 1430*76404edcSAsim Jamshed socket_map_t socket; 1431*76404edcSAsim Jamshed tcp_stream *cur_stream; 1432*76404edcSAsim Jamshed struct tcp_recv_vars *rcvvar; 1433*76404edcSAsim Jamshed int event_remaining, merged_len; 1434*76404edcSAsim Jamshed int ret; 1435*76404edcSAsim Jamshed 1436*76404edcSAsim Jamshed mtcp = GetMTCPManager(mctx); 1437*76404edcSAsim Jamshed if (!mtcp) { 1438*76404edcSAsim Jamshed errno = EACCES; 1439*76404edcSAsim Jamshed return -1; 1440*76404edcSAsim Jamshed } 1441*76404edcSAsim Jamshed 1442*76404edcSAsim Jamshed if (sockid < 0 || sockid >= g_config.mos->max_concurrency) { 1443*76404edcSAsim Jamshed TRACE_API("Socket id %d out of range.\n", sockid); 1444*76404edcSAsim Jamshed errno = EBADF; 1445*76404edcSAsim Jamshed return -1; 1446*76404edcSAsim Jamshed } 1447*76404edcSAsim Jamshed 1448*76404edcSAsim Jamshed socket = &mtcp->smap[sockid]; 1449*76404edcSAsim Jamshed if (socket->socktype == MOS_SOCK_UNUSED) { 1450*76404edcSAsim Jamshed TRACE_API("Invalid socket id: %d\n", sockid); 1451*76404edcSAsim Jamshed errno = EBADF; 1452*76404edcSAsim Jamshed 
return -1; 1453*76404edcSAsim Jamshed } 1454*76404edcSAsim Jamshed 1455*76404edcSAsim Jamshed if (socket->socktype == MOS_SOCK_PIPE) { 1456*76404edcSAsim Jamshed return PipeRead(mctx, sockid, buf, len); 1457*76404edcSAsim Jamshed } 1458*76404edcSAsim Jamshed 1459*76404edcSAsim Jamshed if (socket->socktype != MOS_SOCK_STREAM) { 1460*76404edcSAsim Jamshed TRACE_API("Not an end socket. id: %d\n", sockid); 1461*76404edcSAsim Jamshed errno = ENOTSOCK; 1462*76404edcSAsim Jamshed return -1; 1463*76404edcSAsim Jamshed } 1464*76404edcSAsim Jamshed 1465*76404edcSAsim Jamshed /* stream should be in ESTABLISHED, FIN_WAIT_1, FIN_WAIT_2, CLOSE_WAIT */ 1466*76404edcSAsim Jamshed cur_stream = socket->stream; 1467*76404edcSAsim Jamshed if (!cur_stream || !cur_stream->rcvvar || !cur_stream->rcvvar->rcvbuf || 1468*76404edcSAsim Jamshed !(cur_stream->state >= TCP_ST_ESTABLISHED && 1469*76404edcSAsim Jamshed cur_stream->state <= TCP_ST_CLOSE_WAIT)) { 1470*76404edcSAsim Jamshed errno = ENOTCONN; 1471*76404edcSAsim Jamshed return -1; 1472*76404edcSAsim Jamshed } 1473*76404edcSAsim Jamshed 1474*76404edcSAsim Jamshed rcvvar = cur_stream->rcvvar; 1475*76404edcSAsim Jamshed 1476*76404edcSAsim Jamshed #ifdef NEWRB 1477*76404edcSAsim Jamshed merged_len = tcprb_cflen(rcvvar->rcvbuf); 1478*76404edcSAsim Jamshed #else 1479*76404edcSAsim Jamshed merged_len = rcvvar->rcvbuf->merged_len; 1480*76404edcSAsim Jamshed #endif 1481*76404edcSAsim Jamshed 1482*76404edcSAsim Jamshed /* if CLOSE_WAIT, return 0 if there is no payload */ 1483*76404edcSAsim Jamshed if (cur_stream->state == TCP_ST_CLOSE_WAIT) { 1484*76404edcSAsim Jamshed if (!rcvvar->rcvbuf) 1485*76404edcSAsim Jamshed return 0; 1486*76404edcSAsim Jamshed 1487*76404edcSAsim Jamshed if (merged_len == 0) 1488*76404edcSAsim Jamshed return 0; 1489*76404edcSAsim Jamshed } 1490*76404edcSAsim Jamshed 1491*76404edcSAsim Jamshed /* return EAGAIN if no receive buffer */ 1492*76404edcSAsim Jamshed if (socket->opts & MTCP_NONBLOCK) { 1493*76404edcSAsim 
Jamshed if (!rcvvar->rcvbuf || merged_len == 0) { 1494*76404edcSAsim Jamshed errno = EAGAIN; 1495*76404edcSAsim Jamshed return -1; 1496*76404edcSAsim Jamshed } 1497*76404edcSAsim Jamshed } 1498*76404edcSAsim Jamshed 1499*76404edcSAsim Jamshed SBUF_LOCK(&rcvvar->read_lock); 1500*76404edcSAsim Jamshed 1501*76404edcSAsim Jamshed ret = CopyToUser(mtcp, cur_stream, buf, len); 1502*76404edcSAsim Jamshed 1503*76404edcSAsim Jamshed #ifdef NEWRB 1504*76404edcSAsim Jamshed merged_len = tcprb_cflen(rcvvar->rcvbuf); 1505*76404edcSAsim Jamshed #else 1506*76404edcSAsim Jamshed merged_len = rcvvar->rcvbuf->merged_len; 1507*76404edcSAsim Jamshed #endif 1508*76404edcSAsim Jamshed 1509*76404edcSAsim Jamshed event_remaining = FALSE; 1510*76404edcSAsim Jamshed /* if there are remaining payload, generate EPOLLIN */ 1511*76404edcSAsim Jamshed /* (may due to insufficient user buffer) */ 1512*76404edcSAsim Jamshed if (socket->epoll & MOS_EPOLLIN) { 1513*76404edcSAsim Jamshed if (!(socket->epoll & MOS_EPOLLET) && merged_len > 0) { 1514*76404edcSAsim Jamshed event_remaining = TRUE; 1515*76404edcSAsim Jamshed } 1516*76404edcSAsim Jamshed } 1517*76404edcSAsim Jamshed /* if waiting for close, notify it if no remaining data */ 1518*76404edcSAsim Jamshed if (cur_stream->state == TCP_ST_CLOSE_WAIT && 1519*76404edcSAsim Jamshed merged_len == 0 && ret > 0) { 1520*76404edcSAsim Jamshed event_remaining = TRUE; 1521*76404edcSAsim Jamshed } 1522*76404edcSAsim Jamshed 1523*76404edcSAsim Jamshed SBUF_UNLOCK(&rcvvar->read_lock); 1524*76404edcSAsim Jamshed 1525*76404edcSAsim Jamshed if (event_remaining) { 1526*76404edcSAsim Jamshed if (socket->epoll) { 1527*76404edcSAsim Jamshed AddEpollEvent(mtcp->ep, 1528*76404edcSAsim Jamshed USR_SHADOW_EVENT_QUEUE, socket, MOS_EPOLLIN); 1529*76404edcSAsim Jamshed } 1530*76404edcSAsim Jamshed } 1531*76404edcSAsim Jamshed 1532*76404edcSAsim Jamshed TRACE_API("Stream %d: mtcp_read() returning %d\n", cur_stream->id, ret); 1533*76404edcSAsim Jamshed return ret; 
1534*76404edcSAsim Jamshed } 1535*76404edcSAsim Jamshed /*----------------------------------------------------------------------------*/ 1536*76404edcSAsim Jamshed ssize_t 1537*76404edcSAsim Jamshed mtcp_readv(mctx_t mctx, int sockid, struct iovec *iov, int numIOV) 1538*76404edcSAsim Jamshed { 1539*76404edcSAsim Jamshed mtcp_manager_t mtcp; 1540*76404edcSAsim Jamshed socket_map_t socket; 1541*76404edcSAsim Jamshed tcp_stream *cur_stream; 1542*76404edcSAsim Jamshed struct tcp_recv_vars *rcvvar; 1543*76404edcSAsim Jamshed int ret, bytes_read, i; 1544*76404edcSAsim Jamshed int event_remaining, merged_len; 1545*76404edcSAsim Jamshed 1546*76404edcSAsim Jamshed mtcp = GetMTCPManager(mctx); 1547*76404edcSAsim Jamshed if (!mtcp) { 1548*76404edcSAsim Jamshed errno = EACCES; 1549*76404edcSAsim Jamshed return -1; 1550*76404edcSAsim Jamshed } 1551*76404edcSAsim Jamshed 1552*76404edcSAsim Jamshed if (sockid < 0 || sockid >= g_config.mos->max_concurrency) { 1553*76404edcSAsim Jamshed TRACE_API("Socket id %d out of range.\n", sockid); 1554*76404edcSAsim Jamshed errno = EBADF; 1555*76404edcSAsim Jamshed return -1; 1556*76404edcSAsim Jamshed } 1557*76404edcSAsim Jamshed 1558*76404edcSAsim Jamshed socket = &mtcp->smap[sockid]; 1559*76404edcSAsim Jamshed if (socket->socktype == MOS_SOCK_UNUSED) { 1560*76404edcSAsim Jamshed TRACE_API("Invalid socket id: %d\n", sockid); 1561*76404edcSAsim Jamshed errno = EBADF; 1562*76404edcSAsim Jamshed return -1; 1563*76404edcSAsim Jamshed } 1564*76404edcSAsim Jamshed 1565*76404edcSAsim Jamshed if (socket->socktype != MOS_SOCK_STREAM) { 1566*76404edcSAsim Jamshed TRACE_API("Not an end socket. 
id: %d\n", sockid); 1567*76404edcSAsim Jamshed errno = ENOTSOCK; 1568*76404edcSAsim Jamshed return -1; 1569*76404edcSAsim Jamshed } 1570*76404edcSAsim Jamshed 1571*76404edcSAsim Jamshed /* stream should be in ESTABLISHED, FIN_WAIT_1, FIN_WAIT_2, CLOSE_WAIT */ 1572*76404edcSAsim Jamshed cur_stream = socket->stream; 1573*76404edcSAsim Jamshed if (!cur_stream || 1574*76404edcSAsim Jamshed !(cur_stream->state >= TCP_ST_ESTABLISHED && 1575*76404edcSAsim Jamshed cur_stream->state <= TCP_ST_CLOSE_WAIT)) { 1576*76404edcSAsim Jamshed errno = ENOTCONN; 1577*76404edcSAsim Jamshed return -1; 1578*76404edcSAsim Jamshed } 1579*76404edcSAsim Jamshed 1580*76404edcSAsim Jamshed rcvvar = cur_stream->rcvvar; 1581*76404edcSAsim Jamshed 1582*76404edcSAsim Jamshed #ifdef NEWRB 1583*76404edcSAsim Jamshed merged_len = tcprb_cflen(rcvvar->rcvbuf); 1584*76404edcSAsim Jamshed #else 1585*76404edcSAsim Jamshed merged_len = rcvvar->rcvbuf->merged_len; 1586*76404edcSAsim Jamshed #endif 1587*76404edcSAsim Jamshed 1588*76404edcSAsim Jamshed /* if CLOSE_WAIT, return 0 if there is no payload */ 1589*76404edcSAsim Jamshed if (cur_stream->state == TCP_ST_CLOSE_WAIT) { 1590*76404edcSAsim Jamshed if (!rcvvar->rcvbuf) 1591*76404edcSAsim Jamshed return 0; 1592*76404edcSAsim Jamshed 1593*76404edcSAsim Jamshed if (merged_len == 0) 1594*76404edcSAsim Jamshed return 0; 1595*76404edcSAsim Jamshed } 1596*76404edcSAsim Jamshed 1597*76404edcSAsim Jamshed /* return EAGAIN if no receive buffer */ 1598*76404edcSAsim Jamshed if (socket->opts & MTCP_NONBLOCK) { 1599*76404edcSAsim Jamshed if (!rcvvar->rcvbuf || merged_len == 0) { 1600*76404edcSAsim Jamshed errno = EAGAIN; 1601*76404edcSAsim Jamshed return -1; 1602*76404edcSAsim Jamshed } 1603*76404edcSAsim Jamshed } 1604*76404edcSAsim Jamshed 1605*76404edcSAsim Jamshed SBUF_LOCK(&rcvvar->read_lock); 1606*76404edcSAsim Jamshed 1607*76404edcSAsim Jamshed /* read and store the contents to the vectored buffers */ 1608*76404edcSAsim Jamshed bytes_read = 0; 
1609*76404edcSAsim Jamshed for (i = 0; i < numIOV; i++) { 1610*76404edcSAsim Jamshed if (iov[i].iov_len <= 0) 1611*76404edcSAsim Jamshed continue; 1612*76404edcSAsim Jamshed 1613*76404edcSAsim Jamshed ret = CopyToUser(mtcp, cur_stream, iov[i].iov_base, iov[i].iov_len); 1614*76404edcSAsim Jamshed if (ret <= 0) 1615*76404edcSAsim Jamshed break; 1616*76404edcSAsim Jamshed 1617*76404edcSAsim Jamshed bytes_read += ret; 1618*76404edcSAsim Jamshed 1619*76404edcSAsim Jamshed if (ret < iov[i].iov_len) 1620*76404edcSAsim Jamshed break; 1621*76404edcSAsim Jamshed } 1622*76404edcSAsim Jamshed 1623*76404edcSAsim Jamshed #ifdef NEWRB 1624*76404edcSAsim Jamshed merged_len = tcprb_cflen(rcvvar->rcvbuf); 1625*76404edcSAsim Jamshed #else 1626*76404edcSAsim Jamshed merged_len = rcvvar->rcvbuf->merged_len; 1627*76404edcSAsim Jamshed #endif 1628*76404edcSAsim Jamshed 1629*76404edcSAsim Jamshed event_remaining = FALSE; 1630*76404edcSAsim Jamshed /* if there are remaining payload, generate read event */ 1631*76404edcSAsim Jamshed /* (may due to insufficient user buffer) */ 1632*76404edcSAsim Jamshed if (socket->epoll & MOS_EPOLLIN) { 1633*76404edcSAsim Jamshed if (!(socket->epoll & MOS_EPOLLET) && merged_len > 0) { 1634*76404edcSAsim Jamshed event_remaining = TRUE; 1635*76404edcSAsim Jamshed } 1636*76404edcSAsim Jamshed } 1637*76404edcSAsim Jamshed /* if waiting for close, notify it if no remaining data */ 1638*76404edcSAsim Jamshed if (cur_stream->state == TCP_ST_CLOSE_WAIT && 1639*76404edcSAsim Jamshed merged_len == 0 && bytes_read > 0) { 1640*76404edcSAsim Jamshed event_remaining = TRUE; 1641*76404edcSAsim Jamshed } 1642*76404edcSAsim Jamshed 1643*76404edcSAsim Jamshed SBUF_UNLOCK(&rcvvar->read_lock); 1644*76404edcSAsim Jamshed 1645*76404edcSAsim Jamshed if(event_remaining) { 1646*76404edcSAsim Jamshed if (socket->epoll & MOS_EPOLLIN && !(socket->epoll & MOS_EPOLLET)) { 1647*76404edcSAsim Jamshed AddEpollEvent(mtcp->ep, 1648*76404edcSAsim Jamshed USR_SHADOW_EVENT_QUEUE, socket, 
MOS_EPOLLIN); 1649*76404edcSAsim Jamshed } 1650*76404edcSAsim Jamshed } 1651*76404edcSAsim Jamshed 1652*76404edcSAsim Jamshed TRACE_API("Stream %d: mtcp_readv() returning %d\n", 1653*76404edcSAsim Jamshed cur_stream->id, bytes_read); 1654*76404edcSAsim Jamshed return bytes_read; 1655*76404edcSAsim Jamshed } 1656*76404edcSAsim Jamshed /*----------------------------------------------------------------------------*/ 1657*76404edcSAsim Jamshed static inline int 1658*76404edcSAsim Jamshed CopyFromUser(mtcp_manager_t mtcp, tcp_stream *cur_stream, char *buf, int len) 1659*76404edcSAsim Jamshed { 1660*76404edcSAsim Jamshed struct tcp_send_vars *sndvar = cur_stream->sndvar; 1661*76404edcSAsim Jamshed int sndlen; 1662*76404edcSAsim Jamshed int ret; 1663*76404edcSAsim Jamshed 1664*76404edcSAsim Jamshed sndlen = MIN((int)sndvar->snd_wnd, len); 1665*76404edcSAsim Jamshed if (sndlen <= 0) { 1666*76404edcSAsim Jamshed errno = EAGAIN; 1667*76404edcSAsim Jamshed return -1; 1668*76404edcSAsim Jamshed } 1669*76404edcSAsim Jamshed 1670*76404edcSAsim Jamshed /* allocate send buffer if not exist */ 1671*76404edcSAsim Jamshed if (!sndvar->sndbuf) { 1672*76404edcSAsim Jamshed sndvar->sndbuf = SBInit(mtcp->rbm_snd, sndvar->iss + 1); 1673*76404edcSAsim Jamshed if (!sndvar->sndbuf) { 1674*76404edcSAsim Jamshed cur_stream->close_reason = TCP_NO_MEM; 1675*76404edcSAsim Jamshed /* notification may not required due to -1 return */ 1676*76404edcSAsim Jamshed errno = ENOMEM; 1677*76404edcSAsim Jamshed return -1; 1678*76404edcSAsim Jamshed } 1679*76404edcSAsim Jamshed } 1680*76404edcSAsim Jamshed 1681*76404edcSAsim Jamshed ret = SBPut(mtcp->rbm_snd, sndvar->sndbuf, buf, sndlen); 1682*76404edcSAsim Jamshed assert(ret == sndlen); 1683*76404edcSAsim Jamshed sndvar->snd_wnd = sndvar->sndbuf->size - sndvar->sndbuf->len; 1684*76404edcSAsim Jamshed if (ret <= 0) { 1685*76404edcSAsim Jamshed TRACE_ERROR("SBPut failed. 
reason: %d (sndlen: %u, len: %u\n", 1686*76404edcSAsim Jamshed ret, sndlen, sndvar->sndbuf->len); 1687*76404edcSAsim Jamshed errno = EAGAIN; 1688*76404edcSAsim Jamshed return -1; 1689*76404edcSAsim Jamshed } 1690*76404edcSAsim Jamshed 1691*76404edcSAsim Jamshed if (sndvar->snd_wnd <= 0) { 1692*76404edcSAsim Jamshed TRACE_SNDBUF("%u Sending buffer became full!! snd_wnd: %u\n", 1693*76404edcSAsim Jamshed cur_stream->id, sndvar->snd_wnd); 1694*76404edcSAsim Jamshed } 1695*76404edcSAsim Jamshed 1696*76404edcSAsim Jamshed return ret; 1697*76404edcSAsim Jamshed } 1698*76404edcSAsim Jamshed /*----------------------------------------------------------------------------*/ 1699*76404edcSAsim Jamshed ssize_t 1700*76404edcSAsim Jamshed mtcp_write(mctx_t mctx, int sockid, char *buf, size_t len) 1701*76404edcSAsim Jamshed { 1702*76404edcSAsim Jamshed mtcp_manager_t mtcp; 1703*76404edcSAsim Jamshed socket_map_t socket; 1704*76404edcSAsim Jamshed tcp_stream *cur_stream; 1705*76404edcSAsim Jamshed struct tcp_send_vars *sndvar; 1706*76404edcSAsim Jamshed int ret; 1707*76404edcSAsim Jamshed 1708*76404edcSAsim Jamshed mtcp = GetMTCPManager(mctx); 1709*76404edcSAsim Jamshed if (!mtcp) { 1710*76404edcSAsim Jamshed errno = EACCES; 1711*76404edcSAsim Jamshed return -1; 1712*76404edcSAsim Jamshed } 1713*76404edcSAsim Jamshed 1714*76404edcSAsim Jamshed if (sockid < 0 || sockid >= g_config.mos->max_concurrency) { 1715*76404edcSAsim Jamshed TRACE_API("Socket id %d out of range.\n", sockid); 1716*76404edcSAsim Jamshed errno = EBADF; 1717*76404edcSAsim Jamshed return -1; 1718*76404edcSAsim Jamshed } 1719*76404edcSAsim Jamshed 1720*76404edcSAsim Jamshed socket = &mtcp->smap[sockid]; 1721*76404edcSAsim Jamshed if (socket->socktype == MOS_SOCK_UNUSED) { 1722*76404edcSAsim Jamshed TRACE_API("Invalid socket id: %d\n", sockid); 1723*76404edcSAsim Jamshed errno = EBADF; 1724*76404edcSAsim Jamshed return -1; 1725*76404edcSAsim Jamshed } 1726*76404edcSAsim Jamshed 1727*76404edcSAsim Jamshed if 
(socket->socktype == MOS_SOCK_PIPE) { 1728*76404edcSAsim Jamshed return PipeWrite(mctx, sockid, buf, len); 1729*76404edcSAsim Jamshed } 1730*76404edcSAsim Jamshed 1731*76404edcSAsim Jamshed if (socket->socktype != MOS_SOCK_STREAM) { 1732*76404edcSAsim Jamshed TRACE_API("Not an end socket. id: %d\n", sockid); 1733*76404edcSAsim Jamshed errno = ENOTSOCK; 1734*76404edcSAsim Jamshed return -1; 1735*76404edcSAsim Jamshed } 1736*76404edcSAsim Jamshed 1737*76404edcSAsim Jamshed cur_stream = socket->stream; 1738*76404edcSAsim Jamshed if (!cur_stream || 1739*76404edcSAsim Jamshed !(cur_stream->state == TCP_ST_ESTABLISHED || 1740*76404edcSAsim Jamshed cur_stream->state == TCP_ST_CLOSE_WAIT)) { 1741*76404edcSAsim Jamshed errno = ENOTCONN; 1742*76404edcSAsim Jamshed return -1; 1743*76404edcSAsim Jamshed } 1744*76404edcSAsim Jamshed 1745*76404edcSAsim Jamshed if (len <= 0) { 1746*76404edcSAsim Jamshed if (socket->opts & MTCP_NONBLOCK) { 1747*76404edcSAsim Jamshed errno = EAGAIN; 1748*76404edcSAsim Jamshed return -1; 1749*76404edcSAsim Jamshed } else { 1750*76404edcSAsim Jamshed return 0; 1751*76404edcSAsim Jamshed } 1752*76404edcSAsim Jamshed } 1753*76404edcSAsim Jamshed 1754*76404edcSAsim Jamshed sndvar = cur_stream->sndvar; 1755*76404edcSAsim Jamshed 1756*76404edcSAsim Jamshed SBUF_LOCK(&sndvar->write_lock); 1757*76404edcSAsim Jamshed ret = CopyFromUser(mtcp, cur_stream, buf, len); 1758*76404edcSAsim Jamshed 1759*76404edcSAsim Jamshed SBUF_UNLOCK(&sndvar->write_lock); 1760*76404edcSAsim Jamshed 1761*76404edcSAsim Jamshed if (ret > 0 && !(sndvar->on_sendq || sndvar->on_send_list)) { 1762*76404edcSAsim Jamshed SQ_LOCK(&mtcp->ctx->sendq_lock); 1763*76404edcSAsim Jamshed sndvar->on_sendq = TRUE; 1764*76404edcSAsim Jamshed StreamEnqueue(mtcp->sendq, cur_stream); /* this always success */ 1765*76404edcSAsim Jamshed SQ_UNLOCK(&mtcp->ctx->sendq_lock); 1766*76404edcSAsim Jamshed mtcp->wakeup_flag = TRUE; 1767*76404edcSAsim Jamshed } 1768*76404edcSAsim Jamshed 1769*76404edcSAsim 
Jamshed if (ret == 0 && (socket->opts & MTCP_NONBLOCK)) { 1770*76404edcSAsim Jamshed ret = -1; 1771*76404edcSAsim Jamshed errno = EAGAIN; 1772*76404edcSAsim Jamshed } 1773*76404edcSAsim Jamshed 1774*76404edcSAsim Jamshed /* if there are remaining sending buffer, generate write event */ 1775*76404edcSAsim Jamshed if (sndvar->snd_wnd > 0) { 1776*76404edcSAsim Jamshed if (socket->epoll & MOS_EPOLLOUT && !(socket->epoll & MOS_EPOLLET)) { 1777*76404edcSAsim Jamshed AddEpollEvent(mtcp->ep, 1778*76404edcSAsim Jamshed USR_SHADOW_EVENT_QUEUE, socket, MOS_EPOLLOUT); 1779*76404edcSAsim Jamshed } 1780*76404edcSAsim Jamshed } 1781*76404edcSAsim Jamshed 1782*76404edcSAsim Jamshed TRACE_API("Stream %d: mtcp_write() returning %d\n", cur_stream->id, ret); 1783*76404edcSAsim Jamshed return ret; 1784*76404edcSAsim Jamshed } 1785*76404edcSAsim Jamshed /*----------------------------------------------------------------------------*/ 1786*76404edcSAsim Jamshed ssize_t 1787*76404edcSAsim Jamshed mtcp_writev(mctx_t mctx, int sockid, struct iovec *iov, int numIOV) 1788*76404edcSAsim Jamshed { 1789*76404edcSAsim Jamshed mtcp_manager_t mtcp; 1790*76404edcSAsim Jamshed socket_map_t socket; 1791*76404edcSAsim Jamshed tcp_stream *cur_stream; 1792*76404edcSAsim Jamshed struct tcp_send_vars *sndvar; 1793*76404edcSAsim Jamshed int ret, to_write, i; 1794*76404edcSAsim Jamshed 1795*76404edcSAsim Jamshed mtcp = GetMTCPManager(mctx); 1796*76404edcSAsim Jamshed if (!mtcp) { 1797*76404edcSAsim Jamshed errno = EACCES; 1798*76404edcSAsim Jamshed return -1; 1799*76404edcSAsim Jamshed } 1800*76404edcSAsim Jamshed 1801*76404edcSAsim Jamshed if (sockid < 0 || sockid >= g_config.mos->max_concurrency) { 1802*76404edcSAsim Jamshed TRACE_API("Socket id %d out of range.\n", sockid); 1803*76404edcSAsim Jamshed errno = EBADF; 1804*76404edcSAsim Jamshed return -1; 1805*76404edcSAsim Jamshed } 1806*76404edcSAsim Jamshed 1807*76404edcSAsim Jamshed socket = &mtcp->smap[sockid]; 1808*76404edcSAsim Jamshed if 
(socket->socktype == MOS_SOCK_UNUSED) { 1809*76404edcSAsim Jamshed TRACE_API("Invalid socket id: %d\n", sockid); 1810*76404edcSAsim Jamshed errno = EBADF; 1811*76404edcSAsim Jamshed return -1; 1812*76404edcSAsim Jamshed } 1813*76404edcSAsim Jamshed 1814*76404edcSAsim Jamshed if (socket->socktype != MOS_SOCK_STREAM) { 1815*76404edcSAsim Jamshed TRACE_API("Not an end socket. id: %d\n", sockid); 1816*76404edcSAsim Jamshed errno = ENOTSOCK; 1817*76404edcSAsim Jamshed return -1; 1818*76404edcSAsim Jamshed } 1819*76404edcSAsim Jamshed 1820*76404edcSAsim Jamshed cur_stream = socket->stream; 1821*76404edcSAsim Jamshed if (!cur_stream || 1822*76404edcSAsim Jamshed !(cur_stream->state == TCP_ST_ESTABLISHED || 1823*76404edcSAsim Jamshed cur_stream->state == TCP_ST_CLOSE_WAIT)) { 1824*76404edcSAsim Jamshed errno = ENOTCONN; 1825*76404edcSAsim Jamshed return -1; 1826*76404edcSAsim Jamshed } 1827*76404edcSAsim Jamshed 1828*76404edcSAsim Jamshed sndvar = cur_stream->sndvar; 1829*76404edcSAsim Jamshed SBUF_LOCK(&sndvar->write_lock); 1830*76404edcSAsim Jamshed 1831*76404edcSAsim Jamshed /* write from the vectored buffers */ 1832*76404edcSAsim Jamshed to_write = 0; 1833*76404edcSAsim Jamshed for (i = 0; i < numIOV; i++) { 1834*76404edcSAsim Jamshed if (iov[i].iov_len <= 0) 1835*76404edcSAsim Jamshed continue; 1836*76404edcSAsim Jamshed 1837*76404edcSAsim Jamshed ret = CopyFromUser(mtcp, cur_stream, iov[i].iov_base, iov[i].iov_len); 1838*76404edcSAsim Jamshed if (ret <= 0) 1839*76404edcSAsim Jamshed break; 1840*76404edcSAsim Jamshed 1841*76404edcSAsim Jamshed to_write += ret; 1842*76404edcSAsim Jamshed 1843*76404edcSAsim Jamshed if (ret < iov[i].iov_len) 1844*76404edcSAsim Jamshed break; 1845*76404edcSAsim Jamshed } 1846*76404edcSAsim Jamshed SBUF_UNLOCK(&sndvar->write_lock); 1847*76404edcSAsim Jamshed 1848*76404edcSAsim Jamshed if (to_write > 0 && !(sndvar->on_sendq || sndvar->on_send_list)) { 1849*76404edcSAsim Jamshed SQ_LOCK(&mtcp->ctx->sendq_lock); 1850*76404edcSAsim Jamshed 
sndvar->on_sendq = TRUE; 1851*76404edcSAsim Jamshed StreamEnqueue(mtcp->sendq, cur_stream); /* this always success */ 1852*76404edcSAsim Jamshed SQ_UNLOCK(&mtcp->ctx->sendq_lock); 1853*76404edcSAsim Jamshed mtcp->wakeup_flag = TRUE; 1854*76404edcSAsim Jamshed } 1855*76404edcSAsim Jamshed 1856*76404edcSAsim Jamshed if (to_write == 0 && (socket->opts & MTCP_NONBLOCK)) { 1857*76404edcSAsim Jamshed to_write = -1; 1858*76404edcSAsim Jamshed errno = EAGAIN; 1859*76404edcSAsim Jamshed } 1860*76404edcSAsim Jamshed 1861*76404edcSAsim Jamshed /* if there are remaining sending buffer, generate write event */ 1862*76404edcSAsim Jamshed if (sndvar->snd_wnd > 0) { 1863*76404edcSAsim Jamshed if (socket->epoll & MOS_EPOLLOUT && !(socket->epoll & MOS_EPOLLET)) { 1864*76404edcSAsim Jamshed AddEpollEvent(mtcp->ep, 1865*76404edcSAsim Jamshed USR_SHADOW_EVENT_QUEUE, socket, MOS_EPOLLOUT); 1866*76404edcSAsim Jamshed } 1867*76404edcSAsim Jamshed } 1868*76404edcSAsim Jamshed 1869*76404edcSAsim Jamshed TRACE_API("Stream %d: mtcp_writev() returning %d\n", 1870*76404edcSAsim Jamshed cur_stream->id, to_write); 1871*76404edcSAsim Jamshed return to_write; 1872*76404edcSAsim Jamshed } 1873*76404edcSAsim Jamshed /*----------------------------------------------------------------------------*/ 1874*76404edcSAsim Jamshed uint32_t 1875*76404edcSAsim Jamshed mtcp_get_connection_cnt(mctx_t mctx) 1876*76404edcSAsim Jamshed { 1877*76404edcSAsim Jamshed mtcp_manager_t mtcp; 1878*76404edcSAsim Jamshed mtcp = GetMTCPManager(mctx); 1879*76404edcSAsim Jamshed if (!mtcp) { 1880*76404edcSAsim Jamshed errno = EACCES; 1881*76404edcSAsim Jamshed return -1; 1882*76404edcSAsim Jamshed } 1883*76404edcSAsim Jamshed 1884*76404edcSAsim Jamshed if (mtcp->num_msp > 0) 1885*76404edcSAsim Jamshed return mtcp->flow_cnt / 2; 1886*76404edcSAsim Jamshed else 1887*76404edcSAsim Jamshed return mtcp->flow_cnt; 1888*76404edcSAsim Jamshed } 1889*76404edcSAsim Jamshed 
/*----------------------------------------------------------------------------*/ 1890