1 #include <stdio.h>
2 #include <stdlib.h>
3 #include <string.h>
4 #include <unistd.h>
5 #include <stdint.h>
6 #include <time.h>
7 #include <sys/time.h>
8 #include <sys/types.h>
9 #include <sys/stat.h>
10 #include <fcntl.h>
11 #include <pthread.h>
12 #include <signal.h>
13 #include <sys/socket.h>
14 #include <netinet/in.h>
15 #include <arpa/inet.h>
16 #include <sys/queue.h>
17 #include <assert.h>
18 
19 #include <mos_api.h>
20 #include "cpu.h"
21 #include "rss.h"
22 #include "http_parsing.h"
23 #include "debug.h"
24 #include "applib.h"
25 
/* Path of the application configuration file handed to LoadConfig(). */
#define CONFIG_FILE       "config/epwget.conf"
/*
 * Configuration keys looked up in CONFIG_FILE. GlbInitWget() reads the
 * resulting values by array index (g_conf[0] .. g_conf[5]), so the order of
 * the entries below must not change without updating that function.
 */
static struct conf_var g_conf[] = {
	{ "url",               {0} },
	{ "total_flows",       {0} },
	{ "core_limit",        {0} },
	{ "total_concurrency", {0} },
	{ "dest_port",         {0} },
	{ "keep_alive",        {0} },
};
/* Number of entries in g_conf. */
#define NUM_CONF_VAR (sizeof(g_conf) / sizeof(struct conf_var))
36 
/* Destination port used when the configured dest_port is 0. */
#define PORT_NUM 80
//#define PORT_NUM 3333

/* Maximum accepted length of the configured URL (and of the request path). */
#define MAX_URL_LEN 128
/* Maximum length of an output file name. */
#define MAX_FILE_LEN 128
/* Size of the HTTP request buffer and of the per-connection response-header buffer. */
#define HTTP_HEADER_LEN 1024

/* IP range argument passed to mtcp_init_rss(). */
#define IP_RANGE 1
/* Maximum length of the host string taken from the URL. */
#define MAX_IP_STR_LEN 16

/* Size of the stack buffer used for each mtcp_read() call. */
#define BUF_SIZE (8*1024)
48 
49 /*----------------------------------------------------------------------------*/
/* mTCP configuration snapshot filled by mtcp_getconf() in main(). */
struct mtcp_conf g_mcfg;
/* Thread handles, indexed by core number. */
static pthread_t mtcp_thread[MAX_CPUS];
/*----------------------------------------------------------------------------*/
/* mTCP context of each core, recorded by CreateContext(). */
static mctx_t g_mctx[MAX_CPUS];
/* Per-core flag; the RunWget() loop stops once its entry becomes TRUE. */
static int done[MAX_CPUS];
/*----------------------------------------------------------------------------*/
/* Number of CPUs reported by GetNumCPUs(). */
static int num_cores;
/* Number of cores (threads) the application uses. */
static int core_limit;
/*----------------------------------------------------------------------------*/
/* When TRUE, response bodies are written to files named "<outfile>.<n>". */
static int fio = FALSE;
/* File name prefix used when fio is enabled. */
static char outfile[MAX_FILE_LEN + 1];
/*----------------------------------------------------------------------------*/
/* Host part of the configured URL (text before the first '/'). */
static char host[MAX_IP_STR_LEN + 1];
/* Request path of the configured URL ("/" when the URL has no path). */
static char path[MAX_URL_LEN + 1];
/* Destination address produced by inet_addr(host). */
static in_addr_t daddr;
/* Destination port, stored after htons(). */
static in_port_t dport;
/* Source address; set to INADDR_ANY in GlbInitWget(). */
static in_addr_t saddr;
/*----------------------------------------------------------------------------*/
/* Total number of flows requested by the configuration. */
static int total_flows;
/* Number of flows assigned to each core. */
static int flows[MAX_CPUS];
/* Running counter used as the numeric suffix of output file names. */
static int flowcnt = 0;
/* Per-core connection limit. */
static int concurrency;
/* Size of the per-thread wget_vars table (3x concurrency). */
static int max_fds;
/* Destination port from the configuration, host byte order (0 = use PORT_NUM). */
static uint16_t dest_port;
/* When non-zero, connections are reused for further requests. */
static int keep_alive = FALSE;
75 /*----------------------------------------------------------------------------*/
/* Per-thread counters, aggregated and reset by PrintStats(). */
struct wget_stat
{
	uint64_t waits;		/* number of mtcp_epoll_wait() calls */
	uint64_t events;	/* number of events returned by mtcp_epoll_wait() */
	uint64_t connects;	/* number of connections started */
	uint64_t reads;		/* bytes returned by mtcp_read() */
	uint64_t writes;	/* bytes accepted by mtcp_write() */
	uint64_t completes;	/* number of completed downloads */

