xref: /mOS-networking-stack/core/src/core.c (revision eac4da1c)
1eac4da1cSM. Asim Jamshed #ifndef _GNU_SOURCE
276404edcSAsim Jamshed #define _GNU_SOURCE
3eac4da1cSM. Asim Jamshed #endif
476404edcSAsim Jamshed #include <sched.h>
576404edcSAsim Jamshed #include <unistd.h>
676404edcSAsim Jamshed #include <sys/time.h>
776404edcSAsim Jamshed #include <semaphore.h>
876404edcSAsim Jamshed #include <sys/mman.h>
976404edcSAsim Jamshed #include <signal.h>
1076404edcSAsim Jamshed #include <assert.h>
1176404edcSAsim Jamshed #include <string.h>
1276404edcSAsim Jamshed 
1376404edcSAsim Jamshed #include "cpu.h"
1476404edcSAsim Jamshed #include "eth_in.h"
1576404edcSAsim Jamshed #include "fhash.h"
1676404edcSAsim Jamshed #include "tcp_send_buffer.h"
1776404edcSAsim Jamshed #include "tcp_ring_buffer.h"
1876404edcSAsim Jamshed #include "socket.h"
1976404edcSAsim Jamshed #include "eth_out.h"
2076404edcSAsim Jamshed #include "tcp.h"
2176404edcSAsim Jamshed #include "tcp_in.h"
2276404edcSAsim Jamshed #include "tcp_out.h"
2376404edcSAsim Jamshed #include "mtcp_api.h"
2476404edcSAsim Jamshed #include "eventpoll.h"
2576404edcSAsim Jamshed #include "logger.h"
2676404edcSAsim Jamshed #include "config.h"
2776404edcSAsim Jamshed #include "arp.h"
2876404edcSAsim Jamshed #include "ip_out.h"
2976404edcSAsim Jamshed #include "timer.h"
3076404edcSAsim Jamshed #include "debug.h"
3176404edcSAsim Jamshed #include "event_callback.h"
3276404edcSAsim Jamshed #include "tcp_rb.h"
3376404edcSAsim Jamshed #include "tcp_stream.h"
3476404edcSAsim Jamshed #include "io_module.h"
3576404edcSAsim Jamshed 
3676404edcSAsim Jamshed #ifdef ENABLE_DPDK
3776404edcSAsim Jamshed /* for launching rte thread */
3876404edcSAsim Jamshed #include <rte_launch.h>
3976404edcSAsim Jamshed #include <rte_lcore.h>
4076404edcSAsim Jamshed #endif /* !ENABLE_DPDK */
#define PS_CHUNK_SIZE 64
/* 80% of PS_CHUNK_SIZE; note that this evaluates to a double */
#define RX_THRESH (PS_CHUNK_SIZE * 0.8)

/* compile-time switches, tested with #if, for the optional stat printers */
#define ROUND_STAT FALSE
#define TIME_STAT FALSE
#define EVENT_STAT FALSE
#define TESTING FALSE

#define LOG_FILE_NAME "log"
#define MAX_FILE_NAME 1024

/* NOTE: both macros evaluate their arguments more than once */
#define MAX(a, b) ((a)>(b)?(a):(b))
#define MIN(a, b) ((a)<(b)?(a):(b))

#define PER_STREAM_SLICE 0.1		// in ms
#define PER_STREAM_TCHECK 1			// in ms
#define PS_SELECT_TIMEOUT 100		// in us

/*
 * Convert a byte count into gigabits (10^9 bits), as a double.
 * The argument is parenthesized so that compound expressions such as
 * GBPS(a + b) are scaled as a whole.
 */
#define GBPS(bytes) ((bytes) * 8.0 / (1000 * 1000 * 1000))
6076404edcSAsim Jamshed 
6176404edcSAsim Jamshed /*----------------------------------------------------------------------------*/
6276404edcSAsim Jamshed /* handlers for threads */
6376404edcSAsim Jamshed struct mtcp_thread_context *g_pctx[MAX_CPUS] = {0};
6476404edcSAsim Jamshed struct log_thread_context *g_logctx[MAX_CPUS] = {0};
6576404edcSAsim Jamshed /*----------------------------------------------------------------------------*/
6676404edcSAsim Jamshed static pthread_t g_thread[MAX_CPUS] = {0};
6776404edcSAsim Jamshed static pthread_t log_thread[MAX_CPUS] = {0};
6876404edcSAsim Jamshed /*----------------------------------------------------------------------------*/
6976404edcSAsim Jamshed static sem_t g_init_sem[MAX_CPUS];
7076404edcSAsim Jamshed static sem_t g_done_sem[MAX_CPUS];
7176404edcSAsim Jamshed static int running[MAX_CPUS] = {0};
7276404edcSAsim Jamshed /*----------------------------------------------------------------------------*/
7376404edcSAsim Jamshed mtcp_sighandler_t app_signal_handler;
7476404edcSAsim Jamshed static int sigint_cnt[MAX_CPUS] = {0};
7576404edcSAsim Jamshed static struct timespec sigint_ts[MAX_CPUS];
7605e3289cSYoungGyoun void mtcp_free_context(mctx_t mctx);
7776404edcSAsim Jamshed /*----------------------------------------------------------------------------*/
7876404edcSAsim Jamshed #ifdef NETSTAT
7976404edcSAsim Jamshed #if NETSTAT_TOTAL
8076404edcSAsim Jamshed static int printer = -1;
8176404edcSAsim Jamshed #if ROUND_STAT
8276404edcSAsim Jamshed #endif /* ROUND_STAT */
8376404edcSAsim Jamshed #endif /* NETSTAT_TOTAL */
8476404edcSAsim Jamshed #endif /* NETSTAT */
8576404edcSAsim Jamshed /*----------------------------------------------------------------------------*/
8676404edcSAsim Jamshed void
HandleSignal(int signal)8776404edcSAsim Jamshed HandleSignal(int signal)
8876404edcSAsim Jamshed {
8976404edcSAsim Jamshed 	int i = 0;
9076404edcSAsim Jamshed 
9176404edcSAsim Jamshed 	if (signal == SIGINT) {
928a941c7eSAsim Jamshed 		FreeConfigResources();
9376404edcSAsim Jamshed #ifdef DARWIN
9476404edcSAsim Jamshed 		int core = 0;
9576404edcSAsim Jamshed #else
9676404edcSAsim Jamshed 		int core = sched_getcpu();
9776404edcSAsim Jamshed #endif
9876404edcSAsim Jamshed 		struct timespec cur_ts;
9976404edcSAsim Jamshed 
10076404edcSAsim Jamshed 		clock_gettime(CLOCK_REALTIME, &cur_ts);
10176404edcSAsim Jamshed 
10276404edcSAsim Jamshed 		if (sigint_cnt[core] > 0 && cur_ts.tv_sec == sigint_ts[core].tv_sec) {
10376404edcSAsim Jamshed 			for (i = 0; i < g_config.mos->num_cores; i++) {
10476404edcSAsim Jamshed 				if (running[i]) {
1058c9e1184SAsim Jamshed 					//exit(0);
10676404edcSAsim Jamshed 					g_pctx[i]->exit = TRUE;
10776404edcSAsim Jamshed 				}
10876404edcSAsim Jamshed 			}
10976404edcSAsim Jamshed 		} else {
11076404edcSAsim Jamshed 			for (i = 0; i < g_config.mos->num_cores; i++) {
11176404edcSAsim Jamshed 				if (g_pctx[i])
11276404edcSAsim Jamshed 					g_pctx[i]->interrupt = TRUE;
11376404edcSAsim Jamshed 			}
11476404edcSAsim Jamshed 			if (!app_signal_handler) {
11576404edcSAsim Jamshed 				for (i = 0; i < g_config.mos->num_cores; i++) {
11676404edcSAsim Jamshed 					if (running[i]) {
1178c9e1184SAsim Jamshed 						//exit(0);
11876404edcSAsim Jamshed 						g_pctx[i]->exit = TRUE;
11976404edcSAsim Jamshed 					}
12076404edcSAsim Jamshed 				}
12176404edcSAsim Jamshed 			}
12276404edcSAsim Jamshed 		}
12376404edcSAsim Jamshed 		sigint_cnt[core]++;
12476404edcSAsim Jamshed 		clock_gettime(CLOCK_REALTIME, &sigint_ts[core]);
12576404edcSAsim Jamshed 	}
12676404edcSAsim Jamshed 
12776404edcSAsim Jamshed 	if (signal != SIGUSR1) {
12876404edcSAsim Jamshed 		if (app_signal_handler) {
12976404edcSAsim Jamshed 			app_signal_handler(signal);
13076404edcSAsim Jamshed 		}
13176404edcSAsim Jamshed 	}
13276404edcSAsim Jamshed }
13376404edcSAsim Jamshed /*----------------------------------------------------------------------------*/
13476404edcSAsim Jamshed static int
AttachDevice(struct mtcp_thread_context * ctx)13576404edcSAsim Jamshed AttachDevice(struct mtcp_thread_context* ctx)
13676404edcSAsim Jamshed {
13776404edcSAsim Jamshed 	int working = -1;
13876404edcSAsim Jamshed 	mtcp_manager_t mtcp = ctx->mtcp_manager;
13976404edcSAsim Jamshed 
14076404edcSAsim Jamshed 	if (mtcp->iom->link_devices)
14176404edcSAsim Jamshed 		working = mtcp->iom->link_devices(ctx);
14276404edcSAsim Jamshed 	else
14376404edcSAsim Jamshed 		return 0;
14476404edcSAsim Jamshed 
14576404edcSAsim Jamshed 	return working;
14676404edcSAsim Jamshed }
14776404edcSAsim Jamshed /*----------------------------------------------------------------------------*/
14876404edcSAsim Jamshed #ifdef TIMESTAT
14976404edcSAsim Jamshed static inline void
InitStatCounter(struct stat_counter * counter)15076404edcSAsim Jamshed InitStatCounter(struct stat_counter *counter)
15176404edcSAsim Jamshed {
15276404edcSAsim Jamshed 	counter->cnt = 0;
15376404edcSAsim Jamshed 	counter->sum = 0;
15476404edcSAsim Jamshed 	counter->max = 0;
15576404edcSAsim Jamshed 	counter->min = 0;
15676404edcSAsim Jamshed }
15776404edcSAsim Jamshed /*----------------------------------------------------------------------------*/
15876404edcSAsim Jamshed static inline void
UpdateStatCounter(struct stat_counter * counter,int64_t value)15976404edcSAsim Jamshed UpdateStatCounter(struct stat_counter *counter, int64_t value)
16076404edcSAsim Jamshed {
16176404edcSAsim Jamshed 	counter->cnt++;
16276404edcSAsim Jamshed 	counter->sum += value;
16376404edcSAsim Jamshed 	if (value > counter->max)
16476404edcSAsim Jamshed 		counter->max = value;
16576404edcSAsim Jamshed 	if (counter->min == 0 || value < counter->min)
16676404edcSAsim Jamshed 		counter->min = value;
16776404edcSAsim Jamshed }
16876404edcSAsim Jamshed /*----------------------------------------------------------------------------*/
16976404edcSAsim Jamshed static inline uint64_t
GetAverageStat(struct stat_counter * counter)17076404edcSAsim Jamshed GetAverageStat(struct stat_counter *counter)
17176404edcSAsim Jamshed {
17276404edcSAsim Jamshed 	return counter->cnt ? (counter->sum / counter->cnt) : 0;
17376404edcSAsim Jamshed }
17476404edcSAsim Jamshed /*----------------------------------------------------------------------------*/
/*
 * Return (t2 - t1) in microseconds; negative when t2 is earlier than t1.
 *
 * Both differences are computed in int64_t so that the multiplication by
 * 1000000 cannot overflow on platforms where time_t is only 32 bits wide.
 */
