1 #define _LARGEFILE64_SOURCE
2 #include <stdio.h>
3 #include <stdlib.h>
4 #include <unistd.h>
5 #include <stdint.h>
6 #include <sys/types.h>
7 #include <sys/stat.h>
8 #include <sys/socket.h>
9 #include <netinet/in.h>
10 #include <arpa/inet.h>
11 #include <fcntl.h>
12 #include <dirent.h>
13 #include <string.h>
14 #include <time.h>
15 #include <pthread.h>
16 #include <signal.h>
17
18 #include <mos_api.h>
19 #include "cpu.h"
20 #include "http_parsing.h"
21 #include "debug.h"
22 #include "applib.h"
23
24 #define CONFIG_FILE "config/epserver.conf"
25 static struct conf_var g_conf[] = {
26 { "core_limit", {0} },
27 { "www_main", {0} },
28 };
29 #define NUM_CONF_VAR (sizeof(g_conf) / sizeof(struct conf_var))
30
/* Size of the per-connection request buffer and of the response header buffer. */
#define HTTP_HEADER_LEN 1024
/* Size of the buffer that receives the URL extracted from a request. */
#define URL_LEN 128

/*
 * Upper bound on the number of body bytes passed to one mtcp_write() call.
 * Original note (shinae, 10.27.2014): SNDBUF_SIZE should be removed.
 */
#define SNDBUF_SIZE (8 * 1024)

/* Capacity of the in-memory file cache. */
#define MAX_FILES 100
40
41 /*----------------------------------------------------------------------------*/
42 struct mtcp_conf g_mcfg;
43 static pthread_t mtcp_thread[MAX_CPUS];
44 /*----------------------------------------------------------------------------*/
/* One file held entirely in memory by the server. */
struct file_cache
{
	char name[128];		/* directory entry name */
	char fullname[256];	/* "<www_main>/<name>"; compared against requested paths */
	uint64_t size;		/* file size in bytes */
	char *file;		/* heap buffer holding the file contents */
};
52 /*----------------------------------------------------------------------------*/
53 struct server_vars
54 {
55 char request[HTTP_HEADER_LEN];
56 int recv_len;
57 int request_len;
58 long int total_read, total_sent;
59 uint8_t done;
60 uint8_t rspheader_sent;
61 uint8_t keep_alive;
62
63 int fidx; // file cache index
64 char fname[128]; // file name
65 long int fsize; // file size
66 };
67 /*----------------------------------------------------------------------------*/
68 struct thread_context
69 {
70 mctx_t mctx;
71 int listener;
72 int ep;
73 struct server_vars *svars;
74 };
75 /*----------------------------------------------------------------------------*/
76 static int num_cores;
77 static int core_limit;
78 /*----------------------------------------------------------------------------*/
79 char *www_main;
80 static struct file_cache fcache[MAX_FILES];
81 static int nfiles;
82 /*----------------------------------------------------------------------------*/
83 static int finished;
84 /*----------------------------------------------------------------------------*/
/*
 * Map an HTTP status code to its reason phrase.
 * Returns "OK" for 200, "Not Found" for 404 and NULL for any other code.
 */
