/*-
 *   BSD LICENSE
 *
 *   Copyright(c) 2010-2014 Intel Corporation. All rights reserved.
 *   All rights reserved.
 *
 *   Redistribution and use in source and binary forms, with or without
 *   modification, are permitted provided that the following conditions
 *   are met:
 *
 *     * Redistributions of source code must retain the above copyright
 *       notice, this list of conditions and the following disclaimer.
 *     * Redistributions in binary form must reproduce the above copyright
 *       notice, this list of conditions and the following disclaimer in
 *       the documentation and/or other materials provided with the
 *       distribution.
 *     * Neither the name of Intel Corporation nor the names of its
 *       contributors may be used to endorse or promote products derived
 *       from this software without specific prior written permission.
 *
 *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
 *   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
 *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
 *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
 *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
 *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
 *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
 *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
 *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
 *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 */

#include <stdio.h>
#include <inttypes.h>
#include <rte_ring.h>
#include <rte_cycles.h>
#include <rte_launch.h>

#include "test.h"
/*
 * Ring
 * ====
 *
 * Measures the performance of various ring operations using rdtsc:
 *  * Empty ring dequeue
 *  * Enqueue/dequeue of bursts in one thread
 *  * Enqueue/dequeue of bursts in two threads
 */

#define RING_NAME "RING_PERF"
#define RING_SIZE 4096
#define MAX_BURST 32

/*
 * the sizes to enqueue and dequeue in testing
 * (marked volatile so they won't be seen as compile-time constants)
 */
static const volatile unsigned bulk_sizes[] = { 8, 32 };

/* The ring structure used for tests */
static struct rte_ring *r;

struct lcore_pair {
	unsigned c1, c2;
};

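/* counter used as a rendezvous so the paired lcores start timing together */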
static volatile unsigned lcore_count = 0;

/**** Functions to analyse our core mask to get cores for different tests ***/

static int
get_two_hyperthreads(struct lcore_pair *lcp)
{
	unsigned id1, id2;
	unsigned c1, c2, s1, s2;
	RTE_LCORE_FOREACH(id1) {
		/* the inner loop just re-reads all ids. We could skip the first
		 * few elements, but since the number of cores is small there is
		 * little point
		 */
		RTE_LCORE_FOREACH(id2) {
			if (id1 == id2)
				continue;
			c1 = lcore_config[id1].core_id;
			c2 = lcore_config[id2].core_id;
			s1 = lcore_config[id1].socket_id;
			s2 = lcore_config[id2].socket_id;
			if ((c1 == c2) && (s1 == s2)) {
				lcp->c1 = id1;
				lcp->c2 = id2;
				return 0;
			}
		}
	}
	return 1;
}

static int
get_two_cores(struct lcore_pair *lcp)
{
	unsigned id1, id2;
	unsigned c1, c2, s1, s2;
	RTE_LCORE_FOREACH(id1) {
		RTE_LCORE_FOREACH(id2) {
			if (id1 == id2)
				continue;
			c1 = lcore_config[id1].core_id;
			c2 = lcore_config[id2].core_id;
			s1 = lcore_config[id1].socket_id;
			s2 = lcore_config[id2].socket_id;
			if ((c1 != c2) && (s1 == s2)) {
				lcp->c1 = id1;
				lcp->c2 = id2;
				return 0;
			}
		}
	}
	return 1;
}

static int
get_two_sockets(struct lcore_pair *lcp)
{
	unsigned id1, id2;
	unsigned s1, s2;
	RTE_LCORE_FOREACH(id1) {
		RTE_LCORE_FOREACH(id2) {
			if (id1 == id2)
				continue;
			s1 = lcore_config[id1].socket_id;
			s2 = lcore_config[id2].socket_id;
			if (s1 != s2) {
				lcp->c1 = id1;
				lcp->c2 = id2;
				return 0;
			}
		}
	}
	return 1;
}

/* Get cycle counts for dequeuing from an empty ring. Should be 2 or 3 cycles */
static void
test_empty_dequeue(void)
{
	const unsigned iter_shift = 26;
	const unsigned iterations = 1<<iter_shift;
	unsigned i = 0;
	void *burst[MAX_BURST];

	const uint64_t sc_start = rte_rdtsc();
	for (i = 0; i < iterations; i++)
		rte_ring_sc_dequeue_bulk(r, burst, bulk_sizes[0]);
	const uint64_t sc_end = rte_rdtsc();

	const uint64_t mc_start = rte_rdtsc();
	for (i = 0; i < iterations; i++)
		rte_ring_mc_dequeue_bulk(r, burst, bulk_sizes[0]);
	const uint64_t mc_end = rte_rdtsc();

	printf("SC empty dequeue: %.2F\n",
			(double)(sc_end-sc_start) / iterations);
	printf("MC empty dequeue: %.2F\n",
			(double)(mc_end-mc_start) / iterations);
}

/*
 * The separate enqueue and dequeue threads each take one parameter in and
 * return two: input = burst size, output = cycle averages for sp/sc & mp/mc
 */
struct thread_params {
	unsigned size;        /* input value, the burst size */
	double spsc, mpmc;    /* output value, the single or multi timings */
};

/*
 * Function that uses rdtsc to measure timing for ring enqueue. Needs a
 * paired thread running the dequeue_bulk function.
 */
static int
enqueue_bulk(void *p)
{
	const unsigned iter_shift = 23;
	const unsigned iterations = 1<<iter_shift;
	struct thread_params *params = p;
	const unsigned size = params->size;
	unsigned i;
	void *burst[MAX_BURST] = {0};

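	/* both threads increment the counter, then spin until both have arrived */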
	if (__sync_add_and_fetch(&lcore_count, 1) != 2)
		while (lcore_count != 2)
			rte_pause();

	const uint64_t sp_start = rte_rdtsc();
	for (i = 0; i < iterations; i++)
		while (rte_ring_sp_enqueue_bulk(r, burst, size) != 0)
			rte_pause();
	const uint64_t sp_end = rte_rdtsc();

	const uint64_t mp_start = rte_rdtsc();
	for (i = 0; i < iterations; i++)
		while (rte_ring_mp_enqueue_bulk(r, burst, size) != 0)
			rte_pause();
	const uint64_t mp_end = rte_rdtsc();

	params->spsc = ((double)(sp_end - sp_start))/(iterations*size);
	params->mpmc = ((double)(mp_end - mp_start))/(iterations*size);
	return 0;
}

/*
 * Function that uses rdtsc to measure timing for ring dequeue. Needs a
 * paired thread running the enqueue_bulk function.
 */
static int
dequeue_bulk(void *p)
{
	const unsigned iter_shift = 23;
	const unsigned iterations = 1<<iter_shift;
	struct thread_params *params = p;
	const unsigned size = params->size;
	unsigned i;
	void *burst[MAX_BURST] = {0};

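	/* both threads increment the counter, then spin until both have arrived */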
	if (__sync_add_and_fetch(&lcore_count, 1) != 2)
		while (lcore_count != 2)
			rte_pause();

	const uint64_t sc_start = rte_rdtsc();
	for (i = 0; i < iterations; i++)
		while (rte_ring_sc_dequeue_bulk(r, burst, size) != 0)
			rte_pause();
	const uint64_t sc_end = rte_rdtsc();

	const uint64_t mc_start = rte_rdtsc();
	for (i = 0; i < iterations; i++)
		while (rte_ring_mc_dequeue_bulk(r, burst, size) != 0)
			rte_pause();
	const uint64_t mc_end = rte_rdtsc();

	params->spsc = ((double)(sc_end - sc_start))/(iterations*size);
	params->mpmc = ((double)(mc_end - mc_start))/(iterations*size);
	return 0;
}

/*
 * Function that calls the enqueue and dequeue bulk functions on pairs of
 * cores. Used to measure ring perf between hyperthreads, cores and sockets.
 */
static void
run_on_core_pair(struct lcore_pair *cores,
		lcore_function_t f1, lcore_function_t f2)
{
	struct thread_params param1 = {0}, param2 = {0};
	unsigned i;
	for (i = 0; i < sizeof(bulk_sizes)/sizeof(bulk_sizes[0]); i++) {
		lcore_count = 0;
		param1.size = param2.size = bulk_sizes[i];
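		/* the master lcore can't be remote-launched, so run its half inline */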
		if (cores->c1 == rte_get_master_lcore()) {
			rte_eal_remote_launch(f2, &param2, cores->c2);
			f1(&param1);
			rte_eal_wait_lcore(cores->c2);
		} else {
			rte_eal_remote_launch(f1, &param1, cores->c1);
			rte_eal_remote_launch(f2, &param2, cores->c2);
			rte_eal_wait_lcore(cores->c1);
			rte_eal_wait_lcore(cores->c2);
		}
		printf("SP/SC bulk enq/dequeue (size: %u): %.2F\n", bulk_sizes[i],
				param1.spsc + param2.spsc);
		printf("MP/MC bulk enq/dequeue (size: %u): %.2F\n", bulk_sizes[i],
				param1.mpmc + param2.mpmc);
	}
}

/*
 * Test function that determines how long an enqueue + dequeue of a single item
 * takes on a single lcore. Result is for comparison with the bulk enq+deq.
 */
static void
test_single_enqueue_dequeue(void)
{
	const unsigned iter_shift = 24;
	const unsigned iterations = 1<<iter_shift;
	unsigned i = 0;
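	/* a single dummy pointer is passed back and forth through the ring */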
	void *burst = NULL;

	const uint64_t sc_start = rte_rdtsc();
	for (i = 0; i < iterations; i++) {
		rte_ring_sp_enqueue(r, burst);
		rte_ring_sc_dequeue(r, &burst);
	}
	const uint64_t sc_end = rte_rdtsc();

	const uint64_t mc_start = rte_rdtsc();
	for (i = 0; i < iterations; i++) {
		rte_ring_mp_enqueue(r, burst);
		rte_ring_mc_dequeue(r, &burst);
	}
	const uint64_t mc_end = rte_rdtsc();

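	/* iterations == 1 << iter_shift, so shifting right by iter_shift
	 * divides the total cycle count by the number of iterations */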
	printf("SP/SC single enq/dequeue: %"PRIu64"\n",
			(sc_end-sc_start) >> iter_shift);
	printf("MP/MC single enq/dequeue: %"PRIu64"\n",
			(mc_end-mc_start) >> iter_shift);
}

/*
 * Test that does both enqueue and dequeue on a core using the burst() API calls
 * instead of the bulk() calls used in other tests. Results should be the same
 * as for the bulk function called on a single lcore.
 */
static void
test_burst_enqueue_dequeue(void)
{
	const unsigned iter_shift = 23;
	const unsigned iterations = 1<<iter_shift;
	unsigned sz, i = 0;
	void *burst[MAX_BURST] = {0};

	for (sz = 0; sz < sizeof(bulk_sizes)/sizeof(bulk_sizes[0]); sz++) {
		const uint64_t sc_start = rte_rdtsc();
		for (i = 0; i < iterations; i++) {
			rte_ring_sp_enqueue_burst(r, burst, bulk_sizes[sz]);
			rte_ring_sc_dequeue_burst(r, burst, bulk_sizes[sz]);
		}
		const uint64_t sc_end = rte_rdtsc();

		const uint64_t mc_start = rte_rdtsc();
		for (i = 0; i < iterations; i++) {
			rte_ring_mp_enqueue_burst(r, burst, bulk_sizes[sz]);
			rte_ring_mc_dequeue_burst(r, burst, bulk_sizes[sz]);
		}
		const uint64_t mc_end = rte_rdtsc();

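		/* the shift divides by the iteration count; dividing by the
		 * burst size then gives cycles per enqueued/dequeued object */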
		uint64_t mc_avg = ((mc_end-mc_start) >> iter_shift) / bulk_sizes[sz];
		uint64_t sc_avg = ((sc_end-sc_start) >> iter_shift) / bulk_sizes[sz];

		printf("SP/SC burst enq/dequeue (size: %u): %"PRIu64"\n", bulk_sizes[sz],
				sc_avg);
		printf("MP/MC burst enq/dequeue (size: %u): %"PRIu64"\n", bulk_sizes[sz],
				mc_avg);
	}
}

/* Times enqueue and dequeue on a single lcore */
static void
test_bulk_enqueue_dequeue(void)
{
	const unsigned iter_shift = 23;
	const unsigned iterations = 1<<iter_shift;
	unsigned sz, i = 0;
	void *burst[MAX_BURST] = {0};

	for (sz = 0; sz < sizeof(bulk_sizes)/sizeof(bulk_sizes[0]); sz++) {
		const uint64_t sc_start = rte_rdtsc();
		for (i = 0; i < iterations; i++) {
			rte_ring_sp_enqueue_bulk(r, burst, bulk_sizes[sz]);
			rte_ring_sc_dequeue_bulk(r, burst, bulk_sizes[sz]);
		}
		const uint64_t sc_end = rte_rdtsc();

		const uint64_t mc_start = rte_rdtsc();
		for (i = 0; i < iterations; i++) {
			rte_ring_mp_enqueue_bulk(r, burst, bulk_sizes[sz]);
			rte_ring_mc_dequeue_bulk(r, burst, bulk_sizes[sz]);
		}
		const uint64_t mc_end = rte_rdtsc();

		double sc_avg = ((double)(sc_end-sc_start) /
				(iterations * bulk_sizes[sz]));
		double mc_avg = ((double)(mc_end-mc_start) /
				(iterations * bulk_sizes[sz]));

		printf("SP/SC bulk enq/dequeue (size: %u): %.2F\n", bulk_sizes[sz],
				sc_avg);
		printf("MP/MC bulk enq/dequeue (size: %u): %.2F\n", bulk_sizes[sz],
				mc_avg);
	}
}

static int
test_ring_perf(void)
{
	struct lcore_pair cores;
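	/* create the ring, or reuse an existing one from a previous run */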
	r = rte_ring_create(RING_NAME, RING_SIZE, rte_socket_id(), 0);
	if (r == NULL && (r = rte_ring_lookup(RING_NAME)) == NULL)
		return -1;

	printf("### Testing single element and burst enq/deq ###\n");
	test_single_enqueue_dequeue();
	test_burst_enqueue_dequeue();

	printf("\n### Testing empty dequeue ###\n");
	test_empty_dequeue();

	printf("\n### Testing using a single lcore ###\n");
	test_bulk_enqueue_dequeue();

	if (get_two_hyperthreads(&cores) == 0) {
		printf("\n### Testing using two hyperthreads ###\n");
		run_on_core_pair(&cores, enqueue_bulk, dequeue_bulk);
	}
	if (get_two_cores(&cores) == 0) {
		printf("\n### Testing using two physical cores ###\n");
		run_on_core_pair(&cores, enqueue_bulk, dequeue_bulk);
	}
	if (get_two_sockets(&cores) == 0) {
		printf("\n### Testing using two NUMA nodes ###\n");
		run_on_core_pair(&cores, enqueue_bulk, dequeue_bulk);
	}
	return 0;
}

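/* registers the test so it can be run as "ring_perf_autotest" from the
 * interactive prompt of the dpdk test application */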
REGISTER_TEST_COMMAND(ring_perf_autotest, test_ring_perf);