1 /*
2 * Copyright (C) 2017 Corelight, Inc. and Universita` di Pisa. All rights reserved.
3 *
4 * Redistribution and use in source and binary forms, with or without
5 * modification, are permitted provided that the following conditions
6 * are met:
7 * 1. Redistributions of source code must retain the above copyright
8 * notice, this list of conditions and the following disclaimer.
9 * 2. Redistributions in binary form must reproduce the above copyright
10 * notice, this list of conditions and the following disclaimer in the
11 * documentation and/or other materials provided with the distribution.
12 *
13 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
14 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
15 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
16 * ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
17 * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
18 * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
19 * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
20 * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
21 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
22 * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
23 * SUCH DAMAGE.
24 */
25 /* $FreeBSD$ */
26 #include <ctype.h>
27 #include <errno.h>
28 #include <inttypes.h>
29 #include <libnetmap.h>
30 #include <netinet/in.h> /* htonl */
31 #include <pthread.h>
32 #include <signal.h>
33 #include <stdbool.h>
34 #include <stdio.h>
35 #include <stdlib.h>
36 #include <string.h>
37 #include <syslog.h>
38 #include <sys/ioctl.h>
39 #include <sys/poll.h>
40 #include <unistd.h>
41
42 #include "pkt_hash.h"
43 #include "ctrs.h"
44
45
/*
 * use our version of header structs, rather than bringing in a ton
 * of platform specific ones
 */
#ifndef ETH_ALEN
#define ETH_ALEN 6	/* length of an Ethernet MAC address in bytes */
#endif

/* Minimal Ethernet header: destination MAC, source MAC, EtherType.
 * Field layout matches the on-wire format (14 bytes, h_proto in
 * network byte order).
 */
struct compact_eth_hdr {
	unsigned char h_dest[ETH_ALEN];
	unsigned char h_source[ETH_ALEN];
	u_int16_t h_proto;
};
59
/* Minimal IPv4 header, used by the packet hashing code.
 * NOTE(review): the ihl/version bitfield order assumes a
 * little-endian host (ihl in the low nibble) — confirm if this
 * tool is ever built for a big-endian target.
 */
struct compact_ip_hdr {
	u_int8_t ihl:4, version:4;
	u_int8_t tos;
	u_int16_t tot_len;
	u_int16_t id;
	u_int16_t frag_off;
	u_int8_t ttl;
	u_int8_t protocol;
	u_int16_t check;
	u_int32_t saddr;	/* source address, network byte order */
	u_int32_t daddr;	/* destination address, network byte order */
};
72
/* Minimal IPv6 header, used by the packet hashing code.
 * Same bitfield-endianness caveat as compact_ip_hdr applies to
 * priority/version.
 */
struct compact_ipv6_hdr {
	u_int8_t priority:4, version:4;
	u_int8_t flow_lbl[3];
	u_int16_t payload_len;
	u_int8_t nexthdr;	/* next-header protocol number */
	u_int8_t hop_limit;
	struct in6_addr saddr;
	struct in6_addr daddr;
};
82
#define MAX_IFNAMELEN 64			/* max interface name length */
#define MAX_PORTNAMELEN (MAX_IFNAMELEN + 40)	/* room for pipe id suffix */
#define DEF_OUT_PIPES 2				/* default pipes per group (-p) */
#define DEF_EXTRA_BUFS 0			/* default extra buffers (-B) */
#define DEF_BATCH 2048				/* default rx sync batch (-b) */
#define DEF_WAIT_LINK 2				/* default link-up wait, seconds (-w) */
#define DEF_STATS_INT 600			/* default stats interval (unused here) */
#define BUF_REVOKE 150				/* buffers reclaimed per revocation pass */
#define STAT_MSG_MAXSIZE 1024			/* max size of one JSON stats message */

/* Global configuration, filled in from the command line in main(). */
static struct {
	char ifname[MAX_IFNAMELEN + 1];		/* input port name ("netmap:..." or "vale...") */
	char base_name[MAX_IFNAMELEN + 1];	/* input port name without the prefix */
	int netmap_fd;
	uint16_t output_rings;			/* total output pipes across all groups */
	uint16_t num_groups;			/* number of -p groups */
	uint32_t extra_bufs;			/* extra buffers requested via -B */
	uint16_t batch;				/* rx packets between forced NIOCRXSYNC */
	int stdout_interval;			/* seconds between stdout stats, 0 = off */
	int syslog_interval;			/* seconds between syslog stats, 0 = off */
	int wait_link;				/* seconds to wait for link up */
	bool busy_wait;				/* poll output fds even when idle (-W) */
} glob_arg;
106
/*
 * the overflow queue is a circular queue of buffers
 */
struct overflow_queue {
	char name[MAX_IFNAMELEN + 16];	/* for diagnostics */
	struct netmap_slot *slots;	/* backing array, 'size' entries */
	uint32_t head;			/* dequeue position */
	uint32_t tail;			/* enqueue position */
	uint32_t n;			/* current number of queued slots */
	uint32_t size;			/* capacity of 'slots' */
};

