xref: /mOS-networking-stack/core/src/core.c (revision 9d76ceb8)
1 #define _GNU_SOURCE
2 #include <sched.h>
3 #include <unistd.h>
4 #include <sys/time.h>
5 #include <semaphore.h>
6 #include <sys/mman.h>
7 #include <signal.h>
8 #include <assert.h>
9 #include <string.h>
10 
11 #include "cpu.h"
12 #include "eth_in.h"
13 #include "fhash.h"
14 #include "tcp_send_buffer.h"
15 #include "tcp_ring_buffer.h"
16 #include "socket.h"
17 #include "eth_out.h"
18 #include "tcp.h"
19 #include "tcp_in.h"
20 #include "tcp_out.h"
21 #include "mtcp_api.h"
22 #include "eventpoll.h"
23 #include "logger.h"
24 #include "config.h"
25 #include "arp.h"
26 #include "ip_out.h"
27 #include "timer.h"
28 #include "debug.h"
29 #include "event_callback.h"
30 #include "tcp_rb.h"
31 #include "tcp_stream.h"
32 #include "io_module.h"
33 
34 #ifdef ENABLE_DPDK
35 /* for launching rte thread */
36 #include <rte_launch.h>
37 #include <rte_lcore.h>
38 #endif /* !ENABLE_DPDK */
39 #define PS_CHUNK_SIZE 64
40 #define RX_THRESH (PS_CHUNK_SIZE * 0.8)
41 
42 #define ROUND_STAT FALSE
43 #define TIME_STAT FALSE
44 #define EVENT_STAT FALSE
45 #define TESTING FALSE
46 
47 #define LOG_FILE_NAME "log"
48 #define MAX_FILE_NAME 1024
49 
50 #define MAX(a, b) ((a)>(b)?(a):(b))
51 #define MIN(a, b) ((a)<(b)?(a):(b))
52 
53 #define PER_STREAM_SLICE 0.1		// in ms
54 #define PER_STREAM_TCHECK 1			// in ms
55 #define PS_SELECT_TIMEOUT 100		// in us
56 
/* Convert a byte count (accumulated over one reporting interval) to gigabits.
 * FIX: the argument is now parenthesized so expressions like GBPS(a + b)
 * evaluate correctly under operator precedence. */
#define GBPS(bytes) ((bytes) * 8.0 / (1000 * 1000 * 1000))
58 
59 /*----------------------------------------------------------------------------*/
60 /* handlers for threads */
61 struct mtcp_thread_context *g_pctx[MAX_CPUS] = {0};
62 struct log_thread_context *g_logctx[MAX_CPUS] = {0};
63 /*----------------------------------------------------------------------------*/
64 static pthread_t g_thread[MAX_CPUS] = {0};
65 static pthread_t log_thread[MAX_CPUS] = {0};
66 /*----------------------------------------------------------------------------*/
67 static sem_t g_init_sem[MAX_CPUS];
68 static sem_t g_done_sem[MAX_CPUS];
69 static int running[MAX_CPUS] = {0};
70 /*----------------------------------------------------------------------------*/
71 mtcp_sighandler_t app_signal_handler;
72 static int sigint_cnt[MAX_CPUS] = {0};
73 static struct timespec sigint_ts[MAX_CPUS];
74 void mtcp_free_context(mctx_t mctx);
75 /*----------------------------------------------------------------------------*/
76 #ifdef NETSTAT
77 #if NETSTAT_TOTAL
78 static int printer = -1;
79 #if ROUND_STAT
80 #endif /* ROUND_STAT */
81 #endif /* NETSTAT_TOTAL */
82 #endif /* NETSTAT */
83 /*----------------------------------------------------------------------------*/
/*
 * Process-wide signal handler. On SIGINT: the first Ctrl-C interrupts all
 * mtcp threads (and defers to the app's handler if registered); a second
 * Ctrl-C within the same wall-clock second forces every running thread to
 * exit. Non-SIGUSR1 signals are forwarded to the app's handler when set.
 *
 * NOTE(review): FreeConfigResources() runs on *every* SIGINT, so a second
 * Ctrl-C re-enters it on already-released state — verify it is idempotent.
 * NOTE(review): most calls below (fprintf inside FreeConfigResources,
 * sched_getcpu, etc.) are not async-signal-safe; acceptable only because
 * this path runs at shutdown — confirm against signal-safety(7).
 */
void
HandleSignal(int signal)
{
	int i = 0;

	if (signal == SIGINT) {
		FreeConfigResources();
#ifdef DARWIN
		int core = 0;
#else
		int core = sched_getcpu();
#endif
		struct timespec cur_ts;

		clock_gettime(CLOCK_REALTIME, &cur_ts);

		/* second SIGINT within the same second: force all threads out */
		if (sigint_cnt[core] > 0 && cur_ts.tv_sec == sigint_ts[core].tv_sec) {
			for (i = 0; i < g_config.mos->num_cores; i++) {
				if (running[i]) {
					//exit(0);
					g_pctx[i]->exit = TRUE;
				}
			}
		} else {
			/* first SIGINT: request a graceful interrupt on every core */
			for (i = 0; i < g_config.mos->num_cores; i++) {
				if (g_pctx[i])
					g_pctx[i]->interrupt = TRUE;
			}
			/* no app handler to take over: exit directly */
			if (!app_signal_handler) {
				for (i = 0; i < g_config.mos->num_cores; i++) {
					if (running[i]) {
						//exit(0);
						g_pctx[i]->exit = TRUE;
					}
				}
			}
		}
		/* remember this SIGINT so a quick repeat escalates to exit */
		sigint_cnt[core]++;
		clock_gettime(CLOCK_REALTIME, &sigint_ts[core]);
	}

	/* forward everything except SIGUSR1 to the application's handler */
	if (signal != SIGUSR1) {
		if (app_signal_handler) {
			app_signal_handler(signal);
		}
	}
}
131 /*----------------------------------------------------------------------------*/
132 static int
133 AttachDevice(struct mtcp_thread_context* ctx)
134 {
135 	int working = -1;
136 	mtcp_manager_t mtcp = ctx->mtcp_manager;
137 
138 	if (mtcp->iom->link_devices)
139 		working = mtcp->iom->link_devices(ctx);
140 	else
141 		return 0;
142 
143 	return working;
144 }
145 /*----------------------------------------------------------------------------*/
#if defined(TIMESTAT) || TIME_STAT
/* FIX: these helpers were compiled only under `#ifdef TIMESTAT`, yet every
 * call site below (UpdateStatCounter/InitStatCounter/GetAverageStat/
 * TimeDiffUs) is guarded by `#if TIME_STAT`. Enabling TIME_STAT without
 * also defining TIMESTAT failed to link; the guard now covers both while
 * preserving the old TIMESTAT behavior. */
/* Reset a stat counter to its initial (empty) state. */
static inline void
InitStatCounter(struct stat_counter *counter)
{
	counter->cnt = 0;
	counter->sum = 0;
	counter->max = 0;
	counter->min = 0;
}
/*----------------------------------------------------------------------------*/
/* Fold one sample into the counter, tracking count, sum, max, and min.
 * min == 0 doubles as "no sample yet", so a genuine 0 sample resets it. */
static inline void
UpdateStatCounter(struct stat_counter *counter, int64_t value)
{
	counter->cnt++;
	counter->sum += value;
	if (value > counter->max)
		counter->max = value;
	if (counter->min == 0 || value < counter->min)
		counter->min = value;
}
/*----------------------------------------------------------------------------*/
/* Mean of all recorded samples; 0 when no samples were recorded. */
static inline uint64_t
GetAverageStat(struct stat_counter *counter)
{
	return counter->cnt ? (counter->sum / counter->cnt) : 0;
}
/*----------------------------------------------------------------------------*/
/* Difference t2 - t1 in microseconds (t2 is expected to be the later time). */
static inline int64_t
TimeDiffUs(struct timeval *t2, struct timeval *t1)
{
	return (t2->tv_sec - t1->tv_sec) * 1000000 +
			(int64_t)(t2->tv_usec - t1->tv_usec);
}
/*----------------------------------------------------------------------------*/
#endif /* TIMESTAT || TIME_STAT */
181 #ifdef NETSTAT
182 static inline void
183 PrintThreadNetworkStats(mtcp_manager_t mtcp, struct net_stat *ns)
184 {
185 	int i;
186 
187 	for (i = 0; i < g_config.mos->netdev_table->num; i++) {
188 		ns->rx_packets[i] = mtcp->nstat.rx_packets[i] - mtcp->p_nstat.rx_packets[i];
189 		ns->rx_errors[i] = mtcp->nstat.rx_errors[i] - mtcp->p_nstat.rx_errors[i];
190 		ns->rx_bytes[i] = mtcp->nstat.rx_bytes[i] - mtcp->p_nstat.rx_bytes[i];
191 		ns->tx_packets[i] = mtcp->nstat.tx_packets[i] - mtcp->p_nstat.tx_packets[i];
192 		ns->tx_drops[i] = mtcp->nstat.tx_drops[i] - mtcp->p_nstat.tx_drops[i];
193 		ns->tx_bytes[i] = mtcp->nstat.tx_bytes[i] - mtcp->p_nstat.tx_bytes[i];
194 #if NETSTAT_PERTHREAD
195 		if (g_config.mos->netdev_table->ent[i]->stat_print) {
196 			fprintf(stderr, "[CPU%2d] %s flows: %6u, "
197 					"RX: %7llu(pps) (err: %5llu), %5.2lf(Gbps), "
198 					"TX: %7llu(pps), %5.2lf(Gbps)\n",
199 					mtcp->ctx->cpu,
200 					g_config.mos->netdev_table->ent[i]->dev_name,
201 					(unsigned)mtcp->flow_cnt,
202 					(long long unsigned)ns->rx_packets[i],
203 					(long long unsigned)ns->rx_errors[i],
204 					GBPS(ns->rx_bytes[i]),
205 					(long long unsigned)ns->tx_packets[i],
206 					GBPS(ns->tx_bytes[i]));
207 		}
208 #endif
209 	}
210 	mtcp->p_nstat = mtcp->nstat;
211 
212 }
213 /*----------------------------------------------------------------------------*/
214 #if ROUND_STAT
215 static inline void
216 PrintThreadRoundStats(mtcp_manager_t mtcp, struct run_stat *rs)
217 {
218 #define ROUND_DIV (1000)
219 	rs->rounds = mtcp->runstat.rounds - mtcp->p_runstat.rounds;
220 	rs->rounds_rx = mtcp->runstat.rounds_rx - mtcp->p_runstat.rounds_rx;
221 	rs->rounds_rx_try = mtcp->runstat.rounds_rx_try - mtcp->p_runstat.rounds_rx_try;
222 	rs->rounds_tx = mtcp->runstat.rounds_tx - mtcp->p_runstat.rounds_tx;
223 	rs->rounds_tx_try = mtcp->runstat.rounds_tx_try - mtcp->p_runstat.rounds_tx_try;
224 	rs->rounds_select = mtcp->runstat.rounds_select - mtcp->p_runstat.rounds_select;
225 	rs->rounds_select_rx = mtcp->runstat.rounds_select_rx - mtcp->p_runstat.rounds_select_rx;
226 	rs->rounds_select_tx = mtcp->runstat.rounds_select_tx - mtcp->p_runstat.rounds_select_tx;
227 	rs->rounds_select_intr = mtcp->runstat.rounds_select_intr - mtcp->p_runstat.rounds_select_intr;
228 	rs->rounds_twcheck = mtcp->runstat.rounds_twcheck - mtcp->p_runstat.rounds_twcheck;
229 	mtcp->p_runstat = mtcp->runstat;
230 #if NETSTAT_PERTHREAD
231 	fprintf(stderr, "[CPU%2d] Rounds: %4lluK, "
232 			"rx: %3lluK (try: %4lluK), tx: %3lluK (try: %4lluK), "
233 			"ps_select: %4llu (rx: %4llu, tx: %4llu, intr: %3llu)\n",
234 			mtcp->ctx->cpu, rs->rounds / ROUND_DIV,
235 			rs->rounds_rx / ROUND_DIV, rs->rounds_rx_try / ROUND_DIV,
236 			rs->rounds_tx / ROUND_DIV, rs->rounds_tx_try / ROUND_DIV,
237 			rs->rounds_select,
238 			rs->rounds_select_rx, rs->rounds_select_tx, rs->rounds_select_intr);
239 #endif
240 }
241 #endif /* ROUND_STAT */
242 /*----------------------------------------------------------------------------*/
243 #if TIME_STAT
/*
 * Print this thread's per-round latency breakdown — average and max, in
 * microseconds — for each main-loop phase, then reset the counters so the
 * next report covers a fresh measurement interval.
 *
 * NOTE(review): the %lu specifiers assume GetAverageStat's uint64_t and the
 * counters' max fields match unsigned long — true on LP64, mismatched on
 * 32-bit targets; PRIu64 would be portable.
 */
