xref: /mOS-networking-stack/core/src/core.c (revision c6a5549b)
1 #define _GNU_SOURCE
2 #include <sched.h>
3 #include <unistd.h>
4 #include <sys/time.h>
5 #include <semaphore.h>
6 #include <sys/mman.h>
7 #include <signal.h>
8 #include <assert.h>
9 #include <string.h>
10 
11 #include "cpu.h"
12 #include "eth_in.h"
13 #include "fhash.h"
14 #include "tcp_send_buffer.h"
15 #include "tcp_ring_buffer.h"
16 #include "socket.h"
17 #include "eth_out.h"
18 #include "tcp.h"
19 #include "tcp_in.h"
20 #include "tcp_out.h"
21 #include "mtcp_api.h"
22 #include "eventpoll.h"
23 #include "logger.h"
24 #include "config.h"
25 #include "arp.h"
26 #include "ip_out.h"
27 #include "timer.h"
28 #include "debug.h"
29 #include "event_callback.h"
30 #include "tcp_rb.h"
31 #include "tcp_stream.h"
32 #include "io_module.h"
33 
34 #ifdef ENABLE_DPDK
35 /* for launching rte thread */
36 #include <rte_launch.h>
37 #include <rte_lcore.h>
38 #endif /* ENABLE_DPDK */
39 #define PS_CHUNK_SIZE 64
40 #define RX_THRESH (PS_CHUNK_SIZE * 0.8)
41 
42 #define ROUND_STAT FALSE
43 #define TIME_STAT FALSE
44 #define EVENT_STAT FALSE
45 #define TESTING FALSE
46 
47 #define LOG_FILE_NAME "log"
48 #define MAX_FILE_NAME 1024
49 
50 #define MAX(a, b) ((a)>(b)?(a):(b))
51 #define MIN(a, b) ((a)<(b)?(a):(b))
52 
53 #define PER_STREAM_SLICE 0.1		// in ms
54 #define PER_STREAM_TCHECK 1			// in ms
55 #define PS_SELECT_TIMEOUT 100		// in us
56 
57 #define GBPS(bytes) (bytes * 8.0 / (1000 * 1000 * 1000))
58 
59 /*----------------------------------------------------------------------------*/
60 /* handlers for threads */
61 struct mtcp_thread_context *g_pctx[MAX_CPUS] = {0};
62 struct log_thread_context *g_logctx[MAX_CPUS] = {0};
63 /*----------------------------------------------------------------------------*/
64 static pthread_t g_thread[MAX_CPUS] = {0};
65 static pthread_t log_thread[MAX_CPUS] = {0};
66 /*----------------------------------------------------------------------------*/
67 static sem_t g_init_sem[MAX_CPUS];
68 static sem_t g_done_sem[MAX_CPUS];
69 static int running[MAX_CPUS] = {0};
70 /*----------------------------------------------------------------------------*/
71 mtcp_sighandler_t app_signal_handler;
72 static int sigint_cnt[MAX_CPUS] = {0};
73 static struct timespec sigint_ts[MAX_CPUS];
74 /*----------------------------------------------------------------------------*/
75 #ifdef NETSTAT
76 #if NETSTAT_TOTAL
77 static int printer = -1;
78 #if ROUND_STAT
79 #endif /* ROUND_STAT */
80 #endif /* NETSTAT_TOTAL */
81 #endif /* NETSTAT */
82 /*----------------------------------------------------------------------------*/
83 void
84 HandleSignal(int signal)
85 {
86 	int i = 0;
87 
88 	if (signal == SIGINT) {
89 		FreeConfigResources();
90 #ifdef DARWIN
91 		int core = 0;
92 #else
93 		int core = sched_getcpu();
94 #endif
95 		struct timespec cur_ts;
96 
97 		clock_gettime(CLOCK_REALTIME, &cur_ts);
98 
99 		if (sigint_cnt[core] > 0 && cur_ts.tv_sec == sigint_ts[core].tv_sec) {
100 			for (i = 0; i < g_config.mos->num_cores; i++) {
101 				if (running[i]) {
102 					exit(0);
103 					g_pctx[i]->exit = TRUE;
104 				}
105 			}
106 		} else {
107 			for (i = 0; i < g_config.mos->num_cores; i++) {
108 				if (g_pctx[i])
109 					g_pctx[i]->interrupt = TRUE;
110 			}
111 			if (!app_signal_handler) {
112 				for (i = 0; i < g_config.mos->num_cores; i++) {
113 					if (running[i]) {
114 						exit(0);
115 						g_pctx[i]->exit = TRUE;
116 					}
117 				}
118 			}
119 		}
120 		sigint_cnt[core]++;
121 		clock_gettime(CLOCK_REALTIME, &sigint_ts[core]);
122 	}
123 
124 	if (signal != SIGUSR1) {
125 		if (app_signal_handler) {
126 			app_signal_handler(signal);
127 		}
128 	}
129 }
130 /*----------------------------------------------------------------------------*/
131 static int
132 AttachDevice(struct mtcp_thread_context* ctx)
133 {
134 	int working = -1;
135 	mtcp_manager_t mtcp = ctx->mtcp_manager;
136 
137 	if (mtcp->iom->link_devices)
138 		working = mtcp->iom->link_devices(ctx);
139 	else
140 		return 0;
141 
142 	return working;
143 }
144 /*----------------------------------------------------------------------------*/
145 #ifdef TIMESTAT
146 static inline void
147 InitStatCounter(struct stat_counter *counter)
148 {
149 	counter->cnt = 0;
150 	counter->sum = 0;
151 	counter->max = 0;
152 	counter->min = 0;
153 }
154 /*----------------------------------------------------------------------------*/
155 static inline void
156 UpdateStatCounter(struct stat_counter *counter, int64_t value)
157 {
158 	counter->cnt++;
159 	counter->sum += value;
160 	if (value > counter->max)
161 		counter->max = value;
162 	if (counter->min == 0 || value < counter->min)
163 		counter->min = value;
164 }
165 /*----------------------------------------------------------------------------*/
166 static inline uint64_t
167 GetAverageStat(struct stat_counter *counter)
168 {
169 	return counter->cnt ? (counter->sum / counter->cnt) : 0;
170 }
171 /*----------------------------------------------------------------------------*/
/* Elapsed microseconds from *t1 to *t2 (negative when t2 precedes t1). */
static inline int64_t
TimeDiffUs(struct timeval *t2, struct timeval *t1)
{
	int64_t sec_diff = (int64_t)(t2->tv_sec - t1->tv_sec);
	int64_t usec_diff = (int64_t)(t2->tv_usec - t1->tv_usec);

	return sec_diff * 1000000 + usec_diff;
}
178 /*----------------------------------------------------------------------------*/
179 #endif
180 #ifdef NETSTAT
/* Compute this thread's per-interval NIC statistics (delta between the
 * current cumulative counters and the snapshot taken last interval),
 * store them into *ns for the caller to aggregate, optionally print a
 * per-CPU line, and refresh the snapshot. */
static inline void
PrintThreadNetworkStats(mtcp_manager_t mtcp, struct net_stat *ns)
{
	int i;

	for (i = 0; i < g_config.mos->netdev_table->num; i++) {
		/* delta = cumulative (nstat) minus previous snapshot (p_nstat) */
		ns->rx_packets[i] = mtcp->nstat.rx_packets[i] - mtcp->p_nstat.rx_packets[i];
		ns->rx_errors[i] = mtcp->nstat.rx_errors[i] - mtcp->p_nstat.rx_errors[i];
		ns->rx_bytes[i] = mtcp->nstat.rx_bytes[i] - mtcp->p_nstat.rx_bytes[i];
		ns->tx_packets[i] = mtcp->nstat.tx_packets[i] - mtcp->p_nstat.tx_packets[i];
		ns->tx_drops[i] = mtcp->nstat.tx_drops[i] - mtcp->p_nstat.tx_drops[i];
		ns->tx_bytes[i] = mtcp->nstat.tx_bytes[i] - mtcp->p_nstat.tx_bytes[i];
#if NETSTAT_PERTHREAD
		/* per-CPU line only for interfaces with stat_print enabled;
		 * note tx_drops is computed above but not printed here */
		if (g_config.mos->netdev_table->ent[i]->stat_print) {
			fprintf(stderr, "[CPU%2d] %s flows: %6u, "
					"RX: %7llu(pps) (err: %5llu), %5.2lf(Gbps), "
					"TX: %7llu(pps), %5.2lf(Gbps)\n",
					mtcp->ctx->cpu,
					g_config.mos->netdev_table->ent[i]->dev_name,
					(unsigned)mtcp->flow_cnt,
					(long long unsigned)ns->rx_packets[i],
					(long long unsigned)ns->rx_errors[i],
					GBPS(ns->rx_bytes[i]),
					(long long unsigned)ns->tx_packets[i],
					GBPS(ns->tx_bytes[i]));
		}
#endif
	}
	/* snapshot the cumulative counters for the next interval's delta */
	mtcp->p_nstat = mtcp->nstat;

}
212 /*----------------------------------------------------------------------------*/
213 #if ROUND_STAT
/* Compute this thread's per-interval event-loop round statistics
 * (delta against the previous snapshot), store them into *rs for the
 * caller to aggregate, refresh the snapshot, and optionally print a
 * per-CPU summary line (counts scaled down by ROUND_DIV). */
