xref: /mOS-networking-stack/core/src/core.c (revision 76404edc)
1 #define _GNU_SOURCE
2 #include <sched.h>
3 #include <unistd.h>
4 #include <sys/time.h>
5 #include <semaphore.h>
6 #include <sys/mman.h>
7 #include <signal.h>
8 #include <assert.h>
9 #include <string.h>
10 
11 #include "cpu.h"
12 #include "eth_in.h"
13 #include "fhash.h"
14 #include "tcp_send_buffer.h"
15 #include "tcp_ring_buffer.h"
16 #include "socket.h"
17 #include "eth_out.h"
18 #include "tcp.h"
19 #include "tcp_in.h"
20 #include "tcp_out.h"
21 #include "mtcp_api.h"
22 #include "eventpoll.h"
23 #include "logger.h"
24 #include "config.h"
25 #include "arp.h"
26 #include "ip_out.h"
27 #include "timer.h"
28 #include "debug.h"
29 #include "event_callback.h"
30 #include "tcp_rb.h"
31 #include "tcp_stream.h"
32 #include "io_module.h"
33 
34 #ifdef ENABLE_PSIO
35 #include "ps.h"
36 #endif
37 
38 #ifdef ENABLE_DPDK
39 /* for launching rte thread */
40 #include <rte_launch.h>
41 #include <rte_lcore.h>
#endif /* ENABLE_DPDK */
43 #define PS_CHUNK_SIZE 64
44 #define RX_THRESH (PS_CHUNK_SIZE * 0.8)
45 
46 #define ROUND_STAT FALSE
47 #define TIME_STAT FALSE
48 #define EVENT_STAT FALSE
49 #define TESTING FALSE
50 
51 #define LOG_FILE_NAME "log"
52 #define MAX_FILE_NAME 1024
53 
54 #define MAX(a, b) ((a)>(b)?(a):(b))
55 #define MIN(a, b) ((a)<(b)?(a):(b))
56 
57 #define PER_STREAM_SLICE 0.1		// in ms
58 #define PER_STREAM_TCHECK 1			// in ms
59 #define PS_SELECT_TIMEOUT 100		// in us
60 
/* Convert a byte count to gigabits per second (argument parenthesized so
 * expressions like GBPS(a + b) expand correctly). */
#define GBPS(bytes) ((bytes) * 8.0 / (1000 * 1000 * 1000))
62 
/*----------------------------------------------------------------------------*/
/* handlers for threads */
/* Per-core mtcp worker thread context, indexed by CPU id. */
struct mtcp_thread_context *g_pctx[MAX_CPUS] = {0};
/* Per-core logging thread context. */
struct log_thread_context *g_logctx[MAX_CPUS] = {0};
/*----------------------------------------------------------------------------*/
/* pthread handles for the per-core mtcp worker and log threads */
static pthread_t g_thread[MAX_CPUS] = {0};
static pthread_t log_thread[MAX_CPUS] = {0};
/*----------------------------------------------------------------------------*/
/* Per-core semaphores for startup/shutdown handshakes
 * (g_done_sem is waited on in RunPassiveLoop below). */
static sem_t g_init_sem[MAX_CPUS];
static sem_t g_done_sem[MAX_CPUS];
/* Nonzero while the corresponding core's worker thread is running. */
static int running[MAX_CPUS] = {0};
/*----------------------------------------------------------------------------*/
/* Application-registered signal handler; forwarded to by HandleSignal(). */
mtcp_sighandler_t app_signal_handler;
/* Per-core SIGINT bookkeeping: count and timestamp of the last SIGINT,
 * used by HandleSignal() to force-quit on a rapid double Ctrl-C. */
static int sigint_cnt[MAX_CPUS] = {0};
static struct timespec sigint_ts[MAX_CPUS];
/*----------------------------------------------------------------------------*/
#ifdef NETSTAT
#if NETSTAT_TOTAL
/* presumably the core elected to print aggregate stats (-1 = none yet);
 * not referenced in this chunk — TODO confirm against the rest of the file */
static int printer = -1;
#if ROUND_STAT
#endif /* ROUND_STAT */
#endif /* NETSTAT_TOTAL */
#endif /* NETSTAT */
/*----------------------------------------------------------------------------*/
86 /*----------------------------------------------------------------------------*/
87 void
88 HandleSignal(int signal)
89 {
90 	int i = 0;
91 
92 	if (signal == SIGINT) {
93 #ifdef DARWIN
94 		int core = 0;
95 #else
96 		int core = sched_getcpu();
97 #endif
98 		struct timespec cur_ts;
99 
100 		clock_gettime(CLOCK_REALTIME, &cur_ts);
101 
102 		if (sigint_cnt[core] > 0 && cur_ts.tv_sec == sigint_ts[core].tv_sec) {
103 			for (i = 0; i < g_config.mos->num_cores; i++) {
104 				if (running[i]) {
105 					exit(0);
106 					g_pctx[i]->exit = TRUE;
107 				}
108 			}
109 		} else {
110 			for (i = 0; i < g_config.mos->num_cores; i++) {
111 				if (g_pctx[i])
112 					g_pctx[i]->interrupt = TRUE;
113 			}
114 			if (!app_signal_handler) {
115 				for (i = 0; i < g_config.mos->num_cores; i++) {
116 					if (running[i]) {
117 						exit(0);
118 						g_pctx[i]->exit = TRUE;
119 					}
120 				}
121 			}
122 		}
123 		sigint_cnt[core]++;
124 		clock_gettime(CLOCK_REALTIME, &sigint_ts[core]);
125 	}
126 
127 	if (signal != SIGUSR1) {
128 		if (app_signal_handler) {
129 			app_signal_handler(signal);
130 		}
131 	}
132 }
133 /*----------------------------------------------------------------------------*/
134 static int
135 AttachDevice(struct mtcp_thread_context* ctx)
136 {
137 	int working = -1;
138 	mtcp_manager_t mtcp = ctx->mtcp_manager;
139 
140 	if (mtcp->iom->link_devices)
141 		working = mtcp->iom->link_devices(ctx);
142 	else
143 		return 0;
144 
145 	return working;
146 }
147 /*----------------------------------------------------------------------------*/
148 #ifdef TIMESTAT
149 static inline void
150 InitStatCounter(struct stat_counter *counter)
151 {
152 	counter->cnt = 0;
153 	counter->sum = 0;
154 	counter->max = 0;
155 	counter->min = 0;
156 }
157 /*----------------------------------------------------------------------------*/
158 static inline void
159 UpdateStatCounter(struct stat_counter *counter, int64_t value)
160 {
161 	counter->cnt++;
162 	counter->sum += value;
163 	if (value > counter->max)
164 		counter->max = value;
165 	if (counter->min == 0 || value < counter->min)
166 		counter->min = value;
167 }
168 /*----------------------------------------------------------------------------*/
169 static inline uint64_t
170 GetAverageStat(struct stat_counter *counter)
171 {
172 	return counter->cnt ? (counter->sum / counter->cnt) : 0;
173 }
174 /*----------------------------------------------------------------------------*/
/* Difference t2 - t1 in microseconds (negative when t2 precedes t1). */
static inline int64_t
TimeDiffUs(struct timeval *t2, struct timeval *t1)
{
	int64_t sec_diff = (int64_t)(t2->tv_sec - t1->tv_sec);
	int64_t usec_diff = (int64_t)t2->tv_usec - (int64_t)t1->tv_usec;

	return sec_diff * 1000000 + usec_diff;
}
181 /*----------------------------------------------------------------------------*/
182 #endif
183 #ifdef NETSTAT
184 static inline void
185 PrintThreadNetworkStats(mtcp_manager_t mtcp, struct net_stat *ns)
186 {
187 	int i;
188 
189 	for (i = 0; i < g_config.mos->netdev_table->num; i++) {
190 		ns->rx_packets[i] = mtcp->nstat.rx_packets[i] - mtcp->p_nstat.rx_packets[i];
191 		ns->rx_errors[i] = mtcp->nstat.rx_errors[i] - mtcp->p_nstat.rx_errors[i];
192 		ns->rx_bytes[i] = mtcp->nstat.rx_bytes[i] - mtcp->p_nstat.rx_bytes[i];
193 		ns->tx_packets[i] = mtcp->nstat.tx_packets[i] - mtcp->p_nstat.tx_packets[i];
194 		ns->tx_drops[i] = mtcp->nstat.tx_drops[i] - mtcp->p_nstat.tx_drops[i];
195 		ns->tx_bytes[i] = mtcp->nstat.tx_bytes[i] - mtcp->p_nstat.tx_bytes[i];
196 #if NETSTAT_PERTHREAD
197 		if (g_config.mos->netdev_table->ent[i]->stat_print) {
198 			fprintf(stderr, "[CPU%2d] %s flows: %6u, "
199 					"RX: %7llu(pps) (err: %5llu), %5.2lf(Gbps), "
200 					"TX: %7llu(pps), %5.2lf(Gbps)\n",
201 					mtcp->ctx->cpu,
202 					g_config.mos->netdev_table->ent[i]->dev_name,
203 					(unsigned)mtcp->flow_cnt,
204 					(long long unsigned)ns->rx_packets[i],
205 					(long long unsigned)ns->rx_errors[i],
206 					GBPS(ns->rx_bytes[i]),
207 					(long long unsigned)ns->tx_packets[i],
208 					GBPS(ns->tx_bytes[i]));
209 		}
210 #endif
211 	}
212 	mtcp->p_nstat = mtcp->nstat;
213 
214 }
215 /*----------------------------------------------------------------------------*/
#if ROUND_STAT
/* Fill *rs with per-thread round-counter deltas since the last call,
 * print them (in thousands), and advance the baseline.
 * FIX: cast the counter values to (long long unsigned) to match the %llu
 * conversion specifiers, consistent with the casts used elsewhere in this
 * file (printing a uint64_t via %llu without a cast is a format/type
 * mismatch on platforms where uint64_t is unsigned long). */
