1 #define _LARGEFILE64_SOURCE 2 #include <stdio.h> 3 #include <stdlib.h> 4 #include <unistd.h> 5 #include <stdint.h> 6 #include <sys/types.h> 7 #include <sys/stat.h> 8 #include <sys/socket.h> 9 #include <netinet/in.h> 10 #include <arpa/inet.h> 11 #include <fcntl.h> 12 #include <dirent.h> 13 #include <string.h> 14 #include <time.h> 15 #include <pthread.h> 16 #include <signal.h> 17 18 #include <mos_api.h> 19 #include "cpu.h" 20 #include "http_parsing.h" 21 #include "debug.h" 22 #include "applib.h" 23 24 #define CONFIG_FILE "config/epserver.conf" 25 static struct conf_var g_conf[] = { 26 { "core_limit", {0} }, 27 { "www_main", {0} }, 28 }; 29 #define NUM_CONF_VAR (sizeof(g_conf) / sizeof(struct conf_var)) 30 31 #define HTTP_HEADER_LEN 1024 32 #define URL_LEN 128 33 34 /* shinae 10.27.2014 35 * SNDBUF_SIZE should be removed 36 */ 37 #define SNDBUF_SIZE (8*1024) 38 39 #define MAX_FILES 100 40 41 /*----------------------------------------------------------------------------*/ 42 struct mtcp_conf g_mcfg; 43 static pthread_t mtcp_thread[MAX_CPUS]; 44 /*----------------------------------------------------------------------------*/ 45 struct file_cache 46 { 47 char name[128]; 48 char fullname[256]; 49 uint64_t size; 50 char *file; 51 }; 52 /*----------------------------------------------------------------------------*/ 53 struct server_vars 54 { 55 char request[HTTP_HEADER_LEN]; 56 int recv_len; 57 int request_len; 58 long int total_read, total_sent; 59 uint8_t done; 60 uint8_t rspheader_sent; 61 uint8_t keep_alive; 62 63 int fidx; // file cache index 64 char fname[128]; // file name 65 long int fsize; // file size 66 }; 67 /*----------------------------------------------------------------------------*/ 68 struct thread_context 69 { 70 mctx_t mctx; 71 int listener; 72 int ep; 73 struct server_vars *svars; 74 }; 75 /*----------------------------------------------------------------------------*/ 76 static int num_cores; 77 static int core_limit; 78 /*----------------------------------------------------------------------------*/ 79 char *www_main; 80 static struct file_cache fcache[MAX_FILES]; 81 static int nfiles; 82 /*----------------------------------------------------------------------------*/ 83 static int finished; 84 /*----------------------------------------------------------------------------*/ 85 static char * 86 StatusCodeToString(int scode) 87 { 88 switch (scode) { 89 case 200: 90 return "OK"; 91 break; 92 93 case 404: 94 return "Not Found"; 95 break; 96 } 97 98 return NULL; 99 } 100 /*----------------------------------------------------------------------------*/ 101 static void 102 CleanServerVariable(struct server_vars *sv) 103 { 104 sv->recv_len = 0; 105 sv->request_len = 0; 106 sv->total_read = 0; 107 sv->total_sent = 0; 108 sv->done = 0; 109 sv->rspheader_sent = 0; 110 sv->keep_alive = 0; 111 } 112 /*----------------------------------------------------------------------------*/ 113 static void 114 CloseConnection(struct thread_context *ctx, int sockid, struct server_vars *sv) 115 { 116 mtcp_epoll_ctl(ctx->mctx, ctx->ep, MOS_EPOLL_CTL_DEL, sockid, NULL); 117 mtcp_close(ctx->mctx, sockid); 118 } 119 /*----------------------------------------------------------------------------*/ 120 static int 121 SendUntilAvailable(struct thread_context *ctx, int sockid, struct server_vars *sv) 122 { 123 int ret; 124 int sent; 125 int len; 126 127 if (sv->done || !sv->rspheader_sent) { 128 return 0; 129 } 130 131 sent = 0; 132 ret = 1; 133 while (ret > 0) { 134 len = MIN(SNDBUF_SIZE, sv->fsize - sv->total_sent); 135 if (len <= 0) { 136 break; 137 } 138 ret = mtcp_write(ctx->mctx, sockid, 139 fcache[sv->fidx].file + sv->total_sent, len); 140 if (ret < 0) { 141 if (errno != EAGAIN) { 142 TRACE_ERROR("Socket %d: Sending HTTP response body failed. " 143 "try: %d, sent: %d\n", sockid, len, ret); 144 } 145 break; 146 } 147 TRACE_APP("Socket %d: mtcp_write try: %d, ret: %d\n", sockid, len, ret); 148 sent += ret; 149 sv->total_sent += ret; 150 } 151 152 if (sv->total_sent >= fcache[sv->fidx].size) { 153 struct mtcp_epoll_event ev; 154 sv->done = TRUE; 155 finished++; 156 157 if (sv->keep_alive) { 158 /* if keep-alive connection, wait for the incoming request */ 159 ev.events = MOS_EPOLLIN; 160 ev.data.sock = sockid; 161 mtcp_epoll_ctl(ctx->mctx, ctx->ep, MOS_EPOLL_CTL_MOD, sockid, &ev); 162 163 CleanServerVariable(sv); 164 } else { 165 /* else, close connection */ 166 CloseConnection(ctx, sockid, sv); 167 } 168 } 169 170 return sent; 171 } 172 /*----------------------------------------------------------------------------*/ 173 static int 174 HandleReadEvent(struct thread_context *ctx, int sockid, struct server_vars *sv) 175 { 176 struct mtcp_epoll_event ev; 177 char buf[HTTP_HEADER_LEN]; 178 char url[URL_LEN]; 179 char response[HTTP_HEADER_LEN]; 180 int scode; // status code 181 time_t t_now; 182 char t_str[128]; 183 char keepalive_str[128]; 184 int rd; 185 int i; 186 int len; 187 int sent; 188 189 /* HTTP request handling */ 190 rd = mtcp_read(ctx->mctx, sockid, buf, HTTP_HEADER_LEN); 191 if (rd <= 0) { 192 return rd; 193 } 194 memcpy(sv->request + sv->recv_len, 195 (char *)buf, MIN(rd, HTTP_HEADER_LEN - sv->recv_len)); 196 sv->recv_len += rd; 197 //sv->request[rd] = '\0'; 198 //fprintf(stderr, "HTTP Request: \n%s", request); 199 sv->request_len = find_http_header(sv->request, sv->recv_len); 200 if (sv->request_len <= 0) { 201 TRACE_ERROR("Socket %d: Failed to parse HTTP request header.\n" 202 "read bytes: %d, recv_len: %d, " 203 "request_len: %d, strlen: %ld, request: \n%s\n", 204 sockid, rd, sv->recv_len, 205 sv->request_len, strlen(sv->request), sv->request); 206 return rd; 207 } 208 209 http_get_url(sv->request, sv->request_len, url, URL_LEN); 210 TRACE_APP("Socket %d URL: %s\n", sockid, url); 211 sprintf(sv->fname, "%s%s", www_main, url); 212 TRACE_APP("Socket %d File name: %s\n", sockid, sv->fname); 213 214 215 sv->keep_alive = FALSE; 216 if (http_header_str_val(sv->request, CONN_HDR_FLD, 217 sizeof(CONN_HDR_FLD)-1, keepalive_str, sizeof(keepalive_str))) { 218 sv->keep_alive = !strcasecmp(keepalive_str, KEEP_ALIVE_STR); 219 } 220 221 /* Find file in cache */ 222 scode = 404; 223 for (i = 0; i < nfiles; i++) { 224 if (strcmp(sv->fname, fcache[i].fullname) == 0) { 225 sv->fsize = fcache[i].size; 226 sv->fidx = i; 227 scode = 200; 228 break; 229 } 230 } 231 TRACE_APP("Socket %d File size: %ld (%ldMB)\n", 232 sockid, sv->fsize, sv->fsize / 1024 / 1024); 233 234 /* Response header handling */ 235 time(&t_now); 236 strftime(t_str, 128, "%a, %d %b %Y %X GMT", gmtime(&t_now)); 237 if (sv->keep_alive) 238 sprintf(keepalive_str, "Keep-Alive"); 239 else 240 sprintf(keepalive_str, "Close"); 241 242 sprintf(response, "HTTP/1.1 %d %s\r\n" 243 "Date: %s\r\n" 244 "Server: Webserver on Middlebox TCP (Ubuntu)\r\n" 245 "Content-Length: %ld\r\n" 246 "Connection: %s\r\n\r\n", 247 scode, StatusCodeToString(scode), t_str, sv->fsize, keepalive_str); 248 len = strlen(response); 249 TRACE_APP("Socket %d HTTP Response: \n%s", sockid, response); 250 sent = mtcp_write(ctx->mctx, sockid, response, len); 251 if (sent < len) { 252 TRACE_ERROR("Socket %d: Sending HTTP response failed. " 253 "try: %d, sent: %d\n", sockid, len, sent); 254 CloseConnection(ctx, sockid, sv); 255 } 256 TRACE_APP("Socket %d Sent response header: try: %d, sent: %d\n", 257 sockid, len, sent); 258 assert(sent == len); 259 sv->rspheader_sent = TRUE; 260 261 ev.events = MOS_EPOLLIN | MOS_EPOLLOUT; 262 ev.data.sock = sockid; 263 mtcp_epoll_ctl(ctx->mctx, ctx->ep, MOS_EPOLL_CTL_MOD, sockid, &ev); 264 265 SendUntilAvailable(ctx, sockid, sv); 266 267 return rd; 268 } 269 /*----------------------------------------------------------------------------*/ 270 static int 271 AcceptConnection(struct thread_context *ctx, int listener) 272 { 273 mctx_t mctx = ctx->mctx; 274 struct server_vars *sv; 275 struct mtcp_epoll_event ev; 276 int c; 277 278 c = mtcp_accept(mctx, listener, NULL, NULL); 279 280 if (c >= 0) { 281 if (c >= MAX_FLOW_NUM) { 282 TRACE_ERROR("Invalid socket id %d.\n", c); 283 return -1; 284 } 285 286 sv = &ctx->svars[c]; 287 CleanServerVariable(sv); 288 TRACE_APP("New connection %d accepted.\n", c); 289 ev.events = MOS_EPOLLIN; 290 ev.data.sock = c; 291 mtcp_setsock_nonblock(ctx->mctx, c); 292 mtcp_epoll_ctl(mctx, ctx->ep, MOS_EPOLL_CTL_ADD, c, &ev); 293 TRACE_APP("Socket %d registered.\n", c); 294 295 } else { 296 if (errno != EAGAIN) { 297 TRACE_ERROR("mtcp_accept() error %s\n", 298 strerror(errno)); 299 } 300 } 301 302 return c; 303 } 304 /*----------------------------------------------------------------------------*/ 305 static int 306 CreateListeningSocket(struct thread_context *ctx) 307 { 308 int listener; 309 struct mtcp_epoll_event ev; 310 struct sockaddr_in saddr; 311 int ret; 312 313 /* create socket and set it as nonblocking */ 314 listener = mtcp_socket(ctx->mctx, AF_INET, SOCK_STREAM, 0); 315 if (listener < 0) { 316 TRACE_ERROR("Failed to create listening socket!\n"); 317 return -1; 318 } 319 ret = mtcp_setsock_nonblock(ctx->mctx, listener); 320 if (ret < 0) { 321 TRACE_ERROR("Failed to set socket in nonblocking mode.\n"); 322 return -1; 323 } 324 325 /* bind to port 80 */ 326 saddr.sin_family = AF_INET; 327 saddr.sin_addr.s_addr = INADDR_ANY; 328 saddr.sin_port = htons(80); 329 ret = mtcp_bind(ctx->mctx, listener, 330 (struct sockaddr *)&saddr, sizeof(struct sockaddr_in)); 331 if (ret < 0) { 332 TRACE_ERROR("Failed to bind to the listening socket!\n"); 333 return -1; 334 } 335 336 /* listen (backlog: 4K) */ 337 ret = mtcp_listen(ctx->mctx, listener, 4096); 338 if (ret < 0) { 339 TRACE_ERROR("mtcp_listen() failed!\n"); 340 return -1; 341 } 342 343 /* wait for incoming accept events */ 344 ev.events = MOS_EPOLLIN; 345 ev.data.sock = listener; 346 mtcp_epoll_ctl(ctx->mctx, ctx->ep, MOS_EPOLL_CTL_ADD, listener, &ev); 347 348 return listener; 349 } 350 /*----------------------------------------------------------------------------*/ 351 static void 352 GlobInitServer() 353 { 354 DIR *dir; 355 struct dirent *ent; 356 int fd; 357 int ret; 358 uint64_t total_read; 359 360 num_cores = GetNumCPUs(); 361 core_limit = num_cores; 362 363 if (LoadConfig(CONFIG_FILE, g_conf, NUM_CONF_VAR)) 364 exit(-1); 365 366 core_limit = atoi(g_conf[0].value); 367 www_main = g_conf[1].value; 368 369 /* open the directory to serve */ 370 dir = opendir(www_main); 371 if (!dir) { 372 TRACE_ERROR("Failed to open %s.\n", www_main); 373 perror("opendir"); 374 exit(-1); 375 } 376 377 nfiles = 0; 378 while ((ent = readdir(dir)) != NULL) { 379 if (strcmp(ent->d_name, ".") == 0) 380 continue; 381 else if (strcmp(ent->d_name, "..") == 0) 382 continue; 383 384 strcpy(fcache[nfiles].name, ent->d_name); 385 sprintf(fcache[nfiles].fullname, "%s/%s", www_main, ent->d_name); 386 fd = open(fcache[nfiles].fullname, O_RDONLY); 387 if (fd < 0) { 388 perror("open"); 389 continue; 390 } else { 391 fcache[nfiles].size = lseek64(fd, 0, SEEK_END); 392 lseek64(fd, 0, SEEK_SET); 393 } 394 395 fcache[nfiles].file = (char *)malloc(fcache[nfiles].size); 396 if (!fcache[nfiles].file) { 397 TRACE_ERROR("Failed to allocate memory for file %s\n", 398 fcache[nfiles].name); 399 perror("malloc"); 400 continue; 401 } 402 403 TRACE_INFO("Reading %s (%lu bytes)\n", 404 fcache[nfiles].name, fcache[nfiles].size); 405 total_read = 0; 406 while (1) { 407 ret = read(fd, fcache[nfiles].file + total_read, 408 fcache[nfiles].size - total_read); 409 if (ret < 0) { 410 break; 411 } else if (ret == 0) { 412 break; 413 } 414 total_read += ret; 415 } 416 if (total_read < fcache[nfiles].size) { 417 free(fcache[nfiles].file); 418 continue; 419 } 420 close(fd); 421 nfiles++; 422 423 if (nfiles >= MAX_FILES) 424 break; 425 } 426 427 finished = 0; 428 429 return; 430 } 431 /*----------------------------------------------------------------------------*/ 432 static void 433 InitServer(mctx_t mctx, void **app_ctx) 434 { 435 struct thread_context *ctx; 436 437 ctx = (struct thread_context *)calloc(1, sizeof(struct thread_context)); 438 if (!ctx) { 439 TRACE_ERROR("Failed to create thread context!\n"); 440 exit(-1); 441 } 442 443 ctx->mctx = mctx; 444 445 /* create epoll descriptor */ 446 ctx->ep = mtcp_epoll_create(mctx, MAX_EVENTS); 447 if (ctx->ep < 0) { 448 TRACE_ERROR("Failed to create epoll descriptor!\n"); 449 exit(-1); 450 } 451 452 /* allocate memory for server variables */ 453 ctx->svars = (struct server_vars *) 454 calloc(MAX_FLOW_NUM, sizeof(struct server_vars)); 455 if (!ctx->svars) { 456 TRACE_ERROR("Failed to create server_vars struct!\n"); 457 exit(-1); 458 } 459 460 ctx->listener = CreateListeningSocket(ctx); 461 if (ctx->listener < 0) { 462 TRACE_ERROR("Failed to create listening socket.\n"); 463 exit(-1); 464 } 465 466 *app_ctx = (void *)ctx; 467 468 return; 469 } 470 /*----------------------------------------------------------------------------*/ 471 static void 472 RunServer(mctx_t mctx, void **app_ctx) 473 { 474 struct thread_context *ctx = (*app_ctx); 475 int nevents; 476 int i, ret; 477 int do_accept; 478 struct mtcp_epoll_event *events; 479 480 assert(ctx); 481 int ep = ctx->ep; 482 483 events = (struct mtcp_epoll_event *) 484 calloc(MAX_EVENTS, sizeof(struct mtcp_epoll_event)); 485 if (!events) { 486 TRACE_ERROR("Failed to create event struct!\n"); 487 exit(-1); 488 } 489 490 while (1) { 491 nevents = mtcp_epoll_wait(mctx, ep, events, MAX_EVENTS, -1); 492 if (nevents < 0) { 493 if (errno != EINTR) 494 perror("mtcp_epoll_wait"); 495 break; 496 } 497 498 do_accept = FALSE; 499 for (i = 0; i < nevents; i++) { 500 501 if (events[i].data.sock == ctx->listener) { 502 /* if the event is for the listener, accept connection */ 503 do_accept = TRUE; 504 505 } else if (events[i].events & MOS_EPOLLERR) { 506 int err; 507 socklen_t len = sizeof(err); 508 509 /* error on the connection */ 510 TRACE_APP("[CPU %d] Error on socket %d\n", 511 core, events[i].data.sock); 512 if (mtcp_getsockopt(mctx, events[i].data.sock, 513 SOL_SOCKET, SO_ERROR, (void *)&err, &len) == 0) { 514 if (err != ETIMEDOUT) { 515 fprintf(stderr, "Error on socket %d: %s\n", 516 events[i].data.sock, strerror(err)); 517 } 518 } else { 519 fprintf(stderr, "mtcp_getsockopt: %s (for sockid: %d)\n", 520 strerror(errno), events[i].data.sock); 521 exit(-1); 522 } 523 CloseConnection(ctx, events[i].data.sock, 524 &ctx->svars[events[i].data.sock]); 525 526 } else if (events[i].events & MOS_EPOLLIN) { 527 ret = HandleReadEvent(ctx, events[i].data.sock, 528 &ctx->svars[events[i].data.sock]); 529 530 if (ret == 0) { 531 /* connection closed by remote host */ 532 CloseConnection(ctx, events[i].data.sock, 533 &ctx->svars[events[i].data.sock]); 534 } else if (ret < 0) { 535 /* if not EAGAIN, it's an error */ 536 if (errno != EAGAIN) { 537 CloseConnection(ctx, events[i].data.sock, 538 &ctx->svars[events[i].data.sock]); 539 } 540 } 541 542 } else if (events[i].events & MOS_EPOLLOUT) { 543 struct server_vars *sv = &ctx->svars[events[i].data.sock]; 544 if (sv->rspheader_sent) { 545 SendUntilAvailable(ctx, events[i].data.sock, sv); 546 } else { 547 TRACE_APP("Socket %d: Response header not sent yet.\n", 548 events[i].data.sock); 549 } 550 551 } else { 552 assert(0); 553 } 554 } 555 556 /* if do_accept flag is set, accept connections */ 557 if (do_accept) { 558 while (1) { 559 ret = AcceptConnection(ctx, ctx->listener); 560 if (ret < 0) 561 break; 562 } 563 } 564 565 } 566 567 return; 568 } 569 /*----------------------------------------------------------------------------*/ 570 void 571 RunApplication(mctx_t mctx) 572 { 573 void *app_ctx; 574 575 app_ctx = (void *)calloc(1, sizeof(void *)); 576 if (!app_ctx) { 577 TRACE_ERROR("calloc failure\n"); 578 return; 579 } 580 581 TRACE_INFO("run application on core %d\n", mctx->cpu); 582 InitServer(mctx, &(app_ctx)); 583 RunServer(mctx, &(app_ctx)); 584 } 585 /*----------------------------------------------------------------------------*/ 586 void * 587 RunMTCP(void *arg) 588 { 589 int core = *(int *)arg; 590 mctx_t mctx; 591 592 mtcp_core_affinitize(core); 593 594 /* mTCP Initialization */ 595 mctx = mtcp_create_context(core); 596 if (!mctx) { 597 pthread_exit(NULL); 598 TRACE_ERROR("Failed to craete mtcp context.\n"); 599 return NULL; 600 } 601 602 /* Run application here */ 603 RunApplication(mctx); 604 605 /* mTCP Tear Down */ 606 mtcp_destroy_context(mctx); 607 pthread_exit(NULL); 608 609 return NULL; 610 } 611 /*----------------------------------------------------------------------------*/ 612 int 613 main(int argc, char **argv) 614 { 615 int ret, i; 616 int cores[MAX_CPUS]; 617 char *fname = "config/mos.conf"; 618 619 int opt; 620 while ((opt = getopt(argc, argv, "f:")) != -1) { 621 switch (opt) { 622 case 'f': 623 fname = optarg; 624 break; 625 default: 626 printf("Usage: %s [-f config_file]\n", argv[0]); 627 return 0; 628 } 629 630 } 631 632 core_limit = sysconf(_SC_NPROCESSORS_ONLN); 633 634 ret = mtcp_init(fname); 635 if (ret) { 636 TRACE_ERROR("Failed to initialize mtcp.\n"); 637 exit(EXIT_FAILURE); 638 } 639 640 mtcp_getconf(&g_mcfg); 641 642 core_limit = g_mcfg.num_cores; 643 644 GlobInitServer(); 645 646 for (i = 0; i < core_limit; i++) { 647 cores[i] = i; 648 649 /* Run mtcp thread */ 650 if ((g_mcfg.cpu_mask & (1L << i)) && 651 pthread_create(&mtcp_thread[i], NULL, RunMTCP, (void *)&cores[i])) { 652 perror("pthread_create"); 653 TRACE_ERROR("Failed to create msg_test thread.\n"); 654 exit(-1); 655 } 656 } 657 658 for (i = 0; i < core_limit; i++) { 659 if (g_mcfg.cpu_mask & (1L << i)) 660 pthread_join(mtcp_thread[i], NULL); 661 TRACE_INFO("Message test thread %d joined.\n", i); 662 } 663 664 mtcp_destroy(); 665 return 0; 666 } 667 /*----------------------------------------------------------------------------*/ 668