/*
 * SPDX-License-Identifier: BSD-3-Clause
 * Copyright 2015 Intel Corporation.
 * Copyright 2012 Hasan Alayli <[email protected]>
 */

#define RTE_MEM 1

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <stdint.h>
#include <stddef.h>
#include <limits.h>
#include <inttypes.h>
#include <unistd.h>
#include <pthread.h>
#include <fcntl.h>
#include <sys/time.h>
#include <sys/mman.h>
#include <sched.h>

#include <rte_prefetch.h>
#include <rte_per_lcore.h>
#include <rte_atomic.h>
#include <rte_atomic_64.h>
#include <rte_log.h>
#include <rte_common.h>
#include <rte_branch_prediction.h>

#include "lthread_api.h"
#include "lthread_int.h"
#include "lthread_sched.h"
#include "lthread_objcache.h"
#include "lthread_timer.h"
#include "lthread_mutex.h"
#include "lthread_cond.h"
#include "lthread_tls.h"
#include "lthread_diag.h"

/*
 * This file implements the lthread scheduler.
 * The scheduler is the function lthread_run(), which must be run as the
 * main loop of an EAL thread.
 *
 * Currently, once a scheduler is created it cannot be destroyed.
 * When a scheduler shuts down it is assumed that the application is
 * terminating.
 */
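
/*
 * Typical launch sequence (illustrative sketch only, not part of this file;
 * lthread_main() and initial_lthread() are hypothetical application
 * functions):
 *
 *	static int lthread_main(void *arg __rte_unused)
 *	{
 *		struct lthread *lt;
 *
 *		// one lcore seeds the system with an initial lthread
 *		if (rte_lcore_id() == rte_get_main_lcore())
 *			lthread_create(&lt, -1, initial_lthread, NULL);
 *
 *		lthread_run();	// returns when this scheduler shuts down
 *		return 0;
 *	}
 *
 * launched from main() after rte_eal_init(), for example with:
 *	lthread_num_schedulers_set(rte_lcore_count());
 *	rte_eal_mp_remote_launch(lthread_main, NULL, CALL_MAIN);
 */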

static rte_atomic16_t num_schedulers;
static rte_atomic16_t active_schedulers;

/* one scheduler per lcore */
RTE_DEFINE_PER_LCORE(struct lthread_sched *, this_sched) = NULL;

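/* table of all schedulers, indexed by lcore id */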
struct lthread_sched *schedcore[LTHREAD_MAX_LCORES];

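/* user-supplied diagnostic callback and event mask (see lthread_diag.h) */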
diag_callback diag_cb;

uint64_t diag_mask;


/* constructor */
RTE_INIT(lthread_sched_ctor)
{
	memset(schedcore, 0, sizeof(schedcore));
	rte_atomic16_init(&num_schedulers);
	rte_atomic16_set(&num_schedulers, 1);
	rte_atomic16_init(&active_schedulers);
	rte_atomic16_set(&active_schedulers, 0);
	diag_cb = NULL;
}


enum sched_alloc_phase {
	SCHED_ALLOC_OK,
	SCHED_ALLOC_QNODE_POOL,
	SCHED_ALLOC_READY_QUEUE,
	SCHED_ALLOC_PREADY_QUEUE,
	SCHED_ALLOC_LTHREAD_CACHE,
	SCHED_ALLOC_STACK_CACHE,
	SCHED_ALLOC_PERLT_CACHE,
	SCHED_ALLOC_TLS_CACHE,
	SCHED_ALLOC_COND_CACHE,
	SCHED_ALLOC_MUTEX_CACHE,
};

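/*
 * Allocate the per-scheduler resources in order, recording in alloc_status
 * the phase reached; on failure the switch below unwinds, in reverse order,
 * only what was successfully allocated.
 */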
static int
_lthread_sched_alloc_resources(struct lthread_sched *new_sched)
{
	int alloc_status;

	do {
		/* Initialize per scheduler queue node pool */
		alloc_status = SCHED_ALLOC_QNODE_POOL;
		new_sched->qnode_pool =
			_qnode_pool_create("qnode pool", LTHREAD_PREALLOC);
		if (new_sched->qnode_pool == NULL)
			break;

		/* Initialize per scheduler local ready queue */
		alloc_status = SCHED_ALLOC_READY_QUEUE;
		new_sched->ready = _lthread_queue_create("ready queue");
		if (new_sched->ready == NULL)
			break;

		/* Initialize per scheduler local peer ready queue */
		alloc_status = SCHED_ALLOC_PREADY_QUEUE;
		new_sched->pready = _lthread_queue_create("pready queue");
		if (new_sched->pready == NULL)
			break;

		/* Initialize per scheduler local free lthread cache */
		alloc_status = SCHED_ALLOC_LTHREAD_CACHE;
		new_sched->lthread_cache =
			_lthread_objcache_create("lthread cache",
						sizeof(struct lthread),
						LTHREAD_PREALLOC);
		if (new_sched->lthread_cache == NULL)
			break;

		/* Initialize per scheduler local free stack cache */
		alloc_status = SCHED_ALLOC_STACK_CACHE;
		new_sched->stack_cache =
			_lthread_objcache_create("stack_cache",
						sizeof(struct lthread_stack),
						LTHREAD_PREALLOC);
		if (new_sched->stack_cache == NULL)
			break;

		/* Initialize per scheduler local free per lthread data cache */
		alloc_status = SCHED_ALLOC_PERLT_CACHE;
		new_sched->per_lthread_cache =
			_lthread_objcache_create("per_lt cache",
						RTE_PER_LTHREAD_SECTION_SIZE,
						LTHREAD_PREALLOC);
		if (new_sched->per_lthread_cache == NULL)
			break;

		/* Initialize per scheduler local free tls cache */
		alloc_status = SCHED_ALLOC_TLS_CACHE;
		new_sched->tls_cache =
			_lthread_objcache_create("TLS cache",
						sizeof(struct lthread_tls),
						LTHREAD_PREALLOC);
		if (new_sched->tls_cache == NULL)
			break;

		/* Initialize per scheduler local free cond var cache */
		alloc_status = SCHED_ALLOC_COND_CACHE;
		new_sched->cond_cache =
			_lthread_objcache_create("cond cache",
						sizeof(struct lthread_cond),
						LTHREAD_PREALLOC);
		if (new_sched->cond_cache == NULL)
			break;

		/* Initialize per scheduler local free mutex cache */
		alloc_status = SCHED_ALLOC_MUTEX_CACHE;
		new_sched->mutex_cache =
			_lthread_objcache_create("mutex cache",
						sizeof(struct lthread_mutex),
						LTHREAD_PREALLOC);
		if (new_sched->mutex_cache == NULL)
			break;

		alloc_status = SCHED_ALLOC_OK;
	} while (0);

	/* roll back on any failure */
	switch (alloc_status) {
	case SCHED_ALLOC_MUTEX_CACHE:
		_lthread_objcache_destroy(new_sched->cond_cache);
		/* fall through */
	case SCHED_ALLOC_COND_CACHE:
		_lthread_objcache_destroy(new_sched->tls_cache);
		/* fall through */
	case SCHED_ALLOC_TLS_CACHE:
		_lthread_objcache_destroy(new_sched->per_lthread_cache);
		/* fall through */
	case SCHED_ALLOC_PERLT_CACHE:
		_lthread_objcache_destroy(new_sched->stack_cache);
		/* fall through */
	case SCHED_ALLOC_STACK_CACHE:
		_lthread_objcache_destroy(new_sched->lthread_cache);
		/* fall through */
	case SCHED_ALLOC_LTHREAD_CACHE:
		_lthread_queue_destroy(new_sched->pready);
		/* fall through */
	case SCHED_ALLOC_PREADY_QUEUE:
		_lthread_queue_destroy(new_sched->ready);
		/* fall through */
	case SCHED_ALLOC_READY_QUEUE:
		_qnode_pool_destroy(new_sched->qnode_pool);
		/* fall through */
	case SCHED_ALLOC_QNODE_POOL:
		/* fall through */
	case SCHED_ALLOC_OK:
		break;
	}
	return alloc_status;
}


/*
 * Create a scheduler on the current lcore
 */
struct lthread_sched *_lthread_sched_create(size_t stack_size)
{
	int status;
	struct lthread_sched *new_sched;
	unsigned lcoreid = rte_lcore_id();

	RTE_ASSERT(stack_size <= LTHREAD_MAX_STACK_SIZE);

	if (stack_size == 0)
		stack_size = LTHREAD_MAX_STACK_SIZE;

	new_sched =
	     rte_calloc_socket(NULL, 1, sizeof(struct lthread_sched),
				RTE_CACHE_LINE_SIZE,
				rte_socket_id());
	if (new_sched == NULL) {
		RTE_LOG(CRIT, LTHREAD,
			"Failed to allocate memory for scheduler\n");
		return NULL;
	}

	_lthread_key_pool_init();

	new_sched->stack_size = stack_size;
	new_sched->birth = rte_rdtsc();
	THIS_SCHED = new_sched;

	status = _lthread_sched_alloc_resources(new_sched);
	if (status != SCHED_ALLOC_OK) {
		RTE_LOG(CRIT, LTHREAD,
			"Failed to allocate resources for scheduler code = %d\n",
			status);
		rte_free(new_sched);
		return NULL;
	}

	bzero(&new_sched->ctx, sizeof(struct ctx));

	new_sched->lcore_id = lcoreid;

	schedcore[lcoreid] = new_sched;

	new_sched->run_flag = 1;

	DIAG_EVENT(new_sched, LT_DIAG_SCHED_CREATE, rte_lcore_id(), 0);

	rte_wmb();
	return new_sched;
}

/*
 * Set the number of schedulers in the system
 */
int lthread_num_schedulers_set(int num)
{
	rte_atomic16_set(&num_schedulers, num);
	return (int)rte_atomic16_read(&num_schedulers);
}
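
/*
 * Illustrative use (not part of this file): the initial EAL thread would
 * typically call this once, before launching the schedulers, e.g.
 *	lthread_num_schedulers_set(rte_lcore_count());
 */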

/*
 * Return the number of schedulers active
 */
int lthread_active_schedulers(void)
{
	return (int)rte_atomic16_read(&active_schedulers);
}


/**
 * Shut down the scheduler running on the specified lcore
 */
void lthread_scheduler_shutdown(unsigned lcoreid)
{
	uint64_t coreid = (uint64_t) lcoreid;

	if (coreid < LTHREAD_MAX_LCORES) {
		if (schedcore[coreid] != NULL)
			schedcore[coreid]->run_flag = 0;
	}
}

/**
 * Shut down all schedulers
 */
void lthread_scheduler_shutdown_all(void)
{
	uint64_t i;

	/*
	 * give time for all schedulers to have started
	 * Note we use sched_yield() rather than pthread_yield() to allow
	 * for the possibility of a pthread wrapper on lthread_yield(),
	 * something that is not possible unless the scheduler is running.
	 */
	while (rte_atomic16_read(&active_schedulers) <
	       rte_atomic16_read(&num_schedulers))
		sched_yield();

	for (i = 0; i < LTHREAD_MAX_LCORES; i++) {
		if (schedcore[i] != NULL)
			schedcore[i]->run_flag = 0;
	}
}
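
/*
 * Illustrative use (not part of this file): once the application has no more
 * work to submit, some lthread calls
 *	lthread_scheduler_shutdown_all();
 * after which each lthread_run() loop drains its queues and returns.
 */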

/*
 * Resume a suspended lthread
 */
static __rte_always_inline void
_lthread_resume(struct lthread *lt);
static inline void _lthread_resume(struct lthread *lt)
{
	struct lthread_sched *sched = THIS_SCHED;
	struct lthread_stack *s;
	uint64_t state = lt->state;
#if LTHREAD_DIAG
	int init = 0;
#endif

	sched->current_lthread = lt;

	if (state & (BIT(ST_LT_CANCELLED) | BIT(ST_LT_EXITED))) {
		/* if detached we can free the thread now */
		if (state & BIT(ST_LT_DETACH)) {
			_lthread_free(lt);
			sched->current_lthread = NULL;
			return;
		}
	}

	if (state & BIT(ST_LT_INIT)) {
		/* first time this thread has been run */
		/* assign thread to this scheduler */
		lt->sched = THIS_SCHED;

		/* allocate stack */
		s = _stack_alloc();

		lt->stack_container = s;
		_lthread_set_stack(lt, s->stack, s->stack_size);

		/* allocate memory for TLS used by this thread */
		_lthread_tls_alloc(lt);

		lt->state = BIT(ST_LT_READY);
#if LTHREAD_DIAG
		init = 1;
#endif
	}

	DIAG_EVENT(lt, LT_DIAG_LTHREAD_RESUMED, init, lt);

	/* switch to the new thread */
	ctx_switch(&lt->ctx, &sched->ctx);

	/* If posting to a queue that could be read by another lcore
	 * we defer the queue write till now to ensure the context has been
	 * saved before the other core tries to resume it
	 * This applies to blocking on mutex, cond, and to set_affinity
	 */
	if (lt->pending_wr_queue != NULL) {
		struct lthread_queue *dest = lt->pending_wr_queue;

		lt->pending_wr_queue = NULL;

		/* queue the current thread to the specified queue */
		_lthread_queue_insert_mp(dest, lt);
	}

	sched->current_lthread = NULL;
}

/*
 * Handle sleep timer expiry
 */
void
_sched_timer_cb(struct rte_timer *tim, void *arg)
{
	struct lthread *lt = (struct lthread *) arg;
	uint64_t state = lt->state;

	DIAG_EVENT(lt, LT_DIAG_LTHREAD_TMR_EXPIRED, &lt->tim, 0);

	rte_timer_stop(tim);

	if (lt->state & BIT(ST_LT_CANCELLED))
		(THIS_SCHED)->nb_blocked_threads--;

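	/* Flag the expiry only for the duration of the resume, so the code
	 * woken below can see that the timer fired rather than an explicit
	 * wakeup.
	 */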
	lt->state = state | BIT(ST_LT_EXPIRED);
	_lthread_resume(lt);
	lt->state = state & CLEARBIT(ST_LT_EXPIRED);
}



/*
 * Returns 0 if there is a pending job in the scheduler, or 1 if done and
 * can exit.
 */
static inline int _lthread_sched_isdone(struct lthread_sched *sched)
{
	return (sched->run_flag == 0) &&
			(_lthread_queue_empty(sched->ready)) &&
			(_lthread_queue_empty(sched->pready)) &&
			(sched->nb_blocked_threads == 0);
}

/*
 * Wait for all schedulers to start
 */
static inline void _lthread_schedulers_sync_start(void)
{
	rte_atomic16_inc(&active_schedulers);

	/* wait for lthread schedulers
	 * Note we use sched_yield() rather than pthread_yield() to allow
	 * for the possibility of a pthread wrapper on lthread_yield(),
	 * something that is not possible unless the scheduler is running.
	 */
	while (rte_atomic16_read(&active_schedulers) <
	       rte_atomic16_read(&num_schedulers))
		sched_yield();

}

/*
 * Wait for all schedulers to stop
 */
static inline void _lthread_schedulers_sync_stop(void)
{
	rte_atomic16_dec(&active_schedulers);
	rte_atomic16_dec(&num_schedulers);

	/* wait for schedulers
	 * Note we use sched_yield() rather than pthread_yield() to allow
	 * for the possibility of a pthread wrapper on lthread_yield(),
	 * something that is not possible unless the scheduler is running.
	 */
	while (rte_atomic16_read(&active_schedulers) > 0)
		sched_yield();

}


/*
 * Run the lthread scheduler
 * This loop is the heart of the system
 */
void lthread_run(void)
{

	struct lthread_sched *sched = THIS_SCHED;
	struct lthread *lt = NULL;

	RTE_LOG(INFO, LTHREAD,
		"starting scheduler %p on lcore %u phys core %u\n",
		sched, rte_lcore_id(),
		rte_lcore_index(rte_lcore_id()));

	/* if more than one, wait for all schedulers to start */
	_lthread_schedulers_sync_start();


	/*
	 * This is the main scheduling loop.
	 * So long as there are tasks in existence we run this loop.
	 * On each pass we check:
	 *   expired timers,
	 *   the local ready queue,
	 *   and the peer ready queue,
	 *
	 * and resume lthreads ad infinitum.
	 */
	while (!_lthread_sched_isdone(sched)) {

		rte_timer_manage();

		lt = _lthread_queue_poll(sched->ready);
		if (lt != NULL)
			_lthread_resume(lt);
		lt = _lthread_queue_poll(sched->pready);
		if (lt != NULL)
			_lthread_resume(lt);
	}


	/* if more than one, wait for all schedulers to stop */
	_lthread_schedulers_sync_stop();

	(THIS_SCHED) = NULL;

	RTE_LOG(INFO, LTHREAD,
		"stopping scheduler %p on lcore %u phys core %u\n",
		sched, rte_lcore_id(),
		rte_lcore_index(rte_lcore_id()));
	fflush(stdout);
}

/*
 * Return the scheduler for this lcore
 */
struct lthread_sched *_lthread_sched_get(unsigned int lcore_id)
{
	struct lthread_sched *res = NULL;

	if (lcore_id < LTHREAD_MAX_LCORES)
		res = schedcore[lcore_id];

	return res;
}

/*
 * Migrate the current lthread to another scheduler running
 * on the specified lcore.
 */
int lthread_set_affinity(unsigned lcoreid)
{
	struct lthread *lt = THIS_LTHREAD;
	struct lthread_sched *dest_sched;

	if (unlikely(lcoreid >= LTHREAD_MAX_LCORES))
		return POSIX_ERRNO(EINVAL);

	DIAG_EVENT(lt, LT_DIAG_LTHREAD_AFFINITY, lcoreid, 0);

	dest_sched = schedcore[lcoreid];

	if (unlikely(dest_sched == NULL))
		return POSIX_ERRNO(EINVAL);

	if (likely(dest_sched != THIS_SCHED)) {
		lt->sched = dest_sched;
		lt->pending_wr_queue = dest_sched->pready;
		_affinitize();
		return 0;
	}
	return 0;
}
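
/*
 * Illustrative use (not part of this file; dest_lcore and handle_error() are
 * placeholders): an lthread moves itself to the scheduler on another lcore
 * and, on success, simply continues executing there:
 *	if (lthread_set_affinity(dest_lcore) != 0)
 *		handle_error();	// no scheduler on dest_lcore
 */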