/* f-stack/lib/ff_dpdk_if.c (revision 144c6bcd) */
/*
 * Copyright (C) 2017 THL A29 Limited, a Tencent company.
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions are met:
 *
 * 1. Redistributions of source code must retain the above copyright notice, this
 *   list of conditions and the following disclaimer.
 * 2. Redistributions in binary form must reproduce the above copyright notice,
 *   this list of conditions and the following disclaimer in the documentation
 *   and/or other materials provided with the distribution.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
 * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
 * DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR
 * ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
 * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
 * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
 * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
 * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 *
 */

/* Standard headers used directly in this file (printf, ctype, errno,
 * ntohs, bcopy, PRIx8); previously pulled in only transitively. */
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <strings.h>
#include <ctype.h>
#include <errno.h>
#include <inttypes.h>
#include <arpa/inet.h>

#include <rte_common.h>
#include <rte_byteorder.h>
#include <rte_log.h>
#include <rte_memory.h>
#include <rte_memcpy.h>
#include <rte_memzone.h>
#include <rte_config.h>
#include <rte_eal.h>
#include <rte_pci.h>
#include <rte_mbuf.h>
#include <rte_lcore.h>
#include <rte_launch.h>
#include <rte_ethdev.h>
#include <rte_debug.h>
#include <rte_ether.h>
#include <rte_malloc.h>
#include <rte_cycles.h>
#include <rte_timer.h>
#include <rte_thash.h>
#include <rte_ip.h>
#include <rte_tcp.h>
#include <rte_udp.h>

#include "ff_dpdk_if.h"
#include "ff_dpdk_pcap.h"
#include "ff_dpdk_kni.h"
#include "ff_config.h"
#include "ff_veth.h"
#include "ff_host_interface.h"
#include "ff_msg.h"
#include "ff_api.h"

#define MEMPOOL_CACHE_SIZE 256

#define ARP_RING_SIZE 2048

#define MSG_RING_SIZE 32

/*
 * Configurable number of RX/TX ring descriptors
 */
#define RX_QUEUE_SIZE 512
#define TX_QUEUE_SIZE 256

#define MAX_PKT_BURST 32
#define BURST_TX_DRAIN_US 100 /* TX drain every ~100us */

/*
 * Try to avoid TX buffering if we have at least MAX_TX_BURST packets to send.
 */
#define MAX_TX_BURST    (MAX_PKT_BURST / 2)

#define NB_SOCKETS 8

/* Configure how many packets ahead to prefetch when reading packets */
#define PREFETCH_OFFSET    3

#define MAX_RX_QUEUE_PER_LCORE 16
#define MAX_TX_QUEUE_PER_PORT RTE_MAX_ETHPORTS
#define MAX_RX_QUEUE_PER_PORT 128

#define BITS_PER_HEX 4

static int enable_kni;
static int kni_accept;

static struct rte_timer freebsd_clock;

/* Default 40-byte RSS hash key (the key used by the Mellanox Linux driver). */
static uint8_t default_rsskey_40bytes[40] = {
    0xd1, 0x81, 0xc6, 0x2c, 0xf7, 0xf4, 0xdb, 0x5b,
    0x19, 0x83, 0xa2, 0xfc, 0x94, 0x3e, 0x1a, 0xdb,
    0xd9, 0x38, 0x9e, 0x6b, 0xd1, 0x03, 0x9c, 0x2c,
    0xa7, 0x44, 0x99, 0xad, 0x59, 0x3d, 0x56, 0xd9,
    0xf3, 0x25, 0x3c, 0x06, 0x2a, 0xdc, 0x1f, 0xfc
};

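/*
 * Baseline port configuration: RSS distribution on RX with the key above,
 * every hardware offload disabled, and plain (non-multiqueue) TX. Note that
 * init_port_start() below builds its own rte_eth_conf at runtime rather
 * than using this template.
 */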
static struct rte_eth_conf default_port_conf = {
    .rxmode = {
        .mq_mode = ETH_MQ_RX_RSS,
        .max_rx_pkt_len = ETHER_MAX_LEN,
        .split_hdr_size = 0, /**< hdr buf size */
        .header_split   = 0, /**< Header Split disabled */
        .hw_ip_checksum = 0, /**< IP checksum offload disabled */
        .hw_vlan_filter = 0, /**< VLAN filtering disabled */
        .hw_vlan_strip  = 0, /**< VLAN strip disabled. */
        .hw_vlan_extend = 0, /**< Extended VLAN disabled. */
        .jumbo_frame    = 0, /**< Jumbo Frame Support disabled */
        .hw_strip_crc   = 0, /**< CRC stripped by hardware */
        .enable_lro     = 0, /**< LRO disabled */
    },
    .rx_adv_conf = {
        .rss_conf = {
            .rss_key = default_rsskey_40bytes,
            .rss_key_len = 40,
            .rss_hf = ETH_RSS_PROTO_MASK,
        },
    },
    .txmode = {
        .mq_mode = ETH_MQ_TX_NONE,
    },
};

struct mbuf_table {
    uint16_t len;
    struct rte_mbuf *m_table[MAX_PKT_BURST];
};

struct lcore_rx_queue {
    uint8_t port_id;
    uint8_t queue_id;
} __rte_cache_aligned;

struct lcore_conf {
    uint16_t proc_id;
    uint16_t nb_procs;
    uint16_t socket_id;
    uint16_t nb_rx_queue;
    uint16_t *lcore_proc;
    struct lcore_rx_queue rx_queue_list[MAX_RX_QUEUE_PER_LCORE];
    uint16_t tx_queue_id[RTE_MAX_ETHPORTS];
    struct mbuf_table tx_mbufs[RTE_MAX_ETHPORTS];
    char *pcap[RTE_MAX_ETHPORTS];
} __rte_cache_aligned;

static struct lcore_conf lcore_conf;

static struct rte_mempool *pktmbuf_pool[NB_SOCKETS];

static struct rte_ring **arp_ring[RTE_MAX_LCORE];

struct ff_msg_ring {
    char ring_name[2][RTE_RING_NAMESIZE];
    /* ring[0]: this lcore receives msgs that another process sends */
    /* ring[1]: this lcore sends msgs for another process to read */
    struct rte_ring *ring[2];
} __rte_cache_aligned;

static struct ff_msg_ring msg_ring[RTE_MAX_LCORE];
static struct rte_mempool *message_pool;

struct ff_dpdk_if_context {
    void *sc;
    void *ifp;
    uint16_t port_id;
    struct ff_hw_features hw_features;
} __rte_cache_aligned;

static struct ff_dpdk_if_context *veth_ctx[RTE_MAX_ETHPORTS];

extern void ff_hardclock(void);

static void
freebsd_hardclock_job(__rte_unused struct rte_timer *timer,
    __rte_unused void *arg) {
    ff_hardclock();
}

struct ff_dpdk_if_context *
ff_dpdk_register_if(void *sc, void *ifp, struct ff_port_cfg *cfg)
{
    struct ff_dpdk_if_context *ctx;

    ctx = calloc(1, sizeof(struct ff_dpdk_if_context));
    if (ctx == NULL)
        return NULL;

    ctx->sc = sc;
    ctx->ifp = ifp;
    ctx->port_id = cfg->port_id;
    ctx->hw_features = cfg->hw_features;

    return ctx;
}

void
ff_dpdk_deregister_if(struct ff_dpdk_if_context *ctx)
{
    free(ctx);
}

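/*
 * Poll link status on every configured port, printing a progress dot each
 * 100ms round while any link is still down, until all links are up or the
 * ~9s timeout expires; then print a one-line status summary per port.
 */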
static void
check_all_ports_link_status(void)
{
    #define CHECK_INTERVAL 100 /* 100ms */
    #define MAX_CHECK_TIME 90  /* 9s (90 * 100ms) in total */

    uint8_t count, all_ports_up, print_flag = 0;
    struct rte_eth_link link;

    printf("\nChecking link status");
    fflush(stdout);

    int i, nb_ports;
    nb_ports = ff_global_cfg.dpdk.nb_ports;
    for (count = 0; count <= MAX_CHECK_TIME; count++) {
        all_ports_up = 1;
        for (i = 0; i < nb_ports; i++) {
            uint8_t portid = ff_global_cfg.dpdk.port_cfgs[i].port_id;
            memset(&link, 0, sizeof(link));
            rte_eth_link_get_nowait(portid, &link);

            /* print link status if flag set */
            if (print_flag == 1) {
                if (link.link_status) {
                    printf("Port %d Link Up - speed %u "
                        "Mbps - %s\n", (int)portid,
                        (unsigned)link.link_speed,
                        (link.link_duplex == ETH_LINK_FULL_DUPLEX) ?
                        ("full-duplex") : ("half-duplex"));
                } else {
                    printf("Port %d Link Down\n", (int)portid);
                }
                continue;
            }
            /* clear all_ports_up flag if any link down */
            if (link.link_status == 0) {
                all_ports_up = 0;
                break;
            }
        }

        /* after finally printing all link status, get out */
        if (print_flag == 1)
            break;

        if (all_ports_up == 0) {
            printf(".");
            fflush(stdout);
            rte_delay_ms(CHECK_INTERVAL);
        }

        /* set the print_flag if all ports up or timeout */
        if (all_ports_up == 1 || count == (MAX_CHECK_TIME - 1)) {
            print_flag = 1;
            printf("done\n");
        }
    }
}

static int
xdigit2val(unsigned char c)
{
    int val;

    if (isdigit(c))
        val = c - '0';
    else if (isupper(c))
        val = c - 'A' + 10;
    else
        val = c - 'a' + 10;
    return val;
}

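/*
 * Parse a hexadecimal core mask (e.g. "0xf" selects lcores 0-3) and record
 * the lcore id behind each set bit into lcore_proc[], lowest bit first,
 * until nb_procs entries are filled. Fails on non-hex input, on a bit that
 * names an undetected lcore, if fewer than nb_procs bits are set, or if any
 * nonzero digit remains beyond the bits that were consumed.
 */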
static int
parse_lcore_mask(const char *coremask, uint16_t *lcore_proc,
    uint16_t nb_procs)
{
    int i, j, idx = 0;
    unsigned count = 0;
    char c;
    int val;

    if (coremask == NULL)
        return -1;

    /* Skip leading and trailing blanks.
     * Skip a leading 0x/0X prefix if present.
     */
    while (isblank(*coremask))
        coremask++;
    if (coremask[0] == '0' && ((coremask[1] == 'x')
        || (coremask[1] == 'X')))
        coremask += 2;

    i = strlen(coremask);
    while ((i > 0) && isblank(coremask[i - 1]))
        i--;

    if (i == 0)
        return -1;

    for (i = i - 1; i >= 0 && idx < RTE_MAX_LCORE && count < nb_procs; i--) {
        c = coremask[i];
        if (isxdigit(c) == 0) {
            return -1;
        }
        val = xdigit2val(c);
        for (j = 0; j < BITS_PER_HEX && idx < RTE_MAX_LCORE && count < nb_procs;
            j++, idx++) {
            if ((1 << j) & val) {
                if (!lcore_config[idx].detected) {
                    RTE_LOG(ERR, EAL, "lcore %d unavailable\n", idx);
                    return -1;
                }
                lcore_proc[count] = idx;
                count++;
            }
        }
    }

    for (; i >= 0; i--)
        if (coremask[i] != '0')
            return -1;

    if (count < nb_procs)
        return -1;

    return 0;
}

static int
init_lcore_conf(void)
{
    uint8_t nb_ports = rte_eth_dev_count();
    if (nb_ports == 0) {
        rte_exit(EXIT_FAILURE, "No probed ethernet devices\n");
    }

    lcore_conf.proc_id = ff_global_cfg.dpdk.proc_id;
    lcore_conf.nb_procs = ff_global_cfg.dpdk.nb_procs;
    lcore_conf.lcore_proc = rte_zmalloc(NULL,
        sizeof(uint16_t)*lcore_conf.nb_procs, 0);
    if (lcore_conf.lcore_proc == NULL) {
        rte_exit(EXIT_FAILURE, "rte_zmalloc lcore_proc failed\n");
    }

    int ret = parse_lcore_mask(ff_global_cfg.dpdk.lcore_mask,
        lcore_conf.lcore_proc, lcore_conf.nb_procs);
    if (ret < 0) {
        rte_exit(EXIT_FAILURE, "parse_lcore_mask failed:%s\n",
            ff_global_cfg.dpdk.lcore_mask);
    }

    uint16_t socket_id = 0;
    if (ff_global_cfg.dpdk.numa_on) {
        socket_id = rte_lcore_to_socket_id(rte_lcore_id());
    }

    lcore_conf.socket_id = socket_id;

    /* Currently, proc id maps 1:1 to the rx/tx queue id of each port. */
    uint8_t port_id, enabled_ports = 0;
    for (port_id = 0; port_id < nb_ports; port_id++) {
        if (ff_global_cfg.dpdk.port_mask &&
            (ff_global_cfg.dpdk.port_mask & (1 << port_id)) == 0) {
            printf("\nSkipping disabled port %d\n", port_id);
            continue;
        }

        if (port_id >= ff_global_cfg.dpdk.nb_ports) {
            printf("\nSkipping non-configured port %d\n", port_id);
            break;
        }

        uint16_t nb_rx_queue = lcore_conf.nb_rx_queue;
        lcore_conf.rx_queue_list[nb_rx_queue].port_id = port_id;
        lcore_conf.rx_queue_list[nb_rx_queue].queue_id = lcore_conf.proc_id;
        lcore_conf.nb_rx_queue++;

        lcore_conf.tx_queue_id[port_id] = lcore_conf.proc_id;
        lcore_conf.pcap[port_id] = ff_global_cfg.dpdk.port_cfgs[enabled_ports].pcap;

        ff_global_cfg.dpdk.port_cfgs[enabled_ports].port_id = port_id;

        enabled_ports++;
    }

    ff_global_cfg.dpdk.nb_ports = enabled_ports;

    return 0;
}

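/*
 * Size the mbuf pool to cover every RX descriptor, a full TX ring per port
 * and process, one in-flight burst per port and process, and each process's
 * mempool cache, with a floor of 8192 mbufs. The primary process creates
 * one pool per NUMA socket; secondaries look the pools up by name.
 */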
static int
init_mem_pool(void)
{
    uint8_t nb_ports = ff_global_cfg.dpdk.nb_ports;
    uint32_t nb_lcores = ff_global_cfg.dpdk.nb_procs;
    uint32_t nb_tx_queue = nb_lcores;
    uint32_t nb_rx_queue = lcore_conf.nb_rx_queue * nb_lcores;

    unsigned nb_mbuf = RTE_MAX(
        (nb_rx_queue*RX_QUEUE_SIZE          +
        nb_ports*nb_lcores*MAX_PKT_BURST    +
        nb_ports*nb_tx_queue*TX_QUEUE_SIZE  +
        nb_lcores*MEMPOOL_CACHE_SIZE),
        (unsigned)8192);

    unsigned socketid = 0;
    uint16_t i, lcore_id;
    char s[64];
    int numa_on = ff_global_cfg.dpdk.numa_on;

    for (i = 0; i < lcore_conf.nb_procs; i++) {
        lcore_id = lcore_conf.lcore_proc[i];
        if (numa_on) {
            socketid = rte_lcore_to_socket_id(lcore_id);
        }

        if (socketid >= NB_SOCKETS) {
            rte_exit(EXIT_FAILURE, "Socket %u of lcore %u is out of range %d\n",
                socketid, lcore_id, NB_SOCKETS);
        }

        if (pktmbuf_pool[socketid] != NULL) {
            continue;
        }

        if (rte_eal_process_type() == RTE_PROC_PRIMARY) {
            snprintf(s, sizeof(s), "mbuf_pool_%d", socketid);
            pktmbuf_pool[socketid] =
                rte_pktmbuf_pool_create(s, nb_mbuf,
                    MEMPOOL_CACHE_SIZE, 0,
                    RTE_MBUF_DEFAULT_BUF_SIZE, socketid);
        } else {
            snprintf(s, sizeof(s), "mbuf_pool_%d", socketid);
            pktmbuf_pool[socketid] = rte_mempool_lookup(s);
        }

        if (pktmbuf_pool[socketid] == NULL) {
            rte_exit(EXIT_FAILURE, "Cannot create mbuf pool on socket %d\n", socketid);
        } else {
            printf("create mbuf pool on socket %d\n", socketid);
        }
    }

    return 0;
}

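/*
 * Create-or-attach helper: look the ring up by name first so that secondary
 * processes (and repeated calls) attach to the ring the primary process
 * already created; only the primary ever creates a new ring.
 */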
static struct rte_ring *
create_ring(const char *name, unsigned count, int socket_id, unsigned flags)
{
    struct rte_ring *ring;

    /* If the ring already exists, just attach to it. */
    if (name == NULL)
        return NULL;

    if (likely((ring = rte_ring_lookup(name)) != NULL))
        return ring;

    if (rte_eal_process_type() == RTE_PROC_PRIMARY) {
        return rte_ring_create(name, count, socket_id, flags);
    } else {
        return rte_ring_lookup(name);
    }
}

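/*
 * One ARP ring per (process, port) pair. An ARP packet arrives on only one
 * RX queue, but every process runs its own stack and needs to learn the
 * neighbor, so the receiving process clones ARP packets into its peers'
 * rings (see process_packets() below).
 */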
static int
init_arp_ring(void)
{
    int i, j;
    char name_buf[RTE_RING_NAMESIZE];
    int nb_procs = ff_global_cfg.dpdk.nb_procs;
    int proc_id = ff_global_cfg.dpdk.proc_id;

    /* Allocate the per-process ring pointer arrays, sized by the probed eth dev count. */
    int nb_ports = rte_eth_dev_count();
    for(i = 0; i < nb_procs; ++i) {
        snprintf(name_buf, RTE_RING_NAMESIZE, "ring_ptr_%d_%d",
            proc_id, i);

        arp_ring[i] = rte_zmalloc(name_buf,
            sizeof(struct rte_ring *) * nb_ports,
            RTE_CACHE_LINE_SIZE);
        if (arp_ring[i] == NULL) {
            rte_exit(EXIT_FAILURE, "rte_zmalloc(%s (struct rte_ring*)) "
                "failed\n", name_buf);
        }
    }

    unsigned socketid = lcore_conf.socket_id;

    /* Create the rings themselves only for the ports actually in use. */
    nb_ports = ff_global_cfg.dpdk.nb_ports;
    for (j = 0; j < nb_ports; j++) {
        uint8_t port_id = ff_global_cfg.dpdk.port_cfgs[j].port_id;

        for(i = 0; i < nb_procs; ++i) {
            snprintf(name_buf, RTE_RING_NAMESIZE, "arp_ring_%d_%d", i, port_id);
            arp_ring[i][port_id] = create_ring(name_buf, ARP_RING_SIZE,
                socketid, RING_F_SC_DEQ);

            if (arp_ring[i][port_id] == NULL)
                rte_panic("create ring:%s failed!\n", name_buf);

            printf("create ring:%s success, %u ring entries are now free!\n",
                name_buf, rte_ring_free_count(arp_ring[i][port_id]));
        }
    }

    return 0;
}

static void
ff_msg_init(struct rte_mempool *mp,
    __attribute__((unused)) void *opaque_arg,
    void *obj, __attribute__((unused)) unsigned i)
{
    struct ff_msg *msg = (struct ff_msg *)obj;
    msg->msg_type = FF_UNKNOWN;
    msg->buf_addr = (char *)msg + sizeof(struct ff_msg);
    msg->buf_len = mp->elt_size - sizeof(struct ff_msg);
}

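/*
 * Per-process message rings: ring[0] carries requests in from a companion
 * process, ring[1] carries completed replies back. The message buffers
 * themselves come from a single mempool created by the primary process.
 */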
static int
init_msg_ring(void)
{
    uint16_t i;
    uint16_t nb_procs = ff_global_cfg.dpdk.nb_procs;
    unsigned socketid = lcore_conf.socket_id;

    /* Create the message buffer pool */
    if (rte_eal_process_type() == RTE_PROC_PRIMARY) {
        message_pool = rte_mempool_create(FF_MSG_POOL,
           MSG_RING_SIZE * 2 * nb_procs,
           MAX_MSG_BUF_SIZE, MSG_RING_SIZE / 2, 0,
           NULL, NULL, ff_msg_init, NULL,
           socketid, 0);
    } else {
        message_pool = rte_mempool_lookup(FF_MSG_POOL);
    }

    if (message_pool == NULL) {
        rte_panic("Create msg mempool failed\n");
    }

    for(i = 0; i < nb_procs; ++i) {
        snprintf(msg_ring[i].ring_name[0], RTE_RING_NAMESIZE,
            "%s%u", FF_MSG_RING_IN, i);
        snprintf(msg_ring[i].ring_name[1], RTE_RING_NAMESIZE,
            "%s%u", FF_MSG_RING_OUT, i);

        msg_ring[i].ring[0] = create_ring(msg_ring[i].ring_name[0],
            MSG_RING_SIZE, socketid, RING_F_SP_ENQ | RING_F_SC_DEQ);
        if (msg_ring[i].ring[0] == NULL)
            rte_panic("create ring:%s failed!\n", msg_ring[i].ring_name[0]);

        msg_ring[i].ring[1] = create_ring(msg_ring[i].ring_name[1],
            MSG_RING_SIZE, socketid, RING_F_SP_ENQ | RING_F_SC_DEQ);
        if (msg_ring[i].ring[1] == NULL)
            rte_panic("create ring:%s failed!\n", msg_ring[i].ring_name[1]);
    }

    return 0;
}

static int
init_kni(void)
{
    int nb_ports = rte_eth_dev_count();
    kni_accept = 0;
    if(strcasecmp(ff_global_cfg.kni.method, "accept") == 0)
        kni_accept = 1;

    ff_kni_init(nb_ports, ff_global_cfg.kni.tcp_port,
        ff_global_cfg.kni.udp_port);

    unsigned socket_id = lcore_conf.socket_id;
    struct rte_mempool *mbuf_pool = pktmbuf_pool[socket_id];

    nb_ports = ff_global_cfg.dpdk.nb_ports;
    int i;
    for (i = 0; i < nb_ports; i++) {
        uint8_t port_id = ff_global_cfg.dpdk.port_cfgs[i].port_id;
        ff_kni_alloc(port_id, socket_id, mbuf_pool);
    }

    return 0;
}

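/*
 * Probe each port's offload capabilities, record the usable ones in
 * ff_global_cfg so the stack can rely on them, and (primary process only)
 * configure and start the port with one RX/TX queue pair per process.
 */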
static int
init_port_start(void)
{
    int nb_ports = ff_global_cfg.dpdk.nb_ports;
    uint16_t nb_procs = ff_global_cfg.dpdk.nb_procs;
    unsigned socketid = rte_lcore_to_socket_id(rte_lcore_id());
    struct rte_mempool *mbuf_pool = pktmbuf_pool[socketid];
    uint16_t i;

    for (i = 0; i < nb_ports; i++) {
        uint8_t port_id = ff_global_cfg.dpdk.port_cfgs[i].port_id;

        struct rte_eth_dev_info dev_info;
        rte_eth_dev_info_get(port_id, &dev_info);

        if (nb_procs > dev_info.max_rx_queues) {
            rte_exit(EXIT_FAILURE, "num_procs[%d] bigger than max_rx_queues[%d]\n",
                nb_procs,
                dev_info.max_rx_queues);
        }

        if (nb_procs > dev_info.max_tx_queues) {
            rte_exit(EXIT_FAILURE, "num_procs[%d] bigger than max_tx_queues[%d]\n",
                nb_procs,
                dev_info.max_tx_queues);
        }

        struct ether_addr addr;
        rte_eth_macaddr_get(port_id, &addr);
        printf("Port %u MAC: %02" PRIx8 " %02" PRIx8 " %02" PRIx8
                   " %02" PRIx8 " %02" PRIx8 " %02" PRIx8 "\n",
                (unsigned)port_id,
                addr.addr_bytes[0], addr.addr_bytes[1],
                addr.addr_bytes[2], addr.addr_bytes[3],
                addr.addr_bytes[4], addr.addr_bytes[5]);

        rte_memcpy(ff_global_cfg.dpdk.port_cfgs[i].mac,
            addr.addr_bytes, ETHER_ADDR_LEN);

        /* Clear txq_flags - we do not need multi-mempool and refcnt */
        dev_info.default_txconf.txq_flags = ETH_TXQ_FLAGS_NOMULTMEMP |
            ETH_TXQ_FLAGS_NOREFCOUNT;

        /* Disable features that are not supported by the port's HW */
        if (!(dev_info.tx_offload_capa & DEV_TX_OFFLOAD_UDP_CKSUM)) {
            dev_info.default_txconf.txq_flags |= ETH_TXQ_FLAGS_NOXSUMUDP;
        }

        if (!(dev_info.tx_offload_capa & DEV_TX_OFFLOAD_TCP_CKSUM)) {
            dev_info.default_txconf.txq_flags |= ETH_TXQ_FLAGS_NOXSUMTCP;
        }

        if (!(dev_info.tx_offload_capa & DEV_TX_OFFLOAD_SCTP_CKSUM)) {
            dev_info.default_txconf.txq_flags |= ETH_TXQ_FLAGS_NOXSUMSCTP;
        }

        if (!(dev_info.tx_offload_capa & DEV_TX_OFFLOAD_VLAN_INSERT)) {
            dev_info.default_txconf.txq_flags |= ETH_TXQ_FLAGS_NOVLANOFFL;
        }

        if (!(dev_info.tx_offload_capa & DEV_TX_OFFLOAD_TCP_TSO) &&
            !(dev_info.tx_offload_capa & DEV_TX_OFFLOAD_UDP_TSO)) {
            dev_info.default_txconf.txq_flags |= ETH_TXQ_FLAGS_NOMULTSEGS;
        }

        struct rte_eth_conf port_conf = {0};

        /* Set RSS mode */
        port_conf.rxmode.mq_mode = ETH_MQ_RX_RSS;
        port_conf.rx_adv_conf.rss_conf.rss_hf = ETH_RSS_PROTO_MASK;
        port_conf.rx_adv_conf.rss_conf.rss_key = default_rsskey_40bytes;
        port_conf.rx_adv_conf.rss_conf.rss_key_len = 40;

        /* Set Rx VLAN stripping */
        if (dev_info.rx_offload_capa & DEV_RX_OFFLOAD_VLAN_STRIP) {
            port_conf.rxmode.hw_vlan_strip = 1;
        }

        /* Enable HW CRC stripping */
        port_conf.rxmode.hw_strip_crc = 1;

        /* FIXME: Enable TCP LRO? */
        #if 0
        if (dev_info.rx_offload_capa & DEV_RX_OFFLOAD_TCP_LRO) {
            printf("LRO is supported\n");
            port_conf.rxmode.enable_lro = 1;
            ff_global_cfg.dpdk.port_cfgs[i].hw_features.rx_lro = 1;
        }
        #endif

        /* Set Rx checksum checking */
        if ((dev_info.rx_offload_capa & DEV_RX_OFFLOAD_IPV4_CKSUM) &&
            (dev_info.rx_offload_capa & DEV_RX_OFFLOAD_UDP_CKSUM) &&
            (dev_info.rx_offload_capa & DEV_RX_OFFLOAD_TCP_CKSUM)) {
            printf("RX checksum offload supported\n");
            port_conf.rxmode.hw_ip_checksum = 1;
            ff_global_cfg.dpdk.port_cfgs[i].hw_features.rx_csum = 1;
        }

        if ((dev_info.tx_offload_capa & DEV_TX_OFFLOAD_IPV4_CKSUM)) {
            printf("TX ip checksum offload supported\n");
            ff_global_cfg.dpdk.port_cfgs[i].hw_features.tx_csum_ip = 1;
        }

        if ((dev_info.tx_offload_capa & DEV_TX_OFFLOAD_UDP_CKSUM) &&
            (dev_info.tx_offload_capa & DEV_TX_OFFLOAD_TCP_CKSUM)) {
            printf("TX TCP&UDP checksum offload supported\n");
            ff_global_cfg.dpdk.port_cfgs[i].hw_features.tx_csum_l4 = 1;
        }

        if (ff_global_cfg.dpdk.tso) {
            if (dev_info.tx_offload_capa & DEV_TX_OFFLOAD_TCP_TSO) {
                printf("TSO is supported\n");
                ff_global_cfg.dpdk.port_cfgs[i].hw_features.tx_tso = 1;
            }
        } else {
            printf("TSO is disabled\n");
        }

        if (rte_eal_process_type() != RTE_PROC_PRIMARY) {
            continue;
        }

        /* Currently, proc id maps 1:1 to the queue id of each port. */
        int ret = rte_eth_dev_configure(port_id, nb_procs, nb_procs, &port_conf);
        if (ret != 0) {
            return ret;
        }

        uint16_t q;
        for (q = 0; q < nb_procs; q++) {
            ret = rte_eth_tx_queue_setup(port_id, q, TX_QUEUE_SIZE,
                socketid, &dev_info.default_txconf);
            if (ret < 0) {
                return ret;
            }

            ret = rte_eth_rx_queue_setup(port_id, q, RX_QUEUE_SIZE,
                socketid, &dev_info.default_rxconf, mbuf_pool);
            if (ret < 0) {
                return ret;
            }
        }

        ret = rte_eth_dev_start(port_id);
        if (ret < 0) {
            return ret;
        }

        /* Enable RX in promiscuous mode for the Ethernet device. */
        if (ff_global_cfg.dpdk.promiscuous) {
            rte_eth_promiscuous_enable(port_id);
            ret = rte_eth_promiscuous_get(port_id);
            if (ret == 1) {
                printf("set port %u to promiscuous mode ok\n", port_id);
            } else {
                printf("set port %u to promiscuous mode error\n", port_id);
            }
        }

        /* Enable pcap dump */
        if (ff_global_cfg.dpdk.port_cfgs[i].pcap) {
            ff_enable_pcap(ff_global_cfg.dpdk.port_cfgs[i].pcap);
        }
    }

    return 0;
}

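/*
 * Drive the FreeBSD stack's periodic hardclock from a DPDK timer firing
 * freebsd.hz times per second; main_loop() calls rte_timer_manage() when
 * the timer comes due.
 */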
static int
init_freebsd_clock(void)
{
    rte_timer_subsystem_init();
    uint64_t hz = rte_get_timer_hz();
    uint64_t intrs = MS_PER_S / ff_global_cfg.freebsd.hz;
    uint64_t tsc = (hz + MS_PER_S - 1) / MS_PER_S * intrs;

    rte_timer_init(&freebsd_clock);
    rte_timer_reset(&freebsd_clock, tsc, PERIODICAL,
        rte_lcore_id(), &freebsd_hardclock_job, NULL);

    return 0;
}

int
ff_dpdk_init(int argc, char **argv)
{
    if (ff_global_cfg.dpdk.nb_procs < 1 ||
        ff_global_cfg.dpdk.nb_procs > RTE_MAX_LCORE ||
        ff_global_cfg.dpdk.proc_id >= ff_global_cfg.dpdk.nb_procs ||
        ff_global_cfg.dpdk.proc_id < 0) {
        printf("param num_procs[%d] or proc_id[%d] error!\n",
            ff_global_cfg.dpdk.nb_procs,
            ff_global_cfg.dpdk.proc_id);
        exit(1);
    }

    int ret = rte_eal_init(argc, argv);
    if (ret < 0) {
        rte_exit(EXIT_FAILURE, "Error with EAL initialization\n");
    }

    init_lcore_conf();

    init_mem_pool();

    init_arp_ring();

    init_msg_ring();

    enable_kni = ff_global_cfg.kni.enable;
    if (enable_kni) {
        init_kni();
    }

    ret = init_port_start();
    if (ret < 0) {
        rte_exit(EXIT_FAILURE, "init_port_start failed\n");
    }

    check_all_ports_link_status();

    init_freebsd_clock();

    return 0;
}

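/*
 * Hand a received rte_mbuf chain to the FreeBSD stack: drop it early if
 * hardware checksum verification failed, wrap the first segment in an ff
 * mbuf header, chain one ff mbuf per additional segment, then inject the
 * result into the virtual ethernet device.
 */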
static void
ff_veth_input(const struct ff_dpdk_if_context *ctx, struct rte_mbuf *pkt)
{
    uint8_t rx_csum = ctx->hw_features.rx_csum;
    if (rx_csum) {
        if (pkt->ol_flags & (PKT_RX_IP_CKSUM_BAD | PKT_RX_L4_CKSUM_BAD)) {
            rte_pktmbuf_free(pkt);
            return;
        }
    }

    /*
     * FIXME: should we save pkt->vlan_tci
     * if (pkt->ol_flags & PKT_RX_VLAN_PKT)
     */

    void *data = rte_pktmbuf_mtod(pkt, void*);
    uint16_t len = rte_pktmbuf_data_len(pkt);

    void *hdr = ff_mbuf_gethdr(pkt, pkt->pkt_len, data, len, rx_csum);
    if (hdr == NULL) {
        rte_pktmbuf_free(pkt);
        return;
    }

    struct rte_mbuf *pn = pkt->next;
    void *prev = hdr;
    while(pn != NULL) {
        data = rte_pktmbuf_mtod(pn, void*);
        len = rte_pktmbuf_data_len(pn);

        void *mb = ff_mbuf_get(prev, data, len);
        if (mb == NULL) {
            ff_mbuf_free(hdr);
            rte_pktmbuf_free(pkt);
            return;
        }
        pn = pn->next;
        prev = mb;
    }

    ff_veth_process_packet(ctx->ifp, hdr);
}

static enum FilterReturn
protocol_filter(const void *data, uint16_t len)
{
    if(len < sizeof(struct ether_hdr))
        return FILTER_UNKNOWN;

    const struct ether_hdr *hdr;
    hdr = (const struct ether_hdr *)data;

    if(ntohs(hdr->ether_type) == ETHER_TYPE_ARP)
        return FILTER_ARP;

    if (!enable_kni) {
        return FILTER_UNKNOWN;
    }

    if(ntohs(hdr->ether_type) != ETHER_TYPE_IPv4)
        return FILTER_UNKNOWN;

    return ff_kni_proto_filter((const char *)data + sizeof(struct ether_hdr),
        len - sizeof(struct ether_hdr));
}

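/*
 * Dispatch a received burst. ARP packets are cloned to every other
 * process's ARP ring (unless they already came from a ring) and to KNI,
 * then still fed to the local stack; non-ARP packets go either to KNI or
 * to the stack according to the configured KNI filter policy.
 */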
static inline void
process_packets(uint8_t port_id, uint16_t queue_id, struct rte_mbuf **bufs,
    uint16_t count, const struct ff_dpdk_if_context *ctx, int pkts_from_ring)
{
    struct lcore_conf *qconf = &lcore_conf;

    uint16_t i;
    for (i = 0; i < count; i++) {
        struct rte_mbuf *rtem = bufs[i];

        if (unlikely(qconf->pcap[port_id] != NULL)) {
            ff_dump_packets(qconf->pcap[port_id], rtem);
        }

        void *data = rte_pktmbuf_mtod(rtem, void*);
        uint16_t len = rte_pktmbuf_data_len(rtem);

        enum FilterReturn filter = protocol_filter(data, len);
        if (filter == FILTER_ARP) {
            struct rte_mempool *mbuf_pool;
            struct rte_mbuf *mbuf_clone;
            if (pkts_from_ring == 0) {
                uint16_t j;
                for(j = 0; j < qconf->nb_procs; ++j) {
                    if(j == queue_id)
                        continue;

                    mbuf_pool = pktmbuf_pool[rte_lcore_to_socket_id(qconf->lcore_proc[j])];
                    mbuf_clone = rte_pktmbuf_clone(rtem, mbuf_pool);
                    if(mbuf_clone) {
                        int ret = rte_ring_enqueue(arp_ring[j][port_id], mbuf_clone);
                        if (ret < 0)
                            rte_pktmbuf_free(mbuf_clone);
                    }
                }
            }

            if (enable_kni && rte_eal_process_type() == RTE_PROC_PRIMARY) {
                mbuf_pool = pktmbuf_pool[qconf->socket_id];
                mbuf_clone = rte_pktmbuf_clone(rtem, mbuf_pool);
                if(mbuf_clone) {
                    ff_kni_enqueue(port_id, mbuf_clone);
                }
            }

            ff_veth_input(ctx, rtem);
        } else if (enable_kni && ((filter == FILTER_KNI && kni_accept) ||
            (filter == FILTER_UNKNOWN && !kni_accept))) {
            ff_kni_enqueue(port_id, rtem);
        } else {
            ff_veth_input(ctx, rtem);
        }
    }
}

static inline int
process_arp_ring(uint8_t port_id, uint16_t queue_id,
    struct rte_mbuf **pkts_burst, const struct ff_dpdk_if_context *ctx)
{
    /* Read packets from the ring buffer and process them. */
    uint16_t nb_rb;
    nb_rb = rte_ring_dequeue_burst(arp_ring[queue_id][port_id],
        (void **)pkts_burst, MAX_PKT_BURST);

    if(nb_rb > 0) {
        process_packets(port_id, queue_id, pkts_burst, nb_rb, ctx, 1);
    }

    return 0;
}

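/*
 * Message handlers: run the requested sysctl/ioctl/route operation inside
 * the stack and push the completed message onto the reply ring (ring[1]).
 */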
static inline void
handle_sysctl_msg(struct ff_msg *msg, uint16_t proc_id)
{
    int ret = ff_sysctl(msg->sysctl.name, msg->sysctl.namelen,
        msg->sysctl.old, msg->sysctl.oldlenp, msg->sysctl.new,
        msg->sysctl.newlen);

    if (ret < 0) {
        msg->result = errno;
    } else {
        msg->result = 0;
    }

    rte_ring_enqueue(msg_ring[proc_id].ring[1], msg);
}

static inline void
handle_ioctl_msg(struct ff_msg *msg, uint16_t proc_id)
{
    int fd, ret;
    fd = ff_socket(AF_INET, SOCK_DGRAM, 0);
    if (fd < 0) {
        ret = -1;
        goto done;
    }

    ret = ff_ioctl(fd, msg->ioctl.cmd, msg->ioctl.data);

    ff_close(fd);

done:
    if (ret < 0) {
        msg->result = errno;
    } else {
        msg->result = 0;
    }

    rte_ring_enqueue(msg_ring[proc_id].ring[1], msg);
}

static inline void
handle_route_msg(struct ff_msg *msg, uint16_t proc_id)
{
    msg->result = ff_rtioctl(msg->route.fib, msg->route.data,
        &msg->route.len, msg->route.maxlen);

    rte_ring_enqueue(msg_ring[proc_id].ring[1], msg);
}

static inline void
handle_default_msg(struct ff_msg *msg, uint16_t proc_id)
{
    msg->result = EINVAL;
    rte_ring_enqueue(msg_ring[proc_id].ring[1], msg);
}

static inline void
handle_msg(struct ff_msg *msg, uint16_t proc_id)
{
    switch (msg->msg_type) {
        case FF_SYSCTL:
            handle_sysctl_msg(msg, proc_id);
            break;
        case FF_IOCTL:
            handle_ioctl_msg(msg, proc_id);
            break;
        case FF_ROUTE:
            handle_route_msg(msg, proc_id);
            break;
        default:
            handle_default_msg(msg, proc_id);
            break;
    }
}

static inline int
process_msg_ring(uint16_t proc_id)
{
    void *msg;
    int ret = rte_ring_dequeue(msg_ring[proc_id].ring[0], &msg);

    if (unlikely(ret == 0)) {
        handle_msg((struct ff_msg *)msg, proc_id);
    }

    return 0;
}

/* Send burst of packets on an output interface */
static inline int
send_burst(struct lcore_conf *qconf, uint16_t n, uint8_t port)
{
    struct rte_mbuf **m_table;
    int ret;
    uint16_t queueid;

    queueid = qconf->tx_queue_id[port];
    m_table = (struct rte_mbuf **)qconf->tx_mbufs[port].m_table;

    if (unlikely(qconf->pcap[port] != NULL)) {
        uint16_t i;
        for (i = 0; i < n; i++) {
            ff_dump_packets(qconf->pcap[port], m_table[i]);
        }
    }

    ret = rte_eth_tx_burst(port, queueid, m_table, n);
    if (unlikely(ret < n)) {
        do {
            rte_pktmbuf_free(m_table[ret]);
        } while (++ret < n);
    }

    return 0;
}

/* Enqueue a single packet, and send burst if queue is filled */
static inline int
send_single_packet(struct rte_mbuf *m, uint8_t port)
{
    uint16_t len;
    struct lcore_conf *qconf;

    qconf = &lcore_conf;
    len = qconf->tx_mbufs[port].len;
    qconf->tx_mbufs[port].m_table[len] = m;
    len++;

    /* enough pkts to be sent */
    if (unlikely(len == MAX_PKT_BURST)) {
        send_burst(qconf, MAX_PKT_BURST, port);
        len = 0;
    }

    qconf->tx_mbufs[port].len = len;
    return 0;
}

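/*
 * Transmit path: copy the FreeBSD mbuf chain into a chain of rte_mbufs,
 * RTE_MBUF_DEFAULT_DATAROOM bytes per segment, translate the stack's
 * checksum/TSO requests into ol_flags on the head mbuf, and queue the
 * packet on this process's TX queue.
 */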
int
ff_dpdk_if_send(struct ff_dpdk_if_context *ctx, void *m,
    int total)
{
    struct rte_mempool *mbuf_pool = pktmbuf_pool[lcore_conf.socket_id];
    struct rte_mbuf *head = rte_pktmbuf_alloc(mbuf_pool);
    if (head == NULL) {
        ff_mbuf_free(m);
        return -1;
    }

    head->pkt_len = total;
    head->nb_segs = 0;

    int off = 0;
    struct rte_mbuf *cur = head, *prev = NULL;
    while(total > 0) {
        if (cur == NULL) {
            cur = rte_pktmbuf_alloc(mbuf_pool);
            if (cur == NULL) {
                rte_pktmbuf_free(head);
                ff_mbuf_free(m);
                return -1;
            }
        }

        void *data = rte_pktmbuf_mtod(cur, void*);
        int len = total > RTE_MBUF_DEFAULT_DATAROOM ? RTE_MBUF_DEFAULT_DATAROOM : total;
        int ret = ff_mbuf_copydata(m, data, off, len);
        if (ret < 0) {
            rte_pktmbuf_free(head);
            ff_mbuf_free(m);
            return -1;
        }

        if (prev != NULL) {
            prev->next = cur;
        }
        prev = cur;

        cur->data_len = len;
        off += len;
        total -= len;
        head->nb_segs++;
        cur = NULL;
    }

    struct ff_tx_offload offload = {0};
    ff_mbuf_tx_offload(m, &offload);

    if (offload.ip_csum) {
        head->ol_flags |= PKT_TX_IP_CKSUM;
        head->l2_len = sizeof(struct ether_hdr);
        head->l3_len = sizeof(struct ipv4_hdr);
    }

    if (ctx->hw_features.tx_csum_l4) {
        if (offload.tcp_csum) {
            head->ol_flags |= PKT_TX_TCP_CKSUM;
            head->l2_len = sizeof(struct ether_hdr);
            head->l3_len = sizeof(struct ipv4_hdr);
        }

        if (offload.tso_seg_size) {
            head->ol_flags |= PKT_TX_TCP_SEG;
            head->l4_len = sizeof(struct tcp_hdr);
            head->tso_segsz = offload.tso_seg_size;
        }

        if (offload.udp_csum) {
            head->ol_flags |= PKT_TX_UDP_CKSUM;
            head->l2_len = sizeof(struct ether_hdr);
            head->l3_len = sizeof(struct ipv4_hdr);
        }
    }

    ff_mbuf_free(m);

    return send_single_packet(head, ctx->port_id);
}

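/*
 * Per-process event loop: run due timers, drain buffered TX queues every
 * BURST_TX_DRAIN_US, service KNI and the ARP ring, poll the NIC RX queues
 * (prefetching ahead of packet processing), handle control messages, and
 * finally invoke the user-supplied loop callback.
 */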
static int
main_loop(void *arg)
{
    struct loop_routine *lr = (struct loop_routine *)arg;

    struct rte_mbuf *pkts_burst[MAX_PKT_BURST];
    unsigned lcore_id;
    uint64_t prev_tsc, diff_tsc, cur_tsc;
    int i, j, nb_rx;
    uint8_t port_id, queue_id;
    struct lcore_conf *qconf;
    const uint64_t drain_tsc = (rte_get_tsc_hz() + US_PER_S - 1) /
        US_PER_S * BURST_TX_DRAIN_US;
    struct ff_dpdk_if_context *ctx;

    prev_tsc = 0;

    lcore_id = rte_lcore_id();
    qconf = &lcore_conf;

    if (qconf->nb_rx_queue == 0) {
        printf("lcore %u has nothing to do\n", lcore_id);
        return 0;
    }

    while (1) {
        cur_tsc = rte_rdtsc();
        if (unlikely(freebsd_clock.expire < cur_tsc)) {
            rte_timer_manage();
        }

        /*
         * TX burst queue drain
         */
        diff_tsc = cur_tsc - prev_tsc;
        if (unlikely(diff_tsc > drain_tsc)) {
            /*
             * This could be optimized (use queueid instead of
             * portid), but it is not called so often
             */
            for (port_id = 0; port_id < RTE_MAX_ETHPORTS; port_id++) {
                if (qconf->tx_mbufs[port_id].len == 0)
                    continue;
                send_burst(qconf,
                    qconf->tx_mbufs[port_id].len,
                    port_id);
                qconf->tx_mbufs[port_id].len = 0;
            }

            prev_tsc = cur_tsc;
        }

        /*
         * Read packets from RX queues
         */
        for (i = 0; i < qconf->nb_rx_queue; ++i) {
            port_id = qconf->rx_queue_list[i].port_id;
            queue_id = qconf->rx_queue_list[i].queue_id;
            ctx = veth_ctx[port_id];

            if (enable_kni && rte_eal_process_type() == RTE_PROC_PRIMARY) {
                ff_kni_process(port_id, queue_id, pkts_burst, MAX_PKT_BURST);
            }

            process_arp_ring(port_id, queue_id, pkts_burst, ctx);

            nb_rx = rte_eth_rx_burst(port_id, queue_id, pkts_burst,
                MAX_PKT_BURST);
            if (nb_rx == 0)
                continue;

            /* Prefetch first packets */
            for (j = 0; j < PREFETCH_OFFSET && j < nb_rx; j++) {
                rte_prefetch0(rte_pktmbuf_mtod(
                        pkts_burst[j], void *));
            }

            /* Prefetch and handle already prefetched packets */
            for (j = 0; j < (nb_rx - PREFETCH_OFFSET); j++) {
                rte_prefetch0(rte_pktmbuf_mtod(pkts_burst[
                        j + PREFETCH_OFFSET], void *));
                process_packets(port_id, queue_id, &pkts_burst[j], 1, ctx, 0);
            }

            /* Handle remaining prefetched packets */
            for (; j < nb_rx; j++) {
                process_packets(port_id, queue_id, &pkts_burst[j], 1, ctx, 0);
            }
        }

        process_msg_ring(qconf->proc_id);

        if (likely(lr->loop != NULL)) {
            lr->loop(lr->arg);
        }
    }
}

int
ff_dpdk_if_up(void) {
    int nb_ports = ff_global_cfg.dpdk.nb_ports;
    int i;
    for (i = 0; i < nb_ports; i++) {
        uint8_t port_id = ff_global_cfg.dpdk.port_cfgs[i].port_id;
        veth_ctx[port_id] = ff_veth_attach(ff_global_cfg.dpdk.port_cfgs + i);
        if (veth_ctx[port_id] == NULL) {
            rte_exit(EXIT_FAILURE, "ff_veth_attach failed\n");
        }
    }

    return 0;
}

void
ff_dpdk_run(loop_func_t loop, void *arg) {
    struct loop_routine *lr = malloc(sizeof(struct loop_routine));
    if (lr == NULL) {
        rte_exit(EXIT_FAILURE, "malloc loop_routine failed\n");
    }
    lr->loop = loop;
    lr->arg = arg;
    rte_eal_mp_remote_launch(main_loop, lr, CALL_MASTER);
    rte_eal_mp_wait_lcore();
    free(lr);
}

void
ff_dpdk_pktmbuf_free(void *m)
{
    rte_pktmbuf_free((struct rte_mbuf *)m);
}

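/*
 * Software Toeplitz hash, matching the RSS hash the NIC computes: for each
 * set bit of the input, XOR in a 32-bit window of the key that slides one
 * bit per input bit.
 */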
static uint32_t
toeplitz_hash(unsigned keylen, const uint8_t *key,
    unsigned datalen, const uint8_t *data)
{
    uint32_t hash = 0, v;
    u_int i, b;

    /* XXXRW: Perhaps an assertion about key length vs. data length? */

    v = (key[0]<<24) + (key[1]<<16) + (key[2]<<8) + key[3];
    for (i = 0; i < datalen; i++) {
        for (b = 0; b < 8; b++) {
            if (data[i] & (1<<(7-b)))
                hash ^= v;
            v <<= 1;
            if ((i + 4) < keylen &&
                (key[i+4] & (1<<(7-b))))
                v |= 1;
        }
    }
    return (hash);
}

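/*
 * Check whether this process owns the given 4-tuple: hash it with the same
 * key the NIC was configured with and compare (hash % nb_procs) against our
 * proc_id, e.g. so a locally chosen source port lands on a queue this
 * process actually reads.
 */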
int
ff_rss_check(uint32_t saddr, uint32_t daddr, uint16_t sport, uint16_t dport)
{
    struct lcore_conf *qconf = &lcore_conf;

    if (qconf->nb_procs == 1) {
        return 1;
    }

    uint8_t data[sizeof(saddr) + sizeof(daddr) + sizeof(sport) +
        sizeof(dport)];

    unsigned datalen = 0;

    bcopy(&saddr, &data[datalen], sizeof(saddr));
    datalen += sizeof(saddr);

    bcopy(&daddr, &data[datalen], sizeof(daddr));
    datalen += sizeof(daddr);

    bcopy(&sport, &data[datalen], sizeof(sport));
    datalen += sizeof(sport);

    bcopy(&dport, &data[datalen], sizeof(dport));
    datalen += sizeof(dport);

    uint32_t hash = toeplitz_hash(sizeof(default_rsskey_40bytes),
        default_rsskey_40bytes, datalen, data);

    return (hash % qconf->nb_procs) == qconf->proc_id;
}