1 /*
2  * TCP stream queue - tcp_stream_queue.c/h
3  *
4  * EunYoung Jeong
5  *
6  * Part of this code borrows Click's simple queue implementation
7  *
8  * ============================== Click License =============================
9  *
10  * Copyright (c) 1999-2000 Massachusetts Institute of Technology
11  *
12  * Permission is hereby granted, free of charge, to any person obtaining a
13  * copy of this software and associated documentation files (the "Software"),
14  * to deal in the Software without restriction, subject to the conditions
15  * listed in the Click LICENSE file. These conditions include: you must
16  * preserve this copyright notice, and you cannot mention the copyright
17  * holders in advertising related to the Software without their permission.
18  * The Software is provided WITHOUT ANY WARRANTY, EXPRESS OR IMPLIED. This
19  * notice is a summary of the Click LICENSE file; the license in that file is
20  * legally binding.
21  */
22 
23 #include <stdio.h>
24 #include <stdlib.h>
25 
26 #include "tcp_stream_queue.h"
27 #include "debug.h"
28 
/*
 * Index types used by the ring-buffer stream queue in this file.
 * The _INDEX_TYPE_ guard skips these typedefs when the macro has already
 * been defined before this point.
 */
#ifndef _INDEX_TYPE_
#define _INDEX_TYPE_
typedef uint32_t index_type;         /* unsigned slot index (head/tail/capacity) */
typedef int32_t signed_index_type;   /* signed counterpart of index_type */
#endif
34 /*---------------------------------------------------------------------------*/
/*
 * Ring buffer of tcp_stream pointers addressed by a head and a tail index.
 * CreateStreamQueue() allocates _capacity + 1 slots, and NextIndex() wraps an
 * index back to 0 after it reaches _capacity, so valid indices are
 * 0.._capacity. The queue is empty when _head == _tail.
 */
struct stream_queue
{
	/* largest valid slot index; the slot array holds _capacity + 1 entries */
	index_type _capacity;
	/* index of the slot StreamDequeue() reads next */
	volatile index_type _head;
	/* index of the slot StreamEnqueue() writes next */
	volatile index_type _tail;

	/* slot array; each element is a volatile pointer to a tcp_stream */
	struct tcp_stream * volatile * _q;
};
43 /*----------------------------------------------------------------------------*/
44 stream_queue_int *
CreateInternalStreamQueue(int size)45 CreateInternalStreamQueue(int size)
46 {
47 	stream_queue_int *sq;
48 
49 	sq = (stream_queue_int *)calloc(1, sizeof(stream_queue_int));
50 	if (!sq) {
51 		return NULL;
52 	}
53 
54 	sq->array = (tcp_stream **)calloc(size, sizeof(tcp_stream *));
55 	if (!sq->array) {
56 		free(sq);
57 		return NULL;
58 	}
59 
60 	sq->size = size;
61 	sq->first = sq->last = 0;
62 	sq->count = 0;
63 
64 	return sq;
65 }
66 /*----------------------------------------------------------------------------*/
67 void
DestroyInternalStreamQueue(stream_queue_int * sq)68 DestroyInternalStreamQueue(stream_queue_int *sq)
69 {
70 	if (!sq)
71 		return;
72 
73 	if (sq->array) {
74 		free(sq->array);
75 		sq->array = NULL;
76 	}
77 
78 	free(sq);
79 }
80 /*----------------------------------------------------------------------------*/
81 int
StreamInternalEnqueue(stream_queue_int * sq,struct tcp_stream * stream)82 StreamInternalEnqueue(stream_queue_int *sq, struct tcp_stream *stream)
83 {
84 	if (sq->count >= sq->size) {
85 		/* queue is full */
86 		TRACE_INFO("[WARNING] Queue overflow. Set larger queue size! "
87 				"count: %d, size: %d\n", sq->count, sq->size);
88 		return -1;
89 	}
90 
91 	sq->array[sq->last++] = stream;
92 	sq->count++;
93 	if (sq->last >= sq->size) {
94 		sq->last = 0;
95 	}
96 	assert (sq->count <= sq->size);
97 
98 	return 0;
99 }
100 /*----------------------------------------------------------------------------*/
101 struct tcp_stream *
StreamInternalDequeue(stream_queue_int * sq)102 StreamInternalDequeue(stream_queue_int *sq)
103 {
104 	struct tcp_stream *stream = NULL;
105 
106 	if (sq->count <= 0) {
107 		return NULL;
108 	}
109 
110 	stream = sq->array[sq->first++];
111 	assert(stream != NULL);
112 	if (sq->first >= sq->size) {
113 		sq->first = 0;
114 	}
115 	sq->count--;
116 	assert(sq->count >= 0);
117 
118 	return stream;
119 }
120 /*---------------------------------------------------------------------------*/
121 static inline index_type
NextIndex(stream_queue_t sq,index_type i)122 NextIndex(stream_queue_t sq, index_type i)
123 {
124 	return (i != sq->_capacity ? i + 1: 0);
125 }
126 /*---------------------------------------------------------------------------*/
127 static inline index_type
PrevIndex(stream_queue_t sq,index_type i)128 PrevIndex(stream_queue_t sq, index_type i)
129 {
130 	return (i != 0 ? i - 1: sq->_capacity);
131 }
132 /*---------------------------------------------------------------------------*/
133 int
StreamQueueIsEmpty(stream_queue_t sq)134 StreamQueueIsEmpty(stream_queue_t sq)
135 {
136 	return (sq->_head == sq->_tail);
137 }
138 /*---------------------------------------------------------------------------*/
/*
 * Empty inline-asm statement that lists `stream` and `index` as memory
 * input operands. The asm template is empty and the statement declares no
 * "memory" clobber. Both arguments are received by value, so the operands
 * are this function's own parameter copies.
 *
 * NOTE(review): presumably intended as a compiler-level ordering point
 * between a slot access and the head/tail update that follows it in
 * StreamEnqueue() / StreamDequeue(); whether that is sufficient on weakly
 * ordered CPUs should be confirmed.
 */
