xref: /f-stack/lib/ff_dpdk_if.c (revision 0e1bd6da)
/*
 * Copyright (C) 2017 THL A29 Limited, a Tencent company.
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions are met:
 *
 * 1. Redistributions of source code must retain the above copyright notice, this
 *   list of conditions and the following disclaimer.
 * 2. Redistributions in binary form must reproduce the above copyright notice,
 *   this list of conditions and the following disclaimer in the documentation
 *   and/or other materials provided with the distribution.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
 * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
 * DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR
 * ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
 * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
 * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
 * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
 * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 *
 */
#include <assert.h>
/* libc headers for printf/snprintf, calloc/free/exit, memset,
 * strcasecmp/bcopy, ntohs, errno and PRIx8 used below */
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <strings.h>
#include <sys/types.h>
#include <errno.h>
#include <inttypes.h>
#include <arpa/inet.h>

#include <rte_common.h>
#include <rte_byteorder.h>
#include <rte_log.h>
#include <rte_memory.h>
#include <rte_memcpy.h>
#include <rte_memzone.h>
#include <rte_config.h>
#include <rte_eal.h>
#include <rte_pci.h>
#include <rte_mbuf.h>
#include <rte_mempool.h>
#include <rte_ring.h>
#include <rte_lcore.h>
#include <rte_launch.h>
#include <rte_ethdev.h>
#include <rte_debug.h>
#include <rte_ether.h>
#include <rte_malloc.h>
#include <rte_cycles.h>
#include <rte_timer.h>
#include <rte_thash.h>
#include <rte_ip.h>
#include <rte_tcp.h>
#include <rte_udp.h>

#include "ff_dpdk_if.h"
#include "ff_dpdk_pcap.h"
#include "ff_dpdk_kni.h"
#include "ff_config.h"
#include "ff_veth.h"
#include "ff_host_interface.h"
#include "ff_msg.h"
#include "ff_api.h"

#define MEMPOOL_CACHE_SIZE 256

#define DISPATCH_RING_SIZE 2048

#define MSG_RING_SIZE 32

/*
 * Configurable number of RX/TX ring descriptors
 */
#define RX_QUEUE_SIZE 512
#define TX_QUEUE_SIZE 512

#define MAX_PKT_BURST 32
#define BURST_TX_DRAIN_US 100 /* TX drain every ~100us */

/*
 * Try to avoid TX buffering if we have at least MAX_TX_BURST packets to send.
 */
#define MAX_TX_BURST    (MAX_PKT_BURST / 2)

#define NB_SOCKETS 8

/* Configure how many packets ahead to prefetch when reading packets */
#define PREFETCH_OFFSET    3

#define MAX_RX_QUEUE_PER_LCORE 16
#define MAX_TX_QUEUE_PER_PORT RTE_MAX_ETHPORTS
#define MAX_RX_QUEUE_PER_PORT 128

#define KNI_MBUF_MAX 2048
#define KNI_QUEUE_SIZE 2048

static int enable_kni;
static int kni_accept;

static int numa_on;

static struct rte_timer freebsd_clock;

/* Default RSS key, as used by the Mellanox Linux driver */
static uint8_t default_rsskey_40bytes[40] = {
    0xd1, 0x81, 0xc6, 0x2c, 0xf7, 0xf4, 0xdb, 0x5b,
    0x19, 0x83, 0xa2, 0xfc, 0x94, 0x3e, 0x1a, 0xdb,
    0xd9, 0x38, 0x9e, 0x6b, 0xd1, 0x03, 0x9c, 0x2c,
    0xa7, 0x44, 0x99, 0xad, 0x59, 0x3d, 0x56, 0xd9,
    0xf3, 0x25, 0x3c, 0x06, 0x2a, 0xdc, 0x1f, 0xfc
};
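
/*
 * This key is programmed into the NIC in init_port_start() and is also
 * fed to the software toeplitz_hash() in ff_rss_check() below; the two
 * must stay in sync or the software RSS prediction will not match the
 * queue the hardware actually selects.
 */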

static struct rte_eth_conf default_port_conf = {
    .rxmode = {
        .mq_mode = ETH_MQ_RX_RSS,
        .max_rx_pkt_len = ETHER_MAX_LEN,
        .split_hdr_size = 0, /**< Header split buffer size */
        .header_split   = 0, /**< Header Split disabled */
        .hw_ip_checksum = 0, /**< IP checksum offload disabled */
        .hw_vlan_filter = 0, /**< VLAN filtering disabled */
        .hw_vlan_strip  = 0, /**< VLAN strip disabled. */
        .hw_vlan_extend = 0, /**< Extended VLAN disabled. */
        .jumbo_frame    = 0, /**< Jumbo Frame Support disabled */
        .hw_strip_crc   = 0, /**< CRC stripping by hardware disabled */
        .enable_lro     = 0, /**< LRO disabled */
    },
    .rx_adv_conf = {
        .rss_conf = {
            .rss_key = default_rsskey_40bytes,
            .rss_key_len = 40,
            .rss_hf = ETH_RSS_PROTO_MASK,
        },
    },
    .txmode = {
        .mq_mode = ETH_MQ_TX_NONE,
    },
};
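
/*
 * Reference defaults: init_port_start() below builds its own rte_eth_conf
 * with RSS, CRC stripping and (optionally) VLAN stripping enabled rather
 * than using this struct directly.
 */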

struct mbuf_table {
    uint16_t len;
    struct rte_mbuf *m_table[MAX_PKT_BURST];
};

struct lcore_rx_queue {
    uint8_t port_id;
    uint8_t queue_id;
} __rte_cache_aligned;

struct lcore_conf {
    uint16_t proc_id;
    uint16_t socket_id;
    uint16_t nb_queue_list[RTE_MAX_ETHPORTS];
    struct ff_port_cfg *port_cfgs;

    uint16_t nb_rx_queue;
    struct lcore_rx_queue rx_queue_list[MAX_RX_QUEUE_PER_LCORE];
    uint16_t nb_tx_port;
    uint16_t tx_port_id[RTE_MAX_ETHPORTS];
    uint16_t tx_queue_id[RTE_MAX_ETHPORTS];
    struct mbuf_table tx_mbufs[RTE_MAX_ETHPORTS];
    char *pcap[RTE_MAX_ETHPORTS];
} __rte_cache_aligned;

static struct lcore_conf lcore_conf;

static struct rte_mempool *pktmbuf_pool[NB_SOCKETS];

static struct rte_ring **dispatch_ring[RTE_MAX_ETHPORTS];
static dispatch_func_t packet_dispatcher;

static uint16_t rss_reta_size[RTE_MAX_ETHPORTS];

struct ff_msg_ring {
    char ring_name[2][RTE_RING_NAMESIZE];
    /* ring[0]: the f-stack lcore receives messages that other processes send */
    /* ring[1]: the f-stack lcore sends replies that other processes read */
    struct rte_ring *ring[2];
} __rte_cache_aligned;

static struct ff_msg_ring msg_ring[RTE_MAX_LCORE];
static struct rte_mempool *message_pool;

struct ff_dpdk_if_context {
    void *sc;
    void *ifp;
    uint16_t port_id;
    struct ff_hw_features hw_features;
} __rte_cache_aligned;

static struct ff_dpdk_if_context *veth_ctx[RTE_MAX_ETHPORTS];

extern void ff_hardclock(void);

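/*
 * Periodic rte_timer callback driving the FreeBSD stack's hardclock()
 * tick; init_clock() below arms it to fire every 1000/freebsd.hz ms on
 * this lcore, and main_loop() invokes rte_timer_manage() to run it.
 */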
static void
ff_hardclock_job(__rte_unused struct rte_timer *timer,
    __rte_unused void *arg) {
    ff_hardclock();
    ff_update_current_ts();
}

struct ff_dpdk_if_context *
ff_dpdk_register_if(void *sc, void *ifp, struct ff_port_cfg *cfg)
{
    struct ff_dpdk_if_context *ctx;

    ctx = calloc(1, sizeof(struct ff_dpdk_if_context));
    if (ctx == NULL)
        return NULL;

    ctx->sc = sc;
    ctx->ifp = ifp;
    ctx->port_id = cfg->port_id;
    ctx->hw_features = cfg->hw_features;

    return ctx;
}

void
ff_dpdk_deregister_if(struct ff_dpdk_if_context *ctx)
{
    free(ctx);
}

static void
check_all_ports_link_status(void)
{
    #define CHECK_INTERVAL 100 /* 100ms */
    #define MAX_CHECK_TIME 90  /* 9s (90 * 100ms) in total */

    uint8_t count, all_ports_up, print_flag = 0;
    struct rte_eth_link link;

    printf("\nChecking link status");
    fflush(stdout);

    int i, nb_ports;
    nb_ports = ff_global_cfg.dpdk.nb_ports;
    for (count = 0; count <= MAX_CHECK_TIME; count++) {
        all_ports_up = 1;
        for (i = 0; i < nb_ports; i++) {
            uint8_t portid = ff_global_cfg.dpdk.portid_list[i];
            memset(&link, 0, sizeof(link));
            rte_eth_link_get_nowait(portid, &link);

            /* print link status if flag set */
            if (print_flag == 1) {
                if (link.link_status) {
                    printf("Port %d Link Up - speed %u "
                        "Mbps - %s\n", (int)portid,
                        (unsigned)link.link_speed,
                        (link.link_duplex == ETH_LINK_FULL_DUPLEX) ?
                        ("full-duplex") : ("half-duplex"));
                } else {
                    printf("Port %d Link Down\n", (int)portid);
                }
                continue;
            }
            /* clear all_ports_up flag if any link down */
            if (link.link_status == 0) {
                all_ports_up = 0;
                break;
            }
        }

        /* after finally printing all link status, get out */
        if (print_flag == 1)
            break;

        if (all_ports_up == 0) {
            printf(".");
            fflush(stdout);
            rte_delay_ms(CHECK_INTERVAL);
        }

        /* set the print_flag if all ports up or timeout */
        if (all_ports_up == 1 || count == (MAX_CHECK_TIME - 1)) {
            print_flag = 1;
            printf("done\n");
        }
    }
}

static int
init_lcore_conf(void)
{
    uint8_t nb_dev_ports = rte_eth_dev_count();
    if (nb_dev_ports == 0) {
        rte_exit(EXIT_FAILURE, "No probed ethernet devices\n");
    }

    if (ff_global_cfg.dpdk.max_portid >= nb_dev_ports) {
        rte_exit(EXIT_FAILURE, "this machine doesn't have port %d.\n",
                 ff_global_cfg.dpdk.max_portid);
    }

    lcore_conf.port_cfgs = ff_global_cfg.dpdk.port_cfgs;
    lcore_conf.proc_id = ff_global_cfg.dpdk.proc_id;

    uint16_t proc_id;
    for (proc_id = 0; proc_id < ff_global_cfg.dpdk.nb_procs; proc_id++) {
        uint16_t lcore_id = ff_global_cfg.dpdk.proc_lcore[proc_id];
        if (!lcore_config[lcore_id].detected) {
            rte_exit(EXIT_FAILURE, "lcore %u unavailable\n", lcore_id);
        }
    }

    uint16_t socket_id = 0;
    if (numa_on) {
        socket_id = rte_lcore_to_socket_id(rte_lcore_id());
    }

    lcore_conf.socket_id = socket_id;

    uint16_t lcore_id = ff_global_cfg.dpdk.proc_lcore[lcore_conf.proc_id];
    int j;
    for (j = 0; j < ff_global_cfg.dpdk.nb_ports; ++j) {
        uint16_t port_id = ff_global_cfg.dpdk.portid_list[j];
        struct ff_port_cfg *pconf = &ff_global_cfg.dpdk.port_cfgs[port_id];

        /* This process serves the queue whose index matches its position
         * in the port's lcore_list; skip ports this lcore doesn't serve. */
        int queueid = -1;
        int i;
        for (i = 0; i < pconf->nb_lcores; i++) {
            if (pconf->lcore_list[i] == lcore_id) {
                queueid = i;
            }
        }
        if (queueid < 0) {
            continue;
        }
        printf("lcore: %u, port: %u, queue: %d\n", lcore_id, port_id, queueid);
        uint16_t nb_rx_queue = lcore_conf.nb_rx_queue;
        lcore_conf.rx_queue_list[nb_rx_queue].port_id = port_id;
        lcore_conf.rx_queue_list[nb_rx_queue].queue_id = queueid;
        lcore_conf.nb_rx_queue++;

        lcore_conf.tx_queue_id[port_id] = queueid;
        lcore_conf.tx_port_id[lcore_conf.nb_tx_port] = port_id;
        lcore_conf.nb_tx_port++;

        lcore_conf.pcap[port_id] = pconf->pcap;
        lcore_conf.nb_queue_list[port_id] = pconf->nb_lcores;
    }

    if (lcore_conf.nb_rx_queue == 0) {
        rte_exit(EXIT_FAILURE, "lcore %u has nothing to do\n", lcore_id);
    }

    return 0;
}
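
/*
 * Illustration of the mapping built above, for a hypothetical setup with
 * two processes (lcores 1 and 2) both attached to port 0:
 *   pconf->lcore_list = {1, 2}
 *   proc 0 (lcore 1) -> port 0, queue 0
 *   proc 1 (lcore 2) -> port 0, queue 1
 * i.e. each process always services the queue index that matches its
 * position in the port's lcore list.
 */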

static int
init_mem_pool(void)
{
    uint8_t nb_ports = ff_global_cfg.dpdk.nb_ports;
    uint32_t nb_lcores = ff_global_cfg.dpdk.nb_procs;
    uint32_t nb_tx_queue = nb_lcores;
    uint32_t nb_rx_queue = lcore_conf.nb_rx_queue * nb_lcores;

    unsigned nb_mbuf = RTE_MAX(
        (nb_rx_queue*RX_QUEUE_SIZE          +
        nb_ports*nb_lcores*MAX_PKT_BURST    +
        nb_ports*nb_tx_queue*TX_QUEUE_SIZE  +
        nb_lcores*MEMPOOL_CACHE_SIZE +
        nb_ports*KNI_MBUF_MAX +
        nb_ports*KNI_QUEUE_SIZE +
        nb_lcores*nb_ports*DISPATCH_RING_SIZE),
        (unsigned)8192);

    unsigned socketid = 0;
    uint16_t i, lcore_id;
    char s[64];

    for (i = 0; i < ff_global_cfg.dpdk.nb_procs; i++) {
        lcore_id = ff_global_cfg.dpdk.proc_lcore[i];
        if (numa_on) {
            socketid = rte_lcore_to_socket_id(lcore_id);
        }

        if (socketid >= NB_SOCKETS) {
            rte_exit(EXIT_FAILURE, "Socket %u of lcore %u is out of range %d\n",
                socketid, lcore_id, NB_SOCKETS);
        }

        if (pktmbuf_pool[socketid] != NULL) {
            continue;
        }

        if (rte_eal_process_type() == RTE_PROC_PRIMARY) {
            snprintf(s, sizeof(s), "mbuf_pool_%u", socketid);
            pktmbuf_pool[socketid] =
                rte_pktmbuf_pool_create(s, nb_mbuf,
                    MEMPOOL_CACHE_SIZE, 0,
                    RTE_MBUF_DEFAULT_BUF_SIZE, socketid);
        } else {
            snprintf(s, sizeof(s), "mbuf_pool_%u", socketid);
            pktmbuf_pool[socketid] = rte_mempool_lookup(s);
        }

        if (pktmbuf_pool[socketid] == NULL) {
            rte_exit(EXIT_FAILURE, "Cannot create mbuf pool on socket %u\n", socketid);
        } else {
            printf("create mbuf pool on socket %u\n", socketid);
        }
    }

    return 0;
}
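
/*
 * Mempools and rings are shared across the f-stack process group by name:
 * the primary process creates each object, and secondaries attach to the
 * same shared-memory object via rte_mempool_lookup()/rte_ring_lookup().
 */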

static struct rte_ring *
create_ring(const char *name, unsigned count, int socket_id, unsigned flags)
{
    struct rte_ring *ring;

    if (name == NULL)
        return NULL;

    /* If the ring was already created, just attach to it */
    if (likely((ring = rte_ring_lookup(name)) != NULL))
        return ring;

    if (rte_eal_process_type() == RTE_PROC_PRIMARY) {
        return rte_ring_create(name, count, socket_id, flags);
    } else {
        return rte_ring_lookup(name);
    }
}
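
/*
 * Note: in a secondary process the fallback rte_ring_lookup() can still
 * return NULL if the primary has not created the ring yet; callers below
 * treat a NULL result as fatal (rte_panic).
 */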

static int
init_dispatch_ring(void)
{
    int j;
    char name_buf[RTE_RING_NAMESIZE];
    int queueid;

    unsigned socketid = lcore_conf.socket_id;

    /* Create ring according to ports actually being used. */
    int nb_ports = ff_global_cfg.dpdk.nb_ports;
    for (j = 0; j < nb_ports; j++) {
        uint16_t portid = ff_global_cfg.dpdk.portid_list[j];
        struct ff_port_cfg *pconf = &ff_global_cfg.dpdk.port_cfgs[portid];
        int nb_queues = pconf->nb_lcores;
        if (dispatch_ring[portid] == NULL) {
            snprintf(name_buf, RTE_RING_NAMESIZE, "ring_ptr_p%d", portid);

            dispatch_ring[portid] = rte_zmalloc(name_buf,
                sizeof(struct rte_ring *) * nb_queues,
                RTE_CACHE_LINE_SIZE);
            if (dispatch_ring[portid] == NULL) {
                rte_exit(EXIT_FAILURE, "rte_zmalloc(%s (struct rte_ring*)) "
                    "failed\n", name_buf);
            }
        }

        for (queueid = 0; queueid < nb_queues; ++queueid) {
            snprintf(name_buf, RTE_RING_NAMESIZE, "dispatch_ring_p%d_q%d",
                portid, queueid);
            dispatch_ring[portid][queueid] = create_ring(name_buf,
                DISPATCH_RING_SIZE, socketid, RING_F_SC_DEQ);

            if (dispatch_ring[portid][queueid] == NULL)
                rte_panic("create ring:%s failed!\n", name_buf);

            printf("create ring:%s success, %u ring entries are now free!\n",
                name_buf, rte_ring_free_count(dispatch_ring[portid][queueid]));
        }
    }

    return 0;
}

static void
ff_msg_init(struct rte_mempool *mp,
    __attribute__((unused)) void *opaque_arg,
    void *obj, __attribute__((unused)) unsigned i)
{
    struct ff_msg *msg = (struct ff_msg *)obj;
    msg->msg_type = FF_UNKNOWN;
    msg->buf_addr = (char *)msg + sizeof(struct ff_msg);
    msg->buf_len = mp->elt_size - sizeof(struct ff_msg);
}
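
/*
 * Each message-pool element is a struct ff_msg header immediately
 * followed by its payload buffer:
 *
 *   | struct ff_msg | buf_addr ... (elt_size - sizeof(struct ff_msg)) |
 *
 * so one mempool get/put moves the descriptor and its data together.
 */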

static int
init_msg_ring(void)
{
    uint16_t i;
    uint16_t nb_procs = ff_global_cfg.dpdk.nb_procs;
    unsigned socketid = lcore_conf.socket_id;

    /* Create message buffer pool */
    if (rte_eal_process_type() == RTE_PROC_PRIMARY) {
        message_pool = rte_mempool_create(FF_MSG_POOL,
           MSG_RING_SIZE * 2 * nb_procs,
           MAX_MSG_BUF_SIZE, MSG_RING_SIZE / 2, 0,
           NULL, NULL, ff_msg_init, NULL,
           socketid, 0);
    } else {
        message_pool = rte_mempool_lookup(FF_MSG_POOL);
    }

    if (message_pool == NULL) {
        rte_panic("Create msg mempool failed\n");
    }

    for (i = 0; i < nb_procs; ++i) {
        snprintf(msg_ring[i].ring_name[0], RTE_RING_NAMESIZE,
            "%s%u", FF_MSG_RING_IN, i);
        snprintf(msg_ring[i].ring_name[1], RTE_RING_NAMESIZE,
            "%s%u", FF_MSG_RING_OUT, i);

        msg_ring[i].ring[0] = create_ring(msg_ring[i].ring_name[0],
            MSG_RING_SIZE, socketid, RING_F_SP_ENQ | RING_F_SC_DEQ);
        if (msg_ring[i].ring[0] == NULL)
            rte_panic("create ring:%s failed!\n", msg_ring[i].ring_name[0]);

        msg_ring[i].ring[1] = create_ring(msg_ring[i].ring_name[1],
            MSG_RING_SIZE, socketid, RING_F_SP_ENQ | RING_F_SC_DEQ);
        if (msg_ring[i].ring[1] == NULL)
            rte_panic("create ring:%s failed!\n", msg_ring[i].ring_name[1]);
    }

    return 0;
}

static int
init_kni(void)
{
    int nb_ports = rte_eth_dev_count();
    kni_accept = 0;
    if (strcasecmp(ff_global_cfg.kni.method, "accept") == 0)
        kni_accept = 1;

    ff_kni_init(nb_ports, ff_global_cfg.kni.tcp_port,
        ff_global_cfg.kni.udp_port);

    unsigned socket_id = lcore_conf.socket_id;
    struct rte_mempool *mbuf_pool = pktmbuf_pool[socket_id];

    nb_ports = ff_global_cfg.dpdk.nb_ports;
    int i;
    for (i = 0; i < nb_ports; i++) {
        uint16_t port_id = ff_global_cfg.dpdk.portid_list[i];
        ff_kni_alloc(port_id, socket_id, mbuf_pool, KNI_QUEUE_SIZE);
    }

    return 0;
}

static void
set_rss_table(uint8_t port_id, uint16_t reta_size, uint16_t nb_queues)
{
    if (reta_size == 0) {
        return;
    }

    int reta_conf_size = RTE_MAX(1, reta_size / RTE_RETA_GROUP_SIZE);
    struct rte_eth_rss_reta_entry64 reta_conf[reta_conf_size];

    /* config HW indirection table */
    unsigned i, j, hash = 0;
    for (i = 0; i < reta_conf_size; i++) {
        reta_conf[i].mask = ~0ULL;
        for (j = 0; j < RTE_RETA_GROUP_SIZE; j++) {
            reta_conf[i].reta[j] = hash++ % nb_queues;
        }
    }

    if (rte_eth_dev_rss_reta_update(port_id, reta_conf, reta_size)) {
        rte_exit(EXIT_FAILURE, "port[%d], failed to update rss table\n",
            port_id);
    }
}
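
/*
 * The indirection table is filled round-robin; e.g. with reta_size = 128
 * and nb_queues = 4 the RETA becomes 0,1,2,3,0,1,2,3,... and the NIC
 * delivers a packet to queue RETA[hash & (reta_size - 1)], spreading
 * flows evenly across the configured queues.
 */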

static int
init_port_start(void)
{
    int nb_ports = ff_global_cfg.dpdk.nb_ports;
    unsigned socketid = rte_lcore_to_socket_id(rte_lcore_id());
    struct rte_mempool *mbuf_pool = pktmbuf_pool[socketid];
    uint16_t i;

    for (i = 0; i < nb_ports; i++) {
        uint16_t port_id = ff_global_cfg.dpdk.portid_list[i];
        struct ff_port_cfg *pconf = &ff_global_cfg.dpdk.port_cfgs[port_id];
        uint16_t nb_queues = pconf->nb_lcores;

        struct rte_eth_dev_info dev_info;
        rte_eth_dev_info_get(port_id, &dev_info);

        if (nb_queues > dev_info.max_rx_queues) {
            rte_exit(EXIT_FAILURE, "num_procs[%d] bigger than max_rx_queues[%d]\n",
                nb_queues,
                dev_info.max_rx_queues);
        }

        if (nb_queues > dev_info.max_tx_queues) {
            rte_exit(EXIT_FAILURE, "num_procs[%d] bigger than max_tx_queues[%d]\n",
                nb_queues,
                dev_info.max_tx_queues);
        }

        struct ether_addr addr;
        rte_eth_macaddr_get(port_id, &addr);
        printf("Port %u MAC: %02" PRIx8 " %02" PRIx8 " %02" PRIx8
                   " %02" PRIx8 " %02" PRIx8 " %02" PRIx8 "\n",
                (unsigned)port_id,
                addr.addr_bytes[0], addr.addr_bytes[1],
                addr.addr_bytes[2], addr.addr_bytes[3],
                addr.addr_bytes[4], addr.addr_bytes[5]);

        rte_memcpy(pconf->mac,
            addr.addr_bytes, ETHER_ADDR_LEN);

        /* Clear txq_flags - we do not need multi-mempool and refcnt */
        dev_info.default_txconf.txq_flags = ETH_TXQ_FLAGS_NOMULTMEMP |
            ETH_TXQ_FLAGS_NOREFCOUNT;

        /* Disable features that are not supported by port's HW */
        if (!(dev_info.tx_offload_capa & DEV_TX_OFFLOAD_UDP_CKSUM)) {
            dev_info.default_txconf.txq_flags |= ETH_TXQ_FLAGS_NOXSUMUDP;
        }

        if (!(dev_info.tx_offload_capa & DEV_TX_OFFLOAD_TCP_CKSUM)) {
            dev_info.default_txconf.txq_flags |= ETH_TXQ_FLAGS_NOXSUMTCP;
        }

        if (!(dev_info.tx_offload_capa & DEV_TX_OFFLOAD_SCTP_CKSUM)) {
            dev_info.default_txconf.txq_flags |= ETH_TXQ_FLAGS_NOXSUMSCTP;
        }

        if (!(dev_info.tx_offload_capa & DEV_TX_OFFLOAD_VLAN_INSERT)) {
            dev_info.default_txconf.txq_flags |= ETH_TXQ_FLAGS_NOVLANOFFL;
        }

        if (!(dev_info.tx_offload_capa & DEV_TX_OFFLOAD_TCP_TSO) &&
            !(dev_info.tx_offload_capa & DEV_TX_OFFLOAD_UDP_TSO)) {
            dev_info.default_txconf.txq_flags |= ETH_TXQ_FLAGS_NOMULTSEGS;
        }

        struct rte_eth_conf port_conf = {0};

        /* Set RSS mode */
        port_conf.rxmode.mq_mode = ETH_MQ_RX_RSS;
        port_conf.rx_adv_conf.rss_conf.rss_hf = ETH_RSS_PROTO_MASK;
        port_conf.rx_adv_conf.rss_conf.rss_key = default_rsskey_40bytes;
        port_conf.rx_adv_conf.rss_conf.rss_key_len = 40;

        /* Set Rx VLAN stripping */
        if (ff_global_cfg.dpdk.vlan_strip) {
            if (dev_info.rx_offload_capa & DEV_RX_OFFLOAD_VLAN_STRIP) {
                port_conf.rxmode.hw_vlan_strip = 1;
            }
        }

        /* Enable HW CRC stripping */
        port_conf.rxmode.hw_strip_crc = 1;

        /* FIXME: Enable TCP LRO? */
        #if 0
        if (dev_info.rx_offload_capa & DEV_RX_OFFLOAD_TCP_LRO) {
            printf("LRO is supported\n");
            port_conf.rxmode.enable_lro = 1;
            pconf->hw_features.rx_lro = 1;
        }
        #endif

        /* Set Rx checksum checking */
        if ((dev_info.rx_offload_capa & DEV_RX_OFFLOAD_IPV4_CKSUM) &&
            (dev_info.rx_offload_capa & DEV_RX_OFFLOAD_UDP_CKSUM) &&
            (dev_info.rx_offload_capa & DEV_RX_OFFLOAD_TCP_CKSUM)) {
            printf("RX checksum offload supported\n");
            port_conf.rxmode.hw_ip_checksum = 1;
            pconf->hw_features.rx_csum = 1;
        }

        if ((dev_info.tx_offload_capa & DEV_TX_OFFLOAD_IPV4_CKSUM)) {
            printf("TX ip checksum offload supported\n");
            pconf->hw_features.tx_csum_ip = 1;
        }

        if ((dev_info.tx_offload_capa & DEV_TX_OFFLOAD_UDP_CKSUM) &&
            (dev_info.tx_offload_capa & DEV_TX_OFFLOAD_TCP_CKSUM)) {
            printf("TX TCP&UDP checksum offload supported\n");
            pconf->hw_features.tx_csum_l4 = 1;
        }

        if (ff_global_cfg.dpdk.tso) {
            if (dev_info.tx_offload_capa & DEV_TX_OFFLOAD_TCP_TSO) {
                printf("TSO is supported\n");
                pconf->hw_features.tx_tso = 1;
            }
        } else {
            printf("TSO is disabled\n");
        }

        if (dev_info.reta_size) {
            /* reta size must be power of 2 */
            assert((dev_info.reta_size & (dev_info.reta_size - 1)) == 0);

            rss_reta_size[port_id] = dev_info.reta_size;
            printf("port[%d]: rss table size: %d\n", port_id,
                dev_info.reta_size);
        }

        if (rte_eal_process_type() != RTE_PROC_PRIMARY) {
            continue;
        }

        int ret = rte_eth_dev_configure(port_id, nb_queues, nb_queues, &port_conf);
        if (ret != 0) {
            return ret;
        }
        uint16_t q;
        for (q = 0; q < nb_queues; q++) {
            ret = rte_eth_tx_queue_setup(port_id, q, TX_QUEUE_SIZE,
                socketid, &dev_info.default_txconf);
            if (ret < 0) {
                return ret;
            }

            ret = rte_eth_rx_queue_setup(port_id, q, RX_QUEUE_SIZE,
                socketid, &dev_info.default_rxconf, mbuf_pool);
            if (ret < 0) {
                return ret;
            }
        }

        ret = rte_eth_dev_start(port_id);
        if (ret < 0) {
            return ret;
        }

        if (nb_queues > 1) {
            /* set HW rss hash function to Toeplitz. */
            if (!rte_eth_dev_filter_supported(port_id, RTE_ETH_FILTER_HASH)) {
                struct rte_eth_hash_filter_info info = {0};
                info.info_type = RTE_ETH_HASH_FILTER_GLOBAL_CONFIG;
                info.info.global_conf.hash_func = RTE_ETH_HASH_FUNCTION_TOEPLITZ;

                if (rte_eth_dev_filter_ctrl(port_id, RTE_ETH_FILTER_HASH,
                    RTE_ETH_FILTER_SET, &info) < 0) {
                    rte_exit(EXIT_FAILURE, "port[%d] set hash func failed\n",
                        port_id);
                }
            }

            set_rss_table(port_id, dev_info.reta_size, nb_queues);
        }

        /* Enable RX in promiscuous mode for the Ethernet device. */
        if (ff_global_cfg.dpdk.promiscuous) {
            rte_eth_promiscuous_enable(port_id);
            ret = rte_eth_promiscuous_get(port_id);
            if (ret == 1) {
                printf("set port %u to promiscuous mode ok\n", port_id);
            } else {
                printf("set port %u to promiscuous mode error\n", port_id);
            }
        }

        /* Enable pcap dump */
        if (pconf->pcap) {
            ff_enable_pcap(pconf->pcap);
        }
    }

    if (rte_eal_process_type() == RTE_PROC_PRIMARY) {
        check_all_ports_link_status();
    }

    return 0;
}

static int
init_clock(void)
{
    rte_timer_subsystem_init();
    uint64_t hz = rte_get_timer_hz();
    uint64_t intrs = MS_PER_S / ff_global_cfg.freebsd.hz;
    uint64_t tsc = (hz + MS_PER_S - 1) / MS_PER_S * intrs;

    rte_timer_init(&freebsd_clock);
    rte_timer_reset(&freebsd_clock, tsc, PERIODICAL,
        rte_lcore_id(), &ff_hardclock_job, NULL);

    ff_update_current_ts();

    return 0;
}
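
/*
 * Worked example: with a 2 GHz TSC and freebsd.hz = 100,
 * intrs = 1000 / 100 = 10 ms per tick and
 * tsc = (2e9 / 1000) * 10 = 2e7 cycles, so ff_hardclock_job fires every
 * 10 ms, matching the tick rate the FreeBSD stack was configured for.
 */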

int
ff_dpdk_init(int argc, char **argv)
{
    if (ff_global_cfg.dpdk.nb_procs < 1 ||
        ff_global_cfg.dpdk.nb_procs > RTE_MAX_LCORE ||
        ff_global_cfg.dpdk.proc_id >= ff_global_cfg.dpdk.nb_procs ||
        ff_global_cfg.dpdk.proc_id < 0) {
        printf("invalid parameter: num_procs[%d] or proc_id[%d]\n",
            ff_global_cfg.dpdk.nb_procs,
            ff_global_cfg.dpdk.proc_id);
        exit(1);
    }

    int ret = rte_eal_init(argc, argv);
    if (ret < 0) {
        rte_exit(EXIT_FAILURE, "Error with EAL initialization\n");
    }

    numa_on = ff_global_cfg.dpdk.numa_on;

    init_lcore_conf();

    init_mem_pool();

    init_dispatch_ring();

    init_msg_ring();

    enable_kni = ff_global_cfg.kni.enable;
    if (enable_kni) {
        init_kni();
    }

    ret = init_port_start();
    if (ret < 0) {
        rte_exit(EXIT_FAILURE, "init_port_start failed\n");
    }

    init_clock();

    return 0;
}

static void
ff_veth_input(const struct ff_dpdk_if_context *ctx, struct rte_mbuf *pkt)
{
    uint8_t rx_csum = ctx->hw_features.rx_csum;
    if (rx_csum) {
        if (pkt->ol_flags & (PKT_RX_IP_CKSUM_BAD | PKT_RX_L4_CKSUM_BAD)) {
            /* drop packets with a bad hw checksum; free the mbuf to
             * avoid leaking it */
            rte_pktmbuf_free(pkt);
            return;
        }
    }

    /*
     * FIXME: should we save pkt->vlan_tci
     * if (pkt->ol_flags & PKT_RX_VLAN_PKT)
     */

    void *data = rte_pktmbuf_mtod(pkt, void*);
    uint16_t len = rte_pktmbuf_data_len(pkt);

    void *hdr = ff_mbuf_gethdr(pkt, pkt->pkt_len, data, len, rx_csum);
    if (hdr == NULL) {
        rte_pktmbuf_free(pkt);
        return;
    }

    struct rte_mbuf *pn = pkt->next;
    void *prev = hdr;
    while (pn != NULL) {
        data = rte_pktmbuf_mtod(pn, void*);
        len = rte_pktmbuf_data_len(pn);

        void *mb = ff_mbuf_get(prev, data, len);
        if (mb == NULL) {
            ff_mbuf_free(hdr);
            rte_pktmbuf_free(pkt);
            return;
        }
        pn = pn->next;
        prev = mb;
    }

    ff_veth_process_packet(ctx->ifp, hdr);
}

static enum FilterReturn
protocol_filter(const void *data, uint16_t len)
{
    if (len < sizeof(struct ether_hdr))
        return FILTER_UNKNOWN;

    const struct ether_hdr *hdr;
    hdr = (const struct ether_hdr *)data;

    if (ntohs(hdr->ether_type) == ETHER_TYPE_ARP)
        return FILTER_ARP;

    if (!enable_kni) {
        return FILTER_UNKNOWN;
    }

    if (ntohs(hdr->ether_type) != ETHER_TYPE_IPv4)
        return FILTER_UNKNOWN;

    return ff_kni_proto_filter((const char *)data + sizeof(struct ether_hdr),
        len - sizeof(struct ether_hdr));
}

static inline void
process_packets(uint8_t port_id, uint16_t queue_id, struct rte_mbuf **bufs,
    uint16_t count, const struct ff_dpdk_if_context *ctx, int pkts_from_ring)
{
    struct lcore_conf *qconf = &lcore_conf;
    uint16_t nb_queues = qconf->nb_queue_list[port_id];

    uint16_t i;
    for (i = 0; i < count; i++) {
        struct rte_mbuf *rtem = bufs[i];

        if (unlikely(qconf->pcap[port_id] != NULL)) {
            if (!pkts_from_ring) {
                ff_dump_packets(qconf->pcap[port_id], rtem);
            }
        }

        void *data = rte_pktmbuf_mtod(rtem, void*);
        uint16_t len = rte_pktmbuf_data_len(rtem);

        if (!pkts_from_ring && packet_dispatcher) {
            int ret = (*packet_dispatcher)(data, len, nb_queues);
            if (ret < 0 || ret >= nb_queues) {
                rte_pktmbuf_free(rtem);
                continue;
            }

            if (ret != queue_id) {
                ret = rte_ring_enqueue(dispatch_ring[port_id][ret], rtem);
                if (ret < 0)
                    rte_pktmbuf_free(rtem);

                continue;
            }
        }

        enum FilterReturn filter = protocol_filter(data, len);
        if (filter == FILTER_ARP) {
            struct rte_mempool *mbuf_pool;
            struct rte_mbuf *mbuf_clone;
            if (!pkts_from_ring) {
                /* Broadcast the ARP packet to every other queue's
                 * dispatch ring so all processes see it. */
                uint16_t j;
                for (j = 0; j < nb_queues; ++j) {
                    if (j == queue_id)
                        continue;

                    unsigned socket_id = 0;
                    if (numa_on) {
                        uint16_t lcore_id = qconf->port_cfgs[port_id].lcore_list[j];
                        socket_id = rte_lcore_to_socket_id(lcore_id);
                    }
                    mbuf_pool = pktmbuf_pool[socket_id];
                    mbuf_clone = rte_pktmbuf_clone(rtem, mbuf_pool);
                    if (mbuf_clone) {
                        int ret = rte_ring_enqueue(dispatch_ring[port_id][j], mbuf_clone);
                        if (ret < 0)
                            rte_pktmbuf_free(mbuf_clone);
                    }
                }
            }

            if (enable_kni && rte_eal_process_type() == RTE_PROC_PRIMARY) {
                mbuf_pool = pktmbuf_pool[qconf->socket_id];
                mbuf_clone = rte_pktmbuf_clone(rtem, mbuf_pool);
                if (mbuf_clone) {
                    ff_kni_enqueue(port_id, mbuf_clone);
                }
            }

            ff_veth_input(ctx, rtem);
        } else if (enable_kni && ((filter == FILTER_KNI && kni_accept) ||
            (filter == FILTER_UNKNOWN && !kni_accept))) {
            ff_kni_enqueue(port_id, rtem);
        } else {
            ff_veth_input(ctx, rtem);
        }
    }
}

static inline int
process_dispatch_ring(uint8_t port_id, uint16_t queue_id,
    struct rte_mbuf **pkts_burst, const struct ff_dpdk_if_context *ctx)
{
    /* Read packets from the dispatch ring and process them */
    uint16_t nb_rb;
    nb_rb = rte_ring_dequeue_burst(dispatch_ring[port_id][queue_id],
        (void **)pkts_burst, MAX_PKT_BURST);

    if (nb_rb > 0) {
        process_packets(port_id, queue_id, pkts_burst, nb_rb, ctx, 1);
    }

    return 0;
}

static inline void
handle_sysctl_msg(struct ff_msg *msg, uint16_t proc_id)
{
    int ret = ff_sysctl(msg->sysctl.name, msg->sysctl.namelen,
        msg->sysctl.old, msg->sysctl.oldlenp, msg->sysctl.new,
        msg->sysctl.newlen);

    if (ret < 0) {
        msg->result = errno;
    } else {
        msg->result = 0;
    }

    rte_ring_enqueue(msg_ring[proc_id].ring[1], msg);
}

static inline void
handle_ioctl_msg(struct ff_msg *msg, uint16_t proc_id)
{
    int fd, ret;
    fd = ff_socket(AF_INET, SOCK_DGRAM, 0);
    if (fd < 0) {
        ret = -1;
        goto done;
    }

    ret = ff_ioctl(fd, msg->ioctl.cmd, msg->ioctl.data);

    ff_close(fd);

done:
    if (ret < 0) {
        msg->result = errno;
    } else {
        msg->result = 0;
    }

    rte_ring_enqueue(msg_ring[proc_id].ring[1], msg);
}

static inline void
handle_route_msg(struct ff_msg *msg, uint16_t proc_id)
{
    int ret = ff_rtioctl(msg->route.fib, msg->route.data,
        &msg->route.len, msg->route.maxlen);
    if (ret < 0) {
        msg->result = errno;
    } else {
        msg->result = 0;
    }

    rte_ring_enqueue(msg_ring[proc_id].ring[1], msg);
}

static struct ff_top_args ff_status;
static inline void
handle_top_msg(struct ff_msg *msg, uint16_t proc_id)
{
    msg->top = ff_status;
    msg->result = 0;

    rte_ring_enqueue(msg_ring[proc_id].ring[1], msg);
}

#ifdef FF_NETGRAPH
static inline void
handle_ngctl_msg(struct ff_msg *msg, uint16_t proc_id)
{
    int ret = ff_ngctl(msg->ngctl.cmd, msg->ngctl.data);
    if (ret < 0) {
        msg->result = errno;
    } else {
        msg->result = 0;
        msg->ngctl.ret = ret;
    }

    rte_ring_enqueue(msg_ring[proc_id].ring[1], msg);
}
#endif

static inline void
handle_default_msg(struct ff_msg *msg, uint16_t proc_id)
{
    msg->result = ENOTSUP;
    rte_ring_enqueue(msg_ring[proc_id].ring[1], msg);
}

static inline void
handle_msg(struct ff_msg *msg, uint16_t proc_id)
{
    switch (msg->msg_type) {
        case FF_SYSCTL:
            handle_sysctl_msg(msg, proc_id);
            break;
        case FF_IOCTL:
            handle_ioctl_msg(msg, proc_id);
            break;
        case FF_ROUTE:
            handle_route_msg(msg, proc_id);
            break;
        case FF_TOP:
            handle_top_msg(msg, proc_id);
            break;
#ifdef FF_NETGRAPH
        case FF_NGCTL:
            handle_ngctl_msg(msg, proc_id);
            break;
#endif
        default:
            handle_default_msg(msg, proc_id);
            break;
    }
}

static inline int
process_msg_ring(uint16_t proc_id)
{
    void *msg;
    int ret = rte_ring_dequeue(msg_ring[proc_id].ring[0], &msg);

    if (unlikely(ret == 0)) {
        handle_msg((struct ff_msg *)msg, proc_id);
    }

    return 0;
}

/* Send burst of packets on an output interface */
static inline int
send_burst(struct lcore_conf *qconf, uint16_t n, uint8_t port)
{
    struct rte_mbuf **m_table;
    int ret;
    uint16_t queueid;

    queueid = qconf->tx_queue_id[port];
    m_table = (struct rte_mbuf **)qconf->tx_mbufs[port].m_table;

    if (unlikely(qconf->pcap[port] != NULL)) {
        uint16_t i;
        for (i = 0; i < n; i++) {
            ff_dump_packets(qconf->pcap[port], m_table[i]);
        }
    }

    ret = rte_eth_tx_burst(port, queueid, m_table, n);
    if (unlikely(ret < n)) {
        /* free the mbufs the NIC did not accept */
        do {
            rte_pktmbuf_free(m_table[ret]);
        } while (++ret < n);
    }

    return 0;
}

/* Enqueue a single packet, and send burst if queue is filled */
static inline int
send_single_packet(struct rte_mbuf *m, uint8_t port)
{
    uint16_t len;
    struct lcore_conf *qconf;

    qconf = &lcore_conf;
    len = qconf->tx_mbufs[port].len;
    qconf->tx_mbufs[port].m_table[len] = m;
    len++;

    /* enough pkts to be sent */
    if (unlikely(len == MAX_PKT_BURST)) {
        send_burst(qconf, MAX_PKT_BURST, port);
        len = 0;
    }

    qconf->tx_mbufs[port].len = len;
    return 0;
}

int
ff_dpdk_if_send(struct ff_dpdk_if_context *ctx, void *m,
    int total)
{
    struct rte_mempool *mbuf_pool = pktmbuf_pool[lcore_conf.socket_id];
    struct rte_mbuf *head = rte_pktmbuf_alloc(mbuf_pool);
    if (head == NULL) {
        ff_mbuf_free(m);
        return -1;
    }

    head->pkt_len = total;
    head->nb_segs = 0;

    int off = 0;
    struct rte_mbuf *cur = head, *prev = NULL;
    while (total > 0) {
        if (cur == NULL) {
            cur = rte_pktmbuf_alloc(mbuf_pool);
            if (cur == NULL) {
                rte_pktmbuf_free(head);
                ff_mbuf_free(m);
                return -1;
            }
        }

        void *data = rte_pktmbuf_mtod(cur, void*);
        int len = total > RTE_MBUF_DEFAULT_DATAROOM ? RTE_MBUF_DEFAULT_DATAROOM : total;
        int ret = ff_mbuf_copydata(m, data, off, len);
        if (ret < 0) {
            rte_pktmbuf_free(head);
            ff_mbuf_free(m);
            return -1;
        }

        if (prev != NULL) {
            prev->next = cur;
        }
        prev = cur;

        cur->data_len = len;
        off += len;
        total -= len;
        head->nb_segs++;
        cur = NULL;
    }

    struct ff_tx_offload offload = {0};
    ff_mbuf_tx_offload(m, &offload);

    if (offload.ip_csum) {
        head->ol_flags |= PKT_TX_IP_CKSUM;
        head->l2_len = sizeof(struct ether_hdr);
        head->l3_len = sizeof(struct ipv4_hdr);
    }

    if (ctx->hw_features.tx_csum_l4) {
        if (offload.tcp_csum) {
            head->ol_flags |= PKT_TX_TCP_CKSUM;
            head->l2_len = sizeof(struct ether_hdr);
            head->l3_len = sizeof(struct ipv4_hdr);
        }

        if (offload.tso_seg_size) {
            head->ol_flags |= PKT_TX_TCP_SEG;
            head->l4_len = sizeof(struct tcp_hdr);
            head->tso_segsz = offload.tso_seg_size;
        }

        if (offload.udp_csum) {
            head->ol_flags |= PKT_TX_UDP_CKSUM;
            head->l2_len = sizeof(struct ether_hdr);
            head->l3_len = sizeof(struct ipv4_hdr);
        }
    }

    ff_mbuf_free(m);

    return send_single_packet(head, ctx->port_id);
}

static int
main_loop(void *arg)
{
    struct loop_routine *lr = (struct loop_routine *)arg;

    struct rte_mbuf *pkts_burst[MAX_PKT_BURST];
    uint64_t prev_tsc, diff_tsc, cur_tsc, usch_tsc, div_tsc, usr_tsc, sys_tsc, end_tsc;
    int i, j, nb_rx, idle;
    uint8_t port_id, queue_id;
    struct lcore_conf *qconf;
    const uint64_t drain_tsc = (rte_get_tsc_hz() + US_PER_S - 1) /
        US_PER_S * BURST_TX_DRAIN_US;
    struct ff_dpdk_if_context *ctx;

    prev_tsc = 0;
    usch_tsc = 0;

    qconf = &lcore_conf;

    while (1) {
        cur_tsc = rte_rdtsc();
        if (unlikely(freebsd_clock.expire < cur_tsc)) {
            rte_timer_manage();
        }

        idle = 1;
        sys_tsc = 0;
        usr_tsc = 0;

        /*
         * TX burst queue drain
         */
        diff_tsc = cur_tsc - prev_tsc;
        if (unlikely(diff_tsc > drain_tsc)) {
            for (i = 0; i < qconf->nb_tx_port; i++) {
                port_id = qconf->tx_port_id[i];
                if (qconf->tx_mbufs[port_id].len == 0)
                    continue;

                idle = 0;

                send_burst(qconf,
                    qconf->tx_mbufs[port_id].len,
                    port_id);
                qconf->tx_mbufs[port_id].len = 0;
            }

            prev_tsc = cur_tsc;
        }

        /*
         * Read packet from RX queues
         */
        for (i = 0; i < qconf->nb_rx_queue; ++i) {
            port_id = qconf->rx_queue_list[i].port_id;
            queue_id = qconf->rx_queue_list[i].queue_id;
            ctx = veth_ctx[port_id];

            if (enable_kni && rte_eal_process_type() == RTE_PROC_PRIMARY) {
                ff_kni_process(port_id, queue_id, pkts_burst, MAX_PKT_BURST);
            }

            process_dispatch_ring(port_id, queue_id, pkts_burst, ctx);

            nb_rx = rte_eth_rx_burst(port_id, queue_id, pkts_burst,
                MAX_PKT_BURST);
            if (nb_rx == 0)
                continue;

            idle = 0;

            /* Prefetch first packets */
            for (j = 0; j < PREFETCH_OFFSET && j < nb_rx; j++) {
                rte_prefetch0(rte_pktmbuf_mtod(
                        pkts_burst[j], void *));
            }

            /* Prefetch and handle already prefetched packets */
            for (j = 0; j < (nb_rx - PREFETCH_OFFSET); j++) {
                rte_prefetch0(rte_pktmbuf_mtod(pkts_burst[
                        j + PREFETCH_OFFSET], void *));
                process_packets(port_id, queue_id, &pkts_burst[j], 1, ctx, 0);
            }

            /* Handle remaining prefetched packets */
            for (; j < nb_rx; j++) {
                process_packets(port_id, queue_id, &pkts_burst[j], 1, ctx, 0);
            }
        }

        process_msg_ring(qconf->proc_id);

        div_tsc = rte_rdtsc();

        if (likely(lr->loop != NULL && (!idle || cur_tsc - usch_tsc > drain_tsc))) {
            usch_tsc = cur_tsc;
            lr->loop(lr->arg);
        }

        end_tsc = rte_rdtsc();

        if (usch_tsc == cur_tsc) {
            usr_tsc = end_tsc - div_tsc;
        }

        if (!idle) {
            sys_tsc = div_tsc - cur_tsc;
            ff_status.sys_tsc += sys_tsc;
        }

        ff_status.usr_tsc += usr_tsc;
        ff_status.work_tsc += end_tsc - cur_tsc;
        ff_status.idle_tsc += end_tsc - cur_tsc - usr_tsc - sys_tsc;

        ff_status.loops++;
    }

    return 0;
}

int
ff_dpdk_if_up(void) {
    int i;
    struct lcore_conf *qconf = &lcore_conf;
    for (i = 0; i < qconf->nb_tx_port; i++) {
        uint16_t port_id = qconf->tx_port_id[i];

        struct ff_port_cfg *pconf = &qconf->port_cfgs[port_id];
        veth_ctx[port_id] = ff_veth_attach(pconf);
        if (veth_ctx[port_id] == NULL) {
            rte_exit(EXIT_FAILURE, "ff_veth_attach failed\n");
        }
    }

    return 0;
}

void
ff_dpdk_run(loop_func_t loop, void *arg) {
    struct loop_routine *lr = rte_malloc(NULL,
        sizeof(struct loop_routine), 0);
    if (lr == NULL) {
        rte_exit(EXIT_FAILURE, "rte_malloc loop_routine failed\n");
    }
    lr->loop = loop;
    lr->arg = arg;
    rte_eal_mp_remote_launch(main_loop, lr, CALL_MASTER);
    rte_eal_mp_wait_lcore();
    rte_free(lr);
}

void
ff_dpdk_pktmbuf_free(void *m)
{
    rte_pktmbuf_free((struct rte_mbuf *)m);
}

static uint32_t
toeplitz_hash(unsigned keylen, const uint8_t *key,
    unsigned datalen, const uint8_t *data)
{
    uint32_t hash = 0, v;
    u_int i, b;

    /* XXXRW: Perhaps an assertion about key length vs. data length? */

    v = (key[0]<<24) + (key[1]<<16) + (key[2]<<8) + key[3];
    for (i = 0; i < datalen; i++) {
        for (b = 0; b < 8; b++) {
            if (data[i] & (1<<(7-b)))
                hash ^= v;
            v <<= 1;
            if ((i + 4) < keylen &&
                (key[i+4] & (1<<(7-b))))
                v |= 1;
        }
    }
    return (hash);
}

int
ff_rss_check(void *softc, uint32_t saddr, uint32_t daddr,
    uint16_t sport, uint16_t dport)
{
    struct lcore_conf *qconf = &lcore_conf;
    struct ff_dpdk_if_context *ctx = ff_veth_softc_to_hostc(softc);
    uint16_t nb_queues = qconf->nb_queue_list[ctx->port_id];

    if (nb_queues <= 1) {
        return 1;
    }

    uint16_t reta_size = rss_reta_size[ctx->port_id];
    uint16_t queueid = qconf->tx_queue_id[ctx->port_id];

    uint8_t data[sizeof(saddr) + sizeof(daddr) + sizeof(sport) +
        sizeof(dport)];

    unsigned datalen = 0;

    bcopy(&saddr, &data[datalen], sizeof(saddr));
    datalen += sizeof(saddr);

    bcopy(&daddr, &data[datalen], sizeof(daddr));
    datalen += sizeof(daddr);

    bcopy(&sport, &data[datalen], sizeof(sport));
    datalen += sizeof(sport);

    bcopy(&dport, &data[datalen], sizeof(dport));
    datalen += sizeof(dport);

    uint32_t hash = toeplitz_hash(sizeof(default_rsskey_40bytes),
        default_rsskey_40bytes, datalen, data);

    return ((hash & (reta_size - 1)) % nb_queues) == queueid;
}
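
/*
 * ff_rss_check() predicts in software which queue the NIC's Toeplitz RSS
 * will choose for a 4-tuple, letting the stack pick local ports whose
 * replies hash back to this process's own queue. The masking with
 * (reta_size - 1) relies on reta_size being a power of two, which
 * init_port_start() asserts, and mirrors the round-robin RETA written by
 * set_rss_table().
 */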

void
ff_regist_packet_dispatcher(dispatch_func_t func)
{
    packet_dispatcher = func;
}
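
/*
 * Example (disabled sketch) of a user-supplied dispatch callback. It is
 * invoked from process_packets() with the raw frame and the number of
 * queues on the port, and must return the target queue id; any value
 * outside [0, nb_queues) drops the packet. The parameter list below is
 * assumed from the call site (*packet_dispatcher)(data, len, nb_queues);
 * see dispatch_func_t in ff_dpdk_if.h for the authoritative signature.
 */
#if 0
static int
example_dispatcher(void *data, uint16_t len, uint16_t nb_queues)
{
    /* Frames too short to carry an IPv4 header all go to queue 0. */
    if (len < sizeof(struct ether_hdr) + sizeof(struct ipv4_hdr))
        return 0;

    /* Spread flows by the low byte of the IPv4 destination address. */
    const struct ipv4_hdr *ip = (const struct ipv4_hdr *)
        ((const char *)data + sizeof(struct ether_hdr));
    return (rte_be_to_cpu_32(ip->dst_addr) & 0xff) % nb_queues;
}

/* Registered before entering the loop, e.g.:
 *     ff_regist_packet_dispatcher(example_dispatcher);
 */
#endif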
1474