static inline void
PrintThreadRoundTime(mtcp_manager_t mtcp)
{
	fprintf(stderr, "[CPU%2d] Time: (avg, max) "
			"round: (%4luus, %4luus), processing: (%4luus, %4luus), "
			"tcheck: (%4luus, %4luus), epoll: (%4luus, %4luus), "
			"handle: (%4luus, %4luus), xmit: (%4luus, %4luus), "
			"select: (%4luus, %4luus)\n", mtcp->ctx->cpu,
			GetAverageStat(&mtcp->rtstat.round), mtcp->rtstat.round.max,
			GetAverageStat(&mtcp->rtstat.processing), mtcp->rtstat.processing.max,
			GetAverageStat(&mtcp->rtstat.tcheck), mtcp->rtstat.tcheck.max,
			GetAverageStat(&mtcp->rtstat.epoll), mtcp->rtstat.epoll.max,
			GetAverageStat(&mtcp->rtstat.handle), mtcp->rtstat.handle.max,
			GetAverageStat(&mtcp->rtstat.xmit), mtcp->rtstat.xmit.max,
			GetAverageStat(&mtcp->rtstat.select), mtcp->rtstat.select.max);

	/* restart measurement for the next interval */
	InitStatCounter(&mtcp->rtstat.round);
	InitStatCounter(&mtcp->rtstat.processing);
	InitStatCounter(&mtcp->rtstat.tcheck);
	InitStatCounter(&mtcp->rtstat.epoll);
	InitStatCounter(&mtcp->rtstat.handle);
	InitStatCounter(&mtcp->rtstat.xmit);
	InitStatCounter(&mtcp->rtstat.select);
}
268 #endif
269 #endif /* NETSTAT */
270 /*----------------------------------------------------------------------------*/
271 #if EVENT_STAT
272 static inline void
273 PrintEventStat(int core, struct mtcp_epoll_stat *stat)
274 {
275 	fprintf(stderr, "[CPU%2d] calls: %lu, waits: %lu, wakes: %lu, "
276 			"issued: %lu, registered: %lu, invalidated: %lu, handled: %lu\n",
277 			core, stat->calls, stat->waits, stat->wakes,
278 			stat->issued, stat->registered, stat->invalidated, stat->handled);
279 	memset(stat, 0, sizeof(struct mtcp_epoll_stat));
280 }
281 #endif /* EVENT_STAT */
282 /*----------------------------------------------------------------------------*/
283 #ifdef NETSTAT
284 static inline void
285 PrintNetworkStats(mtcp_manager_t mtcp, uint32_t cur_ts)
286 {
287 #define TIMEOUT 1
288 	int i;
289 	struct net_stat ns;
290 	bool stat_print = false;
291 #if ROUND_STAT
292 	struct run_stat rs;
293 #endif /* ROUND_STAT */
294 #ifdef NETSTAT_TOTAL
295 	static double peak_total_rx_gbps = 0;
296 	static double peak_total_tx_gbps = 0;
297 	static double avg_total_rx_gbps = 0;
298 	static double avg_total_tx_gbps = 0;
299 
300 	double total_rx_gbps = 0, total_tx_gbps = 0;
301 	int j;
302 	uint32_t gflow_cnt = 0;
303 	struct net_stat g_nstat;
304 #if ROUND_STAT
305 	struct run_stat g_runstat;
306 #endif /* ROUND_STAT */
307 #endif /* NETSTAT_TOTAL */
308 
309 	if (TS_TO_MSEC(cur_ts - mtcp->p_nstat_ts) < SEC_TO_MSEC(TIMEOUT)) {
310 		return;
311 	}
312 
313 	mtcp->p_nstat_ts = cur_ts;
314 	gflow_cnt = 0;
315 	memset(&g_nstat, 0, sizeof(struct net_stat));
316 	for (i = 0; i < g_config.mos->num_cores; i++) {
317 		if (running[i]) {
318 			PrintThreadNetworkStats(g_mtcp[i], &ns);
319 #if NETSTAT_TOTAL
320 			gflow_cnt += g_mtcp[i]->flow_cnt;
321 			for (j = 0; j < g_config.mos->netdev_table->num; j++) {
322 				g_nstat.rx_packets[j] += ns.rx_packets[j];
323 				g_nstat.rx_errors[j] += ns.rx_errors[j];
324 				g_nstat.rx_bytes[j] += ns.rx_bytes[j];
325 				g_nstat.tx_packets[j] += ns.tx_packets[j];
326 				g_nstat.tx_drops[j] += ns.tx_drops[j];
327 				g_nstat.tx_bytes[j] += ns.tx_bytes[j];
328 			}
329 #endif
330 		}
331 	}
332 #if NETSTAT_TOTAL
333 	for (i = 0; i < g_config.mos->netdev_table->num; i++) {
334 		if (g_config.mos->netdev_table->ent[i]->stat_print) {
335 			fprintf(stderr, "[ ALL ] %s, "
336 			"RX: %7llu(pps) (err: %5llu), %5.2lf(Gbps), "
337 			"TX: %7llu(pps), %5.2lf(Gbps)\n",
338 				g_config.mos->netdev_table->ent[i]->dev_name,
339 				(long long unsigned)g_nstat.rx_packets[i],
340 				(long long unsigned)g_nstat.rx_errors[i],
341 				GBPS(g_nstat.rx_bytes[i]),
342 				(long long unsigned)g_nstat.tx_packets[i],
343 				GBPS(g_nstat.tx_bytes[i]));
344 			total_rx_gbps += GBPS(g_nstat.rx_bytes[i]);
345 			total_tx_gbps += GBPS(g_nstat.tx_bytes[i]);
346 			stat_print = true;
347 		}
348 	}
349 	if (stat_print) {
350 		fprintf(stderr, "[ ALL ] flows: %6u\n", gflow_cnt);
351 		if (avg_total_rx_gbps == 0)
352 			avg_total_rx_gbps = total_rx_gbps;
353 		else
354 			avg_total_rx_gbps = avg_total_rx_gbps * 0.6 + total_rx_gbps * 0.4;
355 
356 		if (avg_total_tx_gbps == 0)
357 			avg_total_tx_gbps = total_tx_gbps;
358 		else
359 			avg_total_tx_gbps = avg_total_tx_gbps * 0.6 + total_tx_gbps * 0.4;
360 
361 		if (peak_total_rx_gbps < total_rx_gbps)
362 			peak_total_rx_gbps = total_rx_gbps;
363 		if (peak_total_tx_gbps < total_tx_gbps)
364 			peak_total_tx_gbps = total_tx_gbps;
365 
366 		fprintf(stderr, "[ PEAK ] RX: %5.2lf(Gbps), TX: %5.2lf(Gbps)\n"
367 						"[ RECENT AVG ] RX: %5.2lf(Gbps), TX: %5.2lf(Gbps)\n",
368 				peak_total_rx_gbps, peak_total_tx_gbps,
369 				avg_total_rx_gbps, avg_total_tx_gbps);
370 	}
371 #endif
372 
373 #if ROUND_STAT
374 	memset(&g_runstat, 0, sizeof(struct run_stat));
375 	for (i = 0; i < g_config.mos->num_cores; i++) {
376 		if (running[i]) {
377 			PrintThreadRoundStats(g_mtcp[i], &rs);
378 #if DBGMSG
379 			g_runstat.rounds += rs.rounds;
380 			g_runstat.rounds_rx += rs.rounds_rx;
381 			g_runstat.rounds_rx_try += rs.rounds_rx_try;
382 			g_runstat.rounds_tx += rs.rounds_tx;
383 			g_runstat.rounds_tx_try += rs.rounds_tx_try;
384 			g_runstat.rounds_select += rs.rounds_select;
385 			g_runstat.rounds_select_rx += rs.rounds_select_rx;
386 			g_runstat.rounds_select_tx += rs.rounds_select_tx;
387 #endif
388 		}
389 	}
390 
391 	TRACE_DBG("[ ALL ] Rounds: %4ldK, "
392 		  "rx: %3ldK (try: %4ldK), tx: %3ldK (try: %4ldK), "
393 		  "ps_select: %4ld (rx: %4ld, tx: %4ld)\n",
394 		  g_runstat.rounds / 1000, g_runstat.rounds_rx / 1000,
395 		  g_runstat.rounds_rx_try / 1000, g_runstat.rounds_tx / 1000,
396 		  g_runstat.rounds_tx_try / 1000, g_runstat.rounds_select,
397 		  g_runstat.rounds_select_rx, g_runstat.rounds_select_tx);
398 #endif /* ROUND_STAT */
399 
400 #if TIME_STAT
401 	for (i = 0; i < g_config.mos->num_cores; i++) {
402 		if (running[i]) {
403 			PrintThreadRoundTime(g_mtcp[i]);
404 		}
405 	}
406 #endif
407 
408 #if EVENT_STAT
409 	for (i = 0; i < g_config.mos->num_cores; i++) {
410 		if (running[i] && g_mtcp[i]->ep) {
411 			PrintEventStat(i, &g_mtcp[i]->ep->stat);
412 		}
413 	}
414 #endif
415 
416 	fflush(stderr);
417 }
418 #endif /* NETSTAT */
419 /*----------------------------------------------------------------------------*/
420 static inline void
421 FlushMonitorReadEvents(mtcp_manager_t mtcp)
422 {
423 	struct event_queue *mtcpq;
424 	struct tcp_stream *cur_stream;
425 	struct mon_listener *walk;
426 
427 	/* check if monitor sockets should be passed data */
428 	TAILQ_FOREACH(walk, &mtcp->monitors, link) {
429 		if (walk->socket->socktype != MOS_SOCK_MONITOR_STREAM ||
430 			!(mtcpq = walk->eq))
431 			continue;
432 
433 		while (mtcpq->num_events > 0) {
434 			cur_stream =
435 				(struct tcp_stream *)mtcpq->events[mtcpq->start++].ev.data.ptr;
436 			/* only read events */
437 			if (cur_stream != NULL &&
438 			    (cur_stream->actions & MOS_ACT_READ_DATA)) {
439 				if (cur_stream->rcvvar != NULL &&
440 						cur_stream->rcvvar->rcvbuf != NULL) {
441 					/* no need to pass pkt context */
442 					struct socket_map *walk;
443 					if (cur_stream->side == MOS_SIDE_CLI) {
444 						SOCKQ_FOREACH_REVERSE(walk, &cur_stream->msocks) {
445 							HandleCallback(mtcp, MOS_NULL, walk,
446 								       cur_stream->side, NULL,
447 								       MOS_ON_CONN_NEW_DATA);
448 						} SOCKQ_FOREACH_END;
449 					} else { /* cur_stream->side == MOS_SIDE_SVR */
450 						SOCKQ_FOREACH_START(walk, &cur_stream->msocks) {
451 							HandleCallback(mtcp, MOS_NULL, walk,
452 								       cur_stream->side, NULL,
453 								       MOS_ON_CONN_NEW_DATA);
454 						} SOCKQ_FOREACH_END;
455 					}
456 				}
457 				/* reset the actions now */
458 				cur_stream->actions = 0;
459 			}
460 			if (mtcpq->start >= mtcpq->size)
461 				mtcpq->start = 0;
462 			mtcpq->num_events--;
463 		}
464 	}
465 }
466 /*----------------------------------------------------------------------------*/
467 static inline void
468 FlushBufferedReadEvents(mtcp_manager_t mtcp)
469 {
470 	int i;
471 	int offset;
472 	struct event_queue *mtcpq;
473 	struct tcp_stream *cur_stream;
474 
475 	if (mtcp->ep == NULL) {
476 		TRACE_EPOLL("No epoll socket has been registered yet!\n");
477 		return;
478 	} else {
479 		/* case when mtcpq exists */
480 		mtcpq = mtcp->ep->mtcp_queue;
481 		offset = mtcpq->start;
482 	}
483 
484 	/* we will use queued-up epoll read-in events
485 	 * to trigger buffered read monitor events */
486 	for (i = 0; i < mtcpq->num_events; i++) {
487 		cur_stream = mtcp->smap[mtcpq->events[offset++].sockid].stream;
488 		/* only read events */
489 		/* Raise new data callback event */
490 		if (cur_stream != NULL &&
491 		    	(cur_stream->socket->events | MOS_EPOLLIN)) {
492 			if (cur_stream->rcvvar != NULL &&
493 					cur_stream->rcvvar->rcvbuf != NULL) {
494 				/* no need to pass pkt context */
495 				struct socket_map *walk;
496 				if (cur_stream->side == MOS_SIDE_CLI) {
497 					SOCKQ_FOREACH_REVERSE(walk, &cur_stream->msocks) {
498 						HandleCallback(mtcp, MOS_NULL, walk, cur_stream->side,
499 							       NULL, MOS_ON_CONN_NEW_DATA);
500 					} SOCKQ_FOREACH_END;
501 				} else { /* cur_stream->side == MOS_SIDE_SVR */
502 					SOCKQ_FOREACH_START(walk, &cur_stream->msocks) {
503 						HandleCallback(mtcp, MOS_NULL, walk, cur_stream->side,
504 							       NULL, MOS_ON_CONN_NEW_DATA);
505 					} SOCKQ_FOREACH_END;
506 				}
507 			}
508 		}
509 		if (offset >= mtcpq->size)
510 			offset = 0;
511 	}
512 }
513 /*----------------------------------------------------------------------------*/
514 static inline void
515 FlushEpollEvents(mtcp_manager_t mtcp, uint32_t cur_ts)
516 {
517 	struct mtcp_epoll *ep = mtcp->ep;
518 	struct event_queue *usrq = ep->usr_queue;
519 	struct event_queue *mtcpq = ep->mtcp_queue;
520 
521 	pthread_mutex_lock(&ep->epoll_lock);
522 	if (ep->mtcp_queue->num_events > 0) {
523 		/* while mtcp_queue have events */
524 		/* and usr_queue is not full */
525 		while (mtcpq->num_events > 0 && usrq->num_events < usrq->size) {
526 			/* copy the event from mtcp_queue to usr_queue */
527 			usrq->events[usrq->end++] = mtcpq->events[mtcpq->start++];
528 
529 			if (usrq->end >= usrq->size)
530 				usrq->end = 0;
531 			usrq->num_events++;
532 
533 			if (mtcpq->start >= mtcpq->size)
534 				mtcpq->start = 0;
535 			mtcpq->num_events--;
536 		}
537 	}
538 
539 	/* if there are pending events, wake up user */
540 	if (ep->waiting && (ep->usr_queue->num_events > 0 ||
541 				ep->usr_shadow_queue->num_events > 0)) {
542 		STAT_COUNT(mtcp->runstat.rounds_epoll);
543 		TRACE_EPOLL("Broadcasting events. num: %d, cur_ts: %u, prev_ts: %u\n",
544 				ep->usr_queue->num_events, cur_ts, mtcp->ts_last_event);
545 		mtcp->ts_last_event = cur_ts;
546 		ep->stat.wakes++;
547 		pthread_cond_signal(&ep->epoll_cond);
548 	}
549 	pthread_mutex_unlock(&ep->epoll_lock);
550 }
551 /*----------------------------------------------------------------------------*/
552 static inline void
553 HandleApplicationCalls(mtcp_manager_t mtcp, uint32_t cur_ts)
554 {
555 	tcp_stream *stream;
556 	int cnt, max_cnt;
557 	int handled, delayed;
558 	int control, send, ack;
559 
560 	/* connect handling */
561 	while ((stream = StreamDequeue(mtcp->connectq))) {
562 		if (stream->state != TCP_ST_SYN_SENT) {
563 			TRACE_INFO("Got a connection request from app with state: %s",
564 				   TCPStateToString(stream));
565 			exit(EXIT_FAILURE);
566 		} else {
567 			stream->cb_events |= MOS_ON_CONN_START |
568 				MOS_ON_TCP_STATE_CHANGE;
569 			/* if monitor is on... */
570 			if (stream->pair_stream != NULL)
571 				stream->pair_stream->cb_events |=
572 					MOS_ON_CONN_START;
573 		}
574 		AddtoControlList(mtcp, stream, cur_ts);
575 	}
576 
577 	/* send queue handling */
578 	while ((stream = StreamDequeue(mtcp->sendq))) {
579 		stream->sndvar->on_sendq = FALSE;
580 		AddtoSendList(mtcp, stream);
581 	}
582 
583 	/* ack queue handling */
584 	while ((stream = StreamDequeue(mtcp->ackq))) {
585 		stream->sndvar->on_ackq = FALSE;
586 		EnqueueACK(mtcp, stream, cur_ts, ACK_OPT_AGGREGATE);
587 	}
588 
589 	/* close handling */
590 	handled = delayed = 0;
591 	control = send = ack = 0;
592 	while ((stream = StreamDequeue(mtcp->closeq))) {
593 		struct tcp_send_vars *sndvar = stream->sndvar;
594 		sndvar->on_closeq = FALSE;
595 
596 		if (sndvar->sndbuf) {
597 			sndvar->fss = sndvar->sndbuf->head_seq + sndvar->sndbuf->len;
598 		} else {
599 			sndvar->fss = stream->snd_nxt;
600 		}
601 
602 		if (g_config.mos->tcp_timeout > 0)
603 			RemoveFromTimeoutList(mtcp, stream);
604 
605 		if (stream->have_reset) {
606 			handled++;
607 			if (stream->state != TCP_ST_CLOSED_RSVD) {
608 				stream->close_reason = TCP_RESET;
609 				stream->state = TCP_ST_CLOSED_RSVD;
610 				stream->cb_events |= MOS_ON_TCP_STATE_CHANGE;
611 				TRACE_STATE("Stream %d: TCP_ST_CLOSED_RSVD\n", stream->id);
612 				DestroyTCPStream(mtcp, stream);
613 			} else {
614 				TRACE_ERROR("Stream already closed.\n");
615 			}
616 
617 		} else if (sndvar->on_control_list) {
618 			sndvar->on_closeq_int = TRUE;
619 			StreamInternalEnqueue(mtcp->closeq_int, stream);
620 			delayed++;
621 			if (sndvar->on_control_list)
622 				control++;
623 			if (sndvar->on_send_list)
624 				send++;
625 			if (sndvar->on_ack_list)
626 				ack++;
627 
628 		} else if (sndvar->on_send_list || sndvar->on_ack_list) {
629 			handled++;
630 			if (stream->state == TCP_ST_ESTABLISHED) {
631 				stream->state = TCP_ST_FIN_WAIT_1;
632 				stream->cb_events |= MOS_ON_TCP_STATE_CHANGE;
633 				TRACE_STATE("Stream %d: TCP_ST_FIN_WAIT_1\n", stream->id);
634 
635 			} else if (stream->state == TCP_ST_CLOSE_WAIT) {
636 				stream->state = TCP_ST_LAST_ACK;
637 				stream->cb_events |= MOS_ON_TCP_STATE_CHANGE;
638 				TRACE_STATE("Stream %d: TCP_ST_LAST_ACK\n", stream->id);
639 			}
640 			stream->control_list_waiting = TRUE;
641 
642 		} else if (stream->state != TCP_ST_CLOSED_RSVD) {
643 			handled++;
644 			if (stream->state == TCP_ST_ESTABLISHED) {
645 				stream->state = TCP_ST_FIN_WAIT_1;
646 				stream->cb_events |= MOS_ON_TCP_STATE_CHANGE;
647 				TRACE_STATE("Stream %d: TCP_ST_FIN_WAIT_1\n", stream->id);
648 
649 			} else if (stream->state == TCP_ST_CLOSE_WAIT) {
650 				stream->state = TCP_ST_LAST_ACK;
651 				stream->cb_events |= MOS_ON_TCP_STATE_CHANGE;
652 				TRACE_STATE("Stream %d: TCP_ST_LAST_ACK\n", stream->id);
653 			}
654 			//sndvar->rto = TCP_FIN_RTO;
655 			//UpdateRetransmissionTimer(mtcp, stream, mtcp->cur_ts);
656 			AddtoControlList(mtcp, stream, cur_ts);
657 		} else {
658 			TRACE_ERROR("Already closed connection!\n");
659 		}
660 	}
661 	TRACE_ROUND("Handling close connections. cnt: %d\n", cnt);
662 
663 	cnt = 0;
664 	max_cnt = mtcp->closeq_int->count;
665 	while (cnt++ < max_cnt) {
666 		stream = StreamInternalDequeue(mtcp->closeq_int);
667 
668 		if (stream->sndvar->on_control_list) {
669 			StreamInternalEnqueue(mtcp->closeq_int, stream);
670 
671 		} else if (stream->state != TCP_ST_CLOSED_RSVD) {
672 			handled++;
673 			stream->sndvar->on_closeq_int = FALSE;
674 			if (stream->state == TCP_ST_ESTABLISHED) {
675 				stream->state = TCP_ST_FIN_WAIT_1;
676 				stream->cb_events |= MOS_ON_TCP_STATE_CHANGE;
677 				TRACE_STATE("Stream %d: TCP_ST_FIN_WAIT_1\n", stream->id);
678 
679 			} else if (stream->state == TCP_ST_CLOSE_WAIT) {
680 				stream->state = TCP_ST_LAST_ACK;
681 				stream->cb_events |= MOS_ON_TCP_STATE_CHANGE;
682 				TRACE_STATE("Stream %d: TCP_ST_LAST_ACK\n", stream->id);
683 			}
684 			AddtoControlList(mtcp, stream, cur_ts);
685 		} else {
686 			stream->sndvar->on_closeq_int = FALSE;
687 			TRACE_ERROR("Already closed connection!\n");
688 		}
689 	}
690 
691 	/* reset handling */
692 	while ((stream = StreamDequeue(mtcp->resetq))) {
693 		stream->sndvar->on_resetq = FALSE;
694 
695 		if (g_config.mos->tcp_timeout > 0)
696 			RemoveFromTimeoutList(mtcp, stream);
697 
698 		if (stream->have_reset) {
699 			if (stream->state != TCP_ST_CLOSED_RSVD) {
700 				stream->close_reason = TCP_RESET;
701 				stream->state = TCP_ST_CLOSED_RSVD;
702 				stream->cb_events |= MOS_ON_TCP_STATE_CHANGE;
703 				TRACE_STATE("Stream %d: TCP_ST_CLOSED_RSVD\n", stream->id);
704 				DestroyTCPStream(mtcp, stream);
705 			} else {
706 				TRACE_ERROR("Stream already closed.\n");
707 			}
708 
709 		} else if (stream->sndvar->on_control_list ||
710 				stream->sndvar->on_send_list || stream->sndvar->on_ack_list) {
711 			/* wait until all the queues are flushed */
712 			stream->sndvar->on_resetq_int = TRUE;
713 			StreamInternalEnqueue(mtcp->resetq_int, stream);
714 
715 		} else {
716 			if (stream->state != TCP_ST_CLOSED_RSVD) {
717 				stream->close_reason = TCP_ACTIVE_CLOSE;
718 				stream->state = TCP_ST_CLOSED_RSVD;
719 				stream->cb_events |= MOS_ON_TCP_STATE_CHANGE;
720 				TRACE_STATE("Stream %d: TCP_ST_CLOSED_RSVD\n", stream->id);
721 				AddtoControlList(mtcp, stream, cur_ts);
722 			} else {
723 				TRACE_ERROR("Stream already closed.\n");
724 			}
725 		}
726 	}
727 	TRACE_ROUND("Handling reset connections. cnt: %d\n", cnt);
728 
729 	cnt = 0;
730 	max_cnt = mtcp->resetq_int->count;
731 	while (cnt++ < max_cnt) {
732 		stream = StreamInternalDequeue(mtcp->resetq_int);
733 
734 		if (stream->sndvar->on_control_list ||
735 				stream->sndvar->on_send_list || stream->sndvar->on_ack_list) {
736 			/* wait until all the queues are flushed */
737 			StreamInternalEnqueue(mtcp->resetq_int, stream);
738 
739 		} else {
740 			stream->sndvar->on_resetq_int = FALSE;
741 
742 			if (stream->state != TCP_ST_CLOSED_RSVD) {
743 				stream->close_reason = TCP_ACTIVE_CLOSE;
744 				stream->state = TCP_ST_CLOSED_RSVD;
745 				stream->cb_events |= MOS_ON_TCP_STATE_CHANGE;
746 				TRACE_STATE("Stream %d: TCP_ST_CLOSED_RSVD\n", stream->id);
747 				AddtoControlList(mtcp, stream, cur_ts);
748 			} else {
749 				TRACE_ERROR("Stream already closed.\n");
750 			}
751 		}
752 	}
753 
754 	/* destroy streams in destroyq */
755 	while ((stream = StreamDequeue(mtcp->destroyq))) {
756 		DestroyTCPStream(mtcp, stream);
757 	}
758 
759 	mtcp->wakeup_flag = FALSE;
760 }
761 /*----------------------------------------------------------------------------*/
/*
 * Flush all queued outgoing TCP packets (control, ACK, then data) to the
 * I/O layer: first through the global sender, then through each network
 * interface's dedicated sender. Called once per main-loop round.
 *
 * cur_ts - current timestamp forwarded to the Write*List schedulers
 */