static char *
StatusCodeToString(int scode)
{
	if (scode == 200)
		return "OK";

	if (scode == 404)
		return "Not Found";

	return NULL;
}
100 /*----------------------------------------------------------------------------*/
101 static void
CleanServerVariable(struct server_vars * sv)102 CleanServerVariable(struct server_vars *sv)
103 {
104 sv->recv_len = 0;
105 sv->request_len = 0;
106 sv->total_read = 0;
107 sv->total_sent = 0;
108 sv->done = 0;
109 sv->rspheader_sent = 0;
110 sv->keep_alive = 0;
111 }
112 /*----------------------------------------------------------------------------*/
113 static void
CloseConnection(struct thread_context * ctx,int sockid,struct server_vars * sv)114 CloseConnection(struct thread_context *ctx, int sockid, struct server_vars *sv)
115 {
116 mtcp_epoll_ctl(ctx->mctx, ctx->ep, MOS_EPOLL_CTL_DEL, sockid, NULL);
117 mtcp_close(ctx->mctx, sockid);
118 }
119 /*----------------------------------------------------------------------------*/
120 static int
SendUntilAvailable(struct thread_context * ctx,int sockid,struct server_vars * sv)121 SendUntilAvailable(struct thread_context *ctx, int sockid, struct server_vars *sv)
122 {
123 int ret;
124 int sent;
125 int len;
126
127 if (sv->done || !sv->rspheader_sent) {
128 return 0;
129 }
130
131 sent = 0;
132 ret = 1;
133 while (ret > 0) {
134 len = MIN(SNDBUF_SIZE, sv->fsize - sv->total_sent);
135 if (len <= 0) {
136 break;
137 }
138 ret = mtcp_write(ctx->mctx, sockid,
139 fcache[sv->fidx].file + sv->total_sent, len);
140 if (ret < 0) {
141 if (errno != EAGAIN) {
142 TRACE_ERROR("Socket %d: Sending HTTP response body failed. "
143 "try: %d, sent: %d\n", sockid, len, ret);
144 }
145 break;
146 }
147 TRACE_APP("Socket %d: mtcp_write try: %d, ret: %d\n", sockid, len, ret);
148 sent += ret;
149 sv->total_sent += ret;
150 }
151
152 if (sv->total_sent >= fcache[sv->fidx].size) {
153 struct mtcp_epoll_event ev;
154 sv->done = TRUE;
155 finished++;
156
157 if (sv->keep_alive) {
158 /* if keep-alive connection, wait for the incoming request */
159 ev.events = MOS_EPOLLIN;
160 ev.data.sock = sockid;
161 mtcp_epoll_ctl(ctx->mctx, ctx->ep, MOS_EPOLL_CTL_MOD, sockid, &ev);
162
163 CleanServerVariable(sv);
164 } else {
165 /* else, close connection */
166 CloseConnection(ctx, sockid, sv);
167 }
168 }
169
170 return sent;
171 }
172 /*----------------------------------------------------------------------------*/
173 static int
HandleReadEvent(struct thread_context * ctx,int sockid,struct server_vars * sv)174 HandleReadEvent(struct thread_context *ctx, int sockid, struct server_vars *sv)
175 {
176 struct mtcp_epoll_event ev;
177 char buf[HTTP_HEADER_LEN];
178 char url[URL_LEN];
179 char response[HTTP_HEADER_LEN];
180 int scode; // status code
181 time_t t_now;
182 char t_str[128];
183 char keepalive_str[128];
184 int rd;
185 int i;
186 int len;
187 int sent;
188
189 /* HTTP request handling */
190 rd = mtcp_read(ctx->mctx, sockid, buf, HTTP_HEADER_LEN);
191 if (rd <= 0) {
192 return rd;
193 }
194 memcpy(sv->request + sv->recv_len,
195 (char *)buf, MIN(rd, HTTP_HEADER_LEN - sv->recv_len));
196 sv->recv_len += rd;
197 //sv->request[rd] = '\0';
198 //fprintf(stderr, "HTTP Request: \n%s", request);
199 sv->request_len = find_http_header(sv->request, sv->recv_len);
200 if (sv->request_len <= 0) {
201 TRACE_ERROR("Socket %d: Failed to parse HTTP request header.\n"
202 "read bytes: %d, recv_len: %d, "
203 "request_len: %d, strlen: %ld, request: \n%s\n",
204 sockid, rd, sv->recv_len,
205 sv->request_len, strlen(sv->request), sv->request);
206 return rd;
207 }
208
209 http_get_url(sv->request, sv->request_len, url, URL_LEN);
210 TRACE_APP("Socket %d URL: %s\n", sockid, url);
211 sprintf(sv->fname, "%s%s", www_main, url);
212 TRACE_APP("Socket %d File name: %s\n", sockid, sv->fname);
213
214
215 sv->keep_alive = FALSE;
216 if (http_header_str_val(sv->request, CONN_HDR_FLD,
217 sizeof(CONN_HDR_FLD)-1, keepalive_str, sizeof(keepalive_str))) {
218 sv->keep_alive = !strcasecmp(keepalive_str, KEEP_ALIVE_STR);
219 }
220
221 /* Find file in cache */
222 scode = 404;
223 for (i = 0; i < nfiles; i++) {
224 if (strcmp(sv->fname, fcache[i].fullname) == 0) {
225 sv->fsize = fcache[i].size;
226 sv->fidx = i;
227 scode = 200;
228 break;
229 }
230 }
231 TRACE_APP("Socket %d File size: %ld (%ldMB)\n",
232 sockid, sv->fsize, sv->fsize / 1024 / 1024);
233
234 /* Response header handling */
235 time(&t_now);
236 strftime(t_str, 128, "%a, %d %b %Y %X GMT", gmtime(&t_now));
237 if (sv->keep_alive)
238 sprintf(keepalive_str, "Keep-Alive");
239 else
240 sprintf(keepalive_str, "Close");
241
242 sprintf(response, "HTTP/1.1 %d %s\r\n"
243 "Date: %s\r\n"
244 "Server: Webserver on Middlebox TCP (Ubuntu)\r\n"
245 "Content-Length: %ld\r\n"
246 "Connection: %s\r\n\r\n",
247 scode, StatusCodeToString(scode), t_str, sv->fsize, keepalive_str);
248 len = strlen(response);
249 TRACE_APP("Socket %d HTTP Response: \n%s", sockid, response);
250 sent = mtcp_write(ctx->mctx, sockid, response, len);
251 if (sent < len) {
252 TRACE_ERROR("Socket %d: Sending HTTP response failed. "
253 "try: %d, sent: %d\n", sockid, len, sent);
254 CloseConnection(ctx, sockid, sv);
255 }
256 TRACE_APP("Socket %d Sent response header: try: %d, sent: %d\n",
257 sockid, len, sent);
258 assert(sent == len);
259 sv->rspheader_sent = TRUE;
260
261 ev.events = MOS_EPOLLIN | MOS_EPOLLOUT;
262 ev.data.sock = sockid;
263 mtcp_epoll_ctl(ctx->mctx, ctx->ep, MOS_EPOLL_CTL_MOD, sockid, &ev);
264
265 SendUntilAvailable(ctx, sockid, sv);
266
267 return rd;
268 }
269 /*----------------------------------------------------------------------------*/
270 static int
AcceptConnection(struct thread_context * ctx,int listener)271 AcceptConnection(struct thread_context *ctx, int listener)
272 {
273 mctx_t mctx = ctx->mctx;
274 struct server_vars *sv;
275 struct mtcp_epoll_event ev;
276 int c;
277
278 c = mtcp_accept(mctx, listener, NULL, NULL);
279
280 if (c >= 0) {
281 if (c >= MAX_FLOW_NUM) {
282 TRACE_ERROR("Invalid socket id %d.\n", c);
283 return -1;
284 }
285
286 sv = &ctx->svars[c];
287 CleanServerVariable(sv);
288 TRACE_APP("New connection %d accepted.\n", c);
289 ev.events = MOS_EPOLLIN;
290 ev.data.sock = c;
291 mtcp_setsock_nonblock(ctx->mctx, c);
292 mtcp_epoll_ctl(mctx, ctx->ep, MOS_EPOLL_CTL_ADD, c, &ev);
293 TRACE_APP("Socket %d registered.\n", c);
294
295 } else {
296 if (errno != EAGAIN) {
297 TRACE_ERROR("mtcp_accept() error %s\n",
298 strerror(errno));
299 }
300 }
301
302 return c;
303 }
304 /*----------------------------------------------------------------------------*/
305 static int
CreateListeningSocket(struct thread_context * ctx)306 CreateListeningSocket(struct thread_context *ctx)
307 {
308 int listener;
309 struct mtcp_epoll_event ev;
310 struct sockaddr_in saddr;
311 int ret;
312
313 /* create socket and set it as nonblocking */
314 listener = mtcp_socket(ctx->mctx, AF_INET, SOCK_STREAM, 0);
315 if (listener < 0) {
316 TRACE_ERROR("Failed to create listening socket!\n");
317 return -1;
318 }
319 ret = mtcp_setsock_nonblock(ctx->mctx, listener);
320 if (ret < 0) {
321 TRACE_ERROR("Failed to set socket in nonblocking mode.\n");
322 return -1;
323 }
324
325 /* bind to port 80 */
326 saddr.sin_family = AF_INET;
327 saddr.sin_addr.s_addr = INADDR_ANY;
328 saddr.sin_port = htons(80);
329 ret = mtcp_bind(ctx->mctx, listener,
330 (struct sockaddr *)&saddr, sizeof(struct sockaddr_in));
331 if (ret < 0) {
332 TRACE_ERROR("Failed to bind to the listening socket!\n");
333 return -1;
334 }
335
336 /* listen (backlog: 4K) */
337 ret = mtcp_listen(ctx->mctx, listener, 4096);
338 if (ret < 0) {
339 TRACE_ERROR("mtcp_listen() failed!\n");
340 return -1;
341 }
342
343 /* wait for incoming accept events */
344 ev.events = MOS_EPOLLIN;
345 ev.data.sock = listener;
346 mtcp_epoll_ctl(ctx->mctx, ctx->ep, MOS_EPOLL_CTL_ADD, listener, &ev);
347
348 return listener;
349 }
350 /*----------------------------------------------------------------------------*/
351 static void
GlobInitServer()352 GlobInitServer()
353 {
354 DIR *dir;
355 struct dirent *ent;
356 int fd;
357 int ret;
358 uint64_t total_read;
359
360 num_cores = GetNumCPUs();
361 core_limit = num_cores;
362
363 if (LoadConfig(CONFIG_FILE, g_conf, NUM_CONF_VAR))
364 exit(-1);
365
366 core_limit = atoi(g_conf[0].value);
367 www_main = g_conf[1].value;
368
369 /* open the directory to serve */
370 dir = opendir(www_main);
371 if (!dir) {
372 TRACE_ERROR("Failed to open %s.\n", www_main);
373 perror("opendir");
374 exit(-1);
375 }
376
377 nfiles = 0;
378 while ((ent = readdir(dir)) != NULL) {
379 if (strcmp(ent->d_name, ".") == 0)
380 continue;
381 else if (strcmp(ent->d_name, "..") == 0)
382 continue;
383
384 strcpy(fcache[nfiles].name, ent->d_name);
385 sprintf(fcache[nfiles].fullname, "%s/%s", www_main, ent->d_name);
386 fd = open(fcache[nfiles].fullname, O_RDONLY);
387 if (fd < 0) {
388 perror("open");
389 continue;
390 } else {
391 fcache[nfiles].size = lseek64(fd, 0, SEEK_END);
392 lseek64(fd, 0, SEEK_SET);
393 }
394
395 fcache[nfiles].file = (char *)malloc(fcache[nfiles].size);
396 if (!fcache[nfiles].file) {
397 TRACE_ERROR("Failed to allocate memory for file %s\n",
398 fcache[nfiles].name);
399 perror("malloc");
400 continue;
401 }
402
403 TRACE_INFO("Reading %s (%lu bytes)\n",
404 fcache[nfiles].name, fcache[nfiles].size);
405 total_read = 0;
406 while (1) {
407 ret = read(fd, fcache[nfiles].file + total_read,
408 fcache[nfiles].size - total_read);
409 if (ret < 0) {
410 break;
411 } else if (ret == 0) {
412 break;
413 }
414 total_read += ret;
415 }
416 if (total_read < fcache[nfiles].size) {
417 free(fcache[nfiles].file);
418 continue;
419 }
420 close(fd);
421 nfiles++;
422
423 if (nfiles >= MAX_FILES)
424 break;
425 }
426
427 finished = 0;
428
429 return;
430 }
431 /*----------------------------------------------------------------------------*/
432 static void
InitServer(mctx_t mctx,void ** app_ctx)433 InitServer(mctx_t mctx, void **app_ctx)
434 {
435 struct thread_context *ctx;
436
437 ctx = (struct thread_context *)calloc(1, sizeof(struct thread_context));
438 if (!ctx) {
439 TRACE_ERROR("Failed to create thread context!\n");
440 exit(-1);
441 }
442
443 ctx->mctx = mctx;
444
445 /* create epoll descriptor */
446 ctx->ep = mtcp_epoll_create(mctx, MAX_EVENTS);
447 if (ctx->ep < 0) {
448 TRACE_ERROR("Failed to create epoll descriptor!\n");
449 exit(-1);
450 }
451
452 /* allocate memory for server variables */
453 ctx->svars = (struct server_vars *)
454 calloc(MAX_FLOW_NUM, sizeof(struct server_vars));
455 if (!ctx->svars) {
456 TRACE_ERROR("Failed to create server_vars struct!\n");
457 exit(-1);
458 }
459
460 ctx->listener = CreateListeningSocket(ctx);
461 if (ctx->listener < 0) {
462 TRACE_ERROR("Failed to create listening socket.\n");
463 exit(-1);
464 }
465
466 *app_ctx = (void *)ctx;
467
468 return;
469 }
470 /*----------------------------------------------------------------------------*/
471 static void
RunServer(mctx_t mctx,void ** app_ctx)472 RunServer(mctx_t mctx, void **app_ctx)
473 {
474 struct thread_context *ctx = (*app_ctx);
475 int nevents;
476 int i, ret;
477 int do_accept;
478 struct mtcp_epoll_event *events;
479
480 assert(ctx);
481 int ep = ctx->ep;
482
483 events = (struct mtcp_epoll_event *)
484 calloc(MAX_EVENTS, sizeof(struct mtcp_epoll_event));
485 if (!events) {
486 TRACE_ERROR("Failed to create event struct!\n");
487 exit(-1);
488 }
489
490 while (1) {
491 nevents = mtcp_epoll_wait(mctx, ep, events, MAX_EVENTS, -1);
492 if (nevents < 0) {
493 if (errno != EINTR)
494 perror("mtcp_epoll_wait");
495 break;
496 }
497
498 do_accept = FALSE;
499 for (i = 0; i < nevents; i++) {
500
501 if (events[i].data.sock == ctx->listener) {
502 /* if the event is for the listener, accept connection */
503 do_accept = TRUE;
504
505 } else if (events[i].events & MOS_EPOLLERR) {
506 int err;
507 socklen_t len = sizeof(err);
508
509 /* error on the connection */
510 TRACE_APP("[CPU %d] Error on socket %d\n",
511 core, events[i].data.sock);
512 if (mtcp_getsockopt(mctx, events[i].data.sock,
513 SOL_SOCKET, SO_ERROR, (void *)&err, &len) == 0) {
514 if (err != ETIMEDOUT) {
515 fprintf(stderr, "Error on socket %d: %s\n",
516 events[i].data.sock, strerror(err));
517 }
518 } else {
519 fprintf(stderr, "mtcp_getsockopt: %s (for sockid: %d)\n",
520 strerror(errno), events[i].data.sock);
521 exit(-1);
522 }
523 CloseConnection(ctx, events[i].data.sock,
524 &ctx->svars[events[i].data.sock]);
525
526 } else if (events[i].events & MOS_EPOLLIN) {
527 ret = HandleReadEvent(ctx, events[i].data.sock,
528 &ctx->svars[events[i].data.sock]);
529
530 if (ret == 0) {
531 /* connection closed by remote host */
532 CloseConnection(ctx, events[i].data.sock,
533 &ctx->svars[events[i].data.sock]);
534 } else if (ret < 0) {
535 /* if not EAGAIN, it's an error */
536 if (errno != EAGAIN) {
537 CloseConnection(ctx, events[i].data.sock,
538 &ctx->svars[events[i].data.sock]);
539 }
540 }
541
542 } else if (events[i].events & MOS_EPOLLOUT) {
543 struct server_vars *sv = &ctx->svars[events[i].data.sock];
544 if (sv->rspheader_sent) {
545 SendUntilAvailable(ctx, events[i].data.sock, sv);
546 } else {
547 TRACE_APP("Socket %d: Response header not sent yet.\n",
548 events[i].data.sock);
549 }
550
551 } else {
552 assert(0);
553 }
554 }
555
556 /* if do_accept flag is set, accept connections */
557 if (do_accept) {
558 while (1) {
559 ret = AcceptConnection(ctx, ctx->listener);
560 if (ret < 0)
561 break;
562 }
563 }
564
565 }
566
567 return;
568 }
569 /*----------------------------------------------------------------------------*/
570 void
RunApplication(mctx_t mctx)571 RunApplication(mctx_t mctx)
572 {
573 void *app_ctx;
574
575 app_ctx = (void *)calloc(1, sizeof(void *));
576 if (!app_ctx) {
577 TRACE_ERROR("calloc failure\n");
578 return;
579 }
580
581 TRACE_INFO("run application on core %d\n", mctx->cpu);
582 InitServer(mctx, &(app_ctx));
583 RunServer(mctx, &(app_ctx));
584 }
585 /*----------------------------------------------------------------------------*/
586 void *
RunMTCP(void * arg)587 RunMTCP(void *arg)
588 {
589 int core = *(int *)arg;
590 mctx_t mctx;
591
592 mtcp_core_affinitize(core);
593
594 /* mTCP Initialization */
595 mctx = mtcp_create_context(core);
596 if (!mctx) {
597 pthread_exit(NULL);
598 TRACE_ERROR("Failed to craete mtcp context.\n");
599 return NULL;
600 }
601
602 /* Run application here */
603 RunApplication(mctx);
604
605 /* mTCP Tear Down */
606 mtcp_destroy_context(mctx);
607 pthread_exit(NULL);
608
609 return NULL;
610 }
611 /*----------------------------------------------------------------------------*/
612 int
main(int argc,char ** argv)613 main(int argc, char **argv)
614 {
615 int ret, i;
616 int cores[MAX_CPUS];
617 char *fname = "config/mos.conf";
618
619 int opt;
620 while ((opt = getopt(argc, argv, "f:")) != -1) {
621 switch (opt) {
622 case 'f':
623 fname = optarg;
624 break;
625 default:
626 printf("Usage: %s [-f config_file]\n", argv[0]);
627 return 0;
628 }
629
630 }
631
632 core_limit = sysconf(_SC_NPROCESSORS_ONLN);
633
634 ret = mtcp_init(fname);
635 if (ret) {
636 TRACE_ERROR("Failed to initialize mtcp.\n");
637 exit(EXIT_FAILURE);
638 }
639
640 mtcp_getconf(&g_mcfg);
641
642 core_limit = g_mcfg.num_cores;
643
644 GlobInitServer();
645
646 for (i = 0; i < core_limit; i++) {
647 cores[i] = i;
648
649 /* Run mtcp thread */
650 if ((g_mcfg.cpu_mask & (1L << i)) &&
651 pthread_create(&mtcp_thread[i], NULL, RunMTCP, (void *)&cores[i])) {
652 perror("pthread_create");
653 TRACE_ERROR("Failed to create msg_test thread.\n");
654 exit(-1);
655 }
656 }
657
658 for (i = 0; i < core_limit; i++) {
659 if (g_mcfg.cpu_mask & (1L << i))
660 pthread_join(mtcp_thread[i], NULL);
661 TRACE_INFO("Message test thread %d joined.\n", i);
662 }
663
664 mtcp_destroy();
665 return 0;
666 }
667 /*----------------------------------------------------------------------------*/
668