#define _GNU_SOURCE
#include <sched.h>
#include <unistd.h>
#include <sys/time.h>
#include <semaphore.h>
#include <sys/mman.h>
#include <signal.h>
#include <assert.h>
#include <string.h>

#include "cpu.h"
#include "eth_in.h"
#include "fhash.h"
#include "tcp_send_buffer.h"
#include "tcp_ring_buffer.h"
#include "socket.h"
#include "eth_out.h"
#include "tcp.h"
#include "tcp_in.h"
#include "tcp_out.h"
#include "mtcp_api.h"
#include "eventpoll.h"
#include "logger.h"
#include "config.h"
#include "arp.h"
#include "ip_out.h"
#include "timer.h"
#include "debug.h"
#include "event_callback.h"
#include "tcp_rb.h"
#include "tcp_stream.h"
#include "io_module.h"

#ifdef ENABLE_DPDK
/* for launching rte thread */
#include <rte_launch.h>
#include <rte_lcore.h>
#endif /* !ENABLE_DPDK */

#define PS_CHUNK_SIZE 64
#define RX_THRESH (PS_CHUNK_SIZE * 0.8)

#define ROUND_STAT FALSE
#define TIME_STAT FALSE
#define EVENT_STAT FALSE
#define TESTING FALSE

#define LOG_FILE_NAME "log"
#define MAX_FILE_NAME 1024

#define MAX(a, b) ((a)>(b)?(a):(b))
#define MIN(a, b) ((a)<(b)?(a):(b))

#define PER_STREAM_SLICE 0.1		// in ms
#define PER_STREAM_TCHECK 1		// in ms
#define PS_SELECT_TIMEOUT 100		// in us

#define GBPS(bytes) (bytes * 8.0 / (1000 * 1000 * 1000))

/*----------------------------------------------------------------------------*/
/* handlers for threads */
struct mtcp_thread_context *g_pctx[MAX_CPUS] = {0};
struct log_thread_context *g_logctx[MAX_CPUS] = {0};
/*----------------------------------------------------------------------------*/
static pthread_t g_thread[MAX_CPUS] = {0};
static pthread_t log_thread[MAX_CPUS] = {0};
/*----------------------------------------------------------------------------*/
static sem_t g_init_sem[MAX_CPUS];
static sem_t g_done_sem[MAX_CPUS];
static int running[MAX_CPUS] = {0};
/*----------------------------------------------------------------------------*/
mtcp_sighandler_t app_signal_handler;
static int sigint_cnt[MAX_CPUS] = {0};
static struct timespec sigint_ts[MAX_CPUS];
/*----------------------------------------------------------------------------*/
#ifdef NETSTAT
#if NETSTAT_TOTAL
static int printer = -1;
#if ROUND_STAT
#endif /* ROUND_STAT */
#endif /* NETSTAT_TOTAL */
#endif /* NETSTAT */
/*----------------------------------------------------------------------------*/
void
HandleSignal(int signal)
{
	int i = 0;

	if (signal == SIGINT) {
#ifdef DARWIN
		int core = 0;
#else
		int core = sched_getcpu();
#endif
		struct timespec cur_ts;

		clock_gettime(CLOCK_REALTIME, &cur_ts);

		if (sigint_cnt[core] > 0 && cur_ts.tv_sec == sigint_ts[core].tv_sec) {
			for (i = 0; i < g_config.mos->num_cores; i++) {
				if (running[i]) {
					exit(0);
					g_pctx[i]->exit = TRUE;
				}
			}
		} else {
			for (i = 0; i < g_config.mos->num_cores; i++) {
				if (g_pctx[i])
					g_pctx[i]->interrupt = TRUE;
			}
			if (!app_signal_handler) {
				for (i = 0; i < g_config.mos->num_cores; i++) {
					if (running[i]) {
						exit(0);
						g_pctx[i]->exit = TRUE;
					}
				}
			}
		}
		sigint_cnt[core]++;
		clock_gettime(CLOCK_REALTIME, &sigint_ts[core]);
	}

	if (signal != SIGUSR1) {
		if (app_signal_handler) {
			app_signal_handler(signal);
		}
	}
}
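/*
 * Illustrative sketch (not part of the stack): an application can chain its
 * own SIGINT handler through mtcp_register_signal(), defined later in this
 * file.  HandleSignal() above invokes it for the first SIGINT; a second
 * SIGINT within the same second still force-exits.  The handler body below
 * is hypothetical.
 */
#if 0
static void
app_sigint_handler(int signum)
{
	fprintf(stderr, "SIGINT received; flushing application state.\n");
}

/* somewhere in application startup code: */
mtcp_register_signal(SIGINT, app_sigint_handler);
#endif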
/*----------------------------------------------------------------------------*/
static int
AttachDevice(struct mtcp_thread_context* ctx)
{
	int working = -1;
	mtcp_manager_t mtcp = ctx->mtcp_manager;

	if (mtcp->iom->link_devices)
		working = mtcp->iom->link_devices(ctx);
	else
		return 0;

	return working;
}
/*----------------------------------------------------------------------------*/
#ifdef TIMESTAT
static inline void
InitStatCounter(struct stat_counter *counter)
{
	counter->cnt = 0;
	counter->sum = 0;
	counter->max = 0;
	counter->min = 0;
}
/*----------------------------------------------------------------------------*/
static inline void
UpdateStatCounter(struct stat_counter *counter, int64_t value)
{
	counter->cnt++;
	counter->sum += value;
	if (value > counter->max)
		counter->max = value;
	if (counter->min == 0 || value < counter->min)
		counter->min = value;
}
/*----------------------------------------------------------------------------*/
static inline uint64_t
GetAverageStat(struct stat_counter *counter)
{
	return counter->cnt ? (counter->sum / counter->cnt) : 0;
}
/*----------------------------------------------------------------------------*/
static inline int64_t
TimeDiffUs(struct timeval *t2, struct timeval *t1)
{
	return (t2->tv_sec - t1->tv_sec) * 1000000 +
			(int64_t)(t2->tv_usec - t1->tv_usec);
}
/*----------------------------------------------------------------------------*/
#endif
#ifdef NETSTAT
static inline void
PrintThreadNetworkStats(mtcp_manager_t mtcp, struct net_stat *ns)
{
	int i;

	for (i = 0; i < g_config.mos->netdev_table->num; i++) {
		ns->rx_packets[i] = mtcp->nstat.rx_packets[i] - mtcp->p_nstat.rx_packets[i];
		ns->rx_errors[i] = mtcp->nstat.rx_errors[i] - mtcp->p_nstat.rx_errors[i];
		ns->rx_bytes[i] = mtcp->nstat.rx_bytes[i] - mtcp->p_nstat.rx_bytes[i];
		ns->tx_packets[i] = mtcp->nstat.tx_packets[i] - mtcp->p_nstat.tx_packets[i];
		ns->tx_drops[i] = mtcp->nstat.tx_drops[i] - mtcp->p_nstat.tx_drops[i];
		ns->tx_bytes[i] = mtcp->nstat.tx_bytes[i] - mtcp->p_nstat.tx_bytes[i];
#if NETSTAT_PERTHREAD
		if (g_config.mos->netdev_table->ent[i]->stat_print) {
			fprintf(stderr, "[CPU%2d] %s flows: %6u, "
					"RX: %7llu(pps) (err: %5llu), %5.2lf(Gbps), "
					"TX: %7llu(pps), %5.2lf(Gbps)\n",
					mtcp->ctx->cpu,
					g_config.mos->netdev_table->ent[i]->dev_name,
					(unsigned)mtcp->flow_cnt,
					(long long unsigned)ns->rx_packets[i],
					(long long unsigned)ns->rx_errors[i],
					GBPS(ns->rx_bytes[i]),
					(long long unsigned)ns->tx_packets[i],
					GBPS(ns->tx_bytes[i]));
		}
#endif
	}
	mtcp->p_nstat = mtcp->nstat;
}
/*----------------------------------------------------------------------------*/
#if ROUND_STAT
static inline void
PrintThreadRoundStats(mtcp_manager_t mtcp, struct run_stat *rs)
{
#define ROUND_DIV (1000)
	rs->rounds = mtcp->runstat.rounds - mtcp->p_runstat.rounds;
	rs->rounds_rx = mtcp->runstat.rounds_rx - mtcp->p_runstat.rounds_rx;
	rs->rounds_rx_try = mtcp->runstat.rounds_rx_try - mtcp->p_runstat.rounds_rx_try;
	rs->rounds_tx = mtcp->runstat.rounds_tx - mtcp->p_runstat.rounds_tx;
	rs->rounds_tx_try = mtcp->runstat.rounds_tx_try - mtcp->p_runstat.rounds_tx_try;
	rs->rounds_select = mtcp->runstat.rounds_select - mtcp->p_runstat.rounds_select;
	rs->rounds_select_rx = mtcp->runstat.rounds_select_rx - mtcp->p_runstat.rounds_select_rx;
	rs->rounds_select_tx = mtcp->runstat.rounds_select_tx - mtcp->p_runstat.rounds_select_tx;
	rs->rounds_select_intr = mtcp->runstat.rounds_select_intr - mtcp->p_runstat.rounds_select_intr;
	rs->rounds_twcheck = mtcp->runstat.rounds_twcheck - mtcp->p_runstat.rounds_twcheck;
	mtcp->p_runstat = mtcp->runstat;
#if NETSTAT_PERTHREAD
	fprintf(stderr, "[CPU%2d] Rounds: %4lluK, "
			"rx: %3lluK (try: %4lluK), tx: %3lluK (try: %4lluK), "
			"ps_select: %4llu (rx: %4llu, tx: %4llu, intr: %3llu)\n",
			mtcp->ctx->cpu, rs->rounds / ROUND_DIV,
			rs->rounds_rx / ROUND_DIV, rs->rounds_rx_try / ROUND_DIV,
			rs->rounds_tx / ROUND_DIV, rs->rounds_tx_try / ROUND_DIV,
			rs->rounds_select,
			rs->rounds_select_rx, rs->rounds_select_tx, rs->rounds_select_intr);
#endif
}
#endif /* ROUND_STAT */
/*----------------------------------------------------------------------------*/
#if TIME_STAT
static inline void
PrintThreadRoundTime(mtcp_manager_t mtcp)
{
	fprintf(stderr, "[CPU%2d] Time: (avg, max) "
			"round: (%4luus, %4luus), processing: (%4luus, %4luus), "
			"tcheck: (%4luus, %4luus), epoll: (%4luus, %4luus), "
			"handle: (%4luus, %4luus), xmit: (%4luus, %4luus), "
			"select: (%4luus, %4luus)\n", mtcp->ctx->cpu,
			GetAverageStat(&mtcp->rtstat.round), mtcp->rtstat.round.max,
			GetAverageStat(&mtcp->rtstat.processing), mtcp->rtstat.processing.max,
			GetAverageStat(&mtcp->rtstat.tcheck), mtcp->rtstat.tcheck.max,
			GetAverageStat(&mtcp->rtstat.epoll), mtcp->rtstat.epoll.max,
			GetAverageStat(&mtcp->rtstat.handle), mtcp->rtstat.handle.max,
			GetAverageStat(&mtcp->rtstat.xmit), mtcp->rtstat.xmit.max,
			GetAverageStat(&mtcp->rtstat.select), mtcp->rtstat.select.max);

	InitStatCounter(&mtcp->rtstat.round);
	InitStatCounter(&mtcp->rtstat.processing);
	InitStatCounter(&mtcp->rtstat.tcheck);
	InitStatCounter(&mtcp->rtstat.epoll);
	InitStatCounter(&mtcp->rtstat.handle);
	InitStatCounter(&mtcp->rtstat.xmit);
	InitStatCounter(&mtcp->rtstat.select);
}
#endif
#endif /* NETSTAT */
/*----------------------------------------------------------------------------*/
#if EVENT_STAT
static inline void
PrintEventStat(int core, struct mtcp_epoll_stat *stat)
{
	fprintf(stderr, "[CPU%2d] calls: %lu, waits: %lu, wakes: %lu, "
			"issued: %lu, registered: %lu, invalidated: %lu, handled: %lu\n",
			core, stat->calls, stat->waits, stat->wakes,
			stat->issued, stat->registered, stat->invalidated, stat->handled);
	memset(stat, 0, sizeof(struct mtcp_epoll_stat));
}
#endif /* EVENT_STAT */
/*----------------------------------------------------------------------------*/
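/*
 * Illustrative usage sketch: the stat_counter helpers above are used in
 * pairs of gettimeofday() samples, as RunMainLoop() does under TIME_STAT.
 * A minimal pattern (assuming TIMESTAT is defined) would look like:
 */
#if 0
struct stat_counter lat;
struct timeval t1, t2;

InitStatCounter(&lat);
gettimeofday(&t1, NULL);
/* ... section being measured ... */
gettimeofday(&t2, NULL);
UpdateStatCounter(&lat, TimeDiffUs(&t2, &t1));	/* elapsed microseconds */
fprintf(stderr, "avg: %luus\n", (unsigned long)GetAverageStat(&lat));
#endif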
#ifdef NETSTAT
static inline void
PrintNetworkStats(mtcp_manager_t mtcp, uint32_t cur_ts)
{
#define TIMEOUT 1
	int i;
	struct net_stat ns;
	bool stat_print = false;
#if ROUND_STAT
	struct run_stat rs;
#endif /* ROUND_STAT */
#ifdef NETSTAT_TOTAL
	static double peak_total_rx_gbps = 0;
	static double peak_total_tx_gbps = 0;
	static double avg_total_rx_gbps = 0;
	static double avg_total_tx_gbps = 0;

	double total_rx_gbps = 0, total_tx_gbps = 0;
	int j;
	uint32_t gflow_cnt = 0;
	struct net_stat g_nstat;
#if ROUND_STAT
	struct run_stat g_runstat;
#endif /* ROUND_STAT */
#endif /* NETSTAT_TOTAL */

	if (TS_TO_MSEC(cur_ts - mtcp->p_nstat_ts) < SEC_TO_MSEC(TIMEOUT)) {
		return;
	}

	mtcp->p_nstat_ts = cur_ts;
	gflow_cnt = 0;
	memset(&g_nstat, 0, sizeof(struct net_stat));
	for (i = 0; i < g_config.mos->num_cores; i++) {
		if (running[i]) {
			PrintThreadNetworkStats(g_mtcp[i], &ns);
#if NETSTAT_TOTAL
			gflow_cnt += g_mtcp[i]->flow_cnt;
			for (j = 0; j < g_config.mos->netdev_table->num; j++) {
				g_nstat.rx_packets[j] += ns.rx_packets[j];
				g_nstat.rx_errors[j] += ns.rx_errors[j];
				g_nstat.rx_bytes[j] += ns.rx_bytes[j];
				g_nstat.tx_packets[j] += ns.tx_packets[j];
				g_nstat.tx_drops[j] += ns.tx_drops[j];
				g_nstat.tx_bytes[j] += ns.tx_bytes[j];
			}
#endif
		}
	}
#if NETSTAT_TOTAL
	for (i = 0; i < g_config.mos->netdev_table->num; i++) {
		if (g_config.mos->netdev_table->ent[i]->stat_print) {
			fprintf(stderr, "[ ALL ] %s, "
					"RX: %7llu(pps) (err: %5llu), %5.2lf(Gbps), "
					"TX: %7llu(pps), %5.2lf(Gbps)\n",
					g_config.mos->netdev_table->ent[i]->dev_name,
					(long long unsigned)g_nstat.rx_packets[i],
					(long long unsigned)g_nstat.rx_errors[i],
					GBPS(g_nstat.rx_bytes[i]),
					(long long unsigned)g_nstat.tx_packets[i],
					GBPS(g_nstat.tx_bytes[i]));
			total_rx_gbps += GBPS(g_nstat.rx_bytes[i]);
			total_tx_gbps += GBPS(g_nstat.tx_bytes[i]);
			stat_print = true;
		}
	}
	if (stat_print) {
		fprintf(stderr, "[ ALL ] flows: %6u\n", gflow_cnt);
		if (avg_total_rx_gbps == 0)
			avg_total_rx_gbps = total_rx_gbps;
		else
			avg_total_rx_gbps = avg_total_rx_gbps * 0.6 + total_rx_gbps * 0.4;

		if (avg_total_tx_gbps == 0)
			avg_total_tx_gbps = total_tx_gbps;
		else
			avg_total_tx_gbps = avg_total_tx_gbps * 0.6 + total_tx_gbps * 0.4;

		if (peak_total_rx_gbps < total_rx_gbps)
			peak_total_rx_gbps = total_rx_gbps;
		if (peak_total_tx_gbps < total_tx_gbps)
			peak_total_tx_gbps = total_tx_gbps;

		fprintf(stderr, "[ PEAK ] RX: %5.2lf(Gbps), TX: %5.2lf(Gbps)\n"
				"[ RECENT AVG ] RX: %5.2lf(Gbps), TX: %5.2lf(Gbps)\n",
				peak_total_rx_gbps, peak_total_tx_gbps,
				avg_total_rx_gbps, avg_total_tx_gbps);
	}
#endif

#if ROUND_STAT
	memset(&g_runstat, 0, sizeof(struct run_stat));
	for (i = 0; i < g_config.mos->num_cores; i++) {
		if (running[i]) {
			PrintThreadRoundStats(g_mtcp[i], &rs);
#if DBGMSG
			g_runstat.rounds += rs.rounds;
			g_runstat.rounds_rx += rs.rounds_rx;
			g_runstat.rounds_rx_try += rs.rounds_rx_try;
			g_runstat.rounds_tx += rs.rounds_tx;
			g_runstat.rounds_tx_try += rs.rounds_tx_try;
			g_runstat.rounds_select += rs.rounds_select;
			g_runstat.rounds_select_rx += rs.rounds_select_rx;
			g_runstat.rounds_select_tx += rs.rounds_select_tx;
#endif
		}
	}

	TRACE_DBG("[ ALL ] Rounds: %4ldK, "
			"rx: %3ldK (try: %4ldK), tx: %3ldK (try: %4ldK), "
			"ps_select: %4ld (rx: %4ld, tx: %4ld)\n",
			g_runstat.rounds / 1000, g_runstat.rounds_rx / 1000,
			g_runstat.rounds_rx_try / 1000, g_runstat.rounds_tx / 1000,
			g_runstat.rounds_tx_try / 1000, g_runstat.rounds_select,
			g_runstat.rounds_select_rx, g_runstat.rounds_select_tx);
#endif /* ROUND_STAT */

#if TIME_STAT
	for (i = 0; i < g_config.mos->num_cores; i++) {
		if (running[i]) {
			PrintThreadRoundTime(g_mtcp[i]);
		}
	}
#endif

#if EVENT_STAT
	for (i = 0; i < g_config.mos->num_cores; i++) {
		if (running[i] && g_mtcp[i]->ep) {
			PrintEventStat(i, &g_mtcp[i]->ep->stat);
		}
	}
#endif

	fflush(stderr);
}
#endif /* NETSTAT */
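/*
 * Note on the "[ RECENT AVG ]" line printed above: it is an exponentially
 * weighted moving average, avg <- 0.6 * avg + 0.4 * cur, seeded with the
 * first sample.  For example, with avg = 10 Gbps and a new sample of
 * 5 Gbps, the reported value becomes 0.6 * 10 + 0.4 * 5 = 8 Gbps, so the
 * most recent seconds dominate the figure.
 */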
/*----------------------------------------------------------------------------*/
static inline void
FlushMonitorReadEvents(mtcp_manager_t mtcp)
{
	struct event_queue *mtcpq;
	struct tcp_stream *cur_stream;
	struct mon_listener *walk;

	/* check if monitor sockets should be passed data */
	TAILQ_FOREACH(walk, &mtcp->monitors, link) {
		if (walk->socket->socktype != MOS_SOCK_MONITOR_STREAM ||
		    !(mtcpq = walk->eq))
			continue;

		while (mtcpq->num_events > 0) {
			cur_stream =
				(struct tcp_stream *)mtcpq->events[mtcpq->start++].ev.data.ptr;
			/* only read events */
			if (cur_stream != NULL &&
			    (cur_stream->socket->events & MOS_EPOLLIN)) {
				if (cur_stream->rcvvar != NULL &&
				    cur_stream->rcvvar->rcvbuf != NULL) {
					/* no need to pass pkt context */
					struct socket_map *wsock;
					SOCKQ_FOREACH_START(wsock, &cur_stream->msocks) {
						HandleCallback(mtcp, MOS_NULL, wsock,
							       cur_stream->side, NULL,
							       MOS_ON_CONN_NEW_DATA);
					} SOCKQ_FOREACH_END;
#ifndef NEWRB
					/* re-adjust tcp_ring_buffer now */
					RBRemove(mtcp->rbm_rcv, cur_stream->rcvvar->rcvbuf,
						 cur_stream->rcvvar->rcvbuf->merged_len,
						 AT_MTCP, cur_stream->buffer_mgmt);
					cur_stream->rcvvar->rcv_wnd =
						cur_stream->rcvvar->rcvbuf->size
						- 1 - cur_stream->rcvvar->rcvbuf->last_len;
#endif
				}
				/* reset the actions now */
				cur_stream->actions = 0;
			}
			if (mtcpq->start >= mtcpq->size)
				mtcpq->start = 0;
			mtcpq->num_events--;
		}
	}
}
/*----------------------------------------------------------------------------*/
static inline void
FlushBufferedReadEvents(mtcp_manager_t mtcp)
{
	int i;
	int offset;
	struct event_queue *mtcpq;
	struct tcp_stream *cur_stream;

	if (mtcp->ep == NULL) {
		TRACE_EPOLL("No epoll socket has been registered yet!\n");
		return;
	} else {
		/* case when mtcpq exists */
		mtcpq = mtcp->ep->mtcp_queue;
		offset = mtcpq->start;
	}

	/* we will use queued-up epoll read-in events
	 * to trigger buffered read monitor events */
	for (i = 0; i < mtcpq->num_events; i++) {
		cur_stream = mtcp->smap[mtcpq->events[offset++].sockid].stream;
		/* only read events:
		 * raise the new data callback event */
		if (cur_stream != NULL &&
		    (cur_stream->socket->events & MOS_EPOLLIN)) {
			if (cur_stream->rcvvar != NULL &&
			    cur_stream->rcvvar->rcvbuf != NULL) {
				/* no need to pass pkt context */
				struct socket_map *walk;
				SOCKQ_FOREACH_START(walk, &cur_stream->msocks) {
					HandleCallback(mtcp, MOS_NULL, walk, cur_stream->side,
						       NULL, MOS_ON_CONN_NEW_DATA);
				} SOCKQ_FOREACH_END;
			}
		}
		if (offset >= mtcpq->size)
			offset = 0;
	}
}
/*----------------------------------------------------------------------------*/
static inline void
FlushEpollEvents(mtcp_manager_t mtcp, uint32_t cur_ts)
{
	struct mtcp_epoll *ep = mtcp->ep;
	struct event_queue *usrq = ep->usr_queue;
	struct event_queue *mtcpq = ep->mtcp_queue;

	pthread_mutex_lock(&ep->epoll_lock);
	if (ep->mtcp_queue->num_events > 0) {
		/* while mtcp_queue has events
		 * and usr_queue is not full */
		while (mtcpq->num_events > 0 && usrq->num_events < usrq->size) {
			/* copy the event from mtcp_queue to usr_queue */
			usrq->events[usrq->end++] = mtcpq->events[mtcpq->start++];

			if (usrq->end >= usrq->size)
				usrq->end = 0;
			usrq->num_events++;

			if (mtcpq->start >= mtcpq->size)
				mtcpq->start = 0;
			mtcpq->num_events--;
		}
	}

	/* if there are pending events, wake up the user */
	if (ep->waiting && (ep->usr_queue->num_events > 0 ||
			    ep->usr_shadow_queue->num_events > 0)) {
		STAT_COUNT(mtcp->runstat.rounds_epoll);
		TRACE_EPOLL("Broadcasting events. num: %d, cur_ts: %u, prev_ts: %u\n",
			    ep->usr_queue->num_events, cur_ts, mtcp->ts_last_event);
		mtcp->ts_last_event = cur_ts;
		ep->stat.wakes++;
		pthread_cond_signal(&ep->epoll_cond);
	}
	pthread_mutex_unlock(&ep->epoll_lock);
}
/*----------------------------------------------------------------------------*/
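/*
 * Illustrative sketch: the event queues drained above are fixed-size ring
 * buffers indexed by (start, end, num_events).  The generic dequeue step,
 * written out for a hypothetical queue q, is:
 */
#if 0
ev = q->events[q->start++];	/* consume the oldest event */
if (q->start >= q->size)	/* wrap the read index */
	q->start = 0;
q->num_events--;
#endif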
static inline void
HandleApplicationCalls(mtcp_manager_t mtcp, uint32_t cur_ts)
{
	tcp_stream *stream;
	int cnt, max_cnt;
	int handled, delayed;
	int control, send, ack;

	/* connect handling */
	while ((stream = StreamDequeue(mtcp->connectq))) {
		if (stream->state != TCP_ST_SYN_SENT) {
			TRACE_INFO("Got a connection request from app with state: %s\n",
				   TCPStateToString(stream));
			exit(EXIT_FAILURE);
		} else {
			stream->cb_events |= MOS_ON_CONN_START |
					     MOS_ON_TCP_STATE_CHANGE;
			/* if monitor is on... */
			if (stream->pair_stream != NULL)
				stream->pair_stream->cb_events |=
					MOS_ON_CONN_START;
		}
		AddtoControlList(mtcp, stream, cur_ts);
	}

	/* send queue handling */
	while ((stream = StreamDequeue(mtcp->sendq))) {
		stream->sndvar->on_sendq = FALSE;
		AddtoSendList(mtcp, stream);
	}

	/* ack queue handling */
	while ((stream = StreamDequeue(mtcp->ackq))) {
		stream->sndvar->on_ackq = FALSE;
		EnqueueACK(mtcp, stream, cur_ts, ACK_OPT_AGGREGATE);
	}

	/* close handling */
	handled = delayed = 0;
	control = send = ack = 0;
	while ((stream = StreamDequeue(mtcp->closeq))) {
		struct tcp_send_vars *sndvar = stream->sndvar;
		sndvar->on_closeq = FALSE;

		if (sndvar->sndbuf) {
			sndvar->fss = sndvar->sndbuf->head_seq + sndvar->sndbuf->len;
		} else {
			sndvar->fss = stream->snd_nxt;
		}

		if (g_config.mos->tcp_timeout > 0)
			RemoveFromTimeoutList(mtcp, stream);

		if (stream->have_reset) {
			handled++;
			if (stream->state != TCP_ST_CLOSED_RSVD) {
				stream->close_reason = TCP_RESET;
				stream->state = TCP_ST_CLOSED_RSVD;
				stream->cb_events |= MOS_ON_TCP_STATE_CHANGE;
				TRACE_STATE("Stream %d: TCP_ST_CLOSED_RSVD\n", stream->id);
				DestroyTCPStream(mtcp, stream);
			} else {
				TRACE_ERROR("Stream already closed.\n");
			}

		} else if (sndvar->on_control_list) {
			sndvar->on_closeq_int = TRUE;
			StreamInternalEnqueue(mtcp->closeq_int, stream);
			delayed++;
			if (sndvar->on_control_list)
				control++;
			if (sndvar->on_send_list)
				send++;
			if (sndvar->on_ack_list)
				ack++;

		} else if (sndvar->on_send_list || sndvar->on_ack_list) {
			handled++;
			if (stream->state == TCP_ST_ESTABLISHED) {
				stream->state = TCP_ST_FIN_WAIT_1;
				stream->cb_events |= MOS_ON_TCP_STATE_CHANGE;
				TRACE_STATE("Stream %d: TCP_ST_FIN_WAIT_1\n", stream->id);

			} else if (stream->state == TCP_ST_CLOSE_WAIT) {
				stream->state = TCP_ST_LAST_ACK;
				stream->cb_events |= MOS_ON_TCP_STATE_CHANGE;
				TRACE_STATE("Stream %d: TCP_ST_LAST_ACK\n", stream->id);
			}
			stream->control_list_waiting = TRUE;

		} else if (stream->state != TCP_ST_CLOSED_RSVD) {
			handled++;
			if (stream->state == TCP_ST_ESTABLISHED) {
				stream->state = TCP_ST_FIN_WAIT_1;
				stream->cb_events |= MOS_ON_TCP_STATE_CHANGE;
				TRACE_STATE("Stream %d: TCP_ST_FIN_WAIT_1\n", stream->id);

			} else if (stream->state == TCP_ST_CLOSE_WAIT) {
				stream->state = TCP_ST_LAST_ACK;
				stream->cb_events |= MOS_ON_TCP_STATE_CHANGE;
				TRACE_STATE("Stream %d: TCP_ST_LAST_ACK\n", stream->id);
			}
			//sndvar->rto = TCP_FIN_RTO;
			//UpdateRetransmissionTimer(mtcp, stream, mtcp->cur_ts);
			AddtoControlList(mtcp, stream, cur_ts);
		} else {
			TRACE_ERROR("Already closed connection!\n");
		}
	}
	TRACE_ROUND("Handling close connections. handled: %d\n", handled);

	cnt = 0;
	max_cnt = mtcp->closeq_int->count;
	while (cnt++ < max_cnt) {
		stream = StreamInternalDequeue(mtcp->closeq_int);

		if (stream->sndvar->on_control_list) {
			StreamInternalEnqueue(mtcp->closeq_int, stream);

		} else if (stream->state != TCP_ST_CLOSED_RSVD) {
			handled++;
			stream->sndvar->on_closeq_int = FALSE;
			if (stream->state == TCP_ST_ESTABLISHED) {
				stream->state = TCP_ST_FIN_WAIT_1;
				stream->cb_events |= MOS_ON_TCP_STATE_CHANGE;
				TRACE_STATE("Stream %d: TCP_ST_FIN_WAIT_1\n", stream->id);

			} else if (stream->state == TCP_ST_CLOSE_WAIT) {
				stream->state = TCP_ST_LAST_ACK;
				stream->cb_events |= MOS_ON_TCP_STATE_CHANGE;
				TRACE_STATE("Stream %d: TCP_ST_LAST_ACK\n", stream->id);
			}
			AddtoControlList(mtcp, stream, cur_ts);
		} else {
			stream->sndvar->on_closeq_int = FALSE;
			TRACE_ERROR("Already closed connection!\n");
		}
	}

	/* reset handling */
	while ((stream = StreamDequeue(mtcp->resetq))) {
		stream->sndvar->on_resetq = FALSE;

		if (g_config.mos->tcp_timeout > 0)
			RemoveFromTimeoutList(mtcp, stream);

		if (stream->have_reset) {
			if (stream->state != TCP_ST_CLOSED_RSVD) {
				stream->close_reason = TCP_RESET;
				stream->state = TCP_ST_CLOSED_RSVD;
				stream->cb_events |= MOS_ON_TCP_STATE_CHANGE;
				TRACE_STATE("Stream %d: TCP_ST_CLOSED_RSVD\n", stream->id);
				DestroyTCPStream(mtcp, stream);
			} else {
				TRACE_ERROR("Stream already closed.\n");
			}

		} else if (stream->sndvar->on_control_list ||
			   stream->sndvar->on_send_list || stream->sndvar->on_ack_list) {
			/* wait until all the queues are flushed */
			stream->sndvar->on_resetq_int = TRUE;
			StreamInternalEnqueue(mtcp->resetq_int, stream);

		} else {
			if (stream->state != TCP_ST_CLOSED_RSVD) {
				stream->close_reason = TCP_ACTIVE_CLOSE;
				stream->state = TCP_ST_CLOSED_RSVD;
				stream->cb_events |= MOS_ON_TCP_STATE_CHANGE;
				TRACE_STATE("Stream %d: TCP_ST_CLOSED_RSVD\n", stream->id);
				AddtoControlList(mtcp, stream, cur_ts);
			} else {
				TRACE_ERROR("Stream already closed.\n");
			}
		}
	}
	TRACE_ROUND("Handling reset connections. cnt: %d\n", cnt);

	cnt = 0;
	max_cnt = mtcp->resetq_int->count;
	while (cnt++ < max_cnt) {
		stream = StreamInternalDequeue(mtcp->resetq_int);

		if (stream->sndvar->on_control_list ||
		    stream->sndvar->on_send_list || stream->sndvar->on_ack_list) {
			/* wait until all the queues are flushed */
			StreamInternalEnqueue(mtcp->resetq_int, stream);

		} else {
			stream->sndvar->on_resetq_int = FALSE;

			if (stream->state != TCP_ST_CLOSED_RSVD) {
				stream->close_reason = TCP_ACTIVE_CLOSE;
				stream->state = TCP_ST_CLOSED_RSVD;
				stream->cb_events |= MOS_ON_TCP_STATE_CHANGE;
				TRACE_STATE("Stream %d: TCP_ST_CLOSED_RSVD\n", stream->id);
				AddtoControlList(mtcp, stream, cur_ts);
			} else {
				TRACE_ERROR("Stream already closed.\n");
			}
		}
	}

	/* destroy streams in destroyq */
	while ((stream = StreamDequeue(mtcp->destroyq))) {
		DestroyTCPStream(mtcp, stream);
	}

	mtcp->wakeup_flag = FALSE;
}
/*----------------------------------------------------------------------------*/
static inline void
WritePacketsToChunks(mtcp_manager_t mtcp, uint32_t cur_ts)
{
	int thresh = g_config.mos->max_concurrency;
	int i;

	/* Set the threshold to g_config.mos->max_concurrency to send ACKs
	 * immediately; otherwise, set it to an appropriate value (e.g. thresh). */
	assert(mtcp->g_sender != NULL);
	if (mtcp->g_sender->control_list_cnt)
		WriteTCPControlList(mtcp, mtcp->g_sender, cur_ts, thresh);
	if (mtcp->g_sender->ack_list_cnt)
		WriteTCPACKList(mtcp, mtcp->g_sender, cur_ts, thresh);
	if (mtcp->g_sender->send_list_cnt)
		WriteTCPDataList(mtcp, mtcp->g_sender, cur_ts, thresh);

	for (i = 0; i < g_config.mos->netdev_table->num; i++) {
		assert(mtcp->n_sender[i] != NULL);
		if (mtcp->n_sender[i]->control_list_cnt)
			WriteTCPControlList(mtcp, mtcp->n_sender[i], cur_ts, thresh);
		if (mtcp->n_sender[i]->ack_list_cnt)
			WriteTCPACKList(mtcp, mtcp->n_sender[i], cur_ts, thresh);
		if (mtcp->n_sender[i]->send_list_cnt)
			WriteTCPDataList(mtcp, mtcp->n_sender[i], cur_ts, thresh);
	}
}
/*----------------------------------------------------------------------------*/
#if TESTING
static int
DestroyRemainingFlows(mtcp_manager_t mtcp)
{
	struct hashtable *ht = mtcp->tcp_flow_table;
	tcp_stream *walk;
	int cnt, i;

	cnt = 0;

	thread_printf(mtcp, mtcp->log_fp,
		      "CPU %d: Flushing remaining flows.\n", mtcp->ctx->cpu);

	for (i = 0; i < NUM_BINS; i++) {
		TAILQ_FOREACH(walk, &ht->ht_table[i], rcvvar->he_link) {
			thread_printf(mtcp, mtcp->log_fp,
				      "CPU %d: Destroying stream %d\n", mtcp->ctx->cpu, walk->id);
			DumpStream(mtcp, walk);
			DestroyTCPStream(mtcp, walk);
			cnt++;
		}
	}

	return cnt;
}
#endif
/*----------------------------------------------------------------------------*/
static void
InterruptApplication(mtcp_manager_t mtcp)
{
	/* interrupt if mtcp_epoll_wait() is waiting */
	if (mtcp->ep) {
		pthread_mutex_lock(&mtcp->ep->epoll_lock);
		if (mtcp->ep->waiting) {
			pthread_cond_signal(&mtcp->ep->epoll_cond);
		}
		pthread_mutex_unlock(&mtcp->ep->epoll_lock);
	}
	/* interrupt if accept() is waiting */
	if (mtcp->listener) {
		if (mtcp->listener->socket) {
			pthread_mutex_lock(&mtcp->listener->accept_lock);
			if (!(mtcp->listener->socket->opts & MTCP_NONBLOCK)) {
				pthread_cond_signal(&mtcp->listener->accept_cond);
			}
			pthread_mutex_unlock(&mtcp->listener->accept_lock);
		}
	}
}
/*----------------------------------------------------------------------------*/
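/*
 * Illustrative sketch: InterruptApplication() above only signals the
 * condition variables; the blocked application call is expected to return
 * so the app can notice the interrupt.  A plausible (hypothetical)
 * application-side accept loop:
 */
#if 0
while (!done) {
	c = mtcp_accept(mctx, listen_id, NULL, NULL);
	if (c < 0) {
		if (errno == EINTR)
			continue;	/* woken up; re-check application state */
		break;
	}
	/* ... serve the new connection ... */
}
#endif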
void
RunPassiveLoop(mtcp_manager_t mtcp)
{
	sem_wait(&g_done_sem[mtcp->ctx->cpu]);
	sem_destroy(&g_done_sem[mtcp->ctx->cpu]);
	return;
}
/*----------------------------------------------------------------------------*/
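/*
 * RunMainLoop() below is the per-core event loop.  One round, in order:
 * receive and process a batch of packets, fire expired user timers, run
 * retransmission/timewait/connection-timeout checks, flush monitor and
 * epoll events to the application, drain the application's connect/send/
 * ack/close/reset queues, and finally transmit the queued packets.
 */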
static void
RunMainLoop(struct mtcp_thread_context *ctx)
{
	mtcp_manager_t mtcp = ctx->mtcp_manager;
	int i;
	int recv_cnt;

#if E_PSIO
	int rx_inf, tx_inf;
#endif

	struct timeval cur_ts = {0};
	uint32_t ts, ts_prev;

#if TIME_STAT
	struct timeval prev_ts, processing_ts, tcheck_ts,
		       epoll_ts, handle_ts, xmit_ts, select_ts;
#endif
	int thresh;

	gettimeofday(&cur_ts, NULL);

#if E_PSIO
	/* do nothing */
#else
#if !USE_CHUNK_BUF
	/* create packet write chunk */
	InitWriteChunks(handle, ctx->w_chunk);
	for (i = 0; i < ETH_NUM; i++) {
		ctx->w_chunk[i].cnt = 0;
		ctx->w_off[i] = 0;
		ctx->w_cur_idx[i] = 0;
	}
#endif
#endif

	TRACE_DBG("CPU %d: mtcp thread running.\n", ctx->cpu);

#if TIME_STAT
	prev_ts = cur_ts;
	InitStatCounter(&mtcp->rtstat.round);
	InitStatCounter(&mtcp->rtstat.processing);
	InitStatCounter(&mtcp->rtstat.tcheck);
	InitStatCounter(&mtcp->rtstat.epoll);
	InitStatCounter(&mtcp->rtstat.handle);
	InitStatCounter(&mtcp->rtstat.xmit);
	InitStatCounter(&mtcp->rtstat.select);
#endif

	ts = ts_prev = 0;
	while ((!ctx->done || mtcp->flow_cnt) && !ctx->exit) {

		STAT_COUNT(mtcp->runstat.rounds);
		recv_cnt = 0;

#if E_PSIO
#if 0
		event.timeout = PS_SELECT_TIMEOUT;
		NID_ZERO(event.rx_nids);
		NID_ZERO(event.tx_nids);
		NID_ZERO(rx_avail);
		//NID_ZERO(tx_avail);
#endif
		gettimeofday(&cur_ts, NULL);
#if TIME_STAT
		/* measure the inter-round delay */
		UpdateStatCounter(&mtcp->rtstat.round, TimeDiffUs(&cur_ts, &prev_ts));
		prev_ts = cur_ts;
#endif

		ts = TIMEVAL_TO_TS(&cur_ts);
		mtcp->cur_ts = ts;

		for (rx_inf = 0; rx_inf < g_config.mos->netdev_table->num; rx_inf++) {

			recv_cnt = mtcp->iom->recv_pkts(ctx, rx_inf);
			STAT_COUNT(mtcp->runstat.rounds_rx_try);

			for (i = 0; i < recv_cnt; i++) {
				uint16_t len;
				uint8_t *pktbuf;
				pktbuf = mtcp->iom->get_rptr(mtcp->ctx, rx_inf, i, &len);
				ProcessPacket(mtcp, rx_inf, i, ts, pktbuf, len);
			}

#ifdef ENABLE_DPDKR
			mtcp->iom->send_pkts(ctx, rx_inf);
			continue;
#endif
		}
		STAT_COUNT(mtcp->runstat.rounds_rx);

#else /* E_PSIO */
		gettimeofday(&cur_ts, NULL);
		ts = TIMEVAL_TO_TS(&cur_ts);
		mtcp->cur_ts = ts;
		/*
		 * Read packets into a chunk from the NIC
		 * chk_w_idx : next chunk index to write packets from the NIC
		 */
		STAT_COUNT(mtcp->runstat.rounds_rx_try);
		chunk.cnt = PS_CHUNK_SIZE;
		recv_cnt = ps_recv_chunk(handle, &chunk);
		if (recv_cnt < 0) {
			if (errno != EAGAIN && errno != EINTR) {
				perror("ps_recv_chunk");
				assert(0);
			}
		}

		/*
		 * Handle packets
		 * chk_r_idx : next chunk index to read and handle
		 */
		for (i = 0; i < recv_cnt; i++) {
			ProcessPacket(mtcp, chunk.queue.ifindex, ts,
				      (u_char *)(chunk.buf + chunk.info[i].offset),
				      chunk.info[i].len);
		}

		if (recv_cnt > 0)
			STAT_COUNT(mtcp->runstat.rounds_rx);
#endif /* E_PSIO */
#if TIME_STAT
		gettimeofday(&processing_ts, NULL);
		UpdateStatCounter(&mtcp->rtstat.processing,
				  TimeDiffUs(&processing_ts, &cur_ts));
#endif /* TIME_STAT */

		/* handle user-defined timeouts */
		struct timer *walk, *tmp;
		for (walk = TAILQ_FIRST(&mtcp->timer_list); walk != NULL; walk = tmp) {
			tmp = TAILQ_NEXT(walk, timer_link);
			if (TIMEVAL_LT(&cur_ts, &walk->exp))
				break;

			struct mtcp_context mctx = {.cpu = ctx->cpu};
			walk->cb(&mctx, walk->id, 0, 0 /* FIXME */, NULL);
			DelTimer(mtcp, walk);
		}

		/* interaction with application */
		if (mtcp->flow_cnt > 0) {

			/* check retransmission timeout and timewait expire */
#if 0
			thresh = (int)mtcp->flow_cnt / (TS_TO_USEC(PER_STREAM_TCHECK));
			assert(thresh >= 0);
			if (thresh == 0)
				thresh = 1;
			if (recv_cnt > 0 && thresh > recv_cnt)
				thresh = recv_cnt;
#else
			thresh = g_config.mos->max_concurrency;
#endif

			/* Eunyoung, you may fix this later:
			 * if there is no received packet, we will send as much as possible */
			if (thresh == -1)
				thresh = g_config.mos->max_concurrency;

			CheckRtmTimeout(mtcp, ts, thresh);
			CheckTimewaitExpire(mtcp, ts, thresh);

			if (g_config.mos->tcp_timeout > 0 && ts != ts_prev) {
				CheckConnectionTimeout(mtcp, ts, thresh);
			}

#if TIME_STAT
		}
		gettimeofday(&tcheck_ts, NULL);
		UpdateStatCounter(&mtcp->rtstat.tcheck,
				  TimeDiffUs(&tcheck_ts, &processing_ts));

		if (mtcp->flow_cnt > 0) {
#endif /* TIME_STAT */

		}

		/*
		 * before flushing epoll events, call monitor events for
		 * all registered `read` events
		 */
		if (mtcp->num_msp > 0)
			/* call this only when a standalone monitor is running */
			FlushMonitorReadEvents(mtcp);

		/* if epoll is in use, flush all the queued events */
		if (mtcp->ep) {
			FlushBufferedReadEvents(mtcp);
			FlushEpollEvents(mtcp, ts);
		}
#if TIME_STAT
		gettimeofday(&epoll_ts, NULL);
		UpdateStatCounter(&mtcp->rtstat.epoll,
				  TimeDiffUs(&epoll_ts, &tcheck_ts));
#endif /* TIME_STAT */

		if (end_app_exists && mtcp->flow_cnt > 0) {
			/* handle stream queues */
			HandleApplicationCalls(mtcp, ts);
		}

#ifdef ENABLE_DPDKR
		continue;
#endif

#if TIME_STAT
		gettimeofday(&handle_ts, NULL);
		UpdateStatCounter(&mtcp->rtstat.handle,
				  TimeDiffUs(&handle_ts, &epoll_ts));
#endif /* TIME_STAT */

		WritePacketsToChunks(mtcp, ts);

		/* send packets from the write buffer */
#if E_PSIO
		/* with E_PSIO, send while tx is available */
		int num_dev = g_config.mos->netdev_table->num;
		if (likely(mtcp->iom->send_pkts != NULL))
			for (tx_inf = 0; tx_inf < num_dev; tx_inf++) {
				mtcp->iom->send_pkts(ctx, tx_inf);
			}

#else /* E_PSIO */
		/* without E_PSIO, try to send chunks immediately */
		for (i = 0; i < g_config.mos->netdev_table->num; i++) {
#if USE_CHUNK_BUF
			/* in the case of using ps_send_chunk_buf() without E_PSIO */
			ret = FlushSendChunkBuf(mtcp, i);
#else
			/* if not using ps_send_chunk_buf() */
			ret = FlushWriteBuffer(ctx, i);
#endif
			if (ret < 0) {
				TRACE_ERROR("Failed to send chunks.\n");
			} else if (ret > 0) {
				STAT_COUNT(mtcp->runstat.rounds_tx);
			}
		}
#endif /* E_PSIO */

#if TIME_STAT
		gettimeofday(&xmit_ts, NULL);
		UpdateStatCounter(&mtcp->rtstat.xmit,
				  TimeDiffUs(&xmit_ts, &handle_ts));
#endif /* TIME_STAT */

		if (ts != ts_prev) {
			ts_prev = ts;
#ifdef NETSTAT
			if (ctx->cpu == printer) {
#ifdef RUN_ARP
				ARPTimer(mtcp, ts);
#endif
#ifdef NETSTAT
				PrintNetworkStats(mtcp, ts);
#endif
			}
#endif /* NETSTAT */
		}

		if (mtcp->iom->select)
			mtcp->iom->select(ctx);

		if (ctx->interrupt) {
			InterruptApplication(mtcp);
		}
	}

#if TESTING
	DestroyRemainingFlows(mtcp);
#endif

	TRACE_DBG("MTCP thread %d out of main loop.\n", ctx->cpu);
	/* flush logs */
	flush_log_data(mtcp);
	TRACE_DBG("MTCP thread %d flushed logs.\n", ctx->cpu);
	InterruptApplication(mtcp);
	TRACE_INFO("MTCP thread %d finished.\n", ctx->cpu);
}
/*----------------------------------------------------------------------------*/
struct mtcp_sender *
CreateMTCPSender(int ifidx)
{
	struct mtcp_sender *sender;

	sender = (struct mtcp_sender *)calloc(1, sizeof(struct mtcp_sender));
	if (!sender) {
		return NULL;
	}

	sender->ifidx = ifidx;

	TAILQ_INIT(&sender->control_list);
	TAILQ_INIT(&sender->send_list);
	TAILQ_INIT(&sender->ack_list);

	sender->control_list_cnt = 0;
	sender->send_list_cnt = 0;
	sender->ack_list_cnt = 0;

	return sender;
}
/*----------------------------------------------------------------------------*/
void
DestroyMTCPSender(struct mtcp_sender *sender)
{
	free(sender);
}
/*----------------------------------------------------------------------------*/
static mtcp_manager_t
InitializeMTCPManager(struct mtcp_thread_context* ctx)
{
	mtcp_manager_t mtcp;
	char log_name[MAX_FILE_NAME];
	int i;

	posix_seq_srand((unsigned)pthread_self());

	mtcp = (mtcp_manager_t)calloc(1, sizeof(struct mtcp_manager));
	if (!mtcp) {
		perror("calloc");
		CTRACE_ERROR("Failed to allocate mtcp_manager.\n");
		return NULL;
	}
	g_mtcp[ctx->cpu] = mtcp;

	mtcp->tcp_flow_table = CreateHashtable();
	if (!mtcp->tcp_flow_table) {
		CTRACE_ERROR("Failed to allocate tcp flow table.\n");
		return NULL;
	}

#ifdef HUGEPAGE
#define IS_HUGEPAGE 1
#else
#define IS_HUGEPAGE 0
#endif
	if (mon_app_exists) {
		/* initialize event callback */
#ifdef NEWEV
		InitEvent(mtcp);
#else
		InitEvent(mtcp, NUM_EV_TABLE);
#endif
	}

	if (!(mtcp->bufseg_pool = MPCreate(sizeof(tcpbufseg_t),
			sizeof(tcpbufseg_t) * g_config.mos->max_concurrency *
			((g_config.mos->rmem_size - 1) / UNITBUFSIZE + 1), 0))) {
		TRACE_ERROR("Failed to allocate buffer segment pool\n");
		exit(0);
	}
	if (!(mtcp->sockent_pool = MPCreate(sizeof(struct sockent),
			sizeof(struct sockent) * g_config.mos->max_concurrency * 3, 0))) {
		TRACE_ERROR("Failed to allocate socket entry pool\n");
		exit(0);
	}
#ifdef USE_TIMER_POOL
	if (!(mtcp->timer_pool = MPCreate(sizeof(struct timer),
			sizeof(struct timer) * g_config.mos->max_concurrency * 10, 0))) {
		TRACE_ERROR("Failed to allocate timer pool\n");
		exit(0);
	}
#endif
	mtcp->flow_pool = MPCreate(sizeof(tcp_stream),
			sizeof(tcp_stream) * g_config.mos->max_concurrency, IS_HUGEPAGE);
	if (!mtcp->flow_pool) {
		CTRACE_ERROR("Failed to allocate tcp flow pool.\n");
		return NULL;
	}
	mtcp->rv_pool = MPCreate(sizeof(struct tcp_recv_vars),
			sizeof(struct tcp_recv_vars) * g_config.mos->max_concurrency, IS_HUGEPAGE);
	if (!mtcp->rv_pool) {
		CTRACE_ERROR("Failed to allocate tcp recv variable pool.\n");
		return NULL;
	}
	mtcp->sv_pool = MPCreate(sizeof(struct tcp_send_vars),
			sizeof(struct tcp_send_vars) * g_config.mos->max_concurrency, IS_HUGEPAGE);
	if (!mtcp->sv_pool) {
		CTRACE_ERROR("Failed to allocate tcp send variable pool.\n");
		return NULL;
	}

	mtcp->rbm_snd = SBManagerCreate(g_config.mos->wmem_size, g_config.mos->no_ring_buffers,
					g_config.mos->max_concurrency);
	if (!mtcp->rbm_snd) {
		CTRACE_ERROR("Failed to create send ring buffer.\n");
		return NULL;
	}
#ifndef NEWRB
	mtcp->rbm_rcv = RBManagerCreate(g_config.mos->rmem_size, g_config.mos->no_ring_buffers,
					g_config.mos->max_concurrency);
	if (!mtcp->rbm_rcv) {
		CTRACE_ERROR("Failed to create recv ring buffer.\n");
		return NULL;
	}
#endif

	mtcp->smap = (socket_map_t)calloc(g_config.mos->max_concurrency, sizeof(struct socket_map));
	if (!mtcp->smap) {
		perror("calloc");
		CTRACE_ERROR("Failed to allocate memory for stream map.\n");
		return NULL;
	}

	if (mon_app_exists) {
		mtcp->msmap = (socket_map_t)calloc(g_config.mos->max_concurrency, sizeof(struct socket_map));
		if (!mtcp->msmap) {
			perror("calloc");
			CTRACE_ERROR("Failed to allocate memory for monitor stream map.\n");
			return NULL;
		}

		for (i = 0; i < g_config.mos->max_concurrency; i++) {
			mtcp->msmap[i].monitor_stream = calloc(1, sizeof(struct mon_stream));
			if (!mtcp->msmap[i].monitor_stream) {
				perror("calloc");
				CTRACE_ERROR("Failed to allocate memory for monitor stream map.\n");
				return NULL;
			}
		}
	}

	TAILQ_INIT(&mtcp->timer_list);
	TAILQ_INIT(&mtcp->monitors);

	TAILQ_INIT(&mtcp->free_smap);
	for (i = 0; i < g_config.mos->max_concurrency; i++) {
		mtcp->smap[i].id = i;
		mtcp->smap[i].socktype = MOS_SOCK_UNUSED;
		memset(&mtcp->smap[i].saddr, 0, sizeof(struct sockaddr_in));
		mtcp->smap[i].stream = NULL;
		TAILQ_INSERT_TAIL(&mtcp->free_smap, &mtcp->smap[i], link);
	}

	if (mon_app_exists) {
		TAILQ_INIT(&mtcp->free_msmap);
		for (i = 0; i < g_config.mos->max_concurrency; i++) {
			mtcp->msmap[i].id = i;
			mtcp->msmap[i].socktype = MOS_SOCK_UNUSED;
			memset(&mtcp->msmap[i].saddr, 0, sizeof(struct sockaddr_in));
			TAILQ_INSERT_TAIL(&mtcp->free_msmap, &mtcp->msmap[i], link);
		}
	}

	mtcp->ctx = ctx;
	mtcp->ep = NULL;

	snprintf(log_name, MAX_FILE_NAME, "%s/"LOG_FILE_NAME"_%d",
		 g_config.mos->mos_log, ctx->cpu);
	mtcp->log_fp = fopen(log_name, "w+");
	if (!mtcp->log_fp) {
		perror("fopen");
		CTRACE_ERROR("Failed to create file for logging. (%s)\n", log_name);
		return NULL;
	}
	mtcp->sp_fd = g_logctx[ctx->cpu]->pair_sp_fd;
	mtcp->logger = g_logctx[ctx->cpu];

	mtcp->connectq = CreateStreamQueue(BACKLOG_SIZE);
	if (!mtcp->connectq) {
		CTRACE_ERROR("Failed to create connect queue.\n");
		return NULL;
	}
	mtcp->sendq = CreateStreamQueue(g_config.mos->max_concurrency);
	if (!mtcp->sendq) {
		CTRACE_ERROR("Failed to create send queue.\n");
		return NULL;
	}
	mtcp->ackq = CreateStreamQueue(g_config.mos->max_concurrency);
	if (!mtcp->ackq) {
		CTRACE_ERROR("Failed to create ack queue.\n");
		return NULL;
	}
	mtcp->closeq = CreateStreamQueue(g_config.mos->max_concurrency);
	if (!mtcp->closeq) {
		CTRACE_ERROR("Failed to create close queue.\n");
		return NULL;
	}
	mtcp->closeq_int = CreateInternalStreamQueue(g_config.mos->max_concurrency);
	if (!mtcp->closeq_int) {
		CTRACE_ERROR("Failed to create internal close queue.\n");
		return NULL;
	}
	mtcp->resetq = CreateStreamQueue(g_config.mos->max_concurrency);
	if (!mtcp->resetq) {
		CTRACE_ERROR("Failed to create reset queue.\n");
		return NULL;
	}
	mtcp->resetq_int = CreateInternalStreamQueue(g_config.mos->max_concurrency);
	if (!mtcp->resetq_int) {
		CTRACE_ERROR("Failed to create internal reset queue.\n");
		return NULL;
	}
	mtcp->destroyq = CreateStreamQueue(g_config.mos->max_concurrency);
	if (!mtcp->destroyq) {
		CTRACE_ERROR("Failed to create destroy queue.\n");
		return NULL;
	}

	mtcp->g_sender = CreateMTCPSender(-1);
	if (!mtcp->g_sender) {
		CTRACE_ERROR("Failed to create global sender structure.\n");
		return NULL;
	}
	for (i = 0; i < g_config.mos->netdev_table->num; i++) {
		mtcp->n_sender[i] = CreateMTCPSender(i);
		if (!mtcp->n_sender[i]) {
			CTRACE_ERROR("Failed to create per-nic sender structure.\n");
			return NULL;
		}
	}

	mtcp->rto_store = InitRTOHashstore();
	TAILQ_INIT(&mtcp->timewait_list);
	TAILQ_INIT(&mtcp->timeout_list);

	return mtcp;
}
/*----------------------------------------------------------------------------*/
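/*
 * Note on the pool sizing above: the buffer segment pool reserves
 * max_concurrency * ceil(rmem_size / UNITBUFSIZE) segments, with the
 * ceiling computed as (rmem_size - 1) / UNITBUFSIZE + 1.  For instance,
 * rmem_size = 8192 and UNITBUFSIZE = 2048 would give 4 segments per
 * connection (the numbers are only for illustration).
 */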
static void *
MTCPRunThread(void *arg)
{
	mctx_t mctx = (mctx_t)arg;
	int cpu = mctx->cpu;
	int working;
	struct mtcp_manager *mtcp;
	struct mtcp_thread_context *ctx;

	/* affinitize the thread to this core first */
	mtcp_core_affinitize(cpu);

	/* memory allocated after core affinitization will mostly
	   come from local memory */
	ctx = calloc(1, sizeof(*ctx));
	if (!ctx) {
		perror("calloc");
		TRACE_ERROR("Failed to calloc mtcp context.\n");
		exit(-1);
	}
	ctx->thread = pthread_self();
	ctx->cpu = cpu;
	mtcp = ctx->mtcp_manager = InitializeMTCPManager(ctx);
	if (!mtcp) {
		TRACE_ERROR("Failed to initialize mtcp manager.\n");
		exit(-1);
	}

	/* assign mtcp context's underlying I/O module */
	mtcp->iom = current_iomodule_func;

	/* I/O initializing */
	if (mtcp->iom->init_handle)
		mtcp->iom->init_handle(ctx);

	if (pthread_mutex_init(&ctx->flow_pool_lock, NULL)) {
		perror("pthread_mutex_init of ctx->flow_pool_lock");
		exit(-1);
	}

	if (pthread_mutex_init(&ctx->socket_pool_lock, NULL)) {
		perror("pthread_mutex_init of ctx->socket_pool_lock");
		exit(-1);
	}

	SQ_LOCK_INIT(&ctx->connect_lock, "ctx->connect_lock", exit(-1));
	SQ_LOCK_INIT(&ctx->close_lock, "ctx->close_lock", exit(-1));
	SQ_LOCK_INIT(&ctx->reset_lock, "ctx->reset_lock", exit(-1));
	SQ_LOCK_INIT(&ctx->sendq_lock, "ctx->sendq_lock", exit(-1));
	SQ_LOCK_INIT(&ctx->ackq_lock, "ctx->ackq_lock", exit(-1));
	SQ_LOCK_INIT(&ctx->destroyq_lock, "ctx->destroyq_lock", exit(-1));

	/* remember this context pointer for signal processing */
	g_pctx[cpu] = ctx;
	mlockall(MCL_CURRENT);

	// attach (nic device, queue)
	working = AttachDevice(ctx);
	if (working != 0) {
		sem_post(&g_init_sem[ctx->cpu]);
		TRACE_DBG("MTCP thread %d finished; no device attached.\n", ctx->cpu);
		pthread_exit(NULL);
	}

	TRACE_DBG("CPU %d: initialization finished.\n", cpu);
	sem_post(&g_init_sem[ctx->cpu]);

	/* start the main loop */
	RunMainLoop(ctx);

	TRACE_DBG("MTCP thread %d finished.\n", ctx->cpu);

	/* signal that the mTCP thread is done */
	sem_post(&g_done_sem[mctx->cpu]);

	//pthread_exit(NULL);
	return 0;
}
/*----------------------------------------------------------------------------*/
#ifdef ENABLE_DPDK
static int MTCPDPDKRunThread(void *arg)
{
	MTCPRunThread(arg);
	return 0;
}
#endif /* !ENABLE_DPDK */
/*----------------------------------------------------------------------------*/
mctx_t
mtcp_create_context(int cpu)
{
	mctx_t mctx;
	int ret;

	if (cpu >= g_config.mos->num_cores) {
		TRACE_ERROR("Failed to initialize a new mtcp context. "
			    "Requested cpu id %d exceeds the number of cores %d configured to use.\n",
			    cpu, g_config.mos->num_cores);
		return NULL;
	}

	/* check if mtcp_create_context() was already initialized */
	if (g_logctx[cpu] != NULL) {
		TRACE_ERROR("%s was already initialized before!\n",
			    __FUNCTION__);
		return NULL;
	}

	ret = sem_init(&g_init_sem[cpu], 0, 0);
	if (ret) {
		TRACE_ERROR("Failed to initialize init_sem.\n");
		return NULL;
	}

	ret = sem_init(&g_done_sem[cpu], 0, 0);
	if (ret) {
		TRACE_ERROR("Failed to initialize done_sem.\n");
		return NULL;
	}

	mctx = (mctx_t)calloc(1, sizeof(struct mtcp_context));
	if (!mctx) {
		TRACE_ERROR("Failed to allocate memory for mtcp_context.\n");
		return NULL;
	}
	mctx->cpu = cpu;
	g_ctx[cpu] = mctx;

	/* initialize logger */
	g_logctx[cpu] = (struct log_thread_context *)
			calloc(1, sizeof(struct log_thread_context));
	if (!g_logctx[cpu]) {
		perror("calloc");
		TRACE_ERROR("Failed to allocate memory for log thread context.\n");
		return NULL;
	}
	InitLogThreadContext(g_logctx[cpu], cpu);
	if (pthread_create(&log_thread[cpu],
			   NULL, ThreadLogMain, (void *)g_logctx[cpu])) {
		perror("pthread_create");
		TRACE_ERROR("Failed to create log thread\n");
		return NULL;
	}

#ifdef ENABLE_DPDK
	/* Wake up mTCP threads (wake up I/O threads) */
	if (current_iomodule_func == &dpdk_module_func) {
		int master;
		master = rte_get_master_lcore();
		if (master == cpu) {
			lcore_config[master].ret = 0;
			lcore_config[master].state = FINISHED;
			if (pthread_create(&g_thread[cpu],
					   NULL, MTCPRunThread, (void *)mctx) != 0) {
				TRACE_ERROR("pthread_create of mtcp thread failed!\n");
				return NULL;
			}
		} else
			rte_eal_remote_launch(MTCPDPDKRunThread, mctx, cpu);
	} else
#endif /* !ENABLE_DPDK */
	{
		if (pthread_create(&g_thread[cpu],
				   NULL, MTCPRunThread, (void *)mctx) != 0) {
			TRACE_ERROR("pthread_create of mtcp thread failed!\n");
			return NULL;
		}
	}

	sem_wait(&g_init_sem[cpu]);
	sem_destroy(&g_init_sem[cpu]);

	running[cpu] = TRUE;

#ifdef NETSTAT
#if NETSTAT_TOTAL
	if (printer < 0) {
		printer = cpu;
		TRACE_INFO("CPU %d is in charge of printing stats.\n", printer);
	}
#endif
#endif

	return mctx;
}
/*----------------------------------------------------------------------------*/
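/*
 * Illustrative sketch: a typical application bring-up built from the calls
 * in this file.  Error handling is omitted and the config path is an
 * example.
 */
#if 0
struct mtcp_conf mcfg;
mctx_t mctx[MAX_CPUS];
int i;

mtcp_init("config/mos.conf");
mtcp_getconf(&mcfg);
for (i = 0; i < mcfg.num_cores; i++)
	mctx[i] = mtcp_create_context(i);	/* spawns the mTCP thread for core i */
/* ... run the application ... */
for (i = 0; i < mcfg.num_cores; i++)
	mtcp_destroy_context(mctx[i]);
mtcp_destroy();
#endif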
/**
 * TODO: It currently always returns 0. Add appropriate error return values
 */
int
mtcp_destroy_context(mctx_t mctx)
{
	struct mtcp_thread_context *ctx = g_pctx[mctx->cpu];
	struct mtcp_manager *mtcp = ctx->mtcp_manager;
	struct log_thread_context *log_ctx = mtcp->logger;
	int ret, i;

	TRACE_DBG("CPU %d: mtcp_destroy_context()\n", mctx->cpu);

	/* close all stream sockets that are still open */
	if (!ctx->exit) {
		for (i = 0; i < g_config.mos->max_concurrency; i++) {
			if (mtcp->smap[i].socktype == MOS_SOCK_STREAM) {
				TRACE_DBG("Closing remaining socket %d (%s)\n",
					  i, TCPStateToString(mtcp->smap[i].stream));
				DumpStream(mtcp, mtcp->smap[i].stream);
				mtcp_close(mctx, i);
			}
		}
	}

	ctx->done = 1;

	//pthread_kill(g_thread[mctx->cpu], SIGINT);
#ifdef ENABLE_DPDK
	ctx->exit = 1;
	/* XXX - dpdk logic changes */
	if (current_iomodule_func == &dpdk_module_func) {
		int master = rte_get_master_lcore();
		if (master == mctx->cpu)
			pthread_join(g_thread[mctx->cpu], NULL);
		else
			rte_eal_wait_lcore(mctx->cpu);
	} else
#endif /* !ENABLE_DPDK */
	{
		pthread_join(g_thread[mctx->cpu], NULL);
	}

	TRACE_INFO("MTCP thread %d joined.\n", mctx->cpu);
	running[mctx->cpu] = FALSE;

#ifdef NETSTAT
#if NETSTAT_TOTAL
	if (printer == mctx->cpu) {
		for (i = 0; i < num_cpus; i++) {
			if (i != mctx->cpu && running[i]) {
				printer = i;
				break;
			}
		}
	}
#endif
#endif

	log_ctx->done = 1;
	ret = write(log_ctx->pair_sp_fd, "F", 1);
	if (ret != 1)
		TRACE_ERROR("CPU %d: Failed to signal socket pair\n", mctx->cpu);

	pthread_join(log_thread[ctx->cpu], NULL);
	fclose(mtcp->log_fp);
	TRACE_LOG("Log thread %d joined.\n", mctx->cpu);

	if (mtcp->connectq) {
		DestroyStreamQueue(mtcp->connectq);
		mtcp->connectq = NULL;
	}
	if (mtcp->sendq) {
		DestroyStreamQueue(mtcp->sendq);
		mtcp->sendq = NULL;
	}
	if (mtcp->ackq) {
		DestroyStreamQueue(mtcp->ackq);
		mtcp->ackq = NULL;
	}
	if (mtcp->closeq) {
		DestroyStreamQueue(mtcp->closeq);
		mtcp->closeq = NULL;
	}
	if (mtcp->closeq_int) {
		DestroyInternalStreamQueue(mtcp->closeq_int);
		mtcp->closeq_int = NULL;
	}
	if (mtcp->resetq) {
		DestroyStreamQueue(mtcp->resetq);
		mtcp->resetq = NULL;
	}
	if (mtcp->resetq_int) {
		DestroyInternalStreamQueue(mtcp->resetq_int);
		mtcp->resetq_int = NULL;
	}
	if (mtcp->destroyq) {
		DestroyStreamQueue(mtcp->destroyq);
		mtcp->destroyq = NULL;
	}

	DestroyMTCPSender(mtcp->g_sender);
	for (i = 0; i < g_config.mos->netdev_table->num; i++) {
		DestroyMTCPSender(mtcp->n_sender[i]);
	}

	MPDestroy(mtcp->rv_pool);
	MPDestroy(mtcp->sv_pool);
	MPDestroy(mtcp->flow_pool);

	if (mtcp->ap) {
		DestroyAddressPool(mtcp->ap);
	}

	SQ_LOCK_DESTROY(&ctx->connect_lock);
	SQ_LOCK_DESTROY(&ctx->close_lock);
	SQ_LOCK_DESTROY(&ctx->reset_lock);
	SQ_LOCK_DESTROY(&ctx->sendq_lock);
	SQ_LOCK_DESTROY(&ctx->ackq_lock);
	SQ_LOCK_DESTROY(&ctx->destroyq_lock);

	//TRACE_INFO("MTCP thread %d destroyed.\n", mctx->cpu);
	if (mtcp->iom->destroy_handle)
		mtcp->iom->destroy_handle(ctx);
	free(ctx);
	free(mctx);

	return 0;
}
/*----------------------------------------------------------------------------*/
mtcp_sighandler_t
mtcp_register_signal(int signum, mtcp_sighandler_t handler)
{
	mtcp_sighandler_t prev;

	if (signum == SIGINT) {
		prev = app_signal_handler;
		app_signal_handler = handler;
	} else {
		if ((prev = signal(signum, handler)) == SIG_ERR) {
			perror("signal");
			return SIG_ERR;
		}
	}

	return prev;
}
/*----------------------------------------------------------------------------*/
int
mtcp_getconf(struct mtcp_conf *conf)
{
	int i, j;

	if (!conf)
		return -1;

	conf->num_cores = g_config.mos->num_cores;
	conf->max_concurrency = g_config.mos->max_concurrency;
	conf->cpu_mask = g_config.mos->cpu_mask;

	conf->rcvbuf_size = g_config.mos->rmem_size;
	conf->sndbuf_size = g_config.mos->wmem_size;

	conf->tcp_timewait = g_config.mos->tcp_tw_interval;
	conf->tcp_timeout = g_config.mos->tcp_timeout;

	i = 0;
	struct conf_block *bwalk;
	TAILQ_FOREACH(bwalk, &g_config.app_blkh, link) {
		struct app_conf *app_conf = (struct app_conf *)bwalk->conf;
		for (j = 0; j < app_conf->app_argc; j++)
			conf->app_argv[i][j] = app_conf->app_argv[j];
		conf->app_argc[i] = app_conf->app_argc;
		conf->app_cpu_mask[i] = app_conf->cpu_mask;
		i++;
	}
	conf->num_app = i;

	return 0;
}
/*----------------------------------------------------------------------------*/
int
mtcp_setconf(const struct mtcp_conf *conf)
{
	if (!conf)
		return -1;

	g_config.mos->num_cores = conf->num_cores;
	g_config.mos->max_concurrency = conf->max_concurrency;

	g_config.mos->rmem_size = conf->rcvbuf_size;
	g_config.mos->wmem_size = conf->sndbuf_size;

	g_config.mos->tcp_tw_interval = conf->tcp_timewait;
	g_config.mos->tcp_timeout = conf->tcp_timeout;

	TRACE_CONFIG("Configuration updated by mtcp_setconf().\n");
	//PrintConfiguration();

	return 0;
}
/*----------------------------------------------------------------------------*/
int
mtcp_init(const char *config_file)
{
	int i;
	int ret;

	if (geteuid()) {
		TRACE_CONFIG("[CAUTION] Run as root if mlock is necessary.\n");
	}

	/* getting cpu and NIC */
	num_cpus = GetNumCPUs();
	assert(num_cpus >= 1);
	for (i = 0; i < num_cpus; i++) {
		g_mtcp[i] = NULL;
		running[i] = FALSE;
		sigint_cnt[i] = 0;
	}

	ret = LoadConfigurationUpperHalf(config_file);
	if (ret) {
		TRACE_CONFIG("Error occurred while loading configuration.\n");
		return -1;
	}

#if defined(ENABLE_PSIO)
	current_iomodule_func = &ps_module_func;
#elif defined(ENABLE_DPDK)
	current_iomodule_func = &dpdk_module_func;
#elif defined(ENABLE_PCAP)
	current_iomodule_func = &pcap_module_func;
#elif defined(ENABLE_DPDKR)
	current_iomodule_func = &dpdkr_module_func;
#elif defined(ENABLE_NETMAP)
	current_iomodule_func = &netmap_module_func;
#endif

	if (current_iomodule_func->load_module_upper_half)
		current_iomodule_func->load_module_upper_half();

	LoadConfigurationLowerHalf();

	//PrintConfiguration();

	/* TODO: this should be fixed */
	ap = CreateAddressPool(g_config.mos->netdev_table->ent[0]->ip_addr, 1);
	if (!ap) {
		TRACE_CONFIG("Error occurred while creating address pool.\n");
		return -1;
	}

	//PrintInterfaceInfo();
	//PrintRoutingTable();
	//PrintARPTable();
	InitARPTable();

	if (signal(SIGUSR1, HandleSignal) == SIG_ERR) {
		perror("signal, SIGUSR1");
		return -1;
	}
	if (signal(SIGINT, HandleSignal) == SIG_ERR) {
		perror("signal, SIGINT");
		return -1;
	}
	app_signal_handler = NULL;

	printf("load_module(): %p\n", current_iomodule_func);
	/* load system-wide io module specs */
	if (current_iomodule_func->load_module_lower_half)
		current_iomodule_func->load_module_lower_half();

	GlobInitEvent();

	PrintConf(&g_config);

	return 0;
}
/*----------------------------------------------------------------------------*/
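/*
 * Illustrative sketch: mtcp_getconf()/mtcp_setconf() above can be used to
 * adjust limits programmatically; the value below is hypothetical, and the
 * assumption is that this runs after mtcp_init() but before any context is
 * created.
 */
#if 0
struct mtcp_conf mcfg;

mtcp_getconf(&mcfg);
mcfg.max_concurrency = 100000;
mtcp_setconf(&mcfg);
#endif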
int
mtcp_destroy()
{
	int i;

	/* wait until all threads are closed */
	for (i = 0; i < num_cpus; i++) {
		if (running[i]) {
			if (pthread_join(g_thread[i], NULL) != 0)
				return -1;
		}
	}

	DestroyAddressPool(ap);

	TRACE_INFO("All MTCP threads are joined.\n");

	return 0;
}
/*----------------------------------------------------------------------------*/