xref: /freebsd-13.1/tools/tools/netmap/lb.c (revision 2c19e8ed)
1 /*
2  * Copyright (C) 2017 Corelight, Inc. and Universita` di Pisa. All rights reserved.
3  *
4  * Redistribution and use in source and binary forms, with or without
5  * modification, are permitted provided that the following conditions
6  * are met:
7  *   1. Redistributions of source code must retain the above copyright
8  *      notice, this list of conditions and the following disclaimer.
9  *   2. Redistributions in binary form must reproduce the above copyright
10  *      notice, this list of conditions and the following disclaimer in the
11  *    documentation and/or other materials provided with the distribution.
12  *
13  * THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
14  * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
15  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
16  * ARE DISCLAIMED.  IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
17  * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
18  * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
19  * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
20  * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
21  * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
22  * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
23  * SUCH DAMAGE.
24  */
25 /* $FreeBSD$ */
26 #include <stdio.h>
27 #include <string.h>
28 #include <ctype.h>
29 #include <stdbool.h>
30 #include <inttypes.h>
31 #include <syslog.h>
32 
33 #define NETMAP_WITH_LIBS
34 #include <net/netmap_user.h>
35 #include <sys/poll.h>
36 
37 #include <netinet/in.h>		/* htonl */
38 
39 #include <pthread.h>
40 
41 #include "pkt_hash.h"
42 #include "ctrs.h"
43 
44 
45 /*
46  * use our version of header structs, rather than bringing in a ton
47  * of platform specific ones
48  */
/* Length in bytes of a link-layer address; only defined if no header did it already. */
#ifndef ETH_ALEN
#define ETH_ALEN 6
#endif

/*
 * Local, minimal layout of an Ethernet header (two ETH_ALEN-byte addresses
 * followed by a 16-bit protocol field, 14 bytes in total).
 * NOTE(review): this type is not referenced anywhere else in this file;
 * presumably it documents the layout that pkt_hash.h relies on -- confirm.
 */
struct compact_eth_hdr {
	unsigned char h_dest[ETH_ALEN];		/* destination address */
	unsigned char h_source[ETH_ALEN];	/* source address */
	u_int16_t h_proto;			/* protocol field, stored as found in the frame */
};
58 
/*
 * Local, minimal layout of a 20-byte IPv4 header without options.
 * The first byte is split into two 4-bit fields; the order in which the
 * compiler packs them is implementation-defined.
 * NOTE(review): not referenced anywhere else in this file.
 */
struct compact_ip_hdr {
	u_int8_t ihl:4, version:4;	/* header length and version nibbles */
	u_int8_t tos;
	u_int16_t tot_len;
	u_int16_t id;
	u_int16_t frag_off;
	u_int8_t ttl;
	u_int8_t protocol;
	u_int16_t check;
	u_int32_t saddr;		/* source address */
	u_int32_t daddr;		/* destination address */
};
71 
/*
 * Local, minimal layout of the fixed 40-byte IPv6 header.
 * struct in6_addr comes from <netinet/in.h>.
 * NOTE(review): not referenced anywhere else in this file.
 */
struct compact_ipv6_hdr {
	u_int8_t priority:4, version:4;	/* two 4-bit fields sharing the first byte */
	u_int8_t flow_lbl[3];
	u_int16_t payload_len;
	u_int8_t nexthdr;
	u_int8_t hop_limit;
	struct in6_addr saddr;		/* source address */
	struct in6_addr daddr;		/* destination address */
};
81 
/* Sizes, defaults and tunables used throughout this file. */
#define MAX_IFNAMELEN 	64	/* size of the ifname, base_name and pipename buffers */
#define MAX_PORTNAMELEN	(MAX_IFNAMELEN + 40)	/* size of port_des.interface */
#define DEF_OUT_PIPES 	2	/* pipes in a group when -p gives no count */
#define DEF_EXTRA_BUFS 	0	/* default for -B as printed by usage() */
#define DEF_BATCH	2048	/* default for -b: packets between explicit NIOCRXSYNC calls */
#define DEF_WAIT_LINK	2	/* default for -w: seconds slept before forwarding starts */
#define DEF_STATS_INT	600	/* NOTE(review): not used in the visible code */
#define BUF_REVOKE	100	/* max buffers reclaimed from the longest overflow queue at once */
#define STAT_MSG_MAXSIZE 1024	/* size of the buffer holding one formatted stats message */
91 
/*
 * Program-wide configuration, filled in by main() from the command line
 * (and by parse_pipes() for the group/pipe totals).
 */
static struct {
	char ifname[MAX_IFNAMELEN];	/* input port name as passed to nm_open() (-i) */
	char base_name[MAX_IFNAMELEN];	/* ifname without "netmap:" prefix and any suffix starting at one of "-*^{}/@" */
	int netmap_fd;			/* NOTE(review): never read or written in the visible code */
	uint16_t output_rings;		/* total number of output pipes over all groups */
	uint16_t num_groups;		/* number of pipe groups parsed so far */
	uint32_t extra_bufs;		/* extra buffers requested with -B */
	uint16_t batch;			/* -b: packets forwarded between explicit RX syncs */
	int stdout_interval;		/* -o: seconds between stdout stats messages, 0 = off */
	int syslog_interval;		/* -s: seconds between syslog stats messages, 0 = off */
	int wait_link;			/* -w: seconds slept before the main loop starts */
	bool busy_wait;			/* -W: poll the output pipes even when nothing is pending */
} glob_arg;
105 
/*
 * The overflow queue is a circular queue of netmap slots (buffer
 * descriptors). Entries are written at 'tail' and read at 'head';
 * both indices wrap to 0 when they reach 'size'.
 */
