1 #define _GNU_SOURCE 2 #include <sched.h> 3 #include <unistd.h> 4 #include <sys/time.h> 5 #include <semaphore.h> 6 #include <sys/mman.h> 7 #include <signal.h> 8 #include <assert.h> 9 #include <string.h> 10 11 #include "cpu.h" 12 #include "eth_in.h" 13 #include "fhash.h" 14 #include "tcp_send_buffer.h" 15 #include "tcp_ring_buffer.h" 16 #include "socket.h" 17 #include "eth_out.h" 18 #include "tcp.h" 19 #include "tcp_in.h" 20 #include "tcp_out.h" 21 #include "mtcp_api.h" 22 #include "eventpoll.h" 23 #include "logger.h" 24 #include "config.h" 25 #include "arp.h" 26 #include "ip_out.h" 27 #include "timer.h" 28 #include "debug.h" 29 #include "event_callback.h" 30 #include "tcp_rb.h" 31 #include "tcp_stream.h" 32 #include "io_module.h" 33 34 #ifdef ENABLE_DPDK 35 /* for launching rte thread */ 36 #include <rte_launch.h> 37 #include <rte_lcore.h> 38 #endif /* !ENABLE_DPDK */ 39 #define PS_CHUNK_SIZE 64 40 #define RX_THRESH (PS_CHUNK_SIZE * 0.8) 41 42 #define ROUND_STAT FALSE 43 #define TIME_STAT FALSE 44 #define EVENT_STAT FALSE 45 #define TESTING FALSE 46 47 #define LOG_FILE_NAME "log" 48 #define MAX_FILE_NAME 1024 49 50 #define MAX(a, b) ((a)>(b)?(a):(b)) 51 #define MIN(a, b) ((a)<(b)?(a):(b)) 52 53 #define PER_STREAM_SLICE 0.1 // in ms 54 #define PER_STREAM_TCHECK 1 // in ms 55 #define PS_SELECT_TIMEOUT 100 // in us 56 57 #define GBPS(bytes) (bytes * 8.0 / (1000 * 1000 * 1000)) 58 59 /*----------------------------------------------------------------------------*/ 60 /* handlers for threads */ 61 struct mtcp_thread_context *g_pctx[MAX_CPUS] = {0}; 62 struct log_thread_context *g_logctx[MAX_CPUS] = {0}; 63 /*----------------------------------------------------------------------------*/ 64 static pthread_t g_thread[MAX_CPUS] = {0}; 65 static pthread_t log_thread[MAX_CPUS] = {0}; 66 /*----------------------------------------------------------------------------*/ 67 static sem_t g_init_sem[MAX_CPUS]; 68 static sem_t g_done_sem[MAX_CPUS]; 69 static int running[MAX_CPUS] = 
{0};
/*----------------------------------------------------------------------------*/
/* optional handler registered by the application; HandleSignal() forwards
 * every signal except SIGUSR1 to it */
mtcp_sighandler_t app_signal_handler;
/* per-core SIGINT bookkeeping: count and timestamp of the last SIGINT,
 * used to detect "second Ctrl-C within the same second" forced shutdown */
static int sigint_cnt[MAX_CPUS] = {0};
static struct timespec sigint_ts[MAX_CPUS];
/*----------------------------------------------------------------------------*/
#ifdef NETSTAT
#if NETSTAT_TOTAL
/* cpu id of the thread elected to print aggregate statistics (-1 = none) */
static int printer = -1;
#if ROUND_STAT
#endif /* ROUND_STAT */
#endif /* NETSTAT_TOTAL */
#endif /* NETSTAT */
void mtcp_free_context(mctx_t mctx);
/*----------------------------------------------------------------------------*/
/* Process-wide signal handler.
 * SIGINT: first delivery interrupts all worker threads (and requests exit
 * when no app handler is registered); a second SIGINT within the same
 * second forces every running thread to exit.
 * All signals except SIGUSR1 are then forwarded to app_signal_handler. */
void
HandleSignal(int signal)
{
	int i = 0;

	if (signal == SIGINT) {
		/* NOTE(review): FreeConfigResources() is invoked from signal
		 * context while worker threads may still be reading the config,
		 * and it is unlikely to be async-signal-safe -- confirm this is
		 * intentional. */
		FreeConfigResources();
#ifdef DARWIN
		int core = 0;
#else
		int core = sched_getcpu();
#endif
		struct timespec cur_ts;

		clock_gettime(CLOCK_REALTIME, &cur_ts);

		/* repeated SIGINT within the same wall-clock second:
		 * force all running threads to exit */
		if (sigint_cnt[core] > 0 && cur_ts.tv_sec == sigint_ts[core].tv_sec) {
			for (i = 0; i < g_config.mos->num_cores; i++) {
				if (running[i]) {
					//exit(0);
					g_pctx[i]->exit = TRUE;
				}
			}
		} else {
			/* first SIGINT: raise the interrupt flag on every thread;
			 * without an app handler, also request a clean exit */
			for (i = 0; i < g_config.mos->num_cores; i++) {
				if (g_pctx[i])
					g_pctx[i]->interrupt = TRUE;
			}
			if (!app_signal_handler) {
				for (i = 0; i < g_config.mos->num_cores; i++) {
					if (running[i]) {
						//exit(0);
						g_pctx[i]->exit = TRUE;
					}
				}
			}
		}
		sigint_cnt[core]++;
		clock_gettime(CLOCK_REALTIME, &sigint_ts[core]);
	}

	/* forward everything except SIGUSR1 to the application's handler */
	if (signal != SIGUSR1) {
		if (app_signal_handler) {
			app_signal_handler(signal);
		}
	}
}
/*----------------------------------------------------------------------------*/
/* Link the I/O devices to this thread context through the I/O module.
 * Returns 0 when the module provides no link_devices hook; otherwise the
 * hook's return value (-1 initial value means "not working"). */
