1 #include <stdio.h>
2 #include <stdlib.h>
3 #include <string.h>
4 #include <unistd.h>
5 #include <stdint.h>
6 #include <time.h>
7 #include <sys/time.h>
8 #include <sys/types.h>
9 #include <sys/stat.h>
10 #include <fcntl.h>
11 #include <pthread.h>
12 #include <signal.h>
13 #include <sys/socket.h>
14 #include <netinet/in.h>
15 #include <arpa/inet.h>
16 #include <sys/queue.h>
17 #include <assert.h>
18
19 #include <mos_api.h>
20 #include "cpu.h"
21 #include "rss.h"
22 #include "http_parsing.h"
23 #include "debug.h"
24 #include "applib.h"
25
26 #define CONFIG_FILE "config/epwget.conf"
27 static struct conf_var g_conf[] = {
28 { "url", {0} },
29 { "total_flows", {0} },
30 { "core_limit", {0} },
31 { "total_concurrency", {0} },
32 { "dest_port", {0} },
33 { "keep_alive", {0} },
34 };
35 #define NUM_CONF_VAR (sizeof(g_conf) / sizeof(struct conf_var))
36
37 #define PORT_NUM 80
38 //#define PORT_NUM 3333
39
40 #define MAX_URL_LEN 128
41 #define MAX_FILE_LEN 128
42 #define HTTP_HEADER_LEN 1024
43
44 #define IP_RANGE 1
45 #define MAX_IP_STR_LEN 16
46
47 #define BUF_SIZE (8*1024)
48
49 /*----------------------------------------------------------------------------*/
50 struct mtcp_conf g_mcfg;
51 static pthread_t mtcp_thread[MAX_CPUS];
52 /*----------------------------------------------------------------------------*/
53 static mctx_t g_mctx[MAX_CPUS];
54 static int done[MAX_CPUS];
55 /*----------------------------------------------------------------------------*/
56 static int num_cores;
57 static int core_limit;
58 /*----------------------------------------------------------------------------*/
59 static int fio = FALSE;
60 static char outfile[MAX_FILE_LEN + 1];
61 /*----------------------------------------------------------------------------*/
62 static char host[MAX_IP_STR_LEN + 1];
63 static char path[MAX_URL_LEN + 1];
64 static in_addr_t daddr;
65 static in_port_t dport;
66 static in_addr_t saddr;
67 /*----------------------------------------------------------------------------*/
68 static int total_flows;
69 static int flows[MAX_CPUS];
70 static int flowcnt = 0;
71 static int concurrency;
72 static int max_fds;
73 static uint16_t dest_port;
74 static int keep_alive = FALSE;
75 /*----------------------------------------------------------------------------*/
/*
 * Per-thread counters.  PrintStats() sums them over all cores and then
 * resets them, so each value covers one reporting interval.
 */
struct wget_stat
{
	uint64_t waits;          /* number of mtcp_epoll_wait() calls */
	uint64_t events;         /* number of events returned by those calls */
	uint64_t connects;       /* connections started */
	uint64_t reads;          /* bytes read from sockets */
	uint64_t writes;         /* bytes written to sockets */
	uint64_t completes;      /* downloads finished */

	uint64_t errors;         /* failed connections or transfers */
	uint64_t timedout;       /* errors whose SO_ERROR was ETIMEDOUT */

