#include <AvailabilityMacros.h>
#include <mach/thread_policy.h>
#include <mach/mach.h>
#include <mach/mach_error.h>
#include <mach/mach_time.h>
#include <pthread.h>
#include <sys/queue.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <err.h>

/*
 * Pool is another multithreaded test/benchmarking program to evaluate
 * affinity set placement in Leopard.
 *
 * The basic picture is:
 *
 *                  -> producer --                 -> consumer --
 *       free     /                \    work     /                \
 *    -> queue --      ...          --> queue --                   --
 *   |            \                /             \                /  |
 *   |              -> producer --                 -> consumer --    |
 *    ---------------------------------------------------------------
 *
 *       <---------- "stage" ---------> <---------- "stage" --------->
 *
 * There is a series of work stages. Each stage has an input and an output
 * queue and multiple threads. The first stage is the producer and subsequent
 * stages are consumers. By default there are 2 stages. There are N producer
 * and M consumer threads, with B buffers per producer thread circulating
 * through the system.
 *
 * When affinity is enabled, each producer thread is tagged with an affinity tag
 * 1 .. N - so each runs on a different L2 cache. When a buffer is queued to
 * the work queue it is tagged with this affinity. When a consumer dequeues a
 * work item, it sets its affinity to this tag. Hence consumer threads migrate
 * to the same affinity set where the data was produced.
 *
 * Buffer management uses pthread mutex/condition variables. A thread blocks
 * when no buffer is available on a queue and it is signaled when a buffer
 * is placed on an empty queue. Queues are tailq's a la <sys/queue.h>.
 * The queue management is centralized in a single routine: what queues to
 * use as input and output and what function to call for processing is
 * data-driven.
 */

pthread_mutex_t funnel;
pthread_cond_t  barrier;

uint64_t        timer;
int             threads;
int             threads_ready = 0;

int             iterations = 10000;
boolean_t       affinity = FALSE;
boolean_t       halting = FALSE;
int             verbosity = 1;

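/* A work item: a buffer of ints, tagged with its producer's affinity set */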
typedef struct work {
	TAILQ_ENTRY(work)       link;
	int                     *data;
	int                     isize;
	int                     tag;
	int                     number;
} work_t;

/*
 * A work queue, complete with pthread objects for its management
 */
typedef struct work_queue {
	pthread_mutex_t         mtx;
	pthread_cond_t          cnd;
	TAILQ_HEAD(, work)      queue;
	unsigned int            waiters;
} work_queue_t;

/* Worker functions take an integer array and its size */
typedef void (worker_fn_t)(int *, int);

/* This struct controls the function of a stage */
#define WORKERS_MAX 10
typedef struct {
	int                     stagenum;
	char                    *name;
	worker_fn_t             *fn;
	work_queue_t            *input;
	work_queue_t            *output;
	work_queue_t            bufq;
	int                     work_todo;
} stage_info_t;

/* This defines a worker thread */
typedef struct worker_info {
	int                     setnum;
	stage_info_t            *stage;
	pthread_t               thread;
} worker_info_t;

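/* Debug chatter (verbosity > 1), serialized by the funnel lock so lines don't interleave */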
#define DBG(x...) do {                          \
	if (verbosity > 1) {                    \
	        pthread_mutex_lock(&funnel);    \
	        printf(x);                      \
	        pthread_mutex_unlock(&funnel);  \
	}                                       \
} while (0)

#define mutter(x...) do {                       \
	if (verbosity > 0) {                    \
	        printf(x);                      \
	}                                       \
} while (0)

#define s_if_plural(x)  (((x) > 1) ? "s" : "")

static void
usage()
{
	fprintf(stderr,
	    "usage: pool [-a]    Turn affinity on (off)\n"
	    "            [-b B]  Number of buffers per producer (2)\n"
	    "            [-i I]  Number of buffers to produce (10000)\n"
	    "            [-s S]  Number of stages (2)\n"
	    "            [-p P]  Number of pages per buffer (256=1MB)\n"
	    "            [-t]    Halt for a keypress once all threads are ready\n"
	    "            [-w]    Consumer writes data\n"
	    "            [-v V]  Verbosity level 0..2 (1)\n"
	    "            [N [M]] Number of producer and consumers (2)\n"
	    );
	exit(1);
}

/* Trivial producer: write to each int */
void
writer_fn(int *data, int isize)
{
	int     i;

	for (i = 0; i < isize; i++) {
		data[i] = i;
	}
}

/* Trivial consumer: read each int */
void
reader_fn(int *data, int isize)
{
	int     i;
	int     datum;

	for (i = 0; i < isize; i++) {
		datum = data[i];
	}
	(void) datum;   /* quiet set-but-unused warnings */
}

/* Consumer reading and writing the buffer */
void
reader_writer_fn(int *data, int isize)
{
	int     i;

	for (i = 0; i < isize; i++) {
		data[i] += 1;
	}
}

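/* Tag the calling thread with the given affinity set; a no-op unless -a was given */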
void
affinity_set(int tag)
{
	kern_return_t                   ret;
	thread_affinity_policy_data_t   policy;

	if (affinity) {
		policy.affinity_tag = tag;
		ret = thread_policy_set(
			mach_thread_self(), THREAD_AFFINITY_POLICY,
			(thread_policy_t) &policy,
			THREAD_AFFINITY_POLICY_COUNT);
		if (ret != KERN_SUCCESS) {
			printf("thread_policy_set(THREAD_AFFINITY_POLICY) returned %d\n", ret);
		}
	}
}

/*
 * This is the central function for every thread.
 * For each invocation, its role is set by (a pointer to) a stage_info_t.
 */
void *
manager_fn(void *arg)
{
	worker_info_t   *wp = (worker_info_t *) arg;
	stage_info_t    *sp = wp->stage;
	boolean_t       is_producer = (sp->stagenum == 0);
	long            iteration = 0;
	int             current_tag = 0;

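	/* Opt out of timesharing so these threads run at fixed priority */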
	kern_return_t                   ret;
	thread_extended_policy_data_t   epolicy;
	epolicy.timeshare = FALSE;
	ret = thread_policy_set(
		mach_thread_self(), THREAD_EXTENDED_POLICY,
		(thread_policy_t) &epolicy,
		THREAD_EXTENDED_POLICY_COUNT);
	if (ret != KERN_SUCCESS) {
		printf("thread_policy_set(THREAD_EXTENDED_POLICY) returned %d\n", ret);
	}

	/*
	 * If we're using affinity sets and we're a producer,
	 * set our tag to be our thread set number.
	 */
	if (affinity && is_producer) {
		affinity_set(wp->setnum);
		current_tag = wp->setnum;
	}

	DBG("Starting %s %d, stage: %d\n", sp->name, wp->setnum, sp->stagenum);

	/*
	 * Start barrier.
	 * The last thread to get here releases everyone and starts the timer.
	 */
	pthread_mutex_lock(&funnel);
	threads_ready++;
	if (threads_ready == threads) {
		pthread_mutex_unlock(&funnel);
		if (halting) {
			printf("  all threads ready for process %d, "
			    "hit any key to start", getpid());
			fflush(stdout);
			(void) getchar();
		}
		pthread_cond_broadcast(&barrier);
		timer = mach_absolute_time();
	} else {
		pthread_cond_wait(&barrier, &funnel);
		pthread_mutex_unlock(&funnel);
	}

	do {
		work_t          *workp;

		/*
		 * Get a buffer from the input queue.
		 * Block if none.
		 * Quit if all work done.
		 */
		pthread_mutex_lock(&sp->input->mtx);
		while (1) {
			if (sp->work_todo == 0) {
				pthread_mutex_unlock(&sp->input->mtx);
				goto out;
			}
			workp = TAILQ_FIRST(&(sp->input->queue));
			if (workp != NULL) {
				break;
			}
			DBG("    %s[%d,%d] todo %d waiting for buffer\n",
			    sp->name, wp->setnum, sp->stagenum, sp->work_todo);
			sp->input->waiters++;
			pthread_cond_wait(&sp->input->cnd, &sp->input->mtx);
			sp->input->waiters--;
		}
		TAILQ_REMOVE(&(sp->input->queue), workp, link);
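		/* The work_todo countdown doubles as this buffer's sequence number */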
		iteration = sp->work_todo--;
		pthread_mutex_unlock(&sp->input->mtx);

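		/* Producers stamp the buffer with their tag; consumers migrate to it */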
		if (is_producer) {
			workp->number = iteration;
			workp->tag = wp->setnum;
		} else {
			if (affinity && current_tag != workp->tag) {
				affinity_set(workp->tag);
				current_tag = workp->tag;
			}
		}

		DBG("  %s[%d,%d] todo %ld work %p data %p\n",
		    sp->name, wp->setnum, sp->stagenum, iteration, workp, workp->data);

		/* Do our stuff with the buffer */
		(void) sp->fn(workp->data, workp->isize);

		/*
		 * Place the buffer on the input queue of the next stage.
		 * Signal waiters if required.
		 */
		pthread_mutex_lock(&sp->output->mtx);
		TAILQ_INSERT_TAIL(&(sp->output->queue), workp, link);
		if (sp->output->waiters) {
			DBG("    %s[%d,%d] todo %ld signaling work\n",
			    sp->name, wp->setnum, sp->stagenum, iteration);
			pthread_cond_signal(&sp->output->cnd);
		}
		pthread_mutex_unlock(&sp->output->mtx);
	} while (1);

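	/* This stage is done: cascade a wakeup so blocked next-stage threads re-check work_todo */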
out:
	pthread_cond_broadcast(&sp->output->cnd);

	DBG("Ending %s[%d,%d]\n", sp->name, wp->setnum, sp->stagenum);

	return (void *) iteration;
}

worker_fn_t     *producer_fnp = &writer_fn;
worker_fn_t     *consumer_fnp = &reader_fn;

int
main(int argc, char *argv[])
{
	int                     i;
	int                     j;
	int                     k;
	int                     pages = 256; /* 1MB */
	int                     buffers = 2;
	int                     producers = 2;
	int                     consumers = 2;
	int                     stages = 2;
	void                    *status;
	stage_info_t            *stage_info;
	stage_info_t            *sp;
	worker_info_t           *worker_info;
	worker_info_t           *wp;
	mach_timebase_info_data_t tbinfo;
	int                     ret;
	int                     c;

	/* Do switch parsing: */
	while ((c = getopt(argc, argv, "ab:hi:p:s:twv:")) != -1) {
		switch (c) {
		case 'a':
			affinity = !affinity;
			break;
		case 'b':
			buffers = atoi(optarg);
			break;
		case 'i':
			iterations = atoi(optarg);
			break;
		case 'p':
			pages = atoi(optarg);
			break;
		case 's':
			stages = atoi(optarg);
			if (stages >= WORKERS_MAX) {
				usage();
			}
			break;
		case 't':
			halting = TRUE;
			break;
		case 'w':
			consumer_fnp = &reader_writer_fn;
			break;
		case 'v':
			verbosity = atoi(optarg);
			break;
		case 'h':
		case '?':
		default:
			usage();
		}
	}
	argc -= optind; argv += optind;
	if (argc > 0) {
		producers = atoi(*argv);
	}
	argc--; argv++;
	if (argc > 0) {
		consumers = atoi(*argv);
	}

	pthread_mutex_init(&funnel, NULL);
	pthread_cond_init(&barrier, NULL);

	/*
	 * Fire up the worker threads.
	 */
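	/* One thread per producer, plus M consumers for each of the stages-1 consumer stages */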
	threads = consumers * (stages - 1) + producers;
	mutter("Launching %d producer%s with %d stage%s of %d consumer%s\n"
	    "  with %saffinity, consumer reads%s data\n",
	    producers, s_if_plural(producers),
	    stages - 1, s_if_plural(stages - 1),
	    consumers, s_if_plural(consumers),
	    affinity ? "" : "no ",
	    (consumer_fnp == &reader_writer_fn) ? " and writes" : "");
	if (pages < 256) {
		mutter("  %d kB per buffer, ", pages * 4);
	} else {
		mutter("  %d MB per buffer, ", pages / 256);
	}
	mutter("%d buffer%s per producer ",
	    buffers, s_if_plural(buffers));
	if (buffers * pages < 256) {
		mutter("(total %d kB)\n", buffers * pages * 4);
	} else {
		mutter("(total %d MB)\n", buffers * pages / 256);
	}
	mutter("  processing %d buffer%s...\n",
	    iterations, s_if_plural(iterations));

	stage_info = (stage_info_t *) malloc(stages * sizeof(stage_info_t));
	worker_info = (worker_info_t *) malloc(threads * sizeof(worker_info_t));
	if (stage_info == NULL || worker_info == NULL) {
		err(1, "malloc");
	}

	/* Set up the queue for the workers of this thread set: */
	for (i = 0; i < stages; i++) {
		sp = &stage_info[i];
		sp->stagenum = i;
		pthread_mutex_init(&sp->bufq.mtx, NULL);
		pthread_cond_init(&sp->bufq.cnd, NULL);
		TAILQ_INIT(&sp->bufq.queue);
		sp->bufq.waiters = 0;
		if (i == 0) {
			sp->fn = producer_fnp;
			sp->name = "producer";
		} else {
			sp->fn = consumer_fnp;
			sp->name = "consumer";
		}
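		/* Daisy-chain the queues; the last stage wraps around to refill the free queue */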
		sp->input = &sp->bufq;
		sp->output = &stage_info[(i + 1) % stages].bufq;
		sp->work_todo = iterations;
	}

	/* Create the producers */
	for (i = 0; i < producers; i++) {
		work_t  *work_array;
		int     *data;
		int     isize;

		isize = pages * 4096 / sizeof(int);
		data = (int *) malloc(buffers * pages * 4096);

		/* Set up the empty work buffers */
		work_array = (work_t *) malloc(buffers * sizeof(work_t));
		if (data == NULL || work_array == NULL) {
			err(1, "malloc");
		}
		for (j = 0; j < buffers; j++) {
			work_array[j].data = data + (isize * j);
			work_array[j].isize = isize;
			work_array[j].tag = 0;
			TAILQ_INSERT_TAIL(&stage_info[0].bufq.queue, &work_array[j], link);
			DBG("  empty work item %p for data %p\n",
			    &work_array[j], work_array[j].data);
		}
		wp = &worker_info[i];
		wp->setnum = i + 1;
		wp->stage = &stage_info[0];
		if ((ret = pthread_create(&wp->thread,
		    NULL,
		    &manager_fn,
		    (void *) wp)) != 0) {
			errc(1, ret, "pthread_create %d,%d", 0, i);
		}
	}

	/* Create consumers */
	for (i = 1; i < stages; i++) {
		for (j = 0; j < consumers; j++) {
			wp = &worker_info[producers + (consumers * (i - 1)) + j];
			wp->setnum = j + 1;
			wp->stage = &stage_info[i];
			if ((ret = pthread_create(&wp->thread,
			    NULL,
			    &manager_fn,
			    (void *) wp)) != 0) {
				errc(1, ret, "pthread_create %d,%d", i, j);
			}
		}
	}

	/*
	 * We sit back and wait for the workers to finish.
	 */
	for (k = 0; k < threads; k++) {
		int     i;
		int     j;

		wp = &worker_info[k];
		if (k < producers) {
			i = 0;
			j = k;
		} else {
			i = (k - producers) / consumers;
			j = (k - producers) % consumers;
		}
		if ((ret = pthread_join(wp->thread, &status)) != 0) {
			errc(1, ret, "pthread_join %d,%d", i, j);
		}
		DBG("Thread %d,%d status %ld\n", i, j, (long) status);
	}

	/*
	 * See how long the work took, converting timebase ticks to msecs.
	 */
	(void) mach_timebase_info(&tbinfo);
	timer = mach_absolute_time() - timer;
	timer = timer * tbinfo.numer / tbinfo.denom;
	timer = timer / 1000000ULL;
	printf("%d.%03d seconds elapsed.\n",
	    (int) (timer / 1000ULL), (int) (timer % 1000ULL));

	return 0;
}