xref: /f-stack/lib/ff_dpdk_if.c (revision e7741141)
/*
 * Copyright (C) 2017 THL A29 Limited, a Tencent company.
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions are met:
 *
 * 1. Redistributions of source code must retain the above copyright notice, this
 *   list of conditions and the following disclaimer.
 * 2. Redistributions in binary form must reproduce the above copyright notice,
 *   this list of conditions and the following disclaimer in the documentation
 *   and/or other materials provided with the distribution.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
 * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
 * DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR
 * ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
 * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
 * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
 * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
 * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 *
 */

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <strings.h>
#include <errno.h>
#include <inttypes.h>

#include <rte_common.h>
#include <rte_byteorder.h>
#include <rte_log.h>
#include <rte_memory.h>
#include <rte_memcpy.h>
#include <rte_memzone.h>
#include <rte_config.h>
#include <rte_eal.h>
#include <rte_pci.h>
#include <rte_mempool.h>
#include <rte_mbuf.h>
#include <rte_ring.h>
#include <rte_lcore.h>
#include <rte_launch.h>
#include <rte_ethdev.h>
#include <rte_debug.h>
#include <rte_ether.h>
#include <rte_malloc.h>
#include <rte_cycles.h>
#include <rte_timer.h>
#include <rte_thash.h>
#include <rte_ip.h>
#include <rte_tcp.h>
#include <rte_udp.h>

#include "ff_dpdk_if.h"
#include "ff_dpdk_pcap.h"
#include "ff_dpdk_kni.h"
#include "ff_config.h"
#include "ff_veth.h"
#include "ff_host_interface.h"
#include "ff_msg.h"
#include "ff_api.h"

#define MEMPOOL_CACHE_SIZE 256

#define ARP_RING_SIZE 2048

#define MSG_RING_SIZE 32

/*
 * Configurable number of RX/TX ring descriptors
 */
#define RX_QUEUE_SIZE 512
#define TX_QUEUE_SIZE 256

#define MAX_PKT_BURST 32
#define BURST_TX_DRAIN_US 100 /* TX drain every ~100us */

/*
 * Try to avoid TX buffering if we have at least MAX_TX_BURST packets to send.
 */
#define MAX_TX_BURST    (MAX_PKT_BURST / 2)

#define NB_SOCKETS 8

/* Configure how many packets ahead to prefetch when reading packets */
#define PREFETCH_OFFSET    3

#define MAX_RX_QUEUE_PER_LCORE 16
#define MAX_TX_QUEUE_PER_PORT RTE_MAX_ETHPORTS
#define MAX_RX_QUEUE_PER_PORT 128

#define KNI_MBUF_MAX 2048
#define KNI_QUEUE_SIZE 2048

static int enable_kni;
static int kni_accept;

static struct rte_timer freebsd_clock;

/* Default 40-byte RSS hash key (the same key Mellanox's Linux driver uses). */
static uint8_t default_rsskey_40bytes[40] = {
    0xd1, 0x81, 0xc6, 0x2c, 0xf7, 0xf4, 0xdb, 0x5b,
    0x19, 0x83, 0xa2, 0xfc, 0x94, 0x3e, 0x1a, 0xdb,
    0xd9, 0x38, 0x9e, 0x6b, 0xd1, 0x03, 0x9c, 0x2c,
    0xa7, 0x44, 0x99, 0xad, 0x59, 0x3d, 0x56, 0xd9,
    0xf3, 0x25, 0x3c, 0x06, 0x2a, 0xdc, 0x1f, 0xfc
};

static struct rte_eth_conf default_port_conf = {
    .rxmode = {
        .mq_mode = ETH_MQ_RX_RSS,
        .max_rx_pkt_len = ETHER_MAX_LEN,
        .split_hdr_size = 0, /**< hdr buf size */
        .header_split   = 0, /**< Header Split disabled */
        .hw_ip_checksum = 0, /**< IP checksum offload disabled */
        .hw_vlan_filter = 0, /**< VLAN filtering disabled */
        .hw_vlan_strip  = 0, /**< VLAN strip disabled. */
        .hw_vlan_extend = 0, /**< Extended VLAN disabled. */
        .jumbo_frame    = 0, /**< Jumbo Frame Support disabled */
        .hw_strip_crc   = 0, /**< CRC stripping by hardware disabled */
        .enable_lro     = 0, /**< LRO disabled */
    },
    .rx_adv_conf = {
        .rss_conf = {
            .rss_key = default_rsskey_40bytes,
            .rss_key_len = 40,
            .rss_hf = ETH_RSS_PROTO_MASK,
        },
    },
    .txmode = {
        .mq_mode = ETH_MQ_TX_NONE,
    },
};

struct mbuf_table {
    uint16_t len;
    struct rte_mbuf *m_table[MAX_PKT_BURST];
};

struct lcore_rx_queue {
    uint8_t port_id;
    uint8_t queue_id;
} __rte_cache_aligned;

struct lcore_conf {
    uint16_t proc_id;
    uint16_t nb_procs;
    uint16_t socket_id;
    uint16_t nb_rx_queue;
    uint16_t *proc_lcore;
    struct lcore_rx_queue rx_queue_list[MAX_RX_QUEUE_PER_LCORE];
    uint16_t tx_queue_id[RTE_MAX_ETHPORTS];
    struct mbuf_table tx_mbufs[RTE_MAX_ETHPORTS];
    char *pcap[RTE_MAX_ETHPORTS];
} __rte_cache_aligned;

static struct lcore_conf lcore_conf;

static struct rte_mempool *pktmbuf_pool[NB_SOCKETS];

static struct rte_ring **arp_ring[RTE_MAX_LCORE];

struct ff_msg_ring {
    char ring_name[2][RTE_RING_NAMESIZE];
    /* ring[0]: the lcore receives msgs; the other process sends. */
    /* ring[1]: the lcore sends msgs; the other process reads. */
    struct rte_ring *ring[2];
} __rte_cache_aligned;

static struct ff_msg_ring msg_ring[RTE_MAX_LCORE];
static struct rte_mempool *message_pool;

struct ff_dpdk_if_context {
    void *sc;
    void *ifp;
    uint16_t port_id;
    struct ff_hw_features hw_features;
} __rte_cache_aligned;

static struct ff_dpdk_if_context *veth_ctx[RTE_MAX_ETHPORTS];

extern void ff_hardclock(void);

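/*
 * Periodic rte_timer callback: drive the FreeBSD stack's hardclock tick
 * and refresh the cached timestamp used by the stack.
 */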
static void
ff_hardclock_job(__rte_unused struct rte_timer *timer,
    __rte_unused void *arg) {
    ff_hardclock();
    ff_update_current_ts();
}

struct ff_dpdk_if_context *
ff_dpdk_register_if(void *sc, void *ifp, struct ff_port_cfg *cfg)
{
    struct ff_dpdk_if_context *ctx;

    ctx = calloc(1, sizeof(struct ff_dpdk_if_context));
    if (ctx == NULL)
        return NULL;

    ctx->sc = sc;
    ctx->ifp = ifp;
    ctx->port_id = cfg->port_id;
    ctx->hw_features = cfg->hw_features;

    return ctx;
}

void
ff_dpdk_deregister_if(struct ff_dpdk_if_context *ctx)
{
    free(ctx);
}

static void
check_all_ports_link_status(void)
{
    #define CHECK_INTERVAL 100 /* 100ms */
    #define MAX_CHECK_TIME 90  /* 9s (90 * 100ms) in total */

    uint8_t count, all_ports_up, print_flag = 0;
    struct rte_eth_link link;

    printf("\nChecking link status");
    fflush(stdout);

    int i, nb_ports;
    nb_ports = ff_global_cfg.dpdk.nb_ports;
    for (count = 0; count <= MAX_CHECK_TIME; count++) {
        all_ports_up = 1;
        for (i = 0; i < nb_ports; i++) {
            uint8_t portid = ff_global_cfg.dpdk.port_cfgs[i].port_id;
            memset(&link, 0, sizeof(link));
            rte_eth_link_get_nowait(portid, &link);

            /* print link status if flag set */
            if (print_flag == 1) {
                if (link.link_status) {
                    printf("Port %d Link Up - speed %u "
                        "Mbps - %s\n", (int)portid,
                        (unsigned)link.link_speed,
                        (link.link_duplex == ETH_LINK_FULL_DUPLEX) ?
                        ("full-duplex") : ("half-duplex"));
                } else {
                    printf("Port %d Link Down\n", (int)portid);
                }
                continue;
            }
            /* clear all_ports_up flag if any link down */
            if (link.link_status == 0) {
                all_ports_up = 0;
                break;
            }
        }

        /* after finally printing all link status, get out */
        if (print_flag == 1)
            break;

        if (all_ports_up == 0) {
            printf(".");
            fflush(stdout);
            rte_delay_ms(CHECK_INTERVAL);
        }

        /* set the print_flag if all ports up or timeout */
        if (all_ports_up == 1 || count == (MAX_CHECK_TIME - 1)) {
            print_flag = 1;
            printf("done\n");
        }
    }
}

static int
init_lcore_conf(void)
{
    uint8_t nb_ports = rte_eth_dev_count();
    if (nb_ports == 0) {
        rte_exit(EXIT_FAILURE, "No probed ethernet devices\n");
    }

    lcore_conf.proc_id = ff_global_cfg.dpdk.proc_id;
    lcore_conf.nb_procs = ff_global_cfg.dpdk.nb_procs;

    lcore_conf.proc_lcore = rte_zmalloc(NULL,
        sizeof(uint16_t) * lcore_conf.nb_procs, 0);
    if (lcore_conf.proc_lcore == NULL) {
        rte_exit(EXIT_FAILURE, "rte_zmalloc proc_lcore failed\n");
    }
    rte_memcpy(lcore_conf.proc_lcore, ff_global_cfg.dpdk.proc_lcore,
        sizeof(uint16_t) * lcore_conf.nb_procs);
    uint16_t proc_id;
    for (proc_id = 0; proc_id < lcore_conf.nb_procs; proc_id++) {
        uint16_t lcore_id = lcore_conf.proc_lcore[proc_id];
        if (!lcore_config[lcore_id].detected) {
            rte_exit(EXIT_FAILURE, "lcore %u unavailable\n", lcore_id);
        }
    }

    uint16_t socket_id = 0;
    if (ff_global_cfg.dpdk.numa_on) {
        socket_id = rte_lcore_to_socket_id(rte_lcore_id());
    }

    lcore_conf.socket_id = socket_id;

    /* Currently, proc id maps 1:1 to the rx/tx queue id of each port. */
    uint8_t port_id, enabled_ports = 0;
    for (port_id = 0; port_id < nb_ports; port_id++) {
        if (ff_global_cfg.dpdk.port_mask &&
            (ff_global_cfg.dpdk.port_mask & (1 << port_id)) == 0) {
            printf("\nSkipping disabled port %d\n", port_id);
            continue;
        }

        if (port_id >= ff_global_cfg.dpdk.nb_ports) {
            printf("\nSkipping non-configured port %d\n", port_id);
            break;
        }

        uint16_t nb_rx_queue = lcore_conf.nb_rx_queue;
        lcore_conf.rx_queue_list[nb_rx_queue].port_id = port_id;
        lcore_conf.rx_queue_list[nb_rx_queue].queue_id = lcore_conf.proc_id;
        lcore_conf.nb_rx_queue++;

        lcore_conf.tx_queue_id[port_id] = lcore_conf.proc_id;
        lcore_conf.pcap[port_id] = ff_global_cfg.dpdk.port_cfgs[enabled_ports].pcap;

        ff_global_cfg.dpdk.port_cfgs[enabled_ports].port_id = port_id;

        enabled_ports++;
    }

    ff_global_cfg.dpdk.nb_ports = enabled_ports;

    return 0;
}

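/*
 * Size the per-socket mbuf pool to cover RX descriptors, in-flight TX
 * bursts, TX descriptors, per-lcore mempool caches, the KNI queues and
 * the ARP clone rings, with a floor of 8192 mbufs.
 */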
static int
init_mem_pool(void)
{
    uint8_t nb_ports = ff_global_cfg.dpdk.nb_ports;
    uint32_t nb_lcores = ff_global_cfg.dpdk.nb_procs;
    uint32_t nb_tx_queue = nb_lcores;
    uint32_t nb_rx_queue = lcore_conf.nb_rx_queue * nb_lcores;

    unsigned nb_mbuf = RTE_MAX(
        (nb_rx_queue*RX_QUEUE_SIZE          +
        nb_ports*nb_lcores*MAX_PKT_BURST    +
        nb_ports*nb_tx_queue*TX_QUEUE_SIZE  +
        nb_lcores*MEMPOOL_CACHE_SIZE +
        nb_ports*KNI_MBUF_MAX +
        nb_ports*KNI_QUEUE_SIZE +
        nb_lcores*nb_ports*ARP_RING_SIZE),
        (unsigned)8192);

    unsigned socketid = 0;
    uint16_t i, lcore_id;
    char s[64];
    int numa_on = ff_global_cfg.dpdk.numa_on;

    for (i = 0; i < lcore_conf.nb_procs; i++) {
        lcore_id = lcore_conf.proc_lcore[i];
        if (numa_on) {
            socketid = rte_lcore_to_socket_id(lcore_id);
        }

        if (socketid >= NB_SOCKETS) {
            rte_exit(EXIT_FAILURE, "Socket %u of lcore %u is out of range %d\n",
                socketid, lcore_id, NB_SOCKETS);
        }

        if (pktmbuf_pool[socketid] != NULL) {
            continue;
        }

        if (rte_eal_process_type() == RTE_PROC_PRIMARY) {
            snprintf(s, sizeof(s), "mbuf_pool_%d", socketid);
            pktmbuf_pool[socketid] =
                rte_pktmbuf_pool_create(s, nb_mbuf,
                    MEMPOOL_CACHE_SIZE, 0,
                    RTE_MBUF_DEFAULT_BUF_SIZE, socketid);
        } else {
            snprintf(s, sizeof(s), "mbuf_pool_%d", socketid);
            pktmbuf_pool[socketid] = rte_mempool_lookup(s);
        }

        if (pktmbuf_pool[socketid] == NULL) {
            rte_exit(EXIT_FAILURE, "Cannot create mbuf pool on socket %d\n", socketid);
        } else {
            printf("create mbuf pool on socket %d\n", socketid);
        }
    }

    return 0;
}

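/*
 * Look up the ring by name first so secondary processes (and repeated
 * initialization) attach to an existing ring; only the primary process
 * actually creates a new one.
 */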
static struct rte_ring *
create_ring(const char *name, unsigned count, int socket_id, unsigned flags)
{
    struct rte_ring *ring;

    if (name == NULL)
        return NULL;

    /* If the ring was already created, just attach to it. */
    if (likely((ring = rte_ring_lookup(name)) != NULL))
        return ring;

    if (rte_eal_process_type() == RTE_PROC_PRIMARY) {
        return rte_ring_create(name, count, socket_id, flags);
    } else {
        return rte_ring_lookup(name);
    }
}

static int
init_arp_ring(void)
{
    int i, j;
    char name_buf[RTE_RING_NAMESIZE];
    int nb_procs = ff_global_cfg.dpdk.nb_procs;
    int proc_id = ff_global_cfg.dpdk.proc_id;

    /* Allocate arp ring pointers according to the eth dev count. */
    int nb_ports = rte_eth_dev_count();
    for (i = 0; i < nb_procs; ++i) {
        snprintf(name_buf, RTE_RING_NAMESIZE, "ring_ptr_%d_%d",
            proc_id, i);

        arp_ring[i] = rte_zmalloc(name_buf,
            sizeof(struct rte_ring *) * nb_ports,
             RTE_CACHE_LINE_SIZE);
        if (arp_ring[i] == NULL) {
            rte_exit(EXIT_FAILURE, "rte_zmalloc(%s (struct rte_ring*)) "
                "failed\n", name_buf);
        }
    }

    unsigned socketid = lcore_conf.socket_id;

    /* Create rings according to the ports actually in use. */
    nb_ports = ff_global_cfg.dpdk.nb_ports;
    for (j = 0; j < nb_ports; j++) {
        uint8_t port_id = ff_global_cfg.dpdk.port_cfgs[j].port_id;

        for (i = 0; i < nb_procs; ++i) {
            snprintf(name_buf, RTE_RING_NAMESIZE, "arp_ring_%d_%d", i, port_id);
            arp_ring[i][port_id] = create_ring(name_buf, ARP_RING_SIZE,
                socketid, RING_F_SC_DEQ);

            if (arp_ring[i][port_id] == NULL)
                rte_panic("create ring:%s failed!\n", name_buf);

            printf("create ring:%s success, %u ring entries are now free!\n",
                name_buf, rte_ring_free_count(arp_ring[i][port_id]));
        }
    }

    return 0;
}

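/*
 * Mempool object-init callback: each message carries its payload buffer
 * inline, immediately after the ff_msg header.
 */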
static void
ff_msg_init(struct rte_mempool *mp,
    __attribute__((unused)) void *opaque_arg,
    void *obj, __attribute__((unused)) unsigned i)
{
    struct ff_msg *msg = (struct ff_msg *)obj;
    msg->msg_type = FF_UNKNOWN;
    msg->buf_addr = (char *)msg + sizeof(struct ff_msg);
    msg->buf_len = mp->elt_size - sizeof(struct ff_msg);
}

static int
init_msg_ring(void)
{
    uint16_t i;
    uint16_t nb_procs = ff_global_cfg.dpdk.nb_procs;
    unsigned socketid = lcore_conf.socket_id;

    /* Create message buffer pool */
    if (rte_eal_process_type() == RTE_PROC_PRIMARY) {
        message_pool = rte_mempool_create(FF_MSG_POOL,
           MSG_RING_SIZE * 2 * nb_procs,
           MAX_MSG_BUF_SIZE, MSG_RING_SIZE / 2, 0,
           NULL, NULL, ff_msg_init, NULL,
           socketid, 0);
    } else {
        message_pool = rte_mempool_lookup(FF_MSG_POOL);
    }

    if (message_pool == NULL) {
        rte_panic("Create msg mempool failed\n");
    }

    for (i = 0; i < nb_procs; ++i) {
        snprintf(msg_ring[i].ring_name[0], RTE_RING_NAMESIZE,
            "%s%u", FF_MSG_RING_IN, i);
        snprintf(msg_ring[i].ring_name[1], RTE_RING_NAMESIZE,
            "%s%u", FF_MSG_RING_OUT, i);

        msg_ring[i].ring[0] = create_ring(msg_ring[i].ring_name[0],
            MSG_RING_SIZE, socketid, RING_F_SP_ENQ | RING_F_SC_DEQ);
        if (msg_ring[i].ring[0] == NULL)
            rte_panic("create ring:%s failed!\n", msg_ring[i].ring_name[0]);

        msg_ring[i].ring[1] = create_ring(msg_ring[i].ring_name[1],
            MSG_RING_SIZE, socketid, RING_F_SP_ENQ | RING_F_SC_DEQ);
        if (msg_ring[i].ring[1] == NULL)
            rte_panic("create ring:%s failed!\n", msg_ring[i].ring_name[1]);
    }

    return 0;
}

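/*
 * Set up KNI: parse the accept/reject filter method from the config and
 * allocate one KNI device per enabled port.
 */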
static int
init_kni(void)
{
    int nb_ports = rte_eth_dev_count();
    kni_accept = 0;
    if (strcasecmp(ff_global_cfg.kni.method, "accept") == 0)
        kni_accept = 1;

    ff_kni_init(nb_ports, ff_global_cfg.kni.tcp_port,
        ff_global_cfg.kni.udp_port);

    unsigned socket_id = lcore_conf.socket_id;
    struct rte_mempool *mbuf_pool = pktmbuf_pool[socket_id];

    nb_ports = ff_global_cfg.dpdk.nb_ports;
    int i;
    for (i = 0; i < nb_ports; i++) {
        uint8_t port_id = ff_global_cfg.dpdk.port_cfgs[i].port_id;
        ff_kni_alloc(port_id, socket_id, mbuf_pool, KNI_QUEUE_SIZE);
    }

    return 0;
}

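/*
 * Per-port bring-up: validate queue counts against device capabilities,
 * record the MAC address, choose the TX/RX offloads the hardware supports,
 * then (in the primary process only) configure the queues and start the
 * port.
 */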
static int
init_port_start(void)
{
    int nb_ports = ff_global_cfg.dpdk.nb_ports;
    uint16_t nb_procs = ff_global_cfg.dpdk.nb_procs;
    unsigned socketid = rte_lcore_to_socket_id(rte_lcore_id());
    struct rte_mempool *mbuf_pool = pktmbuf_pool[socketid];
    uint16_t i;

    for (i = 0; i < nb_ports; i++) {
        uint8_t port_id = ff_global_cfg.dpdk.port_cfgs[i].port_id;

        struct rte_eth_dev_info dev_info;
        rte_eth_dev_info_get(port_id, &dev_info);

        if (nb_procs > dev_info.max_rx_queues) {
            rte_exit(EXIT_FAILURE, "num_procs[%d] bigger than max_rx_queues[%d]\n",
                nb_procs,
                dev_info.max_rx_queues);
        }

        if (nb_procs > dev_info.max_tx_queues) {
            rte_exit(EXIT_FAILURE, "num_procs[%d] bigger than max_tx_queues[%d]\n",
                nb_procs,
                dev_info.max_tx_queues);
        }

        struct ether_addr addr;
        rte_eth_macaddr_get(port_id, &addr);
        printf("Port %u MAC: %02" PRIx8 " %02" PRIx8 " %02" PRIx8
                   " %02" PRIx8 " %02" PRIx8 " %02" PRIx8 "\n",
                (unsigned)port_id,
                addr.addr_bytes[0], addr.addr_bytes[1],
                addr.addr_bytes[2], addr.addr_bytes[3],
                addr.addr_bytes[4], addr.addr_bytes[5]);

        rte_memcpy(ff_global_cfg.dpdk.port_cfgs[i].mac,
            addr.addr_bytes, ETHER_ADDR_LEN);

        /* Set txq_flags - we need neither multi-mempool nor refcounted mbufs */
        dev_info.default_txconf.txq_flags = ETH_TXQ_FLAGS_NOMULTMEMP |
            ETH_TXQ_FLAGS_NOREFCOUNT;

        /* Disable features that are not supported by port's HW */
        if (!(dev_info.tx_offload_capa & DEV_TX_OFFLOAD_UDP_CKSUM)) {
            dev_info.default_txconf.txq_flags |= ETH_TXQ_FLAGS_NOXSUMUDP;
        }

        if (!(dev_info.tx_offload_capa & DEV_TX_OFFLOAD_TCP_CKSUM)) {
            dev_info.default_txconf.txq_flags |= ETH_TXQ_FLAGS_NOXSUMTCP;
        }

        if (!(dev_info.tx_offload_capa & DEV_TX_OFFLOAD_SCTP_CKSUM)) {
            dev_info.default_txconf.txq_flags |= ETH_TXQ_FLAGS_NOXSUMSCTP;
        }

        if (!(dev_info.tx_offload_capa & DEV_TX_OFFLOAD_VLAN_INSERT)) {
            dev_info.default_txconf.txq_flags |= ETH_TXQ_FLAGS_NOVLANOFFL;
        }

        if (!(dev_info.tx_offload_capa & DEV_TX_OFFLOAD_TCP_TSO) &&
            !(dev_info.tx_offload_capa & DEV_TX_OFFLOAD_UDP_TSO)) {
            dev_info.default_txconf.txq_flags |= ETH_TXQ_FLAGS_NOMULTSEGS;
        }

        struct rte_eth_conf port_conf = {0};

        /* Set RSS mode */
        port_conf.rxmode.mq_mode = ETH_MQ_RX_RSS;
        port_conf.rx_adv_conf.rss_conf.rss_hf = ETH_RSS_PROTO_MASK;
        port_conf.rx_adv_conf.rss_conf.rss_key = default_rsskey_40bytes;
        port_conf.rx_adv_conf.rss_conf.rss_key_len = 40;

        /* Set Rx VLAN stripping */
        if (ff_global_cfg.dpdk.vlan_strip) {
            if (dev_info.rx_offload_capa & DEV_RX_OFFLOAD_VLAN_STRIP) {
                port_conf.rxmode.hw_vlan_strip = 1;
            }
        }

        /* Enable HW CRC stripping */
        port_conf.rxmode.hw_strip_crc = 1;

        /* FIXME: Enable TCP LRO? */
        #if 0
        if (dev_info.rx_offload_capa & DEV_RX_OFFLOAD_TCP_LRO) {
            printf("LRO is supported\n");
            port_conf.rxmode.enable_lro = 1;
            ff_global_cfg.dpdk.port_cfgs[i].hw_features.rx_lro = 1;
        }
        #endif

        /* Set Rx checksum checking */
        if ((dev_info.rx_offload_capa & DEV_RX_OFFLOAD_IPV4_CKSUM) &&
            (dev_info.rx_offload_capa & DEV_RX_OFFLOAD_UDP_CKSUM) &&
            (dev_info.rx_offload_capa & DEV_RX_OFFLOAD_TCP_CKSUM)) {
            printf("RX checksum offload supported\n");
            port_conf.rxmode.hw_ip_checksum = 1;
            ff_global_cfg.dpdk.port_cfgs[i].hw_features.rx_csum = 1;
        }

        if ((dev_info.tx_offload_capa & DEV_TX_OFFLOAD_IPV4_CKSUM)) {
            printf("TX ip checksum offload supported\n");
            ff_global_cfg.dpdk.port_cfgs[i].hw_features.tx_csum_ip = 1;
        }

        if ((dev_info.tx_offload_capa & DEV_TX_OFFLOAD_UDP_CKSUM) &&
            (dev_info.tx_offload_capa & DEV_TX_OFFLOAD_TCP_CKSUM)) {
            printf("TX TCP&UDP checksum offload supported\n");
            ff_global_cfg.dpdk.port_cfgs[i].hw_features.tx_csum_l4 = 1;
        }

        if (ff_global_cfg.dpdk.tso) {
            if (dev_info.tx_offload_capa & DEV_TX_OFFLOAD_TCP_TSO) {
                printf("TSO is supported\n");
                ff_global_cfg.dpdk.port_cfgs[i].hw_features.tx_tso = 1;
            }
        } else {
            printf("TSO is disabled\n");
        }

        if (rte_eal_process_type() != RTE_PROC_PRIMARY) {
            continue;
        }

        /* Currently, proc id maps 1:1 to the queue id of each port. */
        int ret = rte_eth_dev_configure(port_id, nb_procs, nb_procs, &port_conf);
        if (ret != 0) {
            return ret;
        }

        uint16_t q;
        for (q = 0; q < nb_procs; q++) {
            ret = rte_eth_tx_queue_setup(port_id, q, TX_QUEUE_SIZE,
                socketid, &dev_info.default_txconf);
            if (ret < 0) {
                return ret;
            }

            ret = rte_eth_rx_queue_setup(port_id, q, RX_QUEUE_SIZE,
                socketid, &dev_info.default_rxconf, mbuf_pool);
            if (ret < 0) {
                return ret;
            }
        }

        ret = rte_eth_dev_start(port_id);
        if (ret < 0) {
            return ret;
        }

        /* Enable RX in promiscuous mode for the Ethernet device. */
        if (ff_global_cfg.dpdk.promiscuous) {
            rte_eth_promiscuous_enable(port_id);
            ret = rte_eth_promiscuous_get(port_id);
            if (ret == 1) {
                printf("set port %u to promiscuous mode ok\n", port_id);
            } else {
                printf("set port %u to promiscuous mode error\n", port_id);
            }
        }

        /* Enable pcap dump */
        if (ff_global_cfg.dpdk.port_cfgs[i].pcap) {
            ff_enable_pcap(ff_global_cfg.dpdk.port_cfgs[i].pcap);
        }
    }

    if (rte_eal_process_type() == RTE_PROC_PRIMARY) {
        check_all_ports_link_status();
    }

    return 0;
}

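/*
 * Emulate the FreeBSD hardclock: arm a periodic timer that fires every
 * 1000 / freebsd.hz milliseconds, expressed in TSC cycles.
 */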
static int
init_clock(void)
{
    rte_timer_subsystem_init();
    uint64_t hz = rte_get_timer_hz();
    uint64_t intrs = MS_PER_S / ff_global_cfg.freebsd.hz;
    uint64_t tsc = (hz + MS_PER_S - 1) / MS_PER_S * intrs;

    rte_timer_init(&freebsd_clock);
    rte_timer_reset(&freebsd_clock, tsc, PERIODICAL,
        rte_lcore_id(), &ff_hardclock_job, NULL);

    ff_update_current_ts();

    return 0;
}

int
ff_dpdk_init(int argc, char **argv)
{
    if (ff_global_cfg.dpdk.nb_procs < 1 ||
        ff_global_cfg.dpdk.nb_procs > RTE_MAX_LCORE ||
        ff_global_cfg.dpdk.proc_id >= ff_global_cfg.dpdk.nb_procs ||
        ff_global_cfg.dpdk.proc_id < 0) {
        printf("param num_procs[%d] or proc_id[%d] error!\n",
            ff_global_cfg.dpdk.nb_procs,
            ff_global_cfg.dpdk.proc_id);
        exit(1);
    }

    int ret = rte_eal_init(argc, argv);
    if (ret < 0) {
        rte_exit(EXIT_FAILURE, "Error with EAL initialization\n");
    }

    init_lcore_conf();

    init_mem_pool();

    init_arp_ring();

    init_msg_ring();

    enable_kni = ff_global_cfg.kni.enable;
    if (enable_kni) {
        init_kni();
    }

    ret = init_port_start();
    if (ret < 0) {
        rte_exit(EXIT_FAILURE, "init_port_start failed\n");
    }

    init_clock();

    return 0;
}

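/*
 * Hand a received DPDK mbuf (possibly a multi-segment chain) to the
 * FreeBSD stack: wrap each segment in an ff_mbuf and attach the chain
 * to the veth interface.
 */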
static void
ff_veth_input(const struct ff_dpdk_if_context *ctx, struct rte_mbuf *pkt)
{
    uint8_t rx_csum = ctx->hw_features.rx_csum;
    if (rx_csum) {
        if (pkt->ol_flags & (PKT_RX_IP_CKSUM_BAD | PKT_RX_L4_CKSUM_BAD)) {
            /* Drop packets with bad hardware-verified checksums. */
            rte_pktmbuf_free(pkt);
            return;
        }
    }

    /*
     * FIXME: should we save pkt->vlan_tci
     * if (pkt->ol_flags & PKT_RX_VLAN_PKT)
     */

    void *data = rte_pktmbuf_mtod(pkt, void*);
    uint16_t len = rte_pktmbuf_data_len(pkt);

    void *hdr = ff_mbuf_gethdr(pkt, pkt->pkt_len, data, len, rx_csum);
    if (hdr == NULL) {
        rte_pktmbuf_free(pkt);
        return;
    }

    struct rte_mbuf *pn = pkt->next;
    void *prev = hdr;
    while (pn != NULL) {
        data = rte_pktmbuf_mtod(pn, void*);
        len = rte_pktmbuf_data_len(pn);

        void *mb = ff_mbuf_get(prev, data, len);
        if (mb == NULL) {
            ff_mbuf_free(hdr);
            rte_pktmbuf_free(pkt);
            return;
        }
        pn = pn->next;
        prev = mb;
    }

    ff_veth_process_packet(ctx->ifp, hdr);
}

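/*
 * Classify a frame by EtherType: ARP is always handled by the stack;
 * when KNI is enabled, IPv4 frames are further classified by
 * ff_kni_proto_filter; everything else is FILTER_UNKNOWN.
 */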
static enum FilterReturn
protocol_filter(const void *data, uint16_t len)
{
    if (len < sizeof(struct ether_hdr))
        return FILTER_UNKNOWN;

    const struct ether_hdr *hdr;
    hdr = (const struct ether_hdr *)data;

    if (ntohs(hdr->ether_type) == ETHER_TYPE_ARP)
        return FILTER_ARP;

    if (!enable_kni) {
        return FILTER_UNKNOWN;
    }

    if (ntohs(hdr->ether_type) != ETHER_TYPE_IPv4)
        return FILTER_UNKNOWN;

    return ff_kni_proto_filter((const char *)data + sizeof(struct ether_hdr),
        len - sizeof(struct ether_hdr));
}

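/*
 * Dispatch a burst of packets: ARP packets are cloned to every other
 * process's ARP ring (and to KNI) before going to the local stack; other
 * packets go either to KNI or to the stack, depending on the filter
 * result and the configured KNI method.
 */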
static inline void
process_packets(uint8_t port_id, uint16_t queue_id, struct rte_mbuf **bufs,
    uint16_t count, const struct ff_dpdk_if_context *ctx, int pkts_from_ring)
{
    struct lcore_conf *qconf = &lcore_conf;

    uint16_t i;
    for (i = 0; i < count; i++) {
        struct rte_mbuf *rtem = bufs[i];

        if (unlikely(qconf->pcap[port_id] != NULL)) {
            ff_dump_packets(qconf->pcap[port_id], rtem);
        }

        void *data = rte_pktmbuf_mtod(rtem, void*);
        uint16_t len = rte_pktmbuf_data_len(rtem);

        enum FilterReturn filter = protocol_filter(data, len);
        if (filter == FILTER_ARP) {
            struct rte_mempool *mbuf_pool;
            struct rte_mbuf *mbuf_clone;
            if (pkts_from_ring == 0) {
                uint16_t j;
                for (j = 0; j < qconf->nb_procs; ++j) {
                    if (j == queue_id)
                        continue;

                    mbuf_pool = pktmbuf_pool[rte_lcore_to_socket_id(qconf->proc_lcore[j])];
                    mbuf_clone = rte_pktmbuf_clone(rtem, mbuf_pool);
                    if (mbuf_clone) {
                        int ret = rte_ring_enqueue(arp_ring[j][port_id], mbuf_clone);
                        if (ret < 0)
                            rte_pktmbuf_free(mbuf_clone);
                    }
                }
            }

            if (enable_kni && rte_eal_process_type() == RTE_PROC_PRIMARY) {
                mbuf_pool = pktmbuf_pool[qconf->socket_id];
                mbuf_clone = rte_pktmbuf_clone(rtem, mbuf_pool);
                if (mbuf_clone) {
                    ff_kni_enqueue(port_id, mbuf_clone);
                }
            }

            ff_veth_input(ctx, rtem);
        } else if (enable_kni && ((filter == FILTER_KNI && kni_accept) ||
            (filter == FILTER_UNKNOWN && !kni_accept))) {
            ff_kni_enqueue(port_id, rtem);
        } else {
            ff_veth_input(ctx, rtem);
        }
    }
}

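/*
 * Drain ARP packets that other processes cloned onto this queue's ring
 * and feed them into the local stack.
 */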
static inline int
process_arp_ring(uint8_t port_id, uint16_t queue_id,
    struct rte_mbuf **pkts_burst, const struct ff_dpdk_if_context *ctx)
{
    /* Read packets from the ring buffer and process them. */
    uint16_t nb_rb;
    nb_rb = rte_ring_dequeue_burst(arp_ring[queue_id][port_id],
        (void **)pkts_burst, MAX_PKT_BURST);

    if (nb_rb > 0) {
        process_packets(port_id, queue_id, pkts_burst, nb_rb, ctx, 1);
    }

    return 0;
}

static inline void
handle_sysctl_msg(struct ff_msg *msg, uint16_t proc_id)
{
    int ret = ff_sysctl(msg->sysctl.name, msg->sysctl.namelen,
        msg->sysctl.old, msg->sysctl.oldlenp, msg->sysctl.new,
        msg->sysctl.newlen);

    if (ret < 0) {
        msg->result = errno;
    } else {
        msg->result = 0;
    }

    rte_ring_enqueue(msg_ring[proc_id].ring[1], msg);
}

static inline void
handle_ioctl_msg(struct ff_msg *msg, uint16_t proc_id)
{
    int fd, ret;
    fd = ff_socket(AF_INET, SOCK_DGRAM, 0);
    if (fd < 0) {
        ret = -1;
        goto done;
    }

    ret = ff_ioctl(fd, msg->ioctl.cmd, msg->ioctl.data);

    ff_close(fd);

done:
    if (ret < 0) {
        msg->result = errno;
    } else {
        msg->result = 0;
    }

    rte_ring_enqueue(msg_ring[proc_id].ring[1], msg);
}

static inline void
handle_route_msg(struct ff_msg *msg, uint16_t proc_id)
{
    msg->result = ff_rtioctl(msg->route.fib, msg->route.data,
        &msg->route.len, msg->route.maxlen);

    rte_ring_enqueue(msg_ring[proc_id].ring[1], msg);
}

static struct ff_top_args ff_status;
static inline void
handle_top_msg(struct ff_msg *msg, uint16_t proc_id)
{
    msg->top = ff_status;
    msg->result = 0;

    rte_ring_enqueue(msg_ring[proc_id].ring[1], msg);
}

static inline void
handle_default_msg(struct ff_msg *msg, uint16_t proc_id)
{
    msg->result = EINVAL;
    rte_ring_enqueue(msg_ring[proc_id].ring[1], msg);
}

static inline void
handle_msg(struct ff_msg *msg, uint16_t proc_id)
{
    switch (msg->msg_type) {
        case FF_SYSCTL:
            handle_sysctl_msg(msg, proc_id);
            break;
        case FF_IOCTL:
            handle_ioctl_msg(msg, proc_id);
            break;
        case FF_ROUTE:
            handle_route_msg(msg, proc_id);
            break;
        case FF_TOP:
            handle_top_msg(msg, proc_id);
            break;
        default:
            handle_default_msg(msg, proc_id);
            break;
    }
}

static inline int
process_msg_ring(uint16_t proc_id)
{
    void *msg;
    int ret = rte_ring_dequeue(msg_ring[proc_id].ring[0], &msg);

    if (unlikely(ret == 0)) {
        handle_msg((struct ff_msg *)msg, proc_id);
    }

    return 0;
}

/* Send burst of packets on an output interface */
static inline int
send_burst(struct lcore_conf *qconf, uint16_t n, uint8_t port)
{
    struct rte_mbuf **m_table;
    int ret;
    uint16_t queueid;

    queueid = qconf->tx_queue_id[port];
    m_table = (struct rte_mbuf **)qconf->tx_mbufs[port].m_table;

    if (unlikely(qconf->pcap[port] != NULL)) {
        uint16_t i;
        for (i = 0; i < n; i++) {
            ff_dump_packets(qconf->pcap[port], m_table[i]);
        }
    }

    ret = rte_eth_tx_burst(port, queueid, m_table, n);
    if (unlikely(ret < n)) {
        /* Free the mbufs the NIC did not accept. */
        do {
            rte_pktmbuf_free(m_table[ret]);
        } while (++ret < n);
    }

    return 0;
}

/* Enqueue a single packet, and send burst if queue is filled */
static inline int
send_single_packet(struct rte_mbuf *m, uint8_t port)
{
    uint16_t len;
    struct lcore_conf *qconf;

    qconf = &lcore_conf;
    len = qconf->tx_mbufs[port].len;
    qconf->tx_mbufs[port].m_table[len] = m;
    len++;

    /* enough pkts to be sent */
    if (unlikely(len == MAX_PKT_BURST)) {
        send_burst(qconf, MAX_PKT_BURST, port);
        len = 0;
    }

    qconf->tx_mbufs[port].len = len;
    return 0;
}

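/*
 * Transmit path: copy the FreeBSD mbuf chain into a chain of DPDK mbufs,
 * translate the checksum/TSO offload requests into ol_flags, then queue
 * the packet for a burst send.
 */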
int
ff_dpdk_if_send(struct ff_dpdk_if_context *ctx, void *m,
    int total)
{
    struct rte_mempool *mbuf_pool = pktmbuf_pool[lcore_conf.socket_id];
    struct rte_mbuf *head = rte_pktmbuf_alloc(mbuf_pool);
    if (head == NULL) {
        ff_mbuf_free(m);
        return -1;
    }

    head->pkt_len = total;
    head->nb_segs = 0;

    int off = 0;
    struct rte_mbuf *cur = head, *prev = NULL;
    while (total > 0) {
        if (cur == NULL) {
            cur = rte_pktmbuf_alloc(mbuf_pool);
            if (cur == NULL) {
                rte_pktmbuf_free(head);
                ff_mbuf_free(m);
                return -1;
            }
        }

        void *data = rte_pktmbuf_mtod(cur, void*);
        int len = total > RTE_MBUF_DEFAULT_DATAROOM ? RTE_MBUF_DEFAULT_DATAROOM : total;
        int ret = ff_mbuf_copydata(m, data, off, len);
        if (ret < 0) {
            rte_pktmbuf_free(head);
            ff_mbuf_free(m);
            return -1;
        }

        if (prev != NULL) {
            prev->next = cur;
        }
        prev = cur;

        cur->data_len = len;
        off += len;
        total -= len;
        head->nb_segs++;
        cur = NULL;
    }

    struct ff_tx_offload offload = {0};
    ff_mbuf_tx_offload(m, &offload);

    if (offload.ip_csum) {
        head->ol_flags |= PKT_TX_IP_CKSUM;
        head->l2_len = sizeof(struct ether_hdr);
        head->l3_len = sizeof(struct ipv4_hdr);
    }

    if (ctx->hw_features.tx_csum_l4) {
        if (offload.tcp_csum) {
            head->ol_flags |= PKT_TX_TCP_CKSUM;
            head->l2_len = sizeof(struct ether_hdr);
            head->l3_len = sizeof(struct ipv4_hdr);
        }

        if (offload.tso_seg_size) {
            head->ol_flags |= PKT_TX_TCP_SEG;
            head->l4_len = sizeof(struct tcp_hdr);
            head->tso_segsz = offload.tso_seg_size;
        }

        if (offload.udp_csum) {
            head->ol_flags |= PKT_TX_UDP_CKSUM;
            head->l2_len = sizeof(struct ether_hdr);
            head->l3_len = sizeof(struct ipv4_hdr);
        }
    }

    ff_mbuf_free(m);

    return send_single_packet(head, ctx->port_id);
}

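/*
 * Per-lcore event loop: run expired timers, drain pending TX bursts
 * roughly every BURST_TX_DRAIN_US, poll the ARP ring and the RX queues
 * (prefetching ahead), service the message ring, then invoke the
 * user-supplied loop callback and update the usr/sys/idle TSC counters.
 */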
static int
main_loop(void *arg)
{
    struct loop_routine *lr = (struct loop_routine *)arg;

    struct rte_mbuf *pkts_burst[MAX_PKT_BURST];
    unsigned lcore_id;
    uint64_t prev_tsc, diff_tsc, cur_tsc, div_tsc, usr_tsc, sys_tsc, end_tsc;
    int i, j, nb_rx, idle;
    uint8_t port_id, queue_id;
    struct lcore_conf *qconf;
    const uint64_t drain_tsc = (rte_get_tsc_hz() + US_PER_S - 1) /
        US_PER_S * BURST_TX_DRAIN_US;
    struct ff_dpdk_if_context *ctx;

    prev_tsc = 0;

    lcore_id = rte_lcore_id();
    qconf = &lcore_conf;

    if (qconf->nb_rx_queue == 0) {
        printf("lcore %u has nothing to do\n", lcore_id);
        return 0;
    }

    while (1) {
        cur_tsc = rte_rdtsc();
        if (unlikely(freebsd_clock.expire < cur_tsc)) {
            rte_timer_manage();
        }

        idle = 1;
        sys_tsc = 0;
        usr_tsc = 0;

        /*
         * TX burst queue drain
         */
        diff_tsc = cur_tsc - prev_tsc;
        if (unlikely(diff_tsc > drain_tsc)) {
            /*
             * This could be optimized (use queueid instead of
             * portid), but it is not called so often
             */
            for (port_id = 0; port_id < RTE_MAX_ETHPORTS; port_id++) {
                if (qconf->tx_mbufs[port_id].len == 0)
                    continue;

                idle = 0;
                send_burst(qconf,
                    qconf->tx_mbufs[port_id].len,
                    port_id);
                qconf->tx_mbufs[port_id].len = 0;
            }

            prev_tsc = cur_tsc;
        }

        /*
         * Read packets from RX queues
         */
        for (i = 0; i < qconf->nb_rx_queue; ++i) {
            port_id = qconf->rx_queue_list[i].port_id;
            queue_id = qconf->rx_queue_list[i].queue_id;
            ctx = veth_ctx[port_id];

            if (enable_kni && rte_eal_process_type() == RTE_PROC_PRIMARY) {
                ff_kni_process(port_id, queue_id, pkts_burst, MAX_PKT_BURST);
            }

            process_arp_ring(port_id, queue_id, pkts_burst, ctx);

            nb_rx = rte_eth_rx_burst(port_id, queue_id, pkts_burst,
                MAX_PKT_BURST);
            if (nb_rx == 0)
                continue;

            idle = 0;

            /* Prefetch first packets */
            for (j = 0; j < PREFETCH_OFFSET && j < nb_rx; j++) {
                rte_prefetch0(rte_pktmbuf_mtod(
                        pkts_burst[j], void *));
            }

            /* Prefetch and handle already prefetched packets */
            for (j = 0; j < (nb_rx - PREFETCH_OFFSET); j++) {
                rte_prefetch0(rte_pktmbuf_mtod(pkts_burst[
                        j + PREFETCH_OFFSET], void *));
                process_packets(port_id, queue_id, &pkts_burst[j], 1, ctx, 0);
            }

            /* Handle remaining prefetched packets */
            for (; j < nb_rx; j++) {
                process_packets(port_id, queue_id, &pkts_burst[j], 1, ctx, 0);
            }
        }

        process_msg_ring(qconf->proc_id);

        div_tsc = rte_rdtsc();

        if (likely(lr->loop != NULL)) {
            lr->loop(lr->arg);
        }

        end_tsc = rte_rdtsc();
        usr_tsc = end_tsc - div_tsc;

        if (!idle) {
            sys_tsc = div_tsc - cur_tsc;
            ff_status.sys_tsc += sys_tsc;
        }

        ff_status.usr_tsc += usr_tsc;
        ff_status.work_tsc += end_tsc - cur_tsc;
        ff_status.idle_tsc += end_tsc - cur_tsc - usr_tsc - sys_tsc;

        ff_status.loops++;
    }
}

int
ff_dpdk_if_up(void) {
    int nb_ports = ff_global_cfg.dpdk.nb_ports;
    int i;
    for (i = 0; i < nb_ports; i++) {
        uint8_t port_id = ff_global_cfg.dpdk.port_cfgs[i].port_id;
        veth_ctx[port_id] = ff_veth_attach(ff_global_cfg.dpdk.port_cfgs + i);
        if (veth_ctx[port_id] == NULL) {
            rte_exit(EXIT_FAILURE, "ff_veth_attach failed\n");
        }
    }

    return 0;
}

void
ff_dpdk_run(loop_func_t loop, void *arg) {
    struct loop_routine *lr = malloc(sizeof(struct loop_routine));
    if (lr == NULL) {
        rte_exit(EXIT_FAILURE, "malloc loop_routine failed\n");
    }
    lr->loop = loop;
    lr->arg = arg;
    rte_eal_mp_remote_launch(main_loop, lr, CALL_MASTER);
    rte_eal_mp_wait_lcore();
    free(lr);
}

void
ff_dpdk_pktmbuf_free(void *m)
{
    rte_pktmbuf_free((struct rte_mbuf *)m);
}

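/*
 * Software Toeplitz hash, the same algorithm NICs use for RSS, so the
 * result can be compared against hardware queue steering.
 */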
static uint32_t
toeplitz_hash(unsigned keylen, const uint8_t *key,
    unsigned datalen, const uint8_t *data)
{
    uint32_t hash = 0, v;
    u_int i, b;

    /* XXXRW: Perhaps an assertion about key length vs. data length? */

    v = (key[0]<<24) + (key[1]<<16) + (key[2]<<8) + key[3];
    for (i = 0; i < datalen; i++) {
        for (b = 0; b < 8; b++) {
            if (data[i] & (1<<(7-b)))
                hash ^= v;
            v <<= 1;
            if ((i + 4) < keylen &&
                (key[i+4] & (1<<(7-b))))
                v |= 1;
        }
    }
    return (hash);
}

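/*
 * Recompute the RSS hash in software over the src/dst address and port
 * 4-tuple and check whether it maps to this process's queue
 * (hash % nb_procs == proc_id).
 */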
int
ff_rss_check(uint32_t saddr, uint32_t daddr, uint16_t sport, uint16_t dport)
{
    struct lcore_conf *qconf = &lcore_conf;

    if (qconf->nb_procs == 1) {
        return 1;
    }

    uint8_t data[sizeof(saddr) + sizeof(daddr) + sizeof(sport) +
        sizeof(dport)];

    unsigned datalen = 0;

    bcopy(&saddr, &data[datalen], sizeof(saddr));
    datalen += sizeof(saddr);

    bcopy(&daddr, &data[datalen], sizeof(daddr));
    datalen += sizeof(daddr);

    bcopy(&sport, &data[datalen], sizeof(sport));
    datalen += sizeof(sport);

    bcopy(&dport, &data[datalen], sizeof(dport));
    datalen += sizeof(dport);

    uint32_t hash = toeplitz_hash(sizeof(default_rsskey_40bytes),
        default_rsskey_40bytes, datalen, data);

    return (hash % qconf->nb_procs) == qconf->proc_id;
}