xref: /f-stack/lib/ff_dpdk_if.c (revision e8591dc5)
1 /*
2  * Copyright (C) 2017 THL A29 Limited, a Tencent company.
3  * All rights reserved.
4  *
5  * Redistribution and use in source and binary forms, with or without
6  * modification, are permitted provided that the following conditions are met:
7  *
8  * 1. Redistributions of source code must retain the above copyright notice, this
9  *   list of conditions and the following disclaimer.
10  * 2. Redistributions in binary form must reproduce the above copyright notice,
11  *   this list of conditions and the following disclaimer in the documentation
12  *   and/or other materials provided with the distribution.
13  *
14  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
15  * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
16  * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
17  * DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR
18  * ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
19  * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
20  * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
21  * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
22  * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
23  * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
24  *
25  */
#include <assert.h>
#include <unistd.h>

#include <rte_common.h>
#include <rte_byteorder.h>
#include <rte_log.h>
#include <rte_memory.h>
#include <rte_memcpy.h>
#include <rte_memzone.h>
#include <rte_config.h>
#include <rte_eal.h>
#include <rte_pci.h>
#include <rte_mbuf.h>
#include <rte_lcore.h>
#include <rte_launch.h>
#include <rte_ethdev.h>
#include <rte_debug.h>
#include <rte_ether.h>
#include <rte_malloc.h>
#include <rte_cycles.h>
#include <rte_timer.h>
#include <rte_thash.h>
#include <rte_ip.h>
#include <rte_tcp.h>
#include <rte_udp.h>

#include "ff_dpdk_if.h"
#include "ff_dpdk_pcap.h"
#include "ff_dpdk_kni.h"
#include "ff_config.h"
#include "ff_veth.h"
#include "ff_host_interface.h"
#include "ff_msg.h"
#include "ff_api.h"

#define MEMPOOL_CACHE_SIZE 256

#define DISPATCH_RING_SIZE 2048

#define MSG_RING_SIZE 32

/*
 * Configurable number of RX/TX ring descriptors
 */
#define RX_QUEUE_SIZE 512
#define TX_QUEUE_SIZE 512

#define MAX_PKT_BURST 32
#define BURST_TX_DRAIN_US 100 /* TX drain every ~100us */

/*
 * Try to avoid TX buffering if we have at least MAX_TX_BURST packets to send.
 */
#define MAX_TX_BURST    (MAX_PKT_BURST / 2)

#define NB_SOCKETS 8

/* Configure how many packets ahead to prefetch when reading packets. */
#define PREFETCH_OFFSET    3

#define MAX_RX_QUEUE_PER_LCORE 16
#define MAX_TX_QUEUE_PER_PORT RTE_MAX_ETHPORTS
#define MAX_RX_QUEUE_PER_PORT 128

#ifdef FF_KNI
#define KNI_MBUF_MAX 2048
#define KNI_QUEUE_SIZE 2048

static int enable_kni;
static int kni_accept;
#endif

static int numa_on;

static unsigned idle_sleep;

static struct rte_timer freebsd_clock;

/* Default RSS hash key (40 bytes), as used by the Mellanox Linux driver. */
static uint8_t default_rsskey_40bytes[40] = {
    0xd1, 0x81, 0xc6, 0x2c, 0xf7, 0xf4, 0xdb, 0x5b,
    0x19, 0x83, 0xa2, 0xfc, 0x94, 0x3e, 0x1a, 0xdb,
    0xd9, 0x38, 0x9e, 0x6b, 0xd1, 0x03, 0x9c, 0x2c,
    0xa7, 0x44, 0x99, 0xad, 0x59, 0x3d, 0x56, 0xd9,
    0xf3, 0x25, 0x3c, 0x06, 0x2a, 0xdc, 0x1f, 0xfc
};
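
/*
 * This key is both programmed into the NIC in init_port_start() and fed to
 * the software Toeplitz hash in ff_rss_check(); the two must stay in sync,
 * or software queue selection will disagree with hardware RSS steering.
 */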

struct mbuf_table {
    uint16_t len;
    struct rte_mbuf *m_table[MAX_PKT_BURST];
};

struct lcore_rx_queue {
    uint16_t port_id;
    uint16_t queue_id;
} __rte_cache_aligned;

struct lcore_conf {
    uint16_t proc_id;
    uint16_t socket_id;
    uint16_t nb_queue_list[RTE_MAX_ETHPORTS];
    struct ff_port_cfg *port_cfgs;

    uint16_t nb_rx_queue;
    struct lcore_rx_queue rx_queue_list[MAX_RX_QUEUE_PER_LCORE];
    uint16_t nb_tx_port;
    uint16_t tx_port_id[RTE_MAX_ETHPORTS];
    uint16_t tx_queue_id[RTE_MAX_ETHPORTS];
    struct mbuf_table tx_mbufs[RTE_MAX_ETHPORTS];
    char *pcap[RTE_MAX_ETHPORTS];
} __rte_cache_aligned;

static struct lcore_conf lcore_conf;

static struct rte_mempool *pktmbuf_pool[NB_SOCKETS];

static struct rte_ring **dispatch_ring[RTE_MAX_ETHPORTS];
static dispatch_func_t packet_dispatcher;

static uint16_t rss_reta_size[RTE_MAX_ETHPORTS];

struct ff_msg_ring {
    char ring_name[2][RTE_RING_NAMESIZE];
    /* ring[0]: other processes send msgs, this lcore receives. */
    /* ring[1]: this lcore sends msgs, other processes receive. */
    struct rte_ring *ring[2];
} __rte_cache_aligned;

static struct ff_msg_ring msg_ring[RTE_MAX_LCORE];
static struct rte_mempool *message_pool;

struct ff_dpdk_if_context {
    void *sc;
    void *ifp;
    uint16_t port_id;
    struct ff_hw_features hw_features;
} __rte_cache_aligned;

static struct ff_dpdk_if_context *veth_ctx[RTE_MAX_ETHPORTS];

static struct ff_top_args ff_top_status;
static struct ff_traffic_args ff_traffic;

extern void ff_hardclock(void);

static void
ff_hardclock_job(__rte_unused struct rte_timer *timer,
    __rte_unused void *arg) {
    ff_hardclock();
    ff_update_current_ts();
}

struct ff_dpdk_if_context *
ff_dpdk_register_if(void *sc, void *ifp, struct ff_port_cfg *cfg)
{
    struct ff_dpdk_if_context *ctx;

    ctx = calloc(1, sizeof(struct ff_dpdk_if_context));
    if (ctx == NULL)
        return NULL;

    ctx->sc = sc;
    ctx->ifp = ifp;
    ctx->port_id = cfg->port_id;
    ctx->hw_features = cfg->hw_features;

    return ctx;
}

void
ff_dpdk_deregister_if(struct ff_dpdk_if_context *ctx)
{
    free(ctx);
}

static void
check_all_ports_link_status(void)
{
    #define CHECK_INTERVAL 100 /* 100ms */
    #define MAX_CHECK_TIME 90  /* 9s (90 * 100ms) in total */

    uint8_t count, all_ports_up, print_flag = 0;
    struct rte_eth_link link;

    printf("\nChecking link status");
    fflush(stdout);

    int i, nb_ports;
    nb_ports = ff_global_cfg.dpdk.nb_ports;
    for (count = 0; count <= MAX_CHECK_TIME; count++) {
        all_ports_up = 1;
        for (i = 0; i < nb_ports; i++) {
            uint16_t portid = ff_global_cfg.dpdk.portid_list[i];
            memset(&link, 0, sizeof(link));
            rte_eth_link_get_nowait(portid, &link);

            /* print link status if flag set */
            if (print_flag == 1) {
                if (link.link_status) {
                    printf("Port %d Link Up - speed %u "
                        "Mbps - %s\n", (int)portid,
                        (unsigned)link.link_speed,
                        (link.link_duplex == ETH_LINK_FULL_DUPLEX) ?
                        ("full-duplex") : ("half-duplex"));
                } else {
                    printf("Port %d Link Down\n", (int)portid);
                }
                continue;
            }
            /* clear all_ports_up flag if any link down */
            if (link.link_status == 0) {
                all_ports_up = 0;
                break;
            }
        }

        /* after finally printing all link status, get out */
        if (print_flag == 1)
            break;

        if (all_ports_up == 0) {
            printf(".");
            fflush(stdout);
            rte_delay_ms(CHECK_INTERVAL);
        }

        /* set the print_flag if all ports up or timeout */
        if (all_ports_up == 1 || count == (MAX_CHECK_TIME - 1)) {
            print_flag = 1;
            printf("done\n");
        }
    }
}

static int
init_lcore_conf(void)
{
    uint8_t nb_dev_ports = rte_eth_dev_count_avail();
    if (nb_dev_ports == 0) {
        rte_exit(EXIT_FAILURE, "No probed ethernet devices\n");
    }

    if (ff_global_cfg.dpdk.max_portid >= nb_dev_ports) {
        rte_exit(EXIT_FAILURE, "this machine doesn't have port %d.\n",
                 ff_global_cfg.dpdk.max_portid);
    }

    lcore_conf.port_cfgs = ff_global_cfg.dpdk.port_cfgs;
    lcore_conf.proc_id = ff_global_cfg.dpdk.proc_id;

    uint16_t proc_id;
    for (proc_id = 0; proc_id < ff_global_cfg.dpdk.nb_procs; proc_id++) {
        uint16_t lcore_id = ff_global_cfg.dpdk.proc_lcore[proc_id];
        if (!lcore_config[lcore_id].detected) {
            rte_exit(EXIT_FAILURE, "lcore %u unavailable\n", lcore_id);
        }
    }

    uint16_t socket_id = 0;
    if (numa_on) {
        socket_id = rte_lcore_to_socket_id(rte_lcore_id());
    }

    lcore_conf.socket_id = socket_id;

    uint16_t lcore_id = ff_global_cfg.dpdk.proc_lcore[lcore_conf.proc_id];
    int j;
    for (j = 0; j < ff_global_cfg.dpdk.nb_ports; ++j) {
        uint16_t port_id = ff_global_cfg.dpdk.portid_list[j];
        struct ff_port_cfg *pconf = &ff_global_cfg.dpdk.port_cfgs[port_id];

        int queueid = -1;
        int i;
        for (i = 0; i < pconf->nb_lcores; i++) {
            if (pconf->lcore_list[i] == lcore_id) {
                queueid = i;
            }
        }
        if (queueid < 0) {
            continue;
        }
        printf("lcore: %u, port: %u, queue: %d\n", lcore_id, port_id, queueid);
        uint16_t nb_rx_queue = lcore_conf.nb_rx_queue;
        lcore_conf.rx_queue_list[nb_rx_queue].port_id = port_id;
        lcore_conf.rx_queue_list[nb_rx_queue].queue_id = queueid;
        lcore_conf.nb_rx_queue++;

        lcore_conf.tx_queue_id[port_id] = queueid;
        lcore_conf.tx_port_id[lcore_conf.nb_tx_port] = port_id;
        lcore_conf.nb_tx_port++;

        lcore_conf.pcap[port_id] = pconf->pcap;
        lcore_conf.nb_queue_list[port_id] = pconf->nb_lcores;
    }

    if (lcore_conf.nb_rx_queue == 0) {
        rte_exit(EXIT_FAILURE, "lcore %u has nothing to do\n", lcore_id);
    }

    return 0;
}

static int
init_mem_pool(void)
{
    uint8_t nb_ports = ff_global_cfg.dpdk.nb_ports;
    uint32_t nb_lcores = ff_global_cfg.dpdk.nb_procs;
    uint32_t nb_tx_queue = nb_lcores;
    uint32_t nb_rx_queue = lcore_conf.nb_rx_queue * nb_lcores;

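    /*
     * Size the pool to cover every place an mbuf can sit at once: RX
     * descriptor rings, per-lcore TX bursts in flight, TX descriptor
     * rings, per-lcore mempool caches, the KNI queues (if enabled) and
     * the inter-lcore dispatch rings, with a floor of 8192 mbufs.
     */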
    unsigned nb_mbuf = RTE_MAX(
        (nb_rx_queue*RX_QUEUE_SIZE          +
        nb_ports*nb_lcores*MAX_PKT_BURST    +
        nb_ports*nb_tx_queue*TX_QUEUE_SIZE  +
        nb_lcores*MEMPOOL_CACHE_SIZE +
#ifdef FF_KNI
        nb_ports*KNI_MBUF_MAX +
        nb_ports*KNI_QUEUE_SIZE +
#endif
        nb_lcores*nb_ports*DISPATCH_RING_SIZE),
        (unsigned)8192);

    unsigned socketid = 0;
    uint16_t i, lcore_id;
    char s[64];

    for (i = 0; i < ff_global_cfg.dpdk.nb_procs; i++) {
        lcore_id = ff_global_cfg.dpdk.proc_lcore[i];
        if (numa_on) {
            socketid = rte_lcore_to_socket_id(lcore_id);
        }

        if (socketid >= NB_SOCKETS) {
            rte_exit(EXIT_FAILURE, "Socket %u of lcore %u is out of range %d\n",
                socketid, lcore_id, NB_SOCKETS);
        }

        if (pktmbuf_pool[socketid] != NULL) {
            continue;
        }

        if (rte_eal_process_type() == RTE_PROC_PRIMARY) {
            snprintf(s, sizeof(s), "mbuf_pool_%d", socketid);
            pktmbuf_pool[socketid] =
                rte_pktmbuf_pool_create(s, nb_mbuf,
                    MEMPOOL_CACHE_SIZE, 0,
                    RTE_MBUF_DEFAULT_BUF_SIZE, socketid);
        } else {
            snprintf(s, sizeof(s), "mbuf_pool_%d", socketid);
            pktmbuf_pool[socketid] = rte_mempool_lookup(s);
        }

        if (pktmbuf_pool[socketid] == NULL) {
            rte_exit(EXIT_FAILURE, "Cannot create mbuf pool on socket %u\n", socketid);
        } else {
            printf("create mbuf pool on socket %u\n", socketid);
        }
    }

    return 0;
}

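/*
 * Create a named ring in the primary process, or look up the ring created
 * by the primary when running as a secondary process.
 */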
static struct rte_ring *
create_ring(const char *name, unsigned count, int socket_id, unsigned flags)
{
    struct rte_ring *ring;

    if (name == NULL) {
        rte_exit(EXIT_FAILURE, "create ring failed, no name!\n");
    }

    if (rte_eal_process_type() == RTE_PROC_PRIMARY) {
        ring = rte_ring_create(name, count, socket_id, flags);
    } else {
        ring = rte_ring_lookup(name);
    }

    if (ring == NULL) {
        rte_exit(EXIT_FAILURE, "create ring:%s failed!\n", name);
    }

    return ring;
}

static int
init_dispatch_ring(void)
{
    int j;
    char name_buf[RTE_RING_NAMESIZE];
    int queueid;

    unsigned socketid = lcore_conf.socket_id;

    /* Create rings for the ports actually in use. */
    int nb_ports = ff_global_cfg.dpdk.nb_ports;
    for (j = 0; j < nb_ports; j++) {
        uint16_t portid = ff_global_cfg.dpdk.portid_list[j];
        struct ff_port_cfg *pconf = &ff_global_cfg.dpdk.port_cfgs[portid];
        int nb_queues = pconf->nb_lcores;
        if (dispatch_ring[portid] == NULL) {
            snprintf(name_buf, RTE_RING_NAMESIZE, "ring_ptr_p%d", portid);

            dispatch_ring[portid] = rte_zmalloc(name_buf,
                sizeof(struct rte_ring *) * nb_queues,
                RTE_CACHE_LINE_SIZE);
            if (dispatch_ring[portid] == NULL) {
                rte_exit(EXIT_FAILURE, "rte_zmalloc(%s (struct rte_ring*)) "
                    "failed\n", name_buf);
            }
        }

        for (queueid = 0; queueid < nb_queues; ++queueid) {
            snprintf(name_buf, RTE_RING_NAMESIZE, "dispatch_ring_p%d_q%d",
                portid, queueid);
            dispatch_ring[portid][queueid] = create_ring(name_buf,
                DISPATCH_RING_SIZE, socketid, RING_F_SC_DEQ);

            if (dispatch_ring[portid][queueid] == NULL)
                rte_panic("create ring:%s failed!\n", name_buf);

            printf("created ring:%s, %u ring entries are now free!\n",
                name_buf, rte_ring_free_count(dispatch_ring[portid][queueid]));
        }
    }

    return 0;
}

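/*
 * Mempool object constructor: each ff_msg carries its payload buffer
 * immediately after the message header within the same mempool element.
 */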
static void
ff_msg_init(struct rte_mempool *mp,
    __attribute__((unused)) void *opaque_arg,
    void *obj, __attribute__((unused)) unsigned i)
{
    struct ff_msg *msg = (struct ff_msg *)obj;
    msg->msg_type = FF_UNKNOWN;
    msg->buf_addr = (char *)msg + sizeof(struct ff_msg);
    msg->buf_len = mp->elt_size - sizeof(struct ff_msg);
}

static int
init_msg_ring(void)
{
    uint16_t i;
    uint16_t nb_procs = ff_global_cfg.dpdk.nb_procs;
    unsigned socketid = lcore_conf.socket_id;

    /* Create message buffer pool */
    if (rte_eal_process_type() == RTE_PROC_PRIMARY) {
        message_pool = rte_mempool_create(FF_MSG_POOL,
           MSG_RING_SIZE * 2 * nb_procs,
           MAX_MSG_BUF_SIZE, MSG_RING_SIZE / 2, 0,
           NULL, NULL, ff_msg_init, NULL,
           socketid, 0);
    } else {
        message_pool = rte_mempool_lookup(FF_MSG_POOL);
    }

    if (message_pool == NULL) {
        rte_panic("Create msg mempool failed\n");
    }

    for (i = 0; i < nb_procs; ++i) {
        snprintf(msg_ring[i].ring_name[0], RTE_RING_NAMESIZE,
            "%s%u", FF_MSG_RING_IN, i);
        snprintf(msg_ring[i].ring_name[1], RTE_RING_NAMESIZE,
            "%s%u", FF_MSG_RING_OUT, i);

        msg_ring[i].ring[0] = create_ring(msg_ring[i].ring_name[0],
            MSG_RING_SIZE, socketid, RING_F_SP_ENQ | RING_F_SC_DEQ);
        if (msg_ring[i].ring[0] == NULL)
            rte_panic("create ring:%s failed!\n", msg_ring[i].ring_name[0]);

        msg_ring[i].ring[1] = create_ring(msg_ring[i].ring_name[1],
            MSG_RING_SIZE, socketid, RING_F_SP_ENQ | RING_F_SC_DEQ);
        if (msg_ring[i].ring[1] == NULL)
            rte_panic("create ring:%s failed!\n", msg_ring[i].ring_name[1]);
    }

    return 0;
}

509 
510 #ifdef FF_KNI
511 static int
512 init_kni(void)
513 {
514     int nb_ports = rte_eth_dev_count_avail();
515     kni_accept = 0;
516     if(strcasecmp(ff_global_cfg.kni.method, "accept") == 0)
517         kni_accept = 1;
518 
519     ff_kni_init(nb_ports, ff_global_cfg.kni.tcp_port,
520         ff_global_cfg.kni.udp_port);
521 
522     unsigned socket_id = lcore_conf.socket_id;
523     struct rte_mempool *mbuf_pool = pktmbuf_pool[socket_id];
524 
525     nb_ports = ff_global_cfg.dpdk.nb_ports;
526     int i, ret;
527     for (i = 0; i < nb_ports; i++) {
528         uint16_t port_id = ff_global_cfg.dpdk.portid_list[i];
529         ff_kni_alloc(port_id, socket_id, mbuf_pool, KNI_QUEUE_SIZE);
530     }
531 
532     return 0;
533 }
534 #endif
535 
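/*
 * Fill the NIC's RSS redirection table (RETA), spreading the entries
 * round-robin across the configured RX queues.
 */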
static void
set_rss_table(uint16_t port_id, uint16_t reta_size, uint16_t nb_queues)
{
    if (reta_size == 0) {
        return;
    }

    int reta_conf_size = RTE_MAX(1, reta_size / RTE_RETA_GROUP_SIZE);
    struct rte_eth_rss_reta_entry64 reta_conf[reta_conf_size];

    /* config HW indirection table */
    unsigned i, j, hash = 0;
    for (i = 0; i < reta_conf_size; i++) {
        reta_conf[i].mask = ~0ULL;
        for (j = 0; j < RTE_RETA_GROUP_SIZE; j++) {
            reta_conf[i].reta[j] = hash++ % nb_queues;
        }
    }

    if (rte_eth_dev_rss_reta_update(port_id, reta_conf, reta_size)) {
        rte_exit(EXIT_FAILURE, "port[%d], failed to update rss table\n",
            port_id);
    }
}

static int
init_port_start(void)
{
    int nb_ports = ff_global_cfg.dpdk.nb_ports;
    unsigned socketid = 0;
    struct rte_mempool *mbuf_pool;
    uint16_t i;

    for (i = 0; i < nb_ports; i++) {
        uint16_t port_id = ff_global_cfg.dpdk.portid_list[i];
        struct ff_port_cfg *pconf = &ff_global_cfg.dpdk.port_cfgs[port_id];
        uint16_t nb_queues = pconf->nb_lcores;

        struct rte_eth_dev_info dev_info;
        struct rte_eth_conf port_conf = {0};
        struct rte_eth_rxconf rxq_conf;
        struct rte_eth_txconf txq_conf;

        rte_eth_dev_info_get(port_id, &dev_info);

        if (nb_queues > dev_info.max_rx_queues) {
            rte_exit(EXIT_FAILURE, "num_procs[%d] exceeds max_rx_queues[%d]\n",
                nb_queues,
                dev_info.max_rx_queues);
        }

        if (nb_queues > dev_info.max_tx_queues) {
            rte_exit(EXIT_FAILURE, "num_procs[%d] exceeds max_tx_queues[%d]\n",
                nb_queues,
                dev_info.max_tx_queues);
        }

        struct ether_addr addr;
        rte_eth_macaddr_get(port_id, &addr);
        printf("Port %u MAC: %02" PRIx8 " %02" PRIx8 " %02" PRIx8
                   " %02" PRIx8 " %02" PRIx8 " %02" PRIx8 "\n",
                (unsigned)port_id,
                addr.addr_bytes[0], addr.addr_bytes[1],
                addr.addr_bytes[2], addr.addr_bytes[3],
                addr.addr_bytes[4], addr.addr_bytes[5]);

        rte_memcpy(pconf->mac,
            addr.addr_bytes, ETHER_ADDR_LEN);

        /* Set RSS mode */
        uint64_t default_rss_hf = ETH_RSS_PROTO_MASK;
        port_conf.rxmode.mq_mode = ETH_MQ_RX_RSS;
        port_conf.rx_adv_conf.rss_conf.rss_hf = default_rss_hf;
        port_conf.rx_adv_conf.rss_conf.rss_key = default_rsskey_40bytes;
        port_conf.rx_adv_conf.rss_conf.rss_key_len = 40;
        port_conf.rx_adv_conf.rss_conf.rss_hf &= dev_info.flow_type_rss_offloads;
        if (port_conf.rx_adv_conf.rss_conf.rss_hf !=
                ETH_RSS_PROTO_MASK) {
            printf("Port %u modified RSS hash function based on hardware support, "
                    "requested:%#"PRIx64" configured:%#"PRIx64"\n",
                    port_id, default_rss_hf,
                    port_conf.rx_adv_conf.rss_conf.rss_hf);
        }

        if (dev_info.tx_offload_capa & DEV_TX_OFFLOAD_MBUF_FAST_FREE)
            port_conf.txmode.offloads |=
                DEV_TX_OFFLOAD_MBUF_FAST_FREE;

        /* Set Rx VLAN stripping */
        if (ff_global_cfg.dpdk.vlan_strip) {
            if (dev_info.rx_offload_capa & DEV_RX_OFFLOAD_VLAN_STRIP) {
                port_conf.rxmode.offloads |= DEV_RX_OFFLOAD_VLAN_STRIP;
            }
        }

        /* Keep the Ethernet CRC in the received frame if supported */
        if (dev_info.rx_offload_capa & DEV_RX_OFFLOAD_KEEP_CRC) {
            port_conf.rxmode.offloads |= DEV_RX_OFFLOAD_KEEP_CRC;
        }

        /* FIXME: Enable TCP LRO? */
        #if 0
        if (dev_info.rx_offload_capa & DEV_RX_OFFLOAD_TCP_LRO) {
            printf("LRO is supported\n");
            port_conf.rxmode.offloads |= DEV_RX_OFFLOAD_TCP_LRO;
            pconf->hw_features.rx_lro = 1;
        }
        #endif

        /* Set Rx checksum checking */
        if ((dev_info.rx_offload_capa & DEV_RX_OFFLOAD_IPV4_CKSUM) &&
            (dev_info.rx_offload_capa & DEV_RX_OFFLOAD_UDP_CKSUM) &&
            (dev_info.rx_offload_capa & DEV_RX_OFFLOAD_TCP_CKSUM)) {
            printf("RX checksum offload supported\n");
            port_conf.rxmode.offloads |= DEV_RX_OFFLOAD_CHECKSUM;
            pconf->hw_features.rx_csum = 1;
        }

        if ((dev_info.tx_offload_capa & DEV_TX_OFFLOAD_IPV4_CKSUM)) {
            printf("TX ip checksum offload supported\n");
            port_conf.txmode.offloads |= DEV_TX_OFFLOAD_IPV4_CKSUM;
            pconf->hw_features.tx_csum_ip = 1;
        }

        if ((dev_info.tx_offload_capa & DEV_TX_OFFLOAD_UDP_CKSUM) &&
            (dev_info.tx_offload_capa & DEV_TX_OFFLOAD_TCP_CKSUM)) {
            printf("TX TCP&UDP checksum offload supported\n");
            port_conf.txmode.offloads |= DEV_TX_OFFLOAD_UDP_CKSUM | DEV_TX_OFFLOAD_TCP_CKSUM;
            pconf->hw_features.tx_csum_l4 = 1;
        }

        if (ff_global_cfg.dpdk.tso) {
            if (dev_info.tx_offload_capa & DEV_TX_OFFLOAD_TCP_TSO) {
                printf("TSO is supported\n");
                port_conf.txmode.offloads |= DEV_TX_OFFLOAD_TCP_TSO;
                pconf->hw_features.tx_tso = 1;
            }
        } else {
            printf("TSO is disabled\n");
        }

        if (dev_info.reta_size) {
            /* reta size must be power of 2 */
            assert((dev_info.reta_size & (dev_info.reta_size - 1)) == 0);

            rss_reta_size[port_id] = dev_info.reta_size;
            printf("port[%d]: rss table size: %d\n", port_id,
                dev_info.reta_size);
        }

        if (rte_eal_process_type() != RTE_PROC_PRIMARY) {
            continue;
        }

        int ret = rte_eth_dev_configure(port_id, nb_queues, nb_queues, &port_conf);
        if (ret != 0) {
            return ret;
        }

        static uint16_t nb_rxd = RX_QUEUE_SIZE;
        static uint16_t nb_txd = TX_QUEUE_SIZE;
        ret = rte_eth_dev_adjust_nb_rx_tx_desc(port_id, &nb_rxd, &nb_txd);
        if (ret < 0)
            printf("Could not adjust number of descriptors "
                    "for port%u (%d)\n", (unsigned)port_id, ret);

        uint16_t q;
        for (q = 0; q < nb_queues; q++) {
            if (numa_on) {
                uint16_t lcore_id = lcore_conf.port_cfgs[port_id].lcore_list[q];
                socketid = rte_lcore_to_socket_id(lcore_id);
            }
            mbuf_pool = pktmbuf_pool[socketid];

            txq_conf = dev_info.default_txconf;
            txq_conf.offloads = port_conf.txmode.offloads;
            ret = rte_eth_tx_queue_setup(port_id, q, nb_txd,
                socketid, &txq_conf);
            if (ret < 0) {
                return ret;
            }

            rxq_conf = dev_info.default_rxconf;
            rxq_conf.offloads = port_conf.rxmode.offloads;
            ret = rte_eth_rx_queue_setup(port_id, q, nb_rxd,
                socketid, &rxq_conf, mbuf_pool);
            if (ret < 0) {
                return ret;
            }
        }

        ret = rte_eth_dev_start(port_id);
        if (ret < 0) {
            return ret;
        }

        if (nb_queues > 1) {
            /* set HW rss hash function to Toeplitz. */
            if (!rte_eth_dev_filter_supported(port_id, RTE_ETH_FILTER_HASH)) {
                struct rte_eth_hash_filter_info info = {0};
                info.info_type = RTE_ETH_HASH_FILTER_GLOBAL_CONFIG;
                info.info.global_conf.hash_func = RTE_ETH_HASH_FUNCTION_TOEPLITZ;

                if (rte_eth_dev_filter_ctrl(port_id, RTE_ETH_FILTER_HASH,
                    RTE_ETH_FILTER_SET, &info) < 0) {
                    rte_exit(EXIT_FAILURE, "port[%d] set hash func failed\n",
                        port_id);
                }
            }

            set_rss_table(port_id, dev_info.reta_size, nb_queues);
        }

        /* Enable RX in promiscuous mode for the Ethernet device. */
        if (ff_global_cfg.dpdk.promiscuous) {
            rte_eth_promiscuous_enable(port_id);
            ret = rte_eth_promiscuous_get(port_id);
            if (ret == 1) {
                printf("set port %u to promiscuous mode ok\n", port_id);
            } else {
                printf("set port %u to promiscuous mode error\n", port_id);
            }
        }

        /* Enable pcap dump */
        if (pconf->pcap) {
            ff_enable_pcap(pconf->pcap);
        }
    }

    if (rte_eal_process_type() == RTE_PROC_PRIMARY) {
        check_all_ports_link_status();
    }

    return 0;
}

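/*
 * Arm a periodic DPDK timer that fires freebsd.hz times per second
 * (period = MS_PER_S / hz milliseconds, converted to TSC cycles) and
 * drives the FreeBSD hardclock from the DPDK main loop.
 */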
static int
init_clock(void)
{
    rte_timer_subsystem_init();
    uint64_t hz = rte_get_timer_hz();
    uint64_t intrs = MS_PER_S / ff_global_cfg.freebsd.hz;
    uint64_t tsc = (hz + MS_PER_S - 1) / MS_PER_S * intrs;

    rte_timer_init(&freebsd_clock);
    rte_timer_reset(&freebsd_clock, tsc, PERIODICAL,
        rte_lcore_id(), &ff_hardclock_job, NULL);

    ff_update_current_ts();

    return 0;
}

int
ff_dpdk_init(int argc, char **argv)
{
    if (ff_global_cfg.dpdk.nb_procs < 1 ||
        ff_global_cfg.dpdk.nb_procs > RTE_MAX_LCORE ||
        ff_global_cfg.dpdk.proc_id >= ff_global_cfg.dpdk.nb_procs ||
        ff_global_cfg.dpdk.proc_id < 0) {
        printf("param num_procs[%d] or proc_id[%d] error!\n",
            ff_global_cfg.dpdk.nb_procs,
            ff_global_cfg.dpdk.proc_id);
        exit(1);
    }

    int ret = rte_eal_init(argc, argv);
    if (ret < 0) {
        rte_exit(EXIT_FAILURE, "Error with EAL initialization\n");
    }

    numa_on = ff_global_cfg.dpdk.numa_on;

    idle_sleep = ff_global_cfg.dpdk.idle_sleep;

    init_lcore_conf();

    init_mem_pool();

    init_dispatch_ring();

    init_msg_ring();

#ifdef FF_KNI
    enable_kni = ff_global_cfg.kni.enable;
    if (enable_kni) {
        init_kni();
    }
#endif

    ret = init_port_start();
    if (ret < 0) {
        rte_exit(EXIT_FAILURE, "init_port_start failed\n");
    }

    init_clock();

    return 0;
}

static void
ff_veth_input(const struct ff_dpdk_if_context *ctx, struct rte_mbuf *pkt)
{
    uint8_t rx_csum = ctx->hw_features.rx_csum;
    if (rx_csum) {
        if (pkt->ol_flags & (PKT_RX_IP_CKSUM_BAD | PKT_RX_L4_CKSUM_BAD)) {
            rte_pktmbuf_free(pkt);
            return;
        }
    }

    /*
     * FIXME: should we save pkt->vlan_tci
     * if (pkt->ol_flags & PKT_RX_VLAN_PKT)
     */

    void *data = rte_pktmbuf_mtod(pkt, void *);
    uint16_t len = rte_pktmbuf_data_len(pkt);

    void *hdr = ff_mbuf_gethdr(pkt, pkt->pkt_len, data, len, rx_csum);
    if (hdr == NULL) {
        rte_pktmbuf_free(pkt);
        return;
    }

    struct rte_mbuf *pn = pkt->next;
    void *prev = hdr;
    while (pn != NULL) {
        data = rte_pktmbuf_mtod(pn, void *);
        len = rte_pktmbuf_data_len(pn);

        void *mb = ff_mbuf_get(prev, data, len);
        if (mb == NULL) {
            ff_mbuf_free(hdr);
            rte_pktmbuf_free(pkt);
            return;
        }
        pn = pn->next;
        prev = mb;
    }

    ff_veth_process_packet(ctx->ifp, hdr);
}

static enum FilterReturn
protocol_filter(const void *data, uint16_t len)
{
    if (len < ETHER_HDR_LEN)
        return FILTER_UNKNOWN;

    const struct ether_hdr *hdr;
    hdr = (const struct ether_hdr *)data;

    if (ntohs(hdr->ether_type) == ETHER_TYPE_ARP)
        return FILTER_ARP;

#ifndef FF_KNI
    return FILTER_UNKNOWN;
#else
    if (!enable_kni) {
        return FILTER_UNKNOWN;
    }

    if (ntohs(hdr->ether_type) != ETHER_TYPE_IPv4)
        return FILTER_UNKNOWN;

    return ff_kni_proto_filter(data + ETHER_HDR_LEN,
        len - ETHER_HDR_LEN);
#endif
}

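/*
 * Copy one segment's payload and metadata into a freshly allocated mbuf.
 * Unlike rte_pktmbuf_attach(), this is a deep copy: the clone does not
 * reference the original's data and can safely outlive it.
 */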
907 
908 static inline void
909 pktmbuf_deep_attach(struct rte_mbuf *mi, const struct rte_mbuf *m)
910 {
911     struct rte_mbuf *md;
912     void *src, *dst;
913 
914     dst = rte_pktmbuf_mtod(mi, void *);
915     src = rte_pktmbuf_mtod(m, void *);
916 
917     mi->data_len = m->data_len;
918     rte_memcpy(dst, src, m->data_len);
919 
920     mi->port = m->port;
921     mi->vlan_tci = m->vlan_tci;
922     mi->vlan_tci_outer = m->vlan_tci_outer;
923     mi->tx_offload = m->tx_offload;
924     mi->hash = m->hash;
925     mi->ol_flags = m->ol_flags;
926     mi->packet_type = m->packet_type;
927 }
928 
929 /* copied from rte_pktmbuf_clone */
930 static inline struct rte_mbuf *
931 pktmbuf_deep_clone(const struct rte_mbuf *md,
932     struct rte_mempool *mp)
933 {
934     struct rte_mbuf *mc, *mi, **prev;
935     uint32_t pktlen;
936     uint8_t nseg;
937 
938     if (unlikely ((mc = rte_pktmbuf_alloc(mp)) == NULL))
939         return NULL;
940 
941     mi = mc;
942     prev = &mi->next;
943     pktlen = md->pkt_len;
944     nseg = 0;
945 
946     do {
947         nseg++;
948         pktmbuf_deep_attach(mi, md);
949         *prev = mi;
950         prev = &mi->next;
951     } while ((md = md->next) != NULL &&
952         (mi = rte_pktmbuf_alloc(mp)) != NULL);
953 
954     *prev = NULL;
955     mc->nb_segs = nseg;
956     mc->pkt_len = pktlen;
957 
958     /* Allocation of new indirect segment failed */
959     if (unlikely (mi == NULL)) {
960         rte_pktmbuf_free(mc);
961         return NULL;
962     }
963 
964     __rte_mbuf_sanity_check(mc, 1);
965     return mc;
966 }
967 
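/*
 * Per-packet RX path: optionally dump to pcap, account traffic, run the
 * user-registered dispatcher (re-queueing packets that belong to another
 * queue), replicate ARP packets to every other queue (and to KNI), and
 * deliver everything else to either the FreeBSD stack or KNI according
 * to the protocol filter.
 */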
static inline void
process_packets(uint16_t port_id, uint16_t queue_id, struct rte_mbuf **bufs,
    uint16_t count, const struct ff_dpdk_if_context *ctx, int pkts_from_ring)
{
    struct lcore_conf *qconf = &lcore_conf;
    uint16_t nb_queues = qconf->nb_queue_list[port_id];

    uint16_t i;
    for (i = 0; i < count; i++) {
        struct rte_mbuf *rtem = bufs[i];

        if (unlikely(qconf->pcap[port_id] != NULL)) {
            if (!pkts_from_ring) {
                ff_dump_packets(qconf->pcap[port_id], rtem);
            }
        }

        void *data = rte_pktmbuf_mtod(rtem, void *);
        uint16_t len = rte_pktmbuf_data_len(rtem);

        if (!pkts_from_ring) {
            ff_traffic.rx_packets++;
            ff_traffic.rx_bytes += len;
        }

        if (!pkts_from_ring && packet_dispatcher) {
            int ret = (*packet_dispatcher)(data, len, queue_id, nb_queues);
            if (ret < 0 || ret >= nb_queues) {
                rte_pktmbuf_free(rtem);
                continue;
            }

            if (ret != queue_id) {
                ret = rte_ring_enqueue(dispatch_ring[port_id][ret], rtem);
                if (ret < 0)
                    rte_pktmbuf_free(rtem);

                continue;
            }
        }

        enum FilterReturn filter = protocol_filter(data, len);
        if (filter == FILTER_ARP) {
            struct rte_mempool *mbuf_pool;
            struct rte_mbuf *mbuf_clone;
            if (!pkts_from_ring) {
                uint16_t j;
                for (j = 0; j < nb_queues; ++j) {
                    if (j == queue_id)
                        continue;

                    unsigned socket_id = 0;
                    if (numa_on) {
                        uint16_t lcore_id = qconf->port_cfgs[port_id].lcore_list[j];
                        socket_id = rte_lcore_to_socket_id(lcore_id);
                    }
                    mbuf_pool = pktmbuf_pool[socket_id];
                    mbuf_clone = pktmbuf_deep_clone(rtem, mbuf_pool);
                    if (mbuf_clone) {
                        int ret = rte_ring_enqueue(dispatch_ring[port_id][j],
                            mbuf_clone);
                        if (ret < 0)
                            rte_pktmbuf_free(mbuf_clone);
                    }
                }
            }

#ifdef FF_KNI
            if (enable_kni && rte_eal_process_type() == RTE_PROC_PRIMARY) {
                mbuf_pool = pktmbuf_pool[qconf->socket_id];
                mbuf_clone = pktmbuf_deep_clone(rtem, mbuf_pool);
                if (mbuf_clone) {
                    ff_kni_enqueue(port_id, mbuf_clone);
                }
            }
#endif
            ff_veth_input(ctx, rtem);
#ifdef FF_KNI
        } else if (enable_kni &&
            ((filter == FILTER_KNI && kni_accept) ||
            (filter == FILTER_UNKNOWN && !kni_accept))) {
            ff_kni_enqueue(port_id, rtem);
#endif
        } else {
            ff_veth_input(ctx, rtem);
        }
    }
}

static inline int
process_dispatch_ring(uint16_t port_id, uint16_t queue_id,
    struct rte_mbuf **pkts_burst, const struct ff_dpdk_if_context *ctx)
{
    /* Dequeue a burst of packets from the dispatch ring and process them. */
    uint16_t nb_rb;
    nb_rb = rte_ring_dequeue_burst(dispatch_ring[port_id][queue_id],
        (void **)pkts_burst, MAX_PKT_BURST, NULL);

    if (nb_rb > 0) {
        process_packets(port_id, queue_id, pkts_burst, nb_rb, ctx, 1);
    }

    return 0;
}

static inline void
handle_sysctl_msg(struct ff_msg *msg)
{
    int ret = ff_sysctl(msg->sysctl.name, msg->sysctl.namelen,
        msg->sysctl.old, msg->sysctl.oldlenp, msg->sysctl.new,
        msg->sysctl.newlen);

    if (ret < 0) {
        msg->result = errno;
    } else {
        msg->result = 0;
    }
}

static inline void
handle_ioctl_msg(struct ff_msg *msg)
{
    int fd, ret;
    fd = ff_socket(AF_INET, SOCK_DGRAM, 0);
    if (fd < 0) {
        ret = -1;
        goto done;
    }

    ret = ff_ioctl_freebsd(fd, msg->ioctl.cmd, msg->ioctl.data);

    ff_close(fd);

done:
    if (ret < 0) {
        msg->result = errno;
    } else {
        msg->result = 0;
    }
}

static inline void
handle_route_msg(struct ff_msg *msg)
{
    int ret = ff_rtioctl(msg->route.fib, msg->route.data,
        &msg->route.len, msg->route.maxlen);
    if (ret < 0) {
        msg->result = errno;
    } else {
        msg->result = 0;
    }
}

static inline void
handle_top_msg(struct ff_msg *msg)
{
    msg->top = ff_top_status;
    msg->result = 0;
}

#ifdef FF_NETGRAPH
static inline void
handle_ngctl_msg(struct ff_msg *msg)
{
    int ret = ff_ngctl(msg->ngctl.cmd, msg->ngctl.data);
    if (ret < 0) {
        msg->result = errno;
    } else {
        msg->result = 0;
        msg->ngctl.ret = ret;
    }
}
#endif

#ifdef FF_IPFW
static inline void
handle_ipfw_msg(struct ff_msg *msg)
{
    int fd, ret;
    fd = ff_socket(AF_INET, SOCK_RAW, IPPROTO_RAW);
    if (fd < 0) {
        ret = -1;
        goto done;
    }

    switch (msg->ipfw.cmd) {
        case FF_IPFW_GET:
            ret = ff_getsockopt_freebsd(fd, msg->ipfw.level,
                msg->ipfw.optname, msg->ipfw.optval,
                msg->ipfw.optlen);
            break;
        case FF_IPFW_SET:
            ret = ff_setsockopt_freebsd(fd, msg->ipfw.level,
                msg->ipfw.optname, msg->ipfw.optval,
                *(msg->ipfw.optlen));
            break;
        default:
            ret = -1;
            errno = ENOTSUP;
            break;
    }

    ff_close(fd);

done:
    if (ret < 0) {
        msg->result = errno;
    } else {
        msg->result = 0;
    }
}
#endif

static inline void
handle_traffic_msg(struct ff_msg *msg)
{
    msg->traffic = ff_traffic;
    msg->result = 0;
}

static inline void
handle_default_msg(struct ff_msg *msg)
{
    msg->result = ENOTSUP;
}

static inline void
handle_msg(struct ff_msg *msg, uint16_t proc_id)
{
    switch (msg->msg_type) {
        case FF_SYSCTL:
            handle_sysctl_msg(msg);
            break;
        case FF_IOCTL:
            handle_ioctl_msg(msg);
            break;
        case FF_ROUTE:
            handle_route_msg(msg);
            break;
        case FF_TOP:
            handle_top_msg(msg);
            break;
#ifdef FF_NETGRAPH
        case FF_NGCTL:
            handle_ngctl_msg(msg);
            break;
#endif
#ifdef FF_IPFW
        case FF_IPFW_CTL:
            handle_ipfw_msg(msg);
            break;
#endif
        case FF_TRAFFIC:
            handle_traffic_msg(msg);
            break;
        default:
            handle_default_msg(msg);
            break;
    }
    rte_ring_enqueue(msg_ring[proc_id].ring[1], msg);
}

static inline int
process_msg_ring(uint16_t proc_id)
{
    void *msg;
    int ret = rte_ring_dequeue(msg_ring[proc_id].ring[0], &msg);

    if (unlikely(ret == 0)) {
        handle_msg((struct ff_msg *)msg, proc_id);
    }

    return 0;
}

/* Send burst of packets on an output interface */
static inline int
send_burst(struct lcore_conf *qconf, uint16_t n, uint8_t port)
{
    struct rte_mbuf **m_table;
    int ret;
    uint16_t queueid;

    queueid = qconf->tx_queue_id[port];
    m_table = (struct rte_mbuf **)qconf->tx_mbufs[port].m_table;

    if (unlikely(qconf->pcap[port] != NULL)) {
        uint16_t i;
        for (i = 0; i < n; i++) {
            ff_dump_packets(qconf->pcap[port], m_table[i]);
        }
    }

    ff_traffic.tx_packets += n;
    uint16_t i;
    for (i = 0; i < n; i++) {
        ff_traffic.tx_bytes += rte_pktmbuf_data_len(m_table[i]);
    }

    ret = rte_eth_tx_burst(port, queueid, m_table, n);
    if (unlikely(ret < n)) {
        do {
            rte_pktmbuf_free(m_table[ret]);
        } while (++ret < n);
    }

    return 0;
}

/* Enqueue a single packet, and send burst if queue is filled */
static inline int
send_single_packet(struct rte_mbuf *m, uint8_t port)
{
    uint16_t len;
    struct lcore_conf *qconf;

    qconf = &lcore_conf;
    len = qconf->tx_mbufs[port].len;
    qconf->tx_mbufs[port].m_table[len] = m;
    len++;

    /* enough pkts to be sent */
    if (unlikely(len == MAX_PKT_BURST)) {
        send_burst(qconf, MAX_PKT_BURST, port);
        len = 0;
    }

    qconf->tx_mbufs[port].len = len;
    return 0;
}

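/*
 * TX entry point from the FreeBSD stack: copy the FreeBSD mbuf chain into
 * a chain of DPDK mbufs, translate the checksum/TSO offload requests into
 * ol_flags, then queue the packet for a burst transmit.
 */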
int
ff_dpdk_if_send(struct ff_dpdk_if_context *ctx, void *m,
    int total)
{
    struct rte_mempool *mbuf_pool = pktmbuf_pool[lcore_conf.socket_id];
    struct rte_mbuf *head = rte_pktmbuf_alloc(mbuf_pool);
    if (head == NULL) {
        ff_mbuf_free(m);
        return -1;
    }

    head->pkt_len = total;
    head->nb_segs = 0;

    int off = 0;
    struct rte_mbuf *cur = head, *prev = NULL;
    while (total > 0) {
        if (cur == NULL) {
            cur = rte_pktmbuf_alloc(mbuf_pool);
            if (cur == NULL) {
                rte_pktmbuf_free(head);
                ff_mbuf_free(m);
                return -1;
            }
        }

        if (prev != NULL) {
            prev->next = cur;
        }
        head->nb_segs++;

        prev = cur;
        void *data = rte_pktmbuf_mtod(cur, void *);
        int len = total > RTE_MBUF_DEFAULT_DATAROOM ? RTE_MBUF_DEFAULT_DATAROOM : total;
        int ret = ff_mbuf_copydata(m, data, off, len);
        if (ret < 0) {
            rte_pktmbuf_free(head);
            ff_mbuf_free(m);
            return -1;
        }

        cur->data_len = len;
        off += len;
        total -= len;
        cur = NULL;
    }

    struct ff_tx_offload offload = {0};
    ff_mbuf_tx_offload(m, &offload);

    void *data = rte_pktmbuf_mtod(head, void *);

    if (offload.ip_csum) {
        /* ipv6 not supported yet */
        struct ipv4_hdr *iph;
        int iph_len;
        iph = (struct ipv4_hdr *)(data + ETHER_HDR_LEN);
        iph_len = (iph->version_ihl & 0x0f) << 2;

        head->ol_flags |= PKT_TX_IP_CKSUM | PKT_TX_IPV4;
        head->l2_len = ETHER_HDR_LEN;
        head->l3_len = iph_len;
    }

    if (ctx->hw_features.tx_csum_l4) {
        struct ipv4_hdr *iph;
        int iph_len;
        iph = (struct ipv4_hdr *)(data + ETHER_HDR_LEN);
        iph_len = (iph->version_ihl & 0x0f) << 2;

        if (offload.tcp_csum) {
            head->ol_flags |= PKT_TX_TCP_CKSUM;
            head->l2_len = ETHER_HDR_LEN;
            head->l3_len = iph_len;
        }

        /*
         *  TCP segmentation offload.
         *
         *  - set the PKT_TX_TCP_SEG flag in mbuf->ol_flags (this flag
         *    implies PKT_TX_TCP_CKSUM)
         *  - set the flag PKT_TX_IPV4 or PKT_TX_IPV6
         *  - if it's IPv4, set the PKT_TX_IP_CKSUM flag and
         *    write the IP checksum to 0 in the packet
         *  - fill the mbuf offload information: l2_len,
         *    l3_len, l4_len, tso_segsz
         *  - calculate the pseudo header checksum without taking ip_len
         *    in account, and set it in the TCP header. Refer to
         *    rte_ipv4_phdr_cksum() and rte_ipv6_phdr_cksum() that can be
         *    used as helpers.
         */
        if (offload.tso_seg_size) {
            struct tcp_hdr *tcph;
            int tcph_len;
            tcph = (struct tcp_hdr *)((char *)iph + iph_len);
            tcph_len = (tcph->data_off & 0xf0) >> 2;
            tcph->cksum = rte_ipv4_phdr_cksum(iph, PKT_TX_TCP_SEG);

            head->ol_flags |= PKT_TX_TCP_SEG;
            head->l4_len = tcph_len;
            head->tso_segsz = offload.tso_seg_size;
        }

        if (offload.udp_csum) {
            head->ol_flags |= PKT_TX_UDP_CKSUM;
            head->l2_len = ETHER_HDR_LEN;
            head->l3_len = iph_len;
        }
    }

    ff_mbuf_free(m);

    return send_single_packet(head, ctx->port_id);
}

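/*
 * Per-lcore event loop: manage the hardclock timer, drain buffered TX
 * bursts, poll KNI and the dispatch ring, receive and process an RX burst
 * per queue (with prefetching), service the message ring, invoke the
 * user's loop callback, and optionally usleep() when fully idle, while
 * accounting usr/sys/idle cycles for ff_top.
 */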
static int
main_loop(void *arg)
{
    struct loop_routine *lr = (struct loop_routine *)arg;

    struct rte_mbuf *pkts_burst[MAX_PKT_BURST];
    uint64_t prev_tsc, diff_tsc, cur_tsc, usch_tsc, div_tsc, usr_tsc, sys_tsc, end_tsc, idle_sleep_tsc;
    int i, j, nb_rx, idle;
    uint16_t port_id, queue_id;
    struct lcore_conf *qconf;
    const uint64_t drain_tsc = (rte_get_tsc_hz() + US_PER_S - 1) /
        US_PER_S * BURST_TX_DRAIN_US;
    struct ff_dpdk_if_context *ctx;

    prev_tsc = 0;
    usch_tsc = 0;

    qconf = &lcore_conf;

    while (1) {
        cur_tsc = rte_rdtsc();
        if (unlikely(freebsd_clock.expire < cur_tsc)) {
            rte_timer_manage();
        }

        idle = 1;
        sys_tsc = 0;
        usr_tsc = 0;

        /*
         * TX burst queue drain
         */
        diff_tsc = cur_tsc - prev_tsc;
        if (unlikely(diff_tsc > drain_tsc)) {
            for (i = 0; i < qconf->nb_tx_port; i++) {
                port_id = qconf->tx_port_id[i];
                if (qconf->tx_mbufs[port_id].len == 0)
                    continue;

                idle = 0;

                send_burst(qconf,
                    qconf->tx_mbufs[port_id].len,
                    port_id);
                qconf->tx_mbufs[port_id].len = 0;
            }

            prev_tsc = cur_tsc;
        }

        /*
         * Read packet from RX queues
         */
        for (i = 0; i < qconf->nb_rx_queue; ++i) {
            port_id = qconf->rx_queue_list[i].port_id;
            queue_id = qconf->rx_queue_list[i].queue_id;
            ctx = veth_ctx[port_id];

#ifdef FF_KNI
            if (enable_kni && rte_eal_process_type() == RTE_PROC_PRIMARY) {
                ff_kni_process(port_id, queue_id, pkts_burst, MAX_PKT_BURST);
            }
#endif

            process_dispatch_ring(port_id, queue_id, pkts_burst, ctx);

            nb_rx = rte_eth_rx_burst(port_id, queue_id, pkts_burst,
                MAX_PKT_BURST);
            if (nb_rx == 0)
                continue;

            idle = 0;

            /* Prefetch first packets */
            for (j = 0; j < PREFETCH_OFFSET && j < nb_rx; j++) {
                rte_prefetch0(rte_pktmbuf_mtod(
                        pkts_burst[j], void *));
            }

            /* Prefetch and handle already prefetched packets */
            for (j = 0; j < (nb_rx - PREFETCH_OFFSET); j++) {
                rte_prefetch0(rte_pktmbuf_mtod(pkts_burst[
                        j + PREFETCH_OFFSET], void *));
                process_packets(port_id, queue_id, &pkts_burst[j], 1, ctx, 0);
            }

            /* Handle remaining prefetched packets */
            for (; j < nb_rx; j++) {
                process_packets(port_id, queue_id, &pkts_burst[j], 1, ctx, 0);
            }
        }

        process_msg_ring(qconf->proc_id);

        div_tsc = rte_rdtsc();

        if (likely(lr->loop != NULL && (!idle || cur_tsc - usch_tsc > drain_tsc))) {
            usch_tsc = cur_tsc;
            lr->loop(lr->arg);
        }

        idle_sleep_tsc = rte_rdtsc();
        if (likely(idle && idle_sleep)) {
            usleep(idle_sleep);
            end_tsc = rte_rdtsc();
        } else {
            end_tsc = idle_sleep_tsc;
        }

        if (usch_tsc == cur_tsc) {
            usr_tsc = idle_sleep_tsc - div_tsc;
        }

        if (!idle) {
            sys_tsc = div_tsc - cur_tsc;
            ff_top_status.sys_tsc += sys_tsc;
        }

        ff_top_status.usr_tsc += usr_tsc;
        ff_top_status.work_tsc += end_tsc - cur_tsc;
        ff_top_status.idle_tsc += end_tsc - cur_tsc - usr_tsc - sys_tsc;

        ff_top_status.loops++;
    }

    return 0;
}

int
ff_dpdk_if_up(void) {
    int i;
    struct lcore_conf *qconf = &lcore_conf;
    for (i = 0; i < qconf->nb_tx_port; i++) {
        uint16_t port_id = qconf->tx_port_id[i];

        struct ff_port_cfg *pconf = &qconf->port_cfgs[port_id];
        veth_ctx[port_id] = ff_veth_attach(pconf);
        if (veth_ctx[port_id] == NULL) {
            rte_exit(EXIT_FAILURE, "ff_veth_attach failed\n");
        }
    }

    return 0;
}

void
ff_dpdk_run(loop_func_t loop, void *arg) {
    struct loop_routine *lr = rte_malloc(NULL,
        sizeof(struct loop_routine), 0);
    lr->loop = loop;
    lr->arg = arg;
    rte_eal_mp_remote_launch(main_loop, lr, CALL_MASTER);
    rte_eal_mp_wait_lcore();
    rte_free(lr);
}

void
ff_dpdk_pktmbuf_free(void *m)
{
    rte_pktmbuf_free((struct rte_mbuf *)m);
}

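/*
 * Software Toeplitz hash, the same function RSS-capable NICs implement:
 * for every set bit of the input, XOR in a sliding 32-bit window of the
 * hash key.
 */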
static uint32_t
toeplitz_hash(unsigned keylen, const uint8_t *key,
    unsigned datalen, const uint8_t *data)
{
    uint32_t hash = 0, v;
    u_int i, b;

    /* XXXRW: Perhaps an assertion about key length vs. data length? */

    v = (key[0]<<24) + (key[1]<<16) + (key[2]<<8) + key[3];
    for (i = 0; i < datalen; i++) {
        for (b = 0; b < 8; b++) {
            if (data[i] & (1<<(7-b)))
                hash ^= v;
            v <<= 1;
            if ((i + 4) < keylen &&
                (key[i+4] & (1<<(7-b))))
                v |= 1;
        }
    }
    return (hash);
}

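/*
 * Check whether a 4-tuple would be RSS-steered to this lcore's queue: hash
 * it with the same Toeplitz key the NIC was configured with and index the
 * redirection table the way set_rss_table() filled it (round-robin), so
 * the stack can pick local addresses/ports whose return traffic lands on
 * the calling lcore.
 */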
int
ff_rss_check(void *softc, uint32_t saddr, uint32_t daddr,
    uint16_t sport, uint16_t dport)
{
    struct lcore_conf *qconf = &lcore_conf;
    struct ff_dpdk_if_context *ctx = ff_veth_softc_to_hostc(softc);
    uint16_t nb_queues = qconf->nb_queue_list[ctx->port_id];

    if (nb_queues <= 1) {
        return 1;
    }

    uint16_t reta_size = rss_reta_size[ctx->port_id];
    uint16_t queueid = qconf->tx_queue_id[ctx->port_id];

    uint8_t data[sizeof(saddr) + sizeof(daddr) + sizeof(sport) +
        sizeof(dport)];

    unsigned datalen = 0;

    bcopy(&saddr, &data[datalen], sizeof(saddr));
    datalen += sizeof(saddr);

    bcopy(&daddr, &data[datalen], sizeof(daddr));
    datalen += sizeof(daddr);

    bcopy(&sport, &data[datalen], sizeof(sport));
    datalen += sizeof(sport);

    bcopy(&dport, &data[datalen], sizeof(dport));
    datalen += sizeof(dport);

    uint32_t hash = toeplitz_hash(sizeof(default_rsskey_40bytes),
        default_rsskey_40bytes, datalen, data);

    return ((hash & (reta_size - 1)) % nb_queues) == queueid;
}

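/*
 * Register a user packet-dispatch callback, invoked on every packet
 * received from the NIC before it enters the stack. A minimal sketch of
 * a dispatcher ("my_dispatcher" is a hypothetical name; the signature is
 * assumed to match the call site in process_packets()):
 *
 *   static int
 *   my_dispatcher(void *data, uint16_t len, uint16_t queue_id,
 *       uint16_t nb_queues)
 *   {
 *       (void)data; (void)len; (void)queue_id; (void)nb_queues;
 *       return 0;   // steer every packet to queue 0
 *   }
 *
 *   ff_regist_packet_dispatcher(my_dispatcher);
 *
 * Return the target queue id in [0, nb_queues); any value outside that
 * range causes the packet to be dropped.
 */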
void
ff_regist_packet_dispatcher(dispatch_func_t func)
{
    packet_dispatcher = func;
}

uint64_t
ff_get_tsc_ns(void)
{
    uint64_t cur_tsc = rte_rdtsc();
    uint64_t hz = rte_get_tsc_hz();
    return ((double)cur_tsc / (double)hz) * NS_PER_S;
}