xref: /mOS-networking-stack/core/src/core.c (revision cafe7743)
1 #define _GNU_SOURCE
2 #include <sched.h>
3 #include <unistd.h>
4 #include <sys/time.h>
5 #include <semaphore.h>
6 #include <sys/mman.h>
7 #include <signal.h>
8 #include <assert.h>
9 #include <string.h>
10 
11 #include "cpu.h"
12 #include "eth_in.h"
13 #include "fhash.h"
14 #include "tcp_send_buffer.h"
15 #include "tcp_ring_buffer.h"
16 #include "socket.h"
17 #include "eth_out.h"
18 #include "tcp.h"
19 #include "tcp_in.h"
20 #include "tcp_out.h"
21 #include "mtcp_api.h"
22 #include "eventpoll.h"
23 #include "logger.h"
24 #include "config.h"
25 #include "arp.h"
26 #include "ip_out.h"
27 #include "timer.h"
28 #include "debug.h"
29 #include "event_callback.h"
30 #include "tcp_rb.h"
31 #include "tcp_stream.h"
32 #include "io_module.h"
33 
34 #ifdef ENABLE_DPDK
35 /* for launching rte thread */
36 #include <rte_launch.h>
37 #include <rte_lcore.h>
38 #endif /* !ENABLE_DPDK */
39 #define PS_CHUNK_SIZE 64
40 #define RX_THRESH (PS_CHUNK_SIZE * 0.8)
41 
42 #define ROUND_STAT FALSE
43 #define TIME_STAT FALSE
44 #define EVENT_STAT FALSE
45 #define TESTING FALSE
46 
47 #define LOG_FILE_NAME "log"
48 #define MAX_FILE_NAME 1024
49 
50 #define MAX(a, b) ((a)>(b)?(a):(b))
51 #define MIN(a, b) ((a)<(b)?(a):(b))
52 
53 #define PER_STREAM_SLICE 0.1		// in ms
54 #define PER_STREAM_TCHECK 1			// in ms
55 #define PS_SELECT_TIMEOUT 100		// in us
56 
57 #define GBPS(bytes) (bytes * 8.0 / (1000 * 1000 * 1000))
58 
59 /*----------------------------------------------------------------------------*/
60 /* handlers for threads */
61 struct mtcp_thread_context *g_pctx[MAX_CPUS] = {0};
62 struct log_thread_context *g_logctx[MAX_CPUS] = {0};
63 /*----------------------------------------------------------------------------*/
64 static pthread_t g_thread[MAX_CPUS] = {0};
65 static pthread_t log_thread[MAX_CPUS] = {0};
66 /*----------------------------------------------------------------------------*/
67 static sem_t g_init_sem[MAX_CPUS];
68 static sem_t g_done_sem[MAX_CPUS];
69 static int running[MAX_CPUS] = {0};
70 /*----------------------------------------------------------------------------*/
71 mtcp_sighandler_t app_signal_handler;
72 static int sigint_cnt[MAX_CPUS] = {0};
73 static struct timespec sigint_ts[MAX_CPUS];
74 /*----------------------------------------------------------------------------*/
75 #ifdef NETSTAT
76 #if NETSTAT_TOTAL
77 static int printer = -1;
78 #if ROUND_STAT
79 #endif /* ROUND_STAT */
80 #endif /* NETSTAT_TOTAL */
81 #endif /* NETSTAT */
82 /*----------------------------------------------------------------------------*/
83 void
84 HandleSignal(int signal)
85 {
86 	int i = 0;
87 
88 	if (signal == SIGINT) {
89 #ifdef DARWIN
90 		int core = 0;
91 #else
92 		int core = sched_getcpu();
93 #endif
94 		struct timespec cur_ts;
95 
96 		clock_gettime(CLOCK_REALTIME, &cur_ts);
97 
98 		if (sigint_cnt[core] > 0 && cur_ts.tv_sec == sigint_ts[core].tv_sec) {
99 			for (i = 0; i < g_config.mos->num_cores; i++) {
100 				if (running[i]) {
101 					exit(0);
102 					g_pctx[i]->exit = TRUE;
103 				}
104 			}
105 		} else {
106 			for (i = 0; i < g_config.mos->num_cores; i++) {
107 				if (g_pctx[i])
108 					g_pctx[i]->interrupt = TRUE;
109 			}
110 			if (!app_signal_handler) {
111 				for (i = 0; i < g_config.mos->num_cores; i++) {
112 					if (running[i]) {
113 						exit(0);
114 						g_pctx[i]->exit = TRUE;
115 					}
116 				}
117 			}
118 		}
119 		sigint_cnt[core]++;
120 		clock_gettime(CLOCK_REALTIME, &sigint_ts[core]);
121 	}
122 
123 	if (signal != SIGUSR1) {
124 		if (app_signal_handler) {
125 			app_signal_handler(signal);
126 		}
127 	}
128 }
129 /*----------------------------------------------------------------------------*/
130 static int
131 AttachDevice(struct mtcp_thread_context* ctx)
132 {
133 	int working = -1;
134 	mtcp_manager_t mtcp = ctx->mtcp_manager;
135 
136 	if (mtcp->iom->link_devices)
137 		working = mtcp->iom->link_devices(ctx);
138 	else
139 		return 0;
140 
141 	return working;
142 }
143 /*----------------------------------------------------------------------------*/
144 #ifdef TIMESTAT
145 static inline void
146 InitStatCounter(struct stat_counter *counter)
147 {
148 	counter->cnt = 0;
149 	counter->sum = 0;
150 	counter->max = 0;
151 	counter->min = 0;
152 }
153 /*----------------------------------------------------------------------------*/
154 static inline void
155 UpdateStatCounter(struct stat_counter *counter, int64_t value)
156 {
157 	counter->cnt++;
158 	counter->sum += value;
159 	if (value > counter->max)
160 		counter->max = value;
161 	if (counter->min == 0 || value < counter->min)
162 		counter->min = value;
163 }
164 /*----------------------------------------------------------------------------*/
165 static inline uint64_t
166 GetAverageStat(struct stat_counter *counter)
167 {
168 	return counter->cnt ? (counter->sum / counter->cnt) : 0;
169 }
170 /*----------------------------------------------------------------------------*/
/* Microseconds elapsed from *t1 to *t2 (negative when t2 precedes t1).
 * The seconds difference is widened to 64 bits *before* scaling so the
 * multiplication cannot overflow on platforms with a 32-bit time_t
 * (the original multiplied in time_t width and cast only the usec term). */
static inline int64_t
TimeDiffUs(struct timeval *t2, struct timeval *t1)
{
	return (int64_t)(t2->tv_sec - t1->tv_sec) * 1000000 +
			(int64_t)(t2->tv_usec - t1->tv_usec);
}
177 /*----------------------------------------------------------------------------*/
178 #endif
179 #ifdef NETSTAT
/* Compute this thread's per-interval network statistics — the delta between
 * the current cumulative counters (mtcp->nstat) and the snapshot taken at
 * the previous call (mtcp->p_nstat) — into *ns, optionally print a per-core
 * line, then refresh the snapshot.
 *
 * mtcp : per-thread manager whose nstat/p_nstat counters are read/updated
 * ns   : out-parameter receiving the per-device deltas for this interval
 *
 * NOTE(review): the printed "pps"/Gbps figures assume one call per second —
 * confirm against the caller's TIMEOUT rate-limiting. */
static inline void
PrintThreadNetworkStats(mtcp_manager_t mtcp, struct net_stat *ns)
{
	int i;

	/* one delta entry per network device */
	for (i = 0; i < g_config.mos->netdev_table->num; i++) {
		ns->rx_packets[i] = mtcp->nstat.rx_packets[i] - mtcp->p_nstat.rx_packets[i];
		ns->rx_errors[i] = mtcp->nstat.rx_errors[i] - mtcp->p_nstat.rx_errors[i];
		ns->rx_bytes[i] = mtcp->nstat.rx_bytes[i] - mtcp->p_nstat.rx_bytes[i];
		ns->tx_packets[i] = mtcp->nstat.tx_packets[i] - mtcp->p_nstat.tx_packets[i];
		/* tx_drops is accumulated for the caller but not printed below */
		ns->tx_drops[i] = mtcp->nstat.tx_drops[i] - mtcp->p_nstat.tx_drops[i];
		ns->tx_bytes[i] = mtcp->nstat.tx_bytes[i] - mtcp->p_nstat.tx_bytes[i];
#if NETSTAT_PERTHREAD
		if (g_config.mos->netdev_table->ent[i]->stat_print) {
			fprintf(stderr, "[CPU%2d] %s flows: %6u, "
					"RX: %7llu(pps) (err: %5llu), %5.2lf(Gbps), "
					"TX: %7llu(pps), %5.2lf(Gbps)\n",
					mtcp->ctx->cpu,
					g_config.mos->netdev_table->ent[i]->dev_name,
					(unsigned)mtcp->flow_cnt,
					(long long unsigned)ns->rx_packets[i],
					(long long unsigned)ns->rx_errors[i],
					GBPS(ns->rx_bytes[i]),
					(long long unsigned)ns->tx_packets[i],
					GBPS(ns->tx_bytes[i]));
		}
#endif
	}
	/* take a fresh snapshot for the next interval */
	mtcp->p_nstat = mtcp->nstat;

}
211 /*----------------------------------------------------------------------------*/
212 #if ROUND_STAT
/* Compute this thread's per-interval main-loop round statistics — deltas of
 * mtcp->runstat against the previous snapshot mtcp->p_runstat — into *rs,
 * refresh the snapshot, and print a per-core summary line.  Round counts
 * are printed in units of ROUND_DIV (thousands). */
