xref: /mOS-networking-stack/core/src/logger.c (revision d270d183)
1 #include <stdio.h>
2 #include <unistd.h>
3 #include <string.h>
4 #include <stdlib.h>
5 #include <assert.h>
6 #include <errno.h>
7 #include <sys/queue.h>
8 #include <sys/types.h>
9 #include <sys/socket.h>
10 #include <pthread.h>
11 #include "cpu.h"
12 #include "debug.h"
13 #include "logger.h"
14 
15 /*----------------------------------------------------------------------------*/
16 static void
17 EnqueueFreeBuffer(log_thread_context *ctx, log_buff *free_bp)
18 {
19 	pthread_mutex_lock(&ctx->free_mutex);
20 	TAILQ_INSERT_TAIL(&ctx->free_queue, free_bp, buff_link);
21 	ctx->free_buff_cnt++;
22 
23 	assert(ctx->free_buff_cnt <= NUM_LOG_BUFF);
24 	assert(ctx->free_buff_cnt + ctx->job_buff_cnt <= NUM_LOG_BUFF);
25 	pthread_mutex_unlock(&ctx->free_mutex);
26 }
27 /*----------------------------------------------------------------------------*/
28 log_buff*
29 DequeueFreeBuffer(log_thread_context *ctx)
30 {
31 	pthread_mutex_lock(&ctx->free_mutex);
32 	log_buff *free_bp = TAILQ_FIRST(&ctx->free_queue);
33 	if (free_bp) {
34 		TAILQ_REMOVE(&ctx->free_queue, free_bp, buff_link);
35 		ctx->free_buff_cnt--;
36 	}
37 
38 	assert(ctx->free_buff_cnt >= 0);
39 	assert(ctx->free_buff_cnt + ctx->job_buff_cnt <= NUM_LOG_BUFF);
40 	pthread_mutex_unlock(&ctx->free_mutex);
41 	return (free_bp);
42 }
43 /*----------------------------------------------------------------------------*/
44 void
45 EnqueueJobBuffer(log_thread_context *ctx, log_buff *working_bp)
46 {
47 	TAILQ_INSERT_TAIL(&ctx->working_queue, working_bp, buff_link);
48 	ctx->job_buff_cnt++;
49 	ctx->state = ACTIVE_LOGT;
50 	assert(ctx->job_buff_cnt <= NUM_LOG_BUFF);
51 	if (ctx->free_buff_cnt + ctx->job_buff_cnt > NUM_LOG_BUFF) {
52 		TRACE_ERROR("free_buff_cnt(%d) + job_buff_cnt(%d) > NUM_LOG_BUFF(%d)\n",
53 				ctx->free_buff_cnt, ctx->job_buff_cnt, NUM_LOG_BUFF);
54 	}
55 	assert(ctx->free_buff_cnt + ctx->job_buff_cnt <= NUM_LOG_BUFF);
56 }
57 /*----------------------------------------------------------------------------*/
58 static log_buff*
59 DequeueJobBuffer(log_thread_context *ctx)
60 {
61 	pthread_mutex_lock(&ctx->mutex);
62 	log_buff *working_bp = TAILQ_FIRST(&ctx->working_queue);
63 	if (working_bp) {
64 		TAILQ_REMOVE(&ctx->working_queue, working_bp, buff_link);
65 		ctx->job_buff_cnt--;
66 	} else {
67 		ctx->state = IDLE_LOGT;
68 	}
69 
70 	assert(ctx->job_buff_cnt >= 0);
71 	assert(ctx->free_buff_cnt + ctx->job_buff_cnt <= NUM_LOG_BUFF);
72 	pthread_mutex_unlock(&ctx->mutex);
73 	return (working_bp);
74 }
75 /*----------------------------------------------------------------------------*/
76 void
77 InitLogThreadContext(struct log_thread_context *ctx, int cpu)
78 {
79 	int i;
80 	int sv[2];
81 
82 	/* initialize log_thread_context */
83 	memset(ctx, 0, sizeof(struct log_thread_context));
84 	ctx->cpu = cpu;
85 	ctx->state = IDLE_LOGT;
86 	ctx->done = 0;
87 
88 	if (pipe(sv)) {
89 		fprintf(stderr, "pipe() failed, errno=%d, errstr=%s\n",
90 				errno, strerror(errno));
91 		exit(1);
92 	}
93 	ctx->sp_fd = sv[0];
94 	ctx->pair_sp_fd = sv[1];
95 
96 	pthread_mutex_init(&ctx->mutex, NULL);
97 	pthread_mutex_init(&ctx->free_mutex, NULL);
98 
99 	TAILQ_INIT(&ctx->working_queue);
100 	TAILQ_INIT(&ctx->free_queue);
101 
102 	/* initialize free log_buff */
103 	log_buff *w_buff = malloc(sizeof(log_buff) * NUM_LOG_BUFF);
104 	assert(w_buff);
105 	for (i = 0; i < NUM_LOG_BUFF; i++) {
106 		EnqueueFreeBuffer(ctx, &w_buff[i]);
107 	}
108 }
109 /*----------------------------------------------------------------------------*/
110 void *
111 ThreadLogMain(void* arg)
112 {
113 	size_t len;
114 	log_thread_context* ctx = (log_thread_context *) arg;
115 	log_buff* w_buff;
116 	int cnt;
117 
118 	mtcp_core_affinitize(ctx->cpu);
119 	//fprintf(stderr, "[CPU %d] Log thread created. thread: %lu\n",
120 	//		ctx->cpu, pthread_self());
121 
122 	TRACE_LOG("Log thread %d is starting.\n", ctx->cpu);
123 
124 	while (!ctx->done) {
125 		/* handle every jobs in job buffer*/
126 		cnt = 0;
127 		while ((w_buff = DequeueJobBuffer(ctx))){
128 			if (++cnt > NUM_LOG_BUFF) {
129 				TRACE_ERROR("CPU %d: Exceed NUM_LOG_BUFF %d.\n",
130 						ctx->cpu, cnt);
131 				break;
132 			}
133 			len = fwrite(w_buff->buff, 1, w_buff->buff_len, w_buff->fid);
134 			if (len != w_buff->buff_len) {
135 				TRACE_ERROR("CPU %d: Tried to write %d, but only write %ld\n",
136 						ctx->cpu, w_buff->buff_len, len);
137 			}
138 			//assert(len == w_buff->buff_len);
139 			EnqueueFreeBuffer(ctx, w_buff);
140 		}
141 
142 		/* */
143 		while (ctx->state == IDLE_LOGT && !ctx->done) {
144 			char temp[1];
145 			int ret = read(ctx->sp_fd, temp, 1);
146 			if (ret)
147 				break;
148 		}
149 	}
150 
151 	TRACE_LOG("Log thread %d out of first loop.\n", ctx->cpu);
152 	/* handle every jobs in job buffer*/
153 	cnt = 0;
154 	while ((w_buff = DequeueJobBuffer(ctx))){
155 		if (++cnt > NUM_LOG_BUFF) {
156 			TRACE_ERROR("CPU %d: "
157 					"Exceed NUM_LOG_BUFF %d in final loop.\n", ctx->cpu, cnt);
158 			break;
159 		}
160 		len = fwrite(w_buff->buff, 1, w_buff->buff_len, w_buff->fid);
161 		assert(len == w_buff->buff_len);
162 		EnqueueFreeBuffer(ctx, w_buff);
163 	}
164 
165 	TRACE_LOG("Log thread %d finished.\n", ctx->cpu);
166 	pthread_exit(NULL);
167 
168 	return NULL;
169 }
170 /*----------------------------------------------------------------------------*/
171