176404edcSAsim Jamshed #define _GNU_SOURCE 276404edcSAsim Jamshed #include <sched.h> 376404edcSAsim Jamshed #include <unistd.h> 476404edcSAsim Jamshed #include <sys/time.h> 576404edcSAsim Jamshed #include <semaphore.h> 676404edcSAsim Jamshed #include <sys/mman.h> 776404edcSAsim Jamshed #include <signal.h> 876404edcSAsim Jamshed #include <assert.h> 976404edcSAsim Jamshed #include <string.h> 1076404edcSAsim Jamshed 1176404edcSAsim Jamshed #include "cpu.h" 1276404edcSAsim Jamshed #include "eth_in.h" 1376404edcSAsim Jamshed #include "fhash.h" 1476404edcSAsim Jamshed #include "tcp_send_buffer.h" 1576404edcSAsim Jamshed #include "tcp_ring_buffer.h" 1676404edcSAsim Jamshed #include "socket.h" 1776404edcSAsim Jamshed #include "eth_out.h" 1876404edcSAsim Jamshed #include "tcp.h" 1976404edcSAsim Jamshed #include "tcp_in.h" 2076404edcSAsim Jamshed #include "tcp_out.h" 2176404edcSAsim Jamshed #include "mtcp_api.h" 2276404edcSAsim Jamshed #include "eventpoll.h" 2376404edcSAsim Jamshed #include "logger.h" 2476404edcSAsim Jamshed #include "config.h" 2576404edcSAsim Jamshed #include "arp.h" 2676404edcSAsim Jamshed #include "ip_out.h" 2776404edcSAsim Jamshed #include "timer.h" 2876404edcSAsim Jamshed #include "debug.h" 2976404edcSAsim Jamshed #include "event_callback.h" 3076404edcSAsim Jamshed #include "tcp_rb.h" 3176404edcSAsim Jamshed #include "tcp_stream.h" 3276404edcSAsim Jamshed #include "io_module.h" 3376404edcSAsim Jamshed 3476404edcSAsim Jamshed #ifdef ENABLE_DPDK 3576404edcSAsim Jamshed /* for launching rte thread */ 3676404edcSAsim Jamshed #include <rte_launch.h> 3776404edcSAsim Jamshed #include <rte_lcore.h> 3876404edcSAsim Jamshed #endif /* !ENABLE_DPDK */ 3976404edcSAsim Jamshed #define PS_CHUNK_SIZE 64 4076404edcSAsim Jamshed #define RX_THRESH (PS_CHUNK_SIZE * 0.8) 4176404edcSAsim Jamshed 4276404edcSAsim Jamshed #define ROUND_STAT FALSE 4376404edcSAsim Jamshed #define TIME_STAT FALSE 4476404edcSAsim Jamshed #define EVENT_STAT FALSE 4576404edcSAsim Jamshed #define TESTING FALSE 4676404edcSAsim Jamshed 4776404edcSAsim Jamshed #define LOG_FILE_NAME "log" 4876404edcSAsim Jamshed #define MAX_FILE_NAME 1024 4976404edcSAsim Jamshed 5076404edcSAsim Jamshed #define MAX(a, b) ((a)>(b)?(a):(b)) 5176404edcSAsim Jamshed #define MIN(a, b) ((a)<(b)?(a):(b)) 5276404edcSAsim Jamshed 5376404edcSAsim Jamshed #define PER_STREAM_SLICE 0.1 // in ms 5476404edcSAsim Jamshed #define PER_STREAM_TCHECK 1 // in ms 5576404edcSAsim Jamshed #define PS_SELECT_TIMEOUT 100 // in us 5676404edcSAsim Jamshed 5776404edcSAsim Jamshed #define GBPS(bytes) (bytes * 8.0 / (1000 * 1000 * 1000)) 5876404edcSAsim Jamshed 5976404edcSAsim Jamshed /*----------------------------------------------------------------------------*/ 6076404edcSAsim Jamshed /* handlers for threads */ 6176404edcSAsim Jamshed struct mtcp_thread_context *g_pctx[MAX_CPUS] = {0}; 6276404edcSAsim Jamshed struct log_thread_context *g_logctx[MAX_CPUS] = {0}; 6376404edcSAsim Jamshed /*----------------------------------------------------------------------------*/ 6476404edcSAsim Jamshed static pthread_t g_thread[MAX_CPUS] = {0}; 6576404edcSAsim Jamshed static pthread_t log_thread[MAX_CPUS] = {0}; 6676404edcSAsim Jamshed /*----------------------------------------------------------------------------*/ 6776404edcSAsim Jamshed static sem_t g_init_sem[MAX_CPUS]; 6876404edcSAsim Jamshed static sem_t g_done_sem[MAX_CPUS]; 6976404edcSAsim Jamshed static int running[MAX_CPUS] = {0}; 7076404edcSAsim Jamshed /*----------------------------------------------------------------------------*/ 7176404edcSAsim Jamshed mtcp_sighandler_t app_signal_handler; 7276404edcSAsim Jamshed static int sigint_cnt[MAX_CPUS] = {0}; 7376404edcSAsim Jamshed static struct timespec sigint_ts[MAX_CPUS]; 7476404edcSAsim Jamshed /*----------------------------------------------------------------------------*/ 7576404edcSAsim Jamshed #ifdef NETSTAT 7676404edcSAsim Jamshed #if NETSTAT_TOTAL 7776404edcSAsim Jamshed static int printer = -1; 7876404edcSAsim Jamshed #if ROUND_STAT 7976404edcSAsim Jamshed #endif /* ROUND_STAT */ 8076404edcSAsim Jamshed #endif /* NETSTAT_TOTAL */ 8176404edcSAsim Jamshed #endif /* NETSTAT */ 8276404edcSAsim Jamshed /*----------------------------------------------------------------------------*/ 8376404edcSAsim Jamshed void 8476404edcSAsim Jamshed HandleSignal(int signal) 8576404edcSAsim Jamshed { 8676404edcSAsim Jamshed int i = 0; 8776404edcSAsim Jamshed 8876404edcSAsim Jamshed if (signal == SIGINT) { 898a941c7eSAsim Jamshed FreeConfigResources(); 9076404edcSAsim Jamshed #ifdef DARWIN 9176404edcSAsim Jamshed int core = 0; 9276404edcSAsim Jamshed #else 9376404edcSAsim Jamshed int core = sched_getcpu(); 9476404edcSAsim Jamshed #endif 9576404edcSAsim Jamshed struct timespec cur_ts; 9676404edcSAsim Jamshed 9776404edcSAsim Jamshed clock_gettime(CLOCK_REALTIME, &cur_ts); 9876404edcSAsim Jamshed 9976404edcSAsim Jamshed if (sigint_cnt[core] > 0 && cur_ts.tv_sec == sigint_ts[core].tv_sec) { 10076404edcSAsim Jamshed for (i = 0; i < g_config.mos->num_cores; i++) { 10176404edcSAsim Jamshed if (running[i]) { 10276404edcSAsim Jamshed exit(0); 10376404edcSAsim Jamshed g_pctx[i]->exit = TRUE; 10476404edcSAsim Jamshed } 10576404edcSAsim Jamshed } 10676404edcSAsim Jamshed } else { 10776404edcSAsim Jamshed for (i = 0; i < g_config.mos->num_cores; i++) { 10876404edcSAsim Jamshed if (g_pctx[i]) 10976404edcSAsim Jamshed g_pctx[i]->interrupt = TRUE; 11076404edcSAsim Jamshed } 11176404edcSAsim Jamshed if (!app_signal_handler) { 11276404edcSAsim Jamshed for (i = 0; i < g_config.mos->num_cores; i++) { 11376404edcSAsim Jamshed if (running[i]) { 11476404edcSAsim Jamshed exit(0); 11576404edcSAsim Jamshed g_pctx[i]->exit = TRUE; 11676404edcSAsim Jamshed } 11776404edcSAsim Jamshed } 11876404edcSAsim Jamshed } 11976404edcSAsim Jamshed } 12076404edcSAsim Jamshed sigint_cnt[core]++; 12176404edcSAsim Jamshed clock_gettime(CLOCK_REALTIME, &sigint_ts[core]); 12276404edcSAsim Jamshed } 12376404edcSAsim Jamshed 12476404edcSAsim Jamshed if (signal != SIGUSR1) { 12576404edcSAsim Jamshed if (app_signal_handler) { 12676404edcSAsim Jamshed app_signal_handler(signal); 12776404edcSAsim Jamshed } 12876404edcSAsim Jamshed } 12976404edcSAsim Jamshed } 13076404edcSAsim Jamshed /*----------------------------------------------------------------------------*/ 13176404edcSAsim Jamshed static int 13276404edcSAsim Jamshed AttachDevice(struct mtcp_thread_context* ctx) 13376404edcSAsim Jamshed { 13476404edcSAsim Jamshed int working = -1; 13576404edcSAsim Jamshed mtcp_manager_t mtcp = ctx->mtcp_manager; 13676404edcSAsim Jamshed 13776404edcSAsim Jamshed if (mtcp->iom->link_devices) 13876404edcSAsim Jamshed working = mtcp->iom->link_devices(ctx); 13976404edcSAsim Jamshed else 14076404edcSAsim Jamshed return 0; 14176404edcSAsim Jamshed 14276404edcSAsim Jamshed return working; 14376404edcSAsim Jamshed } 14476404edcSAsim Jamshed /*----------------------------------------------------------------------------*/ 14576404edcSAsim Jamshed #ifdef TIMESTAT 14676404edcSAsim Jamshed static inline void 14776404edcSAsim Jamshed InitStatCounter(struct stat_counter *counter) 14876404edcSAsim Jamshed { 14976404edcSAsim Jamshed counter->cnt = 0; 15076404edcSAsim Jamshed counter->sum = 0; 15176404edcSAsim Jamshed counter->max = 0; 15276404edcSAsim Jamshed counter->min = 0; 15376404edcSAsim Jamshed } 15476404edcSAsim Jamshed /*----------------------------------------------------------------------------*/ 15576404edcSAsim Jamshed static inline void 15676404edcSAsim Jamshed UpdateStatCounter(struct stat_counter *counter, int64_t value) 15776404edcSAsim Jamshed { 15876404edcSAsim Jamshed counter->cnt++; 15976404edcSAsim Jamshed counter->sum += value; 16076404edcSAsim Jamshed if (value > counter->max) 16176404edcSAsim Jamshed counter->max = value; 16276404edcSAsim Jamshed if (counter->min == 0 || value < counter->min) 16376404edcSAsim Jamshed counter->min = value; 16476404edcSAsim Jamshed } 16576404edcSAsim Jamshed /*----------------------------------------------------------------------------*/ 16676404edcSAsim Jamshed static inline uint64_t 16776404edcSAsim Jamshed GetAverageStat(struct stat_counter *counter) 16876404edcSAsim Jamshed { 16976404edcSAsim Jamshed return counter->cnt ? (counter->sum / counter->cnt) : 0; 17076404edcSAsim Jamshed } 17176404edcSAsim Jamshed /*----------------------------------------------------------------------------*/ 17276404edcSAsim Jamshed static inline int64_t 17376404edcSAsim Jamshed TimeDiffUs(struct timeval *t2, struct timeval *t1) 17476404edcSAsim Jamshed { 17576404edcSAsim Jamshed return (t2->tv_sec - t1->tv_sec) * 1000000 + 17676404edcSAsim Jamshed (int64_t)(t2->tv_usec - t1->tv_usec); 17776404edcSAsim Jamshed } 17876404edcSAsim Jamshed /*----------------------------------------------------------------------------*/ 17976404edcSAsim Jamshed #endif 18076404edcSAsim Jamshed #ifdef NETSTAT 18176404edcSAsim Jamshed static inline void 18276404edcSAsim Jamshed PrintThreadNetworkStats(mtcp_manager_t mtcp, struct net_stat *ns) 18376404edcSAsim Jamshed { 18476404edcSAsim Jamshed int i; 18576404edcSAsim Jamshed 18676404edcSAsim Jamshed for (i = 0; i < g_config.mos->netdev_table->num; i++) { 18776404edcSAsim Jamshed ns->rx_packets[i] = mtcp->nstat.rx_packets[i] - mtcp->p_nstat.rx_packets[i]; 18876404edcSAsim Jamshed ns->rx_errors[i] = mtcp->nstat.rx_errors[i] - mtcp->p_nstat.rx_errors[i]; 18976404edcSAsim Jamshed ns->rx_bytes[i] = mtcp->nstat.rx_bytes[i] - mtcp->p_nstat.rx_bytes[i]; 19076404edcSAsim Jamshed ns->tx_packets[i] = mtcp->nstat.tx_packets[i] - mtcp->p_nstat.tx_packets[i]; 19176404edcSAsim Jamshed ns->tx_drops[i] = mtcp->nstat.tx_drops[i] - mtcp->p_nstat.tx_drops[i]; 19276404edcSAsim Jamshed ns->tx_bytes[i] = mtcp->nstat.tx_bytes[i] - mtcp->p_nstat.tx_bytes[i]; 19376404edcSAsim Jamshed #if NETSTAT_PERTHREAD 19476404edcSAsim Jamshed if (g_config.mos->netdev_table->ent[i]->stat_print) { 19576404edcSAsim Jamshed fprintf(stderr, "[CPU%2d] %s flows: %6u, " 19676404edcSAsim Jamshed "RX: %7llu(pps) (err: %5llu), %5.2lf(Gbps), " 19776404edcSAsim Jamshed "TX: %7llu(pps), %5.2lf(Gbps)\n", 19876404edcSAsim Jamshed mtcp->ctx->cpu, 19976404edcSAsim Jamshed g_config.mos->netdev_table->ent[i]->dev_name, 20076404edcSAsim Jamshed (unsigned)mtcp->flow_cnt, 20176404edcSAsim Jamshed (long long unsigned)ns->rx_packets[i], 20276404edcSAsim Jamshed (long long unsigned)ns->rx_errors[i], 20376404edcSAsim Jamshed GBPS(ns->rx_bytes[i]), 20476404edcSAsim Jamshed (long long unsigned)ns->tx_packets[i], 20576404edcSAsim Jamshed GBPS(ns->tx_bytes[i])); 20676404edcSAsim Jamshed } 20776404edcSAsim Jamshed #endif 20876404edcSAsim Jamshed } 20976404edcSAsim Jamshed mtcp->p_nstat = mtcp->nstat; 21076404edcSAsim Jamshed 21176404edcSAsim Jamshed } 21276404edcSAsim Jamshed /*----------------------------------------------------------------------------*/ 21376404edcSAsim Jamshed #if ROUND_STAT 21476404edcSAsim Jamshed static inline void 21576404edcSAsim Jamshed PrintThreadRoundStats(mtcp_manager_t mtcp, struct run_stat *rs) 21676404edcSAsim Jamshed { 21776404edcSAsim Jamshed #define ROUND_DIV (1000) 21876404edcSAsim Jamshed rs->rounds = mtcp->runstat.rounds - mtcp->p_runstat.rounds; 21976404edcSAsim Jamshed rs->rounds_rx = mtcp->runstat.rounds_rx - mtcp->p_runstat.rounds_rx; 22076404edcSAsim Jamshed rs->rounds_rx_try = mtcp->runstat.rounds_rx_try - mtcp->p_runstat.rounds_rx_try; 22176404edcSAsim Jamshed rs->rounds_tx = mtcp->runstat.rounds_tx - mtcp->p_runstat.rounds_tx; 22276404edcSAsim Jamshed rs->rounds_tx_try = mtcp->runstat.rounds_tx_try - mtcp->p_runstat.rounds_tx_try; 22376404edcSAsim Jamshed rs->rounds_select = mtcp->runstat.rounds_select - mtcp->p_runstat.rounds_select; 22476404edcSAsim Jamshed rs->rounds_select_rx = mtcp->runstat.rounds_select_rx - mtcp->p_runstat.rounds_select_rx; 22576404edcSAsim Jamshed rs->rounds_select_tx = mtcp->runstat.rounds_select_tx - mtcp->p_runstat.rounds_select_tx; 22676404edcSAsim Jamshed rs->rounds_select_intr = mtcp->runstat.rounds_select_intr - mtcp->p_runstat.rounds_select_intr; 22776404edcSAsim Jamshed rs->rounds_twcheck = mtcp->runstat.rounds_twcheck - mtcp->p_runstat.rounds_twcheck; 22876404edcSAsim Jamshed mtcp->p_runstat = mtcp->runstat; 22976404edcSAsim Jamshed #if NETSTAT_PERTHREAD 23076404edcSAsim Jamshed fprintf(stderr, "[CPU%2d] Rounds: %4lluK, " 23176404edcSAsim Jamshed "rx: %3lluK (try: %4lluK), tx: %3lluK (try: %4lluK), " 23276404edcSAsim Jamshed "ps_select: %4llu (rx: %4llu, tx: %4llu, intr: %3llu)\n", 23376404edcSAsim Jamshed mtcp->ctx->cpu, rs->rounds / ROUND_DIV, 23476404edcSAsim Jamshed rs->rounds_rx / ROUND_DIV, rs->rounds_rx_try / ROUND_DIV, 23576404edcSAsim Jamshed rs->rounds_tx / ROUND_DIV, rs->rounds_tx_try / ROUND_DIV, 23676404edcSAsim Jamshed rs->rounds_select, 23776404edcSAsim Jamshed rs->rounds_select_rx, rs->rounds_select_tx, rs->rounds_select_intr); 23876404edcSAsim Jamshed #endif 23976404edcSAsim Jamshed } 24076404edcSAsim Jamshed #endif /* ROUND_STAT */ 24176404edcSAsim Jamshed /*----------------------------------------------------------------------------*/ 24276404edcSAsim Jamshed #if TIME_STAT 24376404edcSAsim Jamshed static inline void 24476404edcSAsim Jamshed PrintThreadRoundTime(mtcp_manager_t mtcp) 24576404edcSAsim Jamshed { 24676404edcSAsim Jamshed fprintf(stderr, "[CPU%2d] Time: (avg, max) " 24776404edcSAsim Jamshed "round: (%4luus, %4luus), processing: (%4luus, %4luus), " 24876404edcSAsim Jamshed "tcheck: (%4luus, %4luus), epoll: (%4luus, %4luus), " 24976404edcSAsim Jamshed "handle: (%4luus, %4luus), xmit: (%4luus, %4luus), " 25076404edcSAsim Jamshed "select: (%4luus, %4luus)\n", mtcp->ctx->cpu, 25176404edcSAsim Jamshed GetAverageStat(&mtcp->rtstat.round), mtcp->rtstat.round.max, 25276404edcSAsim Jamshed GetAverageStat(&mtcp->rtstat.processing), mtcp->rtstat.processing.max, 25376404edcSAsim Jamshed GetAverageStat(&mtcp->rtstat.tcheck), mtcp->rtstat.tcheck.max, 25476404edcSAsim Jamshed GetAverageStat(&mtcp->rtstat.epoll), mtcp->rtstat.epoll.max, 25576404edcSAsim Jamshed GetAverageStat(&mtcp->rtstat.handle), mtcp->rtstat.handle.max, 25676404edcSAsim Jamshed GetAverageStat(&mtcp->rtstat.xmit), mtcp->rtstat.xmit.max, 25776404edcSAsim Jamshed GetAverageStat(&mtcp->rtstat.select), mtcp->rtstat.select.max); 25876404edcSAsim Jamshed 25976404edcSAsim Jamshed InitStatCounter(&mtcp->rtstat.round); 26076404edcSAsim Jamshed InitStatCounter(&mtcp->rtstat.processing); 26176404edcSAsim Jamshed InitStatCounter(&mtcp->rtstat.tcheck); 26276404edcSAsim Jamshed InitStatCounter(&mtcp->rtstat.epoll); 26376404edcSAsim Jamshed InitStatCounter(&mtcp->rtstat.handle); 26476404edcSAsim Jamshed InitStatCounter(&mtcp->rtstat.xmit); 26576404edcSAsim Jamshed InitStatCounter(&mtcp->rtstat.select); 26676404edcSAsim Jamshed } 26776404edcSAsim Jamshed #endif 26876404edcSAsim Jamshed #endif /* NETSTAT */ 26976404edcSAsim Jamshed /*----------------------------------------------------------------------------*/ 27076404edcSAsim Jamshed #if EVENT_STAT 27176404edcSAsim Jamshed static inline void 27276404edcSAsim Jamshed PrintEventStat(int core, struct mtcp_epoll_stat *stat) 27376404edcSAsim Jamshed { 27476404edcSAsim Jamshed fprintf(stderr, "[CPU%2d] calls: %lu, waits: %lu, wakes: %lu, " 27576404edcSAsim Jamshed "issued: %lu, registered: %lu, invalidated: %lu, handled: %lu\n", 27676404edcSAsim Jamshed core, stat->calls, stat->waits, stat->wakes, 27776404edcSAsim Jamshed stat->issued, stat->registered, stat->invalidated, stat->handled); 27876404edcSAsim Jamshed memset(stat, 0, sizeof(struct mtcp_epoll_stat)); 27976404edcSAsim Jamshed } 28076404edcSAsim Jamshed #endif /* EVENT_STAT */ 28176404edcSAsim Jamshed /*----------------------------------------------------------------------------*/ 28276404edcSAsim Jamshed #ifdef NETSTAT 28376404edcSAsim Jamshed static inline void 28476404edcSAsim Jamshed PrintNetworkStats(mtcp_manager_t mtcp, uint32_t cur_ts) 28576404edcSAsim Jamshed { 28676404edcSAsim Jamshed #define TIMEOUT 1 28776404edcSAsim Jamshed int i; 28876404edcSAsim Jamshed struct net_stat ns; 28976404edcSAsim Jamshed bool stat_print = false; 29076404edcSAsim Jamshed #if ROUND_STAT 29176404edcSAsim Jamshed struct run_stat rs; 29276404edcSAsim Jamshed #endif /* ROUND_STAT */ 29376404edcSAsim Jamshed #ifdef NETSTAT_TOTAL 29476404edcSAsim Jamshed static double peak_total_rx_gbps = 0; 29576404edcSAsim Jamshed static double peak_total_tx_gbps = 0; 29676404edcSAsim Jamshed static double avg_total_rx_gbps = 0; 29776404edcSAsim Jamshed static double avg_total_tx_gbps = 0; 29876404edcSAsim Jamshed 29976404edcSAsim Jamshed double total_rx_gbps = 0, total_tx_gbps = 0; 30076404edcSAsim Jamshed int j; 30176404edcSAsim Jamshed uint32_t gflow_cnt = 0; 30276404edcSAsim Jamshed struct net_stat g_nstat; 30376404edcSAsim Jamshed #if ROUND_STAT 30476404edcSAsim Jamshed struct run_stat g_runstat; 30576404edcSAsim Jamshed #endif /* ROUND_STAT */ 30676404edcSAsim Jamshed #endif /* NETSTAT_TOTAL */ 30776404edcSAsim Jamshed 30876404edcSAsim Jamshed if (TS_TO_MSEC(cur_ts - mtcp->p_nstat_ts) < SEC_TO_MSEC(TIMEOUT)) { 30976404edcSAsim Jamshed return; 31076404edcSAsim Jamshed } 31176404edcSAsim Jamshed 31276404edcSAsim Jamshed mtcp->p_nstat_ts = cur_ts; 31376404edcSAsim Jamshed gflow_cnt = 0; 31476404edcSAsim Jamshed memset(&g_nstat, 0, sizeof(struct net_stat)); 31576404edcSAsim Jamshed for (i = 0; i < g_config.mos->num_cores; i++) { 31676404edcSAsim Jamshed if (running[i]) { 31776404edcSAsim Jamshed PrintThreadNetworkStats(g_mtcp[i], &ns); 31876404edcSAsim Jamshed #if NETSTAT_TOTAL 31976404edcSAsim Jamshed gflow_cnt += g_mtcp[i]->flow_cnt; 32076404edcSAsim Jamshed for (j = 0; j < g_config.mos->netdev_table->num; j++) { 32176404edcSAsim Jamshed g_nstat.rx_packets[j] += ns.rx_packets[j]; 32276404edcSAsim Jamshed g_nstat.rx_errors[j] += ns.rx_errors[j]; 32376404edcSAsim Jamshed g_nstat.rx_bytes[j] += ns.rx_bytes[j]; 32476404edcSAsim Jamshed g_nstat.tx_packets[j] += ns.tx_packets[j]; 32576404edcSAsim Jamshed g_nstat.tx_drops[j] += ns.tx_drops[j]; 32676404edcSAsim Jamshed g_nstat.tx_bytes[j] += ns.tx_bytes[j]; 32776404edcSAsim Jamshed } 32876404edcSAsim Jamshed #endif 32976404edcSAsim Jamshed } 33076404edcSAsim Jamshed } 33176404edcSAsim Jamshed #if NETSTAT_TOTAL 33276404edcSAsim Jamshed for (i = 0; i < g_config.mos->netdev_table->num; i++) { 33376404edcSAsim Jamshed if (g_config.mos->netdev_table->ent[i]->stat_print) { 33476404edcSAsim Jamshed fprintf(stderr, "[ ALL ] %s, " 33576404edcSAsim Jamshed "RX: %7llu(pps) (err: %5llu), %5.2lf(Gbps), " 33676404edcSAsim Jamshed "TX: %7llu(pps), %5.2lf(Gbps)\n", 33776404edcSAsim Jamshed g_config.mos->netdev_table->ent[i]->dev_name, 33876404edcSAsim Jamshed (long long unsigned)g_nstat.rx_packets[i], 33976404edcSAsim Jamshed (long long unsigned)g_nstat.rx_errors[i], 34076404edcSAsim Jamshed GBPS(g_nstat.rx_bytes[i]), 34176404edcSAsim Jamshed (long long unsigned)g_nstat.tx_packets[i], 34276404edcSAsim Jamshed GBPS(g_nstat.tx_bytes[i])); 34376404edcSAsim Jamshed total_rx_gbps += GBPS(g_nstat.rx_bytes[i]); 34476404edcSAsim Jamshed total_tx_gbps += GBPS(g_nstat.tx_bytes[i]); 34576404edcSAsim Jamshed stat_print = true; 34676404edcSAsim Jamshed } 34776404edcSAsim Jamshed } 34876404edcSAsim Jamshed if (stat_print) { 34976404edcSAsim Jamshed fprintf(stderr, "[ ALL ] flows: %6u\n", gflow_cnt); 35076404edcSAsim Jamshed if (avg_total_rx_gbps == 0) 35176404edcSAsim Jamshed avg_total_rx_gbps = total_rx_gbps; 35276404edcSAsim Jamshed else 35376404edcSAsim Jamshed avg_total_rx_gbps = avg_total_rx_gbps * 0.6 + total_rx_gbps * 0.4; 35476404edcSAsim Jamshed 35576404edcSAsim Jamshed if (avg_total_tx_gbps == 0) 35676404edcSAsim Jamshed avg_total_tx_gbps = total_tx_gbps; 35776404edcSAsim Jamshed else 35876404edcSAsim Jamshed avg_total_tx_gbps = avg_total_tx_gbps * 0.6 + total_tx_gbps * 0.4; 35976404edcSAsim Jamshed 36076404edcSAsim Jamshed if (peak_total_rx_gbps < total_rx_gbps) 36176404edcSAsim Jamshed peak_total_rx_gbps = total_rx_gbps; 36276404edcSAsim Jamshed if (peak_total_tx_gbps < total_tx_gbps) 36376404edcSAsim Jamshed peak_total_tx_gbps = total_tx_gbps; 36476404edcSAsim Jamshed 36576404edcSAsim Jamshed fprintf(stderr, "[ PEAK ] RX: %5.2lf(Gbps), TX: %5.2lf(Gbps)\n" 36676404edcSAsim Jamshed "[ RECENT AVG ] RX: %5.2lf(Gbps), TX: %5.2lf(Gbps)\n", 36776404edcSAsim Jamshed peak_total_rx_gbps, peak_total_tx_gbps, 36876404edcSAsim Jamshed avg_total_rx_gbps, avg_total_tx_gbps); 36976404edcSAsim Jamshed } 37076404edcSAsim Jamshed #endif 37176404edcSAsim Jamshed 37276404edcSAsim Jamshed #if ROUND_STAT 37376404edcSAsim Jamshed memset(&g_runstat, 0, sizeof(struct run_stat)); 37476404edcSAsim Jamshed for (i = 0; i < g_config.mos->num_cores; i++) { 37576404edcSAsim Jamshed if (running[i]) { 37676404edcSAsim Jamshed PrintThreadRoundStats(g_mtcp[i], &rs); 37776404edcSAsim Jamshed #if DBGMSG 37876404edcSAsim Jamshed g_runstat.rounds += rs.rounds; 37976404edcSAsim Jamshed g_runstat.rounds_rx += rs.rounds_rx; 38076404edcSAsim Jamshed g_runstat.rounds_rx_try += rs.rounds_rx_try; 38176404edcSAsim Jamshed g_runstat.rounds_tx += rs.rounds_tx; 38276404edcSAsim Jamshed g_runstat.rounds_tx_try += rs.rounds_tx_try; 38376404edcSAsim Jamshed g_runstat.rounds_select += rs.rounds_select; 38476404edcSAsim Jamshed g_runstat.rounds_select_rx += rs.rounds_select_rx; 38576404edcSAsim Jamshed g_runstat.rounds_select_tx += rs.rounds_select_tx; 38676404edcSAsim Jamshed #endif 38776404edcSAsim Jamshed } 38876404edcSAsim Jamshed } 38976404edcSAsim Jamshed 39076404edcSAsim Jamshed TRACE_DBG("[ ALL ] Rounds: %4ldK, " 39176404edcSAsim Jamshed "rx: %3ldK (try: %4ldK), tx: %3ldK (try: %4ldK), " 39276404edcSAsim Jamshed "ps_select: %4ld (rx: %4ld, tx: %4ld)\n", 39376404edcSAsim Jamshed g_runstat.rounds / 1000, g_runstat.rounds_rx / 1000, 39476404edcSAsim Jamshed g_runstat.rounds_rx_try / 1000, g_runstat.rounds_tx / 1000, 39576404edcSAsim Jamshed g_runstat.rounds_tx_try / 1000, g_runstat.rounds_select, 39676404edcSAsim Jamshed g_runstat.rounds_select_rx, g_runstat.rounds_select_tx); 39776404edcSAsim Jamshed #endif /* ROUND_STAT */ 39876404edcSAsim Jamshed 39976404edcSAsim Jamshed #if TIME_STAT 40076404edcSAsim Jamshed for (i = 0; i < g_config.mos->num_cores; i++) { 40176404edcSAsim Jamshed if (running[i]) { 40276404edcSAsim Jamshed PrintThreadRoundTime(g_mtcp[i]); 40376404edcSAsim Jamshed } 40476404edcSAsim Jamshed } 40576404edcSAsim Jamshed #endif 40676404edcSAsim Jamshed 40776404edcSAsim Jamshed #if EVENT_STAT 40876404edcSAsim Jamshed for (i = 0; i < g_config.mos->num_cores; i++) { 40976404edcSAsim Jamshed if (running[i] && g_mtcp[i]->ep) { 41076404edcSAsim Jamshed PrintEventStat(i, &g_mtcp[i]->ep->stat); 41176404edcSAsim Jamshed } 41276404edcSAsim Jamshed } 41376404edcSAsim Jamshed #endif 41476404edcSAsim Jamshed 41576404edcSAsim Jamshed fflush(stderr); 41676404edcSAsim Jamshed } 41776404edcSAsim Jamshed #endif /* NETSTAT */ 41876404edcSAsim Jamshed /*----------------------------------------------------------------------------*/ 41976404edcSAsim Jamshed static inline void 42076404edcSAsim Jamshed FlushMonitorReadEvents(mtcp_manager_t mtcp) 42176404edcSAsim Jamshed { 42276404edcSAsim Jamshed struct event_queue *mtcpq; 42376404edcSAsim Jamshed struct tcp_stream *cur_stream; 42476404edcSAsim Jamshed struct mon_listener *walk; 42576404edcSAsim Jamshed 42676404edcSAsim Jamshed /* check if monitor sockets should be passed data */ 42776404edcSAsim Jamshed TAILQ_FOREACH(walk, &mtcp->monitors, link) { 42876404edcSAsim Jamshed if (walk->socket->socktype != MOS_SOCK_MONITOR_STREAM || 42976404edcSAsim Jamshed !(mtcpq = walk->eq)) 43076404edcSAsim Jamshed continue; 43176404edcSAsim Jamshed 43276404edcSAsim Jamshed while (mtcpq->num_events > 0) { 43376404edcSAsim Jamshed cur_stream = 43476404edcSAsim Jamshed (struct tcp_stream *)mtcpq->events[mtcpq->start++].ev.data.ptr; 43576404edcSAsim Jamshed /* only read events */ 43676404edcSAsim Jamshed if (cur_stream != NULL && 437c6a5549bSAsim Jamshed (cur_stream->actions & MOS_ACT_READ_DATA)) { 43876404edcSAsim Jamshed if (cur_stream->rcvvar != NULL && 43976404edcSAsim Jamshed cur_stream->rcvvar->rcvbuf != NULL) { 44076404edcSAsim Jamshed /* no need to pass pkt context */ 44176404edcSAsim Jamshed struct socket_map *walk; 44276404edcSAsim Jamshed SOCKQ_FOREACH_START(walk, &cur_stream->msocks) { 44376404edcSAsim Jamshed HandleCallback(mtcp, MOS_NULL, walk, 44476404edcSAsim Jamshed cur_stream->side, NULL, 44576404edcSAsim Jamshed MOS_ON_CONN_NEW_DATA); 44676404edcSAsim Jamshed } SOCKQ_FOREACH_END; 44776404edcSAsim Jamshed } 44876404edcSAsim Jamshed /* reset the actions now */ 44976404edcSAsim Jamshed cur_stream->actions = 0; 45076404edcSAsim Jamshed } 45176404edcSAsim Jamshed if (mtcpq->start >= mtcpq->size) 45276404edcSAsim Jamshed mtcpq->start = 0; 45376404edcSAsim Jamshed mtcpq->num_events--; 45476404edcSAsim Jamshed } 45576404edcSAsim Jamshed } 45676404edcSAsim Jamshed } 45776404edcSAsim Jamshed /*----------------------------------------------------------------------------*/ 45876404edcSAsim Jamshed static inline void 45976404edcSAsim Jamshed FlushBufferedReadEvents(mtcp_manager_t mtcp) 46076404edcSAsim Jamshed { 46176404edcSAsim Jamshed int i; 46276404edcSAsim Jamshed int offset; 46376404edcSAsim Jamshed struct event_queue *mtcpq; 46476404edcSAsim Jamshed struct tcp_stream *cur_stream; 46576404edcSAsim Jamshed 46676404edcSAsim Jamshed if (mtcp->ep == NULL) { 46776404edcSAsim Jamshed TRACE_EPOLL("No epoll socket has been registered yet!\n"); 46876404edcSAsim Jamshed return; 46976404edcSAsim Jamshed } else { 47076404edcSAsim Jamshed /* case when mtcpq exists */ 47176404edcSAsim Jamshed mtcpq = mtcp->ep->mtcp_queue; 47276404edcSAsim Jamshed offset = mtcpq->start; 47376404edcSAsim Jamshed } 47476404edcSAsim Jamshed 47576404edcSAsim Jamshed /* we will use queued-up epoll read-in events 47676404edcSAsim Jamshed * to trigger buffered read monitor events */ 47776404edcSAsim Jamshed for (i = 0; i < mtcpq->num_events; i++) { 47876404edcSAsim Jamshed cur_stream = mtcp->smap[mtcpq->events[offset++].sockid].stream; 47976404edcSAsim Jamshed /* only read events */ 48076404edcSAsim Jamshed /* Raise new data callback event */ 48176404edcSAsim Jamshed if (cur_stream != NULL && 48276404edcSAsim Jamshed (cur_stream->socket->events | MOS_EPOLLIN)) { 48376404edcSAsim Jamshed if (cur_stream->rcvvar != NULL && 48476404edcSAsim Jamshed cur_stream->rcvvar->rcvbuf != NULL) { 48576404edcSAsim Jamshed /* no need to pass pkt context */ 48676404edcSAsim Jamshed struct socket_map *walk; 48776404edcSAsim Jamshed SOCKQ_FOREACH_START(walk, &cur_stream->msocks) { 48876404edcSAsim Jamshed HandleCallback(mtcp, MOS_NULL, walk, cur_stream->side, 48976404edcSAsim Jamshed NULL, MOS_ON_CONN_NEW_DATA); 49076404edcSAsim Jamshed } SOCKQ_FOREACH_END; 49176404edcSAsim Jamshed } 49276404edcSAsim Jamshed } 49376404edcSAsim Jamshed if (offset >= mtcpq->size) 49476404edcSAsim Jamshed offset = 0; 49576404edcSAsim Jamshed } 49676404edcSAsim Jamshed } 49776404edcSAsim Jamshed /*----------------------------------------------------------------------------*/ 49876404edcSAsim Jamshed static inline void 49976404edcSAsim Jamshed FlushEpollEvents(mtcp_manager_t mtcp, uint32_t cur_ts) 50076404edcSAsim Jamshed { 50176404edcSAsim Jamshed struct mtcp_epoll *ep = mtcp->ep; 50276404edcSAsim Jamshed struct event_queue *usrq = ep->usr_queue; 50376404edcSAsim Jamshed struct event_queue *mtcpq = ep->mtcp_queue; 50476404edcSAsim Jamshed 50576404edcSAsim Jamshed pthread_mutex_lock(&ep->epoll_lock); 50676404edcSAsim Jamshed if (ep->mtcp_queue->num_events > 0) { 50776404edcSAsim Jamshed /* while mtcp_queue have events */ 50876404edcSAsim Jamshed /* and usr_queue is not full */ 50976404edcSAsim Jamshed while (mtcpq->num_events > 0 && usrq->num_events < usrq->size) { 51076404edcSAsim Jamshed /* copy the event from mtcp_queue to usr_queue */ 51176404edcSAsim Jamshed usrq->events[usrq->end++] = mtcpq->events[mtcpq->start++]; 51276404edcSAsim Jamshed 51376404edcSAsim Jamshed if (usrq->end >= usrq->size) 51476404edcSAsim Jamshed usrq->end = 0; 51576404edcSAsim Jamshed usrq->num_events++; 51676404edcSAsim Jamshed 51776404edcSAsim Jamshed if (mtcpq->start >= mtcpq->size) 51876404edcSAsim Jamshed mtcpq->start = 0; 51976404edcSAsim Jamshed mtcpq->num_events--; 52076404edcSAsim Jamshed } 52176404edcSAsim Jamshed } 52276404edcSAsim Jamshed 52376404edcSAsim Jamshed /* if there are pending events, wake up user */ 52476404edcSAsim Jamshed if (ep->waiting && (ep->usr_queue->num_events > 0 || 52576404edcSAsim Jamshed ep->usr_shadow_queue->num_events > 0)) { 52676404edcSAsim Jamshed STAT_COUNT(mtcp->runstat.rounds_epoll); 52776404edcSAsim Jamshed TRACE_EPOLL("Broadcasting events. num: %d, cur_ts: %u, prev_ts: %u\n", 52876404edcSAsim Jamshed ep->usr_queue->num_events, cur_ts, mtcp->ts_last_event); 52976404edcSAsim Jamshed mtcp->ts_last_event = cur_ts; 53076404edcSAsim Jamshed ep->stat.wakes++; 53176404edcSAsim Jamshed pthread_cond_signal(&ep->epoll_cond); 53276404edcSAsim Jamshed } 53376404edcSAsim Jamshed pthread_mutex_unlock(&ep->epoll_lock); 53476404edcSAsim Jamshed } 53576404edcSAsim Jamshed /*----------------------------------------------------------------------------*/ 53676404edcSAsim Jamshed static inline void 53776404edcSAsim Jamshed HandleApplicationCalls(mtcp_manager_t mtcp, uint32_t cur_ts) 53876404edcSAsim Jamshed { 53976404edcSAsim Jamshed tcp_stream *stream; 54076404edcSAsim Jamshed int cnt, max_cnt; 54176404edcSAsim Jamshed int handled, delayed; 54276404edcSAsim Jamshed int control, send, ack; 54376404edcSAsim Jamshed 54476404edcSAsim Jamshed /* connect handling */ 54576404edcSAsim Jamshed while ((stream = StreamDequeue(mtcp->connectq))) { 54676404edcSAsim Jamshed if (stream->state != TCP_ST_SYN_SENT) { 54776404edcSAsim Jamshed TRACE_INFO("Got a connection request from app with state: %s", 54876404edcSAsim Jamshed TCPStateToString(stream)); 54976404edcSAsim Jamshed exit(EXIT_FAILURE); 55076404edcSAsim Jamshed } else { 55176404edcSAsim Jamshed stream->cb_events |= MOS_ON_CONN_START | 55276404edcSAsim Jamshed MOS_ON_TCP_STATE_CHANGE; 55376404edcSAsim Jamshed /* if monitor is on... */ 55476404edcSAsim Jamshed if (stream->pair_stream != NULL) 55576404edcSAsim Jamshed stream->pair_stream->cb_events |= 55676404edcSAsim Jamshed MOS_ON_CONN_START; 55776404edcSAsim Jamshed } 55876404edcSAsim Jamshed AddtoControlList(mtcp, stream, cur_ts); 55976404edcSAsim Jamshed } 56076404edcSAsim Jamshed 56176404edcSAsim Jamshed /* send queue handling */ 56276404edcSAsim Jamshed while ((stream = StreamDequeue(mtcp->sendq))) { 56376404edcSAsim Jamshed stream->sndvar->on_sendq = FALSE; 56476404edcSAsim Jamshed AddtoSendList(mtcp, stream); 56576404edcSAsim Jamshed } 56676404edcSAsim Jamshed 56776404edcSAsim Jamshed /* ack queue handling */ 56876404edcSAsim Jamshed while ((stream = StreamDequeue(mtcp->ackq))) { 56976404edcSAsim Jamshed stream->sndvar->on_ackq = FALSE; 57076404edcSAsim Jamshed EnqueueACK(mtcp, stream, cur_ts, ACK_OPT_AGGREGATE); 57176404edcSAsim Jamshed } 57276404edcSAsim Jamshed 57376404edcSAsim Jamshed /* close handling */ 57476404edcSAsim Jamshed handled = delayed = 0; 57576404edcSAsim Jamshed control = send = ack = 0; 57676404edcSAsim Jamshed while ((stream = StreamDequeue(mtcp->closeq))) { 57776404edcSAsim Jamshed struct tcp_send_vars *sndvar = stream->sndvar; 57876404edcSAsim Jamshed sndvar->on_closeq = FALSE; 57976404edcSAsim Jamshed 58076404edcSAsim Jamshed if (sndvar->sndbuf) { 58176404edcSAsim Jamshed sndvar->fss = sndvar->sndbuf->head_seq + sndvar->sndbuf->len; 58276404edcSAsim Jamshed } else { 58376404edcSAsim Jamshed sndvar->fss = stream->snd_nxt; 58476404edcSAsim Jamshed } 58576404edcSAsim Jamshed 58676404edcSAsim Jamshed if (g_config.mos->tcp_timeout > 0) 58776404edcSAsim Jamshed RemoveFromTimeoutList(mtcp, stream); 58876404edcSAsim Jamshed 58976404edcSAsim Jamshed if (stream->have_reset) { 59076404edcSAsim Jamshed handled++; 59176404edcSAsim Jamshed if (stream->state != TCP_ST_CLOSED_RSVD) { 59276404edcSAsim Jamshed stream->close_reason = TCP_RESET; 59376404edcSAsim Jamshed stream->state = TCP_ST_CLOSED_RSVD; 59476404edcSAsim Jamshed stream->cb_events |= MOS_ON_TCP_STATE_CHANGE; 59576404edcSAsim Jamshed TRACE_STATE("Stream %d: TCP_ST_CLOSED_RSVD\n", stream->id); 59676404edcSAsim Jamshed DestroyTCPStream(mtcp, stream); 59776404edcSAsim Jamshed } else { 59876404edcSAsim Jamshed TRACE_ERROR("Stream already closed.\n"); 59976404edcSAsim Jamshed } 60076404edcSAsim Jamshed 60176404edcSAsim Jamshed } else if (sndvar->on_control_list) { 60276404edcSAsim Jamshed sndvar->on_closeq_int = TRUE; 60376404edcSAsim Jamshed StreamInternalEnqueue(mtcp->closeq_int, stream); 60476404edcSAsim Jamshed delayed++; 60576404edcSAsim Jamshed if (sndvar->on_control_list) 60676404edcSAsim Jamshed control++; 60776404edcSAsim Jamshed if (sndvar->on_send_list) 60876404edcSAsim Jamshed send++; 60976404edcSAsim Jamshed if (sndvar->on_ack_list) 61076404edcSAsim Jamshed ack++; 61176404edcSAsim Jamshed 61276404edcSAsim Jamshed } else if (sndvar->on_send_list || sndvar->on_ack_list) { 61376404edcSAsim Jamshed handled++; 61476404edcSAsim Jamshed if (stream->state == TCP_ST_ESTABLISHED) { 61576404edcSAsim Jamshed stream->state = TCP_ST_FIN_WAIT_1; 61676404edcSAsim Jamshed stream->cb_events |= MOS_ON_TCP_STATE_CHANGE; 61776404edcSAsim Jamshed TRACE_STATE("Stream %d: TCP_ST_FIN_WAIT_1\n", stream->id); 61876404edcSAsim Jamshed 61976404edcSAsim Jamshed } else if (stream->state == TCP_ST_CLOSE_WAIT) { 62076404edcSAsim Jamshed stream->state = TCP_ST_LAST_ACK; 62176404edcSAsim Jamshed stream->cb_events |= MOS_ON_TCP_STATE_CHANGE; 62276404edcSAsim Jamshed TRACE_STATE("Stream %d: TCP_ST_LAST_ACK\n", stream->id); 62376404edcSAsim Jamshed } 62476404edcSAsim Jamshed stream->control_list_waiting = TRUE; 62576404edcSAsim Jamshed 62676404edcSAsim Jamshed } else if (stream->state != TCP_ST_CLOSED_RSVD) { 62776404edcSAsim Jamshed handled++; 62876404edcSAsim Jamshed if (stream->state == TCP_ST_ESTABLISHED) { 62976404edcSAsim Jamshed stream->state = TCP_ST_FIN_WAIT_1; 63076404edcSAsim Jamshed stream->cb_events |= MOS_ON_TCP_STATE_CHANGE; 63176404edcSAsim Jamshed TRACE_STATE("Stream %d: TCP_ST_FIN_WAIT_1\n", stream->id); 63276404edcSAsim Jamshed 63376404edcSAsim Jamshed } else if (stream->state == TCP_ST_CLOSE_WAIT) { 63476404edcSAsim Jamshed stream->state = TCP_ST_LAST_ACK; 63576404edcSAsim Jamshed stream->cb_events |= MOS_ON_TCP_STATE_CHANGE; 63676404edcSAsim Jamshed TRACE_STATE("Stream %d: TCP_ST_LAST_ACK\n", stream->id); 63776404edcSAsim Jamshed } 63876404edcSAsim Jamshed //sndvar->rto = TCP_FIN_RTO; 63976404edcSAsim Jamshed //UpdateRetransmissionTimer(mtcp, stream, mtcp->cur_ts); 64076404edcSAsim Jamshed AddtoControlList(mtcp, stream, cur_ts); 64176404edcSAsim Jamshed } else { 64276404edcSAsim Jamshed TRACE_ERROR("Already closed connection!\n"); 64376404edcSAsim Jamshed } 64476404edcSAsim Jamshed } 64576404edcSAsim Jamshed TRACE_ROUND("Handling close connections. cnt: %d\n", cnt); 64676404edcSAsim Jamshed 64776404edcSAsim Jamshed cnt = 0; 64876404edcSAsim Jamshed max_cnt = mtcp->closeq_int->count; 64976404edcSAsim Jamshed while (cnt++ < max_cnt) { 65076404edcSAsim Jamshed stream = StreamInternalDequeue(mtcp->closeq_int); 65176404edcSAsim Jamshed 65276404edcSAsim Jamshed if (stream->sndvar->on_control_list) { 65376404edcSAsim Jamshed StreamInternalEnqueue(mtcp->closeq_int, stream); 65476404edcSAsim Jamshed 65576404edcSAsim Jamshed } else if (stream->state != TCP_ST_CLOSED_RSVD) { 65676404edcSAsim Jamshed handled++; 65776404edcSAsim Jamshed stream->sndvar->on_closeq_int = FALSE; 65876404edcSAsim Jamshed if (stream->state == TCP_ST_ESTABLISHED) { 65976404edcSAsim Jamshed stream->state = TCP_ST_FIN_WAIT_1; 66076404edcSAsim Jamshed stream->cb_events |= MOS_ON_TCP_STATE_CHANGE; 66176404edcSAsim Jamshed TRACE_STATE("Stream %d: TCP_ST_FIN_WAIT_1\n", stream->id); 66276404edcSAsim Jamshed 66376404edcSAsim Jamshed } else if (stream->state == TCP_ST_CLOSE_WAIT) { 66476404edcSAsim Jamshed stream->state = TCP_ST_LAST_ACK; 66576404edcSAsim Jamshed stream->cb_events |= MOS_ON_TCP_STATE_CHANGE; 66676404edcSAsim Jamshed TRACE_STATE("Stream %d: TCP_ST_LAST_ACK\n", stream->id); 66776404edcSAsim Jamshed } 66876404edcSAsim Jamshed AddtoControlList(mtcp, stream, cur_ts); 66976404edcSAsim Jamshed } else { 67076404edcSAsim Jamshed stream->sndvar->on_closeq_int = FALSE; 67176404edcSAsim Jamshed TRACE_ERROR("Already closed connection!\n"); 67276404edcSAsim Jamshed } 67376404edcSAsim Jamshed } 67476404edcSAsim Jamshed 67576404edcSAsim Jamshed /* reset handling */ 67676404edcSAsim Jamshed while ((stream = StreamDequeue(mtcp->resetq))) { 67776404edcSAsim Jamshed stream->sndvar->on_resetq = FALSE; 67876404edcSAsim Jamshed 67976404edcSAsim Jamshed if (g_config.mos->tcp_timeout > 0) 68076404edcSAsim Jamshed RemoveFromTimeoutList(mtcp, stream); 68176404edcSAsim Jamshed 68276404edcSAsim Jamshed if (stream->have_reset) { 68376404edcSAsim Jamshed if (stream->state != TCP_ST_CLOSED_RSVD) { 68476404edcSAsim Jamshed stream->close_reason = TCP_RESET; 68576404edcSAsim Jamshed stream->state = TCP_ST_CLOSED_RSVD; 68676404edcSAsim Jamshed stream->cb_events |= MOS_ON_TCP_STATE_CHANGE; 68776404edcSAsim Jamshed TRACE_STATE("Stream %d: TCP_ST_CLOSED_RSVD\n", stream->id); 68876404edcSAsim Jamshed DestroyTCPStream(mtcp, stream); 68976404edcSAsim Jamshed } else { 69076404edcSAsim Jamshed TRACE_ERROR("Stream already closed.\n"); 69176404edcSAsim Jamshed } 69276404edcSAsim Jamshed 69376404edcSAsim Jamshed } else if (stream->sndvar->on_control_list || 69476404edcSAsim Jamshed stream->sndvar->on_send_list || stream->sndvar->on_ack_list) { 69576404edcSAsim Jamshed /* wait until all the queues are flushed */ 69676404edcSAsim Jamshed stream->sndvar->on_resetq_int = TRUE; 69776404edcSAsim Jamshed StreamInternalEnqueue(mtcp->resetq_int, stream); 69876404edcSAsim Jamshed 69976404edcSAsim Jamshed } else { 70076404edcSAsim Jamshed if (stream->state != TCP_ST_CLOSED_RSVD) { 70176404edcSAsim Jamshed stream->close_reason = TCP_ACTIVE_CLOSE; 70276404edcSAsim Jamshed stream->state = TCP_ST_CLOSED_RSVD; 70376404edcSAsim Jamshed stream->cb_events |= MOS_ON_TCP_STATE_CHANGE; 70476404edcSAsim Jamshed TRACE_STATE("Stream %d: TCP_ST_CLOSED_RSVD\n", stream->id); 70576404edcSAsim Jamshed AddtoControlList(mtcp, stream, cur_ts); 70676404edcSAsim Jamshed } else { 70776404edcSAsim Jamshed TRACE_ERROR("Stream already closed.\n"); 70876404edcSAsim Jamshed } 70976404edcSAsim Jamshed } 71076404edcSAsim Jamshed } 71176404edcSAsim Jamshed TRACE_ROUND("Handling reset connections. cnt: %d\n", cnt); 71276404edcSAsim Jamshed 71376404edcSAsim Jamshed cnt = 0; 71476404edcSAsim Jamshed max_cnt = mtcp->resetq_int->count; 71576404edcSAsim Jamshed while (cnt++ < max_cnt) { 71676404edcSAsim Jamshed stream = StreamInternalDequeue(mtcp->resetq_int); 71776404edcSAsim Jamshed 71876404edcSAsim Jamshed if (stream->sndvar->on_control_list || 71976404edcSAsim Jamshed stream->sndvar->on_send_list || stream->sndvar->on_ack_list) { 72076404edcSAsim Jamshed /* wait until all the queues are flushed */ 72176404edcSAsim Jamshed StreamInternalEnqueue(mtcp->resetq_int, stream); 72276404edcSAsim Jamshed 72376404edcSAsim Jamshed } else { 72476404edcSAsim Jamshed stream->sndvar->on_resetq_int = FALSE; 72576404edcSAsim Jamshed 72676404edcSAsim Jamshed if (stream->state != TCP_ST_CLOSED_RSVD) { 72776404edcSAsim Jamshed stream->close_reason = TCP_ACTIVE_CLOSE; 72876404edcSAsim Jamshed stream->state = TCP_ST_CLOSED_RSVD; 72976404edcSAsim Jamshed stream->cb_events |= MOS_ON_TCP_STATE_CHANGE; 73076404edcSAsim Jamshed TRACE_STATE("Stream %d: TCP_ST_CLOSED_RSVD\n", stream->id); 73176404edcSAsim Jamshed AddtoControlList(mtcp, stream, cur_ts); 73276404edcSAsim Jamshed } else { 73376404edcSAsim Jamshed TRACE_ERROR("Stream already closed.\n"); 73476404edcSAsim Jamshed } 73576404edcSAsim Jamshed } 73676404edcSAsim Jamshed } 73776404edcSAsim Jamshed 73876404edcSAsim Jamshed /* destroy streams in destroyq */ 73976404edcSAsim Jamshed while ((stream = StreamDequeue(mtcp->destroyq))) { 74076404edcSAsim Jamshed DestroyTCPStream(mtcp, stream); 74176404edcSAsim Jamshed } 74276404edcSAsim Jamshed 74376404edcSAsim Jamshed mtcp->wakeup_flag = FALSE; 74476404edcSAsim Jamshed } 74576404edcSAsim Jamshed /*----------------------------------------------------------------------------*/ 74676404edcSAsim Jamshed static inline void 74776404edcSAsim Jamshed WritePacketsToChunks(mtcp_manager_t mtcp, uint32_t cur_ts) 74876404edcSAsim Jamshed { 74976404edcSAsim Jamshed int thresh = g_config.mos->max_concurrency; 75076404edcSAsim Jamshed int i; 75176404edcSAsim Jamshed 75276404edcSAsim Jamshed /* Set the threshold to g_config.mos->max_concurrency to send ACK immediately */ 75376404edcSAsim Jamshed /* Otherwise, set to appropriate value (e.g. thresh) */ 75476404edcSAsim Jamshed assert(mtcp->g_sender != NULL); 75576404edcSAsim Jamshed if (mtcp->g_sender->control_list_cnt) 75676404edcSAsim Jamshed WriteTCPControlList(mtcp, mtcp->g_sender, cur_ts, thresh); 75776404edcSAsim Jamshed if (mtcp->g_sender->ack_list_cnt) 75876404edcSAsim Jamshed WriteTCPACKList(mtcp, mtcp->g_sender, cur_ts, thresh); 75976404edcSAsim Jamshed if (mtcp->g_sender->send_list_cnt) 76076404edcSAsim Jamshed WriteTCPDataList(mtcp, mtcp->g_sender, cur_ts, thresh); 76176404edcSAsim Jamshed 76276404edcSAsim Jamshed for (i = 0; i < g_config.mos->netdev_table->num; i++) { 76376404edcSAsim Jamshed assert(mtcp->n_sender[i] != NULL); 76476404edcSAsim Jamshed if (mtcp->n_sender[i]->control_list_cnt) 76576404edcSAsim Jamshed WriteTCPControlList(mtcp, mtcp->n_sender[i], cur_ts, thresh); 76676404edcSAsim Jamshed if (mtcp->n_sender[i]->ack_list_cnt) 76776404edcSAsim Jamshed WriteTCPACKList(mtcp, mtcp->n_sender[i], cur_ts, thresh); 76876404edcSAsim Jamshed if (mtcp->n_sender[i]->send_list_cnt) 76976404edcSAsim Jamshed WriteTCPDataList(mtcp, mtcp->n_sender[i], cur_ts, thresh); 77076404edcSAsim Jamshed } 77176404edcSAsim Jamshed } 77276404edcSAsim Jamshed /*----------------------------------------------------------------------------*/ 77376404edcSAsim Jamshed #if TESTING 77476404edcSAsim Jamshed static int 77576404edcSAsim Jamshed DestroyRemainingFlows(mtcp_manager_t mtcp) 77676404edcSAsim Jamshed { 77776404edcSAsim Jamshed struct hashtable *ht = mtcp->tcp_flow_table; 77876404edcSAsim Jamshed tcp_stream *walk; 77976404edcSAsim Jamshed int cnt, i; 78076404edcSAsim Jamshed 78176404edcSAsim Jamshed cnt = 0; 78276404edcSAsim Jamshed 78376404edcSAsim Jamshed thread_printf(mtcp, mtcp->log_fp, 78476404edcSAsim Jamshed "CPU %d: Flushing remaining flows.\n", mtcp->ctx->cpu); 78576404edcSAsim Jamshed 78676404edcSAsim Jamshed for (i = 0; i < NUM_BINS; i++) { 78776404edcSAsim Jamshed TAILQ_FOREACH(walk, &ht->ht_table[i], rcvvar->he_link) { 78876404edcSAsim Jamshed thread_printf(mtcp, mtcp->log_fp, 78976404edcSAsim Jamshed "CPU %d: Destroying stream %d\n", mtcp->ctx->cpu, walk->id); 790a5e1a556SAsim Jamshed #ifdef DUMP_STREAM 79176404edcSAsim Jamshed DumpStream(mtcp, walk); 792a5e1a556SAsim Jamshed #endif 79376404edcSAsim Jamshed DestroyTCPStream(mtcp, walk); 79476404edcSAsim Jamshed cnt++; 79576404edcSAsim Jamshed } 79676404edcSAsim Jamshed } 79776404edcSAsim Jamshed 79876404edcSAsim Jamshed return cnt; 79976404edcSAsim Jamshed } 80076404edcSAsim Jamshed #endif 80176404edcSAsim Jamshed /*----------------------------------------------------------------------------*/ 80276404edcSAsim Jamshed static void 80376404edcSAsim Jamshed InterruptApplication(mtcp_manager_t mtcp) 80476404edcSAsim Jamshed { 80576404edcSAsim Jamshed /* interrupt if the mtcp_epoll_wait() is waiting */ 80676404edcSAsim Jamshed if (mtcp->ep) { 80776404edcSAsim Jamshed pthread_mutex_lock(&mtcp->ep->epoll_lock); 80876404edcSAsim Jamshed if (mtcp->ep->waiting) { 80976404edcSAsim Jamshed pthread_cond_signal(&mtcp->ep->epoll_cond); 81076404edcSAsim Jamshed } 81176404edcSAsim Jamshed pthread_mutex_unlock(&mtcp->ep->epoll_lock); 81276404edcSAsim Jamshed } 81376404edcSAsim Jamshed /* interrupt if the accept() is waiting */ 81476404edcSAsim Jamshed if (mtcp->listener) { 81576404edcSAsim Jamshed if (mtcp->listener->socket) { 81676404edcSAsim Jamshed pthread_mutex_lock(&mtcp->listener->accept_lock); 81776404edcSAsim Jamshed if (!(mtcp->listener->socket->opts & MTCP_NONBLOCK)) { 81876404edcSAsim Jamshed pthread_cond_signal(&mtcp->listener->accept_cond); 81976404edcSAsim Jamshed } 82076404edcSAsim Jamshed pthread_mutex_unlock(&mtcp->listener->accept_lock); 82176404edcSAsim Jamshed } 82276404edcSAsim Jamshed } 82376404edcSAsim Jamshed } 82476404edcSAsim Jamshed /*----------------------------------------------------------------------------*/ 82576404edcSAsim Jamshed void 82676404edcSAsim Jamshed RunPassiveLoop(mtcp_manager_t mtcp) 82776404edcSAsim Jamshed { 82876404edcSAsim Jamshed sem_wait(&g_done_sem[mtcp->ctx->cpu]); 82976404edcSAsim Jamshed sem_destroy(&g_done_sem[mtcp->ctx->cpu]); 83076404edcSAsim Jamshed return; 83176404edcSAsim Jamshed } 83276404edcSAsim Jamshed /*----------------------------------------------------------------------------*/ 83376404edcSAsim Jamshed static void 83476404edcSAsim Jamshed RunMainLoop(struct mtcp_thread_context *ctx) 83576404edcSAsim Jamshed { 83676404edcSAsim Jamshed mtcp_manager_t mtcp = ctx->mtcp_manager; 83776404edcSAsim Jamshed int i; 83876404edcSAsim Jamshed int recv_cnt; 83976404edcSAsim Jamshed int rx_inf, tx_inf; 84076404edcSAsim Jamshed struct timeval cur_ts = {0}; 84176404edcSAsim Jamshed uint32_t ts, ts_prev; 84276404edcSAsim Jamshed 84376404edcSAsim Jamshed #if TIME_STAT 84476404edcSAsim Jamshed struct timeval prev_ts, processing_ts, tcheck_ts, 84576404edcSAsim Jamshed epoll_ts, handle_ts, xmit_ts, select_ts; 84676404edcSAsim Jamshed #endif 84776404edcSAsim Jamshed int thresh; 84876404edcSAsim Jamshed 84976404edcSAsim Jamshed gettimeofday(&cur_ts, NULL); 85076404edcSAsim Jamshed 85176404edcSAsim Jamshed TRACE_DBG("CPU %d: mtcp thread running.\n", ctx->cpu); 85276404edcSAsim Jamshed 85376404edcSAsim Jamshed #if TIME_STAT 85476404edcSAsim Jamshed prev_ts = cur_ts; 85576404edcSAsim Jamshed InitStatCounter(&mtcp->rtstat.round); 85676404edcSAsim Jamshed InitStatCounter(&mtcp->rtstat.processing); 85776404edcSAsim Jamshed InitStatCounter(&mtcp->rtstat.tcheck); 85876404edcSAsim Jamshed InitStatCounter(&mtcp->rtstat.epoll); 85976404edcSAsim Jamshed InitStatCounter(&mtcp->rtstat.handle); 86076404edcSAsim Jamshed InitStatCounter(&mtcp->rtstat.xmit); 86176404edcSAsim Jamshed InitStatCounter(&mtcp->rtstat.select); 86276404edcSAsim Jamshed #endif 86376404edcSAsim Jamshed 86476404edcSAsim Jamshed ts = ts_prev = 0; 86576404edcSAsim Jamshed while ((!ctx->done || mtcp->flow_cnt) && !ctx->exit) { 86676404edcSAsim Jamshed 86776404edcSAsim Jamshed STAT_COUNT(mtcp->runstat.rounds); 86876404edcSAsim Jamshed recv_cnt = 0; 86976404edcSAsim Jamshed gettimeofday(&cur_ts, NULL); 87076404edcSAsim Jamshed #if TIME_STAT 87176404edcSAsim Jamshed /* measure the inter-round delay */ 87276404edcSAsim Jamshed UpdateStatCounter(&mtcp->rtstat.round, TimeDiffUs(&cur_ts, &prev_ts)); 87376404edcSAsim Jamshed prev_ts = cur_ts; 87476404edcSAsim Jamshed #endif 87576404edcSAsim Jamshed 87676404edcSAsim Jamshed ts = TIMEVAL_TO_TS(&cur_ts); 87776404edcSAsim Jamshed mtcp->cur_ts = ts; 87876404edcSAsim Jamshed 87976404edcSAsim Jamshed for (rx_inf = 0; rx_inf < g_config.mos->netdev_table->num; rx_inf++) { 88076404edcSAsim Jamshed 88176404edcSAsim Jamshed recv_cnt = mtcp->iom->recv_pkts(ctx, rx_inf); 88276404edcSAsim Jamshed STAT_COUNT(mtcp->runstat.rounds_rx_try); 88376404edcSAsim Jamshed 88476404edcSAsim Jamshed for (i = 0; i < recv_cnt; i++) { 88576404edcSAsim Jamshed uint16_t len; 88676404edcSAsim Jamshed uint8_t *pktbuf; 88776404edcSAsim Jamshed pktbuf = mtcp->iom->get_rptr(mtcp->ctx, rx_inf, i, &len); 88876404edcSAsim Jamshed ProcessPacket(mtcp, rx_inf, i, ts, pktbuf, len); 88976404edcSAsim Jamshed } 89076404edcSAsim Jamshed } 89176404edcSAsim Jamshed STAT_COUNT(mtcp->runstat.rounds_rx); 89276404edcSAsim Jamshed 89376404edcSAsim Jamshed #if TIME_STAT 89476404edcSAsim Jamshed gettimeofday(&processing_ts, NULL); 89576404edcSAsim Jamshed UpdateStatCounter(&mtcp->rtstat.processing, 89676404edcSAsim Jamshed TimeDiffUs(&processing_ts, &cur_ts)); 89776404edcSAsim Jamshed #endif /* TIME_STAT */ 89876404edcSAsim Jamshed 89976404edcSAsim Jamshed /* Handle user defined timeout */ 90076404edcSAsim Jamshed struct timer *walk, *tmp; 90176404edcSAsim Jamshed for (walk = TAILQ_FIRST(&mtcp->timer_list); walk != NULL; walk = tmp) { 90276404edcSAsim Jamshed tmp = TAILQ_NEXT(walk, timer_link); 90376404edcSAsim Jamshed if (TIMEVAL_LT(&cur_ts, &walk->exp)) 90476404edcSAsim Jamshed break; 90576404edcSAsim Jamshed 90676404edcSAsim Jamshed struct mtcp_context mctx = {.cpu = ctx->cpu}; 90776404edcSAsim Jamshed walk->cb(&mctx, walk->id, 0, 0 /* FIXME */, NULL); 90876404edcSAsim Jamshed DelTimer(mtcp, walk); 90976404edcSAsim Jamshed } 91076404edcSAsim Jamshed 91176404edcSAsim Jamshed /* interaction with application */ 91276404edcSAsim Jamshed if (mtcp->flow_cnt > 0) { 91376404edcSAsim Jamshed 91476404edcSAsim Jamshed /* check retransmission timeout and timewait expire */ 91576404edcSAsim Jamshed #if 0 91676404edcSAsim Jamshed thresh = (int)mtcp->flow_cnt / (TS_TO_USEC(PER_STREAM_TCHECK)); 91776404edcSAsim Jamshed assert(thresh >= 0); 91876404edcSAsim Jamshed if (thresh == 0) 91976404edcSAsim Jamshed thresh = 1; 92076404edcSAsim Jamshed if (recv_cnt > 0 && thresh > recv_cnt) 92176404edcSAsim Jamshed thresh = recv_cnt; 92276404edcSAsim Jamshed #else 92376404edcSAsim Jamshed thresh = g_config.mos->max_concurrency; 92476404edcSAsim Jamshed #endif 92576404edcSAsim Jamshed 92676404edcSAsim Jamshed /* Eunyoung, you may fix this later 92776404edcSAsim Jamshed * if there is no rcv packet, we will send as much as possible 92876404edcSAsim Jamshed */ 92976404edcSAsim Jamshed if (thresh == -1) 93076404edcSAsim Jamshed thresh = g_config.mos->max_concurrency; 93176404edcSAsim Jamshed 93276404edcSAsim Jamshed CheckRtmTimeout(mtcp, ts, thresh); 93376404edcSAsim Jamshed CheckTimewaitExpire(mtcp, ts, thresh); 93476404edcSAsim Jamshed 93576404edcSAsim Jamshed if (g_config.mos->tcp_timeout > 0 && ts != ts_prev) { 93676404edcSAsim Jamshed CheckConnectionTimeout(mtcp, ts, thresh); 93776404edcSAsim Jamshed } 93876404edcSAsim Jamshed 93976404edcSAsim Jamshed #if TIME_STAT 94076404edcSAsim Jamshed } 94176404edcSAsim Jamshed gettimeofday(&tcheck_ts, NULL); 94276404edcSAsim Jamshed UpdateStatCounter(&mtcp->rtstat.tcheck, 94376404edcSAsim Jamshed TimeDiffUs(&tcheck_ts, &processing_ts)); 94476404edcSAsim Jamshed 94576404edcSAsim Jamshed if (mtcp->flow_cnt > 0) { 94676404edcSAsim Jamshed #endif /* TIME_STAT */ 94776404edcSAsim Jamshed 94876404edcSAsim Jamshed } 94976404edcSAsim Jamshed 95076404edcSAsim Jamshed /* 95176404edcSAsim Jamshed * before flushing epoll events, call monitor events for 95276404edcSAsim Jamshed * all registered `read` events 95376404edcSAsim Jamshed */ 95476404edcSAsim Jamshed if (mtcp->num_msp > 0) 95576404edcSAsim Jamshed /* call this when only a standalone monitor is running */ 95676404edcSAsim Jamshed FlushMonitorReadEvents(mtcp); 95776404edcSAsim Jamshed 95876404edcSAsim Jamshed /* if epoll is in use, flush all the queued events */ 95976404edcSAsim Jamshed if (mtcp->ep) { 96076404edcSAsim Jamshed FlushBufferedReadEvents(mtcp); 96176404edcSAsim Jamshed FlushEpollEvents(mtcp, ts); 96276404edcSAsim Jamshed } 96376404edcSAsim Jamshed #if TIME_STAT 96476404edcSAsim Jamshed gettimeofday(&epoll_ts, NULL); 96576404edcSAsim Jamshed UpdateStatCounter(&mtcp->rtstat.epoll, 96676404edcSAsim Jamshed TimeDiffUs(&epoll_ts, &tcheck_ts)); 96776404edcSAsim Jamshed #endif /* TIME_STAT */ 96876404edcSAsim Jamshed 96976404edcSAsim Jamshed if (end_app_exists && mtcp->flow_cnt > 0) { 97076404edcSAsim Jamshed /* handle stream queues */ 97176404edcSAsim Jamshed HandleApplicationCalls(mtcp, ts); 97276404edcSAsim Jamshed } 97376404edcSAsim Jamshed 97476404edcSAsim Jamshed #if TIME_STAT 97576404edcSAsim Jamshed gettimeofday(&handle_ts, NULL); 97676404edcSAsim Jamshed UpdateStatCounter(&mtcp->rtstat.handle, 97776404edcSAsim Jamshed TimeDiffUs(&handle_ts, &epoll_ts)); 97876404edcSAsim Jamshed #endif /* TIME_STAT */ 97976404edcSAsim Jamshed 98076404edcSAsim Jamshed WritePacketsToChunks(mtcp, ts); 98176404edcSAsim Jamshed 98276404edcSAsim Jamshed /* send packets from write buffer */ 9837631e025SAsim Jamshed /* Send until tx is available */ 98476404edcSAsim Jamshed int num_dev = g_config.mos->netdev_table->num; 98576404edcSAsim Jamshed if (likely(mtcp->iom->send_pkts != NULL)) 98676404edcSAsim Jamshed for (tx_inf = 0; tx_inf < num_dev; tx_inf++) { 98776404edcSAsim Jamshed mtcp->iom->send_pkts(ctx, tx_inf); 98876404edcSAsim Jamshed } 98976404edcSAsim Jamshed 99076404edcSAsim Jamshed #if TIME_STAT 99176404edcSAsim Jamshed gettimeofday(&xmit_ts, NULL); 99276404edcSAsim Jamshed UpdateStatCounter(&mtcp->rtstat.xmit, 99376404edcSAsim Jamshed TimeDiffUs(&xmit_ts, &handle_ts)); 99476404edcSAsim Jamshed #endif /* TIME_STAT */ 99576404edcSAsim Jamshed 99676404edcSAsim Jamshed if (ts != ts_prev) { 99776404edcSAsim Jamshed ts_prev = ts; 99876404edcSAsim Jamshed #ifdef NETSTAT 99976404edcSAsim Jamshed if (ctx->cpu == printer) { 100076404edcSAsim Jamshed #ifdef RUN_ARP 100176404edcSAsim Jamshed ARPTimer(mtcp, ts); 100276404edcSAsim Jamshed #endif 1003265cb675SAsim Jamshed #ifdef NETSTAT 100476404edcSAsim Jamshed PrintNetworkStats(mtcp, ts); 1005265cb675SAsim Jamshed #endif 100676404edcSAsim Jamshed } 100776404edcSAsim Jamshed #endif /* NETSTAT */ 100876404edcSAsim Jamshed } 100976404edcSAsim Jamshed 101076404edcSAsim Jamshed if (mtcp->iom->select) 101176404edcSAsim Jamshed mtcp->iom->select(ctx); 101276404edcSAsim Jamshed 101376404edcSAsim Jamshed if (ctx->interrupt) { 101476404edcSAsim Jamshed InterruptApplication(mtcp); 101576404edcSAsim Jamshed } 101676404edcSAsim Jamshed } 101776404edcSAsim Jamshed 101876404edcSAsim Jamshed #if TESTING 101976404edcSAsim Jamshed DestroyRemainingFlows(mtcp); 102076404edcSAsim Jamshed #endif 102176404edcSAsim Jamshed 102276404edcSAsim Jamshed TRACE_DBG("MTCP thread %d out of main loop.\n", ctx->cpu); 102376404edcSAsim Jamshed /* flush logs */ 102476404edcSAsim Jamshed flush_log_data(mtcp); 102576404edcSAsim Jamshed TRACE_DBG("MTCP thread %d flushed logs.\n", ctx->cpu); 102676404edcSAsim Jamshed InterruptApplication(mtcp); 102776404edcSAsim Jamshed TRACE_INFO("MTCP thread %d finished.\n", ctx->cpu); 102876404edcSAsim Jamshed } 102976404edcSAsim Jamshed /*----------------------------------------------------------------------------*/ 103076404edcSAsim Jamshed struct mtcp_sender * 103176404edcSAsim Jamshed CreateMTCPSender(int ifidx) 103276404edcSAsim Jamshed { 103376404edcSAsim Jamshed struct mtcp_sender *sender; 103476404edcSAsim Jamshed 103576404edcSAsim Jamshed sender = (struct mtcp_sender *)calloc(1, sizeof(struct mtcp_sender)); 103676404edcSAsim Jamshed if (!sender) { 103776404edcSAsim Jamshed return NULL; 103876404edcSAsim Jamshed } 103976404edcSAsim Jamshed 104076404edcSAsim Jamshed sender->ifidx = ifidx; 104176404edcSAsim Jamshed 104276404edcSAsim Jamshed TAILQ_INIT(&sender->control_list); 104376404edcSAsim Jamshed TAILQ_INIT(&sender->send_list); 104476404edcSAsim Jamshed TAILQ_INIT(&sender->ack_list); 104576404edcSAsim Jamshed 104676404edcSAsim Jamshed sender->control_list_cnt = 0; 104776404edcSAsim Jamshed sender->send_list_cnt = 0; 104876404edcSAsim Jamshed sender->ack_list_cnt = 0; 104976404edcSAsim Jamshed 105076404edcSAsim Jamshed return sender; 105176404edcSAsim Jamshed } 105276404edcSAsim Jamshed /*----------------------------------------------------------------------------*/ 105376404edcSAsim Jamshed void 105476404edcSAsim Jamshed DestroyMTCPSender(struct mtcp_sender *sender) 105576404edcSAsim Jamshed { 105676404edcSAsim Jamshed free(sender); 105776404edcSAsim Jamshed } 105876404edcSAsim Jamshed /*----------------------------------------------------------------------------*/ 105976404edcSAsim Jamshed static mtcp_manager_t 106076404edcSAsim Jamshed InitializeMTCPManager(struct mtcp_thread_context* ctx) 106176404edcSAsim Jamshed { 106276404edcSAsim Jamshed mtcp_manager_t mtcp; 106376404edcSAsim Jamshed char log_name[MAX_FILE_NAME]; 106476404edcSAsim Jamshed int i; 106576404edcSAsim Jamshed 106676404edcSAsim Jamshed posix_seq_srand((unsigned)pthread_self()); 106776404edcSAsim Jamshed 106876404edcSAsim Jamshed mtcp = (mtcp_manager_t)calloc(1, sizeof(struct mtcp_manager)); 106976404edcSAsim Jamshed if (!mtcp) { 107076404edcSAsim Jamshed perror("malloc"); 1071c6a5549bSAsim Jamshed fprintf(stderr, "Failed to allocate mtcp_manager.\n"); 107276404edcSAsim Jamshed return NULL; 107376404edcSAsim Jamshed } 107476404edcSAsim Jamshed g_mtcp[ctx->cpu] = mtcp; 107576404edcSAsim Jamshed 107676404edcSAsim Jamshed mtcp->tcp_flow_table = CreateHashtable(); 107776404edcSAsim Jamshed if (!mtcp->tcp_flow_table) { 107876404edcSAsim Jamshed CTRACE_ERROR("Falied to allocate tcp flow table.\n"); 107976404edcSAsim Jamshed return NULL; 108076404edcSAsim Jamshed } 108176404edcSAsim Jamshed 108276404edcSAsim Jamshed #ifdef HUGEPAGE 108376404edcSAsim Jamshed #define IS_HUGEPAGE 1 108476404edcSAsim Jamshed #else 108576404edcSAsim Jamshed #define IS_HUGEPAGE 0 108676404edcSAsim Jamshed #endif 108776404edcSAsim Jamshed if (mon_app_exists) { 108876404edcSAsim Jamshed /* initialize event callback */ 108976404edcSAsim Jamshed #ifdef NEWEV 109076404edcSAsim Jamshed InitEvent(mtcp); 109176404edcSAsim Jamshed #else 109276404edcSAsim Jamshed InitEvent(mtcp, NUM_EV_TABLE); 109376404edcSAsim Jamshed #endif 109476404edcSAsim Jamshed } 109576404edcSAsim Jamshed 109676404edcSAsim Jamshed if (!(mtcp->bufseg_pool = MPCreate(sizeof(tcpbufseg_t), 109776404edcSAsim Jamshed sizeof(tcpbufseg_t) * g_config.mos->max_concurrency * 109876404edcSAsim Jamshed ((g_config.mos->rmem_size - 1) / UNITBUFSIZE + 1), 0))) { 109976404edcSAsim Jamshed TRACE_ERROR("Failed to allocate ev_table pool\n"); 110076404edcSAsim Jamshed exit(0); 110176404edcSAsim Jamshed } 110276404edcSAsim Jamshed if (!(mtcp->sockent_pool = MPCreate(sizeof(struct sockent), 110376404edcSAsim Jamshed sizeof(struct sockent) * g_config.mos->max_concurrency * 3, 0))) { 110476404edcSAsim Jamshed TRACE_ERROR("Failed to allocate ev_table pool\n"); 110576404edcSAsim Jamshed exit(0); 110676404edcSAsim Jamshed } 110776404edcSAsim Jamshed #ifdef USE_TIMER_POOL 110876404edcSAsim Jamshed if (!(mtcp->timer_pool = MPCreate(sizeof(struct timer), 110976404edcSAsim Jamshed sizeof(struct timer) * g_config.mos->max_concurrency * 10, 0))) { 111076404edcSAsim Jamshed TRACE_ERROR("Failed to allocate ev_table pool\n"); 111176404edcSAsim Jamshed exit(0); 111276404edcSAsim Jamshed } 111376404edcSAsim Jamshed #endif 111476404edcSAsim Jamshed mtcp->flow_pool = MPCreate(sizeof(tcp_stream), 111576404edcSAsim Jamshed sizeof(tcp_stream) * g_config.mos->max_concurrency, IS_HUGEPAGE); 111676404edcSAsim Jamshed if (!mtcp->flow_pool) { 111776404edcSAsim Jamshed CTRACE_ERROR("Failed to allocate tcp flow pool.\n"); 111876404edcSAsim Jamshed return NULL; 111976404edcSAsim Jamshed } 112076404edcSAsim Jamshed mtcp->rv_pool = MPCreate(sizeof(struct tcp_recv_vars), 112176404edcSAsim Jamshed sizeof(struct tcp_recv_vars) * g_config.mos->max_concurrency, IS_HUGEPAGE); 112276404edcSAsim Jamshed if (!mtcp->rv_pool) { 112376404edcSAsim Jamshed CTRACE_ERROR("Failed to allocate tcp recv variable pool.\n"); 112476404edcSAsim Jamshed return NULL; 112576404edcSAsim Jamshed } 112676404edcSAsim Jamshed mtcp->sv_pool = MPCreate(sizeof(struct tcp_send_vars), 112776404edcSAsim Jamshed sizeof(struct tcp_send_vars) * g_config.mos->max_concurrency, IS_HUGEPAGE); 112876404edcSAsim Jamshed if (!mtcp->sv_pool) { 112976404edcSAsim Jamshed CTRACE_ERROR("Failed to allocate tcp send variable pool.\n"); 113076404edcSAsim Jamshed return NULL; 113176404edcSAsim Jamshed } 113276404edcSAsim Jamshed 113376404edcSAsim Jamshed mtcp->rbm_snd = SBManagerCreate(g_config.mos->wmem_size, g_config.mos->no_ring_buffers, 113476404edcSAsim Jamshed g_config.mos->max_concurrency); 113576404edcSAsim Jamshed if (!mtcp->rbm_snd) { 113676404edcSAsim Jamshed CTRACE_ERROR("Failed to create send ring buffer.\n"); 113776404edcSAsim Jamshed return NULL; 113876404edcSAsim Jamshed } 113976404edcSAsim Jamshed 114076404edcSAsim Jamshed mtcp->smap = (socket_map_t)calloc(g_config.mos->max_concurrency, sizeof(struct socket_map)); 114176404edcSAsim Jamshed if (!mtcp->smap) { 114276404edcSAsim Jamshed perror("calloc"); 114376404edcSAsim Jamshed CTRACE_ERROR("Failed to allocate memory for stream map.\n"); 114476404edcSAsim Jamshed return NULL; 114576404edcSAsim Jamshed } 114676404edcSAsim Jamshed 114776404edcSAsim Jamshed if (mon_app_exists) { 114876404edcSAsim Jamshed mtcp->msmap = (socket_map_t)calloc(g_config.mos->max_concurrency, sizeof(struct socket_map)); 114976404edcSAsim Jamshed if (!mtcp->msmap) { 115076404edcSAsim Jamshed perror("calloc"); 115176404edcSAsim Jamshed CTRACE_ERROR("Failed to allocate memory for monitor stream map.\n"); 115276404edcSAsim Jamshed return NULL; 115376404edcSAsim Jamshed } 115476404edcSAsim Jamshed 115576404edcSAsim Jamshed for (i = 0; i < g_config.mos->max_concurrency; i++) { 115676404edcSAsim Jamshed mtcp->msmap[i].monitor_stream = calloc(1, sizeof(struct mon_stream)); 115776404edcSAsim Jamshed if (!mtcp->msmap[i].monitor_stream) { 115876404edcSAsim Jamshed perror("calloc"); 115976404edcSAsim Jamshed CTRACE_ERROR("Failed to allocate memory for monitr stream map\n"); 116076404edcSAsim Jamshed return NULL; 116176404edcSAsim Jamshed } 116276404edcSAsim Jamshed } 116376404edcSAsim Jamshed } 116476404edcSAsim Jamshed 116576404edcSAsim Jamshed TAILQ_INIT(&mtcp->timer_list); 116676404edcSAsim Jamshed TAILQ_INIT(&mtcp->monitors); 116776404edcSAsim Jamshed 116876404edcSAsim Jamshed TAILQ_INIT(&mtcp->free_smap); 116976404edcSAsim Jamshed for (i = 0; i < g_config.mos->max_concurrency; i++) { 117076404edcSAsim Jamshed mtcp->smap[i].id = i; 117176404edcSAsim Jamshed mtcp->smap[i].socktype = MOS_SOCK_UNUSED; 117276404edcSAsim Jamshed memset(&mtcp->smap[i].saddr, 0, sizeof(struct sockaddr_in)); 117376404edcSAsim Jamshed mtcp->smap[i].stream = NULL; 117476404edcSAsim Jamshed TAILQ_INSERT_TAIL(&mtcp->free_smap, &mtcp->smap[i], link); 117576404edcSAsim Jamshed } 117676404edcSAsim Jamshed 117776404edcSAsim Jamshed if (mon_app_exists) { 117876404edcSAsim Jamshed TAILQ_INIT(&mtcp->free_msmap); 117976404edcSAsim Jamshed for (i = 0; i < g_config.mos->max_concurrency; i++) { 118076404edcSAsim Jamshed mtcp->msmap[i].id = i; 118176404edcSAsim Jamshed mtcp->msmap[i].socktype = MOS_SOCK_UNUSED; 118276404edcSAsim Jamshed memset(&mtcp->msmap[i].saddr, 0, sizeof(struct sockaddr_in)); 118376404edcSAsim Jamshed TAILQ_INSERT_TAIL(&mtcp->free_msmap, &mtcp->msmap[i], link); 118476404edcSAsim Jamshed } 118576404edcSAsim Jamshed } 118676404edcSAsim Jamshed 118776404edcSAsim Jamshed mtcp->ctx = ctx; 118876404edcSAsim Jamshed mtcp->ep = NULL; 118976404edcSAsim Jamshed 119076404edcSAsim Jamshed snprintf(log_name, MAX_FILE_NAME, "%s/"LOG_FILE_NAME"_%d", 119176404edcSAsim Jamshed g_config.mos->mos_log, ctx->cpu); 119276404edcSAsim Jamshed mtcp->log_fp = fopen(log_name, "w+"); 119376404edcSAsim Jamshed if (!mtcp->log_fp) { 119476404edcSAsim Jamshed perror("fopen"); 119576404edcSAsim Jamshed CTRACE_ERROR("Failed to create file for logging. (%s)\n", log_name); 119676404edcSAsim Jamshed return NULL; 119776404edcSAsim Jamshed } 119876404edcSAsim Jamshed mtcp->sp_fd = g_logctx[ctx->cpu]->pair_sp_fd; 119976404edcSAsim Jamshed mtcp->logger = g_logctx[ctx->cpu]; 120076404edcSAsim Jamshed 120176404edcSAsim Jamshed mtcp->connectq = CreateStreamQueue(BACKLOG_SIZE); 120276404edcSAsim Jamshed if (!mtcp->connectq) { 120376404edcSAsim Jamshed CTRACE_ERROR("Failed to create connect queue.\n"); 120476404edcSAsim Jamshed return NULL; 120576404edcSAsim Jamshed } 120676404edcSAsim Jamshed mtcp->sendq = CreateStreamQueue(g_config.mos->max_concurrency); 120776404edcSAsim Jamshed if (!mtcp->sendq) { 120876404edcSAsim Jamshed CTRACE_ERROR("Failed to create send queue.\n"); 120976404edcSAsim Jamshed return NULL; 121076404edcSAsim Jamshed } 121176404edcSAsim Jamshed mtcp->ackq = CreateStreamQueue(g_config.mos->max_concurrency); 121276404edcSAsim Jamshed if (!mtcp->ackq) { 121376404edcSAsim Jamshed CTRACE_ERROR("Failed to create ack queue.\n"); 121476404edcSAsim Jamshed return NULL; 121576404edcSAsim Jamshed } 121676404edcSAsim Jamshed mtcp->closeq = CreateStreamQueue(g_config.mos->max_concurrency); 121776404edcSAsim Jamshed if (!mtcp->closeq) { 121876404edcSAsim Jamshed CTRACE_ERROR("Failed to create close queue.\n"); 121976404edcSAsim Jamshed return NULL; 122076404edcSAsim Jamshed } 122176404edcSAsim Jamshed mtcp->closeq_int = CreateInternalStreamQueue(g_config.mos->max_concurrency); 122276404edcSAsim Jamshed if (!mtcp->closeq_int) { 122376404edcSAsim Jamshed CTRACE_ERROR("Failed to create close queue.\n"); 122476404edcSAsim Jamshed return NULL; 122576404edcSAsim Jamshed } 122676404edcSAsim Jamshed mtcp->resetq = CreateStreamQueue(g_config.mos->max_concurrency); 122776404edcSAsim Jamshed if (!mtcp->resetq) { 122876404edcSAsim Jamshed CTRACE_ERROR("Failed to create reset queue.\n"); 122976404edcSAsim Jamshed return NULL; 123076404edcSAsim Jamshed } 123176404edcSAsim Jamshed mtcp->resetq_int = CreateInternalStreamQueue(g_config.mos->max_concurrency); 123276404edcSAsim Jamshed if (!mtcp->resetq_int) { 123376404edcSAsim Jamshed CTRACE_ERROR("Failed to create reset queue.\n"); 123476404edcSAsim Jamshed return NULL; 123576404edcSAsim Jamshed } 123676404edcSAsim Jamshed mtcp->destroyq = CreateStreamQueue(g_config.mos->max_concurrency); 123776404edcSAsim Jamshed if (!mtcp->destroyq) { 123876404edcSAsim Jamshed CTRACE_ERROR("Failed to create destroy queue.\n"); 123976404edcSAsim Jamshed return NULL; 124076404edcSAsim Jamshed } 124176404edcSAsim Jamshed 124276404edcSAsim Jamshed mtcp->g_sender = CreateMTCPSender(-1); 124376404edcSAsim Jamshed if (!mtcp->g_sender) { 124476404edcSAsim Jamshed CTRACE_ERROR("Failed to create global sender structure.\n"); 124576404edcSAsim Jamshed return NULL; 124676404edcSAsim Jamshed } 124776404edcSAsim Jamshed for (i = 0; i < g_config.mos->netdev_table->num; i++) { 124876404edcSAsim Jamshed mtcp->n_sender[i] = CreateMTCPSender(i); 124976404edcSAsim Jamshed if (!mtcp->n_sender[i]) { 125076404edcSAsim Jamshed CTRACE_ERROR("Failed to create per-nic sender structure.\n"); 125176404edcSAsim Jamshed return NULL; 125276404edcSAsim Jamshed } 125376404edcSAsim Jamshed } 125476404edcSAsim Jamshed 125576404edcSAsim Jamshed mtcp->rto_store = InitRTOHashstore(); 125676404edcSAsim Jamshed TAILQ_INIT(&mtcp->timewait_list); 125776404edcSAsim Jamshed TAILQ_INIT(&mtcp->timeout_list); 125876404edcSAsim Jamshed 125976404edcSAsim Jamshed return mtcp; 126076404edcSAsim Jamshed } 126176404edcSAsim Jamshed /*----------------------------------------------------------------------------*/ 126276404edcSAsim Jamshed static void * 126376404edcSAsim Jamshed MTCPRunThread(void *arg) 126476404edcSAsim Jamshed { 126576404edcSAsim Jamshed mctx_t mctx = (mctx_t)arg; 126676404edcSAsim Jamshed int cpu = mctx->cpu; 126776404edcSAsim Jamshed int working; 126876404edcSAsim Jamshed struct mtcp_manager *mtcp; 126976404edcSAsim Jamshed struct mtcp_thread_context *ctx; 127076404edcSAsim Jamshed 127176404edcSAsim Jamshed /* affinitize the thread to this core first */ 127276404edcSAsim Jamshed mtcp_core_affinitize(cpu); 127376404edcSAsim Jamshed 127476404edcSAsim Jamshed /* memory alloc after core affinitization would use local memory 127576404edcSAsim Jamshed most time */ 127676404edcSAsim Jamshed ctx = calloc(1, sizeof(*ctx)); 127776404edcSAsim Jamshed if (!ctx) { 127876404edcSAsim Jamshed perror("calloc"); 127976404edcSAsim Jamshed TRACE_ERROR("Failed to calloc mtcp context.\n"); 128076404edcSAsim Jamshed exit(-1); 128176404edcSAsim Jamshed } 128276404edcSAsim Jamshed ctx->thread = pthread_self(); 128376404edcSAsim Jamshed ctx->cpu = cpu; 128476404edcSAsim Jamshed mtcp = ctx->mtcp_manager = InitializeMTCPManager(ctx); 128576404edcSAsim Jamshed if (!mtcp) { 128676404edcSAsim Jamshed TRACE_ERROR("Failed to initialize mtcp manager.\n"); 128776404edcSAsim Jamshed exit(-1); 128876404edcSAsim Jamshed } 128976404edcSAsim Jamshed 129076404edcSAsim Jamshed /* assign mtcp context's underlying I/O module */ 129176404edcSAsim Jamshed mtcp->iom = current_iomodule_func; 129276404edcSAsim Jamshed 129376404edcSAsim Jamshed /* I/O initializing */ 129476404edcSAsim Jamshed if (mtcp->iom->init_handle) 129576404edcSAsim Jamshed mtcp->iom->init_handle(ctx); 129676404edcSAsim Jamshed 129776404edcSAsim Jamshed if (pthread_mutex_init(&ctx->flow_pool_lock, NULL)) { 129876404edcSAsim Jamshed perror("pthread_mutex_init of ctx->flow_pool_lock\n"); 129976404edcSAsim Jamshed exit(-1); 130076404edcSAsim Jamshed } 130176404edcSAsim Jamshed 130276404edcSAsim Jamshed if (pthread_mutex_init(&ctx->socket_pool_lock, NULL)) { 130376404edcSAsim Jamshed perror("pthread_mutex_init of ctx->socket_pool_lock\n"); 130476404edcSAsim Jamshed exit(-1); 130576404edcSAsim Jamshed } 130676404edcSAsim Jamshed 130776404edcSAsim Jamshed SQ_LOCK_INIT(&ctx->connect_lock, "ctx->connect_lock", exit(-1)); 130876404edcSAsim Jamshed SQ_LOCK_INIT(&ctx->close_lock, "ctx->close_lock", exit(-1)); 130976404edcSAsim Jamshed SQ_LOCK_INIT(&ctx->reset_lock, "ctx->reset_lock", exit(-1)); 131076404edcSAsim Jamshed SQ_LOCK_INIT(&ctx->sendq_lock, "ctx->sendq_lock", exit(-1)); 131176404edcSAsim Jamshed SQ_LOCK_INIT(&ctx->ackq_lock, "ctx->ackq_lock", exit(-1)); 131276404edcSAsim Jamshed SQ_LOCK_INIT(&ctx->destroyq_lock, "ctx->destroyq_lock", exit(-1)); 131376404edcSAsim Jamshed 131476404edcSAsim Jamshed /* remember this context pointer for signal processing */ 131576404edcSAsim Jamshed g_pctx[cpu] = ctx; 131676404edcSAsim Jamshed mlockall(MCL_CURRENT); 131776404edcSAsim Jamshed 131876404edcSAsim Jamshed // attach (nic device, queue) 131976404edcSAsim Jamshed working = AttachDevice(ctx); 132076404edcSAsim Jamshed if (working != 0) { 132176404edcSAsim Jamshed sem_post(&g_init_sem[ctx->cpu]); 132276404edcSAsim Jamshed TRACE_DBG("MTCP thread %d finished. Not attached any device\n", ctx->cpu); 132376404edcSAsim Jamshed pthread_exit(NULL); 132476404edcSAsim Jamshed } 132576404edcSAsim Jamshed 132676404edcSAsim Jamshed TRACE_DBG("CPU %d: initialization finished.\n", cpu); 132776404edcSAsim Jamshed sem_post(&g_init_sem[ctx->cpu]); 132876404edcSAsim Jamshed 132976404edcSAsim Jamshed /* start the main loop */ 133076404edcSAsim Jamshed RunMainLoop(ctx); 133176404edcSAsim Jamshed 133276404edcSAsim Jamshed TRACE_DBG("MTCP thread %d finished.\n", ctx->cpu); 133376404edcSAsim Jamshed 133476404edcSAsim Jamshed /* signaling mTCP thread is done */ 133576404edcSAsim Jamshed sem_post(&g_done_sem[mctx->cpu]); 133676404edcSAsim Jamshed 133776404edcSAsim Jamshed //pthread_exit(NULL); 133876404edcSAsim Jamshed return 0; 133976404edcSAsim Jamshed } 134076404edcSAsim Jamshed /*----------------------------------------------------------------------------*/ 134176404edcSAsim Jamshed #ifdef ENABLE_DPDK 134276404edcSAsim Jamshed static int MTCPDPDKRunThread(void *arg) 134376404edcSAsim Jamshed { 134476404edcSAsim Jamshed MTCPRunThread(arg); 134576404edcSAsim Jamshed return 0; 134676404edcSAsim Jamshed } 134776404edcSAsim Jamshed #endif /* !ENABLE_DPDK */ 134876404edcSAsim Jamshed /*----------------------------------------------------------------------------*/ 134976404edcSAsim Jamshed mctx_t 135076404edcSAsim Jamshed mtcp_create_context(int cpu) 135176404edcSAsim Jamshed { 135276404edcSAsim Jamshed mctx_t mctx; 135376404edcSAsim Jamshed int ret; 135476404edcSAsim Jamshed 135576404edcSAsim Jamshed if (cpu >= g_config.mos->num_cores) { 135676404edcSAsim Jamshed TRACE_ERROR("Failed initialize new mtcp context. " 135776404edcSAsim Jamshed "Requested cpu id %d exceed the number of cores %d configured to use.\n", 135876404edcSAsim Jamshed cpu, g_config.mos->num_cores); 135976404edcSAsim Jamshed return NULL; 136076404edcSAsim Jamshed } 136176404edcSAsim Jamshed 136276404edcSAsim Jamshed /* check if mtcp_create_context() was already initialized */ 136376404edcSAsim Jamshed if (g_logctx[cpu] != NULL) { 136476404edcSAsim Jamshed TRACE_ERROR("%s was already initialized before!\n", 136576404edcSAsim Jamshed __FUNCTION__); 136676404edcSAsim Jamshed return NULL; 136776404edcSAsim Jamshed } 136876404edcSAsim Jamshed 136976404edcSAsim Jamshed ret = sem_init(&g_init_sem[cpu], 0, 0); 137076404edcSAsim Jamshed if (ret) { 137176404edcSAsim Jamshed TRACE_ERROR("Failed initialize init_sem.\n"); 137276404edcSAsim Jamshed return NULL; 137376404edcSAsim Jamshed } 137476404edcSAsim Jamshed 137576404edcSAsim Jamshed ret = sem_init(&g_done_sem[cpu], 0, 0); 137676404edcSAsim Jamshed if (ret) { 137776404edcSAsim Jamshed TRACE_ERROR("Failed initialize done_sem.\n"); 137876404edcSAsim Jamshed return NULL; 137976404edcSAsim Jamshed } 138076404edcSAsim Jamshed 138176404edcSAsim Jamshed mctx = (mctx_t)calloc(1, sizeof(struct mtcp_context)); 138276404edcSAsim Jamshed if (!mctx) { 138376404edcSAsim Jamshed TRACE_ERROR("Failed to allocate memory for mtcp_context.\n"); 138476404edcSAsim Jamshed return NULL; 138576404edcSAsim Jamshed } 138676404edcSAsim Jamshed mctx->cpu = cpu; 138776404edcSAsim Jamshed g_ctx[cpu] = mctx; 138876404edcSAsim Jamshed 138976404edcSAsim Jamshed /* initialize logger */ 139076404edcSAsim Jamshed g_logctx[cpu] = (struct log_thread_context *) 139176404edcSAsim Jamshed calloc(1, sizeof(struct log_thread_context)); 139276404edcSAsim Jamshed if (!g_logctx[cpu]) { 139376404edcSAsim Jamshed perror("malloc"); 139476404edcSAsim Jamshed TRACE_ERROR("Failed to allocate memory for log thread context.\n"); 139576404edcSAsim Jamshed return NULL; 139676404edcSAsim Jamshed } 139776404edcSAsim Jamshed InitLogThreadContext(g_logctx[cpu], cpu); 139876404edcSAsim Jamshed if (pthread_create(&log_thread[cpu], 139976404edcSAsim Jamshed NULL, ThreadLogMain, (void *)g_logctx[cpu])) { 140076404edcSAsim Jamshed perror("pthread_create"); 140176404edcSAsim Jamshed TRACE_ERROR("Failed to create log thread\n"); 140276404edcSAsim Jamshed return NULL; 140376404edcSAsim Jamshed } 140476404edcSAsim Jamshed 140576404edcSAsim Jamshed #ifdef ENABLE_DPDK 140676404edcSAsim Jamshed /* Wake up mTCP threads (wake up I/O threads) */ 140776404edcSAsim Jamshed if (current_iomodule_func == &dpdk_module_func) { 140876404edcSAsim Jamshed int master; 140976404edcSAsim Jamshed master = rte_get_master_lcore(); 141076404edcSAsim Jamshed if (master == cpu) { 141176404edcSAsim Jamshed lcore_config[master].ret = 0; 141276404edcSAsim Jamshed lcore_config[master].state = FINISHED; 141376404edcSAsim Jamshed if (pthread_create(&g_thread[cpu], 141476404edcSAsim Jamshed NULL, MTCPRunThread, (void *)mctx) != 0) { 141576404edcSAsim Jamshed TRACE_ERROR("pthread_create of mtcp thread failed!\n"); 141676404edcSAsim Jamshed return NULL; 141776404edcSAsim Jamshed } 141876404edcSAsim Jamshed } else 141976404edcSAsim Jamshed rte_eal_remote_launch(MTCPDPDKRunThread, mctx, cpu); 142076404edcSAsim Jamshed } else 142176404edcSAsim Jamshed #endif /* !ENABLE_DPDK */ 142276404edcSAsim Jamshed { 142376404edcSAsim Jamshed if (pthread_create(&g_thread[cpu], 142476404edcSAsim Jamshed NULL, MTCPRunThread, (void *)mctx) != 0) { 142576404edcSAsim Jamshed TRACE_ERROR("pthread_create of mtcp thread failed!\n"); 142676404edcSAsim Jamshed return NULL; 142776404edcSAsim Jamshed } 142876404edcSAsim Jamshed } 142976404edcSAsim Jamshed 143076404edcSAsim Jamshed sem_wait(&g_init_sem[cpu]); 143176404edcSAsim Jamshed sem_destroy(&g_init_sem[cpu]); 143276404edcSAsim Jamshed 143376404edcSAsim Jamshed running[cpu] = TRUE; 143476404edcSAsim Jamshed 143576404edcSAsim Jamshed #ifdef NETSTAT 143676404edcSAsim Jamshed #if NETSTAT_TOTAL 143776404edcSAsim Jamshed if (printer < 0) { 143876404edcSAsim Jamshed printer = cpu; 143976404edcSAsim Jamshed TRACE_INFO("CPU %d is in charge of printing stats.\n", printer); 144076404edcSAsim Jamshed } 144176404edcSAsim Jamshed #endif 144276404edcSAsim Jamshed #endif 144376404edcSAsim Jamshed 144476404edcSAsim Jamshed return mctx; 144576404edcSAsim Jamshed } 144676404edcSAsim Jamshed /*----------------------------------------------------------------------------*/ 144776404edcSAsim Jamshed /** 144876404edcSAsim Jamshed * TODO: It currently always returns 0. Add appropriate error return values 144976404edcSAsim Jamshed */ 145076404edcSAsim Jamshed int 145176404edcSAsim Jamshed mtcp_destroy_context(mctx_t mctx) 145276404edcSAsim Jamshed { 145376404edcSAsim Jamshed struct mtcp_thread_context *ctx = g_pctx[mctx->cpu]; 145476404edcSAsim Jamshed struct mtcp_manager *mtcp = ctx->mtcp_manager; 145576404edcSAsim Jamshed struct log_thread_context *log_ctx = mtcp->logger; 145676404edcSAsim Jamshed int ret, i; 145776404edcSAsim Jamshed 145876404edcSAsim Jamshed TRACE_DBG("CPU %d: mtcp_destroy_context()\n", mctx->cpu); 145976404edcSAsim Jamshed 146076404edcSAsim Jamshed /* close all stream sockets that are still open */ 146176404edcSAsim Jamshed if (!ctx->exit) { 146276404edcSAsim Jamshed for (i = 0; i < g_config.mos->max_concurrency; i++) { 146376404edcSAsim Jamshed if (mtcp->smap[i].socktype == MOS_SOCK_STREAM) { 146476404edcSAsim Jamshed TRACE_DBG("Closing remaining socket %d (%s)\n", 146576404edcSAsim Jamshed i, TCPStateToString(mtcp->smap[i].stream)); 1466a5e1a556SAsim Jamshed #ifdef DUMP_STREAM 146776404edcSAsim Jamshed DumpStream(mtcp, mtcp->smap[i].stream); 1468a5e1a556SAsim Jamshed #endif 146976404edcSAsim Jamshed mtcp_close(mctx, i); 147076404edcSAsim Jamshed } 147176404edcSAsim Jamshed } 147276404edcSAsim Jamshed } 147376404edcSAsim Jamshed 147476404edcSAsim Jamshed ctx->done = 1; 147576404edcSAsim Jamshed 147676404edcSAsim Jamshed //pthread_kill(g_thread[mctx->cpu], SIGINT); 147776404edcSAsim Jamshed #ifdef ENABLE_DPDK 147876404edcSAsim Jamshed ctx->exit = 1; 147976404edcSAsim Jamshed /* XXX - dpdk logic changes */ 148076404edcSAsim Jamshed if (current_iomodule_func == &dpdk_module_func) { 148176404edcSAsim Jamshed int master = rte_get_master_lcore(); 148276404edcSAsim Jamshed if (master == mctx->cpu) 148376404edcSAsim Jamshed pthread_join(g_thread[mctx->cpu], NULL); 148476404edcSAsim Jamshed else 148576404edcSAsim Jamshed rte_eal_wait_lcore(mctx->cpu); 148676404edcSAsim Jamshed } else 148776404edcSAsim Jamshed #endif /* !ENABLE_DPDK */ 148876404edcSAsim Jamshed { 148976404edcSAsim Jamshed pthread_join(g_thread[mctx->cpu], NULL); 149076404edcSAsim Jamshed } 149176404edcSAsim Jamshed 149276404edcSAsim Jamshed TRACE_INFO("MTCP thread %d joined.\n", mctx->cpu); 149376404edcSAsim Jamshed running[mctx->cpu] = FALSE; 149476404edcSAsim Jamshed 149576404edcSAsim Jamshed #ifdef NETSTAT 149676404edcSAsim Jamshed #if NETSTAT_TOTAL 149776404edcSAsim Jamshed if (printer == mctx->cpu) { 149876404edcSAsim Jamshed for (i = 0; i < num_cpus; i++) { 149976404edcSAsim Jamshed if (i != mctx->cpu && running[i]) { 150076404edcSAsim Jamshed printer = i; 150176404edcSAsim Jamshed break; 150276404edcSAsim Jamshed } 150376404edcSAsim Jamshed } 150476404edcSAsim Jamshed } 150576404edcSAsim Jamshed #endif 150676404edcSAsim Jamshed #endif 150776404edcSAsim Jamshed 150876404edcSAsim Jamshed log_ctx->done = 1; 150976404edcSAsim Jamshed ret = write(log_ctx->pair_sp_fd, "F", 1); 151076404edcSAsim Jamshed if (ret != 1) 151176404edcSAsim Jamshed TRACE_ERROR("CPU %d: Fail to signal socket pair\n", mctx->cpu); 151276404edcSAsim Jamshed 151376404edcSAsim Jamshed pthread_join(log_thread[ctx->cpu], NULL); 151476404edcSAsim Jamshed fclose(mtcp->log_fp); 151576404edcSAsim Jamshed TRACE_LOG("Log thread %d joined.\n", mctx->cpu); 151676404edcSAsim Jamshed 151776404edcSAsim Jamshed if (mtcp->connectq) { 151876404edcSAsim Jamshed DestroyStreamQueue(mtcp->connectq); 151976404edcSAsim Jamshed mtcp->connectq = NULL; 152076404edcSAsim Jamshed } 152176404edcSAsim Jamshed if (mtcp->sendq) { 152276404edcSAsim Jamshed DestroyStreamQueue(mtcp->sendq); 152376404edcSAsim Jamshed mtcp->sendq = NULL; 152476404edcSAsim Jamshed } 152576404edcSAsim Jamshed if (mtcp->ackq) { 152676404edcSAsim Jamshed DestroyStreamQueue(mtcp->ackq); 152776404edcSAsim Jamshed mtcp->ackq = NULL; 152876404edcSAsim Jamshed } 152976404edcSAsim Jamshed if (mtcp->closeq) { 153076404edcSAsim Jamshed DestroyStreamQueue(mtcp->closeq); 153176404edcSAsim Jamshed mtcp->closeq = NULL; 153276404edcSAsim Jamshed } 153376404edcSAsim Jamshed if (mtcp->closeq_int) { 153476404edcSAsim Jamshed DestroyInternalStreamQueue(mtcp->closeq_int); 153576404edcSAsim Jamshed mtcp->closeq_int = NULL; 153676404edcSAsim Jamshed } 153776404edcSAsim Jamshed if (mtcp->resetq) { 153876404edcSAsim Jamshed DestroyStreamQueue(mtcp->resetq); 153976404edcSAsim Jamshed mtcp->resetq = NULL; 154076404edcSAsim Jamshed } 154176404edcSAsim Jamshed if (mtcp->resetq_int) { 154276404edcSAsim Jamshed DestroyInternalStreamQueue(mtcp->resetq_int); 154376404edcSAsim Jamshed mtcp->resetq_int = NULL; 154476404edcSAsim Jamshed } 154576404edcSAsim Jamshed if (mtcp->destroyq) { 154676404edcSAsim Jamshed DestroyStreamQueue(mtcp->destroyq); 154776404edcSAsim Jamshed mtcp->destroyq = NULL; 154876404edcSAsim Jamshed } 154976404edcSAsim Jamshed 155076404edcSAsim Jamshed DestroyMTCPSender(mtcp->g_sender); 155176404edcSAsim Jamshed for (i = 0; i < g_config.mos->netdev_table->num; i++) { 155276404edcSAsim Jamshed DestroyMTCPSender(mtcp->n_sender[i]); 155376404edcSAsim Jamshed } 155476404edcSAsim Jamshed 155576404edcSAsim Jamshed MPDestroy(mtcp->rv_pool); 155676404edcSAsim Jamshed MPDestroy(mtcp->sv_pool); 155776404edcSAsim Jamshed MPDestroy(mtcp->flow_pool); 155876404edcSAsim Jamshed 155976404edcSAsim Jamshed if (mtcp->ap) { 156076404edcSAsim Jamshed DestroyAddressPool(mtcp->ap); 156176404edcSAsim Jamshed } 156276404edcSAsim Jamshed 156376404edcSAsim Jamshed SQ_LOCK_DESTROY(&ctx->connect_lock); 156476404edcSAsim Jamshed SQ_LOCK_DESTROY(&ctx->close_lock); 156576404edcSAsim Jamshed SQ_LOCK_DESTROY(&ctx->reset_lock); 156676404edcSAsim Jamshed SQ_LOCK_DESTROY(&ctx->sendq_lock); 156776404edcSAsim Jamshed SQ_LOCK_DESTROY(&ctx->ackq_lock); 156876404edcSAsim Jamshed SQ_LOCK_DESTROY(&ctx->destroyq_lock); 156976404edcSAsim Jamshed 157076404edcSAsim Jamshed //TRACE_INFO("MTCP thread %d destroyed.\n", mctx->cpu); 157176404edcSAsim Jamshed if (mtcp->iom->destroy_handle) 157276404edcSAsim Jamshed mtcp->iom->destroy_handle(ctx); 157376404edcSAsim Jamshed free(ctx); 157476404edcSAsim Jamshed free(mctx); 157576404edcSAsim Jamshed 157676404edcSAsim Jamshed return 0; 157776404edcSAsim Jamshed } 157876404edcSAsim Jamshed /*----------------------------------------------------------------------------*/ 157976404edcSAsim Jamshed mtcp_sighandler_t 158076404edcSAsim Jamshed mtcp_register_signal(int signum, mtcp_sighandler_t handler) 158176404edcSAsim Jamshed { 158276404edcSAsim Jamshed mtcp_sighandler_t prev; 158376404edcSAsim Jamshed 158476404edcSAsim Jamshed if (signum == SIGINT) { 158576404edcSAsim Jamshed prev = app_signal_handler; 158676404edcSAsim Jamshed app_signal_handler = handler; 158776404edcSAsim Jamshed } else { 158876404edcSAsim Jamshed if ((prev = signal(signum, handler)) == SIG_ERR) { 158976404edcSAsim Jamshed perror("signal"); 159076404edcSAsim Jamshed return SIG_ERR; 159176404edcSAsim Jamshed } 159276404edcSAsim Jamshed } 159376404edcSAsim Jamshed 159476404edcSAsim Jamshed return prev; 159576404edcSAsim Jamshed } 159676404edcSAsim Jamshed /*----------------------------------------------------------------------------*/ 159776404edcSAsim Jamshed int 159876404edcSAsim Jamshed mtcp_getconf(struct mtcp_conf *conf) 159976404edcSAsim Jamshed { 160076404edcSAsim Jamshed int i, j; 160176404edcSAsim Jamshed 16028a941c7eSAsim Jamshed if (!conf) { 16038a941c7eSAsim Jamshed errno = EINVAL; 160476404edcSAsim Jamshed return -1; 16058a941c7eSAsim Jamshed } 160676404edcSAsim Jamshed 160776404edcSAsim Jamshed conf->num_cores = g_config.mos->num_cores; 160876404edcSAsim Jamshed conf->max_concurrency = g_config.mos->max_concurrency; 160976404edcSAsim Jamshed conf->cpu_mask = g_config.mos->cpu_mask; 161076404edcSAsim Jamshed 161176404edcSAsim Jamshed conf->rcvbuf_size = g_config.mos->rmem_size; 161276404edcSAsim Jamshed conf->sndbuf_size = g_config.mos->wmem_size; 161376404edcSAsim Jamshed 161476404edcSAsim Jamshed conf->tcp_timewait = g_config.mos->tcp_tw_interval; 161576404edcSAsim Jamshed conf->tcp_timeout = g_config.mos->tcp_timeout; 161676404edcSAsim Jamshed 161776404edcSAsim Jamshed i = 0; 161876404edcSAsim Jamshed struct conf_block *bwalk; 161976404edcSAsim Jamshed TAILQ_FOREACH(bwalk, &g_config.app_blkh, link) { 162076404edcSAsim Jamshed struct app_conf *app_conf = (struct app_conf *)bwalk->conf; 162176404edcSAsim Jamshed for (j = 0; j < app_conf->app_argc; j++) 162276404edcSAsim Jamshed conf->app_argv[i][j] = app_conf->app_argv[j]; 162376404edcSAsim Jamshed conf->app_argc[i] = app_conf->app_argc; 162476404edcSAsim Jamshed conf->app_cpu_mask[i] = app_conf->cpu_mask; 162576404edcSAsim Jamshed i++; 162676404edcSAsim Jamshed } 162776404edcSAsim Jamshed conf->num_app = i; 162876404edcSAsim Jamshed 162976404edcSAsim Jamshed return 0; 163076404edcSAsim Jamshed } 163176404edcSAsim Jamshed /*----------------------------------------------------------------------------*/ 163276404edcSAsim Jamshed int 163376404edcSAsim Jamshed mtcp_setconf(const struct mtcp_conf *conf) 163476404edcSAsim Jamshed { 163576404edcSAsim Jamshed if (!conf) 163676404edcSAsim Jamshed return -1; 163776404edcSAsim Jamshed 163876404edcSAsim Jamshed g_config.mos->num_cores = conf->num_cores; 163976404edcSAsim Jamshed g_config.mos->max_concurrency = conf->max_concurrency; 164076404edcSAsim Jamshed 164176404edcSAsim Jamshed g_config.mos->rmem_size = conf->rcvbuf_size; 164276404edcSAsim Jamshed g_config.mos->wmem_size = conf->sndbuf_size; 164376404edcSAsim Jamshed 164476404edcSAsim Jamshed g_config.mos->tcp_tw_interval = conf->tcp_timewait; 164576404edcSAsim Jamshed g_config.mos->tcp_timeout = conf->tcp_timeout; 164676404edcSAsim Jamshed 164776404edcSAsim Jamshed TRACE_CONFIG("Configuration updated by mtcp_setconf().\n"); 164876404edcSAsim Jamshed //PrintConfiguration(); 164976404edcSAsim Jamshed 165076404edcSAsim Jamshed return 0; 165176404edcSAsim Jamshed } 165276404edcSAsim Jamshed /*----------------------------------------------------------------------------*/ 165376404edcSAsim Jamshed int 165476404edcSAsim Jamshed mtcp_init(const char *config_file) 165576404edcSAsim Jamshed { 165676404edcSAsim Jamshed int i; 165776404edcSAsim Jamshed int ret; 165876404edcSAsim Jamshed 165976404edcSAsim Jamshed if (geteuid()) { 166076404edcSAsim Jamshed TRACE_CONFIG("[CAUTION] Run as root if mlock is necessary.\n"); 1661*dcdbbb98SAsim Jamshed #if defined(ENABLE_DPDK) || defined(ENABLE_NETMAP) 1662df3fae06SAsim Jamshed TRACE_CONFIG("[CAUTION] Run the app as root!\n"); 1663df3fae06SAsim Jamshed exit(EXIT_FAILURE); 1664df3fae06SAsim Jamshed #endif 166576404edcSAsim Jamshed } 166676404edcSAsim Jamshed 166776404edcSAsim Jamshed /* getting cpu and NIC */ 166876404edcSAsim Jamshed num_cpus = GetNumCPUs(); 166976404edcSAsim Jamshed assert(num_cpus >= 1); 167076404edcSAsim Jamshed for (i = 0; i < num_cpus; i++) { 167176404edcSAsim Jamshed g_mtcp[i] = NULL; 167276404edcSAsim Jamshed running[i] = FALSE; 167376404edcSAsim Jamshed sigint_cnt[i] = 0; 167476404edcSAsim Jamshed } 167576404edcSAsim Jamshed 167676404edcSAsim Jamshed ret = LoadConfigurationUpperHalf(config_file); 167776404edcSAsim Jamshed if (ret) { 167876404edcSAsim Jamshed TRACE_CONFIG("Error occured while loading configuration.\n"); 167976404edcSAsim Jamshed return -1; 168076404edcSAsim Jamshed } 168176404edcSAsim Jamshed 168276404edcSAsim Jamshed #if defined(ENABLE_PSIO) 168376404edcSAsim Jamshed current_iomodule_func = &ps_module_func; 168476404edcSAsim Jamshed #elif defined(ENABLE_DPDK) 168576404edcSAsim Jamshed current_iomodule_func = &dpdk_module_func; 168676404edcSAsim Jamshed #elif defined(ENABLE_PCAP) 168776404edcSAsim Jamshed current_iomodule_func = &pcap_module_func; 1688522d5c66SAsim Jamshed #elif defined(ENABLE_NETMAP) 1689522d5c66SAsim Jamshed current_iomodule_func = &netmap_module_func; 169076404edcSAsim Jamshed #endif 169176404edcSAsim Jamshed 169276404edcSAsim Jamshed if (current_iomodule_func->load_module_upper_half) 169376404edcSAsim Jamshed current_iomodule_func->load_module_upper_half(); 169476404edcSAsim Jamshed 169576404edcSAsim Jamshed LoadConfigurationLowerHalf(); 169676404edcSAsim Jamshed 169776404edcSAsim Jamshed //PrintConfiguration(); 169876404edcSAsim Jamshed 1699a5e1a556SAsim Jamshed for (i = 0; i < g_config.mos->netdev_table->num; i++) { 1700a5e1a556SAsim Jamshed ap[i] = CreateAddressPool(g_config.mos->netdev_table->ent[i]->ip_addr, 1); 1701a5e1a556SAsim Jamshed if (!ap[i]) { 1702a5e1a556SAsim Jamshed TRACE_CONFIG("Error occured while create address pool[%d]\n", 1703a5e1a556SAsim Jamshed i); 170476404edcSAsim Jamshed return -1; 170576404edcSAsim Jamshed } 1706a5e1a556SAsim Jamshed } 170776404edcSAsim Jamshed 170876404edcSAsim Jamshed //PrintInterfaceInfo(); 170976404edcSAsim Jamshed //PrintRoutingTable(); 171076404edcSAsim Jamshed //PrintARPTable(); 171176404edcSAsim Jamshed InitARPTable(); 171276404edcSAsim Jamshed 171376404edcSAsim Jamshed if (signal(SIGUSR1, HandleSignal) == SIG_ERR) { 171476404edcSAsim Jamshed perror("signal, SIGUSR1"); 171576404edcSAsim Jamshed return -1; 171676404edcSAsim Jamshed } 171776404edcSAsim Jamshed if (signal(SIGINT, HandleSignal) == SIG_ERR) { 171876404edcSAsim Jamshed perror("signal, SIGINT"); 171976404edcSAsim Jamshed return -1; 172076404edcSAsim Jamshed } 172176404edcSAsim Jamshed app_signal_handler = NULL; 172276404edcSAsim Jamshed 172376404edcSAsim Jamshed printf("load_module(): %p\n", current_iomodule_func); 172476404edcSAsim Jamshed /* load system-wide io module specs */ 172576404edcSAsim Jamshed if (current_iomodule_func->load_module_lower_half) 172676404edcSAsim Jamshed current_iomodule_func->load_module_lower_half(); 172776404edcSAsim Jamshed 172876404edcSAsim Jamshed GlobInitEvent(); 172976404edcSAsim Jamshed 173076404edcSAsim Jamshed PrintConf(&g_config); 173176404edcSAsim Jamshed 173276404edcSAsim Jamshed return 0; 173376404edcSAsim Jamshed } 173476404edcSAsim Jamshed /*----------------------------------------------------------------------------*/ 173576404edcSAsim Jamshed int 173676404edcSAsim Jamshed mtcp_destroy() 173776404edcSAsim Jamshed { 173876404edcSAsim Jamshed int i; 173976404edcSAsim Jamshed 174076404edcSAsim Jamshed /* wait until all threads are closed */ 174176404edcSAsim Jamshed for (i = 0; i < num_cpus; i++) { 174276404edcSAsim Jamshed if (running[i]) { 174376404edcSAsim Jamshed if (pthread_join(g_thread[i], NULL) != 0) 174476404edcSAsim Jamshed return -1; 174576404edcSAsim Jamshed } 174676404edcSAsim Jamshed } 174776404edcSAsim Jamshed 1748a5e1a556SAsim Jamshed for (i = 0; i < g_config.mos->netdev_table->num; i++) 1749a5e1a556SAsim Jamshed DestroyAddressPool(ap[i]); 175076404edcSAsim Jamshed 175176404edcSAsim Jamshed TRACE_INFO("All MTCP threads are joined.\n"); 175276404edcSAsim Jamshed 175376404edcSAsim Jamshed return 0; 175476404edcSAsim Jamshed } 175576404edcSAsim Jamshed /*----------------------------------------------------------------------------*/ 1756