/* queue of spare (free) extra buffers, shared by all output queues */
static struct overflow_queue *freeq;
120
121 static inline int
oq_full(struct overflow_queue * q)122 oq_full(struct overflow_queue *q)
123 {
124 return q->n >= q->size;
125 }
126
127 static inline int
oq_empty(struct overflow_queue * q)128 oq_empty(struct overflow_queue *q)
129 {
130 return q->n <= 0;
131 }
132
133 static inline void
oq_enq(struct overflow_queue * q,const struct netmap_slot * s)134 oq_enq(struct overflow_queue *q, const struct netmap_slot *s)
135 {
136 if (unlikely(oq_full(q))) {
137 D("%s: queue full!", q->name);
138 abort();
139 }
140 q->slots[q->tail] = *s;
141 q->n++;
142 q->tail++;
143 if (q->tail >= q->size)
144 q->tail = 0;
145 }
146
147 static inline struct netmap_slot
oq_deq(struct overflow_queue * q)148 oq_deq(struct overflow_queue *q)
149 {
150 struct netmap_slot s = q->slots[q->head];
151 if (unlikely(oq_empty(q))) {
152 D("%s: queue empty!", q->name);
153 abort();
154 }
155 q->n--;
156 q->head++;
157 if (q->head >= q->size)
158 q->head = 0;
159 return s;
160 }
161
/* set by the SIGINT handler; all loops poll it to terminate */
static volatile int do_abort = 0;

/* global traffic counters, written by the forwarding (main) thread
 * and snapshotted into counters_buf for the stats thread */
static uint64_t dropped = 0;		/* packets dropped (all causes) */
static uint64_t forwarded = 0;		/* packets moved to an output ring */
static uint64_t received_bytes = 0;	/* bytes seen on the input port */
static uint64_t received_pkts = 0;	/* packets seen on the input port */
static uint64_t non_ip = 0;		/* packets that hashed to 0 (non-IP) */
static uint32_t freeq_n = 0;		/* final free-queue depth, set at exit */
170
/* Per-output-pipe state. One extra entry (index output_rings)
 * describes the input (rx) port.
 */
struct port_des {
	char interface[MAX_PORTNAMELEN];	/* netmap port name of the pipe */
	struct my_ctrs ctr;			/* per-port counters */
	unsigned int last_sync;
	uint32_t last_tail;			/* last seen tail, to detect released slots */
	struct overflow_queue *oq;		/* overflow queue, NULL if disabled */
	struct nmport_d *nmd;			/* libnetmap port descriptor */
	struct netmap_ring *ring;		/* tx ring (rx ring for the input port) */
	struct group_des *group;		/* owning group */
};

static struct port_des *ports;
183
/* each group of pipes receives all the packets */
struct group_des {
	char pipename[MAX_IFNAMELEN];	/* pipe name prefix for this group */
	struct port_des *ports;		/* first port of this group in ports[] */
	int first_id;			/* first pipe id (offset among same-name groups) */
	int nports;			/* number of pipes in this group */
	int last;			/* nonzero for the last group in the chain */
	int custom_port;		/* nonzero if a prefix was given with -p */
};

static struct group_des *groups;
195
/* statistics */
/* Snapshot buffer handed from the forwarding thread (producer) to the
 * stats thread (consumer). 'status' is the handshake flag: the producer
 * fills the snapshot and sets COUNTERS_FULL; the consumer reads it and
 * resets to COUNTERS_EMPTY. It is cache-line aligned to avoid false
 * sharing with the data fields.
 */
struct counters {
	struct timeval ts;		/* time of the snapshot */
	struct my_ctrs *ctrs;		/* per-pipe counters, npipes entries */
	uint64_t received_pkts;
	uint64_t received_bytes;
	uint64_t non_ip;
	uint32_t freeq_n;		/* depth of the free buffer queue */
	int status __attribute__((aligned(64)));
#define COUNTERS_EMPTY 0
#define COUNTERS_FULL 1
};

static struct counters counters_buf;
210
/* Stats thread body: once per second, consume the counters snapshot
 * published by the forwarding thread in counters_buf and, at the
 * configured intervals, emit one JSON stats line per output pipe plus
 * a global one, to syslog and/or stdout.
 *
 * Synchronization is a single-producer/single-consumer flag handshake
 * on counters_buf.status (see struct counters); rates are computed
 * from the delta between consecutive snapshots.
 */
