/* SPDX-License-Identifier: BSD-3-Clause
 * Copyright(c) 2010-2014 Intel Corporation
 * Copyright(c) 2019 Arm Limited
 */

#include <stdio.h>
#include <inttypes.h>
#include <rte_ring.h>
#include <rte_cycles.h>
#include <rte_launch.h>
#include <rte_pause.h>
#include <string.h>

#include "test.h"
#include "test_ring.h"

/*
 * Ring performance test cases, measuring the performance of various
 * operations using rdtsc, for both the legacy APIs and 16B ring elements.
 */

#define RING_NAME "RING_PERF"
#define RING_SIZE 4096
#define MAX_BURST 32

/*
 * the sizes to enqueue and dequeue in testing
 * (marked volatile so they won't be seen as compile-time constants)
 */
static const volatile unsigned bulk_sizes[] = { 8, 32 };

struct lcore_pair {
	unsigned c1, c2;
};

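/*
 * Rendezvous counter for the paired enqueue/dequeue threads: each thread
 * increments it and spins until both have arrived (see
 * enqueue_dequeue_bulk_helper()).
 */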
static volatile unsigned lcore_count = 0;

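/*
 * Print the description and result of a single test. For example, an MP/MC
 * bulk test of 8-element bursts on 16B elements prints a line of the form:
 * "elem APIs: element size 16B: MP/MC: bulk (size: 8): <cycles>"
 */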
static void
test_ring_print_test_string(unsigned int api_type, int esize,
		unsigned int bsz, double value)
{
	if (esize == -1)
		printf("legacy APIs");
	else
		printf("elem APIs: element size %dB", esize);

	if (api_type == TEST_RING_IGNORE_API_TYPE)
		return;

	if ((api_type & TEST_RING_THREAD_DEF) == TEST_RING_THREAD_DEF)
		printf(": default enqueue/dequeue: ");
	else if ((api_type & TEST_RING_THREAD_SPSC) == TEST_RING_THREAD_SPSC)
		printf(": SP/SC: ");
	else if ((api_type & TEST_RING_THREAD_MPMC) == TEST_RING_THREAD_MPMC)
		printf(": MP/MC: ");

	if ((api_type & TEST_RING_ELEM_SINGLE) == TEST_RING_ELEM_SINGLE)
		printf("single: ");
	else if ((api_type & TEST_RING_ELEM_BULK) == TEST_RING_ELEM_BULK)
		printf("bulk (size: %u): ", bsz);
	else if ((api_type & TEST_RING_ELEM_BURST) == TEST_RING_ELEM_BURST)
		printf("burst (size: %u): ", bsz);

	printf("%.2F\n", value);
}

/**** Functions to analyse our core mask to get cores for different tests ***/
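/*
 * Each helper below fills in *lcp and returns 0 on success, or returns 1 if
 * no lcore pair with the required topology is available.
 */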

static int
get_two_hyperthreads(struct lcore_pair *lcp)
{
	unsigned id1, id2;
	unsigned c1, c2, s1, s2;
	RTE_LCORE_FOREACH(id1) {
		/* inner loop just re-reads all IDs. We could skip the first
		 * few elements, but since the number of cores is small there
		 * is little point.
		 */
		RTE_LCORE_FOREACH(id2) {
			if (id1 == id2)
				continue;

			c1 = rte_lcore_to_cpu_id(id1);
			c2 = rte_lcore_to_cpu_id(id2);
			s1 = rte_lcore_to_socket_id(id1);
			s2 = rte_lcore_to_socket_id(id2);
			if ((c1 == c2) && (s1 == s2)) {
				lcp->c1 = id1;
				lcp->c2 = id2;
				return 0;
			}
		}
	}
	return 1;
}

static int
get_two_cores(struct lcore_pair *lcp)
{
	unsigned id1, id2;
	unsigned c1, c2, s1, s2;
	RTE_LCORE_FOREACH(id1) {
		RTE_LCORE_FOREACH(id2) {
			if (id1 == id2)
				continue;

			c1 = rte_lcore_to_cpu_id(id1);
			c2 = rte_lcore_to_cpu_id(id2);
			s1 = rte_lcore_to_socket_id(id1);
			s2 = rte_lcore_to_socket_id(id2);
			if ((c1 != c2) && (s1 == s2)) {
				lcp->c1 = id1;
				lcp->c2 = id2;
				return 0;
			}
		}
	}
	return 1;
}

static int
get_two_sockets(struct lcore_pair *lcp)
{
	unsigned id1, id2;
	unsigned s1, s2;
	RTE_LCORE_FOREACH(id1) {
		RTE_LCORE_FOREACH(id2) {
			if (id1 == id2)
				continue;
			s1 = rte_lcore_to_socket_id(id1);
			s2 = rte_lcore_to_socket_id(id2);
			if (s1 != s2) {
				lcp->c1 = id1;
				lcp->c2 = id2;
				return 0;
			}
		}
	}
	return 1;
}

/* Get cycle counts for dequeuing from an empty ring. Should be 2 or 3 cycles */
static void
test_empty_dequeue(struct rte_ring *r, const int esize,
		const unsigned int api_type)
{
	const unsigned int iter_shift = 26;
	const unsigned int iterations = 1 << iter_shift;
	unsigned int i = 0;
	void *burst[MAX_BURST];

	const uint64_t start = rte_rdtsc();
	for (i = 0; i < iterations; i++)
		test_ring_dequeue(r, burst, esize, bulk_sizes[0], api_type);
	const uint64_t end = rte_rdtsc();

	test_ring_print_test_string(api_type, esize, bulk_sizes[0],
			((double)(end - start)) / iterations);
}

/*
 * The separate enqueue and dequeue threads each take in one parameter and
 * return two: input = burst size, outputs = cycle averages for SP/SC and
 * MP/MC.
 */
struct thread_params {
	struct rte_ring *r;
	unsigned size; /* input value, the burst size */
	double spsc, mpmc; /* output value, the single or multi timings */
};

/*
 * Helper function to call bulk SP/SC and MP/MC enqueue/dequeue functions.
 * flag == 0 -> enqueue
 * flag == 1 -> dequeue
 */
static __rte_always_inline int
enqueue_dequeue_bulk_helper(const unsigned int flag, const int esize,
		struct thread_params *p)
{
	int ret;
	const unsigned int iter_shift = 23;
	const unsigned int iterations = 1 << iter_shift;
	struct rte_ring *r = p->r;
	unsigned int bsize = p->size;
	unsigned int i;
	void *burst = NULL;

	/* Rendezvous: wait until both paired threads have arrived before
	 * starting the measurements.
	 */
#ifdef RTE_USE_C11_MEM_MODEL
	if (__atomic_add_fetch(&lcore_count, 1, __ATOMIC_RELAXED) != 2)
#else
	if (__sync_add_and_fetch(&lcore_count, 1) != 2)
#endif
		while (lcore_count != 2)
			rte_pause();

	burst = test_ring_calloc(MAX_BURST, esize);
	if (burst == NULL)
		return -1;

	const uint64_t sp_start = rte_rdtsc();
	for (i = 0; i < iterations; i++)
		do {
			if (flag == 0)
				ret = test_ring_enqueue(r, burst, esize, bsize,
						TEST_RING_THREAD_SPSC |
						TEST_RING_ELEM_BULK);
			else if (flag == 1)
				ret = test_ring_dequeue(r, burst, esize, bsize,
						TEST_RING_THREAD_SPSC |
						TEST_RING_ELEM_BULK);
			if (ret == 0)
				rte_pause();
		} while (!ret);
	const uint64_t sp_end = rte_rdtsc();

	const uint64_t mp_start = rte_rdtsc();
	for (i = 0; i < iterations; i++)
		do {
			if (flag == 0)
				ret = test_ring_enqueue(r, burst, esize, bsize,
						TEST_RING_THREAD_MPMC |
						TEST_RING_ELEM_BULK);
			else if (flag == 1)
				ret = test_ring_dequeue(r, burst, esize, bsize,
						TEST_RING_THREAD_MPMC |
						TEST_RING_ELEM_BULK);
			if (ret == 0)
				rte_pause();
		} while (!ret);
	const uint64_t mp_end = rte_rdtsc();

	rte_free(burst);

	/* Report the average cycle cost per element for each mode */
	p->spsc = ((double)(sp_end - sp_start)) / (iterations * bsize);
	p->mpmc = ((double)(mp_end - mp_start)) / (iterations * bsize);
	return 0;
}

/*
 * Function that uses rdtsc to measure timing for ring enqueue. Needs a
 * paired thread running the dequeue_bulk function.
 */
static int
enqueue_bulk(void *p)
{
	struct thread_params *params = p;

	return enqueue_dequeue_bulk_helper(0, -1, params);
}

static int
enqueue_bulk_16B(void *p)
{
	struct thread_params *params = p;

	return enqueue_dequeue_bulk_helper(0, 16, params);
}

/*
 * Function that uses rdtsc to measure timing for ring dequeue. Needs a
 * paired thread running the enqueue_bulk function.
 */
static int
dequeue_bulk(void *p)
{
	struct thread_params *params = p;

	return enqueue_dequeue_bulk_helper(1, -1, params);
}

static int
dequeue_bulk_16B(void *p)
{
	struct thread_params *params = p;

	return enqueue_dequeue_bulk_helper(1, 16, params);
}

/*
 * Function that calls the enqueue and dequeue bulk functions on pairs of
 * cores. Used to measure ring perf between hyperthreads, cores and sockets.
 */
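/*
 * The figure printed for each burst size is the sum of the enqueue thread's
 * and the dequeue thread's per-element cycle costs.
 */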
static int
run_on_core_pair(struct lcore_pair *cores, struct rte_ring *r, const int esize)
{
	lcore_function_t *f1, *f2;
	struct thread_params param1 = {0}, param2 = {0};
	unsigned i;

	if (esize == -1) {
		f1 = enqueue_bulk;
		f2 = dequeue_bulk;
	} else {
		f1 = enqueue_bulk_16B;
		f2 = dequeue_bulk_16B;
	}

	for (i = 0; i < RTE_DIM(bulk_sizes); i++) {
		lcore_count = 0;
		param1.size = param2.size = bulk_sizes[i];
		param1.r = param2.r = r;
		if (cores->c1 == rte_get_main_lcore()) {
			rte_eal_remote_launch(f2, &param2, cores->c2);
			f1(&param1);
			rte_eal_wait_lcore(cores->c2);
		} else {
			rte_eal_remote_launch(f1, &param1, cores->c1);
			rte_eal_remote_launch(f2, &param2, cores->c2);
			if (rte_eal_wait_lcore(cores->c1) < 0)
				return -1;
			if (rte_eal_wait_lcore(cores->c2) < 0)
				return -1;
		}
		test_ring_print_test_string(
			TEST_RING_THREAD_SPSC | TEST_RING_ELEM_BULK,
			esize, bulk_sizes[i], param1.spsc + param2.spsc);
		test_ring_print_test_string(
			TEST_RING_THREAD_MPMC | TEST_RING_ELEM_BULK,
			esize, bulk_sizes[i], param1.mpmc + param2.mpmc);
	}

	return 0;
}

/* Release flag set by the main lcore to start the worker load loops */
static rte_atomic32_t synchro;
/* Per-lcore count of enqueue+dequeue iterations completed in the load test */
static uint64_t queue_count[RTE_MAX_LCORE];

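/* Duration of each load_loop_fn measurement, in milliseconds */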
#define TIME_MS 100

static int
load_loop_fn_helper(struct thread_params *p, const int esize)
{
	uint64_t time_diff = 0;
	uint64_t begin = 0;
	uint64_t hz = rte_get_timer_hz();
	uint64_t lcount = 0;
	const unsigned int lcore = rte_lcore_id();
	struct thread_params *params = p;
	void *burst = NULL;

	burst = test_ring_calloc(MAX_BURST, esize);
	if (burst == NULL)
		return -1;

	/* wait synchro for workers */
	if (lcore != rte_get_main_lcore())
		while (rte_atomic32_read(&synchro) == 0)
			rte_pause();

	begin = rte_get_timer_cycles();
	while (time_diff < hz * TIME_MS / 1000) {
		test_ring_enqueue(params->r, burst, esize, params->size,
				TEST_RING_THREAD_MPMC | TEST_RING_ELEM_BULK);
		test_ring_dequeue(params->r, burst, esize, params->size,
				TEST_RING_THREAD_MPMC | TEST_RING_ELEM_BULK);
		lcount++;
		time_diff = rte_get_timer_cycles() - begin;
	}
	queue_count[lcore] = lcount;

	rte_free(burst);

	return 0;
}

static int
load_loop_fn(void *p)
{
	struct thread_params *params = p;

	return load_loop_fn_helper(params, -1);
}

static int
load_loop_fn_16B(void *p)
{
	struct thread_params *params = p;

	return load_loop_fn_helper(params, 16);
}

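/*
 * Run the load test on all available lcores: each lcore repeatedly performs
 * a bulk MP/MC enqueue followed by a bulk dequeue for TIME_MS milliseconds,
 * then the per-core and total iteration counts are printed.
 */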
static int
run_on_all_cores(struct rte_ring *r, const int esize)
{
	uint64_t total;
	struct thread_params param;
	lcore_function_t *lcore_f;
	unsigned int i, c;

	if (esize == -1)
		lcore_f = load_loop_fn;
	else
		lcore_f = load_loop_fn_16B;

	memset(&param, 0, sizeof(struct thread_params));
	for (i = 0; i < RTE_DIM(bulk_sizes); i++) {
		total = 0;
		printf("\nBulk enq/dequeue count on size %u\n", bulk_sizes[i]);
		param.size = bulk_sizes[i];
		param.r = r;

		/* clear synchro and start workers */
		rte_atomic32_set(&synchro, 0);
		if (rte_eal_mp_remote_launch(lcore_f, &param, SKIP_MAIN) < 0)
			return -1;

		/* start synchro and launch test on main */
		rte_atomic32_set(&synchro, 1);
		lcore_f(&param);

		rte_eal_mp_wait_lcore();

		RTE_LCORE_FOREACH(c) {
			printf("Core [%u] count = %"PRIu64"\n",
					c, queue_count[c]);
			total += queue_count[c];
		}

		printf("Total count (size: %u): %"PRIu64"\n",
				bulk_sizes[i], total);
	}

	return 0;
}

/*
 * Test function that determines how long an enqueue + dequeue of a single item
 * takes on a single lcore. Result is for comparison with the bulk enq+deq.
 */
static int
test_single_enqueue_dequeue(struct rte_ring *r, const int esize,
		const unsigned int api_type)
{
	const unsigned int iter_shift = 24;
	const unsigned int iterations = 1 << iter_shift;
	unsigned int i = 0;
	void *burst = NULL;

	/* alloc dummy object pointers */
	burst = test_ring_calloc(1, esize);
	if (burst == NULL)
		return -1;

	const uint64_t start = rte_rdtsc();
	for (i = 0; i < iterations; i++) {
		test_ring_enqueue(r, burst, esize, 1, api_type);
		test_ring_dequeue(r, burst, esize, 1, api_type);
	}
	const uint64_t end = rte_rdtsc();

	test_ring_print_test_string(api_type, esize, 1,
			((double)(end - start)) / iterations);

	rte_free(burst);

	return 0;
}

/*
 * Test that does both enqueue and dequeue on a core using the burst/bulk API
 * calls. Results should be the same as for the bulk function called on a
 * single lcore.
 */
static int
test_burst_bulk_enqueue_dequeue(struct rte_ring *r, const int esize,
		const unsigned int api_type)
{
	const unsigned int iter_shift = 23;
	const unsigned int iterations = 1 << iter_shift;
	unsigned int sz, i = 0;
	void **burst = NULL;

	burst = test_ring_calloc(MAX_BURST, esize);
	if (burst == NULL)
		return -1;

	for (sz = 0; sz < RTE_DIM(bulk_sizes); sz++) {
		const uint64_t start = rte_rdtsc();
		for (i = 0; i < iterations; i++) {
			test_ring_enqueue(r, burst, esize, bulk_sizes[sz],
					api_type);
			test_ring_dequeue(r, burst, esize, bulk_sizes[sz],
					api_type);
		}
		const uint64_t end = rte_rdtsc();

		test_ring_print_test_string(api_type, esize, bulk_sizes[sz],
				((double)(end - start)) / iterations);
	}

	rte_free(burst);

	return 0;
}

/* Run all tests for a given element size */
static __rte_always_inline int
test_ring_perf_esize(const int esize)
{
	struct lcore_pair cores;
	struct rte_ring *r = NULL;

	/*
	 * Performance test for legacy/_elem APIs
	 * SP-SC/MP-MC, single
	 */
	r = test_ring_create(RING_NAME, esize, RING_SIZE, rte_socket_id(), 0);
	if (r == NULL)
		goto test_fail;

	printf("\n### Testing single element enq/deq ###\n");
	if (test_single_enqueue_dequeue(r, esize,
			TEST_RING_THREAD_SPSC | TEST_RING_ELEM_SINGLE) < 0)
		goto test_fail;
	if (test_single_enqueue_dequeue(r, esize,
			TEST_RING_THREAD_MPMC | TEST_RING_ELEM_SINGLE) < 0)
		goto test_fail;

	printf("\n### Testing burst enq/deq ###\n");
	if (test_burst_bulk_enqueue_dequeue(r, esize,
			TEST_RING_THREAD_SPSC | TEST_RING_ELEM_BURST) < 0)
		goto test_fail;
	if (test_burst_bulk_enqueue_dequeue(r, esize,
			TEST_RING_THREAD_MPMC | TEST_RING_ELEM_BURST) < 0)
		goto test_fail;

	printf("\n### Testing bulk enq/deq ###\n");
	if (test_burst_bulk_enqueue_dequeue(r, esize,
			TEST_RING_THREAD_SPSC | TEST_RING_ELEM_BULK) < 0)
		goto test_fail;
	if (test_burst_bulk_enqueue_dequeue(r, esize,
			TEST_RING_THREAD_MPMC | TEST_RING_ELEM_BULK) < 0)
		goto test_fail;

	printf("\n### Testing empty bulk deq ###\n");
	test_empty_dequeue(r, esize,
			TEST_RING_THREAD_SPSC | TEST_RING_ELEM_BULK);
	test_empty_dequeue(r, esize,
			TEST_RING_THREAD_MPMC | TEST_RING_ELEM_BULK);

	if (get_two_hyperthreads(&cores) == 0) {
		printf("\n### Testing using two hyperthreads ###\n");
		if (run_on_core_pair(&cores, r, esize) < 0)
			goto test_fail;
	}

	if (get_two_cores(&cores) == 0) {
		printf("\n### Testing using two physical cores ###\n");
		if (run_on_core_pair(&cores, r, esize) < 0)
			goto test_fail;
	}
	if (get_two_sockets(&cores) == 0) {
		printf("\n### Testing using two NUMA nodes ###\n");
		if (run_on_core_pair(&cores, r, esize) < 0)
			goto test_fail;
	}

	printf("\n### Testing using all worker nodes ###\n");
	if (run_on_all_cores(r, esize) < 0)
		goto test_fail;

	rte_ring_free(r);

	return 0;

test_fail:
	rte_ring_free(r);

	return -1;
}

static int
test_ring_perf(void)
{
	/* Run all the tests for different element sizes */
	if (test_ring_perf_esize(-1) == -1)
		return -1;

	if (test_ring_perf_esize(16) == -1)
		return -1;

	return 0;
}

REGISTER_TEST_COMMAND(ring_perf_autotest, test_ring_perf);