1 #define _LARGEFILE64_SOURCE 2 #include <stdio.h> 3 #include <stdlib.h> 4 #include <unistd.h> 5 #include <stdint.h> 6 #include <sys/types.h> 7 #include <sys/stat.h> 8 #include <sys/socket.h> 9 #include <netinet/in.h> 10 #include <arpa/inet.h> 11 #include <fcntl.h> 12 #include <dirent.h> 13 #include <string.h> 14 #include <time.h> 15 #include <pthread.h> 16 #include <signal.h> 17 18 #include <mos_api.h> 19 #include "cpu.h" 20 #include "http_parsing.h" 21 #include "debug.h" 22 #include "applib.h" 23 24 #define CONFIG_FILE "config/epserver.conf" 25 static struct conf_var g_conf[] = { 26 { "core_limit", {0} }, 27 { "www_main", {0} }, 28 }; 29 #define NUM_CONF_VAR (sizeof(g_conf) / sizeof(struct conf_var)) 30 31 #define HTTP_HEADER_LEN 1024 32 #define URL_LEN 128 33 34 /* shinae 10.27.2014 35 * SNDBUF_SIZE should be removed 36 */ 37 #define SNDBUF_SIZE (8*1024) 38 39 #define MAX_FILES 100 40 int cores[MAX_CPUS] = {0}; 41 /*----------------------------------------------------------------------------*/ 42 struct mtcp_conf g_mcfg; 43 /*----------------------------------------------------------------------------*/ 44 struct file_cache 45 { 46 char name[128]; 47 char fullname[256]; 48 uint64_t size; 49 char *file; 50 }; 51 /*----------------------------------------------------------------------------*/ 52 struct server_vars 53 { 54 char request[HTTP_HEADER_LEN]; 55 int recv_len; 56 int request_len; 57 long int total_read, total_sent; 58 uint8_t done; 59 uint8_t rspheader_sent; 60 uint8_t keep_alive; 61 62 int fidx; // file cache index 63 char fname[128]; // file name 64 long int fsize; // file size 65 }; 66 /*----------------------------------------------------------------------------*/ 67 struct thread_context 68 { 69 mctx_t mctx; 70 int listener; 71 int ep; 72 struct server_vars *svars; 73 }; 74 /*----------------------------------------------------------------------------*/ 75 static int num_cores; 76 static int core_limit; 77 /*----------------------------------------------------------------------------*/ 78 char *www_main; 79 static struct file_cache fcache[MAX_FILES]; 80 static int nfiles; 81 /*----------------------------------------------------------------------------*/ 82 static int finished; 83 /*----------------------------------------------------------------------------*/ 84 static char * 85 StatusCodeToString(int scode) 86 { 87 switch (scode) { 88 case 200: 89 return "OK"; 90 break; 91 92 case 404: 93 return "Not Found"; 94 break; 95 } 96 97 return NULL; 98 } 99 /*----------------------------------------------------------------------------*/ 100 static void 101 CleanServerVariable(struct server_vars *sv) 102 { 103 sv->recv_len = 0; 104 sv->request_len = 0; 105 sv->total_read = 0; 106 sv->total_sent = 0; 107 sv->done = 0; 108 sv->rspheader_sent = 0; 109 sv->keep_alive = 0; 110 } 111 /*----------------------------------------------------------------------------*/ 112 static void 113 CloseConnection(struct thread_context *ctx, int sockid, struct server_vars *sv) 114 { 115 mtcp_epoll_ctl(ctx->mctx, ctx->ep, MOS_EPOLL_CTL_DEL, sockid, NULL); 116 mtcp_close(ctx->mctx, sockid); 117 } 118 /*----------------------------------------------------------------------------*/ 119 static int 120 SendUntilAvailable(struct thread_context *ctx, int sockid, struct server_vars *sv) 121 { 122 int ret; 123 int sent; 124 int len; 125 126 if (sv->done || !sv->rspheader_sent) { 127 return 0; 128 } 129 130 sent = 0; 131 ret = 1; 132 while (ret > 0) { 133 len = MIN(SNDBUF_SIZE, sv->fsize - sv->total_sent); 134 if (len <= 0) { 135 break; 136 } 137 ret = mtcp_write(ctx->mctx, sockid, 138 fcache[sv->fidx].file + sv->total_sent, len); 139 if (ret < 0) { 140 if (errno != EAGAIN) { 141 TRACE_ERROR("Socket %d: Sending HTTP response body failed. " 142 "try: %d, sent: %d\n", sockid, len, ret); 143 } 144 break; 145 } 146 TRACE_APP("Socket %d: mtcp_write try: %d, ret: %d\n", sockid, len, ret); 147 sent += ret; 148 sv->total_sent += ret; 149 } 150 151 if (sv->total_sent >= fcache[sv->fidx].size) { 152 struct mtcp_epoll_event ev; 153 sv->done = TRUE; 154 finished++; 155 156 if (sv->keep_alive) { 157 /* if keep-alive connection, wait for the incoming request */ 158 ev.events = MOS_EPOLLIN; 159 ev.data.sock = sockid; 160 mtcp_epoll_ctl(ctx->mctx, ctx->ep, MOS_EPOLL_CTL_MOD, sockid, &ev); 161 162 CleanServerVariable(sv); 163 } else { 164 /* else, close connection */ 165 CloseConnection(ctx, sockid, sv); 166 } 167 } 168 169 return sent; 170 } 171 /*----------------------------------------------------------------------------*/ 172 static int 173 HandleReadEvent(struct thread_context *ctx, int sockid, struct server_vars *sv) 174 { 175 struct mtcp_epoll_event ev; 176 char buf[HTTP_HEADER_LEN]; 177 char url[URL_LEN]; 178 char response[HTTP_HEADER_LEN]; 179 int scode; // status code 180 time_t t_now; 181 char t_str[128]; 182 char keepalive_str[128]; 183 int rd; 184 int i; 185 int len; 186 int sent; 187 188 /* HTTP request handling */ 189 rd = mtcp_read(ctx->mctx, sockid, buf, HTTP_HEADER_LEN); 190 if (rd <= 0) { 191 return rd; 192 } 193 memcpy(sv->request + sv->recv_len, 194 (char *)buf, MIN(rd, HTTP_HEADER_LEN - sv->recv_len)); 195 sv->recv_len += rd; 196 //sv->request[rd] = '\0'; 197 //fprintf(stderr, "HTTP Request: \n%s", request); 198 sv->request_len = find_http_header(sv->request, sv->recv_len); 199 if (sv->request_len <= 0) { 200 TRACE_ERROR("Socket %d: Failed to parse HTTP request header.\n" 201 "read bytes: %d, recv_len: %d, " 202 "request_len: %d, strlen: %ld, request: \n%s\n", 203 sockid, rd, sv->recv_len, 204 sv->request_len, strlen(sv->request), sv->request); 205 return rd; 206 } 207 208 http_get_url(sv->request, sv->request_len, url, URL_LEN); 209 TRACE_APP("Socket %d URL: %s\n", sockid, url); 210 sprintf(sv->fname, "%s%s", www_main, url); 211 TRACE_APP("Socket %d File name: %s\n", sockid, sv->fname); 212 213 214 sv->keep_alive = FALSE; 215 if (http_header_str_val(sv->request, CONN_HDR_FLD, 216 sizeof(CONN_HDR_FLD)-1, keepalive_str, sizeof(keepalive_str))) { 217 sv->keep_alive = !strcasecmp(keepalive_str, KEEP_ALIVE_STR); 218 } 219 220 /* Find file in cache */ 221 scode = 404; 222 for (i = 0; i < nfiles; i++) { 223 if (strcmp(sv->fname, fcache[i].fullname) == 0) { 224 sv->fsize = fcache[i].size; 225 sv->fidx = i; 226 scode = 200; 227 break; 228 } 229 } 230 TRACE_APP("Socket %d File size: %ld (%ldMB)\n", 231 sockid, sv->fsize, sv->fsize / 1024 / 1024); 232 233 /* Response header handling */ 234 time(&t_now); 235 strftime(t_str, 128, "%a, %d %b %Y %X GMT", gmtime(&t_now)); 236 if (sv->keep_alive) 237 sprintf(keepalive_str, "Keep-Alive"); 238 else 239 sprintf(keepalive_str, "Close"); 240 241 sprintf(response, "HTTP/1.1 %d %s\r\n" 242 "Date: %s\r\n" 243 "Server: Webserver on Middlebox TCP (Ubuntu)\r\n" 244 "Content-Length: %ld\r\n" 245 "Connection: %s\r\n\r\n", 246 scode, StatusCodeToString(scode), t_str, sv->fsize, keepalive_str); 247 len = strlen(response); 248 TRACE_APP("Socket %d HTTP Response: \n%s", sockid, response); 249 sent = mtcp_write(ctx->mctx, sockid, response, len); 250 if (sent < len) { 251 TRACE_ERROR("Socket %d: Sending HTTP response failed. " 252 "try: %d, sent: %d\n", sockid, len, sent); 253 CloseConnection(ctx, sockid, sv); 254 } 255 TRACE_APP("Socket %d Sent response header: try: %d, sent: %d\n", 256 sockid, len, sent); 257 assert(sent == len); 258 sv->rspheader_sent = TRUE; 259 260 ev.events = MOS_EPOLLIN | MOS_EPOLLOUT; 261 ev.data.sock = sockid; 262 mtcp_epoll_ctl(ctx->mctx, ctx->ep, MOS_EPOLL_CTL_MOD, sockid, &ev); 263 264 SendUntilAvailable(ctx, sockid, sv); 265 266 return rd; 267 } 268 /*----------------------------------------------------------------------------*/ 269 static int 270 AcceptConnection(struct thread_context *ctx, int listener) 271 { 272 mctx_t mctx = ctx->mctx; 273 struct server_vars *sv; 274 struct mtcp_epoll_event ev; 275 int c; 276 277 c = mtcp_accept(mctx, listener, NULL, NULL); 278 279 if (c >= 0) { 280 if (c >= MAX_FLOW_NUM) { 281 TRACE_ERROR("Invalid socket id %d.\n", c); 282 return -1; 283 } 284 285 sv = &ctx->svars[c]; 286 CleanServerVariable(sv); 287 TRACE_APP("New connection %d accepted.\n", c); 288 ev.events = MOS_EPOLLIN; 289 ev.data.sock = c; 290 mtcp_setsock_nonblock(ctx->mctx, c); 291 mtcp_epoll_ctl(mctx, ctx->ep, MOS_EPOLL_CTL_ADD, c, &ev); 292 TRACE_APP("Socket %d registered.\n", c); 293 294 } else { 295 if (errno != EAGAIN) { 296 TRACE_ERROR("mtcp_accept() error %s\n", 297 strerror(errno)); 298 } 299 } 300 301 return c; 302 } 303 /*----------------------------------------------------------------------------*/ 304 static int 305 CreateListeningSocket(struct thread_context *ctx) 306 { 307 int listener; 308 struct mtcp_epoll_event ev; 309 struct sockaddr_in saddr; 310 int ret; 311 312 /* create socket and set it as nonblocking */ 313 listener = mtcp_socket(ctx->mctx, AF_INET, SOCK_STREAM, 0); 314 if (listener < 0) { 315 TRACE_ERROR("Failed to create listening socket!\n"); 316 return -1; 317 } 318 ret = mtcp_setsock_nonblock(ctx->mctx, listener); 319 if (ret < 0) { 320 TRACE_ERROR("Failed to set socket in nonblocking mode.\n"); 321 return -1; 322 } 323 324 /* bind to port 80 */ 325 saddr.sin_family = AF_INET; 326 saddr.sin_addr.s_addr = INADDR_ANY; 327 saddr.sin_port = htons(80); 328 ret = mtcp_bind(ctx->mctx, listener, 329 (struct sockaddr *)&saddr, sizeof(struct sockaddr_in)); 330 if (ret < 0) { 331 TRACE_ERROR("Failed to bind to the listening socket!\n"); 332 return -1; 333 } 334 335 /* listen (backlog: 4K) */ 336 ret = mtcp_listen(ctx->mctx, listener, 4096); 337 if (ret < 0) { 338 TRACE_ERROR("mtcp_listen() failed!\n"); 339 return -1; 340 } 341 342 /* wait for incoming accept events */ 343 ev.events = MOS_EPOLLIN; 344 ev.data.sock = listener; 345 mtcp_epoll_ctl(ctx->mctx, ctx->ep, MOS_EPOLL_CTL_ADD, listener, &ev); 346 347 return listener; 348 } 349 /*----------------------------------------------------------------------------*/ 350 static void 351 GlobInitServer() 352 { 353 DIR *dir; 354 struct dirent *ent; 355 int fd; 356 int ret; 357 uint64_t total_read; 358 359 num_cores = GetNumCPUs(); 360 core_limit = num_cores; 361 362 if (LoadConfig(CONFIG_FILE, g_conf, NUM_CONF_VAR)) 363 exit(-1); 364 365 core_limit = atoi(g_conf[0].value); 366 www_main = g_conf[1].value; 367 368 /* open the directory to serve */ 369 dir = opendir(www_main); 370 if (!dir) { 371 TRACE_ERROR("Failed to open %s.\n", www_main); 372 perror("opendir"); 373 exit(-1); 374 } 375 376 nfiles = 0; 377 while ((ent = readdir(dir)) != NULL) { 378 if (strcmp(ent->d_name, ".") == 0) 379 continue; 380 else if (strcmp(ent->d_name, "..") == 0) 381 continue; 382 383 strcpy(fcache[nfiles].name, ent->d_name); 384 sprintf(fcache[nfiles].fullname, "%s/%s", www_main, ent->d_name); 385 fd = open(fcache[nfiles].fullname, O_RDONLY); 386 if (fd < 0) { 387 perror("open"); 388 continue; 389 } else { 390 fcache[nfiles].size = lseek64(fd, 0, SEEK_END); 391 lseek64(fd, 0, SEEK_SET); 392 } 393 394 fcache[nfiles].file = (char *)malloc(fcache[nfiles].size); 395 if (!fcache[nfiles].file) { 396 TRACE_ERROR("Failed to allocate memory for file %s\n", 397 fcache[nfiles].name); 398 perror("malloc"); 399 continue; 400 } 401 402 TRACE_INFO("Reading %s (%lu bytes)\n", 403 fcache[nfiles].name, fcache[nfiles].size); 404 total_read = 0; 405 while (1) { 406 ret = read(fd, fcache[nfiles].file + total_read, 407 fcache[nfiles].size - total_read); 408 if (ret < 0) { 409 break; 410 } else if (ret == 0) { 411 break; 412 } 413 total_read += ret; 414 } 415 if (total_read < fcache[nfiles].size) { 416 free(fcache[nfiles].file); 417 continue; 418 } 419 close(fd); 420 nfiles++; 421 422 if (nfiles >= MAX_FILES) 423 break; 424 } 425 426 finished = 0; 427 428 return; 429 } 430 /*----------------------------------------------------------------------------*/ 431 static void 432 InitServer(mctx_t mctx, void **app_ctx) 433 { 434 struct thread_context *ctx; 435 436 ctx = (struct thread_context *)calloc(1, sizeof(struct thread_context)); 437 if (!ctx) { 438 TRACE_ERROR("Failed to create thread context!\n"); 439 exit(-1); 440 } 441 442 ctx->mctx = mctx; 443 444 /* create epoll descriptor */ 445 ctx->ep = mtcp_epoll_create(mctx, MAX_EVENTS); 446 if (ctx->ep < 0) { 447 TRACE_ERROR("Failed to create epoll descriptor!\n"); 448 exit(-1); 449 } 450 451 /* allocate memory for server variables */ 452 ctx->svars = (struct server_vars *) 453 calloc(MAX_FLOW_NUM, sizeof(struct server_vars)); 454 if (!ctx->svars) { 455 TRACE_ERROR("Failed to create server_vars struct!\n"); 456 exit(-1); 457 } 458 459 ctx->listener = CreateListeningSocket(ctx); 460 if (ctx->listener < 0) { 461 TRACE_ERROR("Failed to create listening socket.\n"); 462 exit(-1); 463 } 464 465 *app_ctx = (void *)ctx; 466 467 return; 468 } 469 /*----------------------------------------------------------------------------*/ 470 static void 471 RunServer(mctx_t mctx, void **app_ctx) 472 { 473 struct thread_context *ctx = (*app_ctx); 474 int nevents; 475 int i, ret; 476 int do_accept; 477 struct mtcp_epoll_event *events; 478 479 assert(ctx); 480 int ep = ctx->ep; 481 482 events = (struct mtcp_epoll_event *) 483 calloc(MAX_EVENTS, sizeof(struct mtcp_epoll_event)); 484 if (!events) { 485 TRACE_ERROR("Failed to create event struct!\n"); 486 exit(-1); 487 } 488 489 while (1) { 490 nevents = mtcp_epoll_wait(mctx, ep, events, MAX_EVENTS, -1); 491 if (nevents < 0) { 492 if (errno != EINTR) 493 perror("mtcp_epoll_wait"); 494 break; 495 } 496 497 do_accept = FALSE; 498 for (i = 0; i < nevents; i++) { 499 500 if (events[i].data.sock == ctx->listener) { 501 /* if the event is for the listener, accept connection */ 502 do_accept = TRUE; 503 504 } else if (events[i].events & MOS_EPOLLERR) { 505 int err; 506 socklen_t len = sizeof(err); 507 508 /* error on the connection */ 509 TRACE_APP("[CPU %d] Error on socket %d\n", 510 core, events[i].data.sock); 511 if (mtcp_getsockopt(mctx, events[i].data.sock, 512 SOL_SOCKET, SO_ERROR, (void *)&err, &len) == 0) { 513 if (err != ETIMEDOUT) { 514 fprintf(stderr, "Error on socket %d: %s\n", 515 events[i].data.sock, strerror(err)); 516 } 517 } else { 518 fprintf(stderr, "mtcp_getsockopt: %s (for sockid: %d)\n", 519 strerror(errno), events[i].data.sock); 520 exit(-1); 521 } 522 CloseConnection(ctx, events[i].data.sock, 523 &ctx->svars[events[i].data.sock]); 524 525 } else if (events[i].events & MOS_EPOLLIN) { 526 ret = HandleReadEvent(ctx, events[i].data.sock, 527 &ctx->svars[events[i].data.sock]); 528 529 if (ret == 0) { 530 /* connection closed by remote host */ 531 CloseConnection(ctx, events[i].data.sock, 532 &ctx->svars[events[i].data.sock]); 533 } else if (ret < 0) { 534 /* if not EAGAIN, it's an error */ 535 if (errno != EAGAIN) { 536 CloseConnection(ctx, events[i].data.sock, 537 &ctx->svars[events[i].data.sock]); 538 } 539 } 540 541 } else if (events[i].events & MOS_EPOLLOUT) { 542 struct server_vars *sv = &ctx->svars[events[i].data.sock]; 543 if (sv->rspheader_sent) { 544 SendUntilAvailable(ctx, events[i].data.sock, sv); 545 } else { 546 TRACE_APP("Socket %d: Response header not sent yet.\n", 547 events[i].data.sock); 548 } 549 550 } else { 551 assert(0); 552 } 553 } 554 555 /* if do_accept flag is set, accept connections */ 556 if (do_accept) { 557 while (1) { 558 ret = AcceptConnection(ctx, ctx->listener); 559 if (ret < 0) 560 break; 561 } 562 } 563 564 } 565 566 return; 567 } 568 /*----------------------------------------------------------------------------*/ 569 void 570 RunApplication(mctx_t mctx) 571 { 572 void *app_ctx; 573 574 app_ctx = (void *)calloc(1, sizeof(void *)); 575 if (!app_ctx) { 576 TRACE_ERROR("calloc failure\n"); 577 return; 578 } 579 580 TRACE_INFO("run application on core %d\n", mctx->cpu); 581 InitServer(mctx, &(app_ctx)); 582 RunServer(mctx, &(app_ctx)); 583 } 584 /*----------------------------------------------------------------------------*/ 585 void * 586 RunMTCP(void *arg) 587 { 588 int core = *(int *)arg; 589 mctx_t mctx; 590 591 mtcp_core_affinitize(core); 592 593 /* mTCP Initialization */ 594 mctx = mtcp_create_context(core); 595 if (!mctx) { 596 pthread_exit(NULL); 597 TRACE_ERROR("Failed to craete mtcp context.\n"); 598 return NULL; 599 } 600 601 /* Run application here */ 602 RunApplication(mctx); 603 604 /* mTCP Tear Down */ 605 mtcp_destroy_context(mctx); 606 607 return NULL; 608 } 609 /*----------------------------------------------------------------------------*/ 610 int 611 main(int argc, char **argv) 612 { 613 int ret/*, i*/; 614 int current_core = -1; 615 char *fname = "config/mos.conf"; 616 617 int opt; 618 while ((opt = getopt(argc, argv, "f:c:")) != -1) { 619 switch (opt) { 620 case 'f': 621 fname = optarg; 622 break; 623 case 'c': 624 current_core = atoi(optarg); 625 if (current_core >= MAX_CPUS) { 626 TRACE_ERROR("Invalid core id!\n"); 627 exit(EXIT_FAILURE); 628 } 629 break; 630 default: 631 printf("Usage: %s [-f config_file]\n", argv[0]); 632 return 0; 633 } 634 635 } 636 637 core_limit = sysconf(_SC_NPROCESSORS_ONLN); 638 639 ret = mtcp_init(fname); 640 if (ret) { 641 TRACE_ERROR("Failed to initialize mtcp.\n"); 642 exit(EXIT_FAILURE); 643 } 644 645 if (current_core < 0) { 646 TRACE_ERROR("Invalid core id.\n"); 647 exit(EXIT_FAILURE); 648 } 649 650 mtcp_getconf(&g_mcfg); 651 652 core_limit = g_mcfg.num_cores; 653 654 GlobInitServer(); 655 656 RunMTCP(¤t_core); 657 658 mtcp_destroy(); 659 return 0; 660 } 661 /*----------------------------------------------------------------------------*/ 662