static inline void
StreamMemoryBarrier(tcp_stream * volatile stream, volatile index_type index)
{
	__asm__ volatile("" : : "m" (stream), "m" (index));
}
144 /*---------------------------------------------------------------------------*/
145 stream_queue_t
CreateStreamQueue(int capacity)146 CreateStreamQueue(int capacity)
147 {
148 	stream_queue_t sq;
149 
150 	sq = (stream_queue_t)calloc(1, sizeof(struct stream_queue));
151 	if (!sq)
152 		return NULL;
153 
154 	sq->_q = (tcp_stream **)calloc(capacity + 1, sizeof(tcp_stream *));
155 	if (!sq->_q) {
156 		free(sq);
157 		return NULL;
158 	}
159 
160 	sq->_capacity = capacity;
161 	sq->_head = sq->_tail = 0;
162 
163 	return sq;
164 }
165 /*---------------------------------------------------------------------------*/
166 void
DestroyStreamQueue(stream_queue_t sq)167 DestroyStreamQueue(stream_queue_t sq)
168 {
169 	if (!sq)
170 		return;
171 
172 	if (sq->_q) {
173 		free((void *)sq->_q);
174 		sq->_q = NULL;
175 	}
176 
177 	free(sq);
178 }
179 /*---------------------------------------------------------------------------*/
180 int
StreamEnqueue(stream_queue_t sq,tcp_stream * stream)181 StreamEnqueue(stream_queue_t sq, tcp_stream *stream)
182 {
183 	index_type h = sq->_head;
184 	index_type t = sq->_tail;
185 	index_type nt = NextIndex(sq, t);
186 
187 	if (nt != h) {
188 		sq->_q[t] = stream;
189 		StreamMemoryBarrier(sq->_q[t], sq->_tail);
190 		sq->_tail = nt;
191 		return 0;
192 	}
193 
194 	TRACE_ERROR("Exceed capacity of stream queue!\n");
195 	return -1;
196 }
197 /*---------------------------------------------------------------------------*/
198 tcp_stream *
StreamDequeue(stream_queue_t sq)199 StreamDequeue(stream_queue_t sq)
200 {
201 	index_type h = sq->_head;
202 	index_type t = sq->_tail;
203 
204 	if (h != t) {
205 		tcp_stream *stream = sq->_q[h];
206 		StreamMemoryBarrier(sq->_q[h], sq->_head);
207 		sq->_head = NextIndex(sq, h);
208 		assert(stream);
209 		return stream;
210 	}
211 
212 	return NULL;
213 }
214 /*---------------------------------------------------------------------------*/
215