/* xref: /dpdk/app/test/test_ring_stress_impl.h (revision 987d40a0) */
/* SPDX-License-Identifier: BSD-3-Clause
 * Copyright(c) 2020 Intel Corporation
 */

#include "test_ring_stress.h"

/**
 * Stress test for ring enqueue/dequeue operations.
 * Each worker repeatedly performs the pattern:
 * dequeue / read-write the dequeued objects / enqueue.
 * Serves as both a functional and a performance test of ring
 * enqueue/dequeue operations under high contention
 * (for both over-committed and non-over-committed scenarios).
 */

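/*
 * Test configuration: each iteration moves roughly BULK_NUM objects and
 * RING_SIZE provisions 2 * BULK_NUM objects per possible lcore, so bulk
 * dequeues of up to ~11/8 * BULK_NUM objects can always be satisfied
 * even when every worker holds a batch in flight.
 */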
#define RING_NAME	"RING_STRESS"
#define BULK_NUM	32
#define RING_SIZE	(2 * BULK_NUM * RTE_MAX_LCORE)

enum {
	WRK_CMD_STOP,
	WRK_CMD_RUN,
};

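/*
 * Command word used by the main lcore to start/stop the workers:
 * test_mt1() stores WRK_CMD_RUN/WRK_CMD_STOP with release semantics,
 * while test_worker() polls it with relaxed loads.
 */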
static uint32_t wrk_cmd __rte_cache_aligned = WRK_CMD_STOP;

/* test run-time in seconds */
static const uint32_t run_time = 60;
static const uint32_t verbose;

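/*
 * Per-lcore statistics: total test duration in cycles plus aggregated
 * enqueue+dequeue counters; min/max cycles per call are collected only
 * by the "precise" worker flavour (prcs != 0).
 */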
struct lcore_stat {
	uint64_t nb_cycle;
	struct {
		uint64_t nb_call;
		uint64_t nb_obj;
		uint64_t nb_cycle;
		uint64_t max_cycle;
		uint64_t min_cycle;
	} op;
};

struct lcore_arg {
	struct rte_ring *rng;
	struct lcore_stat stats;
} __rte_cache_aligned;

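/*
 * Each test object occupies a full cache line, so every check/update of
 * a dequeued object in check_updt_elem() touches an entire line.
 */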
struct ring_elem {
	uint32_t cnt[RTE_CACHE_LINE_SIZE / sizeof(uint32_t)];
} __rte_cache_aligned;

/*
 * redefinable functions
 */
static uint32_t
_st_ring_dequeue_bulk(struct rte_ring *r, void **obj, uint32_t n,
	uint32_t *avail);

static uint32_t
_st_ring_enqueue_bulk(struct rte_ring *r, void * const *obj, uint32_t n,
	uint32_t *free);

static int
_st_ring_init(struct rte_ring *r, const char *name, uint32_t num);

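/*
 * The test flavour that includes this file must provide the definitions
 * for the three functions declared above. As an illustration only (a
 * minimal sketch, not part of this file), an MPMC flavour could simply
 * wrap the multi-producer/multi-consumer ring API:
 *
 *	static uint32_t
 *	_st_ring_dequeue_bulk(struct rte_ring *r, void **obj, uint32_t n,
 *		uint32_t *avail)
 *	{
 *		return rte_ring_mc_dequeue_bulk(r, obj, n, avail);
 *	}
 *
 *	static uint32_t
 *	_st_ring_enqueue_bulk(struct rte_ring *r, void * const *obj, uint32_t n,
 *		uint32_t *free)
 *	{
 *		return rte_ring_mp_enqueue_bulk(r, obj, n, free);
 *	}
 *
 *	static int
 *	_st_ring_init(struct rte_ring *r, const char *name, uint32_t num)
 *	{
 *		return rte_ring_init(r, name, num, 0);
 *	}
 */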

static void
lcore_stat_update(struct lcore_stat *ls, uint64_t call, uint64_t obj,
	uint64_t tm, int32_t prcs)
{
	ls->op.nb_call += call;
	ls->op.nb_obj += obj;
	ls->op.nb_cycle += tm;
	if (prcs) {
		ls->op.max_cycle = RTE_MAX(ls->op.max_cycle, tm);
		ls->op.min_cycle = RTE_MIN(ls->op.min_cycle, tm);
	}
}

static void
lcore_op_stat_aggr(struct lcore_stat *ms, const struct lcore_stat *ls)
{
	ms->op.nb_call += ls->op.nb_call;
	ms->op.nb_obj += ls->op.nb_obj;
	ms->op.nb_cycle += ls->op.nb_cycle;
	ms->op.max_cycle = RTE_MAX(ms->op.max_cycle, ls->op.max_cycle);
	ms->op.min_cycle = RTE_MIN(ms->op.min_cycle, ls->op.min_cycle);
}

static void
lcore_stat_aggr(struct lcore_stat *ms, const struct lcore_stat *ls)
{
	ms->nb_cycle = RTE_MAX(ms->nb_cycle, ls->nb_cycle);
	lcore_op_stat_aggr(ms, ls);
}

static void
lcore_stat_dump(FILE *f, uint32_t lc, const struct lcore_stat *ls)
{
	long double st;

	st = (long double)rte_get_timer_hz() / US_PER_S;

	if (lc == UINT32_MAX)
		fprintf(f, "%s(AGGREGATE)={\n", __func__);
	else
		fprintf(f, "%s(lcore=%u)={\n", __func__, lc);

	fprintf(f, "\tnb_cycle=%" PRIu64 "(%.2Lf usec),\n",
		ls->nb_cycle, (long double)ls->nb_cycle / st);

	fprintf(f, "\tDEQ+ENQ={\n");

	fprintf(f, "\t\tnb_call=%" PRIu64 ",\n", ls->op.nb_call);
	fprintf(f, "\t\tnb_obj=%" PRIu64 ",\n", ls->op.nb_obj);
	fprintf(f, "\t\tnb_cycle=%" PRIu64 ",\n", ls->op.nb_cycle);
	fprintf(f, "\t\tobj/call(avg): %.2Lf\n",
		(long double)ls->op.nb_obj / ls->op.nb_call);
	fprintf(f, "\t\tcycles/obj(avg): %.2Lf\n",
		(long double)ls->op.nb_cycle / ls->op.nb_obj);
	fprintf(f, "\t\tcycles/call(avg): %.2Lf\n",
		(long double)ls->op.nb_cycle / ls->op.nb_call);

	/* if min/max cycles per call stats were collected */
	if (ls->op.min_cycle != UINT64_MAX) {
		fprintf(f, "\t\tmax cycles/call=%" PRIu64 "(%.2Lf usec),\n",
			ls->op.max_cycle,
			(long double)ls->op.max_cycle / st);
		fprintf(f, "\t\tmin cycles/call=%" PRIu64 "(%.2Lf usec),\n",
			ls->op.min_cycle,
			(long double)ls->op.min_cycle / st);
	}

	fprintf(f, "\t},\n");
	fprintf(f, "};\n");
}

static void
fill_ring_elm(struct ring_elem *elm, uint32_t fill)
{
	uint32_t i;

	for (i = 0; i != RTE_DIM(elm->cnt); i++)
		elm->cnt[i] = fill;
}

static int32_t
check_updt_elem(struct ring_elem *elm[], uint32_t num,
	const struct ring_elem *check, const struct ring_elem *fill)
{
	uint32_t i;

	static rte_spinlock_t dump_lock;

	for (i = 0; i != num; i++) {
		if (memcmp(check, elm[i], sizeof(*check)) != 0) {
			rte_spinlock_lock(&dump_lock);
			printf("%s(lc=%u, num=%u) failed at %u-th iter, "
				"offending object: %p\n",
				__func__, rte_lcore_id(), num, i, elm[i]);
			rte_memdump(stdout, "expected", check, sizeof(*check));
			rte_memdump(stdout, "result", elm[i], sizeof(*elm[i]));
			rte_spinlock_unlock(&dump_lock);
			return -EINVAL;
		}
		memcpy(elm[i], fill, sizeof(*elm[i]));
	}

	return 0;
}

static int
check_ring_op(uint32_t exp, uint32_t res, uint32_t lc,
	const char *fname, const char *opname)
{
	if (exp != res) {
		printf("%s(lc=%u) failure: %s expected: %u, returned %u\n",
			fname, lc, opname, exp, res);
		return -ENOSPC;
	}
	return 0;
}

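/*
 * Worker loop: wait for WRK_CMD_RUN, then repeatedly dequeue a
 * pseudo-random number of objects, verify that they still hold the
 * default pattern and stamp them with this lcore's pattern, then
 * verify the stamp, restore the default pattern and enqueue the
 * objects back, updating the per-lcore stats, until the main lcore
 * switches wrk_cmd to WRK_CMD_STOP.
 */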
static int
test_worker(void *arg, const char *fname, int32_t prcs)
{
	int32_t rc;
	uint32_t lc, n, num;
	uint64_t cl, tm0, tm1;
	struct lcore_arg *la;
	struct ring_elem def_elm, loc_elm;
	struct ring_elem *obj[2 * BULK_NUM];

	la = arg;
	lc = rte_lcore_id();

	fill_ring_elm(&def_elm, UINT32_MAX);
	fill_ring_elm(&loc_elm, lc);

	/* Acquire ordering is not required as the main is not
	 * really releasing any data through 'wrk_cmd' to
	 * the worker.
	 */
	while (__atomic_load_n(&wrk_cmd, __ATOMIC_RELAXED) != WRK_CMD_RUN)
		rte_pause();

	cl = rte_rdtsc_precise();

	do {
		/* num in interval [7/8, 11/8] of BULK_NUM */
		num = 7 * BULK_NUM / 8 + rte_rand() % (BULK_NUM / 2);

		/* reset all pointer values */
		memset(obj, 0, sizeof(obj));

		/* dequeue num elems */
		tm0 = (prcs != 0) ? rte_rdtsc_precise() : 0;
		n = _st_ring_dequeue_bulk(la->rng, (void **)obj, num, NULL);
		tm0 = (prcs != 0) ? rte_rdtsc_precise() - tm0 : 0;

		/* check return value and objects */
		rc = check_ring_op(num, n, lc, fname,
			RTE_STR(_st_ring_dequeue_bulk));
		if (rc == 0)
			rc = check_updt_elem(obj, num, &def_elm, &loc_elm);
		if (rc != 0)
			break;

		/* enqueue num elems */
		rte_compiler_barrier();
		rc = check_updt_elem(obj, num, &loc_elm, &def_elm);
		if (rc != 0)
			break;

		tm1 = (prcs != 0) ? rte_rdtsc_precise() : 0;
		n = _st_ring_enqueue_bulk(la->rng, (void **)obj, num, NULL);
		tm1 = (prcs != 0) ? rte_rdtsc_precise() - tm1 : 0;

		/* check return value */
		rc = check_ring_op(num, n, lc, fname,
			RTE_STR(_st_ring_enqueue_bulk));
		if (rc != 0)
			break;

		lcore_stat_update(&la->stats, 1, num, tm0 + tm1, prcs);

	} while (__atomic_load_n(&wrk_cmd, __ATOMIC_RELAXED) == WRK_CMD_RUN);

	cl = rte_rdtsc_precise() - cl;
	if (prcs == 0)
		lcore_stat_update(&la->stats, 0, 0, cl, 0);
	la->stats.nb_cycle = cl;
	return rc;
}

static int
test_worker_prcs(void *arg)
{
	return test_worker(arg, __func__, 1);
}

static int
test_worker_avg(void *arg)
{
	return test_worker(arg, __func__, 0);
}

static void
mt1_fini(struct rte_ring *rng, void *data)
{
	rte_free(rng);
	rte_free(data);
}

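/*
 * Allocate 'num' test objects plus a ring with 2 * num slots, then
 * pre-fill the ring with all the objects so it starts half full.
 */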
static int
mt1_init(struct rte_ring **rng, void **data, uint32_t num)
{
	int32_t rc;
	size_t sz;
	uint32_t i, nr;
	struct rte_ring *r;
	struct ring_elem *elm;
	void *p;

	*rng = NULL;
	*data = NULL;

	sz = num * sizeof(*elm);
	elm = rte_zmalloc(NULL, sz, __alignof__(*elm));
	if (elm == NULL) {
		printf("%s: alloc(%zu) for %u elems data failed\n",
			__func__, sz, num);
		return -ENOMEM;
	}

	*data = elm;

	/* alloc ring */
	nr = 2 * num;
	sz = rte_ring_get_memsize(nr);
	r = rte_zmalloc(NULL, sz, __alignof__(*r));
	if (r == NULL) {
		printf("%s: alloc(%zu) for FIFO with %u elems failed\n",
			__func__, sz, nr);
		return -ENOMEM;
	}

	*rng = r;

	rc = _st_ring_init(r, RING_NAME, nr);
	if (rc != 0) {
		printf("%s: _st_ring_init(%p, %u) failed, error: %d(%s)\n",
			__func__, r, nr, rc, strerror(-rc));
		return rc;
	}

	for (i = 0; i != num; i++) {
		fill_ring_elm(elm + i, UINT32_MAX);
		p = elm + i;
		if (_st_ring_enqueue_bulk(r, &p, 1, NULL) != 1)
			break;
	}

	if (i != num) {
		printf("%s: _st_ring_enqueue(%p, %u) returned %u\n",
			__func__, r, num, i);
		return -ENOSPC;
	}

	return 0;
}

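/*
 * Test controller, runs on the main lcore: set up the ring and objects,
 * launch the given worker function on every worker lcore, let them run
 * for 'run_time' seconds, then stop the workers, aggregate their stats
 * into the main lcore slot and dump the result.
 */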
static int
test_mt1(int (*test)(void *))
{
	int32_t rc;
	uint32_t lc, mc;
	struct rte_ring *r;
	void *data;
	struct lcore_arg arg[RTE_MAX_LCORE];

	static const struct lcore_stat init_stat = {
		.op.min_cycle = UINT64_MAX,
	};

	rc = mt1_init(&r, &data, RING_SIZE);
	if (rc != 0) {
		mt1_fini(r, data);
		return rc;
	}

	memset(arg, 0, sizeof(arg));

	/* launch on all workers */
	RTE_LCORE_FOREACH_WORKER(lc) {
		arg[lc].rng = r;
		arg[lc].stats = init_stat;
		rte_eal_remote_launch(test, &arg[lc], lc);
	}

	/* signal worker to start test */
	__atomic_store_n(&wrk_cmd, WRK_CMD_RUN, __ATOMIC_RELEASE);

	rte_delay_us(run_time * US_PER_S);

	/* signal worker to stop test */
	__atomic_store_n(&wrk_cmd, WRK_CMD_STOP, __ATOMIC_RELEASE);

	/* wait for workers and collect stats. */
	mc = rte_lcore_id();
	arg[mc].stats = init_stat;

	rc = 0;
	RTE_LCORE_FOREACH_WORKER(lc) {
		rc |= rte_eal_wait_lcore(lc);
		lcore_stat_aggr(&arg[mc].stats, &arg[lc].stats);
		if (verbose != 0)
			lcore_stat_dump(stdout, lc, &arg[lc].stats);
	}

	lcore_stat_dump(stdout, UINT32_MAX, &arg[mc].stats);
	mt1_fini(r, data);
	return rc;
}

static const struct test_case tests[] = {
	{
		.name = "MT-WRK_ENQ_DEQ-MST_NONE-PRCS",
		.func = test_mt1,
		.wfunc = test_worker_prcs,
	},
	{
		.name = "MT-WRK_ENQ_DEQ-MST_NONE-AVG",
		.func = test_mt1,
		.wfunc = test_worker_avg,
	},
};
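/*
 * The including flavour typically exports these cases through the
 * harness declared in test_ring_stress.h. A minimal sketch (the
 * test_ring_mpmc_stress name and the struct test layout below are
 * assumptions based on that header, not definitions from this file):
 *
 *	const struct test test_ring_mpmc_stress = {
 *		.name = "MT_MPMC",
 *		.nb_case = RTE_DIM(tests),
 *		.cases = tests,
 *	};
 */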