xref: /f-stack/dpdk/lib/librte_rcu/rte_rcu_qsbr.c (revision 2d9fd380)
1 /* SPDX-License-Identifier: BSD-3-Clause
2  *
3  * Copyright (c) 2018-2020 Arm Limited
4  */
5 
6 #include <stdio.h>
7 #include <string.h>
8 #include <stdint.h>
9 #include <inttypes.h>
10 #include <errno.h>
11 
12 #include <rte_common.h>
13 #include <rte_log.h>
14 #include <rte_memory.h>
15 #include <rte_malloc.h>
16 #include <rte_eal.h>
17 #include <rte_atomic.h>
18 #include <rte_per_lcore.h>
19 #include <rte_lcore.h>
20 #include <rte_errno.h>
21 #include <rte_ring_elem.h>
22 
23 #include "rte_rcu_qsbr.h"
24 #include "rcu_qsbr_pvt.h"
25 
26 /* Get the memory size of QSBR variable */
27 size_t
rte_rcu_qsbr_get_memsize(uint32_t max_threads)28 rte_rcu_qsbr_get_memsize(uint32_t max_threads)
29 {
30 	size_t sz;
31 
32 	if (max_threads == 0) {
33 		rte_log(RTE_LOG_ERR, rte_rcu_log_type,
34 			"%s(): Invalid max_threads %u\n",
35 			__func__, max_threads);
36 		rte_errno = EINVAL;
37 
38 		return 1;
39 	}
40 
41 	sz = sizeof(struct rte_rcu_qsbr);
42 
43 	/* Add the size of quiescent state counter array */
44 	sz += sizeof(struct rte_rcu_qsbr_cnt) * max_threads;
45 
46 	/* Add the size of the registered thread ID bitmap array */
47 	sz += __RTE_QSBR_THRID_ARRAY_SIZE(max_threads);
48 
49 	return sz;
50 }
51 
52 /* Initialize a quiescent state variable */
53 int
rte_rcu_qsbr_init(struct rte_rcu_qsbr * v,uint32_t max_threads)54 rte_rcu_qsbr_init(struct rte_rcu_qsbr *v, uint32_t max_threads)
55 {
56 	size_t sz;
57 
58 	if (v == NULL) {
59 		rte_log(RTE_LOG_ERR, rte_rcu_log_type,
60 			"%s(): Invalid input parameter\n", __func__);
61 		rte_errno = EINVAL;
62 
63 		return 1;
64 	}
65 
66 	sz = rte_rcu_qsbr_get_memsize(max_threads);
67 	if (sz == 1)
68 		return 1;
69 
70 	/* Set all the threads to offline */
71 	memset(v, 0, sz);
72 	v->max_threads = max_threads;
73 	v->num_elems = RTE_ALIGN_MUL_CEIL(max_threads,
74 			__RTE_QSBR_THRID_ARRAY_ELM_SIZE) /
75 			__RTE_QSBR_THRID_ARRAY_ELM_SIZE;
76 	v->token = __RTE_QSBR_CNT_INIT;
77 	v->acked_token = __RTE_QSBR_CNT_INIT - 1;
78 
79 	return 0;
80 }
81 
82 /* Register a reader thread to report its quiescent state
83  * on a QS variable.
84  */
85 int
rte_rcu_qsbr_thread_register(struct rte_rcu_qsbr * v,unsigned int thread_id)86 rte_rcu_qsbr_thread_register(struct rte_rcu_qsbr *v, unsigned int thread_id)
87 {
88 	unsigned int i, id, success;
89 	uint64_t old_bmap, new_bmap;
90 
91 	if (v == NULL || thread_id >= v->max_threads) {
92 		rte_log(RTE_LOG_ERR, rte_rcu_log_type,
93 			"%s(): Invalid input parameter\n", __func__);
94 		rte_errno = EINVAL;
95 
96 		return 1;
97 	}
98 
99 	__RTE_RCU_IS_LOCK_CNT_ZERO(v, thread_id, ERR, "Lock counter %u\n",
100 				v->qsbr_cnt[thread_id].lock_cnt);
101 
102 	id = thread_id & __RTE_QSBR_THRID_MASK;
103 	i = thread_id >> __RTE_QSBR_THRID_INDEX_SHIFT;
104 
105 	/* Make sure that the counter for registered threads does not
106 	 * go out of sync. Hence, additional checks are required.
107 	 */
108 	/* Check if the thread is already registered */
109 	old_bmap = __atomic_load_n(__RTE_QSBR_THRID_ARRAY_ELM(v, i),
110 					__ATOMIC_RELAXED);
111 	if (old_bmap & 1UL << id)
112 		return 0;
113 
114 	do {
115 		new_bmap = old_bmap | (1UL << id);
116 		success = __atomic_compare_exchange(
117 					__RTE_QSBR_THRID_ARRAY_ELM(v, i),
118 					&old_bmap, &new_bmap, 0,
119 					__ATOMIC_RELEASE, __ATOMIC_RELAXED);
120 
121 		if (success)
122 			__atomic_fetch_add(&v->num_threads,
123 						1, __ATOMIC_RELAXED);
124 		else if (old_bmap & (1UL << id))
125 			/* Someone else registered this thread.
126 			 * Counter should not be incremented.
127 			 */
128 			return 0;
129 	} while (success == 0);
130 
131 	return 0;
132 }
133 
134 /* Remove a reader thread, from the list of threads reporting their
135  * quiescent state on a QS variable.
136  */
137 int
rte_rcu_qsbr_thread_unregister(struct rte_rcu_qsbr * v,unsigned int thread_id)138 rte_rcu_qsbr_thread_unregister(struct rte_rcu_qsbr *v, unsigned int thread_id)
139 {
140 	unsigned int i, id, success;
141 	uint64_t old_bmap, new_bmap;
142 
143 	if (v == NULL || thread_id >= v->max_threads) {
144 		rte_log(RTE_LOG_ERR, rte_rcu_log_type,
145 			"%s(): Invalid input parameter\n", __func__);
146 		rte_errno = EINVAL;
147 
148 		return 1;
149 	}
150 
151 	__RTE_RCU_IS_LOCK_CNT_ZERO(v, thread_id, ERR, "Lock counter %u\n",
152 				v->qsbr_cnt[thread_id].lock_cnt);
153 
154 	id = thread_id & __RTE_QSBR_THRID_MASK;
155 	i = thread_id >> __RTE_QSBR_THRID_INDEX_SHIFT;
156 
157 	/* Make sure that the counter for registered threads does not
158 	 * go out of sync. Hence, additional checks are required.
159 	 */
160 	/* Check if the thread is already unregistered */
161 	old_bmap = __atomic_load_n(__RTE_QSBR_THRID_ARRAY_ELM(v, i),
162 					__ATOMIC_RELAXED);
163 	if (!(old_bmap & (1UL << id)))
164 		return 0;
165 
166 	do {
167 		new_bmap = old_bmap & ~(1UL << id);
168 		/* Make sure any loads of the shared data structure are
169 		 * completed before removal of the thread from the list of
170 		 * reporting threads.
171 		 */
172 		success = __atomic_compare_exchange(
173 					__RTE_QSBR_THRID_ARRAY_ELM(v, i),
174 					&old_bmap, &new_bmap, 0,
175 					__ATOMIC_RELEASE, __ATOMIC_RELAXED);
176 
177 		if (success)
178 			__atomic_fetch_sub(&v->num_threads,
179 						1, __ATOMIC_RELAXED);
180 		else if (!(old_bmap & (1UL << id)))
181 			/* Someone else unregistered this thread.
182 			 * Counter should not be incremented.
183 			 */
184 			return 0;
185 	} while (success == 0);
186 
187 	return 0;
188 }
189 
190 /* Wait till the reader threads have entered quiescent state. */
191 void
rte_rcu_qsbr_synchronize(struct rte_rcu_qsbr * v,unsigned int thread_id)192 rte_rcu_qsbr_synchronize(struct rte_rcu_qsbr *v, unsigned int thread_id)
193 {
194 	uint64_t t;
195 
196 	RTE_ASSERT(v != NULL);
197 
198 	t = rte_rcu_qsbr_start(v);
199 
200 	/* If the current thread has readside critical section,
201 	 * update its quiescent state status.
202 	 */
203 	if (thread_id != RTE_QSBR_THRID_INVALID)
204 		rte_rcu_qsbr_quiescent(v, thread_id);
205 
206 	/* Wait for other readers to enter quiescent state */
207 	rte_rcu_qsbr_check(v, t, true);
208 }
209 
210 /* Dump the details of a single quiescent state variable to a file. */
211 int
rte_rcu_qsbr_dump(FILE * f,struct rte_rcu_qsbr * v)212 rte_rcu_qsbr_dump(FILE *f, struct rte_rcu_qsbr *v)
213 {
214 	uint64_t bmap;
215 	uint32_t i, t, id;
216 
217 	if (v == NULL || f == NULL) {
218 		rte_log(RTE_LOG_ERR, rte_rcu_log_type,
219 			"%s(): Invalid input parameter\n", __func__);
220 		rte_errno = EINVAL;
221 
222 		return 1;
223 	}
224 
225 	fprintf(f, "\nQuiescent State Variable @%p\n", v);
226 
227 	fprintf(f, "  QS variable memory size = %zu\n",
228 				rte_rcu_qsbr_get_memsize(v->max_threads));
229 	fprintf(f, "  Given # max threads = %u\n", v->max_threads);
230 	fprintf(f, "  Current # threads = %u\n", v->num_threads);
231 
232 	fprintf(f, "  Registered thread IDs = ");
233 	for (i = 0; i < v->num_elems; i++) {
234 		bmap = __atomic_load_n(__RTE_QSBR_THRID_ARRAY_ELM(v, i),
235 					__ATOMIC_ACQUIRE);
236 		id = i << __RTE_QSBR_THRID_INDEX_SHIFT;
237 		while (bmap) {
238 			t = __builtin_ctzl(bmap);
239 			fprintf(f, "%u ", id + t);
240 
241 			bmap &= ~(1UL << t);
242 		}
243 	}
244 
245 	fprintf(f, "\n");
246 
247 	fprintf(f, "  Token = %" PRIu64 "\n",
248 			__atomic_load_n(&v->token, __ATOMIC_ACQUIRE));
249 
250 	fprintf(f, "  Least Acknowledged Token = %" PRIu64 "\n",
251 			__atomic_load_n(&v->acked_token, __ATOMIC_ACQUIRE));
252 
253 	fprintf(f, "Quiescent State Counts for readers:\n");
254 	for (i = 0; i < v->num_elems; i++) {
255 		bmap = __atomic_load_n(__RTE_QSBR_THRID_ARRAY_ELM(v, i),
256 					__ATOMIC_ACQUIRE);
257 		id = i << __RTE_QSBR_THRID_INDEX_SHIFT;
258 		while (bmap) {
259 			t = __builtin_ctzl(bmap);
260 			fprintf(f, "thread ID = %u, count = %" PRIu64 ", lock count = %u\n",
261 				id + t,
262 				__atomic_load_n(
263 					&v->qsbr_cnt[id + t].cnt,
264 					__ATOMIC_RELAXED),
265 				__atomic_load_n(
266 					&v->qsbr_cnt[id + t].lock_cnt,
267 					__ATOMIC_RELAXED));
268 			bmap &= ~(1UL << t);
269 		}
270 	}
271 
272 	return 0;
273 }
274 
275 /* Create a queue used to store the data structure elements that can
276  * be freed later. This queue is referred to as 'defer queue'.
277  */
278 struct rte_rcu_qsbr_dq *
rte_rcu_qsbr_dq_create(const struct rte_rcu_qsbr_dq_parameters * params)279 rte_rcu_qsbr_dq_create(const struct rte_rcu_qsbr_dq_parameters *params)
280 {
281 	struct rte_rcu_qsbr_dq *dq;
282 	uint32_t qs_fifo_size;
283 	unsigned int flags;
284 
285 	if (params == NULL || params->free_fn == NULL ||
286 		params->v == NULL || params->name == NULL ||
287 		params->size == 0 || params->esize == 0 ||
288 		(params->esize % 4 != 0)) {
289 		rte_log(RTE_LOG_ERR, rte_rcu_log_type,
290 			"%s(): Invalid input parameter\n", __func__);
291 		rte_errno = EINVAL;
292 
293 		return NULL;
294 	}
295 	/* If auto reclamation is configured, reclaim limit
296 	 * should be a valid value.
297 	 */
298 	if ((params->trigger_reclaim_limit <= params->size) &&
299 	    (params->max_reclaim_size == 0)) {
300 		rte_log(RTE_LOG_ERR, rte_rcu_log_type,
301 			"%s(): Invalid input parameter, size = %u, trigger_reclaim_limit = %u, max_reclaim_size = %u\n",
302 			__func__, params->size, params->trigger_reclaim_limit,
303 			params->max_reclaim_size);
304 		rte_errno = EINVAL;
305 
306 		return NULL;
307 	}
308 
309 	dq = rte_zmalloc(NULL, sizeof(struct rte_rcu_qsbr_dq),
310 			 RTE_CACHE_LINE_SIZE);
311 	if (dq == NULL) {
312 		rte_errno = ENOMEM;
313 
314 		return NULL;
315 	}
316 
317 	/* Decide the flags for the ring.
318 	 * If MT safety is requested, use RTS for ring enqueue as most
319 	 * use cases involve dq-enqueue happening on the control plane.
320 	 * Ring dequeue is always HTS due to the possibility of revert.
321 	 */
322 	flags = RING_F_MP_RTS_ENQ;
323 	if (params->flags & RTE_RCU_QSBR_DQ_MT_UNSAFE)
324 		flags = RING_F_SP_ENQ;
325 	flags |= RING_F_MC_HTS_DEQ;
326 	/* round up qs_fifo_size to next power of two that is not less than
327 	 * max_size.
328 	 */
329 	qs_fifo_size = rte_align32pow2(params->size + 1);
330 	/* Add token size to ring element size */
331 	dq->r = rte_ring_create_elem(params->name,
332 			__RTE_QSBR_TOKEN_SIZE + params->esize,
333 			qs_fifo_size, SOCKET_ID_ANY, flags);
334 	if (dq->r == NULL) {
335 		rte_log(RTE_LOG_ERR, rte_rcu_log_type,
336 			"%s(): defer queue create failed\n", __func__);
337 		rte_free(dq);
338 		return NULL;
339 	}
340 
341 	dq->v = params->v;
342 	dq->size = params->size;
343 	dq->esize = __RTE_QSBR_TOKEN_SIZE + params->esize;
344 	dq->trigger_reclaim_limit = params->trigger_reclaim_limit;
345 	dq->max_reclaim_size = params->max_reclaim_size;
346 	dq->free_fn = params->free_fn;
347 	dq->p = params->p;
348 
349 	return dq;
350 }
351 
352 /* Enqueue one resource to the defer queue to free after the grace
353  * period is over.
354  */
rte_rcu_qsbr_dq_enqueue(struct rte_rcu_qsbr_dq * dq,void * e)355 int rte_rcu_qsbr_dq_enqueue(struct rte_rcu_qsbr_dq *dq, void *e)
356 {
357 	__rte_rcu_qsbr_dq_elem_t *dq_elem;
358 	uint32_t cur_size;
359 
360 	if (dq == NULL || e == NULL) {
361 		rte_log(RTE_LOG_ERR, rte_rcu_log_type,
362 			"%s(): Invalid input parameter\n", __func__);
363 		rte_errno = EINVAL;
364 
365 		return 1;
366 	}
367 
368 	char data[dq->esize];
369 	dq_elem = (__rte_rcu_qsbr_dq_elem_t *)data;
370 	/* Start the grace period */
371 	dq_elem->token = rte_rcu_qsbr_start(dq->v);
372 
373 	/* Reclaim resources if the queue size has hit the reclaim
374 	 * limit. This helps the queue from growing too large and
375 	 * allows time for reader threads to report their quiescent state.
376 	 */
377 	cur_size = rte_ring_count(dq->r);
378 	if (cur_size > dq->trigger_reclaim_limit) {
379 		rte_log(RTE_LOG_INFO, rte_rcu_log_type,
380 			"%s(): Triggering reclamation\n", __func__);
381 		rte_rcu_qsbr_dq_reclaim(dq, dq->max_reclaim_size,
382 						NULL, NULL, NULL);
383 	}
384 
385 	/* Enqueue the token and resource. Generating the token and
386 	 * enqueuing (token + resource) on the queue is not an
387 	 * atomic operation. When the defer queue is shared by multiple
388 	 * writers, this might result in tokens enqueued out of order
389 	 * on the queue. So, some tokens might wait longer than they
390 	 * are required to be reclaimed.
391 	 */
392 	memcpy(dq_elem->elem, e, dq->esize - __RTE_QSBR_TOKEN_SIZE);
393 	/* Check the status as enqueue might fail since the other threads
394 	 * might have used up the freed space.
395 	 * Enqueue uses the configured flags when the DQ was created.
396 	 */
397 	if (rte_ring_enqueue_elem(dq->r, data, dq->esize) != 0) {
398 		rte_log(RTE_LOG_ERR, rte_rcu_log_type,
399 			"%s(): Enqueue failed\n", __func__);
400 		/* Note that the token generated above is not used.
401 		 * Other than wasting tokens, it should not cause any
402 		 * other issues.
403 		 */
404 		rte_log(RTE_LOG_INFO, rte_rcu_log_type,
405 			"%s(): Skipped enqueuing token = %" PRIu64 "\n",
406 			__func__, dq_elem->token);
407 
408 		rte_errno = ENOSPC;
409 		return 1;
410 	}
411 
412 	rte_log(RTE_LOG_INFO, rte_rcu_log_type,
413 		"%s(): Enqueued token = %" PRIu64 "\n",
414 		__func__, dq_elem->token);
415 
416 	return 0;
417 }
418 
419 /* Reclaim resources from the defer queue. */
420 int
rte_rcu_qsbr_dq_reclaim(struct rte_rcu_qsbr_dq * dq,unsigned int n,unsigned int * freed,unsigned int * pending,unsigned int * available)421 rte_rcu_qsbr_dq_reclaim(struct rte_rcu_qsbr_dq *dq, unsigned int n,
422 			unsigned int *freed, unsigned int *pending,
423 			unsigned int *available)
424 {
425 	uint32_t cnt;
426 	__rte_rcu_qsbr_dq_elem_t *dq_elem;
427 
428 	if (dq == NULL || n == 0) {
429 		rte_log(RTE_LOG_ERR, rte_rcu_log_type,
430 			"%s(): Invalid input parameter\n", __func__);
431 		rte_errno = EINVAL;
432 
433 		return 1;
434 	}
435 
436 	cnt = 0;
437 
438 	char data[dq->esize];
439 	/* Check reader threads quiescent state and reclaim resources */
440 	while (cnt < n &&
441 		rte_ring_dequeue_bulk_elem_start(dq->r, &data,
442 					dq->esize, 1, available) != 0) {
443 		dq_elem = (__rte_rcu_qsbr_dq_elem_t *)data;
444 
445 		/* Reclaim the resource */
446 		if (rte_rcu_qsbr_check(dq->v, dq_elem->token, false) != 1) {
447 			rte_ring_dequeue_elem_finish(dq->r, 0);
448 			break;
449 		}
450 		rte_ring_dequeue_elem_finish(dq->r, 1);
451 
452 		rte_log(RTE_LOG_INFO, rte_rcu_log_type,
453 			"%s(): Reclaimed token = %" PRIu64 "\n",
454 			__func__, dq_elem->token);
455 
456 		dq->free_fn(dq->p, dq_elem->elem, 1);
457 
458 		cnt++;
459 	}
460 
461 	rte_log(RTE_LOG_INFO, rte_rcu_log_type,
462 		"%s(): Reclaimed %u resources\n", __func__, cnt);
463 
464 	if (freed != NULL)
465 		*freed = cnt;
466 	if (pending != NULL)
467 		*pending = rte_ring_count(dq->r);
468 
469 	return 0;
470 }
471 
472 /* Delete a defer queue. */
473 int
rte_rcu_qsbr_dq_delete(struct rte_rcu_qsbr_dq * dq)474 rte_rcu_qsbr_dq_delete(struct rte_rcu_qsbr_dq *dq)
475 {
476 	unsigned int pending;
477 
478 	if (dq == NULL) {
479 		rte_log(RTE_LOG_DEBUG, rte_rcu_log_type,
480 			"%s(): Invalid input parameter\n", __func__);
481 
482 		return 0;
483 	}
484 
485 	/* Reclaim all the resources */
486 	rte_rcu_qsbr_dq_reclaim(dq, ~0, NULL, &pending, NULL);
487 	if (pending != 0) {
488 		rte_errno = EAGAIN;
489 
490 		return 1;
491 	}
492 
493 	rte_ring_free(dq->r);
494 	rte_free(dq);
495 
496 	return 0;
497 }
498 
499 RTE_LOG_REGISTER(rte_rcu_log_type, lib.rcu, ERR);
500