/* SPDX-License-Identifier: BSD-3-Clause
 * Copyright(c) 2010-2017 Intel Corporation
 */

#include "test.h"

#include <unistd.h>
#include <string.h>
#include <rte_cycles.h>
#include <rte_errno.h>
#include <rte_mempool.h>
#include <rte_mbuf.h>
#include <rte_mbuf_dyn.h>

#ifdef RTE_EXEC_ENV_WINDOWS
static int
test_distributor(void)
{
	printf("distributor not supported on Windows, skipping test\n");
	return TEST_SKIPPED;
}

#else

#include <rte_distributor.h>
#include <rte_string_fns.h>

#define ITER_POWER 20 /* log 2 of how many iterations we do when timing. */
#define BURST 32
#define BIG_BATCH 1024

typedef uint32_t seq_dynfield_t;
static int seq_dynfield_offset = -1;

static inline seq_dynfield_t *
seq_field(struct rte_mbuf *mbuf)
{
	return RTE_MBUF_DYNFIELD(mbuf, seq_dynfield_offset, seq_dynfield_t *);
}

struct worker_params {
	char name[64];
	struct rte_distributor *dist;
};

struct worker_params worker_params;

/* statics - all zero-initialized by default */
static volatile int quit;      /**< general quit variable for all threads */
static volatile int zero_quit; /**< var for when we just want thr0 to quit */
static volatile int zero_sleep; /**< thr0 has quit basic loop and is sleeping */
static volatile unsigned worker_idx;
static volatile unsigned zero_idx;

struct worker_stats {
	volatile unsigned handled_packets;
} __rte_cache_aligned;
struct worker_stats worker_stats[RTE_MAX_LCORE];

/* returns the total count of the number of packets handled by the worker
 * functions given below.
 */
static inline unsigned
total_packet_count(void)
{
	unsigned i, count = 0;
	for (i = 0; i < worker_idx; i++)
		count += __atomic_load_n(&worker_stats[i].handled_packets,
				__ATOMIC_RELAXED);
	return count;
}

/* resets the packet counts for a new test */
static inline void
clear_packet_count(void)
{
	unsigned int i;
	for (i = 0; i < RTE_MAX_LCORE; i++)
		__atomic_store_n(&worker_stats[i].handled_packets, 0,
			__ATOMIC_RELAXED);
}

/* this is the basic worker function for sanity test
 * it does nothing but return packets and count them.
 */
static int
handle_work(void *arg)
{
	struct rte_mbuf *buf[8] __rte_cache_aligned;
	struct worker_params *wp = arg;
	struct rte_distributor *db = wp->dist;
	unsigned int num;
	unsigned int id = __atomic_fetch_add(&worker_idx, 1, __ATOMIC_RELAXED);

	num = rte_distributor_get_pkt(db, id, buf, NULL, 0);
	while (!quit) {
		__atomic_fetch_add(&worker_stats[id].handled_packets, num,
				__ATOMIC_RELAXED);
		num = rte_distributor_get_pkt(db, id,
				buf, buf, num);
	}
	__atomic_fetch_add(&worker_stats[id].handled_packets, num,
			__ATOMIC_RELAXED);
	rte_distributor_return_pkt(db, id, buf, num);
	return 0;
}

/* do basic sanity testing of the distributor. This test checks the following:
 * - send 32 packets through distributor with the same tag and ensure they
 *   all go to the one worker
 * - send 32 packets through the distributor with two different tags and
 *   verify that they go equally to two different workers.
 * - send 32 packets with different tags through the distributor and
 *   just verify we get all packets back.
 * - send 1024 packets through the distributor, gathering the returned packets
 *   as we go. Then verify that we correctly got all 1024 pointers back again,
 *   not necessarily in the same order (as different flows).
 */