	uint64_t errors;	/* number of failed connections */
	uint64_t timedout;	/* errors whose SO_ERROR was ETIMEDOUT */

	uint64_t sum_resp_time;	/* sum of response times, in microseconds */
	uint64_t max_resp_time;	/* largest response time, in microseconds */
};
91 /*----------------------------------------------------------------------------*/
/* State owned by one application thread (one per core). */
struct thread_context
{
	int core;		/* core number, copied from mctx->cpu */

	mctx_t mctx;		/* mTCP context of this thread */
	int ep;			/* epoll descriptor created in InitWget() */
	struct wget_vars *wvars;	/* per-socket download state, indexed by socket id */

	int target;		/* number of flows this thread has to run */
	int started;		/* flows started so far */
	int errors;		/* flows that ended with an error */
	int incompletes;	/* flows closed by the peer before completion */
	int done;		/* flows finished (successfully or not) */
	int pending;		/* connections currently open */

	int maxevents;		/* capacity of the events array */
	struct mtcp_epoll_event *events;	/* buffer filled by mtcp_epoll_wait() */

	struct wget_stat stat;	/* counters published through g_stat[] */
};
typedef struct thread_context* thread_context_t;
113 /*----------------------------------------------------------------------------*/
/* Download state of one socket; the slot is zeroed whenever it is (re)used. */
struct wget_vars
{
	int request_sent;	/* TRUE once the HTTP request has been written */

	char response[HTTP_HEADER_LEN];	/* copy of the start of the response, used for header parsing */
	int resp_len;		/* number of bytes stored in response */
	int headerset;		/* TRUE once the response header has been parsed */
	uint32_t header_len;	/* length of the response header in bytes */
	uint64_t file_len;	/* Content-Length value taken from the header */
	uint64_t recv;		/* bytes received so far, header included */
	uint64_t write;		/* bytes written to the output file */

	struct timeval t_start;	/* taken right after the request is sent */
	struct timeval t_end;	/* taken when the download completes */