static inline void
PrintThreadRoundStats(mtcp_manager_t mtcp, struct run_stat *rs)
{
#define ROUND_DIV (1000)
	rs->rounds = mtcp->runstat.rounds - mtcp->p_runstat.rounds;
	rs->rounds_rx = mtcp->runstat.rounds_rx - mtcp->p_runstat.rounds_rx;
	rs->rounds_rx_try = mtcp->runstat.rounds_rx_try - mtcp->p_runstat.rounds_rx_try;
	rs->rounds_tx = mtcp->runstat.rounds_tx - mtcp->p_runstat.rounds_tx;
	rs->rounds_tx_try = mtcp->runstat.rounds_tx_try - mtcp->p_runstat.rounds_tx_try;
	rs->rounds_select = mtcp->runstat.rounds_select - mtcp->p_runstat.rounds_select;
	rs->rounds_select_rx = mtcp->runstat.rounds_select_rx - mtcp->p_runstat.rounds_select_rx;
	rs->rounds_select_tx = mtcp->runstat.rounds_select_tx - mtcp->p_runstat.rounds_select_tx;
	rs->rounds_select_intr = mtcp->runstat.rounds_select_intr - mtcp->p_runstat.rounds_select_intr;
	rs->rounds_twcheck = mtcp->runstat.rounds_twcheck - mtcp->p_runstat.rounds_twcheck;
	mtcp->p_runstat = mtcp->runstat;
#if NETSTAT_PERTHREAD
	fprintf(stderr, "[CPU%2d] Rounds: %4lluK, "
			"rx: %3lluK (try: %4lluK), tx: %3lluK (try: %4lluK), "
			"ps_select: %4llu (rx: %4llu, tx: %4llu, intr: %3llu)\n",
			mtcp->ctx->cpu,
			(long long unsigned)(rs->rounds / ROUND_DIV),
			(long long unsigned)(rs->rounds_rx / ROUND_DIV),
			(long long unsigned)(rs->rounds_rx_try / ROUND_DIV),
			(long long unsigned)(rs->rounds_tx / ROUND_DIV),
			(long long unsigned)(rs->rounds_tx_try / ROUND_DIV),
			(long long unsigned)rs->rounds_select,
			(long long unsigned)rs->rounds_select_rx,
			(long long unsigned)rs->rounds_select_tx,
			(long long unsigned)rs->rounds_select_intr);
#endif
#undef ROUND_DIV	/* keep the helper macro local to this function */
}
#endif /* ROUND_STAT */
244 /*----------------------------------------------------------------------------*/
#if TIME_STAT
/* Print average/max per-phase processing times for one thread, then reset
 * all of its time-stat counters for the next interval.
 * FIX: GetAverageStat() returns uint64_t and the .max fields are 64-bit;
 * cast them to (long unsigned) so the %lu specifiers match the argument
 * types on all platforms (undefined behavior otherwise on LP32). */
static inline void
PrintThreadRoundTime(mtcp_manager_t mtcp)
{
	fprintf(stderr, "[CPU%2d] Time: (avg, max) "
			"round: (%4luus, %4luus), processing: (%4luus, %4luus), "
			"tcheck: (%4luus, %4luus), epoll: (%4luus, %4luus), "
			"handle: (%4luus, %4luus), xmit: (%4luus, %4luus), "
			"select: (%4luus, %4luus)\n", mtcp->ctx->cpu,
			(long unsigned)GetAverageStat(&mtcp->rtstat.round),
			(long unsigned)mtcp->rtstat.round.max,
			(long unsigned)GetAverageStat(&mtcp->rtstat.processing),
			(long unsigned)mtcp->rtstat.processing.max,
			(long unsigned)GetAverageStat(&mtcp->rtstat.tcheck),
			(long unsigned)mtcp->rtstat.tcheck.max,
			(long unsigned)GetAverageStat(&mtcp->rtstat.epoll),
			(long unsigned)mtcp->rtstat.epoll.max,
			(long unsigned)GetAverageStat(&mtcp->rtstat.handle),
			(long unsigned)mtcp->rtstat.handle.max,
			(long unsigned)GetAverageStat(&mtcp->rtstat.xmit),
			(long unsigned)mtcp->rtstat.xmit.max,
			(long unsigned)GetAverageStat(&mtcp->rtstat.select),
			(long unsigned)mtcp->rtstat.select.max);

	/* start a fresh measurement interval */
	InitStatCounter(&mtcp->rtstat.round);
	InitStatCounter(&mtcp->rtstat.processing);
	InitStatCounter(&mtcp->rtstat.tcheck);
	InitStatCounter(&mtcp->rtstat.epoll);
	InitStatCounter(&mtcp->rtstat.handle);
	InitStatCounter(&mtcp->rtstat.xmit);
	InitStatCounter(&mtcp->rtstat.select);
}
#endif
271 #endif /* NETSTAT */
272 /*----------------------------------------------------------------------------*/
#if EVENT_STAT
/* Dump one core's epoll event counters to stderr, then zero them so the
 * next interval starts fresh. */
