xref: /mOS-networking-stack/core/src/logger.c (revision 76404edc)
1*76404edcSAsim Jamshed #include <stdio.h>
2*76404edcSAsim Jamshed #include <unistd.h>
3*76404edcSAsim Jamshed #include <string.h>
4*76404edcSAsim Jamshed #include <stdlib.h>
5*76404edcSAsim Jamshed #include <assert.h>
6*76404edcSAsim Jamshed #include <errno.h>
7*76404edcSAsim Jamshed #include <sys/queue.h>
8*76404edcSAsim Jamshed #include <sys/types.h>
9*76404edcSAsim Jamshed #include <sys/socket.h>
10*76404edcSAsim Jamshed #include <pthread.h>
11*76404edcSAsim Jamshed #include "cpu.h"
12*76404edcSAsim Jamshed #include "debug.h"
13*76404edcSAsim Jamshed #include "logger.h"
14*76404edcSAsim Jamshed 
15*76404edcSAsim Jamshed /*----------------------------------------------------------------------------*/
16*76404edcSAsim Jamshed static void
EnqueueFreeBuffer(log_thread_context * ctx,log_buff * free_bp)17*76404edcSAsim Jamshed EnqueueFreeBuffer(log_thread_context *ctx, log_buff *free_bp)
18*76404edcSAsim Jamshed {
19*76404edcSAsim Jamshed 	pthread_mutex_lock(&ctx->free_mutex);
20*76404edcSAsim Jamshed 	TAILQ_INSERT_TAIL(&ctx->free_queue, free_bp, buff_link);
21*76404edcSAsim Jamshed 	ctx->free_buff_cnt++;
22*76404edcSAsim Jamshed 
23*76404edcSAsim Jamshed 	assert(ctx->free_buff_cnt <= NUM_LOG_BUFF);
24*76404edcSAsim Jamshed 	assert(ctx->free_buff_cnt + ctx->job_buff_cnt <= NUM_LOG_BUFF);
25*76404edcSAsim Jamshed 	pthread_mutex_unlock(&ctx->free_mutex);
26*76404edcSAsim Jamshed }
27*76404edcSAsim Jamshed /*----------------------------------------------------------------------------*/
28*76404edcSAsim Jamshed log_buff*
DequeueFreeBuffer(log_thread_context * ctx)29*76404edcSAsim Jamshed DequeueFreeBuffer(log_thread_context *ctx)
30*76404edcSAsim Jamshed {
31*76404edcSAsim Jamshed 	pthread_mutex_lock(&ctx->free_mutex);
32*76404edcSAsim Jamshed 	log_buff *free_bp = TAILQ_FIRST(&ctx->free_queue);
33*76404edcSAsim Jamshed 	if (free_bp) {
34*76404edcSAsim Jamshed 		TAILQ_REMOVE(&ctx->free_queue, free_bp, buff_link);
35*76404edcSAsim Jamshed 		ctx->free_buff_cnt--;
36*76404edcSAsim Jamshed 	}
37*76404edcSAsim Jamshed 
38*76404edcSAsim Jamshed 	assert(ctx->free_buff_cnt >= 0);
39*76404edcSAsim Jamshed 	assert(ctx->free_buff_cnt + ctx->job_buff_cnt <= NUM_LOG_BUFF);
40*76404edcSAsim Jamshed 	pthread_mutex_unlock(&ctx->free_mutex);
41*76404edcSAsim Jamshed 	return (free_bp);
42*76404edcSAsim Jamshed }
43*76404edcSAsim Jamshed /*----------------------------------------------------------------------------*/
44*76404edcSAsim Jamshed void
EnqueueJobBuffer(log_thread_context * ctx,log_buff * working_bp)45*76404edcSAsim Jamshed EnqueueJobBuffer(log_thread_context *ctx, log_buff *working_bp)
46*76404edcSAsim Jamshed {
47*76404edcSAsim Jamshed 	TAILQ_INSERT_TAIL(&ctx->working_queue, working_bp, buff_link);
48*76404edcSAsim Jamshed 	ctx->job_buff_cnt++;
49*76404edcSAsim Jamshed 	ctx->state = ACTIVE_LOGT;
50*76404edcSAsim Jamshed 	assert(ctx->job_buff_cnt <= NUM_LOG_BUFF);
51*76404edcSAsim Jamshed 	if (ctx->free_buff_cnt + ctx->job_buff_cnt > NUM_LOG_BUFF) {
52*76404edcSAsim Jamshed 		TRACE_ERROR("free_buff_cnt(%d) + job_buff_cnt(%d) > NUM_LOG_BUFF(%d)\n",
53*76404edcSAsim Jamshed 				ctx->free_buff_cnt, ctx->job_buff_cnt, NUM_LOG_BUFF);
54*76404edcSAsim Jamshed 	}
55*76404edcSAsim Jamshed 	assert(ctx->free_buff_cnt + ctx->job_buff_cnt <= NUM_LOG_BUFF);
56*76404edcSAsim Jamshed }
57*76404edcSAsim Jamshed /*----------------------------------------------------------------------------*/
58*76404edcSAsim Jamshed static log_buff*
DequeueJobBuffer(log_thread_context * ctx)59*76404edcSAsim Jamshed DequeueJobBuffer(log_thread_context *ctx)
60*76404edcSAsim Jamshed {
61*76404edcSAsim Jamshed 	pthread_mutex_lock(&ctx->mutex);
62*76404edcSAsim Jamshed 	log_buff *working_bp = TAILQ_FIRST(&ctx->working_queue);
63*76404edcSAsim Jamshed 	if (working_bp) {
64*76404edcSAsim Jamshed 		TAILQ_REMOVE(&ctx->working_queue, working_bp, buff_link);
65*76404edcSAsim Jamshed 		ctx->job_buff_cnt--;
66*76404edcSAsim Jamshed 	} else {
67*76404edcSAsim Jamshed 		ctx->state = IDLE_LOGT;
68*76404edcSAsim Jamshed 	}
69*76404edcSAsim Jamshed 
70*76404edcSAsim Jamshed 	assert(ctx->job_buff_cnt >= 0);
71*76404edcSAsim Jamshed 	assert(ctx->free_buff_cnt + ctx->job_buff_cnt <= NUM_LOG_BUFF);
72*76404edcSAsim Jamshed 	pthread_mutex_unlock(&ctx->mutex);
73*76404edcSAsim Jamshed 	return (working_bp);
74*76404edcSAsim Jamshed }
75*76404edcSAsim Jamshed /*----------------------------------------------------------------------------*/
76*76404edcSAsim Jamshed void
InitLogThreadContext(struct log_thread_context * ctx,int cpu)77*76404edcSAsim Jamshed InitLogThreadContext(struct log_thread_context *ctx, int cpu)
78*76404edcSAsim Jamshed {
79*76404edcSAsim Jamshed 	int i;
80*76404edcSAsim Jamshed 	int sv[2];
81*76404edcSAsim Jamshed 
82*76404edcSAsim Jamshed 	/* initialize log_thread_context */
83*76404edcSAsim Jamshed 	memset(ctx, 0, sizeof(struct log_thread_context));
84*76404edcSAsim Jamshed 	ctx->cpu = cpu;
85*76404edcSAsim Jamshed 	ctx->state = IDLE_LOGT;
86*76404edcSAsim Jamshed 	ctx->done = 0;
87*76404edcSAsim Jamshed 
88*76404edcSAsim Jamshed 	if (pipe(sv)) {
89*76404edcSAsim Jamshed 		fprintf(stderr, "pipe() failed, errno=%d, errstr=%s\n",
90*76404edcSAsim Jamshed 				errno, strerror(errno));
91*76404edcSAsim Jamshed 		exit(1);
92*76404edcSAsim Jamshed 	}
93*76404edcSAsim Jamshed 	ctx->sp_fd = sv[0];
94*76404edcSAsim Jamshed 	ctx->pair_sp_fd = sv[1];
95*76404edcSAsim Jamshed 
96*76404edcSAsim Jamshed 	pthread_mutex_init(&ctx->mutex, NULL);
97*76404edcSAsim Jamshed 	pthread_mutex_init(&ctx->free_mutex, NULL);
98*76404edcSAsim Jamshed 
99*76404edcSAsim Jamshed 	TAILQ_INIT(&ctx->working_queue);
100*76404edcSAsim Jamshed 	TAILQ_INIT(&ctx->free_queue);
101*76404edcSAsim Jamshed 
102*76404edcSAsim Jamshed 	/* initialize free log_buff */
103*76404edcSAsim Jamshed 	log_buff *w_buff = malloc(sizeof(log_buff) * NUM_LOG_BUFF);
104*76404edcSAsim Jamshed 	assert(w_buff);
105*76404edcSAsim Jamshed 	for (i = 0; i < NUM_LOG_BUFF; i++) {
106*76404edcSAsim Jamshed 		EnqueueFreeBuffer(ctx, &w_buff[i]);
107*76404edcSAsim Jamshed 	}
108*76404edcSAsim Jamshed }
109*76404edcSAsim Jamshed /*----------------------------------------------------------------------------*/
110*76404edcSAsim Jamshed void *
ThreadLogMain(void * arg)111*76404edcSAsim Jamshed ThreadLogMain(void* arg)
112*76404edcSAsim Jamshed {
113*76404edcSAsim Jamshed 	size_t len;
114*76404edcSAsim Jamshed 	log_thread_context* ctx = (log_thread_context *) arg;
115*76404edcSAsim Jamshed 	log_buff* w_buff;
116*76404edcSAsim Jamshed 	int cnt;
117*76404edcSAsim Jamshed 
118*76404edcSAsim Jamshed 	mtcp_core_affinitize(ctx->cpu);
119*76404edcSAsim Jamshed 	//fprintf(stderr, "[CPU %d] Log thread created. thread: %lu\n",
120*76404edcSAsim Jamshed 	//		ctx->cpu, pthread_self());
121*76404edcSAsim Jamshed 
122*76404edcSAsim Jamshed 	TRACE_LOG("Log thread %d is starting.\n", ctx->cpu);
123*76404edcSAsim Jamshed 
124*76404edcSAsim Jamshed 	while (!ctx->done) {
125*76404edcSAsim Jamshed 		/* handle every jobs in job buffer*/
126*76404edcSAsim Jamshed 		cnt = 0;
127*76404edcSAsim Jamshed 		while ((w_buff = DequeueJobBuffer(ctx))){
128*76404edcSAsim Jamshed 			if (++cnt > NUM_LOG_BUFF) {
129*76404edcSAsim Jamshed 				TRACE_ERROR("CPU %d: Exceed NUM_LOG_BUFF %d.\n",
130*76404edcSAsim Jamshed 						ctx->cpu, cnt);
131*76404edcSAsim Jamshed 				break;
132*76404edcSAsim Jamshed 			}
133*76404edcSAsim Jamshed 			len = fwrite(w_buff->buff, 1, w_buff->buff_len, w_buff->fid);
134*76404edcSAsim Jamshed 			if (len != w_buff->buff_len) {
135*76404edcSAsim Jamshed 				TRACE_ERROR("CPU %d: Tried to write %d, but only write %ld\n",
136*76404edcSAsim Jamshed 						ctx->cpu, w_buff->buff_len, len);
137*76404edcSAsim Jamshed 			}
138*76404edcSAsim Jamshed 			//assert(len == w_buff->buff_len);
139*76404edcSAsim Jamshed 			EnqueueFreeBuffer(ctx, w_buff);
140*76404edcSAsim Jamshed 		}
141*76404edcSAsim Jamshed 
142*76404edcSAsim Jamshed 		/* */
143*76404edcSAsim Jamshed 		while (ctx->state == IDLE_LOGT && !ctx->done) {
144*76404edcSAsim Jamshed 			char temp[1];
145*76404edcSAsim Jamshed 			int ret = read(ctx->sp_fd, temp, 1);
146*76404edcSAsim Jamshed 			if (ret)
147*76404edcSAsim Jamshed 				break;
148*76404edcSAsim Jamshed 		}
149*76404edcSAsim Jamshed 	}
150*76404edcSAsim Jamshed 
151*76404edcSAsim Jamshed 	TRACE_LOG("Log thread %d out of first loop.\n", ctx->cpu);
152*76404edcSAsim Jamshed 	/* handle every jobs in job buffer*/
153*76404edcSAsim Jamshed 	cnt = 0;
154*76404edcSAsim Jamshed 	while ((w_buff = DequeueJobBuffer(ctx))){
155*76404edcSAsim Jamshed 		if (++cnt > NUM_LOG_BUFF) {
156*76404edcSAsim Jamshed 			TRACE_ERROR("CPU %d: "
157*76404edcSAsim Jamshed 					"Exceed NUM_LOG_BUFF %d in final loop.\n", ctx->cpu, cnt);
158*76404edcSAsim Jamshed 			break;
159*76404edcSAsim Jamshed 		}
160*76404edcSAsim Jamshed 		len = fwrite(w_buff->buff, 1, w_buff->buff_len, w_buff->fid);
161*76404edcSAsim Jamshed 		assert(len == w_buff->buff_len);
162*76404edcSAsim Jamshed 		EnqueueFreeBuffer(ctx, w_buff);
163*76404edcSAsim Jamshed 	}
164*76404edcSAsim Jamshed 
165*76404edcSAsim Jamshed 	TRACE_LOG("Log thread %d finished.\n", ctx->cpu);
166*76404edcSAsim Jamshed 	pthread_exit(NULL);
167*76404edcSAsim Jamshed 
168*76404edcSAsim Jamshed 	return NULL;
169*76404edcSAsim Jamshed }
170*76404edcSAsim Jamshed /*----------------------------------------------------------------------------*/
171