1cafe7743SAsim Jamshed #include "debug.h"
276404edcSAsim Jamshed #include <string.h>
376404edcSAsim Jamshed
476404edcSAsim Jamshed #include "config.h"
576404edcSAsim Jamshed #include "tcp_stream.h"
676404edcSAsim Jamshed #include "fhash.h"
776404edcSAsim Jamshed #include "tcp.h"
876404edcSAsim Jamshed #include "tcp_in.h"
976404edcSAsim Jamshed #include "tcp_out.h"
1076404edcSAsim Jamshed #include "tcp_ring_buffer.h"
1176404edcSAsim Jamshed #include "tcp_send_buffer.h"
1276404edcSAsim Jamshed #include "eventpoll.h"
1376404edcSAsim Jamshed #include "ip_out.h"
1476404edcSAsim Jamshed #include "timer.h"
1576404edcSAsim Jamshed #include "tcp_rb.h"
1676404edcSAsim Jamshed /*---------------------------------------------------------------------------*/
/*
 * Printable names of the TCP states. TCPStateToString() indexes this table
 * directly with stream->state, so the entries are expected to follow the
 * TCP_ST_* numbering (the enum itself is declared outside this file).
 */
char *state_str[] = {
	"TCP_ST_CLOSED",
	"TCP_ST_LISTEN",
	"TCP_ST_SYN_SENT",
	"TCP_ST_SYN_RCVD",
	"TCP_ST_ESTABLISHED",
	"TCP_ST_FIN_WAIT_1",
	"TCP_ST_FIN_WAIT_2",
	"TCP_ST_CLOSE_WAIT",
	"TCP_ST_CLOSING",
	"TCP_ST_LAST_ACK",
	"TCP_ST_TIME_WAIT",
	"TCP_ST_CLOSED_RSVD"
};
3176404edcSAsim Jamshed /*---------------------------------------------------------------------------*/
/*
 * Printable labels for stream close reasons.
 * NOTE(review): presumably indexed by the stream's close-reason code; the
 * indexing site is not in view here, so keep the order untouched.
 */
char *close_reason_str[] = {
	"NOT_CLOSED", "CLOSE",     "CLOSED",
	"CONN_FAIL",  "CONN_LOST", "RESET",
	"NO_MEM",     "DENIED",    "TIMEDOUT"
};
4376404edcSAsim Jamshed /*---------------------------------------------------------------------------*/
/* Per-thread state of the sequence-number generator; starts at 1. */
static __thread unsigned long next = 1;

/*
 * Linear congruential generator taken from the POSIX.1-2001 rand() example.
 * Advances the calling thread's state and returns a value in [0, 32767]
 * (RAND_MAX is assumed to be 32767).
 */
