1 #define _GNU_SOURCE 2 #include <sched.h> 3 #include <unistd.h> 4 #include <sys/time.h> 5 #include <semaphore.h> 6 #include <sys/mman.h> 7 #include <signal.h> 8 #include <assert.h> 9 #include <string.h> 10 11 #include "cpu.h" 12 #include "eth_in.h" 13 #include "fhash.h" 14 #include "tcp_send_buffer.h" 15 #include "tcp_ring_buffer.h" 16 #include "socket.h" 17 #include "eth_out.h" 18 #include "tcp.h" 19 #include "tcp_in.h" 20 #include "tcp_out.h" 21 #include "mtcp_api.h" 22 #include "eventpoll.h" 23 #include "logger.h" 24 #include "config.h" 25 #include "arp.h" 26 #include "ip_out.h" 27 #include "timer.h" 28 #include "debug.h" 29 #include "event_callback.h" 30 #include "tcp_rb.h" 31 #include "tcp_stream.h" 32 #include "io_module.h" 33 34 #ifdef ENABLE_PSIO 35 #include "ps.h" 36 #endif 37 38 #ifdef ENABLE_DPDK 39 /* for launching rte thread */ 40 #include <rte_launch.h> 41 #include <rte_lcore.h> 42 #endif /* !ENABLE_DPDK */ 43 #define PS_CHUNK_SIZE 64 44 #define RX_THRESH (PS_CHUNK_SIZE * 0.8) 45 46 #define ROUND_STAT FALSE 47 #define TIME_STAT FALSE 48 #define EVENT_STAT FALSE 49 #define TESTING FALSE 50 51 #define LOG_FILE_NAME "log" 52 #define MAX_FILE_NAME 1024 53 54 #define MAX(a, b) ((a)>(b)?(a):(b)) 55 #define MIN(a, b) ((a)<(b)?(a):(b)) 56 57 #define PER_STREAM_SLICE 0.1 // in ms 58 #define PER_STREAM_TCHECK 1 // in ms 59 #define PS_SELECT_TIMEOUT 100 // in us 60 61 #define GBPS(bytes) (bytes * 8.0 / (1000 * 1000 * 1000)) 62 63 /*----------------------------------------------------------------------------*/ 64 /* handlers for threads */ 65 struct mtcp_thread_context *g_pctx[MAX_CPUS] = {0}; 66 struct log_thread_context *g_logctx[MAX_CPUS] = {0}; 67 /*----------------------------------------------------------------------------*/ 68 static pthread_t g_thread[MAX_CPUS] = {0}; 69 static pthread_t log_thread[MAX_CPUS] = {0}; 70 /*----------------------------------------------------------------------------*/ 71 static sem_t g_init_sem[MAX_CPUS]; 72 static sem_t g_done_sem[MAX_CPUS]; 73 static int running[MAX_CPUS] = {0}; 74 /*----------------------------------------------------------------------------*/ 75 mtcp_sighandler_t app_signal_handler; 76 static int sigint_cnt[MAX_CPUS] = {0}; 77 static struct timespec sigint_ts[MAX_CPUS]; 78 /*----------------------------------------------------------------------------*/ 79 #ifdef NETSTAT 80 #if NETSTAT_TOTAL 81 static int printer = -1; 82 #if ROUND_STAT 83 #endif /* ROUND_STAT */ 84 #endif /* NETSTAT_TOTAL */ 85 #endif /* NETSTAT */ 86 /*----------------------------------------------------------------------------*/ 87 void 88 HandleSignal(int signal) 89 { 90 int i = 0; 91 92 if (signal == SIGINT) { 93 #ifdef DARWIN 94 int core = 0; 95 #else 96 int core = sched_getcpu(); 97 #endif 98 struct timespec cur_ts; 99 100 clock_gettime(CLOCK_REALTIME, &cur_ts); 101 102 if (sigint_cnt[core] > 0 && cur_ts.tv_sec == sigint_ts[core].tv_sec) { 103 for (i = 0; i < g_config.mos->num_cores; i++) { 104 if (running[i]) { 105 exit(0); 106 g_pctx[i]->exit = TRUE; 107 } 108 } 109 } else { 110 for (i = 0; i < g_config.mos->num_cores; i++) { 111 if (g_pctx[i]) 112 g_pctx[i]->interrupt = TRUE; 113 } 114 if (!app_signal_handler) { 115 for (i = 0; i < g_config.mos->num_cores; i++) { 116 if (running[i]) { 117 exit(0); 118 g_pctx[i]->exit = TRUE; 119 } 120 } 121 } 122 } 123 sigint_cnt[core]++; 124 clock_gettime(CLOCK_REALTIME, &sigint_ts[core]); 125 } 126 127 if (signal != SIGUSR1) { 128 if (app_signal_handler) { 129 app_signal_handler(signal); 130 } 131 } 132 } 133 /*----------------------------------------------------------------------------*/ 134 static int 135 AttachDevice(struct mtcp_thread_context* ctx) 136 { 137 int working = -1; 138 mtcp_manager_t mtcp = ctx->mtcp_manager; 139 140 if (mtcp->iom->link_devices) 141 working = mtcp->iom->link_devices(ctx); 142 else 143 return 0; 144 145 return working; 146 } 147 /*----------------------------------------------------------------------------*/ 148 #ifdef TIMESTAT 149 static inline void 150 InitStatCounter(struct stat_counter *counter) 151 { 152 counter->cnt = 0; 153 counter->sum = 0; 154 counter->max = 0; 155 counter->min = 0; 156 } 157 /*----------------------------------------------------------------------------*/ 158 static inline void 159 UpdateStatCounter(struct stat_counter *counter, int64_t value) 160 { 161 counter->cnt++; 162 counter->sum += value; 163 if (value > counter->max) 164 counter->max = value; 165 if (counter->min == 0 || value < counter->min) 166 counter->min = value; 167 } 168 /*----------------------------------------------------------------------------*/ 169 static inline uint64_t 170 GetAverageStat(struct stat_counter *counter) 171 { 172 return counter->cnt ? (counter->sum / counter->cnt) : 0; 173 } 174 /*----------------------------------------------------------------------------*/ 175 static inline int64_t 176 TimeDiffUs(struct timeval *t2, struct timeval *t1) 177 { 178 return (t2->tv_sec - t1->tv_sec) * 1000000 + 179 (int64_t)(t2->tv_usec - t1->tv_usec); 180 } 181 /*----------------------------------------------------------------------------*/ 182 #endif 183 #ifdef NETSTAT 184 static inline void 185 PrintThreadNetworkStats(mtcp_manager_t mtcp, struct net_stat *ns) 186 { 187 int i; 188 189 for (i = 0; i < g_config.mos->netdev_table->num; i++) { 190 ns->rx_packets[i] = mtcp->nstat.rx_packets[i] - mtcp->p_nstat.rx_packets[i]; 191 ns->rx_errors[i] = mtcp->nstat.rx_errors[i] - mtcp->p_nstat.rx_errors[i]; 192 ns->rx_bytes[i] = mtcp->nstat.rx_bytes[i] - mtcp->p_nstat.rx_bytes[i]; 193 ns->tx_packets[i] = mtcp->nstat.tx_packets[i] - mtcp->p_nstat.tx_packets[i]; 194 ns->tx_drops[i] = mtcp->nstat.tx_drops[i] - mtcp->p_nstat.tx_drops[i]; 195 ns->tx_bytes[i] = mtcp->nstat.tx_bytes[i] - mtcp->p_nstat.tx_bytes[i]; 196 #if NETSTAT_PERTHREAD 197 if (g_config.mos->netdev_table->ent[i]->stat_print) { 198 fprintf(stderr, "[CPU%2d] %s flows: %6u, " 199 "RX: %7llu(pps) (err: %5llu), %5.2lf(Gbps), " 200 "TX: %7llu(pps), %5.2lf(Gbps)\n", 201 mtcp->ctx->cpu, 202 g_config.mos->netdev_table->ent[i]->dev_name, 203 (unsigned)mtcp->flow_cnt, 204 (long long unsigned)ns->rx_packets[i], 205 (long long unsigned)ns->rx_errors[i], 206 GBPS(ns->rx_bytes[i]), 207 (long long unsigned)ns->tx_packets[i], 208 GBPS(ns->tx_bytes[i])); 209 } 210 #endif 211 } 212 mtcp->p_nstat = mtcp->nstat; 213 214 } 215 /*----------------------------------------------------------------------------*/ 216 #if ROUND_STAT 217 static inline void 218 PrintThreadRoundStats(mtcp_manager_t mtcp, struct run_stat *rs) 219 { 220 #define ROUND_DIV (1000) 221 rs->rounds = mtcp->runstat.rounds - mtcp->p_runstat.rounds; 222 rs->rounds_rx = mtcp->runstat.rounds_rx - mtcp->p_runstat.rounds_rx; 223 rs->rounds_rx_try = mtcp->runstat.rounds_rx_try - mtcp->p_runstat.rounds_rx_try; 224 rs->rounds_tx = mtcp->runstat.rounds_tx - mtcp->p_runstat.rounds_tx; 225 rs->rounds_tx_try = mtcp->runstat.rounds_tx_try - mtcp->p_runstat.rounds_tx_try; 226 rs->rounds_select = mtcp->runstat.rounds_select - mtcp->p_runstat.rounds_select; 227 rs->rounds_select_rx = mtcp->runstat.rounds_select_rx - mtcp->p_runstat.rounds_select_rx; 228 rs->rounds_select_tx = mtcp->runstat.rounds_select_tx - mtcp->p_runstat.rounds_select_tx; 229 rs->rounds_select_intr = mtcp->runstat.rounds_select_intr - mtcp->p_runstat.rounds_select_intr; 230 rs->rounds_twcheck = mtcp->runstat.rounds_twcheck - mtcp->p_runstat.rounds_twcheck; 231 mtcp->p_runstat = mtcp->runstat; 232 #if NETSTAT_PERTHREAD 233 fprintf(stderr, "[CPU%2d] Rounds: %4lluK, " 234 "rx: %3lluK (try: %4lluK), tx: %3lluK (try: %4lluK), " 235 "ps_select: %4llu (rx: %4llu, tx: %4llu, intr: %3llu)\n", 236 mtcp->ctx->cpu, rs->rounds / ROUND_DIV, 237 rs->rounds_rx / ROUND_DIV, rs->rounds_rx_try / ROUND_DIV, 238 rs->rounds_tx / ROUND_DIV, rs->rounds_tx_try / ROUND_DIV, 239 rs->rounds_select, 240 rs->rounds_select_rx, rs->rounds_select_tx, rs->rounds_select_intr); 241 #endif 242 } 243 #endif /* ROUND_STAT */ 244 /*----------------------------------------------------------------------------*/ 245 #if TIME_STAT 246 static inline void 247 PrintThreadRoundTime(mtcp_manager_t mtcp) 248 { 249 fprintf(stderr, "[CPU%2d] Time: (avg, max) " 250 "round: (%4luus, %4luus), processing: (%4luus, %4luus), " 251 "tcheck: (%4luus, %4luus), epoll: (%4luus, %4luus), " 252 "handle: (%4luus, %4luus), xmit: (%4luus, %4luus), " 253 "select: (%4luus, %4luus)\n", mtcp->ctx->cpu, 254 GetAverageStat(&mtcp->rtstat.round), mtcp->rtstat.round.max, 255 GetAverageStat(&mtcp->rtstat.processing), mtcp->rtstat.processing.max, 256 GetAverageStat(&mtcp->rtstat.tcheck), mtcp->rtstat.tcheck.max, 257 GetAverageStat(&mtcp->rtstat.epoll), mtcp->rtstat.epoll.max, 258 GetAverageStat(&mtcp->rtstat.handle), mtcp->rtstat.handle.max, 259 GetAverageStat(&mtcp->rtstat.xmit), mtcp->rtstat.xmit.max, 260 GetAverageStat(&mtcp->rtstat.select), mtcp->rtstat.select.max); 261 262 InitStatCounter(&mtcp->rtstat.round); 263 InitStatCounter(&mtcp->rtstat.processing); 264 InitStatCounter(&mtcp->rtstat.tcheck); 265 InitStatCounter(&mtcp->rtstat.epoll); 266 InitStatCounter(&mtcp->rtstat.handle); 267 InitStatCounter(&mtcp->rtstat.xmit); 268 InitStatCounter(&mtcp->rtstat.select); 269 } 270 #endif 271 #endif /* NETSTAT */ 272 /*----------------------------------------------------------------------------*/ 273 #if EVENT_STAT 274 static inline void 275 PrintEventStat(int core, struct mtcp_epoll_stat *stat) 276 { 277 fprintf(stderr, "[CPU%2d] calls: %lu, waits: %lu, wakes: %lu, " 278 "issued: %lu, registered: %lu, invalidated: %lu, handled: %lu\n", 279 core, stat->calls, stat->waits, stat->wakes, 280 stat->issued, stat->registered, stat->invalidated, stat->handled); 281 memset(stat, 0, sizeof(struct mtcp_epoll_stat)); 282 } 283 #endif /* EVENT_STAT */ 284 /*----------------------------------------------------------------------------*/ 285 #ifdef NETSTAT 286 static inline void 287 PrintNetworkStats(mtcp_manager_t mtcp, uint32_t cur_ts) 288 { 289 #define TIMEOUT 1 290 int i; 291 struct net_stat ns; 292 bool stat_print = false; 293 #if ROUND_STAT 294 struct run_stat rs; 295 #endif /* ROUND_STAT */ 296 #ifdef NETSTAT_TOTAL 297 static double peak_total_rx_gbps = 0; 298 static double peak_total_tx_gbps = 0; 299 static double avg_total_rx_gbps = 0; 300 static double avg_total_tx_gbps = 0; 301 302 double total_rx_gbps = 0, total_tx_gbps = 0; 303 int j; 304 uint32_t gflow_cnt = 0; 305 struct net_stat g_nstat; 306 #if ROUND_STAT 307 struct run_stat g_runstat; 308 #endif /* ROUND_STAT */ 309 #endif /* NETSTAT_TOTAL */ 310 311 if (TS_TO_MSEC(cur_ts - mtcp->p_nstat_ts) < SEC_TO_MSEC(TIMEOUT)) { 312 return; 313 } 314 315 mtcp->p_nstat_ts = cur_ts; 316 gflow_cnt = 0; 317 memset(&g_nstat, 0, sizeof(struct net_stat)); 318 for (i = 0; i < g_config.mos->num_cores; i++) { 319 if (running[i]) { 320 PrintThreadNetworkStats(g_mtcp[i], &ns); 321 #if NETSTAT_TOTAL 322 gflow_cnt += g_mtcp[i]->flow_cnt; 323 for (j = 0; j < g_config.mos->netdev_table->num; j++) { 324 g_nstat.rx_packets[j] += ns.rx_packets[j]; 325 g_nstat.rx_errors[j] += ns.rx_errors[j]; 326 g_nstat.rx_bytes[j] += ns.rx_bytes[j]; 327 g_nstat.tx_packets[j] += ns.tx_packets[j]; 328 g_nstat.tx_drops[j] += ns.tx_drops[j]; 329 g_nstat.tx_bytes[j] += ns.tx_bytes[j]; 330 } 331 #endif 332 } 333 } 334 #if NETSTAT_TOTAL 335 for (i = 0; i < g_config.mos->netdev_table->num; i++) { 336 if (g_config.mos->netdev_table->ent[i]->stat_print) { 337 fprintf(stderr, "[ ALL ] %s, " 338 "RX: %7llu(pps) (err: %5llu), %5.2lf(Gbps), " 339 "TX: %7llu(pps), %5.2lf(Gbps)\n", 340 g_config.mos->netdev_table->ent[i]->dev_name, 341 (long long unsigned)g_nstat.rx_packets[i], 342 (long long unsigned)g_nstat.rx_errors[i], 343 GBPS(g_nstat.rx_bytes[i]), 344 (long long unsigned)g_nstat.tx_packets[i], 345 GBPS(g_nstat.tx_bytes[i])); 346 total_rx_gbps += GBPS(g_nstat.rx_bytes[i]); 347 total_tx_gbps += GBPS(g_nstat.tx_bytes[i]); 348 stat_print = true; 349 } 350 } 351 if (stat_print) { 352 fprintf(stderr, "[ ALL ] flows: %6u\n", gflow_cnt); 353 if (avg_total_rx_gbps == 0) 354 avg_total_rx_gbps = total_rx_gbps; 355 else 356 avg_total_rx_gbps = avg_total_rx_gbps * 0.6 + total_rx_gbps * 0.4; 357 358 if (avg_total_tx_gbps == 0) 359 avg_total_tx_gbps = total_tx_gbps; 360 else 361 avg_total_tx_gbps = avg_total_tx_gbps * 0.6 + total_tx_gbps * 0.4; 362 363 if (peak_total_rx_gbps < total_rx_gbps) 364 peak_total_rx_gbps = total_rx_gbps; 365 if (peak_total_tx_gbps < total_tx_gbps) 366 peak_total_tx_gbps = total_tx_gbps; 367 368 fprintf(stderr, "[ PEAK ] RX: %5.2lf(Gbps), TX: %5.2lf(Gbps)\n" 369 "[ RECENT AVG ] RX: %5.2lf(Gbps), TX: %5.2lf(Gbps)\n", 370 peak_total_rx_gbps, peak_total_tx_gbps, 371 avg_total_rx_gbps, avg_total_tx_gbps); 372 } 373 #endif 374 375 #if ROUND_STAT 376 memset(&g_runstat, 0, sizeof(struct run_stat)); 377 for (i = 0; i < g_config.mos->num_cores; i++) { 378 if (running[i]) { 379 PrintThreadRoundStats(g_mtcp[i], &rs); 380 #if DBGMSG 381 g_runstat.rounds += rs.rounds; 382 g_runstat.rounds_rx += rs.rounds_rx; 383 g_runstat.rounds_rx_try += rs.rounds_rx_try; 384 g_runstat.rounds_tx += rs.rounds_tx; 385 g_runstat.rounds_tx_try += rs.rounds_tx_try; 386 g_runstat.rounds_select += rs.rounds_select; 387 g_runstat.rounds_select_rx += rs.rounds_select_rx; 388 g_runstat.rounds_select_tx += rs.rounds_select_tx; 389 #endif 390 } 391 } 392 393 TRACE_DBG("[ ALL ] Rounds: %4ldK, " 394 "rx: %3ldK (try: %4ldK), tx: %3ldK (try: %4ldK), " 395 "ps_select: %4ld (rx: %4ld, tx: %4ld)\n", 396 g_runstat.rounds / 1000, g_runstat.rounds_rx / 1000, 397 g_runstat.rounds_rx_try / 1000, g_runstat.rounds_tx / 1000, 398 g_runstat.rounds_tx_try / 1000, g_runstat.rounds_select, 399 g_runstat.rounds_select_rx, g_runstat.rounds_select_tx); 400 #endif /* ROUND_STAT */ 401 402 #if TIME_STAT 403 for (i = 0; i < g_config.mos->num_cores; i++) { 404 if (running[i]) { 405 PrintThreadRoundTime(g_mtcp[i]); 406 } 407 } 408 #endif 409 410 #if EVENT_STAT 411 for (i = 0; i < g_config.mos->num_cores; i++) { 412 if (running[i] && g_mtcp[i]->ep) { 413 PrintEventStat(i, &g_mtcp[i]->ep->stat); 414 } 415 } 416 #endif 417 418 fflush(stderr); 419 } 420 #endif /* NETSTAT */ 421 /*----------------------------------------------------------------------------*/ 422 static inline void 423 FlushMonitorReadEvents(mtcp_manager_t mtcp) 424 { 425 struct event_queue *mtcpq; 426 struct tcp_stream *cur_stream; 427 struct mon_listener *walk; 428 429 /* check if monitor sockets should be passed data */ 430 TAILQ_FOREACH(walk, &mtcp->monitors, link) { 431 if (walk->socket->socktype != MOS_SOCK_MONITOR_STREAM || 432 !(mtcpq = walk->eq)) 433 continue; 434 435 while (mtcpq->num_events > 0) { 436 cur_stream = 437 (struct tcp_stream *)mtcpq->events[mtcpq->start++].ev.data.ptr; 438 /* only read events */ 439 if (cur_stream != NULL && 440 (cur_stream->socket->events | MOS_EPOLLIN)) { 441 if (cur_stream->rcvvar != NULL && 442 cur_stream->rcvvar->rcvbuf != NULL) { 443 /* no need to pass pkt context */ 444 struct socket_map *walk; 445 SOCKQ_FOREACH_START(walk, &cur_stream->msocks) { 446 HandleCallback(mtcp, MOS_NULL, walk, 447 cur_stream->side, NULL, 448 MOS_ON_CONN_NEW_DATA); 449 } SOCKQ_FOREACH_END; 450 #ifndef NEWRB 451 /* re-adjust tcp_ring_buffer now */ 452 RBRemove(mtcp->rbm_rcv, cur_stream->rcvvar->rcvbuf, 453 cur_stream->rcvvar->rcvbuf->merged_len, 454 AT_MTCP, cur_stream->buffer_mgmt); 455 cur_stream->rcvvar->rcv_wnd = 456 cur_stream->rcvvar->rcvbuf->size 457 - 1 - cur_stream->rcvvar->rcvbuf->last_len; 458 #endif 459 } 460 /* reset the actions now */ 461 cur_stream->actions = 0; 462 } 463 if (mtcpq->start >= mtcpq->size) 464 mtcpq->start = 0; 465 mtcpq->num_events--; 466 } 467 } 468 } 469 /*----------------------------------------------------------------------------*/ 470 static inline void 471 FlushBufferedReadEvents(mtcp_manager_t mtcp) 472 { 473 int i; 474 int offset; 475 struct event_queue *mtcpq; 476 struct tcp_stream *cur_stream; 477 478 if (mtcp->ep == NULL) { 479 TRACE_EPOLL("No epoll socket has been registered yet!\n"); 480 return; 481 } else { 482 /* case when mtcpq exists */ 483 mtcpq = mtcp->ep->mtcp_queue; 484 offset = mtcpq->start; 485 } 486 487 /* we will use queued-up epoll read-in events 488 * to trigger buffered read monitor events */ 489 for (i = 0; i < mtcpq->num_events; i++) { 490 cur_stream = mtcp->smap[mtcpq->events[offset++].sockid].stream; 491 /* only read events */ 492 /* Raise new data callback event */ 493 if (cur_stream != NULL && 494 (cur_stream->socket->events | MOS_EPOLLIN)) { 495 if (cur_stream->rcvvar != NULL && 496 cur_stream->rcvvar->rcvbuf != NULL) { 497 /* no need to pass pkt context */ 498 struct socket_map *walk; 499 SOCKQ_FOREACH_START(walk, &cur_stream->msocks) { 500 HandleCallback(mtcp, MOS_NULL, walk, cur_stream->side, 501 NULL, MOS_ON_CONN_NEW_DATA); 502 } SOCKQ_FOREACH_END; 503 } 504 } 505 if (offset >= mtcpq->size) 506 offset = 0; 507 } 508 } 509 /*----------------------------------------------------------------------------*/ 510 static inline void 511 FlushEpollEvents(mtcp_manager_t mtcp, uint32_t cur_ts) 512 { 513 struct mtcp_epoll *ep = mtcp->ep; 514 struct event_queue *usrq = ep->usr_queue; 515 struct event_queue *mtcpq = ep->mtcp_queue; 516 517 pthread_mutex_lock(&ep->epoll_lock); 518 if (ep->mtcp_queue->num_events > 0) { 519 /* while mtcp_queue have events */ 520 /* and usr_queue is not full */ 521 while (mtcpq->num_events > 0 && usrq->num_events < usrq->size) { 522 /* copy the event from mtcp_queue to usr_queue */ 523 usrq->events[usrq->end++] = mtcpq->events[mtcpq->start++]; 524 525 if (usrq->end >= usrq->size) 526 usrq->end = 0; 527 usrq->num_events++; 528 529 if (mtcpq->start >= mtcpq->size) 530 mtcpq->start = 0; 531 mtcpq->num_events--; 532 } 533 } 534 535 /* if there are pending events, wake up user */ 536 if (ep->waiting && (ep->usr_queue->num_events > 0 || 537 ep->usr_shadow_queue->num_events > 0)) { 538 STAT_COUNT(mtcp->runstat.rounds_epoll); 539 TRACE_EPOLL("Broadcasting events. num: %d, cur_ts: %u, prev_ts: %u\n", 540 ep->usr_queue->num_events, cur_ts, mtcp->ts_last_event); 541 mtcp->ts_last_event = cur_ts; 542 ep->stat.wakes++; 543 pthread_cond_signal(&ep->epoll_cond); 544 } 545 pthread_mutex_unlock(&ep->epoll_lock); 546 } 547 /*----------------------------------------------------------------------------*/ 548 static inline void 549 HandleApplicationCalls(mtcp_manager_t mtcp, uint32_t cur_ts) 550 { 551 tcp_stream *stream; 552 int cnt, max_cnt; 553 int handled, delayed; 554 int control, send, ack; 555 556 /* connect handling */ 557 while ((stream = StreamDequeue(mtcp->connectq))) { 558 if (stream->state != TCP_ST_SYN_SENT) { 559 TRACE_INFO("Got a connection request from app with state: %s", 560 TCPStateToString(stream)); 561 exit(EXIT_FAILURE); 562 } else { 563 stream->cb_events |= MOS_ON_CONN_START | 564 MOS_ON_TCP_STATE_CHANGE; 565 /* if monitor is on... */ 566 if (stream->pair_stream != NULL) 567 stream->pair_stream->cb_events |= 568 MOS_ON_CONN_START; 569 } 570 AddtoControlList(mtcp, stream, cur_ts); 571 } 572 573 /* send queue handling */ 574 while ((stream = StreamDequeue(mtcp->sendq))) { 575 stream->sndvar->on_sendq = FALSE; 576 AddtoSendList(mtcp, stream); 577 } 578 579 /* ack queue handling */ 580 while ((stream = StreamDequeue(mtcp->ackq))) { 581 stream->sndvar->on_ackq = FALSE; 582 EnqueueACK(mtcp, stream, cur_ts, ACK_OPT_AGGREGATE); 583 } 584 585 /* close handling */ 586 handled = delayed = 0; 587 control = send = ack = 0; 588 while ((stream = StreamDequeue(mtcp->closeq))) { 589 struct tcp_send_vars *sndvar = stream->sndvar; 590 sndvar->on_closeq = FALSE; 591 592 if (sndvar->sndbuf) { 593 sndvar->fss = sndvar->sndbuf->head_seq + sndvar->sndbuf->len; 594 } else { 595 sndvar->fss = stream->snd_nxt; 596 } 597 598 if (g_config.mos->tcp_timeout > 0) 599 RemoveFromTimeoutList(mtcp, stream); 600 601 if (stream->have_reset) { 602 handled++; 603 if (stream->state != TCP_ST_CLOSED_RSVD) { 604 stream->close_reason = TCP_RESET; 605 stream->state = TCP_ST_CLOSED_RSVD; 606 stream->cb_events |= MOS_ON_TCP_STATE_CHANGE; 607 TRACE_STATE("Stream %d: TCP_ST_CLOSED_RSVD\n", stream->id); 608 DestroyTCPStream(mtcp, stream); 609 } else { 610 TRACE_ERROR("Stream already closed.\n"); 611 } 612 613 } else if (sndvar->on_control_list) { 614 sndvar->on_closeq_int = TRUE; 615 StreamInternalEnqueue(mtcp->closeq_int, stream); 616 delayed++; 617 if (sndvar->on_control_list) 618 control++; 619 if (sndvar->on_send_list) 620 send++; 621 if (sndvar->on_ack_list) 622 ack++; 623 624 } else if (sndvar->on_send_list || sndvar->on_ack_list) { 625 handled++; 626 if (stream->state == TCP_ST_ESTABLISHED) { 627 stream->state = TCP_ST_FIN_WAIT_1; 628 stream->cb_events |= MOS_ON_TCP_STATE_CHANGE; 629 TRACE_STATE("Stream %d: TCP_ST_FIN_WAIT_1\n", stream->id); 630 631 } else if (stream->state == TCP_ST_CLOSE_WAIT) { 632 stream->state = TCP_ST_LAST_ACK; 633 stream->cb_events |= MOS_ON_TCP_STATE_CHANGE; 634 TRACE_STATE("Stream %d: TCP_ST_LAST_ACK\n", stream->id); 635 } 636 stream->control_list_waiting = TRUE; 637 638 } else if (stream->state != TCP_ST_CLOSED_RSVD) { 639 handled++; 640 if (stream->state == TCP_ST_ESTABLISHED) { 641 stream->state = TCP_ST_FIN_WAIT_1; 642 stream->cb_events |= MOS_ON_TCP_STATE_CHANGE; 643 TRACE_STATE("Stream %d: TCP_ST_FIN_WAIT_1\n", stream->id); 644 645 } else if (stream->state == TCP_ST_CLOSE_WAIT) { 646 stream->state = TCP_ST_LAST_ACK; 647 stream->cb_events |= MOS_ON_TCP_STATE_CHANGE; 648 TRACE_STATE("Stream %d: TCP_ST_LAST_ACK\n", stream->id); 649 } 650 //sndvar->rto = TCP_FIN_RTO; 651 //UpdateRetransmissionTimer(mtcp, stream, mtcp->cur_ts); 652 AddtoControlList(mtcp, stream, cur_ts); 653 } else { 654 TRACE_ERROR("Already closed connection!\n"); 655 } 656 } 657 TRACE_ROUND("Handling close connections. cnt: %d\n", cnt); 658 659 cnt = 0; 660 max_cnt = mtcp->closeq_int->count; 661 while (cnt++ < max_cnt) { 662 stream = StreamInternalDequeue(mtcp->closeq_int); 663 664 if (stream->sndvar->on_control_list) { 665 StreamInternalEnqueue(mtcp->closeq_int, stream); 666 667 } else if (stream->state != TCP_ST_CLOSED_RSVD) { 668 handled++; 669 stream->sndvar->on_closeq_int = FALSE; 670 if (stream->state == TCP_ST_ESTABLISHED) { 671 stream->state = TCP_ST_FIN_WAIT_1; 672 stream->cb_events |= MOS_ON_TCP_STATE_CHANGE; 673 TRACE_STATE("Stream %d: TCP_ST_FIN_WAIT_1\n", stream->id); 674 675 } else if (stream->state == TCP_ST_CLOSE_WAIT) { 676 stream->state = TCP_ST_LAST_ACK; 677 stream->cb_events |= MOS_ON_TCP_STATE_CHANGE; 678 TRACE_STATE("Stream %d: TCP_ST_LAST_ACK\n", stream->id); 679 } 680 AddtoControlList(mtcp, stream, cur_ts); 681 } else { 682 stream->sndvar->on_closeq_int = FALSE; 683 TRACE_ERROR("Already closed connection!\n"); 684 } 685 } 686 687 /* reset handling */ 688 while ((stream = StreamDequeue(mtcp->resetq))) { 689 stream->sndvar->on_resetq = FALSE; 690 691 if (g_config.mos->tcp_timeout > 0) 692 RemoveFromTimeoutList(mtcp, stream); 693 694 if (stream->have_reset) { 695 if (stream->state != TCP_ST_CLOSED_RSVD) { 696 stream->close_reason = TCP_RESET; 697 stream->state = TCP_ST_CLOSED_RSVD; 698 stream->cb_events |= MOS_ON_TCP_STATE_CHANGE; 699 TRACE_STATE("Stream %d: TCP_ST_CLOSED_RSVD\n", stream->id); 700 DestroyTCPStream(mtcp, stream); 701 } else { 702 TRACE_ERROR("Stream already closed.\n"); 703 } 704 705 } else if (stream->sndvar->on_control_list || 706 stream->sndvar->on_send_list || stream->sndvar->on_ack_list) { 707 /* wait until all the queues are flushed */ 708 stream->sndvar->on_resetq_int = TRUE; 709 StreamInternalEnqueue(mtcp->resetq_int, stream); 710 711 } else { 712 if (stream->state != TCP_ST_CLOSED_RSVD) { 713 stream->close_reason = TCP_ACTIVE_CLOSE; 714 stream->state = TCP_ST_CLOSED_RSVD; 715 stream->cb_events |= MOS_ON_TCP_STATE_CHANGE; 716 TRACE_STATE("Stream %d: TCP_ST_CLOSED_RSVD\n", stream->id); 717 AddtoControlList(mtcp, stream, cur_ts); 718 } else { 719 TRACE_ERROR("Stream already closed.\n"); 720 } 721 } 722 } 723 TRACE_ROUND("Handling reset connections. cnt: %d\n", cnt); 724 725 cnt = 0; 726 max_cnt = mtcp->resetq_int->count; 727 while (cnt++ < max_cnt) { 728 stream = StreamInternalDequeue(mtcp->resetq_int); 729 730 if (stream->sndvar->on_control_list || 731 stream->sndvar->on_send_list || stream->sndvar->on_ack_list) { 732 /* wait until all the queues are flushed */ 733 StreamInternalEnqueue(mtcp->resetq_int, stream); 734 735 } else { 736 stream->sndvar->on_resetq_int = FALSE; 737 738 if (stream->state != TCP_ST_CLOSED_RSVD) { 739 stream->close_reason = TCP_ACTIVE_CLOSE; 740 stream->state = TCP_ST_CLOSED_RSVD; 741 stream->cb_events |= MOS_ON_TCP_STATE_CHANGE; 742 TRACE_STATE("Stream %d: TCP_ST_CLOSED_RSVD\n", stream->id); 743 AddtoControlList(mtcp, stream, cur_ts); 744 } else { 745 TRACE_ERROR("Stream already closed.\n"); 746 } 747 } 748 } 749 750 /* destroy streams in destroyq */ 751 while ((stream = StreamDequeue(mtcp->destroyq))) { 752 DestroyTCPStream(mtcp, stream); 753 } 754 755 mtcp->wakeup_flag = FALSE; 756 } 757 /*----------------------------------------------------------------------------*/ 758 static inline void 759 WritePacketsToChunks(mtcp_manager_t mtcp, uint32_t cur_ts) 760 { 761 int thresh = g_config.mos->max_concurrency; 762 int i; 763 764 /* Set the threshold to g_config.mos->max_concurrency to send ACK immediately */ 765 /* Otherwise, set to appropriate value (e.g. thresh) */ 766 assert(mtcp->g_sender != NULL); 767 if (mtcp->g_sender->control_list_cnt) 768 WriteTCPControlList(mtcp, mtcp->g_sender, cur_ts, thresh); 769 if (mtcp->g_sender->ack_list_cnt) 770 WriteTCPACKList(mtcp, mtcp->g_sender, cur_ts, thresh); 771 if (mtcp->g_sender->send_list_cnt) 772 WriteTCPDataList(mtcp, mtcp->g_sender, cur_ts, thresh); 773 774 for (i = 0; i < g_config.mos->netdev_table->num; i++) { 775 assert(mtcp->n_sender[i] != NULL); 776 if (mtcp->n_sender[i]->control_list_cnt) 777 WriteTCPControlList(mtcp, mtcp->n_sender[i], cur_ts, thresh); 778 if (mtcp->n_sender[i]->ack_list_cnt) 779 WriteTCPACKList(mtcp, mtcp->n_sender[i], cur_ts, thresh); 780 if (mtcp->n_sender[i]->send_list_cnt) 781 WriteTCPDataList(mtcp, mtcp->n_sender[i], cur_ts, thresh); 782 } 783 } 784 /*----------------------------------------------------------------------------*/ 785 #if TESTING 786 static int 787 DestroyRemainingFlows(mtcp_manager_t mtcp) 788 { 789 struct hashtable *ht = mtcp->tcp_flow_table; 790 tcp_stream *walk; 791 int cnt, i; 792 793 cnt = 0; 794 795 thread_printf(mtcp, mtcp->log_fp, 796 "CPU %d: Flushing remaining flows.\n", mtcp->ctx->cpu); 797 798 for (i = 0; i < NUM_BINS; i++) { 799 TAILQ_FOREACH(walk, &ht->ht_table[i], rcvvar->he_link) { 800 thread_printf(mtcp, mtcp->log_fp, 801 "CPU %d: Destroying stream %d\n", mtcp->ctx->cpu, walk->id); 802 DumpStream(mtcp, walk); 803 DestroyTCPStream(mtcp, walk); 804 cnt++; 805 } 806 } 807 808 return cnt; 809 } 810 #endif 811 /*----------------------------------------------------------------------------*/ 812 static void 813 InterruptApplication(mtcp_manager_t mtcp) 814 { 815 /* interrupt if the mtcp_epoll_wait() is waiting */ 816 if (mtcp->ep) { 817 pthread_mutex_lock(&mtcp->ep->epoll_lock); 818 if (mtcp->ep->waiting) { 819 pthread_cond_signal(&mtcp->ep->epoll_cond); 820 } 821 pthread_mutex_unlock(&mtcp->ep->epoll_lock); 822 } 823 /* interrupt if the accept() is waiting */ 824 if (mtcp->listener) { 825 if (mtcp->listener->socket) { 826 pthread_mutex_lock(&mtcp->listener->accept_lock); 827 if (!(mtcp->listener->socket->opts & MTCP_NONBLOCK)) { 828 pthread_cond_signal(&mtcp->listener->accept_cond); 829 } 830 pthread_mutex_unlock(&mtcp->listener->accept_lock); 831 } 832 } 833 } 834 /*----------------------------------------------------------------------------*/ 835 void 836 RunPassiveLoop(mtcp_manager_t mtcp) 837 { 838 sem_wait(&g_done_sem[mtcp->ctx->cpu]); 839 sem_destroy(&g_done_sem[mtcp->ctx->cpu]); 840 return; 841 } 842 /*----------------------------------------------------------------------------*/ 843 static void 844 RunMainLoop(struct mtcp_thread_context *ctx) 845 { 846 mtcp_manager_t mtcp = ctx->mtcp_manager; 847 int i; 848 int recv_cnt; 849 850 #if E_PSIO 851 int rx_inf, tx_inf; 852 #endif 853 854 struct timeval cur_ts = {0}; 855 uint32_t ts, ts_prev; 856 857 #if TIME_STAT 858 struct timeval prev_ts, processing_ts, tcheck_ts, 859 epoll_ts, handle_ts, xmit_ts, select_ts; 860 #endif 861 int thresh; 862 863 gettimeofday(&cur_ts, NULL); 864 865 #if E_PSIO 866 /* do nothing */ 867 #else 868 #if !USE_CHUNK_BUF 869 /* create packet write chunk */ 870 InitWriteChunks(handle, ctx->w_chunk); 871 for (i = 0; i < ETH_NUM; i++) { 872 ctx->w_chunk[i].cnt = 0; 873 ctx->w_off[i] = 0; 874 ctx->w_cur_idx[i] = 0; 875 } 876 #endif 877 #endif 878 879 TRACE_DBG("CPU %d: mtcp thread running.\n", ctx->cpu); 880 881 #if TIME_STAT 882 prev_ts = cur_ts; 883 InitStatCounter(&mtcp->rtstat.round); 884 InitStatCounter(&mtcp->rtstat.processing); 885 InitStatCounter(&mtcp->rtstat.tcheck); 886 InitStatCounter(&mtcp->rtstat.epoll); 887 InitStatCounter(&mtcp->rtstat.handle); 888 InitStatCounter(&mtcp->rtstat.xmit); 889 InitStatCounter(&mtcp->rtstat.select); 890 #endif 891 892 ts = ts_prev = 0; 893 while ((!ctx->done || mtcp->flow_cnt) && !ctx->exit) { 894 895 STAT_COUNT(mtcp->runstat.rounds); 896 recv_cnt = 0; 897 898 #if E_PSIO 899 #if 0 900 event.timeout = PS_SELECT_TIMEOUT; 901 NID_ZERO(event.rx_nids); 902 NID_ZERO(event.tx_nids); 903 NID_ZERO(rx_avail); 904 //NID_ZERO(tx_avail); 905 #endif 906 gettimeofday(&cur_ts, NULL); 907 #if TIME_STAT 908 /* measure the inter-round delay */ 909 UpdateStatCounter(&mtcp->rtstat.round, TimeDiffUs(&cur_ts, &prev_ts)); 910 prev_ts = cur_ts; 911 #endif 912 913 ts = TIMEVAL_TO_TS(&cur_ts); 914 mtcp->cur_ts = ts; 915 916 for (rx_inf = 0; rx_inf < g_config.mos->netdev_table->num; rx_inf++) { 917 918 recv_cnt = mtcp->iom->recv_pkts(ctx, rx_inf); 919 STAT_COUNT(mtcp->runstat.rounds_rx_try); 920 921 for (i = 0; i < recv_cnt; i++) { 922 uint16_t len; 923 uint8_t *pktbuf; 924 pktbuf = mtcp->iom->get_rptr(mtcp->ctx, rx_inf, i, &len); 925 ProcessPacket(mtcp, rx_inf, i, ts, pktbuf, len); 926 } 927 928 #ifdef ENABLE_DPDKR 929 mtcp->iom->send_pkts(ctx, rx_inf); 930 continue; 931 #endif 932 } 933 STAT_COUNT(mtcp->runstat.rounds_rx); 934 935 #else /* E_PSIO */ 936 gettimeofday(&cur_ts, NULL); 937 ts = TIMEVAL_TO_TS(&cur_ts); 938 mtcp->cur_ts = ts; 939 /* 940 * Read packets into a chunk from NIC 941 * chk_w_idx : next chunk index to write packets from NIC 942 */ 943 944 STAT_COUNT(mtcp->runstat.rounds_rx_try); 945 chunk.cnt = PS_CHUNK_SIZE; 946 recv_cnt = ps_recv_chunk(handle, &chunk); 947 if (recv_cnt < 0) { 948 if (errno != EAGAIN && errno != EINTR) { 949 perror("ps_recv_chunk"); 950 assert(0); 951 } 952 } 953 954 /* 955 * Handling Packets 956 * chk_r_idx : next chunk index to read and handle 957 */ 958 for (i = 0; i < recv_cnt; i++) { 959 ProcessPacket(mtcp, chunk.queue.ifindex, ts, 960 (u_char *)(chunk.buf + chunk.info[i].offset), 961 chunk.info[i].len); 962 } 963 964 if (recv_cnt > 0) 965 STAT_COUNT(mtcp->runstat.rounds_rx); 966 #endif /* E_PSIO */ 967 #if TIME_STAT 968 gettimeofday(&processing_ts, NULL); 969 UpdateStatCounter(&mtcp->rtstat.processing, 970 TimeDiffUs(&processing_ts, &cur_ts)); 971 #endif /* TIME_STAT */ 972 973 /* Handle user defined timeout */ 974 struct timer *walk, *tmp; 975 for (walk = TAILQ_FIRST(&mtcp->timer_list); walk != NULL; walk = tmp) { 976 tmp = TAILQ_NEXT(walk, timer_link); 977 if (TIMEVAL_LT(&cur_ts, &walk->exp)) 978 break; 979 980 struct mtcp_context mctx = {.cpu = ctx->cpu}; 981 walk->cb(&mctx, walk->id, 0, 0 /* FIXME */, NULL); 982 DelTimer(mtcp, walk); 983 } 984 985 /* interaction with application */ 986 if (mtcp->flow_cnt > 0) { 987 988 /* check retransmission timeout and timewait expire */ 989 #if 0 990 thresh = (int)mtcp->flow_cnt / (TS_TO_USEC(PER_STREAM_TCHECK)); 991 assert(thresh >= 0); 992 if (thresh == 0) 993 thresh = 1; 994 if (recv_cnt > 0 && thresh > recv_cnt) 995 thresh = recv_cnt; 996 #else 997 thresh = g_config.mos->max_concurrency; 998 #endif 999 1000 /* Eunyoung, you may fix this later 1001 * if there is no rcv packet, we will send as much as possible 1002 */ 1003 if (thresh == -1) 1004 thresh = g_config.mos->max_concurrency; 1005 1006 CheckRtmTimeout(mtcp, ts, thresh); 1007 CheckTimewaitExpire(mtcp, ts, thresh); 1008 1009 if (g_config.mos->tcp_timeout > 0 && ts != ts_prev) { 1010 CheckConnectionTimeout(mtcp, ts, thresh); 1011 } 1012 1013 #if TIME_STAT 1014 } 1015 gettimeofday(&tcheck_ts, NULL); 1016 UpdateStatCounter(&mtcp->rtstat.tcheck, 1017 TimeDiffUs(&tcheck_ts, &processing_ts)); 1018 1019 if (mtcp->flow_cnt > 0) { 1020 #endif /* TIME_STAT */ 1021 1022 } 1023 1024 /* 1025 * before flushing epoll events, call monitor events for 1026 * all registered `read` events 1027 */ 1028 if (mtcp->num_msp > 0) 1029 /* call this when only a standalone monitor is running */ 1030 FlushMonitorReadEvents(mtcp); 1031 1032 /* if epoll is in use, flush all the queued events */ 1033 if (mtcp->ep) { 1034 FlushBufferedReadEvents(mtcp); 1035 FlushEpollEvents(mtcp, ts); 1036 } 1037 #if TIME_STAT 1038 gettimeofday(&epoll_ts, NULL); 1039 UpdateStatCounter(&mtcp->rtstat.epoll, 1040 TimeDiffUs(&epoll_ts, &tcheck_ts)); 1041 #endif /* TIME_STAT */ 1042 1043 if (end_app_exists && mtcp->flow_cnt > 0) { 1044 /* handle stream queues */ 1045 HandleApplicationCalls(mtcp, ts); 1046 } 1047 1048 #ifdef ENABLE_DPDKR 1049 continue; 1050 #endif 1051 1052 #if TIME_STAT 1053 gettimeofday(&handle_ts, NULL); 1054 UpdateStatCounter(&mtcp->rtstat.handle, 1055 TimeDiffUs(&handle_ts, &epoll_ts)); 1056 #endif /* TIME_STAT */ 1057 1058 WritePacketsToChunks(mtcp, ts); 1059 1060 /* send packets from write buffer */ 1061 #if E_PSIO 1062 /* With E_PSIO, send until tx is available */ 1063 int num_dev = g_config.mos->netdev_table->num; 1064 if (likely(mtcp->iom->send_pkts != NULL)) 1065 for (tx_inf = 0; tx_inf < num_dev; tx_inf++) { 1066 mtcp->iom->send_pkts(ctx, tx_inf); 1067 } 1068 1069 #else /* E_PSIO */ 1070 /* Without E_PSIO, try send chunks immediately */ 1071 for (i = 0; i < g_config.mos->netdev_table->num; i++) { 1072 #if USE_CHUNK_BUF 1073 /* in the case of using ps_send_chunk_buf() without E_PSIO */ 1074 ret = FlushSendChunkBuf(mtcp, i); 1075 #else 1076 /* if not using ps_send_chunk_buf() */ 1077 ret = FlushWriteBuffer(ctx, i); 1078 #endif 1079 if (ret < 0) { 1080 TRACE_ERROR("Failed to send chunks.\n"); 1081 } else if (ret > 0) { 1082 STAT_COUNT(mtcp->runstat.rounds_tx); 1083 } 1084 } 1085 #endif /* E_PSIO */ 1086 1087 #if TIME_STAT 1088 gettimeofday(&xmit_ts, NULL); 1089 UpdateStatCounter(&mtcp->rtstat.xmit, 1090 TimeDiffUs(&xmit_ts, &handle_ts)); 1091 #endif /* TIME_STAT */ 1092 1093 if (ts != ts_prev) { 1094 ts_prev = ts; 1095 #ifdef NETSTAT 1096 if (ctx->cpu == printer) { 1097 #ifdef RUN_ARP 1098 ARPTimer(mtcp, ts); 1099 #endif 1100 PrintNetworkStats(mtcp, ts); 1101 } 1102 #endif /* NETSTAT */ 1103 } 1104 1105 if (mtcp->iom->select) 1106 mtcp->iom->select(ctx); 1107 1108 if (ctx->interrupt) { 1109 InterruptApplication(mtcp); 1110 } 1111 } 1112 1113 #if TESTING 1114 DestroyRemainingFlows(mtcp); 1115 #endif 1116 1117 TRACE_DBG("MTCP thread %d out of main loop.\n", ctx->cpu); 1118 /* flush logs */ 1119 flush_log_data(mtcp); 1120 TRACE_DBG("MTCP thread %d flushed logs.\n", ctx->cpu); 1121 InterruptApplication(mtcp); 1122 TRACE_INFO("MTCP thread %d finished.\n", ctx->cpu); 1123 } 1124 /*----------------------------------------------------------------------------*/ 1125 struct mtcp_sender * 1126 CreateMTCPSender(int ifidx) 1127 { 1128 struct mtcp_sender *sender; 1129 1130 sender = (struct mtcp_sender *)calloc(1, sizeof(struct mtcp_sender)); 1131 if (!sender) { 1132 return NULL; 1133 } 1134 1135 sender->ifidx = ifidx; 1136 1137 TAILQ_INIT(&sender->control_list); 1138 TAILQ_INIT(&sender->send_list); 1139 TAILQ_INIT(&sender->ack_list); 1140 1141 sender->control_list_cnt = 0; 1142 sender->send_list_cnt = 0; 1143 sender->ack_list_cnt = 0; 1144 1145 return sender; 1146 } 1147 /*----------------------------------------------------------------------------*/ 1148 void 1149 DestroyMTCPSender(struct mtcp_sender *sender) 1150 { 1151 free(sender); 1152 } 1153 /*----------------------------------------------------------------------------*/ 1154 static mtcp_manager_t 1155 InitializeMTCPManager(struct mtcp_thread_context* ctx) 1156 { 1157 mtcp_manager_t mtcp; 1158 char log_name[MAX_FILE_NAME]; 1159 int i; 1160 1161 posix_seq_srand((unsigned)pthread_self()); 1162 1163 mtcp = (mtcp_manager_t)calloc(1, sizeof(struct mtcp_manager)); 1164 if (!mtcp) { 1165 perror("malloc"); 1166 CTRACE_ERROR("Failed to allocate mtcp_manager.\n"); 1167 return NULL; 1168 } 1169 g_mtcp[ctx->cpu] = mtcp; 1170 1171 mtcp->tcp_flow_table = CreateHashtable(); 1172 if (!mtcp->tcp_flow_table) { 1173 CTRACE_ERROR("Falied to allocate tcp flow table.\n"); 1174 return NULL; 1175 } 1176 1177 #ifdef HUGEPAGE 1178 #define IS_HUGEPAGE 1 1179 #else 1180 #define IS_HUGEPAGE 0 1181 #endif 1182 if (mon_app_exists) { 1183 /* initialize event callback */ 1184 #ifdef NEWEV 1185 InitEvent(mtcp); 1186 #else 1187 InitEvent(mtcp, NUM_EV_TABLE); 1188 #endif 1189 } 1190 1191 if (!(mtcp->bufseg_pool = MPCreate(sizeof(tcpbufseg_t), 1192 sizeof(tcpbufseg_t) * g_config.mos->max_concurrency * 1193 ((g_config.mos->rmem_size - 1) / UNITBUFSIZE + 1), 0))) { 1194 TRACE_ERROR("Failed to allocate ev_table pool\n"); 1195 exit(0); 1196 } 1197 if (!(mtcp->sockent_pool = MPCreate(sizeof(struct sockent), 1198 sizeof(struct sockent) * g_config.mos->max_concurrency * 3, 0))) { 1199 TRACE_ERROR("Failed to allocate ev_table pool\n"); 1200 exit(0); 1201 } 1202 #ifdef USE_TIMER_POOL 1203 if (!(mtcp->timer_pool = MPCreate(sizeof(struct timer), 1204 sizeof(struct timer) * g_config.mos->max_concurrency * 10, 0))) { 1205 TRACE_ERROR("Failed to allocate ev_table pool\n"); 1206 exit(0); 1207 } 1208 #endif 1209 mtcp->flow_pool = MPCreate(sizeof(tcp_stream), 1210 sizeof(tcp_stream) * g_config.mos->max_concurrency, IS_HUGEPAGE); 1211 if (!mtcp->flow_pool) { 1212 CTRACE_ERROR("Failed to allocate tcp flow pool.\n"); 1213 return NULL; 1214 } 1215 mtcp->rv_pool = MPCreate(sizeof(struct tcp_recv_vars), 1216 sizeof(struct tcp_recv_vars) * g_config.mos->max_concurrency, IS_HUGEPAGE); 1217 if (!mtcp->rv_pool) { 1218 CTRACE_ERROR("Failed to allocate tcp recv variable pool.\n"); 1219 return NULL; 1220 } 1221 mtcp->sv_pool = MPCreate(sizeof(struct tcp_send_vars), 1222 sizeof(struct tcp_send_vars) * g_config.mos->max_concurrency, IS_HUGEPAGE); 1223 if (!mtcp->sv_pool) { 1224 CTRACE_ERROR("Failed to allocate tcp send variable pool.\n"); 1225 return NULL; 1226 } 1227 1228 mtcp->rbm_snd = SBManagerCreate(g_config.mos->wmem_size, g_config.mos->no_ring_buffers, 1229 g_config.mos->max_concurrency); 1230 if (!mtcp->rbm_snd) { 1231 CTRACE_ERROR("Failed to create send ring buffer.\n"); 1232 return NULL; 1233 } 1234 #ifndef NEWRB 1235 mtcp->rbm_rcv = RBManagerCreate(g_config.mos->rmem_size, g_config.mos->no_ring_buffers, 1236 g_config.mos->max_concurrency); 1237 if (!mtcp->rbm_rcv) { 1238 CTRACE_ERROR("Failed to create recv ring buffer.\n"); 1239 return NULL; 1240 } 1241 #endif 1242 1243 mtcp->smap = (socket_map_t)calloc(g_config.mos->max_concurrency, sizeof(struct socket_map)); 1244 if (!mtcp->smap) { 1245 perror("calloc"); 1246 CTRACE_ERROR("Failed to allocate memory for stream map.\n"); 1247 return NULL; 1248 } 1249 1250 if (mon_app_exists) { 1251 mtcp->msmap = (socket_map_t)calloc(g_config.mos->max_concurrency, sizeof(struct socket_map)); 1252 if (!mtcp->msmap) { 1253 perror("calloc"); 1254 CTRACE_ERROR("Failed to allocate memory for monitor stream map.\n"); 1255 return NULL; 1256 } 1257 1258 for (i = 0; i < g_config.mos->max_concurrency; i++) { 1259 mtcp->msmap[i].monitor_stream = calloc(1, sizeof(struct mon_stream)); 1260 if (!mtcp->msmap[i].monitor_stream) { 1261 perror("calloc"); 1262 CTRACE_ERROR("Failed to allocate memory for monitr stream map\n"); 1263 return NULL; 1264 } 1265 } 1266 } 1267 1268 TAILQ_INIT(&mtcp->timer_list); 1269 TAILQ_INIT(&mtcp->monitors); 1270 1271 TAILQ_INIT(&mtcp->free_smap); 1272 for (i = 0; i < g_config.mos->max_concurrency; i++) { 1273 mtcp->smap[i].id = i; 1274 mtcp->smap[i].socktype = MOS_SOCK_UNUSED; 1275 memset(&mtcp->smap[i].saddr, 0, sizeof(struct sockaddr_in)); 1276 mtcp->smap[i].stream = NULL; 1277 TAILQ_INSERT_TAIL(&mtcp->free_smap, &mtcp->smap[i], link); 1278 } 1279 1280 if (mon_app_exists) { 1281 TAILQ_INIT(&mtcp->free_msmap); 1282 for (i = 0; i < g_config.mos->max_concurrency; i++) { 1283 mtcp->msmap[i].id = i; 1284 mtcp->msmap[i].socktype = MOS_SOCK_UNUSED; 1285 memset(&mtcp->msmap[i].saddr, 0, sizeof(struct sockaddr_in)); 1286 TAILQ_INSERT_TAIL(&mtcp->free_msmap, &mtcp->msmap[i], link); 1287 } 1288 } 1289 1290 mtcp->ctx = ctx; 1291 mtcp->ep = NULL; 1292 1293 snprintf(log_name, MAX_FILE_NAME, "%s/"LOG_FILE_NAME"_%d", 1294 g_config.mos->mos_log, ctx->cpu); 1295 mtcp->log_fp = fopen(log_name, "w+"); 1296 if (!mtcp->log_fp) { 1297 perror("fopen"); 1298 CTRACE_ERROR("Failed to create file for logging. (%s)\n", log_name); 1299 return NULL; 1300 } 1301 mtcp->sp_fd = g_logctx[ctx->cpu]->pair_sp_fd; 1302 mtcp->logger = g_logctx[ctx->cpu]; 1303 1304 mtcp->connectq = CreateStreamQueue(BACKLOG_SIZE); 1305 if (!mtcp->connectq) { 1306 CTRACE_ERROR("Failed to create connect queue.\n"); 1307 return NULL; 1308 } 1309 mtcp->sendq = CreateStreamQueue(g_config.mos->max_concurrency); 1310 if (!mtcp->sendq) { 1311 CTRACE_ERROR("Failed to create send queue.\n"); 1312 return NULL; 1313 } 1314 mtcp->ackq = CreateStreamQueue(g_config.mos->max_concurrency); 1315 if (!mtcp->ackq) { 1316 CTRACE_ERROR("Failed to create ack queue.\n"); 1317 return NULL; 1318 } 1319 mtcp->closeq = CreateStreamQueue(g_config.mos->max_concurrency); 1320 if (!mtcp->closeq) { 1321 CTRACE_ERROR("Failed to create close queue.\n"); 1322 return NULL; 1323 } 1324 mtcp->closeq_int = CreateInternalStreamQueue(g_config.mos->max_concurrency); 1325 if (!mtcp->closeq_int) { 1326 CTRACE_ERROR("Failed to create close queue.\n"); 1327 return NULL; 1328 } 1329 mtcp->resetq = CreateStreamQueue(g_config.mos->max_concurrency); 1330 if (!mtcp->resetq) { 1331 CTRACE_ERROR("Failed to create reset queue.\n"); 1332 return NULL; 1333 } 1334 mtcp->resetq_int = CreateInternalStreamQueue(g_config.mos->max_concurrency); 1335 if (!mtcp->resetq_int) { 1336 CTRACE_ERROR("Failed to create reset queue.\n"); 1337 return NULL; 1338 } 1339 mtcp->destroyq = CreateStreamQueue(g_config.mos->max_concurrency); 1340 if (!mtcp->destroyq) { 1341 CTRACE_ERROR("Failed to create destroy queue.\n"); 1342 return NULL; 1343 } 1344 1345 mtcp->g_sender = CreateMTCPSender(-1); 1346 if (!mtcp->g_sender) { 1347 CTRACE_ERROR("Failed to create global sender structure.\n"); 1348 return NULL; 1349 } 1350 for (i = 0; i < g_config.mos->netdev_table->num; i++) { 1351 mtcp->n_sender[i] = CreateMTCPSender(i); 1352 if (!mtcp->n_sender[i]) { 1353 CTRACE_ERROR("Failed to create per-nic sender structure.\n"); 1354 return NULL; 1355 } 1356 } 1357 1358 mtcp->rto_store = InitRTOHashstore(); 1359 TAILQ_INIT(&mtcp->timewait_list); 1360 TAILQ_INIT(&mtcp->timeout_list); 1361 1362 return mtcp; 1363 } 1364 /*----------------------------------------------------------------------------*/ 1365 static void * 1366 MTCPRunThread(void *arg) 1367 { 1368 mctx_t mctx = (mctx_t)arg; 1369 int cpu = mctx->cpu; 1370 int working; 1371 struct mtcp_manager *mtcp; 1372 struct mtcp_thread_context *ctx; 1373 1374 /* affinitize the thread to this core first */ 1375 mtcp_core_affinitize(cpu); 1376 1377 /* memory alloc after core affinitization would use local memory 1378 most time */ 1379 ctx = calloc(1, sizeof(*ctx)); 1380 if (!ctx) { 1381 perror("calloc"); 1382 TRACE_ERROR("Failed to calloc mtcp context.\n"); 1383 exit(-1); 1384 } 1385 ctx->thread = pthread_self(); 1386 ctx->cpu = cpu; 1387 mtcp = ctx->mtcp_manager = InitializeMTCPManager(ctx); 1388 if (!mtcp) { 1389 TRACE_ERROR("Failed to initialize mtcp manager.\n"); 1390 exit(-1); 1391 } 1392 1393 /* assign mtcp context's underlying I/O module */ 1394 mtcp->iom = current_iomodule_func; 1395 1396 /* I/O initializing */ 1397 if (mtcp->iom->init_handle) 1398 mtcp->iom->init_handle(ctx); 1399 1400 if (pthread_mutex_init(&ctx->flow_pool_lock, NULL)) { 1401 perror("pthread_mutex_init of ctx->flow_pool_lock\n"); 1402 exit(-1); 1403 } 1404 1405 if (pthread_mutex_init(&ctx->socket_pool_lock, NULL)) { 1406 perror("pthread_mutex_init of ctx->socket_pool_lock\n"); 1407 exit(-1); 1408 } 1409 1410 SQ_LOCK_INIT(&ctx->connect_lock, "ctx->connect_lock", exit(-1)); 1411 SQ_LOCK_INIT(&ctx->close_lock, "ctx->close_lock", exit(-1)); 1412 SQ_LOCK_INIT(&ctx->reset_lock, "ctx->reset_lock", exit(-1)); 1413 SQ_LOCK_INIT(&ctx->sendq_lock, "ctx->sendq_lock", exit(-1)); 1414 SQ_LOCK_INIT(&ctx->ackq_lock, "ctx->ackq_lock", exit(-1)); 1415 SQ_LOCK_INIT(&ctx->destroyq_lock, "ctx->destroyq_lock", exit(-1)); 1416 1417 /* remember this context pointer for signal processing */ 1418 g_pctx[cpu] = ctx; 1419 mlockall(MCL_CURRENT); 1420 1421 // attach (nic device, queue) 1422 working = AttachDevice(ctx); 1423 if (working != 0) { 1424 sem_post(&g_init_sem[ctx->cpu]); 1425 TRACE_DBG("MTCP thread %d finished. Not attached any device\n", ctx->cpu); 1426 pthread_exit(NULL); 1427 } 1428 1429 TRACE_DBG("CPU %d: initialization finished.\n", cpu); 1430 sem_post(&g_init_sem[ctx->cpu]); 1431 1432 /* start the main loop */ 1433 RunMainLoop(ctx); 1434 1435 TRACE_DBG("MTCP thread %d finished.\n", ctx->cpu); 1436 1437 /* signaling mTCP thread is done */ 1438 sem_post(&g_done_sem[mctx->cpu]); 1439 1440 //pthread_exit(NULL); 1441 return 0; 1442 } 1443 /*----------------------------------------------------------------------------*/ 1444 #ifdef ENABLE_DPDK 1445 static int MTCPDPDKRunThread(void *arg) 1446 { 1447 MTCPRunThread(arg); 1448 return 0; 1449 } 1450 #endif /* !ENABLE_DPDK */ 1451 /*----------------------------------------------------------------------------*/ 1452 mctx_t 1453 mtcp_create_context(int cpu) 1454 { 1455 mctx_t mctx; 1456 int ret; 1457 1458 if (cpu >= g_config.mos->num_cores) { 1459 TRACE_ERROR("Failed initialize new mtcp context. " 1460 "Requested cpu id %d exceed the number of cores %d configured to use.\n", 1461 cpu, g_config.mos->num_cores); 1462 return NULL; 1463 } 1464 1465 /* check if mtcp_create_context() was already initialized */ 1466 if (g_logctx[cpu] != NULL) { 1467 TRACE_ERROR("%s was already initialized before!\n", 1468 __FUNCTION__); 1469 return NULL; 1470 } 1471 1472 ret = sem_init(&g_init_sem[cpu], 0, 0); 1473 if (ret) { 1474 TRACE_ERROR("Failed initialize init_sem.\n"); 1475 return NULL; 1476 } 1477 1478 ret = sem_init(&g_done_sem[cpu], 0, 0); 1479 if (ret) { 1480 TRACE_ERROR("Failed initialize done_sem.\n"); 1481 return NULL; 1482 } 1483 1484 mctx = (mctx_t)calloc(1, sizeof(struct mtcp_context)); 1485 if (!mctx) { 1486 TRACE_ERROR("Failed to allocate memory for mtcp_context.\n"); 1487 return NULL; 1488 } 1489 mctx->cpu = cpu; 1490 g_ctx[cpu] = mctx; 1491 1492 /* initialize logger */ 1493 g_logctx[cpu] = (struct log_thread_context *) 1494 calloc(1, sizeof(struct log_thread_context)); 1495 if (!g_logctx[cpu]) { 1496 perror("malloc"); 1497 TRACE_ERROR("Failed to allocate memory for log thread context.\n"); 1498 return NULL; 1499 } 1500 InitLogThreadContext(g_logctx[cpu], cpu); 1501 if (pthread_create(&log_thread[cpu], 1502 NULL, ThreadLogMain, (void *)g_logctx[cpu])) { 1503 perror("pthread_create"); 1504 TRACE_ERROR("Failed to create log thread\n"); 1505 return NULL; 1506 } 1507 1508 #ifdef ENABLE_DPDK 1509 /* Wake up mTCP threads (wake up I/O threads) */ 1510 if (current_iomodule_func == &dpdk_module_func) { 1511 int master; 1512 master = rte_get_master_lcore(); 1513 if (master == cpu) { 1514 lcore_config[master].ret = 0; 1515 lcore_config[master].state = FINISHED; 1516 if (pthread_create(&g_thread[cpu], 1517 NULL, MTCPRunThread, (void *)mctx) != 0) { 1518 TRACE_ERROR("pthread_create of mtcp thread failed!\n"); 1519 return NULL; 1520 } 1521 } else 1522 rte_eal_remote_launch(MTCPDPDKRunThread, mctx, cpu); 1523 } else 1524 #endif /* !ENABLE_DPDK */ 1525 { 1526 if (pthread_create(&g_thread[cpu], 1527 NULL, MTCPRunThread, (void *)mctx) != 0) { 1528 TRACE_ERROR("pthread_create of mtcp thread failed!\n"); 1529 return NULL; 1530 } 1531 } 1532 1533 sem_wait(&g_init_sem[cpu]); 1534 sem_destroy(&g_init_sem[cpu]); 1535 1536 running[cpu] = TRUE; 1537 1538 #ifdef NETSTAT 1539 #if NETSTAT_TOTAL 1540 if (printer < 0) { 1541 printer = cpu; 1542 TRACE_INFO("CPU %d is in charge of printing stats.\n", printer); 1543 } 1544 #endif 1545 #endif 1546 1547 return mctx; 1548 } 1549 /*----------------------------------------------------------------------------*/ 1550 /** 1551 * TODO: It currently always returns 0. Add appropriate error return values 1552 */ 1553 int 1554 mtcp_destroy_context(mctx_t mctx) 1555 { 1556 struct mtcp_thread_context *ctx = g_pctx[mctx->cpu]; 1557 struct mtcp_manager *mtcp = ctx->mtcp_manager; 1558 struct log_thread_context *log_ctx = mtcp->logger; 1559 int ret, i; 1560 1561 TRACE_DBG("CPU %d: mtcp_destroy_context()\n", mctx->cpu); 1562 1563 /* close all stream sockets that are still open */ 1564 if (!ctx->exit) { 1565 for (i = 0; i < g_config.mos->max_concurrency; i++) { 1566 if (mtcp->smap[i].socktype == MOS_SOCK_STREAM) { 1567 TRACE_DBG("Closing remaining socket %d (%s)\n", 1568 i, TCPStateToString(mtcp->smap[i].stream)); 1569 DumpStream(mtcp, mtcp->smap[i].stream); 1570 mtcp_close(mctx, i); 1571 } 1572 } 1573 } 1574 1575 ctx->done = 1; 1576 1577 //pthread_kill(g_thread[mctx->cpu], SIGINT); 1578 #ifdef ENABLE_DPDK 1579 ctx->exit = 1; 1580 /* XXX - dpdk logic changes */ 1581 if (current_iomodule_func == &dpdk_module_func) { 1582 int master = rte_get_master_lcore(); 1583 if (master == mctx->cpu) 1584 pthread_join(g_thread[mctx->cpu], NULL); 1585 else 1586 rte_eal_wait_lcore(mctx->cpu); 1587 } else 1588 #endif /* !ENABLE_DPDK */ 1589 { 1590 pthread_join(g_thread[mctx->cpu], NULL); 1591 } 1592 1593 TRACE_INFO("MTCP thread %d joined.\n", mctx->cpu); 1594 running[mctx->cpu] = FALSE; 1595 1596 #ifdef NETSTAT 1597 #if NETSTAT_TOTAL 1598 if (printer == mctx->cpu) { 1599 for (i = 0; i < num_cpus; i++) { 1600 if (i != mctx->cpu && running[i]) { 1601 printer = i; 1602 break; 1603 } 1604 } 1605 } 1606 #endif 1607 #endif 1608 1609 log_ctx->done = 1; 1610 ret = write(log_ctx->pair_sp_fd, "F", 1); 1611 if (ret != 1) 1612 TRACE_ERROR("CPU %d: Fail to signal socket pair\n", mctx->cpu); 1613 1614 pthread_join(log_thread[ctx->cpu], NULL); 1615 fclose(mtcp->log_fp); 1616 TRACE_LOG("Log thread %d joined.\n", mctx->cpu); 1617 1618 if (mtcp->connectq) { 1619 DestroyStreamQueue(mtcp->connectq); 1620 mtcp->connectq = NULL; 1621 } 1622 if (mtcp->sendq) { 1623 DestroyStreamQueue(mtcp->sendq); 1624 mtcp->sendq = NULL; 1625 } 1626 if (mtcp->ackq) { 1627 DestroyStreamQueue(mtcp->ackq); 1628 mtcp->ackq = NULL; 1629 } 1630 if (mtcp->closeq) { 1631 DestroyStreamQueue(mtcp->closeq); 1632 mtcp->closeq = NULL; 1633 } 1634 if (mtcp->closeq_int) { 1635 DestroyInternalStreamQueue(mtcp->closeq_int); 1636 mtcp->closeq_int = NULL; 1637 } 1638 if (mtcp->resetq) { 1639 DestroyStreamQueue(mtcp->resetq); 1640 mtcp->resetq = NULL; 1641 } 1642 if (mtcp->resetq_int) { 1643 DestroyInternalStreamQueue(mtcp->resetq_int); 1644 mtcp->resetq_int = NULL; 1645 } 1646 if (mtcp->destroyq) { 1647 DestroyStreamQueue(mtcp->destroyq); 1648 mtcp->destroyq = NULL; 1649 } 1650 1651 DestroyMTCPSender(mtcp->g_sender); 1652 for (i = 0; i < g_config.mos->netdev_table->num; i++) { 1653 DestroyMTCPSender(mtcp->n_sender[i]); 1654 } 1655 1656 MPDestroy(mtcp->rv_pool); 1657 MPDestroy(mtcp->sv_pool); 1658 MPDestroy(mtcp->flow_pool); 1659 1660 if (mtcp->ap) { 1661 DestroyAddressPool(mtcp->ap); 1662 } 1663 1664 SQ_LOCK_DESTROY(&ctx->connect_lock); 1665 SQ_LOCK_DESTROY(&ctx->close_lock); 1666 SQ_LOCK_DESTROY(&ctx->reset_lock); 1667 SQ_LOCK_DESTROY(&ctx->sendq_lock); 1668 SQ_LOCK_DESTROY(&ctx->ackq_lock); 1669 SQ_LOCK_DESTROY(&ctx->destroyq_lock); 1670 1671 //TRACE_INFO("MTCP thread %d destroyed.\n", mctx->cpu); 1672 if (mtcp->iom->destroy_handle) 1673 mtcp->iom->destroy_handle(ctx); 1674 free(ctx); 1675 free(mctx); 1676 1677 return 0; 1678 } 1679 /*----------------------------------------------------------------------------*/ 1680 mtcp_sighandler_t 1681 mtcp_register_signal(int signum, mtcp_sighandler_t handler) 1682 { 1683 mtcp_sighandler_t prev; 1684 1685 if (signum == SIGINT) { 1686 prev = app_signal_handler; 1687 app_signal_handler = handler; 1688 } else { 1689 if ((prev = signal(signum, handler)) == SIG_ERR) { 1690 perror("signal"); 1691 return SIG_ERR; 1692 } 1693 } 1694 1695 return prev; 1696 } 1697 /*----------------------------------------------------------------------------*/ 1698 int 1699 mtcp_getconf(struct mtcp_conf *conf) 1700 { 1701 int i, j; 1702 1703 if (!conf) 1704 return -1; 1705 1706 conf->num_cores = g_config.mos->num_cores; 1707 conf->max_concurrency = g_config.mos->max_concurrency; 1708 conf->cpu_mask = g_config.mos->cpu_mask; 1709 1710 conf->rcvbuf_size = g_config.mos->rmem_size; 1711 conf->sndbuf_size = g_config.mos->wmem_size; 1712 1713 conf->tcp_timewait = g_config.mos->tcp_tw_interval; 1714 conf->tcp_timeout = g_config.mos->tcp_timeout; 1715 1716 i = 0; 1717 struct conf_block *bwalk; 1718 TAILQ_FOREACH(bwalk, &g_config.app_blkh, link) { 1719 struct app_conf *app_conf = (struct app_conf *)bwalk->conf; 1720 for (j = 0; j < app_conf->app_argc; j++) 1721 conf->app_argv[i][j] = app_conf->app_argv[j]; 1722 conf->app_argc[i] = app_conf->app_argc; 1723 conf->app_cpu_mask[i] = app_conf->cpu_mask; 1724 i++; 1725 } 1726 conf->num_app = i; 1727 1728 return 0; 1729 } 1730 /*----------------------------------------------------------------------------*/ 1731 int 1732 mtcp_setconf(const struct mtcp_conf *conf) 1733 { 1734 if (!conf) 1735 return -1; 1736 1737 g_config.mos->num_cores = conf->num_cores; 1738 g_config.mos->max_concurrency = conf->max_concurrency; 1739 1740 g_config.mos->rmem_size = conf->rcvbuf_size; 1741 g_config.mos->wmem_size = conf->sndbuf_size; 1742 1743 g_config.mos->tcp_tw_interval = conf->tcp_timewait; 1744 g_config.mos->tcp_timeout = conf->tcp_timeout; 1745 1746 TRACE_CONFIG("Configuration updated by mtcp_setconf().\n"); 1747 //PrintConfiguration(); 1748 1749 return 0; 1750 } 1751 /*----------------------------------------------------------------------------*/ 1752 int 1753 mtcp_init(const char *config_file) 1754 { 1755 int i; 1756 int ret; 1757 1758 if (geteuid()) { 1759 TRACE_CONFIG("[CAUTION] Run as root if mlock is necessary.\n"); 1760 } 1761 1762 /* getting cpu and NIC */ 1763 num_cpus = GetNumCPUs(); 1764 assert(num_cpus >= 1); 1765 for (i = 0; i < num_cpus; i++) { 1766 g_mtcp[i] = NULL; 1767 running[i] = FALSE; 1768 sigint_cnt[i] = 0; 1769 } 1770 1771 ret = LoadConfigurationUpperHalf(config_file); 1772 if (ret) { 1773 TRACE_CONFIG("Error occured while loading configuration.\n"); 1774 return -1; 1775 } 1776 1777 #if defined(ENABLE_PSIO) 1778 current_iomodule_func = &ps_module_func; 1779 #elif defined(ENABLE_DPDK) 1780 current_iomodule_func = &dpdk_module_func; 1781 #elif defined(ENABLE_PCAP) 1782 current_iomodule_func = &pcap_module_func; 1783 #elif defined(ENABLE_DPDKR) 1784 current_iomodule_func = &dpdkr_module_func; 1785 #endif 1786 1787 if (current_iomodule_func->load_module_upper_half) 1788 current_iomodule_func->load_module_upper_half(); 1789 1790 LoadConfigurationLowerHalf(); 1791 1792 //PrintConfiguration(); 1793 1794 /* TODO: this should be fixed */ 1795 ap = CreateAddressPool(g_config.mos->netdev_table->ent[0]->ip_addr, 1); 1796 if (!ap) { 1797 TRACE_CONFIG("Error occured while creating address pool.\n"); 1798 return -1; 1799 } 1800 1801 //PrintInterfaceInfo(); 1802 //PrintRoutingTable(); 1803 //PrintARPTable(); 1804 InitARPTable(); 1805 1806 if (signal(SIGUSR1, HandleSignal) == SIG_ERR) { 1807 perror("signal, SIGUSR1"); 1808 return -1; 1809 } 1810 if (signal(SIGINT, HandleSignal) == SIG_ERR) { 1811 perror("signal, SIGINT"); 1812 return -1; 1813 } 1814 app_signal_handler = NULL; 1815 1816 printf("load_module(): %p\n", current_iomodule_func); 1817 /* load system-wide io module specs */ 1818 if (current_iomodule_func->load_module_lower_half) 1819 current_iomodule_func->load_module_lower_half(); 1820 1821 GlobInitEvent(); 1822 1823 PrintConf(&g_config); 1824 1825 return 0; 1826 } 1827 /*----------------------------------------------------------------------------*/ 1828 int 1829 mtcp_destroy() 1830 { 1831 int i; 1832 1833 /* wait until all threads are closed */ 1834 for (i = 0; i < num_cpus; i++) { 1835 if (running[i]) { 1836 if (pthread_join(g_thread[i], NULL) != 0) 1837 return -1; 1838 } 1839 } 1840 1841 DestroyAddressPool(ap); 1842 1843 TRACE_INFO("All MTCP threads are joined.\n"); 1844 1845 return 0; 1846 } 1847 /*----------------------------------------------------------------------------*/ 1848