static inline void
PrintThreadRoundStats(mtcp_manager_t mtcp, struct run_stat *rs)
{
#define ROUND_DIV (1000)
	rs->rounds = mtcp->runstat.rounds - mtcp->p_runstat.rounds;
	rs->rounds_rx = mtcp->runstat.rounds_rx - mtcp->p_runstat.rounds_rx;
	rs->rounds_rx_try = mtcp->runstat.rounds_rx_try - mtcp->p_runstat.rounds_rx_try;
	rs->rounds_tx = mtcp->runstat.rounds_tx - mtcp->p_runstat.rounds_tx;
	rs->rounds_tx_try = mtcp->runstat.rounds_tx_try - mtcp->p_runstat.rounds_tx_try;
	rs->rounds_select = mtcp->runstat.rounds_select - mtcp->p_runstat.rounds_select;
	rs->rounds_select_rx = mtcp->runstat.rounds_select_rx - mtcp->p_runstat.rounds_select_rx;
	rs->rounds_select_tx = mtcp->runstat.rounds_select_tx - mtcp->p_runstat.rounds_select_tx;
	rs->rounds_select_intr = mtcp->runstat.rounds_select_intr - mtcp->p_runstat.rounds_select_intr;
	/* rounds_twcheck is snapshotted but not printed below */
	rs->rounds_twcheck = mtcp->runstat.rounds_twcheck - mtcp->p_runstat.rounds_twcheck;
	mtcp->p_runstat = mtcp->runstat;
#if NETSTAT_PERTHREAD
	fprintf(stderr, "[CPU%2d] Rounds: %4lluK, "
			"rx: %3lluK (try: %4lluK), tx: %3lluK (try: %4lluK), "
			"ps_select: %4llu (rx: %4llu, tx: %4llu, intr: %3llu)\n",
			mtcp->ctx->cpu, rs->rounds / ROUND_DIV,
			rs->rounds_rx / ROUND_DIV, rs->rounds_rx_try / ROUND_DIV,
			rs->rounds_tx / ROUND_DIV, rs->rounds_tx_try / ROUND_DIV,
			rs->rounds_select,
			rs->rounds_select_rx, rs->rounds_select_tx, rs->rounds_select_intr);
#endif
}
239 #endif /* ROUND_STAT */
240 /*----------------------------------------------------------------------------*/
241 #if TIME_STAT
/* Print per-core timing statistics (average and maximum, in microseconds)
 * for each phase of the main loop, then reset all the counters so the next
 * interval starts fresh.
 *
 * NOTE(review): GetAverageStat() returns uint64_t but is printed with %lu —
 * correct on LP64 where unsigned long is 64-bit; confirm for other ABIs. */
