1 /*-
2  *   BSD LICENSE
3  *
4  *   Copyright(c) 2015 Intel Corporation. All rights reserved.
5  *   All rights reserved.
6  *
7  *   Redistribution and use in source and binary forms, with or without
8  *   modification, are permitted provided that the following conditions
9  *   are met:
10  *
11  *     * Redistributions of source code must retain the above copyright
12  *       notice, this list of conditions and the following disclaimer.
13  *     * Redistributions in binary form must reproduce the above copyright
14  *       notice, this list of conditions and the following disclaimer in
15  *       the documentation and/or other materials provided with the
16  *       distribution.
17  *     * Neither the name of Intel Corporation nor the names of its
18  *       contributors may be used to endorse or promote products derived
19  *       from this software without specific prior written permission.
20  *
21  *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22  *   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23  *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
24  *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
25  *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
26  *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
27  *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
28  *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
29  *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
30  *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
31  *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
32  */
33 
34 /*
 * Some portions of this software are derived from
 * https://github.com/halayli/lthread, which carries the following license.
37  *
38  * Copyright (C) 2012, Hasan Alayli <[email protected]>
39  *
40  * Redistribution and use in source and binary forms, with or without
41  * modification, are permitted provided that the following conditions
42  * are met:
43  * 1. Redistributions of source code must retain the above copyright
44  *    notice, this list of conditions and the following disclaimer.
45  * 2. Redistributions in binary form must reproduce the above copyright
46  *    notice, this list of conditions and the following disclaimer in the
47  *    documentation and/or other materials provided with the distribution.
48  *
49  * THIS SOFTWARE IS PROVIDED BY AUTHOR AND CONTRIBUTORS ``AS IS'' AND
50  * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
51  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
52  * ARE DISCLAIMED.  IN NO EVENT SHALL AUTHOR OR CONTRIBUTORS BE LIABLE
53  * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
54  * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
55  * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
56  * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
57  * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
58  * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
59  * SUCH DAMAGE.
60  */
61 
62 
63 #define RTE_MEM 1
64 
65 #include <stdio.h>
66 #include <stdlib.h>
67 #include <string.h>
68 #include <stdint.h>
69 #include <stddef.h>
70 #include <limits.h>
71 #include <inttypes.h>
72 #include <unistd.h>
73 #include <pthread.h>
74 #include <fcntl.h>
75 #include <sys/time.h>
76 #include <sys/mman.h>
77 #include <sched.h>
78 
79 #include <rte_prefetch.h>
80 #include <rte_per_lcore.h>
81 #include <rte_atomic.h>
82 #include <rte_atomic_64.h>
83 #include <rte_log.h>
84 #include <rte_common.h>
85 #include <rte_branch_prediction.h>
86 
87 #include "lthread_api.h"
88 #include "lthread_int.h"
89 #include "lthread_sched.h"
90 #include "lthread_objcache.h"
91 #include "lthread_timer.h"
92 #include "lthread_mutex.h"
93 #include "lthread_cond.h"
94 #include "lthread_tls.h"
95 #include "lthread_diag.h"
96 
97 /*
98  * This file implements the lthread scheduler
99  * The scheduler is the function lthread_run()
100  * This must be run as the main loop of an EAL thread.
101  *
102  * Currently once a scheduler is created it cannot be destroyed
103  * When a scheduler shuts down it is assumed that the application is terminating
104  */
105 
106 static rte_atomic16_t num_schedulers;
107 static rte_atomic16_t active_schedulers;
108 
109 /* one scheduler per lcore */
110 RTE_DEFINE_PER_LCORE(struct lthread_sched *, this_sched) = NULL;
111 
112 struct lthread_sched *schedcore[LTHREAD_MAX_LCORES];
113 
114 diag_callback diag_cb;
115 
116 uint64_t diag_mask;
117 
118 
119 /* constructor */
120 void lthread_sched_ctor(void) __attribute__ ((constructor));
121 void lthread_sched_ctor(void)
122 {
123 	memset(schedcore, 0, sizeof(schedcore));
124 	rte_atomic16_init(&num_schedulers);
125 	rte_atomic16_set(&num_schedulers, 1);
126 	rte_atomic16_init(&active_schedulers);
127 	rte_atomic16_set(&active_schedulers, 0);
128 	diag_cb = NULL;
129 }
130 
131 
132 enum sched_alloc_phase {
133 	SCHED_ALLOC_OK,
134 	SCHED_ALLOC_QNODE_POOL,
135 	SCHED_ALLOC_READY_QUEUE,
136 	SCHED_ALLOC_PREADY_QUEUE,
137 	SCHED_ALLOC_LTHREAD_CACHE,
138 	SCHED_ALLOC_STACK_CACHE,
139 	SCHED_ALLOC_PERLT_CACHE,
140 	SCHED_ALLOC_TLS_CACHE,
141 	SCHED_ALLOC_COND_CACHE,
142 	SCHED_ALLOC_MUTEX_CACHE,
143 };
144 
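/*
 * Allocate the per-scheduler queue node pool, ready queues and object caches.
 * Returns SCHED_ALLOC_OK (0) on success; on failure it returns the enum value
 * identifying the allocation that failed, after the switch statement below
 * has rolled back everything allocated up to that point.
 */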
145 static int
146 _lthread_sched_alloc_resources(struct lthread_sched *new_sched)
147 {
148 	int alloc_status;
149 
150 	do {
151 		/* Initialize per scheduler queue node pool */
152 		alloc_status = SCHED_ALLOC_QNODE_POOL;
153 		new_sched->qnode_pool =
154 			_qnode_pool_create("qnode pool", LTHREAD_PREALLOC);
155 		if (new_sched->qnode_pool == NULL)
156 			break;
157 
158 		/* Initialize per scheduler local ready queue */
159 		alloc_status = SCHED_ALLOC_READY_QUEUE;
160 		new_sched->ready = _lthread_queue_create("ready queue");
161 		if (new_sched->ready == NULL)
162 			break;
163 
164 		/* Initialize per scheduler local peer ready queue */
165 		alloc_status = SCHED_ALLOC_PREADY_QUEUE;
166 		new_sched->pready = _lthread_queue_create("pready queue");
167 		if (new_sched->pready == NULL)
168 			break;
169 
170 		/* Initialize per scheduler local free lthread cache */
171 		alloc_status = SCHED_ALLOC_LTHREAD_CACHE;
172 		new_sched->lthread_cache =
173 			_lthread_objcache_create("lthread cache",
174 						sizeof(struct lthread),
175 						LTHREAD_PREALLOC);
176 		if (new_sched->lthread_cache == NULL)
177 			break;
178 
179 		/* Initialize per scheduler local free stack cache */
180 		alloc_status = SCHED_ALLOC_STACK_CACHE;
181 		new_sched->stack_cache =
182 			_lthread_objcache_create("stack_cache",
183 						sizeof(struct lthread_stack),
184 						LTHREAD_PREALLOC);
185 		if (new_sched->stack_cache == NULL)
186 			break;
187 
188 		/* Initialize per scheduler local free per lthread data cache */
189 		alloc_status = SCHED_ALLOC_PERLT_CACHE;
190 		new_sched->per_lthread_cache =
191 			_lthread_objcache_create("per_lt cache",
192 						RTE_PER_LTHREAD_SECTION_SIZE,
193 						LTHREAD_PREALLOC);
194 		if (new_sched->per_lthread_cache == NULL)
195 			break;
196 
197 		/* Initialize per scheduler local free tls cache */
198 		alloc_status = SCHED_ALLOC_TLS_CACHE;
199 		new_sched->tls_cache =
200 			_lthread_objcache_create("TLS cache",
201 						sizeof(struct lthread_tls),
202 						LTHREAD_PREALLOC);
203 		if (new_sched->tls_cache == NULL)
204 			break;
205 
206 		/* Initialize per scheduler local free cond var cache */
207 		alloc_status = SCHED_ALLOC_COND_CACHE;
208 		new_sched->cond_cache =
209 			_lthread_objcache_create("cond cache",
210 						sizeof(struct lthread_cond),
211 						LTHREAD_PREALLOC);
212 		if (new_sched->cond_cache == NULL)
213 			break;
214 
215 		/* Initialize per scheduler local free mutex cache */
216 		alloc_status = SCHED_ALLOC_MUTEX_CACHE;
217 		new_sched->mutex_cache =
218 			_lthread_objcache_create("mutex cache",
219 						sizeof(struct lthread_mutex),
220 						LTHREAD_PREALLOC);
221 		if (new_sched->mutex_cache == NULL)
222 			break;
223 
224 		alloc_status = SCHED_ALLOC_OK;
225 	} while (0);
226 
227 	/* roll back on any failure */
228 	switch (alloc_status) {
229 	case SCHED_ALLOC_MUTEX_CACHE:
230 		_lthread_objcache_destroy(new_sched->cond_cache);
231 		/* fall through */
232 	case SCHED_ALLOC_COND_CACHE:
233 		_lthread_objcache_destroy(new_sched->tls_cache);
234 		/* fall through */
235 	case SCHED_ALLOC_TLS_CACHE:
236 		_lthread_objcache_destroy(new_sched->per_lthread_cache);
237 		/* fall through */
238 	case SCHED_ALLOC_PERLT_CACHE:
239 		_lthread_objcache_destroy(new_sched->stack_cache);
240 		/* fall through */
241 	case SCHED_ALLOC_STACK_CACHE:
242 		_lthread_objcache_destroy(new_sched->lthread_cache);
243 		/* fall through */
244 	case SCHED_ALLOC_LTHREAD_CACHE:
245 		_lthread_queue_destroy(new_sched->pready);
246 		/* fall through */
247 	case SCHED_ALLOC_PREADY_QUEUE:
248 		_lthread_queue_destroy(new_sched->ready);
249 		/* fall through */
250 	case SCHED_ALLOC_READY_QUEUE:
251 		_qnode_pool_destroy(new_sched->qnode_pool);
252 		/* fall through */
253 	case SCHED_ALLOC_QNODE_POOL:
254 		/* fall through */
255 	case SCHED_ALLOC_OK:
256 		break;
257 	}
258 	return alloc_status;
259 }
260 
261 
262 /*
263  * Create a scheduler on the current lcore
264  */
265 struct lthread_sched *_lthread_sched_create(size_t stack_size)
266 {
267 	int status;
268 	struct lthread_sched *new_sched;
269 	unsigned lcoreid = rte_lcore_id();
270 
271 	RTE_ASSERT(stack_size <= LTHREAD_MAX_STACK_SIZE);
272 
273 	if (stack_size == 0)
274 		stack_size = LTHREAD_MAX_STACK_SIZE;
275 
276 	new_sched =
277 	     rte_calloc_socket(NULL, 1, sizeof(struct lthread_sched),
278 				RTE_CACHE_LINE_SIZE,
279 				rte_socket_id());
280 	if (new_sched == NULL) {
281 		RTE_LOG(CRIT, LTHREAD,
282 			"Failed to allocate memory for scheduler\n");
283 		return NULL;
284 	}
285 
286 	_lthread_key_pool_init();
287 
288 	new_sched->stack_size = stack_size;
289 	new_sched->birth = rte_rdtsc();
290 	THIS_SCHED = new_sched;
291 
292 	status = _lthread_sched_alloc_resources(new_sched);
293 	if (status != SCHED_ALLOC_OK) {
		RTE_LOG(CRIT, LTHREAD,
			"Failed to allocate resources for scheduler, error code = %d\n",
			status);
297 		rte_free(new_sched);
298 		return NULL;
299 	}
300 
	memset(&new_sched->ctx, 0, sizeof(struct ctx));
302 
303 	new_sched->lcore_id = lcoreid;
304 
305 	schedcore[lcoreid] = new_sched;
306 
307 	new_sched->run_flag = 1;
308 
309 	DIAG_EVENT(new_sched, LT_DIAG_SCHED_CREATE, rte_lcore_id(), 0);
310 
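	/* ensure the schedcore[] entry and run_flag written above are
	 * globally visible before other lcores begin using this scheduler
	 */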
311 	rte_wmb();
312 	return new_sched;
313 }
314 
315 /*
316  * Set the number of schedulers in the system
317  */
318 int lthread_num_schedulers_set(int num)
319 {
320 	rte_atomic16_set(&num_schedulers, num);
321 	return (int)rte_atomic16_read(&num_schedulers);
322 }
323 
324 /*
 * Return the number of active schedulers
326  */
327 int lthread_active_schedulers(void)
328 {
329 	return (int)rte_atomic16_read(&active_schedulers);
330 }
331 
332 
333 /**
 * Shut down the scheduler running on the specified lcore
335  */
336 void lthread_scheduler_shutdown(unsigned lcoreid)
337 {
338 	uint64_t coreid = (uint64_t) lcoreid;
339 
340 	if (coreid < LTHREAD_MAX_LCORES) {
341 		if (schedcore[coreid] != NULL)
342 			schedcore[coreid]->run_flag = 0;
343 	}
344 }
345 
346 /**
 * Shut down all schedulers
348  */
349 void lthread_scheduler_shutdown_all(void)
350 {
351 	uint64_t i;
352 
353 	/*
354 	 * give time for all schedulers to have started
355 	 * Note we use sched_yield() rather than pthread_yield() to allow
356 	 * for the possibility of a pthread wrapper on lthread_yield(),
357 	 * something that is not possible unless the scheduler is running.
358 	 */
359 	while (rte_atomic16_read(&active_schedulers) <
360 	       rte_atomic16_read(&num_schedulers))
361 		sched_yield();
362 
363 	for (i = 0; i < LTHREAD_MAX_LCORES; i++) {
364 		if (schedcore[i] != NULL)
365 			schedcore[i]->run_flag = 0;
366 	}
367 }
368 
369 /*
370  * Resume a suspended lthread
371  */
372 static inline void
373 _lthread_resume(struct lthread *lt) __attribute__ ((always_inline));
374 static inline void _lthread_resume(struct lthread *lt)
375 {
376 	struct lthread_sched *sched = THIS_SCHED;
377 	struct lthread_stack *s;
378 	uint64_t state = lt->state;
379 #if LTHREAD_DIAG
380 	int init = 0;
381 #endif
382 
383 	sched->current_lthread = lt;
384 
385 	if (state & (BIT(ST_LT_CANCELLED) | BIT(ST_LT_EXITED))) {
386 		/* if detached we can free the thread now */
387 		if (state & BIT(ST_LT_DETACH)) {
388 			_lthread_free(lt);
389 			sched->current_lthread = NULL;
390 			return;
391 		}
392 	}
393 
394 	if (state & BIT(ST_LT_INIT)) {
395 		/* first time this thread has been run */
396 		/* assign thread to this scheduler */
397 		lt->sched = THIS_SCHED;
398 
399 		/* allocate stack */
400 		s = _stack_alloc();
401 
402 		lt->stack_container = s;
403 		_lthread_set_stack(lt, s->stack, s->stack_size);
404 
405 		/* allocate memory for TLS used by this thread */
406 		_lthread_tls_alloc(lt);
407 
408 		lt->state = BIT(ST_LT_READY);
409 #if LTHREAD_DIAG
410 		init = 1;
411 #endif
412 	}
413 
414 	DIAG_EVENT(lt, LT_DIAG_LTHREAD_RESUMED, init, lt);
415 
416 	/* switch to the new thread */
417 	ctx_switch(&lt->ctx, &sched->ctx);
418 
	/* If posting to a queue that could be read by another lcore
	 * we defer the queue write until now, to ensure the context has been
	 * saved before the other core tries to resume it.
	 * This applies to blocking on a mutex or condition variable, and to
	 * lthread_set_affinity().
	 */
424 	if (lt->pending_wr_queue != NULL) {
425 		struct lthread_queue *dest = lt->pending_wr_queue;
426 
427 		lt->pending_wr_queue = NULL;
428 
429 		/* queue the current thread to the specified queue */
430 		_lthread_queue_insert_mp(dest, lt);
431 	}
432 
433 	sched->current_lthread = NULL;
434 }
435 
436 /*
437  * Handle sleep timer expiry
438 */
439 void
440 _sched_timer_cb(struct rte_timer *tim, void *arg)
441 {
442 	struct lthread *lt = (struct lthread *) arg;
443 	uint64_t state = lt->state;
444 
445 	DIAG_EVENT(lt, LT_DIAG_LTHREAD_TMR_EXPIRED, &lt->tim, 0);
446 
447 	rte_timer_stop(tim);
448 
449 	if (lt->state & BIT(ST_LT_CANCELLED))
450 		(THIS_SCHED)->nb_blocked_threads--;
451 
452 	lt->state = state | BIT(ST_LT_EXPIRED);
453 	_lthread_resume(lt);
454 	lt->state = state & CLEARBIT(ST_LT_EXPIRED);
455 }
456 
457 
458 
459 /*
 * Return 0 if the scheduler still has pending work, or 1 if it is done
 * and can exit.
461  */
462 static inline int _lthread_sched_isdone(struct lthread_sched *sched)
463 {
464 	return (sched->run_flag == 0) &&
465 			(_lthread_queue_empty(sched->ready)) &&
466 			(_lthread_queue_empty(sched->pready)) &&
467 			(sched->nb_blocked_threads == 0);
468 }
469 
470 /*
471  * Wait for all schedulers to start
472  */
473 static inline void _lthread_schedulers_sync_start(void)
474 {
475 	rte_atomic16_inc(&active_schedulers);
476 
	/* Wait for all lthread schedulers to start.
	 * Note that we use sched_yield() rather than pthread_yield() to allow
	 * for the possibility that pthread_yield() is wrapped onto
	 * lthread_yield(), something that cannot work unless a scheduler is
	 * running.
	 */
482 	while (rte_atomic16_read(&active_schedulers) <
483 	       rte_atomic16_read(&num_schedulers))
484 		sched_yield();
485 
486 }
487 
488 /*
489  * Wait for all schedulers to stop
490  */
491 static inline void _lthread_schedulers_sync_stop(void)
492 {
493 	rte_atomic16_dec(&active_schedulers);
494 	rte_atomic16_dec(&num_schedulers);
495 
	/* Wait for all schedulers to stop.
	 * Note that we use sched_yield() rather than pthread_yield() to allow
	 * for the possibility that pthread_yield() is wrapped onto
	 * lthread_yield(), something that cannot work unless a scheduler is
	 * running.
	 */
501 	while (rte_atomic16_read(&active_schedulers) > 0)
502 		sched_yield();
503 
504 }
505 
506 
507 /*
508  * Run the lthread scheduler
509  * This loop is the heart of the system
510  */
511 void lthread_run(void)
512 {
513 
514 	struct lthread_sched *sched = THIS_SCHED;
515 	struct lthread *lt = NULL;
516 
517 	RTE_LOG(INFO, LTHREAD,
518 		"starting scheduler %p on lcore %u phys core %u\n",
519 		sched, rte_lcore_id(),
520 		rte_lcore_index(rte_lcore_id()));
521 
522 	/* if more than one, wait for all schedulers to start */
523 	_lthread_schedulers_sync_start();
524 
525 
526 	/*
527 	 * This is the main scheduling loop
528 	 * So long as there are tasks in existence we run this loop.
529 	 * We check for:-
530 	 *   expired timers,
531 	 *   the local ready queue,
532 	 *   and the peer ready queue,
533 	 *
534 	 * and resume lthreads ad infinitum.
535 	 */
536 	while (!_lthread_sched_isdone(sched)) {
537 
538 		rte_timer_manage();
539 
540 		lt = _lthread_queue_poll(sched->ready);
541 		if (lt != NULL)
542 			_lthread_resume(lt);
543 		lt = _lthread_queue_poll(sched->pready);
544 		if (lt != NULL)
545 			_lthread_resume(lt);
546 	}
547 
548 
	/* if more than one, wait for all schedulers to stop */
550 	_lthread_schedulers_sync_stop();
551 
552 	(THIS_SCHED) = NULL;
553 
554 	RTE_LOG(INFO, LTHREAD,
555 		"stopping scheduler %p on lcore %u phys core %u\n",
556 		sched, rte_lcore_id(),
557 		rte_lcore_index(rte_lcore_id()));
558 	fflush(stdout);
559 }
560 
561 /*
562  * Return the scheduler for this lcore
563  *
564  */
565 struct lthread_sched *_lthread_sched_get(int lcore_id)
566 {
	if (lcore_id < 0 || lcore_id >= LTHREAD_MAX_LCORES)
		return NULL;
569 	return schedcore[lcore_id];
570 }
571 
572 /*
573  * migrate the current thread to another scheduler running
574  * on the specified lcore.
575  */
576 int lthread_set_affinity(unsigned lcoreid)
577 {
578 	struct lthread *lt = THIS_LTHREAD;
579 	struct lthread_sched *dest_sched;
580 
	if (unlikely(lcoreid >= LTHREAD_MAX_LCORES))
		return POSIX_ERRNO(EINVAL);
583 
584 
585 	DIAG_EVENT(lt, LT_DIAG_LTHREAD_AFFINITY, lcoreid, 0);
586 
587 	dest_sched = schedcore[lcoreid];
588 
589 	if (unlikely(dest_sched == NULL))
590 		return POSIX_ERRNO(EINVAL);
591 
592 	if (likely(dest_sched != THIS_SCHED)) {
593 		lt->sched = dest_sched;
594 		lt->pending_wr_queue = dest_sched->pready;
595 		_affinitize();
596 		return 0;
597 	}
598 	return 0;
599 }
600