static inline void
PrintThreadRoundStats(mtcp_manager_t mtcp, struct run_stat *rs)
{
#define ROUND_DIV (1000)
	rs->rounds = mtcp->runstat.rounds - mtcp->p_runstat.rounds;
	rs->rounds_rx = mtcp->runstat.rounds_rx - mtcp->p_runstat.rounds_rx;
	rs->rounds_rx_try = mtcp->runstat.rounds_rx_try - mtcp->p_runstat.rounds_rx_try;
	rs->rounds_tx = mtcp->runstat.rounds_tx - mtcp->p_runstat.rounds_tx;
	rs->rounds_tx_try = mtcp->runstat.rounds_tx_try - mtcp->p_runstat.rounds_tx_try;
	rs->rounds_select = mtcp->runstat.rounds_select - mtcp->p_runstat.rounds_select;
	rs->rounds_select_rx = mtcp->runstat.rounds_select_rx - mtcp->p_runstat.rounds_select_rx;
	rs->rounds_select_tx = mtcp->runstat.rounds_select_tx - mtcp->p_runstat.rounds_select_tx;
	rs->rounds_select_intr = mtcp->runstat.rounds_select_intr - mtcp->p_runstat.rounds_select_intr;
	rs->rounds_twcheck = mtcp->runstat.rounds_twcheck - mtcp->p_runstat.rounds_twcheck;
	/* snapshot for the next interval */
	mtcp->p_runstat = mtcp->runstat;
#if NETSTAT_PERTHREAD
	fprintf(stderr, "[CPU%2d] Rounds: %4lluK, "
			"rx: %3lluK (try: %4lluK), tx: %3lluK (try: %4lluK), "
			"ps_select: %4llu (rx: %4llu, tx: %4llu, intr: %3llu)\n",
			mtcp->ctx->cpu, rs->rounds / ROUND_DIV,
			rs->rounds_rx / ROUND_DIV, rs->rounds_rx_try / ROUND_DIV,
			rs->rounds_tx / ROUND_DIV, rs->rounds_tx_try / ROUND_DIV,
			rs->rounds_select,
			rs->rounds_select_rx, rs->rounds_select_tx, rs->rounds_select_intr);
#endif
}
240 #endif /* ROUND_STAT */
241 /*----------------------------------------------------------------------------*/
242 #if TIME_STAT
/* Print this thread's per-phase event-loop timing (average and max, in
 * microseconds) for the elapsed interval, then reset every counter so
 * the next interval starts fresh. Only compiled under TIME_STAT. */
static inline void
PrintThreadRoundTime(mtcp_manager_t mtcp)
{
	fprintf(stderr, "[CPU%2d] Time: (avg, max) "
			"round: (%4luus, %4luus), processing: (%4luus, %4luus), "
			"tcheck: (%4luus, %4luus), epoll: (%4luus, %4luus), "
			"handle: (%4luus, %4luus), xmit: (%4luus, %4luus), "
			"select: (%4luus, %4luus)\n", mtcp->ctx->cpu,
			GetAverageStat(&mtcp->rtstat.round), mtcp->rtstat.round.max,
			GetAverageStat(&mtcp->rtstat.processing), mtcp->rtstat.processing.max,
			GetAverageStat(&mtcp->rtstat.tcheck), mtcp->rtstat.tcheck.max,
			GetAverageStat(&mtcp->rtstat.epoll), mtcp->rtstat.epoll.max,
			GetAverageStat(&mtcp->rtstat.handle), mtcp->rtstat.handle.max,
			GetAverageStat(&mtcp->rtstat.xmit), mtcp->rtstat.xmit.max,
			GetAverageStat(&mtcp->rtstat.select), mtcp->rtstat.select.max);

	/* restart the measurement window */
	InitStatCounter(&mtcp->rtstat.round);
	InitStatCounter(&mtcp->rtstat.processing);
	InitStatCounter(&mtcp->rtstat.tcheck);
	InitStatCounter(&mtcp->rtstat.epoll);
	InitStatCounter(&mtcp->rtstat.handle);
	InitStatCounter(&mtcp->rtstat.xmit);
	InitStatCounter(&mtcp->rtstat.select);
}
267 #endif
268 #endif /* NETSTAT */
269 /*----------------------------------------------------------------------------*/
270 #if EVENT_STAT
/* Dump one core's epoll event counters to stderr, then zero them so the
 * next reporting interval starts from a clean slate. */
static inline void
PrintEventStat(int core, struct mtcp_epoll_stat *stat)
{
	fprintf(stderr, "[CPU%2d] calls: %lu, waits: %lu, wakes: %lu, "
			"issued: %lu, registered: %lu, invalidated: %lu, handled: %lu\n",
			core, stat->calls, stat->waits, stat->wakes,
			stat->issued, stat->registered, stat->invalidated, stat->handled);
	memset(stat, 0, sizeof(struct mtcp_epoll_stat));
}
280 #endif /* EVENT_STAT */
281 /*----------------------------------------------------------------------------*/
282 #ifdef NETSTAT
/* Periodic (once per TIMEOUT seconds) statistics reporter, run by the
 * designated printer thread. Collects per-thread NIC deltas from every
 * running core, prints per-interface totals, tracks peak and smoothed
 * throughput, and (under the respective #if's) round/time/event stats. */
static inline void
PrintNetworkStats(mtcp_manager_t mtcp, uint32_t cur_ts)
{
#define TIMEOUT 1
	int i;
	struct net_stat ns;
	bool stat_print = false;
#if ROUND_STAT
	struct run_stat rs;
#endif /* ROUND_STAT */
#ifdef NETSTAT_TOTAL
	/* statics persist across calls: peak and EWMA throughput history */
	static double peak_total_rx_gbps = 0;
	static double peak_total_tx_gbps = 0;
	static double avg_total_rx_gbps = 0;
	static double avg_total_tx_gbps = 0;

	double total_rx_gbps = 0, total_tx_gbps = 0;
	int j;
	uint32_t gflow_cnt = 0;
	struct net_stat g_nstat;
#if ROUND_STAT
	struct run_stat g_runstat;
#endif /* ROUND_STAT */
#endif /* NETSTAT_TOTAL */

	/* rate-limit: only report once per TIMEOUT seconds */
	if (TS_TO_MSEC(cur_ts - mtcp->p_nstat_ts) < SEC_TO_MSEC(TIMEOUT)) {
		return;
	}

	/* NOTE(review): gflow_cnt and g_nstat are declared under
	 * NETSTAT_TOTAL but used unguarded below — this would not compile
	 * with NETSTAT_TOTAL disabled; confirm the build always defines it */
	mtcp->p_nstat_ts = cur_ts;
	gflow_cnt = 0;
	memset(&g_nstat, 0, sizeof(struct net_stat));
	for (i = 0; i < g_config.mos->num_cores; i++) {
		if (running[i]) {
			/* fetch this core's interval deltas into ns */
			PrintThreadNetworkStats(g_mtcp[i], &ns);
#if NETSTAT_TOTAL
			/* accumulate per-core deltas into the global totals */
			gflow_cnt += g_mtcp[i]->flow_cnt;
			for (j = 0; j < g_config.mos->netdev_table->num; j++) {
				g_nstat.rx_packets[j] += ns.rx_packets[j];
				g_nstat.rx_errors[j] += ns.rx_errors[j];
				g_nstat.rx_bytes[j] += ns.rx_bytes[j];
				g_nstat.tx_packets[j] += ns.tx_packets[j];
				g_nstat.tx_drops[j] += ns.tx_drops[j];
				g_nstat.tx_bytes[j] += ns.tx_bytes[j];
			}
#endif
		}
	}
#if NETSTAT_TOTAL
	/* per-interface totals across all cores */
	for (i = 0; i < g_config.mos->netdev_table->num; i++) {
		if (g_config.mos->netdev_table->ent[i]->stat_print) {
			fprintf(stderr, "[ ALL ] %s, "
			"RX: %7llu(pps) (err: %5llu), %5.2lf(Gbps), "
			"TX: %7llu(pps), %5.2lf(Gbps)\n",
				g_config.mos->netdev_table->ent[i]->dev_name,
				(long long unsigned)g_nstat.rx_packets[i],
				(long long unsigned)g_nstat.rx_errors[i],
				GBPS(g_nstat.rx_bytes[i]),
				(long long unsigned)g_nstat.tx_packets[i],
				GBPS(g_nstat.tx_bytes[i]));
			total_rx_gbps += GBPS(g_nstat.rx_bytes[i]);
			total_tx_gbps += GBPS(g_nstat.tx_bytes[i]);
			stat_print = true;
		}
	}
	if (stat_print) {
		fprintf(stderr, "[ ALL ] flows: %6u\n", gflow_cnt);
		/* exponentially-weighted moving average (0.6 old / 0.4 new);
		 * seeded with the first sample */
		if (avg_total_rx_gbps == 0)
			avg_total_rx_gbps = total_rx_gbps;
		else
			avg_total_rx_gbps = avg_total_rx_gbps * 0.6 + total_rx_gbps * 0.4;

		if (avg_total_tx_gbps == 0)
			avg_total_tx_gbps = total_tx_gbps;
		else
			avg_total_tx_gbps = avg_total_tx_gbps * 0.6 + total_tx_gbps * 0.4;

		/* track all-time peaks */
		if (peak_total_rx_gbps < total_rx_gbps)
			peak_total_rx_gbps = total_rx_gbps;
		if (peak_total_tx_gbps < total_tx_gbps)
			peak_total_tx_gbps = total_tx_gbps;

		fprintf(stderr, "[ PEAK ] RX: %5.2lf(Gbps), TX: %5.2lf(Gbps)\n"
						"[ RECENT AVG ] RX: %5.2lf(Gbps), TX: %5.2lf(Gbps)\n",
				peak_total_rx_gbps, peak_total_tx_gbps,
				avg_total_rx_gbps, avg_total_tx_gbps);
	}
#endif

#if ROUND_STAT
	memset(&g_runstat, 0, sizeof(struct run_stat));
	for (i = 0; i < g_config.mos->num_cores; i++) {
		if (running[i]) {
			PrintThreadRoundStats(g_mtcp[i], &rs);
			/* global aggregation only when DBGMSG is enabled,
			 * since the result is consumed by TRACE_DBG below */
#if DBGMSG
			g_runstat.rounds += rs.rounds;
			g_runstat.rounds_rx += rs.rounds_rx;
			g_runstat.rounds_rx_try += rs.rounds_rx_try;
			g_runstat.rounds_tx += rs.rounds_tx;
			g_runstat.rounds_tx_try += rs.rounds_tx_try;
			g_runstat.rounds_select += rs.rounds_select;
			g_runstat.rounds_select_rx += rs.rounds_select_rx;
			g_runstat.rounds_select_tx += rs.rounds_select_tx;
#endif
		}
	}

	TRACE_DBG("[ ALL ] Rounds: %4ldK, "
		  "rx: %3ldK (try: %4ldK), tx: %3ldK (try: %4ldK), "
		  "ps_select: %4ld (rx: %4ld, tx: %4ld)\n",
		  g_runstat.rounds / 1000, g_runstat.rounds_rx / 1000,
		  g_runstat.rounds_rx_try / 1000, g_runstat.rounds_tx / 1000,
		  g_runstat.rounds_tx_try / 1000, g_runstat.rounds_select,
		  g_runstat.rounds_select_rx, g_runstat.rounds_select_tx);
#endif /* ROUND_STAT */

#if TIME_STAT
	for (i = 0; i < g_config.mos->num_cores; i++) {
		if (running[i]) {
			PrintThreadRoundTime(g_mtcp[i]);
		}
	}
#endif

#if EVENT_STAT
	for (i = 0; i < g_config.mos->num_cores; i++) {
		if (running[i] && g_mtcp[i]->ep) {
			PrintEventStat(i, &g_mtcp[i]->ep->stat);
		}
	}
#endif

	fflush(stderr);
}
417 #endif /* NETSTAT */
418 /*----------------------------------------------------------------------------*/
419 static inline void
420 FlushMonitorReadEvents(mtcp_manager_t mtcp)
421 {
422 	struct event_queue *mtcpq;
423 	struct tcp_stream *cur_stream;
424 	struct mon_listener *walk;
425 
426 	/* check if monitor sockets should be passed data */
427 	TAILQ_FOREACH(walk, &mtcp->monitors, link) {
428 		if (walk->socket->socktype != MOS_SOCK_MONITOR_STREAM ||
429 			!(mtcpq = walk->eq))
430 			continue;
431 
432 		while (mtcpq->num_events > 0) {
433 			cur_stream =
434 				(struct tcp_stream *)mtcpq->events[mtcpq->start++].ev.data.ptr;
435 			/* only read events */
436 			if (cur_stream != NULL &&
437 			    (cur_stream->actions & MOS_ACT_READ_DATA)) {
438 				if (cur_stream->rcvvar != NULL &&
439 						cur_stream->rcvvar->rcvbuf != NULL) {
440 					/* no need to pass pkt context */
441 					struct socket_map *walk;
442 					SOCKQ_FOREACH_START(walk, &cur_stream->msocks) {
443 						HandleCallback(mtcp, MOS_NULL, walk,
444 							       cur_stream->side, NULL,
445 							       MOS_ON_CONN_NEW_DATA);
446 					} SOCKQ_FOREACH_END;
447 				}
448 				/* reset the actions now */
449 				cur_stream->actions = 0;
450 			}
451 			if (mtcpq->start >= mtcpq->size)
452 				mtcpq->start = 0;
453 			mtcpq->num_events--;
454 		}
455 	}
456 }
457 /*----------------------------------------------------------------------------*/
458 static inline void
459 FlushBufferedReadEvents(mtcp_manager_t mtcp)
460 {
461 	int i;
462 	int offset;
463 	struct event_queue *mtcpq;
464 	struct tcp_stream *cur_stream;
465 
466 	if (mtcp->ep == NULL) {
467 		TRACE_EPOLL("No epoll socket has been registered yet!\n");
468 		return;
469 	} else {
470 		/* case when mtcpq exists */
471 		mtcpq = mtcp->ep->mtcp_queue;
472 		offset = mtcpq->start;
473 	}
474 
475 	/* we will use queued-up epoll read-in events
476 	 * to trigger buffered read monitor events */
477 	for (i = 0; i < mtcpq->num_events; i++) {
478 		cur_stream = mtcp->smap[mtcpq->events[offset++].sockid].stream;
479 		/* only read events */
480 		/* Raise new data callback event */
481 		if (cur_stream != NULL &&
482 		    	(cur_stream->socket->events | MOS_EPOLLIN)) {
483 			if (cur_stream->rcvvar != NULL &&
484 					cur_stream->rcvvar->rcvbuf != NULL) {
485 				/* no need to pass pkt context */
486 				struct socket_map *walk;
487 				SOCKQ_FOREACH_START(walk, &cur_stream->msocks) {
488 					HandleCallback(mtcp, MOS_NULL, walk, cur_stream->side,
489 						       NULL, MOS_ON_CONN_NEW_DATA);
490 				} SOCKQ_FOREACH_END;
491 			}
492 		}
493 		if (offset >= mtcpq->size)
494 			offset = 0;
495 	}
496 }
497 /*----------------------------------------------------------------------------*/
/* Move pending events from the mtcp-side queue to the user-visible
 * queue (both circular buffers) and wake a user thread blocked in
 * mtcp_epoll_wait() if anything is deliverable. Runs under the epoll
 * lock so it cannot race the application's consumer side. */