struct overflow_queue {
	char name[MAX_IFNAMELEN + 16];	/* label used in diagnostic messages */
	struct netmap_slot *slots;	/* storage for 'size' slots */
	uint32_t head;			/* index of the oldest queued slot */
	uint32_t tail;			/* index where the next slot is stored */
	uint32_t n;			/* number of slots currently queued */
	uint32_t size;			/* capacity of 'slots' */
};

/*
 * Queue of spare (extra) buffers; stays NULL unless main() sets up the
 * overflow queues.
 */
static struct overflow_queue *freeq;
119 
120 static inline int
121 oq_full(struct overflow_queue *q)
122 {
123 	return q->n >= q->size;
124 }
125 
126 static inline int
127 oq_empty(struct overflow_queue *q)
128 {
129 	return q->n <= 0;
130 }
131 
132 static inline void
133 oq_enq(struct overflow_queue *q, const struct netmap_slot *s)
134 {
135 	if (unlikely(oq_full(q))) {
136 		D("%s: queue full!", q->name);
137 		abort();
138 	}
139 	q->slots[q->tail] = *s;
140 	q->n++;
141 	q->tail++;
142 	if (q->tail >= q->size)
143 		q->tail = 0;
144 }
145 
146 static inline struct netmap_slot
147 oq_deq(struct overflow_queue *q)
148 {
149 	struct netmap_slot s = q->slots[q->head];
150 	if (unlikely(oq_empty(q))) {
151 		D("%s: queue empty!", q->name);
152 		abort();
153 	}
154 	q->n--;
155 	q->head++;
156 	if (q->head >= q->size)
157 		q->head = 0;
158 	return s;
159 }
160 
/* Set by the SIGINT handler; polled by the main loop and the stats thread. */
static volatile int do_abort;

/*
 * Global traffic counters, updated by the forwarding code in the main
 * thread. Objects with static storage duration start at zero.
 */
static uint64_t dropped;
static uint64_t forwarded;
static uint64_t received_bytes;
static uint64_t received_pkts;
static uint64_t non_ip;
static uint32_t freeq_n;
169 
/*
 * Descriptor of one netmap port: either an output pipe or the input
 * port (main() stores the latter in the last element of 'ports').
 */
struct port_des {
	char interface[MAX_PORTNAMELEN];	/* name passed to nm_open() for a pipe */
	struct my_ctrs ctr;		/* per-port forwarded/dropped counters */
	unsigned int last_sync;		/* NOTE(review): not used in the visible code */
	uint32_t last_tail;		/* next released slot of the pipe still to be pushed downstream */
	struct overflow_queue *oq;	/* overflow queue of this port, NULL when disabled */
	struct nm_desc *nmd;		/* descriptor returned by nm_open() */
	struct netmap_ring *ring;	/* TX ring 0 of a pipe, RX ring 0 of the input port */
	struct group_des *group;	/* group owning this pipe */
};

/* Array of output_rings + 1 descriptors: all the pipes, then the input port. */
static struct port_des *ports;
182 
/* each group of pipes receives all the packets */
struct group_des {
	char pipename[MAX_IFNAMELEN];	/* pipe name prefix: custom (-p prefix:n) or the input base name */
	struct port_des *ports;		/* first pipe of this group inside the global 'ports' array */
	int first_id;			/* pipe id of the first pipe: pipes of earlier groups with the same prefix come first */
	int nports;			/* number of pipes in the group */
	int last;			/* set to 1 on the final group by init_groups() */
	int custom_port;		/* nonzero when the prefix was given explicitly */
};

/* Array of glob_arg.num_groups groups, grown by parse_pipes(). */
static struct group_des *groups;
194 
/*
 * Statistics snapshot handed from the main loop to the stats thread.
 * The 'status' field is the handshake: print_stats() marks the buffer
 * COUNTERS_EMPTY, the main loop fills it in and marks it COUNTERS_FULL.
 */
struct counters {
	struct timeval ts;		/* time at which the snapshot was taken */
	struct my_ctrs *ctrs;		/* one entry per output pipe */
	uint64_t received_pkts;		/* copy of the global received_pkts */
	uint64_t received_bytes;	/* copy of the global received_bytes */
	uint64_t non_ip;		/* copy of the global non_ip */
	uint32_t freeq_n;		/* number of buffers in the free queue */
	/* presumably aligned to keep the flag on its own cache line -- confirm */
	int status __attribute__((aligned(64)));
#define COUNTERS_EMPTY	0
#define COUNTERS_FULL	1
};

