1 #include <pthread.h> 2 #include <errno.h> 3 #include <string.h> 4 5 #include "pipe.h" 6 #include "eventpoll.h" 7 #include "tcp_stream.h" 8 #include "mtcp.h" 9 #include "debug.h" 10 11 #define PIPE_BUF_SIZE 10240 12 13 #define MAX(a, b) ((a)>(b)?(a):(b)) 14 #define MIN(a, b) ((a)<(b)?(a):(b)) 15 16 /*---------------------------------------------------------------------------*/ 17 enum pipe_state 18 { 19 PIPE_CLOSED, 20 PIPE_ACTIVE, 21 PIPE_CLOSE_WAIT, 22 }; 23 /*---------------------------------------------------------------------------*/ 24 struct pipe 25 { 26 int state; 27 socket_map_t socket[2]; 28 29 char *buf; 30 int buf_off; 31 int buf_tail; 32 int buf_len; 33 int buf_size; 34 35 pthread_mutex_t pipe_lock; 36 pthread_cond_t pipe_cond; 37 }; 38 /*---------------------------------------------------------------------------*/ 39 int 40 mtcp_pipe(mctx_t mctx, int pipeid[2]) 41 { 42 socket_map_t socket[2]; 43 struct pipe *pp; 44 int ret; 45 46 socket[0] = AllocateSocket(mctx, MOS_SOCK_PIPE); 47 if (!socket[0]) { 48 errno = ENFILE; 49 return -1; 50 } 51 socket[1] = AllocateSocket(mctx, MOS_SOCK_PIPE); 52 if (!socket[1]) { 53 FreeSocket(mctx, socket[0]->id, MOS_SOCK_PIPE); 54 errno = ENFILE; 55 return -1; 56 } 57 58 pp = (struct pipe *)calloc(1, sizeof(struct pipe)); 59 if (!pp) { 60 /* errno set by calloc() */ 61 FreeSocket(mctx, socket[0]->id, MOS_SOCK_PIPE); 62 FreeSocket(mctx, socket[1]->id, MOS_SOCK_PIPE); 63 return -1; 64 } 65 66 pp->buf_size = PIPE_BUF_SIZE; 67 pp->buf = (char *)malloc(pp->buf_size); 68 if (!pp->buf) { 69 /* errno set by malloc() */ 70 FreeSocket(mctx, socket[0]->id, MOS_SOCK_PIPE); 71 FreeSocket(mctx, socket[1]->id, MOS_SOCK_PIPE); 72 free(pp); 73 return -1; 74 } 75 76 ret = pthread_mutex_init(&pp->pipe_lock, NULL); 77 if (ret) { 78 /* errno set by pthread_mutex_init() */ 79 FreeSocket(mctx, socket[0]->id, MOS_SOCK_PIPE); 80 FreeSocket(mctx, socket[1]->id, MOS_SOCK_PIPE); 81 free(pp->buf); 82 free(pp); 83 return -1; 84 85 } 86 ret = pthread_cond_init(&pp->pipe_cond, NULL); 87 if (ret) { 88 /* errno set by pthread_cond_init() */ 89 FreeSocket(mctx, socket[0]->id, MOS_SOCK_PIPE); 90 FreeSocket(mctx, socket[1]->id, MOS_SOCK_PIPE); 91 free(pp->buf); 92 pthread_mutex_destroy(&pp->pipe_lock); 93 free(pp); 94 return -1; 95 } 96 97 pp->state = PIPE_ACTIVE; 98 pp->socket[0] = socket[0]; 99 pp->socket[1] = socket[1]; 100 socket[0]->pp = pp; 101 socket[1]->pp = pp; 102 103 pipeid[0] = socket[0]->id; 104 pipeid[1] = socket[1]->id; 105 106 return 0; 107 108 } 109 /*---------------------------------------------------------------------------*/ 110 static void 111 RaiseEventToPair(mtcp_manager_t mtcp, socket_map_t socket, uint32_t event) 112 { 113 struct pipe *pp = socket->pp; 114 socket_map_t pair_socket; 115 116 if (pp->socket[0] == socket) 117 pair_socket = pp->socket[1]; 118 else 119 pair_socket = pp->socket[0]; 120 121 if (pair_socket->opts & MTCP_NONBLOCK) { 122 if (pair_socket->epoll) { 123 AddEpollEvent(mtcp->ep, USR_EVENT_QUEUE, pair_socket, event); 124 } 125 } else { 126 pthread_cond_signal(&pp->pipe_cond); 127 } 128 } 129 /*---------------------------------------------------------------------------*/ 130 int 131 PipeRead(mctx_t mctx, int pipeid, char *buf, int len) 132 { 133 mtcp_manager_t mtcp; 134 socket_map_t socket; 135 struct pipe *pp; 136 int to_read; 137 int to_notify; 138 int ret; 139 140 mtcp = GetMTCPManager(mctx); 141 if (!mtcp) { 142 return -1; 143 } 144 socket = GetSocket(mctx, pipeid); 145 if (!socket) { 146 return -1; 147 } 148 if (socket->socktype != MOS_SOCK_PIPE) { 149 errno = EBADF; 150 return -1; 151 } 152 pp = socket->pp; 153 if (!pp) { 154 errno = EBADF; 155 return -1; 156 } 157 if (pp->state == PIPE_CLOSED) { 158 errno = EINVAL; 159 return -1; 160 } 161 if (pp->state == PIPE_CLOSE_WAIT && pp->buf_len == 0) { 162 return 0; 163 } 164 165 if (len <= 0) { 166 if (socket->opts & MTCP_NONBLOCK) { 167 errno = EAGAIN; 168 return -1; 169 } else { 170 return 0; 171 } 172 } 173 174 pthread_mutex_lock(&pp->pipe_lock); 175 if (!(socket->opts & MTCP_NONBLOCK)) { 176 while (pp->buf_len == 0) { 177 ret = pthread_cond_wait(&pp->pipe_cond, &pp->pipe_lock); 178 if (ret) { 179 /* errno set by pthread_cond_wait() */ 180 pthread_mutex_unlock(&pp->pipe_lock); 181 return -1; 182 } 183 } 184 } 185 186 to_read = MIN(len, pp->buf_len); 187 if (to_read <= 0) { 188 pthread_mutex_unlock(&pp->pipe_lock); 189 if (pp->state == PIPE_ACTIVE) { 190 errno = EAGAIN; 191 return -1; 192 } else if (pp->state == PIPE_CLOSE_WAIT) { 193 return 0; 194 } 195 } 196 197 /* if the buffer was full, notify the write event to the pair socket */ 198 to_notify = FALSE; 199 if (pp->buf_len == pp->buf_size) 200 to_notify = TRUE; 201 202 if (pp->buf_off + to_read < pp->buf_size) { 203 memcpy(buf, pp->buf + pp->buf_off, to_read); 204 pp->buf_off += to_read; 205 } else { 206 int temp_read = pp->buf_size - pp->buf_off; 207 memcpy(buf, pp->buf + pp->buf_off, temp_read); 208 memcpy(buf + temp_read, pp->buf, to_read - temp_read); 209 pp->buf_off = to_read - temp_read; 210 } 211 pp->buf_len -= to_read; 212 213 /* notify to the pair socket for new buffer space */ 214 if (to_notify) { 215 RaiseEventToPair(mtcp, socket, MOS_EPOLLOUT); 216 } 217 218 pthread_mutex_unlock(&pp->pipe_lock); 219 220 /* if level triggered, raise event for remainig buffer */ 221 if (pp->buf_len > 0) { 222 if ((socket->epoll & MOS_EPOLLIN) && !(socket->epoll & MOS_EPOLLET)) { 223 AddEpollEvent(mtcp->ep, 224 USR_SHADOW_EVENT_QUEUE, socket, MOS_EPOLLIN); 225 } 226 } else if (pp->state == PIPE_CLOSE_WAIT && pp->buf_len == 0) { 227 AddEpollEvent(mtcp->ep, USR_SHADOW_EVENT_QUEUE, socket, MOS_EPOLLIN); 228 } 229 230 return to_read; 231 } 232 /*---------------------------------------------------------------------------*/ 233 int 234 PipeWrite(mctx_t mctx, int pipeid, const char *buf, int len) 235 { 236 mtcp_manager_t mtcp; 237 socket_map_t socket; 238 struct pipe *pp; 239 int to_write; 240 int to_notify; 241 int ret; 242 243 mtcp = GetMTCPManager(mctx); 244 if (!mtcp) { 245 return -1; 246 } 247 socket = GetSocket(mctx, pipeid); 248 if (!socket) { 249 return -1; 250 } 251 if (socket->socktype != MOS_SOCK_PIPE) { 252 errno = EBADF; 253 return -1; 254 } 255 pp = socket->pp; 256 if (!pp) { 257 errno = EBADF; 258 return -1; 259 } 260 if (pp->state == PIPE_CLOSED) { 261 errno = EINVAL; 262 return -1; 263 } 264 if (pp->state == PIPE_CLOSE_WAIT) { 265 errno = EPIPE; 266 return -1; 267 } 268 269 if (len <= 0) { 270 if (socket->opts & MTCP_NONBLOCK) { 271 errno = EAGAIN; 272 return -1; 273 } else { 274 return 0; 275 } 276 } 277 278 pthread_mutex_lock(&pp->pipe_lock); 279 if (!(socket->opts & MTCP_NONBLOCK)) { 280 while (pp->buf_len == pp->buf_size) { 281 ret = pthread_cond_wait(&pp->pipe_cond, &pp->pipe_lock); 282 if (ret) { 283 /* errno set by pthread_cond_wait() */ 284 pthread_mutex_unlock(&pp->pipe_lock); 285 return -1; 286 } 287 } 288 } 289 290 to_write = MIN(len, pp->buf_size - pp->buf_len); 291 if (to_write <= 0) { 292 pthread_mutex_unlock(&pp->pipe_lock); 293 errno = EAGAIN; 294 return -1; 295 } 296 297 /* if the buffer was empty, notify read event to the pair socket */ 298 to_notify = FALSE; 299 if (pp->buf_len == 0) 300 to_notify = TRUE; 301 302 if (pp->buf_tail + to_write < pp->buf_size) { 303 /* if the data fit into the buffer, copy it */ 304 memcpy(pp->buf + pp->buf_tail, buf, to_write); 305 pp->buf_tail += to_write; 306 } else { 307 /* if the data overflow the buffer, wrap around the buffer */ 308 int temp_write = pp->buf_size - pp->buf_tail; 309 memcpy(pp->buf + pp->buf_tail, buf, temp_write); 310 memcpy(pp->buf, buf + temp_write, to_write - temp_write); 311 pp->buf_tail = to_write - temp_write; 312 } 313 pp->buf_len += to_write; 314 315 /* notify to the pair socket for the new buffers */ 316 if (to_notify) { 317 RaiseEventToPair(mtcp, socket, MOS_EPOLLIN); 318 } 319 320 pthread_mutex_unlock(&pp->pipe_lock); 321 322 /* if level triggered, raise event for remainig buffer */ 323 if (pp->buf_len < pp->buf_size) { 324 if ((socket->epoll & MOS_EPOLLOUT) && !(socket->epoll & MOS_EPOLLET)) { 325 AddEpollEvent(mtcp->ep, 326 USR_SHADOW_EVENT_QUEUE, socket, MOS_EPOLLOUT); 327 } 328 } 329 330 return to_write; 331 } 332 /*----------------------------------------------------------------------------*/ 333 int 334 RaisePendingPipeEvents(mctx_t mctx, int epid, int pipeid) 335 { 336 struct mtcp_epoll *ep = GetSocket(mctx, epid)->ep; 337 socket_map_t socket = GetSocket(mctx, pipeid); 338 struct pipe *pp = socket->pp; 339 340 if (!pp) 341 return -1; 342 if (pp->state < PIPE_ACTIVE) 343 return -1; 344 345 /* if there are payloads already read before epoll registration */ 346 /* generate read event */ 347 if (socket->epoll & MOS_EPOLLIN) { 348 if (pp->buf_len > 0) { 349 AddEpollEvent(ep, USR_SHADOW_EVENT_QUEUE, socket, MOS_EPOLLIN); 350 } else if (pp->state == PIPE_CLOSE_WAIT) { 351 AddEpollEvent(ep, USR_SHADOW_EVENT_QUEUE, socket, MOS_EPOLLIN); 352 } 353 } 354 355 /* same thing to the write event */ 356 if (socket->epoll & MOS_EPOLLOUT) { 357 if (pp->buf_len < pp->buf_size) { 358 AddEpollEvent(ep, USR_SHADOW_EVENT_QUEUE, socket, MOS_EPOLLOUT); 359 } 360 } 361 362 return 0; 363 } 364 /*---------------------------------------------------------------------------*/ 365 int 366 PipeClose(mctx_t mctx, int pipeid) 367 { 368 mtcp_manager_t mtcp; 369 socket_map_t socket; 370 struct pipe *pp; 371 372 mtcp = GetMTCPManager(mctx); 373 if (!mtcp) { 374 return -1; 375 } 376 socket = GetSocket(mctx, pipeid); 377 if (!socket) { 378 return -1; 379 } 380 if (socket->socktype != MOS_SOCK_PIPE) { 381 errno = EINVAL; 382 return -1; 383 } 384 pp = socket->pp; 385 if (!pp) { 386 return 0; 387 } 388 389 if (pp->state == PIPE_CLOSED) { 390 return 0; 391 } 392 393 pthread_mutex_lock(&pp->pipe_lock); 394 if (pp->state == PIPE_ACTIVE) { 395 pp->state = PIPE_CLOSE_WAIT; 396 RaiseEventToPair(mtcp, socket, MOS_EPOLLIN); 397 pthread_mutex_unlock(&pp->pipe_lock); 398 return 0; 399 } 400 401 /* control reaches here only when PIPE_CLOSE_WAIT */ 402 403 if (pp->socket[0]) 404 pp->socket[0]->pp = NULL; 405 if (pp->socket[1]) 406 pp->socket[1]->pp = NULL; 407 408 pthread_mutex_unlock(&pp->pipe_lock); 409 410 pthread_mutex_destroy(&pp->pipe_lock); 411 pthread_cond_destroy(&pp->pipe_cond); 412 413 free(pp->buf); 414 415 free(pp); 416 417 return 0; 418 } 419 /*---------------------------------------------------------------------------*/ 420