static int
sanity_test(struct worker_params *wp, struct rte_mempool *p)
{
	struct rte_distributor *db = wp->dist;
	struct rte_mbuf *bufs[BURST];
	struct rte_mbuf *returns[BURST*2];
	unsigned int i, count;
	unsigned int retries;
	unsigned int processed;

	printf("=== Basic distributor sanity tests ===\n");
	clear_packet_count();
	if (rte_mempool_get_bulk(p, (void *)bufs, BURST) != 0) {
		printf("line %d: Error getting mbufs from pool\n", __LINE__);
		return -1;
	}

	/* now set all hash values in all buffers to zero, so all pkts go to the
	 * one worker thread */
	for (i = 0; i < BURST; i++)
		bufs[i]->hash.usr = 0;

	processed = 0;
	while (processed < BURST)
		processed += rte_distributor_process(db, &bufs[processed],
			BURST - processed);

	count = 0;
	do {
		rte_distributor_flush(db);
		count += rte_distributor_returned_pkts(db,
				returns, BURST*2);
	} while (count < BURST);

	if (total_packet_count() != BURST) {
		printf("Line %d: Error, not all packets flushed. "
				"Expected %u, got %u\n",
				__LINE__, BURST, total_packet_count());
		rte_mempool_put_bulk(p, (void *)bufs, BURST);
		return -1;
	}

	for (i = 0; i < rte_lcore_count() - 1; i++)
		printf("Worker %u handled %u packets\n", i,
			__atomic_load_n(&worker_stats[i].handled_packets,
					__ATOMIC_RELAXED));
	printf("Sanity test with all zero hashes done.\n");

	/* pick two flows and check they go correctly */
	if (rte_lcore_count() >= 3) {
		clear_packet_count();
		for (i = 0; i < BURST; i++)
			bufs[i]->hash.usr = (i & 1) << 8;

		rte_distributor_process(db, bufs, BURST);
		count = 0;
		do {
			rte_distributor_flush(db);
			count += rte_distributor_returned_pkts(db,
					returns, BURST*2);
		} while (count < BURST);
		if (total_packet_count() != BURST) {
			printf("Line %d: Error, not all packets flushed. "
					"Expected %u, got %u\n",
					__LINE__, BURST, total_packet_count());
			rte_mempool_put_bulk(p, (void *)bufs, BURST);
			return -1;
		}

		for (i = 0; i < rte_lcore_count() - 1; i++)
			printf("Worker %u handled %u packets\n", i,
				__atomic_load_n(
					&worker_stats[i].handled_packets,
					__ATOMIC_RELAXED));
		printf("Sanity test with two hash values done\n");
	}

	/* give a different hash value to each packet,
	 * so load gets distributed */
	clear_packet_count();
	for (i = 0; i < BURST; i++)
		bufs[i]->hash.usr = i+1;

	rte_distributor_process(db, bufs, BURST);
	count = 0;
	do {
		rte_distributor_flush(db);
		count += rte_distributor_returned_pkts(db,
				returns, BURST*2);
	} while (count < BURST);
	if (total_packet_count() != BURST) {
		printf("Line %d: Error, not all packets flushed. "
				"Expected %u, got %u\n",
				__LINE__, BURST, total_packet_count());
		rte_mempool_put_bulk(p, (void *)bufs, BURST);
		return -1;
	}

	for (i = 0; i < rte_lcore_count() - 1; i++)
		printf("Worker %u handled %u packets\n", i,
			__atomic_load_n(&worker_stats[i].handled_packets,
					__ATOMIC_RELAXED));
	printf("Sanity test with non-zero hashes done\n");

	rte_mempool_put_bulk(p, (void *)bufs, BURST);

	/* sanity test with BIG_BATCH packets to ensure they all arrived back
	 * from the returned packets function */
	clear_packet_count();
	struct rte_mbuf *many_bufs[BIG_BATCH], *return_bufs[BIG_BATCH];
	unsigned num_returned = 0;
	unsigned int num_being_processed = 0;
	unsigned int return_buffer_capacity = 127; /* RTE_DISTRIB_RETURNS_MASK */

	/* flush out any remaining packets */
	rte_distributor_flush(db);
	rte_distributor_clear_returns(db);

	if (rte_mempool_get_bulk(p, (void *)many_bufs, BIG_BATCH) != 0) {
		printf("line %d: Error getting mbufs from pool\n", __LINE__);
		return -1;
	}
	for (i = 0; i < BIG_BATCH; i++)
		many_bufs[i]->hash.usr = i << 2;

	printf("=== testing big burst (%s) ===\n", wp->name);
	for (i = 0; i < BIG_BATCH/BURST; i++) {
		rte_distributor_process(db,
				&many_bufs[i*BURST], BURST);
		num_being_processed += BURST;
		do {
			count = rte_distributor_returned_pkts(db,
					&return_bufs[num_returned],
					BIG_BATCH - num_returned);
			num_being_processed -= count;
			num_returned += count;
			rte_distributor_flush(db);
		} while (num_being_processed + BURST > return_buffer_capacity);
	}
	retries = 0;
	do {
		rte_distributor_flush(db);
		count = rte_distributor_returned_pkts(db,
				&return_bufs[num_returned],
				BIG_BATCH - num_returned);
		num_returned += count;
		retries++;
	} while ((num_returned < BIG_BATCH) && (retries < 100));

	if (num_returned != BIG_BATCH) {
		printf("line %d: Missing packets, expected %d, got %d\n",
				__LINE__, BIG_BATCH, num_returned);
		rte_mempool_put_bulk(p, (void *)many_bufs, BIG_BATCH);
		return -1;
	}

	/* big check - make sure all packets made it back!! */
	for (i = 0; i < BIG_BATCH; i++) {
		unsigned j;
		struct rte_mbuf *src = many_bufs[i];
		for (j = 0; j < BIG_BATCH; j++) {
			if (return_bufs[j] == src)
				break;
		}

		if (j == BIG_BATCH) {
			printf("Error: could not find source packet #%u\n", i);
			rte_mempool_put_bulk(p, (void *)many_bufs, BIG_BATCH);
			return -1;
		}
	}
	printf("Sanity test of returned packets done\n");

	rte_mempool_put_bulk(p, (void *)many_bufs, BIG_BATCH);

	printf("\n");
	return 0;
}


