1 #include <stdio.h> 2 #include <stdlib.h> 3 #include <string.h> 4 #include <unistd.h> 5 #include <stdint.h> 6 #include <time.h> 7 #include <sys/time.h> 8 #include <sys/types.h> 9 #include <sys/stat.h> 10 #include <fcntl.h> 11 #include <pthread.h> 12 #include <signal.h> 13 #include <sys/socket.h> 14 #include <netinet/in.h> 15 #include <arpa/inet.h> 16 #include <sys/queue.h> 17 #include <assert.h> 18 19 #include <mos_api.h> 20 #include "cpu.h" 21 #include "rss.h" 22 #include "http_parsing.h" 23 #include "debug.h" 24 #include "applib.h" 25 26 #define CONFIG_FILE "config/epwget.conf" 27 static struct conf_var g_conf[] = { 28 { "url", {0} }, 29 { "total_flows", {0} }, 30 { "core_limit", {0} }, 31 { "total_concurrency", {0} }, 32 { "dest_port", {0} }, 33 }; 34 #define NUM_CONF_VAR (sizeof(g_conf) / sizeof(struct conf_var)) 35 36 #define PORT_NUM 80 37 //#define PORT_NUM 3333 38 39 #define MAX_URL_LEN 128 40 #define MAX_FILE_LEN 128 41 #define HTTP_HEADER_LEN 1024 42 43 #define IP_RANGE 1 44 #define MAX_IP_STR_LEN 16 45 46 #define BUF_SIZE (8*1024) 47 48 /*----------------------------------------------------------------------------*/ 49 struct mtcp_conf g_mcfg; 50 /*----------------------------------------------------------------------------*/ 51 static mctx_t g_mctx[MAX_CPUS]; 52 static int done[MAX_CPUS]; 53 /*----------------------------------------------------------------------------*/ 54 static int num_cores; 55 static int core_limit; 56 /*----------------------------------------------------------------------------*/ 57 static int fio = FALSE; 58 static char outfile[MAX_FILE_LEN + 1]; 59 /*----------------------------------------------------------------------------*/ 60 static char host[MAX_IP_STR_LEN + 1]; 61 static char path[MAX_URL_LEN + 1]; 62 static in_addr_t daddr; 63 static in_port_t dport; 64 static in_addr_t saddr; 65 /*----------------------------------------------------------------------------*/ 66 static int total_flows; 67 static int flows[MAX_CPUS]; 68 static int flowcnt = 0; 69 static int concurrency; 70 static int max_fds; 71 static uint16_t dest_port; 72 /*----------------------------------------------------------------------------*/ 73 struct wget_stat 74 { 75 uint64_t waits; 76 uint64_t events; 77 uint64_t connects; 78 uint64_t reads; 79 uint64_t writes; 80 uint64_t completes; 81 82 uint64_t errors; 83 uint64_t timedout; 84 85 uint64_t sum_resp_time; 86 uint64_t max_resp_time; 87 }; 88 /*----------------------------------------------------------------------------*/ 89 struct thread_context 90 { 91 int core; 92 93 mctx_t mctx; 94 int ep; 95 struct wget_vars *wvars; 96 97 int target; 98 int started; 99 int errors; 100 int incompletes; 101 int done; 102 int pending; 103 104 int maxevents; 105 struct mtcp_epoll_event *events; 106 107 struct wget_stat stat; 108 }; 109 typedef struct thread_context* thread_context_t; 110 /*----------------------------------------------------------------------------*/ 111 struct wget_vars 112 { 113 int request_sent; 114 115 char response[HTTP_HEADER_LEN]; 116 int resp_len; 117 int headerset; 118 uint32_t header_len; 119 uint64_t file_len; 120 uint64_t recv; 121 uint64_t write; 122 123 struct timeval t_start; 124 struct timeval t_end; 125 126 int fd; 127 }; 128 /*----------------------------------------------------------------------------*/ 129 static struct thread_context *g_ctx[MAX_CPUS]; 130 static struct wget_stat *g_stat[MAX_CPUS]; 131 /*----------------------------------------------------------------------------*/ 132 static thread_context_t 133 CreateContext(mctx_t mctx) 134 { 135 thread_context_t ctx; 136 137 ctx = (thread_context_t)calloc(1, sizeof(struct thread_context)); 138 if (!ctx) { 139 perror("malloc"); 140 TRACE_ERROR("Failed to allocate memory for thread context.\n"); 141 return NULL; 142 } 143 144 ctx->mctx = mctx; 145 ctx->core = mctx->cpu; 146 147 g_mctx[ctx->core] = mctx; 148 149 return ctx; 150 } 151 /*----------------------------------------------------------------------------*/ 152 static void 153 DestroyContext(thread_context_t ctx) 154 { 155 free(ctx); 156 } 157 /*----------------------------------------------------------------------------*/ 158 static inline int 159 CreateConnection(thread_context_t ctx) 160 { 161 mctx_t mctx = ctx->mctx; 162 struct mtcp_epoll_event ev; 163 struct sockaddr_in addr; 164 int sockid; 165 int ret; 166 167 assert(mctx); 168 169 errno = 0; 170 sockid = mtcp_socket(mctx, AF_INET, SOCK_STREAM, 0); 171 if (sockid < 0) { 172 TRACE_INFO("Failed to create socket! (%s)\n", 173 strerror(errno)); 174 return -1; 175 } 176 memset(&ctx->wvars[sockid], 0, sizeof(struct wget_vars)); 177 ret = mtcp_setsock_nonblock(mctx, sockid); 178 if (ret < 0) { 179 TRACE_ERROR("Failed to set socket in nonblocking mode.\n"); 180 exit(-1); 181 } 182 183 addr.sin_family = AF_INET; 184 addr.sin_addr.s_addr = daddr; 185 addr.sin_port = dport; 186 187 ret = mtcp_connect(mctx, sockid, 188 (struct sockaddr *)&addr, sizeof(struct sockaddr_in)); 189 if (ret < 0) { 190 if (errno != EINPROGRESS) { 191 perror("mtcp_connect"); 192 mtcp_close(mctx, sockid); 193 return -1; 194 } 195 } 196 197 ctx->started++; 198 ctx->pending++; 199 ctx->stat.connects++; 200 201 ev.events = MOS_EPOLLOUT; 202 ev.data.sock = sockid; 203 mtcp_epoll_ctl(mctx, ctx->ep, MOS_EPOLL_CTL_ADD, sockid, &ev); 204 205 return sockid; 206 } 207 /*----------------------------------------------------------------------------*/ 208 static inline void 209 CloseConnection(thread_context_t ctx, int sockid) 210 { 211 mtcp_epoll_ctl(ctx->mctx, ctx->ep, MOS_EPOLL_CTL_DEL, sockid, NULL); 212 mtcp_close(ctx->mctx, sockid); 213 ctx->pending--; 214 ctx->done++; 215 assert(ctx->pending >= 0); 216 while (/*ctx->pending*/ mtcp_get_connection_cnt(ctx->mctx) < concurrency && ctx->started < ctx->target) { 217 if (CreateConnection(ctx) < 0) { 218 done[ctx->core] = TRUE; 219 break; 220 } 221 } 222 } 223 /*----------------------------------------------------------------------------*/ 224 static inline int 225 SendHTTPRequest(thread_context_t ctx, int sockid, struct wget_vars *wv) 226 { 227 char request[HTTP_HEADER_LEN]; 228 struct mtcp_epoll_event ev; 229 int wr; 230 int len; 231 232 wv->headerset = FALSE; 233 wv->recv = 0; 234 wv->header_len = wv->file_len = 0; 235 236 snprintf(request, HTTP_HEADER_LEN, "GET %s HTTP/1.0\r\n" 237 "User-Agent: Wget/1.12 (linux-gnu)\r\n" 238 "Accept: */*\r\n" 239 "Host: %s\r\n" 240 // "Connection: Keep-Alive\r\n\r\n", 241 "Connection: Close\r\n\r\n", 242 path, host); 243 len = strlen(request); 244 245 wr = mtcp_write(ctx->mctx, sockid, request, len); 246 if (wr < len) { 247 TRACE_ERROR("Socket %d: Sending HTTP request failed. " 248 "try: %d, sent: %d\n", sockid, len, wr); 249 CloseConnection(ctx, sockid); 250 } 251 ctx->stat.writes += wr; 252 TRACE_APP("Socket %d HTTP Request of %d bytes. sent.\n", sockid, wr); 253 wv->request_sent = TRUE; 254 255 ev.events = MOS_EPOLLIN; 256 ev.data.sock = sockid; 257 mtcp_epoll_ctl(ctx->mctx, ctx->ep, MOS_EPOLL_CTL_MOD, sockid, &ev); 258 259 gettimeofday(&wv->t_start, NULL); 260 261 char fname[MAX_FILE_LEN + 1]; 262 if (fio) { 263 snprintf(fname, MAX_FILE_LEN, "%s.%d", outfile, flowcnt++); 264 wv->fd = open(fname, O_WRONLY | O_CREAT | O_TRUNC, 0644); 265 if (wv->fd < 0) { 266 TRACE_APP("Failed to open file descriptor for %s\n", fname); 267 exit(1); 268 } 269 } 270 271 return 0; 272 } 273 /*----------------------------------------------------------------------------*/ 274 static inline int 275 DownloadComplete(thread_context_t ctx, int sockid, struct wget_vars *wv) 276 { 277 #ifdef APP 278 mctx_t mctx = ctx->mctx; 279 #endif 280 uint64_t tdiff; 281 282 TRACE_APP("Socket %d File download complete!\n", sockid); 283 gettimeofday(&wv->t_end, NULL); 284 CloseConnection(ctx, sockid); 285 ctx->stat.completes++; 286 287 if (wv->recv - wv->header_len != wv->file_len) { 288 fprintf(stderr, "Response size mismatch! " 289 "actual recved: %ld, expected to recved: %ld\n", 290 wv->recv-wv->header_len, wv->file_len); 291 } 292 293 tdiff = (wv->t_end.tv_sec - wv->t_start.tv_sec) * 1000000 + 294 (wv->t_end.tv_usec - wv->t_start.tv_usec); 295 TRACE_APP("Socket %d Total received bytes: %lu (%luMB)\n", 296 sockid, wv->recv, wv->recv / 1000000); 297 TRACE_APP("Socket %d Total spent time: %lu us\n", sockid, tdiff); 298 if (tdiff > 0) { 299 TRACE_APP("Socket %d Average bandwidth: %lf[MB/s]\n", 300 sockid, (double)wv->recv / tdiff); 301 } 302 ctx->stat.sum_resp_time += tdiff; 303 if (tdiff > ctx->stat.max_resp_time) 304 ctx->stat.max_resp_time = tdiff; 305 306 if (fio && wv->fd > 0) 307 close(wv->fd); 308 309 return 0; 310 } 311 /*----------------------------------------------------------------------------*/ 312 static inline int 313 HandleReadEvent(thread_context_t ctx, int sockid, struct wget_vars *wv) 314 { 315 mctx_t mctx = ctx->mctx; 316 char buf[BUF_SIZE]; 317 char *pbuf; 318 int rd, copy_len; 319 320 rd = 1; 321 while (rd > 0) { 322 rd = mtcp_read(mctx, sockid, buf, BUF_SIZE); 323 if (rd <= 0) 324 break; 325 ctx->stat.reads += rd; 326 327 TRACE_APP("Socket %d: mtcp_read ret: %d, total_recv: %lu, " 328 "header_set: %d, header_len: %u, file_len: %lu\n", 329 sockid, rd, wv->recv + rd, 330 wv->headerset, wv->header_len, wv->file_len); 331 332 pbuf = buf; 333 if (!wv->headerset) { 334 copy_len = MIN(rd, HTTP_HEADER_LEN - wv->resp_len); 335 memcpy(wv->response + wv->resp_len, buf, copy_len); 336 wv->resp_len += copy_len; 337 wv->header_len = find_http_header(wv->response, wv->resp_len); 338 if (wv->header_len > 0) { 339 wv->response[wv->header_len] = '\0'; 340 wv->file_len = http_header_long_val(wv->response, 341 CONTENT_LENGTH_HDR, sizeof(CONTENT_LENGTH_HDR) - 1); 342 TRACE_APP("Socket %d Parsed response header. " 343 "Header length: %u, File length: %lu (%luMB)\n", 344 sockid, wv->header_len, 345 wv->file_len, wv->file_len / 1024 / 1024); 346 wv->headerset = TRUE; 347 wv->recv += (rd - (wv->resp_len - wv->header_len)); 348 349 pbuf += (rd - (wv->resp_len - wv->header_len)); 350 rd = (wv->resp_len - wv->header_len); 351 352 } else { 353 /* failed to parse response header */ 354 wv->recv += rd; 355 rd = 0; 356 ctx->stat.errors++; 357 ctx->errors++; 358 CloseConnection(ctx, sockid); 359 return 0; 360 } 361 } 362 wv->recv += rd; 363 364 if (fio && wv->fd > 0) { 365 int wr = 0; 366 while (wr < rd) { 367 int _wr = write(wv->fd, pbuf + wr, rd - wr); 368 assert (_wr == rd - wr); 369 if (_wr < 0) { 370 perror("write"); 371 TRACE_ERROR("Failed to write.\n"); 372 assert(0); 373 break; 374 } 375 wr += _wr; 376 wv->write += _wr; 377 } 378 } 379 380 if (wv->header_len && (wv->recv >= wv->header_len + wv->file_len)) { 381 break; 382 } 383 } 384 385 if (rd > 0) { 386 if (wv->header_len && (wv->recv >= wv->header_len + wv->file_len)) { 387 TRACE_APP("Socket %d Done Write: " 388 "header: %u file: %lu recv: %lu write: %lu\n", 389 sockid, wv->header_len, wv->file_len, 390 wv->recv - wv->header_len, wv->write); 391 DownloadComplete(ctx, sockid, wv); 392 393 return 0; 394 } 395 396 } else if (rd == 0) { 397 /* connection closed by remote host */ 398 TRACE_DBG("Socket %d connection closed with server.\n", sockid); 399 400 if (wv->header_len && (wv->recv >= wv->header_len + wv->file_len)) { 401 DownloadComplete(ctx, sockid, wv); 402 } else { 403 ctx->stat.errors++; 404 ctx->incompletes++; 405 CloseConnection(ctx, sockid); 406 } 407 408 } else if (rd < 0) { 409 if (errno != EAGAIN) { 410 TRACE_DBG("Socket %d: mtcp_read() error %s\n", 411 sockid, strerror(errno)); 412 ctx->stat.errors++; 413 ctx->errors++; 414 CloseConnection(ctx, sockid); 415 } 416 } 417 418 return 0; 419 } 420 /*----------------------------------------------------------------------------*/ 421 static void 422 PrintStats() 423 { 424 struct wget_stat total = {0}; 425 struct wget_stat *st; 426 uint64_t avg_resp_time; 427 uint64_t total_resp_time = 0; 428 int i; 429 430 for (i = 0; i < core_limit; i++) { 431 st = g_stat[i]; 432 if (!st) 433 continue; 434 avg_resp_time = st->completes? st->sum_resp_time / st->completes : 0; 435 436 total.waits += st->waits; 437 total.events += st->events; 438 total.connects += st->connects; 439 total.reads += st->reads; 440 total.writes += st->writes; 441 total.completes += st->completes; 442 total_resp_time += avg_resp_time; 443 if (st->max_resp_time > total.max_resp_time) 444 total.max_resp_time = st->max_resp_time; 445 total.errors += st->errors; 446 total.timedout += st->timedout; 447 448 memset(st, 0, sizeof(struct wget_stat)); 449 } 450 fprintf(stderr, "[ ALL ] connect: %7lu, read: %4lu MB, write: %4lu MB, " 451 "completes: %7lu (resp_time avg: %4lu, max: %6lu us)\n", 452 total.connects, 453 total.reads / 1000 / 1000, total.writes / 1000 / 1000, 454 total.completes, total_resp_time / core_limit, total.max_resp_time); 455 } 456 /*----------------------------------------------------------------------------*/ 457 static void 458 GlbInitWget() 459 { 460 struct mtcp_conf mcfg; 461 char *url; 462 int total_concurrency = 0; 463 int flow_per_thread; 464 int flow_remainder_cnt; 465 int i; 466 467 num_cores = GetNumCPUs(); 468 core_limit = num_cores; 469 concurrency = 100; 470 total_flows = -1; 471 472 LoadConfig(CONFIG_FILE, g_conf, NUM_CONF_VAR); 473 474 url = g_conf[0].value; 475 total_flows = atoi(g_conf[1].value); 476 core_limit = atoi(g_conf[2].value); 477 total_concurrency = atoi(g_conf[3].value); 478 dest_port = atoi(g_conf[4].value); 479 480 if ((strlen(url) == 0) 481 || (strlen(url) > MAX_URL_LEN) 482 || (total_flows <= 0)) { 483 TRACE_INFO("Invalid configuration\n"); 484 exit(0); 485 } 486 487 char* slash_p = strchr(url, '/'); 488 if (slash_p) { 489 strncpy(host, url, slash_p - url); 490 strncpy(path, strchr(url, '/'), MAX_URL_LEN); 491 } else { 492 strncpy(host, url, MAX_IP_STR_LEN); 493 strncpy(path, "/", 1); 494 } 495 496 daddr = inet_addr(host); 497 dport = (dest_port == 0) ? htons(PORT_NUM) : htons(dest_port); 498 saddr = INADDR_ANY; 499 500 if (total_flows < core_limit) { 501 core_limit = total_flows; 502 } 503 504 /* per-core concurrency = total_concurrency / # cores */ 505 if (total_concurrency > 0) 506 concurrency = total_concurrency / core_limit; 507 508 /* set the max number of fds 3x larger than concurrency */ 509 max_fds = concurrency * 3; 510 511 TRACE_CONFIG("Application configuration:\n"); 512 TRACE_CONFIG("URL: %s\n", path); 513 TRACE_CONFIG("# of total_flows: %d\n", total_flows); 514 TRACE_CONFIG("# of cores: %d\n", core_limit); 515 TRACE_CONFIG("Concurrency: %d\n", total_concurrency); 516 517 mtcp_getconf(&mcfg); 518 mcfg.max_concurrency = max_fds; 519 mcfg.max_num_buffers = max_fds; 520 mtcp_setconf(&mcfg); 521 522 flow_per_thread = total_flows / core_limit; 523 flow_remainder_cnt = total_flows % core_limit; 524 525 for (i = 0; i < MAX_CPUS; i++) { 526 done[i] = FALSE; 527 flows[i] = flow_per_thread; 528 529 if (flow_remainder_cnt-- > 0) 530 flows[i]++; 531 } 532 533 return; 534 } 535 /*----------------------------------------------------------------------------*/ 536 static void 537 InitWget(mctx_t mctx, void **app_ctx) 538 { 539 thread_context_t ctx; 540 int ep; 541 542 assert(mctx); 543 544 int core = mctx->cpu; 545 546 ctx = CreateContext(mctx); 547 if (!ctx) 548 exit(-1); 549 g_ctx[core] = ctx; 550 g_stat[core] = &ctx->stat; 551 srand(time(NULL)); 552 553 mtcp_init_rss(mctx, saddr, IP_RANGE, daddr, dport); 554 555 if (flows[core] == 0) { 556 TRACE_DBG("Application thread %d finished.\n", core); 557 exit(-1); 558 } 559 ctx->target = flows[core]; 560 561 /* Initialization */ 562 ctx->maxevents = max_fds * 3; 563 ep = mtcp_epoll_create(mctx, ctx->maxevents); 564 if (ep < 0) { 565 TRACE_ERROR("Failed to create epoll struct!n"); 566 exit(EXIT_FAILURE); 567 } 568 ctx->events = (struct mtcp_epoll_event *) 569 calloc(ctx->maxevents, sizeof(struct mtcp_epoll_event)); 570 if (!ctx->events) { 571 TRACE_ERROR("Failed to allocate events!\n"); 572 exit(EXIT_FAILURE); 573 } 574 ctx->ep = ep; 575 576 ctx->wvars = (struct wget_vars *)calloc(max_fds, sizeof(struct wget_vars)); 577 if (!ctx->wvars) { 578 TRACE_ERROR("Failed to create wget variables!\n"); 579 exit(EXIT_FAILURE); 580 } 581 582 ctx->started = ctx->done = ctx->pending = 0; 583 ctx->errors = ctx->incompletes = 0; 584 585 *app_ctx = ctx; 586 587 return; 588 } 589 /*----------------------------------------------------------------------------*/ 590 static void 591 RunWget(mctx_t mctx, void **app_ctx) 592 { 593 struct in_addr daddr_in; 594 struct timeval cur_tv, prev_tv; 595 int nevents; 596 int i; 597 598 assert(mctx); 599 assert(*app_ctx); 600 601 thread_context_t ctx = *app_ctx; 602 int core = ctx->core; 603 604 daddr_in.s_addr = daddr; 605 fprintf(stderr, "Thread %d handles %d flows. connecting to %s:%u\n", 606 core, flows[core], inet_ntoa(daddr_in), ntohs(dport)); 607 608 gettimeofday(&cur_tv, NULL); 609 prev_tv = cur_tv; 610 611 while (!done[core]) { 612 gettimeofday(&cur_tv, NULL); 613 614 /* print statistics every second */ 615 if (core == 0 && cur_tv.tv_sec > prev_tv.tv_sec) { 616 PrintStats(); 617 prev_tv = cur_tv; 618 } 619 620 while (/*ctx->pending*/ mtcp_get_connection_cnt(ctx->mctx) < concurrency && ctx->started < ctx->target) { 621 if (CreateConnection(ctx) < 0) { 622 done[core] = TRUE; 623 break; 624 } 625 } 626 627 nevents = mtcp_epoll_wait(mctx, ctx->ep, 628 ctx->events, ctx->maxevents, ctx->pending ? -1 : 10); 629 ctx->stat.waits++; 630 631 if (nevents < 0) { 632 if (errno != EINTR) { 633 TRACE_ERROR("mtcp_epoll_wait failed! ret: %d\n", nevents); 634 } 635 done[core] = TRUE; 636 break; 637 } else { 638 ctx->stat.events += nevents; 639 } 640 641 for (i = 0; i < nevents; i++) { 642 643 if (ctx->events[i].events & MOS_EPOLLERR) { 644 int err; 645 socklen_t len = sizeof(err); 646 647 TRACE_APP("[CPU %d] Error on socket %d\n", 648 core, ctx->events[i].data.sockid); 649 ctx->stat.errors++; 650 ctx->errors++; 651 if (mtcp_getsockopt(mctx, ctx->events[i].data.sock, 652 SOL_SOCKET, SO_ERROR, (void *)&err, &len) == 0) { 653 if (err == ETIMEDOUT) 654 ctx->stat.timedout++; 655 } 656 CloseConnection(ctx, ctx->events[i].data.sock); 657 658 } else if (ctx->events[i].events & MOS_EPOLLIN) { 659 HandleReadEvent(ctx, 660 ctx->events[i].data.sock, 661 &ctx->wvars[ctx->events[i].data.sock]); 662 663 } else if (ctx->events[i].events == MOS_EPOLLOUT) { 664 struct wget_vars *wv = &ctx->wvars[ctx->events[i].data.sock]; 665 666 if (!wv->request_sent) { 667 SendHTTPRequest(ctx, ctx->events[i].data.sock, wv); 668 } else { 669 //TRACE_DBG("Request already sent.\n"); 670 } 671 672 } else { 673 TRACE_ERROR("Socket %d: event: %s\n", 674 ctx->events[i].data.sock, 675 EventToString(ctx->events[i].events)); 676 assert(0); 677 } 678 } 679 680 if (ctx->done >= ctx->target) { 681 fprintf(stdout, "Completed %d connections, " 682 "errors: %d incompletes: %d\n", 683 ctx->done, ctx->errors, ctx->incompletes); 684 break; 685 } 686 } 687 688 TRACE_INFO("Wget thread %d waiting for mtcp to be destroyed.\n", core); 689 690 g_stat[core] = NULL; 691 g_ctx[core] = NULL; 692 DestroyContext(ctx); 693 694 return; 695 } 696 /*----------------------------------------------------------------------------*/ 697 void 698 RunApplication(mctx_t mctx) 699 { 700 void *app_ctx; 701 702 app_ctx = (void *)calloc(1, sizeof(void *)); 703 if (!app_ctx) { 704 TRACE_ERROR("calloc failure\n"); 705 return; 706 } 707 708 TRACE_INFO("run application on core %d\n", mctx->cpu); 709 InitWget(mctx, &(app_ctx)); 710 RunWget(mctx, &(app_ctx)); 711 } 712 /*----------------------------------------------------------------------------*/ 713 void * 714 RunMTCP(void *arg) 715 { 716 int core = *(int *)arg; 717 mctx_t mctx; 718 719 mtcp_core_affinitize(core); 720 721 /* mTCP Initialization */ 722 mctx = mtcp_create_context(core); 723 if (!mctx) { 724 pthread_exit(NULL); 725 TRACE_ERROR("Failed to craete mtcp context.\n"); 726 return NULL; 727 } 728 729 /* Run application here */ 730 RunApplication(mctx); 731 732 /* mTCP Tear Down */ 733 mtcp_destroy_context(mctx); 734 735 return NULL; 736 } 737 /*----------------------------------------------------------------------------*/ 738 int 739 main(int argc, char **argv) 740 { 741 int ret; 742 int current_core = -1; 743 char *fname = "config/mos.conf"; 744 745 int opt; 746 while ((opt = getopt(argc, argv, "f:c:")) != -1) { 747 switch (opt) { 748 case 'f': 749 fname = optarg; 750 break; 751 case 'c': 752 current_core = atoi(optarg); 753 if (current_core >= MAX_CPUS) { 754 TRACE_ERROR("Invalid core id!\n"); 755 exit(EXIT_FAILURE); 756 } 757 break; 758 default: 759 printf("Usage: %s [-f config_file]\n", argv[0]); 760 return 0; 761 } 762 763 } 764 765 core_limit = sysconf(_SC_NPROCESSORS_ONLN); 766 767 ret = mtcp_init(fname); 768 if (ret) { 769 TRACE_ERROR("Failed to initialize mtcp.\n"); 770 exit(EXIT_FAILURE); 771 } 772 773 mtcp_getconf(&g_mcfg); 774 775 core_limit = g_mcfg.num_cores; 776 777 GlbInitWget(); 778 779 RunMTCP(¤t_core); 780 781 mtcp_destroy(); 782 return 0; 783 } 784 /*----------------------------------------------------------------------------*/ 785