xref: /dpdk/drivers/event/opdl/opdl_evdev.c (revision 7be78d02)
1 /* SPDX-License-Identifier: BSD-3-Clause
2  * Copyright(c) 2017 Intel Corporation
3  */
4 
5 #include <inttypes.h>
6 #include <string.h>
7 
8 #include <rte_bus_vdev.h>
9 #include <rte_lcore.h>
10 #include <rte_memzone.h>
11 #include <rte_kvargs.h>
12 #include <rte_errno.h>
13 #include <rte_cycles.h>
14 
15 #include "opdl_evdev.h"
16 #include "opdl_ring.h"
17 #include "opdl_log.h"
18 
19 #define EVENTDEV_NAME_OPDL_PMD event_opdl
20 #define NUMA_NODE_ARG "numa_node"
21 #define DO_VALIDATION_ARG "do_validation"
22 #define DO_TEST_ARG "self_test"
23 
24 
25 static void
26 opdl_info_get(struct rte_eventdev *dev, struct rte_event_dev_info *info);
27 
28 uint16_t
opdl_event_enqueue_burst(void * port,const struct rte_event ev[],uint16_t num)29 opdl_event_enqueue_burst(void *port,
30 			 const struct rte_event ev[],
31 			 uint16_t num)
32 {
33 	struct opdl_port *p = port;
34 
35 	if (unlikely(!p->opdl->data->dev_started))
36 		return 0;
37 
38 
39 	/* either rx_enqueue or disclaim*/
40 	return p->enq(p, ev, num);
41 }
42 
43 uint16_t
opdl_event_enqueue(void * port,const struct rte_event * ev)44 opdl_event_enqueue(void *port, const struct rte_event *ev)
45 {
46 	struct opdl_port *p = port;
47 
48 	if (unlikely(!p->opdl->data->dev_started))
49 		return 0;
50 
51 
52 	return p->enq(p, ev, 1);
53 }
54 
55 uint16_t
opdl_event_dequeue_burst(void * port,struct rte_event * ev,uint16_t num,uint64_t wait)56 opdl_event_dequeue_burst(void *port,
57 			 struct rte_event *ev,
58 			 uint16_t num,
59 			 uint64_t wait)
60 {
61 	struct opdl_port *p = (void *)port;
62 
63 	RTE_SET_USED(wait);
64 
65 	if (unlikely(!p->opdl->data->dev_started))
66 		return 0;
67 
68 	/* This function pointer can point to tx_dequeue or claim*/
69 	return p->deq(p, ev, num);
70 }
71 
72 uint16_t
opdl_event_dequeue(void * port,struct rte_event * ev,uint64_t wait)73 opdl_event_dequeue(void *port,
74 		   struct rte_event *ev,
75 		   uint64_t wait)
76 {
77 	struct opdl_port *p = (void *)port;
78 
79 	if (unlikely(!p->opdl->data->dev_started))
80 		return 0;
81 
82 	RTE_SET_USED(wait);
83 
84 	return p->deq(p, ev, 1);
85 }
86 
87 static int
opdl_port_link(struct rte_eventdev * dev,void * port,const uint8_t queues[],const uint8_t priorities[],uint16_t num)88 opdl_port_link(struct rte_eventdev *dev,
89 	       void *port,
90 	       const uint8_t queues[],
91 	       const uint8_t priorities[],
92 	       uint16_t num)
93 {
94 	struct opdl_port *p = port;
95 
96 	RTE_SET_USED(priorities);
97 	RTE_SET_USED(dev);
98 
99 	if (unlikely(dev->data->dev_started)) {
100 		PMD_DRV_LOG(ERR, "DEV_ID:[%02d] : "
101 			     "Attempt to link queue (%u) to port %d while device started\n",
102 			     dev->data->dev_id,
103 				queues[0],
104 				p->id);
105 		rte_errno = EINVAL;
106 		return 0;
107 	}
108 
109 	/* Max of 1 queue per port */
110 	if (num > 1) {
111 		PMD_DRV_LOG(ERR, "DEV_ID:[%02d] : "
112 			     "Attempt to link more than one queue (%u) to port %d requested\n",
113 			     dev->data->dev_id,
114 				num,
115 				p->id);
116 		rte_errno = EDQUOT;
117 		return 0;
118 	}
119 
120 	if (!p->configured) {
121 		PMD_DRV_LOG(ERR, "DEV_ID:[%02d] : "
122 			     "port %d not configured, cannot link to %u\n",
123 			     dev->data->dev_id,
124 				p->id,
125 				queues[0]);
126 		rte_errno = EINVAL;
127 		return 0;
128 	}
129 
130 	if (p->external_qid != OPDL_INVALID_QID) {
131 		PMD_DRV_LOG(ERR, "DEV_ID:[%02d] : "
132 			     "port %d already linked to queue %u, cannot link to %u\n",
133 			     dev->data->dev_id,
134 				p->id,
135 				p->external_qid,
136 				queues[0]);
137 		rte_errno = EINVAL;
138 		return 0;
139 	}
140 
141 	p->external_qid = queues[0];
142 
143 	return 1;
144 }
145 
146 static int
opdl_port_unlink(struct rte_eventdev * dev,void * port,uint8_t queues[],uint16_t nb_unlinks)147 opdl_port_unlink(struct rte_eventdev *dev,
148 		 void *port,
149 		 uint8_t queues[],
150 		 uint16_t nb_unlinks)
151 {
152 	struct opdl_port *p = port;
153 
154 	RTE_SET_USED(queues);
155 	RTE_SET_USED(nb_unlinks);
156 
157 	if (unlikely(dev->data->dev_started)) {
158 		PMD_DRV_LOG(ERR, "DEV_ID:[%02d] : "
159 			     "Attempt to unlink queue (%u) to port %d while device started\n",
160 			     dev->data->dev_id,
161 			     queues[0],
162 			     p->id);
163 		rte_errno = EINVAL;
164 		return 0;
165 	}
166 	RTE_SET_USED(nb_unlinks);
167 
168 	/* Port Stuff */
169 	p->queue_id = OPDL_INVALID_QID;
170 	p->p_type = OPDL_INVALID_PORT;
171 	p->external_qid = OPDL_INVALID_QID;
172 
173 	/* always unlink 0 queue due to statice pipeline */
174 	return 0;
175 }
176 
177 static int
opdl_port_setup(struct rte_eventdev * dev,uint8_t port_id,const struct rte_event_port_conf * conf)178 opdl_port_setup(struct rte_eventdev *dev,
179 		uint8_t port_id,
180 		const struct rte_event_port_conf *conf)
181 {
182 	struct opdl_evdev *device = opdl_pmd_priv(dev);
183 	struct opdl_port *p = &device->ports[port_id];
184 
185 	RTE_SET_USED(conf);
186 
187 	/* Check if port already configured */
188 	if (p->configured) {
189 		PMD_DRV_LOG(ERR, "DEV_ID:[%02d] : "
190 			     "Attempt to setup port %d which is already setup\n",
191 			     dev->data->dev_id,
192 			     p->id);
193 		return -EDQUOT;
194 	}
195 
196 	*p = (struct opdl_port){0}; /* zero entire structure */
197 	p->id = port_id;
198 	p->opdl = device;
199 	p->queue_id = OPDL_INVALID_QID;
200 	p->external_qid = OPDL_INVALID_QID;
201 	dev->data->ports[port_id] = p;
202 	rte_smp_wmb();
203 	p->configured = 1;
204 	device->nb_ports++;
205 	return 0;
206 }
207 
208 static void
opdl_port_release(void * port)209 opdl_port_release(void *port)
210 {
211 	struct opdl_port *p = (void *)port;
212 
213 	if (p == NULL ||
214 	    p->opdl->data->dev_started) {
215 		return;
216 	}
217 
218 	p->configured = 0;
219 	p->initialized = 0;
220 }
221 
222 static void
opdl_port_def_conf(struct rte_eventdev * dev,uint8_t port_id,struct rte_event_port_conf * port_conf)223 opdl_port_def_conf(struct rte_eventdev *dev, uint8_t port_id,
224 		struct rte_event_port_conf *port_conf)
225 {
226 	RTE_SET_USED(dev);
227 	RTE_SET_USED(port_id);
228 
229 	port_conf->new_event_threshold = MAX_OPDL_CONS_Q_DEPTH;
230 	port_conf->dequeue_depth = MAX_OPDL_CONS_Q_DEPTH;
231 	port_conf->enqueue_depth = MAX_OPDL_CONS_Q_DEPTH;
232 }
233 
234 static int
opdl_queue_setup(struct rte_eventdev * dev,uint8_t queue_id,const struct rte_event_queue_conf * conf)235 opdl_queue_setup(struct rte_eventdev *dev,
236 		 uint8_t queue_id,
237 		 const struct rte_event_queue_conf *conf)
238 {
239 	enum queue_type type;
240 
241 	struct opdl_evdev *device = opdl_pmd_priv(dev);
242 
243 	/* Extra sanity check, probably not needed */
244 	if (queue_id == OPDL_INVALID_QID) {
245 		PMD_DRV_LOG(ERR, "DEV_ID:[%02d] : "
246 			     "Invalid queue id %u requested\n",
247 			     dev->data->dev_id,
248 			     queue_id);
249 		return -EINVAL;
250 	}
251 
252 	if (device->nb_q_md > device->max_queue_nb) {
253 		PMD_DRV_LOG(ERR, "DEV_ID:[%02d] : "
254 			     "Max number of queues %u exceeded by request %u\n",
255 			     dev->data->dev_id,
256 			     device->max_queue_nb,
257 			     device->nb_q_md);
258 		return -EINVAL;
259 	}
260 
261 	if (RTE_EVENT_QUEUE_CFG_ALL_TYPES
262 	    & conf->event_queue_cfg) {
263 		PMD_DRV_LOG(ERR, "DEV_ID:[%02d] : "
264 			     "QUEUE_CFG_ALL_TYPES not supported\n",
265 			     dev->data->dev_id);
266 		return -ENOTSUP;
267 	} else if (RTE_EVENT_QUEUE_CFG_SINGLE_LINK
268 		   & conf->event_queue_cfg) {
269 		type = OPDL_Q_TYPE_SINGLE_LINK;
270 	} else {
271 		switch (conf->schedule_type) {
272 		case RTE_SCHED_TYPE_ORDERED:
273 			type = OPDL_Q_TYPE_ORDERED;
274 			break;
275 		case RTE_SCHED_TYPE_ATOMIC:
276 			type = OPDL_Q_TYPE_ATOMIC;
277 			break;
278 		case RTE_SCHED_TYPE_PARALLEL:
279 			type = OPDL_Q_TYPE_ORDERED;
280 			break;
281 		default:
282 			PMD_DRV_LOG(ERR, "DEV_ID:[%02d] : "
283 				     "Unknown queue type %d requested\n",
284 				     dev->data->dev_id,
285 				     conf->event_queue_cfg);
286 			return -EINVAL;
287 		}
288 	}
289 	/* Check if queue id has been setup already */
290 	uint32_t i;
291 	for (i = 0; i < device->nb_q_md; i++) {
292 		if (device->q_md[i].ext_id == queue_id) {
293 			PMD_DRV_LOG(ERR, "DEV_ID:[%02d] : "
294 				     "queue id %u already setup\n",
295 				     dev->data->dev_id,
296 				     queue_id);
297 			return -EINVAL;
298 		}
299 	}
300 
301 	device->q_md[device->nb_q_md].ext_id = queue_id;
302 	device->q_md[device->nb_q_md].type = type;
303 	device->q_md[device->nb_q_md].setup = 1;
304 	device->nb_q_md++;
305 
306 	return 1;
307 }
308 
309 static void
opdl_queue_release(struct rte_eventdev * dev,uint8_t queue_id)310 opdl_queue_release(struct rte_eventdev *dev, uint8_t queue_id)
311 {
312 	struct opdl_evdev *device = opdl_pmd_priv(dev);
313 
314 	RTE_SET_USED(queue_id);
315 
316 	if (device->data->dev_started)
317 		return;
318 
319 }
320 
321 static void
opdl_queue_def_conf(struct rte_eventdev * dev,uint8_t queue_id,struct rte_event_queue_conf * conf)322 opdl_queue_def_conf(struct rte_eventdev *dev,
323 		    uint8_t queue_id,
324 		    struct rte_event_queue_conf *conf)
325 {
326 	RTE_SET_USED(dev);
327 	RTE_SET_USED(queue_id);
328 
329 	static const struct rte_event_queue_conf default_conf = {
330 		.nb_atomic_flows = 1024,
331 		.nb_atomic_order_sequences = 1,
332 		.event_queue_cfg = 0,
333 		.schedule_type = RTE_SCHED_TYPE_ORDERED,
334 		.priority = RTE_EVENT_DEV_PRIORITY_NORMAL,
335 	};
336 
337 	*conf = default_conf;
338 }
339 
340 
341 static int
opdl_dev_configure(const struct rte_eventdev * dev)342 opdl_dev_configure(const struct rte_eventdev *dev)
343 {
344 	struct opdl_evdev *opdl = opdl_pmd_priv(dev);
345 	const struct rte_eventdev_data *data = dev->data;
346 	const struct rte_event_dev_config *conf = &data->dev_conf;
347 
348 	opdl->max_queue_nb = conf->nb_event_queues;
349 	opdl->max_port_nb = conf->nb_event_ports;
350 	opdl->nb_events_limit = conf->nb_events_limit;
351 
352 	if (conf->event_dev_cfg & RTE_EVENT_DEV_CFG_PER_DEQUEUE_TIMEOUT) {
353 		PMD_DRV_LOG(ERR, "DEV_ID:[%02d] : "
354 			     "DEQUEUE_TIMEOUT not supported\n",
355 			     dev->data->dev_id);
356 		return -ENOTSUP;
357 	}
358 
359 	return 0;
360 }
361 
362 static void
opdl_info_get(struct rte_eventdev * dev,struct rte_event_dev_info * info)363 opdl_info_get(struct rte_eventdev *dev, struct rte_event_dev_info *info)
364 {
365 	RTE_SET_USED(dev);
366 
367 	static const struct rte_event_dev_info evdev_opdl_info = {
368 		.driver_name = OPDL_PMD_NAME,
369 		.max_event_queues = RTE_EVENT_MAX_QUEUES_PER_DEV,
370 		.max_event_queue_flows = OPDL_QID_NUM_FIDS,
371 		.max_event_queue_priority_levels = OPDL_Q_PRIORITY_MAX,
372 		.max_event_priority_levels = OPDL_IQS_MAX,
373 		.max_event_ports = OPDL_PORTS_MAX,
374 		.max_event_port_dequeue_depth = MAX_OPDL_CONS_Q_DEPTH,
375 		.max_event_port_enqueue_depth = MAX_OPDL_CONS_Q_DEPTH,
376 		.max_num_events = OPDL_INFLIGHT_EVENTS_TOTAL,
377 		.event_dev_cap = RTE_EVENT_DEV_CAP_BURST_MODE |
378 				 RTE_EVENT_DEV_CAP_CARRY_FLOW_ID |
379 				 RTE_EVENT_DEV_CAP_MAINTENANCE_FREE,
380 	};
381 
382 	*info = evdev_opdl_info;
383 }
384 
385 static void
opdl_dump(struct rte_eventdev * dev,FILE * f)386 opdl_dump(struct rte_eventdev *dev, FILE *f)
387 {
388 	struct opdl_evdev *device = opdl_pmd_priv(dev);
389 
390 	if (!device->do_validation)
391 		return;
392 
393 	fprintf(f,
394 		"\n\n -- RING STATISTICS --\n");
395 	uint32_t i;
396 	for (i = 0; i < device->nb_opdls; i++)
397 		opdl_ring_dump(device->opdl[i], f);
398 
399 	fprintf(f,
400 		"\n\n -- PORT STATISTICS --\n"
401 		"Type Port Index  Port Id  Queue Id     Av. Req Size  "
402 		"Av. Grant Size     Av. Cycles PP"
403 		"      Empty DEQs   Non Empty DEQs   Pkts Processed\n");
404 
405 	for (i = 0; i < device->max_port_nb; i++) {
406 		char queue_id[64];
407 		char total_cyc[64];
408 		const char *p_type;
409 
410 		uint64_t cne, cpg;
411 		struct opdl_port *port = &device->ports[i];
412 
413 		if (port->initialized) {
414 			cne = port->port_stat[claim_non_empty];
415 			cpg = port->port_stat[claim_pkts_granted];
416 			if (port->p_type == OPDL_REGULAR_PORT)
417 				p_type = "REG";
418 			else if (port->p_type == OPDL_PURE_RX_PORT)
419 				p_type = "  RX";
420 			else if (port->p_type == OPDL_PURE_TX_PORT)
421 				p_type = "  TX";
422 			else if (port->p_type == OPDL_ASYNC_PORT)
423 				p_type = "SYNC";
424 			else
425 				p_type = "????";
426 
427 			snprintf(queue_id, sizeof(queue_id), "%02u",
428 					port->external_qid);
429 			if (port->p_type == OPDL_REGULAR_PORT ||
430 					port->p_type == OPDL_ASYNC_PORT)
431 				snprintf(total_cyc, sizeof(total_cyc),
432 					" %'16"PRIu64"",
433 					(cpg != 0 ?
434 					 port->port_stat[total_cycles] / cpg
435 					 : 0));
436 			else
437 				snprintf(total_cyc, sizeof(total_cyc),
438 					"             ----");
439 			fprintf(f,
440 				"%4s %10u %8u %9s %'16"PRIu64" %'16"PRIu64" %s "
441 				"%'16"PRIu64" %'16"PRIu64" %'16"PRIu64"\n",
442 				p_type,
443 				i,
444 				port->id,
445 				(port->external_qid == OPDL_INVALID_QID ? "---"
446 				 : queue_id),
447 				(cne != 0 ?
448 				 port->port_stat[claim_pkts_requested] / cne
449 				 : 0),
450 				(cne != 0 ?
451 				 port->port_stat[claim_pkts_granted] / cne
452 				 : 0),
453 				total_cyc,
454 				port->port_stat[claim_empty],
455 				port->port_stat[claim_non_empty],
456 				port->port_stat[claim_pkts_granted]);
457 		}
458 	}
459 	fprintf(f, "\n");
460 }
461 
462 
463 static void
opdl_stop(struct rte_eventdev * dev)464 opdl_stop(struct rte_eventdev *dev)
465 {
466 	struct opdl_evdev *device = opdl_pmd_priv(dev);
467 
468 	opdl_xstats_uninit(dev);
469 
470 	destroy_queues_and_rings(dev);
471 
472 
473 	device->started = 0;
474 
475 	rte_smp_wmb();
476 }
477 
478 static int
opdl_start(struct rte_eventdev * dev)479 opdl_start(struct rte_eventdev *dev)
480 {
481 	int err = 0;
482 
483 	if (!err)
484 		err = create_queues_and_rings(dev);
485 
486 
487 	if (!err)
488 		err = assign_internal_queue_ids(dev);
489 
490 
491 	if (!err)
492 		err = initialise_queue_zero_ports(dev);
493 
494 
495 	if (!err)
496 		err = initialise_all_other_ports(dev);
497 
498 
499 	if (!err)
500 		err = check_queues_linked(dev);
501 
502 
503 	if (!err)
504 		err = opdl_add_event_handlers(dev);
505 
506 
507 	if (!err)
508 		err = build_all_dependencies(dev);
509 
510 	if (!err) {
511 		opdl_xstats_init(dev);
512 
513 		struct opdl_evdev *device = opdl_pmd_priv(dev);
514 
515 		PMD_DRV_LOG(INFO, "DEV_ID:[%02d] : "
516 			      "SUCCESS : Created %u total queues (%u ex, %u in),"
517 			      " %u opdls, %u event_dev ports, %u input ports",
518 			      opdl_pmd_dev_id(device),
519 			      device->nb_queues,
520 			      (device->nb_queues - device->nb_opdls),
521 			      device->nb_opdls,
522 			      device->nb_opdls,
523 			      device->nb_ports,
524 			      device->queue[0].nb_ports);
525 	} else
526 		opdl_stop(dev);
527 
528 	return err;
529 }
530 
531 static int
opdl_close(struct rte_eventdev * dev)532 opdl_close(struct rte_eventdev *dev)
533 {
534 	struct opdl_evdev *device = opdl_pmd_priv(dev);
535 	uint32_t i;
536 
537 	for (i = 0; i < device->max_port_nb; i++) {
538 		memset(&device->ports[i],
539 		       0,
540 		       sizeof(struct opdl_port));
541 	}
542 
543 	memset(&device->s_md,
544 			0x0,
545 			sizeof(struct opdl_stage_meta_data)*OPDL_PORTS_MAX);
546 
547 	memset(&device->q_md,
548 			0xFF,
549 			sizeof(struct opdl_queue_meta_data)*OPDL_MAX_QUEUES);
550 
551 
552 	memset(device->q_map_ex_to_in,
553 			0,
554 			sizeof(uint8_t)*OPDL_INVALID_QID);
555 
556 	opdl_xstats_uninit(dev);
557 
558 	device->max_port_nb = 0;
559 
560 	device->max_queue_nb = 0;
561 
562 	device->nb_opdls = 0;
563 
564 	device->nb_queues   = 0;
565 
566 	device->nb_ports    = 0;
567 
568 	device->nb_q_md     = 0;
569 
570 	dev->data->nb_queues = 0;
571 
572 	dev->data->nb_ports = 0;
573 
574 
575 	return 0;
576 }
577 
578 static int
assign_numa_node(const char * key __rte_unused,const char * value,void * opaque)579 assign_numa_node(const char *key __rte_unused, const char *value, void *opaque)
580 {
581 	int *socket_id = opaque;
582 	*socket_id = atoi(value);
583 	if (*socket_id >= RTE_MAX_NUMA_NODES)
584 		return -1;
585 	return 0;
586 }
587 
588 static int
set_do_validation(const char * key __rte_unused,const char * value,void * opaque)589 set_do_validation(const char *key __rte_unused, const char *value, void *opaque)
590 {
591 	int *do_val = opaque;
592 	*do_val = atoi(value);
593 	if (*do_val != 0)
594 		*do_val = 1;
595 
596 	return 0;
597 }
598 static int
set_do_test(const char * key __rte_unused,const char * value,void * opaque)599 set_do_test(const char *key __rte_unused, const char *value, void *opaque)
600 {
601 	int *do_test = opaque;
602 
603 	*do_test = atoi(value);
604 
605 	if (*do_test != 0)
606 		*do_test = 1;
607 	return 0;
608 }
609 
610 static int
opdl_probe(struct rte_vdev_device * vdev)611 opdl_probe(struct rte_vdev_device *vdev)
612 {
613 	static struct eventdev_ops evdev_opdl_ops = {
614 		.dev_configure = opdl_dev_configure,
615 		.dev_infos_get = opdl_info_get,
616 		.dev_close = opdl_close,
617 		.dev_start = opdl_start,
618 		.dev_stop = opdl_stop,
619 		.dump = opdl_dump,
620 
621 		.queue_def_conf = opdl_queue_def_conf,
622 		.queue_setup = opdl_queue_setup,
623 		.queue_release = opdl_queue_release,
624 		.port_def_conf = opdl_port_def_conf,
625 		.port_setup = opdl_port_setup,
626 		.port_release = opdl_port_release,
627 		.port_link = opdl_port_link,
628 		.port_unlink = opdl_port_unlink,
629 
630 
631 		.xstats_get = opdl_xstats_get,
632 		.xstats_get_names = opdl_xstats_get_names,
633 		.xstats_get_by_name = opdl_xstats_get_by_name,
634 		.xstats_reset = opdl_xstats_reset,
635 	};
636 
637 	static const char *const args[] = {
638 		NUMA_NODE_ARG,
639 		DO_VALIDATION_ARG,
640 		DO_TEST_ARG,
641 		NULL
642 	};
643 	const char *name;
644 	const char *params;
645 	struct rte_eventdev *dev;
646 	struct opdl_evdev *opdl;
647 	int socket_id = rte_socket_id();
648 	int do_validation = 0;
649 	int do_test = 0;
650 	int str_len;
651 	int test_result = 0;
652 
653 	name = rte_vdev_device_name(vdev);
654 	params = rte_vdev_device_args(vdev);
655 	if (params != NULL && params[0] != '\0') {
656 		struct rte_kvargs *kvlist = rte_kvargs_parse(params, args);
657 
658 		if (!kvlist) {
659 			PMD_DRV_LOG(INFO,
660 					"Ignoring unsupported parameters when creating device '%s'\n",
661 					name);
662 		} else {
663 			int ret = rte_kvargs_process(kvlist, NUMA_NODE_ARG,
664 					assign_numa_node, &socket_id);
665 			if (ret != 0) {
666 				PMD_DRV_LOG(ERR,
667 						"%s: Error parsing numa node parameter",
668 						name);
669 
670 				rte_kvargs_free(kvlist);
671 				return ret;
672 			}
673 
674 			ret = rte_kvargs_process(kvlist, DO_VALIDATION_ARG,
675 					set_do_validation, &do_validation);
676 			if (ret != 0) {
677 				PMD_DRV_LOG(ERR,
678 					"%s: Error parsing do validation parameter",
679 					name);
680 				rte_kvargs_free(kvlist);
681 				return ret;
682 			}
683 
684 			ret = rte_kvargs_process(kvlist, DO_TEST_ARG,
685 					set_do_test, &do_test);
686 			if (ret != 0) {
687 				PMD_DRV_LOG(ERR,
688 					"%s: Error parsing do test parameter",
689 					name);
690 				rte_kvargs_free(kvlist);
691 				return ret;
692 			}
693 
694 			rte_kvargs_free(kvlist);
695 		}
696 	}
697 	dev = rte_event_pmd_vdev_init(name,
698 			sizeof(struct opdl_evdev), socket_id);
699 
700 	if (dev == NULL) {
701 		PMD_DRV_LOG(ERR, "eventdev vdev init() failed");
702 		return -EFAULT;
703 	}
704 
705 	PMD_DRV_LOG(INFO, "DEV_ID:[%02d] : "
706 		      "Success - creating eventdev device %s, numa_node:[%d], do_validation:[%s]"
707 			  " , self_test:[%s]\n",
708 		      dev->data->dev_id,
709 		      name,
710 		      socket_id,
711 		      (do_validation ? "true" : "false"),
712 			  (do_test ? "true" : "false"));
713 
714 	dev->dev_ops = &evdev_opdl_ops;
715 
716 	dev->enqueue = opdl_event_enqueue;
717 	dev->enqueue_burst = opdl_event_enqueue_burst;
718 	dev->enqueue_new_burst = opdl_event_enqueue_burst;
719 	dev->enqueue_forward_burst = opdl_event_enqueue_burst;
720 	dev->dequeue = opdl_event_dequeue;
721 	dev->dequeue_burst = opdl_event_dequeue_burst;
722 
723 	if (rte_eal_process_type() != RTE_PROC_PRIMARY)
724 		goto done;
725 
726 	opdl = dev->data->dev_private;
727 	opdl->data = dev->data;
728 	opdl->socket = socket_id;
729 	opdl->do_validation = do_validation;
730 	opdl->do_test = do_test;
731 	str_len = strlen(name);
732 	memcpy(opdl->service_name, name, str_len);
733 
734 	if (do_test == 1)
735 		test_result =  opdl_selftest();
736 
737 done:
738 	event_dev_probing_finish(dev);
739 	return test_result;
740 }
741 
742 static int
opdl_remove(struct rte_vdev_device * vdev)743 opdl_remove(struct rte_vdev_device *vdev)
744 {
745 	const char *name;
746 
747 	name = rte_vdev_device_name(vdev);
748 	if (name == NULL)
749 		return -EINVAL;
750 
751 	PMD_DRV_LOG(INFO, "Closing eventdev opdl device %s\n", name);
752 
753 	return rte_event_pmd_vdev_uninit(name);
754 }
755 
756 static struct rte_vdev_driver evdev_opdl_pmd_drv = {
757 	.probe = opdl_probe,
758 	.remove = opdl_remove
759 };
760 
761 RTE_LOG_REGISTER_SUFFIX(opdl_logtype_driver, driver, INFO);
762 
763 RTE_PMD_REGISTER_VDEV(EVENTDEV_NAME_OPDL_PMD, evdev_opdl_pmd_drv);
764 RTE_PMD_REGISTER_PARAM_STRING(event_opdl, NUMA_NODE_ARG "=<int>"
765 			      DO_VALIDATION_ARG "=<int>" DO_TEST_ARG "=<int>");
766