#ifndef _GNU_SOURCE
#define _GNU_SOURCE
#endif
#include <sched.h>
#include <unistd.h>
#include <sys/time.h>
#include <semaphore.h>
#include <sys/mman.h>
#include <signal.h>
#include <assert.h>
#include <string.h>

#include "cpu.h"
#include "eth_in.h"
#include "fhash.h"
#include "tcp_send_buffer.h"
#include "tcp_ring_buffer.h"
#include "socket.h"
#include "eth_out.h"
#include "tcp.h"
#include "tcp_in.h"
#include "tcp_out.h"
#include "mtcp_api.h"
#include "eventpoll.h"
#include "logger.h"
#include "config.h"
#include "arp.h"
#include "ip_out.h"
#include "timer.h"
#include "debug.h"
#include "event_callback.h"
#include "tcp_rb.h"
#include "tcp_stream.h"
#include "io_module.h"

#ifdef ENABLE_DPDK
/* for launching rte thread */
#include <rte_launch.h>
#include <rte_lcore.h>
#endif /* ENABLE_DPDK */

#define PS_CHUNK_SIZE 64
#define RX_THRESH (PS_CHUNK_SIZE * 0.8)

#define ROUND_STAT FALSE
#define TIME_STAT FALSE
#define EVENT_STAT FALSE
#define TESTING FALSE

#define LOG_FILE_NAME "log"
#define MAX_FILE_NAME 1024

#define MAX(a, b) ((a) > (b) ? (a) : (b))
#define MIN(a, b) ((a) < (b) ? (a) : (b))

#define PER_STREAM_SLICE 0.1		// in ms
#define PER_STREAM_TCHECK 1		// in ms
#define PS_SELECT_TIMEOUT 100		// in us

#define GBPS(bytes) ((bytes) * 8.0 / (1000 * 1000 * 1000))
/*----------------------------------------------------------------------------*/
/* handlers for threads */
struct mtcp_thread_context *g_pctx[MAX_CPUS] = {0};
struct log_thread_context *g_logctx[MAX_CPUS] = {0};
/*----------------------------------------------------------------------------*/
static pthread_t g_thread[MAX_CPUS] = {0};
static pthread_t log_thread[MAX_CPUS] = {0};
/*----------------------------------------------------------------------------*/
static sem_t g_init_sem[MAX_CPUS];
static sem_t g_done_sem[MAX_CPUS];
static int running[MAX_CPUS] = {0};
/*----------------------------------------------------------------------------*/
mtcp_sighandler_t app_signal_handler;
static int sigint_cnt[MAX_CPUS] = {0};
static struct timespec sigint_ts[MAX_CPUS];
void mtcp_free_context(mctx_t mctx);
/*----------------------------------------------------------------------------*/
#ifdef NETSTAT
#if NETSTAT_TOTAL
static int printer = -1;
#if ROUND_STAT
#endif /* ROUND_STAT */
#endif /* NETSTAT_TOTAL */
#endif /* NETSTAT */
/*----------------------------------------------------------------------------*/
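/*
 * Signal dispatcher for the mtcp threads. A first SIGINT sets the
 * per-thread `interrupt` flag (and, if no application handler is
 * registered, also `exit`); a second SIGINT arriving within the same
 * second as the previous one forces every running thread to exit.
 * All signals other than SIGUSR1 are forwarded to the application's
 * registered handler.
 */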
void
HandleSignal(int signal)
{
	int i = 0;

	if (signal == SIGINT) {
		FreeConfigResources();
#ifdef DARWIN
		int core = 0;
#else
		int core = sched_getcpu();
#endif
		struct timespec cur_ts;

		clock_gettime(CLOCK_REALTIME, &cur_ts);

		if (sigint_cnt[core] > 0 && cur_ts.tv_sec == sigint_ts[core].tv_sec) {
			for (i = 0; i < g_config.mos->num_cores; i++) {
				if (running[i]) {
					//exit(0);
					g_pctx[i]->exit = TRUE;
				}
			}
		} else {
			for (i = 0; i < g_config.mos->num_cores; i++) {
				if (g_pctx[i])
					g_pctx[i]->interrupt = TRUE;
			}
			if (!app_signal_handler) {
				for (i = 0; i < g_config.mos->num_cores; i++) {
					if (running[i]) {
						//exit(0);
						g_pctx[i]->exit = TRUE;
					}
				}
			}
		}
		sigint_cnt[core]++;
		clock_gettime(CLOCK_REALTIME, &sigint_ts[core]);
	}

	if (signal != SIGUSR1) {
		if (app_signal_handler) {
			app_signal_handler(signal);
		}
	}
}
/*----------------------------------------------------------------------------*/
static int
AttachDevice(struct mtcp_thread_context *ctx)
{
	int working = -1;
	mtcp_manager_t mtcp = ctx->mtcp_manager;

	if (mtcp->iom->link_devices)
		working = mtcp->iom->link_devices(ctx);
	else
		return 0;

	return working;
}
/*----------------------------------------------------------------------------*/
#ifdef TIMESTAT
static inline void
InitStatCounter(struct stat_counter *counter)
{
	counter->cnt = 0;
	counter->sum = 0;
	counter->max = 0;
	counter->min = 0;
}
/*----------------------------------------------------------------------------*/
static inline void
UpdateStatCounter(struct stat_counter *counter, int64_t value)
{
	counter->cnt++;
	counter->sum += value;
	if (value > counter->max)
		counter->max = value;
	if (counter->min == 0 || value < counter->min)
		counter->min = value;
}
/*----------------------------------------------------------------------------*/
static inline uint64_t
GetAverageStat(struct stat_counter *counter)
{
	return counter->cnt ? (counter->sum / counter->cnt) : 0;
}
/*----------------------------------------------------------------------------*/
static inline int64_t
TimeDiffUs(struct timeval *t2, struct timeval *t1)
{
	return (t2->tv_sec - t1->tv_sec) * 1000000 +
			(int64_t)(t2->tv_usec - t1->tv_usec);
}
/*----------------------------------------------------------------------------*/
#endif
#ifdef NETSTAT
static inline void
PrintThreadNetworkStats(mtcp_manager_t mtcp, struct net_stat *ns)
{
	int i;

	for (i = 0; i < g_config.mos->netdev_table->num; i++) {
		ns->rx_packets[i] = mtcp->nstat.rx_packets[i] - mtcp->p_nstat.rx_packets[i];
		ns->rx_errors[i] = mtcp->nstat.rx_errors[i] - mtcp->p_nstat.rx_errors[i];
		ns->rx_bytes[i] = mtcp->nstat.rx_bytes[i] - mtcp->p_nstat.rx_bytes[i];
		ns->tx_packets[i] = mtcp->nstat.tx_packets[i] - mtcp->p_nstat.tx_packets[i];
		ns->tx_drops[i] = mtcp->nstat.tx_drops[i] - mtcp->p_nstat.tx_drops[i];
		ns->tx_bytes[i] = mtcp->nstat.tx_bytes[i] - mtcp->p_nstat.tx_bytes[i];
#if NETSTAT_PERTHREAD
		if (g_config.mos->netdev_table->ent[i]->stat_print) {
			fprintf(stderr, "[CPU%2d] %s flows: %6u, "
					"RX: %7llu(pps) (err: %5llu), %5.2lf(Gbps), "
					"TX: %7llu(pps), %5.2lf(Gbps)\n",
					mtcp->ctx->cpu,
					g_config.mos->netdev_table->ent[i]->dev_name,
					(unsigned)mtcp->flow_cnt,
					(long long unsigned)ns->rx_packets[i],
					(long long unsigned)ns->rx_errors[i],
					GBPS(ns->rx_bytes[i]),
					(long long unsigned)ns->tx_packets[i],
					GBPS(ns->tx_bytes[i]));
		}
#endif
	}
	mtcp->p_nstat = mtcp->nstat;
}
/*----------------------------------------------------------------------------*/
#if ROUND_STAT
static inline void
PrintThreadRoundStats(mtcp_manager_t mtcp, struct run_stat *rs)
{
#define ROUND_DIV (1000)
	rs->rounds = mtcp->runstat.rounds - mtcp->p_runstat.rounds;
	rs->rounds_rx = mtcp->runstat.rounds_rx - mtcp->p_runstat.rounds_rx;
	rs->rounds_rx_try = mtcp->runstat.rounds_rx_try - mtcp->p_runstat.rounds_rx_try;
	rs->rounds_tx = mtcp->runstat.rounds_tx - mtcp->p_runstat.rounds_tx;
	rs->rounds_tx_try = mtcp->runstat.rounds_tx_try - mtcp->p_runstat.rounds_tx_try;
	rs->rounds_select = mtcp->runstat.rounds_select - mtcp->p_runstat.rounds_select;
	rs->rounds_select_rx = mtcp->runstat.rounds_select_rx - mtcp->p_runstat.rounds_select_rx;
	rs->rounds_select_tx = mtcp->runstat.rounds_select_tx - mtcp->p_runstat.rounds_select_tx;
	rs->rounds_select_intr = mtcp->runstat.rounds_select_intr - mtcp->p_runstat.rounds_select_intr;
	rs->rounds_twcheck = mtcp->runstat.rounds_twcheck - mtcp->p_runstat.rounds_twcheck;
	mtcp->p_runstat = mtcp->runstat;
#if NETSTAT_PERTHREAD
	fprintf(stderr, "[CPU%2d] Rounds: %4lluK, "
			"rx: %3lluK (try: %4lluK), tx: %3lluK (try: %4lluK), "
			"ps_select: %4llu (rx: %4llu, tx: %4llu, intr: %3llu)\n",
			mtcp->ctx->cpu, rs->rounds / ROUND_DIV,
			rs->rounds_rx / ROUND_DIV, rs->rounds_rx_try / ROUND_DIV,
			rs->rounds_tx / ROUND_DIV, rs->rounds_tx_try / ROUND_DIV,
			rs->rounds_select,
			rs->rounds_select_rx, rs->rounds_select_tx, rs->rounds_select_intr);
#endif
}
#endif /* ROUND_STAT */
/*----------------------------------------------------------------------------*/
#if TIME_STAT
static inline void
PrintThreadRoundTime(mtcp_manager_t mtcp)
{
	fprintf(stderr, "[CPU%2d] Time: (avg, max) "
			"round: (%4luus, %4luus), processing: (%4luus, %4luus), "
			"tcheck: (%4luus, %4luus), epoll: (%4luus, %4luus), "
			"handle: (%4luus, %4luus), xmit: (%4luus, %4luus), "
			"select: (%4luus, %4luus)\n", mtcp->ctx->cpu,
			GetAverageStat(&mtcp->rtstat.round), mtcp->rtstat.round.max,
			GetAverageStat(&mtcp->rtstat.processing), mtcp->rtstat.processing.max,
			GetAverageStat(&mtcp->rtstat.tcheck), mtcp->rtstat.tcheck.max,
			GetAverageStat(&mtcp->rtstat.epoll), mtcp->rtstat.epoll.max,
			GetAverageStat(&mtcp->rtstat.handle), mtcp->rtstat.handle.max,
			GetAverageStat(&mtcp->rtstat.xmit), mtcp->rtstat.xmit.max,
			GetAverageStat(&mtcp->rtstat.select), mtcp->rtstat.select.max);

	InitStatCounter(&mtcp->rtstat.round);
	InitStatCounter(&mtcp->rtstat.processing);
	InitStatCounter(&mtcp->rtstat.tcheck);
	InitStatCounter(&mtcp->rtstat.epoll);
	InitStatCounter(&mtcp->rtstat.handle);
	InitStatCounter(&mtcp->rtstat.xmit);
	InitStatCounter(&mtcp->rtstat.select);
}
#endif
#endif /* NETSTAT */
/*----------------------------------------------------------------------------*/
#if EVENT_STAT
static inline void
PrintEventStat(int core, struct mtcp_epoll_stat *stat)
{
	fprintf(stderr, "[CPU%2d] calls: %lu, waits: %lu, wakes: %lu, "
			"issued: %lu, registered: %lu, invalidated: %lu, handled: %lu\n",
			core, stat->calls, stat->waits, stat->wakes,
			stat->issued, stat->registered, stat->invalidated, stat->handled);
	memset(stat, 0, sizeof(struct mtcp_epoll_stat));
}
#endif /* EVENT_STAT */
/*----------------------------------------------------------------------------*/
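/*
 * Once per TIMEOUT (1 s), the printer core aggregates the per-thread
 * deltas into global per-device counters, prints them, and tracks the
 * peak and a recent average of total RX/TX throughput, the latter as an
 * exponentially weighted moving average (0.6 * previous + 0.4 * current).
 */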
#ifdef NETSTAT
static inline void
PrintNetworkStats(mtcp_manager_t mtcp, uint32_t cur_ts)
{
#define TIMEOUT 1
	int i;
	struct net_stat ns;
	bool stat_print = false;
#if ROUND_STAT
	struct run_stat rs;
#endif /* ROUND_STAT */
#ifdef NETSTAT_TOTAL
	static double peak_total_rx_gbps = 0;
	static double peak_total_tx_gbps = 0;
	static double avg_total_rx_gbps = 0;
	static double avg_total_tx_gbps = 0;

	double total_rx_gbps = 0, total_tx_gbps = 0;
	int j;
	uint32_t gflow_cnt = 0;
	struct net_stat g_nstat;
#if ROUND_STAT
	struct run_stat g_runstat;
#endif /* ROUND_STAT */
#endif /* NETSTAT_TOTAL */

	if (TS_TO_MSEC(cur_ts - mtcp->p_nstat_ts) < SEC_TO_MSEC(TIMEOUT)) {
		return;
	}

	mtcp->p_nstat_ts = cur_ts;
#ifdef NETSTAT_TOTAL
	gflow_cnt = 0;
	memset(&g_nstat, 0, sizeof(struct net_stat));
#endif /* NETSTAT_TOTAL */
	for (i = 0; i < g_config.mos->num_cores; i++) {
		if (running[i]) {
			PrintThreadNetworkStats(g_mtcp[i], &ns);
#if NETSTAT_TOTAL
			gflow_cnt += g_mtcp[i]->flow_cnt;
			for (j = 0; j < g_config.mos->netdev_table->num; j++) {
				g_nstat.rx_packets[j] += ns.rx_packets[j];
				g_nstat.rx_errors[j] += ns.rx_errors[j];
				g_nstat.rx_bytes[j] += ns.rx_bytes[j];
				g_nstat.tx_packets[j] += ns.tx_packets[j];
				g_nstat.tx_drops[j] += ns.tx_drops[j];
				g_nstat.tx_bytes[j] += ns.tx_bytes[j];
			}
#endif
		}
	}
#if NETSTAT_TOTAL
	for (i = 0; i < g_config.mos->netdev_table->num; i++) {
		if (g_config.mos->netdev_table->ent[i]->stat_print) {
			fprintf(stderr, "[ ALL ] %s, "
					"RX: %7llu(pps) (err: %5llu), %5.2lf(Gbps), "
					"TX: %7llu(pps), %5.2lf(Gbps)\n",
					g_config.mos->netdev_table->ent[i]->dev_name,
					(long long unsigned)g_nstat.rx_packets[i],
					(long long unsigned)g_nstat.rx_errors[i],
					GBPS(g_nstat.rx_bytes[i]),
					(long long unsigned)g_nstat.tx_packets[i],
					GBPS(g_nstat.tx_bytes[i]));
			total_rx_gbps += GBPS(g_nstat.rx_bytes[i]);
			total_tx_gbps += GBPS(g_nstat.tx_bytes[i]);
			stat_print = true;
		}
	}
	if (stat_print) {
		fprintf(stderr, "[ ALL ] flows: %6u\n", gflow_cnt);
		if (avg_total_rx_gbps == 0)
			avg_total_rx_gbps = total_rx_gbps;
		else
			avg_total_rx_gbps = avg_total_rx_gbps * 0.6 + total_rx_gbps * 0.4;

		if (avg_total_tx_gbps == 0)
			avg_total_tx_gbps = total_tx_gbps;
		else
			avg_total_tx_gbps = avg_total_tx_gbps * 0.6 + total_tx_gbps * 0.4;

		if (peak_total_rx_gbps < total_rx_gbps)
			peak_total_rx_gbps = total_rx_gbps;
		if (peak_total_tx_gbps < total_tx_gbps)
			peak_total_tx_gbps = total_tx_gbps;

		fprintf(stderr, "[ PEAK ] RX: %5.2lf(Gbps), TX: %5.2lf(Gbps)\n"
				"[ RECENT AVG ] RX: %5.2lf(Gbps), TX: %5.2lf(Gbps)\n",
				peak_total_rx_gbps, peak_total_tx_gbps,
				avg_total_rx_gbps, avg_total_tx_gbps);
	}
#endif

#if ROUND_STAT
	memset(&g_runstat, 0, sizeof(struct run_stat));
	for (i = 0; i < g_config.mos->num_cores; i++) {
		if (running[i]) {
			PrintThreadRoundStats(g_mtcp[i], &rs);
#if DBGMSG
			g_runstat.rounds += rs.rounds;
			g_runstat.rounds_rx += rs.rounds_rx;
			g_runstat.rounds_rx_try += rs.rounds_rx_try;
			g_runstat.rounds_tx += rs.rounds_tx;
			g_runstat.rounds_tx_try += rs.rounds_tx_try;
			g_runstat.rounds_select += rs.rounds_select;
			g_runstat.rounds_select_rx += rs.rounds_select_rx;
			g_runstat.rounds_select_tx += rs.rounds_select_tx;
#endif
		}
	}

	TRACE_DBG("[ ALL ] Rounds: %4ldK, "
			"rx: %3ldK (try: %4ldK), tx: %3ldK (try: %4ldK), "
			"ps_select: %4ld (rx: %4ld, tx: %4ld)\n",
			g_runstat.rounds / 1000, g_runstat.rounds_rx / 1000,
			g_runstat.rounds_rx_try / 1000, g_runstat.rounds_tx / 1000,
			g_runstat.rounds_tx_try / 1000, g_runstat.rounds_select,
			g_runstat.rounds_select_rx, g_runstat.rounds_select_tx);
#endif /* ROUND_STAT */

#if TIME_STAT
	for (i = 0; i < g_config.mos->num_cores; i++) {
		if (running[i]) {
			PrintThreadRoundTime(g_mtcp[i]);
		}
	}
#endif

#if EVENT_STAT
	for (i = 0; i < g_config.mos->num_cores; i++) {
		if (running[i] && g_mtcp[i]->ep) {
			PrintEventStat(i, &g_mtcp[i]->ep->stat);
		}
	}
#endif

	fflush(stderr);
}
#endif /* NETSTAT */
/*----------------------------------------------------------------------------*/
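/*
 * Drain each standalone monitor socket's event queue and raise
 * MOS_ON_CONN_NEW_DATA callbacks for streams whose pending actions
 * include MOS_ACT_READ_DATA; client-side streams traverse their
 * monitor-socket list in reverse, server-side streams in order.
 */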
static inline void
FlushMonitorReadEvents(mtcp_manager_t mtcp)
{
	struct event_queue *mtcpq;
	struct tcp_stream *cur_stream;
	struct mon_listener *walk;

	/* check if monitor sockets should be passed data */
	TAILQ_FOREACH(walk, &mtcp->monitors, link) {
		if (walk->socket->socktype != MOS_SOCK_MONITOR_STREAM ||
		    !(mtcpq = walk->eq))
			continue;

		while (mtcpq->num_events > 0) {
			cur_stream =
				(struct tcp_stream *)mtcpq->events[mtcpq->start++].ev.data.ptr;
			/* only read events */
			if (cur_stream != NULL &&
			    (cur_stream->actions & MOS_ACT_READ_DATA)) {
				if (cur_stream->rcvvar != NULL &&
				    cur_stream->rcvvar->rcvbuf != NULL) {
					/* no need to pass pkt context */
					struct socket_map *walk;
					if (cur_stream->side == MOS_SIDE_CLI) {
						SOCKQ_FOREACH_REVERSE(walk, &cur_stream->msocks) {
							HandleCallback(mtcp, MOS_NULL, walk,
								       cur_stream->side, NULL,
								       MOS_ON_CONN_NEW_DATA);
						} SOCKQ_FOREACH_END;
					} else { /* cur_stream->side == MOS_SIDE_SVR */
						SOCKQ_FOREACH_START(walk, &cur_stream->msocks) {
							HandleCallback(mtcp, MOS_NULL, walk,
								       cur_stream->side, NULL,
								       MOS_ON_CONN_NEW_DATA);
						} SOCKQ_FOREACH_END;
					}
				}
				/* reset the actions now */
				cur_stream->actions = 0;
			}
			if (mtcpq->start >= mtcpq->size)
				mtcpq->start = 0;
			mtcpq->num_events--;
		}
	}
}
/*----------------------------------------------------------------------------*/
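/*
 * Replay the epoll read-in events still queued in mtcp_queue to raise
 * buffered-read monitor callbacks. The queue itself is left intact
 * (only a local offset walks it), so FlushEpollEvents() can still hand
 * the same events to the application afterwards.
 */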
static inline void
FlushBufferedReadEvents(mtcp_manager_t mtcp)
{
	int i;
	int offset;
	struct event_queue *mtcpq;
	struct tcp_stream *cur_stream;

	if (mtcp->ep == NULL) {
		TRACE_EPOLL("No epoll socket has been registered yet!\n");
		return;
	} else {
		/* case when mtcpq exists */
		mtcpq = mtcp->ep->mtcp_queue;
		offset = mtcpq->start;
	}

	/* we will use queued-up epoll read-in events
	 * to trigger buffered read monitor events */
	for (i = 0; i < mtcpq->num_events; i++) {
		cur_stream = mtcp->smap[mtcpq->events[offset++].sockid].stream;
		/* only read events: raise the new data callback event */
		if (cur_stream != NULL &&
		    (cur_stream->socket->events & MOS_EPOLLIN)) {
			if (cur_stream->rcvvar != NULL &&
			    cur_stream->rcvvar->rcvbuf != NULL) {
				/* no need to pass pkt context */
				struct socket_map *walk;
				if (cur_stream->side == MOS_SIDE_CLI) {
					SOCKQ_FOREACH_REVERSE(walk, &cur_stream->msocks) {
						HandleCallback(mtcp, MOS_NULL, walk, cur_stream->side,
							       NULL, MOS_ON_CONN_NEW_DATA);
					} SOCKQ_FOREACH_END;
				} else { /* cur_stream->side == MOS_SIDE_SVR */
					SOCKQ_FOREACH_START(walk, &cur_stream->msocks) {
						HandleCallback(mtcp, MOS_NULL, walk, cur_stream->side,
							       NULL, MOS_ON_CONN_NEW_DATA);
					} SOCKQ_FOREACH_END;
				}
			}
		}
		if (offset >= mtcpq->size)
			offset = 0;
	}
}
/*----------------------------------------------------------------------------*/
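/*
 * Move completed events from the mtcp queue to the user queue (bounded
 * by the user queue's capacity) and wake up an application blocked in
 * mtcp_epoll_wait() if anything is pending.
 */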
static inline void
FlushEpollEvents(mtcp_manager_t mtcp, uint32_t cur_ts)
{
	struct mtcp_epoll *ep = mtcp->ep;
	struct event_queue *usrq = ep->usr_queue;
	struct event_queue *mtcpq = ep->mtcp_queue;

	pthread_mutex_lock(&ep->epoll_lock);
	if (ep->mtcp_queue->num_events > 0) {
		/* while mtcp_queue has events and usr_queue is not full */
		while (mtcpq->num_events > 0 && usrq->num_events < usrq->size) {
			/* copy the event from mtcp_queue to usr_queue */
			usrq->events[usrq->end++] = mtcpq->events[mtcpq->start++];

			if (usrq->end >= usrq->size)
				usrq->end = 0;
			usrq->num_events++;

			if (mtcpq->start >= mtcpq->size)
				mtcpq->start = 0;
			mtcpq->num_events--;
		}
	}

	/* if there are pending events, wake up the user */
	if (ep->waiting && (ep->usr_queue->num_events > 0 ||
			    ep->usr_shadow_queue->num_events > 0)) {
		STAT_COUNT(mtcp->runstat.rounds_epoll);
		TRACE_EPOLL("Broadcasting events. num: %d, cur_ts: %u, prev_ts: %u\n",
			    ep->usr_queue->num_events, cur_ts, mtcp->ts_last_event);
		mtcp->ts_last_event = cur_ts;
		ep->stat.wakes++;
		pthread_cond_signal(&ep->epoll_cond);
	}
	pthread_mutex_unlock(&ep->epoll_lock);
}
/*----------------------------------------------------------------------------*/
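/*
 * Service the request queues filled by the application threads:
 * connect, send, and ACK requests, then deferred close/reset
 * processing (streams still sitting on the control/send/ack lists are
 * re-queued on the internal queues until those lists drain), and
 * finally the destroy queue.
 */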
static inline void
HandleApplicationCalls(mtcp_manager_t mtcp, uint32_t cur_ts)
{
	tcp_stream *stream;
	int cnt, max_cnt;
	int handled, delayed;
	int control, send, ack;

	/* connect handling */
	while ((stream = StreamDequeue(mtcp->connectq))) {
		if (stream->state != TCP_ST_SYN_SENT) {
			TRACE_INFO("Got a connection request from app with state: %s\n",
				   TCPStateToString(stream));
			exit(EXIT_FAILURE);
		} else {
			stream->cb_events |= MOS_ON_CONN_START |
				MOS_ON_TCP_STATE_CHANGE;
			/* if monitor is on... */
			if (stream->pair_stream != NULL)
				stream->pair_stream->cb_events |=
					MOS_ON_CONN_START;
		}
		AddtoControlList(mtcp, stream, cur_ts);
	}

	/* send queue handling */
	while ((stream = StreamDequeue(mtcp->sendq))) {
		stream->sndvar->on_sendq = FALSE;
		AddtoSendList(mtcp, stream);
	}

	/* ack queue handling */
	while ((stream = StreamDequeue(mtcp->ackq))) {
		stream->sndvar->on_ackq = FALSE;
		EnqueueACK(mtcp, stream, cur_ts, ACK_OPT_AGGREGATE);
	}

	/* close handling */
	handled = delayed = 0;
	control = send = ack = 0;
	while ((stream = StreamDequeue(mtcp->closeq))) {
		struct tcp_send_vars *sndvar = stream->sndvar;
		sndvar->on_closeq = FALSE;

		if (sndvar->sndbuf) {
			sndvar->fss = sndvar->sndbuf->head_seq + sndvar->sndbuf->len;
		} else {
			sndvar->fss = stream->snd_nxt;
		}

		if (g_config.mos->tcp_timeout > 0)
			RemoveFromTimeoutList(mtcp, stream);

		if (stream->have_reset) {
			handled++;
			if (stream->state != TCP_ST_CLOSED_RSVD) {
				stream->close_reason = TCP_RESET;
				stream->state = TCP_ST_CLOSED_RSVD;
				stream->cb_events |= MOS_ON_TCP_STATE_CHANGE;
				TRACE_STATE("Stream %d: TCP_ST_CLOSED_RSVD\n", stream->id);
				DestroyTCPStream(mtcp, stream);
			} else {
				TRACE_ERROR("Stream already closed.\n");
			}

		} else if (sndvar->on_control_list) {
			sndvar->on_closeq_int = TRUE;
			StreamInternalEnqueue(mtcp->closeq_int, stream);
			delayed++;
			if (sndvar->on_control_list)
				control++;
			if (sndvar->on_send_list)
				send++;
			if (sndvar->on_ack_list)
				ack++;

		} else if (sndvar->on_send_list || sndvar->on_ack_list) {
			handled++;
			if (stream->state == TCP_ST_ESTABLISHED) {
				stream->state = TCP_ST_FIN_WAIT_1;
				stream->cb_events |= MOS_ON_TCP_STATE_CHANGE;
				TRACE_STATE("Stream %d: TCP_ST_FIN_WAIT_1\n", stream->id);

			} else if (stream->state == TCP_ST_CLOSE_WAIT) {
				stream->state = TCP_ST_LAST_ACK;
				stream->cb_events |= MOS_ON_TCP_STATE_CHANGE;
				TRACE_STATE("Stream %d: TCP_ST_LAST_ACK\n", stream->id);
			}
			stream->control_list_waiting = TRUE;

		} else if (stream->state != TCP_ST_CLOSED_RSVD) {
			handled++;
			if (stream->state == TCP_ST_ESTABLISHED) {
				stream->state = TCP_ST_FIN_WAIT_1;
				stream->cb_events |= MOS_ON_TCP_STATE_CHANGE;
				TRACE_STATE("Stream %d: TCP_ST_FIN_WAIT_1\n", stream->id);

			} else if (stream->state == TCP_ST_CLOSE_WAIT) {
				stream->state = TCP_ST_LAST_ACK;
				stream->cb_events |= MOS_ON_TCP_STATE_CHANGE;
				TRACE_STATE("Stream %d: TCP_ST_LAST_ACK\n", stream->id);
			}
			//sndvar->rto = TCP_FIN_RTO;
			//UpdateRetransmissionTimer(mtcp, stream, mtcp->cur_ts);
			AddtoControlList(mtcp, stream, cur_ts);
		} else {
			TRACE_ERROR("Already closed connection!\n");
		}
	}
	TRACE_ROUND("Handling close connections. cnt: %d\n", handled);

	cnt = 0;
	max_cnt = mtcp->closeq_int->count;
	while (cnt++ < max_cnt) {
		stream = StreamInternalDequeue(mtcp->closeq_int);

		if (stream->sndvar->on_control_list) {
			StreamInternalEnqueue(mtcp->closeq_int, stream);

		} else if (stream->state != TCP_ST_CLOSED_RSVD) {
			handled++;
			stream->sndvar->on_closeq_int = FALSE;
			if (stream->state == TCP_ST_ESTABLISHED) {
				stream->state = TCP_ST_FIN_WAIT_1;
				stream->cb_events |= MOS_ON_TCP_STATE_CHANGE;
				TRACE_STATE("Stream %d: TCP_ST_FIN_WAIT_1\n", stream->id);

			} else if (stream->state == TCP_ST_CLOSE_WAIT) {
				stream->state = TCP_ST_LAST_ACK;
				stream->cb_events |= MOS_ON_TCP_STATE_CHANGE;
				TRACE_STATE("Stream %d: TCP_ST_LAST_ACK\n", stream->id);
			}
			AddtoControlList(mtcp, stream, cur_ts);
		} else {
			stream->sndvar->on_closeq_int = FALSE;
			TRACE_ERROR("Already closed connection!\n");
		}
	}

	/* reset handling */
	cnt = 0;
	while ((stream = StreamDequeue(mtcp->resetq))) {
		cnt++;
		stream->sndvar->on_resetq = FALSE;

		if (g_config.mos->tcp_timeout > 0)
			RemoveFromTimeoutList(mtcp, stream);

		if (stream->have_reset) {
			if (stream->state != TCP_ST_CLOSED_RSVD) {
				stream->close_reason = TCP_RESET;
				stream->state = TCP_ST_CLOSED_RSVD;
				stream->cb_events |= MOS_ON_TCP_STATE_CHANGE;
				TRACE_STATE("Stream %d: TCP_ST_CLOSED_RSVD\n", stream->id);
				DestroyTCPStream(mtcp, stream);
			} else {
				TRACE_ERROR("Stream already closed.\n");
			}

		} else if (stream->sndvar->on_control_list ||
			   stream->sndvar->on_send_list || stream->sndvar->on_ack_list) {
			/* wait until all the queues are flushed */
			stream->sndvar->on_resetq_int = TRUE;
			StreamInternalEnqueue(mtcp->resetq_int, stream);

		} else {
			if (stream->state != TCP_ST_CLOSED_RSVD) {
				stream->close_reason = TCP_ACTIVE_CLOSE;
				stream->state = TCP_ST_CLOSED_RSVD;
				stream->cb_events |= MOS_ON_TCP_STATE_CHANGE;
				TRACE_STATE("Stream %d: TCP_ST_CLOSED_RSVD\n", stream->id);
				AddtoControlList(mtcp, stream, cur_ts);
			} else {
				TRACE_ERROR("Stream already closed.\n");
			}
		}
	}
	TRACE_ROUND("Handling reset connections. cnt: %d\n", cnt);

	cnt = 0;
	max_cnt = mtcp->resetq_int->count;
	while (cnt++ < max_cnt) {
		stream = StreamInternalDequeue(mtcp->resetq_int);

		if (stream->sndvar->on_control_list ||
		    stream->sndvar->on_send_list || stream->sndvar->on_ack_list) {
			/* wait until all the queues are flushed */
			StreamInternalEnqueue(mtcp->resetq_int, stream);

		} else {
			stream->sndvar->on_resetq_int = FALSE;

			if (stream->state != TCP_ST_CLOSED_RSVD) {
				stream->close_reason = TCP_ACTIVE_CLOSE;
				stream->state = TCP_ST_CLOSED_RSVD;
				stream->cb_events |= MOS_ON_TCP_STATE_CHANGE;
				TRACE_STATE("Stream %d: TCP_ST_CLOSED_RSVD\n", stream->id);
				AddtoControlList(mtcp, stream, cur_ts);
			} else {
				TRACE_ERROR("Stream already closed.\n");
			}
		}
	}

	/* destroy streams in destroyq */
	while ((stream = StreamDequeue(mtcp->destroyq))) {
		DestroyTCPStream(mtcp, stream);
	}

	mtcp->wakeup_flag = FALSE;
}
/*----------------------------------------------------------------------------*/
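/*
 * Flush the control (SYN/FIN/RST), ACK, and data lists of the global
 * sender and of each per-NIC sender into the output driver, writing up
 * to `thresh` streams from each list per round.
 */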
static inline void
WritePacketsToChunks(mtcp_manager_t mtcp, uint32_t cur_ts)
{
	int thresh = g_config.mos->max_concurrency;
	int i;

	/* setting the threshold to g_config.mos->max_concurrency sends ACKs
	 * immediately; otherwise, set it to an appropriate smaller value */
	assert(mtcp->g_sender != NULL);
	if (mtcp->g_sender->control_list_cnt)
		WriteTCPControlList(mtcp, mtcp->g_sender, cur_ts, thresh);
	if (mtcp->g_sender->ack_list_cnt)
		WriteTCPACKList(mtcp, mtcp->g_sender, cur_ts, thresh);
	if (mtcp->g_sender->send_list_cnt)
		WriteTCPDataList(mtcp, mtcp->g_sender, cur_ts, thresh);

	for (i = 0; i < g_config.mos->netdev_table->num; i++) {
		assert(mtcp->n_sender[i] != NULL);
		if (mtcp->n_sender[i]->control_list_cnt)
			WriteTCPControlList(mtcp, mtcp->n_sender[i], cur_ts, thresh);
		if (mtcp->n_sender[i]->ack_list_cnt)
			WriteTCPACKList(mtcp, mtcp->n_sender[i], cur_ts, thresh);
		if (mtcp->n_sender[i]->send_list_cnt)
			WriteTCPDataList(mtcp, mtcp->n_sender[i], cur_ts, thresh);
	}
}
/*----------------------------------------------------------------------------*/
#if TESTING
static int
DestroyRemainingFlows(mtcp_manager_t mtcp)
{
	struct hashtable *ht = mtcp->tcp_flow_table;
	tcp_stream *walk;
	int cnt, i;

	cnt = 0;

	thread_printf(mtcp, mtcp->log_fp,
		      "CPU %d: Flushing remaining flows.\n", mtcp->ctx->cpu);

	for (i = 0; i < NUM_BINS; i++) {
		TAILQ_FOREACH(walk, &ht->ht_table[i], rcvvar->he_link) {
			thread_printf(mtcp, mtcp->log_fp,
				      "CPU %d: Destroying stream %d\n", mtcp->ctx->cpu, walk->id);
#ifdef DUMP_STREAM
			DumpStream(mtcp, walk);
#endif
			DestroyTCPStream(mtcp, walk);
			cnt++;
		}
	}

	return cnt;
}
#endif
/*----------------------------------------------------------------------------*/
static void
InterruptApplication(mtcp_manager_t mtcp)
{
	/* interrupt if mtcp_epoll_wait() is waiting */
	if (mtcp->ep) {
		pthread_mutex_lock(&mtcp->ep->epoll_lock);
		if (mtcp->ep->waiting) {
			pthread_cond_signal(&mtcp->ep->epoll_cond);
		}
		pthread_mutex_unlock(&mtcp->ep->epoll_lock);
	}
	/* interrupt if accept() is waiting */
	if (mtcp->listener) {
		if (mtcp->listener->socket) {
			pthread_mutex_lock(&mtcp->listener->accept_lock);
			if (!(mtcp->listener->socket->opts & MTCP_NONBLOCK)) {
				pthread_cond_signal(&mtcp->listener->accept_cond);
			}
			pthread_mutex_unlock(&mtcp->listener->accept_lock);
		}
	}
}
/*----------------------------------------------------------------------------*/
void
RunPassiveLoop(mtcp_manager_t mtcp)
{
	sem_wait(&g_done_sem[mtcp->ctx->cpu]);
	sem_destroy(&g_done_sem[mtcp->ctx->cpu]);
	return;
}
/*----------------------------------------------------------------------------*/
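/*
 * Per-core event loop. Each round: receive a batch of packets from
 * every RX interface and run them through ProcessPacket(), fire expired
 * user timers, check RTO/TIME_WAIT/connection timeouts, flush monitor
 * and epoll events to the application, service the application call
 * queues, stage outgoing packets, and transmit them on every TX
 * interface; finally block in the I/O module's select until new work
 * arrives.
 */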
static void
RunMainLoop(struct mtcp_thread_context *ctx)
{
	mtcp_manager_t mtcp = ctx->mtcp_manager;
	int i;
	int recv_cnt;
	int rx_inf, tx_inf;
	struct timeval cur_ts = {0};
	uint32_t ts, ts_prev;

#if TIME_STAT
	struct timeval prev_ts, processing_ts, tcheck_ts,
		       epoll_ts, handle_ts, xmit_ts, select_ts;
#endif
	int thresh;

	gettimeofday(&cur_ts, NULL);

	TRACE_DBG("CPU %d: mtcp thread running.\n", ctx->cpu);

#if TIME_STAT
	prev_ts = cur_ts;
	InitStatCounter(&mtcp->rtstat.round);
	InitStatCounter(&mtcp->rtstat.processing);
	InitStatCounter(&mtcp->rtstat.tcheck);
	InitStatCounter(&mtcp->rtstat.epoll);
	InitStatCounter(&mtcp->rtstat.handle);
	InitStatCounter(&mtcp->rtstat.xmit);
	InitStatCounter(&mtcp->rtstat.select);
#endif

	ts = ts_prev = 0;
	while ((!ctx->done || mtcp->flow_cnt) && !ctx->exit) {

		STAT_COUNT(mtcp->runstat.rounds);
		recv_cnt = 0;
		gettimeofday(&cur_ts, NULL);
#if TIME_STAT
		/* measure the inter-round delay */
		UpdateStatCounter(&mtcp->rtstat.round, TimeDiffUs(&cur_ts, &prev_ts));
		prev_ts = cur_ts;
#endif

		ts = TIMEVAL_TO_TS(&cur_ts);
		mtcp->cur_ts = ts;

		for (rx_inf = 0; rx_inf < g_config.mos->netdev_table->num; rx_inf++) {

			recv_cnt = mtcp->iom->recv_pkts(ctx, rx_inf);
			STAT_COUNT(mtcp->runstat.rounds_rx_try);

			for (i = 0; i < recv_cnt; i++) {
				uint16_t len;
				uint8_t *pktbuf;
				pktbuf = mtcp->iom->get_rptr(mtcp->ctx, rx_inf, i, &len);
				ProcessPacket(mtcp, rx_inf, i, ts, pktbuf, len);
			}

		}
		STAT_COUNT(mtcp->runstat.rounds_rx);

#if TIME_STAT
		gettimeofday(&processing_ts, NULL);
		UpdateStatCounter(&mtcp->rtstat.processing,
				  TimeDiffUs(&processing_ts, &cur_ts));
#endif /* TIME_STAT */

		/* handle user-defined timeouts */
		struct timer *walk, *tmp;
		for (walk = TAILQ_FIRST(&mtcp->timer_list); walk != NULL; walk = tmp) {
			tmp = TAILQ_NEXT(walk, timer_link);
			if (TIMEVAL_LT(&cur_ts, &walk->exp))
				break;

			struct mtcp_context mctx = {.cpu = ctx->cpu};
			walk->cb(&mctx, walk->id, 0, 0 /* FIXME */, NULL);
			DelTimer(mtcp, walk);
		}

		/* interaction with application */
		if (mtcp->flow_cnt > 0) {

			/* check retransmission timeout and timewait expire */
#if 0
			thresh = (int)mtcp->flow_cnt / (TS_TO_USEC(PER_STREAM_TCHECK));
			assert(thresh >= 0);
			if (thresh == 0)
				thresh = 1;
			if (recv_cnt > 0 && thresh > recv_cnt)
				thresh = recv_cnt;
#else
			thresh = g_config.mos->max_concurrency;
#endif

			/* Eunyoung, you may fix this later
			 * if there is no rcv packet, we will send as much as possible
			 */
			if (thresh == -1)
				thresh = g_config.mos->max_concurrency;

			CheckRtmTimeout(mtcp, ts, thresh);
			CheckTimewaitExpire(mtcp, ts, thresh);

			if (g_config.mos->tcp_timeout > 0 && ts != ts_prev) {
				CheckConnectionTimeout(mtcp, ts, thresh);
			}

#if TIME_STAT
		}
		gettimeofday(&tcheck_ts, NULL);
		UpdateStatCounter(&mtcp->rtstat.tcheck,
				  TimeDiffUs(&tcheck_ts, &processing_ts));

		if (mtcp->flow_cnt > 0) {
#endif /* TIME_STAT */

		}

		/*
		 * before flushing epoll events, call monitor events for
		 * all registered `read` events
		 */
		if (mtcp->num_msp > 0)
			/* call this only when a standalone monitor is running */
			FlushMonitorReadEvents(mtcp);

		/* if epoll is in use, flush all the queued events */
		if (mtcp->ep) {
			FlushBufferedReadEvents(mtcp);
			FlushEpollEvents(mtcp, ts);
		}
#if TIME_STAT
		gettimeofday(&epoll_ts, NULL);
		UpdateStatCounter(&mtcp->rtstat.epoll,
				  TimeDiffUs(&epoll_ts, &tcheck_ts));
#endif /* TIME_STAT */

		if (end_app_exists && mtcp->flow_cnt > 0) {
			/* handle stream queues */
			HandleApplicationCalls(mtcp, ts);
		}

#if TIME_STAT
		gettimeofday(&handle_ts, NULL);
		UpdateStatCounter(&mtcp->rtstat.handle,
				  TimeDiffUs(&handle_ts, &epoll_ts));
#endif /* TIME_STAT */

		WritePacketsToChunks(mtcp, ts);

		/* send packets from the write buffer as long as tx is available */
		int num_dev = g_config.mos->netdev_table->num;
		if (likely(mtcp->iom->send_pkts != NULL))
			for (tx_inf = 0; tx_inf < num_dev; tx_inf++) {
				mtcp->iom->send_pkts(ctx, tx_inf);
			}

#if TIME_STAT
		gettimeofday(&xmit_ts, NULL);
		UpdateStatCounter(&mtcp->rtstat.xmit,
				  TimeDiffUs(&xmit_ts, &handle_ts));
#endif /* TIME_STAT */

		if (ts != ts_prev) {
			ts_prev = ts;
#ifdef NETSTAT
			if (ctx->cpu == printer) {
#ifdef RUN_ARP
				ARPTimer(mtcp, ts);
#endif
				PrintNetworkStats(mtcp, ts);
			}
#endif /* NETSTAT */
		}

		if (mtcp->iom->select)
			mtcp->iom->select(ctx);

		if (ctx->interrupt) {
			InterruptApplication(mtcp);
		}
	}

#if TESTING
	DestroyRemainingFlows(mtcp);
#endif

	TRACE_DBG("MTCP thread %d out of main loop.\n", ctx->cpu);
	/* flush logs */
	flush_log_data(mtcp);
	TRACE_DBG("MTCP thread %d flushed logs.\n", ctx->cpu);
	InterruptApplication(mtcp);
	TRACE_INFO("MTCP thread %d finished.\n", ctx->cpu);
}
/*----------------------------------------------------------------------------*/
struct mtcp_sender *
CreateMTCPSender(int ifidx)
{
	struct mtcp_sender *sender;

	sender = (struct mtcp_sender *)calloc(1, sizeof(struct mtcp_sender));
	if (!sender) {
		return NULL;
	}

	sender->ifidx = ifidx;

	TAILQ_INIT(&sender->control_list);
	TAILQ_INIT(&sender->send_list);
	TAILQ_INIT(&sender->ack_list);

	sender->control_list_cnt = 0;
	sender->send_list_cnt = 0;
	sender->ack_list_cnt = 0;

	return sender;
}
/*----------------------------------------------------------------------------*/
void
DestroyMTCPSender(struct mtcp_sender *sender)
{
	free(sender);
}
/*----------------------------------------------------------------------------*/
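/*
 * Allocate and wire up all per-thread state: the flow table, memory
 * pools for buffer segments, flows, and send/recv variables, the send
 * ring buffer manager, the socket maps, the per-thread log file, the
 * application call queues, and the global and per-NIC senders.
 * Returns NULL (with partially allocated state) on any failure.
 */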
static mtcp_manager_t
InitializeMTCPManager(struct mtcp_thread_context *ctx)
{
	mtcp_manager_t mtcp;
	char log_name[MAX_FILE_NAME];
	int i;

	posix_seq_srand((unsigned)pthread_self());

	mtcp = (mtcp_manager_t)calloc(1, sizeof(struct mtcp_manager));
	if (!mtcp) {
		perror("calloc");
		TRACE_ERROR("Failed to allocate mtcp_manager.\n");
		return NULL;
	}
	g_mtcp[ctx->cpu] = mtcp;

	mtcp->tcp_flow_table = CreateHashtable();
	if (!mtcp->tcp_flow_table) {
		CTRACE_ERROR("Failed to allocate tcp flow table.\n");
		return NULL;
	}

#ifdef HUGEPAGE
#define IS_HUGEPAGE 1
#else
#define IS_HUGEPAGE 0
#endif
	if (mon_app_exists) {
		/* initialize event callback */
#ifdef NEWEV
		InitEvent(mtcp);
#else
		InitEvent(mtcp, NUM_EV_TABLE);
#endif
	}

	if (!(mtcp->bufseg_pool = MPCreate(sizeof(tcpbufseg_t),
			sizeof(tcpbufseg_t) * g_config.mos->max_concurrency *
			((g_config.mos->rmem_size - 1) / UNITBUFSIZE + 1), 0))) {
		TRACE_ERROR("Failed to allocate bufseg pool\n");
		exit(0);
	}
	if (!(mtcp->sockent_pool = MPCreate(sizeof(struct sockent),
			sizeof(struct sockent) * g_config.mos->max_concurrency * 3, 0))) {
		TRACE_ERROR("Failed to allocate sockent pool\n");
		exit(0);
	}
#ifdef USE_TIMER_POOL
	if (!(mtcp->timer_pool = MPCreate(sizeof(struct timer),
			sizeof(struct timer) * g_config.mos->max_concurrency * 10, 0))) {
		TRACE_ERROR("Failed to allocate timer pool\n");
		exit(0);
	}
#endif
	mtcp->flow_pool = MPCreate(sizeof(tcp_stream),
			sizeof(tcp_stream) * g_config.mos->max_concurrency, IS_HUGEPAGE);
	if (!mtcp->flow_pool) {
		CTRACE_ERROR("Failed to allocate tcp flow pool.\n");
		return NULL;
	}
	mtcp->rv_pool = MPCreate(sizeof(struct tcp_recv_vars),
			sizeof(struct tcp_recv_vars) * g_config.mos->max_concurrency, IS_HUGEPAGE);
	if (!mtcp->rv_pool) {
		CTRACE_ERROR("Failed to allocate tcp recv variable pool.\n");
		return NULL;
	}
	mtcp->sv_pool = MPCreate(sizeof(struct tcp_send_vars),
			sizeof(struct tcp_send_vars) * g_config.mos->max_concurrency, IS_HUGEPAGE);
	if (!mtcp->sv_pool) {
		CTRACE_ERROR("Failed to allocate tcp send variable pool.\n");
		return NULL;
	}

	mtcp->rbm_snd = SBManagerCreate(g_config.mos->wmem_size, g_config.mos->no_ring_buffers,
			g_config.mos->max_concurrency);
	if (!mtcp->rbm_snd) {
		CTRACE_ERROR("Failed to create send ring buffer.\n");
		return NULL;
	}

	mtcp->smap = (socket_map_t)calloc(g_config.mos->max_concurrency, sizeof(struct socket_map));
	if (!mtcp->smap) {
		perror("calloc");
		CTRACE_ERROR("Failed to allocate memory for stream map.\n");
		return NULL;
	}

	if (mon_app_exists) {
		mtcp->msmap = (socket_map_t)calloc(g_config.mos->max_concurrency, sizeof(struct socket_map));
		if (!mtcp->msmap) {
			perror("calloc");
			CTRACE_ERROR("Failed to allocate memory for monitor stream map.\n");
			return NULL;
		}

		for (i = 0; i < g_config.mos->max_concurrency; i++) {
			mtcp->msmap[i].monitor_stream = calloc(1, sizeof(struct mon_stream));
			if (!mtcp->msmap[i].monitor_stream) {
				perror("calloc");
				CTRACE_ERROR("Failed to allocate memory for monitor stream map\n");
				return NULL;
			}
		}
	}

	TAILQ_INIT(&mtcp->timer_list);
	TAILQ_INIT(&mtcp->monitors);

	TAILQ_INIT(&mtcp->free_smap);
	for (i = 0; i < g_config.mos->max_concurrency; i++) {
		mtcp->smap[i].id = i;
		mtcp->smap[i].socktype = MOS_SOCK_UNUSED;
		memset(&mtcp->smap[i].saddr, 0, sizeof(struct sockaddr_in));
		mtcp->smap[i].stream = NULL;
		TAILQ_INSERT_TAIL(&mtcp->free_smap, &mtcp->smap[i], link);
	}

	if (mon_app_exists) {
		TAILQ_INIT(&mtcp->free_msmap);
		for (i = 0; i < g_config.mos->max_concurrency; i++) {
			mtcp->msmap[i].id = i;
			mtcp->msmap[i].socktype = MOS_SOCK_UNUSED;
			memset(&mtcp->msmap[i].saddr, 0, sizeof(struct sockaddr_in));
			TAILQ_INSERT_TAIL(&mtcp->free_msmap, &mtcp->msmap[i], link);
		}
	}

	mtcp->ctx = ctx;
	mtcp->ep = NULL;

	snprintf(log_name, MAX_FILE_NAME, "%s/"LOG_FILE_NAME"_%d",
		 g_config.mos->mos_log, ctx->cpu);
	mtcp->log_fp = fopen(log_name, "w+");
	if (!mtcp->log_fp) {
		perror("fopen");
		CTRACE_ERROR("Failed to create file for logging. (%s)\n", log_name);
		return NULL;
	}
	mtcp->sp_fd = g_logctx[ctx->cpu]->pair_sp_fd;
	mtcp->logger = g_logctx[ctx->cpu];

	mtcp->connectq = CreateStreamQueue(BACKLOG_SIZE);
	if (!mtcp->connectq) {
		CTRACE_ERROR("Failed to create connect queue.\n");
		return NULL;
	}
	mtcp->sendq = CreateStreamQueue(g_config.mos->max_concurrency);
	if (!mtcp->sendq) {
		CTRACE_ERROR("Failed to create send queue.\n");
		return NULL;
	}
	mtcp->ackq = CreateStreamQueue(g_config.mos->max_concurrency);
	if (!mtcp->ackq) {
		CTRACE_ERROR("Failed to create ack queue.\n");
		return NULL;
	}
	mtcp->closeq = CreateStreamQueue(g_config.mos->max_concurrency);
	if (!mtcp->closeq) {
		CTRACE_ERROR("Failed to create close queue.\n");
		return NULL;
	}
	mtcp->closeq_int = CreateInternalStreamQueue(g_config.mos->max_concurrency);
	if (!mtcp->closeq_int) {
		CTRACE_ERROR("Failed to create internal close queue.\n");
		return NULL;
	}
	mtcp->resetq = CreateStreamQueue(g_config.mos->max_concurrency);
	if (!mtcp->resetq) {
		CTRACE_ERROR("Failed to create reset queue.\n");
		return NULL;
	}
	mtcp->resetq_int = CreateInternalStreamQueue(g_config.mos->max_concurrency);
	if (!mtcp->resetq_int) {
		CTRACE_ERROR("Failed to create internal reset queue.\n");
		return NULL;
	}
	mtcp->destroyq = CreateStreamQueue(g_config.mos->max_concurrency);
	if (!mtcp->destroyq) {
		CTRACE_ERROR("Failed to create destroy queue.\n");
		return NULL;
	}

	mtcp->g_sender = CreateMTCPSender(-1);
	if (!mtcp->g_sender) {
		CTRACE_ERROR("Failed to create global sender structure.\n");
		return NULL;
	}
	for (i = 0; i < g_config.mos->netdev_table->num; i++) {
		mtcp->n_sender[i] = CreateMTCPSender(i);
		if (!mtcp->n_sender[i]) {
			CTRACE_ERROR("Failed to create per-nic sender structure.\n");
			return NULL;
		}
	}

	mtcp->rto_store = InitRTOHashstore();
	TAILQ_INIT(&mtcp->timewait_list);
	TAILQ_INIT(&mtcp->timeout_list);

	return mtcp;
}
/*----------------------------------------------------------------------------*/
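/*
 * Entry point of a per-core mtcp thread: pin the thread to its core,
 * set up the manager and the I/O module, publish the context for
 * signal processing, then run the main loop until told to exit.
 */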
static void *
MTCPRunThread(void *arg)
{
	mctx_t mctx = (mctx_t)arg;
	int cpu = mctx->cpu;
	int working;
	struct mtcp_manager *mtcp;
	struct mtcp_thread_context *ctx;

	/* affinitize the thread to this core first */
	mtcp_core_affinitize(cpu);

	/* memory allocated after core affinitization will mostly
	   come from local memory */
	ctx = calloc(1, sizeof(*ctx));
	if (!ctx) {
		perror("calloc");
		TRACE_ERROR("Failed to calloc mtcp context.\n");
		exit(-1);
	}
	ctx->thread = pthread_self();
	ctx->cpu = cpu;
	mtcp = ctx->mtcp_manager = InitializeMTCPManager(ctx);
	if (!mtcp) {
		TRACE_ERROR("Failed to initialize mtcp manager.\n");
		exit(-1);
	}

	/* assign the mtcp context's underlying I/O module */
	mtcp->iom = current_iomodule_func;

	/* I/O initialization */
	if (mtcp->iom->init_handle)
		mtcp->iom->init_handle(ctx);

	if (pthread_mutex_init(&ctx->flow_pool_lock, NULL)) {
		perror("pthread_mutex_init of ctx->flow_pool_lock\n");
		exit(-1);
	}

	if (pthread_mutex_init(&ctx->socket_pool_lock, NULL)) {
		perror("pthread_mutex_init of ctx->socket_pool_lock\n");
		exit(-1);
	}

	SQ_LOCK_INIT(&ctx->connect_lock, "ctx->connect_lock", exit(-1));
	SQ_LOCK_INIT(&ctx->close_lock, "ctx->close_lock", exit(-1));
	SQ_LOCK_INIT(&ctx->reset_lock, "ctx->reset_lock", exit(-1));
	SQ_LOCK_INIT(&ctx->sendq_lock, "ctx->sendq_lock", exit(-1));
	SQ_LOCK_INIT(&ctx->ackq_lock, "ctx->ackq_lock", exit(-1));
	SQ_LOCK_INIT(&ctx->destroyq_lock, "ctx->destroyq_lock", exit(-1));

	/* remember this context pointer for signal processing */
	g_pctx[cpu] = ctx;
	mlockall(MCL_CURRENT);

	/* attach (nic device, queue) */
	working = AttachDevice(ctx);
	if (working != 0) {
		sem_post(&g_init_sem[ctx->cpu]);
		TRACE_DBG("MTCP thread %d finished; no device attached.\n", ctx->cpu);
		pthread_exit(NULL);
	}

	TRACE_DBG("CPU %d: initialization finished.\n", cpu);
	sem_post(&g_init_sem[ctx->cpu]);

	/* start the main loop */
	RunMainLoop(ctx);

	TRACE_DBG("MTCP thread %d finished.\n", ctx->cpu);

	/* signal that the mTCP thread is done */
	sem_post(&g_done_sem[mctx->cpu]);

	//pthread_exit(NULL);
	return 0;
}
/*----------------------------------------------------------------------------*/
#ifdef ENABLE_DPDK
static int
MTCPDPDKRunThread(void *arg)
{
	MTCPRunThread(arg);
	return 0;
}
#endif /* ENABLE_DPDK */
/*----------------------------------------------------------------------------*/
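/*
 * Create the per-core context: spawn the logger thread, then launch the
 * mtcp thread either via pthread_create() or, for DPDK worker lcores,
 * via rte_eal_remote_launch(), and block until that thread finishes its
 * initialization.
 */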
mctx_t
mtcp_create_context(int cpu)
{
	mctx_t mctx;
	int ret;

	if (cpu >= g_config.mos->num_cores) {
		TRACE_ERROR("Failed to initialize new mtcp context. "
			    "Requested cpu id %d exceeds the number of cores (%d) configured to use.\n",
			    cpu, g_config.mos->num_cores);
		return NULL;
	}

	/* check if mtcp_create_context() was already called for this cpu */
	if (g_logctx[cpu] != NULL) {
		TRACE_ERROR("%s was already initialized before!\n",
			    __FUNCTION__);
		return NULL;
	}

	ret = sem_init(&g_init_sem[cpu], 0, 0);
	if (ret) {
		TRACE_ERROR("Failed to initialize init_sem.\n");
		return NULL;
	}

	ret = sem_init(&g_done_sem[cpu], 0, 0);
	if (ret) {
		TRACE_ERROR("Failed to initialize done_sem.\n");
		return NULL;
	}

	mctx = (mctx_t)calloc(1, sizeof(struct mtcp_context));
	if (!mctx) {
		TRACE_ERROR("Failed to allocate memory for mtcp_context.\n");
		return NULL;
	}
	mctx->cpu = cpu;
	g_ctx[cpu] = mctx;

	/* initialize logger */
	g_logctx[cpu] = (struct log_thread_context *)
		calloc(1, sizeof(struct log_thread_context));
	if (!g_logctx[cpu]) {
		perror("calloc");
		TRACE_ERROR("Failed to allocate memory for log thread context.\n");
		return NULL;
	}
	InitLogThreadContext(g_logctx[cpu], cpu);
	if (pthread_create(&log_thread[cpu],
			   NULL, ThreadLogMain, (void *)g_logctx[cpu])) {
		perror("pthread_create");
		TRACE_ERROR("Failed to create log thread\n");
		return NULL;
	}

	/* use rte_eal_remote_launch() for DPDK
	   (worker/slave threads are already initialized by rte_eal_init()) */
#ifdef ENABLE_DPDK
	/* wake up mTCP threads (wake up I/O threads) */
	if (current_iomodule_func == &dpdk_module_func) {
		int master;
		master = rte_get_master_lcore();
		if (master == cpu) {
			lcore_config[master].ret = 0;
			lcore_config[master].state = FINISHED;
			if (pthread_create(&g_thread[cpu],
					   NULL, MTCPRunThread, (void *)mctx) != 0) {
				TRACE_ERROR("pthread_create of mtcp thread failed!\n");
				return NULL;
			}
		} else
			rte_eal_remote_launch(MTCPDPDKRunThread, mctx, cpu);
	} else
#endif /* ENABLE_DPDK */
	{
		if (pthread_create(&g_thread[cpu],
				   NULL, MTCPRunThread, (void *)mctx) != 0) {
			TRACE_ERROR("pthread_create of mtcp thread failed!\n");
			return NULL;
		}
	}

	sem_wait(&g_init_sem[cpu]);
	sem_destroy(&g_init_sem[cpu]);

	running[cpu] = TRUE;

#ifdef NETSTAT
#if NETSTAT_TOTAL
	if (printer < 0) {
		printer = cpu;
		TRACE_INFO("CPU %d is in charge of printing stats.\n", printer);
	}
#endif
#endif

	return mctx;
}
/*----------------------------------------------------------------------------*/
int
mtcp_destroy_context(mctx_t mctx)
{
	struct mtcp_thread_context *ctx = g_pctx[mctx->cpu];
	if (ctx != NULL)
		ctx->done = 1;

	struct mtcp_context m;
	m.cpu = mctx->cpu;
	mtcp_free_context(&m);

	free(mctx);

	return 0;
}
/*----------------------------------------------------------------------------*/
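/*
 * Tear down one core's mtcp thread: close any stream sockets still
 * open, join the mtcp and logger threads, hand off the stats-printer
 * role if this core held it, and release every queue, sender, pool,
 * and lock owned by the context.
 */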
void
mtcp_free_context(mctx_t mctx)
{
	struct mtcp_thread_context *ctx = g_pctx[mctx->cpu];
	struct mtcp_manager *mtcp;
	struct log_thread_context *log_ctx;
	int ret, i;

	TRACE_DBG("CPU %d: mtcp_free_context()\n", mctx->cpu);

	if (ctx == NULL)
		return;

	mtcp = ctx->mtcp_manager;
	log_ctx = mtcp->logger;

	/* close all stream sockets that are still open */
	if (!ctx->exit) {
		for (i = 0; i < g_config.mos->max_concurrency; i++) {
			if (mtcp->smap[i].socktype == MOS_SOCK_STREAM) {
				TRACE_DBG("Closing remaining socket %d (%s)\n",
					  i, TCPStateToString(mtcp->smap[i].stream));
#ifdef DUMP_STREAM
				DumpStream(mtcp, mtcp->smap[i].stream);
#endif
				mtcp_close(mctx, i);
			}
		}
	}

	ctx->done = 1;
	ctx->exit = 1;

#ifdef ENABLE_DPDK
	if (current_iomodule_func == &dpdk_module_func) {
		int master = rte_get_master_lcore();
		if (master == mctx->cpu)
			pthread_join(g_thread[mctx->cpu], NULL);
		else
			rte_eal_wait_lcore(mctx->cpu);
	} else
#endif /* ENABLE_DPDK */
	{
		pthread_join(g_thread[mctx->cpu], NULL);
	}

	TRACE_INFO("MTCP thread %d joined.\n", mctx->cpu);

	running[mctx->cpu] = FALSE;

#ifdef NETSTAT
#if NETSTAT_TOTAL
	if (printer == mctx->cpu) {
		for (i = 0; i < num_cpus; i++) {
			if (i != mctx->cpu && running[i]) {
				printer = i;
				break;
			}
		}
	}
#endif
#endif

	log_ctx->done = 1;
	ret = write(log_ctx->pair_sp_fd, "F", 1);
	if (ret != 1)
		TRACE_ERROR("CPU %d: Failed to signal socket pair\n", mctx->cpu);

	if ((ret = pthread_join(log_thread[ctx->cpu], NULL)) != 0) {
		TRACE_ERROR("pthread_join() returns error (errno = %s)\n", strerror(ret));
		exit(-1);
	}

	fclose(mtcp->log_fp);
	TRACE_LOG("Log thread %d joined.\n", mctx->cpu);

	if (mtcp->connectq) {
		DestroyStreamQueue(mtcp->connectq);
		mtcp->connectq = NULL;
	}
	if (mtcp->sendq) {
		DestroyStreamQueue(mtcp->sendq);
		mtcp->sendq = NULL;
	}
	if (mtcp->ackq) {
		DestroyStreamQueue(mtcp->ackq);
		mtcp->ackq = NULL;
	}
	if (mtcp->closeq) {
		DestroyStreamQueue(mtcp->closeq);
		mtcp->closeq = NULL;
	}
	if (mtcp->closeq_int) {
		DestroyInternalStreamQueue(mtcp->closeq_int);
		mtcp->closeq_int = NULL;
	}
	if (mtcp->resetq) {
		DestroyStreamQueue(mtcp->resetq);
		mtcp->resetq = NULL;
	}
	if (mtcp->resetq_int) {
		DestroyInternalStreamQueue(mtcp->resetq_int);
		mtcp->resetq_int = NULL;
	}
	if (mtcp->destroyq) {
		DestroyStreamQueue(mtcp->destroyq);
		mtcp->destroyq = NULL;
	}

	DestroyMTCPSender(mtcp->g_sender);
	for (i = 0; i < g_config.mos->netdev_table->num; i++) {
		DestroyMTCPSender(mtcp->n_sender[i]);
	}

	MPDestroy(mtcp->rv_pool);
	MPDestroy(mtcp->sv_pool);
	MPDestroy(mtcp->flow_pool);

	if (mtcp->ap) {
		DestroyAddressPool(mtcp->ap);
		mtcp->ap = NULL;
	}

	SQ_LOCK_DESTROY(&ctx->connect_lock);
	SQ_LOCK_DESTROY(&ctx->close_lock);
	SQ_LOCK_DESTROY(&ctx->reset_lock);
	SQ_LOCK_DESTROY(&ctx->sendq_lock);
	SQ_LOCK_DESTROY(&ctx->ackq_lock);
	SQ_LOCK_DESTROY(&ctx->destroyq_lock);

	//TRACE_INFO("MTCP thread %d destroyed.\n", mctx->cpu);
	if (mtcp->iom->destroy_handle)
		mtcp->iom->destroy_handle(ctx);
	if (g_logctx[mctx->cpu]) {
		free(g_logctx[mctx->cpu]);
		g_logctx[mctx->cpu] = NULL;
	}
	free(ctx);
	g_pctx[mctx->cpu] = NULL;
}
/*----------------------------------------------------------------------------*/
mtcp_sighandler_t
mtcp_register_signal(int signum, mtcp_sighandler_t handler)
{
	mtcp_sighandler_t prev;

	if (signum == SIGINT) {
		prev = app_signal_handler;
		app_signal_handler = handler;
	} else {
		if ((prev = signal(signum, handler)) == SIG_ERR) {
			perror("signal");
			return SIG_ERR;
		}
	}

	return prev;
}
/*----------------------------------------------------------------------------*/
int
mtcp_getconf(struct mtcp_conf *conf)
{
	int i, j;

	if (!conf) {
		errno = EINVAL;
		return -1;
	}

	conf->num_cores = g_config.mos->num_cores;
	conf->max_concurrency = g_config.mos->max_concurrency;
	conf->cpu_mask = g_config.mos->cpu_mask;

	conf->rcvbuf_size = g_config.mos->rmem_size;
	conf->sndbuf_size = g_config.mos->wmem_size;

	conf->tcp_timewait = g_config.mos->tcp_tw_interval;
	conf->tcp_timeout = g_config.mos->tcp_timeout;

	i = 0;
	struct conf_block *bwalk;
	TAILQ_FOREACH(bwalk, &g_config.app_blkh, link) {
		struct app_conf *app_conf = (struct app_conf *)bwalk->conf;
		for (j = 0; j < app_conf->app_argc; j++)
			conf->app_argv[i][j] = app_conf->app_argv[j];
		conf->app_argc[i] = app_conf->app_argc;
		conf->app_cpu_mask[i] = app_conf->cpu_mask;
		i++;
	}
	conf->num_app = i;

	return 0;
}
/*----------------------------------------------------------------------------*/
int
mtcp_setconf(const struct mtcp_conf *conf)
{
	if (!conf)
		return -1;

	g_config.mos->num_cores = conf->num_cores;
	g_config.mos->max_concurrency = conf->max_concurrency;

	g_config.mos->rmem_size = conf->rcvbuf_size;
	g_config.mos->wmem_size = conf->sndbuf_size;

	g_config.mos->tcp_tw_interval = conf->tcp_timewait;
	g_config.mos->tcp_timeout = conf->tcp_timeout;

	TRACE_CONFIG("Configuration updated by mtcp_setconf().\n");
	//PrintConfiguration();

	return 0;
}
/*----------------------------------------------------------------------------*/
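/*
 * Global (process-wide) initialization: load the configuration, pick
 * the compiled-in I/O module, build per-interface address pools,
 * initialize the ARP table, and install the default SIGUSR1/SIGINT
 * handlers. Call this before creating any per-core context.
 */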
int
mtcp_init(const char *config_file)
{
	int i;
	int ret;

	if (geteuid()) {
		TRACE_CONFIG("[CAUTION] Run as root if mlock is necessary.\n");
#if defined(ENABLE_DPDK) || defined(ENABLE_NETMAP)
		TRACE_CONFIG("[CAUTION] Run the app as root!\n");
		exit(EXIT_FAILURE);
#endif
	}

	/* getting cpu and NIC */
	num_cpus = GetNumCPUs();
	assert(num_cpus >= 1);
	for (i = 0; i < num_cpus; i++) {
		g_mtcp[i] = NULL;
		running[i] = FALSE;
		sigint_cnt[i] = 0;
	}

	ret = LoadConfigurationUpperHalf(config_file);
	if (ret) {
		TRACE_CONFIG("Error occurred while loading configuration.\n");
		return -1;
	}

#if defined(ENABLE_PSIO)
	current_iomodule_func = &ps_module_func;
#elif defined(ENABLE_DPDK)
	current_iomodule_func = &dpdk_module_func;
#elif defined(ENABLE_PCAP)
	current_iomodule_func = &pcap_module_func;
#elif defined(ENABLE_NETMAP)
	current_iomodule_func = &netmap_module_func;
#endif

	if (current_iomodule_func->load_module_upper_half)
		current_iomodule_func->load_module_upper_half();

	LoadConfigurationLowerHalf();

	//PrintConfiguration();

	for (i = 0; i < g_config.mos->netdev_table->num; i++) {
		ap[i] = CreateAddressPool(g_config.mos->netdev_table->ent[i]->ip_addr, 1);
		if (!ap[i]) {
			TRACE_CONFIG("Error occurred while creating address pool[%d]\n",
				     i);
			return -1;
		}
	}

	//PrintInterfaceInfo();
	//PrintRoutingTable();
	//PrintARPTable();
	InitARPTable();

	if (signal(SIGUSR1, HandleSignal) == SIG_ERR) {
		perror("signal, SIGUSR1");
		return -1;
	}
	if (signal(SIGINT, HandleSignal) == SIG_ERR) {
		perror("signal, SIGINT");
		return -1;
	}
	app_signal_handler = NULL;

	printf("load_module(): %p\n", (void *)current_iomodule_func);
	/* load system-wide io module specs */
	if (current_iomodule_func->load_module_lower_half)
		current_iomodule_func->load_module_lower_half();

	GlobInitEvent();

	PrintConf(&g_config);

	return 0;
}
/*----------------------------------------------------------------------------*/
int
mtcp_destroy()
{
	int i;

	/* wait until all threads are closed */
	/*
	for (i = 0; i < num_cpus; i++) {
		if (running[i]) {
			if (pthread_join(g_thread[i], NULL) != 0)
				return -1;
		}
	}
	*/

	for (i = 0; i < g_config.mos->netdev_table->num; i++)
		DestroyAddressPool(ap[i]);

	TRACE_INFO("All MTCP threads are joined.\n");

	return 0;
}
/*----------------------------------------------------------------------------*/