	uint64_t sum_resp_time;  /* sum of download times, in microseconds */
	uint64_t max_resp_time;  /* longest download time, in microseconds */
};
91 /*----------------------------------------------------------------------------*/
92 struct thread_context
93 {
94 int core;
95
96 mctx_t mctx;
97 int ep;
98 struct wget_vars *wvars;
99
100 int target;
101 int started;
102 int errors;
103 int incompletes;
104 int done;
105 int pending;
106
107 int maxevents;
108 struct mtcp_epoll_event *events;
109
110 struct wget_stat stat;
111 };
112 typedef struct thread_context* thread_context_t;
113 /*----------------------------------------------------------------------------*/
114 struct wget_vars
115 {
116 int request_sent;
117
118 char response[HTTP_HEADER_LEN];
119 int resp_len;
120 int headerset;
121 uint32_t header_len;
122 uint64_t file_len;
123 uint64_t recv;
124 uint64_t write;
125
126 struct timeval t_start;
127 struct timeval t_end;
128
129 int fd;
130 };
131 /*----------------------------------------------------------------------------*/
132 static struct thread_context *g_ctx[MAX_CPUS] = {0};
133 static struct wget_stat *g_stat[MAX_CPUS] = {0};
134 /*----------------------------------------------------------------------------*/
135 static thread_context_t
CreateContext(mctx_t mctx)136 CreateContext(mctx_t mctx)
137 {
138 thread_context_t ctx;
139
140 ctx = (thread_context_t)calloc(1, sizeof(struct thread_context));
141 if (!ctx) {
142 perror("malloc");
143 TRACE_ERROR("Failed to allocate memory for thread context.\n");
144 return NULL;
145 }
146
147 ctx->mctx = mctx;
148 ctx->core = mctx->cpu;
149
150 g_mctx[ctx->core] = mctx;
151
152 return ctx;
153 }
154 /*----------------------------------------------------------------------------*/
155 static void
DestroyContext(thread_context_t ctx)156 DestroyContext(thread_context_t ctx)
157 {
158 g_stat[ctx->core] = NULL;
159 free(ctx);
160 }
161 /*----------------------------------------------------------------------------*/
162 static inline int
CreateConnection(thread_context_t ctx)163 CreateConnection(thread_context_t ctx)
164 {
165 mctx_t mctx = ctx->mctx;
166 struct mtcp_epoll_event ev;
167 struct sockaddr_in addr;
168 int sockid;
169 int ret;
170
171 assert(mctx);
172
173 errno = 0;
174 sockid = mtcp_socket(mctx, AF_INET, SOCK_STREAM, 0);
175 if (sockid < 0) {
176 TRACE_INFO("Failed to create socket! (%s)\n",
177 strerror(errno));
178 return -1;
179 }
180 memset(&ctx->wvars[sockid], 0, sizeof(struct wget_vars));
181 ret = mtcp_setsock_nonblock(mctx, sockid);
182 if (ret < 0) {
183 TRACE_ERROR("Failed to set socket in nonblocking mode.\n");
184 exit(-1);
185 }
186
187 addr.sin_family = AF_INET;
188 addr.sin_addr.s_addr = daddr;
189 addr.sin_port = dport;
190
191 ret = mtcp_connect(mctx, sockid,
192 (struct sockaddr *)&addr, sizeof(struct sockaddr_in));
193 if (ret < 0) {
194 if (errno != EINPROGRESS) {
195 perror("mtcp_connect");
196 mtcp_close(mctx, sockid);
197 return -1;
198 }
199 }
200
201 ctx->started++;
202 ctx->pending++;
203 ctx->stat.connects++;
204
205 ev.events = MOS_EPOLLOUT;
206 ev.data.sock = sockid;
207 mtcp_epoll_ctl(mctx, ctx->ep, MOS_EPOLL_CTL_ADD, sockid, &ev);
208
209 return sockid;
210 }
211 /*----------------------------------------------------------------------------*/
212 static inline void
CloseConnection(thread_context_t ctx,int sockid)213 CloseConnection(thread_context_t ctx, int sockid)
214 {
215 mtcp_epoll_ctl(ctx->mctx, ctx->ep, MOS_EPOLL_CTL_DEL, sockid, NULL);
216 mtcp_close(ctx->mctx, sockid);
217 ctx->pending--;
218 ctx->done++;
219 assert(ctx->pending >= 0);
220 while (/*ctx->pending*/ mtcp_get_connection_cnt(ctx->mctx) < concurrency && ctx->started < ctx->target) {
221 if (CreateConnection(ctx) < 0) {
222 done[ctx->core] = TRUE;
223 break;
224 }
225 }
226 }
227 /*----------------------------------------------------------------------------*/
228 static inline int
SendHTTPRequest(thread_context_t ctx,int sockid,struct wget_vars * wv)229 SendHTTPRequest(thread_context_t ctx, int sockid, struct wget_vars *wv)
230 {
231 char request[HTTP_HEADER_LEN];
232 struct mtcp_epoll_event ev;
233 int wr;
234 int len;
235
236 wv->headerset = FALSE;
237 wv->recv = 0;
238 wv->header_len = wv->file_len = 0;
239
240 if (keep_alive) {
241 snprintf(request, HTTP_HEADER_LEN, "GET %s HTTP/1.0\r\n"
242 "User-Agent: Wget/1.12 (linux-gnu)\r\n"
243 "Accept: */*\r\n"
244 "Host: %s\r\n"
245 "Connection: Keep-Alive\r\n\r\n",
246 path, host);
247 } else {
248 snprintf(request, HTTP_HEADER_LEN, "GET %s HTTP/1.0\r\n"
249 "User-Agent: Wget/1.12 (linux-gnu)\r\n"
250 "Accept: */*\r\n"
251 "Host: %s\r\n"
252 // "Connection: Keep-Alive\r\n\r\n",
253 "Connection: Close\r\n\r\n",
254 path, host);
255 }
256
257 len = strlen(request);
258
259 wr = mtcp_write(ctx->mctx, sockid, request, len);
260 if (wr < len) {
261 TRACE_ERROR("Socket %d: Sending HTTP request failed. "
262 "try: %d, sent: %d\n", sockid, len, wr);
263 CloseConnection(ctx, sockid);
264 }
265 ctx->stat.writes += wr;
266 TRACE_APP("Socket %d HTTP Request of %d bytes. sent.\n", sockid, wr);
267 wv->request_sent = TRUE;
268
269 ev.events = MOS_EPOLLIN;
270 ev.data.sock = sockid;
271 mtcp_epoll_ctl(ctx->mctx, ctx->ep, MOS_EPOLL_CTL_MOD, sockid, &ev);
272
273 gettimeofday(&wv->t_start, NULL);
274
275 char fname[MAX_FILE_LEN + 1];
276 if (fio) {
277 snprintf(fname, MAX_FILE_LEN, "%s.%d", outfile, flowcnt++);
278 wv->fd = open(fname, O_WRONLY | O_CREAT | O_TRUNC, 0644);
279 if (wv->fd < 0) {
280 TRACE_APP("Failed to open file descriptor for %s\n", fname);
281 exit(1);
282 }
283 }
284
285 return 0;
286 }
287 /*----------------------------------------------------------------------------*/
288 static inline int
DownloadNext(thread_context_t ctx,int sockid,struct wget_vars * wv)289 DownloadNext(thread_context_t ctx, int sockid, struct wget_vars *wv)
290 {
291 mctx_t mctx = ctx->mctx;
292 uint64_t tdiff;
293 struct mtcp_epoll_event ev;
294
295 TRACE_APP("Socket %d File download complete!\n", sockid);
296 gettimeofday(&wv->t_end, NULL);
297
298 ctx->stat.completes++;
299
300 if (wv->recv - wv->header_len != wv->file_len) {
301 fprintf(stderr, "Response size mismatch! "
302 "actual recved: %ld, expected to recved: %ld\n",
303 wv->recv-wv->header_len, wv->file_len);
304 }
305
306 tdiff = (wv->t_end.tv_sec - wv->t_start.tv_sec) * 1000000 +
307 (wv->t_end.tv_usec - wv->t_start.tv_usec);
308 TRACE_APP("Socket %d Total received bytes: %lu (%luMB)\n",
309 sockid, wv->recv, wv->recv / 1000000);
310 TRACE_APP("Socket %d Total spent time: %lu us\n", sockid, tdiff);
311 if (tdiff > 0) {
312 TRACE_APP("Socket %d Average bandwidth: %lf[MB/s]\n",
313 sockid, (double)wv->recv / tdiff);
314 }
315 ctx->stat.sum_resp_time += tdiff;
316 if (tdiff > ctx->stat.max_resp_time)
317 ctx->stat.max_resp_time = tdiff;
318
319 ctx->done++;
320
321 /* instead of closing the connection, download the next file */
322 memset(&ctx->wvars[sockid], 0, sizeof(struct wget_vars));
323
324 ctx->started++;
325
326 ev.events = MOS_EPOLLOUT;
327 ev.data.sock = sockid;
328 mtcp_epoll_ctl(mctx, ctx->ep, MOS_EPOLL_CTL_MOD, sockid, &ev);
329
330 return 0;
331 }
332 /*----------------------------------------------------------------------------*/
333 static inline int
DownloadComplete(thread_context_t ctx,int sockid,struct wget_vars * wv)334 DownloadComplete(thread_context_t ctx, int sockid, struct wget_vars *wv)
335 {
336 #ifdef APP
337 mctx_t mctx = ctx->mctx;
338 #endif
339 uint64_t tdiff;
340
341 TRACE_APP("Socket %d File download complete!\n", sockid);
342 gettimeofday(&wv->t_end, NULL);
343 CloseConnection(ctx, sockid);
344 ctx->stat.completes++;
345
346 if (wv->recv - wv->header_len != wv->file_len) {
347 fprintf(stderr, "Response size mismatch! "
348 "actual recved: %ld, expected to recved: %ld\n",
349 wv->recv-wv->header_len, wv->file_len);
350 }
351
352 tdiff = (wv->t_end.tv_sec - wv->t_start.tv_sec) * 1000000 +
353 (wv->t_end.tv_usec - wv->t_start.tv_usec);
354 TRACE_APP("Socket %d Total received bytes: %lu (%luMB)\n",
355 sockid, wv->recv, wv->recv / 1000000);
356 TRACE_APP("Socket %d Total spent time: %lu us\n", sockid, tdiff);
357 if (tdiff > 0) {
358 TRACE_APP("Socket %d Average bandwidth: %lf[MB/s]\n",
359 sockid, (double)wv->recv / tdiff);
360 }
361 ctx->stat.sum_resp_time += tdiff;
362 if (tdiff > ctx->stat.max_resp_time)
363 ctx->stat.max_resp_time = tdiff;
364
365 if (fio && wv->fd > 0)
366 close(wv->fd);
367
368 return 0;
369 }
370 /*----------------------------------------------------------------------------*/
371 static inline int
HandleReadEvent(thread_context_t ctx,int sockid,struct wget_vars * wv)372 HandleReadEvent(thread_context_t ctx, int sockid, struct wget_vars *wv)
373 {
374 mctx_t mctx = ctx->mctx;
375 char buf[BUF_SIZE];
376 char *pbuf;
377 int rd, copy_len;
378
379 rd = 1;
380 while (rd > 0) {
381 rd = mtcp_read(mctx, sockid, buf, BUF_SIZE);
382 if (rd <= 0)
383 break;
384 ctx->stat.reads += rd;
385
386 TRACE_APP("Socket %d: mtcp_read ret: %d, total_recv: %lu, "
387 "header_set: %d, header_len: %u, file_len: %lu\n",
388 sockid, rd, wv->recv + rd,
389 wv->headerset, wv->header_len, wv->file_len);
390
391 pbuf = buf;
392 if (!wv->headerset) {
393 copy_len = MIN(rd, HTTP_HEADER_LEN - wv->resp_len);
394 memcpy(wv->response + wv->resp_len, buf, copy_len);
395 wv->resp_len += copy_len;
396 wv->header_len = find_http_header(wv->response, wv->resp_len);
397 if (wv->header_len > 0) {
398 wv->response[wv->header_len] = '\0';
399 wv->file_len = http_header_long_val(wv->response,
400 CONTENT_LENGTH_HDR, sizeof(CONTENT_LENGTH_HDR) - 1);
401 if (wv->file_len < 0) {
402 /* failed to find for the Content-Length field */
403 wv->recv += rd;
404 rd = 0;
405 CloseConnection(ctx, sockid);
406 return 0;
407 }
408 TRACE_APP("Socket %d Parsed response header. "
409 "Header length: %u, File length: %lu (%luMB)\n",
410 sockid, wv->header_len,
411 wv->file_len, wv->file_len / 1024 / 1024);
412 wv->headerset = TRUE;
413 wv->recv += (rd - (wv->resp_len - wv->header_len));
414
415 pbuf += (rd - (wv->resp_len - wv->header_len));
416 rd = (wv->resp_len - wv->header_len);
417
418 } else {
419 /* failed to parse response header */
420 wv->recv += rd;
421 rd = 0;
422 ctx->stat.errors++;
423 ctx->errors++;
424 CloseConnection(ctx, sockid);
425 return 0;
426 }
427 }
428 wv->recv += rd;
429
430 if (fio && wv->fd > 0) {
431 int wr = 0;
432 while (wr < rd) {
433 int _wr = write(wv->fd, pbuf + wr, rd - wr);
434 assert (_wr == rd - wr);
435 if (_wr < 0) {
436 perror("write");
437 TRACE_ERROR("Failed to write.\n");
438 assert(0);
439 break;
440 }
441 wr += _wr;
442 wv->write += _wr;
443 }
444 }
445
446 if (wv->header_len && (wv->recv >= wv->header_len + wv->file_len)) {
447 break;
448 }
449 }
450
451 if (rd > 0) {
452 if (wv->header_len && (wv->recv >= wv->header_len + wv->file_len)) {
453 TRACE_APP("Socket %d Done Write: "
454 "header: %u file: %lu recv: %lu write: %lu\n",
455 sockid, wv->header_len, wv->file_len,
456 wv->recv - wv->header_len, wv->write);
457
458 if (keep_alive && ctx->started < ctx->target)
459 DownloadNext(ctx, sockid, wv);
460 else
461 DownloadComplete(ctx, sockid, wv);
462
463 return 0;
464 }
465
466 } else if (rd == 0) {
467 /* connection closed by remote host */
468 TRACE_DBG("Socket %d connection closed with server.\n", sockid);
469
470 if (wv->header_len && (wv->recv >= wv->header_len + wv->file_len)) {
471 DownloadComplete(ctx, sockid, wv);
472 } else {
473 ctx->stat.errors++;
474 ctx->incompletes++;
475 CloseConnection(ctx, sockid);
476 }
477
478 } else if (rd < 0) {
479 if (errno != EAGAIN) {
480 TRACE_DBG("Socket %d: mtcp_read() error %s\n",
481 sockid, strerror(errno));
482 ctx->stat.errors++;
483 ctx->errors++;
484 CloseConnection(ctx, sockid);
485 }
486 }
487
488 return 0;
489 }
490 /*----------------------------------------------------------------------------*/
491 static void
PrintStats()492 PrintStats()
493 {
494 struct wget_stat total = {0};
495 struct wget_stat *st;
496 uint64_t avg_resp_time;
497 uint64_t total_resp_time = 0;
498 int i;
499
500 for (i = 0; i < core_limit; i++) {
501 st = g_stat[i];
502 if (!st)
503 continue;
504 avg_resp_time = st->completes? st->sum_resp_time / st->completes : 0;
505
506 total.waits += st->waits;
507 total.events += st->events;
508 total.connects += st->connects;
509 total.reads += st->reads;
510 total.writes += st->writes;
511 total.completes += st->completes;
512 total_resp_time += avg_resp_time;
513 if (st->max_resp_time > total.max_resp_time)
514 total.max_resp_time = st->max_resp_time;
515 total.errors += st->errors;
516 total.timedout += st->timedout;
517
518 memset(st, 0, sizeof(struct wget_stat));
519 }
520 fprintf(stderr, "[ ALL ] connect: %7lu, read: %4lu MB, write: %4lu MB, "
521 "completes: %7lu (resp_time avg: %4lu, max: %6lu us)\n",
522 total.connects,
523 total.reads / 1024 / 1024, total.writes / 1024 / 1024,
524 total.completes, total_resp_time / core_limit, total.max_resp_time);
525 }
526 /*----------------------------------------------------------------------------*/
527 static void
GlbInitWget()528 GlbInitWget()
529 {
530 struct mtcp_conf mcfg;
531 char *url;
532 int total_concurrency = 0;
533 int flow_per_thread;
534 int flow_remainder_cnt;
535 int i;
536
537 num_cores = GetNumCPUs();
538 core_limit = num_cores;
539 concurrency = 100;
540 total_flows = -1;
541
542 LoadConfig(CONFIG_FILE, g_conf, NUM_CONF_VAR);
543
544 url = g_conf[0].value;
545 total_flows = atoi(g_conf[1].value);
546 core_limit = atoi(g_conf[2].value);
547 total_concurrency = atoi(g_conf[3].value);
548 dest_port = atoi(g_conf[4].value);
549 keep_alive = atoi(g_conf[5].value);
550
551 if ((strlen(url) == 0)
552 || (strlen(url) > MAX_URL_LEN)
553 || (total_flows <= 0)) {
554 TRACE_INFO("Invalid configuration\n");
555 exit(0);
556 }
557
558 char* slash_p = strchr(url, '/');
559 if (slash_p) {
560 strncpy(host, url, slash_p - url);
561 strncpy(path, strchr(url, '/'), MAX_URL_LEN);
562 } else {
563 strncpy(host, url, MAX_IP_STR_LEN);
564 strncpy(path, "/", 1);
565 }
566
567 daddr = inet_addr(host);
568 dport = (dest_port == 0) ? htons(PORT_NUM) : htons(dest_port);
569 saddr = INADDR_ANY;
570
571 if (total_flows < core_limit) {
572 core_limit = total_flows;
573 }
574
575 /* per-core concurrency = total_concurrency / # cores */
576 if (total_concurrency > 0)
577 concurrency = total_concurrency / core_limit;
578
579 /* set the max number of fds 3x larger than concurrency */
580 max_fds = concurrency * 3;
581
582 TRACE_CONFIG("Application configuration:\n");
583 TRACE_CONFIG("URL: %s\n", path);
584 TRACE_CONFIG("# of total_flows: %d\n", total_flows);
585 TRACE_CONFIG("# of cores: %d\n", core_limit);
586 TRACE_CONFIG("Concurrency: %d\n", total_concurrency);
587
588 mtcp_getconf(&mcfg);
589 mcfg.max_concurrency = max_fds;
590 mcfg.max_num_buffers = max_fds;
591 mtcp_setconf(&mcfg);
592
593 flow_per_thread = total_flows / core_limit;
594 flow_remainder_cnt = total_flows % core_limit;
595
596 for (i = 0; i < MAX_CPUS; i++) {
597 done[i] = FALSE;
598 flows[i] = flow_per_thread;
599
600 if (flow_remainder_cnt-- > 0)
601 flows[i]++;
602 }
603
604 return;
605 }
606 /*----------------------------------------------------------------------------*/
607 static void
InitWget(mctx_t mctx,void ** app_ctx)608 InitWget(mctx_t mctx, void **app_ctx)
609 {
610 thread_context_t ctx;
611 int ep;
612
613 assert(mctx);
614
615 int core = mctx->cpu;
616
617 ctx = CreateContext(mctx);
618 if (!ctx)
619 exit(-1);
620 g_ctx[core] = ctx;
621 g_stat[core] = &ctx->stat;
622 srand(time(NULL));
623
624 mtcp_init_rss(mctx, saddr, IP_RANGE, daddr, dport);
625
626 if (flows[core] == 0) {
627 TRACE_DBG("Application thread %d finished.\n", core);
628 exit(-1);
629 }
630 ctx->target = flows[core];
631
632 /* Initialization */
633 ctx->maxevents = max_fds * 3;
634 ep = mtcp_epoll_create(mctx, ctx->maxevents);
635 if (ep < 0) {
636 TRACE_ERROR("Failed to create epoll struct!n");
637 exit(EXIT_FAILURE);
638 }
639 ctx->events = (struct mtcp_epoll_event *)
640 calloc(ctx->maxevents, sizeof(struct mtcp_epoll_event));
641 if (!ctx->events) {
642 TRACE_ERROR("Failed to allocate events!\n");
643 exit(EXIT_FAILURE);
644 }
645 ctx->ep = ep;
646
647 ctx->wvars = (struct wget_vars *)calloc(max_fds, sizeof(struct wget_vars));
648 if (!ctx->wvars) {
649 TRACE_ERROR("Failed to create wget variables!\n");
650 exit(EXIT_FAILURE);
651 }
652
653 ctx->started = ctx->done = ctx->pending = 0;
654 ctx->errors = ctx->incompletes = 0;
655
656 *app_ctx = ctx;
657
658 return;
659 }
660 /*----------------------------------------------------------------------------*/
661 static void
RunWget(mctx_t mctx,void ** app_ctx)662 RunWget(mctx_t mctx, void **app_ctx)
663 {
664 struct in_addr daddr_in;
665 struct timeval cur_tv, prev_tv;
666 int nevents;
667 int i;
668
669 assert(mctx);
670 assert(*app_ctx);
671
672 thread_context_t ctx = *app_ctx;
673 int core = ctx->core;
674
675 daddr_in.s_addr = daddr;
676 fprintf(stderr, "Thread %d handles %d flows. connecting to %s:%u\n",
677 core, flows[core], inet_ntoa(daddr_in), ntohs(dport));
678
679 gettimeofday(&cur_tv, NULL);
680 prev_tv = cur_tv;
681
682 while (!done[core]) {
683 gettimeofday(&cur_tv, NULL);
684
685 /* print statistics every second */
686 if (core == 0 && cur_tv.tv_sec > prev_tv.tv_sec) {
687 PrintStats();
688 prev_tv = cur_tv;
689 }
690
691 while (/*ctx->pending*/ mtcp_get_connection_cnt(ctx->mctx) < concurrency && ctx->started < ctx->target) {
692 if (CreateConnection(ctx) < 0) {
693 done[core] = TRUE;
694 break;
695 }
696 }
697
698 nevents = mtcp_epoll_wait(mctx, ctx->ep,
699 ctx->events, ctx->maxevents, ctx->pending ? -1 : 10);
700 ctx->stat.waits++;
701
702 if (nevents < 0) {
703 if (errno != EINTR) {
704 TRACE_ERROR("mtcp_epoll_wait failed! ret: %d\n", nevents);
705 }
706 done[core] = TRUE;
707 break;
708 } else {
709 ctx->stat.events += nevents;
710 }
711
712 for (i = 0; i < nevents; i++) {
713
714 if (ctx->events[i].events & MOS_EPOLLERR) {
715 int err;
716 socklen_t len = sizeof(err);
717
718 TRACE_APP("[CPU %d] Error on socket %d\n",
719 core, ctx->events[i].data.sockid);
720 ctx->stat.errors++;
721 ctx->errors++;
722 if (mtcp_getsockopt(mctx, ctx->events[i].data.sock,
723 SOL_SOCKET, SO_ERROR, (void *)&err, &len) == 0) {
724 if (err == ETIMEDOUT)
725 ctx->stat.timedout++;
726 }
727 CloseConnection(ctx, ctx->events[i].data.sock);
728
729 } else if (ctx->events[i].events & MOS_EPOLLIN) {
730 HandleReadEvent(ctx,
731 ctx->events[i].data.sock,
732 &ctx->wvars[ctx->events[i].data.sock]);
733
734 } else if (ctx->events[i].events == MOS_EPOLLOUT) {
735 struct wget_vars *wv = &ctx->wvars[ctx->events[i].data.sock];
736
737 if (!wv->request_sent) {
738 SendHTTPRequest(ctx, ctx->events[i].data.sock, wv);
739 } else {
740 //TRACE_DBG("Request already sent.\n");
741 }
742
743 } else {
744 TRACE_ERROR("Socket %d: event: %s\n",
745 ctx->events[i].data.sock,
746 EventToString(ctx->events[i].events));
747 assert(0);
748 }
749 }
750
751 if (ctx->done >= ctx->target) {
752 fprintf(stdout, "Completed %d connections, "
753 "errors: %d incompletes: %d\n",
754 ctx->done, ctx->errors, ctx->incompletes);
755 break;
756 }
757 }
758
759 TRACE_INFO("Wget thread %d waiting for mtcp to be destroyed.\n", core);
760
761 g_stat[core] = NULL;
762 g_ctx[core] = NULL;
763 DestroyContext(ctx);
764
765 return;
766 }
767 /*----------------------------------------------------------------------------*/
768 void
RunApplication(mctx_t mctx)769 RunApplication(mctx_t mctx)
770 {
771 void *app_ctx;
772
773 app_ctx = (void *)calloc(1, sizeof(void *));
774 if (!app_ctx) {
775 TRACE_ERROR("calloc failure\n");
776 return;
777 }
778
779 TRACE_INFO("run application on core %d\n", mctx->cpu);
780 InitWget(mctx, &(app_ctx));
781 RunWget(mctx, &(app_ctx));
782 }
783 /*----------------------------------------------------------------------------*/
784 void *
RunMTCP(void * arg)785 RunMTCP(void *arg)
786 {
787 int core = *(int *)arg;
788 mctx_t mctx;
789
790 mtcp_core_affinitize(core);
791
792 /* mTCP Initialization */
793 mctx = mtcp_create_context(core);
794 if (!mctx) {
795 pthread_exit(NULL);
796 TRACE_ERROR("Failed to craete mtcp context.\n");
797 return NULL;
798 }
799
800 /* Run application here */
801 RunApplication(mctx);
802
803 /* mTCP Tear Down */
804 mtcp_destroy_context(mctx);
805 pthread_exit(NULL);
806
807 return NULL;
808 }
809 /*----------------------------------------------------------------------------*/
810 int
main(int argc,char ** argv)811 main(int argc, char **argv)
812 {
813 int ret, i;
814 int cores[MAX_CPUS];
815 char *fname = "config/mos.conf";
816
817 int opt;
818 while ((opt = getopt(argc, argv, "f:")) != -1) {
819 switch (opt) {
820 case 'f':
821 fname = optarg;
822 break;
823 default:
824 printf("Usage: %s [-f config_file]\n", argv[0]);
825 return 0;
826 }
827
828 }
829
830 core_limit = sysconf(_SC_NPROCESSORS_ONLN);
831
832 ret = mtcp_init(fname);
833 if (ret) {
834 TRACE_ERROR("Failed to initialize mtcp.\n");
835 exit(EXIT_FAILURE);
836 }
837
838 mtcp_getconf(&g_mcfg);
839
840 core_limit = g_mcfg.num_cores;
841
842 GlbInitWget();
843
844 for (i = 0; i < core_limit; i++) {
845 cores[i] = i;
846
847 /* Run mtcp thread */
848 if ((g_mcfg.cpu_mask & (1L << i)) &&
849 pthread_create(&mtcp_thread[i], NULL, RunMTCP, (void *)&cores[i])) {
850 perror("pthread_create");
851 TRACE_ERROR("Failed to create msg_test thread.\n");
852 exit(-1);
853 }
854 }
855
856 for (i = 0; i < core_limit; i++) {
857 if (g_mcfg.cpu_mask & (1L << i))
858 pthread_join(mtcp_thread[i], NULL);
859 TRACE_INFO("Message test thread %d joined.\n", i);
860 }
861
862 mtcp_destroy();
863 return 0;
864 }
865 /*----------------------------------------------------------------------------*/
866