/*
 * Copyright (C) 2017 THL A29 Limited, a Tencent company.
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions are met:
 *
 * 1. Redistributions of source code must retain the above copyright notice, this
 *   list of conditions and the following disclaimer.
 * 2. Redistributions in binary form must reproduce the above copyright notice,
 *   this list of conditions and the following disclaimer in the documentation
 *   and/or other materials provided with the distribution.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
 * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
 * DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR
 * ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
 * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
 * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
 * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
 * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 *
 */

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <strings.h>
#include <ctype.h>
#include <inttypes.h>
#include <sys/types.h>
#include <arpa/inet.h>

#include <rte_common.h>
#include <rte_byteorder.h>
#include <rte_log.h>
#include <rte_memory.h>
#include <rte_memcpy.h>
#include <rte_memzone.h>
#include <rte_config.h>
#include <rte_eal.h>
#include <rte_pci.h>
#include <rte_mbuf.h>
#include <rte_lcore.h>
#include <rte_launch.h>
#include <rte_ethdev.h>
#include <rte_debug.h>
#include <rte_ether.h>
#include <rte_malloc.h>
#include <rte_cycles.h>
#include <rte_timer.h>
#include <rte_thash.h>

#include "ff_dpdk_if.h"
#include "ff_dpdk_pcap.h"
#include "ff_dpdk_kni.h"
#include "ff_config.h"
#include "ff_veth.h"
#include "ff_host_interface.h"

#define MEMPOOL_CACHE_SIZE 256

#define ARP_RING_SIZE 2048

/*
 * Configurable number of RX/TX ring descriptors
 */
#define RX_QUEUE_SIZE 512
#define TX_QUEUE_SIZE 256

#define MAX_PKT_BURST 32
#define BURST_TX_DRAIN_US 100 /* TX drain every ~100us */

/*
 * Try to avoid TX buffering if we have at least MAX_TX_BURST packets to send.
 */
#define MAX_TX_BURST    (MAX_PKT_BURST / 2)

#define NB_SOCKETS 8

/* Configure how many packets ahead to prefetch when reading packets */
#define PREFETCH_OFFSET    3

#define MAX_RX_QUEUE_PER_LCORE 16
#define MAX_TX_QUEUE_PER_PORT RTE_MAX_ETHPORTS
#define MAX_RX_QUEUE_PER_PORT 128

#define BITS_PER_HEX 4

static int enable_kni = 0;

static struct rte_timer freebsd_clock;

/* RSS hash key used by the Mellanox Linux driver. */
static uint8_t default_rsskey_40bytes[40] = {
    0xd1, 0x81, 0xc6, 0x2c, 0xf7, 0xf4, 0xdb, 0x5b,
    0x19, 0x83, 0xa2, 0xfc, 0x94, 0x3e, 0x1a, 0xdb,
    0xd9, 0x38, 0x9e, 0x6b, 0xd1, 0x03, 0x9c, 0x2c,
    0xa7, 0x44, 0x99, 0xad, 0x59, 0x3d, 0x56, 0xd9,
    0xf3, 0x25, 0x3c, 0x06, 0x2a, 0xdc, 0x1f, 0xfc
};

static struct rte_eth_conf default_port_conf = {
    .rxmode = {
        .mq_mode = ETH_MQ_RX_RSS,
        .max_rx_pkt_len = ETHER_MAX_LEN,
        .split_hdr_size = 0, /**< hdr buf size */
        .header_split   = 0, /**< Header Split disabled */
        .hw_ip_checksum = 0, /**< IP checksum offload disabled */
        .hw_vlan_filter = 0, /**< VLAN filtering disabled */
        .jumbo_frame    = 0, /**< Jumbo Frame Support disabled */
        .hw_strip_crc   = 0, /**< CRC stripped by hardware */
        .enable_lro     = 0, /**< LRO disabled */
    },
    .rx_adv_conf = {
        .rss_conf = {
            .rss_key = default_rsskey_40bytes,
            .rss_key_len = 40,
            .rss_hf = ETH_RSS_PROTO_MASK,
        },
    },
    .txmode = {
        .mq_mode = ETH_MQ_TX_NONE,
    },
};
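
/*
 * Note: the same 40-byte key is handed to the NIC through
 * default_port_conf.rx_adv_conf.rss_conf and fed to the software
 * Toeplitz hash in ff_rss_check() below, so the hash computed in
 * software agrees with the queue that hardware RSS picks.
 */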

struct mbuf_table {
    uint16_t len;
    struct rte_mbuf *m_table[MAX_PKT_BURST];
};

struct lcore_rx_queue {
    uint8_t port_id;
    uint8_t queue_id;
} __rte_cache_aligned;

struct lcore_conf {
    uint16_t proc_id;
    uint16_t nb_procs;
    uint16_t socket_id;
    uint16_t nb_rx_queue;
    uint16_t *lcore_proc;
    struct lcore_rx_queue rx_queue_list[MAX_RX_QUEUE_PER_LCORE];
    uint16_t tx_queue_id[RTE_MAX_ETHPORTS];
    struct mbuf_table tx_mbufs[RTE_MAX_ETHPORTS];
    char *pcap[RTE_MAX_ETHPORTS];
} __rte_cache_aligned;

static struct lcore_conf lcore_conf;

static struct rte_mempool *pktmbuf_pool[NB_SOCKETS];

static struct rte_ring **arp_ring[RTE_MAX_LCORE];

struct ff_dpdk_if_context {
    void *sc;
    void *ifp;
    uint16_t port_id;
};

static struct ff_dpdk_if_context *veth_ctx[RTE_MAX_ETHPORTS];

extern void ff_hardclock(void);

static void
freebsd_hardclock_job(__rte_unused struct rte_timer *timer,
    __rte_unused void *arg)
{
    ff_hardclock();
}

struct ff_dpdk_if_context *
ff_dpdk_register_if(void *sc, void *ifp, struct ff_port_cfg *cfg)
{
    struct ff_dpdk_if_context *ctx;

    ctx = calloc(1, sizeof(struct ff_dpdk_if_context));
    if (ctx == NULL)
        return NULL;

    ctx->sc = sc;
    ctx->ifp = ifp;
    ctx->port_id = cfg->port_id;

    return ctx;
}

void
ff_dpdk_deregister_if(struct ff_dpdk_if_context *ctx)
{
    free(ctx);
}

static void
check_all_ports_link_status(void)
{
    #define CHECK_INTERVAL 100 /* 100ms */
    #define MAX_CHECK_TIME 90  /* 9s (90 * 100ms) in total */

    uint8_t count, all_ports_up, print_flag = 0;
    struct rte_eth_link link;

    printf("\nChecking link status");
    fflush(stdout);

    int i, nb_ports;
    nb_ports = ff_global_cfg.dpdk.nb_ports;
    for (count = 0; count <= MAX_CHECK_TIME; count++) {
        all_ports_up = 1;
        for (i = 0; i < nb_ports; i++) {
            uint8_t portid = ff_global_cfg.dpdk.port_cfgs[i].port_id;
            memset(&link, 0, sizeof(link));
            rte_eth_link_get_nowait(portid, &link);

            /* print link status if flag set */
            if (print_flag == 1) {
                if (link.link_status) {
                    printf("Port %d Link Up - speed %u "
                        "Mbps - %s\n", (int)portid,
                        (unsigned)link.link_speed,
                        (link.link_duplex == ETH_LINK_FULL_DUPLEX) ?
                        ("full-duplex") : ("half-duplex"));
                } else {
                    printf("Port %d Link Down\n", (int)portid);
                }
                continue;
            }
            /* clear all_ports_up flag if any link is down */
            if (link.link_status == 0) {
                all_ports_up = 0;
                break;
            }
        }

        /* after finally printing all link statuses, get out */
        if (print_flag == 1)
            break;

        if (all_ports_up == 0) {
            printf(".");
            fflush(stdout);
            rte_delay_ms(CHECK_INTERVAL);
        }

        /* set the print_flag if all ports are up or the timeout expired */
        if (all_ports_up == 1 || count == (MAX_CHECK_TIME - 1)) {
            print_flag = 1;
            printf("done\n");
        }
    }
}

static int
xdigit2val(unsigned char c)
{
    int val;

    if (isdigit(c))
        val = c - '0';
    else if (isupper(c))
        val = c - 'A' + 10;
    else
        val = c - 'a' + 10;
    return val;
}

static int
parse_lcore_mask(const char *coremask, uint16_t *lcore_proc,
    uint16_t nb_procs)
{
    int i, j, idx = 0;
    unsigned count = 0;
    char c;
    int val;

    if (coremask == NULL)
        return -1;

    /* Remove leading and trailing blanks.
     * Skip an optional 0x/0X prefix.
     */
    while (isblank(*coremask))
        coremask++;
    if (coremask[0] == '0' && ((coremask[1] == 'x')
        || (coremask[1] == 'X')))
        coremask += 2;

    i = strlen(coremask);
    while ((i > 0) && isblank(coremask[i - 1]))
        i--;

    if (i == 0)
        return -1;

    for (i = i - 1; i >= 0 && idx < RTE_MAX_LCORE && count < nb_procs; i--) {
        c = coremask[i];
        if (isxdigit(c) == 0) {
            return -1;
        }
        val = xdigit2val(c);
        for (j = 0; j < BITS_PER_HEX && idx < RTE_MAX_LCORE && count < nb_procs;
            j++, idx++) {
            if ((1 << j) & val) {
                if (!lcore_config[idx].detected) {
                    RTE_LOG(ERR, EAL, "lcore %d unavailable\n", idx);
                    return -1;
                }
                lcore_proc[count] = idx;
                count++;
            }
        }
    }

    for (; i >= 0; i--)
        if (coremask[i] != '0')
            return -1;

    if (count < nb_procs)
        return -1;

    return 0;
}

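/*
 * Example: with coremask "0xf" and nb_procs = 4, the mask is scanned
 * from its least-significant hex digit and lcore_proc is filled with
 * {0, 1, 2, 3}, provided EAL detected those lcores.  A mask with fewer
 * set bits than nb_procs makes the function return -1.
 */
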
static int
init_lcore_conf(void)
{
    uint8_t nb_ports = rte_eth_dev_count();
    if (nb_ports == 0) {
        rte_exit(EXIT_FAILURE, "No probed ethernet devices\n");
    }

    lcore_conf.proc_id = ff_global_cfg.dpdk.proc_id;
    lcore_conf.nb_procs = ff_global_cfg.dpdk.nb_procs;
    lcore_conf.lcore_proc = rte_zmalloc(NULL,
        sizeof(uint16_t) * lcore_conf.nb_procs, 0);
    if (lcore_conf.lcore_proc == NULL) {
        rte_exit(EXIT_FAILURE, "rte_zmalloc lcore_proc failed\n");
    }

    int ret = parse_lcore_mask(ff_global_cfg.dpdk.lcore_mask,
        lcore_conf.lcore_proc, lcore_conf.nb_procs);
    if (ret < 0) {
        rte_exit(EXIT_FAILURE, "parse_lcore_mask failed:%s\n",
            ff_global_cfg.dpdk.lcore_mask);
    }

    uint16_t socket_id = 0;
    if (ff_global_cfg.dpdk.numa_on) {
        socket_id = rte_lcore_to_socket_id(rte_lcore_id());
    }

    lcore_conf.socket_id = socket_id;

    /* Currently, the proc id maps 1:1 to the rx/tx queue id on each port. */
    uint8_t port_id, enabled_ports = 0;
    for (port_id = 0; port_id < nb_ports; port_id++) {
        if (ff_global_cfg.dpdk.port_mask &&
            (ff_global_cfg.dpdk.port_mask & (1 << port_id)) == 0) {
            printf("\nSkipping disabled port %d\n", port_id);
            continue;
        }

        if (port_id >= ff_global_cfg.dpdk.nb_ports) {
            printf("\nSkipping non-configured port %d\n", port_id);
            break;
        }

        uint16_t nb_rx_queue = lcore_conf.nb_rx_queue;
        lcore_conf.rx_queue_list[nb_rx_queue].port_id = port_id;
        lcore_conf.rx_queue_list[nb_rx_queue].queue_id = lcore_conf.proc_id;
        lcore_conf.nb_rx_queue++;

        lcore_conf.tx_queue_id[port_id] = lcore_conf.proc_id;
        lcore_conf.pcap[port_id] = ff_global_cfg.dpdk.port_cfgs[enabled_ports].pcap;

        ff_global_cfg.dpdk.port_cfgs[enabled_ports].port_id = port_id;

        enabled_ports++;
    }

    ff_global_cfg.dpdk.nb_ports = enabled_ports;

    return 0;
}

static int
init_mem_pool(void)
{
    uint8_t nb_ports = ff_global_cfg.dpdk.nb_ports;
    uint32_t nb_lcores = ff_global_cfg.dpdk.nb_procs;
    uint32_t nb_tx_queue = nb_lcores;
    uint32_t nb_rx_queue = lcore_conf.nb_rx_queue * nb_lcores;

    unsigned nb_mbuf = RTE_MAX(
        (nb_rx_queue * RX_QUEUE_SIZE           +
        nb_ports * nb_lcores * MAX_PKT_BURST   +
        nb_ports * nb_tx_queue * TX_QUEUE_SIZE +
        nb_lcores * MEMPOOL_CACHE_SIZE),
        (unsigned)8192);

    unsigned socketid = 0;
    uint16_t i, lcore_id;
    char s[64];
    int numa_on = ff_global_cfg.dpdk.numa_on;

    for (i = 0; i < lcore_conf.nb_procs; i++) {
        lcore_id = lcore_conf.lcore_proc[i];
        if (numa_on) {
            socketid = rte_lcore_to_socket_id(lcore_id);
        }

        if (socketid >= NB_SOCKETS) {
            rte_exit(EXIT_FAILURE, "Socket %u of lcore %u is out of range %d\n",
                socketid, lcore_id, NB_SOCKETS);
        }

        if (pktmbuf_pool[socketid] != NULL) {
            continue;
        }

        snprintf(s, sizeof(s), "mbuf_pool_%d", socketid);
        if (rte_eal_process_type() == RTE_PROC_PRIMARY) {
            pktmbuf_pool[socketid] =
                rte_pktmbuf_pool_create(s, nb_mbuf,
                    MEMPOOL_CACHE_SIZE, 0,
                    RTE_MBUF_DEFAULT_BUF_SIZE, socketid);
        } else {
            pktmbuf_pool[socketid] = rte_mempool_lookup(s);
        }

        if (pktmbuf_pool[socketid] == NULL) {
            rte_exit(EXIT_FAILURE, "Cannot create mbuf pool on socket %d\n", socketid);
        } else {
            printf("create mbuf pool on socket %d\n", socketid);
        }
    }

    return 0;
}

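/*
 * Sizing example: with one port and one process, the sum is
 * 1*512 (rx descriptors) + 1*1*32 (burst buffers) + 1*1*256 (tx
 * descriptors) + 1*256 (mempool cache) = 1056 mbufs, so the
 * (unsigned)8192 floor wins and the pool holds 8192 mbufs.
 */
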
static int
init_arp_ring(void)
{
    int i, j;
    char name_buf[RTE_RING_NAMESIZE];
    int nb_procs = ff_global_cfg.dpdk.nb_procs;
    int proc_id = ff_global_cfg.dpdk.proc_id;

    /* Allocate arp ring ptr according to eth dev count. */
    int nb_ports = rte_eth_dev_count();
    for (i = 0; i < nb_procs; ++i) {
        snprintf(name_buf, RTE_RING_NAMESIZE, "ring_ptr_%d_%d",
            proc_id, i);

        arp_ring[i] = rte_zmalloc(name_buf,
            sizeof(struct rte_ring *) * nb_ports,
            RTE_CACHE_LINE_SIZE);
        if (arp_ring[i] == NULL) {
            rte_exit(EXIT_FAILURE, "rte_zmalloc(%s (struct rte_ring*)) "
                "failed\n", name_buf);
        }
    }

    unsigned socketid = lcore_conf.socket_id;

    /* Create rings according to the ports actually being used. */
    nb_ports = ff_global_cfg.dpdk.nb_ports;
    for (i = 0; i < nb_ports; i++) {
        uint8_t port_id = ff_global_cfg.dpdk.port_cfgs[i].port_id;

        /* The inner loop must not reuse `i`, or it would clobber
         * the outer port loop. */
        for (j = 0; j < nb_procs; ++j) {
            snprintf(name_buf, RTE_RING_NAMESIZE, "ring_%d_%d", j, port_id);
            if (rte_eal_process_type() == RTE_PROC_PRIMARY) {
                arp_ring[j][port_id] = rte_ring_create(name_buf,
                    ARP_RING_SIZE, socketid,
                    RING_F_SC_DEQ);
            } else {
                arp_ring[j][port_id] = rte_ring_lookup(name_buf);
            }

            if (arp_ring[j][port_id] == NULL)
                rte_panic("create arp ring:%s failed!\n", name_buf);

            if (rte_ring_lookup(name_buf) != arp_ring[j][port_id])
                rte_panic("lookup arp ring:%s failed!\n", name_buf);

            printf("create arp ring:%s success, %u ring entries are now free!\n",
                name_buf, rte_ring_free_count(arp_ring[j][port_id]));
        }
    }

    return 0;
}

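/*
 * Ring topology: arp_ring[proc][port] is a single-consumer ring owned
 * by process `proc`.  A process that receives an ARP packet clones it
 * into every other process's ring (see process_packets()), apparently
 * so that each process's stack sees the ARP traffic regardless of
 * which RSS queue the hardware delivered it to.
 */
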
static int
init_kni(void)
{
    int nb_ports = rte_eth_dev_count();
    int accept = 0;
    if (strcasecmp(ff_global_cfg.kni.method, "accept") == 0)
        accept = 1;

    ff_kni_init(nb_ports,
        ff_global_cfg.kni.tcp_port,
        ff_global_cfg.kni.udp_port,
        accept);

    unsigned socket_id = lcore_conf.socket_id;
    struct rte_mempool *mbuf_pool = pktmbuf_pool[socket_id];

    nb_ports = ff_global_cfg.dpdk.nb_ports;
    int i;
    for (i = 0; i < nb_ports; i++) {
        uint8_t port_id = ff_global_cfg.dpdk.port_cfgs[i].port_id;
        ff_kni_alloc(port_id, socket_id, mbuf_pool);
    }

    return 0;
}

static int
init_port_start(void)
{
    int nb_ports = ff_global_cfg.dpdk.nb_ports;
    uint16_t nb_procs = ff_global_cfg.dpdk.nb_procs;
    unsigned socketid = rte_lcore_to_socket_id(rte_lcore_id());
    struct rte_mempool *mbuf_pool = pktmbuf_pool[socketid];
    uint16_t i;

    for (i = 0; i < nb_ports; i++) {
        uint8_t port_id = ff_global_cfg.dpdk.port_cfgs[i].port_id;

        struct rte_eth_dev_info dev_info;
        rte_eth_dev_info_get(port_id, &dev_info);

        if (nb_procs > dev_info.max_rx_queues) {
            rte_exit(EXIT_FAILURE, "num_procs[%d] bigger than max_rx_queues[%d]\n",
                nb_procs,
                dev_info.max_rx_queues);
        }

        if (nb_procs > dev_info.max_tx_queues) {
            rte_exit(EXIT_FAILURE, "num_procs[%d] bigger than max_tx_queues[%d]\n",
                nb_procs,
                dev_info.max_tx_queues);
        }

        struct ether_addr addr;
        rte_eth_macaddr_get(port_id, &addr);
        printf("Port %u MAC: %02" PRIx8 " %02" PRIx8 " %02" PRIx8
                   " %02" PRIx8 " %02" PRIx8 " %02" PRIx8 "\n",
                (unsigned)port_id,
                addr.addr_bytes[0], addr.addr_bytes[1],
                addr.addr_bytes[2], addr.addr_bytes[3],
                addr.addr_bytes[4], addr.addr_bytes[5]);

        /* Port configs are indexed sequentially, not by port id. */
        rte_memcpy(ff_global_cfg.dpdk.port_cfgs[i].mac,
            addr.addr_bytes, ETHER_ADDR_LEN);

        /* Only the primary process configures and starts the port;
         * secondary processes still record the MAC of every port. */
        if (rte_eal_process_type() != RTE_PROC_PRIMARY) {
            continue;
        }

        /*
         * TODO:
         * Set port conf according to dev's capability.
         */
        struct rte_eth_conf port_conf = default_port_conf;

        /* Currently, the proc id maps 1:1 to the queue id per port. */
        int ret = rte_eth_dev_configure(port_id, nb_procs, nb_procs, &port_conf);
        if (ret != 0) {
            return ret;
        }

        uint16_t q;
        for (q = 0; q < nb_procs; q++) {
            ret = rte_eth_tx_queue_setup(port_id, q, TX_QUEUE_SIZE,
                    socketid, &dev_info.default_txconf);
            if (ret < 0) {
                return ret;
            }

            ret = rte_eth_rx_queue_setup(port_id, q, RX_QUEUE_SIZE,
                    socketid, &dev_info.default_rxconf, mbuf_pool);
            if (ret < 0) {
                return ret;
            }
        }

        ret = rte_eth_dev_start(port_id);
        if (ret < 0) {
            return ret;
        }

        /* Enable RX in promiscuous mode for the Ethernet device. */
        if (ff_global_cfg.dpdk.promiscuous) {
            rte_eth_promiscuous_enable(port_id);
            ret = rte_eth_promiscuous_get(port_id);
            if (ret == 1) {
                printf("set port %u to promiscuous mode ok\n", port_id);
            } else {
                printf("set port %u to promiscuous mode error\n", port_id);
            }
        }

        /* Enable pcap dump */
        if (ff_global_cfg.dpdk.port_cfgs[i].pcap) {
            ff_enable_pcap(ff_global_cfg.dpdk.port_cfgs[i].pcap);
        }
    }

    return 0;
}

static int
init_freebsd_clock(void)
{
    rte_timer_subsystem_init();
    uint64_t hz = rte_get_timer_hz();
    uint64_t intrs = MS_PER_S / ff_global_cfg.freebsd.hz;
    uint64_t tsc = (hz + MS_PER_S - 1) / MS_PER_S * intrs;

    rte_timer_init(&freebsd_clock);
    rte_timer_reset(&freebsd_clock, tsc, PERIODICAL,
        rte_lcore_id(), &freebsd_hardclock_job, NULL);

    return 0;
}

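/*
 * Example: with freebsd.hz = 100, intrs = 1000/100 = 10 ms between
 * ticks, and tsc is roughly 10x the TSC cycles per millisecond, so
 * ff_hardclock() fires about 100 times a second, matching the
 * configured FreeBSD clock rate.
 */
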
int
ff_dpdk_init(int argc, char **argv)
{
    if (ff_global_cfg.dpdk.nb_procs < 1 ||
        ff_global_cfg.dpdk.nb_procs > RTE_MAX_LCORE ||
        ff_global_cfg.dpdk.proc_id >= ff_global_cfg.dpdk.nb_procs) {
        printf("param num_procs[%d] or proc_id[%d] error!\n",
            ff_global_cfg.dpdk.nb_procs,
            ff_global_cfg.dpdk.proc_id);
        exit(1);
    }

    int ret = rte_eal_init(argc, argv);
    if (ret < 0) {
        rte_exit(EXIT_FAILURE, "Error with EAL initialization\n");
    }

    init_lcore_conf();

    init_mem_pool();

    init_arp_ring();

    enable_kni = ff_global_cfg.kni.enable;
    if (enable_kni) {
        init_kni();
    }

    ret = init_port_start();
    if (ret < 0) {
        rte_exit(EXIT_FAILURE, "init_port_start failed\n");
    }

    check_all_ports_link_status();

    init_freebsd_clock();

    return 0;
}

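/*
 * Typical call order, as a minimal sketch (loading the configuration
 * into ff_global_cfg happens elsewhere in F-Stack before this;
 * my_loop/my_arg are placeholders):
 *
 *     ff_dpdk_init(argc, argv);      // EAL, pools, rings, ports
 *     ff_dpdk_if_up();               // attach veth contexts
 *     ff_dpdk_run(my_loop, my_arg);  // blocks until the lcores exit
 */
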
static void
ff_veth_input(void *ifp, struct rte_mbuf *pkt)
{
    void *data = rte_pktmbuf_mtod(pkt, void*);
    uint16_t len = rte_pktmbuf_data_len(pkt);

    void *hdr = ff_mbuf_gethdr(pkt, pkt->pkt_len, data, len);
    if (hdr == NULL) {
        rte_pktmbuf_free(pkt);
        return;
    }

    pkt = pkt->next;
    void *prev = hdr;
    while (pkt != NULL) {
        data = rte_pktmbuf_mtod(pkt, void*);
        len = rte_pktmbuf_data_len(pkt);

        void *mb = ff_mbuf_get(prev, data, len);
        if (mb == NULL) {
            ff_mbuf_free(hdr);
            return;
        }
        pkt = pkt->next;
        prev = mb;
    }

    ff_veth_process_packet(ifp, hdr);
}

static enum FilterReturn
protocol_filter(const void *data, uint16_t len)
{
    if (len < sizeof(struct ether_hdr))
        return FILTER_UNKNOWN;

    const struct ether_hdr *hdr;
    hdr = (const struct ether_hdr *)data;

    if (ntohs(hdr->ether_type) == ETHER_TYPE_ARP)
        return FILTER_ARP;

    if (!enable_kni) {
        return FILTER_UNKNOWN;
    }

    if (ntohs(hdr->ether_type) != ETHER_TYPE_IPv4)
        return FILTER_UNKNOWN;

    /* Skip the Ethernet header; pointer arithmetic on void* is a GCC
     * extension, so go through const char*. */
    return ff_kni_proto_filter((const char *)data + sizeof(struct ether_hdr),
        len - sizeof(struct ether_hdr));
}

static inline void
process_packets(uint8_t port_id, uint16_t queue_id, struct rte_mbuf **bufs,
    uint16_t count, void *ifp, int pkts_from_ring)
{
    struct lcore_conf *qconf = &lcore_conf;

    uint16_t i;
    for (i = 0; i < count; i++) {
        struct rte_mbuf *rtem = bufs[i];

        if (unlikely(qconf->pcap[port_id] != NULL)) {
            ff_dump_packets(qconf->pcap[port_id], rtem);
        }

        void *data = rte_pktmbuf_mtod(rtem, void*);
        uint16_t len = rte_pktmbuf_data_len(rtem);

        enum FilterReturn filter = protocol_filter(data, len);
        if (filter == FILTER_UNKNOWN) {
            ff_veth_input(ifp, rtem);
        } else if (filter == FILTER_KNI) {
            ff_kni_enqueue(port_id, rtem);
        } else {
            struct rte_mempool *mbuf_pool;
            struct rte_mbuf *mbuf_clone;
            if (pkts_from_ring == 0) {
                uint16_t j;
                for (j = 0; j < qconf->nb_procs; ++j) {
                    if (j == queue_id)
                        continue;

                    mbuf_pool = pktmbuf_pool[rte_lcore_to_socket_id(qconf->lcore_proc[j])];
                    mbuf_clone = rte_pktmbuf_clone(rtem, mbuf_pool);
                    if (mbuf_clone) {
                        int ret = rte_ring_enqueue(arp_ring[j][port_id], mbuf_clone);
                        if (ret < 0)
                            rte_pktmbuf_free(mbuf_clone);
                    }
                }
            }

            if (enable_kni && rte_eal_process_type() == RTE_PROC_PRIMARY) {
                mbuf_pool = pktmbuf_pool[qconf->socket_id];
                mbuf_clone = rte_pktmbuf_clone(rtem, mbuf_pool);
                if (mbuf_clone) {
                    /* Hand the clone to KNI; the original mbuf still
                     * goes to the stack below. */
                    ff_kni_enqueue(port_id, mbuf_clone);
                }
            }

            ff_veth_input(ifp, rtem);
        }
    }
}

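/*
 * ARP packets take the last branch above: unless the packet itself
 * came from another process's ring (pkts_from_ring), it is cloned into
 * every other process's arp_ring so that each process can update its
 * own ARP table, and is then processed locally as well.
 */
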
static inline int
process_arp_ring(uint8_t port_id, uint16_t queue_id,
    struct rte_mbuf **pkts_burst, void *ifp)
{
    /* read packets from the ring buffer and process them */
    uint16_t nb_rx;
    nb_rx = rte_ring_dequeue_burst(arp_ring[queue_id][port_id],
        (void **)pkts_burst, MAX_PKT_BURST);

    if (nb_rx > 0) {
        process_packets(port_id, queue_id, pkts_burst, nb_rx, ifp, 1);
    }

    return 0;
}

/* Send a burst of packets on an output interface */
static inline int
send_burst(struct lcore_conf *qconf, uint16_t n, uint8_t port)
{
    struct rte_mbuf **m_table;
    int ret;
    uint16_t queueid;

    queueid = qconf->tx_queue_id[port];
    m_table = (struct rte_mbuf **)qconf->tx_mbufs[port].m_table;

    if (unlikely(qconf->pcap[port] != NULL)) {
        uint16_t i;
        for (i = 0; i < n; i++) {
            ff_dump_packets(qconf->pcap[port], m_table[i]);
        }
    }

    ret = rte_eth_tx_burst(port, queueid, m_table, n);
    if (unlikely(ret < n)) {
        /* Free the packets the NIC could not accept. */
        do {
            rte_pktmbuf_free(m_table[ret]);
        } while (++ret < n);
    }

    return 0;
}

/* Enqueue a single packet, and send burst if queue is filled */
static inline int
send_single_packet(struct rte_mbuf *m, uint8_t port)
{
    uint16_t len;
    struct lcore_conf *qconf;

    qconf = &lcore_conf;
    len = qconf->tx_mbufs[port].len;
    qconf->tx_mbufs[port].m_table[len] = m;
    len++;

    /* enough pkts to be sent */
    if (unlikely(len == MAX_PKT_BURST)) {
        send_burst(qconf, MAX_PKT_BURST, port);
        len = 0;
    }

    qconf->tx_mbufs[port].len = len;
    return 0;
}

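/*
 * Packets queued here are not transmitted immediately: they go out
 * either when the per-port buffer reaches MAX_PKT_BURST, or when the
 * periodic drain in main_loop() (every BURST_TX_DRAIN_US microseconds)
 * flushes whatever is pending.
 */
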
int
ff_dpdk_if_send(struct ff_dpdk_if_context *ctx, void *m,
    int total)
{
    struct rte_mempool *mbuf_pool = pktmbuf_pool[lcore_conf.socket_id];
    struct rte_mbuf *head = rte_pktmbuf_alloc(mbuf_pool);
    if (head == NULL) {
        ff_mbuf_free(m);
        return -1;
    }

    head->pkt_len = total;

    int off = 0;
    struct rte_mbuf *cur = head, *prev = NULL;
    while (total > 0) {
        if (cur == NULL) {
            /* Assign to the outer `cur`; re-declaring it here would
             * shadow the variable and leave the chain NULL. */
            cur = rte_pktmbuf_alloc(mbuf_pool);
            if (cur == NULL) {
                rte_pktmbuf_free(head);
                ff_mbuf_free(m);
                return -1;
            }
        }

        /* Link the new segment behind the previous one and advance. */
        if (prev != NULL) {
            prev->next = cur;
            head->nb_segs++;
        }
        prev = cur;

        void *data = rte_pktmbuf_mtod(cur, void*);
        int len = total > RTE_MBUF_DEFAULT_DATAROOM ? RTE_MBUF_DEFAULT_DATAROOM : total;
        int ret = ff_mbuf_copydata(m, data, off, len);
        if (ret < 0) {
            rte_pktmbuf_free(head);
            ff_mbuf_free(m);
            return -1;
        }

        cur->data_len = len;
        off += len;
        total -= len;
        cur = NULL;
    }

    /*
     * FIXME: set offload flags according to mbuf.pkthdr;
     */
    head->ol_flags = 0;
    head->vlan_tci = 0;

    ff_mbuf_free(m);

    return send_single_packet(head, ctx->port_id);
}

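/*
 * Segmentation example: with DPDK's default 2048-byte dataroom
 * (RTE_MBUF_DEFAULT_DATAROOM), a 3000-byte packet is copied into two
 * segments of 2048 and 952 bytes, with head->pkt_len = 3000 and
 * head->nb_segs = 2.
 */
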
static int
main_loop(void *arg)
{
    struct loop_routine *lr = (struct loop_routine *)arg;

    struct rte_mbuf *pkts_burst[MAX_PKT_BURST];
    unsigned lcore_id;
    uint64_t prev_tsc, diff_tsc, cur_tsc;
    int i, j, nb_rx;
    uint8_t port_id, queue_id;
    struct lcore_conf *qconf;
    const uint64_t drain_tsc = (rte_get_tsc_hz() + US_PER_S - 1) /
        US_PER_S * BURST_TX_DRAIN_US;
    void *ifp;

    prev_tsc = 0;

    lcore_id = rte_lcore_id();
    qconf = &lcore_conf;

    if (qconf->nb_rx_queue == 0) {
        printf("lcore %u has nothing to do\n", lcore_id);
        return 0;
    }

    while (1) {
        cur_tsc = rte_rdtsc();
        if (unlikely(freebsd_clock.expire < cur_tsc)) {
            rte_timer_manage();
        }

        /*
         * TX burst queue drain
         */
        diff_tsc = cur_tsc - prev_tsc;
        if (unlikely(diff_tsc > drain_tsc)) {
            /*
             * This could be optimized (use queueid instead of
             * portid), but it is not called so often
             */
            for (port_id = 0; port_id < RTE_MAX_ETHPORTS; port_id++) {
                if (qconf->tx_mbufs[port_id].len == 0)
                    continue;
                send_burst(qconf,
                    qconf->tx_mbufs[port_id].len,
                    port_id);
                qconf->tx_mbufs[port_id].len = 0;
            }

            prev_tsc = cur_tsc;
        }

        /*
         * Read packets from RX queues
         */
        for (i = 0; i < qconf->nb_rx_queue; ++i) {
            port_id = qconf->rx_queue_list[i].port_id;
            queue_id = qconf->rx_queue_list[i].queue_id;
            ifp = veth_ctx[port_id]->ifp;

            if (enable_kni && rte_eal_process_type() == RTE_PROC_PRIMARY) {
                ff_kni_process(port_id, queue_id, pkts_burst, MAX_PKT_BURST);
            }

            process_arp_ring(port_id, queue_id, pkts_burst, ifp);

            nb_rx = rte_eth_rx_burst(port_id, queue_id, pkts_burst,
                MAX_PKT_BURST);
            if (nb_rx == 0)
                continue;

            /* Prefetch first packets */
            for (j = 0; j < PREFETCH_OFFSET && j < nb_rx; j++) {
                rte_prefetch0(rte_pktmbuf_mtod(
                        pkts_burst[j], void *));
            }

            /* Prefetch and handle already prefetched packets */
            for (j = 0; j < (nb_rx - PREFETCH_OFFSET); j++) {
                rte_prefetch0(rte_pktmbuf_mtod(pkts_burst[
                        j + PREFETCH_OFFSET], void *));
                process_packets(port_id, queue_id, &pkts_burst[j], 1, ifp, 0);
            }

            /* Handle remaining prefetched packets */
            for (; j < nb_rx; j++) {
                process_packets(port_id, queue_id, &pkts_burst[j], 1, ifp, 0);
            }
        }

        if (likely(lr->loop != NULL)) {
            lr->loop(lr->arg);
        }
    }
}

int
ff_dpdk_if_up(void)
{
    int nb_ports = ff_global_cfg.dpdk.nb_ports;
    int i;
    for (i = 0; i < nb_ports; i++) {
        uint8_t port_id = ff_global_cfg.dpdk.port_cfgs[i].port_id;
        veth_ctx[port_id] = ff_veth_attach(ff_global_cfg.dpdk.port_cfgs + i);
        if (veth_ctx[port_id] == NULL) {
            rte_exit(EXIT_FAILURE, "ff_veth_attach failed\n");
        }
    }

    return 0;
}

void
ff_dpdk_run(loop_func_t loop, void *arg)
{
    struct loop_routine *lr = malloc(sizeof(struct loop_routine));
    if (lr == NULL) {
        rte_exit(EXIT_FAILURE, "malloc loop_routine failed\n");
    }
    lr->loop = loop;
    lr->arg = arg;
    rte_eal_mp_remote_launch(main_loop, lr, CALL_MASTER);
    rte_eal_mp_wait_lcore();
    free(lr);
}

void
ff_dpdk_pktmbuf_free(void *m)
{
    rte_pktmbuf_free((struct rte_mbuf *)m);
}

static uint32_t
toeplitz_hash(unsigned keylen, const uint8_t *key,
    unsigned datalen, const uint8_t *data)
{
    uint32_t hash = 0, v;
    u_int i, b;

    /* XXXRW: Perhaps an assertion about key length vs. data length? */

    v = (key[0]<<24) + (key[1]<<16) + (key[2]<<8) + key[3];
    for (i = 0; i < datalen; i++) {
        for (b = 0; b < 8; b++) {
            if (data[i] & (1<<(7-b)))
                hash ^= v;
            v <<= 1;
            if ((i + 4) < keylen &&
                (key[i+4] & (1<<(7-b))))
                v |= 1;
        }
    }
    return (hash);
}

int
ff_rss_check(uint32_t saddr, uint32_t daddr, uint16_t sport, uint16_t dport)
{
    struct lcore_conf *qconf = &lcore_conf;

    if (qconf->nb_procs == 1) {
        return 1;
    }

    uint8_t data[sizeof(saddr) + sizeof(daddr) + sizeof(sport) +
        sizeof(dport)];

    unsigned datalen = 0;

    bcopy(&saddr, &data[datalen], sizeof(saddr));
    datalen += sizeof(saddr);

    bcopy(&daddr, &data[datalen], sizeof(daddr));
    datalen += sizeof(daddr);

    bcopy(&sport, &data[datalen], sizeof(sport));
    datalen += sizeof(sport);

    bcopy(&dport, &data[datalen], sizeof(dport));
    datalen += sizeof(dport);

    uint32_t hash = toeplitz_hash(sizeof(default_rsskey_40bytes),
        default_rsskey_40bytes, datalen, data);

    return (hash % qconf->nb_procs) == qconf->proc_id;
}
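
/*
 * ff_rss_check() lets the stack ask whether a given 4-tuple would be
 * steered to this process's RSS queue.  A sketch of the intended use
 * (the surrounding connection-setup code lives elsewhere in F-Stack;
 * laddr/faddr/lport/fport are placeholders):
 *
 *     if (!ff_rss_check(laddr, faddr, lport, fport)) {
 *         // pick another local port: this tuple hashes to a
 *         // queue owned by a different process
 *     }
 *
 * Note this maps the hash to a process with (hash % nb_procs), not via
 * the NIC's RETA table, so it assumes the default round-robin
 * redirection layout.
 */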
1079