static inline void
PrintEventStat(int core, struct mtcp_epoll_stat *stat)
{
	fprintf(stderr, "[CPU%2d] calls: %lu, waits: %lu, wakes: %lu, "
			"issued: %lu, registered: %lu, invalidated: %lu, handled: %lu\n",
			core, stat->calls, stat->waits, stat->wakes,
			stat->issued, stat->registered, stat->invalidated, stat->handled);
	memset(stat, 0, sizeof(*stat));
}
#endif /* EVENT_STAT */
284 /*----------------------------------------------------------------------------*/
285 #ifdef NETSTAT
286 static inline void
287 PrintNetworkStats(mtcp_manager_t mtcp, uint32_t cur_ts)
288 {
289 #define TIMEOUT 1
290 	int i;
291 	struct net_stat ns;
292 	bool stat_print = false;
293 #if ROUND_STAT
294 	struct run_stat rs;
295 #endif /* ROUND_STAT */
296 #ifdef NETSTAT_TOTAL
297 	static double peak_total_rx_gbps = 0;
298 	static double peak_total_tx_gbps = 0;
299 	static double avg_total_rx_gbps = 0;
300 	static double avg_total_tx_gbps = 0;
301 
302 	double total_rx_gbps = 0, total_tx_gbps = 0;
303 	int j;
304 	uint32_t gflow_cnt = 0;
305 	struct net_stat g_nstat;
306 #if ROUND_STAT
307 	struct run_stat g_runstat;
308 #endif /* ROUND_STAT */
309 #endif /* NETSTAT_TOTAL */
310 
311 	if (TS_TO_MSEC(cur_ts - mtcp->p_nstat_ts) < SEC_TO_MSEC(TIMEOUT)) {
312 		return;
313 	}
314 
315 	mtcp->p_nstat_ts = cur_ts;
316 	gflow_cnt = 0;
317 	memset(&g_nstat, 0, sizeof(struct net_stat));
318 	for (i = 0; i < g_config.mos->num_cores; i++) {
319 		if (running[i]) {
320 			PrintThreadNetworkStats(g_mtcp[i], &ns);
321 #if NETSTAT_TOTAL
322 			gflow_cnt += g_mtcp[i]->flow_cnt;
323 			for (j = 0; j < g_config.mos->netdev_table->num; j++) {
324 				g_nstat.rx_packets[j] += ns.rx_packets[j];
325 				g_nstat.rx_errors[j] += ns.rx_errors[j];
326 				g_nstat.rx_bytes[j] += ns.rx_bytes[j];
327 				g_nstat.tx_packets[j] += ns.tx_packets[j];
328 				g_nstat.tx_drops[j] += ns.tx_drops[j];
329 				g_nstat.tx_bytes[j] += ns.tx_bytes[j];
330 			}
331 #endif
332 		}
333 	}
334 #if NETSTAT_TOTAL
335 	for (i = 0; i < g_config.mos->netdev_table->num; i++) {
336 		if (g_config.mos->netdev_table->ent[i]->stat_print) {
337 			fprintf(stderr, "[ ALL ] %s, "
338 			"RX: %7llu(pps) (err: %5llu), %5.2lf(Gbps), "
339 			"TX: %7llu(pps), %5.2lf(Gbps)\n",
340 				g_config.mos->netdev_table->ent[i]->dev_name,
341 				(long long unsigned)g_nstat.rx_packets[i],
342 				(long long unsigned)g_nstat.rx_errors[i],
343 				GBPS(g_nstat.rx_bytes[i]),
344 				(long long unsigned)g_nstat.tx_packets[i],
345 				GBPS(g_nstat.tx_bytes[i]));
346 			total_rx_gbps += GBPS(g_nstat.rx_bytes[i]);
347 			total_tx_gbps += GBPS(g_nstat.tx_bytes[i]);
348 			stat_print = true;
349 		}
350 	}
351 	if (stat_print) {
352 		fprintf(stderr, "[ ALL ] flows: %6u\n", gflow_cnt);
353 		if (avg_total_rx_gbps == 0)
354 			avg_total_rx_gbps = total_rx_gbps;
355 		else
356 			avg_total_rx_gbps = avg_total_rx_gbps * 0.6 + total_rx_gbps * 0.4;
357 
358 		if (avg_total_tx_gbps == 0)
359 			avg_total_tx_gbps = total_tx_gbps;
360 		else
361 			avg_total_tx_gbps = avg_total_tx_gbps * 0.6 + total_tx_gbps * 0.4;
362 
363 		if (peak_total_rx_gbps < total_rx_gbps)
364 			peak_total_rx_gbps = total_rx_gbps;
365 		if (peak_total_tx_gbps < total_tx_gbps)
366 			peak_total_tx_gbps = total_tx_gbps;
367 
368 		fprintf(stderr, "[ PEAK ] RX: %5.2lf(Gbps), TX: %5.2lf(Gbps)\n"
369 						"[ RECENT AVG ] RX: %5.2lf(Gbps), TX: %5.2lf(Gbps)\n",
370 				peak_total_rx_gbps, peak_total_tx_gbps,
371 				avg_total_rx_gbps, avg_total_tx_gbps);
372 	}
373 #endif
374 
375 #if ROUND_STAT
376 	memset(&g_runstat, 0, sizeof(struct run_stat));
377 	for (i = 0; i < g_config.mos->num_cores; i++) {
378 		if (running[i]) {
379 			PrintThreadRoundStats(g_mtcp[i], &rs);
380 #if DBGMSG
381 			g_runstat.rounds += rs.rounds;
382 			g_runstat.rounds_rx += rs.rounds_rx;
383 			g_runstat.rounds_rx_try += rs.rounds_rx_try;
384 			g_runstat.rounds_tx += rs.rounds_tx;
385 			g_runstat.rounds_tx_try += rs.rounds_tx_try;
386 			g_runstat.rounds_select += rs.rounds_select;
387 			g_runstat.rounds_select_rx += rs.rounds_select_rx;
388 			g_runstat.rounds_select_tx += rs.rounds_select_tx;
389 #endif
390 		}
391 	}
392 
393 	TRACE_DBG("[ ALL ] Rounds: %4ldK, "
394 		  "rx: %3ldK (try: %4ldK), tx: %3ldK (try: %4ldK), "
395 		  "ps_select: %4ld (rx: %4ld, tx: %4ld)\n",
396 		  g_runstat.rounds / 1000, g_runstat.rounds_rx / 1000,
397 		  g_runstat.rounds_rx_try / 1000, g_runstat.rounds_tx / 1000,
398 		  g_runstat.rounds_tx_try / 1000, g_runstat.rounds_select,
399 		  g_runstat.rounds_select_rx, g_runstat.rounds_select_tx);
400 #endif /* ROUND_STAT */
401 
402 #if TIME_STAT
403 	for (i = 0; i < g_config.mos->num_cores; i++) {
404 		if (running[i]) {
405 			PrintThreadRoundTime(g_mtcp[i]);
406 		}
407 	}
408 #endif
409 
410 #if EVENT_STAT
411 	for (i = 0; i < g_config.mos->num_cores; i++) {
412 		if (running[i] && g_mtcp[i]->ep) {
413 			PrintEventStat(i, &g_mtcp[i]->ep->stat);
414 		}
415 	}
416 #endif
417 
418 	fflush(stderr);
419 }
420 #endif /* NETSTAT */
421 /*----------------------------------------------------------------------------*/
422 static inline void
423 FlushMonitorReadEvents(mtcp_manager_t mtcp)
424 {
425 	struct event_queue *mtcpq;
426 	struct tcp_stream *cur_stream;
427 	struct mon_listener *walk;
428 
429 	/* check if monitor sockets should be passed data */
430 	TAILQ_FOREACH(walk, &mtcp->monitors, link) {
431 		if (walk->socket->socktype != MOS_SOCK_MONITOR_STREAM ||
432 			!(mtcpq = walk->eq))
433 			continue;
434 
435 		while (mtcpq->num_events > 0) {
436 			cur_stream =
437 				(struct tcp_stream *)mtcpq->events[mtcpq->start++].ev.data.ptr;
438 			/* only read events */
439 			if (cur_stream != NULL &&
440 					(cur_stream->socket->events | MOS_EPOLLIN)) {
441 				if (cur_stream->rcvvar != NULL &&
442 						cur_stream->rcvvar->rcvbuf != NULL) {
443 					/* no need to pass pkt context */
444 					struct socket_map *walk;
445 					SOCKQ_FOREACH_START(walk, &cur_stream->msocks) {
446 						HandleCallback(mtcp, MOS_NULL, walk,
447 							       cur_stream->side, NULL,
448 							       MOS_ON_CONN_NEW_DATA);
449 					} SOCKQ_FOREACH_END;
450 #ifndef NEWRB
451 					/* re-adjust tcp_ring_buffer now */
452 					RBRemove(mtcp->rbm_rcv, cur_stream->rcvvar->rcvbuf,
453 							cur_stream->rcvvar->rcvbuf->merged_len,
454 							AT_MTCP, cur_stream->buffer_mgmt);
455 					cur_stream->rcvvar->rcv_wnd =
456 						cur_stream->rcvvar->rcvbuf->size
457 						- 1 - cur_stream->rcvvar->rcvbuf->last_len;
458 #endif
459 				}
460 				/* reset the actions now */
461 				cur_stream->actions = 0;
462 			}
463 			if (mtcpq->start >= mtcpq->size)
464 				mtcpq->start = 0;
465 			mtcpq->num_events--;
466 		}
467 	}
468 }
469 /*----------------------------------------------------------------------------*/
470 static inline void
471 FlushBufferedReadEvents(mtcp_manager_t mtcp)
472 {
473 	int i;
474 	int offset;
475 	struct event_queue *mtcpq;
476 	struct tcp_stream *cur_stream;
477 
478 	if (mtcp->ep == NULL) {
479 		TRACE_EPOLL("No epoll socket has been registered yet!\n");
480 		return;
481 	} else {
482 		/* case when mtcpq exists */
483 		mtcpq = mtcp->ep->mtcp_queue;
484 		offset = mtcpq->start;
485 	}
486 
487 	/* we will use queued-up epoll read-in events
488 	 * to trigger buffered read monitor events */
489 	for (i = 0; i < mtcpq->num_events; i++) {
490 		cur_stream = mtcp->smap[mtcpq->events[offset++].sockid].stream;
491 		/* only read events */
492 		/* Raise new data callback event */
493 		if (cur_stream != NULL &&
494 				(cur_stream->socket->events | MOS_EPOLLIN)) {
495 			if (cur_stream->rcvvar != NULL &&
496 					cur_stream->rcvvar->rcvbuf != NULL) {
497 				/* no need to pass pkt context */
498 				struct socket_map *walk;
499 				SOCKQ_FOREACH_START(walk, &cur_stream->msocks) {
500 					HandleCallback(mtcp, MOS_NULL, walk, cur_stream->side,
501 							NULL, MOS_ON_CONN_NEW_DATA);
502 				} SOCKQ_FOREACH_END;
503 			}
504 		}
505 		if (offset >= mtcpq->size)
506 			offset = 0;
507 	}
508 }
509 /*----------------------------------------------------------------------------*/
510 static inline void
511 FlushEpollEvents(mtcp_manager_t mtcp, uint32_t cur_ts)
512 {
513 	struct mtcp_epoll *ep = mtcp->ep;
514 	struct event_queue *usrq = ep->usr_queue;
515 	struct event_queue *mtcpq = ep->mtcp_queue;
516 
517 	pthread_mutex_lock(&ep->epoll_lock);
518 	if (ep->mtcp_queue->num_events > 0) {
519 		/* while mtcp_queue have events */
520 		/* and usr_queue is not full */
521 		while (mtcpq->num_events > 0 && usrq->num_events < usrq->size) {
522 			/* copy the event from mtcp_queue to usr_queue */
523 			usrq->events[usrq->end++] = mtcpq->events[mtcpq->start++];
524 
525 			if (usrq->end >= usrq->size)
526 				usrq->end = 0;
527 			usrq->num_events++;
528 
529 			if (mtcpq->start >= mtcpq->size)
530 				mtcpq->start = 0;
531 			mtcpq->num_events--;
532 		}
533 	}
534 
535 	/* if there are pending events, wake up user */
536 	if (ep->waiting && (ep->usr_queue->num_events > 0 ||
537 				ep->usr_shadow_queue->num_events > 0)) {
538 		STAT_COUNT(mtcp->runstat.rounds_epoll);
539 		TRACE_EPOLL("Broadcasting events. num: %d, cur_ts: %u, prev_ts: %u\n",
540 				ep->usr_queue->num_events, cur_ts, mtcp->ts_last_event);
541 		mtcp->ts_last_event = cur_ts;
542 		ep->stat.wakes++;
543 		pthread_cond_signal(&ep->epoll_cond);
544 	}
545 	pthread_mutex_unlock(&ep->epoll_lock);
546 }
547 /*----------------------------------------------------------------------------*/
548 static inline void
549 HandleApplicationCalls(mtcp_manager_t mtcp, uint32_t cur_ts)
550 {
551 	tcp_stream *stream;
552 	int cnt, max_cnt;
553 	int handled, delayed;
554 	int control, send, ack;
555 
556 	/* connect handling */
557 	while ((stream = StreamDequeue(mtcp->connectq))) {
558 		if (stream->state != TCP_ST_SYN_SENT) {
559 			TRACE_INFO("Got a connection request from app with state: %s",
560 				   TCPStateToString(stream));
561 			exit(EXIT_FAILURE);
562 		} else {
563 			stream->cb_events |= MOS_ON_CONN_START |
564 				MOS_ON_TCP_STATE_CHANGE;
565 			/* if monitor is on... */
566 			if (stream->pair_stream != NULL)
567 				stream->pair_stream->cb_events |=
568 					MOS_ON_CONN_START;
569 		}
570 		AddtoControlList(mtcp, stream, cur_ts);
571 	}
572 
573 	/* send queue handling */
574 	while ((stream = StreamDequeue(mtcp->sendq))) {
575 		stream->sndvar->on_sendq = FALSE;
576 		AddtoSendList(mtcp, stream);
577 	}
578 
579 	/* ack queue handling */
580 	while ((stream = StreamDequeue(mtcp->ackq))) {
581 		stream->sndvar->on_ackq = FALSE;
582 		EnqueueACK(mtcp, stream, cur_ts, ACK_OPT_AGGREGATE);
583 	}
584 
585 	/* close handling */
586 	handled = delayed = 0;
587 	control = send = ack = 0;
588 	while ((stream = StreamDequeue(mtcp->closeq))) {
589 		struct tcp_send_vars *sndvar = stream->sndvar;
590 		sndvar->on_closeq = FALSE;
591 
592 		if (sndvar->sndbuf) {
593 			sndvar->fss = sndvar->sndbuf->head_seq + sndvar->sndbuf->len;
594 		} else {
595 			sndvar->fss = stream->snd_nxt;
596 		}
597 
598 		if (g_config.mos->tcp_timeout > 0)
599 			RemoveFromTimeoutList(mtcp, stream);
600 
601 		if (stream->have_reset) {
602 			handled++;
603 			if (stream->state != TCP_ST_CLOSED_RSVD) {
604 				stream->close_reason = TCP_RESET;
605 				stream->state = TCP_ST_CLOSED_RSVD;
606 				stream->cb_events |= MOS_ON_TCP_STATE_CHANGE;
607 				TRACE_STATE("Stream %d: TCP_ST_CLOSED_RSVD\n", stream->id);
608 				DestroyTCPStream(mtcp, stream);
609 			} else {
610 				TRACE_ERROR("Stream already closed.\n");
611 			}
612 
613 		} else if (sndvar->on_control_list) {
614 			sndvar->on_closeq_int = TRUE;
615 			StreamInternalEnqueue(mtcp->closeq_int, stream);
616 			delayed++;
617 			if (sndvar->on_control_list)
618 				control++;
619 			if (sndvar->on_send_list)
620 				send++;
621 			if (sndvar->on_ack_list)
622 				ack++;
623 
624 		} else if (sndvar->on_send_list || sndvar->on_ack_list) {
625 			handled++;
626 			if (stream->state == TCP_ST_ESTABLISHED) {
627 				stream->state = TCP_ST_FIN_WAIT_1;
628 				stream->cb_events |= MOS_ON_TCP_STATE_CHANGE;
629 				TRACE_STATE("Stream %d: TCP_ST_FIN_WAIT_1\n", stream->id);
630 
631 			} else if (stream->state == TCP_ST_CLOSE_WAIT) {
632 				stream->state = TCP_ST_LAST_ACK;
633 				stream->cb_events |= MOS_ON_TCP_STATE_CHANGE;
634 				TRACE_STATE("Stream %d: TCP_ST_LAST_ACK\n", stream->id);
635 			}
636 			stream->control_list_waiting = TRUE;
637 
638 		} else if (stream->state != TCP_ST_CLOSED_RSVD) {
639 			handled++;
640 			if (stream->state == TCP_ST_ESTABLISHED) {
641 				stream->state = TCP_ST_FIN_WAIT_1;
642 				stream->cb_events |= MOS_ON_TCP_STATE_CHANGE;
643 				TRACE_STATE("Stream %d: TCP_ST_FIN_WAIT_1\n", stream->id);
644 
645 			} else if (stream->state == TCP_ST_CLOSE_WAIT) {
646 				stream->state = TCP_ST_LAST_ACK;
647 				stream->cb_events |= MOS_ON_TCP_STATE_CHANGE;
648 				TRACE_STATE("Stream %d: TCP_ST_LAST_ACK\n", stream->id);
649 			}
650 			//sndvar->rto = TCP_FIN_RTO;
651 			//UpdateRetransmissionTimer(mtcp, stream, mtcp->cur_ts);
652 			AddtoControlList(mtcp, stream, cur_ts);
653 		} else {
654 			TRACE_ERROR("Already closed connection!\n");
655 		}
656 	}
657 	TRACE_ROUND("Handling close connections. cnt: %d\n", cnt);
658 
659 	cnt = 0;
660 	max_cnt = mtcp->closeq_int->count;
661 	while (cnt++ < max_cnt) {
662 		stream = StreamInternalDequeue(mtcp->closeq_int);
663 
664 		if (stream->sndvar->on_control_list) {
665 			StreamInternalEnqueue(mtcp->closeq_int, stream);
666 
667 		} else if (stream->state != TCP_ST_CLOSED_RSVD) {
668 			handled++;
669 			stream->sndvar->on_closeq_int = FALSE;
670 			if (stream->state == TCP_ST_ESTABLISHED) {
671 				stream->state = TCP_ST_FIN_WAIT_1;
672 				stream->cb_events |= MOS_ON_TCP_STATE_CHANGE;
673 				TRACE_STATE("Stream %d: TCP_ST_FIN_WAIT_1\n", stream->id);
674 
675 			} else if (stream->state == TCP_ST_CLOSE_WAIT) {
676 				stream->state = TCP_ST_LAST_ACK;
677 				stream->cb_events |= MOS_ON_TCP_STATE_CHANGE;
678 				TRACE_STATE("Stream %d: TCP_ST_LAST_ACK\n", stream->id);
679 			}
680 			AddtoControlList(mtcp, stream, cur_ts);
681 		} else {
682 			stream->sndvar->on_closeq_int = FALSE;
683 			TRACE_ERROR("Already closed connection!\n");
684 		}
685 	}
686 
687 	/* reset handling */
688 	while ((stream = StreamDequeue(mtcp->resetq))) {
689 		stream->sndvar->on_resetq = FALSE;
690 
691 		if (g_config.mos->tcp_timeout > 0)
692 			RemoveFromTimeoutList(mtcp, stream);
693 
694 		if (stream->have_reset) {
695 			if (stream->state != TCP_ST_CLOSED_RSVD) {
696 				stream->close_reason = TCP_RESET;
697 				stream->state = TCP_ST_CLOSED_RSVD;
698 				stream->cb_events |= MOS_ON_TCP_STATE_CHANGE;
699 				TRACE_STATE("Stream %d: TCP_ST_CLOSED_RSVD\n", stream->id);
700 				DestroyTCPStream(mtcp, stream);
701 			} else {
702 				TRACE_ERROR("Stream already closed.\n");
703 			}
704 
705 		} else if (stream->sndvar->on_control_list ||
706 				stream->sndvar->on_send_list || stream->sndvar->on_ack_list) {
707 			/* wait until all the queues are flushed */
708 			stream->sndvar->on_resetq_int = TRUE;
709 			StreamInternalEnqueue(mtcp->resetq_int, stream);
710 
711 		} else {
712 			if (stream->state != TCP_ST_CLOSED_RSVD) {
713 				stream->close_reason = TCP_ACTIVE_CLOSE;
714 				stream->state = TCP_ST_CLOSED_RSVD;
715 				stream->cb_events |= MOS_ON_TCP_STATE_CHANGE;
716 				TRACE_STATE("Stream %d: TCP_ST_CLOSED_RSVD\n", stream->id);
717 				AddtoControlList(mtcp, stream, cur_ts);
718 			} else {
719 				TRACE_ERROR("Stream already closed.\n");
720 			}
721 		}
722 	}
723 	TRACE_ROUND("Handling reset connections. cnt: %d\n", cnt);
724 
725 	cnt = 0;
726 	max_cnt = mtcp->resetq_int->count;
727 	while (cnt++ < max_cnt) {
728 		stream = StreamInternalDequeue(mtcp->resetq_int);
729 
730 		if (stream->sndvar->on_control_list ||
731 				stream->sndvar->on_send_list || stream->sndvar->on_ack_list) {
732 			/* wait until all the queues are flushed */
733 			StreamInternalEnqueue(mtcp->resetq_int, stream);
734 
735 		} else {
736 			stream->sndvar->on_resetq_int = FALSE;
737 
738 			if (stream->state != TCP_ST_CLOSED_RSVD) {
739 				stream->close_reason = TCP_ACTIVE_CLOSE;
740 				stream->state = TCP_ST_CLOSED_RSVD;
741 				stream->cb_events |= MOS_ON_TCP_STATE_CHANGE;
742 				TRACE_STATE("Stream %d: TCP_ST_CLOSED_RSVD\n", stream->id);
743 				AddtoControlList(mtcp, stream, cur_ts);
744 			} else {
745 				TRACE_ERROR("Stream already closed.\n");
746 			}
747 		}
748 	}
749 
750 	/* destroy streams in destroyq */
751 	while ((stream = StreamDequeue(mtcp->destroyq))) {
752 		DestroyTCPStream(mtcp, stream);
753 	}
754 
755 	mtcp->wakeup_flag = FALSE;
756 }
757 /*----------------------------------------------------------------------------*/
758 static inline void
759 WritePacketsToChunks(mtcp_manager_t mtcp, uint32_t cur_ts)
760 {
761 	int thresh = g_config.mos->max_concurrency;
762 	int i;
763 
764 	/* Set the threshold to g_config.mos->max_concurrency to send ACK immediately */
765 	/* Otherwise, set to appropriate value (e.g. thresh) */
766 	assert(mtcp->g_sender != NULL);
767 	if (mtcp->g_sender->control_list_cnt)
768 		WriteTCPControlList(mtcp, mtcp->g_sender, cur_ts, thresh);
769 	if (mtcp->g_sender->ack_list_cnt)
770 		WriteTCPACKList(mtcp, mtcp->g_sender, cur_ts, thresh);
771 	if (mtcp->g_sender->send_list_cnt)
772 		WriteTCPDataList(mtcp, mtcp->g_sender, cur_ts, thresh);
773 
774 	for (i = 0; i < g_config.mos->netdev_table->num; i++) {
775 		assert(mtcp->n_sender[i] != NULL);
776 		if (mtcp->n_sender[i]->control_list_cnt)
777 			WriteTCPControlList(mtcp, mtcp->n_sender[i], cur_ts, thresh);
778 		if (mtcp->n_sender[i]->ack_list_cnt)
779 			WriteTCPACKList(mtcp, mtcp->n_sender[i], cur_ts, thresh);
780 		if (mtcp->n_sender[i]->send_list_cnt)
781 			WriteTCPDataList(mtcp, mtcp->n_sender[i], cur_ts, thresh);
782 	}
783 }
784 /*----------------------------------------------------------------------------*/
#if TESTING
/* Tear down every flow left in this thread's flow table at shutdown and
 * return the number of destroyed streams.
 * FIX: the original iterated with TAILQ_FOREACH while DestroyTCPStream()
 * frees the current element, so the loop re-read the freed node's link
 * field (use-after-free). The successor is now saved before destruction
 * (TAILQ_FOREACH_SAFE pattern). */
