1 #include <sys/queue.h>
2 #include <sys/ioctl.h>
3 #include <limits.h>
4 #include <unistd.h>
5 #include <assert.h>
6 #include <string.h>
7
8 #ifdef DARWIN
9 #include <netinet/tcp.h>
10 #include <netinet/ip.h>
11 #include <netinet/if_ether.h>
12 #endif
13
14 #include "mtcp.h"
15 #include "mtcp_api.h"
16 #include "tcp_in.h"
17 #include "tcp_stream.h"
18 #include "tcp_out.h"
19 #include "ip_out.h"
20 #include "eventpoll.h"
21 #include "pipe.h"
22 #include "fhash.h"
23 #include "addr_pool.h"
24 #include "util.h"
25 #include "config.h"
26 #include "debug.h"
27 #include "eventpoll.h"
28 #include "mos_api.h"
29 #include "tcp_rb.h"
30
/* Generic max/min helpers. Each macro evaluates one of its arguments twice,
 * so never pass expressions with side effects (e.g. MAX(i++, j)). */
#define MAX(a, b) ((a)>(b)?(a):(b))
#define MIN(a, b) ((a)<(b)?(a):(b))
33
34 /*----------------------------------------------------------------------------*/
/** Stop monitoring the socket (forward declaration; the definition is not
 * part of this section of the file).
 * @param [in] mctx: mtcp context
 * @param [in] sock: monitoring stream socket id
 * @param [in] side: side of monitoring (client side, server side or both)
 *
 * This function is now DEPRECATED and is only used within mOS core.
 * In this file it backs the MOS_STOP_MON option of mtcp_setsockopt().
 */