/* to test that the distributor does not lose packets, we use this worker
 * function which frees mbufs when it gets them. The distributor thread does
 * the mbuf allocation. If the distributor drops packets we'll eventually run
 * out of mbufs.
 */
static int
handle_work_with_free_mbufs(void *arg)
{
	struct rte_mbuf *buf[8] __rte_cache_aligned;
	struct worker_params *wp = arg;
	struct rte_distributor *d = wp->dist;
	unsigned int i;
	unsigned int num;
	unsigned int id = __atomic_fetch_add(&worker_idx, 1, __ATOMIC_RELAXED);

	num = rte_distributor_get_pkt(d, id, buf, NULL, 0);
	while (!quit) {
		__atomic_fetch_add(&worker_stats[id].handled_packets, num,
				__ATOMIC_RELAXED);
		for (i = 0; i < num; i++)
			rte_pktmbuf_free(buf[i]);
		num = rte_distributor_get_pkt(d, id, buf, NULL, 0);
	}
	__atomic_fetch_add(&worker_stats[id].handled_packets, num,
			__ATOMIC_RELAXED);
	rte_distributor_return_pkt(d, id, buf, num);
	return 0;
}

/* Perform a sanity test of the distributor with a large number of packets,
 * where we allocate a new set of mbufs for each burst. The workers then
 * free the mbufs. This ensures that we don't have any packet leaks in the
 * library.
 */
static int
sanity_test_with_mbuf_alloc(struct worker_params *wp, struct rte_mempool *p)
{
	struct rte_distributor *d = wp->dist;
	unsigned i;
	struct rte_mbuf *bufs[BURST];
	unsigned int processed;

	printf("=== Sanity test with mbuf alloc/free (%s) ===\n", wp->name);

	clear_packet_count();
	for (i = 0; i < ((1<<ITER_POWER)); i += BURST) {
		unsigned j;
		while (rte_mempool_get_bulk(p, (void *)bufs, BURST) < 0)
			rte_distributor_process(d, NULL, 0);
		for (j = 0; j < BURST; j++) {
			bufs[j]->hash.usr = (i+j) << 1;
		}

		processed = 0;
		while (processed < BURST)
			processed += rte_distributor_process(d,
				&bufs[processed], BURST - processed);
	}

	rte_distributor_flush(d);

	rte_delay_us(10000);

	if (total_packet_count() < (1<<ITER_POWER)) {
		printf("Line %u: Packet count is incorrect, %u, expected %u\n",
				__LINE__, total_packet_count(),
				(1<<ITER_POWER));
		return -1;
	}

	printf("Sanity test with mbuf alloc/free passed\n\n");
	return 0;
}

