xref: /f-stack/lib/ff_dpdk_if.c (revision c36e692a)
/*
 * Copyright (C) 2017 THL A29 Limited, a Tencent company.
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions are met:
 *
 * 1. Redistributions of source code must retain the above copyright notice, this
 *   list of conditions and the following disclaimer.
 * 2. Redistributions in binary form must reproduce the above copyright notice,
 *   this list of conditions and the following disclaimer in the documentation
 *   and/or other materials provided with the distribution.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
 * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
 * DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR
 * ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
 * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
 * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
 * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
 * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 *
 */
/*
 * Standard headers for printf/exit, calloc/free, memset, strcasecmp,
 * errno, ntohs and bcopy used directly in this file (they may also be
 * pulled in transitively via the ff_* headers).
 */
#include <assert.h>
#include <errno.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <strings.h>
#include <arpa/inet.h>

#include <rte_common.h>
#include <rte_byteorder.h>
#include <rte_log.h>
#include <rte_memory.h>
#include <rte_memcpy.h>
#include <rte_memzone.h>
#include <rte_config.h>
#include <rte_eal.h>
#include <rte_pci.h>
#include <rte_mbuf.h>
#include <rte_lcore.h>
#include <rte_launch.h>
#include <rte_ethdev.h>
#include <rte_debug.h>
#include <rte_ether.h>
#include <rte_malloc.h>
#include <rte_cycles.h>
#include <rte_timer.h>
#include <rte_thash.h>
#include <rte_ip.h>
#include <rte_tcp.h>
#include <rte_udp.h>
#include "ff_dpdk_if.h"
#include "ff_dpdk_pcap.h"
#include "ff_dpdk_kni.h"
#include "ff_config.h"
#include "ff_veth.h"
#include "ff_host_interface.h"
#include "ff_msg.h"
#include "ff_api.h"

#define MEMPOOL_CACHE_SIZE 256

#define ARP_RING_SIZE 2048

#define MSG_RING_SIZE 32

/*
 * Configurable number of RX/TX ring descriptors
 */
#define RX_QUEUE_SIZE 512
#define TX_QUEUE_SIZE 512

#define MAX_PKT_BURST 32
#define BURST_TX_DRAIN_US 100 /* TX drain every ~100us */

/*
 * Try to avoid TX buffering if we have at least MAX_TX_BURST packets to send.
 */
#define MAX_TX_BURST    (MAX_PKT_BURST / 2)

#define NB_SOCKETS 8

/* Configure how many packets ahead to prefetch when reading packets */
#define PREFETCH_OFFSET    3

#define MAX_RX_QUEUE_PER_LCORE 16
#define MAX_TX_QUEUE_PER_PORT RTE_MAX_ETHPORTS
#define MAX_RX_QUEUE_PER_PORT 128

#define KNI_MBUF_MAX 2048
#define KNI_QUEUE_SIZE 2048

static int enable_kni;
static int kni_accept;

static int numa_on;

static struct rte_timer freebsd_clock;

/* Default RSS hash key, taken from the Mellanox Linux driver. */
static uint8_t default_rsskey_40bytes[40] = {
    0xd1, 0x81, 0xc6, 0x2c, 0xf7, 0xf4, 0xdb, 0x5b,
    0x19, 0x83, 0xa2, 0xfc, 0x94, 0x3e, 0x1a, 0xdb,
    0xd9, 0x38, 0x9e, 0x6b, 0xd1, 0x03, 0x9c, 0x2c,
    0xa7, 0x44, 0x99, 0xad, 0x59, 0x3d, 0x56, 0xd9,
    0xf3, 0x25, 0x3c, 0x06, 0x2a, 0xdc, 0x1f, 0xfc
};
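
/*
 * Note: ff_rss_check() at the bottom of this file recomputes the Toeplitz
 * hash in software with this same key, so it must match the key programmed
 * into the NIC via rss_conf below for the queue-ownership check to hold.
 */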

static struct rte_eth_conf default_port_conf = {
    .rxmode = {
        .mq_mode = ETH_MQ_RX_RSS,
        .max_rx_pkt_len = ETHER_MAX_LEN,
        .split_hdr_size = 0, /**< hdr buf size */
        .header_split   = 0, /**< Header Split disabled */
        .hw_ip_checksum = 0, /**< IP checksum offload disabled */
        .hw_vlan_filter = 0, /**< VLAN filtering disabled */
        .hw_vlan_strip  = 0, /**< VLAN strip disabled. */
        .hw_vlan_extend = 0, /**< Extended VLAN disabled. */
        .jumbo_frame    = 0, /**< Jumbo Frame Support disabled */
        .hw_strip_crc   = 0, /**< CRC stripped by hardware */
        .enable_lro     = 0, /**< LRO disabled */
    },
    .rx_adv_conf = {
        .rss_conf = {
            .rss_key = default_rsskey_40bytes,
            .rss_key_len = 40,
            .rss_hf = ETH_RSS_PROTO_MASK,
        },
    },
    .txmode = {
        .mq_mode = ETH_MQ_TX_NONE,
    },
};

struct mbuf_table {
    uint16_t len;
    struct rte_mbuf *m_table[MAX_PKT_BURST];
};

struct lcore_rx_queue {
    uint8_t port_id;
    uint8_t queue_id;
} __rte_cache_aligned;

struct lcore_conf {
    uint16_t proc_id;
    uint16_t socket_id;
    uint16_t nb_queue_list[RTE_MAX_ETHPORTS];
    struct ff_port_cfg *port_cfgs;

    uint16_t nb_rx_queue;
    struct lcore_rx_queue rx_queue_list[MAX_RX_QUEUE_PER_LCORE];
    uint16_t nb_tx_port;
    uint16_t tx_port_id[RTE_MAX_ETHPORTS];
    uint16_t tx_queue_id[RTE_MAX_ETHPORTS];
    struct mbuf_table tx_mbufs[RTE_MAX_ETHPORTS];
    char *pcap[RTE_MAX_ETHPORTS];
} __rte_cache_aligned;

static struct lcore_conf lcore_conf;

static struct rte_mempool *pktmbuf_pool[NB_SOCKETS];

static struct rte_ring **arp_ring[RTE_MAX_ETHPORTS];

static uint16_t rss_reta_size[RTE_MAX_ETHPORTS];

struct ff_msg_ring {
    char ring_name[2][RTE_RING_NAMESIZE];
    /* ring[0]: this lcore dequeues msgs that other processes enqueue */
    /* ring[1]: this lcore enqueues replies that other processes dequeue */
    struct rte_ring *ring[2];
} __rte_cache_aligned;

static struct ff_msg_ring msg_ring[RTE_MAX_LCORE];
static struct rte_mempool *message_pool;

struct ff_dpdk_if_context {
    void *sc;
    void *ifp;
    uint16_t port_id;
    struct ff_hw_features hw_features;
} __rte_cache_aligned;

static struct ff_dpdk_if_context *veth_ctx[RTE_MAX_ETHPORTS];

extern void ff_hardclock(void);

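/*
 * Invoked from rte_timer_manage() in main_loop(): drives the FreeBSD
 * stack's hardclock tick and refreshes its cached current timestamp.
 */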
static void
ff_hardclock_job(__rte_unused struct rte_timer *timer,
    __rte_unused void *arg) {
    ff_hardclock();
    ff_update_current_ts();
}

struct ff_dpdk_if_context *
ff_dpdk_register_if(void *sc, void *ifp, struct ff_port_cfg *cfg)
{
    struct ff_dpdk_if_context *ctx;

    ctx = calloc(1, sizeof(struct ff_dpdk_if_context));
    if (ctx == NULL)
        return NULL;

    ctx->sc = sc;
    ctx->ifp = ifp;
    ctx->port_id = cfg->port_id;
    ctx->hw_features = cfg->hw_features;

    return ctx;
}

void
ff_dpdk_deregister_if(struct ff_dpdk_if_context *ctx)
{
    free(ctx);
}

static void
check_all_ports_link_status(void)
{
    #define CHECK_INTERVAL 100 /* 100ms */
    #define MAX_CHECK_TIME 90  /* 9s (90 * 100ms) in total */

    uint8_t count, all_ports_up, print_flag = 0;
    struct rte_eth_link link;

    printf("\nChecking link status");
    fflush(stdout);

    int i, nb_ports;
    nb_ports = ff_global_cfg.dpdk.nb_ports;
    for (count = 0; count <= MAX_CHECK_TIME; count++) {
        all_ports_up = 1;
        for (i = 0; i < nb_ports; i++) {
            uint8_t portid = ff_global_cfg.dpdk.portid_list[i];
            memset(&link, 0, sizeof(link));
            rte_eth_link_get_nowait(portid, &link);

            /* print link status if flag set */
            if (print_flag == 1) {
                if (link.link_status) {
                    printf("Port %d Link Up - speed %u "
                        "Mbps - %s\n", (int)portid,
                        (unsigned)link.link_speed,
                        (link.link_duplex == ETH_LINK_FULL_DUPLEX) ?
                        ("full-duplex") : ("half-duplex"));
                } else {
                    printf("Port %d Link Down\n", (int)portid);
                }
                continue;
            }
            /* clear all_ports_up flag if any link down */
            if (link.link_status == 0) {
                all_ports_up = 0;
                break;
            }
        }

        /* after finally printing all link status, get out */
        if (print_flag == 1)
            break;

        if (all_ports_up == 0) {
            printf(".");
            fflush(stdout);
            rte_delay_ms(CHECK_INTERVAL);
        }

        /* set the print_flag if all ports up or timeout */
        if (all_ports_up == 1 || count == (MAX_CHECK_TIME - 1)) {
            print_flag = 1;
            printf("done\n");
        }
    }
}

static int
init_lcore_conf(void)
{
    uint8_t nb_dev_ports = rte_eth_dev_count();
    if (nb_dev_ports == 0) {
        rte_exit(EXIT_FAILURE, "No probed ethernet devices\n");
    }

    if (ff_global_cfg.dpdk.max_portid >= nb_dev_ports) {
        rte_exit(EXIT_FAILURE, "this machine doesn't have port %d.\n",
                 ff_global_cfg.dpdk.max_portid);
    }

    lcore_conf.port_cfgs = ff_global_cfg.dpdk.port_cfgs;
    lcore_conf.proc_id = ff_global_cfg.dpdk.proc_id;

    uint16_t proc_id;
    for (proc_id = 0; proc_id < ff_global_cfg.dpdk.nb_procs; proc_id++) {
        uint16_t lcore_id = ff_global_cfg.dpdk.proc_lcore[proc_id];
        if (!lcore_config[lcore_id].detected) {
            rte_exit(EXIT_FAILURE, "lcore %u unavailable\n", lcore_id);
        }
    }

    uint16_t socket_id = 0;
    if (numa_on) {
        socket_id = rte_lcore_to_socket_id(rte_lcore_id());
    }

    lcore_conf.socket_id = socket_id;

    uint16_t lcore_id = ff_global_cfg.dpdk.proc_lcore[lcore_conf.proc_id];
    int j;
    for (j = 0; j < ff_global_cfg.dpdk.nb_ports; ++j) {
        uint16_t port_id = ff_global_cfg.dpdk.portid_list[j];
        struct ff_port_cfg *pconf = &ff_global_cfg.dpdk.port_cfgs[port_id];

        int queueid = -1;
        int i;
        for (i = 0; i < pconf->nb_lcores; i++) {
            if (pconf->lcore_list[i] == lcore_id) {
                queueid = i;
            }
        }
        if (queueid < 0) {
            continue;
        }
        printf("lcore: %u, port: %u, queue: %d\n", lcore_id, port_id, queueid);
        uint16_t nb_rx_queue = lcore_conf.nb_rx_queue;
        lcore_conf.rx_queue_list[nb_rx_queue].port_id = port_id;
        lcore_conf.rx_queue_list[nb_rx_queue].queue_id = queueid;
        lcore_conf.nb_rx_queue++;

        lcore_conf.tx_queue_id[port_id] = queueid;
        lcore_conf.tx_port_id[lcore_conf.nb_tx_port] = port_id;
        lcore_conf.nb_tx_port++;

        lcore_conf.pcap[port_id] = pconf->pcap;
        lcore_conf.nb_queue_list[port_id] = pconf->nb_lcores;
    }

    return 0;
}
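
/*
 * Queue-assignment example for init_lcore_conf(): if port 0 is configured
 * with lcore_list = {2, 3} and this process runs on lcore 3, queueid
 * resolves to 1, so this process owns RX/TX queue 1 of port 0.
 */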

static int
init_mem_pool(void)
{
    uint8_t nb_ports = ff_global_cfg.dpdk.nb_ports;
    uint32_t nb_lcores = ff_global_cfg.dpdk.nb_procs;
    uint32_t nb_tx_queue = nb_lcores;
    uint32_t nb_rx_queue = lcore_conf.nb_rx_queue * nb_lcores;

    unsigned nb_mbuf = RTE_MAX(
        (nb_rx_queue*RX_QUEUE_SIZE          +
        nb_ports*nb_lcores*MAX_PKT_BURST    +
        nb_ports*nb_tx_queue*TX_QUEUE_SIZE  +
        nb_lcores*MEMPOOL_CACHE_SIZE        +
        nb_ports*KNI_MBUF_MAX               +
        nb_ports*KNI_QUEUE_SIZE             +
        nb_lcores*nb_ports*ARP_RING_SIZE),
        (unsigned)8192);

    unsigned socketid = 0;
    uint16_t i, lcore_id;
    char s[64];

    for (i = 0; i < ff_global_cfg.dpdk.nb_procs; i++) {
        lcore_id = ff_global_cfg.dpdk.proc_lcore[i];
        if (numa_on) {
            socketid = rte_lcore_to_socket_id(lcore_id);
        }

        if (socketid >= NB_SOCKETS) {
            rte_exit(EXIT_FAILURE, "Socket %u of lcore %u is out of range %d\n",
                socketid, lcore_id, NB_SOCKETS);
        }

        if (pktmbuf_pool[socketid] != NULL) {
            continue;
        }

        if (rte_eal_process_type() == RTE_PROC_PRIMARY) {
            snprintf(s, sizeof(s), "mbuf_pool_%u", socketid);
            pktmbuf_pool[socketid] =
                rte_pktmbuf_pool_create(s, nb_mbuf,
                    MEMPOOL_CACHE_SIZE, 0,
                    RTE_MBUF_DEFAULT_BUF_SIZE, socketid);
        } else {
            snprintf(s, sizeof(s), "mbuf_pool_%u", socketid);
            pktmbuf_pool[socketid] = rte_mempool_lookup(s);
        }

        if (pktmbuf_pool[socketid] == NULL) {
            rte_exit(EXIT_FAILURE, "Cannot create mbuf pool on socket %u\n", socketid);
        } else {
            printf("create mbuf pool on socket %u\n", socketid);
        }
    }

    return 0;
}
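
/*
 * Sizing example: with one port, one process and one RX queue the sum is
 * 512 + 32 + 512 + 256 + 2048 + 2048 + 2048 = 7456 mbufs, so the 8192
 * floor applies; larger deployments grow past it.
 */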

static struct rte_ring *
create_ring(const char *name, unsigned count, int socket_id, unsigned flags)
{
    struct rte_ring *ring;

    if (name == NULL)
        return NULL;

    /* If the ring already exists, just attach to it. */
    if (likely((ring = rte_ring_lookup(name)) != NULL))
        return ring;

    if (rte_eal_process_type() == RTE_PROC_PRIMARY) {
        return rte_ring_create(name, count, socket_id, flags);
    } else {
        return rte_ring_lookup(name);
    }
}
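
/*
 * Only the primary process may create a missing ring; secondaries can
 * only look it up. The lookup-first order also makes repeated calls with
 * the same name idempotent.
 */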

static int
init_arp_ring(void)
{
    int j;
    char name_buf[RTE_RING_NAMESIZE];
    int queueid;

    unsigned socketid = lcore_conf.socket_id;

    /* Create rings only for the ports actually in use. */
    int nb_ports = ff_global_cfg.dpdk.nb_ports;
    for (j = 0; j < nb_ports; j++) {
        uint16_t portid = ff_global_cfg.dpdk.portid_list[j];
        struct ff_port_cfg *pconf = &ff_global_cfg.dpdk.port_cfgs[portid];
        int nb_queues = pconf->nb_lcores;
        if (arp_ring[portid] == NULL) {
            snprintf(name_buf, RTE_RING_NAMESIZE, "ring_ptr_p%d", portid);

            arp_ring[portid] = rte_zmalloc(name_buf,
                                      sizeof(struct rte_ring *) * nb_queues,
                                      RTE_CACHE_LINE_SIZE);
            if (arp_ring[portid] == NULL) {
                rte_exit(EXIT_FAILURE, "rte_zmalloc(%s (struct rte_ring*)) "
                         "failed\n", name_buf);
            }
        }

        for (queueid = 0; queueid < nb_queues; ++queueid) {
            snprintf(name_buf, RTE_RING_NAMESIZE, "arp_ring_p%d_q%d", portid, queueid);
            arp_ring[portid][queueid] = create_ring(name_buf, ARP_RING_SIZE,
                socketid, RING_F_SC_DEQ);

            if (arp_ring[portid][queueid] == NULL)
                rte_panic("create ring %s failed!\n", name_buf);

            printf("created ring %s, %u entries free\n",
                name_buf, rte_ring_free_count(arp_ring[portid][queueid]));
        }
    }

    return 0;
}
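
/*
 * One ring per (port, queue): the queue that receives an ARP packet
 * clones it into its siblings' rings (see process_packets()), so every
 * process learns address updates regardless of where RSS delivered the
 * original frame.
 */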

static void
ff_msg_init(struct rte_mempool *mp,
    __attribute__((unused)) void *opaque_arg,
    void *obj, __attribute__((unused)) unsigned i)
{
    struct ff_msg *msg = (struct ff_msg *)obj;
    msg->msg_type = FF_UNKNOWN;
    msg->buf_addr = (char *)msg + sizeof(struct ff_msg);
    msg->buf_len = mp->elt_size - sizeof(struct ff_msg);
}

static int
init_msg_ring(void)
{
    uint16_t i;
    uint16_t nb_procs = ff_global_cfg.dpdk.nb_procs;
    unsigned socketid = lcore_conf.socket_id;

    /* Create message buffer pool */
    if (rte_eal_process_type() == RTE_PROC_PRIMARY) {
        message_pool = rte_mempool_create(FF_MSG_POOL,
           MSG_RING_SIZE * 2 * nb_procs,
           MAX_MSG_BUF_SIZE, MSG_RING_SIZE / 2, 0,
           NULL, NULL, ff_msg_init, NULL,
           socketid, 0);
    } else {
        message_pool = rte_mempool_lookup(FF_MSG_POOL);
    }

    if (message_pool == NULL) {
        rte_panic("Create msg mempool failed\n");
    }

    for (i = 0; i < nb_procs; ++i) {
        snprintf(msg_ring[i].ring_name[0], RTE_RING_NAMESIZE,
            "%s%u", FF_MSG_RING_IN, i);
        snprintf(msg_ring[i].ring_name[1], RTE_RING_NAMESIZE,
            "%s%u", FF_MSG_RING_OUT, i);

        msg_ring[i].ring[0] = create_ring(msg_ring[i].ring_name[0],
            MSG_RING_SIZE, socketid, RING_F_SP_ENQ | RING_F_SC_DEQ);
        if (msg_ring[i].ring[0] == NULL)
            rte_panic("create ring %s failed!\n", msg_ring[i].ring_name[0]);

        msg_ring[i].ring[1] = create_ring(msg_ring[i].ring_name[1],
            MSG_RING_SIZE, socketid, RING_F_SP_ENQ | RING_F_SC_DEQ);
        if (msg_ring[i].ring[1] == NULL)
            rte_panic("create ring %s failed!\n", msg_ring[i].ring_name[1]);
    }

    return 0;
}

static int
init_kni(void)
{
    int nb_ports = rte_eth_dev_count();
    kni_accept = 0;
    if (strcasecmp(ff_global_cfg.kni.method, "accept") == 0)
        kni_accept = 1;

    ff_kni_init(nb_ports, ff_global_cfg.kni.tcp_port,
        ff_global_cfg.kni.udp_port);

    unsigned socket_id = lcore_conf.socket_id;
    struct rte_mempool *mbuf_pool = pktmbuf_pool[socket_id];

    nb_ports = ff_global_cfg.dpdk.nb_ports;
    int i;
    for (i = 0; i < nb_ports; i++) {
        uint16_t port_id = ff_global_cfg.dpdk.portid_list[i];
        ff_kni_alloc(port_id, socket_id, mbuf_pool, KNI_QUEUE_SIZE);
    }

    return 0;
}

static void
set_rss_table(uint8_t port_id, uint16_t reta_size, uint16_t nb_queues)
{
    if (reta_size == 0) {
        return;
    }

    int reta_conf_size = RTE_MAX(1, reta_size / RTE_RETA_GROUP_SIZE);
    struct rte_eth_rss_reta_entry64 reta_conf[reta_conf_size];

    /* config HW indirection table */
    unsigned i, j, hash = 0;
    for (i = 0; i < reta_conf_size; i++) {
        reta_conf[i].mask = ~0ULL;
        for (j = 0; j < RTE_RETA_GROUP_SIZE; j++) {
            reta_conf[i].reta[j] = hash++ % nb_queues;
        }
    }

    if (rte_eth_dev_rss_reta_update(port_id, reta_conf, reta_size)) {
        rte_exit(EXIT_FAILURE, "port[%d], failed to update rss table\n",
            port_id);
    }
}
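
/*
 * Example: reta_size = 128 with nb_queues = 4 fills the indirection table
 * with 0,1,2,3,0,1,2,3,... so hash buckets are spread round-robin across
 * the configured queues.
 */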

static int
init_port_start(void)
{
    int nb_ports = ff_global_cfg.dpdk.nb_ports;
    unsigned socketid = rte_lcore_to_socket_id(rte_lcore_id());
    struct rte_mempool *mbuf_pool = pktmbuf_pool[socketid];
    uint16_t i;

    for (i = 0; i < nb_ports; i++) {
        uint16_t port_id = ff_global_cfg.dpdk.portid_list[i];
        struct ff_port_cfg *pconf = &ff_global_cfg.dpdk.port_cfgs[port_id];
        uint16_t nb_queues = pconf->nb_lcores;

        struct rte_eth_dev_info dev_info;
        rte_eth_dev_info_get(port_id, &dev_info);

        if (nb_queues > dev_info.max_rx_queues) {
            rte_exit(EXIT_FAILURE, "num_procs[%d] bigger than max_rx_queues[%d]\n",
                nb_queues,
                dev_info.max_rx_queues);
        }

        if (nb_queues > dev_info.max_tx_queues) {
            rte_exit(EXIT_FAILURE, "num_procs[%d] bigger than max_tx_queues[%d]\n",
                nb_queues,
                dev_info.max_tx_queues);
        }

        struct ether_addr addr;
        rte_eth_macaddr_get(port_id, &addr);
        printf("Port %u MAC: %02" PRIx8 " %02" PRIx8 " %02" PRIx8
                   " %02" PRIx8 " %02" PRIx8 " %02" PRIx8 "\n",
                (unsigned)port_id,
                addr.addr_bytes[0], addr.addr_bytes[1],
                addr.addr_bytes[2], addr.addr_bytes[3],
                addr.addr_bytes[4], addr.addr_bytes[5]);

        rte_memcpy(pconf->mac,
            addr.addr_bytes, ETHER_ADDR_LEN);

        /* Clear txq_flags - we do not need multi-mempool and refcnt */
        dev_info.default_txconf.txq_flags = ETH_TXQ_FLAGS_NOMULTMEMP |
            ETH_TXQ_FLAGS_NOREFCOUNT;

        /* Disable features that are not supported by port's HW */
        if (!(dev_info.tx_offload_capa & DEV_TX_OFFLOAD_UDP_CKSUM)) {
            dev_info.default_txconf.txq_flags |= ETH_TXQ_FLAGS_NOXSUMUDP;
        }

        if (!(dev_info.tx_offload_capa & DEV_TX_OFFLOAD_TCP_CKSUM)) {
            dev_info.default_txconf.txq_flags |= ETH_TXQ_FLAGS_NOXSUMTCP;
        }

        if (!(dev_info.tx_offload_capa & DEV_TX_OFFLOAD_SCTP_CKSUM)) {
            dev_info.default_txconf.txq_flags |= ETH_TXQ_FLAGS_NOXSUMSCTP;
        }

        if (!(dev_info.tx_offload_capa & DEV_TX_OFFLOAD_VLAN_INSERT)) {
            dev_info.default_txconf.txq_flags |= ETH_TXQ_FLAGS_NOVLANOFFL;
        }

        if (!(dev_info.tx_offload_capa & DEV_TX_OFFLOAD_TCP_TSO) &&
            !(dev_info.tx_offload_capa & DEV_TX_OFFLOAD_UDP_TSO)) {
            dev_info.default_txconf.txq_flags |= ETH_TXQ_FLAGS_NOMULTSEGS;
        }

        struct rte_eth_conf port_conf = {0};

        /* Set RSS mode */
        port_conf.rxmode.mq_mode = ETH_MQ_RX_RSS;
        port_conf.rx_adv_conf.rss_conf.rss_hf = ETH_RSS_PROTO_MASK;
        port_conf.rx_adv_conf.rss_conf.rss_key = default_rsskey_40bytes;
        port_conf.rx_adv_conf.rss_conf.rss_key_len = 40;

        /* Set Rx VLAN stripping */
        if (ff_global_cfg.dpdk.vlan_strip) {
            if (dev_info.rx_offload_capa & DEV_RX_OFFLOAD_VLAN_STRIP) {
                port_conf.rxmode.hw_vlan_strip = 1;
            }
        }

        /* Enable HW CRC stripping */
        port_conf.rxmode.hw_strip_crc = 1;

        /* FIXME: enable TCP LRO? */
        #if 0
        if (dev_info.rx_offload_capa & DEV_RX_OFFLOAD_TCP_LRO) {
            printf("LRO is supported\n");
            port_conf.rxmode.enable_lro = 1;
            pconf->hw_features.rx_lro = 1;
        }
        #endif

        /* Set Rx checksum checking */
        if ((dev_info.rx_offload_capa & DEV_RX_OFFLOAD_IPV4_CKSUM) &&
            (dev_info.rx_offload_capa & DEV_RX_OFFLOAD_UDP_CKSUM) &&
            (dev_info.rx_offload_capa & DEV_RX_OFFLOAD_TCP_CKSUM)) {
            printf("RX checksum offload supported\n");
            port_conf.rxmode.hw_ip_checksum = 1;
            pconf->hw_features.rx_csum = 1;
        }

        if ((dev_info.tx_offload_capa & DEV_TX_OFFLOAD_IPV4_CKSUM)) {
            printf("TX ip checksum offload supported\n");
            pconf->hw_features.tx_csum_ip = 1;
        }

        if ((dev_info.tx_offload_capa & DEV_TX_OFFLOAD_UDP_CKSUM) &&
            (dev_info.tx_offload_capa & DEV_TX_OFFLOAD_TCP_CKSUM)) {
            printf("TX TCP&UDP checksum offload supported\n");
            pconf->hw_features.tx_csum_l4 = 1;
        }

        if (ff_global_cfg.dpdk.tso) {
            if (dev_info.tx_offload_capa & DEV_TX_OFFLOAD_TCP_TSO) {
                printf("TSO is supported\n");
                pconf->hw_features.tx_tso = 1;
            }
        } else {
            printf("TSO is disabled\n");
        }

        if (dev_info.reta_size) {
            /* reta size must be power of 2 */
            assert((dev_info.reta_size & (dev_info.reta_size - 1)) == 0);

            rss_reta_size[port_id] = dev_info.reta_size;
            printf("port[%d]: rss table size: %d\n", port_id,
                dev_info.reta_size);
        }

        /* Only the primary process configures and starts the port. */
        if (rte_eal_process_type() != RTE_PROC_PRIMARY) {
            continue;
        }

        int ret = rte_eth_dev_configure(port_id, nb_queues, nb_queues, &port_conf);
        if (ret != 0) {
            return ret;
        }
        uint16_t q;
        for (q = 0; q < nb_queues; q++) {
            ret = rte_eth_tx_queue_setup(port_id, q, TX_QUEUE_SIZE,
                socketid, &dev_info.default_txconf);
            if (ret < 0) {
                return ret;
            }

            ret = rte_eth_rx_queue_setup(port_id, q, RX_QUEUE_SIZE,
                socketid, &dev_info.default_rxconf, mbuf_pool);
            if (ret < 0) {
                return ret;
            }
        }

        ret = rte_eth_dev_start(port_id);
        if (ret < 0) {
            return ret;
        }

        if (nb_queues > 1) {
            /* set HW rss hash function to Toeplitz. */
            if (!rte_eth_dev_filter_supported(port_id, RTE_ETH_FILTER_HASH)) {
                struct rte_eth_hash_filter_info info = {0};
                info.info_type = RTE_ETH_HASH_FILTER_GLOBAL_CONFIG;
                info.info.global_conf.hash_func = RTE_ETH_HASH_FUNCTION_TOEPLITZ;

                if (rte_eth_dev_filter_ctrl(port_id, RTE_ETH_FILTER_HASH,
                    RTE_ETH_FILTER_SET, &info) < 0) {
                    rte_exit(EXIT_FAILURE, "port[%d] set hash func failed\n",
                        port_id);
                }
            }

            set_rss_table(port_id, dev_info.reta_size, nb_queues);
        }

        /* Enable RX in promiscuous mode for the Ethernet device. */
        if (ff_global_cfg.dpdk.promiscuous) {
            rte_eth_promiscuous_enable(port_id);
            ret = rte_eth_promiscuous_get(port_id);
            if (ret == 1) {
                printf("set port %u to promiscuous mode ok\n", port_id);
            } else {
                printf("set port %u to promiscuous mode error\n", port_id);
            }
        }

        /* Enable pcap dump */
        if (pconf->pcap) {
            ff_enable_pcap(pconf->pcap);
        }
    }

    if (rte_eal_process_type() == RTE_PROC_PRIMARY) {
        check_all_ports_link_status();
    }

    return 0;
}

static int
init_clock(void)
{
    rte_timer_subsystem_init();
    uint64_t hz = rte_get_timer_hz();
    uint64_t intrs = MS_PER_S / ff_global_cfg.freebsd.hz;
    uint64_t tsc = (hz + MS_PER_S - 1) / MS_PER_S * intrs;

    rte_timer_init(&freebsd_clock);
    rte_timer_reset(&freebsd_clock, tsc, PERIODICAL,
        rte_lcore_id(), &ff_hardclock_job, NULL);

    ff_update_current_ts();

    return 0;
}
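
/*
 * Example: with freebsd.hz = 100 (a common setting), intrs = 1000 / 100 =
 * 10 ms, and tsc becomes ~10 ms worth of TSC cycles, so ff_hardclock_job()
 * fires 100 times per second, matching the FreeBSD tick rate.
 */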

int
ff_dpdk_init(int argc, char **argv)
{
    if (ff_global_cfg.dpdk.nb_procs < 1 ||
        ff_global_cfg.dpdk.nb_procs > RTE_MAX_LCORE ||
        ff_global_cfg.dpdk.proc_id >= ff_global_cfg.dpdk.nb_procs ||
        ff_global_cfg.dpdk.proc_id < 0) {
        printf("invalid num_procs[%d] or proc_id[%d]\n",
            ff_global_cfg.dpdk.nb_procs,
            ff_global_cfg.dpdk.proc_id);
        exit(1);
    }

    int ret = rte_eal_init(argc, argv);
    if (ret < 0) {
        rte_exit(EXIT_FAILURE, "Error with EAL initialization\n");
    }

    numa_on = ff_global_cfg.dpdk.numa_on;

    init_lcore_conf();

    init_mem_pool();

    init_arp_ring();

    init_msg_ring();

    enable_kni = ff_global_cfg.kni.enable;
    if (enable_kni) {
        init_kni();
    }

    ret = init_port_start();
    if (ret < 0) {
        rte_exit(EXIT_FAILURE, "init_port_start failed\n");
    }

    init_clock();

    return 0;
}

static void
ff_veth_input(const struct ff_dpdk_if_context *ctx, struct rte_mbuf *pkt)
{
    uint8_t rx_csum = ctx->hw_features.rx_csum;
    if (rx_csum) {
        /* Drop (and free) packets that failed the HW checksum check. */
        if (pkt->ol_flags & (PKT_RX_IP_CKSUM_BAD | PKT_RX_L4_CKSUM_BAD)) {
            rte_pktmbuf_free(pkt);
            return;
        }
    }

    /*
     * FIXME: should we save pkt->vlan_tci
     * if (pkt->ol_flags & PKT_RX_VLAN_PKT)
     */

    void *data = rte_pktmbuf_mtod(pkt, void*);
    uint16_t len = rte_pktmbuf_data_len(pkt);

    void *hdr = ff_mbuf_gethdr(pkt, pkt->pkt_len, data, len, rx_csum);
    if (hdr == NULL) {
        rte_pktmbuf_free(pkt);
        return;
    }

    /* Walk the remaining segments of a multi-segment mbuf chain. */
    struct rte_mbuf *pn = pkt->next;
    void *prev = hdr;
    while (pn != NULL) {
        data = rte_pktmbuf_mtod(pn, void*);
        len = rte_pktmbuf_data_len(pn);

        void *mb = ff_mbuf_get(prev, data, len);
        if (mb == NULL) {
            ff_mbuf_free(hdr);
            rte_pktmbuf_free(pkt);
            return;
        }
        pn = pn->next;
        prev = mb;
    }

    ff_veth_process_packet(ctx->ifp, hdr);
}

static enum FilterReturn
protocol_filter(const void *data, uint16_t len)
{
    if (len < sizeof(struct ether_hdr))
        return FILTER_UNKNOWN;

    const struct ether_hdr *hdr;
    hdr = (const struct ether_hdr *)data;

    if (ntohs(hdr->ether_type) == ETHER_TYPE_ARP)
        return FILTER_ARP;

    if (!enable_kni) {
        return FILTER_UNKNOWN;
    }

    if (ntohs(hdr->ether_type) != ETHER_TYPE_IPv4)
        return FILTER_UNKNOWN;

    return ff_kni_proto_filter(data + sizeof(struct ether_hdr),
        len - sizeof(struct ether_hdr));
}
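
/*
 * How the result is used (see process_packets()): FILTER_ARP frames go to
 * the stack and are also cloned to sibling queues and KNI; other frames go
 * to KNI when (FILTER_KNI && kni_accept) or (FILTER_UNKNOWN && !kni_accept)
 * holds, and to the stack otherwise.
 */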

static inline void
process_packets(uint8_t port_id, uint16_t queue_id, struct rte_mbuf **bufs,
    uint16_t count, const struct ff_dpdk_if_context *ctx, int pkts_from_ring)
{
    struct lcore_conf *qconf = &lcore_conf;

    uint16_t i;
    for (i = 0; i < count; i++) {
        struct rte_mbuf *rtem = bufs[i];

        if (unlikely(qconf->pcap[port_id] != NULL)) {
            ff_dump_packets(qconf->pcap[port_id], rtem);
        }

        void *data = rte_pktmbuf_mtod(rtem, void*);
        uint16_t len = rte_pktmbuf_data_len(rtem);

        enum FilterReturn filter = protocol_filter(data, len);
        if (filter == FILTER_ARP) {
            struct rte_mempool *mbuf_pool;
            struct rte_mbuf *mbuf_clone;
            if (pkts_from_ring == 0) {
                /* Clone the ARP packet into every sibling queue's ring. */
                uint16_t q;
                uint16_t nb_queues = qconf->nb_queue_list[port_id];
                for (q = 0; q < nb_queues; ++q) {
                    if (q == queue_id)
                        continue;

                    unsigned socket_id = 0;
                    if (numa_on) {
                        uint16_t lcore_id = qconf->port_cfgs[port_id].lcore_list[q];
                        socket_id = rte_lcore_to_socket_id(lcore_id);
                    }
                    mbuf_pool = pktmbuf_pool[socket_id];
                    mbuf_clone = rte_pktmbuf_clone(rtem, mbuf_pool);
                    if (mbuf_clone) {
                        int ret = rte_ring_enqueue(arp_ring[port_id][q], mbuf_clone);
                        if (ret < 0)
                            rte_pktmbuf_free(mbuf_clone);
                    }
                }
            }

            if (enable_kni && rte_eal_process_type() == RTE_PROC_PRIMARY) {
                mbuf_pool = pktmbuf_pool[qconf->socket_id];
                mbuf_clone = rte_pktmbuf_clone(rtem, mbuf_pool);
                if (mbuf_clone) {
                    ff_kni_enqueue(port_id, mbuf_clone);
                }
            }

            ff_veth_input(ctx, rtem);
        } else if (enable_kni && ((filter == FILTER_KNI && kni_accept) ||
            (filter == FILTER_UNKNOWN && !kni_accept))) {
            ff_kni_enqueue(port_id, rtem);
        } else {
            ff_veth_input(ctx, rtem);
        }
    }
}

static inline int
process_arp_ring(uint8_t port_id, uint16_t queue_id,
    struct rte_mbuf **pkts_burst, const struct ff_dpdk_if_context *ctx)
{
    /* Drain ARP packets cloned into this queue's ring and process them. */
    uint16_t nb_rb;
    nb_rb = rte_ring_dequeue_burst(arp_ring[port_id][queue_id],
        (void **)pkts_burst, MAX_PKT_BURST);

    if (nb_rb > 0) {
        process_packets(port_id, queue_id, pkts_burst, nb_rb, ctx, 1);
    }

    return 0;
}

static inline void
handle_sysctl_msg(struct ff_msg *msg, uint16_t proc_id)
{
    int ret = ff_sysctl(msg->sysctl.name, msg->sysctl.namelen,
        msg->sysctl.old, msg->sysctl.oldlenp, msg->sysctl.new,
        msg->sysctl.newlen);

    if (ret < 0) {
        msg->result = errno;
    } else {
        msg->result = 0;
    }

    rte_ring_enqueue(msg_ring[proc_id].ring[1], msg);
}

static inline void
handle_ioctl_msg(struct ff_msg *msg, uint16_t proc_id)
{
    int fd, ret;
    fd = ff_socket(AF_INET, SOCK_DGRAM, 0);
    if (fd < 0) {
        ret = -1;
        goto done;
    }

    ret = ff_ioctl(fd, msg->ioctl.cmd, msg->ioctl.data);

    ff_close(fd);

done:
    if (ret < 0) {
        msg->result = errno;
    } else {
        msg->result = 0;
    }

    rte_ring_enqueue(msg_ring[proc_id].ring[1], msg);
}

static inline void
handle_route_msg(struct ff_msg *msg, uint16_t proc_id)
{
    msg->result = ff_rtioctl(msg->route.fib, msg->route.data,
        &msg->route.len, msg->route.maxlen);

    rte_ring_enqueue(msg_ring[proc_id].ring[1], msg);
}

static struct ff_top_args ff_status;
static inline void
handle_top_msg(struct ff_msg *msg, uint16_t proc_id)
{
    msg->top = ff_status;
    msg->result = 0;

    rte_ring_enqueue(msg_ring[proc_id].ring[1], msg);
}

static inline void
handle_default_msg(struct ff_msg *msg, uint16_t proc_id)
{
    msg->result = EINVAL;
    rte_ring_enqueue(msg_ring[proc_id].ring[1], msg);
}

static inline void
handle_msg(struct ff_msg *msg, uint16_t proc_id)
{
    switch (msg->msg_type) {
        case FF_SYSCTL:
            handle_sysctl_msg(msg, proc_id);
            break;
        case FF_IOCTL:
            handle_ioctl_msg(msg, proc_id);
            break;
        case FF_ROUTE:
            handle_route_msg(msg, proc_id);
            break;
        case FF_TOP:
            handle_top_msg(msg, proc_id);
            break;
        default:
            handle_default_msg(msg, proc_id);
            break;
    }
}

static inline int
process_msg_ring(uint16_t proc_id)
{
    void *msg;
    int ret = rte_ring_dequeue(msg_ring[proc_id].ring[0], &msg);

    /* rte_ring_dequeue() returns 0 on success; control messages are rare. */
    if (unlikely(ret == 0)) {
        handle_msg((struct ff_msg *)msg, proc_id);
    }

    return 0;
}

/* Send burst of packets on an output interface */
static inline int
send_burst(struct lcore_conf *qconf, uint16_t n, uint8_t port)
{
    struct rte_mbuf **m_table;
    int ret;
    uint16_t queueid;

    queueid = qconf->tx_queue_id[port];
    m_table = (struct rte_mbuf **)qconf->tx_mbufs[port].m_table;

    if (unlikely(qconf->pcap[port] != NULL)) {
        uint16_t i;
        for (i = 0; i < n; i++) {
            ff_dump_packets(qconf->pcap[port], m_table[i]);
        }
    }

    ret = rte_eth_tx_burst(port, queueid, m_table, n);
    if (unlikely(ret < n)) {
        /* Free the mbufs the NIC did not accept. */
        do {
            rte_pktmbuf_free(m_table[ret]);
        } while (++ret < n);
    }

    return 0;
}

/* Enqueue a single packet, and send burst if queue is filled */
static inline int
send_single_packet(struct rte_mbuf *m, uint8_t port)
{
    uint16_t len;
    struct lcore_conf *qconf;

    qconf = &lcore_conf;
    len = qconf->tx_mbufs[port].len;
    qconf->tx_mbufs[port].m_table[len] = m;
    len++;

    /* enough pkts to be sent */
    if (unlikely(len == MAX_PKT_BURST)) {
        send_burst(qconf, MAX_PKT_BURST, port);
        len = 0;
    }

    qconf->tx_mbufs[port].len = len;
    return 0;
}
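
/*
 * Packets buffered here are flushed either when the per-port table reaches
 * MAX_PKT_BURST or by the periodic drain in main_loop() (at most about
 * BURST_TX_DRAIN_US later).
 */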

int
ff_dpdk_if_send(struct ff_dpdk_if_context *ctx, void *m,
    int total)
{
    struct rte_mempool *mbuf_pool = pktmbuf_pool[lcore_conf.socket_id];
    struct rte_mbuf *head = rte_pktmbuf_alloc(mbuf_pool);
    if (head == NULL) {
        ff_mbuf_free(m);
        return -1;
    }

    head->pkt_len = total;
    head->nb_segs = 0;

    int off = 0;
    struct rte_mbuf *cur = head, *prev = NULL;
    while (total > 0) {
        if (cur == NULL) {
            cur = rte_pktmbuf_alloc(mbuf_pool);
            if (cur == NULL) {
                rte_pktmbuf_free(head);
                ff_mbuf_free(m);
                return -1;
            }
        }

        void *data = rte_pktmbuf_mtod(cur, void*);
        int len = total > RTE_MBUF_DEFAULT_DATAROOM ? RTE_MBUF_DEFAULT_DATAROOM : total;
        int ret = ff_mbuf_copydata(m, data, off, len);
        if (ret < 0) {
            rte_pktmbuf_free(head);
            ff_mbuf_free(m);
            return -1;
        }

        if (prev != NULL) {
            prev->next = cur;
        }
        prev = cur;

        cur->data_len = len;
        off += len;
        total -= len;
        head->nb_segs++;
        cur = NULL;
    }

    struct ff_tx_offload offload = {0};
    ff_mbuf_tx_offload(m, &offload);

    if (offload.ip_csum) {
        head->ol_flags |= PKT_TX_IP_CKSUM;
        head->l2_len = sizeof(struct ether_hdr);
        head->l3_len = sizeof(struct ipv4_hdr);
    }

    if (ctx->hw_features.tx_csum_l4) {
        if (offload.tcp_csum) {
            head->ol_flags |= PKT_TX_TCP_CKSUM;
            head->l2_len = sizeof(struct ether_hdr);
            head->l3_len = sizeof(struct ipv4_hdr);
        }

        if (offload.tso_seg_size) {
            head->ol_flags |= PKT_TX_TCP_SEG;
            head->l4_len = sizeof(struct tcp_hdr);
            head->tso_segsz = offload.tso_seg_size;
        }

        if (offload.udp_csum) {
            head->ol_flags |= PKT_TX_UDP_CKSUM;
            head->l2_len = sizeof(struct ether_hdr);
            head->l3_len = sizeof(struct ipv4_hdr);
        }
    }

    ff_mbuf_free(m);

    return send_single_packet(head, ctx->port_id);
}
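
/*
 * Note: the FreeBSD mbuf chain is copied segment by segment into fresh
 * rte_mbufs and then freed, so transmit here is copy-based rather than
 * zero-copy.
 */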

static int
main_loop(void *arg)
{
    struct loop_routine *lr = (struct loop_routine *)arg;

    struct rte_mbuf *pkts_burst[MAX_PKT_BURST];
    unsigned lcore_id;
    uint64_t prev_tsc, diff_tsc, cur_tsc, usch_tsc, div_tsc, usr_tsc, sys_tsc, end_tsc;
    int i, j, nb_rx, idle;
    uint8_t port_id, queue_id;
    struct lcore_conf *qconf;
    const uint64_t drain_tsc = (rte_get_tsc_hz() + US_PER_S - 1) /
        US_PER_S * BURST_TX_DRAIN_US;
    struct ff_dpdk_if_context *ctx;

    prev_tsc = 0;
    usch_tsc = 0;

    lcore_id = rte_lcore_id();
    qconf = &lcore_conf;

    if (qconf->nb_rx_queue == 0) {
        printf("lcore %u has nothing to do\n", lcore_id);
        return 0;
    }

    while (1) {
        cur_tsc = rte_rdtsc();
        if (unlikely(freebsd_clock.expire < cur_tsc)) {
            rte_timer_manage();
        }

        idle = 1;
        sys_tsc = 0;
        usr_tsc = 0;

        /*
         * TX burst queue drain
         */
        diff_tsc = cur_tsc - prev_tsc;
        if (unlikely(diff_tsc > drain_tsc)) {
            for (i = 0; i < qconf->nb_tx_port; i++) {
                port_id = qconf->tx_port_id[i];
                if (qconf->tx_mbufs[port_id].len == 0)
                    continue;

                idle = 0;

                send_burst(qconf,
                    qconf->tx_mbufs[port_id].len,
                    port_id);
                qconf->tx_mbufs[port_id].len = 0;
            }

            prev_tsc = cur_tsc;
        }

        /*
         * Read packet from RX queues
         */
        for (i = 0; i < qconf->nb_rx_queue; ++i) {
            port_id = qconf->rx_queue_list[i].port_id;
            queue_id = qconf->rx_queue_list[i].queue_id;
            ctx = veth_ctx[port_id];

            if (enable_kni && rte_eal_process_type() == RTE_PROC_PRIMARY) {
                ff_kni_process(port_id, queue_id, pkts_burst, MAX_PKT_BURST);
            }

            process_arp_ring(port_id, queue_id, pkts_burst, ctx);

            nb_rx = rte_eth_rx_burst(port_id, queue_id, pkts_burst,
                MAX_PKT_BURST);
            if (nb_rx == 0)
                continue;

            idle = 0;

            /* Prefetch first packets */
            for (j = 0; j < PREFETCH_OFFSET && j < nb_rx; j++) {
                rte_prefetch0(rte_pktmbuf_mtod(
                        pkts_burst[j], void *));
            }

            /* Prefetch and handle already prefetched packets */
            for (j = 0; j < (nb_rx - PREFETCH_OFFSET); j++) {
                rte_prefetch0(rte_pktmbuf_mtod(pkts_burst[
                        j + PREFETCH_OFFSET], void *));
                process_packets(port_id, queue_id, &pkts_burst[j], 1, ctx, 0);
            }

            /* Handle remaining prefetched packets */
            for (; j < nb_rx; j++) {
                process_packets(port_id, queue_id, &pkts_burst[j], 1, ctx, 0);
            }
        }

        process_msg_ring(qconf->proc_id);

        div_tsc = rte_rdtsc();

        if (likely(lr->loop != NULL && (!idle || cur_tsc - usch_tsc > drain_tsc))) {
            usch_tsc = cur_tsc;
            lr->loop(lr->arg);
        }

        end_tsc = rte_rdtsc();

        if (usch_tsc == cur_tsc) {
            usr_tsc = end_tsc - div_tsc;
        }

        if (!idle) {
            sys_tsc = div_tsc - cur_tsc;
            ff_status.sys_tsc += sys_tsc;
        }

        ff_status.usr_tsc += usr_tsc;
        ff_status.work_tsc += end_tsc - cur_tsc;
        ff_status.idle_tsc += end_tsc - cur_tsc - usr_tsc - sys_tsc;

        ff_status.loops++;
    }
}

int
ff_dpdk_if_up(void) {
    int i;
    struct lcore_conf *qconf = &lcore_conf;
    for (i = 0; i < qconf->nb_tx_port; i++) {
        uint16_t port_id = qconf->tx_port_id[i];

        struct ff_port_cfg *pconf = &qconf->port_cfgs[port_id];
        veth_ctx[port_id] = ff_veth_attach(pconf);
        if (veth_ctx[port_id] == NULL) {
            rte_exit(EXIT_FAILURE, "ff_veth_attach failed\n");
        }
    }

    return 0;
}

void
ff_dpdk_run(loop_func_t loop, void *arg) {
    struct loop_routine *lr = rte_malloc(NULL,
        sizeof(struct loop_routine), 0);
    if (lr == NULL) {
        rte_exit(EXIT_FAILURE, "rte_malloc loop_routine failed\n");
    }
    lr->loop = loop;
    lr->arg = arg;
    rte_eal_mp_remote_launch(main_loop, lr, CALL_MASTER);
    rte_eal_mp_wait_lcore();
    rte_free(lr);
}

void
ff_dpdk_pktmbuf_free(void *m)
{
    rte_pktmbuf_free((struct rte_mbuf *)m);
}

static uint32_t
toeplitz_hash(unsigned keylen, const uint8_t *key,
    unsigned datalen, const uint8_t *data)
{
    uint32_t hash = 0, v;
    u_int i, b;

    /* XXXRW: Perhaps an assertion about key length vs. data length? */

    v = (key[0]<<24) + (key[1]<<16) + (key[2]<<8) + key[3];
    for (i = 0; i < datalen; i++) {
        for (b = 0; b < 8; b++) {
            if (data[i] & (1<<(7-b)))
                hash ^= v;
            v <<= 1;
            if ((i + 4) < keylen &&
                (key[i+4] & (1<<(7-b))))
                v |= 1;
        }
    }
    return (hash);
}
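
/*
 * This mirrors the Toeplitz computation the NIC performs for RSS: given
 * the same key and the same tuple bytes it must yield the same 32-bit
 * hash, which is what ff_rss_check() below relies on.
 */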

int
ff_rss_check(void *softc, uint32_t saddr, uint32_t daddr,
    uint16_t sport, uint16_t dport)
{
    struct lcore_conf *qconf = &lcore_conf;
    struct ff_dpdk_if_context *ctx = ff_veth_softc_to_hostc(softc);
    uint16_t nb_queues = qconf->nb_queue_list[ctx->port_id];

    if (nb_queues <= 1) {
        return 1;
    }

    uint16_t reta_size = rss_reta_size[ctx->port_id];
    uint16_t queueid = qconf->tx_queue_id[ctx->port_id];

    uint8_t data[sizeof(saddr) + sizeof(daddr) + sizeof(sport) +
        sizeof(dport)];

    unsigned datalen = 0;

    bcopy(&saddr, &data[datalen], sizeof(saddr));
    datalen += sizeof(saddr);

    bcopy(&daddr, &data[datalen], sizeof(daddr));
    datalen += sizeof(daddr);

    bcopy(&sport, &data[datalen], sizeof(sport));
    datalen += sizeof(sport);

    bcopy(&dport, &data[datalen], sizeof(dport));
    datalen += sizeof(dport);

    uint32_t hash = toeplitz_hash(sizeof(default_rsskey_40bytes),
        default_rsskey_40bytes, datalen, data);

    /* reta_size is asserted to be a power of two in init_port_start(). */
    return ((hash & (reta_size - 1)) % nb_queues) == queueid;
}
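
/*
 * Intended use: the FreeBSD side can consult this when choosing a local
 * address/port tuple, accepting only tuples whose RSS hash maps back to
 * this process's own queue (the exact call sites live in the FreeBSD glue
 * outside this file).
 */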