1 #include <stdio.h> 2 #include <stdlib.h> 3 #include <string.h> 4 #include <unistd.h> 5 #include <stdint.h> 6 #include <time.h> 7 #include <sys/time.h> 8 #include <sys/types.h> 9 #include <sys/stat.h> 10 #include <fcntl.h> 11 #include <pthread.h> 12 #include <signal.h> 13 #include <sys/socket.h> 14 #include <netinet/in.h> 15 #include <arpa/inet.h> 16 #include <sys/queue.h> 17 #include <assert.h> 18 19 #include <mos_api.h> 20 #include "cpu.h" 21 #include "rss.h" 22 #include "http_parsing.h" 23 #include "debug.h" 24 #include "applib.h" 25 26 #define CONFIG_FILE "config/epwget.conf" 27 static struct conf_var g_conf[] = { 28 { "url", {0} }, 29 { "total_flows", {0} }, 30 { "core_limit", {0} }, 31 { "total_concurrency", {0} }, 32 { "dest_port", {0} }, 33 }; 34 #define NUM_CONF_VAR (sizeof(g_conf) / sizeof(struct conf_var)) 35 36 #define PORT_NUM 80 37 //#define PORT_NUM 3333 38 39 #define MAX_URL_LEN 128 40 #define MAX_FILE_LEN 128 41 #define HTTP_HEADER_LEN 1024 42 43 #define IP_RANGE 1 44 #define MAX_IP_STR_LEN 16 45 46 #define BUF_SIZE (8*1024) 47 48 /*----------------------------------------------------------------------------*/ 49 struct mtcp_conf g_mcfg; 50 static pthread_t mtcp_thread[MAX_CPUS]; 51 /*----------------------------------------------------------------------------*/ 52 static mctx_t g_mctx[MAX_CPUS]; 53 static int done[MAX_CPUS]; 54 /*----------------------------------------------------------------------------*/ 55 static int num_cores; 56 static int core_limit; 57 /*----------------------------------------------------------------------------*/ 58 static int fio = FALSE; 59 static char outfile[MAX_FILE_LEN + 1]; 60 /*----------------------------------------------------------------------------*/ 61 static char host[MAX_IP_STR_LEN + 1]; 62 static char path[MAX_URL_LEN + 1]; 63 static in_addr_t daddr; 64 static in_port_t dport; 65 static in_addr_t saddr; 66 /*----------------------------------------------------------------------------*/ 67 static int total_flows; 68 static int flows[MAX_CPUS]; 69 static int flowcnt = 0; 70 static int concurrency; 71 static int max_fds; 72 static uint16_t dest_port; 73 /*----------------------------------------------------------------------------*/ 74 struct wget_stat 75 { 76 uint64_t waits; 77 uint64_t events; 78 uint64_t connects; 79 uint64_t reads; 80 uint64_t writes; 81 uint64_t completes; 82 83 uint64_t errors; 84 uint64_t timedout; 85 86 uint64_t sum_resp_time; 87 uint64_t max_resp_time; 88 }; 89 /*----------------------------------------------------------------------------*/ 90 struct thread_context 91 { 92 int core; 93 94 mctx_t mctx; 95 int ep; 96 struct wget_vars *wvars; 97 98 int target; 99 int started; 100 int errors; 101 int incompletes; 102 int done; 103 int pending; 104 105 int maxevents; 106 struct mtcp_epoll_event *events; 107 108 struct wget_stat stat; 109 }; 110 typedef struct thread_context* thread_context_t; 111 /*----------------------------------------------------------------------------*/ 112 struct wget_vars 113 { 114 int request_sent; 115 116 char response[HTTP_HEADER_LEN]; 117 int resp_len; 118 int headerset; 119 uint32_t header_len; 120 uint64_t file_len; 121 uint64_t recv; 122 uint64_t write; 123 124 struct timeval t_start; 125 struct timeval t_end; 126 127 int fd; 128 }; 129 /*----------------------------------------------------------------------------*/ 130 static struct thread_context *g_ctx[MAX_CPUS]; 131 static struct wget_stat *g_stat[MAX_CPUS]; 132 /*----------------------------------------------------------------------------*/ 133 static thread_context_t 134 CreateContext(mctx_t mctx) 135 { 136 thread_context_t ctx; 137 138 ctx = (thread_context_t)calloc(1, sizeof(struct thread_context)); 139 if (!ctx) { 140 perror("malloc"); 141 TRACE_ERROR("Failed to allocate memory for thread context.\n"); 142 return NULL; 143 } 144 145 ctx->mctx = mctx; 146 ctx->core = mctx->cpu; 147 148 g_mctx[ctx->core] = mctx; 149 150 return ctx; 151 } 152 /*----------------------------------------------------------------------------*/ 153 static void 154 DestroyContext(thread_context_t ctx) 155 { 156 free(ctx); 157 } 158 /*----------------------------------------------------------------------------*/ 159 static inline int 160 CreateConnection(thread_context_t ctx) 161 { 162 mctx_t mctx = ctx->mctx; 163 struct mtcp_epoll_event ev; 164 struct sockaddr_in addr; 165 int sockid; 166 int ret; 167 168 assert(mctx); 169 170 errno = 0; 171 sockid = mtcp_socket(mctx, AF_INET, SOCK_STREAM, 0); 172 if (sockid < 0) { 173 TRACE_INFO("Failed to create socket! (%s)\n", 174 strerror(errno)); 175 return -1; 176 } 177 memset(&ctx->wvars[sockid], 0, sizeof(struct wget_vars)); 178 ret = mtcp_setsock_nonblock(mctx, sockid); 179 if (ret < 0) { 180 TRACE_ERROR("Failed to set socket in nonblocking mode.\n"); 181 exit(-1); 182 } 183 184 addr.sin_family = AF_INET; 185 addr.sin_addr.s_addr = daddr; 186 addr.sin_port = dport; 187 188 ret = mtcp_connect(mctx, sockid, 189 (struct sockaddr *)&addr, sizeof(struct sockaddr_in)); 190 if (ret < 0) { 191 if (errno != EINPROGRESS) { 192 perror("mtcp_connect"); 193 mtcp_close(mctx, sockid); 194 return -1; 195 } 196 } 197 198 ctx->started++; 199 ctx->pending++; 200 ctx->stat.connects++; 201 202 ev.events = MOS_EPOLLOUT; 203 ev.data.sock = sockid; 204 mtcp_epoll_ctl(mctx, ctx->ep, MOS_EPOLL_CTL_ADD, sockid, &ev); 205 206 return sockid; 207 } 208 /*----------------------------------------------------------------------------*/ 209 static inline void 210 CloseConnection(thread_context_t ctx, int sockid) 211 { 212 mtcp_epoll_ctl(ctx->mctx, ctx->ep, MOS_EPOLL_CTL_DEL, sockid, NULL); 213 mtcp_close(ctx->mctx, sockid); 214 ctx->pending--; 215 ctx->done++; 216 assert(ctx->pending >= 0); 217 while (/*ctx->pending*/ mtcp_get_connection_cnt(ctx->mctx) < concurrency && ctx->started < ctx->target) { 218 if (CreateConnection(ctx) < 0) { 219 done[ctx->core] = TRUE; 220 break; 221 } 222 } 223 } 224 /*----------------------------------------------------------------------------*/ 225 static inline int 226 SendHTTPRequest(thread_context_t ctx, int sockid, struct wget_vars *wv) 227 { 228 char request[HTTP_HEADER_LEN]; 229 struct mtcp_epoll_event ev; 230 int wr; 231 int len; 232 233 wv->headerset = FALSE; 234 wv->recv = 0; 235 wv->header_len = wv->file_len = 0; 236 237 snprintf(request, HTTP_HEADER_LEN, "GET %s HTTP/1.0\r\n" 238 "User-Agent: Wget/1.12 (linux-gnu)\r\n" 239 "Accept: */*\r\n" 240 "Host: %s\r\n" 241 // "Connection: Keep-Alive\r\n\r\n", 242 "Connection: Close\r\n\r\n", 243 path, host); 244 len = strlen(request); 245 246 wr = mtcp_write(ctx->mctx, sockid, request, len); 247 if (wr < len) { 248 TRACE_ERROR("Socket %d: Sending HTTP request failed. " 249 "try: %d, sent: %d\n", sockid, len, wr); 250 CloseConnection(ctx, sockid); 251 } 252 ctx->stat.writes += wr; 253 TRACE_APP("Socket %d HTTP Request of %d bytes. sent.\n", sockid, wr); 254 wv->request_sent = TRUE; 255 256 ev.events = MOS_EPOLLIN; 257 ev.data.sock = sockid; 258 mtcp_epoll_ctl(ctx->mctx, ctx->ep, MOS_EPOLL_CTL_MOD, sockid, &ev); 259 260 gettimeofday(&wv->t_start, NULL); 261 262 char fname[MAX_FILE_LEN + 1]; 263 if (fio) { 264 snprintf(fname, MAX_FILE_LEN, "%s.%d", outfile, flowcnt++); 265 wv->fd = open(fname, O_WRONLY | O_CREAT | O_TRUNC, 0644); 266 if (wv->fd < 0) { 267 TRACE_APP("Failed to open file descriptor for %s\n", fname); 268 exit(1); 269 } 270 } 271 272 return 0; 273 } 274 /*----------------------------------------------------------------------------*/ 275 static inline int 276 DownloadComplete(thread_context_t ctx, int sockid, struct wget_vars *wv) 277 { 278 #ifdef APP 279 mctx_t mctx = ctx->mctx; 280 #endif 281 uint64_t tdiff; 282 283 TRACE_APP("Socket %d File download complete!\n", sockid); 284 gettimeofday(&wv->t_end, NULL); 285 CloseConnection(ctx, sockid); 286 ctx->stat.completes++; 287 288 if (wv->recv - wv->header_len != wv->file_len) { 289 fprintf(stderr, "Response size mismatch! " 290 "actual recved: %ld, expected to recved: %ld\n", 291 wv->recv-wv->header_len, wv->file_len); 292 } 293 294 tdiff = (wv->t_end.tv_sec - wv->t_start.tv_sec) * 1000000 + 295 (wv->t_end.tv_usec - wv->t_start.tv_usec); 296 TRACE_APP("Socket %d Total received bytes: %lu (%luMB)\n", 297 sockid, wv->recv, wv->recv / 1000000); 298 TRACE_APP("Socket %d Total spent time: %lu us\n", sockid, tdiff); 299 if (tdiff > 0) { 300 TRACE_APP("Socket %d Average bandwidth: %lf[MB/s]\n", 301 sockid, (double)wv->recv / tdiff); 302 } 303 ctx->stat.sum_resp_time += tdiff; 304 if (tdiff > ctx->stat.max_resp_time) 305 ctx->stat.max_resp_time = tdiff; 306 307 if (fio && wv->fd > 0) 308 close(wv->fd); 309 310 return 0; 311 } 312 /*----------------------------------------------------------------------------*/ 313 static inline int 314 HandleReadEvent(thread_context_t ctx, int sockid, struct wget_vars *wv) 315 { 316 mctx_t mctx = ctx->mctx; 317 char buf[BUF_SIZE]; 318 char *pbuf; 319 int rd, copy_len; 320 321 rd = 1; 322 while (rd > 0) { 323 rd = mtcp_read(mctx, sockid, buf, BUF_SIZE); 324 if (rd <= 0) 325 break; 326 ctx->stat.reads += rd; 327 328 TRACE_APP("Socket %d: mtcp_read ret: %d, total_recv: %lu, " 329 "header_set: %d, header_len: %u, file_len: %lu\n", 330 sockid, rd, wv->recv + rd, 331 wv->headerset, wv->header_len, wv->file_len); 332 333 pbuf = buf; 334 if (!wv->headerset) { 335 copy_len = MIN(rd, HTTP_HEADER_LEN - wv->resp_len); 336 memcpy(wv->response + wv->resp_len, buf, copy_len); 337 wv->resp_len += copy_len; 338 wv->header_len = find_http_header(wv->response, wv->resp_len); 339 if (wv->header_len > 0) { 340 wv->response[wv->header_len] = '\0'; 341 wv->file_len = http_header_long_val(wv->response, 342 CONTENT_LENGTH_HDR, sizeof(CONTENT_LENGTH_HDR) - 1); 343 TRACE_APP("Socket %d Parsed response header. " 344 "Header length: %u, File length: %lu (%luMB)\n", 345 sockid, wv->header_len, 346 wv->file_len, wv->file_len / 1024 / 1024); 347 wv->headerset = TRUE; 348 wv->recv += (rd - (wv->resp_len - wv->header_len)); 349 350 pbuf += (rd - (wv->resp_len - wv->header_len)); 351 rd = (wv->resp_len - wv->header_len); 352 353 } else { 354 /* failed to parse response header */ 355 wv->recv += rd; 356 rd = 0; 357 ctx->stat.errors++; 358 ctx->errors++; 359 CloseConnection(ctx, sockid); 360 return 0; 361 } 362 } 363 wv->recv += rd; 364 365 if (fio && wv->fd > 0) { 366 int wr = 0; 367 while (wr < rd) { 368 int _wr = write(wv->fd, pbuf + wr, rd - wr); 369 assert (_wr == rd - wr); 370 if (_wr < 0) { 371 perror("write"); 372 TRACE_ERROR("Failed to write.\n"); 373 assert(0); 374 break; 375 } 376 wr += _wr; 377 wv->write += _wr; 378 } 379 } 380 381 if (wv->header_len && (wv->recv >= wv->header_len + wv->file_len)) { 382 break; 383 } 384 } 385 386 if (rd > 0) { 387 if (wv->header_len && (wv->recv >= wv->header_len + wv->file_len)) { 388 TRACE_APP("Socket %d Done Write: " 389 "header: %u file: %lu recv: %lu write: %lu\n", 390 sockid, wv->header_len, wv->file_len, 391 wv->recv - wv->header_len, wv->write); 392 DownloadComplete(ctx, sockid, wv); 393 394 return 0; 395 } 396 397 } else if (rd == 0) { 398 /* connection closed by remote host */ 399 TRACE_DBG("Socket %d connection closed with server.\n", sockid); 400 401 if (wv->header_len && (wv->recv >= wv->header_len + wv->file_len)) { 402 DownloadComplete(ctx, sockid, wv); 403 } else { 404 ctx->stat.errors++; 405 ctx->incompletes++; 406 CloseConnection(ctx, sockid); 407 } 408 409 } else if (rd < 0) { 410 if (errno != EAGAIN) { 411 TRACE_DBG("Socket %d: mtcp_read() error %s\n", 412 sockid, strerror(errno)); 413 ctx->stat.errors++; 414 ctx->errors++; 415 CloseConnection(ctx, sockid); 416 } 417 } 418 419 return 0; 420 } 421 /*----------------------------------------------------------------------------*/ 422 static void 423 PrintStats() 424 { 425 struct wget_stat total = {0}; 426 struct wget_stat *st; 427 uint64_t avg_resp_time; 428 uint64_t total_resp_time = 0; 429 int i; 430 431 for (i = 0; i < core_limit; i++) { 432 st = g_stat[i]; 433 if (!st) 434 continue; 435 avg_resp_time = st->completes? st->sum_resp_time / st->completes : 0; 436 437 total.waits += st->waits; 438 total.events += st->events; 439 total.connects += st->connects; 440 total.reads += st->reads; 441 total.writes += st->writes; 442 total.completes += st->completes; 443 total_resp_time += avg_resp_time; 444 if (st->max_resp_time > total.max_resp_time) 445 total.max_resp_time = st->max_resp_time; 446 total.errors += st->errors; 447 total.timedout += st->timedout; 448 449 memset(st, 0, sizeof(struct wget_stat)); 450 } 451 fprintf(stderr, "[ ALL ] connect: %7lu, read: %4lu MB, write: %4lu MB, " 452 "completes: %7lu (resp_time avg: %4lu, max: %6lu us)\n", 453 total.connects, 454 total.reads / 1024 / 1024, total.writes / 1024 / 1024, 455 total.completes, total_resp_time / core_limit, total.max_resp_time); 456 } 457 /*----------------------------------------------------------------------------*/ 458 static void 459 GlbInitWget() 460 { 461 struct mtcp_conf mcfg; 462 char *url; 463 int total_concurrency = 0; 464 int flow_per_thread; 465 int flow_remainder_cnt; 466 int i; 467 468 num_cores = GetNumCPUs(); 469 core_limit = num_cores; 470 concurrency = 100; 471 total_flows = -1; 472 473 LoadConfig(CONFIG_FILE, g_conf, NUM_CONF_VAR); 474 475 url = g_conf[0].value; 476 total_flows = atoi(g_conf[1].value); 477 core_limit = atoi(g_conf[2].value); 478 total_concurrency = atoi(g_conf[3].value); 479 dest_port = atoi(g_conf[4].value); 480 481 if ((strlen(url) == 0) 482 || (strlen(url) > MAX_URL_LEN) 483 || (total_flows <= 0)) { 484 TRACE_INFO("Invalid configuration\n"); 485 exit(0); 486 } 487 488 char* slash_p = strchr(url, '/'); 489 if (slash_p) { 490 strncpy(host, url, slash_p - url); 491 strncpy(path, strchr(url, '/'), MAX_URL_LEN); 492 } else { 493 strncpy(host, url, MAX_IP_STR_LEN); 494 strncpy(path, "/", 1); 495 } 496 497 daddr = inet_addr(host); 498 dport = (dest_port == 0) ? htons(PORT_NUM) : htons(dest_port); 499 saddr = INADDR_ANY; 500 501 if (total_flows < core_limit) { 502 core_limit = total_flows; 503 } 504 505 /* per-core concurrency = total_concurrency / # cores */ 506 if (total_concurrency > 0) 507 concurrency = total_concurrency / core_limit; 508 509 /* set the max number of fds 3x larger than concurrency */ 510 max_fds = concurrency * 3; 511 512 TRACE_CONFIG("Application configuration:\n"); 513 TRACE_CONFIG("URL: %s\n", path); 514 TRACE_CONFIG("# of total_flows: %d\n", total_flows); 515 TRACE_CONFIG("# of cores: %d\n", core_limit); 516 TRACE_CONFIG("Concurrency: %d\n", total_concurrency); 517 518 mtcp_getconf(&mcfg); 519 mcfg.max_concurrency = max_fds; 520 mcfg.max_num_buffers = max_fds; 521 mtcp_setconf(&mcfg); 522 523 flow_per_thread = total_flows / core_limit; 524 flow_remainder_cnt = total_flows % core_limit; 525 526 for (i = 0; i < MAX_CPUS; i++) { 527 done[i] = FALSE; 528 flows[i] = flow_per_thread; 529 530 if (flow_remainder_cnt-- > 0) 531 flows[i]++; 532 } 533 534 return; 535 } 536 /*----------------------------------------------------------------------------*/ 537 static void 538 InitWget(mctx_t mctx, void **app_ctx) 539 { 540 thread_context_t ctx; 541 int ep; 542 543 assert(mctx); 544 545 int core = mctx->cpu; 546 547 ctx = CreateContext(mctx); 548 if (!ctx) 549 exit(-1); 550 g_ctx[core] = ctx; 551 g_stat[core] = &ctx->stat; 552 srand(time(NULL)); 553 554 mtcp_init_rss(mctx, saddr, IP_RANGE, daddr, dport); 555 556 if (flows[core] == 0) { 557 TRACE_DBG("Application thread %d finished.\n", core); 558 exit(-1); 559 } 560 ctx->target = flows[core]; 561 562 /* Initialization */ 563 ctx->maxevents = max_fds * 3; 564 ep = mtcp_epoll_create(mctx, ctx->maxevents); 565 if (ep < 0) { 566 TRACE_ERROR("Failed to create epoll struct!n"); 567 exit(EXIT_FAILURE); 568 } 569 ctx->events = (struct mtcp_epoll_event *) 570 calloc(ctx->maxevents, sizeof(struct mtcp_epoll_event)); 571 if (!ctx->events) { 572 TRACE_ERROR("Failed to allocate events!\n"); 573 exit(EXIT_FAILURE); 574 } 575 ctx->ep = ep; 576 577 ctx->wvars = (struct wget_vars *)calloc(max_fds, sizeof(struct wget_vars)); 578 if (!ctx->wvars) { 579 TRACE_ERROR("Failed to create wget variables!\n"); 580 exit(EXIT_FAILURE); 581 } 582 583 ctx->started = ctx->done = ctx->pending = 0; 584 ctx->errors = ctx->incompletes = 0; 585 586 *app_ctx = ctx; 587 588 return; 589 } 590 /*----------------------------------------------------------------------------*/ 591 static void 592 RunWget(mctx_t mctx, void **app_ctx) 593 { 594 struct in_addr daddr_in; 595 struct timeval cur_tv, prev_tv; 596 int nevents; 597 int i; 598 599 assert(mctx); 600 assert(*app_ctx); 601 602 thread_context_t ctx = *app_ctx; 603 int core = ctx->core; 604 605 daddr_in.s_addr = daddr; 606 fprintf(stderr, "Thread %d handles %d flows. connecting to %s:%u\n", 607 core, flows[core], inet_ntoa(daddr_in), ntohs(dport)); 608 609 gettimeofday(&cur_tv, NULL); 610 prev_tv = cur_tv; 611 612 while (!done[core]) { 613 gettimeofday(&cur_tv, NULL); 614 615 /* print statistics every second */ 616 if (core == 0 && cur_tv.tv_sec > prev_tv.tv_sec) { 617 PrintStats(); 618 prev_tv = cur_tv; 619 } 620 621 while (/*ctx->pending*/ mtcp_get_connection_cnt(ctx->mctx) < concurrency && ctx->started < ctx->target) { 622 if (CreateConnection(ctx) < 0) { 623 done[core] = TRUE; 624 break; 625 } 626 } 627 628 nevents = mtcp_epoll_wait(mctx, ctx->ep, 629 ctx->events, ctx->maxevents, ctx->pending ? -1 : 10); 630 ctx->stat.waits++; 631 632 if (nevents < 0) { 633 if (errno != EINTR) { 634 TRACE_ERROR("mtcp_epoll_wait failed! ret: %d\n", nevents); 635 } 636 done[core] = TRUE; 637 break; 638 } else { 639 ctx->stat.events += nevents; 640 } 641 642 for (i = 0; i < nevents; i++) { 643 644 if (ctx->events[i].events & MOS_EPOLLERR) { 645 int err; 646 socklen_t len = sizeof(err); 647 648 TRACE_APP("[CPU %d] Error on socket %d\n", 649 core, ctx->events[i].data.sockid); 650 ctx->stat.errors++; 651 ctx->errors++; 652 if (mtcp_getsockopt(mctx, ctx->events[i].data.sock, 653 SOL_SOCKET, SO_ERROR, (void *)&err, &len) == 0) { 654 if (err == ETIMEDOUT) 655 ctx->stat.timedout++; 656 } 657 CloseConnection(ctx, ctx->events[i].data.sock); 658 659 } else if (ctx->events[i].events & MOS_EPOLLIN) { 660 HandleReadEvent(ctx, 661 ctx->events[i].data.sock, 662 &ctx->wvars[ctx->events[i].data.sock]); 663 664 } else if (ctx->events[i].events == MOS_EPOLLOUT) { 665 struct wget_vars *wv = &ctx->wvars[ctx->events[i].data.sock]; 666 667 if (!wv->request_sent) { 668 SendHTTPRequest(ctx, ctx->events[i].data.sock, wv); 669 } else { 670 //TRACE_DBG("Request already sent.\n"); 671 } 672 673 } else { 674 TRACE_ERROR("Socket %d: event: %s\n", 675 ctx->events[i].data.sock, 676 EventToString(ctx->events[i].events)); 677 assert(0); 678 } 679 } 680 681 if (ctx->done >= ctx->target) { 682 fprintf(stdout, "Completed %d connections, " 683 "errors: %d incompletes: %d\n", 684 ctx->done, ctx->errors, ctx->incompletes); 685 break; 686 } 687 } 688 689 TRACE_INFO("Wget thread %d waiting for mtcp to be destroyed.\n", core); 690 691 g_stat[core] = NULL; 692 g_ctx[core] = NULL; 693 DestroyContext(ctx); 694 695 return; 696 } 697 /*----------------------------------------------------------------------------*/ 698 void 699 RunApplication(mctx_t mctx) 700 { 701 void *app_ctx; 702 703 app_ctx = (void *)calloc(1, sizeof(void *)); 704 if (!app_ctx) { 705 TRACE_ERROR("calloc failure\n"); 706 return; 707 } 708 709 TRACE_INFO("run application on core %d\n", mctx->cpu); 710 InitWget(mctx, &(app_ctx)); 711 RunWget(mctx, &(app_ctx)); 712 } 713 /*----------------------------------------------------------------------------*/ 714 void * 715 RunMTCP(void *arg) 716 { 717 int core = *(int *)arg; 718 mctx_t mctx; 719 720 mtcp_core_affinitize(core); 721 722 /* mTCP Initialization */ 723 mctx = mtcp_create_context(core); 724 if (!mctx) { 725 pthread_exit(NULL); 726 TRACE_ERROR("Failed to craete mtcp context.\n"); 727 return NULL; 728 } 729 730 /* Run application here */ 731 RunApplication(mctx); 732 733 /* mTCP Tear Down */ 734 mtcp_destroy_context(mctx); 735 pthread_exit(NULL); 736 737 return NULL; 738 } 739 /*----------------------------------------------------------------------------*/ 740 int 741 main(int argc, char **argv) 742 { 743 int ret, i; 744 int cores[MAX_CPUS]; 745 char *fname = "config/mos.conf"; 746 747 int opt; 748 while ((opt = getopt(argc, argv, "f:")) != -1) { 749 switch (opt) { 750 case 'f': 751 fname = optarg; 752 break; 753 default: 754 printf("Usage: %s [-f config_file]\n", argv[0]); 755 return 0; 756 } 757 758 } 759 760 core_limit = sysconf(_SC_NPROCESSORS_ONLN); 761 762 ret = mtcp_init(fname); 763 if (ret) { 764 TRACE_ERROR("Failed to initialize mtcp.\n"); 765 exit(EXIT_FAILURE); 766 } 767 768 mtcp_getconf(&g_mcfg); 769 770 core_limit = g_mcfg.num_cores; 771 772 GlbInitWget(); 773 774 for (i = 0; i < core_limit; i++) { 775 cores[i] = i; 776 777 /* Run mtcp thread */ 778 if ((g_mcfg.cpu_mask & (1L << i)) && 779 pthread_create(&mtcp_thread[i], NULL, RunMTCP, (void *)&cores[i])) { 780 perror("pthread_create"); 781 TRACE_ERROR("Failed to create msg_test thread.\n"); 782 exit(-1); 783 } 784 } 785 786 for (i = 0; i < core_limit; i++) { 787 if (g_mcfg.cpu_mask & (1L << i)) 788 pthread_join(mtcp_thread[i], NULL); 789 TRACE_INFO("Message test thread %d joined.\n", i); 790 } 791 792 mtcp_destroy(); 793 return 0; 794 } 795 /*----------------------------------------------------------------------------*/ 796