static inline void
WritePacketsToChunks(mtcp_manager_t mtcp, uint32_t cur_ts)
{
	/* cap on how many streams each Write*List pass may service per call */
	int thresh = g_config.mos->max_concurrency;
	int i;

	/* Set the threshold to g_config.mos->max_concurrency to send ACK immediately */
	/* Otherwise, set to appropriate value (e.g. thresh) */
	assert(mtcp->g_sender != NULL);
	if (mtcp->g_sender->control_list_cnt)
		WriteTCPControlList(mtcp, mtcp->g_sender, cur_ts, thresh);
	if (mtcp->g_sender->ack_list_cnt)
		WriteTCPACKList(mtcp, mtcp->g_sender, cur_ts, thresh);
	if (mtcp->g_sender->send_list_cnt)
		WriteTCPDataList(mtcp, mtcp->g_sender, cur_ts, thresh);

	/* per-interface senders, in device-table order */
	for (i = 0; i < g_config.mos->netdev_table->num; i++) {
		assert(mtcp->n_sender[i] != NULL);
		if (mtcp->n_sender[i]->control_list_cnt)
			WriteTCPControlList(mtcp, mtcp->n_sender[i], cur_ts, thresh);
		if (mtcp->n_sender[i]->ack_list_cnt)
			WriteTCPACKList(mtcp, mtcp->n_sender[i], cur_ts, thresh);
		if (mtcp->n_sender[i]->send_list_cnt)
			WriteTCPDataList(mtcp, mtcp->n_sender[i], cur_ts, thresh);
	}
}
788 /*----------------------------------------------------------------------------*/
789 #if TESTING
790 static int
791 DestroyRemainingFlows(mtcp_manager_t mtcp)
792 {
793 	struct hashtable *ht = mtcp->tcp_flow_table;
794 	tcp_stream *walk;
795 	int cnt, i;
796 
797 	cnt = 0;
798 
799 	thread_printf(mtcp, mtcp->log_fp,
800 			"CPU %d: Flushing remaining flows.\n", mtcp->ctx->cpu);
801 
802 	for (i = 0; i < NUM_BINS; i++) {
803 		TAILQ_FOREACH(walk, &ht->ht_table[i], rcvvar->he_link) {
804 			thread_printf(mtcp, mtcp->log_fp,
805 					"CPU %d: Destroying stream %d\n", mtcp->ctx->cpu, walk->id);
806 #ifdef DUMP_STREAM
807 			DumpStream(mtcp, walk);
808 #endif
809 			DestroyTCPStream(mtcp, walk);
810 			cnt++;
811 		}
812 	}
813 
814 	return cnt;
815 }
816 #endif
817 /*----------------------------------------------------------------------------*/
818 static void
819 InterruptApplication(mtcp_manager_t mtcp)
820 {
821 	/* interrupt if the mtcp_epoll_wait() is waiting */
822 	if (mtcp->ep) {
823 		pthread_mutex_lock(&mtcp->ep->epoll_lock);
824 		if (mtcp->ep->waiting) {
825 			pthread_cond_signal(&mtcp->ep->epoll_cond);
826 		}
827 		pthread_mutex_unlock(&mtcp->ep->epoll_lock);
828 	}
829 	/* interrupt if the accept() is waiting */
830 	if (mtcp->listener) {
831 		if (mtcp->listener->socket) {
832 			pthread_mutex_lock(&mtcp->listener->accept_lock);
833 			if (!(mtcp->listener->socket->opts & MTCP_NONBLOCK)) {
834 				pthread_cond_signal(&mtcp->listener->accept_cond);
835 			}
836 			pthread_mutex_unlock(&mtcp->listener->accept_lock);
837 		}
838 	}
839 }
840 /*----------------------------------------------------------------------------*/
841 void
842 RunPassiveLoop(mtcp_manager_t mtcp)
843 {
844 	sem_wait(&g_done_sem[mtcp->ctx->cpu]);
845 	sem_destroy(&g_done_sem[mtcp->ctx->cpu]);
846 	return;
847 }
848 /*----------------------------------------------------------------------------*/
849 static void
850 RunMainLoop(struct mtcp_thread_context *ctx)
851 {
852 	mtcp_manager_t mtcp = ctx->mtcp_manager;
853 	int i;
854 	int recv_cnt;
855 	int rx_inf, tx_inf;
856 	struct timeval cur_ts = {0};
857 	uint32_t ts, ts_prev;
858 
859 #if TIME_STAT
860 	struct timeval prev_ts, processing_ts, tcheck_ts,
861 				   epoll_ts, handle_ts, xmit_ts, select_ts;
862 #endif
863 	int thresh;
864 
865 	gettimeofday(&cur_ts, NULL);
866 
867 	TRACE_DBG("CPU %d: mtcp thread running.\n", ctx->cpu);
868 
869 #if TIME_STAT
870 	prev_ts = cur_ts;
871 	InitStatCounter(&mtcp->rtstat.round);
872 	InitStatCounter(&mtcp->rtstat.processing);
873 	InitStatCounter(&mtcp->rtstat.tcheck);
874 	InitStatCounter(&mtcp->rtstat.epoll);
875 	InitStatCounter(&mtcp->rtstat.handle);
876 	InitStatCounter(&mtcp->rtstat.xmit);
877 	InitStatCounter(&mtcp->rtstat.select);
878 #endif
879 
880 	ts = ts_prev = 0;
881 	while ((!ctx->done || mtcp->flow_cnt) && !ctx->exit) {
882 
883 		STAT_COUNT(mtcp->runstat.rounds);
884 		recv_cnt = 0;
885 		gettimeofday(&cur_ts, NULL);
886 #if TIME_STAT
887 		/* measure the inter-round delay */
888 		UpdateStatCounter(&mtcp->rtstat.round, TimeDiffUs(&cur_ts, &prev_ts));
889 		prev_ts = cur_ts;
890 #endif
891 
892 		ts = TIMEVAL_TO_TS(&cur_ts);
893 		mtcp->cur_ts = ts;
894 
895 		for (rx_inf = 0; rx_inf < g_config.mos->netdev_table->num; rx_inf++) {
896 
897 			recv_cnt = mtcp->iom->recv_pkts(ctx, rx_inf);
898 			STAT_COUNT(mtcp->runstat.rounds_rx_try);
899 
900 			for (i = 0; i < recv_cnt; i++) {
901 				uint16_t len;
902 				uint8_t *pktbuf;
903 				pktbuf = mtcp->iom->get_rptr(mtcp->ctx, rx_inf, i, &len);
904 				ProcessPacket(mtcp, rx_inf, i, ts, pktbuf, len);
905 			}
906 
907 		}
908 		STAT_COUNT(mtcp->runstat.rounds_rx);
909 
910 #if TIME_STAT
911 		gettimeofday(&processing_ts, NULL);
912 		UpdateStatCounter(&mtcp->rtstat.processing,
913 				TimeDiffUs(&processing_ts, &cur_ts));
914 #endif /* TIME_STAT */
915 
916 		/* Handle user defined timeout */
917 		struct timer *walk, *tmp;
918 		for (walk = TAILQ_FIRST(&mtcp->timer_list); walk != NULL; walk = tmp) {
919 			tmp = TAILQ_NEXT(walk, timer_link);
920 			if (TIMEVAL_LT(&cur_ts, &walk->exp))
921 				break;
922 
923 			struct mtcp_context mctx = {.cpu = ctx->cpu};
924 			walk->cb(&mctx, walk->id, 0, 0 /* FIXME */, NULL);
925 			DelTimer(mtcp, walk);
926 		}
927 
928 		/* interaction with application */
929 		if (mtcp->flow_cnt > 0) {
930 
931 			/* check retransmission timeout and timewait expire */
932 #if 0
933 			thresh = (int)mtcp->flow_cnt / (TS_TO_USEC(PER_STREAM_TCHECK));
934 			assert(thresh >= 0);
935 			if (thresh == 0)
936 				thresh = 1;
937 			if (recv_cnt > 0 && thresh > recv_cnt)
938 				thresh = recv_cnt;
939 #else
940 			thresh = g_config.mos->max_concurrency;
941 #endif
942 
943 			/* Eunyoung, you may fix this later
944 			 * if there is no rcv packet, we will send as much as possible
945 			 */
946 			if (thresh == -1)
947 				thresh = g_config.mos->max_concurrency;
948 
949 			CheckRtmTimeout(mtcp, ts, thresh);
950 			CheckTimewaitExpire(mtcp, ts, thresh);
951 
952 			if (g_config.mos->tcp_timeout > 0 && ts != ts_prev) {
953 				CheckConnectionTimeout(mtcp, ts, thresh);
954 			}
955 
956 #if TIME_STAT
957 		}
958 		gettimeofday(&tcheck_ts, NULL);
959 		UpdateStatCounter(&mtcp->rtstat.tcheck,
960 				TimeDiffUs(&tcheck_ts, &processing_ts));
961 
962 		if (mtcp->flow_cnt > 0) {
963 #endif /* TIME_STAT */
964 
965 		}
966 
967 		/*
968 		 * before flushing epoll events, call monitor events for
969 		 * all registered `read` events
970 		 */
971 		if (mtcp->num_msp > 0)
972 			/* call this when only a standalone monitor is running */
973 			FlushMonitorReadEvents(mtcp);
974 
975 		/* if epoll is in use, flush all the queued events */
976 		if (mtcp->ep) {
977 			FlushBufferedReadEvents(mtcp);
978 			FlushEpollEvents(mtcp, ts);
979 		}
980 #if TIME_STAT
981 		gettimeofday(&epoll_ts, NULL);
982 		UpdateStatCounter(&mtcp->rtstat.epoll,
983 				TimeDiffUs(&epoll_ts, &tcheck_ts));
984 #endif /* TIME_STAT */
985 
986 		if (end_app_exists && mtcp->flow_cnt > 0) {
987 			/* handle stream queues  */
988 			HandleApplicationCalls(mtcp, ts);
989 		}
990 
991 #if TIME_STAT
992 		gettimeofday(&handle_ts, NULL);
993 		UpdateStatCounter(&mtcp->rtstat.handle,
994 				TimeDiffUs(&handle_ts, &epoll_ts));
995 #endif /* TIME_STAT */
996 
997 		WritePacketsToChunks(mtcp, ts);
998 
999 		/* send packets from write buffer */
1000 		/* Send until tx is available */
1001 		int num_dev = g_config.mos->netdev_table->num;
1002 		if (likely(mtcp->iom->send_pkts != NULL))
1003 			for (tx_inf = 0; tx_inf < num_dev; tx_inf++) {
1004 				mtcp->iom->send_pkts(ctx, tx_inf);
1005 			}
1006 
1007 #if TIME_STAT
1008 		gettimeofday(&xmit_ts, NULL);
1009 		UpdateStatCounter(&mtcp->rtstat.xmit,
1010 				TimeDiffUs(&xmit_ts, &handle_ts));
1011 #endif /* TIME_STAT */
1012 
1013 		if (ts != ts_prev) {
1014 			ts_prev = ts;
1015 #ifdef NETSTAT
1016 			if (ctx->cpu == printer) {
1017 #ifdef RUN_ARP
1018 				ARPTimer(mtcp, ts);
1019 #endif
1020 #ifdef NETSTAT
1021 				PrintNetworkStats(mtcp, ts);
1022 #endif
1023 			}
1024 #endif /* NETSTAT */
1025 		}
1026 
1027 		if (mtcp->iom->select)
1028 			mtcp->iom->select(ctx);
1029 
1030 		if (ctx->interrupt) {
1031 			InterruptApplication(mtcp);
1032 		}
1033 	}
1034 
1035 #if TESTING
1036 	DestroyRemainingFlows(mtcp);
1037 #endif
1038 
1039 	TRACE_DBG("MTCP thread %d out of main loop.\n", ctx->cpu);
1040 	/* flush logs */
1041 	flush_log_data(mtcp);
1042 	TRACE_DBG("MTCP thread %d flushed logs.\n", ctx->cpu);
1043 	InterruptApplication(mtcp);
1044 	TRACE_INFO("MTCP thread %d finished.\n", ctx->cpu);
1045 }
1046 /*----------------------------------------------------------------------------*/
1047 struct mtcp_sender *
1048 CreateMTCPSender(int ifidx)
1049 {
1050 	struct mtcp_sender *sender;
1051 
1052 	sender = (struct mtcp_sender *)calloc(1, sizeof(struct mtcp_sender));
1053 	if (!sender) {
1054 		return NULL;
1055 	}
1056 
1057 	sender->ifidx = ifidx;
1058 
1059 	TAILQ_INIT(&sender->control_list);
1060 	TAILQ_INIT(&sender->send_list);
1061 	TAILQ_INIT(&sender->ack_list);
1062 
1063 	sender->control_list_cnt = 0;
1064 	sender->send_list_cnt = 0;
1065 	sender->ack_list_cnt = 0;
1066 
1067 	return sender;
1068 }
1069 /*----------------------------------------------------------------------------*/
void
DestroyMTCPSender(struct mtcp_sender *sender)
{
	/* Release a sender created by CreateMTCPSender().
	 * free(NULL) is a no-op, so a NULL argument is safe. */
	free(sender);
}
1075 /*----------------------------------------------------------------------------*/
1076 static mtcp_manager_t
1077 InitializeMTCPManager(struct mtcp_thread_context* ctx)
1078 {
1079 	mtcp_manager_t mtcp;
1080 	char log_name[MAX_FILE_NAME];
1081 	int i;
1082 
1083 	posix_seq_srand((unsigned)pthread_self());
1084 
1085 	mtcp = (mtcp_manager_t)calloc(1, sizeof(struct mtcp_manager));
1086 	if (!mtcp) {
1087 		perror("malloc");
1088 		TRACE_ERROR("Failed to allocate mtcp_manager.\n");
1089 		return NULL;
1090 	}
1091 	g_mtcp[ctx->cpu] = mtcp;
1092 
1093 	mtcp->tcp_flow_table = CreateHashtable();
1094 	if (!mtcp->tcp_flow_table) {
1095 		CTRACE_ERROR("Falied to allocate tcp flow table.\n");
1096 		return NULL;
1097 	}
1098 
1099 #ifdef HUGEPAGE
1100 #define	IS_HUGEPAGE 1
1101 #else
1102 #define	IS_HUGEPAGE 0
1103 #endif
1104 	if (mon_app_exists) {
1105 		/* initialize event callback */
1106 #ifdef NEWEV
1107 		InitEvent(mtcp);
1108 #else
1109 		InitEvent(mtcp, NUM_EV_TABLE);
1110 #endif
1111 	}
1112 
1113 	if (!(mtcp->bufseg_pool = MPCreate(sizeof(tcpbufseg_t),
1114 			sizeof(tcpbufseg_t) * g_config.mos->max_concurrency *
1115 			((g_config.mos->rmem_size - 1) / UNITBUFSIZE + 1), 0))) {
1116 		TRACE_ERROR("Failed to allocate ev_table pool\n");
1117 		exit(0);
1118 	}
1119 	if (!(mtcp->sockent_pool = MPCreate(sizeof(struct sockent),
1120 			sizeof(struct sockent) * g_config.mos->max_concurrency * 3, 0))) {
1121 		TRACE_ERROR("Failed to allocate ev_table pool\n");
1122 		exit(0);
1123 	}
1124 #ifdef USE_TIMER_POOL
1125 	if (!(mtcp->timer_pool = MPCreate(sizeof(struct timer),
1126 					  sizeof(struct timer) * g_config.mos->max_concurrency * 10, 0))) {
1127 		TRACE_ERROR("Failed to allocate ev_table pool\n");
1128 		exit(0);
1129 	}
1130 #endif
1131 	mtcp->flow_pool = MPCreate(sizeof(tcp_stream),
1132 								sizeof(tcp_stream) * g_config.mos->max_concurrency, IS_HUGEPAGE);
1133 	if (!mtcp->flow_pool) {
1134 		CTRACE_ERROR("Failed to allocate tcp flow pool.\n");
1135 		return NULL;
1136 	}
1137 	mtcp->rv_pool = MPCreate(sizeof(struct tcp_recv_vars),
1138 			sizeof(struct tcp_recv_vars) * g_config.mos->max_concurrency, IS_HUGEPAGE);
1139 	if (!mtcp->rv_pool) {
1140 		CTRACE_ERROR("Failed to allocate tcp recv variable pool.\n");
1141 		return NULL;
1142 	}
1143 	mtcp->sv_pool = MPCreate(sizeof(struct tcp_send_vars),
1144 			sizeof(struct tcp_send_vars) * g_config.mos->max_concurrency, IS_HUGEPAGE);
1145 	if (!mtcp->sv_pool) {
1146 		CTRACE_ERROR("Failed to allocate tcp send variable pool.\n");
1147 		return NULL;
1148 	}
1149 
1150 	mtcp->rbm_snd = SBManagerCreate(g_config.mos->wmem_size, g_config.mos->no_ring_buffers,
1151 					g_config.mos->max_concurrency);
1152 	if (!mtcp->rbm_snd) {
1153 		CTRACE_ERROR("Failed to create send ring buffer.\n");
1154 		return NULL;
1155 	}
1156 
1157 	mtcp->smap = (socket_map_t)calloc(g_config.mos->max_concurrency, sizeof(struct socket_map));
1158 	if (!mtcp->smap) {
1159 		perror("calloc");
1160 		CTRACE_ERROR("Failed to allocate memory for stream map.\n");
1161 		return NULL;
1162 	}
1163 
1164 	if (mon_app_exists) {
1165 		mtcp->msmap = (socket_map_t)calloc(g_config.mos->max_concurrency, sizeof(struct socket_map));
1166 		if (!mtcp->msmap) {
1167 			perror("calloc");
1168 			CTRACE_ERROR("Failed to allocate memory for monitor stream map.\n");
1169 			return NULL;
1170 		}
1171 
1172 		for (i = 0; i < g_config.mos->max_concurrency; i++) {
1173 			mtcp->msmap[i].monitor_stream = calloc(1, sizeof(struct mon_stream));
1174 			if (!mtcp->msmap[i].monitor_stream) {
1175 				perror("calloc");
1176 				CTRACE_ERROR("Failed to allocate memory for monitr stream map\n");
1177 				return NULL;
1178 			}
1179 		}
1180 	}
1181 
1182 	TAILQ_INIT(&mtcp->timer_list);
1183 	TAILQ_INIT(&mtcp->monitors);
1184 
1185 	TAILQ_INIT(&mtcp->free_smap);
1186 	for (i = 0; i < g_config.mos->max_concurrency; i++) {
1187 		mtcp->smap[i].id = i;
1188 		mtcp->smap[i].socktype = MOS_SOCK_UNUSED;
1189 		memset(&mtcp->smap[i].saddr, 0, sizeof(struct sockaddr_in));
1190 		mtcp->smap[i].stream = NULL;
1191 		TAILQ_INSERT_TAIL(&mtcp->free_smap, &mtcp->smap[i], link);
1192 	}
1193 
1194 	if (mon_app_exists) {
1195 		TAILQ_INIT(&mtcp->free_msmap);
1196 		for (i = 0; i < g_config.mos->max_concurrency; i++) {
1197 			mtcp->msmap[i].id = i;
1198 			mtcp->msmap[i].socktype = MOS_SOCK_UNUSED;
1199 			memset(&mtcp->msmap[i].saddr, 0, sizeof(struct sockaddr_in));
1200 			TAILQ_INSERT_TAIL(&mtcp->free_msmap, &mtcp->msmap[i], link);
1201 		}
1202 	}
1203 
1204 	mtcp->ctx = ctx;
1205 	mtcp->ep = NULL;
1206 
1207 	snprintf(log_name, MAX_FILE_NAME, "%s/"LOG_FILE_NAME"_%d",
1208 			g_config.mos->mos_log, ctx->cpu);
1209 	mtcp->log_fp = fopen(log_name, "w+");
1210 	if (!mtcp->log_fp) {
1211 		perror("fopen");
1212 		CTRACE_ERROR("Failed to create file for logging. (%s)\n", log_name);
1213 		return NULL;
1214 	}
1215 	mtcp->sp_fd = g_logctx[ctx->cpu]->pair_sp_fd;
1216 	mtcp->logger = g_logctx[ctx->cpu];
1217 
1218 	mtcp->connectq = CreateStreamQueue(BACKLOG_SIZE);
1219 	if (!mtcp->connectq) {
1220 		CTRACE_ERROR("Failed to create connect queue.\n");
1221 		return NULL;
1222 	}
1223 	mtcp->sendq = CreateStreamQueue(g_config.mos->max_concurrency);
1224 	if (!mtcp->sendq) {
1225 		CTRACE_ERROR("Failed to create send queue.\n");
1226 		return NULL;
1227 	}
1228 	mtcp->ackq = CreateStreamQueue(g_config.mos->max_concurrency);
1229 	if (!mtcp->ackq) {
1230 		CTRACE_ERROR("Failed to create ack queue.\n");
1231 		return NULL;
1232 	}
1233 	mtcp->closeq = CreateStreamQueue(g_config.mos->max_concurrency);
1234 	if (!mtcp->closeq) {
1235 		CTRACE_ERROR("Failed to create close queue.\n");
1236 		return NULL;
1237 	}
1238 	mtcp->closeq_int = CreateInternalStreamQueue(g_config.mos->max_concurrency);
1239 	if (!mtcp->closeq_int) {
1240 		CTRACE_ERROR("Failed to create close queue.\n");
1241 		return NULL;
1242 	}
1243 	mtcp->resetq = CreateStreamQueue(g_config.mos->max_concurrency);
1244 	if (!mtcp->resetq) {
1245 		CTRACE_ERROR("Failed to create reset queue.\n");
1246 		return NULL;
1247 	}
1248 	mtcp->resetq_int = CreateInternalStreamQueue(g_config.mos->max_concurrency);
1249 	if (!mtcp->resetq_int) {
1250 		CTRACE_ERROR("Failed to create reset queue.\n");
1251 		return NULL;
1252 	}
1253 	mtcp->destroyq = CreateStreamQueue(g_config.mos->max_concurrency);
1254 	if (!mtcp->destroyq) {
1255 		CTRACE_ERROR("Failed to create destroy queue.\n");
1256 		return NULL;
1257 	}
1258 
1259 	mtcp->g_sender = CreateMTCPSender(-1);
1260 	if (!mtcp->g_sender) {
1261 		CTRACE_ERROR("Failed to create global sender structure.\n");
1262 		return NULL;
1263 	}
1264 	for (i = 0; i < g_config.mos->netdev_table->num; i++) {
1265 		mtcp->n_sender[i] = CreateMTCPSender(i);
1266 		if (!mtcp->n_sender[i]) {
1267 			CTRACE_ERROR("Failed to create per-nic sender structure.\n");
1268 			return NULL;
1269 		}
1270 	}
1271 
1272 	mtcp->rto_store = InitRTOHashstore();
1273 	TAILQ_INIT(&mtcp->timewait_list);
1274 	TAILQ_INIT(&mtcp->timeout_list);
1275 
1276 	return mtcp;
1277 }
1278 /*----------------------------------------------------------------------------*/
/* Per-core mTCP worker thread entry point (pthread signature).
 * arg is the mctx_t created by mtcp_create_context(). The thread pins
 * itself to mctx->cpu, allocates its thread context and mtcp manager,
 * initializes the I/O module and locks, attaches its NIC device/queue,
 * signals readiness through g_init_sem, then runs the main event loop.
 * Fatal setup errors terminate the whole process with exit(-1). */
