1 /*-
2  *   BSD LICENSE
3  *
4  *   Copyright(c) 2015 Intel Corporation. All rights reserved.
5  *   All rights reserved.
6  *
7  *   Redistribution and use in source and binary forms, with or without
8  *   modification, are permitted provided that the following conditions
9  *   are met:
10  *
11  *     * Redistributions of source code must retain the above copyright
12  *       notice, this list of conditions and the following disclaimer.
13  *     * Redistributions in binary form must reproduce the above copyright
14  *       notice, this list of conditions and the following disclaimer in
15  *       the documentation and/or other materials provided with the
16  *       distribution.
17  *     * Neither the name of Intel Corporation nor the names of its
18  *       contributors may be used to endorse or promote products derived
19  *       from this software without specific prior written permission.
20  *
21  *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22  *   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23  *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
24  *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
25  *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
26  *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
27  *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
28  *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
29  *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
30  *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
31  *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
32  */
33 
34 /*
 * Some portions of this software are derived from
 * https://github.com/halayli/lthread, which carries the following license.
37  *
38  * Copyright (C) 2012, Hasan Alayli <[email protected]>
39  *
40  * Redistribution and use in source and binary forms, with or without
41  * modification, are permitted provided that the following conditions
42  * are met:
43  * 1. Redistributions of source code must retain the above copyright
44  *    notice, this list of conditions and the following disclaimer.
45  * 2. Redistributions in binary form must reproduce the above copyright
46  *    notice, this list of conditions and the following disclaimer in the
47  *    documentation and/or other materials provided with the distribution.
48  *
49  * THIS SOFTWARE IS PROVIDED BY AUTHOR AND CONTRIBUTORS ``AS IS'' AND
50  * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
51  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
52  * ARE DISCLAIMED.  IN NO EVENT SHALL AUTHOR OR CONTRIBUTORS BE LIABLE
53  * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
54  * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
55  * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
56  * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
57  * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
58  * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
59  * SUCH DAMAGE.
60  */
61 
62 
63 #define RTE_MEM 1
64 
65 #include <stdio.h>
66 #include <stdlib.h>
67 #include <string.h>
68 #include <stdint.h>
69 #include <stddef.h>
70 #include <limits.h>
71 #include <inttypes.h>
72 #include <unistd.h>
73 #include <pthread.h>
74 #include <fcntl.h>
75 #include <sys/time.h>
76 #include <sys/mman.h>
77 #include <sched.h>
78 
79 #include <rte_prefetch.h>
80 #include <rte_per_lcore.h>
81 #include <rte_atomic.h>
82 #include <rte_atomic_64.h>
83 #include <rte_log.h>
84 #include <rte_common.h>
85 #include <rte_branch_prediction.h>
86 
87 #include "lthread_api.h"
88 #include "lthread_int.h"
89 #include "lthread_sched.h"
90 #include "lthread_objcache.h"
91 #include "lthread_timer.h"
92 #include "lthread_mutex.h"
93 #include "lthread_cond.h"
94 #include "lthread_tls.h"
95 #include "lthread_diag.h"
96 
97 /*
98  * This file implements the lthread scheduler
99  * The scheduler is the function lthread_run()
100  * This must be run as the main loop of an EAL thread.
101  *
102  * Currently once a scheduler is created it cannot be destroyed
103  * When a scheduler shuts down it is assumed that the application is terminating
104  */
105 
106 static rte_atomic16_t num_schedulers;
107 static rte_atomic16_t active_schedulers;
108 
109 /* one scheduler per lcore */
110 RTE_DEFINE_PER_LCORE(struct lthread_sched *, this_sched) = NULL;
111 
112 struct lthread_sched *schedcore[LTHREAD_MAX_LCORES];
113 
114 diag_callback diag_cb;
115 
116 uint64_t diag_mask;
117 
118 
119 /* constructor */
120 RTE_INIT(lthread_sched_ctor)
121 {
122 	memset(schedcore, 0, sizeof(schedcore));
123 	rte_atomic16_init(&num_schedulers);
124 	rte_atomic16_set(&num_schedulers, 1);
125 	rte_atomic16_init(&active_schedulers);
126 	rte_atomic16_set(&active_schedulers, 0);
127 	diag_cb = NULL;
128 }
129 
130 
131 enum sched_alloc_phase {
132 	SCHED_ALLOC_OK,
133 	SCHED_ALLOC_QNODE_POOL,
134 	SCHED_ALLOC_READY_QUEUE,
135 	SCHED_ALLOC_PREADY_QUEUE,
136 	SCHED_ALLOC_LTHREAD_CACHE,
137 	SCHED_ALLOC_STACK_CACHE,
138 	SCHED_ALLOC_PERLT_CACHE,
139 	SCHED_ALLOC_TLS_CACHE,
140 	SCHED_ALLOC_COND_CACHE,
141 	SCHED_ALLOC_MUTEX_CACHE,
142 };
143 
144 static int
145 _lthread_sched_alloc_resources(struct lthread_sched *new_sched)
146 {
147 	int alloc_status;
148 
149 	do {
150 		/* Initialize per scheduler queue node pool */
151 		alloc_status = SCHED_ALLOC_QNODE_POOL;
152 		new_sched->qnode_pool =
153 			_qnode_pool_create("qnode pool", LTHREAD_PREALLOC);
154 		if (new_sched->qnode_pool == NULL)
155 			break;
156 
157 		/* Initialize per scheduler local ready queue */
158 		alloc_status = SCHED_ALLOC_READY_QUEUE;
159 		new_sched->ready = _lthread_queue_create("ready queue");
160 		if (new_sched->ready == NULL)
161 			break;
162 
163 		/* Initialize per scheduler local peer ready queue */
164 		alloc_status = SCHED_ALLOC_PREADY_QUEUE;
165 		new_sched->pready = _lthread_queue_create("pready queue");
166 		if (new_sched->pready == NULL)
167 			break;
168 
169 		/* Initialize per scheduler local free lthread cache */
170 		alloc_status = SCHED_ALLOC_LTHREAD_CACHE;
171 		new_sched->lthread_cache =
172 			_lthread_objcache_create("lthread cache",
173 						sizeof(struct lthread),
174 						LTHREAD_PREALLOC);
175 		if (new_sched->lthread_cache == NULL)
176 			break;
177 
178 		/* Initialize per scheduler local free stack cache */
179 		alloc_status = SCHED_ALLOC_STACK_CACHE;
180 		new_sched->stack_cache =
181 			_lthread_objcache_create("stack_cache",
182 						sizeof(struct lthread_stack),
183 						LTHREAD_PREALLOC);
184 		if (new_sched->stack_cache == NULL)
185 			break;
186 
187 		/* Initialize per scheduler local free per lthread data cache */
188 		alloc_status = SCHED_ALLOC_PERLT_CACHE;
189 		new_sched->per_lthread_cache =
190 			_lthread_objcache_create("per_lt cache",
191 						RTE_PER_LTHREAD_SECTION_SIZE,
192 						LTHREAD_PREALLOC);
193 		if (new_sched->per_lthread_cache == NULL)
194 			break;
195 
196 		/* Initialize per scheduler local free tls cache */
197 		alloc_status = SCHED_ALLOC_TLS_CACHE;
198 		new_sched->tls_cache =
199 			_lthread_objcache_create("TLS cache",
200 						sizeof(struct lthread_tls),
201 						LTHREAD_PREALLOC);
202 		if (new_sched->tls_cache == NULL)
203 			break;
204 
205 		/* Initialize per scheduler local free cond var cache */
206 		alloc_status = SCHED_ALLOC_COND_CACHE;
207 		new_sched->cond_cache =
208 			_lthread_objcache_create("cond cache",
209 						sizeof(struct lthread_cond),
210 						LTHREAD_PREALLOC);
211 		if (new_sched->cond_cache == NULL)
212 			break;
213 
214 		/* Initialize per scheduler local free mutex cache */
215 		alloc_status = SCHED_ALLOC_MUTEX_CACHE;
216 		new_sched->mutex_cache =
217 			_lthread_objcache_create("mutex cache",
218 						sizeof(struct lthread_mutex),
219 						LTHREAD_PREALLOC);
220 		if (new_sched->mutex_cache == NULL)
221 			break;
222 
223 		alloc_status = SCHED_ALLOC_OK;
224 	} while (0);
225 
226 	/* roll back on any failure */
227 	switch (alloc_status) {
228 	case SCHED_ALLOC_MUTEX_CACHE:
229 		_lthread_objcache_destroy(new_sched->cond_cache);
230 		/* fall through */
231 	case SCHED_ALLOC_COND_CACHE:
232 		_lthread_objcache_destroy(new_sched->tls_cache);
233 		/* fall through */
234 	case SCHED_ALLOC_TLS_CACHE:
235 		_lthread_objcache_destroy(new_sched->per_lthread_cache);
236 		/* fall through */
237 	case SCHED_ALLOC_PERLT_CACHE:
238 		_lthread_objcache_destroy(new_sched->stack_cache);
239 		/* fall through */
240 	case SCHED_ALLOC_STACK_CACHE:
241 		_lthread_objcache_destroy(new_sched->lthread_cache);
242 		/* fall through */
243 	case SCHED_ALLOC_LTHREAD_CACHE:
244 		_lthread_queue_destroy(new_sched->pready);
245 		/* fall through */
246 	case SCHED_ALLOC_PREADY_QUEUE:
247 		_lthread_queue_destroy(new_sched->ready);
248 		/* fall through */
249 	case SCHED_ALLOC_READY_QUEUE:
250 		_qnode_pool_destroy(new_sched->qnode_pool);
251 		/* fall through */
252 	case SCHED_ALLOC_QNODE_POOL:
253 		/* fall through */
254 	case SCHED_ALLOC_OK:
255 		break;
256 	}
257 	return alloc_status;
258 }
259 
260 
261 /*
262  * Create a scheduler on the current lcore
263  */
264 struct lthread_sched *_lthread_sched_create(size_t stack_size)
265 {
266 	int status;
267 	struct lthread_sched *new_sched;
268 	unsigned lcoreid = rte_lcore_id();
269 
270 	RTE_ASSERT(stack_size <= LTHREAD_MAX_STACK_SIZE);
271 
272 	if (stack_size == 0)
273 		stack_size = LTHREAD_MAX_STACK_SIZE;
274 
275 	new_sched =
276 	     rte_calloc_socket(NULL, 1, sizeof(struct lthread_sched),
277 				RTE_CACHE_LINE_SIZE,
278 				rte_socket_id());
279 	if (new_sched == NULL) {
280 		RTE_LOG(CRIT, LTHREAD,
281 			"Failed to allocate memory for scheduler\n");
282 		return NULL;
283 	}
284 
285 	_lthread_key_pool_init();
286 
287 	new_sched->stack_size = stack_size;
288 	new_sched->birth = rte_rdtsc();
289 	THIS_SCHED = new_sched;
290 
291 	status = _lthread_sched_alloc_resources(new_sched);
292 	if (status != SCHED_ALLOC_OK) {
293 		RTE_LOG(CRIT, LTHREAD,
294 			"Failed to allocate resources for scheduler code = %d\n",
295 			status);
296 		rte_free(new_sched);
297 		return NULL;
298 	}
299 
	memset(&new_sched->ctx, 0, sizeof(struct ctx));
301 
302 	new_sched->lcore_id = lcoreid;
303 
304 	schedcore[lcoreid] = new_sched;
305 
306 	new_sched->run_flag = 1;
307 
308 	DIAG_EVENT(new_sched, LT_DIAG_SCHED_CREATE, rte_lcore_id(), 0);
309 
310 	rte_wmb();
311 	return new_sched;
312 }
313 
314 /*
315  * Set the number of schedulers in the system
316  */
317 int lthread_num_schedulers_set(int num)
318 {
319 	rte_atomic16_set(&num_schedulers, num);
320 	return (int)rte_atomic16_read(&num_schedulers);
321 }
322 
323 /*
324  * Return the number of schedulers active
325  */
326 int lthread_active_schedulers(void)
327 {
328 	return (int)rte_atomic16_read(&active_schedulers);
329 }
330 
331 
332 /**
333  * shutdown the scheduler running on the specified lcore
334  */
335 void lthread_scheduler_shutdown(unsigned lcoreid)
336 {
337 	uint64_t coreid = (uint64_t) lcoreid;
338 
339 	if (coreid < LTHREAD_MAX_LCORES) {
340 		if (schedcore[coreid] != NULL)
341 			schedcore[coreid]->run_flag = 0;
342 	}
343 }
344 
345 /**
346  * shutdown all schedulers
347  */
348 void lthread_scheduler_shutdown_all(void)
349 {
350 	uint64_t i;
351 
352 	/*
353 	 * give time for all schedulers to have started
354 	 * Note we use sched_yield() rather than pthread_yield() to allow
355 	 * for the possibility of a pthread wrapper on lthread_yield(),
356 	 * something that is not possible unless the scheduler is running.
357 	 */
358 	while (rte_atomic16_read(&active_schedulers) <
359 	       rte_atomic16_read(&num_schedulers))
360 		sched_yield();
361 
362 	for (i = 0; i < LTHREAD_MAX_LCORES; i++) {
363 		if (schedcore[i] != NULL)
364 			schedcore[i]->run_flag = 0;
365 	}
366 }
367 
368 /*
369  * Resume a suspended lthread
370  */
371 static __rte_always_inline void
372 _lthread_resume(struct lthread *lt);
373 static inline void _lthread_resume(struct lthread *lt)
374 {
375 	struct lthread_sched *sched = THIS_SCHED;
376 	struct lthread_stack *s;
377 	uint64_t state = lt->state;
378 #if LTHREAD_DIAG
379 	int init = 0;
380 #endif
381 
382 	sched->current_lthread = lt;
383 
384 	if (state & (BIT(ST_LT_CANCELLED) | BIT(ST_LT_EXITED))) {
385 		/* if detached we can free the thread now */
386 		if (state & BIT(ST_LT_DETACH)) {
387 			_lthread_free(lt);
388 			sched->current_lthread = NULL;
389 			return;
390 		}
391 	}
392 
393 	if (state & BIT(ST_LT_INIT)) {
394 		/* first time this thread has been run */
395 		/* assign thread to this scheduler */
396 		lt->sched = THIS_SCHED;
397 
398 		/* allocate stack */
399 		s = _stack_alloc();
400 
401 		lt->stack_container = s;
402 		_lthread_set_stack(lt, s->stack, s->stack_size);
403 
404 		/* allocate memory for TLS used by this thread */
405 		_lthread_tls_alloc(lt);
406 
407 		lt->state = BIT(ST_LT_READY);
408 #if LTHREAD_DIAG
409 		init = 1;
410 #endif
411 	}
412 
413 	DIAG_EVENT(lt, LT_DIAG_LTHREAD_RESUMED, init, lt);
414 
415 	/* switch to the new thread */
416 	ctx_switch(&lt->ctx, &sched->ctx);
417 
	/* If posting to a queue that could be read by another lcore,
	 * we defer the queue write until now, to ensure the context has
	 * been saved before the other core tries to resume it.
	 * This applies to blocking on a mutex or condition variable, and
	 * to lthread_set_affinity().
	 */
423 	if (lt->pending_wr_queue != NULL) {
424 		struct lthread_queue *dest = lt->pending_wr_queue;
425 
426 		lt->pending_wr_queue = NULL;
427 
428 		/* queue the current thread to the specified queue */
429 		_lthread_queue_insert_mp(dest, lt);
430 	}
431 
432 	sched->current_lthread = NULL;
433 }
434 
435 /*
436  * Handle sleep timer expiry
437 */
438 void
439 _sched_timer_cb(struct rte_timer *tim, void *arg)
440 {
441 	struct lthread *lt = (struct lthread *) arg;
442 	uint64_t state = lt->state;
443 
444 	DIAG_EVENT(lt, LT_DIAG_LTHREAD_TMR_EXPIRED, &lt->tim, 0);
445 
446 	rte_timer_stop(tim);
447 
	/* a cancelled lthread no longer counts towards this scheduler's
	 * blocked-thread count
	 */
	if (lt->state & BIT(ST_LT_CANCELLED))
		(THIS_SCHED)->nb_blocked_threads--;

	/* resume with the EXPIRED bit set so the lthread can tell that it
	 * was woken by a timeout; clear the bit again when control returns
	 * to the scheduler
	 */
	lt->state = state | BIT(ST_LT_EXPIRED);
	_lthread_resume(lt);
	lt->state = state & CLEARBIT(ST_LT_EXPIRED);
454 }
455 
456 
457 
458 /*
459  * Returns 0 if there is a pending job in scheduler or 1 if done and can exit.
460  */
461 static inline int _lthread_sched_isdone(struct lthread_sched *sched)
462 {
463 	return (sched->run_flag == 0) &&
464 			(_lthread_queue_empty(sched->ready)) &&
465 			(_lthread_queue_empty(sched->pready)) &&
466 			(sched->nb_blocked_threads == 0);
467 }
468 
469 /*
470  * Wait for all schedulers to start
471  */
472 static inline void _lthread_schedulers_sync_start(void)
473 {
474 	rte_atomic16_inc(&active_schedulers);
475 
	/* Wait for all lthread schedulers to start.
	 * Note: we use sched_yield() rather than pthread_yield() to allow
	 * for the possibility that pthread_yield() is wrapped onto
	 * lthread_yield(), something that is not possible unless a
	 * scheduler is running.
	 */
481 	while (rte_atomic16_read(&active_schedulers) <
482 	       rte_atomic16_read(&num_schedulers))
483 		sched_yield();
484 
485 }
486 
487 /*
488  * Wait for all schedulers to stop
489  */
490 static inline void _lthread_schedulers_sync_stop(void)
491 {
492 	rte_atomic16_dec(&active_schedulers);
493 	rte_atomic16_dec(&num_schedulers);
494 
	/* Wait for all schedulers to stop.
	 * Note: we use sched_yield() rather than pthread_yield() to allow
	 * for the possibility that pthread_yield() is wrapped onto
	 * lthread_yield(), something that is not possible unless a
	 * scheduler is running.
	 */
500 	while (rte_atomic16_read(&active_schedulers) > 0)
501 		sched_yield();
502 
503 }
504 
505 
506 /*
507  * Run the lthread scheduler
508  * This loop is the heart of the system
509  */
510 void lthread_run(void)
511 {
512 
513 	struct lthread_sched *sched = THIS_SCHED;
514 	struct lthread *lt = NULL;
515 
516 	RTE_LOG(INFO, LTHREAD,
517 		"starting scheduler %p on lcore %u phys core %u\n",
518 		sched, rte_lcore_id(),
519 		rte_lcore_index(rte_lcore_id()));
520 
521 	/* if more than one, wait for all schedulers to start */
522 	_lthread_schedulers_sync_start();
523 
524 
525 	/*
526 	 * This is the main scheduling loop
527 	 * So long as there are tasks in existence we run this loop.
528 	 * We check for:-
529 	 *   expired timers,
530 	 *   the local ready queue,
531 	 *   and the peer ready queue,
532 	 *
533 	 * and resume lthreads ad infinitum.
534 	 */
535 	while (!_lthread_sched_isdone(sched)) {
536 
537 		rte_timer_manage();
538 
539 		lt = _lthread_queue_poll(sched->ready);
540 		if (lt != NULL)
541 			_lthread_resume(lt);
542 		lt = _lthread_queue_poll(sched->pready);
543 		if (lt != NULL)
544 			_lthread_resume(lt);
545 	}
546 
547 
	/* if more than one, wait for all schedulers to stop */
549 	_lthread_schedulers_sync_stop();
550 
551 	(THIS_SCHED) = NULL;
552 
553 	RTE_LOG(INFO, LTHREAD,
554 		"stopping scheduler %p on lcore %u phys core %u\n",
555 		sched, rte_lcore_id(),
556 		rte_lcore_index(rte_lcore_id()));
557 	fflush(stdout);
558 }
559 
560 /*
561  * Return the scheduler for this lcore
562  *
563  */
564 struct lthread_sched *_lthread_sched_get(unsigned int lcore_id)
565 {
566 	struct lthread_sched *res = NULL;
567 
568 	if (lcore_id < LTHREAD_MAX_LCORES)
569 		res = schedcore[lcore_id];
570 
571 	return res;
572 }
573 
574 /*
575  * migrate the current thread to another scheduler running
576  * on the specified lcore.
577  */
578 int lthread_set_affinity(unsigned lcoreid)
579 {
580 	struct lthread *lt = THIS_LTHREAD;
581 	struct lthread_sched *dest_sched;
582 
583 	if (unlikely(lcoreid >= LTHREAD_MAX_LCORES))
584 		return POSIX_ERRNO(EINVAL);
585 
586 	DIAG_EVENT(lt, LT_DIAG_LTHREAD_AFFINITY, lcoreid, 0);
587 
588 	dest_sched = schedcore[lcoreid];
589 
590 	if (unlikely(dest_sched == NULL))
591 		return POSIX_ERRNO(EINVAL);
592 
593 	if (likely(dest_sched != THIS_SCHED)) {
594 		lt->sched = dest_sched;
595 		lt->pending_wr_queue = dest_sched->pready;
596 		_affinitize();
597 		return 0;
598 	}
599 	return 0;
600 }
601