static inline int64_t
TimeDiffUs(struct timeval *t2, struct timeval *t1)
{
	int64_t sec = (int64_t)t2->tv_sec - (int64_t)t1->tv_sec;
	int64_t usec = (int64_t)t2->tv_usec - (int64_t)t1->tv_usec;

	return sec * 1000000 + usec;
}
18176404edcSAsim Jamshed /*----------------------------------------------------------------------------*/
18276404edcSAsim Jamshed #endif
18376404edcSAsim Jamshed #ifdef NETSTAT
18476404edcSAsim Jamshed static inline void
PrintThreadNetworkStats(mtcp_manager_t mtcp,struct net_stat * ns)18576404edcSAsim Jamshed PrintThreadNetworkStats(mtcp_manager_t mtcp, struct net_stat *ns)
18676404edcSAsim Jamshed {
18776404edcSAsim Jamshed 	int i;
18876404edcSAsim Jamshed 
18976404edcSAsim Jamshed 	for (i = 0; i < g_config.mos->netdev_table->num; i++) {
19076404edcSAsim Jamshed 		ns->rx_packets[i] = mtcp->nstat.rx_packets[i] - mtcp->p_nstat.rx_packets[i];
19176404edcSAsim Jamshed 		ns->rx_errors[i] = mtcp->nstat.rx_errors[i] - mtcp->p_nstat.rx_errors[i];
19276404edcSAsim Jamshed 		ns->rx_bytes[i] = mtcp->nstat.rx_bytes[i] - mtcp->p_nstat.rx_bytes[i];
19376404edcSAsim Jamshed 		ns->tx_packets[i] = mtcp->nstat.tx_packets[i] - mtcp->p_nstat.tx_packets[i];
19476404edcSAsim Jamshed 		ns->tx_drops[i] = mtcp->nstat.tx_drops[i] - mtcp->p_nstat.tx_drops[i];
19576404edcSAsim Jamshed 		ns->tx_bytes[i] = mtcp->nstat.tx_bytes[i] - mtcp->p_nstat.tx_bytes[i];
19676404edcSAsim Jamshed #if NETSTAT_PERTHREAD
19776404edcSAsim Jamshed 		if (g_config.mos->netdev_table->ent[i]->stat_print) {
19876404edcSAsim Jamshed 			fprintf(stderr, "[CPU%2d] %s flows: %6u, "
19976404edcSAsim Jamshed 					"RX: %7llu(pps) (err: %5llu), %5.2lf(Gbps), "
20076404edcSAsim Jamshed 					"TX: %7llu(pps), %5.2lf(Gbps)\n",
20176404edcSAsim Jamshed 					mtcp->ctx->cpu,
20276404edcSAsim Jamshed 					g_config.mos->netdev_table->ent[i]->dev_name,
20376404edcSAsim Jamshed 					(unsigned)mtcp->flow_cnt,
20476404edcSAsim Jamshed 					(long long unsigned)ns->rx_packets[i],
20576404edcSAsim Jamshed 					(long long unsigned)ns->rx_errors[i],
20676404edcSAsim Jamshed 					GBPS(ns->rx_bytes[i]),
20776404edcSAsim Jamshed 					(long long unsigned)ns->tx_packets[i],
20876404edcSAsim Jamshed 					GBPS(ns->tx_bytes[i]));
20976404edcSAsim Jamshed 		}
21076404edcSAsim Jamshed #endif
21176404edcSAsim Jamshed 	}
21276404edcSAsim Jamshed 	mtcp->p_nstat = mtcp->nstat;
21376404edcSAsim Jamshed 
21476404edcSAsim Jamshed }
21576404edcSAsim Jamshed /*----------------------------------------------------------------------------*/
21676404edcSAsim Jamshed #if ROUND_STAT
21776404edcSAsim Jamshed static inline void
PrintThreadRoundStats(mtcp_manager_t mtcp,struct run_stat * rs)21876404edcSAsim Jamshed PrintThreadRoundStats(mtcp_manager_t mtcp, struct run_stat *rs)
21976404edcSAsim Jamshed {
22076404edcSAsim Jamshed #define ROUND_DIV (1000)
22176404edcSAsim Jamshed 	rs->rounds = mtcp->runstat.rounds - mtcp->p_runstat.rounds;
22276404edcSAsim Jamshed 	rs->rounds_rx = mtcp->runstat.rounds_rx - mtcp->p_runstat.rounds_rx;
22376404edcSAsim Jamshed 	rs->rounds_rx_try = mtcp->runstat.rounds_rx_try - mtcp->p_runstat.rounds_rx_try;
22476404edcSAsim Jamshed 	rs->rounds_tx = mtcp->runstat.rounds_tx - mtcp->p_runstat.rounds_tx;
22576404edcSAsim Jamshed 	rs->rounds_tx_try = mtcp->runstat.rounds_tx_try - mtcp->p_runstat.rounds_tx_try;
22676404edcSAsim Jamshed 	rs->rounds_select = mtcp->runstat.rounds_select - mtcp->p_runstat.rounds_select;
22776404edcSAsim Jamshed 	rs->rounds_select_rx = mtcp->runstat.rounds_select_rx - mtcp->p_runstat.rounds_select_rx;
22876404edcSAsim Jamshed 	rs->rounds_select_tx = mtcp->runstat.rounds_select_tx - mtcp->p_runstat.rounds_select_tx;
22976404edcSAsim Jamshed 	rs->rounds_select_intr = mtcp->runstat.rounds_select_intr - mtcp->p_runstat.rounds_select_intr;
23076404edcSAsim Jamshed 	rs->rounds_twcheck = mtcp->runstat.rounds_twcheck - mtcp->p_runstat.rounds_twcheck;
23176404edcSAsim Jamshed 	mtcp->p_runstat = mtcp->runstat;
23276404edcSAsim Jamshed #if NETSTAT_PERTHREAD
23376404edcSAsim Jamshed 	fprintf(stderr, "[CPU%2d] Rounds: %4lluK, "
23476404edcSAsim Jamshed 			"rx: %3lluK (try: %4lluK), tx: %3lluK (try: %4lluK), "
23576404edcSAsim Jamshed 			"ps_select: %4llu (rx: %4llu, tx: %4llu, intr: %3llu)\n",
23676404edcSAsim Jamshed 			mtcp->ctx->cpu, rs->rounds / ROUND_DIV,
23776404edcSAsim Jamshed 			rs->rounds_rx / ROUND_DIV, rs->rounds_rx_try / ROUND_DIV,
23876404edcSAsim Jamshed 			rs->rounds_tx / ROUND_DIV, rs->rounds_tx_try / ROUND_DIV,
23976404edcSAsim Jamshed 			rs->rounds_select,
24076404edcSAsim Jamshed 			rs->rounds_select_rx, rs->rounds_select_tx, rs->rounds_select_intr);
24176404edcSAsim Jamshed #endif
24276404edcSAsim Jamshed }
24376404edcSAsim Jamshed #endif /* ROUND_STAT */
24476404edcSAsim Jamshed /*----------------------------------------------------------------------------*/
24576404edcSAsim Jamshed #if TIME_STAT
24676404edcSAsim Jamshed static inline void
PrintThreadRoundTime(mtcp_manager_t mtcp)24776404edcSAsim Jamshed PrintThreadRoundTime(mtcp_manager_t mtcp)
24876404edcSAsim Jamshed {
24976404edcSAsim Jamshed 	fprintf(stderr, "[CPU%2d] Time: (avg, max) "
25076404edcSAsim Jamshed 			"round: (%4luus, %4luus), processing: (%4luus, %4luus), "
25176404edcSAsim Jamshed 			"tcheck: (%4luus, %4luus), epoll: (%4luus, %4luus), "
25276404edcSAsim Jamshed 			"handle: (%4luus, %4luus), xmit: (%4luus, %4luus), "
25376404edcSAsim Jamshed 			"select: (%4luus, %4luus)\n", mtcp->ctx->cpu,
25476404edcSAsim Jamshed 			GetAverageStat(&mtcp->rtstat.round), mtcp->rtstat.round.max,
25576404edcSAsim Jamshed 			GetAverageStat(&mtcp->rtstat.processing), mtcp->rtstat.processing.max,
25676404edcSAsim Jamshed 			GetAverageStat(&mtcp->rtstat.tcheck), mtcp->rtstat.tcheck.max,
25776404edcSAsim Jamshed 			GetAverageStat(&mtcp->rtstat.epoll), mtcp->rtstat.epoll.max,
25876404edcSAsim Jamshed 			GetAverageStat(&mtcp->rtstat.handle), mtcp->rtstat.handle.max,
25976404edcSAsim Jamshed 			GetAverageStat(&mtcp->rtstat.xmit), mtcp->rtstat.xmit.max,
26076404edcSAsim Jamshed 			GetAverageStat(&mtcp->rtstat.select), mtcp->rtstat.select.max);
26176404edcSAsim Jamshed 
26276404edcSAsim Jamshed 	InitStatCounter(&mtcp->rtstat.round);
26376404edcSAsim Jamshed 	InitStatCounter(&mtcp->rtstat.processing);
26476404edcSAsim Jamshed 	InitStatCounter(&mtcp->rtstat.tcheck);
26576404edcSAsim Jamshed 	InitStatCounter(&mtcp->rtstat.epoll);
26676404edcSAsim Jamshed 	InitStatCounter(&mtcp->rtstat.handle);
26776404edcSAsim Jamshed 	InitStatCounter(&mtcp->rtstat.xmit);
26876404edcSAsim Jamshed 	InitStatCounter(&mtcp->rtstat.select);
26976404edcSAsim Jamshed }
27076404edcSAsim Jamshed #endif
27176404edcSAsim Jamshed #endif /* NETSTAT */
27276404edcSAsim Jamshed /*----------------------------------------------------------------------------*/
27376404edcSAsim Jamshed #if EVENT_STAT
27476404edcSAsim Jamshed static inline void
PrintEventStat(int core,struct mtcp_epoll_stat * stat)27576404edcSAsim Jamshed PrintEventStat(int core, struct mtcp_epoll_stat *stat)
27676404edcSAsim Jamshed {
27776404edcSAsim Jamshed 	fprintf(stderr, "[CPU%2d] calls: %lu, waits: %lu, wakes: %lu, "
27876404edcSAsim Jamshed 			"issued: %lu, registered: %lu, invalidated: %lu, handled: %lu\n",
27976404edcSAsim Jamshed 			core, stat->calls, stat->waits, stat->wakes,
28076404edcSAsim Jamshed 			stat->issued, stat->registered, stat->invalidated, stat->handled);
28176404edcSAsim Jamshed 	memset(stat, 0, sizeof(struct mtcp_epoll_stat));
28276404edcSAsim Jamshed }
28376404edcSAsim Jamshed #endif /* EVENT_STAT */
28476404edcSAsim Jamshed /*----------------------------------------------------------------------------*/
28576404edcSAsim Jamshed #ifdef NETSTAT
28676404edcSAsim Jamshed static inline void
PrintNetworkStats(mtcp_manager_t mtcp,uint32_t cur_ts)28776404edcSAsim Jamshed PrintNetworkStats(mtcp_manager_t mtcp, uint32_t cur_ts)
28876404edcSAsim Jamshed {
28976404edcSAsim Jamshed #define TIMEOUT 1
29076404edcSAsim Jamshed 	int i;
29176404edcSAsim Jamshed 	struct net_stat ns;
29276404edcSAsim Jamshed 	bool stat_print = false;
29376404edcSAsim Jamshed #if ROUND_STAT
29476404edcSAsim Jamshed 	struct run_stat rs;
29576404edcSAsim Jamshed #endif /* ROUND_STAT */
29676404edcSAsim Jamshed #ifdef NETSTAT_TOTAL
29776404edcSAsim Jamshed 	static double peak_total_rx_gbps = 0;
29876404edcSAsim Jamshed 	static double peak_total_tx_gbps = 0;
29976404edcSAsim Jamshed 	static double avg_total_rx_gbps = 0;
30076404edcSAsim Jamshed 	static double avg_total_tx_gbps = 0;
30176404edcSAsim Jamshed 
30276404edcSAsim Jamshed 	double total_rx_gbps = 0, total_tx_gbps = 0;
30376404edcSAsim Jamshed 	int j;
30476404edcSAsim Jamshed 	uint32_t gflow_cnt = 0;
30576404edcSAsim Jamshed 	struct net_stat g_nstat;
30676404edcSAsim Jamshed #if ROUND_STAT
30776404edcSAsim Jamshed 	struct run_stat g_runstat;
30876404edcSAsim Jamshed #endif /* ROUND_STAT */
30976404edcSAsim Jamshed #endif /* NETSTAT_TOTAL */
31076404edcSAsim Jamshed 
31176404edcSAsim Jamshed 	if (TS_TO_MSEC(cur_ts - mtcp->p_nstat_ts) < SEC_TO_MSEC(TIMEOUT)) {
31276404edcSAsim Jamshed 		return;
31376404edcSAsim Jamshed 	}
31476404edcSAsim Jamshed 
31576404edcSAsim Jamshed 	mtcp->p_nstat_ts = cur_ts;
31676404edcSAsim Jamshed 	gflow_cnt = 0;
31776404edcSAsim Jamshed 	memset(&g_nstat, 0, sizeof(struct net_stat));
31876404edcSAsim Jamshed 	for (i = 0; i < g_config.mos->num_cores; i++) {
31976404edcSAsim Jamshed 		if (running[i]) {
32076404edcSAsim Jamshed 			PrintThreadNetworkStats(g_mtcp[i], &ns);
32176404edcSAsim Jamshed #if NETSTAT_TOTAL
32276404edcSAsim Jamshed 			gflow_cnt += g_mtcp[i]->flow_cnt;
32376404edcSAsim Jamshed 			for (j = 0; j < g_config.mos->netdev_table->num; j++) {
32476404edcSAsim Jamshed 				g_nstat.rx_packets[j] += ns.rx_packets[j];
32576404edcSAsim Jamshed 				g_nstat.rx_errors[j] += ns.rx_errors[j];
32676404edcSAsim Jamshed 				g_nstat.rx_bytes[j] += ns.rx_bytes[j];
32776404edcSAsim Jamshed 				g_nstat.tx_packets[j] += ns.tx_packets[j];
32876404edcSAsim Jamshed 				g_nstat.tx_drops[j] += ns.tx_drops[j];
32976404edcSAsim Jamshed 				g_nstat.tx_bytes[j] += ns.tx_bytes[j];
33076404edcSAsim Jamshed 			}
33176404edcSAsim Jamshed #endif
33276404edcSAsim Jamshed 		}
33376404edcSAsim Jamshed 	}
33476404edcSAsim Jamshed #if NETSTAT_TOTAL
33576404edcSAsim Jamshed 	for (i = 0; i < g_config.mos->netdev_table->num; i++) {
33676404edcSAsim Jamshed 		if (g_config.mos->netdev_table->ent[i]->stat_print) {
33776404edcSAsim Jamshed 			fprintf(stderr, "[ ALL ] %s, "
33876404edcSAsim Jamshed 			"RX: %7llu(pps) (err: %5llu), %5.2lf(Gbps), "
33976404edcSAsim Jamshed 			"TX: %7llu(pps), %5.2lf(Gbps)\n",
34076404edcSAsim Jamshed 				g_config.mos->netdev_table->ent[i]->dev_name,
34176404edcSAsim Jamshed 				(long long unsigned)g_nstat.rx_packets[i],
34276404edcSAsim Jamshed 				(long long unsigned)g_nstat.rx_errors[i],
34376404edcSAsim Jamshed 				GBPS(g_nstat.rx_bytes[i]),
34476404edcSAsim Jamshed 				(long long unsigned)g_nstat.tx_packets[i],
34576404edcSAsim Jamshed 				GBPS(g_nstat.tx_bytes[i]));
34676404edcSAsim Jamshed 			total_rx_gbps += GBPS(g_nstat.rx_bytes[i]);
34776404edcSAsim Jamshed 			total_tx_gbps += GBPS(g_nstat.tx_bytes[i]);
34876404edcSAsim Jamshed 			stat_print = true;
34976404edcSAsim Jamshed 		}
35076404edcSAsim Jamshed 	}
35176404edcSAsim Jamshed 	if (stat_print) {
35276404edcSAsim Jamshed 		fprintf(stderr, "[ ALL ] flows: %6u\n", gflow_cnt);
35376404edcSAsim Jamshed 		if (avg_total_rx_gbps == 0)
35476404edcSAsim Jamshed 			avg_total_rx_gbps = total_rx_gbps;
35576404edcSAsim Jamshed 		else
35676404edcSAsim Jamshed 			avg_total_rx_gbps = avg_total_rx_gbps * 0.6 + total_rx_gbps * 0.4;
35776404edcSAsim Jamshed 
35876404edcSAsim Jamshed 		if (avg_total_tx_gbps == 0)
35976404edcSAsim Jamshed 			avg_total_tx_gbps = total_tx_gbps;
36076404edcSAsim Jamshed 		else
36176404edcSAsim Jamshed 			avg_total_tx_gbps = avg_total_tx_gbps * 0.6 + total_tx_gbps * 0.4;
36276404edcSAsim Jamshed 
36376404edcSAsim Jamshed 		if (peak_total_rx_gbps < total_rx_gbps)
36476404edcSAsim Jamshed 			peak_total_rx_gbps = total_rx_gbps;
36576404edcSAsim Jamshed 		if (peak_total_tx_gbps < total_tx_gbps)
36676404edcSAsim Jamshed 			peak_total_tx_gbps = total_tx_gbps;
36776404edcSAsim Jamshed 
36876404edcSAsim Jamshed 		fprintf(stderr, "[ PEAK ] RX: %5.2lf(Gbps), TX: %5.2lf(Gbps)\n"
36976404edcSAsim Jamshed 						"[ RECENT AVG ] RX: %5.2lf(Gbps), TX: %5.2lf(Gbps)\n",
37076404edcSAsim Jamshed 				peak_total_rx_gbps, peak_total_tx_gbps,
37176404edcSAsim Jamshed 				avg_total_rx_gbps, avg_total_tx_gbps);
37276404edcSAsim Jamshed 	}
37376404edcSAsim Jamshed #endif
37476404edcSAsim Jamshed 
37576404edcSAsim Jamshed #if ROUND_STAT
37676404edcSAsim Jamshed 	memset(&g_runstat, 0, sizeof(struct run_stat));
37776404edcSAsim Jamshed 	for (i = 0; i < g_config.mos->num_cores; i++) {
37876404edcSAsim Jamshed 		if (running[i]) {
37976404edcSAsim Jamshed 			PrintThreadRoundStats(g_mtcp[i], &rs);
38076404edcSAsim Jamshed #if DBGMSG
38176404edcSAsim Jamshed 			g_runstat.rounds += rs.rounds;
38276404edcSAsim Jamshed 			g_runstat.rounds_rx += rs.rounds_rx;
38376404edcSAsim Jamshed 			g_runstat.rounds_rx_try += rs.rounds_rx_try;
38476404edcSAsim Jamshed 			g_runstat.rounds_tx += rs.rounds_tx;
38576404edcSAsim Jamshed 			g_runstat.rounds_tx_try += rs.rounds_tx_try;
38676404edcSAsim Jamshed 			g_runstat.rounds_select += rs.rounds_select;
38776404edcSAsim Jamshed 			g_runstat.rounds_select_rx += rs.rounds_select_rx;
38876404edcSAsim Jamshed 			g_runstat.rounds_select_tx += rs.rounds_select_tx;
38976404edcSAsim Jamshed #endif
39076404edcSAsim Jamshed 		}
39176404edcSAsim Jamshed 	}
39276404edcSAsim Jamshed 
39376404edcSAsim Jamshed 	TRACE_DBG("[ ALL ] Rounds: %4ldK, "
39476404edcSAsim Jamshed 		  "rx: %3ldK (try: %4ldK), tx: %3ldK (try: %4ldK), "
39576404edcSAsim Jamshed 		  "ps_select: %4ld (rx: %4ld, tx: %4ld)\n",
39676404edcSAsim Jamshed 		  g_runstat.rounds / 1000, g_runstat.rounds_rx / 1000,
39776404edcSAsim Jamshed 		  g_runstat.rounds_rx_try / 1000, g_runstat.rounds_tx / 1000,
39876404edcSAsim Jamshed 		  g_runstat.rounds_tx_try / 1000, g_runstat.rounds_select,
39976404edcSAsim Jamshed 		  g_runstat.rounds_select_rx, g_runstat.rounds_select_tx);
40076404edcSAsim Jamshed #endif /* ROUND_STAT */
40176404edcSAsim Jamshed 
40276404edcSAsim Jamshed #if TIME_STAT
40376404edcSAsim Jamshed 	for (i = 0; i < g_config.mos->num_cores; i++) {
40476404edcSAsim Jamshed 		if (running[i]) {
40576404edcSAsim Jamshed 			PrintThreadRoundTime(g_mtcp[i]);
40676404edcSAsim Jamshed 		}
40776404edcSAsim Jamshed 	}
40876404edcSAsim Jamshed #endif
40976404edcSAsim Jamshed 
41076404edcSAsim Jamshed #if EVENT_STAT
41176404edcSAsim Jamshed 	for (i = 0; i < g_config.mos->num_cores; i++) {
41276404edcSAsim Jamshed 		if (running[i] && g_mtcp[i]->ep) {
41376404edcSAsim Jamshed 			PrintEventStat(i, &g_mtcp[i]->ep->stat);
41476404edcSAsim Jamshed 		}
41576404edcSAsim Jamshed 	}
41676404edcSAsim Jamshed #endif
41776404edcSAsim Jamshed 
41876404edcSAsim Jamshed 	fflush(stderr);
41976404edcSAsim Jamshed }
42076404edcSAsim Jamshed #endif /* NETSTAT */
42176404edcSAsim Jamshed /*----------------------------------------------------------------------------*/
42276404edcSAsim Jamshed static inline void
FlushMonitorReadEvents(mtcp_manager_t mtcp)42376404edcSAsim Jamshed FlushMonitorReadEvents(mtcp_manager_t mtcp)
42476404edcSAsim Jamshed {
42576404edcSAsim Jamshed 	struct event_queue *mtcpq;
42676404edcSAsim Jamshed 	struct tcp_stream *cur_stream;
42776404edcSAsim Jamshed 	struct mon_listener *walk;
42876404edcSAsim Jamshed 
42976404edcSAsim Jamshed 	/* check if monitor sockets should be passed data */
43076404edcSAsim Jamshed 	TAILQ_FOREACH(walk, &mtcp->monitors, link) {
43176404edcSAsim Jamshed 		if (walk->socket->socktype != MOS_SOCK_MONITOR_STREAM ||
43276404edcSAsim Jamshed 			!(mtcpq = walk->eq))
43376404edcSAsim Jamshed 			continue;
43476404edcSAsim Jamshed 
43576404edcSAsim Jamshed 		while (mtcpq->num_events > 0) {
43676404edcSAsim Jamshed 			cur_stream =
43776404edcSAsim Jamshed 				(struct tcp_stream *)mtcpq->events[mtcpq->start++].ev.data.ptr;
43876404edcSAsim Jamshed 			/* only read events */
43976404edcSAsim Jamshed 			if (cur_stream != NULL &&
440c6a5549bSAsim Jamshed 			    (cur_stream->actions & MOS_ACT_READ_DATA)) {
44176404edcSAsim Jamshed 				if (cur_stream->rcvvar != NULL &&
44276404edcSAsim Jamshed 						cur_stream->rcvvar->rcvbuf != NULL) {
44376404edcSAsim Jamshed 					/* no need to pass pkt context */
44476404edcSAsim Jamshed 					struct socket_map *walk;
44505e3289cSYoungGyoun 					if (cur_stream->side == MOS_SIDE_CLI) {
44605e3289cSYoungGyoun 						SOCKQ_FOREACH_REVERSE(walk, &cur_stream->msocks) {
44705e3289cSYoungGyoun 							HandleCallback(mtcp, MOS_NULL, walk,
44805e3289cSYoungGyoun 								       cur_stream->side, NULL,
44905e3289cSYoungGyoun 								       MOS_ON_CONN_NEW_DATA);
45005e3289cSYoungGyoun 						} SOCKQ_FOREACH_END;
45105e3289cSYoungGyoun 					} else { /* cur_stream->side == MOS_SIDE_SVR */
45276404edcSAsim Jamshed 						SOCKQ_FOREACH_START(walk, &cur_stream->msocks) {
45376404edcSAsim Jamshed 							HandleCallback(mtcp, MOS_NULL, walk,
45476404edcSAsim Jamshed 								       cur_stream->side, NULL,
45576404edcSAsim Jamshed 								       MOS_ON_CONN_NEW_DATA);
45676404edcSAsim Jamshed 						} SOCKQ_FOREACH_END;
45776404edcSAsim Jamshed 					}
45805e3289cSYoungGyoun 				}
45976404edcSAsim Jamshed 				/* reset the actions now */
46076404edcSAsim Jamshed 				cur_stream->actions = 0;
46176404edcSAsim Jamshed 			}
46276404edcSAsim Jamshed 			if (mtcpq->start >= mtcpq->size)
46376404edcSAsim Jamshed 				mtcpq->start = 0;
46476404edcSAsim Jamshed 			mtcpq->num_events--;
46576404edcSAsim Jamshed 		}
46676404edcSAsim Jamshed 	}
46776404edcSAsim Jamshed }
46876404edcSAsim Jamshed /*----------------------------------------------------------------------------*/
46976404edcSAsim Jamshed static inline void
FlushBufferedReadEvents(mtcp_manager_t mtcp)47076404edcSAsim Jamshed FlushBufferedReadEvents(mtcp_manager_t mtcp)
47176404edcSAsim Jamshed {
47276404edcSAsim Jamshed 	int i;
47376404edcSAsim Jamshed 	int offset;
47476404edcSAsim Jamshed 	struct event_queue *mtcpq;
47576404edcSAsim Jamshed 	struct tcp_stream *cur_stream;
47676404edcSAsim Jamshed 
47776404edcSAsim Jamshed 	if (mtcp->ep == NULL) {
47876404edcSAsim Jamshed 		TRACE_EPOLL("No epoll socket has been registered yet!\n");
47976404edcSAsim Jamshed 		return;
48076404edcSAsim Jamshed 	} else {
48176404edcSAsim Jamshed 		/* case when mtcpq exists */
48276404edcSAsim Jamshed 		mtcpq = mtcp->ep->mtcp_queue;
48376404edcSAsim Jamshed 		offset = mtcpq->start;
48476404edcSAsim Jamshed 	}
48576404edcSAsim Jamshed 
48676404edcSAsim Jamshed 	/* we will use queued-up epoll read-in events
48776404edcSAsim Jamshed 	 * to trigger buffered read monitor events */
48876404edcSAsim Jamshed 	for (i = 0; i < mtcpq->num_events; i++) {
48976404edcSAsim Jamshed 		cur_stream = mtcp->smap[mtcpq->events[offset++].sockid].stream;
49076404edcSAsim Jamshed 		/* only read events */
49176404edcSAsim Jamshed 		/* Raise new data callback event */
49276404edcSAsim Jamshed 		if (cur_stream != NULL &&
49376404edcSAsim Jamshed 		    	(cur_stream->socket->events | MOS_EPOLLIN)) {
49476404edcSAsim Jamshed 			if (cur_stream->rcvvar != NULL &&
49576404edcSAsim Jamshed 					cur_stream->rcvvar->rcvbuf != NULL) {
49676404edcSAsim Jamshed 				/* no need to pass pkt context */
49776404edcSAsim Jamshed 				struct socket_map *walk;
49805e3289cSYoungGyoun 				if (cur_stream->side == MOS_SIDE_CLI) {
49905e3289cSYoungGyoun 					SOCKQ_FOREACH_REVERSE(walk, &cur_stream->msocks) {
50005e3289cSYoungGyoun 						HandleCallback(mtcp, MOS_NULL, walk, cur_stream->side,
50105e3289cSYoungGyoun 							       NULL, MOS_ON_CONN_NEW_DATA);
50205e3289cSYoungGyoun 					} SOCKQ_FOREACH_END;
50305e3289cSYoungGyoun 				} else { /* cur_stream->side == MOS_SIDE_SVR */
50476404edcSAsim Jamshed 					SOCKQ_FOREACH_START(walk, &cur_stream->msocks) {
50576404edcSAsim Jamshed 						HandleCallback(mtcp, MOS_NULL, walk, cur_stream->side,
50676404edcSAsim Jamshed 							       NULL, MOS_ON_CONN_NEW_DATA);
50776404edcSAsim Jamshed 					} SOCKQ_FOREACH_END;
50876404edcSAsim Jamshed 				}
50976404edcSAsim Jamshed 			}
51005e3289cSYoungGyoun 		}
51176404edcSAsim Jamshed 		if (offset >= mtcpq->size)
51276404edcSAsim Jamshed 			offset = 0;
51376404edcSAsim Jamshed 	}
51476404edcSAsim Jamshed }
51576404edcSAsim Jamshed /*----------------------------------------------------------------------------*/
51676404edcSAsim Jamshed static inline void
FlushEpollEvents(mtcp_manager_t mtcp,uint32_t cur_ts)51776404edcSAsim Jamshed FlushEpollEvents(mtcp_manager_t mtcp, uint32_t cur_ts)
51876404edcSAsim Jamshed {
51976404edcSAsim Jamshed 	struct mtcp_epoll *ep = mtcp->ep;
52076404edcSAsim Jamshed 	struct event_queue *usrq = ep->usr_queue;
52176404edcSAsim Jamshed 	struct event_queue *mtcpq = ep->mtcp_queue;
52276404edcSAsim Jamshed 
52376404edcSAsim Jamshed 	pthread_mutex_lock(&ep->epoll_lock);
52476404edcSAsim Jamshed 	if (ep->mtcp_queue->num_events > 0) {
52576404edcSAsim Jamshed 		/* while mtcp_queue have events */
52676404edcSAsim Jamshed 		/* and usr_queue is not full */
52776404edcSAsim Jamshed 		while (mtcpq->num_events > 0 && usrq->num_events < usrq->size) {
52876404edcSAsim Jamshed 			/* copy the event from mtcp_queue to usr_queue */
52976404edcSAsim Jamshed 			usrq->events[usrq->end++] = mtcpq->events[mtcpq->start++];
53076404edcSAsim Jamshed 
53176404edcSAsim Jamshed 			if (usrq->end >= usrq->size)
53276404edcSAsim Jamshed 				usrq->end = 0;
53376404edcSAsim Jamshed 			usrq->num_events++;
53476404edcSAsim Jamshed 
53576404edcSAsim Jamshed 			if (mtcpq->start >= mtcpq->size)
53676404edcSAsim Jamshed 				mtcpq->start = 0;
53776404edcSAsim Jamshed 			mtcpq->num_events--;
53876404edcSAsim Jamshed 		}
53976404edcSAsim Jamshed 	}
54076404edcSAsim Jamshed 
54176404edcSAsim Jamshed 	/* if there are pending events, wake up user */
54276404edcSAsim Jamshed 	if (ep->waiting && (ep->usr_queue->num_events > 0 ||
54376404edcSAsim Jamshed 				ep->usr_shadow_queue->num_events > 0)) {
54476404edcSAsim Jamshed 		STAT_COUNT(mtcp->runstat.rounds_epoll);
54576404edcSAsim Jamshed 		TRACE_EPOLL("Broadcasting events. num: %d, cur_ts: %u, prev_ts: %u\n",
54676404edcSAsim Jamshed 				ep->usr_queue->num_events, cur_ts, mtcp->ts_last_event);
54776404edcSAsim Jamshed 		mtcp->ts_last_event = cur_ts;
54876404edcSAsim Jamshed 		ep->stat.wakes++;
54976404edcSAsim Jamshed 		pthread_cond_signal(&ep->epoll_cond);
55076404edcSAsim Jamshed 	}
55176404edcSAsim Jamshed 	pthread_mutex_unlock(&ep->epoll_lock);
55276404edcSAsim Jamshed }
55376404edcSAsim Jamshed /*----------------------------------------------------------------------------*/
55476404edcSAsim Jamshed static inline void
HandleApplicationCalls(mtcp_manager_t mtcp,uint32_t cur_ts)55576404edcSAsim Jamshed HandleApplicationCalls(mtcp_manager_t mtcp, uint32_t cur_ts)
55676404edcSAsim Jamshed {
55776404edcSAsim Jamshed 	tcp_stream *stream;
55876404edcSAsim Jamshed 	int cnt, max_cnt;
55976404edcSAsim Jamshed 	int handled, delayed;
56076404edcSAsim Jamshed 	int control, send, ack;
56176404edcSAsim Jamshed 
56276404edcSAsim Jamshed 	/* connect handling */
56376404edcSAsim Jamshed 	while ((stream = StreamDequeue(mtcp->connectq))) {
56476404edcSAsim Jamshed 		if (stream->state != TCP_ST_SYN_SENT) {
56576404edcSAsim Jamshed 			TRACE_INFO("Got a connection request from app with state: %s",
56676404edcSAsim Jamshed 				   TCPStateToString(stream));
56776404edcSAsim Jamshed 			exit(EXIT_FAILURE);
56876404edcSAsim Jamshed 		} else {
56976404edcSAsim Jamshed 			stream->cb_events |= MOS_ON_CONN_START |
57076404edcSAsim Jamshed 				MOS_ON_TCP_STATE_CHANGE;
57176404edcSAsim Jamshed 			/* if monitor is on... */
57276404edcSAsim Jamshed 			if (stream->pair_stream != NULL)
57376404edcSAsim Jamshed 				stream->pair_stream->cb_events |=
57476404edcSAsim Jamshed 					MOS_ON_CONN_START;
57576404edcSAsim Jamshed 		}
57676404edcSAsim Jamshed 		AddtoControlList(mtcp, stream, cur_ts);
57776404edcSAsim Jamshed 	}
57876404edcSAsim Jamshed 
57976404edcSAsim Jamshed 	/* send queue handling */
58076404edcSAsim Jamshed 	while ((stream = StreamDequeue(mtcp->sendq))) {
58176404edcSAsim Jamshed 		stream->sndvar->on_sendq = FALSE;
58276404edcSAsim Jamshed 		AddtoSendList(mtcp, stream);
58376404edcSAsim Jamshed 	}
58476404edcSAsim Jamshed 
58576404edcSAsim Jamshed 	/* ack queue handling */
58676404edcSAsim Jamshed 	while ((stream = StreamDequeue(mtcp->ackq))) {
58776404edcSAsim Jamshed 		stream->sndvar->on_ackq = FALSE;
58876404edcSAsim Jamshed 		EnqueueACK(mtcp, stream, cur_ts, ACK_OPT_AGGREGATE);
58976404edcSAsim Jamshed 	}
59076404edcSAsim Jamshed 
59176404edcSAsim Jamshed 	/* close handling */
59276404edcSAsim Jamshed 	handled = delayed = 0;
59376404edcSAsim Jamshed 	control = send = ack = 0;
59476404edcSAsim Jamshed 	while ((stream = StreamDequeue(mtcp->closeq))) {
59576404edcSAsim Jamshed 		struct tcp_send_vars *sndvar = stream->sndvar;
59676404edcSAsim Jamshed 		sndvar->on_closeq = FALSE;
59776404edcSAsim Jamshed 
59876404edcSAsim Jamshed 		if (sndvar->sndbuf) {
59976404edcSAsim Jamshed 			sndvar->fss = sndvar->sndbuf->head_seq + sndvar->sndbuf->len;
60076404edcSAsim Jamshed 		} else {
60176404edcSAsim Jamshed 			sndvar->fss = stream->snd_nxt;
60276404edcSAsim Jamshed 		}
60376404edcSAsim Jamshed 
60476404edcSAsim Jamshed 		if (g_config.mos->tcp_timeout > 0)
60576404edcSAsim Jamshed 			RemoveFromTimeoutList(mtcp, stream);
60676404edcSAsim Jamshed 
60776404edcSAsim Jamshed 		if (stream->have_reset) {
60876404edcSAsim Jamshed 			handled++;
60976404edcSAsim Jamshed 			if (stream->state != TCP_ST_CLOSED_RSVD) {
61076404edcSAsim Jamshed 				stream->close_reason = TCP_RESET;
61176404edcSAsim Jamshed 				stream->state = TCP_ST_CLOSED_RSVD;
61276404edcSAsim Jamshed 				stream->cb_events |= MOS_ON_TCP_STATE_CHANGE;
61376404edcSAsim Jamshed 				TRACE_STATE("Stream %d: TCP_ST_CLOSED_RSVD\n", stream->id);
61476404edcSAsim Jamshed 				DestroyTCPStream(mtcp, stream);
61576404edcSAsim Jamshed 			} else {
61676404edcSAsim Jamshed 				TRACE_ERROR("Stream already closed.\n");
61776404edcSAsim Jamshed 			}
61876404edcSAsim Jamshed 
61976404edcSAsim Jamshed 		} else if (sndvar->on_control_list) {
62076404edcSAsim Jamshed 			sndvar->on_closeq_int = TRUE;
62176404edcSAsim Jamshed 			StreamInternalEnqueue(mtcp->closeq_int, stream);
62276404edcSAsim Jamshed 			delayed++;
62376404edcSAsim Jamshed 			if (sndvar->on_control_list)
62476404edcSAsim Jamshed 				control++;
62576404edcSAsim Jamshed 			if (sndvar->on_send_list)
62676404edcSAsim Jamshed 				send++;
62776404edcSAsim Jamshed 			if (sndvar->on_ack_list)
62876404edcSAsim Jamshed 				ack++;
62976404edcSAsim Jamshed 
63076404edcSAsim Jamshed 		} else if (sndvar->on_send_list || sndvar->on_ack_list) {
63176404edcSAsim Jamshed 			handled++;
63276404edcSAsim Jamshed 			if (stream->state == TCP_ST_ESTABLISHED) {
63376404edcSAsim Jamshed 				stream->state = TCP_ST_FIN_WAIT_1;
63476404edcSAsim Jamshed 				stream->cb_events |= MOS_ON_TCP_STATE_CHANGE;
63576404edcSAsim Jamshed 				TRACE_STATE("Stream %d: TCP_ST_FIN_WAIT_1\n", stream->id);
63676404edcSAsim Jamshed 
63776404edcSAsim Jamshed 			} else if (stream->state == TCP_ST_CLOSE_WAIT) {
63876404edcSAsim Jamshed 				stream->state = TCP_ST_LAST_ACK;
63976404edcSAsim Jamshed 				stream->cb_events |= MOS_ON_TCP_STATE_CHANGE;
64076404edcSAsim Jamshed 				TRACE_STATE("Stream %d: TCP_ST_LAST_ACK\n", stream->id);
64176404edcSAsim Jamshed 			}
64276404edcSAsim Jamshed 			stream->control_list_waiting = TRUE;
64376404edcSAsim Jamshed 
64476404edcSAsim Jamshed 		} else if (stream->state != TCP_ST_CLOSED_RSVD) {
64576404edcSAsim Jamshed 			handled++;
64676404edcSAsim Jamshed 			if (stream->state == TCP_ST_ESTABLISHED) {
64776404edcSAsim Jamshed 				stream->state = TCP_ST_FIN_WAIT_1;
64876404edcSAsim Jamshed 				stream->cb_events |= MOS_ON_TCP_STATE_CHANGE;
64976404edcSAsim Jamshed 				TRACE_STATE("Stream %d: TCP_ST_FIN_WAIT_1\n", stream->id);
65076404edcSAsim Jamshed 
65176404edcSAsim Jamshed 			} else if (stream->state == TCP_ST_CLOSE_WAIT) {
65276404edcSAsim Jamshed 				stream->state = TCP_ST_LAST_ACK;
65376404edcSAsim Jamshed 				stream->cb_events |= MOS_ON_TCP_STATE_CHANGE;
65476404edcSAsim Jamshed 				TRACE_STATE("Stream %d: TCP_ST_LAST_ACK\n", stream->id);
65576404edcSAsim Jamshed 			}
65676404edcSAsim Jamshed 			//sndvar->rto = TCP_FIN_RTO;
65776404edcSAsim Jamshed 			//UpdateRetransmissionTimer(mtcp, stream, mtcp->cur_ts);
65876404edcSAsim Jamshed 			AddtoControlList(mtcp, stream, cur_ts);
65976404edcSAsim Jamshed 		} else {
66076404edcSAsim Jamshed 			TRACE_ERROR("Already closed connection!\n");
66176404edcSAsim Jamshed 		}
66276404edcSAsim Jamshed 	}
66376404edcSAsim Jamshed 	TRACE_ROUND("Handling close connections. cnt: %d\n", cnt);
66476404edcSAsim Jamshed 
66576404edcSAsim Jamshed 	cnt = 0;
66676404edcSAsim Jamshed 	max_cnt = mtcp->closeq_int->count;
66776404edcSAsim Jamshed 	while (cnt++ < max_cnt) {
66876404edcSAsim Jamshed 		stream = StreamInternalDequeue(mtcp->closeq_int);
66976404edcSAsim Jamshed 
67076404edcSAsim Jamshed 		if (stream->sndvar->on_control_list) {
67176404edcSAsim Jamshed 			StreamInternalEnqueue(mtcp->closeq_int, stream);
67276404edcSAsim Jamshed 
67376404edcSAsim Jamshed 		} else if (stream->state != TCP_ST_CLOSED_RSVD) {
67476404edcSAsim Jamshed 			handled++;
67576404edcSAsim Jamshed 			stream->sndvar->on_closeq_int = FALSE;
67676404edcSAsim Jamshed 			if (stream->state == TCP_ST_ESTABLISHED) {
67776404edcSAsim Jamshed 				stream->state = TCP_ST_FIN_WAIT_1;
67876404edcSAsim Jamshed 				stream->cb_events |= MOS_ON_TCP_STATE_CHANGE;
67976404edcSAsim Jamshed 				TRACE_STATE("Stream %d: TCP_ST_FIN_WAIT_1\n", stream->id);
68076404edcSAsim Jamshed 
68176404edcSAsim Jamshed 			} else if (stream->state == TCP_ST_CLOSE_WAIT) {
68276404edcSAsim Jamshed 				stream->state = TCP_ST_LAST_ACK;
68376404edcSAsim Jamshed 				stream->cb_events |= MOS_ON_TCP_STATE_CHANGE;
68476404edcSAsim Jamshed 				TRACE_STATE("Stream %d: TCP_ST_LAST_ACK\n", stream->id);
68576404edcSAsim Jamshed 			}
68676404edcSAsim Jamshed 			AddtoControlList(mtcp, stream, cur_ts);
68776404edcSAsim Jamshed 		} else {
68876404edcSAsim Jamshed 			stream->sndvar->on_closeq_int = FALSE;
68976404edcSAsim Jamshed 			TRACE_ERROR("Already closed connection!\n");
69076404edcSAsim Jamshed 		}
69176404edcSAsim Jamshed 	}
69276404edcSAsim Jamshed 
69376404edcSAsim Jamshed 	/* reset handling */
69476404edcSAsim Jamshed 	while ((stream = StreamDequeue(mtcp->resetq))) {
69576404edcSAsim Jamshed 		stream->sndvar->on_resetq = FALSE;
69676404edcSAsim Jamshed 
69776404edcSAsim Jamshed 		if (g_config.mos->tcp_timeout > 0)
69876404edcSAsim Jamshed 			RemoveFromTimeoutList(mtcp, stream);
69976404edcSAsim Jamshed 
70076404edcSAsim Jamshed 		if (stream->have_reset) {
70176404edcSAsim Jamshed 			if (stream->state != TCP_ST_CLOSED_RSVD) {
70276404edcSAsim Jamshed 				stream->close_reason = TCP_RESET;
70376404edcSAsim Jamshed 				stream->state = TCP_ST_CLOSED_RSVD;
70476404edcSAsim Jamshed 				stream->cb_events |= MOS_ON_TCP_STATE_CHANGE;
70576404edcSAsim Jamshed 				TRACE_STATE("Stream %d: TCP_ST_CLOSED_RSVD\n", stream->id);
70676404edcSAsim Jamshed 				DestroyTCPStream(mtcp, stream);
70776404edcSAsim Jamshed 			} else {
70876404edcSAsim Jamshed 				TRACE_ERROR("Stream already closed.\n");
70976404edcSAsim Jamshed 			}
71076404edcSAsim Jamshed 
71176404edcSAsim Jamshed 		} else if (stream->sndvar->on_control_list ||
71276404edcSAsim Jamshed 				stream->sndvar->on_send_list || stream->sndvar->on_ack_list) {
71376404edcSAsim Jamshed 			/* wait until all the queues are flushed */
71476404edcSAsim Jamshed 			stream->sndvar->on_resetq_int = TRUE;
71576404edcSAsim Jamshed 			StreamInternalEnqueue(mtcp->resetq_int, stream);
71676404edcSAsim Jamshed 
71776404edcSAsim Jamshed 		} else {
71876404edcSAsim Jamshed 			if (stream->state != TCP_ST_CLOSED_RSVD) {
71976404edcSAsim Jamshed 				stream->close_reason = TCP_ACTIVE_CLOSE;
72076404edcSAsim Jamshed 				stream->state = TCP_ST_CLOSED_RSVD;
72176404edcSAsim Jamshed 				stream->cb_events |= MOS_ON_TCP_STATE_CHANGE;
72276404edcSAsim Jamshed 				TRACE_STATE("Stream %d: TCP_ST_CLOSED_RSVD\n", stream->id);
72376404edcSAsim Jamshed 				AddtoControlList(mtcp, stream, cur_ts);
72476404edcSAsim Jamshed 			} else {
72576404edcSAsim Jamshed 				TRACE_ERROR("Stream already closed.\n");
72676404edcSAsim Jamshed 			}
72776404edcSAsim Jamshed 		}
72876404edcSAsim Jamshed 	}
72976404edcSAsim Jamshed 	TRACE_ROUND("Handling reset connections. cnt: %d\n", cnt);
73076404edcSAsim Jamshed 
73176404edcSAsim Jamshed 	cnt = 0;
73276404edcSAsim Jamshed 	max_cnt = mtcp->resetq_int->count;
73376404edcSAsim Jamshed 	while (cnt++ < max_cnt) {
73476404edcSAsim Jamshed 		stream = StreamInternalDequeue(mtcp->resetq_int);
73576404edcSAsim Jamshed 
73676404edcSAsim Jamshed 		if (stream->sndvar->on_control_list ||
73776404edcSAsim Jamshed 				stream->sndvar->on_send_list || stream->sndvar->on_ack_list) {
73876404edcSAsim Jamshed 			/* wait until all the queues are flushed */
73976404edcSAsim Jamshed 			StreamInternalEnqueue(mtcp->resetq_int, stream);
74076404edcSAsim Jamshed 
74176404edcSAsim Jamshed 		} else {
74276404edcSAsim Jamshed 			stream->sndvar->on_resetq_int = FALSE;
74376404edcSAsim Jamshed 
74476404edcSAsim Jamshed 			if (stream->state != TCP_ST_CLOSED_RSVD) {
74576404edcSAsim Jamshed 				stream->close_reason = TCP_ACTIVE_CLOSE;
74676404edcSAsim Jamshed 				stream->state = TCP_ST_CLOSED_RSVD;
74776404edcSAsim Jamshed 				stream->cb_events |= MOS_ON_TCP_STATE_CHANGE;
74876404edcSAsim Jamshed 				TRACE_STATE("Stream %d: TCP_ST_CLOSED_RSVD\n", stream->id);
74976404edcSAsim Jamshed 				AddtoControlList(mtcp, stream, cur_ts);
75076404edcSAsim Jamshed 			} else {
75176404edcSAsim Jamshed 				TRACE_ERROR("Stream already closed.\n");
75276404edcSAsim Jamshed 			}
75376404edcSAsim Jamshed 		}
75476404edcSAsim Jamshed 	}
75576404edcSAsim Jamshed 
75676404edcSAsim Jamshed 	/* destroy streams in destroyq */
75776404edcSAsim Jamshed 	while ((stream = StreamDequeue(mtcp->destroyq))) {
75876404edcSAsim Jamshed 		DestroyTCPStream(mtcp, stream);
75976404edcSAsim Jamshed 	}
76076404edcSAsim Jamshed 
76176404edcSAsim Jamshed 	mtcp->wakeup_flag = FALSE;
76276404edcSAsim Jamshed }
76376404edcSAsim Jamshed /*----------------------------------------------------------------------------*/
76476404edcSAsim Jamshed static inline void
WritePacketsToChunks(mtcp_manager_t mtcp,uint32_t cur_ts)76576404edcSAsim Jamshed WritePacketsToChunks(mtcp_manager_t mtcp, uint32_t cur_ts)
76676404edcSAsim Jamshed {
76776404edcSAsim Jamshed 	int thresh = g_config.mos->max_concurrency;
76876404edcSAsim Jamshed 	int i;
76976404edcSAsim Jamshed 
77076404edcSAsim Jamshed 	/* Set the threshold to g_config.mos->max_concurrency to send ACK immediately */
77176404edcSAsim Jamshed 	/* Otherwise, set to appropriate value (e.g. thresh) */
77276404edcSAsim Jamshed 	assert(mtcp->g_sender != NULL);
77376404edcSAsim Jamshed 	if (mtcp->g_sender->control_list_cnt)
77476404edcSAsim Jamshed 		WriteTCPControlList(mtcp, mtcp->g_sender, cur_ts, thresh);
77576404edcSAsim Jamshed 	if (mtcp->g_sender->ack_list_cnt)
77676404edcSAsim Jamshed 		WriteTCPACKList(mtcp, mtcp->g_sender, cur_ts, thresh);
77776404edcSAsim Jamshed 	if (mtcp->g_sender->send_list_cnt)
77876404edcSAsim Jamshed 		WriteTCPDataList(mtcp, mtcp->g_sender, cur_ts, thresh);
77976404edcSAsim Jamshed 
78076404edcSAsim Jamshed 	for (i = 0; i < g_config.mos->netdev_table->num; i++) {
78176404edcSAsim Jamshed 		assert(mtcp->n_sender[i] != NULL);
78276404edcSAsim Jamshed 		if (mtcp->n_sender[i]->control_list_cnt)
78376404edcSAsim Jamshed 			WriteTCPControlList(mtcp, mtcp->n_sender[i], cur_ts, thresh);
78476404edcSAsim Jamshed 		if (mtcp->n_sender[i]->ack_list_cnt)
78576404edcSAsim Jamshed 			WriteTCPACKList(mtcp, mtcp->n_sender[i], cur_ts, thresh);
78676404edcSAsim Jamshed 		if (mtcp->n_sender[i]->send_list_cnt)
78776404edcSAsim Jamshed 			WriteTCPDataList(mtcp, mtcp->n_sender[i], cur_ts, thresh);
78876404edcSAsim Jamshed 	}
78976404edcSAsim Jamshed }
79076404edcSAsim Jamshed /*----------------------------------------------------------------------------*/
#if TESTING
/*
 * Walks every bin of the TCP flow table and destroys each stream found,
 * logging one line per stream.  Only compiled when TESTING is enabled.
 *
 * Returns the number of streams destroyed.
 */