static int
handle_work_for_shutdown_test(void *arg)
{
	struct rte_mbuf *buf[8] __rte_cache_aligned;
	struct worker_params *wp = arg;
	struct rte_distributor *d = wp->dist;
	unsigned int num;
	unsigned int zero_id = 0;
	unsigned int zero_unset;
	const unsigned int id = __atomic_fetch_add(&worker_idx, 1,
			__ATOMIC_RELAXED);

	num = rte_distributor_get_pkt(d, id, buf, NULL, 0);

	if (num > 0) {
		zero_unset = RTE_MAX_LCORE;
		__atomic_compare_exchange_n(&zero_idx, &zero_unset, id,
			false, __ATOMIC_ACQ_REL, __ATOMIC_ACQUIRE);
	}
	zero_id = __atomic_load_n(&zero_idx, __ATOMIC_ACQUIRE);

	/* wait for quit signal globally, or for worker zero, wait
	 * for zero_quit */
	while (!quit && !(id == zero_id && zero_quit)) {
		__atomic_fetch_add(&worker_stats[id].handled_packets, num,
				__ATOMIC_RELAXED);
		num = rte_distributor_get_pkt(d, id, buf, NULL, 0);

		if (num > 0) {
			zero_unset = RTE_MAX_LCORE;
			__atomic_compare_exchange_n(&zero_idx, &zero_unset, id,
				false, __ATOMIC_ACQ_REL, __ATOMIC_ACQUIRE);
		}
		zero_id = __atomic_load_n(&zero_idx, __ATOMIC_ACQUIRE);
	}

	__atomic_fetch_add(&worker_stats[id].handled_packets, num,
			__ATOMIC_RELAXED);
	if (id == zero_id) {
		rte_distributor_return_pkt(d, id, NULL, 0);

		/* for worker zero, allow it to restart to pick up last packet
		 * when all workers are shutting down.
		 */
		__atomic_store_n(&zero_sleep, 1, __ATOMIC_RELEASE);
		while (zero_quit)
			usleep(100);
		__atomic_store_n(&zero_sleep, 0, __ATOMIC_RELEASE);

		num = rte_distributor_get_pkt(d, id, buf, NULL, 0);

		while (!quit) {
			__atomic_fetch_add(&worker_stats[id].handled_packets,
					num, __ATOMIC_RELAXED);
			num = rte_distributor_get_pkt(d, id, buf, NULL, 0);
		}
	}
	rte_distributor_return_pkt(d, id, buf, num);
	return 0;
}


/* Perform a sanity test of the distributor when a worker shuts down mid-run:
 * queue two bursts of packets for the same flow, ask worker zero to quit,
 * and verify that all packets are still handled by the remaining workers.
 */
static int
sanity_test_with_worker_shutdown(struct worker_params *wp,
		struct rte_mempool *p)
{
	struct rte_distributor *d = wp->dist;
	struct rte_mbuf *bufs[BURST];
	struct rte_mbuf *bufs2[BURST];
	unsigned int i;
	unsigned int failed = 0;
	unsigned int processed = 0;

	printf("=== Sanity test of worker shutdown ===\n");

	clear_packet_count();

	if (rte_mempool_get_bulk(p, (void *)bufs, BURST) != 0) {
		printf("line %d: Error getting mbufs from pool\n", __LINE__);
		return -1;
	}

