xref: /f-stack/lib/ff_dpdk_if.c (revision b9e91cfd)
1 /*
2  * Copyright (C) 2017 THL A29 Limited, a Tencent company.
3  * All rights reserved.
4  *
5  * Redistribution and use in source and binary forms, with or without
6  * modification, are permitted provided that the following conditions are met:
7  *
8  * 1. Redistributions of source code must retain the above copyright notice, this
9  *   list of conditions and the following disclaimer.
10  * 2. Redistributions in binary form must reproduce the above copyright notice,
11  *   this list of conditions and the following disclaimer in the documentation
12  *   and/or other materials provided with the distribution.
13  *
14  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
15  * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
16  * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
17  * DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR
18  * ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
19  * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
20  * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
21  * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
22  * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
23  * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
24  *
25  */
26 #include <assert.h>
/* libc headers for calloc/free, printf, strcasecmp/bcopy and errno used below */
#include <errno.h>
#include <stdio.h>
#include <stdlib.h>
#include <strings.h>
27 
28 #include <rte_common.h>
29 #include <rte_byteorder.h>
30 #include <rte_log.h>
31 #include <rte_memory.h>
32 #include <rte_memcpy.h>
33 #include <rte_memzone.h>
34 #include <rte_config.h>
35 #include <rte_eal.h>
36 #include <rte_pci.h>
37 #include <rte_mbuf.h>
39 #include <rte_lcore.h>
40 #include <rte_launch.h>
41 #include <rte_ethdev.h>
42 #include <rte_debug.h>
44 #include <rte_ether.h>
45 #include <rte_malloc.h>
46 #include <rte_cycles.h>
47 #include <rte_timer.h>
48 #include <rte_thash.h>
49 #include <rte_ip.h>
50 #include <rte_tcp.h>
51 #include <rte_udp.h>
52 
53 #include "ff_dpdk_if.h"
54 #include "ff_dpdk_pcap.h"
55 #include "ff_dpdk_kni.h"
56 #include "ff_config.h"
57 #include "ff_veth.h"
58 #include "ff_host_interface.h"
59 #include "ff_msg.h"
60 #include "ff_api.h"
61 
62 #define MEMPOOL_CACHE_SIZE 256
63 
64 #define DISPATCH_RING_SIZE 2048
65 
66 #define MSG_RING_SIZE 32
67 
68 /*
69  * Configurable number of RX/TX ring descriptors
70  */
71 #define RX_QUEUE_SIZE 512
72 #define TX_QUEUE_SIZE 512
73 
74 #define MAX_PKT_BURST 32
75 #define BURST_TX_DRAIN_US 100 /* TX drain every ~100us */
76 
77 /*
78  * Try to avoid TX buffering if we have at least MAX_TX_BURST packets to send.
79  */
80 #define MAX_TX_BURST    (MAX_PKT_BURST / 2)
81 
82 #define NB_SOCKETS 8
83 
84 /* Configure how many packets ahead to prefetch, when reading packets */
85 #define PREFETCH_OFFSET    3
86 
87 #define MAX_RX_QUEUE_PER_LCORE 16
88 #define MAX_TX_QUEUE_PER_PORT RTE_MAX_ETHPORTS
89 #define MAX_RX_QUEUE_PER_PORT 128
90 
91 #define KNI_MBUF_MAX 2048
92 #define KNI_QUEUE_SIZE 2048
93 
94 static int enable_kni;
95 static int kni_accept;
96 
97 static int numa_on;
98 
99 static struct rte_timer freebsd_clock;
100 
101 // Default 40-byte RSS hash key, taken from the Mellanox Linux driver
102 static uint8_t default_rsskey_40bytes[40] = {
103     0xd1, 0x81, 0xc6, 0x2c, 0xf7, 0xf4, 0xdb, 0x5b,
104     0x19, 0x83, 0xa2, 0xfc, 0x94, 0x3e, 0x1a, 0xdb,
105     0xd9, 0x38, 0x9e, 0x6b, 0xd1, 0x03, 0x9c, 0x2c,
106     0xa7, 0x44, 0x99, 0xad, 0x59, 0x3d, 0x56, 0xd9,
107     0xf3, 0x25, 0x3c, 0x06, 0x2a, 0xdc, 0x1f, 0xfc
108 };
109 
110 static struct rte_eth_conf default_port_conf = {
111     .rxmode = {
112         .mq_mode = ETH_MQ_RX_RSS,
113         .max_rx_pkt_len = ETHER_MAX_LEN,
114         .split_hdr_size = 0, /**< hdr buf size */
115         .header_split   = 0, /**< Header Split disabled */
116         .hw_ip_checksum = 0, /**< IP checksum offload disabled */
117         .hw_vlan_filter = 0, /**< VLAN filtering disabled */
118         .hw_vlan_strip  = 0, /**< VLAN strip disabled. */
119         .hw_vlan_extend = 0, /**< Extended VLAN disabled. */
120         .jumbo_frame    = 0, /**< Jumbo Frame Support disabled */
121         .hw_strip_crc   = 0, /**< CRC stripped by hardware */
122         .enable_lro     = 0, /**< LRO disabled */
123     },
124     .rx_adv_conf = {
125         .rss_conf = {
126             .rss_key = default_rsskey_40bytes,
127             .rss_key_len = 40,
128             .rss_hf = ETH_RSS_PROTO_MASK,
129         },
130     },
131     .txmode = {
132         .mq_mode = ETH_MQ_TX_NONE,
133     },
134 };
135 
136 struct mbuf_table {
137     uint16_t len;
138     struct rte_mbuf *m_table[MAX_PKT_BURST];
139 };
140 
141 struct lcore_rx_queue {
142     uint8_t port_id;
143     uint8_t queue_id;
144 } __rte_cache_aligned;
145 
146 struct lcore_conf {
147     uint16_t proc_id;
148     uint16_t socket_id;
149     uint16_t nb_queue_list[RTE_MAX_ETHPORTS];
150     struct ff_port_cfg *port_cfgs;
151 
152     uint16_t nb_rx_queue;
153     struct lcore_rx_queue rx_queue_list[MAX_RX_QUEUE_PER_LCORE];
154     uint16_t nb_tx_port;
155     uint16_t tx_port_id[RTE_MAX_ETHPORTS];
156     uint16_t tx_queue_id[RTE_MAX_ETHPORTS];
157     struct mbuf_table tx_mbufs[RTE_MAX_ETHPORTS];
158     char *pcap[RTE_MAX_ETHPORTS];
159 } __rte_cache_aligned;
160 
161 static struct lcore_conf lcore_conf;
162 
163 static struct rte_mempool *pktmbuf_pool[NB_SOCKETS];
164 
165 static struct rte_ring **dispatch_ring[RTE_MAX_ETHPORTS];
166 static dispatch_func_t packet_dispatcher;
167 
168 static uint16_t rss_reta_size[RTE_MAX_ETHPORTS];
169 
170 struct ff_msg_ring {
171     char ring_name[2][RTE_RING_NAMESIZE];
172     /* ring[0]: lcore receives msgs that other processes send */
173     /* ring[1]: lcore sends msgs that other processes read */
174     struct rte_ring *ring[2];
175 } __rte_cache_aligned;
176 
177 static struct ff_msg_ring msg_ring[RTE_MAX_LCORE];
178 static struct rte_mempool *message_pool;
179 
180 struct ff_dpdk_if_context {
181     void *sc;
182     void *ifp;
183     uint16_t port_id;
184     struct ff_hw_features hw_features;
185 } __rte_cache_aligned;
186 
187 static struct ff_dpdk_if_context *veth_ctx[RTE_MAX_ETHPORTS];
188 
189 extern void ff_hardclock(void);
190 
191 static void
192 ff_hardclock_job(__rte_unused struct rte_timer *timer,
193     __rte_unused void *arg) {
194     ff_hardclock();
195     ff_update_current_ts();
196 }
197 
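/*
 * Register a FreeBSD network interface (sc/ifp) with its DPDK port.
 * The returned context carries the port id and hardware offload
 * capabilities, and is handed back to the RX/TX paths later on.
 */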
198 struct ff_dpdk_if_context *
199 ff_dpdk_register_if(void *sc, void *ifp, struct ff_port_cfg *cfg)
200 {
201     struct ff_dpdk_if_context *ctx;
202 
203     ctx = calloc(1, sizeof(struct ff_dpdk_if_context));
204     if (ctx == NULL)
205         return NULL;
206 
207     ctx->sc = sc;
208     ctx->ifp = ifp;
209     ctx->port_id = cfg->port_id;
210     ctx->hw_features = cfg->hw_features;
211 
212     return ctx;
213 }
214 
215 void
216 ff_dpdk_deregister_if(struct ff_dpdk_if_context *ctx)
217 {
218     free(ctx);
219 }
220 
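/*
 * Poll the link status of all configured ports (up to 90 * 100ms),
 * printing a dot per round until every link is up or we time out.
 */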
221 static void
222 check_all_ports_link_status(void)
223 {
224     #define CHECK_INTERVAL 100 /* 100ms */
225     #define MAX_CHECK_TIME 90  /* 9s (90 * 100ms) in total */
226 
227     uint8_t count, all_ports_up, print_flag = 0;
228     struct rte_eth_link link;
229 
230     printf("\nChecking link status");
231     fflush(stdout);
232 
233     int i, nb_ports;
234     nb_ports = ff_global_cfg.dpdk.nb_ports;
235     for (count = 0; count <= MAX_CHECK_TIME; count++) {
236         all_ports_up = 1;
237         for (i = 0; i < nb_ports; i++) {
238             uint8_t portid = ff_global_cfg.dpdk.portid_list[i];
239             memset(&link, 0, sizeof(link));
240             rte_eth_link_get_nowait(portid, &link);
241 
242             /* print link status if flag set */
243             if (print_flag == 1) {
244                 if (link.link_status) {
245                     printf("Port %d Link Up - speed %u "
246                         "Mbps - %s\n", (int)portid,
247                         (unsigned)link.link_speed,
248                         (link.link_duplex == ETH_LINK_FULL_DUPLEX) ?
249                         ("full-duplex") : ("half-duplex"));
250                 } else {
251                     printf("Port %d Link Down\n", (int)portid);
252                 }
253                 continue;
254             }
255             /* clear all_ports_up flag if any link down */
256             if (link.link_status == 0) {
257                 all_ports_up = 0;
258                 break;
259             }
260         }
261 
262         /* after finally printing all link status, get out */
263         if (print_flag == 1)
264             break;
265 
266         if (all_ports_up == 0) {
267             printf(".");
268             fflush(stdout);
269             rte_delay_ms(CHECK_INTERVAL);
270         }
271 
272         /* set the print_flag if all ports up or timeout */
273         if (all_ports_up == 1 || count == (MAX_CHECK_TIME - 1)) {
274             print_flag = 1;
275             printf("done\n");
276         }
277     }
278 }
279 
280 static int
281 init_lcore_conf(void)
282 {
283     uint8_t nb_dev_ports = rte_eth_dev_count();
284     if (nb_dev_ports == 0) {
285         rte_exit(EXIT_FAILURE, "No probed ethernet devices\n");
286     }
287 
288     if (ff_global_cfg.dpdk.max_portid >= nb_dev_ports) {
289         rte_exit(EXIT_FAILURE, "this machine doesn't have port %d.\n",
290                  ff_global_cfg.dpdk.max_portid);
291     }
292 
293     lcore_conf.port_cfgs = ff_global_cfg.dpdk.port_cfgs;
294     lcore_conf.proc_id = ff_global_cfg.dpdk.proc_id;
295 
296     uint16_t proc_id;
297     for (proc_id = 0; proc_id < ff_global_cfg.dpdk.nb_procs; proc_id++) {
298         uint16_t lcore_id = ff_global_cfg.dpdk.proc_lcore[proc_id];
299         if (!lcore_config[lcore_id].detected) {
300             rte_exit(EXIT_FAILURE, "lcore %u unavailable\n", lcore_id);
301         }
302     }
303 
304     uint16_t socket_id = 0;
305     if (numa_on) {
306         socket_id = rte_lcore_to_socket_id(rte_lcore_id());
307     }
308 
309     lcore_conf.socket_id = socket_id;
310 
311     uint16_t lcore_id = ff_global_cfg.dpdk.proc_lcore[lcore_conf.proc_id];
312     int j;
313     for (j = 0; j < ff_global_cfg.dpdk.nb_ports; ++j) {
314         uint16_t port_id = ff_global_cfg.dpdk.portid_list[j];
315         struct ff_port_cfg *pconf = &ff_global_cfg.dpdk.port_cfgs[port_id];
316 
317         int queueid = -1;
318         int i;
319         for (i = 0; i < pconf->nb_lcores; i++) {
320             if (pconf->lcore_list[i] == lcore_id) {
321                 queueid = i;
322             }
323         }
324         if (queueid < 0) {
325             continue;
326         }
327         printf("lcore: %u, port: %u, queue: %u\n", lcore_id, port_id, queueid);
328         uint16_t nb_rx_queue = lcore_conf.nb_rx_queue;
329         lcore_conf.rx_queue_list[nb_rx_queue].port_id = port_id;
330         lcore_conf.rx_queue_list[nb_rx_queue].queue_id = queueid;
331         lcore_conf.nb_rx_queue++;
332 
333         lcore_conf.tx_queue_id[port_id] = queueid;
334         lcore_conf.tx_port_id[lcore_conf.nb_tx_port] = port_id;
335         lcore_conf.nb_tx_port++;
336 
337         lcore_conf.pcap[port_id] = pconf->pcap;
338         lcore_conf.nb_queue_list[port_id] = pconf->nb_lcores;
339     }
340 
341     if (lcore_conf.nb_rx_queue == 0) {
342         rte_exit(EXIT_FAILURE, "lcore %u has nothing to do\n", lcore_id);
343     }
344 
345     return 0;
346 }
347 
348 static int
349 init_mem_pool(void)
350 {
351     uint8_t nb_ports = ff_global_cfg.dpdk.nb_ports;
352     uint32_t nb_lcores = ff_global_cfg.dpdk.nb_procs;
353     uint32_t nb_tx_queue = nb_lcores;
354     uint32_t nb_rx_queue = lcore_conf.nb_rx_queue * nb_lcores;
355 
356     unsigned nb_mbuf = RTE_MAX (
357         (nb_rx_queue*RX_QUEUE_SIZE          +
358         nb_ports*nb_lcores*MAX_PKT_BURST    +
359         nb_ports*nb_tx_queue*TX_QUEUE_SIZE  +
360         nb_lcores*MEMPOOL_CACHE_SIZE +
361         nb_ports*KNI_MBUF_MAX +
362         nb_ports*KNI_QUEUE_SIZE +
363         nb_lcores*nb_ports*DISPATCH_RING_SIZE),
364         (unsigned)8192);
365 
366     unsigned socketid = 0;
367     uint16_t i, lcore_id;
368     char s[64];
369 
370     for (i = 0; i < ff_global_cfg.dpdk.nb_procs; i++) {
371         lcore_id = ff_global_cfg.dpdk.proc_lcore[i];
372         if (numa_on) {
373             socketid = rte_lcore_to_socket_id(lcore_id);
374         }
375 
376         if (socketid >= NB_SOCKETS) {
377             rte_exit(EXIT_FAILURE, "Socket %d of lcore %u is out of range %d\n",
378                 socketid, lcore_id, NB_SOCKETS);
379         }
380 
381         if (pktmbuf_pool[socketid] != NULL) {
382             continue;
383         }
384 
385         if (rte_eal_process_type() == RTE_PROC_PRIMARY) {
386             snprintf(s, sizeof(s), "mbuf_pool_%d", socketid);
387             pktmbuf_pool[socketid] =
388                 rte_pktmbuf_pool_create(s, nb_mbuf,
389                     MEMPOOL_CACHE_SIZE, 0,
390                     RTE_MBUF_DEFAULT_BUF_SIZE, socketid);
391         } else {
392             snprintf(s, sizeof(s), "mbuf_pool_%d", socketid);
393             pktmbuf_pool[socketid] = rte_mempool_lookup(s);
394         }
395 
396         if (pktmbuf_pool[socketid] == NULL) {
397             rte_exit(EXIT_FAILURE, "Cannot create mbuf pool on socket %d\n", socketid);
398         } else {
399             printf("create mbuf pool on socket %d\n", socketid);
400         }
401     }
402 
403     return 0;
404 }
405 
406 static struct rte_ring *
407 create_ring(const char *name, unsigned count, int socket_id, unsigned flags)
408 {
409     struct rte_ring *ring;
410 
411     if (name == NULL)
412         return NULL;
413 
414     /* If already created, just attach to it */
415     if (likely((ring = rte_ring_lookup(name)) != NULL))
416         return ring;
417 
418     if (rte_eal_process_type() == RTE_PROC_PRIMARY) {
419         return rte_ring_create(name, count, socket_id, flags);
420     } else {
421         return rte_ring_lookup(name);
422     }
423 }
424 
425 static int
426 init_dispatch_ring(void)
427 {
428     int j;
429     char name_buf[RTE_RING_NAMESIZE];
430     int queueid;
431 
432     unsigned socketid = lcore_conf.socket_id;
433 
434     /* Create rings only for the ports actually in use. */
435     int nb_ports = ff_global_cfg.dpdk.nb_ports;
436     for (j = 0; j < nb_ports; j++) {
437         uint16_t portid = ff_global_cfg.dpdk.portid_list[j];
438         struct ff_port_cfg *pconf = &ff_global_cfg.dpdk.port_cfgs[portid];
439         int nb_queues = pconf->nb_lcores;
440         if (dispatch_ring[portid] == NULL) {
441             snprintf(name_buf, RTE_RING_NAMESIZE, "ring_ptr_p%d", portid);
442 
443             dispatch_ring[portid] = rte_zmalloc(name_buf,
444                 sizeof(struct rte_ring *) * nb_queues,
445                 RTE_CACHE_LINE_SIZE);
446             if (dispatch_ring[portid] == NULL) {
447                 rte_exit(EXIT_FAILURE, "rte_zmalloc(%s (struct rte_ring*)) "
448                     "failed\n", name_buf);
449             }
450         }
451 
452         for(queueid = 0; queueid < nb_queues; ++queueid) {
453             snprintf(name_buf, RTE_RING_NAMESIZE, "dispatch_ring_p%d_q%d",
454                 portid, queueid);
455             dispatch_ring[portid][queueid] = create_ring(name_buf,
456                 DISPATCH_RING_SIZE, socketid, RING_F_SC_DEQ);
457 
458             if (dispatch_ring[portid][queueid] == NULL)
459                 rte_panic("create ring:%s failed!\n", name_buf);
460 
461             printf("create ring:%s success, %u ring entries are now free!\n",
462                 name_buf, rte_ring_free_count(dispatch_ring[portid][queueid]));
463         }
464     }
465 
466     return 0;
467 }
468 
469 static void
470 ff_msg_init(struct rte_mempool *mp,
471     __attribute__((unused)) void *opaque_arg,
472     void *obj, __attribute__((unused)) unsigned i)
473 {
474     struct ff_msg *msg = (struct ff_msg *)obj;
475     msg->msg_type = FF_UNKNOWN;
476     msg->buf_addr = (char *)msg + sizeof(struct ff_msg);
477     msg->buf_len = mp->elt_size - sizeof(struct ff_msg);
478 }
479 
480 static int
481 init_msg_ring(void)
482 {
483     uint16_t i;
484     uint16_t nb_procs = ff_global_cfg.dpdk.nb_procs;
485     unsigned socketid = lcore_conf.socket_id;
486 
487     /* Create message buffer pool */
488     if (rte_eal_process_type() == RTE_PROC_PRIMARY) {
489         message_pool = rte_mempool_create(FF_MSG_POOL,
490            MSG_RING_SIZE * 2 * nb_procs,
491            MAX_MSG_BUF_SIZE, MSG_RING_SIZE / 2, 0,
492            NULL, NULL, ff_msg_init, NULL,
493            socketid, 0);
494     } else {
495         message_pool = rte_mempool_lookup(FF_MSG_POOL);
496     }
497 
498     if (message_pool == NULL) {
499         rte_panic("Create msg mempool failed\n");
500     }
501 
502     for(i = 0; i < nb_procs; ++i) {
503         snprintf(msg_ring[i].ring_name[0], RTE_RING_NAMESIZE,
504             "%s%u", FF_MSG_RING_IN, i);
505         snprintf(msg_ring[i].ring_name[1], RTE_RING_NAMESIZE,
506             "%s%u", FF_MSG_RING_OUT, i);
507 
508         msg_ring[i].ring[0] = create_ring(msg_ring[i].ring_name[0],
509             MSG_RING_SIZE, socketid, RING_F_SP_ENQ | RING_F_SC_DEQ);
510         if (msg_ring[i].ring[0] == NULL)
511             rte_panic("create ring:%s failed!\n", msg_ring[i].ring_name[0]);
512 
513         msg_ring[i].ring[1] = create_ring(msg_ring[i].ring_name[1],
514             MSG_RING_SIZE, socketid, RING_F_SP_ENQ | RING_F_SC_DEQ);
515         if (msg_ring[i].ring[1] == NULL)
516             rte_panic("create ring:%s failed!\n", msg_ring[i].ring_name[1]);
517     }
518 
519     return 0;
520 }
521 
522 static int
523 init_kni(void)
524 {
525     int nb_ports = rte_eth_dev_count();
526     kni_accept = 0;
527     if(strcasecmp(ff_global_cfg.kni.method, "accept") == 0)
528         kni_accept = 1;
529 
530     ff_kni_init(nb_ports, ff_global_cfg.kni.tcp_port,
531         ff_global_cfg.kni.udp_port);
532 
533     unsigned socket_id = lcore_conf.socket_id;
534     struct rte_mempool *mbuf_pool = pktmbuf_pool[socket_id];
535 
536     nb_ports = ff_global_cfg.dpdk.nb_ports;
537     int i, ret;
538     for (i = 0; i < nb_ports; i++) {
539         uint16_t port_id = ff_global_cfg.dpdk.portid_list[i];
540         ff_kni_alloc(port_id, socket_id, mbuf_pool, KNI_QUEUE_SIZE);
541     }
542 
543     return 0;
544 }
545 
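/*
 * Program the NIC's RSS redirection table so that hash buckets are
 * spread round-robin across the nb_queues RX queues.
 */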
546 static void
547 set_rss_table(uint8_t port_id, uint16_t reta_size, uint16_t nb_queues)
548 {
549     if (reta_size == 0) {
550         return;
551     }
552 
553     int reta_conf_size = RTE_MAX(1, reta_size / RTE_RETA_GROUP_SIZE);
554     struct rte_eth_rss_reta_entry64 reta_conf[reta_conf_size];
555 
556     /* Configure the HW indirection table: spread entries round-robin across all queues */
557     unsigned i, j, hash = 0;
558     for (i = 0; i < reta_conf_size; i++) {
559         reta_conf[i].mask = ~0ULL;
560         for (j = 0; j < RTE_RETA_GROUP_SIZE; j++) {
561             reta_conf[i].reta[j] = hash++ % nb_queues;
562         }
563     }
564 
565     if (rte_eth_dev_rss_reta_update(port_id, reta_conf, reta_size)) {
566         rte_exit(EXIT_FAILURE, "port[%d], failed to update rss table\n",
567             port_id);
568     }
569 }
570 
571 static int
572 init_port_start(void)
573 {
574     int nb_ports = ff_global_cfg.dpdk.nb_ports;
575     unsigned socketid = rte_lcore_to_socket_id(rte_lcore_id());
576     struct rte_mempool *mbuf_pool = pktmbuf_pool[socketid];
577     uint16_t i;
578 
579     for (i = 0; i < nb_ports; i++) {
580         uint16_t port_id = ff_global_cfg.dpdk.portid_list[i];
581         struct ff_port_cfg *pconf = &ff_global_cfg.dpdk.port_cfgs[port_id];
582         uint16_t nb_queues = pconf->nb_lcores;
583 
584         struct rte_eth_dev_info dev_info;
585         rte_eth_dev_info_get(port_id, &dev_info);
586 
587         if (nb_queues > dev_info.max_rx_queues) {
588             rte_exit(EXIT_FAILURE, "nb_queues[%d] bigger than max_rx_queues[%d]\n",
589                 nb_queues,
590                 dev_info.max_rx_queues);
591         }
592 
593         if (nb_queues > dev_info.max_tx_queues) {
594             rte_exit(EXIT_FAILURE, "nb_queues[%d] bigger than max_tx_queues[%d]\n",
595                 nb_queues,
596                 dev_info.max_tx_queues);
597         }
598 
599         struct ether_addr addr;
600         rte_eth_macaddr_get(port_id, &addr);
601         printf("Port %u MAC: %02" PRIx8 " %02" PRIx8 " %02" PRIx8
602                    " %02" PRIx8 " %02" PRIx8 " %02" PRIx8 "\n",
603                 (unsigned)port_id,
604                 addr.addr_bytes[0], addr.addr_bytes[1],
605                 addr.addr_bytes[2], addr.addr_bytes[3],
606                 addr.addr_bytes[4], addr.addr_bytes[5]);
607 
608         rte_memcpy(pconf->mac,
609             addr.addr_bytes, ETHER_ADDR_LEN);
610 
611         /* Clear txq_flags - we do not need multi-mempool and refcnt */
612         dev_info.default_txconf.txq_flags = ETH_TXQ_FLAGS_NOMULTMEMP |
613             ETH_TXQ_FLAGS_NOREFCOUNT;
614 
615         /* Disable features that are not supported by port's HW */
616         if (!(dev_info.tx_offload_capa & DEV_TX_OFFLOAD_UDP_CKSUM)) {
617             dev_info.default_txconf.txq_flags |= ETH_TXQ_FLAGS_NOXSUMUDP;
618         }
619 
620         if (!(dev_info.tx_offload_capa & DEV_TX_OFFLOAD_TCP_CKSUM)) {
621             dev_info.default_txconf.txq_flags |= ETH_TXQ_FLAGS_NOXSUMTCP;
622         }
623 
624         if (!(dev_info.tx_offload_capa & DEV_TX_OFFLOAD_SCTP_CKSUM)) {
625             dev_info.default_txconf.txq_flags |= ETH_TXQ_FLAGS_NOXSUMSCTP;
626         }
627 
628         if (!(dev_info.tx_offload_capa & DEV_TX_OFFLOAD_VLAN_INSERT)) {
629             dev_info.default_txconf.txq_flags |= ETH_TXQ_FLAGS_NOVLANOFFL;
630         }
635 
636         if (!(dev_info.tx_offload_capa & DEV_TX_OFFLOAD_TCP_TSO) &&
637             !(dev_info.tx_offload_capa & DEV_TX_OFFLOAD_UDP_TSO)) {
638             dev_info.default_txconf.txq_flags |= ETH_TXQ_FLAGS_NOMULTSEGS;
639         }
640 
641         struct rte_eth_conf port_conf = {0};
642 
643         /* Set RSS mode */
644         port_conf.rxmode.mq_mode = ETH_MQ_RX_RSS;
645         port_conf.rx_adv_conf.rss_conf.rss_hf = ETH_RSS_PROTO_MASK;
646         port_conf.rx_adv_conf.rss_conf.rss_key = default_rsskey_40bytes;
647         port_conf.rx_adv_conf.rss_conf.rss_key_len = 40;
648 
649         /* Set Rx VLAN stripping */
650         if (ff_global_cfg.dpdk.vlan_strip) {
651             if (dev_info.rx_offload_capa & DEV_RX_OFFLOAD_VLAN_STRIP) {
652                 port_conf.rxmode.hw_vlan_strip = 1;
653             }
654         }
655 
656         /* Enable HW CRC stripping */
657         port_conf.rxmode.hw_strip_crc = 1;
658 
659         /* FIXME: Enable TCP LRO? */
660         #if 0
661         if (dev_info.rx_offload_capa & DEV_RX_OFFLOAD_TCP_LRO) {
662             printf("LRO is supported\n");
663             port_conf.rxmode.enable_lro = 1;
664             pconf->hw_features.rx_lro = 1;
665         }
666         #endif
667 
668         /* Set Rx checksum checking */
669         if ((dev_info.rx_offload_capa & DEV_RX_OFFLOAD_IPV4_CKSUM) &&
670             (dev_info.rx_offload_capa & DEV_RX_OFFLOAD_UDP_CKSUM) &&
671             (dev_info.rx_offload_capa & DEV_RX_OFFLOAD_TCP_CKSUM)) {
672             printf("RX checksum offload supported\n");
673             port_conf.rxmode.hw_ip_checksum = 1;
674             pconf->hw_features.rx_csum = 1;
675         }
676 
677         if ((dev_info.tx_offload_capa & DEV_TX_OFFLOAD_IPV4_CKSUM)) {
678             printf("TX ip checksum offload supported\n");
679             pconf->hw_features.tx_csum_ip = 1;
680         }
681 
682         if ((dev_info.tx_offload_capa & DEV_TX_OFFLOAD_UDP_CKSUM) &&
683             (dev_info.tx_offload_capa & DEV_TX_OFFLOAD_TCP_CKSUM)) {
684             printf("TX TCP&UDP checksum offload supported\n");
685             pconf->hw_features.tx_csum_l4 = 1;
686         }
687 
688         if (ff_global_cfg.dpdk.tso) {
689             if (dev_info.tx_offload_capa & DEV_TX_OFFLOAD_TCP_TSO) {
690                 printf("TSO is supported\n");
691                 pconf->hw_features.tx_tso = 1;
692             }
693         } else {
694             printf("TSO is disabled\n");
695         }
696 
697         if (dev_info.reta_size) {
698             /* reta size must be a power of 2 */
699             assert((dev_info.reta_size & (dev_info.reta_size - 1)) == 0);
700 
701             rss_reta_size[port_id] = dev_info.reta_size;
702             printf("port[%d]: rss table size: %d\n", port_id,
703                 dev_info.reta_size);
704         }
705 
706         if (rte_eal_process_type() != RTE_PROC_PRIMARY) {
707             continue;
708         }
709 
710         int ret = rte_eth_dev_configure(port_id, nb_queues, nb_queues, &port_conf);
711         if (ret != 0) {
712             return ret;
713         }
714         uint16_t q;
715         for (q = 0; q < nb_queues; q++) {
716             ret = rte_eth_tx_queue_setup(port_id, q, TX_QUEUE_SIZE,
717                 socketid, &dev_info.default_txconf);
718             if (ret < 0) {
719                 return ret;
720             }
721 
722             ret = rte_eth_rx_queue_setup(port_id, q, RX_QUEUE_SIZE,
723                 socketid, &dev_info.default_rxconf, mbuf_pool);
724             if (ret < 0) {
725                 return ret;
726             }
727         }
728 
729         ret = rte_eth_dev_start(port_id);
730         if (ret < 0) {
731             return ret;
732         }
733 
734         if (nb_queues > 1) {
735             /* set HW rss hash function to Toeplitz. */
736             if (!rte_eth_dev_filter_supported(port_id, RTE_ETH_FILTER_HASH)) {
737                 struct rte_eth_hash_filter_info info = {0};
738                 info.info_type = RTE_ETH_HASH_FILTER_GLOBAL_CONFIG;
739                 info.info.global_conf.hash_func = RTE_ETH_HASH_FUNCTION_TOEPLITZ;
740 
741                 if (rte_eth_dev_filter_ctrl(port_id, RTE_ETH_FILTER_HASH,
742                     RTE_ETH_FILTER_SET, &info) < 0) {
743                     rte_exit(EXIT_FAILURE, "port[%d] set hash func failed\n",
744                         port_id);
745                 }
746             }
747 
748             set_rss_table(port_id, dev_info.reta_size, nb_queues);
749         }
750 
751         /* Enable RX in promiscuous mode for the Ethernet device. */
752         if (ff_global_cfg.dpdk.promiscuous) {
753             rte_eth_promiscuous_enable(port_id);
754             ret = rte_eth_promiscuous_get(port_id);
755             if (ret == 1) {
756                 printf("set port %u to promiscuous mode ok\n", port_id);
757             } else {
758                 printf("set port %u to promiscuous mode error\n", port_id);
759             }
760         }
761 
762         /* Enable pcap dump */
763         if (pconf->pcap) {
764             ff_enable_pcap(pconf->pcap);
765         }
766     }
767 
768     if (rte_eal_process_type() == RTE_PROC_PRIMARY) {
769         check_all_ports_link_status();
770     }
771 
772     return 0;
773 }
774 
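/*
 * Drive FreeBSD's hardclock() from a periodic DPDK timer. The period
 * is MS_PER_S / freebsd.hz milliseconds, converted to TSC cycles;
 * e.g. with freebsd.hz = 100 the timer fires every 10 ms.
 */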
775 static int
776 init_clock(void)
777 {
778     rte_timer_subsystem_init();
779     uint64_t hz = rte_get_timer_hz();
780     uint64_t intrs = MS_PER_S / ff_global_cfg.freebsd.hz;
781     uint64_t tsc = (hz + MS_PER_S - 1) / MS_PER_S * intrs;
782 
783     rte_timer_init(&freebsd_clock);
784     rte_timer_reset(&freebsd_clock, tsc, PERIODICAL,
785         rte_lcore_id(), &ff_hardclock_job, NULL);
786 
787     ff_update_current_ts();
788 
789     return 0;
790 }
791 
792 int
793 ff_dpdk_init(int argc, char **argv)
794 {
795     if (ff_global_cfg.dpdk.nb_procs < 1 ||
796         ff_global_cfg.dpdk.nb_procs > RTE_MAX_LCORE ||
797         ff_global_cfg.dpdk.proc_id >= ff_global_cfg.dpdk.nb_procs ||
798         ff_global_cfg.dpdk.proc_id < 0) {
799         printf("param num_procs[%d] or proc_id[%d] error!\n",
800             ff_global_cfg.dpdk.nb_procs,
801             ff_global_cfg.dpdk.proc_id);
802         exit(1);
803     }
804 
805     int ret = rte_eal_init(argc, argv);
806     if (ret < 0) {
807         rte_exit(EXIT_FAILURE, "Error with EAL initialization\n");
808     }
809 
810     numa_on = ff_global_cfg.dpdk.numa_on;
811 
812     init_lcore_conf();
813 
814     init_mem_pool();
815 
816     init_dispatch_ring();
817 
818     init_msg_ring();
819 
820     enable_kni = ff_global_cfg.kni.enable;
821     if (enable_kni) {
822         init_kni();
823     }
824 
825     ret = init_port_start();
826     if (ret < 0) {
827         rte_exit(EXIT_FAILURE, "init_port_start failed\n");
828     }
829 
830     init_clock();
831 
832     return 0;
833 }
834 
835 static void
836 ff_veth_input(const struct ff_dpdk_if_context *ctx, struct rte_mbuf *pkt)
837 {
838     uint8_t rx_csum = ctx->hw_features.rx_csum;
839     if (rx_csum) {
840         if (pkt->ol_flags & (PKT_RX_IP_CKSUM_BAD | PKT_RX_L4_CKSUM_BAD)) {
841             return;
842         }
843     }
844 
845     /*
846      * FIXME: should we save pkt->vlan_tci
847      * if (pkt->ol_flags & PKT_RX_VLAN_PKT)
848      */
849 
850     void *data = rte_pktmbuf_mtod(pkt, void*);
851     uint16_t len = rte_pktmbuf_data_len(pkt);
852 
853     void *hdr = ff_mbuf_gethdr(pkt, pkt->pkt_len, data, len, rx_csum);
854     if (hdr == NULL) {
855         rte_pktmbuf_free(pkt);
856         return;
857     }
858 
859     struct rte_mbuf *pn = pkt->next;
860     void *prev = hdr;
861     while(pn != NULL) {
862         data = rte_pktmbuf_mtod(pn, void*);
863         len = rte_pktmbuf_data_len(pn);
864 
865         void *mb = ff_mbuf_get(prev, data, len);
866         if (mb == NULL) {
867             ff_mbuf_free(hdr);
868             rte_pktmbuf_free(pkt);
869             return;
870         }
871         pn = pn->next;
872         prev = mb;
873     }
874 
875     ff_veth_process_packet(ctx->ifp, hdr);
876 }
877 
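/*
 * Classify an incoming frame: ARP frames are flagged so they can be
 * broadcast to every queue (and KNI); IPv4 frames may be diverted to
 * KNI by ff_kni_proto_filter(); everything else is FILTER_UNKNOWN.
 */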
878 static enum FilterReturn
879 protocol_filter(const void *data, uint16_t len)
880 {
881     if(len < ETHER_HDR_LEN)
882         return FILTER_UNKNOWN;
883 
884     const struct ether_hdr *hdr;
885     hdr = (const struct ether_hdr *)data;
886 
887     if(ntohs(hdr->ether_type) == ETHER_TYPE_ARP)
888         return FILTER_ARP;
889 
890     if (!enable_kni) {
891         return FILTER_UNKNOWN;
892     }
893 
894     if(ntohs(hdr->ether_type) != ETHER_TYPE_IPv4)
895         return FILTER_UNKNOWN;
896 
897     return ff_kni_proto_filter(data + ETHER_HDR_LEN,
898         len - ETHER_HDR_LEN);
899 }
900 
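/* Copy one segment's payload and metadata into a freshly allocated mbuf. */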
901 static inline void
902 pktmbuf_deep_attach(struct rte_mbuf *mi, const struct rte_mbuf *m)
903 {
904     struct rte_mbuf *md;
905     void *src, *dst;
906 
907     dst = rte_pktmbuf_mtod(mi, void *);
908     src = rte_pktmbuf_mtod(m, void *);
909 
910     mi->data_len = m->data_len;
911     rte_memcpy(dst, src, m->data_len);
912 
913     mi->port = m->port;
914     mi->vlan_tci = m->vlan_tci;
915     mi->vlan_tci_outer = m->vlan_tci_outer;
916     mi->tx_offload = m->tx_offload;
917     mi->hash = m->hash;
918     mi->ol_flags = m->ol_flags;
919     mi->packet_type = m->packet_type;
920 }
921 
922 /* Deep-copy variant of rte_pktmbuf_clone: copies segment data instead of attaching, so the clone is independent of the original. */
923 static inline struct rte_mbuf *
924 pktmbuf_deep_clone(const struct rte_mbuf *md,
925     struct rte_mempool *mp)
926 {
927     struct rte_mbuf *mc, *mi, **prev;
928     uint32_t pktlen;
929     uint8_t nseg;
930 
931     if (unlikely ((mc = rte_pktmbuf_alloc(mp)) == NULL))
932         return NULL;
933 
934     mi = mc;
935     prev = &mi->next;
936     pktlen = md->pkt_len;
937     nseg = 0;
938 
939     do {
940         nseg++;
941         pktmbuf_deep_attach(mi, md);
942         *prev = mi;
943         prev = &mi->next;
944     } while ((md = md->next) != NULL &&
945         (mi = rte_pktmbuf_alloc(mp)) != NULL);
946 
947     *prev = NULL;
948     mc->nb_segs = nseg;
949     mc->pkt_len = pktlen;
950 
951     /* Allocation of a new segment failed */
952     if (unlikely (mi == NULL)) {
953         rte_pktmbuf_free(mc);
954         return NULL;
955     }
956 
957     __rte_mbuf_sanity_check(mc, 1);
958     return mc;
959 }
960 
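/*
 * Demultiplex a burst of packets: dump to pcap if enabled, let the
 * user-registered dispatcher re-steer packets to other queues, clone
 * ARP frames to every queue and to KNI, and pass the rest either to
 * KNI or up into the FreeBSD stack via ff_veth_input().
 */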
961 static inline void
962 process_packets(uint8_t port_id, uint16_t queue_id, struct rte_mbuf **bufs,
963     uint16_t count, const struct ff_dpdk_if_context *ctx, int pkts_from_ring)
964 {
965     struct lcore_conf *qconf = &lcore_conf;
966     uint16_t nb_queues = qconf->nb_queue_list[port_id];
967 
968     uint16_t i;
969     for (i = 0; i < count; i++) {
970         struct rte_mbuf *rtem = bufs[i];
971 
972         if (unlikely(qconf->pcap[port_id] != NULL)) {
973             if (!pkts_from_ring) {
974                 ff_dump_packets(qconf->pcap[port_id], rtem);
975             }
976         }
977 
978         void *data = rte_pktmbuf_mtod(rtem, void*);
979         uint16_t len = rte_pktmbuf_data_len(rtem);
980 
981         if (!pkts_from_ring && packet_dispatcher) {
982             int ret = (*packet_dispatcher)(data, len, queue_id, nb_queues);
983             if (ret < 0 || ret >= nb_queues) {
984                 rte_pktmbuf_free(rtem);
985                 continue;
986             }
987 
988             if (ret != queue_id) {
989                 ret = rte_ring_enqueue(dispatch_ring[port_id][ret], rtem);
990                 if (ret < 0)
991                     rte_pktmbuf_free(rtem);
992 
993                 continue;
994             }
995         }
996 
997         enum FilterReturn filter = protocol_filter(data, len);
998         if (filter == FILTER_ARP) {
999             struct rte_mempool *mbuf_pool;
1000             struct rte_mbuf *mbuf_clone;
1001             if (!pkts_from_ring) {
1002                 uint16_t j;
1003                 for(j = 0; j < nb_queues; ++j) {
1004                     if(j == queue_id)
1005                         continue;
1006 
1007                     unsigned socket_id = 0;
1008                     if (numa_on) {
1009                         uint16_t lcore_id = qconf->port_cfgs[port_id].lcore_list[j];
1010                         socket_id = rte_lcore_to_socket_id(lcore_id);
1011                     }
1012                     mbuf_pool = pktmbuf_pool[socket_id];
1013                     mbuf_clone = pktmbuf_deep_clone(rtem, mbuf_pool);
1014                     if(mbuf_clone) {
1015                         int ret = rte_ring_enqueue(dispatch_ring[port_id][j],
1016                             mbuf_clone);
1017                         if (ret < 0)
1018                             rte_pktmbuf_free(mbuf_clone);
1019                     }
1020                 }
1021             }
1022 
1023             if (enable_kni && rte_eal_process_type() == RTE_PROC_PRIMARY) {
1024                 mbuf_pool = pktmbuf_pool[qconf->socket_id];
1025                 mbuf_clone = pktmbuf_deep_clone(rtem, mbuf_pool);
1026                 if(mbuf_clone) {
1027                     ff_kni_enqueue(port_id, mbuf_clone);
1028                 }
1029             }
1030 
1031             ff_veth_input(ctx, rtem);
1032         } else if (enable_kni &&
1033             ((filter == FILTER_KNI && kni_accept) ||
1034             (filter == FILTER_UNKNOWN && !kni_accept)) ) {
1035             ff_kni_enqueue(port_id, rtem);
1036         } else {
1037             ff_veth_input(ctx, rtem);
1038         }
1039     }
1040 }
1041 
1042 static inline int
1043 process_dispatch_ring(uint8_t port_id, uint16_t queue_id,
1044     struct rte_mbuf **pkts_burst, const struct ff_dpdk_if_context *ctx)
1045 {
1046     /* Read packets from the dispatch ring and process them */
1047     uint16_t nb_rb;
1048     nb_rb = rte_ring_dequeue_burst(dispatch_ring[port_id][queue_id],
1049         (void **)pkts_burst, MAX_PKT_BURST);
1050 
1051     if(nb_rb > 0) {
1052         process_packets(port_id, queue_id, pkts_burst, nb_rb, ctx, 1);
1053     }
1054 
1055     return 0;
1056 }
1057 
1058 static inline void
1059 handle_sysctl_msg(struct ff_msg *msg)
1060 {
1061     int ret = ff_sysctl(msg->sysctl.name, msg->sysctl.namelen,
1062         msg->sysctl.old, msg->sysctl.oldlenp, msg->sysctl.new,
1063         msg->sysctl.newlen);
1064 
1065     if (ret < 0) {
1066         msg->result = errno;
1067     } else {
1068         msg->result = 0;
1069     }
1070 }
1071 
1072 static inline void
1073 handle_ioctl_msg(struct ff_msg *msg)
1074 {
1075     int fd, ret;
1076     fd = ff_socket(AF_INET, SOCK_DGRAM, 0);
1077     if (fd < 0) {
1078         ret = -1;
1079         goto done;
1080     }
1081 
1082     ret = ff_ioctl(fd, msg->ioctl.cmd, msg->ioctl.data);
1083 
1084     ff_close(fd);
1085 
1086 done:
1087     if (ret < 0) {
1088         msg->result = errno;
1089     } else {
1090         msg->result = 0;
1091     }
1092 }
1093 
1094 static inline void
1095 handle_route_msg(struct ff_msg *msg)
1096 {
1097     int ret = ff_rtioctl(msg->route.fib, msg->route.data,
1098         &msg->route.len, msg->route.maxlen);
1099     if (ret < 0) {
1100         msg->result = errno;
1101     } else {
1102         msg->result = 0;
1103     }
1104 }
1105 
1106 static struct ff_top_args ff_status;
1107 static inline void
1108 handle_top_msg(struct ff_msg *msg)
1109 {
1110     msg->top = ff_status;
1111     msg->result = 0;
1112 }
1113 
1114 #ifdef FF_NETGRAPH
1115 static inline void
1116 handle_ngctl_msg(struct ff_msg *msg)
1117 {
1118     int ret = ff_ngctl(msg->ngctl.cmd, msg->ngctl.data);
1119     if (ret < 0) {
1120         msg->result = errno;
1121     } else {
1122         msg->result = 0;
1123         msg->ngctl.ret = ret;
1124     }
1125 }
1126 #endif
1127 
1128 #ifdef FF_IPFW
1129 static inline void
1130 handle_ipfw_msg(struct ff_msg *msg)
1131 {
1132     int fd, ret;
1133     fd = ff_socket(AF_INET, SOCK_RAW, IPPROTO_RAW);
1134     if (fd < 0) {
1135         ret = -1;
1136         goto done;
1137     }
1138 
1139     switch (msg->ipfw.cmd) {
1140         case FF_IPFW_GET:
1141             ret = ff_getsockopt(fd, msg->ipfw.level,
1142                 msg->ipfw.optname, msg->ipfw.optval,
1143                 msg->ipfw.optlen);
1144             break;
1145         case FF_IPFW_SET:
1146             ret = ff_setsockopt(fd, msg->ipfw.level,
1147                 msg->ipfw.optname, msg->ipfw.optval,
1148                 *(msg->ipfw.optlen));
1149             break;
1150         default:
1151             ret = -1;
1152             errno = ENOTSUP;
1153             break;
1154     }
1155 
1156     ff_close(fd);
1157 
1158 done:
1159     if (ret < 0) {
1160         msg->result = errno;
1161     } else {
1162         msg->result = 0;
1163     }
1164 }
1165 #endif
1166 
1167 static inline void
1168 handle_default_msg(struct ff_msg *msg)
1169 {
1170     msg->result = ENOTSUP;
1171 }
1172 
1173 static inline void
1174 handle_msg(struct ff_msg *msg, uint16_t proc_id)
1175 {
1176     switch (msg->msg_type) {
1177         case FF_SYSCTL:
1178             handle_sysctl_msg(msg);
1179             break;
1180         case FF_IOCTL:
1181             handle_ioctl_msg(msg);
1182             break;
1183         case FF_ROUTE:
1184             handle_route_msg(msg);
1185             break;
1186         case FF_TOP:
1187             handle_top_msg(msg);
1188             break;
1189 #ifdef FF_NETGRAPH
1190         case FF_NGCTL:
1191             handle_ngctl_msg(msg);
1192             break;
1193 #endif
1194 #ifdef FF_IPFW
1195         case FF_IPFW_CTL:
1196             handle_ipfw_msg(msg);
1197             break;
1198 #endif
1199         default:
1200             handle_default_msg(msg);
1201             break;
1202     }
1203     rte_ring_enqueue(msg_ring[proc_id].ring[1], msg);
1204 }
1205 
1206 static inline int
1207 process_msg_ring(uint16_t proc_id)
1208 {
1209     void *msg;
1210     int ret = rte_ring_dequeue(msg_ring[proc_id].ring[0], &msg);
1211 
1212     if (unlikely(ret == 0)) {
1213         handle_msg((struct ff_msg *)msg, proc_id);
1214     }
1215 
1216     return 0;
1217 }
1218 
1219 /* Send burst of packets on an output interface */
1220 static inline int
1221 send_burst(struct lcore_conf *qconf, uint16_t n, uint8_t port)
1222 {
1223     struct rte_mbuf **m_table;
1224     int ret;
1225     uint16_t queueid;
1226 
1227     queueid = qconf->tx_queue_id[port];
1228     m_table = (struct rte_mbuf **)qconf->tx_mbufs[port].m_table;
1229 
1230     if (unlikely(qconf->pcap[port] != NULL)) {
1231         uint16_t i;
1232         for (i = 0; i < n; i++) {
1233             ff_dump_packets(qconf->pcap[port], m_table[i]);
1234         }
1235     }
1236 
1237     ret = rte_eth_tx_burst(port, queueid, m_table, n);
1238     if (unlikely(ret < n)) {
1239         do {
1240             rte_pktmbuf_free(m_table[ret]);
1241         } while (++ret < n);
1242     }
1243 
1244     return 0;
1245 }
1246 
1247 /* Enqueue a single packet, and send burst if queue is filled */
1248 static inline int
1249 send_single_packet(struct rte_mbuf *m, uint8_t port)
1250 {
1251     uint16_t len;
1252     struct lcore_conf *qconf;
1253 
1254     qconf = &lcore_conf;
1255     len = qconf->tx_mbufs[port].len;
1256     qconf->tx_mbufs[port].m_table[len] = m;
1257     len++;
1258 
1259     /* enough pkts to be sent */
1260     if (unlikely(len == MAX_PKT_BURST)) {
1261         send_burst(qconf, MAX_PKT_BURST, port);
1262         len = 0;
1263     }
1264 
1265     qconf->tx_mbufs[port].len = len;
1266     return 0;
1267 }
1268 
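/*
 * Transmit a FreeBSD mbuf: copy its data into a chain of DPDK mbufs
 * (RTE_MBUF_DEFAULT_DATAROOM bytes per segment), translate the stack's
 * checksum/TSO requests into ol_flags, then buffer it for bursting.
 */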
1269 int
1270 ff_dpdk_if_send(struct ff_dpdk_if_context *ctx, void *m,
1271     int total)
1272 {
1273     struct rte_mempool *mbuf_pool = pktmbuf_pool[lcore_conf.socket_id];
1274     struct rte_mbuf *head = rte_pktmbuf_alloc(mbuf_pool);
1275     if (head == NULL) {
1276         ff_mbuf_free(m);
1277         return -1;
1278     }
1279 
1280     head->pkt_len = total;
1281     head->nb_segs = 0;
1282 
1283     int off = 0;
1284     struct rte_mbuf *cur = head, *prev = NULL;
1285     while(total > 0) {
1286         if (cur == NULL) {
1287             cur = rte_pktmbuf_alloc(mbuf_pool);
1288             if (cur == NULL) {
1289                 rte_pktmbuf_free(head);
1290                 ff_mbuf_free(m);
1291                 return -1;
1292             }
1293         }
1294 
1295         void *data = rte_pktmbuf_mtod(cur, void*);
1296         int len = total > RTE_MBUF_DEFAULT_DATAROOM ? RTE_MBUF_DEFAULT_DATAROOM : total;
1297         int ret = ff_mbuf_copydata(m, data, off, len);
1298         if (ret < 0) {
1299             rte_pktmbuf_free(head);
1300             ff_mbuf_free(m);
1301             return -1;
1302         }
1303 
1304         if (prev != NULL) {
1305             prev->next = cur;
1306         }
1307         prev = cur;
1308 
1309         cur->data_len = len;
1310         off += len;
1311         total -= len;
1312         head->nb_segs++;
1313         cur = NULL;
1314     }
1315 
1316     struct ff_tx_offload offload = {0};
1317     ff_mbuf_tx_offload(m, &offload);
1318 
1319     void *data = rte_pktmbuf_mtod(head, void*);
1320 
1321     if (offload.ip_csum) {
1322         /* ipv6 not supported yet */
1323         struct ipv4_hdr *iph;
1324         int iph_len;
1325         iph = (struct ipv4_hdr *)(data + ETHER_HDR_LEN);
1326         iph_len = (iph->version_ihl & 0x0f) << 2;
1327 
1328         head->ol_flags |= PKT_TX_IP_CKSUM | PKT_TX_IPV4;
1329         head->l2_len = ETHER_HDR_LEN;
1330         head->l3_len = iph_len;
1331     }
1332 
1333     if (ctx->hw_features.tx_csum_l4) {
1334         struct ipv4_hdr *iph;
1335         int iph_len;
1336         iph = (struct ipv4_hdr *)(data + ETHER_HDR_LEN);
1337         iph_len = (iph->version_ihl & 0x0f) << 2;
1338 
1339         if (offload.tcp_csum) {
1340             head->ol_flags |= PKT_TX_TCP_CKSUM;
1341             head->l2_len = ETHER_HDR_LEN;
1342             head->l3_len = iph_len;
1343         }
1344 
1345         /*
1346          *  TCP segmentation offload.
1347          *
1348          *  - set the PKT_TX_TCP_SEG flag in mbuf->ol_flags (this flag
1349          *    implies PKT_TX_TCP_CKSUM)
1350          *  - set the flag PKT_TX_IPV4 or PKT_TX_IPV6
1351          *  - if it's IPv4, set the PKT_TX_IP_CKSUM flag and
1352          *    write the IP checksum to 0 in the packet
1353          *  - fill the mbuf offload information: l2_len,
1354          *    l3_len, l4_len, tso_segsz
1355          *  - calculate the pseudo header checksum without taking ip_len
1356          *    in account, and set it in the TCP header. Refer to
1357          *    rte_ipv4_phdr_cksum() and rte_ipv6_phdr_cksum() that can be
1358          *    used as helpers.
1359          */
1360         if (offload.tso_seg_size) {
1361             struct tcp_hdr *tcph;
1362             int tcph_len;
1363             tcph = (struct tcp_hdr *)((char *)iph + iph_len);
1364             tcph_len = (tcph->data_off & 0xf0) >> 2;
1365             tcph->cksum = rte_ipv4_phdr_cksum(iph, PKT_TX_TCP_SEG);
1366 
1367             head->ol_flags |= PKT_TX_TCP_SEG;
1368             head->l4_len = tcph_len;
1369             head->tso_segsz = offload.tso_seg_size;
1370         }
1371 
1372         if (offload.udp_csum) {
1373             head->ol_flags |= PKT_TX_UDP_CKSUM;
1374             head->l2_len = ETHER_HDR_LEN;
1375             head->l3_len = iph_len;
1376         }
1377     }
1378 
1379     ff_mbuf_free(m);
1380 
1381     return send_single_packet(head, ctx->port_id);
1382 }
1383 
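/*
 * Per-lcore event loop: run expired timers, drain buffered TX packets
 * every BURST_TX_DRAIN_US, poll KNI, the dispatch rings and the NIC RX
 * queues, serve control messages, then invoke the user loop callback
 * and account busy/idle cycles into ff_status.
 */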
1384 static int
1385 main_loop(void *arg)
1386 {
1387     struct loop_routine *lr = (struct loop_routine *)arg;
1388 
1389     struct rte_mbuf *pkts_burst[MAX_PKT_BURST];
1390     uint64_t prev_tsc, diff_tsc, cur_tsc, usch_tsc, div_tsc, usr_tsc, sys_tsc, end_tsc;
1391     int i, j, nb_rx, idle;
1392     uint8_t port_id, queue_id;
1393     struct lcore_conf *qconf;
1394     const uint64_t drain_tsc = (rte_get_tsc_hz() + US_PER_S - 1) /
1395         US_PER_S * BURST_TX_DRAIN_US;
1396     struct ff_dpdk_if_context *ctx;
1397 
1398     prev_tsc = 0;
1399     usch_tsc = 0;
1400 
1401     qconf = &lcore_conf;
1402 
1403     while (1) {
1404         cur_tsc = rte_rdtsc();
1405         if (unlikely(freebsd_clock.expire < cur_tsc)) {
1406             rte_timer_manage();
1407         }
1408 
1409         idle = 1;
1410         sys_tsc = 0;
1411         usr_tsc = 0;
1412 
1413         /*
1414          * TX burst queue drain
1415          */
1416         diff_tsc = cur_tsc - prev_tsc;
1417         if (unlikely(diff_tsc > drain_tsc)) {
1418             for (i = 0; i < qconf->nb_tx_port; i++) {
1419                 port_id = qconf->tx_port_id[i];
1420                 if (qconf->tx_mbufs[port_id].len == 0)
1421                     continue;
1422 
1423                 idle = 0;
1424 
1425                 send_burst(qconf,
1426                     qconf->tx_mbufs[port_id].len,
1427                     port_id);
1428                 qconf->tx_mbufs[port_id].len = 0;
1429             }
1430 
1431             prev_tsc = cur_tsc;
1432         }
1433 
1434         /*
1435          * Read packet from RX queues
1436          */
1437         for (i = 0; i < qconf->nb_rx_queue; ++i) {
1438             port_id = qconf->rx_queue_list[i].port_id;
1439             queue_id = qconf->rx_queue_list[i].queue_id;
1440             ctx = veth_ctx[port_id];
1441 
1442             if (enable_kni && rte_eal_process_type() == RTE_PROC_PRIMARY) {
1443                 ff_kni_process(port_id, queue_id, pkts_burst, MAX_PKT_BURST);
1444             }
1445 
1446             process_dispatch_ring(port_id, queue_id, pkts_burst, ctx);
1447 
1448             nb_rx = rte_eth_rx_burst(port_id, queue_id, pkts_burst,
1449                 MAX_PKT_BURST);
1450             if (nb_rx == 0)
1451                 continue;
1452 
1453             idle = 0;
1454 
1455             /* Prefetch first packets */
1456             for (j = 0; j < PREFETCH_OFFSET && j < nb_rx; j++) {
1457                 rte_prefetch0(rte_pktmbuf_mtod(
1458                         pkts_burst[j], void *));
1459             }
1460 
1461             /* Prefetch and handle already prefetched packets */
1462             for (j = 0; j < (nb_rx - PREFETCH_OFFSET); j++) {
1463                 rte_prefetch0(rte_pktmbuf_mtod(pkts_burst[
1464                         j + PREFETCH_OFFSET], void *));
1465                 process_packets(port_id, queue_id, &pkts_burst[j], 1, ctx, 0);
1466             }
1467 
1468             /* Handle remaining prefetched packets */
1469             for (; j < nb_rx; j++) {
1470                 process_packets(port_id, queue_id, &pkts_burst[j], 1, ctx, 0);
1471             }
1472         }
1473 
1474         process_msg_ring(qconf->proc_id);
1475 
1476         div_tsc = rte_rdtsc();
1477 
1478         if (likely(lr->loop != NULL && (!idle || cur_tsc - usch_tsc > drain_tsc))) {
1479             usch_tsc = cur_tsc;
1480             lr->loop(lr->arg);
1481         }
1482 
1483         end_tsc = rte_rdtsc();
1484 
1485         if (usch_tsc == cur_tsc) {
1486             usr_tsc = end_tsc - div_tsc;
1487         }
1488 
1489         if (!idle) {
1490             sys_tsc = div_tsc - cur_tsc;
1491             ff_status.sys_tsc += sys_tsc;
1492         }
1493 
1494         ff_status.usr_tsc += usr_tsc;
1495         ff_status.work_tsc += end_tsc - cur_tsc;
1496         ff_status.idle_tsc += end_tsc - cur_tsc - usr_tsc - sys_tsc;
1497 
1498         ff_status.loops++;
1499     }
1500 
1501     return 0;
1502 }
1503 
1504 int
1505 ff_dpdk_if_up(void) {
1506     int i;
1507     struct lcore_conf *qconf = &lcore_conf;
1508     for (i = 0; i < qconf->nb_tx_port; i++) {
1509         uint16_t port_id = qconf->tx_port_id[i];
1510 
1511         struct ff_port_cfg *pconf = &qconf->port_cfgs[port_id];
1512         veth_ctx[port_id] = ff_veth_attach(pconf);
1513         if (veth_ctx[port_id] == NULL) {
1514             rte_exit(EXIT_FAILURE, "ff_veth_attach failed");
1515         }
1516     }
1517 
1518     return 0;
1519 }
1520 
1521 void
1522 ff_dpdk_run(loop_func_t loop, void *arg) {
1523     struct loop_routine *lr = rte_malloc(NULL,
1524         sizeof(struct loop_routine), 0);
1525     lr->loop = loop;
1526     lr->arg = arg;
1527     rte_eal_mp_remote_launch(main_loop, lr, CALL_MASTER);
1528     rte_eal_mp_wait_lcore();
1529     rte_free(lr);
1530 }
1531 
1532 void
1533 ff_dpdk_pktmbuf_free(void *m)
1534 {
1535     rte_pktmbuf_free((struct rte_mbuf *)m);
1536 }
1537 
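/*
 * Software Toeplitz hash, matching what the NIC computes for RSS:
 * for every set bit of the input, XOR in a 32-bit window of the key
 * that slides one bit per input bit.
 */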
1538 static uint32_t
1539 toeplitz_hash(unsigned keylen, const uint8_t *key,
1540     unsigned datalen, const uint8_t *data)
1541 {
1542     uint32_t hash = 0, v;
1543     u_int i, b;
1544 
1545     /* XXXRW: Perhaps an assertion about key length vs. data length? */
1546 
1547     v = (key[0]<<24) + (key[1]<<16) + (key[2] <<8) + key[3];
1548     for (i = 0; i < datalen; i++) {
1549         for (b = 0; b < 8; b++) {
1550             if (data[i] & (1<<(7-b)))
1551                 hash ^= v;
1552             v <<= 1;
1553             if ((i + 4) < keylen &&
1554                 (key[i+4] & (1<<(7-b))))
1555                 v |= 1;
1556         }
1557     }
1558     return (hash);
1559 }
1560 
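/*
 * Return nonzero if the given 4-tuple would be steered by RSS to this
 * lcore's own queue: hash it with the same Toeplitz key as the NIC and
 * reduce it through the redirection table size, as set_rss_table() does.
 */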
1561 int
1562 ff_rss_check(void *softc, uint32_t saddr, uint32_t daddr,
1563     uint16_t sport, uint16_t dport)
1564 {
1565     struct lcore_conf *qconf = &lcore_conf;
1566     struct ff_dpdk_if_context *ctx = ff_veth_softc_to_hostc(softc);
1567     uint16_t nb_queues = qconf->nb_queue_list[ctx->port_id];
1568 
1569     if (nb_queues <= 1) {
1570         return 1;
1571     }
1572 
1573     uint16_t reta_size = rss_reta_size[ctx->port_id];
1574     uint16_t queueid = qconf->tx_queue_id[ctx->port_id];
1575 
1576     uint8_t data[sizeof(saddr) + sizeof(daddr) + sizeof(sport) +
1577         sizeof(dport)];
1578 
1579     unsigned datalen = 0;
1580 
1581     bcopy(&saddr, &data[datalen], sizeof(saddr));
1582     datalen += sizeof(saddr);
1583 
1584     bcopy(&daddr, &data[datalen], sizeof(daddr));
1585     datalen += sizeof(daddr);
1586 
1587     bcopy(&sport, &data[datalen], sizeof(sport));
1588     datalen += sizeof(sport);
1589 
1590     bcopy(&dport, &data[datalen], sizeof(dport));
1591     datalen += sizeof(dport);
1592 
1593     uint32_t hash = toeplitz_hash(sizeof(default_rsskey_40bytes),
1594         default_rsskey_40bytes, datalen, data);
1595 
1596     return ((hash & (reta_size - 1)) % nb_queues) == queueid;
1597 }
1598 
1599 void
1600 ff_regist_packet_dispatcher(dispatch_func_t func)
1601 {
1602     packet_dispatcher = func;
1603 }
1604