/* The single snapshot buffer shared by main() and print_stats(). */
static struct counters counters_buf;
209 
210 static void *
211 print_stats(void *arg)
212 {
213 	int npipes = glob_arg.output_rings;
214 	int sys_int = 0;
215 	(void)arg;
216 	struct my_ctrs cur, prev;
217 	struct my_ctrs *pipe_prev;
218 
219 	pipe_prev = calloc(npipes, sizeof(struct my_ctrs));
220 	if (pipe_prev == NULL) {
221 		D("out of memory");
222 		exit(1);
223 	}
224 
225 	char stat_msg[STAT_MSG_MAXSIZE] = "";
226 
227 	memset(&prev, 0, sizeof(prev));
228 	while (!do_abort) {
229 		int j, dosyslog = 0, dostdout = 0, newdata;
230 		uint64_t pps = 0, dps = 0, bps = 0, dbps = 0, usec = 0;
231 		struct my_ctrs x;
232 
233 		counters_buf.status = COUNTERS_EMPTY;
234 		newdata = 0;
235 		memset(&cur, 0, sizeof(cur));
236 		sleep(1);
237 		if (counters_buf.status == COUNTERS_FULL) {
238 			__sync_synchronize();
239 			newdata = 1;
240 			cur.t = counters_buf.ts;
241 			if (prev.t.tv_sec || prev.t.tv_usec) {
242 				usec = (cur.t.tv_sec - prev.t.tv_sec) * 1000000 +
243 					cur.t.tv_usec - prev.t.tv_usec;
244 			}
245 		}
246 
247 		++sys_int;
248 		if (glob_arg.stdout_interval && sys_int % glob_arg.stdout_interval == 0)
249 				dostdout = 1;
250 		if (glob_arg.syslog_interval && sys_int % glob_arg.syslog_interval == 0)
251 				dosyslog = 1;
252 
253 		for (j = 0; j < npipes; ++j) {
254 			struct my_ctrs *c = &counters_buf.ctrs[j];
255 			cur.pkts += c->pkts;
256 			cur.drop += c->drop;
257 			cur.drop_bytes += c->drop_bytes;
258 			cur.bytes += c->bytes;
259 
260 			if (usec) {
261 				x.pkts = c->pkts - pipe_prev[j].pkts;
262 				x.drop = c->drop - pipe_prev[j].drop;
263 				x.bytes = c->bytes - pipe_prev[j].bytes;
264 				x.drop_bytes = c->drop_bytes - pipe_prev[j].drop_bytes;
265 				pps = (x.pkts*1000000 + usec/2) / usec;
266 				dps = (x.drop*1000000 + usec/2) / usec;
267 				bps = ((x.bytes*1000000 + usec/2) / usec) * 8;
268 				dbps = ((x.drop_bytes*1000000 + usec/2) / usec) * 8;
269 			}
270 			pipe_prev[j] = *c;
271 
272 			if ( (dosyslog || dostdout) && newdata )
273 				snprintf(stat_msg, STAT_MSG_MAXSIZE,
274 				       "{"
275 				       "\"ts\":%.6f,"
276 				       "\"interface\":\"%s\","
277 				       "\"output_ring\":%" PRIu16 ","
278 				       "\"packets_forwarded\":%" PRIu64 ","
279 				       "\"packets_dropped\":%" PRIu64 ","
280 				       "\"data_forward_rate_Mbps\":%.4f,"
281 				       "\"data_drop_rate_Mbps\":%.4f,"
282 				       "\"packet_forward_rate_kpps\":%.4f,"
283 				       "\"packet_drop_rate_kpps\":%.4f,"
284 				       "\"overflow_queue_size\":%" PRIu32
285 				       "}", cur.t.tv_sec + (cur.t.tv_usec / 1000000.0),
286 				            ports[j].interface,
287 				            j,
288 				            c->pkts,
289 				            c->drop,
290 				            (double)bps / 1024 / 1024,
291 				            (double)dbps / 1024 / 1024,
292 				            (double)pps / 1000,
293 				            (double)dps / 1000,
294 				            c->oq_n);
295 
296 			if (dosyslog && stat_msg[0])
297 				syslog(LOG_INFO, "%s", stat_msg);
298 			if (dostdout && stat_msg[0])
299 				printf("%s\n", stat_msg);
300 		}
301 		if (usec) {
302 			x.pkts = cur.pkts - prev.pkts;
303 			x.drop = cur.drop - prev.drop;
304 			x.bytes = cur.bytes - prev.bytes;
305 			x.drop_bytes = cur.drop_bytes - prev.drop_bytes;
306 			pps = (x.pkts*1000000 + usec/2) / usec;
307 			dps = (x.drop*1000000 + usec/2) / usec;
308 			bps = ((x.bytes*1000000 + usec/2) / usec) * 8;
309 			dbps = ((x.drop_bytes*1000000 + usec/2) / usec) * 8;
310 		}
311 
312 		if ( (dosyslog || dostdout) && newdata )
313 			snprintf(stat_msg, STAT_MSG_MAXSIZE,
314 			         "{"
315 			         "\"ts\":%.6f,"
316 			         "\"interface\":\"%s\","
317 			         "\"output_ring\":null,"
318 			         "\"packets_received\":%" PRIu64 ","
319 			         "\"packets_forwarded\":%" PRIu64 ","
320 			         "\"packets_dropped\":%" PRIu64 ","
321 			         "\"non_ip_packets\":%" PRIu64 ","
322 			         "\"data_forward_rate_Mbps\":%.4f,"
323 			         "\"data_drop_rate_Mbps\":%.4f,"
324 			         "\"packet_forward_rate_kpps\":%.4f,"
325 			         "\"packet_drop_rate_kpps\":%.4f,"
326 			         "\"free_buffer_slots\":%" PRIu32
327 			         "}", cur.t.tv_sec + (cur.t.tv_usec / 1000000.0),
328 			              glob_arg.ifname,
329 			              received_pkts,
330 			              cur.pkts,
331 			              cur.drop,
332 			              counters_buf.non_ip,
333 			              (double)bps / 1024 / 1024,
334 			              (double)dbps / 1024 / 1024,
335 			              (double)pps / 1000,
336 			              (double)dps / 1000,
337 			              counters_buf.freeq_n);
338 
339 		if (dosyslog && stat_msg[0])
340 			syslog(LOG_INFO, "%s", stat_msg);
341 		if (dostdout && stat_msg[0])
342 			printf("%s\n", stat_msg);
343 
344 		prev = cur;
345 	}
346 
347 	free(pipe_prev);
348 
349 	return NULL;
350 }
351 
352 static void
353 free_buffers(void)
354 {
355 	int i, tot = 0;
356 	struct port_des *rxport = &ports[glob_arg.output_rings];
357 
358 	/* build a netmap free list with the buffers in all the overflow queues */
359 	for (i = 0; i < glob_arg.output_rings + 1; i++) {
360 		struct port_des *cp = &ports[i];
361 		struct overflow_queue *q = cp->oq;
362 
363 		if (!q)
364 			continue;
365 
366 		while (q->n) {
367 			struct netmap_slot s = oq_deq(q);
368 			uint32_t *b = (uint32_t *)NETMAP_BUF(cp->ring, s.buf_idx);
369 
370 			*b = rxport->nmd->nifp->ni_bufs_head;
371 			rxport->nmd->nifp->ni_bufs_head = s.buf_idx;
372 			tot++;
373 		}
374 	}
375 	D("added %d buffers to netmap free list", tot);
376 
377 	for (i = 0; i < glob_arg.output_rings + 1; ++i) {
378 		nm_close(ports[i].nmd);
379 	}
380 }
381 
382 
383 static void sigint_h(int sig)
384 {
385 	(void)sig;		/* UNUSED */
386 	do_abort = 1;
387 	signal(SIGINT, SIG_DFL);
388 }
389 
390 static void usage()
391 {
392 	printf("usage: lb [options]\n");
393 	printf("where options are:\n");
394 	printf("  -h              	view help text\n");
395 	printf("  -i iface        	interface name (required)\n");
396 	printf("  -p [prefix:]npipes	add a new group of output pipes\n");
397 	printf("  -B nbufs        	number of extra buffers (default: %d)\n", DEF_EXTRA_BUFS);
398 	printf("  -b batch        	batch size (default: %d)\n", DEF_BATCH);
399 	printf("  -w seconds        	wait for link up (default: %d)\n", DEF_WAIT_LINK);
400 	printf("  -W                    enable busy waiting. this will run your CPU at 100%%\n");
401 	printf("  -s seconds      	seconds between syslog stats messages (default: 0)\n");
402 	printf("  -o seconds      	seconds between stdout stats messages (default: 0)\n");
403 	exit(0);
404 }
405 
406 static int
407 parse_pipes(const char *spec)
408 {
409 	const char *end = index(spec, ':');
410 	static int max_groups = 0;
411 	struct group_des *g;
412 
413 	ND("spec %s num_groups %d", spec, glob_arg.num_groups);
414 	if (max_groups < glob_arg.num_groups + 1) {
415 		size_t size = sizeof(*g) * (glob_arg.num_groups + 1);
416 		groups = realloc(groups, size);
417 		if (groups == NULL) {
418 			D("out of memory");
419 			return 1;
420 		}
421 	}
422 	g = &groups[glob_arg.num_groups];
423 	memset(g, 0, sizeof(*g));
424 
425 	if (end != NULL) {
426 		if (end - spec > MAX_IFNAMELEN - 8) {
427 			D("name '%s' too long", spec);
428 			return 1;
429 		}
430 		if (end == spec) {
431 			D("missing prefix before ':' in '%s'", spec);
432 			return 1;
433 		}
434 		strncpy(g->pipename, spec, end - spec);
435 		g->custom_port = 1;
436 		end++;
437 	} else {
438 		/* no prefix, this group will use the
439 		 * name of the input port.
440 		 * This will be set in init_groups(),
441 		 * since here the input port may still
442 		 * be uninitialized
443 		 */
444 		end = spec;
445 	}
446 	if (*end == '\0') {
447 		g->nports = DEF_OUT_PIPES;
448 	} else {
449 		g->nports = atoi(end);
450 		if (g->nports < 1) {
451 			D("invalid number of pipes '%s' (must be at least 1)", end);
452 			return 1;
453 		}
454 	}
455 	glob_arg.output_rings += g->nports;
456 	glob_arg.num_groups++;
457 	return 0;
458 }
459 
460 /* complete the initialization of the groups data structure */
461 static void
462 init_groups(void)
463 {
464 	int i, j, t = 0;
465 	struct group_des *g = NULL;
466 	for (i = 0; i < glob_arg.num_groups; i++) {
467 		g = &groups[i];
468 		g->ports = &ports[t];
469 		for (j = 0; j < g->nports; j++)
470 			g->ports[j].group = g;
471 		t += g->nports;
472 		if (!g->custom_port)
473 			strcpy(g->pipename, glob_arg.base_name);
474 		for (j = 0; j < i; j++) {
475 			struct group_des *h = &groups[j];
476 			if (!strcmp(h->pipename, g->pipename))
477 				g->first_id += h->nports;
478 		}
479 	}
480 	g->last = 1;
481 }
482 
483 /* push the packet described by slot rs to the group g.
484  * This may cause other buffers to be pushed down the
485  * chain headed by g.
486  * Return a free buffer.
487  */
488 static uint32_t
489 forward_packet(struct group_des *g, struct netmap_slot *rs)
490 {
491 	uint32_t hash = rs->ptr;
492 	uint32_t output_port = hash % g->nports;
493 	struct port_des *port = &g->ports[output_port];
494 	struct netmap_ring *ring = port->ring;
495 	struct overflow_queue *q = port->oq;
496 
497 	/* Move the packet to the output pipe, unless there is
498 	 * either no space left on the ring, or there is some
499 	 * packet still in the overflow queue (since those must
500 	 * take precedence over the new one)
501 	*/
502 	if (ring->head != ring->tail && (q == NULL || oq_empty(q))) {
503 		struct netmap_slot *ts = &ring->slot[ring->head];
504 		struct netmap_slot old_slot = *ts;
505 
506 		ts->buf_idx = rs->buf_idx;
507 		ts->len = rs->len;
508 		ts->flags |= NS_BUF_CHANGED;
509 		ts->ptr = rs->ptr;
510 		ring->head = nm_ring_next(ring, ring->head);
511 		port->ctr.bytes += rs->len;
512 		port->ctr.pkts++;
513 		forwarded++;
514 		return old_slot.buf_idx;
515 	}
516 
517 	/* use the overflow queue, if available */
518 	if (q == NULL || oq_full(q)) {
519 		/* no space left on the ring and no overflow queue
520 		 * available: we are forced to drop the packet
521 		 */
522 		dropped++;
523 		port->ctr.drop++;
524 		port->ctr.drop_bytes += rs->len;
525 		return rs->buf_idx;
526 	}
527 
528 	oq_enq(q, rs);
529 
530 	/*
531 	 * we cannot continue down the chain and we need to
532 	 * return a free buffer now. We take it from the free queue.
533 	 */
534 	if (oq_empty(freeq)) {
535 		/* the free queue is empty. Revoke some buffers
536 		 * from the longest overflow queue
537 		 */
538 		uint32_t j;
539 		struct port_des *lp = &ports[0];
540 		uint32_t max = lp->oq->n;
541 
542 		/* let lp point to the port with the longest queue */
543 		for (j = 1; j < glob_arg.output_rings; j++) {
544 			struct port_des *cp = &ports[j];
545 			if (cp->oq->n > max) {
546 				lp = cp;
547 				max = cp->oq->n;
548 			}
549 		}
550 
551 		/* move the oldest BUF_REVOKE buffers from the
552 		 * lp queue to the free queue
553 		 */
554 		// XXX optimize this cycle
555 		for (j = 0; lp->oq->n && j < BUF_REVOKE; j++) {
556 			struct netmap_slot tmp = oq_deq(lp->oq);
557 
558 			dropped++;
559 			lp->ctr.drop++;
560 			lp->ctr.drop_bytes += tmp.len;
561 
562 			oq_enq(freeq, &tmp);
563 		}
564 
565 		ND(1, "revoked %d buffers from %s", j, lq->name);
566 	}
567 
568 	return oq_deq(freeq).buf_idx;
569 }
570 
571 int main(int argc, char **argv)
572 {
573 	int ch;
574 	uint32_t i;
575 	int rv;
576 	unsigned int iter = 0;
577 	int poll_timeout = 10; /* default */
578 
579 	glob_arg.ifname[0] = '\0';
580 	glob_arg.output_rings = 0;
581 	glob_arg.batch = DEF_BATCH;
582 	glob_arg.wait_link = DEF_WAIT_LINK;
583 	glob_arg.busy_wait = false;
584 	glob_arg.syslog_interval = 0;
585 	glob_arg.stdout_interval = 0;
586 
587 	while ( (ch = getopt(argc, argv, "hi:p:b:B:s:o:w:W")) != -1) {
588 		switch (ch) {
589 		case 'i':
590 			D("interface is %s", optarg);
591 			if (strlen(optarg) > MAX_IFNAMELEN - 8) {
592 				D("ifname too long %s", optarg);
593 				return 1;
594 			}
595 			if (strncmp(optarg, "netmap:", 7) && strncmp(optarg, "vale", 4)) {
596 				sprintf(glob_arg.ifname, "netmap:%s", optarg);
597 			} else {
598 				strcpy(glob_arg.ifname, optarg);
599 			}
600 			break;
601 
602 		case 'p':
603 			if (parse_pipes(optarg)) {
604 				usage();
605 				return 1;
606 			}
607 			break;
608 
609 		case 'B':
610 			glob_arg.extra_bufs = atoi(optarg);
611 			D("requested %d extra buffers", glob_arg.extra_bufs);
612 			break;
613 
614 		case 'b':
615 			glob_arg.batch = atoi(optarg);
616 			D("batch is %d", glob_arg.batch);
617 			break;
618 
619 		case 'w':
620 			glob_arg.wait_link = atoi(optarg);
621 			D("link wait for up time is %d", glob_arg.wait_link);
622 			break;
623 
624 		case 'W':
625 			glob_arg.busy_wait = true;
626 			break;
627 
628 		case 'o':
629 			glob_arg.stdout_interval = atoi(optarg);
630 			break;
631 
632 		case 's':
633 			glob_arg.syslog_interval = atoi(optarg);
634 			break;
635 
636 		case 'h':
637 			usage();
638 			return 0;
639 			break;
640 
641 		default:
642 			D("bad option %c %s", ch, optarg);
643 			usage();
644 			return 1;
645 		}
646 	}
647 
648 	if (glob_arg.ifname[0] == '\0') {
649 		D("missing interface name");
650 		usage();
651 		return 1;
652 	}
653 
654 	/* extract the base name */
655 	char *nscan = strncmp(glob_arg.ifname, "netmap:", 7) ?
656 			glob_arg.ifname : glob_arg.ifname + 7;
657 	strncpy(glob_arg.base_name, nscan, MAX_IFNAMELEN - 1);
658 	for (nscan = glob_arg.base_name; *nscan && !index("-*^{}/@", *nscan); nscan++)
659 		;
660 	*nscan = '\0';
661 
662 	if (glob_arg.num_groups == 0)
663 		parse_pipes("");
664 
665 	if (glob_arg.syslog_interval) {
666 		setlogmask(LOG_UPTO(LOG_INFO));
667 		openlog("lb", LOG_CONS | LOG_PID | LOG_NDELAY, LOG_LOCAL1);
668 	}
669 
670 	uint32_t npipes = glob_arg.output_rings;
671 
672 
673 	pthread_t stat_thread;
674 
675 	ports = calloc(npipes + 1, sizeof(struct port_des));
676 	if (!ports) {
677 		D("failed to allocate the stats array");
678 		return 1;
679 	}
680 	struct port_des *rxport = &ports[npipes];
681 	init_groups();
682 
683 	memset(&counters_buf, 0, sizeof(counters_buf));
684 	counters_buf.ctrs = calloc(npipes, sizeof(struct my_ctrs));
685 	if (!counters_buf.ctrs) {
686 		D("failed to allocate the counters snapshot buffer");
687 		return 1;
688 	}
689 
690 	/* we need base_req to specify pipes and extra bufs */
691 	struct nmreq base_req;
692 	memset(&base_req, 0, sizeof(base_req));
693 
694 	base_req.nr_arg1 = npipes;
695 	base_req.nr_arg3 = glob_arg.extra_bufs;
696 
697 	rxport->nmd = nm_open(glob_arg.ifname, &base_req, 0, NULL);
698 
699 	if (rxport->nmd == NULL) {
700 		D("cannot open %s", glob_arg.ifname);
701 		return (1);
702 	} else {
703 		D("successfully opened %s (tx rings: %u)", glob_arg.ifname,
704 		  rxport->nmd->req.nr_tx_slots);
705 	}
706 
707 	uint32_t extra_bufs = rxport->nmd->req.nr_arg3;
708 	struct overflow_queue *oq = NULL;
709 	/* reference ring to access the buffers */
710 	rxport->ring = NETMAP_RXRING(rxport->nmd->nifp, 0);
711 
712 	if (!glob_arg.extra_bufs)
713 		goto run;
714 
715 	D("obtained %d extra buffers", extra_bufs);
716 	if (!extra_bufs)
717 		goto run;
718 
719 	/* one overflow queue for each output pipe, plus one for the
720 	 * free extra buffers
721 	 */
722 	oq = calloc(npipes + 1, sizeof(struct overflow_queue));
723 	if (!oq) {
724 		D("failed to allocated overflow queues descriptors");
725 		goto run;
726 	}
727 
728 	freeq = &oq[npipes];
729 	rxport->oq = freeq;
730 
731 	freeq->slots = calloc(extra_bufs, sizeof(struct netmap_slot));
732 	if (!freeq->slots) {
733 		D("failed to allocate the free list");
734 	}
735 	freeq->size = extra_bufs;
736 	snprintf(freeq->name, MAX_IFNAMELEN, "free queue");
737 
738 	/*
739 	 * the list of buffers uses the first uint32_t in each buffer
740 	 * as the index of the next buffer.
741 	 */
742 	uint32_t scan;
743 	for (scan = rxport->nmd->nifp->ni_bufs_head;
744 	     scan;
745 	     scan = *(uint32_t *)NETMAP_BUF(rxport->ring, scan))
746 	{
747 		struct netmap_slot s;
748 		s.len = s.flags = 0;
749 		s.ptr = 0;
750 		s.buf_idx = scan;
751 		ND("freeq <- %d", s.buf_idx);
752 		oq_enq(freeq, &s);
753 	}
754 
755 
756 	if (freeq->n != extra_bufs) {
757 		D("something went wrong: netmap reported %d extra_bufs, but the free list contained %d",
758 				extra_bufs, freeq->n);
759 		return 1;
760 	}
761 	rxport->nmd->nifp->ni_bufs_head = 0;
762 
763 run:
764 	atexit(free_buffers);
765 
766 	int j, t = 0;
767 	for (j = 0; j < glob_arg.num_groups; j++) {
768 		struct group_des *g = &groups[j];
769 		int k;
770 		for (k = 0; k < g->nports; ++k) {
771 			struct port_des *p = &g->ports[k];
772 			snprintf(p->interface, MAX_PORTNAMELEN, "%s%s{%d/xT@%d",
773 					(strncmp(g->pipename, "vale", 4) ? "netmap:" : ""),
774 					g->pipename, g->first_id + k,
775 					rxport->nmd->req.nr_arg2);
776 			D("opening pipe named %s", p->interface);
777 
778 			p->nmd = nm_open(p->interface, NULL, 0, rxport->nmd);
779 
780 			if (p->nmd == NULL) {
781 				D("cannot open %s", p->interface);
782 				return (1);
783 			} else if (p->nmd->req.nr_arg2 != rxport->nmd->req.nr_arg2) {
784 				D("failed to open pipe #%d in zero-copy mode, "
785 					"please close any application that uses either pipe %s}%d, "
786 				        "or %s{%d, and retry",
787 					k + 1, g->pipename, g->first_id + k, g->pipename, g->first_id + k);
788 				return (1);
789 			} else {
790 				D("successfully opened pipe #%d %s (tx slots: %d)",
791 				  k + 1, p->interface, p->nmd->req.nr_tx_slots);
792 				p->ring = NETMAP_TXRING(p->nmd->nifp, 0);
793 				p->last_tail = nm_ring_next(p->ring, p->ring->tail);
794 			}
795 			D("zerocopy %s",
796 			  (rxport->nmd->mem == p->nmd->mem) ? "enabled" : "disabled");
797 
798 			if (extra_bufs) {
799 				struct overflow_queue *q = &oq[t + k];
800 				q->slots = calloc(extra_bufs, sizeof(struct netmap_slot));
801 				if (!q->slots) {
802 					D("failed to allocate overflow queue for pipe %d", k);
803 					/* make all overflow queue management fail */
804 					extra_bufs = 0;
805 				}
806 				q->size = extra_bufs;
807 				snprintf(q->name, sizeof(q->name), "oq %s{%4d", g->pipename, k);
808 				p->oq = q;
809 			}
810 		}
811 		t += g->nports;
812 	}
813 
814 	if (glob_arg.extra_bufs && !extra_bufs) {
815 		if (oq) {
816 			for (i = 0; i < npipes + 1; i++) {
817 				free(oq[i].slots);
818 				oq[i].slots = NULL;
819 			}
820 			free(oq);
821 			oq = NULL;
822 		}
823 		D("*** overflow queues disabled ***");
824 	}
825 
826 	sleep(glob_arg.wait_link);
827 
828 	/* start stats thread after wait_link */
829 	if (pthread_create(&stat_thread, NULL, print_stats, NULL) == -1) {
830 		D("unable to create the stats thread: %s", strerror(errno));
831 		return 1;
832 	}
833 
834 	struct pollfd pollfd[npipes + 1];
835 	memset(&pollfd, 0, sizeof(pollfd));
836 	signal(SIGINT, sigint_h);
837 
838 	/* make sure we wake up as often as needed, even when there are no
839 	 * packets coming in
840 	 */
841 	if (glob_arg.syslog_interval > 0 && glob_arg.syslog_interval < poll_timeout)
842 		poll_timeout = glob_arg.syslog_interval;
843 	if (glob_arg.stdout_interval > 0 && glob_arg.stdout_interval < poll_timeout)
844 		poll_timeout = glob_arg.stdout_interval;
845 
846 	while (!do_abort) {
847 		u_int polli = 0;
848 		iter++;
849 
850 		for (i = 0; i < npipes; ++i) {
851 			struct netmap_ring *ring = ports[i].ring;
852 			int pending = nm_tx_pending(ring);
853 
854 			/* if there are packets pending, we want to be notified when
855 			 * tail moves, so we let cur=tail
856 			 */
857 			ring->cur = pending ? ring->tail : ring->head;
858 
859 			if (!glob_arg.busy_wait && !pending) {
860 				/* no need to poll, there are no packets pending */
861 				continue;
862 			}
863 			pollfd[polli].fd = ports[i].nmd->fd;
864 			pollfd[polli].events = POLLOUT;
865 			pollfd[polli].revents = 0;
866 			++polli;
867 		}
868 
869 		pollfd[polli].fd = rxport->nmd->fd;
870 		pollfd[polli].events = POLLIN;
871 		pollfd[polli].revents = 0;
872 		++polli;
873 
874 		//RD(5, "polling %d file descriptors", polli+1);
875 		rv = poll(pollfd, polli, poll_timeout);
876 		if (rv <= 0) {
877 			if (rv < 0 && errno != EAGAIN && errno != EINTR)
878 				RD(1, "poll error %s", strerror(errno));
879 			goto send_stats;
880 		}
881 
882 		/* if there are several groups, try pushing released packets from
883 		 * upstream groups to the downstream ones.
884 		 *
885 		 * It is important to do this before returned slots are reused
886 		 * for new transmissions. For the same reason, this must be
887 		 * done starting from the last group going backwards.
888 		 */
889 		for (i = glob_arg.num_groups - 1U; i > 0; i--) {
890 			struct group_des *g = &groups[i - 1];
891 
892 			for (j = 0; j < g->nports; j++) {
893 				struct port_des *p = &g->ports[j];
894 				struct netmap_ring *ring = p->ring;
895 				uint32_t last = p->last_tail,
896 					 stop = nm_ring_next(ring, ring->tail);
897 
898 				/* slight abuse of the API here: we touch the slot
899 				 * pointed to by tail
900 				 */
901 				for ( ; last != stop; last = nm_ring_next(ring, last)) {
902 					struct netmap_slot *rs = &ring->slot[last];
903 					// XXX less aggressive?
904 					rs->buf_idx = forward_packet(g + 1, rs);
905 					rs->flags |= NS_BUF_CHANGED;
906 					rs->ptr = 0;
907 				}
908 				p->last_tail = last;
909 			}
910 		}
911 
912 
913 
914 		if (oq) {
915 			/* try to push packets from the overflow queues
916 			 * to the corresponding pipes
917 			 */
918 			for (i = 0; i < npipes; i++) {
919 				struct port_des *p = &ports[i];
920 				struct overflow_queue *q = p->oq;
921 				uint32_t k, lim;
922 				struct netmap_ring *ring;
923 				struct netmap_slot *slot;
924 
925 				if (oq_empty(q))
926 					continue;
927 				ring = p->ring;
928 				lim = nm_ring_space(ring);
929 				if (!lim)
930 					continue;
931 				if (q->n < lim)
932 					lim = q->n;
933 				for (k = 0; k < lim; k++) {
934 					struct netmap_slot s = oq_deq(q), tmp;
935 					tmp.ptr = 0;
936 					slot = &ring->slot[ring->head];
937 					tmp.buf_idx = slot->buf_idx;
938 					oq_enq(freeq, &tmp);
939 					*slot = s;
940 					slot->flags |= NS_BUF_CHANGED;
941 					ring->head = nm_ring_next(ring, ring->head);
942 				}
943 			}
944 		}
945 
946 		/* push any new packets from the input port to the first group */
947 		int batch = 0;
948 		for (i = rxport->nmd->first_rx_ring; i <= rxport->nmd->last_rx_ring; i++) {
949 			struct netmap_ring *rxring = NETMAP_RXRING(rxport->nmd->nifp, i);
950 
951 			//D("prepare to scan rings");
952 			int next_head = rxring->head;
953 			struct netmap_slot *next_slot = &rxring->slot[next_head];
954 			const char *next_buf = NETMAP_BUF(rxring, next_slot->buf_idx);
955 			while (!nm_ring_empty(rxring)) {
956 				struct netmap_slot *rs = next_slot;
957 				struct group_des *g = &groups[0];
958 				++received_pkts;
959 				received_bytes += rs->len;
960 
961 				// CHOOSE THE CORRECT OUTPUT PIPE
962 				rs->ptr = pkt_hdr_hash((const unsigned char *)next_buf, 4, 'B');
963 				if (rs->ptr == 0) {
964 					non_ip++; // XXX ??
965 				}
966 				// prefetch the buffer for the next round
967 				next_head = nm_ring_next(rxring, next_head);
968 				next_slot = &rxring->slot[next_head];
969 				next_buf = NETMAP_BUF(rxring, next_slot->buf_idx);
970 				__builtin_prefetch(next_buf);
971 				// 'B' is just a hashing seed
972 				rs->buf_idx = forward_packet(g, rs);
973 				rs->flags |= NS_BUF_CHANGED;
974 				rxring->head = rxring->cur = next_head;
975 
976 				batch++;
977 				if (unlikely(batch >= glob_arg.batch)) {
978 					ioctl(rxport->nmd->fd, NIOCRXSYNC, NULL);
979 					batch = 0;
980 				}
981 				ND(1,
982 				   "Forwarded Packets: %"PRIu64" Dropped packets: %"PRIu64"   Percent: %.2f",
983 				   forwarded, dropped,
984 				   ((float)dropped / (float)forwarded * 100));
985 			}
986 
987 		}
988 
989 	send_stats:
990 		if (counters_buf.status == COUNTERS_FULL)
991 			continue;
992 		/* take a new snapshot of the counters */
993 		gettimeofday(&counters_buf.ts, NULL);
994 		for (i = 0; i < npipes; i++) {
995 			struct my_ctrs *c = &counters_buf.ctrs[i];
996 			*c = ports[i].ctr;
997 			/*
998 			 * If there are overflow queues, copy the number of them for each
999 			 * port to the ctrs.oq_n variable for each port.
1000 			 */
1001 			if (ports[i].oq != NULL)
1002 				c->oq_n = ports[i].oq->n;
1003 		}
1004 		counters_buf.received_pkts = received_pkts;
1005 		counters_buf.received_bytes = received_bytes;
1006 		counters_buf.non_ip = non_ip;
1007 		if (freeq != NULL)
1008 			counters_buf.freeq_n = freeq->n;
1009 		__sync_synchronize();
1010 		counters_buf.status = COUNTERS_FULL;
1011 	}
1012 
1013 	/*
1014 	 * If freeq exists, copy the number to the freeq_n member of the
1015 	 * message struct, otherwise set it to 0.
1016 	 */
1017 	if (freeq != NULL) {
1018 		freeq_n = freeq->n;
1019 	} else {
1020 		freeq_n = 0;
1021 	}
1022 
1023 	pthread_join(stat_thread, NULL);
1024 
1025 	printf("%"PRIu64" packets forwarded.  %"PRIu64" packets dropped. Total %"PRIu64"\n", forwarded,
1026 	       dropped, forwarded + dropped);
1027 	return 0;
1028 }
1029