/* SPDX-License-Identifier: BSD-3-Clause
 * Copyright(c) 2017 Intel Corporation
 */

#include <stdio.h>
#include <sys/queue.h>
#include <string.h>
#include <rte_mbuf.h>
#include <rte_cycles.h>
#include <rte_memzone.h>
#include <rte_errno.h>
#include <rte_string_fns.h>
#include <rte_eal_memconfig.h>
#include <rte_pause.h>
#include <rte_tailq.h>

#include "rte_distributor.h"
#include "rte_distributor_single.h"
#include "distributor_private.h"

TAILQ_HEAD(rte_dist_burst_list, rte_distributor);

static struct rte_tailq_elem rte_dist_burst_tailq = {
	.name = "RTE_DIST_BURST",
};
EAL_REGISTER_TAILQ(rte_dist_burst_tailq)

/**** APIs called by workers ****/

/**** Burst Packet APIs called by workers ****/
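
/*
 * Note on the handshake layout used throughout this file (summary added for
 * clarity; the flag values themselves are defined in distributor_private.h):
 * each 64-bit slot in bufptr64[]/retptr64[] carries an mbuf pointer shifted
 * left by RTE_DISTRIB_FLAG_BITS, with the RTE_DISTRIB_GET_BUF,
 * RTE_DISTRIB_RETURN_BUF and RTE_DISTRIB_VALID_BUF flags in the low bits.
 * Slot 0 of each array doubles as the handshake word: on retptr64 the worker
 * sets GET_BUF to request a new burst (or RETURN_BUF to return packets
 * without requesting more) and the distributor clears the word once it has
 * read the returns; on bufptr64 the distributor clears GET_BUF when it
 * publishes a burst and the worker sets it back once the burst has been
 * copied out.
 */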

void
rte_distributor_request_pkt(struct rte_distributor *d,
		unsigned int worker_id, struct rte_mbuf **oldpkt,
		unsigned int count)
{
	struct rte_distributor_buffer *buf = &(d->bufs[worker_id]);
	unsigned int i;

	volatile int64_t *retptr64;

	if (unlikely(d->alg_type == RTE_DIST_ALG_SINGLE)) {
		rte_distributor_request_pkt_single(d->d_single,
			worker_id, count ? oldpkt[0] : NULL);
		return;
	}

	retptr64 = &(buf->retptr64[0]);
	/* Spin while handshake bits are set (scheduler clears them).
	 * Sync with worker on GET_BUF flag.
	 */
	while (unlikely(__atomic_load_n(retptr64, __ATOMIC_ACQUIRE)
			& (RTE_DISTRIB_GET_BUF | RTE_DISTRIB_RETURN_BUF))) {
		rte_pause();
		uint64_t t = rte_rdtsc()+100;

		while (rte_rdtsc() < t)
			rte_pause();
	}

	/*
	 * OK, if we've got here, then the scheduler has just cleared the
	 * handshake bits. Populate the retptrs with returning packets.
	 */

	for (i = count; i < RTE_DIST_BURST_SIZE; i++)
		buf->retptr64[i] = 0;

	/* Set VALID_BUF bit for each packet returned */
	for (i = count; i-- > 0; )
		buf->retptr64[i] =
			(((int64_t)(uintptr_t)(oldpkt[i])) <<
			RTE_DISTRIB_FLAG_BITS) | RTE_DISTRIB_VALID_BUF;

	/*
	 * Finally, set the GET_BUF flag to signal to the distributor that
	 * the cache line is ready for processing.
	 * Sync with distributor to release retptrs.
	 */
	__atomic_store_n(retptr64, *retptr64 | RTE_DISTRIB_GET_BUF,
			__ATOMIC_RELEASE);
}

int
rte_distributor_poll_pkt(struct rte_distributor *d,
		unsigned int worker_id, struct rte_mbuf **pkts)
{
	struct rte_distributor_buffer *buf = &d->bufs[worker_id];
	uint64_t ret;
	int count = 0;
	unsigned int i;

	if (unlikely(d->alg_type == RTE_DIST_ALG_SINGLE)) {
		pkts[0] = rte_distributor_poll_pkt_single(d->d_single,
			worker_id);
		return (pkts[0]) ? 1 : 0;
	}

	/* If any of the below bits is set, return.
	 * GET_BUF is set when the distributor hasn't sent any packets yet;
	 * RETURN_BUF is set when the distributor must retrieve the
	 * in-flight packets.
	 * Sync with distributor to acquire bufptrs.
	 */
	if (__atomic_load_n(&(buf->bufptr64[0]), __ATOMIC_ACQUIRE)
		& (RTE_DISTRIB_GET_BUF | RTE_DISTRIB_RETURN_BUF))
		return -1;

	/* since bufptr64 is signed, this should be an arithmetic shift */
	for (i = 0; i < RTE_DIST_BURST_SIZE; i++) {
		if (likely(buf->bufptr64[i] & RTE_DISTRIB_VALID_BUF)) {
			ret = buf->bufptr64[i] >> RTE_DISTRIB_FLAG_BITS;
			pkts[count++] = (struct rte_mbuf *)((uintptr_t)(ret));
		}
	}

	/*
	 * Now that we have copied the contents of the cache line into an
	 * array of mbuf pointers, toggle the bit so the scheduler can start
	 * working on the next cache line while we process these packets.
	 * Sync with distributor on GET_BUF flag. Release bufptrs.
	 */
	__atomic_store_n(&(buf->bufptr64[0]),
		buf->bufptr64[0] | RTE_DISTRIB_GET_BUF, __ATOMIC_RELEASE);

	return count;
}

int
rte_distributor_get_pkt(struct rte_distributor *d,
		unsigned int worker_id, struct rte_mbuf **pkts,
		struct rte_mbuf **oldpkt, unsigned int return_count)
{
	int count;

	if (unlikely(d->alg_type == RTE_DIST_ALG_SINGLE)) {
		if (return_count <= 1) {
			pkts[0] = rte_distributor_get_pkt_single(d->d_single,
				worker_id, return_count ? oldpkt[0] : NULL);
			return (pkts[0]) ? 1 : 0;
		} else
			return -EINVAL;
	}

	rte_distributor_request_pkt(d, worker_id, oldpkt, return_count);

	count = rte_distributor_poll_pkt(d, worker_id, pkts);
	while (count == -1) {
		uint64_t t = rte_rdtsc() + 100;

		while (rte_rdtsc() < t)
			rte_pause();

		count = rte_distributor_poll_pkt(d, worker_id, pkts);
	}
	return count;
}

int
rte_distributor_return_pkt(struct rte_distributor *d,
		unsigned int worker_id, struct rte_mbuf **oldpkt, int num)
{
	struct rte_distributor_buffer *buf = &d->bufs[worker_id];
	unsigned int i;

	if (unlikely(d->alg_type == RTE_DIST_ALG_SINGLE)) {
		if (num == 1)
			return rte_distributor_return_pkt_single(d->d_single,
				worker_id, oldpkt[0]);
		else if (num == 0)
			return rte_distributor_return_pkt_single(d->d_single,
				worker_id, NULL);
		else
			return -EINVAL;
	}

	/* Spin while handshake bits are set (scheduler clears them).
	 * Sync with worker on GET_BUF flag.
	 */
	while (unlikely(__atomic_load_n(&(buf->retptr64[0]), __ATOMIC_RELAXED)
			& (RTE_DISTRIB_GET_BUF | RTE_DISTRIB_RETURN_BUF))) {
		rte_pause();
		uint64_t t = rte_rdtsc()+100;

		while (rte_rdtsc() < t)
			rte_pause();
	}

	/* Sync with distributor to acquire retptrs */
	__atomic_thread_fence(__ATOMIC_ACQUIRE);
	for (i = 0; i < RTE_DIST_BURST_SIZE; i++)
		/* Switch off the return bit first */
		buf->retptr64[i] = 0;

	for (i = num; i-- > 0; )
		buf->retptr64[i] = (((int64_t)(uintptr_t)oldpkt[i]) <<
			RTE_DISTRIB_FLAG_BITS) | RTE_DISTRIB_VALID_BUF;

	/* Use RETURN_BUF on bufptr64 to notify distributor that
	 * we won't read any mbufs from there even if GET_BUF is set.
	 * This allows distributor to retrieve in-flight already sent packets.
	 */
	__atomic_or_fetch(&(buf->bufptr64[0]), RTE_DISTRIB_RETURN_BUF,
		__ATOMIC_ACQ_REL);

	/* Set the RETURN_BUF on retptr64 even if we got no returns.
	 * Sync with distributor on RETURN_BUF flag. Release retptrs.
	 * Notify distributor that we are not requesting any more packets.
	 */
	__atomic_store_n(&(buf->retptr64[0]),
		buf->retptr64[0] | RTE_DISTRIB_RETURN_BUF, __ATOMIC_RELEASE);

	return 0;
}
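
/*
 * Illustrative sketch (not compiled here) of a typical worker loop built on
 * the three calls above; "quit_signal", "id" and "do_work" are hypothetical
 * placeholders supplied by the application:
 *
 *	static int
 *	lcore_worker(struct rte_distributor *d, unsigned int id)
 *	{
 *		struct rte_mbuf *bufs[RTE_DIST_BURST_SIZE];
 *		unsigned int count = 0, i;
 *
 *		while (!quit_signal) {
 *			count = rte_distributor_get_pkt(d, id, bufs, bufs,
 *					count);
 *			for (i = 0; i < count; i++)
 *				do_work(bufs[i]);
 *		}
 *		rte_distributor_return_pkt(d, id, bufs, count);
 *		return 0;
 *	}
 *
 * Each rte_distributor_get_pkt() call hands back the previous burst through
 * oldpkt and blocks until a new burst is available; the final
 * rte_distributor_return_pkt() returns the last burst and tells the
 * distributor that the worker is leaving.
 */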

/**** APIs called on distributor core ***/

/* stores a packet returned from a worker inside the returns array */
static inline void
store_return(uintptr_t oldbuf, struct rte_distributor *d,
		unsigned int *ret_start, unsigned int *ret_count)
{
	if (!oldbuf)
		return;
	/* store returns in a circular buffer */
	d->returns.mbufs[(*ret_start + *ret_count) & RTE_DISTRIB_RETURNS_MASK]
			= (void *)oldbuf;
	*ret_start += (*ret_count == RTE_DISTRIB_RETURNS_MASK);
	*ret_count += (*ret_count != RTE_DISTRIB_RETURNS_MASK);
}

/*
 * Match the flow_ids (tags) of the incoming packets to the flow_ids
 * of the inflight packets (both inflight on the workers and in each worker
 * backlog). This will then allow us to pin those packets to the relevant
 * workers to give us our atomic flow pinning.
 */
void
find_match_scalar(struct rte_distributor *d,
			uint16_t *data_ptr,
			uint16_t *output_ptr)
{
	struct rte_distributor_backlog *bl;
	uint16_t i, j, w;

	/*
	 * Function overview:
	 * 1. Loop through all worker IDs
	 * 2. Compare the current inflights to the incoming tags
	 * 3. Compare the current backlog to the incoming tags
	 * 4. Add any matches to the output
	 */

	for (j = 0 ; j < RTE_DIST_BURST_SIZE; j++)
		output_ptr[j] = 0;

	for (i = 0; i < d->num_workers; i++) {
		bl = &d->backlog[i];

		for (j = 0; j < RTE_DIST_BURST_SIZE ; j++)
			for (w = 0; w < RTE_DIST_BURST_SIZE; w++)
				if (d->in_flight_tags[i][w] == data_ptr[j]) {
					output_ptr[j] = i+1;
					break;
				}
		for (j = 0; j < RTE_DIST_BURST_SIZE; j++)
			for (w = 0; w < RTE_DIST_BURST_SIZE; w++)
				if (bl->tags[w] == data_ptr[j]) {
					output_ptr[j] = i+1;
					break;
				}
	}

	/*
	 * At this stage, the output contains 8 16-bit values, with
	 * each non-zero value indicating the worker ID (+1) to which the
	 * corresponding flow is pinned.
	 */
}
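
/*
 * Worked example for find_match_scalar() (added for clarity): with incoming
 * tags {5, 7, 5, 9} in the first four slots, worker 0 holding in-flight tag 7
 * and worker 1 holding backlog tag 9, the first four output slots become
 * {0, 1, 0, 2}: tag 5 is pinned nowhere, tag 7 maps to worker 0 (value 0+1)
 * and tag 9 maps to worker 1 (value 1+1). Output slots beyond the number of
 * incoming packets are ignored by the caller.
 */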

/*
 * When a worker has called rte_distributor_return_pkt()
 * and passed the RTE_DISTRIB_RETURN_BUF handshake through retptr64,
 * the distributor must retrieve both the in-flight and the backlog packets
 * assigned to that worker and reassign them to other workers.
 */
static void
handle_worker_shutdown(struct rte_distributor *d, unsigned int wkr)
{
	struct rte_distributor_buffer *buf = &(d->bufs[wkr]);
	/* double BURST size for storing both inflights and backlog */
	struct rte_mbuf *pkts[RTE_DIST_BURST_SIZE * 2];
	unsigned int pkts_count = 0;
	unsigned int i;

	/* If GET_BUF is cleared, there are in-flight packets sent
	 * to the worker which it will not process.
	 * They must be retrieved and assigned to another worker.
	 */
	if (!(__atomic_load_n(&(buf->bufptr64[0]), __ATOMIC_ACQUIRE)
		& RTE_DISTRIB_GET_BUF))
		for (i = 0; i < RTE_DIST_BURST_SIZE; i++)
			if (buf->bufptr64[i] & RTE_DISTRIB_VALID_BUF)
				pkts[pkts_count++] = (void *)((uintptr_t)
					(buf->bufptr64[i]
						>> RTE_DISTRIB_FLAG_BITS));

	/* Update the handshake flags on bufptr64:
	 * - set GET_BUF to indicate that the distributor can overwrite the
	 *   buffer with new packets if the worker makes a new request.
	 * - clear RETURN_BUF to unblock reads on the worker side.
	 */
	__atomic_store_n(&(buf->bufptr64[0]), RTE_DISTRIB_GET_BUF,
		__ATOMIC_RELEASE);

	/* Collect backlog packets from worker */
	for (i = 0; i < d->backlog[wkr].count; i++)
		pkts[pkts_count++] = (void *)((uintptr_t)
			(d->backlog[wkr].pkts[i] >> RTE_DISTRIB_FLAG_BITS));

	d->backlog[wkr].count = 0;

	/* Clear both inflight and backlog tags */
	for (i = 0; i < RTE_DIST_BURST_SIZE; i++) {
		d->in_flight_tags[wkr][i] = 0;
		d->backlog[wkr].tags[i] = 0;
	}

	/* Recursive call */
	if (pkts_count > 0)
		rte_distributor_process(d, pkts, pkts_count);
}

/*
 * When the handshake bits indicate that there are packets coming
 * back from the worker, this function is called to copy and store
 * the valid returned pointers (store_return).
 */
static unsigned int
handle_returns(struct rte_distributor *d, unsigned int wkr)
{
	struct rte_distributor_buffer *buf = &(d->bufs[wkr]);
	uintptr_t oldbuf;
	unsigned int ret_start = d->returns.start,
			ret_count = d->returns.count;
	unsigned int count = 0;
	unsigned int i;

	/* Sync on GET_BUF flag. Acquire retptrs. */
	if (__atomic_load_n(&(buf->retptr64[0]), __ATOMIC_ACQUIRE)
		& (RTE_DISTRIB_GET_BUF | RTE_DISTRIB_RETURN_BUF)) {
		for (i = 0; i < RTE_DIST_BURST_SIZE; i++) {
			if (buf->retptr64[i] & RTE_DISTRIB_VALID_BUF) {
				oldbuf = ((uintptr_t)(buf->retptr64[i] >>
					RTE_DISTRIB_FLAG_BITS));
				/* store returns in a circular buffer */
				store_return(oldbuf, d, &ret_start, &ret_count);
				count++;
				buf->retptr64[i] &= ~RTE_DISTRIB_VALID_BUF;
			}
		}
		d->returns.start = ret_start;
		d->returns.count = ret_count;

		/* If the worker requested packets with GET_BUF, mark it
		 * active; otherwise (RETURN_BUF), mark it inactive.
		 */
		d->activesum -= d->active[wkr];
		d->active[wkr] = !!(buf->retptr64[0] & RTE_DISTRIB_GET_BUF);
		d->activesum += d->active[wkr];

		/* If the worker returned packets without requesting new ones,
		 * handle all in-flight and backlog packets assigned to it.
		 */
		if (unlikely(buf->retptr64[0] & RTE_DISTRIB_RETURN_BUF))
			handle_worker_shutdown(d, wkr);

		/* Clear for the worker to populate with more returns.
		 * Sync with distributor on GET_BUF flag. Release retptrs.
		 */
		__atomic_store_n(&(buf->retptr64[0]), 0, __ATOMIC_RELEASE);
	}
	return count;
}

/*
 * This function releases a burst (cache line) to a worker.
 * It is called from the process function when a cache line is
 * full to make room for more packets for that worker, or when
 * all packets have been assigned to bursts and need to be flushed
 * to the workers.
 * It also needs to wait for any outstanding packets from the worker
 * before sending out new packets.
 */
static unsigned int
release(struct rte_distributor *d, unsigned int wkr)
{
	struct rte_distributor_buffer *buf = &(d->bufs[wkr]);
	unsigned int i;

	handle_returns(d, wkr);
	if (unlikely(!d->active[wkr]))
		return 0;

	/* Sync with worker on GET_BUF flag */
	while (!(__atomic_load_n(&(d->bufs[wkr].bufptr64[0]), __ATOMIC_ACQUIRE)
		& RTE_DISTRIB_GET_BUF)) {
		handle_returns(d, wkr);
		if (unlikely(!d->active[wkr]))
			return 0;
		rte_pause();
	}

	buf->count = 0;

	for (i = 0; i < d->backlog[wkr].count; i++) {
		d->bufs[wkr].bufptr64[i] = d->backlog[wkr].pkts[i] |
				RTE_DISTRIB_GET_BUF | RTE_DISTRIB_VALID_BUF;
		d->in_flight_tags[wkr][i] = d->backlog[wkr].tags[i];
	}
	buf->count = i;
	for ( ; i < RTE_DIST_BURST_SIZE ; i++) {
		buf->bufptr64[i] = RTE_DISTRIB_GET_BUF;
		d->in_flight_tags[wkr][i] = 0;
	}

	d->backlog[wkr].count = 0;

	/* Clear the GET bit.
	 * Sync with worker on GET_BUF flag. Release bufptrs.
	 */
	__atomic_store_n(&(buf->bufptr64[0]),
		buf->bufptr64[0] & ~RTE_DISTRIB_GET_BUF, __ATOMIC_RELEASE);
	return buf->count;
}
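
/*
 * Example of the cache line written by release() for a backlog of three
 * packets p0..p2 (illustration added for clarity; values shown before the
 * final store clears GET_BUF on slot 0 to publish the line):
 *
 *	bufptr64[0] = (p0 << RTE_DISTRIB_FLAG_BITS) | GET_BUF | VALID_BUF
 *	bufptr64[1] = (p1 << RTE_DISTRIB_FLAG_BITS) | GET_BUF | VALID_BUF
 *	bufptr64[2] = (p2 << RTE_DISTRIB_FLAG_BITS) | GET_BUF | VALID_BUF
 *	bufptr64[3..7] = GET_BUF
 *
 * The worker in rte_distributor_poll_pkt() treats a cleared GET_BUF on
 * slot 0 as "new data available" and copies out only the slots that have
 * VALID_BUF set.
 */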

/* process a set of packets to distribute them to workers */
int
rte_distributor_process(struct rte_distributor *d,
		struct rte_mbuf **mbufs, unsigned int num_mbufs)
{
	unsigned int next_idx = 0;
	static unsigned int wkr;
	struct rte_mbuf *next_mb = NULL;
	int64_t next_value = 0;
	uint16_t new_tag = 0;
	uint16_t flows[RTE_DIST_BURST_SIZE] __rte_cache_aligned;
	unsigned int i, j, w, wid, matching_required;

	if (d->alg_type == RTE_DIST_ALG_SINGLE) {
		/* Call the old API */
		return rte_distributor_process_single(d->d_single,
			mbufs, num_mbufs);
	}

	for (wid = 0 ; wid < d->num_workers; wid++)
		handle_returns(d, wid);

	if (unlikely(num_mbufs == 0)) {
		/* Flush out all non-full cache-lines to workers. */
		for (wid = 0 ; wid < d->num_workers; wid++) {
			/* Sync with worker on GET_BUF flag. */
			if (__atomic_load_n(&(d->bufs[wid].bufptr64[0]),
				__ATOMIC_ACQUIRE) & RTE_DISTRIB_GET_BUF) {
				d->bufs[wid].count = 0;
				release(d, wid);
				handle_returns(d, wid);
			}
		}
		return 0;
	}

	if (unlikely(!d->activesum))
		return 0;

	while (next_idx < num_mbufs) {
		uint16_t matches[RTE_DIST_BURST_SIZE] __rte_aligned(128);
		unsigned int pkts;

		if ((num_mbufs - next_idx) < RTE_DIST_BURST_SIZE)
			pkts = num_mbufs - next_idx;
		else
			pkts = RTE_DIST_BURST_SIZE;

		for (i = 0; i < pkts; i++) {
			if (mbufs[next_idx + i]) {
				/* flows have to be non-zero */
				flows[i] = mbufs[next_idx + i]->hash.usr | 1;
			} else
				flows[i] = 0;
		}
		for (; i < RTE_DIST_BURST_SIZE; i++)
			flows[i] = 0;

		matching_required = 1;

		for (j = 0; j < pkts; j++) {
			if (unlikely(!d->activesum))
				return next_idx;

			if (unlikely(matching_required)) {
				switch (d->dist_match_fn) {
				case RTE_DIST_MATCH_VECTOR:
					find_match_vec(d, &flows[0],
						&matches[0]);
					break;
				default:
					find_match_scalar(d, &flows[0],
						&matches[0]);
				}
				matching_required = 0;
			}
			/*
			 * The matches array now contains the intended worker
			 * ID (+1) of the incoming packets. Any zeroes still
			 * need to be assigned a worker.
			 */

			next_mb = mbufs[next_idx++];
			next_value = (((int64_t)(uintptr_t)next_mb) <<
					RTE_DISTRIB_FLAG_BITS);
			/*
			 * The user is expected to set the tag value for each
			 * mbuf before calling rte_distributor_process.
			 * User defined tags are used to identify flows,
			 * or sessions.
			 */
			/* flows MUST be non-zero */
			new_tag = (uint16_t)(next_mb->hash.usr) | 1;

			/*
			 * Uncommenting the next line will cause the find_match
			 * function to be optimized out, making this function
			 * do parallel (non-atomic) distribution
			 */
			/* matches[j] = 0; */

			if (matches[j] && d->active[matches[j]-1]) {
				struct rte_distributor_backlog *bl =
						&d->backlog[matches[j]-1];
				if (unlikely(bl->count ==
						RTE_DIST_BURST_SIZE)) {
					release(d, matches[j]-1);
					if (!d->active[matches[j]-1]) {
						j--;
						next_idx--;
						matching_required = 1;
						continue;
					}
				}

				/* Add to worker that already has flow */
				unsigned int idx = bl->count++;

				bl->tags[idx] = new_tag;
				bl->pkts[idx] = next_value;

			} else {
				struct rte_distributor_backlog *bl;

				while (unlikely(!d->active[wkr]))
					wkr = (wkr + 1) % d->num_workers;
				bl = &d->backlog[wkr];

				if (unlikely(bl->count ==
						RTE_DIST_BURST_SIZE)) {
					release(d, wkr);
					if (!d->active[wkr]) {
						j--;
						next_idx--;
						matching_required = 1;
						continue;
					}
				}

				/* Add to current worker */
				unsigned int idx = bl->count++;

				bl->tags[idx] = new_tag;
				bl->pkts[idx] = next_value;
				/*
				 * Now that we've just added an unpinned flow
				 * to a worker, we need to ensure that all
				 * other packets with that same flow will go
				 * to the same worker in this burst.
				 */
				for (w = j; w < pkts; w++)
					if (flows[w] == new_tag)
						matches[w] = wkr+1;
			}
		}
		wkr = (wkr + 1) % d->num_workers;
	}

	/* Flush out all non-full cache-lines to workers. */
	for (wid = 0 ; wid < d->num_workers; wid++)
		/* Sync with worker on GET_BUF flag. */
		if ((__atomic_load_n(&(d->bufs[wid].bufptr64[0]),
			__ATOMIC_ACQUIRE) & RTE_DISTRIB_GET_BUF)) {
			d->bufs[wid].count = 0;
			release(d, wid);
		}

	return num_mbufs;
}

/* return to the caller the packets returned from workers */
int
rte_distributor_returned_pkts(struct rte_distributor *d,
		struct rte_mbuf **mbufs, unsigned int max_mbufs)
{
	struct rte_distributor_returned_pkts *returns = &d->returns;
	unsigned int retval = (max_mbufs < returns->count) ?
			max_mbufs : returns->count;
	unsigned int i;

	if (d->alg_type == RTE_DIST_ALG_SINGLE) {
		/* Call the old API */
		return rte_distributor_returned_pkts_single(d->d_single,
			mbufs, max_mbufs);
	}

	for (i = 0; i < retval; i++) {
		unsigned int idx = (returns->start + i) &
				RTE_DISTRIB_RETURNS_MASK;

		mbufs[i] = returns->mbufs[idx];
	}
	returns->start += i;
	returns->count -= i;

	return retval;
}
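
/*
 * Illustrative sketch (not compiled here) of how a distributor core ties the
 * two calls above together; "quit_signal" and the port/queue ids are
 * hypothetical placeholders:
 *
 *	struct rte_mbuf *bufs[RTE_DIST_BURST_SIZE * 2];
 *
 *	while (!quit_signal) {
 *		uint16_t nb_rx = rte_eth_rx_burst(port, 0, bufs,
 *				RTE_DIST_BURST_SIZE * 2);
 *		rte_distributor_process(d, bufs, nb_rx);
 *
 *		int nb_ret = rte_distributor_returned_pkts(d, bufs,
 *				RTE_DIST_BURST_SIZE * 2);
 *		(forward the nb_ret processed packets here, for example
 *		 with rte_eth_tx_burst())
 *	}
 */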

/*
 * Return the number of packets in-flight in a distributor, i.e. packets
 * being worked on or queued up in a backlog.
 */
static inline unsigned int
total_outstanding(const struct rte_distributor *d)
{
	unsigned int wkr, total_outstanding = 0;

	for (wkr = 0; wkr < d->num_workers; wkr++)
		total_outstanding += d->backlog[wkr].count + d->bufs[wkr].count;

	return total_outstanding;
}

/*
 * Flush the distributor, so that there are no outstanding packets in flight or
 * queued up.
 */
int
rte_distributor_flush(struct rte_distributor *d)
{
	unsigned int flushed;
	unsigned int wkr;

	if (d->alg_type == RTE_DIST_ALG_SINGLE) {
		/* Call the old API */
		return rte_distributor_flush_single(d->d_single);
	}

	flushed = total_outstanding(d);

	while (total_outstanding(d) > 0)
		rte_distributor_process(d, NULL, 0);

	/* wait 10ms to allow all workers to drain the packets */
	rte_delay_us(10000);

	/*
	 * Send empty burst to all workers to allow them to exit
	 * gracefully, should they need to.
	 */
	rte_distributor_process(d, NULL, 0);

	for (wkr = 0; wkr < d->num_workers; wkr++)
		handle_returns(d, wkr);

	return flushed;
}

/* clears the internal returns array in the distributor */
void
rte_distributor_clear_returns(struct rte_distributor *d)
{
	unsigned int wkr;

	if (d->alg_type == RTE_DIST_ALG_SINGLE) {
		/* Call the old API */
		rte_distributor_clear_returns_single(d->d_single);
		return;
	}

	/* throw away returns, so workers can exit */
	for (wkr = 0; wkr < d->num_workers; wkr++)
		/* Sync with worker. Release retptrs. */
		__atomic_store_n(&(d->bufs[wkr].retptr64[0]), 0,
				__ATOMIC_RELEASE);

	d->returns.start = d->returns.count = 0;
}
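
/*
 * Typical teardown order on the distributor core (sketch added for clarity;
 * "quit_signal" is a hypothetical application flag): once quit_signal is set,
 * call rte_distributor_flush(d) to push out and collect everything still in
 * flight, then rte_distributor_clear_returns(d) so that a worker spinning in
 * rte_distributor_request_pkt() sees the cleared handshake and can exit.
 */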

/* creates a distributor instance */
struct rte_distributor *
rte_distributor_create(const char *name,
		unsigned int socket_id,
		unsigned int num_workers,
		unsigned int alg_type)
{
	struct rte_distributor *d;
	struct rte_dist_burst_list *dist_burst_list;
	char mz_name[RTE_MEMZONE_NAMESIZE];
	const struct rte_memzone *mz;
	unsigned int i;

	/* TODO Reorganise function properly around RTE_DIST_ALG_SINGLE/BURST */

	/* compilation-time checks */
	RTE_BUILD_BUG_ON((sizeof(*d) & RTE_CACHE_LINE_MASK) != 0);
	RTE_BUILD_BUG_ON((RTE_DISTRIB_MAX_WORKERS & 7) != 0);

	if (name == NULL || num_workers >=
		(unsigned int)RTE_MIN(RTE_DISTRIB_MAX_WORKERS, RTE_MAX_LCORE)) {
		rte_errno = EINVAL;
		return NULL;
	}

	if (alg_type == RTE_DIST_ALG_SINGLE) {
		d = malloc(sizeof(struct rte_distributor));
		if (d == NULL) {
			rte_errno = ENOMEM;
			return NULL;
		}
		d->d_single = rte_distributor_create_single(name,
				socket_id, num_workers);
		if (d->d_single == NULL) {
			free(d);
			/* rte_errno will have been set */
			return NULL;
		}
		d->alg_type = alg_type;
		return d;
	}

	snprintf(mz_name, sizeof(mz_name), RTE_DISTRIB_PREFIX"%s", name);
	mz = rte_memzone_reserve(mz_name, sizeof(*d), socket_id, NO_FLAGS);
	if (mz == NULL) {
		rte_errno = ENOMEM;
		return NULL;
	}

	d = mz->addr;
	strlcpy(d->name, name, sizeof(d->name));
	d->num_workers = num_workers;
	d->alg_type = alg_type;

	d->dist_match_fn = RTE_DIST_MATCH_SCALAR;
#if defined(RTE_ARCH_X86)
	if (rte_vect_get_max_simd_bitwidth() >= RTE_VECT_SIMD_128)
		d->dist_match_fn = RTE_DIST_MATCH_VECTOR;
#endif

	/*
	 * Set up the backlog tags so they're pointing at the second cache
	 * line for performance during flow matching
	 */
	for (i = 0 ; i < num_workers ; i++)
		d->backlog[i].tags = &d->in_flight_tags[i][RTE_DIST_BURST_SIZE];

	memset(d->active, 0, sizeof(d->active));
	d->activesum = 0;

	dist_burst_list = RTE_TAILQ_CAST(rte_dist_burst_tailq.head,
					  rte_dist_burst_list);

	rte_mcfg_tailq_write_lock();
	TAILQ_INSERT_TAIL(dist_burst_list, d, next);
	rte_mcfg_tailq_write_unlock();

	return d;
}
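
/*
 * Illustrative creation call for rte_distributor_create() above (sketch added
 * for clarity; the name and worker count are hypothetical):
 *
 *	struct rte_distributor *d = rte_distributor_create("pkt_dist",
 *			rte_socket_id(), rte_lcore_count() - 1,
 *			RTE_DIST_ALG_BURST);
 *	if (d == NULL)
 *		rte_exit(EXIT_FAILURE, "Cannot create distributor\n");
 */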