Lines Matching refs:stream

64 	struct tcp_stream *stream;  in GetFragInfo()  local
66 stream = NULL; in GetFragInfo()
76 struct tcp_stream *mstrm = sock->monitor_stream->stream; in GetFragInfo()
77 stream = (side == mstrm->side) ? mstrm : mstrm->pair_stream; in GetFragInfo()
79 if (stream == NULL) goto frag_info_error; in GetFragInfo()
82 if (stream->rcvvar != NULL && in GetFragInfo()
83 stream->rcvvar->rcvbuf != NULL) { in GetFragInfo()
84 tcprb_t *rcvbuf = stream->rcvvar->rcvbuf; in GetFragInfo()
118 struct tcp_stream *stream; in GetBufInfo() local
123 stream = NULL; in GetBufInfo()
136 struct tcp_stream *mstrm = sock->monitor_stream->stream; in GetBufInfo()
137 stream = (side == mstrm->side) ? mstrm : mstrm->pair_stream; in GetBufInfo()
140 if (stream != NULL && in GetBufInfo()
141 stream->rcvvar != NULL && in GetBufInfo()
142 stream->rcvvar->rcvbuf != NULL) { in GetBufInfo()
143 tcprb_t *rcvbuf = stream->rcvvar->rcvbuf; in GetBufInfo()
145 tbi->tcpbi_init_seq = stream->rcvvar->irs + 1; in GetBufInfo()
168 struct tcp_stream *stream; in DisableBuf() local
184 stream = sock->monitor_stream->stream; in DisableBuf()
185 if (stream->side != side) in DisableBuf()
186 stream = stream->pair_stream; in DisableBuf()
187 assert(stream->side == side); in DisableBuf()
188 stream->buffer_mgmt = 0; in DisableBuf()
200 GetLastTimestamp(struct tcp_stream *stream, uint32_t *usecs, socklen_t *len) in GetLastTimestamp() argument
210 *usecs = (stream->last_active_ts > in GetLastTimestamp()
211 stream->pair_stream->last_active_ts) in GetLastTimestamp()
213 TS_TO_USEC(stream->last_active_ts) : in GetLastTimestamp()
214 TS_TO_USEC(stream->pair_stream->last_active_ts); in GetLastTimestamp()
220 GetTCPState(struct tcp_stream *stream, int side, in GetTCPState() argument
223 if (!stream || !(stream = (side == stream->side) ? stream : stream->pair_stream)) in GetTCPState()
225 *(int *)optval = (int)((stream->state == TCP_ST_CLOSED_RSVD) ? in GetTCPState()
226 TCP_ST_CLOSED : stream->state); in GetTCPState()
231 TCPStateToString(const tcp_stream *stream) in TCPStateToString() argument
233 return (stream) ? state_str[stream->state] : NULL; in TCPStateToString()
237 RaiseReadEvent(mtcp_manager_t mtcp, tcp_stream *stream) in RaiseReadEvent() argument
241 rcvvar = stream->rcvvar; in RaiseReadEvent()
243 if (HAS_STREAM_TYPE(stream, MOS_SOCK_STREAM)) { in RaiseReadEvent()
244 if (stream->socket && (stream->socket->epoll & MOS_EPOLLIN)) in RaiseReadEvent()
245 AddEpollEvent(mtcp->ep, MOS_EVENT_QUEUE, stream->socket, MOS_EPOLLIN); in RaiseReadEvent()
256 SOCKQ_FOREACH_START(walk, &stream->msocks) { in RaiseReadEvent()
261 if (stream->actions & MOS_ACT_READ_DATA) in RaiseReadEvent()
271 eq->events[index].ev.data.ptr = (void *)stream; in RaiseReadEvent()
277 stream->actions |= MOS_ACT_READ_DATA; in RaiseReadEvent()
280 TRACE_EPOLL("Stream %d: Raising read without a socket!\n", stream->id); in RaiseReadEvent()
285 RaiseWriteEvent(mtcp_manager_t mtcp, tcp_stream *stream) in RaiseWriteEvent() argument
287 if (stream->socket) { in RaiseWriteEvent()
288 if (stream->socket->epoll & MOS_EPOLLOUT) { in RaiseWriteEvent()
290 MOS_EVENT_QUEUE, stream->socket, MOS_EPOLLOUT); in RaiseWriteEvent()
293 TRACE_EPOLL("Stream %d: Raising write without a socket!\n", stream->id); in RaiseWriteEvent()
298 RaiseCloseEvent(mtcp_manager_t mtcp, tcp_stream *stream) in RaiseCloseEvent() argument
300 if (stream->socket) { in RaiseCloseEvent()
301 if (stream->socket->epoll & MOS_EPOLLRDHUP) { in RaiseCloseEvent()
303 MOS_EVENT_QUEUE, stream->socket, MOS_EPOLLRDHUP); in RaiseCloseEvent()
304 } else if (stream->socket->epoll & MOS_EPOLLIN) { in RaiseCloseEvent()
306 MOS_EVENT_QUEUE, stream->socket, MOS_EPOLLIN); in RaiseCloseEvent()
309 TRACE_EPOLL("Stream %d: Raising close without a socket!\n", stream->id); in RaiseCloseEvent()
314 RaiseErrorEvent(mtcp_manager_t mtcp, tcp_stream *stream) in RaiseErrorEvent() argument
316 if (stream->socket) { in RaiseErrorEvent()
317 if (stream->socket->epoll & MOS_EPOLLERR) { in RaiseErrorEvent()
320 MOS_EVENT_QUEUE, stream->socket, MOS_EPOLLERR); in RaiseErrorEvent()
323 TRACE_EPOLL("Stream %d: Raising error without a socket!\n", stream->id); in RaiseErrorEvent()
329 AddMonitorStreamSockets(mtcp_manager_t mtcp, struct tcp_stream *stream) in AddMonitorStreamSockets() argument
358 s->monitor_stream->stream = stream; in AddMonitorStreamSockets()
386 SOCKQ_INSERT_TAIL(&stream->msocks, s); in AddMonitorStreamSockets()
431 tcp_stream *stream = NULL; local
440 stream = (tcp_stream *)MPAllocateChunk(mtcp->flow_pool);
441 if (!stream) {
449 memset(stream, 0, sizeof(tcp_stream));
451 stream->rcvvar = (struct tcp_recv_vars *)MPAllocateChunk(mtcp->rv_pool);
452 if (!stream->rcvvar) {
453 MPFreeChunk(mtcp->flow_pool, stream);
458 memset(stream->rcvvar, 0, sizeof(struct tcp_recv_vars));
461 stream->sndvar = (struct tcp_send_vars *)MPAllocateChunk(mtcp->sv_pool);
462 if (!stream->sndvar) {
463 MPFreeChunk(mtcp->rv_pool, stream->rcvvar);
464 MPFreeChunk(mtcp->flow_pool, stream);
470 memset(stream->sndvar, 0, sizeof(struct tcp_send_vars));
472 stream->id = mtcp->g_id++;
473 stream->saddr = saddr;
474 stream->sport = sport;
475 stream->daddr = daddr;
476 stream->dport = dport;
478 ret = HTInsert(mtcp->tcp_flow_table, stream, hash);
481 "Failed to insert the stream into hash table.\n", stream->id);
482 MPFreeChunk(mtcp->flow_pool, stream);
487 stream->on_hash_table = TRUE;
490 SOCKQ_INIT(&stream->msocks);
499 if (AddMonitorStreamSockets(mtcp, stream) < 0)
506 stream->socket = socket;
507 socket->stream = stream;
510 stream->stream_type = type;
511 stream->state = TCP_ST_LISTEN;
515 stream->on_rto_idx = -1;
518 stream->sndvar->mss = TCP_DEFAULT_MSS;
519 stream->sndvar->wscale_mine = TCP_DEFAULT_WSCALE;
520 stream->sndvar->wscale_peer = 0;
522 if (HAS_STREAM_TYPE(stream, MOS_SOCK_STREAM)) {
523 stream->sndvar->ip_id = 0;
524 stream->sndvar->nif_out = GetOutputInterface(stream->daddr);
526 stream->sndvar->iss = posix_seq_rand() % TCP_MAX_SEQ;
528 stream->snd_nxt = stream->sndvar->iss;
529 stream->sndvar->snd_una = stream->sndvar->iss;
530 stream->sndvar->snd_wnd = g_config.mos->wmem_size;
531 stream->sndvar->rto = TCP_INITIAL_RTO;
533 if (pthread_spin_init(&stream->sndvar->write_lock, PTHREAD_PROCESS_PRIVATE)) {
535 pthread_spin_destroy(&stream->rcvvar->read_lock);
537 if (pthread_mutex_init(&stream->sndvar->write_lock, NULL)) {
539 pthread_mutex_destroy(&stream->rcvvar->read_lock);
544 stream->rcvvar->irs = 0;
546 stream->rcv_nxt = 0;
547 stream->rcvvar->rcv_wnd = TCP_INITIAL_WINDOW;
549 stream->rcvvar->snd_wl1 = stream->rcvvar->irs - 1;
551 stream->buffer_mgmt = BUFMGMT_FULL;
554 stream->status_mgmt = 1;
557 if (pthread_spin_init(&stream->rcvvar->read_lock, PTHREAD_PROCESS_PRIVATE)) {
559 if (pthread_mutex_init(&stream->rcvvar->read_lock, NULL)) {
569 sa = (uint8_t *)&stream->saddr;
570 da = (uint8_t *)&stream->daddr;
572 "%u.%u.%u.%u(%d) -> %u.%u.%u.%u(%d) (ISS: %u)\n", stream->id,
573 sa[0], sa[1], sa[2], sa[3], ntohs(stream->sport),
574 da[0], da[1], da[2], da[3], ntohs(stream->dport),
575 stream->sndvar->iss);
578 return stream;
687 DestroySingleTCPStream(mtcp_manager_t mtcp, tcp_stream *stream) argument
693 bool flow_lock = HAS_STREAM_TYPE(stream, MOS_SOCK_STREAM);
698 stream->state = TCP_ST_CLOSED_RSVD;
700 SOCKQ_FOREACH_START(walk, &stream->msocks) {
701 HandleCallback(mtcp, MOS_HK_RCV, walk, stream->side, NULL,
702 MOS_ON_CONN_END | MOS_ON_TCP_STATE_CHANGE | stream->cb_events);
703 HandleCallback(mtcp, MOS_HK_SND, walk, stream->side, NULL,
704 MOS_ON_CONN_END | MOS_ON_TCP_STATE_CHANGE | stream->cb_events);
709 if (stream->close_reason != TCP_ACTIVE_CLOSE &&
710 stream->close_reason != TCP_PASSIVE_CLOSE) {
712 "Stream %d abnormally closed.\n", stream->id);
713 DumpStream(mtcp, stream);
720 sa = (uint8_t *)&stream->saddr;
721 da = (uint8_t *)&stream->daddr;
723 "%u.%u.%u.%u(%d) -> %u.%u.%u.%u(%d) (%s)\n", stream->id,
724 sa[0], sa[1], sa[2], sa[3], ntohs(stream->sport),
725 da[0], da[1], da[2], da[3], ntohs(stream->dport),
726 close_reason_str[stream->close_reason]);
729 if (stream->sndvar->sndbuf) {
731 "cum_len: %lu, len: %u\n", stream->id,
732 stream->sndvar->sndbuf->cum_len,
733 stream->sndvar->sndbuf->len);
735 if (stream->rcvvar->rcvbuf) {
737 "cum_len: %lu, merged_len: %u, last_len: %u\n", stream->id,
738 stream->rcvvar->rcvbuf->cum_len,
739 stream->rcvvar->rcvbuf->merged_len,
740 stream->rcvvar->rcvbuf->last_len);
745 if (stream->sndvar->rstat.tdp_ack_cnt) {
748 stream->id,
749 stream->sndvar->rstat.tdp_ack_cnt, stream->sndvar->rstat.tdp_ack_bytes,
750 stream->sndvar->rstat.tdp_ack_bytes / stream->sndvar->rstat.tdp_ack_cnt);
754 if (stream->sndvar->rstat.rto_cnt > 0) {
755 TRACE_FSTAT("Stream %d: timeout count: %u, bytes: %u\n", stream->id,
756 stream->sndvar->rstat.rto_cnt, stream->sndvar->rstat.rto_bytes);
760 if (stream->sndvar->rstat.ack_upd_cnt) {
763 stream->id,
764 stream->sndvar->rstat.ack_upd_cnt, stream->sndvar->rstat.ack_upd_bytes,
765 stream->sndvar->rstat.ack_upd_bytes / stream->sndvar->rstat.ack_upd_cnt);
768 if (stream->sndvar->rstat.sack_cnt) {
771 stream->sndvar->rstat.sack_cnt, stream->sndvar->rstat.sack_bytes,
772 stream->sndvar->rstat.sack_bytes / stream->sndvar->rstat.sack_cnt);
775 stream->sndvar->rstat.sack_cnt, stream->sndvar->rstat.sack_bytes);
777 if (stream->sndvar->rstat.tdp_sack_cnt) {
780 stream->sndvar->rstat.tdp_sack_cnt, stream->sndvar->rstat.tdp_sack_bytes,
781 stream->sndvar->rstat.tdp_sack_bytes / stream->sndvar->rstat.tdp_sack_cnt);
784 stream->sndvar->rstat.tdp_sack_cnt, stream->sndvar->rstat.tdp_sack_bytes);
790 if (HAS_STREAM_TYPE(stream, MOS_SOCK_STREAM)) {
792 if (stream->is_bound_addr) {
794 addr.sin_addr.s_addr = stream->saddr;
795 addr.sin_port = stream->sport;
798 RemoveFromControlList(mtcp, stream);
799 RemoveFromSendList(mtcp, stream);
800 RemoveFromACKList(mtcp, stream);
802 if (stream->on_rto_idx >= 0)
803 RemoveFromRTOList(mtcp, stream);
805 SBUF_LOCK_DESTROY(&stream->rcvvar->read_lock);
806 SBUF_LOCK_DESTROY(&stream->sndvar->write_lock);
808 assert(stream->on_hash_table == TRUE);
811 if (stream->sndvar->sndbuf) {
812 SBFree(mtcp->rbm_snd, stream->sndvar->sndbuf);
813 stream->sndvar->sndbuf = NULL;
817 if (stream->on_timewait_list)
818 RemoveFromTimewaitList(mtcp, stream);
821 RemoveFromTimeoutList(mtcp, stream);
823 if (stream->rcvvar->rcvbuf) {
824 tcprb_del(stream->rcvvar->rcvbuf);
825 stream->rcvvar->rcvbuf = NULL;
832 HTRemove(mtcp->tcp_flow_table, stream);
833 stream->on_hash_table = FALSE;
839 SOCKQ_FOREACH_START(walk, &stream->msocks) {
840 SOCKQ_REMOVE(&stream->msocks, walk);
841 if (stream->pair_stream == NULL)
845 if (stream->pair_stream != NULL) {
847 stream->pair_stream->pair_stream = NULL;
850 MPFreeChunk(mtcp->rv_pool, stream->rcvvar);
851 MPFreeChunk(mtcp->sv_pool, stream->sndvar);
852 MPFreeChunk(mtcp->flow_pool, stream);
885 DestroyTCPStream(mtcp_manager_t mtcp, tcp_stream *stream) argument
887 tcp_stream *pair_stream = stream->pair_stream;
889 DestroySingleTCPStream(mtcp, stream);
896 DumpStream(mtcp_manager_t mtcp, tcp_stream *stream) argument
899 struct tcp_send_vars *sndvar = stream->sndvar;
900 struct tcp_recv_vars *rcvvar = stream->rcvvar;
902 sa = (uint8_t *)&stream->saddr;
903 da = (uint8_t *)&stream->daddr;
905 "%u.%u.%u.%u(%u) -> %u.%u.%u.%u(%u) ==========\n", stream->id,
906 sa[0], sa[1], sa[2], sa[3], ntohs(stream->sport),
907 da[0], da[1], da[2], da[3], ntohs(stream->dport));
910 stream->id, stream->stream_type,
911 TCPStateToString(stream), close_reason_str[stream->close_reason]);
912 if (stream->socket) {
913 socket_map_t socket = stream->socket;
937 "is_bound_addr: %u, need_wnd_adv: %u\n", stream->on_hash_table,
938 sndvar->on_control_list, stream->control_list_waiting, sndvar->on_send_list,
940 stream->on_rto_idx, stream->on_timewait_list, stream->on_timeout_list,
941 stream->on_rcv_br_list, stream->on_snd_br_list,
943 stream->closed, sndvar->on_closeq, sndvar->on_closeq_int,
945 stream->have_reset, sndvar->is_fin_sent,
946 sndvar->is_fin_ackd, stream->saw_timestamp, stream->sack_permit,
947 stream->is_bound_addr, stream->need_wnd_adv);
957 stream->snd_nxt, sndvar->snd_una, sndvar->iss, sndvar->fss,
979 stream->rcv_nxt, rcvvar->irs,