	/*
	 * Now set all hash values in all buffers to same value so all
	 * pkts go to the one worker thread
	 */
	for (i = 0; i < BURST; i++)
		bufs[i]->hash.usr = 1;

	processed = 0;
	while (processed < BURST)
		processed += rte_distributor_process(d, &bufs[processed],
			BURST - processed);
	rte_distributor_flush(d);

	/* at this point, we will have processed some packets and have a full
	 * backlog for the other ones at worker 0.
	 */

	/* get more buffers to queue up, again setting them to the same flow */
	if (rte_mempool_get_bulk(p, (void *)bufs2, BURST) != 0) {
		printf("line %d: Error getting mbufs from pool\n", __LINE__);
		rte_mempool_put_bulk(p, (void *)bufs, BURST);
		return -1;
	}
	for (i = 0; i < BURST; i++)
		bufs2[i]->hash.usr = 1;

	/* get worker zero to quit */
	zero_quit = 1;
	rte_distributor_process(d, bufs2, BURST);

	/* flush the distributor */
	rte_distributor_flush(d);
	while (!__atomic_load_n(&zero_sleep, __ATOMIC_ACQUIRE))
		rte_distributor_flush(d);

	zero_quit = 0;
	while (__atomic_load_n(&zero_sleep, __ATOMIC_ACQUIRE))
		rte_delay_us(100);

	for (i = 0; i < rte_lcore_count() - 1; i++)
		printf("Worker %u handled %u packets\n", i,
			__atomic_load_n(&worker_stats[i].handled_packets,
					__ATOMIC_RELAXED));

	if (total_packet_count() != BURST * 2) {
		printf("Line %d: Error, not all packets flushed. "
				"Expected %u, got %u\n",
				__LINE__, BURST * 2, total_packet_count());
		failed = 1;
	}

	rte_mempool_put_bulk(p, (void *)bufs, BURST);
	rte_mempool_put_bulk(p, (void *)bufs2, BURST);

	if (failed)
		return -1;

	printf("Sanity test with worker shutdown passed\n\n");
	return 0;
}

/* Test that the flush function is able to move packets between workers when
 * one worker shuts down.
 */
static int
test_flush_with_worker_shutdown(struct worker_params *wp,
		struct rte_mempool *p)
{
	struct rte_distributor *d = wp->dist;
	struct rte_mbuf *bufs[BURST];
	unsigned int i;
	unsigned int failed = 0;
	unsigned int processed;

	printf("=== Test flush fn with worker shutdown (%s) ===\n", wp->name);

	clear_packet_count();
	if (rte_mempool_get_bulk(p, (void *)bufs, BURST) != 0) {
		printf("line %d: Error getting mbufs from pool\n", __LINE__);
		return -1;
	}

	/* now set all hash values in all buffers to zero, so all pkts go to the
	 * one worker thread */
	for (i = 0; i < BURST; i++)
		bufs[i]->hash.usr = 0;

	processed = 0;
	while (processed < BURST)
		processed += rte_distributor_process(d, &bufs[processed],
			BURST - processed);
	/* at this point, we will have processed some packets and have a full
	 * backlog for the other ones at worker 0.
	 */

	/* get worker zero to quit */
	zero_quit = 1;

	/* flush the distributor */
	rte_distributor_flush(d);

	while (!__atomic_load_n(&zero_sleep, __ATOMIC_ACQUIRE))
		rte_distributor_flush(d);

	zero_quit = 0;

	while (__atomic_load_n(&zero_sleep, __ATOMIC_ACQUIRE))
		rte_delay_us(100);

	for (i = 0; i < rte_lcore_count() - 1; i++)
		printf("Worker %u handled %u packets\n", i,
			__atomic_load_n(&worker_stats[i].handled_packets,
					__ATOMIC_RELAXED));

	if (total_packet_count() != BURST) {
		printf("Line %d: Error, not all packets flushed. "
				"Expected %u, got %u\n",
				__LINE__, BURST, total_packet_count());
		failed = 1;
	}

	rte_mempool_put_bulk(p, (void *)bufs, BURST);

	if (failed)
		return -1;

