/*
 * SPDX-License-Identifier: BSD-3-Clause
 * Copyright 2015 Intel Corporation.
 * Copyright 2010-2011 Dmitry Vyukov
 */

#ifndef LTHREAD_QUEUE_H_
#define LTHREAD_QUEUE_H_

#ifdef __cplusplus
extern "C" {
#endif

#include <string.h>

#include <rte_prefetch.h>
#include <rte_per_lcore.h>

#include "lthread_int.h"
#include "lthread.h"
#include "lthread_diag.h"
#include "lthread_pool.h"

struct lthread_queue;

/*
 * This file implements an unbounded FIFO queue based on a lock free
 * linked list.
 *
 * The queue is non-intrusive in that it uses intermediate nodes, and does
 * not require these nodes to be embedded in the object being placed
 * in the queue.
 *
 * This is slightly more efficient than the very similar queue in lthread_pool
 * in that it does not have to swing a stub node as the queue becomes empty.
 *
 * The queue access functions allocate and free intermediate nodes
 * transparently from/to a per scheduler pool (see lthread_pool.h).
 *
 * The queue provides both MPSC and SPSC insert methods.
 */
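
/*
 * Illustrative usage (a minimal sketch, not from the original sources;
 * error handling is omitted and `lt` is a hypothetical struct lthread
 * pointer obtained elsewhere). Any producer may use the MPSC insert,
 * while a single consumer drains the queue:
 *
 *	struct lthread_queue *q = _lthread_queue_create("ready");
 *
 *	_lthread_queue_insert_mp(q, lt);       // safe from any producer
 *
 *	struct lthread *next = _lthread_queue_remove(q);  // single consumer
 *
 *	_lthread_queue_destroy(q);   // returns -1 unless the queue is empty
 */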

/*
 * define a queue of lthread nodes
 */
struct lthread_queue {
	struct qnode *head;
	struct qnode *tail __rte_cache_aligned;
	struct lthread_queue *p;
	char name[LT_MAX_NAME_SIZE];

	DIAG_COUNT_DEFINE(rd);
	DIAG_COUNT_DEFINE(wr);
	DIAG_COUNT_DEFINE(size);

} __rte_cache_aligned;


static inline struct lthread_queue *
_lthread_queue_create(const char *name)
{
	struct qnode *stub;
	struct lthread_queue *new_queue;

	new_queue = rte_malloc_socket(NULL, sizeof(struct lthread_queue),
				      RTE_CACHE_LINE_SIZE,
				      rte_socket_id());
	if (new_queue == NULL)
		return NULL;

	/* allocate the stub node */
	stub = _qnode_alloc();
	RTE_ASSERT(stub);

	if (name != NULL)
		strncpy(new_queue->name, name, sizeof(new_queue->name));
	new_queue->name[sizeof(new_queue->name)-1] = 0;

	/* initialize queue as empty */
	stub->next = NULL;
	new_queue->head = stub;
	new_queue->tail = stub;

	DIAG_COUNT_INIT(new_queue, rd);
	DIAG_COUNT_INIT(new_queue, wr);
	DIAG_COUNT_INIT(new_queue, size);

	return new_queue;
}
/**
 * Return true if the queue is empty
 */
static __rte_always_inline int
_lthread_queue_empty(struct lthread_queue *q)
{
	return q->tail == q->head;
}


/**
 * Destroy a queue
 * Fails if the queue is not empty
 */
static inline int _lthread_queue_destroy(struct lthread_queue *q)
{
	if (q == NULL)
		return -1;

	if (!_lthread_queue_empty(q))
		return -1;

	_qnode_free(q->head);
	rte_free(q);
	return 0;
}

RTE_DECLARE_PER_LCORE(struct lthread_sched *, this_sched);

/*
 * Insert a node into a queue
 * This implementation is multi producer safe
 */
static __rte_always_inline struct qnode *
_lthread_queue_insert_mp(struct lthread_queue *q, void *data)
{
	struct qnode *prev;
	struct qnode *n = _qnode_alloc();

	if (n == NULL)
		return NULL;

	/* set object in node */
	n->data = data;
	n->next = NULL;

	/* this is an MPSC method, perform a locked update */
	prev = n;
	prev =
	    (struct qnode *)__sync_lock_test_and_set((uint64_t *) &(q)->head,
						     (uint64_t) prev);
	/* there is a window of inconsistency until prev->next is set,
	 * which is why remove must retry
	 */
	prev->next = n;

	DIAG_COUNT_INC(q, wr);
	DIAG_COUNT_INC(q, size);

	return n;
}

/*
 * Insert a node into a queue in single producer mode
 * This implementation is NOT multi producer safe
 */
static __rte_always_inline struct qnode *
_lthread_queue_insert_sp(struct lthread_queue *q, void *data)
{
	/* allocate a queue node */
	struct qnode *prev;
	struct qnode *n = _qnode_alloc();

	if (n == NULL)
		return NULL;

	/* set data in node */
	n->data = data;
	n->next = NULL;

	/* this is an SPSC method, no need for a locked exchange operation */
	prev = q->head;
	prev->next = q->head = n;

	DIAG_COUNT_INC(q, wr);
	DIAG_COUNT_INC(q, size);

	return n;
}

/*
 * Remove a node from a queue (single poll attempt)
 */
static __rte_always_inline void *
_lthread_queue_poll(struct lthread_queue *q)
{
	void *data = NULL;
	struct qnode *tail = q->tail;
	struct qnode *next = (struct qnode *)tail->next;
	/*
	 * There is a small window of inconsistency between producer and
	 * consumer whereby the queue may appear empty if consumer and
	 * producer access it at the same time.
	 * The consumer must handle this by retrying
	 */

	if (likely(next != NULL)) {
		q->tail = next;
		tail->data = next->data;
		data = tail->data;

		/* free the node */
		_qnode_free(tail);

		DIAG_COUNT_INC(q, rd);
		DIAG_COUNT_DEC(q, size);
		return data;
	}
	return NULL;
}

/*
 * Remove a node from a queue, retrying while the queue appears non-empty
 */
static __rte_always_inline void *
_lthread_queue_remove(struct lthread_queue *q)
{
	void *data = NULL;

	/*
	 * There is a small window of inconsistency between producer and
	 * consumer whereby the queue may appear empty if consumer and
	 * producer access it at the same time. We handle this by retrying
	 */
	do {
		data = _lthread_queue_poll(q);

		if (likely(data != NULL)) {

			DIAG_COUNT_INC(q, rd);
			DIAG_COUNT_DEC(q, size);
			return data;
		}
		rte_compiler_barrier();
	} while (unlikely(!_lthread_queue_empty(q)));
	return NULL;
}

#ifdef __cplusplus
}
#endif

#endif /* LTHREAD_QUEUE_H_ */