1 /*
2 * TCP stream queue - tcp_stream_queue.c/h
3 *
4 * EunYoung Jeong
5 *
6 * Part of this code borrows Click's simple queue implementation
7 *
8 * ============================== Click License =============================
9 *
10 * Copyright (c) 1999-2000 Massachusetts Institute of Technology
11 *
12 * Permission is hereby granted, free of charge, to any person obtaining a
13 * copy of this software and associated documentation files (the "Software"),
14 * to deal in the Software without restriction, subject to the conditions
15 * listed in the Click LICENSE file. These conditions include: you must
16 * preserve this copyright notice, and you cannot mention the copyright
17 * holders in advertising related to the Software without their permission.
18 * The Software is provided WITHOUT ANY WARRANTY, EXPRESS OR IMPLIED. This
19 * notice is a summary of the Click LICENSE file; the license in that file is
20 * legally binding.
21 */
22
23 #include <stdio.h>
24 #include <stdlib.h>
25
26 #include "tcp_stream_queue.h"
27 #include "debug.h"
28
/*
 * Index types used by the ring-buffer stream queue below.
 * The definitions are guarded by _INDEX_TYPE_ so they are emitted only
 * once even if another file in the translation unit provides them too.
 */
#if !defined(_INDEX_TYPE_)
#define _INDEX_TYPE_
typedef uint32_t index_type;		/* unsigned 32-bit slot index */
typedef int32_t signed_index_type;	/* signed 32-bit counterpart */
#endif /* _INDEX_TYPE_ */
34 /*---------------------------------------------------------------------------*/
35 struct stream_queue
36 {
37 index_type _capacity;
38 volatile index_type _head;
39 volatile index_type _tail;
40
41 struct tcp_stream * volatile * _q;
42 };
43 /*----------------------------------------------------------------------------*/
44 stream_queue_int *
CreateInternalStreamQueue(int size)45 CreateInternalStreamQueue(int size)
46 {
47 stream_queue_int *sq;
48
49 sq = (stream_queue_int *)calloc(1, sizeof(stream_queue_int));
50 if (!sq) {
51 return NULL;
52 }
53
54 sq->array = (tcp_stream **)calloc(size, sizeof(tcp_stream *));
55 if (!sq->array) {
56 free(sq);
57 return NULL;
58 }
59
60 sq->size = size;
61 sq->first = sq->last = 0;
62 sq->count = 0;
63
64 return sq;
65 }
66 /*----------------------------------------------------------------------------*/
67 void
DestroyInternalStreamQueue(stream_queue_int * sq)68 DestroyInternalStreamQueue(stream_queue_int *sq)
69 {
70 if (!sq)
71 return;
72
73 if (sq->array) {
74 free(sq->array);
75 sq->array = NULL;
76 }
77
78 free(sq);
79 }
80 /*----------------------------------------------------------------------------*/
81 int
StreamInternalEnqueue(stream_queue_int * sq,struct tcp_stream * stream)82 StreamInternalEnqueue(stream_queue_int *sq, struct tcp_stream *stream)
83 {
84 if (sq->count >= sq->size) {
85 /* queue is full */
86 TRACE_INFO("[WARNING] Queue overflow. Set larger queue size! "
87 "count: %d, size: %d\n", sq->count, sq->size);
88 return -1;
89 }
90
91 sq->array[sq->last++] = stream;
92 sq->count++;
93 if (sq->last >= sq->size) {
94 sq->last = 0;
95 }
96 assert (sq->count <= sq->size);
97
98 return 0;
99 }
100 /*----------------------------------------------------------------------------*/
101 struct tcp_stream *
StreamInternalDequeue(stream_queue_int * sq)102 StreamInternalDequeue(stream_queue_int *sq)
103 {
104 struct tcp_stream *stream = NULL;
105
106 if (sq->count <= 0) {
107 return NULL;
108 }
109
110 stream = sq->array[sq->first++];
111 assert(stream != NULL);
112 if (sq->first >= sq->size) {
113 sq->first = 0;
114 }
115 sq->count--;
116 assert(sq->count >= 0);
117
118 return stream;
119 }
120 /*---------------------------------------------------------------------------*/
121 static inline index_type
NextIndex(stream_queue_t sq,index_type i)122 NextIndex(stream_queue_t sq, index_type i)
123 {
124 return (i != sq->_capacity ? i + 1: 0);
125 }
126 /*---------------------------------------------------------------------------*/
127 static inline index_type
PrevIndex(stream_queue_t sq,index_type i)128 PrevIndex(stream_queue_t sq, index_type i)
129 {
130 return (i != 0 ? i - 1: sq->_capacity);
131 }
132 /*---------------------------------------------------------------------------*/
133 int
StreamQueueIsEmpty(stream_queue_t sq)134 StreamQueueIsEmpty(stream_queue_t sq)
135 {
136 return (sq->_head == sq->_tail);
137 }
138 /*---------------------------------------------------------------------------*/
139 static inline void
StreamMemoryBarrier(tcp_stream * volatile stream,volatile index_type index)140 StreamMemoryBarrier(tcp_stream * volatile stream, volatile index_type index)
141 {
142 __asm__ volatile("" : : "m" (stream), "m" (index));
143 }
144 /*---------------------------------------------------------------------------*/
145 stream_queue_t
CreateStreamQueue(int capacity)146 CreateStreamQueue(int capacity)
147 {
148 stream_queue_t sq;
149
150 sq = (stream_queue_t)calloc(1, sizeof(struct stream_queue));
151 if (!sq)
152 return NULL;
153
154 sq->_q = (tcp_stream **)calloc(capacity + 1, sizeof(tcp_stream *));
155 if (!sq->_q) {
156 free(sq);
157 return NULL;
158 }
159
160 sq->_capacity = capacity;
161 sq->_head = sq->_tail = 0;
162
163 return sq;
164 }
165 /*---------------------------------------------------------------------------*/
166 void
DestroyStreamQueue(stream_queue_t sq)167 DestroyStreamQueue(stream_queue_t sq)
168 {
169 if (!sq)
170 return;
171
172 if (sq->_q) {
173 free((void *)sq->_q);
174 sq->_q = NULL;
175 }
176
177 free(sq);
178 }
179 /*---------------------------------------------------------------------------*/
180 int
StreamEnqueue(stream_queue_t sq,tcp_stream * stream)181 StreamEnqueue(stream_queue_t sq, tcp_stream *stream)
182 {
183 index_type h = sq->_head;
184 index_type t = sq->_tail;
185 index_type nt = NextIndex(sq, t);
186
187 if (nt != h) {
188 sq->_q[t] = stream;
189 StreamMemoryBarrier(sq->_q[t], sq->_tail);
190 sq->_tail = nt;
191 return 0;
192 }
193
194 TRACE_ERROR("Exceed capacity of stream queue!\n");
195 return -1;
196 }
197 /*---------------------------------------------------------------------------*/
198 tcp_stream *
StreamDequeue(stream_queue_t sq)199 StreamDequeue(stream_queue_t sq)
200 {
201 index_type h = sq->_head;
202 index_type t = sq->_tail;
203
204 if (h != t) {
205 tcp_stream *stream = sq->_q[h];
206 StreamMemoryBarrier(sq->_q[h], sq->_head);
207 sq->_head = NextIndex(sq, h);
208 assert(stream);
209 return stream;
210 }
211
212 return NULL;
213 }
214 /*---------------------------------------------------------------------------*/
215