	int fd;			/* output file descriptor (only used when fio is set) */
};
131 /*----------------------------------------------------------------------------*/
/* Per-core thread contexts, registered in InitWget() and cleared in RunWget(). */
static struct thread_context *g_ctx[MAX_CPUS] = {0};
/* Per-core pointers to the statistics read by PrintStats(); NULL when absent. */
static struct wget_stat *g_stat[MAX_CPUS] = {0};
134 /*----------------------------------------------------------------------------*/
135 static thread_context_t
CreateContext(mctx_t mctx)136 CreateContext(mctx_t mctx)
137 {
138 	thread_context_t ctx;
139 
140 	ctx = (thread_context_t)calloc(1, sizeof(struct thread_context));
141 	if (!ctx) {
142 		perror("malloc");
143 		TRACE_ERROR("Failed to allocate memory for thread context.\n");
144 		return NULL;
145 	}
146 
147 	ctx->mctx = mctx;
148 	ctx->core = mctx->cpu;
149 
150 	g_mctx[ctx->core] = mctx;
151 
152 	return ctx;
153 }
154 /*----------------------------------------------------------------------------*/
155 static void
DestroyContext(thread_context_t ctx)156 DestroyContext(thread_context_t ctx)
157 {
158 	g_stat[ctx->core] = NULL;
159 	free(ctx);
160 }
161 /*----------------------------------------------------------------------------*/
162 static inline int
CreateConnection(thread_context_t ctx)163 CreateConnection(thread_context_t ctx)
164 {
165 	mctx_t mctx = ctx->mctx;
166 	struct mtcp_epoll_event ev;
167 	struct sockaddr_in addr;
168 	int sockid;
169 	int ret;
170 
171 	assert(mctx);
172 
173 	errno = 0;
174 	sockid = mtcp_socket(mctx, AF_INET, SOCK_STREAM, 0);
175 	if (sockid < 0) {
176 		TRACE_INFO("Failed to create socket! (%s)\n",
177 			   strerror(errno));
178 		return -1;
179 	}
180 	memset(&ctx->wvars[sockid], 0, sizeof(struct wget_vars));
181 	ret = mtcp_setsock_nonblock(mctx, sockid);
182 	if (ret < 0) {
183 		TRACE_ERROR("Failed to set socket in nonblocking mode.\n");
184 		exit(-1);
185 	}
186 
187 	addr.sin_family = AF_INET;
188 	addr.sin_addr.s_addr = daddr;
189 	addr.sin_port = dport;
190 
191 	ret = mtcp_connect(mctx, sockid,
192 			(struct sockaddr *)&addr, sizeof(struct sockaddr_in));
193 	if (ret < 0) {
194 		if (errno != EINPROGRESS) {
195 			perror("mtcp_connect");
196 			mtcp_close(mctx, sockid);
197 			return -1;
198 		}
199 	}
200 
201 	ctx->started++;
202 	ctx->pending++;
203 	ctx->stat.connects++;
204 
205 	ev.events = MOS_EPOLLOUT;
206 	ev.data.sock = sockid;
207 	mtcp_epoll_ctl(mctx, ctx->ep, MOS_EPOLL_CTL_ADD, sockid, &ev);
208 
209 	return sockid;
210 }
211 /*----------------------------------------------------------------------------*/
212 static inline void
CloseConnection(thread_context_t ctx,int sockid)213 CloseConnection(thread_context_t ctx, int sockid)
214 {
215 	mtcp_epoll_ctl(ctx->mctx, ctx->ep, MOS_EPOLL_CTL_DEL, sockid, NULL);
216 	mtcp_close(ctx->mctx, sockid);
217 	ctx->pending--;
218 	ctx->done++;
219 	assert(ctx->pending >= 0);
220 	while (/*ctx->pending*/ mtcp_get_connection_cnt(ctx->mctx) < concurrency && ctx->started < ctx->target) {
221 		if (CreateConnection(ctx) < 0) {
222 			done[ctx->core] = TRUE;
223 			break;
224 		}
225 	}
226 }
227 /*----------------------------------------------------------------------------*/
228 static inline int
SendHTTPRequest(thread_context_t ctx,int sockid,struct wget_vars * wv)229 SendHTTPRequest(thread_context_t ctx, int sockid, struct wget_vars *wv)
230 {
231 	char request[HTTP_HEADER_LEN];
232 	struct mtcp_epoll_event ev;
233 	int wr;
234 	int len;
235 
236 	wv->headerset = FALSE;
237 	wv->recv = 0;
238 	wv->header_len = wv->file_len = 0;
239 
240 	if (keep_alive) {
241 		snprintf(request, HTTP_HEADER_LEN, "GET %s HTTP/1.0\r\n"
242 			 "User-Agent: Wget/1.12 (linux-gnu)\r\n"
243 			 "Accept: */*\r\n"
244 			 "Host: %s\r\n"
245 			 "Connection: Keep-Alive\r\n\r\n",
246 			 path, host);
247 	} else {
248 		snprintf(request, HTTP_HEADER_LEN, "GET %s HTTP/1.0\r\n"
249 			 "User-Agent: Wget/1.12 (linux-gnu)\r\n"
250 			 "Accept: */*\r\n"
251 			 "Host: %s\r\n"
252 			 //			"Connection: Keep-Alive\r\n\r\n",
253 			 "Connection: Close\r\n\r\n",
254 			 path, host);
255 	}
256 
257 	len = strlen(request);
258 
259 	wr = mtcp_write(ctx->mctx, sockid, request, len);
260 	if (wr < len) {
261 		TRACE_ERROR("Socket %d: Sending HTTP request failed. "
262 				"try: %d, sent: %d\n", sockid, len, wr);
263 		CloseConnection(ctx, sockid);
264 	}
265 	ctx->stat.writes += wr;
266 	TRACE_APP("Socket %d HTTP Request of %d bytes. sent.\n", sockid, wr);
267 	wv->request_sent = TRUE;
268 
269 	ev.events = MOS_EPOLLIN;
270 	ev.data.sock = sockid;
271 	mtcp_epoll_ctl(ctx->mctx, ctx->ep, MOS_EPOLL_CTL_MOD, sockid, &ev);
272 
273 	gettimeofday(&wv->t_start, NULL);
274 
275 	char fname[MAX_FILE_LEN + 1];
276 	if (fio) {
277 		snprintf(fname, MAX_FILE_LEN, "%s.%d", outfile, flowcnt++);
278 		wv->fd = open(fname, O_WRONLY | O_CREAT | O_TRUNC, 0644);
279 		if (wv->fd < 0) {
280 			TRACE_APP("Failed to open file descriptor for %s\n", fname);
281 			exit(1);
282 		}
283 	}
284 
285 	return 0;
286 }
287 /*----------------------------------------------------------------------------*/
288 static inline int
DownloadNext(thread_context_t ctx,int sockid,struct wget_vars * wv)289 DownloadNext(thread_context_t ctx, int sockid, struct wget_vars *wv)
290 {
291 	mctx_t mctx = ctx->mctx;
292 	uint64_t tdiff;
293 	struct mtcp_epoll_event ev;
294 
295 	TRACE_APP("Socket %d File download complete!\n", sockid);
296 	gettimeofday(&wv->t_end, NULL);
297 
298 	ctx->stat.completes++;
299 
300 	if (wv->recv - wv->header_len != wv->file_len) {
301 		fprintf(stderr, "Response size mismatch! "
302 			"actual recved: %ld,  expected to recved: %ld\n",
303 			wv->recv-wv->header_len, wv->file_len);
304 	}
305 
306 	tdiff = (wv->t_end.tv_sec - wv->t_start.tv_sec) * 1000000 +
307 		(wv->t_end.tv_usec - wv->t_start.tv_usec);
308 	TRACE_APP("Socket %d Total received bytes: %lu (%luMB)\n",
309 		  sockid, wv->recv, wv->recv / 1000000);
310 	TRACE_APP("Socket %d Total spent time: %lu us\n", sockid, tdiff);
311 	if (tdiff > 0) {
312 		TRACE_APP("Socket %d Average bandwidth: %lf[MB/s]\n",
313 			  sockid, (double)wv->recv / tdiff);
314 	}
315 	ctx->stat.sum_resp_time += tdiff;
316 	if (tdiff > ctx->stat.max_resp_time)
317 		ctx->stat.max_resp_time = tdiff;
318 
319 	ctx->done++;
320 
321 	/* instead of closing the connection, download the next file */
322 	memset(&ctx->wvars[sockid], 0, sizeof(struct wget_vars));
323 
324 	ctx->started++;
325 
326 	ev.events = MOS_EPOLLOUT;
327 	ev.data.sock = sockid;
328 	mtcp_epoll_ctl(mctx, ctx->ep, MOS_EPOLL_CTL_MOD, sockid, &ev);
329 
330 	return 0;
331 }
332 /*----------------------------------------------------------------------------*/
333 static inline int
DownloadComplete(thread_context_t ctx,int sockid,struct wget_vars * wv)334 DownloadComplete(thread_context_t ctx, int sockid, struct wget_vars *wv)
335 {
336 #ifdef APP
337 	mctx_t mctx = ctx->mctx;
338 #endif
339 	uint64_t tdiff;
340 
341 	TRACE_APP("Socket %d File download complete!\n", sockid);
342 	gettimeofday(&wv->t_end, NULL);
343 	CloseConnection(ctx, sockid);
344 	ctx->stat.completes++;
345 
346 	if (wv->recv - wv->header_len != wv->file_len) {
347 		fprintf(stderr, "Response size mismatch! "
348 						"actual recved: %ld,  expected to recved: %ld\n",
349 						wv->recv-wv->header_len, wv->file_len);
350 	}
351 
352 	tdiff = (wv->t_end.tv_sec - wv->t_start.tv_sec) * 1000000 +
353 			(wv->t_end.tv_usec - wv->t_start.tv_usec);
354 	TRACE_APP("Socket %d Total received bytes: %lu (%luMB)\n",
355 			sockid, wv->recv, wv->recv / 1000000);
356 	TRACE_APP("Socket %d Total spent time: %lu us\n", sockid, tdiff);
357 	if (tdiff > 0) {
358 		TRACE_APP("Socket %d Average bandwidth: %lf[MB/s]\n",
359 				sockid, (double)wv->recv / tdiff);
360 	}
361 	ctx->stat.sum_resp_time += tdiff;
362 	if (tdiff > ctx->stat.max_resp_time)
363 		ctx->stat.max_resp_time = tdiff;
364 
365 	if (fio && wv->fd > 0)
366 		close(wv->fd);
367 
368 	return 0;
369 }
370 /*----------------------------------------------------------------------------*/
371 static inline int
HandleReadEvent(thread_context_t ctx,int sockid,struct wget_vars * wv)372 HandleReadEvent(thread_context_t ctx, int sockid, struct wget_vars *wv)
373 {
374 	mctx_t mctx = ctx->mctx;
375 	char buf[BUF_SIZE];
376 	char *pbuf;
377 	int rd, copy_len;
378 
379 	rd = 1;
380 	while (rd > 0) {
381 		rd = mtcp_read(mctx, sockid, buf, BUF_SIZE);
382 		if (rd <= 0)
383 			break;
384 		ctx->stat.reads += rd;
385 
386 		TRACE_APP("Socket %d: mtcp_read ret: %d, total_recv: %lu, "
387 				"header_set: %d, header_len: %u, file_len: %lu\n",
388 				sockid, rd, wv->recv + rd,
389 				wv->headerset, wv->header_len, wv->file_len);
390 
391 		pbuf = buf;
392 		if (!wv->headerset) {
393 			copy_len = MIN(rd, HTTP_HEADER_LEN - wv->resp_len);
394 			memcpy(wv->response + wv->resp_len, buf, copy_len);
395 			wv->resp_len += copy_len;
396 			wv->header_len = find_http_header(wv->response, wv->resp_len);
397 			if (wv->header_len > 0) {
398 				wv->response[wv->header_len] = '\0';
399 				wv->file_len = http_header_long_val(wv->response,
400 						CONTENT_LENGTH_HDR, sizeof(CONTENT_LENGTH_HDR) - 1);
401 				if (wv->file_len < 0) {
402 					/* failed to find for the Content-Length field */
403 					wv->recv += rd;
404 					rd = 0;
405 					CloseConnection(ctx, sockid);
406 					return 0;
407 				}
408 				TRACE_APP("Socket %d Parsed response header. "
409 						"Header length: %u, File length: %lu (%luMB)\n",
410 						sockid, wv->header_len,
411 						wv->file_len, wv->file_len / 1024 / 1024);
412 				wv->headerset = TRUE;
413 				wv->recv += (rd - (wv->resp_len - wv->header_len));
414 
415 				pbuf += (rd - (wv->resp_len - wv->header_len));
416 				rd = (wv->resp_len - wv->header_len);
417 
418 			} else {
419 				/* failed to parse response header */
420 				wv->recv += rd;
421 				rd = 0;
422 				ctx->stat.errors++;
423 				ctx->errors++;
424 				CloseConnection(ctx, sockid);
425 				return 0;
426 			}
427 		}
428 		wv->recv += rd;
429 
430 		if (fio && wv->fd > 0) {
431 			int wr = 0;
432 			while (wr < rd) {
433 				int _wr = write(wv->fd, pbuf + wr, rd - wr);
434 				assert (_wr == rd - wr);
435 				 if (_wr < 0) {
436 					 perror("write");
437 					 TRACE_ERROR("Failed to write.\n");
438 					 assert(0);
439 					 break;
440 				 }
441 				 wr += _wr;
442 				 wv->write += _wr;
443 			}
444 		}
445 
446 		if (wv->header_len && (wv->recv >= wv->header_len + wv->file_len)) {
447 			break;
448 		}
449 	}
450 
451 	if (rd > 0) {
452 		if (wv->header_len && (wv->recv >= wv->header_len + wv->file_len)) {
453 			TRACE_APP("Socket %d Done Write: "
454 					"header: %u file: %lu recv: %lu write: %lu\n",
455 					sockid, wv->header_len, wv->file_len,
456 					wv->recv - wv->header_len, wv->write);
457 
458 			if (keep_alive && ctx->started < ctx->target)
459 				DownloadNext(ctx, sockid, wv);
460 			else
461 				DownloadComplete(ctx, sockid, wv);
462 
463 			return 0;
464 		}
465 
466 	} else if (rd == 0) {
467 		/* connection closed by remote host */
468 		TRACE_DBG("Socket %d connection closed with server.\n", sockid);
469 
470 		if (wv->header_len && (wv->recv >= wv->header_len + wv->file_len)) {
471 			DownloadComplete(ctx, sockid, wv);
472 		} else {
473 			ctx->stat.errors++;
474 			ctx->incompletes++;
475 			CloseConnection(ctx, sockid);
476 		}
477 
478 	} else if (rd < 0) {
479 		if (errno != EAGAIN) {
480 			TRACE_DBG("Socket %d: mtcp_read() error %s\n",
481 					sockid, strerror(errno));
482 			ctx->stat.errors++;
483 			ctx->errors++;
484 			CloseConnection(ctx, sockid);
485 		}
486 	}
487 
488 	return 0;
489 }
490 /*----------------------------------------------------------------------------*/
491 static void
PrintStats()492 PrintStats()
493 {
494 	struct wget_stat total = {0};
495 	struct wget_stat *st;
496 	uint64_t avg_resp_time;
497 	uint64_t total_resp_time = 0;
498 	int i;
499 
500 	for (i = 0; i < core_limit; i++) {
501 		st = g_stat[i];
502 		if (!st)
503 			continue;
504 		avg_resp_time = st->completes? st->sum_resp_time / st->completes : 0;
505 
506 		total.waits += st->waits;
507 		total.events += st->events;
508 		total.connects += st->connects;
509 		total.reads += st->reads;
510 		total.writes += st->writes;
511 		total.completes += st->completes;
512 		total_resp_time += avg_resp_time;
513 		if (st->max_resp_time > total.max_resp_time)
514 			total.max_resp_time = st->max_resp_time;
515 		total.errors += st->errors;
516 		total.timedout += st->timedout;
517 
518 		memset(st, 0, sizeof(struct wget_stat));
519 	}
520 	fprintf(stderr, "[ ALL ] connect: %7lu, read: %4lu MB, write: %4lu MB, "
521 			"completes: %7lu (resp_time avg: %4lu, max: %6lu us)\n",
522 			total.connects,
523 			total.reads / 1024 / 1024, total.writes / 1024 / 1024,
524 			total.completes, total_resp_time / core_limit, total.max_resp_time);
525 }
526 /*----------------------------------------------------------------------------*/
527 static void
GlbInitWget()528 GlbInitWget()
529 {
530 	struct mtcp_conf mcfg;
531 	char *url;
532 	int total_concurrency = 0;
533 	int flow_per_thread;
534 	int flow_remainder_cnt;
535 	int i;
536 
537 	num_cores = GetNumCPUs();
538 	core_limit = num_cores;
539 	concurrency = 100;
540 	total_flows = -1;
541 
542 	LoadConfig(CONFIG_FILE, g_conf, NUM_CONF_VAR);
543 
544 	url = g_conf[0].value;
545 	total_flows = atoi(g_conf[1].value);
546 	core_limit = atoi(g_conf[2].value);
547 	total_concurrency = atoi(g_conf[3].value);
548 	dest_port = atoi(g_conf[4].value);
549 	keep_alive = atoi(g_conf[5].value);
550 
551 	if ((strlen(url) == 0)
552 			|| (strlen(url) > MAX_URL_LEN)
553 			|| (total_flows <= 0)) {
554 		TRACE_INFO("Invalid configuration\n");
555 		exit(0);
556 	}
557 
558 	char* slash_p = strchr(url, '/');
559 	if (slash_p) {
560 		strncpy(host, url, slash_p - url);
561 		strncpy(path, strchr(url, '/'), MAX_URL_LEN);
562 	} else {
563 		strncpy(host, url, MAX_IP_STR_LEN);
564 		strncpy(path, "/", 1);
565 	}
566 
567 	daddr = inet_addr(host);
568 	dport = (dest_port == 0) ? htons(PORT_NUM) : htons(dest_port);
569 	saddr = INADDR_ANY;
570 
571 	if (total_flows < core_limit) {
572 		core_limit = total_flows;
573 	}
574 
575 	/* per-core concurrency = total_concurrency / # cores */
576 	if (total_concurrency > 0)
577 		concurrency = total_concurrency / core_limit;
578 
579 	/* set the max number of fds 3x larger than concurrency */
580 	max_fds = concurrency * 3;
581 
582 	TRACE_CONFIG("Application configuration:\n");
583 	TRACE_CONFIG("URL: %s\n", path);
584 	TRACE_CONFIG("# of total_flows: %d\n", total_flows);
585 	TRACE_CONFIG("# of cores: %d\n", core_limit);
586 	TRACE_CONFIG("Concurrency: %d\n", total_concurrency);
587 
588 	mtcp_getconf(&mcfg);
589 	mcfg.max_concurrency = max_fds;
590 	mcfg.max_num_buffers = max_fds;
591 	mtcp_setconf(&mcfg);
592 
593 	flow_per_thread = total_flows / core_limit;
594 	flow_remainder_cnt = total_flows % core_limit;
595 
596 	for (i = 0; i < MAX_CPUS; i++) {
597 		done[i] = FALSE;
598 		flows[i] = flow_per_thread;
599 
600 		if (flow_remainder_cnt-- > 0)
601 			flows[i]++;
602 	}
603 
604 	return;
605 }
606 /*----------------------------------------------------------------------------*/
607 static void
InitWget(mctx_t mctx,void ** app_ctx)608 InitWget(mctx_t mctx, void **app_ctx)
609 {
610 	thread_context_t ctx;
611 	int ep;
612 
613 	assert(mctx);
614 
615 	int core = mctx->cpu;
616 
617 	ctx = CreateContext(mctx);
618 	if (!ctx)
619 		exit(-1);
620 	g_ctx[core] = ctx;
621 	g_stat[core] = &ctx->stat;
622 	srand(time(NULL));
623 
624 	mtcp_init_rss(mctx, saddr, IP_RANGE, daddr, dport);
625 
626 	if (flows[core] == 0) {
627 		TRACE_DBG("Application thread %d finished.\n", core);
628 		exit(-1);
629 	}
630 	ctx->target = flows[core];
631 
632 	/* Initialization */
633 	ctx->maxevents = max_fds * 3;
634 	ep = mtcp_epoll_create(mctx, ctx->maxevents);
635 	if (ep < 0) {
636 		TRACE_ERROR("Failed to create epoll struct!n");
637 		exit(EXIT_FAILURE);
638 	}
639 	ctx->events = (struct mtcp_epoll_event *)
640 			calloc(ctx->maxevents, sizeof(struct mtcp_epoll_event));
641 	if (!ctx->events) {
642 		TRACE_ERROR("Failed to allocate events!\n");
643 		exit(EXIT_FAILURE);
644 	}
645 	ctx->ep = ep;
646 
647 	ctx->wvars = (struct wget_vars *)calloc(max_fds, sizeof(struct wget_vars));
648 	if (!ctx->wvars) {
649 		TRACE_ERROR("Failed to create wget variables!\n");
650 		exit(EXIT_FAILURE);
651 	}
652 
653 	ctx->started = ctx->done = ctx->pending = 0;
654 	ctx->errors = ctx->incompletes = 0;
655 
656 	*app_ctx = ctx;
657 
658 	return;
659 }
660 /*----------------------------------------------------------------------------*/
661 static void
RunWget(mctx_t mctx,void ** app_ctx)662 RunWget(mctx_t mctx, void **app_ctx)
663 {
664 	struct in_addr daddr_in;
665 	struct timeval cur_tv, prev_tv;
666 	int nevents;
667 	int i;
668 
669 	assert(mctx);
670 	assert(*app_ctx);
671 
672 	thread_context_t ctx = *app_ctx;
673 	int core = ctx->core;
674 
675 	daddr_in.s_addr = daddr;
676 	fprintf(stderr, "Thread %d handles %d flows. connecting to %s:%u\n",
677 			core, flows[core], inet_ntoa(daddr_in), ntohs(dport));
678 
679 	gettimeofday(&cur_tv, NULL);
680 	prev_tv = cur_tv;
681 
682 	while (!done[core]) {
683 		gettimeofday(&cur_tv, NULL);
684 
685 		/* print statistics every second */
686 		if (core == 0 && cur_tv.tv_sec > prev_tv.tv_sec) {
687 			PrintStats();
688 			prev_tv = cur_tv;
689 		}
690 
691 		while (/*ctx->pending*/ mtcp_get_connection_cnt(ctx->mctx) < concurrency && ctx->started < ctx->target) {
692 			if (CreateConnection(ctx) < 0) {
693 				done[core] = TRUE;
694 				break;
695 			}
696 		}
697 
698 		nevents = mtcp_epoll_wait(mctx, ctx->ep,
699 				ctx->events, ctx->maxevents, ctx->pending ? -1 : 10);
700 		ctx->stat.waits++;
701 
702 		if (nevents < 0) {
703 			if (errno != EINTR) {
704 				TRACE_ERROR("mtcp_epoll_wait failed! ret: %d\n", nevents);
705 			}
706 			done[core] = TRUE;
707 			break;
708 		} else {
709 			ctx->stat.events += nevents;
710 		}
711 
712 		for (i = 0; i < nevents; i++) {
713 
714 			if (ctx->events[i].events & MOS_EPOLLERR) {
715 				int err;
716 				socklen_t len = sizeof(err);
717 
718 				TRACE_APP("[CPU %d] Error on socket %d\n",
719 						core, ctx->events[i].data.sockid);
720 				ctx->stat.errors++;
721 				ctx->errors++;
722 				if (mtcp_getsockopt(mctx, ctx->events[i].data.sock,
723 							SOL_SOCKET, SO_ERROR, (void *)&err, &len) == 0) {
724 					if (err == ETIMEDOUT)
725 						ctx->stat.timedout++;
726 				}
727 				CloseConnection(ctx, ctx->events[i].data.sock);
728 
729 			} else if (ctx->events[i].events & MOS_EPOLLIN) {
730 				HandleReadEvent(ctx,
731 						ctx->events[i].data.sock,
732 						&ctx->wvars[ctx->events[i].data.sock]);
733 
734 			} else if (ctx->events[i].events == MOS_EPOLLOUT) {
735 				struct wget_vars *wv = &ctx->wvars[ctx->events[i].data.sock];
736 
737 				if (!wv->request_sent) {
738 					SendHTTPRequest(ctx, ctx->events[i].data.sock, wv);
739 				} else {
740 					//TRACE_DBG("Request already sent.\n");
741 				}
742 
743 			} else {
744 				TRACE_ERROR("Socket %d: event: %s\n",
745 						ctx->events[i].data.sock,
746 						EventToString(ctx->events[i].events));
747 				assert(0);
748 			}
749 		}
750 
751 		if (ctx->done >= ctx->target) {
752 			fprintf(stdout, "Completed %d connections, "
753 					"errors: %d incompletes: %d\n",
754 					ctx->done, ctx->errors, ctx->incompletes);
755 			break;
756 		}
757 	}
758 
759 	TRACE_INFO("Wget thread %d waiting for mtcp to be destroyed.\n", core);
760 
761 	g_stat[core] = NULL;
762 	g_ctx[core] = NULL;
763 	DestroyContext(ctx);
764 
765 	return;
766 }
767 /*----------------------------------------------------------------------------*/
768 void
RunApplication(mctx_t mctx)769 RunApplication(mctx_t mctx)
770 {
771 	void *app_ctx;
772 
773 	app_ctx = (void *)calloc(1, sizeof(void *));
774 	if (!app_ctx) {
775 		TRACE_ERROR("calloc failure\n");
776 		return;
777 	}
778 
779 	TRACE_INFO("run application on core %d\n", mctx->cpu);
780 	InitWget(mctx, &(app_ctx));
781 	RunWget(mctx, &(app_ctx));
782 }
783 /*----------------------------------------------------------------------------*/
784 void *
RunMTCP(void * arg)785 RunMTCP(void *arg)
786 {
787 	int core = *(int *)arg;
788 	mctx_t mctx;
789 
790 	mtcp_core_affinitize(core);
791 
792 	/* mTCP Initialization */
793 	mctx = mtcp_create_context(core);
794 	if (!mctx) {
795 		pthread_exit(NULL);
796 		TRACE_ERROR("Failed to craete mtcp context.\n");
797 		return NULL;
798 	}
799 
800 	/* Run application here */
801 	RunApplication(mctx);
802 
803 	/* mTCP Tear Down */
804 	mtcp_destroy_context(mctx);
805 	pthread_exit(NULL);
806 
807 	return NULL;
808 }
809 /*----------------------------------------------------------------------------*/
810 int
main(int argc,char ** argv)811 main(int argc, char **argv)
812 {
813 	int ret, i;
814 	int cores[MAX_CPUS];
815 	char *fname = "config/mos.conf";
816 
817 	int opt;
818 	while ((opt = getopt(argc, argv, "f:")) != -1) {
819 		switch (opt) {
820 			case 'f':
821 				fname = optarg;
822 				break;
823 			default:
824 				printf("Usage: %s [-f config_file]\n", argv[0]);
825 				return 0;
826 		}
827 
828 	}
829 
830 	core_limit = sysconf(_SC_NPROCESSORS_ONLN);
831 
832 	ret = mtcp_init(fname);
833 	if (ret) {
834 		TRACE_ERROR("Failed to initialize mtcp.\n");
835 		exit(EXIT_FAILURE);
836 	}
837 
838 	mtcp_getconf(&g_mcfg);
839 
840 	core_limit = g_mcfg.num_cores;
841 
842 	GlbInitWget();
843 
844 	for (i = 0; i < core_limit; i++) {
845 		cores[i] = i;
846 
847 		/* Run mtcp thread */
848 		if ((g_mcfg.cpu_mask & (1L << i)) &&
849 			pthread_create(&mtcp_thread[i], NULL, RunMTCP, (void *)&cores[i])) {
850 			perror("pthread_create");
851 			TRACE_ERROR("Failed to create msg_test thread.\n");
852 			exit(-1);
853 		}
854 	}
855 
856 	for (i = 0; i < core_limit; i++) {
857 		if (g_mcfg.cpu_mask & (1L << i))
858 			pthread_join(mtcp_thread[i], NULL);
859 		TRACE_INFO("Message test thread %d joined.\n", i);
860 	}
861 
862 	mtcp_destroy();
863 	return 0;
864 }
865 /*----------------------------------------------------------------------------*/
866