#define _LARGEFILE64_SOURCE
#include <assert.h>
#include <errno.h>
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <stdint.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <fcntl.h>
#include <dirent.h>
#include <string.h>
#include <strings.h>
#include <time.h>
#include <pthread.h>
#include <signal.h>

#include <mos_api.h>
#include "cpu.h"
#include "http_parsing.h"
#include "debug.h"
#include "applib.h"

24 #define CONFIG_FILE "config/epserver.conf"
25 static struct conf_var g_conf[] = {
26 { "core_limit", {0} },
27 { "www_main", {0} },
28 };
29 #define NUM_CONF_VAR (sizeof(g_conf) / sizeof(struct conf_var))
30
/* Size of the per-connection request buffer and of the response header buffer. */
#define HTTP_HEADER_LEN 1024
/* Size of the buffer that receives the URL extracted from a request. */
#define URL_LEN 128

/*
 * Largest chunk passed to a single mtcp_write() call while sending a
 * response body.
 * TODO (shinae, 10.27.2014): SNDBUF_SIZE should be removed.
 */
#define SNDBUF_SIZE (8*1024)

/* Capacity of the in-memory file cache. */
#define MAX_FILES 100
40 int cores[MAX_CPUS] = {0};
41 /*----------------------------------------------------------------------------*/
42 struct mtcp_conf g_mcfg;
43 /*----------------------------------------------------------------------------*/
/* One file of the served directory, held completely in memory. */
struct file_cache
{
	char name[128];       /* directory entry name */
	char fullname[256];   /* "<www_main>/<name>"; compared against requested paths */
	uint64_t size;        /* file size in bytes */
	char *file;           /* heap buffer holding the file contents */
};
/*----------------------------------------------------------------------------*/
52 struct server_vars
53 {
54 char request[HTTP_HEADER_LEN];
55 int recv_len;
56 int request_len;
57 long int total_read, total_sent;
58 uint8_t done;
59 uint8_t rspheader_sent;
60 uint8_t keep_alive;
61
62 int fidx; // file cache index
63 char fname[128]; // file name
64 long int fsize; // file size
65 };
66 /*----------------------------------------------------------------------------*/
67 struct thread_context
68 {
69 mctx_t mctx;
70 int listener;
71 int ep;
72 struct server_vars *svars;
73 };
74 /*----------------------------------------------------------------------------*/
75 static int num_cores;
76 static int core_limit;
77 /*----------------------------------------------------------------------------*/
78 char *www_main;
79 static struct file_cache fcache[MAX_FILES];
80 static int nfiles;
81 /*----------------------------------------------------------------------------*/
82 static int finished;
83 /*----------------------------------------------------------------------------*/
/*
 * Map an HTTP status code to its reason phrase.
 *
 * Returns a pointer to a string literal (callers must not modify it) for
 * 200 and 404, and NULL for every other code.
 */