static inline void
FlushEpollEvents(mtcp_manager_t mtcp, uint32_t cur_ts)
{
	struct mtcp_epoll *ep = mtcp->ep;
	struct event_queue *usrq = ep->usr_queue;
	struct event_queue *mtcpq = ep->mtcp_queue;

	pthread_mutex_lock(&ep->epoll_lock);
	if (ep->mtcp_queue->num_events > 0) {
		/* while mtcp_queue have events */
		/* and usr_queue is not full */
		while (mtcpq->num_events > 0 && usrq->num_events < usrq->size) {
			/* copy the event from mtcp_queue to usr_queue */
			usrq->events[usrq->end++] = mtcpq->events[mtcpq->start++];

			/* wrap both circular-buffer indices */
			if (usrq->end >= usrq->size)
				usrq->end = 0;
			usrq->num_events++;

			if (mtcpq->start >= mtcpq->size)
				mtcpq->start = 0;
			mtcpq->num_events--;
		}
	}

	/* if there are pending events, wake up user */
	if (ep->waiting && (ep->usr_queue->num_events > 0 ||
				ep->usr_shadow_queue->num_events > 0)) {
		STAT_COUNT(mtcp->runstat.rounds_epoll);
		TRACE_EPOLL("Broadcasting events. num: %d, cur_ts: %u, prev_ts: %u\n",
				ep->usr_queue->num_events, cur_ts, mtcp->ts_last_event);
		mtcp->ts_last_event = cur_ts;
		ep->stat.wakes++;
		pthread_cond_signal(&ep->epoll_cond);
	}
	pthread_mutex_unlock(&ep->epoll_lock);
}
535 /*----------------------------------------------------------------------------*/
536 static inline void
537 HandleApplicationCalls(mtcp_manager_t mtcp, uint32_t cur_ts)
538 {
539 	tcp_stream *stream;
540 	int cnt, max_cnt;
541 	int handled, delayed;
542 	int control, send, ack;
543 
544 	/* connect handling */
545 	while ((stream = StreamDequeue(mtcp->connectq))) {
546 		if (stream->state != TCP_ST_SYN_SENT) {
547 			TRACE_INFO("Got a connection request from app with state: %s",
548 				   TCPStateToString(stream));
549 			exit(EXIT_FAILURE);
550 		} else {
551 			stream->cb_events |= MOS_ON_CONN_START |
552 				MOS_ON_TCP_STATE_CHANGE;
553 			/* if monitor is on... */
554 			if (stream->pair_stream != NULL)
555 				stream->pair_stream->cb_events |=
556 					MOS_ON_CONN_START;
557 		}
558 		AddtoControlList(mtcp, stream, cur_ts);
559 	}
560 
561 	/* send queue handling */
562 	while ((stream = StreamDequeue(mtcp->sendq))) {
563 		stream->sndvar->on_sendq = FALSE;
564 		AddtoSendList(mtcp, stream);
565 	}
566 
567 	/* ack queue handling */
568 	while ((stream = StreamDequeue(mtcp->ackq))) {
569 		stream->sndvar->on_ackq = FALSE;
570 		EnqueueACK(mtcp, stream, cur_ts, ACK_OPT_AGGREGATE);
571 	}
572 
573 	/* close handling */
574 	handled = delayed = 0;
575 	control = send = ack = 0;
576 	while ((stream = StreamDequeue(mtcp->closeq))) {
577 		struct tcp_send_vars *sndvar = stream->sndvar;
578 		sndvar->on_closeq = FALSE;
579 
580 		if (sndvar->sndbuf) {
581 			sndvar->fss = sndvar->sndbuf->head_seq + sndvar->sndbuf->len;
582 		} else {
583 			sndvar->fss = stream->snd_nxt;
584 		}
585 
586 		if (g_config.mos->tcp_timeout > 0)
587 			RemoveFromTimeoutList(mtcp, stream);
588 
589 		if (stream->have_reset) {
590 			handled++;
591 			if (stream->state != TCP_ST_CLOSED_RSVD) {
592 				stream->close_reason = TCP_RESET;
593 				stream->state = TCP_ST_CLOSED_RSVD;
594 				stream->cb_events |= MOS_ON_TCP_STATE_CHANGE;
595 				TRACE_STATE("Stream %d: TCP_ST_CLOSED_RSVD\n", stream->id);
596 				DestroyTCPStream(mtcp, stream);
597 			} else {
598 				TRACE_ERROR("Stream already closed.\n");
599 			}
600 
601 		} else if (sndvar->on_control_list) {
602 			sndvar->on_closeq_int = TRUE;
603 			StreamInternalEnqueue(mtcp->closeq_int, stream);
604 			delayed++;
605 			if (sndvar->on_control_list)
606 				control++;
607 			if (sndvar->on_send_list)
608 				send++;
609 			if (sndvar->on_ack_list)
610 				ack++;
611 
612 		} else if (sndvar->on_send_list || sndvar->on_ack_list) {
613 			handled++;
614 			if (stream->state == TCP_ST_ESTABLISHED) {
615 				stream->state = TCP_ST_FIN_WAIT_1;
616 				stream->cb_events |= MOS_ON_TCP_STATE_CHANGE;
617 				TRACE_STATE("Stream %d: TCP_ST_FIN_WAIT_1\n", stream->id);
618 
619 			} else if (stream->state == TCP_ST_CLOSE_WAIT) {
620 				stream->state = TCP_ST_LAST_ACK;
621 				stream->cb_events |= MOS_ON_TCP_STATE_CHANGE;
622 				TRACE_STATE("Stream %d: TCP_ST_LAST_ACK\n", stream->id);
623 			}
624 			stream->control_list_waiting = TRUE;
625 
626 		} else if (stream->state != TCP_ST_CLOSED_RSVD) {
627 			handled++;
628 			if (stream->state == TCP_ST_ESTABLISHED) {
629 				stream->state = TCP_ST_FIN_WAIT_1;
630 				stream->cb_events |= MOS_ON_TCP_STATE_CHANGE;
631 				TRACE_STATE("Stream %d: TCP_ST_FIN_WAIT_1\n", stream->id);
632 
633 			} else if (stream->state == TCP_ST_CLOSE_WAIT) {
634 				stream->state = TCP_ST_LAST_ACK;
635 				stream->cb_events |= MOS_ON_TCP_STATE_CHANGE;
636 				TRACE_STATE("Stream %d: TCP_ST_LAST_ACK\n", stream->id);
637 			}
638 			//sndvar->rto = TCP_FIN_RTO;
639 			//UpdateRetransmissionTimer(mtcp, stream, mtcp->cur_ts);
640 			AddtoControlList(mtcp, stream, cur_ts);
641 		} else {
642 			TRACE_ERROR("Already closed connection!\n");
643 		}
644 	}
645 	TRACE_ROUND("Handling close connections. cnt: %d\n", cnt);
646 
647 	cnt = 0;
648 	max_cnt = mtcp->closeq_int->count;
649 	while (cnt++ < max_cnt) {
650 		stream = StreamInternalDequeue(mtcp->closeq_int);
651 
652 		if (stream->sndvar->on_control_list) {
653 			StreamInternalEnqueue(mtcp->closeq_int, stream);
654 
655 		} else if (stream->state != TCP_ST_CLOSED_RSVD) {
656 			handled++;
657 			stream->sndvar->on_closeq_int = FALSE;
658 			if (stream->state == TCP_ST_ESTABLISHED) {
659 				stream->state = TCP_ST_FIN_WAIT_1;
660 				stream->cb_events |= MOS_ON_TCP_STATE_CHANGE;
661 				TRACE_STATE("Stream %d: TCP_ST_FIN_WAIT_1\n", stream->id);
662 
663 			} else if (stream->state == TCP_ST_CLOSE_WAIT) {
664 				stream->state = TCP_ST_LAST_ACK;
665 				stream->cb_events |= MOS_ON_TCP_STATE_CHANGE;
666 				TRACE_STATE("Stream %d: TCP_ST_LAST_ACK\n", stream->id);
667 			}
668 			AddtoControlList(mtcp, stream, cur_ts);
669 		} else {
670 			stream->sndvar->on_closeq_int = FALSE;
671 			TRACE_ERROR("Already closed connection!\n");
672 		}
673 	}
674 
675 	/* reset handling */
676 	while ((stream = StreamDequeue(mtcp->resetq))) {
677 		stream->sndvar->on_resetq = FALSE;
678 
679 		if (g_config.mos->tcp_timeout > 0)
680 			RemoveFromTimeoutList(mtcp, stream);
681 
682 		if (stream->have_reset) {
683 			if (stream->state != TCP_ST_CLOSED_RSVD) {
684 				stream->close_reason = TCP_RESET;
685 				stream->state = TCP_ST_CLOSED_RSVD;
686 				stream->cb_events |= MOS_ON_TCP_STATE_CHANGE;
687 				TRACE_STATE("Stream %d: TCP_ST_CLOSED_RSVD\n", stream->id);
688 				DestroyTCPStream(mtcp, stream);
689 			} else {
690 				TRACE_ERROR("Stream already closed.\n");
691 			}
692 
693 		} else if (stream->sndvar->on_control_list ||
694 				stream->sndvar->on_send_list || stream->sndvar->on_ack_list) {
695 			/* wait until all the queues are flushed */
696 			stream->sndvar->on_resetq_int = TRUE;
697 			StreamInternalEnqueue(mtcp->resetq_int, stream);
698 
699 		} else {
700 			if (stream->state != TCP_ST_CLOSED_RSVD) {
701 				stream->close_reason = TCP_ACTIVE_CLOSE;
702 				stream->state = TCP_ST_CLOSED_RSVD;
703 				stream->cb_events |= MOS_ON_TCP_STATE_CHANGE;
704 				TRACE_STATE("Stream %d: TCP_ST_CLOSED_RSVD\n", stream->id);
705 				AddtoControlList(mtcp, stream, cur_ts);
706 			} else {
707 				TRACE_ERROR("Stream already closed.\n");
708 			}
709 		}
710 	}
711 	TRACE_ROUND("Handling reset connections. cnt: %d\n", cnt);
712 
713 	cnt = 0;
714 	max_cnt = mtcp->resetq_int->count;
715 	while (cnt++ < max_cnt) {
716 		stream = StreamInternalDequeue(mtcp->resetq_int);
717 
718 		if (stream->sndvar->on_control_list ||
719 				stream->sndvar->on_send_list || stream->sndvar->on_ack_list) {
720 			/* wait until all the queues are flushed */
721 			StreamInternalEnqueue(mtcp->resetq_int, stream);
722 
723 		} else {
724 			stream->sndvar->on_resetq_int = FALSE;
725 
726 			if (stream->state != TCP_ST_CLOSED_RSVD) {
727 				stream->close_reason = TCP_ACTIVE_CLOSE;
728 				stream->state = TCP_ST_CLOSED_RSVD;
729 				stream->cb_events |= MOS_ON_TCP_STATE_CHANGE;
730 				TRACE_STATE("Stream %d: TCP_ST_CLOSED_RSVD\n", stream->id);
731 				AddtoControlList(mtcp, stream, cur_ts);
732 			} else {
733 				TRACE_ERROR("Stream already closed.\n");
734 			}
735 		}
736 	}
737 
738 	/* destroy streams in destroyq */
739 	while ((stream = StreamDequeue(mtcp->destroyq))) {
740 		DestroyTCPStream(mtcp, stream);
741 	}
742 
743 	mtcp->wakeup_flag = FALSE;
744 }
745 /*----------------------------------------------------------------------------*/
746 static inline void
747 WritePacketsToChunks(mtcp_manager_t mtcp, uint32_t cur_ts)
748 {
749 	int thresh = g_config.mos->max_concurrency;
750 	int i;
751 
752 	/* Set the threshold to g_config.mos->max_concurrency to send ACK immediately */
753 	/* Otherwise, set to appropriate value (e.g. thresh) */
754 	assert(mtcp->g_sender != NULL);
755 	if (mtcp->g_sender->control_list_cnt)
756 		WriteTCPControlList(mtcp, mtcp->g_sender, cur_ts, thresh);
757 	if (mtcp->g_sender->ack_list_cnt)
758 		WriteTCPACKList(mtcp, mtcp->g_sender, cur_ts, thresh);
759 	if (mtcp->g_sender->send_list_cnt)
760 		WriteTCPDataList(mtcp, mtcp->g_sender, cur_ts, thresh);
761 
762 	for (i = 0; i < g_config.mos->netdev_table->num; i++) {
763 		assert(mtcp->n_sender[i] != NULL);
764 		if (mtcp->n_sender[i]->control_list_cnt)
765 			WriteTCPControlList(mtcp, mtcp->n_sender[i], cur_ts, thresh);
766 		if (mtcp->n_sender[i]->ack_list_cnt)
767 			WriteTCPACKList(mtcp, mtcp->n_sender[i], cur_ts, thresh);
768 		if (mtcp->n_sender[i]->send_list_cnt)
769 			WriteTCPDataList(mtcp, mtcp->n_sender[i], cur_ts, thresh);
770 	}
771 }
772 /*----------------------------------------------------------------------------*/
773 #if TESTING
774 static int
775 DestroyRemainingFlows(mtcp_manager_t mtcp)
776 {
777 	struct hashtable *ht = mtcp->tcp_flow_table;
778 	tcp_stream *walk;
779 	int cnt, i;
780 
781 	cnt = 0;
782 
783 	thread_printf(mtcp, mtcp->log_fp,
784 			"CPU %d: Flushing remaining flows.\n", mtcp->ctx->cpu);
785 
786 	for (i = 0; i < NUM_BINS; i++) {
787 		TAILQ_FOREACH(walk, &ht->ht_table[i], rcvvar->he_link) {
788 			thread_printf(mtcp, mtcp->log_fp,
789 					"CPU %d: Destroying stream %d\n", mtcp->ctx->cpu, walk->id);
790 #ifdef DUMP_STREAM
791 			DumpStream(mtcp, walk);
792 #endif
793 			DestroyTCPStream(mtcp, walk);
794 			cnt++;
795 		}
796 	}
797 
798 	return cnt;
799 }
800 #endif
801 /*----------------------------------------------------------------------------*/
802 static void
803 InterruptApplication(mtcp_manager_t mtcp)
804 {
805 	/* interrupt if the mtcp_epoll_wait() is waiting */
806 	if (mtcp->ep) {
807 		pthread_mutex_lock(&mtcp->ep->epoll_lock);
808 		if (mtcp->ep->waiting) {
809 			pthread_cond_signal(&mtcp->ep->epoll_cond);
810 		}
811 		pthread_mutex_unlock(&mtcp->ep->epoll_lock);
812 	}
813 	/* interrupt if the accept() is waiting */
814 	if (mtcp->listener) {
815 		if (mtcp->listener->socket) {
816 			pthread_mutex_lock(&mtcp->listener->accept_lock);
817 			if (!(mtcp->listener->socket->opts & MTCP_NONBLOCK)) {
818 				pthread_cond_signal(&mtcp->listener->accept_cond);
819 			}
820 			pthread_mutex_unlock(&mtcp->listener->accept_lock);
821 		}
822 	}
823 }
824 /*----------------------------------------------------------------------------*/
825 void
826 RunPassiveLoop(mtcp_manager_t mtcp)
827 {
828 	sem_wait(&g_done_sem[mtcp->ctx->cpu]);
829 	sem_destroy(&g_done_sem[mtcp->ctx->cpu]);
830 	return;
831 }
832 /*----------------------------------------------------------------------------*/
833 static void
834 RunMainLoop(struct mtcp_thread_context *ctx)
835 {
836 	mtcp_manager_t mtcp = ctx->mtcp_manager;
837 	int i;
838 	int recv_cnt;
839 	int rx_inf, tx_inf;
840 	struct timeval cur_ts = {0};
841 	uint32_t ts, ts_prev;
842 
843 #if TIME_STAT
844 	struct timeval prev_ts, processing_ts, tcheck_ts,
845 				   epoll_ts, handle_ts, xmit_ts, select_ts;
846 #endif
847 	int thresh;
848 
849 	gettimeofday(&cur_ts, NULL);
850 
851 	TRACE_DBG("CPU %d: mtcp thread running.\n", ctx->cpu);
852 
853 #if TIME_STAT
854 	prev_ts = cur_ts;
855 	InitStatCounter(&mtcp->rtstat.round);
856 	InitStatCounter(&mtcp->rtstat.processing);
857 	InitStatCounter(&mtcp->rtstat.tcheck);
858 	InitStatCounter(&mtcp->rtstat.epoll);
859 	InitStatCounter(&mtcp->rtstat.handle);
860 	InitStatCounter(&mtcp->rtstat.xmit);
861 	InitStatCounter(&mtcp->rtstat.select);
862 #endif
863 
864 	ts = ts_prev = 0;
865 	while ((!ctx->done || mtcp->flow_cnt) && !ctx->exit) {
866 
867 		STAT_COUNT(mtcp->runstat.rounds);
868 		recv_cnt = 0;
869 		gettimeofday(&cur_ts, NULL);
870 #if TIME_STAT
871 		/* measure the inter-round delay */
872 		UpdateStatCounter(&mtcp->rtstat.round, TimeDiffUs(&cur_ts, &prev_ts));
873 		prev_ts = cur_ts;
874 #endif
875 
876 		ts = TIMEVAL_TO_TS(&cur_ts);
877 		mtcp->cur_ts = ts;
878 
879 		for (rx_inf = 0; rx_inf < g_config.mos->netdev_table->num; rx_inf++) {
880 
881 			recv_cnt = mtcp->iom->recv_pkts(ctx, rx_inf);
882 			STAT_COUNT(mtcp->runstat.rounds_rx_try);
883 
884 			for (i = 0; i < recv_cnt; i++) {
885 				uint16_t len;
886 				uint8_t *pktbuf;
887 				pktbuf = mtcp->iom->get_rptr(mtcp->ctx, rx_inf, i, &len);
888 				ProcessPacket(mtcp, rx_inf, i, ts, pktbuf, len);
889 			}
890 
891 #ifdef ENABLE_DPDKR
892 			mtcp->iom->send_pkts(ctx, rx_inf);
893 			continue;
894 #endif
895 		}
896 		STAT_COUNT(mtcp->runstat.rounds_rx);
897 
898 #if TIME_STAT
899 		gettimeofday(&processing_ts, NULL);
900 		UpdateStatCounter(&mtcp->rtstat.processing,
901 				TimeDiffUs(&processing_ts, &cur_ts));
902 #endif /* TIME_STAT */
903 
904 		/* Handle user defined timeout */
905 		struct timer *walk, *tmp;
906 		for (walk = TAILQ_FIRST(&mtcp->timer_list); walk != NULL; walk = tmp) {
907 			tmp = TAILQ_NEXT(walk, timer_link);
908 			if (TIMEVAL_LT(&cur_ts, &walk->exp))
909 				break;
910 
911 			struct mtcp_context mctx = {.cpu = ctx->cpu};
912 			walk->cb(&mctx, walk->id, 0, 0 /* FIXME */, NULL);
913 			DelTimer(mtcp, walk);
914 		}
915 
916 		/* interaction with application */
917 		if (mtcp->flow_cnt > 0) {
918 
919 			/* check retransmission timeout and timewait expire */
920 #if 0
921 			thresh = (int)mtcp->flow_cnt / (TS_TO_USEC(PER_STREAM_TCHECK));
922 			assert(thresh >= 0);
923 			if (thresh == 0)
924 				thresh = 1;
925 			if (recv_cnt > 0 && thresh > recv_cnt)
926 				thresh = recv_cnt;
927 #else
928 			thresh = g_config.mos->max_concurrency;
929 #endif
930 
931 			/* Eunyoung, you may fix this later
932 			 * if there is no rcv packet, we will send as much as possible
933 			 */
934 			if (thresh == -1)
935 				thresh = g_config.mos->max_concurrency;
936 
937 			CheckRtmTimeout(mtcp, ts, thresh);
938 			CheckTimewaitExpire(mtcp, ts, thresh);
939 
940 			if (g_config.mos->tcp_timeout > 0 && ts != ts_prev) {
941 				CheckConnectionTimeout(mtcp, ts, thresh);
942 			}
943 
944 #if TIME_STAT
945 		}
946 		gettimeofday(&tcheck_ts, NULL);
947 		UpdateStatCounter(&mtcp->rtstat.tcheck,
948 				TimeDiffUs(&tcheck_ts, &processing_ts));
949 
950 		if (mtcp->flow_cnt > 0) {
951 #endif /* TIME_STAT */
952 
953 		}
954 
955 		/*
956 		 * before flushing epoll events, call monitor events for
957 		 * all registered `read` events
958 		 */
959 		if (mtcp->num_msp > 0)
960 			/* call this when only a standalone monitor is running */
961 			FlushMonitorReadEvents(mtcp);
962 
963 		/* if epoll is in use, flush all the queued events */
964 		if (mtcp->ep) {
965 			FlushBufferedReadEvents(mtcp);
966 			FlushEpollEvents(mtcp, ts);
967 		}
968 #if TIME_STAT
969 		gettimeofday(&epoll_ts, NULL);
970 		UpdateStatCounter(&mtcp->rtstat.epoll,
971 				TimeDiffUs(&epoll_ts, &tcheck_ts));
972 #endif /* TIME_STAT */
973 
974 		if (end_app_exists && mtcp->flow_cnt > 0) {
975 			/* handle stream queues  */
976 			HandleApplicationCalls(mtcp, ts);
977 		}
978 
979 #ifdef ENABLE_DPDKR
980 		continue;
981 #endif
982 
983 #if TIME_STAT
984 		gettimeofday(&handle_ts, NULL);
985 		UpdateStatCounter(&mtcp->rtstat.handle,
986 				TimeDiffUs(&handle_ts, &epoll_ts));
987 #endif /* TIME_STAT */
988 
989 		WritePacketsToChunks(mtcp, ts);
990 
991 		/* send packets from write buffer */
992 		/* Send until tx is available */
993 		int num_dev = g_config.mos->netdev_table->num;
994 		if (likely(mtcp->iom->send_pkts != NULL))
995 			for (tx_inf = 0; tx_inf < num_dev; tx_inf++) {
996 				mtcp->iom->send_pkts(ctx, tx_inf);
997 			}
998 
999 #if TIME_STAT
1000 		gettimeofday(&xmit_ts, NULL);
1001 		UpdateStatCounter(&mtcp->rtstat.xmit,
1002 				TimeDiffUs(&xmit_ts, &handle_ts));
1003 #endif /* TIME_STAT */
1004 
1005 		if (ts != ts_prev) {
1006 			ts_prev = ts;
1007 #ifdef NETSTAT
1008 			if (ctx->cpu == printer) {
1009 #ifdef RUN_ARP
1010 				ARPTimer(mtcp, ts);
1011 #endif
1012 #ifdef NETSTAT
1013 				PrintNetworkStats(mtcp, ts);
1014 #endif
1015 			}
1016 #endif /* NETSTAT */
1017 		}
1018 
1019 		if (mtcp->iom->select)
1020 			mtcp->iom->select(ctx);
1021 
1022 		if (ctx->interrupt) {
1023 			InterruptApplication(mtcp);
1024 		}
1025 	}
1026 
1027 #if TESTING
1028 	DestroyRemainingFlows(mtcp);
1029 #endif
1030 
1031 	TRACE_DBG("MTCP thread %d out of main loop.\n", ctx->cpu);
1032 	/* flush logs */
1033 	flush_log_data(mtcp);
1034 	TRACE_DBG("MTCP thread %d flushed logs.\n", ctx->cpu);
1035 	InterruptApplication(mtcp);
1036 	TRACE_INFO("MTCP thread %d finished.\n", ctx->cpu);
1037 }
1038 /*----------------------------------------------------------------------------*/
1039 struct mtcp_sender *
1040 CreateMTCPSender(int ifidx)
1041 {
1042 	struct mtcp_sender *sender;
1043 
1044 	sender = (struct mtcp_sender *)calloc(1, sizeof(struct mtcp_sender));
1045 	if (!sender) {
1046 		return NULL;
1047 	}
1048 
1049 	sender->ifidx = ifidx;
1050 
1051 	TAILQ_INIT(&sender->control_list);
1052 	TAILQ_INIT(&sender->send_list);
1053 	TAILQ_INIT(&sender->ack_list);
1054 
1055 	sender->control_list_cnt = 0;
1056 	sender->send_list_cnt = 0;
1057 	sender->ack_list_cnt = 0;
1058 
1059 	return sender;
1060 }
1061 /*----------------------------------------------------------------------------*/
/* Release a sender created by CreateMTCPSender(). free(NULL) is a
 * harmless no-op, so no guard is needed.
 * NOTE(review): only the container is freed; any streams still queued
 * on control_list/send_list/ack_list are not touched here — presumably
 * drained by the caller before destruction. Confirm against callers. */