static void *
MTCPRunThread(void *arg)
{
	mctx_t mctx = (mctx_t)arg;
	int cpu = mctx->cpu;
	int working;
	struct mtcp_manager *mtcp;
	struct mtcp_thread_context *ctx;

	/* affinitize the thread to this core first */
	mtcp_core_affinitize(cpu);

	/* memory alloc after core affinitization would use local memory
	   most time */
	ctx = calloc(1, sizeof(*ctx));
	if (!ctx) {
		perror("calloc");
		TRACE_ERROR("Failed to calloc mtcp context.\n");
		exit(-1);
	}
	ctx->thread = pthread_self();
	ctx->cpu = cpu;
	/* builds flow table, pools, queues, log file, senders (see
	 * InitializeMTCPManager); NULL is unrecoverable at this point */
	mtcp = ctx->mtcp_manager = InitializeMTCPManager(ctx);
	if (!mtcp) {
		TRACE_ERROR("Failed to initialize mtcp manager.\n");
		exit(-1);
	}

	/* assign mtcp context's underlying I/O module */
	mtcp->iom = current_iomodule_func;

	/* I/O initializing */
	if (mtcp->iom->init_handle)
		mtcp->iom->init_handle(ctx);

	if (pthread_mutex_init(&ctx->flow_pool_lock, NULL)) {
		perror("pthread_mutex_init of ctx->flow_pool_lock\n");
		exit(-1);
	}

	if (pthread_mutex_init(&ctx->socket_pool_lock, NULL)) {
		perror("pthread_mutex_init of ctx->socket_pool_lock\n");
		exit(-1);
	}

	/* queue locks shared with application threads; third arg is the
	 * action taken if initialization fails */
	SQ_LOCK_INIT(&ctx->connect_lock, "ctx->connect_lock", exit(-1));
	SQ_LOCK_INIT(&ctx->close_lock, "ctx->close_lock", exit(-1));
	SQ_LOCK_INIT(&ctx->reset_lock, "ctx->reset_lock", exit(-1));
	SQ_LOCK_INIT(&ctx->sendq_lock, "ctx->sendq_lock", exit(-1));
	SQ_LOCK_INIT(&ctx->ackq_lock, "ctx->ackq_lock", exit(-1));
	SQ_LOCK_INIT(&ctx->destroyq_lock, "ctx->destroyq_lock", exit(-1));

	/* remember this context pointer for signal processing */
	g_pctx[cpu] = ctx;
	mlockall(MCL_CURRENT);

	// attach (nic device, queue)
	working = AttachDevice(ctx);
	if (working != 0) {
		/* still post the init semaphore so mtcp_create_context()
		 * does not block forever waiting for this thread */
		sem_post(&g_init_sem[ctx->cpu]);
		TRACE_DBG("MTCP thread %d finished. Not attached any device\n", ctx->cpu);
		pthread_exit(NULL);
	}

	TRACE_DBG("CPU %d: initialization finished.\n", cpu);
	sem_post(&g_init_sem[ctx->cpu]);

	/* start the main loop */
	RunMainLoop(ctx);

	TRACE_DBG("MTCP thread %d finished.\n", ctx->cpu);

	/* signaling mTCP thread is done */
	sem_post(&g_done_sem[mctx->cpu]);

	//pthread_exit(NULL);
	return 0;
}
1357 /*----------------------------------------------------------------------------*/
#ifdef ENABLE_DPDK
/* DPDK lcore entry point: adapts MTCPRunThread's pthread signature
 * (void *(*)(void *)) to the int (*)(void *) signature expected by
 * rte_eal_remote_launch(). The worker's return value is discarded. */
