/*
 * ring buffer tester and benchmark
 *
 * Copyright (C) 2009 Steven Rostedt <[email protected]>
 */
#include <linux/ring_buffer.h>
#include <linux/completion.h>
#include <linux/kthread.h>
#include <linux/module.h>
#include <linux/ktime.h>
#include <asm/local.h>

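/*
 * This module stress-tests the ftrace ring buffer. On load it starts a
 * producer kthread (and, unless the disable_reader parameter is set, a
 * consumer kthread) that alternate between a RUN_TIME second write
 * burst and a SLEEP_TIME second sleep. Results are reported with
 * trace_printk(), so they appear in the trace output (e.g.
 * /sys/kernel/debug/tracing/trace).
 */

/*
 * Mirrors the layout of a ring buffer data page: a timestamp, a commit
 * counter, then event data filling the rest of the page (the 4080 byte
 * array assumes 4K pages with a 16 byte header).
 */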
struct rb_page {
	u64		ts;
	local_t		commit;
	char		data[4080];
};

/* run time and sleep time in seconds */
#define RUN_TIME	10ULL
#define SLEEP_TIME	10

/* number of events for writer to wake up the reader */
static int wakeup_interval = 100;

static int reader_finish;
static DECLARE_COMPLETION(read_start);
static DECLARE_COMPLETION(read_done);

static struct ring_buffer *buffer;
static struct task_struct *producer;
static struct task_struct *consumer;
static unsigned long read;

static unsigned int disable_reader;
module_param(disable_reader, uint, 0644);
MODULE_PARM_DESC(disable_reader, "only run producer");

static unsigned int write_iteration = 50;
module_param(write_iteration, uint, 0644);
MODULE_PARM_DESC(write_iteration, "# of writes between timestamp readings");

static int producer_nice = MAX_NICE;
static int consumer_nice = MAX_NICE;

static int producer_fifo = -1;
static int consumer_fifo = -1;

module_param(producer_nice, int, 0644);
MODULE_PARM_DESC(producer_nice, "nice prio for producer");

module_param(consumer_nice, int, 0644);
MODULE_PARM_DESC(consumer_nice, "nice prio for consumer");

module_param(producer_fifo, int, 0644);
MODULE_PARM_DESC(producer_fifo, "fifo prio for producer");

module_param(consumer_fifo, int, 0644);
MODULE_PARM_DESC(consumer_fifo, "fifo prio for consumer");

static int read_events;

static int test_error;

#define TEST_ERROR()				\
	do {					\
		if (!test_error) {		\
			test_error = 1;		\
			WARN_ON(1);		\
		}				\
	} while (0)

enum event_status {
	EVENT_FOUND,
	EVENT_DROPPED,
};

static bool break_test(void)
{
	return test_error || kthread_should_stop();
}

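/*
 * Consume a single event from @cpu's buffer and check that its payload
 * is the cpu number the producer wrote into it.
 */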
static enum event_status read_event(int cpu)
{
	struct ring_buffer_event *event;
	int *entry;
	u64 ts;

	event = ring_buffer_consume(buffer, cpu, &ts, NULL);
	if (!event)
		return EVENT_DROPPED;

	entry = ring_buffer_event_data(event);
	if (*entry != cpu) {
		TEST_ERROR();
		return EVENT_DROPPED;
	}

	read++;
	return EVENT_FOUND;
}

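/*
 * Read a full page from @cpu's buffer with ring_buffer_read_page() and
 * walk its events by hand, validating each event's type, length and
 * payload along the way.
 */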
static enum event_status read_page(int cpu)
{
	struct ring_buffer_event *event;
	struct rb_page *rpage;
	unsigned long commit;
	void *bpage;
	int *entry;
	int ret;
	int inc;
	int i;

	bpage = ring_buffer_alloc_read_page(buffer, cpu);
	if (!bpage)
		return EVENT_DROPPED;

	ret = ring_buffer_read_page(buffer, &bpage, PAGE_SIZE, cpu, 1);
	if (ret >= 0) {
		rpage = bpage;
		/* The commit may have missed event flags set, clear them */
		commit = local_read(&rpage->commit) & 0xfffff;
		for (i = 0; i < commit && !test_error; i += inc) {

			if (i >= (PAGE_SIZE - offsetof(struct rb_page, data))) {
				TEST_ERROR();
				break;
			}

			inc = -1;
			event = (void *)&rpage->data[i];
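			/*
			 * Each event starts with a 4 byte header. type_len
			 * values 1-28 give the total size directly as
			 * (type_len + 1) * 4 bytes; 0 means the data length
			 * is stored in array[0] (total size array[0] + 4);
			 * higher values mark the padding and time extend
			 * records handled in the cases below.
			 */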
			switch (event->type_len) {
			case RINGBUF_TYPE_PADDING:
				/* failed writes may be discarded events */
				if (!event->time_delta)
					TEST_ERROR();
				inc = event->array[0] + 4;
				break;
			case RINGBUF_TYPE_TIME_EXTEND:
				inc = 8;
				break;
			case 0:
				entry = ring_buffer_event_data(event);
				if (*entry != cpu) {
					TEST_ERROR();
					break;
				}
				read++;
				if (!event->array[0]) {
					TEST_ERROR();
					break;
				}
				inc = event->array[0] + 4;
				break;
			default:
				entry = ring_buffer_event_data(event);
				if (*entry != cpu) {
					TEST_ERROR();
					break;
				}
				read++;
				inc = ((event->type_len + 1) * 4);
			}
			if (test_error)
				break;

			if (inc <= 0) {
				TEST_ERROR();
				break;
			}
		}
	}
	ring_buffer_free_read_page(buffer, bpage);

	if (ret < 0)
		return EVENT_DROPPED;
	return EVENT_FOUND;
}

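/*
 * Drain the buffer until the producer tells us to finish, alternating
 * between event-at-a-time and page-at-a-time reads on successive runs.
 */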
static void ring_buffer_consumer(void)
{
	/* toggle between reading pages and events */
	read_events ^= 1;

	read = 0;
	/*
	 * Continue running until the producer specifically asks to stop
	 * and is ready for the completion.
	 */
	while (!READ_ONCE(reader_finish)) {
		int found = 1;

		while (found && !test_error) {
			int cpu;

			found = 0;
			for_each_online_cpu(cpu) {
				enum event_status stat;

				if (read_events)
					stat = read_event(cpu);
				else
					stat = read_page(cpu);

				if (test_error)
					break;

				if (stat == EVENT_FOUND)
					found = 1;
			}
		}

		/*
		 * Wait till the producer wakes us up when there is more data
		 * available or when the producer wants us to finish reading.
		 */
		set_current_state(TASK_INTERRUPTIBLE);
		if (reader_finish)
			break;

		schedule();
	}
	__set_current_state(TASK_RUNNING);
	reader_finish = 0;
	complete(&read_done);
}

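/*
 * Write events as fast as possible for RUN_TIME seconds, waking the
 * consumer periodically, then stop the reader and report throughput
 * statistics with trace_printk().
 */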
static void ring_buffer_producer(void)
{
	ktime_t start_time, end_time, timeout;
	unsigned long long time;
	unsigned long long entries;
	unsigned long long overruns;
	unsigned long missed = 0;
	unsigned long hit = 0;
	unsigned long avg;
	int cnt = 0;

	/*
	 * Hammer the buffer for 10 secs (this may
	 * make the system stall)
	 */
	trace_printk("Starting ring buffer hammer\n");
	start_time = ktime_get();
	timeout = ktime_add_ns(start_time, RUN_TIME * NSEC_PER_SEC);
	do {
		struct ring_buffer_event *event;
		int *entry;
		int i;

		for (i = 0; i < write_iteration; i++) {
			event = ring_buffer_lock_reserve(buffer, 10);
			if (!event) {
				missed++;
			} else {
				hit++;
				entry = ring_buffer_event_data(event);
				*entry = smp_processor_id();
				ring_buffer_unlock_commit(buffer, event);
			}
		}
		end_time = ktime_get();

		cnt++;
		if (consumer && !(cnt % wakeup_interval))
			wake_up_process(consumer);

#ifndef CONFIG_PREEMPT
		/*
		 * If we are on a non-preempt kernel, the 10 second run will
		 * stop everything while it runs. Instead, we will call
		 * cond_resched and also add any time that was lost by a
		 * reschedule.
		 *
		 * Do a cond resched at the same frequency we would wake up
		 * the reader.
		 */
		if (cnt % wakeup_interval)
			cond_resched();
#endif
	} while (ktime_before(end_time, timeout) && !break_test());
	trace_printk("End ring buffer hammer\n");

	if (consumer) {
		/* Init both completions here to avoid races */
		init_completion(&read_start);
		init_completion(&read_done);
		/* the completions must be visible before the finish var */
		smp_wmb();
		reader_finish = 1;
		/* finish var visible before waking up the consumer */
		smp_wmb();
		wake_up_process(consumer);
		wait_for_completion(&read_done);
	}

	time = ktime_us_delta(end_time, start_time);

	entries = ring_buffer_entries(buffer);
	overruns = ring_buffer_overruns(buffer);

	if (test_error)
		trace_printk("ERROR!\n");

	if (!disable_reader) {
		if (consumer_fifo < 0)
			trace_printk("Running Consumer at nice: %d\n",
				     consumer_nice);
		else
			trace_printk("Running Consumer at SCHED_FIFO %d\n",
				     consumer_fifo);
	}
	if (producer_fifo < 0)
		trace_printk("Running Producer at nice: %d\n",
			     producer_nice);
	else
		trace_printk("Running Producer at SCHED_FIFO %d\n",
			     producer_fifo);

	/* Let the user know that the test is running at low priority */
	if (producer_fifo < 0 && consumer_fifo < 0 &&
	    producer_nice == MAX_NICE && consumer_nice == MAX_NICE)
		trace_printk("WARNING!!! This test is running at lowest priority.\n");

	trace_printk("Time:     %lld (usecs)\n", time);
	trace_printk("Overruns: %lld\n", overruns);
	if (disable_reader)
		trace_printk("Read:     (reader disabled)\n");
	else
		trace_printk("Read:     %ld  (by %s)\n", read,
			read_events ? "events" : "pages");
	trace_printk("Entries:  %lld\n", entries);
	trace_printk("Total:    %lld\n", entries + overruns + read);
	trace_printk("Missed:   %ld\n", missed);
	trace_printk("Hit:      %ld\n", hit);

	/* Convert time from usecs to millisecs */
	do_div(time, USEC_PER_MSEC);
	if (time)
		hit /= (long)time;
	else
		trace_printk("TIME IS ZERO??\n");

	trace_printk("Entries per millisec: %ld\n", hit);

	if (hit) {
		/* Calculate the average time in nanosecs */
		avg = NSEC_PER_MSEC / hit;
		trace_printk("%ld ns per entry\n", avg);
	}

	if (missed) {
		if (time)
			missed /= (long)time;

		trace_printk("Total iterations per millisec: %ld\n",
			     hit + missed);

		/* it is possible that hit + missed will overflow and be zero */
		if (!(hit + missed)) {
			trace_printk("hit + missed overflowed and totalled zero!\n");
			hit--; /* make it non zero */
		}

		/* Calculate the average time in nanosecs */
		avg = NSEC_PER_MSEC / (hit + missed);
		trace_printk("%ld ns per entry\n", avg);
	}
}

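/*
 * Park until kthread_stop() is called, so that module unload can
 * always stop the test threads cleanly.
 */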
static void wait_to_die(void)
{
	set_current_state(TASK_INTERRUPTIBLE);
	while (!kthread_should_stop()) {
		schedule();
		set_current_state(TASK_INTERRUPTIBLE);
	}
	__set_current_state(TASK_RUNNING);
}

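/*
 * Consumer kthread: signal read_start so the producer knows we are
 * ready, drain the buffer, then sleep until the producer wakes us for
 * the next round.
 */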
static int ring_buffer_consumer_thread(void *arg)
{
	while (!break_test()) {
		complete(&read_start);

		ring_buffer_consumer();

		set_current_state(TASK_INTERRUPTIBLE);
		if (break_test())
			break;
		schedule();
	}
	__set_current_state(TASK_RUNNING);

	if (!kthread_should_stop())
		wait_to_die();

	return 0;
}

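/*
 * Producer kthread: reset the buffer, sync with the consumer, run one
 * hammer/report cycle, then sleep for SLEEP_TIME seconds between
 * rounds.
 */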
static int ring_buffer_producer_thread(void *arg)
{
	while (!break_test()) {
		ring_buffer_reset(buffer);

		if (consumer) {
			wake_up_process(consumer);
			wait_for_completion(&read_start);
		}

		ring_buffer_producer();
		if (break_test())
			goto out_kill;

		trace_printk("Sleeping for 10 secs\n");
		set_current_state(TASK_INTERRUPTIBLE);
		if (break_test())
			goto out_kill;
		schedule_timeout(HZ * SLEEP_TIME);
	}

out_kill:
	__set_current_state(TASK_RUNNING);
	if (!kthread_should_stop())
		wait_to_die();

	return 0;
}

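/*
 * Allocate the buffer and spawn the test threads, running them as
 * low-priority background tasks by default (or SCHED_FIFO when a
 * *_fifo parameter is set).
 */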
static int __init ring_buffer_benchmark_init(void)
{
	int ret;

	/* make a one meg buffer in overwrite mode */
	buffer = ring_buffer_alloc(1000000, RB_FL_OVERWRITE);
	if (!buffer)
		return -ENOMEM;

	if (!disable_reader) {
		consumer = kthread_create(ring_buffer_consumer_thread,
					  NULL, "rb_consumer");
		ret = PTR_ERR(consumer);
		if (IS_ERR(consumer))
			goto out_fail;
	}

	producer = kthread_run(ring_buffer_producer_thread,
			       NULL, "rb_producer");
	ret = PTR_ERR(producer);

	if (IS_ERR(producer))
		goto out_kill;

	/*
	 * Run them as low-prio background tasks by default:
	 */
	if (!disable_reader) {
		if (consumer_fifo >= 0) {
			struct sched_param param = {
				.sched_priority = consumer_fifo
			};
			sched_setscheduler(consumer, SCHED_FIFO, &param);
		} else
			set_user_nice(consumer, consumer_nice);
	}

	if (producer_fifo >= 0) {
		struct sched_param param = {
			.sched_priority = producer_fifo
		};
		sched_setscheduler(producer, SCHED_FIFO, &param);
	} else
		set_user_nice(producer, producer_nice);

	return 0;

 out_kill:
	if (consumer)
		kthread_stop(consumer);

 out_fail:
	ring_buffer_free(buffer);
	return ret;
}

static void __exit ring_buffer_benchmark_exit(void)
{
	kthread_stop(producer);
	if (consumer)
		kthread_stop(consumer);
	ring_buffer_free(buffer);
}

module_init(ring_buffer_benchmark_init);
module_exit(ring_buffer_benchmark_exit);

MODULE_AUTHOR("Steven Rostedt");
MODULE_DESCRIPTION("ring_buffer_benchmark");
MODULE_LICENSE("GPL");