xref: /f-stack/dpdk/app/test/test_ring_perf.c (revision ebf5cedb)
1 /* SPDX-License-Identifier: BSD-3-Clause
2  * Copyright(c) 2010-2014 Intel Corporation
3  * Copyright(c) 2019 Arm Limited
4  */
5 
6 
7 #include <stdio.h>
8 #include <inttypes.h>
9 #include <rte_ring.h>
10 #include <rte_cycles.h>
11 #include <rte_launch.h>
12 #include <rte_pause.h>
13 #include <string.h>
14 
15 #include "test.h"
16 
17 /*
18  * Ring
19  * ====
20  *
21  * Measures performance of various operations using rdtsc
22  *  * Empty ring dequeue
23  *  * Enqueue/dequeue of bursts in 1 threads
24  *  * Enqueue/dequeue of bursts in 2 threads
25  *  * Enqueue/dequeue of bursts in all available threads
26  */
27 
28 #define RING_NAME "RING_PERF"
29 #define RING_SIZE 4096
30 #define MAX_BURST 32
31 
32 /*
33  * the sizes to enqueue and dequeue in testing
34  * (marked volatile so they won't be seen as compile-time constants)
35  */
36 static const volatile unsigned bulk_sizes[] = { 8, 32 };
37 
38 struct lcore_pair {
39 	unsigned c1, c2;
40 };
41 
42 static volatile unsigned lcore_count = 0;
43 
44 /**** Functions to analyse our core mask to get cores for different tests ***/
45 
46 static int
47 get_two_hyperthreads(struct lcore_pair *lcp)
48 {
49 	unsigned id1, id2;
50 	unsigned c1, c2, s1, s2;
51 	RTE_LCORE_FOREACH(id1) {
52 		/* inner loop just re-reads all id's. We could skip the first few
53 		 * elements, but since number of cores is small there is little point
54 		 */
55 		RTE_LCORE_FOREACH(id2) {
56 			if (id1 == id2)
57 				continue;
58 
59 			c1 = rte_lcore_to_cpu_id(id1);
60 			c2 = rte_lcore_to_cpu_id(id2);
61 			s1 = rte_lcore_to_socket_id(id1);
62 			s2 = rte_lcore_to_socket_id(id2);
63 			if ((c1 == c2) && (s1 == s2)){
64 				lcp->c1 = id1;
65 				lcp->c2 = id2;
66 				return 0;
67 			}
68 		}
69 	}
70 	return 1;
71 }
72 
73 static int
74 get_two_cores(struct lcore_pair *lcp)
75 {
76 	unsigned id1, id2;
77 	unsigned c1, c2, s1, s2;
78 	RTE_LCORE_FOREACH(id1) {
79 		RTE_LCORE_FOREACH(id2) {
80 			if (id1 == id2)
81 				continue;
82 
83 			c1 = rte_lcore_to_cpu_id(id1);
84 			c2 = rte_lcore_to_cpu_id(id2);
85 			s1 = rte_lcore_to_socket_id(id1);
86 			s2 = rte_lcore_to_socket_id(id2);
87 			if ((c1 != c2) && (s1 == s2)){
88 				lcp->c1 = id1;
89 				lcp->c2 = id2;
90 				return 0;
91 			}
92 		}
93 	}
94 	return 1;
95 }
96 
97 static int
98 get_two_sockets(struct lcore_pair *lcp)
99 {
100 	unsigned id1, id2;
101 	unsigned s1, s2;
102 	RTE_LCORE_FOREACH(id1) {
103 		RTE_LCORE_FOREACH(id2) {
104 			if (id1 == id2)
105 				continue;
106 			s1 = rte_lcore_to_socket_id(id1);
107 			s2 = rte_lcore_to_socket_id(id2);
108 			if (s1 != s2){
109 				lcp->c1 = id1;
110 				lcp->c2 = id2;
111 				return 0;
112 			}
113 		}
114 	}
115 	return 1;
116 }
117 
118 /* Get cycle counts for dequeuing from an empty ring. Should be 2 or 3 cycles */
119 static void
120 test_empty_dequeue(struct rte_ring *r)
121 {
122 	const unsigned iter_shift = 26;
123 	const unsigned iterations = 1<<iter_shift;
124 	unsigned i = 0;
125 	void *burst[MAX_BURST];
126 
127 	const uint64_t sc_start = rte_rdtsc();
128 	for (i = 0; i < iterations; i++)
129 		rte_ring_sc_dequeue_bulk(r, burst, bulk_sizes[0], NULL);
130 	const uint64_t sc_end = rte_rdtsc();
131 
132 	const uint64_t mc_start = rte_rdtsc();
133 	for (i = 0; i < iterations; i++)
134 		rte_ring_mc_dequeue_bulk(r, burst, bulk_sizes[0], NULL);
135 	const uint64_t mc_end = rte_rdtsc();
136 
137 	printf("SC empty dequeue: %.2F\n",
138 			(double)(sc_end-sc_start) / iterations);
139 	printf("MC empty dequeue: %.2F\n",
140 			(double)(mc_end-mc_start) / iterations);
141 }
142 
/*
 * for the separate enqueue and dequeue threads they take in one param
 * and return two. Input = burst size, output = cycle average for sp/sc & mp/mc
 */
