1 #include <stdio.h>
2 #include <unistd.h>
3 #include <string.h>
4 #include <stdlib.h>
5 #include <assert.h>
6 #include <errno.h>
7 #include <sys/queue.h>
8 #include <sys/types.h>
9 #include <sys/socket.h>
10 #include <pthread.h>
11 #include "cpu.h"
12 #include "debug.h"
13 #include "logger.h"
14
15 /*----------------------------------------------------------------------------*/
16 static void
EnqueueFreeBuffer(log_thread_context * ctx,log_buff * free_bp)17 EnqueueFreeBuffer(log_thread_context *ctx, log_buff *free_bp)
18 {
19 pthread_mutex_lock(&ctx->free_mutex);
20 TAILQ_INSERT_TAIL(&ctx->free_queue, free_bp, buff_link);
21 ctx->free_buff_cnt++;
22
23 assert(ctx->free_buff_cnt <= NUM_LOG_BUFF);
24 assert(ctx->free_buff_cnt + ctx->job_buff_cnt <= NUM_LOG_BUFF);
25 pthread_mutex_unlock(&ctx->free_mutex);
26 }
27 /*----------------------------------------------------------------------------*/
28 log_buff*
DequeueFreeBuffer(log_thread_context * ctx)29 DequeueFreeBuffer(log_thread_context *ctx)
30 {
31 pthread_mutex_lock(&ctx->free_mutex);
32 log_buff *free_bp = TAILQ_FIRST(&ctx->free_queue);
33 if (free_bp) {
34 TAILQ_REMOVE(&ctx->free_queue, free_bp, buff_link);
35 ctx->free_buff_cnt--;
36 }
37
38 assert(ctx->free_buff_cnt >= 0);
39 assert(ctx->free_buff_cnt + ctx->job_buff_cnt <= NUM_LOG_BUFF);
40 pthread_mutex_unlock(&ctx->free_mutex);
41 return (free_bp);
42 }
43 /*----------------------------------------------------------------------------*/
44 void
EnqueueJobBuffer(log_thread_context * ctx,log_buff * working_bp)45 EnqueueJobBuffer(log_thread_context *ctx, log_buff *working_bp)
46 {
47 TAILQ_INSERT_TAIL(&ctx->working_queue, working_bp, buff_link);
48 ctx->job_buff_cnt++;
49 ctx->state = ACTIVE_LOGT;
50 assert(ctx->job_buff_cnt <= NUM_LOG_BUFF);
51 if (ctx->free_buff_cnt + ctx->job_buff_cnt > NUM_LOG_BUFF) {
52 TRACE_ERROR("free_buff_cnt(%d) + job_buff_cnt(%d) > NUM_LOG_BUFF(%d)\n",
53 ctx->free_buff_cnt, ctx->job_buff_cnt, NUM_LOG_BUFF);
54 }
55 assert(ctx->free_buff_cnt + ctx->job_buff_cnt <= NUM_LOG_BUFF);
56 }
57 /*----------------------------------------------------------------------------*/
58 static log_buff*
DequeueJobBuffer(log_thread_context * ctx)59 DequeueJobBuffer(log_thread_context *ctx)
60 {
61 pthread_mutex_lock(&ctx->mutex);
62 log_buff *working_bp = TAILQ_FIRST(&ctx->working_queue);
63 if (working_bp) {
64 TAILQ_REMOVE(&ctx->working_queue, working_bp, buff_link);
65 ctx->job_buff_cnt--;
66 } else {
67 ctx->state = IDLE_LOGT;
68 }
69
70 assert(ctx->job_buff_cnt >= 0);
71 assert(ctx->free_buff_cnt + ctx->job_buff_cnt <= NUM_LOG_BUFF);
72 pthread_mutex_unlock(&ctx->mutex);
73 return (working_bp);
74 }
75 /*----------------------------------------------------------------------------*/
76 void
InitLogThreadContext(struct log_thread_context * ctx,int cpu)77 InitLogThreadContext(struct log_thread_context *ctx, int cpu)
78 {
79 int i;
80 int sv[2];
81
82 /* initialize log_thread_context */
83 memset(ctx, 0, sizeof(struct log_thread_context));
84 ctx->cpu = cpu;
85 ctx->state = IDLE_LOGT;
86 ctx->done = 0;
87
88 if (pipe(sv)) {
89 fprintf(stderr, "pipe() failed, errno=%d, errstr=%s\n",
90 errno, strerror(errno));
91 exit(1);
92 }
93 ctx->sp_fd = sv[0];
94 ctx->pair_sp_fd = sv[1];
95
96 pthread_mutex_init(&ctx->mutex, NULL);
97 pthread_mutex_init(&ctx->free_mutex, NULL);
98
99 TAILQ_INIT(&ctx->working_queue);
100 TAILQ_INIT(&ctx->free_queue);
101
102 /* initialize free log_buff */
103 log_buff *w_buff = malloc(sizeof(log_buff) * NUM_LOG_BUFF);
104 assert(w_buff);
105 for (i = 0; i < NUM_LOG_BUFF; i++) {
106 EnqueueFreeBuffer(ctx, &w_buff[i]);
107 }
108 }
109 /*----------------------------------------------------------------------------*/
110 void *
ThreadLogMain(void * arg)111 ThreadLogMain(void* arg)
112 {
113 size_t len;
114 log_thread_context* ctx = (log_thread_context *) arg;
115 log_buff* w_buff;
116 int cnt;
117
118 mtcp_core_affinitize(ctx->cpu);
119 //fprintf(stderr, "[CPU %d] Log thread created. thread: %lu\n",
120 // ctx->cpu, pthread_self());
121
122 TRACE_LOG("Log thread %d is starting.\n", ctx->cpu);
123
124 while (!ctx->done) {
125 /* handle every jobs in job buffer*/
126 cnt = 0;
127 while ((w_buff = DequeueJobBuffer(ctx))){
128 if (++cnt > NUM_LOG_BUFF) {
129 TRACE_ERROR("CPU %d: Exceed NUM_LOG_BUFF %d.\n",
130 ctx->cpu, cnt);
131 break;
132 }
133 len = fwrite(w_buff->buff, 1, w_buff->buff_len, w_buff->fid);
134 if (len != w_buff->buff_len) {
135 TRACE_ERROR("CPU %d: Tried to write %d, but only write %ld\n",
136 ctx->cpu, w_buff->buff_len, len);
137 }
138 //assert(len == w_buff->buff_len);
139 EnqueueFreeBuffer(ctx, w_buff);
140 }
141
142 /* */
143 while (ctx->state == IDLE_LOGT && !ctx->done) {
144 char temp[1];
145 int ret = read(ctx->sp_fd, temp, 1);
146 if (ret)
147 break;
148 }
149 }
150
151 TRACE_LOG("Log thread %d out of first loop.\n", ctx->cpu);
152 /* handle every jobs in job buffer*/
153 cnt = 0;
154 while ((w_buff = DequeueJobBuffer(ctx))){
155 if (++cnt > NUM_LOG_BUFF) {
156 TRACE_ERROR("CPU %d: "
157 "Exceed NUM_LOG_BUFF %d in final loop.\n", ctx->cpu, cnt);
158 break;
159 }
160 len = fwrite(w_buff->buff, 1, w_buff->buff_len, w_buff->fid);
161 assert(len == w_buff->buff_len);
162 EnqueueFreeBuffer(ctx, w_buff);
163 }
164
165 TRACE_LOG("Log thread %d finished.\n", ctx->cpu);
166 pthread_exit(NULL);
167
168 return NULL;
169 }
170 /*----------------------------------------------------------------------------*/
171