1 /* 2 * TCP stream queue - tcp_stream_queue.c/h 3 * 4 * EunYoung Jeong 5 * 6 * Part of this code borrows Click's simple queue implementation 7 * 8 * ============================== Click License ============================= 9 * 10 * Copyright (c) 1999-2000 Massachusetts Institute of Technology 11 * 12 * Permission is hereby granted, free of charge, to any person obtaining a 13 * copy of this software and associated documentation files (the "Software"), 14 * to deal in the Software without restriction, subject to the conditions 15 * listed in the Click LICENSE file. These conditions include: you must 16 * preserve this copyright notice, and you cannot mention the copyright 17 * holders in advertising related to the Software without their permission. 18 * The Software is provided WITHOUT ANY WARRANTY, EXPRESS OR IMPLIED. This 19 * notice is a summary of the Click LICENSE file; the license in that file is 20 * legally binding. 21 */ 22 23 #include <stdio.h> 24 #include <stdlib.h> 25 26 #include "tcp_stream_queue.h" 27 #include "debug.h" 28 29 #ifndef _INDEX_TYPE_ 30 #define _INDEX_TYPE_ 31 typedef uint32_t index_type; 32 typedef int32_t signed_index_type; 33 #endif 34 /*---------------------------------------------------------------------------*/ 35 struct stream_queue 36 { 37 index_type _capacity; 38 volatile index_type _head; 39 volatile index_type _tail; 40 41 struct tcp_stream * volatile * _q; 42 }; 43 /*----------------------------------------------------------------------------*/ 44 stream_queue_int * 45 CreateInternalStreamQueue(int size) 46 { 47 stream_queue_int *sq; 48 49 sq = (stream_queue_int *)calloc(1, sizeof(stream_queue_int)); 50 if (!sq) { 51 return NULL; 52 } 53 54 sq->array = (tcp_stream **)calloc(size, sizeof(tcp_stream *)); 55 if (!sq->array) { 56 free(sq); 57 return NULL; 58 } 59 60 sq->size = size; 61 sq->first = sq->last = 0; 62 sq->count = 0; 63 64 return sq; 65 } 66 /*----------------------------------------------------------------------------*/ 67 void 68 DestroyInternalStreamQueue(stream_queue_int *sq) 69 { 70 if (!sq) 71 return; 72 73 if (sq->array) { 74 free(sq->array); 75 sq->array = NULL; 76 } 77 78 free(sq); 79 } 80 /*----------------------------------------------------------------------------*/ 81 int 82 StreamInternalEnqueue(stream_queue_int *sq, struct tcp_stream *stream) 83 { 84 if (sq->count >= sq->size) { 85 /* queue is full */ 86 TRACE_INFO("[WARNING] Queue overflow. Set larger queue size! " 87 "count: %d, size: %d\n", sq->count, sq->size); 88 return -1; 89 } 90 91 sq->array[sq->last++] = stream; 92 sq->count++; 93 if (sq->last >= sq->size) { 94 sq->last = 0; 95 } 96 assert (sq->count <= sq->size); 97 98 return 0; 99 } 100 /*----------------------------------------------------------------------------*/ 101 struct tcp_stream * 102 StreamInternalDequeue(stream_queue_int *sq) 103 { 104 struct tcp_stream *stream = NULL; 105 106 if (sq->count <= 0) { 107 return NULL; 108 } 109 110 stream = sq->array[sq->first++]; 111 assert(stream != NULL); 112 if (sq->first >= sq->size) { 113 sq->first = 0; 114 } 115 sq->count--; 116 assert(sq->count >= 0); 117 118 return stream; 119 } 120 /*---------------------------------------------------------------------------*/ 121 static inline index_type 122 NextIndex(stream_queue_t sq, index_type i) 123 { 124 return (i != sq->_capacity ? i + 1: 0); 125 } 126 /*---------------------------------------------------------------------------*/ 127 static inline index_type 128 PrevIndex(stream_queue_t sq, index_type i) 129 { 130 return (i != 0 ? i - 1: sq->_capacity); 131 } 132 /*---------------------------------------------------------------------------*/ 133 int 134 StreamQueueIsEmpty(stream_queue_t sq) 135 { 136 return (sq->_head == sq->_tail); 137 } 138 /*---------------------------------------------------------------------------*/ 139 static inline void 140 StreamMemoryBarrier(tcp_stream * volatile stream, volatile index_type index) 141 { 142 __asm__ volatile("" : : "m" (stream), "m" (index)); 143 } 144 /*---------------------------------------------------------------------------*/ 145 stream_queue_t 146 CreateStreamQueue(int capacity) 147 { 148 stream_queue_t sq; 149 150 sq = (stream_queue_t)calloc(1, sizeof(struct stream_queue)); 151 if (!sq) 152 return NULL; 153 154 sq->_q = (tcp_stream **)calloc(capacity + 1, sizeof(tcp_stream *)); 155 if (!sq->_q) { 156 free(sq); 157 return NULL; 158 } 159 160 sq->_capacity = capacity; 161 sq->_head = sq->_tail = 0; 162 163 return sq; 164 } 165 /*---------------------------------------------------------------------------*/ 166 void 167 DestroyStreamQueue(stream_queue_t sq) 168 { 169 if (!sq) 170 return; 171 172 if (sq->_q) { 173 free((void *)sq->_q); 174 sq->_q = NULL; 175 } 176 177 free(sq); 178 } 179 /*---------------------------------------------------------------------------*/ 180 int 181 StreamEnqueue(stream_queue_t sq, tcp_stream *stream) 182 { 183 index_type h = sq->_head; 184 index_type t = sq->_tail; 185 index_type nt = NextIndex(sq, t); 186 187 if (nt != h) { 188 sq->_q[t] = stream; 189 StreamMemoryBarrier(sq->_q[t], sq->_tail); 190 sq->_tail = nt; 191 return 0; 192 } 193 194 TRACE_ERROR("Exceed capacity of stream queue!\n"); 195 return -1; 196 } 197 /*---------------------------------------------------------------------------*/ 198 tcp_stream * 199 StreamDequeue(stream_queue_t sq) 200 { 201 index_type h = sq->_head; 202 index_type t = sq->_tail; 203 204 if (h != t) { 205 tcp_stream *stream = sq->_q[h]; 206 StreamMemoryBarrier(sq->_q[h], sq->_head); 207 sq->_head = NextIndex(sq, h); 208 assert(stream); 209 return stream; 210 } 211 212 return NULL; 213 } 214 /*---------------------------------------------------------------------------*/ 215