1 #include <stdio.h> 2 #include <unistd.h> 3 #include <string.h> 4 #include <stdlib.h> 5 #include <assert.h> 6 #include <errno.h> 7 #include <sys/queue.h> 8 #include <sys/types.h> 9 #include <sys/socket.h> 10 #include <pthread.h> 11 #include "cpu.h" 12 #include "debug.h" 13 #include "logger.h" 14 15 /*----------------------------------------------------------------------------*/ 16 static void 17 EnqueueFreeBuffer(log_thread_context *ctx, log_buff *free_bp) 18 { 19 pthread_mutex_lock(&ctx->free_mutex); 20 TAILQ_INSERT_TAIL(&ctx->free_queue, free_bp, buff_link); 21 ctx->free_buff_cnt++; 22 23 assert(ctx->free_buff_cnt <= NUM_LOG_BUFF); 24 assert(ctx->free_buff_cnt + ctx->job_buff_cnt <= NUM_LOG_BUFF); 25 pthread_mutex_unlock(&ctx->free_mutex); 26 } 27 /*----------------------------------------------------------------------------*/ 28 log_buff* 29 DequeueFreeBuffer(log_thread_context *ctx) 30 { 31 pthread_mutex_lock(&ctx->free_mutex); 32 log_buff *free_bp = TAILQ_FIRST(&ctx->free_queue); 33 if (free_bp) { 34 TAILQ_REMOVE(&ctx->free_queue, free_bp, buff_link); 35 ctx->free_buff_cnt--; 36 } 37 38 assert(ctx->free_buff_cnt >= 0); 39 assert(ctx->free_buff_cnt + ctx->job_buff_cnt <= NUM_LOG_BUFF); 40 pthread_mutex_unlock(&ctx->free_mutex); 41 return (free_bp); 42 } 43 /*----------------------------------------------------------------------------*/ 44 void 45 EnqueueJobBuffer(log_thread_context *ctx, log_buff *working_bp) 46 { 47 TAILQ_INSERT_TAIL(&ctx->working_queue, working_bp, buff_link); 48 ctx->job_buff_cnt++; 49 ctx->state = ACTIVE_LOGT; 50 assert(ctx->job_buff_cnt <= NUM_LOG_BUFF); 51 if (ctx->free_buff_cnt + ctx->job_buff_cnt > NUM_LOG_BUFF) { 52 TRACE_ERROR("free_buff_cnt(%d) + job_buff_cnt(%d) > NUM_LOG_BUFF(%d)\n", 53 ctx->free_buff_cnt, ctx->job_buff_cnt, NUM_LOG_BUFF); 54 } 55 assert(ctx->free_buff_cnt + ctx->job_buff_cnt <= NUM_LOG_BUFF); 56 } 57 /*----------------------------------------------------------------------------*/ 58 static log_buff* 59 DequeueJobBuffer(log_thread_context *ctx) 60 { 61 pthread_mutex_lock(&ctx->mutex); 62 log_buff *working_bp = TAILQ_FIRST(&ctx->working_queue); 63 if (working_bp) { 64 TAILQ_REMOVE(&ctx->working_queue, working_bp, buff_link); 65 ctx->job_buff_cnt--; 66 } else { 67 ctx->state = IDLE_LOGT; 68 } 69 70 assert(ctx->job_buff_cnt >= 0); 71 assert(ctx->free_buff_cnt + ctx->job_buff_cnt <= NUM_LOG_BUFF); 72 pthread_mutex_unlock(&ctx->mutex); 73 return (working_bp); 74 } 75 /*----------------------------------------------------------------------------*/ 76 void 77 InitLogThreadContext(struct log_thread_context *ctx, int cpu) 78 { 79 int i; 80 int sv[2]; 81 82 /* initialize log_thread_context */ 83 memset(ctx, 0, sizeof(struct log_thread_context)); 84 ctx->cpu = cpu; 85 ctx->state = IDLE_LOGT; 86 ctx->done = 0; 87 88 if (pipe(sv)) { 89 fprintf(stderr, "pipe() failed, errno=%d, errstr=%s\n", 90 errno, strerror(errno)); 91 exit(1); 92 } 93 ctx->sp_fd = sv[0]; 94 ctx->pair_sp_fd = sv[1]; 95 96 pthread_mutex_init(&ctx->mutex, NULL); 97 pthread_mutex_init(&ctx->free_mutex, NULL); 98 99 TAILQ_INIT(&ctx->working_queue); 100 TAILQ_INIT(&ctx->free_queue); 101 102 /* initialize free log_buff */ 103 log_buff *w_buff = malloc(sizeof(log_buff) * NUM_LOG_BUFF); 104 assert(w_buff); 105 for (i = 0; i < NUM_LOG_BUFF; i++) { 106 EnqueueFreeBuffer(ctx, &w_buff[i]); 107 } 108 } 109 /*----------------------------------------------------------------------------*/ 110 void * 111 ThreadLogMain(void* arg) 112 { 113 size_t len; 114 log_thread_context* ctx = (log_thread_context *) arg; 115 log_buff* w_buff; 116 int cnt; 117 118 mtcp_core_affinitize(ctx->cpu); 119 //fprintf(stderr, "[CPU %d] Log thread created. thread: %lu\n", 120 // ctx->cpu, pthread_self()); 121 122 TRACE_LOG("Log thread %d is starting.\n", ctx->cpu); 123 124 while (!ctx->done) { 125 /* handle every jobs in job buffer*/ 126 cnt = 0; 127 while ((w_buff = DequeueJobBuffer(ctx))){ 128 if (++cnt > NUM_LOG_BUFF) { 129 TRACE_ERROR("CPU %d: Exceed NUM_LOG_BUFF %d.\n", 130 ctx->cpu, cnt); 131 break; 132 } 133 len = fwrite(w_buff->buff, 1, w_buff->buff_len, w_buff->fid); 134 if (len != w_buff->buff_len) { 135 TRACE_ERROR("CPU %d: Tried to write %d, but only write %ld\n", 136 ctx->cpu, w_buff->buff_len, len); 137 } 138 //assert(len == w_buff->buff_len); 139 EnqueueFreeBuffer(ctx, w_buff); 140 } 141 142 /* */ 143 while (ctx->state == IDLE_LOGT && !ctx->done) { 144 char temp[1]; 145 int ret = read(ctx->sp_fd, temp, 1); 146 if (ret) 147 break; 148 } 149 } 150 151 TRACE_LOG("Log thread %d out of first loop.\n", ctx->cpu); 152 /* handle every jobs in job buffer*/ 153 cnt = 0; 154 while ((w_buff = DequeueJobBuffer(ctx))){ 155 if (++cnt > NUM_LOG_BUFF) { 156 TRACE_ERROR("CPU %d: " 157 "Exceed NUM_LOG_BUFF %d in final loop.\n", ctx->cpu, cnt); 158 break; 159 } 160 len = fwrite(w_buff->buff, 1, w_buff->buff_len, w_buff->fid); 161 assert(len == w_buff->buff_len); 162 EnqueueFreeBuffer(ctx, w_buff); 163 } 164 165 TRACE_LOG("Log thread %d finished.\n", ctx->cpu); 166 pthread_exit(NULL); 167 168 return NULL; 169 } 170 /*----------------------------------------------------------------------------*/ 171