1 /* SPDX-License-Identifier: BSD-3-Clause
2  * Copyright(c) 2017 Intel Corporation.
3  * All rights reserved.
4  */
5 #if defined(LINUX)
6 #include <sys/epoll.h>
7 #endif
8 #include <unistd.h>
9 
10 #include <rte_cycles.h>
11 #include <rte_common.h>
12 #include <rte_dev.h>
13 #include <rte_errno.h>
14 #include <rte_ethdev.h>
15 #include <rte_log.h>
16 #include <rte_malloc.h>
17 #include <rte_service_component.h>
18 #include <rte_thash.h>
19 #include <rte_interrupts.h>
20 
21 #include "rte_eventdev.h"
22 #include "rte_eventdev_pmd.h"
23 #include "rte_eventdev_trace.h"
24 #include "rte_event_eth_rx_adapter.h"
25 
26 #define BATCH_SIZE		32
27 #define BLOCK_CNT_THRESHOLD	10
28 #define ETH_EVENT_BUFFER_SIZE	(4*BATCH_SIZE)
29 
30 #define ETH_RX_ADAPTER_SERVICE_NAME_LEN	32
31 #define ETH_RX_ADAPTER_MEM_NAME_LEN	32
32 
33 #define RSS_KEY_SIZE	40
34 /* value written to intr thread pipe to signal thread exit */
35 #define ETH_BRIDGE_INTR_THREAD_EXIT	1
36 /* Sentinel value marking an uninitialized file descriptor */
37 #define INIT_FD		-1
38 
39 /*
40  * Used to store port and queue ID of interrupting Rx queue
41  */
42 union queue_data {
43 	RTE_STD_C11
44 	void *ptr;
45 	struct {
46 		uint16_t port;
47 		uint16_t queue;
48 	};
49 };
50 
51 /*
52  * There is an instance of this struct per polled Rx queue added to the
53  * adapter
54  */
55 struct eth_rx_poll_entry {
56 	/* Eth port to poll */
57 	uint16_t eth_dev_id;
58 	/* Eth rx queue to poll */
59 	uint16_t eth_rx_qid;
60 };
61 
62 /* Instance per adapter */
63 struct rte_eth_event_enqueue_buffer {
64 	/* Count of events in this buffer */
65 	uint16_t count;
66 	/* Array of events in this buffer */
67 	struct rte_event events[ETH_EVENT_BUFFER_SIZE];
68 };
69 
70 struct rte_event_eth_rx_adapter {
71 	/* RSS key */
72 	uint8_t rss_key_be[RSS_KEY_SIZE];
73 	/* Event device identifier */
74 	uint8_t eventdev_id;
75 	/* Per ethernet device structure */
76 	struct eth_device_info *eth_devices;
77 	/* Event port identifier */
78 	uint8_t event_port_id;
79 	/* Lock to serialize config updates with service function */
80 	rte_spinlock_t rx_lock;
81 	/* Max mbufs processed in any service function invocation */
82 	uint32_t max_nb_rx;
83 	/* Receive queues that need to be polled */
84 	struct eth_rx_poll_entry *eth_rx_poll;
85 	/* Size of the eth_rx_poll array */
86 	uint16_t num_rx_polled;
87 	/* Weighted round robin schedule */
88 	uint32_t *wrr_sched;
89 	/* wrr_sched[] size */
90 	uint32_t wrr_len;
91 	/* Next entry in wrr[] to begin polling */
92 	uint32_t wrr_pos;
93 	/* Event burst buffer */
94 	struct rte_eth_event_enqueue_buffer event_enqueue_buffer;
95 	/* Per adapter stats */
96 	struct rte_event_eth_rx_adapter_stats stats;
97 	/* Block count, counts up to BLOCK_CNT_THRESHOLD */
98 	uint16_t enq_block_count;
99 	/* Block start ts */
100 	uint64_t rx_enq_block_start_ts;
101 	/* epoll fd used to wait for Rx interrupts */
102 	int epd;
103 	/* Num of interrupt driven Rx queues */
104 	uint32_t num_rx_intr;
105 	/* Used to send <dev id, queue id> of interrupting Rx queues from
106 	 * the interrupt thread to the Rx thread
107 	 */
108 	struct rte_ring *intr_ring;
109 	/* Rx Queue data (dev id, queue id) for the last non-empty
110 	 * queue polled
111 	 */
112 	union queue_data qd;
113 	/* queue_data is valid */
114 	int qd_valid;
115 	/* Interrupt ring lock, synchronizes Rx thread
116 	 * and interrupt thread
117 	 */
118 	rte_spinlock_t intr_ring_lock;
119 	/* event array passed to rte_epoll_wait */
120 	struct rte_epoll_event *epoll_events;
121 	/* Count of interrupt vectors in use */
122 	uint32_t num_intr_vec;
123 	/* Thread blocked on Rx interrupts */
124 	pthread_t rx_intr_thread;
125 	/* Configuration callback for rte_service configuration */
126 	rte_event_eth_rx_adapter_conf_cb conf_cb;
127 	/* Configuration callback argument */
128 	void *conf_arg;
129 	/* Set if the default conf callback is being used */
130 	int default_cb_arg;
131 	/* Service initialization state */
132 	uint8_t service_inited;
133 	/* Total count of Rx queues in adapter */
134 	uint32_t nb_queues;
135 	/* Memory allocation name */
136 	char mem_name[ETH_RX_ADAPTER_MEM_NAME_LEN];
137 	/* Socket identifier cached from eventdev */
138 	int socket_id;
139 	/* Per adapter EAL service */
140 	uint32_t service_id;
141 	/* Adapter started flag */
142 	uint8_t rxa_started;
143 	/* Adapter ID */
144 	uint8_t id;
145 } __rte_cache_aligned;
146 
147 /* Per eth device */
148 struct eth_device_info {
149 	struct rte_eth_dev *dev;
150 	struct eth_rx_queue_info *rx_queue;
151 	/* Rx callback */
152 	rte_event_eth_rx_adapter_cb_fn cb_fn;
153 	/* Rx callback argument */
154 	void *cb_arg;
155 	/* Set if ethdev->eventdev packet transfer uses a
156 	 * hardware mechanism
157 	 */
158 	uint8_t internal_event_port;
159 	/* Set if the adapter is processing rx queues for
160 	 * this eth device and packet processing has been
161 	 * started; this lets the code know whether the PMD
162 	 * rx_adapter_stop callback needs to be invoked
163 	 */
164 	uint8_t dev_rx_started;
165 	/* Number of queues added for this device */
166 	uint16_t nb_dev_queues;
167 	/* Number of poll based queues
168 	 * If nb_rx_poll > 0, the start callback will
169 	 * be invoked if not already invoked
170 	 */
171 	uint16_t nb_rx_poll;
172 	/* Number of interrupt based queues
173 	 * If nb_rx_intr > 0, the start callback will
174 	 * be invoked if not already invoked.
175 	 */
176 	uint16_t nb_rx_intr;
177 	/* Number of queues that use the shared interrupt */
178 	uint16_t nb_shared_intr;
179 	/* sum(wrr(q)) for all queues within the device
180 	 * useful when deleting all device queues
181 	 */
182 	uint32_t wrr_len;
183 	/* Intr based queue index to start polling from; this is used
184 	 * when the number of shared interrupts is non-zero
185 	 */
186 	uint16_t next_q_idx;
187 	/* Intr based queue indices */
188 	uint16_t *intr_queue;
189 	/* device generates per Rx queue interrupts for queue indices
190 	 * less than RTE_MAX_RXTX_INTR_VEC_ID - 1
191 	 */
192 	int multi_intr_cap;
193 	/* shared interrupt enabled */
194 	int shared_intr_enabled;
195 };
196 
197 /* Per Rx queue */
198 struct eth_rx_queue_info {
199 	int queue_enabled;	/* True if added */
200 	int intr_enabled;
201 	uint16_t wt;		/* Polling weight */
202 	uint32_t flow_id_mask;	/* Set to ~0 if app provides flow id else 0 */
203 	uint64_t event;
204 };
205 
206 static struct rte_event_eth_rx_adapter **event_eth_rx_adapter;
207 
208 static inline int
209 rxa_validate_id(uint8_t id)
210 {
211 	return id < RTE_EVENT_ETH_RX_ADAPTER_MAX_INSTANCE;
212 }
213 
214 #define RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, retval) do { \
215 	if (!rxa_validate_id(id)) { \
216 		RTE_EDEV_LOG_ERR("Invalid eth Rx adapter id = %d\n", id); \
217 		return retval; \
218 	} \
219 } while (0)
220 
221 static inline int
222 rxa_sw_adapter_queue_count(struct rte_event_eth_rx_adapter *rx_adapter)
223 {
224 	return rx_adapter->num_rx_polled + rx_adapter->num_rx_intr;
225 }
226 
227 /* Greatest common divisor */
228 static uint16_t rxa_gcd_u16(uint16_t a, uint16_t b)
229 {
230 	uint16_t r = a % b;
231 
232 	return r ? rxa_gcd_u16(b, r) : b;
233 }
234 
235 /* Returns the next queue in the polling sequence
236  *
237  * http://kb.linuxvirtualserver.org/wiki/Weighted_Round-Robin_Scheduling
238  */
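/* For example, with three polled queues of weights {3, 2, 1}
 * (max_wt = 3, gcd = 1), the generated sequence over one period of
 * 3 + 2 + 1 = 6 slots is q0, q0, q1, q0, q1, q2.
 */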
239 static int
240 rxa_wrr_next(struct rte_event_eth_rx_adapter *rx_adapter,
241 	 unsigned int n, int *cw,
242 	 struct eth_rx_poll_entry *eth_rx_poll, uint16_t max_wt,
243 	 uint16_t gcd, int prev)
244 {
245 	int i = prev;
246 	uint16_t w;
247 
248 	while (1) {
249 		uint16_t q;
250 		uint16_t d;
251 
252 		i = (i + 1) % n;
253 		if (i == 0) {
254 			*cw = *cw - gcd;
255 			if (*cw <= 0)
256 				*cw = max_wt;
257 		}
258 
259 		q = eth_rx_poll[i].eth_rx_qid;
260 		d = eth_rx_poll[i].eth_dev_id;
261 		w = rx_adapter->eth_devices[d].rx_queue[q].wt;
262 
263 		if ((int)w >= *cw)
264 			return i;
265 	}
266 }
267 
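/* Return true if the Rx queue must use the device's shared interrupt
 * vector, i.e. the PMD cannot provide a vector per queue or the queue
 * index is beyond the per-queue vector range
 */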
268 static inline int
269 rxa_shared_intr(struct eth_device_info *dev_info,
270 	int rx_queue_id)
271 {
272 	int multi_intr_cap;
273 
274 	if (dev_info->dev->intr_handle == NULL)
275 		return 0;
276 
277 	multi_intr_cap = rte_intr_cap_multiple(dev_info->dev->intr_handle);
278 	return !multi_intr_cap ||
279 		rx_queue_id >= RTE_MAX_RXTX_INTR_VEC_ID - 1;
280 }
281 
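/* Return true if the Rx queue was added to the adapter in interrupt
 * mode (servicing weight of zero)
 */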
282 static inline int
283 rxa_intr_queue(struct eth_device_info *dev_info,
284 	int rx_queue_id)
285 {
286 	struct eth_rx_queue_info *queue_info;
287 
288 	queue_info = &dev_info->rx_queue[rx_queue_id];
289 	return dev_info->rx_queue &&
290 		!dev_info->internal_event_port &&
291 		queue_info->queue_enabled && queue_info->wt == 0;
292 }
293 
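/* Return true if the Rx queue was added to the adapter in poll mode
 * (non-zero servicing weight)
 */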
294 static inline int
295 rxa_polled_queue(struct eth_device_info *dev_info,
296 	int rx_queue_id)
297 {
298 	struct eth_rx_queue_info *queue_info;
299 
300 	queue_info = &dev_info->rx_queue[rx_queue_id];
301 	return !dev_info->internal_event_port &&
302 		dev_info->rx_queue &&
303 		queue_info->queue_enabled && queue_info->wt != 0;
304 }
305 
306 /* Calculate change in number of vectors after Rx queue ID is added/deleted */
307 static int
308 rxa_nb_intr_vect(struct eth_device_info *dev_info, int rx_queue_id, int add)
309 {
310 	uint16_t i;
311 	int n, s;
312 	uint16_t nbq;
313 
314 	nbq = dev_info->dev->data->nb_rx_queues;
315 	n = 0; /* non shared count */
316 	s = 0; /* shared count */
317 
318 	if (rx_queue_id == -1) {
319 		for (i = 0; i < nbq; i++) {
320 			if (!rxa_shared_intr(dev_info, i))
321 				n += add ? !rxa_intr_queue(dev_info, i) :
322 					rxa_intr_queue(dev_info, i);
323 			else
324 				s += add ? !rxa_intr_queue(dev_info, i) :
325 					rxa_intr_queue(dev_info, i);
326 		}
327 
328 		if (s > 0) {
329 			if ((add && dev_info->nb_shared_intr == 0) ||
330 				(!add && dev_info->nb_shared_intr))
331 				n += 1;
332 		}
333 	} else {
334 		if (!rxa_shared_intr(dev_info, rx_queue_id))
335 			n = add ? !rxa_intr_queue(dev_info, rx_queue_id) :
336 				rxa_intr_queue(dev_info, rx_queue_id);
337 		else
338 			n = add ? !dev_info->nb_shared_intr :
339 				dev_info->nb_shared_intr == 1;
340 	}
341 
342 	return add ? n : -n;
343 }
344 
345 /* Calculate nb_rx_intr after deleting interrupt mode rx queues
346  */
347 static void
348 rxa_calc_nb_post_intr_del(struct rte_event_eth_rx_adapter *rx_adapter,
349 			struct eth_device_info *dev_info,
350 			int rx_queue_id,
351 			uint32_t *nb_rx_intr)
352 {
353 	uint32_t intr_diff;
354 
355 	if (rx_queue_id == -1)
356 		intr_diff = dev_info->nb_rx_intr;
357 	else
358 		intr_diff = rxa_intr_queue(dev_info, rx_queue_id);
359 
360 	*nb_rx_intr = rx_adapter->num_rx_intr - intr_diff;
361 }
362 
363 /* Calculate nb_rx_* after adding interrupt mode rx queues; the newly added
364  * interrupt queues could currently be poll mode Rx queues
365  */
366 static void
367 rxa_calc_nb_post_add_intr(struct rte_event_eth_rx_adapter *rx_adapter,
368 			struct eth_device_info *dev_info,
369 			int rx_queue_id,
370 			uint32_t *nb_rx_poll,
371 			uint32_t *nb_rx_intr,
372 			uint32_t *nb_wrr)
373 {
374 	uint32_t intr_diff;
375 	uint32_t poll_diff;
376 	uint32_t wrr_len_diff;
377 
378 	if (rx_queue_id == -1) {
379 		intr_diff = dev_info->dev->data->nb_rx_queues -
380 						dev_info->nb_rx_intr;
381 		poll_diff = dev_info->nb_rx_poll;
382 		wrr_len_diff = dev_info->wrr_len;
383 	} else {
384 		intr_diff = !rxa_intr_queue(dev_info, rx_queue_id);
385 		poll_diff = rxa_polled_queue(dev_info, rx_queue_id);
386 		wrr_len_diff = poll_diff ? dev_info->rx_queue[rx_queue_id].wt :
387 					0;
388 	}
389 
390 	*nb_rx_intr = rx_adapter->num_rx_intr + intr_diff;
391 	*nb_rx_poll = rx_adapter->num_rx_polled - poll_diff;
392 	*nb_wrr = rx_adapter->wrr_len - wrr_len_diff;
393 }
394 
395 /* Calculate size of the eth_rx_poll and wrr_sched arrays
396  * after deleting poll mode rx queues
397  */
398 static void
399 rxa_calc_nb_post_poll_del(struct rte_event_eth_rx_adapter *rx_adapter,
400 			struct eth_device_info *dev_info,
401 			int rx_queue_id,
402 			uint32_t *nb_rx_poll,
403 			uint32_t *nb_wrr)
404 {
405 	uint32_t poll_diff;
406 	uint32_t wrr_len_diff;
407 
408 	if (rx_queue_id == -1) {
409 		poll_diff = dev_info->nb_rx_poll;
410 		wrr_len_diff = dev_info->wrr_len;
411 	} else {
412 		poll_diff = rxa_polled_queue(dev_info, rx_queue_id);
413 		wrr_len_diff = poll_diff ? dev_info->rx_queue[rx_queue_id].wt :
414 					0;
415 	}
416 
417 	*nb_rx_poll = rx_adapter->num_rx_polled - poll_diff;
418 	*nb_wrr = rx_adapter->wrr_len - wrr_len_diff;
419 }
420 
421 /* Calculate nb_rx_* after adding poll mode rx queues
422  */
423 static void
424 rxa_calc_nb_post_add_poll(struct rte_event_eth_rx_adapter *rx_adapter,
425 			struct eth_device_info *dev_info,
426 			int rx_queue_id,
427 			uint16_t wt,
428 			uint32_t *nb_rx_poll,
429 			uint32_t *nb_rx_intr,
430 			uint32_t *nb_wrr)
431 {
432 	uint32_t intr_diff;
433 	uint32_t poll_diff;
434 	uint32_t wrr_len_diff;
435 
436 	if (rx_queue_id == -1) {
437 		intr_diff = dev_info->nb_rx_intr;
438 		poll_diff = dev_info->dev->data->nb_rx_queues -
439 						dev_info->nb_rx_poll;
440 		wrr_len_diff = wt*dev_info->dev->data->nb_rx_queues
441 				- dev_info->wrr_len;
442 	} else {
443 		intr_diff = rxa_intr_queue(dev_info, rx_queue_id);
444 		poll_diff = !rxa_polled_queue(dev_info, rx_queue_id);
445 		wrr_len_diff = rxa_polled_queue(dev_info, rx_queue_id) ?
446 				wt - dev_info->rx_queue[rx_queue_id].wt :
447 				wt;
448 	}
449 
450 	*nb_rx_poll = rx_adapter->num_rx_polled + poll_diff;
451 	*nb_rx_intr = rx_adapter->num_rx_intr - intr_diff;
452 	*nb_wrr = rx_adapter->wrr_len + wrr_len_diff;
453 }
454 
455 /* Calculate nb_rx_* after adding rx_queue_id */
456 static void
457 rxa_calc_nb_post_add(struct rte_event_eth_rx_adapter *rx_adapter,
458 		struct eth_device_info *dev_info,
459 		int rx_queue_id,
460 		uint16_t wt,
461 		uint32_t *nb_rx_poll,
462 		uint32_t *nb_rx_intr,
463 		uint32_t *nb_wrr)
464 {
465 	if (wt != 0)
466 		rxa_calc_nb_post_add_poll(rx_adapter, dev_info, rx_queue_id,
467 					wt, nb_rx_poll, nb_rx_intr, nb_wrr);
468 	else
469 		rxa_calc_nb_post_add_intr(rx_adapter, dev_info, rx_queue_id,
470 					nb_rx_poll, nb_rx_intr, nb_wrr);
471 }
472 
473 /* Calculate nb_rx_* after deleting rx_queue_id */
474 static void
475 rxa_calc_nb_post_del(struct rte_event_eth_rx_adapter *rx_adapter,
476 		struct eth_device_info *dev_info,
477 		int rx_queue_id,
478 		uint32_t *nb_rx_poll,
479 		uint32_t *nb_rx_intr,
480 		uint32_t *nb_wrr)
481 {
482 	rxa_calc_nb_post_poll_del(rx_adapter, dev_info, rx_queue_id, nb_rx_poll,
483 				nb_wrr);
484 	rxa_calc_nb_post_intr_del(rx_adapter, dev_info, rx_queue_id,
485 				nb_rx_intr);
486 }
487 
488 /*
489  * Allocate the rx_poll array
490  */
491 static struct eth_rx_poll_entry *
492 rxa_alloc_poll(struct rte_event_eth_rx_adapter *rx_adapter,
493 	uint32_t num_rx_polled)
494 {
495 	size_t len;
496 
497 	len  = RTE_ALIGN(num_rx_polled * sizeof(*rx_adapter->eth_rx_poll),
498 							RTE_CACHE_LINE_SIZE);
499 	return  rte_zmalloc_socket(rx_adapter->mem_name,
500 				len,
501 				RTE_CACHE_LINE_SIZE,
502 				rx_adapter->socket_id);
503 }
504 
505 /*
506  * Allocate the WRR array
507  */
508 static uint32_t *
509 rxa_alloc_wrr(struct rte_event_eth_rx_adapter *rx_adapter, int nb_wrr)
510 {
511 	size_t len;
512 
513 	len = RTE_ALIGN(nb_wrr * sizeof(*rx_adapter->wrr_sched),
514 			RTE_CACHE_LINE_SIZE);
515 	return  rte_zmalloc_socket(rx_adapter->mem_name,
516 				len,
517 				RTE_CACHE_LINE_SIZE,
518 				rx_adapter->socket_id);
519 }
520 
521 static int
522 rxa_alloc_poll_arrays(struct rte_event_eth_rx_adapter *rx_adapter,
523 		uint32_t nb_poll,
524 		uint32_t nb_wrr,
525 		struct eth_rx_poll_entry **rx_poll,
526 		uint32_t **wrr_sched)
527 {
528 
529 	if (nb_poll == 0) {
530 		*rx_poll = NULL;
531 		*wrr_sched = NULL;
532 		return 0;
533 	}
534 
535 	*rx_poll = rxa_alloc_poll(rx_adapter, nb_poll);
536 	if (*rx_poll == NULL) {
537 		*wrr_sched = NULL;
538 		return -ENOMEM;
539 	}
540 
541 	*wrr_sched = rxa_alloc_wrr(rx_adapter, nb_wrr);
542 	if (*wrr_sched == NULL) {
543 		rte_free(*rx_poll);
544 		return -ENOMEM;
545 	}
546 	return 0;
547 }
548 
549 /* Precalculate WRR polling sequence for all queues in rx_adapter */
550 static void
551 rxa_calc_wrr_sequence(struct rte_event_eth_rx_adapter *rx_adapter,
552 		struct eth_rx_poll_entry *rx_poll,
553 		uint32_t *rx_wrr)
554 {
555 	uint16_t d;
556 	uint16_t q;
557 	unsigned int i;
558 	int prev = -1;
559 	int cw = -1;
560 
561 	/* Initialize variables for calculation of wrr schedule */
562 	uint16_t max_wrr_pos = 0;
563 	unsigned int poll_q = 0;
564 	uint16_t max_wt = 0;
565 	uint16_t gcd = 0;
566 
567 	if (rx_poll == NULL)
568 		return;
569 
570 	/* Generate array of all queues to poll, the size of this
571 	 * array is poll_q
572 	 */
573 	RTE_ETH_FOREACH_DEV(d) {
574 		uint16_t nb_rx_queues;
575 		struct eth_device_info *dev_info =
576 				&rx_adapter->eth_devices[d];
577 		nb_rx_queues = dev_info->dev->data->nb_rx_queues;
578 		if (dev_info->rx_queue == NULL)
579 			continue;
580 		if (dev_info->internal_event_port)
581 			continue;
582 		dev_info->wrr_len = 0;
583 		for (q = 0; q < nb_rx_queues; q++) {
584 			struct eth_rx_queue_info *queue_info =
585 				&dev_info->rx_queue[q];
586 			uint16_t wt;
587 
588 			if (!rxa_polled_queue(dev_info, q))
589 				continue;
590 			wt = queue_info->wt;
591 			rx_poll[poll_q].eth_dev_id = d;
592 			rx_poll[poll_q].eth_rx_qid = q;
593 			max_wrr_pos += wt;
594 			dev_info->wrr_len += wt;
595 			max_wt = RTE_MAX(max_wt, wt);
596 			gcd = (gcd) ? rxa_gcd_u16(gcd, wt) : wt;
597 			poll_q++;
598 		}
599 	}
600 
601 	/* Generate polling sequence based on weights */
602 	prev = -1;
603 	cw = -1;
604 	for (i = 0; i < max_wrr_pos; i++) {
605 		rx_wrr[i] = rxa_wrr_next(rx_adapter, poll_q, &cw,
606 				     rx_poll, max_wt, gcd, prev);
607 		prev = rx_wrr[i];
608 	}
609 }
610 
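/* Locate the IPv4 or IPv6 header in the mbuf, looking past a single
 * VLAN tag if present; used by the software RSS computation below
 */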
611 static inline void
612 rxa_mtoip(struct rte_mbuf *m, struct rte_ipv4_hdr **ipv4_hdr,
613 	struct rte_ipv6_hdr **ipv6_hdr)
614 {
615 	struct rte_ether_hdr *eth_hdr =
616 		rte_pktmbuf_mtod(m, struct rte_ether_hdr *);
617 	struct rte_vlan_hdr *vlan_hdr;
618 
619 	*ipv4_hdr = NULL;
620 	*ipv6_hdr = NULL;
621 
622 	switch (eth_hdr->ether_type) {
623 	case RTE_BE16(RTE_ETHER_TYPE_IPV4):
624 		*ipv4_hdr = (struct rte_ipv4_hdr *)(eth_hdr + 1);
625 		break;
626 
627 	case RTE_BE16(RTE_ETHER_TYPE_IPV6):
628 		*ipv6_hdr = (struct rte_ipv6_hdr *)(eth_hdr + 1);
629 		break;
630 
631 	case RTE_BE16(RTE_ETHER_TYPE_VLAN):
632 		vlan_hdr = (struct rte_vlan_hdr *)(eth_hdr + 1);
633 		switch (vlan_hdr->eth_proto) {
634 		case RTE_BE16(RTE_ETHER_TYPE_IPV4):
635 			*ipv4_hdr = (struct rte_ipv4_hdr *)(vlan_hdr + 1);
636 			break;
637 		case RTE_BE16(RTE_ETHER_TYPE_IPV6):
638 			*ipv6_hdr = (struct rte_ipv6_hdr *)(vlan_hdr + 1);
639 			break;
640 		default:
641 			break;
642 		}
643 		break;
644 
645 	default:
646 		break;
647 	}
648 }
649 
650 /* Calculate RSS hash for IPv4/6 */
651 static inline uint32_t
652 rxa_do_softrss(struct rte_mbuf *m, const uint8_t *rss_key_be)
653 {
654 	uint32_t input_len;
655 	void *tuple;
656 	struct rte_ipv4_tuple ipv4_tuple;
657 	struct rte_ipv6_tuple ipv6_tuple;
658 	struct rte_ipv4_hdr *ipv4_hdr;
659 	struct rte_ipv6_hdr *ipv6_hdr;
660 
661 	rxa_mtoip(m, &ipv4_hdr, &ipv6_hdr);
662 
663 	if (ipv4_hdr) {
664 		ipv4_tuple.src_addr = rte_be_to_cpu_32(ipv4_hdr->src_addr);
665 		ipv4_tuple.dst_addr = rte_be_to_cpu_32(ipv4_hdr->dst_addr);
666 		tuple = &ipv4_tuple;
667 		input_len = RTE_THASH_V4_L3_LEN;
668 	} else if (ipv6_hdr) {
669 		rte_thash_load_v6_addrs(ipv6_hdr,
670 					(union rte_thash_tuple *)&ipv6_tuple);
671 		tuple = &ipv6_tuple;
672 		input_len = RTE_THASH_V6_L3_LEN;
673 	} else
674 		return 0;
675 
676 	return rte_softrss_be(tuple, input_len, rss_key_be);
677 }
678 
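/* Enqueue blocked-time accounting: a block period starts once
 * BLOCK_CNT_THRESHOLD consecutive flushes enqueue nothing and ends on
 * the next successful enqueue; the elapsed cycles are accumulated in
 * stats->rx_enq_block_cycles
 */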
679 static inline int
680 rxa_enq_blocked(struct rte_event_eth_rx_adapter *rx_adapter)
681 {
682 	return !!rx_adapter->enq_block_count;
683 }
684 
685 static inline void
686 rxa_enq_block_start_ts(struct rte_event_eth_rx_adapter *rx_adapter)
687 {
688 	if (rx_adapter->rx_enq_block_start_ts)
689 		return;
690 
691 	rx_adapter->enq_block_count++;
692 	if (rx_adapter->enq_block_count < BLOCK_CNT_THRESHOLD)
693 		return;
694 
695 	rx_adapter->rx_enq_block_start_ts = rte_get_tsc_cycles();
696 }
697 
698 static inline void
699 rxa_enq_block_end_ts(struct rte_event_eth_rx_adapter *rx_adapter,
700 		    struct rte_event_eth_rx_adapter_stats *stats)
701 {
702 	if (unlikely(!stats->rx_enq_start_ts))
703 		stats->rx_enq_start_ts = rte_get_tsc_cycles();
704 
705 	if (likely(!rxa_enq_blocked(rx_adapter)))
706 		return;
707 
708 	rx_adapter->enq_block_count = 0;
709 	if (rx_adapter->rx_enq_block_start_ts) {
710 		stats->rx_enq_end_ts = rte_get_tsc_cycles();
711 		stats->rx_enq_block_cycles += stats->rx_enq_end_ts -
712 		    rx_adapter->rx_enq_block_start_ts;
713 		rx_adapter->rx_enq_block_start_ts = 0;
714 	}
715 }
716 
717 /* Enqueue buffered events to event device */
718 static inline uint16_t
719 rxa_flush_event_buffer(struct rte_event_eth_rx_adapter *rx_adapter)
720 {
721 	struct rte_eth_event_enqueue_buffer *buf =
722 	    &rx_adapter->event_enqueue_buffer;
723 	struct rte_event_eth_rx_adapter_stats *stats = &rx_adapter->stats;
724 
725 	uint16_t n = rte_event_enqueue_new_burst(rx_adapter->eventdev_id,
726 					rx_adapter->event_port_id,
727 					buf->events,
728 					buf->count);
729 	if (n != buf->count) {
730 		memmove(buf->events,
731 			&buf->events[n],
732 			(buf->count - n) * sizeof(struct rte_event));
733 		stats->rx_enq_retry++;
734 	}
735 
736 	n ? rxa_enq_block_end_ts(rx_adapter, stats) :
737 		rxa_enq_block_start_ts(rx_adapter);
738 
739 	buf->count -= n;
740 	stats->rx_enq_count += n;
741 
742 	return n;
743 }
744 
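/* Convert received mbufs to events in the enqueue buffer. The event
 * flow id is taken from the mbuf RSS hash, or computed in software
 * when the device did not provide one, unless the application supplied
 * a flow id for the queue; the optional Rx callback registered for the
 * ethdev may modify or drop events before they are buffered.
 */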
745 static inline void
746 rxa_buffer_mbufs(struct rte_event_eth_rx_adapter *rx_adapter,
747 		uint16_t eth_dev_id,
748 		uint16_t rx_queue_id,
749 		struct rte_mbuf **mbufs,
750 		uint16_t num)
751 {
752 	uint32_t i;
753 	struct eth_device_info *dev_info =
754 					&rx_adapter->eth_devices[eth_dev_id];
755 	struct eth_rx_queue_info *eth_rx_queue_info =
756 					&dev_info->rx_queue[rx_queue_id];
757 	struct rte_eth_event_enqueue_buffer *buf =
758 					&rx_adapter->event_enqueue_buffer;
759 	struct rte_event *ev = &buf->events[buf->count];
760 	uint64_t event = eth_rx_queue_info->event;
761 	uint32_t flow_id_mask = eth_rx_queue_info->flow_id_mask;
762 	struct rte_mbuf *m = mbufs[0];
763 	uint32_t rss_mask;
764 	uint32_t rss;
765 	int do_rss;
766 	uint16_t nb_cb;
767 	uint16_t dropped;
768 
769 	/* 0xffff ffff if PKT_RX_RSS_HASH is set, otherwise 0 */
770 	rss_mask = ~(((m->ol_flags & PKT_RX_RSS_HASH) != 0) - 1);
771 	do_rss = !rss_mask && !eth_rx_queue_info->flow_id_mask;
772 
773 	for (i = 0; i < num; i++) {
774 		m = mbufs[i];
775 
776 		rss = do_rss ?
777 			rxa_do_softrss(m, rx_adapter->rss_key_be) :
778 			m->hash.rss;
779 		ev->event = event;
780 		ev->flow_id = (rss & ~flow_id_mask) |
781 				(ev->flow_id & flow_id_mask);
782 		ev->mbuf = m;
783 		ev++;
784 	}
785 
786 	if (dev_info->cb_fn) {
787 
788 		dropped = 0;
789 		nb_cb = dev_info->cb_fn(eth_dev_id, rx_queue_id,
790 					ETH_EVENT_BUFFER_SIZE, buf->count, ev,
791 					num, dev_info->cb_arg, &dropped);
792 		if (unlikely(nb_cb > num))
793 			RTE_EDEV_LOG_ERR("Rx CB returned %d (> %d) events",
794 				nb_cb, num);
795 		else
796 			num = nb_cb;
797 		if (dropped)
798 			rx_adapter->stats.rx_dropped += dropped;
799 	}
800 
801 	buf->count += num;
802 }
803 
804 /* Enqueue packets from <port, q> to event buffer */
805 static inline uint32_t
806 rxa_eth_rx(struct rte_event_eth_rx_adapter *rx_adapter,
807 	uint16_t port_id,
808 	uint16_t queue_id,
809 	uint32_t rx_count,
810 	uint32_t max_rx,
811 	int *rxq_empty)
812 {
813 	struct rte_mbuf *mbufs[BATCH_SIZE];
814 	struct rte_eth_event_enqueue_buffer *buf =
815 					&rx_adapter->event_enqueue_buffer;
816 	struct rte_event_eth_rx_adapter_stats *stats =
817 					&rx_adapter->stats;
818 	uint16_t n;
819 	uint32_t nb_rx = 0;
820 
821 	if (rxq_empty)
822 		*rxq_empty = 0;
823 	/* Don't do a batch dequeue from the rx queue if there isn't
824 	 * enough space in the enqueue buffer.
825 	 */
826 	while (BATCH_SIZE <= (RTE_DIM(buf->events) - buf->count)) {
827 		if (buf->count >= BATCH_SIZE)
828 			rxa_flush_event_buffer(rx_adapter);
829 
830 		stats->rx_poll_count++;
831 		n = rte_eth_rx_burst(port_id, queue_id, mbufs, BATCH_SIZE);
832 		if (unlikely(!n)) {
833 			if (rxq_empty)
834 				*rxq_empty = 1;
835 			break;
836 		}
837 		rxa_buffer_mbufs(rx_adapter, port_id, queue_id, mbufs, n);
838 		nb_rx += n;
839 		if (rx_count + nb_rx > max_rx)
840 			break;
841 	}
842 
843 	if (buf->count > 0)
844 		rxa_flush_event_buffer(rx_adapter);
845 
846 	return nb_rx;
847 }
848 
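/* Runs in the interrupt thread context: unpack the <port, queue> pair
 * from the epoll user data and, if the queue's (or shared vector's)
 * interrupt is armed, disarm it and post the pair to the interrupt
 * ring for the service function
 */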
849 static inline void
850 rxa_intr_ring_enqueue(struct rte_event_eth_rx_adapter *rx_adapter,
851 		void *data)
852 {
853 	uint16_t port_id;
854 	uint16_t queue;
855 	int err;
856 	union queue_data qd;
857 	struct eth_device_info *dev_info;
858 	struct eth_rx_queue_info *queue_info;
859 	int *intr_enabled;
860 
861 	qd.ptr = data;
862 	port_id = qd.port;
863 	queue = qd.queue;
864 
865 	dev_info = &rx_adapter->eth_devices[port_id];
866 	queue_info = &dev_info->rx_queue[queue];
867 	rte_spinlock_lock(&rx_adapter->intr_ring_lock);
868 	if (rxa_shared_intr(dev_info, queue))
869 		intr_enabled = &dev_info->shared_intr_enabled;
870 	else
871 		intr_enabled = &queue_info->intr_enabled;
872 
873 	if (*intr_enabled) {
874 		*intr_enabled = 0;
875 		err = rte_ring_enqueue(rx_adapter->intr_ring, data);
876 		/* Entry should always be available.
877 		 * The ring size equals the maximum number of interrupt
878 		 * vectors supported (an interrupt vector is shared in
879 		 * case of shared interrupts)
880 		 */
881 		if (err)
882 			RTE_EDEV_LOG_ERR("Failed to enqueue interrupt"
883 				" to ring: %s", strerror(-err));
884 		else
885 			rte_eth_dev_rx_intr_disable(port_id, queue);
886 	}
887 	rte_spinlock_unlock(&rx_adapter->intr_ring_lock);
888 }
889 
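/* Return -ENOSPC if adding num_intr_vec interrupt vectors would exceed
 * the interrupt ring capacity
 */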
890 static int
891 rxa_intr_ring_check_avail(struct rte_event_eth_rx_adapter *rx_adapter,
892 			uint32_t num_intr_vec)
893 {
894 	if (rx_adapter->num_intr_vec + num_intr_vec >
895 				RTE_EVENT_ETH_INTR_RING_SIZE) {
896 		RTE_EDEV_LOG_ERR("Exceeded intr ring slots current"
897 		" %d needed %d limit %d", rx_adapter->num_intr_vec,
898 		num_intr_vec, RTE_EVENT_ETH_INTR_RING_SIZE);
899 		return -ENOSPC;
900 	}
901 
902 	return 0;
903 }
904 
905 /* Delete entries for (dev, queue) from the interrupt ring */
906 static void
907 rxa_intr_ring_del_entries(struct rte_event_eth_rx_adapter *rx_adapter,
908 			struct eth_device_info *dev_info,
909 			uint16_t rx_queue_id)
910 {
911 	int i, n;
912 	union queue_data qd;
913 
914 	rte_spinlock_lock(&rx_adapter->intr_ring_lock);
915 
916 	n = rte_ring_count(rx_adapter->intr_ring);
917 	for (i = 0; i < n; i++) {
918 		rte_ring_dequeue(rx_adapter->intr_ring, &qd.ptr);
919 		if (!rxa_shared_intr(dev_info, rx_queue_id)) {
920 			if (qd.port == dev_info->dev->data->port_id &&
921 				qd.queue == rx_queue_id)
922 				continue;
923 		} else {
924 			if (qd.port == dev_info->dev->data->port_id)
925 				continue;
926 		}
927 		rte_ring_enqueue(rx_adapter->intr_ring, qd.ptr);
928 	}
929 
930 	rte_spinlock_unlock(&rx_adapter->intr_ring_lock);
931 }
932 
933 /* pthread callback handling interrupt mode receive queues
934  * After receiving an Rx interrupt, it enqueues the port id and queue id of the
935  * interrupting queue to the adapter's ring buffer for interrupt events.
936  * These events are picked up by rxa_intr_ring_dequeue() which is invoked from
937  * the adapter service function.
938  */
939 static void *
940 rxa_intr_thread(void *arg)
941 {
942 	struct rte_event_eth_rx_adapter *rx_adapter = arg;
943 	struct rte_epoll_event *epoll_events = rx_adapter->epoll_events;
944 	int n, i;
945 
946 	while (1) {
947 		n = rte_epoll_wait(rx_adapter->epd, epoll_events,
948 				RTE_EVENT_ETH_INTR_RING_SIZE, -1);
949 		if (unlikely(n < 0))
950 			RTE_EDEV_LOG_ERR("rte_epoll_wait returned error %d",
951 					n);
952 		for (i = 0; i < n; i++) {
953 			rxa_intr_ring_enqueue(rx_adapter,
954 					epoll_events[i].epdata.data);
955 		}
956 	}
957 
958 	return NULL;
959 }
960 
961 /* Dequeue <port, q> from interrupt ring and enqueue received
962  * mbufs to eventdev
963  */
964 static inline uint32_t
965 rxa_intr_ring_dequeue(struct rte_event_eth_rx_adapter *rx_adapter)
966 {
967 	uint32_t n;
968 	uint32_t nb_rx = 0;
969 	int rxq_empty;
970 	struct rte_eth_event_enqueue_buffer *buf;
971 	rte_spinlock_t *ring_lock;
972 	uint8_t max_done = 0;
973 
974 	if (rx_adapter->num_rx_intr == 0)
975 		return 0;
976 
977 	if (rte_ring_count(rx_adapter->intr_ring) == 0
978 		&& !rx_adapter->qd_valid)
979 		return 0;
980 
981 	buf = &rx_adapter->event_enqueue_buffer;
982 	ring_lock = &rx_adapter->intr_ring_lock;
983 
984 	if (buf->count >= BATCH_SIZE)
985 		rxa_flush_event_buffer(rx_adapter);
986 
987 	while (BATCH_SIZE <= (RTE_DIM(buf->events) - buf->count)) {
988 		struct eth_device_info *dev_info;
989 		uint16_t port;
990 		uint16_t queue;
991 		union queue_data qd  = rx_adapter->qd;
992 		int err;
993 
994 		if (!rx_adapter->qd_valid) {
995 			struct eth_rx_queue_info *queue_info;
996 
997 			rte_spinlock_lock(ring_lock);
998 			err = rte_ring_dequeue(rx_adapter->intr_ring, &qd.ptr);
999 			if (err) {
1000 				rte_spinlock_unlock(ring_lock);
1001 				break;
1002 			}
1003 
1004 			port = qd.port;
1005 			queue = qd.queue;
1006 			rx_adapter->qd = qd;
1007 			rx_adapter->qd_valid = 1;
1008 			dev_info = &rx_adapter->eth_devices[port];
1009 			if (rxa_shared_intr(dev_info, queue))
1010 				dev_info->shared_intr_enabled = 1;
1011 			else {
1012 				queue_info = &dev_info->rx_queue[queue];
1013 				queue_info->intr_enabled = 1;
1014 			}
1015 			rte_eth_dev_rx_intr_enable(port, queue);
1016 			rte_spinlock_unlock(ring_lock);
1017 		} else {
1018 			port = qd.port;
1019 			queue = qd.queue;
1020 
1021 			dev_info = &rx_adapter->eth_devices[port];
1022 		}
1023 
1024 		if (rxa_shared_intr(dev_info, queue)) {
1025 			uint16_t i;
1026 			uint16_t nb_queues;
1027 
1028 			nb_queues = dev_info->dev->data->nb_rx_queues;
1029 			n = 0;
1030 			for (i = dev_info->next_q_idx; i < nb_queues; i++) {
1031 				uint8_t enq_buffer_full;
1032 
1033 				if (!rxa_intr_queue(dev_info, i))
1034 					continue;
1035 				n = rxa_eth_rx(rx_adapter, port, i, nb_rx,
1036 					rx_adapter->max_nb_rx,
1037 					&rxq_empty);
1038 				nb_rx += n;
1039 
1040 				enq_buffer_full = !rxq_empty && n == 0;
1041 				max_done = nb_rx > rx_adapter->max_nb_rx;
1042 
1043 				if (enq_buffer_full || max_done) {
1044 					dev_info->next_q_idx = i;
1045 					goto done;
1046 				}
1047 			}
1048 
1049 			rx_adapter->qd_valid = 0;
1050 
1051 			/* Reinitialize for next interrupt */
1052 			dev_info->next_q_idx = dev_info->multi_intr_cap ?
1053 						RTE_MAX_RXTX_INTR_VEC_ID - 1 :
1054 						0;
1055 		} else {
1056 			n = rxa_eth_rx(rx_adapter, port, queue, nb_rx,
1057 				rx_adapter->max_nb_rx,
1058 				&rxq_empty);
1059 			rx_adapter->qd_valid = !rxq_empty;
1060 			nb_rx += n;
1061 			if (nb_rx > rx_adapter->max_nb_rx)
1062 				break;
1063 		}
1064 	}
1065 
1066 done:
1067 	rx_adapter->stats.rx_intr_packets += nb_rx;
1068 	return nb_rx;
1069 }
1070 
1071 /*
1072  * Polls receive queues added to the event adapter and enqueues received
1073  * packets to the event device.
1074  *
1075  * The receive code enqueues initially to a temporary buffer; the
1076  * temporary buffer is drained any time it holds >= BATCH_SIZE packets.
1077  *
1078  * If there isn't space available in the temporary buffer, packets from the
1079  * Rx queue aren't dequeued from the eth device; this back pressures the
1080  * eth device. In virtual device environments this back pressure is relayed
1081  * to the hypervisor's switching layer where adjustments can be made to deal
1082  * with it.
1083  */
1084 static inline uint32_t
1085 rxa_poll(struct rte_event_eth_rx_adapter *rx_adapter)
1086 {
1087 	uint32_t num_queue;
1088 	uint32_t nb_rx = 0;
1089 	struct rte_eth_event_enqueue_buffer *buf;
1090 	uint32_t wrr_pos;
1091 	uint32_t max_nb_rx;
1092 
1093 	wrr_pos = rx_adapter->wrr_pos;
1094 	max_nb_rx = rx_adapter->max_nb_rx;
1095 	buf = &rx_adapter->event_enqueue_buffer;
1096 
1097 	/* Iterate through a WRR sequence */
1098 	for (num_queue = 0; num_queue < rx_adapter->wrr_len; num_queue++) {
1099 		unsigned int poll_idx = rx_adapter->wrr_sched[wrr_pos];
1100 		uint16_t qid = rx_adapter->eth_rx_poll[poll_idx].eth_rx_qid;
1101 		uint16_t d = rx_adapter->eth_rx_poll[poll_idx].eth_dev_id;
1102 
1103 		/* Don't do a batch dequeue from the rx queue if there isn't
1104 		 * enough space in the enqueue buffer.
1105 		 */
1106 		if (buf->count >= BATCH_SIZE)
1107 			rxa_flush_event_buffer(rx_adapter);
1108 		if (BATCH_SIZE > (ETH_EVENT_BUFFER_SIZE - buf->count)) {
1109 			rx_adapter->wrr_pos = wrr_pos;
1110 			return nb_rx;
1111 		}
1112 
1113 		nb_rx += rxa_eth_rx(rx_adapter, d, qid, nb_rx, max_nb_rx,
1114 				NULL);
1115 		if (nb_rx > max_nb_rx) {
1116 			rx_adapter->wrr_pos =
1117 				    (wrr_pos + 1) % rx_adapter->wrr_len;
1118 			break;
1119 		}
1120 
1121 		if (++wrr_pos == rx_adapter->wrr_len)
1122 			wrr_pos = 0;
1123 	}
1124 	return nb_rx;
1125 }
1126 
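/* Adapter service function: drain the interrupt ring, then poll the
 * WRR sequence; does nothing if another lcore holds rx_lock or the
 * adapter has not been started
 */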
1127 static int
1128 rxa_service_func(void *args)
1129 {
1130 	struct rte_event_eth_rx_adapter *rx_adapter = args;
1131 	struct rte_event_eth_rx_adapter_stats *stats;
1132 
1133 	if (rte_spinlock_trylock(&rx_adapter->rx_lock) == 0)
1134 		return 0;
1135 	if (!rx_adapter->rxa_started) {
1136 		rte_spinlock_unlock(&rx_adapter->rx_lock);
1137 		return 0;
1138 	}
1139 
1140 	stats = &rx_adapter->stats;
1141 	stats->rx_packets += rxa_intr_ring_dequeue(rx_adapter);
1142 	stats->rx_packets += rxa_poll(rx_adapter);
1143 	rte_spinlock_unlock(&rx_adapter->rx_lock);
1144 	return 0;
1145 }
1146 
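/* Reserve (or look up) the memzone backing the global array of adapter
 * pointers
 */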
1147 static int
1148 rte_event_eth_rx_adapter_init(void)
1149 {
1150 	const char *name = "rte_event_eth_rx_adapter_array";
1151 	const struct rte_memzone *mz;
1152 	unsigned int sz;
1153 
1154 	sz = sizeof(*event_eth_rx_adapter) *
1155 	    RTE_EVENT_ETH_RX_ADAPTER_MAX_INSTANCE;
1156 	sz = RTE_ALIGN(sz, RTE_CACHE_LINE_SIZE);
1157 
1158 	mz = rte_memzone_lookup(name);
1159 	if (mz == NULL) {
1160 		mz = rte_memzone_reserve_aligned(name, sz, rte_socket_id(), 0,
1161 						 RTE_CACHE_LINE_SIZE);
1162 		if (mz == NULL) {
1163 			RTE_EDEV_LOG_ERR("failed to reserve memzone err = %"
1164 					PRId32, rte_errno);
1165 			return -rte_errno;
1166 		}
1167 	}
1168 
1169 	event_eth_rx_adapter = mz->addr;
1170 	return 0;
1171 }
1172 
1173 static inline struct rte_event_eth_rx_adapter *
1174 rxa_id_to_adapter(uint8_t id)
1175 {
1176 	return event_eth_rx_adapter ?
1177 		event_eth_rx_adapter[id] : NULL;
1178 }
1179 
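/* Default configuration callback: adds an event port for the adapter's
 * use by reconfiguring the event device with one extra port, stopping
 * and restarting the device if it was already started
 */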
1180 static int
1181 rxa_default_conf_cb(uint8_t id, uint8_t dev_id,
1182 		struct rte_event_eth_rx_adapter_conf *conf, void *arg)
1183 {
1184 	int ret;
1185 	struct rte_eventdev *dev;
1186 	struct rte_event_dev_config dev_conf;
1187 	int started;
1188 	uint8_t port_id;
1189 	struct rte_event_port_conf *port_conf = arg;
1190 	struct rte_event_eth_rx_adapter *rx_adapter = rxa_id_to_adapter(id);
1191 
1192 	dev = &rte_eventdevs[rx_adapter->eventdev_id];
1193 	dev_conf = dev->data->dev_conf;
1194 
1195 	started = dev->data->dev_started;
1196 	if (started)
1197 		rte_event_dev_stop(dev_id);
1198 	port_id = dev_conf.nb_event_ports;
1199 	dev_conf.nb_event_ports += 1;
1200 	ret = rte_event_dev_configure(dev_id, &dev_conf);
1201 	if (ret) {
1202 		RTE_EDEV_LOG_ERR("failed to configure event dev %u\n",
1203 						dev_id);
1204 		if (started) {
1205 			if (rte_event_dev_start(dev_id))
1206 				return -EIO;
1207 		}
1208 		return ret;
1209 	}
1210 
1211 	ret = rte_event_port_setup(dev_id, port_id, port_conf);
1212 	if (ret) {
1213 		RTE_EDEV_LOG_ERR("failed to setup event port %u\n",
1214 					port_id);
1215 		return ret;
1216 	}
1217 
1218 	conf->event_port_id = port_id;
1219 	conf->max_nb_rx = 128;
1220 	if (started)
1221 		ret = rte_event_dev_start(dev_id);
1222 	rx_adapter->default_cb_arg = 1;
1223 	return ret;
1224 }
1225 
1226 static int
1227 rxa_epoll_create1(void)
1228 {
1229 #if defined(LINUX)
1230 	int fd;
1231 	fd = epoll_create1(EPOLL_CLOEXEC);
1232 	return fd < 0 ? -errno : fd;
1233 #elif defined(BSD)
1234 	return -ENOTSUP;
1235 #endif
1236 }
1237 
1238 static int
1239 rxa_init_epd(struct rte_event_eth_rx_adapter *rx_adapter)
1240 {
1241 	if (rx_adapter->epd != INIT_FD)
1242 		return 0;
1243 
1244 	rx_adapter->epd = rxa_epoll_create1();
1245 	if (rx_adapter->epd < 0) {
1246 		int err = rx_adapter->epd;
1247 		rx_adapter->epd = INIT_FD;
1248 		RTE_EDEV_LOG_ERR("epoll_create1() failed, err %d", err);
1249 		return err;
1250 	}
1251 
1252 	return 0;
1253 }
1254 
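/* Create the interrupt ring, the epoll event array and the control
 * thread that blocks in rte_epoll_wait() on behalf of the adapter
 */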
1255 static int
1256 rxa_create_intr_thread(struct rte_event_eth_rx_adapter *rx_adapter)
1257 {
1258 	int err;
1259 	char thread_name[RTE_MAX_THREAD_NAME_LEN];
1260 
1261 	if (rx_adapter->intr_ring)
1262 		return 0;
1263 
1264 	rx_adapter->intr_ring = rte_ring_create("intr_ring",
1265 					RTE_EVENT_ETH_INTR_RING_SIZE,
1266 					rte_socket_id(), 0);
1267 	if (!rx_adapter->intr_ring)
1268 		return -ENOMEM;
1269 
1270 	rx_adapter->epoll_events = rte_zmalloc_socket(rx_adapter->mem_name,
1271 					RTE_EVENT_ETH_INTR_RING_SIZE *
1272 					sizeof(struct rte_epoll_event),
1273 					RTE_CACHE_LINE_SIZE,
1274 					rx_adapter->socket_id);
1275 	if (!rx_adapter->epoll_events) {
1276 		err = -ENOMEM;
1277 		goto error;
1278 	}
1279 
1280 	rte_spinlock_init(&rx_adapter->intr_ring_lock);
1281 
1282 	snprintf(thread_name, RTE_MAX_THREAD_NAME_LEN,
1283 			"rx-intr-thread-%d", rx_adapter->id);
1284 
1285 	err = rte_ctrl_thread_create(&rx_adapter->rx_intr_thread, thread_name,
1286 				NULL, rxa_intr_thread, rx_adapter);
1287 	if (!err) {
1288 		rte_thread_setname(rx_adapter->rx_intr_thread, thread_name);
1289 		return 0;
1290 	}
1291 
1292 	RTE_EDEV_LOG_ERR("Failed to create interrupt thread err = %d\n", err);
1293 error:
1294 	rte_ring_free(rx_adapter->intr_ring);
1295 	rx_adapter->intr_ring = NULL;
1296 	rx_adapter->epoll_events = NULL;
1297 	return err;
1298 }
1299 
1300 static int
1301 rxa_destroy_intr_thread(struct rte_event_eth_rx_adapter *rx_adapter)
1302 {
1303 	int err;
1304 
1305 	err = pthread_cancel(rx_adapter->rx_intr_thread);
1306 	if (err)
1307 		RTE_EDEV_LOG_ERR("Can't cancel interrupt thread err = %d\n",
1308 				err);
1309 
1310 	err = pthread_join(rx_adapter->rx_intr_thread, NULL);
1311 	if (err)
1312 		RTE_EDEV_LOG_ERR("Can't join interrupt thread err = %d\n", err);
1313 
1314 	rte_free(rx_adapter->epoll_events);
1315 	rte_ring_free(rx_adapter->intr_ring);
1316 	rx_adapter->intr_ring = NULL;
1317 	rx_adapter->epoll_events = NULL;
1318 	return 0;
1319 }
1320 
1321 static int
1322 rxa_free_intr_resources(struct rte_event_eth_rx_adapter *rx_adapter)
1323 {
1324 	int ret;
1325 
1326 	if (rx_adapter->num_rx_intr == 0)
1327 		return 0;
1328 
1329 	ret = rxa_destroy_intr_thread(rx_adapter);
1330 	if (ret)
1331 		return ret;
1332 
1333 	close(rx_adapter->epd);
1334 	rx_adapter->epd = INIT_FD;
1335 
1336 	return ret;
1337 }
1338 
1339 static int
1340 rxa_disable_intr(struct rte_event_eth_rx_adapter *rx_adapter,
1341 	struct eth_device_info *dev_info,
1342 	uint16_t rx_queue_id)
1343 {
1344 	int err;
1345 	uint16_t eth_dev_id = dev_info->dev->data->port_id;
1346 	int sintr = rxa_shared_intr(dev_info, rx_queue_id);
1347 
1348 	err = rte_eth_dev_rx_intr_disable(eth_dev_id, rx_queue_id);
1349 	if (err) {
1350 		RTE_EDEV_LOG_ERR("Could not disable interrupt for Rx queue %u",
1351 			rx_queue_id);
1352 		return err;
1353 	}
1354 
1355 	err = rte_eth_dev_rx_intr_ctl_q(eth_dev_id, rx_queue_id,
1356 					rx_adapter->epd,
1357 					RTE_INTR_EVENT_DEL,
1358 					0);
1359 	if (err)
1360 		RTE_EDEV_LOG_ERR("Interrupt event deletion failed %d", err);
1361 
1362 	if (sintr)
1363 		dev_info->shared_intr_enabled = 0;
1364 	else
1365 		dev_info->rx_queue[rx_queue_id].intr_enabled = 0;
1366 	return err;
1367 }
1368 
1369 static int
1370 rxa_del_intr_queue(struct rte_event_eth_rx_adapter *rx_adapter,
1371 		struct eth_device_info *dev_info,
1372 		int rx_queue_id)
1373 {
1374 	int err;
1375 	int i;
1376 	int s;
1377 
1378 	if (dev_info->nb_rx_intr == 0)
1379 		return 0;
1380 
1381 	err = 0;
1382 	if (rx_queue_id == -1) {
1383 		s = dev_info->nb_shared_intr;
1384 		for (i = 0; i < dev_info->nb_rx_intr; i++) {
1385 			int sintr;
1386 			uint16_t q;
1387 
1388 			q = dev_info->intr_queue[i];
1389 			sintr = rxa_shared_intr(dev_info, q);
1390 			s -= sintr;
1391 
1392 			if (!sintr || s == 0) {
1393 
1394 				err = rxa_disable_intr(rx_adapter, dev_info,
1395 						q);
1396 				if (err)
1397 					return err;
1398 				rxa_intr_ring_del_entries(rx_adapter, dev_info,
1399 							q);
1400 			}
1401 		}
1402 	} else {
1403 		if (!rxa_intr_queue(dev_info, rx_queue_id))
1404 			return 0;
1405 		if (!rxa_shared_intr(dev_info, rx_queue_id) ||
1406 				dev_info->nb_shared_intr == 1) {
1407 			err = rxa_disable_intr(rx_adapter, dev_info,
1408 					rx_queue_id);
1409 			if (err)
1410 				return err;
1411 			rxa_intr_ring_del_entries(rx_adapter, dev_info,
1412 						rx_queue_id);
1413 		}
1414 
1415 		for (i = 0; i < dev_info->nb_rx_intr; i++) {
1416 			if (dev_info->intr_queue[i] == rx_queue_id) {
1417 				for (; i < dev_info->nb_rx_intr - 1; i++)
1418 					dev_info->intr_queue[i] =
1419 						dev_info->intr_queue[i + 1];
1420 				break;
1421 			}
1422 		}
1423 	}
1424 
1425 	return err;
1426 }
1427 
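/* Register the Rx queue with the adapter's epoll fd and enable its
 * interrupt; on failure, the epoll registration, the epoll fd and the
 * intr_queue array are rolled back if they were created here
 */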
1428 static int
1429 rxa_config_intr(struct rte_event_eth_rx_adapter *rx_adapter,
1430 	struct eth_device_info *dev_info,
1431 	uint16_t rx_queue_id)
1432 {
1433 	int err, err1;
1434 	uint16_t eth_dev_id = dev_info->dev->data->port_id;
1435 	union queue_data qd;
1436 	int init_fd;
1437 	uint16_t *intr_queue;
1438 	int sintr = rxa_shared_intr(dev_info, rx_queue_id);
1439 
1440 	if (rxa_intr_queue(dev_info, rx_queue_id))
1441 		return 0;
1442 
1443 	intr_queue = dev_info->intr_queue;
1444 	if (dev_info->intr_queue == NULL) {
1445 		size_t len =
1446 			dev_info->dev->data->nb_rx_queues * sizeof(uint16_t);
1447 		dev_info->intr_queue =
1448 			rte_zmalloc_socket(
1449 				rx_adapter->mem_name,
1450 				len,
1451 				0,
1452 				rx_adapter->socket_id);
1453 		if (dev_info->intr_queue == NULL)
1454 			return -ENOMEM;
1455 	}
1456 
1457 	init_fd = rx_adapter->epd;
1458 	err = rxa_init_epd(rx_adapter);
1459 	if (err)
1460 		goto err_free_queue;
1461 
1462 	qd.port = eth_dev_id;
1463 	qd.queue = rx_queue_id;
1464 
1465 	err = rte_eth_dev_rx_intr_ctl_q(eth_dev_id, rx_queue_id,
1466 					rx_adapter->epd,
1467 					RTE_INTR_EVENT_ADD,
1468 					qd.ptr);
1469 	if (err) {
1470 		RTE_EDEV_LOG_ERR("Failed to add interrupt event for"
1471 			" Rx Queue %u err %d", rx_queue_id, err);
1472 		goto err_del_fd;
1473 	}
1474 
1475 	err = rte_eth_dev_rx_intr_enable(eth_dev_id, rx_queue_id);
1476 	if (err) {
1477 		RTE_EDEV_LOG_ERR("Could not enable interrupt for"
1478 				" Rx Queue %u err %d", rx_queue_id, err);
1479 
1480 		goto err_del_event;
1481 	}
1482 
1483 	err = rxa_create_intr_thread(rx_adapter);
1484 	if (!err)  {
1485 		if (sintr)
1486 			dev_info->shared_intr_enabled = 1;
1487 		else
1488 			dev_info->rx_queue[rx_queue_id].intr_enabled = 1;
1489 		return 0;
1490 	}
1491 
1492 
1493 	err = rte_eth_dev_rx_intr_disable(eth_dev_id, rx_queue_id);
1494 	if (err)
1495 		RTE_EDEV_LOG_ERR("Could not disable interrupt for"
1496 				" Rx Queue %u err %d", rx_queue_id, err);
1497 err_del_event:
1498 	err1 = rte_eth_dev_rx_intr_ctl_q(eth_dev_id, rx_queue_id,
1499 					rx_adapter->epd,
1500 					RTE_INTR_EVENT_DEL,
1501 					0);
1502 	if (err1) {
1503 		RTE_EDEV_LOG_ERR("Could not delete event for"
1504 				" Rx Queue %u err %d", rx_queue_id, err1);
1505 	}
1506 err_del_fd:
1507 	if (init_fd == INIT_FD) {
1508 		close(rx_adapter->epd);
1509 		rx_adapter->epd = INIT_FD;
1510 	}
1511 err_free_queue:
1512 	if (intr_queue == NULL)
1513 		rte_free(dev_info->intr_queue);
1514 
1515 	return err;
1516 }
1517 
1518 static int
1519 rxa_add_intr_queue(struct rte_event_eth_rx_adapter *rx_adapter,
1520 	struct eth_device_info *dev_info,
1521 	int rx_queue_id)
1522 
1523 {
1524 	int i, j, err;
1525 	int si = -1;
1526 	int shared_done = (dev_info->nb_shared_intr > 0);
1527 
1528 	if (rx_queue_id != -1) {
1529 		if (rxa_shared_intr(dev_info, rx_queue_id) && shared_done)
1530 			return 0;
1531 		return rxa_config_intr(rx_adapter, dev_info, rx_queue_id);
1532 	}
1533 
1534 	err = 0;
1535 	for (i = 0; i < dev_info->dev->data->nb_rx_queues; i++) {
1536 
1537 		if (rxa_shared_intr(dev_info, i) && shared_done)
1538 			continue;
1539 
1540 		err = rxa_config_intr(rx_adapter, dev_info, i);
1541 
1542 		shared_done = err == 0 && rxa_shared_intr(dev_info, i);
1543 		if (shared_done) {
1544 			si = i;
1545 			dev_info->shared_intr_enabled = 1;
1546 		}
1547 		if (err)
1548 			break;
1549 	}
1550 
1551 	if (err == 0)
1552 		return 0;
1553 
1554 	shared_done = (dev_info->nb_shared_intr > 0);
1555 	for (j = 0; j < i; j++) {
1556 		if (rxa_intr_queue(dev_info, j))
1557 			continue;
1558 		if (rxa_shared_intr(dev_info, j) && si != j)
1559 			continue;
1560 		err = rxa_disable_intr(rx_adapter, dev_info, j);
1561 		if (err)
1562 			break;
1563 
1564 	}
1565 
1566 	return err;
1567 }
1568 
1569 
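/* Register the adapter's service function with the service core
 * framework and run the configuration callback once to obtain the
 * event port id and max_nb_rx
 */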
1570 static int
1571 rxa_init_service(struct rte_event_eth_rx_adapter *rx_adapter, uint8_t id)
1572 {
1573 	int ret;
1574 	struct rte_service_spec service;
1575 	struct rte_event_eth_rx_adapter_conf rx_adapter_conf;
1576 
1577 	if (rx_adapter->service_inited)
1578 		return 0;
1579 
1580 	memset(&service, 0, sizeof(service));
1581 	snprintf(service.name, ETH_RX_ADAPTER_SERVICE_NAME_LEN,
1582 		"rte_event_eth_rx_adapter_%d", id);
1583 	service.socket_id = rx_adapter->socket_id;
1584 	service.callback = rxa_service_func;
1585 	service.callback_userdata = rx_adapter;
1586 	/* Service function handles locking for queue add/del updates */
1587 	service.capabilities = RTE_SERVICE_CAP_MT_SAFE;
1588 	ret = rte_service_component_register(&service, &rx_adapter->service_id);
1589 	if (ret) {
1590 		RTE_EDEV_LOG_ERR("failed to register service %s err = %" PRId32,
1591 			service.name, ret);
1592 		return ret;
1593 	}
1594 
1595 	ret = rx_adapter->conf_cb(id, rx_adapter->eventdev_id,
1596 		&rx_adapter_conf, rx_adapter->conf_arg);
1597 	if (ret) {
1598 		RTE_EDEV_LOG_ERR("configuration callback failed err = %" PRId32,
1599 			ret);
1600 		goto err_done;
1601 	}
1602 	rx_adapter->event_port_id = rx_adapter_conf.event_port_id;
1603 	rx_adapter->max_nb_rx = rx_adapter_conf.max_nb_rx;
1604 	rx_adapter->service_inited = 1;
1605 	rx_adapter->epd = INIT_FD;
1606 	return 0;
1607 
1608 err_done:
1609 	rte_service_component_unregister(rx_adapter->service_id);
1610 	return ret;
1611 }
1612 
1613 static void
1614 rxa_update_queue(struct rte_event_eth_rx_adapter *rx_adapter,
1615 		struct eth_device_info *dev_info,
1616 		int32_t rx_queue_id,
1617 		uint8_t add)
1618 {
1619 	struct eth_rx_queue_info *queue_info;
1620 	int enabled;
1621 	uint16_t i;
1622 
1623 	if (dev_info->rx_queue == NULL)
1624 		return;
1625 
1626 	if (rx_queue_id == -1) {
1627 		for (i = 0; i < dev_info->dev->data->nb_rx_queues; i++)
1628 			rxa_update_queue(rx_adapter, dev_info, i, add);
1629 	} else {
1630 		queue_info = &dev_info->rx_queue[rx_queue_id];
1631 		enabled = queue_info->queue_enabled;
1632 		if (add) {
1633 			rx_adapter->nb_queues += !enabled;
1634 			dev_info->nb_dev_queues += !enabled;
1635 		} else {
1636 			rx_adapter->nb_queues -= enabled;
1637 			dev_info->nb_dev_queues -= enabled;
1638 		}
1639 		queue_info->queue_enabled = !!add;
1640 	}
1641 }
1642 
1643 static void
1644 rxa_sw_del(struct rte_event_eth_rx_adapter *rx_adapter,
1645 	struct eth_device_info *dev_info,
1646 	int32_t rx_queue_id)
1647 {
1648 	int pollq;
1649 	int intrq;
1650 	int sintrq;
1651 
1652 
1653 	if (rx_adapter->nb_queues == 0)
1654 		return;
1655 
1656 	if (rx_queue_id == -1) {
1657 		uint16_t nb_rx_queues;
1658 		uint16_t i;
1659 
1660 		nb_rx_queues = dev_info->dev->data->nb_rx_queues;
1661 		for (i = 0; i <	nb_rx_queues; i++)
1662 			rxa_sw_del(rx_adapter, dev_info, i);
1663 		return;
1664 	}
1665 
1666 	pollq = rxa_polled_queue(dev_info, rx_queue_id);
1667 	intrq = rxa_intr_queue(dev_info, rx_queue_id);
1668 	sintrq = rxa_shared_intr(dev_info, rx_queue_id);
1669 	rxa_update_queue(rx_adapter, dev_info, rx_queue_id, 0);
1670 	rx_adapter->num_rx_polled -= pollq;
1671 	dev_info->nb_rx_poll -= pollq;
1672 	rx_adapter->num_rx_intr -= intrq;
1673 	dev_info->nb_rx_intr -= intrq;
1674 	dev_info->nb_shared_intr -= intrq && sintrq;
1675 }
1676 
1677 static void
1678 rxa_add_queue(struct rte_event_eth_rx_adapter *rx_adapter,
1679 	struct eth_device_info *dev_info,
1680 	int32_t rx_queue_id,
1681 	const struct rte_event_eth_rx_adapter_queue_conf *conf)
1682 {
1683 	struct eth_rx_queue_info *queue_info;
1684 	const struct rte_event *ev = &conf->ev;
1685 	int pollq;
1686 	int intrq;
1687 	int sintrq;
1688 	struct rte_event *qi_ev;
1689 
1690 	if (rx_queue_id == -1) {
1691 		uint16_t nb_rx_queues;
1692 		uint16_t i;
1693 
1694 		nb_rx_queues = dev_info->dev->data->nb_rx_queues;
1695 		for (i = 0; i <	nb_rx_queues; i++)
1696 			rxa_add_queue(rx_adapter, dev_info, i, conf);
1697 		return;
1698 	}
1699 
1700 	pollq = rxa_polled_queue(dev_info, rx_queue_id);
1701 	intrq = rxa_intr_queue(dev_info, rx_queue_id);
1702 	sintrq = rxa_shared_intr(dev_info, rx_queue_id);
1703 
1704 	queue_info = &dev_info->rx_queue[rx_queue_id];
1705 	queue_info->wt = conf->servicing_weight;
1706 
1707 	qi_ev = (struct rte_event *)&queue_info->event;
1708 	qi_ev->event = ev->event;
1709 	qi_ev->op = RTE_EVENT_OP_NEW;
1710 	qi_ev->event_type = RTE_EVENT_TYPE_ETH_RX_ADAPTER;
1711 	qi_ev->sub_event_type = 0;
1712 
1713 	if (conf->rx_queue_flags &
1714 			RTE_EVENT_ETH_RX_ADAPTER_QUEUE_FLOW_ID_VALID) {
1715 		queue_info->flow_id_mask = ~0;
1716 	} else
1717 		qi_ev->flow_id = 0;
1718 
1719 	rxa_update_queue(rx_adapter, dev_info, rx_queue_id, 1);
1720 	if (rxa_polled_queue(dev_info, rx_queue_id)) {
1721 		rx_adapter->num_rx_polled += !pollq;
1722 		dev_info->nb_rx_poll += !pollq;
1723 		rx_adapter->num_rx_intr -= intrq;
1724 		dev_info->nb_rx_intr -= intrq;
1725 		dev_info->nb_shared_intr -= intrq && sintrq;
1726 	}
1727 
1728 	if (rxa_intr_queue(dev_info, rx_queue_id)) {
1729 		rx_adapter->num_rx_polled -= pollq;
1730 		dev_info->nb_rx_poll -= pollq;
1731 		rx_adapter->num_rx_intr += !intrq;
1732 		dev_info->nb_rx_intr += !intrq;
1733 		dev_info->nb_shared_intr += !intrq && sintrq;
1734 		if (dev_info->nb_shared_intr == 1) {
1735 			if (dev_info->multi_intr_cap)
1736 				dev_info->next_q_idx =
1737 					RTE_MAX_RXTX_INTR_VEC_ID - 1;
1738 			else
1739 				dev_info->next_q_idx = 0;
1740 		}
1741 	}
1742 }
1743 
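/* Add an Rx queue (or all queues when rx_queue_id is -1) to the
 * software adapter: recompute the poll and WRR arrays, move the queue
 * between poll and interrupt mode as its servicing weight dictates and
 * swap in the new schedule
 */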
1744 static int rxa_sw_add(struct rte_event_eth_rx_adapter *rx_adapter,
1745 		uint16_t eth_dev_id,
1746 		int rx_queue_id,
1747 		const struct rte_event_eth_rx_adapter_queue_conf *queue_conf)
1748 {
1749 	struct eth_device_info *dev_info = &rx_adapter->eth_devices[eth_dev_id];
1750 	struct rte_event_eth_rx_adapter_queue_conf temp_conf;
1751 	int ret;
1752 	struct eth_rx_poll_entry *rx_poll;
1753 	struct eth_rx_queue_info *rx_queue;
1754 	uint32_t *rx_wrr;
1755 	uint16_t nb_rx_queues;
1756 	uint32_t nb_rx_poll, nb_wrr;
1757 	uint32_t nb_rx_intr;
1758 	int num_intr_vec;
1759 	uint16_t wt;
1760 
1761 	if (queue_conf->servicing_weight == 0) {
1762 		struct rte_eth_dev_data *data = dev_info->dev->data;
1763 
1764 		temp_conf = *queue_conf;
1765 		if (!data->dev_conf.intr_conf.rxq) {
1766 			/* If Rx interrupts are disabled set wt = 1 */
1767 			temp_conf.servicing_weight = 1;
1768 		}
1769 		queue_conf = &temp_conf;
1770 	}
1771 
1772 	nb_rx_queues = dev_info->dev->data->nb_rx_queues;
1773 	rx_queue = dev_info->rx_queue;
1774 	wt = queue_conf->servicing_weight;
1775 
1776 	if (dev_info->rx_queue == NULL) {
1777 		dev_info->rx_queue =
1778 		    rte_zmalloc_socket(rx_adapter->mem_name,
1779 				       nb_rx_queues *
1780 				       sizeof(struct eth_rx_queue_info), 0,
1781 				       rx_adapter->socket_id);
1782 		if (dev_info->rx_queue == NULL)
1783 			return -ENOMEM;
1784 	}
1785 	rx_wrr = NULL;
1786 	rx_poll = NULL;
1787 
1788 	rxa_calc_nb_post_add(rx_adapter, dev_info, rx_queue_id,
1789 			queue_conf->servicing_weight,
1790 			&nb_rx_poll, &nb_rx_intr, &nb_wrr);
1791 
1792 	if (dev_info->dev->intr_handle)
1793 		dev_info->multi_intr_cap =
1794 			rte_intr_cap_multiple(dev_info->dev->intr_handle);
1795 
1796 	ret = rxa_alloc_poll_arrays(rx_adapter, nb_rx_poll, nb_wrr,
1797 				&rx_poll, &rx_wrr);
1798 	if (ret)
1799 		goto err_free_rxqueue;
1800 
1801 	if (wt == 0) {
1802 		num_intr_vec = rxa_nb_intr_vect(dev_info, rx_queue_id, 1);
1803 
1804 		ret = rxa_intr_ring_check_avail(rx_adapter, num_intr_vec);
1805 		if (ret)
1806 			goto err_free_rxqueue;
1807 
1808 		ret = rxa_add_intr_queue(rx_adapter, dev_info, rx_queue_id);
1809 		if (ret)
1810 			goto err_free_rxqueue;
1811 	} else {
1812 
1813 		num_intr_vec = 0;
1814 		if (rx_adapter->num_rx_intr > nb_rx_intr) {
1815 			num_intr_vec = rxa_nb_intr_vect(dev_info,
1816 						rx_queue_id, 0);
1817 			/* interrupt based queues are being converted to
1818 			 * poll mode queues, delete the interrupt configuration
1819 			 * for those.
1820 			 */
1821 			ret = rxa_del_intr_queue(rx_adapter,
1822 						dev_info, rx_queue_id);
1823 			if (ret)
1824 				goto err_free_rxqueue;
1825 		}
1826 	}
1827 
1828 	if (nb_rx_intr == 0) {
1829 		ret = rxa_free_intr_resources(rx_adapter);
1830 		if (ret)
1831 			goto err_free_rxqueue;
1832 	}
1833 
1834 	if (wt == 0) {
1835 		uint16_t i;
1836 
1837 		if (rx_queue_id  == -1) {
1838 			for (i = 0; i < dev_info->dev->data->nb_rx_queues; i++)
1839 				dev_info->intr_queue[i] = i;
1840 		} else {
1841 			if (!rxa_intr_queue(dev_info, rx_queue_id))
1842 				dev_info->intr_queue[nb_rx_intr - 1] =
1843 					rx_queue_id;
1844 		}
1845 	}
1846 
1847 
1848 
1849 	rxa_add_queue(rx_adapter, dev_info, rx_queue_id, queue_conf);
1850 	rxa_calc_wrr_sequence(rx_adapter, rx_poll, rx_wrr);
1851 
1852 	rte_free(rx_adapter->eth_rx_poll);
1853 	rte_free(rx_adapter->wrr_sched);
1854 
1855 	rx_adapter->eth_rx_poll = rx_poll;
1856 	rx_adapter->wrr_sched = rx_wrr;
1857 	rx_adapter->wrr_len = nb_wrr;
1858 	rx_adapter->num_intr_vec += num_intr_vec;
1859 	return 0;
1860 
1861 err_free_rxqueue:
1862 	if (rx_queue == NULL) {
1863 		rte_free(dev_info->rx_queue);
1864 		dev_info->rx_queue = NULL;
1865 	}
1866 
1867 	rte_free(rx_poll);
1868 	rte_free(rx_wrr);
1869 
1870 	return 0;
1871 }
1872 
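/*
 * Common start/stop handler: ports with an internal event port are
 * started/stopped through the PMD callback, while ports driven by the
 * software adapter toggle the adapter's service run state instead.
 */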
static int
rxa_ctrl(uint8_t id, int start)
{
	struct rte_event_eth_rx_adapter *rx_adapter;
	struct rte_eventdev *dev;
	struct eth_device_info *dev_info;
	uint32_t i;
	int use_service = 0;
	int stop = !start;

	RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
	rx_adapter = rxa_id_to_adapter(id);
	if (rx_adapter == NULL)
		return -EINVAL;

	dev = &rte_eventdevs[rx_adapter->eventdev_id];

	RTE_ETH_FOREACH_DEV(i) {
		dev_info = &rx_adapter->eth_devices[i];
		/* if start check for num dev queues */
		if (start && !dev_info->nb_dev_queues)
			continue;
		/* if stop check if dev has been started */
		if (stop && !dev_info->dev_rx_started)
			continue;
		use_service |= !dev_info->internal_event_port;
		dev_info->dev_rx_started = start;
		if (dev_info->internal_event_port == 0)
			continue;
		start ? (*dev->dev_ops->eth_rx_adapter_start)(dev,
						&rte_eth_devices[i]) :
			(*dev->dev_ops->eth_rx_adapter_stop)(dev,
						&rte_eth_devices[i]);
	}

	if (use_service) {
		rte_spinlock_lock(&rx_adapter->rx_lock);
		rx_adapter->rxa_started = start;
		rte_service_runstate_set(rx_adapter->service_id, start);
		rte_spinlock_unlock(&rx_adapter->rx_lock);
	}

	return 0;
}

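/*
 * Create an adapter with a caller-supplied configuration callback. The
 * adapter structure and per-port device array are allocated on the
 * event device's socket, and the default RSS key is converted to
 * big-endian form for the software RSS computation on the Rx path.
 */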
int
rte_event_eth_rx_adapter_create_ext(uint8_t id, uint8_t dev_id,
				rte_event_eth_rx_adapter_conf_cb conf_cb,
				void *conf_arg)
{
	struct rte_event_eth_rx_adapter *rx_adapter;
	int ret;
	int socket_id;
	uint16_t i;
	char mem_name[ETH_RX_ADAPTER_MEM_NAME_LEN];
	const uint8_t default_rss_key[] = {
		0x6d, 0x5a, 0x56, 0xda, 0x25, 0x5b, 0x0e, 0xc2,
		0x41, 0x67, 0x25, 0x3d, 0x43, 0xa3, 0x8f, 0xb0,
		0xd0, 0xca, 0x2b, 0xcb, 0xae, 0x7b, 0x30, 0xb4,
		0x77, 0xcb, 0x2d, 0xa3, 0x80, 0x30, 0xf2, 0x0c,
		0x6a, 0x42, 0xb7, 0x3b, 0xbe, 0xac, 0x01, 0xfa,
	};

	RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
	RTE_EVENTDEV_VALID_DEVID_OR_ERR_RET(dev_id, -EINVAL);
	if (conf_cb == NULL)
		return -EINVAL;

	if (event_eth_rx_adapter == NULL) {
		ret = rte_event_eth_rx_adapter_init();
		if (ret)
			return ret;
	}

	rx_adapter = rxa_id_to_adapter(id);
	if (rx_adapter != NULL) {
		RTE_EDEV_LOG_ERR("Eth Rx adapter exists id = %" PRIu8, id);
		return -EEXIST;
	}

	socket_id = rte_event_dev_socket_id(dev_id);
	snprintf(mem_name, ETH_RX_ADAPTER_MEM_NAME_LEN,
		"rte_event_eth_rx_adapter_%d",
		id);

	rx_adapter = rte_zmalloc_socket(mem_name, sizeof(*rx_adapter),
			RTE_CACHE_LINE_SIZE, socket_id);
	if (rx_adapter == NULL) {
		RTE_EDEV_LOG_ERR("failed to get mem for rx adapter");
		return -ENOMEM;
	}

	rx_adapter->eventdev_id = dev_id;
	rx_adapter->socket_id = socket_id;
	rx_adapter->conf_cb = conf_cb;
	rx_adapter->conf_arg = conf_arg;
	rx_adapter->id = id;
	strcpy(rx_adapter->mem_name, mem_name);
	rx_adapter->eth_devices = rte_zmalloc_socket(rx_adapter->mem_name,
					RTE_MAX_ETHPORTS *
					sizeof(struct eth_device_info), 0,
					socket_id);
	rte_convert_rss_key((const uint32_t *)default_rss_key,
			(uint32_t *)rx_adapter->rss_key_be,
			    RTE_DIM(default_rss_key));

	if (rx_adapter->eth_devices == NULL) {
		RTE_EDEV_LOG_ERR("failed to get mem for eth devices");
		rte_free(rx_adapter);
		return -ENOMEM;
	}
	rte_spinlock_init(&rx_adapter->rx_lock);
	for (i = 0; i < RTE_MAX_ETHPORTS; i++)
		rx_adapter->eth_devices[i].dev = &rte_eth_devices[i];

	event_eth_rx_adapter[id] = rx_adapter;
	if (conf_cb == rxa_default_conf_cb)
		rx_adapter->default_cb_arg = 1;
	rte_eventdev_trace_eth_rx_adapter_create(id, dev_id, conf_cb,
		conf_arg);
	return 0;
}

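/*
 * Minimal usage sketch (adapter_id, evdev_id, eth_port and ev_queue are
 * application-chosen values, not defined in this file):
 *
 *	struct rte_event_port_conf pconf = {
 *		.new_event_threshold = 4096,
 *		.dequeue_depth = 128,
 *		.enqueue_depth = 128,
 *	};
 *	struct rte_event_eth_rx_adapter_queue_conf qconf = {
 *		.ev.queue_id = ev_queue,
 *		.ev.sched_type = RTE_SCHED_TYPE_ATOMIC,
 *		.servicing_weight = 1,
 *	};
 *
 *	rte_event_eth_rx_adapter_create(adapter_id, evdev_id, &pconf);
 *	rte_event_eth_rx_adapter_queue_add(adapter_id, eth_port, -1, &qconf);
 *	rte_event_eth_rx_adapter_start(adapter_id);
 */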
int
rte_event_eth_rx_adapter_create(uint8_t id, uint8_t dev_id,
		struct rte_event_port_conf *port_config)
{
	struct rte_event_port_conf *pc;
	int ret;

	if (port_config == NULL)
		return -EINVAL;
	RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);

	pc = rte_malloc(NULL, sizeof(*pc), 0);
	if (pc == NULL)
		return -ENOMEM;
	*pc = *port_config;
	ret = rte_event_eth_rx_adapter_create_ext(id, dev_id,
					rxa_default_conf_cb,
					pc);
	if (ret)
		rte_free(pc);
	return ret;
}

int
rte_event_eth_rx_adapter_free(uint8_t id)
{
	struct rte_event_eth_rx_adapter *rx_adapter;

	RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);

	rx_adapter = rxa_id_to_adapter(id);
	if (rx_adapter == NULL)
		return -EINVAL;

	if (rx_adapter->nb_queues) {
		RTE_EDEV_LOG_ERR("%" PRIu16 " Rx queues not deleted",
				rx_adapter->nb_queues);
		return -EBUSY;
	}

	if (rx_adapter->default_cb_arg)
		rte_free(rx_adapter->conf_arg);
	rte_free(rx_adapter->eth_devices);
	rte_free(rx_adapter);
	event_eth_rx_adapter[id] = NULL;

	rte_eventdev_trace_eth_rx_adapter_free(id);
	return 0;
}

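/*
 * Add one Rx queue (or all queues when rx_queue_id is -1) of an ethdev
 * to the adapter. Devices with the INTERNAL_PORT capability delegate to
 * the PMD queue add op; otherwise the software service is initialized
 * and rxa_sw_add() is invoked under rx_lock.
 */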
int
rte_event_eth_rx_adapter_queue_add(uint8_t id,
		uint16_t eth_dev_id,
		int32_t rx_queue_id,
		const struct rte_event_eth_rx_adapter_queue_conf *queue_conf)
{
	int ret;
	uint32_t cap;
	struct rte_event_eth_rx_adapter *rx_adapter;
	struct rte_eventdev *dev;
	struct eth_device_info *dev_info;

	RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
	RTE_ETH_VALID_PORTID_OR_ERR_RET(eth_dev_id, -EINVAL);

	rx_adapter = rxa_id_to_adapter(id);
	if ((rx_adapter == NULL) || (queue_conf == NULL))
		return -EINVAL;

	dev = &rte_eventdevs[rx_adapter->eventdev_id];
	ret = rte_event_eth_rx_adapter_caps_get(rx_adapter->eventdev_id,
						eth_dev_id,
						&cap);
	if (ret) {
		RTE_EDEV_LOG_ERR("Failed to get adapter caps edev %" PRIu8
			" eth port %" PRIu16, id, eth_dev_id);
		return ret;
	}

	if ((cap & RTE_EVENT_ETH_RX_ADAPTER_CAP_OVERRIDE_FLOW_ID) == 0
		&& (queue_conf->rx_queue_flags &
			RTE_EVENT_ETH_RX_ADAPTER_QUEUE_FLOW_ID_VALID)) {
		RTE_EDEV_LOG_ERR("Flow ID override is not supported,"
				" eth port: %" PRIu16 " adapter id: %" PRIu8,
				eth_dev_id, id);
		return -EINVAL;
	}

	if ((cap & RTE_EVENT_ETH_RX_ADAPTER_CAP_MULTI_EVENTQ) == 0 &&
		(rx_queue_id != -1)) {
		RTE_EDEV_LOG_ERR("Rx queues can only be connected to single "
			"event queue, eth port: %" PRIu16 " adapter id: %"
			PRIu8, eth_dev_id, id);
		return -EINVAL;
	}

	if (rx_queue_id != -1 && (uint16_t)rx_queue_id >=
			rte_eth_devices[eth_dev_id].data->nb_rx_queues) {
		RTE_EDEV_LOG_ERR("Invalid rx queue_id %" PRIu16,
			 (uint16_t)rx_queue_id);
		return -EINVAL;
	}

	dev_info = &rx_adapter->eth_devices[eth_dev_id];

	if (cap & RTE_EVENT_ETH_RX_ADAPTER_CAP_INTERNAL_PORT) {
		RTE_FUNC_PTR_OR_ERR_RET(*dev->dev_ops->eth_rx_adapter_queue_add,
					-ENOTSUP);
		if (dev_info->rx_queue == NULL) {
			dev_info->rx_queue =
			    rte_zmalloc_socket(rx_adapter->mem_name,
					dev_info->dev->data->nb_rx_queues *
					sizeof(struct eth_rx_queue_info), 0,
					rx_adapter->socket_id);
			if (dev_info->rx_queue == NULL)
				return -ENOMEM;
		}

		ret = (*dev->dev_ops->eth_rx_adapter_queue_add)(dev,
				&rte_eth_devices[eth_dev_id],
				rx_queue_id, queue_conf);
		if (ret == 0) {
			dev_info->internal_event_port = 1;
			rxa_update_queue(rx_adapter,
					&rx_adapter->eth_devices[eth_dev_id],
					rx_queue_id,
					1);
		}
	} else {
		rte_spinlock_lock(&rx_adapter->rx_lock);
		dev_info->internal_event_port = 0;
		ret = rxa_init_service(rx_adapter, id);
		if (ret == 0) {
			uint32_t service_id = rx_adapter->service_id;
			ret = rxa_sw_add(rx_adapter, eth_dev_id, rx_queue_id,
					queue_conf);
			rte_service_component_runstate_set(service_id,
				rxa_sw_adapter_queue_count(rx_adapter));
		}
		rte_spinlock_unlock(&rx_adapter->rx_lock);
	}

	rte_eventdev_trace_eth_rx_adapter_queue_add(id, eth_dev_id,
		rx_queue_id, queue_conf, ret);
	if (ret)
		return ret;

	return 0;
}

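/*
 * Remove one Rx queue (or all queues when rx_queue_id is -1): the
 * software path rebuilds the poll/WRR arrays for the remaining queues
 * and frees the interrupt resources once no interrupt-mode queue is
 * left on the adapter.
 */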
int
rte_event_eth_rx_adapter_queue_del(uint8_t id, uint16_t eth_dev_id,
				int32_t rx_queue_id)
{
	int ret = 0;
	struct rte_eventdev *dev;
	struct rte_event_eth_rx_adapter *rx_adapter;
	struct eth_device_info *dev_info;
	uint32_t cap;
	uint32_t nb_rx_poll = 0;
	uint32_t nb_wrr = 0;
	uint32_t nb_rx_intr;
	struct eth_rx_poll_entry *rx_poll = NULL;
	uint32_t *rx_wrr = NULL;
	int num_intr_vec;

	RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
	RTE_ETH_VALID_PORTID_OR_ERR_RET(eth_dev_id, -EINVAL);

	rx_adapter = rxa_id_to_adapter(id);
	if (rx_adapter == NULL)
		return -EINVAL;

	dev = &rte_eventdevs[rx_adapter->eventdev_id];
	ret = rte_event_eth_rx_adapter_caps_get(rx_adapter->eventdev_id,
						eth_dev_id,
						&cap);
	if (ret)
		return ret;

	if (rx_queue_id != -1 && (uint16_t)rx_queue_id >=
		rte_eth_devices[eth_dev_id].data->nb_rx_queues) {
		RTE_EDEV_LOG_ERR("Invalid rx queue_id %" PRIu16,
			 (uint16_t)rx_queue_id);
		return -EINVAL;
	}

	dev_info = &rx_adapter->eth_devices[eth_dev_id];

	if (cap & RTE_EVENT_ETH_RX_ADAPTER_CAP_INTERNAL_PORT) {
		RTE_FUNC_PTR_OR_ERR_RET(*dev->dev_ops->eth_rx_adapter_queue_del,
				 -ENOTSUP);
		ret = (*dev->dev_ops->eth_rx_adapter_queue_del)(dev,
						&rte_eth_devices[eth_dev_id],
						rx_queue_id);
		if (ret == 0) {
			rxa_update_queue(rx_adapter,
					&rx_adapter->eth_devices[eth_dev_id],
					rx_queue_id,
					0);
			if (dev_info->nb_dev_queues == 0) {
				rte_free(dev_info->rx_queue);
				dev_info->rx_queue = NULL;
			}
		}
	} else {
		rxa_calc_nb_post_del(rx_adapter, dev_info, rx_queue_id,
			&nb_rx_poll, &nb_rx_intr, &nb_wrr);

		ret = rxa_alloc_poll_arrays(rx_adapter, nb_rx_poll, nb_wrr,
			&rx_poll, &rx_wrr);
		if (ret)
			return ret;

		rte_spinlock_lock(&rx_adapter->rx_lock);

		num_intr_vec = 0;
		if (rx_adapter->num_rx_intr > nb_rx_intr) {
			num_intr_vec = rxa_nb_intr_vect(dev_info,
						rx_queue_id, 0);
			ret = rxa_del_intr_queue(rx_adapter, dev_info,
					rx_queue_id);
			if (ret)
				goto unlock_ret;
		}

		if (nb_rx_intr == 0) {
			ret = rxa_free_intr_resources(rx_adapter);
			if (ret)
				goto unlock_ret;
		}

		rxa_sw_del(rx_adapter, dev_info, rx_queue_id);
		rxa_calc_wrr_sequence(rx_adapter, rx_poll, rx_wrr);

		rte_free(rx_adapter->eth_rx_poll);
		rte_free(rx_adapter->wrr_sched);

		if (nb_rx_intr == 0) {
			rte_free(dev_info->intr_queue);
			dev_info->intr_queue = NULL;
		}

		rx_adapter->eth_rx_poll = rx_poll;
		rx_adapter->wrr_sched = rx_wrr;
		rx_adapter->wrr_len = nb_wrr;
		rx_adapter->num_intr_vec += num_intr_vec;

		if (dev_info->nb_dev_queues == 0) {
			rte_free(dev_info->rx_queue);
			dev_info->rx_queue = NULL;
		}
unlock_ret:
		rte_spinlock_unlock(&rx_adapter->rx_lock);
		if (ret) {
			rte_free(rx_poll);
			rte_free(rx_wrr);
			return ret;
		}

		rte_service_component_runstate_set(rx_adapter->service_id,
				rxa_sw_adapter_queue_count(rx_adapter));
	}

	rte_eventdev_trace_eth_rx_adapter_queue_del(id, eth_dev_id,
		rx_queue_id, ret);
	return ret;
}

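/* Start/stop trace the call and defer to the common rxa_ctrl() handler */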
int
rte_event_eth_rx_adapter_start(uint8_t id)
{
	rte_eventdev_trace_eth_rx_adapter_start(id);
	return rxa_ctrl(id, 1);
}

int
rte_event_eth_rx_adapter_stop(uint8_t id)
{
	rte_eventdev_trace_eth_rx_adapter_stop(id);
	return rxa_ctrl(id, 0);
}

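/*
 * Report adapter statistics: PMD-provided counts from internal-port
 * devices are summed with the software adapter's own counters.
 */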
int
rte_event_eth_rx_adapter_stats_get(uint8_t id,
			       struct rte_event_eth_rx_adapter_stats *stats)
{
	struct rte_event_eth_rx_adapter *rx_adapter;
	struct rte_event_eth_rx_adapter_stats dev_stats_sum = { 0 };
	struct rte_event_eth_rx_adapter_stats dev_stats;
	struct rte_eventdev *dev;
	struct eth_device_info *dev_info;
	uint32_t i;
	int ret;

	RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);

	rx_adapter = rxa_id_to_adapter(id);
	if (rx_adapter == NULL || stats == NULL)
		return -EINVAL;

	dev = &rte_eventdevs[rx_adapter->eventdev_id];
	memset(stats, 0, sizeof(*stats));
	RTE_ETH_FOREACH_DEV(i) {
		dev_info = &rx_adapter->eth_devices[i];
		if (dev_info->internal_event_port == 0 ||
			dev->dev_ops->eth_rx_adapter_stats_get == NULL)
			continue;
		ret = (*dev->dev_ops->eth_rx_adapter_stats_get)(dev,
						&rte_eth_devices[i],
						&dev_stats);
		if (ret)
			continue;
		dev_stats_sum.rx_packets += dev_stats.rx_packets;
		dev_stats_sum.rx_enq_count += dev_stats.rx_enq_count;
	}

	if (rx_adapter->service_inited)
		*stats = rx_adapter->stats;

	stats->rx_packets += dev_stats_sum.rx_packets;
	stats->rx_enq_count += dev_stats_sum.rx_enq_count;
	return 0;
}

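/* Reset the PMD statistics of internal-port devices as well as the
 * software adapter counters.
 */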
int
rte_event_eth_rx_adapter_stats_reset(uint8_t id)
{
	struct rte_event_eth_rx_adapter *rx_adapter;
	struct rte_eventdev *dev;
	struct eth_device_info *dev_info;
	uint32_t i;

	RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);

	rx_adapter = rxa_id_to_adapter(id);
	if (rx_adapter == NULL)
		return -EINVAL;

	dev = &rte_eventdevs[rx_adapter->eventdev_id];
	RTE_ETH_FOREACH_DEV(i) {
		dev_info = &rx_adapter->eth_devices[i];
		if (dev_info->internal_event_port == 0 ||
			dev->dev_ops->eth_rx_adapter_stats_reset == NULL)
			continue;
		(*dev->dev_ops->eth_rx_adapter_stats_reset)(dev,
							&rte_eth_devices[i]);
	}

	memset(&rx_adapter->stats, 0, sizeof(rx_adapter->stats));
	return 0;
}

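/* Return the service id; valid only after the SW service is initialized */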
int
rte_event_eth_rx_adapter_service_id_get(uint8_t id, uint32_t *service_id)
{
	struct rte_event_eth_rx_adapter *rx_adapter;

	RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);

	rx_adapter = rxa_id_to_adapter(id);
	if (rx_adapter == NULL || service_id == NULL)
		return -EINVAL;

	if (rx_adapter->service_inited)
		*service_id = rx_adapter->service_id;

	return rx_adapter->service_inited ? 0 : -ESRCH;
}

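/*
 * Register a per-port callback used by the software Rx path; it is not
 * supported for devices with an internal event port, and the port must
 * already have queues added to the adapter.
 */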
int
rte_event_eth_rx_adapter_cb_register(uint8_t id,
					uint16_t eth_dev_id,
					rte_event_eth_rx_adapter_cb_fn cb_fn,
					void *cb_arg)
{
	struct rte_event_eth_rx_adapter *rx_adapter;
	struct eth_device_info *dev_info;
	uint32_t cap;
	int ret;

	RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
	RTE_ETH_VALID_PORTID_OR_ERR_RET(eth_dev_id, -EINVAL);

	rx_adapter = rxa_id_to_adapter(id);
	if (rx_adapter == NULL)
		return -EINVAL;

	dev_info = &rx_adapter->eth_devices[eth_dev_id];
	if (dev_info->rx_queue == NULL)
		return -EINVAL;

	ret = rte_event_eth_rx_adapter_caps_get(rx_adapter->eventdev_id,
						eth_dev_id,
						&cap);
	if (ret) {
		RTE_EDEV_LOG_ERR("Failed to get adapter caps edev %" PRIu8
			" eth port %" PRIu16, id, eth_dev_id);
		return ret;
	}

	if (cap & RTE_EVENT_ETH_RX_ADAPTER_CAP_INTERNAL_PORT) {
		RTE_EDEV_LOG_ERR("Rx callback not supported for eth port %"
				PRIu16, eth_dev_id);
		return -EINVAL;
	}

	rte_spinlock_lock(&rx_adapter->rx_lock);
	dev_info->cb_fn = cb_fn;
	dev_info->cb_arg = cb_arg;
	rte_spinlock_unlock(&rx_adapter->rx_lock);

	return 0;
}