static inline void
PrintThreadRoundTime(mtcp_manager_t mtcp)
{
	fprintf(stderr, "[CPU%2d] Time: (avg, max) "
			"round: (%4luus, %4luus), processing: (%4luus, %4luus), "
			"tcheck: (%4luus, %4luus), epoll: (%4luus, %4luus), "
			"handle: (%4luus, %4luus), xmit: (%4luus, %4luus), "
			"select: (%4luus, %4luus)\n", mtcp->ctx->cpu,
			GetAverageStat(&mtcp->rtstat.round), mtcp->rtstat.round.max,
			GetAverageStat(&mtcp->rtstat.processing), mtcp->rtstat.processing.max,
			GetAverageStat(&mtcp->rtstat.tcheck), mtcp->rtstat.tcheck.max,
			GetAverageStat(&mtcp->rtstat.epoll), mtcp->rtstat.epoll.max,
			GetAverageStat(&mtcp->rtstat.handle), mtcp->rtstat.handle.max,
			GetAverageStat(&mtcp->rtstat.xmit), mtcp->rtstat.xmit.max,
			GetAverageStat(&mtcp->rtstat.select), mtcp->rtstat.select.max);

	/* zero every phase counter for the next measurement interval */
	InitStatCounter(&mtcp->rtstat.round);
	InitStatCounter(&mtcp->rtstat.processing);
	InitStatCounter(&mtcp->rtstat.tcheck);
	InitStatCounter(&mtcp->rtstat.epoll);
	InitStatCounter(&mtcp->rtstat.handle);
	InitStatCounter(&mtcp->rtstat.xmit);
	InitStatCounter(&mtcp->rtstat.select);
}
266 #endif
267 #endif /* NETSTAT */
268 /*----------------------------------------------------------------------------*/
269 #if EVENT_STAT
270 static inline void
271 PrintEventStat(int core, struct mtcp_epoll_stat *stat)
272 {
273 	fprintf(stderr, "[CPU%2d] calls: %lu, waits: %lu, wakes: %lu, "
274 			"issued: %lu, registered: %lu, invalidated: %lu, handled: %lu\n",
275 			core, stat->calls, stat->waits, stat->wakes,
276 			stat->issued, stat->registered, stat->invalidated, stat->handled);
277 	memset(stat, 0, sizeof(struct mtcp_epoll_stat));
278 }
279 #endif /* EVENT_STAT */
280 /*----------------------------------------------------------------------------*/
281 #ifdef NETSTAT
282 static inline void
283 PrintNetworkStats(mtcp_manager_t mtcp, uint32_t cur_ts)
284 {
285 #define TIMEOUT 1
286 	int i;
287 	struct net_stat ns;
288 	bool stat_print = false;
289 #if ROUND_STAT
290 	struct run_stat rs;
291 #endif /* ROUND_STAT */
292 #ifdef NETSTAT_TOTAL
293 	static double peak_total_rx_gbps = 0;
294 	static double peak_total_tx_gbps = 0;
295 	static double avg_total_rx_gbps = 0;
296 	static double avg_total_tx_gbps = 0;
297 
298 	double total_rx_gbps = 0, total_tx_gbps = 0;
299 	int j;
300 	uint32_t gflow_cnt = 0;
301 	struct net_stat g_nstat;
302 #if ROUND_STAT
303 	struct run_stat g_runstat;
304 #endif /* ROUND_STAT */
305 #endif /* NETSTAT_TOTAL */
306 
307 	if (TS_TO_MSEC(cur_ts - mtcp->p_nstat_ts) < SEC_TO_MSEC(TIMEOUT)) {
308 		return;
309 	}
310 
311 	mtcp->p_nstat_ts = cur_ts;
312 	gflow_cnt = 0;
313 	memset(&g_nstat, 0, sizeof(struct net_stat));
314 	for (i = 0; i < g_config.mos->num_cores; i++) {
315 		if (running[i]) {
316 			PrintThreadNetworkStats(g_mtcp[i], &ns);
317 #if NETSTAT_TOTAL
318 			gflow_cnt += g_mtcp[i]->flow_cnt;
319 			for (j = 0; j < g_config.mos->netdev_table->num; j++) {
320 				g_nstat.rx_packets[j] += ns.rx_packets[j];
321 				g_nstat.rx_errors[j] += ns.rx_errors[j];
322 				g_nstat.rx_bytes[j] += ns.rx_bytes[j];
323 				g_nstat.tx_packets[j] += ns.tx_packets[j];
324 				g_nstat.tx_drops[j] += ns.tx_drops[j];
325 				g_nstat.tx_bytes[j] += ns.tx_bytes[j];
326 			}
327 #endif
328 		}
329 	}
330 #if NETSTAT_TOTAL
331 	for (i = 0; i < g_config.mos->netdev_table->num; i++) {
332 		if (g_config.mos->netdev_table->ent[i]->stat_print) {
333 			fprintf(stderr, "[ ALL ] %s, "
334 			"RX: %7llu(pps) (err: %5llu), %5.2lf(Gbps), "
335 			"TX: %7llu(pps), %5.2lf(Gbps)\n",
336 				g_config.mos->netdev_table->ent[i]->dev_name,
337 				(long long unsigned)g_nstat.rx_packets[i],
338 				(long long unsigned)g_nstat.rx_errors[i],
339 				GBPS(g_nstat.rx_bytes[i]),
340 				(long long unsigned)g_nstat.tx_packets[i],
341 				GBPS(g_nstat.tx_bytes[i]));
342 			total_rx_gbps += GBPS(g_nstat.rx_bytes[i]);
343 			total_tx_gbps += GBPS(g_nstat.tx_bytes[i]);
344 			stat_print = true;
345 		}
346 	}
347 	if (stat_print) {
348 		fprintf(stderr, "[ ALL ] flows: %6u\n", gflow_cnt);
349 		if (avg_total_rx_gbps == 0)
350 			avg_total_rx_gbps = total_rx_gbps;
351 		else
352 			avg_total_rx_gbps = avg_total_rx_gbps * 0.6 + total_rx_gbps * 0.4;
353 
354 		if (avg_total_tx_gbps == 0)
355 			avg_total_tx_gbps = total_tx_gbps;
356 		else
357 			avg_total_tx_gbps = avg_total_tx_gbps * 0.6 + total_tx_gbps * 0.4;
358 
359 		if (peak_total_rx_gbps < total_rx_gbps)
360 			peak_total_rx_gbps = total_rx_gbps;
361 		if (peak_total_tx_gbps < total_tx_gbps)
362 			peak_total_tx_gbps = total_tx_gbps;
363 
364 		fprintf(stderr, "[ PEAK ] RX: %5.2lf(Gbps), TX: %5.2lf(Gbps)\n"
365 						"[ RECENT AVG ] RX: %5.2lf(Gbps), TX: %5.2lf(Gbps)\n",
366 				peak_total_rx_gbps, peak_total_tx_gbps,
367 				avg_total_rx_gbps, avg_total_tx_gbps);
368 	}
369 #endif
370 
371 #if ROUND_STAT
372 	memset(&g_runstat, 0, sizeof(struct run_stat));
373 	for (i = 0; i < g_config.mos->num_cores; i++) {
374 		if (running[i]) {
375 			PrintThreadRoundStats(g_mtcp[i], &rs);
376 #if DBGMSG
377 			g_runstat.rounds += rs.rounds;
378 			g_runstat.rounds_rx += rs.rounds_rx;
379 			g_runstat.rounds_rx_try += rs.rounds_rx_try;
380 			g_runstat.rounds_tx += rs.rounds_tx;
381 			g_runstat.rounds_tx_try += rs.rounds_tx_try;
382 			g_runstat.rounds_select += rs.rounds_select;
383 			g_runstat.rounds_select_rx += rs.rounds_select_rx;
384 			g_runstat.rounds_select_tx += rs.rounds_select_tx;
385 #endif
386 		}
387 	}
388 
389 	TRACE_DBG("[ ALL ] Rounds: %4ldK, "
390 		  "rx: %3ldK (try: %4ldK), tx: %3ldK (try: %4ldK), "
391 		  "ps_select: %4ld (rx: %4ld, tx: %4ld)\n",
392 		  g_runstat.rounds / 1000, g_runstat.rounds_rx / 1000,
393 		  g_runstat.rounds_rx_try / 1000, g_runstat.rounds_tx / 1000,
394 		  g_runstat.rounds_tx_try / 1000, g_runstat.rounds_select,
395 		  g_runstat.rounds_select_rx, g_runstat.rounds_select_tx);
396 #endif /* ROUND_STAT */
397 
398 #if TIME_STAT
399 	for (i = 0; i < g_config.mos->num_cores; i++) {
400 		if (running[i]) {
401 			PrintThreadRoundTime(g_mtcp[i]);
402 		}
403 	}
404 #endif
405 
406 #if EVENT_STAT
407 	for (i = 0; i < g_config.mos->num_cores; i++) {
408 		if (running[i] && g_mtcp[i]->ep) {
409 			PrintEventStat(i, &g_mtcp[i]->ep->stat);
410 		}
411 	}
412 #endif
413 
414 	fflush(stderr);
415 }
416 #endif /* NETSTAT */
417 /*----------------------------------------------------------------------------*/
418 static inline void
419 FlushMonitorReadEvents(mtcp_manager_t mtcp)
420 {
421 	struct event_queue *mtcpq;
422 	struct tcp_stream *cur_stream;
423 	struct mon_listener *walk;
424 
425 	/* check if monitor sockets should be passed data */
426 	TAILQ_FOREACH(walk, &mtcp->monitors, link) {
427 		if (walk->socket->socktype != MOS_SOCK_MONITOR_STREAM ||
428 			!(mtcpq = walk->eq))
429 			continue;
430 
431 		while (mtcpq->num_events > 0) {
432 			cur_stream =
433 				(struct tcp_stream *)mtcpq->events[mtcpq->start++].ev.data.ptr;
434 			/* only read events */
435 			if (cur_stream != NULL &&
436 					(cur_stream->socket->events | MOS_EPOLLIN)) {
437 				if (cur_stream->rcvvar != NULL &&
438 						cur_stream->rcvvar->rcvbuf != NULL) {
439 					/* no need to pass pkt context */
440 					struct socket_map *walk;
441 					SOCKQ_FOREACH_START(walk, &cur_stream->msocks) {
442 						HandleCallback(mtcp, MOS_NULL, walk,
443 							       cur_stream->side, NULL,
444 							       MOS_ON_CONN_NEW_DATA);
445 					} SOCKQ_FOREACH_END;
446 #ifndef NEWRB
447 					/* re-adjust tcp_ring_buffer now */
448 					RBRemove(mtcp->rbm_rcv, cur_stream->rcvvar->rcvbuf,
449 							cur_stream->rcvvar->rcvbuf->merged_len,
450 							AT_MTCP, cur_stream->buffer_mgmt);
451 					cur_stream->rcvvar->rcv_wnd =
452 						cur_stream->rcvvar->rcvbuf->size
453 						- 1 - cur_stream->rcvvar->rcvbuf->last_len;
454 #endif
455 				}
456 				/* reset the actions now */
457 				cur_stream->actions = 0;
458 			}
459 			if (mtcpq->start >= mtcpq->size)
460 				mtcpq->start = 0;
461 			mtcpq->num_events--;
462 		}
463 	}
464 }
465 /*----------------------------------------------------------------------------*/
466 static inline void
467 FlushBufferedReadEvents(mtcp_manager_t mtcp)
468 {
469 	int i;
470 	int offset;
471 	struct event_queue *mtcpq;
472 	struct tcp_stream *cur_stream;
473 
474 	if (mtcp->ep == NULL) {
475 		TRACE_EPOLL("No epoll socket has been registered yet!\n");
476 		return;
477 	} else {
478 		/* case when mtcpq exists */
479 		mtcpq = mtcp->ep->mtcp_queue;
480 		offset = mtcpq->start;
481 	}
482 
483 	/* we will use queued-up epoll read-in events
484 	 * to trigger buffered read monitor events */
485 	for (i = 0; i < mtcpq->num_events; i++) {
486 		cur_stream = mtcp->smap[mtcpq->events[offset++].sockid].stream;
487 		/* only read events */
488 		/* Raise new data callback event */
489 		if (cur_stream != NULL &&
490 				(cur_stream->socket->events | MOS_EPOLLIN)) {
491 			if (cur_stream->rcvvar != NULL &&
492 					cur_stream->rcvvar->rcvbuf != NULL) {
493 				/* no need to pass pkt context */
494 				struct socket_map *walk;
495 				SOCKQ_FOREACH_START(walk, &cur_stream->msocks) {
496 					HandleCallback(mtcp, MOS_NULL, walk, cur_stream->side,
497 							NULL, MOS_ON_CONN_NEW_DATA);
498 				} SOCKQ_FOREACH_END;
499 			}
500 		}
501 		if (offset >= mtcpq->size)
502 			offset = 0;
503 	}
504 }
505 /*----------------------------------------------------------------------------*/
506 static inline void
507 FlushEpollEvents(mtcp_manager_t mtcp, uint32_t cur_ts)
508 {
509 	struct mtcp_epoll *ep = mtcp->ep;
510 	struct event_queue *usrq = ep->usr_queue;
511 	struct event_queue *mtcpq = ep->mtcp_queue;
512 
513 	pthread_mutex_lock(&ep->epoll_lock);
514 	if (ep->mtcp_queue->num_events > 0) {
515 		/* while mtcp_queue have events */
516 		/* and usr_queue is not full */
517 		while (mtcpq->num_events > 0 && usrq->num_events < usrq->size) {
518 			/* copy the event from mtcp_queue to usr_queue */
519 			usrq->events[usrq->end++] = mtcpq->events[mtcpq->start++];
520 
521 			if (usrq->end >= usrq->size)
522 				usrq->end = 0;
523 			usrq->num_events++;
524 
525 			if (mtcpq->start >= mtcpq->size)
526 				mtcpq->start = 0;
527 			mtcpq->num_events--;
528 		}
529 	}
530 
531 	/* if there are pending events, wake up user */
532 	if (ep->waiting && (ep->usr_queue->num_events > 0 ||
533 				ep->usr_shadow_queue->num_events > 0)) {
534 		STAT_COUNT(mtcp->runstat.rounds_epoll);
535 		TRACE_EPOLL("Broadcasting events. num: %d, cur_ts: %u, prev_ts: %u\n",
536 				ep->usr_queue->num_events, cur_ts, mtcp->ts_last_event);
537 		mtcp->ts_last_event = cur_ts;
538 		ep->stat.wakes++;
539 		pthread_cond_signal(&ep->epoll_cond);
540 	}
541 	pthread_mutex_unlock(&ep->epoll_lock);
542 }
543 /*----------------------------------------------------------------------------*/
544 static inline void
545 HandleApplicationCalls(mtcp_manager_t mtcp, uint32_t cur_ts)
546 {
547 	tcp_stream *stream;
548 	int cnt, max_cnt;
549 	int handled, delayed;
550 	int control, send, ack;
551 
552 	/* connect handling */
553 	while ((stream = StreamDequeue(mtcp->connectq))) {
554 		if (stream->state != TCP_ST_SYN_SENT) {
555 			TRACE_INFO("Got a connection request from app with state: %s",
556 				   TCPStateToString(stream));
557 			exit(EXIT_FAILURE);
558 		} else {
559 			stream->cb_events |= MOS_ON_CONN_START |
560 				MOS_ON_TCP_STATE_CHANGE;
561 			/* if monitor is on... */
562 			if (stream->pair_stream != NULL)
563 				stream->pair_stream->cb_events |=
564 					MOS_ON_CONN_START;
565 		}
566 		AddtoControlList(mtcp, stream, cur_ts);
567 	}
568 
569 	/* send queue handling */
570 	while ((stream = StreamDequeue(mtcp->sendq))) {
571 		stream->sndvar->on_sendq = FALSE;
572 		AddtoSendList(mtcp, stream);
573 	}
574 
575 	/* ack queue handling */
576 	while ((stream = StreamDequeue(mtcp->ackq))) {
577 		stream->sndvar->on_ackq = FALSE;
578 		EnqueueACK(mtcp, stream, cur_ts, ACK_OPT_AGGREGATE);
579 	}
580 
581 	/* close handling */
582 	handled = delayed = 0;
583 	control = send = ack = 0;
584 	while ((stream = StreamDequeue(mtcp->closeq))) {
585 		struct tcp_send_vars *sndvar = stream->sndvar;
586 		sndvar->on_closeq = FALSE;
587 
588 		if (sndvar->sndbuf) {
589 			sndvar->fss = sndvar->sndbuf->head_seq + sndvar->sndbuf->len;
590 		} else {
591 			sndvar->fss = stream->snd_nxt;
592 		}
593 
594 		if (g_config.mos->tcp_timeout > 0)
595 			RemoveFromTimeoutList(mtcp, stream);
596 
597 		if (stream->have_reset) {
598 			handled++;
599 			if (stream->state != TCP_ST_CLOSED_RSVD) {
600 				stream->close_reason = TCP_RESET;
601 				stream->state = TCP_ST_CLOSED_RSVD;
602 				stream->cb_events |= MOS_ON_TCP_STATE_CHANGE;
603 				TRACE_STATE("Stream %d: TCP_ST_CLOSED_RSVD\n", stream->id);
604 				DestroyTCPStream(mtcp, stream);
605 			} else {
606 				TRACE_ERROR("Stream already closed.\n");
607 			}
608 
609 		} else if (sndvar->on_control_list) {
610 			sndvar->on_closeq_int = TRUE;
611 			StreamInternalEnqueue(mtcp->closeq_int, stream);
612 			delayed++;
613 			if (sndvar->on_control_list)
614 				control++;
615 			if (sndvar->on_send_list)
616 				send++;
617 			if (sndvar->on_ack_list)
618 				ack++;
619 
620 		} else if (sndvar->on_send_list || sndvar->on_ack_list) {
621 			handled++;
622 			if (stream->state == TCP_ST_ESTABLISHED) {
623 				stream->state = TCP_ST_FIN_WAIT_1;
624 				stream->cb_events |= MOS_ON_TCP_STATE_CHANGE;
625 				TRACE_STATE("Stream %d: TCP_ST_FIN_WAIT_1\n", stream->id);
626 
627 			} else if (stream->state == TCP_ST_CLOSE_WAIT) {
628 				stream->state = TCP_ST_LAST_ACK;
629 				stream->cb_events |= MOS_ON_TCP_STATE_CHANGE;
630 				TRACE_STATE("Stream %d: TCP_ST_LAST_ACK\n", stream->id);
631 			}
632 			stream->control_list_waiting = TRUE;
633 
634 		} else if (stream->state != TCP_ST_CLOSED_RSVD) {
635 			handled++;
636 			if (stream->state == TCP_ST_ESTABLISHED) {
637 				stream->state = TCP_ST_FIN_WAIT_1;
638 				stream->cb_events |= MOS_ON_TCP_STATE_CHANGE;
639 				TRACE_STATE("Stream %d: TCP_ST_FIN_WAIT_1\n", stream->id);
640 
641 			} else if (stream->state == TCP_ST_CLOSE_WAIT) {
642 				stream->state = TCP_ST_LAST_ACK;
643 				stream->cb_events |= MOS_ON_TCP_STATE_CHANGE;
644 				TRACE_STATE("Stream %d: TCP_ST_LAST_ACK\n", stream->id);
645 			}
646 			//sndvar->rto = TCP_FIN_RTO;
647 			//UpdateRetransmissionTimer(mtcp, stream, mtcp->cur_ts);
648 			AddtoControlList(mtcp, stream, cur_ts);
649 		} else {
650 			TRACE_ERROR("Already closed connection!\n");
651 		}
652 	}
653 	TRACE_ROUND("Handling close connections. cnt: %d\n", cnt);
654 
655 	cnt = 0;
656 	max_cnt = mtcp->closeq_int->count;
657 	while (cnt++ < max_cnt) {
658 		stream = StreamInternalDequeue(mtcp->closeq_int);
659 
660 		if (stream->sndvar->on_control_list) {
661 			StreamInternalEnqueue(mtcp->closeq_int, stream);
662 
663 		} else if (stream->state != TCP_ST_CLOSED_RSVD) {
664 			handled++;
665 			stream->sndvar->on_closeq_int = FALSE;
666 			if (stream->state == TCP_ST_ESTABLISHED) {
667 				stream->state = TCP_ST_FIN_WAIT_1;
668 				stream->cb_events |= MOS_ON_TCP_STATE_CHANGE;
669 				TRACE_STATE("Stream %d: TCP_ST_FIN_WAIT_1\n", stream->id);
670 
671 			} else if (stream->state == TCP_ST_CLOSE_WAIT) {
672 				stream->state = TCP_ST_LAST_ACK;
673 				stream->cb_events |= MOS_ON_TCP_STATE_CHANGE;
674 				TRACE_STATE("Stream %d: TCP_ST_LAST_ACK\n", stream->id);
675 			}
676 			AddtoControlList(mtcp, stream, cur_ts);
677 		} else {
678 			stream->sndvar->on_closeq_int = FALSE;
679 			TRACE_ERROR("Already closed connection!\n");
680 		}
681 	}
682 
683 	/* reset handling */
684 	while ((stream = StreamDequeue(mtcp->resetq))) {
685 		stream->sndvar->on_resetq = FALSE;
686 
687 		if (g_config.mos->tcp_timeout > 0)
688 			RemoveFromTimeoutList(mtcp, stream);
689 
690 		if (stream->have_reset) {
691 			if (stream->state != TCP_ST_CLOSED_RSVD) {
692 				stream->close_reason = TCP_RESET;
693 				stream->state = TCP_ST_CLOSED_RSVD;
694 				stream->cb_events |= MOS_ON_TCP_STATE_CHANGE;
695 				TRACE_STATE("Stream %d: TCP_ST_CLOSED_RSVD\n", stream->id);
696 				DestroyTCPStream(mtcp, stream);
697 			} else {
698 				TRACE_ERROR("Stream already closed.\n");
699 			}
700 
701 		} else if (stream->sndvar->on_control_list ||
702 				stream->sndvar->on_send_list || stream->sndvar->on_ack_list) {
703 			/* wait until all the queues are flushed */
704 			stream->sndvar->on_resetq_int = TRUE;
705 			StreamInternalEnqueue(mtcp->resetq_int, stream);
706 
707 		} else {
708 			if (stream->state != TCP_ST_CLOSED_RSVD) {
709 				stream->close_reason = TCP_ACTIVE_CLOSE;
710 				stream->state = TCP_ST_CLOSED_RSVD;
711 				stream->cb_events |= MOS_ON_TCP_STATE_CHANGE;
712 				TRACE_STATE("Stream %d: TCP_ST_CLOSED_RSVD\n", stream->id);
713 				AddtoControlList(mtcp, stream, cur_ts);
714 			} else {
715 				TRACE_ERROR("Stream already closed.\n");
716 			}
717 		}
718 	}
719 	TRACE_ROUND("Handling reset connections. cnt: %d\n", cnt);
720 
721 	cnt = 0;
722 	max_cnt = mtcp->resetq_int->count;
723 	while (cnt++ < max_cnt) {
724 		stream = StreamInternalDequeue(mtcp->resetq_int);
725 
726 		if (stream->sndvar->on_control_list ||
727 				stream->sndvar->on_send_list || stream->sndvar->on_ack_list) {
728 			/* wait until all the queues are flushed */
729 			StreamInternalEnqueue(mtcp->resetq_int, stream);
730 
731 		} else {
732 			stream->sndvar->on_resetq_int = FALSE;
733 
734 			if (stream->state != TCP_ST_CLOSED_RSVD) {
735 				stream->close_reason = TCP_ACTIVE_CLOSE;
736 				stream->state = TCP_ST_CLOSED_RSVD;
737 				stream->cb_events |= MOS_ON_TCP_STATE_CHANGE;
738 				TRACE_STATE("Stream %d: TCP_ST_CLOSED_RSVD\n", stream->id);
739 				AddtoControlList(mtcp, stream, cur_ts);
740 			} else {
741 				TRACE_ERROR("Stream already closed.\n");
742 			}
743 		}
744 	}
745 
746 	/* destroy streams in destroyq */
747 	while ((stream = StreamDequeue(mtcp->destroyq))) {
748 		DestroyTCPStream(mtcp, stream);
749 	}
750 
751 	mtcp->wakeup_flag = FALSE;
752 }
753 /*----------------------------------------------------------------------------*/
754 static inline void
755 WritePacketsToChunks(mtcp_manager_t mtcp, uint32_t cur_ts)
756 {
757 	int thresh = g_config.mos->max_concurrency;
758 	int i;
759 
760 	/* Set the threshold to g_config.mos->max_concurrency to send ACK immediately */
761 	/* Otherwise, set to appropriate value (e.g. thresh) */
762 	assert(mtcp->g_sender != NULL);
763 	if (mtcp->g_sender->control_list_cnt)
764 		WriteTCPControlList(mtcp, mtcp->g_sender, cur_ts, thresh);
765 	if (mtcp->g_sender->ack_list_cnt)
766 		WriteTCPACKList(mtcp, mtcp->g_sender, cur_ts, thresh);
767 	if (mtcp->g_sender->send_list_cnt)
768 		WriteTCPDataList(mtcp, mtcp->g_sender, cur_ts, thresh);
769 
770 	for (i = 0; i < g_config.mos->netdev_table->num; i++) {
771 		assert(mtcp->n_sender[i] != NULL);
772 		if (mtcp->n_sender[i]->control_list_cnt)
773 			WriteTCPControlList(mtcp, mtcp->n_sender[i], cur_ts, thresh);
774 		if (mtcp->n_sender[i]->ack_list_cnt)
775 			WriteTCPACKList(mtcp, mtcp->n_sender[i], cur_ts, thresh);
776 		if (mtcp->n_sender[i]->send_list_cnt)
777 			WriteTCPDataList(mtcp, mtcp->n_sender[i], cur_ts, thresh);
778 	}
779 }
780 /*----------------------------------------------------------------------------*/
781 #if TESTING
782 static int
783 DestroyRemainingFlows(mtcp_manager_t mtcp)
784 {
785 	struct hashtable *ht = mtcp->tcp_flow_table;
786 	tcp_stream *walk;
787 	int cnt, i;
788 
789 	cnt = 0;
790 
791 	thread_printf(mtcp, mtcp->log_fp,
792 			"CPU %d: Flushing remaining flows.\n", mtcp->ctx->cpu);
793 
794 	for (i = 0; i < NUM_BINS; i++) {
795 		TAILQ_FOREACH(walk, &ht->ht_table[i], rcvvar->he_link) {
796 			thread_printf(mtcp, mtcp->log_fp,
797 					"CPU %d: Destroying stream %d\n", mtcp->ctx->cpu, walk->id);
798 			DumpStream(mtcp, walk);
799 			DestroyTCPStream(mtcp, walk);
800 			cnt++;
801 		}
802 	}
803 
804 	return cnt;
805 }
806 #endif
807 /*----------------------------------------------------------------------------*/
808 static void
809 InterruptApplication(mtcp_manager_t mtcp)
810 {
811 	/* interrupt if the mtcp_epoll_wait() is waiting */
812 	if (mtcp->ep) {
813 		pthread_mutex_lock(&mtcp->ep->epoll_lock);
814 		if (mtcp->ep->waiting) {
815 			pthread_cond_signal(&mtcp->ep->epoll_cond);
816 		}
817 		pthread_mutex_unlock(&mtcp->ep->epoll_lock);
818 	}
819 	/* interrupt if the accept() is waiting */
820 	if (mtcp->listener) {
821 		if (mtcp->listener->socket) {
822 			pthread_mutex_lock(&mtcp->listener->accept_lock);
823 			if (!(mtcp->listener->socket->opts & MTCP_NONBLOCK)) {
824 				pthread_cond_signal(&mtcp->listener->accept_cond);
825 			}
826 			pthread_mutex_unlock(&mtcp->listener->accept_lock);
827 		}
828 	}
829 }
830 /*----------------------------------------------------------------------------*/
831 void
832 RunPassiveLoop(mtcp_manager_t mtcp)
833 {
834 	sem_wait(&g_done_sem[mtcp->ctx->cpu]);
835 	sem_destroy(&g_done_sem[mtcp->ctx->cpu]);
836 	return;
837 }
838 /*----------------------------------------------------------------------------*/
839 static void
840 RunMainLoop(struct mtcp_thread_context *ctx)
841 {
842 	mtcp_manager_t mtcp = ctx->mtcp_manager;
843 	int i;
844 	int recv_cnt;
845 
846 #if E_PSIO
847 	int rx_inf, tx_inf;
848 #endif
849 
850 	struct timeval cur_ts = {0};
851 	uint32_t ts, ts_prev;
852 
853 #if TIME_STAT
854 	struct timeval prev_ts, processing_ts, tcheck_ts,
855 				   epoll_ts, handle_ts, xmit_ts, select_ts;
856 #endif
857 	int thresh;
858 
859 	gettimeofday(&cur_ts, NULL);
860 
861 #if E_PSIO
862 	/* do nothing */
863 #else
864 #if !USE_CHUNK_BUF
865 	/* create packet write chunk */
866 	InitWriteChunks(handle, ctx->w_chunk);
867 	for (i = 0; i < ETH_NUM; i++) {
868 		ctx->w_chunk[i].cnt = 0;
869 		ctx->w_off[i] = 0;
870 		ctx->w_cur_idx[i] = 0;
871 	}
872 #endif
873 #endif
874 
875 	TRACE_DBG("CPU %d: mtcp thread running.\n", ctx->cpu);
876 
877 #if TIME_STAT
878 	prev_ts = cur_ts;
879 	InitStatCounter(&mtcp->rtstat.round);
880 	InitStatCounter(&mtcp->rtstat.processing);
881 	InitStatCounter(&mtcp->rtstat.tcheck);
882 	InitStatCounter(&mtcp->rtstat.epoll);
883 	InitStatCounter(&mtcp->rtstat.handle);
884 	InitStatCounter(&mtcp->rtstat.xmit);
885 	InitStatCounter(&mtcp->rtstat.select);
886 #endif
887 
888 	ts = ts_prev = 0;
889 	while ((!ctx->done || mtcp->flow_cnt) && !ctx->exit) {
890 
891 		STAT_COUNT(mtcp->runstat.rounds);
892 		recv_cnt = 0;
893 
894 #if E_PSIO
895 #if 0
896 		event.timeout = PS_SELECT_TIMEOUT;
897 		NID_ZERO(event.rx_nids);
898 		NID_ZERO(event.tx_nids);
899 		NID_ZERO(rx_avail);
900 		//NID_ZERO(tx_avail);
901 #endif
902 		gettimeofday(&cur_ts, NULL);
903 #if TIME_STAT
904 		/* measure the inter-round delay */
905 		UpdateStatCounter(&mtcp->rtstat.round, TimeDiffUs(&cur_ts, &prev_ts));
906 		prev_ts = cur_ts;
907 #endif
908 
909 		ts = TIMEVAL_TO_TS(&cur_ts);
910 		mtcp->cur_ts = ts;
911 
912 		for (rx_inf = 0; rx_inf < g_config.mos->netdev_table->num; rx_inf++) {
913 
914 			recv_cnt = mtcp->iom->recv_pkts(ctx, rx_inf);
915 			STAT_COUNT(mtcp->runstat.rounds_rx_try);
916 
917 			for (i = 0; i < recv_cnt; i++) {
918 				uint16_t len;
919 				uint8_t *pktbuf;
920 				pktbuf = mtcp->iom->get_rptr(mtcp->ctx, rx_inf, i, &len);
921 				ProcessPacket(mtcp, rx_inf, i, ts, pktbuf, len);
922 			}
923 
924 #ifdef ENABLE_DPDKR
925 			mtcp->iom->send_pkts(ctx, rx_inf);
926 			continue;
927 #endif
928 		}
929 		STAT_COUNT(mtcp->runstat.rounds_rx);
930 
931 #else /* E_PSIO */
932 		gettimeofday(&cur_ts, NULL);
933 		ts = TIMEVAL_TO_TS(&cur_ts);
934 		mtcp->cur_ts = ts;
935 		/*
936 		 * Read packets into a chunk from NIC
937 		 * chk_w_idx : next chunk index to write packets from NIC
938 		 */
939 
940 		STAT_COUNT(mtcp->runstat.rounds_rx_try);
941 		chunk.cnt = PS_CHUNK_SIZE;
942 		recv_cnt = ps_recv_chunk(handle, &chunk);
943 		if (recv_cnt < 0) {
944 			if (errno != EAGAIN && errno != EINTR) {
945 				perror("ps_recv_chunk");
946 				assert(0);
947 			}
948 		}
949 
950 		/*
951 		 * Handling Packets
952 		 * chk_r_idx : next chunk index to read and handle
953 		*/
954 		for (i = 0; i < recv_cnt; i++) {
955 			ProcessPacket(mtcp, chunk.queue.ifindex, ts,
956 					(u_char *)(chunk.buf + chunk.info[i].offset),
957 					chunk.info[i].len);
958 		}
959 
960 		if (recv_cnt > 0)
961 			STAT_COUNT(mtcp->runstat.rounds_rx);
962 #endif /* E_PSIO */
963 #if TIME_STAT
964 		gettimeofday(&processing_ts, NULL);
965 		UpdateStatCounter(&mtcp->rtstat.processing,
966 				TimeDiffUs(&processing_ts, &cur_ts));
967 #endif /* TIME_STAT */
968 
969 		/* Handle user defined timeout */
970 		struct timer *walk, *tmp;
971 		for (walk = TAILQ_FIRST(&mtcp->timer_list); walk != NULL; walk = tmp) {
972 			tmp = TAILQ_NEXT(walk, timer_link);
973 			if (TIMEVAL_LT(&cur_ts, &walk->exp))
974 				break;
975 
976 			struct mtcp_context mctx = {.cpu = ctx->cpu};
977 			walk->cb(&mctx, walk->id, 0, 0 /* FIXME */, NULL);
978 			DelTimer(mtcp, walk);
979 		}
980 
981 		/* interaction with application */
982 		if (mtcp->flow_cnt > 0) {
983 
984 			/* check retransmission timeout and timewait expire */
985 #if 0
986 			thresh = (int)mtcp->flow_cnt / (TS_TO_USEC(PER_STREAM_TCHECK));
987 			assert(thresh >= 0);
988 			if (thresh == 0)
989 				thresh = 1;
990 			if (recv_cnt > 0 && thresh > recv_cnt)
991 				thresh = recv_cnt;
992 #else
993 			thresh = g_config.mos->max_concurrency;
994 #endif
995 
996 			/* Eunyoung, you may fix this later
997 			 * if there is no rcv packet, we will send as much as possible
998 			 */
999 			if (thresh == -1)
1000 				thresh = g_config.mos->max_concurrency;
1001 
1002 			CheckRtmTimeout(mtcp, ts, thresh);
1003 			CheckTimewaitExpire(mtcp, ts, thresh);
1004 
1005 			if (g_config.mos->tcp_timeout > 0 && ts != ts_prev) {
1006 				CheckConnectionTimeout(mtcp, ts, thresh);
1007 			}
1008 
1009 #if TIME_STAT
1010 		}
1011 		gettimeofday(&tcheck_ts, NULL);
1012 		UpdateStatCounter(&mtcp->rtstat.tcheck,
1013 				TimeDiffUs(&tcheck_ts, &processing_ts));
1014 
1015 		if (mtcp->flow_cnt > 0) {
1016 #endif /* TIME_STAT */
1017 
1018 		}
1019 
1020 		/*
1021 		 * before flushing epoll events, call monitor events for
1022 		 * all registered `read` events
1023 		 */
1024 		if (mtcp->num_msp > 0)
1025 			/* call this when only a standalone monitor is running */
1026 			FlushMonitorReadEvents(mtcp);
1027 
1028 		/* if epoll is in use, flush all the queued events */
1029 		if (mtcp->ep) {
1030 			FlushBufferedReadEvents(mtcp);
1031 			FlushEpollEvents(mtcp, ts);
1032 		}
1033 #if TIME_STAT
1034 		gettimeofday(&epoll_ts, NULL);
1035 		UpdateStatCounter(&mtcp->rtstat.epoll,
1036 				TimeDiffUs(&epoll_ts, &tcheck_ts));
1037 #endif /* TIME_STAT */
1038 
1039 		if (end_app_exists && mtcp->flow_cnt > 0) {
1040 			/* handle stream queues  */
1041 			HandleApplicationCalls(mtcp, ts);
1042 		}
1043 
1044 #ifdef ENABLE_DPDKR
1045 		continue;
1046 #endif
1047 
1048 #if TIME_STAT
1049 		gettimeofday(&handle_ts, NULL);
1050 		UpdateStatCounter(&mtcp->rtstat.handle,
1051 				TimeDiffUs(&handle_ts, &epoll_ts));
1052 #endif /* TIME_STAT */
1053 
1054 		WritePacketsToChunks(mtcp, ts);
1055 
1056 		/* send packets from write buffer */
1057 #if E_PSIO
1058 		/* With E_PSIO, send until tx is available */
1059 		int num_dev = g_config.mos->netdev_table->num;
1060 		if (likely(mtcp->iom->send_pkts != NULL))
1061 			for (tx_inf = 0; tx_inf < num_dev; tx_inf++) {
1062 				mtcp->iom->send_pkts(ctx, tx_inf);
1063 			}
1064 
1065 #else /* E_PSIO */
1066 		/* Without E_PSIO, try send chunks immediately */
1067 		for (i = 0; i < g_config.mos->netdev_table->num; i++) {
1068 #if USE_CHUNK_BUF
1069 			/* in the case of using ps_send_chunk_buf() without E_PSIO */
1070 			ret = FlushSendChunkBuf(mtcp, i);
1071 #else
1072 			/* if not using ps_send_chunk_buf() */
1073 			ret = FlushWriteBuffer(ctx, i);
1074 #endif
1075 			if (ret < 0) {
1076 				TRACE_ERROR("Failed to send chunks.\n");
1077 			} else if (ret > 0) {
1078 				STAT_COUNT(mtcp->runstat.rounds_tx);
1079 			}
1080 		}
1081 #endif /* E_PSIO */
1082 
1083 #if TIME_STAT
1084 		gettimeofday(&xmit_ts, NULL);
1085 		UpdateStatCounter(&mtcp->rtstat.xmit,
1086 				TimeDiffUs(&xmit_ts, &handle_ts));
1087 #endif /* TIME_STAT */
1088 
1089 		if (ts != ts_prev) {
1090 			ts_prev = ts;
1091 #ifdef NETSTAT
1092 			if (ctx->cpu == printer) {
1093 #ifdef RUN_ARP
1094 				ARPTimer(mtcp, ts);
1095 #endif
1096 #ifdef NETSTAT
1097 				PrintNetworkStats(mtcp, ts);
1098 #endif
1099 			}
1100 #endif /* NETSTAT */
1101 		}
1102 
1103 		if (mtcp->iom->select)
1104 			mtcp->iom->select(ctx);
1105 
1106 		if (ctx->interrupt) {
1107 			InterruptApplication(mtcp);
1108 		}
1109 	}
1110 
1111 #if TESTING
1112 	DestroyRemainingFlows(mtcp);
1113 #endif
1114 
1115 	TRACE_DBG("MTCP thread %d out of main loop.\n", ctx->cpu);
1116 	/* flush logs */
1117 	flush_log_data(mtcp);
1118 	TRACE_DBG("MTCP thread %d flushed logs.\n", ctx->cpu);
1119 	InterruptApplication(mtcp);
1120 	TRACE_INFO("MTCP thread %d finished.\n", ctx->cpu);
1121 }
1122 /*----------------------------------------------------------------------------*/
1123 struct mtcp_sender *
1124 CreateMTCPSender(int ifidx)
1125 {
1126 	struct mtcp_sender *sender;
1127 
1128 	sender = (struct mtcp_sender *)calloc(1, sizeof(struct mtcp_sender));
1129 	if (!sender) {
1130 		return NULL;
1131 	}
1132 
1133 	sender->ifidx = ifidx;
1134 
1135 	TAILQ_INIT(&sender->control_list);
1136 	TAILQ_INIT(&sender->send_list);
1137 	TAILQ_INIT(&sender->ack_list);
1138 
1139 	sender->control_list_cnt = 0;
1140 	sender->send_list_cnt = 0;
1141 	sender->ack_list_cnt = 0;
1142 
1143 	return sender;
1144 }
1145 /*----------------------------------------------------------------------------*/
/* Release a sender context created by CreateMTCPSender().
 * free(NULL) is a no-op, so passing NULL is safe.
 * NOTE(review): the control/send/ack lists are not drained here —
 * presumably they are empty by teardown time; confirm against callers. */