static int
DestroyRemainingFlows(mtcp_manager_t mtcp)
{
	struct hashtable *ht = mtcp->tcp_flow_table;
	tcp_stream *walk, *next;
	int cnt = 0;
	int i;

	thread_printf(mtcp, mtcp->log_fp,
			"CPU %d: Flushing remaining flows.\n", mtcp->ctx->cpu);

	for (i = 0; i < NUM_BINS; i++) {
		for (walk = TAILQ_FIRST(&ht->ht_table[i]); walk != NULL; walk = next) {
			/*
			 * Fetch the successor before the stream is destroyed:
			 * TAILQ_FOREACH would read walk->rcvvar->he_link after
			 * DestroyTCPStream() has run.
			 * NOTE(review): presumably DestroyTCPStream() unlinks the
			 * stream from the table and releases rcvvar -- confirm.
			 */
			next = TAILQ_NEXT(walk, rcvvar->he_link);

			thread_printf(mtcp, mtcp->log_fp,
					"CPU %d: Destroying stream %d\n", mtcp->ctx->cpu, walk->id);
#ifdef DUMP_STREAM
			DumpStream(mtcp, walk);
#endif
			DestroyTCPStream(mtcp, walk);
			cnt++;
		}
	}

	return cnt;
}
#endif
81976404edcSAsim Jamshed /*----------------------------------------------------------------------------*/
82076404edcSAsim Jamshed static void
InterruptApplication(mtcp_manager_t mtcp)82176404edcSAsim Jamshed InterruptApplication(mtcp_manager_t mtcp)
82276404edcSAsim Jamshed {
82376404edcSAsim Jamshed 	/* interrupt if the mtcp_epoll_wait() is waiting */
82476404edcSAsim Jamshed 	if (mtcp->ep) {
82576404edcSAsim Jamshed 		pthread_mutex_lock(&mtcp->ep->epoll_lock);
82676404edcSAsim Jamshed 		if (mtcp->ep->waiting) {
82776404edcSAsim Jamshed 			pthread_cond_signal(&mtcp->ep->epoll_cond);
82876404edcSAsim Jamshed 		}
82976404edcSAsim Jamshed 		pthread_mutex_unlock(&mtcp->ep->epoll_lock);
83076404edcSAsim Jamshed 	}
83176404edcSAsim Jamshed 	/* interrupt if the accept() is waiting */
83276404edcSAsim Jamshed 	if (mtcp->listener) {
83376404edcSAsim Jamshed 		if (mtcp->listener->socket) {
83476404edcSAsim Jamshed 			pthread_mutex_lock(&mtcp->listener->accept_lock);
83576404edcSAsim Jamshed 			if (!(mtcp->listener->socket->opts & MTCP_NONBLOCK)) {
83676404edcSAsim Jamshed 				pthread_cond_signal(&mtcp->listener->accept_cond);
83776404edcSAsim Jamshed 			}
83876404edcSAsim Jamshed 			pthread_mutex_unlock(&mtcp->listener->accept_lock);
83976404edcSAsim Jamshed 		}
84076404edcSAsim Jamshed 	}
84176404edcSAsim Jamshed }
84276404edcSAsim Jamshed /*----------------------------------------------------------------------------*/
84376404edcSAsim Jamshed void
RunPassiveLoop(mtcp_manager_t mtcp)84476404edcSAsim Jamshed RunPassiveLoop(mtcp_manager_t mtcp)
84576404edcSAsim Jamshed {
84676404edcSAsim Jamshed 	sem_wait(&g_done_sem[mtcp->ctx->cpu]);
84776404edcSAsim Jamshed 	sem_destroy(&g_done_sem[mtcp->ctx->cpu]);
84876404edcSAsim Jamshed 	return;
84976404edcSAsim Jamshed }
85076404edcSAsim Jamshed /*----------------------------------------------------------------------------*/
85176404edcSAsim Jamshed static void
RunMainLoop(struct mtcp_thread_context * ctx)85276404edcSAsim Jamshed RunMainLoop(struct mtcp_thread_context *ctx)
85376404edcSAsim Jamshed {
85476404edcSAsim Jamshed 	mtcp_manager_t mtcp = ctx->mtcp_manager;
85576404edcSAsim Jamshed 	int i;
85676404edcSAsim Jamshed 	int recv_cnt;
85776404edcSAsim Jamshed 	int rx_inf, tx_inf;
85876404edcSAsim Jamshed 	struct timeval cur_ts = {0};
85976404edcSAsim Jamshed 	uint32_t ts, ts_prev;
86076404edcSAsim Jamshed 
86176404edcSAsim Jamshed #if TIME_STAT
86276404edcSAsim Jamshed 	struct timeval prev_ts, processing_ts, tcheck_ts,
86376404edcSAsim Jamshed 				   epoll_ts, handle_ts, xmit_ts, select_ts;
86476404edcSAsim Jamshed #endif
86576404edcSAsim Jamshed 	int thresh;
86676404edcSAsim Jamshed 
86776404edcSAsim Jamshed 	gettimeofday(&cur_ts, NULL);
86876404edcSAsim Jamshed 
86976404edcSAsim Jamshed 	TRACE_DBG("CPU %d: mtcp thread running.\n", ctx->cpu);
87076404edcSAsim Jamshed 
87176404edcSAsim Jamshed #if TIME_STAT
87276404edcSAsim Jamshed 	prev_ts = cur_ts;
87376404edcSAsim Jamshed 	InitStatCounter(&mtcp->rtstat.round);
87476404edcSAsim Jamshed 	InitStatCounter(&mtcp->rtstat.processing);
87576404edcSAsim Jamshed 	InitStatCounter(&mtcp->rtstat.tcheck);
87676404edcSAsim Jamshed 	InitStatCounter(&mtcp->rtstat.epoll);
87776404edcSAsim Jamshed 	InitStatCounter(&mtcp->rtstat.handle);
87876404edcSAsim Jamshed 	InitStatCounter(&mtcp->rtstat.xmit);
87976404edcSAsim Jamshed 	InitStatCounter(&mtcp->rtstat.select);
88076404edcSAsim Jamshed #endif
88176404edcSAsim Jamshed 
88276404edcSAsim Jamshed 	ts = ts_prev = 0;
88376404edcSAsim Jamshed 	while ((!ctx->done || mtcp->flow_cnt) && !ctx->exit) {
88476404edcSAsim Jamshed 
88576404edcSAsim Jamshed 		STAT_COUNT(mtcp->runstat.rounds);
88676404edcSAsim Jamshed 		recv_cnt = 0;
88776404edcSAsim Jamshed 		gettimeofday(&cur_ts, NULL);
88876404edcSAsim Jamshed #if TIME_STAT
88976404edcSAsim Jamshed 		/* measure the inter-round delay */
89076404edcSAsim Jamshed 		UpdateStatCounter(&mtcp->rtstat.round, TimeDiffUs(&cur_ts, &prev_ts));
89176404edcSAsim Jamshed 		prev_ts = cur_ts;
89276404edcSAsim Jamshed #endif
89376404edcSAsim Jamshed 
89476404edcSAsim Jamshed 		ts = TIMEVAL_TO_TS(&cur_ts);
89576404edcSAsim Jamshed 		mtcp->cur_ts = ts;
89676404edcSAsim Jamshed 
89776404edcSAsim Jamshed 		for (rx_inf = 0; rx_inf < g_config.mos->netdev_table->num; rx_inf++) {
89876404edcSAsim Jamshed 
89976404edcSAsim Jamshed 			recv_cnt = mtcp->iom->recv_pkts(ctx, rx_inf);
90076404edcSAsim Jamshed 			STAT_COUNT(mtcp->runstat.rounds_rx_try);
90176404edcSAsim Jamshed 
90276404edcSAsim Jamshed 			for (i = 0; i < recv_cnt; i++) {
90376404edcSAsim Jamshed 				uint16_t len;
90476404edcSAsim Jamshed 				uint8_t *pktbuf;
90576404edcSAsim Jamshed 				pktbuf = mtcp->iom->get_rptr(mtcp->ctx, rx_inf, i, &len);
90676404edcSAsim Jamshed 				ProcessPacket(mtcp, rx_inf, i, ts, pktbuf, len);
90776404edcSAsim Jamshed 			}
90805e3289cSYoungGyoun 
90976404edcSAsim Jamshed 		}
91076404edcSAsim Jamshed 		STAT_COUNT(mtcp->runstat.rounds_rx);
91176404edcSAsim Jamshed 
91276404edcSAsim Jamshed #if TIME_STAT
91376404edcSAsim Jamshed 		gettimeofday(&processing_ts, NULL);
91476404edcSAsim Jamshed 		UpdateStatCounter(&mtcp->rtstat.processing,
91576404edcSAsim Jamshed 				TimeDiffUs(&processing_ts, &cur_ts));
91676404edcSAsim Jamshed #endif /* TIME_STAT */
91776404edcSAsim Jamshed 
91876404edcSAsim Jamshed 		/* Handle user defined timeout */
91976404edcSAsim Jamshed 		struct timer *walk, *tmp;
92076404edcSAsim Jamshed 		for (walk = TAILQ_FIRST(&mtcp->timer_list); walk != NULL; walk = tmp) {
92176404edcSAsim Jamshed 			tmp = TAILQ_NEXT(walk, timer_link);
92276404edcSAsim Jamshed 			if (TIMEVAL_LT(&cur_ts, &walk->exp))
92376404edcSAsim Jamshed 				break;
92476404edcSAsim Jamshed 
92576404edcSAsim Jamshed 			struct mtcp_context mctx = {.cpu = ctx->cpu};
92676404edcSAsim Jamshed 			walk->cb(&mctx, walk->id, 0, 0 /* FIXME */, NULL);
92776404edcSAsim Jamshed 			DelTimer(mtcp, walk);
92876404edcSAsim Jamshed 		}
92976404edcSAsim Jamshed 
93076404edcSAsim Jamshed 		/* interaction with application */
93176404edcSAsim Jamshed 		if (mtcp->flow_cnt > 0) {
93276404edcSAsim Jamshed 
93376404edcSAsim Jamshed 			/* check retransmission timeout and timewait expire */
93476404edcSAsim Jamshed #if 0
93576404edcSAsim Jamshed 			thresh = (int)mtcp->flow_cnt / (TS_TO_USEC(PER_STREAM_TCHECK));
93676404edcSAsim Jamshed 			assert(thresh >= 0);
93776404edcSAsim Jamshed 			if (thresh == 0)
93876404edcSAsim Jamshed 				thresh = 1;
93976404edcSAsim Jamshed 			if (recv_cnt > 0 && thresh > recv_cnt)
94076404edcSAsim Jamshed 				thresh = recv_cnt;
94176404edcSAsim Jamshed #else
94276404edcSAsim Jamshed 			thresh = g_config.mos->max_concurrency;
94376404edcSAsim Jamshed #endif
94476404edcSAsim Jamshed 
94576404edcSAsim Jamshed 			/* Eunyoung, you may fix this later
94676404edcSAsim Jamshed 			 * if there is no rcv packet, we will send as much as possible
94776404edcSAsim Jamshed 			 */
94876404edcSAsim Jamshed 			if (thresh == -1)
94976404edcSAsim Jamshed 				thresh = g_config.mos->max_concurrency;
95076404edcSAsim Jamshed 
95176404edcSAsim Jamshed 			CheckRtmTimeout(mtcp, ts, thresh);
95276404edcSAsim Jamshed 			CheckTimewaitExpire(mtcp, ts, thresh);
95376404edcSAsim Jamshed 
95476404edcSAsim Jamshed 			if (g_config.mos->tcp_timeout > 0 && ts != ts_prev) {
95576404edcSAsim Jamshed 				CheckConnectionTimeout(mtcp, ts, thresh);
95676404edcSAsim Jamshed 			}
95776404edcSAsim Jamshed 
95876404edcSAsim Jamshed #if TIME_STAT
95976404edcSAsim Jamshed 		}
96076404edcSAsim Jamshed 		gettimeofday(&tcheck_ts, NULL);
96176404edcSAsim Jamshed 		UpdateStatCounter(&mtcp->rtstat.tcheck,
96276404edcSAsim Jamshed 				TimeDiffUs(&tcheck_ts, &processing_ts));
96376404edcSAsim Jamshed 
96476404edcSAsim Jamshed 		if (mtcp->flow_cnt > 0) {
96576404edcSAsim Jamshed #endif /* TIME_STAT */
96676404edcSAsim Jamshed 
96776404edcSAsim Jamshed 		}
96876404edcSAsim Jamshed 
96976404edcSAsim Jamshed 		/*
97076404edcSAsim Jamshed 		 * before flushing epoll events, call monitor events for
97176404edcSAsim Jamshed 		 * all registered `read` events
97276404edcSAsim Jamshed 		 */
97376404edcSAsim Jamshed 		if (mtcp->num_msp > 0)
97476404edcSAsim Jamshed 			/* call this when only a standalone monitor is running */
97576404edcSAsim Jamshed 			FlushMonitorReadEvents(mtcp);
97676404edcSAsim Jamshed 
97776404edcSAsim Jamshed 		/* if epoll is in use, flush all the queued events */
97876404edcSAsim Jamshed 		if (mtcp->ep) {
97976404edcSAsim Jamshed 			FlushBufferedReadEvents(mtcp);
98076404edcSAsim Jamshed 			FlushEpollEvents(mtcp, ts);
98176404edcSAsim Jamshed 		}
98276404edcSAsim Jamshed #if TIME_STAT
98376404edcSAsim Jamshed 		gettimeofday(&epoll_ts, NULL);
98476404edcSAsim Jamshed 		UpdateStatCounter(&mtcp->rtstat.epoll,
98576404edcSAsim Jamshed 				TimeDiffUs(&epoll_ts, &tcheck_ts));
98676404edcSAsim Jamshed #endif /* TIME_STAT */
98776404edcSAsim Jamshed 
98876404edcSAsim Jamshed 		if (end_app_exists && mtcp->flow_cnt > 0) {
98976404edcSAsim Jamshed 			/* handle stream queues  */
99076404edcSAsim Jamshed 			HandleApplicationCalls(mtcp, ts);
99176404edcSAsim Jamshed 		}
99276404edcSAsim Jamshed 
99376404edcSAsim Jamshed #if TIME_STAT
99476404edcSAsim Jamshed 		gettimeofday(&handle_ts, NULL);
99576404edcSAsim Jamshed 		UpdateStatCounter(&mtcp->rtstat.handle,
99676404edcSAsim Jamshed 				TimeDiffUs(&handle_ts, &epoll_ts));
99776404edcSAsim Jamshed #endif /* TIME_STAT */
99876404edcSAsim Jamshed 
99976404edcSAsim Jamshed 		WritePacketsToChunks(mtcp, ts);
100076404edcSAsim Jamshed 
100176404edcSAsim Jamshed 		/* send packets from write buffer */
10027631e025SAsim Jamshed 		/* Send until tx is available */
100376404edcSAsim Jamshed 		int num_dev = g_config.mos->netdev_table->num;
100476404edcSAsim Jamshed 		if (likely(mtcp->iom->send_pkts != NULL))
100576404edcSAsim Jamshed 			for (tx_inf = 0; tx_inf < num_dev; tx_inf++) {
100676404edcSAsim Jamshed 				mtcp->iom->send_pkts(ctx, tx_inf);
100776404edcSAsim Jamshed 			}
100876404edcSAsim Jamshed 
100976404edcSAsim Jamshed #if TIME_STAT
101076404edcSAsim Jamshed 		gettimeofday(&xmit_ts, NULL);
101176404edcSAsim Jamshed 		UpdateStatCounter(&mtcp->rtstat.xmit,
101276404edcSAsim Jamshed 				TimeDiffUs(&xmit_ts, &handle_ts));
101376404edcSAsim Jamshed #endif /* TIME_STAT */
101476404edcSAsim Jamshed 
101576404edcSAsim Jamshed 		if (ts != ts_prev) {
101676404edcSAsim Jamshed 			ts_prev = ts;
101776404edcSAsim Jamshed #ifdef NETSTAT
101876404edcSAsim Jamshed 			if (ctx->cpu == printer) {
101976404edcSAsim Jamshed #ifdef RUN_ARP
102076404edcSAsim Jamshed 				ARPTimer(mtcp, ts);
102176404edcSAsim Jamshed #endif
1022265cb675SAsim Jamshed #ifdef NETSTAT
102376404edcSAsim Jamshed 				PrintNetworkStats(mtcp, ts);
1024265cb675SAsim Jamshed #endif
102576404edcSAsim Jamshed 			}
102676404edcSAsim Jamshed #endif /* NETSTAT */
102776404edcSAsim Jamshed 		}
102876404edcSAsim Jamshed 
102976404edcSAsim Jamshed 		if (mtcp->iom->select)
103076404edcSAsim Jamshed 			mtcp->iom->select(ctx);
103176404edcSAsim Jamshed 
103276404edcSAsim Jamshed 		if (ctx->interrupt) {
103376404edcSAsim Jamshed 			InterruptApplication(mtcp);
103476404edcSAsim Jamshed 		}
103576404edcSAsim Jamshed 	}
103676404edcSAsim Jamshed 
103776404edcSAsim Jamshed #if TESTING
103876404edcSAsim Jamshed 	DestroyRemainingFlows(mtcp);
103976404edcSAsim Jamshed #endif
104076404edcSAsim Jamshed 
104176404edcSAsim Jamshed 	TRACE_DBG("MTCP thread %d out of main loop.\n", ctx->cpu);
104276404edcSAsim Jamshed 	/* flush logs */
104376404edcSAsim Jamshed 	flush_log_data(mtcp);
104476404edcSAsim Jamshed 	TRACE_DBG("MTCP thread %d flushed logs.\n", ctx->cpu);
104576404edcSAsim Jamshed 	InterruptApplication(mtcp);
104676404edcSAsim Jamshed 	TRACE_INFO("MTCP thread %d finished.\n", ctx->cpu);
104776404edcSAsim Jamshed }
104876404edcSAsim Jamshed /*----------------------------------------------------------------------------*/
104976404edcSAsim Jamshed struct mtcp_sender *
CreateMTCPSender(int ifidx)105076404edcSAsim Jamshed CreateMTCPSender(int ifidx)
105176404edcSAsim Jamshed {
105276404edcSAsim Jamshed 	struct mtcp_sender *sender;
105376404edcSAsim Jamshed 
105476404edcSAsim Jamshed 	sender = (struct mtcp_sender *)calloc(1, sizeof(struct mtcp_sender));
105576404edcSAsim Jamshed 	if (!sender) {
105676404edcSAsim Jamshed 		return NULL;
105776404edcSAsim Jamshed 	}
105876404edcSAsim Jamshed 
105976404edcSAsim Jamshed 	sender->ifidx = ifidx;
106076404edcSAsim Jamshed 
106176404edcSAsim Jamshed 	TAILQ_INIT(&sender->control_list);
106276404edcSAsim Jamshed 	TAILQ_INIT(&sender->send_list);
106376404edcSAsim Jamshed 	TAILQ_INIT(&sender->ack_list);
106476404edcSAsim Jamshed 
106576404edcSAsim Jamshed 	sender->control_list_cnt = 0;
106676404edcSAsim Jamshed 	sender->send_list_cnt = 0;
106776404edcSAsim Jamshed 	sender->ack_list_cnt = 0;
106876404edcSAsim Jamshed 
106976404edcSAsim Jamshed 	return sender;
107076404edcSAsim Jamshed }
107176404edcSAsim Jamshed /*----------------------------------------------------------------------------*/
/*
 * Frees a sender with free().  Only the sender object itself is released;
 * whatever is still linked on its lists is not visited.  Passing NULL is
 * harmless.
 */