static int
AttachDevice(struct mtcp_thread_context* ctx)
{
	int working = -1;
	mtcp_manager_t mtcp = ctx->mtcp_manager;

	if (mtcp->iom->link_devices)
		working = mtcp->iom->link_devices(ctx);
	else
		return 0;

	return working;
}
/*----------------------------------------------------------------------------*/
146 #ifdef TIMESTAT 147 static inline void 148 InitStatCounter(struct stat_counter *counter) 149 { 150 counter->cnt = 0; 151 counter->sum = 0; 152 counter->max = 0; 153 counter->min = 0; 154 } 155 /*----------------------------------------------------------------------------*/ 156 static inline void 157 UpdateStatCounter(struct stat_counter *counter, int64_t value) 158 { 159 counter->cnt++; 160 counter->sum += value; 161 if (value > counter->max) 162 counter->max = value; 163 if (counter->min == 0 || value < counter->min) 164 counter->min = value; 165 } 166 /*----------------------------------------------------------------------------*/ 167 static inline uint64_t 168 GetAverageStat(struct stat_counter *counter) 169 { 170 return counter->cnt ? (counter->sum / counter->cnt) : 0; 171 } 172 /*----------------------------------------------------------------------------*/ 173 static inline int64_t 174 TimeDiffUs(struct timeval *t2, struct timeval *t1) 175 { 176 return (t2->tv_sec - t1->tv_sec) * 1000000 + 177 (int64_t)(t2->tv_usec - t1->tv_usec); 178 } 179 /*----------------------------------------------------------------------------*/ 180 #endif 181 #ifdef NETSTAT 182 static inline void 183 PrintThreadNetworkStats(mtcp_manager_t mtcp, struct net_stat *ns) 184 { 185 int i; 186 187 for (i = 0; i < g_config.mos->netdev_table->num; i++) { 188 ns->rx_packets[i] = mtcp->nstat.rx_packets[i] - mtcp->p_nstat.rx_packets[i]; 189 ns->rx_errors[i] = mtcp->nstat.rx_errors[i] - mtcp->p_nstat.rx_errors[i]; 190 ns->rx_bytes[i] = mtcp->nstat.rx_bytes[i] - mtcp->p_nstat.rx_bytes[i]; 191 ns->tx_packets[i] = mtcp->nstat.tx_packets[i] - mtcp->p_nstat.tx_packets[i]; 192 ns->tx_drops[i] = mtcp->nstat.tx_drops[i] - mtcp->p_nstat.tx_drops[i]; 193 ns->tx_bytes[i] = mtcp->nstat.tx_bytes[i] - mtcp->p_nstat.tx_bytes[i]; 194 #if NETSTAT_PERTHREAD 195 if (g_config.mos->netdev_table->ent[i]->stat_print) { 196 fprintf(stderr, "[CPU%2d] %s flows: %6u, " 197 "RX: %7llu(pps) (err: %5llu), %5.2lf(Gbps), " 
198 "TX: %7llu(pps), %5.2lf(Gbps)\n", 199 mtcp->ctx->cpu, 200 g_config.mos->netdev_table->ent[i]->dev_name, 201 (unsigned)mtcp->flow_cnt, 202 (long long unsigned)ns->rx_packets[i], 203 (long long unsigned)ns->rx_errors[i], 204 GBPS(ns->rx_bytes[i]), 205 (long long unsigned)ns->tx_packets[i], 206 GBPS(ns->tx_bytes[i])); 207 } 208 #endif 209 } 210 mtcp->p_nstat = mtcp->nstat; 211 212 } 213 /*----------------------------------------------------------------------------*/ 214 #if ROUND_STAT 215 static inline void 216 PrintThreadRoundStats(mtcp_manager_t mtcp, struct run_stat *rs) 217 { 218 #define ROUND_DIV (1000) 219 rs->rounds = mtcp->runstat.rounds - mtcp->p_runstat.rounds; 220 rs->rounds_rx = mtcp->runstat.rounds_rx - mtcp->p_runstat.rounds_rx; 221 rs->rounds_rx_try = mtcp->runstat.rounds_rx_try - mtcp->p_runstat.rounds_rx_try; 222 rs->rounds_tx = mtcp->runstat.rounds_tx - mtcp->p_runstat.rounds_tx; 223 rs->rounds_tx_try = mtcp->runstat.rounds_tx_try - mtcp->p_runstat.rounds_tx_try; 224 rs->rounds_select = mtcp->runstat.rounds_select - mtcp->p_runstat.rounds_select; 225 rs->rounds_select_rx = mtcp->runstat.rounds_select_rx - mtcp->p_runstat.rounds_select_rx; 226 rs->rounds_select_tx = mtcp->runstat.rounds_select_tx - mtcp->p_runstat.rounds_select_tx; 227 rs->rounds_select_intr = mtcp->runstat.rounds_select_intr - mtcp->p_runstat.rounds_select_intr; 228 rs->rounds_twcheck = mtcp->runstat.rounds_twcheck - mtcp->p_runstat.rounds_twcheck; 229 mtcp->p_runstat = mtcp->runstat; 230 #if NETSTAT_PERTHREAD 231 fprintf(stderr, "[CPU%2d] Rounds: %4lluK, " 232 "rx: %3lluK (try: %4lluK), tx: %3lluK (try: %4lluK), " 233 "ps_select: %4llu (rx: %4llu, tx: %4llu, intr: %3llu)\n", 234 mtcp->ctx->cpu, rs->rounds / ROUND_DIV, 235 rs->rounds_rx / ROUND_DIV, rs->rounds_rx_try / ROUND_DIV, 236 rs->rounds_tx / ROUND_DIV, rs->rounds_tx_try / ROUND_DIV, 237 rs->rounds_select, 238 rs->rounds_select_rx, rs->rounds_select_tx, rs->rounds_select_intr); 239 #endif 240 } 241 #endif /* ROUND_STAT */ 
/*----------------------------------------------------------------------------*/
#if TIME_STAT
/* Print this thread's per-phase round timings (avg/max) and reset them. */
static inline void
PrintThreadRoundTime(mtcp_manager_t mtcp)
{
	fprintf(stderr, "[CPU%2d] Time: (avg, max) "
			"round: (%4luus, %4luus), processing: (%4luus, %4luus), "
			"tcheck: (%4luus, %4luus), epoll: (%4luus, %4luus), "
			"handle: (%4luus, %4luus), xmit: (%4luus, %4luus), "
			"select: (%4luus, %4luus)\n", mtcp->ctx->cpu,
			GetAverageStat(&mtcp->rtstat.round), mtcp->rtstat.round.max,
			GetAverageStat(&mtcp->rtstat.processing), mtcp->rtstat.processing.max,
			GetAverageStat(&mtcp->rtstat.tcheck), mtcp->rtstat.tcheck.max,
			GetAverageStat(&mtcp->rtstat.epoll), mtcp->rtstat.epoll.max,
			GetAverageStat(&mtcp->rtstat.handle), mtcp->rtstat.handle.max,
			GetAverageStat(&mtcp->rtstat.xmit), mtcp->rtstat.xmit.max,
			GetAverageStat(&mtcp->rtstat.select), mtcp->rtstat.select.max);

	/* restart measurement for the next reporting interval */
	InitStatCounter(&mtcp->rtstat.round);
	InitStatCounter(&mtcp->rtstat.processing);
	InitStatCounter(&mtcp->rtstat.tcheck);
	InitStatCounter(&mtcp->rtstat.epoll);
	InitStatCounter(&mtcp->rtstat.handle);
	InitStatCounter(&mtcp->rtstat.xmit);
	InitStatCounter(&mtcp->rtstat.select);
}
#endif
#endif /* NETSTAT */
/*----------------------------------------------------------------------------*/
#if EVENT_STAT
/* Print one core's epoll event counters and reset them to zero. */
static inline void
PrintEventStat(int core, struct mtcp_epoll_stat *stat)
{
	fprintf(stderr, "[CPU%2d] calls: %lu, waits: %lu, wakes: %lu, "
			"issued: %lu, registered: %lu, invalidated: %lu, handled: %lu\n",
			core, stat->calls, stat->waits, stat->wakes,
			stat->issued, stat->registered, stat->invalidated, stat->handled);
	memset(stat, 0, sizeof(struct mtcp_epoll_stat));
}
#endif /* EVENT_STAT */
/*----------------------------------------------------------------------------*/
#ifdef NETSTAT
/* Once per TIMEOUT second(s): aggregate per-thread statistics over all
 * running workers and print device totals, peaks, and moving averages.
 * Intended to be called only by the elected printer core. */
static inline void
PrintNetworkStats(mtcp_manager_t mtcp, uint32_t cur_ts)
{
#define TIMEOUT 1
	int i;
	struct net_stat ns;
	bool stat_print = false;
#if ROUND_STAT
	struct run_stat rs;
#endif /* ROUND_STAT */
#ifdef NETSTAT_TOTAL
	/* peak / smoothed totals persist across calls */
	static double peak_total_rx_gbps = 0;
	static double peak_total_tx_gbps = 0;
	static double avg_total_rx_gbps = 0;
	static double avg_total_tx_gbps = 0;

	double total_rx_gbps = 0, total_tx_gbps = 0;
	int j;
	uint32_t gflow_cnt = 0;
	struct net_stat g_nstat;
#if ROUND_STAT
	struct run_stat g_runstat;
#endif /* ROUND_STAT */
#endif /* NETSTAT_TOTAL */

	/* rate-limit: print at most once per TIMEOUT second(s) */
	if (TS_TO_MSEC(cur_ts - mtcp->p_nstat_ts) < SEC_TO_MSEC(TIMEOUT)) {
		return;
	}

	mtcp->p_nstat_ts = cur_ts;
	/* NOTE(review): gflow_cnt and g_nstat are declared only under
	 * NETSTAT_TOTAL but referenced unconditionally here -- this cannot
	 * compile with NETSTAT_TOTAL disabled; verify the build always
	 * enables it. */
	gflow_cnt = 0;
	memset(&g_nstat, 0, sizeof(struct net_stat));
	for (i = 0; i < g_config.mos->num_cores; i++) {
		if (running[i]) {
			/* per-thread deltas; also fold into the global totals */
			PrintThreadNetworkStats(g_mtcp[i], &ns);
#if NETSTAT_TOTAL
			gflow_cnt += g_mtcp[i]->flow_cnt;
			for (j = 0; j < g_config.mos->netdev_table->num; j++) {
				g_nstat.rx_packets[j] += ns.rx_packets[j];
				g_nstat.rx_errors[j] += ns.rx_errors[j];
				g_nstat.rx_bytes[j] += ns.rx_bytes[j];
				g_nstat.tx_packets[j] += ns.tx_packets[j];
				g_nstat.tx_drops[j] += ns.tx_drops[j];
				g_nstat.tx_bytes[j] += ns.tx_bytes[j];
			}
#endif
		}
	}
#if NETSTAT_TOTAL
	for (i = 0; i < g_config.mos->netdev_table->num; i++) {
		if (g_config.mos->netdev_table->ent[i]->stat_print) {
			fprintf(stderr, "[ ALL ] %s, "
					"RX: %7llu(pps) (err: %5llu), %5.2lf(Gbps), "
					"TX: %7llu(pps), %5.2lf(Gbps)\n",
					g_config.mos->netdev_table->ent[i]->dev_name,
					(long long unsigned)g_nstat.rx_packets[i],
					(long long unsigned)g_nstat.rx_errors[i],
					GBPS(g_nstat.rx_bytes[i]),
					(long long unsigned)g_nstat.tx_packets[i],
					GBPS(g_nstat.tx_bytes[i]));
			total_rx_gbps += GBPS(g_nstat.rx_bytes[i]);
			total_tx_gbps += GBPS(g_nstat.tx_bytes[i]);
			stat_print = true;
		}
	}
	if (stat_print) {
		fprintf(stderr, "[ ALL ] flows: %6u\n", gflow_cnt);
		/* exponentially-weighted moving average: 0.6 old + 0.4 new */
		if (avg_total_rx_gbps == 0)
			avg_total_rx_gbps = total_rx_gbps;
		else
			avg_total_rx_gbps = avg_total_rx_gbps * 0.6 + total_rx_gbps * 0.4;

		if (avg_total_tx_gbps == 0)
			avg_total_tx_gbps = total_tx_gbps;
		else
			avg_total_tx_gbps = avg_total_tx_gbps * 0.6 + total_tx_gbps * 0.4;

		if (peak_total_rx_gbps < total_rx_gbps)
			peak_total_rx_gbps = total_rx_gbps;
		if (peak_total_tx_gbps < total_tx_gbps)
			peak_total_tx_gbps = total_tx_gbps;

		fprintf(stderr, "[ PEAK ] RX: %5.2lf(Gbps), TX: %5.2lf(Gbps)\n"
				"[ RECENT AVG ] RX: %5.2lf(Gbps), TX: %5.2lf(Gbps)\n",
				peak_total_rx_gbps, peak_total_tx_gbps,
				avg_total_rx_gbps, avg_total_tx_gbps);
	}
#endif

#if ROUND_STAT
	memset(&g_runstat, 0, sizeof(struct run_stat));
	for (i = 0; i < g_config.mos->num_cores; i++) {
		if (running[i]) {
			PrintThreadRoundStats(g_mtcp[i], &rs);
#if DBGMSG
			g_runstat.rounds += rs.rounds;
			g_runstat.rounds_rx += rs.rounds_rx;
			g_runstat.rounds_rx_try += rs.rounds_rx_try;
			g_runstat.rounds_tx += rs.rounds_tx;
			g_runstat.rounds_tx_try += rs.rounds_tx_try;
			g_runstat.rounds_select += rs.rounds_select;
			g_runstat.rounds_select_rx += rs.rounds_select_rx;
			g_runstat.rounds_select_tx += rs.rounds_select_tx;
#endif
		}
	}

	TRACE_DBG("[ ALL ] Rounds: %4ldK, "
			"rx: %3ldK (try: %4ldK), tx: %3ldK (try: %4ldK), "
			"ps_select: %4ld (rx: %4ld, tx: %4ld)\n",
			g_runstat.rounds / 1000, g_runstat.rounds_rx / 1000,
			g_runstat.rounds_rx_try / 1000, g_runstat.rounds_tx / 1000,
			g_runstat.rounds_tx_try / 1000, g_runstat.rounds_select,
			g_runstat.rounds_select_rx, g_runstat.rounds_select_tx);
#endif /* ROUND_STAT */

#if TIME_STAT
	for (i = 0; i < g_config.mos->num_cores; i++) {
		if (running[i]) {
			PrintThreadRoundTime(g_mtcp[i]);
		}
	}
#endif

#if EVENT_STAT
	for (i = 0; i < g_config.mos->num_cores; i++) {
		if (running[i] && g_mtcp[i]->ep) {
			PrintEventStat(i, &g_mtcp[i]->ep->stat);
		}
	}
#endif

	fflush(stderr);
}
#endif /* NETSTAT */
/*----------------------------------------------------------------------------*/
/* For every standalone monitor socket, drain its event queue and raise
 * MOS_ON_CONN_NEW_DATA callbacks for streams flagged MOS_ACT_READ_DATA. */