void
DestroyMTCPSender(struct mtcp_sender *sender)
{
	free(sender);
}
1151 /*----------------------------------------------------------------------------*/
1152 static mtcp_manager_t
1153 InitializeMTCPManager(struct mtcp_thread_context* ctx)
1154 {
1155 	mtcp_manager_t mtcp;
1156 	char log_name[MAX_FILE_NAME];
1157 	int i;
1158 
1159 	posix_seq_srand((unsigned)pthread_self());
1160 
1161 	mtcp = (mtcp_manager_t)calloc(1, sizeof(struct mtcp_manager));
1162 	if (!mtcp) {
1163 		perror("malloc");
1164 		CTRACE_ERROR("Failed to allocate mtcp_manager.\n");
1165 		return NULL;
1166 	}
1167 	g_mtcp[ctx->cpu] = mtcp;
1168 
1169 	mtcp->tcp_flow_table = CreateHashtable();
1170 	if (!mtcp->tcp_flow_table) {
1171 		CTRACE_ERROR("Falied to allocate tcp flow table.\n");
1172 		return NULL;
1173 	}
1174 
1175 #ifdef HUGEPAGE
1176 #define	IS_HUGEPAGE 1
1177 #else
1178 #define	IS_HUGEPAGE 0
1179 #endif
1180 	if (mon_app_exists) {
1181 		/* initialize event callback */
1182 #ifdef NEWEV
1183 		InitEvent(mtcp);
1184 #else
1185 		InitEvent(mtcp, NUM_EV_TABLE);
1186 #endif
1187 	}
1188 
1189 	if (!(mtcp->bufseg_pool = MPCreate(sizeof(tcpbufseg_t),
1190 			sizeof(tcpbufseg_t) * g_config.mos->max_concurrency *
1191 			((g_config.mos->rmem_size - 1) / UNITBUFSIZE + 1), 0))) {
1192 		TRACE_ERROR("Failed to allocate ev_table pool\n");
1193 		exit(0);
1194 	}
1195 	if (!(mtcp->sockent_pool = MPCreate(sizeof(struct sockent),
1196 			sizeof(struct sockent) * g_config.mos->max_concurrency * 3, 0))) {
1197 		TRACE_ERROR("Failed to allocate ev_table pool\n");
1198 		exit(0);
1199 	}
1200 #ifdef USE_TIMER_POOL
1201 	if (!(mtcp->timer_pool = MPCreate(sizeof(struct timer),
1202 					  sizeof(struct timer) * g_config.mos->max_concurrency * 10, 0))) {
1203 		TRACE_ERROR("Failed to allocate ev_table pool\n");
1204 		exit(0);
1205 	}
1206 #endif
1207 	mtcp->flow_pool = MPCreate(sizeof(tcp_stream),
1208 								sizeof(tcp_stream) * g_config.mos->max_concurrency, IS_HUGEPAGE);
1209 	if (!mtcp->flow_pool) {
1210 		CTRACE_ERROR("Failed to allocate tcp flow pool.\n");
1211 		return NULL;
1212 	}
1213 	mtcp->rv_pool = MPCreate(sizeof(struct tcp_recv_vars),
1214 			sizeof(struct tcp_recv_vars) * g_config.mos->max_concurrency, IS_HUGEPAGE);
1215 	if (!mtcp->rv_pool) {
1216 		CTRACE_ERROR("Failed to allocate tcp recv variable pool.\n");
1217 		return NULL;
1218 	}
1219 	mtcp->sv_pool = MPCreate(sizeof(struct tcp_send_vars),
1220 			sizeof(struct tcp_send_vars) * g_config.mos->max_concurrency, IS_HUGEPAGE);
1221 	if (!mtcp->sv_pool) {
1222 		CTRACE_ERROR("Failed to allocate tcp send variable pool.\n");
1223 		return NULL;
1224 	}
1225 
1226 	mtcp->rbm_snd = SBManagerCreate(g_config.mos->wmem_size, g_config.mos->no_ring_buffers,
1227 					g_config.mos->max_concurrency);
1228 	if (!mtcp->rbm_snd) {
1229 		CTRACE_ERROR("Failed to create send ring buffer.\n");
1230 		return NULL;
1231 	}
1232 #ifndef NEWRB
1233 	mtcp->rbm_rcv = RBManagerCreate(g_config.mos->rmem_size, g_config.mos->no_ring_buffers,
1234 					g_config.mos->max_concurrency);
1235 	if (!mtcp->rbm_rcv) {
1236 		CTRACE_ERROR("Failed to create recv ring buffer.\n");
1237 		return NULL;
1238 	}
1239 #endif
1240 
1241 	mtcp->smap = (socket_map_t)calloc(g_config.mos->max_concurrency, sizeof(struct socket_map));
1242 	if (!mtcp->smap) {
1243 		perror("calloc");
1244 		CTRACE_ERROR("Failed to allocate memory for stream map.\n");
1245 		return NULL;
1246 	}
1247 
1248 	if (mon_app_exists) {
1249 		mtcp->msmap = (socket_map_t)calloc(g_config.mos->max_concurrency, sizeof(struct socket_map));
1250 		if (!mtcp->msmap) {
1251 			perror("calloc");
1252 			CTRACE_ERROR("Failed to allocate memory for monitor stream map.\n");
1253 			return NULL;
1254 		}
1255 
1256 		for (i = 0; i < g_config.mos->max_concurrency; i++) {
1257 			mtcp->msmap[i].monitor_stream = calloc(1, sizeof(struct mon_stream));
1258 			if (!mtcp->msmap[i].monitor_stream) {
1259 				perror("calloc");
1260 				CTRACE_ERROR("Failed to allocate memory for monitr stream map\n");
1261 				return NULL;
1262 			}
1263 		}
1264 	}
1265 
1266 	TAILQ_INIT(&mtcp->timer_list);
1267 	TAILQ_INIT(&mtcp->monitors);
1268 
1269 	TAILQ_INIT(&mtcp->free_smap);
1270 	for (i = 0; i < g_config.mos->max_concurrency; i++) {
1271 		mtcp->smap[i].id = i;
1272 		mtcp->smap[i].socktype = MOS_SOCK_UNUSED;
1273 		memset(&mtcp->smap[i].saddr, 0, sizeof(struct sockaddr_in));
1274 		mtcp->smap[i].stream = NULL;
1275 		TAILQ_INSERT_TAIL(&mtcp->free_smap, &mtcp->smap[i], link);
1276 	}
1277 
1278 	if (mon_app_exists) {
1279 		TAILQ_INIT(&mtcp->free_msmap);
1280 		for (i = 0; i < g_config.mos->max_concurrency; i++) {
1281 			mtcp->msmap[i].id = i;
1282 			mtcp->msmap[i].socktype = MOS_SOCK_UNUSED;
1283 			memset(&mtcp->msmap[i].saddr, 0, sizeof(struct sockaddr_in));
1284 			TAILQ_INSERT_TAIL(&mtcp->free_msmap, &mtcp->msmap[i], link);
1285 		}
1286 	}
1287 
1288 	mtcp->ctx = ctx;
1289 	mtcp->ep = NULL;
1290 
1291 	snprintf(log_name, MAX_FILE_NAME, "%s/"LOG_FILE_NAME"_%d",
1292 			g_config.mos->mos_log, ctx->cpu);
1293 	mtcp->log_fp = fopen(log_name, "w+");
1294 	if (!mtcp->log_fp) {
1295 		perror("fopen");
1296 		CTRACE_ERROR("Failed to create file for logging. (%s)\n", log_name);
1297 		return NULL;
1298 	}
1299 	mtcp->sp_fd = g_logctx[ctx->cpu]->pair_sp_fd;
1300 	mtcp->logger = g_logctx[ctx->cpu];
1301 
1302 	mtcp->connectq = CreateStreamQueue(BACKLOG_SIZE);
1303 	if (!mtcp->connectq) {
1304 		CTRACE_ERROR("Failed to create connect queue.\n");
1305 		return NULL;
1306 	}
1307 	mtcp->sendq = CreateStreamQueue(g_config.mos->max_concurrency);
1308 	if (!mtcp->sendq) {
1309 		CTRACE_ERROR("Failed to create send queue.\n");
1310 		return NULL;
1311 	}
1312 	mtcp->ackq = CreateStreamQueue(g_config.mos->max_concurrency);
1313 	if (!mtcp->ackq) {
1314 		CTRACE_ERROR("Failed to create ack queue.\n");
1315 		return NULL;
1316 	}
1317 	mtcp->closeq = CreateStreamQueue(g_config.mos->max_concurrency);
1318 	if (!mtcp->closeq) {
1319 		CTRACE_ERROR("Failed to create close queue.\n");
1320 		return NULL;
1321 	}
1322 	mtcp->closeq_int = CreateInternalStreamQueue(g_config.mos->max_concurrency);
1323 	if (!mtcp->closeq_int) {
1324 		CTRACE_ERROR("Failed to create close queue.\n");
1325 		return NULL;
1326 	}
1327 	mtcp->resetq = CreateStreamQueue(g_config.mos->max_concurrency);
1328 	if (!mtcp->resetq) {
1329 		CTRACE_ERROR("Failed to create reset queue.\n");
1330 		return NULL;
1331 	}
1332 	mtcp->resetq_int = CreateInternalStreamQueue(g_config.mos->max_concurrency);
1333 	if (!mtcp->resetq_int) {
1334 		CTRACE_ERROR("Failed to create reset queue.\n");
1335 		return NULL;
1336 	}
1337 	mtcp->destroyq = CreateStreamQueue(g_config.mos->max_concurrency);
1338 	if (!mtcp->destroyq) {
1339 		CTRACE_ERROR("Failed to create destroy queue.\n");
1340 		return NULL;
1341 	}
1342 
1343 	mtcp->g_sender = CreateMTCPSender(-1);
1344 	if (!mtcp->g_sender) {
1345 		CTRACE_ERROR("Failed to create global sender structure.\n");
1346 		return NULL;
1347 	}
1348 	for (i = 0; i < g_config.mos->netdev_table->num; i++) {
1349 		mtcp->n_sender[i] = CreateMTCPSender(i);
1350 		if (!mtcp->n_sender[i]) {
1351 			CTRACE_ERROR("Failed to create per-nic sender structure.\n");
1352 			return NULL;
1353 		}
1354 	}
1355 
1356 	mtcp->rto_store = InitRTOHashstore();
1357 	TAILQ_INIT(&mtcp->timewait_list);
1358 	TAILQ_INIT(&mtcp->timeout_list);
1359 
1360 	return mtcp;
1361 }
1362 /*----------------------------------------------------------------------------*/
1363 static void *
1364 MTCPRunThread(void *arg)
1365 {
1366 	mctx_t mctx = (mctx_t)arg;
1367 	int cpu = mctx->cpu;
1368 	int working;
1369 	struct mtcp_manager *mtcp;
1370 	struct mtcp_thread_context *ctx;
1371 
1372 	/* affinitize the thread to this core first */
1373 	mtcp_core_affinitize(cpu);
1374 
1375 	/* memory alloc after core affinitization would use local memory
1376 	   most time */
1377 	ctx = calloc(1, sizeof(*ctx));
1378 	if (!ctx) {
1379 		perror("calloc");
1380 		TRACE_ERROR("Failed to calloc mtcp context.\n");
1381 		exit(-1);
1382 	}
1383 	ctx->thread = pthread_self();
1384 	ctx->cpu = cpu;
1385 	mtcp = ctx->mtcp_manager = InitializeMTCPManager(ctx);
1386 	if (!mtcp) {
1387 		TRACE_ERROR("Failed to initialize mtcp manager.\n");
1388 		exit(-1);
1389 	}
1390 
1391 	/* assign mtcp context's underlying I/O module */
1392 	mtcp->iom = current_iomodule_func;
1393 
1394 	/* I/O initializing */
1395 	if (mtcp->iom->init_handle)
1396 		mtcp->iom->init_handle(ctx);
1397 
1398 	if (pthread_mutex_init(&ctx->flow_pool_lock, NULL)) {
1399 		perror("pthread_mutex_init of ctx->flow_pool_lock\n");
1400 		exit(-1);
1401 	}
1402 
1403 	if (pthread_mutex_init(&ctx->socket_pool_lock, NULL)) {
1404 		perror("pthread_mutex_init of ctx->socket_pool_lock\n");
1405 		exit(-1);
1406 	}
1407 
1408 	SQ_LOCK_INIT(&ctx->connect_lock, "ctx->connect_lock", exit(-1));
1409 	SQ_LOCK_INIT(&ctx->close_lock, "ctx->close_lock", exit(-1));
1410 	SQ_LOCK_INIT(&ctx->reset_lock, "ctx->reset_lock", exit(-1));
1411 	SQ_LOCK_INIT(&ctx->sendq_lock, "ctx->sendq_lock", exit(-1));
1412 	SQ_LOCK_INIT(&ctx->ackq_lock, "ctx->ackq_lock", exit(-1));
1413 	SQ_LOCK_INIT(&ctx->destroyq_lock, "ctx->destroyq_lock", exit(-1));
1414 
1415 	/* remember this context pointer for signal processing */
1416 	g_pctx[cpu] = ctx;
1417 	mlockall(MCL_CURRENT);
1418 
1419 	// attach (nic device, queue)
1420 	working = AttachDevice(ctx);
1421 	if (working != 0) {
1422 		sem_post(&g_init_sem[ctx->cpu]);
1423 		TRACE_DBG("MTCP thread %d finished. Not attached any device\n", ctx->cpu);
1424 		pthread_exit(NULL);
1425 	}
1426 
1427 	TRACE_DBG("CPU %d: initialization finished.\n", cpu);
1428 	sem_post(&g_init_sem[ctx->cpu]);
1429 
1430 	/* start the main loop */
1431 	RunMainLoop(ctx);
1432 
1433 	TRACE_DBG("MTCP thread %d finished.\n", ctx->cpu);
1434 
1435 	/* signaling mTCP thread is done */
1436 	sem_post(&g_done_sem[mctx->cpu]);
1437 
1438 	//pthread_exit(NULL);
1439 	return 0;
1440 }
1441 /*----------------------------------------------------------------------------*/
#ifdef ENABLE_DPDK
/* rte_eal_remote_launch() expects an int (*)(void *) entry point, while
 * MTCPRunThread() uses the pthread void *(*)(void *) signature; this thin
 * adapter bridges the two for DPDK lcore launches. The return value of
 * MTCPRunThread() is intentionally discarded. */