void
DestroyMTCPSender(struct mtcp_sender *sender)
{
	free(sender);
}
1067 /*----------------------------------------------------------------------------*/
1068 static mtcp_manager_t
1069 InitializeMTCPManager(struct mtcp_thread_context* ctx)
1070 {
1071 	mtcp_manager_t mtcp;
1072 	char log_name[MAX_FILE_NAME];
1073 	int i;
1074 
1075 	posix_seq_srand((unsigned)pthread_self());
1076 
1077 	mtcp = (mtcp_manager_t)calloc(1, sizeof(struct mtcp_manager));
1078 	if (!mtcp) {
1079 		perror("malloc");
1080 		fprintf(stderr, "Failed to allocate mtcp_manager.\n");
1081 		return NULL;
1082 	}
1083 	g_mtcp[ctx->cpu] = mtcp;
1084 
1085 	mtcp->tcp_flow_table = CreateHashtable();
1086 	if (!mtcp->tcp_flow_table) {
1087 		CTRACE_ERROR("Falied to allocate tcp flow table.\n");
1088 		return NULL;
1089 	}
1090 
1091 #ifdef HUGEPAGE
1092 #define	IS_HUGEPAGE 1
1093 #else
1094 #define	IS_HUGEPAGE 0
1095 #endif
1096 	if (mon_app_exists) {
1097 		/* initialize event callback */
1098 #ifdef NEWEV
1099 		InitEvent(mtcp);
1100 #else
1101 		InitEvent(mtcp, NUM_EV_TABLE);
1102 #endif
1103 	}
1104 
1105 	if (!(mtcp->bufseg_pool = MPCreate(sizeof(tcpbufseg_t),
1106 			sizeof(tcpbufseg_t) * g_config.mos->max_concurrency *
1107 			((g_config.mos->rmem_size - 1) / UNITBUFSIZE + 1), 0))) {
1108 		TRACE_ERROR("Failed to allocate ev_table pool\n");
1109 		exit(0);
1110 	}
1111 	if (!(mtcp->sockent_pool = MPCreate(sizeof(struct sockent),
1112 			sizeof(struct sockent) * g_config.mos->max_concurrency * 3, 0))) {
1113 		TRACE_ERROR("Failed to allocate ev_table pool\n");
1114 		exit(0);
1115 	}
1116 #ifdef USE_TIMER_POOL
1117 	if (!(mtcp->timer_pool = MPCreate(sizeof(struct timer),
1118 					  sizeof(struct timer) * g_config.mos->max_concurrency * 10, 0))) {
1119 		TRACE_ERROR("Failed to allocate ev_table pool\n");
1120 		exit(0);
1121 	}
1122 #endif
1123 	mtcp->flow_pool = MPCreate(sizeof(tcp_stream),
1124 								sizeof(tcp_stream) * g_config.mos->max_concurrency, IS_HUGEPAGE);
1125 	if (!mtcp->flow_pool) {
1126 		CTRACE_ERROR("Failed to allocate tcp flow pool.\n");
1127 		return NULL;
1128 	}
1129 	mtcp->rv_pool = MPCreate(sizeof(struct tcp_recv_vars),
1130 			sizeof(struct tcp_recv_vars) * g_config.mos->max_concurrency, IS_HUGEPAGE);
1131 	if (!mtcp->rv_pool) {
1132 		CTRACE_ERROR("Failed to allocate tcp recv variable pool.\n");
1133 		return NULL;
1134 	}
1135 	mtcp->sv_pool = MPCreate(sizeof(struct tcp_send_vars),
1136 			sizeof(struct tcp_send_vars) * g_config.mos->max_concurrency, IS_HUGEPAGE);
1137 	if (!mtcp->sv_pool) {
1138 		CTRACE_ERROR("Failed to allocate tcp send variable pool.\n");
1139 		return NULL;
1140 	}
1141 
1142 	mtcp->rbm_snd = SBManagerCreate(g_config.mos->wmem_size, g_config.mos->no_ring_buffers,
1143 					g_config.mos->max_concurrency);
1144 	if (!mtcp->rbm_snd) {
1145 		CTRACE_ERROR("Failed to create send ring buffer.\n");
1146 		return NULL;
1147 	}
1148 
1149 	mtcp->smap = (socket_map_t)calloc(g_config.mos->max_concurrency, sizeof(struct socket_map));
1150 	if (!mtcp->smap) {
1151 		perror("calloc");
1152 		CTRACE_ERROR("Failed to allocate memory for stream map.\n");
1153 		return NULL;
1154 	}
1155 
1156 	if (mon_app_exists) {
1157 		mtcp->msmap = (socket_map_t)calloc(g_config.mos->max_concurrency, sizeof(struct socket_map));
1158 		if (!mtcp->msmap) {
1159 			perror("calloc");
1160 			CTRACE_ERROR("Failed to allocate memory for monitor stream map.\n");
1161 			return NULL;
1162 		}
1163 
1164 		for (i = 0; i < g_config.mos->max_concurrency; i++) {
1165 			mtcp->msmap[i].monitor_stream = calloc(1, sizeof(struct mon_stream));
1166 			if (!mtcp->msmap[i].monitor_stream) {
1167 				perror("calloc");
1168 				CTRACE_ERROR("Failed to allocate memory for monitr stream map\n");
1169 				return NULL;
1170 			}
1171 		}
1172 	}
1173 
1174 	TAILQ_INIT(&mtcp->timer_list);
1175 	TAILQ_INIT(&mtcp->monitors);
1176 
1177 	TAILQ_INIT(&mtcp->free_smap);
1178 	for (i = 0; i < g_config.mos->max_concurrency; i++) {
1179 		mtcp->smap[i].id = i;
1180 		mtcp->smap[i].socktype = MOS_SOCK_UNUSED;
1181 		memset(&mtcp->smap[i].saddr, 0, sizeof(struct sockaddr_in));
1182 		mtcp->smap[i].stream = NULL;
1183 		TAILQ_INSERT_TAIL(&mtcp->free_smap, &mtcp->smap[i], link);
1184 	}
1185 
1186 	if (mon_app_exists) {
1187 		TAILQ_INIT(&mtcp->free_msmap);
1188 		for (i = 0; i < g_config.mos->max_concurrency; i++) {
1189 			mtcp->msmap[i].id = i;
1190 			mtcp->msmap[i].socktype = MOS_SOCK_UNUSED;
1191 			memset(&mtcp->msmap[i].saddr, 0, sizeof(struct sockaddr_in));
1192 			TAILQ_INSERT_TAIL(&mtcp->free_msmap, &mtcp->msmap[i], link);
1193 		}
1194 	}
1195 
1196 	mtcp->ctx = ctx;
1197 	mtcp->ep = NULL;
1198 
1199 	snprintf(log_name, MAX_FILE_NAME, "%s/"LOG_FILE_NAME"_%d",
1200 			g_config.mos->mos_log, ctx->cpu);
1201 	mtcp->log_fp = fopen(log_name, "w+");
1202 	if (!mtcp->log_fp) {
1203 		perror("fopen");
1204 		CTRACE_ERROR("Failed to create file for logging. (%s)\n", log_name);
1205 		return NULL;
1206 	}
1207 	mtcp->sp_fd = g_logctx[ctx->cpu]->pair_sp_fd;
1208 	mtcp->logger = g_logctx[ctx->cpu];
1209 
1210 	mtcp->connectq = CreateStreamQueue(BACKLOG_SIZE);
1211 	if (!mtcp->connectq) {
1212 		CTRACE_ERROR("Failed to create connect queue.\n");
1213 		return NULL;
1214 	}
1215 	mtcp->sendq = CreateStreamQueue(g_config.mos->max_concurrency);
1216 	if (!mtcp->sendq) {
1217 		CTRACE_ERROR("Failed to create send queue.\n");
1218 		return NULL;
1219 	}
1220 	mtcp->ackq = CreateStreamQueue(g_config.mos->max_concurrency);
1221 	if (!mtcp->ackq) {
1222 		CTRACE_ERROR("Failed to create ack queue.\n");
1223 		return NULL;
1224 	}
1225 	mtcp->closeq = CreateStreamQueue(g_config.mos->max_concurrency);
1226 	if (!mtcp->closeq) {
1227 		CTRACE_ERROR("Failed to create close queue.\n");
1228 		return NULL;
1229 	}
1230 	mtcp->closeq_int = CreateInternalStreamQueue(g_config.mos->max_concurrency);
1231 	if (!mtcp->closeq_int) {
1232 		CTRACE_ERROR("Failed to create close queue.\n");
1233 		return NULL;
1234 	}
1235 	mtcp->resetq = CreateStreamQueue(g_config.mos->max_concurrency);
1236 	if (!mtcp->resetq) {
1237 		CTRACE_ERROR("Failed to create reset queue.\n");
1238 		return NULL;
1239 	}
1240 	mtcp->resetq_int = CreateInternalStreamQueue(g_config.mos->max_concurrency);
1241 	if (!mtcp->resetq_int) {
1242 		CTRACE_ERROR("Failed to create reset queue.\n");
1243 		return NULL;
1244 	}
1245 	mtcp->destroyq = CreateStreamQueue(g_config.mos->max_concurrency);
1246 	if (!mtcp->destroyq) {
1247 		CTRACE_ERROR("Failed to create destroy queue.\n");
1248 		return NULL;
1249 	}
1250 
1251 	mtcp->g_sender = CreateMTCPSender(-1);
1252 	if (!mtcp->g_sender) {
1253 		CTRACE_ERROR("Failed to create global sender structure.\n");
1254 		return NULL;
1255 	}
1256 	for (i = 0; i < g_config.mos->netdev_table->num; i++) {
1257 		mtcp->n_sender[i] = CreateMTCPSender(i);
1258 		if (!mtcp->n_sender[i]) {
1259 			CTRACE_ERROR("Failed to create per-nic sender structure.\n");
1260 			return NULL;
1261 		}
1262 	}
1263 
1264 	mtcp->rto_store = InitRTOHashstore();
1265 	TAILQ_INIT(&mtcp->timewait_list);
1266 	TAILQ_INIT(&mtcp->timeout_list);
1267 
1268 	return mtcp;
1269 }
1270 /*----------------------------------------------------------------------------*/
1271 static void *
1272 MTCPRunThread(void *arg)
1273 {
1274 	mctx_t mctx = (mctx_t)arg;
1275 	int cpu = mctx->cpu;
1276 	int working;
1277 	struct mtcp_manager *mtcp;
1278 	struct mtcp_thread_context *ctx;
1279 
1280 	/* affinitize the thread to this core first */
1281 	mtcp_core_affinitize(cpu);
1282 
1283 	/* memory alloc after core affinitization would use local memory
1284 	   most time */
1285 	ctx = calloc(1, sizeof(*ctx));
1286 	if (!ctx) {
1287 		perror("calloc");
1288 		TRACE_ERROR("Failed to calloc mtcp context.\n");
1289 		exit(-1);
1290 	}
1291 	ctx->thread = pthread_self();
1292 	ctx->cpu = cpu;
1293 	mtcp = ctx->mtcp_manager = InitializeMTCPManager(ctx);
1294 	if (!mtcp) {
1295 		TRACE_ERROR("Failed to initialize mtcp manager.\n");
1296 		exit(-1);
1297 	}
1298 
1299 	/* assign mtcp context's underlying I/O module */
1300 	mtcp->iom = current_iomodule_func;
1301 
1302 	/* I/O initializing */
1303 	if (mtcp->iom->init_handle)
1304 		mtcp->iom->init_handle(ctx);
1305 
1306 	if (pthread_mutex_init(&ctx->flow_pool_lock, NULL)) {
1307 		perror("pthread_mutex_init of ctx->flow_pool_lock\n");
1308 		exit(-1);
1309 	}
1310 
1311 	if (pthread_mutex_init(&ctx->socket_pool_lock, NULL)) {
1312 		perror("pthread_mutex_init of ctx->socket_pool_lock\n");
1313 		exit(-1);
1314 	}
1315 
1316 	SQ_LOCK_INIT(&ctx->connect_lock, "ctx->connect_lock", exit(-1));
1317 	SQ_LOCK_INIT(&ctx->close_lock, "ctx->close_lock", exit(-1));
1318 	SQ_LOCK_INIT(&ctx->reset_lock, "ctx->reset_lock", exit(-1));
1319 	SQ_LOCK_INIT(&ctx->sendq_lock, "ctx->sendq_lock", exit(-1));
1320 	SQ_LOCK_INIT(&ctx->ackq_lock, "ctx->ackq_lock", exit(-1));
1321 	SQ_LOCK_INIT(&ctx->destroyq_lock, "ctx->destroyq_lock", exit(-1));
1322 
1323 	/* remember this context pointer for signal processing */
1324 	g_pctx[cpu] = ctx;
1325 	mlockall(MCL_CURRENT);
1326 
1327 	// attach (nic device, queue)
1328 	working = AttachDevice(ctx);
1329 	if (working != 0) {
1330 		sem_post(&g_init_sem[ctx->cpu]);
1331 		TRACE_DBG("MTCP thread %d finished. Not attached any device\n", ctx->cpu);
1332 		pthread_exit(NULL);
1333 	}
1334 
1335 	TRACE_DBG("CPU %d: initialization finished.\n", cpu);
1336 	sem_post(&g_init_sem[ctx->cpu]);
1337 
1338 	/* start the main loop */
1339 	RunMainLoop(ctx);
1340 
1341 	TRACE_DBG("MTCP thread %d finished.\n", ctx->cpu);
1342 
1343 	/* signaling mTCP thread is done */
1344 	sem_post(&g_done_sem[mctx->cpu]);
1345 
1346 	//pthread_exit(NULL);
1347 	return 0;
1348 }
1349 /*----------------------------------------------------------------------------*/
#ifdef ENABLE_DPDK
/* Adapter matching the lcore_function_t signature expected by
 * rte_eal_remote_launch(): runs the regular pthread entry point on the
 * lcore and reports success. */