static int
DestroyRemainingFlows(mtcp_manager_t mtcp)
{
	struct hashtable *ht = mtcp->tcp_flow_table;
	tcp_stream *walk;
	tcp_stream *next;
	int cnt, i;

	cnt = 0;

	thread_printf(mtcp, mtcp->log_fp,
			"CPU %d: Flushing remaining flows.\n", mtcp->ctx->cpu);

	for (i = 0; i < NUM_BINS; i++) {
		for (walk = TAILQ_FIRST(&ht->ht_table[i]); walk != NULL; walk = next) {
			/* save the successor before the node is freed */
			next = TAILQ_NEXT(walk, rcvvar->he_link);
			thread_printf(mtcp, mtcp->log_fp,
					"CPU %d: Destroying stream %d\n", mtcp->ctx->cpu, walk->id);
			DumpStream(mtcp, walk);
			DestroyTCPStream(mtcp, walk);
			cnt++;
		}
	}

	return cnt;
}
#endif
811 /*----------------------------------------------------------------------------*/
812 static void
813 InterruptApplication(mtcp_manager_t mtcp)
814 {
815 	/* interrupt if the mtcp_epoll_wait() is waiting */
816 	if (mtcp->ep) {
817 		pthread_mutex_lock(&mtcp->ep->epoll_lock);
818 		if (mtcp->ep->waiting) {
819 			pthread_cond_signal(&mtcp->ep->epoll_cond);
820 		}
821 		pthread_mutex_unlock(&mtcp->ep->epoll_lock);
822 	}
823 	/* interrupt if the accept() is waiting */
824 	if (mtcp->listener) {
825 		if (mtcp->listener->socket) {
826 			pthread_mutex_lock(&mtcp->listener->accept_lock);
827 			if (!(mtcp->listener->socket->opts & MTCP_NONBLOCK)) {
828 				pthread_cond_signal(&mtcp->listener->accept_cond);
829 			}
830 			pthread_mutex_unlock(&mtcp->listener->accept_lock);
831 		}
832 	}
833 }
834 /*----------------------------------------------------------------------------*/
835 void
836 RunPassiveLoop(mtcp_manager_t mtcp)
837 {
838 	sem_wait(&g_done_sem[mtcp->ctx->cpu]);
839 	sem_destroy(&g_done_sem[mtcp->ctx->cpu]);
840 	return;
841 }
842 /*----------------------------------------------------------------------------*/
843 static void
844 RunMainLoop(struct mtcp_thread_context *ctx)
845 {
846 	mtcp_manager_t mtcp = ctx->mtcp_manager;
847 	int i;
848 	int recv_cnt;
849 
850 #if E_PSIO
851 	int rx_inf, tx_inf;
852 #endif
853 
854 	struct timeval cur_ts = {0};
855 	uint32_t ts, ts_prev;
856 
857 #if TIME_STAT
858 	struct timeval prev_ts, processing_ts, tcheck_ts,
859 				   epoll_ts, handle_ts, xmit_ts, select_ts;
860 #endif
861 	int thresh;
862 
863 	gettimeofday(&cur_ts, NULL);
864 
865 #if E_PSIO
866 	/* do nothing */
867 #else
868 #if !USE_CHUNK_BUF
869 	/* create packet write chunk */
870 	InitWriteChunks(handle, ctx->w_chunk);
871 	for (i = 0; i < ETH_NUM; i++) {
872 		ctx->w_chunk[i].cnt = 0;
873 		ctx->w_off[i] = 0;
874 		ctx->w_cur_idx[i] = 0;
875 	}
876 #endif
877 #endif
878 
879 	TRACE_DBG("CPU %d: mtcp thread running.\n", ctx->cpu);
880 
881 #if TIME_STAT
882 	prev_ts = cur_ts;
883 	InitStatCounter(&mtcp->rtstat.round);
884 	InitStatCounter(&mtcp->rtstat.processing);
885 	InitStatCounter(&mtcp->rtstat.tcheck);
886 	InitStatCounter(&mtcp->rtstat.epoll);
887 	InitStatCounter(&mtcp->rtstat.handle);
888 	InitStatCounter(&mtcp->rtstat.xmit);
889 	InitStatCounter(&mtcp->rtstat.select);
890 #endif
891 
892 	ts = ts_prev = 0;
893 	while ((!ctx->done || mtcp->flow_cnt) && !ctx->exit) {
894 
895 		STAT_COUNT(mtcp->runstat.rounds);
896 		recv_cnt = 0;
897 
898 #if E_PSIO
899 #if 0
900 		event.timeout = PS_SELECT_TIMEOUT;
901 		NID_ZERO(event.rx_nids);
902 		NID_ZERO(event.tx_nids);
903 		NID_ZERO(rx_avail);
904 		//NID_ZERO(tx_avail);
905 #endif
906 		gettimeofday(&cur_ts, NULL);
907 #if TIME_STAT
908 		/* measure the inter-round delay */
909 		UpdateStatCounter(&mtcp->rtstat.round, TimeDiffUs(&cur_ts, &prev_ts));
910 		prev_ts = cur_ts;
911 #endif
912 
913 		ts = TIMEVAL_TO_TS(&cur_ts);
914 		mtcp->cur_ts = ts;
915 
916 		for (rx_inf = 0; rx_inf < g_config.mos->netdev_table->num; rx_inf++) {
917 
918 			recv_cnt = mtcp->iom->recv_pkts(ctx, rx_inf);
919 			STAT_COUNT(mtcp->runstat.rounds_rx_try);
920 
921 			for (i = 0; i < recv_cnt; i++) {
922 				uint16_t len;
923 				uint8_t *pktbuf;
924 				pktbuf = mtcp->iom->get_rptr(mtcp->ctx, rx_inf, i, &len);
925 				ProcessPacket(mtcp, rx_inf, i, ts, pktbuf, len);
926 			}
927 
928 #ifdef ENABLE_DPDKR
929 			mtcp->iom->send_pkts(ctx, rx_inf);
930 			continue;
931 #endif
932 		}
933 		STAT_COUNT(mtcp->runstat.rounds_rx);
934 
935 #else /* E_PSIO */
936 		gettimeofday(&cur_ts, NULL);
937 		ts = TIMEVAL_TO_TS(&cur_ts);
938 		mtcp->cur_ts = ts;
939 		/*
940 		 * Read packets into a chunk from NIC
941 		 * chk_w_idx : next chunk index to write packets from NIC
942 		 */
943 
944 		STAT_COUNT(mtcp->runstat.rounds_rx_try);
945 		chunk.cnt = PS_CHUNK_SIZE;
946 		recv_cnt = ps_recv_chunk(handle, &chunk);
947 		if (recv_cnt < 0) {
948 			if (errno != EAGAIN && errno != EINTR) {
949 				perror("ps_recv_chunk");
950 				assert(0);
951 			}
952 		}
953 
954 		/*
955 		 * Handling Packets
956 		 * chk_r_idx : next chunk index to read and handle
957 		*/
958 		for (i = 0; i < recv_cnt; i++) {
959 			ProcessPacket(mtcp, chunk.queue.ifindex, ts,
960 					(u_char *)(chunk.buf + chunk.info[i].offset),
961 					chunk.info[i].len);
962 		}
963 
964 		if (recv_cnt > 0)
965 			STAT_COUNT(mtcp->runstat.rounds_rx);
966 #endif /* E_PSIO */
967 #if TIME_STAT
968 		gettimeofday(&processing_ts, NULL);
969 		UpdateStatCounter(&mtcp->rtstat.processing,
970 				TimeDiffUs(&processing_ts, &cur_ts));
971 #endif /* TIME_STAT */
972 
973 		/* Handle user defined timeout */
974 		struct timer *walk, *tmp;
975 		for (walk = TAILQ_FIRST(&mtcp->timer_list); walk != NULL; walk = tmp) {
976 			tmp = TAILQ_NEXT(walk, timer_link);
977 			if (TIMEVAL_LT(&cur_ts, &walk->exp))
978 				break;
979 
980 			struct mtcp_context mctx = {.cpu = ctx->cpu};
981 			walk->cb(&mctx, walk->id, 0, 0 /* FIXME */, NULL);
982 			DelTimer(mtcp, walk);
983 		}
984 
985 		/* interaction with application */
986 		if (mtcp->flow_cnt > 0) {
987 
988 			/* check retransmission timeout and timewait expire */
989 #if 0
990 			thresh = (int)mtcp->flow_cnt / (TS_TO_USEC(PER_STREAM_TCHECK));
991 			assert(thresh >= 0);
992 			if (thresh == 0)
993 				thresh = 1;
994 			if (recv_cnt > 0 && thresh > recv_cnt)
995 				thresh = recv_cnt;
996 #else
997 			thresh = g_config.mos->max_concurrency;
998 #endif
999 
1000 			/* Eunyoung, you may fix this later
1001 			 * if there is no rcv packet, we will send as much as possible
1002 			 */
1003 			if (thresh == -1)
1004 				thresh = g_config.mos->max_concurrency;
1005 
1006 			CheckRtmTimeout(mtcp, ts, thresh);
1007 			CheckTimewaitExpire(mtcp, ts, thresh);
1008 
1009 			if (g_config.mos->tcp_timeout > 0 && ts != ts_prev) {
1010 				CheckConnectionTimeout(mtcp, ts, thresh);
1011 			}
1012 
1013 #if TIME_STAT
1014 		}
1015 		gettimeofday(&tcheck_ts, NULL);
1016 		UpdateStatCounter(&mtcp->rtstat.tcheck,
1017 				TimeDiffUs(&tcheck_ts, &processing_ts));
1018 
1019 		if (mtcp->flow_cnt > 0) {
1020 #endif /* TIME_STAT */
1021 
1022 		}
1023 
1024 		/*
1025 		 * before flushing epoll events, call monitor events for
1026 		 * all registered `read` events
1027 		 */
1028 		if (mtcp->num_msp > 0)
1029 			/* call this when only a standalone monitor is running */
1030 			FlushMonitorReadEvents(mtcp);
1031 
1032 		/* if epoll is in use, flush all the queued events */
1033 		if (mtcp->ep) {
1034 			FlushBufferedReadEvents(mtcp);
1035 			FlushEpollEvents(mtcp, ts);
1036 		}
1037 #if TIME_STAT
1038 		gettimeofday(&epoll_ts, NULL);
1039 		UpdateStatCounter(&mtcp->rtstat.epoll,
1040 				TimeDiffUs(&epoll_ts, &tcheck_ts));
1041 #endif /* TIME_STAT */
1042 
1043 		if (end_app_exists && mtcp->flow_cnt > 0) {
1044 			/* handle stream queues  */
1045 			HandleApplicationCalls(mtcp, ts);
1046 		}
1047 
1048 #ifdef ENABLE_DPDKR
1049 		continue;
1050 #endif
1051 
1052 #if TIME_STAT
1053 		gettimeofday(&handle_ts, NULL);
1054 		UpdateStatCounter(&mtcp->rtstat.handle,
1055 				TimeDiffUs(&handle_ts, &epoll_ts));
1056 #endif /* TIME_STAT */
1057 
1058 		WritePacketsToChunks(mtcp, ts);
1059 
1060 		/* send packets from write buffer */
1061 #if E_PSIO
1062 		/* With E_PSIO, send until tx is available */
1063 		int num_dev = g_config.mos->netdev_table->num;
1064 		if (likely(mtcp->iom->send_pkts != NULL))
1065 			for (tx_inf = 0; tx_inf < num_dev; tx_inf++) {
1066 				mtcp->iom->send_pkts(ctx, tx_inf);
1067 			}
1068 
1069 #else /* E_PSIO */
1070 		/* Without E_PSIO, try send chunks immediately */
1071 		for (i = 0; i < g_config.mos->netdev_table->num; i++) {
1072 #if USE_CHUNK_BUF
1073 			/* in the case of using ps_send_chunk_buf() without E_PSIO */
1074 			ret = FlushSendChunkBuf(mtcp, i);
1075 #else
1076 			/* if not using ps_send_chunk_buf() */
1077 			ret = FlushWriteBuffer(ctx, i);
1078 #endif
1079 			if (ret < 0) {
1080 				TRACE_ERROR("Failed to send chunks.\n");
1081 			} else if (ret > 0) {
1082 				STAT_COUNT(mtcp->runstat.rounds_tx);
1083 			}
1084 		}
1085 #endif /* E_PSIO */
1086 
1087 #if TIME_STAT
1088 		gettimeofday(&xmit_ts, NULL);
1089 		UpdateStatCounter(&mtcp->rtstat.xmit,
1090 				TimeDiffUs(&xmit_ts, &handle_ts));
1091 #endif /* TIME_STAT */
1092 
1093 		if (ts != ts_prev) {
1094 			ts_prev = ts;
1095 #ifdef NETSTAT
1096 			if (ctx->cpu == printer) {
1097 #ifdef RUN_ARP
1098 				ARPTimer(mtcp, ts);
1099 #endif
1100 				PrintNetworkStats(mtcp, ts);
1101 			}
1102 #endif /* NETSTAT */
1103 		}
1104 
1105 		if (mtcp->iom->select)
1106 			mtcp->iom->select(ctx);
1107 
1108 		if (ctx->interrupt) {
1109 			InterruptApplication(mtcp);
1110 		}
1111 	}
1112 
1113 #if TESTING
1114 	DestroyRemainingFlows(mtcp);
1115 #endif
1116 
1117 	TRACE_DBG("MTCP thread %d out of main loop.\n", ctx->cpu);
1118 	/* flush logs */
1119 	flush_log_data(mtcp);
1120 	TRACE_DBG("MTCP thread %d flushed logs.\n", ctx->cpu);
1121 	InterruptApplication(mtcp);
1122 	TRACE_INFO("MTCP thread %d finished.\n", ctx->cpu);
1123 }
1124 /*----------------------------------------------------------------------------*/
1125 struct mtcp_sender *
1126 CreateMTCPSender(int ifidx)
1127 {
1128 	struct mtcp_sender *sender;
1129 
1130 	sender = (struct mtcp_sender *)calloc(1, sizeof(struct mtcp_sender));
1131 	if (!sender) {
1132 		return NULL;
1133 	}
1134 
1135 	sender->ifidx = ifidx;
1136 
1137 	TAILQ_INIT(&sender->control_list);
1138 	TAILQ_INIT(&sender->send_list);
1139 	TAILQ_INIT(&sender->ack_list);
1140 
1141 	sender->control_list_cnt = 0;
1142 	sender->send_list_cnt = 0;
1143 	sender->ack_list_cnt = 0;
1144 
1145 	return sender;
1146 }
1147 /*----------------------------------------------------------------------------*/
void
DestroyMTCPSender(struct mtcp_sender *sender)
{
	/* Release a sender allocated by CreateMTCPSender().
	 * free(NULL) is a no-op, so passing NULL is safe. */
	free(sender);
}
1153 /*----------------------------------------------------------------------------*/
1154 static mtcp_manager_t
1155 InitializeMTCPManager(struct mtcp_thread_context* ctx)
1156 {
1157 	mtcp_manager_t mtcp;
1158 	char log_name[MAX_FILE_NAME];
1159 	int i;
1160 
1161 	posix_seq_srand((unsigned)pthread_self());
1162 
1163 	mtcp = (mtcp_manager_t)calloc(1, sizeof(struct mtcp_manager));
1164 	if (!mtcp) {
1165 		perror("malloc");
1166 		CTRACE_ERROR("Failed to allocate mtcp_manager.\n");
1167 		return NULL;
1168 	}
1169 	g_mtcp[ctx->cpu] = mtcp;
1170 
1171 	mtcp->tcp_flow_table = CreateHashtable();
1172 	if (!mtcp->tcp_flow_table) {
1173 		CTRACE_ERROR("Falied to allocate tcp flow table.\n");
1174 		return NULL;
1175 	}
1176 
1177 #ifdef HUGEPAGE
1178 #define	IS_HUGEPAGE 1
1179 #else
1180 #define	IS_HUGEPAGE 0
1181 #endif
1182 	if (mon_app_exists) {
1183 		/* initialize event callback */
1184 #ifdef NEWEV
1185 		InitEvent(mtcp);
1186 #else
1187 		InitEvent(mtcp, NUM_EV_TABLE);
1188 #endif
1189 	}
1190 
1191 	if (!(mtcp->bufseg_pool = MPCreate(sizeof(tcpbufseg_t),
1192 			sizeof(tcpbufseg_t) * g_config.mos->max_concurrency *
1193 			((g_config.mos->rmem_size - 1) / UNITBUFSIZE + 1), 0))) {
1194 		TRACE_ERROR("Failed to allocate ev_table pool\n");
1195 		exit(0);
1196 	}
1197 	if (!(mtcp->sockent_pool = MPCreate(sizeof(struct sockent),
1198 			sizeof(struct sockent) * g_config.mos->max_concurrency * 3, 0))) {
1199 		TRACE_ERROR("Failed to allocate ev_table pool\n");
1200 		exit(0);
1201 	}
1202 #ifdef USE_TIMER_POOL
1203 	if (!(mtcp->timer_pool = MPCreate(sizeof(struct timer),
1204 					  sizeof(struct timer) * g_config.mos->max_concurrency * 10, 0))) {
1205 		TRACE_ERROR("Failed to allocate ev_table pool\n");
1206 		exit(0);
1207 	}
1208 #endif
1209 	mtcp->flow_pool = MPCreate(sizeof(tcp_stream),
1210 								sizeof(tcp_stream) * g_config.mos->max_concurrency, IS_HUGEPAGE);
1211 	if (!mtcp->flow_pool) {
1212 		CTRACE_ERROR("Failed to allocate tcp flow pool.\n");
1213 		return NULL;
1214 	}
1215 	mtcp->rv_pool = MPCreate(sizeof(struct tcp_recv_vars),
1216 			sizeof(struct tcp_recv_vars) * g_config.mos->max_concurrency, IS_HUGEPAGE);
1217 	if (!mtcp->rv_pool) {
1218 		CTRACE_ERROR("Failed to allocate tcp recv variable pool.\n");
1219 		return NULL;
1220 	}
1221 	mtcp->sv_pool = MPCreate(sizeof(struct tcp_send_vars),
1222 			sizeof(struct tcp_send_vars) * g_config.mos->max_concurrency, IS_HUGEPAGE);
1223 	if (!mtcp->sv_pool) {
1224 		CTRACE_ERROR("Failed to allocate tcp send variable pool.\n");
1225 		return NULL;
1226 	}
1227 
1228 	mtcp->rbm_snd = SBManagerCreate(g_config.mos->wmem_size, g_config.mos->no_ring_buffers,
1229 					g_config.mos->max_concurrency);
1230 	if (!mtcp->rbm_snd) {
1231 		CTRACE_ERROR("Failed to create send ring buffer.\n");
1232 		return NULL;
1233 	}
1234 #ifndef NEWRB
1235 	mtcp->rbm_rcv = RBManagerCreate(g_config.mos->rmem_size, g_config.mos->no_ring_buffers,
1236 					g_config.mos->max_concurrency);
1237 	if (!mtcp->rbm_rcv) {
1238 		CTRACE_ERROR("Failed to create recv ring buffer.\n");
1239 		return NULL;
1240 	}
1241 #endif
1242 
1243 	mtcp->smap = (socket_map_t)calloc(g_config.mos->max_concurrency, sizeof(struct socket_map));
1244 	if (!mtcp->smap) {
1245 		perror("calloc");
1246 		CTRACE_ERROR("Failed to allocate memory for stream map.\n");
1247 		return NULL;
1248 	}
1249 
1250 	if (mon_app_exists) {
1251 		mtcp->msmap = (socket_map_t)calloc(g_config.mos->max_concurrency, sizeof(struct socket_map));
1252 		if (!mtcp->msmap) {
1253 			perror("calloc");
1254 			CTRACE_ERROR("Failed to allocate memory for monitor stream map.\n");
1255 			return NULL;
1256 		}
1257 
1258 		for (i = 0; i < g_config.mos->max_concurrency; i++) {
1259 			mtcp->msmap[i].monitor_stream = calloc(1, sizeof(struct mon_stream));
1260 			if (!mtcp->msmap[i].monitor_stream) {
1261 				perror("calloc");
1262 				CTRACE_ERROR("Failed to allocate memory for monitr stream map\n");
1263 				return NULL;
1264 			}
1265 		}
1266 	}
1267 
1268 	TAILQ_INIT(&mtcp->timer_list);
1269 	TAILQ_INIT(&mtcp->monitors);
1270 
1271 	TAILQ_INIT(&mtcp->free_smap);
1272 	for (i = 0; i < g_config.mos->max_concurrency; i++) {
1273 		mtcp->smap[i].id = i;
1274 		mtcp->smap[i].socktype = MOS_SOCK_UNUSED;
1275 		memset(&mtcp->smap[i].saddr, 0, sizeof(struct sockaddr_in));
1276 		mtcp->smap[i].stream = NULL;
1277 		TAILQ_INSERT_TAIL(&mtcp->free_smap, &mtcp->smap[i], link);
1278 	}
1279 
1280 	if (mon_app_exists) {
1281 		TAILQ_INIT(&mtcp->free_msmap);
1282 		for (i = 0; i < g_config.mos->max_concurrency; i++) {
1283 			mtcp->msmap[i].id = i;
1284 			mtcp->msmap[i].socktype = MOS_SOCK_UNUSED;
1285 			memset(&mtcp->msmap[i].saddr, 0, sizeof(struct sockaddr_in));
1286 			TAILQ_INSERT_TAIL(&mtcp->free_msmap, &mtcp->msmap[i], link);
1287 		}
1288 	}
1289 
1290 	mtcp->ctx = ctx;
1291 	mtcp->ep = NULL;
1292 
1293 	snprintf(log_name, MAX_FILE_NAME, "%s/"LOG_FILE_NAME"_%d",
1294 			g_config.mos->mos_log, ctx->cpu);
1295 	mtcp->log_fp = fopen(log_name, "w+");
1296 	if (!mtcp->log_fp) {
1297 		perror("fopen");
1298 		CTRACE_ERROR("Failed to create file for logging. (%s)\n", log_name);
1299 		return NULL;
1300 	}
1301 	mtcp->sp_fd = g_logctx[ctx->cpu]->pair_sp_fd;
1302 	mtcp->logger = g_logctx[ctx->cpu];
1303 
1304 	mtcp->connectq = CreateStreamQueue(BACKLOG_SIZE);
1305 	if (!mtcp->connectq) {
1306 		CTRACE_ERROR("Failed to create connect queue.\n");
1307 		return NULL;
1308 	}
1309 	mtcp->sendq = CreateStreamQueue(g_config.mos->max_concurrency);
1310 	if (!mtcp->sendq) {
1311 		CTRACE_ERROR("Failed to create send queue.\n");
1312 		return NULL;
1313 	}
1314 	mtcp->ackq = CreateStreamQueue(g_config.mos->max_concurrency);
1315 	if (!mtcp->ackq) {
1316 		CTRACE_ERROR("Failed to create ack queue.\n");
1317 		return NULL;
1318 	}
1319 	mtcp->closeq = CreateStreamQueue(g_config.mos->max_concurrency);
1320 	if (!mtcp->closeq) {
1321 		CTRACE_ERROR("Failed to create close queue.\n");
1322 		return NULL;
1323 	}
1324 	mtcp->closeq_int = CreateInternalStreamQueue(g_config.mos->max_concurrency);
1325 	if (!mtcp->closeq_int) {
1326 		CTRACE_ERROR("Failed to create close queue.\n");
1327 		return NULL;
1328 	}
1329 	mtcp->resetq = CreateStreamQueue(g_config.mos->max_concurrency);
1330 	if (!mtcp->resetq) {
1331 		CTRACE_ERROR("Failed to create reset queue.\n");
1332 		return NULL;
1333 	}
1334 	mtcp->resetq_int = CreateInternalStreamQueue(g_config.mos->max_concurrency);
1335 	if (!mtcp->resetq_int) {
1336 		CTRACE_ERROR("Failed to create reset queue.\n");
1337 		return NULL;
1338 	}
1339 	mtcp->destroyq = CreateStreamQueue(g_config.mos->max_concurrency);
1340 	if (!mtcp->destroyq) {
1341 		CTRACE_ERROR("Failed to create destroy queue.\n");
1342 		return NULL;
1343 	}
1344 
1345 	mtcp->g_sender = CreateMTCPSender(-1);
1346 	if (!mtcp->g_sender) {
1347 		CTRACE_ERROR("Failed to create global sender structure.\n");
1348 		return NULL;
1349 	}
1350 	for (i = 0; i < g_config.mos->netdev_table->num; i++) {
1351 		mtcp->n_sender[i] = CreateMTCPSender(i);
1352 		if (!mtcp->n_sender[i]) {
1353 			CTRACE_ERROR("Failed to create per-nic sender structure.\n");
1354 			return NULL;
1355 		}
1356 	}
1357 
1358 	mtcp->rto_store = InitRTOHashstore();
1359 	TAILQ_INIT(&mtcp->timewait_list);
1360 	TAILQ_INIT(&mtcp->timeout_list);
1361 
1362 	return mtcp;
1363 }
1364 /*----------------------------------------------------------------------------*/
1365 static void *
1366 MTCPRunThread(void *arg)
1367 {
1368 	mctx_t mctx = (mctx_t)arg;
1369 	int cpu = mctx->cpu;
1370 	int working;
1371 	struct mtcp_manager *mtcp;
1372 	struct mtcp_thread_context *ctx;
1373 
1374 	/* affinitize the thread to this core first */
1375 	mtcp_core_affinitize(cpu);
1376 
1377 	/* memory alloc after core affinitization would use local memory
1378 	   most time */
1379 	ctx = calloc(1, sizeof(*ctx));
1380 	if (!ctx) {
1381 		perror("calloc");
1382 		TRACE_ERROR("Failed to calloc mtcp context.\n");
1383 		exit(-1);
1384 	}
1385 	ctx->thread = pthread_self();
1386 	ctx->cpu = cpu;
1387 	mtcp = ctx->mtcp_manager = InitializeMTCPManager(ctx);
1388 	if (!mtcp) {
1389 		TRACE_ERROR("Failed to initialize mtcp manager.\n");
1390 		exit(-1);
1391 	}
1392 
1393 	/* assign mtcp context's underlying I/O module */
1394 	mtcp->iom = current_iomodule_func;
1395 
1396 	/* I/O initializing */
1397 	if (mtcp->iom->init_handle)
1398 		mtcp->iom->init_handle(ctx);
1399 
1400 	if (pthread_mutex_init(&ctx->flow_pool_lock, NULL)) {
1401 		perror("pthread_mutex_init of ctx->flow_pool_lock\n");
1402 		exit(-1);
1403 	}
1404 
1405 	if (pthread_mutex_init(&ctx->socket_pool_lock, NULL)) {
1406 		perror("pthread_mutex_init of ctx->socket_pool_lock\n");
1407 		exit(-1);
1408 	}
1409 
1410 	SQ_LOCK_INIT(&ctx->connect_lock, "ctx->connect_lock", exit(-1));
1411 	SQ_LOCK_INIT(&ctx->close_lock, "ctx->close_lock", exit(-1));
1412 	SQ_LOCK_INIT(&ctx->reset_lock, "ctx->reset_lock", exit(-1));
1413 	SQ_LOCK_INIT(&ctx->sendq_lock, "ctx->sendq_lock", exit(-1));
1414 	SQ_LOCK_INIT(&ctx->ackq_lock, "ctx->ackq_lock", exit(-1));
1415 	SQ_LOCK_INIT(&ctx->destroyq_lock, "ctx->destroyq_lock", exit(-1));
1416 
1417 	/* remember this context pointer for signal processing */
1418 	g_pctx[cpu] = ctx;
1419 	mlockall(MCL_CURRENT);
1420 
1421 	// attach (nic device, queue)
1422 	working = AttachDevice(ctx);
1423 	if (working != 0) {
1424 		sem_post(&g_init_sem[ctx->cpu]);
1425 		TRACE_DBG("MTCP thread %d finished. Not attached any device\n", ctx->cpu);
1426 		pthread_exit(NULL);
1427 	}
1428 
1429 	TRACE_DBG("CPU %d: initialization finished.\n", cpu);
1430 	sem_post(&g_init_sem[ctx->cpu]);
1431 
1432 	/* start the main loop */
1433 	RunMainLoop(ctx);
1434 
1435 	TRACE_DBG("MTCP thread %d finished.\n", ctx->cpu);
1436 
1437 	/* signaling mTCP thread is done */
1438 	sem_post(&g_done_sem[mctx->cpu]);
1439 
1440 	//pthread_exit(NULL);
1441 	return 0;
1442 }
1443 /*----------------------------------------------------------------------------*/
#ifdef ENABLE_DPDK
/* Adapter for rte_eal_remote_launch(), which expects an int (*)(void *)
 * entry point, while MTCPRunThread() uses the pthread
 * void *(*)(void *) signature.  The wrapped return value is unused. */
