xref: /f-stack/dpdk/drivers/event/dlb/dlb.c (revision 2d9fd380)
1 /* SPDX-License-Identifier: BSD-3-Clause
2  * Copyright(c) 2016-2020 Intel Corporation
3  */
4 
5 #include <assert.h>
6 #include <errno.h>
7 #include <nmmintrin.h>
8 #include <pthread.h>
9 #include <stdbool.h>
10 #include <stdint.h>
11 #include <stdio.h>
12 #include <string.h>
13 #include <sys/fcntl.h>
14 #include <sys/mman.h>
15 #include <unistd.h>
16 
17 #include <rte_common.h>
18 #include <rte_config.h>
19 #include <rte_cycles.h>
20 #include <rte_debug.h>
21 #include <rte_dev.h>
22 #include <rte_errno.h>
23 #include <rte_io.h>
24 #include <rte_kvargs.h>
25 #include <rte_log.h>
26 #include <rte_malloc.h>
27 #include <rte_mbuf.h>
28 #include <rte_power_intrinsics.h>
29 #include <rte_prefetch.h>
30 #include <rte_ring.h>
31 #include <rte_string_fns.h>
32 
33 #include <rte_eventdev.h>
34 #include <rte_eventdev_pmd.h>
35 
36 #include "dlb_priv.h"
37 #include "dlb_iface.h"
38 #include "dlb_inline_fns.h"
39 
40 /*
41  * Resources exposed to eventdev.
42  */
43 #if (RTE_EVENT_MAX_QUEUES_PER_DEV > UINT8_MAX)
44 #error "RTE_EVENT_MAX_QUEUES_PER_DEV cannot fit in member max_event_queues"
45 #endif
46 static struct rte_event_dev_info evdev_dlb_default_info = {
47 	.driver_name = "", /* probe will set */
48 	.min_dequeue_timeout_ns = DLB_MIN_DEQUEUE_TIMEOUT_NS,
49 	.max_dequeue_timeout_ns = DLB_MAX_DEQUEUE_TIMEOUT_NS,
50 #if (RTE_EVENT_MAX_QUEUES_PER_DEV < DLB_MAX_NUM_LDB_QUEUES)
51 	.max_event_queues = RTE_EVENT_MAX_QUEUES_PER_DEV,
52 #else
53 	.max_event_queues = DLB_MAX_NUM_LDB_QUEUES,
54 #endif
55 	.max_event_queue_flows = DLB_MAX_NUM_FLOWS,
56 	.max_event_queue_priority_levels = DLB_QID_PRIORITIES,
57 	.max_event_priority_levels = DLB_QID_PRIORITIES,
58 	.max_event_ports = DLB_MAX_NUM_LDB_PORTS,
59 	.max_event_port_dequeue_depth = DLB_MAX_CQ_DEPTH,
60 	.max_event_port_enqueue_depth = DLB_MAX_ENQUEUE_DEPTH,
61 	.max_event_port_links = DLB_MAX_NUM_QIDS_PER_LDB_CQ,
62 	.max_num_events = DLB_MAX_NUM_LDB_CREDITS,
63 	.max_single_link_event_port_queue_pairs = DLB_MAX_NUM_DIR_PORTS,
64 	.event_dev_cap = (RTE_EVENT_DEV_CAP_QUEUE_QOS |
65 			  RTE_EVENT_DEV_CAP_EVENT_QOS |
66 			  RTE_EVENT_DEV_CAP_BURST_MODE |
67 			  RTE_EVENT_DEV_CAP_DISTRIBUTED_SCHED |
68 			  RTE_EVENT_DEV_CAP_IMPLICIT_RELEASE_DISABLE |
69 			  RTE_EVENT_DEV_CAP_QUEUE_ALL_TYPES),
70 };
71 
72 struct process_local_port_data
73 dlb_port[DLB_MAX_NUM_PORTS][NUM_DLB_PORT_TYPES];
74 
75 static inline uint16_t
76 dlb_event_enqueue_delayed(void *event_port,
77 			  const struct rte_event events[]);
78 
79 static inline uint16_t
80 dlb_event_enqueue_burst_delayed(void *event_port,
81 				const struct rte_event events[],
82 				uint16_t num);
83 
84 static inline uint16_t
85 dlb_event_enqueue_new_burst_delayed(void *event_port,
86 				    const struct rte_event events[],
87 				    uint16_t num);
88 
89 static inline uint16_t
90 dlb_event_enqueue_forward_burst_delayed(void *event_port,
91 					const struct rte_event events[],
92 					uint16_t num);
93 
94 static int
95 dlb_hw_query_resources(struct dlb_eventdev *dlb)
96 {
97 	struct dlb_hw_dev *handle = &dlb->qm_instance;
98 	struct dlb_hw_resource_info *dlb_info = &handle->info;
99 	int ret;
100 
101 	ret = dlb_iface_get_num_resources(handle,
102 					  &dlb->hw_rsrc_query_results);
103 	if (ret) {
104 		DLB_LOG_ERR("get dlb num resources, err=%d\n", ret);
105 		return ret;
106 	}
107 
108 	/* Complete filling in device resource info returned to evdev app,
109 	 * overriding any default values.
110 	 * The capabilities (CAPs) were set at compile time.
111 	 */
112 
113 	evdev_dlb_default_info.max_event_queues =
114 		dlb->hw_rsrc_query_results.num_ldb_queues;
115 
116 	evdev_dlb_default_info.max_event_ports =
117 		dlb->hw_rsrc_query_results.num_ldb_ports;
118 
119 	evdev_dlb_default_info.max_num_events =
120 		dlb->hw_rsrc_query_results.max_contiguous_ldb_credits;
121 
122 	/* Save off values used when creating the scheduling domain. */
123 
124 	handle->info.num_sched_domains =
125 		dlb->hw_rsrc_query_results.num_sched_domains;
126 
127 	handle->info.hw_rsrc_max.nb_events_limit =
128 		dlb->hw_rsrc_query_results.max_contiguous_ldb_credits;
129 
130 	handle->info.hw_rsrc_max.num_queues =
131 		dlb->hw_rsrc_query_results.num_ldb_queues +
132 		dlb->hw_rsrc_query_results.num_dir_ports;
133 
134 	handle->info.hw_rsrc_max.num_ldb_queues =
135 		dlb->hw_rsrc_query_results.num_ldb_queues;
136 
137 	handle->info.hw_rsrc_max.num_ldb_ports =
138 		dlb->hw_rsrc_query_results.num_ldb_ports;
139 
140 	handle->info.hw_rsrc_max.num_dir_ports =
141 		dlb->hw_rsrc_query_results.num_dir_ports;
142 
143 	handle->info.hw_rsrc_max.reorder_window_size =
144 		dlb->hw_rsrc_query_results.num_hist_list_entries;
145 
146 	rte_memcpy(dlb_info, &handle->info.hw_rsrc_max, sizeof(*dlb_info));
147 
148 	return 0;
149 }
150 
151 static void
152 dlb_free_qe_mem(struct dlb_port *qm_port)
153 {
154 	if (qm_port == NULL)
155 		return;
156 
157 	rte_free(qm_port->qe4);
158 	qm_port->qe4 = NULL;
159 
160 	rte_free(qm_port->consume_qe);
161 	qm_port->consume_qe = NULL;
162 
163 	rte_memzone_free(dlb_port[qm_port->id][PORT_TYPE(qm_port)].mz);
164 	dlb_port[qm_port->id][PORT_TYPE(qm_port)].mz = NULL;
165 }
166 
167 static int
168 dlb_init_consume_qe(struct dlb_port *qm_port, char *mz_name)
169 {
170 	struct dlb_cq_pop_qe *qe;
171 
172 	qe = rte_zmalloc(mz_name,
173 			DLB_NUM_QES_PER_CACHE_LINE *
174 				sizeof(struct dlb_cq_pop_qe),
175 			RTE_CACHE_LINE_SIZE);
176 
177 	if (qe == NULL)	{
178 		DLB_LOG_ERR("dlb: no memory for consume_qe\n");
179 		return -ENOMEM;
180 	}
181 
182 	qm_port->consume_qe = qe;
183 
184 	qe->qe_valid = 0;
185 	qe->qe_frag = 0;
186 	qe->qe_comp = 0;
187 	qe->cq_token = 1;
188 	/* Tokens value is 0-based; i.e. '0' returns 1 token, '1' returns 2,
189 	 * and so on.
190 	 */
191 	qe->tokens = 0;	/* set at run time */
192 	qe->meas_lat = 0;
193 	qe->no_dec = 0;
194 	/* Completion IDs are disabled */
195 	qe->cmp_id = 0;
196 
197 	return 0;
198 }
199 
200 static int
201 dlb_init_qe_mem(struct dlb_port *qm_port, char *mz_name)
202 {
203 	int ret, sz;
204 
205 	sz = DLB_NUM_QES_PER_CACHE_LINE * sizeof(struct dlb_enqueue_qe);
206 
207 	qm_port->qe4 = rte_zmalloc(mz_name, sz, RTE_CACHE_LINE_SIZE);
208 
209 	if (qm_port->qe4 == NULL) {
210 		DLB_LOG_ERR("dlb: no qe4 memory\n");
211 		ret = -ENOMEM;
212 		goto error_exit;
213 	}
214 
215 	ret = dlb_init_consume_qe(qm_port, mz_name);
216 	if (ret < 0) {
217 		DLB_LOG_ERR("dlb: dlb_init_consume_qe ret=%d\n", ret);
218 		goto error_exit;
219 	}
220 
221 	return 0;
222 
223 error_exit:
224 
225 	dlb_free_qe_mem(qm_port);
226 
227 	return ret;
228 }
229 
230 /* Wrapper for string to int conversion. Substituted for atoi(...), which is
231  * unsafe.
232  */
233 #define DLB_BASE_10 10
234 
235 static int
236 dlb_string_to_int(int *result, const char *str)
237 {
238 	long ret;
239 	char *endstr;
240 
241 	if (str == NULL || result == NULL)
242 		return -EINVAL;
243 
244 	errno = 0;
245 	ret = strtol(str, &endstr, DLB_BASE_10);
246 	if (errno)
247 		return -errno;
248 
249 	/* long int and int may be different width for some architectures */
250 	if (ret < INT_MIN || ret > INT_MAX || endstr == str)
251 		return -EINVAL;
252 
253 	*result = ret;
254 	return 0;
255 }
256 
257 static int
258 set_numa_node(const char *key __rte_unused, const char *value, void *opaque)
259 {
260 	int *socket_id = opaque;
261 	int ret;
262 
263 	ret = dlb_string_to_int(socket_id, value);
264 	if (ret < 0)
265 		return ret;
266 
267 	if (*socket_id > RTE_MAX_NUMA_NODES)
268 		return -EINVAL;
269 
270 	return 0;
271 }
272 
273 static int
274 set_max_num_events(const char *key __rte_unused,
275 		   const char *value,
276 		   void *opaque)
277 {
278 	int *max_num_events = opaque;
279 	int ret;
280 
281 	if (value == NULL || opaque == NULL) {
282 		DLB_LOG_ERR("NULL pointer\n");
283 		return -EINVAL;
284 	}
285 
286 	ret = dlb_string_to_int(max_num_events, value);
287 	if (ret < 0)
288 		return ret;
289 
290 	if (*max_num_events < 0 || *max_num_events > DLB_MAX_NUM_LDB_CREDITS) {
291 		DLB_LOG_ERR("dlb: max_num_events must be between 0 and %d\n",
292 			    DLB_MAX_NUM_LDB_CREDITS);
293 		return -EINVAL;
294 	}
295 
296 	return 0;
297 }
298 
299 static int
300 set_num_dir_credits(const char *key __rte_unused,
301 		    const char *value,
302 		    void *opaque)
303 {
304 	int *num_dir_credits = opaque;
305 	int ret;
306 
307 	if (value == NULL || opaque == NULL) {
308 		DLB_LOG_ERR("NULL pointer\n");
309 		return -EINVAL;
310 	}
311 
312 	ret = dlb_string_to_int(num_dir_credits, value);
313 	if (ret < 0)
314 		return ret;
315 
316 	if (*num_dir_credits < 0 ||
317 	    *num_dir_credits > DLB_MAX_NUM_DIR_CREDITS) {
318 		DLB_LOG_ERR("dlb: num_dir_credits must be between 0 and %d\n",
319 			    DLB_MAX_NUM_DIR_CREDITS);
320 		return -EINVAL;
321 	}
322 	return 0;
323 }
324 
325 /* VDEV-only notes:
326  * This function first unmaps all memory mappings and closes the
327  * domain's file descriptor, which causes the driver to reset the
328  * scheduling domain. Once that completes (when close() returns), we
329  * can safely free the dynamically allocated memory used by the
330  * scheduling domain.
331  *
332  * PF-only notes:
333  * We will maintain a use count and use that to determine when
334  * a reset is required.  In PF mode, we never mmap, or munmap
335  * device memory,  and we own the entire physical PCI device.
336  */
337 
338 static void
339 dlb_hw_reset_sched_domain(const struct rte_eventdev *dev, bool reconfig)
340 {
341 	struct dlb_eventdev *dlb = dlb_pmd_priv(dev);
342 	enum dlb_configuration_state config_state;
343 	int i, j;
344 
345 	/* Close and reset the domain */
346 	dlb_iface_domain_close(dlb);
347 
348 	/* Free all dynamically allocated port memory */
349 	for (i = 0; i < dlb->num_ports; i++)
350 		dlb_free_qe_mem(&dlb->ev_ports[i].qm_port);
351 
352 	/* If reconfiguring, mark the device's queues and ports as "previously
353 	 * configured." If the user does not reconfigure them, the PMD will
354 	 * reapply their previous configuration when the device is started.
355 	 */
356 	config_state = (reconfig) ? DLB_PREV_CONFIGURED : DLB_NOT_CONFIGURED;
357 
358 	for (i = 0; i < dlb->num_ports; i++) {
359 		dlb->ev_ports[i].qm_port.config_state = config_state;
360 		/* Reset setup_done so ports can be reconfigured */
361 		dlb->ev_ports[i].setup_done = false;
362 		for (j = 0; j < DLB_MAX_NUM_QIDS_PER_LDB_CQ; j++)
363 			dlb->ev_ports[i].link[j].mapped = false;
364 	}
365 
366 	for (i = 0; i < dlb->num_queues; i++)
367 		dlb->ev_queues[i].qm_queue.config_state = config_state;
368 
369 	for (i = 0; i < DLB_MAX_NUM_QUEUES; i++)
370 		dlb->ev_queues[i].setup_done = false;
371 
372 	dlb->num_ports = 0;
373 	dlb->num_ldb_ports = 0;
374 	dlb->num_dir_ports = 0;
375 	dlb->num_queues = 0;
376 	dlb->num_ldb_queues = 0;
377 	dlb->num_dir_queues = 0;
378 	dlb->configured = false;
379 }
380 
381 static int
382 dlb_ldb_credit_pool_create(struct dlb_hw_dev *handle)
383 {
384 	struct dlb_create_ldb_pool_args cfg;
385 	struct dlb_cmd_response response;
386 	int ret;
387 
388 	if (handle == NULL)
389 		return -EINVAL;
390 
391 	if (!handle->cfg.resources.num_ldb_credits) {
392 		handle->cfg.ldb_credit_pool_id = 0;
393 		handle->cfg.num_ldb_credits = 0;
394 		return 0;
395 	}
396 
397 	cfg.response = (uintptr_t)&response;
398 	cfg.num_ldb_credits = handle->cfg.resources.num_ldb_credits;
399 
400 	ret = dlb_iface_ldb_credit_pool_create(handle,
401 					       &cfg);
402 	if (ret < 0) {
403 		DLB_LOG_ERR("dlb: ldb_credit_pool_create ret=%d (driver status: %s)\n",
404 			    ret, dlb_error_strings[response.status]);
405 	}
406 
407 	handle->cfg.ldb_credit_pool_id = response.id;
408 	handle->cfg.num_ldb_credits = cfg.num_ldb_credits;
409 
410 	return ret;
411 }
412 
413 static int
414 dlb_dir_credit_pool_create(struct dlb_hw_dev *handle)
415 {
416 	struct dlb_create_dir_pool_args cfg;
417 	struct dlb_cmd_response response;
418 	int ret;
419 
420 	if (handle == NULL)
421 		return -EINVAL;
422 
423 	if (!handle->cfg.resources.num_dir_credits) {
424 		handle->cfg.dir_credit_pool_id = 0;
425 		handle->cfg.num_dir_credits = 0;
426 		return 0;
427 	}
428 
429 	cfg.response = (uintptr_t)&response;
430 	cfg.num_dir_credits = handle->cfg.resources.num_dir_credits;
431 
432 	ret = dlb_iface_dir_credit_pool_create(handle, &cfg);
433 	if (ret < 0)
434 		DLB_LOG_ERR("dlb: dir_credit_pool_create ret=%d (driver status: %s)\n",
435 			    ret, dlb_error_strings[response.status]);
436 
437 	handle->cfg.dir_credit_pool_id = response.id;
438 	handle->cfg.num_dir_credits = cfg.num_dir_credits;
439 
440 	return ret;
441 }
442 
443 static int
444 dlb_hw_create_sched_domain(struct dlb_hw_dev *handle,
445 			   struct dlb_eventdev *dlb,
446 			   const struct dlb_hw_rsrcs *resources_asked)
447 {
448 	int ret = 0;
449 	struct dlb_create_sched_domain_args *config_params;
450 	struct dlb_cmd_response response;
451 
452 	if (resources_asked == NULL) {
453 		DLB_LOG_ERR("dlb: dlb_create NULL parameter\n");
454 		ret = -EINVAL;
455 		goto error_exit;
456 	}
457 
458 	/* Map generic qm resources to dlb resources */
459 	config_params = &handle->cfg.resources;
460 
461 	config_params->response = (uintptr_t)&response;
462 
463 	/* DIR ports and queues */
464 
465 	config_params->num_dir_ports =
466 		resources_asked->num_dir_ports;
467 
468 	config_params->num_dir_credits =
469 		resources_asked->num_dir_credits;
470 
471 	/* LDB ports and queues */
472 
473 	config_params->num_ldb_queues =
474 		resources_asked->num_ldb_queues;
475 
476 	config_params->num_ldb_ports =
477 		resources_asked->num_ldb_ports;
478 
479 	config_params->num_ldb_credits =
480 		resources_asked->num_ldb_credits;
481 
482 	config_params->num_atomic_inflights =
483 		dlb->num_atm_inflights_per_queue *
484 		config_params->num_ldb_queues;
485 
486 	config_params->num_hist_list_entries = config_params->num_ldb_ports *
487 		DLB_NUM_HIST_LIST_ENTRIES_PER_LDB_PORT;
488 
489 	/* dlb limited to 1 credit pool per queue type */
490 	config_params->num_ldb_credit_pools = 1;
491 	config_params->num_dir_credit_pools = 1;
492 
493 	DLB_LOG_DBG("sched domain create - ldb_qs=%d, ldb_ports=%d, dir_ports=%d, atomic_inflights=%d, hist_list_entries=%d, ldb_credits=%d, dir_credits=%d, ldb_credit_pools=%d, dir_credit_pools=%d\n",
494 		    config_params->num_ldb_queues,
495 		    config_params->num_ldb_ports,
496 		    config_params->num_dir_ports,
497 		    config_params->num_atomic_inflights,
498 		    config_params->num_hist_list_entries,
499 		    config_params->num_ldb_credits,
500 		    config_params->num_dir_credits,
501 		    config_params->num_ldb_credit_pools,
502 		    config_params->num_dir_credit_pools);
503 
504 	/* Configure the QM */
505 
506 	ret = dlb_iface_sched_domain_create(handle, config_params);
507 	if (ret < 0) {
508 		DLB_LOG_ERR("dlb: domain create failed, device_id = %d, (driver ret = %d, extra status: %s)\n",
509 			    handle->device_id,
510 			    ret,
511 			    dlb_error_strings[response.status]);
512 		goto error_exit;
513 	}
514 
515 	handle->domain_id = response.id;
516 	handle->domain_id_valid = 1;
517 
518 	config_params->response = 0;
519 
520 	ret = dlb_ldb_credit_pool_create(handle);
521 	if (ret < 0) {
522 		DLB_LOG_ERR("dlb: create ldb credit pool failed\n");
523 		goto error_exit2;
524 	}
525 
526 	ret = dlb_dir_credit_pool_create(handle);
527 	if (ret < 0) {
528 		DLB_LOG_ERR("dlb: create dir credit pool failed\n");
529 		goto error_exit2;
530 	}
531 
532 	handle->cfg.configured = true;
533 
534 	return 0;
535 
536 error_exit2:
537 	dlb_iface_domain_close(dlb);
538 
539 error_exit:
540 	return ret;
541 }
542 
543 /* End HW specific */
544 static void
545 dlb_eventdev_info_get(struct rte_eventdev *dev,
546 		      struct rte_event_dev_info *dev_info)
547 {
548 	struct dlb_eventdev *dlb = dlb_pmd_priv(dev);
549 	int ret;
550 
551 	ret = dlb_hw_query_resources(dlb);
552 	if (ret) {
553 		const struct rte_eventdev_data *data = dev->data;
554 
555 		DLB_LOG_ERR("get resources err=%d, devid=%d\n",
556 			    ret, data->dev_id);
557 		/* fn is void, so fall through and return values set up in
558 		 * probe
559 		 */
560 	}
561 
562 	/* Add num resources currently owned by this domain.
563 	 * These would become available if the scheduling domain were reset due
564 	 * to the application recalling eventdev_configure to *reconfigure* the
565 	 * domain.
566 	 */
567 	evdev_dlb_default_info.max_event_ports += dlb->num_ldb_ports;
568 	evdev_dlb_default_info.max_event_queues += dlb->num_ldb_queues;
569 	evdev_dlb_default_info.max_num_events += dlb->num_ldb_credits;
570 
571 	/* In DLB A-stepping hardware, applications are limited to 128
572 	 * configured ports (load-balanced or directed). The reported number of
573 	 * available ports must reflect this.
574 	 */
575 	if (dlb->revision < DLB_REV_B0) {
576 		int used_ports;
577 
578 		used_ports = DLB_MAX_NUM_LDB_PORTS + DLB_MAX_NUM_DIR_PORTS -
579 			dlb->hw_rsrc_query_results.num_ldb_ports -
580 			dlb->hw_rsrc_query_results.num_dir_ports;
581 
582 		evdev_dlb_default_info.max_event_ports =
583 			RTE_MIN(evdev_dlb_default_info.max_event_ports,
584 				128 - used_ports);
585 	}
586 
587 	evdev_dlb_default_info.max_event_queues =
588 		RTE_MIN(evdev_dlb_default_info.max_event_queues,
589 			RTE_EVENT_MAX_QUEUES_PER_DEV);
590 
591 	evdev_dlb_default_info.max_num_events =
592 		RTE_MIN(evdev_dlb_default_info.max_num_events,
593 			dlb->max_num_events_override);
594 
595 	*dev_info = evdev_dlb_default_info;
596 }
597 
598 /* Note: 1 QM instance per QM device, QM instance/device == event device */
599 static int
600 dlb_eventdev_configure(const struct rte_eventdev *dev)
601 {
602 	struct dlb_eventdev *dlb = dlb_pmd_priv(dev);
603 	struct dlb_hw_dev *handle = &dlb->qm_instance;
604 	struct dlb_hw_rsrcs *rsrcs = &handle->info.hw_rsrc_max;
605 	const struct rte_eventdev_data *data = dev->data;
606 	const struct rte_event_dev_config *config = &data->dev_conf;
607 	int ret;
608 
609 	/* If this eventdev is already configured, we must release the current
610 	 * scheduling domain before attempting to configure a new one.
611 	 */
612 	if (dlb->configured) {
613 		dlb_hw_reset_sched_domain(dev, true);
614 
615 		ret = dlb_hw_query_resources(dlb);
616 		if (ret) {
617 			DLB_LOG_ERR("get resources err=%d, devid=%d\n",
618 				    ret, data->dev_id);
619 			return ret;
620 		}
621 	}
622 
623 	if (config->nb_event_queues > rsrcs->num_queues) {
624 		DLB_LOG_ERR("nb_event_queues parameter (%d) exceeds the QM device's capabilities (%d).\n",
625 			    config->nb_event_queues,
626 			    rsrcs->num_queues);
627 		return -EINVAL;
628 	}
629 	if (config->nb_event_ports > (rsrcs->num_ldb_ports
630 			+ rsrcs->num_dir_ports)) {
631 		DLB_LOG_ERR("nb_event_ports parameter (%d) exceeds the QM device's capabilities (%d).\n",
632 			    config->nb_event_ports,
633 			    (rsrcs->num_ldb_ports + rsrcs->num_dir_ports));
634 		return -EINVAL;
635 	}
636 	if (config->nb_events_limit > rsrcs->nb_events_limit) {
637 		DLB_LOG_ERR("nb_events_limit parameter (%d) exceeds the QM device's capabilities (%d).\n",
638 			    config->nb_events_limit,
639 			    rsrcs->nb_events_limit);
640 		return -EINVAL;
641 	}
642 
643 	if (config->event_dev_cfg & RTE_EVENT_DEV_CFG_PER_DEQUEUE_TIMEOUT)
644 		dlb->global_dequeue_wait = false;
645 	else {
646 		uint32_t timeout32;
647 
648 		dlb->global_dequeue_wait = true;
649 
650 		timeout32 = config->dequeue_timeout_ns;
651 
652 		dlb->global_dequeue_wait_ticks =
653 			timeout32 * (rte_get_timer_hz() / 1E9);
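			/* For example (illustrative numbers): with a 2 GHz
			 * timer (rte_get_timer_hz() == 2E9) and a 1000 ns
			 * dequeue timeout, this evaluates to 2000 wait ticks.
			 */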
654 	}
655 
656 	/* Does this platform support umonitor/umwait? */
657 	if (rte_cpu_get_flag_enabled(RTE_CPUFLAG_WAITPKG)) {
658 		if (RTE_LIBRTE_PMD_DLB_UMWAIT_CTL_STATE != 0 &&
659 		    RTE_LIBRTE_PMD_DLB_UMWAIT_CTL_STATE != 1) {
660 			DLB_LOG_ERR("invalid value (%d) for RTE_LIBRTE_PMD_DLB_UMWAIT_CTL_STATE must be 0 or 1.\n",
661 				    RTE_LIBRTE_PMD_DLB_UMWAIT_CTL_STATE);
662 			return -EINVAL;
663 		}
664 		dlb->umwait_allowed = true;
665 	}
666 
667 	rsrcs->num_dir_ports = config->nb_single_link_event_port_queues;
668 	rsrcs->num_ldb_ports = config->nb_event_ports - rsrcs->num_dir_ports;
669 	/* 1 dir queue per dir port */
670 	rsrcs->num_ldb_queues = config->nb_event_queues - rsrcs->num_dir_ports;
671 
672 	/* Scale down nb_events_limit by 4 for directed credits, since there
673 	 * are 4x as many load-balanced credits.
674 	 */
675 	rsrcs->num_ldb_credits = 0;
676 	rsrcs->num_dir_credits = 0;
677 
678 	if (rsrcs->num_ldb_queues)
679 		rsrcs->num_ldb_credits = config->nb_events_limit;
680 	if (rsrcs->num_dir_ports)
681 		rsrcs->num_dir_credits = config->nb_events_limit / 4;
682 	if (dlb->num_dir_credits_override != -1)
683 		rsrcs->num_dir_credits = dlb->num_dir_credits_override;
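	/* Illustrative sizing for the credit split above: with
	 * nb_events_limit = 2048 and both LDB queues and DIR ports in use,
	 * the domain is requested with 2048 load-balanced credits and 512
	 * directed credits, unless dlb->num_dir_credits_override (set from
	 * the corresponding devarg) replaces the directed value.
	 */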
684 
685 	if (dlb_hw_create_sched_domain(handle, dlb, rsrcs) < 0) {
686 		DLB_LOG_ERR("dlb_hw_create_sched_domain failed\n");
687 		return -ENODEV;
688 	}
689 
690 	dlb->new_event_limit = config->nb_events_limit;
691 	__atomic_store_n(&dlb->inflights, 0, __ATOMIC_SEQ_CST);
692 
693 	/* Save number of ports/queues for this event dev */
694 	dlb->num_ports = config->nb_event_ports;
695 	dlb->num_queues = config->nb_event_queues;
696 	dlb->num_dir_ports = rsrcs->num_dir_ports;
697 	dlb->num_ldb_ports = dlb->num_ports - dlb->num_dir_ports;
698 	dlb->num_ldb_queues = dlb->num_queues - dlb->num_dir_ports;
699 	dlb->num_dir_queues = dlb->num_dir_ports;
700 	dlb->num_ldb_credits = rsrcs->num_ldb_credits;
701 	dlb->num_dir_credits = rsrcs->num_dir_credits;
702 
703 	dlb->configured = true;
704 
705 	return 0;
706 }
707 
708 static int16_t
709 dlb_hw_unmap_ldb_qid_from_port(struct dlb_hw_dev *handle,
710 			       uint32_t qm_port_id,
711 			       uint16_t qm_qid)
712 {
713 	struct dlb_unmap_qid_args cfg;
714 	struct dlb_cmd_response response;
715 	int32_t ret;
716 
717 	if (handle == NULL)
718 		return -EINVAL;
719 
720 	cfg.response = (uintptr_t)&response;
721 	cfg.port_id = qm_port_id;
722 	cfg.qid = qm_qid;
723 
724 	ret = dlb_iface_unmap_qid(handle, &cfg);
725 	if (ret < 0)
726 		DLB_LOG_ERR("dlb: unmap qid error, ret=%d (driver status: %s)\n",
727 			    ret, dlb_error_strings[response.status]);
728 
729 	return ret;
730 }
731 
732 static int
733 dlb_event_queue_detach_ldb(struct dlb_eventdev *dlb,
734 			   struct dlb_eventdev_port *ev_port,
735 			   struct dlb_eventdev_queue *ev_queue)
736 {
737 	int ret, i;
738 
739 	/* Don't unlink until start time. */
740 	if (dlb->run_state == DLB_RUN_STATE_STOPPED)
741 		return 0;
742 
743 	for (i = 0; i < DLB_MAX_NUM_QIDS_PER_LDB_CQ; i++) {
744 		if (ev_port->link[i].valid &&
745 		    ev_port->link[i].queue_id == ev_queue->id)
746 			break; /* found */
747 	}
748 
749 	/* This is expected with the eventdev API, which blindly attempts to
750 	 * unmap all queues.
751 	 */
752 	if (i == DLB_MAX_NUM_QIDS_PER_LDB_CQ) {
753 		DLB_LOG_DBG("dlb: ignoring LB QID %d not mapped for qm_port %d.\n",
754 			    ev_queue->qm_queue.id,
755 			    ev_port->qm_port.id);
756 		return 0;
757 	}
758 
759 	ret = dlb_hw_unmap_ldb_qid_from_port(&dlb->qm_instance,
760 					     ev_port->qm_port.id,
761 					     ev_queue->qm_queue.id);
762 	if (!ret)
763 		ev_port->link[i].mapped = false;
764 
765 	return ret;
766 }
767 
768 static int
769 dlb_eventdev_port_unlink(struct rte_eventdev *dev, void *event_port,
770 			 uint8_t queues[], uint16_t nb_unlinks)
771 {
772 	struct dlb_eventdev_port *ev_port = event_port;
773 	struct dlb_eventdev *dlb;
774 	int i;
775 
776 	RTE_SET_USED(dev);
777 
778 	if (!ev_port->setup_done) {
779 		DLB_LOG_ERR("dlb: evport %d is not configured\n",
780 			    ev_port->id);
781 		rte_errno = -EINVAL;
782 		return 0;
783 	}
784 
785 	if (queues == NULL || nb_unlinks == 0) {
786 		DLB_LOG_DBG("dlb: queues is NULL or nb_unlinks is 0\n");
787 		return 0; /* Ignore and return success */
788 	}
789 
790 	if (ev_port->qm_port.is_directed) {
791 		DLB_LOG_DBG("dlb: ignore unlink from dir port %d\n",
792 			    ev_port->id);
793 		rte_errno = 0;
794 		return nb_unlinks; /* as if success */
795 	}
796 
797 	dlb = ev_port->dlb;
798 
799 	for (i = 0; i < nb_unlinks; i++) {
800 		struct dlb_eventdev_queue *ev_queue;
801 		int ret, j;
802 
803 		if (queues[i] >= dlb->num_queues) {
804 			DLB_LOG_ERR("dlb: invalid queue id %d\n", queues[i]);
805 			rte_errno = -EINVAL;
806 			return i; /* return index of offending queue */
807 		}
808 
809 		ev_queue = &dlb->ev_queues[queues[i]];
810 
811 		/* Does a link exist? */
812 		for (j = 0; j < DLB_MAX_NUM_QIDS_PER_LDB_CQ; j++)
813 			if (ev_port->link[j].queue_id == queues[i] &&
814 			    ev_port->link[j].valid)
815 				break;
816 
817 		if (j == DLB_MAX_NUM_QIDS_PER_LDB_CQ)
818 			continue;
819 
820 		ret = dlb_event_queue_detach_ldb(dlb, ev_port, ev_queue);
821 		if (ret) {
822 			DLB_LOG_ERR("unlink err=%d for port %d queue %d\n",
823 				    ret, ev_port->id, queues[i]);
824 			rte_errno = -ENOENT;
825 			return i; /* return index of offending queue */
826 		}
827 
828 		ev_port->link[j].valid = false;
829 		ev_port->num_links--;
830 		ev_queue->num_links--;
831 	}
832 
833 	return nb_unlinks;
834 }
835 
836 static int
837 dlb_eventdev_port_unlinks_in_progress(struct rte_eventdev *dev,
838 				      void *event_port)
839 {
840 	struct dlb_eventdev_port *ev_port = event_port;
841 	struct dlb_eventdev *dlb;
842 	struct dlb_hw_dev *handle;
843 	struct dlb_pending_port_unmaps_args cfg;
844 	struct dlb_cmd_response response;
845 	int ret;
846 
847 	RTE_SET_USED(dev);
848 
849 	if (!ev_port->setup_done) {
850 		DLB_LOG_ERR("dlb: evport %d is not configured\n",
851 			    ev_port->id);
852 		rte_errno = -EINVAL;
853 		return 0;
854 	}
855 
856 	cfg.port_id = ev_port->qm_port.id;
857 	cfg.response = (uintptr_t)&response;
858 	dlb = ev_port->dlb;
859 	handle = &dlb->qm_instance;
860 	ret = dlb_iface_pending_port_unmaps(handle, &cfg);
861 
862 	if (ret < 0) {
863 		DLB_LOG_ERR("dlb: num_unlinks_in_progress ret=%d (driver status: %s)\n",
864 			    ret, dlb_error_strings[response.status]);
865 		return ret;
866 	}
867 
868 	return response.id;
869 }
870 
871 static void
872 dlb_eventdev_port_default_conf_get(struct rte_eventdev *dev,
873 				   uint8_t port_id,
874 				   struct rte_event_port_conf *port_conf)
875 {
876 	RTE_SET_USED(port_id);
877 	struct dlb_eventdev *dlb = dlb_pmd_priv(dev);
878 
879 	port_conf->new_event_threshold = dlb->new_event_limit;
880 	port_conf->dequeue_depth = 32;
881 	port_conf->enqueue_depth = DLB_MAX_ENQUEUE_DEPTH;
882 	port_conf->event_port_cfg = 0;
883 }
884 
885 static void
886 dlb_eventdev_queue_default_conf_get(struct rte_eventdev *dev,
887 				    uint8_t queue_id,
888 				    struct rte_event_queue_conf *queue_conf)
889 {
890 	RTE_SET_USED(dev);
891 	RTE_SET_USED(queue_id);
892 	queue_conf->nb_atomic_flows = 1024;
893 	queue_conf->nb_atomic_order_sequences = 32;
894 	queue_conf->event_queue_cfg = 0;
895 	queue_conf->priority = 0;
896 }
897 
898 static int
899 dlb_hw_create_ldb_port(struct dlb_eventdev *dlb,
900 		       struct dlb_eventdev_port *ev_port,
901 		       uint32_t dequeue_depth,
902 		       uint32_t cq_depth,
903 		       uint32_t enqueue_depth,
904 		       uint16_t rsvd_tokens,
905 		       bool use_rsvd_token_scheme)
906 {
907 	struct dlb_hw_dev *handle = &dlb->qm_instance;
908 	struct dlb_create_ldb_port_args cfg = {0};
909 	struct dlb_cmd_response response = {0};
910 	int ret;
911 	struct dlb_port *qm_port = NULL;
912 	char mz_name[RTE_MEMZONE_NAMESIZE];
913 	uint32_t qm_port_id;
914 
915 	if (handle == NULL)
916 		return -EINVAL;
917 
918 	if (cq_depth < DLB_MIN_LDB_CQ_DEPTH) {
919 		DLB_LOG_ERR("dlb: invalid cq_depth, must be %d-%d\n",
920 			DLB_MIN_LDB_CQ_DEPTH, DLB_MAX_INPUT_QUEUE_DEPTH);
921 		return -EINVAL;
922 	}
923 
924 	if (enqueue_depth < DLB_MIN_ENQUEUE_DEPTH) {
925 		DLB_LOG_ERR("dlb: invalid enqueue_depth, must be at least %d\n",
926 			    DLB_MIN_ENQUEUE_DEPTH);
927 		return -EINVAL;
928 	}
929 
930 	rte_spinlock_lock(&handle->resource_lock);
931 
932 	cfg.response = (uintptr_t)&response;
933 
934 	/* We round up to the next power of 2 if necessary */
935 	cfg.cq_depth = rte_align32pow2(cq_depth);
936 	cfg.cq_depth_threshold = rsvd_tokens;
937 
938 	cfg.cq_history_list_size = DLB_NUM_HIST_LIST_ENTRIES_PER_LDB_PORT;
939 
940 	/* User controls the LDB high watermark via enqueue depth. The DIR high
941 	 * watermark is equal, unless the directed credit pool is too small.
942 	 */
943 	cfg.ldb_credit_high_watermark = enqueue_depth;
944 
945 	/* If there are no directed ports, the kernel driver will ignore this
946 	 * port's directed credit settings. Don't use enqueue_depth if it would
947 	 * require more directed credits than are available.
948 	 */
949 	cfg.dir_credit_high_watermark =
950 		RTE_MIN(enqueue_depth,
951 			handle->cfg.num_dir_credits / dlb->num_ports);
952 
953 	cfg.ldb_credit_quantum = cfg.ldb_credit_high_watermark / 2;
954 	cfg.ldb_credit_low_watermark = RTE_MIN(16, cfg.ldb_credit_quantum);
955 
956 	cfg.dir_credit_quantum = cfg.dir_credit_high_watermark / 2;
957 	cfg.dir_credit_low_watermark = RTE_MIN(16, cfg.dir_credit_quantum);
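	/* Illustrative watermark math for the formulas above: an
	 * enqueue_depth of 64 gives an LDB high watermark of 64, a quantum
	 * of 32, and a low watermark of 16; the DIR values follow the same
	 * formula but are capped by this port's share of the directed
	 * credit pool.
	 */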
958 
959 	/* Per QM values */
960 
961 	cfg.ldb_credit_pool_id = handle->cfg.ldb_credit_pool_id;
962 	cfg.dir_credit_pool_id = handle->cfg.dir_credit_pool_id;
963 
964 	ret = dlb_iface_ldb_port_create(handle, &cfg, dlb->poll_mode);
965 	if (ret < 0) {
966 		DLB_LOG_ERR("dlb: dlb_ldb_port_create error, ret=%d (driver status: %s)\n",
967 			    ret, dlb_error_strings[response.status]);
968 		goto error_exit;
969 	}
970 
971 	qm_port_id = response.id;
972 
973 	DLB_LOG_DBG("dlb: ev_port %d uses qm LB port %d <<<<<\n",
974 		    ev_port->id, qm_port_id);
975 
976 	qm_port = &ev_port->qm_port;
977 	qm_port->ev_port = ev_port; /* back ptr */
978 	qm_port->dlb = dlb; /* back ptr */
979 
980 	/*
981 	 * Allocate and init local qe struct(s).
982 	 * Note: MOVDIR64 requires the enqueue QE (qe4) to be aligned.
983 	 */
984 
985 	snprintf(mz_name, sizeof(mz_name), "ldb_port%d",
986 		 ev_port->id);
987 
988 	ret = dlb_init_qe_mem(qm_port, mz_name);
989 	if (ret < 0) {
990 		DLB_LOG_ERR("dlb: init_qe_mem failed, ret=%d\n", ret);
991 		goto error_exit;
992 	}
993 
994 	qm_port->pp_mmio_base = DLB_LDB_PP_BASE + PAGE_SIZE * qm_port_id;
995 	qm_port->id = qm_port_id;
996 
997 	/* The credit window is one high water mark of QEs */
998 	qm_port->ldb_pushcount_at_credit_expiry = 0;
999 	qm_port->cached_ldb_credits = cfg.ldb_credit_high_watermark;
1000 	/* The credit window is one high water mark of QEs */
1001 	qm_port->dir_pushcount_at_credit_expiry = 0;
1002 	qm_port->cached_dir_credits = cfg.dir_credit_high_watermark;
1003 	/* CQs with depth < 8 use an 8-entry queue, but withhold credits so
1004 	 * the effective depth is smaller.
1005 	 */
1006 	qm_port->cq_depth = cfg.cq_depth <= 8 ? 8 : cfg.cq_depth;
1007 	qm_port->cq_idx = 0;
1008 	qm_port->cq_idx_unmasked = 0;
1009 	if (dlb->poll_mode == DLB_CQ_POLL_MODE_SPARSE)
1010 		qm_port->cq_depth_mask = (qm_port->cq_depth * 4) - 1;
1011 	else
1012 		qm_port->cq_depth_mask = qm_port->cq_depth - 1;
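	/* For instance, with a cq_depth of 8 (the floor applied above) the
	 * mask is 0x7 in standard mode and 0x1f in sparse mode, so
	 * gen_bit_shift below is 3 or 5 respectively.
	 */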
1013 
1014 	qm_port->gen_bit_shift = __builtin_popcount(qm_port->cq_depth_mask);
1015 	/* starting value of gen bit - it toggles at wrap time */
1016 	qm_port->gen_bit = 1;
1017 
1018 	qm_port->use_rsvd_token_scheme = use_rsvd_token_scheme;
1019 	qm_port->cq_rsvd_token_deficit = rsvd_tokens;
1020 	qm_port->int_armed = false;
1021 
1022 	/* Save off for later use in info and lookup APIs. */
1023 	qm_port->qid_mappings = &dlb->qm_ldb_to_ev_queue_id[0];
1024 
1025 	qm_port->dequeue_depth = dequeue_depth;
1026 
1027 	/* When using the reserved token scheme, token_pop_thresh is
1028 	 * initially 2 * dequeue_depth. Once the tokens are reserved,
1029 	 * the enqueue code re-assigns it to dequeue_depth.
1030 	 */
1031 	qm_port->token_pop_thresh = cq_depth;
1032 
1033 	/* When the deferred scheduling vdev arg is selected, use deferred pop
1034 	 * for all single-entry CQs.
1035 	 */
1036 	if (cfg.cq_depth == 1 || (cfg.cq_depth == 2 && use_rsvd_token_scheme)) {
1037 		if (dlb->defer_sched)
1038 			qm_port->token_pop_mode = DEFERRED_POP;
1039 	}
1040 
1041 	/* The default enqueue functions do not include delayed-pop support for
1042 	 * performance reasons.
1043 	 */
1044 	if (qm_port->token_pop_mode == DELAYED_POP) {
1045 		dlb->event_dev->enqueue = dlb_event_enqueue_delayed;
1046 		dlb->event_dev->enqueue_burst =
1047 			dlb_event_enqueue_burst_delayed;
1048 		dlb->event_dev->enqueue_new_burst =
1049 			dlb_event_enqueue_new_burst_delayed;
1050 		dlb->event_dev->enqueue_forward_burst =
1051 			dlb_event_enqueue_forward_burst_delayed;
1052 	}
1053 
1054 	qm_port->owed_tokens = 0;
1055 	qm_port->issued_releases = 0;
1056 
1057 	/* update state */
1058 	qm_port->state = PORT_STARTED; /* enabled at create time */
1059 	qm_port->config_state = DLB_CONFIGURED;
1060 
1061 	qm_port->dir_credits = cfg.dir_credit_high_watermark;
1062 	qm_port->ldb_credits = cfg.ldb_credit_high_watermark;
1063 
1064 	DLB_LOG_DBG("dlb: created ldb port %d, depth = %d, ldb credits=%d, dir credits=%d\n",
1065 		    qm_port_id,
1066 		    cq_depth,
1067 		    qm_port->ldb_credits,
1068 		    qm_port->dir_credits);
1069 
1070 	rte_spinlock_unlock(&handle->resource_lock);
1071 
1072 	return 0;
1073 
1074 error_exit:
1075 	if (qm_port) {
1076 		dlb_free_qe_mem(qm_port);
1077 		qm_port->pp_mmio_base = 0;
1078 	}
1079 
1080 	rte_spinlock_unlock(&handle->resource_lock);
1081 
1082 	DLB_LOG_ERR("dlb: create ldb port failed!\n");
1083 
1084 	return ret;
1085 }
1086 
1087 static int
1088 dlb_hw_create_dir_port(struct dlb_eventdev *dlb,
1089 		       struct dlb_eventdev_port *ev_port,
1090 		       uint32_t dequeue_depth,
1091 		       uint32_t cq_depth,
1092 		       uint32_t enqueue_depth,
1093 		       uint16_t rsvd_tokens,
1094 		       bool use_rsvd_token_scheme)
1095 {
1096 	struct dlb_hw_dev *handle = &dlb->qm_instance;
1097 	struct dlb_create_dir_port_args cfg = {0};
1098 	struct dlb_cmd_response response = {0};
1099 	int ret;
1100 	struct dlb_port *qm_port = NULL;
1101 	char mz_name[RTE_MEMZONE_NAMESIZE];
1102 	uint32_t qm_port_id;
1103 
1104 	if (dlb == NULL || handle == NULL)
1105 		return -EINVAL;
1106 
1107 	if (cq_depth < DLB_MIN_DIR_CQ_DEPTH) {
1108 		DLB_LOG_ERR("dlb: invalid cq_depth, must be at least %d\n",
1109 			    DLB_MIN_DIR_CQ_DEPTH);
1110 		return -EINVAL;
1111 	}
1112 
1113 	if (enqueue_depth < DLB_MIN_ENQUEUE_DEPTH) {
1114 		DLB_LOG_ERR("dlb: invalid enqueue_depth, must be at least %d\n",
1115 			    DLB_MIN_ENQUEUE_DEPTH);
1116 		return -EINVAL;
1117 	}
1118 
1119 	rte_spinlock_lock(&handle->resource_lock);
1120 
1121 	/* Directed queues are configured at link time. */
1122 	cfg.queue_id = -1;
1123 
1124 	cfg.response = (uintptr_t)&response;
1125 
1126 	/* We round up to the next power of 2 if necessary */
1127 	cfg.cq_depth = rte_align32pow2(cq_depth);
1128 	cfg.cq_depth_threshold = rsvd_tokens;
1129 
1130 	/* User controls the LDB high watermark via enqueue depth. The DIR high
1131 	 * watermark is equal, unless the directed credit pool is too small.
1132 	 */
1133 	cfg.ldb_credit_high_watermark = enqueue_depth;
1134 
1135 	/* Don't use enqueue_depth if it would require more directed credits
1136 	 * than are available.
1137 	 */
1138 	cfg.dir_credit_high_watermark =
1139 		RTE_MIN(enqueue_depth,
1140 			handle->cfg.num_dir_credits / dlb->num_ports);
1141 
1142 	cfg.ldb_credit_quantum = cfg.ldb_credit_high_watermark / 2;
1143 	cfg.ldb_credit_low_watermark = RTE_MIN(16, cfg.ldb_credit_quantum);
1144 
1145 	cfg.dir_credit_quantum = cfg.dir_credit_high_watermark / 2;
1146 	cfg.dir_credit_low_watermark = RTE_MIN(16, cfg.dir_credit_quantum);
1147 
1148 	/* Per QM values */
1149 
1150 	cfg.ldb_credit_pool_id = handle->cfg.ldb_credit_pool_id;
1151 	cfg.dir_credit_pool_id = handle->cfg.dir_credit_pool_id;
1152 
1153 	ret = dlb_iface_dir_port_create(handle, &cfg, dlb->poll_mode);
1154 	if (ret < 0) {
1155 		DLB_LOG_ERR("dlb: dlb_dir_port_create error, ret=%d (driver status: %s)\n",
1156 			    ret, dlb_error_strings[response.status]);
1157 		goto error_exit;
1158 	}
1159 
1160 	qm_port_id = response.id;
1161 
1162 	DLB_LOG_DBG("dlb: ev_port %d uses qm DIR port %d <<<<<\n",
1163 		    ev_port->id, qm_port_id);
1164 
1165 	qm_port = &ev_port->qm_port;
1166 	qm_port->ev_port = ev_port; /* back ptr */
1167 	qm_port->dlb = dlb;  /* back ptr */
1168 
1169 	/*
1170 	 * Init local qe struct(s).
1171 	 * Note: MOVDIR64 requires the enqueue QE to be aligned
1172 	 */
1173 
1174 	snprintf(mz_name, sizeof(mz_name), "dir_port%d",
1175 		 ev_port->id);
1176 
1177 	ret = dlb_init_qe_mem(qm_port, mz_name);
1178 
1179 	if (ret < 0) {
1180 		DLB_LOG_ERR("dlb: init_qe_mem failed, ret=%d\n", ret);
1181 		goto error_exit;
1182 	}
1183 
1184 	qm_port->pp_mmio_base = DLB_DIR_PP_BASE + PAGE_SIZE * qm_port_id;
1185 	qm_port->id = qm_port_id;
1186 
1187 	/* The credit window is one high water mark of QEs */
1188 	qm_port->ldb_pushcount_at_credit_expiry = 0;
1189 	qm_port->cached_ldb_credits = cfg.ldb_credit_high_watermark;
1190 	/* The credit window is one high water mark of QEs */
1191 	qm_port->dir_pushcount_at_credit_expiry = 0;
1192 	qm_port->cached_dir_credits = cfg.dir_credit_high_watermark;
1193 	qm_port->cq_depth = cfg.cq_depth;
1194 	qm_port->cq_idx = 0;
1195 	qm_port->cq_idx_unmasked = 0;
1196 	if (dlb->poll_mode == DLB_CQ_POLL_MODE_SPARSE)
1197 		qm_port->cq_depth_mask = (cfg.cq_depth * 4) - 1;
1198 	else
1199 		qm_port->cq_depth_mask = cfg.cq_depth - 1;
1200 
1201 	qm_port->gen_bit_shift = __builtin_popcount(qm_port->cq_depth_mask);
1202 	/* starting value of gen bit - it toggles at wrap time */
1203 	qm_port->gen_bit = 1;
1204 
1205 	qm_port->use_rsvd_token_scheme = use_rsvd_token_scheme;
1206 	qm_port->cq_rsvd_token_deficit = rsvd_tokens;
1207 	qm_port->int_armed = false;
1208 
1209 	/* Save off for later use in info and lookup APIs. */
1210 	qm_port->qid_mappings = &dlb->qm_dir_to_ev_queue_id[0];
1211 
1212 	qm_port->dequeue_depth = dequeue_depth;
1213 
1214 	/* Directed ports are auto-pop, by default. */
1215 	qm_port->token_pop_mode = AUTO_POP;
1216 	qm_port->owed_tokens = 0;
1217 	qm_port->issued_releases = 0;
1218 
1219 	/* update state */
1220 	qm_port->state = PORT_STARTED; /* enabled at create time */
1221 	qm_port->config_state = DLB_CONFIGURED;
1222 
1223 	qm_port->dir_credits = cfg.dir_credit_high_watermark;
1224 	qm_port->ldb_credits = cfg.ldb_credit_high_watermark;
1225 
1226 	DLB_LOG_DBG("dlb: created dir port %d, depth = %d cr=%d,%d\n",
1227 		    qm_port_id,
1228 		    cq_depth,
1229 		    cfg.dir_credit_high_watermark,
1230 		    cfg.ldb_credit_high_watermark);
1231 
1232 	rte_spinlock_unlock(&handle->resource_lock);
1233 
1234 	return 0;
1235 
1236 error_exit:
1237 	if (qm_port) {
1238 		qm_port->pp_mmio_base = 0;
1239 		dlb_free_qe_mem(qm_port);
1240 	}
1241 
1242 	rte_spinlock_unlock(&handle->resource_lock);
1243 
1244 	DLB_LOG_ERR("dlb: create dir port failed!\n");
1245 
1246 	return ret;
1247 }
1248 
1249 static int32_t
1250 dlb_hw_create_ldb_queue(struct dlb_eventdev *dlb,
1251 			struct dlb_queue *queue,
1252 			const struct rte_event_queue_conf *evq_conf)
1253 {
1254 	struct dlb_hw_dev *handle = &dlb->qm_instance;
1255 	struct dlb_create_ldb_queue_args cfg;
1256 	struct dlb_cmd_response response;
1257 	int32_t ret;
1258 	uint32_t qm_qid;
1259 	int sched_type = -1;
1260 
1261 	if (evq_conf == NULL)
1262 		return -EINVAL;
1263 
1264 	if (evq_conf->event_queue_cfg & RTE_EVENT_QUEUE_CFG_ALL_TYPES) {
1265 		if (evq_conf->nb_atomic_order_sequences != 0)
1266 			sched_type = RTE_SCHED_TYPE_ORDERED;
1267 		else
1268 			sched_type = RTE_SCHED_TYPE_PARALLEL;
1269 	} else
1270 		sched_type = evq_conf->schedule_type;
1271 
1272 	cfg.response = (uintptr_t)&response;
1273 	cfg.num_atomic_inflights = dlb->num_atm_inflights_per_queue;
1274 	cfg.num_sequence_numbers = evq_conf->nb_atomic_order_sequences;
1275 	cfg.num_qid_inflights = evq_conf->nb_atomic_order_sequences;
1276 
1277 	if (sched_type != RTE_SCHED_TYPE_ORDERED) {
1278 		cfg.num_sequence_numbers = 0;
1279 		cfg.num_qid_inflights = DLB_DEF_UNORDERED_QID_INFLIGHTS;
1280 	}
1281 
1282 	ret = dlb_iface_ldb_queue_create(handle, &cfg);
1283 	if (ret < 0) {
1284 		DLB_LOG_ERR("dlb: create LB event queue error, ret=%d (driver status: %s)\n",
1285 			    ret, dlb_error_strings[response.status]);
1286 		return -EINVAL;
1287 	}
1288 
1289 	qm_qid = response.id;
1290 
1291 	/* Save off queue config for debug, resource lookups, and reconfig */
1292 	queue->num_qid_inflights = cfg.num_qid_inflights;
1293 	queue->num_atm_inflights = cfg.num_atomic_inflights;
1294 
1295 	queue->sched_type = sched_type;
1296 	queue->config_state = DLB_CONFIGURED;
1297 
1298 	DLB_LOG_DBG("Created LB event queue %d, nb_inflights=%d, nb_seq=%d, qid inflights=%d\n",
1299 		    qm_qid,
1300 		    cfg.num_atomic_inflights,
1301 		    cfg.num_sequence_numbers,
1302 		    cfg.num_qid_inflights);
1303 
1304 	return qm_qid;
1305 }
1306 
1307 static int32_t
1308 dlb_get_sn_allocation(struct dlb_eventdev *dlb, int group)
1309 {
1310 	struct dlb_hw_dev *handle = &dlb->qm_instance;
1311 	struct dlb_get_sn_allocation_args cfg;
1312 	struct dlb_cmd_response response;
1313 	int ret;
1314 
1315 	cfg.group = group;
1316 	cfg.response = (uintptr_t)&response;
1317 
1318 	ret = dlb_iface_get_sn_allocation(handle, &cfg);
1319 	if (ret < 0) {
1320 		DLB_LOG_ERR("dlb: get_sn_allocation ret=%d (driver status: %s)\n",
1321 			    ret, dlb_error_strings[response.status]);
1322 		return ret;
1323 	}
1324 
1325 	return response.id;
1326 }
1327 
1328 static int
1329 dlb_set_sn_allocation(struct dlb_eventdev *dlb, int group, int num)
1330 {
1331 	struct dlb_hw_dev *handle = &dlb->qm_instance;
1332 	struct dlb_set_sn_allocation_args cfg;
1333 	struct dlb_cmd_response response;
1334 	int ret;
1335 
1336 	cfg.num = num;
1337 	cfg.group = group;
1338 	cfg.response = (uintptr_t)&response;
1339 
1340 	ret = dlb_iface_set_sn_allocation(handle, &cfg);
1341 	if (ret < 0) {
1342 		DLB_LOG_ERR("dlb: set_sn_allocation ret=%d (driver status: %s)\n",
1343 			    ret, dlb_error_strings[response.status]);
1344 		return ret;
1345 	}
1346 
1347 	return ret;
1348 }
1349 
1350 static int32_t
1351 dlb_get_sn_occupancy(struct dlb_eventdev *dlb, int group)
1352 {
1353 	struct dlb_hw_dev *handle = &dlb->qm_instance;
1354 	struct dlb_get_sn_occupancy_args cfg;
1355 	struct dlb_cmd_response response;
1356 	int ret;
1357 
1358 	cfg.group = group;
1359 	cfg.response = (uintptr_t)&response;
1360 
1361 	ret = dlb_iface_get_sn_occupancy(handle, &cfg);
1362 	if (ret < 0) {
1363 		DLB_LOG_ERR("dlb: get_sn_occupancy ret=%d (driver status: %s)\n",
1364 			    ret, dlb_error_strings[response.status]);
1365 		return ret;
1366 	}
1367 
1368 	return response.id;
1369 }
1370 
1371 /* Query the current sequence number allocations and, if they conflict with the
1372  * requested LDB queue configuration, attempt to re-allocate sequence numbers.
1373  * This is best-effort; if it fails, the PMD will attempt to configure the
1374  * load-balanced queue and return an error.
1375  */
1376 static void
1377 dlb_program_sn_allocation(struct dlb_eventdev *dlb,
1378 			  const struct rte_event_queue_conf *queue_conf)
1379 {
1380 	int grp_occupancy[DLB_NUM_SN_GROUPS];
1381 	int grp_alloc[DLB_NUM_SN_GROUPS];
1382 	int i, sequence_numbers;
1383 
1384 	sequence_numbers = (int)queue_conf->nb_atomic_order_sequences;
1385 
1386 	for (i = 0; i < DLB_NUM_SN_GROUPS; i++) {
1387 		int total_slots;
1388 
1389 		grp_alloc[i] = dlb_get_sn_allocation(dlb, i);
1390 		if (grp_alloc[i] < 0)
1391 			return;
1392 
1393 		total_slots = DLB_MAX_LDB_SN_ALLOC / grp_alloc[i];
1394 
1395 		grp_occupancy[i] = dlb_get_sn_occupancy(dlb, i);
1396 		if (grp_occupancy[i] < 0)
1397 			return;
1398 
1399 		/* DLB has at least one available slot for the requested
1400 		 * sequence numbers, so no further configuration required.
1401 		 */
1402 		if (grp_alloc[i] == sequence_numbers &&
1403 		    grp_occupancy[i] < total_slots)
1404 			return;
1405 	}
1406 
1407 	/* None of the sequence number groups are configured for the requested
1408 	 * sequence numbers, so we have to reconfigure one of them. This is
1409 	 * only possible if a group is not in use.
1410 	 */
1411 	for (i = 0; i < DLB_NUM_SN_GROUPS; i++) {
1412 		if (grp_occupancy[i] == 0)
1413 			break;
1414 	}
1415 
1416 	if (i == DLB_NUM_SN_GROUPS) {
1417 		DLB_LOG_ERR("[%s()] No groups with %d sequence_numbers are available or have free slots\n",
1418 		       __func__, sequence_numbers);
1419 		return;
1420 	}
1421 
1422 	/* Attempt to configure slot i with the requested number of sequence
1423 	 * numbers. Ignore the return value -- if this fails, the error will be
1424 	 * caught during subsequent queue configuration.
1425 	 */
1426 	dlb_set_sn_allocation(dlb, i, sequence_numbers);
1427 }
1428 
1429 static int
1430 dlb_eventdev_ldb_queue_setup(struct rte_eventdev *dev,
1431 			     struct dlb_eventdev_queue *ev_queue,
1432 			     const struct rte_event_queue_conf *queue_conf)
1433 {
1434 	struct dlb_eventdev *dlb = dlb_pmd_priv(dev);
1435 	int32_t qm_qid;
1436 
1437 	if (queue_conf->nb_atomic_order_sequences)
1438 		dlb_program_sn_allocation(dlb, queue_conf);
1439 
1440 	qm_qid = dlb_hw_create_ldb_queue(dlb,
1441 					 &ev_queue->qm_queue,
1442 					 queue_conf);
1443 	if (qm_qid < 0) {
1444 		DLB_LOG_ERR("Failed to create the load-balanced queue\n");
1445 
1446 		return qm_qid;
1447 	}
1448 
1449 	dlb->qm_ldb_to_ev_queue_id[qm_qid] = ev_queue->id;
1450 
1451 	ev_queue->qm_queue.id = qm_qid;
1452 
1453 	return 0;
1454 }
1455 
1456 static int dlb_num_dir_queues_setup(struct dlb_eventdev *dlb)
1457 {
1458 	int i, num = 0;
1459 
1460 	for (i = 0; i < dlb->num_queues; i++) {
1461 		if (dlb->ev_queues[i].setup_done &&
1462 		    dlb->ev_queues[i].qm_queue.is_directed)
1463 			num++;
1464 	}
1465 
1466 	return num;
1467 }
1468 
1469 static void
1470 dlb_queue_link_teardown(struct dlb_eventdev *dlb,
1471 			struct dlb_eventdev_queue *ev_queue)
1472 {
1473 	struct dlb_eventdev_port *ev_port;
1474 	int i, j;
1475 
1476 	for (i = 0; i < dlb->num_ports; i++) {
1477 		ev_port = &dlb->ev_ports[i];
1478 
1479 		for (j = 0; j < DLB_MAX_NUM_QIDS_PER_LDB_CQ; j++) {
1480 			if (!ev_port->link[j].valid ||
1481 			    ev_port->link[j].queue_id != ev_queue->id)
1482 				continue;
1483 
1484 			ev_port->link[j].valid = false;
1485 			ev_port->num_links--;
1486 		}
1487 	}
1488 
1489 	ev_queue->num_links = 0;
1490 }
1491 
1492 static int
1493 dlb_eventdev_queue_setup(struct rte_eventdev *dev,
1494 			 uint8_t ev_qid,
1495 			 const struct rte_event_queue_conf *queue_conf)
1496 {
1497 	struct dlb_eventdev *dlb = dlb_pmd_priv(dev);
1498 	struct dlb_eventdev_queue *ev_queue;
1499 	int ret;
1500 
1501 	if (queue_conf == NULL)
1502 		return -EINVAL;
1503 
1504 	if (ev_qid >= dlb->num_queues)
1505 		return -EINVAL;
1506 
1507 	ev_queue = &dlb->ev_queues[ev_qid];
1508 
1509 	ev_queue->qm_queue.is_directed = queue_conf->event_queue_cfg &
1510 		RTE_EVENT_QUEUE_CFG_SINGLE_LINK;
1511 	ev_queue->id = ev_qid;
1512 	ev_queue->conf = *queue_conf;
1513 
1514 	if (!ev_queue->qm_queue.is_directed) {
1515 		ret = dlb_eventdev_ldb_queue_setup(dev, ev_queue, queue_conf);
1516 	} else {
1517 		/* The directed queue isn't set up until link time, at which
1518 		 * point we know its directed port ID. Directed queue setup
1519 		 * will only fail if this queue is already set up or there are
1520 		 * no directed queues left to configure.
1521 		 */
1522 		ret = 0;
1523 
1524 		ev_queue->qm_queue.config_state = DLB_NOT_CONFIGURED;
1525 
1526 		if (ev_queue->setup_done ||
1527 		    dlb_num_dir_queues_setup(dlb) == dlb->num_dir_queues)
1528 			ret = -EINVAL;
1529 	}
1530 
1531 	/* Tear down pre-existing port->queue links */
1532 	if (!ret && dlb->run_state == DLB_RUN_STATE_STOPPED)
1533 		dlb_queue_link_teardown(dlb, ev_queue);
1534 
1535 	if (!ret)
1536 		ev_queue->setup_done = true;
1537 
1538 	return ret;
1539 }
1540 
1541 static void
1542 dlb_port_link_teardown(struct dlb_eventdev *dlb,
1543 		       struct dlb_eventdev_port *ev_port)
1544 {
1545 	struct dlb_eventdev_queue *ev_queue;
1546 	int i;
1547 
1548 	for (i = 0; i < DLB_MAX_NUM_QIDS_PER_LDB_CQ; i++) {
1549 		if (!ev_port->link[i].valid)
1550 			continue;
1551 
1552 		ev_queue = &dlb->ev_queues[ev_port->link[i].queue_id];
1553 
1554 		ev_port->link[i].valid = false;
1555 		ev_port->num_links--;
1556 		ev_queue->num_links--;
1557 	}
1558 }
1559 
1560 static int
1561 dlb_eventdev_port_setup(struct rte_eventdev *dev,
1562 			uint8_t ev_port_id,
1563 			const struct rte_event_port_conf *port_conf)
1564 {
1565 	struct dlb_eventdev *dlb;
1566 	struct dlb_eventdev_port *ev_port;
1567 	bool use_rsvd_token_scheme;
1568 	uint32_t adj_cq_depth;
1569 	uint16_t rsvd_tokens;
1570 	int ret;
1571 
1572 	if (dev == NULL || port_conf == NULL) {
1573 		DLB_LOG_ERR("Null parameter\n");
1574 		return -EINVAL;
1575 	}
1576 
1577 	dlb = dlb_pmd_priv(dev);
1578 
1579 	if (ev_port_id >= DLB_MAX_NUM_PORTS)
1580 		return -EINVAL;
1581 
1582 	if (port_conf->dequeue_depth >
1583 		evdev_dlb_default_info.max_event_port_dequeue_depth ||
1584 	    port_conf->enqueue_depth >
1585 		evdev_dlb_default_info.max_event_port_enqueue_depth)
1586 		return -EINVAL;
1587 
1588 	ev_port = &dlb->ev_ports[ev_port_id];
1589 	/* configured? */
1590 	if (ev_port->setup_done) {
1591 		DLB_LOG_ERR("evport %d is already configured\n", ev_port_id);
1592 		return -EINVAL;
1593 	}
1594 
1595 	/* The reserved token interrupt arming scheme requires that one or more
1596 	 * CQ tokens be reserved by the PMD. This limits the amount of CQ space
1597 	 * usable by the DLB, so in order to give an *effective* CQ depth equal
1598 	 * to the user-requested value, we double CQ depth and reserve half of
1599 	 * its tokens. If the user requests the max CQ depth (256) then we
1600 	 * cannot double it, so we reserve one token and give an effective
1601 	 * depth of 255 entries.
1602 	 */
1603 	use_rsvd_token_scheme = true;
1604 	rsvd_tokens = 1;
1605 	adj_cq_depth = port_conf->dequeue_depth;
1606 
1607 	if (use_rsvd_token_scheme && adj_cq_depth < 256) {
1608 		rsvd_tokens = adj_cq_depth;
1609 		adj_cq_depth *= 2;
1610 	}
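	/* Worked example of the scheme described above: a requested
	 * dequeue_depth of 32 gives rsvd_tokens = 32 and adj_cq_depth = 64,
	 * i.e. an effective CQ depth of 32; a requested depth of 256 keeps
	 * rsvd_tokens = 1 for an effective depth of 255.
	 */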
1611 
1612 	ev_port->qm_port.is_directed = port_conf->event_port_cfg &
1613 		RTE_EVENT_PORT_CFG_SINGLE_LINK;
1614 
1615 	if (!ev_port->qm_port.is_directed) {
1616 		ret = dlb_hw_create_ldb_port(dlb,
1617 					     ev_port,
1618 					     port_conf->dequeue_depth,
1619 					     adj_cq_depth,
1620 					     port_conf->enqueue_depth,
1621 					     rsvd_tokens,
1622 					     use_rsvd_token_scheme);
1623 		if (ret < 0) {
1624 			DLB_LOG_ERR("Failed to create the LB port, ev_port_id=%d\n",
1625 				    ev_port_id);
1626 			return ret;
1627 		}
1628 	} else {
1629 		ret = dlb_hw_create_dir_port(dlb,
1630 					     ev_port,
1631 					     port_conf->dequeue_depth,
1632 					     adj_cq_depth,
1633 					     port_conf->enqueue_depth,
1634 					     rsvd_tokens,
1635 					     use_rsvd_token_scheme);
1636 		if (ret < 0) {
1637 			DLB_LOG_ERR("Failed to create the DIR port\n");
1638 			return ret;
1639 		}
1640 	}
1641 
1642 	/* Save off port config for reconfig */
1643 	dlb->ev_ports[ev_port_id].conf = *port_conf;
1644 
1645 	dlb->ev_ports[ev_port_id].id = ev_port_id;
1646 	dlb->ev_ports[ev_port_id].enq_configured = true;
1647 	dlb->ev_ports[ev_port_id].setup_done = true;
1648 	dlb->ev_ports[ev_port_id].inflight_max =
1649 		port_conf->new_event_threshold;
1650 	dlb->ev_ports[ev_port_id].implicit_release =
1651 		!(port_conf->event_port_cfg &
1652 		  RTE_EVENT_PORT_CFG_DISABLE_IMPL_REL);
1653 	dlb->ev_ports[ev_port_id].outstanding_releases = 0;
1654 	dlb->ev_ports[ev_port_id].inflight_credits = 0;
1655 	dlb->ev_ports[ev_port_id].credit_update_quanta =
1656 		RTE_LIBRTE_PMD_DLB_SW_CREDIT_QUANTA;
1657 	dlb->ev_ports[ev_port_id].dlb = dlb; /* reverse link */
1658 
1659 	/* Tear down pre-existing port->queue links */
1660 	if (dlb->run_state == DLB_RUN_STATE_STOPPED)
1661 		dlb_port_link_teardown(dlb, &dlb->ev_ports[ev_port_id]);
1662 
1663 	dev->data->ports[ev_port_id] = &dlb->ev_ports[ev_port_id];
1664 
1665 	return 0;
1666 }
1667 
1668 static int
1669 dlb_eventdev_reapply_configuration(struct rte_eventdev *dev)
1670 {
1671 	struct dlb_eventdev *dlb = dlb_pmd_priv(dev);
1672 	int ret, i;
1673 
1674 	/* If an event queue or port was previously configured, but hasn't been
1675 	 * reconfigured, reapply its original configuration.
1676 	 */
1677 	for (i = 0; i < dlb->num_queues; i++) {
1678 		struct dlb_eventdev_queue *ev_queue;
1679 
1680 		ev_queue = &dlb->ev_queues[i];
1681 
1682 		if (ev_queue->qm_queue.config_state != DLB_PREV_CONFIGURED)
1683 			continue;
1684 
1685 		ret = dlb_eventdev_queue_setup(dev, i, &ev_queue->conf);
1686 		if (ret < 0) {
1687 			DLB_LOG_ERR("dlb: failed to reconfigure queue %d", i);
1688 			return ret;
1689 		}
1690 	}
1691 
1692 	for (i = 0; i < dlb->num_ports; i++) {
1693 		struct dlb_eventdev_port *ev_port = &dlb->ev_ports[i];
1694 
1695 		if (ev_port->qm_port.config_state != DLB_PREV_CONFIGURED)
1696 			continue;
1697 
1698 		ret = dlb_eventdev_port_setup(dev, i, &ev_port->conf);
1699 		if (ret < 0) {
1700 			DLB_LOG_ERR("dlb: failed to reconfigure ev_port %d",
1701 				    i);
1702 			return ret;
1703 		}
1704 	}
1705 
1706 	return 0;
1707 }
1708 
1709 static int
1710 set_dev_id(const char *key __rte_unused,
1711 	   const char *value,
1712 	   void *opaque)
1713 {
1714 	int *dev_id = opaque;
1715 	int ret;
1716 
1717 	if (value == NULL || opaque == NULL) {
1718 		DLB_LOG_ERR("NULL pointer\n");
1719 		return -EINVAL;
1720 	}
1721 
1722 	ret = dlb_string_to_int(dev_id, value);
1723 	if (ret < 0)
1724 		return ret;
1725 
1726 	return 0;
1727 }
1728 
1729 static int
1730 set_defer_sched(const char *key __rte_unused,
1731 		const char *value,
1732 		void *opaque)
1733 {
1734 	int *defer_sched = opaque;
1735 
1736 	if (value == NULL || opaque == NULL) {
1737 		DLB_LOG_ERR("NULL pointer\n");
1738 		return -EINVAL;
1739 	}
1740 
1741 	if (strncmp(value, "on", 2) != 0) {
1742 		DLB_LOG_ERR("Invalid defer_sched argument \"%s\" (expected \"on\")\n",
1743 			    value);
1744 		return -EINVAL;
1745 	}
1746 
1747 	*defer_sched = 1;
1748 
1749 	return 0;
1750 }
1751 
1752 static int
1753 set_num_atm_inflights(const char *key __rte_unused,
1754 		      const char *value,
1755 		      void *opaque)
1756 {
1757 	int *num_atm_inflights = opaque;
1758 	int ret;
1759 
1760 	if (value == NULL || opaque == NULL) {
1761 		DLB_LOG_ERR("NULL pointer\n");
1762 		return -EINVAL;
1763 	}
1764 
1765 	ret = dlb_string_to_int(num_atm_inflights, value);
1766 	if (ret < 0)
1767 		return ret;
1768 
1769 	if (*num_atm_inflights < 0 ||
1770 	    *num_atm_inflights > DLB_MAX_NUM_ATM_INFLIGHTS) {
1771 		DLB_LOG_ERR("dlb: atm_inflights must be between 0 and %d\n",
1772 			    DLB_MAX_NUM_ATM_INFLIGHTS);
1773 		return -EINVAL;
1774 	}
1775 
1776 	return 0;
1777 }
1778 
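/*
 * Sketch (not part of the driver) of how devargs handlers such as
 * set_dev_id(), set_defer_sched() and set_num_atm_inflights() above are
 * typically wired up with rte_kvargs during probe. The key string and the
 * destination variable below are placeholders, not the PMD's actual names:
 *
 *	struct rte_kvargs *kvlist = rte_kvargs_parse(params, valid_keys);
 *
 *	if (kvlist != NULL) {
 *		rte_kvargs_process(kvlist, "atm_inflights",
 *				   set_num_atm_inflights,
 *				   &num_atm_inflights);
 *		rte_kvargs_free(kvlist);
 *	}
 */
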
1779 static int
1780 dlb_validate_port_link(struct dlb_eventdev_port *ev_port,
1781 		       uint8_t queue_id,
1782 		       bool link_exists,
1783 		       int index)
1784 {
1785 	struct dlb_eventdev *dlb = ev_port->dlb;
1786 	struct dlb_eventdev_queue *ev_queue;
1787 	bool port_is_dir, queue_is_dir;
1788 
1789 	if (queue_id > dlb->num_queues) {
1790 		DLB_LOG_ERR("queue_id %d > num queues %d\n",
1791 			    queue_id, dlb->num_queues);
1792 		rte_errno = -EINVAL;
1793 		return -1;
1794 	}
1795 
1796 	ev_queue = &dlb->ev_queues[queue_id];
1797 
1798 	if (!ev_queue->setup_done &&
1799 	    ev_queue->qm_queue.config_state != DLB_PREV_CONFIGURED) {
1800 		DLB_LOG_ERR("setup not done and not previously configured\n");
1801 		rte_errno = -EINVAL;
1802 		return -1;
1803 	}
1804 
1805 	port_is_dir = ev_port->qm_port.is_directed;
1806 	queue_is_dir = ev_queue->qm_queue.is_directed;
1807 
1808 	if (port_is_dir != queue_is_dir) {
1809 		DLB_LOG_ERR("%s queue %u can't link to %s port %u\n",
1810 			    queue_is_dir ? "DIR" : "LDB", ev_queue->id,
1811 			    port_is_dir ? "DIR" : "LDB", ev_port->id);
1812 
1813 		rte_errno = -EINVAL;
1814 		return -1;
1815 	}
1816 
1817 	/* Check if there is space for the requested link */
1818 	if (!link_exists && index == -1) {
1819 		DLB_LOG_ERR("no space for new link\n");
1820 		rte_errno = -ENOSPC;
1821 		return -1;
1822 	}
1823 
1824 	/* Check if the directed port is already linked */
1825 	if (ev_port->qm_port.is_directed && ev_port->num_links > 0 &&
1826 	    !link_exists) {
1827 		DLB_LOG_ERR("Can't link DIR port %d to >1 queues\n",
1828 			    ev_port->id);
1829 		rte_errno = -EINVAL;
1830 		return -1;
1831 	}
1832 
1833 	/* Check if the directed queue is already linked */
1834 	if (ev_queue->qm_queue.is_directed && ev_queue->num_links > 0 &&
1835 	    !link_exists) {
1836 		DLB_LOG_ERR("Can't link DIR queue %d to >1 ports\n",
1837 			    ev_queue->id);
1838 		rte_errno = -EINVAL;
1839 		return -1;
1840 	}
1841 
1842 	return 0;
1843 }
1844 
1845 static int32_t
1846 dlb_hw_create_dir_queue(struct dlb_eventdev *dlb, int32_t qm_port_id)
1847 {
1848 	struct dlb_hw_dev *handle = &dlb->qm_instance;
1849 	struct dlb_create_dir_queue_args cfg;
1850 	struct dlb_cmd_response response;
1851 	int32_t ret;
1852 
1853 	cfg.response = (uintptr_t)&response;
1854 
1855 	/* The directed port is always configured before its queue */
1856 	cfg.port_id = qm_port_id;
1857 
1858 	ret = dlb_iface_dir_queue_create(handle, &cfg);
1859 	if (ret < 0) {
1860 		DLB_LOG_ERR("dlb: create DIR event queue error, ret=%d (driver status: %s)\n",
1861 			    ret, dlb_error_strings[response.status]);
1862 		return -EINVAL;
1863 	}
1864 
1865 	return response.id;
1866 }
1867 
1868 static int
1869 dlb_eventdev_dir_queue_setup(struct dlb_eventdev *dlb,
1870 			     struct dlb_eventdev_queue *ev_queue,
1871 			     struct dlb_eventdev_port *ev_port)
1872 {
1873 	int32_t qm_qid;
1874 
1875 	qm_qid = dlb_hw_create_dir_queue(dlb, ev_port->qm_port.id);
1876 
1877 	if (qm_qid < 0) {
1878 		DLB_LOG_ERR("Failed to create the DIR queue\n");
1879 		return qm_qid;
1880 	}
1881 
1882 	dlb->qm_dir_to_ev_queue_id[qm_qid] = ev_queue->id;
1883 
1884 	ev_queue->qm_queue.id = qm_qid;
1885 
1886 	return 0;
1887 }
1888 
1889 static int16_t
1890 dlb_hw_map_ldb_qid_to_port(struct dlb_hw_dev *handle,
1891 			   uint32_t qm_port_id,
1892 			   uint16_t qm_qid,
1893 			   uint8_t priority)
1894 {
1895 	struct dlb_map_qid_args cfg;
1896 	struct dlb_cmd_response response;
1897 	int32_t ret;
1898 
1899 	if (handle == NULL)
1900 		return -EINVAL;
1901 
1902 	/* Build message */
1903 	cfg.response = (uintptr_t)&response;
1904 	cfg.port_id = qm_port_id;
1905 	cfg.qid = qm_qid;
1906 	cfg.priority = EV_TO_DLB_PRIO(priority);
1907 
1908 	ret = dlb_iface_map_qid(handle, &cfg);
1909 	if (ret < 0) {
1910 		DLB_LOG_ERR("dlb: map qid error, ret=%d (driver status: %s)\n",
1911 			    ret, dlb_error_strings[response.status]);
1912 		DLB_LOG_ERR("dlb: device_id=%d grp=%d, qm_port=%d, qm_qid=%d prio=%d\n",
1913 			    handle->device_id,
1914 			    handle->domain_id, cfg.port_id,
1915 			    cfg.qid,
1916 			    cfg.priority);
1917 	} else {
1918 		DLB_LOG_DBG("dlb: mapped queue %d to qm_port %d\n",
1919 			    qm_qid, qm_port_id);
1920 	}
1921 
1922 	return ret;
1923 }
1924 
1925 static int
1926 dlb_event_queue_join_ldb(struct dlb_eventdev *dlb,
1927 			 struct dlb_eventdev_port *ev_port,
1928 			 struct dlb_eventdev_queue *ev_queue,
1929 			 uint8_t priority)
1930 {
1931 	int first_avail = -1;
1932 	int ret, i;
1933 
1934 	for (i = 0; i < DLB_MAX_NUM_QIDS_PER_LDB_CQ; i++) {
1935 		if (ev_port->link[i].valid) {
1936 			if (ev_port->link[i].queue_id == ev_queue->id &&
1937 			    ev_port->link[i].priority == priority) {
1938 				if (ev_port->link[i].mapped)
1939 					return 0; /* already mapped */
1940 				first_avail = i;
1941 			}
1942 		} else {
1943 			if (first_avail == -1)
1944 				first_avail = i;
1945 		}
1946 	}
1947 	if (first_avail == -1) {
1948 		DLB_LOG_ERR("dlb: qm_port %d has no available QID slots.\n",
1949 			    ev_port->qm_port.id);
1950 		return -EINVAL;
1951 	}
1952 
1953 	ret = dlb_hw_map_ldb_qid_to_port(&dlb->qm_instance,
1954 					 ev_port->qm_port.id,
1955 					 ev_queue->qm_queue.id,
1956 					 priority);
1957 
1958 	if (!ret)
1959 		ev_port->link[first_avail].mapped = true;
1960 
1961 	return ret;
1962 }
1963 
1964 static int
1965 dlb_do_port_link(struct rte_eventdev *dev,
1966 		 struct dlb_eventdev_queue *ev_queue,
1967 		 struct dlb_eventdev_port *ev_port,
1968 		 uint8_t prio)
1969 {
1970 	struct dlb_eventdev *dlb = dlb_pmd_priv(dev);
1971 	int err;
1972 
1973 	/* Don't link until start time. */
1974 	if (dlb->run_state == DLB_RUN_STATE_STOPPED)
1975 		return 0;
1976 
1977 	if (ev_queue->qm_queue.is_directed)
1978 		err = dlb_eventdev_dir_queue_setup(dlb, ev_queue, ev_port);
1979 	else
1980 		err = dlb_event_queue_join_ldb(dlb, ev_port, ev_queue, prio);
1981 
1982 	if (err) {
1983 		DLB_LOG_ERR("port link failure for %s ev_q %d, ev_port %d\n",
1984 			    ev_queue->qm_queue.is_directed ? "DIR" : "LDB",
1985 			    ev_queue->id, ev_port->id);
1986 
1987 		rte_errno = err;
1988 		return -1;
1989 	}
1990 
1991 	return 0;
1992 }
1993 
1994 static int
1995 dlb_eventdev_apply_port_links(struct rte_eventdev *dev)
1996 {
1997 	struct dlb_eventdev *dlb = dlb_pmd_priv(dev);
1998 	int i;
1999 
2000 	/* Perform requested port->queue links */
2001 	for (i = 0; i < dlb->num_ports; i++) {
2002 		struct dlb_eventdev_port *ev_port = &dlb->ev_ports[i];
2003 		int j;
2004 
2005 		for (j = 0; j < DLB_MAX_NUM_QIDS_PER_LDB_CQ; j++) {
2006 			struct dlb_eventdev_queue *ev_queue;
2007 			uint8_t prio, queue_id;
2008 
2009 			if (!ev_port->link[j].valid)
2010 				continue;
2011 
2012 			prio = ev_port->link[j].priority;
2013 			queue_id = ev_port->link[j].queue_id;
2014 
2015 			if (dlb_validate_port_link(ev_port, queue_id, true, j))
2016 				return -EINVAL;
2017 
2018 			ev_queue = &dlb->ev_queues[queue_id];
2019 
2020 			if (dlb_do_port_link(dev, ev_queue, ev_port, prio))
2021 				return -EINVAL;
2022 		}
2023 	}
2024 
2025 	return 0;
2026 }
2027 
2028 static int
2029 dlb_eventdev_port_link(struct rte_eventdev *dev, void *event_port,
2030 		       const uint8_t queues[], const uint8_t priorities[],
2031 		       uint16_t nb_links)
2032 
2033 {
2034 	struct dlb_eventdev_port *ev_port = event_port;
2035 	struct dlb_eventdev *dlb;
2036 	int i, j;
2037 
2038 	RTE_SET_USED(dev);
2039 
2040 	if (ev_port == NULL) {
2041 		DLB_LOG_ERR("dlb: evport not setup\n");
2042 		rte_errno = -EINVAL;
2043 		return 0;
2044 	}
2045 
2046 	if (!ev_port->setup_done &&
2047 	    ev_port->qm_port.config_state != DLB_PREV_CONFIGURED) {
2048 		DLB_LOG_ERR("dlb: evport not setup\n");
2049 		rte_errno = -EINVAL;
2050 		return 0;
2051 	}
2052 
2053 	/* Note: rte_event_port_link() ensures the PMD won't receive a NULL
2054 	 * queues pointer.
2055 	 */
2056 	if (nb_links == 0) {
2057 		DLB_LOG_DBG("dlb: nb_links is 0\n");
2058 		return 0; /* Ignore and return success */
2059 	}
2060 
2061 	dlb = ev_port->dlb;
2062 
2063 	DLB_LOG_DBG("Linking %u queues to %s port %d\n",
2064 		    nb_links,
2065 		    ev_port->qm_port.is_directed ? "DIR" : "LDB",
2066 		    ev_port->id);
2067 
2068 	for (i = 0; i < nb_links; i++) {
2069 		struct dlb_eventdev_queue *ev_queue;
2070 		uint8_t queue_id, prio;
2071 		bool found = false;
2072 		int index = -1;
2073 
2074 		queue_id = queues[i];
2075 		prio = priorities[i];
2076 
2077 		/* Check if the link already exists. */
2078 		for (j = 0; j < DLB_MAX_NUM_QIDS_PER_LDB_CQ; j++)
2079 			if (ev_port->link[j].valid) {
2080 				if (ev_port->link[j].queue_id == queue_id) {
2081 					found = true;
2082 					index = j;
2083 					break;
2084 				}
2085 			} else {
2086 				if (index == -1)
2087 					index = j;
2088 			}
2089 
2090 		/* No existing link and no free slot, so the queue can't be linked */
2091 		if (index == -1)
2092 			break;
2093 
2094 		/* Check if already linked at the requested priority */
2095 		if (found && ev_port->link[j].priority == prio)
2096 			continue;
2097 
2098 		if (dlb_validate_port_link(ev_port, queue_id, found, index))
2099 			break; /* return index of offending queue */
2100 
2101 		ev_queue = &dlb->ev_queues[queue_id];
2102 
2103 		if (dlb_do_port_link(dev, ev_queue, ev_port, prio))
2104 			break; /* return index of offending queue */
2105 
2106 		ev_queue->num_links++;
2107 
2108 		ev_port->link[index].queue_id = queue_id;
2109 		ev_port->link[index].priority = prio;
2110 		ev_port->link[index].valid = true;
2111 		/* If the link already existed, this was only a priority change */
2112 		if (!found)
2113 			ev_port->num_links++;
2114 	}
2115 	return i;
2116 }
2117 
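/*
 * Application-side sketch (not part of the driver): the link logic above
 * is driven through rte_event_port_link(). A minimal sketch assuming
 * dev_id 0, port 0, and two previously configured queues; a short return
 * value means queues[ret] could not be linked and rte_errno is set:
 *
 *	uint8_t queues[] = {0, 1};
 *	uint8_t prios[] = {RTE_EVENT_DEV_PRIORITY_NORMAL,
 *			   RTE_EVENT_DEV_PRIORITY_NORMAL};
 *	int nb_linked = rte_event_port_link(0, 0, queues, prios, 2);
 */
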
2118 static int
2119 dlb_eventdev_start(struct rte_eventdev *dev)
2120 {
2121 	struct dlb_eventdev *dlb = dlb_pmd_priv(dev);
2122 	struct dlb_hw_dev *handle = &dlb->qm_instance;
2123 	struct dlb_start_domain_args cfg;
2124 	struct dlb_cmd_response response;
2125 	int ret, i;
2126 
2127 	rte_spinlock_lock(&dlb->qm_instance.resource_lock);
2128 	if (dlb->run_state != DLB_RUN_STATE_STOPPED) {
2129 		DLB_LOG_ERR("bad state %d for dev_start\n",
2130 			    (int)dlb->run_state);
2131 		rte_spinlock_unlock(&dlb->qm_instance.resource_lock);
2132 		return -EINVAL;
2133 	}
2134 	dlb->run_state = DLB_RUN_STATE_STARTING;
2135 	rte_spinlock_unlock(&dlb->qm_instance.resource_lock);
2136 
2137 	/* If the device was configured more than once, some event ports and/or
2138 	 * queues may need to be reconfigured.
2139 	 */
2140 	ret = dlb_eventdev_reapply_configuration(dev);
2141 	if (ret)
2142 		return ret;
2143 
2144 	/* The DLB PMD delays port links until the device is started. */
2145 	ret = dlb_eventdev_apply_port_links(dev);
2146 	if (ret)
2147 		return ret;
2148 
2149 	cfg.response = (uintptr_t)&response;
2150 
2151 	for (i = 0; i < dlb->num_ports; i++) {
2152 		if (!dlb->ev_ports[i].setup_done) {
2153 			DLB_LOG_ERR("dlb: port %d not setup", i);
2154 			return -ESTALE;
2155 		}
2156 	}
2157 
2158 	for (i = 0; i < dlb->num_queues; i++) {
2159 		if (dlb->ev_queues[i].num_links == 0) {
2160 			DLB_LOG_ERR("dlb: queue %d is not linked", i);
2161 			return -ENOLINK;
2162 		}
2163 	}
2164 
2165 	ret = dlb_iface_sched_domain_start(handle, &cfg);
2166 	if (ret < 0) {
2167 		DLB_LOG_ERR("dlb: sched_domain_start ret=%d (driver status: %s)\n",
2168 			    ret, dlb_error_strings[response.status]);
2169 		return ret;
2170 	}
2171 
2172 	dlb->run_state = DLB_RUN_STATE_STARTED;
2173 	DLB_LOG_DBG("dlb: sched_domain_start completed OK\n");
2174 
2175 	return 0;
2176 }
2177 
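/*
 * Application-side sketch (not part of the driver): dlb_eventdev_start()
 * above requires every port to be set up and every queue to be linked, so
 * the expected call order, assuming dev_id 0 and omitting error handling,
 * is roughly:
 *
 *	rte_event_dev_configure(0, &dev_conf);
 *	rte_event_queue_setup(0, q, &qconf);        for each queue
 *	rte_event_port_setup(0, p, &pconf);         for each port
 *	rte_event_port_link(0, p, NULL, NULL, 0);   or explicit links
 *	rte_event_dev_start(0);
 */
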
2178 static inline int
2179 dlb_check_enqueue_sw_credits(struct dlb_eventdev *dlb,
2180 			     struct dlb_eventdev_port *ev_port)
2181 {
2182 	uint32_t sw_inflights = __atomic_load_n(&dlb->inflights,
2183 						__ATOMIC_SEQ_CST);
2184 	const int num = 1;
2185 
2186 	if (unlikely(ev_port->inflight_max < sw_inflights)) {
2187 		DLB_INC_STAT(ev_port->stats.traffic.tx_nospc_inflight_max, 1);
2188 		rte_errno = -ENOSPC;
2189 		return 1;
2190 	}
2191 
2192 	if (ev_port->inflight_credits < num) {
2193 		/* check if event enqueue brings ev_port over max threshold */
2194 		uint32_t credit_update_quanta = ev_port->credit_update_quanta;
2195 
2196 		if (sw_inflights + credit_update_quanta >
2197 		    dlb->new_event_limit) {
2198 			DLB_INC_STAT(
2199 				ev_port->stats.traffic.tx_nospc_new_event_limit,
2200 				1);
2201 			rte_errno = -ENOSPC;
2202 			return 1;
2203 		}
2204 
2205 		__atomic_fetch_add(&dlb->inflights, credit_update_quanta,
2206 				   __ATOMIC_SEQ_CST);
2207 		ev_port->inflight_credits += (credit_update_quanta);
2208 
2209 		if (ev_port->inflight_credits < num) {
2210 			DLB_INC_STAT(
2211 			    ev_port->stats.traffic.tx_nospc_inflight_credits,
2212 			    1);
2213 			rte_errno = -ENOSPC;
2214 			return 1;
2215 		}
2216 	}
2217 
2218 	return 0;
2219 }
2220 
2221 static inline void
2222 dlb_replenish_sw_credits(struct dlb_eventdev *dlb,
2223 			 struct dlb_eventdev_port *ev_port)
2224 {
2225 	uint16_t quanta = ev_port->credit_update_quanta;
2226 
2227 	if (ev_port->inflight_credits >= quanta * 2) {
2228 		/* Return credits to the pool, keeping one quantum for enqueues */
2229 		uint16_t val = ev_port->inflight_credits - quanta;
2230 
2231 		__atomic_fetch_sub(&dlb->inflights, val, __ATOMIC_SEQ_CST);
2232 		ev_port->inflight_credits -= val;
2233 	}
2234 }
2235 
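/*
 * Worked example of the software-credit scheme above, assuming a
 * credit_update_quanta of 32: when a port's local inflight_credits hits
 * zero, dlb_check_enqueue_sw_credits() claims 32 credits at once from the
 * device-wide dlb->inflights counter (subject to new_event_limit), and
 * dlb_replenish_sw_credits() only returns credits once the port holds at
 * least 64, always keeping one quantum of 32 cached locally. The batching
 * keeps atomic traffic on dlb->inflights to roughly one update per quantum
 * of events rather than one per event.
 */
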
2236 static __rte_always_inline uint16_t
2237 dlb_read_pc(struct process_local_port_data *port_data, bool ldb)
2238 {
2239 	volatile uint16_t *popcount;
2240 
2241 	if (ldb)
2242 		popcount = port_data->ldb_popcount;
2243 	else
2244 		popcount = port_data->dir_popcount;
2245 
2246 	return *popcount;
2247 }
2248 
2249 static inline int
2250 dlb_check_enqueue_hw_ldb_credits(struct dlb_port *qm_port,
2251 				 struct process_local_port_data *port_data)
2252 {
2253 	if (unlikely(qm_port->cached_ldb_credits == 0)) {
2254 		uint16_t pc;
2255 
2256 		pc = dlb_read_pc(port_data, true);
2257 
2258 		qm_port->cached_ldb_credits = pc -
2259 			qm_port->ldb_pushcount_at_credit_expiry;
2260 		if (unlikely(qm_port->cached_ldb_credits == 0)) {
2261 			DLB_INC_STAT(
2262 			qm_port->ev_port->stats.traffic.tx_nospc_ldb_hw_credits,
2263 			1);
2264 
2265 			DLB_LOG_DBG("ldb credits exhausted\n");
2266 			return 1;
2267 		}
2268 		qm_port->ldb_pushcount_at_credit_expiry +=
2269 			qm_port->cached_ldb_credits;
2270 	}
2271 
2272 	return 0;
2273 }
2274 
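/*
 * Worked example of the credit arithmetic above (the directed variant
 * below is identical): the hardware popcount is a free-running 16-bit
 * counter, so the subtraction is effectively done modulo 2^16. If the
 * pushcount recorded at the last credit expiry was 65530 and dlb_read_pc()
 * now returns 4, then 4 - 65530 wraps to 10 when stored in the 16-bit
 * cached credit field, i.e. 10 new credits despite the counter rollover.
 */
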
2275 static inline int
2276 dlb_check_enqueue_hw_dir_credits(struct dlb_port *qm_port,
2277 				 struct process_local_port_data *port_data)
2278 {
2279 	if (unlikely(qm_port->cached_dir_credits == 0)) {
2280 		uint16_t pc;
2281 
2282 		pc = dlb_read_pc(port_data, false);
2283 
2284 		qm_port->cached_dir_credits = pc -
2285 			qm_port->dir_pushcount_at_credit_expiry;
2286 
2287 		if (unlikely(qm_port->cached_dir_credits == 0)) {
2288 			DLB_INC_STAT(
2289 			qm_port->ev_port->stats.traffic.tx_nospc_dir_hw_credits,
2290 			1);
2291 
2292 			DLB_LOG_DBG("dir credits exhausted\n");
2293 			return 1;
2294 		}
2295 		qm_port->dir_pushcount_at_credit_expiry +=
2296 			qm_port->cached_dir_credits;
2297 	}
2298 
2299 	return 0;
2300 }
2301 
2302 static inline int
2303 dlb_event_enqueue_prep(struct dlb_eventdev_port *ev_port,
2304 		       struct dlb_port *qm_port,
2305 		       const struct rte_event ev[],
2306 		       struct process_local_port_data *port_data,
2307 		       uint8_t *sched_type,
2308 		       uint8_t *queue_id)
2309 {
2310 	struct dlb_eventdev *dlb = ev_port->dlb;
2311 	struct dlb_eventdev_queue *ev_queue;
2312 	uint16_t *cached_credits = NULL;
2313 	struct dlb_queue *qm_queue;
2314 
2315 	ev_queue = &dlb->ev_queues[ev->queue_id];
2316 	qm_queue = &ev_queue->qm_queue;
2317 	*queue_id = qm_queue->id;
2318 
2319 	/* Ignore sched_type and hardware credits on release events */
2320 	if (ev->op == RTE_EVENT_OP_RELEASE)
2321 		goto op_check;
2322 
2323 	if (!qm_queue->is_directed) {
2324 		/* Load balanced destination queue */
2325 
2326 		if (dlb_check_enqueue_hw_ldb_credits(qm_port, port_data)) {
2327 			rte_errno = -ENOSPC;
2328 			return 1;
2329 		}
2330 		cached_credits = &qm_port->cached_ldb_credits;
2331 
2332 		switch (ev->sched_type) {
2333 		case RTE_SCHED_TYPE_ORDERED:
2334 			DLB_LOG_DBG("dlb: put_qe: RTE_SCHED_TYPE_ORDERED\n");
2335 			if (qm_queue->sched_type != RTE_SCHED_TYPE_ORDERED) {
2336 				DLB_LOG_ERR("dlb: tried to send ordered event to unordered queue %d\n",
2337 					    *queue_id);
2338 				rte_errno = -EINVAL;
2339 				return 1;
2340 			}
2341 			*sched_type = DLB_SCHED_ORDERED;
2342 			break;
2343 		case RTE_SCHED_TYPE_ATOMIC:
2344 			DLB_LOG_DBG("dlb: put_qe: RTE_SCHED_TYPE_ATOMIC\n");
2345 			*sched_type = DLB_SCHED_ATOMIC;
2346 			break;
2347 		case RTE_SCHED_TYPE_PARALLEL:
2348 			DLB_LOG_DBG("dlb: put_qe: RTE_SCHED_TYPE_PARALLEL\n");
2349 			if (qm_queue->sched_type == RTE_SCHED_TYPE_ORDERED)
2350 				*sched_type = DLB_SCHED_ORDERED;
2351 			else
2352 				*sched_type = DLB_SCHED_UNORDERED;
2353 			break;
2354 		default:
2355 			DLB_LOG_ERR("Unsupported LDB sched type in put_qe\n");
2356 			DLB_INC_STAT(ev_port->stats.tx_invalid, 1);
2357 			rte_errno = -EINVAL;
2358 			return 1;
2359 		}
2360 	} else {
2361 		/* Directed destination queue */
2362 
2363 		if (dlb_check_enqueue_hw_dir_credits(qm_port, port_data)) {
2364 			rte_errno = -ENOSPC;
2365 			return 1;
2366 		}
2367 		cached_credits = &qm_port->cached_dir_credits;
2368 
2369 		DLB_LOG_DBG("dlb: put_qe: RTE_SCHED_TYPE_DIRECTED\n");
2370 
2371 		*sched_type = DLB_SCHED_DIRECTED;
2372 	}
2373 
2374 op_check:
2375 	switch (ev->op) {
2376 	case RTE_EVENT_OP_NEW:
2377 		/* Check that a sw credit is available */
2378 		if (dlb_check_enqueue_sw_credits(dlb, ev_port)) {
2379 			rte_errno = -ENOSPC;
2380 			return 1;
2381 		}
2382 		ev_port->inflight_credits--;
2383 		(*cached_credits)--;
2384 		break;
2385 	case RTE_EVENT_OP_FORWARD:
2386 		/* Check for outstanding_releases underflow. If this occurs,
2387 		 * the application is not using the EVENT_OPs correctly; for
2388 		 * example, forwarding or releasing events that were not
2389 		 * dequeued.
2390 		 */
2391 		RTE_ASSERT(ev_port->outstanding_releases > 0);
2392 		ev_port->outstanding_releases--;
2393 		qm_port->issued_releases++;
2394 		(*cached_credits)--;
2395 		break;
2396 	case RTE_EVENT_OP_RELEASE:
2397 		ev_port->inflight_credits++;
2398 		/* Check for outstanding_releases underflow. If this occurs,
2399 		 * the application is not using the EVENT_OPs correctly; for
2400 		 * example, forwarding or releasing events that were not
2401 		 * dequeued.
2402 		 */
2403 		RTE_ASSERT(ev_port->outstanding_releases > 0);
2404 		ev_port->outstanding_releases--;
2405 		qm_port->issued_releases++;
2406 		/* Replenish s/w credits if enough are cached */
2407 		dlb_replenish_sw_credits(dlb, ev_port);
2408 		break;
2409 	}
2410 
2411 	DLB_INC_STAT(ev_port->stats.tx_op_cnt[ev->op], 1);
2412 	DLB_INC_STAT(ev_port->stats.traffic.tx_ok, 1);
2413 
2414 #ifndef RTE_LIBRTE_PMD_DLB_QUELL_STATS
2415 	if (ev->op != RTE_EVENT_OP_RELEASE) {
2416 		DLB_INC_STAT(ev_port->stats.enq_ok[ev->queue_id], 1);
2417 		DLB_INC_STAT(ev_port->stats.tx_sched_cnt[*sched_type], 1);
2418 	}
2419 #endif
2420 
2421 	return 0;
2422 }
2423 
2424 static uint8_t cmd_byte_map[NUM_DLB_PORT_TYPES][DLB_NUM_HW_SCHED_TYPES] = {
2425 	{
2426 		/* Load-balanced cmd bytes */
2427 		[RTE_EVENT_OP_NEW] = DLB_NEW_CMD_BYTE,
2428 		[RTE_EVENT_OP_FORWARD] = DLB_FWD_CMD_BYTE,
2429 		[RTE_EVENT_OP_RELEASE] = DLB_COMP_CMD_BYTE,
2430 	},
2431 	{
2432 		/* Directed cmd bytes */
2433 		[RTE_EVENT_OP_NEW] = DLB_NEW_CMD_BYTE,
2434 		[RTE_EVENT_OP_FORWARD] = DLB_NEW_CMD_BYTE,
2435 		[RTE_EVENT_OP_RELEASE] = DLB_NOOP_CMD_BYTE,
2436 	},
2437 };
2438 
2439 static inline void
2440 dlb_event_build_hcws(struct dlb_port *qm_port,
2441 		     const struct rte_event ev[],
2442 		     int num,
2443 		     uint8_t *sched_type,
2444 		     uint8_t *queue_id)
2445 {
2446 	struct dlb_enqueue_qe *qe;
2447 	uint16_t sched_word[4];
2448 	__m128i sse_qe[2];
2449 	int i;
2450 
2451 	qe = qm_port->qe4;
2452 
2453 	sse_qe[0] = _mm_setzero_si128();
2454 	sse_qe[1] = _mm_setzero_si128();
2455 
2456 	switch (num) {
2457 	case 4:
2458 		/* Construct the metadata portion of two HCWs in one 128b SSE
2459 		 * register. HCW metadata is constructed in the SSE registers
2460 		 * like so:
2461 		 * sse_qe[0][63:0]:   qe[0]'s metadata
2462 		 * sse_qe[0][127:64]: qe[1]'s metadata
2463 		 * sse_qe[1][63:0]:   qe[2]'s metadata
2464 		 * sse_qe[1][127:64]: qe[3]'s metadata
2465 		 */
2466 
2467 		/* Convert the event operation into a command byte and store it
2468 		 * in the metadata:
2469 		 * sse_qe[0][63:56]   = cmd_byte_map[is_directed][ev[0].op]
2470 		 * sse_qe[0][127:120] = cmd_byte_map[is_directed][ev[1].op]
2471 		 * sse_qe[1][63:56]   = cmd_byte_map[is_directed][ev[2].op]
2472 		 * sse_qe[1][127:120] = cmd_byte_map[is_directed][ev[3].op]
2473 		 */
2474 #define DLB_QE_CMD_BYTE 7
2475 		sse_qe[0] = _mm_insert_epi8(sse_qe[0],
2476 				cmd_byte_map[qm_port->is_directed][ev[0].op],
2477 				DLB_QE_CMD_BYTE);
2478 		sse_qe[0] = _mm_insert_epi8(sse_qe[0],
2479 				cmd_byte_map[qm_port->is_directed][ev[1].op],
2480 				DLB_QE_CMD_BYTE + 8);
2481 		sse_qe[1] = _mm_insert_epi8(sse_qe[1],
2482 				cmd_byte_map[qm_port->is_directed][ev[2].op],
2483 				DLB_QE_CMD_BYTE);
2484 		sse_qe[1] = _mm_insert_epi8(sse_qe[1],
2485 				cmd_byte_map[qm_port->is_directed][ev[3].op],
2486 				DLB_QE_CMD_BYTE + 8);
2487 
2488 		/* Store priority, scheduling type, and queue ID in the sched
2489 		 * word array because these values are re-used when the
2490 		 * destination is a directed queue.
2491 		 */
2492 		sched_word[0] = EV_TO_DLB_PRIO(ev[0].priority) << 10 |
2493 				sched_type[0] << 8 |
2494 				queue_id[0];
2495 		sched_word[1] = EV_TO_DLB_PRIO(ev[1].priority) << 10 |
2496 				sched_type[1] << 8 |
2497 				queue_id[1];
2498 		sched_word[2] = EV_TO_DLB_PRIO(ev[2].priority) << 10 |
2499 				sched_type[2] << 8 |
2500 				queue_id[2];
2501 		sched_word[3] = EV_TO_DLB_PRIO(ev[3].priority) << 10 |
2502 				sched_type[3] << 8 |
2503 				queue_id[3];
2504 
2505 		/* Store the event priority, scheduling type, and queue ID in
2506 		 * the metadata:
2507 		 * sse_qe[0][31:16] = sched_word[0]
2508 		 * sse_qe[0][95:80] = sched_word[1]
2509 		 * sse_qe[1][31:16] = sched_word[2]
2510 		 * sse_qe[1][95:80] = sched_word[3]
2511 		 */
2512 #define DLB_QE_QID_SCHED_WORD 1
2513 		sse_qe[0] = _mm_insert_epi16(sse_qe[0],
2514 					     sched_word[0],
2515 					     DLB_QE_QID_SCHED_WORD);
2516 		sse_qe[0] = _mm_insert_epi16(sse_qe[0],
2517 					     sched_word[1],
2518 					     DLB_QE_QID_SCHED_WORD + 4);
2519 		sse_qe[1] = _mm_insert_epi16(sse_qe[1],
2520 					     sched_word[2],
2521 					     DLB_QE_QID_SCHED_WORD);
2522 		sse_qe[1] = _mm_insert_epi16(sse_qe[1],
2523 					     sched_word[3],
2524 					     DLB_QE_QID_SCHED_WORD + 4);
2525 
2526 		/* If the destination is a load-balanced queue, store the lock
2527 		 * ID. If it is a directed queue, DLB places this field in
2528 		 * bytes 10-11 of the received QE, so we format it accordingly:
2529 		 * sse_qe[0][47:32]  = dir queue ? sched_word[0] : flow_id[0]
2530 		 * sse_qe[0][111:96] = dir queue ? sched_word[1] : flow_id[1]
2531 		 * sse_qe[1][47:32]  = dir queue ? sched_word[2] : flow_id[2]
2532 		 * sse_qe[1][111:96] = dir queue ? sched_word[3] : flow_id[3]
2533 		 */
2534 #define DLB_QE_LOCK_ID_WORD 2
2535 		sse_qe[0] = _mm_insert_epi16(sse_qe[0],
2536 				(sched_type[0] == DLB_SCHED_DIRECTED) ?
2537 					sched_word[0] : ev[0].flow_id,
2538 				DLB_QE_LOCK_ID_WORD);
2539 		sse_qe[0] = _mm_insert_epi16(sse_qe[0],
2540 				(sched_type[1] == DLB_SCHED_DIRECTED) ?
2541 					sched_word[1] : ev[1].flow_id,
2542 				DLB_QE_LOCK_ID_WORD + 4);
2543 		sse_qe[1] = _mm_insert_epi16(sse_qe[1],
2544 				(sched_type[2] == DLB_SCHED_DIRECTED) ?
2545 					sched_word[2] : ev[2].flow_id,
2546 				DLB_QE_LOCK_ID_WORD);
2547 		sse_qe[1] = _mm_insert_epi16(sse_qe[1],
2548 				(sched_type[3] == DLB_SCHED_DIRECTED) ?
2549 					sched_word[3] : ev[3].flow_id,
2550 				DLB_QE_LOCK_ID_WORD + 4);
2551 
2552 		/* Store the event type and sub event type in the metadata:
2553 		 * sse_qe[0][15:0]  = flow_id[0]
2554 		 * sse_qe[0][79:64] = flow_id[1]
2555 		 * sse_qe[1][15:0]  = flow_id[2]
2556 		 * sse_qe[1][79:64] = flow_id[3]
2557 		 */
2558 #define DLB_QE_EV_TYPE_WORD 0
2559 		sse_qe[0] = _mm_insert_epi16(sse_qe[0],
2560 					     ev[0].sub_event_type << 8 |
2561 						ev[0].event_type,
2562 					     DLB_QE_EV_TYPE_WORD);
2563 		sse_qe[0] = _mm_insert_epi16(sse_qe[0],
2564 					     ev[1].sub_event_type << 8 |
2565 						ev[1].event_type,
2566 					     DLB_QE_EV_TYPE_WORD + 4);
2567 		sse_qe[1] = _mm_insert_epi16(sse_qe[1],
2568 					     ev[2].sub_event_type << 8 |
2569 						ev[2].event_type,
2570 					     DLB_QE_EV_TYPE_WORD);
2571 		sse_qe[1] = _mm_insert_epi16(sse_qe[1],
2572 					     ev[3].sub_event_type << 8 |
2573 						ev[3].event_type,
2574 					     DLB_QE_EV_TYPE_WORD + 4);
2575 
2576 		/* Store the metadata to memory (use the double-precision
2577 		 * _mm_storeh_pd because there is no integer function for
2578 		 * storing the upper 64b):
2579 		 * qe[0] metadata = sse_qe[0][63:0]
2580 		 * qe[1] metadata = sse_qe[0][127:64]
2581 		 * qe[2] metadata = sse_qe[1][63:0]
2582 		 * qe[3] metadata = sse_qe[1][127:64]
2583 		 */
2584 		_mm_storel_epi64((__m128i *)&qe[0].u.opaque_data, sse_qe[0]);
2585 		_mm_storeh_pd((double *)&qe[1].u.opaque_data,
2586 			      (__m128d) sse_qe[0]);
2587 		_mm_storel_epi64((__m128i *)&qe[2].u.opaque_data, sse_qe[1]);
2588 		_mm_storeh_pd((double *)&qe[3].u.opaque_data,
2589 			      (__m128d) sse_qe[1]);
2590 
2591 		qe[0].data = ev[0].u64;
2592 		qe[1].data = ev[1].u64;
2593 		qe[2].data = ev[2].u64;
2594 		qe[3].data = ev[3].u64;
2595 
2596 		break;
2597 	case 3:
2598 	case 2:
2599 	case 1:
2600 		for (i = 0; i < num; i++) {
2601 			qe[i].cmd_byte =
2602 				cmd_byte_map[qm_port->is_directed][ev[i].op];
2603 			qe[i].sched_type = sched_type[i];
2604 			qe[i].data = ev[i].u64;
2605 			qe[i].qid = queue_id[i];
2606 			qe[i].priority = EV_TO_DLB_PRIO(ev[i].priority);
2607 			qe[i].lock_id = ev[i].flow_id;
2608 			if (sched_type[i] == DLB_SCHED_DIRECTED) {
2609 				struct dlb_msg_info *info =
2610 					(struct dlb_msg_info *)&qe[i].lock_id;
2611 
2612 				info->qid = queue_id[i];
2613 				info->sched_type = DLB_SCHED_DIRECTED;
2614 				info->priority = qe[i].priority;
2615 			}
2616 			qe[i].u.event_type.major = ev[i].event_type;
2617 			qe[i].u.event_type.sub = ev[i].sub_event_type;
2618 		}
2619 		break;
2620 	case 0:
2621 		break;
2622 	}
2623 }
2624 
2625 static inline void
2626 dlb_construct_token_pop_qe(struct dlb_port *qm_port, int idx)
2627 {
2628 	struct dlb_cq_pop_qe *qe = (void *)qm_port->qe4;
2629 	int num = qm_port->owed_tokens;
2630 
2631 	if (qm_port->use_rsvd_token_scheme) {
2632 		/* Check if there's a deficit of reserved tokens, and return
2633 		 * early if there are no (unreserved) tokens to consume.
2634 		 */
2635 		if (num <= qm_port->cq_rsvd_token_deficit) {
2636 			qm_port->cq_rsvd_token_deficit -= num;
2637 			qm_port->owed_tokens = 0;
2638 			return;
2639 		}
2640 		num -= qm_port->cq_rsvd_token_deficit;
2641 		qm_port->cq_rsvd_token_deficit = 0;
2642 	}
2643 
2644 	qe[idx].cmd_byte = DLB_POP_CMD_BYTE;
2645 	qe[idx].tokens = num - 1;
2646 	qm_port->owed_tokens = 0;
2647 }
2648 
2649 static __rte_always_inline void
2650 dlb_pp_write(struct dlb_enqueue_qe *qe4,
2651 	     struct process_local_port_data *port_data)
2652 {
2653 	dlb_movdir64b(port_data->pp_addr, qe4);
2654 }
2655 
2656 static inline void
2657 dlb_hw_do_enqueue(struct dlb_port *qm_port,
2658 		  bool do_sfence,
2659 		  struct process_local_port_data *port_data)
2660 {
2661 	DLB_LOG_DBG("dlb: Flushing QE(s) to DLB\n");
2662 
2663 	/* Since MOVDIR64B is weakly-ordered, use an SFENCE to ensure that
2664 	 * application writes complete before enqueueing the release HCW.
2665 	 */
2666 	if (do_sfence)
2667 		rte_wmb();
2668 
2669 	dlb_pp_write(qm_port->qe4, port_data);
2670 }
2671 
2672 static inline int
2673 dlb_consume_qe_immediate(struct dlb_port *qm_port, int num)
2674 {
2675 	struct process_local_port_data *port_data;
2676 	struct dlb_cq_pop_qe *qe;
2677 
2678 	RTE_ASSERT(qm_port->config_state == DLB_CONFIGURED);
2679 
2680 	if (qm_port->use_rsvd_token_scheme) {
2681 		/* Check if there's a deficit of reserved tokens, and return
2682 		 * early if there are no (unreserved) tokens to consume.
2683 		 */
2684 		if (num <= qm_port->cq_rsvd_token_deficit) {
2685 			qm_port->cq_rsvd_token_deficit -= num;
2686 			qm_port->owed_tokens = 0;
2687 			return 0;
2688 		}
2689 		num -= qm_port->cq_rsvd_token_deficit;
2690 		qm_port->cq_rsvd_token_deficit = 0;
2691 	}
2692 
2693 	qe = qm_port->consume_qe;
2694 
2695 	qe->tokens = num - 1;
2696 	qe->int_arm = 0;
2697 
2698 	/* No store fence needed since no pointer is being sent, and CQ token
2699 	 * pops can be safely reordered with other HCWs.
2700 	 */
2701 	port_data = &dlb_port[qm_port->id][PORT_TYPE(qm_port)];
2702 
2703 	dlb_movntdq_single(port_data->pp_addr, qe);
2704 
2705 	DLB_LOG_DBG("dlb: consume immediate - %d QEs\n", num);
2706 
2707 	qm_port->owed_tokens = 0;
2708 
2709 	return 0;
2710 }
2711 
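/*
 * Worked example of the reserved-token handling shared by
 * dlb_construct_token_pop_qe() and dlb_consume_qe_immediate() above:
 * assume cq_rsvd_token_deficit is 3. A pop of num = 2 only shrinks the
 * deficit to 1 and issues nothing; a pop of num = 5 clears the deficit and
 * returns 5 - 3 = 2 tokens, so qe->tokens is written as 1 because the
 * hardware field is zero-based.
 */
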
2712 static inline uint16_t
2713 __dlb_event_enqueue_burst(void *event_port,
2714 			  const struct rte_event events[],
2715 			  uint16_t num,
2716 			  bool use_delayed)
2717 {
2718 	struct dlb_eventdev_port *ev_port = event_port;
2719 	struct dlb_port *qm_port = &ev_port->qm_port;
2720 	struct process_local_port_data *port_data;
2721 	int i;
2722 
2723 	RTE_ASSERT(ev_port->enq_configured);
2724 	RTE_ASSERT(events != NULL);
2725 
2726 	rte_errno = 0;
2727 	i = 0;
2728 
2729 	port_data = &dlb_port[qm_port->id][PORT_TYPE(qm_port)];
2730 
2731 	while (i < num) {
2732 		uint8_t sched_types[DLB_NUM_QES_PER_CACHE_LINE];
2733 		uint8_t queue_ids[DLB_NUM_QES_PER_CACHE_LINE];
2734 		int pop_offs = 0;
2735 		int j = 0;
2736 
2737 		memset(qm_port->qe4,
2738 		       0,
2739 		       DLB_NUM_QES_PER_CACHE_LINE *
2740 		       sizeof(struct dlb_enqueue_qe));
2741 
2742 		for (; j < DLB_NUM_QES_PER_CACHE_LINE && (i + j) < num; j++) {
2743 			const struct rte_event *ev = &events[i + j];
2744 			int16_t thresh = qm_port->token_pop_thresh;
2745 
2746 			if (use_delayed &&
2747 			    qm_port->token_pop_mode == DELAYED_POP &&
2748 			    (ev->op == RTE_EVENT_OP_FORWARD ||
2749 			     ev->op == RTE_EVENT_OP_RELEASE) &&
2750 			    qm_port->issued_releases >= thresh - 1) {
2751 				/* Insert the token pop QE and break out. This
2752 				 * may result in a partial HCW, but that is
2753 				 * simpler than supporting arbitrary QE
2754 				 * insertion.
2755 				 */
2756 				dlb_construct_token_pop_qe(qm_port, j);
2757 
2758 				/* Reset the releases for the next QE batch */
2759 				qm_port->issued_releases -= thresh;
2760 
2761 				/* When using delayed token pop mode, the
2762 				 * initial token threshold is the full CQ
2763 				 * depth. After the first token pop, we need to
2764 				 * reset it to the dequeue_depth.
2765 				 */
2766 				qm_port->token_pop_thresh =
2767 					qm_port->dequeue_depth;
2768 
2769 				pop_offs = 1;
2770 				j++;
2771 				break;
2772 			}
2773 
2774 			if (dlb_event_enqueue_prep(ev_port, qm_port, ev,
2775 						   port_data, &sched_types[j],
2776 						   &queue_ids[j]))
2777 				break;
2778 		}
2779 
2780 		if (j == 0)
2781 			break;
2782 
2783 		dlb_event_build_hcws(qm_port, &events[i], j - pop_offs,
2784 				     sched_types, queue_ids);
2785 
2786 		dlb_hw_do_enqueue(qm_port, i == 0, port_data);
2787 
2788 		/* Don't include the token pop QE in the enqueue count */
2789 		i += j - pop_offs;
2790 
2791 		/* Don't interpret j < DLB_NUM_... as out-of-credits if
2792 		 * pop_offs != 0
2793 		 */
2794 		if (j < DLB_NUM_QES_PER_CACHE_LINE && pop_offs == 0)
2795 			break;
2796 	}
2797 
2798 	RTE_ASSERT(!((i == 0 && rte_errno != -ENOSPC)));
2799 
2800 	return i;
2801 }
2802 
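/*
 * Application-side sketch (not part of the driver): the burst path above
 * backs rte_event_enqueue_burst(). A minimal NEW-event enqueue, assuming
 * dev_id 0, port 0, a started device, and obj as a placeholder payload
 * pointer:
 *
 *	struct rte_event ev = {
 *		.op = RTE_EVENT_OP_NEW,
 *		.queue_id = 0,
 *		.sched_type = RTE_SCHED_TYPE_ATOMIC,
 *		.priority = RTE_EVENT_DEV_PRIORITY_NORMAL,
 *		.flow_id = 0xabc,
 *		.u64 = (uintptr_t)obj,
 *	};
 *	uint16_t sent = rte_event_enqueue_burst(0, 0, &ev, 1);
 *
 * A return value short of the request means one of the checks in
 * dlb_event_enqueue_prep() failed and rte_errno indicates why (e.g.
 * -ENOSPC on a credit shortfall).
 */
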
2803 static inline uint16_t
2804 dlb_event_enqueue_burst(void *event_port,
2805 			const struct rte_event events[],
2806 			uint16_t num)
2807 {
2808 	return __dlb_event_enqueue_burst(event_port, events, num, false);
2809 }
2810 
2811 static inline uint16_t
2812 dlb_event_enqueue_burst_delayed(void *event_port,
2813 				const struct rte_event events[],
2814 				uint16_t num)
2815 {
2816 	return __dlb_event_enqueue_burst(event_port, events, num, true);
2817 }
2818 
2819 static inline uint16_t
2820 dlb_event_enqueue(void *event_port,
2821 		  const struct rte_event events[])
2822 {
2823 	return __dlb_event_enqueue_burst(event_port, events, 1, false);
2824 }
2825 
2826 static inline uint16_t
2827 dlb_event_enqueue_delayed(void *event_port,
2828 			  const struct rte_event events[])
2829 {
2830 	return __dlb_event_enqueue_burst(event_port, events, 1, true);
2831 }
2832 
2833 static uint16_t
2834 dlb_event_enqueue_new_burst(void *event_port,
2835 			    const struct rte_event events[],
2836 			    uint16_t num)
2837 {
2838 	return __dlb_event_enqueue_burst(event_port, events, num, false);
2839 }
2840 
2841 static uint16_t
2842 dlb_event_enqueue_new_burst_delayed(void *event_port,
2843 				    const struct rte_event events[],
2844 				    uint16_t num)
2845 {
2846 	return __dlb_event_enqueue_burst(event_port, events, num, true);
2847 }
2848 
2849 static uint16_t
2850 dlb_event_enqueue_forward_burst(void *event_port,
2851 				const struct rte_event events[],
2852 				uint16_t num)
2853 {
2854 	return __dlb_event_enqueue_burst(event_port, events, num, false);
2855 }
2856 
2857 static uint16_t
2858 dlb_event_enqueue_forward_burst_delayed(void *event_port,
2859 					const struct rte_event events[],
2860 					uint16_t num)
2861 {
2862 	return __dlb_event_enqueue_burst(event_port, events, num, true);
2863 }
2864 
2865 static __rte_always_inline int
2866 dlb_recv_qe(struct dlb_port *qm_port, struct dlb_dequeue_qe *qe,
2867 	    uint8_t *offset)
2868 {
2869 	uint8_t xor_mask[2][4] = { {0x0F, 0x0E, 0x0C, 0x08},
2870 				   {0x00, 0x01, 0x03, 0x07} };
2871 	uint8_t and_mask[4] = {0x0F, 0x0E, 0x0C, 0x08};
2872 	volatile struct dlb_dequeue_qe *cq_addr;
2873 	__m128i *qes = (__m128i *)qe;
2874 	uint64_t *cache_line_base;
2875 	uint8_t gen_bits;
2876 
2877 	cq_addr = dlb_port[qm_port->id][PORT_TYPE(qm_port)].cq_base;
2878 	cq_addr = &cq_addr[qm_port->cq_idx];
2879 
2880 	cache_line_base = (void *)(((uintptr_t)cq_addr) & ~0x3F);
2881 	*offset = ((uintptr_t)cq_addr & 0x30) >> 4;
2882 
2883 	/* Load the next CQ cache line from memory. Pack these reads as tight
2884 	 * as possible to reduce the chance that DLB invalidates the line while
2885 	 * the CPU is reading it. Read the cache line backwards to ensure that
2886 	 * if QE[N] (N > 0) is valid, then QEs[0:N-1] are too.
2887 	 *
2888 	 * (Valid QEs start at &qe[offset])
2889 	 */
2890 	qes[3] = _mm_load_si128((__m128i *)&cache_line_base[6]);
2891 	qes[2] = _mm_load_si128((__m128i *)&cache_line_base[4]);
2892 	qes[1] = _mm_load_si128((__m128i *)&cache_line_base[2]);
2893 	qes[0] = _mm_load_si128((__m128i *)&cache_line_base[0]);
2894 
2895 	/* Evict the cache line ASAP */
2896 	rte_cldemote(cache_line_base);
2897 
2898 	/* Extract and combine the gen bits */
2899 	gen_bits = ((_mm_extract_epi8(qes[0], 15) & 0x1) << 0) |
2900 		   ((_mm_extract_epi8(qes[1], 15) & 0x1) << 1) |
2901 		   ((_mm_extract_epi8(qes[2], 15) & 0x1) << 2) |
2902 		   ((_mm_extract_epi8(qes[3], 15) & 0x1) << 3);
2903 
2904 	/* XOR the combined bits such that a 1 represents a valid QE */
2905 	gen_bits ^= xor_mask[qm_port->gen_bit][*offset];
2906 
2907 	/* Mask off gen bits we don't care about */
2908 	gen_bits &= and_mask[*offset];
2909 
2910 	return __builtin_popcount(gen_bits);
2911 }
2912 
2913 static inline void
2914 dlb_inc_cq_idx(struct dlb_port *qm_port, int cnt)
2915 {
2916 	uint16_t idx = qm_port->cq_idx_unmasked + cnt;
2917 
2918 	qm_port->cq_idx_unmasked = idx;
2919 	qm_port->cq_idx = idx & qm_port->cq_depth_mask;
2920 	qm_port->gen_bit = (~(idx >> qm_port->gen_bit_shift)) & 0x1;
2921 }
2922 
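/*
 * Worked example of the index/gen-bit math above, assuming a CQ depth of
 * 8 (cq_depth_mask = 7, gen_bit_shift = 3): as cq_idx_unmasked advances
 * 6, 7, 8, 9 the masked cq_idx goes 6, 7, 0, 1 and the expected gen bit
 * flips from 1 to 0 at the wrap, so QEs left over from the previous pass
 * through the CQ fail the gen-bit check in dlb_recv_qe().
 */
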
2923 static inline int
2924 dlb_process_dequeue_qes(struct dlb_eventdev_port *ev_port,
2925 			struct dlb_port *qm_port,
2926 			struct rte_event *events,
2927 			struct dlb_dequeue_qe *qes,
2928 			int cnt)
2929 {
2930 	uint8_t *qid_mappings = qm_port->qid_mappings;
2931 	int i, num;
2932 
2933 	RTE_SET_USED(ev_port);  /* avoids unused warning when stats are compiled out */
2934 
2935 	for (i = 0, num = 0; i < cnt; i++) {
2936 		struct dlb_dequeue_qe *qe = &qes[i];
2937 		int sched_type_map[4] = {
2938 			[DLB_SCHED_ATOMIC] = RTE_SCHED_TYPE_ATOMIC,
2939 			[DLB_SCHED_UNORDERED] = RTE_SCHED_TYPE_PARALLEL,
2940 			[DLB_SCHED_ORDERED] = RTE_SCHED_TYPE_ORDERED,
2941 			[DLB_SCHED_DIRECTED] = RTE_SCHED_TYPE_ATOMIC,
2942 		};
2943 
2944 		DLB_LOG_DBG("dequeue success, data = 0x%llx, qid=%d, event_type=%d, subevent=%d\npp_id = %d, sched_type = %d, qid = %d, err=%d\n",
2945 			    (long long)qe->data, qe->qid,
2946 			    qe->u.event_type.major,
2947 			    qe->u.event_type.sub,
2948 			    qe->pp_id, qe->sched_type, qe->qid, qe->error);
2949 
2950 		/* Fill in event information.
2951 		 * Note that flow_id must be embedded in the data by
2952 		 * the app, such as the mbuf RSS hash field if the data
2953 		 * buffer is a mbuf.
2954 		 */
2955 		if (unlikely(qe->error)) {
2956 			DLB_LOG_ERR("QE error bit ON\n");
2957 			DLB_INC_STAT(ev_port->stats.traffic.rx_drop, 1);
2958 			dlb_consume_qe_immediate(qm_port, 1);
2959 			continue; /* Ignore */
2960 		}
2961 
2962 		events[num].u64 = qe->data;
2963 		events[num].queue_id = qid_mappings[qe->qid];
2964 		events[num].priority = DLB_TO_EV_PRIO((uint8_t)qe->priority);
2965 		events[num].event_type = qe->u.event_type.major;
2966 		events[num].sub_event_type = qe->u.event_type.sub;
2967 		events[num].sched_type = sched_type_map[qe->sched_type];
2968 		DLB_INC_STAT(ev_port->stats.rx_sched_cnt[qe->sched_type], 1);
2969 		num++;
2970 	}
2971 	DLB_INC_STAT(ev_port->stats.traffic.rx_ok, num);
2972 
2973 	return num;
2974 }
2975 
2976 static inline int
2977 dlb_process_dequeue_four_qes(struct dlb_eventdev_port *ev_port,
2978 			     struct dlb_port *qm_port,
2979 			     struct rte_event *events,
2980 			     struct dlb_dequeue_qe *qes)
2981 {
2982 	int sched_type_map[] = {
2983 		[DLB_SCHED_ATOMIC] = RTE_SCHED_TYPE_ATOMIC,
2984 		[DLB_SCHED_UNORDERED] = RTE_SCHED_TYPE_PARALLEL,
2985 		[DLB_SCHED_ORDERED] = RTE_SCHED_TYPE_ORDERED,
2986 		[DLB_SCHED_DIRECTED] = RTE_SCHED_TYPE_ATOMIC,
2987 	};
2988 	const int num_events = DLB_NUM_QES_PER_CACHE_LINE;
2989 	uint8_t *qid_mappings = qm_port->qid_mappings;
2990 	__m128i sse_evt[2];
2991 	int i;
2992 
2993 	/* In the unlikely case that any of the QE error bits are set, process
2994 	 * them one at a time.
2995 	 */
2996 	if (unlikely(qes[0].error || qes[1].error ||
2997 		     qes[2].error || qes[3].error))
2998 		return dlb_process_dequeue_qes(ev_port, qm_port, events,
2999 					       qes, num_events);
3000 
3001 	for (i = 0; i < DLB_NUM_QES_PER_CACHE_LINE; i++) {
3002 		DLB_LOG_DBG("dequeue success, data = 0x%llx, qid=%d, event_type=%d, subevent=%d\npp_id = %d, sched_type = %d, qid = %d, err=%d\n",
3003 			    (long long)qes[i].data, qes[i].qid,
3004 			    qes[i].u.event_type.major,
3005 			    qes[i].u.event_type.sub,
3006 			    qes[i].pp_id, qes[i].sched_type, qes[i].qid,
3007 			    qes[i].error);
3008 	}
3009 
3010 	events[0].u64 = qes[0].data;
3011 	events[1].u64 = qes[1].data;
3012 	events[2].u64 = qes[2].data;
3013 	events[3].u64 = qes[3].data;
3014 
3015 	/* Construct the metadata portion of two struct rte_events
3016 	 * in one 128b SSE register. Event metadata is constructed in the SSE
3017 	 * registers like so:
3018 	 * sse_evt[0][63:0]:   event[0]'s metadata
3019 	 * sse_evt[0][127:64]: event[1]'s metadata
3020 	 * sse_evt[1][63:0]:   event[2]'s metadata
3021 	 * sse_evt[1][127:64]: event[3]'s metadata
3022 	 */
3023 	sse_evt[0] = _mm_setzero_si128();
3024 	sse_evt[1] = _mm_setzero_si128();
3025 
3026 	/* Convert the hardware queue ID to an event queue ID and store it in
3027 	 * the metadata:
3028 	 * sse_evt[0][47:40]   = qid_mappings[qes[0].qid]
3029 	 * sse_evt[0][111:104] = qid_mappings[qes[1].qid]
3030 	 * sse_evt[1][47:40]   = qid_mappings[qes[2].qid]
3031 	 * sse_evt[1][111:104] = qid_mappings[qes[3].qid]
3032 	 */
3033 #define DLB_EVENT_QUEUE_ID_BYTE 5
3034 	sse_evt[0] = _mm_insert_epi8(sse_evt[0],
3035 				     qid_mappings[qes[0].qid],
3036 				     DLB_EVENT_QUEUE_ID_BYTE);
3037 	sse_evt[0] = _mm_insert_epi8(sse_evt[0],
3038 				     qid_mappings[qes[1].qid],
3039 				     DLB_EVENT_QUEUE_ID_BYTE + 8);
3040 	sse_evt[1] = _mm_insert_epi8(sse_evt[1],
3041 				     qid_mappings[qes[2].qid],
3042 				     DLB_EVENT_QUEUE_ID_BYTE);
3043 	sse_evt[1] = _mm_insert_epi8(sse_evt[1],
3044 				     qid_mappings[qes[3].qid],
3045 				     DLB_EVENT_QUEUE_ID_BYTE + 8);
3046 
3047 	/* Convert the hardware priority to an event priority and store it in
3048 	 * the metadata:
3049 	 * sse_evt[0][55:48]   = DLB_TO_EV_PRIO(qes[0].priority)
3050 	 * sse_evt[0][119:112] = DLB_TO_EV_PRIO(qes[1].priority)
3051 	 * sse_evt[1][55:48]   = DLB_TO_EV_PRIO(qes[2].priority)
3052 	 * sse_evt[1][119:112] = DLB_TO_EV_PRIO(qes[3].priority)
3053 	 */
3054 #define DLB_EVENT_PRIO_BYTE 6
3055 	sse_evt[0] = _mm_insert_epi8(sse_evt[0],
3056 				     DLB_TO_EV_PRIO((uint8_t)qes[0].priority),
3057 				     DLB_EVENT_PRIO_BYTE);
3058 	sse_evt[0] = _mm_insert_epi8(sse_evt[0],
3059 				     DLB_TO_EV_PRIO((uint8_t)qes[1].priority),
3060 				     DLB_EVENT_PRIO_BYTE + 8);
3061 	sse_evt[1] = _mm_insert_epi8(sse_evt[1],
3062 				     DLB_TO_EV_PRIO((uint8_t)qes[2].priority),
3063 				     DLB_EVENT_PRIO_BYTE);
3064 	sse_evt[1] = _mm_insert_epi8(sse_evt[1],
3065 				     DLB_TO_EV_PRIO((uint8_t)qes[3].priority),
3066 				     DLB_EVENT_PRIO_BYTE + 8);
3067 
3068 	/* Write the event type and sub event type to the event metadata. Leave
3069 	 * flow ID unspecified, since the hardware does not maintain it during
3070 	 * scheduling:
3071 	 * sse_evt[0][31:0]   = qes[0].u.event_type.major << 28 |
3072 	 *			qes[0].u.event_type.sub << 20;
3073 	 * sse_evt[0][95:64]  = qes[1].u.event_type.major << 28 |
3074 	 *			qes[1].u.event_type.sub << 20;
3075 	 * sse_evt[1][31:0]   = qes[2].u.event_type.major << 28 |
3076 	 *			qes[2].u.event_type.sub << 20;
3077 	 * sse_evt[1][95:64]  = qes[3].u.event_type.major << 28 |
3078 	 *			qes[3].u.event_type.sub << 20;
3079 	 */
3080 #define DLB_EVENT_EV_TYPE_DW 0
3081 #define DLB_EVENT_EV_TYPE_SHIFT 28
3082 #define DLB_EVENT_SUB_EV_TYPE_SHIFT 20
3083 	sse_evt[0] = _mm_insert_epi32(sse_evt[0],
3084 			qes[0].u.event_type.major << DLB_EVENT_EV_TYPE_SHIFT |
3085 			qes[0].u.event_type.sub << DLB_EVENT_SUB_EV_TYPE_SHIFT,
3086 			DLB_EVENT_EV_TYPE_DW);
3087 	sse_evt[0] = _mm_insert_epi32(sse_evt[0],
3088 			qes[1].u.event_type.major << DLB_EVENT_EV_TYPE_SHIFT |
3089 			qes[1].u.event_type.sub <<  DLB_EVENT_SUB_EV_TYPE_SHIFT,
3090 			DLB_EVENT_EV_TYPE_DW + 2);
3091 	sse_evt[1] = _mm_insert_epi32(sse_evt[1],
3092 			qes[2].u.event_type.major << DLB_EVENT_EV_TYPE_SHIFT |
3093 			qes[2].u.event_type.sub <<  DLB_EVENT_SUB_EV_TYPE_SHIFT,
3094 			DLB_EVENT_EV_TYPE_DW);
3095 	sse_evt[1] = _mm_insert_epi32(sse_evt[1],
3096 			qes[3].u.event_type.major << DLB_EVENT_EV_TYPE_SHIFT  |
3097 			qes[3].u.event_type.sub << DLB_EVENT_SUB_EV_TYPE_SHIFT,
3098 			DLB_EVENT_EV_TYPE_DW + 2);
3099 
3100 	/* Write the sched type to the event metadata. 'op' and 'rsvd' are not
3101 	 * set:
3102 	 * sse_evt[0][39:32]  = sched_type_map[qes[0].sched_type] << 6
3103 	 * sse_evt[0][103:96] = sched_type_map[qes[1].sched_type] << 6
3104 	 * sse_evt[1][39:32]  = sched_type_map[qes[2].sched_type] << 6
3105 	 * sse_evt[1][103:96] = sched_type_map[qes[3].sched_type] << 6
3106 	 */
3107 #define DLB_EVENT_SCHED_TYPE_BYTE 4
3108 #define DLB_EVENT_SCHED_TYPE_SHIFT 6
3109 	sse_evt[0] = _mm_insert_epi8(sse_evt[0],
3110 		sched_type_map[qes[0].sched_type] << DLB_EVENT_SCHED_TYPE_SHIFT,
3111 		DLB_EVENT_SCHED_TYPE_BYTE);
3112 	sse_evt[0] = _mm_insert_epi8(sse_evt[0],
3113 		sched_type_map[qes[1].sched_type] << DLB_EVENT_SCHED_TYPE_SHIFT,
3114 		DLB_EVENT_SCHED_TYPE_BYTE + 8);
3115 	sse_evt[1] = _mm_insert_epi8(sse_evt[1],
3116 		sched_type_map[qes[2].sched_type] << DLB_EVENT_SCHED_TYPE_SHIFT,
3117 		DLB_EVENT_SCHED_TYPE_BYTE);
3118 	sse_evt[1] = _mm_insert_epi8(sse_evt[1],
3119 		sched_type_map[qes[3].sched_type] << DLB_EVENT_SCHED_TYPE_SHIFT,
3120 		DLB_EVENT_SCHED_TYPE_BYTE + 8);
3121 
3122 	/* Store the metadata to the event (use the double-precision
3123 	 * _mm_storeh_pd because there is no integer function for storing the
3124 	 * upper 64b):
3125 	 * events[0].event = sse_evt[0][63:0]
3126 	 * events[1].event = sse_evt[0][127:64]
3127 	 * events[2].event = sse_evt[1][63:0]
3128 	 * events[3].event = sse_evt[1][127:64]
3129 	 */
3130 	_mm_storel_epi64((__m128i *)&events[0].event, sse_evt[0]);
3131 	_mm_storeh_pd((double *)&events[1].event, (__m128d) sse_evt[0]);
3132 	_mm_storel_epi64((__m128i *)&events[2].event, sse_evt[1]);
3133 	_mm_storeh_pd((double *)&events[3].event, (__m128d) sse_evt[1]);
3134 
3135 	DLB_INC_STAT(ev_port->stats.rx_sched_cnt[qes[0].sched_type], 1);
3136 	DLB_INC_STAT(ev_port->stats.rx_sched_cnt[qes[1].sched_type], 1);
3137 	DLB_INC_STAT(ev_port->stats.rx_sched_cnt[qes[2].sched_type], 1);
3138 	DLB_INC_STAT(ev_port->stats.rx_sched_cnt[qes[3].sched_type], 1);
3139 
3140 	DLB_INC_STAT(ev_port->stats.traffic.rx_ok, num_events);
3141 
3142 	return num_events;
3143 }
3144 
3145 static inline int
3146 dlb_dequeue_wait(struct dlb_eventdev *dlb,
3147 		 struct dlb_eventdev_port *ev_port,
3148 		 struct dlb_port *qm_port,
3149 		 uint64_t timeout,
3150 		 uint64_t start_ticks)
3151 {
3152 	struct process_local_port_data *port_data;
3153 	uint64_t elapsed_ticks;
3154 
3155 	port_data = &dlb_port[qm_port->id][PORT_TYPE(qm_port)];
3156 
3157 	elapsed_ticks = rte_get_timer_cycles() - start_ticks;
3158 
3159 	/* Wait/poll time expired */
3160 	if (elapsed_ticks >= timeout) {
3161 		/* Interrupts not supported by PF PMD */
3162 		return 1;
3163 	} else if (dlb->umwait_allowed) {
3164 		volatile struct dlb_dequeue_qe *cq_base;
3165 		union {
3166 			uint64_t raw_qe[2];
3167 			struct dlb_dequeue_qe qe;
3168 		} qe_mask;
3169 		uint64_t expected_value;
3170 		volatile uint64_t *monitor_addr;
3171 
3172 		qe_mask.qe.cq_gen = 1; /* set mask */
3173 
3174 		cq_base = port_data->cq_base;
3175 		monitor_addr = (volatile uint64_t *)(volatile void *)
3176 			&cq_base[qm_port->cq_idx];
3177 		monitor_addr++; /* cq_gen bit is in second 64bit location */
3178 
3179 		if (qm_port->gen_bit)
3180 			expected_value = qe_mask.raw_qe[1];
3181 		else
3182 			expected_value = 0;
3183 
3184 		rte_power_monitor(monitor_addr, expected_value,
3185 				  qe_mask.raw_qe[1], timeout + start_ticks,
3186 				  sizeof(uint64_t));
3187 
3188 		DLB_INC_STAT(ev_port->stats.traffic.rx_umonitor_umwait, 1);
3189 	} else {
3190 		uint64_t poll_interval = RTE_LIBRTE_PMD_DLB_POLL_INTERVAL;
3191 		uint64_t curr_ticks = rte_get_timer_cycles();
3192 		uint64_t init_ticks = curr_ticks;
3193 
3194 		while ((curr_ticks - start_ticks < timeout) &&
3195 		       (curr_ticks - init_ticks < poll_interval))
3196 			curr_ticks = rte_get_timer_cycles();
3197 	}
3198 
3199 	return 0;
3200 }
3201 
3202 static inline int16_t
3203 dlb_hw_dequeue(struct dlb_eventdev *dlb,
3204 	       struct dlb_eventdev_port *ev_port,
3205 	       struct rte_event *events,
3206 	       uint16_t max_num,
3207 	       uint64_t dequeue_timeout_ticks)
3208 {
3209 	uint64_t timeout;
3210 	uint64_t start_ticks = 0ULL;
3211 	struct dlb_port *qm_port;
3212 	int num = 0;
3213 
3214 	qm_port = &ev_port->qm_port;
3215 
3216 	/* If configured for per-dequeue wait, use the wait value provided to
3217 	 * this API; otherwise use the global value set at eventdev
3218 	 * configuration time.
3219 	 */
3220 	if (!dlb->global_dequeue_wait)
3221 		timeout = dequeue_timeout_ticks;
3222 	else
3223 		timeout = dlb->global_dequeue_wait_ticks;
3224 
3225 	if (timeout)
3226 		start_ticks = rte_get_timer_cycles();
3227 
3228 	while (num < max_num) {
3229 		struct dlb_dequeue_qe qes[DLB_NUM_QES_PER_CACHE_LINE];
3230 		uint8_t offset;
3231 		int num_avail;
3232 
3233 		/* Copy up to 4 QEs from the current cache line into qes */
3234 		num_avail = dlb_recv_qe(qm_port, qes, &offset);
3235 
3236 		/* But don't process more than the user requested */
3237 		num_avail = RTE_MIN(num_avail, max_num - num);
3238 
3239 		dlb_inc_cq_idx(qm_port, num_avail);
3240 
3241 		if (num_avail == DLB_NUM_QES_PER_CACHE_LINE)
3242 			num += dlb_process_dequeue_four_qes(ev_port,
3243 							     qm_port,
3244 							     &events[num],
3245 							     &qes[offset]);
3246 		else if (num_avail)
3247 			num += dlb_process_dequeue_qes(ev_port,
3248 							qm_port,
3249 							&events[num],
3250 							&qes[offset],
3251 							num_avail);
3252 		else if ((timeout == 0) || (num > 0))
3253 			/* Not waiting in any form, or 1+ events received? */
3254 			break;
3255 		else if (dlb_dequeue_wait(dlb, ev_port, qm_port,
3256 					  timeout, start_ticks))
3257 			break;
3258 	}
3259 
3260 	qm_port->owed_tokens += num;
3261 
3262 	if (num && qm_port->token_pop_mode == AUTO_POP)
3263 		dlb_consume_qe_immediate(qm_port, num);
3264 
3265 	ev_port->outstanding_releases += num;
3266 
3267 	return num;
3268 }
3269 
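/*
 * Application-side sketch (not part of the driver): the dequeue paths
 * above back rte_event_dequeue_burst(). Assuming dev_id 0, port 0, ticks
 * previously derived with rte_event_dequeue_timeout_ticks(), and
 * handle_event() as a placeholder consumer:
 *
 *	struct rte_event evs[32];
 *	uint16_t i, n = rte_event_dequeue_burst(0, 0, evs, 32, ticks);
 *
 *	for (i = 0; i < n; i++)
 *		handle_event(&evs[i]);
 *
 * Every dequeued event must later be forwarded or released so that
 * outstanding_releases and the software credits drain correctly.
 */
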
3270 static __rte_always_inline int
3271 dlb_recv_qe_sparse(struct dlb_port *qm_port, struct dlb_dequeue_qe *qe)
3272 {
3273 	volatile struct dlb_dequeue_qe *cq_addr;
3274 	uint8_t xor_mask[2] = {0x0F, 0x00};
3275 	const uint8_t and_mask = 0x0F;
3276 	__m128i *qes = (__m128i *)qe;
3277 	uint8_t gen_bits, gen_bit;
3278 	uintptr_t addr[4];
3279 	uint16_t idx;
3280 
3281 	cq_addr = dlb_port[qm_port->id][PORT_TYPE(qm_port)].cq_base;
3282 
3283 	idx = qm_port->cq_idx;
3284 
3285 	/* Load the next 4 QEs */
3286 	addr[0] = (uintptr_t)&cq_addr[idx];
3287 	addr[1] = (uintptr_t)&cq_addr[(idx +  4) & qm_port->cq_depth_mask];
3288 	addr[2] = (uintptr_t)&cq_addr[(idx +  8) & qm_port->cq_depth_mask];
3289 	addr[3] = (uintptr_t)&cq_addr[(idx + 12) & qm_port->cq_depth_mask];
3290 
3291 	/* Prefetch next batch of QEs (all CQs occupy minimum 8 cache lines) */
3292 	rte_prefetch0(&cq_addr[(idx + 16) & qm_port->cq_depth_mask]);
3293 	rte_prefetch0(&cq_addr[(idx + 20) & qm_port->cq_depth_mask]);
3294 	rte_prefetch0(&cq_addr[(idx + 24) & qm_port->cq_depth_mask]);
3295 	rte_prefetch0(&cq_addr[(idx + 28) & qm_port->cq_depth_mask]);
3296 
3297 	/* Correct the xor_mask for wrap-around QEs */
3298 	gen_bit = qm_port->gen_bit;
3299 	xor_mask[gen_bit] ^= !!((idx +  4) > qm_port->cq_depth_mask) << 1;
3300 	xor_mask[gen_bit] ^= !!((idx +  8) > qm_port->cq_depth_mask) << 2;
3301 	xor_mask[gen_bit] ^= !!((idx + 12) > qm_port->cq_depth_mask) << 3;
3302 
3303 	/* Read the cache lines backwards to ensure that if QE[N] (N > 0) is
3304 	 * valid, then QEs[0:N-1] are too.
3305 	 */
3306 	qes[3] = _mm_load_si128((__m128i *)(void *)addr[3]);
3307 	rte_compiler_barrier();
3308 	qes[2] = _mm_load_si128((__m128i *)(void *)addr[2]);
3309 	rte_compiler_barrier();
3310 	qes[1] = _mm_load_si128((__m128i *)(void *)addr[1]);
3311 	rte_compiler_barrier();
3312 	qes[0] = _mm_load_si128((__m128i *)(void *)addr[0]);
3313 
3314 	/* Extract and combine the gen bits */
3315 	gen_bits = ((_mm_extract_epi8(qes[0], 15) & 0x1) << 0) |
3316 		   ((_mm_extract_epi8(qes[1], 15) & 0x1) << 1) |
3317 		   ((_mm_extract_epi8(qes[2], 15) & 0x1) << 2) |
3318 		   ((_mm_extract_epi8(qes[3], 15) & 0x1) << 3);
3319 
3320 	/* XOR the combined bits such that a 1 represents a valid QE */
3321 	gen_bits ^= xor_mask[gen_bit];
3322 
3323 	/* Mask off gen bits we don't care about */
3324 	gen_bits &= and_mask;
3325 
3326 	return __builtin_popcount(gen_bits);
3327 }
3328 
3329 static inline int16_t
3330 dlb_hw_dequeue_sparse(struct dlb_eventdev *dlb,
3331 		      struct dlb_eventdev_port *ev_port,
3332 		      struct rte_event *events,
3333 		      uint16_t max_num,
3334 		      uint64_t dequeue_timeout_ticks)
3335 {
3336 	uint64_t timeout;
3337 	uint64_t start_ticks = 0ULL;
3338 	struct dlb_port *qm_port;
3339 	int num = 0;
3340 
3341 	qm_port = &ev_port->qm_port;
3342 
3343 	/* If the port is configured for a per-dequeue wait, use the wait
3344 	 * value provided to this API. Otherwise, use the global value set at
3345 	 * eventdev configuration time.
3346 	 */
3347 	if (!dlb->global_dequeue_wait)
3348 		timeout = dequeue_timeout_ticks;
3349 	else
3350 		timeout = dlb->global_dequeue_wait_ticks;
3351 
3352 	if (timeout)
3353 		start_ticks = rte_get_timer_cycles();
3354 
3355 	while (num < max_num) {
3356 		struct dlb_dequeue_qe qes[DLB_NUM_QES_PER_CACHE_LINE];
3357 		int num_avail;
3358 
3359 		/* Copy up to 4 QEs from the current cache line into qes */
3360 		/* Copy up to 4 QEs, one from each of the next 4 cache lines, into qes */
3361 
3362 		/* But don't process more than the user requested */
3363 		num_avail = RTE_MIN(num_avail, max_num - num);
3364 
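		/* In sparse CQ mode each QE occupies its own cache line (four
		 * QE slots), so advance the CQ index by four slots per QE.
		 */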
3365 		dlb_inc_cq_idx(qm_port, num_avail << 2);
3366 
3367 		if (num_avail == DLB_NUM_QES_PER_CACHE_LINE)
3368 			num += dlb_process_dequeue_four_qes(ev_port,
3369 							     qm_port,
3370 							     &events[num],
3371 							     &qes[0]);
3372 		else if (num_avail)
3373 			num += dlb_process_dequeue_qes(ev_port,
3374 							qm_port,
3375 							&events[num],
3376 							&qes[0],
3377 							num_avail);
3378 		else if ((timeout == 0) || (num > 0))
3379 			/* Not waiting in any form, or 1+ events received? */
3380 			break;
3381 		else if (dlb_dequeue_wait(dlb, ev_port, qm_port,
3382 					  timeout, start_ticks))
3383 			break;
3384 	}
3385 
3386 	qm_port->owed_tokens += num;
3387 
3388 	if (num && qm_port->token_pop_mode == AUTO_POP)
3389 		dlb_consume_qe_immediate(qm_port, num);
3390 
3391 	ev_port->outstanding_releases += num;
3392 
3393 	return num;
3394 }
3395 
3396 static int
3397 dlb_event_release(struct dlb_eventdev *dlb, uint8_t port_id, int n)
3398 {
3399 	struct process_local_port_data *port_data;
3400 	struct dlb_eventdev_port *ev_port;
3401 	struct dlb_port *qm_port;
3402 	int i;
3403 
3404 	if (port_id >= dlb->num_ports) {
3405 		DLB_LOG_ERR("Invalid port id %d in dlb_event_release\n",
3406 			    port_id);
3407 		rte_errno = -EINVAL;
3408 		return rte_errno;
3409 	}
3410 
3411 	ev_port = &dlb->ev_ports[port_id];
3412 	qm_port = &ev_port->qm_port;
3413 	port_data = &dlb_port[qm_port->id][PORT_TYPE(qm_port)];
3414 
3415 	i = 0;
3416 
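	/* Directed ports do not send release QEs to hardware; only the
	 * software credit accounting below applies.
	 */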
3417 	if (qm_port->is_directed) {
3418 		i = n;
3419 		goto sw_credit_update;
3420 	}
3421 
3422 	while (i < n) {
3423 		int pop_offs = 0;
3424 		int j = 0;
3425 
3426 		/* Zero-out QEs */
3427 		qm_port->qe4[0].cmd_byte = 0;
3428 		qm_port->qe4[1].cmd_byte = 0;
3429 		qm_port->qe4[2].cmd_byte = 0;
3430 		qm_port->qe4[3].cmd_byte = 0;
3431 
3432 		for (; j < DLB_NUM_QES_PER_CACHE_LINE && (i + j) < n; j++) {
3433 			int16_t thresh = qm_port->token_pop_thresh;
3434 
3435 			if (qm_port->token_pop_mode == DELAYED_POP &&
3436 			    qm_port->issued_releases >= thresh - 1) {
3437 				/* Insert the token pop QE */
3438 				dlb_construct_token_pop_qe(qm_port, j);
3439 
3440 				/* Reset the releases for the next QE batch */
3441 				qm_port->issued_releases -= thresh;
3442 
3443 				/* When using delayed token pop mode, the
3444 				 * initial token threshold is the full CQ
3445 				 * depth. After the first token pop, we need to
3446 				 * reset it to the dequeue_depth.
3447 				 */
3448 				qm_port->token_pop_thresh =
3449 					qm_port->dequeue_depth;
3450 
3451 				pop_offs = 1;
3452 				j++;
3453 				break;
3454 			}
3455 
3456 			qm_port->qe4[j].cmd_byte = DLB_COMP_CMD_BYTE;
3457 			qm_port->issued_releases++;
3458 		}
3459 
3460 		dlb_hw_do_enqueue(qm_port, i == 0, port_data);
3461 
3462 		/* Don't include the token pop QE in the release count */
3463 		i += j - pop_offs;
3464 	}
3465 
3466 sw_credit_update:
3467 	/* each release returns one credit */
3468 	if (!ev_port->outstanding_releases) {
3469 		DLB_LOG_ERR("Unrecoverable application error. Outstanding releases underflowed.\n");
3470 		rte_errno = -ENOTRECOVERABLE;
3471 		return rte_errno;
3472 	}
3473 
3474 	ev_port->outstanding_releases -= i;
3475 	ev_port->inflight_credits += i;
3476 
3477 	/* Replenish s/w credits if enough releases are performed */
3478 	dlb_replenish_sw_credits(dlb, ev_port);
3479 	return 0;
3480 }
3481 
3482 static uint16_t
3483 dlb_event_dequeue_burst(void *event_port, struct rte_event *ev, uint16_t num,
3484 			uint64_t wait)
3485 {
3486 	struct dlb_eventdev_port *ev_port = event_port;
3487 	struct dlb_port *qm_port = &ev_port->qm_port;
3488 	struct dlb_eventdev *dlb = ev_port->dlb;
3489 	uint16_t cnt;
3490 	int ret;
3491 
3492 	rte_errno = 0;
3493 
3494 	RTE_ASSERT(ev_port->setup_done);
3495 	RTE_ASSERT(ev != NULL);
3496 
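	/* Implicit release: before dequeueing new events, release all events
	 * still outstanding from prior dequeues on this port.
	 */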
3497 	if (ev_port->implicit_release && ev_port->outstanding_releases > 0) {
3498 		uint16_t out_rels = ev_port->outstanding_releases;
3499 
3500 		ret = dlb_event_release(dlb, ev_port->id, out_rels);
3501 		if (ret)
3502 			return ret;
3503 
3504 		DLB_INC_STAT(ev_port->stats.tx_implicit_rel, out_rels);
3505 	}
3506 
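	/* In deferred pop mode, return the tokens owed from the previous
	 * dequeue to the CQ before polling again.
	 */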
3507 	if (qm_port->token_pop_mode == DEFERRED_POP &&
3508 	    qm_port->owed_tokens)
3509 		dlb_consume_qe_immediate(qm_port, qm_port->owed_tokens);
3510 
3511 	cnt = dlb_hw_dequeue(dlb, ev_port, ev, num, wait);
3512 
3513 	DLB_INC_STAT(ev_port->stats.traffic.total_polls, 1);
3514 	DLB_INC_STAT(ev_port->stats.traffic.zero_polls, ((cnt == 0) ? 1 : 0));
3515 	return cnt;
3516 }
3517 
3518 static uint16_t
3519 dlb_event_dequeue(void *event_port, struct rte_event *ev, uint64_t wait)
3520 {
3521 	return dlb_event_dequeue_burst(event_port, ev, 1, wait);
3522 }
3523 
3524 static uint16_t
3525 dlb_event_dequeue_burst_sparse(void *event_port, struct rte_event *ev,
3526 			       uint16_t num, uint64_t wait)
3527 {
3528 	struct dlb_eventdev_port *ev_port = event_port;
3529 	struct dlb_port *qm_port = &ev_port->qm_port;
3530 	struct dlb_eventdev *dlb = ev_port->dlb;
3531 	uint16_t cnt;
3532 	int ret;
3533 
3534 	rte_errno = 0;
3535 
3536 	RTE_ASSERT(ev_port->setup_done);
3537 	RTE_ASSERT(ev != NULL);
3538 
3539 	if (ev_port->implicit_release && ev_port->outstanding_releases > 0) {
3540 		uint16_t out_rels = ev_port->outstanding_releases;
3541 
3542 		ret = dlb_event_release(dlb, ev_port->id, out_rels);
3543 		if (ret)
3544 			return ret;
3545 
3546 		DLB_INC_STAT(ev_port->stats.tx_implicit_rel, out_rels);
3547 	}
3548 
3549 	if (qm_port->token_pop_mode == DEFERRED_POP &&
3550 	    qm_port->owed_tokens)
3551 		dlb_consume_qe_immediate(qm_port, qm_port->owed_tokens);
3552 
3553 	cnt = dlb_hw_dequeue_sparse(dlb, ev_port, ev, num, wait);
3554 
3555 	DLB_INC_STAT(ev_port->stats.traffic.total_polls, 1);
3556 	DLB_INC_STAT(ev_port->stats.traffic.zero_polls, ((cnt == 0) ? 1 : 0));
3557 	return cnt;
3558 }
3559 
3560 static uint16_t
3561 dlb_event_dequeue_sparse(void *event_port, struct rte_event *ev, uint64_t wait)
3562 {
3563 	return dlb_event_dequeue_burst_sparse(event_port, ev, 1, wait);
3564 }
3565 
3566 static uint32_t
3567 dlb_get_ldb_queue_depth(struct dlb_eventdev *dlb,
3568 			struct dlb_eventdev_queue *queue)
3569 {
3570 	struct dlb_hw_dev *handle = &dlb->qm_instance;
3571 	struct dlb_get_ldb_queue_depth_args cfg;
3572 	struct dlb_cmd_response response;
3573 	int ret;
3574 
3575 	cfg.queue_id = queue->qm_queue.id;
3576 	cfg.response = (uintptr_t)&response;
3577 
3578 	ret = dlb_iface_get_ldb_queue_depth(handle, &cfg);
3579 	if (ret < 0) {
3580 		DLB_LOG_ERR("dlb: get_ldb_queue_depth ret=%d (driver status: %s)\n",
3581 			    ret, dlb_error_strings[response.status]);
3582 		return ret;
3583 	}
3584 
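	/* The queue depth is returned in the response's id field. */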
3585 	return response.id;
3586 }
3587 
3588 static uint32_t
3589 dlb_get_dir_queue_depth(struct dlb_eventdev *dlb,
3590 			struct dlb_eventdev_queue *queue)
3591 {
3592 	struct dlb_hw_dev *handle = &dlb->qm_instance;
3593 	struct dlb_get_dir_queue_depth_args cfg;
3594 	struct dlb_cmd_response response;
3595 	int ret;
3596 
3597 	cfg.queue_id = queue->qm_queue.id;
3598 	cfg.response = (uintptr_t)&response;
3599 
3600 	ret = dlb_iface_get_dir_queue_depth(handle, &cfg);
3601 	if (ret < 0) {
3602 		DLB_LOG_ERR("dlb: get_dir_queue_depth ret=%d (driver status: %s)\n",
3603 			    ret, dlb_error_strings[response.status]);
3604 		return ret;
3605 	}
3606 
3607 	return response.id;
3608 }
3609 
3610 uint32_t
3611 dlb_get_queue_depth(struct dlb_eventdev *dlb,
3612 		    struct dlb_eventdev_queue *queue)
3613 {
3614 	if (queue->qm_queue.is_directed)
3615 		return dlb_get_dir_queue_depth(dlb, queue);
3616 	else
3617 		return dlb_get_ldb_queue_depth(dlb, queue);
3618 }
3619 
3620 static bool
3621 dlb_queue_is_empty(struct dlb_eventdev *dlb,
3622 		   struct dlb_eventdev_queue *queue)
3623 {
3624 	return dlb_get_queue_depth(dlb, queue) == 0;
3625 }
3626 
3627 static bool
3628 dlb_linked_queues_empty(struct dlb_eventdev *dlb)
3629 {
3630 	int i;
3631 
3632 	for (i = 0; i < dlb->num_queues; i++) {
3633 		if (dlb->ev_queues[i].num_links == 0)
3634 			continue;
3635 		if (!dlb_queue_is_empty(dlb, &dlb->ev_queues[i]))
3636 			return false;
3637 	}
3638 
3639 	return true;
3640 }
3641 
3642 static bool
3643 dlb_queues_empty(struct dlb_eventdev *dlb)
3644 {
3645 	int i;
3646 
3647 	for (i = 0; i < dlb->num_queues; i++) {
3648 		if (!dlb_queue_is_empty(dlb, &dlb->ev_queues[i]))
3649 			return false;
3650 	}
3651 
3652 	return true;
3653 }
3654 
3655 static void
3656 dlb_flush_port(struct rte_eventdev *dev, int port_id)
3657 {
3658 	struct dlb_eventdev *dlb = dlb_pmd_priv(dev);
3659 	eventdev_stop_flush_t flush;
3660 	struct rte_event ev;
3661 	uint8_t dev_id;
3662 	void *arg;
3663 	int i;
3664 
3665 	flush = dev->dev_ops->dev_stop_flush;
3666 	dev_id = dev->data->dev_id;
3667 	arg = dev->data->dev_stop_flush_arg;
3668 
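	/* Dequeue and discard everything held by this port, invoking the
	 * application's stop-flush callback (if any) on each event and
	 * releasing load-balanced events back to the device.
	 */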
3669 	while (rte_event_dequeue_burst(dev_id, port_id, &ev, 1, 0)) {
3670 		if (flush)
3671 			flush(dev_id, ev, arg);
3672 
3673 		if (dlb->ev_ports[port_id].qm_port.is_directed)
3674 			continue;
3675 
3676 		ev.op = RTE_EVENT_OP_RELEASE;
3677 
3678 		rte_event_enqueue_burst(dev_id, port_id, &ev, 1);
3679 	}
3680 
3681 	/* Enqueue any additional outstanding releases */
3682 	ev.op = RTE_EVENT_OP_RELEASE;
3683 
3684 	for (i = dlb->ev_ports[port_id].outstanding_releases; i > 0; i--)
3685 		rte_event_enqueue_burst(dev_id, port_id, &ev, 1);
3686 }
3687 
3688 static void
3689 dlb_drain(struct rte_eventdev *dev)
3690 {
3691 	struct dlb_eventdev *dlb = dlb_pmd_priv(dev);
3692 	struct dlb_eventdev_port *ev_port = NULL;
3693 	uint8_t dev_id;
3694 	int i;
3695 
3696 	dev_id = dev->data->dev_id;
3697 
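	/* Drain in two phases: flush every port until all linked queues are
	 * empty, then (below) drain any unlinked load-balanced queues by
	 * temporarily linking them to a load-balanced port.
	 */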
3698 	while (!dlb_linked_queues_empty(dlb)) {
3699 		/* Flush all the ev_ports, which will drain all their connected
3700 		 * queues.
3701 		 */
3702 		for (i = 0; i < dlb->num_ports; i++)
3703 			dlb_flush_port(dev, i);
3704 	}
3705 
3706 	/* The queues are empty, but there may be events left in the ports. */
3707 	for (i = 0; i < dlb->num_ports; i++)
3708 		dlb_flush_port(dev, i);
3709 
3710 	/* If the domain's queues are empty, we're done. */
3711 	if (dlb_queues_empty(dlb))
3712 		return;
3713 
3714 	/* Else, there must be at least one unlinked load-balanced queue.
3715 	 * Select a load-balanced port with which to drain the unlinked
3716 	 * queue(s).
3717 	 */
3718 	for (i = 0; i < dlb->num_ports; i++) {
3719 		ev_port = &dlb->ev_ports[i];
3720 
3721 		if (!ev_port->qm_port.is_directed)
3722 			break;
3723 	}
3724 
3725 	if (i == dlb->num_ports) {
3726 		DLB_LOG_ERR("internal error: no LDB ev_ports\n");
3727 		return;
3728 	}
3729 
3730 	rte_errno = 0;
3731 	rte_event_port_unlink(dev_id, ev_port->id, NULL, 0);
3732 
3733 	if (rte_errno) {
3734 		DLB_LOG_ERR("internal error: failed to unlink ev_port %d\n",
3735 			    ev_port->id);
3736 		return;
3737 	}
3738 
3739 	for (i = 0; i < dlb->num_queues; i++) {
3740 		uint8_t qid, prio;
3741 		int ret;
3742 
3743 		if (dlb_queue_is_empty(dlb, &dlb->ev_queues[i]))
3744 			continue;
3745 
3746 		qid = i;
3747 		prio = 0;
3748 
3749 		/* Link the ev_port to the queue */
3750 		ret = rte_event_port_link(dev_id, ev_port->id, &qid, &prio, 1);
3751 		if (ret != 1) {
3752 			DLB_LOG_ERR("internal error: failed to link ev_port %d to queue %d\n",
3753 				    ev_port->id, qid);
3754 			return;
3755 		}
3756 
3757 		/* Flush the queue */
3758 		while (!dlb_queue_is_empty(dlb, &dlb->ev_queues[i]))
3759 			dlb_flush_port(dev, ev_port->id);
3760 
3761 		/* Drain any extant events in the ev_port. */
3762 		dlb_flush_port(dev, ev_port->id);
3763 
3764 		/* Unlink the ev_port from the queue */
3765 		ret = rte_event_port_unlink(dev_id, ev_port->id, &qid, 1);
3766 		if (ret != 1) {
3767 			DLB_LOG_ERR("internal error: failed to unlink ev_port %d from queue %d\n",
3768 				    ev_port->id, qid);
3769 			return;
3770 		}
3771 	}
3772 }
3773 
3774 static void
3775 dlb_eventdev_stop(struct rte_eventdev *dev)
3776 {
3777 	struct dlb_eventdev *dlb = dlb_pmd_priv(dev);
3778 
3779 	rte_spinlock_lock(&dlb->qm_instance.resource_lock);
3780 
3781 	if (dlb->run_state == DLB_RUN_STATE_STOPPED) {
3782 		DLB_LOG_DBG("Internal error: already stopped\n");
3783 		rte_spinlock_unlock(&dlb->qm_instance.resource_lock);
3784 		return;
3785 	} else if (dlb->run_state != DLB_RUN_STATE_STARTED) {
3786 		DLB_LOG_ERR("Internal error: bad state %d for dev_stop\n",
3787 			    (int)dlb->run_state);
3788 		rte_spinlock_unlock(&dlb->qm_instance.resource_lock);
3789 		return;
3790 	}
3791 
3792 	dlb->run_state = DLB_RUN_STATE_STOPPING;
3793 
3794 	rte_spinlock_unlock(&dlb->qm_instance.resource_lock);
3795 
3796 	dlb_drain(dev);
3797 
3798 	dlb->run_state = DLB_RUN_STATE_STOPPED;
3799 }
3800 
3801 static int
3802 dlb_eventdev_close(struct rte_eventdev *dev)
3803 {
3804 	dlb_hw_reset_sched_domain(dev, false);
3805 
3806 	return 0;
3807 }
3808 
3809 static void
3810 dlb_eventdev_port_release(void *port)
3811 {
3812 	struct dlb_eventdev_port *ev_port = port;
3813 
3814 	if (ev_port) {
3815 		struct dlb_port *qm_port = &ev_port->qm_port;
3816 
3817 		if (qm_port->config_state == DLB_CONFIGURED)
3818 			dlb_free_qe_mem(qm_port);
3819 	}
3820 }
3821 
3822 static void
3823 dlb_eventdev_queue_release(struct rte_eventdev *dev, uint8_t id)
3824 {
3825 	RTE_SET_USED(dev);
3826 	RTE_SET_USED(id);
3827 
3828 	/* This function intentionally left blank. */
3829 }
3830 
3831 static int
3832 dlb_eventdev_timeout_ticks(struct rte_eventdev *dev, uint64_t ns,
3833 			   uint64_t *timeout_ticks)
3834 {
3835 	RTE_SET_USED(dev);
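	/* Convert the nanosecond timeout to timer cycles; cycles_per_ns is
	 * truncated to a whole number of cycles per ns.
	 */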
3836 	uint64_t cycles_per_ns = rte_get_timer_hz() / 1E9;
3837 
3838 	*timeout_ticks = ns * cycles_per_ns;
3839 
3840 	return 0;
3841 }
3842 
3843 void
3844 dlb_entry_points_init(struct rte_eventdev *dev)
3845 {
3846 	struct dlb_eventdev *dlb;
3847 
3848 	static struct rte_eventdev_ops dlb_eventdev_entry_ops = {
3849 		.dev_infos_get    = dlb_eventdev_info_get,
3850 		.dev_configure    = dlb_eventdev_configure,
3851 		.dev_start        = dlb_eventdev_start,
3852 		.dev_stop         = dlb_eventdev_stop,
3853 		.dev_close        = dlb_eventdev_close,
3854 		.queue_def_conf   = dlb_eventdev_queue_default_conf_get,
3855 		.port_def_conf    = dlb_eventdev_port_default_conf_get,
3856 		.queue_setup      = dlb_eventdev_queue_setup,
3857 		.queue_release    = dlb_eventdev_queue_release,
3858 		.port_setup       = dlb_eventdev_port_setup,
3859 		.port_release     = dlb_eventdev_port_release,
3860 		.port_link        = dlb_eventdev_port_link,
3861 		.port_unlink      = dlb_eventdev_port_unlink,
3862 		.port_unlinks_in_progress =
3863 				    dlb_eventdev_port_unlinks_in_progress,
3864 		.timeout_ticks    = dlb_eventdev_timeout_ticks,
3865 		.dump             = dlb_eventdev_dump,
3866 		.xstats_get       = dlb_eventdev_xstats_get,
3867 		.xstats_get_names = dlb_eventdev_xstats_get_names,
3868 		.xstats_get_by_name = dlb_eventdev_xstats_get_by_name,
3869 		.xstats_reset	    = dlb_eventdev_xstats_reset,
3870 		.dev_selftest     = test_dlb_eventdev,
3871 	};
3872 
3873 	/* Expose PMD's eventdev interface */
3874 	dev->dev_ops = &dlb_eventdev_entry_ops;
3875 
3876 	dev->enqueue = dlb_event_enqueue;
3877 	dev->enqueue_burst = dlb_event_enqueue_burst;
3878 	dev->enqueue_new_burst = dlb_event_enqueue_new_burst;
3879 	dev->enqueue_forward_burst = dlb_event_enqueue_forward_burst;
3880 	dev->dequeue = dlb_event_dequeue;
3881 	dev->dequeue_burst = dlb_event_dequeue_burst;
3882 
3883 	dlb = dev->data->dev_private;
3884 
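	/* If the device operates in sparse CQ mode, override the dequeue
	 * entry points with the sparse variants.
	 */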
3885 	if (dlb->poll_mode == DLB_CQ_POLL_MODE_SPARSE) {
3886 		dev->dequeue = dlb_event_dequeue_sparse;
3887 		dev->dequeue_burst = dlb_event_dequeue_burst_sparse;
3888 	}
3889 }
3890 
3891 int
3892 dlb_primary_eventdev_probe(struct rte_eventdev *dev,
3893 			   const char *name,
3894 			   struct dlb_devargs *dlb_args)
3895 {
3896 	struct dlb_eventdev *dlb;
3897 	int err, i;
3898 
3899 	dlb = dev->data->dev_private;
3900 
3901 	dlb->event_dev = dev; /* backlink */
3902 
3903 	evdev_dlb_default_info.driver_name = name;
3904 
3905 	dlb->max_num_events_override = dlb_args->max_num_events;
3906 	dlb->num_dir_credits_override = dlb_args->num_dir_credits_override;
3907 	dlb->defer_sched = dlb_args->defer_sched;
3908 	dlb->num_atm_inflights_per_queue = dlb_args->num_atm_inflights;
3909 
3910 	/* Open the interface.
3911 	 * For vdev mode, this means open the dlb kernel module.
3912 	 */
3913 	err = dlb_iface_open(&dlb->qm_instance, name);
3914 	if (err < 0) {
3915 		DLB_LOG_ERR("could not open event hardware device, err=%d\n",
3916 			    err);
3917 		return err;
3918 	}
3919 
3920 	err = dlb_iface_get_device_version(&dlb->qm_instance, &dlb->revision);
3921 	if (err < 0) {
3922 		DLB_LOG_ERR("dlb: failed to get the device version, err=%d\n",
3923 			    err);
3924 		return err;
3925 	}
3926 
3927 	err = dlb_hw_query_resources(dlb);
3928 	if (err) {
3929 		DLB_LOG_ERR("get resources err=%d for %s\n", err, name);
3930 		return err;
3931 	}
3932 
3933 	err = dlb_iface_get_cq_poll_mode(&dlb->qm_instance, &dlb->poll_mode);
3934 	if (err < 0) {
3935 		DLB_LOG_ERR("dlb: failed to get the poll mode, err=%d\n", err);
3936 		return err;
3937 	}
3938 
3939 	/* Complete xstats runtime initialization */
3940 	err = dlb_xstats_init(dlb);
3941 	if (err) {
3942 		DLB_LOG_ERR("dlb: failed to init xstats, err=%d\n", err);
3943 		return err;
3944 	}
3945 
3946 	/* Initialize each port's token pop mode */
3947 	for (i = 0; i < DLB_MAX_NUM_PORTS; i++)
3948 		dlb->ev_ports[i].qm_port.token_pop_mode = AUTO_POP;
3949 
3950 	rte_spinlock_init(&dlb->qm_instance.resource_lock);
3951 
3952 	dlb_iface_low_level_io_init(dlb);
3953 
3954 	dlb_entry_points_init(dev);
3955 
3956 	return 0;
3957 }
3958 
3959 int
3960 dlb_secondary_eventdev_probe(struct rte_eventdev *dev,
3961 			     const char *name)
3962 {
3963 	struct dlb_eventdev *dlb;
3964 	int err;
3965 
3966 	dlb = dev->data->dev_private;
3967 
3968 	evdev_dlb_default_info.driver_name = name;
3969 
3970 	err = dlb_iface_open(&dlb->qm_instance, name);
3971 	if (err < 0) {
3972 		DLB_LOG_ERR("could not open event hardware device, err=%d\n",
3973 			    err);
3974 		return err;
3975 	}
3976 
3977 	err = dlb_hw_query_resources(dlb);
3978 	if (err) {
3979 		DLB_LOG_ERR("get resources err=%d for %s\n", err, name);
3980 		return err;
3981 	}
3982 
3983 	dlb_iface_low_level_io_init(dlb);
3984 
3985 	dlb_entry_points_init(dev);
3986 
3987 	return 0;
3988 }
3989 
3990 int
3991 dlb_parse_params(const char *params,
3992 		 const char *name,
3993 		 struct dlb_devargs *dlb_args)
3994 {
3995 	int ret = 0;
3996 	static const char * const args[] = { NUMA_NODE_ARG,
3997 					     DLB_MAX_NUM_EVENTS,
3998 					     DLB_NUM_DIR_CREDITS,
3999 					     DEV_ID_ARG,
4000 					     DLB_DEFER_SCHED_ARG,
4001 					     DLB_NUM_ATM_INFLIGHTS_ARG,
4002 					     NULL };
4003 
4004 	if (params && params[0] != '\0') {
4005 		struct rte_kvargs *kvlist = rte_kvargs_parse(params, args);
4006 
4007 		if (kvlist == NULL) {
4008 			DLB_LOG_INFO("Ignoring unsupported parameters when creating device '%s'\n",
4009 				     name);
4010 		} else {
4011 			int ret = rte_kvargs_process(kvlist, NUMA_NODE_ARG,
4012 						     set_numa_node,
4013 						     &dlb_args->socket_id);
4014 			if (ret != 0) {
4015 				DLB_LOG_ERR("%s: Error parsing numa node parameter",
4016 					    name);
4017 				rte_kvargs_free(kvlist);
4018 				return ret;
4019 			}
4020 
4021 			ret = rte_kvargs_process(kvlist, DLB_MAX_NUM_EVENTS,
4022 						 set_max_num_events,
4023 						 &dlb_args->max_num_events);
4024 			if (ret != 0) {
4025 				DLB_LOG_ERR("%s: Error parsing max_num_events parameter",
4026 					    name);
4027 				rte_kvargs_free(kvlist);
4028 				return ret;
4029 			}
4030 
4031 			ret = rte_kvargs_process(kvlist,
4032 					DLB_NUM_DIR_CREDITS,
4033 					set_num_dir_credits,
4034 					&dlb_args->num_dir_credits_override);
4035 			if (ret != 0) {
4036 				DLB_LOG_ERR("%s: Error parsing num_dir_credits parameter",
4037 					    name);
4038 				rte_kvargs_free(kvlist);
4039 				return ret;
4040 			}
4041 
4042 			ret = rte_kvargs_process(kvlist, DEV_ID_ARG,
4043 						 set_dev_id,
4044 						 &dlb_args->dev_id);
4045 			if (ret != 0) {
4046 				DLB_LOG_ERR("%s: Error parsing dev_id parameter",
4047 					    name);
4048 				rte_kvargs_free(kvlist);
4049 				return ret;
4050 			}
4051 
4052 			ret = rte_kvargs_process(kvlist, DLB_DEFER_SCHED_ARG,
4053 						 set_defer_sched,
4054 						 &dlb_args->defer_sched);
4055 			if (ret != 0) {
4056 				DLB_LOG_ERR("%s: Error parsing defer_sched parameter",
4057 					    name);
4058 				rte_kvargs_free(kvlist);
4059 				return ret;
4060 			}
4061 
4062 			ret = rte_kvargs_process(kvlist,
4063 						 DLB_NUM_ATM_INFLIGHTS_ARG,
4064 						 set_num_atm_inflights,
4065 						 &dlb_args->num_atm_inflights);
4066 			if (ret != 0) {
4067 				DLB_LOG_ERR("%s: Error parsing atm_inflights parameter",
4068 					    name);
4069 				rte_kvargs_free(kvlist);
4070 				return ret;
4071 			}
4072 
4073 			rte_kvargs_free(kvlist);
4074 		}
4075 	}
4076 	return ret;
4077 }
4078 RTE_LOG_REGISTER(eventdev_dlb_log_level, pmd.event.dlb, NOTICE);
4079