/* SPDX-License-Identifier: BSD-3-Clause
 * Copyright(c) 2010-2017 Intel Corporation
 */

#include "test.h"

#include <unistd.h>
#include <string.h>
#include <rte_cycles.h>
#include <rte_errno.h>
#include <rte_mempool.h>
#include <rte_mbuf.h>
#include <rte_distributor.h>
#include <rte_string_fns.h>

#define ITER_POWER 20 /* log 2 of how many iterations we do when timing. */
#define BURST 32
#define BIG_BATCH 1024

struct worker_params {
	char name[64];
	struct rte_distributor *dist;
};

struct worker_params worker_params;

/* statics - all zero-initialized by default */
static volatile int quit;      /**< general quit variable for all threads */
static volatile int zero_quit; /**< var for when we just want thr0 to quit */
static volatile int zero_sleep; /**< thr0 has quit basic loop and is sleeping */
static volatile unsigned worker_idx;
static volatile unsigned zero_idx;

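/* Per-worker packet counters. Each entry is cache-line aligned so that
 * workers running on different lcores do not false-share a line; the array
 * is indexed by the worker id handed out via worker_idx above.
 */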
struct worker_stats {
	volatile unsigned handled_packets;
} __rte_cache_aligned;
struct worker_stats worker_stats[RTE_MAX_LCORE];

/* returns the total count of packets handled by the worker
 * functions given below.
 */
static inline unsigned
total_packet_count(void)
{
	unsigned i, count = 0;
	for (i = 0; i < worker_idx; i++)
		count += __atomic_load_n(&worker_stats[i].handled_packets,
				__ATOMIC_RELAXED);
	return count;
}

/* resets the packet counts for a new test */
static inline void
clear_packet_count(void)
{
	unsigned int i;
	for (i = 0; i < RTE_MAX_LCORE; i++)
		__atomic_store_n(&worker_stats[i].handled_packets, 0,
			__ATOMIC_RELAXED);
}

/* this is the basic worker function for the sanity tests;
 * it does nothing but return packets and count them.
 */
static int
handle_work(void *arg)
{
	struct rte_mbuf *buf[8] __rte_cache_aligned;
	struct worker_params *wp = arg;
	struct rte_distributor *db = wp->dist;
	unsigned int num;
	unsigned int id = __atomic_fetch_add(&worker_idx, 1, __ATOMIC_RELAXED);

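	/* Worker loop: request an initial burst, then on each iteration hand
	 * the previous burst back (buf doubles as the oldpkt array) while
	 * fetching the next one. Packets still held when quit is observed are
	 * handed back through rte_distributor_return_pkt() below.
	 */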
	num = rte_distributor_get_pkt(db, id, buf, NULL, 0);
	while (!quit) {
		__atomic_fetch_add(&worker_stats[id].handled_packets, num,
				__ATOMIC_RELAXED);
		num = rte_distributor_get_pkt(db, id,
				buf, buf, num);
	}
	__atomic_fetch_add(&worker_stats[id].handled_packets, num,
			__ATOMIC_RELAXED);
	rte_distributor_return_pkt(db, id, buf, num);
	return 0;
}

/* do basic sanity testing of the distributor. This test checks the following:
 * - send 32 packets through the distributor with the same tag and ensure they
 *   all go to the one worker
 * - send 32 packets through the distributor with two different tags and
 *   verify that they go equally to two different workers.
 * - send 32 packets with different tags through the distributor and
 *   just verify we get all packets back.
 * - send 1024 packets through the distributor, gathering the returned packets
 *   as we go. Then verify that we correctly got all 1024 pointers back again,
 *   not necessarily in the same order (as different flows).
 */
static int
sanity_test(struct worker_params *wp, struct rte_mempool *p)
{
	struct rte_distributor *db = wp->dist;
	struct rte_mbuf *bufs[BURST];
	struct rte_mbuf *returns[BURST*2];
	unsigned int i, count;
	unsigned int retries;

	printf("=== Basic distributor sanity tests ===\n");
	clear_packet_count();
	if (rte_mempool_get_bulk(p, (void *)bufs, BURST) != 0) {
		printf("line %d: Error getting mbufs from pool\n", __LINE__);
		return -1;
	}

	/* now set all hash values in all buffers to zero, so all pkts go to the
	 * one worker thread */
	for (i = 0; i < BURST; i++)
		bufs[i]->hash.usr = 0;

	rte_distributor_process(db, bufs, BURST);
	count = 0;
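	/* The distributor may still be buffering packets internally or have
	 * them out with the worker, so keep flushing and collecting returns
	 * until the whole burst has come back.
	 */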
	do {

		rte_distributor_flush(db);
		count += rte_distributor_returned_pkts(db,
				returns, BURST*2);
	} while (count < BURST);

	if (total_packet_count() != BURST) {
		printf("Line %d: Error, not all packets flushed. "
				"Expected %u, got %u\n",
				__LINE__, BURST, total_packet_count());
		return -1;
	}

	for (i = 0; i < rte_lcore_count() - 1; i++)
		printf("Worker %u handled %u packets\n", i,
			__atomic_load_n(&worker_stats[i].handled_packets,
					__ATOMIC_RELAXED));
	printf("Sanity test with all zero hashes done.\n");

	/* pick two flows and check they go correctly */
	if (rte_lcore_count() >= 3) {
		clear_packet_count();
		for (i = 0; i < BURST; i++)
			bufs[i]->hash.usr = (i & 1) << 8;

		rte_distributor_process(db, bufs, BURST);
		count = 0;
		do {
			rte_distributor_flush(db);
			count += rte_distributor_returned_pkts(db,
					returns, BURST*2);
		} while (count < BURST);
		if (total_packet_count() != BURST) {
			printf("Line %d: Error, not all packets flushed. "
					"Expected %u, got %u\n",
					__LINE__, BURST, total_packet_count());
			return -1;
		}

		for (i = 0; i < rte_lcore_count() - 1; i++)
			printf("Worker %u handled %u packets\n", i,
				__atomic_load_n(
					&worker_stats[i].handled_packets,
					__ATOMIC_RELAXED));
		printf("Sanity test with two hash values done\n");
	}

	/* give a different hash value to each packet,
	 * so load gets distributed */
	clear_packet_count();
	for (i = 0; i < BURST; i++)
		bufs[i]->hash.usr = i+1;

	rte_distributor_process(db, bufs, BURST);
	count = 0;
	do {
		rte_distributor_flush(db);
		count += rte_distributor_returned_pkts(db,
				returns, BURST*2);
	} while (count < BURST);
	if (total_packet_count() != BURST) {
		printf("Line %d: Error, not all packets flushed. "
				"Expected %u, got %u\n",
				__LINE__, BURST, total_packet_count());
		return -1;
	}

	for (i = 0; i < rte_lcore_count() - 1; i++)
		printf("Worker %u handled %u packets\n", i,
			__atomic_load_n(&worker_stats[i].handled_packets,
					__ATOMIC_RELAXED));
	printf("Sanity test with non-zero hashes done\n");

	rte_mempool_put_bulk(p, (void *)bufs, BURST);

	/* sanity test with BIG_BATCH packets to ensure they all arrived back
	 * from the returned packets function */
	clear_packet_count();
	struct rte_mbuf *many_bufs[BIG_BATCH], *return_bufs[BIG_BATCH];
	unsigned num_returned = 0;

	/* flush out any remaining packets */
	rte_distributor_flush(db);
	rte_distributor_clear_returns(db);

	if (rte_mempool_get_bulk(p, (void *)many_bufs, BIG_BATCH) != 0) {
		printf("line %d: Error getting mbufs from pool\n", __LINE__);
		return -1;
	}
	for (i = 0; i < BIG_BATCH; i++)
		many_bufs[i]->hash.usr = i << 2;

	printf("=== testing big burst (%s) ===\n", wp->name);
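	/* Collect returns as we go rather than only at the end, so the
	 * distributor's finite returns buffer does not fill up mid-test.
	 */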
	for (i = 0; i < BIG_BATCH/BURST; i++) {
		rte_distributor_process(db,
				&many_bufs[i*BURST], BURST);
		count = rte_distributor_returned_pkts(db,
				&return_bufs[num_returned],
				BIG_BATCH - num_returned);
		num_returned += count;
	}
	rte_distributor_flush(db);
	count = rte_distributor_returned_pkts(db,
			&return_bufs[num_returned],
			BIG_BATCH - num_returned);
	num_returned += count;
	retries = 0;
	do {
		rte_distributor_flush(db);
		count = rte_distributor_returned_pkts(db,
				&return_bufs[num_returned],
				BIG_BATCH - num_returned);
		num_returned += count;
		retries++;
	} while ((num_returned < BIG_BATCH) && (retries < 100));

	if (num_returned != BIG_BATCH) {
		printf("line %d: Missing packets, expected %u, got %u\n",
				__LINE__, BIG_BATCH, num_returned);
		return -1;
	}

	/* big check - make sure all packets made it back!! */
	for (i = 0; i < BIG_BATCH; i++) {
		unsigned j;
		struct rte_mbuf *src = many_bufs[i];
		for (j = 0; j < BIG_BATCH; j++) {
			if (return_bufs[j] == src)
				break;
		}

		if (j == BIG_BATCH) {
			printf("Error: could not find source packet #%u\n", i);
			return -1;
		}
	}
	printf("Sanity test of returned packets done\n");

	rte_mempool_put_bulk(p, (void *)many_bufs, BIG_BATCH);

	printf("\n");
	return 0;
}


/* to test that the distributor does not lose packets, we use this worker
 * function which frees mbufs when it gets them. The distributor thread does
 * the mbuf allocation. If the distributor drops packets we'll eventually run
 * out of mbufs.
 */
static int
handle_work_with_free_mbufs(void *arg)
{
	struct rte_mbuf *buf[8] __rte_cache_aligned;
	struct worker_params *wp = arg;
	struct rte_distributor *d = wp->dist;
	unsigned int i;
	unsigned int num;
	unsigned int id = __atomic_fetch_add(&worker_idx, 1, __ATOMIC_RELAXED);

	num = rte_distributor_get_pkt(d, id, buf, NULL, 0);
	while (!quit) {
		__atomic_fetch_add(&worker_stats[id].handled_packets, num,
				__ATOMIC_RELAXED);
		for (i = 0; i < num; i++)
			rte_pktmbuf_free(buf[i]);
		num = rte_distributor_get_pkt(d, id, buf, NULL, 0);
	}
	__atomic_fetch_add(&worker_stats[id].handled_packets, num,
			__ATOMIC_RELAXED);
	rte_distributor_return_pkt(d, id, buf, num);
	return 0;
}

/* Perform a sanity test of the distributor with a large number of packets,
 * where we allocate a new set of mbufs for each burst. The workers then
 * free the mbufs. This ensures that we don't have any packet leaks in the
 * library.
 */
static int
sanity_test_with_mbuf_alloc(struct worker_params *wp, struct rte_mempool *p)
{
	struct rte_distributor *d = wp->dist;
	unsigned i;
	struct rte_mbuf *bufs[BURST];

	printf("=== Sanity test with mbuf alloc/free (%s) ===\n", wp->name);

	clear_packet_count();
	for (i = 0; i < ((1<<ITER_POWER)); i += BURST) {
		unsigned j;
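		/* If the pool has run dry, the mbufs are still queued in the
		 * distributor or held by the workers; keep the distributor
		 * turning so they get delivered, freed and returned to the
		 * pool.
		 */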
		while (rte_mempool_get_bulk(p, (void *)bufs, BURST) < 0)
			rte_distributor_process(d, NULL, 0);
		for (j = 0; j < BURST; j++) {
			bufs[j]->hash.usr = (i+j) << 1;
		}

		rte_distributor_process(d, bufs, BURST);
	}

	rte_distributor_flush(d);

	rte_delay_us(10000);

	if (total_packet_count() < (1<<ITER_POWER)) {
		printf("Line %u: Packet count is incorrect, %u, expected %u\n",
				__LINE__, total_packet_count(),
				(1<<ITER_POWER));
		return -1;
	}

	printf("Sanity test with mbuf alloc/free passed\n\n");
	return 0;
}

static int
handle_work_for_shutdown_test(void *arg)
{
	struct rte_mbuf *buf[8] __rte_cache_aligned;
	struct worker_params *wp = arg;
	struct rte_distributor *d = wp->dist;
	unsigned int num;
	unsigned int zero_id = 0;
	unsigned int zero_unset;
	const unsigned int id = __atomic_fetch_add(&worker_idx, 1,
			__ATOMIC_RELAXED);

	num = rte_distributor_get_pkt(d, id, buf, NULL, 0);

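	/* The first worker to actually receive packets claims the "zero" role
	 * by swapping its id into zero_idx (reset to RTE_MAX_LCORE by
	 * quit_workers() between tests); the shutdown tests then stop exactly
	 * that worker.
	 */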
	if (num > 0) {
		zero_unset = RTE_MAX_LCORE;
		__atomic_compare_exchange_n(&zero_idx, &zero_unset, id,
			false, __ATOMIC_ACQ_REL, __ATOMIC_ACQUIRE);
	}
	zero_id = __atomic_load_n(&zero_idx, __ATOMIC_ACQUIRE);

	/* wait for the quit signal globally, or, for worker zero,
	 * wait for zero_quit */
	while (!quit && !(id == zero_id && zero_quit)) {
		__atomic_fetch_add(&worker_stats[id].handled_packets, num,
				__ATOMIC_RELAXED);
		num = rte_distributor_get_pkt(d, id, buf, NULL, 0);

		if (num > 0) {
			zero_unset = RTE_MAX_LCORE;
			__atomic_compare_exchange_n(&zero_idx, &zero_unset, id,
				false, __ATOMIC_ACQ_REL, __ATOMIC_ACQUIRE);
		}
		zero_id = __atomic_load_n(&zero_idx, __ATOMIC_ACQUIRE);
	}

	__atomic_fetch_add(&worker_stats[id].handled_packets, num,
			__ATOMIC_RELAXED);
	if (id == zero_id) {
		rte_distributor_return_pkt(d, id, NULL, 0);

		/* for worker zero, allow it to restart to pick up last packet
		 * when all workers are shutting down.
		 */
		__atomic_store_n(&zero_sleep, 1, __ATOMIC_RELEASE);
		while (zero_quit)
			usleep(100);
		__atomic_store_n(&zero_sleep, 0, __ATOMIC_RELEASE);

		num = rte_distributor_get_pkt(d, id, buf, NULL, 0);

		while (!quit) {
			__atomic_fetch_add(&worker_stats[id].handled_packets,
					num, __ATOMIC_RELAXED);
			num = rte_distributor_get_pkt(d, id, buf, NULL, 0);
		}
	}
	rte_distributor_return_pkt(d, id, buf, num);
	return 0;
}


/* Perform a sanity test of the distributor in the face of a worker shutdown:
 * send two bursts belonging to the same flow, shut down the worker handling
 * that flow part-way through, and verify that all packets are still counted,
 * i.e. that the stopped worker's backlog is redistributed to the remaining
 * workers.
 */
static int
sanity_test_with_worker_shutdown(struct worker_params *wp,
		struct rte_mempool *p)
{
	struct rte_distributor *d = wp->dist;
	struct rte_mbuf *bufs[BURST];
	struct rte_mbuf *bufs2[BURST];
	unsigned int i;
	unsigned int failed = 0;

	printf("=== Sanity test of worker shutdown ===\n");

	clear_packet_count();

	if (rte_mempool_get_bulk(p, (void *)bufs, BURST) != 0) {
		printf("line %d: Error getting mbufs from pool\n", __LINE__);
		return -1;
	}

	/*
	 * Now set all hash values in all buffers to the same value so all
	 * pkts go to the one worker thread
	 */
	for (i = 0; i < BURST; i++)
		bufs[i]->hash.usr = 1;

	rte_distributor_process(d, bufs, BURST);
	rte_distributor_flush(d);

	/* at this point, we will have processed some packets and have a full
	 * backlog for the other ones at worker 0.
	 */

	/* get more buffers to queue up, again setting them to the same flow */
	if (rte_mempool_get_bulk(p, (void *)bufs2, BURST) != 0) {
		printf("line %d: Error getting mbufs from pool\n", __LINE__);
		rte_mempool_put_bulk(p, (void *)bufs, BURST);
		return -1;
	}
	for (i = 0; i < BURST; i++)
		bufs2[i]->hash.usr = 1;

	/* get worker zero to quit */
	zero_quit = 1;
	rte_distributor_process(d, bufs2, BURST);

	/* flush the distributor */
	rte_distributor_flush(d);
	while (!__atomic_load_n(&zero_sleep, __ATOMIC_ACQUIRE))
		rte_distributor_flush(d);

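	/* Release worker zero and wait until it has woken up and re-registered
	 * with the distributor before checking the counts.
	 */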
	zero_quit = 0;
	while (__atomic_load_n(&zero_sleep, __ATOMIC_ACQUIRE))
		rte_delay_us(100);

	for (i = 0; i < rte_lcore_count() - 1; i++)
		printf("Worker %u handled %u packets\n", i,
			__atomic_load_n(&worker_stats[i].handled_packets,
					__ATOMIC_RELAXED));

	if (total_packet_count() != BURST * 2) {
		printf("Line %d: Error, not all packets flushed. "
				"Expected %u, got %u\n",
				__LINE__, BURST * 2, total_packet_count());
		failed = 1;
	}

	rte_mempool_put_bulk(p, (void *)bufs, BURST);
	rte_mempool_put_bulk(p, (void *)bufs2, BURST);

	if (failed)
		return -1;

	printf("Sanity test with worker shutdown passed\n\n");
	return 0;
}

/* Test that the flush function is able to move packets between workers when
 * one worker shuts down.
 */
static int
test_flush_with_worker_shutdown(struct worker_params *wp,
		struct rte_mempool *p)
{
	struct rte_distributor *d = wp->dist;
	struct rte_mbuf *bufs[BURST];
	unsigned int i;
	unsigned int failed = 0;

	printf("=== Test flush fn with worker shutdown (%s) ===\n", wp->name);

	clear_packet_count();
	if (rte_mempool_get_bulk(p, (void *)bufs, BURST) != 0) {
		printf("line %d: Error getting mbufs from pool\n", __LINE__);
		return -1;
	}

	/* now set all hash values in all buffers to zero, so all pkts go to the
	 * one worker thread */
	for (i = 0; i < BURST; i++)
		bufs[i]->hash.usr = 0;

	rte_distributor_process(d, bufs, BURST);
	/* at this point, we will have processed some packets and have a full
	 * backlog for the other ones at worker 0.
	 */

	/* get worker zero to quit */
	zero_quit = 1;

	/* flush the distributor */
	rte_distributor_flush(d);

	while (!__atomic_load_n(&zero_sleep, __ATOMIC_ACQUIRE))
		rte_distributor_flush(d);

	zero_quit = 0;

	while (__atomic_load_n(&zero_sleep, __ATOMIC_ACQUIRE))
		rte_delay_us(100);

	for (i = 0; i < rte_lcore_count() - 1; i++)
		printf("Worker %u handled %u packets\n", i,
			__atomic_load_n(&worker_stats[i].handled_packets,
					__ATOMIC_RELAXED));

	if (total_packet_count() != BURST) {
		printf("Line %d: Error, not all packets flushed. "
				"Expected %u, got %u\n",
				__LINE__, BURST, total_packet_count());
		failed = 1;
	}

	rte_mempool_put_bulk(p, (void *)bufs, BURST);

	if (failed)
		return -1;

	printf("Flush test with worker shutdown passed\n\n");
	return 0;
}

static int
handle_and_mark_work(void *arg)
{
	struct rte_mbuf *buf[8] __rte_cache_aligned;
	struct worker_params *wp = arg;
	struct rte_distributor *db = wp->dist;
	unsigned int num, i;
	unsigned int id = __atomic_fetch_add(&worker_idx, 1, __ATOMIC_RELAXED);
	num = rte_distributor_get_pkt(db, id, buf, NULL, 0);
	while (!quit) {
		__atomic_fetch_add(&worker_stats[id].handled_packets, num,
				__ATOMIC_RELAXED);
		for (i = 0; i < num; i++)
			buf[i]->udata64 += id + 1;
		num = rte_distributor_get_pkt(db, id,
				buf, buf, num);
	}
	__atomic_fetch_add(&worker_stats[id].handled_packets, num,
			__ATOMIC_RELAXED);
	rte_distributor_return_pkt(db, id, buf, num);
	return 0;
}

/* sanity_mark_test sends packets to workers, which mark them.
 * Every packet also carries an encoded sequence number.
 * The returned packets are sorted and it is verified that they were handled
 * by the proper workers.
 */
static int
sanity_mark_test(struct worker_params *wp, struct rte_mempool *p)
{
	const unsigned int buf_count = 24;
	const unsigned int burst = 8;
	const unsigned int shift = 12;
	const unsigned int seq_shift = 10;

	struct rte_distributor *db = wp->dist;
	struct rte_mbuf *bufs[buf_count];
	struct rte_mbuf *returns[buf_count];
	unsigned int i, count, id;
	unsigned int sorted[buf_count], seq;
	unsigned int failed = 0;

	printf("=== Marked packets test ===\n");
	clear_packet_count();
	if (rte_mempool_get_bulk(p, (void *)bufs, buf_count) != 0) {
		printf("line %d: Error getting mbufs from pool\n", __LINE__);
		return -1;
	}

	/* bufs' hashes will be like these below, but shifted left.
	 * The shifting is for avoiding collisions with backlogs
	 * and in-flight tags left by previous tests.
	 * [1, 1, 1, 1, 1, 1, 1, 1
	 *  1, 1, 1, 1, 2, 2, 2, 2
	 *  2, 2, 2, 2, 1, 1, 1, 1]
	 */
	for (i = 0; i < burst; i++) {
		bufs[0 * burst + i]->hash.usr = 1 << shift;
		bufs[1 * burst + i]->hash.usr = ((i < burst / 2) ? 1 : 2)
			<< shift;
		bufs[2 * burst + i]->hash.usr = ((i < burst / 2) ? 2 : 1)
			<< shift;
	}
	/* Assign a sequence number to each packet. The sequence is shifted,
	 * so that the lower bits of udata64 will hold the mark from the worker.
	 */
	for (i = 0; i < buf_count; i++)
		bufs[i]->udata64 = i << seq_shift;

	count = 0;
	for (i = 0; i < buf_count/burst; i++) {
		rte_distributor_process(db, &bufs[i * burst], burst);
		count += rte_distributor_returned_pkts(db, &returns[count],
			buf_count - count);
	}

	do {
		rte_distributor_flush(db);
		count += rte_distributor_returned_pkts(db, &returns[count],
			buf_count - count);
	} while (count < buf_count);

	for (i = 0; i < rte_lcore_count() - 1; i++)
		printf("Worker %u handled %u packets\n", i,
			__atomic_load_n(&worker_stats[i].handled_packets,
					__ATOMIC_RELAXED));

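	/* Each udata64 now holds the original sequence number in its upper
	 * bits and the worker mark (id + 1, added by handle_and_mark_work())
	 * in its lower bits; recover both below.
	 */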
	/* Sort returned packets by sent order (sequence numbers). */
	for (i = 0; i < buf_count; i++) {
		seq = returns[i]->udata64 >> seq_shift;
		id = returns[i]->udata64 - (seq << seq_shift);
		sorted[seq] = id;
	}

	/* Verify that packets [0-11] and [20-23] were processed
	 * by the same worker
	 */
	for (i = 1; i < 12; i++) {
		if (sorted[i] != sorted[0]) {
			printf("Packet number %u processed by worker %u,"
				" but should be processed by worker %u\n",
				i, sorted[i], sorted[0]);
			failed = 1;
		}
	}
	for (i = 20; i < 24; i++) {
		if (sorted[i] != sorted[0]) {
			printf("Packet number %u processed by worker %u,"
				" but should be processed by worker %u\n",
				i, sorted[i], sorted[0]);
			failed = 1;
		}
	}
	/* And verify that packets [12-19] were processed
	 * by another worker
	 */
	for (i = 13; i < 20; i++) {
		if (sorted[i] != sorted[12]) {
			printf("Packet number %u processed by worker %u,"
				" but should be processed by worker %u\n",
				i, sorted[i], sorted[12]);
			failed = 1;
		}
	}

	rte_mempool_put_bulk(p, (void *)bufs, buf_count);

	if (failed)
		return -1;

	printf("Marked packets test passed\n");
	return 0;
}

static
int test_error_distributor_create_name(void)
{
	struct rte_distributor *d = NULL;
	struct rte_distributor *db = NULL;
	char *name = NULL;

	d = rte_distributor_create(name, rte_socket_id(),
			rte_lcore_count() - 1,
			RTE_DIST_ALG_SINGLE);
	if (d != NULL || rte_errno != EINVAL) {
		printf("ERROR: No error on create() with NULL name param\n");
		return -1;
	}

	db = rte_distributor_create(name, rte_socket_id(),
			rte_lcore_count() - 1,
			RTE_DIST_ALG_BURST);
	if (db != NULL || rte_errno != EINVAL) {
		printf("ERROR: No error on create() with NULL param\n");
		return -1;
	}

	return 0;
}


static
int test_error_distributor_create_numworkers(void)
{
	struct rte_distributor *ds = NULL;
	struct rte_distributor *db = NULL;

	ds = rte_distributor_create("test_numworkers", rte_socket_id(),
			RTE_MAX_LCORE + 10,
			RTE_DIST_ALG_SINGLE);
	if (ds != NULL || rte_errno != EINVAL) {
		printf("ERROR: No error on create() with num_workers > MAX\n");
		return -1;
	}

	db = rte_distributor_create("test_numworkers", rte_socket_id(),
			RTE_MAX_LCORE + 10,
			RTE_DIST_ALG_BURST);
	if (db != NULL || rte_errno != EINVAL) {
		printf("ERROR: No error on create() with num_workers > MAX\n");
		return -1;
	}

	return 0;
}


/* Useful function which ensures that all worker functions terminate */
static void
quit_workers(struct worker_params *wp, struct rte_mempool *p)
{
	struct rte_distributor *d = wp->dist;
	const unsigned num_workers = rte_lcore_count() - 1;
	unsigned i;
	struct rte_mbuf *bufs[RTE_MAX_LCORE];
	struct rte_mbuf *returns[RTE_MAX_LCORE];
	if (rte_mempool_get_bulk(p, (void *)bufs, num_workers) != 0) {
		printf("line %d: Error getting mbufs from pool\n", __LINE__);
		return;
	}

	zero_quit = 0;
	quit = 1;
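	/* Hand every worker a packet on its own flow so that each one wakes
	 * from rte_distributor_get_pkt(), observes quit and returns.
	 */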
	for (i = 0; i < num_workers; i++)
		bufs[i]->hash.usr = i << 1;
	rte_distributor_process(d, bufs, num_workers);

	rte_distributor_process(d, NULL, 0);
	rte_distributor_flush(d);
	rte_eal_mp_wait_lcore();

	while (rte_distributor_returned_pkts(d, returns, RTE_MAX_LCORE))
		;

	rte_distributor_clear_returns(d);
	rte_mempool_put_bulk(p, (void *)bufs, num_workers);

	quit = 0;
	worker_idx = 0;
	zero_idx = RTE_MAX_LCORE;
	zero_quit = 0;
	zero_sleep = 0;
}

static int
test_distributor(void)
{
	static struct rte_distributor *ds;
	static struct rte_distributor *db;
	static struct rte_distributor *dist[2];
	static struct rte_mempool *p;
	int i;

	if (rte_lcore_count() < 2) {
		printf("Not enough cores for distributor_autotest, expecting at least 2\n");
		return TEST_SKIPPED;
	}

	if (db == NULL) {
		db = rte_distributor_create("Test_dist_burst", rte_socket_id(),
				rte_lcore_count() - 1,
				RTE_DIST_ALG_BURST);
		if (db == NULL) {
			printf("Error creating burst distributor\n");
			return -1;
		}
	} else {
		rte_distributor_flush(db);
		rte_distributor_clear_returns(db);
	}

	if (ds == NULL) {
		ds = rte_distributor_create("Test_dist_single",
				rte_socket_id(),
				rte_lcore_count() - 1,
				RTE_DIST_ALG_SINGLE);
		if (ds == NULL) {
			printf("Error creating single distributor\n");
			return -1;
		}
	} else {
		rte_distributor_flush(ds);
		rte_distributor_clear_returns(ds);
	}

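	/* Size the pool so the BIG_BATCH test has enough mbufs even when only
	 * a couple of cores (and therefore workers) are available.
	 */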
	const unsigned nb_bufs = (511 * rte_lcore_count()) < BIG_BATCH ?
			(BIG_BATCH * 2) - 1 : (511 * rte_lcore_count());
	if (p == NULL) {
		p = rte_pktmbuf_pool_create("DT_MBUF_POOL", nb_bufs, BURST,
			0, RTE_MBUF_DEFAULT_BUF_SIZE, rte_socket_id());
		if (p == NULL) {
			printf("Error creating mempool\n");
			return -1;
		}
	}

	dist[0] = ds;
	dist[1] = db;

	for (i = 0; i < 2; i++) {

		worker_params.dist = dist[i];
		if (i)
			strlcpy(worker_params.name, "burst",
					sizeof(worker_params.name));
		else
			strlcpy(worker_params.name, "single",
					sizeof(worker_params.name));

		rte_eal_mp_remote_launch(handle_work,
				&worker_params, SKIP_MASTER);
		if (sanity_test(&worker_params, p) < 0)
			goto err;
		quit_workers(&worker_params, p);

		rte_eal_mp_remote_launch(handle_work_with_free_mbufs,
				&worker_params, SKIP_MASTER);
		if (sanity_test_with_mbuf_alloc(&worker_params, p) < 0)
			goto err;
		quit_workers(&worker_params, p);

		if (rte_lcore_count() > 2) {
			rte_eal_mp_remote_launch(handle_work_for_shutdown_test,
					&worker_params,
					SKIP_MASTER);
			if (sanity_test_with_worker_shutdown(&worker_params,
					p) < 0)
				goto err;
			quit_workers(&worker_params, p);

			rte_eal_mp_remote_launch(handle_work_for_shutdown_test,
					&worker_params,
					SKIP_MASTER);
			if (test_flush_with_worker_shutdown(&worker_params,
					p) < 0)
				goto err;
			quit_workers(&worker_params, p);

			rte_eal_mp_remote_launch(handle_and_mark_work,
					&worker_params, SKIP_MASTER);
			if (sanity_mark_test(&worker_params, p) < 0)
				goto err;
			quit_workers(&worker_params, p);

		} else {
			printf("Too few cores to run worker shutdown test\n");
		}

	}

	if (test_error_distributor_create_numworkers() == -1 ||
			test_error_distributor_create_name() == -1) {
		printf("rte_distributor_create parameter check tests failed\n");
		return -1;
	}

	return 0;

err:
	quit_workers(&worker_params, p);
	return -1;
}

REGISTER_TEST_COMMAND(distributor_autotest, test_distributor);