static int MTCPDPDKRunThread(void *arg)
{
	(void)MTCPRunThread(arg);
	return 0;
}
#endif /* !ENABLE_DPDK */
1365 /*----------------------------------------------------------------------------*/
1366 mctx_t
1367 mtcp_create_context(int cpu)
1368 {
1369 	mctx_t mctx;
1370 	int ret;
1371 
1372 	if (cpu >=  g_config.mos->num_cores) {
1373 		TRACE_ERROR("Failed initialize new mtcp context. "
1374 					"Requested cpu id %d exceed the number of cores %d configured to use.\n",
1375 					cpu, g_config.mos->num_cores);
1376 		return NULL;
1377 	}
1378 
1379         /* check if mtcp_create_context() was already initialized */
1380         if (g_logctx[cpu] != NULL) {
1381                 TRACE_ERROR("%s was already initialized before!\n",
1382                             __FUNCTION__);
1383                 return NULL;
1384         }
1385 
1386 	ret = sem_init(&g_init_sem[cpu], 0, 0);
1387 	if (ret) {
1388 		TRACE_ERROR("Failed initialize init_sem.\n");
1389 		return NULL;
1390 	}
1391 
1392 	ret = sem_init(&g_done_sem[cpu], 0, 0);
1393 	if (ret) {
1394 		TRACE_ERROR("Failed initialize done_sem.\n");
1395 		return NULL;
1396 	}
1397 
1398 	mctx = (mctx_t)calloc(1, sizeof(struct mtcp_context));
1399 	if (!mctx) {
1400 		TRACE_ERROR("Failed to allocate memory for mtcp_context.\n");
1401 		return NULL;
1402 	}
1403 	mctx->cpu = cpu;
1404 	g_ctx[cpu] = mctx;
1405 
1406 	/* initialize logger */
1407 	g_logctx[cpu] = (struct log_thread_context *)
1408 			calloc(1, sizeof(struct log_thread_context));
1409 	if (!g_logctx[cpu]) {
1410 		perror("malloc");
1411 		TRACE_ERROR("Failed to allocate memory for log thread context.\n");
1412 		return NULL;
1413 	}
1414 	InitLogThreadContext(g_logctx[cpu], cpu);
1415 	if (pthread_create(&log_thread[cpu],
1416 			   NULL, ThreadLogMain, (void *)g_logctx[cpu])) {
1417 		perror("pthread_create");
1418 		TRACE_ERROR("Failed to create log thread\n");
1419 		return NULL;
1420 	}
1421 
1422 	/* use rte_eal_remote_launch() for DPDK
1423 	   (worker/slave threads are already initialized by rte_eal_init()) */
1424 #ifdef ENABLE_DPDK
1425 	/* Wake up mTCP threads (wake up I/O threads) */
1426 	if (current_iomodule_func == &dpdk_module_func) {
1427 		int master;
1428 		master = rte_get_master_lcore();
1429 		if (master == cpu) {
1430 			lcore_config[master].ret = 0;
1431 			lcore_config[master].state = FINISHED;
1432 			if (pthread_create(&g_thread[cpu],
1433 					   NULL, MTCPRunThread, (void *)mctx) != 0) {
1434 				TRACE_ERROR("pthread_create of mtcp thread failed!\n");
1435 				return NULL;
1436 			}
1437 		} else
1438 			rte_eal_remote_launch(MTCPDPDKRunThread, mctx, cpu);
1439 	} else
1440 #endif /* !ENABLE_DPDK */
1441 		{
1442 			if (pthread_create(&g_thread[cpu],
1443 					   NULL, MTCPRunThread, (void *)mctx) != 0) {
1444 				TRACE_ERROR("pthread_create of mtcp thread failed!\n");
1445 				return NULL;
1446 			}
1447 		}
1448 
1449 	sem_wait(&g_init_sem[cpu]);
1450 	sem_destroy(&g_init_sem[cpu]);
1451 
1452 	running[cpu] = TRUE;
1453 
1454 #ifdef NETSTAT
1455 #if NETSTAT_TOTAL
1456 	if (printer < 0) {
1457 		printer = cpu;
1458 		TRACE_INFO("CPU %d is in charge of printing stats.\n", printer);
1459 	}
1460 #endif
1461 #endif
1462 
1463 	return mctx;
1464 }
1465 /*----------------------------------------------------------------------------*/
1466 int
1467 mtcp_destroy_context(mctx_t mctx)
1468 {
1469 	struct mtcp_thread_context *ctx = g_pctx[mctx->cpu];
1470 	if (ctx != NULL)
1471 		ctx->done = 1;
1472 
1473 	struct mtcp_context m;
1474 	m.cpu = mctx->cpu;
1475 	mtcp_free_context(&m);
1476 
1477 	free(mctx);
1478 
1479 	return 0;
1480 }
1481 /*----------------------------------------------------------------------------*/
1482 void
1483 mtcp_free_context(mctx_t mctx)
1484 {
1485 	struct mtcp_thread_context *ctx = g_pctx[mctx->cpu];
1486 	struct mtcp_manager *mtcp = ctx->mtcp_manager;
1487 	struct log_thread_context *log_ctx = mtcp->logger;
1488 	int ret, i;
1489 
1490 	TRACE_DBG("CPU %d: mtcp_free_context()\n", mctx->cpu);
1491 
1492 	if (g_pctx[mctx->cpu] == NULL) return;
1493 
1494 	/* close all stream sockets that are still open */
1495 	if (!ctx->exit) {
1496 		for (i = 0; i < g_config.mos->max_concurrency; i++) {
1497 			if (mtcp->smap[i].socktype == MOS_SOCK_STREAM) {
1498 				TRACE_DBG("Closing remaining socket %d (%s)\n",
1499 						i, TCPStateToString(mtcp->smap[i].stream));
1500 #ifdef DUMP_STREAM
1501 				DumpStream(mtcp, mtcp->smap[i].stream);
1502 #endif
1503 				mtcp_close(mctx, i);
1504 			}
1505 		}
1506 	}
1507 
1508 	ctx->done = 1;
1509 	ctx->exit = 1;
1510 
1511 #ifdef ENABLE_DPDK
1512 	if (current_iomodule_func == &dpdk_module_func) {
1513 		int master = rte_get_master_lcore();
1514 		if (master == mctx->cpu)
1515 			pthread_join(g_thread[mctx->cpu], NULL);
1516 		else
1517 			rte_eal_wait_lcore(mctx->cpu);
1518 	} else
1519 #endif /* !ENABLE_DPDK */
1520 		{
1521 			pthread_join(g_thread[mctx->cpu], NULL);
1522 		}
1523 
1524 	TRACE_INFO("MTCP thread %d joined.\n", mctx->cpu);
1525 
1526 	running[mctx->cpu] = FALSE;
1527 
1528 #ifdef NETSTAT
1529 #if NETSTAT_TOTAL
1530 	if (printer == mctx->cpu) {
1531 		for (i = 0; i < num_cpus; i++) {
1532 			if (i != mctx->cpu && running[i]) {
1533 				printer = i;
1534 				break;
1535 			}
1536 		}
1537 	}
1538 #endif
1539 #endif
1540 
1541 	log_ctx->done = 1;
1542 	ret = write(log_ctx->pair_sp_fd, "F", 1);
1543 	if (ret != 1)
1544 		TRACE_ERROR("CPU %d: Fail to signal socket pair\n", mctx->cpu);
1545 
1546 	if ((ret = pthread_join(log_thread[ctx->cpu], NULL) != 0)) {
1547 	    TRACE_ERROR("pthread_join() returns error (errno = %s)\n", strerror(ret));
1548 		exit(-1);
1549 	}
1550 
1551 
1552 	fclose(mtcp->log_fp);
1553 	TRACE_LOG("Log thread %d joined.\n", mctx->cpu);
1554 
1555 	if (mtcp->connectq) {
1556 		DestroyStreamQueue(mtcp->connectq);
1557 		mtcp->connectq = NULL;
1558 	}
1559 	if (mtcp->sendq) {
1560 		DestroyStreamQueue(mtcp->sendq);
1561 		mtcp->sendq = NULL;
1562 	}
1563 	if (mtcp->ackq) {
1564 		DestroyStreamQueue(mtcp->ackq);
1565 		mtcp->ackq = NULL;
1566 	}
1567 	if (mtcp->closeq) {
1568 		DestroyStreamQueue(mtcp->closeq);
1569 		mtcp->closeq = NULL;
1570 	}
1571 	if (mtcp->closeq_int) {
1572 		DestroyInternalStreamQueue(mtcp->closeq_int);
1573 		mtcp->closeq_int = NULL;
1574 	}
1575 	if (mtcp->resetq) {
1576 		DestroyStreamQueue(mtcp->resetq);
1577 		mtcp->resetq = NULL;
1578 	}
1579 	if (mtcp->resetq_int) {
1580 		DestroyInternalStreamQueue(mtcp->resetq_int);
1581 		mtcp->resetq_int = NULL;
1582 	}
1583 	if (mtcp->destroyq) {
1584 		DestroyStreamQueue(mtcp->destroyq);
1585 		mtcp->destroyq = NULL;
1586 	}
1587 
1588 	DestroyMTCPSender(mtcp->g_sender);
1589 	for (i = 0; i < g_config.mos->netdev_table->num; i++) {
1590 		DestroyMTCPSender(mtcp->n_sender[i]);
1591 	}
1592 
1593 	MPDestroy(mtcp->rv_pool);
1594 	MPDestroy(mtcp->sv_pool);
1595 	MPDestroy(mtcp->flow_pool);
1596 
1597 	if (mtcp->ap) {
1598 		DestroyAddressPool(mtcp->ap);
1599 		mtcp->ap = NULL;
1600 	}
1601 
1602 	SQ_LOCK_DESTROY(&ctx->connect_lock);
1603 	SQ_LOCK_DESTROY(&ctx->close_lock);
1604 	SQ_LOCK_DESTROY(&ctx->reset_lock);
1605 	SQ_LOCK_DESTROY(&ctx->sendq_lock);
1606 	SQ_LOCK_DESTROY(&ctx->ackq_lock);
1607 	SQ_LOCK_DESTROY(&ctx->destroyq_lock);
1608 
1609 	//TRACE_INFO("MTCP thread %d destroyed.\n", mctx->cpu);
1610 	if (mtcp->iom->destroy_handle)
1611 		mtcp->iom->destroy_handle(ctx);
1612 	if (g_logctx[mctx->cpu]) {
1613 		free(g_logctx[mctx->cpu]);
1614 		g_logctx[mctx->cpu] = NULL;
1615 	}
1616 	free(ctx);
1617 	g_pctx[mctx->cpu] = NULL;
1618 }
1619 /*----------------------------------------------------------------------------*/
1620 mtcp_sighandler_t
1621 mtcp_register_signal(int signum, mtcp_sighandler_t handler)
1622 {
1623 	mtcp_sighandler_t prev;
1624 
1625 	if (signum == SIGINT) {
1626 		prev = app_signal_handler;
1627 		app_signal_handler = handler;
1628 	} else {
1629 		if ((prev = signal(signum, handler)) == SIG_ERR) {
1630 			perror("signal");
1631 			return SIG_ERR;
1632 		}
1633 	}
1634 
1635 	return prev;
1636 }
1637 /*----------------------------------------------------------------------------*/
1638 int
1639 mtcp_getconf(struct mtcp_conf *conf)
1640 {
1641 	int i, j;
1642 
1643 	if (!conf) {
1644 		errno = EINVAL;
1645 		return -1;
1646 	}
1647 
1648 	conf->num_cores = g_config.mos->num_cores;
1649 	conf->max_concurrency = g_config.mos->max_concurrency;
1650 	conf->cpu_mask = g_config.mos->cpu_mask;
1651 
1652 	conf->rcvbuf_size = g_config.mos->rmem_size;
1653 	conf->sndbuf_size = g_config.mos->wmem_size;
1654 
1655 	conf->tcp_timewait = g_config.mos->tcp_tw_interval;
1656 	conf->tcp_timeout = g_config.mos->tcp_timeout;
1657 
1658 	i = 0;
1659 	struct conf_block *bwalk;
1660 	TAILQ_FOREACH(bwalk, &g_config.app_blkh, link) {
1661 		struct app_conf *app_conf = (struct app_conf *)bwalk->conf;
1662 		for (j = 0; j < app_conf->app_argc; j++)
1663 			conf->app_argv[i][j] = app_conf->app_argv[j];
1664 		conf->app_argc[i] = app_conf->app_argc;
1665 		conf->app_cpu_mask[i] = app_conf->cpu_mask;
1666 		i++;
1667 	}
1668 	conf->num_app = i;
1669 
1670 	return 0;
1671 }
1672 /*----------------------------------------------------------------------------*/
1673 int
1674 mtcp_setconf(const struct mtcp_conf *conf)
1675 {
1676 	if (!conf)
1677 		return -1;
1678 
1679 	g_config.mos->num_cores = conf->num_cores;
1680 	g_config.mos->max_concurrency = conf->max_concurrency;
1681 
1682 	g_config.mos->rmem_size = conf->rcvbuf_size;
1683 	g_config.mos->wmem_size = conf->sndbuf_size;
1684 
1685 	g_config.mos->tcp_tw_interval = conf->tcp_timewait;
1686 	g_config.mos->tcp_timeout = conf->tcp_timeout;
1687 
1688 	TRACE_CONFIG("Configuration updated by mtcp_setconf().\n");
1689 	//PrintConfiguration();
1690 
1691 	return 0;
1692 }
1693 /*----------------------------------------------------------------------------*/
/* Process-wide mTCP initialization, called once before any
 * mtcp_create_context(). Loads the configuration from config_file,
 * selects and loads the I/O module, creates per-NIC address pools,
 * initializes the ARP table, global event table, and signal handlers.
 * Returns 0 on success, -1 on failure; exits directly when run without
 * root privileges under DPDK/netmap. */
