/* SPDX-License-Identifier: BSD-3-Clause
 * Copyright(C) 2020 Marvell International Ltd.
 */

#ifndef _RTE_GRAPH_WORKER_H_
#define _RTE_GRAPH_WORKER_H_

/**
 * @file rte_graph_worker.h
 *
 * @warning
 * @b EXPERIMENTAL:
 * All functions in this file may be changed or removed without prior notice.
 *
 * This API allows a worker thread to walk over a graph and nodes to create,
 * process, enqueue and move streams of objects to the next nodes.
 */

#include <rte_common.h>
#include <rte_cycles.h>
#include <rte_prefetch.h>
#include <rte_memcpy.h>
#include <rte_memory.h>

#include "rte_graph.h"

#ifdef __cplusplus
extern "C" {
#endif

/**
 * @internal
 *
 * Data structure to hold graph data.
 */
struct rte_graph {
	uint32_t tail;		     /**< Tail of circular buffer. */
	uint32_t head;		     /**< Head of circular buffer. */
	uint32_t cir_mask;	     /**< Circular buffer wrap around mask. */
	rte_node_t nb_nodes;	     /**< Number of nodes in the graph. */
	rte_graph_off_t *cir_start;  /**< Pointer to circular buffer. */
	rte_graph_off_t nodes_start; /**< Offset at which node memory starts. */
	rte_graph_t id;		     /**< Graph identifier. */
	int socket;		     /**< Socket ID where memory is allocated. */
	char name[RTE_GRAPH_NAMESIZE]; /**< Name of the graph. */
	uint64_t fence;		     /**< Fence. */
} __rte_cache_aligned;

/**
 * @internal
 *
 * Data structure to hold node data.
 */
struct rte_node {
	/* Slow path area */
	uint64_t fence;		/**< Fence. */
	rte_graph_off_t next;	/**< Index to next node. */
	rte_node_t id;		/**< Node identifier. */
	rte_node_t parent_id;	/**< Parent Node identifier. */
	rte_edge_t nb_edges;	/**< Number of edges from this node. */
	uint32_t realloc_count;	/**< Number of times realloced. */

	char parent[RTE_NODE_NAMESIZE];	/**< Parent node name. */
	char name[RTE_NODE_NAMESIZE];	/**< Name of the node. */

	/* Fast path area */
#define RTE_NODE_CTX_SZ 16
	uint8_t ctx[RTE_NODE_CTX_SZ] __rte_cache_aligned; /**< Node Context. */
	uint16_t size;		/**< Total number of objects available. */
	uint16_t idx;		/**< Number of objects used. */
	rte_graph_off_t off;	/**< Offset of node in the graph reel. */
	uint64_t total_cycles;	/**< Cycles spent in this node. */
	uint64_t total_calls;	/**< Calls done to this node. */
	uint64_t total_objs;	/**< Objects processed by this node. */
	RTE_STD_C11
	union {
		void **objs;	/**< Array of object pointers. */
		uint64_t objs_u64;
	};
	RTE_STD_C11
	union {
		rte_node_process_t process; /**< Process function. */
		uint64_t process_u64;
	};
	struct rte_node *nodes[] __rte_cache_min_aligned; /**< Next nodes. */
} __rte_cache_aligned;
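
/*
 * Example: a node implementation typically overlays its private state on the
 * 16-byte ctx[] scratch area. A minimal sketch, assuming a hypothetical
 * "struct my_node_ctx" that fits within RTE_NODE_CTX_SZ bytes:
 *
 *	struct my_node_ctx {
 *		uint32_t last_next; // cached next edge from the previous burst
 *	};
 *
 *	static uint16_t
 *	my_node_process(struct rte_graph *graph, struct rte_node *node,
 *			void **objs, uint16_t nb_objs)
 *	{
 *		struct my_node_ctx *ctx = (struct my_node_ctx *)node->ctx;
 *		...
 *	}
 */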

/**
 * @internal
 *
 * Allocate a stream of objects.
 *
 * If the stream already exists, re-allocate it to a larger size.
 *
 * @param graph
 *   Pointer to the graph object.
 * @param node
 *   Pointer to the node object.
 */
__rte_experimental
void __rte_node_stream_alloc(struct rte_graph *graph, struct rte_node *node);

/**
 * @internal
 *
 * Allocate a stream with the requested number of objects.
 *
 * If the stream already exists, re-allocate it to a larger size.
 *
 * @param graph
 *   Pointer to the graph object.
 * @param node
 *   Pointer to the node object.
 * @param req_size
 *   Number of objects to be allocated.
 */
__rte_experimental
void __rte_node_stream_alloc_size(struct rte_graph *graph,
				  struct rte_node *node, uint16_t req_size);

/**
 * Perform graph walk on the circular buffer and invoke the process function
 * of the nodes and collect the stats.
 *
 * @param graph
 *   Graph pointer returned from rte_graph_lookup().
 *
 * @see rte_graph_lookup()
 */
__rte_experimental
static inline void
rte_graph_walk(struct rte_graph *graph)
{
	const rte_graph_off_t *cir_start = graph->cir_start;
	const rte_node_t mask = graph->cir_mask;
	uint32_t head = graph->head;
	struct rte_node *node;
	uint64_t start;
	uint16_t rc;
	void **objs;

	/*
	 * Walk on the source node(s) ((cir_start - head) -> cir_start) and then
	 * on the pending streams (cir_start -> (cir_start + mask) -> cir_start)
	 * in a circular buffer fashion.
	 *
	 *	+-----+ <= cir_start - head [number of source nodes]
	 *	|     |
	 *	| ... | <= source nodes
	 *	|     |
	 *	+-----+ <= cir_start [head = 0] [tail = 0]
	 *	|     |
	 *	| ... | <= pending streams
	 *	|     |
	 *	+-----+ <= cir_start + mask
	 */
	while (likely(head != graph->tail)) {
		node = RTE_PTR_ADD(graph, cir_start[(int32_t)head++]);
		RTE_ASSERT(node->fence == RTE_GRAPH_FENCE);
		objs = node->objs;
		rte_prefetch0(objs);

		if (rte_graph_has_stats_feature()) {
			start = rte_rdtsc();
			rc = node->process(graph, node, objs, node->idx);
			node->total_cycles += rte_rdtsc() - start;
			node->total_calls++;
			node->total_objs += rc;
		} else {
			node->process(graph, node, objs, node->idx);
		}
		node->idx = 0;
		head = likely((int32_t)head > 0) ? head & mask : head;
	}
	graph->tail = 0;
}
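
/*
 * Example: a minimal per-lcore dispatch loop. A sketch, assuming the graph
 * was created elsewhere under the hypothetical name "worker0" and that
 * "force_quit" is an application-defined termination flag:
 *
 *	struct rte_graph *graph = rte_graph_lookup("worker0");
 *
 *	while (!force_quit)
 *		rte_graph_walk(graph);
 */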

/* Fast path helper functions */

/**
 * @internal
 *
 * Enqueue a given node to the tail of the graph reel.
 *
 * @param graph
 *   Pointer to the graph object.
 * @param node
 *   Pointer to the node object to be enqueued.
 */
static __rte_always_inline void
__rte_node_enqueue_tail_update(struct rte_graph *graph, struct rte_node *node)
{
	uint32_t tail;

	tail = graph->tail;
	graph->cir_start[tail++] = node->off;
	graph->tail = tail & graph->cir_mask;
}

/**
 * @internal
 *
 * Enqueue sequence prologue function.
 *
 * Adds the node to the tail of the graph reel and resizes the node stream as
 * needed to hold the requested number of objects.
 *
 * @param graph
 *   Pointer to the graph object.
 * @param node
 *   Pointer to the node object.
 * @param idx
 *   Index at which the object enqueue starts.
 * @param space
 *   Space required for the object enqueue.
 */
static __rte_always_inline void
__rte_node_enqueue_prologue(struct rte_graph *graph, struct rte_node *node,
			    const uint16_t idx, const uint16_t space)
{

	/* Add to the pending stream list if the node is new */
	if (idx == 0)
		__rte_node_enqueue_tail_update(graph, node);

	if (unlikely(node->size < (idx + space)))
		__rte_node_stream_alloc(graph, node);
}

/**
 * @internal
 *
 * Get the node pointer from the current node's edge id.
 *
 * @param node
 *   Current node pointer.
 * @param next
 *   Edge id of the required node.
 *
 * @return
 *   Pointer to the node denoted by the edge id.
 */
static __rte_always_inline struct rte_node *
__rte_node_next_node_get(struct rte_node *node, rte_edge_t next)
{
	RTE_ASSERT(next < node->nb_edges);
	RTE_ASSERT(node->fence == RTE_GRAPH_FENCE);
	node = node->nodes[next];
	RTE_ASSERT(node->fence == RTE_GRAPH_FENCE);

	return node;
}

/**
 * Enqueue the objs to the next node for further processing and set
 * the next node to pending state in the circular buffer.
 *
 * @param graph
 *   Graph pointer returned from rte_graph_lookup().
 * @param node
 *   Current node pointer.
 * @param next
 *   Relative next node index to enqueue objs.
 * @param objs
 *   Objs to enqueue.
 * @param nb_objs
 *   Number of objs to enqueue.
 */
__rte_experimental
static inline void
rte_node_enqueue(struct rte_graph *graph, struct rte_node *node,
		 rte_edge_t next, void **objs, uint16_t nb_objs)
{
	node = __rte_node_next_node_get(node, next);
	const uint16_t idx = node->idx;

	__rte_node_enqueue_prologue(graph, node, idx, nb_objs);

	rte_memcpy(&node->objs[idx], objs, nb_objs * sizeof(void *));
	node->idx = idx + nb_objs;
}
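
/*
 * Example: forward a whole burst to a single next node from inside a process
 * callback. A sketch, assuming a hypothetical edge id MY_NODE_NEXT_OUT
 * defined by the node's register structure:
 *
 *	static uint16_t
 *	my_fwd_process(struct rte_graph *graph, struct rte_node *node,
 *		       void **objs, uint16_t nb_objs)
 *	{
 *		// All objects take the same edge; one bulk enqueue suffices
 *		rte_node_enqueue(graph, node, MY_NODE_NEXT_OUT, objs, nb_objs);
 *		return nb_objs;
 *	}
 */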

/**
 * Enqueue only one obj to the next node for further processing and
 * set the next node to pending state in the circular buffer.
 *
 * @param graph
 *   Graph pointer returned from rte_graph_lookup().
 * @param node
 *   Current node pointer.
 * @param next
 *   Relative next node index to enqueue the obj.
 * @param obj
 *   Obj to enqueue.
 */
__rte_experimental
static inline void
rte_node_enqueue_x1(struct rte_graph *graph, struct rte_node *node,
		    rte_edge_t next, void *obj)
{
	node = __rte_node_next_node_get(node, next);
	uint16_t idx = node->idx;

	__rte_node_enqueue_prologue(graph, node, idx, 1);

	node->objs[idx++] = obj;
	node->idx = idx;
}

/**
 * Enqueue only two objs to the next node for further processing and
 * set the next node to pending state in the circular buffer.
 * Same as rte_node_enqueue_x1 but enqueues two objs.
 *
 * @param graph
 *   Graph pointer returned from rte_graph_lookup().
 * @param node
 *   Current node pointer.
 * @param next
 *   Relative next node index to enqueue objs.
 * @param obj0
 *   Obj to enqueue.
 * @param obj1
 *   Obj to enqueue.
 */
__rte_experimental
static inline void
rte_node_enqueue_x2(struct rte_graph *graph, struct rte_node *node,
		    rte_edge_t next, void *obj0, void *obj1)
{
	node = __rte_node_next_node_get(node, next);
	uint16_t idx = node->idx;

	__rte_node_enqueue_prologue(graph, node, idx, 2);

	node->objs[idx++] = obj0;
	node->objs[idx++] = obj1;
	node->idx = idx;
}

/**
 * Enqueue only four objs to the next node for further processing and
 * set the next node to pending state in the circular buffer.
 * Same as rte_node_enqueue_x1 but enqueues four objs.
 *
 * @param graph
 *   Graph pointer returned from rte_graph_lookup().
 * @param node
 *   Current node pointer.
 * @param next
 *   Relative next node index to enqueue objs.
 * @param obj0
 *   1st obj to enqueue.
 * @param obj1
 *   2nd obj to enqueue.
 * @param obj2
 *   3rd obj to enqueue.
 * @param obj3
 *   4th obj to enqueue.
 */
__rte_experimental
static inline void
rte_node_enqueue_x4(struct rte_graph *graph, struct rte_node *node,
		    rte_edge_t next, void *obj0, void *obj1, void *obj2,
		    void *obj3)
{
	node = __rte_node_next_node_get(node, next);
	uint16_t idx = node->idx;

	__rte_node_enqueue_prologue(graph, node, idx, 4);

	node->objs[idx++] = obj0;
	node->objs[idx++] = obj1;
	node->objs[idx++] = obj2;
	node->objs[idx++] = obj3;
	node->idx = idx;
}
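
/*
 * Example: drain a burst with the unrolled helpers, four objects at a time,
 * then mop up the remainder one by one. A sketch, assuming a hypothetical
 * edge id NEXT_TX and the objs/nb_objs locals of a process callback:
 *
 *	uint16_t i = 0;
 *
 *	for (; i + 4 <= nb_objs; i += 4)
 *		rte_node_enqueue_x4(graph, node, NEXT_TX, objs[i],
 *				    objs[i + 1], objs[i + 2], objs[i + 3]);
 *	for (; i < nb_objs; i++)
 *		rte_node_enqueue_x1(graph, node, NEXT_TX, objs[i]);
 */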

/**
 * Enqueue objs to multiple next nodes for further processing and
 * set the next nodes to pending state in the circular buffer.
 * objs[i] will be enqueued to nexts[i].
 *
 * @param graph
 *   Graph pointer returned from rte_graph_lookup().
 * @param node
 *   Current node pointer.
 * @param nexts
 *   List of relative next node indices to enqueue objs.
 * @param objs
 *   List of objs to enqueue.
 * @param nb_objs
 *   Number of objs to enqueue.
 */
__rte_experimental
static inline void
rte_node_enqueue_next(struct rte_graph *graph, struct rte_node *node,
		      rte_edge_t *nexts, void **objs, uint16_t nb_objs)
{
	uint16_t i;

	for (i = 0; i < nb_objs; i++)
		rte_node_enqueue_x1(graph, node, nexts[i], objs[i]);
}
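
/*
 * Example: scatter a burst when each object may take a different edge.
 * A sketch, assuming a hypothetical classify() helper and edge ids
 * NEXT_IP4/NEXT_DROP:
 *
 *	rte_edge_t nexts[RTE_GRAPH_BURST_SIZE];
 *	uint16_t i;
 *
 *	for (i = 0; i < nb_objs; i++)
 *		nexts[i] = classify(objs[i]) ? NEXT_IP4 : NEXT_DROP;
 *	rte_node_enqueue_next(graph, node, nexts, objs, nb_objs);
 */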

/**
 * Get the stream of the next node to enqueue the objs.
 * Once done updating the objs, call rte_node_next_stream_put() to put
 * the next node to pending state.
 *
 * @param graph
 *   Graph pointer returned from rte_graph_lookup().
 * @param node
 *   Current node pointer.
 * @param next
 *   Relative next node index to get the stream.
 * @param nb_objs
 *   Requested free size of the next stream.
 *
 * @return
 *   Valid next stream on success.
 *
 * @see rte_node_next_stream_put().
 */
__rte_experimental
static inline void **
rte_node_next_stream_get(struct rte_graph *graph, struct rte_node *node,
			 rte_edge_t next, uint16_t nb_objs)
{
	node = __rte_node_next_node_get(node, next);
	const uint16_t idx = node->idx;
	uint16_t free_space = node->size - idx;

	if (unlikely(free_space < nb_objs))
		__rte_node_stream_alloc_size(graph, node, nb_objs);

	return &node->objs[idx];
}
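
/*
 * Example: write objects straight into the next node's stream, then commit
 * the count with rte_node_next_stream_put(). A sketch, assuming a
 * hypothetical edge id NEXT_OUT, a max_objs bound and a produce() helper:
 *
 *	void **to_next;
 *	uint16_t n;
 *
 *	to_next = rte_node_next_stream_get(graph, node, NEXT_OUT, max_objs);
 *	n = produce(to_next, max_objs); // fill up to max_objs entries
 *	rte_node_next_stream_put(graph, node, NEXT_OUT, n);
 */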

/**
 * Put the next stream to pending state in the circular buffer
 * for further processing. Should be invoked after rte_node_next_stream_get().
 *
 * @param graph
 *   Graph pointer returned from rte_graph_lookup().
 * @param node
 *   Current node pointer.
 * @param next
 *   Relative next node index.
 * @param idx
 *   Number of objs updated in the stream after getting the stream using
 *   rte_node_next_stream_get().
 *
 * @see rte_node_next_stream_get().
 */
__rte_experimental
static inline void
rte_node_next_stream_put(struct rte_graph *graph, struct rte_node *node,
			 rte_edge_t next, uint16_t idx)
{
	if (unlikely(!idx))
		return;

	node = __rte_node_next_node_get(node, next);
	if (node->idx == 0)
		__rte_node_enqueue_tail_update(graph, node);

	node->idx += idx;
}

/**
 * Home run scenario: enqueue all the objs of the current node to the next
 * node in an optimized way by swapping the streams of both nodes.
 * Performs well when the next node is not already in pending state.
 * If the next node is already in pending state, a normal enqueue
 * is used instead.
 *
 * @param graph
 *   Graph pointer returned from rte_graph_lookup().
 * @param src
 *   Current node pointer.
 * @param next
 *   Relative next node index.
 */
__rte_experimental
static inline void
rte_node_next_stream_move(struct rte_graph *graph, struct rte_node *src,
			  rte_edge_t next)
{
	struct rte_node *dst = __rte_node_next_node_get(src, next);

	/* Swap the stream pointers if dst doesn't hold valid objs */
	if (likely(dst->idx == 0)) {
		void **dobjs = dst->objs;
		uint16_t dsz = dst->size;
		dst->objs = src->objs;
		dst->size = src->size;
		src->objs = dobjs;
		src->size = dsz;
		dst->idx = src->idx;
		__rte_node_enqueue_tail_update(graph, dst);
	} else { /* Move the objects from src node to dst node */
		rte_node_enqueue(graph, src, next, src->objs, src->idx);
	}
}
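
/*
 * Example: a source node that fills its own stream and hands the whole burst
 * to edge 0 via the stream swap. A sketch, assuming a hypothetical rx_fill()
 * helper that writes object pointers into the given array:
 *
 *	static uint16_t
 *	my_src_process(struct rte_graph *graph, struct rte_node *node,
 *		       void **objs, uint16_t nb_objs)
 *	{
 *		uint16_t n = rx_fill(node->objs, RTE_GRAPH_BURST_SIZE);
 *
 *		node->idx = n;
 *		if (n != 0)
 *			rte_node_next_stream_move(graph, node, 0);
 *		return n;
 *	}
 */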

#ifdef __cplusplus
}
#endif

#endif /* _RTE_GRAPH_WORKER_H_ */