static int MTCPDPDKRunThread(void *arg)
{
	(void)MTCPRunThread(arg);
	return 0;
}
#endif /* !ENABLE_DPDK */
1451 /*----------------------------------------------------------------------------*/
1452 mctx_t
1453 mtcp_create_context(int cpu)
1454 {
1455 	mctx_t mctx;
1456 	int ret;
1457 
1458 	if (cpu >=  g_config.mos->num_cores) {
1459 		TRACE_ERROR("Failed initialize new mtcp context. "
1460 					"Requested cpu id %d exceed the number of cores %d configured to use.\n",
1461 					cpu, g_config.mos->num_cores);
1462 		return NULL;
1463 	}
1464 
1465         /* check if mtcp_create_context() was already initialized */
1466         if (g_logctx[cpu] != NULL) {
1467                 TRACE_ERROR("%s was already initialized before!\n",
1468                             __FUNCTION__);
1469                 return NULL;
1470         }
1471 
1472 	ret = sem_init(&g_init_sem[cpu], 0, 0);
1473 	if (ret) {
1474 		TRACE_ERROR("Failed initialize init_sem.\n");
1475 		return NULL;
1476 	}
1477 
1478 	ret = sem_init(&g_done_sem[cpu], 0, 0);
1479 	if (ret) {
1480 		TRACE_ERROR("Failed initialize done_sem.\n");
1481 		return NULL;
1482 	}
1483 
1484 	mctx = (mctx_t)calloc(1, sizeof(struct mtcp_context));
1485 	if (!mctx) {
1486 		TRACE_ERROR("Failed to allocate memory for mtcp_context.\n");
1487 		return NULL;
1488 	}
1489 	mctx->cpu = cpu;
1490 	g_ctx[cpu] = mctx;
1491 
1492 	/* initialize logger */
1493 	g_logctx[cpu] = (struct log_thread_context *)
1494 			calloc(1, sizeof(struct log_thread_context));
1495 	if (!g_logctx[cpu]) {
1496 		perror("malloc");
1497 		TRACE_ERROR("Failed to allocate memory for log thread context.\n");
1498 		return NULL;
1499 	}
1500 	InitLogThreadContext(g_logctx[cpu], cpu);
1501 	if (pthread_create(&log_thread[cpu],
1502 			   NULL, ThreadLogMain, (void *)g_logctx[cpu])) {
1503 		perror("pthread_create");
1504 		TRACE_ERROR("Failed to create log thread\n");
1505 		return NULL;
1506 	}
1507 
1508 #ifdef ENABLE_DPDK
1509 	/* Wake up mTCP threads (wake up I/O threads) */
1510 	if (current_iomodule_func == &dpdk_module_func) {
1511 		int master;
1512 		master = rte_get_master_lcore();
1513 		if (master == cpu) {
1514 			lcore_config[master].ret = 0;
1515 			lcore_config[master].state = FINISHED;
1516 			if (pthread_create(&g_thread[cpu],
1517 					   NULL, MTCPRunThread, (void *)mctx) != 0) {
1518 				TRACE_ERROR("pthread_create of mtcp thread failed!\n");
1519 				return NULL;
1520 			}
1521 		} else
1522 			rte_eal_remote_launch(MTCPDPDKRunThread, mctx, cpu);
1523 	} else
1524 #endif /* !ENABLE_DPDK */
1525 		{
1526 			if (pthread_create(&g_thread[cpu],
1527 					   NULL, MTCPRunThread, (void *)mctx) != 0) {
1528 				TRACE_ERROR("pthread_create of mtcp thread failed!\n");
1529 				return NULL;
1530 			}
1531 		}
1532 
1533 	sem_wait(&g_init_sem[cpu]);
1534 	sem_destroy(&g_init_sem[cpu]);
1535 
1536 	running[cpu] = TRUE;
1537 
1538 #ifdef NETSTAT
1539 #if NETSTAT_TOTAL
1540 	if (printer < 0) {
1541 		printer = cpu;
1542 		TRACE_INFO("CPU %d is in charge of printing stats.\n", printer);
1543 	}
1544 #endif
1545 #endif
1546 
1547 	return mctx;
1548 }
1549 /*----------------------------------------------------------------------------*/
1550 /**
1551  * TODO: It currently always returns 0. Add appropriate error return values
1552  */
1553 int
1554 mtcp_destroy_context(mctx_t mctx)
1555 {
1556 	struct mtcp_thread_context *ctx = g_pctx[mctx->cpu];
1557 	struct mtcp_manager *mtcp = ctx->mtcp_manager;
1558 	struct log_thread_context *log_ctx = mtcp->logger;
1559 	int ret, i;
1560 
1561 	TRACE_DBG("CPU %d: mtcp_destroy_context()\n", mctx->cpu);
1562 
1563 	/* close all stream sockets that are still open */
1564 	if (!ctx->exit) {
1565 		for (i = 0; i < g_config.mos->max_concurrency; i++) {
1566 			if (mtcp->smap[i].socktype == MOS_SOCK_STREAM) {
1567 				TRACE_DBG("Closing remaining socket %d (%s)\n",
1568 						i, TCPStateToString(mtcp->smap[i].stream));
1569 				DumpStream(mtcp, mtcp->smap[i].stream);
1570 				mtcp_close(mctx, i);
1571 			}
1572 		}
1573 	}
1574 
1575 	ctx->done = 1;
1576 
1577 	//pthread_kill(g_thread[mctx->cpu], SIGINT);
1578 #ifdef ENABLE_DPDK
1579 	ctx->exit = 1;
1580 	/* XXX - dpdk logic changes */
1581 	if (current_iomodule_func == &dpdk_module_func) {
1582 		int master = rte_get_master_lcore();
1583 		if (master == mctx->cpu)
1584 			pthread_join(g_thread[mctx->cpu], NULL);
1585 		else
1586 			rte_eal_wait_lcore(mctx->cpu);
1587 	} else
1588 #endif /* !ENABLE_DPDK */
1589 		{
1590 			pthread_join(g_thread[mctx->cpu], NULL);
1591 		}
1592 
1593 	TRACE_INFO("MTCP thread %d joined.\n", mctx->cpu);
1594 	running[mctx->cpu] = FALSE;
1595 
1596 #ifdef NETSTAT
1597 #if NETSTAT_TOTAL
1598 	if (printer == mctx->cpu) {
1599 		for (i = 0; i < num_cpus; i++) {
1600 			if (i != mctx->cpu && running[i]) {
1601 				printer = i;
1602 				break;
1603 			}
1604 		}
1605 	}
1606 #endif
1607 #endif
1608 
1609 	log_ctx->done = 1;
1610 	ret = write(log_ctx->pair_sp_fd, "F", 1);
1611 	if (ret != 1)
1612 		TRACE_ERROR("CPU %d: Fail to signal socket pair\n", mctx->cpu);
1613 
1614 	pthread_join(log_thread[ctx->cpu], NULL);
1615 	fclose(mtcp->log_fp);
1616 	TRACE_LOG("Log thread %d joined.\n", mctx->cpu);
1617 
1618 	if (mtcp->connectq) {
1619 		DestroyStreamQueue(mtcp->connectq);
1620 		mtcp->connectq = NULL;
1621 	}
1622 	if (mtcp->sendq) {
1623 		DestroyStreamQueue(mtcp->sendq);
1624 		mtcp->sendq = NULL;
1625 	}
1626 	if (mtcp->ackq) {
1627 		DestroyStreamQueue(mtcp->ackq);
1628 		mtcp->ackq = NULL;
1629 	}
1630 	if (mtcp->closeq) {
1631 		DestroyStreamQueue(mtcp->closeq);
1632 		mtcp->closeq = NULL;
1633 	}
1634 	if (mtcp->closeq_int) {
1635 		DestroyInternalStreamQueue(mtcp->closeq_int);
1636 		mtcp->closeq_int = NULL;
1637 	}
1638 	if (mtcp->resetq) {
1639 		DestroyStreamQueue(mtcp->resetq);
1640 		mtcp->resetq = NULL;
1641 	}
1642 	if (mtcp->resetq_int) {
1643 		DestroyInternalStreamQueue(mtcp->resetq_int);
1644 		mtcp->resetq_int = NULL;
1645 	}
1646 	if (mtcp->destroyq) {
1647 		DestroyStreamQueue(mtcp->destroyq);
1648 		mtcp->destroyq = NULL;
1649 	}
1650 
1651 	DestroyMTCPSender(mtcp->g_sender);
1652 	for (i = 0; i < g_config.mos->netdev_table->num; i++) {
1653 		DestroyMTCPSender(mtcp->n_sender[i]);
1654 	}
1655 
1656 	MPDestroy(mtcp->rv_pool);
1657 	MPDestroy(mtcp->sv_pool);
1658 	MPDestroy(mtcp->flow_pool);
1659 
1660 	if (mtcp->ap) {
1661 		DestroyAddressPool(mtcp->ap);
1662 	}
1663 
1664 	SQ_LOCK_DESTROY(&ctx->connect_lock);
1665 	SQ_LOCK_DESTROY(&ctx->close_lock);
1666 	SQ_LOCK_DESTROY(&ctx->reset_lock);
1667 	SQ_LOCK_DESTROY(&ctx->sendq_lock);
1668 	SQ_LOCK_DESTROY(&ctx->ackq_lock);
1669 	SQ_LOCK_DESTROY(&ctx->destroyq_lock);
1670 
1671 	//TRACE_INFO("MTCP thread %d destroyed.\n", mctx->cpu);
1672 	if (mtcp->iom->destroy_handle)
1673 		mtcp->iom->destroy_handle(ctx);
1674 	free(ctx);
1675 	free(mctx);
1676 
1677 	return 0;
1678 }
1679 /*----------------------------------------------------------------------------*/
1680 mtcp_sighandler_t
1681 mtcp_register_signal(int signum, mtcp_sighandler_t handler)
1682 {
1683 	mtcp_sighandler_t prev;
1684 
1685 	if (signum == SIGINT) {
1686 		prev = app_signal_handler;
1687 		app_signal_handler = handler;
1688 	} else {
1689 		if ((prev = signal(signum, handler)) == SIG_ERR) {
1690 			perror("signal");
1691 			return SIG_ERR;
1692 		}
1693 	}
1694 
1695 	return prev;
1696 }
1697 /*----------------------------------------------------------------------------*/
1698 int
1699 mtcp_getconf(struct mtcp_conf *conf)
1700 {
1701 	int i, j;
1702 
1703 	if (!conf)
1704 		return -1;
1705 
1706 	conf->num_cores = g_config.mos->num_cores;
1707 	conf->max_concurrency = g_config.mos->max_concurrency;
1708 	conf->cpu_mask = g_config.mos->cpu_mask;
1709 
1710 	conf->rcvbuf_size = g_config.mos->rmem_size;
1711 	conf->sndbuf_size = g_config.mos->wmem_size;
1712 
1713 	conf->tcp_timewait = g_config.mos->tcp_tw_interval;
1714 	conf->tcp_timeout = g_config.mos->tcp_timeout;
1715 
1716 	i = 0;
1717 	struct conf_block *bwalk;
1718 	TAILQ_FOREACH(bwalk, &g_config.app_blkh, link) {
1719 		struct app_conf *app_conf = (struct app_conf *)bwalk->conf;
1720 		for (j = 0; j < app_conf->app_argc; j++)
1721 			conf->app_argv[i][j] = app_conf->app_argv[j];
1722 		conf->app_argc[i] = app_conf->app_argc;
1723 		conf->app_cpu_mask[i] = app_conf->cpu_mask;
1724 		i++;
1725 	}
1726 	conf->num_app = i;
1727 
1728 	return 0;
1729 }
1730 /*----------------------------------------------------------------------------*/
1731 int
1732 mtcp_setconf(const struct mtcp_conf *conf)
1733 {
1734 	if (!conf)
1735 		return -1;
1736 
1737 	g_config.mos->num_cores = conf->num_cores;
1738 	g_config.mos->max_concurrency = conf->max_concurrency;
1739 
1740 	g_config.mos->rmem_size = conf->rcvbuf_size;
1741 	g_config.mos->wmem_size = conf->sndbuf_size;
1742 
1743 	g_config.mos->tcp_tw_interval = conf->tcp_timewait;
1744 	g_config.mos->tcp_timeout = conf->tcp_timeout;
1745 
1746 	TRACE_CONFIG("Configuration updated by mtcp_setconf().\n");
1747 	//PrintConfiguration();
1748 
1749 	return 0;
1750 }
1751 /*----------------------------------------------------------------------------*/
/* mtcp_init(): process-wide mOS initialization.  Loads the configuration
 * file (in two halves, with the I/O module hooked in between), creates
 * the shared address pool and ARP table, and installs signal handlers.
 * Must be called once before mtcp_create_context().
 * Returns 0 on success, -1 on error. */
