1 #include <stdio.h>
2 #include <stdlib.h>
3 #include <string.h>
4 #include <unistd.h>
5 #include <stdint.h>
6 #include <time.h>
7 #include <sys/time.h>
8 #include <sys/types.h>
9 #include <sys/stat.h>
10 #include <fcntl.h>
11 #include <pthread.h>
12 #include <signal.h>
13 #include <sys/socket.h>
14 #include <netinet/in.h>
15 #include <arpa/inet.h>
16 #include <sys/queue.h>
17 #include <assert.h>
18 
19 #include <mos_api.h>
20 #include "cpu.h"
21 #include "rss.h"
22 #include "http_parsing.h"
23 #include "debug.h"
24 #include "applib.h"
25 
26 #define CONFIG_FILE       "config/epwget.conf"
27 static struct conf_var g_conf[] = {
28 	{ "url",               {0} },
29 	{ "total_flows",       {0} },
30 	{ "core_limit",        {0} },
31 	{ "total_concurrency", {0} },
32 	{ "dest_port",         {0} },
33 	{ "keep_alive",        {0} },
34 };
35 #define NUM_CONF_VAR (sizeof(g_conf) / sizeof(struct conf_var))
36 
/* Destination port used when the configuration does not provide one. */
#define PORT_NUM 80

/* Upper bounds for the request path and the output file name. */
#define MAX_URL_LEN 128
#define MAX_FILE_LEN 128
/* Size of the request buffer and of the saved response-header buffer. */
#define HTTP_HEADER_LEN 1024

/* Address range argument handed to mtcp_init_rss(). */
#define IP_RANGE 1
/* Longest host string accepted (dotted-quad IPv4 text). */
#define MAX_IP_STR_LEN 16

/* Size of the stack buffer used for each mtcp_read() call. */
#define BUF_SIZE (8 * 1024)
48 
49 /*----------------------------------------------------------------------------*/
50 struct mtcp_conf g_mcfg;
51 /*----------------------------------------------------------------------------*/
52 static mctx_t g_mctx[MAX_CPUS];
53 static int done[MAX_CPUS];
54 /*----------------------------------------------------------------------------*/
55 static int num_cores;
56 static int core_limit;
57 /*----------------------------------------------------------------------------*/
58 static int fio = FALSE;
59 static char outfile[MAX_FILE_LEN + 1];
60 /*----------------------------------------------------------------------------*/
61 static char host[MAX_IP_STR_LEN + 1];
62 static char path[MAX_URL_LEN + 1];
63 static in_addr_t daddr;
64 static in_port_t dport;
65 static in_addr_t saddr;
66 /*----------------------------------------------------------------------------*/
67 static int total_flows;
68 static int flows[MAX_CPUS];
69 static int flowcnt = 0;
70 static int concurrency;
71 static int max_fds;
72 static uint16_t dest_port;
73 static int keep_alive = FALSE;
74 /*----------------------------------------------------------------------------*/
/*
 * Per-thread counters.  PrintStats() sums them over the cores and clears
 * them after each report.
 */
struct wget_stat
{
	uint64_t waits;		/* number of mtcp_epoll_wait() calls */
	uint64_t events;	/* number of events returned by those calls */
	uint64_t connects;	/* connections started */
	uint64_t reads;		/* bytes returned by mtcp_read() */
	uint64_t writes;	/* bytes accepted by mtcp_write() */
	uint64_t completes;	/* downloads that finished */

	uint64_t errors;	/* failed or incomplete connections */
	uint64_t timedout;	/* errors whose SO_ERROR was ETIMEDOUT */