struct thread_params {
	struct rte_ring *r;   /* ring under test, shared by the thread pair */
	unsigned size;        /* input value, the burst size */
	double spsc, mpmc;    /* output value, the single or multi timings */
};
152 
/*
 * Function that uses rdtsc to measure timing for ring enqueue. Needs pair
 * thread running dequeue_bulk function
 */
static int
enqueue_bulk(void *p)
{
	const unsigned iter_shift = 23;
	const unsigned iterations = 1<<iter_shift;	/* 2^23 bursts per mode */
	struct thread_params *params = p;
	struct rte_ring *r = params->r;
	const unsigned size = params->size;
	unsigned i;
	void *burst[MAX_BURST] = {0};

	/*
	 * Rendezvous with the dequeue thread: each side increments
	 * lcore_count; whichever arrives first spins until both are here so
	 * the timed loops start together. run_on_core_pair() resets
	 * lcore_count to 0 before each launch.
	 */
#ifdef RTE_USE_C11_MEM_MODEL
	if (__atomic_add_fetch(&lcore_count, 1, __ATOMIC_RELAXED) != 2)
#else
	if (__sync_add_and_fetch(&lcore_count, 1) != 2)
#endif
		while(lcore_count != 2)
			rte_pause();

	/* single-producer timing: retry (with pause) while the ring is full */
	const uint64_t sp_start = rte_rdtsc();
	for (i = 0; i < iterations; i++)
		while (rte_ring_sp_enqueue_bulk(r, burst, size, NULL) == 0)
			rte_pause();
	const uint64_t sp_end = rte_rdtsc();

	/* multi-producer timing over the same number of bursts */
	const uint64_t mp_start = rte_rdtsc();
	for (i = 0; i < iterations; i++)
		while (rte_ring_mp_enqueue_bulk(r, burst, size, NULL) == 0)
			rte_pause();
	const uint64_t mp_end = rte_rdtsc();

	/* report average cycles per element (not per burst) */
	params->spsc = ((double)(sp_end - sp_start))/(iterations*size);
	params->mpmc = ((double)(mp_end - mp_start))/(iterations*size);
	return 0;
}
192 
/*
 * Function that uses rdtsc to measure timing for ring dequeue. Needs pair
 * thread running enqueue_bulk function
 */