static void *
print_stats(void *arg)
{
	int npipes = glob_arg.output_rings;
	int sys_int = 0;	/* seconds elapsed, drives the intervals */
	(void)arg;
	struct my_ctrs cur, prev;
	struct my_ctrs *pipe_prev;	/* previous snapshot, per pipe */

	pipe_prev = calloc(npipes, sizeof(struct my_ctrs));
	if (pipe_prev == NULL) {
		D("out of memory");
		exit(1);
	}

	char stat_msg[STAT_MSG_MAXSIZE] = "";

	memset(&prev, 0, sizeof(prev));
	while (!do_abort) {
		int j, dosyslog = 0, dostdout = 0, newdata;
		uint64_t pps = 0, dps = 0, bps = 0, dbps = 0, usec = 0;
		struct my_ctrs x;

		/* ask the producer for a fresh snapshot, then sleep to
		 * give it time to publish one */
		counters_buf.status = COUNTERS_EMPTY;
		newdata = 0;
		memset(&cur, 0, sizeof(cur));
		sleep(1);
		if (counters_buf.status == COUNTERS_FULL) {
			/* barrier pairs with the producer's barrier before
			 * it sets COUNTERS_FULL */
			__sync_synchronize();
			newdata = 1;
			cur.t = counters_buf.ts;
			if (prev.t.tv_sec || prev.t.tv_usec) {
				usec = (cur.t.tv_sec - prev.t.tv_sec) * 1000000 +
					cur.t.tv_usec - prev.t.tv_usec;
			}
		}

		++sys_int;
		if (glob_arg.stdout_interval && sys_int % glob_arg.stdout_interval == 0)
				dostdout = 1;
		if (glob_arg.syslog_interval && sys_int % glob_arg.syslog_interval == 0)
				dosyslog = 1;

		/* per-pipe totals and rates; also accumulate into 'cur' */
		for (j = 0; j < npipes; ++j) {
			struct my_ctrs *c = &counters_buf.ctrs[j];
			cur.pkts += c->pkts;
			cur.drop += c->drop;
			cur.drop_bytes += c->drop_bytes;
			cur.bytes += c->bytes;

			if (usec) {
				/* rates rounded to nearest via the usec/2 bias */
				x.pkts = c->pkts - pipe_prev[j].pkts;
				x.drop = c->drop - pipe_prev[j].drop;
				x.bytes = c->bytes - pipe_prev[j].bytes;
				x.drop_bytes = c->drop_bytes - pipe_prev[j].drop_bytes;
				pps = (x.pkts*1000000 + usec/2) / usec;
				dps = (x.drop*1000000 + usec/2) / usec;
				bps = ((x.bytes*1000000 + usec/2) / usec) * 8;
				dbps = ((x.drop_bytes*1000000 + usec/2) / usec) * 8;
			}
			pipe_prev[j] = *c;

			if ( (dosyslog || dostdout) && newdata )
				snprintf(stat_msg, STAT_MSG_MAXSIZE,
				       "{"
				       "\"ts\":%.6f,"
				       "\"interface\":\"%s\","
				       "\"output_ring\":%" PRIu16 ","
				       "\"packets_forwarded\":%" PRIu64 ","
				       "\"packets_dropped\":%" PRIu64 ","
				       "\"data_forward_rate_Mbps\":%.4f,"
				       "\"data_drop_rate_Mbps\":%.4f,"
				       "\"packet_forward_rate_kpps\":%.4f,"
				       "\"packet_drop_rate_kpps\":%.4f,"
				       "\"overflow_queue_size\":%" PRIu32
				       "}", cur.t.tv_sec + (cur.t.tv_usec / 1000000.0),
						ports[j].interface,
						j,
						c->pkts,
						c->drop,
						(double)bps / 1024 / 1024,
						(double)dbps / 1024 / 1024,
						(double)pps / 1000,
						(double)dps / 1000,
						c->oq_n);

			if (dosyslog && stat_msg[0])
				syslog(LOG_INFO, "%s", stat_msg);
			if (dostdout && stat_msg[0])
				printf("%s\n", stat_msg);
		}

		/* global (all-pipes) rates from the accumulated totals */
		if (usec) {
			x.pkts = cur.pkts - prev.pkts;
			x.drop = cur.drop - prev.drop;
			x.bytes = cur.bytes - prev.bytes;
			x.drop_bytes = cur.drop_bytes - prev.drop_bytes;
			pps = (x.pkts*1000000 + usec/2) / usec;
			dps = (x.drop*1000000 + usec/2) / usec;
			bps = ((x.bytes*1000000 + usec/2) / usec) * 8;
			dbps = ((x.drop_bytes*1000000 + usec/2) / usec) * 8;
		}

		if ( (dosyslog || dostdout) && newdata )
			snprintf(stat_msg, STAT_MSG_MAXSIZE,
			         "{"
			         "\"ts\":%.6f,"
			         "\"interface\":\"%s\","
			         "\"output_ring\":null,"
			         "\"packets_received\":%" PRIu64 ","
			         "\"packets_forwarded\":%" PRIu64 ","
			         "\"packets_dropped\":%" PRIu64 ","
			         "\"non_ip_packets\":%" PRIu64 ","
			         "\"data_forward_rate_Mbps\":%.4f,"
			         "\"data_drop_rate_Mbps\":%.4f,"
			         "\"packet_forward_rate_kpps\":%.4f,"
			         "\"packet_drop_rate_kpps\":%.4f,"
			         "\"free_buffer_slots\":%" PRIu32
			         "}", cur.t.tv_sec + (cur.t.tv_usec / 1000000.0),
			              glob_arg.ifname,
			              received_pkts,
			              cur.pkts,
			              cur.drop,
			              counters_buf.non_ip,
			              (double)bps / 1024 / 1024,
			              (double)dbps / 1024 / 1024,
			              (double)pps / 1000,
			              (double)dps / 1000,
			              counters_buf.freeq_n);

		if (dosyslog && stat_msg[0])
			syslog(LOG_INFO, "%s", stat_msg);
		if (dostdout && stat_msg[0])
			printf("%s\n", stat_msg);

		prev = cur;
	}

	free(pipe_prev);

	return NULL;
}
352
353 static void
free_buffers(void)354 free_buffers(void)
355 {
356 int i, tot = 0;
357 struct port_des *rxport = &ports[glob_arg.output_rings];
358
359 /* build a netmap free list with the buffers in all the overflow queues */
360 for (i = 0; i < glob_arg.output_rings + 1; i++) {
361 struct port_des *cp = &ports[i];
362 struct overflow_queue *q = cp->oq;
363
364 if (!q)
365 continue;
366
367 while (q->n) {
368 struct netmap_slot s = oq_deq(q);
369 uint32_t *b = (uint32_t *)NETMAP_BUF(cp->ring, s.buf_idx);
370
371 *b = rxport->nmd->nifp->ni_bufs_head;
372 rxport->nmd->nifp->ni_bufs_head = s.buf_idx;
373 tot++;
374 }
375 }
376 D("added %d buffers to netmap free list", tot);
377
378 for (i = 0; i < glob_arg.output_rings + 1; ++i) {
379 nmport_close(ports[i].nmd);
380 }
381 }
382
383
sigint_h(int sig)384 static void sigint_h(int sig)
385 {
386 (void)sig; /* UNUSED */
387 do_abort = 1;
388 signal(SIGINT, SIG_DFL);
389 }
390
usage()391 static void usage()
392 {
393 printf("usage: lb [options]\n");
394 printf("where options are:\n");
395 printf(" -h view help text\n");
396 printf(" -i iface interface name (required)\n");
397 printf(" -p [prefix:]npipes add a new group of output pipes\n");
398 printf(" -B nbufs number of extra buffers (default: %d)\n", DEF_EXTRA_BUFS);
399 printf(" -b batch batch size (default: %d)\n", DEF_BATCH);
400 printf(" -w seconds wait for link up (default: %d)\n", DEF_WAIT_LINK);
401 printf(" -W enable busy waiting. this will run your CPU at 100%%\n");
402 printf(" -s seconds seconds between syslog stats messages (default: 0)\n");
403 printf(" -o seconds seconds between stdout stats messages (default: 0)\n");
404 exit(0);
405 }
406
407 static int
parse_pipes(const char * spec)408 parse_pipes(const char *spec)
409 {
410 const char *end = index(spec, ':');
411 static int max_groups = 0;
412 struct group_des *g;
413
414 ND("spec %s num_groups %d", spec, glob_arg.num_groups);
415 if (max_groups < glob_arg.num_groups + 1) {
416 size_t size = sizeof(*g) * (glob_arg.num_groups + 1);
417 groups = realloc(groups, size);
418 if (groups == NULL) {
419 D("out of memory");
420 return 1;
421 }
422 }
423 g = &groups[glob_arg.num_groups];
424 memset(g, 0, sizeof(*g));
425
426 if (end != NULL) {
427 if (end - spec > MAX_IFNAMELEN - 8) {
428 D("name '%s' too long", spec);
429 return 1;
430 }
431 if (end == spec) {
432 D("missing prefix before ':' in '%s'", spec);
433 return 1;
434 }
435 strncpy(g->pipename, spec, end - spec);
436 g->custom_port = 1;
437 end++;
438 } else {
439 /* no prefix, this group will use the
440 * name of the input port.
441 * This will be set in init_groups(),
442 * since here the input port may still
443 * be uninitialized
444 */
445 end = spec;
446 }
447 if (*end == '\0') {
448 g->nports = DEF_OUT_PIPES;
449 } else {
450 g->nports = atoi(end);
451 if (g->nports < 1) {
452 D("invalid number of pipes '%s' (must be at least 1)", end);
453 return 1;
454 }
455 }
456 glob_arg.output_rings += g->nports;
457 glob_arg.num_groups++;
458 return 0;
459 }
460
461 /* complete the initialization of the groups data structure */
462 static void
init_groups(void)463 init_groups(void)
464 {
465 int i, j, t = 0;
466 struct group_des *g = NULL;
467 for (i = 0; i < glob_arg.num_groups; i++) {
468 g = &groups[i];
469 g->ports = &ports[t];
470 for (j = 0; j < g->nports; j++)
471 g->ports[j].group = g;
472 t += g->nports;
473 if (!g->custom_port)
474 strcpy(g->pipename, glob_arg.base_name);
475 for (j = 0; j < i; j++) {
476 struct group_des *h = &groups[j];
477 if (!strcmp(h->pipename, g->pipename))
478 g->first_id += h->nports;
479 }
480 }
481 g->last = 1;
482 }
483
484
/* To support packets that span multiple slots (NS_MOREFRAG) we
 * need to make sure of the following:
 *
 * - all fragments of the same packet must go to the same output pipe
 * - when dropping, all fragments of the same packet must be dropped
 *
 * For the former point we remember and reuse the last hash computed
 * in each input ring, and only update it when NS_MOREFRAG was not
 * set in the last received slot (this marks the start of a new packet).
 *
 * For the latter point, we only update the output ring head pointer
 * when an entire packet has been forwarded. We keep a shadow_head
 * pointer to know where to put the next partial fragment and,
 * when the need to drop arises, we roll it back to head.
 */