static int
posix_seq_rand(void)
{
	const unsigned long advanced = next * 1103515245 + 12345;

	next = advanced;
	return (unsigned)(advanced / 65536) % 32768;
}
5276404edcSAsim Jamshed /*---------------------------------------------------------------------------*/
5376404edcSAsim Jamshed void
posix_seq_srand(unsigned seed)5476404edcSAsim Jamshed posix_seq_srand(unsigned seed) {
5576404edcSAsim Jamshed next = seed % 32768;
5676404edcSAsim Jamshed }
5776404edcSAsim Jamshed /*---------------------------------------------------------------------------*/
5876404edcSAsim Jamshed /**
5976404edcSAsim Jamshed * FYI: This is NOT a read-only return!
6076404edcSAsim Jamshed */
6176404edcSAsim Jamshed int
GetFragInfo(socket_map_t sock,int side,void * optval,socklen_t * len)6276404edcSAsim Jamshed GetFragInfo(socket_map_t sock, int side, void *optval, socklen_t *len)
6376404edcSAsim Jamshed {
6476404edcSAsim Jamshed struct tcp_stream *stream;
6576404edcSAsim Jamshed
6676404edcSAsim Jamshed stream = NULL;
6776404edcSAsim Jamshed if (!*len || ( *len % sizeof(tcpfrag_t) != 0))
6876404edcSAsim Jamshed goto frag_info_error;
6976404edcSAsim Jamshed
7076404edcSAsim Jamshed if (side != MOS_SIDE_CLI && side != MOS_SIDE_SVR) {
7176404edcSAsim Jamshed TRACE_ERROR("Invalid side requested!\n");
7276404edcSAsim Jamshed exit(EXIT_FAILURE);
7376404edcSAsim Jamshed return -1;
7476404edcSAsim Jamshed }
7576404edcSAsim Jamshed
7676404edcSAsim Jamshed struct tcp_stream *mstrm = sock->monitor_stream->stream;
7776404edcSAsim Jamshed stream = (side == mstrm->side) ? mstrm : mstrm->pair_stream;
7876404edcSAsim Jamshed
7976404edcSAsim Jamshed if (stream == NULL) goto frag_info_error;
8076404edcSAsim Jamshed
8176404edcSAsim Jamshed /* First check if the tcp ring buffer even has anything */
8276404edcSAsim Jamshed if (stream->rcvvar != NULL &&
8376404edcSAsim Jamshed stream->rcvvar->rcvbuf != NULL) {
8476404edcSAsim Jamshed tcprb_t *rcvbuf = stream->rcvvar->rcvbuf;
8576404edcSAsim Jamshed struct tcp_ring_fragment *out = (struct tcp_ring_fragment *)optval;
8676404edcSAsim Jamshed int const maxout = *len;
8776404edcSAsim Jamshed *len = 0;
8876404edcSAsim Jamshed struct _tcpfrag_t *walk;
8976404edcSAsim Jamshed TAILQ_FOREACH(walk, &rcvbuf->frags, link) {
9076404edcSAsim Jamshed if (*len == maxout)
9176404edcSAsim Jamshed break;
9276404edcSAsim Jamshed out[*len].offset = walk->head;
9376404edcSAsim Jamshed out[*len].len = walk->tail - walk->head;
9476404edcSAsim Jamshed (*len)++;
9576404edcSAsim Jamshed }
9676404edcSAsim Jamshed if (*len != maxout) {
9776404edcSAsim Jamshed /* set zero sentinel */
9876404edcSAsim Jamshed out[*len].offset = 0;
9976404edcSAsim Jamshed out[*len].len = 0;
10076404edcSAsim Jamshed }
10176404edcSAsim Jamshed } else
10276404edcSAsim Jamshed goto frag_info_error;
10376404edcSAsim Jamshed
10476404edcSAsim Jamshed return 0;
10576404edcSAsim Jamshed
10676404edcSAsim Jamshed frag_info_error:
10776404edcSAsim Jamshed optval = NULL;
10876404edcSAsim Jamshed *len = 0;
10976404edcSAsim Jamshed return -1;
11076404edcSAsim Jamshed }
11176404edcSAsim Jamshed /*---------------------------------------------------------------------------*/
11276404edcSAsim Jamshed /**
11376404edcSAsim Jamshed * Comments later...
11476404edcSAsim Jamshed */
11576404edcSAsim Jamshed int
GetBufInfo(socket_map_t sock,int side,void * optval,socklen_t * len)11676404edcSAsim Jamshed GetBufInfo(socket_map_t sock, int side, void *optval, socklen_t *len)
11776404edcSAsim Jamshed {
11876404edcSAsim Jamshed struct tcp_stream *stream;
11976404edcSAsim Jamshed struct tcp_buf_info *tbi;
12076404edcSAsim Jamshed
12176404edcSAsim Jamshed tbi = (struct tcp_buf_info *)optval;
12276404edcSAsim Jamshed memset(tbi, 0, sizeof(struct tcp_buf_info));
12376404edcSAsim Jamshed stream = NULL;
12476404edcSAsim Jamshed
12576404edcSAsim Jamshed if (*len != sizeof(struct tcp_buf_info)) {
12676404edcSAsim Jamshed errno = EINVAL;
12776404edcSAsim Jamshed goto buf_info_error;
12876404edcSAsim Jamshed }
12976404edcSAsim Jamshed
13076404edcSAsim Jamshed if (side != MOS_SIDE_CLI && side != MOS_SIDE_SVR) {
13176404edcSAsim Jamshed TRACE_ERROR("Invalid side requested!\n");
13276404edcSAsim Jamshed errno = EINVAL;
13376404edcSAsim Jamshed goto buf_info_error;
13476404edcSAsim Jamshed }
13576404edcSAsim Jamshed
13676404edcSAsim Jamshed struct tcp_stream *mstrm = sock->monitor_stream->stream;
13776404edcSAsim Jamshed stream = (side == mstrm->side) ? mstrm : mstrm->pair_stream;
13876404edcSAsim Jamshed
13976404edcSAsim Jamshed /* First check if the tcp ring buffer even has anything */
14076404edcSAsim Jamshed if (stream != NULL &&
14176404edcSAsim Jamshed stream->rcvvar != NULL &&
14276404edcSAsim Jamshed stream->rcvvar->rcvbuf != NULL) {
14376404edcSAsim Jamshed tcprb_t *rcvbuf = stream->rcvvar->rcvbuf;
14476404edcSAsim Jamshed tcpfrag_t *f = TAILQ_LAST(&rcvbuf->frags, flist);
14576404edcSAsim Jamshed tbi->tcpbi_init_seq = stream->rcvvar->irs + 1;
14676404edcSAsim Jamshed tbi->tcpbi_last_byte_read = rcvbuf->pile;
14776404edcSAsim Jamshed tbi->tcpbi_next_byte_expected = rcvbuf->pile + tcprb_cflen(rcvbuf);
14876404edcSAsim Jamshed tbi->tcpbi_last_byte_received = (f ? f->tail : rcvbuf->head);
14976404edcSAsim Jamshed } else {
15076404edcSAsim Jamshed errno = ENODATA;
15176404edcSAsim Jamshed goto buf_info_error;
15276404edcSAsim Jamshed }
15376404edcSAsim Jamshed
15476404edcSAsim Jamshed return 0;
15576404edcSAsim Jamshed
15676404edcSAsim Jamshed buf_info_error:
15776404edcSAsim Jamshed optval = NULL;
15876404edcSAsim Jamshed *len = 0;
15976404edcSAsim Jamshed return -1;
16076404edcSAsim Jamshed }
16176404edcSAsim Jamshed /*---------------------------------------------------------------------------*/
16276404edcSAsim Jamshed int
DisableBuf(socket_map_t sock,int side)16376404edcSAsim Jamshed DisableBuf(socket_map_t sock, int side)
16476404edcSAsim Jamshed {
165cafe7743SAsim Jamshed #ifdef DBGMSG
1663b6b9ba6SAsim Jamshed __PREPARE_DBGLOGGING();
167cafe7743SAsim Jamshed #endif
16876404edcSAsim Jamshed struct tcp_stream *stream;
16976404edcSAsim Jamshed int rc = 0;
17076404edcSAsim Jamshed
17176404edcSAsim Jamshed switch (sock->socktype) {
17276404edcSAsim Jamshed case MOS_SOCK_MONITOR_STREAM:
17376404edcSAsim Jamshed if (side == MOS_SIDE_CLI)
17476404edcSAsim Jamshed sock->monitor_listener->client_buf_mgmt = 0;
17576404edcSAsim Jamshed else if (side == MOS_SIDE_SVR)
17676404edcSAsim Jamshed sock->monitor_listener->server_buf_mgmt = 0;
17776404edcSAsim Jamshed else {
17876404edcSAsim Jamshed assert(0);
17976404edcSAsim Jamshed TRACE_DBG("Invalid side!\n");
18076404edcSAsim Jamshed rc = -1;
18176404edcSAsim Jamshed }
18276404edcSAsim Jamshed break;
18376404edcSAsim Jamshed case MOS_SOCK_MONITOR_STREAM_ACTIVE:
18476404edcSAsim Jamshed stream = sock->monitor_stream->stream;
18576404edcSAsim Jamshed if (stream->side != side)
18676404edcSAsim Jamshed stream = stream->pair_stream;
18776404edcSAsim Jamshed assert(stream->side == side);
18876404edcSAsim Jamshed stream->buffer_mgmt = 0;
18976404edcSAsim Jamshed break;
19076404edcSAsim Jamshed default:
19176404edcSAsim Jamshed assert(0);
19276404edcSAsim Jamshed TRACE_DBG("Can't disable buf for invalid socket!\n");
19376404edcSAsim Jamshed rc = -1;
19476404edcSAsim Jamshed }
19576404edcSAsim Jamshed
19676404edcSAsim Jamshed return rc;
19776404edcSAsim Jamshed }
19876404edcSAsim Jamshed /*---------------------------------------------------------------------------*/
19976404edcSAsim Jamshed int
GetLastTimestamp(struct tcp_stream * stream,uint32_t * usecs,socklen_t * len)20076404edcSAsim Jamshed GetLastTimestamp(struct tcp_stream *stream, uint32_t *usecs, socklen_t *len)
20176404edcSAsim Jamshed {
202cafe7743SAsim Jamshed #ifdef DBGMSG
2033b6b9ba6SAsim Jamshed __PREPARE_DBGLOGGING();
204cafe7743SAsim Jamshed #endif
20576404edcSAsim Jamshed if (*len < sizeof(uint32_t)) {
20676404edcSAsim Jamshed TRACE_DBG("Size passed is not >= sizeof(uint32_t)!\n");
20776404edcSAsim Jamshed return -1;
20876404edcSAsim Jamshed }
20976404edcSAsim Jamshed
21076404edcSAsim Jamshed *usecs = (stream->last_active_ts >
21176404edcSAsim Jamshed stream->pair_stream->last_active_ts)
21276404edcSAsim Jamshed ?
21376404edcSAsim Jamshed TS_TO_USEC(stream->last_active_ts) :
21476404edcSAsim Jamshed TS_TO_USEC(stream->pair_stream->last_active_ts);
21576404edcSAsim Jamshed
21676404edcSAsim Jamshed return 0;
21776404edcSAsim Jamshed }
21876404edcSAsim Jamshed /*---------------------------------------------------------------------------*/
21976404edcSAsim Jamshed inline int
GetTCPState(struct tcp_stream * stream,int side,void * optval,socklen_t * optlen)22076404edcSAsim Jamshed GetTCPState(struct tcp_stream *stream, int side,
22176404edcSAsim Jamshed void *optval, socklen_t *optlen)
22276404edcSAsim Jamshed {
22376404edcSAsim Jamshed if (!stream || !(stream = (side == stream->side) ? stream : stream->pair_stream))
22476404edcSAsim Jamshed return -1;
22576404edcSAsim Jamshed *(int *)optval = (int)((stream->state == TCP_ST_CLOSED_RSVD) ?
22676404edcSAsim Jamshed TCP_ST_CLOSED : stream->state);
22776404edcSAsim Jamshed return 0;
22876404edcSAsim Jamshed }
22976404edcSAsim Jamshed /*---------------------------------------------------------------------------*/
23076404edcSAsim Jamshed inline char *
TCPStateToString(const tcp_stream * stream)23176404edcSAsim Jamshed TCPStateToString(const tcp_stream *stream)
23276404edcSAsim Jamshed {
23376404edcSAsim Jamshed return (stream) ? state_str[stream->state] : NULL;
23476404edcSAsim Jamshed }
23576404edcSAsim Jamshed /*---------------------------------------------------------------------------*/
23676404edcSAsim Jamshed inline void
RaiseReadEvent(mtcp_manager_t mtcp,tcp_stream * stream)23776404edcSAsim Jamshed RaiseReadEvent(mtcp_manager_t mtcp, tcp_stream *stream)
23876404edcSAsim Jamshed {
23976404edcSAsim Jamshed struct tcp_recv_vars *rcvvar;
24076404edcSAsim Jamshed
24176404edcSAsim Jamshed rcvvar = stream->rcvvar;
24276404edcSAsim Jamshed
24376404edcSAsim Jamshed if (HAS_STREAM_TYPE(stream, MOS_SOCK_STREAM)) {
24476404edcSAsim Jamshed if (stream->socket && (stream->socket->epoll & MOS_EPOLLIN))
24576404edcSAsim Jamshed AddEpollEvent(mtcp->ep, MOS_EVENT_QUEUE, stream->socket, MOS_EPOLLIN);
24676404edcSAsim Jamshed } else if (rcvvar->rcvbuf && tcprb_cflen(rcvvar->rcvbuf) > 0) {
24776404edcSAsim Jamshed /*
24876404edcSAsim Jamshed * in case it is a monitoring socket, queue up the read events
24976404edcSAsim Jamshed * in the event_queue of only if the tcp_stream hasn't already
25076404edcSAsim Jamshed * been registered in the event queue
25176404edcSAsim Jamshed */
25276404edcSAsim Jamshed int index;
25376404edcSAsim Jamshed struct event_queue *eq;
25476404edcSAsim Jamshed struct socket_map *walk;
25576404edcSAsim Jamshed
25676404edcSAsim Jamshed SOCKQ_FOREACH_START(walk, &stream->msocks) {
25776404edcSAsim Jamshed assert(walk->socktype == MOS_SOCK_MONITOR_STREAM_ACTIVE);
25876404edcSAsim Jamshed eq = walk->monitor_stream->monitor_listener->eq;
25976404edcSAsim Jamshed
26076404edcSAsim Jamshed /* if it already has read data register... then skip this step */
26176404edcSAsim Jamshed if (stream->actions & MOS_ACT_READ_DATA)
26276404edcSAsim Jamshed return;
26376404edcSAsim Jamshed if (eq->num_events >= eq->size) {
26476404edcSAsim Jamshed TRACE_ERROR("Exceeded epoll event queue! num_events: %d, "
26576404edcSAsim Jamshed "size: %d\n", eq->num_events, eq->size);
26676404edcSAsim Jamshed return;
26776404edcSAsim Jamshed }
26876404edcSAsim Jamshed
26976404edcSAsim Jamshed index = eq->end++;
27076404edcSAsim Jamshed eq->events[index].ev.events = MOS_EPOLLIN;
27176404edcSAsim Jamshed eq->events[index].ev.data.ptr = (void *)stream;
27276404edcSAsim Jamshed
27376404edcSAsim Jamshed if (eq->end >= eq->size) {
27476404edcSAsim Jamshed eq->end = 0;
27576404edcSAsim Jamshed }
27676404edcSAsim Jamshed eq->num_events++;
27776404edcSAsim Jamshed stream->actions |= MOS_ACT_READ_DATA;
27876404edcSAsim Jamshed } SOCKQ_FOREACH_END;
27976404edcSAsim Jamshed } else {
28076404edcSAsim Jamshed TRACE_EPOLL("Stream %d: Raising read without a socket!\n", stream->id);
28176404edcSAsim Jamshed }
28276404edcSAsim Jamshed }
28376404edcSAsim Jamshed /*---------------------------------------------------------------------------*/
28476404edcSAsim Jamshed inline void
RaiseWriteEvent(mtcp_manager_t mtcp,tcp_stream * stream)28576404edcSAsim Jamshed RaiseWriteEvent(mtcp_manager_t mtcp, tcp_stream *stream)
28676404edcSAsim Jamshed {
28776404edcSAsim Jamshed if (stream->socket) {
28876404edcSAsim Jamshed if (stream->socket->epoll & MOS_EPOLLOUT) {
28976404edcSAsim Jamshed AddEpollEvent(mtcp->ep,
29076404edcSAsim Jamshed MOS_EVENT_QUEUE, stream->socket, MOS_EPOLLOUT);
29176404edcSAsim Jamshed }
29276404edcSAsim Jamshed } else {
29376404edcSAsim Jamshed TRACE_EPOLL("Stream %d: Raising write without a socket!\n", stream->id);
29476404edcSAsim Jamshed }
29576404edcSAsim Jamshed }
29676404edcSAsim Jamshed /*---------------------------------------------------------------------------*/
29776404edcSAsim Jamshed inline void
RaiseCloseEvent(mtcp_manager_t mtcp,tcp_stream * stream)29876404edcSAsim Jamshed RaiseCloseEvent(mtcp_manager_t mtcp, tcp_stream *stream)
29976404edcSAsim Jamshed {
30076404edcSAsim Jamshed if (stream->socket) {
30176404edcSAsim Jamshed if (stream->socket->epoll & MOS_EPOLLRDHUP) {
30276404edcSAsim Jamshed AddEpollEvent(mtcp->ep,
30376404edcSAsim Jamshed MOS_EVENT_QUEUE, stream->socket, MOS_EPOLLRDHUP);
30476404edcSAsim Jamshed } else if (stream->socket->epoll & MOS_EPOLLIN) {
30576404edcSAsim Jamshed AddEpollEvent(mtcp->ep,
30676404edcSAsim Jamshed MOS_EVENT_QUEUE, stream->socket, MOS_EPOLLIN);
30776404edcSAsim Jamshed }
30876404edcSAsim Jamshed } else {
30976404edcSAsim Jamshed TRACE_EPOLL("Stream %d: Raising close without a socket!\n", stream->id);
31076404edcSAsim Jamshed }
31176404edcSAsim Jamshed }
31276404edcSAsim Jamshed /*---------------------------------------------------------------------------*/
31376404edcSAsim Jamshed inline int
RaiseErrorEvent(mtcp_manager_t mtcp,tcp_stream * stream)31476404edcSAsim Jamshed RaiseErrorEvent(mtcp_manager_t mtcp, tcp_stream *stream)
31576404edcSAsim Jamshed {
31676404edcSAsim Jamshed if (stream->socket) {
31776404edcSAsim Jamshed if (stream->socket->epoll & MOS_EPOLLERR) {
31876404edcSAsim Jamshed /* passing closing reason for error notification */
31976404edcSAsim Jamshed return AddEpollEvent(mtcp->ep,
32076404edcSAsim Jamshed MOS_EVENT_QUEUE, stream->socket, MOS_EPOLLERR);
32176404edcSAsim Jamshed }
32276404edcSAsim Jamshed } else {
32376404edcSAsim Jamshed TRACE_EPOLL("Stream %d: Raising error without a socket!\n", stream->id);
32476404edcSAsim Jamshed }
32576404edcSAsim Jamshed return -1;
32676404edcSAsim Jamshed }
32776404edcSAsim Jamshed /*----------------------------------------------------------------------------*/
32876404edcSAsim Jamshed int
AddMonitorStreamSockets(mtcp_manager_t mtcp,struct tcp_stream * stream)32976404edcSAsim Jamshed AddMonitorStreamSockets(mtcp_manager_t mtcp, struct tcp_stream *stream)
33076404edcSAsim Jamshed {
33176404edcSAsim Jamshed struct mtcp_context mctx;
33276404edcSAsim Jamshed int socktype;
33376404edcSAsim Jamshed
33476404edcSAsim Jamshed mctx.cpu = mtcp->ctx->cpu;
33576404edcSAsim Jamshed struct mon_listener *walk;
33676404edcSAsim Jamshed
33776404edcSAsim Jamshed // traverse the passive socket's list
33876404edcSAsim Jamshed TAILQ_FOREACH(walk, &mtcp->monitors, link) {
33976404edcSAsim Jamshed socktype = walk->socket->socktype;
34076404edcSAsim Jamshed
34176404edcSAsim Jamshed if (socktype != MOS_SOCK_MONITOR_STREAM)
34276404edcSAsim Jamshed continue;
34376404edcSAsim Jamshed
34476404edcSAsim Jamshed /* mtcp_bind_monitor_filter()
34576404edcSAsim Jamshed * - create an monitor active socket only for the filter-passed flows
34676404edcSAsim Jamshed * - we use the result (= tag) from DetectStreamType() to avoid
34776404edcSAsim Jamshed * evaluating the same BPF filter twice */
34876404edcSAsim Jamshed if (!walk->is_stream_syn_filter_hit) {
34976404edcSAsim Jamshed continue;
35076404edcSAsim Jamshed }
35176404edcSAsim Jamshed
35276404edcSAsim Jamshed struct socket_map *s =
35376404edcSAsim Jamshed AllocateSocket(&mctx, MOS_SOCK_MONITOR_STREAM_ACTIVE);
35476404edcSAsim Jamshed if (!s)
35576404edcSAsim Jamshed return -1;
35676404edcSAsim Jamshed
35776404edcSAsim Jamshed s->monitor_stream->socket = s;
35876404edcSAsim Jamshed s->monitor_stream->stream = stream;
35976404edcSAsim Jamshed s->monitor_stream->monitor_listener = walk;
36076404edcSAsim Jamshed s->monitor_stream->client_buf_mgmt = walk->client_buf_mgmt;
36176404edcSAsim Jamshed s->monitor_stream->server_buf_mgmt = walk->server_buf_mgmt;
36276404edcSAsim Jamshed s->monitor_stream->client_mon = walk->client_mon;
36376404edcSAsim Jamshed s->monitor_stream->server_mon = walk->server_mon;
36476404edcSAsim Jamshed #ifdef NEWEV
36576404edcSAsim Jamshed s->monitor_stream->stree_dontcare =
36676404edcSAsim Jamshed s->monitor_stream->monitor_listener->stree_dontcare;
36776404edcSAsim Jamshed s->monitor_stream->stree_pre_rcv =
36876404edcSAsim Jamshed s->monitor_stream->monitor_listener->stree_pre_rcv;
36976404edcSAsim Jamshed s->monitor_stream->stree_post_snd =
37076404edcSAsim Jamshed s->monitor_stream->monitor_listener->stree_post_snd;
37176404edcSAsim Jamshed if (s->monitor_stream->stree_dontcare)
37276404edcSAsim Jamshed stree_inc_ref(s->monitor_stream->stree_dontcare);
37376404edcSAsim Jamshed if (s->monitor_stream->stree_pre_rcv)
37476404edcSAsim Jamshed stree_inc_ref(s->monitor_stream->stree_pre_rcv);
37576404edcSAsim Jamshed if (s->monitor_stream->stree_post_snd)
37676404edcSAsim Jamshed stree_inc_ref(s->monitor_stream->stree_post_snd);
37776404edcSAsim Jamshed #else
37876404edcSAsim Jamshed InitEvP(&s->monitor_stream->dontcare_evp,
37976404edcSAsim Jamshed &walk->dontcare_evb);
38076404edcSAsim Jamshed InitEvP(&s->monitor_stream->pre_tcp_evp,
38176404edcSAsim Jamshed &walk->pre_tcp_evb);
38276404edcSAsim Jamshed InitEvP(&s->monitor_stream->post_tcp_evp,
38376404edcSAsim Jamshed &walk->post_tcp_evb);
38476404edcSAsim Jamshed #endif
38576404edcSAsim Jamshed
38676404edcSAsim Jamshed SOCKQ_INSERT_TAIL(&stream->msocks, s);
38776404edcSAsim Jamshed }
38876404edcSAsim Jamshed
38976404edcSAsim Jamshed return 0;
39076404edcSAsim Jamshed }
39176404edcSAsim Jamshed /*----------------------------------------------------------------------------*/
39276404edcSAsim Jamshed int
DestroyMonitorStreamSocket(mtcp_manager_t mtcp,socket_map_t msock)39376404edcSAsim Jamshed DestroyMonitorStreamSocket(mtcp_manager_t mtcp, socket_map_t msock)
39476404edcSAsim Jamshed {
39576404edcSAsim Jamshed struct mtcp_context mctx;
39676404edcSAsim Jamshed int socktype, sockid, rc;
39776404edcSAsim Jamshed
39876404edcSAsim Jamshed if (msock == NULL) {
39976404edcSAsim Jamshed TRACE_DBG("Stream socket does not exist!\n");
40076404edcSAsim Jamshed /* exit(-1); */
40176404edcSAsim Jamshed return 0;
40276404edcSAsim Jamshed }
40376404edcSAsim Jamshed
40476404edcSAsim Jamshed rc = 0;
40576404edcSAsim Jamshed mctx.cpu = mtcp->ctx->cpu;
40676404edcSAsim Jamshed socktype = msock->socktype;
40776404edcSAsim Jamshed sockid = msock->id;
40876404edcSAsim Jamshed
40976404edcSAsim Jamshed switch (socktype) {
41076404edcSAsim Jamshed case MOS_SOCK_MONITOR_STREAM_ACTIVE:
41176404edcSAsim Jamshed FreeSocket(&mctx, sockid, socktype);
41276404edcSAsim Jamshed break;
41376404edcSAsim Jamshed case MOS_SOCK_MONITOR_RAW:
41476404edcSAsim Jamshed /* do nothing since all raw sockets point to the same socket */
41576404edcSAsim Jamshed break;
41676404edcSAsim Jamshed default:
41776404edcSAsim Jamshed TRACE_DBG("Trying to destroy a monitor socket for an unsupported type!\n");
41876404edcSAsim Jamshed rc = -1;
41976404edcSAsim Jamshed /* exit(-1); */
42076404edcSAsim Jamshed break;
42176404edcSAsim Jamshed }
42276404edcSAsim Jamshed
42376404edcSAsim Jamshed return rc;
42476404edcSAsim Jamshed }
42576404edcSAsim Jamshed /*---------------------------------------------------------------------------*/
42676404edcSAsim Jamshed tcp_stream *
CreateTCPStream(mtcp_manager_t mtcp,socket_map_t socket,int type,uint32_t saddr,uint16_t sport,uint32_t daddr,uint16_t dport,unsigned int * hash)42776404edcSAsim Jamshed CreateTCPStream(mtcp_manager_t mtcp, socket_map_t socket, int type,
42876404edcSAsim Jamshed uint32_t saddr, uint16_t sport, uint32_t daddr, uint16_t dport,
42976404edcSAsim Jamshed unsigned int *hash)
43076404edcSAsim Jamshed {
43176404edcSAsim Jamshed tcp_stream *stream = NULL;
43276404edcSAsim Jamshed int ret;
43376404edcSAsim Jamshed /* stand-alone monitor does not need this since it is single-threaded */
43476404edcSAsim Jamshed bool flow_lock = type & STREAM_TYPE(MOS_SOCK_STREAM);
43576404edcSAsim Jamshed //bool flow_lock = false;
43676404edcSAsim Jamshed
43776404edcSAsim Jamshed if (flow_lock)
43876404edcSAsim Jamshed pthread_mutex_lock(&mtcp->ctx->flow_pool_lock);
43976404edcSAsim Jamshed
44076404edcSAsim Jamshed stream = (tcp_stream *)MPAllocateChunk(mtcp->flow_pool);
44176404edcSAsim Jamshed if (!stream) {
44276404edcSAsim Jamshed TRACE_ERROR("Cannot allocate memory for the stream. "
44376404edcSAsim Jamshed "g_config.mos->max_concurrency: %d, concurrent: %u\n",
44476404edcSAsim Jamshed g_config.mos->max_concurrency, mtcp->flow_cnt);
44576404edcSAsim Jamshed if (flow_lock)
44676404edcSAsim Jamshed pthread_mutex_unlock(&mtcp->ctx->flow_pool_lock);
44776404edcSAsim Jamshed return NULL;
44876404edcSAsim Jamshed }
44976404edcSAsim Jamshed memset(stream, 0, sizeof(tcp_stream));
45076404edcSAsim Jamshed
45176404edcSAsim Jamshed stream->rcvvar = (struct tcp_recv_vars *)MPAllocateChunk(mtcp->rv_pool);
45276404edcSAsim Jamshed if (!stream->rcvvar) {
45376404edcSAsim Jamshed MPFreeChunk(mtcp->flow_pool, stream);
45476404edcSAsim Jamshed if (flow_lock)
45576404edcSAsim Jamshed pthread_mutex_unlock(&mtcp->ctx->flow_pool_lock);
45676404edcSAsim Jamshed return NULL;
45776404edcSAsim Jamshed }
45876404edcSAsim Jamshed memset(stream->rcvvar, 0, sizeof(struct tcp_recv_vars));
45976404edcSAsim Jamshed
46076404edcSAsim Jamshed /* stand-alone monitor does not need to do this */
46176404edcSAsim Jamshed stream->sndvar = (struct tcp_send_vars *)MPAllocateChunk(mtcp->sv_pool);
46276404edcSAsim Jamshed if (!stream->sndvar) {
46376404edcSAsim Jamshed MPFreeChunk(mtcp->rv_pool, stream->rcvvar);
46476404edcSAsim Jamshed MPFreeChunk(mtcp->flow_pool, stream);
46576404edcSAsim Jamshed if (flow_lock)
46676404edcSAsim Jamshed pthread_mutex_unlock(&mtcp->ctx->flow_pool_lock);
46776404edcSAsim Jamshed return NULL;
46876404edcSAsim Jamshed }
46976404edcSAsim Jamshed //if (HAS_STREAM_TYPE(stream, MOS_SOCK_STREAM))
47076404edcSAsim Jamshed memset(stream->sndvar, 0, sizeof(struct tcp_send_vars));
47176404edcSAsim Jamshed
47276404edcSAsim Jamshed stream->id = mtcp->g_id++;
47376404edcSAsim Jamshed stream->saddr = saddr;
47476404edcSAsim Jamshed stream->sport = sport;
47576404edcSAsim Jamshed stream->daddr = daddr;
47676404edcSAsim Jamshed stream->dport = dport;
47776404edcSAsim Jamshed
47876404edcSAsim Jamshed ret = HTInsert(mtcp->tcp_flow_table, stream, hash);
47976404edcSAsim Jamshed if (ret < 0) {
48076404edcSAsim Jamshed TRACE_ERROR("Stream %d: "
48176404edcSAsim Jamshed "Failed to insert the stream into hash table.\n", stream->id);
48276404edcSAsim Jamshed MPFreeChunk(mtcp->flow_pool, stream);
48376404edcSAsim Jamshed if (flow_lock)
48476404edcSAsim Jamshed pthread_mutex_unlock(&mtcp->ctx->flow_pool_lock);
48576404edcSAsim Jamshed return NULL;
48676404edcSAsim Jamshed }
48776404edcSAsim Jamshed stream->on_hash_table = TRUE;
48876404edcSAsim Jamshed mtcp->flow_cnt++;
48976404edcSAsim Jamshed
49076404edcSAsim Jamshed SOCKQ_INIT(&stream->msocks);
49176404edcSAsim Jamshed
49276404edcSAsim Jamshed /*
49376404edcSAsim Jamshed * if an embedded monitor is attached...
49476404edcSAsim Jamshed * create monitor stream socket now!
49576404edcSAsim Jamshed * If socket type is raw.. then don't create it
49676404edcSAsim Jamshed */
49776404edcSAsim Jamshed if ((mtcp->num_msp > 0) &&
49876404edcSAsim Jamshed (type & STREAM_TYPE(MOS_SOCK_MONITOR_STREAM_ACTIVE)))
49976404edcSAsim Jamshed if (AddMonitorStreamSockets(mtcp, stream) < 0)
50076404edcSAsim Jamshed TRACE_DBG("Could not create monitor stream socket!\n");
50176404edcSAsim Jamshed
50276404edcSAsim Jamshed if (flow_lock)
50376404edcSAsim Jamshed pthread_mutex_unlock(&mtcp->ctx->flow_pool_lock);
50476404edcSAsim Jamshed
50576404edcSAsim Jamshed if (socket) {
50676404edcSAsim Jamshed stream->socket = socket;
50776404edcSAsim Jamshed socket->stream = stream;
50876404edcSAsim Jamshed }
50976404edcSAsim Jamshed
51076404edcSAsim Jamshed stream->stream_type = type;
51176404edcSAsim Jamshed stream->state = TCP_ST_LISTEN;
51276404edcSAsim Jamshed /* This is handled by core.c, tcp_in.c & tcp_out.c */
51376404edcSAsim Jamshed /* stream->cb_events |= MOS_ON_TCP_STATE_CHANGE; */
51476404edcSAsim Jamshed
51576404edcSAsim Jamshed stream->on_rto_idx = -1;
51676404edcSAsim Jamshed
51776404edcSAsim Jamshed /* stand-alone monitor does not need to do this */
51876404edcSAsim Jamshed stream->sndvar->mss = TCP_DEFAULT_MSS;
51976404edcSAsim Jamshed stream->sndvar->wscale_mine = TCP_DEFAULT_WSCALE;
52076404edcSAsim Jamshed stream->sndvar->wscale_peer = 0;
52176404edcSAsim Jamshed
52276404edcSAsim Jamshed if (HAS_STREAM_TYPE(stream, MOS_SOCK_STREAM)) {
52376404edcSAsim Jamshed stream->sndvar->ip_id = 0;
52476404edcSAsim Jamshed stream->sndvar->nif_out = GetOutputInterface(stream->daddr);
52576404edcSAsim Jamshed
52676404edcSAsim Jamshed stream->sndvar->iss = posix_seq_rand() % TCP_MAX_SEQ;
52776404edcSAsim Jamshed //stream->sndvar->iss = 0;
52876404edcSAsim Jamshed stream->snd_nxt = stream->sndvar->iss;
52976404edcSAsim Jamshed stream->sndvar->snd_una = stream->sndvar->iss;
53076404edcSAsim Jamshed stream->sndvar->snd_wnd = g_config.mos->wmem_size;
53176404edcSAsim Jamshed stream->sndvar->rto = TCP_INITIAL_RTO;
53276404edcSAsim Jamshed #if USE_SPIN_LOCK
53376404edcSAsim Jamshed if (pthread_spin_init(&stream->sndvar->write_lock, PTHREAD_PROCESS_PRIVATE)) {
53476404edcSAsim Jamshed perror("pthread_spin_init of write_lock");
53576404edcSAsim Jamshed pthread_spin_destroy(&stream->rcvvar->read_lock);
53676404edcSAsim Jamshed #else
53776404edcSAsim Jamshed if (pthread_mutex_init(&stream->sndvar->write_lock, NULL)) {
53876404edcSAsim Jamshed perror("pthread_mutex_init of write_lock");
53976404edcSAsim Jamshed pthread_mutex_destroy(&stream->rcvvar->read_lock);
54076404edcSAsim Jamshed #endif
54176404edcSAsim Jamshed return NULL;
54276404edcSAsim Jamshed }
54376404edcSAsim Jamshed }
54476404edcSAsim Jamshed stream->rcvvar->irs = 0;
54576404edcSAsim Jamshed
54676404edcSAsim Jamshed stream->rcv_nxt = 0;
54776404edcSAsim Jamshed stream->rcvvar->rcv_wnd = TCP_INITIAL_WINDOW;
54876404edcSAsim Jamshed
54976404edcSAsim Jamshed stream->rcvvar->snd_wl1 = stream->rcvvar->irs - 1;
55076404edcSAsim Jamshed
55176404edcSAsim Jamshed stream->buffer_mgmt = BUFMGMT_FULL;
55276404edcSAsim Jamshed
55376404edcSAsim Jamshed /* needs state update by default */
55476404edcSAsim Jamshed stream->status_mgmt = 1;
55576404edcSAsim Jamshed
55676404edcSAsim Jamshed #if USE_SPIN_LOCK
55776404edcSAsim Jamshed if (pthread_spin_init(&stream->rcvvar->read_lock, PTHREAD_PROCESS_PRIVATE)) {
55876404edcSAsim Jamshed #else
55976404edcSAsim Jamshed if (pthread_mutex_init(&stream->rcvvar->read_lock, NULL)) {
56076404edcSAsim Jamshed #endif
56176404edcSAsim Jamshed perror("pthread_mutex_init of read_lock");
56276404edcSAsim Jamshed return NULL;
56376404edcSAsim Jamshed }
56476404edcSAsim Jamshed
56576404edcSAsim Jamshed #ifdef STREAM
56676404edcSAsim Jamshed uint8_t *sa;
56776404edcSAsim Jamshed uint8_t *da;
56876404edcSAsim Jamshed
56976404edcSAsim Jamshed sa = (uint8_t *)&stream->saddr;
57076404edcSAsim Jamshed da = (uint8_t *)&stream->daddr;
57176404edcSAsim Jamshed TRACE_STREAM("CREATED NEW TCP STREAM %d: "
57276404edcSAsim Jamshed "%u.%u.%u.%u(%d) -> %u.%u.%u.%u(%d) (ISS: %u)\n", stream->id,
57376404edcSAsim Jamshed sa[0], sa[1], sa[2], sa[3], ntohs(stream->sport),
57476404edcSAsim Jamshed da[0], da[1], da[2], da[3], ntohs(stream->dport),
57576404edcSAsim Jamshed stream->sndvar->iss);
57676404edcSAsim Jamshed #endif
57776404edcSAsim Jamshed
57876404edcSAsim Jamshed return stream;
57976404edcSAsim Jamshed }
58076404edcSAsim Jamshed /*----------------------------------------------------------------------------*/
58176404edcSAsim Jamshed inline tcp_stream *
58276404edcSAsim Jamshed CreateDualTCPStream(mtcp_manager_t mtcp, socket_map_t socket, int type, uint32_t saddr,
58376404edcSAsim Jamshed uint16_t sport, uint32_t daddr, uint16_t dport, unsigned int *hash)
58476404edcSAsim Jamshed {
58576404edcSAsim Jamshed tcp_stream *cur_stream, *paired_stream;
58676404edcSAsim Jamshed struct socket_map *walk;
58776404edcSAsim Jamshed
58876404edcSAsim Jamshed cur_stream = CreateTCPStream(mtcp, socket, type,
58976404edcSAsim Jamshed saddr, sport, daddr, dport, hash);
59076404edcSAsim Jamshed if (cur_stream == NULL) {
59176404edcSAsim Jamshed TRACE_ERROR("Can't create tcp_stream!\n");
59276404edcSAsim Jamshed return NULL;
59376404edcSAsim Jamshed }
59476404edcSAsim Jamshed
59576404edcSAsim Jamshed paired_stream = CreateTCPStream(mtcp, NULL, MOS_SOCK_UNUSED,
59676404edcSAsim Jamshed daddr, dport, saddr, sport, hash);
59776404edcSAsim Jamshed if (paired_stream == NULL) {
59876404edcSAsim Jamshed DestroyTCPStream(mtcp, cur_stream);
59976404edcSAsim Jamshed TRACE_ERROR("Can't create tcp_stream!\n");
60076404edcSAsim Jamshed return NULL;
60176404edcSAsim Jamshed }
60276404edcSAsim Jamshed
60376404edcSAsim Jamshed cur_stream->pair_stream = paired_stream;
60476404edcSAsim Jamshed paired_stream->pair_stream = cur_stream;
60576404edcSAsim Jamshed paired_stream->socket = socket;
60676404edcSAsim Jamshed SOCKQ_FOREACH_START(walk, &cur_stream->msocks) {
60776404edcSAsim Jamshed SOCKQ_INSERT_TAIL(&paired_stream->msocks, walk);
60876404edcSAsim Jamshed } SOCKQ_FOREACH_END;
60976404edcSAsim Jamshed paired_stream->stream_type = STREAM_TYPE(MOS_SOCK_MONITOR_STREAM_ACTIVE);
61076404edcSAsim Jamshed
61176404edcSAsim Jamshed return cur_stream;
61276404edcSAsim Jamshed }
61376404edcSAsim Jamshed /*----------------------------------------------------------------------------*/
61476404edcSAsim Jamshed inline tcp_stream *
61576404edcSAsim Jamshed CreateClientTCPStream(mtcp_manager_t mtcp, socket_map_t socket, int type,
61676404edcSAsim Jamshed uint32_t saddr, uint16_t sport, uint32_t daddr, uint16_t dport,
61776404edcSAsim Jamshed unsigned int *hash)
61876404edcSAsim Jamshed {
61976404edcSAsim Jamshed tcp_stream *cs;
62076404edcSAsim Jamshed struct socket_map *w;
62176404edcSAsim Jamshed
62276404edcSAsim Jamshed cs = CreateTCPStream(mtcp, socket, type, daddr, dport, saddr, sport, hash);
62376404edcSAsim Jamshed if (cs == NULL) {
62476404edcSAsim Jamshed TRACE_ERROR("Can't create tcp_stream!\n");
62576404edcSAsim Jamshed return NULL;
62676404edcSAsim Jamshed }
62776404edcSAsim Jamshed
62876404edcSAsim Jamshed cs->side = MOS_SIDE_CLI;
62976404edcSAsim Jamshed cs->pair_stream = NULL;
63076404edcSAsim Jamshed
63176404edcSAsim Jamshed /* if buffer management is off, then disable
63276404edcSAsim Jamshed * monitoring tcp ring of either streams (only if stream
63376404edcSAsim Jamshed * is just monitor stream active)
63476404edcSAsim Jamshed */
63576404edcSAsim Jamshed if (IS_STREAM_TYPE(cs, MOS_SOCK_MONITOR_STREAM_ACTIVE)) {
63676404edcSAsim Jamshed cs->buffer_mgmt = BUFMGMT_OFF;
63776404edcSAsim Jamshed SOCKQ_FOREACH_START(w, &cs->msocks) {
63876404edcSAsim Jamshed uint8_t bm = w->monitor_stream->client_buf_mgmt;
63976404edcSAsim Jamshed if (bm > cs->buffer_mgmt)
64076404edcSAsim Jamshed cs->buffer_mgmt = bm;
64176404edcSAsim Jamshed if (w->monitor_stream->monitor_listener->client_mon == 1)
64276404edcSAsim Jamshed cs->status_mgmt = 1;
64376404edcSAsim Jamshed } SOCKQ_FOREACH_END;
64476404edcSAsim Jamshed }
64576404edcSAsim Jamshed
64676404edcSAsim Jamshed return cs;
64776404edcSAsim Jamshed }
64876404edcSAsim Jamshed /*----------------------------------------------------------------------------*/
64976404edcSAsim Jamshed inline tcp_stream *
65076404edcSAsim Jamshed AttachServerTCPStream(mtcp_manager_t mtcp, tcp_stream *cs, int type,
65176404edcSAsim Jamshed uint32_t saddr, uint16_t sport, uint32_t daddr, uint16_t dport)
65276404edcSAsim Jamshed {
65376404edcSAsim Jamshed tcp_stream *ss;
65476404edcSAsim Jamshed struct socket_map *w;
65576404edcSAsim Jamshed
65676404edcSAsim Jamshed /* The 3rd arg is a temp hackk... FIXIT! TODO: XXX */
65776404edcSAsim Jamshed ss = CreateTCPStream(mtcp, NULL, MOS_SOCK_UNUSED, saddr, sport, daddr, dport, NULL);
65876404edcSAsim Jamshed if (ss == NULL) {
65976404edcSAsim Jamshed TRACE_ERROR("Can't create tcp_stream!\n");
66076404edcSAsim Jamshed return NULL;
66176404edcSAsim Jamshed }
66276404edcSAsim Jamshed
66376404edcSAsim Jamshed ss->side = MOS_SIDE_SVR;
66476404edcSAsim Jamshed cs->pair_stream = ss;
66576404edcSAsim Jamshed ss->pair_stream = cs;
66676404edcSAsim Jamshed ss->socket = cs->socket;
66776404edcSAsim Jamshed SOCKQ_FOREACH_START(w, &cs->msocks) {
66876404edcSAsim Jamshed SOCKQ_INSERT_TAIL(&ss->msocks, w);
66976404edcSAsim Jamshed } SOCKQ_FOREACH_END;
67076404edcSAsim Jamshed ss->stream_type = STREAM_TYPE(MOS_SOCK_MONITOR_STREAM_ACTIVE);
67176404edcSAsim Jamshed
67276404edcSAsim Jamshed if (IS_STREAM_TYPE(ss, MOS_SOCK_MONITOR_STREAM_ACTIVE)) {
67376404edcSAsim Jamshed ss->buffer_mgmt = BUFMGMT_OFF;
67476404edcSAsim Jamshed SOCKQ_FOREACH_START(w, &ss->msocks) {
67576404edcSAsim Jamshed uint8_t bm = w->monitor_stream->server_buf_mgmt;
67676404edcSAsim Jamshed if (bm > ss->buffer_mgmt)
67776404edcSAsim Jamshed ss->buffer_mgmt = bm;
67876404edcSAsim Jamshed if (w->monitor_stream->monitor_listener->server_mon == 1)
67976404edcSAsim Jamshed ss->status_mgmt = 1;
68076404edcSAsim Jamshed } SOCKQ_FOREACH_END;
68176404edcSAsim Jamshed }
68276404edcSAsim Jamshed
68376404edcSAsim Jamshed return ss;
68476404edcSAsim Jamshed }
68576404edcSAsim Jamshed /*---------------------------------------------------------------------------*/
68676404edcSAsim Jamshed static void
68776404edcSAsim Jamshed DestroySingleTCPStream(mtcp_manager_t mtcp, tcp_stream *stream)
68876404edcSAsim Jamshed {
68976404edcSAsim Jamshed struct sockaddr_in addr;
69076404edcSAsim Jamshed int bound_addr = FALSE;
69176404edcSAsim Jamshed int ret;
69276404edcSAsim Jamshed /* stand-alone monitor does not need this since it is single-threaded */
69376404edcSAsim Jamshed bool flow_lock = HAS_STREAM_TYPE(stream, MOS_SOCK_STREAM);
69476404edcSAsim Jamshed
69576404edcSAsim Jamshed struct socket_map *walk;
69676404edcSAsim Jamshed
69776404edcSAsim Jamshed /* Set the stream state as CLOSED */
69876404edcSAsim Jamshed stream->state = TCP_ST_CLOSED_RSVD;
69976404edcSAsim Jamshed
70076404edcSAsim Jamshed SOCKQ_FOREACH_START(walk, &stream->msocks) {
70176404edcSAsim Jamshed HandleCallback(mtcp, MOS_HK_RCV, walk, stream->side, NULL,
70276404edcSAsim Jamshed MOS_ON_CONN_END | MOS_ON_TCP_STATE_CHANGE | stream->cb_events);
70376404edcSAsim Jamshed HandleCallback(mtcp, MOS_HK_SND, walk, stream->side, NULL,
70476404edcSAsim Jamshed MOS_ON_CONN_END | MOS_ON_TCP_STATE_CHANGE | stream->cb_events);
70576404edcSAsim Jamshed } SOCKQ_FOREACH_END;
70676404edcSAsim Jamshed
70776404edcSAsim Jamshed #if 0
70876404edcSAsim Jamshed #ifdef DUMP_STREAM
70976404edcSAsim Jamshed if (stream->close_reason != TCP_ACTIVE_CLOSE &&
71076404edcSAsim Jamshed stream->close_reason != TCP_PASSIVE_CLOSE) {
71176404edcSAsim Jamshed thread_printf(mtcp, mtcp->log_fp,
71276404edcSAsim Jamshed "Stream %d abnormally closed.\n", stream->id);
71376404edcSAsim Jamshed DumpStream(mtcp, stream);
71476404edcSAsim Jamshed DumpControlList(mtcp, mtcp->n_sender[0]);
71576404edcSAsim Jamshed }
71676404edcSAsim Jamshed #endif
71776404edcSAsim Jamshed
71876404edcSAsim Jamshed #ifdef STREAM
71976404edcSAsim Jamshed uint8_t *sa, *da;
72076404edcSAsim Jamshed sa = (uint8_t *)&stream->saddr;
72176404edcSAsim Jamshed da = (uint8_t *)&stream->daddr;
72276404edcSAsim Jamshed TRACE_STREAM("DESTROY TCP STREAM %d: "
72376404edcSAsim Jamshed "%u.%u.%u.%u(%d) -> %u.%u.%u.%u(%d) (%s)\n", stream->id,
72476404edcSAsim Jamshed sa[0], sa[1], sa[2], sa[3], ntohs(stream->sport),
72576404edcSAsim Jamshed da[0], da[1], da[2], da[3], ntohs(stream->dport),
72676404edcSAsim Jamshed close_reason_str[stream->close_reason]);
72776404edcSAsim Jamshed #endif
72876404edcSAsim Jamshed
72976404edcSAsim Jamshed if (stream->sndvar->sndbuf) {
73076404edcSAsim Jamshed TRACE_FSTAT("Stream %d: send buffer "
73176404edcSAsim Jamshed "cum_len: %lu, len: %u\n", stream->id,
73276404edcSAsim Jamshed stream->sndvar->sndbuf->cum_len,
73376404edcSAsim Jamshed stream->sndvar->sndbuf->len);
73476404edcSAsim Jamshed }
73576404edcSAsim Jamshed if (stream->rcvvar->rcvbuf) {
73676404edcSAsim Jamshed TRACE_FSTAT("Stream %d: recv buffer "
73776404edcSAsim Jamshed "cum_len: %lu, merged_len: %u, last_len: %u\n", stream->id,
73876404edcSAsim Jamshed stream->rcvvar->rcvbuf->cum_len,
73976404edcSAsim Jamshed stream->rcvvar->rcvbuf->merged_len,
74076404edcSAsim Jamshed stream->rcvvar->rcvbuf->last_len);
74176404edcSAsim Jamshed }
74276404edcSAsim Jamshed
74376404edcSAsim Jamshed #if RTM_STAT
74476404edcSAsim Jamshed /* Triple duplicated ack stats */
74576404edcSAsim Jamshed if (stream->sndvar->rstat.tdp_ack_cnt) {
74676404edcSAsim Jamshed TRACE_FSTAT("Stream %d: triple duplicated ack: %u, "
74776404edcSAsim Jamshed "retransmission bytes: %u, average rtm bytes/ack: %u\n",
74876404edcSAsim Jamshed stream->id,
74976404edcSAsim Jamshed stream->sndvar->rstat.tdp_ack_cnt, stream->sndvar->rstat.tdp_ack_bytes,
75076404edcSAsim Jamshed stream->sndvar->rstat.tdp_ack_bytes / stream->sndvar->rstat.tdp_ack_cnt);
75176404edcSAsim Jamshed }
75276404edcSAsim Jamshed
75376404edcSAsim Jamshed /* Retransmission timeout stats */
75476404edcSAsim Jamshed if (stream->sndvar->rstat.rto_cnt > 0) {
75576404edcSAsim Jamshed TRACE_FSTAT("Stream %d: timeout count: %u, bytes: %u\n", stream->id,
75676404edcSAsim Jamshed stream->sndvar->rstat.rto_cnt, stream->sndvar->rstat.rto_bytes);
75776404edcSAsim Jamshed }
75876404edcSAsim Jamshed
75976404edcSAsim Jamshed /* Recovery stats */
76076404edcSAsim Jamshed if (stream->sndvar->rstat.ack_upd_cnt) {
76176404edcSAsim Jamshed TRACE_FSTAT("Stream %d: snd_nxt update count: %u, "
76276404edcSAsim Jamshed "snd_nxt update bytes: %u, average update bytes/update: %u\n",
76376404edcSAsim Jamshed stream->id,
76476404edcSAsim Jamshed stream->sndvar->rstat.ack_upd_cnt, stream->sndvar->rstat.ack_upd_bytes,
76576404edcSAsim Jamshed stream->sndvar->rstat.ack_upd_bytes / stream->sndvar->rstat.ack_upd_cnt);
76676404edcSAsim Jamshed }
76776404edcSAsim Jamshed #if TCP_OPT_SACK_ENABLED
76876404edcSAsim Jamshed if (stream->sndvar->rstat.sack_cnt) {
76976404edcSAsim Jamshed TRACE_FSTAT("Selective ack count: %u, bytes: %u, "
77076404edcSAsim Jamshed "average bytes/ack: %u\n",
77176404edcSAsim Jamshed stream->sndvar->rstat.sack_cnt, stream->sndvar->rstat.sack_bytes,
77276404edcSAsim Jamshed stream->sndvar->rstat.sack_bytes / stream->sndvar->rstat.sack_cnt);
77376404edcSAsim Jamshed } else {
77476404edcSAsim Jamshed TRACE_FSTAT("Selective ack count: %u, bytes: %u\n",
77576404edcSAsim Jamshed stream->sndvar->rstat.sack_cnt, stream->sndvar->rstat.sack_bytes);
77676404edcSAsim Jamshed }
77776404edcSAsim Jamshed if (stream->sndvar->rstat.tdp_sack_cnt) {
77876404edcSAsim Jamshed TRACE_FSTAT("Selective tdp ack count: %u, bytes: %u, "
77976404edcSAsim Jamshed "average bytes/ack: %u\n",
78076404edcSAsim Jamshed stream->sndvar->rstat.tdp_sack_cnt, stream->sndvar->rstat.tdp_sack_bytes,
78176404edcSAsim Jamshed stream->sndvar->rstat.tdp_sack_bytes / stream->sndvar->rstat.tdp_sack_cnt);
78276404edcSAsim Jamshed } else {
78376404edcSAsim Jamshed TRACE_FSTAT("Selective ack count: %u, bytes: %u\n",
78476404edcSAsim Jamshed stream->sndvar->rstat.tdp_sack_cnt, stream->sndvar->rstat.tdp_sack_bytes);
78576404edcSAsim Jamshed }
78676404edcSAsim Jamshed #endif /* TCP_OPT_SACK_ENABLED */
78776404edcSAsim Jamshed #endif /* RTM_STAT */
78876404edcSAsim Jamshed #endif
78976404edcSAsim Jamshed
79076404edcSAsim Jamshed if (HAS_STREAM_TYPE(stream, MOS_SOCK_STREAM)) {
79176404edcSAsim Jamshed /* stand-alone monitor does not need to do these */
79276404edcSAsim Jamshed if (stream->is_bound_addr) {
79376404edcSAsim Jamshed bound_addr = TRUE;
79476404edcSAsim Jamshed addr.sin_addr.s_addr = stream->saddr;
79576404edcSAsim Jamshed addr.sin_port = stream->sport;
79676404edcSAsim Jamshed }
79776404edcSAsim Jamshed
79876404edcSAsim Jamshed RemoveFromControlList(mtcp, stream);
79976404edcSAsim Jamshed RemoveFromSendList(mtcp, stream);
80076404edcSAsim Jamshed RemoveFromACKList(mtcp, stream);
80176404edcSAsim Jamshed
80276404edcSAsim Jamshed if (stream->on_rto_idx >= 0)
80376404edcSAsim Jamshed RemoveFromRTOList(mtcp, stream);
80476404edcSAsim Jamshed
80576404edcSAsim Jamshed SBUF_LOCK_DESTROY(&stream->rcvvar->read_lock);
80676404edcSAsim Jamshed SBUF_LOCK_DESTROY(&stream->sndvar->write_lock);
80776404edcSAsim Jamshed
80876404edcSAsim Jamshed assert(stream->on_hash_table == TRUE);
80976404edcSAsim Jamshed
81076404edcSAsim Jamshed /* free ring buffers */
81176404edcSAsim Jamshed if (stream->sndvar->sndbuf) {
81276404edcSAsim Jamshed SBFree(mtcp->rbm_snd, stream->sndvar->sndbuf);
81376404edcSAsim Jamshed stream->sndvar->sndbuf = NULL;
81476404edcSAsim Jamshed }
81576404edcSAsim Jamshed }
81676404edcSAsim Jamshed
81776404edcSAsim Jamshed if (stream->on_timewait_list)
81876404edcSAsim Jamshed RemoveFromTimewaitList(mtcp, stream);
81976404edcSAsim Jamshed
82076404edcSAsim Jamshed if (g_config.mos->tcp_timeout > 0)
82176404edcSAsim Jamshed RemoveFromTimeoutList(mtcp, stream);
82276404edcSAsim Jamshed
82376404edcSAsim Jamshed if (stream->rcvvar->rcvbuf) {
82476404edcSAsim Jamshed tcprb_del(stream->rcvvar->rcvbuf);
82576404edcSAsim Jamshed stream->rcvvar->rcvbuf = NULL;
82676404edcSAsim Jamshed }
82776404edcSAsim Jamshed
82876404edcSAsim Jamshed if (flow_lock)
82976404edcSAsim Jamshed pthread_mutex_lock(&mtcp->ctx->flow_pool_lock);
83076404edcSAsim Jamshed
83176404edcSAsim Jamshed /* remove from flow hash table */
83276404edcSAsim Jamshed HTRemove(mtcp->tcp_flow_table, stream);
83376404edcSAsim Jamshed stream->on_hash_table = FALSE;
83476404edcSAsim Jamshed
83576404edcSAsim Jamshed mtcp->flow_cnt--;
83676404edcSAsim Jamshed
83776404edcSAsim Jamshed /* if there was a corresponding monitor stream socket opened
83876404edcSAsim Jamshed * then close it */
83976404edcSAsim Jamshed SOCKQ_FOREACH_START(walk, &stream->msocks) {
84076404edcSAsim Jamshed SOCKQ_REMOVE(&stream->msocks, walk);
84176404edcSAsim Jamshed if (stream->pair_stream == NULL)
84276404edcSAsim Jamshed DestroyMonitorStreamSocket(mtcp, walk);
84376404edcSAsim Jamshed } SOCKQ_FOREACH_END;
84476404edcSAsim Jamshed
84576404edcSAsim Jamshed if (stream->pair_stream != NULL) {
84676404edcSAsim Jamshed /* Nullify pointer to sibliing tcp_stream's pair_stream */
84776404edcSAsim Jamshed stream->pair_stream->pair_stream = NULL;
84876404edcSAsim Jamshed }
84976404edcSAsim Jamshed
85076404edcSAsim Jamshed MPFreeChunk(mtcp->rv_pool, stream->rcvvar);
85176404edcSAsim Jamshed MPFreeChunk(mtcp->sv_pool, stream->sndvar);
85276404edcSAsim Jamshed MPFreeChunk(mtcp->flow_pool, stream);
85376404edcSAsim Jamshed
85476404edcSAsim Jamshed if (flow_lock)
85576404edcSAsim Jamshed /* stand-alone monitor does not need this since it is single-threaded */
85676404edcSAsim Jamshed pthread_mutex_unlock(&mtcp->ctx->flow_pool_lock);
85776404edcSAsim Jamshed
85876404edcSAsim Jamshed if (bound_addr) {
85976404edcSAsim Jamshed if (mtcp->ap) {
86076404edcSAsim Jamshed ret = FreeAddress(mtcp->ap, &addr);
86176404edcSAsim Jamshed } else {
862a5e1a556SAsim Jamshed int nif;
863a5e1a556SAsim Jamshed nif = GetOutputInterface(addr.sin_addr.s_addr);
864*8a941c7eSAsim Jamshed if (nif < 0) {
865*8a941c7eSAsim Jamshed TRACE_ERROR("Can't determine interface idx!\n");
866*8a941c7eSAsim Jamshed exit(EXIT_FAILURE);
867*8a941c7eSAsim Jamshed } else {
868a5e1a556SAsim Jamshed ret = FreeAddress(ap[nif], &addr);
86976404edcSAsim Jamshed }
870*8a941c7eSAsim Jamshed }
87176404edcSAsim Jamshed if (ret < 0) {
87276404edcSAsim Jamshed TRACE_ERROR("(NEVER HAPPEN) Failed to free address.\n");
87376404edcSAsim Jamshed }
87476404edcSAsim Jamshed }
87576404edcSAsim Jamshed
87676404edcSAsim Jamshed #ifdef NETSTAT
87776404edcSAsim Jamshed #if NETSTAT_PERTHREAD
87876404edcSAsim Jamshed TRACE_STREAM("Destroyed. Remaining flows: %u\n", mtcp->flow_cnt);
87976404edcSAsim Jamshed #endif /* NETSTAT_PERTHREAD */
88076404edcSAsim Jamshed #endif /* NETSTAT */
88176404edcSAsim Jamshed
88276404edcSAsim Jamshed }
88376404edcSAsim Jamshed /*---------------------------------------------------------------------------*/
88476404edcSAsim Jamshed void
88576404edcSAsim Jamshed DestroyTCPStream(mtcp_manager_t mtcp, tcp_stream *stream)
88676404edcSAsim Jamshed {
88776404edcSAsim Jamshed tcp_stream *pair_stream = stream->pair_stream;
88876404edcSAsim Jamshed
88976404edcSAsim Jamshed DestroySingleTCPStream(mtcp, stream);
89076404edcSAsim Jamshed
89176404edcSAsim Jamshed if (pair_stream)
89276404edcSAsim Jamshed DestroySingleTCPStream(mtcp, pair_stream);
89376404edcSAsim Jamshed }
89476404edcSAsim Jamshed /*---------------------------------------------------------------------------*/
89576404edcSAsim Jamshed void
89676404edcSAsim Jamshed DumpStream(mtcp_manager_t mtcp, tcp_stream *stream)
89776404edcSAsim Jamshed {
89876404edcSAsim Jamshed uint8_t *sa, *da;
89976404edcSAsim Jamshed struct tcp_send_vars *sndvar = stream->sndvar;
90076404edcSAsim Jamshed struct tcp_recv_vars *rcvvar = stream->rcvvar;
90176404edcSAsim Jamshed
90276404edcSAsim Jamshed sa = (uint8_t *)&stream->saddr;
90376404edcSAsim Jamshed da = (uint8_t *)&stream->daddr;
90476404edcSAsim Jamshed thread_printf(mtcp, mtcp->log_fp, "========== Stream %u: "
90576404edcSAsim Jamshed "%u.%u.%u.%u(%u) -> %u.%u.%u.%u(%u) ==========\n", stream->id,
90676404edcSAsim Jamshed sa[0], sa[1], sa[2], sa[3], ntohs(stream->sport),
90776404edcSAsim Jamshed da[0], da[1], da[2], da[3], ntohs(stream->dport));
90876404edcSAsim Jamshed thread_printf(mtcp, mtcp->log_fp,
90976404edcSAsim Jamshed "Stream id: %u, type: %u, state: %s, close_reason: %s\n",
91076404edcSAsim Jamshed stream->id, stream->stream_type,
91176404edcSAsim Jamshed TCPStateToString(stream), close_reason_str[stream->close_reason]);
91276404edcSAsim Jamshed if (stream->socket) {
91376404edcSAsim Jamshed socket_map_t socket = stream->socket;
91476404edcSAsim Jamshed thread_printf(mtcp, mtcp->log_fp, "Socket id: %d, type: %d, opts: %u\n"
91576404edcSAsim Jamshed "epoll: %u (IN: %u, OUT: %u, ERR: %u, RDHUP: %u, ET: %u)\n"
91676404edcSAsim Jamshed "events: %u (IN: %u, OUT: %u, ERR: %u, RDHUP: %u, ET: %u)\n",
91776404edcSAsim Jamshed socket->id, socket->socktype, socket->opts,
91876404edcSAsim Jamshed socket->epoll, socket->epoll & MOS_EPOLLIN,
91976404edcSAsim Jamshed socket->epoll & MOS_EPOLLOUT, socket->epoll & MOS_EPOLLERR,
92076404edcSAsim Jamshed socket->epoll & MOS_EPOLLRDHUP, socket->epoll & MOS_EPOLLET,
92176404edcSAsim Jamshed socket->events, socket->events & MOS_EPOLLIN,
92276404edcSAsim Jamshed socket->events & MOS_EPOLLOUT, socket->events & MOS_EPOLLERR,
92376404edcSAsim Jamshed socket->events & MOS_EPOLLRDHUP, socket->events & MOS_EPOLLET);
92476404edcSAsim Jamshed } else {
92576404edcSAsim Jamshed thread_printf(mtcp, mtcp->log_fp, "Socket: (null)\n");
92676404edcSAsim Jamshed }
92776404edcSAsim Jamshed
92876404edcSAsim Jamshed thread_printf(mtcp, mtcp->log_fp,
92976404edcSAsim Jamshed "on_hash_table: %u, on_control_list: %u (wait: %u), on_send_list: %u, "
93076404edcSAsim Jamshed "on_ack_list: %u, is_wack: %u, ack_cnt: %u\n"
93176404edcSAsim Jamshed "on_rto_idx: %d, on_timewait_list: %u, on_timeout_list: %u, "
93276404edcSAsim Jamshed "on_rcv_br_list: %u, on_snd_br_list: %u\n"
93376404edcSAsim Jamshed "on_sendq: %u, on_ackq: %u, closed: %u, on_closeq: %u, "
93476404edcSAsim Jamshed "on_closeq_int: %u, on_resetq: %u, on_resetq_int: %u\n"
93576404edcSAsim Jamshed "have_reset: %u, is_fin_sent: %u, is_fin_ackd: %u, "
93676404edcSAsim Jamshed "saw_timestamp: %u, sack_permit: %u, "
93776404edcSAsim Jamshed "is_bound_addr: %u, need_wnd_adv: %u\n", stream->on_hash_table,
93876404edcSAsim Jamshed sndvar->on_control_list, stream->control_list_waiting, sndvar->on_send_list,
93976404edcSAsim Jamshed sndvar->on_ack_list, sndvar->is_wack, sndvar->ack_cnt,
94076404edcSAsim Jamshed stream->on_rto_idx, stream->on_timewait_list, stream->on_timeout_list,
94176404edcSAsim Jamshed stream->on_rcv_br_list, stream->on_snd_br_list,
94276404edcSAsim Jamshed sndvar->on_sendq, sndvar->on_ackq,
94376404edcSAsim Jamshed stream->closed, sndvar->on_closeq, sndvar->on_closeq_int,
94476404edcSAsim Jamshed sndvar->on_resetq, sndvar->on_resetq_int,
94576404edcSAsim Jamshed stream->have_reset, sndvar->is_fin_sent,
94676404edcSAsim Jamshed sndvar->is_fin_ackd, stream->saw_timestamp, stream->sack_permit,
94776404edcSAsim Jamshed stream->is_bound_addr, stream->need_wnd_adv);
94876404edcSAsim Jamshed
94976404edcSAsim Jamshed thread_printf(mtcp, mtcp->log_fp, "========== Send variables ==========\n");
95076404edcSAsim Jamshed thread_printf(mtcp, mtcp->log_fp,
95176404edcSAsim Jamshed "ip_id: %u, mss: %u, eff_mss: %u, wscale(me, peer): (%u, %u), nif_out: %d\n",
95276404edcSAsim Jamshed sndvar->ip_id, sndvar->mss, sndvar->eff_mss,
95376404edcSAsim Jamshed sndvar->wscale_mine, sndvar->wscale_peer, sndvar->nif_out);
95476404edcSAsim Jamshed thread_printf(mtcp, mtcp->log_fp,
95576404edcSAsim Jamshed "snd_nxt: %u, snd_una: %u, iss: %u, fss: %u\nsnd_wnd: %u, "
95676404edcSAsim Jamshed "peer_wnd: %u, cwnd: %u, ssthresh: %u\n",
95776404edcSAsim Jamshed stream->snd_nxt, sndvar->snd_una, sndvar->iss, sndvar->fss,
95876404edcSAsim Jamshed sndvar->snd_wnd, sndvar->peer_wnd, sndvar->cwnd, sndvar->ssthresh);
95976404edcSAsim Jamshed
96076404edcSAsim Jamshed if (sndvar->sndbuf) {
96176404edcSAsim Jamshed thread_printf(mtcp, mtcp->log_fp,
96276404edcSAsim Jamshed "Send buffer: init_seq: %u, head_seq: %u, "
96376404edcSAsim Jamshed "len: %d, cum_len: %lu, size: %d\n",
96476404edcSAsim Jamshed sndvar->sndbuf->init_seq, sndvar->sndbuf->head_seq,
96576404edcSAsim Jamshed sndvar->sndbuf->len, sndvar->sndbuf->cum_len, sndvar->sndbuf->size);
96676404edcSAsim Jamshed } else {
96776404edcSAsim Jamshed thread_printf(mtcp, mtcp->log_fp, "Send buffer: (null)\n");
96876404edcSAsim Jamshed }
96976404edcSAsim Jamshed thread_printf(mtcp, mtcp->log_fp,
97076404edcSAsim Jamshed "nrtx: %u, max_nrtx: %u, rto: %u, ts_rto: %u, "
97176404edcSAsim Jamshed "ts_lastack_sent: %u\n", sndvar->nrtx, sndvar->max_nrtx,
97276404edcSAsim Jamshed sndvar->rto, sndvar->ts_rto, sndvar->ts_lastack_sent);
97376404edcSAsim Jamshed
97476404edcSAsim Jamshed thread_printf(mtcp, mtcp->log_fp,
97576404edcSAsim Jamshed "========== Receive variables ==========\n");
97676404edcSAsim Jamshed thread_printf(mtcp, mtcp->log_fp,
97776404edcSAsim Jamshed "rcv_nxt: %u, irs: %u, rcv_wnd: %u, "
97876404edcSAsim Jamshed "snd_wl1: %u, snd_wl2: %u\n",
97976404edcSAsim Jamshed stream->rcv_nxt, rcvvar->irs,
98076404edcSAsim Jamshed rcvvar->rcv_wnd, rcvvar->snd_wl1, rcvvar->snd_wl2);
9813b6b9ba6SAsim Jamshed if (!rcvvar->rcvbuf) {
98276404edcSAsim Jamshed thread_printf(mtcp, mtcp->log_fp, "Receive buffer: (null)\n");
98376404edcSAsim Jamshed }
98476404edcSAsim Jamshed thread_printf(mtcp, mtcp->log_fp, "last_ack_seq: %u, dup_acks: %u\n",
98576404edcSAsim Jamshed rcvvar->last_ack_seq, rcvvar->dup_acks);
98676404edcSAsim Jamshed thread_printf(mtcp, mtcp->log_fp,
98776404edcSAsim Jamshed "ts_recent: %u, ts_lastack_rcvd: %u, ts_last_ts_upd: %u, "
98876404edcSAsim Jamshed "ts_tw_expire: %u\n", rcvvar->ts_recent, rcvvar->ts_lastack_rcvd,
98976404edcSAsim Jamshed rcvvar->ts_last_ts_upd, rcvvar->ts_tw_expire);
99076404edcSAsim Jamshed thread_printf(mtcp, mtcp->log_fp,
99176404edcSAsim Jamshed "srtt: %u, mdev: %u, mdev_max: %u, rttvar: %u, rtt_seq: %u\n",
99276404edcSAsim Jamshed rcvvar->srtt, rcvvar->mdev, rcvvar->mdev_max,
99376404edcSAsim Jamshed rcvvar->rttvar, rcvvar->rtt_seq);
99476404edcSAsim Jamshed }
99576404edcSAsim Jamshed /*---------------------------------------------------------------------------*/
996