static int
dequeue_bulk(void *p)
{
	const unsigned iter_shift = 23;
	const unsigned iterations = 1<<iter_shift;	/* 2^23 bursts per mode */
	struct thread_params *params = p;
	struct rte_ring *r = params->r;
	const unsigned size = params->size;
	unsigned i;
	void *burst[MAX_BURST] = {0};

	/*
	 * Rendezvous with the enqueue thread (see enqueue_bulk): spin until
	 * both threads have incremented lcore_count so the timed loops start
	 * together.
	 */
#ifdef RTE_USE_C11_MEM_MODEL
	if (__atomic_add_fetch(&lcore_count, 1, __ATOMIC_RELAXED) != 2)
#else
	if (__sync_add_and_fetch(&lcore_count, 1) != 2)
#endif
		while(lcore_count != 2)
			rte_pause();

	/* single-consumer timing: retry (with pause) while the ring is empty */
	const uint64_t sc_start = rte_rdtsc();
	for (i = 0; i < iterations; i++)
		while (rte_ring_sc_dequeue_bulk(r, burst, size, NULL) == 0)
			rte_pause();
	const uint64_t sc_end = rte_rdtsc();

	/* multi-consumer timing over the same number of bursts */
	const uint64_t mc_start = rte_rdtsc();
	for (i = 0; i < iterations; i++)
		while (rte_ring_mc_dequeue_bulk(r, burst, size, NULL) == 0)
			rte_pause();
	const uint64_t mc_end = rte_rdtsc();

	/* report average cycles per element (not per burst) */
	params->spsc = ((double)(sc_end - sc_start))/(iterations*size);
	params->mpmc = ((double)(mc_end - mc_start))/(iterations*size);
	return 0;
}
232 
/*
 * Function that calls the enqueue and dequeue bulk functions on pairs of cores.
 * used to measure ring perf between hyperthreads, cores and sockets.
 */
static void
run_on_core_pair(struct lcore_pair *cores, struct rte_ring *r,
		lcore_function_t f1, lcore_function_t f2)
{
	struct thread_params param1 = {0}, param2 = {0};
	unsigned i;
	for (i = 0; i < sizeof(bulk_sizes)/sizeof(bulk_sizes[0]); i++) {
		/* reset the rendezvous counter used by the worker functions */
		lcore_count = 0;
		param1.size = param2.size = bulk_sizes[i];
		param1.r = param2.r = r;
		if (cores->c1 == rte_get_master_lcore()) {
			/* cannot remote-launch onto the master lcore, so run
			 * f1 directly here while f2 runs on the partner */
			rte_eal_remote_launch(f2, &param2, cores->c2);
			f1(&param1);
			rte_eal_wait_lcore(cores->c2);
		} else {
			rte_eal_remote_launch(f1, &param1, cores->c1);
			rte_eal_remote_launch(f2, &param2, cores->c2);
			rte_eal_wait_lcore(cores->c1);
			rte_eal_wait_lcore(cores->c2);
		}
		/* sum the per-element averages reported by both threads */
		printf("SP/SC bulk enq/dequeue (size: %u): %.2F\n", bulk_sizes[i],
				param1.spsc + param2.spsc);
		printf("MP/MC bulk enq/dequeue (size: %u): %.2F\n", bulk_sizes[i],
				param1.mpmc + param2.mpmc);
	}
}
263 
/* release flag: slaves spin in load_loop_fn until master sets this to 1 */
static rte_atomic32_t synchro;
/* per-lcore count of enq+deq pairs completed within the timed window */
static uint64_t queue_count[RTE_MAX_LCORE];

/* length of the load_loop_fn measurement window, in milliseconds */
#define TIME_MS 100
268 
/*
 * Per-lcore load loop: enqueue+dequeue bursts of params->size for TIME_MS
 * milliseconds and record the number of loop iterations completed in
 * queue_count[lcore]. Run on all cores by run_on_all_cores().
 */