static int
MTCPDPDKRunThread(void *arg)
{
	(void)MTCPRunThread(arg);
	return 0;
}
#endif /* !ENABLE_DPDK */
1357 /*----------------------------------------------------------------------------*/
/* Create the per-core mTCP context for `cpu`: sets up the init/done
 * semaphores, allocates the mtcp_context, spawns the logger thread,
 * launches the mTCP worker thread (plain pthread, or via
 * rte_eal_remote_launch for DPDK slave lcores), and blocks on
 * g_init_sem until that thread finishes initializing.
 * Returns the new context, or NULL on failure.
 * NOTE(review): the error paths after mctx / g_logctx allocation do not
 * release already-acquired resources (semaphores, mctx, log context). */
mctx_t
mtcp_create_context(int cpu)
{
	mctx_t mctx;
	int ret;

	if (cpu >=  g_config.mos->num_cores) {
		TRACE_ERROR("Failed initialize new mtcp context. "
					"Requested cpu id %d exceed the number of cores %d configured to use.\n",
					cpu, g_config.mos->num_cores);
		return NULL;
	}

        /* check if mtcp_create_context() was already initialized */
        if (g_logctx[cpu] != NULL) {
                TRACE_ERROR("%s was already initialized before!\n",
                            __FUNCTION__);
                return NULL;
        }

	/* posted by the mTCP thread when its initialization completes */
	ret = sem_init(&g_init_sem[cpu], 0, 0);
	if (ret) {
		TRACE_ERROR("Failed initialize init_sem.\n");
		return NULL;
	}

	/* posted by the mTCP thread when it exits its main loop */
	ret = sem_init(&g_done_sem[cpu], 0, 0);
	if (ret) {
		TRACE_ERROR("Failed initialize done_sem.\n");
		return NULL;
	}

	mctx = (mctx_t)calloc(1, sizeof(struct mtcp_context));
	if (!mctx) {
		TRACE_ERROR("Failed to allocate memory for mtcp_context.\n");
		return NULL;
	}
	mctx->cpu = cpu;
	g_ctx[cpu] = mctx;

	/* initialize logger */
	g_logctx[cpu] = (struct log_thread_context *)
			calloc(1, sizeof(struct log_thread_context));
	if (!g_logctx[cpu]) {
		perror("malloc");
		TRACE_ERROR("Failed to allocate memory for log thread context.\n");
		return NULL;
	}
	InitLogThreadContext(g_logctx[cpu], cpu);
	if (pthread_create(&log_thread[cpu],
			   NULL, ThreadLogMain, (void *)g_logctx[cpu])) {
		perror("pthread_create");
		TRACE_ERROR("Failed to create log thread\n");
		return NULL;
	}

#ifdef ENABLE_DPDK
	/* Wake up mTCP threads (wake up I/O threads) */
	if (current_iomodule_func == &dpdk_module_func) {
		int master;
		master = rte_get_master_lcore();
		if (master == cpu) {
			/* NOTE(review): rte_eal_remote_launch() cannot target the
			 * master lcore, so it is marked FINISHED here and the mTCP
			 * thread runs as a plain pthread instead — confirm against
			 * DPDK's lcore state machine for the DPDK version in use. */
			lcore_config[master].ret = 0;
			lcore_config[master].state = FINISHED;
			if (pthread_create(&g_thread[cpu],
					   NULL, MTCPRunThread, (void *)mctx) != 0) {
				TRACE_ERROR("pthread_create of mtcp thread failed!\n");
				return NULL;
			}
		} else
			rte_eal_remote_launch(MTCPDPDKRunThread, mctx, cpu);
	} else
#endif /* !ENABLE_DPDK */
		{
			if (pthread_create(&g_thread[cpu],
					   NULL, MTCPRunThread, (void *)mctx) != 0) {
				TRACE_ERROR("pthread_create of mtcp thread failed!\n");
				return NULL;
			}
		}

	/* block until the mTCP thread signals end of initialization */
	sem_wait(&g_init_sem[cpu]);
	sem_destroy(&g_init_sem[cpu]);

	running[cpu] = TRUE;

#ifdef NETSTAT
#if NETSTAT_TOTAL
	/* first context created becomes the statistics printer */
	if (printer < 0) {
		printer = cpu;
		TRACE_INFO("CPU %d is in charge of printing stats.\n", printer);
	}
#endif
#endif

	return mctx;
}
1455 /*----------------------------------------------------------------------------*/
1456 /**
1457  * TODO: It currently always returns 0. Add appropriate error return values
1458  */
1459 int
1460 mtcp_destroy_context(mctx_t mctx)
1461 {
1462 	struct mtcp_thread_context *ctx = g_pctx[mctx->cpu];
1463 	struct mtcp_manager *mtcp = ctx->mtcp_manager;
1464 	struct log_thread_context *log_ctx = mtcp->logger;
1465 	int ret, i;
1466 
1467 	TRACE_DBG("CPU %d: mtcp_destroy_context()\n", mctx->cpu);
1468 
1469 	/* close all stream sockets that are still open */
1470 	if (!ctx->exit) {
1471 		for (i = 0; i < g_config.mos->max_concurrency; i++) {
1472 			if (mtcp->smap[i].socktype == MOS_SOCK_STREAM) {
1473 				TRACE_DBG("Closing remaining socket %d (%s)\n",
1474 						i, TCPStateToString(mtcp->smap[i].stream));
1475 #ifdef DUMP_STREAM
1476 				DumpStream(mtcp, mtcp->smap[i].stream);
1477 #endif
1478 				mtcp_close(mctx, i);
1479 			}
1480 		}
1481 	}
1482 
1483 	ctx->done = 1;
1484 
1485 	//pthread_kill(g_thread[mctx->cpu], SIGINT);
1486 #ifdef ENABLE_DPDK
1487 	ctx->exit = 1;
1488 	/* XXX - dpdk logic changes */
1489 	if (current_iomodule_func == &dpdk_module_func) {
1490 		int master = rte_get_master_lcore();
1491 		if (master == mctx->cpu)
1492 			pthread_join(g_thread[mctx->cpu], NULL);
1493 		else
1494 			rte_eal_wait_lcore(mctx->cpu);
1495 	} else
1496 #endif /* !ENABLE_DPDK */
1497 		{
1498 			pthread_join(g_thread[mctx->cpu], NULL);
1499 		}
1500 
1501 	TRACE_INFO("MTCP thread %d joined.\n", mctx->cpu);
1502 	running[mctx->cpu] = FALSE;
1503 
1504 #ifdef NETSTAT
1505 #if NETSTAT_TOTAL
1506 	if (printer == mctx->cpu) {
1507 		for (i = 0; i < num_cpus; i++) {
1508 			if (i != mctx->cpu && running[i]) {
1509 				printer = i;
1510 				break;
1511 			}
1512 		}
1513 	}
1514 #endif
1515 #endif
1516 
1517 	log_ctx->done = 1;
1518 	ret = write(log_ctx->pair_sp_fd, "F", 1);
1519 	if (ret != 1)
1520 		TRACE_ERROR("CPU %d: Fail to signal socket pair\n", mctx->cpu);
1521 
1522 	pthread_join(log_thread[ctx->cpu], NULL);
1523 	fclose(mtcp->log_fp);
1524 	TRACE_LOG("Log thread %d joined.\n", mctx->cpu);
1525 
1526 	if (mtcp->connectq) {
1527 		DestroyStreamQueue(mtcp->connectq);
1528 		mtcp->connectq = NULL;
1529 	}
1530 	if (mtcp->sendq) {
1531 		DestroyStreamQueue(mtcp->sendq);
1532 		mtcp->sendq = NULL;
1533 	}
1534 	if (mtcp->ackq) {
1535 		DestroyStreamQueue(mtcp->ackq);
1536 		mtcp->ackq = NULL;
1537 	}
1538 	if (mtcp->closeq) {
1539 		DestroyStreamQueue(mtcp->closeq);
1540 		mtcp->closeq = NULL;
1541 	}
1542 	if (mtcp->closeq_int) {
1543 		DestroyInternalStreamQueue(mtcp->closeq_int);
1544 		mtcp->closeq_int = NULL;
1545 	}
1546 	if (mtcp->resetq) {
1547 		DestroyStreamQueue(mtcp->resetq);
1548 		mtcp->resetq = NULL;
1549 	}
1550 	if (mtcp->resetq_int) {
1551 		DestroyInternalStreamQueue(mtcp->resetq_int);
1552 		mtcp->resetq_int = NULL;
1553 	}
1554 	if (mtcp->destroyq) {
1555 		DestroyStreamQueue(mtcp->destroyq);
1556 		mtcp->destroyq = NULL;
1557 	}
1558 
1559 	DestroyMTCPSender(mtcp->g_sender);
1560 	for (i = 0; i < g_config.mos->netdev_table->num; i++) {
1561 		DestroyMTCPSender(mtcp->n_sender[i]);
1562 	}
1563 
1564 	MPDestroy(mtcp->rv_pool);
1565 	MPDestroy(mtcp->sv_pool);
1566 	MPDestroy(mtcp->flow_pool);
1567 
1568 	if (mtcp->ap) {
1569 		DestroyAddressPool(mtcp->ap);
1570 	}
1571 
1572 	SQ_LOCK_DESTROY(&ctx->connect_lock);
1573 	SQ_LOCK_DESTROY(&ctx->close_lock);
1574 	SQ_LOCK_DESTROY(&ctx->reset_lock);
1575 	SQ_LOCK_DESTROY(&ctx->sendq_lock);
1576 	SQ_LOCK_DESTROY(&ctx->ackq_lock);
1577 	SQ_LOCK_DESTROY(&ctx->destroyq_lock);
1578 
1579 	//TRACE_INFO("MTCP thread %d destroyed.\n", mctx->cpu);
1580 	if (mtcp->iom->destroy_handle)
1581 		mtcp->iom->destroy_handle(ctx);
1582 	free(ctx);
1583 	free(mctx);
1584 
1585 	return 0;
1586 }
1587 /*----------------------------------------------------------------------------*/
1588 mtcp_sighandler_t
1589 mtcp_register_signal(int signum, mtcp_sighandler_t handler)
1590 {
1591 	mtcp_sighandler_t prev;
1592 
1593 	if (signum == SIGINT) {
1594 		prev = app_signal_handler;
1595 		app_signal_handler = handler;
1596 	} else {
1597 		if ((prev = signal(signum, handler)) == SIG_ERR) {
1598 			perror("signal");
1599 			return SIG_ERR;
1600 		}
1601 	}
1602 
1603 	return prev;
1604 }
1605 /*----------------------------------------------------------------------------*/
1606 int
1607 mtcp_getconf(struct mtcp_conf *conf)
1608 {
1609 	int i, j;
1610 
1611 	if (!conf) {
1612 		errno = EINVAL;
1613 		return -1;
1614 	}
1615 
1616 	conf->num_cores = g_config.mos->num_cores;
1617 	conf->max_concurrency = g_config.mos->max_concurrency;
1618 	conf->cpu_mask = g_config.mos->cpu_mask;
1619 
1620 	conf->rcvbuf_size = g_config.mos->rmem_size;
1621 	conf->sndbuf_size = g_config.mos->wmem_size;
1622 
1623 	conf->tcp_timewait = g_config.mos->tcp_tw_interval;
1624 	conf->tcp_timeout = g_config.mos->tcp_timeout;
1625 
1626 	i = 0;
1627 	struct conf_block *bwalk;
1628 	TAILQ_FOREACH(bwalk, &g_config.app_blkh, link) {
1629 		struct app_conf *app_conf = (struct app_conf *)bwalk->conf;
1630 		for (j = 0; j < app_conf->app_argc; j++)
1631 			conf->app_argv[i][j] = app_conf->app_argv[j];
1632 		conf->app_argc[i] = app_conf->app_argc;
1633 		conf->app_cpu_mask[i] = app_conf->cpu_mask;
1634 		i++;
1635 	}
1636 	conf->num_app = i;
1637 
1638 	return 0;
1639 }
1640 /*----------------------------------------------------------------------------*/
1641 int
1642 mtcp_setconf(const struct mtcp_conf *conf)
1643 {
1644 	if (!conf)
1645 		return -1;
1646 
1647 	g_config.mos->num_cores = conf->num_cores;
1648 	g_config.mos->max_concurrency = conf->max_concurrency;
1649 
1650 	g_config.mos->rmem_size = conf->rcvbuf_size;
1651 	g_config.mos->wmem_size = conf->sndbuf_size;
1652 
1653 	g_config.mos->tcp_tw_interval = conf->tcp_timewait;
1654 	g_config.mos->tcp_timeout = conf->tcp_timeout;
1655 
1656 	TRACE_CONFIG("Configuration updated by mtcp_setconf().\n");
1657 	//PrintConfiguration();
1658 
1659 	return 0;
1660 }
1661 /*----------------------------------------------------------------------------*/
/* Global one-time mOS/mTCP initialization: loads the configuration
 * file, selects the compiled-in I/O module, creates one address pool
 * per configured interface, initializes the ARP table and the
 * SIGUSR1/SIGINT handlers, and prints the resulting configuration.
 * Returns 0 on success, -1 on failure. Must be called before
 * mtcp_create_context(). */