void
DestroyMTCPSender(struct mtcp_sender *sender)
{
	free(sender);
}
107776404edcSAsim Jamshed /*----------------------------------------------------------------------------*/
107876404edcSAsim Jamshed static mtcp_manager_t
InitializeMTCPManager(struct mtcp_thread_context * ctx)107976404edcSAsim Jamshed InitializeMTCPManager(struct mtcp_thread_context* ctx)
108076404edcSAsim Jamshed {
108176404edcSAsim Jamshed 	mtcp_manager_t mtcp;
108276404edcSAsim Jamshed 	char log_name[MAX_FILE_NAME];
108376404edcSAsim Jamshed 	int i;
108476404edcSAsim Jamshed 
108576404edcSAsim Jamshed 	posix_seq_srand((unsigned)pthread_self());
108676404edcSAsim Jamshed 
108776404edcSAsim Jamshed 	mtcp = (mtcp_manager_t)calloc(1, sizeof(struct mtcp_manager));
108876404edcSAsim Jamshed 	if (!mtcp) {
108976404edcSAsim Jamshed 		perror("malloc");
10908c9e1184SAsim Jamshed 		TRACE_ERROR("Failed to allocate mtcp_manager.\n");
109176404edcSAsim Jamshed 		return NULL;
109276404edcSAsim Jamshed 	}
109376404edcSAsim Jamshed 	g_mtcp[ctx->cpu] = mtcp;
109476404edcSAsim Jamshed 
109576404edcSAsim Jamshed 	mtcp->tcp_flow_table = CreateHashtable();
109676404edcSAsim Jamshed 	if (!mtcp->tcp_flow_table) {
109776404edcSAsim Jamshed 		CTRACE_ERROR("Falied to allocate tcp flow table.\n");
109876404edcSAsim Jamshed 		return NULL;
109976404edcSAsim Jamshed 	}
110076404edcSAsim Jamshed 
110176404edcSAsim Jamshed #ifdef HUGEPAGE
110276404edcSAsim Jamshed #define	IS_HUGEPAGE 1
110376404edcSAsim Jamshed #else
110476404edcSAsim Jamshed #define	IS_HUGEPAGE 0
110576404edcSAsim Jamshed #endif
110676404edcSAsim Jamshed 	if (mon_app_exists) {
110776404edcSAsim Jamshed 		/* initialize event callback */
110876404edcSAsim Jamshed #ifdef NEWEV
110976404edcSAsim Jamshed 		InitEvent(mtcp);
111076404edcSAsim Jamshed #else
111176404edcSAsim Jamshed 		InitEvent(mtcp, NUM_EV_TABLE);
111276404edcSAsim Jamshed #endif
111376404edcSAsim Jamshed 	}
111476404edcSAsim Jamshed 
111576404edcSAsim Jamshed 	if (!(mtcp->bufseg_pool = MPCreate(sizeof(tcpbufseg_t),
111676404edcSAsim Jamshed 			sizeof(tcpbufseg_t) * g_config.mos->max_concurrency *
111776404edcSAsim Jamshed 			((g_config.mos->rmem_size - 1) / UNITBUFSIZE + 1), 0))) {
111876404edcSAsim Jamshed 		TRACE_ERROR("Failed to allocate ev_table pool\n");
111976404edcSAsim Jamshed 		exit(0);
112076404edcSAsim Jamshed 	}
112176404edcSAsim Jamshed 	if (!(mtcp->sockent_pool = MPCreate(sizeof(struct sockent),
112276404edcSAsim Jamshed 			sizeof(struct sockent) * g_config.mos->max_concurrency * 3, 0))) {
112376404edcSAsim Jamshed 		TRACE_ERROR("Failed to allocate ev_table pool\n");
112476404edcSAsim Jamshed 		exit(0);
112576404edcSAsim Jamshed 	}
112676404edcSAsim Jamshed #ifdef USE_TIMER_POOL
112776404edcSAsim Jamshed 	if (!(mtcp->timer_pool = MPCreate(sizeof(struct timer),
112876404edcSAsim Jamshed 					  sizeof(struct timer) * g_config.mos->max_concurrency * 10, 0))) {
112976404edcSAsim Jamshed 		TRACE_ERROR("Failed to allocate ev_table pool\n");
113076404edcSAsim Jamshed 		exit(0);
113176404edcSAsim Jamshed 	}
113276404edcSAsim Jamshed #endif
113376404edcSAsim Jamshed 	mtcp->flow_pool = MPCreate(sizeof(tcp_stream),
113476404edcSAsim Jamshed 								sizeof(tcp_stream) * g_config.mos->max_concurrency, IS_HUGEPAGE);
113576404edcSAsim Jamshed 	if (!mtcp->flow_pool) {
113676404edcSAsim Jamshed 		CTRACE_ERROR("Failed to allocate tcp flow pool.\n");
113776404edcSAsim Jamshed 		return NULL;
113876404edcSAsim Jamshed 	}
113976404edcSAsim Jamshed 	mtcp->rv_pool = MPCreate(sizeof(struct tcp_recv_vars),
114076404edcSAsim Jamshed 			sizeof(struct tcp_recv_vars) * g_config.mos->max_concurrency, IS_HUGEPAGE);
114176404edcSAsim Jamshed 	if (!mtcp->rv_pool) {
114276404edcSAsim Jamshed 		CTRACE_ERROR("Failed to allocate tcp recv variable pool.\n");
114376404edcSAsim Jamshed 		return NULL;
114476404edcSAsim Jamshed 	}
114576404edcSAsim Jamshed 	mtcp->sv_pool = MPCreate(sizeof(struct tcp_send_vars),
114676404edcSAsim Jamshed 			sizeof(struct tcp_send_vars) * g_config.mos->max_concurrency, IS_HUGEPAGE);
114776404edcSAsim Jamshed 	if (!mtcp->sv_pool) {
114876404edcSAsim Jamshed 		CTRACE_ERROR("Failed to allocate tcp send variable pool.\n");
114976404edcSAsim Jamshed 		return NULL;
115076404edcSAsim Jamshed 	}
115176404edcSAsim Jamshed 
115276404edcSAsim Jamshed 	mtcp->rbm_snd = SBManagerCreate(g_config.mos->wmem_size, g_config.mos->no_ring_buffers,
115376404edcSAsim Jamshed 					g_config.mos->max_concurrency);
115476404edcSAsim Jamshed 	if (!mtcp->rbm_snd) {
115576404edcSAsim Jamshed 		CTRACE_ERROR("Failed to create send ring buffer.\n");
115676404edcSAsim Jamshed 		return NULL;
115776404edcSAsim Jamshed 	}
115876404edcSAsim Jamshed 
115976404edcSAsim Jamshed 	mtcp->smap = (socket_map_t)calloc(g_config.mos->max_concurrency, sizeof(struct socket_map));
116076404edcSAsim Jamshed 	if (!mtcp->smap) {
116176404edcSAsim Jamshed 		perror("calloc");
116276404edcSAsim Jamshed 		CTRACE_ERROR("Failed to allocate memory for stream map.\n");
116376404edcSAsim Jamshed 		return NULL;
116476404edcSAsim Jamshed 	}
116576404edcSAsim Jamshed 
116676404edcSAsim Jamshed 	if (mon_app_exists) {
116776404edcSAsim Jamshed 		mtcp->msmap = (socket_map_t)calloc(g_config.mos->max_concurrency, sizeof(struct socket_map));
116876404edcSAsim Jamshed 		if (!mtcp->msmap) {
116976404edcSAsim Jamshed 			perror("calloc");
117076404edcSAsim Jamshed 			CTRACE_ERROR("Failed to allocate memory for monitor stream map.\n");
117176404edcSAsim Jamshed 			return NULL;
117276404edcSAsim Jamshed 		}
117376404edcSAsim Jamshed 
117476404edcSAsim Jamshed 		for (i = 0; i < g_config.mos->max_concurrency; i++) {
117576404edcSAsim Jamshed 			mtcp->msmap[i].monitor_stream = calloc(1, sizeof(struct mon_stream));
117676404edcSAsim Jamshed 			if (!mtcp->msmap[i].monitor_stream) {
117776404edcSAsim Jamshed 				perror("calloc");
117876404edcSAsim Jamshed 				CTRACE_ERROR("Failed to allocate memory for monitr stream map\n");
117976404edcSAsim Jamshed 				return NULL;
118076404edcSAsim Jamshed 			}
118176404edcSAsim Jamshed 		}
118276404edcSAsim Jamshed 	}
118376404edcSAsim Jamshed 
118476404edcSAsim Jamshed 	TAILQ_INIT(&mtcp->timer_list);
118576404edcSAsim Jamshed 	TAILQ_INIT(&mtcp->monitors);
118676404edcSAsim Jamshed 
118776404edcSAsim Jamshed 	TAILQ_INIT(&mtcp->free_smap);
118876404edcSAsim Jamshed 	for (i = 0; i < g_config.mos->max_concurrency; i++) {
118976404edcSAsim Jamshed 		mtcp->smap[i].id = i;
119076404edcSAsim Jamshed 		mtcp->smap[i].socktype = MOS_SOCK_UNUSED;
119176404edcSAsim Jamshed 		memset(&mtcp->smap[i].saddr, 0, sizeof(struct sockaddr_in));
119276404edcSAsim Jamshed 		mtcp->smap[i].stream = NULL;
119376404edcSAsim Jamshed 		TAILQ_INSERT_TAIL(&mtcp->free_smap, &mtcp->smap[i], link);
119476404edcSAsim Jamshed 	}
119576404edcSAsim Jamshed 
119676404edcSAsim Jamshed 	if (mon_app_exists) {
119776404edcSAsim Jamshed 		TAILQ_INIT(&mtcp->free_msmap);
119876404edcSAsim Jamshed 		for (i = 0; i < g_config.mos->max_concurrency; i++) {
119976404edcSAsim Jamshed 			mtcp->msmap[i].id = i;
120076404edcSAsim Jamshed 			mtcp->msmap[i].socktype = MOS_SOCK_UNUSED;
120176404edcSAsim Jamshed 			memset(&mtcp->msmap[i].saddr, 0, sizeof(struct sockaddr_in));
120276404edcSAsim Jamshed 			TAILQ_INSERT_TAIL(&mtcp->free_msmap, &mtcp->msmap[i], link);
120376404edcSAsim Jamshed 		}
120476404edcSAsim Jamshed 	}
120576404edcSAsim Jamshed 
120676404edcSAsim Jamshed 	mtcp->ctx = ctx;
120776404edcSAsim Jamshed 	mtcp->ep = NULL;
120876404edcSAsim Jamshed 
120976404edcSAsim Jamshed 	snprintf(log_name, MAX_FILE_NAME, "%s/"LOG_FILE_NAME"_%d",
121076404edcSAsim Jamshed 			g_config.mos->mos_log, ctx->cpu);
121176404edcSAsim Jamshed 	mtcp->log_fp = fopen(log_name, "w+");
121276404edcSAsim Jamshed 	if (!mtcp->log_fp) {
121376404edcSAsim Jamshed 		perror("fopen");
121476404edcSAsim Jamshed 		CTRACE_ERROR("Failed to create file for logging. (%s)\n", log_name);
121576404edcSAsim Jamshed 		return NULL;
121676404edcSAsim Jamshed 	}
121776404edcSAsim Jamshed 	mtcp->sp_fd = g_logctx[ctx->cpu]->pair_sp_fd;
121876404edcSAsim Jamshed 	mtcp->logger = g_logctx[ctx->cpu];
121976404edcSAsim Jamshed 
122076404edcSAsim Jamshed 	mtcp->connectq = CreateStreamQueue(BACKLOG_SIZE);
122176404edcSAsim Jamshed 	if (!mtcp->connectq) {
122276404edcSAsim Jamshed 		CTRACE_ERROR("Failed to create connect queue.\n");
122376404edcSAsim Jamshed 		return NULL;
122476404edcSAsim Jamshed 	}
122576404edcSAsim Jamshed 	mtcp->sendq = CreateStreamQueue(g_config.mos->max_concurrency);
122676404edcSAsim Jamshed 	if (!mtcp->sendq) {
122776404edcSAsim Jamshed 		CTRACE_ERROR("Failed to create send queue.\n");
122876404edcSAsim Jamshed 		return NULL;
122976404edcSAsim Jamshed 	}
123076404edcSAsim Jamshed 	mtcp->ackq = CreateStreamQueue(g_config.mos->max_concurrency);
123176404edcSAsim Jamshed 	if (!mtcp->ackq) {
123276404edcSAsim Jamshed 		CTRACE_ERROR("Failed to create ack queue.\n");
123376404edcSAsim Jamshed 		return NULL;
123476404edcSAsim Jamshed 	}
123576404edcSAsim Jamshed 	mtcp->closeq = CreateStreamQueue(g_config.mos->max_concurrency);
123676404edcSAsim Jamshed 	if (!mtcp->closeq) {
123776404edcSAsim Jamshed 		CTRACE_ERROR("Failed to create close queue.\n");
123876404edcSAsim Jamshed 		return NULL;
123976404edcSAsim Jamshed 	}
124076404edcSAsim Jamshed 	mtcp->closeq_int = CreateInternalStreamQueue(g_config.mos->max_concurrency);
124176404edcSAsim Jamshed 	if (!mtcp->closeq_int) {
124276404edcSAsim Jamshed 		CTRACE_ERROR("Failed to create close queue.\n");
124376404edcSAsim Jamshed 		return NULL;
124476404edcSAsim Jamshed 	}
124576404edcSAsim Jamshed 	mtcp->resetq = CreateStreamQueue(g_config.mos->max_concurrency);
124676404edcSAsim Jamshed 	if (!mtcp->resetq) {
124776404edcSAsim Jamshed 		CTRACE_ERROR("Failed to create reset queue.\n");
124876404edcSAsim Jamshed 		return NULL;
124976404edcSAsim Jamshed 	}
125076404edcSAsim Jamshed 	mtcp->resetq_int = CreateInternalStreamQueue(g_config.mos->max_concurrency);
125176404edcSAsim Jamshed 	if (!mtcp->resetq_int) {
125276404edcSAsim Jamshed 		CTRACE_ERROR("Failed to create reset queue.\n");
125376404edcSAsim Jamshed 		return NULL;
125476404edcSAsim Jamshed 	}
125576404edcSAsim Jamshed 	mtcp->destroyq = CreateStreamQueue(g_config.mos->max_concurrency);
125676404edcSAsim Jamshed 	if (!mtcp->destroyq) {
125776404edcSAsim Jamshed 		CTRACE_ERROR("Failed to create destroy queue.\n");
125876404edcSAsim Jamshed 		return NULL;
125976404edcSAsim Jamshed 	}
126076404edcSAsim Jamshed 
126176404edcSAsim Jamshed 	mtcp->g_sender = CreateMTCPSender(-1);
126276404edcSAsim Jamshed 	if (!mtcp->g_sender) {
126376404edcSAsim Jamshed 		CTRACE_ERROR("Failed to create global sender structure.\n");
126476404edcSAsim Jamshed 		return NULL;
126576404edcSAsim Jamshed 	}
126676404edcSAsim Jamshed 	for (i = 0; i < g_config.mos->netdev_table->num; i++) {
126776404edcSAsim Jamshed 		mtcp->n_sender[i] = CreateMTCPSender(i);
126876404edcSAsim Jamshed 		if (!mtcp->n_sender[i]) {
126976404edcSAsim Jamshed 			CTRACE_ERROR("Failed to create per-nic sender structure.\n");
127076404edcSAsim Jamshed 			return NULL;
127176404edcSAsim Jamshed 		}
127276404edcSAsim Jamshed 	}
127376404edcSAsim Jamshed 
127476404edcSAsim Jamshed 	mtcp->rto_store = InitRTOHashstore();
127576404edcSAsim Jamshed 	TAILQ_INIT(&mtcp->timewait_list);
127676404edcSAsim Jamshed 	TAILQ_INIT(&mtcp->timeout_list);
127776404edcSAsim Jamshed 
127876404edcSAsim Jamshed 	return mtcp;
127976404edcSAsim Jamshed }
128076404edcSAsim Jamshed /*----------------------------------------------------------------------------*/
128176404edcSAsim Jamshed static void *
MTCPRunThread(void * arg)128276404edcSAsim Jamshed MTCPRunThread(void *arg)
128376404edcSAsim Jamshed {
128476404edcSAsim Jamshed 	mctx_t mctx = (mctx_t)arg;
128576404edcSAsim Jamshed 	int cpu = mctx->cpu;
128676404edcSAsim Jamshed 	int working;
128776404edcSAsim Jamshed 	struct mtcp_manager *mtcp;
128876404edcSAsim Jamshed 	struct mtcp_thread_context *ctx;
128976404edcSAsim Jamshed 
129076404edcSAsim Jamshed 	/* affinitize the thread to this core first */
129176404edcSAsim Jamshed 	mtcp_core_affinitize(cpu);
129276404edcSAsim Jamshed 
129376404edcSAsim Jamshed 	/* memory alloc after core affinitization would use local memory
129476404edcSAsim Jamshed 	   most time */
129576404edcSAsim Jamshed 	ctx = calloc(1, sizeof(*ctx));
129676404edcSAsim Jamshed 	if (!ctx) {
129776404edcSAsim Jamshed 		perror("calloc");
129876404edcSAsim Jamshed 		TRACE_ERROR("Failed to calloc mtcp context.\n");
129976404edcSAsim Jamshed 		exit(-1);
130076404edcSAsim Jamshed 	}
130176404edcSAsim Jamshed 	ctx->thread = pthread_self();
130276404edcSAsim Jamshed 	ctx->cpu = cpu;
130376404edcSAsim Jamshed 	mtcp = ctx->mtcp_manager = InitializeMTCPManager(ctx);
130476404edcSAsim Jamshed 	if (!mtcp) {
130576404edcSAsim Jamshed 		TRACE_ERROR("Failed to initialize mtcp manager.\n");
130676404edcSAsim Jamshed 		exit(-1);
130776404edcSAsim Jamshed 	}
130876404edcSAsim Jamshed 
130976404edcSAsim Jamshed 	/* assign mtcp context's underlying I/O module */
131076404edcSAsim Jamshed 	mtcp->iom = current_iomodule_func;
131176404edcSAsim Jamshed 
131276404edcSAsim Jamshed 	/* I/O initializing */
131376404edcSAsim Jamshed 	if (mtcp->iom->init_handle)
131476404edcSAsim Jamshed 		mtcp->iom->init_handle(ctx);
131576404edcSAsim Jamshed 
131676404edcSAsim Jamshed 	if (pthread_mutex_init(&ctx->flow_pool_lock, NULL)) {
131776404edcSAsim Jamshed 		perror("pthread_mutex_init of ctx->flow_pool_lock\n");
131876404edcSAsim Jamshed 		exit(-1);
131976404edcSAsim Jamshed 	}
132076404edcSAsim Jamshed 
132176404edcSAsim Jamshed 	if (pthread_mutex_init(&ctx->socket_pool_lock, NULL)) {
132276404edcSAsim Jamshed 		perror("pthread_mutex_init of ctx->socket_pool_lock\n");
132376404edcSAsim Jamshed 		exit(-1);
132476404edcSAsim Jamshed 	}
132576404edcSAsim Jamshed 
132676404edcSAsim Jamshed 	SQ_LOCK_INIT(&ctx->connect_lock, "ctx->connect_lock", exit(-1));
132776404edcSAsim Jamshed 	SQ_LOCK_INIT(&ctx->close_lock, "ctx->close_lock", exit(-1));
132876404edcSAsim Jamshed 	SQ_LOCK_INIT(&ctx->reset_lock, "ctx->reset_lock", exit(-1));
132976404edcSAsim Jamshed 	SQ_LOCK_INIT(&ctx->sendq_lock, "ctx->sendq_lock", exit(-1));
133076404edcSAsim Jamshed 	SQ_LOCK_INIT(&ctx->ackq_lock, "ctx->ackq_lock", exit(-1));
133176404edcSAsim Jamshed 	SQ_LOCK_INIT(&ctx->destroyq_lock, "ctx->destroyq_lock", exit(-1));
133276404edcSAsim Jamshed 
133376404edcSAsim Jamshed 	/* remember this context pointer for signal processing */
133476404edcSAsim Jamshed 	g_pctx[cpu] = ctx;
133576404edcSAsim Jamshed 	mlockall(MCL_CURRENT);
133676404edcSAsim Jamshed 
133776404edcSAsim Jamshed 	// attach (nic device, queue)
133876404edcSAsim Jamshed 	working = AttachDevice(ctx);
133976404edcSAsim Jamshed 	if (working != 0) {
134076404edcSAsim Jamshed 		sem_post(&g_init_sem[ctx->cpu]);
134176404edcSAsim Jamshed 		TRACE_DBG("MTCP thread %d finished. Not attached any device\n", ctx->cpu);
134276404edcSAsim Jamshed 		pthread_exit(NULL);
134376404edcSAsim Jamshed 	}
134476404edcSAsim Jamshed 
134576404edcSAsim Jamshed 	TRACE_DBG("CPU %d: initialization finished.\n", cpu);
134676404edcSAsim Jamshed 	sem_post(&g_init_sem[ctx->cpu]);
134776404edcSAsim Jamshed 
134876404edcSAsim Jamshed 	/* start the main loop */
134976404edcSAsim Jamshed 	RunMainLoop(ctx);
135076404edcSAsim Jamshed 
135176404edcSAsim Jamshed 	TRACE_DBG("MTCP thread %d finished.\n", ctx->cpu);
135276404edcSAsim Jamshed 
135376404edcSAsim Jamshed 	/* signaling mTCP thread is done */
135476404edcSAsim Jamshed 	sem_post(&g_done_sem[mctx->cpu]);
135576404edcSAsim Jamshed 
135676404edcSAsim Jamshed 	//pthread_exit(NULL);
135776404edcSAsim Jamshed 	return 0;
135876404edcSAsim Jamshed }
135976404edcSAsim Jamshed /*----------------------------------------------------------------------------*/
#ifdef ENABLE_DPDK
/*
 * Adapter with the int-returning signature that is handed to
 * rte_eal_remote_launch(): runs MTCPRunThread() and always reports 0.
 */