static int
load_loop_fn(void *p)
{
	uint64_t time_diff = 0;
	uint64_t begin = 0;
	uint64_t hz = rte_get_timer_hz();
	uint64_t lcount = 0;
	const unsigned int lcore = rte_lcore_id();
	struct thread_params *params = p;
	void *burst[MAX_BURST] = {0};

	/* wait synchro for slaves */
	if (lcore != rte_get_master_lcore())
		while (rte_atomic32_read(&synchro) == 0)
			rte_pause();

	/* loop until the TIME_MS window (in timer cycles) has elapsed */
	begin = rte_get_timer_cycles();
	while (time_diff < hz * TIME_MS / 1000) {
		rte_ring_mp_enqueue_bulk(params->r, burst, params->size, NULL);
		rte_ring_mc_dequeue_bulk(params->r, burst, params->size, NULL);
		lcount++;
		time_diff = rte_get_timer_cycles() - begin;
	}
	queue_count[lcore] = lcount;
	return 0;
}
295 
296 static int
297 run_on_all_cores(struct rte_ring *r)
298 {
299 	uint64_t total = 0;
300 	struct thread_params param;
301 	unsigned int i, c;
302 
303 	memset(&param, 0, sizeof(struct thread_params));
304 	for (i = 0; i < RTE_DIM(bulk_sizes); i++) {
305 		printf("\nBulk enq/dequeue count on size %u\n", bulk_sizes[i]);
306 		param.size = bulk_sizes[i];
307 		param.r = r;
308 
309 		/* clear synchro and start slaves */
310 		rte_atomic32_set(&synchro, 0);
311 		if (rte_eal_mp_remote_launch(load_loop_fn, &param,
312 			SKIP_MASTER) < 0)
313 			return -1;
314 
315 		/* start synchro and launch test on master */
316 		rte_atomic32_set(&synchro, 1);
317 		load_loop_fn(&param);
318 
319 		rte_eal_mp_wait_lcore();
320 
321 		RTE_LCORE_FOREACH(c) {
322 			printf("Core [%u] count = %"PRIu64"\n",
323 					c, queue_count[c]);
324 			total += queue_count[c];
325 		}
326 
327 		printf("Total count (size: %u): %"PRIu64"\n",
328 				bulk_sizes[i], total);
329 	}
330 
331 	return 0;
332 }
333 
/*
 * Test function that determines how long an enqueue + dequeue of a single item
 * takes on a single lcore. Result is for comparison with the bulk enq+deq.
 */
static void
test_single_enqueue_dequeue(struct rte_ring *r)
{
	const unsigned iter_shift = 24;
	const unsigned iterations = 1<<iter_shift;
	unsigned n;
	void *obj = NULL;
	uint64_t sp_begin, sp_finish, mp_begin, mp_finish;

	/* single-producer/consumer: one object round-trip per iteration */
	sp_begin = rte_rdtsc();
	for (n = 0; n < iterations; n++) {
		rte_ring_sp_enqueue(r, obj);
		rte_ring_sc_dequeue(r, &obj);
	}
	sp_finish = rte_rdtsc();

	/* same round-trip via the multi-producer/consumer paths */
	mp_begin = rte_rdtsc();
	for (n = 0; n < iterations; n++) {
		rte_ring_mp_enqueue(r, obj);
		rte_ring_mc_dequeue(r, &obj);
	}
	mp_finish = rte_rdtsc();

	/* iterations is a power of two, so the shift is the average */
	printf("SP/SC single enq/dequeue: %"PRIu64"\n",
			(sp_finish - sp_begin) >> iter_shift);
	printf("MP/MC single enq/dequeue: %"PRIu64"\n",
			(mp_finish - mp_begin) >> iter_shift);
}
365 
/*
 * Test that does both enqueue and dequeue on a core using the burst() API calls
 * instead of the bulk() calls used in other tests. Results should be the same
 * as for the bulk function called on a single lcore.
 */
