1 #define _LARGEFILE64_SOURCE
2 #include <stdio.h>
3 #include <stdlib.h>
4 #include <unistd.h>
5 #include <stdint.h>
6 #include <sys/types.h>
7 #include <sys/stat.h>
8 #include <sys/socket.h>
9 #include <netinet/in.h>
10 #include <arpa/inet.h>
11 #include <fcntl.h>
12 #include <dirent.h>
13 #include <string.h>
14 #include <time.h>
15 #include <pthread.h>
16 #include <signal.h>
17 
18 #include <mos_api.h>
19 #include "cpu.h"
20 #include "http_parsing.h"
21 #include "debug.h"
22 #include "applib.h"
23 
24 #define CONFIG_FILE       "config/epserver.conf"
25 static struct conf_var g_conf[] = {
26 	{ "core_limit", {0} },
27 	{ "www_main",   {0} },
28 };
29 #define NUM_CONF_VAR (sizeof(g_conf) / sizeof(struct conf_var))
30 
31 #define HTTP_HEADER_LEN 1024
32 #define URL_LEN 128
33 
34 /* shinae 10.27.2014
35  * SNDBUF_SIZE should be removed
36  */
37 #define SNDBUF_SIZE (8*1024)
38 
39 #define MAX_FILES 100
40 
41 /*----------------------------------------------------------------------------*/
42 struct mtcp_conf g_mcfg;
43 static pthread_t mtcp_thread[MAX_CPUS];
44 /*----------------------------------------------------------------------------*/
45 struct file_cache
46 {
47 	char name[128];
48 	char fullname[256];
49 	uint64_t size;
50 	char *file;
51 };
52 /*----------------------------------------------------------------------------*/
53 struct server_vars
54 {
55 	char request[HTTP_HEADER_LEN];
56 	int recv_len;
57 	int request_len;
58 	long int total_read, total_sent;
59 	uint8_t done;
60 	uint8_t rspheader_sent;
61 	uint8_t keep_alive;
62 
63 	int fidx;						// file cache index
64 	char fname[128];				// file name
65 	long int fsize;					// file size
66 };
67 /*----------------------------------------------------------------------------*/
68 struct thread_context
69 {
70 	mctx_t mctx;
71 	int listener;
72 	int ep;
73 	struct server_vars *svars;
74 };
75 /*----------------------------------------------------------------------------*/
76 static int num_cores;
77 static int core_limit;
78 /*----------------------------------------------------------------------------*/
79 char *www_main;
80 static struct file_cache fcache[MAX_FILES];
81 static int nfiles;
82 /*----------------------------------------------------------------------------*/
83 static int finished;
84 /*----------------------------------------------------------------------------*/
85 static char *
StatusCodeToString(int scode)86 StatusCodeToString(int scode)
87 {
88 	switch (scode) {
89 		case 200:
90 			return "OK";
91 			break;
92 
93 		case 404:
94 			return "Not Found";
95 			break;
96 	}
97 
98 	return NULL;
99 }
100 /*----------------------------------------------------------------------------*/
101 static void
CleanServerVariable(struct server_vars * sv)102 CleanServerVariable(struct server_vars *sv)
103 {
104 	sv->recv_len = 0;
105 	sv->request_len = 0;
106 	sv->total_read = 0;
107 	sv->total_sent = 0;
108 	sv->done = 0;
109 	sv->rspheader_sent = 0;
110 	sv->keep_alive = 0;
111 }
112 /*----------------------------------------------------------------------------*/
113 static void
CloseConnection(struct thread_context * ctx,int sockid,struct server_vars * sv)114 CloseConnection(struct thread_context *ctx, int sockid, struct server_vars *sv)
115 {
116 	mtcp_epoll_ctl(ctx->mctx, ctx->ep, MOS_EPOLL_CTL_DEL, sockid, NULL);
117 	mtcp_close(ctx->mctx, sockid);
118 }
119 /*----------------------------------------------------------------------------*/
120 static int
SendUntilAvailable(struct thread_context * ctx,int sockid,struct server_vars * sv)121 SendUntilAvailable(struct thread_context *ctx, int sockid, struct server_vars *sv)
122 {
123 	int ret;
124 	int sent;
125 	int len;
126 
127 	if (sv->done || !sv->rspheader_sent) {
128 		return 0;
129 	}
130 
131 	sent = 0;
132 	ret = 1;
133 	while (ret > 0) {
134 		len = MIN(SNDBUF_SIZE, sv->fsize - sv->total_sent);
135 		if (len <= 0) {
136 			break;
137 		}
138 		ret = mtcp_write(ctx->mctx, sockid,
139 				fcache[sv->fidx].file + sv->total_sent, len);
140 		if (ret < 0) {
141 			if (errno != EAGAIN) {
142 				TRACE_ERROR("Socket %d: Sending HTTP response body failed. "
143 						"try: %d, sent: %d\n", sockid, len, ret);
144 			}
145 			break;
146 		}
147 		TRACE_APP("Socket %d: mtcp_write try: %d, ret: %d\n", sockid, len, ret);
148 		sent += ret;
149 		sv->total_sent += ret;
150 	}
151 
152 	if (sv->total_sent >= fcache[sv->fidx].size) {
153 		struct mtcp_epoll_event ev;
154 		sv->done = TRUE;
155 		finished++;
156 
157 		if (sv->keep_alive) {
158 			/* if keep-alive connection, wait for the incoming request */
159 			ev.events = MOS_EPOLLIN;
160 			ev.data.sock = sockid;
161 			mtcp_epoll_ctl(ctx->mctx, ctx->ep, MOS_EPOLL_CTL_MOD, sockid, &ev);
162 
163 			CleanServerVariable(sv);
164 		} else {
165 			/* else, close connection */
166 			CloseConnection(ctx, sockid, sv);
167 		}
168 	}
169 
170 	return sent;
171 }
172 /*----------------------------------------------------------------------------*/
173 static int
HandleReadEvent(struct thread_context * ctx,int sockid,struct server_vars * sv)174 HandleReadEvent(struct thread_context *ctx, int sockid, struct server_vars *sv)
175 {
176 	struct mtcp_epoll_event ev;
177 	char buf[HTTP_HEADER_LEN];
178 	char url[URL_LEN];
179 	char response[HTTP_HEADER_LEN];
180 	int scode;						// status code
181 	time_t t_now;
182 	char t_str[128];
183 	char keepalive_str[128];
184 	int rd;
185 	int i;
186 	int len;
187 	int sent;
188 
189 	/* HTTP request handling */
190 	rd = mtcp_read(ctx->mctx, sockid, buf, HTTP_HEADER_LEN);
191 	if (rd <= 0) {
192 		return rd;
193 	}
194 	memcpy(sv->request + sv->recv_len,
195 			(char *)buf, MIN(rd, HTTP_HEADER_LEN - sv->recv_len));
196 	sv->recv_len += rd;
197 	//sv->request[rd] = '\0';
198 	//fprintf(stderr, "HTTP Request: \n%s", request);
199 	sv->request_len = find_http_header(sv->request, sv->recv_len);
200 	if (sv->request_len <= 0) {
201 		TRACE_ERROR("Socket %d: Failed to parse HTTP request header.\n"
202 				"read bytes: %d, recv_len: %d, "
203 				"request_len: %d, strlen: %ld, request: \n%s\n",
204 				sockid, rd, sv->recv_len,
205 				sv->request_len, strlen(sv->request), sv->request);
206 		return rd;
207 	}
208 
209 	http_get_url(sv->request, sv->request_len, url, URL_LEN);
210 	TRACE_APP("Socket %d URL: %s\n", sockid, url);
211 	sprintf(sv->fname, "%s%s", www_main, url);
212 	TRACE_APP("Socket %d File name: %s\n", sockid, sv->fname);
213 
214 
215 	sv->keep_alive = FALSE;
216 	if (http_header_str_val(sv->request, CONN_HDR_FLD,
217 				sizeof(CONN_HDR_FLD)-1, keepalive_str, sizeof(keepalive_str))) {
218 		sv->keep_alive = !strcasecmp(keepalive_str, KEEP_ALIVE_STR);
219 	}
220 
221 	/* Find file in cache */
222 	scode = 404;
223 	for (i = 0; i < nfiles; i++) {
224 		if (strcmp(sv->fname, fcache[i].fullname) == 0) {
225 			sv->fsize = fcache[i].size;
226 			sv->fidx = i;
227 			scode = 200;
228 			break;
229 		}
230 	}
231 	TRACE_APP("Socket %d File size: %ld (%ldMB)\n",
232 			sockid, sv->fsize, sv->fsize / 1024 / 1024);
233 
234 	/* Response header handling */
235 	time(&t_now);
236 	strftime(t_str, 128, "%a, %d %b %Y %X GMT", gmtime(&t_now));
237 	if (sv->keep_alive)
238 		sprintf(keepalive_str, "Keep-Alive");
239 	else
240 		sprintf(keepalive_str, "Close");
241 
242 	sprintf(response, "HTTP/1.1 %d %s\r\n"
243 			"Date: %s\r\n"
244 			"Server: Webserver on Middlebox TCP (Ubuntu)\r\n"
245 			"Content-Length: %ld\r\n"
246 			"Connection: %s\r\n\r\n",
247 			scode, StatusCodeToString(scode), t_str, sv->fsize, keepalive_str);
248 	len = strlen(response);
249 	TRACE_APP("Socket %d HTTP Response: \n%s", sockid, response);
250 	sent = mtcp_write(ctx->mctx, sockid, response, len);
251 	if (sent < len) {
252 		TRACE_ERROR("Socket %d: Sending HTTP response failed. "
253 				"try: %d, sent: %d\n", sockid, len, sent);
254 		CloseConnection(ctx, sockid, sv);
255 	}
256 	TRACE_APP("Socket %d Sent response header: try: %d, sent: %d\n",
257 			sockid, len, sent);
258 	assert(sent == len);
259 	sv->rspheader_sent = TRUE;
260 
261 	ev.events = MOS_EPOLLIN | MOS_EPOLLOUT;
262 	ev.data.sock = sockid;
263 	mtcp_epoll_ctl(ctx->mctx, ctx->ep, MOS_EPOLL_CTL_MOD, sockid, &ev);
264 
265 	SendUntilAvailable(ctx, sockid, sv);
266 
267 	return rd;
268 }
269 /*----------------------------------------------------------------------------*/
270 static int
AcceptConnection(struct thread_context * ctx,int listener)271 AcceptConnection(struct thread_context *ctx, int listener)
272 {
273 	mctx_t mctx = ctx->mctx;
274 	struct server_vars *sv;
275 	struct mtcp_epoll_event ev;
276 	int c;
277 
278 	c = mtcp_accept(mctx, listener, NULL, NULL);
279 
280 	if (c >= 0) {
281 		if (c >= MAX_FLOW_NUM) {
282 			TRACE_ERROR("Invalid socket id %d.\n", c);
283 			return -1;
284 		}
285 
286 		sv = &ctx->svars[c];
287 		CleanServerVariable(sv);
288 		TRACE_APP("New connection %d accepted.\n", c);
289 		ev.events = MOS_EPOLLIN;
290 		ev.data.sock = c;
291 		mtcp_setsock_nonblock(ctx->mctx, c);
292 		mtcp_epoll_ctl(mctx, ctx->ep, MOS_EPOLL_CTL_ADD, c, &ev);
293 		TRACE_APP("Socket %d registered.\n", c);
294 
295 	} else {
296 		if (errno != EAGAIN) {
297 			TRACE_ERROR("mtcp_accept() error %s\n",
298 					strerror(errno));
299 		}
300 	}
301 
302 	return c;
303 }
304 /*----------------------------------------------------------------------------*/
305 static int
CreateListeningSocket(struct thread_context * ctx)306 CreateListeningSocket(struct thread_context *ctx)
307 {
308 	int listener;
309 	struct mtcp_epoll_event ev;
310 	struct sockaddr_in saddr;
311 	int ret;
312 
313 	/* create socket and set it as nonblocking */
314 	listener = mtcp_socket(ctx->mctx, AF_INET, SOCK_STREAM, 0);
315 	if (listener < 0) {
316 		TRACE_ERROR("Failed to create listening socket!\n");
317 		return -1;
318 	}
319 	ret = mtcp_setsock_nonblock(ctx->mctx, listener);
320 	if (ret < 0) {
321 		TRACE_ERROR("Failed to set socket in nonblocking mode.\n");
322 		return -1;
323 	}
324 
325 	/* bind to port 80 */
326 	saddr.sin_family = AF_INET;
327 	saddr.sin_addr.s_addr = INADDR_ANY;
328 	saddr.sin_port = htons(80);
329 	ret = mtcp_bind(ctx->mctx, listener,
330 			(struct sockaddr *)&saddr, sizeof(struct sockaddr_in));
331 	if (ret < 0) {
332 		TRACE_ERROR("Failed to bind to the listening socket!\n");
333 		return -1;
334 	}
335 
336 	/* listen (backlog: 4K) */
337 	ret = mtcp_listen(ctx->mctx, listener, 4096);
338 	if (ret < 0) {
339 		TRACE_ERROR("mtcp_listen() failed!\n");
340 		return -1;
341 	}
342 
343 	/* wait for incoming accept events */
344 	ev.events = MOS_EPOLLIN;
345 	ev.data.sock = listener;
346 	mtcp_epoll_ctl(ctx->mctx, ctx->ep, MOS_EPOLL_CTL_ADD, listener, &ev);
347 
348 	return listener;
349 }
350 /*----------------------------------------------------------------------------*/
351 static void
GlobInitServer()352 GlobInitServer()
353 {
354 	DIR *dir;
355 	struct dirent *ent;
356 	int fd;
357 	int ret;
358 	uint64_t total_read;
359 
360 	num_cores = GetNumCPUs();
361 	core_limit = num_cores;
362 
363 	if (LoadConfig(CONFIG_FILE, g_conf, NUM_CONF_VAR))
364 		exit(-1);
365 
366 	core_limit = atoi(g_conf[0].value);
367 	www_main = g_conf[1].value;
368 
369 	/* open the directory to serve */
370 	dir = opendir(www_main);
371 	if (!dir) {
372 		TRACE_ERROR("Failed to open %s.\n", www_main);
373 		perror("opendir");
374 		exit(-1);
375 	}
376 
377 	nfiles = 0;
378 	while ((ent = readdir(dir)) != NULL) {
379 		if (strcmp(ent->d_name, ".") == 0)
380 			continue;
381 		else if (strcmp(ent->d_name, "..") == 0)
382 			continue;
383 
384 		strcpy(fcache[nfiles].name, ent->d_name);
385 		sprintf(fcache[nfiles].fullname, "%s/%s", www_main, ent->d_name);
386 		fd = open(fcache[nfiles].fullname, O_RDONLY);
387 		if (fd < 0) {
388 			perror("open");
389 			continue;
390 		} else {
391 			fcache[nfiles].size = lseek64(fd, 0, SEEK_END);
392 			lseek64(fd, 0, SEEK_SET);
393 		}
394 
395 		fcache[nfiles].file = (char *)malloc(fcache[nfiles].size);
396 		if (!fcache[nfiles].file) {
397 			TRACE_ERROR("Failed to allocate memory for file %s\n",
398 					fcache[nfiles].name);
399 			perror("malloc");
400 			continue;
401 		}
402 
403 		TRACE_INFO("Reading %s (%lu bytes)\n",
404 				fcache[nfiles].name, fcache[nfiles].size);
405 		total_read = 0;
406 		while (1) {
407 			ret = read(fd, fcache[nfiles].file + total_read,
408 					fcache[nfiles].size - total_read);
409 			if (ret < 0) {
410 				break;
411 			} else if (ret == 0) {
412 				break;
413 			}
414 			total_read += ret;
415 		}
416 		if (total_read < fcache[nfiles].size) {
417 			free(fcache[nfiles].file);
418 			continue;
419 		}
420 		close(fd);
421 		nfiles++;
422 
423 		if (nfiles >= MAX_FILES)
424 			break;
425 	}
426 
427 	finished = 0;
428 
429 	return;
430 }
431 /*----------------------------------------------------------------------------*/
432 static void
InitServer(mctx_t mctx,void ** app_ctx)433 InitServer(mctx_t mctx, void **app_ctx)
434 {
435 	struct thread_context *ctx;
436 
437 	ctx = (struct thread_context *)calloc(1, sizeof(struct thread_context));
438 	if (!ctx) {
439 		TRACE_ERROR("Failed to create thread context!\n");
440 		exit(-1);
441 	}
442 
443 	ctx->mctx = mctx;
444 
445 	/* create epoll descriptor */
446 	ctx->ep = mtcp_epoll_create(mctx, MAX_EVENTS);
447 	if (ctx->ep < 0) {
448 		TRACE_ERROR("Failed to create epoll descriptor!\n");
449 		exit(-1);
450 	}
451 
452 	/* allocate memory for server variables */
453 	ctx->svars = (struct server_vars *)
454 			calloc(MAX_FLOW_NUM, sizeof(struct server_vars));
455 	if (!ctx->svars) {
456 		TRACE_ERROR("Failed to create server_vars struct!\n");
457 		exit(-1);
458 	}
459 
460 	ctx->listener = CreateListeningSocket(ctx);
461 	if (ctx->listener < 0) {
462 		TRACE_ERROR("Failed to create listening socket.\n");
463 		exit(-1);
464 	}
465 
466 	*app_ctx = (void *)ctx;
467 
468 	return;
469 }
470 /*----------------------------------------------------------------------------*/
471 static void
RunServer(mctx_t mctx,void ** app_ctx)472 RunServer(mctx_t mctx, void **app_ctx)
473 {
474 	struct thread_context *ctx = (*app_ctx);
475 	int nevents;
476 	int i, ret;
477 	int do_accept;
478 	struct mtcp_epoll_event *events;
479 
480 	assert(ctx);
481 	int ep = ctx->ep;
482 
483 	events = (struct mtcp_epoll_event *)
484 			calloc(MAX_EVENTS, sizeof(struct mtcp_epoll_event));
485 	if (!events) {
486 		TRACE_ERROR("Failed to create event struct!\n");
487 		exit(-1);
488 	}
489 
490 	while (1) {
491 		nevents = mtcp_epoll_wait(mctx, ep, events, MAX_EVENTS, -1);
492 		if (nevents < 0) {
493 			if (errno != EINTR)
494 				perror("mtcp_epoll_wait");
495 			break;
496 		}
497 
498 		do_accept = FALSE;
499 		for (i = 0; i < nevents; i++) {
500 
501 			if (events[i].data.sock == ctx->listener) {
502 				/* if the event is for the listener, accept connection */
503 				do_accept = TRUE;
504 
505 			} else if (events[i].events & MOS_EPOLLERR) {
506 				int err;
507 				socklen_t len = sizeof(err);
508 
509 				/* error on the connection */
510 				TRACE_APP("[CPU %d] Error on socket %d\n",
511 						core, events[i].data.sock);
512 				if (mtcp_getsockopt(mctx, events[i].data.sock,
513 						SOL_SOCKET, SO_ERROR, (void *)&err, &len) == 0) {
514 					if (err != ETIMEDOUT) {
515 						fprintf(stderr, "Error on socket %d: %s\n",
516 								events[i].data.sock, strerror(err));
517 					}
518 				} else {
519 					fprintf(stderr, "mtcp_getsockopt: %s (for sockid: %d)\n",
520 						strerror(errno), events[i].data.sock);
521 					exit(-1);
522 				}
523 				CloseConnection(ctx, events[i].data.sock,
524 						&ctx->svars[events[i].data.sock]);
525 
526 			} else if (events[i].events & MOS_EPOLLIN) {
527 				ret = HandleReadEvent(ctx, events[i].data.sock,
528 						&ctx->svars[events[i].data.sock]);
529 
530 				if (ret == 0) {
531 					/* connection closed by remote host */
532 					CloseConnection(ctx, events[i].data.sock,
533 							&ctx->svars[events[i].data.sock]);
534 				} else if (ret < 0) {
535 					/* if not EAGAIN, it's an error */
536 					if (errno != EAGAIN) {
537 						CloseConnection(ctx, events[i].data.sock,
538 								&ctx->svars[events[i].data.sock]);
539 					}
540 				}
541 
542 			} else if (events[i].events & MOS_EPOLLOUT) {
543 				struct server_vars *sv = &ctx->svars[events[i].data.sock];
544 				if (sv->rspheader_sent) {
545 					SendUntilAvailable(ctx, events[i].data.sock, sv);
546 				} else {
547 					TRACE_APP("Socket %d: Response header not sent yet.\n",
548 							events[i].data.sock);
549 				}
550 
551 			} else {
552 				assert(0);
553 			}
554 		}
555 
556 		/* if do_accept flag is set, accept connections */
557 		if (do_accept) {
558 			while (1) {
559 				ret = AcceptConnection(ctx, ctx->listener);
560 				if (ret < 0)
561 					break;
562 			}
563 		}
564 
565 	}
566 
567 	return;
568 }
569 /*----------------------------------------------------------------------------*/
570 void
RunApplication(mctx_t mctx)571 RunApplication(mctx_t mctx)
572 {
573 	void *app_ctx;
574 
575 	app_ctx = (void *)calloc(1, sizeof(void *));
576 	if (!app_ctx) {
577 		TRACE_ERROR("calloc failure\n");
578 		return;
579 	}
580 
581 	TRACE_INFO("run application on core %d\n", mctx->cpu);
582 	InitServer(mctx, &(app_ctx));
583 	RunServer(mctx, &(app_ctx));
584 }
585 /*----------------------------------------------------------------------------*/
586 void *
RunMTCP(void * arg)587 RunMTCP(void *arg)
588 {
589 	int core = *(int *)arg;
590 	mctx_t mctx;
591 
592 	mtcp_core_affinitize(core);
593 
594 	/* mTCP Initialization */
595 	mctx = mtcp_create_context(core);
596 	if (!mctx) {
597 		pthread_exit(NULL);
598 		TRACE_ERROR("Failed to craete mtcp context.\n");
599 		return NULL;
600 	}
601 
602 	/* Run application here */
603 	RunApplication(mctx);
604 
605 	/* mTCP Tear Down */
606 	mtcp_destroy_context(mctx);
607 	pthread_exit(NULL);
608 
609 	return NULL;
610 }
611 /*----------------------------------------------------------------------------*/
612 int
main(int argc,char ** argv)613 main(int argc, char **argv)
614 {
615 	int ret, i;
616 	int cores[MAX_CPUS];
617 	char *fname = "config/mos.conf";
618 
619 	int opt;
620 	while ((opt = getopt(argc, argv, "f:")) != -1) {
621 		switch (opt) {
622 			case 'f':
623 				fname = optarg;
624 				break;
625 			default:
626 				printf("Usage: %s [-f config_file]\n", argv[0]);
627 				return 0;
628 		}
629 
630 	}
631 
632 	core_limit = sysconf(_SC_NPROCESSORS_ONLN);
633 
634 	ret = mtcp_init(fname);
635 	if (ret) {
636 		TRACE_ERROR("Failed to initialize mtcp.\n");
637 		exit(EXIT_FAILURE);
638 	}
639 
640 	mtcp_getconf(&g_mcfg);
641 
642 	core_limit = g_mcfg.num_cores;
643 
644 	GlobInitServer();
645 
646 	for (i = 0; i < core_limit; i++) {
647 		cores[i] = i;
648 
649 		/* Run mtcp thread */
650 		if ((g_mcfg.cpu_mask & (1L << i)) &&
651 			pthread_create(&mtcp_thread[i], NULL, RunMTCP, (void *)&cores[i])) {
652 			perror("pthread_create");
653 			TRACE_ERROR("Failed to create msg_test thread.\n");
654 			exit(-1);
655 		}
656 	}
657 
658 	for (i = 0; i < core_limit; i++) {
659 		if (g_mcfg.cpu_mask & (1L << i))
660 			pthread_join(mtcp_thread[i], NULL);
661 		TRACE_INFO("Message test thread %d joined.\n", i);
662 	}
663 
664 	mtcp_destroy();
665 	return 0;
666 }
667 /*----------------------------------------------------------------------------*/
668