1 #define _LARGEFILE64_SOURCE
2 #include <stdio.h>
3 #include <stdlib.h>
4 #include <unistd.h>
5 #include <stdint.h>
6 #include <sys/types.h>
7 #include <sys/stat.h>
8 #include <sys/socket.h>
9 #include <netinet/in.h>
10 #include <arpa/inet.h>
11 #include <fcntl.h>
12 #include <string.h>
13 #include <time.h>
14 #include <sys/time.h>
15 #include <asm/byteorder.h>
16 #include <assert.h>
17 #include <signal.h>
18 #include <sys/queue.h>
19 #include <errno.h>
20
21 #include <mos_api.h>
22 #include "cpu.h"
23
/* Maximum CPU cores; this is the size of the per-core g_mctx[] and g_sockq[] arrays */
#define MAX_CORES 16
/* Number of TCP flags to monitor (appears to be unused in this file) */
#define NUM_FLAG 6
/* Default path to mOS configuration file; main() lets -f override it */
#define MOS_CONFIG_FILE "config/mos.conf"
30 /*----------------------------------------------------------------------------*/
31 /* Global variables */
32
/* Bookkeeping record kept for each monitored TCP connection */
struct connection {
	int sock;                     /* socket ID */
	struct sockaddr_in addrs[2];  /* Addresses of the client and the server,
	                               * indexed by MOS_SIDE_CLI / MOS_SIDE_SVR */
	int cli_state;                /* TCP state of the client */
	int svr_state;                /* TCP state of the server */
	TAILQ_ENTRY(connection) link; /* link to next connection on this core's queue */
};
40
int g_max_cores;                             /* Number of CPU cores to be used */
mctx_t g_mctx[MAX_CORES];                    /* mOS context, one slot per core */
TAILQ_HEAD(, connection) g_sockq[MAX_CORES]; /* connection queue, one per core */
44 /*----------------------------------------------------------------------------*/
45 /* Signal handler */
46 static void
sigint_handler(int signum)47 sigint_handler(int signum)
48 {
49 int i;
50
51 /* Terminate the program if any interrupt happens */
52 for (i = 0; i < g_max_cores; i++)
53 mtcp_destroy_context(g_mctx[i]);
54 }
55 /*----------------------------------------------------------------------------*/
56 /* Find connection structure by socket ID */
57 static inline struct connection *
find_connection(int cpu,int sock)58 find_connection(int cpu, int sock)
59 {
60 struct connection *c;
61
62 TAILQ_FOREACH(c, &g_sockq[cpu], link)
63 if (c->sock == sock)
64 return c;
65
66 return NULL;
67 }
68 /*----------------------------------------------------------------------------*/
69 /* Create connection structure for new connection */
70 static void
cb_creation(mctx_t mctx,int sock,int side,uint64_t events,filter_arg_t * arg)71 cb_creation(mctx_t mctx, int sock, int side, uint64_t events, filter_arg_t *arg)
72 {
73 socklen_t addrslen = sizeof(struct sockaddr) * 2;
74 struct connection *c;
75
76 c = calloc(sizeof(struct connection), 1);
77 if (!c)
78 return;
79
80 /* Fill values of the connection structure */
81 c->sock = sock;
82 if (mtcp_getpeername(mctx, c->sock, (void *)c->addrs, &addrslen,
83 MOS_SIDE_BOTH) < 0) {
84 perror("mtcp_getpeername");
85 /* it's better to stop here and do debugging */
86 exit(EXIT_FAILURE);
87 }
88
89 /* Insert the structure to the queue */
90 TAILQ_INSERT_TAIL(&g_sockq[mctx->cpu], c, link);
91 }
92 /*----------------------------------------------------------------------------*/
93 /* Destroy connection structure */
94 static void
cb_destroy(mctx_t mctx,int sock,int side,uint64_t events,filter_arg_t * arg)95 cb_destroy(mctx_t mctx, int sock, int side, uint64_t events, filter_arg_t *arg)
96 {
97 struct connection *c;
98
99 if (!(c = find_connection(mctx->cpu, sock)))
100 return;
101
102 TAILQ_REMOVE(&g_sockq[mctx->cpu], c, link);
103 free(c);
104 }
105 /*----------------------------------------------------------------------------*/
106 /* Update connection's TCP state of each side */
107 static void
cb_st_chg(mctx_t mctx,int sock,int side,uint64_t events,filter_arg_t * arg)108 cb_st_chg(mctx_t mctx, int sock, int side, uint64_t events, filter_arg_t *arg)
109 {
110 struct connection *c;
111 socklen_t intlen = sizeof(int);
112
113 if (!(c = find_connection(mctx->cpu, sock)))
114 return;
115
116 if (side == MOS_SIDE_CLI) {
117 if (mtcp_getsockopt(mctx, c->sock, SOL_MONSOCKET, MOS_TCP_STATE_CLI,
118 (void *)&c->cli_state, &intlen) < 0) {
119 perror("mtcp_getsockopt");
120 exit(-1); /* it's better to stop here and do debugging */
121 }
122 } else {
123 if (mtcp_getsockopt(mctx, c->sock, SOL_MONSOCKET, MOS_TCP_STATE_SVR,
124 (void *)&c->svr_state, &intlen) < 0) {
125 perror("mtcp_getsockopt");
126 exit(-1); /* it's better to stop here and do debugging */
127 }
128 }
129 }
130 /*----------------------------------------------------------------------------*/
131 /* Convert state value (integer) to string (char array) */
132 const char *
strstate(int state)133 strstate(int state)
134 {
135 switch (state) {
136 #define CASE(s) case TCP_##s: return #s
137 CASE(CLOSED);
138 CASE(LISTEN);
139 CASE(SYN_SENT);
140 CASE(SYN_RCVD);
141 CASE(ESTABLISHED);
142 CASE(FIN_WAIT_1);
143 CASE(FIN_WAIT_2);
144 CASE(CLOSE_WAIT);
145 CASE(CLOSING);
146 CASE(LAST_ACK);
147 CASE(TIME_WAIT);
148 default:
149 return "-";
150 }
151 }
152 /*----------------------------------------------------------------------------*/
153 /* Print ongoing connection information based on connection structure */
154 static void
cb_printstat(mctx_t mctx,int sock,int side,uint64_t events,filter_arg_t * arg)155 cb_printstat(mctx_t mctx, int sock, int side,
156 uint64_t events, filter_arg_t *arg)
157 {
158 int i;
159 struct connection *c;
160 struct timeval tv_1sec = { /* 1 second */
161 .tv_sec = 1,
162 .tv_usec = 0
163 };
164
165 printf("Proto CPU "
166 "Client Address Client State "
167 "Server Address Server State\n");
168 for (i = 0; i < g_max_cores; i++)
169 TAILQ_FOREACH(c, &g_sockq[i], link) {
170 int space;
171
172 printf("%-5s %-3d ", "tcp", i);
173 space = printf("%s:", inet_ntoa(c->addrs[MOS_SIDE_CLI].sin_addr));
174 printf("%*d %-12s ",
175 space - 21,
176 ntohs(c->addrs[MOS_SIDE_CLI].sin_port),
177 strstate(c->cli_state));
178 space = printf("%s:", inet_ntoa(c->addrs[MOS_SIDE_SVR].sin_addr));
179 printf("%*d %-12s\n",
180 space - 21,
181 ntohs(c->addrs[MOS_SIDE_SVR].sin_port),
182 strstate(c->svr_state));
183 }
184
185 /* Set a timer for next printing */
186 if (mtcp_settimer(mctx, sock, &tv_1sec, cb_printstat)) {
187 fprintf(stderr, "Failed to register print timer\n");
188 exit(-1); /* no point in proceeding if the timer is broken */
189 }
190
191 return;
192 }
193 /*----------------------------------------------------------------------------*/
194 /* Register required callbacks */
195 static void
RegisterCallbacks(mctx_t mctx,int sock,event_t ev_new_syn)196 RegisterCallbacks(mctx_t mctx, int sock, event_t ev_new_syn)
197 {
198 struct timeval tv_1sec = { /* 1 second */
199 .tv_sec = 1,
200 .tv_usec = 0
201 };
202
203 /* Register callbacks */
204 if (mtcp_register_callback(mctx, sock, MOS_ON_CONN_START,
205 MOS_HK_SND, cb_creation)) {
206 fprintf(stderr, "Failed to register cb_creation()\n");
207 exit(-1); /* no point in proceeding if callback registration fails */
208 }
209 if (mtcp_register_callback(mctx, sock, MOS_ON_CONN_END,
210 MOS_HK_SND, cb_destroy)) {
211 fprintf(stderr, "Failed to register cb_destroy()\n");
212 exit(-1); /* no point in proceeding if callback registration fails */
213 }
214 if (mtcp_register_callback(mctx, sock, MOS_ON_TCP_STATE_CHANGE,
215 MOS_HK_SND, cb_st_chg)) {
216 fprintf(stderr, "Failed to register cb_st_chg()\n");
217 exit(-1); /* no point in proceeding if callback registration fails */
218 }
219 if (mtcp_register_callback(mctx, sock, MOS_ON_TCP_STATE_CHANGE,
220 MOS_HK_RCV, cb_st_chg)) {
221 fprintf(stderr, "Failed to register cb_st_chg()\n");
222 exit(-1); /* no point in proceeding if callback registration fails */
223 }
224
225 /* CPU 0 is in charge of printing stats */
226 if (mctx->cpu == 0 &&
227 mtcp_settimer(mctx, sock, &tv_1sec, cb_printstat)) {
228 fprintf(stderr, "Failed to register print timer\n");
229 exit(-1); /* no point in proceeding if the titmer is broken*/
230 }
231 }
232 /*----------------------------------------------------------------------------*/
233 /* Open monitoring socket and ready it for monitoring */
234 static void
InitMonitor(mctx_t mctx,event_t ev_new_syn)235 InitMonitor(mctx_t mctx, event_t ev_new_syn)
236 {
237 int sock;
238
239 /* Initialize internal memory structures */
240 TAILQ_INIT(&g_sockq[mctx->cpu]);
241
242 /* create socket and set it as nonblocking */
243 if ((sock = mtcp_socket(mctx, AF_INET,
244 MOS_SOCK_MONITOR_STREAM, 0)) < 0) {
245 fprintf(stderr, "Failed to create monitor listening socket!\n");
246 exit(-1); /* no point in proceeding if we don't have a listening socket */
247 }
248
249 /* Disable socket buffer */
250 int optval = 0;
251 if (mtcp_setsockopt(mctx, sock, SOL_MONSOCKET, MOS_CLIBUF,
252 &optval, sizeof(optval)) == -1) {
253 fprintf(stderr, "Could not disable CLIBUF!\n");
254 }
255 if (mtcp_setsockopt(mctx, sock, SOL_MONSOCKET, MOS_SVRBUF,
256 &optval, sizeof(optval)) == -1) {
257 fprintf(stderr, "Could not disable SVRBUF!\n");
258 }
259
260 RegisterCallbacks(mctx, sock, ev_new_syn);
261 }
262 /*----------------------------------------------------------------------------*/
263 int
main(int argc,char ** argv)264 main(int argc, char **argv)
265 {
266 int i, opt;
267 event_t ev_new_syn; /* New SYN UDE */
268 char *fname = MOS_CONFIG_FILE; /* path to the default mos config file */
269 struct mtcp_conf mcfg; /* mOS configuration */
270
271 /* get the total # of cpu cores */
272 g_max_cores = GetNumCPUs();
273
274 /* Parse command line arguments */
275 while ((opt = getopt(argc, argv, "c:f:")) != -1) {
276 switch (opt) {
277 case 'f':
278 fname = optarg;
279 break;
280 case 'c':
281 if (atoi(optarg) > g_max_cores) {
282 printf("Available number of CPU cores is %d\n", g_max_cores);
283 return -1;
284 }
285 g_max_cores = atoi(optarg);
286 break;
287 default:
288 printf("Usage: %s [-f mos_config_file] [-c #_of_cpu]\n", argv[0]);
289 return 0;
290 }
291 }
292
293 /* parse mos configuration file */
294 if (mtcp_init(fname)) {
295 fprintf(stderr, "Failed to initialize mtcp.\n");
296 exit(EXIT_FAILURE);
297 }
298
299 /* set the core limit */
300 mtcp_getconf(&mcfg);
301 mcfg.num_cores = g_max_cores;
302 mtcp_setconf(&mcfg);
303
304 /* Register signal handler */
305 mtcp_register_signal(SIGINT, sigint_handler);
306
307 for (i = 0; i < g_max_cores; i++) {
308 /* Run mOS for each CPU core */
309 if (!(g_mctx[i] = mtcp_create_context(i))) {
310 fprintf(stderr, "Failed to craete mtcp context.\n");
311 return -1;
312 }
313
314 /* init monitor */
315 InitMonitor(g_mctx[i], ev_new_syn);
316 }
317
318 /* wait until mOS finishes */
319 for (i = 0; i < g_max_cores; i++)
320 mtcp_app_join(g_mctx[i]);
321
322 mtcp_destroy();
323 return 0;
324 }
325 /*----------------------------------------------------------------------------*/
326