1 #define _GNU_SOURCE 2 #include <sched.h> 3 #include <unistd.h> 4 #include <sys/time.h> 5 #include <semaphore.h> 6 #include <sys/mman.h> 7 #include <signal.h> 8 #include <assert.h> 9 #include <string.h> 10 11 #include "cpu.h" 12 #include "eth_in.h" 13 #include "fhash.h" 14 #include "tcp_send_buffer.h" 15 #include "tcp_ring_buffer.h" 16 #include "socket.h" 17 #include "eth_out.h" 18 #include "tcp.h" 19 #include "tcp_in.h" 20 #include "tcp_out.h" 21 #include "mtcp_api.h" 22 #include "eventpoll.h" 23 #include "logger.h" 24 #include "config.h" 25 #include "arp.h" 26 #include "ip_out.h" 27 #include "timer.h" 28 #include "debug.h" 29 #include "event_callback.h" 30 #include "tcp_rb.h" 31 #include "tcp_stream.h" 32 #include "io_module.h" 33 34 #ifdef ENABLE_DPDK 35 /* for launching rte thread */ 36 #include <rte_launch.h> 37 #include <rte_lcore.h> 38 #endif /* !ENABLE_DPDK */ 39 #define PS_CHUNK_SIZE 64 40 #define RX_THRESH (PS_CHUNK_SIZE * 0.8) 41 42 #define ROUND_STAT FALSE 43 #define TIME_STAT FALSE 44 #define EVENT_STAT FALSE 45 #define TESTING FALSE 46 47 #define LOG_FILE_NAME "log" 48 #define MAX_FILE_NAME 1024 49 50 #define MAX(a, b) ((a)>(b)?(a):(b)) 51 #define MIN(a, b) ((a)<(b)?(a):(b)) 52 53 #define PER_STREAM_SLICE 0.1 // in ms 54 #define PER_STREAM_TCHECK 1 // in ms 55 #define PS_SELECT_TIMEOUT 100 // in us 56 57 #define GBPS(bytes) (bytes * 8.0 / (1000 * 1000 * 1000)) 58 59 /*----------------------------------------------------------------------------*/ 60 /* handlers for threads */ 61 struct mtcp_thread_context *g_pctx[MAX_CPUS] = {0}; 62 struct log_thread_context *g_logctx[MAX_CPUS] = {0}; 63 /*----------------------------------------------------------------------------*/ 64 static pthread_t g_thread[MAX_CPUS] = {0}; 65 static pthread_t log_thread[MAX_CPUS] = {0}; 66 /*----------------------------------------------------------------------------*/ 67 static sem_t g_init_sem[MAX_CPUS]; 68 static sem_t g_done_sem[MAX_CPUS]; 69 static int running[MAX_CPUS] = {0}; 70 /*----------------------------------------------------------------------------*/ 71 mtcp_sighandler_t app_signal_handler; 72 static int sigint_cnt[MAX_CPUS] = {0}; 73 static struct timespec sigint_ts[MAX_CPUS]; 74 void mtcp_free_context(mctx_t mctx); 75 /*----------------------------------------------------------------------------*/ 76 #ifdef NETSTAT 77 #if NETSTAT_TOTAL 78 static int printer = -1; 79 #if ROUND_STAT 80 #endif /* ROUND_STAT */ 81 #endif /* NETSTAT_TOTAL */ 82 #endif /* NETSTAT */ 83 /*----------------------------------------------------------------------------*/ 84 void 85 HandleSignal(int signal) 86 { 87 int i = 0; 88 89 if (signal == SIGINT) { 90 FreeConfigResources(); 91 #ifdef DARWIN 92 int core = 0; 93 #else 94 int core = sched_getcpu(); 95 #endif 96 struct timespec cur_ts; 97 98 clock_gettime(CLOCK_REALTIME, &cur_ts); 99 100 if (sigint_cnt[core] > 0 && cur_ts.tv_sec == sigint_ts[core].tv_sec) { 101 for (i = 0; i < g_config.mos->num_cores; i++) { 102 if (running[i]) { 103 //exit(0); 104 g_pctx[i]->exit = TRUE; 105 } 106 } 107 } else { 108 for (i = 0; i < g_config.mos->num_cores; i++) { 109 if (g_pctx[i]) 110 g_pctx[i]->interrupt = TRUE; 111 } 112 if (!app_signal_handler) { 113 for (i = 0; i < g_config.mos->num_cores; i++) { 114 if (running[i]) { 115 //exit(0); 116 g_pctx[i]->exit = TRUE; 117 } 118 } 119 } 120 } 121 sigint_cnt[core]++; 122 clock_gettime(CLOCK_REALTIME, &sigint_ts[core]); 123 } 124 125 if (signal != SIGUSR1) { 126 if (app_signal_handler) { 127 app_signal_handler(signal); 128 } 129 } 130 } 131 /*----------------------------------------------------------------------------*/ 132 static int 133 AttachDevice(struct mtcp_thread_context* ctx) 134 { 135 int working = -1; 136 mtcp_manager_t mtcp = ctx->mtcp_manager; 137 138 if (mtcp->iom->link_devices) 139 working = mtcp->iom->link_devices(ctx); 140 else 141 return 0; 142 143 return working; 144 } 145 /*----------------------------------------------------------------------------*/ 146 #ifdef TIMESTAT 147 static inline void 148 InitStatCounter(struct stat_counter *counter) 149 { 150 counter->cnt = 0; 151 counter->sum = 0; 152 counter->max = 0; 153 counter->min = 0; 154 } 155 /*----------------------------------------------------------------------------*/ 156 static inline void 157 UpdateStatCounter(struct stat_counter *counter, int64_t value) 158 { 159 counter->cnt++; 160 counter->sum += value; 161 if (value > counter->max) 162 counter->max = value; 163 if (counter->min == 0 || value < counter->min) 164 counter->min = value; 165 } 166 /*----------------------------------------------------------------------------*/ 167 static inline uint64_t 168 GetAverageStat(struct stat_counter *counter) 169 { 170 return counter->cnt ? (counter->sum / counter->cnt) : 0; 171 } 172 /*----------------------------------------------------------------------------*/ 173 static inline int64_t 174 TimeDiffUs(struct timeval *t2, struct timeval *t1) 175 { 176 return (t2->tv_sec - t1->tv_sec) * 1000000 + 177 (int64_t)(t2->tv_usec - t1->tv_usec); 178 } 179 /*----------------------------------------------------------------------------*/ 180 #endif 181 #ifdef NETSTAT 182 static inline void 183 PrintThreadNetworkStats(mtcp_manager_t mtcp, struct net_stat *ns) 184 { 185 int i; 186 187 for (i = 0; i < g_config.mos->netdev_table->num; i++) { 188 ns->rx_packets[i] = mtcp->nstat.rx_packets[i] - mtcp->p_nstat.rx_packets[i]; 189 ns->rx_errors[i] = mtcp->nstat.rx_errors[i] - mtcp->p_nstat.rx_errors[i]; 190 ns->rx_bytes[i] = mtcp->nstat.rx_bytes[i] - mtcp->p_nstat.rx_bytes[i]; 191 ns->tx_packets[i] = mtcp->nstat.tx_packets[i] - mtcp->p_nstat.tx_packets[i]; 192 ns->tx_drops[i] = mtcp->nstat.tx_drops[i] - mtcp->p_nstat.tx_drops[i]; 193 ns->tx_bytes[i] = mtcp->nstat.tx_bytes[i] - mtcp->p_nstat.tx_bytes[i]; 194 #if NETSTAT_PERTHREAD 195 if (g_config.mos->netdev_table->ent[i]->stat_print) { 196 fprintf(stderr, "[CPU%2d] %s flows: %6u, " 197 "RX: %7llu(pps) (err: %5llu), %5.2lf(Gbps), " 198 "TX: %7llu(pps), %5.2lf(Gbps)\n", 199 mtcp->ctx->cpu, 200 g_config.mos->netdev_table->ent[i]->dev_name, 201 (unsigned)mtcp->flow_cnt, 202 (long long unsigned)ns->rx_packets[i], 203 (long long unsigned)ns->rx_errors[i], 204 GBPS(ns->rx_bytes[i]), 205 (long long unsigned)ns->tx_packets[i], 206 GBPS(ns->tx_bytes[i])); 207 } 208 #endif 209 } 210 mtcp->p_nstat = mtcp->nstat; 211 212 } 213 /*----------------------------------------------------------------------------*/ 214 #if ROUND_STAT 215 static inline void 216 PrintThreadRoundStats(mtcp_manager_t mtcp, struct run_stat *rs) 217 { 218 #define ROUND_DIV (1000) 219 rs->rounds = mtcp->runstat.rounds - mtcp->p_runstat.rounds; 220 rs->rounds_rx = mtcp->runstat.rounds_rx - mtcp->p_runstat.rounds_rx; 221 rs->rounds_rx_try = mtcp->runstat.rounds_rx_try - mtcp->p_runstat.rounds_rx_try; 222 rs->rounds_tx = mtcp->runstat.rounds_tx - mtcp->p_runstat.rounds_tx; 223 rs->rounds_tx_try = mtcp->runstat.rounds_tx_try - mtcp->p_runstat.rounds_tx_try; 224 rs->rounds_select = mtcp->runstat.rounds_select - mtcp->p_runstat.rounds_select; 225 rs->rounds_select_rx = mtcp->runstat.rounds_select_rx - mtcp->p_runstat.rounds_select_rx; 226 rs->rounds_select_tx = mtcp->runstat.rounds_select_tx - mtcp->p_runstat.rounds_select_tx; 227 rs->rounds_select_intr = mtcp->runstat.rounds_select_intr - mtcp->p_runstat.rounds_select_intr; 228 rs->rounds_twcheck = mtcp->runstat.rounds_twcheck - mtcp->p_runstat.rounds_twcheck; 229 mtcp->p_runstat = mtcp->runstat; 230 #if NETSTAT_PERTHREAD 231 fprintf(stderr, "[CPU%2d] Rounds: %4lluK, " 232 "rx: %3lluK (try: %4lluK), tx: %3lluK (try: %4lluK), " 233 "ps_select: %4llu (rx: %4llu, tx: %4llu, intr: %3llu)\n", 234 mtcp->ctx->cpu, rs->rounds / ROUND_DIV, 235 rs->rounds_rx / ROUND_DIV, rs->rounds_rx_try / ROUND_DIV, 236 rs->rounds_tx / ROUND_DIV, rs->rounds_tx_try / ROUND_DIV, 237 rs->rounds_select, 238 rs->rounds_select_rx, rs->rounds_select_tx, rs->rounds_select_intr); 239 #endif 240 } 241 #endif /* ROUND_STAT */ 242 /*----------------------------------------------------------------------------*/ 243 #if TIME_STAT 244 static inline void 245 PrintThreadRoundTime(mtcp_manager_t mtcp) 246 { 247 fprintf(stderr, "[CPU%2d] Time: (avg, max) " 248 "round: (%4luus, %4luus), processing: (%4luus, %4luus), " 249 "tcheck: (%4luus, %4luus), epoll: (%4luus, %4luus), " 250 "handle: (%4luus, %4luus), xmit: (%4luus, %4luus), " 251 "select: (%4luus, %4luus)\n", mtcp->ctx->cpu, 252 GetAverageStat(&mtcp->rtstat.round), mtcp->rtstat.round.max, 253 GetAverageStat(&mtcp->rtstat.processing), mtcp->rtstat.processing.max, 254 GetAverageStat(&mtcp->rtstat.tcheck), mtcp->rtstat.tcheck.max, 255 GetAverageStat(&mtcp->rtstat.epoll), mtcp->rtstat.epoll.max, 256 GetAverageStat(&mtcp->rtstat.handle), mtcp->rtstat.handle.max, 257 GetAverageStat(&mtcp->rtstat.xmit), mtcp->rtstat.xmit.max, 258 GetAverageStat(&mtcp->rtstat.select), mtcp->rtstat.select.max); 259 260 InitStatCounter(&mtcp->rtstat.round); 261 InitStatCounter(&mtcp->rtstat.processing); 262 InitStatCounter(&mtcp->rtstat.tcheck); 263 InitStatCounter(&mtcp->rtstat.epoll); 264 InitStatCounter(&mtcp->rtstat.handle); 265 InitStatCounter(&mtcp->rtstat.xmit); 266 InitStatCounter(&mtcp->rtstat.select); 267 } 268 #endif 269 #endif /* NETSTAT */ 270 /*----------------------------------------------------------------------------*/ 271 #if EVENT_STAT 272 static inline void 273 PrintEventStat(int core, struct mtcp_epoll_stat *stat) 274 { 275 fprintf(stderr, "[CPU%2d] calls: %lu, waits: %lu, wakes: %lu, " 276 "issued: %lu, registered: %lu, invalidated: %lu, handled: %lu\n", 277 core, stat->calls, stat->waits, stat->wakes, 278 stat->issued, stat->registered, stat->invalidated, stat->handled); 279 memset(stat, 0, sizeof(struct mtcp_epoll_stat)); 280 } 281 #endif /* EVENT_STAT */ 282 /*----------------------------------------------------------------------------*/ 283 #ifdef NETSTAT 284 static inline void 285 PrintNetworkStats(mtcp_manager_t mtcp, uint32_t cur_ts) 286 { 287 #define TIMEOUT 1 288 int i; 289 struct net_stat ns; 290 bool stat_print = false; 291 #if ROUND_STAT 292 struct run_stat rs; 293 #endif /* ROUND_STAT */ 294 #ifdef NETSTAT_TOTAL 295 static double peak_total_rx_gbps = 0; 296 static double peak_total_tx_gbps = 0; 297 static double avg_total_rx_gbps = 0; 298 static double avg_total_tx_gbps = 0; 299 300 double total_rx_gbps = 0, total_tx_gbps = 0; 301 int j; 302 uint32_t gflow_cnt = 0; 303 struct net_stat g_nstat; 304 #if ROUND_STAT 305 struct run_stat g_runstat; 306 #endif /* ROUND_STAT */ 307 #endif /* NETSTAT_TOTAL */ 308 309 if (TS_TO_MSEC(cur_ts - mtcp->p_nstat_ts) < SEC_TO_MSEC(TIMEOUT)) { 310 return; 311 } 312 313 mtcp->p_nstat_ts = cur_ts; 314 gflow_cnt = 0; 315 memset(&g_nstat, 0, sizeof(struct net_stat)); 316 for (i = 0; i < g_config.mos->num_cores; i++) { 317 if (running[i]) { 318 PrintThreadNetworkStats(g_mtcp[i], &ns); 319 #if NETSTAT_TOTAL 320 gflow_cnt += g_mtcp[i]->flow_cnt; 321 for (j = 0; j < g_config.mos->netdev_table->num; j++) { 322 g_nstat.rx_packets[j] += ns.rx_packets[j]; 323 g_nstat.rx_errors[j] += ns.rx_errors[j]; 324 g_nstat.rx_bytes[j] += ns.rx_bytes[j]; 325 g_nstat.tx_packets[j] += ns.tx_packets[j]; 326 g_nstat.tx_drops[j] += ns.tx_drops[j]; 327 g_nstat.tx_bytes[j] += ns.tx_bytes[j]; 328 } 329 #endif 330 } 331 } 332 #if NETSTAT_TOTAL 333 for (i = 0; i < g_config.mos->netdev_table->num; i++) { 334 if (g_config.mos->netdev_table->ent[i]->stat_print) { 335 fprintf(stderr, "[ ALL ] %s, " 336 "RX: %7llu(pps) (err: %5llu), %5.2lf(Gbps), " 337 "TX: %7llu(pps), %5.2lf(Gbps)\n", 338 g_config.mos->netdev_table->ent[i]->dev_name, 339 (long long unsigned)g_nstat.rx_packets[i], 340 (long long unsigned)g_nstat.rx_errors[i], 341 GBPS(g_nstat.rx_bytes[i]), 342 (long long unsigned)g_nstat.tx_packets[i], 343 GBPS(g_nstat.tx_bytes[i])); 344 total_rx_gbps += GBPS(g_nstat.rx_bytes[i]); 345 total_tx_gbps += GBPS(g_nstat.tx_bytes[i]); 346 stat_print = true; 347 } 348 } 349 if (stat_print) { 350 fprintf(stderr, "[ ALL ] flows: %6u\n", gflow_cnt); 351 if (avg_total_rx_gbps == 0) 352 avg_total_rx_gbps = total_rx_gbps; 353 else 354 avg_total_rx_gbps = avg_total_rx_gbps * 0.6 + total_rx_gbps * 0.4; 355 356 if (avg_total_tx_gbps == 0) 357 avg_total_tx_gbps = total_tx_gbps; 358 else 359 avg_total_tx_gbps = avg_total_tx_gbps * 0.6 + total_tx_gbps * 0.4; 360 361 if (peak_total_rx_gbps < total_rx_gbps) 362 peak_total_rx_gbps = total_rx_gbps; 363 if (peak_total_tx_gbps < total_tx_gbps) 364 peak_total_tx_gbps = total_tx_gbps; 365 366 fprintf(stderr, "[ PEAK ] RX: %5.2lf(Gbps), TX: %5.2lf(Gbps)\n" 367 "[ RECENT AVG ] RX: %5.2lf(Gbps), TX: %5.2lf(Gbps)\n", 368 peak_total_rx_gbps, peak_total_tx_gbps, 369 avg_total_rx_gbps, avg_total_tx_gbps); 370 } 371 #endif 372 373 #if ROUND_STAT 374 memset(&g_runstat, 0, sizeof(struct run_stat)); 375 for (i = 0; i < g_config.mos->num_cores; i++) { 376 if (running[i]) { 377 PrintThreadRoundStats(g_mtcp[i], &rs); 378 #if DBGMSG 379 g_runstat.rounds += rs.rounds; 380 g_runstat.rounds_rx += rs.rounds_rx; 381 g_runstat.rounds_rx_try += rs.rounds_rx_try; 382 g_runstat.rounds_tx += rs.rounds_tx; 383 g_runstat.rounds_tx_try += rs.rounds_tx_try; 384 g_runstat.rounds_select += rs.rounds_select; 385 g_runstat.rounds_select_rx += rs.rounds_select_rx; 386 g_runstat.rounds_select_tx += rs.rounds_select_tx; 387 #endif 388 } 389 } 390 391 TRACE_DBG("[ ALL ] Rounds: %4ldK, " 392 "rx: %3ldK (try: %4ldK), tx: %3ldK (try: %4ldK), " 393 "ps_select: %4ld (rx: %4ld, tx: %4ld)\n", 394 g_runstat.rounds / 1000, g_runstat.rounds_rx / 1000, 395 g_runstat.rounds_rx_try / 1000, g_runstat.rounds_tx / 1000, 396 g_runstat.rounds_tx_try / 1000, g_runstat.rounds_select, 397 g_runstat.rounds_select_rx, g_runstat.rounds_select_tx); 398 #endif /* ROUND_STAT */ 399 400 #if TIME_STAT 401 for (i = 0; i < g_config.mos->num_cores; i++) { 402 if (running[i]) { 403 PrintThreadRoundTime(g_mtcp[i]); 404 } 405 } 406 #endif 407 408 #if EVENT_STAT 409 for (i = 0; i < g_config.mos->num_cores; i++) { 410 if (running[i] && g_mtcp[i]->ep) { 411 PrintEventStat(i, &g_mtcp[i]->ep->stat); 412 } 413 } 414 #endif 415 416 fflush(stderr); 417 } 418 #endif /* NETSTAT */ 419 /*----------------------------------------------------------------------------*/ 420 static inline void 421 FlushMonitorReadEvents(mtcp_manager_t mtcp) 422 { 423 struct event_queue *mtcpq; 424 struct tcp_stream *cur_stream; 425 struct mon_listener *walk; 426 427 /* check if monitor sockets should be passed data */ 428 TAILQ_FOREACH(walk, &mtcp->monitors, link) { 429 if (walk->socket->socktype != MOS_SOCK_MONITOR_STREAM || 430 !(mtcpq = walk->eq)) 431 continue; 432 433 while (mtcpq->num_events > 0) { 434 cur_stream = 435 (struct tcp_stream *)mtcpq->events[mtcpq->start++].ev.data.ptr; 436 /* only read events */ 437 if (cur_stream != NULL && 438 (cur_stream->actions & MOS_ACT_READ_DATA)) { 439 if (cur_stream->rcvvar != NULL && 440 cur_stream->rcvvar->rcvbuf != NULL) { 441 /* no need to pass pkt context */ 442 struct socket_map *walk; 443 if (cur_stream->side == MOS_SIDE_CLI) { 444 SOCKQ_FOREACH_REVERSE(walk, &cur_stream->msocks) { 445 HandleCallback(mtcp, MOS_NULL, walk, 446 cur_stream->side, NULL, 447 MOS_ON_CONN_NEW_DATA); 448 } SOCKQ_FOREACH_END; 449 } else { /* cur_stream->side == MOS_SIDE_SVR */ 450 SOCKQ_FOREACH_START(walk, &cur_stream->msocks) { 451 HandleCallback(mtcp, MOS_NULL, walk, 452 cur_stream->side, NULL, 453 MOS_ON_CONN_NEW_DATA); 454 } SOCKQ_FOREACH_END; 455 } 456 } 457 /* reset the actions now */ 458 cur_stream->actions = 0; 459 } 460 if (mtcpq->start >= mtcpq->size) 461 mtcpq->start = 0; 462 mtcpq->num_events--; 463 } 464 } 465 } 466 /*----------------------------------------------------------------------------*/ 467 static inline void 468 FlushBufferedReadEvents(mtcp_manager_t mtcp) 469 { 470 int i; 471 int offset; 472 struct event_queue *mtcpq; 473 struct tcp_stream *cur_stream; 474 475 if (mtcp->ep == NULL) { 476 TRACE_EPOLL("No epoll socket has been registered yet!\n"); 477 return; 478 } else { 479 /* case when mtcpq exists */ 480 mtcpq = mtcp->ep->mtcp_queue; 481 offset = mtcpq->start; 482 } 483 484 /* we will use queued-up epoll read-in events 485 * to trigger buffered read monitor events */ 486 for (i = 0; i < mtcpq->num_events; i++) { 487 cur_stream = mtcp->smap[mtcpq->events[offset++].sockid].stream; 488 /* only read events */ 489 /* Raise new data callback event */ 490 if (cur_stream != NULL && 491 (cur_stream->socket->events | MOS_EPOLLIN)) { 492 if (cur_stream->rcvvar != NULL && 493 cur_stream->rcvvar->rcvbuf != NULL) { 494 /* no need to pass pkt context */ 495 struct socket_map *walk; 496 if (cur_stream->side == MOS_SIDE_CLI) { 497 SOCKQ_FOREACH_REVERSE(walk, &cur_stream->msocks) { 498 HandleCallback(mtcp, MOS_NULL, walk, cur_stream->side, 499 NULL, MOS_ON_CONN_NEW_DATA); 500 } SOCKQ_FOREACH_END; 501 } else { /* cur_stream->side == MOS_SIDE_SVR */ 502 SOCKQ_FOREACH_START(walk, &cur_stream->msocks) { 503 HandleCallback(mtcp, MOS_NULL, walk, cur_stream->side, 504 NULL, MOS_ON_CONN_NEW_DATA); 505 } SOCKQ_FOREACH_END; 506 } 507 } 508 } 509 if (offset >= mtcpq->size) 510 offset = 0; 511 } 512 } 513 /*----------------------------------------------------------------------------*/ 514 static inline void 515 FlushEpollEvents(mtcp_manager_t mtcp, uint32_t cur_ts) 516 { 517 struct mtcp_epoll *ep = mtcp->ep; 518 struct event_queue *usrq = ep->usr_queue; 519 struct event_queue *mtcpq = ep->mtcp_queue; 520 521 pthread_mutex_lock(&ep->epoll_lock); 522 if (ep->mtcp_queue->num_events > 0) { 523 /* while mtcp_queue have events */ 524 /* and usr_queue is not full */ 525 while (mtcpq->num_events > 0 && usrq->num_events < usrq->size) { 526 /* copy the event from mtcp_queue to usr_queue */ 527 usrq->events[usrq->end++] = mtcpq->events[mtcpq->start++]; 528 529 if (usrq->end >= usrq->size) 530 usrq->end = 0; 531 usrq->num_events++; 532 533 if (mtcpq->start >= mtcpq->size) 534 mtcpq->start = 0; 535 mtcpq->num_events--; 536 } 537 } 538 539 /* if there are pending events, wake up user */ 540 if (ep->waiting && (ep->usr_queue->num_events > 0 || 541 ep->usr_shadow_queue->num_events > 0)) { 542 STAT_COUNT(mtcp->runstat.rounds_epoll); 543 TRACE_EPOLL("Broadcasting events. num: %d, cur_ts: %u, prev_ts: %u\n", 544 ep->usr_queue->num_events, cur_ts, mtcp->ts_last_event); 545 mtcp->ts_last_event = cur_ts; 546 ep->stat.wakes++; 547 pthread_cond_signal(&ep->epoll_cond); 548 } 549 pthread_mutex_unlock(&ep->epoll_lock); 550 } 551 /*----------------------------------------------------------------------------*/ 552 static inline void 553 HandleApplicationCalls(mtcp_manager_t mtcp, uint32_t cur_ts) 554 { 555 tcp_stream *stream; 556 int cnt, max_cnt; 557 int handled, delayed; 558 int control, send, ack; 559 560 /* connect handling */ 561 while ((stream = StreamDequeue(mtcp->connectq))) { 562 if (stream->state != TCP_ST_SYN_SENT) { 563 TRACE_INFO("Got a connection request from app with state: %s", 564 TCPStateToString(stream)); 565 exit(EXIT_FAILURE); 566 } else { 567 stream->cb_events |= MOS_ON_CONN_START | 568 MOS_ON_TCP_STATE_CHANGE; 569 /* if monitor is on... */ 570 if (stream->pair_stream != NULL) 571 stream->pair_stream->cb_events |= 572 MOS_ON_CONN_START; 573 } 574 AddtoControlList(mtcp, stream, cur_ts); 575 } 576 577 /* send queue handling */ 578 while ((stream = StreamDequeue(mtcp->sendq))) { 579 stream->sndvar->on_sendq = FALSE; 580 AddtoSendList(mtcp, stream); 581 } 582 583 /* ack queue handling */ 584 while ((stream = StreamDequeue(mtcp->ackq))) { 585 stream->sndvar->on_ackq = FALSE; 586 EnqueueACK(mtcp, stream, cur_ts, ACK_OPT_AGGREGATE); 587 } 588 589 /* close handling */ 590 handled = delayed = 0; 591 control = send = ack = 0; 592 while ((stream = StreamDequeue(mtcp->closeq))) { 593 struct tcp_send_vars *sndvar = stream->sndvar; 594 sndvar->on_closeq = FALSE; 595 596 if (sndvar->sndbuf) { 597 sndvar->fss = sndvar->sndbuf->head_seq + sndvar->sndbuf->len; 598 } else { 599 sndvar->fss = stream->snd_nxt; 600 } 601 602 if (g_config.mos->tcp_timeout > 0) 603 RemoveFromTimeoutList(mtcp, stream); 604 605 if (stream->have_reset) { 606 handled++; 607 if (stream->state != TCP_ST_CLOSED_RSVD) { 608 stream->close_reason = TCP_RESET; 609 stream->state = TCP_ST_CLOSED_RSVD; 610 stream->cb_events |= MOS_ON_TCP_STATE_CHANGE; 611 TRACE_STATE("Stream %d: TCP_ST_CLOSED_RSVD\n", stream->id); 612 DestroyTCPStream(mtcp, stream); 613 } else { 614 TRACE_ERROR("Stream already closed.\n"); 615 } 616 617 } else if (sndvar->on_control_list) { 618 sndvar->on_closeq_int = TRUE; 619 StreamInternalEnqueue(mtcp->closeq_int, stream); 620 delayed++; 621 if (sndvar->on_control_list) 622 control++; 623 if (sndvar->on_send_list) 624 send++; 625 if (sndvar->on_ack_list) 626 ack++; 627 628 } else if (sndvar->on_send_list || sndvar->on_ack_list) { 629 handled++; 630 if (stream->state == TCP_ST_ESTABLISHED) { 631 stream->state = TCP_ST_FIN_WAIT_1; 632 stream->cb_events |= MOS_ON_TCP_STATE_CHANGE; 633 TRACE_STATE("Stream %d: TCP_ST_FIN_WAIT_1\n", stream->id); 634 635 } else if (stream->state == TCP_ST_CLOSE_WAIT) { 636 stream->state = TCP_ST_LAST_ACK; 637 stream->cb_events |= MOS_ON_TCP_STATE_CHANGE; 638 TRACE_STATE("Stream %d: TCP_ST_LAST_ACK\n", stream->id); 639 } 640 stream->control_list_waiting = TRUE; 641 642 } else if (stream->state != TCP_ST_CLOSED_RSVD) { 643 handled++; 644 if (stream->state == TCP_ST_ESTABLISHED) { 645 stream->state = TCP_ST_FIN_WAIT_1; 646 stream->cb_events |= MOS_ON_TCP_STATE_CHANGE; 647 TRACE_STATE("Stream %d: TCP_ST_FIN_WAIT_1\n", stream->id); 648 649 } else if (stream->state == TCP_ST_CLOSE_WAIT) { 650 stream->state = TCP_ST_LAST_ACK; 651 stream->cb_events |= MOS_ON_TCP_STATE_CHANGE; 652 TRACE_STATE("Stream %d: TCP_ST_LAST_ACK\n", stream->id); 653 } 654 //sndvar->rto = TCP_FIN_RTO; 655 //UpdateRetransmissionTimer(mtcp, stream, mtcp->cur_ts); 656 AddtoControlList(mtcp, stream, cur_ts); 657 } else { 658 TRACE_ERROR("Already closed connection!\n"); 659 } 660 } 661 TRACE_ROUND("Handling close connections. cnt: %d\n", cnt); 662 663 cnt = 0; 664 max_cnt = mtcp->closeq_int->count; 665 while (cnt++ < max_cnt) { 666 stream = StreamInternalDequeue(mtcp->closeq_int); 667 668 if (stream->sndvar->on_control_list) { 669 StreamInternalEnqueue(mtcp->closeq_int, stream); 670 671 } else if (stream->state != TCP_ST_CLOSED_RSVD) { 672 handled++; 673 stream->sndvar->on_closeq_int = FALSE; 674 if (stream->state == TCP_ST_ESTABLISHED) { 675 stream->state = TCP_ST_FIN_WAIT_1; 676 stream->cb_events |= MOS_ON_TCP_STATE_CHANGE; 677 TRACE_STATE("Stream %d: TCP_ST_FIN_WAIT_1\n", stream->id); 678 679 } else if (stream->state == TCP_ST_CLOSE_WAIT) { 680 stream->state = TCP_ST_LAST_ACK; 681 stream->cb_events |= MOS_ON_TCP_STATE_CHANGE; 682 TRACE_STATE("Stream %d: TCP_ST_LAST_ACK\n", stream->id); 683 } 684 AddtoControlList(mtcp, stream, cur_ts); 685 } else { 686 stream->sndvar->on_closeq_int = FALSE; 687 TRACE_ERROR("Already closed connection!\n"); 688 } 689 } 690 691 /* reset handling */ 692 while ((stream = StreamDequeue(mtcp->resetq))) { 693 stream->sndvar->on_resetq = FALSE; 694 695 if (g_config.mos->tcp_timeout > 0) 696 RemoveFromTimeoutList(mtcp, stream); 697 698 if (stream->have_reset) { 699 if (stream->state != TCP_ST_CLOSED_RSVD) { 700 stream->close_reason = TCP_RESET; 701 stream->state = TCP_ST_CLOSED_RSVD; 702 stream->cb_events |= MOS_ON_TCP_STATE_CHANGE; 703 TRACE_STATE("Stream %d: TCP_ST_CLOSED_RSVD\n", stream->id); 704 DestroyTCPStream(mtcp, stream); 705 } else { 706 TRACE_ERROR("Stream already closed.\n"); 707 } 708 709 } else if (stream->sndvar->on_control_list || 710 stream->sndvar->on_send_list || stream->sndvar->on_ack_list) { 711 /* wait until all the queues are flushed */ 712 stream->sndvar->on_resetq_int = TRUE; 713 StreamInternalEnqueue(mtcp->resetq_int, stream); 714 715 } else { 716 if (stream->state != TCP_ST_CLOSED_RSVD) { 717 stream->close_reason = TCP_ACTIVE_CLOSE; 718 stream->state = TCP_ST_CLOSED_RSVD; 719 stream->cb_events |= MOS_ON_TCP_STATE_CHANGE; 720 TRACE_STATE("Stream %d: TCP_ST_CLOSED_RSVD\n", stream->id); 721 AddtoControlList(mtcp, stream, cur_ts); 722 } else { 723 TRACE_ERROR("Stream already closed.\n"); 724 } 725 } 726 } 727 TRACE_ROUND("Handling reset connections. cnt: %d\n", cnt); 728 729 cnt = 0; 730 max_cnt = mtcp->resetq_int->count; 731 while (cnt++ < max_cnt) { 732 stream = StreamInternalDequeue(mtcp->resetq_int); 733 734 if (stream->sndvar->on_control_list || 735 stream->sndvar->on_send_list || stream->sndvar->on_ack_list) { 736 /* wait until all the queues are flushed */ 737 StreamInternalEnqueue(mtcp->resetq_int, stream); 738 739 } else { 740 stream->sndvar->on_resetq_int = FALSE; 741 742 if (stream->state != TCP_ST_CLOSED_RSVD) { 743 stream->close_reason = TCP_ACTIVE_CLOSE; 744 stream->state = TCP_ST_CLOSED_RSVD; 745 stream->cb_events |= MOS_ON_TCP_STATE_CHANGE; 746 TRACE_STATE("Stream %d: TCP_ST_CLOSED_RSVD\n", stream->id); 747 AddtoControlList(mtcp, stream, cur_ts); 748 } else { 749 TRACE_ERROR("Stream already closed.\n"); 750 } 751 } 752 } 753 754 /* destroy streams in destroyq */ 755 while ((stream = StreamDequeue(mtcp->destroyq))) { 756 DestroyTCPStream(mtcp, stream); 757 } 758 759 mtcp->wakeup_flag = FALSE; 760 } 761 /*----------------------------------------------------------------------------*/ 762 static inline void 763 WritePacketsToChunks(mtcp_manager_t mtcp, uint32_t cur_ts) 764 { 765 int thresh = g_config.mos->max_concurrency; 766 int i; 767 768 /* Set the threshold to g_config.mos->max_concurrency to send ACK immediately */ 769 /* Otherwise, set to appropriate value (e.g. thresh) */ 770 assert(mtcp->g_sender != NULL); 771 if (mtcp->g_sender->control_list_cnt) 772 WriteTCPControlList(mtcp, mtcp->g_sender, cur_ts, thresh); 773 if (mtcp->g_sender->ack_list_cnt) 774 WriteTCPACKList(mtcp, mtcp->g_sender, cur_ts, thresh); 775 if (mtcp->g_sender->send_list_cnt) 776 WriteTCPDataList(mtcp, mtcp->g_sender, cur_ts, thresh); 777 778 for (i = 0; i < g_config.mos->netdev_table->num; i++) { 779 assert(mtcp->n_sender[i] != NULL); 780 if (mtcp->n_sender[i]->control_list_cnt) 781 WriteTCPControlList(mtcp, mtcp->n_sender[i], cur_ts, thresh); 782 if (mtcp->n_sender[i]->ack_list_cnt) 783 WriteTCPACKList(mtcp, mtcp->n_sender[i], cur_ts, thresh); 784 if (mtcp->n_sender[i]->send_list_cnt) 785 WriteTCPDataList(mtcp, mtcp->n_sender[i], cur_ts, thresh); 786 } 787 } 788 /*----------------------------------------------------------------------------*/ 789 #if TESTING 790 static int 791 DestroyRemainingFlows(mtcp_manager_t mtcp) 792 { 793 struct hashtable *ht = mtcp->tcp_flow_table; 794 tcp_stream *walk; 795 int cnt, i; 796 797 cnt = 0; 798 799 thread_printf(mtcp, mtcp->log_fp, 800 "CPU %d: Flushing remaining flows.\n", mtcp->ctx->cpu); 801 802 for (i = 0; i < NUM_BINS; i++) { 803 TAILQ_FOREACH(walk, &ht->ht_table[i], rcvvar->he_link) { 804 thread_printf(mtcp, mtcp->log_fp, 805 "CPU %d: Destroying stream %d\n", mtcp->ctx->cpu, walk->id); 806 #ifdef DUMP_STREAM 807 DumpStream(mtcp, walk); 808 #endif 809 DestroyTCPStream(mtcp, walk); 810 cnt++; 811 } 812 } 813 814 return cnt; 815 } 816 #endif 817 /*----------------------------------------------------------------------------*/ 818 static void 819 InterruptApplication(mtcp_manager_t mtcp) 820 { 821 /* interrupt if the mtcp_epoll_wait() is waiting */ 822 if (mtcp->ep) { 823 pthread_mutex_lock(&mtcp->ep->epoll_lock); 824 if (mtcp->ep->waiting) { 825 pthread_cond_signal(&mtcp->ep->epoll_cond); 826 } 827 pthread_mutex_unlock(&mtcp->ep->epoll_lock); 828 } 829 /* interrupt if the accept() is waiting */ 830 if (mtcp->listener) { 831 if (mtcp->listener->socket) { 832 pthread_mutex_lock(&mtcp->listener->accept_lock); 833 if (!(mtcp->listener->socket->opts & MTCP_NONBLOCK)) { 834 pthread_cond_signal(&mtcp->listener->accept_cond); 835 } 836 pthread_mutex_unlock(&mtcp->listener->accept_lock); 837 } 838 } 839 } 840 /*----------------------------------------------------------------------------*/ 841 void 842 RunPassiveLoop(mtcp_manager_t mtcp) 843 { 844 sem_wait(&g_done_sem[mtcp->ctx->cpu]); 845 sem_destroy(&g_done_sem[mtcp->ctx->cpu]); 846 return; 847 } 848 /*----------------------------------------------------------------------------*/ 849 static void 850 RunMainLoop(struct mtcp_thread_context *ctx) 851 { 852 mtcp_manager_t mtcp = ctx->mtcp_manager; 853 int i; 854 int recv_cnt; 855 int rx_inf, tx_inf; 856 struct timeval cur_ts = {0}; 857 uint32_t ts, ts_prev; 858 859 #if TIME_STAT 860 struct timeval prev_ts, processing_ts, tcheck_ts, 861 epoll_ts, handle_ts, xmit_ts, select_ts; 862 #endif 863 int thresh; 864 865 gettimeofday(&cur_ts, NULL); 866 867 TRACE_DBG("CPU %d: mtcp thread running.\n", ctx->cpu); 868 869 #if TIME_STAT 870 prev_ts = cur_ts; 871 InitStatCounter(&mtcp->rtstat.round); 872 InitStatCounter(&mtcp->rtstat.processing); 873 InitStatCounter(&mtcp->rtstat.tcheck); 874 InitStatCounter(&mtcp->rtstat.epoll); 875 InitStatCounter(&mtcp->rtstat.handle); 876 InitStatCounter(&mtcp->rtstat.xmit); 877 InitStatCounter(&mtcp->rtstat.select); 878 #endif 879 880 ts = ts_prev = 0; 881 while ((!ctx->done || mtcp->flow_cnt) && !ctx->exit) { 882 883 STAT_COUNT(mtcp->runstat.rounds); 884 recv_cnt = 0; 885 gettimeofday(&cur_ts, NULL); 886 #if TIME_STAT 887 /* measure the inter-round delay */ 888 UpdateStatCounter(&mtcp->rtstat.round, TimeDiffUs(&cur_ts, &prev_ts)); 889 prev_ts = cur_ts; 890 #endif 891 892 ts = TIMEVAL_TO_TS(&cur_ts); 893 mtcp->cur_ts = ts; 894 895 for (rx_inf = 0; rx_inf < g_config.mos->netdev_table->num; rx_inf++) { 896 897 recv_cnt = mtcp->iom->recv_pkts(ctx, rx_inf); 898 STAT_COUNT(mtcp->runstat.rounds_rx_try); 899 900 for (i = 0; i < recv_cnt; i++) { 901 uint16_t len; 902 uint8_t *pktbuf; 903 pktbuf = mtcp->iom->get_rptr(mtcp->ctx, rx_inf, i, &len); 904 ProcessPacket(mtcp, rx_inf, i, ts, pktbuf, len); 905 } 906 907 } 908 STAT_COUNT(mtcp->runstat.rounds_rx); 909 910 #if TIME_STAT 911 gettimeofday(&processing_ts, NULL); 912 UpdateStatCounter(&mtcp->rtstat.processing, 913 TimeDiffUs(&processing_ts, &cur_ts)); 914 #endif /* TIME_STAT */ 915 916 /* Handle user defined timeout */ 917 struct timer *walk, *tmp; 918 for (walk = TAILQ_FIRST(&mtcp->timer_list); walk != NULL; walk = tmp) { 919 tmp = TAILQ_NEXT(walk, timer_link); 920 if (TIMEVAL_LT(&cur_ts, &walk->exp)) 921 break; 922 923 struct mtcp_context mctx = {.cpu = ctx->cpu}; 924 walk->cb(&mctx, walk->id, 0, 0 /* FIXME */, NULL); 925 DelTimer(mtcp, walk); 926 } 927 928 /* interaction with application */ 929 if (mtcp->flow_cnt > 0) { 930 931 /* check retransmission timeout and timewait expire */ 932 #if 0 933 thresh = (int)mtcp->flow_cnt / (TS_TO_USEC(PER_STREAM_TCHECK)); 934 assert(thresh >= 0); 935 if (thresh == 0) 936 thresh = 1; 937 if (recv_cnt > 0 && thresh > recv_cnt) 938 thresh = recv_cnt; 939 #else 940 thresh = g_config.mos->max_concurrency; 941 #endif 942 943 /* Eunyoung, you may fix this later 944 * if there is no rcv packet, we will send as much as possible 945 */ 946 if (thresh == -1) 947 thresh = g_config.mos->max_concurrency; 948 949 CheckRtmTimeout(mtcp, ts, thresh); 950 CheckTimewaitExpire(mtcp, ts, thresh); 951 952 if (g_config.mos->tcp_timeout > 0 && ts != ts_prev) { 953 CheckConnectionTimeout(mtcp, ts, thresh); 954 } 955 956 #if TIME_STAT 957 } 958 gettimeofday(&tcheck_ts, NULL); 959 UpdateStatCounter(&mtcp->rtstat.tcheck, 960 TimeDiffUs(&tcheck_ts, &processing_ts)); 961 962 if (mtcp->flow_cnt > 0) { 963 #endif /* TIME_STAT */ 964 965 } 966 967 /* 968 * before flushing epoll events, call monitor events for 969 * all registered `read` events 970 */ 971 if (mtcp->num_msp > 0) 972 /* call this when only a standalone monitor is running */ 973 FlushMonitorReadEvents(mtcp); 974 975 /* if epoll is in use, flush all the queued events */ 976 if (mtcp->ep) { 977 FlushBufferedReadEvents(mtcp); 978 FlushEpollEvents(mtcp, ts); 979 } 980 #if TIME_STAT 981 gettimeofday(&epoll_ts, NULL); 982 UpdateStatCounter(&mtcp->rtstat.epoll, 983 TimeDiffUs(&epoll_ts, &tcheck_ts)); 984 #endif /* TIME_STAT */ 985 986 if (end_app_exists && mtcp->flow_cnt > 0) { 987 /* handle stream queues */ 988 HandleApplicationCalls(mtcp, ts); 989 } 990 991 #if TIME_STAT 992 gettimeofday(&handle_ts, NULL); 993 UpdateStatCounter(&mtcp->rtstat.handle, 994 TimeDiffUs(&handle_ts, &epoll_ts)); 995 #endif /* TIME_STAT */ 996 997 WritePacketsToChunks(mtcp, ts); 998 999 /* send packets from write buffer */ 1000 /* Send until tx is available */ 1001 int num_dev = g_config.mos->netdev_table->num; 1002 if (likely(mtcp->iom->send_pkts != NULL)) 1003 for (tx_inf = 0; tx_inf < num_dev; tx_inf++) { 1004 mtcp->iom->send_pkts(ctx, tx_inf); 1005 } 1006 1007 #if TIME_STAT 1008 gettimeofday(&xmit_ts, NULL); 1009 UpdateStatCounter(&mtcp->rtstat.xmit, 1010 TimeDiffUs(&xmit_ts, &handle_ts)); 1011 #endif /* TIME_STAT */ 1012 1013 if (ts != ts_prev) { 1014 ts_prev = ts; 1015 #ifdef NETSTAT 1016 if (ctx->cpu == printer) { 1017 #ifdef RUN_ARP 1018 ARPTimer(mtcp, ts); 1019 #endif 1020 #ifdef NETSTAT 1021 PrintNetworkStats(mtcp, ts); 1022 #endif 1023 } 1024 #endif /* NETSTAT */ 1025 } 1026 1027 if (mtcp->iom->select) 1028 mtcp->iom->select(ctx); 1029 1030 if (ctx->interrupt) { 1031 InterruptApplication(mtcp); 1032 } 1033 } 1034 1035 #if TESTING 1036 DestroyRemainingFlows(mtcp); 1037 #endif 1038 1039 TRACE_DBG("MTCP thread %d out of main loop.\n", ctx->cpu); 1040 /* flush logs */ 1041 flush_log_data(mtcp); 1042 TRACE_DBG("MTCP thread %d flushed logs.\n", ctx->cpu); 1043 InterruptApplication(mtcp); 1044 TRACE_INFO("MTCP thread %d finished.\n", ctx->cpu); 1045 } 1046 /*----------------------------------------------------------------------------*/ 1047 struct mtcp_sender * 1048 CreateMTCPSender(int ifidx) 1049 { 1050 struct mtcp_sender *sender; 1051 1052 sender = (struct mtcp_sender *)calloc(1, sizeof(struct mtcp_sender)); 1053 if (!sender) { 1054 return NULL; 1055 } 1056 1057 sender->ifidx = ifidx; 1058 1059 TAILQ_INIT(&sender->control_list); 1060 TAILQ_INIT(&sender->send_list); 1061 TAILQ_INIT(&sender->ack_list); 1062 1063 sender->control_list_cnt = 0; 1064 sender->send_list_cnt = 0; 1065 sender->ack_list_cnt = 0; 1066 1067 return sender; 1068 } 1069 /*----------------------------------------------------------------------------*/ 1070 void 1071 DestroyMTCPSender(struct mtcp_sender *sender) 1072 { 1073 free(sender); 1074 } 1075 /*----------------------------------------------------------------------------*/ 1076 static mtcp_manager_t 1077 InitializeMTCPManager(struct mtcp_thread_context* ctx) 1078 { 1079 mtcp_manager_t mtcp; 1080 char log_name[MAX_FILE_NAME]; 1081 int i; 1082 1083 posix_seq_srand((unsigned)pthread_self()); 1084 1085 mtcp = (mtcp_manager_t)calloc(1, sizeof(struct mtcp_manager)); 1086 if (!mtcp) { 1087 perror("malloc"); 1088 TRACE_ERROR("Failed to allocate mtcp_manager.\n"); 1089 return NULL; 1090 } 1091 g_mtcp[ctx->cpu] = mtcp; 1092 1093 mtcp->tcp_flow_table = CreateHashtable(); 1094 if (!mtcp->tcp_flow_table) { 1095 CTRACE_ERROR("Falied to allocate tcp flow table.\n"); 1096 return NULL; 1097 } 1098 1099 #ifdef HUGEPAGE 1100 #define IS_HUGEPAGE 1 1101 #else 1102 #define IS_HUGEPAGE 0 1103 #endif 1104 if (mon_app_exists) { 1105 /* initialize event callback */ 1106 #ifdef NEWEV 1107 InitEvent(mtcp); 1108 #else 1109 InitEvent(mtcp, NUM_EV_TABLE); 1110 #endif 1111 } 1112 1113 if (!(mtcp->bufseg_pool = MPCreate(sizeof(tcpbufseg_t), 1114 sizeof(tcpbufseg_t) * g_config.mos->max_concurrency * 1115 ((g_config.mos->rmem_size - 1) / UNITBUFSIZE + 1), 0))) { 1116 TRACE_ERROR("Failed to allocate ev_table pool\n"); 1117 exit(0); 1118 } 1119 if (!(mtcp->sockent_pool = MPCreate(sizeof(struct sockent), 1120 sizeof(struct sockent) * g_config.mos->max_concurrency * 3, 0))) { 1121 TRACE_ERROR("Failed to allocate ev_table pool\n"); 1122 exit(0); 1123 } 1124 #ifdef USE_TIMER_POOL 1125 if (!(mtcp->timer_pool = MPCreate(sizeof(struct timer), 1126 sizeof(struct timer) * g_config.mos->max_concurrency * 10, 0))) { 1127 TRACE_ERROR("Failed to allocate ev_table pool\n"); 1128 exit(0); 1129 } 1130 #endif 1131 mtcp->flow_pool = MPCreate(sizeof(tcp_stream), 1132 sizeof(tcp_stream) * g_config.mos->max_concurrency, IS_HUGEPAGE); 1133 if (!mtcp->flow_pool) { 1134 CTRACE_ERROR("Failed to allocate tcp flow pool.\n"); 1135 return NULL; 1136 } 1137 mtcp->rv_pool = MPCreate(sizeof(struct tcp_recv_vars), 1138 sizeof(struct tcp_recv_vars) * g_config.mos->max_concurrency, IS_HUGEPAGE); 1139 if (!mtcp->rv_pool) { 1140 CTRACE_ERROR("Failed to allocate tcp recv variable pool.\n"); 1141 return NULL; 1142 } 1143 mtcp->sv_pool = MPCreate(sizeof(struct tcp_send_vars), 1144 sizeof(struct tcp_send_vars) * g_config.mos->max_concurrency, IS_HUGEPAGE); 1145 if (!mtcp->sv_pool) { 1146 CTRACE_ERROR("Failed to allocate tcp send variable pool.\n"); 1147 return NULL; 1148 } 1149 1150 mtcp->rbm_snd = SBManagerCreate(g_config.mos->wmem_size, g_config.mos->no_ring_buffers, 1151 g_config.mos->max_concurrency); 1152 if (!mtcp->rbm_snd) { 1153 CTRACE_ERROR("Failed to create send ring buffer.\n"); 1154 return NULL; 1155 } 1156 1157 mtcp->smap = (socket_map_t)calloc(g_config.mos->max_concurrency, sizeof(struct socket_map)); 1158 if (!mtcp->smap) { 1159 perror("calloc"); 1160 CTRACE_ERROR("Failed to allocate memory for stream map.\n"); 1161 return NULL; 1162 } 1163 1164 if (mon_app_exists) { 1165 mtcp->msmap = (socket_map_t)calloc(g_config.mos->max_concurrency, sizeof(struct socket_map)); 1166 if (!mtcp->msmap) { 1167 perror("calloc"); 1168 CTRACE_ERROR("Failed to allocate memory for monitor stream map.\n"); 1169 return NULL; 1170 } 1171 1172 for (i = 0; i < g_config.mos->max_concurrency; i++) { 1173 mtcp->msmap[i].monitor_stream = calloc(1, sizeof(struct mon_stream)); 1174 if (!mtcp->msmap[i].monitor_stream) { 1175 perror("calloc"); 1176 CTRACE_ERROR("Failed to allocate memory for monitr stream map\n"); 1177 return NULL; 1178 } 1179 } 1180 } 1181 1182 TAILQ_INIT(&mtcp->timer_list); 1183 TAILQ_INIT(&mtcp->monitors); 1184 1185 TAILQ_INIT(&mtcp->free_smap); 1186 for (i = 0; i < g_config.mos->max_concurrency; i++) { 1187 mtcp->smap[i].id = i; 1188 mtcp->smap[i].socktype = MOS_SOCK_UNUSED; 1189 memset(&mtcp->smap[i].saddr, 0, sizeof(struct sockaddr_in)); 1190 mtcp->smap[i].stream = NULL; 1191 TAILQ_INSERT_TAIL(&mtcp->free_smap, &mtcp->smap[i], link); 1192 } 1193 1194 if (mon_app_exists) { 1195 TAILQ_INIT(&mtcp->free_msmap); 1196 for (i = 0; i < g_config.mos->max_concurrency; i++) { 1197 mtcp->msmap[i].id = i; 1198 mtcp->msmap[i].socktype = MOS_SOCK_UNUSED; 1199 memset(&mtcp->msmap[i].saddr, 0, sizeof(struct sockaddr_in)); 1200 TAILQ_INSERT_TAIL(&mtcp->free_msmap, &mtcp->msmap[i], link); 1201 } 1202 } 1203 1204 mtcp->ctx = ctx; 1205 mtcp->ep = NULL; 1206 1207 snprintf(log_name, MAX_FILE_NAME, "%s/"LOG_FILE_NAME"_%d", 1208 g_config.mos->mos_log, ctx->cpu); 1209 mtcp->log_fp = fopen(log_name, "w+"); 1210 if (!mtcp->log_fp) { 1211 perror("fopen"); 1212 CTRACE_ERROR("Failed to create file for logging. (%s)\n", log_name); 1213 return NULL; 1214 } 1215 mtcp->sp_fd = g_logctx[ctx->cpu]->pair_sp_fd; 1216 mtcp->logger = g_logctx[ctx->cpu]; 1217 1218 mtcp->connectq = CreateStreamQueue(BACKLOG_SIZE); 1219 if (!mtcp->connectq) { 1220 CTRACE_ERROR("Failed to create connect queue.\n"); 1221 return NULL; 1222 } 1223 mtcp->sendq = CreateStreamQueue(g_config.mos->max_concurrency); 1224 if (!mtcp->sendq) { 1225 CTRACE_ERROR("Failed to create send queue.\n"); 1226 return NULL; 1227 } 1228 mtcp->ackq = CreateStreamQueue(g_config.mos->max_concurrency); 1229 if (!mtcp->ackq) { 1230 CTRACE_ERROR("Failed to create ack queue.\n"); 1231 return NULL; 1232 } 1233 mtcp->closeq = CreateStreamQueue(g_config.mos->max_concurrency); 1234 if (!mtcp->closeq) { 1235 CTRACE_ERROR("Failed to create close queue.\n"); 1236 return NULL; 1237 } 1238 mtcp->closeq_int = CreateInternalStreamQueue(g_config.mos->max_concurrency); 1239 if (!mtcp->closeq_int) { 1240 CTRACE_ERROR("Failed to create close queue.\n"); 1241 return NULL; 1242 } 1243 mtcp->resetq = CreateStreamQueue(g_config.mos->max_concurrency); 1244 if (!mtcp->resetq) { 1245 CTRACE_ERROR("Failed to create reset queue.\n"); 1246 return NULL; 1247 } 1248 mtcp->resetq_int = CreateInternalStreamQueue(g_config.mos->max_concurrency); 1249 if (!mtcp->resetq_int) { 1250 CTRACE_ERROR("Failed to create reset queue.\n"); 1251 return NULL; 1252 } 1253 mtcp->destroyq = CreateStreamQueue(g_config.mos->max_concurrency); 1254 if (!mtcp->destroyq) { 1255 CTRACE_ERROR("Failed to create destroy queue.\n"); 1256 return NULL; 1257 } 1258 1259 mtcp->g_sender = CreateMTCPSender(-1); 1260 if (!mtcp->g_sender) { 1261 CTRACE_ERROR("Failed to create global sender structure.\n"); 1262 return NULL; 1263 } 1264 for (i = 0; i < g_config.mos->netdev_table->num; i++) { 1265 mtcp->n_sender[i] = CreateMTCPSender(i); 1266 if (!mtcp->n_sender[i]) { 1267 CTRACE_ERROR("Failed to create per-nic sender structure.\n"); 1268 return NULL; 1269 } 1270 } 1271 1272 mtcp->rto_store = InitRTOHashstore(); 1273 TAILQ_INIT(&mtcp->timewait_list); 1274 TAILQ_INIT(&mtcp->timeout_list); 1275 1276 return mtcp; 1277 } 1278 /*----------------------------------------------------------------------------*/ 1279 static void * 1280 MTCPRunThread(void *arg) 1281 { 1282 mctx_t mctx = (mctx_t)arg; 1283 int cpu = mctx->cpu; 1284 int working; 1285 struct mtcp_manager *mtcp; 1286 struct mtcp_thread_context *ctx; 1287 1288 /* affinitize the thread to this core first */ 1289 mtcp_core_affinitize(cpu); 1290 1291 /* memory alloc after core affinitization would use local memory 1292 most time */ 1293 ctx = calloc(1, sizeof(*ctx)); 1294 if (!ctx) { 1295 perror("calloc"); 1296 TRACE_ERROR("Failed to calloc mtcp context.\n"); 1297 exit(-1); 1298 } 1299 ctx->thread = pthread_self(); 1300 ctx->cpu = cpu; 1301 mtcp = ctx->mtcp_manager = InitializeMTCPManager(ctx); 1302 if (!mtcp) { 1303 TRACE_ERROR("Failed to initialize mtcp manager.\n"); 1304 exit(-1); 1305 } 1306 1307 /* assign mtcp context's underlying I/O module */ 1308 mtcp->iom = current_iomodule_func; 1309 1310 /* I/O initializing */ 1311 if (mtcp->iom->init_handle) 1312 mtcp->iom->init_handle(ctx); 1313 1314 if (pthread_mutex_init(&ctx->flow_pool_lock, NULL)) { 1315 perror("pthread_mutex_init of ctx->flow_pool_lock\n"); 1316 exit(-1); 1317 } 1318 1319 if (pthread_mutex_init(&ctx->socket_pool_lock, NULL)) { 1320 perror("pthread_mutex_init of ctx->socket_pool_lock\n"); 1321 exit(-1); 1322 } 1323 1324 SQ_LOCK_INIT(&ctx->connect_lock, "ctx->connect_lock", exit(-1)); 1325 SQ_LOCK_INIT(&ctx->close_lock, "ctx->close_lock", exit(-1)); 1326 SQ_LOCK_INIT(&ctx->reset_lock, "ctx->reset_lock", exit(-1)); 1327 SQ_LOCK_INIT(&ctx->sendq_lock, "ctx->sendq_lock", exit(-1)); 1328 SQ_LOCK_INIT(&ctx->ackq_lock, "ctx->ackq_lock", exit(-1)); 1329 SQ_LOCK_INIT(&ctx->destroyq_lock, "ctx->destroyq_lock", exit(-1)); 1330 1331 /* remember this context pointer for signal processing */ 1332 g_pctx[cpu] = ctx; 1333 mlockall(MCL_CURRENT); 1334 1335 // attach (nic device, queue) 1336 working = AttachDevice(ctx); 1337 if (working != 0) { 1338 sem_post(&g_init_sem[ctx->cpu]); 1339 TRACE_DBG("MTCP thread %d finished. Not attached any device\n", ctx->cpu); 1340 pthread_exit(NULL); 1341 } 1342 1343 TRACE_DBG("CPU %d: initialization finished.\n", cpu); 1344 sem_post(&g_init_sem[ctx->cpu]); 1345 1346 /* start the main loop */ 1347 RunMainLoop(ctx); 1348 1349 TRACE_DBG("MTCP thread %d finished.\n", ctx->cpu); 1350 1351 /* signaling mTCP thread is done */ 1352 sem_post(&g_done_sem[mctx->cpu]); 1353 1354 //pthread_exit(NULL); 1355 return 0; 1356 } 1357 /*----------------------------------------------------------------------------*/ 1358 #ifdef ENABLE_DPDK 1359 static int MTCPDPDKRunThread(void *arg) 1360 { 1361 MTCPRunThread(arg); 1362 return 0; 1363 } 1364 #endif /* !ENABLE_DPDK */ 1365 /*----------------------------------------------------------------------------*/ 1366 mctx_t 1367 mtcp_create_context(int cpu) 1368 { 1369 mctx_t mctx; 1370 int ret; 1371 1372 if (cpu >= g_config.mos->num_cores) { 1373 TRACE_ERROR("Failed initialize new mtcp context. " 1374 "Requested cpu id %d exceed the number of cores %d configured to use.\n", 1375 cpu, g_config.mos->num_cores); 1376 return NULL; 1377 } 1378 1379 /* check if mtcp_create_context() was already initialized */ 1380 if (g_logctx[cpu] != NULL) { 1381 TRACE_ERROR("%s was already initialized before!\n", 1382 __FUNCTION__); 1383 return NULL; 1384 } 1385 1386 ret = sem_init(&g_init_sem[cpu], 0, 0); 1387 if (ret) { 1388 TRACE_ERROR("Failed initialize init_sem.\n"); 1389 return NULL; 1390 } 1391 1392 ret = sem_init(&g_done_sem[cpu], 0, 0); 1393 if (ret) { 1394 TRACE_ERROR("Failed initialize done_sem.\n"); 1395 return NULL; 1396 } 1397 1398 mctx = (mctx_t)calloc(1, sizeof(struct mtcp_context)); 1399 if (!mctx) { 1400 TRACE_ERROR("Failed to allocate memory for mtcp_context.\n"); 1401 return NULL; 1402 } 1403 mctx->cpu = cpu; 1404 g_ctx[cpu] = mctx; 1405 1406 /* initialize logger */ 1407 g_logctx[cpu] = (struct log_thread_context *) 1408 calloc(1, sizeof(struct log_thread_context)); 1409 if (!g_logctx[cpu]) { 1410 perror("malloc"); 1411 TRACE_ERROR("Failed to allocate memory for log thread context.\n"); 1412 return NULL; 1413 } 1414 InitLogThreadContext(g_logctx[cpu], cpu); 1415 if (pthread_create(&log_thread[cpu], 1416 NULL, ThreadLogMain, (void *)g_logctx[cpu])) { 1417 perror("pthread_create"); 1418 TRACE_ERROR("Failed to create log thread\n"); 1419 return NULL; 1420 } 1421 1422 /* use rte_eal_remote_launch() for DPDK 1423 (worker/slave threads are already initialized by rte_eal_init()) */ 1424 #ifdef ENABLE_DPDK 1425 /* Wake up mTCP threads (wake up I/O threads) */ 1426 if (current_iomodule_func == &dpdk_module_func) { 1427 int master; 1428 master = rte_get_master_lcore(); 1429 if (master == cpu) { 1430 lcore_config[master].ret = 0; 1431 lcore_config[master].state = FINISHED; 1432 if (pthread_create(&g_thread[cpu], 1433 NULL, MTCPRunThread, (void *)mctx) != 0) { 1434 TRACE_ERROR("pthread_create of mtcp thread failed!\n"); 1435 return NULL; 1436 } 1437 } else 1438 rte_eal_remote_launch(MTCPDPDKRunThread, mctx, cpu); 1439 } else 1440 #endif /* !ENABLE_DPDK */ 1441 { 1442 if (pthread_create(&g_thread[cpu], 1443 NULL, MTCPRunThread, (void *)mctx) != 0) { 1444 TRACE_ERROR("pthread_create of mtcp thread failed!\n"); 1445 return NULL; 1446 } 1447 } 1448 1449 sem_wait(&g_init_sem[cpu]); 1450 sem_destroy(&g_init_sem[cpu]); 1451 1452 running[cpu] = TRUE; 1453 1454 #ifdef NETSTAT 1455 #if NETSTAT_TOTAL 1456 if (printer < 0) { 1457 printer = cpu; 1458 TRACE_INFO("CPU %d is in charge of printing stats.\n", printer); 1459 } 1460 #endif 1461 #endif 1462 1463 return mctx; 1464 } 1465 /*----------------------------------------------------------------------------*/ 1466 int 1467 mtcp_destroy_context(mctx_t mctx) 1468 { 1469 struct mtcp_thread_context *ctx = g_pctx[mctx->cpu]; 1470 if (ctx != NULL) 1471 ctx->done = 1; 1472 1473 struct mtcp_context m; 1474 m.cpu = mctx->cpu; 1475 mtcp_free_context(&m); 1476 1477 free(mctx); 1478 1479 return 0; 1480 } 1481 /*----------------------------------------------------------------------------*/ 1482 void 1483 mtcp_free_context(mctx_t mctx) 1484 { 1485 struct mtcp_thread_context *ctx = g_pctx[mctx->cpu]; 1486 struct mtcp_manager *mtcp = ctx->mtcp_manager; 1487 struct log_thread_context *log_ctx = mtcp->logger; 1488 int ret, i; 1489 1490 TRACE_DBG("CPU %d: mtcp_free_context()\n", mctx->cpu); 1491 1492 if (g_pctx[mctx->cpu] == NULL) return; 1493 1494 /* close all stream sockets that are still open */ 1495 if (!ctx->exit) { 1496 for (i = 0; i < g_config.mos->max_concurrency; i++) { 1497 if (mtcp->smap[i].socktype == MOS_SOCK_STREAM) { 1498 TRACE_DBG("Closing remaining socket %d (%s)\n", 1499 i, TCPStateToString(mtcp->smap[i].stream)); 1500 #ifdef DUMP_STREAM 1501 DumpStream(mtcp, mtcp->smap[i].stream); 1502 #endif 1503 mtcp_close(mctx, i); 1504 } 1505 } 1506 } 1507 1508 ctx->done = 1; 1509 ctx->exit = 1; 1510 1511 #ifdef ENABLE_DPDK 1512 if (current_iomodule_func == &dpdk_module_func) { 1513 int master = rte_get_master_lcore(); 1514 if (master == mctx->cpu) 1515 pthread_join(g_thread[mctx->cpu], NULL); 1516 else 1517 rte_eal_wait_lcore(mctx->cpu); 1518 } else 1519 #endif /* !ENABLE_DPDK */ 1520 { 1521 pthread_join(g_thread[mctx->cpu], NULL); 1522 } 1523 1524 TRACE_INFO("MTCP thread %d joined.\n", mctx->cpu); 1525 1526 running[mctx->cpu] = FALSE; 1527 1528 #ifdef NETSTAT 1529 #if NETSTAT_TOTAL 1530 if (printer == mctx->cpu) { 1531 for (i = 0; i < num_cpus; i++) { 1532 if (i != mctx->cpu && running[i]) { 1533 printer = i; 1534 break; 1535 } 1536 } 1537 } 1538 #endif 1539 #endif 1540 1541 log_ctx->done = 1; 1542 ret = write(log_ctx->pair_sp_fd, "F", 1); 1543 if (ret != 1) 1544 TRACE_ERROR("CPU %d: Fail to signal socket pair\n", mctx->cpu); 1545 1546 if ((ret = pthread_join(log_thread[ctx->cpu], NULL) != 0)) { 1547 TRACE_ERROR("pthread_join() returns error (errno = %s)\n", strerror(ret)); 1548 exit(-1); 1549 } 1550 1551 1552 fclose(mtcp->log_fp); 1553 TRACE_LOG("Log thread %d joined.\n", mctx->cpu); 1554 1555 if (mtcp->connectq) { 1556 DestroyStreamQueue(mtcp->connectq); 1557 mtcp->connectq = NULL; 1558 } 1559 if (mtcp->sendq) { 1560 DestroyStreamQueue(mtcp->sendq); 1561 mtcp->sendq = NULL; 1562 } 1563 if (mtcp->ackq) { 1564 DestroyStreamQueue(mtcp->ackq); 1565 mtcp->ackq = NULL; 1566 } 1567 if (mtcp->closeq) { 1568 DestroyStreamQueue(mtcp->closeq); 1569 mtcp->closeq = NULL; 1570 } 1571 if (mtcp->closeq_int) { 1572 DestroyInternalStreamQueue(mtcp->closeq_int); 1573 mtcp->closeq_int = NULL; 1574 } 1575 if (mtcp->resetq) { 1576 DestroyStreamQueue(mtcp->resetq); 1577 mtcp->resetq = NULL; 1578 } 1579 if (mtcp->resetq_int) { 1580 DestroyInternalStreamQueue(mtcp->resetq_int); 1581 mtcp->resetq_int = NULL; 1582 } 1583 if (mtcp->destroyq) { 1584 DestroyStreamQueue(mtcp->destroyq); 1585 mtcp->destroyq = NULL; 1586 } 1587 1588 DestroyMTCPSender(mtcp->g_sender); 1589 for (i = 0; i < g_config.mos->netdev_table->num; i++) { 1590 DestroyMTCPSender(mtcp->n_sender[i]); 1591 } 1592 1593 MPDestroy(mtcp->rv_pool); 1594 MPDestroy(mtcp->sv_pool); 1595 MPDestroy(mtcp->flow_pool); 1596 1597 if (mtcp->ap) { 1598 DestroyAddressPool(mtcp->ap); 1599 mtcp->ap = NULL; 1600 } 1601 1602 SQ_LOCK_DESTROY(&ctx->connect_lock); 1603 SQ_LOCK_DESTROY(&ctx->close_lock); 1604 SQ_LOCK_DESTROY(&ctx->reset_lock); 1605 SQ_LOCK_DESTROY(&ctx->sendq_lock); 1606 SQ_LOCK_DESTROY(&ctx->ackq_lock); 1607 SQ_LOCK_DESTROY(&ctx->destroyq_lock); 1608 1609 //TRACE_INFO("MTCP thread %d destroyed.\n", mctx->cpu); 1610 if (mtcp->iom->destroy_handle) 1611 mtcp->iom->destroy_handle(ctx); 1612 if (g_logctx[mctx->cpu]) { 1613 free(g_logctx[mctx->cpu]); 1614 g_logctx[mctx->cpu] = NULL; 1615 } 1616 free(ctx); 1617 g_pctx[mctx->cpu] = NULL; 1618 } 1619 /*----------------------------------------------------------------------------*/ 1620 mtcp_sighandler_t 1621 mtcp_register_signal(int signum, mtcp_sighandler_t handler) 1622 { 1623 mtcp_sighandler_t prev; 1624 1625 if (signum == SIGINT) { 1626 prev = app_signal_handler; 1627 app_signal_handler = handler; 1628 } else { 1629 if ((prev = signal(signum, handler)) == SIG_ERR) { 1630 perror("signal"); 1631 return SIG_ERR; 1632 } 1633 } 1634 1635 return prev; 1636 } 1637 /*----------------------------------------------------------------------------*/ 1638 int 1639 mtcp_getconf(struct mtcp_conf *conf) 1640 { 1641 int i, j; 1642 1643 if (!conf) { 1644 errno = EINVAL; 1645 return -1; 1646 } 1647 1648 conf->num_cores = g_config.mos->num_cores; 1649 conf->max_concurrency = g_config.mos->max_concurrency; 1650 conf->cpu_mask = g_config.mos->cpu_mask; 1651 1652 conf->rcvbuf_size = g_config.mos->rmem_size; 1653 conf->sndbuf_size = g_config.mos->wmem_size; 1654 1655 conf->tcp_timewait = g_config.mos->tcp_tw_interval; 1656 conf->tcp_timeout = g_config.mos->tcp_timeout; 1657 1658 i = 0; 1659 struct conf_block *bwalk; 1660 TAILQ_FOREACH(bwalk, &g_config.app_blkh, link) { 1661 struct app_conf *app_conf = (struct app_conf *)bwalk->conf; 1662 for (j = 0; j < app_conf->app_argc; j++) 1663 conf->app_argv[i][j] = app_conf->app_argv[j]; 1664 conf->app_argc[i] = app_conf->app_argc; 1665 conf->app_cpu_mask[i] = app_conf->cpu_mask; 1666 i++; 1667 } 1668 conf->num_app = i; 1669 1670 return 0; 1671 } 1672 /*----------------------------------------------------------------------------*/ 1673 int 1674 mtcp_setconf(const struct mtcp_conf *conf) 1675 { 1676 if (!conf) 1677 return -1; 1678 1679 g_config.mos->num_cores = conf->num_cores; 1680 g_config.mos->max_concurrency = conf->max_concurrency; 1681 1682 g_config.mos->rmem_size = conf->rcvbuf_size; 1683 g_config.mos->wmem_size = conf->sndbuf_size; 1684 1685 g_config.mos->tcp_tw_interval = conf->tcp_timewait; 1686 g_config.mos->tcp_timeout = conf->tcp_timeout; 1687 1688 TRACE_CONFIG("Configuration updated by mtcp_setconf().\n"); 1689 //PrintConfiguration(); 1690 1691 return 0; 1692 } 1693 /*----------------------------------------------------------------------------*/ 1694 int 1695 mtcp_init(const char *config_file) 1696 { 1697 int i; 1698 int ret; 1699 1700 if (geteuid()) { 1701 TRACE_CONFIG("[CAUTION] Run as root if mlock is necessary.\n"); 1702 #if defined(ENABLE_DPDK) || defined(ENABLE_NETMAP) 1703 TRACE_CONFIG("[CAUTION] Run the app as root!\n"); 1704 exit(EXIT_FAILURE); 1705 #endif 1706 } 1707 1708 /* getting cpu and NIC */ 1709 num_cpus = GetNumCPUs(); 1710 assert(num_cpus >= 1); 1711 for (i = 0; i < num_cpus; i++) { 1712 g_mtcp[i] = NULL; 1713 running[i] = FALSE; 1714 sigint_cnt[i] = 0; 1715 } 1716 1717 ret = LoadConfigurationUpperHalf(config_file); 1718 if (ret) { 1719 TRACE_CONFIG("Error occured while loading configuration.\n"); 1720 return -1; 1721 } 1722 1723 #if defined(ENABLE_PSIO) 1724 current_iomodule_func = &ps_module_func; 1725 #elif defined(ENABLE_DPDK) 1726 current_iomodule_func = &dpdk_module_func; 1727 #elif defined(ENABLE_PCAP) 1728 current_iomodule_func = &pcap_module_func; 1729 #elif defined(ENABLE_NETMAP) 1730 current_iomodule_func = &netmap_module_func; 1731 #endif 1732 1733 if (current_iomodule_func->load_module_upper_half) 1734 current_iomodule_func->load_module_upper_half(); 1735 1736 LoadConfigurationLowerHalf(); 1737 1738 //PrintConfiguration(); 1739 1740 for (i = 0; i < g_config.mos->netdev_table->num; i++) { 1741 ap[i] = CreateAddressPool(g_config.mos->netdev_table->ent[i]->ip_addr, 1); 1742 if (!ap[i]) { 1743 TRACE_CONFIG("Error occured while create address pool[%d]\n", 1744 i); 1745 return -1; 1746 } 1747 } 1748 1749 //PrintInterfaceInfo(); 1750 //PrintRoutingTable(); 1751 //PrintARPTable(); 1752 InitARPTable(); 1753 1754 if (signal(SIGUSR1, HandleSignal) == SIG_ERR) { 1755 perror("signal, SIGUSR1"); 1756 return -1; 1757 } 1758 if (signal(SIGINT, HandleSignal) == SIG_ERR) { 1759 perror("signal, SIGINT"); 1760 return -1; 1761 } 1762 app_signal_handler = NULL; 1763 1764 printf("load_module(): %p\n", current_iomodule_func); 1765 /* load system-wide io module specs */ 1766 if (current_iomodule_func->load_module_lower_half) 1767 current_iomodule_func->load_module_lower_half(); 1768 1769 GlobInitEvent(); 1770 1771 PrintConf(&g_config); 1772 1773 return 0; 1774 } 1775 /*----------------------------------------------------------------------------*/ 1776 int 1777 mtcp_destroy() 1778 { 1779 int i; 1780 1781 /* wait until all threads are closed */ 1782 /* 1783 for (i = 0; i < num_cpus; i++) { 1784 if (running[i]) { 1785 if (pthread_join(g_thread[i], NULL) != 0) 1786 return -1; 1787 } 1788 } 1789 */ 1790 1791 for (i = 0; i < g_config.mos->netdev_table->num; i++) 1792 DestroyAddressPool(ap[i]); 1793 1794 TRACE_INFO("All MTCP threads are joined.\n"); 1795 1796 return 0; 1797 } 1798 /*----------------------------------------------------------------------------*/ 1799