	uint64_t sum_resp_time;	/* sum of download times, microseconds */
	uint64_t max_resp_time;	/* longest download time, microseconds */
};
90 /*----------------------------------------------------------------------------*/
91 struct thread_context
92 {
93 	int core;
94 
95 	mctx_t mctx;
96 	int ep;
97 	struct wget_vars *wvars;
98 
99 	int target;
100 	int started;
101 	int errors;
102 	int incompletes;
103 	int done;
104 	int pending;
105 
106 	int maxevents;
107 	struct mtcp_epoll_event *events;
108 
109 	struct wget_stat stat;
110 };
111 typedef struct thread_context* thread_context_t;
112 /*----------------------------------------------------------------------------*/
113 struct wget_vars
114 {
115 	int request_sent;
116 
117 	char response[HTTP_HEADER_LEN];
118 	int resp_len;
119 	int headerset;
120 	uint32_t header_len;
121 	uint64_t file_len;
122 	uint64_t recv;
123 	uint64_t write;
124 
125 	struct timeval t_start;
126 	struct timeval t_end;
127 
128 	int fd;
129 };
130 /*----------------------------------------------------------------------------*/
131 static struct thread_context *g_ctx[MAX_CPUS] = {0};
132 static struct wget_stat *g_stat[MAX_CPUS] = {0};
133 /*----------------------------------------------------------------------------*/
134 static thread_context_t
CreateContext(mctx_t mctx)135 CreateContext(mctx_t mctx)
136 {
137 	thread_context_t ctx;
138 
139 	ctx = (thread_context_t)calloc(1, sizeof(struct thread_context));
140 	if (!ctx) {
141 		perror("malloc");
142 		TRACE_ERROR("Failed to allocate memory for thread context.\n");
143 		return NULL;
144 	}
145 
146 	ctx->mctx = mctx;
147 	ctx->core = mctx->cpu;
148 
149 	g_mctx[ctx->core] = mctx;
150 
151 	return ctx;
152 }
153 /*----------------------------------------------------------------------------*/
154 static void
DestroyContext(thread_context_t ctx)155 DestroyContext(thread_context_t ctx)
156 {
157 	g_stat[ctx->core] = NULL;
158 	free(ctx);
159 }
160 /*----------------------------------------------------------------------------*/
161 static inline int
CreateConnection(thread_context_t ctx)162 CreateConnection(thread_context_t ctx)
163 {
164 	mctx_t mctx = ctx->mctx;
165 	struct mtcp_epoll_event ev;
166 	struct sockaddr_in addr;
167 	int sockid;
168 	int ret;
169 
170 	assert(mctx);
171 
172 	errno = 0;
173 	sockid = mtcp_socket(mctx, AF_INET, SOCK_STREAM, 0);
174 	if (sockid < 0) {
175 		TRACE_INFO("Failed to create socket! (%s)\n",
176 			   strerror(errno));
177 		return -1;
178 	}
179 	memset(&ctx->wvars[sockid], 0, sizeof(struct wget_vars));
180 	ret = mtcp_setsock_nonblock(mctx, sockid);
181 	if (ret < 0) {
182 		TRACE_ERROR("Failed to set socket in nonblocking mode.\n");
183 		exit(-1);
184 	}
185 
186 	addr.sin_family = AF_INET;
187 	addr.sin_addr.s_addr = daddr;
188 	addr.sin_port = dport;
189 
190 	ret = mtcp_connect(mctx, sockid,
191 			(struct sockaddr *)&addr, sizeof(struct sockaddr_in));
192 	if (ret < 0) {
193 		if (errno != EINPROGRESS) {
194 			perror("mtcp_connect");
195 			mtcp_close(mctx, sockid);
196 			return -1;
197 		}
198 	}
199 
200 	ctx->started++;
201 	ctx->pending++;
202 	ctx->stat.connects++;
203 
204 	ev.events = MOS_EPOLLOUT;
205 	ev.data.sock = sockid;
206 	mtcp_epoll_ctl(mctx, ctx->ep, MOS_EPOLL_CTL_ADD, sockid, &ev);
207 
208 	return sockid;
209 }
210 /*----------------------------------------------------------------------------*/
211 static inline void
CloseConnection(thread_context_t ctx,int sockid)212 CloseConnection(thread_context_t ctx, int sockid)
213 {
214 	mtcp_epoll_ctl(ctx->mctx, ctx->ep, MOS_EPOLL_CTL_DEL, sockid, NULL);
215 	mtcp_close(ctx->mctx, sockid);
216 	ctx->pending--;
217 	ctx->done++;
218 	assert(ctx->pending >= 0);
219 	while (/*ctx->pending*/ mtcp_get_connection_cnt(ctx->mctx) < concurrency && ctx->started < ctx->target) {
220 		if (CreateConnection(ctx) < 0) {
221 			done[ctx->core] = TRUE;
222 			break;
223 		}
224 	}
225 }
226 /*----------------------------------------------------------------------------*/
227 static inline int
SendHTTPRequest(thread_context_t ctx,int sockid,struct wget_vars * wv)228 SendHTTPRequest(thread_context_t ctx, int sockid, struct wget_vars *wv)
229 {
230 	char request[HTTP_HEADER_LEN];
231 	struct mtcp_epoll_event ev;
232 	int wr;
233 	int len;
234 
235 	wv->headerset = FALSE;
236 	wv->recv = 0;
237 	wv->header_len = wv->file_len = 0;
238 
239 	if (keep_alive) {
240 		snprintf(request, HTTP_HEADER_LEN, "GET %s HTTP/1.0\r\n"
241 			 "User-Agent: Wget/1.12 (linux-gnu)\r\n"
242 			 "Accept: */*\r\n"
243 			 "Host: %s\r\n"
244 			 "Connection: Keep-Alive\r\n\r\n",
245 			 path, host);
246 	} else {
247 		snprintf(request, HTTP_HEADER_LEN, "GET %s HTTP/1.0\r\n"
248 			 "User-Agent: Wget/1.12 (linux-gnu)\r\n"
249 			 "Accept: */*\r\n"
250 			 "Host: %s\r\n"
251 			 //			"Connection: Keep-Alive\r\n\r\n",
252 			 "Connection: Close\r\n\r\n",
253 			 path, host);
254 	}
255 
256 	len = strlen(request);
257 
258 	wr = mtcp_write(ctx->mctx, sockid, request, len);
259 	if (wr < len) {
260 		TRACE_ERROR("Socket %d: Sending HTTP request failed. "
261 				"try: %d, sent: %d\n", sockid, len, wr);
262 		CloseConnection(ctx, sockid);
263 	}
264 	ctx->stat.writes += wr;
265 	TRACE_APP("Socket %d HTTP Request of %d bytes. sent.\n", sockid, wr);
266 	wv->request_sent = TRUE;
267 
268 	ev.events = MOS_EPOLLIN;
269 	ev.data.sock = sockid;
270 	mtcp_epoll_ctl(ctx->mctx, ctx->ep, MOS_EPOLL_CTL_MOD, sockid, &ev);
271 
272 	gettimeofday(&wv->t_start, NULL);
273 
274 	char fname[MAX_FILE_LEN + 1];
275 	if (fio) {
276 		snprintf(fname, MAX_FILE_LEN, "%s.%d", outfile, flowcnt++);
277 		wv->fd = open(fname, O_WRONLY | O_CREAT | O_TRUNC, 0644);
278 		if (wv->fd < 0) {
279 			TRACE_APP("Failed to open file descriptor for %s\n", fname);
280 			exit(1);
281 		}
282 	}
283 
284 	return 0;
285 }
286 /*----------------------------------------------------------------------------*/
287 static inline int
DownloadNext(thread_context_t ctx,int sockid,struct wget_vars * wv)288 DownloadNext(thread_context_t ctx, int sockid, struct wget_vars *wv)
289 {
290 	mctx_t mctx = ctx->mctx;
291 	uint64_t tdiff;
292 	struct mtcp_epoll_event ev;
293 
294 	TRACE_APP("Socket %d File download complete!\n", sockid);
295 	gettimeofday(&wv->t_end, NULL);
296 
297 	ctx->stat.completes++;
298 
299 	if (wv->recv - wv->header_len != wv->file_len) {
300 		fprintf(stderr, "Response size mismatch! "
301 			"actual recved: %ld,  expected to recved: %ld\n",
302 			wv->recv-wv->header_len, wv->file_len);
303 	}
304 
305 	tdiff = (wv->t_end.tv_sec - wv->t_start.tv_sec) * 1000000 +
306 		(wv->t_end.tv_usec - wv->t_start.tv_usec);
307 	TRACE_APP("Socket %d Total received bytes: %lu (%luMB)\n",
308 		  sockid, wv->recv, wv->recv / 1000000);
309 	TRACE_APP("Socket %d Total spent time: %lu us\n", sockid, tdiff);
310 	if (tdiff > 0) {
311 		TRACE_APP("Socket %d Average bandwidth: %lf[MB/s]\n",
312 			  sockid, (double)wv->recv / tdiff);
313 	}
314 	ctx->stat.sum_resp_time += tdiff;
315 	if (tdiff > ctx->stat.max_resp_time)
316 		ctx->stat.max_resp_time = tdiff;
317 
318 	ctx->done++;
319 
320 	/* instead of closing the connection, download the next file */
321 	memset(&ctx->wvars[sockid], 0, sizeof(struct wget_vars));
322 
323 	ctx->started++;
324 
325 	ev.events = MOS_EPOLLOUT;
326 	ev.data.sock = sockid;
327 	mtcp_epoll_ctl(mctx, ctx->ep, MOS_EPOLL_CTL_MOD, sockid, &ev);
328 
329 	return 0;
330 }
331 /*----------------------------------------------------------------------------*/
332 static inline int
DownloadComplete(thread_context_t ctx,int sockid,struct wget_vars * wv)333 DownloadComplete(thread_context_t ctx, int sockid, struct wget_vars *wv)
334 {
335 #ifdef APP
336 	mctx_t mctx = ctx->mctx;
337 #endif
338 	uint64_t tdiff;
339 
340 	TRACE_APP("Socket %d File download complete!\n", sockid);
341 	gettimeofday(&wv->t_end, NULL);
342 	CloseConnection(ctx, sockid);
343 	ctx->stat.completes++;
344 
345 	if (wv->recv - wv->header_len != wv->file_len) {
346 		fprintf(stderr, "Response size mismatch! "
347 						"actual recved: %ld,  expected to recved: %ld\n",
348 						wv->recv-wv->header_len, wv->file_len);
349 	}
350 
351 	tdiff = (wv->t_end.tv_sec - wv->t_start.tv_sec) * 1000000 +
352 			(wv->t_end.tv_usec - wv->t_start.tv_usec);
353 	TRACE_APP("Socket %d Total received bytes: %lu (%luMB)\n",
354 			sockid, wv->recv, wv->recv / 1000000);
355 	TRACE_APP("Socket %d Total spent time: %lu us\n", sockid, tdiff);
356 	if (tdiff > 0) {
357 		TRACE_APP("Socket %d Average bandwidth: %lf[MB/s]\n",
358 				sockid, (double)wv->recv / tdiff);
359 	}
360 	ctx->stat.sum_resp_time += tdiff;
361 	if (tdiff > ctx->stat.max_resp_time)
362 		ctx->stat.max_resp_time = tdiff;
363 
364 	if (fio && wv->fd > 0)
365 		close(wv->fd);
366 
367 	return 0;
368 }
369 /*----------------------------------------------------------------------------*/
370 static inline int
HandleReadEvent(thread_context_t ctx,int sockid,struct wget_vars * wv)371 HandleReadEvent(thread_context_t ctx, int sockid, struct wget_vars *wv)
372 {
373 	mctx_t mctx = ctx->mctx;
374 	char buf[BUF_SIZE];
375 	char *pbuf;
376 	int rd, copy_len;
377 
378 	rd = 1;
379 	while (rd > 0) {
380 		rd = mtcp_read(mctx, sockid, buf, BUF_SIZE);
381 		if (rd <= 0)
382 			break;
383 		ctx->stat.reads += rd;
384 
385 		TRACE_APP("Socket %d: mtcp_read ret: %d, total_recv: %lu, "
386 				"header_set: %d, header_len: %u, file_len: %lu\n",
387 				sockid, rd, wv->recv + rd,
388 				wv->headerset, wv->header_len, wv->file_len);
389 
390 		pbuf = buf;
391 		if (!wv->headerset) {
392 			copy_len = MIN(rd, HTTP_HEADER_LEN - wv->resp_len);
393 			memcpy(wv->response + wv->resp_len, buf, copy_len);
394 			wv->resp_len += copy_len;
395 			wv->header_len = find_http_header(wv->response, wv->resp_len);
396 			if (wv->header_len > 0) {
397 				wv->response[wv->header_len] = '\0';
398 				wv->file_len = http_header_long_val(wv->response,
399 						CONTENT_LENGTH_HDR, sizeof(CONTENT_LENGTH_HDR) - 1);
400 				if (wv->file_len < 0) {
401 					/* failed to find for the Content-Length field */
402 					wv->recv += rd;
403 					rd = 0;
404 					CloseConnection(ctx, sockid);
405 					return 0;
406 				}
407 				TRACE_APP("Socket %d Parsed response header. "
408 						"Header length: %u, File length: %lu (%luMB)\n",
409 						sockid, wv->header_len,
410 						wv->file_len, wv->file_len / 1024 / 1024);
411 				wv->headerset = TRUE;
412 				wv->recv += (rd - (wv->resp_len - wv->header_len));
413 				rd = (wv->resp_len - wv->header_len);
414 
415 				pbuf += (rd - (wv->resp_len - wv->header_len));
416 
417 			} else {
418 				/* failed to parse response header */
419 				wv->recv += rd;
420 				rd = 0;
421 				ctx->stat.errors++;
422 				ctx->errors++;
423 				CloseConnection(ctx, sockid);
424 				return 0;
425 			}
426 		}
427 		wv->recv += rd;
428 
429 		if (fio && wv->fd > 0) {
430 			int wr = 0;
431 			while (wr < rd) {
432 				int _wr = write(wv->fd, pbuf + wr, rd - wr);
433 				assert (_wr == rd - wr);
434 				 if (_wr < 0) {
435 					 perror("write");
436 					 TRACE_ERROR("Failed to write.\n");
437 					 assert(0);
438 					 break;
439 				 }
440 				 wr += _wr;
441 				 wv->write += _wr;
442 			}
443 		}
444 
445 		if (wv->header_len && (wv->recv >= wv->header_len + wv->file_len)) {
446 			break;
447 		}
448 	}
449 
450 	if (rd > 0) {
451 		if (wv->header_len && (wv->recv >= wv->header_len + wv->file_len)) {
452 			TRACE_APP("Socket %d Done Write: "
453 					"header: %u file: %lu recv: %lu write: %lu\n",
454 					sockid, wv->header_len, wv->file_len,
455 					wv->recv - wv->header_len, wv->write);
456 			if (keep_alive && ctx->started < ctx->target)
457 				DownloadNext(ctx, sockid, wv);
458 			else
459 				DownloadComplete(ctx, sockid, wv);
460 
461 			return 0;
462 		}
463 
464 	} else if (rd == 0) {
465 		/* connection closed by remote host */
466 		TRACE_DBG("Socket %d connection closed with server.\n", sockid);
467 
468 		if (wv->header_len && (wv->recv >= wv->header_len + wv->file_len)) {
469 			DownloadComplete(ctx, sockid, wv);
470 		} else {
471 			ctx->stat.errors++;
472 			ctx->incompletes++;
473 			CloseConnection(ctx, sockid);
474 		}
475 
476 	} else if (rd < 0) {
477 		if (errno != EAGAIN) {
478 			TRACE_DBG("Socket %d: mtcp_read() error %s\n",
479 					sockid, strerror(errno));
480 			ctx->stat.errors++;
481 			ctx->errors++;
482 			CloseConnection(ctx, sockid);
483 		}
484 	}
485 
486 	return 0;
487 }
488 /*----------------------------------------------------------------------------*/
489 static void
PrintStats()490 PrintStats()
491 {
492 	struct wget_stat total = {0};
493 	struct wget_stat *st;
494 	uint64_t avg_resp_time;
495 	uint64_t total_resp_time = 0;
496 	int i;
497 
498 	for (i = 0; i < core_limit; i++) {
499 		st = g_stat[i];
500 		if (!st)
501 			continue;
502 		avg_resp_time = st->completes? st->sum_resp_time / st->completes : 0;
503 
504 		total.waits += st->waits;
505 		total.events += st->events;
506 		total.connects += st->connects;
507 		total.reads += st->reads;
508 		total.writes += st->writes;
509 		total.completes += st->completes;
510 		total_resp_time += avg_resp_time;
511 		if (st->max_resp_time > total.max_resp_time)
512 			total.max_resp_time = st->max_resp_time;
513 		total.errors += st->errors;
514 		total.timedout += st->timedout;
515 
516 		memset(st, 0, sizeof(struct wget_stat));
517 	}
518 	fprintf(stderr, "[ ALL ] connect: %7lu, read: %4lu MB, write: %4lu MB, "
519 			"completes: %7lu (resp_time avg: %4lu, max: %6lu us)\n",
520 			total.connects,
521 			total.reads / 1024 / 1024, total.writes / 1024 / 1024,
522 			total.completes, total_resp_time / core_limit, total.max_resp_time);
523 }
524 /*----------------------------------------------------------------------------*/
525 static void
GlbInitWget()526 GlbInitWget()
527 {
528 	struct mtcp_conf mcfg;
529 	char *url;
530 	int total_concurrency = 0;
531 	int flow_per_thread;
532 	int flow_remainder_cnt;
533 	int i;
534 
535 	num_cores = GetNumCPUs();
536 	core_limit = num_cores;
537 	concurrency = 100;
538 	total_flows = -1;
539 
540 	LoadConfig(CONFIG_FILE, g_conf, NUM_CONF_VAR);
541 
542 	url = g_conf[0].value;
543 	total_flows = atoi(g_conf[1].value);
544 	core_limit = atoi(g_conf[2].value);
545 	total_concurrency = atoi(g_conf[3].value);
546 	dest_port = atoi(g_conf[4].value);
547 	keep_alive = atoi(g_conf[5].value);
548 
549 	if ((strlen(url) == 0)
550 			|| (strlen(url) > MAX_URL_LEN)
551 			|| (total_flows <= 0)) {
552 		TRACE_INFO("Invalid configuration\n");
553 		exit(0);
554 	}
555 
556 	char* slash_p = strchr(url, '/');
557 	if (slash_p) {
558 		strncpy(host, url, slash_p - url);
559 		strncpy(path, strchr(url, '/'), MAX_URL_LEN);
560 	} else {
561 		strncpy(host, url, MAX_IP_STR_LEN);
562 		strncpy(path, "/", 1);
563 	}
564 
565 	daddr = inet_addr(host);
566 	dport = (dest_port == 0) ? htons(PORT_NUM) : htons(dest_port);
567 	saddr = INADDR_ANY;
568 
569 	if (total_flows < core_limit) {
570 		core_limit = total_flows;
571 	}
572 
573 	/* per-core concurrency = total_concurrency / # cores */
574 	if (total_concurrency > 0)
575 		concurrency = total_concurrency / core_limit;
576 
577 	/* set the max number of fds 3x larger than concurrency */
578 	max_fds = concurrency * 3;
579 
580 	TRACE_CONFIG("Application configuration:\n");
581 	TRACE_CONFIG("URL: %s\n", path);
582 	TRACE_CONFIG("# of total_flows: %d\n", total_flows);
583 	TRACE_CONFIG("# of cores: %d\n", core_limit);
584 	TRACE_CONFIG("Concurrency: %d\n", total_concurrency);
585 
586 	mtcp_getconf(&mcfg);
587 	mcfg.max_concurrency = max_fds;
588 	mcfg.max_num_buffers = max_fds;
589 	mtcp_setconf(&mcfg);
590 
591 	flow_per_thread = total_flows / core_limit;
592 	flow_remainder_cnt = total_flows % core_limit;
593 
594 	for (i = 0; i < MAX_CPUS; i++) {
595 		done[i] = FALSE;
596 		flows[i] = flow_per_thread;
597 
598 		if (flow_remainder_cnt-- > 0)
599 			flows[i]++;
600 	}
601 
602 	return;
603 }
604 /*----------------------------------------------------------------------------*/
605 static void
InitWget(mctx_t mctx,void ** app_ctx)606 InitWget(mctx_t mctx, void **app_ctx)
607 {
608 	thread_context_t ctx;
609 	int ep;
610 
611 	assert(mctx);
612 
613 	int core = mctx->cpu;
614 
615 	ctx = CreateContext(mctx);
616 	if (!ctx)
617 		exit(-1);
618 	g_ctx[core] = ctx;
619 	g_stat[core] = &ctx->stat;
620 	srand(time(NULL));
621 
622 	mtcp_init_rss(mctx, saddr, IP_RANGE, daddr, dport);
623 
624 	if (flows[core] == 0) {
625 		TRACE_DBG("Application thread %d finished.\n", core);
626 		exit(-1);
627 	}
628 	ctx->target = flows[core];
629 
630 	/* Initialization */
631 	ctx->maxevents = max_fds * 3;
632 	ep = mtcp_epoll_create(mctx, ctx->maxevents);
633 	if (ep < 0) {
634 		TRACE_ERROR("Failed to create epoll struct!n");
635 		exit(EXIT_FAILURE);
636 	}
637 	ctx->events = (struct mtcp_epoll_event *)
638 			calloc(ctx->maxevents, sizeof(struct mtcp_epoll_event));
639 	if (!ctx->events) {
640 		TRACE_ERROR("Failed to allocate events!\n");
641 		exit(EXIT_FAILURE);
642 	}
643 	ctx->ep = ep;
644 
645 	ctx->wvars = (struct wget_vars *)calloc(max_fds, sizeof(struct wget_vars));
646 	if (!ctx->wvars) {
647 		TRACE_ERROR("Failed to create wget variables!\n");
648 		exit(EXIT_FAILURE);
649 	}
650 
651 	ctx->started = ctx->done = ctx->pending = 0;
652 	ctx->errors = ctx->incompletes = 0;
653 
654 	*app_ctx = ctx;
655 
656 	return;
657 }
658 /*----------------------------------------------------------------------------*/
659 static void
RunWget(mctx_t mctx,void ** app_ctx)660 RunWget(mctx_t mctx, void **app_ctx)
661 {
662 	struct in_addr daddr_in;
663 	struct timeval cur_tv, prev_tv;
664 	int nevents;
665 	int i;
666 
667 	assert(mctx);
668 	assert(*app_ctx);
669 
670 	thread_context_t ctx = *app_ctx;
671 	int core = ctx->core;
672 
673 	daddr_in.s_addr = daddr;
674 	fprintf(stderr, "Thread %d handles %d flows. connecting to %s:%u\n",
675 			core, flows[core], inet_ntoa(daddr_in), ntohs(dport));
676 
677 	gettimeofday(&cur_tv, NULL);
678 	prev_tv = cur_tv;
679 
680 	while (!done[core]) {
681 		gettimeofday(&cur_tv, NULL);
682 
683 		/* print statistics every second */
684 		if (core == 0 && cur_tv.tv_sec > prev_tv.tv_sec) {
685 			PrintStats();
686 			prev_tv = cur_tv;
687 		}
688 
689 		while (/*ctx->pending*/ mtcp_get_connection_cnt(ctx->mctx) < concurrency && ctx->started < ctx->target) {
690 			if (CreateConnection(ctx) < 0) {
691 				done[core] = TRUE;
692 				break;
693 			}
694 		}
695 
696 		nevents = mtcp_epoll_wait(mctx, ctx->ep,
697 				ctx->events, ctx->maxevents, ctx->pending ? -1 : 10);
698 		ctx->stat.waits++;
699 
700 		if (nevents < 0) {
701 			if (errno != EINTR) {
702 				TRACE_ERROR("mtcp_epoll_wait failed! ret: %d\n", nevents);
703 			}
704 			done[core] = TRUE;
705 			break;
706 		} else {
707 			ctx->stat.events += nevents;
708 		}
709 
710 		for (i = 0; i < nevents; i++) {
711 
712 			if (ctx->events[i].events & MOS_EPOLLERR) {
713 				int err;
714 				socklen_t len = sizeof(err);
715 
716 				TRACE_APP("[CPU %d] Error on socket %d\n",
717 						core, ctx->events[i].data.sockid);
718 				ctx->stat.errors++;
719 				ctx->errors++;
720 				if (mtcp_getsockopt(mctx, ctx->events[i].data.sock,
721 							SOL_SOCKET, SO_ERROR, (void *)&err, &len) == 0) {
722 					if (err == ETIMEDOUT)
723 						ctx->stat.timedout++;
724 				}
725 				CloseConnection(ctx, ctx->events[i].data.sock);
726 
727 			} else if (ctx->events[i].events & MOS_EPOLLIN) {
728 				HandleReadEvent(ctx,
729 						ctx->events[i].data.sock,
730 						&ctx->wvars[ctx->events[i].data.sock]);
731 
732 			} else if (ctx->events[i].events == MOS_EPOLLOUT) {
733 				struct wget_vars *wv = &ctx->wvars[ctx->events[i].data.sock];
734 
735 				if (!wv->request_sent) {
736 					SendHTTPRequest(ctx, ctx->events[i].data.sock, wv);
737 				} else {
738 					//TRACE_DBG("Request already sent.\n");
739 				}
740 
741 			} else {
742 				TRACE_ERROR("Socket %d: event: %s\n",
743 						ctx->events[i].data.sock,
744 						EventToString(ctx->events[i].events));
745 				assert(0);
746 			}
747 		}
748 
749 		if (ctx->done >= ctx->target) {
750 			fprintf(stdout, "Completed %d connections, "
751 					"errors: %d incompletes: %d\n",
752 					ctx->done, ctx->errors, ctx->incompletes);
753 			break;
754 		}
755 	}
756 
757 	TRACE_INFO("Wget thread %d waiting for mtcp to be destroyed.\n", core);
758 
759 	g_stat[core] = NULL;
760 	g_ctx[core] = NULL;
761 	DestroyContext(ctx);
762 
763 	return;
764 }
765 /*----------------------------------------------------------------------------*/
766 void
RunApplication(mctx_t mctx)767 RunApplication(mctx_t mctx)
768 {
769 	void *app_ctx;
770 
771 	app_ctx = (void *)calloc(1, sizeof(void *));
772 	if (!app_ctx) {
773 		TRACE_ERROR("calloc failure\n");
774 		return;
775 	}
776 
777 	TRACE_INFO("run application on core %d\n", mctx->cpu);
778 	InitWget(mctx, &(app_ctx));
779 	RunWget(mctx, &(app_ctx));
780 }
781 /*----------------------------------------------------------------------------*/
782 void *
RunMTCP(void * arg)783 RunMTCP(void *arg)
784 {
785 	int core = *(int *)arg;
786 	mctx_t mctx;
787 
788 	mtcp_core_affinitize(core);
789 
790 	/* mTCP Initialization */
791 	mctx = mtcp_create_context(core);
792 	if (!mctx) {
793 		pthread_exit(NULL);
794 		TRACE_ERROR("Failed to craete mtcp context.\n");
795 		return NULL;
796 	}
797 
798 	/* Run application here */
799 	RunApplication(mctx);
800 
801 	/* mTCP Tear Down */
802 	mtcp_destroy_context(mctx);
803 
804 	return NULL;
805 }
806 /*----------------------------------------------------------------------------*/
807 int
main(int argc,char ** argv)808 main(int argc, char **argv)
809 {
810 	int ret;
811 	int current_core = -1;
812 	char *fname = "config/mos.conf";
813 
814 	int opt;
815 	while ((opt = getopt(argc, argv, "f:c:")) != -1) {
816 		switch (opt) {
817 		case 'f':
818 			fname = optarg;
819 			break;
820 		case 'c':
821 			current_core = atoi(optarg);
822 			if (current_core >= MAX_CPUS) {
823 				TRACE_ERROR("Invalid core id!\n");
824 				exit(EXIT_FAILURE);
825 			}
826 			break;
827 		default:
828 			printf("Usage: %s [-f config_file]\n", argv[0]);
829 			return 0;
830 		}
831 
832 	}
833 
834 	core_limit = sysconf(_SC_NPROCESSORS_ONLN);
835 
836 	ret = mtcp_init(fname);
837 	if (ret) {
838 		TRACE_ERROR("Failed to initialize mtcp.\n");
839 		exit(EXIT_FAILURE);
840 	}
841 
842 	mtcp_getconf(&g_mcfg);
843 
844 	core_limit = g_mcfg.num_cores;
845 
846 	GlbInitWget();
847 
848 	RunMTCP(&current_core);
849 
850 	mtcp_destroy();
851 	return 0;
852 }
853 /*----------------------------------------------------------------------------*/
854