static char *
StatusCodeToString(int scode)
{
	switch (scode) {
	case 200:
		return "OK";

	case 404:
		return "Not Found";

	default:
		return NULL;
	}
}
/*----------------------------------------------------------------------------*/
100 static void
CleanServerVariable(struct server_vars * sv)101 CleanServerVariable(struct server_vars *sv)
102 {
103 sv->recv_len = 0;
104 sv->request_len = 0;
105 sv->total_read = 0;
106 sv->total_sent = 0;
107 sv->done = 0;
108 sv->rspheader_sent = 0;
109 sv->keep_alive = 0;
110 }
111 /*----------------------------------------------------------------------------*/
112 static void
CloseConnection(struct thread_context * ctx,int sockid,struct server_vars * sv)113 CloseConnection(struct thread_context *ctx, int sockid, struct server_vars *sv)
114 {
115 mtcp_epoll_ctl(ctx->mctx, ctx->ep, MOS_EPOLL_CTL_DEL, sockid, NULL);
116 mtcp_close(ctx->mctx, sockid);
117 }
118 /*----------------------------------------------------------------------------*/
119 static int
SendUntilAvailable(struct thread_context * ctx,int sockid,struct server_vars * sv)120 SendUntilAvailable(struct thread_context *ctx, int sockid, struct server_vars *sv)
121 {
122 int ret;
123 int sent;
124 int len;
125
126 if (sv->done || !sv->rspheader_sent) {
127 return 0;
128 }
129
130 sent = 0;
131 ret = 1;
132 while (ret > 0) {
133 len = MIN(SNDBUF_SIZE, sv->fsize - sv->total_sent);
134 if (len <= 0) {
135 break;
136 }
137 ret = mtcp_write(ctx->mctx, sockid,
138 fcache[sv->fidx].file + sv->total_sent, len);
139 if (ret < 0) {
140 if (errno != EAGAIN) {
141 TRACE_ERROR("Socket %d: Sending HTTP response body failed. "
142 "try: %d, sent: %d\n", sockid, len, ret);
143 }
144 break;
145 }
146 TRACE_APP("Socket %d: mtcp_write try: %d, ret: %d\n", sockid, len, ret);
147 sent += ret;
148 sv->total_sent += ret;
149 }
150
151 if (sv->total_sent >= fcache[sv->fidx].size) {
152 struct mtcp_epoll_event ev;
153 sv->done = TRUE;
154 finished++;
155
156 if (sv->keep_alive) {
157 /* if keep-alive connection, wait for the incoming request */
158 ev.events = MOS_EPOLLIN;
159 ev.data.sock = sockid;
160 mtcp_epoll_ctl(ctx->mctx, ctx->ep, MOS_EPOLL_CTL_MOD, sockid, &ev);
161
162 CleanServerVariable(sv);
163 } else {
164 /* else, close connection */
165 CloseConnection(ctx, sockid, sv);
166 }
167 }
168
169 return sent;
170 }
171 /*----------------------------------------------------------------------------*/
172 static int
HandleReadEvent(struct thread_context * ctx,int sockid,struct server_vars * sv)173 HandleReadEvent(struct thread_context *ctx, int sockid, struct server_vars *sv)
174 {
175 struct mtcp_epoll_event ev;
176 char buf[HTTP_HEADER_LEN];
177 char url[URL_LEN];
178 char response[HTTP_HEADER_LEN];
179 int scode; // status code
180 time_t t_now;
181 char t_str[128];
182 char keepalive_str[128];
183 int rd;
184 int i;
185 int len;
186 int sent;
187
188 /* HTTP request handling */
189 rd = mtcp_read(ctx->mctx, sockid, buf, HTTP_HEADER_LEN);
190 if (rd <= 0) {
191 return rd;
192 }
193 memcpy(sv->request + sv->recv_len,
194 (char *)buf, MIN(rd, HTTP_HEADER_LEN - sv->recv_len));
195 sv->recv_len += rd;
196 //sv->request[rd] = '\0';
197 //fprintf(stderr, "HTTP Request: \n%s", request);
198 sv->request_len = find_http_header(sv->request, sv->recv_len);
199 if (sv->request_len <= 0) {
200 TRACE_ERROR("Socket %d: Failed to parse HTTP request header.\n"
201 "read bytes: %d, recv_len: %d, "
202 "request_len: %d, strlen: %ld, request: \n%s\n",
203 sockid, rd, sv->recv_len,
204 sv->request_len, strlen(sv->request), sv->request);
205 return rd;
206 }
207
208 http_get_url(sv->request, sv->request_len, url, URL_LEN);
209 TRACE_APP("Socket %d URL: %s\n", sockid, url);
210 sprintf(sv->fname, "%s%s", www_main, url);
211 TRACE_APP("Socket %d File name: %s\n", sockid, sv->fname);
212
213
214 sv->keep_alive = FALSE;
215 if (http_header_str_val(sv->request, CONN_HDR_FLD,
216 sizeof(CONN_HDR_FLD)-1, keepalive_str, sizeof(keepalive_str))) {
217 sv->keep_alive = !strcasecmp(keepalive_str, KEEP_ALIVE_STR);
218 }
219
220 /* Find file in cache */
221 scode = 404;
222 for (i = 0; i < nfiles; i++) {
223 if (strcmp(sv->fname, fcache[i].fullname) == 0) {
224 sv->fsize = fcache[i].size;
225 sv->fidx = i;
226 scode = 200;
227 break;
228 }
229 }
230 TRACE_APP("Socket %d File size: %ld (%ldMB)\n",
231 sockid, sv->fsize, sv->fsize / 1024 / 1024);
232
233 /* Response header handling */
234 time(&t_now);
235 strftime(t_str, 128, "%a, %d %b %Y %X GMT", gmtime(&t_now));
236 if (sv->keep_alive)
237 sprintf(keepalive_str, "Keep-Alive");
238 else
239 sprintf(keepalive_str, "Close");
240
241 sprintf(response, "HTTP/1.1 %d %s\r\n"
242 "Date: %s\r\n"
243 "Server: Webserver on Middlebox TCP (Ubuntu)\r\n"
244 "Content-Length: %ld\r\n"
245 "Connection: %s\r\n\r\n",
246 scode, StatusCodeToString(scode), t_str, sv->fsize, keepalive_str);
247 len = strlen(response);
248 TRACE_APP("Socket %d HTTP Response: \n%s", sockid, response);
249 sent = mtcp_write(ctx->mctx, sockid, response, len);
250 if (sent < len) {
251 TRACE_ERROR("Socket %d: Sending HTTP response failed. "
252 "try: %d, sent: %d\n", sockid, len, sent);
253 CloseConnection(ctx, sockid, sv);
254 }
255 TRACE_APP("Socket %d Sent response header: try: %d, sent: %d\n",
256 sockid, len, sent);
257 assert(sent == len);
258 sv->rspheader_sent = TRUE;
259
260 ev.events = MOS_EPOLLIN | MOS_EPOLLOUT;
261 ev.data.sock = sockid;
262 mtcp_epoll_ctl(ctx->mctx, ctx->ep, MOS_EPOLL_CTL_MOD, sockid, &ev);
263
264 SendUntilAvailable(ctx, sockid, sv);
265
266 return rd;
267 }
268 /*----------------------------------------------------------------------------*/
269 static int
AcceptConnection(struct thread_context * ctx,int listener)270 AcceptConnection(struct thread_context *ctx, int listener)
271 {
272 mctx_t mctx = ctx->mctx;
273 struct server_vars *sv;
274 struct mtcp_epoll_event ev;
275 int c;
276
277 c = mtcp_accept(mctx, listener, NULL, NULL);
278
279 if (c >= 0) {
280 if (c >= MAX_FLOW_NUM) {
281 TRACE_ERROR("Invalid socket id %d.\n", c);
282 return -1;
283 }
284
285 sv = &ctx->svars[c];
286 CleanServerVariable(sv);
287 TRACE_APP("New connection %d accepted.\n", c);
288 ev.events = MOS_EPOLLIN;
289 ev.data.sock = c;
290 mtcp_setsock_nonblock(ctx->mctx, c);
291 mtcp_epoll_ctl(mctx, ctx->ep, MOS_EPOLL_CTL_ADD, c, &ev);
292 TRACE_APP("Socket %d registered.\n", c);
293
294 } else {
295 if (errno != EAGAIN) {
296 TRACE_ERROR("mtcp_accept() error %s\n",
297 strerror(errno));
298 }
299 }
300
301 return c;
302 }
303 /*----------------------------------------------------------------------------*/
304 static int
CreateListeningSocket(struct thread_context * ctx)305 CreateListeningSocket(struct thread_context *ctx)
306 {
307 int listener;
308 struct mtcp_epoll_event ev;
309 struct sockaddr_in saddr;
310 int ret;
311
312 /* create socket and set it as nonblocking */
313 listener = mtcp_socket(ctx->mctx, AF_INET, SOCK_STREAM, 0);
314 if (listener < 0) {
315 TRACE_ERROR("Failed to create listening socket!\n");
316 return -1;
317 }
318 ret = mtcp_setsock_nonblock(ctx->mctx, listener);
319 if (ret < 0) {
320 TRACE_ERROR("Failed to set socket in nonblocking mode.\n");
321 return -1;
322 }
323
324 /* bind to port 80 */
325 saddr.sin_family = AF_INET;
326 saddr.sin_addr.s_addr = INADDR_ANY;
327 saddr.sin_port = htons(80);
328 ret = mtcp_bind(ctx->mctx, listener,
329 (struct sockaddr *)&saddr, sizeof(struct sockaddr_in));
330 if (ret < 0) {
331 TRACE_ERROR("Failed to bind to the listening socket!\n");
332 return -1;
333 }
334
335 /* listen (backlog: 4K) */
336 ret = mtcp_listen(ctx->mctx, listener, 4096);
337 if (ret < 0) {
338 TRACE_ERROR("mtcp_listen() failed!\n");
339 return -1;
340 }
341
342 /* wait for incoming accept events */
343 ev.events = MOS_EPOLLIN;
344 ev.data.sock = listener;
345 mtcp_epoll_ctl(ctx->mctx, ctx->ep, MOS_EPOLL_CTL_ADD, listener, &ev);
346
347 return listener;
348 }
349 /*----------------------------------------------------------------------------*/
350 static void
GlobInitServer()351 GlobInitServer()
352 {
353 DIR *dir;
354 struct dirent *ent;
355 int fd;
356 int ret;
357 uint64_t total_read;
358
359 num_cores = GetNumCPUs();
360 core_limit = num_cores;
361
362 if (LoadConfig(CONFIG_FILE, g_conf, NUM_CONF_VAR))
363 exit(-1);
364
365 core_limit = atoi(g_conf[0].value);
366 www_main = g_conf[1].value;
367
368 /* open the directory to serve */
369 dir = opendir(www_main);
370 if (!dir) {
371 TRACE_ERROR("Failed to open %s.\n", www_main);
372 perror("opendir");
373 exit(-1);
374 }
375
376 nfiles = 0;
377 while ((ent = readdir(dir)) != NULL) {
378 if (strcmp(ent->d_name, ".") == 0)
379 continue;
380 else if (strcmp(ent->d_name, "..") == 0)
381 continue;
382
383 strcpy(fcache[nfiles].name, ent->d_name);
384 sprintf(fcache[nfiles].fullname, "%s/%s", www_main, ent->d_name);
385 fd = open(fcache[nfiles].fullname, O_RDONLY);
386 if (fd < 0) {
387 perror("open");
388 continue;
389 } else {
390 fcache[nfiles].size = lseek64(fd, 0, SEEK_END);
391 lseek64(fd, 0, SEEK_SET);
392 }
393
394 fcache[nfiles].file = (char *)malloc(fcache[nfiles].size);
395 if (!fcache[nfiles].file) {
396 TRACE_ERROR("Failed to allocate memory for file %s\n",
397 fcache[nfiles].name);
398 perror("malloc");
399 continue;
400 }
401
402 TRACE_INFO("Reading %s (%lu bytes)\n",
403 fcache[nfiles].name, fcache[nfiles].size);
404 total_read = 0;
405 while (1) {
406 ret = read(fd, fcache[nfiles].file + total_read,
407 fcache[nfiles].size - total_read);
408 if (ret < 0) {
409 break;
410 } else if (ret == 0) {
411 break;
412 }
413 total_read += ret;
414 }
415 if (total_read < fcache[nfiles].size) {
416 free(fcache[nfiles].file);
417 continue;
418 }
419 close(fd);
420 nfiles++;
421
422 if (nfiles >= MAX_FILES)
423 break;
424 }
425
426 finished = 0;
427
428 return;
429 }
430 /*----------------------------------------------------------------------------*/
431 static void
InitServer(mctx_t mctx,void ** app_ctx)432 InitServer(mctx_t mctx, void **app_ctx)
433 {
434 struct thread_context *ctx;
435
436 ctx = (struct thread_context *)calloc(1, sizeof(struct thread_context));
437 if (!ctx) {
438 TRACE_ERROR("Failed to create thread context!\n");
439 exit(-1);
440 }
441
442 ctx->mctx = mctx;
443
444 /* create epoll descriptor */
445 ctx->ep = mtcp_epoll_create(mctx, MAX_EVENTS);
446 if (ctx->ep < 0) {
447 TRACE_ERROR("Failed to create epoll descriptor!\n");
448 exit(-1);
449 }
450
451 /* allocate memory for server variables */
452 ctx->svars = (struct server_vars *)
453 calloc(MAX_FLOW_NUM, sizeof(struct server_vars));
454 if (!ctx->svars) {
455 TRACE_ERROR("Failed to create server_vars struct!\n");
456 exit(-1);
457 }
458
459 ctx->listener = CreateListeningSocket(ctx);
460 if (ctx->listener < 0) {
461 TRACE_ERROR("Failed to create listening socket.\n");
462 exit(-1);
463 }
464
465 *app_ctx = (void *)ctx;
466
467 return;
468 }
469 /*----------------------------------------------------------------------------*/
470 static void
RunServer(mctx_t mctx,void ** app_ctx)471 RunServer(mctx_t mctx, void **app_ctx)
472 {
473 struct thread_context *ctx = (*app_ctx);
474 int nevents;
475 int i, ret;
476 int do_accept;
477 struct mtcp_epoll_event *events;
478
479 assert(ctx);
480 int ep = ctx->ep;
481
482 events = (struct mtcp_epoll_event *)
483 calloc(MAX_EVENTS, sizeof(struct mtcp_epoll_event));
484 if (!events) {
485 TRACE_ERROR("Failed to create event struct!\n");
486 exit(-1);
487 }
488
489 while (1) {
490 nevents = mtcp_epoll_wait(mctx, ep, events, MAX_EVENTS, -1);
491 if (nevents < 0) {
492 if (errno != EINTR)
493 perror("mtcp_epoll_wait");
494 break;
495 }
496
497 do_accept = FALSE;
498 for (i = 0; i < nevents; i++) {
499
500 if (events[i].data.sock == ctx->listener) {
501 /* if the event is for the listener, accept connection */
502 do_accept = TRUE;
503
504 } else if (events[i].events & MOS_EPOLLERR) {
505 int err;
506 socklen_t len = sizeof(err);
507
508 /* error on the connection */
509 TRACE_APP("[CPU %d] Error on socket %d\n",
510 core, events[i].data.sock);
511 if (mtcp_getsockopt(mctx, events[i].data.sock,
512 SOL_SOCKET, SO_ERROR, (void *)&err, &len) == 0) {
513 if (err != ETIMEDOUT) {
514 fprintf(stderr, "Error on socket %d: %s\n",
515 events[i].data.sock, strerror(err));
516 }
517 } else {
518 fprintf(stderr, "mtcp_getsockopt: %s (for sockid: %d)\n",
519 strerror(errno), events[i].data.sock);
520 exit(-1);
521 }
522 CloseConnection(ctx, events[i].data.sock,
523 &ctx->svars[events[i].data.sock]);
524
525 } else if (events[i].events & MOS_EPOLLIN) {
526 ret = HandleReadEvent(ctx, events[i].data.sock,
527 &ctx->svars[events[i].data.sock]);
528
529 if (ret == 0) {
530 /* connection closed by remote host */
531 CloseConnection(ctx, events[i].data.sock,
532 &ctx->svars[events[i].data.sock]);
533 } else if (ret < 0) {
534 /* if not EAGAIN, it's an error */
535 if (errno != EAGAIN) {
536 CloseConnection(ctx, events[i].data.sock,
537 &ctx->svars[events[i].data.sock]);
538 }
539 }
540
541 } else if (events[i].events & MOS_EPOLLOUT) {
542 struct server_vars *sv = &ctx->svars[events[i].data.sock];
543 if (sv->rspheader_sent) {
544 SendUntilAvailable(ctx, events[i].data.sock, sv);
545 } else {
546 TRACE_APP("Socket %d: Response header not sent yet.\n",
547 events[i].data.sock);
548 }
549
550 } else {
551 assert(0);
552 }
553 }
554
555 /* if do_accept flag is set, accept connections */
556 if (do_accept) {
557 while (1) {
558 ret = AcceptConnection(ctx, ctx->listener);
559 if (ret < 0)
560 break;
561 }
562 }
563
564 }
565
566 return;
567 }
568 /*----------------------------------------------------------------------------*/
569 void
RunApplication(mctx_t mctx)570 RunApplication(mctx_t mctx)
571 {
572 void *app_ctx;
573
574 app_ctx = (void *)calloc(1, sizeof(void *));
575 if (!app_ctx) {
576 TRACE_ERROR("calloc failure\n");
577 return;
578 }
579
580 TRACE_INFO("run application on core %d\n", mctx->cpu);
581 InitServer(mctx, &(app_ctx));
582 RunServer(mctx, &(app_ctx));
583 }
584 /*----------------------------------------------------------------------------*/
585 void *
RunMTCP(void * arg)586 RunMTCP(void *arg)
587 {
588 int core = *(int *)arg;
589 mctx_t mctx;
590
591 mtcp_core_affinitize(core);
592
593 /* mTCP Initialization */
594 mctx = mtcp_create_context(core);
595 if (!mctx) {
596 pthread_exit(NULL);
597 TRACE_ERROR("Failed to craete mtcp context.\n");
598 return NULL;
599 }
600
601 /* Run application here */
602 RunApplication(mctx);
603
604 /* mTCP Tear Down */
605 mtcp_destroy_context(mctx);
606
607 return NULL;
608 }
609 /*----------------------------------------------------------------------------*/
610 int
main(int argc,char ** argv)611 main(int argc, char **argv)
612 {
613 int ret/*, i*/;
614 int current_core = -1;
615 char *fname = "config/mos.conf";
616
617 int opt;
618 while ((opt = getopt(argc, argv, "f:c:")) != -1) {
619 switch (opt) {
620 case 'f':
621 fname = optarg;
622 break;
623 case 'c':
624 current_core = atoi(optarg);
625 if (current_core >= MAX_CPUS) {
626 TRACE_ERROR("Invalid core id!\n");
627 exit(EXIT_FAILURE);
628 }
629 break;
630 default:
631 printf("Usage: %s [-f config_file]\n", argv[0]);
632 return 0;
633 }
634
635 }
636
637 core_limit = sysconf(_SC_NPROCESSORS_ONLN);
638
639 ret = mtcp_init(fname);
640 if (ret) {
641 TRACE_ERROR("Failed to initialize mtcp.\n");
642 exit(EXIT_FAILURE);
643 }
644
645 if (current_core < 0) {
646 TRACE_ERROR("Invalid core id.\n");
647 exit(EXIT_FAILURE);
648 }
649
650 mtcp_getconf(&g_mcfg);
651
652 core_limit = g_mcfg.num_cores;
653
654 GlobInitServer();
655
656 RunMTCP(¤t_core);
657
658 mtcp_destroy();
659 return 0;
660 }
661 /*----------------------------------------------------------------------------*/
662