/*
 * TCP stream queue - tcp_stream_queue.c/h
 *
 * EunYoung Jeong
 *
 * Part of this code borrows Click's simple queue implementation
 *
 * ============================== Click License =============================
 *
 * Copyright (c) 1999-2000 Massachusetts Institute of Technology
 *
 * Permission is hereby granted, free of charge, to any person obtaining a
 * copy of this software and associated documentation files (the "Software"),
 * to deal in the Software without restriction, subject to the conditions
 * listed in the Click LICENSE file. These conditions include: you must
 * preserve this copyright notice, and you cannot mention the copyright
 * holders in advertising related to the Software without their permission.
 * The Software is provided WITHOUT ANY WARRANTY, EXPRESS OR IMPLIED. This
 * notice is a summary of the Click LICENSE file; the license in that file is
 * legally binding.
 */
22*76404edcSAsim Jamshed 
23*76404edcSAsim Jamshed #include <stdio.h>
24*76404edcSAsim Jamshed #include <stdlib.h>
25*76404edcSAsim Jamshed 
26*76404edcSAsim Jamshed #include "tcp_stream_queue.h"
27*76404edcSAsim Jamshed #include "debug.h"
28*76404edcSAsim Jamshed 
/*
 * Index types used by the head/tail ring buffer in this file.
 * The typedefs are skipped when _INDEX_TYPE_ has already been defined.
 */
