/* SPDX-License-Identifier: BSD-3-Clause
 * Copyright(c) 2010-2014 Intel Corporation
 * Copyright(c) 2019 Arm Limited
 */

#include <stdio.h>
#include <inttypes.h>
#include <string.h>

#include <rte_ring.h>
#include <rte_cycles.h>
#include <rte_launch.h>
#include <rte_pause.h>
#include <rte_atomic.h>

#include "test.h"

/*
 * Ring
 * ====
 *
 * Measures performance of various ring operations using rdtsc:
 *  * Empty ring dequeue
 *  * Enqueue/dequeue of bursts on 1 thread
 *  * Enqueue/dequeue of bursts on 2 threads
 *  * Enqueue/dequeue of bursts on all available threads
 */

#define RING_NAME "RING_PERF"
#define RING_SIZE 4096
#define MAX_BURST 32

/*
 * The burst sizes to enqueue and dequeue during testing
 * (marked volatile so they won't be treated as compile-time constants).
 */
static const volatile unsigned bulk_sizes[] = { 8, 32 };

struct lcore_pair {
	unsigned c1, c2;
};

static volatile unsigned lcore_count = 0;

/**** Functions to analyse our core mask to get cores for different tests ***/

static int
get_two_hyperthreads(struct lcore_pair *lcp)
{
	unsigned id1, id2;
	unsigned c1, c2, s1, s2;

	RTE_LCORE_FOREACH(id1) {
		/* inner loop just re-reads all ids. We could skip the first
		 * few elements, but since the number of cores is small there
		 * is little point.
		 */
		RTE_LCORE_FOREACH(id2) {
			if (id1 == id2)
				continue;

			c1 = rte_lcore_to_cpu_id(id1);
			c2 = rte_lcore_to_cpu_id(id2);
			s1 = rte_lcore_to_socket_id(id1);
			s2 = rte_lcore_to_socket_id(id2);
			/* hyperthreads: same cpu, same socket */
			if ((c1 == c2) && (s1 == s2)) {
				lcp->c1 = id1;
				lcp->c2 = id2;
				return 0;
			}
		}
	}
	return 1;
}

static int
get_two_cores(struct lcore_pair *lcp)
{
	unsigned id1, id2;
	unsigned c1, c2, s1, s2;

	RTE_LCORE_FOREACH(id1) {
		RTE_LCORE_FOREACH(id2) {
			if (id1 == id2)
				continue;

			c1 = rte_lcore_to_cpu_id(id1);
			c2 = rte_lcore_to_cpu_id(id2);
			s1 = rte_lcore_to_socket_id(id1);
			s2 = rte_lcore_to_socket_id(id2);
			/* distinct physical cores on the same socket */
			if ((c1 != c2) && (s1 == s2)) {
				lcp->c1 = id1;
				lcp->c2 = id2;
				return 0;
			}
		}
	}
	return 1;
}

static int
get_two_sockets(struct lcore_pair *lcp)
{
	unsigned id1, id2;
	unsigned s1, s2;

	RTE_LCORE_FOREACH(id1) {
		RTE_LCORE_FOREACH(id2) {
			if (id1 == id2)
				continue;
			s1 = rte_lcore_to_socket_id(id1);
			s2 = rte_lcore_to_socket_id(id2);
			/* cores on different sockets */
			if (s1 != s2) {
				lcp->c1 = id1;
				lcp->c2 = id2;
				return 0;
			}
		}
	}
	return 1;
}

/* Get cycle counts for dequeuing from an empty ring. Should be 2 or 3 cycles */
static void
test_empty_dequeue(struct rte_ring *r)
{
	const unsigned iter_shift = 26;
	const unsigned iterations = 1 << iter_shift;
	unsigned i = 0;
	void *burst[MAX_BURST];

	const uint64_t sc_start = rte_rdtsc();
	for (i = 0; i < iterations; i++)
		rte_ring_sc_dequeue_bulk(r, burst, bulk_sizes[0], NULL);
	const uint64_t sc_end = rte_rdtsc();

	const uint64_t mc_start = rte_rdtsc();
	for (i = 0; i < iterations; i++)
		rte_ring_mc_dequeue_bulk(r, burst, bulk_sizes[0], NULL);
	const uint64_t mc_end = rte_rdtsc();

	printf("SC empty dequeue: %.2F\n",
			(double)(sc_end - sc_start) / iterations);
	printf("MC empty dequeue: %.2F\n",
			(double)(mc_end - mc_start) / iterations);
}
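/*
 * All single-core timings in this file follow the same rdtsc pattern:
 * read the TSC, run a fixed power-of-two number of iterations of the
 * operation under test, read the TSC again, and report the average.
 * Roughly (a sketch of the pattern, not a helper used below):
 *
 *	const uint64_t start = rte_rdtsc();
 *	for (i = 0; i < iterations; i++)
 *		... operation under test ...
 *	cycles_per_op = (double)(rte_rdtsc() - start) / iterations;
 *
 * The iteration counts are large (2^23 to 2^26) so the cost of the two
 * rte_rdtsc() reads and of the loop itself is amortised to noise.
 */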
/*
 * The separate enqueue and dequeue threads each take one parameter in and
 * return two results.
 * Input = burst size; output = average cycles per object for sp/sc and mp/mc.
 */
struct thread_params {
	struct rte_ring *r;
	unsigned size;		/* input value, the burst size */
	double spsc, mpmc;	/* output value, the single or multi timings */
};

/*
 * Function that uses rdtsc to measure timing for ring enqueue. Needs a pair
 * thread running the dequeue_bulk function.
 */
static int
enqueue_bulk(void *p)
{
	const unsigned iter_shift = 23;
	const unsigned iterations = 1 << iter_shift;
	struct thread_params *params = p;
	struct rte_ring *r = params->r;
	const unsigned size = params->size;
	unsigned i;
	void *burst[MAX_BURST] = {0};

	/* wait until both threads of the pair have arrived */
#ifdef RTE_USE_C11_MEM_MODEL
	if (__atomic_add_fetch(&lcore_count, 1, __ATOMIC_RELAXED) != 2)
#else
	if (__sync_add_and_fetch(&lcore_count, 1) != 2)
#endif
		while (lcore_count != 2)
			rte_pause();

	const uint64_t sp_start = rte_rdtsc();
	for (i = 0; i < iterations; i++)
		while (rte_ring_sp_enqueue_bulk(r, burst, size, NULL) == 0)
			rte_pause();
	const uint64_t sp_end = rte_rdtsc();

	const uint64_t mp_start = rte_rdtsc();
	for (i = 0; i < iterations; i++)
		while (rte_ring_mp_enqueue_bulk(r, burst, size, NULL) == 0)
			rte_pause();
	const uint64_t mp_end = rte_rdtsc();

	params->spsc = ((double)(sp_end - sp_start)) / (iterations * size);
	params->mpmc = ((double)(mp_end - mp_start)) / (iterations * size);
	return 0;
}

/*
 * Function that uses rdtsc to measure timing for ring dequeue. Needs a pair
 * thread running the enqueue_bulk function.
 */
static int
dequeue_bulk(void *p)
{
	const unsigned iter_shift = 23;
	const unsigned iterations = 1 << iter_shift;
	struct thread_params *params = p;
	struct rte_ring *r = params->r;
	const unsigned size = params->size;
	unsigned i;
	void *burst[MAX_BURST] = {0};

	/* wait until both threads of the pair have arrived */
#ifdef RTE_USE_C11_MEM_MODEL
	if (__atomic_add_fetch(&lcore_count, 1, __ATOMIC_RELAXED) != 2)
#else
	if (__sync_add_and_fetch(&lcore_count, 1) != 2)
#endif
		while (lcore_count != 2)
			rte_pause();

	const uint64_t sc_start = rte_rdtsc();
	for (i = 0; i < iterations; i++)
		while (rte_ring_sc_dequeue_bulk(r, burst, size, NULL) == 0)
			rte_pause();
	const uint64_t sc_end = rte_rdtsc();

	const uint64_t mc_start = rte_rdtsc();
	for (i = 0; i < iterations; i++)
		while (rte_ring_mc_dequeue_bulk(r, burst, size, NULL) == 0)
			rte_pause();
	const uint64_t mc_end = rte_rdtsc();

	params->spsc = ((double)(sc_end - sc_start)) / (iterations * size);
	params->mpmc = ((double)(mc_end - mc_start)) / (iterations * size);
	return 0;
}
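/*
 * Both halves of a pair increment lcore_count and the first arrival spins
 * until the second shows up, so neither thread starts its timed loop
 * against an idle peer. In plain C11 atomics the rendezvous above is
 * equivalent to (a sketch, not the code this file uses):
 *
 *	if (atomic_fetch_add_explicit(&lcore_count, 1,
 *			memory_order_relaxed) + 1 != 2)
 *		while (lcore_count != 2)
 *			rte_pause();
 */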
/*
 * Function that calls the enqueue and dequeue bulk functions on pairs of
 * cores. Used to measure ring performance between hyperthreads, physical
 * cores and sockets.
 */
static void
run_on_core_pair(struct lcore_pair *cores, struct rte_ring *r,
		lcore_function_t f1, lcore_function_t f2)
{
	struct thread_params param1 = {0}, param2 = {0};
	unsigned i;

	for (i = 0; i < RTE_DIM(bulk_sizes); i++) {
		lcore_count = 0;
		param1.size = param2.size = bulk_sizes[i];
		param1.r = param2.r = r;

		if (cores->c1 == rte_get_master_lcore()) {
			rte_eal_remote_launch(f2, &param2, cores->c2);
			f1(&param1);
			rte_eal_wait_lcore(cores->c2);
		} else {
			rte_eal_remote_launch(f1, &param1, cores->c1);
			rte_eal_remote_launch(f2, &param2, cores->c2);
			rte_eal_wait_lcore(cores->c1);
			rte_eal_wait_lcore(cores->c2);
		}
		printf("SP/SC bulk enq/dequeue (size: %u): %.2F\n",
				bulk_sizes[i], param1.spsc + param2.spsc);
		printf("MP/MC bulk enq/dequeue (size: %u): %.2F\n",
				bulk_sizes[i], param1.mpmc + param2.mpmc);
	}
}

static rte_atomic32_t synchro;
static uint64_t queue_count[RTE_MAX_LCORE];

#define TIME_MS 100

static int
load_loop_fn(void *p)
{
	uint64_t time_diff = 0;
	uint64_t begin = 0;
	uint64_t hz = rte_get_timer_hz();
	uint64_t lcount = 0;
	const unsigned int lcore = rte_lcore_id();
	struct thread_params *params = p;
	void *burst[MAX_BURST] = {0};

	/* wait synchro for slaves */
	if (lcore != rte_get_master_lcore())
		while (rte_atomic32_read(&synchro) == 0)
			rte_pause();

	begin = rte_get_timer_cycles();
	while (time_diff < hz * TIME_MS / 1000) {
		rte_ring_mp_enqueue_bulk(params->r, burst, params->size, NULL);
		rte_ring_mc_dequeue_bulk(params->r, burst, params->size, NULL);
		lcount++;
		time_diff = rte_get_timer_cycles() - begin;
	}
	queue_count[lcore] = lcount;
	return 0;
}

static int
run_on_all_cores(struct rte_ring *r)
{
	uint64_t total;
	struct thread_params param;
	unsigned int i, c;

	memset(&param, 0, sizeof(struct thread_params));
	for (i = 0; i < RTE_DIM(bulk_sizes); i++) {
		printf("\nBulk enq/dequeue count on size %u\n", bulk_sizes[i]);
		param.size = bulk_sizes[i];
		param.r = r;
		/* reset so each burst size reports its own total */
		total = 0;

		/* clear synchro and start slaves */
		rte_atomic32_set(&synchro, 0);
		if (rte_eal_mp_remote_launch(load_loop_fn, &param,
				SKIP_MASTER) < 0)
			return -1;

		/* start synchro and launch test on master */
		rte_atomic32_set(&synchro, 1);
		load_loop_fn(&param);

		rte_eal_mp_wait_lcore();

		RTE_LCORE_FOREACH(c) {
			printf("Core [%u] count = %"PRIu64"\n",
					c, queue_count[c]);
			total += queue_count[c];
		}

		printf("Total count (size: %u): %"PRIu64"\n",
				bulk_sizes[i], total);
	}

	return 0;
}
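/*
 * Unlike the rdtsc tests, load_loop_fn measures throughput rather than
 * latency: each lcore counts how many enqueue+dequeue loop iterations it
 * completes in a fixed TIME_MS window. With TIME_MS = 100, a per-core
 * rate can be derived as roughly queue_count[c] * (1000 / TIME_MS)
 * iterations per second (a back-of-envelope reading of the output; the
 * test itself prints only the raw counts).
 */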
/*
 * Test function that determines how long an enqueue + dequeue of a single
 * item takes on a single lcore. Result is for comparison with the bulk
 * enq+deq.
 */
static void
test_single_enqueue_dequeue(struct rte_ring *r)
{
	const unsigned iter_shift = 24;
	const unsigned iterations = 1 << iter_shift;
	unsigned i = 0;
	void *burst = NULL;

	const uint64_t sc_start = rte_rdtsc();
	for (i = 0; i < iterations; i++) {
		rte_ring_sp_enqueue(r, burst);
		rte_ring_sc_dequeue(r, &burst);
	}
	const uint64_t sc_end = rte_rdtsc();

	const uint64_t mc_start = rte_rdtsc();
	for (i = 0; i < iterations; i++) {
		rte_ring_mp_enqueue(r, burst);
		rte_ring_mc_dequeue(r, &burst);
	}
	const uint64_t mc_end = rte_rdtsc();

	printf("SP/SC single enq/dequeue: %"PRIu64"\n",
			(sc_end - sc_start) >> iter_shift);
	printf("MP/MC single enq/dequeue: %"PRIu64"\n",
			(mc_end - mc_start) >> iter_shift);
}

/*
 * Test that does both enqueue and dequeue on a core using the burst() API
 * calls instead of the bulk() calls used in other tests. Results should be
 * the same as for the bulk function called on a single lcore.
 */
static void
test_burst_enqueue_dequeue(struct rte_ring *r)
{
	const unsigned iter_shift = 23;
	const unsigned iterations = 1 << iter_shift;
	unsigned sz, i = 0;
	void *burst[MAX_BURST] = {0};

	for (sz = 0; sz < RTE_DIM(bulk_sizes); sz++) {
		const uint64_t sc_start = rte_rdtsc();
		for (i = 0; i < iterations; i++) {
			rte_ring_sp_enqueue_burst(r, burst,
					bulk_sizes[sz], NULL);
			rte_ring_sc_dequeue_burst(r, burst,
					bulk_sizes[sz], NULL);
		}
		const uint64_t sc_end = rte_rdtsc();

		const uint64_t mc_start = rte_rdtsc();
		for (i = 0; i < iterations; i++) {
			rte_ring_mp_enqueue_burst(r, burst,
					bulk_sizes[sz], NULL);
			rte_ring_mc_dequeue_burst(r, burst,
					bulk_sizes[sz], NULL);
		}
		const uint64_t mc_end = rte_rdtsc();

		uint64_t mc_avg = ((mc_end - mc_start) >> iter_shift) /
				bulk_sizes[sz];
		uint64_t sc_avg = ((sc_end - sc_start) >> iter_shift) /
				bulk_sizes[sz];

		printf("SP/SC burst enq/dequeue (size: %u): %"PRIu64"\n",
				bulk_sizes[sz], sc_avg);
		printf("MP/MC burst enq/dequeue (size: %u): %"PRIu64"\n",
				bulk_sizes[sz], mc_avg);
	}
}

/* Times enqueue and dequeue on a single lcore */
static void
test_bulk_enqueue_dequeue(struct rte_ring *r)
{
	const unsigned iter_shift = 23;
	const unsigned iterations = 1 << iter_shift;
	unsigned sz, i = 0;
	void *burst[MAX_BURST] = {0};

	for (sz = 0; sz < RTE_DIM(bulk_sizes); sz++) {
		const uint64_t sc_start = rte_rdtsc();
		for (i = 0; i < iterations; i++) {
			rte_ring_sp_enqueue_bulk(r, burst,
					bulk_sizes[sz], NULL);
			rte_ring_sc_dequeue_bulk(r, burst,
					bulk_sizes[sz], NULL);
		}
		const uint64_t sc_end = rte_rdtsc();

		const uint64_t mc_start = rte_rdtsc();
		for (i = 0; i < iterations; i++) {
			rte_ring_mp_enqueue_bulk(r, burst,
					bulk_sizes[sz], NULL);
			rte_ring_mc_dequeue_bulk(r, burst,
					bulk_sizes[sz], NULL);
		}
		const uint64_t mc_end = rte_rdtsc();

		double sc_avg = ((double)(sc_end - sc_start) /
				(iterations * bulk_sizes[sz]));
		double mc_avg = ((double)(mc_end - mc_start) /
				(iterations * bulk_sizes[sz]));

		printf("SP/SC bulk enq/dequeue (size: %u): %.2F\n",
				bulk_sizes[sz], sc_avg);
		printf("MP/MC bulk enq/dequeue (size: %u): %.2F\n",
				bulk_sizes[sz], mc_avg);
	}
}
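/*
 * Note on bulk vs burst semantics, for readers comparing the two sets of
 * results above: the bulk calls are all-or-nothing -- they return 0 and
 * transfer nothing if the full count cannot be moved -- while the burst
 * calls transfer as many objects as possible and return that number. On
 * an otherwise idle ring both paths do the same work, which is why the
 * burst results should match the bulk results on a single lcore.
 */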
static int
test_ring_perf(void)
{
	struct lcore_pair cores;
	struct rte_ring *r = NULL;

	r = rte_ring_create(RING_NAME, RING_SIZE, rte_socket_id(), 0);
	if (r == NULL)
		return -1;

	printf("### Testing single element and burst enq/deq ###\n");
	test_single_enqueue_dequeue(r);
	test_burst_enqueue_dequeue(r);

	printf("\n### Testing empty dequeue ###\n");
	test_empty_dequeue(r);

	printf("\n### Testing using a single lcore ###\n");
	test_bulk_enqueue_dequeue(r);

	if (get_two_hyperthreads(&cores) == 0) {
		printf("\n### Testing using two hyperthreads ###\n");
		run_on_core_pair(&cores, r, enqueue_bulk, dequeue_bulk);
	}
	if (get_two_cores(&cores) == 0) {
		printf("\n### Testing using two physical cores ###\n");
		run_on_core_pair(&cores, r, enqueue_bulk, dequeue_bulk);
	}
	if (get_two_sockets(&cores) == 0) {
		printf("\n### Testing using two NUMA nodes ###\n");
		run_on_core_pair(&cores, r, enqueue_bulk, dequeue_bulk);
	}

	printf("\n### Testing using all slave lcores ###\n");
	run_on_all_cores(r);

	rte_ring_free(r);
	return 0;
}

REGISTER_TEST_COMMAND(ring_perf_autotest, test_ring_perf);
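/*
 * A minimal sketch of invoking just this test from the DPDK test binary
 * (the binary path and EAL flags depend on your build and machine; shown
 * only as an example):
 *
 *	./app/test -c 0xf -n 4
 *	RTE>> ring_perf_autotest
 */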