	printf("Flush test with worker shutdown passed\n\n");
	return 0;
}

static int
handle_and_mark_work(void *arg)
{
	struct rte_mbuf *buf[8] __rte_cache_aligned;
	struct worker_params *wp = arg;
	struct rte_distributor *db = wp->dist;
	unsigned int num, i;
	unsigned int id = __atomic_fetch_add(&worker_idx, 1, __ATOMIC_RELAXED);
	num = rte_distributor_get_pkt(db, id, buf, NULL, 0);
	while (!quit) {
		__atomic_fetch_add(&worker_stats[id].handled_packets, num,
				__ATOMIC_RELAXED);
		for (i = 0; i < num; i++)
			*seq_field(buf[i]) += id + 1;
		num = rte_distributor_get_pkt(db, id,
				buf, buf, num);
	}
	__atomic_fetch_add(&worker_stats[id].handled_packets, num,
			__ATOMIC_RELAXED);
	rte_distributor_return_pkt(db, id, buf, num);
	return 0;
}

/* sanity_mark_test sends packets to workers which mark them.
 * Every packet also carries an encoded sequence number.
 * The returned packets are sorted and verified to check that they were
 * handled by the proper workers.
 */
static int
sanity_mark_test(struct worker_params *wp, struct rte_mempool *p)
{
	const unsigned int buf_count = 24;
	const unsigned int burst = 8;
	const unsigned int shift = 12;
	const unsigned int seq_shift = 10;

	struct rte_distributor *db = wp->dist;
	struct rte_mbuf *bufs[buf_count];
	struct rte_mbuf *returns[buf_count];
	unsigned int i, count, id;
	unsigned int sorted[buf_count], seq;
	unsigned int failed = 0;
	unsigned int processed;

	printf("=== Marked packets test ===\n");
	clear_packet_count();
	if (rte_mempool_get_bulk(p, (void *)bufs, buf_count) != 0) {
		printf("line %d: Error getting mbufs from pool\n", __LINE__);
		return -1;
	}

	/* bufs' hashes will be like these below, but shifted left.
	 * The shifting is for avoiding collisions with backlogs
	 * and in-flight tags left by previous tests.
	 * [1, 1, 1, 1, 1, 1, 1, 1
	 *  1, 1, 1, 1, 2, 2, 2, 2
	 *  2, 2, 2, 2, 1, 1, 1, 1]
	 */
	for (i = 0; i < burst; i++) {
		bufs[0 * burst + i]->hash.usr = 1 << shift;
		bufs[1 * burst + i]->hash.usr = ((i < burst / 2) ? 1 : 2)
			<< shift;
		bufs[2 * burst + i]->hash.usr = ((i < burst / 2) ? 2 : 1)
			<< shift;
	}
	/* Assign a sequence number to each packet. The sequence is shifted,
	 * so that lower bits will hold mark from worker.
	 */
	for (i = 0; i < buf_count; i++)
		*seq_field(bufs[i]) = i << seq_shift;

	count = 0;
	for (i = 0; i < buf_count/burst; i++) {
		processed = 0;
		while (processed < burst)
			processed += rte_distributor_process(db,
				&bufs[i * burst + processed],
				burst - processed);
		count += rte_distributor_returned_pkts(db, &returns[count],
			buf_count - count);
	}

	do {
		rte_distributor_flush(db);
		count += rte_distributor_returned_pkts(db, &returns[count],
			buf_count - count);
	} while (count < buf_count);

	for (i = 0; i < rte_lcore_count() - 1; i++)
		printf("Worker %u handled %u packets\n", i,
			__atomic_load_n(&worker_stats[i].handled_packets,
					__ATOMIC_RELAXED));

	/* Sort returned packets by sent order (sequence numbers). */
	for (i = 0; i < buf_count; i++) {
		seq = *seq_field(returns[i]) >> seq_shift;
		id = *seq_field(returns[i]) - (seq << seq_shift);
		sorted[seq] = id;
	}

	/* Verify that packets [0-11] and [20-23] were processed
	 * by the same worker
	 */
	for (i = 1; i < 12; i++) {
		if (sorted[i] != sorted[0]) {
			printf("Packet number %u processed by worker %u,"
				" but should be processed by worker %u\n",
				i, sorted[i], sorted[0]);
			failed = 1;
		}
	}
	for (i = 20; i < 24; i++) {
		if (sorted[i] != sorted[0]) {
			printf("Packet number %u processed by worker %u,"
				" but should be processed by worker %u\n",
				i, sorted[i], sorted[0]);
			failed = 1;
		}
	}
	/* And verify that packets [12-19] were processed
	 * by another worker
	 */
	for (i = 13; i < 20; i++) {
		if (sorted[i] != sorted[12]) {
			printf("Packet number %u processed by worker %u,"
				" but should be processed by worker %u\n",
				i, sorted[i], sorted[12]);
			failed = 1;
		}
	}

	rte_mempool_put_bulk(p, (void *)bufs, buf_count);

	if (failed)
		return -1;

	printf("Marked packets test passed\n");
	return 0;
}