#if !defined(_INDEX_TYPE_)
#define _INDEX_TYPE_
typedef uint32_t index_type;		/* unsigned slot index */
typedef int32_t signed_index_type;	/* signed counterpart of index_type */
#endif /* _INDEX_TYPE_ */
34*76404edcSAsim Jamshed /*---------------------------------------------------------------------------*/
35*76404edcSAsim Jamshed struct stream_queue
36*76404edcSAsim Jamshed {
37*76404edcSAsim Jamshed 	index_type _capacity;
38*76404edcSAsim Jamshed 	volatile index_type _head;
39*76404edcSAsim Jamshed 	volatile index_type _tail;
40*76404edcSAsim Jamshed 
41*76404edcSAsim Jamshed 	struct tcp_stream * volatile * _q;
42*76404edcSAsim Jamshed };
43*76404edcSAsim Jamshed /*----------------------------------------------------------------------------*/
44*76404edcSAsim Jamshed stream_queue_int *
CreateInternalStreamQueue(int size)45*76404edcSAsim Jamshed CreateInternalStreamQueue(int size)
46*76404edcSAsim Jamshed {
47*76404edcSAsim Jamshed 	stream_queue_int *sq;
48*76404edcSAsim Jamshed 
49*76404edcSAsim Jamshed 	sq = (stream_queue_int *)calloc(1, sizeof(stream_queue_int));
50*76404edcSAsim Jamshed 	if (!sq) {
51*76404edcSAsim Jamshed 		return NULL;
52*76404edcSAsim Jamshed 	}
53*76404edcSAsim Jamshed 
54*76404edcSAsim Jamshed 	sq->array = (tcp_stream **)calloc(size, sizeof(tcp_stream *));
55*76404edcSAsim Jamshed 	if (!sq->array) {
56*76404edcSAsim Jamshed 		free(sq);
57*76404edcSAsim Jamshed 		return NULL;
58*76404edcSAsim Jamshed 	}
59*76404edcSAsim Jamshed 
60*76404edcSAsim Jamshed 	sq->size = size;
61*76404edcSAsim Jamshed 	sq->first = sq->last = 0;
62*76404edcSAsim Jamshed 	sq->count = 0;
63*76404edcSAsim Jamshed 
64*76404edcSAsim Jamshed 	return sq;
65*76404edcSAsim Jamshed }
66*76404edcSAsim Jamshed /*----------------------------------------------------------------------------*/
67*76404edcSAsim Jamshed void
DestroyInternalStreamQueue(stream_queue_int * sq)68*76404edcSAsim Jamshed DestroyInternalStreamQueue(stream_queue_int *sq)
69*76404edcSAsim Jamshed {
70*76404edcSAsim Jamshed 	if (!sq)
71*76404edcSAsim Jamshed 		return;
72*76404edcSAsim Jamshed 
73*76404edcSAsim Jamshed 	if (sq->array) {
74*76404edcSAsim Jamshed 		free(sq->array);
75*76404edcSAsim Jamshed 		sq->array = NULL;
76*76404edcSAsim Jamshed 	}
77*76404edcSAsim Jamshed 
78*76404edcSAsim Jamshed 	free(sq);
79*76404edcSAsim Jamshed }
80*76404edcSAsim Jamshed /*----------------------------------------------------------------------------*/
81*76404edcSAsim Jamshed int
StreamInternalEnqueue(stream_queue_int * sq,struct tcp_stream * stream)82*76404edcSAsim Jamshed StreamInternalEnqueue(stream_queue_int *sq, struct tcp_stream *stream)
83*76404edcSAsim Jamshed {
84*76404edcSAsim Jamshed 	if (sq->count >= sq->size) {
85*76404edcSAsim Jamshed 		/* queue is full */
86*76404edcSAsim Jamshed 		TRACE_INFO("[WARNING] Queue overflow. Set larger queue size! "
87*76404edcSAsim Jamshed 				"count: %d, size: %d\n", sq->count, sq->size);
88*76404edcSAsim Jamshed 		return -1;
89*76404edcSAsim Jamshed 	}
90*76404edcSAsim Jamshed 
91*76404edcSAsim Jamshed 	sq->array[sq->last++] = stream;
92*76404edcSAsim Jamshed 	sq->count++;
93*76404edcSAsim Jamshed 	if (sq->last >= sq->size) {
94*76404edcSAsim Jamshed 		sq->last = 0;
95*76404edcSAsim Jamshed 	}
96*76404edcSAsim Jamshed 	assert (sq->count <= sq->size);
97*76404edcSAsim Jamshed 
98*76404edcSAsim Jamshed 	return 0;
99*76404edcSAsim Jamshed }
100*76404edcSAsim Jamshed /*----------------------------------------------------------------------------*/
101*76404edcSAsim Jamshed struct tcp_stream *
StreamInternalDequeue(stream_queue_int * sq)102*76404edcSAsim Jamshed StreamInternalDequeue(stream_queue_int *sq)
103*76404edcSAsim Jamshed {
104*76404edcSAsim Jamshed 	struct tcp_stream *stream = NULL;
105*76404edcSAsim Jamshed 
106*76404edcSAsim Jamshed 	if (sq->count <= 0) {
107*76404edcSAsim Jamshed 		return NULL;
108*76404edcSAsim Jamshed 	}
109*76404edcSAsim Jamshed 
110*76404edcSAsim Jamshed 	stream = sq->array[sq->first++];
111*76404edcSAsim Jamshed 	assert(stream != NULL);
112*76404edcSAsim Jamshed 	if (sq->first >= sq->size) {
113*76404edcSAsim Jamshed 		sq->first = 0;
114*76404edcSAsim Jamshed 	}
115*76404edcSAsim Jamshed 	sq->count--;
116*76404edcSAsim Jamshed 	assert(sq->count >= 0);
117*76404edcSAsim Jamshed 
118*76404edcSAsim Jamshed 	return stream;
119*76404edcSAsim Jamshed }
120*76404edcSAsim Jamshed /*---------------------------------------------------------------------------*/
121*76404edcSAsim Jamshed static inline index_type
NextIndex(stream_queue_t sq,index_type i)122*76404edcSAsim Jamshed NextIndex(stream_queue_t sq, index_type i)
123*76404edcSAsim Jamshed {
124*76404edcSAsim Jamshed 	return (i != sq->_capacity ? i + 1: 0);
125*76404edcSAsim Jamshed }
126*76404edcSAsim Jamshed /*---------------------------------------------------------------------------*/
127*76404edcSAsim Jamshed static inline index_type
PrevIndex(stream_queue_t sq,index_type i)128*76404edcSAsim Jamshed PrevIndex(stream_queue_t sq, index_type i)
129*76404edcSAsim Jamshed {
130*76404edcSAsim Jamshed 	return (i != 0 ? i - 1: sq->_capacity);
131*76404edcSAsim Jamshed }
132*76404edcSAsim Jamshed /*---------------------------------------------------------------------------*/
133*76404edcSAsim Jamshed int
StreamQueueIsEmpty(stream_queue_t sq)134*76404edcSAsim Jamshed StreamQueueIsEmpty(stream_queue_t sq)
135*76404edcSAsim Jamshed {
136*76404edcSAsim Jamshed 	return (sq->_head == sq->_tail);
137*76404edcSAsim Jamshed }
138*76404edcSAsim Jamshed /*---------------------------------------------------------------------------*/
139*76404edcSAsim Jamshed static inline void
StreamMemoryBarrier(tcp_stream * volatile stream,volatile index_type index)140*76404edcSAsim Jamshed StreamMemoryBarrier(tcp_stream * volatile stream, volatile index_type index)
141*76404edcSAsim Jamshed {
142*76404edcSAsim Jamshed 	__asm__ volatile("" : : "m" (stream), "m" (index));
143*76404edcSAsim Jamshed }
144*76404edcSAsim Jamshed /*---------------------------------------------------------------------------*/
145*76404edcSAsim Jamshed stream_queue_t
CreateStreamQueue(int capacity)146*76404edcSAsim Jamshed CreateStreamQueue(int capacity)
147*76404edcSAsim Jamshed {
148*76404edcSAsim Jamshed 	stream_queue_t sq;
149*76404edcSAsim Jamshed 
150*76404edcSAsim Jamshed 	sq = (stream_queue_t)calloc(1, sizeof(struct stream_queue));
151*76404edcSAsim Jamshed 	if (!sq)
152*76404edcSAsim Jamshed 		return NULL;
153*76404edcSAsim Jamshed 
154*76404edcSAsim Jamshed 	sq->_q = (tcp_stream **)calloc(capacity + 1, sizeof(tcp_stream *));
155*76404edcSAsim Jamshed 	if (!sq->_q) {
156*76404edcSAsim Jamshed 		free(sq);
157*76404edcSAsim Jamshed 		return NULL;
158*76404edcSAsim Jamshed 	}
159*76404edcSAsim Jamshed 
160*76404edcSAsim Jamshed 	sq->_capacity = capacity;
161*76404edcSAsim Jamshed 	sq->_head = sq->_tail = 0;
162*76404edcSAsim Jamshed 
163*76404edcSAsim Jamshed 	return sq;
164*76404edcSAsim Jamshed }
165*76404edcSAsim Jamshed /*---------------------------------------------------------------------------*/
166*76404edcSAsim Jamshed void
DestroyStreamQueue(stream_queue_t sq)167*76404edcSAsim Jamshed DestroyStreamQueue(stream_queue_t sq)
168*76404edcSAsim Jamshed {
169*76404edcSAsim Jamshed 	if (!sq)
170*76404edcSAsim Jamshed 		return;
171*76404edcSAsim Jamshed 
172*76404edcSAsim Jamshed 	if (sq->_q) {
173*76404edcSAsim Jamshed 		free((void *)sq->_q);
174*76404edcSAsim Jamshed 		sq->_q = NULL;
175*76404edcSAsim Jamshed 	}
176*76404edcSAsim Jamshed 
177*76404edcSAsim Jamshed 	free(sq);
178*76404edcSAsim Jamshed }
179*76404edcSAsim Jamshed /*---------------------------------------------------------------------------*/
180*76404edcSAsim Jamshed int
StreamEnqueue(stream_queue_t sq,tcp_stream * stream)181*76404edcSAsim Jamshed StreamEnqueue(stream_queue_t sq, tcp_stream *stream)
182*76404edcSAsim Jamshed {
183*76404edcSAsim Jamshed 	index_type h = sq->_head;
184*76404edcSAsim Jamshed 	index_type t = sq->_tail;
185*76404edcSAsim Jamshed 	index_type nt = NextIndex(sq, t);
186*76404edcSAsim Jamshed 
187*76404edcSAsim Jamshed 	if (nt != h) {
188*76404edcSAsim Jamshed 		sq->_q[t] = stream;
189*76404edcSAsim Jamshed 		StreamMemoryBarrier(sq->_q[t], sq->_tail);
190*76404edcSAsim Jamshed 		sq->_tail = nt;
191*76404edcSAsim Jamshed 		return 0;
192*76404edcSAsim Jamshed 	}
193*76404edcSAsim Jamshed 
194*76404edcSAsim Jamshed 	TRACE_ERROR("Exceed capacity of stream queue!\n");
195*76404edcSAsim Jamshed 	return -1;
196*76404edcSAsim Jamshed }
197*76404edcSAsim Jamshed /*---------------------------------------------------------------------------*/
198*76404edcSAsim Jamshed tcp_stream *
StreamDequeue(stream_queue_t sq)199*76404edcSAsim Jamshed StreamDequeue(stream_queue_t sq)
200*76404edcSAsim Jamshed {
201*76404edcSAsim Jamshed 	index_type h = sq->_head;
202*76404edcSAsim Jamshed 	index_type t = sq->_tail;
203*76404edcSAsim Jamshed 
204*76404edcSAsim Jamshed 	if (h != t) {
205*76404edcSAsim Jamshed 		tcp_stream *stream = sq->_q[h];
206*76404edcSAsim Jamshed 		StreamMemoryBarrier(sq->_q[h], sq->_head);
207*76404edcSAsim Jamshed 		sq->_head = NextIndex(sq, h);
208*76404edcSAsim Jamshed 		assert(stream);
209*76404edcSAsim Jamshed 		return stream;
210*76404edcSAsim Jamshed 	}
211*76404edcSAsim Jamshed 
212*76404edcSAsim Jamshed 	return NULL;
213*76404edcSAsim Jamshed }
214*76404edcSAsim Jamshed /*---------------------------------------------------------------------------*/
215