static inline void
FlushMonitorReadEvents(mtcp_manager_t mtcp)
{
	struct event_queue *mtcpq;
	struct tcp_stream *cur_stream;
	struct mon_listener *walk;

	/* check if monitor sockets should be passed data */
	TAILQ_FOREACH(walk, &mtcp->monitors, link) {
		if (walk->socket->socktype != MOS_SOCK_MONITOR_STREAM ||
			!(mtcpq = walk->eq))
			continue;

		while (mtcpq->num_events > 0) {
			cur_stream =
				(struct tcp_stream *)mtcpq->events[mtcpq->start++].ev.data.ptr;
			/* only read events */
			if (cur_stream != NULL &&
				(cur_stream->actions & MOS_ACT_READ_DATA)) {
				if (cur_stream->rcvvar != NULL &&
					cur_stream->rcvvar->rcvbuf != NULL) {
					/* no need to pass pkt context */
					/* NOTE(review): this inner `walk` shadows the outer
					 * mon_listener `walk` -- legal, but worth renaming */
					struct socket_map *walk;
					SOCKQ_FOREACH_START(walk, &cur_stream->msocks) {
						HandleCallback(mtcp, MOS_NULL, walk,
								cur_stream->side, NULL,
								MOS_ON_CONN_NEW_DATA);
					} SOCKQ_FOREACH_END;
				}
				/* reset the actions now */
				cur_stream->actions = 0;
			}
			/* circular-buffer wrap-around */
			if (mtcpq->start >= mtcpq->size)
				mtcpq->start = 0;
			mtcpq->num_events--;
		}
	}
}
/*----------------------------------------------------------------------------*/
/* Walk the queued epoll read-in events (without consuming the queue; that
 * happens later in FlushEpollEvents) and raise buffered-read monitor
 * callbacks for the affected streams. */