static
int test_error_distributor_create_name(void)
{
	struct rte_distributor *d = NULL;
	struct rte_distributor *db = NULL;
	char *name = NULL;

	d = rte_distributor_create(name, rte_socket_id(),
			rte_lcore_count() - 1,
			RTE_DIST_ALG_SINGLE);
	if (d != NULL || rte_errno != EINVAL) {
		printf("ERROR: No error on create() with NULL name param\n");
		return -1;
	}

	db = rte_distributor_create(name, rte_socket_id(),
			rte_lcore_count() - 1,
			RTE_DIST_ALG_BURST);
	if (db != NULL || rte_errno != EINVAL) {
		printf("ERROR: No error on create() with NULL param\n");
		return -1;
	}

	return 0;
}


static
int test_error_distributor_create_numworkers(void)
{
	struct rte_distributor *ds = NULL;
	struct rte_distributor *db = NULL;

	ds = rte_distributor_create("test_numworkers", rte_socket_id(),
			RTE_MAX_LCORE + 10,
			RTE_DIST_ALG_SINGLE);
	if (ds != NULL || rte_errno != EINVAL) {
		printf("ERROR: No error on create() with num_workers > MAX\n");
		return -1;
	}

	db = rte_distributor_create("test_numworkers", rte_socket_id(),
			RTE_MAX_LCORE + 10,
			RTE_DIST_ALG_BURST);
	if (db != NULL || rte_errno != EINVAL) {
		printf("ERROR: No error on create() num_workers > MAX\n");
		return -1;
	}

	return 0;
}


/* Useful function which ensures that all worker functions terminate */
static void
quit_workers(struct worker_params *wp, struct rte_mempool *p)
{
	struct rte_distributor *d = wp->dist;
	const unsigned num_workers = rte_lcore_count() - 1;
	unsigned i;
	struct rte_mbuf *bufs[RTE_MAX_LCORE];
	struct rte_mbuf *returns[RTE_MAX_LCORE];
	if (rte_mempool_get_bulk(p, (void *)bufs, num_workers) != 0) {
		printf("line %d: Error getting mbufs from pool\n", __LINE__);
		return;
	}

	zero_quit = 0;
	quit = 1;
	for (i = 0; i < num_workers; i++) {
		bufs[i]->hash.usr = i << 1;
		rte_distributor_process(d, &bufs[i], 1);
	}

	rte_distributor_process(d, NULL, 0);
	rte_distributor_flush(d);
	rte_eal_mp_wait_lcore();

	while (rte_distributor_returned_pkts(d, returns, RTE_MAX_LCORE))
		;

	rte_distributor_clear_returns(d);
	rte_mempool_put_bulk(p, (void *)bufs, num_workers);

	quit = 0;
	worker_idx = 0;
	zero_idx = RTE_MAX_LCORE;
	zero_quit = 0;
	zero_sleep = 0;
}

