xref: /mOS-networking-stack/core/src/pipe.c (revision a14d6bd4)
176404edcSAsim Jamshed #include <pthread.h>
276404edcSAsim Jamshed #include <errno.h>
376404edcSAsim Jamshed #include <string.h>
476404edcSAsim Jamshed 
576404edcSAsim Jamshed #include "pipe.h"
676404edcSAsim Jamshed #include "eventpoll.h"
776404edcSAsim Jamshed #include "tcp_stream.h"
876404edcSAsim Jamshed #include "mtcp.h"
976404edcSAsim Jamshed #include "debug.h"
1076404edcSAsim Jamshed 
1176404edcSAsim Jamshed #define PIPE_BUF_SIZE 10240
1276404edcSAsim Jamshed 
1376404edcSAsim Jamshed #define MAX(a, b) ((a)>(b)?(a):(b))
1476404edcSAsim Jamshed #define MIN(a, b) ((a)<(b)?(a):(b))
1576404edcSAsim Jamshed 
1676404edcSAsim Jamshed /*---------------------------------------------------------------------------*/
1776404edcSAsim Jamshed enum pipe_state
1876404edcSAsim Jamshed {
1976404edcSAsim Jamshed 	PIPE_CLOSED,
2076404edcSAsim Jamshed 	PIPE_ACTIVE,
2176404edcSAsim Jamshed 	PIPE_CLOSE_WAIT,
2276404edcSAsim Jamshed };
2376404edcSAsim Jamshed /*---------------------------------------------------------------------------*/
2476404edcSAsim Jamshed struct pipe
2576404edcSAsim Jamshed {
2676404edcSAsim Jamshed 	int state;
2776404edcSAsim Jamshed 	socket_map_t socket[2];
2876404edcSAsim Jamshed 
2976404edcSAsim Jamshed 	char *buf;
3076404edcSAsim Jamshed 	int buf_off;
3176404edcSAsim Jamshed 	int buf_tail;
3276404edcSAsim Jamshed 	int buf_len;
3376404edcSAsim Jamshed 	int buf_size;
3476404edcSAsim Jamshed 
3576404edcSAsim Jamshed 	pthread_mutex_t pipe_lock;
3676404edcSAsim Jamshed 	pthread_cond_t pipe_cond;
3776404edcSAsim Jamshed };
3876404edcSAsim Jamshed /*---------------------------------------------------------------------------*/
3976404edcSAsim Jamshed int
mtcp_pipe(mctx_t mctx,int pipeid[2])4076404edcSAsim Jamshed mtcp_pipe(mctx_t mctx, int pipeid[2])
4176404edcSAsim Jamshed {
4276404edcSAsim Jamshed 	socket_map_t socket[2];
4376404edcSAsim Jamshed 	struct pipe *pp;
4476404edcSAsim Jamshed 	int ret;
4576404edcSAsim Jamshed 
4676404edcSAsim Jamshed 	socket[0] = AllocateSocket(mctx, MOS_SOCK_PIPE);
4776404edcSAsim Jamshed 	if (!socket[0]) {
4876404edcSAsim Jamshed 		errno = ENFILE;
4976404edcSAsim Jamshed 		return -1;
5076404edcSAsim Jamshed 	}
5176404edcSAsim Jamshed 	socket[1] = AllocateSocket(mctx, MOS_SOCK_PIPE);
5276404edcSAsim Jamshed 	if (!socket[1]) {
5376404edcSAsim Jamshed 		FreeSocket(mctx, socket[0]->id, MOS_SOCK_PIPE);
5476404edcSAsim Jamshed 		errno = ENFILE;
5576404edcSAsim Jamshed 		return -1;
5676404edcSAsim Jamshed 	}
5776404edcSAsim Jamshed 
5876404edcSAsim Jamshed 	pp = (struct pipe *)calloc(1, sizeof(struct pipe));
5976404edcSAsim Jamshed 	if (!pp) {
6076404edcSAsim Jamshed 		/* errno set by calloc() */
6176404edcSAsim Jamshed 		FreeSocket(mctx, socket[0]->id, MOS_SOCK_PIPE);
6276404edcSAsim Jamshed 		FreeSocket(mctx, socket[1]->id, MOS_SOCK_PIPE);
6376404edcSAsim Jamshed 		return -1;
6476404edcSAsim Jamshed 	}
6576404edcSAsim Jamshed 
6676404edcSAsim Jamshed 	pp->buf_size = PIPE_BUF_SIZE;
6776404edcSAsim Jamshed 	pp->buf = (char *)malloc(pp->buf_size);
6876404edcSAsim Jamshed 	if (!pp->buf) {
6976404edcSAsim Jamshed 		/* errno set by malloc() */
7076404edcSAsim Jamshed 		FreeSocket(mctx, socket[0]->id, MOS_SOCK_PIPE);
7176404edcSAsim Jamshed 		FreeSocket(mctx, socket[1]->id, MOS_SOCK_PIPE);
7276404edcSAsim Jamshed 		free(pp);
7376404edcSAsim Jamshed 		return -1;
7476404edcSAsim Jamshed 	}
7576404edcSAsim Jamshed 
7676404edcSAsim Jamshed 	ret = pthread_mutex_init(&pp->pipe_lock, NULL);
7776404edcSAsim Jamshed 	if (ret) {
7876404edcSAsim Jamshed 		/* errno set by pthread_mutex_init() */
7976404edcSAsim Jamshed 		FreeSocket(mctx, socket[0]->id, MOS_SOCK_PIPE);
8076404edcSAsim Jamshed 		FreeSocket(mctx, socket[1]->id, MOS_SOCK_PIPE);
8176404edcSAsim Jamshed 		free(pp->buf);
8276404edcSAsim Jamshed 		free(pp);
8376404edcSAsim Jamshed 		return -1;
8476404edcSAsim Jamshed 
8576404edcSAsim Jamshed 	}
8676404edcSAsim Jamshed 	ret = pthread_cond_init(&pp->pipe_cond, NULL);
8776404edcSAsim Jamshed 	if (ret) {
8876404edcSAsim Jamshed 		/* errno set by pthread_cond_init() */
8976404edcSAsim Jamshed 		FreeSocket(mctx, socket[0]->id, MOS_SOCK_PIPE);
9076404edcSAsim Jamshed 		FreeSocket(mctx, socket[1]->id, MOS_SOCK_PIPE);
9176404edcSAsim Jamshed 		free(pp->buf);
9276404edcSAsim Jamshed 		pthread_mutex_destroy(&pp->pipe_lock);
93*a14d6bd4SAsim Jamshed 		free(pp);
9476404edcSAsim Jamshed 		return -1;
9576404edcSAsim Jamshed 	}
9676404edcSAsim Jamshed 
9776404edcSAsim Jamshed 	pp->state = PIPE_ACTIVE;
9876404edcSAsim Jamshed 	pp->socket[0] = socket[0];
9976404edcSAsim Jamshed 	pp->socket[1] = socket[1];
10076404edcSAsim Jamshed 	socket[0]->pp = pp;
10176404edcSAsim Jamshed 	socket[1]->pp = pp;
10276404edcSAsim Jamshed 
10376404edcSAsim Jamshed 	pipeid[0] = socket[0]->id;
10476404edcSAsim Jamshed 	pipeid[1] = socket[1]->id;
10576404edcSAsim Jamshed 
10676404edcSAsim Jamshed 	return 0;
10776404edcSAsim Jamshed 
10876404edcSAsim Jamshed }
10976404edcSAsim Jamshed /*---------------------------------------------------------------------------*/
11076404edcSAsim Jamshed static void
RaiseEventToPair(mtcp_manager_t mtcp,socket_map_t socket,uint32_t event)11176404edcSAsim Jamshed RaiseEventToPair(mtcp_manager_t mtcp, socket_map_t socket, uint32_t event)
11276404edcSAsim Jamshed {
11376404edcSAsim Jamshed 	struct pipe *pp = socket->pp;
11476404edcSAsim Jamshed 	socket_map_t pair_socket;
11576404edcSAsim Jamshed 
11676404edcSAsim Jamshed 	if (pp->socket[0] == socket)
11776404edcSAsim Jamshed 		pair_socket = pp->socket[1];
11876404edcSAsim Jamshed 	else
11976404edcSAsim Jamshed 		pair_socket = pp->socket[0];
12076404edcSAsim Jamshed 
12176404edcSAsim Jamshed 	if (pair_socket->opts & MTCP_NONBLOCK) {
12276404edcSAsim Jamshed 		if (pair_socket->epoll) {
12376404edcSAsim Jamshed 			AddEpollEvent(mtcp->ep, USR_EVENT_QUEUE, pair_socket, event);
12476404edcSAsim Jamshed 		}
12576404edcSAsim Jamshed 	} else {
12676404edcSAsim Jamshed 		pthread_cond_signal(&pp->pipe_cond);
12776404edcSAsim Jamshed 	}
12876404edcSAsim Jamshed }
12976404edcSAsim Jamshed /*---------------------------------------------------------------------------*/
13076404edcSAsim Jamshed int
PipeRead(mctx_t mctx,int pipeid,char * buf,int len)13176404edcSAsim Jamshed PipeRead(mctx_t mctx, int pipeid, char *buf, int len)
13276404edcSAsim Jamshed {
13376404edcSAsim Jamshed 	mtcp_manager_t mtcp;
13476404edcSAsim Jamshed 	socket_map_t socket;
13576404edcSAsim Jamshed 	struct pipe *pp;
13676404edcSAsim Jamshed 	int to_read;
13776404edcSAsim Jamshed 	int to_notify;
13876404edcSAsim Jamshed 	int ret;
13976404edcSAsim Jamshed 
14076404edcSAsim Jamshed 	mtcp = GetMTCPManager(mctx);
14176404edcSAsim Jamshed 	if (!mtcp) {
14276404edcSAsim Jamshed 		return -1;
14376404edcSAsim Jamshed 	}
14476404edcSAsim Jamshed 	socket = GetSocket(mctx, pipeid);
14576404edcSAsim Jamshed 	if (!socket) {
14676404edcSAsim Jamshed 		return -1;
14776404edcSAsim Jamshed 	}
14876404edcSAsim Jamshed 	if (socket->socktype != MOS_SOCK_PIPE) {
14976404edcSAsim Jamshed 		errno = EBADF;
15076404edcSAsim Jamshed 		return -1;
15176404edcSAsim Jamshed 	}
15276404edcSAsim Jamshed 	pp = socket->pp;
15376404edcSAsim Jamshed 	if (!pp) {
15476404edcSAsim Jamshed 		errno = EBADF;
15576404edcSAsim Jamshed 		return -1;
15676404edcSAsim Jamshed 	}
15776404edcSAsim Jamshed 	if (pp->state == PIPE_CLOSED) {
15876404edcSAsim Jamshed 		errno = EINVAL;
15976404edcSAsim Jamshed 		return -1;
16076404edcSAsim Jamshed 	}
16176404edcSAsim Jamshed 	if (pp->state == PIPE_CLOSE_WAIT && pp->buf_len == 0) {
16276404edcSAsim Jamshed 		return 0;
16376404edcSAsim Jamshed 	}
16476404edcSAsim Jamshed 
16576404edcSAsim Jamshed 	if (len <= 0) {
16676404edcSAsim Jamshed 		if (socket->opts & MTCP_NONBLOCK) {
16776404edcSAsim Jamshed 			errno = EAGAIN;
16876404edcSAsim Jamshed 			return -1;
16976404edcSAsim Jamshed 		} else {
17076404edcSAsim Jamshed 			return 0;
17176404edcSAsim Jamshed 		}
17276404edcSAsim Jamshed 	}
17376404edcSAsim Jamshed 
17476404edcSAsim Jamshed 	pthread_mutex_lock(&pp->pipe_lock);
17576404edcSAsim Jamshed 	if (!(socket->opts & MTCP_NONBLOCK)) {
17676404edcSAsim Jamshed 		while (pp->buf_len == 0) {
17776404edcSAsim Jamshed 			ret = pthread_cond_wait(&pp->pipe_cond, &pp->pipe_lock);
17876404edcSAsim Jamshed 			if (ret) {
17976404edcSAsim Jamshed 				/* errno set by pthread_cond_wait() */
18076404edcSAsim Jamshed 				pthread_mutex_unlock(&pp->pipe_lock);
18176404edcSAsim Jamshed 				return -1;
18276404edcSAsim Jamshed 			}
18376404edcSAsim Jamshed 		}
18476404edcSAsim Jamshed 	}
18576404edcSAsim Jamshed 
18676404edcSAsim Jamshed 	to_read = MIN(len, pp->buf_len);
18776404edcSAsim Jamshed 	if (to_read <= 0) {
18876404edcSAsim Jamshed 		pthread_mutex_unlock(&pp->pipe_lock);
18976404edcSAsim Jamshed 		if (pp->state == PIPE_ACTIVE) {
19076404edcSAsim Jamshed 			errno = EAGAIN;
19176404edcSAsim Jamshed 			return -1;
19276404edcSAsim Jamshed 		} else if (pp->state == PIPE_CLOSE_WAIT) {
19376404edcSAsim Jamshed 			return 0;
19476404edcSAsim Jamshed 		}
19576404edcSAsim Jamshed 	}
19676404edcSAsim Jamshed 
19776404edcSAsim Jamshed 	/* if the buffer was full, notify the write event to the pair socket */
19876404edcSAsim Jamshed 	to_notify = FALSE;
19976404edcSAsim Jamshed 	if (pp->buf_len == pp->buf_size)
20076404edcSAsim Jamshed 		to_notify = TRUE;
20176404edcSAsim Jamshed 
20276404edcSAsim Jamshed 	if (pp->buf_off + to_read < pp->buf_size) {
20376404edcSAsim Jamshed 		memcpy(buf, pp->buf + pp->buf_off, to_read);
20476404edcSAsim Jamshed 		pp->buf_off += to_read;
20576404edcSAsim Jamshed 	} else {
20676404edcSAsim Jamshed 		int temp_read = pp->buf_size - pp->buf_off;
20776404edcSAsim Jamshed 		memcpy(buf, pp->buf + pp->buf_off, temp_read);
20876404edcSAsim Jamshed 		memcpy(buf + temp_read, pp->buf, to_read - temp_read);
20976404edcSAsim Jamshed 		pp->buf_off = to_read - temp_read;
21076404edcSAsim Jamshed 	}
21176404edcSAsim Jamshed 	pp->buf_len -= to_read;
21276404edcSAsim Jamshed 
21376404edcSAsim Jamshed 	/* notify to the pair socket for new buffer space */
21476404edcSAsim Jamshed 	if (to_notify) {
21576404edcSAsim Jamshed 		RaiseEventToPair(mtcp, socket, MOS_EPOLLOUT);
21676404edcSAsim Jamshed 	}
21776404edcSAsim Jamshed 
21876404edcSAsim Jamshed 	pthread_mutex_unlock(&pp->pipe_lock);
21976404edcSAsim Jamshed 
22076404edcSAsim Jamshed 	/* if level triggered, raise event for remainig buffer */
22176404edcSAsim Jamshed 	if (pp->buf_len > 0) {
22276404edcSAsim Jamshed 		if ((socket->epoll & MOS_EPOLLIN) && !(socket->epoll & MOS_EPOLLET)) {
22376404edcSAsim Jamshed 			AddEpollEvent(mtcp->ep,
22476404edcSAsim Jamshed 					USR_SHADOW_EVENT_QUEUE, socket, MOS_EPOLLIN);
22576404edcSAsim Jamshed 		}
22676404edcSAsim Jamshed 	} else if (pp->state == PIPE_CLOSE_WAIT && pp->buf_len == 0) {
22776404edcSAsim Jamshed 		AddEpollEvent(mtcp->ep, USR_SHADOW_EVENT_QUEUE, socket, MOS_EPOLLIN);
22876404edcSAsim Jamshed 	}
22976404edcSAsim Jamshed 
23076404edcSAsim Jamshed 	return to_read;
23176404edcSAsim Jamshed }
23276404edcSAsim Jamshed /*---------------------------------------------------------------------------*/
23376404edcSAsim Jamshed int
PipeWrite(mctx_t mctx,int pipeid,const char * buf,int len)234a5e1a556SAsim Jamshed PipeWrite(mctx_t mctx, int pipeid, const char *buf, int len)
23576404edcSAsim Jamshed {
23676404edcSAsim Jamshed 	mtcp_manager_t mtcp;
23776404edcSAsim Jamshed 	socket_map_t socket;
23876404edcSAsim Jamshed 	struct pipe *pp;
23976404edcSAsim Jamshed 	int to_write;
24076404edcSAsim Jamshed 	int to_notify;
24176404edcSAsim Jamshed 	int ret;
24276404edcSAsim Jamshed 
24376404edcSAsim Jamshed 	mtcp = GetMTCPManager(mctx);
24476404edcSAsim Jamshed 	if (!mtcp) {
24576404edcSAsim Jamshed 		return -1;
24676404edcSAsim Jamshed 	}
24776404edcSAsim Jamshed 	socket = GetSocket(mctx, pipeid);
24876404edcSAsim Jamshed 	if (!socket) {
24976404edcSAsim Jamshed 		return -1;
25076404edcSAsim Jamshed 	}
25176404edcSAsim Jamshed 	if (socket->socktype != MOS_SOCK_PIPE) {
25276404edcSAsim Jamshed 		errno = EBADF;
25376404edcSAsim Jamshed 		return -1;
25476404edcSAsim Jamshed 	}
25576404edcSAsim Jamshed 	pp = socket->pp;
25676404edcSAsim Jamshed 	if (!pp) {
25776404edcSAsim Jamshed 		errno = EBADF;
25876404edcSAsim Jamshed 		return -1;
25976404edcSAsim Jamshed 	}
26076404edcSAsim Jamshed 	if (pp->state == PIPE_CLOSED) {
26176404edcSAsim Jamshed 		errno = EINVAL;
26276404edcSAsim Jamshed 		return -1;
26376404edcSAsim Jamshed 	}
26476404edcSAsim Jamshed 	if (pp->state == PIPE_CLOSE_WAIT) {
26576404edcSAsim Jamshed 		errno = EPIPE;
26676404edcSAsim Jamshed 		return -1;
26776404edcSAsim Jamshed 	}
26876404edcSAsim Jamshed 
26976404edcSAsim Jamshed 	if (len <= 0) {
27076404edcSAsim Jamshed 		if (socket->opts & MTCP_NONBLOCK) {
27176404edcSAsim Jamshed 			errno = EAGAIN;
27276404edcSAsim Jamshed 			return -1;
27376404edcSAsim Jamshed 		} else {
27476404edcSAsim Jamshed 			return 0;
27576404edcSAsim Jamshed 		}
27676404edcSAsim Jamshed 	}
27776404edcSAsim Jamshed 
27876404edcSAsim Jamshed 	pthread_mutex_lock(&pp->pipe_lock);
27976404edcSAsim Jamshed 	if (!(socket->opts & MTCP_NONBLOCK)) {
28076404edcSAsim Jamshed 		while (pp->buf_len == pp->buf_size) {
28176404edcSAsim Jamshed 			ret = pthread_cond_wait(&pp->pipe_cond, &pp->pipe_lock);
28276404edcSAsim Jamshed 			if (ret) {
28376404edcSAsim Jamshed 				/* errno set by pthread_cond_wait() */
28476404edcSAsim Jamshed 				pthread_mutex_unlock(&pp->pipe_lock);
28576404edcSAsim Jamshed 				return -1;
28676404edcSAsim Jamshed 			}
28776404edcSAsim Jamshed 		}
28876404edcSAsim Jamshed 	}
28976404edcSAsim Jamshed 
29076404edcSAsim Jamshed 	to_write = MIN(len, pp->buf_size - pp->buf_len);
29176404edcSAsim Jamshed 	if (to_write <= 0) {
29276404edcSAsim Jamshed 		pthread_mutex_unlock(&pp->pipe_lock);
29376404edcSAsim Jamshed 		errno = EAGAIN;
29476404edcSAsim Jamshed 		return -1;
29576404edcSAsim Jamshed 	}
29676404edcSAsim Jamshed 
29776404edcSAsim Jamshed 	/* if the buffer was empty, notify read event to the pair socket */
29876404edcSAsim Jamshed 	to_notify = FALSE;
29976404edcSAsim Jamshed 	if (pp->buf_len == 0)
30076404edcSAsim Jamshed 		to_notify = TRUE;
30176404edcSAsim Jamshed 
30276404edcSAsim Jamshed 	if (pp->buf_tail + to_write < pp->buf_size) {
30376404edcSAsim Jamshed 		/* if the data fit into the buffer, copy it */
30476404edcSAsim Jamshed 		memcpy(pp->buf + pp->buf_tail, buf, to_write);
30576404edcSAsim Jamshed 		pp->buf_tail += to_write;
30676404edcSAsim Jamshed 	} else {
30776404edcSAsim Jamshed 		/* if the data overflow the buffer, wrap around the buffer */
30876404edcSAsim Jamshed 		int temp_write = pp->buf_size - pp->buf_tail;
30976404edcSAsim Jamshed 		memcpy(pp->buf + pp->buf_tail, buf, temp_write);
31076404edcSAsim Jamshed 		memcpy(pp->buf, buf + temp_write, to_write - temp_write);
31176404edcSAsim Jamshed 		pp->buf_tail = to_write - temp_write;
31276404edcSAsim Jamshed 	}
31376404edcSAsim Jamshed 	pp->buf_len += to_write;
31476404edcSAsim Jamshed 
31576404edcSAsim Jamshed 	/* notify to the pair socket for the new buffers */
31676404edcSAsim Jamshed 	if (to_notify) {
31776404edcSAsim Jamshed 		RaiseEventToPair(mtcp, socket, MOS_EPOLLIN);
31876404edcSAsim Jamshed 	}
31976404edcSAsim Jamshed 
32076404edcSAsim Jamshed 	pthread_mutex_unlock(&pp->pipe_lock);
32176404edcSAsim Jamshed 
32276404edcSAsim Jamshed 	/* if level triggered, raise event for remainig buffer */
32376404edcSAsim Jamshed 	if (pp->buf_len < pp->buf_size) {
32476404edcSAsim Jamshed 		if ((socket->epoll & MOS_EPOLLOUT) && !(socket->epoll & MOS_EPOLLET)) {
32576404edcSAsim Jamshed 			AddEpollEvent(mtcp->ep,
32676404edcSAsim Jamshed 					USR_SHADOW_EVENT_QUEUE, socket, MOS_EPOLLOUT);
32776404edcSAsim Jamshed 		}
32876404edcSAsim Jamshed 	}
32976404edcSAsim Jamshed 
33076404edcSAsim Jamshed 	return to_write;
33176404edcSAsim Jamshed }
33276404edcSAsim Jamshed /*----------------------------------------------------------------------------*/
33376404edcSAsim Jamshed int
RaisePendingPipeEvents(mctx_t mctx,int epid,int pipeid)33476404edcSAsim Jamshed RaisePendingPipeEvents(mctx_t mctx, int epid, int pipeid)
33576404edcSAsim Jamshed {
33676404edcSAsim Jamshed 	struct mtcp_epoll *ep = GetSocket(mctx, epid)->ep;
33776404edcSAsim Jamshed 	socket_map_t socket = GetSocket(mctx, pipeid);
33876404edcSAsim Jamshed 	struct pipe *pp = socket->pp;
33976404edcSAsim Jamshed 
34076404edcSAsim Jamshed 	if (!pp)
34176404edcSAsim Jamshed 		return -1;
34276404edcSAsim Jamshed 	if (pp->state < PIPE_ACTIVE)
34376404edcSAsim Jamshed 		return -1;
34476404edcSAsim Jamshed 
34576404edcSAsim Jamshed 	/* if there are payloads already read before epoll registration */
34676404edcSAsim Jamshed 	/* generate read event */
34776404edcSAsim Jamshed 	if (socket->epoll & MOS_EPOLLIN) {
34876404edcSAsim Jamshed 		if (pp->buf_len > 0) {
34976404edcSAsim Jamshed 			AddEpollEvent(ep, USR_SHADOW_EVENT_QUEUE, socket, MOS_EPOLLIN);
35076404edcSAsim Jamshed 		} else if (pp->state == PIPE_CLOSE_WAIT) {
35176404edcSAsim Jamshed 			AddEpollEvent(ep, USR_SHADOW_EVENT_QUEUE, socket, MOS_EPOLLIN);
35276404edcSAsim Jamshed 		}
35376404edcSAsim Jamshed 	}
35476404edcSAsim Jamshed 
35576404edcSAsim Jamshed 	/* same thing to the write event */
35676404edcSAsim Jamshed 	if (socket->epoll & MOS_EPOLLOUT) {
35776404edcSAsim Jamshed 		if (pp->buf_len < pp->buf_size) {
35876404edcSAsim Jamshed 			AddEpollEvent(ep, USR_SHADOW_EVENT_QUEUE, socket, MOS_EPOLLOUT);
35976404edcSAsim Jamshed 		}
36076404edcSAsim Jamshed 	}
36176404edcSAsim Jamshed 
36276404edcSAsim Jamshed 	return 0;
36376404edcSAsim Jamshed }
36476404edcSAsim Jamshed /*---------------------------------------------------------------------------*/
36576404edcSAsim Jamshed int
PipeClose(mctx_t mctx,int pipeid)36676404edcSAsim Jamshed PipeClose(mctx_t mctx, int pipeid)
36776404edcSAsim Jamshed {
36876404edcSAsim Jamshed 	mtcp_manager_t mtcp;
36976404edcSAsim Jamshed 	socket_map_t socket;
37076404edcSAsim Jamshed 	struct pipe *pp;
37176404edcSAsim Jamshed 
37276404edcSAsim Jamshed 	mtcp = GetMTCPManager(mctx);
37376404edcSAsim Jamshed 	if (!mtcp) {
37476404edcSAsim Jamshed 		return -1;
37576404edcSAsim Jamshed 	}
37676404edcSAsim Jamshed 	socket = GetSocket(mctx, pipeid);
37776404edcSAsim Jamshed 	if (!socket) {
37876404edcSAsim Jamshed 		return -1;
37976404edcSAsim Jamshed 	}
38076404edcSAsim Jamshed 	if (socket->socktype != MOS_SOCK_PIPE) {
38176404edcSAsim Jamshed 		errno = EINVAL;
38276404edcSAsim Jamshed 		return -1;
38376404edcSAsim Jamshed 	}
38476404edcSAsim Jamshed 	pp = socket->pp;
38576404edcSAsim Jamshed 	if (!pp) {
38676404edcSAsim Jamshed 		return 0;
38776404edcSAsim Jamshed 	}
38876404edcSAsim Jamshed 
38976404edcSAsim Jamshed 	if (pp->state == PIPE_CLOSED) {
39076404edcSAsim Jamshed 		return 0;
39176404edcSAsim Jamshed 	}
39276404edcSAsim Jamshed 
39376404edcSAsim Jamshed 	pthread_mutex_lock(&pp->pipe_lock);
39476404edcSAsim Jamshed 	if (pp->state == PIPE_ACTIVE) {
39576404edcSAsim Jamshed 		pp->state = PIPE_CLOSE_WAIT;
39676404edcSAsim Jamshed 		RaiseEventToPair(mtcp, socket, MOS_EPOLLIN);
39776404edcSAsim Jamshed 		pthread_mutex_unlock(&pp->pipe_lock);
39876404edcSAsim Jamshed 		return 0;
39976404edcSAsim Jamshed 	}
40076404edcSAsim Jamshed 
40176404edcSAsim Jamshed 	/* control reaches here only when PIPE_CLOSE_WAIT */
40276404edcSAsim Jamshed 
40376404edcSAsim Jamshed 	if (pp->socket[0])
40476404edcSAsim Jamshed 		pp->socket[0]->pp = NULL;
40576404edcSAsim Jamshed 	if (pp->socket[1])
40676404edcSAsim Jamshed 		pp->socket[1]->pp = NULL;
40776404edcSAsim Jamshed 
40876404edcSAsim Jamshed 	pthread_mutex_unlock(&pp->pipe_lock);
40976404edcSAsim Jamshed 
41076404edcSAsim Jamshed 	pthread_mutex_destroy(&pp->pipe_lock);
41176404edcSAsim Jamshed 	pthread_cond_destroy(&pp->pipe_cond);
41276404edcSAsim Jamshed 
41376404edcSAsim Jamshed 	free(pp->buf);
41476404edcSAsim Jamshed 
41576404edcSAsim Jamshed 	free(pp);
41676404edcSAsim Jamshed 
41776404edcSAsim Jamshed 	return 0;
41876404edcSAsim Jamshed }
41976404edcSAsim Jamshed /*---------------------------------------------------------------------------*/
420