1 #include <stdio.h>
2 #include <stdlib.h>
3 #include <string.h>
4 #include <unistd.h>
5 #include <stdint.h>
6 #include <time.h>
7 #include <sys/time.h>
8 #include <sys/types.h>
9 #include <sys/stat.h>
10 #include <fcntl.h>
11 #include <pthread.h>
12 #include <signal.h>
13 #include <sys/socket.h>
14 #include <netinet/in.h>
15 #include <arpa/inet.h>
16 #include <sys/queue.h>
17 #include <assert.h>
18 
19 #include <mos_api.h>
20 #include "cpu.h"
21 #include "rss.h"
22 #include "http_parsing.h"
23 #include "debug.h"
24 #include "applib.h"
25 
26 #define CONFIG_FILE       "config/epwget.conf"
27 static struct conf_var g_conf[] = {
28 	{ "url",               {0} },
29 	{ "total_flows",       {0} },
30 	{ "core_limit",        {0} },
31 	{ "total_concurrency", {0} },
32 	{ "dest_port",         {0} },
33 };
34 #define NUM_CONF_VAR (sizeof(g_conf) / sizeof(struct conf_var))
35 
/* Destination port used when the configured dest_port is 0. */
#define PORT_NUM 80
/* #define PORT_NUM 3333 */

/* Longest accepted URL; also sizes the request-path buffer. */
#define MAX_URL_LEN 128
/* Capacity (without terminator) of output file names. */
#define MAX_FILE_LEN 128
/* Size of the HTTP request buffer and of the stored response header. */
#define HTTP_HEADER_LEN 1024

/* Address-range argument passed to mtcp_init_rss(). */
#define IP_RANGE 1
/* Capacity (without terminator) of the host string. */
#define MAX_IP_STR_LEN 16

/* Size of the per-call receive buffer in HandleReadEvent(). */
#define BUF_SIZE (8 * 1024)
47 
48 /*----------------------------------------------------------------------------*/
49 struct mtcp_conf g_mcfg;
50 /*----------------------------------------------------------------------------*/
51 static mctx_t g_mctx[MAX_CPUS];
52 static int done[MAX_CPUS];
53 /*----------------------------------------------------------------------------*/
54 static int num_cores;
55 static int core_limit;
56 /*----------------------------------------------------------------------------*/
57 static int fio = FALSE;
58 static char outfile[MAX_FILE_LEN + 1];
59 /*----------------------------------------------------------------------------*/
60 static char host[MAX_IP_STR_LEN + 1];
61 static char path[MAX_URL_LEN + 1];
62 static in_addr_t daddr;
63 static in_port_t dport;
64 static in_addr_t saddr;
65 /*----------------------------------------------------------------------------*/
66 static int total_flows;
67 static int flows[MAX_CPUS];
68 static int flowcnt = 0;
69 static int concurrency;
70 static int max_fds;
71 static uint16_t dest_port;
72 /*----------------------------------------------------------------------------*/
/*
 * Counters collected by one worker thread. PrintStats() sums them over all
 * cores and then zeroes them, so each value covers one reporting interval.
 */
struct wget_stat {
	uint64_t waits;		/* calls to mtcp_epoll_wait() */
	uint64_t events;	/* events returned by mtcp_epoll_wait() */
	uint64_t connects;	/* connections started */
	uint64_t reads;		/* bytes returned by mtcp_read() */
	uint64_t writes;	/* bytes accepted by mtcp_write() */
	uint64_t completes;	/* downloads that finished */

	uint64_t errors;	/* failed or incomplete connections */
	uint64_t timedout;	/* socket errors reported as ETIMEDOUT */