int
mtcp_init(const char *config_file)
{
	int i;
	int ret;

	if (geteuid()) {
		TRACE_CONFIG("[CAUTION] Run as root if mlock is necessary.\n");
	}

	/* getting cpu and NIC */
	num_cpus = GetNumCPUs();
	assert(num_cpus >= 1);
	for (i = 0; i < num_cpus; i++) {
		g_mtcp[i] = NULL;
		running[i] = FALSE;
		sigint_cnt[i] = 0;
	}

	ret = LoadConfigurationUpperHalf(config_file);
	if (ret) {
		TRACE_CONFIG("Error occured while loading configuration.\n");
		return -1;
	}

	/* pick the packet I/O backend compiled in; the #elif chain fixes
	 * the precedence when several are enabled at build time */
#if defined(ENABLE_PSIO)
	current_iomodule_func = &ps_module_func;
#elif defined(ENABLE_DPDK)
	current_iomodule_func = &dpdk_module_func;
#elif defined(ENABLE_PCAP)
	current_iomodule_func = &pcap_module_func;
#elif defined(ENABLE_DPDKR)
	current_iomodule_func = &dpdkr_module_func;
#endif

	if (current_iomodule_func->load_module_upper_half)
		current_iomodule_func->load_module_upper_half();

	LoadConfigurationLowerHalf();

	//PrintConfiguration();

	/* TODO: this should be fixed */
	/* NOTE(review): the pool is built from the first NIC's address
	 * only — presumably multi-NIC setups need more; confirm */
	ap = CreateAddressPool(g_config.mos->netdev_table->ent[0]->ip_addr, 1);
	if (!ap) {
		TRACE_CONFIG("Error occured while creating address pool.\n");
		return -1;
	}

	//PrintInterfaceInfo();
	//PrintRoutingTable();
	//PrintARPTable();
	InitARPTable();

	/* mOS installs its own handlers; SIGINT delivery to the app goes
	 * through app_signal_handler (see mtcp_register_signal) */
	if (signal(SIGUSR1, HandleSignal) == SIG_ERR) {
		perror("signal, SIGUSR1");
		return -1;
	}
	if (signal(SIGINT, HandleSignal) == SIG_ERR) {
		perror("signal, SIGINT");
		return -1;
	}
	app_signal_handler = NULL;

	printf("load_module(): %p\n", current_iomodule_func);
	/* load system-wide io module specs */
	if (current_iomodule_func->load_module_lower_half)
		current_iomodule_func->load_module_lower_half();

	GlobInitEvent();

	PrintConf(&g_config);

	return 0;
}
1827 /*----------------------------------------------------------------------------*/
1828 int
1829 mtcp_destroy()
1830 {
1831 	int i;
1832 
1833 	/* wait until all threads are closed */
1834 	for (i = 0; i < num_cpus; i++) {
1835 		if (running[i]) {
1836 			if (pthread_join(g_thread[i], NULL) != 0)
1837 				return -1;
1838 		}
1839 	}
1840 
1841 	DestroyAddressPool(ap);
1842 
1843 	TRACE_INFO("All MTCP threads are joined.\n");
1844 
1845 	return 0;
1846 }
1847 /*----------------------------------------------------------------------------*/
1848