1 /*- 2 * BSD LICENSE 3 * 4 * Copyright(c) 2010-2014 Intel Corporation. All rights reserved. 5 * All rights reserved. 6 * 7 * Redistribution and use in source and binary forms, with or without 8 * modification, are permitted provided that the following conditions 9 * are met: 10 * 11 * * Redistributions of source code must retain the above copyright 12 * notice, this list of conditions and the following disclaimer. 13 * * Redistributions in binary form must reproduce the above copyright 14 * notice, this list of conditions and the following disclaimer in 15 * the documentation and/or other materials provided with the 16 * distribution. 17 * * Neither the name of Intel Corporation nor the names of its 18 * contributors may be used to endorse or promote products derived 19 * from this software without specific prior written permission. 20 * 21 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 22 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT 23 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR 24 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT 25 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, 26 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT 27 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, 28 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY 29 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT 30 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE 31 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 32 */ 33 34 #include "test.h" 35 36 #include <unistd.h> 37 #include <string.h> 38 #include <rte_cycles.h> 39 #include <rte_errno.h> 40 #include <rte_mempool.h> 41 #include <rte_mbuf.h> 42 #include <rte_distributor.h> 43 44 #define ITER_POWER 20 /* log 2 of how many iterations we do when timing. */ 45 #define BURST 32 46 #define BIG_BATCH 1024 47 48 /* statics - all zero-initialized by default */ 49 static volatile int quit; /**< general quit variable for all threads */ 50 static volatile int zero_quit; /**< var for when we just want thr0 to quit*/ 51 static volatile unsigned worker_idx; 52 53 struct worker_stats { 54 volatile unsigned handled_packets; 55 } __rte_cache_aligned; 56 struct worker_stats worker_stats[RTE_MAX_LCORE]; 57 58 /* returns the total count of the number of packets handled by the worker 59 * functions given below. 60 */ 61 static inline unsigned 62 total_packet_count(void) 63 { 64 unsigned i, count = 0; 65 for (i = 0; i < worker_idx; i++) 66 count += worker_stats[i].handled_packets; 67 return count; 68 } 69 70 /* resets the packet counts for a new test */ 71 static inline void 72 clear_packet_count(void) 73 { 74 memset(&worker_stats, 0, sizeof(worker_stats)); 75 } 76 77 /* this is the basic worker function for sanity test 78 * it does nothing but return packets and count them. 79 */ 80 static int 81 handle_work(void *arg) 82 { 83 struct rte_mbuf *pkt = NULL; 84 struct rte_distributor *d = arg; 85 unsigned count = 0; 86 unsigned id = __sync_fetch_and_add(&worker_idx, 1); 87 88 pkt = rte_distributor_get_pkt(d, id, NULL); 89 while (!quit) { 90 worker_stats[id].handled_packets++, count++; 91 pkt = rte_distributor_get_pkt(d, id, pkt); 92 } 93 worker_stats[id].handled_packets++, count++; 94 rte_distributor_return_pkt(d, id, pkt); 95 return 0; 96 } 97 98 /* do basic sanity testing of the distributor. This test tests the following: 99 * - send 32 packets through distributor with the same tag and ensure they 100 * all go to the one worker 101 * - send 32 packets throught the distributor with two different tags and 102 * verify that they go equally to two different workers. 103 * - send 32 packets with different tags through the distributors and 104 * just verify we get all packets back. 105 * - send 1024 packets through the distributor, gathering the returned packets 106 * as we go. Then verify that we correctly got all 1024 pointers back again, 107 * not necessarily in the same order (as different flows). 108 */ 109 static int 110 sanity_test(struct rte_distributor *d, struct rte_mempool *p) 111 { 112 struct rte_mbuf *bufs[BURST]; 113 unsigned i; 114 115 printf("=== Basic distributor sanity tests ===\n"); 116 clear_packet_count(); 117 if (rte_mempool_get_bulk(p, (void *)bufs, BURST) != 0) { 118 printf("line %d: Error getting mbufs from pool\n", __LINE__); 119 return -1; 120 } 121 122 /* now set all hash values in all buffers to zero, so all pkts go to the 123 * one worker thread */ 124 for (i = 0; i < BURST; i++) 125 bufs[i]->hash.usr = 0; 126 127 rte_distributor_process(d, bufs, BURST); 128 rte_distributor_flush(d); 129 if (total_packet_count() != BURST) { 130 printf("Line %d: Error, not all packets flushed. " 131 "Expected %u, got %u\n", 132 __LINE__, BURST, total_packet_count()); 133 return -1; 134 } 135 136 for (i = 0; i < rte_lcore_count() - 1; i++) 137 printf("Worker %u handled %u packets\n", i, 138 worker_stats[i].handled_packets); 139 printf("Sanity test with all zero hashes done.\n"); 140 if (worker_stats[0].handled_packets != BURST) 141 return -1; 142 143 /* pick two flows and check they go correctly */ 144 if (rte_lcore_count() >= 3) { 145 clear_packet_count(); 146 for (i = 0; i < BURST; i++) 147 bufs[i]->hash.usr = (i & 1) << 8; 148 149 rte_distributor_process(d, bufs, BURST); 150 rte_distributor_flush(d); 151 if (total_packet_count() != BURST) { 152 printf("Line %d: Error, not all packets flushed. " 153 "Expected %u, got %u\n", 154 __LINE__, BURST, total_packet_count()); 155 return -1; 156 } 157 158 for (i = 0; i < rte_lcore_count() - 1; i++) 159 printf("Worker %u handled %u packets\n", i, 160 worker_stats[i].handled_packets); 161 printf("Sanity test with two hash values done\n"); 162 163 if (worker_stats[0].handled_packets != 16 || 164 worker_stats[1].handled_packets != 16) 165 return -1; 166 } 167 168 /* give a different hash value to each packet, 169 * so load gets distributed */ 170 clear_packet_count(); 171 for (i = 0; i < BURST; i++) 172 bufs[i]->hash.usr = i; 173 174 rte_distributor_process(d, bufs, BURST); 175 rte_distributor_flush(d); 176 if (total_packet_count() != BURST) { 177 printf("Line %d: Error, not all packets flushed. " 178 "Expected %u, got %u\n", 179 __LINE__, BURST, total_packet_count()); 180 return -1; 181 } 182 183 for (i = 0; i < rte_lcore_count() - 1; i++) 184 printf("Worker %u handled %u packets\n", i, 185 worker_stats[i].handled_packets); 186 printf("Sanity test with non-zero hashes done\n"); 187 188 rte_mempool_put_bulk(p, (void *)bufs, BURST); 189 190 /* sanity test with BIG_BATCH packets to ensure they all arrived back 191 * from the returned packets function */ 192 clear_packet_count(); 193 struct rte_mbuf *many_bufs[BIG_BATCH], *return_bufs[BIG_BATCH]; 194 unsigned num_returned = 0; 195 196 /* flush out any remaining packets */ 197 rte_distributor_flush(d); 198 rte_distributor_clear_returns(d); 199 if (rte_mempool_get_bulk(p, (void *)many_bufs, BIG_BATCH) != 0) { 200 printf("line %d: Error getting mbufs from pool\n", __LINE__); 201 return -1; 202 } 203 for (i = 0; i < BIG_BATCH; i++) 204 many_bufs[i]->hash.usr = i << 2; 205 206 for (i = 0; i < BIG_BATCH/BURST; i++) { 207 rte_distributor_process(d, &many_bufs[i*BURST], BURST); 208 num_returned += rte_distributor_returned_pkts(d, 209 &return_bufs[num_returned], 210 BIG_BATCH - num_returned); 211 } 212 rte_distributor_flush(d); 213 num_returned += rte_distributor_returned_pkts(d, 214 &return_bufs[num_returned], BIG_BATCH - num_returned); 215 216 if (num_returned != BIG_BATCH) { 217 printf("line %d: Number returned is not the same as " 218 "number sent\n", __LINE__); 219 return -1; 220 } 221 /* big check - make sure all packets made it back!! */ 222 for (i = 0; i < BIG_BATCH; i++) { 223 unsigned j; 224 struct rte_mbuf *src = many_bufs[i]; 225 for (j = 0; j < BIG_BATCH; j++) 226 if (return_bufs[j] == src) 227 break; 228 229 if (j == BIG_BATCH) { 230 printf("Error: could not find source packet #%u\n", i); 231 return -1; 232 } 233 } 234 printf("Sanity test of returned packets done\n"); 235 236 rte_mempool_put_bulk(p, (void *)many_bufs, BIG_BATCH); 237 238 printf("\n"); 239 return 0; 240 } 241 242 243 /* to test that the distributor does not lose packets, we use this worker 244 * function which frees mbufs when it gets them. The distributor thread does 245 * the mbuf allocation. If distributor drops packets we'll eventually run out 246 * of mbufs. 247 */ 248 static int 249 handle_work_with_free_mbufs(void *arg) 250 { 251 struct rte_mbuf *pkt = NULL; 252 struct rte_distributor *d = arg; 253 unsigned count = 0; 254 unsigned id = __sync_fetch_and_add(&worker_idx, 1); 255 256 pkt = rte_distributor_get_pkt(d, id, NULL); 257 while (!quit) { 258 worker_stats[id].handled_packets++, count++; 259 rte_pktmbuf_free(pkt); 260 pkt = rte_distributor_get_pkt(d, id, pkt); 261 } 262 worker_stats[id].handled_packets++, count++; 263 rte_distributor_return_pkt(d, id, pkt); 264 return 0; 265 } 266 267 /* Perform a sanity test of the distributor with a large number of packets, 268 * where we allocate a new set of mbufs for each burst. The workers then 269 * free the mbufs. This ensures that we don't have any packet leaks in the 270 * library. 271 */ 272 static int 273 sanity_test_with_mbuf_alloc(struct rte_distributor *d, struct rte_mempool *p) 274 { 275 unsigned i; 276 struct rte_mbuf *bufs[BURST]; 277 278 printf("=== Sanity test with mbuf alloc/free ===\n"); 279 clear_packet_count(); 280 for (i = 0; i < ((1<<ITER_POWER)); i += BURST) { 281 unsigned j; 282 while (rte_mempool_get_bulk(p, (void *)bufs, BURST) < 0) 283 rte_distributor_process(d, NULL, 0); 284 for (j = 0; j < BURST; j++) { 285 bufs[j]->hash.usr = (i+j) << 1; 286 rte_mbuf_refcnt_set(bufs[j], 1); 287 } 288 289 rte_distributor_process(d, bufs, BURST); 290 } 291 292 rte_distributor_flush(d); 293 if (total_packet_count() < (1<<ITER_POWER)) { 294 printf("Line %u: Packet count is incorrect, %u, expected %u\n", 295 __LINE__, total_packet_count(), 296 (1<<ITER_POWER)); 297 return -1; 298 } 299 300 printf("Sanity test with mbuf alloc/free passed\n\n"); 301 return 0; 302 } 303 304 static int 305 handle_work_for_shutdown_test(void *arg) 306 { 307 struct rte_mbuf *pkt = NULL; 308 struct rte_distributor *d = arg; 309 unsigned count = 0; 310 const unsigned id = __sync_fetch_and_add(&worker_idx, 1); 311 312 pkt = rte_distributor_get_pkt(d, id, NULL); 313 /* wait for quit single globally, or for worker zero, wait 314 * for zero_quit */ 315 while (!quit && !(id == 0 && zero_quit)) { 316 worker_stats[id].handled_packets++, count++; 317 rte_pktmbuf_free(pkt); 318 pkt = rte_distributor_get_pkt(d, id, NULL); 319 } 320 worker_stats[id].handled_packets++, count++; 321 rte_distributor_return_pkt(d, id, pkt); 322 323 if (id == 0) { 324 /* for worker zero, allow it to restart to pick up last packet 325 * when all workers are shutting down. 326 */ 327 while (zero_quit) 328 usleep(100); 329 pkt = rte_distributor_get_pkt(d, id, NULL); 330 while (!quit) { 331 worker_stats[id].handled_packets++, count++; 332 rte_pktmbuf_free(pkt); 333 pkt = rte_distributor_get_pkt(d, id, NULL); 334 } 335 rte_distributor_return_pkt(d, id, pkt); 336 } 337 return 0; 338 } 339 340 341 /* Perform a sanity test of the distributor with a large number of packets, 342 * where we allocate a new set of mbufs for each burst. The workers then 343 * free the mbufs. This ensures that we don't have any packet leaks in the 344 * library. 345 */ 346 static int 347 sanity_test_with_worker_shutdown(struct rte_distributor *d, 348 struct rte_mempool *p) 349 { 350 struct rte_mbuf *bufs[BURST]; 351 unsigned i; 352 353 printf("=== Sanity test of worker shutdown ===\n"); 354 355 clear_packet_count(); 356 if (rte_mempool_get_bulk(p, (void *)bufs, BURST) != 0) { 357 printf("line %d: Error getting mbufs from pool\n", __LINE__); 358 return -1; 359 } 360 361 /* now set all hash values in all buffers to zero, so all pkts go to the 362 * one worker thread */ 363 for (i = 0; i < BURST; i++) 364 bufs[i]->hash.usr = 0; 365 366 rte_distributor_process(d, bufs, BURST); 367 /* at this point, we will have processed some packets and have a full 368 * backlog for the other ones at worker 0. 369 */ 370 371 /* get more buffers to queue up, again setting them to the same flow */ 372 if (rte_mempool_get_bulk(p, (void *)bufs, BURST) != 0) { 373 printf("line %d: Error getting mbufs from pool\n", __LINE__); 374 return -1; 375 } 376 for (i = 0; i < BURST; i++) 377 bufs[i]->hash.usr = 0; 378 379 /* get worker zero to quit */ 380 zero_quit = 1; 381 rte_distributor_process(d, bufs, BURST); 382 383 /* flush the distributor */ 384 rte_distributor_flush(d); 385 if (total_packet_count() != BURST * 2) { 386 printf("Line %d: Error, not all packets flushed. " 387 "Expected %u, got %u\n", 388 __LINE__, BURST * 2, total_packet_count()); 389 return -1; 390 } 391 392 for (i = 0; i < rte_lcore_count() - 1; i++) 393 printf("Worker %u handled %u packets\n", i, 394 worker_stats[i].handled_packets); 395 396 printf("Sanity test with worker shutdown passed\n\n"); 397 return 0; 398 } 399 400 /* Test that the flush function is able to move packets between workers when 401 * one worker shuts down.. 402 */ 403 static int 404 test_flush_with_worker_shutdown(struct rte_distributor *d, 405 struct rte_mempool *p) 406 { 407 struct rte_mbuf *bufs[BURST]; 408 unsigned i; 409 410 printf("=== Test flush fn with worker shutdown ===\n"); 411 412 clear_packet_count(); 413 if (rte_mempool_get_bulk(p, (void *)bufs, BURST) != 0) { 414 printf("line %d: Error getting mbufs from pool\n", __LINE__); 415 return -1; 416 } 417 418 /* now set all hash values in all buffers to zero, so all pkts go to the 419 * one worker thread */ 420 for (i = 0; i < BURST; i++) 421 bufs[i]->hash.usr = 0; 422 423 rte_distributor_process(d, bufs, BURST); 424 /* at this point, we will have processed some packets and have a full 425 * backlog for the other ones at worker 0. 426 */ 427 428 /* get worker zero to quit */ 429 zero_quit = 1; 430 431 /* flush the distributor */ 432 rte_distributor_flush(d); 433 434 zero_quit = 0; 435 if (total_packet_count() != BURST) { 436 printf("Line %d: Error, not all packets flushed. " 437 "Expected %u, got %u\n", 438 __LINE__, BURST, total_packet_count()); 439 return -1; 440 } 441 442 for (i = 0; i < rte_lcore_count() - 1; i++) 443 printf("Worker %u handled %u packets\n", i, 444 worker_stats[i].handled_packets); 445 446 printf("Flush test with worker shutdown passed\n\n"); 447 return 0; 448 } 449 450 static 451 int test_error_distributor_create_name(void) 452 { 453 struct rte_distributor *d = NULL; 454 char *name = NULL; 455 456 d = rte_distributor_create(name, rte_socket_id(), 457 rte_lcore_count() - 1); 458 if (d != NULL || rte_errno != EINVAL) { 459 printf("ERROR: No error on create() with NULL name param\n"); 460 return -1; 461 } 462 463 return 0; 464 } 465 466 467 static 468 int test_error_distributor_create_numworkers(void) 469 { 470 struct rte_distributor *d = NULL; 471 d = rte_distributor_create("test_numworkers", rte_socket_id(), 472 RTE_MAX_LCORE + 10); 473 if (d != NULL || rte_errno != EINVAL) { 474 printf("ERROR: No error on create() with num_workers > MAX\n"); 475 return -1; 476 } 477 return 0; 478 } 479 480 481 /* Useful function which ensures that all worker functions terminate */ 482 static void 483 quit_workers(struct rte_distributor *d, struct rte_mempool *p) 484 { 485 const unsigned num_workers = rte_lcore_count() - 1; 486 unsigned i; 487 struct rte_mbuf *bufs[RTE_MAX_LCORE]; 488 rte_mempool_get_bulk(p, (void *)bufs, num_workers); 489 490 zero_quit = 0; 491 quit = 1; 492 for (i = 0; i < num_workers; i++) 493 bufs[i]->hash.usr = i << 1; 494 rte_distributor_process(d, bufs, num_workers); 495 496 rte_mempool_put_bulk(p, (void *)bufs, num_workers); 497 498 rte_distributor_process(d, NULL, 0); 499 rte_distributor_flush(d); 500 rte_eal_mp_wait_lcore(); 501 quit = 0; 502 worker_idx = 0; 503 } 504 505 static int 506 test_distributor(void) 507 { 508 static struct rte_distributor *d; 509 static struct rte_mempool *p; 510 511 if (rte_lcore_count() < 2) { 512 printf("ERROR: not enough cores to test distributor\n"); 513 return -1; 514 } 515 516 if (d == NULL) { 517 d = rte_distributor_create("Test_distributor", rte_socket_id(), 518 rte_lcore_count() - 1); 519 if (d == NULL) { 520 printf("Error creating distributor\n"); 521 return -1; 522 } 523 } else { 524 rte_distributor_flush(d); 525 rte_distributor_clear_returns(d); 526 } 527 528 const unsigned nb_bufs = (511 * rte_lcore_count()) < BIG_BATCH ? 529 (BIG_BATCH * 2) - 1 : (511 * rte_lcore_count()); 530 if (p == NULL) { 531 p = rte_pktmbuf_pool_create("DT_MBUF_POOL", nb_bufs, BURST, 532 0, RTE_MBUF_DEFAULT_BUF_SIZE, rte_socket_id()); 533 if (p == NULL) { 534 printf("Error creating mempool\n"); 535 return -1; 536 } 537 } 538 539 rte_eal_mp_remote_launch(handle_work, d, SKIP_MASTER); 540 if (sanity_test(d, p) < 0) 541 goto err; 542 quit_workers(d, p); 543 544 rte_eal_mp_remote_launch(handle_work_with_free_mbufs, d, SKIP_MASTER); 545 if (sanity_test_with_mbuf_alloc(d, p) < 0) 546 goto err; 547 quit_workers(d, p); 548 549 if (rte_lcore_count() > 2) { 550 rte_eal_mp_remote_launch(handle_work_for_shutdown_test, d, 551 SKIP_MASTER); 552 if (sanity_test_with_worker_shutdown(d, p) < 0) 553 goto err; 554 quit_workers(d, p); 555 556 rte_eal_mp_remote_launch(handle_work_for_shutdown_test, d, 557 SKIP_MASTER); 558 if (test_flush_with_worker_shutdown(d, p) < 0) 559 goto err; 560 quit_workers(d, p); 561 562 } else { 563 printf("Not enough cores to run tests for worker shutdown\n"); 564 } 565 566 if (test_error_distributor_create_numworkers() == -1 || 567 test_error_distributor_create_name() == -1) { 568 printf("rte_distributor_create parameter check tests failed"); 569 return -1; 570 } 571 572 return 0; 573 574 err: 575 quit_workers(d, p); 576 return -1; 577 } 578 579 REGISTER_TEST_COMMAND(distributor_autotest, test_distributor); 580