static inline void
FlushBufferedReadEvents(mtcp_manager_t mtcp)
{
	int i;
	int offset;
	struct event_queue *mtcpq;
	struct tcp_stream *cur_stream;

	if (mtcp->ep == NULL) {
		TRACE_EPOLL("No epoll socket has been registered yet!\n");
		return;
	} else {
		/* case when mtcpq exists */
		mtcpq = mtcp->ep->mtcp_queue;
		offset = mtcpq->start;
	}

	/* we will use queued-up epoll read-in events
	 * to trigger buffered read monitor events */
	for (i = 0; i <
mtcpq->num_events; i++) { 479 cur_stream = mtcp->smap[mtcpq->events[offset++].sockid].stream; 480 /* only read events */ 481 /* Raise new data callback event */ 482 if (cur_stream != NULL && 483 (cur_stream->socket->events | MOS_EPOLLIN)) { 484 if (cur_stream->rcvvar != NULL && 485 cur_stream->rcvvar->rcvbuf != NULL) { 486 /* no need to pass pkt context */ 487 struct socket_map *walk; 488 SOCKQ_FOREACH_START(walk, &cur_stream->msocks) { 489 HandleCallback(mtcp, MOS_NULL, walk, cur_stream->side, 490 NULL, MOS_ON_CONN_NEW_DATA); 491 } SOCKQ_FOREACH_END; 492 } 493 } 494 if (offset >= mtcpq->size) 495 offset = 0; 496 } 497 } 498 /*----------------------------------------------------------------------------*/ 499 static inline void 500 FlushEpollEvents(mtcp_manager_t mtcp, uint32_t cur_ts) 501 { 502 struct mtcp_epoll *ep = mtcp->ep; 503 struct event_queue *usrq = ep->usr_queue; 504 struct event_queue *mtcpq = ep->mtcp_queue; 505 506 pthread_mutex_lock(&ep->epoll_lock); 507 if (ep->mtcp_queue->num_events > 0) { 508 /* while mtcp_queue have events */ 509 /* and usr_queue is not full */ 510 while (mtcpq->num_events > 0 && usrq->num_events < usrq->size) { 511 /* copy the event from mtcp_queue to usr_queue */ 512 usrq->events[usrq->end++] = mtcpq->events[mtcpq->start++]; 513 514 if (usrq->end >= usrq->size) 515 usrq->end = 0; 516 usrq->num_events++; 517 518 if (mtcpq->start >= mtcpq->size) 519 mtcpq->start = 0; 520 mtcpq->num_events--; 521 } 522 } 523 524 /* if there are pending events, wake up user */ 525 if (ep->waiting && (ep->usr_queue->num_events > 0 || 526 ep->usr_shadow_queue->num_events > 0)) { 527 STAT_COUNT(mtcp->runstat.rounds_epoll); 528 TRACE_EPOLL("Broadcasting events. 
num: %d, cur_ts: %u, prev_ts: %u\n", 529 ep->usr_queue->num_events, cur_ts, mtcp->ts_last_event); 530 mtcp->ts_last_event = cur_ts; 531 ep->stat.wakes++; 532 pthread_cond_signal(&ep->epoll_cond); 533 } 534 pthread_mutex_unlock(&ep->epoll_lock); 535 } 536 /*----------------------------------------------------------------------------*/ 537 static inline void 538 HandleApplicationCalls(mtcp_manager_t mtcp, uint32_t cur_ts) 539 { 540 tcp_stream *stream; 541 int cnt, max_cnt; 542 int handled, delayed; 543 int control, send, ack; 544 545 /* connect handling */ 546 while ((stream = StreamDequeue(mtcp->connectq))) { 547 if (stream->state != TCP_ST_SYN_SENT) { 548 TRACE_INFO("Got a connection request from app with state: %s", 549 TCPStateToString(stream)); 550 exit(EXIT_FAILURE); 551 } else { 552 stream->cb_events |= MOS_ON_CONN_START | 553 MOS_ON_TCP_STATE_CHANGE; 554 /* if monitor is on... */ 555 if (stream->pair_stream != NULL) 556 stream->pair_stream->cb_events |= 557 MOS_ON_CONN_START; 558 } 559 AddtoControlList(mtcp, stream, cur_ts); 560 } 561 562 /* send queue handling */ 563 while ((stream = StreamDequeue(mtcp->sendq))) { 564 stream->sndvar->on_sendq = FALSE; 565 AddtoSendList(mtcp, stream); 566 } 567 568 /* ack queue handling */ 569 while ((stream = StreamDequeue(mtcp->ackq))) { 570 stream->sndvar->on_ackq = FALSE; 571 EnqueueACK(mtcp, stream, cur_ts, ACK_OPT_AGGREGATE); 572 } 573 574 /* close handling */ 575 handled = delayed = 0; 576 control = send = ack = 0; 577 while ((stream = StreamDequeue(mtcp->closeq))) { 578 struct tcp_send_vars *sndvar = stream->sndvar; 579 sndvar->on_closeq = FALSE; 580 581 if (sndvar->sndbuf) { 582 sndvar->fss = sndvar->sndbuf->head_seq + sndvar->sndbuf->len; 583 } else { 584 sndvar->fss = stream->snd_nxt; 585 } 586 587 if (g_config.mos->tcp_timeout > 0) 588 RemoveFromTimeoutList(mtcp, stream); 589 590 if (stream->have_reset) { 591 handled++; 592 if (stream->state != TCP_ST_CLOSED_RSVD) { 593 stream->close_reason = TCP_RESET; 594 
				stream->state = TCP_ST_CLOSED_RSVD;
				stream->cb_events |= MOS_ON_TCP_STATE_CHANGE;
				TRACE_STATE("Stream %d: TCP_ST_CLOSED_RSVD\n", stream->id);
				DestroyTCPStream(mtcp, stream);
			} else {
				TRACE_ERROR("Stream already closed.\n");
			}

		} else if (sndvar->on_control_list) {
			/* control packets still pending: defer the close to
			 * closeq_int and account for what is outstanding */
			sndvar->on_closeq_int = TRUE;
			StreamInternalEnqueue(mtcp->closeq_int, stream);
			delayed++;
			if (sndvar->on_control_list)
				control++;
			if (sndvar->on_send_list)
				send++;
			if (sndvar->on_ack_list)
				ack++;

		} else if (sndvar->on_send_list || sndvar->on_ack_list) {
			/* data/ACKs still queued: transition the state now and send
			 * the FIN once the lists drain (control_list_waiting) */
			handled++;
			if (stream->state == TCP_ST_ESTABLISHED) {
				stream->state = TCP_ST_FIN_WAIT_1;
				stream->cb_events |= MOS_ON_TCP_STATE_CHANGE;
				TRACE_STATE("Stream %d: TCP_ST_FIN_WAIT_1\n", stream->id);

			} else if (stream->state == TCP_ST_CLOSE_WAIT) {
				stream->state = TCP_ST_LAST_ACK;
				stream->cb_events |= MOS_ON_TCP_STATE_CHANGE;
				TRACE_STATE("Stream %d: TCP_ST_LAST_ACK\n", stream->id);
			}
			stream->control_list_waiting = TRUE;

		} else if (stream->state != TCP_ST_CLOSED_RSVD) {
			/* idle stream: transition and queue the FIN immediately */
			handled++;
			if (stream->state == TCP_ST_ESTABLISHED) {
				stream->state = TCP_ST_FIN_WAIT_1;
				stream->cb_events |= MOS_ON_TCP_STATE_CHANGE;
				TRACE_STATE("Stream %d: TCP_ST_FIN_WAIT_1\n", stream->id);

			} else if (stream->state == TCP_ST_CLOSE_WAIT) {
				stream->state = TCP_ST_LAST_ACK;
				stream->cb_events |= MOS_ON_TCP_STATE_CHANGE;
				TRACE_STATE("Stream %d: TCP_ST_LAST_ACK\n", stream->id);
			}
			//sndvar->rto = TCP_FIN_RTO;
			//UpdateRetransmissionTimer(mtcp, stream, mtcp->cur_ts);
			AddtoControlList(mtcp, stream, cur_ts);
		} else {
			TRACE_ERROR("Already closed connection!\n");
		}
	}
	/* NOTE(review): `cnt` has not been assigned at this point, so the
	 * value traced here is indeterminate when TRACE_ROUND is enabled. */
	TRACE_ROUND("Handling close connections. cnt: %d\n", cnt);

	/* drain the deferred-close queue (bounded by its current count so
	 * re-enqueued streams are not reprocessed this round) */
	cnt = 0;
	max_cnt = mtcp->closeq_int->count;
	while (cnt++ < max_cnt) {
		stream = StreamInternalDequeue(mtcp->closeq_int);

		if (stream->sndvar->on_control_list) {
			/* still busy: re-queue for a later round */
			StreamInternalEnqueue(mtcp->closeq_int, stream);

		} else if (stream->state != TCP_ST_CLOSED_RSVD) {
			handled++;
			stream->sndvar->on_closeq_int = FALSE;
			if (stream->state == TCP_ST_ESTABLISHED) {
				stream->state = TCP_ST_FIN_WAIT_1;
				stream->cb_events |= MOS_ON_TCP_STATE_CHANGE;
				TRACE_STATE("Stream %d: TCP_ST_FIN_WAIT_1\n", stream->id);

			} else if (stream->state == TCP_ST_CLOSE_WAIT) {
				stream->state = TCP_ST_LAST_ACK;
				stream->cb_events |= MOS_ON_TCP_STATE_CHANGE;
				TRACE_STATE("Stream %d: TCP_ST_LAST_ACK\n", stream->id);
			}
			AddtoControlList(mtcp, stream, cur_ts);
		} else {
			stream->sndvar->on_closeq_int = FALSE;
			TRACE_ERROR("Already closed connection!\n");
		}
	}

	/* reset handling */
	while ((stream = StreamDequeue(mtcp->resetq))) {
		stream->sndvar->on_resetq = FALSE;

		if (g_config.mos->tcp_timeout > 0)
			RemoveFromTimeoutList(mtcp, stream);

		if (stream->have_reset) {
			/* peer already reset us: just tear the stream down */
			if (stream->state != TCP_ST_CLOSED_RSVD) {
				stream->close_reason = TCP_RESET;
				stream->state = TCP_ST_CLOSED_RSVD;
				stream->cb_events |= MOS_ON_TCP_STATE_CHANGE;
				TRACE_STATE("Stream %d: TCP_ST_CLOSED_RSVD\n", stream->id);
				DestroyTCPStream(mtcp, stream);
			} else {
				TRACE_ERROR("Stream already closed.\n");
			}

		} else if (stream->sndvar->on_control_list ||
				stream->sndvar->on_send_list || stream->sndvar->on_ack_list) {
			/* wait until all the queues are flushed */
			stream->sndvar->on_resetq_int = TRUE;
			StreamInternalEnqueue(mtcp->resetq_int, stream);

		} else {
			/* actively reset the connection: send RST via control list */
			if (stream->state != TCP_ST_CLOSED_RSVD) {
				stream->close_reason = TCP_ACTIVE_CLOSE;
				stream->state = TCP_ST_CLOSED_RSVD;
				stream->cb_events |= MOS_ON_TCP_STATE_CHANGE;
				TRACE_STATE("Stream %d: TCP_ST_CLOSED_RSVD\n", stream->id);
				AddtoControlList(mtcp, stream, cur_ts);
			} else {
				TRACE_ERROR("Stream already closed.\n");
			}
		}
	}
	TRACE_ROUND("Handling reset connections. cnt: %d\n", cnt);

	/* drain the deferred-reset queue (bounded by its current count so
	 * re-enqueued streams are not reprocessed this round) */
	cnt = 0;
	max_cnt = mtcp->resetq_int->count;
	while (cnt++ < max_cnt) {
		stream = StreamInternalDequeue(mtcp->resetq_int);

		if (stream->sndvar->on_control_list ||
				stream->sndvar->on_send_list || stream->sndvar->on_ack_list) {
			/* wait until all the queues are flushed */
			StreamInternalEnqueue(mtcp->resetq_int, stream);

		} else {
			stream->sndvar->on_resetq_int = FALSE;

			if (stream->state != TCP_ST_CLOSED_RSVD) {
				stream->close_reason = TCP_ACTIVE_CLOSE;
				stream->state = TCP_ST_CLOSED_RSVD;
				stream->cb_events |= MOS_ON_TCP_STATE_CHANGE;
				TRACE_STATE("Stream %d: TCP_ST_CLOSED_RSVD\n", stream->id);
				AddtoControlList(mtcp, stream, cur_ts);
			} else {
				TRACE_ERROR("Stream already closed.\n");
			}
		}
	}

	/* destroy streams in destroyq */
	while ((stream = StreamDequeue(mtcp->destroyq))) {
		DestroyTCPStream(mtcp, stream);
	}

	mtcp->wakeup_flag = FALSE;
}
/*----------------------------------------------------------------------------*/
/* Flush the control/ACK/data lists of the global sender and of every
 * per-device sender, writing at most `thresh` entries per list. */
static inline void
WritePacketsToChunks(mtcp_manager_t mtcp, uint32_t cur_ts)
{
	int thresh = g_config.mos->max_concurrency;
	int i;

	/* Set the threshold to g_config.mos->max_concurrency to send ACK immediately */
	/* Otherwise, set to appropriate value (e.g. thresh) */
	assert(mtcp->g_sender != NULL);
	if (mtcp->g_sender->control_list_cnt)
		WriteTCPControlList(mtcp, mtcp->g_sender, cur_ts, thresh);
	if (mtcp->g_sender->ack_list_cnt)
		WriteTCPACKList(mtcp, mtcp->g_sender, cur_ts, thresh);
	if (mtcp->g_sender->send_list_cnt)
		WriteTCPDataList(mtcp, mtcp->g_sender, cur_ts, thresh);

	/* then per-device senders, same order: control, ACK, data */
	for (i = 0; i < g_config.mos->netdev_table->num; i++) {
		assert(mtcp->n_sender[i] != NULL);
		if (mtcp->n_sender[i]->control_list_cnt)
			WriteTCPControlList(mtcp, mtcp->n_sender[i], cur_ts, thresh);
		if (mtcp->n_sender[i]->ack_list_cnt)
			WriteTCPACKList(mtcp, mtcp->n_sender[i], cur_ts, thresh);
		if (mtcp->n_sender[i]->send_list_cnt)
			WriteTCPDataList(mtcp, mtcp->n_sender[i], cur_ts, thresh);
	}
}
/*----------------------------------------------------------------------------*/
#if TESTING
/* Test-only teardown: destroy every flow still in the flow table and
 * return the number of flows destroyed. */
static int
DestroyRemainingFlows(mtcp_manager_t mtcp)
{
	struct hashtable *ht = mtcp->tcp_flow_table;
	tcp_stream *walk;
	int cnt, i;

	cnt = 0;

	thread_printf(mtcp, mtcp->log_fp,
			"CPU %d: Flushing remaining flows.\n", mtcp->ctx->cpu);

	for (i = 0; i < NUM_BINS; i++) {
		TAILQ_FOREACH(walk, &ht->ht_table[i], rcvvar->he_link) {
			thread_printf(mtcp, mtcp->log_fp,
					"CPU %d: Destroying stream %d\n", mtcp->ctx->cpu, walk->id);
#ifdef DUMP_STREAM
			DumpStream(mtcp, walk);
#endif
			DestroyTCPStream(mtcp, walk);
			cnt++;
		}
	}

	return cnt;
}
#endif
/*----------------------------------------------------------------------------*/
/* Wake application threads blocked in mtcp_epoll_wait() or accept() so
 * they can observe an interrupt/exit request. */
