1 #include <stdio.h>
2 #include <stdlib.h>
3 #include <string.h>
4 #include <unistd.h>
5 #include <stdint.h>
6 #include <time.h>
7 #include <sys/time.h>
8 #include <sys/types.h>
9 #include <sys/stat.h>
10 #include <fcntl.h>
11 #include <pthread.h>
12 #include <signal.h>
13 #include <sys/socket.h>
14 #include <netinet/in.h>
15 #include <arpa/inet.h>
16 #include <sys/queue.h>
17 #include <assert.h>
18
19 #include <mos_api.h>
20 #include "cpu.h"
21 #include "rss.h"
22 #include "http_parsing.h"
23 #include "debug.h"
24 #include "applib.h"
25
26 #define CONFIG_FILE "config/epwget.conf"
27 static struct conf_var g_conf[] = {
28 { "url", {0} },
29 { "total_flows", {0} },
30 { "core_limit", {0} },
31 { "total_concurrency", {0} },
32 { "dest_port", {0} },
33 { "keep_alive", {0} },
34 };
35 #define NUM_CONF_VAR (sizeof(g_conf) / sizeof(struct conf_var))
36
37 #define PORT_NUM 80
38 //#define PORT_NUM 3333
39
40 #define MAX_URL_LEN 128
41 #define MAX_FILE_LEN 128
42 #define HTTP_HEADER_LEN 1024
43
44 #define IP_RANGE 1
45 #define MAX_IP_STR_LEN 16
46
47 #define BUF_SIZE (8*1024)
48
49 /*----------------------------------------------------------------------------*/
50 struct mtcp_conf g_mcfg;
51 /*----------------------------------------------------------------------------*/
52 static mctx_t g_mctx[MAX_CPUS];
53 static int done[MAX_CPUS];
54 /*----------------------------------------------------------------------------*/
55 static int num_cores;
56 static int core_limit;
57 /*----------------------------------------------------------------------------*/
58 static int fio = FALSE;
59 static char outfile[MAX_FILE_LEN + 1];
60 /*----------------------------------------------------------------------------*/
61 static char host[MAX_IP_STR_LEN + 1];
62 static char path[MAX_URL_LEN + 1];
63 static in_addr_t daddr;
64 static in_port_t dport;
65 static in_addr_t saddr;
66 /*----------------------------------------------------------------------------*/
67 static int total_flows;
68 static int flows[MAX_CPUS];
69 static int flowcnt = 0;
70 static int concurrency;
71 static int max_fds;
72 static uint16_t dest_port;
73 static int keep_alive = FALSE;
74 /*----------------------------------------------------------------------------*/
75 struct wget_stat
76 {
77 uint64_t waits;
78 uint64_t events;
79 uint64_t connects;
80 uint64_t reads;
81 uint64_t writes;
82 uint64_t completes;
83
84 uint64_t errors;
85 uint64_t timedout;
86
87 uint64_t sum_resp_time;
88 uint64_t max_resp_time;
89 };
90 /*----------------------------------------------------------------------------*/
91 struct thread_context
92 {
93 int core;
94
95 mctx_t mctx;
96 int ep;
97 struct wget_vars *wvars;
98
99 int target;
100 int started;
101 int errors;
102 int incompletes;
103 int done;
104 int pending;
105
106 int maxevents;
107 struct mtcp_epoll_event *events;
108
109 struct wget_stat stat;
110 };
111 typedef struct thread_context* thread_context_t;
112 /*----------------------------------------------------------------------------*/
113 struct wget_vars
114 {
115 int request_sent;
116
117 char response[HTTP_HEADER_LEN];
118 int resp_len;
119 int headerset;
120 uint32_t header_len;
121 uint64_t file_len;
122 uint64_t recv;
123 uint64_t write;
124
125 struct timeval t_start;
126 struct timeval t_end;
127
128 int fd;
129 };
130 /*----------------------------------------------------------------------------*/
131 static struct thread_context *g_ctx[MAX_CPUS] = {0};
132 static struct wget_stat *g_stat[MAX_CPUS] = {0};
133 /*----------------------------------------------------------------------------*/
134 static thread_context_t
CreateContext(mctx_t mctx)135 CreateContext(mctx_t mctx)
136 {
137 thread_context_t ctx;
138
139 ctx = (thread_context_t)calloc(1, sizeof(struct thread_context));
140 if (!ctx) {
141 perror("malloc");
142 TRACE_ERROR("Failed to allocate memory for thread context.\n");
143 return NULL;
144 }
145
146 ctx->mctx = mctx;
147 ctx->core = mctx->cpu;
148
149 g_mctx[ctx->core] = mctx;
150
151 return ctx;
152 }
153 /*----------------------------------------------------------------------------*/
154 static void
DestroyContext(thread_context_t ctx)155 DestroyContext(thread_context_t ctx)
156 {
157 g_stat[ctx->core] = NULL;
158 free(ctx);
159 }
160 /*----------------------------------------------------------------------------*/
161 static inline int
CreateConnection(thread_context_t ctx)162 CreateConnection(thread_context_t ctx)
163 {
164 mctx_t mctx = ctx->mctx;
165 struct mtcp_epoll_event ev;
166 struct sockaddr_in addr;
167 int sockid;
168 int ret;
169
170 assert(mctx);
171
172 errno = 0;
173 sockid = mtcp_socket(mctx, AF_INET, SOCK_STREAM, 0);
174 if (sockid < 0) {
175 TRACE_INFO("Failed to create socket! (%s)\n",
176 strerror(errno));
177 return -1;
178 }
179 memset(&ctx->wvars[sockid], 0, sizeof(struct wget_vars));
180 ret = mtcp_setsock_nonblock(mctx, sockid);
181 if (ret < 0) {
182 TRACE_ERROR("Failed to set socket in nonblocking mode.\n");
183 exit(-1);
184 }
185
186 addr.sin_family = AF_INET;
187 addr.sin_addr.s_addr = daddr;
188 addr.sin_port = dport;
189
190 ret = mtcp_connect(mctx, sockid,
191 (struct sockaddr *)&addr, sizeof(struct sockaddr_in));
192 if (ret < 0) {
193 if (errno != EINPROGRESS) {
194 perror("mtcp_connect");
195 mtcp_close(mctx, sockid);
196 return -1;
197 }
198 }
199
200 ctx->started++;
201 ctx->pending++;
202 ctx->stat.connects++;
203
204 ev.events = MOS_EPOLLOUT;
205 ev.data.sock = sockid;
206 mtcp_epoll_ctl(mctx, ctx->ep, MOS_EPOLL_CTL_ADD, sockid, &ev);
207
208 return sockid;
209 }
210 /*----------------------------------------------------------------------------*/
211 static inline void
CloseConnection(thread_context_t ctx,int sockid)212 CloseConnection(thread_context_t ctx, int sockid)
213 {
214 mtcp_epoll_ctl(ctx->mctx, ctx->ep, MOS_EPOLL_CTL_DEL, sockid, NULL);
215 mtcp_close(ctx->mctx, sockid);
216 ctx->pending--;
217 ctx->done++;
218 assert(ctx->pending >= 0);
219 while (/*ctx->pending*/ mtcp_get_connection_cnt(ctx->mctx) < concurrency && ctx->started < ctx->target) {
220 if (CreateConnection(ctx) < 0) {
221 done[ctx->core] = TRUE;
222 break;
223 }
224 }
225 }
226 /*----------------------------------------------------------------------------*/
227 static inline int
SendHTTPRequest(thread_context_t ctx,int sockid,struct wget_vars * wv)228 SendHTTPRequest(thread_context_t ctx, int sockid, struct wget_vars *wv)
229 {
230 char request[HTTP_HEADER_LEN];
231 struct mtcp_epoll_event ev;
232 int wr;
233 int len;
234
235 wv->headerset = FALSE;
236 wv->recv = 0;
237 wv->header_len = wv->file_len = 0;
238
239 if (keep_alive) {
240 snprintf(request, HTTP_HEADER_LEN, "GET %s HTTP/1.0\r\n"
241 "User-Agent: Wget/1.12 (linux-gnu)\r\n"
242 "Accept: */*\r\n"
243 "Host: %s\r\n"
244 "Connection: Keep-Alive\r\n\r\n",
245 path, host);
246 } else {
247 snprintf(request, HTTP_HEADER_LEN, "GET %s HTTP/1.0\r\n"
248 "User-Agent: Wget/1.12 (linux-gnu)\r\n"
249 "Accept: */*\r\n"
250 "Host: %s\r\n"
251 // "Connection: Keep-Alive\r\n\r\n",
252 "Connection: Close\r\n\r\n",
253 path, host);
254 }
255
256 len = strlen(request);
257
258 wr = mtcp_write(ctx->mctx, sockid, request, len);
259 if (wr < len) {
260 TRACE_ERROR("Socket %d: Sending HTTP request failed. "
261 "try: %d, sent: %d\n", sockid, len, wr);
262 CloseConnection(ctx, sockid);
263 }
264 ctx->stat.writes += wr;
265 TRACE_APP("Socket %d HTTP Request of %d bytes. sent.\n", sockid, wr);
266 wv->request_sent = TRUE;
267
268 ev.events = MOS_EPOLLIN;
269 ev.data.sock = sockid;
270 mtcp_epoll_ctl(ctx->mctx, ctx->ep, MOS_EPOLL_CTL_MOD, sockid, &ev);
271
272 gettimeofday(&wv->t_start, NULL);
273
274 char fname[MAX_FILE_LEN + 1];
275 if (fio) {
276 snprintf(fname, MAX_FILE_LEN, "%s.%d", outfile, flowcnt++);
277 wv->fd = open(fname, O_WRONLY | O_CREAT | O_TRUNC, 0644);
278 if (wv->fd < 0) {
279 TRACE_APP("Failed to open file descriptor for %s\n", fname);
280 exit(1);
281 }
282 }
283
284 return 0;
285 }
286 /*----------------------------------------------------------------------------*/
287 static inline int
DownloadNext(thread_context_t ctx,int sockid,struct wget_vars * wv)288 DownloadNext(thread_context_t ctx, int sockid, struct wget_vars *wv)
289 {
290 mctx_t mctx = ctx->mctx;
291 uint64_t tdiff;
292 struct mtcp_epoll_event ev;
293
294 TRACE_APP("Socket %d File download complete!\n", sockid);
295 gettimeofday(&wv->t_end, NULL);
296
297 ctx->stat.completes++;
298
299 if (wv->recv - wv->header_len != wv->file_len) {
300 fprintf(stderr, "Response size mismatch! "
301 "actual recved: %ld, expected to recved: %ld\n",
302 wv->recv-wv->header_len, wv->file_len);
303 }
304
305 tdiff = (wv->t_end.tv_sec - wv->t_start.tv_sec) * 1000000 +
306 (wv->t_end.tv_usec - wv->t_start.tv_usec);
307 TRACE_APP("Socket %d Total received bytes: %lu (%luMB)\n",
308 sockid, wv->recv, wv->recv / 1000000);
309 TRACE_APP("Socket %d Total spent time: %lu us\n", sockid, tdiff);
310 if (tdiff > 0) {
311 TRACE_APP("Socket %d Average bandwidth: %lf[MB/s]\n",
312 sockid, (double)wv->recv / tdiff);
313 }
314 ctx->stat.sum_resp_time += tdiff;
315 if (tdiff > ctx->stat.max_resp_time)
316 ctx->stat.max_resp_time = tdiff;
317
318 ctx->done++;
319
320 /* instead of closing the connection, download the next file */
321 memset(&ctx->wvars[sockid], 0, sizeof(struct wget_vars));
322
323 ctx->started++;
324
325 ev.events = MOS_EPOLLOUT;
326 ev.data.sock = sockid;
327 mtcp_epoll_ctl(mctx, ctx->ep, MOS_EPOLL_CTL_MOD, sockid, &ev);
328
329 return 0;
330 }
331 /*----------------------------------------------------------------------------*/
332 static inline int
DownloadComplete(thread_context_t ctx,int sockid,struct wget_vars * wv)333 DownloadComplete(thread_context_t ctx, int sockid, struct wget_vars *wv)
334 {
335 #ifdef APP
336 mctx_t mctx = ctx->mctx;
337 #endif
338 uint64_t tdiff;
339
340 TRACE_APP("Socket %d File download complete!\n", sockid);
341 gettimeofday(&wv->t_end, NULL);
342 CloseConnection(ctx, sockid);
343 ctx->stat.completes++;
344
345 if (wv->recv - wv->header_len != wv->file_len) {
346 fprintf(stderr, "Response size mismatch! "
347 "actual recved: %ld, expected to recved: %ld\n",
348 wv->recv-wv->header_len, wv->file_len);
349 }
350
351 tdiff = (wv->t_end.tv_sec - wv->t_start.tv_sec) * 1000000 +
352 (wv->t_end.tv_usec - wv->t_start.tv_usec);
353 TRACE_APP("Socket %d Total received bytes: %lu (%luMB)\n",
354 sockid, wv->recv, wv->recv / 1000000);
355 TRACE_APP("Socket %d Total spent time: %lu us\n", sockid, tdiff);
356 if (tdiff > 0) {
357 TRACE_APP("Socket %d Average bandwidth: %lf[MB/s]\n",
358 sockid, (double)wv->recv / tdiff);
359 }
360 ctx->stat.sum_resp_time += tdiff;
361 if (tdiff > ctx->stat.max_resp_time)
362 ctx->stat.max_resp_time = tdiff;
363
364 if (fio && wv->fd > 0)
365 close(wv->fd);
366
367 return 0;
368 }
369 /*----------------------------------------------------------------------------*/
370 static inline int
HandleReadEvent(thread_context_t ctx,int sockid,struct wget_vars * wv)371 HandleReadEvent(thread_context_t ctx, int sockid, struct wget_vars *wv)
372 {
373 mctx_t mctx = ctx->mctx;
374 char buf[BUF_SIZE];
375 char *pbuf;
376 int rd, copy_len;
377
378 rd = 1;
379 while (rd > 0) {
380 rd = mtcp_read(mctx, sockid, buf, BUF_SIZE);
381 if (rd <= 0)
382 break;
383 ctx->stat.reads += rd;
384
385 TRACE_APP("Socket %d: mtcp_read ret: %d, total_recv: %lu, "
386 "header_set: %d, header_len: %u, file_len: %lu\n",
387 sockid, rd, wv->recv + rd,
388 wv->headerset, wv->header_len, wv->file_len);
389
390 pbuf = buf;
391 if (!wv->headerset) {
392 copy_len = MIN(rd, HTTP_HEADER_LEN - wv->resp_len);
393 memcpy(wv->response + wv->resp_len, buf, copy_len);
394 wv->resp_len += copy_len;
395 wv->header_len = find_http_header(wv->response, wv->resp_len);
396 if (wv->header_len > 0) {
397 wv->response[wv->header_len] = '\0';
398 wv->file_len = http_header_long_val(wv->response,
399 CONTENT_LENGTH_HDR, sizeof(CONTENT_LENGTH_HDR) - 1);
400 if (wv->file_len < 0) {
401 /* failed to find for the Content-Length field */
402 wv->recv += rd;
403 rd = 0;
404 CloseConnection(ctx, sockid);
405 return 0;
406 }
407 TRACE_APP("Socket %d Parsed response header. "
408 "Header length: %u, File length: %lu (%luMB)\n",
409 sockid, wv->header_len,
410 wv->file_len, wv->file_len / 1024 / 1024);
411 wv->headerset = TRUE;
412 wv->recv += (rd - (wv->resp_len - wv->header_len));
413 rd = (wv->resp_len - wv->header_len);
414
415 pbuf += (rd - (wv->resp_len - wv->header_len));
416
417 } else {
418 /* failed to parse response header */
419 wv->recv += rd;
420 rd = 0;
421 ctx->stat.errors++;
422 ctx->errors++;
423 CloseConnection(ctx, sockid);
424 return 0;
425 }
426 }
427 wv->recv += rd;
428
429 if (fio && wv->fd > 0) {
430 int wr = 0;
431 while (wr < rd) {
432 int _wr = write(wv->fd, pbuf + wr, rd - wr);
433 assert (_wr == rd - wr);
434 if (_wr < 0) {
435 perror("write");
436 TRACE_ERROR("Failed to write.\n");
437 assert(0);
438 break;
439 }
440 wr += _wr;
441 wv->write += _wr;
442 }
443 }
444
445 if (wv->header_len && (wv->recv >= wv->header_len + wv->file_len)) {
446 break;
447 }
448 }
449
450 if (rd > 0) {
451 if (wv->header_len && (wv->recv >= wv->header_len + wv->file_len)) {
452 TRACE_APP("Socket %d Done Write: "
453 "header: %u file: %lu recv: %lu write: %lu\n",
454 sockid, wv->header_len, wv->file_len,
455 wv->recv - wv->header_len, wv->write);
456 if (keep_alive && ctx->started < ctx->target)
457 DownloadNext(ctx, sockid, wv);
458 else
459 DownloadComplete(ctx, sockid, wv);
460
461 return 0;
462 }
463
464 } else if (rd == 0) {
465 /* connection closed by remote host */
466 TRACE_DBG("Socket %d connection closed with server.\n", sockid);
467
468 if (wv->header_len && (wv->recv >= wv->header_len + wv->file_len)) {
469 DownloadComplete(ctx, sockid, wv);
470 } else {
471 ctx->stat.errors++;
472 ctx->incompletes++;
473 CloseConnection(ctx, sockid);
474 }
475
476 } else if (rd < 0) {
477 if (errno != EAGAIN) {
478 TRACE_DBG("Socket %d: mtcp_read() error %s\n",
479 sockid, strerror(errno));
480 ctx->stat.errors++;
481 ctx->errors++;
482 CloseConnection(ctx, sockid);
483 }
484 }
485
486 return 0;
487 }
488 /*----------------------------------------------------------------------------*/
489 static void
PrintStats()490 PrintStats()
491 {
492 struct wget_stat total = {0};
493 struct wget_stat *st;
494 uint64_t avg_resp_time;
495 uint64_t total_resp_time = 0;
496 int i;
497
498 for (i = 0; i < core_limit; i++) {
499 st = g_stat[i];
500 if (!st)
501 continue;
502 avg_resp_time = st->completes? st->sum_resp_time / st->completes : 0;
503
504 total.waits += st->waits;
505 total.events += st->events;
506 total.connects += st->connects;
507 total.reads += st->reads;
508 total.writes += st->writes;
509 total.completes += st->completes;
510 total_resp_time += avg_resp_time;
511 if (st->max_resp_time > total.max_resp_time)
512 total.max_resp_time = st->max_resp_time;
513 total.errors += st->errors;
514 total.timedout += st->timedout;
515
516 memset(st, 0, sizeof(struct wget_stat));
517 }
518 fprintf(stderr, "[ ALL ] connect: %7lu, read: %4lu MB, write: %4lu MB, "
519 "completes: %7lu (resp_time avg: %4lu, max: %6lu us)\n",
520 total.connects,
521 total.reads / 1024 / 1024, total.writes / 1024 / 1024,
522 total.completes, total_resp_time / core_limit, total.max_resp_time);
523 }
524 /*----------------------------------------------------------------------------*/
525 static void
GlbInitWget()526 GlbInitWget()
527 {
528 struct mtcp_conf mcfg;
529 char *url;
530 int total_concurrency = 0;
531 int flow_per_thread;
532 int flow_remainder_cnt;
533 int i;
534
535 num_cores = GetNumCPUs();
536 core_limit = num_cores;
537 concurrency = 100;
538 total_flows = -1;
539
540 LoadConfig(CONFIG_FILE, g_conf, NUM_CONF_VAR);
541
542 url = g_conf[0].value;
543 total_flows = atoi(g_conf[1].value);
544 core_limit = atoi(g_conf[2].value);
545 total_concurrency = atoi(g_conf[3].value);
546 dest_port = atoi(g_conf[4].value);
547 keep_alive = atoi(g_conf[5].value);
548
549 if ((strlen(url) == 0)
550 || (strlen(url) > MAX_URL_LEN)
551 || (total_flows <= 0)) {
552 TRACE_INFO("Invalid configuration\n");
553 exit(0);
554 }
555
556 char* slash_p = strchr(url, '/');
557 if (slash_p) {
558 strncpy(host, url, slash_p - url);
559 strncpy(path, strchr(url, '/'), MAX_URL_LEN);
560 } else {
561 strncpy(host, url, MAX_IP_STR_LEN);
562 strncpy(path, "/", 1);
563 }
564
565 daddr = inet_addr(host);
566 dport = (dest_port == 0) ? htons(PORT_NUM) : htons(dest_port);
567 saddr = INADDR_ANY;
568
569 if (total_flows < core_limit) {
570 core_limit = total_flows;
571 }
572
573 /* per-core concurrency = total_concurrency / # cores */
574 if (total_concurrency > 0)
575 concurrency = total_concurrency / core_limit;
576
577 /* set the max number of fds 3x larger than concurrency */
578 max_fds = concurrency * 3;
579
580 TRACE_CONFIG("Application configuration:\n");
581 TRACE_CONFIG("URL: %s\n", path);
582 TRACE_CONFIG("# of total_flows: %d\n", total_flows);
583 TRACE_CONFIG("# of cores: %d\n", core_limit);
584 TRACE_CONFIG("Concurrency: %d\n", total_concurrency);
585
586 mtcp_getconf(&mcfg);
587 mcfg.max_concurrency = max_fds;
588 mcfg.max_num_buffers = max_fds;
589 mtcp_setconf(&mcfg);
590
591 flow_per_thread = total_flows / core_limit;
592 flow_remainder_cnt = total_flows % core_limit;
593
594 for (i = 0; i < MAX_CPUS; i++) {
595 done[i] = FALSE;
596 flows[i] = flow_per_thread;
597
598 if (flow_remainder_cnt-- > 0)
599 flows[i]++;
600 }
601
602 return;
603 }
604 /*----------------------------------------------------------------------------*/
605 static void
InitWget(mctx_t mctx,void ** app_ctx)606 InitWget(mctx_t mctx, void **app_ctx)
607 {
608 thread_context_t ctx;
609 int ep;
610
611 assert(mctx);
612
613 int core = mctx->cpu;
614
615 ctx = CreateContext(mctx);
616 if (!ctx)
617 exit(-1);
618 g_ctx[core] = ctx;
619 g_stat[core] = &ctx->stat;
620 srand(time(NULL));
621
622 mtcp_init_rss(mctx, saddr, IP_RANGE, daddr, dport);
623
624 if (flows[core] == 0) {
625 TRACE_DBG("Application thread %d finished.\n", core);
626 exit(-1);
627 }
628 ctx->target = flows[core];
629
630 /* Initialization */
631 ctx->maxevents = max_fds * 3;
632 ep = mtcp_epoll_create(mctx, ctx->maxevents);
633 if (ep < 0) {
634 TRACE_ERROR("Failed to create epoll struct!n");
635 exit(EXIT_FAILURE);
636 }
637 ctx->events = (struct mtcp_epoll_event *)
638 calloc(ctx->maxevents, sizeof(struct mtcp_epoll_event));
639 if (!ctx->events) {
640 TRACE_ERROR("Failed to allocate events!\n");
641 exit(EXIT_FAILURE);
642 }
643 ctx->ep = ep;
644
645 ctx->wvars = (struct wget_vars *)calloc(max_fds, sizeof(struct wget_vars));
646 if (!ctx->wvars) {
647 TRACE_ERROR("Failed to create wget variables!\n");
648 exit(EXIT_FAILURE);
649 }
650
651 ctx->started = ctx->done = ctx->pending = 0;
652 ctx->errors = ctx->incompletes = 0;
653
654 *app_ctx = ctx;
655
656 return;
657 }
658 /*----------------------------------------------------------------------------*/
659 static void
RunWget(mctx_t mctx,void ** app_ctx)660 RunWget(mctx_t mctx, void **app_ctx)
661 {
662 struct in_addr daddr_in;
663 struct timeval cur_tv, prev_tv;
664 int nevents;
665 int i;
666
667 assert(mctx);
668 assert(*app_ctx);
669
670 thread_context_t ctx = *app_ctx;
671 int core = ctx->core;
672
673 daddr_in.s_addr = daddr;
674 fprintf(stderr, "Thread %d handles %d flows. connecting to %s:%u\n",
675 core, flows[core], inet_ntoa(daddr_in), ntohs(dport));
676
677 gettimeofday(&cur_tv, NULL);
678 prev_tv = cur_tv;
679
680 while (!done[core]) {
681 gettimeofday(&cur_tv, NULL);
682
683 /* print statistics every second */
684 if (core == 0 && cur_tv.tv_sec > prev_tv.tv_sec) {
685 PrintStats();
686 prev_tv = cur_tv;
687 }
688
689 while (/*ctx->pending*/ mtcp_get_connection_cnt(ctx->mctx) < concurrency && ctx->started < ctx->target) {
690 if (CreateConnection(ctx) < 0) {
691 done[core] = TRUE;
692 break;
693 }
694 }
695
696 nevents = mtcp_epoll_wait(mctx, ctx->ep,
697 ctx->events, ctx->maxevents, ctx->pending ? -1 : 10);
698 ctx->stat.waits++;
699
700 if (nevents < 0) {
701 if (errno != EINTR) {
702 TRACE_ERROR("mtcp_epoll_wait failed! ret: %d\n", nevents);
703 }
704 done[core] = TRUE;
705 break;
706 } else {
707 ctx->stat.events += nevents;
708 }
709
710 for (i = 0; i < nevents; i++) {
711
712 if (ctx->events[i].events & MOS_EPOLLERR) {
713 int err;
714 socklen_t len = sizeof(err);
715
716 TRACE_APP("[CPU %d] Error on socket %d\n",
717 core, ctx->events[i].data.sockid);
718 ctx->stat.errors++;
719 ctx->errors++;
720 if (mtcp_getsockopt(mctx, ctx->events[i].data.sock,
721 SOL_SOCKET, SO_ERROR, (void *)&err, &len) == 0) {
722 if (err == ETIMEDOUT)
723 ctx->stat.timedout++;
724 }
725 CloseConnection(ctx, ctx->events[i].data.sock);
726
727 } else if (ctx->events[i].events & MOS_EPOLLIN) {
728 HandleReadEvent(ctx,
729 ctx->events[i].data.sock,
730 &ctx->wvars[ctx->events[i].data.sock]);
731
732 } else if (ctx->events[i].events == MOS_EPOLLOUT) {
733 struct wget_vars *wv = &ctx->wvars[ctx->events[i].data.sock];
734
735 if (!wv->request_sent) {
736 SendHTTPRequest(ctx, ctx->events[i].data.sock, wv);
737 } else {
738 //TRACE_DBG("Request already sent.\n");
739 }
740
741 } else {
742 TRACE_ERROR("Socket %d: event: %s\n",
743 ctx->events[i].data.sock,
744 EventToString(ctx->events[i].events));
745 assert(0);
746 }
747 }
748
749 if (ctx->done >= ctx->target) {
750 fprintf(stdout, "Completed %d connections, "
751 "errors: %d incompletes: %d\n",
752 ctx->done, ctx->errors, ctx->incompletes);
753 break;
754 }
755 }
756
757 TRACE_INFO("Wget thread %d waiting for mtcp to be destroyed.\n", core);
758
759 g_stat[core] = NULL;
760 g_ctx[core] = NULL;
761 DestroyContext(ctx);
762
763 return;
764 }
765 /*----------------------------------------------------------------------------*/
766 void
RunApplication(mctx_t mctx)767 RunApplication(mctx_t mctx)
768 {
769 void *app_ctx;
770
771 app_ctx = (void *)calloc(1, sizeof(void *));
772 if (!app_ctx) {
773 TRACE_ERROR("calloc failure\n");
774 return;
775 }
776
777 TRACE_INFO("run application on core %d\n", mctx->cpu);
778 InitWget(mctx, &(app_ctx));
779 RunWget(mctx, &(app_ctx));
780 }
781 /*----------------------------------------------------------------------------*/
782 void *
RunMTCP(void * arg)783 RunMTCP(void *arg)
784 {
785 int core = *(int *)arg;
786 mctx_t mctx;
787
788 mtcp_core_affinitize(core);
789
790 /* mTCP Initialization */
791 mctx = mtcp_create_context(core);
792 if (!mctx) {
793 pthread_exit(NULL);
794 TRACE_ERROR("Failed to craete mtcp context.\n");
795 return NULL;
796 }
797
798 /* Run application here */
799 RunApplication(mctx);
800
801 /* mTCP Tear Down */
802 mtcp_destroy_context(mctx);
803
804 return NULL;
805 }
806 /*----------------------------------------------------------------------------*/
807 int
main(int argc,char ** argv)808 main(int argc, char **argv)
809 {
810 int ret;
811 int current_core = -1;
812 char *fname = "config/mos.conf";
813
814 int opt;
815 while ((opt = getopt(argc, argv, "f:c:")) != -1) {
816 switch (opt) {
817 case 'f':
818 fname = optarg;
819 break;
820 case 'c':
821 current_core = atoi(optarg);
822 if (current_core >= MAX_CPUS) {
823 TRACE_ERROR("Invalid core id!\n");
824 exit(EXIT_FAILURE);
825 }
826 break;
827 default:
828 printf("Usage: %s [-f config_file]\n", argv[0]);
829 return 0;
830 }
831
832 }
833
834 core_limit = sysconf(_SC_NPROCESSORS_ONLN);
835
836 ret = mtcp_init(fname);
837 if (ret) {
838 TRACE_ERROR("Failed to initialize mtcp.\n");
839 exit(EXIT_FAILURE);
840 }
841
842 mtcp_getconf(&g_mcfg);
843
844 core_limit = g_mcfg.num_cores;
845
846 GlbInitWget();
847
848 RunMTCP(¤t_core);
849
850 mtcp_destroy();
851 return 0;
852 }
853 /*----------------------------------------------------------------------------*/
854