xref: /f-stack/lib/ff_dpdk_if.c (revision 744da4ea)
/*
 * Copyright (C) 2017 THL A29 Limited, a Tencent company.
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions are met:
 *
 * 1. Redistributions of source code must retain the above copyright notice, this
 *   list of conditions and the following disclaimer.
 * 2. Redistributions in binary form must reproduce the above copyright notice,
 *   this list of conditions and the following disclaimer in the documentation
 *   and/or other materials provided with the distribution.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
 * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
 * DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR
 * ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
 * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
 * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
 * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
 * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 *
 */
#include <assert.h>

#include <rte_common.h>
#include <rte_byteorder.h>
#include <rte_log.h>
#include <rte_memory.h>
#include <rte_memcpy.h>
#include <rte_memzone.h>
#include <rte_config.h>
#include <rte_eal.h>
#include <rte_pci.h>
#include <rte_mbuf.h>
#include <rte_lcore.h>
#include <rte_launch.h>
#include <rte_ethdev.h>
#include <rte_debug.h>
#include <rte_ether.h>
#include <rte_malloc.h>
#include <rte_cycles.h>
#include <rte_timer.h>
#include <rte_thash.h>
#include <rte_ip.h>
#include <rte_tcp.h>
#include <rte_udp.h>

#include "ff_dpdk_if.h"
#include "ff_dpdk_pcap.h"
#include "ff_dpdk_kni.h"
#include "ff_config.h"
#include "ff_veth.h"
#include "ff_host_interface.h"
#include "ff_msg.h"
#include "ff_api.h"

#define MEMPOOL_CACHE_SIZE 256

#define DISPATCH_RING_SIZE 2048

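/*
 * One dispatch ring exists per port/queue pair (see init_dispatch_ring()
 * and process_packets() below): when an lcore receives a packet that
 * actually belongs to another queue -- because a user-registered dispatch
 * function says so, or because an ARP frame must reach every queue -- it
 * enqueues the mbuf here for the owning lcore to pick up.
 */
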
#define MSG_RING_SIZE 32

/*
 * Configurable number of RX/TX ring descriptors
 */
#define RX_QUEUE_SIZE 512
#define TX_QUEUE_SIZE 512

#define MAX_PKT_BURST 32
#define BURST_TX_DRAIN_US 100 /* TX drain every ~100us */

/*
 * Try to avoid TX buffering if we have at least MAX_TX_BURST packets to send.
 */
#define MAX_TX_BURST    (MAX_PKT_BURST / 2)

#define NB_SOCKETS 8

85 #define PREFETCH_OFFSET    3
86 
87 #define MAX_RX_QUEUE_PER_LCORE 16
88 #define MAX_TX_QUEUE_PER_PORT RTE_MAX_ETHPORTS
89 #define MAX_RX_QUEUE_PER_PORT 128
90 
91 #define KNI_MBUF_MAX 2048
92 #define KNI_QUEUE_SIZE 2048
93 
94 static int enable_kni;
95 static int kni_accept;
96 
97 static int numa_on;
98 
99 static struct rte_timer freebsd_clock;
100 
/* Default 40-byte RSS hash key, taken from the Mellanox Linux driver. */
static uint8_t default_rsskey_40bytes[40] = {
    0xd1, 0x81, 0xc6, 0x2c, 0xf7, 0xf4, 0xdb, 0x5b,
    0x19, 0x83, 0xa2, 0xfc, 0x94, 0x3e, 0x1a, 0xdb,
    0xd9, 0x38, 0x9e, 0x6b, 0xd1, 0x03, 0x9c, 0x2c,
    0xa7, 0x44, 0x99, 0xad, 0x59, 0x3d, 0x56, 0xd9,
    0xf3, 0x25, 0x3c, 0x06, 0x2a, 0xdc, 0x1f, 0xfc
};

static struct rte_eth_conf default_port_conf = {
    .rxmode = {
        .mq_mode = ETH_MQ_RX_RSS,
        .max_rx_pkt_len = ETHER_MAX_LEN,
        .split_hdr_size = 0, /**< hdr buf size */
        .header_split   = 0, /**< Header Split disabled */
        .hw_ip_checksum = 0, /**< IP checksum offload disabled */
        .hw_vlan_filter = 0, /**< VLAN filtering disabled */
        .hw_vlan_strip  = 0, /**< VLAN strip disabled. */
        .hw_vlan_extend = 0, /**< Extended VLAN disabled. */
        .jumbo_frame    = 0, /**< Jumbo Frame Support disabled */
        .hw_strip_crc   = 0, /**< CRC stripping by hardware disabled */
        .enable_lro     = 0, /**< LRO disabled */
    },
    .rx_adv_conf = {
        .rss_conf = {
            .rss_key = default_rsskey_40bytes,
            .rss_key_len = 40,
            .rss_hf = ETH_RSS_PROTO_MASK,
        },
    },
    .txmode = {
        .mq_mode = ETH_MQ_TX_NONE,
    },
};

struct mbuf_table {
    uint16_t len;
    struct rte_mbuf *m_table[MAX_PKT_BURST];
};

struct lcore_rx_queue {
    uint8_t port_id;
    uint8_t queue_id;
} __rte_cache_aligned;

struct lcore_conf {
    uint16_t proc_id;
    uint16_t socket_id;
    uint16_t nb_queue_list[RTE_MAX_ETHPORTS];
    struct ff_port_cfg *port_cfgs;

    uint16_t nb_rx_queue;
    struct lcore_rx_queue rx_queue_list[MAX_RX_QUEUE_PER_LCORE];
    uint16_t nb_tx_port;
    uint16_t tx_port_id[RTE_MAX_ETHPORTS];
    uint16_t tx_queue_id[RTE_MAX_ETHPORTS];
    struct mbuf_table tx_mbufs[RTE_MAX_ETHPORTS];
    char *pcap[RTE_MAX_ETHPORTS];
} __rte_cache_aligned;

static struct lcore_conf lcore_conf;

static struct rte_mempool *pktmbuf_pool[NB_SOCKETS];

static struct rte_ring **dispatch_ring[RTE_MAX_ETHPORTS];
static dispatch_func_t packet_dispatcher;

static uint16_t rss_reta_size[RTE_MAX_ETHPORTS];

struct ff_msg_ring {
    char ring_name[2][RTE_RING_NAMESIZE];
    /* ring[0]: the lcore receives messages that other processes send */
    /* ring[1]: the lcore sends messages that other processes read */
    struct rte_ring *ring[2];
} __rte_cache_aligned;

static struct ff_msg_ring msg_ring[RTE_MAX_LCORE];
static struct rte_mempool *message_pool;

struct ff_dpdk_if_context {
    void *sc;
    void *ifp;
    uint16_t port_id;
    struct ff_hw_features hw_features;
} __rte_cache_aligned;

static struct ff_dpdk_if_context *veth_ctx[RTE_MAX_ETHPORTS];

extern void ff_hardclock(void);

static void
ff_hardclock_job(__rte_unused struct rte_timer *timer,
    __rte_unused void *arg) {
    ff_hardclock();
    ff_update_current_ts();
}

struct ff_dpdk_if_context *
ff_dpdk_register_if(void *sc, void *ifp, struct ff_port_cfg *cfg)
{
    struct ff_dpdk_if_context *ctx;

    ctx = calloc(1, sizeof(struct ff_dpdk_if_context));
    if (ctx == NULL)
        return NULL;

    ctx->sc = sc;
    ctx->ifp = ifp;
    ctx->port_id = cfg->port_id;
    ctx->hw_features = cfg->hw_features;

    return ctx;
}

void
ff_dpdk_deregister_if(struct ff_dpdk_if_context *ctx)
{
    free(ctx);
}

static void
check_all_ports_link_status(void)
{
    #define CHECK_INTERVAL 100 /* 100ms */
    #define MAX_CHECK_TIME 90  /* 9s (90 * 100ms) in total */

    uint8_t count, all_ports_up, print_flag = 0;
    struct rte_eth_link link;

    printf("\nChecking link status");
    fflush(stdout);

    int i, nb_ports;
    nb_ports = ff_global_cfg.dpdk.nb_ports;
    for (count = 0; count <= MAX_CHECK_TIME; count++) {
        all_ports_up = 1;
        for (i = 0; i < nb_ports; i++) {
            uint8_t portid = ff_global_cfg.dpdk.portid_list[i];
            memset(&link, 0, sizeof(link));
            rte_eth_link_get_nowait(portid, &link);

            /* print link status if flag set */
            if (print_flag == 1) {
                if (link.link_status) {
                    printf("Port %d Link Up - speed %u "
                        "Mbps - %s\n", (int)portid,
                        (unsigned)link.link_speed,
                        (link.link_duplex == ETH_LINK_FULL_DUPLEX) ?
                        ("full-duplex") : ("half-duplex"));
                } else {
                    printf("Port %d Link Down\n", (int)portid);
                }
                continue;
            }
            /* clear all_ports_up flag if any link down */
            if (link.link_status == 0) {
                all_ports_up = 0;
                break;
            }
        }

        /* after finally printing all link status, get out */
        if (print_flag == 1)
            break;

        if (all_ports_up == 0) {
            printf(".");
            fflush(stdout);
            rte_delay_ms(CHECK_INTERVAL);
        }

        /* set the print_flag if all ports up or timeout */
        if (all_ports_up == 1 || count == (MAX_CHECK_TIME - 1)) {
            print_flag = 1;
            printf("done\n");
        }
    }
}

static int
init_lcore_conf(void)
{
    uint8_t nb_dev_ports = rte_eth_dev_count();
    if (nb_dev_ports == 0) {
        rte_exit(EXIT_FAILURE, "No probed ethernet devices\n");
    }

    if (ff_global_cfg.dpdk.max_portid >= nb_dev_ports) {
        rte_exit(EXIT_FAILURE, "this machine doesn't have port %d.\n",
                 ff_global_cfg.dpdk.max_portid);
    }

    lcore_conf.port_cfgs = ff_global_cfg.dpdk.port_cfgs;
    lcore_conf.proc_id = ff_global_cfg.dpdk.proc_id;

    uint16_t proc_id;
    for (proc_id = 0; proc_id < ff_global_cfg.dpdk.nb_procs; proc_id++) {
        uint16_t lcore_id = ff_global_cfg.dpdk.proc_lcore[proc_id];
        if (!lcore_config[lcore_id].detected) {
            rte_exit(EXIT_FAILURE, "lcore %u unavailable\n", lcore_id);
        }
    }

    uint16_t socket_id = 0;
    if (numa_on) {
        socket_id = rte_lcore_to_socket_id(rte_lcore_id());
    }

    lcore_conf.socket_id = socket_id;

    uint16_t lcore_id = ff_global_cfg.dpdk.proc_lcore[lcore_conf.proc_id];
    int j;
    for (j = 0; j < ff_global_cfg.dpdk.nb_ports; ++j) {
        uint16_t port_id = ff_global_cfg.dpdk.portid_list[j];
        struct ff_port_cfg *pconf = &ff_global_cfg.dpdk.port_cfgs[port_id];

        int queueid = -1;
        int i;
        for (i = 0; i < pconf->nb_lcores; i++) {
            if (pconf->lcore_list[i] == lcore_id) {
                queueid = i;
            }
        }
        if (queueid < 0) {
            continue;
        }
        printf("lcore: %u, port: %u, queue: %d\n", lcore_id, port_id, queueid);
        uint16_t nb_rx_queue = lcore_conf.nb_rx_queue;
        lcore_conf.rx_queue_list[nb_rx_queue].port_id = port_id;
        lcore_conf.rx_queue_list[nb_rx_queue].queue_id = queueid;
        lcore_conf.nb_rx_queue++;

        lcore_conf.tx_queue_id[port_id] = queueid;
        lcore_conf.tx_port_id[lcore_conf.nb_tx_port] = port_id;
        lcore_conf.nb_tx_port++;

        lcore_conf.pcap[port_id] = pconf->pcap;
        lcore_conf.nb_queue_list[port_id] = pconf->nb_lcores;
    }

    if (lcore_conf.nb_rx_queue == 0) {
        rte_exit(EXIT_FAILURE, "lcore %u has nothing to do\n", lcore_id);
    }

    return 0;
}

static int
init_mem_pool(void)
{
    uint8_t nb_ports = ff_global_cfg.dpdk.nb_ports;
    uint32_t nb_lcores = ff_global_cfg.dpdk.nb_procs;
    uint32_t nb_tx_queue = nb_lcores;
    uint32_t nb_rx_queue = lcore_conf.nb_rx_queue * nb_lcores;

    unsigned nb_mbuf = RTE_MAX(
        (nb_rx_queue * RX_QUEUE_SIZE           +
        nb_ports * nb_lcores * MAX_PKT_BURST   +
        nb_ports * nb_tx_queue * TX_QUEUE_SIZE +
        nb_lcores * MEMPOOL_CACHE_SIZE +
        nb_ports * KNI_MBUF_MAX +
        nb_ports * KNI_QUEUE_SIZE +
        nb_lcores * nb_ports * DISPATCH_RING_SIZE),
        (unsigned)8192);

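    /*
     * Worked example of the sizing above (hypothetical config: 1 port,
     * 4 procs, one RX queue each): nb_mbuf = 4*512 + 1*4*32 + 1*4*512 +
     * 4*256 + 1*2048 + 1*2048 + 4*1*2048 = 17536 mbufs, i.e. enough for
     * every RX/TX queue, burst table, per-lcore cache, KNI queue and
     * dispatch ring to be full at the same time.
     */
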
    unsigned socketid = 0;
    uint16_t i, lcore_id;
    char s[64];

    for (i = 0; i < ff_global_cfg.dpdk.nb_procs; i++) {
        lcore_id = ff_global_cfg.dpdk.proc_lcore[i];
        if (numa_on) {
            socketid = rte_lcore_to_socket_id(lcore_id);
        }

        if (socketid >= NB_SOCKETS) {
            rte_exit(EXIT_FAILURE, "Socket %u of lcore %u is out of range %d\n",
                socketid, lcore_id, NB_SOCKETS);
        }

        if (pktmbuf_pool[socketid] != NULL) {
            continue;
        }

        snprintf(s, sizeof(s), "mbuf_pool_%u", socketid);
        if (rte_eal_process_type() == RTE_PROC_PRIMARY) {
            pktmbuf_pool[socketid] =
                rte_pktmbuf_pool_create(s, nb_mbuf,
                    MEMPOOL_CACHE_SIZE, 0,
                    RTE_MBUF_DEFAULT_BUF_SIZE, socketid);
        } else {
            pktmbuf_pool[socketid] = rte_mempool_lookup(s);
        }

        if (pktmbuf_pool[socketid] == NULL) {
            rte_exit(EXIT_FAILURE, "Cannot create mbuf pool on socket %u\n", socketid);
        } else {
            printf("created mbuf pool on socket %u\n", socketid);
        }
    }

    return 0;
}

static struct rte_ring *
create_ring(const char *name, unsigned count, int socket_id, unsigned flags)
{
    struct rte_ring *ring;

    if (name == NULL)
        return NULL;

    /* If already created, just attach to it. */
    if (likely((ring = rte_ring_lookup(name)) != NULL))
        return ring;

    if (rte_eal_process_type() == RTE_PROC_PRIMARY) {
        return rte_ring_create(name, count, socket_id, flags);
    } else {
        return rte_ring_lookup(name);
    }
}

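/*
 * Rings, like the mempools above, live in hugepage shared memory and are
 * discoverable by name: the primary process creates each ring, and
 * secondary processes attach to it by looking up the same name.
 */
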
static int
init_dispatch_ring(void)
{
    int j;
    char name_buf[RTE_RING_NAMESIZE];
    int queueid;

    unsigned socketid = lcore_conf.socket_id;

    /* Create rings for the ports actually in use. */
    int nb_ports = ff_global_cfg.dpdk.nb_ports;
    for (j = 0; j < nb_ports; j++) {
        uint16_t portid = ff_global_cfg.dpdk.portid_list[j];
        struct ff_port_cfg *pconf = &ff_global_cfg.dpdk.port_cfgs[portid];
        int nb_queues = pconf->nb_lcores;
        if (dispatch_ring[portid] == NULL) {
            snprintf(name_buf, RTE_RING_NAMESIZE, "ring_ptr_p%d", portid);

            dispatch_ring[portid] = rte_zmalloc(name_buf,
                sizeof(struct rte_ring *) * nb_queues,
                RTE_CACHE_LINE_SIZE);
            if (dispatch_ring[portid] == NULL) {
                rte_exit(EXIT_FAILURE, "rte_zmalloc(%s (struct rte_ring*)) "
                    "failed\n", name_buf);
            }
        }

        for (queueid = 0; queueid < nb_queues; ++queueid) {
            snprintf(name_buf, RTE_RING_NAMESIZE, "dispatch_ring_p%d_q%d",
                portid, queueid);
            dispatch_ring[portid][queueid] = create_ring(name_buf,
                DISPATCH_RING_SIZE, socketid, RING_F_SC_DEQ);

            if (dispatch_ring[portid][queueid] == NULL)
                rte_panic("create ring:%s failed!\n", name_buf);

            printf("created ring:%s, %u ring entries are now free!\n",
                name_buf, rte_ring_free_count(dispatch_ring[portid][queueid]));
        }
    }

    return 0;
}

static void
ff_msg_init(struct rte_mempool *mp,
    __attribute__((unused)) void *opaque_arg,
    void *obj, __attribute__((unused)) unsigned i)
{
    struct ff_msg *msg = (struct ff_msg *)obj;
    msg->msg_type = FF_UNKNOWN;
    msg->buf_addr = (char *)msg + sizeof(struct ff_msg);
    msg->buf_len = mp->elt_size - sizeof(struct ff_msg);
}

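/*
 * Sketch of each message-pool element as laid out by ff_msg_init():
 *
 *   +---------------+--------------------------------------+
 *   | struct ff_msg | payload (buf_addr, buf_len bytes)    |
 *   +---------------+--------------------------------------+
 *   |<------------ MAX_MSG_BUF_SIZE (elt_size) ----------->|
 *
 * Header and payload share one allocation, so a whole message can be
 * passed between processes as a single pointer through the msg rings.
 */
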
static int
init_msg_ring(void)
{
    uint16_t i;
    uint16_t nb_procs = ff_global_cfg.dpdk.nb_procs;
    unsigned socketid = lcore_conf.socket_id;

    /* Create message buffer pool */
    if (rte_eal_process_type() == RTE_PROC_PRIMARY) {
        message_pool = rte_mempool_create(FF_MSG_POOL,
           MSG_RING_SIZE * 2 * nb_procs,
           MAX_MSG_BUF_SIZE, MSG_RING_SIZE / 2, 0,
           NULL, NULL, ff_msg_init, NULL,
           socketid, 0);
    } else {
        message_pool = rte_mempool_lookup(FF_MSG_POOL);
    }

    if (message_pool == NULL) {
        rte_panic("Create msg mempool failed\n");
    }

    for (i = 0; i < nb_procs; ++i) {
        snprintf(msg_ring[i].ring_name[0], RTE_RING_NAMESIZE,
            "%s%u", FF_MSG_RING_IN, i);
        snprintf(msg_ring[i].ring_name[1], RTE_RING_NAMESIZE,
            "%s%u", FF_MSG_RING_OUT, i);

        msg_ring[i].ring[0] = create_ring(msg_ring[i].ring_name[0],
            MSG_RING_SIZE, socketid, RING_F_SP_ENQ | RING_F_SC_DEQ);
        if (msg_ring[i].ring[0] == NULL)
            rte_panic("create ring:%s failed!\n", msg_ring[i].ring_name[0]);

        msg_ring[i].ring[1] = create_ring(msg_ring[i].ring_name[1],
            MSG_RING_SIZE, socketid, RING_F_SP_ENQ | RING_F_SC_DEQ);
        if (msg_ring[i].ring[1] == NULL)
            rte_panic("create ring:%s failed!\n", msg_ring[i].ring_name[1]);
    }

    return 0;
}

static int
init_kni(void)
{
    int nb_ports = rte_eth_dev_count();
    kni_accept = 0;
    if (strcasecmp(ff_global_cfg.kni.method, "accept") == 0)
        kni_accept = 1;

    ff_kni_init(nb_ports, ff_global_cfg.kni.tcp_port,
        ff_global_cfg.kni.udp_port);

    unsigned socket_id = lcore_conf.socket_id;
    struct rte_mempool *mbuf_pool = pktmbuf_pool[socket_id];

    nb_ports = ff_global_cfg.dpdk.nb_ports;
    int i;
    for (i = 0; i < nb_ports; i++) {
        uint16_t port_id = ff_global_cfg.dpdk.portid_list[i];
        ff_kni_alloc(port_id, socket_id, mbuf_pool, KNI_QUEUE_SIZE);
    }

    return 0;
}

static void
set_rss_table(uint8_t port_id, uint16_t reta_size, uint16_t nb_queues)
{
    if (reta_size == 0) {
        return;
    }

    int reta_conf_size = RTE_MAX(1, reta_size / RTE_RETA_GROUP_SIZE);
    struct rte_eth_rss_reta_entry64 reta_conf[reta_conf_size];

    /* config HW indirection table */
    unsigned i, j, hash = 0;
    for (i = 0; i < reta_conf_size; i++) {
        reta_conf[i].mask = ~0ULL;
        for (j = 0; j < RTE_RETA_GROUP_SIZE; j++) {
            reta_conf[i].reta[j] = hash++ % nb_queues;
        }
    }

    if (rte_eth_dev_rss_reta_update(port_id, reta_conf, reta_size)) {
        rte_exit(EXIT_FAILURE, "port[%d], failed to update rss table\n",
            port_id);
    }
}

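/*
 * Worked example: with reta_size = 128 and nb_queues = 4, the indirection
 * table above becomes 0,1,2,3,0,1,2,3,..., so a packet whose RSS hash is h
 * lands on queue (h % 128) % 4. ff_rss_check() near the end of this file
 * recomputes exactly this mapping in software.
 */
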
static int
init_port_start(void)
{
    int nb_ports = ff_global_cfg.dpdk.nb_ports;
    unsigned socketid = rte_lcore_to_socket_id(rte_lcore_id());
    struct rte_mempool *mbuf_pool = pktmbuf_pool[socketid];
    uint16_t i;

    for (i = 0; i < nb_ports; i++) {
        uint16_t port_id = ff_global_cfg.dpdk.portid_list[i];
        struct ff_port_cfg *pconf = &ff_global_cfg.dpdk.port_cfgs[port_id];
        uint16_t nb_queues = pconf->nb_lcores;

        struct rte_eth_dev_info dev_info;
        rte_eth_dev_info_get(port_id, &dev_info);

        if (nb_queues > dev_info.max_rx_queues) {
            rte_exit(EXIT_FAILURE, "nb_queues[%d] is bigger than max_rx_queues[%d]\n",
                nb_queues,
                dev_info.max_rx_queues);
        }

        if (nb_queues > dev_info.max_tx_queues) {
            rte_exit(EXIT_FAILURE, "nb_queues[%d] is bigger than max_tx_queues[%d]\n",
                nb_queues,
                dev_info.max_tx_queues);
        }

        struct ether_addr addr;
        rte_eth_macaddr_get(port_id, &addr);
        printf("Port %u MAC: %02" PRIx8 " %02" PRIx8 " %02" PRIx8
                   " %02" PRIx8 " %02" PRIx8 " %02" PRIx8 "\n",
                (unsigned)port_id,
                addr.addr_bytes[0], addr.addr_bytes[1],
                addr.addr_bytes[2], addr.addr_bytes[3],
                addr.addr_bytes[4], addr.addr_bytes[5]);

        rte_memcpy(pconf->mac,
            addr.addr_bytes, ETHER_ADDR_LEN);

        /* Clear txq_flags - we do not need multi-mempool and refcnt */
        dev_info.default_txconf.txq_flags = ETH_TXQ_FLAGS_NOMULTMEMP |
            ETH_TXQ_FLAGS_NOREFCOUNT;

        /* Disable features that are not supported by port's HW */
        if (!(dev_info.tx_offload_capa & DEV_TX_OFFLOAD_UDP_CKSUM)) {
            dev_info.default_txconf.txq_flags |= ETH_TXQ_FLAGS_NOXSUMUDP;
        }

        if (!(dev_info.tx_offload_capa & DEV_TX_OFFLOAD_TCP_CKSUM)) {
            dev_info.default_txconf.txq_flags |= ETH_TXQ_FLAGS_NOXSUMTCP;
        }

        if (!(dev_info.tx_offload_capa & DEV_TX_OFFLOAD_SCTP_CKSUM)) {
            dev_info.default_txconf.txq_flags |= ETH_TXQ_FLAGS_NOXSUMSCTP;
        }

        if (!(dev_info.tx_offload_capa & DEV_TX_OFFLOAD_VLAN_INSERT)) {
            dev_info.default_txconf.txq_flags |= ETH_TXQ_FLAGS_NOVLANOFFL;
        }

        if (!(dev_info.tx_offload_capa & DEV_TX_OFFLOAD_TCP_TSO) &&
            !(dev_info.tx_offload_capa & DEV_TX_OFFLOAD_UDP_TSO)) {
            dev_info.default_txconf.txq_flags |= ETH_TXQ_FLAGS_NOMULTSEGS;
        }

        struct rte_eth_conf port_conf = {0};

        /* Set RSS mode */
        port_conf.rxmode.mq_mode = ETH_MQ_RX_RSS;
        port_conf.rx_adv_conf.rss_conf.rss_hf = ETH_RSS_PROTO_MASK;
        port_conf.rx_adv_conf.rss_conf.rss_key = default_rsskey_40bytes;
        port_conf.rx_adv_conf.rss_conf.rss_key_len = 40;

        /* Set Rx VLAN stripping */
        if (ff_global_cfg.dpdk.vlan_strip) {
            if (dev_info.rx_offload_capa & DEV_RX_OFFLOAD_VLAN_STRIP) {
                port_conf.rxmode.hw_vlan_strip = 1;
            }
        }

        /* Enable HW CRC stripping */
        port_conf.rxmode.hw_strip_crc = 1;

        /* FIXME: Enable TCP LRO? */
        #if 0
        if (dev_info.rx_offload_capa & DEV_RX_OFFLOAD_TCP_LRO) {
            printf("LRO is supported\n");
            port_conf.rxmode.enable_lro = 1;
            pconf->hw_features.rx_lro = 1;
        }
        #endif

        /* Set Rx checksum checking */
        if ((dev_info.rx_offload_capa & DEV_RX_OFFLOAD_IPV4_CKSUM) &&
            (dev_info.rx_offload_capa & DEV_RX_OFFLOAD_UDP_CKSUM) &&
            (dev_info.rx_offload_capa & DEV_RX_OFFLOAD_TCP_CKSUM)) {
            printf("RX checksum offload supported\n");
            port_conf.rxmode.hw_ip_checksum = 1;
            pconf->hw_features.rx_csum = 1;
        }

        if ((dev_info.tx_offload_capa & DEV_TX_OFFLOAD_IPV4_CKSUM)) {
            printf("TX ip checksum offload supported\n");
            pconf->hw_features.tx_csum_ip = 1;
        }

        if ((dev_info.tx_offload_capa & DEV_TX_OFFLOAD_UDP_CKSUM) &&
            (dev_info.tx_offload_capa & DEV_TX_OFFLOAD_TCP_CKSUM)) {
            printf("TX TCP&UDP checksum offload supported\n");
            pconf->hw_features.tx_csum_l4 = 1;
        }

        if (ff_global_cfg.dpdk.tso) {
            if (dev_info.tx_offload_capa & DEV_TX_OFFLOAD_TCP_TSO) {
                printf("TSO is supported\n");
                pconf->hw_features.tx_tso = 1;
            }
        } else {
            printf("TSO is disabled\n");
        }

        if (dev_info.reta_size) {
            /* reta size must be power of 2 */
            assert((dev_info.reta_size & (dev_info.reta_size - 1)) == 0);

            rss_reta_size[port_id] = dev_info.reta_size;
            printf("port[%d]: rss table size: %d\n", port_id,
                dev_info.reta_size);
        }

        /* Only the primary process configures and starts the port. */
        if (rte_eal_process_type() != RTE_PROC_PRIMARY) {
            continue;
        }

        int ret = rte_eth_dev_configure(port_id, nb_queues, nb_queues, &port_conf);
        if (ret != 0) {
            return ret;
        }
        uint16_t q;
        for (q = 0; q < nb_queues; q++) {
            ret = rte_eth_tx_queue_setup(port_id, q, TX_QUEUE_SIZE,
                socketid, &dev_info.default_txconf);
            if (ret < 0) {
                return ret;
            }

            ret = rte_eth_rx_queue_setup(port_id, q, RX_QUEUE_SIZE,
                socketid, &dev_info.default_rxconf, mbuf_pool);
            if (ret < 0) {
                return ret;
            }
        }

        ret = rte_eth_dev_start(port_id);
        if (ret < 0) {
            return ret;
        }

        if (nb_queues > 1) {
            /* set HW rss hash function to Toeplitz */
            /* note: rte_eth_dev_filter_supported() returns 0 when supported */
            if (!rte_eth_dev_filter_supported(port_id, RTE_ETH_FILTER_HASH)) {
                struct rte_eth_hash_filter_info info = {0};
                info.info_type = RTE_ETH_HASH_FILTER_GLOBAL_CONFIG;
                info.info.global_conf.hash_func = RTE_ETH_HASH_FUNCTION_TOEPLITZ;

                if (rte_eth_dev_filter_ctrl(port_id, RTE_ETH_FILTER_HASH,
                    RTE_ETH_FILTER_SET, &info) < 0) {
                    rte_exit(EXIT_FAILURE, "port[%d] set hash func failed\n",
                        port_id);
                }
            }

            set_rss_table(port_id, dev_info.reta_size, nb_queues);
        }

        /* Enable RX in promiscuous mode for the Ethernet device. */
        if (ff_global_cfg.dpdk.promiscuous) {
            rte_eth_promiscuous_enable(port_id);
            ret = rte_eth_promiscuous_get(port_id);
            if (ret == 1) {
                printf("set port %u to promiscuous mode ok\n", port_id);
            } else {
                printf("set port %u to promiscuous mode failed\n", port_id);
            }
        }

        /* Enable pcap dump */
        if (pconf->pcap) {
            ff_enable_pcap(pconf->pcap);
        }
    }

    if (rte_eal_process_type() == RTE_PROC_PRIMARY) {
        check_all_ports_link_status();
    }

    return 0;
}

static int
init_clock(void)
{
    rte_timer_subsystem_init();
    uint64_t hz = rte_get_timer_hz();
    uint64_t intrs = MS_PER_S / ff_global_cfg.freebsd.hz;
    uint64_t tsc = (hz + MS_PER_S - 1) / MS_PER_S * intrs;

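    /*
     * Worked example (assuming a 2.5 GHz TSC and freebsd hz=100):
     * intrs = 1000/100 = 10 ms per tick, and tsc =
     * ceil(2500000000/1000) * 10 = 25000000 cycles, so ff_hardclock()
     * fires every 10 ms.
     */
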
    rte_timer_init(&freebsd_clock);
    rte_timer_reset(&freebsd_clock, tsc, PERIODICAL,
        rte_lcore_id(), &ff_hardclock_job, NULL);

    ff_update_current_ts();

    return 0;
}

int
ff_dpdk_init(int argc, char **argv)
{
    if (ff_global_cfg.dpdk.nb_procs < 1 ||
        ff_global_cfg.dpdk.nb_procs > RTE_MAX_LCORE ||
        ff_global_cfg.dpdk.proc_id >= ff_global_cfg.dpdk.nb_procs ||
        ff_global_cfg.dpdk.proc_id < 0) {
        printf("param num_procs[%d] or proc_id[%d] error!\n",
            ff_global_cfg.dpdk.nb_procs,
            ff_global_cfg.dpdk.proc_id);
        exit(1);
    }

    int ret = rte_eal_init(argc, argv);
    if (ret < 0) {
        rte_exit(EXIT_FAILURE, "Error with EAL initialization\n");
    }

    numa_on = ff_global_cfg.dpdk.numa_on;

    init_lcore_conf();

    init_mem_pool();

    init_dispatch_ring();

    init_msg_ring();

    enable_kni = ff_global_cfg.kni.enable;
    if (enable_kni) {
        init_kni();
    }

    ret = init_port_start();
    if (ret < 0) {
        rte_exit(EXIT_FAILURE, "init_port_start failed\n");
    }

    init_clock();

    return 0;
}

static void
ff_veth_input(const struct ff_dpdk_if_context *ctx, struct rte_mbuf *pkt)
{
    uint8_t rx_csum = ctx->hw_features.rx_csum;
    if (rx_csum) {
        if (pkt->ol_flags & (PKT_RX_IP_CKSUM_BAD | PKT_RX_L4_CKSUM_BAD)) {
            /* drop packets with bad checksums; free the mbuf as well */
            rte_pktmbuf_free(pkt);
            return;
        }
    }

    /*
     * FIXME: should we save pkt->vlan_tci
     * if (pkt->ol_flags & PKT_RX_VLAN_PKT)
     */

    void *data = rte_pktmbuf_mtod(pkt, void *);
    uint16_t len = rte_pktmbuf_data_len(pkt);

    void *hdr = ff_mbuf_gethdr(pkt, pkt->pkt_len, data, len, rx_csum);
    if (hdr == NULL) {
        rte_pktmbuf_free(pkt);
        return;
    }

    struct rte_mbuf *pn = pkt->next;
    void *prev = hdr;
    while (pn != NULL) {
        data = rte_pktmbuf_mtod(pn, void *);
        len = rte_pktmbuf_data_len(pn);

        void *mb = ff_mbuf_get(prev, data, len);
        if (mb == NULL) {
            ff_mbuf_free(hdr);
            rte_pktmbuf_free(pkt);
            return;
        }
        pn = pn->next;
        prev = mb;
    }

    ff_veth_process_packet(ctx->ifp, hdr);
}

static enum FilterReturn
protocol_filter(const void *data, uint16_t len)
{
    if (len < ETHER_HDR_LEN)
        return FILTER_UNKNOWN;

    const struct ether_hdr *hdr;
    hdr = (const struct ether_hdr *)data;

    if (ntohs(hdr->ether_type) == ETHER_TYPE_ARP)
        return FILTER_ARP;

    if (!enable_kni) {
        return FILTER_UNKNOWN;
    }

    if (ntohs(hdr->ether_type) != ETHER_TYPE_IPv4)
        return FILTER_UNKNOWN;

    return ff_kni_proto_filter(data + ETHER_HDR_LEN,
        len - ETHER_HDR_LEN);
}

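/*
 * ARP is special-cased above because it is not an IP packet: RSS cannot
 * steer it, so whichever queue happens to receive it must replicate it to
 * every other queue (and to KNI), keeping each lcore's ARP table in sync.
 * See the FILTER_ARP branch in process_packets() below.
 */
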
static inline void
pktmbuf_deep_attach(struct rte_mbuf *mi, const struct rte_mbuf *m)
{
    void *src, *dst;

    dst = rte_pktmbuf_mtod(mi, void *);
    src = rte_pktmbuf_mtod(m, void *);

    mi->data_len = m->data_len;
    rte_memcpy(dst, src, m->data_len);

    mi->port = m->port;
    mi->vlan_tci = m->vlan_tci;
    mi->vlan_tci_outer = m->vlan_tci_outer;
    mi->tx_offload = m->tx_offload;
    mi->hash = m->hash;
    mi->ol_flags = m->ol_flags;
    mi->packet_type = m->packet_type;
}

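/*
 * Note: despite the name, pktmbuf_deep_attach() does not attach in the
 * rte_pktmbuf_attach() sense; it memcpy's the segment data and metadata
 * into mi's own buffer, so the copy stays valid after the original mbuf
 * is freed.
 */
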
/* copied from rte_pktmbuf_clone */
static inline struct rte_mbuf *
pktmbuf_deep_clone(const struct rte_mbuf *md,
    struct rte_mempool *mp)
{
    struct rte_mbuf *mc, *mi, **prev;
    uint32_t pktlen;
    uint8_t nseg;

    if (unlikely((mc = rte_pktmbuf_alloc(mp)) == NULL))
        return NULL;

    mi = mc;
    prev = &mi->next;
    pktlen = md->pkt_len;
    nseg = 0;

    do {
        nseg++;
        pktmbuf_deep_attach(mi, md);
        *prev = mi;
        prev = &mi->next;
    } while ((md = md->next) != NULL &&
        (mi = rte_pktmbuf_alloc(mp)) != NULL);

    *prev = NULL;
    mc->nb_segs = nseg;
    mc->pkt_len = pktlen;

    /* Allocation of a new segment failed */
    if (unlikely(mi == NULL)) {
        rte_pktmbuf_free(mc);
        return NULL;
    }

    __rte_mbuf_sanity_check(mc, 1);
    return mc;
}

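/*
 * pktmbuf_deep_clone() is used by process_packets() to broadcast ARP
 * frames: each extra queue and the KNI interface gets its own full copy,
 * since a single mbuf cannot safely be handed to several consumers at
 * once.
 */
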
static inline void
process_packets(uint8_t port_id, uint16_t queue_id, struct rte_mbuf **bufs,
    uint16_t count, const struct ff_dpdk_if_context *ctx, int pkts_from_ring)
{
    struct lcore_conf *qconf = &lcore_conf;
    uint16_t nb_queues = qconf->nb_queue_list[port_id];

    uint16_t i;
    for (i = 0; i < count; i++) {
        struct rte_mbuf *rtem = bufs[i];

        if (unlikely(qconf->pcap[port_id] != NULL)) {
            if (!pkts_from_ring) {
                ff_dump_packets(qconf->pcap[port_id], rtem);
            }
        }

        void *data = rte_pktmbuf_mtod(rtem, void *);
        uint16_t len = rte_pktmbuf_data_len(rtem);

        if (!pkts_from_ring && packet_dispatcher) {
            int ret = (*packet_dispatcher)(data, len, queue_id, nb_queues);
            if (ret < 0 || ret >= nb_queues) {
                rte_pktmbuf_free(rtem);
                continue;
            }

            if (ret != queue_id) {
                ret = rte_ring_enqueue(dispatch_ring[port_id][ret], rtem);
                if (ret < 0)
                    rte_pktmbuf_free(rtem);

                continue;
            }
        }

        enum FilterReturn filter = protocol_filter(data, len);
        if (filter == FILTER_ARP) {
            struct rte_mempool *mbuf_pool;
            struct rte_mbuf *mbuf_clone;
            if (!pkts_from_ring) {
                uint16_t j;
                for (j = 0; j < nb_queues; ++j) {
                    if (j == queue_id)
                        continue;

                    unsigned socket_id = 0;
                    if (numa_on) {
                        uint16_t lcore_id = qconf->port_cfgs[port_id].lcore_list[j];
                        socket_id = rte_lcore_to_socket_id(lcore_id);
                    }
                    mbuf_pool = pktmbuf_pool[socket_id];
                    mbuf_clone = pktmbuf_deep_clone(rtem, mbuf_pool);
                    if (mbuf_clone) {
                        int ret = rte_ring_enqueue(dispatch_ring[port_id][j],
                            mbuf_clone);
                        if (ret < 0)
                            rte_pktmbuf_free(mbuf_clone);
                    }
                }
            }

            if (enable_kni && rte_eal_process_type() == RTE_PROC_PRIMARY) {
                mbuf_pool = pktmbuf_pool[qconf->socket_id];
                mbuf_clone = pktmbuf_deep_clone(rtem, mbuf_pool);
                if (mbuf_clone) {
                    ff_kni_enqueue(port_id, mbuf_clone);
                }
            }

            ff_veth_input(ctx, rtem);
        } else if (enable_kni &&
            ((filter == FILTER_KNI && kni_accept) ||
            (filter == FILTER_UNKNOWN && !kni_accept)) ) {
            ff_kni_enqueue(port_id, rtem);
        } else {
            ff_veth_input(ctx, rtem);
        }
    }
}

static inline int
process_dispatch_ring(uint8_t port_id, uint16_t queue_id,
    struct rte_mbuf **pkts_burst, const struct ff_dpdk_if_context *ctx)
{
    /* Read packets from the dispatch ring and process them. */
    uint16_t nb_rb;
    nb_rb = rte_ring_dequeue_burst(dispatch_ring[port_id][queue_id],
        (void **)pkts_burst, MAX_PKT_BURST);

    if (nb_rb > 0) {
        process_packets(port_id, queue_id, pkts_burst, nb_rb, ctx, 1);
    }

    return 0;
}

static inline void
handle_sysctl_msg(struct ff_msg *msg)
{
    int ret = ff_sysctl(msg->sysctl.name, msg->sysctl.namelen,
        msg->sysctl.old, msg->sysctl.oldlenp, msg->sysctl.new,
        msg->sysctl.newlen);

    if (ret < 0) {
        msg->result = errno;
    } else {
        msg->result = 0;
    }
}

static inline void
handle_ioctl_msg(struct ff_msg *msg)
{
    int fd, ret;
    fd = ff_socket(AF_INET, SOCK_DGRAM, 0);
    if (fd < 0) {
        ret = -1;
        goto done;
    }

    ret = ff_ioctl(fd, msg->ioctl.cmd, msg->ioctl.data);

    ff_close(fd);

done:
    if (ret < 0) {
        msg->result = errno;
    } else {
        msg->result = 0;
    }
}

static inline void
handle_route_msg(struct ff_msg *msg)
{
    int ret = ff_rtioctl(msg->route.fib, msg->route.data,
        &msg->route.len, msg->route.maxlen);
    if (ret < 0) {
        msg->result = errno;
    } else {
        msg->result = 0;
    }
}

static struct ff_top_args ff_status;
static inline void
handle_top_msg(struct ff_msg *msg)
{
    msg->top = ff_status;
    msg->result = 0;
}

#ifdef FF_NETGRAPH
static inline void
handle_ngctl_msg(struct ff_msg *msg)
{
    int ret = ff_ngctl(msg->ngctl.cmd, msg->ngctl.data);
    if (ret < 0) {
        msg->result = errno;
    } else {
        msg->result = 0;
        msg->ngctl.ret = ret;
    }
}
#endif

#ifdef FF_IPFW
static inline void
handle_ipfw_msg(struct ff_msg *msg)
{
    int fd, ret;
    fd = ff_socket(AF_INET, SOCK_RAW, IPPROTO_RAW);
    if (fd < 0) {
        ret = -1;
        goto done;
    }

    switch (msg->ipfw.cmd) {
        case FF_IPFW_GET:
            ret = ff_getsockopt(fd, msg->ipfw.level,
                msg->ipfw.optname, msg->ipfw.optval,
                msg->ipfw.optlen);
            break;
        case FF_IPFW_SET:
            ret = ff_setsockopt(fd, msg->ipfw.level,
                msg->ipfw.optname, msg->ipfw.optval,
                *(msg->ipfw.optlen));
            break;
        default:
            ret = -1;
            errno = ENOTSUP;
            break;
    }

    ff_close(fd);

done:
    if (ret < 0) {
        msg->result = errno;
    } else {
        msg->result = 0;
    }
}
#endif

static inline void
handle_default_msg(struct ff_msg *msg)
{
    msg->result = ENOTSUP;
}

static inline void
handle_msg(struct ff_msg *msg, uint16_t proc_id)
{
    switch (msg->msg_type) {
        case FF_SYSCTL:
            handle_sysctl_msg(msg);
            break;
        case FF_IOCTL:
            handle_ioctl_msg(msg);
            break;
        case FF_ROUTE:
            handle_route_msg(msg);
            break;
        case FF_TOP:
            handle_top_msg(msg);
            break;
#ifdef FF_NETGRAPH
        case FF_NGCTL:
            handle_ngctl_msg(msg);
            break;
#endif
#ifdef FF_IPFW
        case FF_IPFW_CTL:
            handle_ipfw_msg(msg);
            break;
#endif
        default:
            handle_default_msg(msg);
            break;
    }
    rte_ring_enqueue(msg_ring[proc_id].ring[1], msg);
}

static inline int
process_msg_ring(uint16_t proc_id)
{
    void *msg;
    int ret = rte_ring_dequeue(msg_ring[proc_id].ring[0], &msg);

    /* control messages are rare, so a successful dequeue is unlikely */
    if (unlikely(ret == 0)) {
        handle_msg((struct ff_msg *)msg, proc_id);
    }

    return 0;
}

/* Send burst of packets on an output interface */
static inline int
send_burst(struct lcore_conf *qconf, uint16_t n, uint8_t port)
{
    struct rte_mbuf **m_table;
    int ret;
    uint16_t queueid;

    queueid = qconf->tx_queue_id[port];
    m_table = (struct rte_mbuf **)qconf->tx_mbufs[port].m_table;

    if (unlikely(qconf->pcap[port] != NULL)) {
        uint16_t i;
        for (i = 0; i < n; i++) {
            ff_dump_packets(qconf->pcap[port], m_table[i]);
        }
    }

    ret = rte_eth_tx_burst(port, queueid, m_table, n);
    if (unlikely(ret < n)) {
        do {
            rte_pktmbuf_free(m_table[ret]);
        } while (++ret < n);
    }

    return 0;
}

/* Enqueue a single packet, and send burst if queue is filled */
static inline int
send_single_packet(struct rte_mbuf *m, uint8_t port)
{
    uint16_t len;
    struct lcore_conf *qconf;

    qconf = &lcore_conf;
    len = qconf->tx_mbufs[port].len;
    qconf->tx_mbufs[port].m_table[len] = m;
    len++;

    /* enough pkts to be sent */
    if (unlikely(len == MAX_PKT_BURST)) {
        send_burst(qconf, MAX_PKT_BURST, port);
        len = 0;
    }

    qconf->tx_mbufs[port].len = len;
    return 0;
}

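/*
 * Packets buffered by send_single_packet() are flushed when the per-port
 * table reaches MAX_PKT_BURST, or by the periodic drain in main_loop(),
 * which sends whatever has accumulated after roughly BURST_TX_DRAIN_US
 * microseconds.
 */
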
int
ff_dpdk_if_send(struct ff_dpdk_if_context *ctx, void *m,
    int total)
{
    struct rte_mempool *mbuf_pool = pktmbuf_pool[lcore_conf.socket_id];
    struct rte_mbuf *head = rte_pktmbuf_alloc(mbuf_pool);
    if (head == NULL) {
        ff_mbuf_free(m);
        return -1;
    }

    head->pkt_len = total;
    head->nb_segs = 0;

    int off = 0;
    struct rte_mbuf *cur = head, *prev = NULL;
    while (total > 0) {
        if (cur == NULL) {
            cur = rte_pktmbuf_alloc(mbuf_pool);
            if (cur == NULL) {
                rte_pktmbuf_free(head);
                ff_mbuf_free(m);
                return -1;
            }
        }

        void *data = rte_pktmbuf_mtod(cur, void *);
        int len = total > RTE_MBUF_DEFAULT_DATAROOM ? RTE_MBUF_DEFAULT_DATAROOM : total;
        int ret = ff_mbuf_copydata(m, data, off, len);
        if (ret < 0) {
            rte_pktmbuf_free(head);
            ff_mbuf_free(m);
            return -1;
        }

        if (prev != NULL) {
            prev->next = cur;
        }
        prev = cur;

        cur->data_len = len;
        off += len;
        total -= len;
        head->nb_segs++;
        cur = NULL;
    }

    struct ff_tx_offload offload = {0};
    ff_mbuf_tx_offload(m, &offload);

    void *data = rte_pktmbuf_mtod(head, void *);

    if (offload.ip_csum) {
        /* ipv6 not supported yet */
        struct ipv4_hdr *iph;
        int iph_len;
        iph = (struct ipv4_hdr *)(data + ETHER_HDR_LEN);
        iph_len = (iph->version_ihl & 0x0f) << 2;

        head->ol_flags |= PKT_TX_IP_CKSUM | PKT_TX_IPV4;
        head->l2_len = ETHER_HDR_LEN;
        head->l3_len = iph_len;
    }

    if (ctx->hw_features.tx_csum_l4) {
        struct ipv4_hdr *iph;
        int iph_len;
        iph = (struct ipv4_hdr *)(data + ETHER_HDR_LEN);
        iph_len = (iph->version_ihl & 0x0f) << 2;

        if (offload.tcp_csum) {
            head->ol_flags |= PKT_TX_TCP_CKSUM;
            head->l2_len = ETHER_HDR_LEN;
            head->l3_len = iph_len;
        }

        /*
         *  TCP segmentation offload.
         *
         *  - set the PKT_TX_TCP_SEG flag in mbuf->ol_flags (this flag
         *    implies PKT_TX_TCP_CKSUM)
         *  - set the flag PKT_TX_IPV4 or PKT_TX_IPV6
         *  - if it's IPv4, set the PKT_TX_IP_CKSUM flag and
         *    write the IP checksum to 0 in the packet
         *  - fill the mbuf offload information: l2_len,
         *    l3_len, l4_len, tso_segsz
         *  - calculate the pseudo header checksum without taking ip_len
         *    in account, and set it in the TCP header. Refer to
         *    rte_ipv4_phdr_cksum() and rte_ipv6_phdr_cksum() that can be
         *    used as helpers.
         */
        if (offload.tso_seg_size) {
            struct tcp_hdr *tcph;
            int tcph_len;
            tcph = (struct tcp_hdr *)((char *)iph + iph_len);
            tcph_len = (tcph->data_off & 0xf0) >> 2;
            tcph->cksum = rte_ipv4_phdr_cksum(iph, PKT_TX_TCP_SEG);

            head->ol_flags |= PKT_TX_TCP_SEG;
            head->l4_len = tcph_len;
            head->tso_segsz = offload.tso_seg_size;
        }

        if (offload.udp_csum) {
            head->ol_flags |= PKT_TX_UDP_CKSUM;
            head->l2_len = ETHER_HDR_LEN;
            head->l3_len = iph_len;
        }
    }

    ff_mbuf_free(m);

    return send_single_packet(head, ctx->port_id);
}

static int
main_loop(void *arg)
{
    struct loop_routine *lr = (struct loop_routine *)arg;

    struct rte_mbuf *pkts_burst[MAX_PKT_BURST];
    uint64_t prev_tsc, diff_tsc, cur_tsc, usch_tsc, div_tsc, usr_tsc, sys_tsc, end_tsc;
    int i, j, nb_rx, idle;
    uint8_t port_id, queue_id;
    struct lcore_conf *qconf;
    const uint64_t drain_tsc = (rte_get_tsc_hz() + US_PER_S - 1) /
        US_PER_S * BURST_TX_DRAIN_US;
    struct ff_dpdk_if_context *ctx;

    prev_tsc = 0;
    usch_tsc = 0;

    qconf = &lcore_conf;

    while (1) {
        cur_tsc = rte_rdtsc();
        if (unlikely(freebsd_clock.expire < cur_tsc)) {
            rte_timer_manage();
        }

        idle = 1;
        sys_tsc = 0;
        usr_tsc = 0;

        /*
         * TX burst queue drain
         */
        diff_tsc = cur_tsc - prev_tsc;
        if (unlikely(diff_tsc > drain_tsc)) {
            for (i = 0; i < qconf->nb_tx_port; i++) {
                port_id = qconf->tx_port_id[i];
                if (qconf->tx_mbufs[port_id].len == 0)
                    continue;

                idle = 0;

                send_burst(qconf,
                    qconf->tx_mbufs[port_id].len,
                    port_id);
                qconf->tx_mbufs[port_id].len = 0;
            }

            prev_tsc = cur_tsc;
        }

        /*
         * Read packet from RX queues
         */
        for (i = 0; i < qconf->nb_rx_queue; ++i) {
            port_id = qconf->rx_queue_list[i].port_id;
            queue_id = qconf->rx_queue_list[i].queue_id;
            ctx = veth_ctx[port_id];

            if (enable_kni && rte_eal_process_type() == RTE_PROC_PRIMARY) {
                ff_kni_process(port_id, queue_id, pkts_burst, MAX_PKT_BURST);
            }

            process_dispatch_ring(port_id, queue_id, pkts_burst, ctx);

            nb_rx = rte_eth_rx_burst(port_id, queue_id, pkts_burst,
                MAX_PKT_BURST);
            if (nb_rx == 0)
                continue;

            idle = 0;

            /* Prefetch first packets */
            for (j = 0; j < PREFETCH_OFFSET && j < nb_rx; j++) {
                rte_prefetch0(rte_pktmbuf_mtod(
                        pkts_burst[j], void *));
            }

            /* Prefetch and handle already prefetched packets */
            for (j = 0; j < (nb_rx - PREFETCH_OFFSET); j++) {
                rte_prefetch0(rte_pktmbuf_mtod(pkts_burst[
                        j + PREFETCH_OFFSET], void *));
                process_packets(port_id, queue_id, &pkts_burst[j], 1, ctx, 0);
            }

            /* Handle remaining prefetched packets */
            for (; j < nb_rx; j++) {
                process_packets(port_id, queue_id, &pkts_burst[j], 1, ctx, 0);
            }
        }

        process_msg_ring(qconf->proc_id);

        div_tsc = rte_rdtsc();

        if (likely(lr->loop != NULL && (!idle || cur_tsc - usch_tsc > drain_tsc))) {
            usch_tsc = cur_tsc;
            lr->loop(lr->arg);
        }

        end_tsc = rte_rdtsc();

        if (usch_tsc == cur_tsc) {
            usr_tsc = end_tsc - div_tsc;
        }

        if (!idle) {
            sys_tsc = div_tsc - cur_tsc;
            ff_status.sys_tsc += sys_tsc;
        }

        ff_status.usr_tsc += usr_tsc;
        ff_status.work_tsc += end_tsc - cur_tsc;
        ff_status.idle_tsc += end_tsc - cur_tsc - usr_tsc - sys_tsc;

        ff_status.loops++;
    }

    return 0;
}

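/*
 * A note on the accounting in main_loop(): sys_tsc covers packet
 * processing (loop top to div_tsc), usr_tsc covers the user callback
 * (div_tsc to end_tsc, only on iterations where it ran), and idle_tsc is
 * the remainder of the iteration. Tools can read these counters via the
 * FF_TOP message handled in handle_top_msg() above.
 */
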
int
ff_dpdk_if_up(void)
{
    int i;
    struct lcore_conf *qconf = &lcore_conf;
    for (i = 0; i < qconf->nb_tx_port; i++) {
        uint16_t port_id = qconf->tx_port_id[i];

        struct ff_port_cfg *pconf = &qconf->port_cfgs[port_id];
        veth_ctx[port_id] = ff_veth_attach(pconf);
        if (veth_ctx[port_id] == NULL) {
            rte_exit(EXIT_FAILURE, "ff_veth_attach failed\n");
        }
    }

    return 0;
}

void
ff_dpdk_run(loop_func_t loop, void *arg)
{
    struct loop_routine *lr = rte_malloc(NULL,
        sizeof(struct loop_routine), 0);
    if (lr == NULL) {
        rte_exit(EXIT_FAILURE, "rte_malloc loop_routine failed\n");
    }
    lr->loop = loop;
    lr->arg = arg;
    rte_eal_mp_remote_launch(main_loop, lr, CALL_MASTER);
    rte_eal_mp_wait_lcore();
    rte_free(lr);
}

void
ff_dpdk_pktmbuf_free(void *m)
{
    rte_pktmbuf_free((struct rte_mbuf *)m);
}

static uint32_t
toeplitz_hash(unsigned keylen, const uint8_t *key,
    unsigned datalen, const uint8_t *data)
{
    uint32_t hash = 0, v;
    u_int i, b;

    /* XXXRW: Perhaps an assertion about key length vs. data length? */

    v = (key[0]<<24) + (key[1]<<16) + (key[2]<<8) + key[3];
    for (i = 0; i < datalen; i++) {
        for (b = 0; b < 8; b++) {
            if (data[i] & (1<<(7-b)))
                hash ^= v;
            v <<= 1;
            if ((i + 4) < keylen &&
                (key[i+4] & (1<<(7-b))))
                v |= 1;
        }
    }
    return (hash);
}

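/*
 * The loop above implements the Microsoft RSS (Toeplitz) hash: v is a
 * 32-bit window into the key that slides left one bit per input bit, and
 * every set input bit XORs the current window into the hash. Using the
 * same 40-byte key as the NIC makes the software result match hardware.
 */
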
int
ff_rss_check(void *softc, uint32_t saddr, uint32_t daddr,
    uint16_t sport, uint16_t dport)
{
    struct lcore_conf *qconf = &lcore_conf;
    struct ff_dpdk_if_context *ctx = ff_veth_softc_to_hostc(softc);
    uint16_t nb_queues = qconf->nb_queue_list[ctx->port_id];

    if (nb_queues <= 1) {
        return 1;
    }

    uint16_t reta_size = rss_reta_size[ctx->port_id];
    uint16_t queueid = qconf->tx_queue_id[ctx->port_id];

    uint8_t data[sizeof(saddr) + sizeof(daddr) + sizeof(sport) +
        sizeof(dport)];

    unsigned datalen = 0;

    bcopy(&saddr, &data[datalen], sizeof(saddr));
    datalen += sizeof(saddr);

    bcopy(&daddr, &data[datalen], sizeof(daddr));
    datalen += sizeof(daddr);

    bcopy(&sport, &data[datalen], sizeof(sport));
    datalen += sizeof(sport);

    bcopy(&dport, &data[datalen], sizeof(dport));
    datalen += sizeof(dport);

    uint32_t hash = toeplitz_hash(sizeof(default_rsskey_40bytes),
        default_rsskey_40bytes, datalen, data);

    return ((hash & (reta_size - 1)) % nb_queues) == queueid;
}

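/*
 * ff_rss_check() above returns nonzero when the given 4-tuple RSS-hashes
 * to this lcore's own queue; the stack can use it, for example, to pick a
 * local port whose return traffic the NIC will steer back to the same
 * lcore, keeping a flow core-local.
 */
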
void
ff_regist_packet_dispatcher(dispatch_func_t func)
{
    packet_dispatcher = func;
}

uint64_t
ff_get_tsc_ns(void)
{
    uint64_t cur_tsc = rte_rdtsc();
    uint64_t hz = rte_get_tsc_hz();
    return ((double)cur_tsc / (double)hz) * NS_PER_S;
}
1609