xref: /f-stack/lib/ff_dpdk_if.c (revision a02c88d6)
/*
 * Copyright (C) 2017 THL A29 Limited, a Tencent company.
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions are met:
 *
 * 1. Redistributions of source code must retain the above copyright notice, this
 *   list of conditions and the following disclaimer.
 * 2. Redistributions in binary form must reproduce the above copyright notice,
 *   this list of conditions and the following disclaimer in the documentation
 *   and/or other materials provided with the distribution.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
 * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
 * DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR
 * ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
 * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
 * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
 * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
 * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 *
 */

#include <errno.h>
#include <inttypes.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <strings.h>
#include <sys/types.h>

#include <rte_common.h>
#include <rte_byteorder.h>
#include <rte_log.h>
#include <rte_memory.h>
#include <rte_memcpy.h>
#include <rte_memzone.h>
#include <rte_config.h>
#include <rte_eal.h>
#include <rte_pci.h>
#include <rte_mbuf.h>
#include <rte_mempool.h>
#include <rte_ring.h>
#include <rte_lcore.h>
#include <rte_launch.h>
#include <rte_ethdev.h>
#include <rte_debug.h>
#include <rte_ether.h>
#include <rte_malloc.h>
#include <rte_cycles.h>
#include <rte_timer.h>
#include <rte_thash.h>
#include <rte_ip.h>
#include <rte_tcp.h>
#include <rte_udp.h>

#include "ff_dpdk_if.h"
#include "ff_dpdk_pcap.h"
#include "ff_dpdk_kni.h"
#include "ff_config.h"
#include "ff_veth.h"
#include "ff_host_interface.h"
#include "ff_msg.h"
#include "ff_api.h"

#define MEMPOOL_CACHE_SIZE 256

#define ARP_RING_SIZE 2048

#define MSG_RING_SIZE 32

/*
 * Configurable number of RX/TX ring descriptors
 */
#define RX_QUEUE_SIZE 512
#define TX_QUEUE_SIZE 256

#define MAX_PKT_BURST 32
#define BURST_TX_DRAIN_US 100 /* TX drain every ~100us */

/*
 * Try to avoid TX buffering if we have at least MAX_TX_BURST packets to send.
 */
#define MAX_TX_BURST    (MAX_PKT_BURST / 2)

#define NB_SOCKETS 8

/* How many packets to prefetch ahead when reading packets */
#define PREFETCH_OFFSET    3

#define MAX_RX_QUEUE_PER_LCORE 16
#define MAX_TX_QUEUE_PER_PORT RTE_MAX_ETHPORTS
#define MAX_RX_QUEUE_PER_PORT 128

#define KNI_MBUF_MAX 2048
#define KNI_QUEUE_SIZE 2048

static int enable_kni;
static int kni_accept;

static struct rte_timer freebsd_clock;

/* RSS hash key, taken from the Mellanox Linux driver. */
static uint8_t default_rsskey_40bytes[40] = {
    0xd1, 0x81, 0xc6, 0x2c, 0xf7, 0xf4, 0xdb, 0x5b,
    0x19, 0x83, 0xa2, 0xfc, 0x94, 0x3e, 0x1a, 0xdb,
    0xd9, 0x38, 0x9e, 0x6b, 0xd1, 0x03, 0x9c, 0x2c,
    0xa7, 0x44, 0x99, 0xad, 0x59, 0x3d, 0x56, 0xd9,
    0xf3, 0x25, 0x3c, 0x06, 0x2a, 0xdc, 0x1f, 0xfc
};

static struct rte_eth_conf default_port_conf = {
    .rxmode = {
        .mq_mode = ETH_MQ_RX_RSS,
        .max_rx_pkt_len = ETHER_MAX_LEN,
        .split_hdr_size = 0, /**< Header buffer size */
        .header_split   = 0, /**< Header Split disabled */
        .hw_ip_checksum = 0, /**< IP checksum offload disabled */
        .hw_vlan_filter = 0, /**< VLAN filtering disabled */
        .hw_vlan_strip  = 0, /**< VLAN strip disabled */
        .hw_vlan_extend = 0, /**< Extended VLAN disabled */
        .jumbo_frame    = 0, /**< Jumbo frame support disabled */
        .hw_strip_crc   = 0, /**< CRC stripping by hardware disabled */
        .enable_lro     = 0, /**< LRO disabled */
    },
    .rx_adv_conf = {
        .rss_conf = {
            .rss_key = default_rsskey_40bytes,
            .rss_key_len = 40,
            .rss_hf = ETH_RSS_PROTO_MASK,
        },
    },
    .txmode = {
        .mq_mode = ETH_MQ_TX_NONE,
    },
};

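/*
 * Per-port TX buffer: outgoing packets are staged here and flushed in
 * bursts by send_burst(), either when the table fills up or on the
 * periodic TX drain in main_loop().
 */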
struct mbuf_table {
    uint16_t len;
    struct rte_mbuf *m_table[MAX_PKT_BURST];
};

struct lcore_rx_queue {
    uint8_t port_id;
    uint8_t queue_id;
} __rte_cache_aligned;

struct lcore_conf {
    uint16_t proc_id;
    uint16_t nb_procs;
    uint16_t socket_id;
    uint16_t nb_rx_queue;
    uint16_t *proc_lcore;
    struct lcore_rx_queue rx_queue_list[MAX_RX_QUEUE_PER_LCORE];
    uint16_t tx_queue_id[RTE_MAX_ETHPORTS];
    struct mbuf_table tx_mbufs[RTE_MAX_ETHPORTS];
    char *pcap[RTE_MAX_ETHPORTS];
} __rte_cache_aligned;

static struct lcore_conf lcore_conf;

static struct rte_mempool *pktmbuf_pool[NB_SOCKETS];

static struct rte_ring **arp_ring[RTE_MAX_LCORE];

struct ff_msg_ring {
    char ring_name[2][RTE_RING_NAMESIZE];
    /* ring[0]: messages sent by other processes, received by the lcore */
    /* ring[1]: messages sent by the lcore, read by other processes */
    struct rte_ring *ring[2];
} __rte_cache_aligned;

static struct ff_msg_ring msg_ring[RTE_MAX_LCORE];
static struct rte_mempool *message_pool;

struct ff_dpdk_if_context {
    void *sc;
    void *ifp;
    uint16_t port_id;
    struct ff_hw_features hw_features;
} __rte_cache_aligned;

static struct ff_dpdk_if_context *veth_ctx[RTE_MAX_ETHPORTS];

extern void ff_hardclock(void);

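/*
 * Periodic timer callback: drive the FreeBSD hardclock tick and refresh
 * the stack's cached timestamp.
 */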
static void
ff_hardclock_job(__rte_unused struct rte_timer *timer,
    __rte_unused void *arg) {
    ff_hardclock();
    ff_update_current_ts();
}

struct ff_dpdk_if_context *
ff_dpdk_register_if(void *sc, void *ifp, struct ff_port_cfg *cfg)
{
    struct ff_dpdk_if_context *ctx;

    ctx = calloc(1, sizeof(struct ff_dpdk_if_context));
    if (ctx == NULL)
        return NULL;

    ctx->sc = sc;
    ctx->ifp = ifp;
    ctx->port_id = cfg->port_id;
    ctx->hw_features = cfg->hw_features;

    return ctx;
}

void
ff_dpdk_deregister_if(struct ff_dpdk_if_context *ctx)
{
    free(ctx);
}

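/*
 * Poll the link state of all configured ports for up to 9 seconds
 * (90 rounds of 100 ms), printing a dot per round until every link is
 * up or the timeout expires, then print the final status of each port.
 */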
static void
check_all_ports_link_status(void)
{
    #define CHECK_INTERVAL 100 /* 100ms */
    #define MAX_CHECK_TIME 90  /* 9s (90 * 100ms) in total */

    uint8_t count, all_ports_up, print_flag = 0;
    struct rte_eth_link link;

    printf("\nChecking link status");
    fflush(stdout);

    int i, nb_ports;
    nb_ports = ff_global_cfg.dpdk.nb_ports;
    for (count = 0; count <= MAX_CHECK_TIME; count++) {
        all_ports_up = 1;
        for (i = 0; i < nb_ports; i++) {
            uint8_t portid = ff_global_cfg.dpdk.port_cfgs[i].port_id;
            memset(&link, 0, sizeof(link));
            rte_eth_link_get_nowait(portid, &link);

            /* print link status if flag set */
            if (print_flag == 1) {
                if (link.link_status) {
                    printf("Port %d Link Up - speed %u "
                        "Mbps - %s\n", (int)portid,
                        (unsigned)link.link_speed,
                        (link.link_duplex == ETH_LINK_FULL_DUPLEX) ?
                        ("full-duplex") : ("half-duplex"));
                } else {
                    printf("Port %d Link Down\n", (int)portid);
                }
                continue;
            }
            /* clear all_ports_up flag if any link is down */
            if (link.link_status == 0) {
                all_ports_up = 0;
                break;
            }
        }

        /* after finally printing all link status, get out */
        if (print_flag == 1)
            break;

        if (all_ports_up == 0) {
            printf(".");
            fflush(stdout);
            rte_delay_ms(CHECK_INTERVAL);
        }

        /* set the print_flag if all ports are up or the timeout expired */
        if (all_ports_up == 1 || count == (MAX_CHECK_TIME - 1)) {
            print_flag = 1;
            printf("done\n");
        }
    }
}

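/*
 * Build this process's lcore_conf from the global config: validate the
 * proc-to-lcore mapping and bind proc_id to one RX/TX queue per enabled
 * port (proc id maps 1:1 to queue id).
 */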
static int
init_lcore_conf(void)
{
    uint8_t nb_ports = rte_eth_dev_count();
    if (nb_ports == 0) {
        rte_exit(EXIT_FAILURE, "No probed ethernet devices\n");
    }

    lcore_conf.proc_id = ff_global_cfg.dpdk.proc_id;
    lcore_conf.nb_procs = ff_global_cfg.dpdk.nb_procs;

    lcore_conf.proc_lcore = rte_zmalloc(NULL,
        sizeof(uint16_t) * lcore_conf.nb_procs, 0);
    if (lcore_conf.proc_lcore == NULL) {
        rte_exit(EXIT_FAILURE, "rte_zmalloc proc_lcore failed\n");
    }
    rte_memcpy(lcore_conf.proc_lcore, ff_global_cfg.dpdk.proc_lcore,
        sizeof(uint16_t) * lcore_conf.nb_procs);
    uint16_t proc_id;
    for (proc_id = 0; proc_id < lcore_conf.nb_procs; proc_id++) {
        uint16_t lcore_id = lcore_conf.proc_lcore[proc_id];
        if (!lcore_config[lcore_id].detected) {
            rte_exit(EXIT_FAILURE, "lcore %u unavailable\n", lcore_id);
        }
    }

    uint16_t socket_id = 0;
    if (ff_global_cfg.dpdk.numa_on) {
        socket_id = rte_lcore_to_socket_id(rte_lcore_id());
    }

    lcore_conf.socket_id = socket_id;

    /* Currently, proc id maps 1:1 to rx/tx queue id per port. */
    uint8_t port_id, enabled_ports = 0;
    for (port_id = 0; port_id < nb_ports; port_id++) {
        if (ff_global_cfg.dpdk.port_mask &&
            (ff_global_cfg.dpdk.port_mask & (1 << port_id)) == 0) {
            printf("\nSkipping disabled port %d\n", port_id);
            continue;
        }

        if (port_id >= ff_global_cfg.dpdk.nb_ports) {
            printf("\nSkipping non-configured port %d\n", port_id);
            break;
        }

        uint16_t nb_rx_queue = lcore_conf.nb_rx_queue;
        lcore_conf.rx_queue_list[nb_rx_queue].port_id = port_id;
        lcore_conf.rx_queue_list[nb_rx_queue].queue_id = lcore_conf.proc_id;
        lcore_conf.nb_rx_queue++;

        lcore_conf.tx_queue_id[port_id] = lcore_conf.proc_id;
        lcore_conf.pcap[port_id] = ff_global_cfg.dpdk.port_cfgs[enabled_ports].pcap;

        ff_global_cfg.dpdk.port_cfgs[enabled_ports].port_id = port_id;

        enabled_ports++;
    }

    ff_global_cfg.dpdk.nb_ports = enabled_ports;

    return 0;
}

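/*
 * Size the mbuf pool for the worst case: packets sitting in every RX
 * ring, in-flight TX bursts, packets queued in every TX ring, per-lcore
 * mempool caches, KNI queues, and clones parked in the ARP rings.
 */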
static int
init_mem_pool(void)
{
    uint8_t nb_ports = ff_global_cfg.dpdk.nb_ports;
    uint32_t nb_lcores = ff_global_cfg.dpdk.nb_procs;
    uint32_t nb_tx_queue = nb_lcores;
    uint32_t nb_rx_queue = lcore_conf.nb_rx_queue * nb_lcores;

    unsigned nb_mbuf = RTE_MAX(
        (nb_rx_queue*RX_QUEUE_SIZE          +
        nb_ports*nb_lcores*MAX_PKT_BURST    +
        nb_ports*nb_tx_queue*TX_QUEUE_SIZE  +
        nb_lcores*MEMPOOL_CACHE_SIZE        +
        nb_ports*KNI_MBUF_MAX               +
        nb_ports*KNI_QUEUE_SIZE             +
        nb_lcores*nb_ports*ARP_RING_SIZE),
        (unsigned)8192);

    unsigned socketid = 0;
    uint16_t i, lcore_id;
    char s[64];
    int numa_on = ff_global_cfg.dpdk.numa_on;

    for (i = 0; i < lcore_conf.nb_procs; i++) {
        lcore_id = lcore_conf.proc_lcore[i];
        if (numa_on) {
            socketid = rte_lcore_to_socket_id(lcore_id);
        }

        if (socketid >= NB_SOCKETS) {
            rte_exit(EXIT_FAILURE, "Socket %u of lcore %u is out of range %d\n",
                socketid, lcore_id, NB_SOCKETS);
        }

        if (pktmbuf_pool[socketid] != NULL) {
            continue;
        }

        snprintf(s, sizeof(s), "mbuf_pool_%d", socketid);
        if (rte_eal_process_type() == RTE_PROC_PRIMARY) {
            pktmbuf_pool[socketid] =
                rte_pktmbuf_pool_create(s, nb_mbuf,
                    MEMPOOL_CACHE_SIZE, 0,
                    RTE_MBUF_DEFAULT_BUF_SIZE, socketid);
        } else {
            pktmbuf_pool[socketid] = rte_mempool_lookup(s);
        }

        if (pktmbuf_pool[socketid] == NULL) {
            rte_exit(EXIT_FAILURE, "Cannot create mbuf pool on socket %d\n", socketid);
        } else {
            printf("created mbuf pool on socket %d\n", socketid);
        }
    }

    return 0;
}

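/*
 * Create a ring in the primary process, or attach to an existing one by
 * name (in secondary processes, or if this name was already created).
 */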
static struct rte_ring *
create_ring(const char *name, unsigned count, int socket_id, unsigned flags)
{
    struct rte_ring *ring;

    if (name == NULL)
        return NULL;

    /* If the ring has already been created, just attach to it. */
    if (likely((ring = rte_ring_lookup(name)) != NULL))
        return ring;

    if (rte_eal_process_type() == RTE_PROC_PRIMARY) {
        return rte_ring_create(name, count, socket_id, flags);
    } else {
        return rte_ring_lookup(name);
    }
}

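/*
 * ARP frames are steered by RSS to a single queue, so each
 * (process, port) pair gets its own arp ring; process_packets() clones
 * received ARP frames into the other processes' rings so that every
 * stack instance can learn the neighbor entry.
 */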
static int
init_arp_ring(void)
{
    int i, j;
    char name_buf[RTE_RING_NAMESIZE];
    int nb_procs = ff_global_cfg.dpdk.nb_procs;
    int proc_id = ff_global_cfg.dpdk.proc_id;

    /* Allocate arp ring pointers according to the eth device count. */
    int nb_ports = rte_eth_dev_count();
    for (i = 0; i < nb_procs; ++i) {
        snprintf(name_buf, RTE_RING_NAMESIZE, "ring_ptr_%d_%d",
            proc_id, i);

        arp_ring[i] = rte_zmalloc(name_buf,
            sizeof(struct rte_ring *) * nb_ports,
            RTE_CACHE_LINE_SIZE);
        if (arp_ring[i] == NULL) {
            rte_exit(EXIT_FAILURE, "rte_zmalloc(%s (struct rte_ring*)) "
                "failed\n", name_buf);
        }
    }

    unsigned socketid = lcore_conf.socket_id;

    /* Create rings according to the ports actually in use. */
    nb_ports = ff_global_cfg.dpdk.nb_ports;
    for (j = 0; j < nb_ports; j++) {
        uint8_t port_id = ff_global_cfg.dpdk.port_cfgs[j].port_id;

        for (i = 0; i < nb_procs; ++i) {
            snprintf(name_buf, RTE_RING_NAMESIZE, "arp_ring_%d_%d", i, port_id);
            arp_ring[i][port_id] = create_ring(name_buf, ARP_RING_SIZE,
                socketid, RING_F_SC_DEQ);

            if (arp_ring[i][port_id] == NULL)
                rte_panic("create ring:%s failed!\n", name_buf);

            printf("create ring:%s success, %u ring entries are now free!\n",
                name_buf, rte_ring_free_count(arp_ring[i][port_id]));
        }
    }

    return 0;
}

static void
ff_msg_init(struct rte_mempool *mp,
    __attribute__((unused)) void *opaque_arg,
    void *obj, __attribute__((unused)) unsigned i)
{
    struct ff_msg *msg = (struct ff_msg *)obj;
    msg->msg_type = FF_UNKNOWN;
    msg->buf_addr = (char *)msg + sizeof(struct ff_msg);
    msg->buf_len = mp->elt_size - sizeof(struct ff_msg);
}

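/*
 * Each process gets a pair of message rings: ring[0] carries requests
 * in (sysctl/ioctl/route from control tools), ring[1] carries results
 * back. Message buffers come from a shared mempool.
 */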
static int
init_msg_ring(void)
{
    uint16_t i;
    uint16_t nb_procs = ff_global_cfg.dpdk.nb_procs;
    unsigned socketid = lcore_conf.socket_id;

    /* Create message buffer pool */
    if (rte_eal_process_type() == RTE_PROC_PRIMARY) {
        message_pool = rte_mempool_create(FF_MSG_POOL,
           MSG_RING_SIZE * 2 * nb_procs,
           MAX_MSG_BUF_SIZE, MSG_RING_SIZE / 2, 0,
           NULL, NULL, ff_msg_init, NULL,
           socketid, 0);
    } else {
        message_pool = rte_mempool_lookup(FF_MSG_POOL);
    }

    if (message_pool == NULL) {
        rte_panic("Create msg mempool failed\n");
    }

    for (i = 0; i < nb_procs; ++i) {
        snprintf(msg_ring[i].ring_name[0], RTE_RING_NAMESIZE,
            "%s%u", FF_MSG_RING_IN, i);
        snprintf(msg_ring[i].ring_name[1], RTE_RING_NAMESIZE,
            "%s%u", FF_MSG_RING_OUT, i);

        msg_ring[i].ring[0] = create_ring(msg_ring[i].ring_name[0],
            MSG_RING_SIZE, socketid, RING_F_SP_ENQ | RING_F_SC_DEQ);
        if (msg_ring[i].ring[0] == NULL)
            rte_panic("create ring:%s failed!\n", msg_ring[i].ring_name[0]);

        msg_ring[i].ring[1] = create_ring(msg_ring[i].ring_name[1],
            MSG_RING_SIZE, socketid, RING_F_SP_ENQ | RING_F_SC_DEQ);
        if (msg_ring[i].ring[1] == NULL)
            rte_panic("create ring:%s failed!\n", msg_ring[i].ring_name[1]);
    }

    return 0;
}

static int
init_kni(void)
{
    int nb_ports = rte_eth_dev_count();
    kni_accept = 0;
    if (strcasecmp(ff_global_cfg.kni.method, "accept") == 0)
        kni_accept = 1;

    ff_kni_init(nb_ports, ff_global_cfg.kni.tcp_port,
        ff_global_cfg.kni.udp_port);

    unsigned socket_id = lcore_conf.socket_id;
    struct rte_mempool *mbuf_pool = pktmbuf_pool[socket_id];

    nb_ports = ff_global_cfg.dpdk.nb_ports;
    int i;
    for (i = 0; i < nb_ports; i++) {
        uint8_t port_id = ff_global_cfg.dpdk.port_cfgs[i].port_id;
        ff_kni_alloc(port_id, socket_id, mbuf_pool, KNI_QUEUE_SIZE);
    }

    return 0;
}

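/*
 * Probe each port's capabilities, disable TX offloads the hardware
 * lacks, record the RX/TX offloads it does support in hw_features,
 * then (primary process only) configure the queues and start the port.
 */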
static int
init_port_start(void)
{
    int nb_ports = ff_global_cfg.dpdk.nb_ports;
    uint16_t nb_procs = ff_global_cfg.dpdk.nb_procs;
    unsigned socketid = rte_lcore_to_socket_id(rte_lcore_id());
    struct rte_mempool *mbuf_pool = pktmbuf_pool[socketid];
    uint16_t i;

    for (i = 0; i < nb_ports; i++) {
        uint8_t port_id = ff_global_cfg.dpdk.port_cfgs[i].port_id;

        struct rte_eth_dev_info dev_info;
        rte_eth_dev_info_get(port_id, &dev_info);

        if (nb_procs > dev_info.max_rx_queues) {
            rte_exit(EXIT_FAILURE, "num_procs[%d] is greater than max_rx_queues[%d]\n",
                nb_procs,
                dev_info.max_rx_queues);
        }

        if (nb_procs > dev_info.max_tx_queues) {
            rte_exit(EXIT_FAILURE, "num_procs[%d] is greater than max_tx_queues[%d]\n",
                nb_procs,
                dev_info.max_tx_queues);
        }

        struct ether_addr addr;
        rte_eth_macaddr_get(port_id, &addr);
        printf("Port %u MAC: %02" PRIx8 " %02" PRIx8 " %02" PRIx8
                   " %02" PRIx8 " %02" PRIx8 " %02" PRIx8 "\n",
                (unsigned)port_id,
                addr.addr_bytes[0], addr.addr_bytes[1],
                addr.addr_bytes[2], addr.addr_bytes[3],
                addr.addr_bytes[4], addr.addr_bytes[5]);

        rte_memcpy(ff_global_cfg.dpdk.port_cfgs[i].mac,
            addr.addr_bytes, ETHER_ADDR_LEN);

        /* Clear txq_flags - we do not need multi-mempool and refcnt */
        dev_info.default_txconf.txq_flags = ETH_TXQ_FLAGS_NOMULTMEMP |
            ETH_TXQ_FLAGS_NOREFCOUNT;

        /* Disable features that are not supported by the port's HW */
        if (!(dev_info.tx_offload_capa & DEV_TX_OFFLOAD_UDP_CKSUM)) {
            dev_info.default_txconf.txq_flags |= ETH_TXQ_FLAGS_NOXSUMUDP;
        }

        if (!(dev_info.tx_offload_capa & DEV_TX_OFFLOAD_TCP_CKSUM)) {
            dev_info.default_txconf.txq_flags |= ETH_TXQ_FLAGS_NOXSUMTCP;
        }

        if (!(dev_info.tx_offload_capa & DEV_TX_OFFLOAD_SCTP_CKSUM)) {
            dev_info.default_txconf.txq_flags |= ETH_TXQ_FLAGS_NOXSUMSCTP;
        }

        if (!(dev_info.tx_offload_capa & DEV_TX_OFFLOAD_VLAN_INSERT)) {
            dev_info.default_txconf.txq_flags |= ETH_TXQ_FLAGS_NOVLANOFFL;
        }

        if (!(dev_info.tx_offload_capa & DEV_TX_OFFLOAD_TCP_TSO) &&
            !(dev_info.tx_offload_capa & DEV_TX_OFFLOAD_UDP_TSO)) {
            dev_info.default_txconf.txq_flags |= ETH_TXQ_FLAGS_NOMULTSEGS;
        }

        struct rte_eth_conf port_conf = {0};

        /* Set RSS mode */
        port_conf.rxmode.mq_mode = ETH_MQ_RX_RSS;
        port_conf.rx_adv_conf.rss_conf.rss_hf = ETH_RSS_PROTO_MASK;
        port_conf.rx_adv_conf.rss_conf.rss_key = default_rsskey_40bytes;
        port_conf.rx_adv_conf.rss_conf.rss_key_len = 40;

        /* Set Rx VLAN stripping */
        if (dev_info.rx_offload_capa & DEV_RX_OFFLOAD_VLAN_STRIP) {
            port_conf.rxmode.hw_vlan_strip = 1;
        }

        /* Enable HW CRC stripping */
        port_conf.rxmode.hw_strip_crc = 1;

        /* FIXME: enable TCP LRO? */
        #if 0
        if (dev_info.rx_offload_capa & DEV_RX_OFFLOAD_TCP_LRO) {
            printf("LRO is supported\n");
            port_conf.rxmode.enable_lro = 1;
            ff_global_cfg.dpdk.port_cfgs[i].hw_features.rx_lro = 1;
        }
        #endif

        /* Set Rx checksum checking */
        if ((dev_info.rx_offload_capa & DEV_RX_OFFLOAD_IPV4_CKSUM) &&
            (dev_info.rx_offload_capa & DEV_RX_OFFLOAD_UDP_CKSUM) &&
            (dev_info.rx_offload_capa & DEV_RX_OFFLOAD_TCP_CKSUM)) {
            printf("RX checksum offload supported\n");
            port_conf.rxmode.hw_ip_checksum = 1;
            ff_global_cfg.dpdk.port_cfgs[i].hw_features.rx_csum = 1;
        }

        if ((dev_info.tx_offload_capa & DEV_TX_OFFLOAD_IPV4_CKSUM)) {
            printf("TX ip checksum offload supported\n");
            ff_global_cfg.dpdk.port_cfgs[i].hw_features.tx_csum_ip = 1;
        }

        if ((dev_info.tx_offload_capa & DEV_TX_OFFLOAD_UDP_CKSUM) &&
            (dev_info.tx_offload_capa & DEV_TX_OFFLOAD_TCP_CKSUM)) {
            printf("TX TCP&UDP checksum offload supported\n");
            ff_global_cfg.dpdk.port_cfgs[i].hw_features.tx_csum_l4 = 1;
        }

        if (ff_global_cfg.dpdk.tso) {
            if (dev_info.tx_offload_capa & DEV_TX_OFFLOAD_TCP_TSO) {
                printf("TSO is supported\n");
                ff_global_cfg.dpdk.port_cfgs[i].hw_features.tx_tso = 1;
            }
        } else {
            printf("TSO is disabled\n");
        }

        if (rte_eal_process_type() != RTE_PROC_PRIMARY) {
            continue;
        }

        /* Currently, proc id maps 1:1 to queue id per port. */
        int ret = rte_eth_dev_configure(port_id, nb_procs, nb_procs, &port_conf);
        if (ret != 0) {
            return ret;
        }

        uint16_t q;
        for (q = 0; q < nb_procs; q++) {
            ret = rte_eth_tx_queue_setup(port_id, q, TX_QUEUE_SIZE,
                socketid, &dev_info.default_txconf);
            if (ret < 0) {
                return ret;
            }

            ret = rte_eth_rx_queue_setup(port_id, q, RX_QUEUE_SIZE,
                socketid, &dev_info.default_rxconf, mbuf_pool);
            if (ret < 0) {
                return ret;
            }
        }

        ret = rte_eth_dev_start(port_id);
        if (ret < 0) {
            return ret;
        }

        /* Enable RX in promiscuous mode for the Ethernet device. */
        if (ff_global_cfg.dpdk.promiscuous) {
            rte_eth_promiscuous_enable(port_id);
            ret = rte_eth_promiscuous_get(port_id);
            if (ret == 1) {
                printf("set port %u to promiscuous mode ok\n", port_id);
            } else {
                printf("set port %u to promiscuous mode error\n", port_id);
            }
        }

        /* Enable pcap dump */
        if (ff_global_cfg.dpdk.port_cfgs[i].pcap) {
            ff_enable_pcap(ff_global_cfg.dpdk.port_cfgs[i].pcap);
        }
    }

    if (rte_eal_process_type() == RTE_PROC_PRIMARY) {
        check_all_ports_link_status();
    }

    return 0;
}

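/*
 * Arm a periodic DPDK timer that fires at FreeBSD's configured hz.
 * Example: with freebsd.hz = 100, the interval is 1000/100 = 10 ms,
 * converted to TSC cycles as ceil(tsc_hz / 1000) * 10.
 */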
static int
init_clock(void)
{
    rte_timer_subsystem_init();
    uint64_t hz = rte_get_timer_hz();
    uint64_t intrs = MS_PER_S / ff_global_cfg.freebsd.hz;
    uint64_t tsc = (hz + MS_PER_S - 1) / MS_PER_S * intrs;

    rte_timer_init(&freebsd_clock);
    rte_timer_reset(&freebsd_clock, tsc, PERIODICAL,
        rte_lcore_id(), &ff_hardclock_job, NULL);

    ff_update_current_ts();

    return 0;
}

int
ff_dpdk_init(int argc, char **argv)
{
    if (ff_global_cfg.dpdk.nb_procs < 1 ||
        ff_global_cfg.dpdk.nb_procs > RTE_MAX_LCORE ||
        ff_global_cfg.dpdk.proc_id >= ff_global_cfg.dpdk.nb_procs ||
        ff_global_cfg.dpdk.proc_id < 0) {
        printf("param num_procs[%d] or proc_id[%d] error!\n",
            ff_global_cfg.dpdk.nb_procs,
            ff_global_cfg.dpdk.proc_id);
        exit(1);
    }

    int ret = rte_eal_init(argc, argv);
    if (ret < 0) {
        rte_exit(EXIT_FAILURE, "Error with EAL initialization\n");
    }

    init_lcore_conf();

    init_mem_pool();

    init_arp_ring();

    init_msg_ring();

    enable_kni = ff_global_cfg.kni.enable;
    if (enable_kni) {
        init_kni();
    }

    ret = init_port_start();
    if (ret < 0) {
        rte_exit(EXIT_FAILURE, "init_port_start failed\n");
    }

    init_clock();

    return 0;
}

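/*
 * Hand a received mbuf (possibly a multi-segment chain) to the FreeBSD
 * stack: wrap the first segment via ff_mbuf_gethdr() and chain the
 * remaining segments with ff_mbuf_get().
 */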
static void
ff_veth_input(const struct ff_dpdk_if_context *ctx, struct rte_mbuf *pkt)
{
    uint8_t rx_csum = ctx->hw_features.rx_csum;
    if (rx_csum) {
        if (pkt->ol_flags & (PKT_RX_IP_CKSUM_BAD | PKT_RX_L4_CKSUM_BAD)) {
            rte_pktmbuf_free(pkt);
            return;
        }
    }

    /*
     * FIXME: should we save pkt->vlan_tci
     * if (pkt->ol_flags & PKT_RX_VLAN_PKT)
     */

    void *data = rte_pktmbuf_mtod(pkt, void*);
    uint16_t len = rte_pktmbuf_data_len(pkt);

    void *hdr = ff_mbuf_gethdr(pkt, pkt->pkt_len, data, len, rx_csum);
    if (hdr == NULL) {
        rte_pktmbuf_free(pkt);
        return;
    }

    struct rte_mbuf *pn = pkt->next;
    void *prev = hdr;
    while (pn != NULL) {
        data = rte_pktmbuf_mtod(pn, void*);
        len = rte_pktmbuf_data_len(pn);

        void *mb = ff_mbuf_get(prev, data, len);
        if (mb == NULL) {
            ff_mbuf_free(hdr);
            rte_pktmbuf_free(pkt);
            return;
        }
        pn = pn->next;
        prev = mb;
    }

    ff_veth_process_packet(ctx->ifp, hdr);
}

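/*
 * Classify an incoming frame: ARP is always handed to the stack (and
 * broadcast to the other processes); when KNI is enabled, IPv4 frames
 * may be diverted to the kernel via the KNI protocol filter.
 */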
static enum FilterReturn
protocol_filter(const void *data, uint16_t len)
{
    if (len < sizeof(struct ether_hdr))
        return FILTER_UNKNOWN;

    const struct ether_hdr *hdr;
    hdr = (const struct ether_hdr *)data;

    if (ntohs(hdr->ether_type) == ETHER_TYPE_ARP)
        return FILTER_ARP;

    if (!enable_kni) {
        return FILTER_UNKNOWN;
    }

    if (ntohs(hdr->ether_type) != ETHER_TYPE_IPv4)
        return FILTER_UNKNOWN;

    return ff_kni_proto_filter(data + sizeof(struct ether_hdr),
        len - sizeof(struct ether_hdr));
}

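/*
 * Dispatch a burst: ARP frames are cloned to every other process's arp
 * ring (unless they already came from a ring) and to KNI, then consumed
 * locally; other frames go to KNI or the stack depending on the filter
 * verdict and the kni_accept policy.
 */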
static inline void
process_packets(uint8_t port_id, uint16_t queue_id, struct rte_mbuf **bufs,
    uint16_t count, const struct ff_dpdk_if_context *ctx, int pkts_from_ring)
{
    struct lcore_conf *qconf = &lcore_conf;

    uint16_t i;
    for (i = 0; i < count; i++) {
        struct rte_mbuf *rtem = bufs[i];

        if (unlikely(qconf->pcap[port_id] != NULL)) {
            ff_dump_packets(qconf->pcap[port_id], rtem);
        }

        void *data = rte_pktmbuf_mtod(rtem, void*);
        uint16_t len = rte_pktmbuf_data_len(rtem);

        enum FilterReturn filter = protocol_filter(data, len);
        if (filter == FILTER_ARP) {
            struct rte_mempool *mbuf_pool;
            struct rte_mbuf *mbuf_clone;
            if (pkts_from_ring == 0) {
                uint16_t j;
                for (j = 0; j < qconf->nb_procs; ++j) {
                    if (j == queue_id)
                        continue;

                    mbuf_pool = pktmbuf_pool[rte_lcore_to_socket_id(qconf->proc_lcore[j])];
                    mbuf_clone = rte_pktmbuf_clone(rtem, mbuf_pool);
                    if (mbuf_clone) {
                        int ret = rte_ring_enqueue(arp_ring[j][port_id], mbuf_clone);
                        if (ret < 0)
                            rte_pktmbuf_free(mbuf_clone);
                    }
                }
            }

            if (enable_kni && rte_eal_process_type() == RTE_PROC_PRIMARY) {
                mbuf_pool = pktmbuf_pool[qconf->socket_id];
                mbuf_clone = rte_pktmbuf_clone(rtem, mbuf_pool);
                if (mbuf_clone) {
                    ff_kni_enqueue(port_id, mbuf_clone);
                }
            }

            ff_veth_input(ctx, rtem);
        } else if (enable_kni && ((filter == FILTER_KNI && kni_accept) ||
            (filter == FILTER_UNKNOWN && !kni_accept))) {
            ff_kni_enqueue(port_id, rtem);
        } else {
            ff_veth_input(ctx, rtem);
        }
    }
}

static inline int
process_arp_ring(uint8_t port_id, uint16_t queue_id,
    struct rte_mbuf **pkts_burst, const struct ff_dpdk_if_context *ctx)
{
    /* Read packets from the ring buffer and process them. */
    uint16_t nb_rb;
    nb_rb = rte_ring_dequeue_burst(arp_ring[queue_id][port_id],
        (void **)pkts_burst, MAX_PKT_BURST);

    if (nb_rb > 0) {
        process_packets(port_id, queue_id, pkts_burst, nb_rb, ctx, 1);
    }

    return 0;
}

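/*
 * Control-plane message handlers: each executes a request in the
 * stack's context, stores the outcome in msg->result, and returns the
 * message on this process's outbound ring (ring[1]).
 */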
static inline void
handle_sysctl_msg(struct ff_msg *msg, uint16_t proc_id)
{
    int ret = ff_sysctl(msg->sysctl.name, msg->sysctl.namelen,
        msg->sysctl.old, msg->sysctl.oldlenp, msg->sysctl.new,
        msg->sysctl.newlen);

    if (ret < 0) {
        msg->result = errno;
    } else {
        msg->result = 0;
    }

    rte_ring_enqueue(msg_ring[proc_id].ring[1], msg);
}

static inline void
handle_ioctl_msg(struct ff_msg *msg, uint16_t proc_id)
{
    int fd, ret;
    fd = ff_socket(AF_INET, SOCK_DGRAM, 0);
    if (fd < 0) {
        ret = -1;
        goto done;
    }

    ret = ff_ioctl(fd, msg->ioctl.cmd, msg->ioctl.data);

    ff_close(fd);

done:
    if (ret < 0) {
        msg->result = errno;
    } else {
        msg->result = 0;
    }

    rte_ring_enqueue(msg_ring[proc_id].ring[1], msg);
}

static inline void
handle_route_msg(struct ff_msg *msg, uint16_t proc_id)
{
    msg->result = ff_rtioctl(msg->route.fib, msg->route.data,
        &msg->route.len, msg->route.maxlen);

    rte_ring_enqueue(msg_ring[proc_id].ring[1], msg);
}

static inline void
handle_default_msg(struct ff_msg *msg, uint16_t proc_id)
{
    msg->result = EINVAL;
    rte_ring_enqueue(msg_ring[proc_id].ring[1], msg);
}

static inline void
handle_msg(struct ff_msg *msg, uint16_t proc_id)
{
    switch (msg->msg_type) {
        case FF_SYSCTL:
            handle_sysctl_msg(msg, proc_id);
            break;
        case FF_IOCTL:
            handle_ioctl_msg(msg, proc_id);
            break;
        case FF_ROUTE:
            handle_route_msg(msg, proc_id);
            break;
        default:
            handle_default_msg(msg, proc_id);
            break;
    }
}

static inline int
process_msg_ring(uint16_t proc_id)
{
    void *msg;
    int ret = rte_ring_dequeue(msg_ring[proc_id].ring[0], &msg);

    if (unlikely(ret == 0)) {
        handle_msg((struct ff_msg *)msg, proc_id);
    }

    return 0;
}

/* Send burst of packets on an output interface */
static inline int
send_burst(struct lcore_conf *qconf, uint16_t n, uint8_t port)
{
    struct rte_mbuf **m_table;
    int ret;
    uint16_t queueid;

    queueid = qconf->tx_queue_id[port];
    m_table = (struct rte_mbuf **)qconf->tx_mbufs[port].m_table;

    if (unlikely(qconf->pcap[port] != NULL)) {
        uint16_t i;
        for (i = 0; i < n; i++) {
            ff_dump_packets(qconf->pcap[port], m_table[i]);
        }
    }

    ret = rte_eth_tx_burst(port, queueid, m_table, n);
    if (unlikely(ret < n)) {
        /* Free the packets the NIC did not accept. */
        do {
            rte_pktmbuf_free(m_table[ret]);
        } while (++ret < n);
    }

    return 0;
}

/* Enqueue a single packet, and send burst if queue is filled */
static inline int
send_single_packet(struct rte_mbuf *m, uint8_t port)
{
    uint16_t len;
    struct lcore_conf *qconf;

    qconf = &lcore_conf;
    len = qconf->tx_mbufs[port].len;
    qconf->tx_mbufs[port].m_table[len] = m;
    len++;

    /* enough pkts to be sent */
    if (unlikely(len == MAX_PKT_BURST)) {
        send_burst(qconf, MAX_PKT_BURST, port);
        len = 0;
    }

    qconf->tx_mbufs[port].len = len;
    return 0;
}

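/*
 * Transmit a FreeBSD mbuf chain: copy its data into a chain of DPDK
 * mbufs, translate the stack's offload requests into ol_flags, free
 * the BSD mbuf, and stage the result on the per-port TX buffer.
 */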
int
ff_dpdk_if_send(struct ff_dpdk_if_context *ctx, void *m,
    int total)
{
    struct rte_mempool *mbuf_pool = pktmbuf_pool[lcore_conf.socket_id];
    struct rte_mbuf *head = rte_pktmbuf_alloc(mbuf_pool);
    if (head == NULL) {
        ff_mbuf_free(m);
        return -1;
    }

    head->pkt_len = total;
    head->nb_segs = 0;

    int off = 0;
    struct rte_mbuf *cur = head, *prev = NULL;
    while (total > 0) {
        if (cur == NULL) {
            cur = rte_pktmbuf_alloc(mbuf_pool);
            if (cur == NULL) {
                rte_pktmbuf_free(head);
                ff_mbuf_free(m);
                return -1;
            }
        }

        void *data = rte_pktmbuf_mtod(cur, void*);
        int len = total > RTE_MBUF_DEFAULT_DATAROOM ? RTE_MBUF_DEFAULT_DATAROOM : total;
        int ret = ff_mbuf_copydata(m, data, off, len);
        if (ret < 0) {
            rte_pktmbuf_free(head);
            ff_mbuf_free(m);
            return -1;
        }

        if (prev != NULL) {
            prev->next = cur;
        }
        prev = cur;

        cur->data_len = len;
        off += len;
        total -= len;
        head->nb_segs++;
        cur = NULL;
    }

    struct ff_tx_offload offload = {0};
    ff_mbuf_tx_offload(m, &offload);

    if (offload.ip_csum) {
        head->ol_flags |= PKT_TX_IP_CKSUM;
        head->l2_len = sizeof(struct ether_hdr);
        head->l3_len = sizeof(struct ipv4_hdr);
    }

    if (ctx->hw_features.tx_csum_l4) {
        if (offload.tcp_csum) {
            head->ol_flags |= PKT_TX_TCP_CKSUM;
            head->l2_len = sizeof(struct ether_hdr);
            head->l3_len = sizeof(struct ipv4_hdr);
        }

        if (offload.tso_seg_size) {
            head->ol_flags |= PKT_TX_TCP_SEG;
            head->l4_len = sizeof(struct tcp_hdr);
            head->tso_segsz = offload.tso_seg_size;
        }

        if (offload.udp_csum) {
            head->ol_flags |= PKT_TX_UDP_CKSUM;
            head->l2_len = sizeof(struct ether_hdr);
            head->l3_len = sizeof(struct ipv4_hdr);
        }
    }

    ff_mbuf_free(m);

    return send_single_packet(head, ctx->port_id);
}

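/*
 * Per-lcore event loop: manage timers, drain stale TX buffers every
 * BURST_TX_DRAIN_US, service KNI and the arp ring, poll each assigned
 * RX queue with software prefetch, handle control messages, then run
 * the user-supplied loop callback.
 */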
static int
main_loop(void *arg)
{
    struct loop_routine *lr = (struct loop_routine *)arg;

    struct rte_mbuf *pkts_burst[MAX_PKT_BURST];
    unsigned lcore_id;
    uint64_t prev_tsc, diff_tsc, cur_tsc;
    int i, j, nb_rx;
    uint8_t port_id, queue_id;
    struct lcore_conf *qconf;
    const uint64_t drain_tsc = (rte_get_tsc_hz() + US_PER_S - 1) /
        US_PER_S * BURST_TX_DRAIN_US;
    struct ff_dpdk_if_context *ctx;

    prev_tsc = 0;

    lcore_id = rte_lcore_id();
    qconf = &lcore_conf;

    if (qconf->nb_rx_queue == 0) {
        printf("lcore %u has nothing to do\n", lcore_id);
        return 0;
    }

    while (1) {
        cur_tsc = rte_rdtsc();
        if (unlikely(freebsd_clock.expire < cur_tsc)) {
            rte_timer_manage();
        }

        /*
         * TX burst queue drain
         */
        diff_tsc = cur_tsc - prev_tsc;
        if (unlikely(diff_tsc > drain_tsc)) {
            /*
             * This could be optimized (use queueid instead of
             * portid), but it is not called so often
             */
            for (port_id = 0; port_id < RTE_MAX_ETHPORTS; port_id++) {
                if (qconf->tx_mbufs[port_id].len == 0)
                    continue;
                send_burst(qconf,
                    qconf->tx_mbufs[port_id].len,
                    port_id);
                qconf->tx_mbufs[port_id].len = 0;
            }

            prev_tsc = cur_tsc;
        }

        /*
         * Read packets from RX queues
         */
        for (i = 0; i < qconf->nb_rx_queue; ++i) {
            port_id = qconf->rx_queue_list[i].port_id;
            queue_id = qconf->rx_queue_list[i].queue_id;
            ctx = veth_ctx[port_id];

            if (enable_kni && rte_eal_process_type() == RTE_PROC_PRIMARY) {
                ff_kni_process(port_id, queue_id, pkts_burst, MAX_PKT_BURST);
            }

            process_arp_ring(port_id, queue_id, pkts_burst, ctx);

            nb_rx = rte_eth_rx_burst(port_id, queue_id, pkts_burst,
                MAX_PKT_BURST);
            if (nb_rx == 0)
                continue;

            /* Prefetch first packets */
            for (j = 0; j < PREFETCH_OFFSET && j < nb_rx; j++) {
                rte_prefetch0(rte_pktmbuf_mtod(
                        pkts_burst[j], void *));
            }

            /* Prefetch and handle already prefetched packets */
            for (j = 0; j < (nb_rx - PREFETCH_OFFSET); j++) {
                rte_prefetch0(rte_pktmbuf_mtod(pkts_burst[
                        j + PREFETCH_OFFSET], void *));
                process_packets(port_id, queue_id, &pkts_burst[j], 1, ctx, 0);
            }

            /* Handle remaining prefetched packets */
            for (; j < nb_rx; j++) {
                process_packets(port_id, queue_id, &pkts_burst[j], 1, ctx, 0);
            }
        }

        process_msg_ring(qconf->proc_id);

        if (likely(lr->loop != NULL)) {
            lr->loop(lr->arg);
        }
    }
}

int
ff_dpdk_if_up(void) {
    int nb_ports = ff_global_cfg.dpdk.nb_ports;
    int i;
    for (i = 0; i < nb_ports; i++) {
        uint8_t port_id = ff_global_cfg.dpdk.port_cfgs[i].port_id;
        veth_ctx[port_id] = ff_veth_attach(ff_global_cfg.dpdk.port_cfgs + i);
        if (veth_ctx[port_id] == NULL) {
            rte_exit(EXIT_FAILURE, "ff_veth_attach failed\n");
        }
    }

    return 0;
}

void
ff_dpdk_run(loop_func_t loop, void *arg) {
    struct loop_routine *lr = malloc(sizeof(struct loop_routine));
    if (lr == NULL) {
        rte_exit(EXIT_FAILURE, "malloc loop_routine failed\n");
    }
    lr->loop = loop;
    lr->arg = arg;
    rte_eal_mp_remote_launch(main_loop, lr, CALL_MASTER);
    rte_eal_mp_wait_lcore();
    free(lr);
}

void
ff_dpdk_pktmbuf_free(void *m)
{
    rte_pktmbuf_free((struct rte_mbuf *)m);
}

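/*
 * Software Toeplitz hash (Microsoft RSS), matching what the NIC
 * computes with the same key: for every set bit of the input, XOR in
 * the 32-bit window of the key starting at that bit position.
 */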
static uint32_t
toeplitz_hash(unsigned keylen, const uint8_t *key,
    unsigned datalen, const uint8_t *data)
{
    uint32_t hash = 0, v;
    u_int i, b;

    /* XXXRW: Perhaps an assertion about key length vs. data length? */

    v = (key[0]<<24) + (key[1]<<16) + (key[2]<<8) + key[3];
    for (i = 0; i < datalen; i++) {
        for (b = 0; b < 8; b++) {
            if (data[i] & (1<<(7-b)))
                hash ^= v;
            v <<= 1;
            if ((i + 4) < keylen &&
                (key[i+4] & (1<<(7-b))))
                v |= 1;
        }
    }
    return (hash);
}

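/*
 * Check whether a TCP/UDP 4-tuple would be RSS-steered to this
 * process's queue: hash it with the same Toeplitz key the NIC uses and
 * compare (hash % nb_procs) against our proc_id. The bytes are hashed
 * exactly as stored, so to match the NIC the caller should pass the
 * tuple in network byte order.
 */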
int
ff_rss_check(uint32_t saddr, uint32_t daddr, uint16_t sport, uint16_t dport)
{
    struct lcore_conf *qconf = &lcore_conf;

    if (qconf->nb_procs == 1) {
        return 1;
    }

    uint8_t data[sizeof(saddr) + sizeof(daddr) + sizeof(sport) +
        sizeof(dport)];

    unsigned datalen = 0;

    bcopy(&saddr, &data[datalen], sizeof(saddr));
    datalen += sizeof(saddr);

    bcopy(&daddr, &data[datalen], sizeof(daddr));
    datalen += sizeof(daddr);

    bcopy(&sport, &data[datalen], sizeof(sport));
    datalen += sizeof(sport);

    bcopy(&dport, &data[datalen], sizeof(dport));
    datalen += sizeof(dport);

    uint32_t hash = toeplitz_hash(sizeof(default_rsskey_40bytes),
        default_rsskey_40bytes, datalen, data);

    return (hash % qconf->nb_procs) == qconf->proc_id;
}