1 #include <stdio.h>
2 #include <stdlib.h>
3 #include <string.h>
4 #include <unistd.h>
5 #include <stdint.h>
6 #include <time.h>
7 #include <sys/time.h>
8 #include <sys/types.h>
9 #include <sys/stat.h>
10 #include <fcntl.h>
11 #include <pthread.h>
12 #include <signal.h>
13 #include <sys/socket.h>
14 #include <netinet/in.h>
15 #include <arpa/inet.h>
16 #include <sys/queue.h>
17 #include <assert.h>
18 
19 #include <mos_api.h>
20 #include "cpu.h"
21 #include "rss.h"
22 #include "http_parsing.h"
23 #include "debug.h"
24 #include "applib.h"
25 
26 #define CONFIG_FILE       "config/epwget.conf"
27 static struct conf_var g_conf[] = {
28 	{ "url",               {0} },
29 	{ "total_flows",       {0} },
30 	{ "core_limit",        {0} },
31 	{ "total_concurrency", {0} },
32 	{ "dest_port",         {0} },
33 };
34 #define NUM_CONF_VAR (sizeof(g_conf) / sizeof(struct conf_var))
35 
36 #define PORT_NUM 80
37 //#define PORT_NUM 3333
38 
39 #define MAX_URL_LEN 128
40 #define MAX_FILE_LEN 128
41 #define HTTP_HEADER_LEN 1024
42 
43 #define IP_RANGE 1
44 #define MAX_IP_STR_LEN 16
45 
46 #define BUF_SIZE (8*1024)
47 
48 /*----------------------------------------------------------------------------*/
49 struct mtcp_conf g_mcfg;
50 static pthread_t mtcp_thread[MAX_CPUS];
51 /*----------------------------------------------------------------------------*/
52 static mctx_t g_mctx[MAX_CPUS];
53 static int done[MAX_CPUS];
54 /*----------------------------------------------------------------------------*/
55 static int num_cores;
56 static int core_limit;
57 /*----------------------------------------------------------------------------*/
58 static int fio = FALSE;
59 static char outfile[MAX_FILE_LEN + 1];
60 /*----------------------------------------------------------------------------*/
61 static char host[MAX_IP_STR_LEN + 1];
62 static char path[MAX_URL_LEN + 1];
63 static in_addr_t daddr;
64 static in_port_t dport;
65 static in_addr_t saddr;
66 /*----------------------------------------------------------------------------*/
67 static int total_flows;
68 static int flows[MAX_CPUS];
69 static int flowcnt = 0;
70 static int concurrency;
71 static int max_fds;
72 static uint16_t dest_port;
73 /*----------------------------------------------------------------------------*/
74 struct wget_stat
75 {
76 	uint64_t waits;
77 	uint64_t events;
78 	uint64_t connects;
79 	uint64_t reads;
80 	uint64_t writes;
81 	uint64_t completes;
82 
83 	uint64_t errors;
84 	uint64_t timedout;
85 
86 	uint64_t sum_resp_time;
87 	uint64_t max_resp_time;
88 };
89 /*----------------------------------------------------------------------------*/
90 struct thread_context
91 {
92 	int core;
93 
94 	mctx_t mctx;
95 	int ep;
96 	struct wget_vars *wvars;
97 
98 	int target;
99 	int started;
100 	int errors;
101 	int incompletes;
102 	int done;
103 	int pending;
104 
105 	int maxevents;
106 	struct mtcp_epoll_event *events;
107 
108 	struct wget_stat stat;
109 };
110 typedef struct thread_context* thread_context_t;
111 /*----------------------------------------------------------------------------*/
112 struct wget_vars
113 {
114 	int request_sent;
115 
116 	char response[HTTP_HEADER_LEN];
117 	int resp_len;
118 	int headerset;
119 	uint32_t header_len;
120 	uint64_t file_len;
121 	uint64_t recv;
122 	uint64_t write;
123 
124 	struct timeval t_start;
125 	struct timeval t_end;
126 
127 	int fd;
128 };
129 /*----------------------------------------------------------------------------*/
130 static struct thread_context *g_ctx[MAX_CPUS];
131 static struct wget_stat *g_stat[MAX_CPUS];
132 /*----------------------------------------------------------------------------*/
133 static thread_context_t
134 CreateContext(mctx_t mctx)
135 {
136 	thread_context_t ctx;
137 
138 	ctx = (thread_context_t)calloc(1, sizeof(struct thread_context));
139 	if (!ctx) {
140 		perror("malloc");
141 		TRACE_ERROR("Failed to allocate memory for thread context.\n");
142 		return NULL;
143 	}
144 
145 	ctx->mctx = mctx;
146 	ctx->core = mctx->cpu;
147 
148 	g_mctx[ctx->core] = mctx;
149 
150 	return ctx;
151 }
152 /*----------------------------------------------------------------------------*/
153 static void
154 DestroyContext(thread_context_t ctx)
155 {
156 	free(ctx);
157 }
158 /*----------------------------------------------------------------------------*/
159 static inline int
160 CreateConnection(thread_context_t ctx)
161 {
162 	mctx_t mctx = ctx->mctx;
163 	struct mtcp_epoll_event ev;
164 	struct sockaddr_in addr;
165 	int sockid;
166 	int ret;
167 
168 	assert(mctx);
169 
170 	errno = 0;
171 	sockid = mtcp_socket(mctx, AF_INET, SOCK_STREAM, 0);
172 	if (sockid < 0) {
173 		TRACE_INFO("Failed to create socket! (%s)\n",
174 			   strerror(errno));
175 		return -1;
176 	}
177 	memset(&ctx->wvars[sockid], 0, sizeof(struct wget_vars));
178 	ret = mtcp_setsock_nonblock(mctx, sockid);
179 	if (ret < 0) {
180 		TRACE_ERROR("Failed to set socket in nonblocking mode.\n");
181 		exit(-1);
182 	}
183 
184 	addr.sin_family = AF_INET;
185 	addr.sin_addr.s_addr = daddr;
186 	addr.sin_port = dport;
187 
188 	ret = mtcp_connect(mctx, sockid,
189 			(struct sockaddr *)&addr, sizeof(struct sockaddr_in));
190 	if (ret < 0) {
191 		if (errno != EINPROGRESS) {
192 			perror("mtcp_connect");
193 			mtcp_close(mctx, sockid);
194 			return -1;
195 		}
196 	}
197 
198 	ctx->started++;
199 	ctx->pending++;
200 	ctx->stat.connects++;
201 
202 	ev.events = MOS_EPOLLOUT;
203 	ev.data.sock = sockid;
204 	mtcp_epoll_ctl(mctx, ctx->ep, MOS_EPOLL_CTL_ADD, sockid, &ev);
205 
206 	return sockid;
207 }
208 /*----------------------------------------------------------------------------*/
209 static inline void
210 CloseConnection(thread_context_t ctx, int sockid)
211 {
212 	mtcp_epoll_ctl(ctx->mctx, ctx->ep, MOS_EPOLL_CTL_DEL, sockid, NULL);
213 	mtcp_close(ctx->mctx, sockid);
214 	ctx->pending--;
215 	ctx->done++;
216 	assert(ctx->pending >= 0);
217 	while (/*ctx->pending*/ mtcp_get_connection_cnt(ctx->mctx) < concurrency && ctx->started < ctx->target) {
218 		if (CreateConnection(ctx) < 0) {
219 			done[ctx->core] = TRUE;
220 			break;
221 		}
222 	}
223 }
224 /*----------------------------------------------------------------------------*/
225 static inline int
226 SendHTTPRequest(thread_context_t ctx, int sockid, struct wget_vars *wv)
227 {
228 	char request[HTTP_HEADER_LEN];
229 	struct mtcp_epoll_event ev;
230 	int wr;
231 	int len;
232 
233 	wv->headerset = FALSE;
234 	wv->recv = 0;
235 	wv->header_len = wv->file_len = 0;
236 
237 	snprintf(request, HTTP_HEADER_LEN, "GET %s HTTP/1.0\r\n"
238 			"User-Agent: Wget/1.12 (linux-gnu)\r\n"
239 			"Accept: */*\r\n"
240 			"Host: %s\r\n"
241 //			"Connection: Keep-Alive\r\n\r\n",
242 			"Connection: Close\r\n\r\n",
243 			path, host);
244 	len = strlen(request);
245 
246 	wr = mtcp_write(ctx->mctx, sockid, request, len);
247 	if (wr < len) {
248 		TRACE_ERROR("Socket %d: Sending HTTP request failed. "
249 				"try: %d, sent: %d\n", sockid, len, wr);
250 		CloseConnection(ctx, sockid);
251 	}
252 	ctx->stat.writes += wr;
253 	TRACE_APP("Socket %d HTTP Request of %d bytes. sent.\n", sockid, wr);
254 	wv->request_sent = TRUE;
255 
256 	ev.events = MOS_EPOLLIN;
257 	ev.data.sock = sockid;
258 	mtcp_epoll_ctl(ctx->mctx, ctx->ep, MOS_EPOLL_CTL_MOD, sockid, &ev);
259 
260 	gettimeofday(&wv->t_start, NULL);
261 
262 	char fname[MAX_FILE_LEN + 1];
263 	if (fio) {
264 		snprintf(fname, MAX_FILE_LEN, "%s.%d", outfile, flowcnt++);
265 		wv->fd = open(fname, O_WRONLY | O_CREAT | O_TRUNC, 0644);
266 		if (wv->fd < 0) {
267 			TRACE_APP("Failed to open file descriptor for %s\n", fname);
268 			exit(1);
269 		}
270 	}
271 
272 	return 0;
273 }
274 /*----------------------------------------------------------------------------*/
275 static inline int
276 DownloadComplete(thread_context_t ctx, int sockid, struct wget_vars *wv)
277 {
278 #ifdef APP
279 	mctx_t mctx = ctx->mctx;
280 #endif
281 	uint64_t tdiff;
282 
283 	TRACE_APP("Socket %d File download complete!\n", sockid);
284 	gettimeofday(&wv->t_end, NULL);
285 	CloseConnection(ctx, sockid);
286 	ctx->stat.completes++;
287 
288 	if (wv->recv - wv->header_len != wv->file_len) {
289 		fprintf(stderr, "Response size mismatch! "
290 						"actual recved: %ld,  expected to recved: %ld\n",
291 						wv->recv-wv->header_len, wv->file_len);
292 	}
293 
294 	tdiff = (wv->t_end.tv_sec - wv->t_start.tv_sec) * 1000000 +
295 			(wv->t_end.tv_usec - wv->t_start.tv_usec);
296 	TRACE_APP("Socket %d Total received bytes: %lu (%luMB)\n",
297 			sockid, wv->recv, wv->recv / 1000000);
298 	TRACE_APP("Socket %d Total spent time: %lu us\n", sockid, tdiff);
299 	if (tdiff > 0) {
300 		TRACE_APP("Socket %d Average bandwidth: %lf[MB/s]\n",
301 				sockid, (double)wv->recv / tdiff);
302 	}
303 	ctx->stat.sum_resp_time += tdiff;
304 	if (tdiff > ctx->stat.max_resp_time)
305 		ctx->stat.max_resp_time = tdiff;
306 
307 	if (fio && wv->fd > 0)
308 		close(wv->fd);
309 
310 	return 0;
311 }
312 /*----------------------------------------------------------------------------*/
313 static inline int
314 HandleReadEvent(thread_context_t ctx, int sockid, struct wget_vars *wv)
315 {
316 	mctx_t mctx = ctx->mctx;
317 	char buf[BUF_SIZE];
318 	char *pbuf;
319 	int rd, copy_len;
320 
321 	rd = 1;
322 	while (rd > 0) {
323 		rd = mtcp_read(mctx, sockid, buf, BUF_SIZE);
324 		if (rd <= 0)
325 			break;
326 		ctx->stat.reads += rd;
327 
328 		TRACE_APP("Socket %d: mtcp_read ret: %d, total_recv: %lu, "
329 				"header_set: %d, header_len: %u, file_len: %lu\n",
330 				sockid, rd, wv->recv + rd,
331 				wv->headerset, wv->header_len, wv->file_len);
332 
333 		pbuf = buf;
334 		if (!wv->headerset) {
335 			copy_len = MIN(rd, HTTP_HEADER_LEN - wv->resp_len);
336 			memcpy(wv->response + wv->resp_len, buf, copy_len);
337 			wv->resp_len += copy_len;
338 			wv->header_len = find_http_header(wv->response, wv->resp_len);
339 			if (wv->header_len > 0) {
340 				wv->response[wv->header_len] = '\0';
341 				wv->file_len = http_header_long_val(wv->response,
342 						CONTENT_LENGTH_HDR, sizeof(CONTENT_LENGTH_HDR) - 1);
343 				TRACE_APP("Socket %d Parsed response header. "
344 						"Header length: %u, File length: %lu (%luMB)\n",
345 						sockid, wv->header_len,
346 						wv->file_len, wv->file_len / 1024 / 1024);
347 				wv->headerset = TRUE;
348 				wv->recv += (rd - (wv->resp_len - wv->header_len));
349 
350 				pbuf += (rd - (wv->resp_len - wv->header_len));
351 				rd = (wv->resp_len - wv->header_len);
352 
353 			} else {
354 				/* failed to parse response header */
355 				wv->recv += rd;
356 				rd = 0;
357 				ctx->stat.errors++;
358 				ctx->errors++;
359 				CloseConnection(ctx, sockid);
360 				return 0;
361 			}
362 		}
363 		wv->recv += rd;
364 
365 		if (fio && wv->fd > 0) {
366 			int wr = 0;
367 			while (wr < rd) {
368 				int _wr = write(wv->fd, pbuf + wr, rd - wr);
369 				assert (_wr == rd - wr);
370 				 if (_wr < 0) {
371 					 perror("write");
372 					 TRACE_ERROR("Failed to write.\n");
373 					 assert(0);
374 					 break;
375 				 }
376 				 wr += _wr;
377 				 wv->write += _wr;
378 			}
379 		}
380 
381 		if (wv->header_len && (wv->recv >= wv->header_len + wv->file_len)) {
382 			break;
383 		}
384 	}
385 
386 	if (rd > 0) {
387 		if (wv->header_len && (wv->recv >= wv->header_len + wv->file_len)) {
388 			TRACE_APP("Socket %d Done Write: "
389 					"header: %u file: %lu recv: %lu write: %lu\n",
390 					sockid, wv->header_len, wv->file_len,
391 					wv->recv - wv->header_len, wv->write);
392 			DownloadComplete(ctx, sockid, wv);
393 
394 			return 0;
395 		}
396 
397 	} else if (rd == 0) {
398 		/* connection closed by remote host */
399 		TRACE_DBG("Socket %d connection closed with server.\n", sockid);
400 
401 		if (wv->header_len && (wv->recv >= wv->header_len + wv->file_len)) {
402 			DownloadComplete(ctx, sockid, wv);
403 		} else {
404 			ctx->stat.errors++;
405 			ctx->incompletes++;
406 			CloseConnection(ctx, sockid);
407 		}
408 
409 	} else if (rd < 0) {
410 		if (errno != EAGAIN) {
411 			TRACE_DBG("Socket %d: mtcp_read() error %s\n",
412 					sockid, strerror(errno));
413 			ctx->stat.errors++;
414 			ctx->errors++;
415 			CloseConnection(ctx, sockid);
416 		}
417 	}
418 
419 	return 0;
420 }
421 /*----------------------------------------------------------------------------*/
422 static void
423 PrintStats()
424 {
425 	struct wget_stat total = {0};
426 	struct wget_stat *st;
427 	uint64_t avg_resp_time;
428 	uint64_t total_resp_time = 0;
429 	int i;
430 
431 	for (i = 0; i < core_limit; i++) {
432 		st = g_stat[i];
433 		if (!st)
434 			continue;
435 		avg_resp_time = st->completes? st->sum_resp_time / st->completes : 0;
436 
437 		total.waits += st->waits;
438 		total.events += st->events;
439 		total.connects += st->connects;
440 		total.reads += st->reads;
441 		total.writes += st->writes;
442 		total.completes += st->completes;
443 		total_resp_time += avg_resp_time;
444 		if (st->max_resp_time > total.max_resp_time)
445 			total.max_resp_time = st->max_resp_time;
446 		total.errors += st->errors;
447 		total.timedout += st->timedout;
448 
449 		memset(st, 0, sizeof(struct wget_stat));
450 	}
451 	fprintf(stderr, "[ ALL ] connect: %7lu, read: %4lu MB, write: %4lu MB, "
452 			"completes: %7lu (resp_time avg: %4lu, max: %6lu us)\n",
453 			total.connects,
454 			total.reads / 1024 / 1024, total.writes / 1024 / 1024,
455 			total.completes, total_resp_time / core_limit, total.max_resp_time);
456 }
457 /*----------------------------------------------------------------------------*/
458 static void
459 GlbInitWget()
460 {
461 	struct mtcp_conf mcfg;
462 	char *url;
463 	int total_concurrency = 0;
464 	int flow_per_thread;
465 	int flow_remainder_cnt;
466 	int i;
467 
468 	num_cores = GetNumCPUs();
469 	core_limit = num_cores;
470 	concurrency = 100;
471 	total_flows = -1;
472 
473 	LoadConfig(CONFIG_FILE, g_conf, NUM_CONF_VAR);
474 
475 	url = g_conf[0].value;
476 	total_flows = atoi(g_conf[1].value);
477 	core_limit = atoi(g_conf[2].value);
478 	total_concurrency = atoi(g_conf[3].value);
479 	dest_port = atoi(g_conf[4].value);
480 
481 	if ((strlen(url) == 0)
482 			|| (strlen(url) > MAX_URL_LEN)
483 			|| (total_flows <= 0)) {
484 		TRACE_INFO("Invalid configuration\n");
485 		exit(0);
486 	}
487 
488 	char* slash_p = strchr(url, '/');
489 	if (slash_p) {
490 		strncpy(host, url, slash_p - url);
491 		strncpy(path, strchr(url, '/'), MAX_URL_LEN);
492 	} else {
493 		strncpy(host, url, MAX_IP_STR_LEN);
494 		strncpy(path, "/", 1);
495 	}
496 
497 	daddr = inet_addr(host);
498 	dport = (dest_port == 0) ? htons(PORT_NUM) : htons(dest_port);
499 	saddr = INADDR_ANY;
500 
501 	if (total_flows < core_limit) {
502 		core_limit = total_flows;
503 	}
504 
505 	/* per-core concurrency = total_concurrency / # cores */
506 	if (total_concurrency > 0)
507 		concurrency = total_concurrency / core_limit;
508 
509 	/* set the max number of fds 3x larger than concurrency */
510 	max_fds = concurrency * 3;
511 
512 	TRACE_CONFIG("Application configuration:\n");
513 	TRACE_CONFIG("URL: %s\n", path);
514 	TRACE_CONFIG("# of total_flows: %d\n", total_flows);
515 	TRACE_CONFIG("# of cores: %d\n", core_limit);
516 	TRACE_CONFIG("Concurrency: %d\n", total_concurrency);
517 
518 	mtcp_getconf(&mcfg);
519 	mcfg.max_concurrency = max_fds;
520 	mcfg.max_num_buffers = max_fds;
521 	mtcp_setconf(&mcfg);
522 
523 	flow_per_thread = total_flows / core_limit;
524 	flow_remainder_cnt = total_flows % core_limit;
525 
526 	for (i = 0; i < MAX_CPUS; i++) {
527 		done[i] = FALSE;
528 		flows[i] = flow_per_thread;
529 
530 		if (flow_remainder_cnt-- > 0)
531 			flows[i]++;
532 	}
533 
534 	return;
535 }
536 /*----------------------------------------------------------------------------*/
537 static void
538 InitWget(mctx_t mctx, void **app_ctx)
539 {
540 	thread_context_t ctx;
541 	int ep;
542 
543 	assert(mctx);
544 
545 	int core = mctx->cpu;
546 
547 	ctx = CreateContext(mctx);
548 	if (!ctx)
549 		exit(-1);
550 	g_ctx[core] = ctx;
551 	g_stat[core] = &ctx->stat;
552 	srand(time(NULL));
553 
554 	mtcp_init_rss(mctx, saddr, IP_RANGE, daddr, dport);
555 
556 	if (flows[core] == 0) {
557 		TRACE_DBG("Application thread %d finished.\n", core);
558 		exit(-1);
559 	}
560 	ctx->target = flows[core];
561 
562 	/* Initialization */
563 	ctx->maxevents = max_fds * 3;
564 	ep = mtcp_epoll_create(mctx, ctx->maxevents);
565 	if (ep < 0) {
566 		TRACE_ERROR("Failed to create epoll struct!n");
567 		exit(EXIT_FAILURE);
568 	}
569 	ctx->events = (struct mtcp_epoll_event *)
570 			calloc(ctx->maxevents, sizeof(struct mtcp_epoll_event));
571 	if (!ctx->events) {
572 		TRACE_ERROR("Failed to allocate events!\n");
573 		exit(EXIT_FAILURE);
574 	}
575 	ctx->ep = ep;
576 
577 	ctx->wvars = (struct wget_vars *)calloc(max_fds, sizeof(struct wget_vars));
578 	if (!ctx->wvars) {
579 		TRACE_ERROR("Failed to create wget variables!\n");
580 		exit(EXIT_FAILURE);
581 	}
582 
583 	ctx->started = ctx->done = ctx->pending = 0;
584 	ctx->errors = ctx->incompletes = 0;
585 
586 	*app_ctx = ctx;
587 
588 	return;
589 }
590 /*----------------------------------------------------------------------------*/
591 static void
592 RunWget(mctx_t mctx, void **app_ctx)
593 {
594 	struct in_addr daddr_in;
595 	struct timeval cur_tv, prev_tv;
596 	int nevents;
597 	int i;
598 
599 	assert(mctx);
600 	assert(*app_ctx);
601 
602 	thread_context_t ctx = *app_ctx;
603 	int core = ctx->core;
604 
605 	daddr_in.s_addr = daddr;
606 	fprintf(stderr, "Thread %d handles %d flows. connecting to %s:%u\n",
607 			core, flows[core], inet_ntoa(daddr_in), ntohs(dport));
608 
609 	gettimeofday(&cur_tv, NULL);
610 	prev_tv = cur_tv;
611 
612 	while (!done[core]) {
613 		gettimeofday(&cur_tv, NULL);
614 
615 		/* print statistics every second */
616 		if (core == 0 && cur_tv.tv_sec > prev_tv.tv_sec) {
617 			PrintStats();
618 			prev_tv = cur_tv;
619 		}
620 
621 		while (/*ctx->pending*/ mtcp_get_connection_cnt(ctx->mctx) < concurrency && ctx->started < ctx->target) {
622 			if (CreateConnection(ctx) < 0) {
623 				done[core] = TRUE;
624 				break;
625 			}
626 		}
627 
628 		nevents = mtcp_epoll_wait(mctx, ctx->ep,
629 				ctx->events, ctx->maxevents, ctx->pending ? -1 : 10);
630 		ctx->stat.waits++;
631 
632 		if (nevents < 0) {
633 			if (errno != EINTR) {
634 				TRACE_ERROR("mtcp_epoll_wait failed! ret: %d\n", nevents);
635 			}
636 			done[core] = TRUE;
637 			break;
638 		} else {
639 			ctx->stat.events += nevents;
640 		}
641 
642 		for (i = 0; i < nevents; i++) {
643 
644 			if (ctx->events[i].events & MOS_EPOLLERR) {
645 				int err;
646 				socklen_t len = sizeof(err);
647 
648 				TRACE_APP("[CPU %d] Error on socket %d\n",
649 						core, ctx->events[i].data.sockid);
650 				ctx->stat.errors++;
651 				ctx->errors++;
652 				if (mtcp_getsockopt(mctx, ctx->events[i].data.sock,
653 							SOL_SOCKET, SO_ERROR, (void *)&err, &len) == 0) {
654 					if (err == ETIMEDOUT)
655 						ctx->stat.timedout++;
656 				}
657 				CloseConnection(ctx, ctx->events[i].data.sock);
658 
659 			} else if (ctx->events[i].events & MOS_EPOLLIN) {
660 				HandleReadEvent(ctx,
661 						ctx->events[i].data.sock,
662 						&ctx->wvars[ctx->events[i].data.sock]);
663 
664 			} else if (ctx->events[i].events == MOS_EPOLLOUT) {
665 				struct wget_vars *wv = &ctx->wvars[ctx->events[i].data.sock];
666 
667 				if (!wv->request_sent) {
668 					SendHTTPRequest(ctx, ctx->events[i].data.sock, wv);
669 				} else {
670 					//TRACE_DBG("Request already sent.\n");
671 				}
672 
673 			} else {
674 				TRACE_ERROR("Socket %d: event: %s\n",
675 						ctx->events[i].data.sock,
676 						EventToString(ctx->events[i].events));
677 				assert(0);
678 			}
679 		}
680 
681 		if (ctx->done >= ctx->target) {
682 			fprintf(stdout, "Completed %d connections, "
683 					"errors: %d incompletes: %d\n",
684 					ctx->done, ctx->errors, ctx->incompletes);
685 			break;
686 		}
687 	}
688 
689 	TRACE_INFO("Wget thread %d waiting for mtcp to be destroyed.\n", core);
690 
691 	g_stat[core] = NULL;
692 	g_ctx[core] = NULL;
693 	DestroyContext(ctx);
694 
695 	return;
696 }
697 /*----------------------------------------------------------------------------*/
698 void
699 RunApplication(mctx_t mctx)
700 {
701 	void *app_ctx;
702 
703 	app_ctx = (void *)calloc(1, sizeof(void *));
704 	if (!app_ctx) {
705 		TRACE_ERROR("calloc failure\n");
706 		return;
707 	}
708 
709 	TRACE_INFO("run application on core %d\n", mctx->cpu);
710 	InitWget(mctx, &(app_ctx));
711 	RunWget(mctx, &(app_ctx));
712 }
713 /*----------------------------------------------------------------------------*/
714 void *
715 RunMTCP(void *arg)
716 {
717 	int core = *(int *)arg;
718 	mctx_t mctx;
719 
720 	mtcp_core_affinitize(core);
721 
722 	/* mTCP Initialization */
723 	mctx = mtcp_create_context(core);
724 	if (!mctx) {
725 		pthread_exit(NULL);
726 		TRACE_ERROR("Failed to craete mtcp context.\n");
727 		return NULL;
728 	}
729 
730 	/* Run application here */
731 	RunApplication(mctx);
732 
733 	/* mTCP Tear Down */
734 	mtcp_destroy_context(mctx);
735 	pthread_exit(NULL);
736 
737 	return NULL;
738 }
739 /*----------------------------------------------------------------------------*/
740 int
741 main(int argc, char **argv)
742 {
743 	int ret, i;
744 	int cores[MAX_CPUS];
745 	char *fname = "config/mos.conf";
746 
747 	int opt;
748 	while ((opt = getopt(argc, argv, "f:")) != -1) {
749 		switch (opt) {
750 			case 'f':
751 				fname = optarg;
752 				break;
753 			default:
754 				printf("Usage: %s [-f config_file]\n", argv[0]);
755 				return 0;
756 		}
757 
758 	}
759 
760 	core_limit = sysconf(_SC_NPROCESSORS_ONLN);
761 
762 	ret = mtcp_init(fname);
763 	if (ret) {
764 		TRACE_ERROR("Failed to initialize mtcp.\n");
765 		exit(EXIT_FAILURE);
766 	}
767 
768 	mtcp_getconf(&g_mcfg);
769 
770 	core_limit = g_mcfg.num_cores;
771 
772 	GlbInitWget();
773 
774 	for (i = 0; i < core_limit; i++) {
775 		cores[i] = i;
776 
777 		/* Run mtcp thread */
778 		if ((g_mcfg.cpu_mask & (1L << i)) &&
779 			pthread_create(&mtcp_thread[i], NULL, RunMTCP, (void *)&cores[i])) {
780 			perror("pthread_create");
781 			TRACE_ERROR("Failed to create msg_test thread.\n");
782 			exit(-1);
783 		}
784 	}
785 
786 	for (i = 0; i < core_limit; i++) {
787 		if (g_mcfg.cpu_mask & (1L << i))
788 			pthread_join(mtcp_thread[i], NULL);
789 		TRACE_INFO("Message test thread %d joined.\n", i);
790 	}
791 
792 	mtcp_destroy();
793 	return 0;
794 }
795 /*----------------------------------------------------------------------------*/
796