/* Per-ring fragment-tracking state, stored in the ring's user
 * semaphore area (ring->sem). Input rings use last_flag/last_hash;
 * output rings use shadow_head.
 */
struct morefrag {
	uint16_t last_flag; /* for input rings */
	uint32_t last_hash; /* for input rings */
	uint32_t shadow_head; /* for output rings */
};
505
506 /* push the packet described by slot rs to the group g.
507 * This may cause other buffers to be pushed down the
508 * chain headed by g.
509 * Return a free buffer.
510 */
511 static uint32_t
forward_packet(struct group_des * g,struct netmap_slot * rs)512 forward_packet(struct group_des *g, struct netmap_slot *rs)
513 {
514 uint32_t hash = rs->ptr;
515 uint32_t output_port = hash % g->nports;
516 struct port_des *port = &g->ports[output_port];
517 struct netmap_ring *ring = port->ring;
518 struct overflow_queue *q = port->oq;
519 struct morefrag *mf = (struct morefrag *)ring->sem;
520 uint16_t curmf = rs->flags & NS_MOREFRAG;
521
522 /* Move the packet to the output pipe, unless there is
523 * either no space left on the ring, or there is some
524 * packet still in the overflow queue (since those must
525 * take precedence over the new one)
526 */
527 if (mf->shadow_head != ring->tail && (q == NULL || oq_empty(q))) {
528 struct netmap_slot *ts = &ring->slot[mf->shadow_head];
529 struct netmap_slot old_slot = *ts;
530
531 ts->buf_idx = rs->buf_idx;
532 ts->len = rs->len;
533 ts->flags = rs->flags | NS_BUF_CHANGED;
534 ts->ptr = rs->ptr;
535 mf->shadow_head = nm_ring_next(ring, mf->shadow_head);
536 if (!curmf) {
537 ring->head = mf->shadow_head;
538 }
539 ND("curmf %2x ts->flags %2x shadow_head %3u head %3u tail %3u",
540 curmf, ts->flags, mf->shadow_head, ring->head, ring->tail);
541 port->ctr.bytes += rs->len;
542 port->ctr.pkts++;
543 forwarded++;
544 return old_slot.buf_idx;
545 }
546
547 /* use the overflow queue, if available */
548 if (q == NULL || oq_full(q)) {
549 uint32_t scan;
550 /* no space left on the ring and no overflow queue
551 * available: we are forced to drop the packet
552 */
553
554 /* drop previous fragments, if any */
555 for (scan = ring->head; scan != mf->shadow_head;
556 scan = nm_ring_next(ring, scan)) {
557 struct netmap_slot *ts = &ring->slot[scan];
558 dropped++;
559 port->ctr.drop_bytes += ts->len;
560 }
561 mf->shadow_head = ring->head;
562
563 dropped++;
564 port->ctr.drop++;
565 port->ctr.drop_bytes += rs->len;
566 return rs->buf_idx;
567 }
568
569 oq_enq(q, rs);
570
571 /*
572 * we cannot continue down the chain and we need to
573 * return a free buffer now. We take it from the free queue.
574 */
575 if (oq_empty(freeq)) {
576 /* the free queue is empty. Revoke some buffers
577 * from the longest overflow queue
578 */
579 uint32_t j;
580 struct port_des *lp = &ports[0];
581 uint32_t max = lp->oq->n;
582
583 /* let lp point to the port with the longest queue */
584 for (j = 1; j < glob_arg.output_rings; j++) {
585 struct port_des *cp = &ports[j];
586 if (cp->oq->n > max) {
587 lp = cp;
588 max = cp->oq->n;
589 }
590 }
591
592 /* move the oldest BUF_REVOKE buffers from the
593 * lp queue to the free queue
594 *
595 * We cannot revoke a partially received packet.
596 * To make thinks simple we make sure to leave
597 * at least NETMAP_MAX_FRAGS slots in the queue.
598 */
599 for (j = 0; lp->oq->n > NETMAP_MAX_FRAGS && j < BUF_REVOKE; j++) {
600 struct netmap_slot tmp = oq_deq(lp->oq);
601
602 dropped++;
603 lp->ctr.drop++;
604 lp->ctr.drop_bytes += tmp.len;
605
606 oq_enq(freeq, &tmp);
607 }
608
609 ND(1, "revoked %d buffers from %s", j, lq->name);
610 }
611
612 return oq_deq(freeq).buf_idx;
613 }
614
/* Entry point: parse options, open the input port and all output
 * pipes (in the same memory region for zero-copy), distribute extra
 * buffers to the overflow queues, then run the forwarding loop until
 * SIGINT.
 */