int
mtcp_init(const char *config_file)
{
	int i;
	int ret;

	if (geteuid()) {
		TRACE_CONFIG("[CAUTION] Run as root if mlock is necessary.\n");
#if defined(ENABLE_DPDK) || defined(ENABLE_DPDKR) || defined(ENABLE_NETMAP)
		/* kernel-bypass backends require root privileges */
		TRACE_CONFIG("[CAUTION] Run the app as root!\n");
		exit(EXIT_FAILURE);
#endif
	}

	/* getting cpu and NIC */
	num_cpus = GetNumCPUs();
	assert(num_cpus >= 1);
	for (i = 0; i < num_cpus; i++) {
		g_mtcp[i] = NULL;
		running[i] = FALSE;
		sigint_cnt[i] = 0;
	}

	ret = LoadConfigurationUpperHalf(config_file);
	if (ret) {
		TRACE_CONFIG("Error occured while loading configuration.\n");
		return -1;
	}

	/* select the I/O module compiled in; the #elif chain fixes the
	   precedence when several backends are enabled at build time */
#if defined(ENABLE_PSIO)
	current_iomodule_func = &ps_module_func;
#elif defined(ENABLE_DPDK)
	current_iomodule_func = &dpdk_module_func;
#elif defined(ENABLE_PCAP)
	current_iomodule_func = &pcap_module_func;
#elif defined(ENABLE_DPDKR)
	current_iomodule_func = &dpdkr_module_func;
#elif defined(ENABLE_NETMAP)
	current_iomodule_func = &netmap_module_func;
#endif

	/* module upper half runs before the lower half of config loading
	   (e.g. to probe devices the config refers to) */
	if (current_iomodule_func->load_module_upper_half)
		current_iomodule_func->load_module_upper_half();

	LoadConfigurationLowerHalf();

	//PrintConfiguration();

	for (i = 0; i < g_config.mos->netdev_table->num; i++) {
		ap[i] = CreateAddressPool(g_config.mos->netdev_table->ent[i]->ip_addr, 1);
		if (!ap[i]) {
			TRACE_CONFIG("Error occured while create address pool[%d]\n",
				     i);
			return -1;
		}
        }

	//PrintInterfaceInfo();
	//PrintRoutingTable();
	//PrintARPTable();
	InitARPTable();

	/* HandleSignal forwards SIGINT to app_signal_handler when one is
	   registered via mtcp_register_signal() */
	if (signal(SIGUSR1, HandleSignal) == SIG_ERR) {
		perror("signal, SIGUSR1");
		return -1;
	}
	if (signal(SIGINT, HandleSignal) == SIG_ERR) {
		perror("signal, SIGINT");
		return -1;
	}
	app_signal_handler = NULL;

	printf("load_module(): %p\n", current_iomodule_func);
	/* load system-wide io module specs */
	if (current_iomodule_func->load_module_lower_half)
		current_iomodule_func->load_module_lower_half();

	GlobInitEvent();

	PrintConf(&g_config);

	return 0;
}
1745 /*----------------------------------------------------------------------------*/
1746 int
1747 mtcp_destroy()
1748 {
1749 	int i;
1750 
1751 	/* wait until all threads are closed */
1752 	for (i = 0; i < num_cpus; i++) {
1753 		if (running[i]) {
1754 			if (pthread_join(g_thread[i], NULL) != 0)
1755 				return -1;
1756 		}
1757 	}
1758 
1759 	for (i = 0; i < g_config.mos->netdev_table->num; i++)
1760 		DestroyAddressPool(ap[i]);
1761 
1762 	TRACE_INFO("All MTCP threads are joined.\n");
1763 
1764 	return 0;
1765 }
1766 /*----------------------------------------------------------------------------*/
1767