xref: /f-stack/dpdk/drivers/event/opdl/opdl_ring.c (revision 2d9fd380)
1 /* SPDX-License-Identifier: BSD-3-Clause
2  * Copyright(c) 2017 Intel Corporation
3  */
4 
5 #include <stdbool.h>
6 #include <stddef.h>
7 #include <stdint.h>
8 #include <stdio.h>
9 
10 #include <rte_string_fns.h>
11 #include <rte_branch_prediction.h>
12 #include <rte_debug.h>
13 #include <rte_lcore.h>
14 #include <rte_log.h>
15 #include <rte_malloc.h>
16 #include <rte_memcpy.h>
17 #include <rte_memory.h>
18 #include <rte_memzone.h>
19 #include <rte_atomic.h>
20 
21 #include "opdl_ring.h"
22 #include "opdl_log.h"
23 
24 #define LIB_NAME "opdl_ring"
25 
26 #define OPDL_NAME_SIZE 64
27 
28 
29 #define OPDL_EVENT_MASK  (0x00000000000FFFFFULL)
30 #define OPDL_FLOWID_MASK (0xFFFFF)
31 #define OPDL_OPA_MASK    (0xFF)
32 #define OPDL_OPA_OFFSET  (0x38)
33 
34 /* Types of dependency between stages */
35 enum dep_type {
36 	DEP_NONE = 0,  /* no dependency */
37 	DEP_DIRECT,  /* stage has direct dependency */
38 	DEP_INDIRECT,  /* indirect dependency through other stage(s) */
39 	DEP_SELF,  /* stage dependency on itself, used to detect loops */
40 };
41 
42 /* Shared section of stage state.
43  * Care is needed when accessing this, and the layout is important, especially
44  * to keep the adjacent cache-line HW prefetcher from impacting performance.
45  */
46 struct shared_state {
47 	/* Last known minimum sequence number of dependencies, used for multi
48 	 * thread operation
49 	 */
50 	uint32_t available_seq;
51 	char _pad1[RTE_CACHE_LINE_SIZE * 3];
52 	uint32_t head;  /* Head sequence number (for multi thread operation) */
53 	char _pad2[RTE_CACHE_LINE_SIZE * 3];
54 	struct opdl_stage *stage;  /* back pointer */
55 	uint32_t tail;  /* Tail sequence number */
56 	char _pad3[RTE_CACHE_LINE_SIZE * 2];
57 } __rte_cache_aligned;
58 
59 /* A structure to keep track of "unfinished" claims. This is only used for
60  * stages that are threadsafe. Each lcore accesses its own instance of this
61  * structure to record the entries it has claimed. This allows one lcore to make
62  * multiple claims without being blocked by another. When disclaiming, it
63  * moves the shared tail forward once the shared tail matches the tail value
64  * recorded here.
65  */
66 struct claim_manager {
67 	uint32_t num_to_disclaim;
68 	uint32_t num_claimed;
69 	uint32_t mgr_head;
70 	uint32_t mgr_tail;
71 	struct {
72 		uint32_t head;
73 		uint32_t tail;
74 	} claims[OPDL_DISCLAIMS_PER_LCORE];
75 } __rte_cache_aligned;
76 
77 /* Context for each stage of opdl_ring.
78  * Calculations on sequence numbers need to be done with other uint32_t values
79  * so that results are modulus 2^32, and not undefined.
80  */
81 struct opdl_stage {
82 	struct opdl_ring *t;  /* back pointer, set at init */
83 	uint32_t num_slots;  /* Number of slots for entries, set at init */
84 	uint32_t index;  /* ID for this stage, set at init */
85 	bool threadsafe;  /* Set to true if this stage supports threadsafe use */
86 	/* Last known min seq number of dependencies, used for single-thread
87 	 * operation
88 	 */
89 	uint32_t available_seq;
90 	uint32_t head;  /* Current head for single-thread operation */
91 	uint32_t nb_instance;  /* Number of instances */
92 	uint32_t instance_id;  /* ID of this stage instance */
93 	uint16_t num_claimed;  /* Number of slots claimed */
94 	uint16_t num_event;		/* Number of events */
95 	uint32_t seq;			/* sequence number  */
96 	uint32_t num_deps;  /* Number of direct dependencies */
97 	/* Keep track of all dependencies, used during init only */
98 	enum dep_type *dep_tracking;
99 	/* Direct dependencies of this stage */
100 	struct shared_state **deps;
101 	/* Other stages read this! */
102 	struct shared_state shared __rte_cache_aligned;
103 	/* For managing disclaims in multi-threaded processing stages */
104 	struct claim_manager pending_disclaims[RTE_MAX_LCORE]
105 					       __rte_cache_aligned;
106 	uint32_t shadow_head;  /* Shadow head for single-thread operation */
107 	uint32_t queue_id;     /* ID of Queue which is assigned to this stage */
108 	uint32_t pos;		/* Atomic scan position */
109 } __rte_cache_aligned;
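/* Worked example (for illustration only, not part of the original source):
 * because head, tail and available_seq are plain uint32_t values, differences
 * wrap correctly modulo 2^32. E.g. with head = 0x00000002 and
 * tail = 0xFFFFFFFD, the expression head - tail evaluates to 5, so in-flight
 * counts remain correct across the wrap.
 */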
110 
111 /* Context for opdl_ring */
112 struct opdl_ring {
113 	char name[OPDL_NAME_SIZE];  /* OPDL queue instance name */
114 	int socket;  /* NUMA socket that memory is allocated on */
115 	uint32_t num_slots;  /* Number of slots for entries */
116 	uint32_t mask;  /* Mask for sequence numbers (num_slots - 1) */
117 	uint32_t slot_size;  /* Size of each slot in bytes */
118 	uint32_t num_stages;  /* Number of stages that have been added */
119 	uint32_t max_num_stages;  /* Max number of stages */
120 	/* Stages indexed by ID */
121 	struct opdl_stage *stages;
122 	/* Memory for storing slot data */
123 	uint8_t slots[0] __rte_cache_aligned;
124 };
125 
126 
127 /* Return input stage of an opdl_ring */
128 static __rte_always_inline struct opdl_stage *
129 input_stage(const struct opdl_ring *t)
130 {
131 	return &t->stages[0];
132 }
133 
134 /* Check if a stage is the input stage */
135 static __rte_always_inline bool
136 is_input_stage(const struct opdl_stage *s)
137 {
138 	return s->index == 0;
139 }
140 
141 /* Get slot pointer from sequence number */
142 static __rte_always_inline void *
143 get_slot(const struct opdl_ring *t, uint32_t n)
144 {
145 	return (void *)(uintptr_t)&t->slots[(n & t->mask) * t->slot_size];
146 }
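/* Illustrative note: num_slots is required to be a power of two (checked in
 * opdl_ring_create()) and mask == num_slots - 1, so (n & t->mask) is
 * equivalent to n % num_slots. E.g. with num_slots = 1024, sequence number
 * 1030 maps to slot index 6.
 */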
147 
148 /* Find how many entries are available for processing */
149 static __rte_always_inline uint32_t
150 available(const struct opdl_stage *s)
151 {
152 	if (s->threadsafe == true) {
153 		uint32_t n = __atomic_load_n(&s->shared.available_seq,
154 				__ATOMIC_ACQUIRE) -
155 				__atomic_load_n(&s->shared.head,
156 				__ATOMIC_ACQUIRE);
157 
158 		/* Return 0 if available_seq needs to be updated */
159 		return (n <= s->num_slots) ? n : 0;
160 	}
161 
162 	/* Single threaded */
163 	return s->available_seq - s->head;
164 }
165 
166 /* Read sequence number of dependencies and find minimum */
167 static __rte_always_inline void
168 update_available_seq(struct opdl_stage *s)
169 {
170 	uint32_t i;
171 	uint32_t this_tail = s->shared.tail;
172 	uint32_t min_seq = __atomic_load_n(&s->deps[0]->tail, __ATOMIC_ACQUIRE);
173 	/* The input stage's sequence numbers run ahead of those of its
174 	 * dependencies, so an offset of t->num_slots is needed when calculating
175 	 * available slots, and the condition used to determine the dependencies'
176 	 * minimum sequence number must be reversed.
177 	 */
178 	uint32_t wrap;
179 
180 	if (is_input_stage(s)) {
181 		wrap = s->num_slots;
182 		for (i = 1; i < s->num_deps; i++) {
183 			uint32_t seq = __atomic_load_n(&s->deps[i]->tail,
184 					__ATOMIC_ACQUIRE);
185 			if ((this_tail - seq) > (this_tail - min_seq))
186 				min_seq = seq;
187 		}
188 	} else {
189 		wrap = 0;
190 		for (i = 1; i < s->num_deps; i++) {
191 			uint32_t seq = __atomic_load_n(&s->deps[i]->tail,
192 					__ATOMIC_ACQUIRE);
193 			if ((seq - this_tail) < (min_seq - this_tail))
194 				min_seq = seq;
195 		}
196 	}
197 
198 	if (s->threadsafe == false)
199 		s->available_seq = min_seq + wrap;
200 	else
201 		__atomic_store_n(&s->shared.available_seq, min_seq + wrap,
202 				__ATOMIC_RELEASE);
203 }
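/* Worked example (illustration, assuming num_slots = 1024): if the slowest
 * dependency tail is 100, the input stage computes
 * available_seq = 100 + 1024 = 1124, i.e. the producer may run a full ring
 * ahead of the slowest consumer, while a non-input stage simply computes
 * available_seq = 100.
 */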
204 
205 /* Wait until the number of available slots reaches number requested */
206 static __rte_always_inline void
207 wait_for_available(struct opdl_stage *s, uint32_t n)
208 {
209 	while (available(s) < n) {
210 		rte_pause();
211 		update_available_seq(s);
212 	}
213 }
214 
215 /* Return number of slots to process based on number requested and mode */
216 static __rte_always_inline uint32_t
217 num_to_process(struct opdl_stage *s, uint32_t n, bool block)
218 {
219 	/* Don't read tail sequences of dependencies if not needed */
220 	if (available(s) >= n)
221 		return n;
222 
223 	update_available_seq(s);
224 
225 	if (block == false) {
226 		uint32_t avail = available(s);
227 
228 		if (avail == 0) {
229 			rte_pause();
230 			return 0;
231 		}
232 		return (avail <= n) ? avail : n;
233 	}
234 
235 	if (unlikely(n > s->num_slots)) {
236 		PMD_DRV_LOG(ERR, "%u entries is more than max (%u)",
237 				n, s->num_slots);
238 		return 0;  /* Avoid infinite loop */
239 	}
240 	/* blocking */
241 	wait_for_available(s, n);
242 	return n;
243 }
244 
245 /* Copy entries in to slots with wrap-around */
246 static __rte_always_inline void
247 copy_entries_in(struct opdl_ring *t, uint32_t start, const void *entries,
248 		uint32_t num_entries)
249 {
250 	uint32_t slot_size = t->slot_size;
251 	uint32_t slot_index = start & t->mask;
252 
253 	if (slot_index + num_entries <= t->num_slots) {
254 		rte_memcpy(get_slot(t, start), entries,
255 				num_entries * slot_size);
256 	} else {
257 		uint32_t split = t->num_slots - slot_index;
258 
259 		rte_memcpy(get_slot(t, start), entries, split * slot_size);
260 		rte_memcpy(get_slot(t, 0),
261 				RTE_PTR_ADD(entries, split * slot_size),
262 				(num_entries - split) * slot_size);
263 	}
264 }
265 
266 /* Copy entries out from slots with wrap-around */
267 static __rte_always_inline void
268 copy_entries_out(struct opdl_ring *t, uint32_t start, void *entries,
269 		uint32_t num_entries)
270 {
271 	uint32_t slot_size = t->slot_size;
272 	uint32_t slot_index = start & t->mask;
273 
274 	if (slot_index + num_entries <= t->num_slots) {
275 		rte_memcpy(entries, get_slot(t, start),
276 				num_entries * slot_size);
277 	} else {
278 		uint32_t split = t->num_slots - slot_index;
279 
280 		rte_memcpy(entries, get_slot(t, start), split * slot_size);
281 		rte_memcpy(RTE_PTR_ADD(entries, split * slot_size),
282 				get_slot(t, 0),
283 				(num_entries - split) * slot_size);
284 	}
285 }
286 
287 /* Input function optimised for single thread */
288 static __rte_always_inline uint32_t
289 opdl_ring_input_singlethread(struct opdl_ring *t, const void *entries,
290 		uint32_t num_entries, bool block)
291 {
292 	struct opdl_stage *s = input_stage(t);
293 	uint32_t head = s->head;
294 
295 	num_entries = num_to_process(s, num_entries, block);
296 	if (num_entries == 0)
297 		return 0;
298 
299 	copy_entries_in(t, head, entries, num_entries);
300 
301 	s->head += num_entries;
302 	__atomic_store_n(&s->shared.tail, s->head, __ATOMIC_RELEASE);
303 
304 	return num_entries;
305 }
306 
307 /* Convert head and tail of claim_manager into valid index */
308 static __rte_always_inline uint32_t
309 claim_mgr_index(uint32_t n)
310 {
311 	return n & (OPDL_DISCLAIMS_PER_LCORE - 1);
312 }
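/* Note: the masking above only yields a valid index because
 * OPDL_DISCLAIMS_PER_LCORE is a power of two; opdl_ring_create() enforces
 * this with RTE_BUILD_BUG_ON(!rte_is_power_of_2(OPDL_DISCLAIMS_PER_LCORE)).
 */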
313 
314 /* Check if there are available slots in claim_manager */
315 static __rte_always_inline bool
316 claim_mgr_available(struct claim_manager *mgr)
317 {
318 	return (mgr->mgr_head < (mgr->mgr_tail + OPDL_DISCLAIMS_PER_LCORE)) ?
319 			true : false;
320 }
321 
322 /* Record a new claim. Only use after first checking an entry is available */
323 static __rte_always_inline void
324 claim_mgr_add(struct claim_manager *mgr, uint32_t tail, uint32_t head)
325 {
326 	if ((mgr->mgr_head != mgr->mgr_tail) &&
327 			(mgr->claims[claim_mgr_index(mgr->mgr_head - 1)].head ==
328 			tail)) {
329 		/* Combine with previous claim */
330 		mgr->claims[claim_mgr_index(mgr->mgr_head - 1)].head = head;
331 	} else {
332 		mgr->claims[claim_mgr_index(mgr->mgr_head)].head = head;
333 		mgr->claims[claim_mgr_index(mgr->mgr_head)].tail = tail;
334 		mgr->mgr_head++;
335 	}
336 
337 	mgr->num_claimed += (head - tail);
338 }
339 
340 /* Read the oldest recorded claim */
341 static __rte_always_inline bool
342 claim_mgr_read(struct claim_manager *mgr, uint32_t *tail, uint32_t *head)
343 {
344 	if (mgr->mgr_head == mgr->mgr_tail)
345 		return false;
346 
347 	*head = mgr->claims[claim_mgr_index(mgr->mgr_tail)].head;
348 	*tail = mgr->claims[claim_mgr_index(mgr->mgr_tail)].tail;
349 	return true;
350 }
351 
352 /* Remove the oldest recorded claim. Only use after first reading the entry */
353 static __rte_always_inline void
354 claim_mgr_remove(struct claim_manager *mgr)
355 {
356 	mgr->num_claimed -= (mgr->claims[claim_mgr_index(mgr->mgr_tail)].head -
357 			mgr->claims[claim_mgr_index(mgr->mgr_tail)].tail);
358 	mgr->mgr_tail++;
359 }
360 
361 /* Update tail in the oldest claim. Only use after first reading the entry */
362 static __rte_always_inline void
363 claim_mgr_move_tail(struct claim_manager *mgr, uint32_t num_entries)
364 {
365 	mgr->num_claimed -= num_entries;
366 	mgr->claims[claim_mgr_index(mgr->mgr_tail)].tail += num_entries;
367 }
368 
369 static __rte_always_inline void
370 opdl_stage_disclaim_multithread_n(struct opdl_stage *s,
371 		uint32_t num_entries, bool block)
372 {
373 	struct claim_manager *disclaims = &s->pending_disclaims[rte_lcore_id()];
374 	uint32_t head;
375 	uint32_t tail;
376 
377 	while (num_entries) {
378 		bool ret = claim_mgr_read(disclaims, &tail, &head);
379 
380 		if (ret == false)
381 			break;  /* nothing is claimed */
382 		/* There should be no race condition here. If shared.tail
383 		 * matches, no other core can update it until this one does.
384 		 */
385 		if (__atomic_load_n(&s->shared.tail, __ATOMIC_ACQUIRE) ==
386 				tail) {
387 			if (num_entries >= (head - tail)) {
388 				claim_mgr_remove(disclaims);
389 				__atomic_store_n(&s->shared.tail, head,
390 						__ATOMIC_RELEASE);
391 				num_entries -= (head - tail);
392 			} else {
393 				claim_mgr_move_tail(disclaims, num_entries);
394 				__atomic_store_n(&s->shared.tail,
395 						num_entries + tail,
396 						__ATOMIC_RELEASE);
397 				num_entries = 0;
398 			}
399 		} else if (block == false)
400 			break;  /* blocked by other thread */
401 		/* Keep going until num_entries are disclaimed. */
402 		rte_pause();
403 	}
404 
405 	disclaims->num_to_disclaim = num_entries;
406 }
407 
408 /* Move head atomically, returning number of entries available to process and
409  * the original value of head. For non-input stages, the claim is recorded
410  * so that the tail can be updated later by opdl_stage_disclaim().
411  */
412 static __rte_always_inline void
413 move_head_atomically(struct opdl_stage *s, uint32_t *num_entries,
414 		uint32_t *old_head, bool block, bool claim_func)
415 {
416 	uint32_t orig_num_entries = *num_entries;
417 	uint32_t ret;
418 	struct claim_manager *disclaims = &s->pending_disclaims[rte_lcore_id()];
419 
420 	/* Attempt to disclaim any outstanding claims */
421 	opdl_stage_disclaim_multithread_n(s, disclaims->num_to_disclaim,
422 			false);
423 
424 	*old_head = __atomic_load_n(&s->shared.head, __ATOMIC_ACQUIRE);
425 	while (true) {
426 		bool success;
427 		/* If called by opdl_ring_input(), claim does not need to be
428 		 * recorded, as there will be no disclaim.
429 		 */
430 		if (claim_func) {
431 			/* Check that the claim can be recorded */
432 			ret = claim_mgr_available(disclaims);
433 			if (ret == false) {
434 				/* exit out if claim can't be recorded */
435 				*num_entries = 0;
436 				return;
437 			}
438 		}
439 
440 		*num_entries = num_to_process(s, orig_num_entries, block);
441 		if (*num_entries == 0)
442 			return;
443 
444 		success = __atomic_compare_exchange_n(&s->shared.head, old_head,
445 				*old_head + *num_entries,
446 				true,  /* may fail spuriously */
447 				__ATOMIC_RELEASE,  /* memory order on success */
448 				__ATOMIC_ACQUIRE);  /* memory order on fail */
449 		if (likely(success))
450 			break;
451 		rte_pause();
452 	}
453 
454 	if (claim_func)
455 		/* Store the claim record */
456 		claim_mgr_add(disclaims, *old_head, *old_head + *num_entries);
457 }
458 
459 /* Input function that supports multiple threads */
460 static __rte_always_inline uint32_t
461 opdl_ring_input_multithread(struct opdl_ring *t, const void *entries,
462 		uint32_t num_entries, bool block)
463 {
464 	struct opdl_stage *s = input_stage(t);
465 	uint32_t old_head;
466 
467 	move_head_atomically(s, &num_entries, &old_head, block, false);
468 	if (num_entries == 0)
469 		return 0;
470 
471 	copy_entries_in(t, old_head, entries, num_entries);
472 
473 	/* If another thread started inputting before this one, but hasn't
474 	 * finished, we need to wait for it to complete to update the tail.
475 	 */
476 	rte_wait_until_equal_32(&s->shared.tail, old_head, __ATOMIC_ACQUIRE);
477 
478 	__atomic_store_n(&s->shared.tail, old_head + num_entries,
479 			__ATOMIC_RELEASE);
480 
481 	return num_entries;
482 }
483 
484 static __rte_always_inline uint32_t
485 opdl_first_entry_id(uint32_t start_seq, uint8_t nb_p_lcores,
486 		uint8_t this_lcore)
487 {
488 	return ((nb_p_lcores <= 1) ? 0 :
489 			(nb_p_lcores - (start_seq % nb_p_lcores) + this_lcore) %
490 			nb_p_lcores);
491 }
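/* Worked example (illustration): with nb_p_lcores = 4, start_seq = 6 and
 * this_lcore = 1 the result is (4 - (6 % 4) + 1) % 4 = 3, so this instance
 * starts at the entry whose sequence number is 9 (9 % 4 == 1), matching its
 * instance_id.
 */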
492 
493 /* Claim slots to process, optimised for single-thread operation */
494 static __rte_always_inline uint32_t
495 opdl_stage_claim_singlethread(struct opdl_stage *s, void *entries,
496 		uint32_t num_entries, uint32_t *seq, bool block, bool atomic)
497 {
498 	uint32_t i = 0, j = 0,  offset;
499 	uint32_t opa_id   = 0;
500 	uint32_t flow_id  = 0;
501 	uint64_t event    = 0;
502 	void *get_slots;
503 	struct rte_event *ev;
504 	RTE_SET_USED(seq);
505 	struct opdl_ring *t = s->t;
506 	uint8_t *entries_offset = (uint8_t *)entries;
507 
508 	if (!atomic) {
509 
510 		offset = opdl_first_entry_id(s->seq, s->nb_instance,
511 				s->instance_id);
512 
513 		num_entries = s->nb_instance * num_entries;
514 
515 		num_entries = num_to_process(s, num_entries, block);
516 
517 		for (; offset < num_entries; offset += s->nb_instance) {
518 			get_slots = get_slot(t, s->head + offset);
519 			memcpy(entries_offset, get_slots, t->slot_size);
520 			entries_offset += t->slot_size;
521 			i++;
522 		}
523 	} else {
524 		num_entries = num_to_process(s, num_entries, block);
525 
526 		for (j = 0; j < num_entries; j++) {
527 			ev = (struct rte_event *)get_slot(t, s->head+j);
528 
529 			event  = __atomic_load_n(&(ev->event),
530 					__ATOMIC_ACQUIRE);
531 
532 			opa_id = OPDL_OPA_MASK & (event >> OPDL_OPA_OFFSET);
533 			flow_id  = OPDL_FLOWID_MASK & event;
534 
535 			if (opa_id >= s->queue_id)
536 				continue;
537 
538 			if ((flow_id % s->nb_instance) == s->instance_id) {
539 				memcpy(entries_offset, ev, t->slot_size);
540 				entries_offset += t->slot_size;
541 				i++;
542 			}
543 		}
544 	}
545 	s->shadow_head = s->head;
546 	s->head += num_entries;
547 	s->num_claimed = num_entries;
548 	s->num_event = i;
549 	s->pos = 0;
550 
551 	/* automatically disclaim entries if number of rte_events is zero */
552 	if (unlikely(i == 0))
553 		opdl_stage_disclaim(s, 0, false);
554 
555 	return i;
556 }
557 
558 /* Thread-safe version of function to claim slots for processing */
559 static __rte_always_inline uint32_t
560 opdl_stage_claim_multithread(struct opdl_stage *s, void *entries,
561 		uint32_t num_entries, uint32_t *seq, bool block)
562 {
563 	uint32_t old_head;
564 	struct opdl_ring *t = s->t;
565 	uint32_t i = 0, offset;
566 	uint8_t *entries_offset = (uint8_t *)entries;
567 
568 	if (seq == NULL) {
569 		PMD_DRV_LOG(ERR, "Invalid seq PTR");
570 		return 0;
571 	}
572 	offset = opdl_first_entry_id(*seq, s->nb_instance, s->instance_id);
573 	num_entries = offset + (s->nb_instance * num_entries);
574 
575 	move_head_atomically(s, &num_entries, &old_head, block, true);
576 
577 	for (; offset < num_entries; offset += s->nb_instance) {
578 		memcpy(entries_offset, get_slot(t, s->head + offset),
579 			t->slot_size);
580 		entries_offset += t->slot_size;
581 		i++;
582 	}
583 
584 	*seq = old_head;
585 
586 	return i;
587 }
588 
589 /* Claim and copy slot pointers, optimised for single-thread operation */
590 static __rte_always_inline uint32_t
591 opdl_stage_claim_copy_singlethread(struct opdl_stage *s, void *entries,
592 		uint32_t num_entries, uint32_t *seq, bool block)
593 {
594 	num_entries = num_to_process(s, num_entries, block);
595 	if (num_entries == 0)
596 		return 0;
597 	copy_entries_out(s->t, s->head, entries, num_entries);
598 	if (seq != NULL)
599 		*seq = s->head;
600 	s->head += num_entries;
601 	return num_entries;
602 }
603 
604 /* Thread-safe version of function to claim and copy pointers to slots */
605 static __rte_always_inline uint32_t
606 opdl_stage_claim_copy_multithread(struct opdl_stage *s, void *entries,
607 		uint32_t num_entries, uint32_t *seq, bool block)
608 {
609 	uint32_t old_head;
610 
611 	move_head_atomically(s, &num_entries, &old_head, block, true);
612 	if (num_entries == 0)
613 		return 0;
614 	copy_entries_out(s->t, old_head, entries, num_entries);
615 	if (seq != NULL)
616 		*seq = old_head;
617 	return num_entries;
618 }
619 
620 static __rte_always_inline void
621 opdl_stage_disclaim_singlethread_n(struct opdl_stage *s,
622 		uint32_t num_entries)
623 {
624 	uint32_t old_tail = s->shared.tail;
625 
626 	if (unlikely(num_entries > (s->head - old_tail))) {
627 		PMD_DRV_LOG(WARNING, "Attempt to disclaim (%u) more than claimed (%u)",
628 				num_entries, s->head - old_tail);
629 		num_entries = s->head - old_tail;
630 	}
631 	__atomic_store_n(&s->shared.tail, num_entries + old_tail,
632 			__ATOMIC_RELEASE);
633 }
634 
635 uint32_t
636 opdl_ring_input(struct opdl_ring *t, const void *entries, uint32_t num_entries,
637 		bool block)
638 {
639 	if (input_stage(t)->threadsafe == false)
640 		return opdl_ring_input_singlethread(t, entries, num_entries,
641 				block);
642 	else
643 		return opdl_ring_input_multithread(t, entries, num_entries,
644 				block);
645 }
646 
647 uint32_t
648 opdl_ring_copy_from_burst(struct opdl_ring *t, struct opdl_stage *s,
649 		const void *entries, uint32_t num_entries, bool block)
650 {
651 	uint32_t head = s->head;
652 
653 	num_entries = num_to_process(s, num_entries, block);
654 
655 	if (num_entries == 0)
656 		return 0;
657 
658 	copy_entries_in(t, head, entries, num_entries);
659 
660 	s->head += num_entries;
661 	__atomic_store_n(&s->shared.tail, s->head, __ATOMIC_RELEASE);
662 
663 	return num_entries;
664 
665 }
666 
667 uint32_t
668 opdl_ring_copy_to_burst(struct opdl_ring *t, struct opdl_stage *s,
669 		void *entries, uint32_t num_entries, bool block)
670 {
671 	uint32_t head = s->head;
672 
673 	num_entries = num_to_process(s, num_entries, block);
674 	if (num_entries == 0)
675 		return 0;
676 
677 	copy_entries_out(t, head, entries, num_entries);
678 
679 	s->head += num_entries;
680 	__atomic_store_n(&s->shared.tail, s->head, __ATOMIC_RELEASE);
681 
682 	return num_entries;
683 }
684 
685 uint32_t
686 opdl_stage_find_num_available(struct opdl_stage *s, uint32_t num_entries)
687 {
688 	/* return (num_to_process(s, num_entries, false)); */
689 
690 	if (available(s) >= num_entries)
691 		return num_entries;
692 
693 	update_available_seq(s);
694 
695 	uint32_t avail = available(s);
696 
697 	if (avail == 0) {
698 		rte_pause();
699 		return 0;
700 	}
701 	return (avail <= num_entries) ? avail : num_entries;
702 }
703 
704 uint32_t
705 opdl_stage_claim(struct opdl_stage *s, void *entries,
706 		uint32_t num_entries, uint32_t *seq, bool block, bool atomic)
707 {
708 	if (s->threadsafe == false)
709 		return opdl_stage_claim_singlethread(s, entries, num_entries,
710 				seq, block, atomic);
711 	else
712 		return opdl_stage_claim_multithread(s, entries, num_entries,
713 				seq, block);
714 }
715 
716 uint32_t
717 opdl_stage_claim_copy(struct opdl_stage *s, void *entries,
718 		uint32_t num_entries, uint32_t *seq, bool block)
719 {
720 	if (s->threadsafe == false)
721 		return opdl_stage_claim_copy_singlethread(s, entries,
722 				num_entries, seq, block);
723 	else
724 		return opdl_stage_claim_copy_multithread(s, entries,
725 				num_entries, seq, block);
726 }
727 
728 void
729 opdl_stage_disclaim_n(struct opdl_stage *s, uint32_t num_entries,
730 		bool block)
731 {
732 
733 	if (s->threadsafe == false) {
734 		opdl_stage_disclaim_singlethread_n(s, s->num_claimed);
735 	} else {
736 		struct claim_manager *disclaims =
737 			&s->pending_disclaims[rte_lcore_id()];
738 
739 		if (unlikely(num_entries > s->num_slots)) {
740 			PMD_DRV_LOG(WARNING, "Attempt to disclaim (%u) more than claimed (%u)",
741 					num_entries, disclaims->num_claimed);
742 			num_entries = disclaims->num_claimed;
743 		}
744 
745 		num_entries = RTE_MIN(num_entries + disclaims->num_to_disclaim,
746 				disclaims->num_claimed);
747 		opdl_stage_disclaim_multithread_n(s, num_entries, block);
748 	}
749 }
750 
751 int
752 opdl_stage_disclaim(struct opdl_stage *s, uint32_t num_entries, bool block)
753 {
754 	if (num_entries != s->num_event) {
755 		rte_errno = EINVAL;
756 		return 0;
757 	}
758 	if (s->threadsafe == false) {
759 		__atomic_store_n(&s->shared.tail, s->head, __ATOMIC_RELEASE);
760 		s->seq += s->num_claimed;
761 		s->shadow_head = s->head;
762 		s->num_claimed = 0;
763 	} else {
764 		struct claim_manager *disclaims =
765 				&s->pending_disclaims[rte_lcore_id()];
766 		opdl_stage_disclaim_multithread_n(s, disclaims->num_claimed,
767 				block);
768 	}
769 	return num_entries;
770 }
771 
772 uint32_t
773 opdl_ring_available(struct opdl_ring *t)
774 {
775 	return opdl_stage_available(&t->stages[0]);
776 }
777 
778 uint32_t
779 opdl_stage_available(struct opdl_stage *s)
780 {
781 	update_available_seq(s);
782 	return available(s);
783 }
784 
785 void
786 opdl_ring_flush(struct opdl_ring *t)
787 {
788 	struct opdl_stage *s = input_stage(t);
789 
790 	wait_for_available(s, s->num_slots);
791 }
792 
793 /******************** Non performance sensitive functions ********************/
794 
795 /* Initial setup of a new stage's context */
796 static int
797 init_stage(struct opdl_ring *t, struct opdl_stage *s, bool threadsafe,
798 		bool is_input)
799 {
800 	uint32_t available = (is_input) ? t->num_slots : 0;
801 
802 	s->t = t;
803 	s->num_slots = t->num_slots;
804 	s->index = t->num_stages;
805 	s->threadsafe = threadsafe;
806 	s->shared.stage = s;
807 
808 	/* Alloc memory for deps */
809 	s->dep_tracking = rte_zmalloc_socket(LIB_NAME,
810 			t->max_num_stages * sizeof(enum dep_type),
811 			0, t->socket);
812 	if (s->dep_tracking == NULL)
813 		return -ENOMEM;
814 
815 	s->deps = rte_zmalloc_socket(LIB_NAME,
816 			t->max_num_stages * sizeof(struct shared_state *),
817 			0, t->socket);
818 	if (s->deps == NULL) {
819 		rte_free(s->dep_tracking);
820 		return -ENOMEM;
821 	}
822 
823 	s->dep_tracking[s->index] = DEP_SELF;
824 
825 	if (threadsafe == true)
826 		s->shared.available_seq = available;
827 	else
828 		s->available_seq = available;
829 
830 	return 0;
831 }
832 
833 /* Add direct or indirect dependencies between stages */
834 static int
835 add_dep(struct opdl_stage *dependent, const struct opdl_stage *dependency,
836 		enum dep_type type)
837 {
838 	struct opdl_ring *t = dependent->t;
839 	uint32_t i;
840 
841 	/* Add new direct dependency */
842 	if ((type == DEP_DIRECT) &&
843 			(dependent->dep_tracking[dependency->index] ==
844 					DEP_NONE)) {
845 		PMD_DRV_LOG(DEBUG, "%s:%u direct dependency on %u",
846 				t->name, dependent->index, dependency->index);
847 		dependent->dep_tracking[dependency->index] = DEP_DIRECT;
848 	}
849 
850 	/* Add new indirect dependency or change direct to indirect */
851 	if ((type == DEP_INDIRECT) &&
852 			((dependent->dep_tracking[dependency->index] ==
853 			DEP_NONE) ||
854 			(dependent->dep_tracking[dependency->index] ==
855 			DEP_DIRECT))) {
856 		PMD_DRV_LOG(DEBUG, "%s:%u indirect dependency on %u",
857 				t->name, dependent->index, dependency->index);
858 		dependent->dep_tracking[dependency->index] = DEP_INDIRECT;
859 	}
860 
861 	/* Shouldn't happen... */
862 	if ((dependent->dep_tracking[dependency->index] == DEP_SELF) &&
863 			(dependent != input_stage(t))) {
864 		PMD_DRV_LOG(ERR, "Loop in dependency graph %s:%u",
865 				t->name, dependent->index);
866 		return -EINVAL;
867 	}
868 
869 	/* Keep going to dependencies of the dependency, until input stage */
870 	if (dependency != input_stage(t))
871 		for (i = 0; i < dependency->num_deps; i++) {
872 			int ret = add_dep(dependent, dependency->deps[i]->stage,
873 					DEP_INDIRECT);
874 
875 			if (ret < 0)
876 				return ret;
877 		}
878 
879 	/* Make list of sequence numbers for direct dependencies only */
880 	if (type == DEP_DIRECT)
881 		for (i = 0, dependent->num_deps = 0; i < t->num_stages; i++)
882 			if (dependent->dep_tracking[i] == DEP_DIRECT) {
883 				if ((i == 0) && (dependent->num_deps > 1))
884 					rte_panic("%s:%u depends on > input",
885 							t->name,
886 							dependent->index);
887 				dependent->deps[dependent->num_deps++] =
888 						&t->stages[i].shared;
889 			}
890 
891 	return 0;
892 }
893 
894 struct opdl_ring *
895 opdl_ring_create(const char *name, uint32_t num_slots, uint32_t slot_size,
896 		uint32_t max_num_stages, int socket)
897 {
898 	struct opdl_ring *t;
899 	char mz_name[RTE_MEMZONE_NAMESIZE];
900 	int mz_flags = 0;
901 	struct opdl_stage *st = NULL;
902 	const struct rte_memzone *mz = NULL;
903 	size_t alloc_size = RTE_CACHE_LINE_ROUNDUP(sizeof(*t) +
904 			(num_slots * slot_size));
905 
906 	/* Compile time checking */
907 	RTE_BUILD_BUG_ON((sizeof(struct shared_state) & RTE_CACHE_LINE_MASK) !=
908 			0);
909 	RTE_BUILD_BUG_ON((offsetof(struct opdl_stage, shared) &
910 			RTE_CACHE_LINE_MASK) != 0);
911 	RTE_BUILD_BUG_ON((offsetof(struct opdl_ring, slots) &
912 			RTE_CACHE_LINE_MASK) != 0);
913 	RTE_BUILD_BUG_ON(!rte_is_power_of_2(OPDL_DISCLAIMS_PER_LCORE));
914 
915 	/* Parameter checking */
916 	if (name == NULL) {
917 		PMD_DRV_LOG(ERR, "name param is NULL");
918 		return NULL;
919 	}
920 	if (!rte_is_power_of_2(num_slots)) {
921 		PMD_DRV_LOG(ERR, "num_slots (%u) for %s is not power of 2",
922 				num_slots, name);
923 		return NULL;
924 	}
925 
926 	/* Alloc memory for stages */
927 	st = rte_zmalloc_socket(LIB_NAME,
928 		max_num_stages * sizeof(struct opdl_stage),
929 		RTE_CACHE_LINE_SIZE, socket);
930 	if (st == NULL)
931 		goto exit_fail;
932 
933 	snprintf(mz_name, sizeof(mz_name), "%s%s", LIB_NAME, name);
934 
935 	/* Alloc memory for memzone */
936 	mz = rte_memzone_reserve(mz_name, alloc_size, socket, mz_flags);
937 	if (mz == NULL)
938 		goto exit_fail;
939 
940 	t = mz->addr;
941 
942 	/* Initialise opdl_ring queue */
943 	memset(t, 0, sizeof(*t));
944 	strlcpy(t->name, name, sizeof(t->name));
945 	t->socket = socket;
946 	t->num_slots = num_slots;
947 	t->mask = num_slots - 1;
948 	t->slot_size = slot_size;
949 	t->max_num_stages = max_num_stages;
950 	t->stages = st;
951 
952 	PMD_DRV_LOG(DEBUG, "Created %s at %p (num_slots=%u,socket=%i,slot_size=%u)",
953 			t->name, t, num_slots, socket, slot_size);
954 
955 	return t;
956 
957 exit_fail:
958 	PMD_DRV_LOG(ERR, "Cannot reserve memory");
959 	rte_free(st);
960 	rte_memzone_free(mz);
961 
962 	return NULL;
963 }
964 
965 void *
966 opdl_ring_get_slot(const struct opdl_ring *t, uint32_t index)
967 {
968 	return get_slot(t, index);
969 }
970 
971 bool
972 opdl_ring_cas_slot(struct opdl_stage *s, const struct rte_event *ev,
973 		uint32_t index, bool atomic)
974 {
975 	uint32_t i = 0, offset;
976 	struct opdl_ring *t = s->t;
977 	struct rte_event *ev_orig = NULL;
978 	bool ev_updated = false;
979 	uint64_t ev_temp    = 0;
980 	uint64_t ev_update  = 0;
981 
982 	uint32_t opa_id   = 0;
983 	uint32_t flow_id  = 0;
984 	uint64_t event    = 0;
985 
986 	if (index > s->num_event) {
987 		PMD_DRV_LOG(ERR, "index is out of range");
988 		return ev_updated;
989 	}
990 
991 	ev_temp = ev->event & OPDL_EVENT_MASK;
992 
993 	if (!atomic) {
994 		offset = opdl_first_entry_id(s->seq, s->nb_instance,
995 				s->instance_id);
996 		offset += index*s->nb_instance;
997 		ev_orig = get_slot(t, s->shadow_head+offset);
998 		if ((ev_orig->event&OPDL_EVENT_MASK) != ev_temp) {
999 			ev_orig->event = ev->event;
1000 			ev_updated = true;
1001 		}
1002 		if (ev_orig->u64 != ev->u64) {
1003 			ev_orig->u64 = ev->u64;
1004 			ev_updated = true;
1005 		}
1006 
1007 	} else {
1008 		for (i = s->pos; i < s->num_claimed; i++) {
1009 			ev_orig = (struct rte_event *)
1010 				get_slot(t, s->shadow_head+i);
1011 
1012 			event  = __atomic_load_n(&(ev_orig->event),
1013 					__ATOMIC_ACQUIRE);
1014 
1015 			opa_id = OPDL_OPA_MASK & (event >> OPDL_OPA_OFFSET);
1016 			flow_id  = OPDL_FLOWID_MASK & event;
1017 
1018 			if (opa_id >= s->queue_id)
1019 				continue;
1020 
1021 			if ((flow_id % s->nb_instance) == s->instance_id) {
1022 				ev_update = s->queue_id;
1023 				ev_update = (ev_update << OPDL_OPA_OFFSET)
1024 					| ev->event;
1025 
1026 				s->pos = i + 1;
1027 
1028 				if ((event & OPDL_EVENT_MASK) !=
1029 						ev_temp) {
1030 					__atomic_store_n(&(ev_orig->event),
1031 							ev_update,
1032 							__ATOMIC_RELEASE);
1033 					ev_updated = true;
1034 				}
1035 				if (ev_orig->u64 != ev->u64) {
1036 					ev_orig->u64 = ev->u64;
1037 					ev_updated = true;
1038 				}
1039 
1040 				break;
1041 			}
1042 		}
1043 
1044 	}
1045 
1046 	return ev_updated;
1047 }
1048 
1049 int
1050 opdl_ring_get_socket(const struct opdl_ring *t)
1051 {
1052 	return t->socket;
1053 }
1054 
1055 uint32_t
1056 opdl_ring_get_num_slots(const struct opdl_ring *t)
1057 {
1058 	return t->num_slots;
1059 }
1060 
1061 const char *
1062 opdl_ring_get_name(const struct opdl_ring *t)
1063 {
1064 	return t->name;
1065 }
1066 
1067 /* Check dependency list is valid for a given opdl_ring */
1068 static int
1069 check_deps(struct opdl_ring *t, struct opdl_stage *deps[],
1070 		uint32_t num_deps)
1071 {
1072 	unsigned int i;
1073 
1074 	for (i = 0; i < num_deps; ++i) {
1075 		if (!deps[i]) {
1076 			PMD_DRV_LOG(ERR, "deps[%u] is NULL", i);
1077 			return -EINVAL;
1078 		}
1079 		if (t != deps[i]->t) {
1080 			PMD_DRV_LOG(ERR, "deps[%u] is in opdl_ring %s, not %s",
1081 					i, deps[i]->t->name, t->name);
1082 			return -EINVAL;
1083 		}
1084 	}
1085 
1086 	return 0;
1087 }
1088 
1089 struct opdl_stage *
1090 opdl_stage_add(struct opdl_ring *t, bool threadsafe, bool is_input)
1091 {
1092 	struct opdl_stage *s;
1093 
1094 	/* Parameter checking */
1095 	if (!t) {
1096 		PMD_DRV_LOG(ERR, "opdl_ring is NULL");
1097 		return NULL;
1098 	}
1099 	if (t->num_stages == t->max_num_stages) {
1100 		PMD_DRV_LOG(ERR, "%s has max number of stages (%u)",
1101 				t->name, t->max_num_stages);
1102 		return NULL;
1103 	}
1104 
1105 	s = &t->stages[t->num_stages];
1106 
1107 	if (((uintptr_t)&s->shared & RTE_CACHE_LINE_MASK) != 0)
1108 		PMD_DRV_LOG(WARNING, "Tail seq num (%p) of %s stage not cache aligned",
1109 				&s->shared, t->name);
1110 
1111 	if (init_stage(t, s, threadsafe, is_input) < 0) {
1112 		PMD_DRV_LOG(ERR, "Cannot reserve memory");
1113 		return NULL;
1114 	}
1115 	t->num_stages++;
1116 
1117 	return s;
1118 }
1119 
1120 uint32_t
1121 opdl_stage_deps_add(struct opdl_ring *t, struct opdl_stage *s,
1122 		uint32_t nb_instance, uint32_t instance_id,
1123 		struct opdl_stage *deps[],
1124 		uint32_t num_deps)
1125 {
1126 	uint32_t i;
1127 	int ret = 0;
1128 
1129 	if ((num_deps > 0) && (!deps)) {
1130 		PMD_DRV_LOG(ERR, "%s stage has NULL dependencies", t->name);
1131 		return -1;
1132 	}
1133 	ret = check_deps(t, deps, num_deps);
1134 	if (ret < 0)
1135 		return ret;
1136 
1137 	for (i = 0; i < num_deps; i++) {
1138 		ret = add_dep(s, deps[i], DEP_DIRECT);
1139 		if (ret < 0)
1140 			return ret;
1141 	}
1142 
1143 	s->nb_instance = nb_instance;
1144 	s->instance_id = instance_id;
1145 
1146 	return ret;
1147 }
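/* Illustrative usage sketch (an assumption for clarity, not from the original
 * source; single-threaded stages, one instance each, hypothetical names):
 *
 *	struct opdl_ring *r = opdl_ring_create("r0", 1024,
 *			sizeof(struct rte_event), 4, rte_socket_id());
 *	struct opdl_stage *in = opdl_stage_add(r, false, true);
 *	struct opdl_stage *w = opdl_stage_add(r, false, false);
 *	opdl_stage_deps_add(r, w, 1, 0, &in, 1);
 */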
1148 
1149 struct opdl_stage *
1150 opdl_ring_get_input_stage(const struct opdl_ring *t)
1151 {
1152 	return input_stage(t);
1153 }
1154 
1155 int
1156 opdl_stage_set_deps(struct opdl_stage *s, struct opdl_stage *deps[],
1157 		uint32_t num_deps)
1158 {
1159 	unsigned int i;
1160 	int ret;
1161 
1162 	if ((num_deps == 0) || (!deps)) {
1163 		PMD_DRV_LOG(ERR, "cannot set NULL dependencies");
1164 		return -EINVAL;
1165 	}
1166 
1167 	ret = check_deps(s->t, deps, num_deps);
1168 	if (ret < 0)
1169 		return ret;
1170 
1171 	/* Update deps */
1172 	for (i = 0; i < num_deps; i++)
1173 		s->deps[i] = &deps[i]->shared;
1174 	s->num_deps = num_deps;
1175 
1176 	return 0;
1177 }
1178 
1179 struct opdl_ring *
1180 opdl_stage_get_opdl_ring(const struct opdl_stage *s)
1181 {
1182 	return s->t;
1183 }
1184 
1185 void
1186 opdl_stage_set_queue_id(struct opdl_stage *s,
1187 		uint32_t queue_id)
1188 {
1189 	s->queue_id = queue_id;
1190 }
1191 
1192 void
1193 opdl_ring_dump(const struct opdl_ring *t, FILE *f)
1194 {
1195 	uint32_t i;
1196 
1197 	if (t == NULL) {
1198 		fprintf(f, "NULL OPDL!\n");
1199 		return;
1200 	}
1201 	fprintf(f, "OPDL \"%s\": num_slots=%u; mask=%#x; slot_size=%u; num_stages=%u; socket=%i\n",
1202 			t->name, t->num_slots, t->mask, t->slot_size,
1203 			t->num_stages, t->socket);
1204 	for (i = 0; i < t->num_stages; i++) {
1205 		uint32_t j;
1206 		const struct opdl_stage *s = &t->stages[i];
1207 
1208 		fprintf(f, "  %s[%u]: threadsafe=%s; head=%u; available_seq=%u; tail=%u; deps=%u",
1209 				t->name, i, (s->threadsafe) ? "true" : "false",
1210 				(s->threadsafe) ? s->shared.head : s->head,
1211 				(s->threadsafe) ? s->shared.available_seq :
1212 				s->available_seq,
1213 				s->shared.tail, (s->num_deps > 0) ?
1214 				s->deps[0]->stage->index : 0);
1215 		for (j = 1; j < s->num_deps; j++)
1216 			fprintf(f, ",%u", s->deps[j]->stage->index);
1217 		fprintf(f, "\n");
1218 	}
1219 	fflush(f);
1220 }
1221 
1222 void
1223 opdl_ring_free(struct opdl_ring *t)
1224 {
1225 	uint32_t i;
1226 	const struct rte_memzone *mz;
1227 	char mz_name[RTE_MEMZONE_NAMESIZE];
1228 
1229 	if (t == NULL) {
1230 		PMD_DRV_LOG(DEBUG, "Freeing NULL OPDL Ring!");
1231 		return;
1232 	}
1233 
1234 	PMD_DRV_LOG(DEBUG, "Freeing %s opdl_ring at %p", t->name, t);
1235 
1236 	for (i = 0; i < t->num_stages; ++i) {
1237 		rte_free(t->stages[i].deps);
1238 		rte_free(t->stages[i].dep_tracking);
1239 	}
1240 
1241 	rte_free(t->stages);
1242 
1243 	snprintf(mz_name, sizeof(mz_name), "%s%s", LIB_NAME, t->name);
1244 	mz = rte_memzone_lookup(mz_name);
1245 	if (rte_memzone_free(mz) != 0)
1246 		PMD_DRV_LOG(ERR, "Cannot free memzone for %s", t->name);
1247 }
1248 
1249 /* Look up an opdl_ring by its name */
1250 struct opdl_ring *
1251 opdl_ring_lookup(const char *name)
1252 {
1253 	const struct rte_memzone *mz;
1254 	char mz_name[RTE_MEMZONE_NAMESIZE];
1255 
1256 	snprintf(mz_name, sizeof(mz_name), "%s%s", LIB_NAME, name);
1257 
1258 	mz = rte_memzone_lookup(mz_name);
1259 	if (mz == NULL)
1260 		return NULL;
1261 
1262 	return mz->addr;
1263 }
1264 
1265 void
1266 opdl_ring_set_stage_threadsafe(struct opdl_stage *s, bool threadsafe)
1267 {
1268 	s->threadsafe = threadsafe;
1269 }
1270