int
mtcp_init(const char *config_file)
{
	int i;
	int ret;

	/* non-root is tolerated only for io modules that don't need it */
	if (geteuid()) {
		TRACE_CONFIG("[CAUTION] Run as root if mlock is necessary.\n");
#if defined(ENABLE_DPDK) || defined(ENABLE_NETMAP)
		TRACE_CONFIG("[CAUTION] Run the app as root!\n");
		exit(EXIT_FAILURE);
#endif
	}

	/* getting cpu and NIC */
	num_cpus = GetNumCPUs();
	assert(num_cpus >= 1);
	for (i = 0; i < num_cpus; i++) {
		g_mtcp[i] = NULL;
		running[i] = FALSE;
		sigint_cnt[i] = 0;
	}

	/* upper half: parse the config file before the io module loads */
	ret = LoadConfigurationUpperHalf(config_file);
	if (ret) {
		TRACE_CONFIG("Error occured while loading configuration.\n");
		return -1;
	}

	/* pick the io module compiled in; order is the priority */
#if defined(ENABLE_PSIO)
	current_iomodule_func = &ps_module_func;
#elif defined(ENABLE_DPDK)
	current_iomodule_func = &dpdk_module_func;
#elif defined(ENABLE_PCAP)
	current_iomodule_func = &pcap_module_func;
#elif defined(ENABLE_NETMAP)
	current_iomodule_func = &netmap_module_func;
#endif

	if (current_iomodule_func->load_module_upper_half)
		current_iomodule_func->load_module_upper_half();

	/* lower half: finish config now that the io module is known */
	LoadConfigurationLowerHalf();

	//PrintConfiguration();

	/* one address pool per configured NIC, keyed by its IP address */
	for (i = 0; i < g_config.mos->netdev_table->num; i++) {
		ap[i] = CreateAddressPool(g_config.mos->netdev_table->ent[i]->ip_addr, 1);
		if (!ap[i]) {
			TRACE_CONFIG("Error occured while create address pool[%d]\n",
				     i);
			return -1;
		}
        }

	//PrintInterfaceInfo();
	//PrintRoutingTable();
	//PrintARPTable();
	InitARPTable();

	/* SIGUSR1/SIGINT are handled by mTCP; apps chain their SIGINT
	 * handler through mtcp_register_signal() instead */
	if (signal(SIGUSR1, HandleSignal) == SIG_ERR) {
		perror("signal, SIGUSR1");
		return -1;
	}
	if (signal(SIGINT, HandleSignal) == SIG_ERR) {
		perror("signal, SIGINT");
		return -1;
	}
	app_signal_handler = NULL;

	printf("load_module(): %p\n", current_iomodule_func);
	/* load system-wide io module specs */
	if (current_iomodule_func->load_module_lower_half)
		current_iomodule_func->load_module_lower_half();

	GlobInitEvent();

	PrintConf(&g_config);

	return 0;
}
1775 /*----------------------------------------------------------------------------*/
1776 int
1777 mtcp_destroy()
1778 {
1779 	int i;
1780 
1781 	/* wait until all threads are closed */
1782 	/*
1783 	for (i = 0; i < num_cpus; i++) {
1784 		if (running[i]) {
1785 			if (pthread_join(g_thread[i], NULL) != 0)
1786 				return -1;
1787 		}
1788 	}
1789 	*/
1790 
1791 	for (i = 0; i < g_config.mos->netdev_table->num; i++)
1792 		DestroyAddressPool(ap[i]);
1793 
1794 	TRACE_INFO("All MTCP threads are joined.\n");
1795 
1796 	return 0;
1797 }
1798 /*----------------------------------------------------------------------------*/
1799