1 /* SPDX-License-Identifier: BSD-3-Clause
2 * Copyright(c) 2016-2020 Intel Corporation
3 */
4
5 #include <assert.h>
6 #include <errno.h>
#include <limits.h>
7 #include <nmmintrin.h>
8 #include <pthread.h>
9 #include <stdbool.h>
10 #include <stdint.h>
11 #include <stdio.h>
#include <stdlib.h>
12 #include <string.h>
13 #include <sys/fcntl.h>
14 #include <sys/mman.h>
15 #include <unistd.h>
16
17 #include <rte_common.h>
18 #include <rte_config.h>
19 #include <rte_cycles.h>
20 #include <rte_debug.h>
21 #include <rte_dev.h>
22 #include <rte_errno.h>
23 #include <rte_io.h>
24 #include <rte_kvargs.h>
25 #include <rte_log.h>
26 #include <rte_malloc.h>
27 #include <rte_mbuf.h>
28 #include <rte_power_intrinsics.h>
29 #include <rte_prefetch.h>
30 #include <rte_ring.h>
31 #include <rte_string_fns.h>
32
33 #include <rte_eventdev.h>
34 #include <rte_eventdev_pmd.h>
35
36 #include "dlb_priv.h"
37 #include "dlb_iface.h"
38 #include "dlb_inline_fns.h"
39
40 /*
41 * Resources exposed to eventdev.
42 */
43 #if (RTE_EVENT_MAX_QUEUES_PER_DEV > UINT8_MAX)
44 #error "RTE_EVENT_MAX_QUEUES_PER_DEV cannot fit in member max_event_queues"
45 #endif
46 static struct rte_event_dev_info evdev_dlb_default_info = {
47 .driver_name = "", /* probe will set */
48 .min_dequeue_timeout_ns = DLB_MIN_DEQUEUE_TIMEOUT_NS,
49 .max_dequeue_timeout_ns = DLB_MAX_DEQUEUE_TIMEOUT_NS,
50 #if (RTE_EVENT_MAX_QUEUES_PER_DEV < DLB_MAX_NUM_LDB_QUEUES)
51 .max_event_queues = RTE_EVENT_MAX_QUEUES_PER_DEV,
52 #else
53 .max_event_queues = DLB_MAX_NUM_LDB_QUEUES,
54 #endif
55 .max_event_queue_flows = DLB_MAX_NUM_FLOWS,
56 .max_event_queue_priority_levels = DLB_QID_PRIORITIES,
57 .max_event_priority_levels = DLB_QID_PRIORITIES,
58 .max_event_ports = DLB_MAX_NUM_LDB_PORTS,
59 .max_event_port_dequeue_depth = DLB_MAX_CQ_DEPTH,
60 .max_event_port_enqueue_depth = DLB_MAX_ENQUEUE_DEPTH,
61 .max_event_port_links = DLB_MAX_NUM_QIDS_PER_LDB_CQ,
62 .max_num_events = DLB_MAX_NUM_LDB_CREDITS,
63 .max_single_link_event_port_queue_pairs = DLB_MAX_NUM_DIR_PORTS,
64 .event_dev_cap = (RTE_EVENT_DEV_CAP_QUEUE_QOS |
65 RTE_EVENT_DEV_CAP_EVENT_QOS |
66 RTE_EVENT_DEV_CAP_BURST_MODE |
67 RTE_EVENT_DEV_CAP_DISTRIBUTED_SCHED |
68 RTE_EVENT_DEV_CAP_IMPLICIT_RELEASE_DISABLE |
69 RTE_EVENT_DEV_CAP_QUEUE_ALL_TYPES),
70 };
71
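/* Per-process data for each hardware producer port (e.g. the memzone
 * mapping freed in dlb_free_qe_mem()), indexed by port ID and port type
 * (load-balanced or directed).
 */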
72 struct process_local_port_data
73 dlb_port[DLB_MAX_NUM_PORTS][NUM_DLB_PORT_TYPES];
74
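/* Forward declarations of the delayed-pop enqueue variants that
 * dlb_hw_create_ldb_port() installs when a port's token_pop_mode is
 * DELAYED_POP.
 */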
75 static inline uint16_t
76 dlb_event_enqueue_delayed(void *event_port,
77 const struct rte_event events[]);
78
79 static inline uint16_t
80 dlb_event_enqueue_burst_delayed(void *event_port,
81 const struct rte_event events[],
82 uint16_t num);
83
84 static inline uint16_t
85 dlb_event_enqueue_new_burst_delayed(void *event_port,
86 const struct rte_event events[],
87 uint16_t num);
88
89 static inline uint16_t
90 dlb_event_enqueue_forward_burst_delayed(void *event_port,
91 const struct rte_event events[],
92 uint16_t num);
93
94 static int
95 dlb_hw_query_resources(struct dlb_eventdev *dlb)
96 {
97 struct dlb_hw_dev *handle = &dlb->qm_instance;
98 struct dlb_hw_resource_info *dlb_info = &handle->info;
99 int ret;
100
101 ret = dlb_iface_get_num_resources(handle,
102 &dlb->hw_rsrc_query_results);
103 if (ret) {
104 DLB_LOG_ERR("get dlb num resources, err=%d\n", ret);
105 return ret;
106 }
107
108 /* Complete filling in device resource info returned to evdev app,
109 * overriding any default values.
110 * The capabilities (CAPs) were set at compile time.
111 */
112
113 evdev_dlb_default_info.max_event_queues =
114 dlb->hw_rsrc_query_results.num_ldb_queues;
115
116 evdev_dlb_default_info.max_event_ports =
117 dlb->hw_rsrc_query_results.num_ldb_ports;
118
119 evdev_dlb_default_info.max_num_events =
120 dlb->hw_rsrc_query_results.max_contiguous_ldb_credits;
121
122 /* Save off values used when creating the scheduling domain. */
123
124 handle->info.num_sched_domains =
125 dlb->hw_rsrc_query_results.num_sched_domains;
126
127 handle->info.hw_rsrc_max.nb_events_limit =
128 dlb->hw_rsrc_query_results.max_contiguous_ldb_credits;
129
130 handle->info.hw_rsrc_max.num_queues =
131 dlb->hw_rsrc_query_results.num_ldb_queues +
132 dlb->hw_rsrc_query_results.num_dir_ports;
133
134 handle->info.hw_rsrc_max.num_ldb_queues =
135 dlb->hw_rsrc_query_results.num_ldb_queues;
136
137 handle->info.hw_rsrc_max.num_ldb_ports =
138 dlb->hw_rsrc_query_results.num_ldb_ports;
139
140 handle->info.hw_rsrc_max.num_dir_ports =
141 dlb->hw_rsrc_query_results.num_dir_ports;
142
143 handle->info.hw_rsrc_max.reorder_window_size =
144 dlb->hw_rsrc_query_results.num_hist_list_entries;
145
146 rte_memcpy(dlb_info, &handle->info.hw_rsrc_max, sizeof(*dlb_info));
147
148 return 0;
149 }
150
151 static void
152 dlb_free_qe_mem(struct dlb_port *qm_port)
153 {
154 if (qm_port == NULL)
155 return;
156
157 rte_free(qm_port->qe4);
158 qm_port->qe4 = NULL;
159
160 rte_free(qm_port->consume_qe);
161 qm_port->consume_qe = NULL;
162
163 rte_memzone_free(dlb_port[qm_port->id][PORT_TYPE(qm_port)].mz);
164 dlb_port[qm_port->id][PORT_TYPE(qm_port)].mz = NULL;
165 }
166
167 static int
168 dlb_init_consume_qe(struct dlb_port *qm_port, char *mz_name)
169 {
170 struct dlb_cq_pop_qe *qe;
171
172 qe = rte_zmalloc(mz_name,
173 DLB_NUM_QES_PER_CACHE_LINE *
174 sizeof(struct dlb_cq_pop_qe),
175 RTE_CACHE_LINE_SIZE);
176
177 if (qe == NULL) {
178 DLB_LOG_ERR("dlb: no memory for consume_qe\n");
179 return -ENOMEM;
180 }
181
182 qm_port->consume_qe = qe;
183
184 qe->qe_valid = 0;
185 qe->qe_frag = 0;
186 qe->qe_comp = 0;
187 qe->cq_token = 1;
188 /* Tokens value is 0-based; i.e. '0' returns 1 token, '1' returns 2,
189 * and so on.
190 */
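/* Example: to have this pop QE return 4 tokens, the fast path writes
 * qe->tokens = 4 - 1 = 3 before issuing it.
 */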
191 qe->tokens = 0; /* set at run time */
192 qe->meas_lat = 0;
193 qe->no_dec = 0;
194 /* Completion IDs are disabled */
195 qe->cmp_id = 0;
196
197 return 0;
198 }
199
200 static int
201 dlb_init_qe_mem(struct dlb_port *qm_port, char *mz_name)
202 {
203 int ret, sz;
204
205 sz = DLB_NUM_QES_PER_CACHE_LINE * sizeof(struct dlb_enqueue_qe);
206
207 qm_port->qe4 = rte_zmalloc(mz_name, sz, RTE_CACHE_LINE_SIZE);
208
209 if (qm_port->qe4 == NULL) {
210 DLB_LOG_ERR("dlb: no qe4 memory\n");
211 ret = -ENOMEM;
212 goto error_exit;
213 }
214
215 ret = dlb_init_consume_qe(qm_port, mz_name);
216 if (ret < 0) {
217 DLB_LOG_ERR("dlb: dlb_init_consume_qe ret=%d\n", ret);
218 goto error_exit;
219 }
220
221 return 0;
222
223 error_exit:
224
225 dlb_free_qe_mem(qm_port);
226
227 return ret;
228 }
229
230 /* Wrapper for string to int conversion. Substituted for atoi(...), which is
231 * unsafe.
232 */
233 #define DLB_BASE_10 10
234
235 static int
236 dlb_string_to_int(int *result, const char *str)
237 {
238 long ret;
239 char *endstr;
240
241 if (str == NULL || result == NULL)
242 return -EINVAL;
243
244 errno = 0;
245 ret = strtol(str, &endstr, DLB_BASE_10);
246 if (errno)
247 return -errno;
248
249 /* long int and int may be different width for some architectures */
250 if (ret < INT_MIN || ret > INT_MAX || endstr == str)
251 return -EINVAL;
252
253 *result = ret;
254 return 0;
255 }
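/* Illustrative behaviour of the wrapper (not part of the driver logic):
 *
 *   int v;
 *   dlb_string_to_int(&v, "2048");                  returns 0, v == 2048
 *   dlb_string_to_int(&v, "foo");                   returns -EINVAL
 *   dlb_string_to_int(&v, "99999999999999999999");  returns -ERANGE
 */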
256
257 static int
258 set_numa_node(const char *key __rte_unused, const char *value, void *opaque)
259 {
260 int *socket_id = opaque;
261 int ret;
262
263 ret = dlb_string_to_int(socket_id, value);
264 if (ret < 0)
265 return ret;
266
267 if (*socket_id > RTE_MAX_NUMA_NODES)
268 return -EINVAL;
269
270 return 0;
271 }
272
273 static int
274 set_max_num_events(const char *key __rte_unused,
275 const char *value,
276 void *opaque)
277 {
278 int *max_num_events = opaque;
279 int ret;
280
281 if (value == NULL || opaque == NULL) {
282 DLB_LOG_ERR("NULL pointer\n");
283 return -EINVAL;
284 }
285
286 ret = dlb_string_to_int(max_num_events, value);
287 if (ret < 0)
288 return ret;
289
290 if (*max_num_events < 0 || *max_num_events > DLB_MAX_NUM_LDB_CREDITS) {
291 DLB_LOG_ERR("dlb: max_num_events must be between 0 and %d\n",
292 DLB_MAX_NUM_LDB_CREDITS);
293 return -EINVAL;
294 }
295
296 return 0;
297 }
298
299 static int
300 set_num_dir_credits(const char *key __rte_unused,
301 const char *value,
302 void *opaque)
303 {
304 int *num_dir_credits = opaque;
305 int ret;
306
307 if (value == NULL || opaque == NULL) {
308 DLB_LOG_ERR("NULL pointer\n");
309 return -EINVAL;
310 }
311
312 ret = dlb_string_to_int(num_dir_credits, value);
313 if (ret < 0)
314 return ret;
315
316 if (*num_dir_credits < 0 ||
317 *num_dir_credits > DLB_MAX_NUM_DIR_CREDITS) {
318 DLB_LOG_ERR("dlb: num_dir_credits must be between 0 and %d\n",
319 DLB_MAX_NUM_DIR_CREDITS);
320 return -EINVAL;
321 }
322 return 0;
323 }
324
325 /* VDEV-only notes:
326 * This function first unmaps all memory mappings and closes the
327 * domain's file descriptor, which causes the driver to reset the
328 * scheduling domain. Once that completes (when close() returns), we
329 * can safely free the dynamically allocated memory used by the
330 * scheduling domain.
331 *
332 * PF-only notes:
333 * We will maintain a use count and use that to determine when
334 * a reset is required. In PF mode, we never mmap, or munmap
335 * device memory, and we own the entire physical PCI device.
336 */
337
338 static void
339 dlb_hw_reset_sched_domain(const struct rte_eventdev *dev, bool reconfig)
340 {
341 struct dlb_eventdev *dlb = dlb_pmd_priv(dev);
342 enum dlb_configuration_state config_state;
343 int i, j;
344
345 /* Close and reset the domain */
346 dlb_iface_domain_close(dlb);
347
348 /* Free all dynamically allocated port memory */
349 for (i = 0; i < dlb->num_ports; i++)
350 dlb_free_qe_mem(&dlb->ev_ports[i].qm_port);
351
352 /* If reconfiguring, mark the device's queues and ports as "previously
353 * configured." If the user does not reconfigure them, the PMD will
354 * reapply their previous configuration when the device is started.
355 */
356 config_state = (reconfig) ? DLB_PREV_CONFIGURED : DLB_NOT_CONFIGURED;
357
358 for (i = 0; i < dlb->num_ports; i++) {
359 dlb->ev_ports[i].qm_port.config_state = config_state;
360 /* Reset setup_done so ports can be reconfigured */
361 dlb->ev_ports[i].setup_done = false;
362 for (j = 0; j < DLB_MAX_NUM_QIDS_PER_LDB_CQ; j++)
363 dlb->ev_ports[i].link[j].mapped = false;
364 }
365
366 for (i = 0; i < dlb->num_queues; i++)
367 dlb->ev_queues[i].qm_queue.config_state = config_state;
368
369 for (i = 0; i < DLB_MAX_NUM_QUEUES; i++)
370 dlb->ev_queues[i].setup_done = false;
371
372 dlb->num_ports = 0;
373 dlb->num_ldb_ports = 0;
374 dlb->num_dir_ports = 0;
375 dlb->num_queues = 0;
376 dlb->num_ldb_queues = 0;
377 dlb->num_dir_queues = 0;
378 dlb->configured = false;
379 }
380
381 static int
382 dlb_ldb_credit_pool_create(struct dlb_hw_dev *handle)
383 {
384 struct dlb_create_ldb_pool_args cfg;
385 struct dlb_cmd_response response;
386 int ret;
387
388 if (handle == NULL)
389 return -EINVAL;
390
391 if (!handle->cfg.resources.num_ldb_credits) {
392 handle->cfg.ldb_credit_pool_id = 0;
393 handle->cfg.num_ldb_credits = 0;
394 return 0;
395 }
396
397 cfg.response = (uintptr_t)&response;
398 cfg.num_ldb_credits = handle->cfg.resources.num_ldb_credits;
399
400 ret = dlb_iface_ldb_credit_pool_create(handle,
401 &cfg);
402 if (ret < 0) {
403 DLB_LOG_ERR("dlb: ldb_credit_pool_create ret=%d (driver status: %s)\n",
404 ret, dlb_error_strings[response.status]);
405 }
406
407 handle->cfg.ldb_credit_pool_id = response.id;
408 handle->cfg.num_ldb_credits = cfg.num_ldb_credits;
409
410 return ret;
411 }
412
413 static int
414 dlb_dir_credit_pool_create(struct dlb_hw_dev *handle)
415 {
416 struct dlb_create_dir_pool_args cfg;
417 struct dlb_cmd_response response;
418 int ret;
419
420 if (handle == NULL)
421 return -EINVAL;
422
423 if (!handle->cfg.resources.num_dir_credits) {
424 handle->cfg.dir_credit_pool_id = 0;
425 handle->cfg.num_dir_credits = 0;
426 return 0;
427 }
428
429 cfg.response = (uintptr_t)&response;
430 cfg.num_dir_credits = handle->cfg.resources.num_dir_credits;
431
432 ret = dlb_iface_dir_credit_pool_create(handle, &cfg);
433 if (ret < 0)
434 DLB_LOG_ERR("dlb: dir_credit_pool_create ret=%d (driver status: %s)\n",
435 ret, dlb_error_strings[response.status]);
436
437 handle->cfg.dir_credit_pool_id = response.id;
438 handle->cfg.num_dir_credits = cfg.num_dir_credits;
439
440 return ret;
441 }
442
443 static int
444 dlb_hw_create_sched_domain(struct dlb_hw_dev *handle,
445 struct dlb_eventdev *dlb,
446 const struct dlb_hw_rsrcs *resources_asked)
447 {
448 int ret = 0;
449 struct dlb_create_sched_domain_args *config_params;
450 struct dlb_cmd_response response;
451
452 if (resources_asked == NULL) {
453 DLB_LOG_ERR("dlb: dlb_create NULL parameter\n");
454 ret = -EINVAL;
455 goto error_exit;
456 }
457
458 /* Map generic qm resources to dlb resources */
459 config_params = &handle->cfg.resources;
460
461 config_params->response = (uintptr_t)&response;
462
463 /* DIR ports and queues */
464
465 config_params->num_dir_ports =
466 resources_asked->num_dir_ports;
467
468 config_params->num_dir_credits =
469 resources_asked->num_dir_credits;
470
471 /* LDB ports and queues */
472
473 config_params->num_ldb_queues =
474 resources_asked->num_ldb_queues;
475
476 config_params->num_ldb_ports =
477 resources_asked->num_ldb_ports;
478
479 config_params->num_ldb_credits =
480 resources_asked->num_ldb_credits;
481
482 config_params->num_atomic_inflights =
483 dlb->num_atm_inflights_per_queue *
484 config_params->num_ldb_queues;
485
486 config_params->num_hist_list_entries = config_params->num_ldb_ports *
487 DLB_NUM_HIST_LIST_ENTRIES_PER_LDB_PORT;
488
489 /* dlb limited to 1 credit pool per queue type */
490 config_params->num_ldb_credit_pools = 1;
491 config_params->num_dir_credit_pools = 1;
492
493 DLB_LOG_DBG("sched domain create - ldb_qs=%d, ldb_ports=%d, dir_ports=%d, atomic_inflights=%d, hist_list_entries=%d, ldb_credits=%d, dir_credits=%d, ldb_cred_pools=%d, dir_credit_pools=%d\n",
494 config_params->num_ldb_queues,
495 config_params->num_ldb_ports,
496 config_params->num_dir_ports,
497 config_params->num_atomic_inflights,
498 config_params->num_hist_list_entries,
499 config_params->num_ldb_credits,
500 config_params->num_dir_credits,
501 config_params->num_ldb_credit_pools,
502 config_params->num_dir_credit_pools);
503
504 /* Configure the QM */
505
506 ret = dlb_iface_sched_domain_create(handle, config_params);
507 if (ret < 0) {
508 DLB_LOG_ERR("dlb: domain create failed, device_id = %d, (driver ret = %d, extra status: %s)\n",
509 handle->device_id,
510 ret,
511 dlb_error_strings[response.status]);
512 goto error_exit;
513 }
514
515 handle->domain_id = response.id;
516 handle->domain_id_valid = 1;
517
518 config_params->response = 0;
519
520 ret = dlb_ldb_credit_pool_create(handle);
521 if (ret < 0) {
522 DLB_LOG_ERR("dlb: create ldb credit pool failed\n");
523 goto error_exit2;
524 }
525
526 ret = dlb_dir_credit_pool_create(handle);
527 if (ret < 0) {
528 DLB_LOG_ERR("dlb: create dir credit pool failed\n");
529 goto error_exit2;
530 }
531
532 handle->cfg.configured = true;
533
534 return 0;
535
536 error_exit2:
537 dlb_iface_domain_close(dlb);
538
539 error_exit:
540 return ret;
541 }
542
543 /* End HW specific */
544 static void
545 dlb_eventdev_info_get(struct rte_eventdev *dev,
546 struct rte_event_dev_info *dev_info)
547 {
548 struct dlb_eventdev *dlb = dlb_pmd_priv(dev);
549 int ret;
550
551 ret = dlb_hw_query_resources(dlb);
552 if (ret) {
553 const struct rte_eventdev_data *data = dev->data;
554
555 DLB_LOG_ERR("get resources err=%d, devid=%d\n",
556 ret, data->dev_id);
557 /* fn is void, so fall through and return values set up in
558 * probe
559 */
560 }
561
562 /* Add num resources currently owned by this domain.
563 * These would become available if the scheduling domain were reset due
564 * to the application calling eventdev_configure again to *reconfigure* the
565 * domain.
566 */
567 evdev_dlb_default_info.max_event_ports += dlb->num_ldb_ports;
568 evdev_dlb_default_info.max_event_queues += dlb->num_ldb_queues;
569 evdev_dlb_default_info.max_num_events += dlb->num_ldb_credits;
570
571 /* In DLB A-stepping hardware, applications are limited to 128
572 * configured ports (load-balanced or directed). The reported number of
573 * available ports must reflect this.
574 */
575 if (dlb->revision < DLB_REV_B0) {
576 int used_ports;
577
578 used_ports = DLB_MAX_NUM_LDB_PORTS + DLB_MAX_NUM_DIR_PORTS -
579 dlb->hw_rsrc_query_results.num_ldb_ports -
580 dlb->hw_rsrc_query_results.num_dir_ports;
581
582 evdev_dlb_default_info.max_event_ports =
583 RTE_MIN(evdev_dlb_default_info.max_event_ports,
584 128 - used_ports);
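/* Example: if 16 ports of either type are already configured,
 * used_ports = 16 and at most 128 - 16 = 112 ports are reported.
 */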
585 }
586
587 evdev_dlb_default_info.max_event_queues =
588 RTE_MIN(evdev_dlb_default_info.max_event_queues,
589 RTE_EVENT_MAX_QUEUES_PER_DEV);
590
591 evdev_dlb_default_info.max_num_events =
592 RTE_MIN(evdev_dlb_default_info.max_num_events,
593 dlb->max_num_events_override);
594
595 *dev_info = evdev_dlb_default_info;
596 }
597
598 /* Note: 1 QM instance per QM device, QM instance/device == event device */
599 static int
600 dlb_eventdev_configure(const struct rte_eventdev *dev)
601 {
602 struct dlb_eventdev *dlb = dlb_pmd_priv(dev);
603 struct dlb_hw_dev *handle = &dlb->qm_instance;
604 struct dlb_hw_rsrcs *rsrcs = &handle->info.hw_rsrc_max;
605 const struct rte_eventdev_data *data = dev->data;
606 const struct rte_event_dev_config *config = &data->dev_conf;
607 int ret;
608
609 /* If this eventdev is already configured, we must release the current
610 * scheduling domain before attempting to configure a new one.
611 */
612 if (dlb->configured) {
613 dlb_hw_reset_sched_domain(dev, true);
614
615 ret = dlb_hw_query_resources(dlb);
616 if (ret) {
617 DLB_LOG_ERR("get resources err=%d, devid=%d\n",
618 ret, data->dev_id);
619 return ret;
620 }
621 }
622
623 if (config->nb_event_queues > rsrcs->num_queues) {
624 DLB_LOG_ERR("nb_event_queues parameter (%d) exceeds the QM device's capabilities (%d).\n",
625 config->nb_event_queues,
626 rsrcs->num_queues);
627 return -EINVAL;
628 }
629 if (config->nb_event_ports > (rsrcs->num_ldb_ports
630 + rsrcs->num_dir_ports)) {
631 DLB_LOG_ERR("nb_event_ports parameter (%d) exceeds the QM device's capabilities (%d).\n",
632 config->nb_event_ports,
633 (rsrcs->num_ldb_ports + rsrcs->num_dir_ports));
634 return -EINVAL;
635 }
636 if (config->nb_events_limit > rsrcs->nb_events_limit) {
637 DLB_LOG_ERR("nb_events_limit parameter (%d) exceeds the QM device's capabilities (%d).\n",
638 config->nb_events_limit,
639 rsrcs->nb_events_limit);
640 return -EINVAL;
641 }
642
643 if (config->event_dev_cfg & RTE_EVENT_DEV_CFG_PER_DEQUEUE_TIMEOUT)
644 dlb->global_dequeue_wait = false;
645 else {
646 uint32_t timeout32;
647
648 dlb->global_dequeue_wait = true;
649
650 timeout32 = config->dequeue_timeout_ns;
651
652 dlb->global_dequeue_wait_ticks =
653 timeout32 * (rte_get_timer_hz() / 1E9);
654 }
655
656 /* Does this platform support umonitor/umwait? */
657 if (rte_cpu_get_flag_enabled(RTE_CPUFLAG_WAITPKG)) {
658 if (RTE_LIBRTE_PMD_DLB_UMWAIT_CTL_STATE != 0 &&
659 RTE_LIBRTE_PMD_DLB_UMWAIT_CTL_STATE != 1) {
660 DLB_LOG_ERR("invalid value (%d) for RTE_LIBRTE_PMD_DLB_UMWAIT_CTL_STATE, must be 0 or 1\n",
661 RTE_LIBRTE_PMD_DLB_UMWAIT_CTL_STATE);
662 return -EINVAL;
663 }
664 dlb->umwait_allowed = true;
665 }
666
667 rsrcs->num_dir_ports = config->nb_single_link_event_port_queues;
668 rsrcs->num_ldb_ports = config->nb_event_ports - rsrcs->num_dir_ports;
669 /* 1 dir queue per dir port */
670 rsrcs->num_ldb_queues = config->nb_event_queues - rsrcs->num_dir_ports;
671
672 /* Scale down nb_events_limit by 4 for directed credits, since there
673 * are 4x as many load-balanced credits.
674 */
675 rsrcs->num_ldb_credits = 0;
676 rsrcs->num_dir_credits = 0;
677
678 if (rsrcs->num_ldb_queues)
679 rsrcs->num_ldb_credits = config->nb_events_limit;
680 if (rsrcs->num_dir_ports)
681 rsrcs->num_dir_credits = config->nb_events_limit / 4;
682 if (dlb->num_dir_credits_override != -1)
683 rsrcs->num_dir_credits = dlb->num_dir_credits_override;
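/* Worked example (no devarg override): nb_events_limit = 2048 gives
 * 2048 load-balanced credits and 2048 / 4 = 512 directed credits.
 */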
684
685 if (dlb_hw_create_sched_domain(handle, dlb, rsrcs) < 0) {
686 DLB_LOG_ERR("dlb_hw_create_sched_domain failed\n");
687 return -ENODEV;
688 }
689
690 dlb->new_event_limit = config->nb_events_limit;
691 __atomic_store_n(&dlb->inflights, 0, __ATOMIC_SEQ_CST);
692
693 /* Save number of ports/queues for this event dev */
694 dlb->num_ports = config->nb_event_ports;
695 dlb->num_queues = config->nb_event_queues;
696 dlb->num_dir_ports = rsrcs->num_dir_ports;
697 dlb->num_ldb_ports = dlb->num_ports - dlb->num_dir_ports;
698 dlb->num_ldb_queues = dlb->num_queues - dlb->num_dir_ports;
699 dlb->num_dir_queues = dlb->num_dir_ports;
700 dlb->num_ldb_credits = rsrcs->num_ldb_credits;
701 dlb->num_dir_credits = rsrcs->num_dir_credits;
702
703 dlb->configured = true;
704
705 return 0;
706 }
707
708 static int16_t
709 dlb_hw_unmap_ldb_qid_from_port(struct dlb_hw_dev *handle,
710 uint32_t qm_port_id,
711 uint16_t qm_qid)
712 {
713 struct dlb_unmap_qid_args cfg;
714 struct dlb_cmd_response response;
715 int32_t ret;
716
717 if (handle == NULL)
718 return -EINVAL;
719
720 cfg.response = (uintptr_t)&response;
721 cfg.port_id = qm_port_id;
722 cfg.qid = qm_qid;
723
724 ret = dlb_iface_unmap_qid(handle, &cfg);
725 if (ret < 0)
726 DLB_LOG_ERR("dlb: unmap qid error, ret=%d (driver status: %s)\n",
727 ret, dlb_error_strings[response.status]);
728
729 return ret;
730 }
731
732 static int
733 dlb_event_queue_detach_ldb(struct dlb_eventdev *dlb,
734 struct dlb_eventdev_port *ev_port,
735 struct dlb_eventdev_queue *ev_queue)
736 {
737 int ret, i;
738
739 /* Don't unlink until start time. */
740 if (dlb->run_state == DLB_RUN_STATE_STOPPED)
741 return 0;
742
743 for (i = 0; i < DLB_MAX_NUM_QIDS_PER_LDB_CQ; i++) {
744 if (ev_port->link[i].valid &&
745 ev_port->link[i].queue_id == ev_queue->id)
746 break; /* found */
747 }
748
749 /* This is expected with the eventdev API, which blindly attempts to
750 * unmap all queues.
751 */
752 if (i == DLB_MAX_NUM_QIDS_PER_LDB_CQ) {
753 DLB_LOG_DBG("dlb: ignoring LB QID %d not mapped for qm_port %d.\n",
754 ev_queue->qm_queue.id,
755 ev_port->qm_port.id);
756 return 0;
757 }
758
759 ret = dlb_hw_unmap_ldb_qid_from_port(&dlb->qm_instance,
760 ev_port->qm_port.id,
761 ev_queue->qm_queue.id);
762 if (!ret)
763 ev_port->link[i].mapped = false;
764
765 return ret;
766 }
767
768 static int
769 dlb_eventdev_port_unlink(struct rte_eventdev *dev, void *event_port,
770 uint8_t queues[], uint16_t nb_unlinks)
771 {
772 struct dlb_eventdev_port *ev_port = event_port;
773 struct dlb_eventdev *dlb;
774 int i;
775
776 RTE_SET_USED(dev);
777
778 if (!ev_port->setup_done) {
779 DLB_LOG_ERR("dlb: evport %d is not configured\n",
780 ev_port->id);
781 rte_errno = -EINVAL;
782 return 0;
783 }
784
785 if (queues == NULL || nb_unlinks == 0) {
786 DLB_LOG_DBG("dlb: queues is NULL or nb_unlinks is 0\n");
787 return 0; /* Ignore and return success */
788 }
789
790 if (ev_port->qm_port.is_directed) {
791 DLB_LOG_DBG("dlb: ignore unlink from dir port %d\n",
792 ev_port->id);
793 rte_errno = 0;
794 return nb_unlinks; /* as if success */
795 }
796
797 dlb = ev_port->dlb;
798
799 for (i = 0; i < nb_unlinks; i++) {
800 struct dlb_eventdev_queue *ev_queue;
801 int ret, j;
802
803 if (queues[i] >= dlb->num_queues) {
804 DLB_LOG_ERR("dlb: invalid queue id %d\n", queues[i]);
805 rte_errno = -EINVAL;
806 return i; /* return index of offending queue */
807 }
808
809 ev_queue = &dlb->ev_queues[queues[i]];
810
811 /* Does a link exist? */
812 for (j = 0; j < DLB_MAX_NUM_QIDS_PER_LDB_CQ; j++)
813 if (ev_port->link[j].queue_id == queues[i] &&
814 ev_port->link[j].valid)
815 break;
816
817 if (j == DLB_MAX_NUM_QIDS_PER_LDB_CQ)
818 continue;
819
820 ret = dlb_event_queue_detach_ldb(dlb, ev_port, ev_queue);
821 if (ret) {
822 DLB_LOG_ERR("unlink err=%d for port %d queue %d\n",
823 ret, ev_port->id, queues[i]);
824 rte_errno = -ENOENT;
825 return i; /* return index of offending queue */
826 }
827
828 ev_port->link[j].valid = false;
829 ev_port->num_links--;
830 ev_queue->num_links--;
831 }
832
833 return nb_unlinks;
834 }
835
836 static int
837 dlb_eventdev_port_unlinks_in_progress(struct rte_eventdev *dev,
838 void *event_port)
839 {
840 struct dlb_eventdev_port *ev_port = event_port;
841 struct dlb_eventdev *dlb;
842 struct dlb_hw_dev *handle;
843 struct dlb_pending_port_unmaps_args cfg;
844 struct dlb_cmd_response response;
845 int ret;
846
847 RTE_SET_USED(dev);
848
849 if (!ev_port->setup_done) {
850 DLB_LOG_ERR("dlb: evport %d is not configured\n",
851 ev_port->id);
852 rte_errno = -EINVAL;
853 return 0;
854 }
855
856 cfg.port_id = ev_port->qm_port.id;
857 cfg.response = (uintptr_t)&response;
858 dlb = ev_port->dlb;
859 handle = &dlb->qm_instance;
860 ret = dlb_iface_pending_port_unmaps(handle, &cfg);
861
862 if (ret < 0) {
863 DLB_LOG_ERR("dlb: num_unlinks_in_progress ret=%d (driver status: %s)\n",
864 ret, dlb_error_strings[response.status]);
865 return ret;
866 }
867
868 return response.id;
869 }
870
871 static void
872 dlb_eventdev_port_default_conf_get(struct rte_eventdev *dev,
873 uint8_t port_id,
874 struct rte_event_port_conf *port_conf)
875 {
876 RTE_SET_USED(port_id);
877 struct dlb_eventdev *dlb = dlb_pmd_priv(dev);
878
879 port_conf->new_event_threshold = dlb->new_event_limit;
880 port_conf->dequeue_depth = 32;
881 port_conf->enqueue_depth = DLB_MAX_ENQUEUE_DEPTH;
882 port_conf->event_port_cfg = 0;
883 }
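/* From the application side this default is consumed through the standard
 * eventdev calls, e.g. (sketch only):
 *
 *   struct rte_event_port_conf conf;
 *   rte_event_port_default_conf_get(dev_id, port_id, &conf);
 *   conf.dequeue_depth = 16;
 *   rte_event_port_setup(dev_id, port_id, &conf);
 */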
884
885 static void
886 dlb_eventdev_queue_default_conf_get(struct rte_eventdev *dev,
887 uint8_t queue_id,
888 struct rte_event_queue_conf *queue_conf)
889 {
890 RTE_SET_USED(dev);
891 RTE_SET_USED(queue_id);
892 queue_conf->nb_atomic_flows = 1024;
893 queue_conf->nb_atomic_order_sequences = 32;
894 queue_conf->event_queue_cfg = 0;
895 queue_conf->priority = 0;
896 }
897
898 static int
899 dlb_hw_create_ldb_port(struct dlb_eventdev *dlb,
900 struct dlb_eventdev_port *ev_port,
901 uint32_t dequeue_depth,
902 uint32_t cq_depth,
903 uint32_t enqueue_depth,
904 uint16_t rsvd_tokens,
905 bool use_rsvd_token_scheme)
906 {
907 struct dlb_hw_dev *handle = &dlb->qm_instance;
908 struct dlb_create_ldb_port_args cfg = {0};
909 struct dlb_cmd_response response = {0};
910 int ret;
911 struct dlb_port *qm_port = NULL;
912 char mz_name[RTE_MEMZONE_NAMESIZE];
913 uint32_t qm_port_id;
914
915 if (handle == NULL)
916 return -EINVAL;
917
918 if (cq_depth < DLB_MIN_LDB_CQ_DEPTH) {
919 DLB_LOG_ERR("dlb: invalid cq_depth, must be %d-%d\n",
920 DLB_MIN_LDB_CQ_DEPTH, DLB_MAX_INPUT_QUEUE_DEPTH);
921 return -EINVAL;
922 }
923
924 if (enqueue_depth < DLB_MIN_ENQUEUE_DEPTH) {
925 DLB_LOG_ERR("dlb: invalid enqueue_depth, must be at least %d\n",
926 DLB_MIN_ENQUEUE_DEPTH);
927 return -EINVAL;
928 }
929
930 rte_spinlock_lock(&handle->resource_lock);
931
932 cfg.response = (uintptr_t)&response;
933
934 /* We round up to the next power of 2 if necessary */
935 cfg.cq_depth = rte_align32pow2(cq_depth);
936 cfg.cq_depth_threshold = rsvd_tokens;
937
938 cfg.cq_history_list_size = DLB_NUM_HIST_LIST_ENTRIES_PER_LDB_PORT;
939
940 /* User controls the LDB high watermark via enqueue depth. The DIR high
941 * watermark is equal, unless the directed credit pool is too small.
942 */
943 cfg.ldb_credit_high_watermark = enqueue_depth;
944
945 /* If there are no directed ports, the kernel driver will ignore this
946 * port's directed credit settings. Don't use enqueue_depth if it would
947 * require more directed credits than are available.
948 */
949 cfg.dir_credit_high_watermark =
950 RTE_MIN(enqueue_depth,
951 handle->cfg.num_dir_credits / dlb->num_ports);
952
953 cfg.ldb_credit_quantum = cfg.ldb_credit_high_watermark / 2;
954 cfg.ldb_credit_low_watermark = RTE_MIN(16, cfg.ldb_credit_quantum);
955
956 cfg.dir_credit_quantum = cfg.dir_credit_high_watermark / 2;
957 cfg.dir_credit_low_watermark = RTE_MIN(16, cfg.dir_credit_quantum);
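/* Example: a high watermark of 64 yields a credit quantum of 32 and a
 * low watermark of 16 (the low watermark never exceeds 16).
 */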
958
959 /* Per QM values */
960
961 cfg.ldb_credit_pool_id = handle->cfg.ldb_credit_pool_id;
962 cfg.dir_credit_pool_id = handle->cfg.dir_credit_pool_id;
963
964 ret = dlb_iface_ldb_port_create(handle, &cfg, dlb->poll_mode);
965 if (ret < 0) {
966 DLB_LOG_ERR("dlb: dlb_ldb_port_create error, ret=%d (driver status: %s)\n",
967 ret, dlb_error_strings[response.status]);
968 goto error_exit;
969 }
970
971 qm_port_id = response.id;
972
973 DLB_LOG_DBG("dlb: ev_port %d uses qm LB port %d <<<<<\n",
974 ev_port->id, qm_port_id);
975
976 qm_port = &ev_port->qm_port;
977 qm_port->ev_port = ev_port; /* back ptr */
978 qm_port->dlb = dlb; /* back ptr */
979
980 /*
981 * Allocate and init local qe struct(s).
982 * Note: MOVDIR64 requires the enqueue QE (qe4) to be aligned.
983 */
984
985 snprintf(mz_name, sizeof(mz_name), "ldb_port%d",
986 ev_port->id);
987
988 ret = dlb_init_qe_mem(qm_port, mz_name);
989 if (ret < 0) {
990 DLB_LOG_ERR("dlb: init_qe_mem failed, ret=%d\n", ret);
991 goto error_exit;
992 }
993
994 qm_port->pp_mmio_base = DLB_LDB_PP_BASE + PAGE_SIZE * qm_port_id;
995 qm_port->id = qm_port_id;
996
997 /* The credit window is one high water mark of QEs */
998 qm_port->ldb_pushcount_at_credit_expiry = 0;
999 qm_port->cached_ldb_credits = cfg.ldb_credit_high_watermark;
1000 /* The credit window is one high water mark of QEs */
1001 qm_port->dir_pushcount_at_credit_expiry = 0;
1002 qm_port->cached_dir_credits = cfg.dir_credit_high_watermark;
1003 /* CQs with depth < 8 use an 8-entry queue, but withhold credits so
1004 * the effective depth is smaller.
1005 */
1006 qm_port->cq_depth = cfg.cq_depth <= 8 ? 8 : cfg.cq_depth;
1007 qm_port->cq_idx = 0;
1008 qm_port->cq_idx_unmasked = 0;
1009 if (dlb->poll_mode == DLB_CQ_POLL_MODE_SPARSE)
1010 qm_port->cq_depth_mask = (qm_port->cq_depth * 4) - 1;
1011 else
1012 qm_port->cq_depth_mask = qm_port->cq_depth - 1;
1013
1014 qm_port->gen_bit_shift = __builtin_popcount(qm_port->cq_depth_mask);
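/* Example: an 8-entry CQ gives cq_depth_mask = 7 (gen_bit_shift = 3) in
 * standard mode and 31 (gen_bit_shift = 5) in sparse mode.
 */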
1015 /* starting value of gen bit - it toggles at wrap time */
1016 qm_port->gen_bit = 1;
1017
1018 qm_port->use_rsvd_token_scheme = use_rsvd_token_scheme;
1019 qm_port->cq_rsvd_token_deficit = rsvd_tokens;
1020 qm_port->int_armed = false;
1021
1022 /* Save off for later use in info and lookup APIs. */
1023 qm_port->qid_mappings = &dlb->qm_ldb_to_ev_queue_id[0];
1024
1025 qm_port->dequeue_depth = dequeue_depth;
1026
1027 /* When using the reserved token scheme, token_pop_thresh is
1028 * initially 2 * dequeue_depth. Once the tokens are reserved,
1029 * the enqueue code re-assigns it to dequeue_depth.
1030 */
1031 qm_port->token_pop_thresh = cq_depth;
1032
1033 /* When the deferred scheduling vdev arg is selected, use deferred pop
1034 * for all single-entry CQs.
1035 */
1036 if (cfg.cq_depth == 1 || (cfg.cq_depth == 2 && use_rsvd_token_scheme)) {
1037 if (dlb->defer_sched)
1038 qm_port->token_pop_mode = DEFERRED_POP;
1039 }
1040
1041 /* The default enqueue functions do not include delayed-pop support for
1042 * performance reasons.
1043 */
1044 if (qm_port->token_pop_mode == DELAYED_POP) {
1045 dlb->event_dev->enqueue = dlb_event_enqueue_delayed;
1046 dlb->event_dev->enqueue_burst =
1047 dlb_event_enqueue_burst_delayed;
1048 dlb->event_dev->enqueue_new_burst =
1049 dlb_event_enqueue_new_burst_delayed;
1050 dlb->event_dev->enqueue_forward_burst =
1051 dlb_event_enqueue_forward_burst_delayed;
1052 }
1053
1054 qm_port->owed_tokens = 0;
1055 qm_port->issued_releases = 0;
1056
1057 /* update state */
1058 qm_port->state = PORT_STARTED; /* enabled at create time */
1059 qm_port->config_state = DLB_CONFIGURED;
1060
1061 qm_port->dir_credits = cfg.dir_credit_high_watermark;
1062 qm_port->ldb_credits = cfg.ldb_credit_high_watermark;
1063
1064 DLB_LOG_DBG("dlb: created ldb port %d, depth = %d, ldb credits=%d, dir credits=%d\n",
1065 qm_port_id,
1066 cq_depth,
1067 qm_port->ldb_credits,
1068 qm_port->dir_credits);
1069
1070 rte_spinlock_unlock(&handle->resource_lock);
1071
1072 return 0;
1073
1074 error_exit:
1075 if (qm_port) {
1076 dlb_free_qe_mem(qm_port);
1077 qm_port->pp_mmio_base = 0;
1078 }
1079
1080 rte_spinlock_unlock(&handle->resource_lock);
1081
1082 DLB_LOG_ERR("dlb: create ldb port failed!\n");
1083
1084 return ret;
1085 }
1086
1087 static int
1088 dlb_hw_create_dir_port(struct dlb_eventdev *dlb,
1089 struct dlb_eventdev_port *ev_port,
1090 uint32_t dequeue_depth,
1091 uint32_t cq_depth,
1092 uint32_t enqueue_depth,
1093 uint16_t rsvd_tokens,
1094 bool use_rsvd_token_scheme)
1095 {
1096 struct dlb_hw_dev *handle = &dlb->qm_instance;
1097 struct dlb_create_dir_port_args cfg = {0};
1098 struct dlb_cmd_response response = {0};
1099 int ret;
1100 struct dlb_port *qm_port = NULL;
1101 char mz_name[RTE_MEMZONE_NAMESIZE];
1102 uint32_t qm_port_id;
1103
1104 if (dlb == NULL || handle == NULL)
1105 return -EINVAL;
1106
1107 if (cq_depth < DLB_MIN_DIR_CQ_DEPTH) {
1108 DLB_LOG_ERR("dlb: invalid cq_depth, must be at least %d\n",
1109 DLB_MIN_DIR_CQ_DEPTH);
1110 return -EINVAL;
1111 }
1112
1113 if (enqueue_depth < DLB_MIN_ENQUEUE_DEPTH) {
1114 DLB_LOG_ERR("dlb: invalid enqueue_depth, must be at least %d\n",
1115 DLB_MIN_ENQUEUE_DEPTH);
1116 return -EINVAL;
1117 }
1118
1119 rte_spinlock_lock(&handle->resource_lock);
1120
1121 /* Directed queues are configured at link time. */
1122 cfg.queue_id = -1;
1123
1124 cfg.response = (uintptr_t)&response;
1125
1126 /* We round up to the next power of 2 if necessary */
1127 cfg.cq_depth = rte_align32pow2(cq_depth);
1128 cfg.cq_depth_threshold = rsvd_tokens;
1129
1130 /* User controls the LDB high watermark via enqueue depth. The DIR high
1131 * watermark is equal, unless the directed credit pool is too small.
1132 */
1133 cfg.ldb_credit_high_watermark = enqueue_depth;
1134
1135 /* Don't use enqueue_depth if it would require more directed credits
1136 * than are available.
1137 */
1138 cfg.dir_credit_high_watermark =
1139 RTE_MIN(enqueue_depth,
1140 handle->cfg.num_dir_credits / dlb->num_ports);
1141
1142 cfg.ldb_credit_quantum = cfg.ldb_credit_high_watermark / 2;
1143 cfg.ldb_credit_low_watermark = RTE_MIN(16, cfg.ldb_credit_quantum);
1144
1145 cfg.dir_credit_quantum = cfg.dir_credit_high_watermark / 2;
1146 cfg.dir_credit_low_watermark = RTE_MIN(16, cfg.dir_credit_quantum);
1147
1148 /* Per QM values */
1149
1150 cfg.ldb_credit_pool_id = handle->cfg.ldb_credit_pool_id;
1151 cfg.dir_credit_pool_id = handle->cfg.dir_credit_pool_id;
1152
1153 ret = dlb_iface_dir_port_create(handle, &cfg, dlb->poll_mode);
1154 if (ret < 0) {
1155 DLB_LOG_ERR("dlb: dlb_dir_port_create error, ret=%d (driver status: %s)\n",
1156 ret, dlb_error_strings[response.status]);
1157 goto error_exit;
1158 }
1159
1160 qm_port_id = response.id;
1161
1162 DLB_LOG_DBG("dlb: ev_port %d uses qm DIR port %d <<<<<\n",
1163 ev_port->id, qm_port_id);
1164
1165 qm_port = &ev_port->qm_port;
1166 qm_port->ev_port = ev_port; /* back ptr */
1167 qm_port->dlb = dlb; /* back ptr */
1168
1169 /*
1170 * Init local qe struct(s).
1171 * Note: MOVDIR64 requires the enqueue QE to be aligned
1172 */
1173
1174 snprintf(mz_name, sizeof(mz_name), "dir_port%d",
1175 ev_port->id);
1176
1177 ret = dlb_init_qe_mem(qm_port, mz_name);
1178
1179 if (ret < 0) {
1180 DLB_LOG_ERR("dlb: init_qe_mem failed, ret=%d\n", ret);
1181 goto error_exit;
1182 }
1183
1184 qm_port->pp_mmio_base = DLB_DIR_PP_BASE + PAGE_SIZE * qm_port_id;
1185 qm_port->id = qm_port_id;
1186
1187 /* The credit window is one high water mark of QEs */
1188 qm_port->ldb_pushcount_at_credit_expiry = 0;
1189 qm_port->cached_ldb_credits = cfg.ldb_credit_high_watermark;
1190 /* The credit window is one high water mark of QEs */
1191 qm_port->dir_pushcount_at_credit_expiry = 0;
1192 qm_port->cached_dir_credits = cfg.dir_credit_high_watermark;
1193 qm_port->cq_depth = cfg.cq_depth;
1194 qm_port->cq_idx = 0;
1195 qm_port->cq_idx_unmasked = 0;
1196 if (dlb->poll_mode == DLB_CQ_POLL_MODE_SPARSE)
1197 qm_port->cq_depth_mask = (cfg.cq_depth * 4) - 1;
1198 else
1199 qm_port->cq_depth_mask = cfg.cq_depth - 1;
1200
1201 qm_port->gen_bit_shift = __builtin_popcount(qm_port->cq_depth_mask);
1202 /* starting value of gen bit - it toggles at wrap time */
1203 qm_port->gen_bit = 1;
1204
1205 qm_port->use_rsvd_token_scheme = use_rsvd_token_scheme;
1206 qm_port->cq_rsvd_token_deficit = rsvd_tokens;
1207 qm_port->int_armed = false;
1208
1209 /* Save off for later use in info and lookup APIs. */
1210 qm_port->qid_mappings = &dlb->qm_dir_to_ev_queue_id[0];
1211
1212 qm_port->dequeue_depth = dequeue_depth;
1213
1214 /* Directed ports are auto-pop, by default. */
1215 qm_port->token_pop_mode = AUTO_POP;
1216 qm_port->owed_tokens = 0;
1217 qm_port->issued_releases = 0;
1218
1219 /* update state */
1220 qm_port->state = PORT_STARTED; /* enabled at create time */
1221 qm_port->config_state = DLB_CONFIGURED;
1222
1223 qm_port->dir_credits = cfg.dir_credit_high_watermark;
1224 qm_port->ldb_credits = cfg.ldb_credit_high_watermark;
1225
1226 DLB_LOG_DBG("dlb: created dir port %d, depth = %d cr=%d,%d\n",
1227 qm_port_id,
1228 cq_depth,
1229 cfg.dir_credit_high_watermark,
1230 cfg.ldb_credit_high_watermark);
1231
1232 rte_spinlock_unlock(&handle->resource_lock);
1233
1234 return 0;
1235
1236 error_exit:
1237 if (qm_port) {
1238 qm_port->pp_mmio_base = 0;
1239 dlb_free_qe_mem(qm_port);
1240 }
1241
1242 rte_spinlock_unlock(&handle->resource_lock);
1243
1244 DLB_LOG_ERR("dlb: create dir port failed!\n");
1245
1246 return ret;
1247 }
1248
1249 static int32_t
1250 dlb_hw_create_ldb_queue(struct dlb_eventdev *dlb,
1251 struct dlb_queue *queue,
1252 const struct rte_event_queue_conf *evq_conf)
1253 {
1254 struct dlb_hw_dev *handle = &dlb->qm_instance;
1255 struct dlb_create_ldb_queue_args cfg;
1256 struct dlb_cmd_response response;
1257 int32_t ret;
1258 uint32_t qm_qid;
1259 int sched_type = -1;
1260
1261 if (evq_conf == NULL)
1262 return -EINVAL;
1263
1264 if (evq_conf->event_queue_cfg & RTE_EVENT_QUEUE_CFG_ALL_TYPES) {
1265 if (evq_conf->nb_atomic_order_sequences != 0)
1266 sched_type = RTE_SCHED_TYPE_ORDERED;
1267 else
1268 sched_type = RTE_SCHED_TYPE_PARALLEL;
1269 } else
1270 sched_type = evq_conf->schedule_type;
1271
1272 cfg.response = (uintptr_t)&response;
1273 cfg.num_atomic_inflights = dlb->num_atm_inflights_per_queue;
1274 cfg.num_sequence_numbers = evq_conf->nb_atomic_order_sequences;
1275 cfg.num_qid_inflights = evq_conf->nb_atomic_order_sequences;
1276
1277 if (sched_type != RTE_SCHED_TYPE_ORDERED) {
1278 cfg.num_sequence_numbers = 0;
1279 cfg.num_qid_inflights = DLB_DEF_UNORDERED_QID_INFLIGHTS;
1280 }
1281
1282 ret = dlb_iface_ldb_queue_create(handle, &cfg);
1283 if (ret < 0) {
1284 DLB_LOG_ERR("dlb: create LB event queue error, ret=%d (driver status: %s)\n",
1285 ret, dlb_error_strings[response.status]);
1286 return -EINVAL;
1287 }
1288
1289 qm_qid = response.id;
1290
1291 /* Save off queue config for debug, resource lookups, and reconfig */
1292 queue->num_qid_inflights = cfg.num_qid_inflights;
1293 queue->num_atm_inflights = cfg.num_atomic_inflights;
1294
1295 queue->sched_type = sched_type;
1296 queue->config_state = DLB_CONFIGURED;
1297
1298 DLB_LOG_DBG("Created LB event queue %d, nb_inflights=%d, nb_seq=%d, qid inflights=%d\n",
1299 qm_qid,
1300 cfg.num_atomic_inflights,
1301 cfg.num_sequence_numbers,
1302 cfg.num_qid_inflights);
1303
1304 return qm_qid;
1305 }
1306
1307 static int32_t
1308 dlb_get_sn_allocation(struct dlb_eventdev *dlb, int group)
1309 {
1310 struct dlb_hw_dev *handle = &dlb->qm_instance;
1311 struct dlb_get_sn_allocation_args cfg;
1312 struct dlb_cmd_response response;
1313 int ret;
1314
1315 cfg.group = group;
1316 cfg.response = (uintptr_t)&response;
1317
1318 ret = dlb_iface_get_sn_allocation(handle, &cfg);
1319 if (ret < 0) {
1320 DLB_LOG_ERR("dlb: get_sn_allocation ret=%d (driver status: %s)\n",
1321 ret, dlb_error_strings[response.status]);
1322 return ret;
1323 }
1324
1325 return response.id;
1326 }
1327
1328 static int
1329 dlb_set_sn_allocation(struct dlb_eventdev *dlb, int group, int num)
1330 {
1331 struct dlb_hw_dev *handle = &dlb->qm_instance;
1332 struct dlb_set_sn_allocation_args cfg;
1333 struct dlb_cmd_response response;
1334 int ret;
1335
1336 cfg.num = num;
1337 cfg.group = group;
1338 cfg.response = (uintptr_t)&response;
1339
1340 ret = dlb_iface_set_sn_allocation(handle, &cfg);
1341 if (ret < 0) {
1342 DLB_LOG_ERR("dlb: set_sn_allocation ret=%d (driver status: %s)\n",
1343 ret, dlb_error_strings[response.status]);
1344 return ret;
1345 }
1346
1347 return ret;
1348 }
1349
1350 static int32_t
1351 dlb_get_sn_occupancy(struct dlb_eventdev *dlb, int group)
1352 {
1353 struct dlb_hw_dev *handle = &dlb->qm_instance;
1354 struct dlb_get_sn_occupancy_args cfg;
1355 struct dlb_cmd_response response;
1356 int ret;
1357
1358 cfg.group = group;
1359 cfg.response = (uintptr_t)&response;
1360
1361 ret = dlb_iface_get_sn_occupancy(handle, &cfg);
1362 if (ret < 0) {
1363 DLB_LOG_ERR("dlb: get_sn_occupancy ret=%d (driver status: %s)\n",
1364 ret, dlb_error_strings[response.status]);
1365 return ret;
1366 }
1367
1368 return response.id;
1369 }
1370
1371 /* Query the current sequence number allocations and, if they conflict with the
1372 * requested LDB queue configuration, attempt to re-allocate sequence numbers.
1373 * This is best-effort; if it fails, the PMD will attempt to configure the
1374 * load-balanced queue and return an error.
1375 */
1376 static void
1377 dlb_program_sn_allocation(struct dlb_eventdev *dlb,
1378 const struct rte_event_queue_conf *queue_conf)
1379 {
1380 int grp_occupancy[DLB_NUM_SN_GROUPS];
1381 int grp_alloc[DLB_NUM_SN_GROUPS];
1382 int i, sequence_numbers;
1383
1384 sequence_numbers = (int)queue_conf->nb_atomic_order_sequences;
1385
1386 for (i = 0; i < DLB_NUM_SN_GROUPS; i++) {
1387 int total_slots;
1388
1389 grp_alloc[i] = dlb_get_sn_allocation(dlb, i);
1390 if (grp_alloc[i] < 0)
1391 return;
1392
1393 total_slots = DLB_MAX_LDB_SN_ALLOC / grp_alloc[i];
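/* For illustration: if DLB_MAX_LDB_SN_ALLOC were 2048 and this group
 * were programmed for 64 sequence numbers, it would expose
 * 2048 / 64 = 32 slots for ordered queues.
 */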
1394
1395 grp_occupancy[i] = dlb_get_sn_occupancy(dlb, i);
1396 if (grp_occupancy[i] < 0)
1397 return;
1398
1399 /* DLB has at least one available slot for the requested
1400 * sequence numbers, so no further configuration required.
1401 */
1402 if (grp_alloc[i] == sequence_numbers &&
1403 grp_occupancy[i] < total_slots)
1404 return;
1405 }
1406
1407 /* None of the sequence number groups are configured for the requested
1408 * sequence numbers, so we have to reconfigure one of them. This is
1409 * only possible if a group is not in use.
1410 */
1411 for (i = 0; i < DLB_NUM_SN_GROUPS; i++) {
1412 if (grp_occupancy[i] == 0)
1413 break;
1414 }
1415
1416 if (i == DLB_NUM_SN_GROUPS) {
1417 DLB_LOG_ERR("[%s()] No groups with %d sequence_numbers are available or have free slots\n",
1418 __func__, sequence_numbers);
1419 return;
1420 }
1421
1422 /* Attempt to configure slot i with the requested number of sequence
1423 * numbers. Ignore the return value -- if this fails, the error will be
1424 * caught during subsequent queue configuration.
1425 */
1426 dlb_set_sn_allocation(dlb, i, sequence_numbers);
1427 }
1428
1429 static int
1430 dlb_eventdev_ldb_queue_setup(struct rte_eventdev *dev,
1431 struct dlb_eventdev_queue *ev_queue,
1432 const struct rte_event_queue_conf *queue_conf)
1433 {
1434 struct dlb_eventdev *dlb = dlb_pmd_priv(dev);
1435 int32_t qm_qid;
1436
1437 if (queue_conf->nb_atomic_order_sequences)
1438 dlb_program_sn_allocation(dlb, queue_conf);
1439
1440 qm_qid = dlb_hw_create_ldb_queue(dlb,
1441 &ev_queue->qm_queue,
1442 queue_conf);
1443 if (qm_qid < 0) {
1444 DLB_LOG_ERR("Failed to create the load-balanced queue\n");
1445
1446 return qm_qid;
1447 }
1448
1449 dlb->qm_ldb_to_ev_queue_id[qm_qid] = ev_queue->id;
1450
1451 ev_queue->qm_queue.id = qm_qid;
1452
1453 return 0;
1454 }
1455
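/* Count the directed queues that have been set up so far. */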
1456 static int dlb_num_dir_queues_setup(struct dlb_eventdev *dlb)
1457 {
1458 int i, num = 0;
1459
1460 for (i = 0; i < dlb->num_queues; i++) {
1461 if (dlb->ev_queues[i].setup_done &&
1462 dlb->ev_queues[i].qm_queue.is_directed)
1463 num++;
1464 }
1465
1466 return num;
1467 }
1468
1469 static void
1470 dlb_queue_link_teardown(struct dlb_eventdev *dlb,
1471 struct dlb_eventdev_queue *ev_queue)
1472 {
1473 struct dlb_eventdev_port *ev_port;
1474 int i, j;
1475
1476 for (i = 0; i < dlb->num_ports; i++) {
1477 ev_port = &dlb->ev_ports[i];
1478
1479 for (j = 0; j < DLB_MAX_NUM_QIDS_PER_LDB_CQ; j++) {
1480 if (!ev_port->link[j].valid ||
1481 ev_port->link[j].queue_id != ev_queue->id)
1482 continue;
1483
1484 ev_port->link[j].valid = false;
1485 ev_port->num_links--;
1486 }
1487 }
1488
1489 ev_queue->num_links = 0;
1490 }
1491
1492 static int
1493 dlb_eventdev_queue_setup(struct rte_eventdev *dev,
1494 uint8_t ev_qid,
1495 const struct rte_event_queue_conf *queue_conf)
1496 {
1497 struct dlb_eventdev *dlb = dlb_pmd_priv(dev);
1498 struct dlb_eventdev_queue *ev_queue;
1499 int ret;
1500
1501 if (queue_conf == NULL)
1502 return -EINVAL;
1503
1504 if (ev_qid >= dlb->num_queues)
1505 return -EINVAL;
1506
1507 ev_queue = &dlb->ev_queues[ev_qid];
1508
1509 ev_queue->qm_queue.is_directed = queue_conf->event_queue_cfg &
1510 RTE_EVENT_QUEUE_CFG_SINGLE_LINK;
1511 ev_queue->id = ev_qid;
1512 ev_queue->conf = *queue_conf;
1513
1514 if (!ev_queue->qm_queue.is_directed) {
1515 ret = dlb_eventdev_ldb_queue_setup(dev, ev_queue, queue_conf);
1516 } else {
1517 /* The directed queue isn't setup until link time, at which
1518 * point we know its directed port ID. Directed queue setup
1519 * will only fail if this queue is already setup or there are
1520 * no directed queues left to configure.
1521 */
1522 ret = 0;
1523
1524 ev_queue->qm_queue.config_state = DLB_NOT_CONFIGURED;
1525
1526 if (ev_queue->setup_done ||
1527 dlb_num_dir_queues_setup(dlb) == dlb->num_dir_queues)
1528 ret = -EINVAL;
1529 }
1530
1531 /* Tear down pre-existing port->queue links */
1532 if (!ret && dlb->run_state == DLB_RUN_STATE_STOPPED)
1533 dlb_queue_link_teardown(dlb, ev_queue);
1534
1535 if (!ret)
1536 ev_queue->setup_done = true;
1537
1538 return ret;
1539 }
1540
1541 static void
1542 dlb_port_link_teardown(struct dlb_eventdev *dlb,
1543 struct dlb_eventdev_port *ev_port)
1544 {
1545 struct dlb_eventdev_queue *ev_queue;
1546 int i;
1547
1548 for (i = 0; i < DLB_MAX_NUM_QIDS_PER_LDB_CQ; i++) {
1549 if (!ev_port->link[i].valid)
1550 continue;
1551
1552 ev_queue = &dlb->ev_queues[ev_port->link[i].queue_id];
1553
1554 ev_port->link[i].valid = false;
1555 ev_port->num_links--;
1556 ev_queue->num_links--;
1557 }
1558 }
1559
1560 static int
1561 dlb_eventdev_port_setup(struct rte_eventdev *dev,
1562 uint8_t ev_port_id,
1563 const struct rte_event_port_conf *port_conf)
1564 {
1565 struct dlb_eventdev *dlb;
1566 struct dlb_eventdev_port *ev_port;
1567 bool use_rsvd_token_scheme;
1568 uint32_t adj_cq_depth;
1569 uint16_t rsvd_tokens;
1570 int ret;
1571
1572 if (dev == NULL || port_conf == NULL) {
1573 DLB_LOG_ERR("Null parameter\n");
1574 return -EINVAL;
1575 }
1576
1577 dlb = dlb_pmd_priv(dev);
1578
1579 if (ev_port_id >= DLB_MAX_NUM_PORTS)
1580 return -EINVAL;
1581
1582 if (port_conf->dequeue_depth >
1583 evdev_dlb_default_info.max_event_port_dequeue_depth ||
1584 port_conf->enqueue_depth >
1585 evdev_dlb_default_info.max_event_port_enqueue_depth)
1586 return -EINVAL;
1587
1588 ev_port = &dlb->ev_ports[ev_port_id];
1589 /* configured? */
1590 if (ev_port->setup_done) {
1591 DLB_LOG_ERR("evport %d is already configured\n", ev_port_id);
1592 return -EINVAL;
1593 }
1594
1595 /* The reserved token interrupt arming scheme requires that one or more
1596 * CQ tokens be reserved by the PMD. This limits the amount of CQ space
1597 * usable by the DLB, so in order to give an *effective* CQ depth equal
1598 * to the user-requested value, we double CQ depth and reserve half of
1599 * its tokens. If the user requests the max CQ depth (256) then we
1600 * cannot double it, so we reserve one token and give an effective
1601 * depth of 255 entries.
1602 */
1603 use_rsvd_token_scheme = true;
1604 rsvd_tokens = 1;
1605 adj_cq_depth = port_conf->dequeue_depth;
1606
1607 if (use_rsvd_token_scheme && adj_cq_depth < 256) {
1608 rsvd_tokens = adj_cq_depth;
1609 adj_cq_depth *= 2;
1610 }
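/* Worked example: dequeue_depth = 32 gives rsvd_tokens = 32 and
 * adj_cq_depth = 64 (effective depth 32); dequeue_depth = 256 keeps
 * rsvd_tokens = 1 for an effective depth of 255.
 */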
1611
1612 ev_port->qm_port.is_directed = port_conf->event_port_cfg &
1613 RTE_EVENT_PORT_CFG_SINGLE_LINK;
1614
1615 if (!ev_port->qm_port.is_directed) {
1616 ret = dlb_hw_create_ldb_port(dlb,
1617 ev_port,
1618 port_conf->dequeue_depth,
1619 adj_cq_depth,
1620 port_conf->enqueue_depth,
1621 rsvd_tokens,
1622 use_rsvd_token_scheme);
1623 if (ret < 0) {
1624 DLB_LOG_ERR("Failed to create the LDB port, ev_port_id=%d\n",
1625 ev_port_id);
1626 return ret;
1627 }
1628 } else {
1629 ret = dlb_hw_create_dir_port(dlb,
1630 ev_port,
1631 port_conf->dequeue_depth,
1632 adj_cq_depth,
1633 port_conf->enqueue_depth,
1634 rsvd_tokens,
1635 use_rsvd_token_scheme);
1636 if (ret < 0) {
1637 DLB_LOG_ERR("Failed to create the DIR port\n");
1638 return ret;
1639 }
1640 }
1641
1642 /* Save off port config for reconfig */
1643 dlb->ev_ports[ev_port_id].conf = *port_conf;
1644
1645 dlb->ev_ports[ev_port_id].id = ev_port_id;
1646 dlb->ev_ports[ev_port_id].enq_configured = true;
1647 dlb->ev_ports[ev_port_id].setup_done = true;
1648 dlb->ev_ports[ev_port_id].inflight_max =
1649 port_conf->new_event_threshold;
1650 dlb->ev_ports[ev_port_id].implicit_release =
1651 !(port_conf->event_port_cfg &
1652 RTE_EVENT_PORT_CFG_DISABLE_IMPL_REL);
1653 dlb->ev_ports[ev_port_id].outstanding_releases = 0;
1654 dlb->ev_ports[ev_port_id].inflight_credits = 0;
1655 dlb->ev_ports[ev_port_id].credit_update_quanta =
1656 RTE_LIBRTE_PMD_DLB_SW_CREDIT_QUANTA;
1657 dlb->ev_ports[ev_port_id].dlb = dlb; /* reverse link */
1658
1659 /* Tear down pre-existing port->queue links */
1660 if (dlb->run_state == DLB_RUN_STATE_STOPPED)
1661 dlb_port_link_teardown(dlb, &dlb->ev_ports[ev_port_id]);
1662
1663 dev->data->ports[ev_port_id] = &dlb->ev_ports[ev_port_id];
1664
1665 return 0;
1666 }
1667
1668 static int
1669 dlb_eventdev_reapply_configuration(struct rte_eventdev *dev)
1670 {
1671 struct dlb_eventdev *dlb = dlb_pmd_priv(dev);
1672 int ret, i;
1673
1674 /* If an event queue or port was previously configured, but hasn't been
1675 * reconfigured, reapply its original configuration.
1676 */
1677 for (i = 0; i < dlb->num_queues; i++) {
1678 struct dlb_eventdev_queue *ev_queue;
1679
1680 ev_queue = &dlb->ev_queues[i];
1681
1682 if (ev_queue->qm_queue.config_state != DLB_PREV_CONFIGURED)
1683 continue;
1684
1685 ret = dlb_eventdev_queue_setup(dev, i, &ev_queue->conf);
1686 if (ret < 0) {
1687 DLB_LOG_ERR("dlb: failed to reconfigure queue %d", i);
1688 return ret;
1689 }
1690 }
1691
1692 for (i = 0; i < dlb->num_ports; i++) {
1693 struct dlb_eventdev_port *ev_port = &dlb->ev_ports[i];
1694
1695 if (ev_port->qm_port.config_state != DLB_PREV_CONFIGURED)
1696 continue;
1697
1698 ret = dlb_eventdev_port_setup(dev, i, &ev_port->conf);
1699 if (ret < 0) {
1700 DLB_LOG_ERR("dlb: failed to reconfigure ev_port %d",
1701 i);
1702 return ret;
1703 }
1704 }
1705
1706 return 0;
1707 }
1708
1709 static int
1710 set_dev_id(const char *key __rte_unused,
1711 const char *value,
1712 void *opaque)
1713 {
1714 int *dev_id = opaque;
1715 int ret;
1716
1717 if (value == NULL || opaque == NULL) {
1718 DLB_LOG_ERR("NULL pointer\n");
1719 return -EINVAL;
1720 }
1721
1722 ret = dlb_string_to_int(dev_id, value);
1723 if (ret < 0)
1724 return ret;
1725
1726 return 0;
1727 }
1728
1729 static int
1730 set_defer_sched(const char *key __rte_unused,
1731 const char *value,
1732 void *opaque)
1733 {
1734 int *defer_sched = opaque;
1735
1736 if (value == NULL || opaque == NULL) {
1737 DLB_LOG_ERR("NULL pointer\n");
1738 return -EINVAL;
1739 }
1740
1741 if (strncmp(value, "on", 2) != 0) {
1742 DLB_LOG_ERR("Invalid defer_sched argument \"%s\" (expected \"on\")\n",
1743 value);
1744 return -EINVAL;
1745 }
1746
1747 *defer_sched = 1;
1748
1749 return 0;
1750 }
1751
1752 static int
1753 set_num_atm_inflights(const char *key __rte_unused,
1754 const char *value,
1755 void *opaque)
1756 {
1757 int *num_atm_inflights = opaque;
1758 int ret;
1759
1760 if (value == NULL || opaque == NULL) {
1761 DLB_LOG_ERR("NULL pointer\n");
1762 return -EINVAL;
1763 }
1764
1765 ret = dlb_string_to_int(num_atm_inflights, value);
1766 if (ret < 0)
1767 return ret;
1768
1769 if (*num_atm_inflights < 0 ||
1770 *num_atm_inflights > DLB_MAX_NUM_ATM_INFLIGHTS) {
1771 DLB_LOG_ERR("dlb: atm_inflights must be between 0 and %d\n",
1772 DLB_MAX_NUM_ATM_INFLIGHTS);
1773 return -EINVAL;
1774 }
1775
1776 return 0;
1777 }
1778
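/* Validate a requested port->queue link: the queue must exist and be (or
 * have been) configured, the port and queue types (LDB vs DIR) must match,
 * and a directed port or queue can carry at most one link.
 */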
1779 static int
1780 dlb_validate_port_link(struct dlb_eventdev_port *ev_port,
1781 uint8_t queue_id,
1782 bool link_exists,
1783 int index)
1784 {
1785 struct dlb_eventdev *dlb = ev_port->dlb;
1786 struct dlb_eventdev_queue *ev_queue;
1787 bool port_is_dir, queue_is_dir;
1788
1789 if (queue_id >= dlb->num_queues) {
1790 DLB_LOG_ERR("queue_id %d >= num queues %d\n",
1791 queue_id, dlb->num_queues);
1792 rte_errno = -EINVAL;
1793 return -1;
1794 }
1795
1796 ev_queue = &dlb->ev_queues[queue_id];
1797
1798 if (!ev_queue->setup_done &&
1799 ev_queue->qm_queue.config_state != DLB_PREV_CONFIGURED) {
1800 DLB_LOG_ERR("setup not done and not previously configured\n");
1801 rte_errno = -EINVAL;
1802 return -1;
1803 }
1804
1805 port_is_dir = ev_port->qm_port.is_directed;
1806 queue_is_dir = ev_queue->qm_queue.is_directed;
1807
1808 if (port_is_dir != queue_is_dir) {
1809 DLB_LOG_ERR("%s queue %u can't link to %s port %u\n",
1810 queue_is_dir ? "DIR" : "LDB", ev_queue->id,
1811 port_is_dir ? "DIR" : "LDB", ev_port->id);
1812
1813 rte_errno = -EINVAL;
1814 return -1;
1815 }
1816
1817 /* Check if there is space for the requested link */
1818 if (!link_exists && index == -1) {
1819 DLB_LOG_ERR("no space for new link\n");
1820 rte_errno = -ENOSPC;
1821 return -1;
1822 }
1823
1824 /* Check if the directed port is already linked */
1825 if (ev_port->qm_port.is_directed && ev_port->num_links > 0 &&
1826 !link_exists) {
1827 DLB_LOG_ERR("Can't link DIR port %d to >1 queues\n",
1828 ev_port->id);
1829 rte_errno = -EINVAL;
1830 return -1;
1831 }
1832
1833 /* Check if the directed queue is already linked */
1834 if (ev_queue->qm_queue.is_directed && ev_queue->num_links > 0 &&
1835 !link_exists) {
1836 DLB_LOG_ERR("Can't link DIR queue %d to >1 ports\n",
1837 ev_queue->id);
1838 rte_errno = -EINVAL;
1839 return -1;
1840 }
1841
1842 return 0;
1843 }
1844
1845 static int32_t
1846 dlb_hw_create_dir_queue(struct dlb_eventdev *dlb, int32_t qm_port_id)
1847 {
1848 struct dlb_hw_dev *handle = &dlb->qm_instance;
1849 struct dlb_create_dir_queue_args cfg;
1850 struct dlb_cmd_response response;
1851 int32_t ret;
1852
1853 cfg.response = (uintptr_t)&response;
1854
1855 /* The directed port is always configured before its queue */
1856 cfg.port_id = qm_port_id;
1857
1858 ret = dlb_iface_dir_queue_create(handle, &cfg);
1859 if (ret < 0) {
1860 DLB_LOG_ERR("dlb: create DIR event queue error, ret=%d (driver status: %s)\n",
1861 ret, dlb_error_strings[response.status]);
1862 return -EINVAL;
1863 }
1864
1865 return response.id;
1866 }
1867
1868 static int
1869 dlb_eventdev_dir_queue_setup(struct dlb_eventdev *dlb,
1870 struct dlb_eventdev_queue *ev_queue,
1871 struct dlb_eventdev_port *ev_port)
1872 {
1873 int32_t qm_qid;
1874
1875 qm_qid = dlb_hw_create_dir_queue(dlb, ev_port->qm_port.id);
1876
1877 if (qm_qid < 0) {
1878 DLB_LOG_ERR("Failed to create the DIR queue\n");
1879 return qm_qid;
1880 }
1881
1882 dlb->qm_dir_to_ev_queue_id[qm_qid] = ev_queue->id;
1883
1884 ev_queue->qm_queue.id = qm_qid;
1885
1886 return 0;
1887 }
1888
1889 static int16_t
1890 dlb_hw_map_ldb_qid_to_port(struct dlb_hw_dev *handle,
1891 uint32_t qm_port_id,
1892 uint16_t qm_qid,
1893 uint8_t priority)
1894 {
1895 struct dlb_map_qid_args cfg;
1896 struct dlb_cmd_response response;
1897 int32_t ret;
1898
1899 if (handle == NULL)
1900 return -EINVAL;
1901
1902 /* Build message */
1903 cfg.response = (uintptr_t)&response;
1904 cfg.port_id = qm_port_id;
1905 cfg.qid = qm_qid;
1906 cfg.priority = EV_TO_DLB_PRIO(priority);
1907
1908 ret = dlb_iface_map_qid(handle, &cfg);
1909 if (ret < 0) {
1910 DLB_LOG_ERR("dlb: map qid error, ret=%d (driver status: %s)\n",
1911 ret, dlb_error_strings[response.status]);
1912 DLB_LOG_ERR("dlb: device_id=%d grp=%d, qm_port=%d, qm_qid=%d prio=%d\n",
1913 handle->device_id,
1914 handle->domain_id, cfg.port_id,
1915 cfg.qid,
1916 cfg.priority);
1917 } else {
1918 DLB_LOG_DBG("dlb: mapped queue %d to qm_port %d\n",
1919 qm_qid, qm_port_id);
1920 }
1921
1922 return ret;
1923 }
1924
1925 static int
1926 dlb_event_queue_join_ldb(struct dlb_eventdev *dlb,
1927 struct dlb_eventdev_port *ev_port,
1928 struct dlb_eventdev_queue *ev_queue,
1929 uint8_t priority)
1930 {
1931 int first_avail = -1;
1932 int ret, i;
1933
1934 for (i = 0; i < DLB_MAX_NUM_QIDS_PER_LDB_CQ; i++) {
1935 if (ev_port->link[i].valid) {
1936 if (ev_port->link[i].queue_id == ev_queue->id &&
1937 ev_port->link[i].priority == priority) {
1938 if (ev_port->link[i].mapped)
1939 return 0; /* already mapped */
1940 first_avail = i;
1941 }
1942 } else {
1943 if (first_avail == -1)
1944 first_avail = i;
1945 }
1946 }
1947 if (first_avail == -1) {
1948 DLB_LOG_ERR("dlb: qm_port %d has no available QID slots.\n",
1949 ev_port->qm_port.id);
1950 return -EINVAL;
1951 }
1952
1953 ret = dlb_hw_map_ldb_qid_to_port(&dlb->qm_instance,
1954 ev_port->qm_port.id,
1955 ev_queue->qm_queue.id,
1956 priority);
1957
1958 if (!ret)
1959 ev_port->link[first_avail].mapped = true;
1960
1961 return ret;
1962 }
1963
1964 static int
1965 dlb_do_port_link(struct rte_eventdev *dev,
1966 struct dlb_eventdev_queue *ev_queue,
1967 struct dlb_eventdev_port *ev_port,
1968 uint8_t prio)
1969 {
1970 struct dlb_eventdev *dlb = dlb_pmd_priv(dev);
1971 int err;
1972
1973 /* Don't link until start time. */
1974 if (dlb->run_state == DLB_RUN_STATE_STOPPED)
1975 return 0;
1976
1977 if (ev_queue->qm_queue.is_directed)
1978 err = dlb_eventdev_dir_queue_setup(dlb, ev_queue, ev_port);
1979 else
1980 err = dlb_event_queue_join_ldb(dlb, ev_port, ev_queue, prio);
1981
1982 if (err) {
1983 DLB_LOG_ERR("port link failure for %s ev_q %d, ev_port %d\n",
1984 ev_queue->qm_queue.is_directed ? "DIR" : "LDB",
1985 ev_queue->id, ev_port->id);
1986
1987 rte_errno = err;
1988 return -1;
1989 }
1990
1991 return 0;
1992 }
1993
1994 static int
1995 dlb_eventdev_apply_port_links(struct rte_eventdev *dev)
1996 {
1997 struct dlb_eventdev *dlb = dlb_pmd_priv(dev);
1998 int i;
1999
2000 /* Perform requested port->queue links */
2001 for (i = 0; i < dlb->num_ports; i++) {
2002 struct dlb_eventdev_port *ev_port = &dlb->ev_ports[i];
2003 int j;
2004
2005 for (j = 0; j < DLB_MAX_NUM_QIDS_PER_LDB_CQ; j++) {
2006 struct dlb_eventdev_queue *ev_queue;
2007 uint8_t prio, queue_id;
2008
2009 if (!ev_port->link[j].valid)
2010 continue;
2011
2012 prio = ev_port->link[j].priority;
2013 queue_id = ev_port->link[j].queue_id;
2014
2015 if (dlb_validate_port_link(ev_port, queue_id, true, j))
2016 return -EINVAL;
2017
2018 ev_queue = &dlb->ev_queues[queue_id];
2019
2020 if (dlb_do_port_link(dev, ev_queue, ev_port, prio))
2021 return -EINVAL;
2022 }
2023 }
2024
2025 return 0;
2026 }
2027
2028 static int
2029 dlb_eventdev_port_link(struct rte_eventdev *dev, void *event_port,
2030 const uint8_t queues[], const uint8_t priorities[],
2031 uint16_t nb_links)
2032
2033 {
2034 struct dlb_eventdev_port *ev_port = event_port;
2035 struct dlb_eventdev *dlb;
2036 int i, j;
2037
2038 RTE_SET_USED(dev);
2039
2040 if (ev_port == NULL) {
2041 DLB_LOG_ERR("dlb: evport not setup\n");
2042 rte_errno = -EINVAL;
2043 return 0;
2044 }
2045
2046 if (!ev_port->setup_done &&
2047 ev_port->qm_port.config_state != DLB_PREV_CONFIGURED) {
2048 DLB_LOG_ERR("dlb: evport not setup\n");
2049 rte_errno = -EINVAL;
2050 return 0;
2051 }
2052
2053 /* Note: rte_event_port_link() ensures the PMD won't receive a NULL
2054 * queues pointer.
2055 */
2056 if (nb_links == 0) {
2057 DLB_LOG_DBG("dlb: nb_links is 0\n");
2058 return 0; /* Ignore and return success */
2059 }
2060
2061 dlb = ev_port->dlb;
2062
2063 DLB_LOG_DBG("Linking %u queues to %s port %d\n",
2064 nb_links,
2065 ev_port->qm_port.is_directed ? "DIR" : "LDB",
2066 ev_port->id);
2067
2068 for (i = 0; i < nb_links; i++) {
2069 struct dlb_eventdev_queue *ev_queue;
2070 uint8_t queue_id, prio;
2071 bool found = false;
2072 int index = -1;
2073
2074 queue_id = queues[i];
2075 prio = priorities[i];
2076
2077 /* Check if the link already exists. */
2078 for (j = 0; j < DLB_MAX_NUM_QIDS_PER_LDB_CQ; j++)
2079 if (ev_port->link[j].valid) {
2080 if (ev_port->link[j].queue_id == queue_id) {
2081 found = true;
2082 index = j;
2083 break;
2084 }
2085 } else {
2086 if (index == -1)
2087 index = j;
2088 }
2089
2090 		/* No existing link and no free slot; stop linking here */
2091 if (index == -1)
2092 break;
2093
2094 /* Check if already linked at the requested priority */
2095 if (found && ev_port->link[j].priority == prio)
2096 continue;
2097
2098 if (dlb_validate_port_link(ev_port, queue_id, found, index))
2099 break; /* return index of offending queue */
2100
2101 ev_queue = &dlb->ev_queues[queue_id];
2102
2103 if (dlb_do_port_link(dev, ev_queue, ev_port, prio))
2104 break; /* return index of offending queue */
2105
2106 ev_queue->num_links++;
2107
2108 ev_port->link[index].queue_id = queue_id;
2109 ev_port->link[index].priority = prio;
2110 ev_port->link[index].valid = true;
2111 		/* Only count new links; an existing entry means this was a priority change */
2112 if (!found)
2113 ev_port->num_links++;
2114 }
2115 return i;
2116 }
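
/*
 * Illustrative sketch (not part of the driver): dlb_eventdev_port_link() is
 * reached through the public rte_event_port_link() API. A caller should
 * check the returned link count, since the PMD may stop early (e.g. no free
 * link slots) and report the cause via rte_errno. dev_id/port_id and the
 * queue IDs below are hypothetical.
 */
static int __rte_unused
link_two_queues_sketch(uint8_t dev_id, uint8_t port_id)
{
	uint8_t queues[2] = {0, 1};
	uint8_t priorities[2] = {RTE_EVENT_DEV_PRIORITY_NORMAL,
				 RTE_EVENT_DEV_PRIORITY_NORMAL};
	int nb_linked;

	nb_linked = rte_event_port_link(dev_id, port_id, queues,
					priorities, RTE_DIM(queues));
	if (nb_linked != (int)RTE_DIM(queues)) {
		/* Partial link: the first nb_linked entries succeeded and
		 * rte_errno (set by the PMD) holds the failure cause.
		 */
		return -1;
	}

	return 0;
}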
2117
2118 static int
2119 dlb_eventdev_start(struct rte_eventdev *dev)
2120 {
2121 struct dlb_eventdev *dlb = dlb_pmd_priv(dev);
2122 struct dlb_hw_dev *handle = &dlb->qm_instance;
2123 struct dlb_start_domain_args cfg;
2124 struct dlb_cmd_response response;
2125 int ret, i;
2126
2127 rte_spinlock_lock(&dlb->qm_instance.resource_lock);
2128 if (dlb->run_state != DLB_RUN_STATE_STOPPED) {
2129 DLB_LOG_ERR("bad state %d for dev_start\n",
2130 (int)dlb->run_state);
2131 rte_spinlock_unlock(&dlb->qm_instance.resource_lock);
2132 return -EINVAL;
2133 }
2134 dlb->run_state = DLB_RUN_STATE_STARTING;
2135 rte_spinlock_unlock(&dlb->qm_instance.resource_lock);
2136
2137 /* If the device was configured more than once, some event ports and/or
2138 * queues may need to be reconfigured.
2139 */
2140 ret = dlb_eventdev_reapply_configuration(dev);
2141 if (ret)
2142 return ret;
2143
2144 /* The DLB PMD delays port links until the device is started. */
2145 ret = dlb_eventdev_apply_port_links(dev);
2146 if (ret)
2147 return ret;
2148
2149 cfg.response = (uintptr_t)&response;
2150
2151 for (i = 0; i < dlb->num_ports; i++) {
2152 if (!dlb->ev_ports[i].setup_done) {
2153 DLB_LOG_ERR("dlb: port %d not setup", i);
2154 return -ESTALE;
2155 }
2156 }
2157
2158 for (i = 0; i < dlb->num_queues; i++) {
2159 if (dlb->ev_queues[i].num_links == 0) {
2160 DLB_LOG_ERR("dlb: queue %d is not linked", i);
2161 return -ENOLINK;
2162 }
2163 }
2164
2165 ret = dlb_iface_sched_domain_start(handle, &cfg);
2166 if (ret < 0) {
2167 DLB_LOG_ERR("dlb: sched_domain_start ret=%d (driver status: %s)\n",
2168 ret, dlb_error_strings[response.status]);
2169 return ret;
2170 }
2171
2172 dlb->run_state = DLB_RUN_STATE_STARTED;
2173 DLB_LOG_DBG("dlb: sched_domain_start completed OK\n");
2174
2175 return 0;
2176 }
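
/*
 * Illustrative sketch (not part of the driver): dlb_eventdev_start() requires
 * every port to be set up and every queue to be linked, and it is the point
 * where deferred reconfiguration and port links are applied. From the
 * application side the usual ordering is configure -> queue/port setup ->
 * link -> start, roughly as below (identifiers are hypothetical and error
 * handling is trimmed).
 */
static int __rte_unused
start_single_port_sketch(uint8_t dev_id)
{
	struct rte_event_dev_config dev_conf = {0};
	struct rte_event_dev_info info;
	uint8_t queue_id = 0, port_id = 0;

	rte_event_dev_info_get(dev_id, &info);

	dev_conf.nb_event_queues = 1;
	dev_conf.nb_event_ports = 1;
	dev_conf.nb_events_limit = info.max_num_events;
	dev_conf.nb_event_queue_flows = info.max_event_queue_flows;
	dev_conf.nb_event_port_dequeue_depth =
		info.max_event_port_dequeue_depth;
	dev_conf.nb_event_port_enqueue_depth =
		info.max_event_port_enqueue_depth;
	dev_conf.dequeue_timeout_ns = info.min_dequeue_timeout_ns;

	if (rte_event_dev_configure(dev_id, &dev_conf) ||
	    rte_event_queue_setup(dev_id, queue_id, NULL) ||
	    rte_event_port_setup(dev_id, port_id, NULL) ||
	    rte_event_port_link(dev_id, port_id, &queue_id, NULL, 1) != 1)
		return -1;

	return rte_event_dev_start(dev_id);
}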
2177
2178 static inline int
2179 dlb_check_enqueue_sw_credits(struct dlb_eventdev *dlb,
2180 struct dlb_eventdev_port *ev_port)
2181 {
2182 uint32_t sw_inflights = __atomic_load_n(&dlb->inflights,
2183 __ATOMIC_SEQ_CST);
2184 const int num = 1;
2185
2186 if (unlikely(ev_port->inflight_max < sw_inflights)) {
2187 DLB_INC_STAT(ev_port->stats.traffic.tx_nospc_inflight_max, 1);
2188 rte_errno = -ENOSPC;
2189 return 1;
2190 }
2191
2192 if (ev_port->inflight_credits < num) {
2193 /* check if event enqueue brings ev_port over max threshold */
2194 uint32_t credit_update_quanta = ev_port->credit_update_quanta;
2195
2196 if (sw_inflights + credit_update_quanta >
2197 dlb->new_event_limit) {
2198 DLB_INC_STAT(
2199 ev_port->stats.traffic.tx_nospc_new_event_limit,
2200 1);
2201 rte_errno = -ENOSPC;
2202 return 1;
2203 }
2204
2205 __atomic_fetch_add(&dlb->inflights, credit_update_quanta,
2206 __ATOMIC_SEQ_CST);
2207 ev_port->inflight_credits += (credit_update_quanta);
2208
2209 if (ev_port->inflight_credits < num) {
2210 DLB_INC_STAT(
2211 ev_port->stats.traffic.tx_nospc_inflight_credits,
2212 1);
2213 rte_errno = -ENOSPC;
2214 return 1;
2215 }
2216 }
2217
2218 return 0;
2219 }
2220
2221 static inline void
2222 dlb_replenish_sw_credits(struct dlb_eventdev *dlb,
2223 struct dlb_eventdev_port *ev_port)
2224 {
2225 uint16_t quanta = ev_port->credit_update_quanta;
2226
2227 if (ev_port->inflight_credits >= quanta * 2) {
2228 /* Replenish credits, saving one quanta for enqueues */
2229 uint16_t val = ev_port->inflight_credits - quanta;
2230
2231 __atomic_fetch_sub(&dlb->inflights, val, __ATOMIC_SEQ_CST);
2232 ev_port->inflight_credits -= val;
2233 }
2234 }
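
/*
 * Simplified model (not driver code) of the software credit scheme
 * implemented by dlb_check_enqueue_sw_credits() and
 * dlb_replenish_sw_credits(): ports draw credits from a device-wide inflight
 * counter in quanta and hand the surplus back once they hold at least two
 * quanta. The names and the single-threaded plain-int bookkeeping are
 * illustrative; the driver uses atomics on dlb->inflights.
 */
struct sw_credit_model {
	unsigned int dev_inflights;   /* models dlb->inflights */
	unsigned int new_event_limit; /* models dlb->new_event_limit */
	unsigned int port_credits;    /* models ev_port->inflight_credits */
	unsigned int quanta;          /* models ev_port->credit_update_quanta */
};

static int __rte_unused
sw_credit_acquire_sketch(struct sw_credit_model *m)
{
	if (m->port_credits == 0) {
		if (m->dev_inflights + m->quanta > m->new_event_limit)
			return -ENOSPC;  /* device-wide budget exhausted */
		m->dev_inflights += m->quanta;
		m->port_credits += m->quanta;
	}
	m->port_credits--;               /* one credit per NEW event */
	return 0;
}

static void __rte_unused
sw_credit_release_sketch(struct sw_credit_model *m)
{
	m->port_credits++;               /* each RELEASE returns one credit */
	if (m->port_credits >= 2 * m->quanta) {
		unsigned int surplus = m->port_credits - m->quanta;

		m->dev_inflights -= surplus;
		m->port_credits -= surplus;
	}
}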
2235
2236 static __rte_always_inline uint16_t
2237 dlb_read_pc(struct process_local_port_data *port_data, bool ldb)
2238 {
2239 volatile uint16_t *popcount;
2240
2241 if (ldb)
2242 popcount = port_data->ldb_popcount;
2243 else
2244 popcount = port_data->dir_popcount;
2245
2246 return *popcount;
2247 }
2248
2249 static inline int
2250 dlb_check_enqueue_hw_ldb_credits(struct dlb_port *qm_port,
2251 struct process_local_port_data *port_data)
2252 {
2253 if (unlikely(qm_port->cached_ldb_credits == 0)) {
2254 uint16_t pc;
2255
2256 pc = dlb_read_pc(port_data, true);
2257
2258 qm_port->cached_ldb_credits = pc -
2259 qm_port->ldb_pushcount_at_credit_expiry;
2260 if (unlikely(qm_port->cached_ldb_credits == 0)) {
2261 DLB_INC_STAT(
2262 qm_port->ev_port->stats.traffic.tx_nospc_ldb_hw_credits,
2263 1);
2264
2265 DLB_LOG_DBG("ldb credits exhausted\n");
2266 return 1;
2267 }
2268 qm_port->ldb_pushcount_at_credit_expiry +=
2269 qm_port->cached_ldb_credits;
2270 }
2271
2272 return 0;
2273 }
2274
2275 static inline int
2276 dlb_check_enqueue_hw_dir_credits(struct dlb_port *qm_port,
2277 struct process_local_port_data *port_data)
2278 {
2279 if (unlikely(qm_port->cached_dir_credits == 0)) {
2280 uint16_t pc;
2281
2282 pc = dlb_read_pc(port_data, false);
2283
2284 qm_port->cached_dir_credits = pc -
2285 qm_port->dir_pushcount_at_credit_expiry;
2286
2287 if (unlikely(qm_port->cached_dir_credits == 0)) {
2288 DLB_INC_STAT(
2289 qm_port->ev_port->stats.traffic.tx_nospc_dir_hw_credits,
2290 1);
2291
2292 DLB_LOG_DBG("dir credits exhausted\n");
2293 return 1;
2294 }
2295 qm_port->dir_pushcount_at_credit_expiry +=
2296 qm_port->cached_dir_credits;
2297 }
2298
2299 return 0;
2300 }
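
/*
 * Minimal illustration (not driver code) of the credit refresh arithmetic in
 * the two helpers above: the refill is the 16-bit difference between the
 * pop count published by the device and the push count recorded when the
 * cached credits last hit zero, so the subtraction stays correct across
 * wrap-around.
 */
static uint16_t __rte_unused
hw_credit_refill_sketch(uint16_t popcount_now, uint16_t pushcount_at_expiry)
{
	/* E.g. popcount_now = 0x0005, pushcount_at_expiry = 0xFFFB -> 10 */
	return (uint16_t)(popcount_now - pushcount_at_expiry);
}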
2301
2302 static inline int
2303 dlb_event_enqueue_prep(struct dlb_eventdev_port *ev_port,
2304 struct dlb_port *qm_port,
2305 const struct rte_event ev[],
2306 struct process_local_port_data *port_data,
2307 uint8_t *sched_type,
2308 uint8_t *queue_id)
2309 {
2310 struct dlb_eventdev *dlb = ev_port->dlb;
2311 struct dlb_eventdev_queue *ev_queue;
2312 uint16_t *cached_credits = NULL;
2313 struct dlb_queue *qm_queue;
2314
2315 ev_queue = &dlb->ev_queues[ev->queue_id];
2316 qm_queue = &ev_queue->qm_queue;
2317 *queue_id = qm_queue->id;
2318
2319 /* Ignore sched_type and hardware credits on release events */
2320 if (ev->op == RTE_EVENT_OP_RELEASE)
2321 goto op_check;
2322
2323 if (!qm_queue->is_directed) {
2324 /* Load balanced destination queue */
2325
2326 if (dlb_check_enqueue_hw_ldb_credits(qm_port, port_data)) {
2327 rte_errno = -ENOSPC;
2328 return 1;
2329 }
2330 cached_credits = &qm_port->cached_ldb_credits;
2331
2332 switch (ev->sched_type) {
2333 case RTE_SCHED_TYPE_ORDERED:
2334 DLB_LOG_DBG("dlb: put_qe: RTE_SCHED_TYPE_ORDERED\n");
2335 if (qm_queue->sched_type != RTE_SCHED_TYPE_ORDERED) {
2336 DLB_LOG_ERR("dlb: tried to send ordered event to unordered queue %d\n",
2337 *queue_id);
2338 rte_errno = -EINVAL;
2339 return 1;
2340 }
2341 *sched_type = DLB_SCHED_ORDERED;
2342 break;
2343 case RTE_SCHED_TYPE_ATOMIC:
2344 DLB_LOG_DBG("dlb: put_qe: RTE_SCHED_TYPE_ATOMIC\n");
2345 *sched_type = DLB_SCHED_ATOMIC;
2346 break;
2347 case RTE_SCHED_TYPE_PARALLEL:
2348 DLB_LOG_DBG("dlb: put_qe: RTE_SCHED_TYPE_PARALLEL\n");
2349 if (qm_queue->sched_type == RTE_SCHED_TYPE_ORDERED)
2350 *sched_type = DLB_SCHED_ORDERED;
2351 else
2352 *sched_type = DLB_SCHED_UNORDERED;
2353 break;
2354 default:
2355 DLB_LOG_ERR("Unsupported LDB sched type in put_qe\n");
2356 DLB_INC_STAT(ev_port->stats.tx_invalid, 1);
2357 rte_errno = -EINVAL;
2358 return 1;
2359 }
2360 } else {
2361 /* Directed destination queue */
2362
2363 if (dlb_check_enqueue_hw_dir_credits(qm_port, port_data)) {
2364 rte_errno = -ENOSPC;
2365 return 1;
2366 }
2367 cached_credits = &qm_port->cached_dir_credits;
2368
2369 DLB_LOG_DBG("dlb: put_qe: RTE_SCHED_TYPE_DIRECTED\n");
2370
2371 *sched_type = DLB_SCHED_DIRECTED;
2372 }
2373
2374 op_check:
2375 switch (ev->op) {
2376 case RTE_EVENT_OP_NEW:
2377 /* Check that a sw credit is available */
2378 if (dlb_check_enqueue_sw_credits(dlb, ev_port)) {
2379 rte_errno = -ENOSPC;
2380 return 1;
2381 }
2382 ev_port->inflight_credits--;
2383 (*cached_credits)--;
2384 break;
2385 case RTE_EVENT_OP_FORWARD:
2386 /* Check for outstanding_releases underflow. If this occurs,
2387 * the application is not using the EVENT_OPs correctly; for
2388 * example, forwarding or releasing events that were not
2389 * dequeued.
2390 */
2391 RTE_ASSERT(ev_port->outstanding_releases > 0);
2392 ev_port->outstanding_releases--;
2393 qm_port->issued_releases++;
2394 (*cached_credits)--;
2395 break;
2396 case RTE_EVENT_OP_RELEASE:
2397 ev_port->inflight_credits++;
2398 /* Check for outstanding_releases underflow. If this occurs,
2399 * the application is not using the EVENT_OPs correctly; for
2400 * example, forwarding or releasing events that were not
2401 * dequeued.
2402 */
2403 RTE_ASSERT(ev_port->outstanding_releases > 0);
2404 ev_port->outstanding_releases--;
2405 qm_port->issued_releases++;
2406 /* Replenish s/w credits if enough are cached */
2407 dlb_replenish_sw_credits(dlb, ev_port);
2408 break;
2409 }
2410
2411 DLB_INC_STAT(ev_port->stats.tx_op_cnt[ev->op], 1);
2412 DLB_INC_STAT(ev_port->stats.traffic.tx_ok, 1);
2413
2414 #ifndef RTE_LIBRTE_PMD_DLB_QUELL_STATS
2415 if (ev->op != RTE_EVENT_OP_RELEASE) {
2416 DLB_INC_STAT(ev_port->stats.enq_ok[ev->queue_id], 1);
2417 DLB_INC_STAT(ev_port->stats.tx_sched_cnt[*sched_type], 1);
2418 }
2419 #endif
2420
2421 return 0;
2422 }
2423
2424 static uint8_t cmd_byte_map[NUM_DLB_PORT_TYPES][DLB_NUM_HW_SCHED_TYPES] = {
2425 {
2426 /* Load-balanced cmd bytes */
2427 [RTE_EVENT_OP_NEW] = DLB_NEW_CMD_BYTE,
2428 [RTE_EVENT_OP_FORWARD] = DLB_FWD_CMD_BYTE,
2429 [RTE_EVENT_OP_RELEASE] = DLB_COMP_CMD_BYTE,
2430 },
2431 {
2432 /* Directed cmd bytes */
2433 [RTE_EVENT_OP_NEW] = DLB_NEW_CMD_BYTE,
2434 [RTE_EVENT_OP_FORWARD] = DLB_NEW_CMD_BYTE,
2435 [RTE_EVENT_OP_RELEASE] = DLB_NOOP_CMD_BYTE,
2436 },
2437 };
2438
2439 static inline void
2440 dlb_event_build_hcws(struct dlb_port *qm_port,
2441 const struct rte_event ev[],
2442 int num,
2443 uint8_t *sched_type,
2444 uint8_t *queue_id)
2445 {
2446 struct dlb_enqueue_qe *qe;
2447 uint16_t sched_word[4];
2448 __m128i sse_qe[2];
2449 int i;
2450
2451 qe = qm_port->qe4;
2452
2453 sse_qe[0] = _mm_setzero_si128();
2454 sse_qe[1] = _mm_setzero_si128();
2455
2456 switch (num) {
2457 case 4:
2458 /* Construct the metadata portion of two HCWs in one 128b SSE
2459 * register. HCW metadata is constructed in the SSE registers
2460 * like so:
2461 * sse_qe[0][63:0]: qe[0]'s metadata
2462 * sse_qe[0][127:64]: qe[1]'s metadata
2463 * sse_qe[1][63:0]: qe[2]'s metadata
2464 * sse_qe[1][127:64]: qe[3]'s metadata
2465 */
2466
2467 /* Convert the event operation into a command byte and store it
2468 * in the metadata:
2469 * sse_qe[0][63:56] = cmd_byte_map[is_directed][ev[0].op]
2470 * sse_qe[0][127:120] = cmd_byte_map[is_directed][ev[1].op]
2471 * sse_qe[1][63:56] = cmd_byte_map[is_directed][ev[2].op]
2472 * sse_qe[1][127:120] = cmd_byte_map[is_directed][ev[3].op]
2473 */
2474 #define DLB_QE_CMD_BYTE 7
2475 sse_qe[0] = _mm_insert_epi8(sse_qe[0],
2476 cmd_byte_map[qm_port->is_directed][ev[0].op],
2477 DLB_QE_CMD_BYTE);
2478 sse_qe[0] = _mm_insert_epi8(sse_qe[0],
2479 cmd_byte_map[qm_port->is_directed][ev[1].op],
2480 DLB_QE_CMD_BYTE + 8);
2481 sse_qe[1] = _mm_insert_epi8(sse_qe[1],
2482 cmd_byte_map[qm_port->is_directed][ev[2].op],
2483 DLB_QE_CMD_BYTE);
2484 sse_qe[1] = _mm_insert_epi8(sse_qe[1],
2485 cmd_byte_map[qm_port->is_directed][ev[3].op],
2486 DLB_QE_CMD_BYTE + 8);
2487
2488 /* Store priority, scheduling type, and queue ID in the sched
2489 * word array because these values are re-used when the
2490 * destination is a directed queue.
2491 */
2492 sched_word[0] = EV_TO_DLB_PRIO(ev[0].priority) << 10 |
2493 sched_type[0] << 8 |
2494 queue_id[0];
2495 sched_word[1] = EV_TO_DLB_PRIO(ev[1].priority) << 10 |
2496 sched_type[1] << 8 |
2497 queue_id[1];
2498 sched_word[2] = EV_TO_DLB_PRIO(ev[2].priority) << 10 |
2499 sched_type[2] << 8 |
2500 queue_id[2];
2501 sched_word[3] = EV_TO_DLB_PRIO(ev[3].priority) << 10 |
2502 sched_type[3] << 8 |
2503 queue_id[3];
2504
2505 /* Store the event priority, scheduling type, and queue ID in
2506 * the metadata:
2507 * sse_qe[0][31:16] = sched_word[0]
2508 * sse_qe[0][95:80] = sched_word[1]
2509 * sse_qe[1][31:16] = sched_word[2]
2510 * sse_qe[1][95:80] = sched_word[3]
2511 */
2512 #define DLB_QE_QID_SCHED_WORD 1
2513 sse_qe[0] = _mm_insert_epi16(sse_qe[0],
2514 sched_word[0],
2515 DLB_QE_QID_SCHED_WORD);
2516 sse_qe[0] = _mm_insert_epi16(sse_qe[0],
2517 sched_word[1],
2518 DLB_QE_QID_SCHED_WORD + 4);
2519 sse_qe[1] = _mm_insert_epi16(sse_qe[1],
2520 sched_word[2],
2521 DLB_QE_QID_SCHED_WORD);
2522 sse_qe[1] = _mm_insert_epi16(sse_qe[1],
2523 sched_word[3],
2524 DLB_QE_QID_SCHED_WORD + 4);
2525
2526 /* If the destination is a load-balanced queue, store the lock
2527 * ID. If it is a directed queue, DLB places this field in
2528 * bytes 10-11 of the received QE, so we format it accordingly:
2529 * sse_qe[0][47:32] = dir queue ? sched_word[0] : flow_id[0]
2530 * sse_qe[0][111:96] = dir queue ? sched_word[1] : flow_id[1]
2531 * sse_qe[1][47:32] = dir queue ? sched_word[2] : flow_id[2]
2532 * sse_qe[1][111:96] = dir queue ? sched_word[3] : flow_id[3]
2533 */
2534 #define DLB_QE_LOCK_ID_WORD 2
2535 sse_qe[0] = _mm_insert_epi16(sse_qe[0],
2536 (sched_type[0] == DLB_SCHED_DIRECTED) ?
2537 sched_word[0] : ev[0].flow_id,
2538 DLB_QE_LOCK_ID_WORD);
2539 sse_qe[0] = _mm_insert_epi16(sse_qe[0],
2540 (sched_type[1] == DLB_SCHED_DIRECTED) ?
2541 sched_word[1] : ev[1].flow_id,
2542 DLB_QE_LOCK_ID_WORD + 4);
2543 sse_qe[1] = _mm_insert_epi16(sse_qe[1],
2544 (sched_type[2] == DLB_SCHED_DIRECTED) ?
2545 sched_word[2] : ev[2].flow_id,
2546 DLB_QE_LOCK_ID_WORD);
2547 sse_qe[1] = _mm_insert_epi16(sse_qe[1],
2548 (sched_type[3] == DLB_SCHED_DIRECTED) ?
2549 sched_word[3] : ev[3].flow_id,
2550 DLB_QE_LOCK_ID_WORD + 4);
2551
2552 /* Store the event type and sub event type in the metadata:
2553 * sse_qe[0][15:0] = flow_id[0]
2554 * sse_qe[0][79:64] = flow_id[1]
2555 * sse_qe[1][15:0] = flow_id[2]
2556 * sse_qe[1][79:64] = flow_id[3]
2557 */
2558 #define DLB_QE_EV_TYPE_WORD 0
2559 sse_qe[0] = _mm_insert_epi16(sse_qe[0],
2560 ev[0].sub_event_type << 8 |
2561 ev[0].event_type,
2562 DLB_QE_EV_TYPE_WORD);
2563 sse_qe[0] = _mm_insert_epi16(sse_qe[0],
2564 ev[1].sub_event_type << 8 |
2565 ev[1].event_type,
2566 DLB_QE_EV_TYPE_WORD + 4);
2567 sse_qe[1] = _mm_insert_epi16(sse_qe[1],
2568 ev[2].sub_event_type << 8 |
2569 ev[2].event_type,
2570 DLB_QE_EV_TYPE_WORD);
2571 sse_qe[1] = _mm_insert_epi16(sse_qe[1],
2572 ev[3].sub_event_type << 8 |
2573 ev[3].event_type,
2574 DLB_QE_EV_TYPE_WORD + 4);
2575
2576 /* Store the metadata to memory (use the double-precision
2577 * _mm_storeh_pd because there is no integer function for
2578 * storing the upper 64b):
2579 * qe[0] metadata = sse_qe[0][63:0]
2580 * qe[1] metadata = sse_qe[0][127:64]
2581 * qe[2] metadata = sse_qe[1][63:0]
2582 * qe[3] metadata = sse_qe[1][127:64]
2583 */
2584 _mm_storel_epi64((__m128i *)&qe[0].u.opaque_data, sse_qe[0]);
2585 _mm_storeh_pd((double *)&qe[1].u.opaque_data,
2586 (__m128d) sse_qe[0]);
2587 _mm_storel_epi64((__m128i *)&qe[2].u.opaque_data, sse_qe[1]);
2588 _mm_storeh_pd((double *)&qe[3].u.opaque_data,
2589 (__m128d) sse_qe[1]);
2590
2591 qe[0].data = ev[0].u64;
2592 qe[1].data = ev[1].u64;
2593 qe[2].data = ev[2].u64;
2594 qe[3].data = ev[3].u64;
2595
2596 break;
2597 case 3:
2598 case 2:
2599 case 1:
2600 for (i = 0; i < num; i++) {
2601 qe[i].cmd_byte =
2602 cmd_byte_map[qm_port->is_directed][ev[i].op];
2603 qe[i].sched_type = sched_type[i];
2604 qe[i].data = ev[i].u64;
2605 qe[i].qid = queue_id[i];
2606 qe[i].priority = EV_TO_DLB_PRIO(ev[i].priority);
2607 qe[i].lock_id = ev[i].flow_id;
2608 if (sched_type[i] == DLB_SCHED_DIRECTED) {
2609 struct dlb_msg_info *info =
2610 (struct dlb_msg_info *)&qe[i].lock_id;
2611
2612 info->qid = queue_id[i];
2613 info->sched_type = DLB_SCHED_DIRECTED;
2614 info->priority = qe[i].priority;
2615 }
2616 qe[i].u.event_type.major = ev[i].event_type;
2617 qe[i].u.event_type.sub = ev[i].sub_event_type;
2618 }
2619 break;
2620 case 0:
2621 break;
2622 }
2623 }
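
/*
 * Illustrative sketch (not part of the driver): the scalar tail of
 * dlb_event_build_hcws() above defines the per-field mapping from one
 * struct rte_event to one enqueue HCW; the 4-wide SSE path packs exactly the
 * same fields. The helper below restates that mapping for a single QE as a
 * reference model.
 */
static void __rte_unused
build_single_hcw_sketch(struct dlb_port *qm_port,
			struct dlb_enqueue_qe *qe,
			const struct rte_event *ev,
			uint8_t sched_type,
			uint8_t queue_id)
{
	qe->cmd_byte = cmd_byte_map[qm_port->is_directed][ev->op];
	qe->sched_type = sched_type;
	qe->data = ev->u64;
	qe->qid = queue_id;
	qe->priority = EV_TO_DLB_PRIO(ev->priority);
	qe->lock_id = ev->flow_id;

	if (sched_type == DLB_SCHED_DIRECTED) {
		/* For directed traffic the lock ID bytes carry the QID,
		 * scheduling type, and priority instead of a flow ID.
		 */
		struct dlb_msg_info *info =
			(struct dlb_msg_info *)&qe->lock_id;

		info->qid = queue_id;
		info->sched_type = DLB_SCHED_DIRECTED;
		info->priority = qe->priority;
	}

	qe->u.event_type.major = ev->event_type;
	qe->u.event_type.sub = ev->sub_event_type;
}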
2624
2625 static inline void
2626 dlb_construct_token_pop_qe(struct dlb_port *qm_port, int idx)
2627 {
2628 struct dlb_cq_pop_qe *qe = (void *)qm_port->qe4;
2629 int num = qm_port->owed_tokens;
2630
2631 if (qm_port->use_rsvd_token_scheme) {
2632 /* Check if there's a deficit of reserved tokens, and return
2633 * early if there are no (unreserved) tokens to consume.
2634 */
2635 if (num <= qm_port->cq_rsvd_token_deficit) {
2636 qm_port->cq_rsvd_token_deficit -= num;
2637 qm_port->owed_tokens = 0;
2638 return;
2639 }
2640 num -= qm_port->cq_rsvd_token_deficit;
2641 qm_port->cq_rsvd_token_deficit = 0;
2642 }
2643
2644 qe[idx].cmd_byte = DLB_POP_CMD_BYTE;
2645 qe[idx].tokens = num - 1;
2646 qm_port->owed_tokens = 0;
2647 }
2648
2649 static __rte_always_inline void
2650 dlb_pp_write(struct dlb_enqueue_qe *qe4,
2651 struct process_local_port_data *port_data)
2652 {
2653 dlb_movdir64b(port_data->pp_addr, qe4);
2654 }
2655
2656 static inline void
2657 dlb_hw_do_enqueue(struct dlb_port *qm_port,
2658 bool do_sfence,
2659 struct process_local_port_data *port_data)
2660 {
2661 DLB_LOG_DBG("dlb: Flushing QE(s) to DLB\n");
2662
2663 /* Since MOVDIR64B is weakly-ordered, use an SFENCE to ensure that
2664 * application writes complete before enqueueing the release HCW.
2665 */
2666 if (do_sfence)
2667 rte_wmb();
2668
2669 dlb_pp_write(qm_port->qe4, port_data);
2670 }
2671
2672 static inline int
2673 dlb_consume_qe_immediate(struct dlb_port *qm_port, int num)
2674 {
2675 struct process_local_port_data *port_data;
2676 struct dlb_cq_pop_qe *qe;
2677
2678 RTE_ASSERT(qm_port->config_state == DLB_CONFIGURED);
2679
2680 if (qm_port->use_rsvd_token_scheme) {
2681 /* Check if there's a deficit of reserved tokens, and return
2682 * early if there are no (unreserved) tokens to consume.
2683 */
2684 if (num <= qm_port->cq_rsvd_token_deficit) {
2685 qm_port->cq_rsvd_token_deficit -= num;
2686 qm_port->owed_tokens = 0;
2687 return 0;
2688 }
2689 num -= qm_port->cq_rsvd_token_deficit;
2690 qm_port->cq_rsvd_token_deficit = 0;
2691 }
2692
2693 qe = qm_port->consume_qe;
2694
2695 qe->tokens = num - 1;
2696 qe->int_arm = 0;
2697
2698 /* No store fence needed since no pointer is being sent, and CQ token
2699 * pops can be safely reordered with other HCWs.
2700 */
2701 port_data = &dlb_port[qm_port->id][PORT_TYPE(qm_port)];
2702
2703 dlb_movntdq_single(port_data->pp_addr, qe);
2704
2705 DLB_LOG_DBG("dlb: consume immediate - %d QEs\n", num);
2706
2707 qm_port->owed_tokens = 0;
2708
2709 return 0;
2710 }
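
/*
 * Simplified model (not driver code) of the reserved-token deficit handling
 * that appears both above and in dlb_construct_token_pop_qe(): owed tokens
 * first pay down the reserved-token deficit, and only the remainder is
 * actually popped from the CQ.
 */
static int __rte_unused
rsvd_token_adjust_sketch(int *deficit, int owed)
{
	if (owed <= *deficit) {
		*deficit -= owed;
		return 0;      /* fully absorbed; nothing to pop this time */
	}
	owed -= *deficit;
	*deficit = 0;
	return owed;           /* tokens that must actually be popped */
}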
2711
2712 static inline uint16_t
2713 __dlb_event_enqueue_burst(void *event_port,
2714 const struct rte_event events[],
2715 uint16_t num,
2716 bool use_delayed)
2717 {
2718 struct dlb_eventdev_port *ev_port = event_port;
2719 struct dlb_port *qm_port = &ev_port->qm_port;
2720 struct process_local_port_data *port_data;
2721 int i;
2722
2723 RTE_ASSERT(ev_port->enq_configured);
2724 RTE_ASSERT(events != NULL);
2725
2726 rte_errno = 0;
2727 i = 0;
2728
2729 port_data = &dlb_port[qm_port->id][PORT_TYPE(qm_port)];
2730
2731 while (i < num) {
2732 uint8_t sched_types[DLB_NUM_QES_PER_CACHE_LINE];
2733 uint8_t queue_ids[DLB_NUM_QES_PER_CACHE_LINE];
2734 int pop_offs = 0;
2735 int j = 0;
2736
2737 memset(qm_port->qe4,
2738 0,
2739 DLB_NUM_QES_PER_CACHE_LINE *
2740 sizeof(struct dlb_enqueue_qe));
2741
2742 for (; j < DLB_NUM_QES_PER_CACHE_LINE && (i + j) < num; j++) {
2743 const struct rte_event *ev = &events[i + j];
2744 int16_t thresh = qm_port->token_pop_thresh;
2745
2746 if (use_delayed &&
2747 qm_port->token_pop_mode == DELAYED_POP &&
2748 (ev->op == RTE_EVENT_OP_FORWARD ||
2749 ev->op == RTE_EVENT_OP_RELEASE) &&
2750 qm_port->issued_releases >= thresh - 1) {
2751 /* Insert the token pop QE and break out. This
2752 * may result in a partial HCW, but that is
2753 * simpler than supporting arbitrary QE
2754 * insertion.
2755 */
2756 dlb_construct_token_pop_qe(qm_port, j);
2757
2758 /* Reset the releases for the next QE batch */
2759 qm_port->issued_releases -= thresh;
2760
2761 /* When using delayed token pop mode, the
2762 * initial token threshold is the full CQ
2763 * depth. After the first token pop, we need to
2764 * reset it to the dequeue_depth.
2765 */
2766 qm_port->token_pop_thresh =
2767 qm_port->dequeue_depth;
2768
2769 pop_offs = 1;
2770 j++;
2771 break;
2772 }
2773
2774 if (dlb_event_enqueue_prep(ev_port, qm_port, ev,
2775 port_data, &sched_types[j],
2776 &queue_ids[j]))
2777 break;
2778 }
2779
2780 if (j == 0)
2781 break;
2782
2783 dlb_event_build_hcws(qm_port, &events[i], j - pop_offs,
2784 sched_types, queue_ids);
2785
2786 dlb_hw_do_enqueue(qm_port, i == 0, port_data);
2787
2788 /* Don't include the token pop QE in the enqueue count */
2789 i += j - pop_offs;
2790
2791 /* Don't interpret j < DLB_NUM_... as out-of-credits if
2792 * pop_offs != 0
2793 */
2794 if (j < DLB_NUM_QES_PER_CACHE_LINE && pop_offs == 0)
2795 break;
2796 }
2797
2798 RTE_ASSERT(!((i == 0 && rte_errno != -ENOSPC)));
2799
2800 return i;
2801 }
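
/*
 * Illustrative sketch (not part of the driver): applications reach the
 * enqueue paths above via rte_event_enqueue_burst(). Because the PMD may
 * accept only part of a burst (it sets rte_errno to -ENOSPC when credits run
 * out), callers typically retry the unsent tail. dev_id/port_id are
 * hypothetical.
 */
static void __rte_unused
enqueue_with_retry_sketch(uint8_t dev_id, uint8_t port_id,
			  struct rte_event *events, uint16_t n)
{
	uint16_t sent = 0;

	while (sent < n) {
		uint16_t ret = rte_event_enqueue_burst(dev_id, port_id,
						       &events[sent],
						       n - sent);

		sent += ret;
		if (ret == 0 && rte_errno != -ENOSPC)
			break; /* hard error; credits are not the cause */
	}
}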
2802
2803 static inline uint16_t
2804 dlb_event_enqueue_burst(void *event_port,
2805 const struct rte_event events[],
2806 uint16_t num)
2807 {
2808 return __dlb_event_enqueue_burst(event_port, events, num, false);
2809 }
2810
2811 static inline uint16_t
2812 dlb_event_enqueue_burst_delayed(void *event_port,
2813 const struct rte_event events[],
2814 uint16_t num)
2815 {
2816 return __dlb_event_enqueue_burst(event_port, events, num, true);
2817 }
2818
2819 static inline uint16_t
2820 dlb_event_enqueue(void *event_port,
2821 const struct rte_event events[])
2822 {
2823 return __dlb_event_enqueue_burst(event_port, events, 1, false);
2824 }
2825
2826 static inline uint16_t
2827 dlb_event_enqueue_delayed(void *event_port,
2828 const struct rte_event events[])
2829 {
2830 return __dlb_event_enqueue_burst(event_port, events, 1, true);
2831 }
2832
2833 static uint16_t
2834 dlb_event_enqueue_new_burst(void *event_port,
2835 const struct rte_event events[],
2836 uint16_t num)
2837 {
2838 return __dlb_event_enqueue_burst(event_port, events, num, false);
2839 }
2840
2841 static uint16_t
2842 dlb_event_enqueue_new_burst_delayed(void *event_port,
2843 const struct rte_event events[],
2844 uint16_t num)
2845 {
2846 return __dlb_event_enqueue_burst(event_port, events, num, true);
2847 }
2848
2849 static uint16_t
2850 dlb_event_enqueue_forward_burst(void *event_port,
2851 const struct rte_event events[],
2852 uint16_t num)
2853 {
2854 return __dlb_event_enqueue_burst(event_port, events, num, false);
2855 }
2856
2857 static uint16_t
2858 dlb_event_enqueue_forward_burst_delayed(void *event_port,
2859 const struct rte_event events[],
2860 uint16_t num)
2861 {
2862 return __dlb_event_enqueue_burst(event_port, events, num, true);
2863 }
2864
2865 static __rte_always_inline int
2866 dlb_recv_qe(struct dlb_port *qm_port, struct dlb_dequeue_qe *qe,
2867 uint8_t *offset)
2868 {
2869 uint8_t xor_mask[2][4] = { {0x0F, 0x0E, 0x0C, 0x08},
2870 {0x00, 0x01, 0x03, 0x07} };
2871 uint8_t and_mask[4] = {0x0F, 0x0E, 0x0C, 0x08};
2872 volatile struct dlb_dequeue_qe *cq_addr;
2873 __m128i *qes = (__m128i *)qe;
2874 uint64_t *cache_line_base;
2875 uint8_t gen_bits;
2876
2877 cq_addr = dlb_port[qm_port->id][PORT_TYPE(qm_port)].cq_base;
2878 cq_addr = &cq_addr[qm_port->cq_idx];
2879
2880 cache_line_base = (void *)(((uintptr_t)cq_addr) & ~0x3F);
2881 *offset = ((uintptr_t)cq_addr & 0x30) >> 4;
2882
2883 /* Load the next CQ cache line from memory. Pack these reads as tight
2884 * as possible to reduce the chance that DLB invalidates the line while
2885 * the CPU is reading it. Read the cache line backwards to ensure that
2886 * if QE[N] (N > 0) is valid, then QEs[0:N-1] are too.
2887 *
2888 * (Valid QEs start at &qe[offset])
2889 */
2890 qes[3] = _mm_load_si128((__m128i *)&cache_line_base[6]);
2891 qes[2] = _mm_load_si128((__m128i *)&cache_line_base[4]);
2892 qes[1] = _mm_load_si128((__m128i *)&cache_line_base[2]);
2893 qes[0] = _mm_load_si128((__m128i *)&cache_line_base[0]);
2894
2895 /* Evict the cache line ASAP */
2896 rte_cldemote(cache_line_base);
2897
2898 /* Extract and combine the gen bits */
2899 gen_bits = ((_mm_extract_epi8(qes[0], 15) & 0x1) << 0) |
2900 ((_mm_extract_epi8(qes[1], 15) & 0x1) << 1) |
2901 ((_mm_extract_epi8(qes[2], 15) & 0x1) << 2) |
2902 ((_mm_extract_epi8(qes[3], 15) & 0x1) << 3);
2903
2904 /* XOR the combined bits such that a 1 represents a valid QE */
2905 gen_bits ^= xor_mask[qm_port->gen_bit][*offset];
2906
2907 /* Mask off gen bits we don't care about */
2908 gen_bits &= and_mask[*offset];
2909
2910 return __builtin_popcount(gen_bits);
2911 }
2912
2913 static inline void
2914 dlb_inc_cq_idx(struct dlb_port *qm_port, int cnt)
2915 {
2916 uint16_t idx = qm_port->cq_idx_unmasked + cnt;
2917
2918 qm_port->cq_idx_unmasked = idx;
2919 qm_port->cq_idx = idx & qm_port->cq_depth_mask;
2920 qm_port->gen_bit = (~(idx >> qm_port->gen_bit_shift)) & 0x1;
2921 }
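
/*
 * Worked example (not driver code) of the arithmetic in dlb_inc_cq_idx():
 * with a hypothetical CQ depth of 8, cq_depth_mask is 7 and gen_bit_shift is
 * 3. Starting from an unmasked index of 6 and adding 4 gives 10, so the
 * masked index wraps to 2 and the gen bit flips from 1 to 0.
 */
static uint8_t __rte_unused
cq_gen_bit_sketch(uint16_t unmasked_idx, uint16_t gen_bit_shift)
{
	/* Same expression as dlb_inc_cq_idx(): the gen bit toggles every
	 * time the unmasked index crosses a multiple of the CQ depth.
	 */
	return (~(unmasked_idx >> gen_bit_shift)) & 0x1;
}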
2922
2923 static inline int
2924 dlb_process_dequeue_qes(struct dlb_eventdev_port *ev_port,
2925 struct dlb_port *qm_port,
2926 struct rte_event *events,
2927 struct dlb_dequeue_qe *qes,
2928 int cnt)
2929 {
2930 uint8_t *qid_mappings = qm_port->qid_mappings;
2931 int i, num;
2932
2933 RTE_SET_USED(ev_port); /* avoids unused variable error */
2934
2935 for (i = 0, num = 0; i < cnt; i++) {
2936 struct dlb_dequeue_qe *qe = &qes[i];
2937 int sched_type_map[4] = {
2938 [DLB_SCHED_ATOMIC] = RTE_SCHED_TYPE_ATOMIC,
2939 [DLB_SCHED_UNORDERED] = RTE_SCHED_TYPE_PARALLEL,
2940 [DLB_SCHED_ORDERED] = RTE_SCHED_TYPE_ORDERED,
2941 [DLB_SCHED_DIRECTED] = RTE_SCHED_TYPE_ATOMIC,
2942 };
2943
2944 DLB_LOG_DBG("dequeue success, data = 0x%llx, qid=%d, event_type=%d, subevent=%d\npp_id = %d, sched_type = %d, qid = %d, err=%d\n",
2945 (long long)qe->data, qe->qid,
2946 qe->u.event_type.major,
2947 qe->u.event_type.sub,
2948 qe->pp_id, qe->sched_type, qe->qid, qe->error);
2949
2950 /* Fill in event information.
2951 * Note that flow_id must be embedded in the data by
2952 * the app, such as the mbuf RSS hash field if the data
2953 * buffer is a mbuf.
2954 */
2955 if (unlikely(qe->error)) {
2956 DLB_LOG_ERR("QE error bit ON\n");
2957 DLB_INC_STAT(ev_port->stats.traffic.rx_drop, 1);
2958 dlb_consume_qe_immediate(qm_port, 1);
2959 continue; /* Ignore */
2960 }
2961
2962 events[num].u64 = qe->data;
2963 events[num].queue_id = qid_mappings[qe->qid];
2964 events[num].priority = DLB_TO_EV_PRIO((uint8_t)qe->priority);
2965 events[num].event_type = qe->u.event_type.major;
2966 events[num].sub_event_type = qe->u.event_type.sub;
2967 events[num].sched_type = sched_type_map[qe->sched_type];
2968 DLB_INC_STAT(ev_port->stats.rx_sched_cnt[qe->sched_type], 1);
2969 num++;
2970 }
2971 DLB_INC_STAT(ev_port->stats.traffic.rx_ok, num);
2972
2973 return num;
2974 }
2975
2976 static inline int
2977 dlb_process_dequeue_four_qes(struct dlb_eventdev_port *ev_port,
2978 struct dlb_port *qm_port,
2979 struct rte_event *events,
2980 struct dlb_dequeue_qe *qes)
2981 {
2982 int sched_type_map[] = {
2983 [DLB_SCHED_ATOMIC] = RTE_SCHED_TYPE_ATOMIC,
2984 [DLB_SCHED_UNORDERED] = RTE_SCHED_TYPE_PARALLEL,
2985 [DLB_SCHED_ORDERED] = RTE_SCHED_TYPE_ORDERED,
2986 [DLB_SCHED_DIRECTED] = RTE_SCHED_TYPE_ATOMIC,
2987 };
2988 const int num_events = DLB_NUM_QES_PER_CACHE_LINE;
2989 uint8_t *qid_mappings = qm_port->qid_mappings;
2990 __m128i sse_evt[2];
2991 int i;
2992
2993 /* In the unlikely case that any of the QE error bits are set, process
2994 * them one at a time.
2995 */
2996 if (unlikely(qes[0].error || qes[1].error ||
2997 qes[2].error || qes[3].error))
2998 return dlb_process_dequeue_qes(ev_port, qm_port, events,
2999 qes, num_events);
3000
3001 for (i = 0; i < DLB_NUM_QES_PER_CACHE_LINE; i++) {
3002 DLB_LOG_DBG("dequeue success, data = 0x%llx, qid=%d, event_type=%d, subevent=%d\npp_id = %d, sched_type = %d, qid = %d, err=%d\n",
3003 (long long)qes[i].data, qes[i].qid,
3004 qes[i].u.event_type.major,
3005 qes[i].u.event_type.sub,
3006 qes[i].pp_id, qes[i].sched_type, qes[i].qid,
3007 qes[i].error);
3008 }
3009
3010 events[0].u64 = qes[0].data;
3011 events[1].u64 = qes[1].data;
3012 events[2].u64 = qes[2].data;
3013 events[3].u64 = qes[3].data;
3014
3015 /* Construct the metadata portion of two struct rte_events
3016 * in one 128b SSE register. Event metadata is constructed in the SSE
3017 * registers like so:
3018 * sse_evt[0][63:0]: event[0]'s metadata
3019 * sse_evt[0][127:64]: event[1]'s metadata
3020 * sse_evt[1][63:0]: event[2]'s metadata
3021 * sse_evt[1][127:64]: event[3]'s metadata
3022 */
3023 sse_evt[0] = _mm_setzero_si128();
3024 sse_evt[1] = _mm_setzero_si128();
3025
3026 /* Convert the hardware queue ID to an event queue ID and store it in
3027 * the metadata:
3028 * sse_evt[0][47:40] = qid_mappings[qes[0].qid]
3029 * sse_evt[0][111:104] = qid_mappings[qes[1].qid]
3030 * sse_evt[1][47:40] = qid_mappings[qes[2].qid]
3031 * sse_evt[1][111:104] = qid_mappings[qes[3].qid]
3032 */
3033 #define DLB_EVENT_QUEUE_ID_BYTE 5
3034 sse_evt[0] = _mm_insert_epi8(sse_evt[0],
3035 qid_mappings[qes[0].qid],
3036 DLB_EVENT_QUEUE_ID_BYTE);
3037 sse_evt[0] = _mm_insert_epi8(sse_evt[0],
3038 qid_mappings[qes[1].qid],
3039 DLB_EVENT_QUEUE_ID_BYTE + 8);
3040 sse_evt[1] = _mm_insert_epi8(sse_evt[1],
3041 qid_mappings[qes[2].qid],
3042 DLB_EVENT_QUEUE_ID_BYTE);
3043 sse_evt[1] = _mm_insert_epi8(sse_evt[1],
3044 qid_mappings[qes[3].qid],
3045 DLB_EVENT_QUEUE_ID_BYTE + 8);
3046
3047 /* Convert the hardware priority to an event priority and store it in
3048 * the metadata:
3049 * sse_evt[0][55:48] = DLB_TO_EV_PRIO(qes[0].priority)
3050 * sse_evt[0][119:112] = DLB_TO_EV_PRIO(qes[1].priority)
3051 * sse_evt[1][55:48] = DLB_TO_EV_PRIO(qes[2].priority)
3052 * sse_evt[1][119:112] = DLB_TO_EV_PRIO(qes[3].priority)
3053 */
3054 #define DLB_EVENT_PRIO_BYTE 6
3055 sse_evt[0] = _mm_insert_epi8(sse_evt[0],
3056 DLB_TO_EV_PRIO((uint8_t)qes[0].priority),
3057 DLB_EVENT_PRIO_BYTE);
3058 sse_evt[0] = _mm_insert_epi8(sse_evt[0],
3059 DLB_TO_EV_PRIO((uint8_t)qes[1].priority),
3060 DLB_EVENT_PRIO_BYTE + 8);
3061 sse_evt[1] = _mm_insert_epi8(sse_evt[1],
3062 DLB_TO_EV_PRIO((uint8_t)qes[2].priority),
3063 DLB_EVENT_PRIO_BYTE);
3064 sse_evt[1] = _mm_insert_epi8(sse_evt[1],
3065 DLB_TO_EV_PRIO((uint8_t)qes[3].priority),
3066 DLB_EVENT_PRIO_BYTE + 8);
3067
3068 /* Write the event type and sub event type to the event metadata. Leave
3069 * flow ID unspecified, since the hardware does not maintain it during
3070 * scheduling:
3071 * sse_evt[0][31:0] = qes[0].u.event_type.major << 28 |
3072 * qes[0].u.event_type.sub << 20;
3073 * sse_evt[0][95:64] = qes[1].u.event_type.major << 28 |
3074 * qes[1].u.event_type.sub << 20;
3075 * sse_evt[1][31:0] = qes[2].u.event_type.major << 28 |
3076 * qes[2].u.event_type.sub << 20;
3077 * sse_evt[1][95:64] = qes[3].u.event_type.major << 28 |
3078 * qes[3].u.event_type.sub << 20;
3079 */
3080 #define DLB_EVENT_EV_TYPE_DW 0
3081 #define DLB_EVENT_EV_TYPE_SHIFT 28
3082 #define DLB_EVENT_SUB_EV_TYPE_SHIFT 20
3083 sse_evt[0] = _mm_insert_epi32(sse_evt[0],
3084 qes[0].u.event_type.major << DLB_EVENT_EV_TYPE_SHIFT |
3085 qes[0].u.event_type.sub << DLB_EVENT_SUB_EV_TYPE_SHIFT,
3086 DLB_EVENT_EV_TYPE_DW);
3087 sse_evt[0] = _mm_insert_epi32(sse_evt[0],
3088 qes[1].u.event_type.major << DLB_EVENT_EV_TYPE_SHIFT |
3089 qes[1].u.event_type.sub << DLB_EVENT_SUB_EV_TYPE_SHIFT,
3090 DLB_EVENT_EV_TYPE_DW + 2);
3091 sse_evt[1] = _mm_insert_epi32(sse_evt[1],
3092 qes[2].u.event_type.major << DLB_EVENT_EV_TYPE_SHIFT |
3093 qes[2].u.event_type.sub << DLB_EVENT_SUB_EV_TYPE_SHIFT,
3094 DLB_EVENT_EV_TYPE_DW);
3095 sse_evt[1] = _mm_insert_epi32(sse_evt[1],
3096 qes[3].u.event_type.major << DLB_EVENT_EV_TYPE_SHIFT |
3097 qes[3].u.event_type.sub << DLB_EVENT_SUB_EV_TYPE_SHIFT,
3098 DLB_EVENT_EV_TYPE_DW + 2);
3099
3100 /* Write the sched type to the event metadata. 'op' and 'rsvd' are not
3101 * set:
3102 * sse_evt[0][39:32] = sched_type_map[qes[0].sched_type] << 6
3103 * sse_evt[0][103:96] = sched_type_map[qes[1].sched_type] << 6
3104 * sse_evt[1][39:32] = sched_type_map[qes[2].sched_type] << 6
3105 * sse_evt[1][103:96] = sched_type_map[qes[3].sched_type] << 6
3106 */
3107 #define DLB_EVENT_SCHED_TYPE_BYTE 4
3108 #define DLB_EVENT_SCHED_TYPE_SHIFT 6
3109 sse_evt[0] = _mm_insert_epi8(sse_evt[0],
3110 sched_type_map[qes[0].sched_type] << DLB_EVENT_SCHED_TYPE_SHIFT,
3111 DLB_EVENT_SCHED_TYPE_BYTE);
3112 sse_evt[0] = _mm_insert_epi8(sse_evt[0],
3113 sched_type_map[qes[1].sched_type] << DLB_EVENT_SCHED_TYPE_SHIFT,
3114 DLB_EVENT_SCHED_TYPE_BYTE + 8);
3115 sse_evt[1] = _mm_insert_epi8(sse_evt[1],
3116 sched_type_map[qes[2].sched_type] << DLB_EVENT_SCHED_TYPE_SHIFT,
3117 DLB_EVENT_SCHED_TYPE_BYTE);
3118 sse_evt[1] = _mm_insert_epi8(sse_evt[1],
3119 sched_type_map[qes[3].sched_type] << DLB_EVENT_SCHED_TYPE_SHIFT,
3120 DLB_EVENT_SCHED_TYPE_BYTE + 8);
3121
3122 /* Store the metadata to the event (use the double-precision
3123 * _mm_storeh_pd because there is no integer function for storing the
3124 * upper 64b):
3125 * events[0].event = sse_evt[0][63:0]
3126 * events[1].event = sse_evt[0][127:64]
3127 * events[2].event = sse_evt[1][63:0]
3128 * events[3].event = sse_evt[1][127:64]
3129 */
3130 _mm_storel_epi64((__m128i *)&events[0].event, sse_evt[0]);
3131 _mm_storeh_pd((double *)&events[1].event, (__m128d) sse_evt[0]);
3132 _mm_storel_epi64((__m128i *)&events[2].event, sse_evt[1]);
3133 _mm_storeh_pd((double *)&events[3].event, (__m128d) sse_evt[1]);
3134
3135 DLB_INC_STAT(ev_port->stats.rx_sched_cnt[qes[0].sched_type], 1);
3136 DLB_INC_STAT(ev_port->stats.rx_sched_cnt[qes[1].sched_type], 1);
3137 DLB_INC_STAT(ev_port->stats.rx_sched_cnt[qes[2].sched_type], 1);
3138 DLB_INC_STAT(ev_port->stats.rx_sched_cnt[qes[3].sched_type], 1);
3139
3140 DLB_INC_STAT(ev_port->stats.traffic.rx_ok, num_events);
3141
3142 return num_events;
3143 }
3144
3145 static inline int
3146 dlb_dequeue_wait(struct dlb_eventdev *dlb,
3147 struct dlb_eventdev_port *ev_port,
3148 struct dlb_port *qm_port,
3149 uint64_t timeout,
3150 uint64_t start_ticks)
3151 {
3152 struct process_local_port_data *port_data;
3153 uint64_t elapsed_ticks;
3154
3155 port_data = &dlb_port[qm_port->id][PORT_TYPE(qm_port)];
3156
3157 elapsed_ticks = rte_get_timer_cycles() - start_ticks;
3158
3159 /* Wait/poll time expired */
3160 if (elapsed_ticks >= timeout) {
3161 /* Interrupts not supported by PF PMD */
3162 return 1;
3163 } else if (dlb->umwait_allowed) {
3164 volatile struct dlb_dequeue_qe *cq_base;
3165 union {
3166 uint64_t raw_qe[2];
3167 struct dlb_dequeue_qe qe;
3168 } qe_mask;
3169 uint64_t expected_value;
3170 volatile uint64_t *monitor_addr;
3171
3172 qe_mask.qe.cq_gen = 1; /* set mask */
3173
3174 cq_base = port_data->cq_base;
3175 monitor_addr = (volatile uint64_t *)(volatile void *)
3176 &cq_base[qm_port->cq_idx];
3177 monitor_addr++; /* cq_gen bit is in second 64bit location */
3178
3179 if (qm_port->gen_bit)
3180 expected_value = qe_mask.raw_qe[1];
3181 else
3182 expected_value = 0;
3183
3184 rte_power_monitor(monitor_addr, expected_value,
3185 qe_mask.raw_qe[1], timeout + start_ticks,
3186 sizeof(uint64_t));
3187
3188 DLB_INC_STAT(ev_port->stats.traffic.rx_umonitor_umwait, 1);
3189 } else {
3190 uint64_t poll_interval = RTE_LIBRTE_PMD_DLB_POLL_INTERVAL;
3191 uint64_t curr_ticks = rte_get_timer_cycles();
3192 uint64_t init_ticks = curr_ticks;
3193
3194 while ((curr_ticks - start_ticks < timeout) &&
3195 (curr_ticks - init_ticks < poll_interval))
3196 curr_ticks = rte_get_timer_cycles();
3197 }
3198
3199 return 0;
3200 }
3201
3202 static inline int16_t
3203 dlb_hw_dequeue(struct dlb_eventdev *dlb,
3204 struct dlb_eventdev_port *ev_port,
3205 struct rte_event *events,
3206 uint16_t max_num,
3207 uint64_t dequeue_timeout_ticks)
3208 {
3209 uint64_t timeout;
3210 uint64_t start_ticks = 0ULL;
3211 struct dlb_port *qm_port;
3212 int num = 0;
3213
3214 qm_port = &ev_port->qm_port;
3215
3216 	/* If configured for per-dequeue wait, use the wait value provided to
3217 	 * this API; otherwise use the global value set at eventdev
3218 	 * configuration time.
3219 */
3220 if (!dlb->global_dequeue_wait)
3221 timeout = dequeue_timeout_ticks;
3222 else
3223 timeout = dlb->global_dequeue_wait_ticks;
3224
3225 if (timeout)
3226 start_ticks = rte_get_timer_cycles();
3227
3228 while (num < max_num) {
3229 struct dlb_dequeue_qe qes[DLB_NUM_QES_PER_CACHE_LINE];
3230 uint8_t offset;
3231 int num_avail;
3232
3233 /* Copy up to 4 QEs from the current cache line into qes */
3234 num_avail = dlb_recv_qe(qm_port, qes, &offset);
3235
3236 /* But don't process more than the user requested */
3237 num_avail = RTE_MIN(num_avail, max_num - num);
3238
3239 dlb_inc_cq_idx(qm_port, num_avail);
3240
3241 if (num_avail == DLB_NUM_QES_PER_CACHE_LINE)
3242 num += dlb_process_dequeue_four_qes(ev_port,
3243 qm_port,
3244 &events[num],
3245 &qes[offset]);
3246 else if (num_avail)
3247 num += dlb_process_dequeue_qes(ev_port,
3248 qm_port,
3249 &events[num],
3250 &qes[offset],
3251 num_avail);
3252 else if ((timeout == 0) || (num > 0))
3253 /* Not waiting in any form, or 1+ events received? */
3254 break;
3255 else if (dlb_dequeue_wait(dlb, ev_port, qm_port,
3256 timeout, start_ticks))
3257 break;
3258 }
3259
3260 qm_port->owed_tokens += num;
3261
3262 if (num && qm_port->token_pop_mode == AUTO_POP)
3263 dlb_consume_qe_immediate(qm_port, num);
3264
3265 ev_port->outstanding_releases += num;
3266
3267 return num;
3268 }
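
/*
 * Illustrative sketch (not part of the driver): the dequeue path above is
 * reached through rte_event_dequeue_burst(). A typical worker dequeues a
 * burst, processes each event, and re-enqueues it as a FORWARD (releases are
 * handled implicitly or explicitly depending on the port configuration).
 * dev_id/port_id and the processing step are hypothetical.
 */
static void __rte_unused
worker_loop_body_sketch(uint8_t dev_id, uint8_t port_id, uint64_t timeout)
{
	struct rte_event events[16];
	uint16_t nb_rx, i;

	nb_rx = rte_event_dequeue_burst(dev_id, port_id, events,
					RTE_DIM(events), timeout);

	for (i = 0; i < nb_rx; i++) {
		/* ... process events[i].u64 ... */
		events[i].op = RTE_EVENT_OP_FORWARD;
	}

	if (nb_rx)
		rte_event_enqueue_burst(dev_id, port_id, events, nb_rx);
}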
3269
3270 static __rte_always_inline int
3271 dlb_recv_qe_sparse(struct dlb_port *qm_port, struct dlb_dequeue_qe *qe)
3272 {
3273 volatile struct dlb_dequeue_qe *cq_addr;
3274 uint8_t xor_mask[2] = {0x0F, 0x00};
3275 const uint8_t and_mask = 0x0F;
3276 __m128i *qes = (__m128i *)qe;
3277 uint8_t gen_bits, gen_bit;
3278 uintptr_t addr[4];
3279 uint16_t idx;
3280
3281 cq_addr = dlb_port[qm_port->id][PORT_TYPE(qm_port)].cq_base;
3282
3283 idx = qm_port->cq_idx;
3284
3285 /* Load the next 4 QEs */
3286 addr[0] = (uintptr_t)&cq_addr[idx];
3287 addr[1] = (uintptr_t)&cq_addr[(idx + 4) & qm_port->cq_depth_mask];
3288 addr[2] = (uintptr_t)&cq_addr[(idx + 8) & qm_port->cq_depth_mask];
3289 addr[3] = (uintptr_t)&cq_addr[(idx + 12) & qm_port->cq_depth_mask];
3290
3291 /* Prefetch next batch of QEs (all CQs occupy minimum 8 cache lines) */
3292 rte_prefetch0(&cq_addr[(idx + 16) & qm_port->cq_depth_mask]);
3293 rte_prefetch0(&cq_addr[(idx + 20) & qm_port->cq_depth_mask]);
3294 rte_prefetch0(&cq_addr[(idx + 24) & qm_port->cq_depth_mask]);
3295 rte_prefetch0(&cq_addr[(idx + 28) & qm_port->cq_depth_mask]);
3296
3297 /* Correct the xor_mask for wrap-around QEs */
3298 gen_bit = qm_port->gen_bit;
3299 xor_mask[gen_bit] ^= !!((idx + 4) > qm_port->cq_depth_mask) << 1;
3300 xor_mask[gen_bit] ^= !!((idx + 8) > qm_port->cq_depth_mask) << 2;
3301 xor_mask[gen_bit] ^= !!((idx + 12) > qm_port->cq_depth_mask) << 3;
3302
3303 /* Read the cache lines backwards to ensure that if QE[N] (N > 0) is
3304 * valid, then QEs[0:N-1] are too.
3305 */
3306 qes[3] = _mm_load_si128((__m128i *)(void *)addr[3]);
3307 rte_compiler_barrier();
3308 qes[2] = _mm_load_si128((__m128i *)(void *)addr[2]);
3309 rte_compiler_barrier();
3310 qes[1] = _mm_load_si128((__m128i *)(void *)addr[1]);
3311 rte_compiler_barrier();
3312 qes[0] = _mm_load_si128((__m128i *)(void *)addr[0]);
3313
3314 /* Extract and combine the gen bits */
3315 gen_bits = ((_mm_extract_epi8(qes[0], 15) & 0x1) << 0) |
3316 ((_mm_extract_epi8(qes[1], 15) & 0x1) << 1) |
3317 ((_mm_extract_epi8(qes[2], 15) & 0x1) << 2) |
3318 ((_mm_extract_epi8(qes[3], 15) & 0x1) << 3);
3319
3320 /* XOR the combined bits such that a 1 represents a valid QE */
3321 gen_bits ^= xor_mask[gen_bit];
3322
3323 /* Mask off gen bits we don't care about */
3324 gen_bits &= and_mask;
3325
3326 return __builtin_popcount(gen_bits);
3327 }
3328
3329 static inline int16_t
3330 dlb_hw_dequeue_sparse(struct dlb_eventdev *dlb,
3331 struct dlb_eventdev_port *ev_port,
3332 struct rte_event *events,
3333 uint16_t max_num,
3334 uint64_t dequeue_timeout_ticks)
3335 {
3336 uint64_t timeout;
3337 uint64_t start_ticks = 0ULL;
3338 struct dlb_port *qm_port;
3339 int num = 0;
3340
3341 qm_port = &ev_port->qm_port;
3342
3343 	/* If configured for per-dequeue wait, use the wait value provided to
3344 	 * this API; otherwise use the global value set at eventdev
3345 	 * configuration time.
3346 */
3347 if (!dlb->global_dequeue_wait)
3348 timeout = dequeue_timeout_ticks;
3349 else
3350 timeout = dlb->global_dequeue_wait_ticks;
3351
3352 if (timeout)
3353 start_ticks = rte_get_timer_cycles();
3354
3355 while (num < max_num) {
3356 struct dlb_dequeue_qe qes[DLB_NUM_QES_PER_CACHE_LINE];
3357 int num_avail;
3358
3359 /* Copy up to 4 QEs from the current cache line into qes */
3360 num_avail = dlb_recv_qe_sparse(qm_port, qes);
3361
3362 /* But don't process more than the user requested */
3363 num_avail = RTE_MIN(num_avail, max_num - num);
3364
3365 dlb_inc_cq_idx(qm_port, num_avail << 2);
3366
3367 if (num_avail == DLB_NUM_QES_PER_CACHE_LINE)
3368 num += dlb_process_dequeue_four_qes(ev_port,
3369 qm_port,
3370 &events[num],
3371 &qes[0]);
3372 else if (num_avail)
3373 num += dlb_process_dequeue_qes(ev_port,
3374 qm_port,
3375 &events[num],
3376 &qes[0],
3377 num_avail);
3378 else if ((timeout == 0) || (num > 0))
3379 /* Not waiting in any form, or 1+ events received? */
3380 break;
3381 else if (dlb_dequeue_wait(dlb, ev_port, qm_port,
3382 timeout, start_ticks))
3383 break;
3384 }
3385
3386 qm_port->owed_tokens += num;
3387
3388 if (num && qm_port->token_pop_mode == AUTO_POP)
3389 dlb_consume_qe_immediate(qm_port, num);
3390
3391 ev_port->outstanding_releases += num;
3392
3393 return num;
3394 }
3395
3396 static int
3397 dlb_event_release(struct dlb_eventdev *dlb, uint8_t port_id, int n)
3398 {
3399 struct process_local_port_data *port_data;
3400 struct dlb_eventdev_port *ev_port;
3401 struct dlb_port *qm_port;
3402 int i;
3403
3404 if (port_id > dlb->num_ports) {
3405 		DLB_LOG_ERR("Invalid port id %d in dlb_event_release\n",
3406 port_id);
3407 rte_errno = -EINVAL;
3408 return rte_errno;
3409 }
3410
3411 ev_port = &dlb->ev_ports[port_id];
3412 qm_port = &ev_port->qm_port;
3413 port_data = &dlb_port[qm_port->id][PORT_TYPE(qm_port)];
3414
3415 i = 0;
3416
3417 if (qm_port->is_directed) {
3418 i = n;
3419 goto sw_credit_update;
3420 }
3421
3422 while (i < n) {
3423 int pop_offs = 0;
3424 int j = 0;
3425
3426 /* Zero-out QEs */
3427 qm_port->qe4[0].cmd_byte = 0;
3428 qm_port->qe4[1].cmd_byte = 0;
3429 qm_port->qe4[2].cmd_byte = 0;
3430 qm_port->qe4[3].cmd_byte = 0;
3431
3432 for (; j < DLB_NUM_QES_PER_CACHE_LINE && (i + j) < n; j++) {
3433 int16_t thresh = qm_port->token_pop_thresh;
3434
3435 if (qm_port->token_pop_mode == DELAYED_POP &&
3436 qm_port->issued_releases >= thresh - 1) {
3437 /* Insert the token pop QE */
3438 dlb_construct_token_pop_qe(qm_port, j);
3439
3440 /* Reset the releases for the next QE batch */
3441 qm_port->issued_releases -= thresh;
3442
3443 /* When using delayed token pop mode, the
3444 * initial token threshold is the full CQ
3445 * depth. After the first token pop, we need to
3446 * reset it to the dequeue_depth.
3447 */
3448 qm_port->token_pop_thresh =
3449 qm_port->dequeue_depth;
3450
3451 pop_offs = 1;
3452 j++;
3453 break;
3454 }
3455
3456 qm_port->qe4[j].cmd_byte = DLB_COMP_CMD_BYTE;
3457 qm_port->issued_releases++;
3458 }
3459
3460 dlb_hw_do_enqueue(qm_port, i == 0, port_data);
3461
3462 /* Don't include the token pop QE in the release count */
3463 i += j - pop_offs;
3464 }
3465
3466 sw_credit_update:
3467 /* each release returns one credit */
3468 if (!ev_port->outstanding_releases) {
3469 DLB_LOG_ERR("Unrecoverable application error. Outstanding releases underflowed.\n");
3470 rte_errno = -ENOTRECOVERABLE;
3471 return rte_errno;
3472 }
3473
3474 ev_port->outstanding_releases -= i;
3475 ev_port->inflight_credits += i;
3476
3477 /* Replenish s/w credits if enough releases are performed */
3478 dlb_replenish_sw_credits(dlb, ev_port);
3479 return 0;
3480 }
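
/*
 * Illustrative sketch (not part of the driver): when implicit release is
 * disabled on a port, the application returns each dequeued event it will
 * not forward by enqueueing it with RTE_EVENT_OP_RELEASE, which ultimately
 * funnels into the release/credit logic above. Identifiers are hypothetical.
 */
static void __rte_unused
explicit_release_sketch(uint8_t dev_id, uint8_t port_id, struct rte_event *ev)
{
	ev->op = RTE_EVENT_OP_RELEASE;

	/* A release carries no new payload; it completes the previously
	 * dequeued event and returns its software credit.
	 */
	rte_event_enqueue_burst(dev_id, port_id, ev, 1);
}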
3481
3482 static uint16_t
3483 dlb_event_dequeue_burst(void *event_port, struct rte_event *ev, uint16_t num,
3484 uint64_t wait)
3485 {
3486 struct dlb_eventdev_port *ev_port = event_port;
3487 struct dlb_port *qm_port = &ev_port->qm_port;
3488 struct dlb_eventdev *dlb = ev_port->dlb;
3489 uint16_t cnt;
3490 int ret;
3491
3492 rte_errno = 0;
3493
3494 RTE_ASSERT(ev_port->setup_done);
3495 RTE_ASSERT(ev != NULL);
3496
3497 if (ev_port->implicit_release && ev_port->outstanding_releases > 0) {
3498 uint16_t out_rels = ev_port->outstanding_releases;
3499
3500 ret = dlb_event_release(dlb, ev_port->id, out_rels);
3501 if (ret)
3502 			return ret;
3503
3504 DLB_INC_STAT(ev_port->stats.tx_implicit_rel, out_rels);
3505 }
3506
3507 if (qm_port->token_pop_mode == DEFERRED_POP &&
3508 qm_port->owed_tokens)
3509 dlb_consume_qe_immediate(qm_port, qm_port->owed_tokens);
3510
3511 cnt = dlb_hw_dequeue(dlb, ev_port, ev, num, wait);
3512
3513 DLB_INC_STAT(ev_port->stats.traffic.total_polls, 1);
3514 DLB_INC_STAT(ev_port->stats.traffic.zero_polls, ((cnt == 0) ? 1 : 0));
3515 return cnt;
3516 }
3517
3518 static uint16_t
3519 dlb_event_dequeue(void *event_port, struct rte_event *ev, uint64_t wait)
3520 {
3521 return dlb_event_dequeue_burst(event_port, ev, 1, wait);
3522 }
3523
3524 static uint16_t
3525 dlb_event_dequeue_burst_sparse(void *event_port, struct rte_event *ev,
3526 uint16_t num, uint64_t wait)
3527 {
3528 struct dlb_eventdev_port *ev_port = event_port;
3529 struct dlb_port *qm_port = &ev_port->qm_port;
3530 struct dlb_eventdev *dlb = ev_port->dlb;
3531 uint16_t cnt;
3532 int ret;
3533
3534 rte_errno = 0;
3535
3536 RTE_ASSERT(ev_port->setup_done);
3537 RTE_ASSERT(ev != NULL);
3538
3539 if (ev_port->implicit_release && ev_port->outstanding_releases > 0) {
3540 uint16_t out_rels = ev_port->outstanding_releases;
3541
3542 ret = dlb_event_release(dlb, ev_port->id, out_rels);
3543 if (ret)
3544 return(ret);
3545
3546 DLB_INC_STAT(ev_port->stats.tx_implicit_rel, out_rels);
3547 }
3548
3549 if (qm_port->token_pop_mode == DEFERRED_POP &&
3550 qm_port->owed_tokens)
3551 dlb_consume_qe_immediate(qm_port, qm_port->owed_tokens);
3552
3553 cnt = dlb_hw_dequeue_sparse(dlb, ev_port, ev, num, wait);
3554
3555 DLB_INC_STAT(ev_port->stats.traffic.total_polls, 1);
3556 DLB_INC_STAT(ev_port->stats.traffic.zero_polls, ((cnt == 0) ? 1 : 0));
3557 return cnt;
3558 }
3559
static uint16_t
dlb_event_dequeue_sparse(void *event_port, struct rte_event *ev, uint64_t wait)
{
        return dlb_event_dequeue_burst_sparse(event_port, ev, 1, wait);
}

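/*
 * Queue depth queries. Both helpers below send an iface request and read
 * the depth back from the response's id field; on failure they log the
 * driver-provided status string and propagate the iface return code.
 */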
static uint32_t
dlb_get_ldb_queue_depth(struct dlb_eventdev *dlb,
                        struct dlb_eventdev_queue *queue)
{
        struct dlb_hw_dev *handle = &dlb->qm_instance;
        struct dlb_get_ldb_queue_depth_args cfg;
        struct dlb_cmd_response response;
        int ret;

        cfg.queue_id = queue->qm_queue.id;
        cfg.response = (uintptr_t)&response;

        ret = dlb_iface_get_ldb_queue_depth(handle, &cfg);
        if (ret < 0) {
                DLB_LOG_ERR("dlb: get_ldb_queue_depth ret=%d (driver status: %s)\n",
                            ret, dlb_error_strings[response.status]);
                return ret;
        }

        return response.id;
}

static uint32_t
dlb_get_dir_queue_depth(struct dlb_eventdev *dlb,
                        struct dlb_eventdev_queue *queue)
{
        struct dlb_hw_dev *handle = &dlb->qm_instance;
        struct dlb_get_dir_queue_depth_args cfg;
        struct dlb_cmd_response response;
        int ret;

        cfg.queue_id = queue->qm_queue.id;
        cfg.response = (uintptr_t)&response;

        ret = dlb_iface_get_dir_queue_depth(handle, &cfg);
        if (ret < 0) {
                DLB_LOG_ERR("dlb: get_dir_queue_depth ret=%d (driver status: %s)\n",
                            ret, dlb_error_strings[response.status]);
                return ret;
        }

        return response.id;
}

uint32_t
dlb_get_queue_depth(struct dlb_eventdev *dlb,
                    struct dlb_eventdev_queue *queue)
{
        if (queue->qm_queue.is_directed)
                return dlb_get_dir_queue_depth(dlb, queue);
        else
                return dlb_get_ldb_queue_depth(dlb, queue);
}

static bool
dlb_queue_is_empty(struct dlb_eventdev *dlb,
                   struct dlb_eventdev_queue *queue)
{
        return dlb_get_queue_depth(dlb, queue) == 0;
}

static bool
dlb_linked_queues_empty(struct dlb_eventdev *dlb)
{
        int i;

        for (i = 0; i < dlb->num_queues; i++) {
                if (dlb->ev_queues[i].num_links == 0)
                        continue;
                if (!dlb_queue_is_empty(dlb, &dlb->ev_queues[i]))
                        return false;
        }

        return true;
}

static bool
dlb_queues_empty(struct dlb_eventdev *dlb)
{
        int i;

        for (i = 0; i < dlb->num_queues; i++) {
                if (!dlb_queue_is_empty(dlb, &dlb->ev_queues[i]))
                        return false;
        }

        return true;
}

static void
dlb_flush_port(struct rte_eventdev *dev, int port_id)
{
        struct dlb_eventdev *dlb = dlb_pmd_priv(dev);
        eventdev_stop_flush_t flush;
        struct rte_event ev;
        uint8_t dev_id;
        void *arg;
        int i;

        flush = dev->dev_ops->dev_stop_flush;
        dev_id = dev->data->dev_id;
        arg = dev->data->dev_stop_flush_arg;

        while (rte_event_dequeue_burst(dev_id, port_id, &ev, 1, 0)) {
                if (flush)
                        flush(dev_id, ev, arg);

                if (dlb->ev_ports[port_id].qm_port.is_directed)
                        continue;

                ev.op = RTE_EVENT_OP_RELEASE;

                rte_event_enqueue_burst(dev_id, port_id, &ev, 1);
        }

        /* Enqueue any additional outstanding releases */
        ev.op = RTE_EVENT_OP_RELEASE;

        for (i = dlb->ev_ports[port_id].outstanding_releases; i > 0; i--)
                rte_event_enqueue_burst(dev_id, port_id, &ev, 1);
}

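/*
 * dlb_flush_port() above invokes the application's dev_stop_flush callback,
 * if one was registered, for every event drained at stop time. A minimal
 * sketch of how an application hooks that callback through the public API;
 * the callback body (counting drops and freeing mbufs) is an illustrative
 * assumption:
 *
 *	static void
 *	stop_flush_cb(uint8_t dev_id, struct rte_event ev, void *arg)
 *	{
 *		uint64_t *drops = arg;
 *
 *		RTE_SET_USED(dev_id);
 *		rte_pktmbuf_free(ev.mbuf);
 *		(*drops)++;
 *	}
 *
 *	static uint64_t drops;
 *	...
 *	rte_event_dev_stop_flush_callback_register(dev_id, stop_flush_cb,
 *						    &drops);
 */
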
static void
dlb_drain(struct rte_eventdev *dev)
{
        struct dlb_eventdev *dlb = dlb_pmd_priv(dev);
        struct dlb_eventdev_port *ev_port = NULL;
        uint8_t dev_id;
        int i;

        dev_id = dev->data->dev_id;

        while (!dlb_linked_queues_empty(dlb)) {
                /* Flush all the ev_ports, which will drain all their connected
                 * queues.
                 */
                for (i = 0; i < dlb->num_ports; i++)
                        dlb_flush_port(dev, i);
        }

        /* The queues are empty, but there may be events left in the ports. */
        for (i = 0; i < dlb->num_ports; i++)
                dlb_flush_port(dev, i);

        /* If the domain's queues are empty, we're done. */
        if (dlb_queues_empty(dlb))
                return;

        /* Else, there must be at least one unlinked load-balanced queue.
         * Select a load-balanced port with which to drain the unlinked
         * queue(s).
         */
        for (i = 0; i < dlb->num_ports; i++) {
                ev_port = &dlb->ev_ports[i];

                if (!ev_port->qm_port.is_directed)
                        break;
        }

        if (i == dlb->num_ports) {
                DLB_LOG_ERR("internal error: no LDB ev_ports\n");
                return;
        }

        rte_errno = 0;
        rte_event_port_unlink(dev_id, ev_port->id, NULL, 0);

        if (rte_errno) {
                DLB_LOG_ERR("internal error: failed to unlink ev_port %d\n",
                            ev_port->id);
                return;
        }

        for (i = 0; i < dlb->num_queues; i++) {
                uint8_t qid, prio;
                int ret;

                if (dlb_queue_is_empty(dlb, &dlb->ev_queues[i]))
                        continue;

                qid = i;
                prio = 0;

                /* Link the ev_port to the queue */
                ret = rte_event_port_link(dev_id, ev_port->id, &qid, &prio, 1);
                if (ret != 1) {
                        DLB_LOG_ERR("internal error: failed to link ev_port %d to queue %d\n",
                                    ev_port->id, qid);
                        return;
                }

                /* Flush the queue */
                while (!dlb_queue_is_empty(dlb, &dlb->ev_queues[i]))
                        dlb_flush_port(dev, ev_port->id);

                /* Drain any extant events in the ev_port. */
                dlb_flush_port(dev, ev_port->id);

                /* Unlink the ev_port from the queue */
                ret = rte_event_port_unlink(dev_id, ev_port->id, &qid, 1);
                if (ret != 1) {
                        DLB_LOG_ERR("internal error: failed to unlink ev_port %d from queue %d\n",
                                    ev_port->id, qid);
                        return;
                }
        }
}

static void
dlb_eventdev_stop(struct rte_eventdev *dev)
{
        struct dlb_eventdev *dlb = dlb_pmd_priv(dev);

        rte_spinlock_lock(&dlb->qm_instance.resource_lock);

        if (dlb->run_state == DLB_RUN_STATE_STOPPED) {
                DLB_LOG_DBG("Internal error: already stopped\n");
                rte_spinlock_unlock(&dlb->qm_instance.resource_lock);
                return;
        } else if (dlb->run_state != DLB_RUN_STATE_STARTED) {
                DLB_LOG_ERR("Internal error: bad state %d for dev_stop\n",
                            (int)dlb->run_state);
                rte_spinlock_unlock(&dlb->qm_instance.resource_lock);
                return;
        }

        dlb->run_state = DLB_RUN_STATE_STOPPING;

        rte_spinlock_unlock(&dlb->qm_instance.resource_lock);

        dlb_drain(dev);

        dlb->run_state = DLB_RUN_STATE_STOPPED;
}

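/*
 * dlb_eventdev_stop() above and dlb_eventdev_close() below are reached via
 * the standard eventdev teardown sequence; a minimal sketch, assuming the
 * worker cores have already quiesced:
 *
 *	rte_event_dev_stop(dev_id);	ends up in dlb_eventdev_stop()
 *	rte_event_dev_close(dev_id);	ends up in dlb_eventdev_close()
 */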
static int
dlb_eventdev_close(struct rte_eventdev *dev)
{
        dlb_hw_reset_sched_domain(dev, false);

        return 0;
}

static void
dlb_eventdev_port_release(void *port)
{
        struct dlb_eventdev_port *ev_port = port;

        if (ev_port) {
                struct dlb_port *qm_port = &ev_port->qm_port;

                if (qm_port->config_state == DLB_CONFIGURED)
                        dlb_free_qe_mem(qm_port);
        }
}

static void
dlb_eventdev_queue_release(struct rte_eventdev *dev, uint8_t id)
{
        RTE_SET_USED(dev);
        RTE_SET_USED(id);

        /* This function intentionally left blank. */
}

static int
dlb_eventdev_timeout_ticks(struct rte_eventdev *dev, uint64_t ns,
                           uint64_t *timeout_ticks)
{
        RTE_SET_USED(dev);
        uint64_t cycles_per_ns = rte_get_timer_hz() / 1E9;

        *timeout_ticks = ns * cycles_per_ns;

        return 0;
}

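/*
 * Worked example for the conversion above: with a 2 GHz timer
 * (rte_get_timer_hz() == 2000000000), cycles_per_ns evaluates to 2, so a
 * 1000 ns dequeue timeout is reported as 2000 ticks. The assignment to a
 * uint64_t truncates sub-GHz remainders (a 2.5 GHz timer also yields
 * cycles_per_ns == 2), so the result is approximate on such clocks.
 */
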
void
dlb_entry_points_init(struct rte_eventdev *dev)
{
        struct dlb_eventdev *dlb;

        static struct rte_eventdev_ops dlb_eventdev_entry_ops = {
                .dev_infos_get = dlb_eventdev_info_get,
                .dev_configure = dlb_eventdev_configure,
                .dev_start = dlb_eventdev_start,
                .dev_stop = dlb_eventdev_stop,
                .dev_close = dlb_eventdev_close,
                .queue_def_conf = dlb_eventdev_queue_default_conf_get,
                .port_def_conf = dlb_eventdev_port_default_conf_get,
                .queue_setup = dlb_eventdev_queue_setup,
                .queue_release = dlb_eventdev_queue_release,
                .port_setup = dlb_eventdev_port_setup,
                .port_release = dlb_eventdev_port_release,
                .port_link = dlb_eventdev_port_link,
                .port_unlink = dlb_eventdev_port_unlink,
                .port_unlinks_in_progress =
                                    dlb_eventdev_port_unlinks_in_progress,
                .timeout_ticks = dlb_eventdev_timeout_ticks,
                .dump = dlb_eventdev_dump,
                .xstats_get = dlb_eventdev_xstats_get,
                .xstats_get_names = dlb_eventdev_xstats_get_names,
                .xstats_get_by_name = dlb_eventdev_xstats_get_by_name,
                .xstats_reset = dlb_eventdev_xstats_reset,
                .dev_selftest = test_dlb_eventdev,
        };

        /* Expose PMD's eventdev interface */
        dev->dev_ops = &dlb_eventdev_entry_ops;

        dev->enqueue = dlb_event_enqueue;
        dev->enqueue_burst = dlb_event_enqueue_burst;
        dev->enqueue_new_burst = dlb_event_enqueue_new_burst;
        dev->enqueue_forward_burst = dlb_event_enqueue_forward_burst;
        dev->dequeue = dlb_event_dequeue;
        dev->dequeue_burst = dlb_event_dequeue_burst;

        dlb = dev->data->dev_private;

        if (dlb->poll_mode == DLB_CQ_POLL_MODE_SPARSE) {
                dev->dequeue = dlb_event_dequeue_sparse;
                dev->dequeue_burst = dlb_event_dequeue_burst_sparse;
        }
}

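/*
 * Note that dlb_eventdev_entry_ops above is static, so the ops table
 * installed in dev->dev_ops stays valid after dlb_entry_points_init()
 * returns. The dequeue entry points are swapped for their _sparse variants
 * only when the CQ poll mode fetched via dlb_iface_get_cq_poll_mode() in
 * the primary probe below is DLB_CQ_POLL_MODE_SPARSE.
 */
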
int
dlb_primary_eventdev_probe(struct rte_eventdev *dev,
                           const char *name,
                           struct dlb_devargs *dlb_args)
{
        struct dlb_eventdev *dlb;
        int err, i;

        dlb = dev->data->dev_private;

        dlb->event_dev = dev; /* backlink */

        evdev_dlb_default_info.driver_name = name;

        dlb->max_num_events_override = dlb_args->max_num_events;
        dlb->num_dir_credits_override = dlb_args->num_dir_credits_override;
        dlb->defer_sched = dlb_args->defer_sched;
        dlb->num_atm_inflights_per_queue = dlb_args->num_atm_inflights;

        /* Open the interface.
         * For vdev mode, this means open the dlb kernel module.
         */
        err = dlb_iface_open(&dlb->qm_instance, name);
        if (err < 0) {
                DLB_LOG_ERR("could not open event hardware device, err=%d\n",
                            err);
                return err;
        }

        err = dlb_iface_get_device_version(&dlb->qm_instance, &dlb->revision);
        if (err < 0) {
                DLB_LOG_ERR("dlb: failed to get the device version, err=%d\n",
                            err);
                return err;
        }

        err = dlb_hw_query_resources(dlb);
        if (err) {
                DLB_LOG_ERR("get resources err=%d for %s\n", err, name);
                return err;
        }

        err = dlb_iface_get_cq_poll_mode(&dlb->qm_instance, &dlb->poll_mode);
        if (err < 0) {
                DLB_LOG_ERR("dlb: failed to get the poll mode, err=%d\n", err);
                return err;
        }

        /* Complete xstats runtime initialization */
        err = dlb_xstats_init(dlb);
        if (err) {
                DLB_LOG_ERR("dlb: failed to init xstats, err=%d\n", err);
                return err;
        }

        /* Initialize each port's token pop mode */
        for (i = 0; i < DLB_MAX_NUM_PORTS; i++)
                dlb->ev_ports[i].qm_port.token_pop_mode = AUTO_POP;

        rte_spinlock_init(&dlb->qm_instance.resource_lock);

        dlb_iface_low_level_io_init(dlb);

        dlb_entry_points_init(dev);

        return 0;
}

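/*
 * The secondary-process probe below is intentionally lighter weight than
 * the primary path above: it re-opens the interface and re-queries
 * resources so this process can initialize its local I/O state and entry
 * points, while devargs overrides, xstats setup, token pop mode defaults,
 * and the resource lock remain the primary process's responsibility.
 */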
int
dlb_secondary_eventdev_probe(struct rte_eventdev *dev,
                             const char *name)
{
        struct dlb_eventdev *dlb;
        int err;

        dlb = dev->data->dev_private;

        evdev_dlb_default_info.driver_name = name;

        err = dlb_iface_open(&dlb->qm_instance, name);
        if (err < 0) {
                DLB_LOG_ERR("could not open event hardware device, err=%d\n",
                            err);
                return err;
        }

        err = dlb_hw_query_resources(dlb);
        if (err) {
                DLB_LOG_ERR("get resources err=%d for %s\n", err, name);
                return err;
        }

        dlb_iface_low_level_io_init(dlb);

        dlb_entry_points_init(dev);

        return 0;
}

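/*
 * A minimal sketch of how a probe path is expected to feed devargs into
 * dlb_parse_params() below. The key spellings in the example string are
 * assumptions based on the argument-name macros in the args[] list (their
 * literal values live elsewhere in the driver), and "event_dlb0", dev, and
 * the numeric values are illustrative only:
 *
 *	struct dlb_devargs dlb_args = {0};
 *	const char *params = "max_num_events=4096,num_dir_credits=1024";
 *
 *	if (dlb_parse_params(params, "event_dlb0", &dlb_args) == 0)
 *		(void)dlb_primary_eventdev_probe(dev, "event_dlb0",
 *						 &dlb_args);
 */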
int
dlb_parse_params(const char *params,
                 const char *name,
                 struct dlb_devargs *dlb_args)
{
        int ret = 0;
        static const char * const args[] = { NUMA_NODE_ARG,
                                             DLB_MAX_NUM_EVENTS,
                                             DLB_NUM_DIR_CREDITS,
                                             DEV_ID_ARG,
                                             DLB_DEFER_SCHED_ARG,
                                             DLB_NUM_ATM_INFLIGHTS_ARG,
                                             NULL };

        if (params && params[0] != '\0') {
                struct rte_kvargs *kvlist = rte_kvargs_parse(params, args);

                if (kvlist == NULL) {
                        DLB_LOG_INFO("Ignoring unsupported parameters when creating device '%s'\n",
                                     name);
                } else {
                        ret = rte_kvargs_process(kvlist, NUMA_NODE_ARG,
                                                 set_numa_node,
                                                 &dlb_args->socket_id);
                        if (ret != 0) {
                                DLB_LOG_ERR("%s: Error parsing numa node parameter",
                                            name);
                                rte_kvargs_free(kvlist);
                                return ret;
                        }

                        ret = rte_kvargs_process(kvlist, DLB_MAX_NUM_EVENTS,
                                                 set_max_num_events,
                                                 &dlb_args->max_num_events);
                        if (ret != 0) {
                                DLB_LOG_ERR("%s: Error parsing max_num_events parameter",
                                            name);
                                rte_kvargs_free(kvlist);
                                return ret;
                        }

                        ret = rte_kvargs_process(kvlist,
                                                 DLB_NUM_DIR_CREDITS,
                                                 set_num_dir_credits,
                                                 &dlb_args->num_dir_credits_override);
                        if (ret != 0) {
                                DLB_LOG_ERR("%s: Error parsing num_dir_credits parameter",
                                            name);
                                rte_kvargs_free(kvlist);
                                return ret;
                        }

                        ret = rte_kvargs_process(kvlist, DEV_ID_ARG,
                                                 set_dev_id,
                                                 &dlb_args->dev_id);
                        if (ret != 0) {
                                DLB_LOG_ERR("%s: Error parsing dev_id parameter",
                                            name);
                                rte_kvargs_free(kvlist);
                                return ret;
                        }

                        ret = rte_kvargs_process(kvlist, DLB_DEFER_SCHED_ARG,
                                                 set_defer_sched,
                                                 &dlb_args->defer_sched);
                        if (ret != 0) {
                                DLB_LOG_ERR("%s: Error parsing defer_sched parameter",
                                            name);
                                rte_kvargs_free(kvlist);
                                return ret;
                        }

                        ret = rte_kvargs_process(kvlist,
                                                 DLB_NUM_ATM_INFLIGHTS_ARG,
                                                 set_num_atm_inflights,
                                                 &dlb_args->num_atm_inflights);
                        if (ret != 0) {
                                DLB_LOG_ERR("%s: Error parsing atm_inflights parameter",
                                            name);
                                rte_kvargs_free(kvlist);
                                return ret;
                        }

                        rte_kvargs_free(kvlist);
                }
        }
        return ret;
}
RTE_LOG_REGISTER(eventdev_dlb_log_level, pmd.event.dlb, NOTICE);