/* SPDX-License-Identifier: BSD-3-Clause
 * Copyright(c) 2010-2014 Intel Corporation
 */

#include <stdio.h>
#include <string.h>
#include <sys/queue.h>
#include <rte_mbuf.h>
#include <rte_memzone.h>
#include <rte_errno.h>
#include <rte_string_fns.h>
#include <rte_eal_memconfig.h>
#include <rte_pause.h>
#include <rte_tailq.h>

#include "rte_distributor_single.h"
#include "distributor_private.h"

TAILQ_HEAD(rte_distributor_list, rte_distributor_single);

static struct rte_tailq_elem rte_distributor_tailq = {
	.name = "RTE_DISTRIBUTOR",
};
EAL_REGISTER_TAILQ(rte_distributor_tailq)

/**** APIs called by workers ****/

void
rte_distributor_request_pkt_single(struct rte_distributor_single *d,
		unsigned worker_id, struct rte_mbuf *oldpkt)
{
	union rte_distributor_buffer_single *buf = &d->bufs[worker_id];
	int64_t req = (((int64_t)(uintptr_t)oldpkt) << RTE_DISTRIB_FLAG_BITS)
			| RTE_DISTRIB_GET_BUF;
	RTE_WAIT_UNTIL_MASKED(&buf->bufptr64, RTE_DISTRIB_FLAGS_MASK,
		==, 0, __ATOMIC_RELAXED);

	/* Sync with distributor on GET_BUF flag. */
	__atomic_store_n(&(buf->bufptr64), req, __ATOMIC_RELEASE);
}

struct rte_mbuf *
rte_distributor_poll_pkt_single(struct rte_distributor_single *d,
		unsigned worker_id)
{
	union rte_distributor_buffer_single *buf = &d->bufs[worker_id];
	/* Sync with distributor. Acquire bufptr64. */
	if (__atomic_load_n(&buf->bufptr64, __ATOMIC_ACQUIRE)
		& RTE_DISTRIB_GET_BUF)
		return NULL;

	/* since bufptr64 is signed, this should be an arithmetic shift */
	int64_t ret = buf->bufptr64 >> RTE_DISTRIB_FLAG_BITS;
	return (struct rte_mbuf *)((uintptr_t)ret);
}

struct rte_mbuf *
rte_distributor_get_pkt_single(struct rte_distributor_single *d,
		unsigned worker_id, struct rte_mbuf *oldpkt)
{
	struct rte_mbuf *ret;
	rte_distributor_request_pkt_single(d, worker_id, oldpkt);
	while ((ret = rte_distributor_poll_pkt_single(d, worker_id)) == NULL)
		rte_pause();
	return ret;
}

int
rte_distributor_return_pkt_single(struct rte_distributor_single *d,
		unsigned worker_id, struct rte_mbuf *oldpkt)
{
	union rte_distributor_buffer_single *buf = &d->bufs[worker_id];
	uint64_t req = (((int64_t)(uintptr_t)oldpkt) << RTE_DISTRIB_FLAG_BITS)
			| RTE_DISTRIB_RETURN_BUF;
	RTE_WAIT_UNTIL_MASKED(&buf->bufptr64, RTE_DISTRIB_FLAGS_MASK,
		==, 0, __ATOMIC_RELAXED);

	/* Sync with distributor on RETURN_BUF flag. */
	__atomic_store_n(&(buf->bufptr64), req, __ATOMIC_RELEASE);
	return 0;
}

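/*
 * Worker-side usage sketch (illustrative only, not part of the library):
 * a worker typically hands back its previous packet while asking for the
 * next one, then returns the last packet on shutdown. The names
 * keep_running and handle_packet are hypothetical user code.
 *
 *	struct rte_mbuf *pkt =
 *		rte_distributor_get_pkt_single(d, worker_id, NULL);
 *	while (keep_running) {
 *		handle_packet(pkt);
 *		pkt = rte_distributor_get_pkt_single(d, worker_id, pkt);
 *	}
 *	rte_distributor_return_pkt_single(d, worker_id, pkt);
 */
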
/**** APIs called on distributor core ****/

/* as name suggests, adds a packet to the backlog for a particular worker */
static int
add_to_backlog(struct rte_distributor_backlog *bl, int64_t item)
{
	if (bl->count == RTE_DISTRIB_BACKLOG_SIZE)
		return -1;

	bl->pkts[(bl->start + bl->count++) & (RTE_DISTRIB_BACKLOG_MASK)]
			= item;
	return 0;
}

/* takes the next packet for a worker off the backlog */
static int64_t
backlog_pop(struct rte_distributor_backlog *bl)
{
	bl->count--;
	return bl->pkts[bl->start++ & RTE_DISTRIB_BACKLOG_MASK];
}

/* stores a packet returned from a worker inside the returns array */
static inline void
store_return(uintptr_t oldbuf, struct rte_distributor_single *d,
		unsigned *ret_start, unsigned *ret_count)
{
	/* store returns in a circular buffer - code is branch-free */
	d->returns.mbufs[(*ret_start + *ret_count) & RTE_DISTRIB_RETURNS_MASK]
			= (void *)oldbuf;
	*ret_start += (*ret_count == RTE_DISTRIB_RETURNS_MASK) & !!(oldbuf);
	*ret_count += (*ret_count != RTE_DISTRIB_RETURNS_MASK) & !!(oldbuf);
}

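/*
 * Note on store_return(): the slot at
 * (*ret_start + *ret_count) & RTE_DISTRIB_RETURNS_MASK is written
 * unconditionally, but the indices only advance when oldbuf is non-zero,
 * so a zero write lands in the unused slot just past the window and is
 * overwritten by the next real return. When the ring is full
 * (*ret_count == RTE_DISTRIB_RETURNS_MASK), *ret_start advances instead of
 * *ret_count, dropping the oldest entry. For example, assuming a mask of
 * 127, start = 5 and count = 127, a non-zero oldbuf goes to slot
 * (5 + 127) & 127 = 4, start becomes 6 and count stays 127, so the entry
 * that was at slot 5 falls out of the window.
 */
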
static inline void
handle_worker_shutdown(struct rte_distributor_single *d, unsigned int wkr)
{
	d->in_flight_tags[wkr] = 0;
	d->in_flight_bitmask &= ~(1UL << wkr);
	/* Sync with worker. Release bufptr64. */
	__atomic_store_n(&(d->bufs[wkr].bufptr64), 0, __ATOMIC_RELEASE);
	if (unlikely(d->backlog[wkr].count != 0)) {
		/* On return of a packet, we need to move the
		 * queued packets for this core elsewhere.
		 * Easiest solution is to set things up for
		 * a recursive call. That will cause those
		 * packets to be queued up for the next free
		 * core, i.e. it will return as soon as a
		 * core becomes free to accept the first
		 * packet, as subsequent ones will be added to
		 * the backlog for that core.
		 */
		struct rte_mbuf *pkts[RTE_DISTRIB_BACKLOG_SIZE];
		unsigned i;
		struct rte_distributor_backlog *bl = &d->backlog[wkr];

		for (i = 0; i < bl->count; i++) {
			unsigned idx = (bl->start + i) &
					RTE_DISTRIB_BACKLOG_MASK;
			pkts[i] = (void *)((uintptr_t)(bl->pkts[idx] >>
					RTE_DISTRIB_FLAG_BITS));
		}
		/* recursive call.
		 * Note that the tags were set before first level call
		 * to rte_distributor_process.
		 */
		rte_distributor_process_single(d, pkts, i);
		bl->count = bl->start = 0;
	}
}

/* this function is called when the process() fn is called without any new
 * packets. It goes through all the workers and clears any returned packets
 * to do a partial flush.
 */
static int
process_returns(struct rte_distributor_single *d)
{
	unsigned wkr;
	unsigned flushed = 0;
	unsigned ret_start = d->returns.start,
			ret_count = d->returns.count;

	for (wkr = 0; wkr < d->num_workers; wkr++) {
		uintptr_t oldbuf = 0;
		/* Sync with worker. Acquire bufptr64. */
		const int64_t data = __atomic_load_n(&(d->bufs[wkr].bufptr64),
							__ATOMIC_ACQUIRE);

		if (data & RTE_DISTRIB_GET_BUF) {
			flushed++;
			if (d->backlog[wkr].count)
				/* Sync with worker. Release bufptr64. */
				__atomic_store_n(&(d->bufs[wkr].bufptr64),
					backlog_pop(&d->backlog[wkr]),
					__ATOMIC_RELEASE);
			else {
				/* Sync with worker on GET_BUF flag. */
				__atomic_store_n(&(d->bufs[wkr].bufptr64),
					RTE_DISTRIB_GET_BUF,
					__ATOMIC_RELEASE);
				d->in_flight_tags[wkr] = 0;
				d->in_flight_bitmask &= ~(1UL << wkr);
			}
			oldbuf = data >> RTE_DISTRIB_FLAG_BITS;
		} else if (data & RTE_DISTRIB_RETURN_BUF) {
			handle_worker_shutdown(d, wkr);
			oldbuf = data >> RTE_DISTRIB_FLAG_BITS;
		}

		store_return(oldbuf, d, &ret_start, &ret_count);
	}

	d->returns.start = ret_start;
	d->returns.count = ret_count;

	return flushed;
}

/* process a set of packets to distribute them to workers */
int
rte_distributor_process_single(struct rte_distributor_single *d,
		struct rte_mbuf **mbufs, unsigned num_mbufs)
{
	unsigned next_idx = 0;
	unsigned wkr = 0;
	struct rte_mbuf *next_mb = NULL;
	int64_t next_value = 0;
	uint32_t new_tag = 0;
	unsigned ret_start = d->returns.start,
			ret_count = d->returns.count;

	if (unlikely(num_mbufs == 0))
		return process_returns(d);

	while (next_idx < num_mbufs || next_mb != NULL) {
		uintptr_t oldbuf = 0;
		/* Sync with worker. Acquire bufptr64. */
		int64_t data = __atomic_load_n(&(d->bufs[wkr].bufptr64),
						__ATOMIC_ACQUIRE);

		if (!next_mb) {
			next_mb = mbufs[next_idx++];
			next_value = (((int64_t)(uintptr_t)next_mb)
					<< RTE_DISTRIB_FLAG_BITS);
			/*
			 * The user is advised to set the tag value for each
			 * mbuf before calling rte_distributor_process.
			 * User-defined tags are used to identify flows
			 * or sessions.
			 */
			new_tag = next_mb->hash.usr;

			/*
			 * Note that if RTE_DISTRIB_MAX_WORKERS is larger
			 * than 64 then the size of match has to be expanded.
			 */
			uint64_t match = 0;
			unsigned i;
			/*
			 * to scan for a match use "xor" and "not" to get a 0/1
			 * value, then use shifting to merge into a single
			 * "match" variable, where a one-bit indicates a match
			 * for the worker given by the bit-position
			 */
			for (i = 0; i < d->num_workers; i++)
				match |= ((uint64_t)!(d->in_flight_tags[i]
					^ new_tag) << i);

			/* Only turned-on bits are considered as a match */
			match &= d->in_flight_bitmask;

			if (match) {
				next_mb = NULL;
				unsigned worker = __builtin_ctzl(match);
				if (add_to_backlog(&d->backlog[worker],
						next_value) < 0)
					next_idx--;
			}
		}

		if ((data & RTE_DISTRIB_GET_BUF) &&
				(d->backlog[wkr].count || next_mb)) {

			if (d->backlog[wkr].count)
				/* Sync with worker. Release bufptr64. */
				__atomic_store_n(&(d->bufs[wkr].bufptr64),
						backlog_pop(&d->backlog[wkr]),
						__ATOMIC_RELEASE);

			else {
				/* Sync with worker. Release bufptr64. */
				__atomic_store_n(&(d->bufs[wkr].bufptr64),
						next_value,
						__ATOMIC_RELEASE);
				d->in_flight_tags[wkr] = new_tag;
				d->in_flight_bitmask |= (1UL << wkr);
				next_mb = NULL;
			}
			oldbuf = data >> RTE_DISTRIB_FLAG_BITS;
		} else if (data & RTE_DISTRIB_RETURN_BUF) {
			handle_worker_shutdown(d, wkr);
			oldbuf = data >> RTE_DISTRIB_FLAG_BITS;
		}

		/* store returns in a circular buffer */
		store_return(oldbuf, d, &ret_start, &ret_count);

		if (++wkr == d->num_workers)
			wkr = 0;
	}
	/* to finish, check all workers for backlog and schedule work for them
	 * if they are ready
	 */
	for (wkr = 0; wkr < d->num_workers; wkr++)
		if (d->backlog[wkr].count &&
				/* Sync with worker. Acquire bufptr64. */
				(__atomic_load_n(&(d->bufs[wkr].bufptr64),
				__ATOMIC_ACQUIRE) & RTE_DISTRIB_GET_BUF)) {

			int64_t oldbuf = d->bufs[wkr].bufptr64 >>
					RTE_DISTRIB_FLAG_BITS;

			store_return(oldbuf, d, &ret_start, &ret_count);

			/* Sync with worker. Release bufptr64. */
			__atomic_store_n(&(d->bufs[wkr].bufptr64),
				backlog_pop(&d->backlog[wkr]),
				__ATOMIC_RELEASE);
		}

	d->returns.start = ret_start;
	d->returns.count = ret_count;
	return num_mbufs;
}

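/*
 * Distributor-core usage sketch (illustrative only, not part of the
 * library): the caller tags each mbuf, hands a burst to process() and
 * collects any packets handed back by workers. The names port_id, BURST
 * and flow_tag_of() are hypothetical user code.
 *
 *	struct rte_mbuf *bufs[BURST], *done[BURST];
 *	uint16_t i, nb_rx = rte_eth_rx_burst(port_id, 0, bufs, BURST);
 *
 *	for (i = 0; i < nb_rx; i++)
 *		bufs[i]->hash.usr = flow_tag_of(bufs[i]);
 *	rte_distributor_process_single(d, bufs, nb_rx);
 *	int nb_done = rte_distributor_returned_pkts_single(d, done, BURST);
 */
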
/* return to the caller, packets returned from workers */
int
rte_distributor_returned_pkts_single(struct rte_distributor_single *d,
		struct rte_mbuf **mbufs, unsigned max_mbufs)
{
	struct rte_distributor_returned_pkts *returns = &d->returns;
	unsigned retval = (max_mbufs < returns->count) ?
			max_mbufs : returns->count;
	unsigned i;

	for (i = 0; i < retval; i++) {
		unsigned idx = (returns->start + i) & RTE_DISTRIB_RETURNS_MASK;
		mbufs[i] = returns->mbufs[idx];
	}
	returns->start += i;
	returns->count -= i;

	return retval;
}

/* return the number of packets in-flight in a distributor, i.e. packets
 * being worked on or queued up in a backlog.
 */
static inline unsigned
total_outstanding(const struct rte_distributor_single *d)
{
	unsigned wkr, total_outstanding;

	total_outstanding = __builtin_popcountl(d->in_flight_bitmask);

	for (wkr = 0; wkr < d->num_workers; wkr++)
		total_outstanding += d->backlog[wkr].count;

	return total_outstanding;
}

/* flush the distributor, so that there are no outstanding packets in flight or
 * queued up.
 */
int
rte_distributor_flush_single(struct rte_distributor_single *d)
{
	const unsigned flushed = total_outstanding(d);

	while (total_outstanding(d) > 0)
		rte_distributor_process_single(d, NULL, 0);

	return flushed;
}

/* clears the internal returns array in the distributor */
void
rte_distributor_clear_returns_single(struct rte_distributor_single *d)
{
	d->returns.start = d->returns.count = 0;
#ifndef __OPTIMIZE__
	memset(d->returns.mbufs, 0, sizeof(d->returns.mbufs));
#endif
}

/* creates a distributor instance */
struct rte_distributor_single *
rte_distributor_create_single(const char *name,
		unsigned socket_id,
		unsigned num_workers)
{
	struct rte_distributor_single *d;
	struct rte_distributor_list *distributor_list;
	char mz_name[RTE_MEMZONE_NAMESIZE];
	const struct rte_memzone *mz;

	/* compilation-time checks */
	RTE_BUILD_BUG_ON((sizeof(*d) & RTE_CACHE_LINE_MASK) != 0);
	RTE_BUILD_BUG_ON((RTE_DISTRIB_MAX_WORKERS & 7) != 0);
	RTE_BUILD_BUG_ON(RTE_DISTRIB_MAX_WORKERS >
				sizeof(d->in_flight_bitmask) * CHAR_BIT);

	if (name == NULL || num_workers >= RTE_DISTRIB_MAX_WORKERS) {
		rte_errno = EINVAL;
		return NULL;
	}

	snprintf(mz_name, sizeof(mz_name), RTE_DISTRIB_PREFIX"%s", name);
	mz = rte_memzone_reserve(mz_name, sizeof(*d), socket_id, NO_FLAGS);
	if (mz == NULL) {
		rte_errno = ENOMEM;
		return NULL;
	}

	d = mz->addr;
	strlcpy(d->name, name, sizeof(d->name));
	d->num_workers = num_workers;

	distributor_list = RTE_TAILQ_CAST(rte_distributor_tailq.head,
					  rte_distributor_list);

	rte_mcfg_tailq_write_lock();
	TAILQ_INSERT_TAIL(distributor_list, d, next);
	rte_mcfg_tailq_write_unlock();

	return d;
}

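/*
 * Setup and teardown sketch (illustrative only, not part of the library):
 * typical control-path usage on the distributor core. The instance name
 * "pkt_dist" and the worker count are hypothetical.
 *
 *	struct rte_distributor_single *d =
 *		rte_distributor_create_single("pkt_dist", rte_socket_id(), 4);
 *	if (d == NULL)
 *		rte_exit(EXIT_FAILURE, "cannot create distributor: %d\n",
 *			rte_errno);
 *	... launch workers, call rte_distributor_process_single() in a loop ...
 *	rte_distributor_flush_single(d);
 *	rte_distributor_clear_returns_single(d);
 */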