static int MTCPDPDKRunThread(void *arg)
{
	MTCPRunThread(arg);
	return 0;
}
#endif /* !ENABLE_DPDK */
1449 /*----------------------------------------------------------------------------*/
1450 mctx_t
1451 mtcp_create_context(int cpu)
1452 {
1453 	mctx_t mctx;
1454 	int ret;
1455 
1456 	if (cpu >=  g_config.mos->num_cores) {
1457 		TRACE_ERROR("Failed initialize new mtcp context. "
1458 					"Requested cpu id %d exceed the number of cores %d configured to use.\n",
1459 					cpu, g_config.mos->num_cores);
1460 		return NULL;
1461 	}
1462 
1463         /* check if mtcp_create_context() was already initialized */
1464         if (g_logctx[cpu] != NULL) {
1465                 TRACE_ERROR("%s was already initialized before!\n",
1466                             __FUNCTION__);
1467                 return NULL;
1468         }
1469 
1470 	ret = sem_init(&g_init_sem[cpu], 0, 0);
1471 	if (ret) {
1472 		TRACE_ERROR("Failed initialize init_sem.\n");
1473 		return NULL;
1474 	}
1475 
1476 	ret = sem_init(&g_done_sem[cpu], 0, 0);
1477 	if (ret) {
1478 		TRACE_ERROR("Failed initialize done_sem.\n");
1479 		return NULL;
1480 	}
1481 
1482 	mctx = (mctx_t)calloc(1, sizeof(struct mtcp_context));
1483 	if (!mctx) {
1484 		TRACE_ERROR("Failed to allocate memory for mtcp_context.\n");
1485 		return NULL;
1486 	}
1487 	mctx->cpu = cpu;
1488 	g_ctx[cpu] = mctx;
1489 
1490 	/* initialize logger */
1491 	g_logctx[cpu] = (struct log_thread_context *)
1492 			calloc(1, sizeof(struct log_thread_context));
1493 	if (!g_logctx[cpu]) {
1494 		perror("malloc");
1495 		TRACE_ERROR("Failed to allocate memory for log thread context.\n");
1496 		return NULL;
1497 	}
1498 	InitLogThreadContext(g_logctx[cpu], cpu);
1499 	if (pthread_create(&log_thread[cpu],
1500 			   NULL, ThreadLogMain, (void *)g_logctx[cpu])) {
1501 		perror("pthread_create");
1502 		TRACE_ERROR("Failed to create log thread\n");
1503 		return NULL;
1504 	}
1505 
1506 #ifdef ENABLE_DPDK
1507 	/* Wake up mTCP threads (wake up I/O threads) */
1508 	if (current_iomodule_func == &dpdk_module_func) {
1509 		int master;
1510 		master = rte_get_master_lcore();
1511 		if (master == cpu) {
1512 			lcore_config[master].ret = 0;
1513 			lcore_config[master].state = FINISHED;
1514 			if (pthread_create(&g_thread[cpu],
1515 					   NULL, MTCPRunThread, (void *)mctx) != 0) {
1516 				TRACE_ERROR("pthread_create of mtcp thread failed!\n");
1517 				return NULL;
1518 			}
1519 		} else
1520 			rte_eal_remote_launch(MTCPDPDKRunThread, mctx, cpu);
1521 	} else
1522 #endif /* !ENABLE_DPDK */
1523 		{
1524 			if (pthread_create(&g_thread[cpu],
1525 					   NULL, MTCPRunThread, (void *)mctx) != 0) {
1526 				TRACE_ERROR("pthread_create of mtcp thread failed!\n");
1527 				return NULL;
1528 			}
1529 		}
1530 
1531 	sem_wait(&g_init_sem[cpu]);
1532 	sem_destroy(&g_init_sem[cpu]);
1533 
1534 	running[cpu] = TRUE;
1535 
1536 #ifdef NETSTAT
1537 #if NETSTAT_TOTAL
1538 	if (printer < 0) {
1539 		printer = cpu;
1540 		TRACE_INFO("CPU %d is in charge of printing stats.\n", printer);
1541 	}
1542 #endif
1543 #endif
1544 
1545 	return mctx;
1546 }
1547 /*----------------------------------------------------------------------------*/
1548 /**
1549  * TODO: It currently always returns 0. Add appropriate error return values
1550  */
1551 int
1552 mtcp_destroy_context(mctx_t mctx)
1553 {
1554 	struct mtcp_thread_context *ctx = g_pctx[mctx->cpu];
1555 	struct mtcp_manager *mtcp = ctx->mtcp_manager;
1556 	struct log_thread_context *log_ctx = mtcp->logger;
1557 	int ret, i;
1558 
1559 	TRACE_DBG("CPU %d: mtcp_destroy_context()\n", mctx->cpu);
1560 
1561 	/* close all stream sockets that are still open */
1562 	if (!ctx->exit) {
1563 		for (i = 0; i < g_config.mos->max_concurrency; i++) {
1564 			if (mtcp->smap[i].socktype == MOS_SOCK_STREAM) {
1565 				TRACE_DBG("Closing remaining socket %d (%s)\n",
1566 						i, TCPStateToString(mtcp->smap[i].stream));
1567 				DumpStream(mtcp, mtcp->smap[i].stream);
1568 				mtcp_close(mctx, i);
1569 			}
1570 		}
1571 	}
1572 
1573 	ctx->done = 1;
1574 
1575 	//pthread_kill(g_thread[mctx->cpu], SIGINT);
1576 #ifdef ENABLE_DPDK
1577 	ctx->exit = 1;
1578 	/* XXX - dpdk logic changes */
1579 	if (current_iomodule_func == &dpdk_module_func) {
1580 		int master = rte_get_master_lcore();
1581 		if (master == mctx->cpu)
1582 			pthread_join(g_thread[mctx->cpu], NULL);
1583 		else
1584 			rte_eal_wait_lcore(mctx->cpu);
1585 	} else
1586 #endif /* !ENABLE_DPDK */
1587 		{
1588 			pthread_join(g_thread[mctx->cpu], NULL);
1589 		}
1590 
1591 	TRACE_INFO("MTCP thread %d joined.\n", mctx->cpu);
1592 	running[mctx->cpu] = FALSE;
1593 
1594 #ifdef NETSTAT
1595 #if NETSTAT_TOTAL
1596 	if (printer == mctx->cpu) {
1597 		for (i = 0; i < num_cpus; i++) {
1598 			if (i != mctx->cpu && running[i]) {
1599 				printer = i;
1600 				break;
1601 			}
1602 		}
1603 	}
1604 #endif
1605 #endif
1606 
1607 	log_ctx->done = 1;
1608 	ret = write(log_ctx->pair_sp_fd, "F", 1);
1609 	if (ret != 1)
1610 		TRACE_ERROR("CPU %d: Fail to signal socket pair\n", mctx->cpu);
1611 
1612 	pthread_join(log_thread[ctx->cpu], NULL);
1613 	fclose(mtcp->log_fp);
1614 	TRACE_LOG("Log thread %d joined.\n", mctx->cpu);
1615 
1616 	if (mtcp->connectq) {
1617 		DestroyStreamQueue(mtcp->connectq);
1618 		mtcp->connectq = NULL;
1619 	}
1620 	if (mtcp->sendq) {
1621 		DestroyStreamQueue(mtcp->sendq);
1622 		mtcp->sendq = NULL;
1623 	}
1624 	if (mtcp->ackq) {
1625 		DestroyStreamQueue(mtcp->ackq);
1626 		mtcp->ackq = NULL;
1627 	}
1628 	if (mtcp->closeq) {
1629 		DestroyStreamQueue(mtcp->closeq);
1630 		mtcp->closeq = NULL;
1631 	}
1632 	if (mtcp->closeq_int) {
1633 		DestroyInternalStreamQueue(mtcp->closeq_int);
1634 		mtcp->closeq_int = NULL;
1635 	}
1636 	if (mtcp->resetq) {
1637 		DestroyStreamQueue(mtcp->resetq);
1638 		mtcp->resetq = NULL;
1639 	}
1640 	if (mtcp->resetq_int) {
1641 		DestroyInternalStreamQueue(mtcp->resetq_int);
1642 		mtcp->resetq_int = NULL;
1643 	}
1644 	if (mtcp->destroyq) {
1645 		DestroyStreamQueue(mtcp->destroyq);
1646 		mtcp->destroyq = NULL;
1647 	}
1648 
1649 	DestroyMTCPSender(mtcp->g_sender);
1650 	for (i = 0; i < g_config.mos->netdev_table->num; i++) {
1651 		DestroyMTCPSender(mtcp->n_sender[i]);
1652 	}
1653 
1654 	MPDestroy(mtcp->rv_pool);
1655 	MPDestroy(mtcp->sv_pool);
1656 	MPDestroy(mtcp->flow_pool);
1657 
1658 	if (mtcp->ap) {
1659 		DestroyAddressPool(mtcp->ap);
1660 	}
1661 
1662 	SQ_LOCK_DESTROY(&ctx->connect_lock);
1663 	SQ_LOCK_DESTROY(&ctx->close_lock);
1664 	SQ_LOCK_DESTROY(&ctx->reset_lock);
1665 	SQ_LOCK_DESTROY(&ctx->sendq_lock);
1666 	SQ_LOCK_DESTROY(&ctx->ackq_lock);
1667 	SQ_LOCK_DESTROY(&ctx->destroyq_lock);
1668 
1669 	//TRACE_INFO("MTCP thread %d destroyed.\n", mctx->cpu);
1670 	if (mtcp->iom->destroy_handle)
1671 		mtcp->iom->destroy_handle(ctx);
1672 	free(ctx);
1673 	free(mctx);
1674 
1675 	return 0;
1676 }
1677 /*----------------------------------------------------------------------------*/
1678 mtcp_sighandler_t
1679 mtcp_register_signal(int signum, mtcp_sighandler_t handler)
1680 {
1681 	mtcp_sighandler_t prev;
1682 
1683 	if (signum == SIGINT) {
1684 		prev = app_signal_handler;
1685 		app_signal_handler = handler;
1686 	} else {
1687 		if ((prev = signal(signum, handler)) == SIG_ERR) {
1688 			perror("signal");
1689 			return SIG_ERR;
1690 		}
1691 	}
1692 
1693 	return prev;
1694 }
1695 /*----------------------------------------------------------------------------*/
1696 int
1697 mtcp_getconf(struct mtcp_conf *conf)
1698 {
1699 	int i, j;
1700 
1701 	if (!conf)
1702 		return -1;
1703 
1704 	conf->num_cores = g_config.mos->num_cores;
1705 	conf->max_concurrency = g_config.mos->max_concurrency;
1706 	conf->cpu_mask = g_config.mos->cpu_mask;
1707 
1708 	conf->rcvbuf_size = g_config.mos->rmem_size;
1709 	conf->sndbuf_size = g_config.mos->wmem_size;
1710 
1711 	conf->tcp_timewait = g_config.mos->tcp_tw_interval;
1712 	conf->tcp_timeout = g_config.mos->tcp_timeout;
1713 
1714 	i = 0;
1715 	struct conf_block *bwalk;
1716 	TAILQ_FOREACH(bwalk, &g_config.app_blkh, link) {
1717 		struct app_conf *app_conf = (struct app_conf *)bwalk->conf;
1718 		for (j = 0; j < app_conf->app_argc; j++)
1719 			conf->app_argv[i][j] = app_conf->app_argv[j];
1720 		conf->app_argc[i] = app_conf->app_argc;
1721 		conf->app_cpu_mask[i] = app_conf->cpu_mask;
1722 		i++;
1723 	}
1724 	conf->num_app = i;
1725 
1726 	return 0;
1727 }
1728 /*----------------------------------------------------------------------------*/
1729 int
1730 mtcp_setconf(const struct mtcp_conf *conf)
1731 {
1732 	if (!conf)
1733 		return -1;
1734 
1735 	g_config.mos->num_cores = conf->num_cores;
1736 	g_config.mos->max_concurrency = conf->max_concurrency;
1737 
1738 	g_config.mos->rmem_size = conf->rcvbuf_size;
1739 	g_config.mos->wmem_size = conf->sndbuf_size;
1740 
1741 	g_config.mos->tcp_tw_interval = conf->tcp_timewait;
1742 	g_config.mos->tcp_timeout = conf->tcp_timeout;
1743 
1744 	TRACE_CONFIG("Configuration updated by mtcp_setconf().\n");
1745 	//PrintConfiguration();
1746 
1747 	return 0;
1748 }
1749 /*----------------------------------------------------------------------------*/
1750 int
1751 mtcp_init(const char *config_file)
1752 {
1753 	int i;
1754 	int ret;
1755 
1756 	if (geteuid()) {
1757 		TRACE_CONFIG("[CAUTION] Run as root if mlock is necessary.\n");
1758 	}
1759 
1760 	/* getting cpu and NIC */
1761 	num_cpus = GetNumCPUs();
1762 	assert(num_cpus >= 1);
1763 	for (i = 0; i < num_cpus; i++) {
1764 		g_mtcp[i] = NULL;
1765 		running[i] = FALSE;
1766 		sigint_cnt[i] = 0;
1767 	}
1768 
1769 	ret = LoadConfigurationUpperHalf(config_file);
1770 	if (ret) {
1771 		TRACE_CONFIG("Error occured while loading configuration.\n");
1772 		return -1;
1773 	}
1774 
1775 #if defined(ENABLE_PSIO)
1776 	current_iomodule_func = &ps_module_func;
1777 #elif defined(ENABLE_DPDK)
1778 	current_iomodule_func = &dpdk_module_func;
1779 #elif defined(ENABLE_PCAP)
1780 	current_iomodule_func = &pcap_module_func;
1781 #elif defined(ENABLE_DPDKR)
1782 	current_iomodule_func = &dpdkr_module_func;
1783 #elif defined(ENABLE_NETMAP)
1784 	current_iomodule_func = &netmap_module_func;
1785 #endif
1786 
1787 	if (current_iomodule_func->load_module_upper_half)
1788 		current_iomodule_func->load_module_upper_half();
1789 
1790 	LoadConfigurationLowerHalf();
1791 
1792 	//PrintConfiguration();
1793 
1794 	/* TODO: this should be fixed */
1795 	ap = CreateAddressPool(g_config.mos->netdev_table->ent[0]->ip_addr, 1);
1796 	if (!ap) {
1797 		TRACE_CONFIG("Error occured while creating address pool.\n");
1798 		return -1;
1799 	}
1800 
1801 	//PrintInterfaceInfo();
1802 	//PrintRoutingTable();
1803 	//PrintARPTable();
1804 	InitARPTable();
1805 
1806 	if (signal(SIGUSR1, HandleSignal) == SIG_ERR) {
1807 		perror("signal, SIGUSR1");
1808 		return -1;
1809 	}
1810 	if (signal(SIGINT, HandleSignal) == SIG_ERR) {
1811 		perror("signal, SIGINT");
1812 		return -1;
1813 	}
1814 	app_signal_handler = NULL;
1815 
1816 	printf("load_module(): %p\n", current_iomodule_func);
1817 	/* load system-wide io module specs */
1818 	if (current_iomodule_func->load_module_lower_half)
1819 		current_iomodule_func->load_module_lower_half();
1820 
1821 	GlobInitEvent();
1822 
1823 	PrintConf(&g_config);
1824 
1825 	return 0;
1826 }
1827 /*----------------------------------------------------------------------------*/
1828 int
1829 mtcp_destroy()
1830 {
1831 	int i;
1832 
1833 	/* wait until all threads are closed */
1834 	for (i = 0; i < num_cpus; i++) {
1835 		if (running[i]) {
1836 			if (pthread_join(g_thread[i], NULL) != 0)
1837 				return -1;
1838 		}
1839 	}
1840 
1841 	DestroyAddressPool(ap);
1842 
1843 	TRACE_INFO("All MTCP threads are joined.\n");
1844 
1845 	return 0;
1846 }
1847 /*----------------------------------------------------------------------------*/
1848