1 /* SPDX-License-Identifier: BSD-3-Clause 2 * Copyright(c) 2010-2017 Intel Corporation 3 */ 4 5 #include "test.h" 6 7 #include <unistd.h> 8 #include <string.h> 9 #include <rte_cycles.h> 10 #include <rte_errno.h> 11 #include <rte_mempool.h> 12 #include <rte_mbuf.h> 13 #include <rte_distributor.h> 14 #include <rte_string_fns.h> 15 16 #define ITER_POWER 20 /* log 2 of how many iterations we do when timing. */ 17 #define BURST 32 18 #define BIG_BATCH 1024 19 20 struct worker_params { 21 char name[64]; 22 struct rte_distributor *dist; 23 }; 24 25 struct worker_params worker_params; 26 27 /* statics - all zero-initialized by default */ 28 static volatile int quit; /**< general quit variable for all threads */ 29 static volatile int zero_quit; /**< var for when we just want thr0 to quit*/ 30 static volatile int zero_sleep; /**< thr0 has quit basic loop and is sleeping*/ 31 static volatile unsigned worker_idx; 32 static volatile unsigned zero_idx; 33 34 struct worker_stats { 35 volatile unsigned handled_packets; 36 } __rte_cache_aligned; 37 struct worker_stats worker_stats[RTE_MAX_LCORE]; 38 39 /* returns the total count of the number of packets handled by the worker 40 * functions given below. 41 */ 42 static inline unsigned 43 total_packet_count(void) 44 { 45 unsigned i, count = 0; 46 for (i = 0; i < worker_idx; i++) 47 count += __atomic_load_n(&worker_stats[i].handled_packets, 48 __ATOMIC_RELAXED); 49 return count; 50 } 51 52 /* resets the packet counts for a new test */ 53 static inline void 54 clear_packet_count(void) 55 { 56 unsigned int i; 57 for (i = 0; i < RTE_MAX_LCORE; i++) 58 __atomic_store_n(&worker_stats[i].handled_packets, 0, 59 __ATOMIC_RELAXED); 60 } 61 62 /* this is the basic worker function for sanity test 63 * it does nothing but return packets and count them. 64 */ 65 static int 66 handle_work(void *arg) 67 { 68 struct rte_mbuf *buf[8] __rte_cache_aligned; 69 struct worker_params *wp = arg; 70 struct rte_distributor *db = wp->dist; 71 unsigned int num; 72 unsigned int id = __atomic_fetch_add(&worker_idx, 1, __ATOMIC_RELAXED); 73 74 num = rte_distributor_get_pkt(db, id, buf, NULL, 0); 75 while (!quit) { 76 __atomic_fetch_add(&worker_stats[id].handled_packets, num, 77 __ATOMIC_RELAXED); 78 num = rte_distributor_get_pkt(db, id, 79 buf, buf, num); 80 } 81 __atomic_fetch_add(&worker_stats[id].handled_packets, num, 82 __ATOMIC_RELAXED); 83 rte_distributor_return_pkt(db, id, buf, num); 84 return 0; 85 } 86 87 /* do basic sanity testing of the distributor. This test tests the following: 88 * - send 32 packets through distributor with the same tag and ensure they 89 * all go to the one worker 90 * - send 32 packets through the distributor with two different tags and 91 * verify that they go equally to two different workers. 92 * - send 32 packets with different tags through the distributors and 93 * just verify we get all packets back. 94 * - send 1024 packets through the distributor, gathering the returned packets 95 * as we go. Then verify that we correctly got all 1024 pointers back again, 96 * not necessarily in the same order (as different flows). 97 */ 98 static int 99 sanity_test(struct worker_params *wp, struct rte_mempool *p) 100 { 101 struct rte_distributor *db = wp->dist; 102 struct rte_mbuf *bufs[BURST]; 103 struct rte_mbuf *returns[BURST*2]; 104 unsigned int i, count; 105 unsigned int retries; 106 107 printf("=== Basic distributor sanity tests ===\n"); 108 clear_packet_count(); 109 if (rte_mempool_get_bulk(p, (void *)bufs, BURST) != 0) { 110 printf("line %d: Error getting mbufs from pool\n", __LINE__); 111 return -1; 112 } 113 114 /* now set all hash values in all buffers to zero, so all pkts go to the 115 * one worker thread */ 116 for (i = 0; i < BURST; i++) 117 bufs[i]->hash.usr = 0; 118 119 rte_distributor_process(db, bufs, BURST); 120 count = 0; 121 do { 122 123 rte_distributor_flush(db); 124 count += rte_distributor_returned_pkts(db, 125 returns, BURST*2); 126 } while (count < BURST); 127 128 if (total_packet_count() != BURST) { 129 printf("Line %d: Error, not all packets flushed. " 130 "Expected %u, got %u\n", 131 __LINE__, BURST, total_packet_count()); 132 return -1; 133 } 134 135 for (i = 0; i < rte_lcore_count() - 1; i++) 136 printf("Worker %u handled %u packets\n", i, 137 __atomic_load_n(&worker_stats[i].handled_packets, 138 __ATOMIC_RELAXED)); 139 printf("Sanity test with all zero hashes done.\n"); 140 141 /* pick two flows and check they go correctly */ 142 if (rte_lcore_count() >= 3) { 143 clear_packet_count(); 144 for (i = 0; i < BURST; i++) 145 bufs[i]->hash.usr = (i & 1) << 8; 146 147 rte_distributor_process(db, bufs, BURST); 148 count = 0; 149 do { 150 rte_distributor_flush(db); 151 count += rte_distributor_returned_pkts(db, 152 returns, BURST*2); 153 } while (count < BURST); 154 if (total_packet_count() != BURST) { 155 printf("Line %d: Error, not all packets flushed. " 156 "Expected %u, got %u\n", 157 __LINE__, BURST, total_packet_count()); 158 return -1; 159 } 160 161 for (i = 0; i < rte_lcore_count() - 1; i++) 162 printf("Worker %u handled %u packets\n", i, 163 __atomic_load_n( 164 &worker_stats[i].handled_packets, 165 __ATOMIC_RELAXED)); 166 printf("Sanity test with two hash values done\n"); 167 } 168 169 /* give a different hash value to each packet, 170 * so load gets distributed */ 171 clear_packet_count(); 172 for (i = 0; i < BURST; i++) 173 bufs[i]->hash.usr = i+1; 174 175 rte_distributor_process(db, bufs, BURST); 176 count = 0; 177 do { 178 rte_distributor_flush(db); 179 count += rte_distributor_returned_pkts(db, 180 returns, BURST*2); 181 } while (count < BURST); 182 if (total_packet_count() != BURST) { 183 printf("Line %d: Error, not all packets flushed. " 184 "Expected %u, got %u\n", 185 __LINE__, BURST, total_packet_count()); 186 return -1; 187 } 188 189 for (i = 0; i < rte_lcore_count() - 1; i++) 190 printf("Worker %u handled %u packets\n", i, 191 __atomic_load_n(&worker_stats[i].handled_packets, 192 __ATOMIC_RELAXED)); 193 printf("Sanity test with non-zero hashes done\n"); 194 195 rte_mempool_put_bulk(p, (void *)bufs, BURST); 196 197 /* sanity test with BIG_BATCH packets to ensure they all arrived back 198 * from the returned packets function */ 199 clear_packet_count(); 200 struct rte_mbuf *many_bufs[BIG_BATCH], *return_bufs[BIG_BATCH]; 201 unsigned num_returned = 0; 202 203 /* flush out any remaining packets */ 204 rte_distributor_flush(db); 205 rte_distributor_clear_returns(db); 206 207 if (rte_mempool_get_bulk(p, (void *)many_bufs, BIG_BATCH) != 0) { 208 printf("line %d: Error getting mbufs from pool\n", __LINE__); 209 return -1; 210 } 211 for (i = 0; i < BIG_BATCH; i++) 212 many_bufs[i]->hash.usr = i << 2; 213 214 printf("=== testing big burst (%s) ===\n", wp->name); 215 for (i = 0; i < BIG_BATCH/BURST; i++) { 216 rte_distributor_process(db, 217 &many_bufs[i*BURST], BURST); 218 count = rte_distributor_returned_pkts(db, 219 &return_bufs[num_returned], 220 BIG_BATCH - num_returned); 221 num_returned += count; 222 } 223 rte_distributor_flush(db); 224 count = rte_distributor_returned_pkts(db, 225 &return_bufs[num_returned], 226 BIG_BATCH - num_returned); 227 num_returned += count; 228 retries = 0; 229 do { 230 rte_distributor_flush(db); 231 count = rte_distributor_returned_pkts(db, 232 &return_bufs[num_returned], 233 BIG_BATCH - num_returned); 234 num_returned += count; 235 retries++; 236 } while ((num_returned < BIG_BATCH) && (retries < 100)); 237 238 if (num_returned != BIG_BATCH) { 239 printf("line %d: Missing packets, expected %d\n", 240 __LINE__, num_returned); 241 return -1; 242 } 243 244 /* big check - make sure all packets made it back!! */ 245 for (i = 0; i < BIG_BATCH; i++) { 246 unsigned j; 247 struct rte_mbuf *src = many_bufs[i]; 248 for (j = 0; j < BIG_BATCH; j++) { 249 if (return_bufs[j] == src) 250 break; 251 } 252 253 if (j == BIG_BATCH) { 254 printf("Error: could not find source packet #%u\n", i); 255 return -1; 256 } 257 } 258 printf("Sanity test of returned packets done\n"); 259 260 rte_mempool_put_bulk(p, (void *)many_bufs, BIG_BATCH); 261 262 printf("\n"); 263 return 0; 264 } 265 266 267 /* to test that the distributor does not lose packets, we use this worker 268 * function which frees mbufs when it gets them. The distributor thread does 269 * the mbuf allocation. If distributor drops packets we'll eventually run out 270 * of mbufs. 271 */ 272 static int 273 handle_work_with_free_mbufs(void *arg) 274 { 275 struct rte_mbuf *buf[8] __rte_cache_aligned; 276 struct worker_params *wp = arg; 277 struct rte_distributor *d = wp->dist; 278 unsigned int i; 279 unsigned int num; 280 unsigned int id = __atomic_fetch_add(&worker_idx, 1, __ATOMIC_RELAXED); 281 282 num = rte_distributor_get_pkt(d, id, buf, NULL, 0); 283 while (!quit) { 284 __atomic_fetch_add(&worker_stats[id].handled_packets, num, 285 __ATOMIC_RELAXED); 286 for (i = 0; i < num; i++) 287 rte_pktmbuf_free(buf[i]); 288 num = rte_distributor_get_pkt(d, id, buf, NULL, 0); 289 } 290 __atomic_fetch_add(&worker_stats[id].handled_packets, num, 291 __ATOMIC_RELAXED); 292 rte_distributor_return_pkt(d, id, buf, num); 293 return 0; 294 } 295 296 /* Perform a sanity test of the distributor with a large number of packets, 297 * where we allocate a new set of mbufs for each burst. The workers then 298 * free the mbufs. This ensures that we don't have any packet leaks in the 299 * library. 300 */ 301 static int 302 sanity_test_with_mbuf_alloc(struct worker_params *wp, struct rte_mempool *p) 303 { 304 struct rte_distributor *d = wp->dist; 305 unsigned i; 306 struct rte_mbuf *bufs[BURST]; 307 308 printf("=== Sanity test with mbuf alloc/free (%s) ===\n", wp->name); 309 310 clear_packet_count(); 311 for (i = 0; i < ((1<<ITER_POWER)); i += BURST) { 312 unsigned j; 313 while (rte_mempool_get_bulk(p, (void *)bufs, BURST) < 0) 314 rte_distributor_process(d, NULL, 0); 315 for (j = 0; j < BURST; j++) { 316 bufs[j]->hash.usr = (i+j) << 1; 317 } 318 319 rte_distributor_process(d, bufs, BURST); 320 } 321 322 rte_distributor_flush(d); 323 324 rte_delay_us(10000); 325 326 if (total_packet_count() < (1<<ITER_POWER)) { 327 printf("Line %u: Packet count is incorrect, %u, expected %u\n", 328 __LINE__, total_packet_count(), 329 (1<<ITER_POWER)); 330 return -1; 331 } 332 333 printf("Sanity test with mbuf alloc/free passed\n\n"); 334 return 0; 335 } 336 337 static int 338 handle_work_for_shutdown_test(void *arg) 339 { 340 struct rte_mbuf *buf[8] __rte_cache_aligned; 341 struct worker_params *wp = arg; 342 struct rte_distributor *d = wp->dist; 343 unsigned int num; 344 unsigned int zero_id = 0; 345 unsigned int zero_unset; 346 const unsigned int id = __atomic_fetch_add(&worker_idx, 1, 347 __ATOMIC_RELAXED); 348 349 num = rte_distributor_get_pkt(d, id, buf, NULL, 0); 350 351 if (num > 0) { 352 zero_unset = RTE_MAX_LCORE; 353 __atomic_compare_exchange_n(&zero_idx, &zero_unset, id, 354 false, __ATOMIC_ACQ_REL, __ATOMIC_ACQUIRE); 355 } 356 zero_id = __atomic_load_n(&zero_idx, __ATOMIC_ACQUIRE); 357 358 /* wait for quit single globally, or for worker zero, wait 359 * for zero_quit */ 360 while (!quit && !(id == zero_id && zero_quit)) { 361 __atomic_fetch_add(&worker_stats[id].handled_packets, num, 362 __ATOMIC_RELAXED); 363 num = rte_distributor_get_pkt(d, id, buf, NULL, 0); 364 365 if (num > 0) { 366 zero_unset = RTE_MAX_LCORE; 367 __atomic_compare_exchange_n(&zero_idx, &zero_unset, id, 368 false, __ATOMIC_ACQ_REL, __ATOMIC_ACQUIRE); 369 } 370 zero_id = __atomic_load_n(&zero_idx, __ATOMIC_ACQUIRE); 371 } 372 373 __atomic_fetch_add(&worker_stats[id].handled_packets, num, 374 __ATOMIC_RELAXED); 375 if (id == zero_id) { 376 rte_distributor_return_pkt(d, id, NULL, 0); 377 378 /* for worker zero, allow it to restart to pick up last packet 379 * when all workers are shutting down. 380 */ 381 __atomic_store_n(&zero_sleep, 1, __ATOMIC_RELEASE); 382 while (zero_quit) 383 usleep(100); 384 __atomic_store_n(&zero_sleep, 0, __ATOMIC_RELEASE); 385 386 num = rte_distributor_get_pkt(d, id, buf, NULL, 0); 387 388 while (!quit) { 389 __atomic_fetch_add(&worker_stats[id].handled_packets, 390 num, __ATOMIC_RELAXED); 391 num = rte_distributor_get_pkt(d, id, buf, NULL, 0); 392 } 393 } 394 rte_distributor_return_pkt(d, id, buf, num); 395 return 0; 396 } 397 398 399 /* Perform a sanity test of the distributor with a large number of packets, 400 * where we allocate a new set of mbufs for each burst. The workers then 401 * free the mbufs. This ensures that we don't have any packet leaks in the 402 * library. 403 */ 404 static int 405 sanity_test_with_worker_shutdown(struct worker_params *wp, 406 struct rte_mempool *p) 407 { 408 struct rte_distributor *d = wp->dist; 409 struct rte_mbuf *bufs[BURST]; 410 struct rte_mbuf *bufs2[BURST]; 411 unsigned int i; 412 unsigned int failed = 0; 413 414 printf("=== Sanity test of worker shutdown ===\n"); 415 416 clear_packet_count(); 417 418 if (rte_mempool_get_bulk(p, (void *)bufs, BURST) != 0) { 419 printf("line %d: Error getting mbufs from pool\n", __LINE__); 420 return -1; 421 } 422 423 /* 424 * Now set all hash values in all buffers to same value so all 425 * pkts go to the one worker thread 426 */ 427 for (i = 0; i < BURST; i++) 428 bufs[i]->hash.usr = 1; 429 430 rte_distributor_process(d, bufs, BURST); 431 rte_distributor_flush(d); 432 433 /* at this point, we will have processed some packets and have a full 434 * backlog for the other ones at worker 0. 435 */ 436 437 /* get more buffers to queue up, again setting them to the same flow */ 438 if (rte_mempool_get_bulk(p, (void *)bufs2, BURST) != 0) { 439 printf("line %d: Error getting mbufs from pool\n", __LINE__); 440 rte_mempool_put_bulk(p, (void *)bufs, BURST); 441 return -1; 442 } 443 for (i = 0; i < BURST; i++) 444 bufs2[i]->hash.usr = 1; 445 446 /* get worker zero to quit */ 447 zero_quit = 1; 448 rte_distributor_process(d, bufs2, BURST); 449 450 /* flush the distributor */ 451 rte_distributor_flush(d); 452 while (!__atomic_load_n(&zero_sleep, __ATOMIC_ACQUIRE)) 453 rte_distributor_flush(d); 454 455 zero_quit = 0; 456 while (__atomic_load_n(&zero_sleep, __ATOMIC_ACQUIRE)) 457 rte_delay_us(100); 458 459 for (i = 0; i < rte_lcore_count() - 1; i++) 460 printf("Worker %u handled %u packets\n", i, 461 __atomic_load_n(&worker_stats[i].handled_packets, 462 __ATOMIC_RELAXED)); 463 464 if (total_packet_count() != BURST * 2) { 465 printf("Line %d: Error, not all packets flushed. " 466 "Expected %u, got %u\n", 467 __LINE__, BURST * 2, total_packet_count()); 468 failed = 1; 469 } 470 471 rte_mempool_put_bulk(p, (void *)bufs, BURST); 472 rte_mempool_put_bulk(p, (void *)bufs2, BURST); 473 474 if (failed) 475 return -1; 476 477 printf("Sanity test with worker shutdown passed\n\n"); 478 return 0; 479 } 480 481 /* Test that the flush function is able to move packets between workers when 482 * one worker shuts down.. 483 */ 484 static int 485 test_flush_with_worker_shutdown(struct worker_params *wp, 486 struct rte_mempool *p) 487 { 488 struct rte_distributor *d = wp->dist; 489 struct rte_mbuf *bufs[BURST]; 490 unsigned int i; 491 unsigned int failed = 0; 492 493 printf("=== Test flush fn with worker shutdown (%s) ===\n", wp->name); 494 495 clear_packet_count(); 496 if (rte_mempool_get_bulk(p, (void *)bufs, BURST) != 0) { 497 printf("line %d: Error getting mbufs from pool\n", __LINE__); 498 return -1; 499 } 500 501 /* now set all hash values in all buffers to zero, so all pkts go to the 502 * one worker thread */ 503 for (i = 0; i < BURST; i++) 504 bufs[i]->hash.usr = 0; 505 506 rte_distributor_process(d, bufs, BURST); 507 /* at this point, we will have processed some packets and have a full 508 * backlog for the other ones at worker 0. 509 */ 510 511 /* get worker zero to quit */ 512 zero_quit = 1; 513 514 /* flush the distributor */ 515 rte_distributor_flush(d); 516 517 while (!__atomic_load_n(&zero_sleep, __ATOMIC_ACQUIRE)) 518 rte_distributor_flush(d); 519 520 zero_quit = 0; 521 522 while (__atomic_load_n(&zero_sleep, __ATOMIC_ACQUIRE)) 523 rte_delay_us(100); 524 525 for (i = 0; i < rte_lcore_count() - 1; i++) 526 printf("Worker %u handled %u packets\n", i, 527 __atomic_load_n(&worker_stats[i].handled_packets, 528 __ATOMIC_RELAXED)); 529 530 if (total_packet_count() != BURST) { 531 printf("Line %d: Error, not all packets flushed. " 532 "Expected %u, got %u\n", 533 __LINE__, BURST, total_packet_count()); 534 failed = 1; 535 } 536 537 rte_mempool_put_bulk(p, (void *)bufs, BURST); 538 539 if (failed) 540 return -1; 541 542 printf("Flush test with worker shutdown passed\n\n"); 543 return 0; 544 } 545 546 static int 547 handle_and_mark_work(void *arg) 548 { 549 struct rte_mbuf *buf[8] __rte_cache_aligned; 550 struct worker_params *wp = arg; 551 struct rte_distributor *db = wp->dist; 552 unsigned int num, i; 553 unsigned int id = __atomic_fetch_add(&worker_idx, 1, __ATOMIC_RELAXED); 554 num = rte_distributor_get_pkt(db, id, buf, NULL, 0); 555 while (!quit) { 556 __atomic_fetch_add(&worker_stats[id].handled_packets, num, 557 __ATOMIC_RELAXED); 558 for (i = 0; i < num; i++) 559 buf[i]->udata64 += id + 1; 560 num = rte_distributor_get_pkt(db, id, 561 buf, buf, num); 562 } 563 __atomic_fetch_add(&worker_stats[id].handled_packets, num, 564 __ATOMIC_RELAXED); 565 rte_distributor_return_pkt(db, id, buf, num); 566 return 0; 567 } 568 569 /* sanity_mark_test sends packets to workers which mark them. 570 * Every packet has also encoded sequence number. 571 * The returned packets are sorted and verified if they were handled 572 * by proper workers. 573 */ 574 static int 575 sanity_mark_test(struct worker_params *wp, struct rte_mempool *p) 576 { 577 const unsigned int buf_count = 24; 578 const unsigned int burst = 8; 579 const unsigned int shift = 12; 580 const unsigned int seq_shift = 10; 581 582 struct rte_distributor *db = wp->dist; 583 struct rte_mbuf *bufs[buf_count]; 584 struct rte_mbuf *returns[buf_count]; 585 unsigned int i, count, id; 586 unsigned int sorted[buf_count], seq; 587 unsigned int failed = 0; 588 589 printf("=== Marked packets test ===\n"); 590 clear_packet_count(); 591 if (rte_mempool_get_bulk(p, (void *)bufs, buf_count) != 0) { 592 printf("line %d: Error getting mbufs from pool\n", __LINE__); 593 return -1; 594 } 595 596 /* bufs' hashes will be like these below, but shifted left. 597 * The shifting is for avoiding collisions with backlogs 598 * and in-flight tags left by previous tests. 599 * [1, 1, 1, 1, 1, 1, 1, 1 600 * 1, 1, 1, 1, 2, 2, 2, 2 601 * 2, 2, 2, 2, 1, 1, 1, 1] 602 */ 603 for (i = 0; i < burst; i++) { 604 bufs[0 * burst + i]->hash.usr = 1 << shift; 605 bufs[1 * burst + i]->hash.usr = ((i < burst / 2) ? 1 : 2) 606 << shift; 607 bufs[2 * burst + i]->hash.usr = ((i < burst / 2) ? 2 : 1) 608 << shift; 609 } 610 /* Assign a sequence number to each packet. The sequence is shifted, 611 * so that lower bits of the udate64 will hold mark from worker. 612 */ 613 for (i = 0; i < buf_count; i++) 614 bufs[i]->udata64 = i << seq_shift; 615 616 count = 0; 617 for (i = 0; i < buf_count/burst; i++) { 618 rte_distributor_process(db, &bufs[i * burst], burst); 619 count += rte_distributor_returned_pkts(db, &returns[count], 620 buf_count - count); 621 } 622 623 do { 624 rte_distributor_flush(db); 625 count += rte_distributor_returned_pkts(db, &returns[count], 626 buf_count - count); 627 } while (count < buf_count); 628 629 for (i = 0; i < rte_lcore_count() - 1; i++) 630 printf("Worker %u handled %u packets\n", i, 631 __atomic_load_n(&worker_stats[i].handled_packets, 632 __ATOMIC_RELAXED)); 633 634 /* Sort returned packets by sent order (sequence numbers). */ 635 for (i = 0; i < buf_count; i++) { 636 seq = returns[i]->udata64 >> seq_shift; 637 id = returns[i]->udata64 - (seq << seq_shift); 638 sorted[seq] = id; 639 } 640 641 /* Verify that packets [0-11] and [20-23] were processed 642 * by the same worker 643 */ 644 for (i = 1; i < 12; i++) { 645 if (sorted[i] != sorted[0]) { 646 printf("Packet number %u processed by worker %u," 647 " but should be processes by worker %u\n", 648 i, sorted[i], sorted[0]); 649 failed = 1; 650 } 651 } 652 for (i = 20; i < 24; i++) { 653 if (sorted[i] != sorted[0]) { 654 printf("Packet number %u processed by worker %u," 655 " but should be processes by worker %u\n", 656 i, sorted[i], sorted[0]); 657 failed = 1; 658 } 659 } 660 /* And verify that packets [12-19] were processed 661 * by the another worker 662 */ 663 for (i = 13; i < 20; i++) { 664 if (sorted[i] != sorted[12]) { 665 printf("Packet number %u processed by worker %u," 666 " but should be processes by worker %u\n", 667 i, sorted[i], sorted[12]); 668 failed = 1; 669 } 670 } 671 672 rte_mempool_put_bulk(p, (void *)bufs, buf_count); 673 674 if (failed) 675 return -1; 676 677 printf("Marked packets test passed\n"); 678 return 0; 679 } 680 681 static 682 int test_error_distributor_create_name(void) 683 { 684 struct rte_distributor *d = NULL; 685 struct rte_distributor *db = NULL; 686 char *name = NULL; 687 688 d = rte_distributor_create(name, rte_socket_id(), 689 rte_lcore_count() - 1, 690 RTE_DIST_ALG_SINGLE); 691 if (d != NULL || rte_errno != EINVAL) { 692 printf("ERROR: No error on create() with NULL name param\n"); 693 return -1; 694 } 695 696 db = rte_distributor_create(name, rte_socket_id(), 697 rte_lcore_count() - 1, 698 RTE_DIST_ALG_BURST); 699 if (db != NULL || rte_errno != EINVAL) { 700 printf("ERROR: No error on create() with NULL param\n"); 701 return -1; 702 } 703 704 return 0; 705 } 706 707 708 static 709 int test_error_distributor_create_numworkers(void) 710 { 711 struct rte_distributor *ds = NULL; 712 struct rte_distributor *db = NULL; 713 714 ds = rte_distributor_create("test_numworkers", rte_socket_id(), 715 RTE_MAX_LCORE + 10, 716 RTE_DIST_ALG_SINGLE); 717 if (ds != NULL || rte_errno != EINVAL) { 718 printf("ERROR: No error on create() with num_workers > MAX\n"); 719 return -1; 720 } 721 722 db = rte_distributor_create("test_numworkers", rte_socket_id(), 723 RTE_MAX_LCORE + 10, 724 RTE_DIST_ALG_BURST); 725 if (db != NULL || rte_errno != EINVAL) { 726 printf("ERROR: No error on create() num_workers > MAX\n"); 727 return -1; 728 } 729 730 return 0; 731 } 732 733 734 /* Useful function which ensures that all worker functions terminate */ 735 static void 736 quit_workers(struct worker_params *wp, struct rte_mempool *p) 737 { 738 struct rte_distributor *d = wp->dist; 739 const unsigned num_workers = rte_lcore_count() - 1; 740 unsigned i; 741 struct rte_mbuf *bufs[RTE_MAX_LCORE]; 742 struct rte_mbuf *returns[RTE_MAX_LCORE]; 743 if (rte_mempool_get_bulk(p, (void *)bufs, num_workers) != 0) { 744 printf("line %d: Error getting mbufs from pool\n", __LINE__); 745 return; 746 } 747 748 zero_quit = 0; 749 quit = 1; 750 for (i = 0; i < num_workers; i++) 751 bufs[i]->hash.usr = i << 1; 752 rte_distributor_process(d, bufs, num_workers); 753 754 rte_distributor_process(d, NULL, 0); 755 rte_distributor_flush(d); 756 rte_eal_mp_wait_lcore(); 757 758 while (rte_distributor_returned_pkts(d, returns, RTE_MAX_LCORE)) 759 ; 760 761 rte_distributor_clear_returns(d); 762 rte_mempool_put_bulk(p, (void *)bufs, num_workers); 763 764 quit = 0; 765 worker_idx = 0; 766 zero_idx = RTE_MAX_LCORE; 767 zero_quit = 0; 768 zero_sleep = 0; 769 } 770 771 static int 772 test_distributor(void) 773 { 774 static struct rte_distributor *ds; 775 static struct rte_distributor *db; 776 static struct rte_distributor *dist[2]; 777 static struct rte_mempool *p; 778 int i; 779 780 if (rte_lcore_count() < 2) { 781 printf("Not enough cores for distributor_autotest, expecting at least 2\n"); 782 return TEST_SKIPPED; 783 } 784 785 if (db == NULL) { 786 db = rte_distributor_create("Test_dist_burst", rte_socket_id(), 787 rte_lcore_count() - 1, 788 RTE_DIST_ALG_BURST); 789 if (db == NULL) { 790 printf("Error creating burst distributor\n"); 791 return -1; 792 } 793 } else { 794 rte_distributor_flush(db); 795 rte_distributor_clear_returns(db); 796 } 797 798 if (ds == NULL) { 799 ds = rte_distributor_create("Test_dist_single", 800 rte_socket_id(), 801 rte_lcore_count() - 1, 802 RTE_DIST_ALG_SINGLE); 803 if (ds == NULL) { 804 printf("Error creating single distributor\n"); 805 return -1; 806 } 807 } else { 808 rte_distributor_flush(ds); 809 rte_distributor_clear_returns(ds); 810 } 811 812 const unsigned nb_bufs = (511 * rte_lcore_count()) < BIG_BATCH ? 813 (BIG_BATCH * 2) - 1 : (511 * rte_lcore_count()); 814 if (p == NULL) { 815 p = rte_pktmbuf_pool_create("DT_MBUF_POOL", nb_bufs, BURST, 816 0, RTE_MBUF_DEFAULT_BUF_SIZE, rte_socket_id()); 817 if (p == NULL) { 818 printf("Error creating mempool\n"); 819 return -1; 820 } 821 } 822 823 dist[0] = ds; 824 dist[1] = db; 825 826 for (i = 0; i < 2; i++) { 827 828 worker_params.dist = dist[i]; 829 if (i) 830 strlcpy(worker_params.name, "burst", 831 sizeof(worker_params.name)); 832 else 833 strlcpy(worker_params.name, "single", 834 sizeof(worker_params.name)); 835 836 rte_eal_mp_remote_launch(handle_work, 837 &worker_params, SKIP_MASTER); 838 if (sanity_test(&worker_params, p) < 0) 839 goto err; 840 quit_workers(&worker_params, p); 841 842 rte_eal_mp_remote_launch(handle_work_with_free_mbufs, 843 &worker_params, SKIP_MASTER); 844 if (sanity_test_with_mbuf_alloc(&worker_params, p) < 0) 845 goto err; 846 quit_workers(&worker_params, p); 847 848 if (rte_lcore_count() > 2) { 849 rte_eal_mp_remote_launch(handle_work_for_shutdown_test, 850 &worker_params, 851 SKIP_MASTER); 852 if (sanity_test_with_worker_shutdown(&worker_params, 853 p) < 0) 854 goto err; 855 quit_workers(&worker_params, p); 856 857 rte_eal_mp_remote_launch(handle_work_for_shutdown_test, 858 &worker_params, 859 SKIP_MASTER); 860 if (test_flush_with_worker_shutdown(&worker_params, 861 p) < 0) 862 goto err; 863 quit_workers(&worker_params, p); 864 865 rte_eal_mp_remote_launch(handle_and_mark_work, 866 &worker_params, SKIP_MASTER); 867 if (sanity_mark_test(&worker_params, p) < 0) 868 goto err; 869 quit_workers(&worker_params, p); 870 871 } else { 872 printf("Too few cores to run worker shutdown test\n"); 873 } 874 875 } 876 877 if (test_error_distributor_create_numworkers() == -1 || 878 test_error_distributor_create_name() == -1) { 879 printf("rte_distributor_create parameter check tests failed"); 880 return -1; 881 } 882 883 return 0; 884 885 err: 886 quit_workers(&worker_params, p); 887 return -1; 888 } 889 890 REGISTER_TEST_COMMAND(distributor_autotest, test_distributor); 891