static void
test_burst_enqueue_dequeue(struct rte_ring *r)
{
	const unsigned iter_shift = 23;
	const unsigned iterations = 1<<iter_shift;
	unsigned sz, i = 0;
	void *burst[MAX_BURST] = {0};

	for (sz = 0; sz < sizeof(bulk_sizes)/sizeof(bulk_sizes[0]); sz++) {
		/* single-producer/consumer burst enq+deq timing */
		const uint64_t sc_start = rte_rdtsc();
		for (i = 0; i < iterations; i++) {
			rte_ring_sp_enqueue_burst(r, burst,
					bulk_sizes[sz], NULL);
			rte_ring_sc_dequeue_burst(r, burst,
					bulk_sizes[sz], NULL);
		}
		const uint64_t sc_end = rte_rdtsc();

		/* multi-producer/consumer burst enq+deq timing */
		const uint64_t mc_start = rte_rdtsc();
		for (i = 0; i < iterations; i++) {
			rte_ring_mp_enqueue_burst(r, burst,
					bulk_sizes[sz], NULL);
			rte_ring_mc_dequeue_burst(r, burst,
					bulk_sizes[sz], NULL);
		}
		const uint64_t mc_end = rte_rdtsc();

		/* cycles per element: shift divides by iterations (a power of
		 * two); the integer division by burst size truncates */
		uint64_t mc_avg = ((mc_end-mc_start) >> iter_shift) / bulk_sizes[sz];
		uint64_t sc_avg = ((sc_end-sc_start) >> iter_shift) / bulk_sizes[sz];

		printf("SP/SC burst enq/dequeue (size: %u): %"PRIu64"\n", bulk_sizes[sz],
				sc_avg);
		printf("MP/MC burst enq/dequeue (size: %u): %"PRIu64"\n", bulk_sizes[sz],
				mc_avg);
	}
}
407 
408 /* Times enqueue and dequeue on a single lcore */
409 static void
410 test_bulk_enqueue_dequeue(struct rte_ring *r)
411 {
412 	const unsigned iter_shift = 23;
413 	const unsigned iterations = 1<<iter_shift;
414 	unsigned sz, i = 0;
415 	void *burst[MAX_BURST] = {0};
416 
417 	for (sz = 0; sz < sizeof(bulk_sizes)/sizeof(bulk_sizes[0]); sz++) {
418 		const uint64_t sc_start = rte_rdtsc();
419 		for (i = 0; i < iterations; i++) {
420 			rte_ring_sp_enqueue_bulk(r, burst,
421 					bulk_sizes[sz], NULL);
422 			rte_ring_sc_dequeue_bulk(r, burst,
423 					bulk_sizes[sz], NULL);
424 		}
425 		const uint64_t sc_end = rte_rdtsc();
426 
427 		const uint64_t mc_start = rte_rdtsc();
428 		for (i = 0; i < iterations; i++) {
429 			rte_ring_mp_enqueue_bulk(r, burst,
430 					bulk_sizes[sz], NULL);
431 			rte_ring_mc_dequeue_bulk(r, burst,
432 					bulk_sizes[sz], NULL);
433 		}
434 		const uint64_t mc_end = rte_rdtsc();
435 
436 		double sc_avg = ((double)(sc_end-sc_start) /
437 				(iterations * bulk_sizes[sz]));
438 		double mc_avg = ((double)(mc_end-mc_start) /
439 				(iterations * bulk_sizes[sz]));
440 
441 		printf("SP/SC bulk enq/dequeue (size: %u): %.2F\n", bulk_sizes[sz],
442 				sc_avg);
443 		printf("MP/MC bulk enq/dequeue (size: %u): %.2F\n", bulk_sizes[sz],
444 				mc_avg);
445 	}
446 }
447 
448 static int
449 test_ring_perf(void)
450 {
451 	struct lcore_pair cores;
452 	struct rte_ring *r = NULL;
453 
454 	r = rte_ring_create(RING_NAME, RING_SIZE, rte_socket_id(), 0);
455 	if (r == NULL)
456 		return -1;
457 
458 	printf("### Testing single element and burst enq/deq ###\n");
459 	test_single_enqueue_dequeue(r);
460 	test_burst_enqueue_dequeue(r);
461 
462 	printf("\n### Testing empty dequeue ###\n");
463 	test_empty_dequeue(r);
464 
465 	printf("\n### Testing using a single lcore ###\n");
466 	test_bulk_enqueue_dequeue(r);
467 
468 	if (get_two_hyperthreads(&cores) == 0) {
469 		printf("\n### Testing using two hyperthreads ###\n");
470 		run_on_core_pair(&cores, r, enqueue_bulk, dequeue_bulk);
471 	}
472 	if (get_two_cores(&cores) == 0) {
473 		printf("\n### Testing using two physical cores ###\n");
474 		run_on_core_pair(&cores, r, enqueue_bulk, dequeue_bulk);
475 	}
476 	if (get_two_sockets(&cores) == 0) {
477 		printf("\n### Testing using two NUMA nodes ###\n");
478 		run_on_core_pair(&cores, r, enqueue_bulk, dequeue_bulk);
479 	}
480 
481 	printf("\n### Testing using all slave nodes ###\n");
482 	run_on_all_cores(r);
483 
484 	rte_ring_free(r);
485 	return 0;
486 }
487 
488 REGISTER_TEST_COMMAND(ring_perf_autotest, test_ring_perf);
489