1 /* 2 * Copyright (C) 2017 Corelight, Inc. and Universita` di Pisa. All rights reserved. 3 * 4 * Redistribution and use in source and binary forms, with or without 5 * modification, are permitted provided that the following conditions 6 * are met: 7 * 1. Redistributions of source code must retain the above copyright 8 * notice, this list of conditions and the following disclaimer. 9 * 2. Redistributions in binary form must reproduce the above copyright 10 * notice, this list of conditions and the following disclaimer in the 11 * documentation and/or other materials provided with the distribution. 12 * 13 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND 14 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE 15 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE 16 * ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE 17 * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL 18 * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS 19 * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) 20 * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT 21 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY 22 * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF 23 * SUCH DAMAGE. 
24 */ 25 /* $FreeBSD$ */ 26 #include <stdio.h> 27 #include <string.h> 28 #include <ctype.h> 29 #include <stdbool.h> 30 #include <inttypes.h> 31 #include <syslog.h> 32 33 #define NETMAP_WITH_LIBS 34 #include <net/netmap_user.h> 35 #include <sys/poll.h> 36 37 #include <netinet/in.h> /* htonl */ 38 39 #include <pthread.h> 40 41 #include "pkt_hash.h" 42 #include "ctrs.h" 43 44 45 /* 46 * use our version of header structs, rather than bringing in a ton 47 * of platform specific ones 48 */ 49 #ifndef ETH_ALEN 50 #define ETH_ALEN 6 51 #endif 52 53 struct compact_eth_hdr { 54 unsigned char h_dest[ETH_ALEN]; 55 unsigned char h_source[ETH_ALEN]; 56 u_int16_t h_proto; 57 }; 58 59 struct compact_ip_hdr { 60 u_int8_t ihl:4, version:4; 61 u_int8_t tos; 62 u_int16_t tot_len; 63 u_int16_t id; 64 u_int16_t frag_off; 65 u_int8_t ttl; 66 u_int8_t protocol; 67 u_int16_t check; 68 u_int32_t saddr; 69 u_int32_t daddr; 70 }; 71 72 struct compact_ipv6_hdr { 73 u_int8_t priority:4, version:4; 74 u_int8_t flow_lbl[3]; 75 u_int16_t payload_len; 76 u_int8_t nexthdr; 77 u_int8_t hop_limit; 78 struct in6_addr saddr; 79 struct in6_addr daddr; 80 }; 81 82 #define MAX_IFNAMELEN 64 83 #define MAX_PORTNAMELEN (MAX_IFNAMELEN + 40) 84 #define DEF_OUT_PIPES 2 85 #define DEF_EXTRA_BUFS 0 86 #define DEF_BATCH 2048 87 #define DEF_WAIT_LINK 2 88 #define DEF_STATS_INT 600 89 #define BUF_REVOKE 100 90 #define STAT_MSG_MAXSIZE 1024 91 92 static struct { 93 char ifname[MAX_IFNAMELEN]; 94 char base_name[MAX_IFNAMELEN]; 95 int netmap_fd; 96 uint16_t output_rings; 97 uint16_t num_groups; 98 uint32_t extra_bufs; 99 uint16_t batch; 100 int stdout_interval; 101 int syslog_interval; 102 int wait_link; 103 bool busy_wait; 104 } glob_arg; 105 106 /* 107 * the overflow queue is a circular queue of buffers 108 */ 109 struct overflow_queue { 110 char name[MAX_IFNAMELEN + 16]; 111 struct netmap_slot *slots; 112 uint32_t head; 113 uint32_t tail; 114 uint32_t n; 115 uint32_t size; 116 }; 117 118 static struct overflow_queue 
*freeq; 119 120 static inline int 121 oq_full(struct overflow_queue *q) 122 { 123 return q->n >= q->size; 124 } 125 126 static inline int 127 oq_empty(struct overflow_queue *q) 128 { 129 return q->n <= 0; 130 } 131 132 static inline void 133 oq_enq(struct overflow_queue *q, const struct netmap_slot *s) 134 { 135 if (unlikely(oq_full(q))) { 136 D("%s: queue full!", q->name); 137 abort(); 138 } 139 q->slots[q->tail] = *s; 140 q->n++; 141 q->tail++; 142 if (q->tail >= q->size) 143 q->tail = 0; 144 } 145 146 static inline struct netmap_slot 147 oq_deq(struct overflow_queue *q) 148 { 149 struct netmap_slot s = q->slots[q->head]; 150 if (unlikely(oq_empty(q))) { 151 D("%s: queue empty!", q->name); 152 abort(); 153 } 154 q->n--; 155 q->head++; 156 if (q->head >= q->size) 157 q->head = 0; 158 return s; 159 } 160 161 static volatile int do_abort = 0; 162 163 static uint64_t dropped = 0; 164 static uint64_t forwarded = 0; 165 static uint64_t received_bytes = 0; 166 static uint64_t received_pkts = 0; 167 static uint64_t non_ip = 0; 168 static uint32_t freeq_n = 0; 169 170 struct port_des { 171 char interface[MAX_PORTNAMELEN]; 172 struct my_ctrs ctr; 173 unsigned int last_sync; 174 uint32_t last_tail; 175 struct overflow_queue *oq; 176 struct nm_desc *nmd; 177 struct netmap_ring *ring; 178 struct group_des *group; 179 }; 180 181 static struct port_des *ports; 182 183 /* each group of pipes receives all the packets */ 184 struct group_des { 185 char pipename[MAX_IFNAMELEN]; 186 struct port_des *ports; 187 int first_id; 188 int nports; 189 int last; 190 int custom_port; 191 }; 192 193 static struct group_des *groups; 194 195 /* statistcs */ 196 struct counters { 197 struct timeval ts; 198 struct my_ctrs *ctrs; 199 uint64_t received_pkts; 200 uint64_t received_bytes; 201 uint64_t non_ip; 202 uint32_t freeq_n; 203 int status __attribute__((aligned(64))); 204 #define COUNTERS_EMPTY 0 205 #define COUNTERS_FULL 1 206 }; 207 208 static struct counters counters_buf; 209 210 static void 
*
print_stats(void *arg)
{
	/* Stats thread body.  Once per second, consume the counters
	 * snapshot published by the forwarding loop in counters_buf and,
	 * at the configured intervals, emit one JSON message per output
	 * pipe plus one global message to stdout and/or syslog.
	 * Returns NULL (pthread entry-point signature).
	 */
	int npipes = glob_arg.output_rings;
	int sys_int = 0;	/* seconds elapsed, drives the output intervals */
	(void)arg;
	struct my_ctrs cur, prev;
	struct my_ctrs *pipe_prev;	/* per-pipe previous snapshot, for rates */

	pipe_prev = calloc(npipes, sizeof(struct my_ctrs));
	if (pipe_prev == NULL) {
		D("out of memory");
		exit(1);
	}

	char stat_msg[STAT_MSG_MAXSIZE] = "";

	memset(&prev, 0, sizeof(prev));
	while (!do_abort) {
		int j, dosyslog = 0, dostdout = 0, newdata;
		uint64_t pps = 0, dps = 0, bps = 0, dbps = 0, usec = 0;
		struct my_ctrs x;	/* per-interval deltas */

		/* mark the buffer empty so the producer refills it while we
		 * sleep; the producer sets COUNTERS_FULL after a full
		 * memory barrier (see the main loop's send_stats section)
		 */
		counters_buf.status = COUNTERS_EMPTY;
		newdata = 0;
		memset(&cur, 0, sizeof(cur));
		sleep(1);
		if (counters_buf.status == COUNTERS_FULL) {
			__sync_synchronize();
			newdata = 1;
			cur.t = counters_buf.ts;
			if (prev.t.tv_sec || prev.t.tv_usec) {
				/* microseconds since the previous snapshot */
				usec = (cur.t.tv_sec - prev.t.tv_sec) * 1000000 +
					cur.t.tv_usec - prev.t.tv_usec;
			}
		}

		++sys_int;
		if (glob_arg.stdout_interval && sys_int % glob_arg.stdout_interval == 0)
			dostdout = 1;
		if (glob_arg.syslog_interval && sys_int % glob_arg.syslog_interval == 0)
			dosyslog = 1;

		/* per-pipe totals and rates; cur accumulates the grand totals */
		for (j = 0; j < npipes; ++j) {
			struct my_ctrs *c = &counters_buf.ctrs[j];
			cur.pkts += c->pkts;
			cur.drop += c->drop;
			cur.drop_bytes += c->drop_bytes;
			cur.bytes += c->bytes;

			if (usec) {
				/* rates rounded to nearest (the +usec/2 term) */
				x.pkts = c->pkts - pipe_prev[j].pkts;
				x.drop = c->drop - pipe_prev[j].drop;
				x.bytes = c->bytes - pipe_prev[j].bytes;
				x.drop_bytes = c->drop_bytes - pipe_prev[j].drop_bytes;
				pps = (x.pkts*1000000 + usec/2) / usec;
				dps = (x.drop*1000000 + usec/2) / usec;
				bps = ((x.bytes*1000000 + usec/2) / usec) * 8;
				dbps = ((x.drop_bytes*1000000 + usec/2) / usec) * 8;
			}
			pipe_prev[j] = *c;

			if ( (dosyslog || dostdout) && newdata )
				snprintf(stat_msg, STAT_MSG_MAXSIZE,
					"{"
					"\"ts\":%.6f,"
					"\"interface\":\"%s\","
					"\"output_ring\":%" PRIu16 ","
					"\"packets_forwarded\":%" PRIu64 ","
					"\"packets_dropped\":%" PRIu64 ","
					"\"data_forward_rate_Mbps\":%.4f,"
					"\"data_drop_rate_Mbps\":%.4f,"
					"\"packet_forward_rate_kpps\":%.4f,"
					"\"packet_drop_rate_kpps\":%.4f,"
					"\"overflow_queue_size\":%" PRIu32
					"}", cur.t.tv_sec + (cur.t.tv_usec / 1000000.0),
					ports[j].interface,
					j,
					c->pkts,
					c->drop,
					(double)bps / 1024 / 1024,
					(double)dbps / 1024 / 1024,
					(double)pps / 1000,
					(double)dps / 1000,
					c->oq_n);

			if (dosyslog && stat_msg[0])
				syslog(LOG_INFO, "%s", stat_msg);
			if (dostdout && stat_msg[0])
				printf("%s\n", stat_msg);
		}
		/* global rates over all pipes */
		if (usec) {
			x.pkts = cur.pkts - prev.pkts;
			x.drop = cur.drop - prev.drop;
			x.bytes = cur.bytes - prev.bytes;
			x.drop_bytes = cur.drop_bytes - prev.drop_bytes;
			pps = (x.pkts*1000000 + usec/2) / usec;
			dps = (x.drop*1000000 + usec/2) / usec;
			bps = ((x.bytes*1000000 + usec/2) / usec) * 8;
			dbps = ((x.drop_bytes*1000000 + usec/2) / usec) * 8;
		}

		/* summary message for the input interface as a whole */
		if ( (dosyslog || dostdout) && newdata )
			snprintf(stat_msg, STAT_MSG_MAXSIZE,
				"{"
				"\"ts\":%.6f,"
				"\"interface\":\"%s\","
				"\"output_ring\":null,"
				"\"packets_received\":%" PRIu64 ","
				"\"packets_forwarded\":%" PRIu64 ","
				"\"packets_dropped\":%" PRIu64 ","
				"\"non_ip_packets\":%" PRIu64 ","
				"\"data_forward_rate_Mbps\":%.4f,"
				"\"data_drop_rate_Mbps\":%.4f,"
				"\"packet_forward_rate_kpps\":%.4f,"
				"\"packet_drop_rate_kpps\":%.4f,"
				"\"free_buffer_slots\":%" PRIu32
				"}", cur.t.tv_sec + (cur.t.tv_usec / 1000000.0),
				glob_arg.ifname,
				received_pkts,
				cur.pkts,
				cur.drop,
				counters_buf.non_ip,
				(double)bps / 1024 / 1024,
				(double)dbps / 1024 / 1024,
				(double)pps / 1000,
				(double)dps / 1000,
				counters_buf.freeq_n);

		if (dosyslog && stat_msg[0])
			syslog(LOG_INFO, "%s", stat_msg);
		if (dostdout && stat_msg[0])
			printf("%s\n", stat_msg);

		prev = cur;
	}

	free(pipe_prev);

	return
NULL; 350 } 351 352 static void 353 free_buffers(void) 354 { 355 int i, tot = 0; 356 struct port_des *rxport = &ports[glob_arg.output_rings]; 357 358 /* build a netmap free list with the buffers in all the overflow queues */ 359 for (i = 0; i < glob_arg.output_rings + 1; i++) { 360 struct port_des *cp = &ports[i]; 361 struct overflow_queue *q = cp->oq; 362 363 if (!q) 364 continue; 365 366 while (q->n) { 367 struct netmap_slot s = oq_deq(q); 368 uint32_t *b = (uint32_t *)NETMAP_BUF(cp->ring, s.buf_idx); 369 370 *b = rxport->nmd->nifp->ni_bufs_head; 371 rxport->nmd->nifp->ni_bufs_head = s.buf_idx; 372 tot++; 373 } 374 } 375 D("added %d buffers to netmap free list", tot); 376 377 for (i = 0; i < glob_arg.output_rings + 1; ++i) { 378 nm_close(ports[i].nmd); 379 } 380 } 381 382 383 static void sigint_h(int sig) 384 { 385 (void)sig; /* UNUSED */ 386 do_abort = 1; 387 signal(SIGINT, SIG_DFL); 388 } 389 390 static void usage() 391 { 392 printf("usage: lb [options]\n"); 393 printf("where options are:\n"); 394 printf(" -h view help text\n"); 395 printf(" -i iface interface name (required)\n"); 396 printf(" -p [prefix:]npipes add a new group of output pipes\n"); 397 printf(" -B nbufs number of extra buffers (default: %d)\n", DEF_EXTRA_BUFS); 398 printf(" -b batch batch size (default: %d)\n", DEF_BATCH); 399 printf(" -w seconds wait for link up (default: %d)\n", DEF_WAIT_LINK); 400 printf(" -W enable busy waiting. 
this will run your CPU at 100%%\n"); 401 printf(" -s seconds seconds between syslog stats messages (default: 0)\n"); 402 printf(" -o seconds seconds between stdout stats messages (default: 0)\n"); 403 exit(0); 404 } 405 406 static int 407 parse_pipes(const char *spec) 408 { 409 const char *end = index(spec, ':'); 410 static int max_groups = 0; 411 struct group_des *g; 412 413 ND("spec %s num_groups %d", spec, glob_arg.num_groups); 414 if (max_groups < glob_arg.num_groups + 1) { 415 size_t size = sizeof(*g) * (glob_arg.num_groups + 1); 416 groups = realloc(groups, size); 417 if (groups == NULL) { 418 D("out of memory"); 419 return 1; 420 } 421 } 422 g = &groups[glob_arg.num_groups]; 423 memset(g, 0, sizeof(*g)); 424 425 if (end != NULL) { 426 if (end - spec > MAX_IFNAMELEN - 8) { 427 D("name '%s' too long", spec); 428 return 1; 429 } 430 if (end == spec) { 431 D("missing prefix before ':' in '%s'", spec); 432 return 1; 433 } 434 strncpy(g->pipename, spec, end - spec); 435 g->custom_port = 1; 436 end++; 437 } else { 438 /* no prefix, this group will use the 439 * name of the input port. 
440 * This will be set in init_groups(), 441 * since here the input port may still 442 * be uninitialized 443 */ 444 end = spec; 445 } 446 if (*end == '\0') { 447 g->nports = DEF_OUT_PIPES; 448 } else { 449 g->nports = atoi(end); 450 if (g->nports < 1) { 451 D("invalid number of pipes '%s' (must be at least 1)", end); 452 return 1; 453 } 454 } 455 glob_arg.output_rings += g->nports; 456 glob_arg.num_groups++; 457 return 0; 458 } 459 460 /* complete the initialization of the groups data structure */ 461 static void 462 init_groups(void) 463 { 464 int i, j, t = 0; 465 struct group_des *g = NULL; 466 for (i = 0; i < glob_arg.num_groups; i++) { 467 g = &groups[i]; 468 g->ports = &ports[t]; 469 for (j = 0; j < g->nports; j++) 470 g->ports[j].group = g; 471 t += g->nports; 472 if (!g->custom_port) 473 strcpy(g->pipename, glob_arg.base_name); 474 for (j = 0; j < i; j++) { 475 struct group_des *h = &groups[j]; 476 if (!strcmp(h->pipename, g->pipename)) 477 g->first_id += h->nports; 478 } 479 } 480 g->last = 1; 481 } 482 483 /* push the packet described by slot rs to the group g. 484 * This may cause other buffers to be pushed down the 485 * chain headed by g. 486 * Return a free buffer. 
487 */ 488 static uint32_t 489 forward_packet(struct group_des *g, struct netmap_slot *rs) 490 { 491 uint32_t hash = rs->ptr; 492 uint32_t output_port = hash % g->nports; 493 struct port_des *port = &g->ports[output_port]; 494 struct netmap_ring *ring = port->ring; 495 struct overflow_queue *q = port->oq; 496 497 /* Move the packet to the output pipe, unless there is 498 * either no space left on the ring, or there is some 499 * packet still in the overflow queue (since those must 500 * take precedence over the new one) 501 */ 502 if (ring->head != ring->tail && (q == NULL || oq_empty(q))) { 503 struct netmap_slot *ts = &ring->slot[ring->head]; 504 struct netmap_slot old_slot = *ts; 505 506 ts->buf_idx = rs->buf_idx; 507 ts->len = rs->len; 508 ts->flags |= NS_BUF_CHANGED; 509 ts->ptr = rs->ptr; 510 ring->head = nm_ring_next(ring, ring->head); 511 port->ctr.bytes += rs->len; 512 port->ctr.pkts++; 513 forwarded++; 514 return old_slot.buf_idx; 515 } 516 517 /* use the overflow queue, if available */ 518 if (q == NULL || oq_full(q)) { 519 /* no space left on the ring and no overflow queue 520 * available: we are forced to drop the packet 521 */ 522 dropped++; 523 port->ctr.drop++; 524 port->ctr.drop_bytes += rs->len; 525 return rs->buf_idx; 526 } 527 528 oq_enq(q, rs); 529 530 /* 531 * we cannot continue down the chain and we need to 532 * return a free buffer now. We take it from the free queue. 533 */ 534 if (oq_empty(freeq)) { 535 /* the free queue is empty. 
Revoke some buffers 536 * from the longest overflow queue 537 */ 538 uint32_t j; 539 struct port_des *lp = &ports[0]; 540 uint32_t max = lp->oq->n; 541 542 /* let lp point to the port with the longest queue */ 543 for (j = 1; j < glob_arg.output_rings; j++) { 544 struct port_des *cp = &ports[j]; 545 if (cp->oq->n > max) { 546 lp = cp; 547 max = cp->oq->n; 548 } 549 } 550 551 /* move the oldest BUF_REVOKE buffers from the 552 * lp queue to the free queue 553 */ 554 // XXX optimize this cycle 555 for (j = 0; lp->oq->n && j < BUF_REVOKE; j++) { 556 struct netmap_slot tmp = oq_deq(lp->oq); 557 558 dropped++; 559 lp->ctr.drop++; 560 lp->ctr.drop_bytes += tmp.len; 561 562 oq_enq(freeq, &tmp); 563 } 564 565 ND(1, "revoked %d buffers from %s", j, lq->name); 566 } 567 568 return oq_deq(freeq).buf_idx; 569 } 570 571 int main(int argc, char **argv) 572 { 573 int ch; 574 uint32_t i; 575 int rv; 576 unsigned int iter = 0; 577 int poll_timeout = 10; /* default */ 578 579 glob_arg.ifname[0] = '\0'; 580 glob_arg.output_rings = 0; 581 glob_arg.batch = DEF_BATCH; 582 glob_arg.wait_link = DEF_WAIT_LINK; 583 glob_arg.busy_wait = false; 584 glob_arg.syslog_interval = 0; 585 glob_arg.stdout_interval = 0; 586 587 while ( (ch = getopt(argc, argv, "hi:p:b:B:s:o:w:W")) != -1) { 588 switch (ch) { 589 case 'i': 590 D("interface is %s", optarg); 591 if (strlen(optarg) > MAX_IFNAMELEN - 8) { 592 D("ifname too long %s", optarg); 593 return 1; 594 } 595 if (strncmp(optarg, "netmap:", 7) && strncmp(optarg, "vale", 4)) { 596 sprintf(glob_arg.ifname, "netmap:%s", optarg); 597 } else { 598 strcpy(glob_arg.ifname, optarg); 599 } 600 break; 601 602 case 'p': 603 if (parse_pipes(optarg)) { 604 usage(); 605 return 1; 606 } 607 break; 608 609 case 'B': 610 glob_arg.extra_bufs = atoi(optarg); 611 D("requested %d extra buffers", glob_arg.extra_bufs); 612 break; 613 614 case 'b': 615 glob_arg.batch = atoi(optarg); 616 D("batch is %d", glob_arg.batch); 617 break; 618 619 case 'w': 620 glob_arg.wait_link = 
atoi(optarg); 621 D("link wait for up time is %d", glob_arg.wait_link); 622 break; 623 624 case 'W': 625 glob_arg.busy_wait = true; 626 break; 627 628 case 'o': 629 glob_arg.stdout_interval = atoi(optarg); 630 break; 631 632 case 's': 633 glob_arg.syslog_interval = atoi(optarg); 634 break; 635 636 case 'h': 637 usage(); 638 return 0; 639 break; 640 641 default: 642 D("bad option %c %s", ch, optarg); 643 usage(); 644 return 1; 645 } 646 } 647 648 if (glob_arg.ifname[0] == '\0') { 649 D("missing interface name"); 650 usage(); 651 return 1; 652 } 653 654 /* extract the base name */ 655 char *nscan = strncmp(glob_arg.ifname, "netmap:", 7) ? 656 glob_arg.ifname : glob_arg.ifname + 7; 657 strncpy(glob_arg.base_name, nscan, MAX_IFNAMELEN - 1); 658 for (nscan = glob_arg.base_name; *nscan && !index("-*^{}/@", *nscan); nscan++) 659 ; 660 *nscan = '\0'; 661 662 if (glob_arg.num_groups == 0) 663 parse_pipes(""); 664 665 if (glob_arg.syslog_interval) { 666 setlogmask(LOG_UPTO(LOG_INFO)); 667 openlog("lb", LOG_CONS | LOG_PID | LOG_NDELAY, LOG_LOCAL1); 668 } 669 670 uint32_t npipes = glob_arg.output_rings; 671 672 673 pthread_t stat_thread; 674 675 ports = calloc(npipes + 1, sizeof(struct port_des)); 676 if (!ports) { 677 D("failed to allocate the stats array"); 678 return 1; 679 } 680 struct port_des *rxport = &ports[npipes]; 681 init_groups(); 682 683 memset(&counters_buf, 0, sizeof(counters_buf)); 684 counters_buf.ctrs = calloc(npipes, sizeof(struct my_ctrs)); 685 if (!counters_buf.ctrs) { 686 D("failed to allocate the counters snapshot buffer"); 687 return 1; 688 } 689 690 /* we need base_req to specify pipes and extra bufs */ 691 struct nmreq base_req; 692 memset(&base_req, 0, sizeof(base_req)); 693 694 base_req.nr_arg1 = npipes; 695 base_req.nr_arg3 = glob_arg.extra_bufs; 696 697 rxport->nmd = nm_open(glob_arg.ifname, &base_req, 0, NULL); 698 699 if (rxport->nmd == NULL) { 700 D("cannot open %s", glob_arg.ifname); 701 return (1); 702 } else { 703 D("successfully opened %s 
(tx rings: %u)", glob_arg.ifname, 704 rxport->nmd->req.nr_tx_slots); 705 } 706 707 uint32_t extra_bufs = rxport->nmd->req.nr_arg3; 708 struct overflow_queue *oq = NULL; 709 /* reference ring to access the buffers */ 710 rxport->ring = NETMAP_RXRING(rxport->nmd->nifp, 0); 711 712 if (!glob_arg.extra_bufs) 713 goto run; 714 715 D("obtained %d extra buffers", extra_bufs); 716 if (!extra_bufs) 717 goto run; 718 719 /* one overflow queue for each output pipe, plus one for the 720 * free extra buffers 721 */ 722 oq = calloc(npipes + 1, sizeof(struct overflow_queue)); 723 if (!oq) { 724 D("failed to allocated overflow queues descriptors"); 725 goto run; 726 } 727 728 freeq = &oq[npipes]; 729 rxport->oq = freeq; 730 731 freeq->slots = calloc(extra_bufs, sizeof(struct netmap_slot)); 732 if (!freeq->slots) { 733 D("failed to allocate the free list"); 734 } 735 freeq->size = extra_bufs; 736 snprintf(freeq->name, MAX_IFNAMELEN, "free queue"); 737 738 /* 739 * the list of buffers uses the first uint32_t in each buffer 740 * as the index of the next buffer. 741 */ 742 uint32_t scan; 743 for (scan = rxport->nmd->nifp->ni_bufs_head; 744 scan; 745 scan = *(uint32_t *)NETMAP_BUF(rxport->ring, scan)) 746 { 747 struct netmap_slot s; 748 s.len = s.flags = 0; 749 s.ptr = 0; 750 s.buf_idx = scan; 751 ND("freeq <- %d", s.buf_idx); 752 oq_enq(freeq, &s); 753 } 754 755 756 if (freeq->n != extra_bufs) { 757 D("something went wrong: netmap reported %d extra_bufs, but the free list contained %d", 758 extra_bufs, freeq->n); 759 return 1; 760 } 761 rxport->nmd->nifp->ni_bufs_head = 0; 762 763 run: 764 atexit(free_buffers); 765 766 int j, t = 0; 767 for (j = 0; j < glob_arg.num_groups; j++) { 768 struct group_des *g = &groups[j]; 769 int k; 770 for (k = 0; k < g->nports; ++k) { 771 struct port_des *p = &g->ports[k]; 772 snprintf(p->interface, MAX_PORTNAMELEN, "%s%s{%d/xT@%d", 773 (strncmp(g->pipename, "vale", 4) ? 
"netmap:" : ""), 774 g->pipename, g->first_id + k, 775 rxport->nmd->req.nr_arg2); 776 D("opening pipe named %s", p->interface); 777 778 p->nmd = nm_open(p->interface, NULL, 0, rxport->nmd); 779 780 if (p->nmd == NULL) { 781 D("cannot open %s", p->interface); 782 return (1); 783 } else if (p->nmd->req.nr_arg2 != rxport->nmd->req.nr_arg2) { 784 D("failed to open pipe #%d in zero-copy mode, " 785 "please close any application that uses either pipe %s}%d, " 786 "or %s{%d, and retry", 787 k + 1, g->pipename, g->first_id + k, g->pipename, g->first_id + k); 788 return (1); 789 } else { 790 D("successfully opened pipe #%d %s (tx slots: %d)", 791 k + 1, p->interface, p->nmd->req.nr_tx_slots); 792 p->ring = NETMAP_TXRING(p->nmd->nifp, 0); 793 p->last_tail = nm_ring_next(p->ring, p->ring->tail); 794 } 795 D("zerocopy %s", 796 (rxport->nmd->mem == p->nmd->mem) ? "enabled" : "disabled"); 797 798 if (extra_bufs) { 799 struct overflow_queue *q = &oq[t + k]; 800 q->slots = calloc(extra_bufs, sizeof(struct netmap_slot)); 801 if (!q->slots) { 802 D("failed to allocate overflow queue for pipe %d", k); 803 /* make all overflow queue management fail */ 804 extra_bufs = 0; 805 } 806 q->size = extra_bufs; 807 snprintf(q->name, sizeof(q->name), "oq %s{%4d", g->pipename, k); 808 p->oq = q; 809 } 810 } 811 t += g->nports; 812 } 813 814 if (glob_arg.extra_bufs && !extra_bufs) { 815 if (oq) { 816 for (i = 0; i < npipes + 1; i++) { 817 free(oq[i].slots); 818 oq[i].slots = NULL; 819 } 820 free(oq); 821 oq = NULL; 822 } 823 D("*** overflow queues disabled ***"); 824 } 825 826 sleep(glob_arg.wait_link); 827 828 /* start stats thread after wait_link */ 829 if (pthread_create(&stat_thread, NULL, print_stats, NULL) == -1) { 830 D("unable to create the stats thread: %s", strerror(errno)); 831 return 1; 832 } 833 834 struct pollfd pollfd[npipes + 1]; 835 memset(&pollfd, 0, sizeof(pollfd)); 836 signal(SIGINT, sigint_h); 837 838 /* make sure we wake up as often as needed, even when there are no 839 * 
packets coming in 840 */ 841 if (glob_arg.syslog_interval > 0 && glob_arg.syslog_interval < poll_timeout) 842 poll_timeout = glob_arg.syslog_interval; 843 if (glob_arg.stdout_interval > 0 && glob_arg.stdout_interval < poll_timeout) 844 poll_timeout = glob_arg.stdout_interval; 845 846 while (!do_abort) { 847 u_int polli = 0; 848 iter++; 849 850 for (i = 0; i < npipes; ++i) { 851 struct netmap_ring *ring = ports[i].ring; 852 int pending = nm_tx_pending(ring); 853 854 /* if there are packets pending, we want to be notified when 855 * tail moves, so we let cur=tail 856 */ 857 ring->cur = pending ? ring->tail : ring->head; 858 859 if (!glob_arg.busy_wait && !pending) { 860 /* no need to poll, there are no packets pending */ 861 continue; 862 } 863 pollfd[polli].fd = ports[i].nmd->fd; 864 pollfd[polli].events = POLLOUT; 865 pollfd[polli].revents = 0; 866 ++polli; 867 } 868 869 pollfd[polli].fd = rxport->nmd->fd; 870 pollfd[polli].events = POLLIN; 871 pollfd[polli].revents = 0; 872 ++polli; 873 874 //RD(5, "polling %d file descriptors", polli+1); 875 rv = poll(pollfd, polli, poll_timeout); 876 if (rv <= 0) { 877 if (rv < 0 && errno != EAGAIN && errno != EINTR) 878 RD(1, "poll error %s", strerror(errno)); 879 goto send_stats; 880 } 881 882 /* if there are several groups, try pushing released packets from 883 * upstream groups to the downstream ones. 884 * 885 * It is important to do this before returned slots are reused 886 * for new transmissions. For the same reason, this must be 887 * done starting from the last group going backwards. 
888 */ 889 for (i = glob_arg.num_groups - 1U; i > 0; i--) { 890 struct group_des *g = &groups[i - 1]; 891 892 for (j = 0; j < g->nports; j++) { 893 struct port_des *p = &g->ports[j]; 894 struct netmap_ring *ring = p->ring; 895 uint32_t last = p->last_tail, 896 stop = nm_ring_next(ring, ring->tail); 897 898 /* slight abuse of the API here: we touch the slot 899 * pointed to by tail 900 */ 901 for ( ; last != stop; last = nm_ring_next(ring, last)) { 902 struct netmap_slot *rs = &ring->slot[last]; 903 // XXX less aggressive? 904 rs->buf_idx = forward_packet(g + 1, rs); 905 rs->flags |= NS_BUF_CHANGED; 906 rs->ptr = 0; 907 } 908 p->last_tail = last; 909 } 910 } 911 912 913 914 if (oq) { 915 /* try to push packets from the overflow queues 916 * to the corresponding pipes 917 */ 918 for (i = 0; i < npipes; i++) { 919 struct port_des *p = &ports[i]; 920 struct overflow_queue *q = p->oq; 921 uint32_t k, lim; 922 struct netmap_ring *ring; 923 struct netmap_slot *slot; 924 925 if (oq_empty(q)) 926 continue; 927 ring = p->ring; 928 lim = nm_ring_space(ring); 929 if (!lim) 930 continue; 931 if (q->n < lim) 932 lim = q->n; 933 for (k = 0; k < lim; k++) { 934 struct netmap_slot s = oq_deq(q), tmp; 935 tmp.ptr = 0; 936 slot = &ring->slot[ring->head]; 937 tmp.buf_idx = slot->buf_idx; 938 oq_enq(freeq, &tmp); 939 *slot = s; 940 slot->flags |= NS_BUF_CHANGED; 941 ring->head = nm_ring_next(ring, ring->head); 942 } 943 } 944 } 945 946 /* push any new packets from the input port to the first group */ 947 int batch = 0; 948 for (i = rxport->nmd->first_rx_ring; i <= rxport->nmd->last_rx_ring; i++) { 949 struct netmap_ring *rxring = NETMAP_RXRING(rxport->nmd->nifp, i); 950 951 //D("prepare to scan rings"); 952 int next_head = rxring->head; 953 struct netmap_slot *next_slot = &rxring->slot[next_head]; 954 const char *next_buf = NETMAP_BUF(rxring, next_slot->buf_idx); 955 while (!nm_ring_empty(rxring)) { 956 struct netmap_slot *rs = next_slot; 957 struct group_des *g = &groups[0]; 958 
++received_pkts; 959 received_bytes += rs->len; 960 961 // CHOOSE THE CORRECT OUTPUT PIPE 962 rs->ptr = pkt_hdr_hash((const unsigned char *)next_buf, 4, 'B'); 963 if (rs->ptr == 0) { 964 non_ip++; // XXX ?? 965 } 966 // prefetch the buffer for the next round 967 next_head = nm_ring_next(rxring, next_head); 968 next_slot = &rxring->slot[next_head]; 969 next_buf = NETMAP_BUF(rxring, next_slot->buf_idx); 970 __builtin_prefetch(next_buf); 971 // 'B' is just a hashing seed 972 rs->buf_idx = forward_packet(g, rs); 973 rs->flags |= NS_BUF_CHANGED; 974 rxring->head = rxring->cur = next_head; 975 976 batch++; 977 if (unlikely(batch >= glob_arg.batch)) { 978 ioctl(rxport->nmd->fd, NIOCRXSYNC, NULL); 979 batch = 0; 980 } 981 ND(1, 982 "Forwarded Packets: %"PRIu64" Dropped packets: %"PRIu64" Percent: %.2f", 983 forwarded, dropped, 984 ((float)dropped / (float)forwarded * 100)); 985 } 986 987 } 988 989 send_stats: 990 if (counters_buf.status == COUNTERS_FULL) 991 continue; 992 /* take a new snapshot of the counters */ 993 gettimeofday(&counters_buf.ts, NULL); 994 for (i = 0; i < npipes; i++) { 995 struct my_ctrs *c = &counters_buf.ctrs[i]; 996 *c = ports[i].ctr; 997 /* 998 * If there are overflow queues, copy the number of them for each 999 * port to the ctrs.oq_n variable for each port. 1000 */ 1001 if (ports[i].oq != NULL) 1002 c->oq_n = ports[i].oq->n; 1003 } 1004 counters_buf.received_pkts = received_pkts; 1005 counters_buf.received_bytes = received_bytes; 1006 counters_buf.non_ip = non_ip; 1007 if (freeq != NULL) 1008 counters_buf.freeq_n = freeq->n; 1009 __sync_synchronize(); 1010 counters_buf.status = COUNTERS_FULL; 1011 } 1012 1013 /* 1014 * If freeq exists, copy the number to the freeq_n member of the 1015 * message struct, otherwise set it to 0. 1016 */ 1017 if (freeq != NULL) { 1018 freeq_n = freeq->n; 1019 } else { 1020 freeq_n = 0; 1021 } 1022 1023 pthread_join(stat_thread, NULL); 1024 1025 printf("%"PRIu64" packets forwarded. %"PRIu64" packets dropped. 
Total %"PRIu64"\n", forwarded, 1026 dropped, forwarded + dropped); 1027 return 0; 1028 } 1029