xref: /f-stack/dpdk/drivers/event/dsw/dsw_evdev.c (revision ebf5cedb)
/* SPDX-License-Identifier: BSD-3-Clause
 * Copyright(c) 2018 Ericsson AB
 */

#include <stdbool.h>

#include <rte_cycles.h>
#include <rte_eventdev_pmd.h>
#include <rte_eventdev_pmd_vdev.h>
#include <rte_random.h>

#include "dsw_evdev.h"

#define EVENTDEV_NAME_DSW_PMD event_dsw

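/* Set up an event port: create the port's input event ring and its
 * control ring, and convert the load update and migration intervals
 * from microseconds into TSC cycles.
 */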
static int
dsw_port_setup(struct rte_eventdev *dev, uint8_t port_id,
	       const struct rte_event_port_conf *conf)
{
	struct dsw_evdev *dsw = dsw_pmd_priv(dev);
	struct dsw_port *port;
	struct rte_event_ring *in_ring;
	struct rte_ring *ctl_in_ring;
	char ring_name[RTE_RING_NAMESIZE];

	port = &dsw->ports[port_id];

	*port = (struct dsw_port) {
		.id = port_id,
		.dsw = dsw,
		.dequeue_depth = conf->dequeue_depth,
		.enqueue_depth = conf->enqueue_depth,
		.new_event_threshold = conf->new_event_threshold
	};

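	/* Single-consumer event ring into which events scheduled to
	 * this port are enqueued; only the owning port dequeues from it.
	 */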
	snprintf(ring_name, sizeof(ring_name), "dsw%d_p%u", dev->data->dev_id,
		 port_id);

	in_ring = rte_event_ring_create(ring_name, DSW_IN_RING_SIZE,
					dev->data->socket_id,
					RING_F_SC_DEQ|RING_F_EXACT_SZ);

	if (in_ring == NULL)
		return -ENOMEM;

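	/* Single-consumer ring carrying control messages (used for
	 * flow migration signaling) from the other ports to this port.
	 */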
	snprintf(ring_name, sizeof(ring_name), "dswctl%d_p%u",
		 dev->data->dev_id, port_id);

	ctl_in_ring = rte_ring_create(ring_name, DSW_CTL_IN_RING_SIZE,
				      dev->data->socket_id,
				      RING_F_SC_DEQ|RING_F_EXACT_SZ);

	if (ctl_in_ring == NULL) {
		rte_event_ring_free(in_ring);
		return -ENOMEM;
	}

	port->in_ring = in_ring;
	port->ctl_in_ring = ctl_in_ring;

	rte_atomic16_init(&port->load);

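	/* The intervals are specified in microseconds; convert them
	 * into TSC cycles, the unit used by the data path.
	 */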
	port->load_update_interval =
		(DSW_LOAD_UPDATE_INTERVAL * rte_get_timer_hz()) / US_PER_S;

	port->migration_interval =
		(DSW_MIGRATION_INTERVAL * rte_get_timer_hz()) / US_PER_S;

	dev->data->ports[port_id] = port;

	return 0;
}

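/* Default port configuration: a 1024-event new event threshold, and
 * dequeue/enqueue burst sizes at a quarter of the per-port maximums.
 */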
static void
dsw_port_def_conf(struct rte_eventdev *dev __rte_unused,
		  uint8_t port_id __rte_unused,
		  struct rte_event_port_conf *port_conf)
{
	*port_conf = (struct rte_event_port_conf) {
		.new_event_threshold = 1024,
		.dequeue_depth = DSW_MAX_PORT_DEQUEUE_DEPTH / 4,
		.enqueue_depth = DSW_MAX_PORT_ENQUEUE_DEPTH / 4
	};
}

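/* Free the rings created in dsw_port_setup(). */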
static void
dsw_port_release(void *p)
{
	struct dsw_port *port = p;

	rte_event_ring_free(port->in_ring);
	rte_ring_free(port->ctl_in_ring);
}

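/* Set up an event queue. DSW supports atomic and parallel scheduling,
 * so ordered queues and all-types queues are rejected with -ENOTSUP.
 */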
static int
dsw_queue_setup(struct rte_eventdev *dev, uint8_t queue_id,
		const struct rte_event_queue_conf *conf)
{
	struct dsw_evdev *dsw = dsw_pmd_priv(dev);
	struct dsw_queue *queue = &dsw->queues[queue_id];

	if (RTE_EVENT_QUEUE_CFG_ALL_TYPES & conf->event_queue_cfg)
		return -ENOTSUP;

	/* SINGLE_LINK is better off treated as TYPE_ATOMIC, since it
	 * avoids the "fake" TYPE_PARALLEL flow_id assignment. Since
	 * the queue will only have a single serving port, no
	 * migration will ever happen, so the extra TYPE_ATOMIC
	 * migration overhead is avoided.
	 */
	if (RTE_EVENT_QUEUE_CFG_SINGLE_LINK & conf->event_queue_cfg)
		queue->schedule_type = RTE_SCHED_TYPE_ATOMIC;
	else {
		if (conf->schedule_type == RTE_SCHED_TYPE_ORDERED)
			return -ENOTSUP;
		/* atomic or parallel */
		queue->schedule_type = conf->schedule_type;
	}

	queue->num_serving_ports = 0;

	return 0;
}

static void
dsw_queue_def_conf(struct rte_eventdev *dev __rte_unused,
		   uint8_t queue_id __rte_unused,
		   struct rte_event_queue_conf *queue_conf)
{
	*queue_conf = (struct rte_event_queue_conf) {
		.nb_atomic_flows = 4096,
		.schedule_type = RTE_SCHED_TYPE_ATOMIC,
		.priority = RTE_EVENT_DEV_PRIORITY_NORMAL
	};
}

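/* No per-queue resources are allocated, so there is nothing to release. */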
static void
dsw_queue_release(struct rte_eventdev *dev __rte_unused,
		  uint8_t queue_id __rte_unused)
{
}

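/* The serving_ports array holds an unordered list of the ports linked
 * to a queue. Removal swaps in the last element to keep the array dense.
 */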
static void
queue_add_port(struct dsw_queue *queue, uint16_t port_id)
{
	queue->serving_ports[queue->num_serving_ports] = port_id;
	queue->num_serving_ports++;
}

static bool
queue_remove_port(struct dsw_queue *queue, uint16_t port_id)
{
	uint16_t i;

	for (i = 0; i < queue->num_serving_ports; i++)
		if (queue->serving_ports[i] == port_id) {
			uint16_t last_idx = queue->num_serving_ports - 1;
			if (i != last_idx)
				queue->serving_ports[i] =
					queue->serving_ports[last_idx];
			queue->num_serving_ports--;
			return true;
		}
	return false;
}

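/* Common worker for port_link and port_unlink. Returns the number of
 * queues actually linked or unlinked, as the eventdev API requires.
 */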
static int
dsw_port_link_unlink(struct rte_eventdev *dev, void *port,
		     const uint8_t queues[], uint16_t num, bool link)
{
	struct dsw_evdev *dsw = dsw_pmd_priv(dev);
	struct dsw_port *p = port;
	uint16_t i;
	uint16_t count = 0;

	for (i = 0; i < num; i++) {
		uint8_t qid = queues[i];
		struct dsw_queue *q = &dsw->queues[qid];
		if (link) {
			queue_add_port(q, p->id);
			count++;
		} else {
			bool removed = queue_remove_port(q, p->id);
			if (removed)
				count++;
		}
	}

	return count;
}

static int
dsw_port_link(struct rte_eventdev *dev, void *port, const uint8_t queues[],
	      const uint8_t priorities[] __rte_unused, uint16_t num)
{
	return dsw_port_link_unlink(dev, port, queues, num, true);
}

static int
dsw_port_unlink(struct rte_eventdev *dev, void *port, uint8_t queues[],
		uint16_t num)
{
	return dsw_port_link_unlink(dev, port, queues, num, false);
}

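/* Advertise device limits and capabilities: burst-mode enqueue/dequeue,
 * distributed (worker-core) scheduling, non-sequential mode and support
 * for multiple queues per port.
 */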
static void
dsw_info_get(struct rte_eventdev *dev __rte_unused,
	     struct rte_event_dev_info *info)
{
	*info = (struct rte_event_dev_info) {
		.driver_name = DSW_PMD_NAME,
		.max_event_queues = DSW_MAX_QUEUES,
		.max_event_queue_flows = DSW_MAX_FLOWS,
		.max_event_queue_priority_levels = 1,
		.max_event_priority_levels = 1,
		.max_event_ports = DSW_MAX_PORTS,
		.max_event_port_dequeue_depth = DSW_MAX_PORT_DEQUEUE_DEPTH,
		.max_event_port_enqueue_depth = DSW_MAX_PORT_ENQUEUE_DEPTH,
		.max_num_events = DSW_MAX_EVENTS,
		.event_dev_cap = RTE_EVENT_DEV_CAP_BURST_MODE|
		RTE_EVENT_DEV_CAP_DISTRIBUTED_SCHED|
		RTE_EVENT_DEV_CAP_NONSEQ_MODE|
		RTE_EVENT_DEV_CAP_MULTIPLE_QUEUE_PORT
	};
}

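/* Record the configured number of ports and queues and derive the upper
 * bound on in-flight events (credits).
 */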
static int
dsw_configure(const struct rte_eventdev *dev)
{
	struct dsw_evdev *dsw = dsw_pmd_priv(dev);
	const struct rte_event_dev_config *conf = &dev->data->dev_conf;
	int32_t min_max_in_flight;

	dsw->num_ports = conf->nb_event_ports;
	dsw->num_queues = conf->nb_event_queues;

	/* Avoid a situation where consumer ports are holding all the
	 * credits without making use of them.
	 */
	min_max_in_flight = conf->nb_event_ports * DSW_PORT_MAX_CREDITS;

	dsw->max_inflight = RTE_MAX(conf->nb_events_limit, min_max_in_flight);

	return 0;
}

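/* Give every flow hash bucket of every queue a random initial serving
 * port, picked among the ports linked to the queue. The flow-to-port
 * map is later adjusted at runtime, as flows are migrated between ports.
 */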
static void
initial_flow_to_port_assignment(struct dsw_evdev *dsw)
{
	uint8_t queue_id;
	for (queue_id = 0; queue_id < dsw->num_queues; queue_id++) {
		struct dsw_queue *queue = &dsw->queues[queue_id];
		uint16_t flow_hash;
		for (flow_hash = 0; flow_hash < DSW_MAX_FLOWS; flow_hash++) {
			uint8_t port_idx =
				rte_rand() % queue->num_serving_ports;
			uint8_t port_id =
				queue->serving_ports[port_idx];
			dsw->queues[queue_id].flow_to_port_map[flow_hash] =
				port_id;
		}
	}
}

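/* Starting the device resets the credit counter, assigns flows to ports
 * and records the start of each port's load measurement period.
 */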
static int
dsw_start(struct rte_eventdev *dev)
{
	struct dsw_evdev *dsw = dsw_pmd_priv(dev);
	uint16_t i;
	uint64_t now;

	rte_atomic32_init(&dsw->credits_on_loan);

	initial_flow_to_port_assignment(dsw);

	now = rte_get_timer_cycles();
	for (i = 0; i < dsw->num_ports; i++) {
		dsw->ports[i].measurement_start = now;
		dsw->ports[i].busy_start = now;
	}

	return 0;
}

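/* At stop time, events may still be buffered in the device: in the
 * per-port output buffers, among a port's paused events, and in the
 * ports' input rings. The helpers below hand all of them to the
 * application's stop-flush callback.
 */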
static void
dsw_port_drain_buf(uint8_t dev_id, struct rte_event *buf, uint16_t buf_len,
		   eventdev_stop_flush_t flush, void *flush_arg)
{
	uint16_t i;

	for (i = 0; i < buf_len; i++)
		flush(dev_id, buf[i], flush_arg);
}

static void
dsw_port_drain_paused(uint8_t dev_id, struct dsw_port *port,
		      eventdev_stop_flush_t flush, void *flush_arg)
{
	dsw_port_drain_buf(dev_id, port->paused_events, port->paused_events_len,
			   flush, flush_arg);
}

static void
dsw_port_drain_out(uint8_t dev_id, struct dsw_evdev *dsw, struct dsw_port *port,
		   eventdev_stop_flush_t flush, void *flush_arg)
{
	uint16_t dport_id;

	for (dport_id = 0; dport_id < dsw->num_ports; dport_id++)
		if (dport_id != port->id)
			dsw_port_drain_buf(dev_id, port->out_buffer[dport_id],
					   port->out_buffer_len[dport_id],
					   flush, flush_arg);
}

static void
dsw_port_drain_in_ring(uint8_t dev_id, struct dsw_port *port,
		       eventdev_stop_flush_t flush, void *flush_arg)
{
	struct rte_event ev;

	while (rte_event_ring_dequeue_burst(port->in_ring, &ev, 1, NULL))
		flush(dev_id, ev, flush_arg);
}

static void
dsw_drain(uint8_t dev_id, struct dsw_evdev *dsw,
	  eventdev_stop_flush_t flush, void *flush_arg)
{
	uint16_t port_id;

	if (flush == NULL)
		return;

	for (port_id = 0; port_id < dsw->num_ports; port_id++) {
		struct dsw_port *port = &dsw->ports[port_id];

		dsw_port_drain_out(dev_id, dsw, port, flush, flush_arg);
		dsw_port_drain_paused(dev_id, port, flush, flush_arg);
		dsw_port_drain_in_ring(dev_id, port, flush, flush_arg);
	}
}

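/* Drain all buffered events through the application-supplied stop-flush
 * callback, if one has been registered.
 */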
static void
dsw_stop(struct rte_eventdev *dev)
{
	struct dsw_evdev *dsw = dsw_pmd_priv(dev);
	uint8_t dev_id;
	eventdev_stop_flush_t flush;
	void *flush_arg;

	dev_id = dev->data->dev_id;
	flush = dev->dev_ops->dev_stop_flush;
	flush_arg = dev->data->dev_stop_flush_arg;

	dsw_drain(dev_id, dsw, flush, flush_arg);
}

static int
dsw_close(struct rte_eventdev *dev)
{
	struct dsw_evdev *dsw = dsw_pmd_priv(dev);

	dsw->num_ports = 0;
	dsw->num_queues = 0;

	return 0;
}

static struct rte_eventdev_ops dsw_evdev_ops = {
	.port_setup = dsw_port_setup,
	.port_def_conf = dsw_port_def_conf,
	.port_release = dsw_port_release,
	.queue_setup = dsw_queue_setup,
	.queue_def_conf = dsw_queue_def_conf,
	.queue_release = dsw_queue_release,
	.port_link = dsw_port_link,
	.port_unlink = dsw_port_unlink,
	.dev_infos_get = dsw_info_get,
	.dev_configure = dsw_configure,
	.dev_start = dsw_start,
	.dev_stop = dsw_stop,
	.dev_close = dsw_close,
	.xstats_get = dsw_xstats_get,
	.xstats_get_names = dsw_xstats_get_names,
	.xstats_get_by_name = dsw_xstats_get_by_name
};

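/* Create the vdev-backed event device and install the control-plane ops
 * and the fast-path enqueue/dequeue functions.
 */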
static int
dsw_probe(struct rte_vdev_device *vdev)
{
	const char *name;
	struct rte_eventdev *dev;
	struct dsw_evdev *dsw;

	name = rte_vdev_device_name(vdev);

	dev = rte_event_pmd_vdev_init(name, sizeof(struct dsw_evdev),
				      rte_socket_id());
	if (dev == NULL)
		return -EFAULT;

	dev->dev_ops = &dsw_evdev_ops;
	dev->enqueue = dsw_event_enqueue;
	dev->enqueue_burst = dsw_event_enqueue_burst;
	dev->enqueue_new_burst = dsw_event_enqueue_new_burst;
	dev->enqueue_forward_burst = dsw_event_enqueue_forward_burst;
	dev->dequeue = dsw_event_dequeue;
	dev->dequeue_burst = dsw_event_dequeue_burst;

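	/* A secondary process only needs the function pointers set up;
	 * the device data has already been initialized by the primary.
	 */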
	if (rte_eal_process_type() != RTE_PROC_PRIMARY)
		return 0;

	dsw = dev->data->dev_private;
	dsw->data = dev->data;

	return 0;
}

static int
dsw_remove(struct rte_vdev_device *vdev)
{
	const char *name;

	name = rte_vdev_device_name(vdev);
	if (name == NULL)
		return -EINVAL;

	return rte_event_pmd_vdev_uninit(name);
}

static struct rte_vdev_driver evdev_dsw_pmd_drv = {
	.probe = dsw_probe,
	.remove = dsw_remove
};

RTE_PMD_REGISTER_VDEV(EVENTDEV_NAME_DSW_PMD, evdev_dsw_pmd_drv);