/* SPDX-License-Identifier: BSD-3-Clause
 * Copyright(c) 2020 Intel Corporation
 */

#include "test_ring_stress.h"

/**
 * Stress test for ring enqueue/dequeue operations.
 * Each worker repeatedly performs the following pattern:
 * dequeue a burst of objects, read and update the dequeued objects,
 * then enqueue the same burst back into the ring.
 * Serves as both a functional and a performance test of ring
 * enqueue/dequeue operations under high contention
 * (for both over-committed and non-over-committed scenarios).
 */

#define RING_NAME	"RING_STRESS"
#define BULK_NUM	32
#define RING_SIZE	(2 * BULK_NUM * RTE_MAX_LCORE)

enum {
	WRK_CMD_STOP,
	WRK_CMD_RUN,
};

static uint32_t wrk_cmd __rte_cache_aligned = WRK_CMD_STOP;

/* test run-time in seconds */
static const uint32_t run_time = 60;
static const uint32_t verbose;

struct lcore_stat {
	uint64_t nb_cycle;
	struct {
		uint64_t nb_call;
		uint64_t nb_obj;
		uint64_t nb_cycle;
		uint64_t max_cycle;
		uint64_t min_cycle;
	} op;
};

struct lcore_arg {
	struct rte_ring *rng;
	struct lcore_stat stats;
} __rte_cache_aligned;

struct ring_elem {
	uint32_t cnt[RTE_CACHE_LINE_SIZE / sizeof(uint32_t)];
} __rte_cache_aligned;

/*
 * redefinable functions
 */
static uint32_t
_st_ring_dequeue_bulk(struct rte_ring *r, void **obj, uint32_t n,
	uint32_t *avail);

static uint32_t
_st_ring_enqueue_bulk(struct rte_ring *r, void * const *obj, uint32_t n,
	uint32_t *free);

static int
_st_ring_init(struct rte_ring *r, const char *name, uint32_t num);
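
/*
 * The hooks declared above are intentionally left undefined here: each ring
 * flavour under test is expected to provide its own definitions. A minimal
 * sketch of what such definitions might look like, assuming the plain
 * rte_ring bulk API (illustrative only, not the definitions used here):
 *
 *	static uint32_t
 *	_st_ring_dequeue_bulk(struct rte_ring *r, void **obj, uint32_t n,
 *		uint32_t *avail)
 *	{
 *		return rte_ring_dequeue_bulk(r, obj, n, avail);
 *	}
 *
 *	static uint32_t
 *	_st_ring_enqueue_bulk(struct rte_ring *r, void * const *obj,
 *		uint32_t n, uint32_t *free)
 *	{
 *		return rte_ring_enqueue_bulk(r, obj, n, free);
 *	}
 *
 *	static int
 *	_st_ring_init(struct rte_ring *r, const char *name, uint32_t num)
 *	{
 *		return rte_ring_init(r, name, num, 0);
 *	}
 */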

static void
lcore_stat_update(struct lcore_stat *ls, uint64_t call, uint64_t obj,
	uint64_t tm, int32_t prcs)
{
	ls->op.nb_call += call;
	ls->op.nb_obj += obj;
	ls->op.nb_cycle += tm;
	if (prcs) {
		ls->op.max_cycle = RTE_MAX(ls->op.max_cycle, tm);
		ls->op.min_cycle = RTE_MIN(ls->op.min_cycle, tm);
	}
}

static void
lcore_op_stat_aggr(struct lcore_stat *ms, const struct lcore_stat *ls)
{
	ms->op.nb_call += ls->op.nb_call;
	ms->op.nb_obj += ls->op.nb_obj;
	ms->op.nb_cycle += ls->op.nb_cycle;
	ms->op.max_cycle = RTE_MAX(ms->op.max_cycle, ls->op.max_cycle);
	ms->op.min_cycle = RTE_MIN(ms->op.min_cycle, ls->op.min_cycle);
}

static void
lcore_stat_aggr(struct lcore_stat *ms, const struct lcore_stat *ls)
{
	ms->nb_cycle = RTE_MAX(ms->nb_cycle, ls->nb_cycle);
	lcore_op_stat_aggr(ms, ls);
}

static void
lcore_stat_dump(FILE *f, uint32_t lc, const struct lcore_stat *ls)
{
	long double st;

	st = (long double)rte_get_timer_hz() / US_PER_S;

	if (lc == UINT32_MAX)
		fprintf(f, "%s(AGGREGATE)={\n", __func__);
	else
		fprintf(f, "%s(lcore=%u)={\n", __func__, lc);

	fprintf(f, "\tnb_cycle=%" PRIu64 "(%.2Lf usec),\n",
		ls->nb_cycle, (long double)ls->nb_cycle / st);

	fprintf(f, "\tDEQ+ENQ={\n");

	fprintf(f, "\t\tnb_call=%" PRIu64 ",\n", ls->op.nb_call);
	fprintf(f, "\t\tnb_obj=%" PRIu64 ",\n", ls->op.nb_obj);
	fprintf(f, "\t\tnb_cycle=%" PRIu64 ",\n", ls->op.nb_cycle);
	fprintf(f, "\t\tobj/call(avg): %.2Lf\n",
		(long double)ls->op.nb_obj / ls->op.nb_call);
	fprintf(f, "\t\tcycles/obj(avg): %.2Lf\n",
		(long double)ls->op.nb_cycle / ls->op.nb_obj);
	fprintf(f, "\t\tcycles/call(avg): %.2Lf\n",
		(long double)ls->op.nb_cycle / ls->op.nb_call);

	/* if min/max cycles per call stats were collected */
	if (ls->op.min_cycle != UINT64_MAX) {
		fprintf(f, "\t\tmax cycles/call=%" PRIu64 "(%.2Lf usec),\n",
			ls->op.max_cycle,
			(long double)ls->op.max_cycle / st);
		fprintf(f, "\t\tmin cycles/call=%" PRIu64 "(%.2Lf usec),\n",
			ls->op.min_cycle,
			(long double)ls->op.min_cycle / st);
	}

	fprintf(f, "\t},\n");
	fprintf(f, "};\n");
}

static void
fill_ring_elm(struct ring_elem *elm, uint32_t fill)
{
	uint32_t i;

	for (i = 0; i != RTE_DIM(elm->cnt); i++)
		elm->cnt[i] = fill;
}

static int32_t
check_updt_elem(struct ring_elem *elm[], uint32_t num,
	const struct ring_elem *check, const struct ring_elem *fill)
{
	uint32_t i;

	static rte_spinlock_t dump_lock;

	for (i = 0; i != num; i++) {
		if (memcmp(check, elm[i], sizeof(*check)) != 0) {
			rte_spinlock_lock(&dump_lock);
			printf("%s(lc=%u, num=%u) failed at %u-th iter, "
				"offending object: %p\n",
				__func__, rte_lcore_id(), num, i, elm[i]);
			rte_memdump(stdout, "expected", check, sizeof(*check));
			rte_memdump(stdout, "result", elm[i], sizeof(*elm[i]));
			rte_spinlock_unlock(&dump_lock);
			return -EINVAL;
		}
		memcpy(elm[i], fill, sizeof(*elm[i]));
	}

	return 0;
}

static int
check_ring_op(uint32_t exp, uint32_t res, uint32_t lc,
	const char *fname, const char *opname)
{
	if (exp != res) {
		printf("%s(lc=%u) failure: %s expected: %u, returned %u\n",
			fname, lc, opname, exp, res);
		return -ENOSPC;
	}
	return 0;
}

static int
test_worker(void *arg, const char *fname, int32_t prcs)
{
	int32_t rc;
	uint32_t lc, n, num;
	uint64_t cl, tm0, tm1;
	struct lcore_arg *la;
	struct ring_elem def_elm, loc_elm;
	struct ring_elem *obj[2 * BULK_NUM];

	la = arg;
	lc = rte_lcore_id();

	fill_ring_elm(&def_elm, UINT32_MAX);
	fill_ring_elm(&loc_elm, lc);

	/* Acquire ordering is not required as the main is not
	 * really releasing any data through 'wrk_cmd' to
	 * the worker.
	 */
	while (__atomic_load_n(&wrk_cmd, __ATOMIC_RELAXED) != WRK_CMD_RUN)
		rte_pause();

	cl = rte_rdtsc_precise();

	do {
		/* num in interval [7/8, 11/8] of BULK_NUM */
		num = 7 * BULK_NUM / 8 + rte_rand() % (BULK_NUM / 2);

		/* reset all pointer values */
		memset(obj, 0, sizeof(obj));

		/* dequeue num elems */
		tm0 = (prcs != 0) ? rte_rdtsc_precise() : 0;
		n = _st_ring_dequeue_bulk(la->rng, (void **)obj, num, NULL);
		tm0 = (prcs != 0) ? rte_rdtsc_precise() - tm0 : 0;

		/* check return value and objects */
		rc = check_ring_op(num, n, lc, fname,
			RTE_STR(_st_ring_dequeue_bulk));
		if (rc == 0)
			rc = check_updt_elem(obj, num, &def_elm, &loc_elm);
		if (rc != 0)
			break;

		/* enqueue num elems */
		rte_compiler_barrier();
		rc = check_updt_elem(obj, num, &loc_elm, &def_elm);
		if (rc != 0)
			break;

		tm1 = (prcs != 0) ? rte_rdtsc_precise() : 0;
		n = _st_ring_enqueue_bulk(la->rng, (void **)obj, num, NULL);
		tm1 = (prcs != 0) ? rte_rdtsc_precise() - tm1 : 0;

		/* check return value */
		rc = check_ring_op(num, n, lc, fname,
			RTE_STR(_st_ring_enqueue_bulk));
		if (rc != 0)
			break;

		lcore_stat_update(&la->stats, 1, num, tm0 + tm1, prcs);

	} while (__atomic_load_n(&wrk_cmd, __ATOMIC_RELAXED) == WRK_CMD_RUN);

	cl = rte_rdtsc_precise() - cl;
	if (prcs == 0)
		lcore_stat_update(&la->stats, 0, 0, cl, 0);
	la->stats.nb_cycle = cl;
	return rc;
}

static int
test_worker_prcs(void *arg)
{
	return test_worker(arg, __func__, 1);
}

static int
test_worker_avg(void *arg)
{
	return test_worker(arg, __func__, 0);
}

static void
mt1_fini(struct rte_ring *rng, void *data)
{
	rte_free(rng);
	rte_free(data);
}

static int
mt1_init(struct rte_ring **rng, void **data, uint32_t num)
{
	int32_t rc;
	size_t sz;
	uint32_t i, nr;
	struct rte_ring *r;
	struct ring_elem *elm;
	void *p;

	*rng = NULL;
	*data = NULL;

	sz = num * sizeof(*elm);
	elm = rte_zmalloc(NULL, sz, __alignof__(*elm));
	if (elm == NULL) {
		printf("%s: alloc(%zu) for %u elems data failed\n",
			__func__, sz, num);
		return -ENOMEM;
	}

	*data = elm;

	/* alloc ring */
	nr = 2 * num;
	sz = rte_ring_get_memsize(nr);
	r = rte_zmalloc(NULL, sz, __alignof__(*r));
	if (r == NULL) {
		printf("%s: alloc(%zu) for FIFO with %u elems failed\n",
			__func__, sz, nr);
		return -ENOMEM;
	}

	*rng = r;

	rc = _st_ring_init(r, RING_NAME, nr);
	if (rc != 0) {
		printf("%s: _st_ring_init(%p, %u) failed, error: %d(%s)\n",
			__func__, r, nr, rc, strerror(-rc));
		return rc;
	}

	for (i = 0; i != num; i++) {
		fill_ring_elm(elm + i, UINT32_MAX);
		p = elm + i;
		if (_st_ring_enqueue_bulk(r, &p, 1, NULL) != 1)
			break;
	}

	if (i != num) {
		printf("%s: _st_ring_enqueue(%p, %u) returned %u\n",
			__func__, r, num, i);
		return -ENOSPC;
	}

	return 0;
}

static int
test_mt1(int (*test)(void *))
{
	int32_t rc;
	uint32_t lc, mc;
	struct rte_ring *r;
	void *data;
	struct lcore_arg arg[RTE_MAX_LCORE];

	static const struct lcore_stat init_stat = {
		.op.min_cycle = UINT64_MAX,
	};

	rc = mt1_init(&r, &data, RING_SIZE);
	if (rc != 0) {
		mt1_fini(r, data);
		return rc;
	}

	memset(arg, 0, sizeof(arg));

	/* launch on all workers */
	RTE_LCORE_FOREACH_WORKER(lc) {
		arg[lc].rng = r;
		arg[lc].stats = init_stat;
		rte_eal_remote_launch(test, &arg[lc], lc);
	}

	/* signal workers to start the test */
	__atomic_store_n(&wrk_cmd, WRK_CMD_RUN, __ATOMIC_RELEASE);

	rte_delay_us(run_time * US_PER_S);

	/* signal workers to stop the test */
	__atomic_store_n(&wrk_cmd, WRK_CMD_STOP, __ATOMIC_RELEASE);

	/* wait for workers and collect stats. */
	mc = rte_lcore_id();
	arg[mc].stats = init_stat;

	rc = 0;
	RTE_LCORE_FOREACH_WORKER(lc) {
		rc |= rte_eal_wait_lcore(lc);
		lcore_stat_aggr(&arg[mc].stats, &arg[lc].stats);
		if (verbose != 0)
			lcore_stat_dump(stdout, lc, &arg[lc].stats);
	}

	lcore_stat_dump(stdout, UINT32_MAX, &arg[mc].stats);
	mt1_fini(r, data);
	return rc;
}

static const struct test_case tests[] = {
	{
		.name = "MT-WRK_ENQ_DEQ-MST_NONE-PRCS",
		.func = test_mt1,
		.wfunc = test_worker_prcs,
	},
	{
		.name = "MT-WRK_ENQ_DEQ-MST_NONE-AVG",
		.func = test_mt1,
		.wfunc = test_worker_avg,
	},
};
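
/*
 * Note: .func takes the worker function as its argument, so a harness can
 * drive the cases above generically. A rough sketch of such a driver loop
 * (illustrative only; the real driver lives in the common stress-test
 * harness, and any names here are assumptions):
 *
 *	uint32_t i, fail = 0;
 *
 *	for (i = 0; i != RTE_DIM(tests); i++) {
 *		printf("%s: start\n", tests[i].name);
 *		if (tests[i].func(tests[i].wfunc) != 0)
 *			fail++;
 *	}
 */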