1 #define _GNU_SOURCE 2 #include <sched.h> 3 #include <unistd.h> 4 #include <sys/time.h> 5 #include <semaphore.h> 6 #include <sys/mman.h> 7 #include <signal.h> 8 #include <assert.h> 9 #include <string.h> 10 11 #include "cpu.h" 12 #include "eth_in.h" 13 #include "fhash.h" 14 #include "tcp_send_buffer.h" 15 #include "tcp_ring_buffer.h" 16 #include "socket.h" 17 #include "eth_out.h" 18 #include "tcp.h" 19 #include "tcp_in.h" 20 #include "tcp_out.h" 21 #include "mtcp_api.h" 22 #include "eventpoll.h" 23 #include "logger.h" 24 #include "config.h" 25 #include "arp.h" 26 #include "ip_out.h" 27 #include "timer.h" 28 #include "debug.h" 29 #include "event_callback.h" 30 #include "tcp_rb.h" 31 #include "tcp_stream.h" 32 #include "io_module.h" 33 34 #ifdef ENABLE_DPDK 35 /* for launching rte thread */ 36 #include <rte_launch.h> 37 #include <rte_lcore.h> 38 #endif /* !ENABLE_DPDK */ 39 #define PS_CHUNK_SIZE 64 40 #define RX_THRESH (PS_CHUNK_SIZE * 0.8) 41 42 #define ROUND_STAT FALSE 43 #define TIME_STAT FALSE 44 #define EVENT_STAT FALSE 45 #define TESTING FALSE 46 47 #define LOG_FILE_NAME "log" 48 #define MAX_FILE_NAME 1024 49 50 #define MAX(a, b) ((a)>(b)?(a):(b)) 51 #define MIN(a, b) ((a)<(b)?(a):(b)) 52 53 #define PER_STREAM_SLICE 0.1 // in ms 54 #define PER_STREAM_TCHECK 1 // in ms 55 #define PS_SELECT_TIMEOUT 100 // in us 56 57 #define GBPS(bytes) (bytes * 8.0 / (1000 * 1000 * 1000)) 58 59 /*----------------------------------------------------------------------------*/ 60 /* handlers for threads */ 61 struct mtcp_thread_context *g_pctx[MAX_CPUS] = {0}; 62 struct log_thread_context *g_logctx[MAX_CPUS] = {0}; 63 /*----------------------------------------------------------------------------*/ 64 static pthread_t g_thread[MAX_CPUS] = {0}; 65 static pthread_t log_thread[MAX_CPUS] = {0}; 66 /*----------------------------------------------------------------------------*/ 67 static sem_t g_init_sem[MAX_CPUS]; 68 static sem_t g_done_sem[MAX_CPUS]; 69 static int running[MAX_CPUS] = {0}; 70 /*----------------------------------------------------------------------------*/ 71 mtcp_sighandler_t app_signal_handler; 72 static int sigint_cnt[MAX_CPUS] = {0}; 73 static struct timespec sigint_ts[MAX_CPUS]; 74 /*----------------------------------------------------------------------------*/ 75 #ifdef NETSTAT 76 #if NETSTAT_TOTAL 77 static int printer = -1; 78 #if ROUND_STAT 79 #endif /* ROUND_STAT */ 80 #endif /* NETSTAT_TOTAL */ 81 #endif /* NETSTAT */ 82 /*----------------------------------------------------------------------------*/ 83 void 84 HandleSignal(int signal) 85 { 86 int i = 0; 87 88 if (signal == SIGINT) { 89 FreeConfigResources(); 90 #ifdef DARWIN 91 int core = 0; 92 #else 93 int core = sched_getcpu(); 94 #endif 95 struct timespec cur_ts; 96 97 clock_gettime(CLOCK_REALTIME, &cur_ts); 98 99 if (sigint_cnt[core] > 0 && cur_ts.tv_sec == sigint_ts[core].tv_sec) { 100 for (i = 0; i < g_config.mos->num_cores; i++) { 101 if (running[i]) { 102 exit(0); 103 g_pctx[i]->exit = TRUE; 104 } 105 } 106 } else { 107 for (i = 0; i < g_config.mos->num_cores; i++) { 108 if (g_pctx[i]) 109 g_pctx[i]->interrupt = TRUE; 110 } 111 if (!app_signal_handler) { 112 for (i = 0; i < g_config.mos->num_cores; i++) { 113 if (running[i]) { 114 exit(0); 115 g_pctx[i]->exit = TRUE; 116 } 117 } 118 } 119 } 120 sigint_cnt[core]++; 121 clock_gettime(CLOCK_REALTIME, &sigint_ts[core]); 122 } 123 124 if (signal != SIGUSR1) { 125 if (app_signal_handler) { 126 app_signal_handler(signal); 127 } 128 } 129 } 130 /*----------------------------------------------------------------------------*/ 131 static int 132 AttachDevice(struct mtcp_thread_context* ctx) 133 { 134 int working = -1; 135 mtcp_manager_t mtcp = ctx->mtcp_manager; 136 137 if (mtcp->iom->link_devices) 138 working = mtcp->iom->link_devices(ctx); 139 else 140 return 0; 141 142 return working; 143 } 144 /*----------------------------------------------------------------------------*/ 145 #ifdef TIMESTAT 146 static inline void 147 InitStatCounter(struct stat_counter *counter) 148 { 149 counter->cnt = 0; 150 counter->sum = 0; 151 counter->max = 0; 152 counter->min = 0; 153 } 154 /*----------------------------------------------------------------------------*/ 155 static inline void 156 UpdateStatCounter(struct stat_counter *counter, int64_t value) 157 { 158 counter->cnt++; 159 counter->sum += value; 160 if (value > counter->max) 161 counter->max = value; 162 if (counter->min == 0 || value < counter->min) 163 counter->min = value; 164 } 165 /*----------------------------------------------------------------------------*/ 166 static inline uint64_t 167 GetAverageStat(struct stat_counter *counter) 168 { 169 return counter->cnt ? (counter->sum / counter->cnt) : 0; 170 } 171 /*----------------------------------------------------------------------------*/ 172 static inline int64_t 173 TimeDiffUs(struct timeval *t2, struct timeval *t1) 174 { 175 return (t2->tv_sec - t1->tv_sec) * 1000000 + 176 (int64_t)(t2->tv_usec - t1->tv_usec); 177 } 178 /*----------------------------------------------------------------------------*/ 179 #endif 180 #ifdef NETSTAT 181 static inline void 182 PrintThreadNetworkStats(mtcp_manager_t mtcp, struct net_stat *ns) 183 { 184 int i; 185 186 for (i = 0; i < g_config.mos->netdev_table->num; i++) { 187 ns->rx_packets[i] = mtcp->nstat.rx_packets[i] - mtcp->p_nstat.rx_packets[i]; 188 ns->rx_errors[i] = mtcp->nstat.rx_errors[i] - mtcp->p_nstat.rx_errors[i]; 189 ns->rx_bytes[i] = mtcp->nstat.rx_bytes[i] - mtcp->p_nstat.rx_bytes[i]; 190 ns->tx_packets[i] = mtcp->nstat.tx_packets[i] - mtcp->p_nstat.tx_packets[i]; 191 ns->tx_drops[i] = mtcp->nstat.tx_drops[i] - mtcp->p_nstat.tx_drops[i]; 192 ns->tx_bytes[i] = mtcp->nstat.tx_bytes[i] - mtcp->p_nstat.tx_bytes[i]; 193 #if NETSTAT_PERTHREAD 194 if (g_config.mos->netdev_table->ent[i]->stat_print) { 195 fprintf(stderr, "[CPU%2d] %s flows: %6u, " 196 "RX: %7llu(pps) (err: %5llu), %5.2lf(Gbps), " 197 "TX: %7llu(pps), %5.2lf(Gbps)\n", 198 mtcp->ctx->cpu, 199 g_config.mos->netdev_table->ent[i]->dev_name, 200 (unsigned)mtcp->flow_cnt, 201 (long long unsigned)ns->rx_packets[i], 202 (long long unsigned)ns->rx_errors[i], 203 GBPS(ns->rx_bytes[i]), 204 (long long unsigned)ns->tx_packets[i], 205 GBPS(ns->tx_bytes[i])); 206 } 207 #endif 208 } 209 mtcp->p_nstat = mtcp->nstat; 210 211 } 212 /*----------------------------------------------------------------------------*/ 213 #if ROUND_STAT 214 static inline void 215 PrintThreadRoundStats(mtcp_manager_t mtcp, struct run_stat *rs) 216 { 217 #define ROUND_DIV (1000) 218 rs->rounds = mtcp->runstat.rounds - mtcp->p_runstat.rounds; 219 rs->rounds_rx = mtcp->runstat.rounds_rx - mtcp->p_runstat.rounds_rx; 220 rs->rounds_rx_try = mtcp->runstat.rounds_rx_try - mtcp->p_runstat.rounds_rx_try; 221 rs->rounds_tx = mtcp->runstat.rounds_tx - mtcp->p_runstat.rounds_tx; 222 rs->rounds_tx_try = mtcp->runstat.rounds_tx_try - mtcp->p_runstat.rounds_tx_try; 223 rs->rounds_select = mtcp->runstat.rounds_select - mtcp->p_runstat.rounds_select; 224 rs->rounds_select_rx = mtcp->runstat.rounds_select_rx - mtcp->p_runstat.rounds_select_rx; 225 rs->rounds_select_tx = mtcp->runstat.rounds_select_tx - mtcp->p_runstat.rounds_select_tx; 226 rs->rounds_select_intr = mtcp->runstat.rounds_select_intr - mtcp->p_runstat.rounds_select_intr; 227 rs->rounds_twcheck = mtcp->runstat.rounds_twcheck - mtcp->p_runstat.rounds_twcheck; 228 mtcp->p_runstat = mtcp->runstat; 229 #if NETSTAT_PERTHREAD 230 fprintf(stderr, "[CPU%2d] Rounds: %4lluK, " 231 "rx: %3lluK (try: %4lluK), tx: %3lluK (try: %4lluK), " 232 "ps_select: %4llu (rx: %4llu, tx: %4llu, intr: %3llu)\n", 233 mtcp->ctx->cpu, rs->rounds / ROUND_DIV, 234 rs->rounds_rx / ROUND_DIV, rs->rounds_rx_try / ROUND_DIV, 235 rs->rounds_tx / ROUND_DIV, rs->rounds_tx_try / ROUND_DIV, 236 rs->rounds_select, 237 rs->rounds_select_rx, rs->rounds_select_tx, rs->rounds_select_intr); 238 #endif 239 } 240 #endif /* ROUND_STAT */ 241 /*----------------------------------------------------------------------------*/ 242 #if TIME_STAT 243 static inline void 244 PrintThreadRoundTime(mtcp_manager_t mtcp) 245 { 246 fprintf(stderr, "[CPU%2d] Time: (avg, max) " 247 "round: (%4luus, %4luus), processing: (%4luus, %4luus), " 248 "tcheck: (%4luus, %4luus), epoll: (%4luus, %4luus), " 249 "handle: (%4luus, %4luus), xmit: (%4luus, %4luus), " 250 "select: (%4luus, %4luus)\n", mtcp->ctx->cpu, 251 GetAverageStat(&mtcp->rtstat.round), mtcp->rtstat.round.max, 252 GetAverageStat(&mtcp->rtstat.processing), mtcp->rtstat.processing.max, 253 GetAverageStat(&mtcp->rtstat.tcheck), mtcp->rtstat.tcheck.max, 254 GetAverageStat(&mtcp->rtstat.epoll), mtcp->rtstat.epoll.max, 255 GetAverageStat(&mtcp->rtstat.handle), mtcp->rtstat.handle.max, 256 GetAverageStat(&mtcp->rtstat.xmit), mtcp->rtstat.xmit.max, 257 GetAverageStat(&mtcp->rtstat.select), mtcp->rtstat.select.max); 258 259 InitStatCounter(&mtcp->rtstat.round); 260 InitStatCounter(&mtcp->rtstat.processing); 261 InitStatCounter(&mtcp->rtstat.tcheck); 262 InitStatCounter(&mtcp->rtstat.epoll); 263 InitStatCounter(&mtcp->rtstat.handle); 264 InitStatCounter(&mtcp->rtstat.xmit); 265 InitStatCounter(&mtcp->rtstat.select); 266 } 267 #endif 268 #endif /* NETSTAT */ 269 /*----------------------------------------------------------------------------*/ 270 #if EVENT_STAT 271 static inline void 272 PrintEventStat(int core, struct mtcp_epoll_stat *stat) 273 { 274 fprintf(stderr, "[CPU%2d] calls: %lu, waits: %lu, wakes: %lu, " 275 "issued: %lu, registered: %lu, invalidated: %lu, handled: %lu\n", 276 core, stat->calls, stat->waits, stat->wakes, 277 stat->issued, stat->registered, stat->invalidated, stat->handled); 278 memset(stat, 0, sizeof(struct mtcp_epoll_stat)); 279 } 280 #endif /* EVENT_STAT */ 281 /*----------------------------------------------------------------------------*/ 282 #ifdef NETSTAT 283 static inline void 284 PrintNetworkStats(mtcp_manager_t mtcp, uint32_t cur_ts) 285 { 286 #define TIMEOUT 1 287 int i; 288 struct net_stat ns; 289 bool stat_print = false; 290 #if ROUND_STAT 291 struct run_stat rs; 292 #endif /* ROUND_STAT */ 293 #ifdef NETSTAT_TOTAL 294 static double peak_total_rx_gbps = 0; 295 static double peak_total_tx_gbps = 0; 296 static double avg_total_rx_gbps = 0; 297 static double avg_total_tx_gbps = 0; 298 299 double total_rx_gbps = 0, total_tx_gbps = 0; 300 int j; 301 uint32_t gflow_cnt = 0; 302 struct net_stat g_nstat; 303 #if ROUND_STAT 304 struct run_stat g_runstat; 305 #endif /* ROUND_STAT */ 306 #endif /* NETSTAT_TOTAL */ 307 308 if (TS_TO_MSEC(cur_ts - mtcp->p_nstat_ts) < SEC_TO_MSEC(TIMEOUT)) { 309 return; 310 } 311 312 mtcp->p_nstat_ts = cur_ts; 313 gflow_cnt = 0; 314 memset(&g_nstat, 0, sizeof(struct net_stat)); 315 for (i = 0; i < g_config.mos->num_cores; i++) { 316 if (running[i]) { 317 PrintThreadNetworkStats(g_mtcp[i], &ns); 318 #if NETSTAT_TOTAL 319 gflow_cnt += g_mtcp[i]->flow_cnt; 320 for (j = 0; j < g_config.mos->netdev_table->num; j++) { 321 g_nstat.rx_packets[j] += ns.rx_packets[j]; 322 g_nstat.rx_errors[j] += ns.rx_errors[j]; 323 g_nstat.rx_bytes[j] += ns.rx_bytes[j]; 324 g_nstat.tx_packets[j] += ns.tx_packets[j]; 325 g_nstat.tx_drops[j] += ns.tx_drops[j]; 326 g_nstat.tx_bytes[j] += ns.tx_bytes[j]; 327 } 328 #endif 329 } 330 } 331 #if NETSTAT_TOTAL 332 for (i = 0; i < g_config.mos->netdev_table->num; i++) { 333 if (g_config.mos->netdev_table->ent[i]->stat_print) { 334 fprintf(stderr, "[ ALL ] %s, " 335 "RX: %7llu(pps) (err: %5llu), %5.2lf(Gbps), " 336 "TX: %7llu(pps), %5.2lf(Gbps)\n", 337 g_config.mos->netdev_table->ent[i]->dev_name, 338 (long long unsigned)g_nstat.rx_packets[i], 339 (long long unsigned)g_nstat.rx_errors[i], 340 GBPS(g_nstat.rx_bytes[i]), 341 (long long unsigned)g_nstat.tx_packets[i], 342 GBPS(g_nstat.tx_bytes[i])); 343 total_rx_gbps += GBPS(g_nstat.rx_bytes[i]); 344 total_tx_gbps += GBPS(g_nstat.tx_bytes[i]); 345 stat_print = true; 346 } 347 } 348 if (stat_print) { 349 fprintf(stderr, "[ ALL ] flows: %6u\n", gflow_cnt); 350 if (avg_total_rx_gbps == 0) 351 avg_total_rx_gbps = total_rx_gbps; 352 else 353 avg_total_rx_gbps = avg_total_rx_gbps * 0.6 + total_rx_gbps * 0.4; 354 355 if (avg_total_tx_gbps == 0) 356 avg_total_tx_gbps = total_tx_gbps; 357 else 358 avg_total_tx_gbps = avg_total_tx_gbps * 0.6 + total_tx_gbps * 0.4; 359 360 if (peak_total_rx_gbps < total_rx_gbps) 361 peak_total_rx_gbps = total_rx_gbps; 362 if (peak_total_tx_gbps < total_tx_gbps) 363 peak_total_tx_gbps = total_tx_gbps; 364 365 fprintf(stderr, "[ PEAK ] RX: %5.2lf(Gbps), TX: %5.2lf(Gbps)\n" 366 "[ RECENT AVG ] RX: %5.2lf(Gbps), TX: %5.2lf(Gbps)\n", 367 peak_total_rx_gbps, peak_total_tx_gbps, 368 avg_total_rx_gbps, avg_total_tx_gbps); 369 } 370 #endif 371 372 #if ROUND_STAT 373 memset(&g_runstat, 0, sizeof(struct run_stat)); 374 for (i = 0; i < g_config.mos->num_cores; i++) { 375 if (running[i]) { 376 PrintThreadRoundStats(g_mtcp[i], &rs); 377 #if DBGMSG 378 g_runstat.rounds += rs.rounds; 379 g_runstat.rounds_rx += rs.rounds_rx; 380 g_runstat.rounds_rx_try += rs.rounds_rx_try; 381 g_runstat.rounds_tx += rs.rounds_tx; 382 g_runstat.rounds_tx_try += rs.rounds_tx_try; 383 g_runstat.rounds_select += rs.rounds_select; 384 g_runstat.rounds_select_rx += rs.rounds_select_rx; 385 g_runstat.rounds_select_tx += rs.rounds_select_tx; 386 #endif 387 } 388 } 389 390 TRACE_DBG("[ ALL ] Rounds: %4ldK, " 391 "rx: %3ldK (try: %4ldK), tx: %3ldK (try: %4ldK), " 392 "ps_select: %4ld (rx: %4ld, tx: %4ld)\n", 393 g_runstat.rounds / 1000, g_runstat.rounds_rx / 1000, 394 g_runstat.rounds_rx_try / 1000, g_runstat.rounds_tx / 1000, 395 g_runstat.rounds_tx_try / 1000, g_runstat.rounds_select, 396 g_runstat.rounds_select_rx, g_runstat.rounds_select_tx); 397 #endif /* ROUND_STAT */ 398 399 #if TIME_STAT 400 for (i = 0; i < g_config.mos->num_cores; i++) { 401 if (running[i]) { 402 PrintThreadRoundTime(g_mtcp[i]); 403 } 404 } 405 #endif 406 407 #if EVENT_STAT 408 for (i = 0; i < g_config.mos->num_cores; i++) { 409 if (running[i] && g_mtcp[i]->ep) { 410 PrintEventStat(i, &g_mtcp[i]->ep->stat); 411 } 412 } 413 #endif 414 415 fflush(stderr); 416 } 417 #endif /* NETSTAT */ 418 /*----------------------------------------------------------------------------*/ 419 static inline void 420 FlushMonitorReadEvents(mtcp_manager_t mtcp) 421 { 422 struct event_queue *mtcpq; 423 struct tcp_stream *cur_stream; 424 struct mon_listener *walk; 425 426 /* check if monitor sockets should be passed data */ 427 TAILQ_FOREACH(walk, &mtcp->monitors, link) { 428 if (walk->socket->socktype != MOS_SOCK_MONITOR_STREAM || 429 !(mtcpq = walk->eq)) 430 continue; 431 432 while (mtcpq->num_events > 0) { 433 cur_stream = 434 (struct tcp_stream *)mtcpq->events[mtcpq->start++].ev.data.ptr; 435 /* only read events */ 436 if (cur_stream != NULL && 437 (cur_stream->actions & MOS_ACT_READ_DATA)) { 438 if (cur_stream->rcvvar != NULL && 439 cur_stream->rcvvar->rcvbuf != NULL) { 440 /* no need to pass pkt context */ 441 struct socket_map *walk; 442 SOCKQ_FOREACH_START(walk, &cur_stream->msocks) { 443 HandleCallback(mtcp, MOS_NULL, walk, 444 cur_stream->side, NULL, 445 MOS_ON_CONN_NEW_DATA); 446 } SOCKQ_FOREACH_END; 447 } 448 /* reset the actions now */ 449 cur_stream->actions = 0; 450 } 451 if (mtcpq->start >= mtcpq->size) 452 mtcpq->start = 0; 453 mtcpq->num_events--; 454 } 455 } 456 } 457 /*----------------------------------------------------------------------------*/ 458 static inline void 459 FlushBufferedReadEvents(mtcp_manager_t mtcp) 460 { 461 int i; 462 int offset; 463 struct event_queue *mtcpq; 464 struct tcp_stream *cur_stream; 465 466 if (mtcp->ep == NULL) { 467 TRACE_EPOLL("No epoll socket has been registered yet!\n"); 468 return; 469 } else { 470 /* case when mtcpq exists */ 471 mtcpq = mtcp->ep->mtcp_queue; 472 offset = mtcpq->start; 473 } 474 475 /* we will use queued-up epoll read-in events 476 * to trigger buffered read monitor events */ 477 for (i = 0; i < mtcpq->num_events; i++) { 478 cur_stream = mtcp->smap[mtcpq->events[offset++].sockid].stream; 479 /* only read events */ 480 /* Raise new data callback event */ 481 if (cur_stream != NULL && 482 (cur_stream->socket->events | MOS_EPOLLIN)) { 483 if (cur_stream->rcvvar != NULL && 484 cur_stream->rcvvar->rcvbuf != NULL) { 485 /* no need to pass pkt context */ 486 struct socket_map *walk; 487 SOCKQ_FOREACH_START(walk, &cur_stream->msocks) { 488 HandleCallback(mtcp, MOS_NULL, walk, cur_stream->side, 489 NULL, MOS_ON_CONN_NEW_DATA); 490 } SOCKQ_FOREACH_END; 491 } 492 } 493 if (offset >= mtcpq->size) 494 offset = 0; 495 } 496 } 497 /*----------------------------------------------------------------------------*/ 498 static inline void 499 FlushEpollEvents(mtcp_manager_t mtcp, uint32_t cur_ts) 500 { 501 struct mtcp_epoll *ep = mtcp->ep; 502 struct event_queue *usrq = ep->usr_queue; 503 struct event_queue *mtcpq = ep->mtcp_queue; 504 505 pthread_mutex_lock(&ep->epoll_lock); 506 if (ep->mtcp_queue->num_events > 0) { 507 /* while mtcp_queue have events */ 508 /* and usr_queue is not full */ 509 while (mtcpq->num_events > 0 && usrq->num_events < usrq->size) { 510 /* copy the event from mtcp_queue to usr_queue */ 511 usrq->events[usrq->end++] = mtcpq->events[mtcpq->start++]; 512 513 if (usrq->end >= usrq->size) 514 usrq->end = 0; 515 usrq->num_events++; 516 517 if (mtcpq->start >= mtcpq->size) 518 mtcpq->start = 0; 519 mtcpq->num_events--; 520 } 521 } 522 523 /* if there are pending events, wake up user */ 524 if (ep->waiting && (ep->usr_queue->num_events > 0 || 525 ep->usr_shadow_queue->num_events > 0)) { 526 STAT_COUNT(mtcp->runstat.rounds_epoll); 527 TRACE_EPOLL("Broadcasting events. num: %d, cur_ts: %u, prev_ts: %u\n", 528 ep->usr_queue->num_events, cur_ts, mtcp->ts_last_event); 529 mtcp->ts_last_event = cur_ts; 530 ep->stat.wakes++; 531 pthread_cond_signal(&ep->epoll_cond); 532 } 533 pthread_mutex_unlock(&ep->epoll_lock); 534 } 535 /*----------------------------------------------------------------------------*/ 536 static inline void 537 HandleApplicationCalls(mtcp_manager_t mtcp, uint32_t cur_ts) 538 { 539 tcp_stream *stream; 540 int cnt, max_cnt; 541 int handled, delayed; 542 int control, send, ack; 543 544 /* connect handling */ 545 while ((stream = StreamDequeue(mtcp->connectq))) { 546 if (stream->state != TCP_ST_SYN_SENT) { 547 TRACE_INFO("Got a connection request from app with state: %s", 548 TCPStateToString(stream)); 549 exit(EXIT_FAILURE); 550 } else { 551 stream->cb_events |= MOS_ON_CONN_START | 552 MOS_ON_TCP_STATE_CHANGE; 553 /* if monitor is on... */ 554 if (stream->pair_stream != NULL) 555 stream->pair_stream->cb_events |= 556 MOS_ON_CONN_START; 557 } 558 AddtoControlList(mtcp, stream, cur_ts); 559 } 560 561 /* send queue handling */ 562 while ((stream = StreamDequeue(mtcp->sendq))) { 563 stream->sndvar->on_sendq = FALSE; 564 AddtoSendList(mtcp, stream); 565 } 566 567 /* ack queue handling */ 568 while ((stream = StreamDequeue(mtcp->ackq))) { 569 stream->sndvar->on_ackq = FALSE; 570 EnqueueACK(mtcp, stream, cur_ts, ACK_OPT_AGGREGATE); 571 } 572 573 /* close handling */ 574 handled = delayed = 0; 575 control = send = ack = 0; 576 while ((stream = StreamDequeue(mtcp->closeq))) { 577 struct tcp_send_vars *sndvar = stream->sndvar; 578 sndvar->on_closeq = FALSE; 579 580 if (sndvar->sndbuf) { 581 sndvar->fss = sndvar->sndbuf->head_seq + sndvar->sndbuf->len; 582 } else { 583 sndvar->fss = stream->snd_nxt; 584 } 585 586 if (g_config.mos->tcp_timeout > 0) 587 RemoveFromTimeoutList(mtcp, stream); 588 589 if (stream->have_reset) { 590 handled++; 591 if (stream->state != TCP_ST_CLOSED_RSVD) { 592 stream->close_reason = TCP_RESET; 593 stream->state = TCP_ST_CLOSED_RSVD; 594 stream->cb_events |= MOS_ON_TCP_STATE_CHANGE; 595 TRACE_STATE("Stream %d: TCP_ST_CLOSED_RSVD\n", stream->id); 596 DestroyTCPStream(mtcp, stream); 597 } else { 598 TRACE_ERROR("Stream already closed.\n"); 599 } 600 601 } else if (sndvar->on_control_list) { 602 sndvar->on_closeq_int = TRUE; 603 StreamInternalEnqueue(mtcp->closeq_int, stream); 604 delayed++; 605 if (sndvar->on_control_list) 606 control++; 607 if (sndvar->on_send_list) 608 send++; 609 if (sndvar->on_ack_list) 610 ack++; 611 612 } else if (sndvar->on_send_list || sndvar->on_ack_list) { 613 handled++; 614 if (stream->state == TCP_ST_ESTABLISHED) { 615 stream->state = TCP_ST_FIN_WAIT_1; 616 stream->cb_events |= MOS_ON_TCP_STATE_CHANGE; 617 TRACE_STATE("Stream %d: TCP_ST_FIN_WAIT_1\n", stream->id); 618 619 } else if (stream->state == TCP_ST_CLOSE_WAIT) { 620 stream->state = TCP_ST_LAST_ACK; 621 stream->cb_events |= MOS_ON_TCP_STATE_CHANGE; 622 TRACE_STATE("Stream %d: TCP_ST_LAST_ACK\n", stream->id); 623 } 624 stream->control_list_waiting = TRUE; 625 626 } else if (stream->state != TCP_ST_CLOSED_RSVD) { 627 handled++; 628 if (stream->state == TCP_ST_ESTABLISHED) { 629 stream->state = TCP_ST_FIN_WAIT_1; 630 stream->cb_events |= MOS_ON_TCP_STATE_CHANGE; 631 TRACE_STATE("Stream %d: TCP_ST_FIN_WAIT_1\n", stream->id); 632 633 } else if (stream->state == TCP_ST_CLOSE_WAIT) { 634 stream->state = TCP_ST_LAST_ACK; 635 stream->cb_events |= MOS_ON_TCP_STATE_CHANGE; 636 TRACE_STATE("Stream %d: TCP_ST_LAST_ACK\n", stream->id); 637 } 638 //sndvar->rto = TCP_FIN_RTO; 639 //UpdateRetransmissionTimer(mtcp, stream, mtcp->cur_ts); 640 AddtoControlList(mtcp, stream, cur_ts); 641 } else { 642 TRACE_ERROR("Already closed connection!\n"); 643 } 644 } 645 TRACE_ROUND("Handling close connections. cnt: %d\n", cnt); 646 647 cnt = 0; 648 max_cnt = mtcp->closeq_int->count; 649 while (cnt++ < max_cnt) { 650 stream = StreamInternalDequeue(mtcp->closeq_int); 651 652 if (stream->sndvar->on_control_list) { 653 StreamInternalEnqueue(mtcp->closeq_int, stream); 654 655 } else if (stream->state != TCP_ST_CLOSED_RSVD) { 656 handled++; 657 stream->sndvar->on_closeq_int = FALSE; 658 if (stream->state == TCP_ST_ESTABLISHED) { 659 stream->state = TCP_ST_FIN_WAIT_1; 660 stream->cb_events |= MOS_ON_TCP_STATE_CHANGE; 661 TRACE_STATE("Stream %d: TCP_ST_FIN_WAIT_1\n", stream->id); 662 663 } else if (stream->state == TCP_ST_CLOSE_WAIT) { 664 stream->state = TCP_ST_LAST_ACK; 665 stream->cb_events |= MOS_ON_TCP_STATE_CHANGE; 666 TRACE_STATE("Stream %d: TCP_ST_LAST_ACK\n", stream->id); 667 } 668 AddtoControlList(mtcp, stream, cur_ts); 669 } else { 670 stream->sndvar->on_closeq_int = FALSE; 671 TRACE_ERROR("Already closed connection!\n"); 672 } 673 } 674 675 /* reset handling */ 676 while ((stream = StreamDequeue(mtcp->resetq))) { 677 stream->sndvar->on_resetq = FALSE; 678 679 if (g_config.mos->tcp_timeout > 0) 680 RemoveFromTimeoutList(mtcp, stream); 681 682 if (stream->have_reset) { 683 if (stream->state != TCP_ST_CLOSED_RSVD) { 684 stream->close_reason = TCP_RESET; 685 stream->state = TCP_ST_CLOSED_RSVD; 686 stream->cb_events |= MOS_ON_TCP_STATE_CHANGE; 687 TRACE_STATE("Stream %d: TCP_ST_CLOSED_RSVD\n", stream->id); 688 DestroyTCPStream(mtcp, stream); 689 } else { 690 TRACE_ERROR("Stream already closed.\n"); 691 } 692 693 } else if (stream->sndvar->on_control_list || 694 stream->sndvar->on_send_list || stream->sndvar->on_ack_list) { 695 /* wait until all the queues are flushed */ 696 stream->sndvar->on_resetq_int = TRUE; 697 StreamInternalEnqueue(mtcp->resetq_int, stream); 698 699 } else { 700 if (stream->state != TCP_ST_CLOSED_RSVD) { 701 stream->close_reason = TCP_ACTIVE_CLOSE; 702 stream->state = TCP_ST_CLOSED_RSVD; 703 stream->cb_events |= MOS_ON_TCP_STATE_CHANGE; 704 TRACE_STATE("Stream %d: TCP_ST_CLOSED_RSVD\n", stream->id); 705 AddtoControlList(mtcp, stream, cur_ts); 706 } else { 707 TRACE_ERROR("Stream already closed.\n"); 708 } 709 } 710 } 711 TRACE_ROUND("Handling reset connections. cnt: %d\n", cnt); 712 713 cnt = 0; 714 max_cnt = mtcp->resetq_int->count; 715 while (cnt++ < max_cnt) { 716 stream = StreamInternalDequeue(mtcp->resetq_int); 717 718 if (stream->sndvar->on_control_list || 719 stream->sndvar->on_send_list || stream->sndvar->on_ack_list) { 720 /* wait until all the queues are flushed */ 721 StreamInternalEnqueue(mtcp->resetq_int, stream); 722 723 } else { 724 stream->sndvar->on_resetq_int = FALSE; 725 726 if (stream->state != TCP_ST_CLOSED_RSVD) { 727 stream->close_reason = TCP_ACTIVE_CLOSE; 728 stream->state = TCP_ST_CLOSED_RSVD; 729 stream->cb_events |= MOS_ON_TCP_STATE_CHANGE; 730 TRACE_STATE("Stream %d: TCP_ST_CLOSED_RSVD\n", stream->id); 731 AddtoControlList(mtcp, stream, cur_ts); 732 } else { 733 TRACE_ERROR("Stream already closed.\n"); 734 } 735 } 736 } 737 738 /* destroy streams in destroyq */ 739 while ((stream = StreamDequeue(mtcp->destroyq))) { 740 DestroyTCPStream(mtcp, stream); 741 } 742 743 mtcp->wakeup_flag = FALSE; 744 } 745 /*----------------------------------------------------------------------------*/ 746 static inline void 747 WritePacketsToChunks(mtcp_manager_t mtcp, uint32_t cur_ts) 748 { 749 int thresh = g_config.mos->max_concurrency; 750 int i; 751 752 /* Set the threshold to g_config.mos->max_concurrency to send ACK immediately */ 753 /* Otherwise, set to appropriate value (e.g. thresh) */ 754 assert(mtcp->g_sender != NULL); 755 if (mtcp->g_sender->control_list_cnt) 756 WriteTCPControlList(mtcp, mtcp->g_sender, cur_ts, thresh); 757 if (mtcp->g_sender->ack_list_cnt) 758 WriteTCPACKList(mtcp, mtcp->g_sender, cur_ts, thresh); 759 if (mtcp->g_sender->send_list_cnt) 760 WriteTCPDataList(mtcp, mtcp->g_sender, cur_ts, thresh); 761 762 for (i = 0; i < g_config.mos->netdev_table->num; i++) { 763 assert(mtcp->n_sender[i] != NULL); 764 if (mtcp->n_sender[i]->control_list_cnt) 765 WriteTCPControlList(mtcp, mtcp->n_sender[i], cur_ts, thresh); 766 if (mtcp->n_sender[i]->ack_list_cnt) 767 WriteTCPACKList(mtcp, mtcp->n_sender[i], cur_ts, thresh); 768 if (mtcp->n_sender[i]->send_list_cnt) 769 WriteTCPDataList(mtcp, mtcp->n_sender[i], cur_ts, thresh); 770 } 771 } 772 /*----------------------------------------------------------------------------*/ 773 #if TESTING 774 static int 775 DestroyRemainingFlows(mtcp_manager_t mtcp) 776 { 777 struct hashtable *ht = mtcp->tcp_flow_table; 778 tcp_stream *walk; 779 int cnt, i; 780 781 cnt = 0; 782 783 thread_printf(mtcp, mtcp->log_fp, 784 "CPU %d: Flushing remaining flows.\n", mtcp->ctx->cpu); 785 786 for (i = 0; i < NUM_BINS; i++) { 787 TAILQ_FOREACH(walk, &ht->ht_table[i], rcvvar->he_link) { 788 thread_printf(mtcp, mtcp->log_fp, 789 "CPU %d: Destroying stream %d\n", mtcp->ctx->cpu, walk->id); 790 #ifdef DUMP_STREAM 791 DumpStream(mtcp, walk); 792 #endif 793 DestroyTCPStream(mtcp, walk); 794 cnt++; 795 } 796 } 797 798 return cnt; 799 } 800 #endif 801 /*----------------------------------------------------------------------------*/ 802 static void 803 InterruptApplication(mtcp_manager_t mtcp) 804 { 805 /* interrupt if the mtcp_epoll_wait() is waiting */ 806 if (mtcp->ep) { 807 pthread_mutex_lock(&mtcp->ep->epoll_lock); 808 if (mtcp->ep->waiting) { 809 pthread_cond_signal(&mtcp->ep->epoll_cond); 810 } 811 pthread_mutex_unlock(&mtcp->ep->epoll_lock); 812 } 813 /* interrupt if the accept() is waiting */ 814 if (mtcp->listener) { 815 if (mtcp->listener->socket) { 816 pthread_mutex_lock(&mtcp->listener->accept_lock); 817 if (!(mtcp->listener->socket->opts & MTCP_NONBLOCK)) { 818 pthread_cond_signal(&mtcp->listener->accept_cond); 819 } 820 pthread_mutex_unlock(&mtcp->listener->accept_lock); 821 } 822 } 823 } 824 /*----------------------------------------------------------------------------*/ 825 void 826 RunPassiveLoop(mtcp_manager_t mtcp) 827 { 828 sem_wait(&g_done_sem[mtcp->ctx->cpu]); 829 sem_destroy(&g_done_sem[mtcp->ctx->cpu]); 830 return; 831 } 832 /*----------------------------------------------------------------------------*/ 833 static void 834 RunMainLoop(struct mtcp_thread_context *ctx) 835 { 836 mtcp_manager_t mtcp = ctx->mtcp_manager; 837 int i; 838 int recv_cnt; 839 int rx_inf, tx_inf; 840 struct timeval cur_ts = {0}; 841 uint32_t ts, ts_prev; 842 843 #if TIME_STAT 844 struct timeval prev_ts, processing_ts, tcheck_ts, 845 epoll_ts, handle_ts, xmit_ts, select_ts; 846 #endif 847 int thresh; 848 849 gettimeofday(&cur_ts, NULL); 850 851 TRACE_DBG("CPU %d: mtcp thread running.\n", ctx->cpu); 852 853 #if TIME_STAT 854 prev_ts = cur_ts; 855 InitStatCounter(&mtcp->rtstat.round); 856 InitStatCounter(&mtcp->rtstat.processing); 857 InitStatCounter(&mtcp->rtstat.tcheck); 858 InitStatCounter(&mtcp->rtstat.epoll); 859 InitStatCounter(&mtcp->rtstat.handle); 860 InitStatCounter(&mtcp->rtstat.xmit); 861 InitStatCounter(&mtcp->rtstat.select); 862 #endif 863 864 ts = ts_prev = 0; 865 while ((!ctx->done || mtcp->flow_cnt) && !ctx->exit) { 866 867 STAT_COUNT(mtcp->runstat.rounds); 868 recv_cnt = 0; 869 gettimeofday(&cur_ts, NULL); 870 #if TIME_STAT 871 /* measure the inter-round delay */ 872 UpdateStatCounter(&mtcp->rtstat.round, TimeDiffUs(&cur_ts, &prev_ts)); 873 prev_ts = cur_ts; 874 #endif 875 876 ts = TIMEVAL_TO_TS(&cur_ts); 877 mtcp->cur_ts = ts; 878 879 for (rx_inf = 0; rx_inf < g_config.mos->netdev_table->num; rx_inf++) { 880 881 recv_cnt = mtcp->iom->recv_pkts(ctx, rx_inf); 882 STAT_COUNT(mtcp->runstat.rounds_rx_try); 883 884 for (i = 0; i < recv_cnt; i++) { 885 uint16_t len; 886 uint8_t *pktbuf; 887 pktbuf = mtcp->iom->get_rptr(mtcp->ctx, rx_inf, i, &len); 888 ProcessPacket(mtcp, rx_inf, i, ts, pktbuf, len); 889 } 890 891 #ifdef ENABLE_DPDKR 892 mtcp->iom->send_pkts(ctx, rx_inf); 893 continue; 894 #endif 895 } 896 STAT_COUNT(mtcp->runstat.rounds_rx); 897 898 #if TIME_STAT 899 gettimeofday(&processing_ts, NULL); 900 UpdateStatCounter(&mtcp->rtstat.processing, 901 TimeDiffUs(&processing_ts, &cur_ts)); 902 #endif /* TIME_STAT */ 903 904 /* Handle user defined timeout */ 905 struct timer *walk, *tmp; 906 for (walk = TAILQ_FIRST(&mtcp->timer_list); walk != NULL; walk = tmp) { 907 tmp = TAILQ_NEXT(walk, timer_link); 908 if (TIMEVAL_LT(&cur_ts, &walk->exp)) 909 break; 910 911 struct mtcp_context mctx = {.cpu = ctx->cpu}; 912 walk->cb(&mctx, walk->id, 0, 0 /* FIXME */, NULL); 913 DelTimer(mtcp, walk); 914 } 915 916 /* interaction with application */ 917 if (mtcp->flow_cnt > 0) { 918 919 /* check retransmission timeout and timewait expire */ 920 #if 0 921 thresh = (int)mtcp->flow_cnt / (TS_TO_USEC(PER_STREAM_TCHECK)); 922 assert(thresh >= 0); 923 if (thresh == 0) 924 thresh = 1; 925 if (recv_cnt > 0 && thresh > recv_cnt) 926 thresh = recv_cnt; 927 #else 928 thresh = g_config.mos->max_concurrency; 929 #endif 930 931 /* Eunyoung, you may fix this later 932 * if there is no rcv packet, we will send as much as possible 933 */ 934 if (thresh == -1) 935 thresh = g_config.mos->max_concurrency; 936 937 CheckRtmTimeout(mtcp, ts, thresh); 938 CheckTimewaitExpire(mtcp, ts, thresh); 939 940 if (g_config.mos->tcp_timeout > 0 && ts != ts_prev) { 941 CheckConnectionTimeout(mtcp, ts, thresh); 942 } 943 944 #if TIME_STAT 945 } 946 gettimeofday(&tcheck_ts, NULL); 947 UpdateStatCounter(&mtcp->rtstat.tcheck, 948 TimeDiffUs(&tcheck_ts, &processing_ts)); 949 950 if (mtcp->flow_cnt > 0) { 951 #endif /* TIME_STAT */ 952 953 } 954 955 /* 956 * before flushing epoll events, call monitor events for 957 * all registered `read` events 958 */ 959 if (mtcp->num_msp > 0) 960 /* call this when only a standalone monitor is running */ 961 FlushMonitorReadEvents(mtcp); 962 963 /* if epoll is in use, flush all the queued events */ 964 if (mtcp->ep) { 965 FlushBufferedReadEvents(mtcp); 966 FlushEpollEvents(mtcp, ts); 967 } 968 #if TIME_STAT 969 gettimeofday(&epoll_ts, NULL); 970 UpdateStatCounter(&mtcp->rtstat.epoll, 971 TimeDiffUs(&epoll_ts, &tcheck_ts)); 972 #endif /* TIME_STAT */ 973 974 if (end_app_exists && mtcp->flow_cnt > 0) { 975 /* handle stream queues */ 976 HandleApplicationCalls(mtcp, ts); 977 } 978 979 #ifdef ENABLE_DPDKR 980 continue; 981 #endif 982 983 #if TIME_STAT 984 gettimeofday(&handle_ts, NULL); 985 UpdateStatCounter(&mtcp->rtstat.handle, 986 TimeDiffUs(&handle_ts, &epoll_ts)); 987 #endif /* TIME_STAT */ 988 989 WritePacketsToChunks(mtcp, ts); 990 991 /* send packets from write buffer */ 992 /* Send until tx is available */ 993 int num_dev = g_config.mos->netdev_table->num; 994 if (likely(mtcp->iom->send_pkts != NULL)) 995 for (tx_inf = 0; tx_inf < num_dev; tx_inf++) { 996 mtcp->iom->send_pkts(ctx, tx_inf); 997 } 998 999 #if TIME_STAT 1000 gettimeofday(&xmit_ts, NULL); 1001 UpdateStatCounter(&mtcp->rtstat.xmit, 1002 TimeDiffUs(&xmit_ts, &handle_ts)); 1003 #endif /* TIME_STAT */ 1004 1005 if (ts != ts_prev) { 1006 ts_prev = ts; 1007 #ifdef NETSTAT 1008 if (ctx->cpu == printer) { 1009 #ifdef RUN_ARP 1010 ARPTimer(mtcp, ts); 1011 #endif 1012 #ifdef NETSTAT 1013 PrintNetworkStats(mtcp, ts); 1014 #endif 1015 } 1016 #endif /* NETSTAT */ 1017 } 1018 1019 if (mtcp->iom->select) 1020 mtcp->iom->select(ctx); 1021 1022 if (ctx->interrupt) { 1023 InterruptApplication(mtcp); 1024 } 1025 } 1026 1027 #if TESTING 1028 DestroyRemainingFlows(mtcp); 1029 #endif 1030 1031 TRACE_DBG("MTCP thread %d out of main loop.\n", ctx->cpu); 1032 /* flush logs */ 1033 flush_log_data(mtcp); 1034 TRACE_DBG("MTCP thread %d flushed logs.\n", ctx->cpu); 1035 InterruptApplication(mtcp); 1036 TRACE_INFO("MTCP thread %d finished.\n", ctx->cpu); 1037 } 1038 /*----------------------------------------------------------------------------*/ 1039 struct mtcp_sender * 1040 CreateMTCPSender(int ifidx) 1041 { 1042 struct mtcp_sender *sender; 1043 1044 sender = (struct mtcp_sender *)calloc(1, sizeof(struct mtcp_sender)); 1045 if (!sender) { 1046 return NULL; 1047 } 1048 1049 sender->ifidx = ifidx; 1050 1051 TAILQ_INIT(&sender->control_list); 1052 TAILQ_INIT(&sender->send_list); 1053 TAILQ_INIT(&sender->ack_list); 1054 1055 sender->control_list_cnt = 0; 1056 sender->send_list_cnt = 0; 1057 sender->ack_list_cnt = 0; 1058 1059 return sender; 1060 } 1061 /*----------------------------------------------------------------------------*/ 1062 void 1063 DestroyMTCPSender(struct mtcp_sender *sender) 1064 { 1065 free(sender); 1066 } 1067 /*----------------------------------------------------------------------------*/ 1068 static mtcp_manager_t 1069 InitializeMTCPManager(struct mtcp_thread_context* ctx) 1070 { 1071 mtcp_manager_t mtcp; 1072 char log_name[MAX_FILE_NAME]; 1073 int i; 1074 1075 posix_seq_srand((unsigned)pthread_self()); 1076 1077 mtcp = (mtcp_manager_t)calloc(1, sizeof(struct mtcp_manager)); 1078 if (!mtcp) { 1079 perror("malloc"); 1080 fprintf(stderr, "Failed to allocate mtcp_manager.\n"); 1081 return NULL; 1082 } 1083 g_mtcp[ctx->cpu] = mtcp; 1084 1085 mtcp->tcp_flow_table = CreateHashtable(); 1086 if (!mtcp->tcp_flow_table) { 1087 CTRACE_ERROR("Falied to allocate tcp flow table.\n"); 1088 return NULL; 1089 } 1090 1091 #ifdef HUGEPAGE 1092 #define IS_HUGEPAGE 1 1093 #else 1094 #define IS_HUGEPAGE 0 1095 #endif 1096 if (mon_app_exists) { 1097 /* initialize event callback */ 1098 #ifdef NEWEV 1099 InitEvent(mtcp); 1100 #else 1101 InitEvent(mtcp, NUM_EV_TABLE); 1102 #endif 1103 } 1104 1105 if (!(mtcp->bufseg_pool = MPCreate(sizeof(tcpbufseg_t), 1106 sizeof(tcpbufseg_t) * g_config.mos->max_concurrency * 1107 ((g_config.mos->rmem_size - 1) / UNITBUFSIZE + 1), 0))) { 1108 TRACE_ERROR("Failed to allocate ev_table pool\n"); 1109 exit(0); 1110 } 1111 if (!(mtcp->sockent_pool = MPCreate(sizeof(struct sockent), 1112 sizeof(struct sockent) * g_config.mos->max_concurrency * 3, 0))) { 1113 TRACE_ERROR("Failed to allocate ev_table pool\n"); 1114 exit(0); 1115 } 1116 #ifdef USE_TIMER_POOL 1117 if (!(mtcp->timer_pool = MPCreate(sizeof(struct timer), 1118 sizeof(struct timer) * g_config.mos->max_concurrency * 10, 0))) { 1119 TRACE_ERROR("Failed to allocate ev_table pool\n"); 1120 exit(0); 1121 } 1122 #endif 1123 mtcp->flow_pool = MPCreate(sizeof(tcp_stream), 1124 sizeof(tcp_stream) * g_config.mos->max_concurrency, IS_HUGEPAGE); 1125 if (!mtcp->flow_pool) { 1126 CTRACE_ERROR("Failed to allocate tcp flow pool.\n"); 1127 return NULL; 1128 } 1129 mtcp->rv_pool = MPCreate(sizeof(struct tcp_recv_vars), 1130 sizeof(struct tcp_recv_vars) * g_config.mos->max_concurrency, IS_HUGEPAGE); 1131 if (!mtcp->rv_pool) { 1132 CTRACE_ERROR("Failed to allocate tcp recv variable pool.\n"); 1133 return NULL; 1134 } 1135 mtcp->sv_pool = MPCreate(sizeof(struct tcp_send_vars), 1136 sizeof(struct tcp_send_vars) * g_config.mos->max_concurrency, IS_HUGEPAGE); 1137 if (!mtcp->sv_pool) { 1138 CTRACE_ERROR("Failed to allocate tcp send variable pool.\n"); 1139 return NULL; 1140 } 1141 1142 mtcp->rbm_snd = SBManagerCreate(g_config.mos->wmem_size, g_config.mos->no_ring_buffers, 1143 g_config.mos->max_concurrency); 1144 if (!mtcp->rbm_snd) { 1145 CTRACE_ERROR("Failed to create send ring buffer.\n"); 1146 return NULL; 1147 } 1148 1149 mtcp->smap = (socket_map_t)calloc(g_config.mos->max_concurrency, sizeof(struct socket_map)); 1150 if (!mtcp->smap) { 1151 perror("calloc"); 1152 CTRACE_ERROR("Failed to allocate memory for stream map.\n"); 1153 return NULL; 1154 } 1155 1156 if (mon_app_exists) { 1157 mtcp->msmap = (socket_map_t)calloc(g_config.mos->max_concurrency, sizeof(struct socket_map)); 1158 if (!mtcp->msmap) { 1159 perror("calloc"); 1160 CTRACE_ERROR("Failed to allocate memory for monitor stream map.\n"); 1161 return NULL; 1162 } 1163 1164 for (i = 0; i < g_config.mos->max_concurrency; i++) { 1165 mtcp->msmap[i].monitor_stream = calloc(1, sizeof(struct mon_stream)); 1166 if (!mtcp->msmap[i].monitor_stream) { 1167 perror("calloc"); 1168 CTRACE_ERROR("Failed to allocate memory for monitr stream map\n"); 1169 return NULL; 1170 } 1171 } 1172 } 1173 1174 TAILQ_INIT(&mtcp->timer_list); 1175 TAILQ_INIT(&mtcp->monitors); 1176 1177 TAILQ_INIT(&mtcp->free_smap); 1178 for (i = 0; i < g_config.mos->max_concurrency; i++) { 1179 mtcp->smap[i].id = i; 1180 mtcp->smap[i].socktype = MOS_SOCK_UNUSED; 1181 memset(&mtcp->smap[i].saddr, 0, sizeof(struct sockaddr_in)); 1182 mtcp->smap[i].stream = NULL; 1183 TAILQ_INSERT_TAIL(&mtcp->free_smap, &mtcp->smap[i], link); 1184 } 1185 1186 if (mon_app_exists) { 1187 TAILQ_INIT(&mtcp->free_msmap); 1188 for (i = 0; i < g_config.mos->max_concurrency; i++) { 1189 mtcp->msmap[i].id = i; 1190 mtcp->msmap[i].socktype = MOS_SOCK_UNUSED; 1191 memset(&mtcp->msmap[i].saddr, 0, sizeof(struct sockaddr_in)); 1192 TAILQ_INSERT_TAIL(&mtcp->free_msmap, &mtcp->msmap[i], link); 1193 } 1194 } 1195 1196 mtcp->ctx = ctx; 1197 mtcp->ep = NULL; 1198 1199 snprintf(log_name, MAX_FILE_NAME, "%s/"LOG_FILE_NAME"_%d", 1200 g_config.mos->mos_log, ctx->cpu); 1201 mtcp->log_fp = fopen(log_name, "w+"); 1202 if (!mtcp->log_fp) { 1203 perror("fopen"); 1204 CTRACE_ERROR("Failed to create file for logging. (%s)\n", log_name); 1205 return NULL; 1206 } 1207 mtcp->sp_fd = g_logctx[ctx->cpu]->pair_sp_fd; 1208 mtcp->logger = g_logctx[ctx->cpu]; 1209 1210 mtcp->connectq = CreateStreamQueue(BACKLOG_SIZE); 1211 if (!mtcp->connectq) { 1212 CTRACE_ERROR("Failed to create connect queue.\n"); 1213 return NULL; 1214 } 1215 mtcp->sendq = CreateStreamQueue(g_config.mos->max_concurrency); 1216 if (!mtcp->sendq) { 1217 CTRACE_ERROR("Failed to create send queue.\n"); 1218 return NULL; 1219 } 1220 mtcp->ackq = CreateStreamQueue(g_config.mos->max_concurrency); 1221 if (!mtcp->ackq) { 1222 CTRACE_ERROR("Failed to create ack queue.\n"); 1223 return NULL; 1224 } 1225 mtcp->closeq = CreateStreamQueue(g_config.mos->max_concurrency); 1226 if (!mtcp->closeq) { 1227 CTRACE_ERROR("Failed to create close queue.\n"); 1228 return NULL; 1229 } 1230 mtcp->closeq_int = CreateInternalStreamQueue(g_config.mos->max_concurrency); 1231 if (!mtcp->closeq_int) { 1232 CTRACE_ERROR("Failed to create close queue.\n"); 1233 return NULL; 1234 } 1235 mtcp->resetq = CreateStreamQueue(g_config.mos->max_concurrency); 1236 if (!mtcp->resetq) { 1237 CTRACE_ERROR("Failed to create reset queue.\n"); 1238 return NULL; 1239 } 1240 mtcp->resetq_int = CreateInternalStreamQueue(g_config.mos->max_concurrency); 1241 if (!mtcp->resetq_int) { 1242 CTRACE_ERROR("Failed to create reset queue.\n"); 1243 return NULL; 1244 } 1245 mtcp->destroyq = CreateStreamQueue(g_config.mos->max_concurrency); 1246 if (!mtcp->destroyq) { 1247 CTRACE_ERROR("Failed to create destroy queue.\n"); 1248 return NULL; 1249 } 1250 1251 mtcp->g_sender = CreateMTCPSender(-1); 1252 if (!mtcp->g_sender) { 1253 CTRACE_ERROR("Failed to create global sender structure.\n"); 1254 return NULL; 1255 } 1256 for (i = 0; i < g_config.mos->netdev_table->num; i++) { 1257 mtcp->n_sender[i] = CreateMTCPSender(i); 1258 if (!mtcp->n_sender[i]) { 1259 CTRACE_ERROR("Failed to create per-nic sender structure.\n"); 1260 return NULL; 1261 } 1262 } 1263 1264 mtcp->rto_store = InitRTOHashstore(); 1265 TAILQ_INIT(&mtcp->timewait_list); 1266 TAILQ_INIT(&mtcp->timeout_list); 1267 1268 return mtcp; 1269 } 1270 /*----------------------------------------------------------------------------*/ 1271 static void * 1272 MTCPRunThread(void *arg) 1273 { 1274 mctx_t mctx = (mctx_t)arg; 1275 int cpu = mctx->cpu; 1276 int working; 1277 struct mtcp_manager *mtcp; 1278 struct mtcp_thread_context *ctx; 1279 1280 /* affinitize the thread to this core first */ 1281 mtcp_core_affinitize(cpu); 1282 1283 /* memory alloc after core affinitization would use local memory 1284 most time */ 1285 ctx = calloc(1, sizeof(*ctx)); 1286 if (!ctx) { 1287 perror("calloc"); 1288 TRACE_ERROR("Failed to calloc mtcp context.\n"); 1289 exit(-1); 1290 } 1291 ctx->thread = pthread_self(); 1292 ctx->cpu = cpu; 1293 mtcp = ctx->mtcp_manager = InitializeMTCPManager(ctx); 1294 if (!mtcp) { 1295 TRACE_ERROR("Failed to initialize mtcp manager.\n"); 1296 exit(-1); 1297 } 1298 1299 /* assign mtcp context's underlying I/O module */ 1300 mtcp->iom = current_iomodule_func; 1301 1302 /* I/O initializing */ 1303 if (mtcp->iom->init_handle) 1304 mtcp->iom->init_handle(ctx); 1305 1306 if (pthread_mutex_init(&ctx->flow_pool_lock, NULL)) { 1307 perror("pthread_mutex_init of ctx->flow_pool_lock\n"); 1308 exit(-1); 1309 } 1310 1311 if (pthread_mutex_init(&ctx->socket_pool_lock, NULL)) { 1312 perror("pthread_mutex_init of ctx->socket_pool_lock\n"); 1313 exit(-1); 1314 } 1315 1316 SQ_LOCK_INIT(&ctx->connect_lock, "ctx->connect_lock", exit(-1)); 1317 SQ_LOCK_INIT(&ctx->close_lock, "ctx->close_lock", exit(-1)); 1318 SQ_LOCK_INIT(&ctx->reset_lock, "ctx->reset_lock", exit(-1)); 1319 SQ_LOCK_INIT(&ctx->sendq_lock, "ctx->sendq_lock", exit(-1)); 1320 SQ_LOCK_INIT(&ctx->ackq_lock, "ctx->ackq_lock", exit(-1)); 1321 SQ_LOCK_INIT(&ctx->destroyq_lock, "ctx->destroyq_lock", exit(-1)); 1322 1323 /* remember this context pointer for signal processing */ 1324 g_pctx[cpu] = ctx; 1325 mlockall(MCL_CURRENT); 1326 1327 // attach (nic device, queue) 1328 working = AttachDevice(ctx); 1329 if (working != 0) { 1330 sem_post(&g_init_sem[ctx->cpu]); 1331 TRACE_DBG("MTCP thread %d finished. Not attached any device\n", ctx->cpu); 1332 pthread_exit(NULL); 1333 } 1334 1335 TRACE_DBG("CPU %d: initialization finished.\n", cpu); 1336 sem_post(&g_init_sem[ctx->cpu]); 1337 1338 /* start the main loop */ 1339 RunMainLoop(ctx); 1340 1341 TRACE_DBG("MTCP thread %d finished.\n", ctx->cpu); 1342 1343 /* signaling mTCP thread is done */ 1344 sem_post(&g_done_sem[mctx->cpu]); 1345 1346 //pthread_exit(NULL); 1347 return 0; 1348 } 1349 /*----------------------------------------------------------------------------*/ 1350 #ifdef ENABLE_DPDK 1351 static int MTCPDPDKRunThread(void *arg) 1352 { 1353 MTCPRunThread(arg); 1354 return 0; 1355 } 1356 #endif /* !ENABLE_DPDK */ 1357 /*----------------------------------------------------------------------------*/ 1358 mctx_t 1359 mtcp_create_context(int cpu) 1360 { 1361 mctx_t mctx; 1362 int ret; 1363 1364 if (cpu >= g_config.mos->num_cores) { 1365 TRACE_ERROR("Failed initialize new mtcp context. " 1366 "Requested cpu id %d exceed the number of cores %d configured to use.\n", 1367 cpu, g_config.mos->num_cores); 1368 return NULL; 1369 } 1370 1371 /* check if mtcp_create_context() was already initialized */ 1372 if (g_logctx[cpu] != NULL) { 1373 TRACE_ERROR("%s was already initialized before!\n", 1374 __FUNCTION__); 1375 return NULL; 1376 } 1377 1378 ret = sem_init(&g_init_sem[cpu], 0, 0); 1379 if (ret) { 1380 TRACE_ERROR("Failed initialize init_sem.\n"); 1381 return NULL; 1382 } 1383 1384 ret = sem_init(&g_done_sem[cpu], 0, 0); 1385 if (ret) { 1386 TRACE_ERROR("Failed initialize done_sem.\n"); 1387 return NULL; 1388 } 1389 1390 mctx = (mctx_t)calloc(1, sizeof(struct mtcp_context)); 1391 if (!mctx) { 1392 TRACE_ERROR("Failed to allocate memory for mtcp_context.\n"); 1393 return NULL; 1394 } 1395 mctx->cpu = cpu; 1396 g_ctx[cpu] = mctx; 1397 1398 /* initialize logger */ 1399 g_logctx[cpu] = (struct log_thread_context *) 1400 calloc(1, sizeof(struct log_thread_context)); 1401 if (!g_logctx[cpu]) { 1402 perror("malloc"); 1403 TRACE_ERROR("Failed to allocate memory for log thread context.\n"); 1404 return NULL; 1405 } 1406 InitLogThreadContext(g_logctx[cpu], cpu); 1407 if (pthread_create(&log_thread[cpu], 1408 NULL, ThreadLogMain, (void *)g_logctx[cpu])) { 1409 perror("pthread_create"); 1410 TRACE_ERROR("Failed to create log thread\n"); 1411 return NULL; 1412 } 1413 1414 #ifdef ENABLE_DPDK 1415 /* Wake up mTCP threads (wake up I/O threads) */ 1416 if (current_iomodule_func == &dpdk_module_func) { 1417 int master; 1418 master = rte_get_master_lcore(); 1419 if (master == cpu) { 1420 lcore_config[master].ret = 0; 1421 lcore_config[master].state = FINISHED; 1422 if (pthread_create(&g_thread[cpu], 1423 NULL, MTCPRunThread, (void *)mctx) != 0) { 1424 TRACE_ERROR("pthread_create of mtcp thread failed!\n"); 1425 return NULL; 1426 } 1427 } else 1428 rte_eal_remote_launch(MTCPDPDKRunThread, mctx, cpu); 1429 } else 1430 #endif /* !ENABLE_DPDK */ 1431 { 1432 if (pthread_create(&g_thread[cpu], 1433 NULL, MTCPRunThread, (void *)mctx) != 0) { 1434 TRACE_ERROR("pthread_create of mtcp thread failed!\n"); 1435 return NULL; 1436 } 1437 } 1438 1439 sem_wait(&g_init_sem[cpu]); 1440 sem_destroy(&g_init_sem[cpu]); 1441 1442 running[cpu] = TRUE; 1443 1444 #ifdef NETSTAT 1445 #if NETSTAT_TOTAL 1446 if (printer < 0) { 1447 printer = cpu; 1448 TRACE_INFO("CPU %d is in charge of printing stats.\n", printer); 1449 } 1450 #endif 1451 #endif 1452 1453 return mctx; 1454 } 1455 /*----------------------------------------------------------------------------*/ 1456 /** 1457 * TODO: It currently always returns 0. Add appropriate error return values 1458 */ 1459 int 1460 mtcp_destroy_context(mctx_t mctx) 1461 { 1462 struct mtcp_thread_context *ctx = g_pctx[mctx->cpu]; 1463 struct mtcp_manager *mtcp = ctx->mtcp_manager; 1464 struct log_thread_context *log_ctx = mtcp->logger; 1465 int ret, i; 1466 1467 TRACE_DBG("CPU %d: mtcp_destroy_context()\n", mctx->cpu); 1468 1469 /* close all stream sockets that are still open */ 1470 if (!ctx->exit) { 1471 for (i = 0; i < g_config.mos->max_concurrency; i++) { 1472 if (mtcp->smap[i].socktype == MOS_SOCK_STREAM) { 1473 TRACE_DBG("Closing remaining socket %d (%s)\n", 1474 i, TCPStateToString(mtcp->smap[i].stream)); 1475 #ifdef DUMP_STREAM 1476 DumpStream(mtcp, mtcp->smap[i].stream); 1477 #endif 1478 mtcp_close(mctx, i); 1479 } 1480 } 1481 } 1482 1483 ctx->done = 1; 1484 1485 //pthread_kill(g_thread[mctx->cpu], SIGINT); 1486 #ifdef ENABLE_DPDK 1487 ctx->exit = 1; 1488 /* XXX - dpdk logic changes */ 1489 if (current_iomodule_func == &dpdk_module_func) { 1490 int master = rte_get_master_lcore(); 1491 if (master == mctx->cpu) 1492 pthread_join(g_thread[mctx->cpu], NULL); 1493 else 1494 rte_eal_wait_lcore(mctx->cpu); 1495 } else 1496 #endif /* !ENABLE_DPDK */ 1497 { 1498 pthread_join(g_thread[mctx->cpu], NULL); 1499 } 1500 1501 TRACE_INFO("MTCP thread %d joined.\n", mctx->cpu); 1502 running[mctx->cpu] = FALSE; 1503 1504 #ifdef NETSTAT 1505 #if NETSTAT_TOTAL 1506 if (printer == mctx->cpu) { 1507 for (i = 0; i < num_cpus; i++) { 1508 if (i != mctx->cpu && running[i]) { 1509 printer = i; 1510 break; 1511 } 1512 } 1513 } 1514 #endif 1515 #endif 1516 1517 log_ctx->done = 1; 1518 ret = write(log_ctx->pair_sp_fd, "F", 1); 1519 if (ret != 1) 1520 TRACE_ERROR("CPU %d: Fail to signal socket pair\n", mctx->cpu); 1521 1522 pthread_join(log_thread[ctx->cpu], NULL); 1523 fclose(mtcp->log_fp); 1524 TRACE_LOG("Log thread %d joined.\n", mctx->cpu); 1525 1526 if (mtcp->connectq) { 1527 DestroyStreamQueue(mtcp->connectq); 1528 mtcp->connectq = NULL; 1529 } 1530 if (mtcp->sendq) { 1531 DestroyStreamQueue(mtcp->sendq); 1532 mtcp->sendq = NULL; 1533 } 1534 if (mtcp->ackq) { 1535 DestroyStreamQueue(mtcp->ackq); 1536 mtcp->ackq = NULL; 1537 } 1538 if (mtcp->closeq) { 1539 DestroyStreamQueue(mtcp->closeq); 1540 mtcp->closeq = NULL; 1541 } 1542 if (mtcp->closeq_int) { 1543 DestroyInternalStreamQueue(mtcp->closeq_int); 1544 mtcp->closeq_int = NULL; 1545 } 1546 if (mtcp->resetq) { 1547 DestroyStreamQueue(mtcp->resetq); 1548 mtcp->resetq = NULL; 1549 } 1550 if (mtcp->resetq_int) { 1551 DestroyInternalStreamQueue(mtcp->resetq_int); 1552 mtcp->resetq_int = NULL; 1553 } 1554 if (mtcp->destroyq) { 1555 DestroyStreamQueue(mtcp->destroyq); 1556 mtcp->destroyq = NULL; 1557 } 1558 1559 DestroyMTCPSender(mtcp->g_sender); 1560 for (i = 0; i < g_config.mos->netdev_table->num; i++) { 1561 DestroyMTCPSender(mtcp->n_sender[i]); 1562 } 1563 1564 MPDestroy(mtcp->rv_pool); 1565 MPDestroy(mtcp->sv_pool); 1566 MPDestroy(mtcp->flow_pool); 1567 1568 if (mtcp->ap) { 1569 DestroyAddressPool(mtcp->ap); 1570 } 1571 1572 SQ_LOCK_DESTROY(&ctx->connect_lock); 1573 SQ_LOCK_DESTROY(&ctx->close_lock); 1574 SQ_LOCK_DESTROY(&ctx->reset_lock); 1575 SQ_LOCK_DESTROY(&ctx->sendq_lock); 1576 SQ_LOCK_DESTROY(&ctx->ackq_lock); 1577 SQ_LOCK_DESTROY(&ctx->destroyq_lock); 1578 1579 //TRACE_INFO("MTCP thread %d destroyed.\n", mctx->cpu); 1580 if (mtcp->iom->destroy_handle) 1581 mtcp->iom->destroy_handle(ctx); 1582 free(ctx); 1583 free(mctx); 1584 1585 return 0; 1586 } 1587 /*----------------------------------------------------------------------------*/ 1588 mtcp_sighandler_t 1589 mtcp_register_signal(int signum, mtcp_sighandler_t handler) 1590 { 1591 mtcp_sighandler_t prev; 1592 1593 if (signum == SIGINT) { 1594 prev = app_signal_handler; 1595 app_signal_handler = handler; 1596 } else { 1597 if ((prev = signal(signum, handler)) == SIG_ERR) { 1598 perror("signal"); 1599 return SIG_ERR; 1600 } 1601 } 1602 1603 return prev; 1604 } 1605 /*----------------------------------------------------------------------------*/ 1606 int 1607 mtcp_getconf(struct mtcp_conf *conf) 1608 { 1609 int i, j; 1610 1611 if (!conf) { 1612 errno = EINVAL; 1613 return -1; 1614 } 1615 1616 conf->num_cores = g_config.mos->num_cores; 1617 conf->max_concurrency = g_config.mos->max_concurrency; 1618 conf->cpu_mask = g_config.mos->cpu_mask; 1619 1620 conf->rcvbuf_size = g_config.mos->rmem_size; 1621 conf->sndbuf_size = g_config.mos->wmem_size; 1622 1623 conf->tcp_timewait = g_config.mos->tcp_tw_interval; 1624 conf->tcp_timeout = g_config.mos->tcp_timeout; 1625 1626 i = 0; 1627 struct conf_block *bwalk; 1628 TAILQ_FOREACH(bwalk, &g_config.app_blkh, link) { 1629 struct app_conf *app_conf = (struct app_conf *)bwalk->conf; 1630 for (j = 0; j < app_conf->app_argc; j++) 1631 conf->app_argv[i][j] = app_conf->app_argv[j]; 1632 conf->app_argc[i] = app_conf->app_argc; 1633 conf->app_cpu_mask[i] = app_conf->cpu_mask; 1634 i++; 1635 } 1636 conf->num_app = i; 1637 1638 return 0; 1639 } 1640 /*----------------------------------------------------------------------------*/ 1641 int 1642 mtcp_setconf(const struct mtcp_conf *conf) 1643 { 1644 if (!conf) 1645 return -1; 1646 1647 g_config.mos->num_cores = conf->num_cores; 1648 g_config.mos->max_concurrency = conf->max_concurrency; 1649 1650 g_config.mos->rmem_size = conf->rcvbuf_size; 1651 g_config.mos->wmem_size = conf->sndbuf_size; 1652 1653 g_config.mos->tcp_tw_interval = conf->tcp_timewait; 1654 g_config.mos->tcp_timeout = conf->tcp_timeout; 1655 1656 TRACE_CONFIG("Configuration updated by mtcp_setconf().\n"); 1657 //PrintConfiguration(); 1658 1659 return 0; 1660 } 1661 /*----------------------------------------------------------------------------*/ 1662 int 1663 mtcp_init(const char *config_file) 1664 { 1665 int i; 1666 int ret; 1667 1668 if (geteuid()) { 1669 TRACE_CONFIG("[CAUTION] Run as root if mlock is necessary.\n"); 1670 #if defined(ENABLE_DPDK) || defined(ENABLE_DPDKR) || defined(ENABLE_NETMAP) 1671 TRACE_CONFIG("[CAUTION] Run the app as root!\n"); 1672 exit(EXIT_FAILURE); 1673 #endif 1674 } 1675 1676 /* getting cpu and NIC */ 1677 num_cpus = GetNumCPUs(); 1678 assert(num_cpus >= 1); 1679 for (i = 0; i < num_cpus; i++) { 1680 g_mtcp[i] = NULL; 1681 running[i] = FALSE; 1682 sigint_cnt[i] = 0; 1683 } 1684 1685 ret = LoadConfigurationUpperHalf(config_file); 1686 if (ret) { 1687 TRACE_CONFIG("Error occured while loading configuration.\n"); 1688 return -1; 1689 } 1690 1691 #if defined(ENABLE_PSIO) 1692 current_iomodule_func = &ps_module_func; 1693 #elif defined(ENABLE_DPDK) 1694 current_iomodule_func = &dpdk_module_func; 1695 #elif defined(ENABLE_PCAP) 1696 current_iomodule_func = &pcap_module_func; 1697 #elif defined(ENABLE_DPDKR) 1698 current_iomodule_func = &dpdkr_module_func; 1699 #elif defined(ENABLE_NETMAP) 1700 current_iomodule_func = &netmap_module_func; 1701 #endif 1702 1703 if (current_iomodule_func->load_module_upper_half) 1704 current_iomodule_func->load_module_upper_half(); 1705 1706 LoadConfigurationLowerHalf(); 1707 1708 //PrintConfiguration(); 1709 1710 for (i = 0; i < g_config.mos->netdev_table->num; i++) { 1711 ap[i] = CreateAddressPool(g_config.mos->netdev_table->ent[i]->ip_addr, 1); 1712 if (!ap[i]) { 1713 TRACE_CONFIG("Error occured while create address pool[%d]\n", 1714 i); 1715 return -1; 1716 } 1717 } 1718 1719 //PrintInterfaceInfo(); 1720 //PrintRoutingTable(); 1721 //PrintARPTable(); 1722 InitARPTable(); 1723 1724 if (signal(SIGUSR1, HandleSignal) == SIG_ERR) { 1725 perror("signal, SIGUSR1"); 1726 return -1; 1727 } 1728 if (signal(SIGINT, HandleSignal) == SIG_ERR) { 1729 perror("signal, SIGINT"); 1730 return -1; 1731 } 1732 app_signal_handler = NULL; 1733 1734 printf("load_module(): %p\n", current_iomodule_func); 1735 /* load system-wide io module specs */ 1736 if (current_iomodule_func->load_module_lower_half) 1737 current_iomodule_func->load_module_lower_half(); 1738 1739 GlobInitEvent(); 1740 1741 PrintConf(&g_config); 1742 1743 return 0; 1744 } 1745 /*----------------------------------------------------------------------------*/ 1746 int 1747 mtcp_destroy() 1748 { 1749 int i; 1750 1751 /* wait until all threads are closed */ 1752 for (i = 0; i < num_cpus; i++) { 1753 if (running[i]) { 1754 if (pthread_join(g_thread[i], NULL) != 0) 1755 return -1; 1756 } 1757 } 1758 1759 for (i = 0; i < g_config.mos->netdev_table->num; i++) 1760 DestroyAddressPool(ap[i]); 1761 1762 TRACE_INFO("All MTCP threads are joined.\n"); 1763 1764 return 0; 1765 } 1766 /*----------------------------------------------------------------------------*/ 1767