/* SPDX-License-Identifier: BSD-3-Clause
 * Copyright(c) 2010-2017 Intel Corporation
 */

#include "test.h"

#include <unistd.h>
#include <string.h>
#include <rte_cycles.h>
#include <rte_errno.h>
#include <rte_mempool.h>
#include <rte_mbuf.h>
#include <rte_mbuf_dyn.h>
#include <rte_distributor.h>
#include <rte_string_fns.h>

#define ITER_POWER 20 /* log 2 of how many iterations we do when timing. */
#define BURST 32
#define BIG_BATCH 1024

typedef uint32_t seq_dynfield_t;
static int seq_dynfield_offset = -1;

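/* accessor for the per-mbuf sequence number kept in a registered dynamic field */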
static inline seq_dynfield_t *
seq_field(struct rte_mbuf *mbuf)
{
	return RTE_MBUF_DYNFIELD(mbuf, seq_dynfield_offset, seq_dynfield_t *);
}

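/* parameters handed to each worker thread */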
struct worker_params {
	char name[64];
	struct rte_distributor *dist;
};

struct worker_params worker_params;

/* statics - all zero-initialized by default */
static volatile int quit;      /**< general quit variable for all threads */
static volatile int zero_quit; /**< var for when we just want thr0 to quit */
static volatile int zero_sleep; /**< thr0 has quit basic loop and is sleeping */
static volatile unsigned worker_idx;
static volatile unsigned zero_idx;

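/* per-worker packet counters, each on its own cache line to avoid false sharing */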
struct worker_stats {
	volatile unsigned handled_packets;
} __rte_cache_aligned;
struct worker_stats worker_stats[RTE_MAX_LCORE];

/* returns the total count of packets handled by the worker
 * functions given below.
 */
static inline unsigned
total_packet_count(void)
{
	unsigned i, count = 0;
	for (i = 0; i < worker_idx; i++)
		count += __atomic_load_n(&worker_stats[i].handled_packets,
				__ATOMIC_RELAXED);
	return count;
}

/* resets the packet counts for a new test */
static inline void
clear_packet_count(void)
{
	unsigned int i;
	for (i = 0; i < RTE_MAX_LCORE; i++)
		__atomic_store_n(&worker_stats[i].handled_packets, 0,
			__ATOMIC_RELAXED);
}

/* this is the basic worker function for the sanity tests;
 * it does nothing but return packets and count them.
 */
static int
handle_work(void *arg)
{
	struct rte_mbuf *buf[8] __rte_cache_aligned;
	struct worker_params *wp = arg;
	struct rte_distributor *db = wp->dist;
	unsigned int num;
	unsigned int id = __atomic_fetch_add(&worker_idx, 1, __ATOMIC_RELAXED);

	num = rte_distributor_get_pkt(db, id, buf, NULL, 0);
	while (!quit) {
		__atomic_fetch_add(&worker_stats[id].handled_packets, num,
				__ATOMIC_RELAXED);
		num = rte_distributor_get_pkt(db, id,
				buf, buf, num);
	}
	__atomic_fetch_add(&worker_stats[id].handled_packets, num,
			__ATOMIC_RELAXED);
	rte_distributor_return_pkt(db, id, buf, num);
	return 0;
}

/* do basic sanity testing of the distributor. This test checks the following:
 * - send 32 packets through the distributor with the same tag and ensure they
 *   all go to the one worker
 * - send 32 packets through the distributor with two different tags and
 *   verify that they go equally to two different workers.
 * - send 32 packets with different tags through the distributor and
 *   just verify we get all packets back.
 * - send 1024 packets through the distributor, gathering the returned packets
 *   as we go. Then verify that we correctly got all 1024 pointers back again,
 *   not necessarily in the same order (as different flows).
 */
static int
sanity_test(struct worker_params *wp, struct rte_mempool *p)
{
	struct rte_distributor *db = wp->dist;
	struct rte_mbuf *bufs[BURST];
	struct rte_mbuf *returns[BURST*2];
	unsigned int i, count;
	unsigned int retries;
	unsigned int processed;

	printf("=== Basic distributor sanity tests ===\n");
	clear_packet_count();
	if (rte_mempool_get_bulk(p, (void *)bufs, BURST) != 0) {
		printf("line %d: Error getting mbufs from pool\n", __LINE__);
		return -1;
	}

	/* now set all hash values in all buffers to zero, so all pkts go to the
	 * one worker thread */
	for (i = 0; i < BURST; i++)
		bufs[i]->hash.usr = 0;

	processed = 0;
	while (processed < BURST)
		processed += rte_distributor_process(db, &bufs[processed],
			BURST - processed);

	count = 0;
	do {

		rte_distributor_flush(db);
		count += rte_distributor_returned_pkts(db,
				returns, BURST*2);
	} while (count < BURST);

	if (total_packet_count() != BURST) {
		printf("Line %d: Error, not all packets flushed. "
				"Expected %u, got %u\n",
				__LINE__, BURST, total_packet_count());
		rte_mempool_put_bulk(p, (void *)bufs, BURST);
		return -1;
	}

	for (i = 0; i < rte_lcore_count() - 1; i++)
		printf("Worker %u handled %u packets\n", i,
			__atomic_load_n(&worker_stats[i].handled_packets,
					__ATOMIC_RELAXED));
	printf("Sanity test with all zero hashes done.\n");

	/* pick two flows and check they go correctly */
	if (rte_lcore_count() >= 3) {
		clear_packet_count();
		for (i = 0; i < BURST; i++)
			bufs[i]->hash.usr = (i & 1) << 8;

		rte_distributor_process(db, bufs, BURST);
		count = 0;
		do {
			rte_distributor_flush(db);
			count += rte_distributor_returned_pkts(db,
					returns, BURST*2);
		} while (count < BURST);
		if (total_packet_count() != BURST) {
			printf("Line %d: Error, not all packets flushed. "
					"Expected %u, got %u\n",
					__LINE__, BURST, total_packet_count());
			rte_mempool_put_bulk(p, (void *)bufs, BURST);
			return -1;
		}

		for (i = 0; i < rte_lcore_count() - 1; i++)
			printf("Worker %u handled %u packets\n", i,
					__atomic_load_n(
					&worker_stats[i].handled_packets,
					__ATOMIC_RELAXED));
		printf("Sanity test with two hash values done\n");
	}

	/* give a different hash value to each packet,
	 * so load gets distributed */
	clear_packet_count();
	for (i = 0; i < BURST; i++)
		bufs[i]->hash.usr = i+1;

	rte_distributor_process(db, bufs, BURST);
	count = 0;
	do {
		rte_distributor_flush(db);
		count += rte_distributor_returned_pkts(db,
				returns, BURST*2);
	} while (count < BURST);
	if (total_packet_count() != BURST) {
		printf("Line %d: Error, not all packets flushed. "
				"Expected %u, got %u\n",
				__LINE__, BURST, total_packet_count());
		rte_mempool_put_bulk(p, (void *)bufs, BURST);
		return -1;
	}

	for (i = 0; i < rte_lcore_count() - 1; i++)
		printf("Worker %u handled %u packets\n", i,
			__atomic_load_n(&worker_stats[i].handled_packets,
					__ATOMIC_RELAXED));
	printf("Sanity test with non-zero hashes done\n");

	rte_mempool_put_bulk(p, (void *)bufs, BURST);

	/* sanity test with BIG_BATCH packets to ensure they all arrived back
	 * from the returned packets function */
	clear_packet_count();
	struct rte_mbuf *many_bufs[BIG_BATCH], *return_bufs[BIG_BATCH];
	unsigned num_returned = 0;

	/* flush out any remaining packets */
	rte_distributor_flush(db);
	rte_distributor_clear_returns(db);

	if (rte_mempool_get_bulk(p, (void *)many_bufs, BIG_BATCH) != 0) {
		printf("line %d: Error getting mbufs from pool\n", __LINE__);
		return -1;
	}
	for (i = 0; i < BIG_BATCH; i++)
		many_bufs[i]->hash.usr = i << 2;

	printf("=== testing big burst (%s) ===\n", wp->name);
	for (i = 0; i < BIG_BATCH/BURST; i++) {
		rte_distributor_process(db,
				&many_bufs[i*BURST], BURST);
		count = rte_distributor_returned_pkts(db,
				&return_bufs[num_returned],
				BIG_BATCH - num_returned);
		num_returned += count;
	}
	rte_distributor_flush(db);
	count = rte_distributor_returned_pkts(db,
			&return_bufs[num_returned],
			BIG_BATCH - num_returned);
	num_returned += count;
	retries = 0;
	do {
		rte_distributor_flush(db);
		count = rte_distributor_returned_pkts(db,
				&return_bufs[num_returned],
				BIG_BATCH - num_returned);
		num_returned += count;
		retries++;
	} while ((num_returned < BIG_BATCH) && (retries < 100));

	if (num_returned != BIG_BATCH) {
		printf("line %d: Missing packets, expected %u, got %u\n",
				__LINE__, BIG_BATCH, num_returned);
		rte_mempool_put_bulk(p, (void *)many_bufs, BIG_BATCH);
		return -1;
	}

	/* big check - make sure all packets made it back!! */
	for (i = 0; i < BIG_BATCH; i++) {
		unsigned j;
		struct rte_mbuf *src = many_bufs[i];
		for (j = 0; j < BIG_BATCH; j++) {
			if (return_bufs[j] == src)
				break;
		}

		if (j == BIG_BATCH) {
			printf("Error: could not find source packet #%u\n", i);
			rte_mempool_put_bulk(p, (void *)many_bufs, BIG_BATCH);
			return -1;
		}
	}
	printf("Sanity test of returned packets done\n");

	rte_mempool_put_bulk(p, (void *)many_bufs, BIG_BATCH);

	printf("\n");
	return 0;
}


/* to test that the distributor does not lose packets, we use this worker
 * function which frees mbufs when it gets them. The distributor thread does
 * the mbuf allocation. If the distributor drops packets we'll eventually run
 * out of mbufs.
 */
static int
handle_work_with_free_mbufs(void *arg)
{
	struct rte_mbuf *buf[8] __rte_cache_aligned;
	struct worker_params *wp = arg;
	struct rte_distributor *d = wp->dist;
	unsigned int i;
	unsigned int num;
	unsigned int id = __atomic_fetch_add(&worker_idx, 1, __ATOMIC_RELAXED);

	num = rte_distributor_get_pkt(d, id, buf, NULL, 0);
	while (!quit) {
		__atomic_fetch_add(&worker_stats[id].handled_packets, num,
				__ATOMIC_RELAXED);
		for (i = 0; i < num; i++)
			rte_pktmbuf_free(buf[i]);
		num = rte_distributor_get_pkt(d, id, buf, NULL, 0);
	}
	__atomic_fetch_add(&worker_stats[id].handled_packets, num,
			__ATOMIC_RELAXED);
	rte_distributor_return_pkt(d, id, buf, num);
	return 0;
}

/* Perform a sanity test of the distributor with a large number of packets,
 * where we allocate a new set of mbufs for each burst. The workers then
 * free the mbufs. This ensures that we don't have any packet leaks in the
 * library.
 */
static int
sanity_test_with_mbuf_alloc(struct worker_params *wp, struct rte_mempool *p)
{
	struct rte_distributor *d = wp->dist;
	unsigned i;
	struct rte_mbuf *bufs[BURST];
	unsigned int processed;

	printf("=== Sanity test with mbuf alloc/free (%s) ===\n", wp->name);

	clear_packet_count();
	for (i = 0; i < ((1<<ITER_POWER)); i += BURST) {
		unsigned j;
		while (rte_mempool_get_bulk(p, (void *)bufs, BURST) < 0)
			rte_distributor_process(d, NULL, 0);
		for (j = 0; j < BURST; j++) {
			bufs[j]->hash.usr = (i+j) << 1;
		}

		processed = 0;
		while (processed < BURST)
			processed += rte_distributor_process(d,
				&bufs[processed], BURST - processed);
	}

	rte_distributor_flush(d);

	rte_delay_us(10000);

	if (total_packet_count() < (1<<ITER_POWER)) {
		printf("Line %u: Packet count is incorrect, %u, expected %u\n",
				__LINE__, total_packet_count(),
				(1<<ITER_POWER));
		return -1;
	}

	printf("Sanity test with mbuf alloc/free passed\n\n");
	return 0;
}

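/* Worker function for the shutdown tests. The first worker to receive
 * packets records itself in zero_idx; that worker exits its main loop when
 * zero_quit is set, signals via zero_sleep that it has stopped, and resumes
 * once zero_quit is cleared so the remaining packets can still be picked up.
 */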
static int
handle_work_for_shutdown_test(void *arg)
{
	struct rte_mbuf *buf[8] __rte_cache_aligned;
	struct worker_params *wp = arg;
	struct rte_distributor *d = wp->dist;
	unsigned int num;
	unsigned int zero_id = 0;
	unsigned int zero_unset;
	const unsigned int id = __atomic_fetch_add(&worker_idx, 1,
			__ATOMIC_RELAXED);

	num = rte_distributor_get_pkt(d, id, buf, NULL, 0);

	if (num > 0) {
		zero_unset = RTE_MAX_LCORE;
		__atomic_compare_exchange_n(&zero_idx, &zero_unset, id,
			false, __ATOMIC_ACQ_REL, __ATOMIC_ACQUIRE);
	}
	zero_id = __atomic_load_n(&zero_idx, __ATOMIC_ACQUIRE);

	/* wait for the quit signal globally, or, for worker zero, wait
	 * for zero_quit */
	while (!quit && !(id == zero_id && zero_quit)) {
		__atomic_fetch_add(&worker_stats[id].handled_packets, num,
				__ATOMIC_RELAXED);
		num = rte_distributor_get_pkt(d, id, buf, NULL, 0);

		if (num > 0) {
			zero_unset = RTE_MAX_LCORE;
			__atomic_compare_exchange_n(&zero_idx, &zero_unset, id,
				false, __ATOMIC_ACQ_REL, __ATOMIC_ACQUIRE);
		}
		zero_id = __atomic_load_n(&zero_idx, __ATOMIC_ACQUIRE);
	}

	__atomic_fetch_add(&worker_stats[id].handled_packets, num,
			__ATOMIC_RELAXED);
	if (id == zero_id) {
		rte_distributor_return_pkt(d, id, NULL, 0);

		/* for worker zero, allow it to restart to pick up the last
		 * packet when all workers are shutting down.
		 */
		__atomic_store_n(&zero_sleep, 1, __ATOMIC_RELEASE);
		while (zero_quit)
			usleep(100);
		__atomic_store_n(&zero_sleep, 0, __ATOMIC_RELEASE);

		num = rte_distributor_get_pkt(d, id, buf, NULL, 0);

		while (!quit) {
			__atomic_fetch_add(&worker_stats[id].handled_packets,
					num, __ATOMIC_RELAXED);
			num = rte_distributor_get_pkt(d, id, buf, NULL, 0);
		}
	}
	rte_distributor_return_pkt(d, id, buf, num);
	return 0;
}


/* Perform a sanity test of the distributor, shutting down one worker
 * partway through. Worker zero stops taking packets and the distributor
 * must move its backlog to the remaining workers so that no packets are
 * lost.
 */
static int
sanity_test_with_worker_shutdown(struct worker_params *wp,
		struct rte_mempool *p)
{
	struct rte_distributor *d = wp->dist;
	struct rte_mbuf *bufs[BURST];
	struct rte_mbuf *bufs2[BURST];
	unsigned int i;
	unsigned int failed = 0;
	unsigned int processed = 0;

	printf("=== Sanity test of worker shutdown ===\n");

	clear_packet_count();

	if (rte_mempool_get_bulk(p, (void *)bufs, BURST) != 0) {
		printf("line %d: Error getting mbufs from pool\n", __LINE__);
		return -1;
	}

	/*
	 * Now set all hash values in all buffers to same value so all
	 * pkts go to the one worker thread
	 */
	for (i = 0; i < BURST; i++)
		bufs[i]->hash.usr = 1;

	processed = 0;
	while (processed < BURST)
		processed += rte_distributor_process(d, &bufs[processed],
			BURST - processed);
	rte_distributor_flush(d);

	/* at this point, we will have processed some packets and have a full
	 * backlog for the other ones at worker 0.
	 */

	/* get more buffers to queue up, again setting them to the same flow */
	if (rte_mempool_get_bulk(p, (void *)bufs2, BURST) != 0) {
		printf("line %d: Error getting mbufs from pool\n", __LINE__);
		rte_mempool_put_bulk(p, (void *)bufs, BURST);
		return -1;
	}
	for (i = 0; i < BURST; i++)
		bufs2[i]->hash.usr = 1;

	/* get worker zero to quit */
	zero_quit = 1;
	rte_distributor_process(d, bufs2, BURST);

	/* flush the distributor */
	rte_distributor_flush(d);
	while (!__atomic_load_n(&zero_sleep, __ATOMIC_ACQUIRE))
		rte_distributor_flush(d);

	zero_quit = 0;
	while (__atomic_load_n(&zero_sleep, __ATOMIC_ACQUIRE))
		rte_delay_us(100);

	for (i = 0; i < rte_lcore_count() - 1; i++)
		printf("Worker %u handled %u packets\n", i,
			__atomic_load_n(&worker_stats[i].handled_packets,
					__ATOMIC_RELAXED));

	if (total_packet_count() != BURST * 2) {
		printf("Line %d: Error, not all packets flushed. "
				"Expected %u, got %u\n",
				__LINE__, BURST * 2, total_packet_count());
		failed = 1;
	}

	rte_mempool_put_bulk(p, (void *)bufs, BURST);
	rte_mempool_put_bulk(p, (void *)bufs2, BURST);

	if (failed)
		return -1;

	printf("Sanity test with worker shutdown passed\n\n");
	return 0;
}

/* Test that the flush function is able to move packets between workers when
 * one worker shuts down.
 */
static int
test_flush_with_worker_shutdown(struct worker_params *wp,
		struct rte_mempool *p)
{
	struct rte_distributor *d = wp->dist;
	struct rte_mbuf *bufs[BURST];
	unsigned int i;
	unsigned int failed = 0;
	unsigned int processed;

	printf("=== Test flush fn with worker shutdown (%s) ===\n", wp->name);

	clear_packet_count();
	if (rte_mempool_get_bulk(p, (void *)bufs, BURST) != 0) {
		printf("line %d: Error getting mbufs from pool\n", __LINE__);
		return -1;
	}

	/* now set all hash values in all buffers to zero, so all pkts go to the
	 * one worker thread */
	for (i = 0; i < BURST; i++)
		bufs[i]->hash.usr = 0;

	processed = 0;
	while (processed < BURST)
		processed += rte_distributor_process(d, &bufs[processed],
			BURST - processed);
	/* at this point, we will have processed some packets and have a full
	 * backlog for the other ones at worker 0.
	 */

	/* get worker zero to quit */
	zero_quit = 1;

	/* flush the distributor */
	rte_distributor_flush(d);

	while (!__atomic_load_n(&zero_sleep, __ATOMIC_ACQUIRE))
		rte_distributor_flush(d);

	zero_quit = 0;

	while (__atomic_load_n(&zero_sleep, __ATOMIC_ACQUIRE))
		rte_delay_us(100);

	for (i = 0; i < rte_lcore_count() - 1; i++)
		printf("Worker %u handled %u packets\n", i,
			__atomic_load_n(&worker_stats[i].handled_packets,
					__ATOMIC_RELAXED));

	if (total_packet_count() != BURST) {
		printf("Line %d: Error, not all packets flushed. "
				"Expected %u, got %u\n",
				__LINE__, BURST, total_packet_count());
		failed = 1;
	}

	rte_mempool_put_bulk(p, (void *)bufs, BURST);

	if (failed)
		return -1;

	printf("Flush test with worker shutdown passed\n\n");
	return 0;
}

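/* Worker function for the marked-packets test: each worker stamps its own
 * id (plus one) into the low bits of the packet's sequence dynfield before
 * handing the packet back, so the test can later check which worker handled
 * which flow.
 */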
static int
handle_and_mark_work(void *arg)
{
	struct rte_mbuf *buf[8] __rte_cache_aligned;
	struct worker_params *wp = arg;
	struct rte_distributor *db = wp->dist;
	unsigned int num, i;
	unsigned int id = __atomic_fetch_add(&worker_idx, 1, __ATOMIC_RELAXED);
	num = rte_distributor_get_pkt(db, id, buf, NULL, 0);
	while (!quit) {
		__atomic_fetch_add(&worker_stats[id].handled_packets, num,
				__ATOMIC_RELAXED);
		for (i = 0; i < num; i++)
			*seq_field(buf[i]) += id + 1;
		num = rte_distributor_get_pkt(db, id,
				buf, buf, num);
	}
	__atomic_fetch_add(&worker_stats[id].handled_packets, num,
			__ATOMIC_RELAXED);
	rte_distributor_return_pkt(db, id, buf, num);
	return 0;
}

/* sanity_mark_test sends packets to workers which mark them.
 * Every packet also carries an encoded sequence number.
 * The returned packets are sorted and checked to verify that they were
 * handled by the proper workers.
 */
static int
sanity_mark_test(struct worker_params *wp, struct rte_mempool *p)
{
	const unsigned int buf_count = 24;
	const unsigned int burst = 8;
	const unsigned int shift = 12;
	const unsigned int seq_shift = 10;

	struct rte_distributor *db = wp->dist;
	struct rte_mbuf *bufs[buf_count];
	struct rte_mbuf *returns[buf_count];
	unsigned int i, count, id;
	unsigned int sorted[buf_count], seq;
	unsigned int failed = 0;
	unsigned int processed;

	printf("=== Marked packets test ===\n");
	clear_packet_count();
	if (rte_mempool_get_bulk(p, (void *)bufs, buf_count) != 0) {
		printf("line %d: Error getting mbufs from pool\n", __LINE__);
		return -1;
	}

	/* bufs' hashes will be like these below, but shifted left.
	 * The shifting is for avoiding collisions with backlogs
	 * and in-flight tags left by previous tests.
	 * [1, 1, 1, 1, 1, 1, 1, 1
	 *  1, 1, 1, 1, 2, 2, 2, 2
	 *  2, 2, 2, 2, 1, 1, 1, 1]
	 */
	for (i = 0; i < burst; i++) {
		bufs[0 * burst + i]->hash.usr = 1 << shift;
		bufs[1 * burst + i]->hash.usr = ((i < burst / 2) ? 1 : 2)
			<< shift;
		bufs[2 * burst + i]->hash.usr = ((i < burst / 2) ? 2 : 1)
			<< shift;
	}
	/* Assign a sequence number to each packet. The sequence is shifted,
	 * so that lower bits will hold the mark from the worker.
	 */
	for (i = 0; i < buf_count; i++)
		*seq_field(bufs[i]) = i << seq_shift;

	count = 0;
	for (i = 0; i < buf_count/burst; i++) {
		processed = 0;
		while (processed < burst)
			processed += rte_distributor_process(db,
				&bufs[i * burst + processed],
				burst - processed);
		count += rte_distributor_returned_pkts(db, &returns[count],
			buf_count - count);
	}

	do {
		rte_distributor_flush(db);
		count += rte_distributor_returned_pkts(db, &returns[count],
			buf_count - count);
	} while (count < buf_count);

	for (i = 0; i < rte_lcore_count() - 1; i++)
		printf("Worker %u handled %u packets\n", i,
			__atomic_load_n(&worker_stats[i].handled_packets,
					__ATOMIC_RELAXED));

	/* Sort returned packets by sent order (sequence numbers). */
	for (i = 0; i < buf_count; i++) {
		seq = *seq_field(returns[i]) >> seq_shift;
		id = *seq_field(returns[i]) - (seq << seq_shift);
		sorted[seq] = id;
	}

	/* Verify that packets [0-11] and [20-23] were processed
	 * by the same worker
	 */
	for (i = 1; i < 12; i++) {
		if (sorted[i] != sorted[0]) {
			printf("Packet number %u processed by worker %u,"
				" but should be processed by worker %u\n",
				i, sorted[i], sorted[0]);
			failed = 1;
		}
	}
	for (i = 20; i < 24; i++) {
		if (sorted[i] != sorted[0]) {
			printf("Packet number %u processed by worker %u,"
				" but should be processed by worker %u\n",
				i, sorted[i], sorted[0]);
			failed = 1;
		}
	}
	/* And verify that packets [12-19] were processed
	 * by another worker
	 */
	for (i = 13; i < 20; i++) {
		if (sorted[i] != sorted[12]) {
			printf("Packet number %u processed by worker %u,"
				" but should be processed by worker %u\n",
				i, sorted[i], sorted[12]);
			failed = 1;
		}
	}

	rte_mempool_put_bulk(p, (void *)bufs, buf_count);

	if (failed)
		return -1;

	printf("Marked packets test passed\n");
	return 0;
}

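/* Check that rte_distributor_create() rejects a NULL name with EINVAL,
 * for both the single and burst algorithms.
 */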
static
int test_error_distributor_create_name(void)
{
	struct rte_distributor *d = NULL;
	struct rte_distributor *db = NULL;
	char *name = NULL;

	d = rte_distributor_create(name, rte_socket_id(),
			rte_lcore_count() - 1,
			RTE_DIST_ALG_SINGLE);
	if (d != NULL || rte_errno != EINVAL) {
		printf("ERROR: No error on create() with NULL name param\n");
		return -1;
	}

	db = rte_distributor_create(name, rte_socket_id(),
			rte_lcore_count() - 1,
			RTE_DIST_ALG_BURST);
	if (db != NULL || rte_errno != EINVAL) {
		printf("ERROR: No error on create() with NULL param\n");
		return -1;
	}

	return 0;
}


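/* Check that rte_distributor_create() rejects a worker count above
 * RTE_MAX_LCORE with EINVAL, for both the single and burst algorithms.
 */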
static
int test_error_distributor_create_numworkers(void)
{
	struct rte_distributor *ds = NULL;
	struct rte_distributor *db = NULL;

	ds = rte_distributor_create("test_numworkers", rte_socket_id(),
			RTE_MAX_LCORE + 10,
			RTE_DIST_ALG_SINGLE);
	if (ds != NULL || rte_errno != EINVAL) {
		printf("ERROR: No error on create() with num_workers > MAX\n");
		return -1;
	}

	db = rte_distributor_create("test_numworkers", rte_socket_id(),
			RTE_MAX_LCORE + 10,
			RTE_DIST_ALG_BURST);
	if (db != NULL || rte_errno != EINVAL) {
		printf("ERROR: No error on create() num_workers > MAX\n");
		return -1;
	}

	return 0;
}


/* Useful function which ensures that all worker functions terminate */
static void
quit_workers(struct worker_params *wp, struct rte_mempool *p)
{
	struct rte_distributor *d = wp->dist;
	const unsigned num_workers = rte_lcore_count() - 1;
	unsigned i;
	struct rte_mbuf *bufs[RTE_MAX_LCORE];
	struct rte_mbuf *returns[RTE_MAX_LCORE];
	if (rte_mempool_get_bulk(p, (void *)bufs, num_workers) != 0) {
		printf("line %d: Error getting mbufs from pool\n", __LINE__);
		return;
	}

	zero_quit = 0;
	quit = 1;
	for (i = 0; i < num_workers; i++) {
		bufs[i]->hash.usr = i << 1;
		rte_distributor_process(d, &bufs[i], 1);
	}

	rte_distributor_process(d, NULL, 0);
	rte_distributor_flush(d);
	rte_eal_mp_wait_lcore();

	while (rte_distributor_returned_pkts(d, returns, RTE_MAX_LCORE))
		;

	rte_distributor_clear_returns(d);
	rte_mempool_put_bulk(p, (void *)bufs, num_workers);

	quit = 0;
	worker_idx = 0;
	zero_idx = RTE_MAX_LCORE;
	zero_quit = 0;
	zero_sleep = 0;
}

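/* Main autotest entry point: registers the sequence dynamic field, creates
 * one burst-mode and one single-mode distributor plus a shared mbuf pool,
 * then runs the sanity tests above against each distributor in turn and
 * finally exercises the rte_distributor_create() error paths.
 */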
static int
test_distributor(void)
{
	static struct rte_distributor *ds;
	static struct rte_distributor *db;
	static struct rte_distributor *dist[2];
	static struct rte_mempool *p;
	int i;

	static const struct rte_mbuf_dynfield seq_dynfield_desc = {
		.name = "test_distributor_dynfield_seq",
		.size = sizeof(seq_dynfield_t),
		.align = __alignof__(seq_dynfield_t),
	};
	seq_dynfield_offset =
		rte_mbuf_dynfield_register(&seq_dynfield_desc);
	if (seq_dynfield_offset < 0) {
		printf("Error registering mbuf field\n");
		return TEST_FAILED;
	}

	if (rte_lcore_count() < 2) {
		printf("Not enough cores for distributor_autotest, expecting at least 2\n");
		return TEST_SKIPPED;
	}

	if (db == NULL) {
		db = rte_distributor_create("Test_dist_burst", rte_socket_id(),
				rte_lcore_count() - 1,
				RTE_DIST_ALG_BURST);
		if (db == NULL) {
			printf("Error creating burst distributor\n");
			return -1;
		}
	} else {
		rte_distributor_flush(db);
		rte_distributor_clear_returns(db);
	}

	if (ds == NULL) {
		ds = rte_distributor_create("Test_dist_single",
				rte_socket_id(),
				rte_lcore_count() - 1,
				RTE_DIST_ALG_SINGLE);
		if (ds == NULL) {
			printf("Error creating single distributor\n");
			return -1;
		}
	} else {
		rte_distributor_flush(ds);
		rte_distributor_clear_returns(ds);
	}

	const unsigned nb_bufs = (511 * rte_lcore_count()) < BIG_BATCH ?
			(BIG_BATCH * 2) - 1 : (511 * rte_lcore_count());
	if (p == NULL) {
		p = rte_pktmbuf_pool_create("DT_MBUF_POOL", nb_bufs, BURST,
			0, RTE_MBUF_DEFAULT_BUF_SIZE, rte_socket_id());
		if (p == NULL) {
			printf("Error creating mempool\n");
			return -1;
		}
	}

	dist[0] = ds;
	dist[1] = db;

	for (i = 0; i < 2; i++) {

		worker_params.dist = dist[i];
		if (i)
			strlcpy(worker_params.name, "burst",
					sizeof(worker_params.name));
		else
			strlcpy(worker_params.name, "single",
					sizeof(worker_params.name));

		rte_eal_mp_remote_launch(handle_work,
				&worker_params, SKIP_MAIN);
		if (sanity_test(&worker_params, p) < 0)
			goto err;
		quit_workers(&worker_params, p);

		rte_eal_mp_remote_launch(handle_work_with_free_mbufs,
				&worker_params, SKIP_MAIN);
		if (sanity_test_with_mbuf_alloc(&worker_params, p) < 0)
			goto err;
		quit_workers(&worker_params, p);

		if (rte_lcore_count() > 2) {
			rte_eal_mp_remote_launch(handle_work_for_shutdown_test,
					&worker_params,
					SKIP_MAIN);
			if (sanity_test_with_worker_shutdown(&worker_params,
					p) < 0)
				goto err;
			quit_workers(&worker_params, p);

			rte_eal_mp_remote_launch(handle_work_for_shutdown_test,
					&worker_params,
					SKIP_MAIN);
			if (test_flush_with_worker_shutdown(&worker_params,
					p) < 0)
				goto err;
			quit_workers(&worker_params, p);

			rte_eal_mp_remote_launch(handle_and_mark_work,
					&worker_params, SKIP_MAIN);
			if (sanity_mark_test(&worker_params, p) < 0)
				goto err;
			quit_workers(&worker_params, p);

		} else {
			printf("Too few cores to run worker shutdown test\n");
		}

	}

	if (test_error_distributor_create_numworkers() == -1 ||
			test_error_distributor_create_name() == -1) {
		printf("rte_distributor_create parameter check tests failed\n");
		return -1;
	}

	return 0;

err:
	quit_workers(&worker_params, p);
	return -1;
}

REGISTER_TEST_COMMAND(distributor_autotest, test_distributor);