xref: /f-stack/lib/ff_dpdk_if.c (revision 3b2bd0f6)
/*
 * Copyright (C) 2017 THL A29 Limited, a Tencent company.
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions are met:
 *
 * 1. Redistributions of source code must retain the above copyright notice, this
 *   list of conditions and the following disclaimer.
 * 2. Redistributions in binary form must reproduce the above copyright notice,
 *   this list of conditions and the following disclaimer in the documentation
 *   and/or other materials provided with the distribution.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
 * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
 * DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR
 * ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
 * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
 * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
 * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
 * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 *
 */
#include <assert.h>

#include <rte_common.h>
#include <rte_byteorder.h>
#include <rte_log.h>
#include <rte_memory.h>
#include <rte_memcpy.h>
#include <rte_memzone.h>
#include <rte_config.h>
#include <rte_eal.h>
#include <rte_pci.h>
#include <rte_mbuf.h>
#include <rte_lcore.h>
#include <rte_launch.h>
#include <rte_ethdev.h>
#include <rte_debug.h>
#include <rte_ether.h>
#include <rte_malloc.h>
#include <rte_cycles.h>
#include <rte_timer.h>
#include <rte_thash.h>
#include <rte_ip.h>
#include <rte_tcp.h>
#include <rte_udp.h>

#include "ff_dpdk_if.h"
#include "ff_dpdk_pcap.h"
#include "ff_dpdk_kni.h"
#include "ff_config.h"
#include "ff_veth.h"
#include "ff_host_interface.h"
#include "ff_msg.h"
#include "ff_api.h"

#define MEMPOOL_CACHE_SIZE 256

#define ARP_RING_SIZE 2048

#define MSG_RING_SIZE 32

/*
 * Configurable number of RX/TX ring descriptors
 */
#define RX_QUEUE_SIZE 512
#define TX_QUEUE_SIZE 512

#define MAX_PKT_BURST 32
#define BURST_TX_DRAIN_US 100 /* TX drain every ~100us */

/*
 * Try to avoid TX buffering if we have at least MAX_TX_BURST packets to send.
 */
#define MAX_TX_BURST    (MAX_PKT_BURST / 2)

#define NB_SOCKETS 8

/* How many packets ahead to prefetch when reading packets */
#define PREFETCH_OFFSET    3

#define MAX_RX_QUEUE_PER_LCORE 16
#define MAX_TX_QUEUE_PER_PORT RTE_MAX_ETHPORTS
#define MAX_RX_QUEUE_PER_PORT 128

#define KNI_MBUF_MAX 2048
#define KNI_QUEUE_SIZE 2048

static int enable_kni;
static int kni_accept;

static int numa_on;

static struct rte_timer freebsd_clock;

/* RSS key used by the Mellanox Linux driver. */
static uint8_t default_rsskey_40bytes[40] = {
    0xd1, 0x81, 0xc6, 0x2c, 0xf7, 0xf4, 0xdb, 0x5b,
    0x19, 0x83, 0xa2, 0xfc, 0x94, 0x3e, 0x1a, 0xdb,
    0xd9, 0x38, 0x9e, 0x6b, 0xd1, 0x03, 0x9c, 0x2c,
    0xa7, 0x44, 0x99, 0xad, 0x59, 0x3d, 0x56, 0xd9,
    0xf3, 0x25, 0x3c, 0x06, 0x2a, 0xdc, 0x1f, 0xfc
};

static struct rte_eth_conf default_port_conf = {
    .rxmode = {
        .mq_mode = ETH_MQ_RX_RSS,
        .max_rx_pkt_len = ETHER_MAX_LEN,
        .split_hdr_size = 0, /**< Header Split buffer size */
        .header_split   = 0, /**< Header Split disabled */
        .hw_ip_checksum = 0, /**< IP checksum offload disabled */
        .hw_vlan_filter = 0, /**< VLAN filtering disabled */
        .hw_vlan_strip  = 0, /**< VLAN strip disabled. */
        .hw_vlan_extend = 0, /**< Extended VLAN disabled. */
        .jumbo_frame    = 0, /**< Jumbo Frame Support disabled */
        .hw_strip_crc   = 0, /**< CRC stripping by hardware disabled */
        .enable_lro     = 0, /**< LRO disabled */
    },
    .rx_adv_conf = {
        .rss_conf = {
            .rss_key = default_rsskey_40bytes,
            .rss_key_len = 40,
            .rss_hf = ETH_RSS_PROTO_MASK,
        },
    },
    .txmode = {
        .mq_mode = ETH_MQ_TX_NONE,
    },
};

struct mbuf_table {
    uint16_t len;
    struct rte_mbuf *m_table[MAX_PKT_BURST];
};

struct lcore_rx_queue {
    uint8_t port_id;
    uint8_t queue_id;
} __rte_cache_aligned;

struct lcore_conf {
    uint16_t proc_id;
    uint16_t socket_id;
    uint16_t nb_queue_list[RTE_MAX_ETHPORTS];
    struct ff_port_cfg *port_cfgs;

    uint16_t nb_rx_queue;
    struct lcore_rx_queue rx_queue_list[MAX_RX_QUEUE_PER_LCORE];
    uint16_t nb_tx_port;
    uint16_t tx_port_id[RTE_MAX_ETHPORTS];
    uint16_t tx_queue_id[RTE_MAX_ETHPORTS];
    struct mbuf_table tx_mbufs[RTE_MAX_ETHPORTS];
    char *pcap[RTE_MAX_ETHPORTS];
} __rte_cache_aligned;

static struct lcore_conf lcore_conf;

static struct rte_mempool *pktmbuf_pool[NB_SOCKETS];

static struct rte_ring **arp_ring[RTE_MAX_ETHPORTS];

static uint16_t rss_reta_size[RTE_MAX_ETHPORTS];

struct ff_msg_ring {
    char ring_name[2][RTE_RING_NAMESIZE];
    /* ring[0]: requests enqueued by the other process, dequeued by the lcore */
    /* ring[1]: replies enqueued by the lcore, dequeued by the other process */
    struct rte_ring *ring[2];
} __rte_cache_aligned;
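
/*
 * Illustrative sketch (an assumption, not part of this file): how a control
 * process on the other side of these rings might issue a request and wait
 * for the reply, following the enqueue/dequeue pattern used below.
 * Busy-waiting and error handling are simplified.
 *
 *   struct ff_msg *msg;
 *   if (rte_mempool_get(message_pool, (void **)&msg) == 0) {
 *       msg->msg_type = FF_TOP;
 *       rte_ring_enqueue(msg_ring[proc_id].ring[0], msg);
 *       while (rte_ring_dequeue(msg_ring[proc_id].ring[1],
 *           (void **)&msg) != 0)
 *           ;
 *       // consume msg->top / msg->result here
 *       rte_mempool_put(message_pool, msg);
 *   }
 */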

static struct ff_msg_ring msg_ring[RTE_MAX_LCORE];
static struct rte_mempool *message_pool;

struct ff_dpdk_if_context {
    void *sc;
    void *ifp;
    uint16_t port_id;
    struct ff_hw_features hw_features;
} __rte_cache_aligned;

static struct ff_dpdk_if_context *veth_ctx[RTE_MAX_ETHPORTS];

extern void ff_hardclock(void);

static void
ff_hardclock_job(__rte_unused struct rte_timer *timer,
    __rte_unused void *arg)
{
    ff_hardclock();
    ff_update_current_ts();
}

struct ff_dpdk_if_context *
ff_dpdk_register_if(void *sc, void *ifp, struct ff_port_cfg *cfg)
{
    struct ff_dpdk_if_context *ctx;

    ctx = calloc(1, sizeof(struct ff_dpdk_if_context));
    if (ctx == NULL)
        return NULL;

    ctx->sc = sc;
    ctx->ifp = ifp;
    ctx->port_id = cfg->port_id;
    ctx->hw_features = cfg->hw_features;

    return ctx;
}

void
ff_dpdk_deregister_if(struct ff_dpdk_if_context *ctx)
{
    free(ctx);
}

static void
check_all_ports_link_status(void)
{
    #define CHECK_INTERVAL 100 /* 100ms */
    #define MAX_CHECK_TIME 90  /* 9s (90 * 100ms) in total */

    uint8_t count, all_ports_up, print_flag = 0;
    struct rte_eth_link link;

    printf("\nChecking link status");
    fflush(stdout);

    int i, nb_ports;
    nb_ports = ff_global_cfg.dpdk.nb_ports;
    for (count = 0; count <= MAX_CHECK_TIME; count++) {
        all_ports_up = 1;
        for (i = 0; i < nb_ports; i++) {
            uint8_t portid = ff_global_cfg.dpdk.portid_list[i];
            memset(&link, 0, sizeof(link));
            rte_eth_link_get_nowait(portid, &link);

            /* print link status if flag set */
            if (print_flag == 1) {
                if (link.link_status) {
                    printf("Port %d Link Up - speed %u "
                        "Mbps - %s\n", (int)portid,
                        (unsigned)link.link_speed,
                        (link.link_duplex == ETH_LINK_FULL_DUPLEX) ?
                        ("full-duplex") : ("half-duplex"));
                } else {
                    printf("Port %d Link Down\n", (int)portid);
                }
                continue;
            }
            /* clear all_ports_up flag if any link down */
            if (link.link_status == 0) {
                all_ports_up = 0;
                break;
            }
        }

        /* after finally printing all link status, get out */
        if (print_flag == 1)
            break;

        if (all_ports_up == 0) {
            printf(".");
            fflush(stdout);
            rte_delay_ms(CHECK_INTERVAL);
        }

        /* set the print_flag if all ports up or timeout */
        if (all_ports_up == 1 || count == (MAX_CHECK_TIME - 1)) {
            print_flag = 1;
            printf("done\n");
        }
    }
}

static int
init_lcore_conf(void)
{
    uint8_t nb_dev_ports = rte_eth_dev_count();
    if (nb_dev_ports == 0) {
        rte_exit(EXIT_FAILURE, "No probed ethernet devices\n");
    }

    if (ff_global_cfg.dpdk.max_portid >= nb_dev_ports) {
        rte_exit(EXIT_FAILURE, "this machine doesn't have port %d.\n",
                 ff_global_cfg.dpdk.max_portid);
    }

    lcore_conf.port_cfgs = ff_global_cfg.dpdk.port_cfgs;
    lcore_conf.proc_id = ff_global_cfg.dpdk.proc_id;

    uint16_t proc_id;
    for (proc_id = 0; proc_id < ff_global_cfg.dpdk.nb_procs; proc_id++) {
        uint16_t lcore_id = ff_global_cfg.dpdk.proc_lcore[proc_id];
        if (!lcore_config[lcore_id].detected) {
            rte_exit(EXIT_FAILURE, "lcore %u unavailable\n", lcore_id);
        }
    }

    uint16_t socket_id = 0;
    if (numa_on) {
        socket_id = rte_lcore_to_socket_id(rte_lcore_id());
    }

    lcore_conf.socket_id = socket_id;

    uint16_t lcore_id = ff_global_cfg.dpdk.proc_lcore[lcore_conf.proc_id];
    int j;
    for (j = 0; j < ff_global_cfg.dpdk.nb_ports; ++j) {
        uint16_t port_id = ff_global_cfg.dpdk.portid_list[j];
        struct ff_port_cfg *pconf = &ff_global_cfg.dpdk.port_cfgs[port_id];

        int queueid = -1;
        int i;
        for (i = 0; i < pconf->nb_lcores; i++) {
            if (pconf->lcore_list[i] == lcore_id) {
                queueid = i;
            }
        }
        if (queueid < 0) {
            continue;
        }
        printf("lcore: %u, port: %u, queue: %d\n", lcore_id, port_id, queueid);
        uint16_t nb_rx_queue = lcore_conf.nb_rx_queue;
        lcore_conf.rx_queue_list[nb_rx_queue].port_id = port_id;
        lcore_conf.rx_queue_list[nb_rx_queue].queue_id = queueid;
        lcore_conf.nb_rx_queue++;

        lcore_conf.tx_queue_id[port_id] = queueid;
        lcore_conf.tx_port_id[lcore_conf.nb_tx_port] = port_id;
        lcore_conf.nb_tx_port++;

        lcore_conf.pcap[port_id] = pconf->pcap;
        lcore_conf.nb_queue_list[port_id] = pconf->nb_lcores;
    }

    return 0;
}
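
/*
 * Example of the queue assignment above (hypothetical config): if port 0
 * has lcore_list = {1, 2, 3}, the process pinned to lcore 2 gets queue 1
 * for both RX and TX on that port; the queue id is simply the lcore's
 * index within the port's lcore_list.
 */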

static int
init_mem_pool(void)
{
    uint8_t nb_ports = ff_global_cfg.dpdk.nb_ports;
    uint32_t nb_lcores = ff_global_cfg.dpdk.nb_procs;
    uint32_t nb_tx_queue = nb_lcores;
    uint32_t nb_rx_queue = lcore_conf.nb_rx_queue * nb_lcores;

    unsigned nb_mbuf = RTE_MAX(
        (nb_rx_queue*RX_QUEUE_SIZE          +
        nb_ports*nb_lcores*MAX_PKT_BURST    +
        nb_ports*nb_tx_queue*TX_QUEUE_SIZE  +
        nb_lcores*MEMPOOL_CACHE_SIZE        +
        nb_ports*KNI_MBUF_MAX               +
        nb_ports*KNI_QUEUE_SIZE             +
        nb_lcores*nb_ports*ARP_RING_SIZE),
        (unsigned)8192);

    unsigned socketid = 0;
    uint16_t i, lcore_id;
    char s[64];

    for (i = 0; i < ff_global_cfg.dpdk.nb_procs; i++) {
        lcore_id = ff_global_cfg.dpdk.proc_lcore[i];
        if (numa_on) {
            socketid = rte_lcore_to_socket_id(lcore_id);
        }

        if (socketid >= NB_SOCKETS) {
            rte_exit(EXIT_FAILURE, "Socket %u of lcore %u is out of range %d\n",
                socketid, lcore_id, NB_SOCKETS);
        }

        if (pktmbuf_pool[socketid] != NULL) {
            continue;
        }

        snprintf(s, sizeof(s), "mbuf_pool_%u", socketid);
        if (rte_eal_process_type() == RTE_PROC_PRIMARY) {
            pktmbuf_pool[socketid] =
                rte_pktmbuf_pool_create(s, nb_mbuf,
                    MEMPOOL_CACHE_SIZE, 0,
                    RTE_MBUF_DEFAULT_BUF_SIZE, socketid);
        } else {
            pktmbuf_pool[socketid] = rte_mempool_lookup(s);
        }

        if (pktmbuf_pool[socketid] == NULL) {
            rte_exit(EXIT_FAILURE, "Cannot create mbuf pool on socket %u\n", socketid);
        } else {
            printf("create mbuf pool on socket %u\n", socketid);
        }
    }

    return 0;
}
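
/*
 * Worked example of the nb_mbuf estimate above, assuming a single port
 * and a single process (nb_ports = nb_lcores = nb_rx_queue = 1):
 *   512 (RX) + 32 (burst) + 512 (TX) + 256 (cache)
 *   + 2048 (KNI mbuf) + 2048 (KNI queue) + 2048 (ARP ring) = 7456,
 * which is below the 8192 floor, so the pool is created with 8192 mbufs.
 */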

static struct rte_ring *
create_ring(const char *name, unsigned count, int socket_id, unsigned flags)
{
    struct rte_ring *ring;

    if (name == NULL)
        return NULL;

    /* If already created, just attach to it. */
    if (likely((ring = rte_ring_lookup(name)) != NULL))
        return ring;

    if (rte_eal_process_type() == RTE_PROC_PRIMARY) {
        return rte_ring_create(name, count, socket_id, flags);
    } else {
        return rte_ring_lookup(name);
    }
}

static int
init_arp_ring(void)
{
    int j;
    char name_buf[RTE_RING_NAMESIZE];
    int queueid;

    unsigned socketid = lcore_conf.socket_id;

    /* Create rings only for the ports actually in use. */
    int nb_ports = ff_global_cfg.dpdk.nb_ports;
    for (j = 0; j < nb_ports; j++) {
        uint16_t portid = ff_global_cfg.dpdk.portid_list[j];
        struct ff_port_cfg *pconf = &ff_global_cfg.dpdk.port_cfgs[portid];
        int nb_queues = pconf->nb_lcores;
        if (arp_ring[portid] == NULL) {
            snprintf(name_buf, RTE_RING_NAMESIZE, "ring_ptr_p%d", portid);

            arp_ring[portid] = rte_zmalloc(name_buf,
                                      sizeof(struct rte_ring *) * nb_queues,
                                      RTE_CACHE_LINE_SIZE);
            if (arp_ring[portid] == NULL) {
                rte_exit(EXIT_FAILURE, "rte_zmalloc(%s (struct rte_ring*)) "
                         "failed\n", name_buf);
            }
        }

        for (queueid = 0; queueid < nb_queues; ++queueid) {
            snprintf(name_buf, RTE_RING_NAMESIZE, "arp_ring_p%d_q%d", portid, queueid);
            arp_ring[portid][queueid] = create_ring(name_buf, ARP_RING_SIZE,
                socketid, RING_F_SC_DEQ);

            if (arp_ring[portid][queueid] == NULL)
                rte_panic("create ring %s failed!\n", name_buf);

            printf("created ring %s, %u entries free\n",
                name_buf, rte_ring_free_count(arp_ring[portid][queueid]));
        }
    }

    return 0;
}

static void
ff_msg_init(struct rte_mempool *mp,
    __rte_unused void *opaque_arg,
    void *obj, __rte_unused unsigned i)
{
    struct ff_msg *msg = (struct ff_msg *)obj;
    msg->msg_type = FF_UNKNOWN;
    msg->buf_addr = (char *)msg + sizeof(struct ff_msg);
    msg->buf_len = mp->elt_size - sizeof(struct ff_msg);
}
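
/*
 * Resulting layout of each message object in the pool: the ff_msg header
 * is followed in the same mempool element by its payload buffer.
 *
 *   |<- sizeof(struct ff_msg) ->|<---------- buf_len ---------->|
 *   +---------------------------+-------------------------------+
 *   | struct ff_msg             | payload, pointed by buf_addr  |
 *   +---------------------------+-------------------------------+
 *   |<--------------------- mp->elt_size ---------------------->|
 */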

static int
init_msg_ring(void)
{
    uint16_t i;
    uint16_t nb_procs = ff_global_cfg.dpdk.nb_procs;
    unsigned socketid = lcore_conf.socket_id;

    /* Create message buffer pool */
    if (rte_eal_process_type() == RTE_PROC_PRIMARY) {
        message_pool = rte_mempool_create(FF_MSG_POOL,
           MSG_RING_SIZE * 2 * nb_procs,
           MAX_MSG_BUF_SIZE, MSG_RING_SIZE / 2, 0,
           NULL, NULL, ff_msg_init, NULL,
           socketid, 0);
    } else {
        message_pool = rte_mempool_lookup(FF_MSG_POOL);
    }

    if (message_pool == NULL) {
        rte_panic("Create msg mempool failed\n");
    }

    for (i = 0; i < nb_procs; ++i) {
        snprintf(msg_ring[i].ring_name[0], RTE_RING_NAMESIZE,
            "%s%u", FF_MSG_RING_IN, i);
        snprintf(msg_ring[i].ring_name[1], RTE_RING_NAMESIZE,
            "%s%u", FF_MSG_RING_OUT, i);

        msg_ring[i].ring[0] = create_ring(msg_ring[i].ring_name[0],
            MSG_RING_SIZE, socketid, RING_F_SP_ENQ | RING_F_SC_DEQ);
        if (msg_ring[i].ring[0] == NULL)
            rte_panic("create ring %s failed!\n", msg_ring[i].ring_name[0]);

        msg_ring[i].ring[1] = create_ring(msg_ring[i].ring_name[1],
            MSG_RING_SIZE, socketid, RING_F_SP_ENQ | RING_F_SC_DEQ);
        if (msg_ring[i].ring[1] == NULL)
            rte_panic("create ring %s failed!\n", msg_ring[i].ring_name[1]);
    }

    return 0;
}

static int
init_kni(void)
{
    int nb_ports = rte_eth_dev_count();
    kni_accept = 0;
    if (strcasecmp(ff_global_cfg.kni.method, "accept") == 0)
        kni_accept = 1;

    ff_kni_init(nb_ports, ff_global_cfg.kni.tcp_port,
        ff_global_cfg.kni.udp_port);

    unsigned socket_id = lcore_conf.socket_id;
    struct rte_mempool *mbuf_pool = pktmbuf_pool[socket_id];

    nb_ports = ff_global_cfg.dpdk.nb_ports;
    int i;
    for (i = 0; i < nb_ports; i++) {
        uint16_t port_id = ff_global_cfg.dpdk.portid_list[i];
        ff_kni_alloc(port_id, socket_id, mbuf_pool, KNI_QUEUE_SIZE);
    }

    return 0;
}

static void
set_rss_table(uint8_t port_id, uint16_t reta_size, uint16_t nb_queues)
{
    if (reta_size == 0) {
        return;
    }

    int reta_conf_size = RTE_MAX(1, reta_size / RTE_RETA_GROUP_SIZE);
    struct rte_eth_rss_reta_entry64 reta_conf[reta_conf_size];

    /* config HW indirection table */
    unsigned i, j, hash = 0;
    for (i = 0; i < reta_conf_size; i++) {
        reta_conf[i].mask = ~0ULL;
        for (j = 0; j < RTE_RETA_GROUP_SIZE; j++) {
            reta_conf[i].reta[j] = hash++ % nb_queues;
        }
    }

    if (rte_eth_dev_rss_reta_update(port_id, reta_conf, reta_size)) {
        rte_exit(EXIT_FAILURE, "port[%d], failed to update rss table\n",
            port_id);
    }
}
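
/*
 * Example of the table built above (hypothetical sizes): with
 * reta_size = 128 and nb_queues = 4, the indirection table becomes
 * 0,1,2,3,0,1,2,3,... so RSS hash values are spread round-robin
 * across the four queues.
 */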

static int
init_port_start(void)
{
    int nb_ports = ff_global_cfg.dpdk.nb_ports;
    unsigned socketid = rte_lcore_to_socket_id(rte_lcore_id());
    struct rte_mempool *mbuf_pool = pktmbuf_pool[socketid];
    uint16_t i;

    for (i = 0; i < nb_ports; i++) {
        uint16_t port_id = ff_global_cfg.dpdk.portid_list[i];
        struct ff_port_cfg *pconf = &ff_global_cfg.dpdk.port_cfgs[port_id];
        uint16_t nb_queues = pconf->nb_lcores;

        struct rte_eth_dev_info dev_info;
        rte_eth_dev_info_get(port_id, &dev_info);

        if (nb_queues > dev_info.max_rx_queues) {
            rte_exit(EXIT_FAILURE, "num_procs[%d] bigger than max_rx_queues[%d]\n",
                nb_queues,
                dev_info.max_rx_queues);
        }

        if (nb_queues > dev_info.max_tx_queues) {
            rte_exit(EXIT_FAILURE, "num_procs[%d] bigger than max_tx_queues[%d]\n",
                nb_queues,
                dev_info.max_tx_queues);
        }

        struct ether_addr addr;
        rte_eth_macaddr_get(port_id, &addr);
        printf("Port %u MAC: %02" PRIx8 " %02" PRIx8 " %02" PRIx8
                   " %02" PRIx8 " %02" PRIx8 " %02" PRIx8 "\n",
                (unsigned)port_id,
                addr.addr_bytes[0], addr.addr_bytes[1],
                addr.addr_bytes[2], addr.addr_bytes[3],
                addr.addr_bytes[4], addr.addr_bytes[5]);

        rte_memcpy(pconf->mac,
            addr.addr_bytes, ETHER_ADDR_LEN);

        /* Set txq_flags: we need neither multi-mempool nor refcounting */
        dev_info.default_txconf.txq_flags = ETH_TXQ_FLAGS_NOMULTMEMP |
            ETH_TXQ_FLAGS_NOREFCOUNT;

        /* Disable features that are not supported by port's HW */
        if (!(dev_info.tx_offload_capa & DEV_TX_OFFLOAD_UDP_CKSUM)) {
            dev_info.default_txconf.txq_flags |= ETH_TXQ_FLAGS_NOXSUMUDP;
        }

        if (!(dev_info.tx_offload_capa & DEV_TX_OFFLOAD_TCP_CKSUM)) {
            dev_info.default_txconf.txq_flags |= ETH_TXQ_FLAGS_NOXSUMTCP;
        }

        if (!(dev_info.tx_offload_capa & DEV_TX_OFFLOAD_SCTP_CKSUM)) {
            dev_info.default_txconf.txq_flags |= ETH_TXQ_FLAGS_NOXSUMSCTP;
        }

        if (!(dev_info.tx_offload_capa & DEV_TX_OFFLOAD_VLAN_INSERT)) {
            dev_info.default_txconf.txq_flags |= ETH_TXQ_FLAGS_NOVLANOFFL;
        }

        if (!(dev_info.tx_offload_capa & DEV_TX_OFFLOAD_TCP_TSO) &&
            !(dev_info.tx_offload_capa & DEV_TX_OFFLOAD_UDP_TSO)) {
            dev_info.default_txconf.txq_flags |= ETH_TXQ_FLAGS_NOMULTSEGS;
        }

        struct rte_eth_conf port_conf = {0};

        /* Set RSS mode */
        port_conf.rxmode.mq_mode = ETH_MQ_RX_RSS;
        port_conf.rx_adv_conf.rss_conf.rss_hf = ETH_RSS_PROTO_MASK;
        port_conf.rx_adv_conf.rss_conf.rss_key = default_rsskey_40bytes;
        port_conf.rx_adv_conf.rss_conf.rss_key_len = 40;

        /* Set Rx VLAN stripping */
        if (ff_global_cfg.dpdk.vlan_strip) {
            if (dev_info.rx_offload_capa & DEV_RX_OFFLOAD_VLAN_STRIP) {
                port_conf.rxmode.hw_vlan_strip = 1;
            }
        }

        /* Enable HW CRC stripping */
        port_conf.rxmode.hw_strip_crc = 1;

        /* FIXME: Enable TCP LRO? */
        #if 0
        if (dev_info.rx_offload_capa & DEV_RX_OFFLOAD_TCP_LRO) {
            printf("LRO is supported\n");
            port_conf.rxmode.enable_lro = 1;
            pconf->hw_features.rx_lro = 1;
        }
        #endif

        /* Set Rx checksum checking */
        if ((dev_info.rx_offload_capa & DEV_RX_OFFLOAD_IPV4_CKSUM) &&
            (dev_info.rx_offload_capa & DEV_RX_OFFLOAD_UDP_CKSUM) &&
            (dev_info.rx_offload_capa & DEV_RX_OFFLOAD_TCP_CKSUM)) {
            printf("RX checksum offload supported\n");
            port_conf.rxmode.hw_ip_checksum = 1;
            pconf->hw_features.rx_csum = 1;
        }

        if ((dev_info.tx_offload_capa & DEV_TX_OFFLOAD_IPV4_CKSUM)) {
            printf("TX ip checksum offload supported\n");
            pconf->hw_features.tx_csum_ip = 1;
        }

        if ((dev_info.tx_offload_capa & DEV_TX_OFFLOAD_UDP_CKSUM) &&
            (dev_info.tx_offload_capa & DEV_TX_OFFLOAD_TCP_CKSUM)) {
            printf("TX TCP&UDP checksum offload supported\n");
            pconf->hw_features.tx_csum_l4 = 1;
        }

        if (ff_global_cfg.dpdk.tso) {
            if (dev_info.tx_offload_capa & DEV_TX_OFFLOAD_TCP_TSO) {
                printf("TSO is supported\n");
                pconf->hw_features.tx_tso = 1;
            }
        } else {
            printf("TSO is disabled\n");
        }

        if (dev_info.reta_size) {
            /* reta size must be power of 2 */
            assert((dev_info.reta_size & (dev_info.reta_size - 1)) == 0);

            rss_reta_size[port_id] = dev_info.reta_size;
            printf("port[%d]: rss table size: %d\n", port_id,
                dev_info.reta_size);
        }

        if (rte_eal_process_type() != RTE_PROC_PRIMARY) {
            continue;
        }

        int ret = rte_eth_dev_configure(port_id, nb_queues, nb_queues, &port_conf);
        if (ret != 0) {
            return ret;
        }
        uint16_t q;
        for (q = 0; q < nb_queues; q++) {
            ret = rte_eth_tx_queue_setup(port_id, q, TX_QUEUE_SIZE,
                socketid, &dev_info.default_txconf);
            if (ret < 0) {
                return ret;
            }

            ret = rte_eth_rx_queue_setup(port_id, q, RX_QUEUE_SIZE,
                socketid, &dev_info.default_rxconf, mbuf_pool);
            if (ret < 0) {
                return ret;
            }
        }

        ret = rte_eth_dev_start(port_id);
        if (ret < 0) {
            return ret;
        }

        if (nb_queues > 1) {
            /* set HW rss hash function to Toeplitz. */
            if (!rte_eth_dev_filter_supported(port_id, RTE_ETH_FILTER_HASH)) {
                struct rte_eth_hash_filter_info info = {0};
                info.info_type = RTE_ETH_HASH_FILTER_GLOBAL_CONFIG;
                info.info.global_conf.hash_func = RTE_ETH_HASH_FUNCTION_TOEPLITZ;

                if (rte_eth_dev_filter_ctrl(port_id, RTE_ETH_FILTER_HASH,
                    RTE_ETH_FILTER_SET, &info) < 0) {
                    rte_exit(EXIT_FAILURE, "port[%d] set hash func failed\n",
                        port_id);
                }
            }

            set_rss_table(port_id, dev_info.reta_size, nb_queues);
        }

        /* Enable RX in promiscuous mode for the Ethernet device. */
        if (ff_global_cfg.dpdk.promiscuous) {
            rte_eth_promiscuous_enable(port_id);
            ret = rte_eth_promiscuous_get(port_id);
            if (ret == 1) {
                printf("set port %u to promiscuous mode ok\n", port_id);
            } else {
                printf("set port %u to promiscuous mode error\n", port_id);
            }
        }

        /* Enable pcap dump */
        if (pconf->pcap) {
            ff_enable_pcap(pconf->pcap);
        }
    }

    if (rte_eal_process_type() == RTE_PROC_PRIMARY) {
        check_all_ports_link_status();
    }

    return 0;
}

static int
init_clock(void)
{
    rte_timer_subsystem_init();
    uint64_t hz = rte_get_timer_hz();
    uint64_t intrs = MS_PER_S / ff_global_cfg.freebsd.hz;
    uint64_t tsc = (hz + MS_PER_S - 1) / MS_PER_S * intrs;

    rte_timer_init(&freebsd_clock);
    rte_timer_reset(&freebsd_clock, tsc, PERIODICAL,
        rte_lcore_id(), &ff_hardclock_job, NULL);

    ff_update_current_ts();

    return 0;
}
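
/*
 * Worked example of the timer period above (hypothetical numbers):
 * with freebsd.hz = 100, intrs = 1000/100 = 10 ms per tick; on a
 * 2 GHz TSC, tsc = (2e9/1000) * 10 = 20,000,000 cycles, i.e.
 * ff_hardclock_job fires every 10 ms.
 */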

int
ff_dpdk_init(int argc, char **argv)
{
    if (ff_global_cfg.dpdk.nb_procs < 1 ||
        ff_global_cfg.dpdk.nb_procs > RTE_MAX_LCORE ||
        ff_global_cfg.dpdk.proc_id >= ff_global_cfg.dpdk.nb_procs ||
        ff_global_cfg.dpdk.proc_id < 0) {
        printf("param num_procs[%d] or proc_id[%d] error!\n",
            ff_global_cfg.dpdk.nb_procs,
            ff_global_cfg.dpdk.proc_id);
        exit(1);
    }

    int ret = rte_eal_init(argc, argv);
    if (ret < 0) {
        rte_exit(EXIT_FAILURE, "Error with EAL initialization\n");
    }

    numa_on = ff_global_cfg.dpdk.numa_on;

    init_lcore_conf();

    init_mem_pool();

    init_arp_ring();

    init_msg_ring();

    enable_kni = ff_global_cfg.kni.enable;
    if (enable_kni) {
        init_kni();
    }

    ret = init_port_start();
    if (ret < 0) {
        rte_exit(EXIT_FAILURE, "init_port_start failed\n");
    }

    init_clock();

    return 0;
}

static void
ff_veth_input(const struct ff_dpdk_if_context *ctx, struct rte_mbuf *pkt)
{
    uint8_t rx_csum = ctx->hw_features.rx_csum;
    if (rx_csum) {
        if (pkt->ol_flags & (PKT_RX_IP_CKSUM_BAD | PKT_RX_L4_CKSUM_BAD)) {
            /* drop packets with a bad checksum and free the mbuf */
            rte_pktmbuf_free(pkt);
            return;
        }
    }

    /*
     * FIXME: should we save pkt->vlan_tci
     * if (pkt->ol_flags & PKT_RX_VLAN_PKT)
     */

    void *data = rte_pktmbuf_mtod(pkt, void*);
    uint16_t len = rte_pktmbuf_data_len(pkt);

    void *hdr = ff_mbuf_gethdr(pkt, pkt->pkt_len, data, len, rx_csum);
    if (hdr == NULL) {
        rte_pktmbuf_free(pkt);
        return;
    }

    struct rte_mbuf *pn = pkt->next;
    void *prev = hdr;
    while (pn != NULL) {
        /* walk the segment chain: read each segment, not the head mbuf */
        data = rte_pktmbuf_mtod(pn, void*);
        len = rte_pktmbuf_data_len(pn);

        void *mb = ff_mbuf_get(prev, data, len);
        if (mb == NULL) {
            ff_mbuf_free(hdr);
            rte_pktmbuf_free(pkt);
            return;
        }
        pn = pn->next;
        prev = mb;
    }

    ff_veth_process_packet(ctx->ifp, hdr);
}

static enum FilterReturn
protocol_filter(const void *data, uint16_t len)
{
    if (len < sizeof(struct ether_hdr))
        return FILTER_UNKNOWN;

    const struct ether_hdr *hdr;
    hdr = (const struct ether_hdr *)data;

    if (ntohs(hdr->ether_type) == ETHER_TYPE_ARP)
        return FILTER_ARP;

    if (!enable_kni) {
        return FILTER_UNKNOWN;
    }

    if (ntohs(hdr->ether_type) != ETHER_TYPE_IPv4)
        return FILTER_UNKNOWN;

    return ff_kni_proto_filter(data + sizeof(struct ether_hdr),
        len - sizeof(struct ether_hdr));
}

static inline void
process_packets(uint8_t port_id, uint16_t queue_id, struct rte_mbuf **bufs,
    uint16_t count, const struct ff_dpdk_if_context *ctx, int pkts_from_ring)
{
    struct lcore_conf *qconf = &lcore_conf;

    uint16_t i;
    for (i = 0; i < count; i++) {
        struct rte_mbuf *rtem = bufs[i];

        if (unlikely(qconf->pcap[port_id] != NULL)) {
            ff_dump_packets(qconf->pcap[port_id], rtem);
        }

        void *data = rte_pktmbuf_mtod(rtem, void*);
        uint16_t len = rte_pktmbuf_data_len(rtem);

        enum FilterReturn filter = protocol_filter(data, len);
        if (filter == FILTER_ARP) {
            struct rte_mempool *mbuf_pool;
            struct rte_mbuf *mbuf_clone;
            if (pkts_from_ring == 0) {
                /* Broadcast the ARP packet to every other queue of the port. */
                uint16_t q;
                uint16_t nb_queues = qconf->nb_queue_list[port_id];
                for (q = 0; q < nb_queues; ++q) {
                    if (q == queue_id)
                        continue;

                    unsigned socket_id = 0;
                    if (numa_on) {
                        uint16_t lcore_id = qconf->port_cfgs[port_id].lcore_list[q];
                        socket_id = rte_lcore_to_socket_id(lcore_id);
                    }
                    mbuf_pool = pktmbuf_pool[socket_id];
                    mbuf_clone = rte_pktmbuf_clone(rtem, mbuf_pool);
                    if (mbuf_clone) {
                        int ret = rte_ring_enqueue(arp_ring[port_id][q], mbuf_clone);
                        if (ret < 0)
                            rte_pktmbuf_free(mbuf_clone);
                    }
                }
            }

            if (enable_kni && rte_eal_process_type() == RTE_PROC_PRIMARY) {
                mbuf_pool = pktmbuf_pool[qconf->socket_id];
                mbuf_clone = rte_pktmbuf_clone(rtem, mbuf_pool);
                if (mbuf_clone) {
                    ff_kni_enqueue(port_id, mbuf_clone);
                }
            }

            ff_veth_input(ctx, rtem);
        } else if (enable_kni && ((filter == FILTER_KNI && kni_accept) ||
            (filter == FILTER_UNKNOWN && !kni_accept))) {
            ff_kni_enqueue(port_id, rtem);
        } else {
            ff_veth_input(ctx, rtem);
        }
    }
}

static inline int
process_arp_ring(uint8_t port_id, uint16_t queue_id,
    struct rte_mbuf **pkts_burst, const struct ff_dpdk_if_context *ctx)
{
    /* Read packets from the ring and process them. */
    uint16_t nb_rb;
    nb_rb = rte_ring_dequeue_burst(arp_ring[port_id][queue_id],
        (void **)pkts_burst, MAX_PKT_BURST);

    if (nb_rb > 0) {
        process_packets(port_id, queue_id, pkts_burst, nb_rb, ctx, 1);
    }

    return 0;
}

static inline void
handle_sysctl_msg(struct ff_msg *msg, uint16_t proc_id)
{
    int ret = ff_sysctl(msg->sysctl.name, msg->sysctl.namelen,
        msg->sysctl.old, msg->sysctl.oldlenp, msg->sysctl.new,
        msg->sysctl.newlen);

    if (ret < 0) {
        msg->result = errno;
    } else {
        msg->result = 0;
    }

    rte_ring_enqueue(msg_ring[proc_id].ring[1], msg);
}

static inline void
handle_ioctl_msg(struct ff_msg *msg, uint16_t proc_id)
{
    int fd, ret;
    fd = ff_socket(AF_INET, SOCK_DGRAM, 0);
    if (fd < 0) {
        ret = -1;
        goto done;
    }

    ret = ff_ioctl(fd, msg->ioctl.cmd, msg->ioctl.data);

    ff_close(fd);

done:
    if (ret < 0) {
        msg->result = errno;
    } else {
        msg->result = 0;
    }

    rte_ring_enqueue(msg_ring[proc_id].ring[1], msg);
}

static inline void
handle_route_msg(struct ff_msg *msg, uint16_t proc_id)
{
    int ret = ff_rtioctl(msg->route.fib, msg->route.data,
        &msg->route.len, msg->route.maxlen);
    if (ret < 0) {
        msg->result = errno;
    } else {
        msg->result = 0;
    }

    rte_ring_enqueue(msg_ring[proc_id].ring[1], msg);
}

static struct ff_top_args ff_status;
static inline void
handle_top_msg(struct ff_msg *msg, uint16_t proc_id)
{
    msg->top = ff_status;
    msg->result = 0;

    rte_ring_enqueue(msg_ring[proc_id].ring[1], msg);
}

#ifdef FF_NETGRAPH
static inline void
handle_ngctl_msg(struct ff_msg *msg, uint16_t proc_id)
{
    int ret = ff_ngctl(msg->ngctl.cmd, msg->ngctl.data);
    if (ret < 0) {
        msg->result = errno;
    } else {
        msg->result = 0;
        msg->ngctl.ret = ret;
    }

    rte_ring_enqueue(msg_ring[proc_id].ring[1], msg);
}
#endif

static inline void
handle_default_msg(struct ff_msg *msg, uint16_t proc_id)
{
    msg->result = ENOTSUP;
    rte_ring_enqueue(msg_ring[proc_id].ring[1], msg);
}

static inline void
handle_msg(struct ff_msg *msg, uint16_t proc_id)
{
    switch (msg->msg_type) {
        case FF_SYSCTL:
            handle_sysctl_msg(msg, proc_id);
            break;
        case FF_IOCTL:
            handle_ioctl_msg(msg, proc_id);
            break;
        case FF_ROUTE:
            handle_route_msg(msg, proc_id);
            break;
        case FF_TOP:
            handle_top_msg(msg, proc_id);
            break;
#ifdef FF_NETGRAPH
        case FF_NGCTL:
            handle_ngctl_msg(msg, proc_id);
            break;
#endif
        default:
            handle_default_msg(msg, proc_id);
            break;
    }
}

static inline int
process_msg_ring(uint16_t proc_id)
{
    void *msg;
    int ret = rte_ring_dequeue(msg_ring[proc_id].ring[0], &msg);

    if (unlikely(ret == 0)) {
        handle_msg((struct ff_msg *)msg, proc_id);
    }

    return 0;
}

/* Send burst of packets on an output interface */
static inline int
send_burst(struct lcore_conf *qconf, uint16_t n, uint8_t port)
{
    struct rte_mbuf **m_table;
    int ret;
    uint16_t queueid;

    queueid = qconf->tx_queue_id[port];
    m_table = (struct rte_mbuf **)qconf->tx_mbufs[port].m_table;

    if (unlikely(qconf->pcap[port] != NULL)) {
        uint16_t i;
        for (i = 0; i < n; i++) {
            ff_dump_packets(qconf->pcap[port], m_table[i]);
        }
    }

    ret = rte_eth_tx_burst(port, queueid, m_table, n);
    if (unlikely(ret < n)) {
        /* free the mbufs the NIC could not accept */
        do {
            rte_pktmbuf_free(m_table[ret]);
        } while (++ret < n);
    }

    return 0;
}

/* Enqueue a single packet, and send burst if queue is filled */
static inline int
send_single_packet(struct rte_mbuf *m, uint8_t port)
{
    uint16_t len;
    struct lcore_conf *qconf;

    qconf = &lcore_conf;
    len = qconf->tx_mbufs[port].len;
    qconf->tx_mbufs[port].m_table[len] = m;
    len++;

    /* enough pkts to be sent */
    if (unlikely(len == MAX_PKT_BURST)) {
        send_burst(qconf, MAX_PKT_BURST, port);
        len = 0;
    }

    qconf->tx_mbufs[port].len = len;
    return 0;
}

int
ff_dpdk_if_send(struct ff_dpdk_if_context *ctx, void *m,
    int total)
{
    struct rte_mempool *mbuf_pool = pktmbuf_pool[lcore_conf.socket_id];
    struct rte_mbuf *head = rte_pktmbuf_alloc(mbuf_pool);
    if (head == NULL) {
        ff_mbuf_free(m);
        return -1;
    }

    head->pkt_len = total;
    head->nb_segs = 0;

    int off = 0;
    struct rte_mbuf *cur = head, *prev = NULL;
    while (total > 0) {
        if (cur == NULL) {
            cur = rte_pktmbuf_alloc(mbuf_pool);
            if (cur == NULL) {
                rte_pktmbuf_free(head);
                ff_mbuf_free(m);
                return -1;
            }
        }

        void *data = rte_pktmbuf_mtod(cur, void*);
        int len = total > RTE_MBUF_DEFAULT_DATAROOM ? RTE_MBUF_DEFAULT_DATAROOM : total;
        int ret = ff_mbuf_copydata(m, data, off, len);
        if (ret < 0) {
            rte_pktmbuf_free(head);
            ff_mbuf_free(m);
            return -1;
        }

        if (prev != NULL) {
            prev->next = cur;
        }
        prev = cur;

        cur->data_len = len;
        off += len;
        total -= len;
        head->nb_segs++;
        cur = NULL;
    }

    struct ff_tx_offload offload = {0};
    ff_mbuf_tx_offload(m, &offload);

    if (offload.ip_csum) {
        head->ol_flags |= PKT_TX_IP_CKSUM;
        head->l2_len = sizeof(struct ether_hdr);
        head->l3_len = sizeof(struct ipv4_hdr);
    }

    if (ctx->hw_features.tx_csum_l4) {
        if (offload.tcp_csum) {
            head->ol_flags |= PKT_TX_TCP_CKSUM;
            head->l2_len = sizeof(struct ether_hdr);
            head->l3_len = sizeof(struct ipv4_hdr);
        }

        if (offload.tso_seg_size) {
            head->ol_flags |= PKT_TX_TCP_SEG;
            head->l4_len = sizeof(struct tcp_hdr);
            head->tso_segsz = offload.tso_seg_size;
        }

        if (offload.udp_csum) {
            head->ol_flags |= PKT_TX_UDP_CKSUM;
            head->l2_len = sizeof(struct ether_hdr);
            head->l3_len = sizeof(struct ipv4_hdr);
        }
    }

    ff_mbuf_free(m);

    return send_single_packet(head, ctx->port_id);
}

static int
main_loop(void *arg)
{
    struct loop_routine *lr = (struct loop_routine *)arg;

    struct rte_mbuf *pkts_burst[MAX_PKT_BURST];
    unsigned lcore_id;
    uint64_t prev_tsc, diff_tsc, cur_tsc, usch_tsc, div_tsc, usr_tsc, sys_tsc, end_tsc;
    int i, j, nb_rx, idle;
    uint8_t port_id, queue_id;
    struct lcore_conf *qconf;
    const uint64_t drain_tsc = (rte_get_tsc_hz() + US_PER_S - 1) /
        US_PER_S * BURST_TX_DRAIN_US;
    struct ff_dpdk_if_context *ctx;

    prev_tsc = 0;
    usch_tsc = 0;

    lcore_id = rte_lcore_id();
    qconf = &lcore_conf;

    if (qconf->nb_rx_queue == 0) {
        printf("lcore %u has nothing to do\n", lcore_id);
        return 0;
    }

    while (1) {
        cur_tsc = rte_rdtsc();
        if (unlikely(freebsd_clock.expire < cur_tsc)) {
            rte_timer_manage();
        }

        idle = 1;
        sys_tsc = 0;
        usr_tsc = 0;

        /*
         * TX burst queue drain
         */
        diff_tsc = cur_tsc - prev_tsc;
        if (unlikely(diff_tsc > drain_tsc)) {
            for (i = 0; i < qconf->nb_tx_port; i++) {
                port_id = qconf->tx_port_id[i];
                if (qconf->tx_mbufs[port_id].len == 0)
                    continue;

                idle = 0;

                send_burst(qconf,
                    qconf->tx_mbufs[port_id].len,
                    port_id);
                qconf->tx_mbufs[port_id].len = 0;
            }

            prev_tsc = cur_tsc;
        }

        /*
         * Read packet from RX queues
         */
        for (i = 0; i < qconf->nb_rx_queue; ++i) {
            port_id = qconf->rx_queue_list[i].port_id;
            queue_id = qconf->rx_queue_list[i].queue_id;
            ctx = veth_ctx[port_id];

            if (enable_kni && rte_eal_process_type() == RTE_PROC_PRIMARY) {
                ff_kni_process(port_id, queue_id, pkts_burst, MAX_PKT_BURST);
            }

            process_arp_ring(port_id, queue_id, pkts_burst, ctx);

            nb_rx = rte_eth_rx_burst(port_id, queue_id, pkts_burst,
                MAX_PKT_BURST);
            if (nb_rx == 0)
                continue;

            idle = 0;

            /* Prefetch first packets */
            for (j = 0; j < PREFETCH_OFFSET && j < nb_rx; j++) {
                rte_prefetch0(rte_pktmbuf_mtod(
                        pkts_burst[j], void *));
            }

            /* Prefetch and handle already prefetched packets */
            for (j = 0; j < (nb_rx - PREFETCH_OFFSET); j++) {
                rte_prefetch0(rte_pktmbuf_mtod(pkts_burst[
                        j + PREFETCH_OFFSET], void *));
                process_packets(port_id, queue_id, &pkts_burst[j], 1, ctx, 0);
            }

            /* Handle remaining prefetched packets */
            for (; j < nb_rx; j++) {
                process_packets(port_id, queue_id, &pkts_burst[j], 1, ctx, 0);
            }
        }

        process_msg_ring(qconf->proc_id);

        div_tsc = rte_rdtsc();

        if (likely(lr->loop != NULL && (!idle || cur_tsc - usch_tsc > drain_tsc))) {
            usch_tsc = cur_tsc;
            lr->loop(lr->arg);
        }

        end_tsc = rte_rdtsc();

        if (usch_tsc == cur_tsc) {
            usr_tsc = end_tsc - div_tsc;
        }

        if (!idle) {
            sys_tsc = div_tsc - cur_tsc;
            ff_status.sys_tsc += sys_tsc;
        }

        ff_status.usr_tsc += usr_tsc;
        ff_status.work_tsc += end_tsc - cur_tsc;
        ff_status.idle_tsc += end_tsc - cur_tsc - usr_tsc - sys_tsc;

        ff_status.loops++;
    }
}

int
ff_dpdk_if_up(void) {
    int i;
    struct lcore_conf *qconf = &lcore_conf;
    for (i = 0; i < qconf->nb_tx_port; i++) {
        uint16_t port_id = qconf->tx_port_id[i];

        struct ff_port_cfg *pconf = &qconf->port_cfgs[port_id];
        veth_ctx[port_id] = ff_veth_attach(pconf);
        if (veth_ctx[port_id] == NULL) {
            rte_exit(EXIT_FAILURE, "ff_veth_attach failed\n");
        }
    }

    return 0;
}

void
ff_dpdk_run(loop_func_t loop, void *arg) {
    struct loop_routine *lr = rte_malloc(NULL,
        sizeof(struct loop_routine), 0);
    if (lr == NULL) {
        rte_exit(EXIT_FAILURE, "rte_malloc loop_routine failed\n");
    }
    lr->loop = loop;
    lr->arg = arg;
    rte_eal_mp_remote_launch(main_loop, lr, CALL_MASTER);
    rte_eal_mp_wait_lcore();
    rte_free(lr);
}

void
ff_dpdk_pktmbuf_free(void *m)
{
    rte_pktmbuf_free((struct rte_mbuf *)m);
}

static uint32_t
toeplitz_hash(unsigned keylen, const uint8_t *key,
    unsigned datalen, const uint8_t *data)
{
    uint32_t hash = 0, v;
    u_int i, b;

    /* XXXRW: Perhaps an assertion about key length vs. data length? */

    v = (key[0]<<24) + (key[1]<<16) + (key[2]<<8) + key[3];
    for (i = 0; i < datalen; i++) {
        for (b = 0; b < 8; b++) {
            if (data[i] & (1<<(7-b)))
                hash ^= v;
            v <<= 1;
            if ((i + 4) < keylen &&
                (key[i+4] & (1<<(7-b))))
                v |= 1;
        }
    }
    return (hash);
}
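
/*
 * Worked example of how ff_rss_check (below) maps this hash to a queue
 * (hypothetical values): the hash is masked into the indirection table
 * with (hash & (reta_size - 1)), valid because reta_size is a power of
 * two, and since set_rss_table() filled the table round-robin the owning
 * queue is that index modulo nb_queues. E.g. hash = 0x12345678 with
 * reta_size = 128 and nb_queues = 4: 0x78 & 0x7f = 120, 120 % 4 = 0,
 * so only queue 0 may use this tuple.
 */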

int
ff_rss_check(void *softc, uint32_t saddr, uint32_t daddr,
    uint16_t sport, uint16_t dport)
{
    struct lcore_conf *qconf = &lcore_conf;
    struct ff_dpdk_if_context *ctx = ff_veth_softc_to_hostc(softc);
    uint16_t nb_queues = qconf->nb_queue_list[ctx->port_id];

    if (nb_queues <= 1) {
        return 1;
    }

    uint16_t reta_size = rss_reta_size[ctx->port_id];
    uint16_t queueid = qconf->tx_queue_id[ctx->port_id];

    uint8_t data[sizeof(saddr) + sizeof(daddr) + sizeof(sport) +
        sizeof(dport)];

    unsigned datalen = 0;

    bcopy(&saddr, &data[datalen], sizeof(saddr));
    datalen += sizeof(saddr);

    bcopy(&daddr, &data[datalen], sizeof(daddr));
    datalen += sizeof(daddr);

    bcopy(&sport, &data[datalen], sizeof(sport));
    datalen += sizeof(sport);

    bcopy(&dport, &data[datalen], sizeof(dport));
    datalen += sizeof(dport);

    uint32_t hash = toeplitz_hash(sizeof(default_rsskey_40bytes),
        default_rsskey_40bytes, datalen, data);

    return ((hash & (reta_size - 1)) % nb_queues) == queueid;
}
1449