static void
InterruptApplication(mtcp_manager_t mtcp)
{
	/* interrupt if the mtcp_epoll_wait() is waiting */
	if (mtcp->ep) {
		pthread_mutex_lock(&mtcp->ep->epoll_lock);
		if (mtcp->ep->waiting) {
			pthread_cond_signal(&mtcp->ep->epoll_cond);
		}
		pthread_mutex_unlock(&mtcp->ep->epoll_lock);
	}
	/* interrupt if the accept() is waiting */
	if
(mtcp->listener) {
		if (mtcp->listener->socket) {
			pthread_mutex_lock(&mtcp->listener->accept_lock);
			/* only blocking listeners can be stuck inside accept() */
			if (!(mtcp->listener->socket->opts & MTCP_NONBLOCK)) {
				pthread_cond_signal(&mtcp->listener->accept_cond);
			}
			pthread_mutex_unlock(&mtcp->listener->accept_lock);
		}
	}
}
/*----------------------------------------------------------------------------*/
/* Passive loop for cores that do not run the main TCP loop: block until
 * the per-core done semaphore is posted, then clean it up. */
void
RunPassiveLoop(mtcp_manager_t mtcp)
{
	sem_wait(&g_done_sem[mtcp->ctx->cpu]);
	sem_destroy(&g_done_sem[mtcp->ctx->cpu]);
	return;
}
/*----------------------------------------------------------------------------*/
/* Per-core main event loop: receive and process packets, fire user timers,
 * run TCP timeout checks, flush monitor/epoll events, handle application
 * queues, transmit, and periodically print statistics. Runs until the
 * context is done (with no flows left) or exit is forced. */
