/*
 * Copyright (C) 2017 THL A29 Limited, a Tencent company.
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions are met:
 *
 * 1. Redistributions of source code must retain the above copyright notice, this
 *   list of conditions and the following disclaimer.
 * 2. Redistributions in binary form must reproduce the above copyright notice,
 *   this list of conditions and the following disclaimer in the documentation
 *   and/or other materials provided with the distribution.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
 * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
 * DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR
 * ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
 * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
 * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
 * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
 * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 *
 */

#include <rte_common.h>
#include <rte_byteorder.h>
#include <rte_log.h>
#include <rte_memory.h>
#include <rte_memcpy.h>
#include <rte_memzone.h>
#include <rte_config.h>
#include <rte_eal.h>
#include <rte_pci.h>
#include <rte_mbuf.h>
#include <rte_mempool.h>
#include <rte_ring.h>
#include <rte_lcore.h>
#include <rte_launch.h>
#include <rte_ethdev.h>
#include <rte_debug.h>
#include <rte_ether.h>
#include <rte_malloc.h>
#include <rte_cycles.h>
#include <rte_timer.h>
#include <rte_thash.h>
#include <rte_ip.h>
#include <rte_tcp.h>
#include <rte_udp.h>

#include "ff_dpdk_if.h"
#include "ff_dpdk_pcap.h"
#include "ff_dpdk_kni.h"
#include "ff_config.h"
#include "ff_veth.h"
#include "ff_host_interface.h"
#include "ff_msg.h"
#include "ff_api.h"

#define MEMPOOL_CACHE_SIZE 256

#define ARP_RING_SIZE 2048

#define MSG_RING_SIZE 32

/*
 * Configurable number of RX/TX ring descriptors
 */
#define RX_QUEUE_SIZE 512
#define TX_QUEUE_SIZE 256

#define MAX_PKT_BURST 32
#define BURST_TX_DRAIN_US 100 /* TX drain every ~100us */

/*
 * Try to avoid TX buffering if we have at least MAX_TX_BURST packets to send.
 */
#define MAX_TX_BURST    (MAX_PKT_BURST / 2)

#define NB_SOCKETS 8

/* Configure how many packets ahead to prefetch when reading packets */
#define PREFETCH_OFFSET    3

#define MAX_RX_QUEUE_PER_LCORE 16
#define MAX_TX_QUEUE_PER_PORT RTE_MAX_ETHPORTS
#define MAX_RX_QUEUE_PER_PORT 128

#define KNI_MBUF_MAX 2048
#define KNI_QUEUE_SIZE 2048

static int enable_kni;
static int kni_accept;

static struct rte_timer freebsd_clock;

/* Default RSS hash key, taken from the Mellanox Linux driver. */
static uint8_t default_rsskey_40bytes[40] = {
    0xd1, 0x81, 0xc6, 0x2c, 0xf7, 0xf4, 0xdb, 0x5b,
    0x19, 0x83, 0xa2, 0xfc, 0x94, 0x3e, 0x1a, 0xdb,
    0xd9, 0x38, 0x9e, 0x6b, 0xd1, 0x03, 0x9c, 0x2c,
    0xa7, 0x44, 0x99, 0xad, 0x59, 0x3d, 0x56, 0xd9,
    0xf3, 0x25, 0x3c, 0x06, 0x2a, 0xdc, 0x1f, 0xfc
};
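
/*
 * Note: this same key is handed to the NIC via rss_conf in init_port_start()
 * and fed to the software toeplitz_hash() in ff_rss_check(), so the NIC's
 * queue selection and f-stack's per-process ownership test stay consistent.
 */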

/* Note: init_port_start() builds its own rte_eth_conf per port, so this
 * default config is currently unreferenced and kept as documentation. */
static struct rte_eth_conf default_port_conf = {
    .rxmode = {
        .mq_mode = ETH_MQ_RX_RSS,
        .max_rx_pkt_len = ETHER_MAX_LEN,
        .split_hdr_size = 0, /**< hdr buf size */
        .header_split   = 0, /**< Header Split disabled */
        .hw_ip_checksum = 0, /**< IP checksum offload disabled */
        .hw_vlan_filter = 0, /**< VLAN filtering disabled */
        .hw_vlan_strip  = 0, /**< VLAN strip disabled. */
        .hw_vlan_extend = 0, /**< Extended VLAN disabled. */
        .jumbo_frame    = 0, /**< Jumbo Frame Support disabled */
        .hw_strip_crc   = 0, /**< CRC stripped by hardware */
        .enable_lro     = 0, /**< LRO disabled */
    },
    .rx_adv_conf = {
        .rss_conf = {
            .rss_key = default_rsskey_40bytes,
            .rss_key_len = 40,
            .rss_hf = ETH_RSS_PROTO_MASK,
        },
    },
    .txmode = {
        .mq_mode = ETH_MQ_TX_NONE,
    },
};

struct mbuf_table {
    uint16_t len;
    struct rte_mbuf *m_table[MAX_PKT_BURST];
};

struct lcore_rx_queue {
    uint8_t port_id;
    uint8_t queue_id;
} __rte_cache_aligned;

struct lcore_conf {
    uint16_t proc_id;
    uint16_t nb_procs;
    uint16_t socket_id;
    uint16_t nb_rx_queue;
    uint16_t *proc_lcore;
    struct lcore_rx_queue rx_queue_list[MAX_RX_QUEUE_PER_LCORE];
    uint16_t tx_queue_id[RTE_MAX_ETHPORTS];
    struct mbuf_table tx_mbufs[RTE_MAX_ETHPORTS];
    char *pcap[RTE_MAX_ETHPORTS];
} __rte_cache_aligned;

static struct lcore_conf lcore_conf;

static struct rte_mempool *pktmbuf_pool[NB_SOCKETS];

static struct rte_ring **arp_ring[RTE_MAX_LCORE];

struct ff_msg_ring {
    char ring_name[2][RTE_RING_NAMESIZE];
    /* ring[0]: other processes send msgs, this lcore receives them. */
    /* ring[1]: this lcore sends replies, other processes read them. */
    struct rte_ring *ring[2];
} __rte_cache_aligned;

static struct ff_msg_ring msg_ring[RTE_MAX_LCORE];
static struct rte_mempool *message_pool;

struct ff_dpdk_if_context {
    void *sc;
    void *ifp;
    uint16_t port_id;
    struct ff_hw_features hw_features;
} __rte_cache_aligned;

static struct ff_dpdk_if_context *veth_ctx[RTE_MAX_ETHPORTS];

extern void ff_hardclock(void);

static void
ff_hardclock_job(__rte_unused struct rte_timer *timer,
    __rte_unused void *arg) {
    ff_hardclock();
    ff_update_current_ts();
}

struct ff_dpdk_if_context *
ff_dpdk_register_if(void *sc, void *ifp, struct ff_port_cfg *cfg)
{
    struct ff_dpdk_if_context *ctx;

    ctx = calloc(1, sizeof(struct ff_dpdk_if_context));
    if (ctx == NULL)
        return NULL;

    ctx->sc = sc;
    ctx->ifp = ifp;
    ctx->port_id = cfg->port_id;
    ctx->hw_features = cfg->hw_features;

    return ctx;
}

void
ff_dpdk_deregister_if(struct ff_dpdk_if_context *ctx)
{
    free(ctx);
}

static void
check_all_ports_link_status(void)
{
    #define CHECK_INTERVAL 100 /* 100ms */
    #define MAX_CHECK_TIME 90  /* 9s (90 * 100ms) in total */

    uint8_t count, all_ports_up, print_flag = 0;
    struct rte_eth_link link;

    printf("\nChecking link status");
    fflush(stdout);

    int i, nb_ports;
    nb_ports = ff_global_cfg.dpdk.nb_ports;
    for (count = 0; count <= MAX_CHECK_TIME; count++) {
        all_ports_up = 1;
        for (i = 0; i < nb_ports; i++) {
            uint8_t portid = ff_global_cfg.dpdk.port_cfgs[i].port_id;
            memset(&link, 0, sizeof(link));
            rte_eth_link_get_nowait(portid, &link);

            /* print link status if flag set */
            if (print_flag == 1) {
                if (link.link_status) {
                    printf("Port %d Link Up - speed %u "
                        "Mbps - %s\n", (int)portid,
                        (unsigned)link.link_speed,
                        (link.link_duplex == ETH_LINK_FULL_DUPLEX) ?
                        ("full-duplex") : ("half-duplex"));
                } else {
                    printf("Port %d Link Down\n", (int)portid);
                }
                continue;
            }
            /* clear all_ports_up flag if any link down */
            if (link.link_status == 0) {
                all_ports_up = 0;
                break;
            }
        }

        /* after finally printing all link status, get out */
        if (print_flag == 1)
            break;

        if (all_ports_up == 0) {
            printf(".");
            fflush(stdout);
            rte_delay_ms(CHECK_INTERVAL);
        }

        /* set the print_flag if all ports up or timeout */
        if (all_ports_up == 1 || count == (MAX_CHECK_TIME - 1)) {
            print_flag = 1;
            printf("done\n");
        }
    }
}

static int
init_lcore_conf(void)
{
    uint8_t nb_ports = rte_eth_dev_count();
    if (nb_ports == 0) {
        rte_exit(EXIT_FAILURE, "No probed ethernet devices\n");
    }

    lcore_conf.proc_id = ff_global_cfg.dpdk.proc_id;
    lcore_conf.nb_procs = ff_global_cfg.dpdk.nb_procs;

    lcore_conf.proc_lcore = rte_zmalloc(NULL,
        sizeof(uint16_t) * lcore_conf.nb_procs, 0);
    if (lcore_conf.proc_lcore == NULL) {
        rte_exit(EXIT_FAILURE, "rte_zmalloc proc_lcore failed\n");
    }
    rte_memcpy(lcore_conf.proc_lcore, ff_global_cfg.dpdk.proc_lcore,
        sizeof(uint16_t) * lcore_conf.nb_procs);
    uint16_t proc_id;
    for (proc_id = 0; proc_id < lcore_conf.nb_procs; proc_id++) {
        uint16_t lcore_id = lcore_conf.proc_lcore[proc_id];
        if (!lcore_config[lcore_id].detected) {
            rte_exit(EXIT_FAILURE, "lcore %u unavailable\n", lcore_id);
        }
    }

    uint16_t socket_id = 0;
    if (ff_global_cfg.dpdk.numa_on) {
        socket_id = rte_lcore_to_socket_id(rte_lcore_id());
    }

    lcore_conf.socket_id = socket_id;

    /* Currently, proc id maps 1:1 to the rx/tx queue id on every port. */
    uint8_t port_id, enabled_ports = 0;
    for (port_id = 0; port_id < nb_ports; port_id++) {
        if (ff_global_cfg.dpdk.port_mask &&
            (ff_global_cfg.dpdk.port_mask & (1 << port_id)) == 0) {
            printf("\nSkipping disabled port %d\n", port_id);
            continue;
        }

        if (port_id >= ff_global_cfg.dpdk.nb_ports) {
            printf("\nSkipping unconfigured port %d\n", port_id);
            break;
        }

        uint16_t nb_rx_queue = lcore_conf.nb_rx_queue;
        lcore_conf.rx_queue_list[nb_rx_queue].port_id = port_id;
        lcore_conf.rx_queue_list[nb_rx_queue].queue_id = lcore_conf.proc_id;
        lcore_conf.nb_rx_queue++;

        lcore_conf.tx_queue_id[port_id] = lcore_conf.proc_id;
        lcore_conf.pcap[port_id] = ff_global_cfg.dpdk.port_cfgs[enabled_ports].pcap;

        ff_global_cfg.dpdk.port_cfgs[enabled_ports].port_id = port_id;

        enabled_ports++;
    }

    ff_global_cfg.dpdk.nb_ports = enabled_ports;

    return 0;
}
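
/*
 * Sketch of the resulting mapping (assuming the 1:1 proc/queue scheme above):
 * with nb_procs = 2 and two enabled ports, the process with proc_id = 0 polls
 * rx queue 0 and transmits on tx queue 0 of both ports, while proc_id = 1
 * owns queue 1 on both ports. Each process thus runs this file's whole
 * datapath independently on its own queue pair.
 */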

static int
init_mem_pool(void)
{
    uint8_t nb_ports = ff_global_cfg.dpdk.nb_ports;
    uint32_t nb_lcores = ff_global_cfg.dpdk.nb_procs;
    uint32_t nb_tx_queue = nb_lcores;
    uint32_t nb_rx_queue = lcore_conf.nb_rx_queue * nb_lcores;

    unsigned nb_mbuf = RTE_MAX(
        (nb_rx_queue*RX_QUEUE_SIZE          +
        nb_ports*nb_lcores*MAX_PKT_BURST    +
        nb_ports*nb_tx_queue*TX_QUEUE_SIZE  +
        nb_lcores*MEMPOOL_CACHE_SIZE +
        nb_ports*KNI_MBUF_MAX +
        nb_ports*KNI_QUEUE_SIZE +
        nb_lcores*nb_ports*ARP_RING_SIZE),
        (unsigned)8192);

    unsigned socketid = 0;
    uint16_t i, lcore_id;
    char s[64];
    int numa_on = ff_global_cfg.dpdk.numa_on;

    for (i = 0; i < lcore_conf.nb_procs; i++) {
        lcore_id = lcore_conf.proc_lcore[i];
        if (numa_on) {
            socketid = rte_lcore_to_socket_id(lcore_id);
        }

        if (socketid >= NB_SOCKETS) {
            rte_exit(EXIT_FAILURE, "Socket %u of lcore %u is out of range %d\n",
                socketid, lcore_id, NB_SOCKETS);
        }

        if (pktmbuf_pool[socketid] != NULL) {
            continue;
        }

        if (rte_eal_process_type() == RTE_PROC_PRIMARY) {
            snprintf(s, sizeof(s), "mbuf_pool_%d", socketid);
            pktmbuf_pool[socketid] =
                rte_pktmbuf_pool_create(s, nb_mbuf,
                    MEMPOOL_CACHE_SIZE, 0,
                    RTE_MBUF_DEFAULT_BUF_SIZE, socketid);
        } else {
            snprintf(s, sizeof(s), "mbuf_pool_%d", socketid);
            pktmbuf_pool[socketid] = rte_mempool_lookup(s);
        }

        if (pktmbuf_pool[socketid] == NULL) {
            rte_exit(EXIT_FAILURE, "Cannot create mbuf pool on socket %d\n", socketid);
        } else {
            printf("created mbuf pool on socket %d\n", socketid);
        }
    }

    return 0;
}
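
/*
 * Worked example of the sizing above (illustrative numbers, not from any
 * config): 1 port and 2 processes gives nb_rx_queue = 2, so nb_mbuf =
 *   2*512 + 1*2*32 + 1*2*256 + 2*256 + 1*2048 + 1*2048 + 2*1*2048
 *   = 1024 + 64 + 512 + 512 + 2048 + 2048 + 4096 = 10304,
 * which exceeds the 8192 floor, so the pool holds 10304 buffers of
 * RTE_MBUF_DEFAULT_BUF_SIZE each.
 */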

static struct rte_ring *
create_ring(const char *name, unsigned count, int socket_id, unsigned flags)
{
    struct rte_ring *ring;

    if (name == NULL)
        return NULL;

    /* If already created, just attach to it. */
    if (likely((ring = rte_ring_lookup(name)) != NULL))
        return ring;

    if (rte_eal_process_type() == RTE_PROC_PRIMARY) {
        return rte_ring_create(name, count, socket_id, flags);
    } else {
        return rte_ring_lookup(name);
    }
}

static int
init_arp_ring(void)
{
    int i, j;
    char name_buf[RTE_RING_NAMESIZE];
    int nb_procs = ff_global_cfg.dpdk.nb_procs;
    int proc_id = ff_global_cfg.dpdk.proc_id;

    /* Allocate the arp ring pointer arrays according to the eth dev count. */
    int nb_ports = rte_eth_dev_count();
    for (i = 0; i < nb_procs; ++i) {
        snprintf(name_buf, RTE_RING_NAMESIZE, "ring_ptr_%d_%d",
            proc_id, i);

        arp_ring[i] = rte_zmalloc(name_buf,
            sizeof(struct rte_ring *) * nb_ports,
             RTE_CACHE_LINE_SIZE);
        if (arp_ring[i] == NULL) {
            rte_exit(EXIT_FAILURE, "rte_zmalloc(%s (struct rte_ring*)) "
                "failed\n", name_buf);
        }
    }

    unsigned socketid = lcore_conf.socket_id;

    /* Create rings according to the ports actually in use. */
    nb_ports = ff_global_cfg.dpdk.nb_ports;
    for (j = 0; j < nb_ports; j++) {
        uint8_t port_id = ff_global_cfg.dpdk.port_cfgs[j].port_id;

        for (i = 0; i < nb_procs; ++i) {
            snprintf(name_buf, RTE_RING_NAMESIZE, "arp_ring_%d_%d", i, port_id);
            arp_ring[i][port_id] = create_ring(name_buf, ARP_RING_SIZE,
                socketid, RING_F_SC_DEQ);

            if (arp_ring[i][port_id] == NULL)
                rte_panic("create ring:%s failed!\n", name_buf);

            printf("created ring:%s, %u ring entries are free\n",
                name_buf, rte_ring_free_count(arp_ring[i][port_id]));
        }
    }

    return 0;
}
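
/*
 * arp_ring[i][port] is the single-consumer ring that process i drains in
 * process_arp_ring(); whichever process receives an ARP packet clones it
 * into every other process's ring (see process_packets()), so all
 * processes sharing a port learn the same neighbor entries.
 */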

static void
ff_msg_init(struct rte_mempool *mp,
    __attribute__((unused)) void *opaque_arg,
    void *obj, __attribute__((unused)) unsigned i)
{
    struct ff_msg *msg = (struct ff_msg *)obj;
    msg->msg_type = FF_UNKNOWN;
    msg->buf_addr = (char *)msg + sizeof(struct ff_msg);
    msg->buf_len = mp->elt_size - sizeof(struct ff_msg);
}
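
/*
 * Each pool element is laid out as [struct ff_msg header][inline payload]:
 * buf_addr points just past the header and buf_len is whatever remains of
 * the element (MAX_MSG_BUF_SIZE minus the header), so a message and its
 * payload travel as a single allocation.
 */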

static int
init_msg_ring(void)
{
    uint16_t i;
    uint16_t nb_procs = ff_global_cfg.dpdk.nb_procs;
    unsigned socketid = lcore_conf.socket_id;

    /* Create the message buffer pool */
    if (rte_eal_process_type() == RTE_PROC_PRIMARY) {
        message_pool = rte_mempool_create(FF_MSG_POOL,
           MSG_RING_SIZE * 2 * nb_procs,
           MAX_MSG_BUF_SIZE, MSG_RING_SIZE / 2, 0,
           NULL, NULL, ff_msg_init, NULL,
           socketid, 0);
    } else {
        message_pool = rte_mempool_lookup(FF_MSG_POOL);
    }

    if (message_pool == NULL) {
        rte_panic("Create msg mempool failed\n");
    }

    for (i = 0; i < nb_procs; ++i) {
        snprintf(msg_ring[i].ring_name[0], RTE_RING_NAMESIZE,
            "%s%u", FF_MSG_RING_IN, i);
        snprintf(msg_ring[i].ring_name[1], RTE_RING_NAMESIZE,
            "%s%u", FF_MSG_RING_OUT, i);

        msg_ring[i].ring[0] = create_ring(msg_ring[i].ring_name[0],
            MSG_RING_SIZE, socketid, RING_F_SP_ENQ | RING_F_SC_DEQ);
        if (msg_ring[i].ring[0] == NULL)
            rte_panic("create ring:%s failed!\n", msg_ring[i].ring_name[0]);

        msg_ring[i].ring[1] = create_ring(msg_ring[i].ring_name[1],
            MSG_RING_SIZE, socketid, RING_F_SP_ENQ | RING_F_SC_DEQ);
        if (msg_ring[i].ring[1] == NULL)
            rte_panic("create ring:%s failed!\n", msg_ring[i].ring_name[1]);
    }

    return 0;
}

static int
init_kni(void)
{
    int nb_ports = rte_eth_dev_count();
    kni_accept = 0;
    if (strcasecmp(ff_global_cfg.kni.method, "accept") == 0)
        kni_accept = 1;

    ff_kni_init(nb_ports, ff_global_cfg.kni.tcp_port,
        ff_global_cfg.kni.udp_port);

    unsigned socket_id = lcore_conf.socket_id;
    struct rte_mempool *mbuf_pool = pktmbuf_pool[socket_id];

    nb_ports = ff_global_cfg.dpdk.nb_ports;
    int i;
    for (i = 0; i < nb_ports; i++) {
        uint8_t port_id = ff_global_cfg.dpdk.port_cfgs[i].port_id;
        ff_kni_alloc(port_id, socket_id, mbuf_pool, KNI_QUEUE_SIZE);
    }

    return 0;
}

static int
init_port_start(void)
{
    int nb_ports = ff_global_cfg.dpdk.nb_ports;
    uint16_t nb_procs = ff_global_cfg.dpdk.nb_procs;
    unsigned socketid = rte_lcore_to_socket_id(rte_lcore_id());
    struct rte_mempool *mbuf_pool = pktmbuf_pool[socketid];
    uint16_t i;

    for (i = 0; i < nb_ports; i++) {
        uint8_t port_id = ff_global_cfg.dpdk.port_cfgs[i].port_id;

        struct rte_eth_dev_info dev_info;
        rte_eth_dev_info_get(port_id, &dev_info);

        if (nb_procs > dev_info.max_rx_queues) {
            rte_exit(EXIT_FAILURE, "num_procs[%d] is greater than max_rx_queues[%d]\n",
                nb_procs,
                dev_info.max_rx_queues);
        }

        if (nb_procs > dev_info.max_tx_queues) {
            rte_exit(EXIT_FAILURE, "num_procs[%d] is greater than max_tx_queues[%d]\n",
                nb_procs,
                dev_info.max_tx_queues);
        }

        struct ether_addr addr;
        rte_eth_macaddr_get(port_id, &addr);
        printf("Port %u MAC: %02" PRIx8 " %02" PRIx8 " %02" PRIx8
                   " %02" PRIx8 " %02" PRIx8 " %02" PRIx8 "\n",
                (unsigned)port_id,
                addr.addr_bytes[0], addr.addr_bytes[1],
                addr.addr_bytes[2], addr.addr_bytes[3],
                addr.addr_bytes[4], addr.addr_bytes[5]);

        rte_memcpy(ff_global_cfg.dpdk.port_cfgs[i].mac,
            addr.addr_bytes, ETHER_ADDR_LEN);

        /* Set txq_flags: we need neither multi-mempool nor refcounted mbufs */
        dev_info.default_txconf.txq_flags = ETH_TXQ_FLAGS_NOMULTMEMP |
            ETH_TXQ_FLAGS_NOREFCOUNT;

        /* Disable features that are not supported by port's HW */
        if (!(dev_info.tx_offload_capa & DEV_TX_OFFLOAD_UDP_CKSUM)) {
            dev_info.default_txconf.txq_flags |= ETH_TXQ_FLAGS_NOXSUMUDP;
        }

        if (!(dev_info.tx_offload_capa & DEV_TX_OFFLOAD_TCP_CKSUM)) {
            dev_info.default_txconf.txq_flags |= ETH_TXQ_FLAGS_NOXSUMTCP;
        }

        if (!(dev_info.tx_offload_capa & DEV_TX_OFFLOAD_SCTP_CKSUM)) {
            dev_info.default_txconf.txq_flags |= ETH_TXQ_FLAGS_NOXSUMSCTP;
        }

        if (!(dev_info.tx_offload_capa & DEV_TX_OFFLOAD_VLAN_INSERT)) {
            dev_info.default_txconf.txq_flags |= ETH_TXQ_FLAGS_NOVLANOFFL;
        }

        if (!(dev_info.tx_offload_capa & DEV_TX_OFFLOAD_TCP_TSO) &&
            !(dev_info.tx_offload_capa & DEV_TX_OFFLOAD_UDP_TSO)) {
            dev_info.default_txconf.txq_flags |= ETH_TXQ_FLAGS_NOMULTSEGS;
        }

        struct rte_eth_conf port_conf = {0};

        /* Set RSS mode */
        port_conf.rxmode.mq_mode = ETH_MQ_RX_RSS;
        port_conf.rx_adv_conf.rss_conf.rss_hf = ETH_RSS_PROTO_MASK;
        port_conf.rx_adv_conf.rss_conf.rss_key = default_rsskey_40bytes;
        port_conf.rx_adv_conf.rss_conf.rss_key_len = 40;

        /* Set Rx VLAN stripping */
        if (ff_global_cfg.dpdk.vlan_strip) {
            if (dev_info.rx_offload_capa & DEV_RX_OFFLOAD_VLAN_STRIP) {
                port_conf.rxmode.hw_vlan_strip = 1;
            }
        }

        /* Enable HW CRC stripping */
        port_conf.rxmode.hw_strip_crc = 1;

        /* FIXME: enable TCP LRO? */
        #if 0
        if (dev_info.rx_offload_capa & DEV_RX_OFFLOAD_TCP_LRO) {
            printf("LRO is supported\n");
            port_conf.rxmode.enable_lro = 1;
            ff_global_cfg.dpdk.port_cfgs[i].hw_features.rx_lro = 1;
        }
        #endif

        /* Set Rx checksum checking */
        if ((dev_info.rx_offload_capa & DEV_RX_OFFLOAD_IPV4_CKSUM) &&
            (dev_info.rx_offload_capa & DEV_RX_OFFLOAD_UDP_CKSUM) &&
            (dev_info.rx_offload_capa & DEV_RX_OFFLOAD_TCP_CKSUM)) {
            printf("RX checksum offload supported\n");
            port_conf.rxmode.hw_ip_checksum = 1;
            ff_global_cfg.dpdk.port_cfgs[i].hw_features.rx_csum = 1;
        }

        if ((dev_info.tx_offload_capa & DEV_TX_OFFLOAD_IPV4_CKSUM)) {
            printf("TX ip checksum offload supported\n");
            ff_global_cfg.dpdk.port_cfgs[i].hw_features.tx_csum_ip = 1;
        }

        if ((dev_info.tx_offload_capa & DEV_TX_OFFLOAD_UDP_CKSUM) &&
            (dev_info.tx_offload_capa & DEV_TX_OFFLOAD_TCP_CKSUM)) {
            printf("TX TCP&UDP checksum offload supported\n");
            ff_global_cfg.dpdk.port_cfgs[i].hw_features.tx_csum_l4 = 1;
        }

        if (ff_global_cfg.dpdk.tso) {
            if (dev_info.tx_offload_capa & DEV_TX_OFFLOAD_TCP_TSO) {
                printf("TSO is supported\n");
                ff_global_cfg.dpdk.port_cfgs[i].hw_features.tx_tso = 1;
            }
        } else {
            printf("TSO is disabled\n");
        }

        /* Only the primary process configures and starts the port;
         * secondary processes attach to the already-running device. */
        if (rte_eal_process_type() != RTE_PROC_PRIMARY) {
            continue;
        }

        /* Currently, proc id maps 1:1 to the queue id on every port. */
        int ret = rte_eth_dev_configure(port_id, nb_procs, nb_procs, &port_conf);
        if (ret != 0) {
            return ret;
        }

        uint16_t q;
        for (q = 0; q < nb_procs; q++) {
            ret = rte_eth_tx_queue_setup(port_id, q, TX_QUEUE_SIZE,
                socketid, &dev_info.default_txconf);
            if (ret < 0) {
                return ret;
            }

            ret = rte_eth_rx_queue_setup(port_id, q, RX_QUEUE_SIZE,
                socketid, &dev_info.default_rxconf, mbuf_pool);
            if (ret < 0) {
                return ret;
            }
        }

        ret = rte_eth_dev_start(port_id);
        if (ret < 0) {
            return ret;
        }

        /* Enable RX in promiscuous mode for the Ethernet device. */
        if (ff_global_cfg.dpdk.promiscuous) {
            rte_eth_promiscuous_enable(port_id);
            ret = rte_eth_promiscuous_get(port_id);
            if (ret == 1) {
                printf("set port %u to promiscuous mode ok\n", port_id);
            } else {
                printf("set port %u to promiscuous mode error\n", port_id);
            }
        }

        /* Enable pcap dump */
        if (ff_global_cfg.dpdk.port_cfgs[i].pcap) {
            ff_enable_pcap(ff_global_cfg.dpdk.port_cfgs[i].pcap);
        }
    }

    if (rte_eal_process_type() == RTE_PROC_PRIMARY) {
        check_all_ports_link_status();
    }

    return 0;
}

static int
init_clock(void)
{
    rte_timer_subsystem_init();
    uint64_t hz = rte_get_timer_hz();
    uint64_t intrs = MS_PER_S / ff_global_cfg.freebsd.hz;
    uint64_t tsc = (hz + MS_PER_S - 1) / MS_PER_S * intrs;

    rte_timer_init(&freebsd_clock);
    rte_timer_reset(&freebsd_clock, tsc, PERIODICAL,
        rte_lcore_id(), &ff_hardclock_job, NULL);

    ff_update_current_ts();

    return 0;
}
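
/*
 * Worked example of the period arithmetic (illustrative numbers): with
 * freebsd.hz = 100 the clock must tick every intrs = 1000/100 = 10 ms;
 * on a 2.0 GHz TSC, tsc = ceil(2e9 / 1000) * 10 = 20,000,000 cycles, so
 * rte_timer_manage() fires ff_hardclock_job() every 10 ms on this lcore.
 */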

int
ff_dpdk_init(int argc, char **argv)
{
    if (ff_global_cfg.dpdk.nb_procs < 1 ||
        ff_global_cfg.dpdk.nb_procs > RTE_MAX_LCORE ||
        ff_global_cfg.dpdk.proc_id >= ff_global_cfg.dpdk.nb_procs ||
        ff_global_cfg.dpdk.proc_id < 0) {
        printf("param num_procs[%d] or proc_id[%d] error!\n",
            ff_global_cfg.dpdk.nb_procs,
            ff_global_cfg.dpdk.proc_id);
        exit(1);
    }

    int ret = rte_eal_init(argc, argv);
    if (ret < 0) {
        rte_exit(EXIT_FAILURE, "Error with EAL initialization\n");
    }

    init_lcore_conf();

    init_mem_pool();

    init_arp_ring();

    init_msg_ring();

    enable_kni = ff_global_cfg.kni.enable;
    if (enable_kni) {
        init_kni();
    }

    ret = init_port_start();
    if (ret < 0) {
        rte_exit(EXIT_FAILURE, "init_port_start failed\n");
    }

    init_clock();

    return 0;
}
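
/*
 * Usage sketch (assumption: the usual f-stack embedding, where ff_init()
 * parses the config file and then calls ff_dpdk_init()):
 *
 *   ff_init(argc, argv);          // EAL, pools, rings and ports set up here
 *   ff_dpdk_run(my_loop, my_arg); // enters main_loop(); my_loop runs once
 *                                 // per polling iteration
 */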

static void
ff_veth_input(const struct ff_dpdk_if_context *ctx, struct rte_mbuf *pkt)
{
    uint8_t rx_csum = ctx->hw_features.rx_csum;
    if (rx_csum) {
        if (pkt->ol_flags & (PKT_RX_IP_CKSUM_BAD | PKT_RX_L4_CKSUM_BAD)) {
            /* Drop packets whose hardware-verified checksums are bad. */
            rte_pktmbuf_free(pkt);
            return;
        }
    }

    /*
     * FIXME: should we save pkt->vlan_tci
     * if (pkt->ol_flags & PKT_RX_VLAN_PKT)
     */

    void *data = rte_pktmbuf_mtod(pkt, void*);
    uint16_t len = rte_pktmbuf_data_len(pkt);

    void *hdr = ff_mbuf_gethdr(pkt, pkt->pkt_len, data, len, rx_csum);
    if (hdr == NULL) {
        rte_pktmbuf_free(pkt);
        return;
    }

    /* Attach every remaining rte_mbuf segment to the FreeBSD mbuf chain. */
    struct rte_mbuf *pn = pkt->next;
    void *prev = hdr;
    while (pn != NULL) {
        data = rte_pktmbuf_mtod(pn, void*);
        len = rte_pktmbuf_data_len(pn);

        void *mb = ff_mbuf_get(prev, data, len);
        if (mb == NULL) {
            ff_mbuf_free(hdr);
            rte_pktmbuf_free(pkt);
            return;
        }
        pn = pn->next;
        prev = mb;
    }

    ff_veth_process_packet(ctx->ifp, hdr);
}

static enum FilterReturn
protocol_filter(const void *data, uint16_t len)
{
    if (len < sizeof(struct ether_hdr))
        return FILTER_UNKNOWN;

    const struct ether_hdr *hdr;
    hdr = (const struct ether_hdr *)data;

    if (ntohs(hdr->ether_type) == ETHER_TYPE_ARP)
        return FILTER_ARP;

    if (!enable_kni) {
        return FILTER_UNKNOWN;
    }

    if (ntohs(hdr->ether_type) != ETHER_TYPE_IPv4)
        return FILTER_UNKNOWN;

    return ff_kni_proto_filter(data + sizeof(struct ether_hdr),
        len - sizeof(struct ether_hdr));
}

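/*
 * How process_packets() dispatches on the filter result:
 *   FILTER_ARP     - clone to every other process's arp ring and (on the
 *                    primary) to KNI, then hand the original to the stack;
 *   FILTER_KNI     - forward to the kernel via KNI when kni_accept is set;
 *   FILTER_UNKNOWN - to KNI when kni_accept is unset, otherwise to the stack.
 */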
static inline void
process_packets(uint8_t port_id, uint16_t queue_id, struct rte_mbuf **bufs,
    uint16_t count, const struct ff_dpdk_if_context *ctx, int pkts_from_ring)
{
    struct lcore_conf *qconf = &lcore_conf;

    uint16_t i;
    for (i = 0; i < count; i++) {
        struct rte_mbuf *rtem = bufs[i];

        if (unlikely(qconf->pcap[port_id] != NULL)) {
            ff_dump_packets(qconf->pcap[port_id], rtem);
        }

        void *data = rte_pktmbuf_mtod(rtem, void*);
        uint16_t len = rte_pktmbuf_data_len(rtem);

        enum FilterReturn filter = protocol_filter(data, len);
        if (filter == FILTER_ARP) {
            struct rte_mempool *mbuf_pool;
            struct rte_mbuf *mbuf_clone;
            if (pkts_from_ring == 0) {
                uint16_t j;
                for (j = 0; j < qconf->nb_procs; ++j) {
                    if (j == queue_id)
                        continue;

                    mbuf_pool = pktmbuf_pool[rte_lcore_to_socket_id(qconf->proc_lcore[j])];
                    mbuf_clone = rte_pktmbuf_clone(rtem, mbuf_pool);
                    if (mbuf_clone) {
                        int ret = rte_ring_enqueue(arp_ring[j][port_id], mbuf_clone);
                        if (ret < 0)
                            rte_pktmbuf_free(mbuf_clone);
                    }
                }
            }

            if (enable_kni && rte_eal_process_type() == RTE_PROC_PRIMARY) {
                mbuf_pool = pktmbuf_pool[qconf->socket_id];
                mbuf_clone = rte_pktmbuf_clone(rtem, mbuf_pool);
                if (mbuf_clone) {
                    ff_kni_enqueue(port_id, mbuf_clone);
                }
            }

            ff_veth_input(ctx, rtem);
        } else if (enable_kni && ((filter == FILTER_KNI && kni_accept) ||
            (filter == FILTER_UNKNOWN && !kni_accept))) {
            ff_kni_enqueue(port_id, rtem);
        } else {
            ff_veth_input(ctx, rtem);
        }
    }
}

static inline int
process_arp_ring(uint8_t port_id, uint16_t queue_id,
    struct rte_mbuf **pkts_burst, const struct ff_dpdk_if_context *ctx)
{
    /* Read the ARP packets cloned by other processes and process them. */
    uint16_t nb_rb;
    nb_rb = rte_ring_dequeue_burst(arp_ring[queue_id][port_id],
        (void **)pkts_burst, MAX_PKT_BURST);

    if (nb_rb > 0) {
        process_packets(port_id, queue_id, pkts_burst, nb_rb, ctx, 1);
    }

    return 0;
}

static inline void
handle_sysctl_msg(struct ff_msg *msg, uint16_t proc_id)
{
    int ret = ff_sysctl(msg->sysctl.name, msg->sysctl.namelen,
        msg->sysctl.old, msg->sysctl.oldlenp, msg->sysctl.new,
        msg->sysctl.newlen);

    if (ret < 0) {
        msg->result = errno;
    } else {
        msg->result = 0;
    }

    rte_ring_enqueue(msg_ring[proc_id].ring[1], msg);
}

static inline void
handle_ioctl_msg(struct ff_msg *msg, uint16_t proc_id)
{
    int fd, ret;
    fd = ff_socket(AF_INET, SOCK_DGRAM, 0);
    if (fd < 0) {
        ret = -1;
        goto done;
    }

    ret = ff_ioctl(fd, msg->ioctl.cmd, msg->ioctl.data);

    ff_close(fd);

done:
    if (ret < 0) {
        msg->result = errno;
    } else {
        msg->result = 0;
    }

    rte_ring_enqueue(msg_ring[proc_id].ring[1], msg);
}

static inline void
handle_route_msg(struct ff_msg *msg, uint16_t proc_id)
{
    msg->result = ff_rtioctl(msg->route.fib, msg->route.data,
        &msg->route.len, msg->route.maxlen);

    rte_ring_enqueue(msg_ring[proc_id].ring[1], msg);
}

static inline void
handle_default_msg(struct ff_msg *msg, uint16_t proc_id)
{
    msg->result = EINVAL;
    rte_ring_enqueue(msg_ring[proc_id].ring[1], msg);
}

static inline void
handle_msg(struct ff_msg *msg, uint16_t proc_id)
{
    switch (msg->msg_type) {
        case FF_SYSCTL:
            handle_sysctl_msg(msg, proc_id);
            break;
        case FF_IOCTL:
            handle_ioctl_msg(msg, proc_id);
            break;
        case FF_ROUTE:
            handle_route_msg(msg, proc_id);
            break;
        default:
            handle_default_msg(msg, proc_id);
            break;
    }
}

static inline int
process_msg_ring(uint16_t proc_id)
{
    void *msg;
    int ret = rte_ring_dequeue(msg_ring[proc_id].ring[0], &msg);

    if (unlikely(ret == 0)) {
        handle_msg((struct ff_msg *)msg, proc_id);
    }

    return 0;
}
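
/*
 * Sketch of the control-message round trip (assuming the ff_msg helpers
 * declared in ff_msg.h): a management process gets an ff_msg from
 * FF_MSG_POOL, fills in msg_type and the request payload, enqueues it on
 * the FF_MSG_RING_IN ring of the target proc_id, then polls that proc's
 * FF_MSG_RING_OUT ring; the handlers above write msg->result and push the
 * same object back on ring[1], and the caller returns it to the pool.
 */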

/* Send burst of packets on an output interface */
static inline int
send_burst(struct lcore_conf *qconf, uint16_t n, uint8_t port)
{
    struct rte_mbuf **m_table;
    int ret;
    uint16_t queueid;

    queueid = qconf->tx_queue_id[port];
    m_table = (struct rte_mbuf **)qconf->tx_mbufs[port].m_table;

    if (unlikely(qconf->pcap[port] != NULL)) {
        uint16_t i;
        for (i = 0; i < n; i++) {
            ff_dump_packets(qconf->pcap[port], m_table[i]);
        }
    }

    ret = rte_eth_tx_burst(port, queueid, m_table, n);
    if (unlikely(ret < n)) {
        /* Free the packets the NIC did not accept. */
        do {
            rte_pktmbuf_free(m_table[ret]);
        } while (++ret < n);
    }

    return 0;
}

/* Enqueue a single packet, and send burst if queue is filled */
static inline int
send_single_packet(struct rte_mbuf *m, uint8_t port)
{
    uint16_t len;
    struct lcore_conf *qconf;

    qconf = &lcore_conf;
    len = qconf->tx_mbufs[port].len;
    qconf->tx_mbufs[port].m_table[len] = m;
    len++;

    /* enough pkts to be sent */
    if (unlikely(len == MAX_PKT_BURST)) {
        send_burst(qconf, MAX_PKT_BURST, port);
        len = 0;
    }

    qconf->tx_mbufs[port].len = len;
    return 0;
}
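
/*
 * Packets buffered here are not stuck: main_loop() flushes any non-empty
 * tx_mbufs table roughly every BURST_TX_DRAIN_US microseconds, so a queue
 * that never fills to MAX_PKT_BURST still drains promptly.
 */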

int
ff_dpdk_if_send(struct ff_dpdk_if_context *ctx, void *m,
    int total)
{
    struct rte_mempool *mbuf_pool = pktmbuf_pool[lcore_conf.socket_id];
    struct rte_mbuf *head = rte_pktmbuf_alloc(mbuf_pool);
    if (head == NULL) {
        ff_mbuf_free(m);
        return -1;
    }

    head->pkt_len = total;
    head->nb_segs = 0;

    int off = 0;
    struct rte_mbuf *cur = head, *prev = NULL;
    while (total > 0) {
        if (cur == NULL) {
            cur = rte_pktmbuf_alloc(mbuf_pool);
            if (cur == NULL) {
                rte_pktmbuf_free(head);
                ff_mbuf_free(m);
                return -1;
            }
        }

        void *data = rte_pktmbuf_mtod(cur, void*);
        int len = total > RTE_MBUF_DEFAULT_DATAROOM ? RTE_MBUF_DEFAULT_DATAROOM : total;
        int ret = ff_mbuf_copydata(m, data, off, len);
        if (ret < 0) {
            rte_pktmbuf_free(head);
            ff_mbuf_free(m);
            return -1;
        }

        if (prev != NULL) {
            prev->next = cur;
        }
        prev = cur;

        cur->data_len = len;
        off += len;
        total -= len;
        head->nb_segs++;
        cur = NULL;
    }

    struct ff_tx_offload offload = {0};
    ff_mbuf_tx_offload(m, &offload);

    if (offload.ip_csum) {
        head->ol_flags |= PKT_TX_IP_CKSUM;
        head->l2_len = sizeof(struct ether_hdr);
        head->l3_len = sizeof(struct ipv4_hdr);
    }

    if (ctx->hw_features.tx_csum_l4) {
        if (offload.tcp_csum) {
            head->ol_flags |= PKT_TX_TCP_CKSUM;
            head->l2_len = sizeof(struct ether_hdr);
            head->l3_len = sizeof(struct ipv4_hdr);
        }

        if (offload.tso_seg_size) {
            head->ol_flags |= PKT_TX_TCP_SEG;
            head->l4_len = sizeof(struct tcp_hdr);
            head->tso_segsz = offload.tso_seg_size;
        }

        if (offload.udp_csum) {
            head->ol_flags |= PKT_TX_UDP_CKSUM;
            head->l2_len = sizeof(struct ether_hdr);
            head->l3_len = sizeof(struct ipv4_hdr);
        }
    }

    ff_mbuf_free(m);

    return send_single_packet(head, ctx->port_id);
}

static int
main_loop(void *arg)
{
    struct loop_routine *lr = (struct loop_routine *)arg;

    struct rte_mbuf *pkts_burst[MAX_PKT_BURST];
    unsigned lcore_id;
    uint64_t prev_tsc, diff_tsc, cur_tsc;
    int i, j, nb_rx;
    uint8_t port_id, queue_id;
    struct lcore_conf *qconf;
    const uint64_t drain_tsc = (rte_get_tsc_hz() + US_PER_S - 1) /
        US_PER_S * BURST_TX_DRAIN_US;
    struct ff_dpdk_if_context *ctx;

    prev_tsc = 0;

    lcore_id = rte_lcore_id();
    qconf = &lcore_conf;

    if (qconf->nb_rx_queue == 0) {
        printf("lcore %u has nothing to do\n", lcore_id);
        return 0;
    }

    while (1) {
        cur_tsc = rte_rdtsc();
        if (unlikely(freebsd_clock.expire < cur_tsc)) {
            rte_timer_manage();
        }

        /*
         * TX burst queue drain
         */
        diff_tsc = cur_tsc - prev_tsc;
        if (unlikely(diff_tsc > drain_tsc)) {
            /*
             * This could be optimized (use queueid instead of
             * portid), but it is not called so often
             */
            for (port_id = 0; port_id < RTE_MAX_ETHPORTS; port_id++) {
                if (qconf->tx_mbufs[port_id].len == 0)
                    continue;
                send_burst(qconf,
                    qconf->tx_mbufs[port_id].len,
                    port_id);
                qconf->tx_mbufs[port_id].len = 0;
            }

            prev_tsc = cur_tsc;
        }

        /*
         * Read packets from RX queues
         */
        for (i = 0; i < qconf->nb_rx_queue; ++i) {
            port_id = qconf->rx_queue_list[i].port_id;
            queue_id = qconf->rx_queue_list[i].queue_id;
            ctx = veth_ctx[port_id];

            if (enable_kni && rte_eal_process_type() == RTE_PROC_PRIMARY) {
                ff_kni_process(port_id, queue_id, pkts_burst, MAX_PKT_BURST);
            }

            process_arp_ring(port_id, queue_id, pkts_burst, ctx);

            nb_rx = rte_eth_rx_burst(port_id, queue_id, pkts_burst,
                MAX_PKT_BURST);
            if (nb_rx == 0)
                continue;

            /* Prefetch first packets */
            for (j = 0; j < PREFETCH_OFFSET && j < nb_rx; j++) {
                rte_prefetch0(rte_pktmbuf_mtod(
                        pkts_burst[j], void *));
            }

            /* Prefetch and handle already prefetched packets */
            for (j = 0; j < (nb_rx - PREFETCH_OFFSET); j++) {
                rte_prefetch0(rte_pktmbuf_mtod(pkts_burst[
                        j + PREFETCH_OFFSET], void *));
                process_packets(port_id, queue_id, &pkts_burst[j], 1, ctx, 0);
            }

            /* Handle remaining prefetched packets */
            for (; j < nb_rx; j++) {
                process_packets(port_id, queue_id, &pkts_burst[j], 1, ctx, 0);
            }
        }

        process_msg_ring(qconf->proc_id);

        if (likely(lr->loop != NULL)) {
            lr->loop(lr->arg);
        }
    }
}

int
ff_dpdk_if_up(void) {
    int nb_ports = ff_global_cfg.dpdk.nb_ports;
    int i;
    for (i = 0; i < nb_ports; i++) {
        uint8_t port_id = ff_global_cfg.dpdk.port_cfgs[i].port_id;
        veth_ctx[port_id] = ff_veth_attach(ff_global_cfg.dpdk.port_cfgs + i);
        if (veth_ctx[port_id] == NULL) {
            rte_exit(EXIT_FAILURE, "ff_veth_attach failed\n");
        }
    }

    return 0;
}

void
ff_dpdk_run(loop_func_t loop, void *arg) {
    struct loop_routine *lr = rte_malloc(NULL,
        sizeof(struct loop_routine), 0);
    if (lr == NULL) {
        rte_exit(EXIT_FAILURE, "rte_malloc loop_routine failed\n");
    }
    lr->loop = loop;
    lr->arg = arg;
    rte_eal_mp_remote_launch(main_loop, lr, CALL_MASTER);
    rte_eal_mp_wait_lcore();
    rte_free(lr);
}

void
ff_dpdk_pktmbuf_free(void *m)
{
    rte_pktmbuf_free((struct rte_mbuf *)m);
}

static uint32_t
toeplitz_hash(unsigned keylen, const uint8_t *key,
    unsigned datalen, const uint8_t *data)
{
    uint32_t hash = 0, v;
    u_int i, b;

    /* XXXRW: Perhaps an assertion about key length vs. data length? */

    v = (key[0]<<24) + (key[1]<<16) + (key[2]<<8) + key[3];
    for (i = 0; i < datalen; i++) {
        for (b = 0; b < 8; b++) {
            if (data[i] & (1<<(7-b)))
                hash ^= v;
            v <<= 1;
            if ((i + 4) < keylen &&
                (key[i+4] & (1<<(7-b))))
                v |= 1;
        }
    }
    return (hash);
}

int
ff_rss_check(uint32_t saddr, uint32_t daddr, uint16_t sport, uint16_t dport)
{
    struct lcore_conf *qconf = &lcore_conf;

    if (qconf->nb_procs == 1) {
        return 1;
    }

    uint8_t data[sizeof(saddr) + sizeof(daddr) + sizeof(sport) +
        sizeof(dport)];

    unsigned datalen = 0;

    bcopy(&saddr, &data[datalen], sizeof(saddr));
    datalen += sizeof(saddr);

    bcopy(&daddr, &data[datalen], sizeof(daddr));
    datalen += sizeof(daddr);

    bcopy(&sport, &data[datalen], sizeof(sport));
    datalen += sizeof(sport);

    bcopy(&dport, &data[datalen], sizeof(dport));
    datalen += sizeof(dport);

    uint32_t hash = toeplitz_hash(sizeof(default_rsskey_40bytes),
        default_rsskey_40bytes, datalen, data);

    return (hash % qconf->nb_procs) == qconf->proc_id;
}
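
/*
 * Example use (an assumption based on how this check is meant to be
 * consumed, not code from this file): when the stack picks a local port,
 * it can loop until the 4-tuple lands on the caller's own queue, e.g.
 *
 *   while (!ff_rss_check(laddr, faddr, lport, fport)) {
 *       lport = next_candidate_port(lport);  // hypothetical helper
 *   }
 *
 * This works because the NIC hashes with the same default_rsskey_40bytes,
 * so (hash % nb_procs) == proc_id means packets for this 4-tuple arrive
 * on this process's RSS queue.
 */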