1 /* SPDX-License-Identifier: BSD-3-Clause
2 * Copyright(c) 2019 Intel Corporation
3 */
4
5 #include <string.h>
6 #include <stdint.h>
7
8 #include <rte_mbuf.h>
9 #include <rte_malloc.h>
10
11 #include "rte_port_eventdev.h"
12
13 /*
14 * Port EVENTDEV Reader
15 */
16 #ifdef RTE_PORT_STATS_COLLECT
17
18 #define RTE_PORT_EVENTDEV_READER_STATS_PKTS_IN_ADD(port, val) \
19 do {port->stats.n_pkts_in += val;} while (0)
20 #define RTE_PORT_EVENTDEV_READER_STATS_PKTS_DROP_ADD(port, val) \
21 do {port->stats.n_pkts_drop += val;} while (0)
22
23 #else
24
25 #define RTE_PORT_EVENTDEV_READER_STATS_PKTS_IN_ADD(port, val)
26 #define RTE_PORT_EVENTDEV_READER_STATS_PKTS_DROP_ADD(port, val)
27
28 #endif
29
30 struct rte_port_eventdev_reader {
31 struct rte_port_in_stats stats;
32
33 uint8_t eventdev_id;
34 uint16_t port_id;
35
36 struct rte_event ev[RTE_PORT_IN_BURST_SIZE_MAX];
37 };
38
39 static void *
rte_port_eventdev_reader_create(void * params,int socket_id)40 rte_port_eventdev_reader_create(void *params, int socket_id)
41 {
42 struct rte_port_eventdev_reader_params *conf =
43 params;
44 struct rte_port_eventdev_reader *port;
45
46 /* Check input parameters */
47 if (conf == NULL) {
48 RTE_LOG(ERR, PORT, "%s: params is NULL\n", __func__);
49 return NULL;
50 }
51
52 /* Memory allocation */
53 port = rte_zmalloc_socket("PORT", sizeof(*port),
54 RTE_CACHE_LINE_SIZE, socket_id);
55 if (port == NULL) {
56 RTE_LOG(ERR, PORT, "%s: Failed to allocate port\n", __func__);
57 return NULL;
58 }
59
60 /* Initialization */
61 port->eventdev_id = conf->eventdev_id;
62 port->port_id = conf->port_id;
63
64 return port;
65 }
66
67 static int
rte_port_eventdev_reader_rx(void * port,struct rte_mbuf ** pkts,uint32_t n_pkts)68 rte_port_eventdev_reader_rx(void *port, struct rte_mbuf **pkts, uint32_t n_pkts)
69 {
70 struct rte_port_eventdev_reader *p = port;
71 uint16_t rx_evts_cnt, i;
72
73 rx_evts_cnt = rte_event_dequeue_burst(p->eventdev_id, p->port_id,
74 p->ev, n_pkts, 0);
75
76 for (i = 0; i < rx_evts_cnt; i++)
77 pkts[i] = p->ev[i].mbuf;
78
79 RTE_PORT_EVENTDEV_READER_STATS_PKTS_IN_ADD(p, rx_evts_cnt);
80
81 return rx_evts_cnt;
82 }
83
84 static int
rte_port_eventdev_reader_free(void * port)85 rte_port_eventdev_reader_free(void *port)
86 {
87 if (port == NULL) {
88 RTE_LOG(ERR, PORT, "%s: port is NULL\n", __func__);
89 return -EINVAL;
90 }
91
92 rte_free(port);
93
94 return 0;
95 }
96
rte_port_eventdev_reader_stats_read(void * port,struct rte_port_in_stats * stats,int clear)97 static int rte_port_eventdev_reader_stats_read(void *port,
98 struct rte_port_in_stats *stats, int clear)
99 {
100 struct rte_port_eventdev_reader *p =
101 port;
102
103 if (stats != NULL)
104 memcpy(stats, &p->stats, sizeof(p->stats));
105
106 if (clear)
107 memset(&p->stats, 0, sizeof(p->stats));
108
109 return 0;
110 }
111
112 /*
113 * Port EVENTDEV Writer
114 */
115 #ifdef RTE_PORT_STATS_COLLECT
116
117 #define RTE_PORT_EVENTDEV_WRITER_STATS_PKTS_IN_ADD(port, val) \
118 do {port->stats.n_pkts_in += val;} while (0)
119 #define RTE_PORT_EVENTDEV_WRITER_STATS_PKTS_DROP_ADD(port, val) \
120 do {port->stats.n_pkts_drop += val;} while (0)
121
122 #else
123
124 #define RTE_PORT_EVENTDEV_WRITER_STATS_PKTS_IN_ADD(port, val)
125 #define RTE_PORT_EVENTDEV_WRITER_STATS_PKTS_DROP_ADD(port, val)
126
127 #endif
128
129 struct rte_port_eventdev_writer {
130 struct rte_port_out_stats stats;
131
132 struct rte_event ev[2 * RTE_PORT_IN_BURST_SIZE_MAX];
133
134 uint32_t enq_burst_sz;
135 uint32_t enq_buf_count;
136 uint64_t bsz_mask;
137
138 uint8_t eventdev_id;
139 uint8_t port_id;
140 uint8_t queue_id;
141 uint8_t sched_type;
142 uint8_t evt_op;
143 };
144
145 static void *
rte_port_eventdev_writer_create(void * params,int socket_id)146 rte_port_eventdev_writer_create(void *params, int socket_id)
147 {
148 struct rte_port_eventdev_writer_params *conf =
149 params;
150 struct rte_port_eventdev_writer *port;
151 unsigned int i;
152
153 /* Check input parameters */
154 if ((conf == NULL) ||
155 (conf->enq_burst_sz == 0) ||
156 (conf->enq_burst_sz > RTE_PORT_IN_BURST_SIZE_MAX) ||
157 (!rte_is_power_of_2(conf->enq_burst_sz))) {
158 RTE_LOG(ERR, PORT, "%s: Invalid input parameters\n", __func__);
159 return NULL;
160 }
161
162 /* Memory allocation */
163 port = rte_zmalloc_socket("PORT", sizeof(*port),
164 RTE_CACHE_LINE_SIZE, socket_id);
165 if (port == NULL) {
166 RTE_LOG(ERR, PORT, "%s: Failed to allocate port\n", __func__);
167 return NULL;
168 }
169
170 /* Initialization */
171 port->enq_burst_sz = conf->enq_burst_sz;
172 port->enq_buf_count = 0;
173 port->bsz_mask = 1LLU << (conf->enq_burst_sz - 1);
174
175 port->eventdev_id = conf->eventdev_id;
176 port->port_id = conf->port_id;
177 port->queue_id = conf->queue_id;
178 port->sched_type = conf->sched_type;
179 port->evt_op = conf->evt_op;
180 memset(&port->ev, 0, sizeof(port->ev));
181
182 for (i = 0; i < RTE_DIM(port->ev); i++) {
183 port->ev[i].queue_id = port->queue_id;
184 port->ev[i].sched_type = port->sched_type;
185 port->ev[i].op = port->evt_op;
186 }
187
188 return port;
189 }
190
191 static inline void
send_burst(struct rte_port_eventdev_writer * p)192 send_burst(struct rte_port_eventdev_writer *p)
193 {
194 uint32_t nb_enq;
195
196 nb_enq = rte_event_enqueue_burst(p->eventdev_id, p->port_id,
197 p->ev, p->enq_buf_count);
198
199 RTE_PORT_EVENTDEV_WRITER_STATS_PKTS_DROP_ADD(p, p->enq_buf_count -
200 nb_enq);
201
202 for (; nb_enq < p->enq_buf_count; nb_enq++)
203 rte_pktmbuf_free(p->ev[nb_enq].mbuf);
204
205 p->enq_buf_count = 0;
206 }
207
208 static int
rte_port_eventdev_writer_tx(void * port,struct rte_mbuf * pkt)209 rte_port_eventdev_writer_tx(void *port, struct rte_mbuf *pkt)
210 {
211 struct rte_port_eventdev_writer *p = port;
212
213 p->ev[p->enq_buf_count++].mbuf = pkt;
214 RTE_PORT_EVENTDEV_WRITER_STATS_PKTS_IN_ADD(p, 1);
215 if (p->enq_buf_count >= p->enq_burst_sz)
216 send_burst(p);
217
218 return 0;
219 }
220
221 static int
rte_port_eventdev_writer_tx_bulk(void * port,struct rte_mbuf ** pkts,uint64_t pkts_mask)222 rte_port_eventdev_writer_tx_bulk(void *port,
223 struct rte_mbuf **pkts,
224 uint64_t pkts_mask)
225 {
226 struct rte_port_eventdev_writer *p =
227 port;
228 uint64_t bsz_mask = p->bsz_mask;
229 uint32_t enq_buf_count = p->enq_buf_count;
230 uint64_t expr = (pkts_mask & (pkts_mask + 1)) |
231 ((pkts_mask & bsz_mask) ^ bsz_mask);
232
233 if (expr == 0) {
234 uint64_t n_pkts = __builtin_popcountll(pkts_mask);
235 uint32_t i, n_enq_ok;
236
237 if (enq_buf_count)
238 send_burst(p);
239
240 RTE_PORT_EVENTDEV_WRITER_STATS_PKTS_IN_ADD(p, n_pkts);
241
242 struct rte_event events[2 * RTE_PORT_IN_BURST_SIZE_MAX] = {};
243 for (i = 0; i < n_pkts; i++) {
244 events[i].mbuf = pkts[i];
245 events[i].queue_id = p->queue_id;
246 events[i].sched_type = p->sched_type;
247 events[i].op = p->evt_op;
248 }
249
250 n_enq_ok = rte_event_enqueue_burst(p->eventdev_id, p->port_id,
251 events, n_pkts);
252
253 RTE_PORT_EVENTDEV_WRITER_STATS_PKTS_DROP_ADD(p,
254 n_pkts - n_enq_ok);
255 for (; n_enq_ok < n_pkts; n_enq_ok++)
256 rte_pktmbuf_free(pkts[n_enq_ok]);
257
258 } else {
259 for (; pkts_mask;) {
260 uint32_t pkt_index = __builtin_ctzll(pkts_mask);
261 uint64_t pkt_mask = 1LLU << pkt_index;
262
263 p->ev[enq_buf_count++].mbuf = pkts[pkt_index];
264
265 RTE_PORT_EVENTDEV_WRITER_STATS_PKTS_IN_ADD(p, 1);
266 pkts_mask &= ~pkt_mask;
267 }
268
269 p->enq_buf_count = enq_buf_count;
270 if (enq_buf_count >= p->enq_burst_sz)
271 send_burst(p);
272 }
273
274 return 0;
275 }
276
277 static int
rte_port_eventdev_writer_flush(void * port)278 rte_port_eventdev_writer_flush(void *port)
279 {
280 struct rte_port_eventdev_writer *p =
281 port;
282
283 if (p->enq_buf_count > 0)
284 send_burst(p);
285
286 return 0;
287 }
288
289 static int
rte_port_eventdev_writer_free(void * port)290 rte_port_eventdev_writer_free(void *port)
291 {
292 if (port == NULL) {
293 RTE_LOG(ERR, PORT, "%s: Port is NULL\n", __func__);
294 return -EINVAL;
295 }
296
297 rte_port_eventdev_writer_flush(port);
298 rte_free(port);
299
300 return 0;
301 }
302
rte_port_eventdev_writer_stats_read(void * port,struct rte_port_out_stats * stats,int clear)303 static int rte_port_eventdev_writer_stats_read(void *port,
304 struct rte_port_out_stats *stats, int clear)
305 {
306 struct rte_port_eventdev_writer *p =
307 port;
308
309 if (stats != NULL)
310 memcpy(stats, &p->stats, sizeof(p->stats));
311
312 if (clear)
313 memset(&p->stats, 0, sizeof(p->stats));
314
315 return 0;
316 }
317
318 /*
319 * Port EVENTDEV Writer Nodrop
320 */
321 #ifdef RTE_PORT_STATS_COLLECT
322
323 #define RTE_PORT_EVENTDEV_WRITER_NODROP_STATS_PKTS_IN_ADD(port, val) \
324 do {port->stats.n_pkts_in += val;} while (0)
325 #define RTE_PORT_EVENTDEV_WRITER_NODROP_STATS_PKTS_DROP_ADD(port, val) \
326 do {port->stats.n_pkts_drop += val;} while (0)
327
328 #else
329
330 #define RTE_PORT_EVENTDEV_WRITER_NODROP_STATS_PKTS_IN_ADD(port, val)
331 #define RTE_PORT_EVENTDEV_WRITER_NODROP_STATS_PKTS_DROP_ADD(port, val)
332
333 #endif
334
335 struct rte_port_eventdev_writer_nodrop {
336 struct rte_port_out_stats stats;
337
338 struct rte_event ev[2 * RTE_PORT_IN_BURST_SIZE_MAX];
339
340 uint32_t enq_burst_sz;
341 uint32_t enq_buf_count;
342 uint64_t bsz_mask;
343 uint64_t n_retries;
344 uint8_t eventdev_id;
345 uint8_t port_id;
346 uint8_t queue_id;
347 uint8_t sched_type;
348 uint8_t evt_op;
349 };
350
351
352 static void *
rte_port_eventdev_writer_nodrop_create(void * params,int socket_id)353 rte_port_eventdev_writer_nodrop_create(void *params, int socket_id)
354 {
355 struct rte_port_eventdev_writer_nodrop_params *conf =
356 params;
357 struct rte_port_eventdev_writer_nodrop *port;
358 unsigned int i;
359
360 /* Check input parameters */
361 if ((conf == NULL) ||
362 (conf->enq_burst_sz == 0) ||
363 (conf->enq_burst_sz > RTE_PORT_IN_BURST_SIZE_MAX) ||
364 (!rte_is_power_of_2(conf->enq_burst_sz))) {
365 RTE_LOG(ERR, PORT, "%s: Invalid input parameters\n", __func__);
366 return NULL;
367 }
368
369 /* Memory allocation */
370 port = rte_zmalloc_socket("PORT", sizeof(*port),
371 RTE_CACHE_LINE_SIZE, socket_id);
372 if (port == NULL) {
373 RTE_LOG(ERR, PORT, "%s: Failed to allocate port\n", __func__);
374 return NULL;
375 }
376
377 /* Initialization */
378 port->enq_burst_sz = conf->enq_burst_sz;
379 port->enq_buf_count = 0;
380 port->bsz_mask = 1LLU << (conf->enq_burst_sz - 1);
381
382 port->eventdev_id = conf->eventdev_id;
383 port->port_id = conf->port_id;
384 port->queue_id = conf->queue_id;
385 port->sched_type = conf->sched_type;
386 port->evt_op = conf->evt_op;
387 memset(&port->ev, 0, sizeof(port->ev));
388
389 for (i = 0; i < RTE_DIM(port->ev); i++) {
390 port->ev[i].queue_id = port->queue_id;
391 port->ev[i].sched_type = port->sched_type;
392 port->ev[i].op = port->evt_op;
393 }
394 /*
395 * When n_retries is 0 it means that we should wait for every event to
396 * send no matter how many retries should it take. To limit number of
397 * branches in fast path, we use UINT64_MAX instead of branching.
398 */
399 port->n_retries = (conf->n_retries == 0) ? UINT64_MAX : conf->n_retries;
400
401 return port;
402 }
403
404 static inline void
send_burst_nodrop(struct rte_port_eventdev_writer_nodrop * p)405 send_burst_nodrop(struct rte_port_eventdev_writer_nodrop *p)
406 {
407 uint32_t nb_enq, i;
408
409 nb_enq = rte_event_enqueue_burst(p->eventdev_id, p->port_id,
410 p->ev, p->enq_buf_count);
411
412 /* We sent all the packets in a first try */
413 if (nb_enq >= p->enq_buf_count) {
414 p->enq_buf_count = 0;
415 return;
416 }
417
418 for (i = 0; i < p->n_retries; i++) {
419 nb_enq += rte_event_enqueue_burst(p->eventdev_id, p->port_id,
420 p->ev + nb_enq,
421 p->enq_buf_count - nb_enq);
422
423 /* We sent all the events in more than one try */
424 if (nb_enq >= p->enq_buf_count) {
425 p->enq_buf_count = 0;
426 return;
427 }
428 }
429 /* We didn't send the events in maximum allowed attempts */
430 RTE_PORT_EVENTDEV_WRITER_NODROP_STATS_PKTS_DROP_ADD(p,
431 p->enq_buf_count - nb_enq);
432 for (; nb_enq < p->enq_buf_count; nb_enq++)
433 rte_pktmbuf_free(p->ev[nb_enq].mbuf);
434
435 p->enq_buf_count = 0;
436 }
437
438 static int
rte_port_eventdev_writer_nodrop_tx(void * port,struct rte_mbuf * pkt)439 rte_port_eventdev_writer_nodrop_tx(void *port, struct rte_mbuf *pkt)
440 {
441 struct rte_port_eventdev_writer_nodrop *p = port;
442
443 p->ev[p->enq_buf_count++].mbuf = pkt;
444
445 RTE_PORT_EVENTDEV_WRITER_NODROP_STATS_PKTS_IN_ADD(p, 1);
446 if (p->enq_buf_count >= p->enq_burst_sz)
447 send_burst_nodrop(p);
448
449 return 0;
450 }
451
452 static int
rte_port_eventdev_writer_nodrop_tx_bulk(void * port,struct rte_mbuf ** pkts,uint64_t pkts_mask)453 rte_port_eventdev_writer_nodrop_tx_bulk(void *port,
454 struct rte_mbuf **pkts,
455 uint64_t pkts_mask)
456 {
457 struct rte_port_eventdev_writer_nodrop *p =
458 port;
459
460 uint64_t bsz_mask = p->bsz_mask;
461 uint32_t enq_buf_count = p->enq_buf_count;
462 uint64_t expr = (pkts_mask & (pkts_mask + 1)) |
463 ((pkts_mask & bsz_mask) ^ bsz_mask);
464
465 if (expr == 0) {
466 uint64_t n_pkts = __builtin_popcountll(pkts_mask);
467 uint32_t i, n_enq_ok;
468
469 if (enq_buf_count)
470 send_burst_nodrop(p);
471
472 RTE_PORT_EVENTDEV_WRITER_NODROP_STATS_PKTS_IN_ADD(p, n_pkts);
473
474 struct rte_event events[RTE_PORT_IN_BURST_SIZE_MAX] = {};
475
476 for (i = 0; i < n_pkts; i++) {
477 events[i].mbuf = pkts[i];
478 events[i].queue_id = p->queue_id;
479 events[i].sched_type = p->sched_type;
480 events[i].op = p->evt_op;
481 }
482
483 n_enq_ok = rte_event_enqueue_burst(p->eventdev_id, p->port_id,
484 events, n_pkts);
485
486 if (n_enq_ok >= n_pkts)
487 return 0;
488
489 /*
490 * If we did not manage to enqueue all events in single burst,
491 * move remaining events to the buffer and call send burst.
492 */
493 for (; n_enq_ok < n_pkts; n_enq_ok++) {
494 struct rte_mbuf *pkt = pkts[n_enq_ok];
495 p->ev[p->enq_buf_count++].mbuf = pkt;
496 }
497 send_burst_nodrop(p);
498 } else {
499 for (; pkts_mask;) {
500 uint32_t pkt_index = __builtin_ctzll(pkts_mask);
501 uint64_t pkt_mask = 1LLU << pkt_index;
502
503 p->ev[enq_buf_count++].mbuf = pkts[pkt_index];
504
505 RTE_PORT_EVENTDEV_WRITER_STATS_PKTS_IN_ADD(p, 1);
506 pkts_mask &= ~pkt_mask;
507 }
508
509 p->enq_buf_count = enq_buf_count;
510 if (enq_buf_count >= p->enq_burst_sz)
511 send_burst_nodrop(p);
512 }
513
514 return 0;
515 }
516
517 static int
rte_port_eventdev_writer_nodrop_flush(void * port)518 rte_port_eventdev_writer_nodrop_flush(void *port)
519 {
520 struct rte_port_eventdev_writer_nodrop *p =
521 port;
522
523 if (p->enq_buf_count > 0)
524 send_burst_nodrop(p);
525
526 return 0;
527 }
528
529 static int
rte_port_eventdev_writer_nodrop_free(void * port)530 rte_port_eventdev_writer_nodrop_free(void *port)
531 {
532 if (port == NULL) {
533 RTE_LOG(ERR, PORT, "%s: Port is NULL\n", __func__);
534 return -EINVAL;
535 }
536
537 rte_port_eventdev_writer_nodrop_flush(port);
538 rte_free(port);
539
540 return 0;
541 }
542
rte_port_eventdev_writer_nodrop_stats_read(void * port,struct rte_port_out_stats * stats,int clear)543 static int rte_port_eventdev_writer_nodrop_stats_read(void *port,
544 struct rte_port_out_stats *stats, int clear)
545 {
546 struct rte_port_eventdev_writer_nodrop *p =
547 port;
548
549 if (stats != NULL)
550 memcpy(stats, &p->stats, sizeof(p->stats));
551
552 if (clear)
553 memset(&p->stats, 0, sizeof(p->stats));
554
555 return 0;
556 }
557
558 /*
559 * Summary of port operations
560 */
561 struct rte_port_in_ops rte_port_eventdev_reader_ops = {
562 .f_create = rte_port_eventdev_reader_create,
563 .f_free = rte_port_eventdev_reader_free,
564 .f_rx = rte_port_eventdev_reader_rx,
565 .f_stats = rte_port_eventdev_reader_stats_read,
566 };
567
568 struct rte_port_out_ops rte_port_eventdev_writer_ops = {
569 .f_create = rte_port_eventdev_writer_create,
570 .f_free = rte_port_eventdev_writer_free,
571 .f_tx = rte_port_eventdev_writer_tx,
572 .f_tx_bulk = rte_port_eventdev_writer_tx_bulk,
573 .f_flush = rte_port_eventdev_writer_flush,
574 .f_stats = rte_port_eventdev_writer_stats_read,
575 };
576
577 struct rte_port_out_ops rte_port_eventdev_writer_nodrop_ops = {
578 .f_create = rte_port_eventdev_writer_nodrop_create,
579 .f_free = rte_port_eventdev_writer_nodrop_free,
580 .f_tx = rte_port_eventdev_writer_nodrop_tx,
581 .f_tx_bulk = rte_port_eventdev_writer_nodrop_tx_bulk,
582 .f_flush = rte_port_eventdev_writer_nodrop_flush,
583 .f_stats = rte_port_eventdev_writer_nodrop_stats_read,
584 };
585