/*-
 * BSD LICENSE
 *
 * Copyright(c) 2010-2014 Intel Corporation. All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 *
 *   * Redistributions of source code must retain the above copyright
 *     notice, this list of conditions and the following disclaimer.
 *   * Redistributions in binary form must reproduce the above copyright
 *     notice, this list of conditions and the following disclaimer in
 *     the documentation and/or other materials provided with the
 *     distribution.
 *   * Neither the name of Intel Corporation nor the names of its
 *     contributors may be used to endorse or promote products derived
 *     from this software without specific prior written permission.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 */

#include <stdio.h>
#include <stdlib.h>
#include <stdint.h>
#include <inttypes.h>
#include <unistd.h>
#include <signal.h>
#include <getopt.h>

#include <rte_eal.h>
#include <rte_ethdev.h>
#include <rte_cycles.h>
#include <rte_malloc.h>
#include <rte_debug.h>
#include <rte_prefetch.h>
#include <rte_distributor.h>

#define RX_RING_SIZE 256
#define TX_RING_SIZE 512
#define NUM_MBUFS ((64*1024)-1)
#define MBUF_CACHE_SIZE 250
#define BURST_SIZE 32
#define RTE_RING_SZ 1024

#define RTE_LOGTYPE_DISTRAPP RTE_LOGTYPE_USER1

/* mask of enabled ports */
static uint32_t enabled_port_mask;
volatile uint8_t quit_signal;
volatile uint8_t quit_signal_rx;

static volatile struct app_stats {
	struct {
		uint64_t rx_pkts;
		uint64_t returned_pkts;
		uint64_t enqueued_pkts;
	} rx __rte_cache_aligned;

	struct {
		uint64_t dequeue_pkts;
		uint64_t tx_pkts;
	} tx __rte_cache_aligned;
} app_stats;

static const struct rte_eth_conf port_conf_default = {
	.rxmode = {
		.mq_mode = ETH_MQ_RX_RSS,
		.max_rx_pkt_len = ETHER_MAX_LEN,
	},
	.txmode = {
		.mq_mode = ETH_MQ_TX_NONE,
	},
	.rx_adv_conf = {
		.rss_conf = {
			.rss_hf = ETH_RSS_IP | ETH_RSS_UDP |
				ETH_RSS_TCP | ETH_RSS_SCTP,
		}
	},
};

struct output_buffer {
	unsigned count;
	struct rte_mbuf *mbufs[BURST_SIZE];
};
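/*
 * Overall structure of this sample: the core running lcore_rx() pulls
 * packets from the enabled ports and pushes them through an
 * rte_distributor; worker cores (lcore_worker()) set the output port on
 * each mbuf; packets returned by the distributor are enqueued on a ring
 * that a single TX core (lcore_tx()) drains, buffering up to BURST_SIZE
 * mbufs per port in an output_buffer before calling rte_eth_tx_burst().
 */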
/*
 * Initialises a given port using global settings and with the RX buffers
 * coming from the mbuf_pool passed as a parameter.
 */
static inline int
port_init(uint8_t port, struct rte_mempool *mbuf_pool)
{
	struct rte_eth_conf port_conf = port_conf_default;
	const uint16_t rxRings = 1, txRings = rte_lcore_count() - 1;
	int retval;
	uint16_t q;

	if (port >= rte_eth_dev_count())
		return -1;

	retval = rte_eth_dev_configure(port, rxRings, txRings, &port_conf);
	if (retval != 0)
		return retval;

	for (q = 0; q < rxRings; q++) {
		retval = rte_eth_rx_queue_setup(port, q, RX_RING_SIZE,
						rte_eth_dev_socket_id(port),
						NULL, mbuf_pool);
		if (retval < 0)
			return retval;
	}

	for (q = 0; q < txRings; q++) {
		retval = rte_eth_tx_queue_setup(port, q, TX_RING_SIZE,
						rte_eth_dev_socket_id(port),
						NULL);
		if (retval < 0)
			return retval;
	}

	retval = rte_eth_dev_start(port);
	if (retval < 0)
		return retval;

	/* check link status, retrying once after a second if it is down */
	struct rte_eth_link link;
	rte_eth_link_get_nowait(port, &link);
	if (!link.link_status) {
		sleep(1);
		rte_eth_link_get_nowait(port, &link);
	}

	if (!link.link_status) {
		printf("Link down on port %"PRIu8"\n", port);
		return 0;
	}

	struct ether_addr addr;
	rte_eth_macaddr_get(port, &addr);
	printf("Port %u MAC: %02"PRIx8" %02"PRIx8" %02"PRIx8
			" %02"PRIx8" %02"PRIx8" %02"PRIx8"\n",
			(unsigned)port,
			addr.addr_bytes[0], addr.addr_bytes[1],
			addr.addr_bytes[2], addr.addr_bytes[3],
			addr.addr_bytes[4], addr.addr_bytes[5]);

	rte_eth_promiscuous_enable(port);

	return 0;
}

struct lcore_params {
	unsigned worker_id;
	struct rte_distributor *d;
	struct rte_ring *r;
	struct rte_mempool *mem_pool;
};

static int
quit_workers(struct rte_distributor *d, struct rte_mempool *p)
{
	const unsigned num_workers = rte_lcore_count() - 2;
	unsigned i;
	struct rte_mbuf *bufs[num_workers];

	if (rte_mempool_get_bulk(p, (void *)bufs, num_workers) != 0) {
		printf("line %d: Error getting mbufs from pool\n", __LINE__);
		return -1;
	}

	for (i = 0; i < num_workers; i++)
		bufs[i]->hash.rss = i << 1;

	rte_distributor_process(d, bufs, num_workers);
	rte_mempool_put_bulk(p, (void *)bufs, num_workers);

	return 0;
}
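/*
 * Note on quit_workers() above: rte_distributor_get_pkt() blocks until
 * the distributor hands the worker a new packet, so once the RX loop
 * stops processing, each worker needs one final packet to wake up and
 * observe quit_signal. The dummy mbufs are tagged with distinct
 * hash.rss values (i << 1) so the distributor spreads one packet to
 * every worker rather than queueing them all on a single flow.
 */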
static int
lcore_rx(struct lcore_params *p)
{
	struct rte_distributor *d = p->d;
	struct rte_mempool *mem_pool = p->mem_pool;
	struct rte_ring *r = p->r;
	const uint8_t nb_ports = rte_eth_dev_count();
	const int socket_id = rte_socket_id();
	uint8_t port;

	for (port = 0; port < nb_ports; port++) {
		/* skip ports that are not enabled */
		if ((enabled_port_mask & (1 << port)) == 0)
			continue;

		if (rte_eth_dev_socket_id(port) > 0 &&
				rte_eth_dev_socket_id(port) != socket_id)
			printf("WARNING, port %u is on remote NUMA node to "
					"RX thread.\n\tPerformance will not "
					"be optimal.\n", port);
	}

	printf("\nCore %u doing packet RX.\n", rte_lcore_id());
	port = 0;
	while (!quit_signal_rx) {

		/* skip ports that are not enabled */
		if ((enabled_port_mask & (1 << port)) == 0) {
			if (++port == nb_ports)
				port = 0;
			continue;
		}
		struct rte_mbuf *bufs[BURST_SIZE*2];
		const uint16_t nb_rx = rte_eth_rx_burst(port, 0, bufs,
				BURST_SIZE);
		if (unlikely(nb_rx == 0)) {
			if (++port == nb_ports)
				port = 0;
			continue;
		}
		app_stats.rx.rx_pkts += nb_rx;

		rte_distributor_process(d, bufs, nb_rx);
		const uint16_t nb_ret = rte_distributor_returned_pkts(d,
				bufs, BURST_SIZE*2);
		app_stats.rx.returned_pkts += nb_ret;
		if (unlikely(nb_ret == 0)) {
			if (++port == nb_ports)
				port = 0;
			continue;
		}

		uint16_t sent = rte_ring_enqueue_burst(r, (void *)bufs, nb_ret);
		app_stats.rx.enqueued_pkts += sent;
		if (unlikely(sent < nb_ret)) {
			RTE_LOG(DEBUG, DISTRAPP,
					"%s:Packet loss due to full ring\n",
					__func__);
			/* free the mbufs that did not fit on the ring */
			while (sent < nb_ret)
				rte_pktmbuf_free(bufs[sent++]);
		}
		if (++port == nb_ports)
			port = 0;
	}
	rte_distributor_process(d, NULL, 0);
	/* flush the distributor to bring it to a known state */
	rte_distributor_flush(d);
	/* set the worker & tx threads' quit flag */
	quit_signal = 1;
	/*
	 * Worker threads may block in rte_distributor_get_pkt() once the
	 * distributor is no longer processing, so keep feeding them packets
	 * until quit_signal has actually been observed and they shut down
	 * gracefully.
	 */
	if (quit_workers(d, mem_pool) != 0)
		return -1;
	/* the rx thread should quit last */
	return 0;
}

static inline void
flush_one_port(struct output_buffer *outbuf, uint8_t outp)
{
	unsigned nb_tx = rte_eth_tx_burst(outp, 0, outbuf->mbufs,
			outbuf->count);
	app_stats.tx.tx_pkts += nb_tx;

	if (unlikely(nb_tx < outbuf->count)) {
		RTE_LOG(DEBUG, DISTRAPP,
				"%s:Packet loss with tx_burst\n", __func__);
		/* free any mbufs the NIC did not accept */
		do {
			rte_pktmbuf_free(outbuf->mbufs[nb_tx]);
		} while (++nb_tx < outbuf->count);
	}
	outbuf->count = 0;
}

static inline void
flush_all_ports(struct output_buffer *tx_buffers, uint8_t nb_ports)
{
	uint8_t outp;
	for (outp = 0; outp < nb_ports; outp++) {
		/* skip ports that are not enabled */
		if ((enabled_port_mask & (1 << outp)) == 0)
			continue;

		if (tx_buffers[outp].count == 0)
			continue;

		flush_one_port(&tx_buffers[outp], outp);
	}
}

static int
lcore_tx(struct rte_ring *in_r)
{
	static struct output_buffer tx_buffers[RTE_MAX_ETHPORTS];
	const uint8_t nb_ports = rte_eth_dev_count();
	const int socket_id = rte_socket_id();
	uint8_t port;

	for (port = 0; port < nb_ports; port++) {
		/* skip ports that are not enabled */
		if ((enabled_port_mask & (1 << port)) == 0)
			continue;

		if (rte_eth_dev_socket_id(port) > 0 &&
				rte_eth_dev_socket_id(port) != socket_id)
			printf("WARNING, port %u is on remote NUMA node to "
					"TX thread.\n\tPerformance will not "
					"be optimal.\n", port);
	}

	printf("\nCore %u doing packet TX.\n", rte_lcore_id());
	while (!quit_signal) {

		for (port = 0; port < nb_ports; port++) {
			/* skip ports that are not enabled */
			if ((enabled_port_mask & (1 << port)) == 0)
				continue;

			struct rte_mbuf *bufs[BURST_SIZE];
			const uint16_t nb_rx = rte_ring_dequeue_burst(in_r,
					(void *)bufs, BURST_SIZE);
			app_stats.tx.dequeue_pkts += nb_rx;

			/* if we get no traffic, flush anything we have */
			if (unlikely(nb_rx == 0)) {
				flush_all_ports(tx_buffers, nb_ports);
				continue;
			}

			/* for traffic we receive, queue it up for transmit */
			uint16_t i;
			rte_prefetch_non_temporal((void *)bufs[0]);
			if (nb_rx > 1)
				rte_prefetch_non_temporal((void *)bufs[1]);
			if (nb_rx > 2)
				rte_prefetch_non_temporal((void *)bufs[2]);
			for (i = 0; i < nb_rx; i++) {
				struct output_buffer *outbuf;
				uint8_t outp;
				/* only prefetch entries actually dequeued */
				if (i + 3 < nb_rx)
					rte_prefetch_non_temporal(
							(void *)bufs[i + 3]);
				/*
				 * workers should update in_port to hold the
				 * output port value
				 */
				outp = bufs[i]->port;
				/* skip ports that are not enabled */
				if ((enabled_port_mask & (1 << outp)) == 0)
					continue;

				outbuf = &tx_buffers[outp];
				outbuf->mbufs[outbuf->count++] = bufs[i];
				if (outbuf->count == BURST_SIZE)
					flush_one_port(outbuf, outp);
			}
		}
	}
	return 0;
}
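/*
 * The TX path above amortises per-burst transmit overhead: mbufs are
 * held in per-port output_buffers and only handed to rte_eth_tx_burst()
 * once BURST_SIZE of them have accumulated, or when the ring runs empty
 * and flush_all_ports() pushes out whatever is pending, which bounds
 * the extra latency the buffering adds during idle periods.
 */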
static void
int_handler(int sig_num)
{
	printf("Exiting on signal %d\n", sig_num);
	/* set quit flag for rx thread to exit */
	quit_signal_rx = 1;
}

static void
print_stats(void)
{
	struct rte_eth_stats eth_stats;
	unsigned i;

	printf("\nRX thread stats:\n");
	printf(" - Received: %"PRIu64"\n", app_stats.rx.rx_pkts);
	printf(" - Processed: %"PRIu64"\n", app_stats.rx.returned_pkts);
	printf(" - Enqueued: %"PRIu64"\n", app_stats.rx.enqueued_pkts);

	printf("\nTX thread stats:\n");
	printf(" - Dequeued: %"PRIu64"\n", app_stats.tx.dequeue_pkts);
	printf(" - Transmitted: %"PRIu64"\n", app_stats.tx.tx_pkts);

	for (i = 0; i < rte_eth_dev_count(); i++) {
		rte_eth_stats_get(i, &eth_stats);
		printf("\nPort %u stats:\n", i);
		printf(" - Pkts in: %"PRIu64"\n", eth_stats.ipackets);
		printf(" - Pkts out: %"PRIu64"\n", eth_stats.opackets);
		printf(" - In Errs: %"PRIu64"\n", eth_stats.ierrors);
		printf(" - Out Errs: %"PRIu64"\n", eth_stats.oerrors);
		printf(" - Mbuf Errs: %"PRIu64"\n", eth_stats.rx_nombuf);
	}
}

static int
lcore_worker(struct lcore_params *p)
{
	struct rte_distributor *d = p->d;
	const unsigned id = p->worker_id;
	/*
	 * for a single port, xor_val will be zero so we won't modify the
	 * output port; otherwise we send traffic between port pairs, from
	 * 0 to 1, from 2 to 3, and vice versa
	 */
	const unsigned xor_val = (rte_eth_dev_count() > 1);
	struct rte_mbuf *buf = NULL;

	printf("\nCore %u acting as worker core.\n", rte_lcore_id());
	while (!quit_signal) {
		buf = rte_distributor_get_pkt(d, id, buf);
		buf->port ^= xor_val;
	}
	return 0;
}

/* display usage */
static void
print_usage(const char *prgname)
{
	printf("%s [EAL options] -- -p PORTMASK\n"
			" -p PORTMASK: hexadecimal bitmask of ports to configure\n",
			prgname);
}

static int
parse_portmask(const char *portmask)
{
	char *end = NULL;
	unsigned long pm;

	/*
	 * parse the hexadecimal string; return 0 on any parse error, since
	 * the caller treats a zero mask as invalid (returning -1 here would
	 * wrap to 0xFFFFFFFF when assigned to the uint32_t port mask and
	 * slip past the caller's check)
	 */
	pm = strtoul(portmask, &end, 16);
	if ((portmask[0] == '\0') || (end == NULL) || (*end != '\0'))
		return 0;

	return pm;
}

/* Parse the arguments given on the command line of the application */
static int
parse_args(int argc, char **argv)
{
	int opt;
	char **argvopt;
	int option_index;
	char *prgname = argv[0];
	static struct option lgopts[] = {
		{NULL, 0, 0, 0}
	};

	argvopt = argv;

	while ((opt = getopt_long(argc, argvopt, "p:",
			lgopts, &option_index)) != EOF) {

		switch (opt) {
		/* portmask */
		case 'p':
			enabled_port_mask = parse_portmask(optarg);
			if (enabled_port_mask == 0) {
				printf("invalid portmask\n");
				print_usage(prgname);
				return -1;
			}
			break;

		default:
			print_usage(prgname);
			return -1;
		}
	}

	if (optind <= 1) {
		print_usage(prgname);
		return -1;
	}

	argv[optind-1] = prgname;

	optind = 0; /* reset getopt lib */
	return 0;
}
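/*
 * Example invocation (the binary name and core mask below are
 * illustrative, not taken from this file): with five lcores, the master
 * lcore does RX, the last slave lcore does TX, and the remaining three
 * slaves act as workers:
 *
 *   ./distributor_app -c 0x1f -n 4 -- -p 0x3
 *
 * where "-p 0x3" enables ports 0 and 1.
 */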
/* Main function: does initialization and calls the per-lcore functions */
int
main(int argc, char *argv[])
{
	struct rte_mempool *mbuf_pool;
	struct rte_distributor *d;
	struct rte_ring *output_ring;
	unsigned lcore_id, worker_id = 0;
	unsigned nb_ports;
	uint8_t portid;
	uint8_t nb_ports_available;

	/* catch ctrl-c so we can print stats on exit */
	signal(SIGINT, int_handler);

	/* init EAL */
	int ret = rte_eal_init(argc, argv);
	if (ret < 0)
		rte_exit(EXIT_FAILURE, "Error with EAL initialization\n");
	argc -= ret;
	argv += ret;

	/* parse application arguments (after the EAL ones) */
	ret = parse_args(argc, argv);
	if (ret < 0)
		rte_exit(EXIT_FAILURE, "Invalid distributor parameters\n");

	if (rte_lcore_count() < 3)
		rte_exit(EXIT_FAILURE, "Error: this application needs at "
				"least 3 logical cores to run:\n"
				"1 lcore for packet RX and distribution\n"
				"1 lcore for packet TX\n"
				"and at least 1 lcore for worker threads\n");

	nb_ports = rte_eth_dev_count();
	if (nb_ports == 0)
		rte_exit(EXIT_FAILURE, "Error: no ethernet ports detected\n");
	if (nb_ports != 1 && (nb_ports & 1))
		rte_exit(EXIT_FAILURE, "Error: number of ports must be even, "
				"except when using a single port\n");

	mbuf_pool = rte_pktmbuf_pool_create("MBUF_POOL",
			NUM_MBUFS * nb_ports, MBUF_CACHE_SIZE, 0,
			RTE_MBUF_DEFAULT_BUF_SIZE, rte_socket_id());
	if (mbuf_pool == NULL)
		rte_exit(EXIT_FAILURE, "Cannot create mbuf pool\n");
	nb_ports_available = nb_ports;

	/* initialize all ports */
	for (portid = 0; portid < nb_ports; portid++) {
		/* skip ports that are not enabled */
		if ((enabled_port_mask & (1 << portid)) == 0) {
			printf("\nSkipping disabled port %d\n", portid);
			nb_ports_available--;
			continue;
		}
		/* init port */
		printf("Initializing port %u...\n", (unsigned) portid);

		if (port_init(portid, mbuf_pool) != 0)
			rte_exit(EXIT_FAILURE, "Cannot initialize port %"PRIu8"\n",
					portid);
	}

	if (!nb_ports_available) {
		rte_exit(EXIT_FAILURE,
				"All available ports are disabled. Please set portmask.\n");
	}

	d = rte_distributor_create("PKT_DIST", rte_socket_id(),
			rte_lcore_count() - 2);
	if (d == NULL)
		rte_exit(EXIT_FAILURE, "Cannot create distributor\n");

	/*
	 * The scheduler ring is read only by the TX core, but written to
	 * by multiple threads.
	 */
	output_ring = rte_ring_create("Output_ring", RTE_RING_SZ,
			rte_socket_id(), RING_F_SC_DEQ);
	if (output_ring == NULL)
		rte_exit(EXIT_FAILURE, "Cannot create output ring\n");

	RTE_LCORE_FOREACH_SLAVE(lcore_id) {
		/* the last slave lcore runs TX; all others run as workers */
		if (worker_id == rte_lcore_count() - 2)
			rte_eal_remote_launch((lcore_function_t *)lcore_tx,
					output_ring, lcore_id);
		else {
			struct lcore_params *p =
					rte_malloc(NULL, sizeof(*p), 0);
			if (!p)
				rte_panic("malloc failure\n");
			*p = (struct lcore_params){worker_id, d,
					output_ring, mbuf_pool};

			rte_eal_remote_launch((lcore_function_t *)lcore_worker,
					p, lcore_id);
		}
		worker_id++;
	}
	/* call lcore_rx on the master core only */
	struct lcore_params p = { 0, d, output_ring, mbuf_pool };

	if (lcore_rx(&p) != 0)
		return -1;

	RTE_LCORE_FOREACH_SLAVE(lcore_id) {
		if (rte_eal_wait_lcore(lcore_id) < 0)
			return -1;
	}

	print_stats();
	return 0;
}