176404edcSAsim Jamshed #include <pthread.h>
276404edcSAsim Jamshed #include <errno.h>
376404edcSAsim Jamshed #include <string.h>
476404edcSAsim Jamshed
576404edcSAsim Jamshed #include "pipe.h"
676404edcSAsim Jamshed #include "eventpoll.h"
776404edcSAsim Jamshed #include "tcp_stream.h"
876404edcSAsim Jamshed #include "mtcp.h"
976404edcSAsim Jamshed #include "debug.h"
1076404edcSAsim Jamshed
1176404edcSAsim Jamshed #define PIPE_BUF_SIZE 10240
1276404edcSAsim Jamshed
1376404edcSAsim Jamshed #define MAX(a, b) ((a)>(b)?(a):(b))
1476404edcSAsim Jamshed #define MIN(a, b) ((a)<(b)?(a):(b))
1576404edcSAsim Jamshed
1676404edcSAsim Jamshed /*---------------------------------------------------------------------------*/
/*
 * Lifecycle states stored in struct pipe's `state` field.
 * The numeric order matters: RaisePendingPipeEvents() treats any value
 * below PIPE_ACTIVE as "not usable".
 */
enum pipe_state
{
	PIPE_CLOSED = 0,	/* value of a zeroed pipe; read/write fail with EINVAL */
	PIPE_ACTIVE = 1,	/* set by mtcp_pipe() once the pipe is fully set up */
	PIPE_CLOSE_WAIT = 2,	/* set by the first PipeClose() on an active pipe */
};
2376404edcSAsim Jamshed /*---------------------------------------------------------------------------*/
2476404edcSAsim Jamshed struct pipe
2576404edcSAsim Jamshed {
2676404edcSAsim Jamshed int state;
2776404edcSAsim Jamshed socket_map_t socket[2];
2876404edcSAsim Jamshed
2976404edcSAsim Jamshed char *buf;
3076404edcSAsim Jamshed int buf_off;
3176404edcSAsim Jamshed int buf_tail;
3276404edcSAsim Jamshed int buf_len;
3376404edcSAsim Jamshed int buf_size;
3476404edcSAsim Jamshed
3576404edcSAsim Jamshed pthread_mutex_t pipe_lock;
3676404edcSAsim Jamshed pthread_cond_t pipe_cond;
3776404edcSAsim Jamshed };
3876404edcSAsim Jamshed /*---------------------------------------------------------------------------*/
3976404edcSAsim Jamshed int
mtcp_pipe(mctx_t mctx,int pipeid[2])4076404edcSAsim Jamshed mtcp_pipe(mctx_t mctx, int pipeid[2])
4176404edcSAsim Jamshed {
4276404edcSAsim Jamshed socket_map_t socket[2];
4376404edcSAsim Jamshed struct pipe *pp;
4476404edcSAsim Jamshed int ret;
4576404edcSAsim Jamshed
4676404edcSAsim Jamshed socket[0] = AllocateSocket(mctx, MOS_SOCK_PIPE);
4776404edcSAsim Jamshed if (!socket[0]) {
4876404edcSAsim Jamshed errno = ENFILE;
4976404edcSAsim Jamshed return -1;
5076404edcSAsim Jamshed }
5176404edcSAsim Jamshed socket[1] = AllocateSocket(mctx, MOS_SOCK_PIPE);
5276404edcSAsim Jamshed if (!socket[1]) {
5376404edcSAsim Jamshed FreeSocket(mctx, socket[0]->id, MOS_SOCK_PIPE);
5476404edcSAsim Jamshed errno = ENFILE;
5576404edcSAsim Jamshed return -1;
5676404edcSAsim Jamshed }
5776404edcSAsim Jamshed
5876404edcSAsim Jamshed pp = (struct pipe *)calloc(1, sizeof(struct pipe));
5976404edcSAsim Jamshed if (!pp) {
6076404edcSAsim Jamshed /* errno set by calloc() */
6176404edcSAsim Jamshed FreeSocket(mctx, socket[0]->id, MOS_SOCK_PIPE);
6276404edcSAsim Jamshed FreeSocket(mctx, socket[1]->id, MOS_SOCK_PIPE);
6376404edcSAsim Jamshed return -1;
6476404edcSAsim Jamshed }
6576404edcSAsim Jamshed
6676404edcSAsim Jamshed pp->buf_size = PIPE_BUF_SIZE;
6776404edcSAsim Jamshed pp->buf = (char *)malloc(pp->buf_size);
6876404edcSAsim Jamshed if (!pp->buf) {
6976404edcSAsim Jamshed /* errno set by malloc() */
7076404edcSAsim Jamshed FreeSocket(mctx, socket[0]->id, MOS_SOCK_PIPE);
7176404edcSAsim Jamshed FreeSocket(mctx, socket[1]->id, MOS_SOCK_PIPE);
7276404edcSAsim Jamshed free(pp);
7376404edcSAsim Jamshed return -1;
7476404edcSAsim Jamshed }
7576404edcSAsim Jamshed
7676404edcSAsim Jamshed ret = pthread_mutex_init(&pp->pipe_lock, NULL);
7776404edcSAsim Jamshed if (ret) {
7876404edcSAsim Jamshed /* errno set by pthread_mutex_init() */
7976404edcSAsim Jamshed FreeSocket(mctx, socket[0]->id, MOS_SOCK_PIPE);
8076404edcSAsim Jamshed FreeSocket(mctx, socket[1]->id, MOS_SOCK_PIPE);
8176404edcSAsim Jamshed free(pp->buf);
8276404edcSAsim Jamshed free(pp);
8376404edcSAsim Jamshed return -1;
8476404edcSAsim Jamshed
8576404edcSAsim Jamshed }
8676404edcSAsim Jamshed ret = pthread_cond_init(&pp->pipe_cond, NULL);
8776404edcSAsim Jamshed if (ret) {
8876404edcSAsim Jamshed /* errno set by pthread_cond_init() */
8976404edcSAsim Jamshed FreeSocket(mctx, socket[0]->id, MOS_SOCK_PIPE);
9076404edcSAsim Jamshed FreeSocket(mctx, socket[1]->id, MOS_SOCK_PIPE);
9176404edcSAsim Jamshed free(pp->buf);
9276404edcSAsim Jamshed pthread_mutex_destroy(&pp->pipe_lock);
93*a14d6bd4SAsim Jamshed free(pp);
9476404edcSAsim Jamshed return -1;
9576404edcSAsim Jamshed }
9676404edcSAsim Jamshed
9776404edcSAsim Jamshed pp->state = PIPE_ACTIVE;
9876404edcSAsim Jamshed pp->socket[0] = socket[0];
9976404edcSAsim Jamshed pp->socket[1] = socket[1];
10076404edcSAsim Jamshed socket[0]->pp = pp;
10176404edcSAsim Jamshed socket[1]->pp = pp;
10276404edcSAsim Jamshed
10376404edcSAsim Jamshed pipeid[0] = socket[0]->id;
10476404edcSAsim Jamshed pipeid[1] = socket[1]->id;
10576404edcSAsim Jamshed
10676404edcSAsim Jamshed return 0;
10776404edcSAsim Jamshed
10876404edcSAsim Jamshed }
10976404edcSAsim Jamshed /*---------------------------------------------------------------------------*/
11076404edcSAsim Jamshed static void
RaiseEventToPair(mtcp_manager_t mtcp,socket_map_t socket,uint32_t event)11176404edcSAsim Jamshed RaiseEventToPair(mtcp_manager_t mtcp, socket_map_t socket, uint32_t event)
11276404edcSAsim Jamshed {
11376404edcSAsim Jamshed struct pipe *pp = socket->pp;
11476404edcSAsim Jamshed socket_map_t pair_socket;
11576404edcSAsim Jamshed
11676404edcSAsim Jamshed if (pp->socket[0] == socket)
11776404edcSAsim Jamshed pair_socket = pp->socket[1];
11876404edcSAsim Jamshed else
11976404edcSAsim Jamshed pair_socket = pp->socket[0];
12076404edcSAsim Jamshed
12176404edcSAsim Jamshed if (pair_socket->opts & MTCP_NONBLOCK) {
12276404edcSAsim Jamshed if (pair_socket->epoll) {
12376404edcSAsim Jamshed AddEpollEvent(mtcp->ep, USR_EVENT_QUEUE, pair_socket, event);
12476404edcSAsim Jamshed }
12576404edcSAsim Jamshed } else {
12676404edcSAsim Jamshed pthread_cond_signal(&pp->pipe_cond);
12776404edcSAsim Jamshed }
12876404edcSAsim Jamshed }
12976404edcSAsim Jamshed /*---------------------------------------------------------------------------*/
13076404edcSAsim Jamshed int
PipeRead(mctx_t mctx,int pipeid,char * buf,int len)13176404edcSAsim Jamshed PipeRead(mctx_t mctx, int pipeid, char *buf, int len)
13276404edcSAsim Jamshed {
13376404edcSAsim Jamshed mtcp_manager_t mtcp;
13476404edcSAsim Jamshed socket_map_t socket;
13576404edcSAsim Jamshed struct pipe *pp;
13676404edcSAsim Jamshed int to_read;
13776404edcSAsim Jamshed int to_notify;
13876404edcSAsim Jamshed int ret;
13976404edcSAsim Jamshed
14076404edcSAsim Jamshed mtcp = GetMTCPManager(mctx);
14176404edcSAsim Jamshed if (!mtcp) {
14276404edcSAsim Jamshed return -1;
14376404edcSAsim Jamshed }
14476404edcSAsim Jamshed socket = GetSocket(mctx, pipeid);
14576404edcSAsim Jamshed if (!socket) {
14676404edcSAsim Jamshed return -1;
14776404edcSAsim Jamshed }
14876404edcSAsim Jamshed if (socket->socktype != MOS_SOCK_PIPE) {
14976404edcSAsim Jamshed errno = EBADF;
15076404edcSAsim Jamshed return -1;
15176404edcSAsim Jamshed }
15276404edcSAsim Jamshed pp = socket->pp;
15376404edcSAsim Jamshed if (!pp) {
15476404edcSAsim Jamshed errno = EBADF;
15576404edcSAsim Jamshed return -1;
15676404edcSAsim Jamshed }
15776404edcSAsim Jamshed if (pp->state == PIPE_CLOSED) {
15876404edcSAsim Jamshed errno = EINVAL;
15976404edcSAsim Jamshed return -1;
16076404edcSAsim Jamshed }
16176404edcSAsim Jamshed if (pp->state == PIPE_CLOSE_WAIT && pp->buf_len == 0) {
16276404edcSAsim Jamshed return 0;
16376404edcSAsim Jamshed }
16476404edcSAsim Jamshed
16576404edcSAsim Jamshed if (len <= 0) {
16676404edcSAsim Jamshed if (socket->opts & MTCP_NONBLOCK) {
16776404edcSAsim Jamshed errno = EAGAIN;
16876404edcSAsim Jamshed return -1;
16976404edcSAsim Jamshed } else {
17076404edcSAsim Jamshed return 0;
17176404edcSAsim Jamshed }
17276404edcSAsim Jamshed }
17376404edcSAsim Jamshed
17476404edcSAsim Jamshed pthread_mutex_lock(&pp->pipe_lock);
17576404edcSAsim Jamshed if (!(socket->opts & MTCP_NONBLOCK)) {
17676404edcSAsim Jamshed while (pp->buf_len == 0) {
17776404edcSAsim Jamshed ret = pthread_cond_wait(&pp->pipe_cond, &pp->pipe_lock);
17876404edcSAsim Jamshed if (ret) {
17976404edcSAsim Jamshed /* errno set by pthread_cond_wait() */
18076404edcSAsim Jamshed pthread_mutex_unlock(&pp->pipe_lock);
18176404edcSAsim Jamshed return -1;
18276404edcSAsim Jamshed }
18376404edcSAsim Jamshed }
18476404edcSAsim Jamshed }
18576404edcSAsim Jamshed
18676404edcSAsim Jamshed to_read = MIN(len, pp->buf_len);
18776404edcSAsim Jamshed if (to_read <= 0) {
18876404edcSAsim Jamshed pthread_mutex_unlock(&pp->pipe_lock);
18976404edcSAsim Jamshed if (pp->state == PIPE_ACTIVE) {
19076404edcSAsim Jamshed errno = EAGAIN;
19176404edcSAsim Jamshed return -1;
19276404edcSAsim Jamshed } else if (pp->state == PIPE_CLOSE_WAIT) {
19376404edcSAsim Jamshed return 0;
19476404edcSAsim Jamshed }
19576404edcSAsim Jamshed }
19676404edcSAsim Jamshed
19776404edcSAsim Jamshed /* if the buffer was full, notify the write event to the pair socket */
19876404edcSAsim Jamshed to_notify = FALSE;
19976404edcSAsim Jamshed if (pp->buf_len == pp->buf_size)
20076404edcSAsim Jamshed to_notify = TRUE;
20176404edcSAsim Jamshed
20276404edcSAsim Jamshed if (pp->buf_off + to_read < pp->buf_size) {
20376404edcSAsim Jamshed memcpy(buf, pp->buf + pp->buf_off, to_read);
20476404edcSAsim Jamshed pp->buf_off += to_read;
20576404edcSAsim Jamshed } else {
20676404edcSAsim Jamshed int temp_read = pp->buf_size - pp->buf_off;
20776404edcSAsim Jamshed memcpy(buf, pp->buf + pp->buf_off, temp_read);
20876404edcSAsim Jamshed memcpy(buf + temp_read, pp->buf, to_read - temp_read);
20976404edcSAsim Jamshed pp->buf_off = to_read - temp_read;
21076404edcSAsim Jamshed }
21176404edcSAsim Jamshed pp->buf_len -= to_read;
21276404edcSAsim Jamshed
21376404edcSAsim Jamshed /* notify to the pair socket for new buffer space */
21476404edcSAsim Jamshed if (to_notify) {
21576404edcSAsim Jamshed RaiseEventToPair(mtcp, socket, MOS_EPOLLOUT);
21676404edcSAsim Jamshed }
21776404edcSAsim Jamshed
21876404edcSAsim Jamshed pthread_mutex_unlock(&pp->pipe_lock);
21976404edcSAsim Jamshed
22076404edcSAsim Jamshed /* if level triggered, raise event for remainig buffer */
22176404edcSAsim Jamshed if (pp->buf_len > 0) {
22276404edcSAsim Jamshed if ((socket->epoll & MOS_EPOLLIN) && !(socket->epoll & MOS_EPOLLET)) {
22376404edcSAsim Jamshed AddEpollEvent(mtcp->ep,
22476404edcSAsim Jamshed USR_SHADOW_EVENT_QUEUE, socket, MOS_EPOLLIN);
22576404edcSAsim Jamshed }
22676404edcSAsim Jamshed } else if (pp->state == PIPE_CLOSE_WAIT && pp->buf_len == 0) {
22776404edcSAsim Jamshed AddEpollEvent(mtcp->ep, USR_SHADOW_EVENT_QUEUE, socket, MOS_EPOLLIN);
22876404edcSAsim Jamshed }
22976404edcSAsim Jamshed
23076404edcSAsim Jamshed return to_read;
23176404edcSAsim Jamshed }
23276404edcSAsim Jamshed /*---------------------------------------------------------------------------*/
23376404edcSAsim Jamshed int
PipeWrite(mctx_t mctx,int pipeid,const char * buf,int len)234a5e1a556SAsim Jamshed PipeWrite(mctx_t mctx, int pipeid, const char *buf, int len)
23576404edcSAsim Jamshed {
23676404edcSAsim Jamshed mtcp_manager_t mtcp;
23776404edcSAsim Jamshed socket_map_t socket;
23876404edcSAsim Jamshed struct pipe *pp;
23976404edcSAsim Jamshed int to_write;
24076404edcSAsim Jamshed int to_notify;
24176404edcSAsim Jamshed int ret;
24276404edcSAsim Jamshed
24376404edcSAsim Jamshed mtcp = GetMTCPManager(mctx);
24476404edcSAsim Jamshed if (!mtcp) {
24576404edcSAsim Jamshed return -1;
24676404edcSAsim Jamshed }
24776404edcSAsim Jamshed socket = GetSocket(mctx, pipeid);
24876404edcSAsim Jamshed if (!socket) {
24976404edcSAsim Jamshed return -1;
25076404edcSAsim Jamshed }
25176404edcSAsim Jamshed if (socket->socktype != MOS_SOCK_PIPE) {
25276404edcSAsim Jamshed errno = EBADF;
25376404edcSAsim Jamshed return -1;
25476404edcSAsim Jamshed }
25576404edcSAsim Jamshed pp = socket->pp;
25676404edcSAsim Jamshed if (!pp) {
25776404edcSAsim Jamshed errno = EBADF;
25876404edcSAsim Jamshed return -1;
25976404edcSAsim Jamshed }
26076404edcSAsim Jamshed if (pp->state == PIPE_CLOSED) {
26176404edcSAsim Jamshed errno = EINVAL;
26276404edcSAsim Jamshed return -1;
26376404edcSAsim Jamshed }
26476404edcSAsim Jamshed if (pp->state == PIPE_CLOSE_WAIT) {
26576404edcSAsim Jamshed errno = EPIPE;
26676404edcSAsim Jamshed return -1;
26776404edcSAsim Jamshed }
26876404edcSAsim Jamshed
26976404edcSAsim Jamshed if (len <= 0) {
27076404edcSAsim Jamshed if (socket->opts & MTCP_NONBLOCK) {
27176404edcSAsim Jamshed errno = EAGAIN;
27276404edcSAsim Jamshed return -1;
27376404edcSAsim Jamshed } else {
27476404edcSAsim Jamshed return 0;
27576404edcSAsim Jamshed }
27676404edcSAsim Jamshed }
27776404edcSAsim Jamshed
27876404edcSAsim Jamshed pthread_mutex_lock(&pp->pipe_lock);
27976404edcSAsim Jamshed if (!(socket->opts & MTCP_NONBLOCK)) {
28076404edcSAsim Jamshed while (pp->buf_len == pp->buf_size) {
28176404edcSAsim Jamshed ret = pthread_cond_wait(&pp->pipe_cond, &pp->pipe_lock);
28276404edcSAsim Jamshed if (ret) {
28376404edcSAsim Jamshed /* errno set by pthread_cond_wait() */
28476404edcSAsim Jamshed pthread_mutex_unlock(&pp->pipe_lock);
28576404edcSAsim Jamshed return -1;
28676404edcSAsim Jamshed }
28776404edcSAsim Jamshed }
28876404edcSAsim Jamshed }
28976404edcSAsim Jamshed
29076404edcSAsim Jamshed to_write = MIN(len, pp->buf_size - pp->buf_len);
29176404edcSAsim Jamshed if (to_write <= 0) {
29276404edcSAsim Jamshed pthread_mutex_unlock(&pp->pipe_lock);
29376404edcSAsim Jamshed errno = EAGAIN;
29476404edcSAsim Jamshed return -1;
29576404edcSAsim Jamshed }
29676404edcSAsim Jamshed
29776404edcSAsim Jamshed /* if the buffer was empty, notify read event to the pair socket */
29876404edcSAsim Jamshed to_notify = FALSE;
29976404edcSAsim Jamshed if (pp->buf_len == 0)
30076404edcSAsim Jamshed to_notify = TRUE;
30176404edcSAsim Jamshed
30276404edcSAsim Jamshed if (pp->buf_tail + to_write < pp->buf_size) {
30376404edcSAsim Jamshed /* if the data fit into the buffer, copy it */
30476404edcSAsim Jamshed memcpy(pp->buf + pp->buf_tail, buf, to_write);
30576404edcSAsim Jamshed pp->buf_tail += to_write;
30676404edcSAsim Jamshed } else {
30776404edcSAsim Jamshed /* if the data overflow the buffer, wrap around the buffer */
30876404edcSAsim Jamshed int temp_write = pp->buf_size - pp->buf_tail;
30976404edcSAsim Jamshed memcpy(pp->buf + pp->buf_tail, buf, temp_write);
31076404edcSAsim Jamshed memcpy(pp->buf, buf + temp_write, to_write - temp_write);
31176404edcSAsim Jamshed pp->buf_tail = to_write - temp_write;
31276404edcSAsim Jamshed }
31376404edcSAsim Jamshed pp->buf_len += to_write;
31476404edcSAsim Jamshed
31576404edcSAsim Jamshed /* notify to the pair socket for the new buffers */
31676404edcSAsim Jamshed if (to_notify) {
31776404edcSAsim Jamshed RaiseEventToPair(mtcp, socket, MOS_EPOLLIN);
31876404edcSAsim Jamshed }
31976404edcSAsim Jamshed
32076404edcSAsim Jamshed pthread_mutex_unlock(&pp->pipe_lock);
32176404edcSAsim Jamshed
32276404edcSAsim Jamshed /* if level triggered, raise event for remainig buffer */
32376404edcSAsim Jamshed if (pp->buf_len < pp->buf_size) {
32476404edcSAsim Jamshed if ((socket->epoll & MOS_EPOLLOUT) && !(socket->epoll & MOS_EPOLLET)) {
32576404edcSAsim Jamshed AddEpollEvent(mtcp->ep,
32676404edcSAsim Jamshed USR_SHADOW_EVENT_QUEUE, socket, MOS_EPOLLOUT);
32776404edcSAsim Jamshed }
32876404edcSAsim Jamshed }
32976404edcSAsim Jamshed
33076404edcSAsim Jamshed return to_write;
33176404edcSAsim Jamshed }
33276404edcSAsim Jamshed /*----------------------------------------------------------------------------*/
33376404edcSAsim Jamshed int
RaisePendingPipeEvents(mctx_t mctx,int epid,int pipeid)33476404edcSAsim Jamshed RaisePendingPipeEvents(mctx_t mctx, int epid, int pipeid)
33576404edcSAsim Jamshed {
33676404edcSAsim Jamshed struct mtcp_epoll *ep = GetSocket(mctx, epid)->ep;
33776404edcSAsim Jamshed socket_map_t socket = GetSocket(mctx, pipeid);
33876404edcSAsim Jamshed struct pipe *pp = socket->pp;
33976404edcSAsim Jamshed
34076404edcSAsim Jamshed if (!pp)
34176404edcSAsim Jamshed return -1;
34276404edcSAsim Jamshed if (pp->state < PIPE_ACTIVE)
34376404edcSAsim Jamshed return -1;
34476404edcSAsim Jamshed
34576404edcSAsim Jamshed /* if there are payloads already read before epoll registration */
34676404edcSAsim Jamshed /* generate read event */
34776404edcSAsim Jamshed if (socket->epoll & MOS_EPOLLIN) {
34876404edcSAsim Jamshed if (pp->buf_len > 0) {
34976404edcSAsim Jamshed AddEpollEvent(ep, USR_SHADOW_EVENT_QUEUE, socket, MOS_EPOLLIN);
35076404edcSAsim Jamshed } else if (pp->state == PIPE_CLOSE_WAIT) {
35176404edcSAsim Jamshed AddEpollEvent(ep, USR_SHADOW_EVENT_QUEUE, socket, MOS_EPOLLIN);
35276404edcSAsim Jamshed }
35376404edcSAsim Jamshed }
35476404edcSAsim Jamshed
35576404edcSAsim Jamshed /* same thing to the write event */
35676404edcSAsim Jamshed if (socket->epoll & MOS_EPOLLOUT) {
35776404edcSAsim Jamshed if (pp->buf_len < pp->buf_size) {
35876404edcSAsim Jamshed AddEpollEvent(ep, USR_SHADOW_EVENT_QUEUE, socket, MOS_EPOLLOUT);
35976404edcSAsim Jamshed }
36076404edcSAsim Jamshed }
36176404edcSAsim Jamshed
36276404edcSAsim Jamshed return 0;
36376404edcSAsim Jamshed }
36476404edcSAsim Jamshed /*---------------------------------------------------------------------------*/
36576404edcSAsim Jamshed int
PipeClose(mctx_t mctx,int pipeid)36676404edcSAsim Jamshed PipeClose(mctx_t mctx, int pipeid)
36776404edcSAsim Jamshed {
36876404edcSAsim Jamshed mtcp_manager_t mtcp;
36976404edcSAsim Jamshed socket_map_t socket;
37076404edcSAsim Jamshed struct pipe *pp;
37176404edcSAsim Jamshed
37276404edcSAsim Jamshed mtcp = GetMTCPManager(mctx);
37376404edcSAsim Jamshed if (!mtcp) {
37476404edcSAsim Jamshed return -1;
37576404edcSAsim Jamshed }
37676404edcSAsim Jamshed socket = GetSocket(mctx, pipeid);
37776404edcSAsim Jamshed if (!socket) {
37876404edcSAsim Jamshed return -1;
37976404edcSAsim Jamshed }
38076404edcSAsim Jamshed if (socket->socktype != MOS_SOCK_PIPE) {
38176404edcSAsim Jamshed errno = EINVAL;
38276404edcSAsim Jamshed return -1;
38376404edcSAsim Jamshed }
38476404edcSAsim Jamshed pp = socket->pp;
38576404edcSAsim Jamshed if (!pp) {
38676404edcSAsim Jamshed return 0;
38776404edcSAsim Jamshed }
38876404edcSAsim Jamshed
38976404edcSAsim Jamshed if (pp->state == PIPE_CLOSED) {
39076404edcSAsim Jamshed return 0;
39176404edcSAsim Jamshed }
39276404edcSAsim Jamshed
39376404edcSAsim Jamshed pthread_mutex_lock(&pp->pipe_lock);
39476404edcSAsim Jamshed if (pp->state == PIPE_ACTIVE) {
39576404edcSAsim Jamshed pp->state = PIPE_CLOSE_WAIT;
39676404edcSAsim Jamshed RaiseEventToPair(mtcp, socket, MOS_EPOLLIN);
39776404edcSAsim Jamshed pthread_mutex_unlock(&pp->pipe_lock);
39876404edcSAsim Jamshed return 0;
39976404edcSAsim Jamshed }
40076404edcSAsim Jamshed
40176404edcSAsim Jamshed /* control reaches here only when PIPE_CLOSE_WAIT */
40276404edcSAsim Jamshed
40376404edcSAsim Jamshed if (pp->socket[0])
40476404edcSAsim Jamshed pp->socket[0]->pp = NULL;
40576404edcSAsim Jamshed if (pp->socket[1])
40676404edcSAsim Jamshed pp->socket[1]->pp = NULL;
40776404edcSAsim Jamshed
40876404edcSAsim Jamshed pthread_mutex_unlock(&pp->pipe_lock);
40976404edcSAsim Jamshed
41076404edcSAsim Jamshed pthread_mutex_destroy(&pp->pipe_lock);
41176404edcSAsim Jamshed pthread_cond_destroy(&pp->pipe_cond);
41276404edcSAsim Jamshed
41376404edcSAsim Jamshed free(pp->buf);
41476404edcSAsim Jamshed
41576404edcSAsim Jamshed free(pp);
41676404edcSAsim Jamshed
41776404edcSAsim Jamshed return 0;
41876404edcSAsim Jamshed }
41976404edcSAsim Jamshed /*---------------------------------------------------------------------------*/
420