int
mtcp_cb_stop(mctx_t mctx, int sock, int side);
44 /*----------------------------------------------------------------------------*/
45 /** Reset the connection (send RST packets to both sides)
46 * (We need to decide the API for this.)
47 */
48 //int
49 //mtcp_cb_reset(mctx_t mctx, int sock, int side);
50 /*----------------------------------------------------------------------------*/
51 inline mtcp_manager_t
GetMTCPManager(mctx_t mctx)52 GetMTCPManager(mctx_t mctx)
53 {
54 if (!mctx) {
55 errno = EACCES;
56 return NULL;
57 }
58
59 if (mctx->cpu < 0 || mctx->cpu >= num_cpus) {
60 errno = EINVAL;
61 return NULL;
62 }
63
64 if (!g_mtcp[mctx->cpu] || g_mtcp[mctx->cpu]->ctx->done || g_mtcp[mctx->cpu]->ctx->exit) {
65 errno = EPERM;
66 return NULL;
67 }
68
69 return g_mtcp[mctx->cpu];
70 }
71 /*----------------------------------------------------------------------------*/
72 static inline int
GetSocketError(socket_map_t socket,void * optval,socklen_t * optlen)73 GetSocketError(socket_map_t socket, void *optval, socklen_t *optlen)
74 {
75 tcp_stream *cur_stream;
76
77 if (!socket->stream) {
78 errno = EBADF;
79 return -1;
80 }
81
82 cur_stream = socket->stream;
83 if (cur_stream->state == TCP_ST_CLOSED_RSVD) {
84 if (cur_stream->close_reason == TCP_TIMEDOUT ||
85 cur_stream->close_reason == TCP_CONN_FAIL ||
86 cur_stream->close_reason == TCP_CONN_LOST) {
87 *(int *)optval = ETIMEDOUT;
88 *optlen = sizeof(int);
89
90 return 0;
91 }
92 }
93
94 if (cur_stream->state == TCP_ST_CLOSE_WAIT ||
95 cur_stream->state == TCP_ST_CLOSED_RSVD) {
96 if (cur_stream->close_reason == TCP_RESET) {
97 *(int *)optval = ECONNRESET;
98 *optlen = sizeof(int);
99
100 return 0;
101 }
102 }
103
104 if (cur_stream->state == TCP_ST_SYN_SENT &&
105 errno == EINPROGRESS) {
106 *(int *)optval = errno;
107 *optlen = sizeof(int);
108
109 return -1;
110 }
111
112 /*
113 * `base case`: If socket sees no so_error, then
114 * this also means close_reason will always be
115 * TCP_NOT_CLOSED.
116 */
117 if (cur_stream->close_reason == TCP_NOT_CLOSED) {
118 *(int *)optval = 0;
119 *optlen = sizeof(int);
120
121 return 0;
122 }
123
124 errno = ENOSYS;
125 return -1;
126 }
127 /*----------------------------------------------------------------------------*/
128 int
mtcp_getsockname(mctx_t mctx,int sockid,struct sockaddr * addr,socklen_t * addrlen)129 mtcp_getsockname(mctx_t mctx, int sockid, struct sockaddr *addr,
130 socklen_t *addrlen)
131 {
132 mtcp_manager_t mtcp;
133 socket_map_t socket;
134
135 mtcp = GetMTCPManager(mctx);
136 if (!mtcp) {
137 return -1;
138 }
139
140 if (sockid < 0 || sockid >= g_config.mos->max_concurrency) {
141 TRACE_API("Socket id %d out of range.\n", sockid);
142 errno = EBADF;
143 return -1;
144 }
145
146 socket = &mtcp->smap[sockid];
147 if (socket->socktype == MOS_SOCK_UNUSED) {
148 TRACE_API("Invalid socket id: %d\n", sockid);
149 errno = EBADF;
150 return -1;
151 }
152
153 if (*addrlen <= 0) {
154 TRACE_API("Invalid addrlen: %d\n", *addrlen);
155 errno = EINVAL;
156 return -1;
157 }
158
159 if (socket->socktype != MOS_SOCK_STREAM_LISTEN &&
160 socket->socktype != MOS_SOCK_STREAM) {
161 TRACE_API("Invalid socket id: %d\n", sockid);
162 errno = ENOTSOCK;
163 return -1;
164 }
165
166 *(struct sockaddr_in *)addr = socket->saddr;
167 *addrlen = sizeof(socket->saddr);
168
169 return 0;
170 }
171 /*----------------------------------------------------------------------------*/
172 int
mtcp_getsockopt(mctx_t mctx,int sockid,int level,int optname,void * optval,socklen_t * optlen)173 mtcp_getsockopt(mctx_t mctx, int sockid, int level,
174 int optname, void *optval, socklen_t *optlen)
175 {
176 mtcp_manager_t mtcp;
177 socket_map_t socket;
178
179 mtcp = GetMTCPManager(mctx);
180 if (!mtcp) {
181 errno = EACCES;
182 return -1;
183 }
184
185 if (sockid < 0 || sockid >= g_config.mos->max_concurrency) {
186 TRACE_API("Socket id %d out of range.\n", sockid);
187 errno = EBADF;
188 return -1;
189 }
190
191 switch (level) {
192 case SOL_SOCKET:
193 socket = &mtcp->smap[sockid];
194 if (socket->socktype == MOS_SOCK_UNUSED) {
195 TRACE_API("Invalid socket id: %d\n", sockid);
196 errno = EBADF;
197 return -1;
198 }
199
200 if (socket->socktype != MOS_SOCK_STREAM_LISTEN &&
201 socket->socktype != MOS_SOCK_STREAM) {
202 TRACE_API("Invalid socket id: %d\n", sockid);
203 errno = ENOTSOCK;
204 return -1;
205 }
206
207 if (optname == SO_ERROR) {
208 if (socket->socktype == MOS_SOCK_STREAM) {
209 return GetSocketError(socket, optval, optlen);
210 }
211 }
212 break;
213 case SOL_MONSOCKET:
214 /* check if the calling thread is in MOS context */
215 if (mtcp->ctx->thread != pthread_self()) {
216 errno = EPERM;
217 return -1;
218 }
219 /*
220 * All options will only work for active
221 * monitor stream sockets
222 */
223 socket = &mtcp->msmap[sockid];
224 if (socket->socktype != MOS_SOCK_MONITOR_STREAM_ACTIVE) {
225 TRACE_API("Invalid socket id: %d\n", sockid);
226 errno = ENOTSOCK;
227 return -1;
228 }
229
230 switch (optname) {
231 case MOS_FRAGINFO_CLIBUF:
232 return GetFragInfo(socket, MOS_SIDE_CLI, optval, optlen);
233 case MOS_FRAGINFO_SVRBUF:
234 return GetFragInfo(socket, MOS_SIDE_SVR, optval, optlen);
235 case MOS_INFO_CLIBUF:
236 return GetBufInfo(socket, MOS_SIDE_CLI, optval, optlen);
237 case MOS_INFO_SVRBUF:
238 return GetBufInfo(socket, MOS_SIDE_SVR, optval, optlen);
239 case MOS_TCP_STATE_CLI:
240 return GetTCPState(socket->monitor_stream->stream, MOS_SIDE_CLI,
241 optval, optlen);
242 case MOS_TCP_STATE_SVR:
243 return GetTCPState(socket->monitor_stream->stream, MOS_SIDE_SVR,
244 optval, optlen);
245 case MOS_TIMESTAMP:
246 return GetLastTimestamp(socket->monitor_stream->stream,
247 (uint32_t *)optval,
248 optlen);
249 default:
250 TRACE_API("can't recognize the optname=%d\n", optname);
251 assert(0);
252 }
253 break;
254 }
255 errno = ENOSYS;
256 return -1;
257 }
258 /*----------------------------------------------------------------------------*/
259 int
mtcp_setsockopt(mctx_t mctx,int sockid,int level,int optname,const void * optval,socklen_t optlen)260 mtcp_setsockopt(mctx_t mctx, int sockid, int level,
261 int optname, const void *optval, socklen_t optlen)
262 {
263 mtcp_manager_t mtcp;
264 socket_map_t socket;
265 tcprb_t *rb;
266
267 mtcp = GetMTCPManager(mctx);
268 if (!mtcp) {
269 errno = EACCES;
270 return -1;
271 }
272
273 if (sockid < 0 || sockid >= g_config.mos->max_concurrency) {
274 TRACE_API("Socket id %d out of range.\n", sockid);
275 errno = EBADF;
276 return -1;
277 }
278
279 switch (level) {
280 case SOL_SOCKET:
281 socket = &mtcp->smap[sockid];
282 if (socket->socktype == MOS_SOCK_UNUSED) {
283 TRACE_API("Invalid socket id: %d\n", sockid);
284 errno = EBADF;
285 return -1;
286 }
287
288 if (socket->socktype != MOS_SOCK_STREAM_LISTEN &&
289 socket->socktype != MOS_SOCK_STREAM) {
290 TRACE_API("Invalid socket id: %d\n", sockid);
291 errno = ENOTSOCK;
292 return -1;
293 }
294 break;
295 case SOL_MONSOCKET:
296 socket = &mtcp->msmap[sockid];
297 /*
298 * checking of calling thread to be in MOS context is
299 * disabled since both optnames can be called from
300 * `application' context (on passive sockets)
301 */
302 /*
303 * if (mtcp->ctx->thread != pthread_self())
304 * return -1;
305 */
306
307 switch (optname) {
308 case MOS_CLIOVERLAP:
309 rb = (socket->monitor_stream->stream->side == MOS_SIDE_CLI) ?
310 socket->monitor_stream->stream->rcvvar->rcvbuf :
311 socket->monitor_stream->stream->pair_stream->rcvvar->rcvbuf;
312 if (rb == NULL) {
313 errno = EFAULT;
314 return -1;
315 }
316 if (tcprb_setpolicy(rb, *(uint8_t *)optval) < 0) {
317 errno = EINVAL;
318 return -1;
319 } else
320 return 0;
321 break;
322 case MOS_SVROVERLAP:
323 rb = (socket->monitor_stream->stream->side == MOS_SIDE_SVR) ?
324 socket->monitor_stream->stream->rcvvar->rcvbuf :
325 socket->monitor_stream->stream->pair_stream->rcvvar->rcvbuf;
326 if (rb == NULL) {
327 errno = EFAULT;
328 return -1;
329 }
330 if (tcprb_setpolicy(rb, *(uint8_t *)optval) < 0) {
331 errno = EINVAL;
332 return -1;
333 } else
334 return 0;
335 break;
336 case MOS_CLIBUF:
337 #if 0
338 if (socket->socktype != MOS_SOCK_MONITOR_STREAM_ACTIVE) {
339 errno = EBADF;
340 return -1;
341 }
342 #endif
343 #ifdef DISABLE_DYN_RESIZE
344 if (*(int *)optval != 0)
345 return -1;
346 if (socket->socktype == MOS_SOCK_MONITOR_STREAM_ACTIVE) {
347 rb = (socket->monitor_stream->stream->side == MOS_SIDE_CLI) ?
348 socket->monitor_stream->stream->rcvvar->rcvbuf :
349 socket->monitor_stream->stream->pair_stream->rcvvar->rcvbuf;
350 if (rb) {
351 tcprb_resize_meta(rb, 0);
352 tcprb_resize(rb, 0);
353 }
354 }
355 return DisableBuf(socket, MOS_SIDE_CLI);
356 #else
357 rb = (socket->monitor_stream->stream->side == MOS_SIDE_CLI) ?
358 socket->monitor_stream->stream->rcvvar->rcvbuf :
359 socket->monitor_stream->stream->pair_stream->rcvvar->rcvbuf;
360 if (tcprb_resize_meta(rb, *(int *)optval) < 0)
361 return -1;
362 return tcprb_resize(rb,
363 (((int)rb->metalen - 1) / UNITBUFSIZE + 1) * UNITBUFSIZE);
364 #endif
365 case MOS_SVRBUF:
366 #if 0
367 if (socket->socktype != MOS_SOCK_MONITOR_STREAM_ACTIVE) {
368 errno = EBADF;
369 return -1;
370 }
371 #endif
372 #ifdef DISABLE_DYN_RESIZE
373 if (*(int *)optval != 0)
374 return -1;
375 if (socket->socktype == MOS_SOCK_MONITOR_STREAM_ACTIVE) {
376 rb = (socket->monitor_stream->stream->side == MOS_SIDE_SVR) ?
377 socket->monitor_stream->stream->rcvvar->rcvbuf :
378 socket->monitor_stream->stream->pair_stream->rcvvar->rcvbuf;
379 if (rb) {
380 tcprb_resize_meta(rb, 0);
381 tcprb_resize(rb, 0);
382 }
383 }
384 return DisableBuf(socket, MOS_SIDE_SVR);
385 #else
386 rb = (socket->monitor_stream->stream->side == MOS_SIDE_SVR) ?
387 socket->monitor_stream->stream->rcvvar->rcvbuf :
388 socket->monitor_stream->stream->pair_stream->rcvvar->rcvbuf;
389 if (tcprb_resize_meta(rb, *(int *)optval) < 0)
390 return -1;
391 return tcprb_resize(rb,
392 (((int)rb->metalen - 1) / UNITBUFSIZE + 1) * UNITBUFSIZE);
393 #endif
394 case MOS_FRAG_CLIBUF:
395 #if 0
396 if (socket->socktype != MOS_SOCK_MONITOR_STREAM_ACTIVE) {
397 errno = EBADF;
398 return -1;
399 }
400 #endif
401 #ifdef DISABLE_DYN_RESIZE
402 if (*(int *)optval != 0)
403 return -1;
404 if (socket->socktype == MOS_SOCK_MONITOR_STREAM_ACTIVE) {
405 rb = (socket->monitor_stream->stream->side == MOS_SIDE_CLI) ?
406 socket->monitor_stream->stream->rcvvar->rcvbuf :
407 socket->monitor_stream->stream->pair_stream->rcvvar->rcvbuf;
408 if (rb)
409 tcprb_resize(rb, 0);
410 }
411 return 0;
412 #else
413 rb = (socket->monitor_stream->stream->side == MOS_SIDE_CLI) ?
414 socket->monitor_stream->stream->rcvvar->rcvbuf :
415 socket->monitor_stream->stream->pair_stream->rcvvar->rcvbuf;
416 if (rb->len == 0)
417 return tcprb_resize_meta(rb, *(int *)optval);
418 else
419 return -1;
420 #endif
421 case MOS_FRAG_SVRBUF:
422 #if 0
423 if (socket->socktype != MOS_SOCK_MONITOR_STREAM_ACTIVE) {
424 errno = EBADF;
425 return -1;
426 }
427 #endif
428 #ifdef DISABLE_DYN_RESIZE
429 if (*(int *)optval != 0)
430 return -1;
431 if (socket->socktype == MOS_SOCK_MONITOR_STREAM_ACTIVE) {
432 rb = (socket->monitor_stream->stream->side == MOS_SIDE_SVR) ?
433 socket->monitor_stream->stream->rcvvar->rcvbuf :
434 socket->monitor_stream->stream->pair_stream->rcvvar->rcvbuf;
435 if (rb)
436 tcprb_resize(rb, 0);
437 }
438 return 0;
439 #else
440 rb = (socket->monitor_stream->stream->side == MOS_SIDE_SVR) ?
441 socket->monitor_stream->stream->rcvvar->rcvbuf :
442 socket->monitor_stream->stream->pair_stream->rcvvar->rcvbuf;
443 if (rb->len == 0)
444 return tcprb_resize_meta(rb, *(int *)optval);
445 else
446 return -1;
447 #endif
448 case MOS_SEQ_REMAP:
449 break;
450 case MOS_STOP_MON:
451 return mtcp_cb_stop(mctx, sockid, *(int *)optval);
452 default:
453 TRACE_API("invalid optname=%d\n", optname);
454 assert(0);
455 }
456 break;
457 }
458
459 errno = ENOSYS;
460 return -1;
461 }
462 /*----------------------------------------------------------------------------*/
463 int
mtcp_setsock_nonblock(mctx_t mctx,int sockid)464 mtcp_setsock_nonblock(mctx_t mctx, int sockid)
465 {
466 mtcp_manager_t mtcp;
467
468 mtcp = GetMTCPManager(mctx);
469 if (!mtcp) {
470 errno = EACCES;
471 return -1;
472 }
473
474 if (sockid < 0 || sockid >= g_config.mos->max_concurrency) {
475 TRACE_API("Socket id %d out of range.\n", sockid);
476 errno = EBADF;
477 return -1;
478 }
479
480 if (mtcp->smap[sockid].socktype == MOS_SOCK_UNUSED) {
481 TRACE_API("Invalid socket id: %d\n", sockid);
482 errno = EBADF;
483 return -1;
484 }
485
486 mtcp->smap[sockid].opts |= MTCP_NONBLOCK;
487
488 return 0;
489 }
490 /*----------------------------------------------------------------------------*/
491 int
mtcp_ioctl(mctx_t mctx,int sockid,int request,void * argp)492 mtcp_ioctl(mctx_t mctx, int sockid, int request, void *argp)
493 {
494 mtcp_manager_t mtcp;
495 socket_map_t socket;
496
497 mtcp = GetMTCPManager(mctx);
498 if (!mtcp) {
499 errno = EACCES;
500 return -1;
501 }
502
503 if (sockid < 0 || sockid >= g_config.mos->max_concurrency) {
504 TRACE_API("Socket id %d out of range.\n", sockid);
505 errno = EBADF;
506 return -1;
507 }
508
509 /* only support stream socket */
510 socket = &mtcp->smap[sockid];
511
512 if (socket->socktype != MOS_SOCK_STREAM_LISTEN &&
513 socket->socktype != MOS_SOCK_STREAM) {
514 TRACE_API("Invalid socket id: %d\n", sockid);
515 errno = EBADF;
516 return -1;
517 }
518
519 if (!argp) {
520 errno = EFAULT;
521 return -1;
522 }
523
524 if (request == FIONREAD) {
525 tcp_stream *cur_stream;
526 tcprb_t *rbuf;
527
528 cur_stream = socket->stream;
529 if (!cur_stream) {
530 errno = EBADF;
531 return -1;
532 }
533
534 rbuf = cur_stream->rcvvar->rcvbuf;
535 *(int *)argp = (rbuf) ? tcprb_cflen(rbuf) : 0;
536
537 } else if (request == FIONBIO) {
538 /*
539 * sockets can only be set to blocking/non-blocking
540 * modes during initialization
541 */
542 if ((*(int *)argp))
543 mtcp->smap[sockid].opts |= MTCP_NONBLOCK;
544 else
545 mtcp->smap[sockid].opts &= ~MTCP_NONBLOCK;
546 } else {
547 errno = EINVAL;
548 return -1;
549 }
550
551 return 0;
552 }
553 /*----------------------------------------------------------------------------*/
554 static int
mtcp_monitor(mctx_t mctx,socket_map_t sock)555 mtcp_monitor(mctx_t mctx, socket_map_t sock)
556 {
557 mtcp_manager_t mtcp;
558 struct mon_listener *monitor;
559 int sockid = sock->id;
560
561 mtcp = GetMTCPManager(mctx);
562 if (!mtcp) {
563 errno = EACCES;
564 return -1;
565 }
566
567 if (sockid < 0 || sockid >= g_config.mos->max_concurrency) {
568 TRACE_API("Socket id %d out of range.\n", sockid);
569 errno = EBADF;
570 return -1;
571 }
572
573 if (mtcp->msmap[sockid].socktype == MOS_SOCK_UNUSED) {
574 TRACE_API("Invalid socket id: %d\n", sockid);
575 errno = EBADF;
576 return -1;
577 }
578
579 if (!(mtcp->msmap[sockid].socktype == MOS_SOCK_MONITOR_STREAM ||
580 mtcp->msmap[sockid].socktype == MOS_SOCK_MONITOR_RAW)) {
581 TRACE_API("Not a monitor socket. id: %d\n", sockid);
582 errno = ENOTSOCK;
583 return -1;
584 }
585
586 monitor = (struct mon_listener *)calloc(1, sizeof(struct mon_listener));
587 if (!monitor) {
588 /* errno set from the malloc() */
589 errno = ENOMEM;
590 return -1;
591 }
592
593 /* create a monitor-specific event queue */
594 monitor->eq = CreateEventQueue(g_config.mos->max_concurrency);
595 if (!monitor->eq) {
596 TRACE_API("Can't create event queue (concurrency: %d) for "
597 "monitor read event registrations!\n",
598 g_config.mos->max_concurrency);
599 free(monitor);
600 errno = ENOMEM;
601 return -1;
602 }
603
604 /* set monitor-related basic parameters */
605 #ifndef NEWEV
606 monitor->ude_id = UDE_OFFSET;
607 #endif
608 monitor->socket = sock;
609 monitor->client_buf_mgmt = monitor->server_buf_mgmt = BUFMGMT_FULL;
610
611 /* perform both sides monitoring by default */
612 monitor->client_mon = monitor->server_mon = 1;
613
614 /* add monitor socket to the monitor list */
615 TAILQ_INSERT_TAIL(&mtcp->monitors, monitor, link);
616
617 mtcp->msmap[sockid].monitor_listener = monitor;
618
619 return 0;
620 }
621 /*----------------------------------------------------------------------------*/
622 int
mtcp_socket(mctx_t mctx,int domain,int type,int protocol)623 mtcp_socket(mctx_t mctx, int domain, int type, int protocol)
624 {
625 mtcp_manager_t mtcp;
626 socket_map_t socket;
627
628 mtcp = GetMTCPManager(mctx);
629 if (!mtcp) {
630 errno = EACCES;
631 return -1;
632 }
633
634 if (domain != AF_INET) {
635 errno = EAFNOSUPPORT;
636 return -1;
637 }
638
639 if (type == (int)SOCK_STREAM) {
640 type = MOS_SOCK_STREAM;
641 } else if (type == MOS_SOCK_MONITOR_STREAM ||
642 type == MOS_SOCK_MONITOR_RAW) {
643 /* do nothing for the time being */
644 } else {
645 /* Not supported type */
646 errno = EINVAL;
647 return -1;
648 }
649
650 socket = AllocateSocket(mctx, type);
651 if (!socket) {
652 errno = ENFILE;
653 return -1;
654 }
655
656 if (type == MOS_SOCK_MONITOR_STREAM ||
657 type == MOS_SOCK_MONITOR_RAW) {
658 mtcp_manager_t mtcp = GetMTCPManager(mctx);
659 if (!mtcp) {
660 errno = EACCES;
661 return -1;
662 }
663 mtcp_monitor(mctx, socket);
664 #ifdef NEWEV
665 socket->monitor_listener->stree_dontcare = NULL;
666 socket->monitor_listener->stree_pre_rcv = NULL;
667 socket->monitor_listener->stree_post_snd = NULL;
668 #else
669 InitEvB(mtcp, &socket->monitor_listener->dontcare_evb);
670 InitEvB(mtcp, &socket->monitor_listener->pre_tcp_evb);
671 InitEvB(mtcp, &socket->monitor_listener->post_tcp_evb);
672 #endif
673 }
674
675 return socket->id;
676 }
677 /*----------------------------------------------------------------------------*/
678 int
mtcp_bind(mctx_t mctx,int sockid,const struct sockaddr * addr,socklen_t addrlen)679 mtcp_bind(mctx_t mctx, int sockid,
680 const struct sockaddr *addr, socklen_t addrlen)
681 {
682 mtcp_manager_t mtcp;
683 struct sockaddr_in *addr_in;
684
685 mtcp = GetMTCPManager(mctx);
686 if (!mtcp) {
687 errno = EACCES;
688 return -1;
689 }
690
691 if (sockid < 0 || sockid >= g_config.mos->max_concurrency) {
692 TRACE_API("Socket id %d out of range.\n", sockid);
693 errno = EBADF;
694 return -1;
695 }
696
697 if (mtcp->smap[sockid].socktype == MOS_SOCK_UNUSED) {
698 TRACE_API("Invalid socket id: %d\n", sockid);
699 errno = EBADF;
700 return -1;
701 }
702
703 if (mtcp->smap[sockid].socktype != MOS_SOCK_STREAM &&
704 mtcp->smap[sockid].socktype != MOS_SOCK_STREAM_LISTEN) {
705 TRACE_API("Not a stream socket id: %d\n", sockid);
706 errno = ENOTSOCK;
707 return -1;
708 }
709
710 if (!addr) {
711 TRACE_API("Socket %d: empty address!\n", sockid);
712 errno = EINVAL;
713 return -1;
714 }
715
716 if (mtcp->smap[sockid].opts & MTCP_ADDR_BIND) {
717 TRACE_API("Socket %d: adress already bind for this socket.\n", sockid);
718 errno = EINVAL;
719 return -1;
720 }
721
722 /* we only allow bind() for AF_INET address */
723 if (addr->sa_family != AF_INET || addrlen < sizeof(struct sockaddr_in)) {
724 TRACE_API("Socket %d: invalid argument!\n", sockid);
725 errno = EINVAL;
726 return -1;
727 }
728
729 if (mtcp->listener) {
730 TRACE_API("Address already bound!\n");
731 errno = EINVAL;
732 return -1;
733 }
734 addr_in = (struct sockaddr_in *)addr;
735 mtcp->smap[sockid].saddr = *addr_in;
736 mtcp->smap[sockid].opts |= MTCP_ADDR_BIND;
737
738 return 0;
739 }
740 /*----------------------------------------------------------------------------*/
741 int
mtcp_listen(mctx_t mctx,int sockid,int backlog)742 mtcp_listen(mctx_t mctx, int sockid, int backlog)
743 {
744 mtcp_manager_t mtcp;
745 struct tcp_listener *listener;
746
747 mtcp = GetMTCPManager(mctx);
748 if (!mtcp) {
749 errno = EACCES;
750 return -1;
751 }
752
753 if (sockid < 0 || sockid >= g_config.mos->max_concurrency) {
754 TRACE_API("Socket id %d out of range.\n", sockid);
755 errno = EBADF;
756 return -1;
757 }
758
759 if (mtcp->smap[sockid].socktype == MOS_SOCK_UNUSED) {
760 TRACE_API("Invalid socket id: %d\n", sockid);
761 errno = EBADF;
762 return -1;
763 }
764
765 if (mtcp->smap[sockid].socktype == MOS_SOCK_STREAM) {
766 mtcp->smap[sockid].socktype = MOS_SOCK_STREAM_LISTEN;
767 }
768
769 if (mtcp->smap[sockid].socktype != MOS_SOCK_STREAM_LISTEN) {
770 TRACE_API("Not a listening socket. id: %d\n", sockid);
771 errno = ENOTSOCK;
772 return -1;
773 }
774
775 if (backlog <= 0 || backlog > g_config.mos->max_concurrency) {
776 errno = EINVAL;
777 return -1;
778 }
779
780 listener = (struct tcp_listener *)calloc(1, sizeof(struct tcp_listener));
781 if (!listener) {
782 /* errno set from the malloc() */
783 errno = ENOMEM;
784 return -1;
785 }
786
787 listener->sockid = sockid;
788 listener->backlog = backlog;
789 listener->socket = &mtcp->smap[sockid];
790
791 if (pthread_cond_init(&listener->accept_cond, NULL)) {
792 perror("pthread_cond_init of ctx->accept_cond\n");
793 /* errno set by pthread_cond_init() */
794 free(listener);
795 return -1;
796 }
797 if (pthread_mutex_init(&listener->accept_lock, NULL)) {
798 perror("pthread_mutex_init of ctx->accept_lock\n");
799 /* errno set by pthread_mutex_init() */
800 free(listener);
801 return -1;
802 }
803
804 listener->acceptq = CreateStreamQueue(backlog);
805 if (!listener->acceptq) {
806 free(listener);
807 errno = ENOMEM;
808 return -1;
809 }
810
811 mtcp->smap[sockid].listener = listener;
812 mtcp->listener = listener;
813
814 return 0;
815 }
816 /*----------------------------------------------------------------------------*/
817 int
mtcp_accept(mctx_t mctx,int sockid,struct sockaddr * addr,socklen_t * addrlen)818 mtcp_accept(mctx_t mctx, int sockid, struct sockaddr *addr, socklen_t *addrlen)
819 {
820 mtcp_manager_t mtcp;
821 struct tcp_listener *listener;
822 socket_map_t socket;
823 tcp_stream *accepted = NULL;
824
825 mtcp = GetMTCPManager(mctx);
826 if (!mtcp) {
827 errno = EACCES;
828 return -1;
829 }
830
831 if (sockid < 0 || sockid >= g_config.mos->max_concurrency) {
832 TRACE_API("Socket id %d out of range.\n", sockid);
833 errno = EBADF;
834 return -1;
835 }
836
837 /* requires listening socket */
838 if (mtcp->smap[sockid].socktype != MOS_SOCK_STREAM_LISTEN) {
839 errno = EINVAL;
840 return -1;
841 }
842
843 listener = mtcp->smap[sockid].listener;
844
845 /* dequeue from the acceptq without lock first */
846 /* if nothing there, acquire lock and cond_wait */
847 accepted = StreamDequeue(listener->acceptq);
848 if (!accepted) {
849 if (listener->socket->opts & MTCP_NONBLOCK) {
850 errno = EAGAIN;
851 return -1;
852
853 } else {
854 pthread_mutex_lock(&listener->accept_lock);
855 while ((accepted = StreamDequeue(listener->acceptq)) == NULL) {
856 pthread_cond_wait(&listener->accept_cond, &listener->accept_lock);
857
858 if (mtcp->ctx->done || mtcp->ctx->exit) {
859 pthread_mutex_unlock(&listener->accept_lock);
860 errno = EINTR;
861 return -1;
862 }
863 }
864 pthread_mutex_unlock(&listener->accept_lock);
865 }
866 }
867
868 if (!accepted) {
869 TRACE_ERROR("[NEVER HAPPEN] Empty accept queue!\n");
870 }
871
872 if (!accepted->socket) {
873 socket = AllocateSocket(mctx, MOS_SOCK_STREAM);
874 if (!socket) {
875 TRACE_ERROR("Failed to create new socket!\n");
876 /* TODO: destroy the stream */
877 errno = ENFILE;
878 return -1;
879 }
880 socket->stream = accepted;
881 accepted->socket = socket;
882
883 /* set socket addr parameters */
884 socket->saddr.sin_family = AF_INET;
885 socket->saddr.sin_port = accepted->dport;
886 socket->saddr.sin_addr.s_addr = accepted->daddr;
887
888 /* if monitor is enabled, complete the socket assignment */
889 if (socket->stream->pair_stream != NULL)
890 socket->stream->pair_stream->socket = socket;
891 }
892
893 if (!(listener->socket->epoll & MOS_EPOLLET) &&
894 !StreamQueueIsEmpty(listener->acceptq))
895 AddEpollEvent(mtcp->ep,
896 USR_SHADOW_EVENT_QUEUE,
897 listener->socket, MOS_EPOLLIN);
898
899 TRACE_API("Stream %d accepted.\n", accepted->id);
900
901 if (addr && addrlen) {
902 struct sockaddr_in *addr_in = (struct sockaddr_in *)addr;
903 addr_in->sin_family = AF_INET;
904 addr_in->sin_port = accepted->dport;
905 addr_in->sin_addr.s_addr = accepted->daddr;
906 *addrlen = sizeof(struct sockaddr_in);
907 }
908
909 return accepted->socket->id;
910 }
911 /*----------------------------------------------------------------------------*/
912 int
mtcp_init_rss(mctx_t mctx,in_addr_t saddr_base,int num_addr,in_addr_t daddr,in_addr_t dport)913 mtcp_init_rss(mctx_t mctx, in_addr_t saddr_base, int num_addr,
914 in_addr_t daddr, in_addr_t dport)
915 {
916 mtcp_manager_t mtcp;
917 addr_pool_t ap;
918
919 mtcp = GetMTCPManager(mctx);
920 if (!mtcp) {
921 errno = EACCES;
922 return -1;
923 }
924
925 if (saddr_base == INADDR_ANY) {
926 int nif_out;
927
928 /* for the INADDR_ANY, find the output interface for the destination
929 and set the saddr_base as the ip address of the output interface */
930 nif_out = GetOutputInterface(daddr);
931 if (nif_out < 0) {
932 TRACE_DBG("Could not determine nif idx!\n");
933 errno = EINVAL;
934 return -1;
935 }
936 saddr_base = g_config.mos->netdev_table->ent[nif_out]->ip_addr;
937 }
938
939 ap = CreateAddressPoolPerCore(mctx->cpu, num_cpus,
940 saddr_base, num_addr, daddr, dport);
941 if (!ap) {
942 errno = ENOMEM;
943 return -1;
944 }
945
946 mtcp->ap = ap;
947
948 return 0;
949 }
950 /*----------------------------------------------------------------------------*/
951 int
eval_bpf_5tuple(struct sfbpf_program fcode,in_addr_t saddr,in_port_t sport,in_addr_t daddr,in_port_t dport)952 eval_bpf_5tuple(struct sfbpf_program fcode,
953 in_addr_t saddr, in_port_t sport,
954 in_addr_t daddr, in_port_t dport) {
955 uint8_t buf[TOTAL_TCP_HEADER_LEN];
956 struct ethhdr *ethh;
957 struct iphdr *iph;
958 struct tcphdr *tcph;
959
960 ethh = (struct ethhdr *)buf;
961 ethh->h_proto = htons(ETH_P_IP);
962 iph = (struct iphdr *)(ethh + 1);
963 iph->ihl = IP_HEADER_LEN >> 2;
964 iph->version = 4;
965 iph->tos = 0;
966 iph->tot_len = htons(IP_HEADER_LEN + TCP_HEADER_LEN);
967 iph->id = htons(0);
968 iph->protocol = IPPROTO_TCP;
969 iph->saddr = saddr;
970 iph->daddr = daddr;
971 iph->check = 0;
972 tcph = (struct tcphdr *)(iph + 1);
973 tcph->source = sport;
974 tcph->dest = dport;
975
976 return EVAL_BPFFILTER(fcode, (uint8_t *)iph - sizeof(struct ethhdr),
977 TOTAL_TCP_HEADER_LEN);
978 }
979 /*----------------------------------------------------------------------------*/
980 int
mtcp_connect(mctx_t mctx,int sockid,const struct sockaddr * addr,socklen_t addrlen)981 mtcp_connect(mctx_t mctx, int sockid,
982 const struct sockaddr *addr, socklen_t addrlen)
983 {
984 mtcp_manager_t mtcp;
985 socket_map_t socket;
986 tcp_stream *cur_stream;
987 struct sockaddr_in *addr_in;
988 in_addr_t dip;
989 in_port_t dport;
990 int is_dyn_bound = FALSE;
991 int ret, nif;
992 int cnt_match = 0;
993 struct mon_listener *walk;
994 struct sfbpf_program fcode;
995
996 cur_stream = NULL;
997 mtcp = GetMTCPManager(mctx);
998 if (!mtcp) {
999 errno = EACCES;
1000 return -1;
1001 }
1002
1003 if (sockid < 0 || sockid >= g_config.mos->max_concurrency) {
1004 TRACE_API("Socket id %d out of range.\n", sockid);
1005 errno = EBADF;
1006 return -1;
1007 }
1008
1009 if (mtcp->smap[sockid].socktype == MOS_SOCK_UNUSED) {
1010 TRACE_API("Invalid socket id: %d\n", sockid);
1011 errno = EBADF;
1012 return -1;
1013 }
1014
1015 if (mtcp->smap[sockid].socktype != MOS_SOCK_STREAM) {
1016 TRACE_API("Not an end socket. id: %d\n", sockid);
1017 errno = ENOTSOCK;
1018 return -1;
1019 }
1020
1021 if (!addr) {
1022 TRACE_API("Socket %d: empty address!\n", sockid);
1023 errno = EFAULT;
1024 return -1;
1025 }
1026
1027 /* we only allow bind() for AF_INET address */
1028 if (addr->sa_family != AF_INET || addrlen < sizeof(struct sockaddr_in)) {
1029 TRACE_API("Socket %d: invalid argument!\n", sockid);
1030 errno = EAFNOSUPPORT;
1031 return -1;
1032 }
1033
1034 socket = &mtcp->smap[sockid];
1035 if (socket->stream) {
1036 TRACE_API("Socket %d: stream already exist!\n", sockid);
1037 if (socket->stream->state >= TCP_ST_ESTABLISHED) {
1038 errno = EISCONN;
1039 } else {
1040 errno = EALREADY;
1041 }
1042 return -1;
1043 }
1044
1045 addr_in = (struct sockaddr_in *)addr;
1046 dip = addr_in->sin_addr.s_addr;
1047 dport = addr_in->sin_port;
1048
1049 /* address binding */
1050 if (socket->opts & MTCP_ADDR_BIND &&
1051 socket->saddr.sin_port != INPORT_ANY &&
1052 socket->saddr.sin_addr.s_addr != INADDR_ANY) {
1053 int rss_core;
1054
1055 rss_core = GetRSSCPUCore(socket->saddr.sin_addr.s_addr, dip,
1056 socket->saddr.sin_port, dport, num_queues);
1057
1058 if (rss_core != mctx->cpu) {
1059 errno = EINVAL;
1060 return -1;
1061 }
1062 } else {
1063 if (mtcp->ap) {
1064 ret = FetchAddressPerCore(mtcp->ap,
1065 mctx->cpu, num_queues, addr_in, &socket->saddr);
1066 } else {
1067 nif = GetOutputInterface(dip);
1068 if (nif < 0) {
1069 errno = EINVAL;
1070 return -1;
1071 }
1072 ret = FetchAddress(ap[nif],
1073 mctx->cpu, num_queues, addr_in, &socket->saddr);
1074 }
1075 if (ret < 0) {
1076 errno = EAGAIN;
1077 return -1;
1078 }
1079 socket->opts |= MTCP_ADDR_BIND;
1080 is_dyn_bound = TRUE;
1081 }
1082
1083 cnt_match = 0;
1084 if (mtcp->num_msp > 0) {
1085 TAILQ_FOREACH(walk, &mtcp->monitors, link) {
1086 fcode = walk->stream_syn_fcode;
1087 if (!(ISSET_BPFFILTER(fcode) &&
1088 eval_bpf_5tuple(fcode, socket->saddr.sin_addr.s_addr,
1089 socket->saddr.sin_port,
1090 dip, dport) == 0)) {
1091 walk->is_stream_syn_filter_hit = 1; // set the 'filter hit' flag to 1
1092 cnt_match++;
1093 }
1094 }
1095 }
1096
1097 if (mtcp->num_msp > 0 && cnt_match > 0) {
1098 /* 150820 dhkim: XXX: embedded mode is not verified */
1099 #if 1
1100 cur_stream = CreateClientTCPStream(mtcp, socket,
1101 STREAM_TYPE(MOS_SOCK_STREAM) |
1102 STREAM_TYPE(MOS_SOCK_MONITOR_STREAM_ACTIVE),
1103 socket->saddr.sin_addr.s_addr,
1104 socket->saddr.sin_port, dip, dport, NULL);
1105 #else
1106 cur_stream = CreateDualTCPStream(mtcp, socket,
1107 STREAM_TYPE(MOS_SOCK_STREAM) |
1108 STREAM_TYPE(MOS_SOCK_MONITOR_STREAM_ACTIVE),
1109 socket->saddr.sin_addr.s_addr,
1110 socket->saddr.sin_port, dip, dport, NULL);
1111 #endif
1112 }
1113 else
1114 cur_stream = CreateTCPStream(mtcp, socket, STREAM_TYPE(MOS_SOCK_STREAM),
1115 socket->saddr.sin_addr.s_addr,
1116 socket->saddr.sin_port, dip, dport, NULL);
1117 if (!cur_stream) {
1118 TRACE_ERROR("Socket %d: failed to create tcp_stream!\n", sockid);
1119 errno = ENOMEM;
1120 return -1;
1121 }
1122
1123 if (is_dyn_bound)
1124 cur_stream->is_bound_addr = TRUE;
1125 cur_stream->sndvar->cwnd = 1;
1126 cur_stream->sndvar->ssthresh = cur_stream->sndvar->mss * 10;
1127 cur_stream->side = MOS_SIDE_CLI;
1128 /* if monitor is enabled, update the pair stream side as well */
1129 if (cur_stream->pair_stream) {
1130 cur_stream->pair_stream->side = MOS_SIDE_SVR;
1131 /*
1132 * if buffer management is off, then disable
1133 * monitoring tcp ring of server...
1134 * if there is even a single monitor asking for
1135 * buffer management, enable it (that's why the
1136 * need for the loop)
1137 */
1138 cur_stream->pair_stream->buffer_mgmt = BUFMGMT_OFF;
1139 struct socket_map *walk;
1140 SOCKQ_FOREACH_START(walk, &cur_stream->msocks) {
1141 uint8_t bm = walk->monitor_stream->monitor_listener->server_buf_mgmt;
1142 if (bm > cur_stream->pair_stream->buffer_mgmt) {
1143 cur_stream->pair_stream->buffer_mgmt = bm;
1144 break;
1145 }
1146 } SOCKQ_FOREACH_END;
1147 }
1148
1149 cur_stream->state = TCP_ST_SYN_SENT;
1150 cur_stream->cb_events |= MOS_ON_TCP_STATE_CHANGE;
1151
1152 TRACE_STATE("Stream %d: TCP_ST_SYN_SENT\n", cur_stream->id);
1153
1154 SQ_LOCK(&mtcp->ctx->connect_lock);
1155 ret = StreamEnqueue(mtcp->connectq, cur_stream);
1156 SQ_UNLOCK(&mtcp->ctx->connect_lock);
1157 mtcp->wakeup_flag = TRUE;
1158 if (ret < 0) {
1159 TRACE_ERROR("Socket %d: failed to enqueue to conenct queue!\n", sockid);
1160 SQ_LOCK(&mtcp->ctx->destroyq_lock);
1161 StreamEnqueue(mtcp->destroyq, cur_stream);
1162 SQ_UNLOCK(&mtcp->ctx->destroyq_lock);
1163 errno = EAGAIN;
1164 return -1;
1165 }
1166
1167 /* if nonblocking socket, return EINPROGRESS */
1168 if (socket->opts & MTCP_NONBLOCK) {
1169 errno = EINPROGRESS;
1170 return -1;
1171
1172 } else {
1173 while (1) {
1174 if (!cur_stream) {
1175 TRACE_ERROR("STREAM DESTROYED\n");
1176 errno = ETIMEDOUT;
1177 return -1;
1178 }
1179 if (cur_stream->state > TCP_ST_ESTABLISHED) {
1180 TRACE_ERROR("Socket %d: weird state %s\n",
1181 sockid, TCPStateToString(cur_stream));
1182 // TODO: how to handle this?
1183 errno = ENOSYS;
1184 return -1;
1185 }
1186
1187 if (cur_stream->state == TCP_ST_ESTABLISHED) {
1188 break;
1189 }
1190 usleep(1000);
1191 }
1192 }
1193
1194 return 0;
1195 }
1196 /*----------------------------------------------------------------------------*/
1197 static inline int
CloseStreamSocket(mctx_t mctx,int sockid)1198 CloseStreamSocket(mctx_t mctx, int sockid)
1199 {
1200 mtcp_manager_t mtcp;
1201 tcp_stream *cur_stream;
1202 int ret;
1203
1204 mtcp = GetMTCPManager(mctx);
1205 if (!mtcp) {
1206 errno = EACCES;
1207 return -1;
1208 }
1209
1210 cur_stream = mtcp->smap[sockid].stream;
1211 if (!cur_stream) {
1212 TRACE_API("Socket %d: stream does not exist.\n", sockid);
1213 errno = ENOTCONN;
1214 return -1;
1215 }
1216
1217 if (cur_stream->closed) {
1218 TRACE_API("Socket %d (Stream %u): already closed stream\n",
1219 sockid, cur_stream->id);
1220 return 0;
1221 }
1222 cur_stream->closed = TRUE;
1223
1224 TRACE_API("Stream %d: closing the stream.\n", cur_stream->id);
1225
1226 /* 141029 dhkim: Check this! */
1227 cur_stream->socket = NULL;
1228
1229 if (cur_stream->state == TCP_ST_CLOSED_RSVD) {
1230 TRACE_API("Stream %d at TCP_ST_CLOSED_RSVD. destroying the stream.\n",
1231 cur_stream->id);
1232 SQ_LOCK(&mtcp->ctx->destroyq_lock);
1233 StreamEnqueue(mtcp->destroyq, cur_stream);
1234 mtcp->wakeup_flag = TRUE;
1235 SQ_UNLOCK(&mtcp->ctx->destroyq_lock);
1236 return 0;
1237
1238 } else if (cur_stream->state == TCP_ST_SYN_SENT) {
1239 #if 1
1240 SQ_LOCK(&mtcp->ctx->destroyq_lock);
1241 StreamEnqueue(mtcp->destroyq, cur_stream);
1242 SQ_UNLOCK(&mtcp->ctx->destroyq_lock);
1243 mtcp->wakeup_flag = TRUE;
1244 #endif
1245 return -1;
1246
1247 } else if (cur_stream->state != TCP_ST_ESTABLISHED &&
1248 cur_stream->state != TCP_ST_CLOSE_WAIT) {
1249 TRACE_API("Stream %d at state %s\n",
1250 cur_stream->id, TCPStateToString(cur_stream));
1251 errno = EBADF;
1252 return -1;
1253 }
1254
1255 SQ_LOCK(&mtcp->ctx->close_lock);
1256 cur_stream->sndvar->on_closeq = TRUE;
1257 ret = StreamEnqueue(mtcp->closeq, cur_stream);
1258 mtcp->wakeup_flag = TRUE;
1259 SQ_UNLOCK(&mtcp->ctx->close_lock);
1260
1261 if (ret < 0) {
1262 TRACE_ERROR("(NEVER HAPPEN) Failed to enqueue the stream to close.\n");
1263 errno = EAGAIN;
1264 return -1;
1265 }
1266
1267 return 0;
1268 }
1269 /*----------------------------------------------------------------------------*/
1270 static inline int
CloseListeningSocket(mctx_t mctx,int sockid)1271 CloseListeningSocket(mctx_t mctx, int sockid)
1272 {
1273 mtcp_manager_t mtcp;
1274 struct tcp_listener *listener;
1275
1276 mtcp = GetMTCPManager(mctx);
1277 if (!mtcp) {
1278 errno = EACCES;
1279 return -1;
1280 }
1281
1282 listener = mtcp->smap[sockid].listener;
1283 if (!listener) {
1284 errno = EINVAL;
1285 return -1;
1286 }
1287
1288 if (listener->acceptq) {
1289 DestroyStreamQueue(listener->acceptq);
1290 listener->acceptq = NULL;
1291 }
1292
1293 pthread_mutex_lock(&listener->accept_lock);
1294 pthread_cond_signal(&listener->accept_cond);
1295 pthread_mutex_unlock(&listener->accept_lock);
1296
1297 pthread_cond_destroy(&listener->accept_cond);
1298 pthread_mutex_destroy(&listener->accept_lock);
1299
1300 free(listener);
1301 mtcp->smap[sockid].listener = NULL;
1302
1303 return 0;
1304 }
1305 /*----------------------------------------------------------------------------*/
1306 int
mtcp_close(mctx_t mctx,int sockid)1307 mtcp_close(mctx_t mctx, int sockid)
1308 {
1309 mtcp_manager_t mtcp;
1310 int ret;
1311
1312 mtcp = GetMTCPManager(mctx);
1313 if (!mtcp) {
1314 errno = EACCES;
1315 return -1;
1316 }
1317
1318 if (sockid < 0 || sockid >= g_config.mos->max_concurrency) {
1319 TRACE_API("Socket id %d out of range.\n", sockid);
1320 errno = EBADF;
1321 return -1;
1322 }
1323
1324 if (mtcp->smap[sockid].socktype == MOS_SOCK_UNUSED) {
1325 TRACE_API("Invalid socket id: %d\n", sockid);
1326 errno = EBADF;
1327 return -1;
1328 }
1329
1330 TRACE_API("Socket %d: mtcp_close called.\n", sockid);
1331
1332 switch (mtcp->smap[sockid].socktype) {
1333 case MOS_SOCK_STREAM:
1334 ret = CloseStreamSocket(mctx, sockid);
1335 break;
1336
1337 case MOS_SOCK_STREAM_LISTEN:
1338 ret = CloseListeningSocket(mctx, sockid);
1339 break;
1340
1341 case MOS_SOCK_EPOLL:
1342 ret = CloseEpollSocket(mctx, sockid);
1343 break;
1344
1345 case MOS_SOCK_PIPE:
1346 ret = PipeClose(mctx, sockid);
1347 break;
1348
1349 default:
1350 errno = EINVAL;
1351 ret = -1;
1352 break;
1353 }
1354
1355 FreeSocket(mctx, sockid, mtcp->smap[sockid].socktype);
1356
1357 return ret;
1358 }
1359 /*----------------------------------------------------------------------------*/
1360 int
mtcp_abort(mctx_t mctx,int sockid)1361 mtcp_abort(mctx_t mctx, int sockid)
1362 {
1363 mtcp_manager_t mtcp;
1364 tcp_stream *cur_stream;
1365 int ret;
1366
1367 mtcp = GetMTCPManager(mctx);
1368 if (!mtcp) {
1369 errno = EACCES;
1370 return -1;
1371 }
1372
1373 if (sockid < 0 || sockid >= g_config.mos->max_concurrency) {
1374 TRACE_API("Socket id %d out of range.\n", sockid);
1375 errno = EBADF;
1376 return -1;
1377 }
1378
1379 if (mtcp->smap[sockid].socktype == MOS_SOCK_UNUSED) {
1380 TRACE_API("Invalid socket id: %d\n", sockid);
1381 errno = EBADF;
1382 return -1;
1383 }
1384
1385 if (mtcp->smap[sockid].socktype != MOS_SOCK_STREAM) {
1386 TRACE_API("Not an end socket. id: %d\n", sockid);
1387 errno = ENOTSOCK;
1388 return -1;
1389 }
1390
1391 cur_stream = mtcp->smap[sockid].stream;
1392 if (!cur_stream) {
1393 TRACE_API("Stream %d: does not exist.\n", sockid);
1394 errno = ENOTCONN;
1395 return -1;
1396 }
1397
1398 TRACE_API("Socket %d: mtcp_abort()\n", sockid);
1399
1400 FreeSocket(mctx, sockid, mtcp->smap[sockid].socktype);
1401 cur_stream->socket = NULL;
1402
1403 if (cur_stream->state == TCP_ST_CLOSED_RSVD) {
1404 TRACE_API("Stream %d: connection already reset.\n", sockid);
1405 return ERROR;
1406
1407 } else if (cur_stream->state == TCP_ST_SYN_SENT) {
1408 /* TODO: this should notify event failure to all
1409 previous read() or write() calls */
1410 cur_stream->state = TCP_ST_CLOSED_RSVD;
1411 cur_stream->close_reason = TCP_ACTIVE_CLOSE;
1412 cur_stream->cb_events |= MOS_ON_TCP_STATE_CHANGE;
1413 SQ_LOCK(&mtcp->ctx->destroyq_lock);
1414 StreamEnqueue(mtcp->destroyq, cur_stream);
1415 SQ_UNLOCK(&mtcp->ctx->destroyq_lock);
1416 mtcp->wakeup_flag = TRUE;
1417 return 0;
1418
1419 } else if (cur_stream->state == TCP_ST_CLOSING ||
1420 cur_stream->state == TCP_ST_LAST_ACK ||
1421 cur_stream->state == TCP_ST_TIME_WAIT) {
1422 cur_stream->state = TCP_ST_CLOSED_RSVD;
1423 cur_stream->close_reason = TCP_ACTIVE_CLOSE;
1424 cur_stream->cb_events |= MOS_ON_TCP_STATE_CHANGE;
1425 SQ_LOCK(&mtcp->ctx->destroyq_lock);
1426 StreamEnqueue(mtcp->destroyq, cur_stream);
1427 SQ_UNLOCK(&mtcp->ctx->destroyq_lock);
1428 mtcp->wakeup_flag = TRUE;
1429 return 0;
1430 }
1431
1432 /* the stream structure will be destroyed after sending RST */
1433 if (cur_stream->sndvar->on_resetq) {
1434 TRACE_ERROR("Stream %d: calling mtcp_abort() "
1435 "when in reset queue.\n", sockid);
1436 errno = ECONNRESET;
1437 return -1;
1438 }
1439 SQ_LOCK(&mtcp->ctx->reset_lock);
1440 cur_stream->sndvar->on_resetq = TRUE;
1441 ret = StreamEnqueue(mtcp->resetq, cur_stream);
1442 SQ_UNLOCK(&mtcp->ctx->reset_lock);
1443 mtcp->wakeup_flag = TRUE;
1444
1445 if (ret < 0) {
1446 TRACE_ERROR("(NEVER HAPPEN) Failed to enqueue the stream to close.\n");
1447 errno = EAGAIN;
1448 return -1;
1449 }
1450
1451 return 0;
1452 }
1453 /*----------------------------------------------------------------------------*/
1454 static inline int
PeekForUser(mtcp_manager_t mtcp,tcp_stream * cur_stream,char * buf,int len)1455 PeekForUser(mtcp_manager_t mtcp, tcp_stream *cur_stream, char *buf, int len)
1456 {
1457 struct tcp_recv_vars *rcvvar = cur_stream->rcvvar;
1458 int copylen;
1459 tcprb_t *rb = rcvvar->rcvbuf;
1460
1461 if ((copylen = tcprb_ppeek(rb, (uint8_t *)buf, len, rb->pile)) <= 0) {
1462 errno = EAGAIN;
1463 return -1;
1464 }
1465
1466 return copylen;
1467 }
1468 /*----------------------------------------------------------------------------*/
1469 static inline int
CopyToUser(mtcp_manager_t mtcp,tcp_stream * cur_stream,char * buf,int len)1470 CopyToUser(mtcp_manager_t mtcp, tcp_stream *cur_stream, char *buf, int len)
1471 {
1472 struct tcp_recv_vars *rcvvar = cur_stream->rcvvar;
1473 int copylen;
1474 tcprb_t *rb = rcvvar->rcvbuf;
1475 if ((copylen = tcprb_ppeek(rb, (uint8_t *)buf, len, rb->pile)) <= 0) {
1476 errno = EAGAIN;
1477 return -1;
1478 }
1479 tcprb_setpile(rb, rb->pile + copylen);
1480
1481 rcvvar->rcv_wnd = rb->len - tcprb_cflen(rb);
1482 //printf("rcv_wnd: %d\n", rcvvar->rcv_wnd);
1483
1484 /* Advertise newly freed receive buffer */
1485 if (cur_stream->need_wnd_adv) {
1486 if (rcvvar->rcv_wnd > cur_stream->sndvar->eff_mss) {
1487 if (!cur_stream->sndvar->on_ackq) {
1488 SQ_LOCK(&mtcp->ctx->ackq_lock);
1489 cur_stream->sndvar->on_ackq = TRUE;
1490 StreamEnqueue(mtcp->ackq, cur_stream); /* this always success */
1491 SQ_UNLOCK(&mtcp->ctx->ackq_lock);
1492 cur_stream->need_wnd_adv = FALSE;
1493 mtcp->wakeup_flag = TRUE;
1494 }
1495 }
1496 }
1497
1498 return copylen;
1499 }
1500 /*----------------------------------------------------------------------------*/
1501 ssize_t
mtcp_recv(mctx_t mctx,int sockid,char * buf,size_t len,int flags)1502 mtcp_recv(mctx_t mctx, int sockid, char *buf, size_t len, int flags)
1503 {
1504 mtcp_manager_t mtcp;
1505 socket_map_t socket;
1506 tcp_stream *cur_stream;
1507 struct tcp_recv_vars *rcvvar;
1508 int event_remaining, merged_len;
1509 int ret;
1510
1511 mtcp = GetMTCPManager(mctx);
1512 if (!mtcp) {
1513 errno = EACCES;
1514 return -1;
1515 }
1516
1517 if (sockid < 0 || sockid >= g_config.mos->max_concurrency) {
1518 TRACE_API("Socket id %d out of range.\n", sockid);
1519 errno = EBADF;
1520 return -1;
1521 }
1522
1523 socket = &mtcp->smap[sockid];
1524 if (socket->socktype == MOS_SOCK_UNUSED) {
1525 TRACE_API("Invalid socket id: %d\n", sockid);
1526 errno = EBADF;
1527 return -1;
1528 }
1529
1530 if (socket->socktype == MOS_SOCK_PIPE) {
1531 return PipeRead(mctx, sockid, buf, len);
1532 }
1533
1534 if (socket->socktype != MOS_SOCK_STREAM) {
1535 TRACE_API("Not an end socket. id: %d\n", sockid);
1536 errno = ENOTSOCK;
1537 return -1;
1538 }
1539
1540 /* stream should be in ESTABLISHED, FIN_WAIT_1, FIN_WAIT_2, CLOSE_WAIT */
1541 cur_stream = socket->stream;
1542 if (!cur_stream || !cur_stream->rcvvar || !cur_stream->rcvvar->rcvbuf ||
1543 !(cur_stream->state >= TCP_ST_ESTABLISHED &&
1544 cur_stream->state <= TCP_ST_CLOSE_WAIT)) {
1545 errno = ENOTCONN;
1546 return -1;
1547 }
1548
1549 rcvvar = cur_stream->rcvvar;
1550
1551 merged_len = tcprb_cflen(rcvvar->rcvbuf);
1552
1553 /* if CLOSE_WAIT, return 0 if there is no payload */
1554 if (cur_stream->state == TCP_ST_CLOSE_WAIT) {
1555 if (merged_len == 0)
1556 return 0;
1557 }
1558
1559 /* return EAGAIN if no receive buffer */
1560 if (socket->opts & MTCP_NONBLOCK) {
1561 if (merged_len == 0) {
1562 errno = EAGAIN;
1563 return -1;
1564 }
1565 }
1566
1567 SBUF_LOCK(&rcvvar->read_lock);
1568
1569 switch (flags) {
1570 case 0:
1571 ret = CopyToUser(mtcp, cur_stream, buf, len);
1572 break;
1573 case MSG_PEEK:
1574 ret = PeekForUser(mtcp, cur_stream, buf, len);
1575 break;
1576 default:
1577 SBUF_UNLOCK(&rcvvar->read_lock);
1578 ret = -1;
1579 errno = EINVAL;
1580 return ret;
1581 }
1582
1583 merged_len = tcprb_cflen(rcvvar->rcvbuf);
1584 event_remaining = FALSE;
1585 /* if there are remaining payload, generate EPOLLIN */
1586 /* (may due to insufficient user buffer) */
1587 if (socket->epoll & MOS_EPOLLIN) {
1588 if (!(socket->epoll & MOS_EPOLLET) && merged_len > 0) {
1589 event_remaining = TRUE;
1590 }
1591 }
1592 /* if waiting for close, notify it if no remaining data */
1593 if (cur_stream->state == TCP_ST_CLOSE_WAIT &&
1594 merged_len == 0 && ret > 0) {
1595 event_remaining = TRUE;
1596 }
1597
1598 SBUF_UNLOCK(&rcvvar->read_lock);
1599
1600 if (event_remaining) {
1601 if (socket->epoll) {
1602 AddEpollEvent(mtcp->ep,
1603 USR_SHADOW_EVENT_QUEUE, socket, MOS_EPOLLIN);
1604 }
1605 }
1606
1607 TRACE_API("Stream %d: mtcp_recv() returning %d\n", cur_stream->id, ret);
1608 return ret;
1609 }
1610 /*----------------------------------------------------------------------------*/
1611 inline ssize_t
mtcp_read(mctx_t mctx,int sockid,char * buf,size_t len)1612 mtcp_read(mctx_t mctx, int sockid, char *buf, size_t len)
1613 {
1614 return mtcp_recv(mctx, sockid, buf, len, 0);
1615 }
1616 /*----------------------------------------------------------------------------*/
1617 ssize_t
mtcp_readv(mctx_t mctx,int sockid,const struct iovec * iov,int numIOV)1618 mtcp_readv(mctx_t mctx, int sockid, const struct iovec *iov, int numIOV)
1619 {
1620 mtcp_manager_t mtcp;
1621 socket_map_t socket;
1622 tcp_stream *cur_stream;
1623 struct tcp_recv_vars *rcvvar;
1624 int ret, bytes_read, i;
1625 int event_remaining, merged_len;
1626
1627 mtcp = GetMTCPManager(mctx);
1628 if (!mtcp) {
1629 errno = EACCES;
1630 return -1;
1631 }
1632
1633 if (sockid < 0 || sockid >= g_config.mos->max_concurrency) {
1634 TRACE_API("Socket id %d out of range.\n", sockid);
1635 errno = EBADF;
1636 return -1;
1637 }
1638
1639 socket = &mtcp->smap[sockid];
1640 if (socket->socktype == MOS_SOCK_UNUSED) {
1641 TRACE_API("Invalid socket id: %d\n", sockid);
1642 errno = EBADF;
1643 return -1;
1644 }
1645
1646 if (socket->socktype != MOS_SOCK_STREAM) {
1647 TRACE_API("Not an end socket. id: %d\n", sockid);
1648 errno = ENOTSOCK;
1649 return -1;
1650 }
1651
1652 /* stream should be in ESTABLISHED, FIN_WAIT_1, FIN_WAIT_2, CLOSE_WAIT */
1653 cur_stream = socket->stream;
1654 if (!cur_stream || !cur_stream->rcvvar->rcvbuf ||
1655 !(cur_stream->state >= TCP_ST_ESTABLISHED &&
1656 cur_stream->state <= TCP_ST_CLOSE_WAIT)) {
1657 errno = ENOTCONN;
1658 return -1;
1659 }
1660
1661 rcvvar = cur_stream->rcvvar;
1662
1663 merged_len = tcprb_cflen(rcvvar->rcvbuf);
1664
1665 /* if CLOSE_WAIT, return 0 if there is no payload */
1666 if (cur_stream->state == TCP_ST_CLOSE_WAIT) {
1667 if (merged_len == 0)
1668 return 0;
1669 }
1670
1671 /* return EAGAIN if no receive buffer */
1672 if (socket->opts & MTCP_NONBLOCK) {
1673 if (merged_len == 0) {
1674 errno = EAGAIN;
1675 return -1;
1676 }
1677 }
1678
1679 SBUF_LOCK(&rcvvar->read_lock);
1680
1681 /* read and store the contents to the vectored buffers */
1682 bytes_read = 0;
1683 for (i = 0; i < numIOV; i++) {
1684 if (iov[i].iov_len <= 0)
1685 continue;
1686
1687 ret = CopyToUser(mtcp, cur_stream, iov[i].iov_base, iov[i].iov_len);
1688 if (ret <= 0)
1689 break;
1690
1691 bytes_read += ret;
1692
1693 if (ret < iov[i].iov_len)
1694 break;
1695 }
1696
1697 merged_len = tcprb_cflen(rcvvar->rcvbuf);
1698
1699 event_remaining = FALSE;
1700 /* if there are remaining payload, generate read event */
1701 /* (may due to insufficient user buffer) */
1702 if (socket->epoll & MOS_EPOLLIN) {
1703 if (!(socket->epoll & MOS_EPOLLET) && merged_len > 0) {
1704 event_remaining = TRUE;
1705 }
1706 }
1707 /* if waiting for close, notify it if no remaining data */
1708 if (cur_stream->state == TCP_ST_CLOSE_WAIT &&
1709 merged_len == 0 && bytes_read > 0) {
1710 event_remaining = TRUE;
1711 }
1712
1713 SBUF_UNLOCK(&rcvvar->read_lock);
1714
1715 if(event_remaining) {
1716 if (socket->epoll & MOS_EPOLLIN && !(socket->epoll & MOS_EPOLLET)) {
1717 AddEpollEvent(mtcp->ep,
1718 USR_SHADOW_EVENT_QUEUE, socket, MOS_EPOLLIN);
1719 }
1720 }
1721
1722 TRACE_API("Stream %d: mtcp_readv() returning %d\n",
1723 cur_stream->id, bytes_read);
1724 return bytes_read;
1725 }
1726 /*----------------------------------------------------------------------------*/
1727 static inline int
CopyFromUser(mtcp_manager_t mtcp,tcp_stream * cur_stream,const char * buf,int len)1728 CopyFromUser(mtcp_manager_t mtcp, tcp_stream *cur_stream, const char *buf, int len)
1729 {
1730 struct tcp_send_vars *sndvar = cur_stream->sndvar;
1731 int sndlen;
1732 int ret;
1733
1734 sndlen = MIN((int)sndvar->snd_wnd, len);
1735 if (sndlen <= 0) {
1736 errno = EAGAIN;
1737 return -1;
1738 }
1739
1740 /* allocate send buffer if not exist */
1741 if (!sndvar->sndbuf) {
1742 sndvar->sndbuf = SBInit(mtcp->rbm_snd, sndvar->iss + 1);
1743 if (!sndvar->sndbuf) {
1744 cur_stream->close_reason = TCP_NO_MEM;
1745 /* notification may not required due to -1 return */
1746 errno = ENOMEM;
1747 return -1;
1748 }
1749 }
1750
1751 ret = SBPut(mtcp->rbm_snd, sndvar->sndbuf, buf, sndlen);
1752 assert(ret == sndlen);
1753 sndvar->snd_wnd = sndvar->sndbuf->size - sndvar->sndbuf->len;
1754 if (ret <= 0) {
1755 TRACE_ERROR("SBPut failed. reason: %d (sndlen: %u, len: %u\n",
1756 ret, sndlen, sndvar->sndbuf->len);
1757 errno = EAGAIN;
1758 return -1;
1759 }
1760
1761 if (sndvar->snd_wnd <= 0) {
1762 TRACE_SNDBUF("%u Sending buffer became full!! snd_wnd: %u\n",
1763 cur_stream->id, sndvar->snd_wnd);
1764 }
1765
1766 return ret;
1767 }
1768 /*----------------------------------------------------------------------------*/
1769 ssize_t
mtcp_write(mctx_t mctx,int sockid,const char * buf,size_t len)1770 mtcp_write(mctx_t mctx, int sockid, const char *buf, size_t len)
1771 {
1772 mtcp_manager_t mtcp;
1773 socket_map_t socket;
1774 tcp_stream *cur_stream;
1775 struct tcp_send_vars *sndvar;
1776 int ret;
1777
1778 mtcp = GetMTCPManager(mctx);
1779 if (!mtcp) {
1780 errno = EACCES;
1781 return -1;
1782 }
1783
1784 if (sockid < 0 || sockid >= g_config.mos->max_concurrency) {
1785 TRACE_API("Socket id %d out of range.\n", sockid);
1786 errno = EBADF;
1787 return -1;
1788 }
1789
1790 socket = &mtcp->smap[sockid];
1791 if (socket->socktype == MOS_SOCK_UNUSED) {
1792 TRACE_API("Invalid socket id: %d\n", sockid);
1793 errno = EBADF;
1794 return -1;
1795 }
1796
1797 if (socket->socktype == MOS_SOCK_PIPE) {
1798 return PipeWrite(mctx, sockid, buf, len);
1799 }
1800
1801 if (socket->socktype != MOS_SOCK_STREAM) {
1802 TRACE_API("Not an end socket. id: %d\n", sockid);
1803 errno = ENOTSOCK;
1804 return -1;
1805 }
1806
1807 cur_stream = socket->stream;
1808 if (!cur_stream ||
1809 !(cur_stream->state == TCP_ST_ESTABLISHED ||
1810 cur_stream->state == TCP_ST_CLOSE_WAIT)) {
1811 errno = ENOTCONN;
1812 return -1;
1813 }
1814
1815 if (len <= 0) {
1816 if (socket->opts & MTCP_NONBLOCK) {
1817 errno = EAGAIN;
1818 return -1;
1819 } else {
1820 return 0;
1821 }
1822 }
1823
1824 sndvar = cur_stream->sndvar;
1825
1826 SBUF_LOCK(&sndvar->write_lock);
1827 ret = CopyFromUser(mtcp, cur_stream, buf, len);
1828
1829 SBUF_UNLOCK(&sndvar->write_lock);
1830
1831 if (ret > 0 && !(sndvar->on_sendq || sndvar->on_send_list)) {
1832 SQ_LOCK(&mtcp->ctx->sendq_lock);
1833 sndvar->on_sendq = TRUE;
1834 StreamEnqueue(mtcp->sendq, cur_stream); /* this always success */
1835 SQ_UNLOCK(&mtcp->ctx->sendq_lock);
1836 mtcp->wakeup_flag = TRUE;
1837 }
1838
1839 if (ret == 0 && (socket->opts & MTCP_NONBLOCK)) {
1840 ret = -1;
1841 errno = EAGAIN;
1842 }
1843
1844 /* if there are remaining sending buffer, generate write event */
1845 if (sndvar->snd_wnd > 0) {
1846 if (socket->epoll & MOS_EPOLLOUT && !(socket->epoll & MOS_EPOLLET)) {
1847 AddEpollEvent(mtcp->ep,
1848 USR_SHADOW_EVENT_QUEUE, socket, MOS_EPOLLOUT);
1849 }
1850 }
1851
1852 TRACE_API("Stream %d: mtcp_write() returning %d\n", cur_stream->id, ret);
1853 return ret;
1854 }
1855 /*----------------------------------------------------------------------------*/
1856 ssize_t
mtcp_writev(mctx_t mctx,int sockid,const struct iovec * iov,int numIOV)1857 mtcp_writev(mctx_t mctx, int sockid, const struct iovec *iov, int numIOV)
1858 {
1859 mtcp_manager_t mtcp;
1860 socket_map_t socket;
1861 tcp_stream *cur_stream;
1862 struct tcp_send_vars *sndvar;
1863 int ret, to_write, i;
1864
1865 mtcp = GetMTCPManager(mctx);
1866 if (!mtcp) {
1867 errno = EACCES;
1868 return -1;
1869 }
1870
1871 if (sockid < 0 || sockid >= g_config.mos->max_concurrency) {
1872 TRACE_API("Socket id %d out of range.\n", sockid);
1873 errno = EBADF;
1874 return -1;
1875 }
1876
1877 socket = &mtcp->smap[sockid];
1878 if (socket->socktype == MOS_SOCK_UNUSED) {
1879 TRACE_API("Invalid socket id: %d\n", sockid);
1880 errno = EBADF;
1881 return -1;
1882 }
1883
1884 if (socket->socktype != MOS_SOCK_STREAM) {
1885 TRACE_API("Not an end socket. id: %d\n", sockid);
1886 errno = ENOTSOCK;
1887 return -1;
1888 }
1889
1890 cur_stream = socket->stream;
1891 if (!cur_stream ||
1892 !(cur_stream->state == TCP_ST_ESTABLISHED ||
1893 cur_stream->state == TCP_ST_CLOSE_WAIT)) {
1894 errno = ENOTCONN;
1895 return -1;
1896 }
1897
1898 sndvar = cur_stream->sndvar;
1899 SBUF_LOCK(&sndvar->write_lock);
1900
1901 /* write from the vectored buffers */
1902 to_write = 0;
1903 for (i = 0; i < numIOV; i++) {
1904 if (iov[i].iov_len <= 0)
1905 continue;
1906
1907 ret = CopyFromUser(mtcp, cur_stream, iov[i].iov_base, iov[i].iov_len);
1908 if (ret <= 0)
1909 break;
1910
1911 to_write += ret;
1912
1913 if (ret < iov[i].iov_len)
1914 break;
1915 }
1916 SBUF_UNLOCK(&sndvar->write_lock);
1917
1918 if (to_write > 0 && !(sndvar->on_sendq || sndvar->on_send_list)) {
1919 SQ_LOCK(&mtcp->ctx->sendq_lock);
1920 sndvar->on_sendq = TRUE;
1921 StreamEnqueue(mtcp->sendq, cur_stream); /* this always success */
1922 SQ_UNLOCK(&mtcp->ctx->sendq_lock);
1923 mtcp->wakeup_flag = TRUE;
1924 }
1925
1926 if (to_write == 0 && (socket->opts & MTCP_NONBLOCK)) {
1927 to_write = -1;
1928 errno = EAGAIN;
1929 }
1930
1931 /* if there are remaining sending buffer, generate write event */
1932 if (sndvar->snd_wnd > 0) {
1933 if (socket->epoll & MOS_EPOLLOUT && !(socket->epoll & MOS_EPOLLET)) {
1934 AddEpollEvent(mtcp->ep,
1935 USR_SHADOW_EVENT_QUEUE, socket, MOS_EPOLLOUT);
1936 }
1937 }
1938
1939 TRACE_API("Stream %d: mtcp_writev() returning %d\n",
1940 cur_stream->id, to_write);
1941 return to_write;
1942 }
1943 /*----------------------------------------------------------------------------*/
1944 uint32_t
mtcp_get_connection_cnt(mctx_t mctx)1945 mtcp_get_connection_cnt(mctx_t mctx)
1946 {
1947 mtcp_manager_t mtcp;
1948 mtcp = GetMTCPManager(mctx);
1949 if (!mtcp) {
1950 errno = EACCES;
1951 return -1;
1952 }
1953
1954 if (mtcp->num_msp > 0)
1955 return mtcp->flow_cnt / 2;
1956 else
1957 return mtcp->flow_cnt;
1958 }
1959 /*----------------------------------------------------------------------------*/
1960