	uint64_t sum_resp_time;	/* sum of download times, microseconds */
	uint64_t max_resp_time;	/* longest download time, microseconds */
};
88 /*----------------------------------------------------------------------------*/
89 struct thread_context
90 {
91 	int core;
92 
93 	mctx_t mctx;
94 	int ep;
95 	struct wget_vars *wvars;
96 
97 	int target;
98 	int started;
99 	int errors;
100 	int incompletes;
101 	int done;
102 	int pending;
103 
104 	int maxevents;
105 	struct mtcp_epoll_event *events;
106 
107 	struct wget_stat stat;
108 };
109 typedef struct thread_context* thread_context_t;
110 /*----------------------------------------------------------------------------*/
111 struct wget_vars
112 {
113 	int request_sent;
114 
115 	char response[HTTP_HEADER_LEN];
116 	int resp_len;
117 	int headerset;
118 	uint32_t header_len;
119 	uint64_t file_len;
120 	uint64_t recv;
121 	uint64_t write;
122 
123 	struct timeval t_start;
124 	struct timeval t_end;
125 
126 	int fd;
127 };
128 /*----------------------------------------------------------------------------*/
129 static struct thread_context *g_ctx[MAX_CPUS];
130 static struct wget_stat *g_stat[MAX_CPUS];
131 /*----------------------------------------------------------------------------*/
132 static thread_context_t
133 CreateContext(mctx_t mctx)
134 {
135 	thread_context_t ctx;
136 
137 	ctx = (thread_context_t)calloc(1, sizeof(struct thread_context));
138 	if (!ctx) {
139 		perror("malloc");
140 		TRACE_ERROR("Failed to allocate memory for thread context.\n");
141 		return NULL;
142 	}
143 
144 	ctx->mctx = mctx;
145 	ctx->core = mctx->cpu;
146 
147 	g_mctx[ctx->core] = mctx;
148 
149 	return ctx;
150 }
151 /*----------------------------------------------------------------------------*/
152 static void
153 DestroyContext(thread_context_t ctx)
154 {
155 	free(ctx);
156 }
157 /*----------------------------------------------------------------------------*/
158 static inline int
159 CreateConnection(thread_context_t ctx)
160 {
161 	mctx_t mctx = ctx->mctx;
162 	struct mtcp_epoll_event ev;
163 	struct sockaddr_in addr;
164 	int sockid;
165 	int ret;
166 
167 	assert(mctx);
168 
169 	errno = 0;
170 	sockid = mtcp_socket(mctx, AF_INET, SOCK_STREAM, 0);
171 	if (sockid < 0) {
172 		TRACE_INFO("Failed to create socket! (%s)\n",
173 			   strerror(errno));
174 		return -1;
175 	}
176 	memset(&ctx->wvars[sockid], 0, sizeof(struct wget_vars));
177 	ret = mtcp_setsock_nonblock(mctx, sockid);
178 	if (ret < 0) {
179 		TRACE_ERROR("Failed to set socket in nonblocking mode.\n");
180 		exit(-1);
181 	}
182 
183 	addr.sin_family = AF_INET;
184 	addr.sin_addr.s_addr = daddr;
185 	addr.sin_port = dport;
186 
187 	ret = mtcp_connect(mctx, sockid,
188 			(struct sockaddr *)&addr, sizeof(struct sockaddr_in));
189 	if (ret < 0) {
190 		if (errno != EINPROGRESS) {
191 			perror("mtcp_connect");
192 			mtcp_close(mctx, sockid);
193 			return -1;
194 		}
195 	}
196 
197 	ctx->started++;
198 	ctx->pending++;
199 	ctx->stat.connects++;
200 
201 	ev.events = MOS_EPOLLOUT;
202 	ev.data.sock = sockid;
203 	mtcp_epoll_ctl(mctx, ctx->ep, MOS_EPOLL_CTL_ADD, sockid, &ev);
204 
205 	return sockid;
206 }
207 /*----------------------------------------------------------------------------*/
208 static inline void
209 CloseConnection(thread_context_t ctx, int sockid)
210 {
211 	mtcp_epoll_ctl(ctx->mctx, ctx->ep, MOS_EPOLL_CTL_DEL, sockid, NULL);
212 	mtcp_close(ctx->mctx, sockid);
213 	ctx->pending--;
214 	ctx->done++;
215 	assert(ctx->pending >= 0);
216 	while (/*ctx->pending*/ mtcp_get_connection_cnt(ctx->mctx) < concurrency && ctx->started < ctx->target) {
217 		if (CreateConnection(ctx) < 0) {
218 			done[ctx->core] = TRUE;
219 			break;
220 		}
221 	}
222 }
223 /*----------------------------------------------------------------------------*/
224 static inline int
225 SendHTTPRequest(thread_context_t ctx, int sockid, struct wget_vars *wv)
226 {
227 	char request[HTTP_HEADER_LEN];
228 	struct mtcp_epoll_event ev;
229 	int wr;
230 	int len;
231 
232 	wv->headerset = FALSE;
233 	wv->recv = 0;
234 	wv->header_len = wv->file_len = 0;
235 
236 	snprintf(request, HTTP_HEADER_LEN, "GET %s HTTP/1.0\r\n"
237 			"User-Agent: Wget/1.12 (linux-gnu)\r\n"
238 			"Accept: */*\r\n"
239 			"Host: %s\r\n"
240 //			"Connection: Keep-Alive\r\n\r\n",
241 			"Connection: Close\r\n\r\n",
242 			path, host);
243 	len = strlen(request);
244 
245 	wr = mtcp_write(ctx->mctx, sockid, request, len);
246 	if (wr < len) {
247 		TRACE_ERROR("Socket %d: Sending HTTP request failed. "
248 				"try: %d, sent: %d\n", sockid, len, wr);
249 		CloseConnection(ctx, sockid);
250 	}
251 	ctx->stat.writes += wr;
252 	TRACE_APP("Socket %d HTTP Request of %d bytes. sent.\n", sockid, wr);
253 	wv->request_sent = TRUE;
254 
255 	ev.events = MOS_EPOLLIN;
256 	ev.data.sock = sockid;
257 	mtcp_epoll_ctl(ctx->mctx, ctx->ep, MOS_EPOLL_CTL_MOD, sockid, &ev);
258 
259 	gettimeofday(&wv->t_start, NULL);
260 
261 	char fname[MAX_FILE_LEN + 1];
262 	if (fio) {
263 		snprintf(fname, MAX_FILE_LEN, "%s.%d", outfile, flowcnt++);
264 		wv->fd = open(fname, O_WRONLY | O_CREAT | O_TRUNC, 0644);
265 		if (wv->fd < 0) {
266 			TRACE_APP("Failed to open file descriptor for %s\n", fname);
267 			exit(1);
268 		}
269 	}
270 
271 	return 0;
272 }
273 /*----------------------------------------------------------------------------*/
274 static inline int
275 DownloadComplete(thread_context_t ctx, int sockid, struct wget_vars *wv)
276 {
277 #ifdef APP
278 	mctx_t mctx = ctx->mctx;
279 #endif
280 	uint64_t tdiff;
281 
282 	TRACE_APP("Socket %d File download complete!\n", sockid);
283 	gettimeofday(&wv->t_end, NULL);
284 	CloseConnection(ctx, sockid);
285 	ctx->stat.completes++;
286 
287 	if (wv->recv - wv->header_len != wv->file_len) {
288 		fprintf(stderr, "Response size mismatch! "
289 						"actual recved: %ld,  expected to recved: %ld\n",
290 						wv->recv-wv->header_len, wv->file_len);
291 	}
292 
293 	tdiff = (wv->t_end.tv_sec - wv->t_start.tv_sec) * 1000000 +
294 			(wv->t_end.tv_usec - wv->t_start.tv_usec);
295 	TRACE_APP("Socket %d Total received bytes: %lu (%luMB)\n",
296 			sockid, wv->recv, wv->recv / 1000000);
297 	TRACE_APP("Socket %d Total spent time: %lu us\n", sockid, tdiff);
298 	if (tdiff > 0) {
299 		TRACE_APP("Socket %d Average bandwidth: %lf[MB/s]\n",
300 				sockid, (double)wv->recv / tdiff);
301 	}
302 	ctx->stat.sum_resp_time += tdiff;
303 	if (tdiff > ctx->stat.max_resp_time)
304 		ctx->stat.max_resp_time = tdiff;
305 
306 	if (fio && wv->fd > 0)
307 		close(wv->fd);
308 
309 	return 0;
310 }
311 /*----------------------------------------------------------------------------*/
312 static inline int
313 HandleReadEvent(thread_context_t ctx, int sockid, struct wget_vars *wv)
314 {
315 	mctx_t mctx = ctx->mctx;
316 	char buf[BUF_SIZE];
317 	char *pbuf;
318 	int rd, copy_len;
319 
320 	rd = 1;
321 	while (rd > 0) {
322 		rd = mtcp_read(mctx, sockid, buf, BUF_SIZE);
323 		if (rd <= 0)
324 			break;
325 		ctx->stat.reads += rd;
326 
327 		TRACE_APP("Socket %d: mtcp_read ret: %d, total_recv: %lu, "
328 				"header_set: %d, header_len: %u, file_len: %lu\n",
329 				sockid, rd, wv->recv + rd,
330 				wv->headerset, wv->header_len, wv->file_len);
331 
332 		pbuf = buf;
333 		if (!wv->headerset) {
334 			copy_len = MIN(rd, HTTP_HEADER_LEN - wv->resp_len);
335 			memcpy(wv->response + wv->resp_len, buf, copy_len);
336 			wv->resp_len += copy_len;
337 			wv->header_len = find_http_header(wv->response, wv->resp_len);
338 			if (wv->header_len > 0) {
339 				wv->response[wv->header_len] = '\0';
340 				wv->file_len = http_header_long_val(wv->response,
341 						CONTENT_LENGTH_HDR, sizeof(CONTENT_LENGTH_HDR) - 1);
342 				TRACE_APP("Socket %d Parsed response header. "
343 						"Header length: %u, File length: %lu (%luMB)\n",
344 						sockid, wv->header_len,
345 						wv->file_len, wv->file_len / 1024 / 1024);
346 				wv->headerset = TRUE;
347 				wv->recv += (rd - (wv->resp_len - wv->header_len));
348 
349 				pbuf += (rd - (wv->resp_len - wv->header_len));
350 				rd = (wv->resp_len - wv->header_len);
351 
352 			} else {
353 				/* failed to parse response header */
354 				wv->recv += rd;
355 				rd = 0;
356 				ctx->stat.errors++;
357 				ctx->errors++;
358 				CloseConnection(ctx, sockid);
359 				return 0;
360 			}
361 		}
362 		wv->recv += rd;
363 
364 		if (fio && wv->fd > 0) {
365 			int wr = 0;
366 			while (wr < rd) {
367 				int _wr = write(wv->fd, pbuf + wr, rd - wr);
368 				assert (_wr == rd - wr);
369 				 if (_wr < 0) {
370 					 perror("write");
371 					 TRACE_ERROR("Failed to write.\n");
372 					 assert(0);
373 					 break;
374 				 }
375 				 wr += _wr;
376 				 wv->write += _wr;
377 			}
378 		}
379 
380 		if (wv->header_len && (wv->recv >= wv->header_len + wv->file_len)) {
381 			break;
382 		}
383 	}
384 
385 	if (rd > 0) {
386 		if (wv->header_len && (wv->recv >= wv->header_len + wv->file_len)) {
387 			TRACE_APP("Socket %d Done Write: "
388 					"header: %u file: %lu recv: %lu write: %lu\n",
389 					sockid, wv->header_len, wv->file_len,
390 					wv->recv - wv->header_len, wv->write);
391 			DownloadComplete(ctx, sockid, wv);
392 
393 			return 0;
394 		}
395 
396 	} else if (rd == 0) {
397 		/* connection closed by remote host */
398 		TRACE_DBG("Socket %d connection closed with server.\n", sockid);
399 
400 		if (wv->header_len && (wv->recv >= wv->header_len + wv->file_len)) {
401 			DownloadComplete(ctx, sockid, wv);
402 		} else {
403 			ctx->stat.errors++;
404 			ctx->incompletes++;
405 			CloseConnection(ctx, sockid);
406 		}
407 
408 	} else if (rd < 0) {
409 		if (errno != EAGAIN) {
410 			TRACE_DBG("Socket %d: mtcp_read() error %s\n",
411 					sockid, strerror(errno));
412 			ctx->stat.errors++;
413 			ctx->errors++;
414 			CloseConnection(ctx, sockid);
415 		}
416 	}
417 
418 	return 0;
419 }
420 /*----------------------------------------------------------------------------*/
421 static void
422 PrintStats()
423 {
424 	struct wget_stat total = {0};
425 	struct wget_stat *st;
426 	uint64_t avg_resp_time;
427 	uint64_t total_resp_time = 0;
428 	int i;
429 
430 	for (i = 0; i < core_limit; i++) {
431 		st = g_stat[i];
432 		if (!st)
433 			continue;
434 		avg_resp_time = st->completes? st->sum_resp_time / st->completes : 0;
435 
436 		total.waits += st->waits;
437 		total.events += st->events;
438 		total.connects += st->connects;
439 		total.reads += st->reads;
440 		total.writes += st->writes;
441 		total.completes += st->completes;
442 		total_resp_time += avg_resp_time;
443 		if (st->max_resp_time > total.max_resp_time)
444 			total.max_resp_time = st->max_resp_time;
445 		total.errors += st->errors;
446 		total.timedout += st->timedout;
447 
448 		memset(st, 0, sizeof(struct wget_stat));
449 	}
450 	fprintf(stderr, "[ ALL ] connect: %7lu, read: %4lu MB, write: %4lu MB, "
451 			"completes: %7lu (resp_time avg: %4lu, max: %6lu us)\n",
452 			total.connects,
453 			total.reads / 1000 / 1000, total.writes / 1000 / 1000,
454 			total.completes, total_resp_time / core_limit, total.max_resp_time);
455 }
456 /*----------------------------------------------------------------------------*/
457 static void
458 GlbInitWget()
459 {
460 	struct mtcp_conf mcfg;
461 	char *url;
462 	int total_concurrency = 0;
463 	int flow_per_thread;
464 	int flow_remainder_cnt;
465 	int i;
466 
467 	num_cores = GetNumCPUs();
468 	core_limit = num_cores;
469 	concurrency = 100;
470 	total_flows = -1;
471 
472 	LoadConfig(CONFIG_FILE, g_conf, NUM_CONF_VAR);
473 
474 	url = g_conf[0].value;
475 	total_flows = atoi(g_conf[1].value);
476 	core_limit = atoi(g_conf[2].value);
477 	total_concurrency = atoi(g_conf[3].value);
478 	dest_port = atoi(g_conf[4].value);
479 
480 	if ((strlen(url) == 0)
481 			|| (strlen(url) > MAX_URL_LEN)
482 			|| (total_flows <= 0)) {
483 		TRACE_INFO("Invalid configuration\n");
484 		exit(0);
485 	}
486 
487 	char* slash_p = strchr(url, '/');
488 	if (slash_p) {
489 		strncpy(host, url, slash_p - url);
490 		strncpy(path, strchr(url, '/'), MAX_URL_LEN);
491 	} else {
492 		strncpy(host, url, MAX_IP_STR_LEN);
493 		strncpy(path, "/", 1);
494 	}
495 
496 	daddr = inet_addr(host);
497 	dport = (dest_port == 0) ? htons(PORT_NUM) : htons(dest_port);
498 	saddr = INADDR_ANY;
499 
500 	if (total_flows < core_limit) {
501 		core_limit = total_flows;
502 	}
503 
504 	/* per-core concurrency = total_concurrency / # cores */
505 	if (total_concurrency > 0)
506 		concurrency = total_concurrency / core_limit;
507 
508 	/* set the max number of fds 3x larger than concurrency */
509 	max_fds = concurrency * 3;
510 
511 	TRACE_CONFIG("Application configuration:\n");
512 	TRACE_CONFIG("URL: %s\n", path);
513 	TRACE_CONFIG("# of total_flows: %d\n", total_flows);
514 	TRACE_CONFIG("# of cores: %d\n", core_limit);
515 	TRACE_CONFIG("Concurrency: %d\n", total_concurrency);
516 
517 	mtcp_getconf(&mcfg);
518 	mcfg.max_concurrency = max_fds;
519 	mcfg.max_num_buffers = max_fds;
520 	mtcp_setconf(&mcfg);
521 
522 	flow_per_thread = total_flows / core_limit;
523 	flow_remainder_cnt = total_flows % core_limit;
524 
525 	for (i = 0; i < MAX_CPUS; i++) {
526 		done[i] = FALSE;
527 		flows[i] = flow_per_thread;
528 
529 		if (flow_remainder_cnt-- > 0)
530 			flows[i]++;
531 	}
532 
533 	return;
534 }
535 /*----------------------------------------------------------------------------*/
536 static void
537 InitWget(mctx_t mctx, void **app_ctx)
538 {
539 	thread_context_t ctx;
540 	int ep;
541 
542 	assert(mctx);
543 
544 	int core = mctx->cpu;
545 
546 	ctx = CreateContext(mctx);
547 	if (!ctx)
548 		exit(-1);
549 	g_ctx[core] = ctx;
550 	g_stat[core] = &ctx->stat;
551 	srand(time(NULL));
552 
553 	mtcp_init_rss(mctx, saddr, IP_RANGE, daddr, dport);
554 
555 	if (flows[core] == 0) {
556 		TRACE_DBG("Application thread %d finished.\n", core);
557 		exit(-1);
558 	}
559 	ctx->target = flows[core];
560 
561 	/* Initialization */
562 	ctx->maxevents = max_fds * 3;
563 	ep = mtcp_epoll_create(mctx, ctx->maxevents);
564 	if (ep < 0) {
565 		TRACE_ERROR("Failed to create epoll struct!n");
566 		exit(EXIT_FAILURE);
567 	}
568 	ctx->events = (struct mtcp_epoll_event *)
569 			calloc(ctx->maxevents, sizeof(struct mtcp_epoll_event));
570 	if (!ctx->events) {
571 		TRACE_ERROR("Failed to allocate events!\n");
572 		exit(EXIT_FAILURE);
573 	}
574 	ctx->ep = ep;
575 
576 	ctx->wvars = (struct wget_vars *)calloc(max_fds, sizeof(struct wget_vars));
577 	if (!ctx->wvars) {
578 		TRACE_ERROR("Failed to create wget variables!\n");
579 		exit(EXIT_FAILURE);
580 	}
581 
582 	ctx->started = ctx->done = ctx->pending = 0;
583 	ctx->errors = ctx->incompletes = 0;
584 
585 	*app_ctx = ctx;
586 
587 	return;
588 }
589 /*----------------------------------------------------------------------------*/
590 static void
591 RunWget(mctx_t mctx, void **app_ctx)
592 {
593 	struct in_addr daddr_in;
594 	struct timeval cur_tv, prev_tv;
595 	int nevents;
596 	int i;
597 
598 	assert(mctx);
599 	assert(*app_ctx);
600 
601 	thread_context_t ctx = *app_ctx;
602 	int core = ctx->core;
603 
604 	daddr_in.s_addr = daddr;
605 	fprintf(stderr, "Thread %d handles %d flows. connecting to %s:%u\n",
606 			core, flows[core], inet_ntoa(daddr_in), ntohs(dport));
607 
608 	gettimeofday(&cur_tv, NULL);
609 	prev_tv = cur_tv;
610 
611 	while (!done[core]) {
612 		gettimeofday(&cur_tv, NULL);
613 
614 		/* print statistics every second */
615 		if (core == 0 && cur_tv.tv_sec > prev_tv.tv_sec) {
616 			PrintStats();
617 			prev_tv = cur_tv;
618 		}
619 
620 		while (/*ctx->pending*/ mtcp_get_connection_cnt(ctx->mctx) < concurrency && ctx->started < ctx->target) {
621 			if (CreateConnection(ctx) < 0) {
622 				done[core] = TRUE;
623 				break;
624 			}
625 		}
626 
627 		nevents = mtcp_epoll_wait(mctx, ctx->ep,
628 				ctx->events, ctx->maxevents, ctx->pending ? -1 : 10);
629 		ctx->stat.waits++;
630 
631 		if (nevents < 0) {
632 			if (errno != EINTR) {
633 				TRACE_ERROR("mtcp_epoll_wait failed! ret: %d\n", nevents);
634 			}
635 			done[core] = TRUE;
636 			break;
637 		} else {
638 			ctx->stat.events += nevents;
639 		}
640 
641 		for (i = 0; i < nevents; i++) {
642 
643 			if (ctx->events[i].events & MOS_EPOLLERR) {
644 				int err;
645 				socklen_t len = sizeof(err);
646 
647 				TRACE_APP("[CPU %d] Error on socket %d\n",
648 						core, ctx->events[i].data.sockid);
649 				ctx->stat.errors++;
650 				ctx->errors++;
651 				if (mtcp_getsockopt(mctx, ctx->events[i].data.sock,
652 							SOL_SOCKET, SO_ERROR, (void *)&err, &len) == 0) {
653 					if (err == ETIMEDOUT)
654 						ctx->stat.timedout++;
655 				}
656 				CloseConnection(ctx, ctx->events[i].data.sock);
657 
658 			} else if (ctx->events[i].events & MOS_EPOLLIN) {
659 				HandleReadEvent(ctx,
660 						ctx->events[i].data.sock,
661 						&ctx->wvars[ctx->events[i].data.sock]);
662 
663 			} else if (ctx->events[i].events == MOS_EPOLLOUT) {
664 				struct wget_vars *wv = &ctx->wvars[ctx->events[i].data.sock];
665 
666 				if (!wv->request_sent) {
667 					SendHTTPRequest(ctx, ctx->events[i].data.sock, wv);
668 				} else {
669 					//TRACE_DBG("Request already sent.\n");
670 				}
671 
672 			} else {
673 				TRACE_ERROR("Socket %d: event: %s\n",
674 						ctx->events[i].data.sock,
675 						EventToString(ctx->events[i].events));
676 				assert(0);
677 			}
678 		}
679 
680 		if (ctx->done >= ctx->target) {
681 			fprintf(stdout, "Completed %d connections, "
682 					"errors: %d incompletes: %d\n",
683 					ctx->done, ctx->errors, ctx->incompletes);
684 			break;
685 		}
686 	}
687 
688 	TRACE_INFO("Wget thread %d waiting for mtcp to be destroyed.\n", core);
689 
690 	g_stat[core] = NULL;
691 	g_ctx[core] = NULL;
692 	DestroyContext(ctx);
693 
694 	return;
695 }
696 /*----------------------------------------------------------------------------*/
697 void
698 RunApplication(mctx_t mctx)
699 {
700 	void *app_ctx;
701 
702 	app_ctx = (void *)calloc(1, sizeof(void *));
703 	if (!app_ctx) {
704 		TRACE_ERROR("calloc failure\n");
705 		return;
706 	}
707 
708 	TRACE_INFO("run application on core %d\n", mctx->cpu);
709 	InitWget(mctx, &(app_ctx));
710 	RunWget(mctx, &(app_ctx));
711 }
712 /*----------------------------------------------------------------------------*/
713 void *
714 RunMTCP(void *arg)
715 {
716 	int core = *(int *)arg;
717 	mctx_t mctx;
718 
719 	mtcp_core_affinitize(core);
720 
721 	/* mTCP Initialization */
722 	mctx = mtcp_create_context(core);
723 	if (!mctx) {
724 		pthread_exit(NULL);
725 		TRACE_ERROR("Failed to craete mtcp context.\n");
726 		return NULL;
727 	}
728 
729 	/* Run application here */
730 	RunApplication(mctx);
731 
732 	/* mTCP Tear Down */
733 	mtcp_destroy_context(mctx);
734 
735 	return NULL;
736 }
737 /*----------------------------------------------------------------------------*/
738 int
739 main(int argc, char **argv)
740 {
741 	int ret;
742 	int current_core = -1;
743 	char *fname = "config/mos.conf";
744 
745 	int opt;
746 	while ((opt = getopt(argc, argv, "f:c:")) != -1) {
747 		switch (opt) {
748 		case 'f':
749 			fname = optarg;
750 			break;
751 		case 'c':
752 			current_core = atoi(optarg);
753 			if (current_core >= MAX_CPUS) {
754 				TRACE_ERROR("Invalid core id!\n");
755 				exit(EXIT_FAILURE);
756 			}
757 			break;
758 		default:
759 			printf("Usage: %s [-f config_file]\n", argv[0]);
760 			return 0;
761 		}
762 
763 	}
764 
765 	core_limit = sysconf(_SC_NPROCESSORS_ONLN);
766 
767 	ret = mtcp_init(fname);
768 	if (ret) {
769 		TRACE_ERROR("Failed to initialize mtcp.\n");
770 		exit(EXIT_FAILURE);
771 	}
772 
773 	mtcp_getconf(&g_mcfg);
774 
775 	core_limit = g_mcfg.num_cores;
776 
777 	GlbInitWget();
778 
779 	RunMTCP(&current_core);
780 
781 	mtcp_destroy();
782 	return 0;
783 }
784 /*----------------------------------------------------------------------------*/
785