static int
MTCPDPDKRunThread(void *arg)
{
	(void)MTCPRunThread(arg);

	return 0;
}
#endif /* ENABLE_DPDK */
136705e3289cSYoungGyoun /*----------------------------------------------------------------------------*/
136876404edcSAsim Jamshed mctx_t
mtcp_create_context(int cpu)136976404edcSAsim Jamshed mtcp_create_context(int cpu)
137076404edcSAsim Jamshed {
137176404edcSAsim Jamshed 	mctx_t mctx;
137276404edcSAsim Jamshed 	int ret;
137376404edcSAsim Jamshed 
137476404edcSAsim Jamshed 	if (cpu >=  g_config.mos->num_cores) {
137576404edcSAsim Jamshed 		TRACE_ERROR("Failed initialize new mtcp context. "
137676404edcSAsim Jamshed 					"Requested cpu id %d exceed the number of cores %d configured to use.\n",
137776404edcSAsim Jamshed 					cpu, g_config.mos->num_cores);
137876404edcSAsim Jamshed 		return NULL;
137976404edcSAsim Jamshed 	}
138076404edcSAsim Jamshed 
138176404edcSAsim Jamshed         /* check if mtcp_create_context() was already initialized */
138276404edcSAsim Jamshed         if (g_logctx[cpu] != NULL) {
138376404edcSAsim Jamshed                 TRACE_ERROR("%s was already initialized before!\n",
138476404edcSAsim Jamshed                             __FUNCTION__);
138576404edcSAsim Jamshed                 return NULL;
138676404edcSAsim Jamshed         }
138776404edcSAsim Jamshed 
138876404edcSAsim Jamshed 	ret = sem_init(&g_init_sem[cpu], 0, 0);
138976404edcSAsim Jamshed 	if (ret) {
139076404edcSAsim Jamshed 		TRACE_ERROR("Failed initialize init_sem.\n");
139176404edcSAsim Jamshed 		return NULL;
139276404edcSAsim Jamshed 	}
139376404edcSAsim Jamshed 
139476404edcSAsim Jamshed 	ret = sem_init(&g_done_sem[cpu], 0, 0);
139576404edcSAsim Jamshed 	if (ret) {
139676404edcSAsim Jamshed 		TRACE_ERROR("Failed initialize done_sem.\n");
139776404edcSAsim Jamshed 		return NULL;
139876404edcSAsim Jamshed 	}
139976404edcSAsim Jamshed 
140076404edcSAsim Jamshed 	mctx = (mctx_t)calloc(1, sizeof(struct mtcp_context));
140176404edcSAsim Jamshed 	if (!mctx) {
140276404edcSAsim Jamshed 		TRACE_ERROR("Failed to allocate memory for mtcp_context.\n");
140376404edcSAsim Jamshed 		return NULL;
140476404edcSAsim Jamshed 	}
140576404edcSAsim Jamshed 	mctx->cpu = cpu;
140676404edcSAsim Jamshed 	g_ctx[cpu] = mctx;
140776404edcSAsim Jamshed 
140876404edcSAsim Jamshed 	/* initialize logger */
140976404edcSAsim Jamshed 	g_logctx[cpu] = (struct log_thread_context *)
141076404edcSAsim Jamshed 			calloc(1, sizeof(struct log_thread_context));
141176404edcSAsim Jamshed 	if (!g_logctx[cpu]) {
141276404edcSAsim Jamshed 		perror("malloc");
141376404edcSAsim Jamshed 		TRACE_ERROR("Failed to allocate memory for log thread context.\n");
141476404edcSAsim Jamshed 		return NULL;
141576404edcSAsim Jamshed 	}
141676404edcSAsim Jamshed 	InitLogThreadContext(g_logctx[cpu], cpu);
141776404edcSAsim Jamshed 	if (pthread_create(&log_thread[cpu],
141876404edcSAsim Jamshed 			   NULL, ThreadLogMain, (void *)g_logctx[cpu])) {
141976404edcSAsim Jamshed 		perror("pthread_create");
142076404edcSAsim Jamshed 		TRACE_ERROR("Failed to create log thread\n");
142176404edcSAsim Jamshed 		return NULL;
142276404edcSAsim Jamshed 	}
142376404edcSAsim Jamshed 
142405e3289cSYoungGyoun 	/* use rte_eal_remote_launch() for DPDK
142505e3289cSYoungGyoun 	   (worker/slave threads are already initialized by rte_eal_init()) */
142605e3289cSYoungGyoun #ifdef ENABLE_DPDK
142705e3289cSYoungGyoun 	/* Wake up mTCP threads (wake up I/O threads) */
142805e3289cSYoungGyoun 	if (current_iomodule_func == &dpdk_module_func) {
142905e3289cSYoungGyoun 		int master;
143005e3289cSYoungGyoun 		master = rte_get_master_lcore();
143105e3289cSYoungGyoun 		if (master == cpu) {
143205e3289cSYoungGyoun 			lcore_config[master].ret = 0;
143305e3289cSYoungGyoun 			lcore_config[master].state = FINISHED;
143476404edcSAsim Jamshed 			if (pthread_create(&g_thread[cpu],
143576404edcSAsim Jamshed 					   NULL, MTCPRunThread, (void *)mctx) != 0) {
143676404edcSAsim Jamshed 				TRACE_ERROR("pthread_create of mtcp thread failed!\n");
143776404edcSAsim Jamshed 				return NULL;
143876404edcSAsim Jamshed 			}
143905e3289cSYoungGyoun 		} else
144005e3289cSYoungGyoun 			rte_eal_remote_launch(MTCPDPDKRunThread, mctx, cpu);
144105e3289cSYoungGyoun 	} else
144205e3289cSYoungGyoun #endif /* !ENABLE_DPDK */
144305e3289cSYoungGyoun 		{
144405e3289cSYoungGyoun 			if (pthread_create(&g_thread[cpu],
144505e3289cSYoungGyoun 					   NULL, MTCPRunThread, (void *)mctx) != 0) {
144605e3289cSYoungGyoun 				TRACE_ERROR("pthread_create of mtcp thread failed!\n");
144705e3289cSYoungGyoun 				return NULL;
144805e3289cSYoungGyoun 			}
144905e3289cSYoungGyoun 		}
145076404edcSAsim Jamshed 
145176404edcSAsim Jamshed 	sem_wait(&g_init_sem[cpu]);
145276404edcSAsim Jamshed 	sem_destroy(&g_init_sem[cpu]);
145376404edcSAsim Jamshed 
145476404edcSAsim Jamshed 	running[cpu] = TRUE;
145576404edcSAsim Jamshed 
145676404edcSAsim Jamshed #ifdef NETSTAT
145776404edcSAsim Jamshed #if NETSTAT_TOTAL
145876404edcSAsim Jamshed 	if (printer < 0) {
145976404edcSAsim Jamshed 		printer = cpu;
146076404edcSAsim Jamshed 		TRACE_INFO("CPU %d is in charge of printing stats.\n", printer);
146176404edcSAsim Jamshed 	}
146276404edcSAsim Jamshed #endif
146376404edcSAsim Jamshed #endif
146476404edcSAsim Jamshed 
146576404edcSAsim Jamshed 	return mctx;
146676404edcSAsim Jamshed }
146776404edcSAsim Jamshed /*----------------------------------------------------------------------------*/
14681c9bc629SAsim Jamshed int
mtcp_destroy_context(mctx_t mctx)14691c9bc629SAsim Jamshed mtcp_destroy_context(mctx_t mctx)
14701c9bc629SAsim Jamshed {
14711c9bc629SAsim Jamshed 	struct mtcp_thread_context *ctx = g_pctx[mctx->cpu];
14721c9bc629SAsim Jamshed 	if (ctx != NULL)
14731c9bc629SAsim Jamshed 		ctx->done = 1;
14748c9e1184SAsim Jamshed 
14758c9e1184SAsim Jamshed 	struct mtcp_context m;
14768c9e1184SAsim Jamshed 	m.cpu = mctx->cpu;
14778c9e1184SAsim Jamshed 	mtcp_free_context(&m);
14788c9e1184SAsim Jamshed 
14791c9bc629SAsim Jamshed 	free(mctx);
14801c9bc629SAsim Jamshed 
14811c9bc629SAsim Jamshed 	return 0;
14821c9bc629SAsim Jamshed }
14831c9bc629SAsim Jamshed /*----------------------------------------------------------------------------*/
14841c9bc629SAsim Jamshed void
mtcp_free_context(mctx_t mctx)14851c9bc629SAsim Jamshed mtcp_free_context(mctx_t mctx)
148676404edcSAsim Jamshed {
148776404edcSAsim Jamshed 	struct mtcp_thread_context *ctx = g_pctx[mctx->cpu];
148876404edcSAsim Jamshed 	struct mtcp_manager *mtcp = ctx->mtcp_manager;
148976404edcSAsim Jamshed 	struct log_thread_context *log_ctx = mtcp->logger;
149076404edcSAsim Jamshed 	int ret, i;
149176404edcSAsim Jamshed 
14921c9bc629SAsim Jamshed 	TRACE_DBG("CPU %d: mtcp_free_context()\n", mctx->cpu);
149376404edcSAsim Jamshed 
14941c9bc629SAsim Jamshed 	if (g_pctx[mctx->cpu] == NULL) return;
149505e3289cSYoungGyoun 
149676404edcSAsim Jamshed 	/* close all stream sockets that are still open */
149776404edcSAsim Jamshed 	if (!ctx->exit) {
149876404edcSAsim Jamshed 		for (i = 0; i < g_config.mos->max_concurrency; i++) {
149976404edcSAsim Jamshed 			if (mtcp->smap[i].socktype == MOS_SOCK_STREAM) {
150076404edcSAsim Jamshed 				TRACE_DBG("Closing remaining socket %d (%s)\n",
150176404edcSAsim Jamshed 						i, TCPStateToString(mtcp->smap[i].stream));
1502a5e1a556SAsim Jamshed #ifdef DUMP_STREAM
150376404edcSAsim Jamshed 				DumpStream(mtcp, mtcp->smap[i].stream);
1504a5e1a556SAsim Jamshed #endif
150576404edcSAsim Jamshed 				mtcp_close(mctx, i);
150676404edcSAsim Jamshed 			}
150776404edcSAsim Jamshed 		}
150876404edcSAsim Jamshed 	}
150976404edcSAsim Jamshed 
151076404edcSAsim Jamshed 	ctx->done = 1;
151176404edcSAsim Jamshed 	ctx->exit = 1;
15128c9e1184SAsim Jamshed 
151305e3289cSYoungGyoun #ifdef ENABLE_DPDK
151405e3289cSYoungGyoun 	if (current_iomodule_func == &dpdk_module_func) {
151505e3289cSYoungGyoun 		int master = rte_get_master_lcore();
151605e3289cSYoungGyoun 		if (master == mctx->cpu)
151705e3289cSYoungGyoun 			pthread_join(g_thread[mctx->cpu], NULL);
151805e3289cSYoungGyoun 		else
151905e3289cSYoungGyoun 			rte_eal_wait_lcore(mctx->cpu);
152005e3289cSYoungGyoun 	} else
152105e3289cSYoungGyoun #endif /* !ENABLE_DPDK */
152205e3289cSYoungGyoun 		{
152305e3289cSYoungGyoun 			pthread_join(g_thread[mctx->cpu], NULL);
15248c9e1184SAsim Jamshed 		}
152576404edcSAsim Jamshed 
152676404edcSAsim Jamshed 	TRACE_INFO("MTCP thread %d joined.\n", mctx->cpu);
152705e3289cSYoungGyoun 
152876404edcSAsim Jamshed 	running[mctx->cpu] = FALSE;
152976404edcSAsim Jamshed 
153076404edcSAsim Jamshed #ifdef NETSTAT
153176404edcSAsim Jamshed #if NETSTAT_TOTAL
153276404edcSAsim Jamshed 	if (printer == mctx->cpu) {
153376404edcSAsim Jamshed 		for (i = 0; i < num_cpus; i++) {
153476404edcSAsim Jamshed 			if (i != mctx->cpu && running[i]) {
153576404edcSAsim Jamshed 				printer = i;
153676404edcSAsim Jamshed 				break;
153776404edcSAsim Jamshed 			}
153876404edcSAsim Jamshed 		}
153976404edcSAsim Jamshed 	}
154076404edcSAsim Jamshed #endif
154176404edcSAsim Jamshed #endif
154276404edcSAsim Jamshed 
154376404edcSAsim Jamshed 	log_ctx->done = 1;
154476404edcSAsim Jamshed 	ret = write(log_ctx->pair_sp_fd, "F", 1);
154576404edcSAsim Jamshed 	if (ret != 1)
154676404edcSAsim Jamshed 		TRACE_ERROR("CPU %d: Fail to signal socket pair\n", mctx->cpu);
154776404edcSAsim Jamshed 
15488c9e1184SAsim Jamshed 	if ((ret = pthread_join(log_thread[ctx->cpu], NULL) != 0)) {
15498c9e1184SAsim Jamshed 	    TRACE_ERROR("pthread_join() returns error (errno = %s)\n", strerror(ret));
155005e3289cSYoungGyoun 		exit(-1);
15518c9e1184SAsim Jamshed 	}
15528c9e1184SAsim Jamshed 
155305e3289cSYoungGyoun 
155476404edcSAsim Jamshed 	fclose(mtcp->log_fp);
155576404edcSAsim Jamshed 	TRACE_LOG("Log thread %d joined.\n", mctx->cpu);
155676404edcSAsim Jamshed 
155776404edcSAsim Jamshed 	if (mtcp->connectq) {
155876404edcSAsim Jamshed 		DestroyStreamQueue(mtcp->connectq);
155976404edcSAsim Jamshed 		mtcp->connectq = NULL;
156076404edcSAsim Jamshed 	}
156176404edcSAsim Jamshed 	if (mtcp->sendq) {
156276404edcSAsim Jamshed 		DestroyStreamQueue(mtcp->sendq);
156376404edcSAsim Jamshed 		mtcp->sendq = NULL;
156476404edcSAsim Jamshed 	}
156576404edcSAsim Jamshed 	if (mtcp->ackq) {
156676404edcSAsim Jamshed 		DestroyStreamQueue(mtcp->ackq);
156776404edcSAsim Jamshed 		mtcp->ackq = NULL;
156876404edcSAsim Jamshed 	}
156976404edcSAsim Jamshed 	if (mtcp->closeq) {
157076404edcSAsim Jamshed 		DestroyStreamQueue(mtcp->closeq);
157176404edcSAsim Jamshed 		mtcp->closeq = NULL;
157276404edcSAsim Jamshed 	}
157376404edcSAsim Jamshed 	if (mtcp->closeq_int) {
157476404edcSAsim Jamshed 		DestroyInternalStreamQueue(mtcp->closeq_int);
157576404edcSAsim Jamshed 		mtcp->closeq_int = NULL;
157676404edcSAsim Jamshed 	}
157776404edcSAsim Jamshed 	if (mtcp->resetq) {
157876404edcSAsim Jamshed 		DestroyStreamQueue(mtcp->resetq);
157976404edcSAsim Jamshed 		mtcp->resetq = NULL;
158076404edcSAsim Jamshed 	}
158176404edcSAsim Jamshed 	if (mtcp->resetq_int) {
158276404edcSAsim Jamshed 		DestroyInternalStreamQueue(mtcp->resetq_int);
158376404edcSAsim Jamshed 		mtcp->resetq_int = NULL;
158476404edcSAsim Jamshed 	}
158576404edcSAsim Jamshed 	if (mtcp->destroyq) {
158676404edcSAsim Jamshed 		DestroyStreamQueue(mtcp->destroyq);
158776404edcSAsim Jamshed 		mtcp->destroyq = NULL;
158876404edcSAsim Jamshed 	}
158976404edcSAsim Jamshed 
159076404edcSAsim Jamshed 	DestroyMTCPSender(mtcp->g_sender);
159176404edcSAsim Jamshed 	for (i = 0; i < g_config.mos->netdev_table->num; i++) {
159276404edcSAsim Jamshed 		DestroyMTCPSender(mtcp->n_sender[i]);
159376404edcSAsim Jamshed 	}
159476404edcSAsim Jamshed 
159576404edcSAsim Jamshed 	MPDestroy(mtcp->rv_pool);
159676404edcSAsim Jamshed 	MPDestroy(mtcp->sv_pool);
159776404edcSAsim Jamshed 	MPDestroy(mtcp->flow_pool);
159876404edcSAsim Jamshed 
159976404edcSAsim Jamshed 	if (mtcp->ap) {
160076404edcSAsim Jamshed 		DestroyAddressPool(mtcp->ap);
16011c9bc629SAsim Jamshed 		mtcp->ap = NULL;
160276404edcSAsim Jamshed 	}
160376404edcSAsim Jamshed 
160476404edcSAsim Jamshed 	SQ_LOCK_DESTROY(&ctx->connect_lock);
160576404edcSAsim Jamshed 	SQ_LOCK_DESTROY(&ctx->close_lock);
160676404edcSAsim Jamshed 	SQ_LOCK_DESTROY(&ctx->reset_lock);
160776404edcSAsim Jamshed 	SQ_LOCK_DESTROY(&ctx->sendq_lock);
160876404edcSAsim Jamshed 	SQ_LOCK_DESTROY(&ctx->ackq_lock);
160976404edcSAsim Jamshed 	SQ_LOCK_DESTROY(&ctx->destroyq_lock);
161076404edcSAsim Jamshed 
161176404edcSAsim Jamshed 	//TRACE_INFO("MTCP thread %d destroyed.\n", mctx->cpu);
161276404edcSAsim Jamshed 	if (mtcp->iom->destroy_handle)
161376404edcSAsim Jamshed 		mtcp->iom->destroy_handle(ctx);
16141c9bc629SAsim Jamshed 	if (g_logctx[mctx->cpu]) {
16151c9bc629SAsim Jamshed 		free(g_logctx[mctx->cpu]);
16161c9bc629SAsim Jamshed 		g_logctx[mctx->cpu] = NULL;
16171c9bc629SAsim Jamshed 	}
16181c9bc629SAsim Jamshed 	free(ctx);
16191c9bc629SAsim Jamshed 	g_pctx[mctx->cpu] = NULL;
162076404edcSAsim Jamshed }
162176404edcSAsim Jamshed /*----------------------------------------------------------------------------*/
162276404edcSAsim Jamshed mtcp_sighandler_t
mtcp_register_signal(int signum,mtcp_sighandler_t handler)162376404edcSAsim Jamshed mtcp_register_signal(int signum, mtcp_sighandler_t handler)
162476404edcSAsim Jamshed {
162576404edcSAsim Jamshed 	mtcp_sighandler_t prev;
162676404edcSAsim Jamshed 
162776404edcSAsim Jamshed 	if (signum == SIGINT) {
162876404edcSAsim Jamshed 		prev = app_signal_handler;
162976404edcSAsim Jamshed 		app_signal_handler = handler;
163076404edcSAsim Jamshed 	} else {
163176404edcSAsim Jamshed 		if ((prev = signal(signum, handler)) == SIG_ERR) {
163276404edcSAsim Jamshed 			perror("signal");
163376404edcSAsim Jamshed 			return SIG_ERR;
163476404edcSAsim Jamshed 		}
163576404edcSAsim Jamshed 	}
163676404edcSAsim Jamshed 
163776404edcSAsim Jamshed 	return prev;
163876404edcSAsim Jamshed }
163976404edcSAsim Jamshed /*----------------------------------------------------------------------------*/
164076404edcSAsim Jamshed int
mtcp_getconf(struct mtcp_conf * conf)164176404edcSAsim Jamshed mtcp_getconf(struct mtcp_conf *conf)
164276404edcSAsim Jamshed {
164376404edcSAsim Jamshed 	int i, j;
164476404edcSAsim Jamshed 
16458a941c7eSAsim Jamshed 	if (!conf) {
16468a941c7eSAsim Jamshed 		errno = EINVAL;
164776404edcSAsim Jamshed 		return -1;
16488a941c7eSAsim Jamshed 	}
164976404edcSAsim Jamshed 
165076404edcSAsim Jamshed 	conf->num_cores = g_config.mos->num_cores;
165176404edcSAsim Jamshed 	conf->max_concurrency = g_config.mos->max_concurrency;
165276404edcSAsim Jamshed 	conf->cpu_mask = g_config.mos->cpu_mask;
165376404edcSAsim Jamshed 
165476404edcSAsim Jamshed 	conf->rcvbuf_size = g_config.mos->rmem_size;
165576404edcSAsim Jamshed 	conf->sndbuf_size = g_config.mos->wmem_size;
165676404edcSAsim Jamshed 
165776404edcSAsim Jamshed 	conf->tcp_timewait = g_config.mos->tcp_tw_interval;
165876404edcSAsim Jamshed 	conf->tcp_timeout = g_config.mos->tcp_timeout;
165976404edcSAsim Jamshed 
166076404edcSAsim Jamshed 	i = 0;
166176404edcSAsim Jamshed 	struct conf_block *bwalk;
166276404edcSAsim Jamshed 	TAILQ_FOREACH(bwalk, &g_config.app_blkh, link) {
166376404edcSAsim Jamshed 		struct app_conf *app_conf = (struct app_conf *)bwalk->conf;
166476404edcSAsim Jamshed 		for (j = 0; j < app_conf->app_argc; j++)
166576404edcSAsim Jamshed 			conf->app_argv[i][j] = app_conf->app_argv[j];
166676404edcSAsim Jamshed 		conf->app_argc[i] = app_conf->app_argc;
166776404edcSAsim Jamshed 		conf->app_cpu_mask[i] = app_conf->cpu_mask;
166876404edcSAsim Jamshed 		i++;
166976404edcSAsim Jamshed 	}
167076404edcSAsim Jamshed 	conf->num_app = i;
167176404edcSAsim Jamshed 
167276404edcSAsim Jamshed 	return 0;
167376404edcSAsim Jamshed }
167476404edcSAsim Jamshed /*----------------------------------------------------------------------------*/
167576404edcSAsim Jamshed int
mtcp_setconf(const struct mtcp_conf * conf)167676404edcSAsim Jamshed mtcp_setconf(const struct mtcp_conf *conf)
167776404edcSAsim Jamshed {
167876404edcSAsim Jamshed 	if (!conf)
167976404edcSAsim Jamshed 		return -1;
168076404edcSAsim Jamshed 
168176404edcSAsim Jamshed 	g_config.mos->num_cores = conf->num_cores;
168276404edcSAsim Jamshed 	g_config.mos->max_concurrency = conf->max_concurrency;
168376404edcSAsim Jamshed 
168476404edcSAsim Jamshed 	g_config.mos->rmem_size = conf->rcvbuf_size;
168576404edcSAsim Jamshed 	g_config.mos->wmem_size = conf->sndbuf_size;
168676404edcSAsim Jamshed 
168776404edcSAsim Jamshed 	g_config.mos->tcp_tw_interval = conf->tcp_timewait;
168876404edcSAsim Jamshed 	g_config.mos->tcp_timeout = conf->tcp_timeout;
168976404edcSAsim Jamshed 
169076404edcSAsim Jamshed 	TRACE_CONFIG("Configuration updated by mtcp_setconf().\n");
169176404edcSAsim Jamshed 	//PrintConfiguration();
169276404edcSAsim Jamshed 
169376404edcSAsim Jamshed 	return 0;
169476404edcSAsim Jamshed }
169576404edcSAsim Jamshed /*----------------------------------------------------------------------------*/
169676404edcSAsim Jamshed int
mtcp_init(const char * config_file)169776404edcSAsim Jamshed mtcp_init(const char *config_file)
169876404edcSAsim Jamshed {
169976404edcSAsim Jamshed 	int i;
170076404edcSAsim Jamshed 	int ret;
170176404edcSAsim Jamshed 
170276404edcSAsim Jamshed 	if (geteuid()) {
170376404edcSAsim Jamshed 		TRACE_CONFIG("[CAUTION] Run as root if mlock is necessary.\n");
1704dcdbbb98SAsim Jamshed #if defined(ENABLE_DPDK) || defined(ENABLE_NETMAP)
1705df3fae06SAsim Jamshed 		TRACE_CONFIG("[CAUTION] Run the app as root!\n");
1706df3fae06SAsim Jamshed 		exit(EXIT_FAILURE);
1707df3fae06SAsim Jamshed #endif
170876404edcSAsim Jamshed 	}
170976404edcSAsim Jamshed 
171076404edcSAsim Jamshed 	/* getting cpu and NIC */
171176404edcSAsim Jamshed 	num_cpus = GetNumCPUs();
171276404edcSAsim Jamshed 	assert(num_cpus >= 1);
171376404edcSAsim Jamshed 	for (i = 0; i < num_cpus; i++) {
171476404edcSAsim Jamshed 		g_mtcp[i] = NULL;
171576404edcSAsim Jamshed 		running[i] = FALSE;
171676404edcSAsim Jamshed 		sigint_cnt[i] = 0;
171776404edcSAsim Jamshed 	}
171876404edcSAsim Jamshed 
171976404edcSAsim Jamshed 	ret = LoadConfigurationUpperHalf(config_file);
172076404edcSAsim Jamshed 	if (ret) {
172176404edcSAsim Jamshed 		TRACE_CONFIG("Error occured while loading configuration.\n");
172276404edcSAsim Jamshed 		return -1;
172376404edcSAsim Jamshed 	}
172476404edcSAsim Jamshed 
172576404edcSAsim Jamshed #if defined(ENABLE_PSIO)
172676404edcSAsim Jamshed 	current_iomodule_func = &ps_module_func;
172776404edcSAsim Jamshed #elif defined(ENABLE_DPDK)
172876404edcSAsim Jamshed 	current_iomodule_func = &dpdk_module_func;
172976404edcSAsim Jamshed #elif defined(ENABLE_PCAP)
173076404edcSAsim Jamshed 	current_iomodule_func = &pcap_module_func;
1731522d5c66SAsim Jamshed #elif defined(ENABLE_NETMAP)
1732522d5c66SAsim Jamshed 	current_iomodule_func = &netmap_module_func;
173376404edcSAsim Jamshed #endif
173476404edcSAsim Jamshed 
173576404edcSAsim Jamshed 	if (current_iomodule_func->load_module_upper_half)
173676404edcSAsim Jamshed 		current_iomodule_func->load_module_upper_half();
173776404edcSAsim Jamshed 
173876404edcSAsim Jamshed 	LoadConfigurationLowerHalf();
173976404edcSAsim Jamshed 
174076404edcSAsim Jamshed 	//PrintConfiguration();
174176404edcSAsim Jamshed 
1742a5e1a556SAsim Jamshed 	for (i = 0; i < g_config.mos->netdev_table->num; i++) {
1743a5e1a556SAsim Jamshed 		ap[i] = CreateAddressPool(g_config.mos->netdev_table->ent[i]->ip_addr, 1);
1744a5e1a556SAsim Jamshed 		if (!ap[i]) {
1745a5e1a556SAsim Jamshed 			TRACE_CONFIG("Error occured while create address pool[%d]\n",
1746a5e1a556SAsim Jamshed 				     i);
174776404edcSAsim Jamshed 			return -1;
174876404edcSAsim Jamshed 		}
1749a5e1a556SAsim Jamshed         }
175076404edcSAsim Jamshed 
175176404edcSAsim Jamshed 	//PrintInterfaceInfo();
175276404edcSAsim Jamshed 	//PrintRoutingTable();
175376404edcSAsim Jamshed 	//PrintARPTable();
175476404edcSAsim Jamshed 	InitARPTable();
175576404edcSAsim Jamshed 
175676404edcSAsim Jamshed 	if (signal(SIGUSR1, HandleSignal) == SIG_ERR) {
175776404edcSAsim Jamshed 		perror("signal, SIGUSR1");
175876404edcSAsim Jamshed 		return -1;
175976404edcSAsim Jamshed 	}
176076404edcSAsim Jamshed 	if (signal(SIGINT, HandleSignal) == SIG_ERR) {
176176404edcSAsim Jamshed 		perror("signal, SIGINT");
176276404edcSAsim Jamshed 		return -1;
176376404edcSAsim Jamshed 	}
176476404edcSAsim Jamshed 	app_signal_handler = NULL;
176576404edcSAsim Jamshed 
176676404edcSAsim Jamshed 	printf("load_module(): %p\n", current_iomodule_func);
176776404edcSAsim Jamshed 	/* load system-wide io module specs */
176876404edcSAsim Jamshed 	if (current_iomodule_func->load_module_lower_half)
176976404edcSAsim Jamshed 		current_iomodule_func->load_module_lower_half();
177076404edcSAsim Jamshed 
177176404edcSAsim Jamshed 	GlobInitEvent();
177276404edcSAsim Jamshed 
177376404edcSAsim Jamshed 	PrintConf(&g_config);
177476404edcSAsim Jamshed 
177576404edcSAsim Jamshed 	return 0;
177676404edcSAsim Jamshed }
177776404edcSAsim Jamshed /*----------------------------------------------------------------------------*/
177876404edcSAsim Jamshed int
mtcp_destroy()177976404edcSAsim Jamshed mtcp_destroy()
178076404edcSAsim Jamshed {
178176404edcSAsim Jamshed 	int i;
178276404edcSAsim Jamshed 
178376404edcSAsim Jamshed 	/* wait until all threads are closed */
178405e3289cSYoungGyoun 	/*
178576404edcSAsim Jamshed 	for (i = 0; i < num_cpus; i++) {
178676404edcSAsim Jamshed 		if (running[i]) {
178776404edcSAsim Jamshed 			if (pthread_join(g_thread[i], NULL) != 0)
178876404edcSAsim Jamshed 				return -1;
178976404edcSAsim Jamshed 		}
179076404edcSAsim Jamshed 	}
179105e3289cSYoungGyoun 	*/
179276404edcSAsim Jamshed 
1793a5e1a556SAsim Jamshed 	for (i = 0; i < g_config.mos->netdev_table->num; i++)
1794a5e1a556SAsim Jamshed 		DestroyAddressPool(ap[i]);
179576404edcSAsim Jamshed 
179676404edcSAsim Jamshed 	TRACE_INFO("All MTCP threads are joined.\n");
179776404edcSAsim Jamshed 
179876404edcSAsim Jamshed 	return 0;
179976404edcSAsim Jamshed }
180076404edcSAsim Jamshed /*----------------------------------------------------------------------------*/
1801