static void
RunMainLoop(struct mtcp_thread_context *ctx)
{
	mtcp_manager_t mtcp = ctx->mtcp_manager;
	int i;
	int recv_cnt;
	int rx_inf, tx_inf;
	struct timeval cur_ts = {0};
	uint32_t ts, ts_prev;

#if TIME_STAT
	struct timeval prev_ts, processing_ts, tcheck_ts,
		epoll_ts, handle_ts, xmit_ts, select_ts;
#endif
	int thresh;

	gettimeofday(&cur_ts, NULL);

	TRACE_DBG("CPU %d: mtcp thread running.\n", ctx->cpu);

#if TIME_STAT
	prev_ts = cur_ts;
	InitStatCounter(&mtcp->rtstat.round);
	InitStatCounter(&mtcp->rtstat.processing);
	InitStatCounter(&mtcp->rtstat.tcheck);
	InitStatCounter(&mtcp->rtstat.epoll);
	InitStatCounter(&mtcp->rtstat.handle);
	InitStatCounter(&mtcp->rtstat.xmit);
	InitStatCounter(&mtcp->rtstat.select);
#endif

	ts = ts_prev = 0;
	/* loop until told to finish (and no flows remain) or forced to exit */
	while ((!ctx->done || mtcp->flow_cnt) && !ctx->exit) {

		STAT_COUNT(mtcp->runstat.rounds);
		recv_cnt = 0;
		gettimeofday(&cur_ts, NULL);
#if TIME_STAT
		/* measure the inter-round delay */
		UpdateStatCounter(&mtcp->rtstat.round, TimeDiffUs(&cur_ts, &prev_ts));
		prev_ts = cur_ts;
#endif

		ts = TIMEVAL_TO_TS(&cur_ts);
		mtcp->cur_ts = ts;

		/* receive and process packets from every attached device */
		for (rx_inf = 0; rx_inf < g_config.mos->netdev_table->num; rx_inf++) {

			recv_cnt = mtcp->iom->recv_pkts(ctx, rx_inf);
			STAT_COUNT(mtcp->runstat.rounds_rx_try);

			for (i = 0; i < recv_cnt; i++) {
				uint16_t len;
				uint8_t *pktbuf;
				pktbuf = mtcp->iom->get_rptr(mtcp->ctx, rx_inf, i, &len);
				ProcessPacket(mtcp, rx_inf, i, ts, pktbuf, len);
			}
		}
		STAT_COUNT(mtcp->runstat.rounds_rx);

#if TIME_STAT
		gettimeofday(&processing_ts, NULL);
		UpdateStatCounter(&mtcp->rtstat.processing,
				TimeDiffUs(&processing_ts, &cur_ts));
#endif /* TIME_STAT */

		/* Handle user defined timeout */
		struct timer *walk, *tmp;
		for (walk = TAILQ_FIRST(&mtcp->timer_list); walk != NULL; walk = tmp) {
			tmp = TAILQ_NEXT(walk, timer_link);
			/* list appears expiry-ordered: stop at first unexpired timer */
			if (TIMEVAL_LT(&cur_ts, &walk->exp))
				break;

			struct mtcp_context mctx = {.cpu = ctx->cpu};
			walk->cb(&mctx, walk->id, 0, 0 /* FIXME */, NULL);
			DelTimer(mtcp, walk);
		}

		/* interaction with application */
		if (mtcp->flow_cnt > 0) {

			/* check retransmission timeout and timewait expire */
#if 0
			thresh = (int)mtcp->flow_cnt / (TS_TO_USEC(PER_STREAM_TCHECK));
			assert(thresh >= 0);
			if (thresh == 0)
				thresh = 1;
			if (recv_cnt > 0 && thresh > recv_cnt)
				thresh = recv_cnt;
#else
			thresh = g_config.mos->max_concurrency;
#endif

			/* Eunyoung, you may fix this later
			 * if there is no rcv packet, we will send as much as possible
			 */
			if (thresh == -1)
				thresh = g_config.mos->max_concurrency;

			CheckRtmTimeout(mtcp, ts, thresh);
			CheckTimewaitExpire(mtcp, ts, thresh);

			/* connection timeouts only need re-checking once per tick */
			if (g_config.mos->tcp_timeout > 0 && ts != ts_prev) {
				CheckConnectionTimeout(mtcp, ts, thresh);
			}

			/* under TIME_STAT the block is split so the tcheck phase can
			 * be timed; braces below re-open the flow_cnt guard */
#if TIME_STAT
		}
		gettimeofday(&tcheck_ts, NULL);
		UpdateStatCounter(&mtcp->rtstat.tcheck,
				TimeDiffUs(&tcheck_ts, &processing_ts));

		if (mtcp->flow_cnt > 0) {
#endif /* TIME_STAT */

		}

		/*
		 * before flushing epoll events, call monitor events for
		 * all registered `read` events
		 */
		if (mtcp->num_msp > 0)
			/* call this when only a standalone monitor is running */
			FlushMonitorReadEvents(mtcp);

		/* if epoll is in use, flush all the queued events */
		if (mtcp->ep) {
			FlushBufferedReadEvents(mtcp);
			FlushEpollEvents(mtcp, ts);
		}
#if TIME_STAT
		gettimeofday(&epoll_ts, NULL);
		UpdateStatCounter(&mtcp->rtstat.epoll,
				TimeDiffUs(&epoll_ts, &tcheck_ts));
#endif /* TIME_STAT */

		if (end_app_exists && mtcp->flow_cnt > 0) {
			/* handle stream queues */
			HandleApplicationCalls(mtcp, ts);
		}

#if TIME_STAT
		gettimeofday(&handle_ts, NULL);
		UpdateStatCounter(&mtcp->rtstat.handle,
				TimeDiffUs(&handle_ts, &epoll_ts));
#endif /* TIME_STAT */

		WritePacketsToChunks(mtcp, ts);

		/* send packets from write buffer */
		/* Send until tx is available */
		int num_dev = g_config.mos->netdev_table->num;
		if (likely(mtcp->iom->send_pkts != NULL))
			for (tx_inf = 0; tx_inf < num_dev; tx_inf++) {
				mtcp->iom->send_pkts(ctx, tx_inf);
			}

#if TIME_STAT
		gettimeofday(&xmit_ts, NULL);
		UpdateStatCounter(&mtcp->rtstat.xmit,
				TimeDiffUs(&xmit_ts, &handle_ts));
#endif /* TIME_STAT */

		/* once-per-tick housekeeping: ARP timer and statistics printing
		 * (only on the elected printer core) */
		if (ts != ts_prev) {
			ts_prev = ts;
#ifdef NETSTAT
			if (ctx->cpu == printer) {
#ifdef RUN_ARP
				ARPTimer(mtcp, ts);
#endif
#ifdef NETSTAT
				PrintNetworkStats(mtcp, ts);
#endif
			}
#endif /* NETSTAT */
		}

		/* let the I/O module block/poll for the next batch */
		if (mtcp->iom->select)
			mtcp->iom->select(ctx);

		if (ctx->interrupt) {
			InterruptApplication(mtcp);
		}
	}

#if TESTING
	DestroyRemainingFlows(mtcp);
#endif

	TRACE_DBG("MTCP thread %d out of main loop.\n", ctx->cpu);
	/* flush logs */
	flush_log_data(mtcp);
	TRACE_DBG("MTCP thread %d flushed logs.\n", ctx->cpu);
	InterruptApplication(mtcp);
	TRACE_INFO("MTCP thread %d finished.\n", ctx->cpu);
}
/*----------------------------------------------------------------------------*/ 1031 struct mtcp_sender * 1032 CreateMTCPSender(int ifidx) 1033 { 1034 struct mtcp_sender *sender; 1035 1036 sender = (struct mtcp_sender *)calloc(1, sizeof(struct mtcp_sender)); 1037 if (!sender) { 1038 return NULL; 1039 } 1040 1041 sender->ifidx = ifidx; 1042 1043 TAILQ_INIT(&sender->control_list); 1044 TAILQ_INIT(&sender->send_list); 1045 TAILQ_INIT(&sender->ack_list); 1046 1047 sender->control_list_cnt = 0; 1048 sender->send_list_cnt = 0; 1049 sender->ack_list_cnt = 0; 1050 1051 return sender; 1052 } 1053 /*----------------------------------------------------------------------------*/ 1054 void 1055 DestroyMTCPSender(struct mtcp_sender *sender) 1056 { 1057 free(sender); 1058 } 1059 /*----------------------------------------------------------------------------*/ 1060 static mtcp_manager_t 1061 InitializeMTCPManager(struct mtcp_thread_context* ctx) 1062 { 1063 mtcp_manager_t mtcp; 1064 char log_name[MAX_FILE_NAME]; 1065 int i; 1066 1067 posix_seq_srand((unsigned)pthread_self()); 1068 1069 mtcp = (mtcp_manager_t)calloc(1, sizeof(struct mtcp_manager)); 1070 if (!mtcp) { 1071 perror("malloc"); 1072 TRACE_ERROR("Failed to allocate mtcp_manager.\n"); 1073 return NULL; 1074 } 1075 g_mtcp[ctx->cpu] = mtcp; 1076 1077 mtcp->tcp_flow_table = CreateHashtable(); 1078 if (!mtcp->tcp_flow_table) { 1079 CTRACE_ERROR("Falied to allocate tcp flow table.\n"); 1080 return NULL; 1081 } 1082 1083 #ifdef HUGEPAGE 1084 #define IS_HUGEPAGE 1 1085 #else 1086 #define IS_HUGEPAGE 0 1087 #endif 1088 if (mon_app_exists) { 1089 /* initialize event callback */ 1090 #ifdef NEWEV 1091 InitEvent(mtcp); 1092 #else 1093 InitEvent(mtcp, NUM_EV_TABLE); 1094 #endif 1095 } 1096 1097 if (!(mtcp->bufseg_pool = MPCreate(sizeof(tcpbufseg_t), 1098 sizeof(tcpbufseg_t) * g_config.mos->max_concurrency * 1099 ((g_config.mos->rmem_size - 1) / UNITBUFSIZE + 1), 0))) { 1100 TRACE_ERROR("Failed to allocate ev_table pool\n"); 1101 
exit(0); 1102 } 1103 if (!(mtcp->sockent_pool = MPCreate(sizeof(struct sockent), 1104 sizeof(struct sockent) * g_config.mos->max_concurrency * 3, 0))) { 1105 TRACE_ERROR("Failed to allocate ev_table pool\n"); 1106 exit(0); 1107 } 1108 #ifdef USE_TIMER_POOL 1109 if (!(mtcp->timer_pool = MPCreate(sizeof(struct timer), 1110 sizeof(struct timer) * g_config.mos->max_concurrency * 10, 0))) { 1111 TRACE_ERROR("Failed to allocate ev_table pool\n"); 1112 exit(0); 1113 } 1114 #endif 1115 mtcp->flow_pool = MPCreate(sizeof(tcp_stream), 1116 sizeof(tcp_stream) * g_config.mos->max_concurrency, IS_HUGEPAGE); 1117 if (!mtcp->flow_pool) { 1118 CTRACE_ERROR("Failed to allocate tcp flow pool.\n"); 1119 return NULL; 1120 } 1121 mtcp->rv_pool = MPCreate(sizeof(struct tcp_recv_vars), 1122 sizeof(struct tcp_recv_vars) * g_config.mos->max_concurrency, IS_HUGEPAGE); 1123 if (!mtcp->rv_pool) { 1124 CTRACE_ERROR("Failed to allocate tcp recv variable pool.\n"); 1125 return NULL; 1126 } 1127 mtcp->sv_pool = MPCreate(sizeof(struct tcp_send_vars), 1128 sizeof(struct tcp_send_vars) * g_config.mos->max_concurrency, IS_HUGEPAGE); 1129 if (!mtcp->sv_pool) { 1130 CTRACE_ERROR("Failed to allocate tcp send variable pool.\n"); 1131 return NULL; 1132 } 1133 1134 mtcp->rbm_snd = SBManagerCreate(g_config.mos->wmem_size, g_config.mos->no_ring_buffers, 1135 g_config.mos->max_concurrency); 1136 if (!mtcp->rbm_snd) { 1137 CTRACE_ERROR("Failed to create send ring buffer.\n"); 1138 return NULL; 1139 } 1140 1141 mtcp->smap = (socket_map_t)calloc(g_config.mos->max_concurrency, sizeof(struct socket_map)); 1142 if (!mtcp->smap) { 1143 perror("calloc"); 1144 CTRACE_ERROR("Failed to allocate memory for stream map.\n"); 1145 return NULL; 1146 } 1147 1148 if (mon_app_exists) { 1149 mtcp->msmap = (socket_map_t)calloc(g_config.mos->max_concurrency, sizeof(struct socket_map)); 1150 if (!mtcp->msmap) { 1151 perror("calloc"); 1152 CTRACE_ERROR("Failed to allocate memory for monitor stream map.\n"); 1153 return NULL; 1154 } 
1155 1156 for (i = 0; i < g_config.mos->max_concurrency; i++) { 1157 mtcp->msmap[i].monitor_stream = calloc(1, sizeof(struct mon_stream)); 1158 if (!mtcp->msmap[i].monitor_stream) { 1159 perror("calloc"); 1160 CTRACE_ERROR("Failed to allocate memory for monitr stream map\n"); 1161 return NULL; 1162 } 1163 } 1164 } 1165 1166 TAILQ_INIT(&mtcp->timer_list); 1167 TAILQ_INIT(&mtcp->monitors); 1168 1169 TAILQ_INIT(&mtcp->free_smap); 1170 for (i = 0; i < g_config.mos->max_concurrency; i++) { 1171 mtcp->smap[i].id = i; 1172 mtcp->smap[i].socktype = MOS_SOCK_UNUSED; 1173 memset(&mtcp->smap[i].saddr, 0, sizeof(struct sockaddr_in)); 1174 mtcp->smap[i].stream = NULL; 1175 TAILQ_INSERT_TAIL(&mtcp->free_smap, &mtcp->smap[i], link); 1176 } 1177 1178 if (mon_app_exists) { 1179 TAILQ_INIT(&mtcp->free_msmap); 1180 for (i = 0; i < g_config.mos->max_concurrency; i++) { 1181 mtcp->msmap[i].id = i; 1182 mtcp->msmap[i].socktype = MOS_SOCK_UNUSED; 1183 memset(&mtcp->msmap[i].saddr, 0, sizeof(struct sockaddr_in)); 1184 TAILQ_INSERT_TAIL(&mtcp->free_msmap, &mtcp->msmap[i], link); 1185 } 1186 } 1187 1188 mtcp->ctx = ctx; 1189 mtcp->ep = NULL; 1190 1191 snprintf(log_name, MAX_FILE_NAME, "%s/"LOG_FILE_NAME"_%d", 1192 g_config.mos->mos_log, ctx->cpu); 1193 mtcp->log_fp = fopen(log_name, "w+"); 1194 if (!mtcp->log_fp) { 1195 perror("fopen"); 1196 CTRACE_ERROR("Failed to create file for logging. 
(%s)\n", log_name); 1197 return NULL; 1198 } 1199 mtcp->sp_fd = g_logctx[ctx->cpu]->pair_sp_fd; 1200 mtcp->logger = g_logctx[ctx->cpu]; 1201 1202 mtcp->connectq = CreateStreamQueue(BACKLOG_SIZE); 1203 if (!mtcp->connectq) { 1204 CTRACE_ERROR("Failed to create connect queue.\n"); 1205 return NULL; 1206 } 1207 mtcp->sendq = CreateStreamQueue(g_config.mos->max_concurrency); 1208 if (!mtcp->sendq) { 1209 CTRACE_ERROR("Failed to create send queue.\n"); 1210 return NULL; 1211 } 1212 mtcp->ackq = CreateStreamQueue(g_config.mos->max_concurrency); 1213 if (!mtcp->ackq) { 1214 CTRACE_ERROR("Failed to create ack queue.\n"); 1215 return NULL; 1216 } 1217 mtcp->closeq = CreateStreamQueue(g_config.mos->max_concurrency); 1218 if (!mtcp->closeq) { 1219 CTRACE_ERROR("Failed to create close queue.\n"); 1220 return NULL; 1221 } 1222 mtcp->closeq_int = CreateInternalStreamQueue(g_config.mos->max_concurrency); 1223 if (!mtcp->closeq_int) { 1224 CTRACE_ERROR("Failed to create close queue.\n"); 1225 return NULL; 1226 } 1227 mtcp->resetq = CreateStreamQueue(g_config.mos->max_concurrency); 1228 if (!mtcp->resetq) { 1229 CTRACE_ERROR("Failed to create reset queue.\n"); 1230 return NULL; 1231 } 1232 mtcp->resetq_int = CreateInternalStreamQueue(g_config.mos->max_concurrency); 1233 if (!mtcp->resetq_int) { 1234 CTRACE_ERROR("Failed to create reset queue.\n"); 1235 return NULL; 1236 } 1237 mtcp->destroyq = CreateStreamQueue(g_config.mos->max_concurrency); 1238 if (!mtcp->destroyq) { 1239 CTRACE_ERROR("Failed to create destroy queue.\n"); 1240 return NULL; 1241 } 1242 1243 mtcp->g_sender = CreateMTCPSender(-1); 1244 if (!mtcp->g_sender) { 1245 CTRACE_ERROR("Failed to create global sender structure.\n"); 1246 return NULL; 1247 } 1248 for (i = 0; i < g_config.mos->netdev_table->num; i++) { 1249 mtcp->n_sender[i] = CreateMTCPSender(i); 1250 if (!mtcp->n_sender[i]) { 1251 CTRACE_ERROR("Failed to create per-nic sender structure.\n"); 1252 return NULL; 1253 } 1254 } 1255 1256 mtcp->rto_store = 
InitRTOHashstore(); 1257 TAILQ_INIT(&mtcp->timewait_list); 1258 TAILQ_INIT(&mtcp->timeout_list); 1259 1260 return mtcp; 1261 } 1262 /*----------------------------------------------------------------------------*/ 1263 static void * 1264 MTCPRunThread(void *arg) 1265 { 1266 mctx_t mctx = (mctx_t)arg; 1267 int cpu = mctx->cpu; 1268 int working; 1269 struct mtcp_manager *mtcp; 1270 struct mtcp_thread_context *ctx; 1271 1272 /* affinitize the thread to this core first */ 1273 mtcp_core_affinitize(cpu); 1274 1275 /* memory alloc after core affinitization would use local memory 1276 most time */ 1277 ctx = calloc(1, sizeof(*ctx)); 1278 if (!ctx) { 1279 perror("calloc"); 1280 TRACE_ERROR("Failed to calloc mtcp context.\n"); 1281 exit(-1); 1282 } 1283 ctx->thread = pthread_self(); 1284 ctx->cpu = cpu; 1285 mtcp = ctx->mtcp_manager = InitializeMTCPManager(ctx); 1286 if (!mtcp) { 1287 TRACE_ERROR("Failed to initialize mtcp manager.\n"); 1288 exit(-1); 1289 } 1290 1291 /* assign mtcp context's underlying I/O module */ 1292 mtcp->iom = current_iomodule_func; 1293 1294 /* I/O initializing */ 1295 if (mtcp->iom->init_handle) 1296 mtcp->iom->init_handle(ctx); 1297 1298 if (pthread_mutex_init(&ctx->flow_pool_lock, NULL)) { 1299 perror("pthread_mutex_init of ctx->flow_pool_lock\n"); 1300 exit(-1); 1301 } 1302 1303 if (pthread_mutex_init(&ctx->socket_pool_lock, NULL)) { 1304 perror("pthread_mutex_init of ctx->socket_pool_lock\n"); 1305 exit(-1); 1306 } 1307 1308 SQ_LOCK_INIT(&ctx->connect_lock, "ctx->connect_lock", exit(-1)); 1309 SQ_LOCK_INIT(&ctx->close_lock, "ctx->close_lock", exit(-1)); 1310 SQ_LOCK_INIT(&ctx->reset_lock, "ctx->reset_lock", exit(-1)); 1311 SQ_LOCK_INIT(&ctx->sendq_lock, "ctx->sendq_lock", exit(-1)); 1312 SQ_LOCK_INIT(&ctx->ackq_lock, "ctx->ackq_lock", exit(-1)); 1313 SQ_LOCK_INIT(&ctx->destroyq_lock, "ctx->destroyq_lock", exit(-1)); 1314 1315 /* remember this context pointer for signal processing */ 1316 g_pctx[cpu] = ctx; 1317 mlockall(MCL_CURRENT); 1318 1319 
// attach (nic device, queue) 1320 working = AttachDevice(ctx); 1321 if (working != 0) { 1322 sem_post(&g_init_sem[ctx->cpu]); 1323 TRACE_DBG("MTCP thread %d finished. Not attached any device\n", ctx->cpu); 1324 pthread_exit(NULL); 1325 } 1326 1327 TRACE_DBG("CPU %d: initialization finished.\n", cpu); 1328 sem_post(&g_init_sem[ctx->cpu]); 1329 1330 /* start the main loop */ 1331 RunMainLoop(ctx); 1332 1333 TRACE_DBG("MTCP thread %d finished.\n", ctx->cpu); 1334 1335 /* signaling mTCP thread is done */ 1336 sem_post(&g_done_sem[mctx->cpu]); 1337 1338 //pthread_exit(NULL); 1339 return 0; 1340 } 1341 /*----------------------------------------------------------------------------*/ 1342 mctx_t 1343 mtcp_create_context(int cpu) 1344 { 1345 mctx_t mctx; 1346 int ret; 1347 1348 if (cpu >= g_config.mos->num_cores) { 1349 TRACE_ERROR("Failed initialize new mtcp context. " 1350 "Requested cpu id %d exceed the number of cores %d configured to use.\n", 1351 cpu, g_config.mos->num_cores); 1352 return NULL; 1353 } 1354 1355 /* check if mtcp_create_context() was already initialized */ 1356 if (g_logctx[cpu] != NULL) { 1357 TRACE_ERROR("%s was already initialized before!\n", 1358 __FUNCTION__); 1359 return NULL; 1360 } 1361 1362 ret = sem_init(&g_init_sem[cpu], 0, 0); 1363 if (ret) { 1364 TRACE_ERROR("Failed initialize init_sem.\n"); 1365 return NULL; 1366 } 1367 1368 ret = sem_init(&g_done_sem[cpu], 0, 0); 1369 if (ret) { 1370 TRACE_ERROR("Failed initialize done_sem.\n"); 1371 return NULL; 1372 } 1373 1374 mctx = (mctx_t)calloc(1, sizeof(struct mtcp_context)); 1375 if (!mctx) { 1376 TRACE_ERROR("Failed to allocate memory for mtcp_context.\n"); 1377 return NULL; 1378 } 1379 mctx->cpu = cpu; 1380 g_ctx[cpu] = mctx; 1381 1382 /* initialize logger */ 1383 g_logctx[cpu] = (struct log_thread_context *) 1384 calloc(1, sizeof(struct log_thread_context)); 1385 if (!g_logctx[cpu]) { 1386 perror("malloc"); 1387 TRACE_ERROR("Failed to allocate memory for log thread context.\n"); 1388 return 
NULL; 1389 } 1390 InitLogThreadContext(g_logctx[cpu], cpu); 1391 if (pthread_create(&log_thread[cpu], 1392 NULL, ThreadLogMain, (void *)g_logctx[cpu])) { 1393 perror("pthread_create"); 1394 TRACE_ERROR("Failed to create log thread\n"); 1395 return NULL; 1396 } 1397 1398 if (pthread_create(&g_thread[cpu], 1399 NULL, MTCPRunThread, (void *)mctx) != 0) { 1400 TRACE_ERROR("pthread_create of mtcp thread failed!\n"); 1401 return NULL; 1402 } 1403 1404 sem_wait(&g_init_sem[cpu]); 1405 sem_destroy(&g_init_sem[cpu]); 1406 1407 running[cpu] = TRUE; 1408 1409 #ifdef NETSTAT 1410 #if NETSTAT_TOTAL 1411 if (printer < 0) { 1412 printer = cpu; 1413 TRACE_INFO("CPU %d is in charge of printing stats.\n", printer); 1414 } 1415 #endif 1416 #endif 1417 1418 return mctx; 1419 } 1420 /*----------------------------------------------------------------------------*/ 1421 int 1422 mtcp_destroy_context(mctx_t mctx) 1423 { 1424 struct mtcp_thread_context *ctx = g_pctx[mctx->cpu]; 1425 if (ctx != NULL) 1426 ctx->done = 1; 1427 1428 struct mtcp_context m; 1429 m.cpu = mctx->cpu; 1430 mtcp_free_context(&m); 1431 1432 free(mctx); 1433 1434 return 0; 1435 } 1436 /*----------------------------------------------------------------------------*/ 1437 /** 1438 * TODO: It currently always returns 0. 
Add appropriate error return values 1439 */ 1440 void 1441 mtcp_free_context(mctx_t mctx) 1442 { 1443 struct mtcp_thread_context *ctx = g_pctx[mctx->cpu]; 1444 struct mtcp_manager *mtcp = ctx->mtcp_manager; 1445 struct log_thread_context *log_ctx = mtcp->logger; 1446 int ret, i; 1447 1448 TRACE_DBG("CPU %d: mtcp_free_context()\n", mctx->cpu); 1449 1450 if (g_pctx[mctx->cpu] == NULL) return; 1451 /* close all stream sockets that are still open */ 1452 if (!ctx->exit) { 1453 for (i = 0; i < g_config.mos->max_concurrency; i++) { 1454 if (mtcp->smap[i].socktype == MOS_SOCK_STREAM) { 1455 TRACE_DBG("Closing remaining socket %d (%s)\n", 1456 i, TCPStateToString(mtcp->smap[i].stream)); 1457 #ifdef DUMP_STREAM 1458 DumpStream(mtcp, mtcp->smap[i].stream); 1459 #endif 1460 mtcp_close(mctx, i); 1461 } 1462 } 1463 } 1464 1465 ctx->done = 1; 1466 1467 //pthread_kill(g_thread[mctx->cpu], SIGINT); 1468 ctx->exit = 1; 1469 1470 if ((ret = pthread_join(g_thread[ctx->cpu], NULL) != 0)) { 1471 TRACE_ERROR("pthread_join() returns error (errno = %s)\n", strerror(ret)); 1472 exit(EXIT_FAILURE); 1473 } 1474 1475 TRACE_INFO("MTCP thread %d joined.\n", mctx->cpu); 1476 running[mctx->cpu] = FALSE; 1477 1478 #ifdef NETSTAT 1479 #if NETSTAT_TOTAL 1480 if (printer == mctx->cpu) { 1481 for (i = 0; i < num_cpus; i++) { 1482 if (i != mctx->cpu && running[i]) { 1483 printer = i; 1484 break; 1485 } 1486 } 1487 } 1488 #endif 1489 #endif 1490 1491 log_ctx->done = 1; 1492 ret = write(log_ctx->pair_sp_fd, "F", 1); 1493 if (ret != 1) 1494 TRACE_ERROR("CPU %d: Fail to signal socket pair\n", mctx->cpu); 1495 1496 if ((ret = pthread_join(log_thread[ctx->cpu], NULL) != 0)) { 1497 TRACE_ERROR("pthread_join() returns error (errno = %s)\n", strerror(ret)); 1498 exit(EXIT_FAILURE); 1499 } 1500 1501 fclose(mtcp->log_fp); 1502 TRACE_LOG("Log thread %d joined.\n", mctx->cpu); 1503 1504 if (mtcp->connectq) { 1505 DestroyStreamQueue(mtcp->connectq); 1506 mtcp->connectq = NULL; 1507 } 1508 if (mtcp->sendq) { 1509 
DestroyStreamQueue(mtcp->sendq); 1510 mtcp->sendq = NULL; 1511 } 1512 if (mtcp->ackq) { 1513 DestroyStreamQueue(mtcp->ackq); 1514 mtcp->ackq = NULL; 1515 } 1516 if (mtcp->closeq) { 1517 DestroyStreamQueue(mtcp->closeq); 1518 mtcp->closeq = NULL; 1519 } 1520 if (mtcp->closeq_int) { 1521 DestroyInternalStreamQueue(mtcp->closeq_int); 1522 mtcp->closeq_int = NULL; 1523 } 1524 if (mtcp->resetq) { 1525 DestroyStreamQueue(mtcp->resetq); 1526 mtcp->resetq = NULL; 1527 } 1528 if (mtcp->resetq_int) { 1529 DestroyInternalStreamQueue(mtcp->resetq_int); 1530 mtcp->resetq_int = NULL; 1531 } 1532 if (mtcp->destroyq) { 1533 DestroyStreamQueue(mtcp->destroyq); 1534 mtcp->destroyq = NULL; 1535 } 1536 1537 DestroyMTCPSender(mtcp->g_sender); 1538 for (i = 0; i < g_config.mos->netdev_table->num; i++) { 1539 DestroyMTCPSender(mtcp->n_sender[i]); 1540 } 1541 1542 MPDestroy(mtcp->rv_pool); 1543 MPDestroy(mtcp->sv_pool); 1544 MPDestroy(mtcp->flow_pool); 1545 1546 if (mtcp->ap) { 1547 DestroyAddressPool(mtcp->ap); 1548 mtcp->ap = NULL; 1549 } 1550 1551 SQ_LOCK_DESTROY(&ctx->connect_lock); 1552 SQ_LOCK_DESTROY(&ctx->close_lock); 1553 SQ_LOCK_DESTROY(&ctx->reset_lock); 1554 SQ_LOCK_DESTROY(&ctx->sendq_lock); 1555 SQ_LOCK_DESTROY(&ctx->ackq_lock); 1556 SQ_LOCK_DESTROY(&ctx->destroyq_lock); 1557 1558 //TRACE_INFO("MTCP thread %d destroyed.\n", mctx->cpu); 1559 if (mtcp->iom->destroy_handle) 1560 mtcp->iom->destroy_handle(ctx); 1561 1562 if (g_logctx[mctx->cpu]) { 1563 free(g_logctx[mctx->cpu]); 1564 g_logctx[mctx->cpu] = NULL; 1565 } 1566 free(ctx); 1567 g_pctx[mctx->cpu] = NULL; 1568 } 1569 /*----------------------------------------------------------------------------*/ 1570 mtcp_sighandler_t 1571 mtcp_register_signal(int signum, mtcp_sighandler_t handler) 1572 { 1573 mtcp_sighandler_t prev; 1574 1575 if (signum == SIGINT) { 1576 prev = app_signal_handler; 1577 app_signal_handler = handler; 1578 } else { 1579 if ((prev = signal(signum, handler)) == SIG_ERR) { 1580 perror("signal"); 1581 return 
SIG_ERR; 1582 } 1583 } 1584 1585 return prev; 1586 } 1587 /*----------------------------------------------------------------------------*/ 1588 int 1589 mtcp_getconf(struct mtcp_conf *conf) 1590 { 1591 int i, j; 1592 1593 if (!conf) { 1594 errno = EINVAL; 1595 return -1; 1596 } 1597 1598 conf->num_cores = g_config.mos->num_cores; 1599 conf->max_concurrency = g_config.mos->max_concurrency; 1600 conf->cpu_mask = g_config.mos->cpu_mask; 1601 1602 conf->rcvbuf_size = g_config.mos->rmem_size; 1603 conf->sndbuf_size = g_config.mos->wmem_size; 1604 1605 conf->tcp_timewait = g_config.mos->tcp_tw_interval; 1606 conf->tcp_timeout = g_config.mos->tcp_timeout; 1607 1608 i = 0; 1609 struct conf_block *bwalk; 1610 TAILQ_FOREACH(bwalk, &g_config.app_blkh, link) { 1611 struct app_conf *app_conf = (struct app_conf *)bwalk->conf; 1612 for (j = 0; j < app_conf->app_argc; j++) 1613 conf->app_argv[i][j] = app_conf->app_argv[j]; 1614 conf->app_argc[i] = app_conf->app_argc; 1615 conf->app_cpu_mask[i] = app_conf->cpu_mask; 1616 i++; 1617 } 1618 conf->num_app = i; 1619 1620 return 0; 1621 } 1622 /*----------------------------------------------------------------------------*/ 1623 int 1624 mtcp_setconf(const struct mtcp_conf *conf) 1625 { 1626 if (!conf) 1627 return -1; 1628 1629 g_config.mos->num_cores = conf->num_cores; 1630 g_config.mos->max_concurrency = conf->max_concurrency; 1631 1632 g_config.mos->rmem_size = conf->rcvbuf_size; 1633 g_config.mos->wmem_size = conf->sndbuf_size; 1634 1635 g_config.mos->tcp_tw_interval = conf->tcp_timewait; 1636 g_config.mos->tcp_timeout = conf->tcp_timeout; 1637 1638 TRACE_CONFIG("Configuration updated by mtcp_setconf().\n"); 1639 //PrintConfiguration(); 1640 1641 return 0; 1642 } 1643 /*----------------------------------------------------------------------------*/ 1644 int 1645 mtcp_init(const char *config_file) 1646 { 1647 int i; 1648 int ret; 1649 1650 if (geteuid()) { 1651 TRACE_CONFIG("[CAUTION] Run as root if mlock is necessary.\n"); 1652 #if 
defined(ENABLE_DPDK) || defined(ENABLE_NETMAP) 1653 TRACE_CONFIG("[CAUTION] Run the app as root!\n"); 1654 exit(EXIT_FAILURE); 1655 #endif 1656 } 1657 1658 /* getting cpu and NIC */ 1659 num_cpus = GetNumCPUs(); 1660 assert(num_cpus >= 1); 1661 for (i = 0; i < num_cpus; i++) { 1662 g_mtcp[i] = NULL; 1663 running[i] = FALSE; 1664 sigint_cnt[i] = 0; 1665 } 1666 1667 ret = LoadConfigurationUpperHalf(config_file); 1668 if (ret) { 1669 TRACE_CONFIG("Error occured while loading configuration.\n"); 1670 return -1; 1671 } 1672 1673 #if defined(ENABLE_PSIO) 1674 current_iomodule_func = &ps_module_func; 1675 #elif defined(ENABLE_DPDK) 1676 current_iomodule_func = &dpdk_module_func; 1677 #elif defined(ENABLE_PCAP) 1678 current_iomodule_func = &pcap_module_func; 1679 #elif defined(ENABLE_NETMAP) 1680 current_iomodule_func = &netmap_module_func; 1681 #endif 1682 1683 if (current_iomodule_func->load_module_upper_half) 1684 current_iomodule_func->load_module_upper_half(); 1685 1686 LoadConfigurationLowerHalf(); 1687 1688 //PrintConfiguration(); 1689 1690 for (i = 0; i < g_config.mos->netdev_table->num; i++) { 1691 ap[i] = CreateAddressPool(g_config.mos->netdev_table->ent[i]->ip_addr, 1); 1692 if (!ap[i]) { 1693 TRACE_CONFIG("Error occured while create address pool[%d]\n", 1694 i); 1695 return -1; 1696 } 1697 } 1698 1699 //PrintInterfaceInfo(); 1700 //PrintRoutingTable(); 1701 //PrintARPTable(); 1702 InitARPTable(); 1703 1704 if (signal(SIGUSR1, HandleSignal) == SIG_ERR) { 1705 perror("signal, SIGUSR1"); 1706 return -1; 1707 } 1708 if (signal(SIGINT, HandleSignal) == SIG_ERR) { 1709 perror("signal, SIGINT"); 1710 return -1; 1711 } 1712 app_signal_handler = NULL; 1713 1714 printf("load_module(): %p\n", current_iomodule_func); 1715 /* load system-wide io module specs */ 1716 if (current_iomodule_func->load_module_lower_half) 1717 current_iomodule_func->load_module_lower_half(); 1718 1719 GlobInitEvent(); 1720 1721 PrintConf(&g_config); 1722 1723 return 0; 1724 } 1725 
/*----------------------------------------------------------------------------*/ 1726 int 1727 mtcp_destroy() 1728 { 1729 int i; 1730 1731 /* wait until all threads are closed */ 1732 for (i = 0; i < num_cpus; i++) { 1733 if (running[i]) { 1734 if (pthread_join(g_thread[i], NULL) != 0) 1735 return -1; 1736 } 1737 } 1738 1739 for (i = 0; i < g_config.mos->netdev_table->num; i++) 1740 DestroyAddressPool(ap[i]); 1741 1742 TRACE_INFO("All MTCP threads are joined.\n"); 1743 1744 return 0; 1745 } 1746 /*----------------------------------------------------------------------------*/ 1747