int main(int argc, char **argv)
{
	int ch;
	uint32_t i;
	int rv;
	unsigned int iter = 0;
	int poll_timeout = 10; /* default */

	glob_arg.ifname[0] = '\0';
	glob_arg.output_rings = 0;
	glob_arg.batch = DEF_BATCH;
	glob_arg.wait_link = DEF_WAIT_LINK;
	glob_arg.busy_wait = false;
	glob_arg.syslog_interval = 0;
	glob_arg.stdout_interval = 0;

	while ( (ch = getopt(argc, argv, "hi:p:b:B:s:o:w:W")) != -1) {
		switch (ch) {
		case 'i':
			D("interface is %s", optarg);
			if (strlen(optarg) > MAX_IFNAMELEN - 8) {
				D("ifname too long %s", optarg);
				return 1;
			}
			/* prepend "netmap:" unless a port-type prefix is present */
			if (strncmp(optarg, "netmap:", 7) && strncmp(optarg, "vale", 4)) {
				sprintf(glob_arg.ifname, "netmap:%s", optarg);
			} else {
				strcpy(glob_arg.ifname, optarg);
			}
			break;

		case 'p':
			if (parse_pipes(optarg)) {
				usage();
				return 1;
			}
			break;

		case 'B':
			glob_arg.extra_bufs = atoi(optarg);
			D("requested %d extra buffers", glob_arg.extra_bufs);
			break;

		case 'b':
			glob_arg.batch = atoi(optarg);
			D("batch is %d", glob_arg.batch);
			break;

		case 'w':
			glob_arg.wait_link = atoi(optarg);
			D("link wait for up time is %d", glob_arg.wait_link);
			break;

		case 'W':
			glob_arg.busy_wait = true;
			break;

		case 'o':
			glob_arg.stdout_interval = atoi(optarg);
			break;

		case 's':
			glob_arg.syslog_interval = atoi(optarg);
			break;

		case 'h':
			usage();
			return 0;
			break;

		default:
			D("bad option %c %s", ch, optarg);
			usage();
			return 1;
		}
	}

	if (glob_arg.ifname[0] == '\0') {
		D("missing interface name");
		usage();
		return 1;
	}

	/* no -p given: create one default group of DEF_OUT_PIPES pipes */
	if (glob_arg.num_groups == 0)
		parse_pipes("");

	if (glob_arg.syslog_interval) {
		setlogmask(LOG_UPTO(LOG_INFO));
		openlog("lb", LOG_CONS | LOG_PID | LOG_NDELAY, LOG_LOCAL1);
	}

	uint32_t npipes = glob_arg.output_rings;


	pthread_t stat_thread;

	/* one port_des per output pipe, plus one for the input port */
	ports = calloc(npipes + 1, sizeof(struct port_des));
	if (!ports) {
		D("failed to allocate the stats array");
		return 1;
	}
	struct port_des *rxport = &ports[npipes];

	rxport->nmd = nmport_prepare(glob_arg.ifname);
	if (rxport->nmd == NULL) {
		D("cannot parse %s", glob_arg.ifname);
		return (1);
	}
	/* extract the base name */
	strncpy(glob_arg.base_name, rxport->nmd->hdr.nr_name, MAX_IFNAMELEN);

	init_groups();

	memset(&counters_buf, 0, sizeof(counters_buf));
	counters_buf.ctrs = calloc(npipes, sizeof(struct my_ctrs));
	if (!counters_buf.ctrs) {
		D("failed to allocate the counters snapshot buffer");
		return 1;
	}

	rxport->nmd->reg.nr_extra_bufs = glob_arg.extra_bufs;

	if (nmport_open_desc(rxport->nmd) < 0) {
		D("cannot open %s", glob_arg.ifname);
		return (1);
	}
	D("successfully opened %s", glob_arg.ifname);

	/* the kernel may grant fewer extra buffers than requested */
	uint32_t extra_bufs = rxport->nmd->reg.nr_extra_bufs;
	struct overflow_queue *oq = NULL;
	/* reference ring to access the buffers */
	rxport->ring = NETMAP_RXRING(rxport->nmd->nifp, 0);

	if (!glob_arg.extra_bufs)
		goto run;

	D("obtained %d extra buffers", extra_bufs);
	if (!extra_bufs)
		goto run;

	/* one overflow queue for each output pipe, plus one for the
	 * free extra buffers
	 */
	oq = calloc(npipes + 1, sizeof(struct overflow_queue));
	if (!oq) {
		D("failed to allocated overflow queues descriptors");
		goto run;
	}

	freeq = &oq[npipes];
	rxport->oq = freeq;

	/* NOTE(review): a failure here is only logged; oq_enq below would
	 * then abort — confirm this is the intended OOM policy */
	freeq->slots = calloc(extra_bufs, sizeof(struct netmap_slot));
	if (!freeq->slots) {
		D("failed to allocate the free list");
	}
	freeq->size = extra_bufs;
	snprintf(freeq->name, MAX_IFNAMELEN, "free queue");

	/*
	 * the list of buffers uses the first uint32_t in each buffer
	 * as the index of the next buffer.
	 */
	uint32_t scan;
	for (scan = rxport->nmd->nifp->ni_bufs_head;
	     scan;
	     scan = *(uint32_t *)NETMAP_BUF(rxport->ring, scan))
	{
		struct netmap_slot s;
		s.len = s.flags = 0;
		s.ptr = 0;
		s.buf_idx = scan;
		ND("freeq <- %d", s.buf_idx);
		oq_enq(freeq, &s);
	}


	if (freeq->n != extra_bufs) {
		D("something went wrong: netmap reported %d extra_bufs, but the free list contained %d",
				extra_bufs, freeq->n);
		return 1;
	}
	/* all extra buffers are now owned by the free queue */
	rxport->nmd->nifp->ni_bufs_head = 0;

run:
	atexit(free_buffers);

	/* open all output pipes, one group at a time */
	int j, t = 0;
	for (j = 0; j < glob_arg.num_groups; j++) {
		struct group_des *g = &groups[j];
		int k;
		for (k = 0; k < g->nports; ++k) {
			struct port_des *p = &g->ports[k];
			/* master side of the pipe, same mem region as the rx port */
			snprintf(p->interface, MAX_PORTNAMELEN, "%s%s{%d/xT@%d",
					(strncmp(g->pipename, "vale", 4) ? "netmap:" : ""),
					g->pipename, g->first_id + k,
					rxport->nmd->reg.nr_mem_id);
			D("opening pipe named %s", p->interface);

			p->nmd = nmport_open(p->interface);

			if (p->nmd == NULL) {
				D("cannot open %s", p->interface);
				return (1);
			} else if (p->nmd->mem != rxport->nmd->mem) {
				D("failed to open pipe #%d in zero-copy mode, "
					"please close any application that uses either pipe %s}%d, "
					"or %s{%d, and retry",
					k + 1, g->pipename, g->first_id + k, g->pipename, g->first_id + k);
				return (1);
			} else {
				struct morefrag *mf;

				D("successfully opened pipe #%d %s (tx slots: %d)",
					k + 1, p->interface, p->nmd->reg.nr_tx_slots);
				p->ring = NETMAP_TXRING(p->nmd->nifp, 0);
				p->last_tail = nm_ring_next(p->ring, p->ring->tail);
				/* fragment-tracking state lives in the ring's sem area */
				mf = (struct morefrag *)p->ring->sem;
				mf->last_flag = 0;	/* unused */
				mf->last_hash = 0;	/* unused */
				mf->shadow_head = p->ring->head;
			}
			D("zerocopy %s",
				(rxport->nmd->mem == p->nmd->mem) ? "enabled" : "disabled");

			if (extra_bufs) {
				struct overflow_queue *q = &oq[t + k];
				q->slots = calloc(extra_bufs, sizeof(struct netmap_slot));
				if (!q->slots) {
					D("failed to allocate overflow queue for pipe %d", k);
					/* make all overflow queue management fail */
					extra_bufs = 0;
				}
				q->size = extra_bufs;
				snprintf(q->name, sizeof(q->name), "oq %s{%4d", g->pipename, k);
				p->oq = q;
			}
		}
		t += g->nports;
	}

	/* overflow allocation failed above: tear down all queues */
	if (glob_arg.extra_bufs && !extra_bufs) {
		if (oq) {
			for (i = 0; i < npipes + 1; i++) {
				free(oq[i].slots);
				oq[i].slots = NULL;
			}
			free(oq);
			oq = NULL;
		}
		D("*** overflow queues disabled ***");
	}

	sleep(glob_arg.wait_link);

	/* start stats thread after wait_link */
	/* NOTE(review): pthread_create returns an error number, not -1,
	 * so this check can never fire — verify and compare against 0 */
	if (pthread_create(&stat_thread, NULL, print_stats, NULL) == -1) {
		D("unable to create the stats thread: %s", strerror(errno));
		return 1;
	}

	struct pollfd pollfd[npipes + 1];
	memset(&pollfd, 0, sizeof(pollfd));
	signal(SIGINT, sigint_h);

	/* make sure we wake up as often as needed, even when there are no
	 * packets coming in
	 */
	if (glob_arg.syslog_interval > 0 && glob_arg.syslog_interval < poll_timeout)
		poll_timeout = glob_arg.syslog_interval;
	if (glob_arg.stdout_interval > 0 && glob_arg.stdout_interval < poll_timeout)
		poll_timeout = glob_arg.stdout_interval;

	/* initialize the morefrag structures for the input rings */
	for (i = rxport->nmd->first_rx_ring; i <= rxport->nmd->last_rx_ring; i++) {
		struct netmap_ring *rxring = NETMAP_RXRING(rxport->nmd->nifp, i);
		struct morefrag *mf = (struct morefrag *)rxring->sem;

		mf->last_flag = 0;
		mf->last_hash = 0;
		mf->shadow_head = 0; /* unused */
	}

	/* main forwarding loop */
	while (!do_abort) {
		u_int polli = 0;
		iter++;

		/* collect the output fds that need a POLLOUT wakeup */
		for (i = 0; i < npipes; ++i) {
			struct netmap_ring *ring = ports[i].ring;
			int pending = nm_tx_pending(ring);

			/* if there are packets pending, we want to be notified when
			 * tail moves, so we let cur=tail
			 */
			ring->cur = pending ? ring->tail : ring->head;

			if (!glob_arg.busy_wait && !pending) {
				/* no need to poll, there are no packets pending */
				continue;
			}
			pollfd[polli].fd = ports[i].nmd->fd;
			pollfd[polli].events = POLLOUT;
			pollfd[polli].revents = 0;
			++polli;
		}

		/* always poll the input port for new packets */
		pollfd[polli].fd = rxport->nmd->fd;
		pollfd[polli].events = POLLIN;
		pollfd[polli].revents = 0;
		++polli;

		ND(5, "polling %d file descriptors", polli);
		rv = poll(pollfd, polli, poll_timeout);
		if (rv <= 0) {
			if (rv < 0 && errno != EAGAIN && errno != EINTR)
				RD(1, "poll error %s", strerror(errno));
			goto send_stats;
		}

		/* if there are several groups, try pushing released packets from
		 * upstream groups to the downstream ones.
		 *
		 * It is important to do this before returned slots are reused
		 * for new transmissions. For the same reason, this must be
		 * done starting from the last group going backwards.
		 */
		for (i = glob_arg.num_groups - 1U; i > 0; i--) {
			struct group_des *g = &groups[i - 1];

			for (j = 0; j < g->nports; j++) {
				struct port_des *p = &g->ports[j];
				struct netmap_ring *ring = p->ring;
				uint32_t last = p->last_tail,
					 stop = nm_ring_next(ring, ring->tail);

				/* slight abuse of the API here: we touch the slot
				 * pointed to by tail
				 */
				for ( ; last != stop; last = nm_ring_next(ring, last)) {
					struct netmap_slot *rs = &ring->slot[last];
					// XXX less aggressive?
					rs->buf_idx = forward_packet(g + 1, rs);
					rs->flags = NS_BUF_CHANGED;
					rs->ptr = 0;
				}
				p->last_tail = last;
			}
		}



		if (oq) {
			/* try to push packets from the overflow queues
			 * to the corresponding pipes
			 */
			for (i = 0; i < npipes; i++) {
				struct port_des *p = &ports[i];
				struct overflow_queue *q = p->oq;
				uint32_t k;
				int64_t lim;
				struct netmap_ring *ring;
				struct netmap_slot *slot;
				struct morefrag *mf;

				if (oq_empty(q))
					continue;
				ring = p->ring;
				mf = (struct morefrag *)ring->sem;
				/* lim = number of free slots between shadow_head and tail */
				lim = ring->tail - mf->shadow_head;
				if (!lim)
					continue;
				if (lim < 0)
					lim += ring->num_slots;
				if (q->n < lim)
					lim = q->n;
				for (k = 0; k < lim; k++) {
					/* swap the queued slot with the ring slot; the
					 * displaced buffer goes back to the free queue */
					struct netmap_slot s = oq_deq(q), tmp;
					tmp.ptr = 0;
					slot = &ring->slot[mf->shadow_head];
					tmp.buf_idx = slot->buf_idx;
					oq_enq(freeq, &tmp);
					*slot = s;
					slot->flags |= NS_BUF_CHANGED;
					mf->shadow_head = nm_ring_next(ring, mf->shadow_head);
					if (!(slot->flags & NS_MOREFRAG))
						ring->head = mf->shadow_head;
				}
			}
		}

		/* push any new packets from the input port to the first group */
		int batch = 0;
		for (i = rxport->nmd->first_rx_ring; i <= rxport->nmd->last_rx_ring; i++) {
			struct netmap_ring *rxring = NETMAP_RXRING(rxport->nmd->nifp, i);
			struct morefrag *mf = (struct morefrag *)rxring->sem;

			//D("prepare to scan rings");
			int next_head = rxring->head;
			struct netmap_slot *next_slot = &rxring->slot[next_head];
			const char *next_buf = NETMAP_BUF(rxring, next_slot->buf_idx);
			while (!nm_ring_empty(rxring)) {
				struct netmap_slot *rs = next_slot;
				struct group_des *g = &groups[0];
				++received_pkts;
				received_bytes += rs->len;

				// CHOOSE THE CORRECT OUTPUT PIPE
				// If the previous slot had NS_MOREFRAG set, this is another
				// fragment of the last packet and it should go to the same
				// output pipe as before.
				if (!mf->last_flag) {
					// 'B' is just a hashing seed
					mf->last_hash = pkt_hdr_hash((const unsigned char *)next_buf, 4, 'B');
				}
				mf->last_flag = rs->flags & NS_MOREFRAG;
				rs->ptr = mf->last_hash;
				if (rs->ptr == 0) {
					non_ip++; // XXX ??
				}
				// prefetch the buffer for the next round
				next_head = nm_ring_next(rxring, next_head);
				next_slot = &rxring->slot[next_head];
				next_buf = NETMAP_BUF(rxring, next_slot->buf_idx);
				__builtin_prefetch(next_buf);
				rs->buf_idx = forward_packet(g, rs);
				rs->flags = NS_BUF_CHANGED;
				rxring->head = rxring->cur = next_head;

				/* periodically force an rx sync to release slots */
				batch++;
				if (unlikely(batch >= glob_arg.batch)) {
					ioctl(rxport->nmd->fd, NIOCRXSYNC, NULL);
					batch = 0;
				}
				ND(1,
				   "Forwarded Packets: %"PRIu64" Dropped packets: %"PRIu64"   Percent: %.2f",
				   forwarded, dropped,
				   ((float)dropped / (float)forwarded * 100));
			}

		}

	send_stats:
		/* publish a counters snapshot for the stats thread (see
		 * struct counters for the handshake protocol) */
		if (counters_buf.status == COUNTERS_FULL)
			continue;
		/* take a new snapshot of the counters */
		gettimeofday(&counters_buf.ts, NULL);
		for (i = 0; i < npipes; i++) {
			struct my_ctrs *c = &counters_buf.ctrs[i];
			*c = ports[i].ctr;
			/*
			 * If there are overflow queues, copy the number of them for each
			 * port to the ctrs.oq_n variable for each port.
			 */
			if (ports[i].oq != NULL)
				c->oq_n = ports[i].oq->n;
		}
		counters_buf.received_pkts = received_pkts;
		counters_buf.received_bytes = received_bytes;
		counters_buf.non_ip = non_ip;
		if (freeq != NULL)
			counters_buf.freeq_n = freeq->n;
		__sync_synchronize();
		counters_buf.status = COUNTERS_FULL;
	}

	/*
	 * If freeq exists, copy the number to the freeq_n member of the
	 * message struct, otherwise set it to 0.
	 */
	if (freeq != NULL) {
		freeq_n = freeq->n;
	} else {
		freeq_n = 0;
	}

	pthread_join(stat_thread, NULL);

	printf("%"PRIu64" packets forwarded.  %"PRIu64" packets dropped. Total %"PRIu64"\n", forwarded,
	       dropped, forwarded + dropped);
	return 0;
}
1096