#ifndef _GNU_SOURCE
#define _GNU_SOURCE
#endif
#include <sched.h>
#include <unistd.h>
#include <sys/time.h>
#include <semaphore.h>
#include <sys/mman.h>
#include <signal.h>
#include <assert.h>
#include <string.h>
#include <errno.h>
#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>

#include "cpu.h"
#include "eth_in.h"
#include "fhash.h"
#include "tcp_send_buffer.h"
#include "tcp_ring_buffer.h"
#include "socket.h"
#include "eth_out.h"
#include "tcp.h"
#include "tcp_in.h"
#include "tcp_out.h"
#include "mtcp_api.h"
#include "eventpoll.h"
#include "logger.h"
#include "config.h"
#include "arp.h"
#include "ip_out.h"
#include "timer.h"
#include "debug.h"
#include "event_callback.h"
#include "tcp_rb.h"
#include "tcp_stream.h"
#include "io_module.h"

#ifdef ENABLE_DPDK
/* for launching rte thread */
#include <rte_launch.h>
#include <rte_lcore.h>
#endif /* ENABLE_DPDK */

#define PS_CHUNK_SIZE 64
#define RX_THRESH (PS_CHUNK_SIZE * 0.8)

#define ROUND_STAT FALSE
#define TIME_STAT FALSE
#define EVENT_STAT FALSE
#define TESTING FALSE

#define LOG_FILE_NAME "log"
#define MAX_FILE_NAME 1024

#define MAX(a, b) ((a)>(b)?(a):(b))
#define MIN(a, b) ((a)<(b)?(a):(b))

#define PER_STREAM_SLICE 0.1 // in ms
#define PER_STREAM_TCHECK 1 // in ms
#define PS_SELECT_TIMEOUT 100 // in us

#define GBPS(bytes) ((bytes) * 8.0 / (1000 * 1000 * 1000))

/*----------------------------------------------------------------------------*/
/* handlers for threads */
struct mtcp_thread_context *g_pctx[MAX_CPUS] = {0};
struct log_thread_context *g_logctx[MAX_CPUS] = {0};
/*----------------------------------------------------------------------------*/
static pthread_t g_thread[MAX_CPUS] = {0};
static pthread_t log_thread[MAX_CPUS] = {0};
/*----------------------------------------------------------------------------*/
static sem_t g_init_sem[MAX_CPUS];
static sem_t g_done_sem[MAX_CPUS];
static int running[MAX_CPUS] = {0};
/*----------------------------------------------------------------------------*/
mtcp_sighandler_t app_signal_handler;
static int sigint_cnt[MAX_CPUS] = {0};
static struct timespec sigint_ts[MAX_CPUS];
void mtcp_free_context(mctx_t mctx);
/*----------------------------------------------------------------------------*/
#ifdef NETSTAT
#if NETSTAT_TOTAL
static int printer = -1;
#if ROUND_STAT
#endif /* ROUND_STAT */
#endif /* NETSTAT_TOTAL */
#endif /* NETSTAT */
/*----------------------------------------------------------------------------*/
void
HandleSignal(int signal)
{
    int i = 0;

    if (signal == SIGINT) {
        FreeConfigResources();
#ifdef DARWIN
        int core = 0;
#else
        int core = sched_getcpu();
#endif
        struct timespec cur_ts;

        clock_gettime(CLOCK_REALTIME, &cur_ts);

        if (sigint_cnt[core] > 0 && cur_ts.tv_sec == sigint_ts[core].tv_sec) {
            /* second SIGINT within the same second: force exit */
            for (i = 0; i < g_config.mos->num_cores; i++) {
                if (running[i]) {
                    //exit(0);
                    g_pctx[i]->exit = TRUE;
                }
            }
        } else {
            for (i = 0; i < g_config.mos->num_cores; i++) {
                if (g_pctx[i])
                    g_pctx[i]->interrupt = TRUE;
            }
            if (!app_signal_handler) {
                for (i = 0; i < g_config.mos->num_cores; i++) {
                    if (running[i]) {
                        //exit(0);
                        g_pctx[i]->exit = TRUE;
                    }
                }
            }
        }
        sigint_cnt[core]++;
        clock_gettime(CLOCK_REALTIME, &sigint_ts[core]);
    }

    if (signal != SIGUSR1) {
        if (app_signal_handler) {
            app_signal_handler(signal);
        }
    }
}
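/*----------------------------------------------------------------------------*/
/*
 * NOTE (editor): HandleSignal() escalates on repeated SIGINT: the first
 * SIGINT sets each thread's `interrupt` flag (and is forwarded to the
 * application's handler, if one was registered), while a second SIGINT
 * arriving within the same second forces every running thread to exit.
 * A minimal registration sketch using mtcp_register_signal() from this
 * file (`app_cleanup` is a hypothetical application function):
 *
 *     static void app_cleanup(int signum)
 *     {
 *         // flush application state; a second Ctrl+C forces exit
 *     }
 *
 *     mtcp_register_signal(SIGINT, app_cleanup);
 */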
/*----------------------------------------------------------------------------*/
static int
AttachDevice(struct mtcp_thread_context* ctx)
{
    int working = -1;
    mtcp_manager_t mtcp = ctx->mtcp_manager;

    if (mtcp->iom->link_devices)
        working = mtcp->iom->link_devices(ctx);
    else
        return 0;

    return working;
}
/*----------------------------------------------------------------------------*/
#ifdef TIMESTAT
static inline void
InitStatCounter(struct stat_counter *counter)
{
    counter->cnt = 0;
    counter->sum = 0;
    counter->max = 0;
    counter->min = 0;
}
/*----------------------------------------------------------------------------*/
static inline void
UpdateStatCounter(struct stat_counter *counter, int64_t value)
{
    counter->cnt++;
    counter->sum += value;
    if (value > counter->max)
        counter->max = value;
    if (counter->min == 0 || value < counter->min)
        counter->min = value;
}
/*----------------------------------------------------------------------------*/
static inline uint64_t
GetAverageStat(struct stat_counter *counter)
{
    return counter->cnt ? (counter->sum / counter->cnt) : 0;
}
/*----------------------------------------------------------------------------*/
static inline int64_t
TimeDiffUs(struct timeval *t2, struct timeval *t1)
{
    /* cast the seconds delta before scaling to avoid 32-bit overflow */
    return (int64_t)(t2->tv_sec - t1->tv_sec) * 1000000 +
            (int64_t)(t2->tv_usec - t1->tv_usec);
}
/*----------------------------------------------------------------------------*/
#endif
#ifdef NETSTAT
static inline void
PrintThreadNetworkStats(mtcp_manager_t mtcp, struct net_stat *ns)
{
    int i;

    for (i = 0; i < g_config.mos->netdev_table->num; i++) {
        ns->rx_packets[i] = mtcp->nstat.rx_packets[i] - mtcp->p_nstat.rx_packets[i];
        ns->rx_errors[i] = mtcp->nstat.rx_errors[i] - mtcp->p_nstat.rx_errors[i];
        ns->rx_bytes[i] = mtcp->nstat.rx_bytes[i] - mtcp->p_nstat.rx_bytes[i];
        ns->tx_packets[i] = mtcp->nstat.tx_packets[i] - mtcp->p_nstat.tx_packets[i];
        ns->tx_drops[i] = mtcp->nstat.tx_drops[i] - mtcp->p_nstat.tx_drops[i];
        ns->tx_bytes[i] = mtcp->nstat.tx_bytes[i] - mtcp->p_nstat.tx_bytes[i];
#if NETSTAT_PERTHREAD
        if (g_config.mos->netdev_table->ent[i]->stat_print) {
            fprintf(stderr, "[CPU%2d] %s flows: %6u, "
                    "RX: %7llu(pps) (err: %5llu), %5.2lf(Gbps), "
                    "TX: %7llu(pps), %5.2lf(Gbps)\n",
                    mtcp->ctx->cpu,
                    g_config.mos->netdev_table->ent[i]->dev_name,
                    (unsigned)mtcp->flow_cnt,
                    (long long unsigned)ns->rx_packets[i],
                    (long long unsigned)ns->rx_errors[i],
                    GBPS(ns->rx_bytes[i]),
                    (long long unsigned)ns->tx_packets[i],
                    GBPS(ns->tx_bytes[i]));
        }
#endif
    }
    mtcp->p_nstat = mtcp->nstat;
}
/*----------------------------------------------------------------------------*/
#if ROUND_STAT
static inline void
PrintThreadRoundStats(mtcp_manager_t mtcp, struct run_stat *rs)
{
#define ROUND_DIV (1000)
    rs->rounds = mtcp->runstat.rounds - mtcp->p_runstat.rounds;
    rs->rounds_rx = mtcp->runstat.rounds_rx - mtcp->p_runstat.rounds_rx;
    rs->rounds_rx_try = mtcp->runstat.rounds_rx_try - mtcp->p_runstat.rounds_rx_try;
    rs->rounds_tx = mtcp->runstat.rounds_tx - mtcp->p_runstat.rounds_tx;
    rs->rounds_tx_try = mtcp->runstat.rounds_tx_try - mtcp->p_runstat.rounds_tx_try;
    rs->rounds_select = mtcp->runstat.rounds_select - mtcp->p_runstat.rounds_select;
    rs->rounds_select_rx = mtcp->runstat.rounds_select_rx - mtcp->p_runstat.rounds_select_rx;
    rs->rounds_select_tx = mtcp->runstat.rounds_select_tx - mtcp->p_runstat.rounds_select_tx;
    rs->rounds_select_intr = mtcp->runstat.rounds_select_intr - mtcp->p_runstat.rounds_select_intr;
    rs->rounds_twcheck = mtcp->runstat.rounds_twcheck - mtcp->p_runstat.rounds_twcheck;
    mtcp->p_runstat = mtcp->runstat;
#if NETSTAT_PERTHREAD
    fprintf(stderr, "[CPU%2d] Rounds: %4lluK, "
            "rx: %3lluK (try: %4lluK), tx: %3lluK (try: %4lluK), "
            "ps_select: %4llu (rx: %4llu, tx: %4llu, intr: %3llu)\n",
            mtcp->ctx->cpu, rs->rounds / ROUND_DIV,
            rs->rounds_rx / ROUND_DIV, rs->rounds_rx_try / ROUND_DIV,
            rs->rounds_tx / ROUND_DIV, rs->rounds_tx_try / ROUND_DIV,
            rs->rounds_select,
            rs->rounds_select_rx, rs->rounds_select_tx, rs->rounds_select_intr);
#endif
}
#endif /* ROUND_STAT */
/*----------------------------------------------------------------------------*/
#if TIME_STAT
static inline void
PrintThreadRoundTime(mtcp_manager_t mtcp)
{
    fprintf(stderr, "[CPU%2d] Time: (avg, max) "
            "round: (%4luus, %4luus), processing: (%4luus, %4luus), "
            "tcheck: (%4luus, %4luus), epoll: (%4luus, %4luus), "
            "handle: (%4luus, %4luus), xmit: (%4luus, %4luus), "
            "select: (%4luus, %4luus)\n", mtcp->ctx->cpu,
            GetAverageStat(&mtcp->rtstat.round), mtcp->rtstat.round.max,
            GetAverageStat(&mtcp->rtstat.processing), mtcp->rtstat.processing.max,
            GetAverageStat(&mtcp->rtstat.tcheck), mtcp->rtstat.tcheck.max,
            GetAverageStat(&mtcp->rtstat.epoll), mtcp->rtstat.epoll.max,
            GetAverageStat(&mtcp->rtstat.handle), mtcp->rtstat.handle.max,
            GetAverageStat(&mtcp->rtstat.xmit), mtcp->rtstat.xmit.max,
            GetAverageStat(&mtcp->rtstat.select), mtcp->rtstat.select.max);

    InitStatCounter(&mtcp->rtstat.round);
    InitStatCounter(&mtcp->rtstat.processing);
    InitStatCounter(&mtcp->rtstat.tcheck);
    InitStatCounter(&mtcp->rtstat.epoll);
    InitStatCounter(&mtcp->rtstat.handle);
    InitStatCounter(&mtcp->rtstat.xmit);
    InitStatCounter(&mtcp->rtstat.select);
}
#endif
#endif /* NETSTAT */
/*----------------------------------------------------------------------------*/
#if EVENT_STAT
static inline void
PrintEventStat(int core, struct mtcp_epoll_stat *stat)
{
    fprintf(stderr, "[CPU%2d] calls: %lu, waits: %lu, wakes: %lu, "
            "issued: %lu, registered: %lu, invalidated: %lu, handled: %lu\n",
            core, stat->calls, stat->waits, stat->wakes,
            stat->issued, stat->registered, stat->invalidated, stat->handled);
    memset(stat, 0, sizeof(struct mtcp_epoll_stat));
}
#endif /* EVENT_STAT */
/*----------------------------------------------------------------------------*/
#ifdef NETSTAT
static inline void
PrintNetworkStats(mtcp_manager_t mtcp, uint32_t cur_ts)
{
#define TIMEOUT 1
    int i;
    struct net_stat ns;
    bool stat_print = false;
#if ROUND_STAT
    struct run_stat rs;
#endif /* ROUND_STAT */
#ifdef NETSTAT_TOTAL
    static double peak_total_rx_gbps = 0;
    static double peak_total_tx_gbps = 0;
    static double avg_total_rx_gbps = 0;
    static double avg_total_tx_gbps = 0;

    double total_rx_gbps = 0, total_tx_gbps = 0;
    int j;
    uint32_t gflow_cnt = 0;
    struct net_stat g_nstat;
#if ROUND_STAT
    struct run_stat g_runstat;
#endif /* ROUND_STAT */
#endif /* NETSTAT_TOTAL */

    if (TS_TO_MSEC(cur_ts - mtcp->p_nstat_ts) < SEC_TO_MSEC(TIMEOUT)) {
        return;
    }

    mtcp->p_nstat_ts = cur_ts;
#ifdef NETSTAT_TOTAL
    gflow_cnt = 0;
    memset(&g_nstat, 0, sizeof(struct net_stat));
#endif
    for (i = 0; i < g_config.mos->num_cores; i++) {
        if (running[i]) {
            PrintThreadNetworkStats(g_mtcp[i], &ns);
#if NETSTAT_TOTAL
            gflow_cnt += g_mtcp[i]->flow_cnt;
            for (j = 0; j < g_config.mos->netdev_table->num; j++) {
                g_nstat.rx_packets[j] += ns.rx_packets[j];
                g_nstat.rx_errors[j] += ns.rx_errors[j];
                g_nstat.rx_bytes[j] += ns.rx_bytes[j];
                g_nstat.tx_packets[j] += ns.tx_packets[j];
                g_nstat.tx_drops[j] += ns.tx_drops[j];
                g_nstat.tx_bytes[j] += ns.tx_bytes[j];
            }
#endif
        }
    }
#if NETSTAT_TOTAL
    for (i = 0; i < g_config.mos->netdev_table->num; i++) {
        if (g_config.mos->netdev_table->ent[i]->stat_print) {
            fprintf(stderr, "[ ALL ] %s, "
                    "RX: %7llu(pps) (err: %5llu), %5.2lf(Gbps), "
                    "TX: %7llu(pps), %5.2lf(Gbps)\n",
                    g_config.mos->netdev_table->ent[i]->dev_name,
                    (long long unsigned)g_nstat.rx_packets[i],
                    (long long unsigned)g_nstat.rx_errors[i],
                    GBPS(g_nstat.rx_bytes[i]),
                    (long long unsigned)g_nstat.tx_packets[i],
                    GBPS(g_nstat.tx_bytes[i]));
            total_rx_gbps += GBPS(g_nstat.rx_bytes[i]);
            total_tx_gbps += GBPS(g_nstat.tx_bytes[i]);
            stat_print = true;
        }
    }
    if (stat_print) {
        fprintf(stderr, "[ ALL ] flows: %6u\n", gflow_cnt);
        /* exponentially weighted moving average of the per-second rates */
        if (avg_total_rx_gbps == 0)
            avg_total_rx_gbps = total_rx_gbps;
        else
            avg_total_rx_gbps = avg_total_rx_gbps * 0.6 + total_rx_gbps * 0.4;

        if (avg_total_tx_gbps == 0)
            avg_total_tx_gbps = total_tx_gbps;
        else
            avg_total_tx_gbps = avg_total_tx_gbps * 0.6 + total_tx_gbps * 0.4;

        if (peak_total_rx_gbps < total_rx_gbps)
            peak_total_rx_gbps = total_rx_gbps;
        if (peak_total_tx_gbps < total_tx_gbps)
            peak_total_tx_gbps = total_tx_gbps;

        fprintf(stderr, "[ PEAK ] RX: %5.2lf(Gbps), TX: %5.2lf(Gbps)\n"
                "[ RECENT AVG ] RX: %5.2lf(Gbps), TX: %5.2lf(Gbps)\n",
                peak_total_rx_gbps, peak_total_tx_gbps,
                avg_total_rx_gbps, avg_total_tx_gbps);
    }
#endif

#if ROUND_STAT
    memset(&g_runstat, 0, sizeof(struct run_stat));
    for (i = 0; i < g_config.mos->num_cores; i++) {
        if (running[i]) {
            PrintThreadRoundStats(g_mtcp[i], &rs);
#if DBGMSG
            g_runstat.rounds += rs.rounds;
            g_runstat.rounds_rx += rs.rounds_rx;
            g_runstat.rounds_rx_try += rs.rounds_rx_try;
            g_runstat.rounds_tx += rs.rounds_tx;
            g_runstat.rounds_tx_try += rs.rounds_tx_try;
            g_runstat.rounds_select += rs.rounds_select;
            g_runstat.rounds_select_rx += rs.rounds_select_rx;
            g_runstat.rounds_select_tx += rs.rounds_select_tx;
#endif
        }
    }

    TRACE_DBG("[ ALL ] Rounds: %4ldK, "
            "rx: %3ldK (try: %4ldK), tx: %3ldK (try: %4ldK), "
            "ps_select: %4ld (rx: %4ld, tx: %4ld)\n",
            g_runstat.rounds / 1000, g_runstat.rounds_rx / 1000,
            g_runstat.rounds_rx_try / 1000, g_runstat.rounds_tx / 1000,
            g_runstat.rounds_tx_try / 1000, g_runstat.rounds_select,
            g_runstat.rounds_select_rx, g_runstat.rounds_select_tx);
#endif /* ROUND_STAT */

#if TIME_STAT
    for (i = 0; i < g_config.mos->num_cores; i++) {
        if (running[i]) {
            PrintThreadRoundTime(g_mtcp[i]);
        }
    }
#endif

#if EVENT_STAT
    for (i = 0; i < g_config.mos->num_cores; i++) {
        if (running[i] && g_mtcp[i]->ep) {
            PrintEventStat(i, &g_mtcp[i]->ep->stat);
        }
    }
#endif

    fflush(stderr);
}
#endif /* NETSTAT */
/*----------------------------------------------------------------------------*/
static inline void
FlushMonitorReadEvents(mtcp_manager_t mtcp)
{
    struct event_queue *mtcpq;
    struct tcp_stream *cur_stream;
    struct mon_listener *walk;

    /* check if monitor sockets should be passed data */
    TAILQ_FOREACH(walk, &mtcp->monitors, link) {
        if (walk->socket->socktype != MOS_SOCK_MONITOR_STREAM ||
            !(mtcpq = walk->eq))
            continue;

        while (mtcpq->num_events > 0) {
            cur_stream =
                (struct tcp_stream *)mtcpq->events[mtcpq->start++].ev.data.ptr;
            /* only read events */
            if (cur_stream != NULL &&
                (cur_stream->actions & MOS_ACT_READ_DATA)) {
                if (cur_stream->rcvvar != NULL &&
                    cur_stream->rcvvar->rcvbuf != NULL) {
                    /* no need to pass pkt context */
                    struct socket_map *swalk;
                    if (cur_stream->side == MOS_SIDE_CLI) {
                        SOCKQ_FOREACH_REVERSE(swalk, &cur_stream->msocks) {
                            HandleCallback(mtcp, MOS_NULL, swalk,
                                    cur_stream->side, NULL,
                                    MOS_ON_CONN_NEW_DATA);
                        } SOCKQ_FOREACH_END;
                    } else { /* cur_stream->side == MOS_SIDE_SVR */
                        SOCKQ_FOREACH_START(swalk, &cur_stream->msocks) {
                            HandleCallback(mtcp, MOS_NULL, swalk,
                                    cur_stream->side, NULL,
                                    MOS_ON_CONN_NEW_DATA);
                        } SOCKQ_FOREACH_END;
                    }
                }
                /* reset the actions now */
                cur_stream->actions = 0;
            }
            if (mtcpq->start >= mtcpq->size)
                mtcpq->start = 0;
            mtcpq->num_events--;
        }
    }
}
/*----------------------------------------------------------------------------*/
static inline void
FlushBufferedReadEvents(mtcp_manager_t mtcp)
{
    int i;
    int offset;
    struct event_queue *mtcpq;
    struct tcp_stream *cur_stream;

    if (mtcp->ep == NULL) {
        TRACE_EPOLL("No epoll socket has been registered yet!\n");
        return;
    } else {
        /* case when mtcpq exists */
        mtcpq = mtcp->ep->mtcp_queue;
        offset = mtcpq->start;
    }

    /* we will use queued-up epoll read-in events
     * to trigger buffered read monitor events */
    for (i = 0; i < mtcpq->num_events; i++) {
        cur_stream = mtcp->smap[mtcpq->events[offset++].sockid].stream;
        /* only read events: raise the new-data callback */
        if (cur_stream != NULL &&
            (cur_stream->socket->events & MOS_EPOLLIN)) {
            if (cur_stream->rcvvar != NULL &&
                cur_stream->rcvvar->rcvbuf != NULL) {
                /* no need to pass pkt context */
                struct socket_map *walk;
                if (cur_stream->side == MOS_SIDE_CLI) {
                    SOCKQ_FOREACH_REVERSE(walk, &cur_stream->msocks) {
                        HandleCallback(mtcp, MOS_NULL, walk, cur_stream->side,
                                NULL, MOS_ON_CONN_NEW_DATA);
                    } SOCKQ_FOREACH_END;
                } else { /* cur_stream->side == MOS_SIDE_SVR */
                    SOCKQ_FOREACH_START(walk, &cur_stream->msocks) {
                        HandleCallback(mtcp, MOS_NULL, walk, cur_stream->side,
                                NULL, MOS_ON_CONN_NEW_DATA);
                    } SOCKQ_FOREACH_END;
                }
            }
        }
        if (offset >= mtcpq->size)
            offset = 0;
    }
}
/*----------------------------------------------------------------------------*/
static inline void
FlushEpollEvents(mtcp_manager_t mtcp, uint32_t cur_ts)
{
    struct mtcp_epoll *ep = mtcp->ep;
    struct event_queue *usrq = ep->usr_queue;
    struct event_queue *mtcpq = ep->mtcp_queue;

    pthread_mutex_lock(&ep->epoll_lock);
    if (ep->mtcp_queue->num_events > 0) {
        /* while mtcp_queue has events and usr_queue is not full,
         * copy events from mtcp_queue to usr_queue */
        while (mtcpq->num_events > 0 && usrq->num_events < usrq->size) {
            usrq->events[usrq->end++] = mtcpq->events[mtcpq->start++];

            if (usrq->end >= usrq->size)
                usrq->end = 0;
            usrq->num_events++;

            if (mtcpq->start >= mtcpq->size)
                mtcpq->start = 0;
            mtcpq->num_events--;
        }
    }

    /* if there are pending events, wake up the user thread */
    if (ep->waiting && (ep->usr_queue->num_events > 0 ||
                ep->usr_shadow_queue->num_events > 0)) {
        STAT_COUNT(mtcp->runstat.rounds_epoll);
        TRACE_EPOLL("Broadcasting events. num: %d, cur_ts: %u, prev_ts: %u\n",
                ep->usr_queue->num_events, cur_ts, mtcp->ts_last_event);
        mtcp->ts_last_event = cur_ts;
        ep->stat.wakes++;
        pthread_cond_signal(&ep->epoll_cond);
    }
    pthread_mutex_unlock(&ep->epoll_lock);
}
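/*----------------------------------------------------------------------------*/
/*
 * NOTE (editor): the event queues handled above are fixed-size circular
 * buffers: `start` is the read index, `end` the write index, and
 * `num_events` the occupancy; both indices wrap at `size`. A minimal sketch
 * of the pop step FlushEpollEvents() performs under ep->epoll_lock
 * (illustrative only, caller has checked num_events > 0):
 *
 *     ev = mtcpq->events[mtcpq->start++];
 *     if (mtcpq->start >= mtcpq->size)
 *         mtcpq->start = 0;
 *     mtcpq->num_events--;
 */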
/*----------------------------------------------------------------------------*/
static inline void
HandleApplicationCalls(mtcp_manager_t mtcp, uint32_t cur_ts)
{
    tcp_stream *stream;
    int cnt, max_cnt;
    int handled, delayed;
    int control, send, ack;

    /* connect handling */
    while ((stream = StreamDequeue(mtcp->connectq))) {
        if (stream->state != TCP_ST_SYN_SENT) {
            TRACE_INFO("Got a connection request from app with state: %s\n",
                    TCPStateToString(stream));
            exit(EXIT_FAILURE);
        } else {
            stream->cb_events |= MOS_ON_CONN_START |
                MOS_ON_TCP_STATE_CHANGE;
            /* if monitor is on... */
            if (stream->pair_stream != NULL)
                stream->pair_stream->cb_events |=
                    MOS_ON_CONN_START;
        }
        AddtoControlList(mtcp, stream, cur_ts);
    }

    /* send queue handling */
    while ((stream = StreamDequeue(mtcp->sendq))) {
        stream->sndvar->on_sendq = FALSE;
        AddtoSendList(mtcp, stream);
    }

    /* ack queue handling */
    while ((stream = StreamDequeue(mtcp->ackq))) {
        stream->sndvar->on_ackq = FALSE;
        EnqueueACK(mtcp, stream, cur_ts, ACK_OPT_AGGREGATE);
    }

    /* close handling */
    cnt = 0;
    handled = delayed = 0;
    control = send = ack = 0;
    while ((stream = StreamDequeue(mtcp->closeq))) {
        struct tcp_send_vars *sndvar = stream->sndvar;
        cnt++;
        sndvar->on_closeq = FALSE;

        if (sndvar->sndbuf) {
            sndvar->fss = sndvar->sndbuf->head_seq + sndvar->sndbuf->len;
        } else {
            sndvar->fss = stream->snd_nxt;
        }

        if (g_config.mos->tcp_timeout > 0)
            RemoveFromTimeoutList(mtcp, stream);

        if (stream->have_reset) {
            handled++;
            if (stream->state != TCP_ST_CLOSED_RSVD) {
                stream->close_reason = TCP_RESET;
                stream->state = TCP_ST_CLOSED_RSVD;
                stream->cb_events |= MOS_ON_TCP_STATE_CHANGE;
                TRACE_STATE("Stream %d: TCP_ST_CLOSED_RSVD\n", stream->id);
                DestroyTCPStream(mtcp, stream);
            } else {
                TRACE_ERROR("Stream already closed.\n");
            }

        } else if (sndvar->on_control_list) {
            sndvar->on_closeq_int = TRUE;
            StreamInternalEnqueue(mtcp->closeq_int, stream);
            delayed++;
            if (sndvar->on_control_list)
                control++;
            if (sndvar->on_send_list)
                send++;
            if (sndvar->on_ack_list)
                ack++;

        } else if (sndvar->on_send_list || sndvar->on_ack_list) {
            handled++;
            if (stream->state == TCP_ST_ESTABLISHED) {
                stream->state = TCP_ST_FIN_WAIT_1;
                stream->cb_events |= MOS_ON_TCP_STATE_CHANGE;
                TRACE_STATE("Stream %d: TCP_ST_FIN_WAIT_1\n", stream->id);

            } else if (stream->state == TCP_ST_CLOSE_WAIT) {
                stream->state = TCP_ST_LAST_ACK;
                stream->cb_events |= MOS_ON_TCP_STATE_CHANGE;
                TRACE_STATE("Stream %d: TCP_ST_LAST_ACK\n", stream->id);
            }
            stream->control_list_waiting = TRUE;

        } else if (stream->state != TCP_ST_CLOSED_RSVD) {
            handled++;
            if (stream->state == TCP_ST_ESTABLISHED) {
                stream->state = TCP_ST_FIN_WAIT_1;
                stream->cb_events |= MOS_ON_TCP_STATE_CHANGE;
                TRACE_STATE("Stream %d: TCP_ST_FIN_WAIT_1\n", stream->id);

            } else if (stream->state == TCP_ST_CLOSE_WAIT) {
                stream->state = TCP_ST_LAST_ACK;
                stream->cb_events |= MOS_ON_TCP_STATE_CHANGE;
                TRACE_STATE("Stream %d: TCP_ST_LAST_ACK\n", stream->id);
            }
            //sndvar->rto = TCP_FIN_RTO;
            //UpdateRetransmissionTimer(mtcp, stream, mtcp->cur_ts);
            AddtoControlList(mtcp, stream, cur_ts);
        } else {
            TRACE_ERROR("Already closed connection!\n");
        }
    }
    TRACE_ROUND("Handling close connections. cnt: %d\n", cnt);

    cnt = 0;
    max_cnt = mtcp->closeq_int->count;
    while (cnt++ < max_cnt) {
        stream = StreamInternalDequeue(mtcp->closeq_int);

        if (stream->sndvar->on_control_list) {
            StreamInternalEnqueue(mtcp->closeq_int, stream);

        } else if (stream->state != TCP_ST_CLOSED_RSVD) {
            handled++;
            stream->sndvar->on_closeq_int = FALSE;
            if (stream->state == TCP_ST_ESTABLISHED) {
                stream->state = TCP_ST_FIN_WAIT_1;
                stream->cb_events |= MOS_ON_TCP_STATE_CHANGE;
                TRACE_STATE("Stream %d: TCP_ST_FIN_WAIT_1\n", stream->id);

            } else if (stream->state == TCP_ST_CLOSE_WAIT) {
                stream->state = TCP_ST_LAST_ACK;
                stream->cb_events |= MOS_ON_TCP_STATE_CHANGE;
                TRACE_STATE("Stream %d: TCP_ST_LAST_ACK\n", stream->id);
            }
            AddtoControlList(mtcp, stream, cur_ts);
        } else {
            stream->sndvar->on_closeq_int = FALSE;
            TRACE_ERROR("Already closed connection!\n");
        }
    }

    /* reset handling */
    cnt = 0;
    while ((stream = StreamDequeue(mtcp->resetq))) {
        cnt++;
        stream->sndvar->on_resetq = FALSE;

        if (g_config.mos->tcp_timeout > 0)
            RemoveFromTimeoutList(mtcp, stream);

        if (stream->have_reset) {
            if (stream->state != TCP_ST_CLOSED_RSVD) {
                stream->close_reason = TCP_RESET;
                stream->state = TCP_ST_CLOSED_RSVD;
                stream->cb_events |= MOS_ON_TCP_STATE_CHANGE;
                TRACE_STATE("Stream %d: TCP_ST_CLOSED_RSVD\n", stream->id);
                DestroyTCPStream(mtcp, stream);
            } else {
                TRACE_ERROR("Stream already closed.\n");
            }

        } else if (stream->sndvar->on_control_list ||
                stream->sndvar->on_send_list || stream->sndvar->on_ack_list) {
            /* wait until all the queues are flushed */
            stream->sndvar->on_resetq_int = TRUE;
            StreamInternalEnqueue(mtcp->resetq_int, stream);

        } else {
            if (stream->state != TCP_ST_CLOSED_RSVD) {
                stream->close_reason = TCP_ACTIVE_CLOSE;
                stream->state = TCP_ST_CLOSED_RSVD;
                stream->cb_events |= MOS_ON_TCP_STATE_CHANGE;
                TRACE_STATE("Stream %d: TCP_ST_CLOSED_RSVD\n", stream->id);
                AddtoControlList(mtcp, stream, cur_ts);
            } else {
                TRACE_ERROR("Stream already closed.\n");
            }
        }
    }
    TRACE_ROUND("Handling reset connections. cnt: %d\n", cnt);

    cnt = 0;
    max_cnt = mtcp->resetq_int->count;
    while (cnt++ < max_cnt) {
        stream = StreamInternalDequeue(mtcp->resetq_int);

        if (stream->sndvar->on_control_list ||
                stream->sndvar->on_send_list || stream->sndvar->on_ack_list) {
            /* wait until all the queues are flushed */
            StreamInternalEnqueue(mtcp->resetq_int, stream);

        } else {
            stream->sndvar->on_resetq_int = FALSE;

            if (stream->state != TCP_ST_CLOSED_RSVD) {
                stream->close_reason = TCP_ACTIVE_CLOSE;
                stream->state = TCP_ST_CLOSED_RSVD;
                stream->cb_events |= MOS_ON_TCP_STATE_CHANGE;
                TRACE_STATE("Stream %d: TCP_ST_CLOSED_RSVD\n", stream->id);
                AddtoControlList(mtcp, stream, cur_ts);
            } else {
                TRACE_ERROR("Stream already closed.\n");
            }
        }
    }

    /* destroy streams in destroyq */
    while ((stream = StreamDequeue(mtcp->destroyq))) {
        DestroyTCPStream(mtcp, stream);
    }

    mtcp->wakeup_flag = FALSE;
}
/*----------------------------------------------------------------------------*/
static inline void
WritePacketsToChunks(mtcp_manager_t mtcp, uint32_t cur_ts)
{
    int thresh = g_config.mos->max_concurrency;
    int i;

    /* The threshold is set to g_config.mos->max_concurrency to send ACKs
     * immediately; otherwise, set it to an appropriate value (e.g. thresh). */
    assert(mtcp->g_sender != NULL);
    if (mtcp->g_sender->control_list_cnt)
        WriteTCPControlList(mtcp, mtcp->g_sender, cur_ts, thresh);
    if (mtcp->g_sender->ack_list_cnt)
        WriteTCPACKList(mtcp, mtcp->g_sender, cur_ts, thresh);
    if (mtcp->g_sender->send_list_cnt)
        WriteTCPDataList(mtcp, mtcp->g_sender, cur_ts, thresh);

    for (i = 0; i < g_config.mos->netdev_table->num; i++) {
        assert(mtcp->n_sender[i] != NULL);
        if (mtcp->n_sender[i]->control_list_cnt)
            WriteTCPControlList(mtcp, mtcp->n_sender[i], cur_ts, thresh);
        if (mtcp->n_sender[i]->ack_list_cnt)
            WriteTCPACKList(mtcp, mtcp->n_sender[i], cur_ts, thresh);
        if (mtcp->n_sender[i]->send_list_cnt)
            WriteTCPDataList(mtcp, mtcp->n_sender[i], cur_ts, thresh);
    }
}
/*----------------------------------------------------------------------------*/
#if TESTING
static int
DestroyRemainingFlows(mtcp_manager_t mtcp)
{
    struct hashtable *ht = mtcp->tcp_flow_table;
    tcp_stream *walk;
    int cnt, i;

    cnt = 0;

    thread_printf(mtcp, mtcp->log_fp,
            "CPU %d: Flushing remaining flows.\n", mtcp->ctx->cpu);

    for (i = 0; i < NUM_BINS; i++) {
        TAILQ_FOREACH(walk, &ht->ht_table[i], rcvvar->he_link) {
            thread_printf(mtcp, mtcp->log_fp,
                    "CPU %d: Destroying stream %d\n", mtcp->ctx->cpu, walk->id);
#ifdef DUMP_STREAM
            DumpStream(mtcp, walk);
#endif
            DestroyTCPStream(mtcp, walk);
            cnt++;
        }
    }

    return cnt;
}
#endif
/*----------------------------------------------------------------------------*/
static void
InterruptApplication(mtcp_manager_t mtcp)
{
    /* interrupt if mtcp_epoll_wait() is waiting */
    if (mtcp->ep) {
        pthread_mutex_lock(&mtcp->ep->epoll_lock);
        if (mtcp->ep->waiting) {
            pthread_cond_signal(&mtcp->ep->epoll_cond);
        }
        pthread_mutex_unlock(&mtcp->ep->epoll_lock);
    }
    /* interrupt if accept() is waiting */
    if (mtcp->listener) {
        if (mtcp->listener->socket) {
            pthread_mutex_lock(&mtcp->listener->accept_lock);
            if (!(mtcp->listener->socket->opts & MTCP_NONBLOCK)) {
                pthread_cond_signal(&mtcp->listener->accept_cond);
            }
            pthread_mutex_unlock(&mtcp->listener->accept_lock);
        }
    }
}
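/*----------------------------------------------------------------------------*/
/*
 * NOTE (editor): RunMainLoop() below is the per-core event loop. Each
 * iteration performs, in order: (1) RX from every NIC and per-packet TCP
 * processing, (2) user-defined timer expiry, (3) retransmission, timewait,
 * and connection-timeout checks, (4) monitor and epoll event flushing,
 * (5) application call handling (connect/send/ack/close/reset queues),
 * (6) TX of queued control/ACK/data packets, and (7) an optional blocking
 * select in the I/O module when there is nothing else to do.
 */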
/*----------------------------------------------------------------------------*/
void
RunPassiveLoop(mtcp_manager_t mtcp)
{
    sem_wait(&g_done_sem[mtcp->ctx->cpu]);
    sem_destroy(&g_done_sem[mtcp->ctx->cpu]);
    return;
}
/*----------------------------------------------------------------------------*/
static void
RunMainLoop(struct mtcp_thread_context *ctx)
{
    mtcp_manager_t mtcp = ctx->mtcp_manager;
    int i;
    int recv_cnt;
    int rx_inf, tx_inf;
    struct timeval cur_ts = {0};
    uint32_t ts, ts_prev;

#if TIME_STAT
    struct timeval prev_ts, processing_ts, tcheck_ts,
            epoll_ts, handle_ts, xmit_ts, select_ts;
#endif
    int thresh;

    gettimeofday(&cur_ts, NULL);

    TRACE_DBG("CPU %d: mtcp thread running.\n", ctx->cpu);

#if TIME_STAT
    prev_ts = cur_ts;
    InitStatCounter(&mtcp->rtstat.round);
    InitStatCounter(&mtcp->rtstat.processing);
    InitStatCounter(&mtcp->rtstat.tcheck);
    InitStatCounter(&mtcp->rtstat.epoll);
    InitStatCounter(&mtcp->rtstat.handle);
    InitStatCounter(&mtcp->rtstat.xmit);
    InitStatCounter(&mtcp->rtstat.select);
#endif

    ts = ts_prev = 0;
    while ((!ctx->done || mtcp->flow_cnt) && !ctx->exit) {

        STAT_COUNT(mtcp->runstat.rounds);
        recv_cnt = 0;
        gettimeofday(&cur_ts, NULL);
#if TIME_STAT
        /* measure the inter-round delay */
        UpdateStatCounter(&mtcp->rtstat.round, TimeDiffUs(&cur_ts, &prev_ts));
        prev_ts = cur_ts;
#endif

        ts = TIMEVAL_TO_TS(&cur_ts);
        mtcp->cur_ts = ts;

        /* receive and process packets from each NIC */
        for (rx_inf = 0; rx_inf < g_config.mos->netdev_table->num; rx_inf++) {

            recv_cnt = mtcp->iom->recv_pkts(ctx, rx_inf);
            STAT_COUNT(mtcp->runstat.rounds_rx_try);

            for (i = 0; i < recv_cnt; i++) {
                uint16_t len;
                uint8_t *pktbuf;
                pktbuf = mtcp->iom->get_rptr(mtcp->ctx, rx_inf, i, &len);
                ProcessPacket(mtcp, rx_inf, i, ts, pktbuf, len);
            }
        }
        STAT_COUNT(mtcp->runstat.rounds_rx);

#if TIME_STAT
        gettimeofday(&processing_ts, NULL);
        UpdateStatCounter(&mtcp->rtstat.processing,
                TimeDiffUs(&processing_ts, &cur_ts));
#endif /* TIME_STAT */

        /* handle user-defined timeouts */
        struct timer *walk, *tmp;
        for (walk = TAILQ_FIRST(&mtcp->timer_list); walk != NULL; walk = tmp) {
            tmp = TAILQ_NEXT(walk, timer_link);
            if (TIMEVAL_LT(&cur_ts, &walk->exp))
                break;

            struct mtcp_context mctx = {.cpu = ctx->cpu};
            walk->cb(&mctx, walk->id, 0, 0 /* FIXME */, NULL);
            DelTimer(mtcp, walk);
        }

        /* interaction with application */
        if (mtcp->flow_cnt > 0) {

            /* check retransmission timeout and timewait expire */
#if 0
            thresh = (int)mtcp->flow_cnt / (TS_TO_USEC(PER_STREAM_TCHECK));
            assert(thresh >= 0);
            if (thresh == 0)
                thresh = 1;
            if (recv_cnt > 0 && thresh > recv_cnt)
                thresh = recv_cnt;
#else
            thresh = g_config.mos->max_concurrency;
#endif

            /* TODO: if there is no received packet,
             * we will send as much as possible */
            if (thresh == -1)
                thresh = g_config.mos->max_concurrency;

            CheckRtmTimeout(mtcp, ts, thresh);
            CheckTimewaitExpire(mtcp, ts, thresh);

            if (g_config.mos->tcp_timeout > 0 && ts != ts_prev) {
                CheckConnectionTimeout(mtcp, ts, thresh);
            }

#if TIME_STAT
        }
        gettimeofday(&tcheck_ts, NULL);
        UpdateStatCounter(&mtcp->rtstat.tcheck,
                TimeDiffUs(&tcheck_ts, &processing_ts));

        if (mtcp->flow_cnt > 0) {
#endif /* TIME_STAT */

        }

        /* before flushing epoll events, call monitor events for
         * all registered `read` events */
        if (mtcp->num_msp > 0)
            /* call this only when a standalone monitor is running */
            FlushMonitorReadEvents(mtcp);

        /* if epoll is in use, flush all the queued events */
        if (mtcp->ep) {
            FlushBufferedReadEvents(mtcp);
            FlushEpollEvents(mtcp, ts);
        }
#if TIME_STAT
        gettimeofday(&epoll_ts, NULL);
        UpdateStatCounter(&mtcp->rtstat.epoll,
                TimeDiffUs(&epoll_ts, &tcheck_ts));
#endif /* TIME_STAT */

        if (end_app_exists && mtcp->flow_cnt > 0) {
            /* handle stream queues */
            HandleApplicationCalls(mtcp, ts);
        }

#if TIME_STAT
        gettimeofday(&handle_ts, NULL);
        UpdateStatCounter(&mtcp->rtstat.handle,
                TimeDiffUs(&handle_ts, &epoll_ts));
#endif /* TIME_STAT */

        WritePacketsToChunks(mtcp, ts);

        /* send packets from the write buffer while TX is available */
        int num_dev = g_config.mos->netdev_table->num;
        if (likely(mtcp->iom->send_pkts != NULL))
            for (tx_inf = 0; tx_inf < num_dev; tx_inf++) {
                mtcp->iom->send_pkts(ctx, tx_inf);
            }

#if TIME_STAT
        gettimeofday(&xmit_ts, NULL);
        UpdateStatCounter(&mtcp->rtstat.xmit,
                TimeDiffUs(&xmit_ts, &handle_ts));
#endif /* TIME_STAT */

        if (ts != ts_prev) {
            ts_prev = ts;
#ifdef NETSTAT
            if (ctx->cpu == printer) {
#ifdef RUN_ARP
                ARPTimer(mtcp, ts);
#endif
                PrintNetworkStats(mtcp, ts);
            }
#endif /* NETSTAT */
        }

        if (mtcp->iom->select)
            mtcp->iom->select(ctx);

        if (ctx->interrupt) {
            InterruptApplication(mtcp);
        }
    }

#if TESTING
    DestroyRemainingFlows(mtcp);
#endif

    TRACE_DBG("MTCP thread %d out of main loop.\n", ctx->cpu);
    /* flush logs */
    flush_log_data(mtcp);
    TRACE_DBG("MTCP thread %d flushed logs.\n", ctx->cpu);
    InterruptApplication(mtcp);
    TRACE_INFO("MTCP thread %d finished.\n", ctx->cpu);
}
/*----------------------------------------------------------------------------*/
struct mtcp_sender *
CreateMTCPSender(int ifidx)
{
    struct mtcp_sender *sender;

    sender = (struct mtcp_sender *)calloc(1, sizeof(struct mtcp_sender));
    if (!sender) {
        return NULL;
    }

    sender->ifidx = ifidx;

    TAILQ_INIT(&sender->control_list);
    TAILQ_INIT(&sender->send_list);
    TAILQ_INIT(&sender->ack_list);

    sender->control_list_cnt = 0;
    sender->send_list_cnt = 0;
    sender->ack_list_cnt = 0;

    return sender;
}
/*----------------------------------------------------------------------------*/
void
DestroyMTCPSender(struct mtcp_sender *sender)
{
    free(sender);
}
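/*----------------------------------------------------------------------------*/
/*
 * NOTE (editor): each mTCP thread owns one global sender (ifidx == -1) plus
 * one sender per NIC; WritePacketsToChunks() earlier in this file drains the
 * control, ACK, and data lists of every sender, in that order, each round.
 */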
/*----------------------------------------------------------------------------*/
static mtcp_manager_t
InitializeMTCPManager(struct mtcp_thread_context* ctx)
{
    mtcp_manager_t mtcp;
    char log_name[MAX_FILE_NAME];
    int i;

    posix_seq_srand((unsigned)pthread_self());

    mtcp = (mtcp_manager_t)calloc(1, sizeof(struct mtcp_manager));
    if (!mtcp) {
        perror("calloc");
        TRACE_ERROR("Failed to allocate mtcp_manager.\n");
        return NULL;
    }
    g_mtcp[ctx->cpu] = mtcp;

    mtcp->tcp_flow_table = CreateHashtable();
    if (!mtcp->tcp_flow_table) {
        CTRACE_ERROR("Failed to allocate tcp flow table.\n");
        return NULL;
    }

#ifdef HUGEPAGE
#define IS_HUGEPAGE 1
#else
#define IS_HUGEPAGE 0
#endif
    if (mon_app_exists) {
        /* initialize event callback */
#ifdef NEWEV
        InitEvent(mtcp);
#else
        InitEvent(mtcp, NUM_EV_TABLE);
#endif
    }

    /* one receive-buffer segment pool entry per concurrent flow, times
     * ceil(rmem_size / UNITBUFSIZE) segments per flow */
    if (!(mtcp->bufseg_pool = MPCreate(sizeof(tcpbufseg_t),
                sizeof(tcpbufseg_t) * g_config.mos->max_concurrency *
                ((g_config.mos->rmem_size - 1) / UNITBUFSIZE + 1), 0))) {
        TRACE_ERROR("Failed to allocate bufseg pool\n");
        exit(EXIT_FAILURE);
    }
    if (!(mtcp->sockent_pool = MPCreate(sizeof(struct sockent),
                sizeof(struct sockent) * g_config.mos->max_concurrency * 3, 0))) {
        TRACE_ERROR("Failed to allocate sockent pool\n");
        exit(EXIT_FAILURE);
    }
#ifdef USE_TIMER_POOL
    if (!(mtcp->timer_pool = MPCreate(sizeof(struct timer),
                sizeof(struct timer) * g_config.mos->max_concurrency * 10, 0))) {
        TRACE_ERROR("Failed to allocate timer pool\n");
        exit(EXIT_FAILURE);
    }
#endif
    mtcp->flow_pool = MPCreate(sizeof(tcp_stream),
            sizeof(tcp_stream) * g_config.mos->max_concurrency, IS_HUGEPAGE);
    if (!mtcp->flow_pool) {
        CTRACE_ERROR("Failed to allocate tcp flow pool.\n");
        return NULL;
    }
    mtcp->rv_pool = MPCreate(sizeof(struct tcp_recv_vars),
            sizeof(struct tcp_recv_vars) * g_config.mos->max_concurrency, IS_HUGEPAGE);
    if (!mtcp->rv_pool) {
        CTRACE_ERROR("Failed to allocate tcp recv variable pool.\n");
        return NULL;
    }
    mtcp->sv_pool = MPCreate(sizeof(struct tcp_send_vars),
            sizeof(struct tcp_send_vars) * g_config.mos->max_concurrency, IS_HUGEPAGE);
    if (!mtcp->sv_pool) {
        CTRACE_ERROR("Failed to allocate tcp send variable pool.\n");
        return NULL;
    }

    mtcp->rbm_snd = SBManagerCreate(g_config.mos->wmem_size, g_config.mos->no_ring_buffers,
            g_config.mos->max_concurrency);
    if (!mtcp->rbm_snd) {
        CTRACE_ERROR("Failed to create send ring buffer.\n");
        return NULL;
    }

    mtcp->smap = (socket_map_t)calloc(g_config.mos->max_concurrency, sizeof(struct socket_map));
    if (!mtcp->smap) {
        perror("calloc");
        CTRACE_ERROR("Failed to allocate memory for stream map.\n");
        return NULL;
    }

    if (mon_app_exists) {
        mtcp->msmap = (socket_map_t)calloc(g_config.mos->max_concurrency, sizeof(struct socket_map));
        if (!mtcp->msmap) {
            perror("calloc");
            CTRACE_ERROR("Failed to allocate memory for monitor stream map.\n");
            return NULL;
        }

        for (i = 0; i < g_config.mos->max_concurrency; i++) {
            mtcp->msmap[i].monitor_stream = calloc(1, sizeof(struct mon_stream));
            if (!mtcp->msmap[i].monitor_stream) {
                perror("calloc");
                CTRACE_ERROR("Failed to allocate memory for monitor stream map.\n");
                return NULL;
            }
        }
    }

    TAILQ_INIT(&mtcp->timer_list);
    TAILQ_INIT(&mtcp->monitors);

    TAILQ_INIT(&mtcp->free_smap);
    for (i = 0; i < g_config.mos->max_concurrency; i++) {
        mtcp->smap[i].id = i;
        mtcp->smap[i].socktype = MOS_SOCK_UNUSED;
        memset(&mtcp->smap[i].saddr, 0, sizeof(struct sockaddr_in));
        mtcp->smap[i].stream = NULL;
        TAILQ_INSERT_TAIL(&mtcp->free_smap, &mtcp->smap[i], link);
    }

    if (mon_app_exists) {
        TAILQ_INIT(&mtcp->free_msmap);
        for (i = 0; i < g_config.mos->max_concurrency; i++) {
            mtcp->msmap[i].id = i;
            mtcp->msmap[i].socktype = MOS_SOCK_UNUSED;
            memset(&mtcp->msmap[i].saddr, 0, sizeof(struct sockaddr_in));
            TAILQ_INSERT_TAIL(&mtcp->free_msmap, &mtcp->msmap[i], link);
        }
    }

    mtcp->ctx = ctx;
    mtcp->ep = NULL;

    snprintf(log_name, MAX_FILE_NAME, "%s/"LOG_FILE_NAME"_%d",
            g_config.mos->mos_log, ctx->cpu);
    mtcp->log_fp = fopen(log_name, "w+");
    if (!mtcp->log_fp) {
        perror("fopen");
        CTRACE_ERROR("Failed to create file for logging. (%s)\n", log_name);
        return NULL;
    }
    mtcp->sp_fd = g_logctx[ctx->cpu]->pair_sp_fd;
    mtcp->logger = g_logctx[ctx->cpu];

    mtcp->connectq = CreateStreamQueue(BACKLOG_SIZE);
    if (!mtcp->connectq) {
        CTRACE_ERROR("Failed to create connect queue.\n");
        return NULL;
    }
    mtcp->sendq = CreateStreamQueue(g_config.mos->max_concurrency);
    if (!mtcp->sendq) {
        CTRACE_ERROR("Failed to create send queue.\n");
        return NULL;
    }
    mtcp->ackq = CreateStreamQueue(g_config.mos->max_concurrency);
    if (!mtcp->ackq) {
        CTRACE_ERROR("Failed to create ack queue.\n");
        return NULL;
    }
    mtcp->closeq = CreateStreamQueue(g_config.mos->max_concurrency);
    if (!mtcp->closeq) {
        CTRACE_ERROR("Failed to create close queue.\n");
        return NULL;
    }
    mtcp->closeq_int = CreateInternalStreamQueue(g_config.mos->max_concurrency);
    if (!mtcp->closeq_int) {
        CTRACE_ERROR("Failed to create internal close queue.\n");
        return NULL;
    }
    mtcp->resetq = CreateStreamQueue(g_config.mos->max_concurrency);
    if (!mtcp->resetq) {
        CTRACE_ERROR("Failed to create reset queue.\n");
        return NULL;
    }
    mtcp->resetq_int = CreateInternalStreamQueue(g_config.mos->max_concurrency);
    if (!mtcp->resetq_int) {
        CTRACE_ERROR("Failed to create internal reset queue.\n");
        return NULL;
    }
    mtcp->destroyq = CreateStreamQueue(g_config.mos->max_concurrency);
    if (!mtcp->destroyq) {
        CTRACE_ERROR("Failed to create destroy queue.\n");
        return NULL;
    }

    mtcp->g_sender = CreateMTCPSender(-1);
    if (!mtcp->g_sender) {
        CTRACE_ERROR("Failed to create global sender structure.\n");
        return NULL;
    }
    for (i = 0; i < g_config.mos->netdev_table->num; i++) {
        mtcp->n_sender[i] = CreateMTCPSender(i);
        if (!mtcp->n_sender[i]) {
            CTRACE_ERROR("Failed to create per-NIC sender structure.\n");
            return NULL;
        }
    }

    mtcp->rto_store = InitRTOHashstore();
    TAILQ_INIT(&mtcp->timewait_list);
    TAILQ_INIT(&mtcp->timeout_list);

    return mtcp;
}
/*----------------------------------------------------------------------------*/
static void *
MTCPRunThread(void *arg)
{
    mctx_t mctx = (mctx_t)arg;
    int cpu = mctx->cpu;
    int working;
    struct mtcp_manager *mtcp;
    struct mtcp_thread_context *ctx;

    /* affinitize the thread to this core first */
    mtcp_core_affinitize(cpu);

    /* memory allocation after core affinitization will use
     * node-local memory most of the time */
    ctx = calloc(1, sizeof(*ctx));
    if (!ctx) {
        perror("calloc");
        TRACE_ERROR("Failed to calloc mtcp context.\n");
        exit(-1);
    }
    ctx->thread = pthread_self();
    ctx->cpu = cpu;
    mtcp = ctx->mtcp_manager = InitializeMTCPManager(ctx);
    if (!mtcp) {
        TRACE_ERROR("Failed to initialize mtcp manager.\n");
        exit(-1);
    }

    /* assign mtcp context's underlying I/O module */
    mtcp->iom = current_iomodule_func;

    /* I/O initialization */
    if (mtcp->iom->init_handle)
        mtcp->iom->init_handle(ctx);

    if (pthread_mutex_init(&ctx->flow_pool_lock, NULL)) {
        perror("pthread_mutex_init of ctx->flow_pool_lock");
        exit(-1);
    }

    if (pthread_mutex_init(&ctx->socket_pool_lock, NULL)) {
        perror("pthread_mutex_init of ctx->socket_pool_lock");
        exit(-1);
    }

    SQ_LOCK_INIT(&ctx->connect_lock, "ctx->connect_lock", exit(-1));
    SQ_LOCK_INIT(&ctx->close_lock, "ctx->close_lock", exit(-1));
    SQ_LOCK_INIT(&ctx->reset_lock, "ctx->reset_lock", exit(-1));
    SQ_LOCK_INIT(&ctx->sendq_lock, "ctx->sendq_lock", exit(-1));
    SQ_LOCK_INIT(&ctx->ackq_lock, "ctx->ackq_lock", exit(-1));
    SQ_LOCK_INIT(&ctx->destroyq_lock, "ctx->destroyq_lock", exit(-1));

    /* remember this context pointer for signal processing */
    g_pctx[cpu] = ctx;
    mlockall(MCL_CURRENT);

    /* attach (nic device, queue) */
    working = AttachDevice(ctx);
    if (working != 0) {
        sem_post(&g_init_sem[ctx->cpu]);
        TRACE_DBG("MTCP thread %d finished; no device attached.\n", ctx->cpu);
        pthread_exit(NULL);
    }

    TRACE_DBG("CPU %d: initialization finished.\n", cpu);
    sem_post(&g_init_sem[ctx->cpu]);

    /* start the main loop */
    RunMainLoop(ctx);

    TRACE_DBG("MTCP thread %d finished.\n", ctx->cpu);

    /* signal that the mTCP thread is done */
    sem_post(&g_done_sem[mctx->cpu]);

    //pthread_exit(NULL);
    return 0;
}
/*----------------------------------------------------------------------------*/
#ifdef ENABLE_DPDK
static int MTCPDPDKRunThread(void *arg)
{
    MTCPRunThread(arg);
    return 0;
}
#endif /* ENABLE_DPDK */
/*----------------------------------------------------------------------------*/
mctx_t
mtcp_create_context(int cpu)
{
    mctx_t mctx;
    int ret;

    if (cpu >= g_config.mos->num_cores) {
        TRACE_ERROR("Failed to initialize new mtcp context. "
                "Requested cpu id %d exceeds the number of cores %d "
                "configured to use.\n",
                cpu, g_config.mos->num_cores);
        return NULL;
    }

    /* check whether this context was already initialized */
    if (g_logctx[cpu] != NULL) {
        TRACE_ERROR("%s was already initialized before!\n",
                __FUNCTION__);
        return NULL;
    }

    ret = sem_init(&g_init_sem[cpu], 0, 0);
    if (ret) {
        TRACE_ERROR("Failed to initialize init_sem.\n");
        return NULL;
    }

    ret = sem_init(&g_done_sem[cpu], 0, 0);
    if (ret) {
        TRACE_ERROR("Failed to initialize done_sem.\n");
        return NULL;
    }

    mctx = (mctx_t)calloc(1, sizeof(struct mtcp_context));
    if (!mctx) {
        TRACE_ERROR("Failed to allocate memory for mtcp_context.\n");
        return NULL;
    }
    mctx->cpu = cpu;
    g_ctx[cpu] = mctx;

    /* initialize logger */
    g_logctx[cpu] = (struct log_thread_context *)
            calloc(1, sizeof(struct log_thread_context));
    if (!g_logctx[cpu]) {
        perror("calloc");
        TRACE_ERROR("Failed to allocate memory for log thread context.\n");
        return NULL;
    }
    InitLogThreadContext(g_logctx[cpu], cpu);
    if (pthread_create(&log_thread[cpu],
            NULL, ThreadLogMain, (void *)g_logctx[cpu])) {
        perror("pthread_create");
        TRACE_ERROR("Failed to create log thread\n");
        return NULL;
    }

    /* use rte_eal_remote_launch() for DPDK
       (worker threads are already initialized by rte_eal_init()) */
#ifdef ENABLE_DPDK
    /* Wake up mTCP threads (wake up I/O threads) */
    if (current_iomodule_func == &dpdk_module_func) {
        int master;
        master = rte_get_master_lcore();
        if (master == cpu) {
            lcore_config[master].ret = 0;
            lcore_config[master].state = FINISHED;
            if (pthread_create(&g_thread[cpu],
                    NULL, MTCPRunThread, (void *)mctx) != 0) {
                TRACE_ERROR("pthread_create of mtcp thread failed!\n");
                return NULL;
            }
        } else
            rte_eal_remote_launch(MTCPDPDKRunThread, mctx, cpu);
    } else
#endif /* ENABLE_DPDK */
    {
        if (pthread_create(&g_thread[cpu],
                NULL, MTCPRunThread, (void *)mctx) != 0) {
            TRACE_ERROR("pthread_create of mtcp thread failed!\n");
            return NULL;
        }
    }

    sem_wait(&g_init_sem[cpu]);
    sem_destroy(&g_init_sem[cpu]);

    running[cpu] = TRUE;

#ifdef NETSTAT
#if NETSTAT_TOTAL
    if (printer < 0) {
        printer = cpu;
        TRACE_INFO("CPU %d is in charge of printing stats.\n", printer);
    }
#endif
#endif

    return mctx;
}
/*----------------------------------------------------------------------------*/
int
mtcp_destroy_context(mctx_t mctx)
{
    struct mtcp_thread_context *ctx = g_pctx[mctx->cpu];
    if (ctx != NULL)
        ctx->done = 1;

    struct mtcp_context m;
    m.cpu = mctx->cpu;
    mtcp_free_context(&m);

    free(mctx);

    return 0;
}
/*----------------------------------------------------------------------------*/
void
mtcp_free_context(mctx_t mctx)
{
    struct mtcp_thread_context *ctx;
    struct mtcp_manager *mtcp;
    struct log_thread_context *log_ctx;
    int ret, i;

    TRACE_DBG("CPU %d: mtcp_free_context()\n", mctx->cpu);

    /* check for a live context before dereferencing it */
    if (g_pctx[mctx->cpu] == NULL)
        return;

    ctx = g_pctx[mctx->cpu];
    mtcp = ctx->mtcp_manager;
    log_ctx = mtcp->logger;

    /* close all stream sockets that are still open */
    if (!ctx->exit) {
        for (i = 0; i < g_config.mos->max_concurrency; i++) {
            if (mtcp->smap[i].socktype == MOS_SOCK_STREAM) {
                TRACE_DBG("Closing remaining socket %d (%s)\n",
                        i, TCPStateToString(mtcp->smap[i].stream));
#ifdef DUMP_STREAM
                DumpStream(mtcp, mtcp->smap[i].stream);
#endif
                mtcp_close(mctx, i);
            }
        }
    }

    ctx->done = 1;
    ctx->exit = 1;

#ifdef ENABLE_DPDK
    if (current_iomodule_func == &dpdk_module_func) {
        int master = rte_get_master_lcore();
        if (master == mctx->cpu)
            pthread_join(g_thread[mctx->cpu], NULL);
        else
            rte_eal_wait_lcore(mctx->cpu);
    } else
#endif /* ENABLE_DPDK */
    {
        pthread_join(g_thread[mctx->cpu], NULL);
    }

    TRACE_INFO("MTCP thread %d joined.\n", mctx->cpu);

    running[mctx->cpu] = FALSE;

#ifdef NETSTAT
#if NETSTAT_TOTAL
    if (printer == mctx->cpu) {
        for (i = 0; i < num_cpus; i++) {
            if (i != mctx->cpu && running[i]) {
                printer = i;
                break;
            }
        }
    }
#endif
#endif

    log_ctx->done = 1;
    ret = write(log_ctx->pair_sp_fd, "F", 1);
    if (ret != 1)
        TRACE_ERROR("CPU %d: Failed to signal socket pair\n", mctx->cpu);

    if ((ret = pthread_join(log_thread[ctx->cpu], NULL)) != 0) {
        TRACE_ERROR("pthread_join() returns error (errno = %s)\n", strerror(ret));
        exit(-1);
    }

    fclose(mtcp->log_fp);
    TRACE_LOG("Log thread %d joined.\n", mctx->cpu);

    if (mtcp->connectq) {
        DestroyStreamQueue(mtcp->connectq);
        mtcp->connectq = NULL;
    }
    if (mtcp->sendq) {
        DestroyStreamQueue(mtcp->sendq);
        mtcp->sendq = NULL;
    }
    if (mtcp->ackq) {
        DestroyStreamQueue(mtcp->ackq);
        mtcp->ackq = NULL;
    }
    if (mtcp->closeq) {
        DestroyStreamQueue(mtcp->closeq);
        mtcp->closeq = NULL;
    }
    if (mtcp->closeq_int) {
        DestroyInternalStreamQueue(mtcp->closeq_int);
        mtcp->closeq_int = NULL;
    }
    if (mtcp->resetq) {
        DestroyStreamQueue(mtcp->resetq);
        mtcp->resetq = NULL;
    }
    if (mtcp->resetq_int) {
        DestroyInternalStreamQueue(mtcp->resetq_int);
        mtcp->resetq_int = NULL;
    }
    if (mtcp->destroyq) {
        DestroyStreamQueue(mtcp->destroyq);
        mtcp->destroyq = NULL;
    }

    DestroyMTCPSender(mtcp->g_sender);
    for (i = 0; i < g_config.mos->netdev_table->num; i++) {
        DestroyMTCPSender(mtcp->n_sender[i]);
    }

    MPDestroy(mtcp->rv_pool);
    MPDestroy(mtcp->sv_pool);
    MPDestroy(mtcp->flow_pool);

    if (mtcp->ap) {
        DestroyAddressPool(mtcp->ap);
        mtcp->ap = NULL;
    }

    SQ_LOCK_DESTROY(&ctx->connect_lock);
    SQ_LOCK_DESTROY(&ctx->close_lock);
    SQ_LOCK_DESTROY(&ctx->reset_lock);
    SQ_LOCK_DESTROY(&ctx->sendq_lock);
    SQ_LOCK_DESTROY(&ctx->ackq_lock);
    SQ_LOCK_DESTROY(&ctx->destroyq_lock);

    //TRACE_INFO("MTCP thread %d destroyed.\n", mctx->cpu);
    if (mtcp->iom->destroy_handle)
        mtcp->iom->destroy_handle(ctx);
    if (g_logctx[mctx->cpu]) {
        free(g_logctx[mctx->cpu]);
        g_logctx[mctx->cpu] = NULL;
    }
    free(ctx);
    g_pctx[mctx->cpu] = NULL;
}
/*----------------------------------------------------------------------------*/
mtcp_sighandler_t
mtcp_register_signal(int signum, mtcp_sighandler_t handler)
{
    mtcp_sighandler_t prev;

    if (signum == SIGINT) {
        prev = app_signal_handler;
        app_signal_handler = handler;
    } else {
        if ((prev = signal(signum, handler)) == SIG_ERR) {
            perror("signal");
            return SIG_ERR;
        }
    }

    return prev;
}
/*----------------------------------------------------------------------------*/
int
mtcp_getconf(struct mtcp_conf *conf)
{
    int i, j;

    if (!conf) {
        errno = EINVAL;
        return -1;
    }

    conf->num_cores = g_config.mos->num_cores;
    conf->max_concurrency = g_config.mos->max_concurrency;
    conf->cpu_mask = g_config.mos->cpu_mask;

    conf->rcvbuf_size = g_config.mos->rmem_size;
    conf->sndbuf_size = g_config.mos->wmem_size;

    conf->tcp_timewait = g_config.mos->tcp_tw_interval;
    conf->tcp_timeout = g_config.mos->tcp_timeout;

    i = 0;
    struct conf_block *bwalk;
    TAILQ_FOREACH(bwalk, &g_config.app_blkh, link) {
        struct app_conf *app_conf = (struct app_conf *)bwalk->conf;
        for (j = 0; j < app_conf->app_argc; j++)
            conf->app_argv[i][j] = app_conf->app_argv[j];
        conf->app_argc[i] = app_conf->app_argc;
        conf->app_cpu_mask[i] = app_conf->cpu_mask;
        i++;
    }
    conf->num_app = i;

    return 0;
}
/*----------------------------------------------------------------------------*/
int
mtcp_setconf(const struct mtcp_conf *conf)
{
    if (!conf) {
        errno = EINVAL;
        return -1;
    }

    g_config.mos->num_cores = conf->num_cores;
    g_config.mos->max_concurrency = conf->max_concurrency;

    g_config.mos->rmem_size = conf->rcvbuf_size;
    g_config.mos->wmem_size = conf->sndbuf_size;

    g_config.mos->tcp_tw_interval = conf->tcp_timewait;
    g_config.mos->tcp_timeout = conf->tcp_timeout;

    TRACE_CONFIG("Configuration updated by mtcp_setconf().\n");
    //PrintConfiguration();

    return 0;
}
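/*----------------------------------------------------------------------------*/
/*
 * NOTE (editor): mtcp_getconf()/mtcp_setconf() above read and write the
 * global configuration. A read-modify-write sketch (the value is
 * illustrative, and the change should be made before the mTCP threads are
 * created so they pick it up):
 *
 *     struct mtcp_conf mcfg;
 *
 *     mtcp_getconf(&mcfg);
 *     mcfg.max_concurrency = 100000;
 *     mtcp_setconf(&mcfg);
 */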
/*----------------------------------------------------------------------------*/
int
mtcp_init(const char *config_file)
{
    int i;
    int ret;

    if (geteuid()) {
        TRACE_CONFIG("[CAUTION] Run as root if mlock is necessary.\n");
#if defined(ENABLE_DPDK) || defined(ENABLE_NETMAP)
        TRACE_CONFIG("[CAUTION] Run the app as root!\n");
        exit(EXIT_FAILURE);
#endif
    }

    /* getting cpu and NIC */
    num_cpus = GetNumCPUs();
    assert(num_cpus >= 1);
    for (i = 0; i < num_cpus; i++) {
        g_mtcp[i] = NULL;
        running[i] = FALSE;
        sigint_cnt[i] = 0;
    }

    ret = LoadConfigurationUpperHalf(config_file);
    if (ret) {
        TRACE_CONFIG("Error occurred while loading configuration.\n");
        return -1;
    }

#if defined(ENABLE_PSIO)
    current_iomodule_func = &ps_module_func;
#elif defined(ENABLE_DPDK)
    current_iomodule_func = &dpdk_module_func;
#elif defined(ENABLE_PCAP)
    current_iomodule_func = &pcap_module_func;
#elif defined(ENABLE_NETMAP)
    current_iomodule_func = &netmap_module_func;
#endif

    if (current_iomodule_func->load_module_upper_half)
        current_iomodule_func->load_module_upper_half();

    LoadConfigurationLowerHalf();

    //PrintConfiguration();

    for (i = 0; i < g_config.mos->netdev_table->num; i++) {
        ap[i] = CreateAddressPool(g_config.mos->netdev_table->ent[i]->ip_addr, 1);
        if (!ap[i]) {
            TRACE_CONFIG("Error occurred while creating address pool[%d]\n",
                    i);
            return -1;
        }
    }

    //PrintInterfaceInfo();
    //PrintRoutingTable();
    //PrintARPTable();
    InitARPTable();

    if (signal(SIGUSR1, HandleSignal) == SIG_ERR) {
        perror("signal, SIGUSR1");
        return -1;
    }
    if (signal(SIGINT, HandleSignal) == SIG_ERR) {
        perror("signal, SIGINT");
        return -1;
    }
    app_signal_handler = NULL;

    printf("load_module(): %p\n", (void *)current_iomodule_func);
    /* load system-wide io module specs */
    if (current_iomodule_func->load_module_lower_half)
        current_iomodule_func->load_module_lower_half();

    GlobInitEvent();

    PrintConf(&g_config);

    return 0;
}
/*----------------------------------------------------------------------------*/
int
mtcp_destroy()
{
    int i;

    /* wait until all threads are closed */
    /*
    for (i = 0; i < num_cpus; i++) {
        if (running[i]) {
            if (pthread_join(g_thread[i], NULL) != 0)
                return -1;
        }
    }
    */

    for (i = 0; i < g_config.mos->netdev_table->num; i++)
        DestroyAddressPool(ap[i]);

    TRACE_INFO("All MTCP threads are joined.\n");

    return 0;
}
/*----------------------------------------------------------------------------*/