/*
 * Copyright (C) 2017 THL A29 Limited, a Tencent company.
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions are met:
 *
 * 1. Redistributions of source code must retain the above copyright notice, this
 *   list of conditions and the following disclaimer.
 * 2. Redistributions in binary form must reproduce the above copyright notice,
 *   this list of conditions and the following disclaimer in the documentation
 *   and/or other materials provided with the distribution.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
 * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
 * DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR
 * ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
 * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
 * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
 * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
 * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 *
 */

#include <ctype.h>
#include <errno.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <strings.h>

#include <rte_common.h>
#include <rte_byteorder.h>
#include <rte_log.h>
#include <rte_memory.h>
#include <rte_memcpy.h>
#include <rte_memzone.h>
#include <rte_config.h>
#include <rte_eal.h>
#include <rte_pci.h>
#include <rte_mbuf.h>
#include <rte_lcore.h>
#include <rte_launch.h>
#include <rte_ethdev.h>
#include <rte_debug.h>
#include <rte_ether.h>
#include <rte_malloc.h>
#include <rte_cycles.h>
#include <rte_timer.h>
#include <rte_thash.h>
#include <rte_ip.h>
#include <rte_tcp.h>
#include <rte_udp.h>

#include "ff_dpdk_if.h"
#include "ff_dpdk_pcap.h"
#include "ff_dpdk_kni.h"
#include "ff_config.h"
#include "ff_veth.h"
#include "ff_host_interface.h"
#include "ff_msg.h"
#include "ff_api.h"

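/*
 * The tunables below size the DPDK data structures.  As a rough worked
 * example of how they interact: with MAX_PKT_BURST = 32 and
 * BURST_TX_DRAIN_US = 100, each per-port TX buffer holds at most 32
 * packets and is flushed at least every ~100 us by the drain logic in
 * main_loop().
 */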
#define MEMPOOL_CACHE_SIZE 256

#define ARP_RING_SIZE 2048

#define MSG_RING_SIZE 32

/*
 * Configurable number of RX/TX ring descriptors
 */
#define RX_QUEUE_SIZE 512
#define TX_QUEUE_SIZE 256

#define MAX_PKT_BURST 32
#define BURST_TX_DRAIN_US 100 /* TX drain every ~100us */

/*
 * Try to avoid TX buffering if we have at least MAX_TX_BURST packets to send.
 */
#define MAX_TX_BURST    (MAX_PKT_BURST / 2)

#define NB_SOCKETS 8

/* Configure how many packets ahead to prefetch, when reading packets */
#define PREFETCH_OFFSET    3

#define MAX_RX_QUEUE_PER_LCORE 16
#define MAX_TX_QUEUE_PER_PORT RTE_MAX_ETHPORTS
#define MAX_RX_QUEUE_PER_PORT 128

#define BITS_PER_HEX 4

static int enable_kni;
static int kni_accept;

static struct rte_timer freebsd_clock;

/* Default 40-byte RSS hash key, as used by the Mellanox Linux driver. */
static uint8_t default_rsskey_40bytes[40] = {
    0xd1, 0x81, 0xc6, 0x2c, 0xf7, 0xf4, 0xdb, 0x5b,
    0x19, 0x83, 0xa2, 0xfc, 0x94, 0x3e, 0x1a, 0xdb,
    0xd9, 0x38, 0x9e, 0x6b, 0xd1, 0x03, 0x9c, 0x2c,
    0xa7, 0x44, 0x99, 0xad, 0x59, 0x3d, 0x56, 0xd9,
    0xf3, 0x25, 0x3c, 0x06, 0x2a, 0xdc, 0x1f, 0xfc
};

static struct rte_eth_conf default_port_conf = {
    .rxmode = {
        .mq_mode = ETH_MQ_RX_RSS,
        .max_rx_pkt_len = ETHER_MAX_LEN,
        .split_hdr_size = 0, /**< hdr buf size */
        .header_split   = 0, /**< Header Split disabled */
        .hw_ip_checksum = 0, /**< IP checksum offload disabled */
        .hw_vlan_filter = 0, /**< VLAN filtering disabled */
        .hw_vlan_strip  = 0, /**< VLAN strip disabled. */
        .hw_vlan_extend = 0, /**< Extended VLAN disabled. */
        .jumbo_frame    = 0, /**< Jumbo Frame Support disabled */
        .hw_strip_crc   = 0, /**< CRC stripping by hardware disabled */
        .enable_lro     = 0, /**< LRO disabled */
    },
    .rx_adv_conf = {
        .rss_conf = {
            .rss_key = default_rsskey_40bytes,
            .rss_key_len = 40,
            .rss_hf = ETH_RSS_PROTO_MASK,
        },
    },
    .txmode = {
        .mq_mode = ETH_MQ_TX_NONE,
    },
};


struct mbuf_table {
    uint16_t len;
    struct rte_mbuf *m_table[MAX_PKT_BURST];
};

struct lcore_rx_queue {
    uint8_t port_id;
    uint8_t queue_id;
} __rte_cache_aligned;

struct lcore_conf {
    uint16_t proc_id;
    uint16_t nb_procs;
    uint16_t socket_id;
    uint16_t nb_rx_queue;
    uint16_t *lcore_proc;
    struct lcore_rx_queue rx_queue_list[MAX_RX_QUEUE_PER_LCORE];
    uint16_t tx_queue_id[RTE_MAX_ETHPORTS];
    struct mbuf_table tx_mbufs[RTE_MAX_ETHPORTS];
    char *pcap[RTE_MAX_ETHPORTS];
} __rte_cache_aligned;

static struct lcore_conf lcore_conf;

static struct rte_mempool *pktmbuf_pool[NB_SOCKETS];

static struct rte_ring **arp_ring[RTE_MAX_LCORE];

struct ff_msg_ring {
    char ring_name[2][RTE_RING_NAMESIZE];
    /* ring[0]: messages received by the lcore, sent by other processes */
    /* ring[1]: messages sent by the lcore, read by other processes */
    struct rte_ring *ring[2];
} __rte_cache_aligned;

static struct ff_msg_ring msg_ring[RTE_MAX_LCORE];
static struct rte_mempool *message_pool;

struct ff_dpdk_if_context {
    void *sc;
    void *ifp;
    uint16_t port_id;
    struct ff_hw_features hw_features;
} __rte_cache_aligned;

static struct ff_dpdk_if_context *veth_ctx[RTE_MAX_ETHPORTS];

extern void ff_hardclock(void);

static void
freebsd_hardclock_job(__rte_unused struct rte_timer *timer,
    __rte_unused void *arg) {
    ff_hardclock();
}

struct ff_dpdk_if_context *
ff_dpdk_register_if(void *sc, void *ifp, struct ff_port_cfg *cfg)
{
    struct ff_dpdk_if_context *ctx;

    ctx = calloc(1, sizeof(struct ff_dpdk_if_context));
    if (ctx == NULL)
        return NULL;

    ctx->sc = sc;
    ctx->ifp = ifp;
    ctx->port_id = cfg->port_id;
    ctx->hw_features = cfg->hw_features;

    return ctx;
}

void
ff_dpdk_deregister_if(struct ff_dpdk_if_context *ctx)
{
    free(ctx);
}

static void
check_all_ports_link_status(void)
{
    #define CHECK_INTERVAL 100 /* 100ms */
    #define MAX_CHECK_TIME 90  /* 9s (90 * 100ms) in total */

    uint8_t count, all_ports_up, print_flag = 0;
    struct rte_eth_link link;

    printf("\nChecking link status");
    fflush(stdout);

    int i, nb_ports;
    nb_ports = ff_global_cfg.dpdk.nb_ports;
    for (count = 0; count <= MAX_CHECK_TIME; count++) {
        all_ports_up = 1;
        for (i = 0; i < nb_ports; i++) {
            uint8_t portid = ff_global_cfg.dpdk.port_cfgs[i].port_id;
            memset(&link, 0, sizeof(link));
            rte_eth_link_get_nowait(portid, &link);

            /* print link status if flag set */
            if (print_flag == 1) {
                if (link.link_status) {
                    printf("Port %d Link Up - speed %u "
                        "Mbps - %s\n", (int)portid,
                        (unsigned)link.link_speed,
                        (link.link_duplex == ETH_LINK_FULL_DUPLEX) ?
                        ("full-duplex") : ("half-duplex"));
                } else {
                    printf("Port %d Link Down\n", (int)portid);
                }
                continue;
            }
            /* clear all_ports_up flag if any link down */
            if (link.link_status == 0) {
                all_ports_up = 0;
                break;
            }
        }

        /* after finally printing all link status, get out */
        if (print_flag == 1)
            break;

        if (all_ports_up == 0) {
            printf(".");
            fflush(stdout);
            rte_delay_ms(CHECK_INTERVAL);
        }

        /* set the print_flag if all ports up or timeout */
        if (all_ports_up == 1 || count == (MAX_CHECK_TIME - 1)) {
            print_flag = 1;
            printf("done\n");
        }
    }
}

static int
xdigit2val(unsigned char c)
{
    int val;

    if (isdigit(c))
        val = c - '0';
    else if (isupper(c))
        val = c - 'A' + 10;
    else
        val = c - 'a' + 10;
    return val;
}

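/*
 * Parse a hex core mask (e.g. "0x6") into an array of lcore ids,
 * scanning from the least-significant hex digit upward.  Worked
 * example, assuming lcores 1 and 2 were detected by the EAL:
 *
 *   uint16_t procs[2];
 *   parse_lcore_mask("0x6", procs, 2);   // 0x6 == 0b110
 *   // procs[0] == 1, procs[1] == 2; returns 0 on success, -1 on error
 */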
static int
parse_lcore_mask(const char *coremask, uint16_t *lcore_proc,
    uint16_t nb_procs)
{
    int i, j, idx = 0;
    unsigned count = 0;
    char c;
    int val;

    if (coremask == NULL)
        return -1;

    /* Skip leading and trailing blanks, and a leading 0x/0X if present. */
    while (isblank(*coremask))
        coremask++;
    if (coremask[0] == '0' && ((coremask[1] == 'x')
        || (coremask[1] == 'X')))
        coremask += 2;

    i = strlen(coremask);
    while ((i > 0) && isblank(coremask[i - 1]))
        i--;

    if (i == 0)
        return -1;

    for (i = i - 1; i >= 0 && idx < RTE_MAX_LCORE && count < nb_procs; i--) {
        c = coremask[i];
        if (isxdigit(c) == 0) {
            return -1;
        }
        val = xdigit2val(c);
        for (j = 0; j < BITS_PER_HEX && idx < RTE_MAX_LCORE && count < nb_procs;
            j++, idx++) {
            if ((1 << j) & val) {
                if (!lcore_config[idx].detected) {
                    RTE_LOG(ERR, EAL, "lcore %d unavailable\n", idx);
                    return -1;
                }
                lcore_proc[count] = idx;
                count++;
            }
        }
    }

    for (; i >= 0; i--)
        if (coremask[i] != '0')
            return -1;

    if (count < nb_procs)
        return -1;

    return 0;
}

static int
init_lcore_conf(void)
{
    uint8_t nb_ports = rte_eth_dev_count();
    if (nb_ports == 0) {
        rte_exit(EXIT_FAILURE, "No probed ethernet devices\n");
    }

    lcore_conf.proc_id = ff_global_cfg.dpdk.proc_id;
    lcore_conf.nb_procs = ff_global_cfg.dpdk.nb_procs;
    lcore_conf.lcore_proc = rte_zmalloc(NULL,
        sizeof(uint16_t) * lcore_conf.nb_procs, 0);
    if (lcore_conf.lcore_proc == NULL) {
        rte_exit(EXIT_FAILURE, "rte_zmalloc lcore_proc failed\n");
    }

    int ret = parse_lcore_mask(ff_global_cfg.dpdk.lcore_mask,
        lcore_conf.lcore_proc, lcore_conf.nb_procs);
    if (ret < 0) {
        rte_exit(EXIT_FAILURE, "parse_lcore_mask failed: %s\n",
            ff_global_cfg.dpdk.lcore_mask);
    }

    uint16_t socket_id = 0;
    if (ff_global_cfg.dpdk.numa_on) {
        socket_id = rte_lcore_to_socket_id(rte_lcore_id());
    }

    lcore_conf.socket_id = socket_id;

    /* Currently, proc ids map 1:1 to the rx/tx queue ids of each port. */
    uint8_t port_id, enabled_ports = 0;
    for (port_id = 0; port_id < nb_ports; port_id++) {
        if (ff_global_cfg.dpdk.port_mask &&
            (ff_global_cfg.dpdk.port_mask & (1 << port_id)) == 0) {
            printf("\nSkipping disabled port %d\n", port_id);
            continue;
        }

        if (port_id >= ff_global_cfg.dpdk.nb_ports) {
            printf("\nSkipping non-configured port %d\n", port_id);
            break;
        }

        uint16_t nb_rx_queue = lcore_conf.nb_rx_queue;
        lcore_conf.rx_queue_list[nb_rx_queue].port_id = port_id;
        lcore_conf.rx_queue_list[nb_rx_queue].queue_id = lcore_conf.proc_id;
        lcore_conf.nb_rx_queue++;

        lcore_conf.tx_queue_id[port_id] = lcore_conf.proc_id;
        lcore_conf.pcap[port_id] = ff_global_cfg.dpdk.port_cfgs[enabled_ports].pcap;

        ff_global_cfg.dpdk.port_cfgs[enabled_ports].port_id = port_id;

        enabled_ports++;
    }

    ff_global_cfg.dpdk.nb_ports = enabled_ports;

    return 0;
}

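/*
 * Size the mbuf pool so that, in the worst case, every RX descriptor,
 * every in-flight TX descriptor, every per-lcore burst buffer and every
 * per-lcore mempool cache can hold an mbuf at the same time:
 *
 *   nb_mbuf = nb_rx_queue * RX_QUEUE_SIZE
 *           + nb_ports * nb_lcores * MAX_PKT_BURST
 *           + nb_ports * nb_tx_queue * TX_QUEUE_SIZE
 *           + nb_lcores * MEMPOOL_CACHE_SIZE
 *
 * clamped to a minimum of 8192 mbufs.
 */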
static int
init_mem_pool(void)
{
    uint8_t nb_ports = ff_global_cfg.dpdk.nb_ports;
    uint32_t nb_lcores = ff_global_cfg.dpdk.nb_procs;
    uint32_t nb_tx_queue = nb_lcores;
    uint32_t nb_rx_queue = lcore_conf.nb_rx_queue * nb_lcores;

    unsigned nb_mbuf = RTE_MAX(
        (nb_rx_queue * RX_QUEUE_SIZE            +
        nb_ports * nb_lcores * MAX_PKT_BURST    +
        nb_ports * nb_tx_queue * TX_QUEUE_SIZE  +
        nb_lcores * MEMPOOL_CACHE_SIZE),
        (unsigned)8192);

    unsigned socketid = 0;
    uint16_t i, lcore_id;
    char s[64];
    int numa_on = ff_global_cfg.dpdk.numa_on;

    for (i = 0; i < lcore_conf.nb_procs; i++) {
        lcore_id = lcore_conf.lcore_proc[i];
        if (numa_on) {
            socketid = rte_lcore_to_socket_id(lcore_id);
        }

        if (socketid >= NB_SOCKETS) {
            rte_exit(EXIT_FAILURE, "Socket %u of lcore %u is out of range %d\n",
                socketid, (unsigned)lcore_id, NB_SOCKETS);
        }

        if (pktmbuf_pool[socketid] != NULL) {
            continue;
        }

        if (rte_eal_process_type() == RTE_PROC_PRIMARY) {
            snprintf(s, sizeof(s), "mbuf_pool_%d", socketid);
            pktmbuf_pool[socketid] =
                rte_pktmbuf_pool_create(s, nb_mbuf,
                    MEMPOOL_CACHE_SIZE, 0,
                    RTE_MBUF_DEFAULT_BUF_SIZE, socketid);
        } else {
            snprintf(s, sizeof(s), "mbuf_pool_%d", socketid);
            pktmbuf_pool[socketid] = rte_mempool_lookup(s);
        }

        if (pktmbuf_pool[socketid] == NULL) {
            rte_exit(EXIT_FAILURE, "Cannot create mbuf pool on socket %d\n", socketid);
        } else {
            printf("create mbuf pool on socket %d\n", socketid);
        }
    }

    return 0;
}

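/*
 * Create-or-attach helper: in a DPDK multi-process setup only the
 * primary process may create shared objects; secondary processes look
 * them up by name in the shared memory the primary already set up.
 */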
static struct rte_ring *
create_ring(const char *name, unsigned count, int socket_id, unsigned flags)
{
    struct rte_ring *ring;

    if (name == NULL)
        return NULL;

    /* If the ring already exists, just attach to it. */
    if (likely((ring = rte_ring_lookup(name)) != NULL))
        return ring;

    if (rte_eal_process_type() == RTE_PROC_PRIMARY) {
        return rte_ring_create(name, count, socket_id, flags);
    } else {
        return rte_ring_lookup(name);
    }
}

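/*
 * ARP replies must be seen by every process, but RSS delivers each
 * packet to exactly one RX queue.  To compensate, a rte_ring is created
 * per (process, port) pair: whichever process receives an ARP packet
 * clones it into the other processes' rings (see process_packets()),
 * and each process drains its own ring in process_arp_ring().
 */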
static int
init_arp_ring(void)
{
    int i, j;
    char name_buf[RTE_RING_NAMESIZE];
    int nb_procs = ff_global_cfg.dpdk.nb_procs;
    int proc_id = ff_global_cfg.dpdk.proc_id;

    /* Allocate per-process arrays of ring pointers, one slot per probed eth dev. */
    int nb_ports = rte_eth_dev_count();
    for (i = 0; i < nb_procs; ++i) {
        snprintf(name_buf, RTE_RING_NAMESIZE, "ring_ptr_%d_%d",
            proc_id, i);

        arp_ring[i] = rte_zmalloc(name_buf,
            sizeof(struct rte_ring *) * nb_ports,
            RTE_CACHE_LINE_SIZE);
        if (arp_ring[i] == NULL) {
            rte_exit(EXIT_FAILURE, "rte_zmalloc(%s (struct rte_ring*)) "
                "failed\n", name_buf);
        }
    }

    unsigned socketid = lcore_conf.socket_id;

    /* Create rings only for the ports actually in use. */
    nb_ports = ff_global_cfg.dpdk.nb_ports;
    for (j = 0; j < nb_ports; j++) {
        uint8_t port_id = ff_global_cfg.dpdk.port_cfgs[j].port_id;

        for (i = 0; i < nb_procs; ++i) {
            snprintf(name_buf, RTE_RING_NAMESIZE, "arp_ring_%d_%d", i, port_id);
            arp_ring[i][port_id] = create_ring(name_buf, ARP_RING_SIZE,
                socketid, RING_F_SC_DEQ);

            if (arp_ring[i][port_id] == NULL)
                rte_panic("create ring:%s failed!\n", name_buf);

            printf("create ring:%s success, %u ring entries are now free!\n",
                name_buf, rte_ring_free_count(arp_ring[i][port_id]));
        }
    }

    return 0;
}

static void
ff_msg_init(struct rte_mempool *mp,
    __attribute__((unused)) void *opaque_arg,
    void *obj, __attribute__((unused)) unsigned i)
{
    struct ff_msg *msg = (struct ff_msg *)obj;
    msg->msg_type = FF_UNKNOWN;
    msg->buf_addr = (char *)msg + sizeof(struct ff_msg);
    msg->buf_len = mp->elt_size - sizeof(struct ff_msg);
}

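/*
 * Each proc gets a pair of message rings shared with other processes
 * (e.g. the f-stack sysctl/ioctl helper tools): ring[0] carries
 * requests in, ring[1] carries the results back.  The ff_msg buffers
 * themselves come from a common mempool, so both sides exchange
 * pointers rather than copies.
 */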
static int
init_msg_ring(void)
{
    uint16_t i;
    uint16_t nb_procs = ff_global_cfg.dpdk.nb_procs;
    unsigned socketid = lcore_conf.socket_id;

    /* Create message buffer pool */
    if (rte_eal_process_type() == RTE_PROC_PRIMARY) {
        message_pool = rte_mempool_create(FF_MSG_POOL,
            MSG_RING_SIZE * 2 * nb_procs,
            MAX_MSG_BUF_SIZE, MSG_RING_SIZE / 2, 0,
            NULL, NULL, ff_msg_init, NULL,
            socketid, 0);
    } else {
        message_pool = rte_mempool_lookup(FF_MSG_POOL);
    }

    if (message_pool == NULL) {
        rte_panic("Create msg mempool failed\n");
    }

    for (i = 0; i < nb_procs; ++i) {
        snprintf(msg_ring[i].ring_name[0], RTE_RING_NAMESIZE,
            "%s%u", FF_MSG_RING_IN, i);
        snprintf(msg_ring[i].ring_name[1], RTE_RING_NAMESIZE,
            "%s%u", FF_MSG_RING_OUT, i);

        msg_ring[i].ring[0] = create_ring(msg_ring[i].ring_name[0],
            MSG_RING_SIZE, socketid, RING_F_SP_ENQ | RING_F_SC_DEQ);
        if (msg_ring[i].ring[0] == NULL)
            rte_panic("create ring:%s failed!\n", msg_ring[i].ring_name[0]);

        msg_ring[i].ring[1] = create_ring(msg_ring[i].ring_name[1],
            MSG_RING_SIZE, socketid, RING_F_SP_ENQ | RING_F_SC_DEQ);
        if (msg_ring[i].ring[1] == NULL)
            rte_panic("create ring:%s failed!\n", msg_ring[i].ring_name[1]);
    }

    return 0;
}

static int
init_kni(void)
{
    int nb_ports = rte_eth_dev_count();
    kni_accept = 0;
    if (strcasecmp(ff_global_cfg.kni.method, "accept") == 0)
        kni_accept = 1;

    ff_kni_init(nb_ports, ff_global_cfg.kni.tcp_port,
        ff_global_cfg.kni.udp_port);

    unsigned socket_id = lcore_conf.socket_id;
    struct rte_mempool *mbuf_pool = pktmbuf_pool[socket_id];

    nb_ports = ff_global_cfg.dpdk.nb_ports;
    int i;
    for (i = 0; i < nb_ports; i++) {
        uint8_t port_id = ff_global_cfg.dpdk.port_cfgs[i].port_id;
        ff_kni_alloc(port_id, socket_id, mbuf_pool);
    }

    return 0;
}

static int
init_port_start(void)
{
    int nb_ports = ff_global_cfg.dpdk.nb_ports;
    uint16_t nb_procs = ff_global_cfg.dpdk.nb_procs;
    unsigned socketid = rte_lcore_to_socket_id(rte_lcore_id());
    struct rte_mempool *mbuf_pool = pktmbuf_pool[socketid];
    uint16_t i;

    for (i = 0; i < nb_ports; i++) {
        uint8_t port_id = ff_global_cfg.dpdk.port_cfgs[i].port_id;

        struct rte_eth_dev_info dev_info;
        rte_eth_dev_info_get(port_id, &dev_info);

        if (nb_procs > dev_info.max_rx_queues) {
            rte_exit(EXIT_FAILURE, "num_procs[%d] bigger than max_rx_queues[%d]\n",
                nb_procs,
                dev_info.max_rx_queues);
        }

        if (nb_procs > dev_info.max_tx_queues) {
            rte_exit(EXIT_FAILURE, "num_procs[%d] bigger than max_tx_queues[%d]\n",
                nb_procs,
                dev_info.max_tx_queues);
        }

        struct ether_addr addr;
        rte_eth_macaddr_get(port_id, &addr);
        printf("Port %u MAC: %02" PRIx8 " %02" PRIx8 " %02" PRIx8
                   " %02" PRIx8 " %02" PRIx8 " %02" PRIx8 "\n",
                (unsigned)port_id,
                addr.addr_bytes[0], addr.addr_bytes[1],
                addr.addr_bytes[2], addr.addr_bytes[3],
                addr.addr_bytes[4], addr.addr_bytes[5]);

        rte_memcpy(ff_global_cfg.dpdk.port_cfgs[i].mac,
            addr.addr_bytes, ETHER_ADDR_LEN);

        /* Clear txq_flags - we do not need multi-mempool and refcnt */
        dev_info.default_txconf.txq_flags = ETH_TXQ_FLAGS_NOMULTMEMP |
            ETH_TXQ_FLAGS_NOREFCOUNT;

        /* Disable features that are not supported by port's HW */
        if (!(dev_info.tx_offload_capa & DEV_TX_OFFLOAD_UDP_CKSUM)) {
            dev_info.default_txconf.txq_flags |= ETH_TXQ_FLAGS_NOXSUMUDP;
        }

        if (!(dev_info.tx_offload_capa & DEV_TX_OFFLOAD_TCP_CKSUM)) {
            dev_info.default_txconf.txq_flags |= ETH_TXQ_FLAGS_NOXSUMTCP;
        }

        if (!(dev_info.tx_offload_capa & DEV_TX_OFFLOAD_SCTP_CKSUM)) {
            dev_info.default_txconf.txq_flags |= ETH_TXQ_FLAGS_NOXSUMSCTP;
        }

        if (!(dev_info.tx_offload_capa & DEV_TX_OFFLOAD_VLAN_INSERT)) {
            dev_info.default_txconf.txq_flags |= ETH_TXQ_FLAGS_NOVLANOFFL;
        }

        if (!(dev_info.tx_offload_capa & DEV_TX_OFFLOAD_TCP_TSO) &&
            !(dev_info.tx_offload_capa & DEV_TX_OFFLOAD_UDP_TSO)) {
            dev_info.default_txconf.txq_flags |= ETH_TXQ_FLAGS_NOMULTSEGS;
        }

        struct rte_eth_conf port_conf = {0};

        /* Set RSS mode */
        port_conf.rxmode.mq_mode = ETH_MQ_RX_RSS;
        port_conf.rx_adv_conf.rss_conf.rss_hf = ETH_RSS_PROTO_MASK;
        port_conf.rx_adv_conf.rss_conf.rss_key = default_rsskey_40bytes;
        port_conf.rx_adv_conf.rss_conf.rss_key_len = 40;

        /* Set Rx VLAN stripping */
        if (dev_info.rx_offload_capa & DEV_RX_OFFLOAD_VLAN_STRIP) {
            port_conf.rxmode.hw_vlan_strip = 1;
        }

        /* Enable HW CRC stripping */
        port_conf.rxmode.hw_strip_crc = 1;

        /* FIXME: Enable TCP LRO? */
        #if 0
        if (dev_info.rx_offload_capa & DEV_RX_OFFLOAD_TCP_LRO) {
            printf("LRO is supported\n");
            port_conf.rxmode.enable_lro = 1;
            ff_global_cfg.dpdk.port_cfgs[i].hw_features.rx_lro = 1;
        }
        #endif

        /* Set Rx checksum checking */
        if ((dev_info.rx_offload_capa & DEV_RX_OFFLOAD_IPV4_CKSUM) &&
            (dev_info.rx_offload_capa & DEV_RX_OFFLOAD_UDP_CKSUM) &&
            (dev_info.rx_offload_capa & DEV_RX_OFFLOAD_TCP_CKSUM)) {
            printf("RX checksum offload supported\n");
            port_conf.rxmode.hw_ip_checksum = 1;
            ff_global_cfg.dpdk.port_cfgs[i].hw_features.rx_csum = 1;
        }

        if ((dev_info.tx_offload_capa & DEV_TX_OFFLOAD_IPV4_CKSUM)) {
            printf("TX ip checksum offload supported\n");
            ff_global_cfg.dpdk.port_cfgs[i].hw_features.tx_csum_ip = 1;
        }

        if ((dev_info.tx_offload_capa & DEV_TX_OFFLOAD_UDP_CKSUM) &&
            (dev_info.tx_offload_capa & DEV_TX_OFFLOAD_TCP_CKSUM)) {
            printf("TX TCP&UDP checksum offload supported\n");
            ff_global_cfg.dpdk.port_cfgs[i].hw_features.tx_csum_l4 = 1;
        }

        if (ff_global_cfg.dpdk.tso) {
            if (dev_info.tx_offload_capa & DEV_TX_OFFLOAD_TCP_TSO) {
                printf("TSO is supported\n");
                ff_global_cfg.dpdk.port_cfgs[i].hw_features.tx_tso = 1;
            }
        } else {
            printf("TSO is disabled\n");
        }

        /* Secondary processes attach to ports the primary already configured. */
        if (rte_eal_process_type() != RTE_PROC_PRIMARY) {
            return 0;
        }

        /* Currently, proc ids map 1:1 to queue ids per port. */
        int ret = rte_eth_dev_configure(port_id, nb_procs, nb_procs, &port_conf);
        if (ret != 0) {
            return ret;
        }

        uint16_t q;
        for (q = 0; q < nb_procs; q++) {
            ret = rte_eth_tx_queue_setup(port_id, q, TX_QUEUE_SIZE,
                socketid, &dev_info.default_txconf);
            if (ret < 0) {
                return ret;
            }

            ret = rte_eth_rx_queue_setup(port_id, q, RX_QUEUE_SIZE,
                socketid, &dev_info.default_rxconf, mbuf_pool);
            if (ret < 0) {
                return ret;
            }
        }

        ret = rte_eth_dev_start(port_id);
        if (ret < 0) {
            return ret;
        }

        /* Enable RX in promiscuous mode for the Ethernet device. */
        if (ff_global_cfg.dpdk.promiscuous) {
            rte_eth_promiscuous_enable(port_id);
            ret = rte_eth_promiscuous_get(port_id);
            if (ret == 1) {
                printf("set port %u to promiscuous mode ok\n", port_id);
            } else {
                printf("set port %u to promiscuous mode error\n", port_id);
            }
        }

        /* Enable pcap dump */
        if (ff_global_cfg.dpdk.port_cfgs[i].pcap) {
            ff_enable_pcap(ff_global_cfg.dpdk.port_cfgs[i].pcap);
        }
    }

    return 0;
}

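/*
 * Drive the FreeBSD stack's hardclock from a DPDK periodic timer.
 * Worked example, assuming rte_get_timer_hz() == 2.5 GHz and
 * freebsd.hz == 100: the period is MS_PER_S / 100 = 10 ms, i.e.
 * roughly 25,000,000 TSC cycles between ff_hardclock() calls.
 */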
static int
init_freebsd_clock(void)
{
    rte_timer_subsystem_init();
    uint64_t hz = rte_get_timer_hz();
    uint64_t intrs = MS_PER_S / ff_global_cfg.freebsd.hz;
    uint64_t tsc = (hz + MS_PER_S - 1) / MS_PER_S * intrs;

    rte_timer_init(&freebsd_clock);
    rte_timer_reset(&freebsd_clock, tsc, PERIODICAL,
        rte_lcore_id(), &freebsd_hardclock_job, NULL);

    return 0;
}

int
ff_dpdk_init(int argc, char **argv)
{
    if (ff_global_cfg.dpdk.nb_procs < 1 ||
        ff_global_cfg.dpdk.nb_procs > RTE_MAX_LCORE ||
        ff_global_cfg.dpdk.proc_id >= ff_global_cfg.dpdk.nb_procs ||
        ff_global_cfg.dpdk.proc_id < 0) {
        printf("param num_procs[%d] or proc_id[%d] error!\n",
            ff_global_cfg.dpdk.nb_procs,
            ff_global_cfg.dpdk.proc_id);
        exit(1);
    }

    int ret = rte_eal_init(argc, argv);
    if (ret < 0) {
        rte_exit(EXIT_FAILURE, "Error with EAL initialization\n");
    }

    init_lcore_conf();

    init_mem_pool();

    init_arp_ring();

    init_msg_ring();

    enable_kni = ff_global_cfg.kni.enable;
    if (enable_kni) {
        init_kni();
    }

    ret = init_port_start();
    if (ret < 0) {
        rte_exit(EXIT_FAILURE, "init_port_start failed\n");
    }

    check_all_ports_link_status();

    init_freebsd_clock();

    return 0;
}

static void
ff_veth_input(const struct ff_dpdk_if_context *ctx, struct rte_mbuf *pkt)
{
    uint8_t rx_csum = ctx->hw_features.rx_csum;
    if (rx_csum) {
        if (pkt->ol_flags & (PKT_RX_IP_CKSUM_BAD | PKT_RX_L4_CKSUM_BAD)) {
            return;
        }
    }

    /*
     * FIXME: should we save pkt->vlan_tci
     * if (pkt->ol_flags & PKT_RX_VLAN_PKT)
     */

    void *data = rte_pktmbuf_mtod(pkt, void *);
    uint16_t len = rte_pktmbuf_data_len(pkt);

    void *hdr = ff_mbuf_gethdr(pkt, pkt->pkt_len, data, len, rx_csum);
    if (hdr == NULL) {
        rte_pktmbuf_free(pkt);
        return;
    }

    pkt = pkt->next;
    void *prev = hdr;
    while (pkt != NULL) {
        data = rte_pktmbuf_mtod(pkt, void *);
        len = rte_pktmbuf_data_len(pkt);

        void *mb = ff_mbuf_get(prev, data, len);
        if (mb == NULL) {
            ff_mbuf_free(hdr);
            return;
        }
        pkt = pkt->next;
        prev = mb;
    }

    ff_veth_process_packet(ctx->ifp, hdr);
}

static enum FilterReturn
protocol_filter(const void *data, uint16_t len)
{
    if (len < sizeof(struct ether_hdr))
        return FILTER_UNKNOWN;

    const struct ether_hdr *hdr;
    hdr = (const struct ether_hdr *)data;

    if (ntohs(hdr->ether_type) == ETHER_TYPE_ARP)
        return FILTER_ARP;

    if (!enable_kni) {
        return FILTER_UNKNOWN;
    }

    if (ntohs(hdr->ether_type) != ETHER_TYPE_IPv4)
        return FILTER_UNKNOWN;

    return ff_kni_proto_filter((const char *)data + sizeof(struct ether_hdr),
        len - sizeof(struct ether_hdr));
}

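/*
 * Dispatch a burst of received packets.  ARP packets are cloned to the
 * other processes' ARP rings (and optionally to KNI) before being fed
 * into the local stack; everything else goes either to KNI or to
 * ff_veth_input(), depending on the configured KNI filter policy.
 * pkts_from_ring distinguishes packets dequeued from an ARP ring
 * (which must not be re-broadcast) from packets fresh off the NIC.
 */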
static inline void
process_packets(uint8_t port_id, uint16_t queue_id, struct rte_mbuf **bufs,
    uint16_t count, const struct ff_dpdk_if_context *ctx, int pkts_from_ring)
{
    struct lcore_conf *qconf = &lcore_conf;

    uint16_t i;
    for (i = 0; i < count; i++) {
        struct rte_mbuf *rtem = bufs[i];

        if (unlikely(qconf->pcap[port_id] != NULL)) {
            ff_dump_packets(qconf->pcap[port_id], rtem);
        }

        void *data = rte_pktmbuf_mtod(rtem, void *);
        uint16_t len = rte_pktmbuf_data_len(rtem);

        enum FilterReturn filter = protocol_filter(data, len);
        if (filter == FILTER_ARP) {
            struct rte_mempool *mbuf_pool;
            struct rte_mbuf *mbuf_clone;
            if (pkts_from_ring == 0) {
                uint16_t j;
                for (j = 0; j < qconf->nb_procs; ++j) {
                    if (j == queue_id)
                        continue;

                    mbuf_pool = pktmbuf_pool[rte_lcore_to_socket_id(qconf->lcore_proc[j])];
                    mbuf_clone = rte_pktmbuf_clone(rtem, mbuf_pool);
                    if (mbuf_clone) {
                        int ret = rte_ring_enqueue(arp_ring[j][port_id], mbuf_clone);
                        if (ret < 0)
                            rte_pktmbuf_free(mbuf_clone);
                    }
                }
            }

            if (enable_kni && rte_eal_process_type() == RTE_PROC_PRIMARY) {
                mbuf_pool = pktmbuf_pool[qconf->socket_id];
                mbuf_clone = rte_pktmbuf_clone(rtem, mbuf_pool);
                if (mbuf_clone) {
                    /* Enqueue the clone; rtem itself is still consumed below. */
                    ff_kni_enqueue(port_id, mbuf_clone);
                }
            }

            ff_veth_input(ctx, rtem);
        } else if (enable_kni && ((filter == FILTER_KNI && kni_accept) ||
            (filter == FILTER_UNKNOWN && !kni_accept))) {
            ff_kni_enqueue(port_id, rtem);
        } else {
            ff_veth_input(ctx, rtem);
        }
    }
}

static inline int
process_arp_ring(uint8_t port_id, uint16_t queue_id,
    struct rte_mbuf **pkts_burst, const struct ff_dpdk_if_context *ctx)
{
    /* Dequeue a burst of ARP packets from this proc's ring and process them. */
    uint16_t nb_rb;
    nb_rb = rte_ring_dequeue_burst(arp_ring[queue_id][port_id],
        (void **)pkts_burst, MAX_PKT_BURST);

    if (nb_rb > 0) {
        process_packets(port_id, queue_id, pkts_burst, nb_rb, ctx, 1);
    }

    return 0;
}

static inline void
handle_sysctl_msg(struct ff_msg *msg, uint16_t proc_id)
{
    int ret = ff_sysctl(msg->sysctl.name, msg->sysctl.namelen,
        msg->sysctl.old, msg->sysctl.oldlenp, msg->sysctl.new,
        msg->sysctl.newlen);

    if (ret < 0) {
        msg->result = errno;
    } else {
        msg->result = 0;
    }

    rte_ring_enqueue(msg_ring[proc_id].ring[1], msg);
}

static inline void
handle_ioctl_msg(struct ff_msg *msg, uint16_t proc_id)
{
    int fd, ret;
    fd = ff_socket(AF_INET, SOCK_DGRAM, 0);
    if (fd < 0) {
        ret = -1;
        goto done;
    }

    ret = ff_ioctl(fd, msg->ioctl.cmd, msg->ioctl.data);

    ff_close(fd);

done:
    if (ret < 0) {
        msg->result = errno;
    } else {
        msg->result = 0;
    }

    rte_ring_enqueue(msg_ring[proc_id].ring[1], msg);
}

static inline void
handle_default_msg(struct ff_msg *msg, uint16_t proc_id)
{
    msg->result = EINVAL;
    rte_ring_enqueue(msg_ring[proc_id].ring[1], msg);
}

static inline void
handle_msg(struct ff_msg *msg, uint16_t proc_id)
{
    switch (msg->msg_type) {
        case FF_SYSCTL:
            handle_sysctl_msg(msg, proc_id);
            break;
        case FF_IOCTL:
            handle_ioctl_msg(msg, proc_id);
            break;
        default:
            handle_default_msg(msg, proc_id);
            break;
    }
}

static inline int
process_msg_ring(uint16_t proc_id)
{
    void *msg;
    int ret = rte_ring_dequeue(msg_ring[proc_id].ring[0], &msg);

    if (unlikely(ret == 0)) {
        handle_msg((struct ff_msg *)msg, proc_id);
    }

    return 0;
}

/* Send burst of packets on an output interface */
static inline int
send_burst(struct lcore_conf *qconf, uint16_t n, uint8_t port)
{
    struct rte_mbuf **m_table;
    int ret;
    uint16_t queueid;

    queueid = qconf->tx_queue_id[port];
    m_table = (struct rte_mbuf **)qconf->tx_mbufs[port].m_table;

    if (unlikely(qconf->pcap[port] != NULL)) {
        uint16_t i;
        for (i = 0; i < n; i++) {
            ff_dump_packets(qconf->pcap[port], m_table[i]);
        }
    }

    ret = rte_eth_tx_burst(port, queueid, m_table, n);
    if (unlikely(ret < n)) {
        /* Free the mbufs the NIC did not accept. */
        do {
            rte_pktmbuf_free(m_table[ret]);
        } while (++ret < n);
    }

    return 0;
}

/* Enqueue a single packet, and send burst if queue is filled */
static inline int
send_single_packet(struct rte_mbuf *m, uint8_t port)
{
    uint16_t len;
    struct lcore_conf *qconf;

    qconf = &lcore_conf;
    len = qconf->tx_mbufs[port].len;
    qconf->tx_mbufs[port].m_table[len] = m;
    len++;

    /* enough pkts to be sent */
    if (unlikely(len == MAX_PKT_BURST)) {
        send_burst(qconf, MAX_PKT_BURST, port);
        len = 0;
    }

    qconf->tx_mbufs[port].len = len;
    return 0;
}

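/*
 * Transmit path: copy the stack's mbuf chain into a chain of DPDK
 * mbufs, translate the checksum/TSO offload requests into ol_flags on
 * the head mbuf, then hand it to the per-port TX buffer.  Note the
 * copy is segment by segment; a zero-copy variant would require the
 * stack and DPDK to share mbuf ownership.
 */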
int
ff_dpdk_if_send(struct ff_dpdk_if_context *ctx, void *m,
    int total)
{
    struct rte_mempool *mbuf_pool = pktmbuf_pool[lcore_conf.socket_id];
    struct rte_mbuf *head = rte_pktmbuf_alloc(mbuf_pool);
    if (head == NULL) {
        ff_mbuf_free(m);
        return -1;
    }

    head->pkt_len = total;
    head->nb_segs = 0;

    int off = 0;
    struct rte_mbuf *cur = head, *prev = NULL;
    while (total > 0) {
        if (cur == NULL) {
            cur = rte_pktmbuf_alloc(mbuf_pool);
            if (cur == NULL) {
                rte_pktmbuf_free(head);
                ff_mbuf_free(m);
                return -1;
            }
        }

        void *data = rte_pktmbuf_mtod(cur, void *);
        int len = total > RTE_MBUF_DEFAULT_DATAROOM ? RTE_MBUF_DEFAULT_DATAROOM : total;
        int ret = ff_mbuf_copydata(m, data, off, len);
        if (ret < 0) {
            rte_pktmbuf_free(head);
            ff_mbuf_free(m);
            return -1;
        }

        if (prev != NULL) {
            prev->next = cur;
        }
        prev = cur;

        cur->data_len = len;
        off += len;
        total -= len;
        head->nb_segs++;
        cur = NULL;
    }

    struct ff_tx_offload offload = {0};
    ff_mbuf_tx_offload(m, &offload);

    if (offload.ip_csum) {
        head->ol_flags |= PKT_TX_IP_CKSUM;
        head->l2_len = sizeof(struct ether_hdr);
        head->l3_len = sizeof(struct ipv4_hdr);
    }

    if (ctx->hw_features.tx_csum_l4) {
        if (offload.tcp_csum) {
            head->ol_flags |= PKT_TX_TCP_CKSUM;
            head->l2_len = sizeof(struct ether_hdr);
            head->l3_len = sizeof(struct ipv4_hdr);
        }

        if (offload.tso_seg_size) {
            head->ol_flags |= PKT_TX_TCP_SEG;
            head->l4_len = sizeof(struct tcp_hdr);
            head->tso_segsz = offload.tso_seg_size;
        }

        if (offload.udp_csum) {
            head->ol_flags |= PKT_TX_UDP_CKSUM;
            head->l2_len = sizeof(struct ether_hdr);
            head->l3_len = sizeof(struct ipv4_hdr);
        }
    }

    ff_mbuf_free(m);

    return send_single_packet(head, ctx->port_id);
}

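/*
 * Per-lcore event loop.  Each iteration:
 *   1. runs expired rte_timer callbacks (the FreeBSD hardclock),
 *   2. drains TX buffers that have been idle longer than drain_tsc,
 *   3. polls KNI, the ARP ring and the NIC RX queues,
 *   4. services one pending control message,
 *   5. invokes the user-supplied loop callback.
 */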
static int
main_loop(void *arg)
{
    struct loop_routine *lr = (struct loop_routine *)arg;

    struct rte_mbuf *pkts_burst[MAX_PKT_BURST];
    unsigned lcore_id;
    uint64_t prev_tsc, diff_tsc, cur_tsc;
    int i, j, nb_rx;
    uint8_t port_id, queue_id;
    struct lcore_conf *qconf;
    const uint64_t drain_tsc = (rte_get_tsc_hz() + US_PER_S - 1) /
        US_PER_S * BURST_TX_DRAIN_US;
    struct ff_dpdk_if_context *ctx;

    prev_tsc = 0;

    lcore_id = rte_lcore_id();
    qconf = &lcore_conf;

    if (qconf->nb_rx_queue == 0) {
        printf("lcore %u has nothing to do\n", lcore_id);
        return 0;
    }

    while (1) {
        cur_tsc = rte_rdtsc();
        if (unlikely(freebsd_clock.expire < cur_tsc)) {
            rte_timer_manage();
        }

        /*
         * TX burst queue drain
         */
        diff_tsc = cur_tsc - prev_tsc;
        if (unlikely(diff_tsc > drain_tsc)) {
            /*
             * This could be optimized (use queueid instead of
             * portid), but it is not called so often
             */
            for (port_id = 0; port_id < RTE_MAX_ETHPORTS; port_id++) {
                if (qconf->tx_mbufs[port_id].len == 0)
                    continue;
                send_burst(qconf,
                    qconf->tx_mbufs[port_id].len,
                    port_id);
                qconf->tx_mbufs[port_id].len = 0;
            }

            prev_tsc = cur_tsc;
        }

        /*
         * Read packet from RX queues
         */
        for (i = 0; i < qconf->nb_rx_queue; ++i) {
            port_id = qconf->rx_queue_list[i].port_id;
            queue_id = qconf->rx_queue_list[i].queue_id;
            ctx = veth_ctx[port_id];

            if (enable_kni && rte_eal_process_type() == RTE_PROC_PRIMARY) {
                ff_kni_process(port_id, queue_id, pkts_burst, MAX_PKT_BURST);
            }

            process_arp_ring(port_id, queue_id, pkts_burst, ctx);

            nb_rx = rte_eth_rx_burst(port_id, queue_id, pkts_burst,
                MAX_PKT_BURST);
            if (nb_rx == 0)
                continue;

            /* Prefetch first packets */
            for (j = 0; j < PREFETCH_OFFSET && j < nb_rx; j++) {
                rte_prefetch0(rte_pktmbuf_mtod(
                        pkts_burst[j], void *));
            }

            /* Prefetch and handle already prefetched packets */
            for (j = 0; j < (nb_rx - PREFETCH_OFFSET); j++) {
                rte_prefetch0(rte_pktmbuf_mtod(pkts_burst[
                        j + PREFETCH_OFFSET], void *));
                process_packets(port_id, queue_id, &pkts_burst[j], 1, ctx, 0);
            }

            /* Handle remaining prefetched packets */
            for (; j < nb_rx; j++) {
                process_packets(port_id, queue_id, &pkts_burst[j], 1, ctx, 0);
            }
        }

        process_msg_ring(qconf->proc_id);

        if (likely(lr->loop != NULL)) {
            lr->loop(lr->arg);
        }
    }
}

int
ff_dpdk_if_up(void) {
    int nb_ports = ff_global_cfg.dpdk.nb_ports;
    int i;
    for (i = 0; i < nb_ports; i++) {
        uint8_t port_id = ff_global_cfg.dpdk.port_cfgs[i].port_id;
        veth_ctx[port_id] = ff_veth_attach(ff_global_cfg.dpdk.port_cfgs + i);
        if (veth_ctx[port_id] == NULL) {
            rte_exit(EXIT_FAILURE, "ff_veth_attach failed\n");
        }
    }

    return 0;
}

void
ff_dpdk_run(loop_func_t loop, void *arg) {
    struct loop_routine *lr = malloc(sizeof(struct loop_routine));
    if (lr == NULL) {
        rte_exit(EXIT_FAILURE, "malloc loop_routine failed\n");
    }
    lr->loop = loop;
    lr->arg = arg;
    rte_eal_mp_remote_launch(main_loop, lr, CALL_MASTER);
    rte_eal_mp_wait_lcore();
    free(lr);
}

void
ff_dpdk_pktmbuf_free(void *m)
{
    rte_pktmbuf_free((struct rte_mbuf *)m);
}

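/*
 * Software Toeplitz hash, mirroring what the NIC computes for RSS.
 * The key is treated as a bit stream: v holds the 32-bit key window
 * aligned with the current input bit, and every set input bit XORs
 * the window into the hash.  This is the classic FreeBSD/Microsoft
 * RSS algorithm, from which this routine is taken.
 */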
static uint32_t
toeplitz_hash(unsigned keylen, const uint8_t *key,
    unsigned datalen, const uint8_t *data)
{
    uint32_t hash = 0, v;
    u_int i, b;

    /* XXXRW: Perhaps an assertion about key length vs. data length? */

    v = (key[0] << 24) + (key[1] << 16) + (key[2] << 8) + key[3];
    for (i = 0; i < datalen; i++) {
        for (b = 0; b < 8; b++) {
            if (data[i] & (1 << (7 - b)))
                hash ^= v;
            v <<= 1;
            if ((i + 4) < keylen &&
                (key[i + 4] & (1 << (7 - b))))
                v |= 1;
        }
    }
    return (hash);
}

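/*
 * Decide whether this process owns the flow identified by the 4-tuple.
 * The tuple is laid out exactly as the NIC hashes it (src addr, dst
 * addr, src port, dst port), hashed with the same 40-byte key, and the
 * flow belongs to us when hash % nb_procs == proc_id.  This assumes
 * the default RETA spreads hash values round-robin across the nb_procs
 * RX queues, so the software and hardware verdicts agree.
 */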
int
ff_rss_check(uint32_t saddr, uint32_t daddr, uint16_t sport, uint16_t dport)
{
    struct lcore_conf *qconf = &lcore_conf;

    if (qconf->nb_procs == 1) {
        return 1;
    }

    uint8_t data[sizeof(saddr) + sizeof(daddr) + sizeof(sport) +
        sizeof(dport)];

    unsigned datalen = 0;

    bcopy(&saddr, &data[datalen], sizeof(saddr));
    datalen += sizeof(saddr);

    bcopy(&daddr, &data[datalen], sizeof(daddr));
    datalen += sizeof(daddr);

    bcopy(&sport, &data[datalen], sizeof(sport));
    datalen += sizeof(sport);

    bcopy(&dport, &data[datalen], sizeof(dport));
    datalen += sizeof(dport);

    uint32_t hash = toeplitz_hash(sizeof(default_rsskey_40bytes),
        default_rsskey_40bytes, datalen, data);

    return (hash % qconf->nb_procs) == qconf->proc_id;
}
1358