1 #define _LARGEFILE64_SOURCE
2 #include <stdio.h>
3 #include <stdlib.h>
4 #include <unistd.h>
5 #include <stdint.h>
6 #include <sys/types.h>
7 #include <sys/stat.h>
8 #include <sys/socket.h>
9 #include <netinet/in.h>
10 #include <arpa/inet.h>
11 #include <fcntl.h>
12 #include <dirent.h>
13 #include <string.h>
14 #include <time.h>
15 #include <pthread.h>
16 #include <signal.h>
17 
18 #include <mos_api.h>
19 #include "cpu.h"
20 #include "http_parsing.h"
21 #include "debug.h"
22 #include "applib.h"
23 
24 #define CONFIG_FILE       "config/epserver.conf"
25 static struct conf_var g_conf[] = {
26 	{ "core_limit", {0} },
27 	{ "www_main",   {0} },
28 };
29 #define NUM_CONF_VAR (sizeof(g_conf) / sizeof(struct conf_var))
30 
31 #define HTTP_HEADER_LEN 1024
32 #define URL_LEN 128
33 
34 /* shinae 10.27.2014
35  * SNDBUF_SIZE should be removed
36  */
37 #define SNDBUF_SIZE (8*1024)
38 
39 #define MAX_FILES 100
40 int cores[MAX_CPUS] = {0};
41 /*----------------------------------------------------------------------------*/
42 struct mtcp_conf g_mcfg;
43 /*----------------------------------------------------------------------------*/
44 struct file_cache
45 {
46 	char name[128];
47 	char fullname[256];
48 	uint64_t size;
49 	char *file;
50 };
51 /*----------------------------------------------------------------------------*/
52 struct server_vars
53 {
54 	char request[HTTP_HEADER_LEN];
55 	int recv_len;
56 	int request_len;
57 	long int total_read, total_sent;
58 	uint8_t done;
59 	uint8_t rspheader_sent;
60 	uint8_t keep_alive;
61 
62 	int fidx;						// file cache index
63 	char fname[128];				// file name
64 	long int fsize;					// file size
65 };
66 /*----------------------------------------------------------------------------*/
67 struct thread_context
68 {
69 	mctx_t mctx;
70 	int listener;
71 	int ep;
72 	struct server_vars *svars;
73 };
74 /*----------------------------------------------------------------------------*/
75 static int num_cores;
76 static int core_limit;
77 /*----------------------------------------------------------------------------*/
78 char *www_main;
79 static struct file_cache fcache[MAX_FILES];
80 static int nfiles;
81 /*----------------------------------------------------------------------------*/
82 static int finished;
83 /*----------------------------------------------------------------------------*/
84 static char *
StatusCodeToString(int scode)85 StatusCodeToString(int scode)
86 {
87 	switch (scode) {
88 		case 200:
89 			return "OK";
90 			break;
91 
92 		case 404:
93 			return "Not Found";
94 			break;
95 	}
96 
97 	return NULL;
98 }
99 /*----------------------------------------------------------------------------*/
100 static void
CleanServerVariable(struct server_vars * sv)101 CleanServerVariable(struct server_vars *sv)
102 {
103 	sv->recv_len = 0;
104 	sv->request_len = 0;
105 	sv->total_read = 0;
106 	sv->total_sent = 0;
107 	sv->done = 0;
108 	sv->rspheader_sent = 0;
109 	sv->keep_alive = 0;
110 }
111 /*----------------------------------------------------------------------------*/
112 static void
CloseConnection(struct thread_context * ctx,int sockid,struct server_vars * sv)113 CloseConnection(struct thread_context *ctx, int sockid, struct server_vars *sv)
114 {
115 	mtcp_epoll_ctl(ctx->mctx, ctx->ep, MOS_EPOLL_CTL_DEL, sockid, NULL);
116 	mtcp_close(ctx->mctx, sockid);
117 }
118 /*----------------------------------------------------------------------------*/
119 static int
SendUntilAvailable(struct thread_context * ctx,int sockid,struct server_vars * sv)120 SendUntilAvailable(struct thread_context *ctx, int sockid, struct server_vars *sv)
121 {
122 	int ret;
123 	int sent;
124 	int len;
125 
126 	if (sv->done || !sv->rspheader_sent) {
127 		return 0;
128 	}
129 
130 	sent = 0;
131 	ret = 1;
132 	while (ret > 0) {
133 		len = MIN(SNDBUF_SIZE, sv->fsize - sv->total_sent);
134 		if (len <= 0) {
135 			break;
136 		}
137 		ret = mtcp_write(ctx->mctx, sockid,
138 				fcache[sv->fidx].file + sv->total_sent, len);
139 		if (ret < 0) {
140 			if (errno != EAGAIN) {
141 				TRACE_ERROR("Socket %d: Sending HTTP response body failed. "
142 						"try: %d, sent: %d\n", sockid, len, ret);
143 			}
144 			break;
145 		}
146 		TRACE_APP("Socket %d: mtcp_write try: %d, ret: %d\n", sockid, len, ret);
147 		sent += ret;
148 		sv->total_sent += ret;
149 	}
150 
151 	if (sv->total_sent >= fcache[sv->fidx].size) {
152 		struct mtcp_epoll_event ev;
153 		sv->done = TRUE;
154 		finished++;
155 
156 		if (sv->keep_alive) {
157 			/* if keep-alive connection, wait for the incoming request */
158 			ev.events = MOS_EPOLLIN;
159 			ev.data.sock = sockid;
160 			mtcp_epoll_ctl(ctx->mctx, ctx->ep, MOS_EPOLL_CTL_MOD, sockid, &ev);
161 
162 			CleanServerVariable(sv);
163 		} else {
164 			/* else, close connection */
165 			CloseConnection(ctx, sockid, sv);
166 		}
167 	}
168 
169 	return sent;
170 }
171 /*----------------------------------------------------------------------------*/
172 static int
HandleReadEvent(struct thread_context * ctx,int sockid,struct server_vars * sv)173 HandleReadEvent(struct thread_context *ctx, int sockid, struct server_vars *sv)
174 {
175 	struct mtcp_epoll_event ev;
176 	char buf[HTTP_HEADER_LEN];
177 	char url[URL_LEN];
178 	char response[HTTP_HEADER_LEN];
179 	int scode;						// status code
180 	time_t t_now;
181 	char t_str[128];
182 	char keepalive_str[128];
183 	int rd;
184 	int i;
185 	int len;
186 	int sent;
187 
188 	/* HTTP request handling */
189 	rd = mtcp_read(ctx->mctx, sockid, buf, HTTP_HEADER_LEN);
190 	if (rd <= 0) {
191 		return rd;
192 	}
193 	memcpy(sv->request + sv->recv_len,
194 			(char *)buf, MIN(rd, HTTP_HEADER_LEN - sv->recv_len));
195 	sv->recv_len += rd;
196 	//sv->request[rd] = '\0';
197 	//fprintf(stderr, "HTTP Request: \n%s", request);
198 	sv->request_len = find_http_header(sv->request, sv->recv_len);
199 	if (sv->request_len <= 0) {
200 		TRACE_ERROR("Socket %d: Failed to parse HTTP request header.\n"
201 				"read bytes: %d, recv_len: %d, "
202 				"request_len: %d, strlen: %ld, request: \n%s\n",
203 				sockid, rd, sv->recv_len,
204 				sv->request_len, strlen(sv->request), sv->request);
205 		return rd;
206 	}
207 
208 	http_get_url(sv->request, sv->request_len, url, URL_LEN);
209 	TRACE_APP("Socket %d URL: %s\n", sockid, url);
210 	sprintf(sv->fname, "%s%s", www_main, url);
211 	TRACE_APP("Socket %d File name: %s\n", sockid, sv->fname);
212 
213 
214 	sv->keep_alive = FALSE;
215 	if (http_header_str_val(sv->request, CONN_HDR_FLD,
216 				sizeof(CONN_HDR_FLD)-1, keepalive_str, sizeof(keepalive_str))) {
217 		sv->keep_alive = !strcasecmp(keepalive_str, KEEP_ALIVE_STR);
218 	}
219 
220 	/* Find file in cache */
221 	scode = 404;
222 	for (i = 0; i < nfiles; i++) {
223 		if (strcmp(sv->fname, fcache[i].fullname) == 0) {
224 			sv->fsize = fcache[i].size;
225 			sv->fidx = i;
226 			scode = 200;
227 			break;
228 		}
229 	}
230 	TRACE_APP("Socket %d File size: %ld (%ldMB)\n",
231 			sockid, sv->fsize, sv->fsize / 1024 / 1024);
232 
233 	/* Response header handling */
234 	time(&t_now);
235 	strftime(t_str, 128, "%a, %d %b %Y %X GMT", gmtime(&t_now));
236 	if (sv->keep_alive)
237 		sprintf(keepalive_str, "Keep-Alive");
238 	else
239 		sprintf(keepalive_str, "Close");
240 
241 	sprintf(response, "HTTP/1.1 %d %s\r\n"
242 			"Date: %s\r\n"
243 			"Server: Webserver on Middlebox TCP (Ubuntu)\r\n"
244 			"Content-Length: %ld\r\n"
245 			"Connection: %s\r\n\r\n",
246 			scode, StatusCodeToString(scode), t_str, sv->fsize, keepalive_str);
247 	len = strlen(response);
248 	TRACE_APP("Socket %d HTTP Response: \n%s", sockid, response);
249 	sent = mtcp_write(ctx->mctx, sockid, response, len);
250 	if (sent < len) {
251 		TRACE_ERROR("Socket %d: Sending HTTP response failed. "
252 				"try: %d, sent: %d\n", sockid, len, sent);
253 		CloseConnection(ctx, sockid, sv);
254 	}
255 	TRACE_APP("Socket %d Sent response header: try: %d, sent: %d\n",
256 			sockid, len, sent);
257 	assert(sent == len);
258 	sv->rspheader_sent = TRUE;
259 
260 	ev.events = MOS_EPOLLIN | MOS_EPOLLOUT;
261 	ev.data.sock = sockid;
262 	mtcp_epoll_ctl(ctx->mctx, ctx->ep, MOS_EPOLL_CTL_MOD, sockid, &ev);
263 
264 	SendUntilAvailable(ctx, sockid, sv);
265 
266 	return rd;
267 }
268 /*----------------------------------------------------------------------------*/
269 static int
AcceptConnection(struct thread_context * ctx,int listener)270 AcceptConnection(struct thread_context *ctx, int listener)
271 {
272 	mctx_t mctx = ctx->mctx;
273 	struct server_vars *sv;
274 	struct mtcp_epoll_event ev;
275 	int c;
276 
277 	c = mtcp_accept(mctx, listener, NULL, NULL);
278 
279 	if (c >= 0) {
280 		if (c >= MAX_FLOW_NUM) {
281 			TRACE_ERROR("Invalid socket id %d.\n", c);
282 			return -1;
283 		}
284 
285 		sv = &ctx->svars[c];
286 		CleanServerVariable(sv);
287 		TRACE_APP("New connection %d accepted.\n", c);
288 		ev.events = MOS_EPOLLIN;
289 		ev.data.sock = c;
290 		mtcp_setsock_nonblock(ctx->mctx, c);
291 		mtcp_epoll_ctl(mctx, ctx->ep, MOS_EPOLL_CTL_ADD, c, &ev);
292 		TRACE_APP("Socket %d registered.\n", c);
293 
294 	} else {
295 		if (errno != EAGAIN) {
296 			TRACE_ERROR("mtcp_accept() error %s\n",
297 					strerror(errno));
298 		}
299 	}
300 
301 	return c;
302 }
303 /*----------------------------------------------------------------------------*/
304 static int
CreateListeningSocket(struct thread_context * ctx)305 CreateListeningSocket(struct thread_context *ctx)
306 {
307 	int listener;
308 	struct mtcp_epoll_event ev;
309 	struct sockaddr_in saddr;
310 	int ret;
311 
312 	/* create socket and set it as nonblocking */
313 	listener = mtcp_socket(ctx->mctx, AF_INET, SOCK_STREAM, 0);
314 	if (listener < 0) {
315 		TRACE_ERROR("Failed to create listening socket!\n");
316 		return -1;
317 	}
318 	ret = mtcp_setsock_nonblock(ctx->mctx, listener);
319 	if (ret < 0) {
320 		TRACE_ERROR("Failed to set socket in nonblocking mode.\n");
321 		return -1;
322 	}
323 
324 	/* bind to port 80 */
325 	saddr.sin_family = AF_INET;
326 	saddr.sin_addr.s_addr = INADDR_ANY;
327 	saddr.sin_port = htons(80);
328 	ret = mtcp_bind(ctx->mctx, listener,
329 			(struct sockaddr *)&saddr, sizeof(struct sockaddr_in));
330 	if (ret < 0) {
331 		TRACE_ERROR("Failed to bind to the listening socket!\n");
332 		return -1;
333 	}
334 
335 	/* listen (backlog: 4K) */
336 	ret = mtcp_listen(ctx->mctx, listener, 4096);
337 	if (ret < 0) {
338 		TRACE_ERROR("mtcp_listen() failed!\n");
339 		return -1;
340 	}
341 
342 	/* wait for incoming accept events */
343 	ev.events = MOS_EPOLLIN;
344 	ev.data.sock = listener;
345 	mtcp_epoll_ctl(ctx->mctx, ctx->ep, MOS_EPOLL_CTL_ADD, listener, &ev);
346 
347 	return listener;
348 }
349 /*----------------------------------------------------------------------------*/
350 static void
GlobInitServer()351 GlobInitServer()
352 {
353 	DIR *dir;
354 	struct dirent *ent;
355 	int fd;
356 	int ret;
357 	uint64_t total_read;
358 
359 	num_cores = GetNumCPUs();
360 	core_limit = num_cores;
361 
362 	if (LoadConfig(CONFIG_FILE, g_conf, NUM_CONF_VAR))
363 		exit(-1);
364 
365 	core_limit = atoi(g_conf[0].value);
366 	www_main = g_conf[1].value;
367 
368 	/* open the directory to serve */
369 	dir = opendir(www_main);
370 	if (!dir) {
371 		TRACE_ERROR("Failed to open %s.\n", www_main);
372 		perror("opendir");
373 		exit(-1);
374 	}
375 
376 	nfiles = 0;
377 	while ((ent = readdir(dir)) != NULL) {
378 		if (strcmp(ent->d_name, ".") == 0)
379 			continue;
380 		else if (strcmp(ent->d_name, "..") == 0)
381 			continue;
382 
383 		strcpy(fcache[nfiles].name, ent->d_name);
384 		sprintf(fcache[nfiles].fullname, "%s/%s", www_main, ent->d_name);
385 		fd = open(fcache[nfiles].fullname, O_RDONLY);
386 		if (fd < 0) {
387 			perror("open");
388 			continue;
389 		} else {
390 			fcache[nfiles].size = lseek64(fd, 0, SEEK_END);
391 			lseek64(fd, 0, SEEK_SET);
392 		}
393 
394 		fcache[nfiles].file = (char *)malloc(fcache[nfiles].size);
395 		if (!fcache[nfiles].file) {
396 			TRACE_ERROR("Failed to allocate memory for file %s\n",
397 					fcache[nfiles].name);
398 			perror("malloc");
399 			continue;
400 		}
401 
402 		TRACE_INFO("Reading %s (%lu bytes)\n",
403 				fcache[nfiles].name, fcache[nfiles].size);
404 		total_read = 0;
405 		while (1) {
406 			ret = read(fd, fcache[nfiles].file + total_read,
407 					fcache[nfiles].size - total_read);
408 			if (ret < 0) {
409 				break;
410 			} else if (ret == 0) {
411 				break;
412 			}
413 			total_read += ret;
414 		}
415 		if (total_read < fcache[nfiles].size) {
416 			free(fcache[nfiles].file);
417 			continue;
418 		}
419 		close(fd);
420 		nfiles++;
421 
422 		if (nfiles >= MAX_FILES)
423 			break;
424 	}
425 
426 	finished = 0;
427 
428 	return;
429 }
430 /*----------------------------------------------------------------------------*/
431 static void
InitServer(mctx_t mctx,void ** app_ctx)432 InitServer(mctx_t mctx, void **app_ctx)
433 {
434 	struct thread_context *ctx;
435 
436 	ctx = (struct thread_context *)calloc(1, sizeof(struct thread_context));
437 	if (!ctx) {
438 		TRACE_ERROR("Failed to create thread context!\n");
439 		exit(-1);
440 	}
441 
442 	ctx->mctx = mctx;
443 
444 	/* create epoll descriptor */
445 	ctx->ep = mtcp_epoll_create(mctx, MAX_EVENTS);
446 	if (ctx->ep < 0) {
447 		TRACE_ERROR("Failed to create epoll descriptor!\n");
448 		exit(-1);
449 	}
450 
451 	/* allocate memory for server variables */
452 	ctx->svars = (struct server_vars *)
453 			calloc(MAX_FLOW_NUM, sizeof(struct server_vars));
454 	if (!ctx->svars) {
455 		TRACE_ERROR("Failed to create server_vars struct!\n");
456 		exit(-1);
457 	}
458 
459 	ctx->listener = CreateListeningSocket(ctx);
460 	if (ctx->listener < 0) {
461 		TRACE_ERROR("Failed to create listening socket.\n");
462 		exit(-1);
463 	}
464 
465 	*app_ctx = (void *)ctx;
466 
467 	return;
468 }
469 /*----------------------------------------------------------------------------*/
470 static void
RunServer(mctx_t mctx,void ** app_ctx)471 RunServer(mctx_t mctx, void **app_ctx)
472 {
473 	struct thread_context *ctx = (*app_ctx);
474 	int nevents;
475 	int i, ret;
476 	int do_accept;
477 	struct mtcp_epoll_event *events;
478 
479 	assert(ctx);
480 	int ep = ctx->ep;
481 
482 	events = (struct mtcp_epoll_event *)
483 			calloc(MAX_EVENTS, sizeof(struct mtcp_epoll_event));
484 	if (!events) {
485 		TRACE_ERROR("Failed to create event struct!\n");
486 		exit(-1);
487 	}
488 
489 	while (1) {
490 		nevents = mtcp_epoll_wait(mctx, ep, events, MAX_EVENTS, -1);
491 		if (nevents < 0) {
492 			if (errno != EINTR)
493 				perror("mtcp_epoll_wait");
494 			break;
495 		}
496 
497 		do_accept = FALSE;
498 		for (i = 0; i < nevents; i++) {
499 
500 			if (events[i].data.sock == ctx->listener) {
501 				/* if the event is for the listener, accept connection */
502 				do_accept = TRUE;
503 
504 			} else if (events[i].events & MOS_EPOLLERR) {
505 				int err;
506 				socklen_t len = sizeof(err);
507 
508 				/* error on the connection */
509 				TRACE_APP("[CPU %d] Error on socket %d\n",
510 						core, events[i].data.sock);
511 				if (mtcp_getsockopt(mctx, events[i].data.sock,
512 						SOL_SOCKET, SO_ERROR, (void *)&err, &len) == 0) {
513 					if (err != ETIMEDOUT) {
514 						fprintf(stderr, "Error on socket %d: %s\n",
515 								events[i].data.sock, strerror(err));
516 					}
517 				} else {
518 					fprintf(stderr, "mtcp_getsockopt: %s (for sockid: %d)\n",
519 						strerror(errno), events[i].data.sock);
520 					exit(-1);
521 				}
522 				CloseConnection(ctx, events[i].data.sock,
523 						&ctx->svars[events[i].data.sock]);
524 
525 			} else if (events[i].events & MOS_EPOLLIN) {
526 				ret = HandleReadEvent(ctx, events[i].data.sock,
527 						&ctx->svars[events[i].data.sock]);
528 
529 				if (ret == 0) {
530 					/* connection closed by remote host */
531 					CloseConnection(ctx, events[i].data.sock,
532 							&ctx->svars[events[i].data.sock]);
533 				} else if (ret < 0) {
534 					/* if not EAGAIN, it's an error */
535 					if (errno != EAGAIN) {
536 						CloseConnection(ctx, events[i].data.sock,
537 								&ctx->svars[events[i].data.sock]);
538 					}
539 				}
540 
541 			} else if (events[i].events & MOS_EPOLLOUT) {
542 				struct server_vars *sv = &ctx->svars[events[i].data.sock];
543 				if (sv->rspheader_sent) {
544 					SendUntilAvailable(ctx, events[i].data.sock, sv);
545 				} else {
546 					TRACE_APP("Socket %d: Response header not sent yet.\n",
547 							events[i].data.sock);
548 				}
549 
550 			} else {
551 				assert(0);
552 			}
553 		}
554 
555 		/* if do_accept flag is set, accept connections */
556 		if (do_accept) {
557 			while (1) {
558 				ret = AcceptConnection(ctx, ctx->listener);
559 				if (ret < 0)
560 					break;
561 			}
562 		}
563 
564 	}
565 
566 	return;
567 }
568 /*----------------------------------------------------------------------------*/
569 void
RunApplication(mctx_t mctx)570 RunApplication(mctx_t mctx)
571 {
572 	void *app_ctx;
573 
574 	app_ctx = (void *)calloc(1, sizeof(void *));
575 	if (!app_ctx) {
576 		TRACE_ERROR("calloc failure\n");
577 		return;
578 	}
579 
580 	TRACE_INFO("run application on core %d\n", mctx->cpu);
581 	InitServer(mctx, &(app_ctx));
582 	RunServer(mctx, &(app_ctx));
583 }
584 /*----------------------------------------------------------------------------*/
585 void *
RunMTCP(void * arg)586 RunMTCP(void *arg)
587 {
588 	int core = *(int *)arg;
589 	mctx_t mctx;
590 
591 	mtcp_core_affinitize(core);
592 
593 	/* mTCP Initialization */
594 	mctx = mtcp_create_context(core);
595 	if (!mctx) {
596 		pthread_exit(NULL);
597 		TRACE_ERROR("Failed to craete mtcp context.\n");
598 		return NULL;
599 	}
600 
601 	/* Run application here */
602 	RunApplication(mctx);
603 
604 	/* mTCP Tear Down */
605 	mtcp_destroy_context(mctx);
606 
607 	return NULL;
608 }
609 /*----------------------------------------------------------------------------*/
610 int
main(int argc,char ** argv)611 main(int argc, char **argv)
612 {
613 	int ret/*, i*/;
614 	int current_core = -1;
615 	char *fname = "config/mos.conf";
616 
617 	int opt;
618 	while ((opt = getopt(argc, argv, "f:c:")) != -1) {
619 		switch (opt) {
620 			case 'f':
621 				fname = optarg;
622 				break;
623 			case 'c':
624 				current_core = atoi(optarg);
625 				if (current_core >= MAX_CPUS) {
626 					TRACE_ERROR("Invalid core id!\n");
627 					exit(EXIT_FAILURE);
628 				}
629 				break;
630 			default:
631 				printf("Usage: %s [-f config_file]\n", argv[0]);
632 				return 0;
633 		}
634 
635 	}
636 
637 	core_limit = sysconf(_SC_NPROCESSORS_ONLN);
638 
639 	ret = mtcp_init(fname);
640 	if (ret) {
641 		TRACE_ERROR("Failed to initialize mtcp.\n");
642 		exit(EXIT_FAILURE);
643 	}
644 
645 	if (current_core < 0) {
646 		TRACE_ERROR("Invalid core id.\n");
647 		exit(EXIT_FAILURE);
648 	}
649 
650 	mtcp_getconf(&g_mcfg);
651 
652 	core_limit = g_mcfg.num_cores;
653 
654 	GlobInitServer();
655 
656 	RunMTCP(&current_core);
657 
658 	mtcp_destroy();
659 	return 0;
660 }
661 /*----------------------------------------------------------------------------*/
662