1 #define _LARGEFILE64_SOURCE 2 #include <stdio.h> 3 #include <stdlib.h> 4 #include <unistd.h> 5 #include <stdint.h> 6 #include <sys/types.h> 7 #include <sys/stat.h> 8 #include <sys/socket.h> 9 #include <netinet/in.h> 10 #include <arpa/inet.h> 11 #include <fcntl.h> 12 #include <string.h> 13 #include <time.h> 14 #include <sys/time.h> 15 #include <asm/byteorder.h> 16 #include <assert.h> 17 #include <signal.h> 18 #include <sys/queue.h> 19 #include <errno.h> 20 21 #include <mos_api.h> 22 #include "cpu.h" 23 24 /* Maximum CPU cores */ 25 #define MAX_CORES 16 26 /* Number of TCP flags to monitor */ 27 #define NUM_FLAG 6 28 /* Default path to mOS configuration file */ 29 #define MOS_CONFIG_FILE "config/mos.conf" 30 /*----------------------------------------------------------------------------*/ 31 /* Global variables */ 32 33 struct connection { 34 int sock; /* socket ID */ 35 struct sockaddr_in addrs[2]; /* Address of a client and a serer */ 36 int cli_state; /* TCP state of the client */ 37 int svr_state; /* TCP state of the server */ 38 TAILQ_ENTRY(connection) link; /* link to next context in this core */ 39 }; 40 41 int g_max_cores; /* Number of CPU cores to be used */ 42 mctx_t g_mctx[MAX_CORES]; /* mOS context */ 43 TAILQ_HEAD(, connection) g_sockq[MAX_CORES]; /* connection queue */ 44 /*----------------------------------------------------------------------------*/ 45 /* Signal handler */ 46 static void 47 sigint_handler(int signum) 48 { 49 int i; 50 51 /* Terminate the program if any interrupt happens */ 52 for (i = 0; i < g_max_cores; i++) 53 mtcp_destroy_context(g_mctx[i]); 54 } 55 /*----------------------------------------------------------------------------*/ 56 /* Find connection structure by socket ID */ 57 static inline struct connection * 58 find_connection(int cpu, int sock) 59 { 60 struct connection *c; 61 62 TAILQ_FOREACH(c, &g_sockq[cpu], link) 63 if (c->sock == sock) 64 return c; 65 66 return NULL; 67 } 68 /*----------------------------------------------------------------------------*/ 69 /* Create connection structure for new connection */ 70 static void 71 cb_creation(mctx_t mctx, int sock, int side, uint64_t events, filter_arg_t *arg) 72 { 73 socklen_t addrslen = sizeof(struct sockaddr) * 2; 74 struct connection *c; 75 76 c = calloc(sizeof(struct connection), 1); 77 if (!c) 78 return; 79 80 /* Fill values of the connection structure */ 81 c->sock = sock; 82 if (mtcp_getpeername(mctx, c->sock, (void *)c->addrs, &addrslen, 83 MOS_SIDE_BOTH) < 0) { 84 perror("mtcp_getpeername"); 85 /* it's better to stop here and do debugging */ 86 exit(EXIT_FAILURE); 87 } 88 89 /* Insert the structure to the queue */ 90 TAILQ_INSERT_TAIL(&g_sockq[mctx->cpu], c, link); 91 } 92 /*----------------------------------------------------------------------------*/ 93 /* Destroy connection structure */ 94 static void 95 cb_destroy(mctx_t mctx, int sock, int side, uint64_t events, filter_arg_t *arg) 96 { 97 struct connection *c; 98 99 if (!(c = find_connection(mctx->cpu, sock))) 100 return; 101 102 TAILQ_REMOVE(&g_sockq[mctx->cpu], c, link); 103 free(c); 104 } 105 /*----------------------------------------------------------------------------*/ 106 /* Update connection's TCP state of each side */ 107 static void 108 cb_st_chg(mctx_t mctx, int sock, int side, uint64_t events, filter_arg_t *arg) 109 { 110 struct connection *c; 111 socklen_t intlen = sizeof(int); 112 113 if (!(c = find_connection(mctx->cpu, sock))) 114 return; 115 116 if (side == MOS_SIDE_CLI) { 117 if (mtcp_getsockopt(mctx, c->sock, SOL_MONSOCKET, MOS_TCP_STATE_CLI, 118 (void *)&c->cli_state, &intlen) < 0) { 119 perror("mtcp_getsockopt"); 120 exit(-1); /* it's better to stop here and do debugging */ 121 } 122 } else { 123 if (mtcp_getsockopt(mctx, c->sock, SOL_MONSOCKET, MOS_TCP_STATE_SVR, 124 (void *)&c->svr_state, &intlen) < 0) { 125 perror("mtcp_getsockopt"); 126 exit(-1); /* it's better to stop here and do debugging */ 127 } 128 } 129 } 130 /*----------------------------------------------------------------------------*/ 131 /* Convert state value (integer) to string (char array) */ 132 const char * 133 strstate(int state) 134 { 135 switch (state) { 136 #define CASE(s) case TCP_##s: return #s 137 CASE(CLOSED); 138 CASE(LISTEN); 139 CASE(SYN_SENT); 140 CASE(SYN_RCVD); 141 CASE(ESTABLISHED); 142 CASE(FIN_WAIT_1); 143 CASE(FIN_WAIT_2); 144 CASE(CLOSE_WAIT); 145 CASE(CLOSING); 146 CASE(LAST_ACK); 147 CASE(TIME_WAIT); 148 default: 149 return "-"; 150 } 151 } 152 /*----------------------------------------------------------------------------*/ 153 /* Print ongoing connection information based on connection structure */ 154 static void 155 cb_printstat(mctx_t mctx, int sock, int side, 156 uint64_t events, filter_arg_t *arg) 157 { 158 int i; 159 struct connection *c; 160 struct timeval tv_1sec = { /* 1 second */ 161 .tv_sec = 1, 162 .tv_usec = 0 163 }; 164 165 printf("Proto CPU " 166 "Client Address Client State " 167 "Server Address Server State\n"); 168 for (i = 0; i < g_max_cores; i++) 169 TAILQ_FOREACH(c, &g_sockq[i], link) { 170 int space; 171 172 printf("%-5s %-3d ", "tcp", i); 173 space = printf("%s:", inet_ntoa(c->addrs[MOS_SIDE_CLI].sin_addr)); 174 printf("%*d %-12s ", 175 space - 21, 176 ntohs(c->addrs[MOS_SIDE_CLI].sin_port), 177 strstate(c->cli_state)); 178 space = printf("%s:", inet_ntoa(c->addrs[MOS_SIDE_SVR].sin_addr)); 179 printf("%*d %-12s\n", 180 space - 21, 181 ntohs(c->addrs[MOS_SIDE_SVR].sin_port), 182 strstate(c->svr_state)); 183 } 184 185 /* Set a timer for next printing */ 186 if (mtcp_settimer(mctx, sock, &tv_1sec, cb_printstat)) { 187 fprintf(stderr, "Failed to register print timer\n"); 188 exit(-1); /* no point in proceeding if the timer is broken */ 189 } 190 191 return; 192 } 193 /*----------------------------------------------------------------------------*/ 194 /* Register required callbacks */ 195 static void 196 RegisterCallbacks(mctx_t mctx, int sock, event_t ev_new_syn) 197 { 198 struct timeval tv_1sec = { /* 1 second */ 199 .tv_sec = 1, 200 .tv_usec = 0 201 }; 202 203 /* Register callbacks */ 204 if (mtcp_register_callback(mctx, sock, MOS_ON_CONN_START, 205 MOS_HK_SND, cb_creation)) { 206 fprintf(stderr, "Failed to register cb_creation()\n"); 207 exit(-1); /* no point in proceeding if callback registration fails */ 208 } 209 if (mtcp_register_callback(mctx, sock, MOS_ON_CONN_END, 210 MOS_HK_SND, cb_destroy)) { 211 fprintf(stderr, "Failed to register cb_destroy()\n"); 212 exit(-1); /* no point in proceeding if callback registration fails */ 213 } 214 if (mtcp_register_callback(mctx, sock, MOS_ON_TCP_STATE_CHANGE, 215 MOS_HK_SND, cb_st_chg)) { 216 fprintf(stderr, "Failed to register cb_st_chg()\n"); 217 exit(-1); /* no point in proceeding if callback registration fails */ 218 } 219 if (mtcp_register_callback(mctx, sock, MOS_ON_TCP_STATE_CHANGE, 220 MOS_HK_RCV, cb_st_chg)) { 221 fprintf(stderr, "Failed to register cb_st_chg()\n"); 222 exit(-1); /* no point in proceeding if callback registration fails */ 223 } 224 225 /* CPU 0 is in charge of printing stats */ 226 if (mctx->cpu == 0 && 227 mtcp_settimer(mctx, sock, &tv_1sec, cb_printstat)) { 228 fprintf(stderr, "Failed to register print timer\n"); 229 exit(-1); /* no point in proceeding if the titmer is broken*/ 230 } 231 } 232 /*----------------------------------------------------------------------------*/ 233 /* Open monitoring socket and ready it for monitoring */ 234 static void 235 InitMonitor(mctx_t mctx, event_t ev_new_syn) 236 { 237 int sock; 238 239 /* Initialize internal memory structures */ 240 TAILQ_INIT(&g_sockq[mctx->cpu]); 241 242 /* create socket and set it as nonblocking */ 243 if ((sock = mtcp_socket(mctx, AF_INET, 244 MOS_SOCK_MONITOR_STREAM, 0)) < 0) { 245 fprintf(stderr, "Failed to create monitor listening socket!\n"); 246 exit(-1); /* no point in proceeding if we don't have a listening socket */ 247 } 248 249 /* Disable socket buffer */ 250 int optval = 0; 251 if (mtcp_setsockopt(mctx, sock, SOL_MONSOCKET, MOS_CLIBUF, 252 &optval, sizeof(optval)) == -1) { 253 fprintf(stderr, "Could not disable CLIBUF!\n"); 254 } 255 if (mtcp_setsockopt(mctx, sock, SOL_MONSOCKET, MOS_SVRBUF, 256 &optval, sizeof(optval)) == -1) { 257 fprintf(stderr, "Could not disable SVRBUF!\n"); 258 } 259 260 RegisterCallbacks(mctx, sock, ev_new_syn); 261 } 262 /*----------------------------------------------------------------------------*/ 263 int 264 main(int argc, char **argv) 265 { 266 int i, opt; 267 event_t ev_new_syn; /* New SYN UDE */ 268 char *fname = MOS_CONFIG_FILE; /* path to the default mos config file */ 269 struct mtcp_conf mcfg; /* mOS configuration */ 270 271 /* get the total # of cpu cores */ 272 g_max_cores = GetNumCPUs(); 273 274 /* Parse command line arguments */ 275 while ((opt = getopt(argc, argv, "c:f:")) != -1) { 276 switch (opt) { 277 case 'f': 278 fname = optarg; 279 break; 280 case 'c': 281 if (atoi(optarg) > g_max_cores) { 282 printf("Available number of CPU cores is %d\n", g_max_cores); 283 return -1; 284 } 285 g_max_cores = atoi(optarg); 286 break; 287 default: 288 printf("Usage: %s [-f mos_config_file] [-c #_of_cpu]\n", argv[0]); 289 return 0; 290 } 291 } 292 293 /* parse mos configuration file */ 294 if (mtcp_init(fname)) { 295 fprintf(stderr, "Failed to initialize mtcp.\n"); 296 exit(EXIT_FAILURE); 297 } 298 299 /* set the core limit */ 300 mtcp_getconf(&mcfg); 301 mcfg.num_cores = g_max_cores; 302 mtcp_setconf(&mcfg); 303 304 /* Register signal handler */ 305 mtcp_register_signal(SIGINT, sigint_handler); 306 307 for (i = 0; i < g_max_cores; i++) { 308 /* Run mOS for each CPU core */ 309 if (!(g_mctx[i] = mtcp_create_context(i))) { 310 fprintf(stderr, "Failed to craete mtcp context.\n"); 311 return -1; 312 } 313 314 /* init monitor */ 315 InitMonitor(g_mctx[i], ev_new_syn); 316 } 317 318 /* wait until mOS finishes */ 319 for (i = 0; i < g_max_cores; i++) 320 mtcp_app_join(g_mctx[i]); 321 322 mtcp_destroy(); 323 return 0; 324 } 325 /*----------------------------------------------------------------------------*/ 326