static int
test_distributor(void)
{
	static struct rte_distributor *ds;
	static struct rte_distributor *db;
	static struct rte_distributor *dist[2];
	static struct rte_mempool *p;
	int i;

	static const struct rte_mbuf_dynfield seq_dynfield_desc = {
		.name = "test_distributor_dynfield_seq",
		.size = sizeof(seq_dynfield_t),
		.align = __alignof__(seq_dynfield_t),
	};
	seq_dynfield_offset =
		rte_mbuf_dynfield_register(&seq_dynfield_desc);
	if (seq_dynfield_offset < 0) {
		printf("Error registering mbuf field\n");
		return TEST_FAILED;
	}

	if (rte_lcore_count() < 2) {
		printf("Not enough cores for distributor_autotest, expecting at least 2\n");
		return TEST_SKIPPED;
	}

	if (db == NULL) {
		db = rte_distributor_create("Test_dist_burst", rte_socket_id(),
				rte_lcore_count() - 1,
				RTE_DIST_ALG_BURST);
		if (db == NULL) {
			printf("Error creating burst distributor\n");
			return -1;
		}
	} else {
		rte_distributor_flush(db);
		rte_distributor_clear_returns(db);
	}

	if (ds == NULL) {
		ds = rte_distributor_create("Test_dist_single",
				rte_socket_id(),
				rte_lcore_count() - 1,
				RTE_DIST_ALG_SINGLE);
		if (ds == NULL) {
			printf("Error creating single distributor\n");
			return -1;
		}
	} else {
		rte_distributor_flush(ds);
		rte_distributor_clear_returns(ds);
	}

	const unsigned nb_bufs = (511 * rte_lcore_count()) < BIG_BATCH ?
			(BIG_BATCH * 2) - 1 : (511 * rte_lcore_count());
	if (p == NULL) {
		p = rte_pktmbuf_pool_create("DT_MBUF_POOL", nb_bufs, BURST,
			0, RTE_MBUF_DEFAULT_BUF_SIZE, rte_socket_id());
		if (p == NULL) {
			printf("Error creating mempool\n");
			return -1;
		}
	}

	dist[0] = ds;
	dist[1] = db;

	for (i = 0; i < 2; i++) {

		worker_params.dist = dist[i];
		if (i)
			strlcpy(worker_params.name, "burst",
					sizeof(worker_params.name));
		else
			strlcpy(worker_params.name, "single",
					sizeof(worker_params.name));

		rte_eal_mp_remote_launch(handle_work,
				&worker_params, SKIP_MAIN);
		if (sanity_test(&worker_params, p) < 0)
			goto err;
		quit_workers(&worker_params, p);

		rte_eal_mp_remote_launch(handle_work_with_free_mbufs,
				&worker_params, SKIP_MAIN);
		if (sanity_test_with_mbuf_alloc(&worker_params, p) < 0)
			goto err;
		quit_workers(&worker_params, p);

		if (rte_lcore_count() > 2) {
			rte_eal_mp_remote_launch(handle_work_for_shutdown_test,
					&worker_params,
					SKIP_MAIN);
			if (sanity_test_with_worker_shutdown(&worker_params,
					p) < 0)
				goto err;
			quit_workers(&worker_params, p);

			rte_eal_mp_remote_launch(handle_work_for_shutdown_test,
					&worker_params,
					SKIP_MAIN);
			if (test_flush_with_worker_shutdown(&worker_params,
					p) < 0)
				goto err;
			quit_workers(&worker_params, p);

			rte_eal_mp_remote_launch(handle_and_mark_work,
					&worker_params, SKIP_MAIN);
			if (sanity_mark_test(&worker_params, p) < 0)
				goto err;
			quit_workers(&worker_params, p);

		} else {
			printf("Too few cores to run worker shutdown test\n");
		}

	}

	if (test_error_distributor_create_numworkers() == -1 ||
			test_error_distributor_create_name() == -1) {
		printf("rte_distributor_create parameter check tests failed\n");
		return -1;
	}

	return 0;

err:
	quit_workers(&worker_params, p);
	return -1;
}

#endif /* !RTE_EXEC_ENV_WINDOWS */

REGISTER_TEST_COMMAND(distributor_autotest, test_distributor);