xref: /mOS-networking-stack/core/src/pipe.c (revision d8823779)
1 #include <pthread.h>
2 #include <errno.h>
3 #include <string.h>
4 
5 #include "pipe.h"
6 #include "eventpoll.h"
7 #include "tcp_stream.h"
8 #include "mtcp.h"
9 #include "debug.h"
10 
/* capacity in bytes of the buffer that mtcp_pipe() allocates for each pipe */
#define PIPE_BUF_SIZE 10240

/*
 * Function-like helpers: each argument may be evaluated twice, so do not
 * pass expressions with side effects.
 */
#define MAX(a, b) ((a)>(b)?(a):(b))
#define MIN(a, b) ((a)<(b)?(a):(b))
15 
16 /*---------------------------------------------------------------------------*/
/*
 * Lifecycle states of a pipe. The numeric order is significant:
 * RaisePendingPipeEvents() treats any state below PIPE_ACTIVE as unusable.
 */
enum pipe_state
{
	PIPE_CLOSED = 0,	/* read/write on the pipe fail with EINVAL */
	PIPE_ACTIVE = 1,	/* set by mtcp_pipe() once the pipe is ready */
	PIPE_CLOSE_WAIT = 2,	/* set by the first PipeClose() on the pipe */
};
23 /*---------------------------------------------------------------------------*/
24 struct pipe
25 {
26 	int state;
27 	socket_map_t socket[2];
28 
29 	char *buf;
30 	int buf_off;
31 	int buf_tail;
32 	int buf_len;
33 	int buf_size;
34 
35 	pthread_mutex_t pipe_lock;
36 	pthread_cond_t pipe_cond;
37 };
38 /*---------------------------------------------------------------------------*/
39 int
40 mtcp_pipe(mctx_t mctx, int pipeid[2])
41 {
42 	socket_map_t socket[2];
43 	struct pipe *pp;
44 	int ret;
45 
46 	socket[0] = AllocateSocket(mctx, MOS_SOCK_PIPE);
47 	if (!socket[0]) {
48 		errno = ENFILE;
49 		return -1;
50 	}
51 	socket[1] = AllocateSocket(mctx, MOS_SOCK_PIPE);
52 	if (!socket[1]) {
53 		FreeSocket(mctx, socket[0]->id, MOS_SOCK_PIPE);
54 		errno = ENFILE;
55 		return -1;
56 	}
57 
58 	pp = (struct pipe *)calloc(1, sizeof(struct pipe));
59 	if (!pp) {
60 		/* errno set by calloc() */
61 		FreeSocket(mctx, socket[0]->id, MOS_SOCK_PIPE);
62 		FreeSocket(mctx, socket[1]->id, MOS_SOCK_PIPE);
63 		return -1;
64 	}
65 
66 	pp->buf_size = PIPE_BUF_SIZE;
67 	pp->buf = (char *)malloc(pp->buf_size);
68 	if (!pp->buf) {
69 		/* errno set by malloc() */
70 		FreeSocket(mctx, socket[0]->id, MOS_SOCK_PIPE);
71 		FreeSocket(mctx, socket[1]->id, MOS_SOCK_PIPE);
72 		free(pp);
73 		return -1;
74 	}
75 
76 	ret = pthread_mutex_init(&pp->pipe_lock, NULL);
77 	if (ret) {
78 		/* errno set by pthread_mutex_init() */
79 		FreeSocket(mctx, socket[0]->id, MOS_SOCK_PIPE);
80 		FreeSocket(mctx, socket[1]->id, MOS_SOCK_PIPE);
81 		free(pp->buf);
82 		free(pp);
83 		return -1;
84 
85 	}
86 	ret = pthread_cond_init(&pp->pipe_cond, NULL);
87 	if (ret) {
88 		/* errno set by pthread_cond_init() */
89 		FreeSocket(mctx, socket[0]->id, MOS_SOCK_PIPE);
90 		FreeSocket(mctx, socket[1]->id, MOS_SOCK_PIPE);
91 		free(pp->buf);
92 		pthread_mutex_destroy(&pp->pipe_lock);
93 		free(pp);
94 		return -1;
95 	}
96 
97 	pp->state = PIPE_ACTIVE;
98 	pp->socket[0] = socket[0];
99 	pp->socket[1] = socket[1];
100 	socket[0]->pp = pp;
101 	socket[1]->pp = pp;
102 
103 	pipeid[0] = socket[0]->id;
104 	pipeid[1] = socket[1]->id;
105 
106 	return 0;
107 
108 }
109 /*---------------------------------------------------------------------------*/
110 static void
111 RaiseEventToPair(mtcp_manager_t mtcp, socket_map_t socket, uint32_t event)
112 {
113 	struct pipe *pp = socket->pp;
114 	socket_map_t pair_socket;
115 
116 	if (pp->socket[0] == socket)
117 		pair_socket = pp->socket[1];
118 	else
119 		pair_socket = pp->socket[0];
120 
121 	if (pair_socket->opts & MTCP_NONBLOCK) {
122 		if (pair_socket->epoll) {
123 			AddEpollEvent(mtcp->ep, USR_EVENT_QUEUE, pair_socket, event);
124 		}
125 	} else {
126 		pthread_cond_signal(&pp->pipe_cond);
127 	}
128 }
129 /*---------------------------------------------------------------------------*/
130 int
131 PipeRead(mctx_t mctx, int pipeid, char *buf, int len)
132 {
133 	mtcp_manager_t mtcp;
134 	socket_map_t socket;
135 	struct pipe *pp;
136 	int to_read;
137 	int to_notify;
138 	int ret;
139 
140 	mtcp = GetMTCPManager(mctx);
141 	if (!mtcp) {
142 		return -1;
143 	}
144 	socket = GetSocket(mctx, pipeid);
145 	if (!socket) {
146 		return -1;
147 	}
148 	if (socket->socktype != MOS_SOCK_PIPE) {
149 		errno = EBADF;
150 		return -1;
151 	}
152 	pp = socket->pp;
153 	if (!pp) {
154 		errno = EBADF;
155 		return -1;
156 	}
157 	if (pp->state == PIPE_CLOSED) {
158 		errno = EINVAL;
159 		return -1;
160 	}
161 	if (pp->state == PIPE_CLOSE_WAIT && pp->buf_len == 0) {
162 		return 0;
163 	}
164 
165 	if (len <= 0) {
166 		if (socket->opts & MTCP_NONBLOCK) {
167 			errno = EAGAIN;
168 			return -1;
169 		} else {
170 			return 0;
171 		}
172 	}
173 
174 	pthread_mutex_lock(&pp->pipe_lock);
175 	if (!(socket->opts & MTCP_NONBLOCK)) {
176 		while (pp->buf_len == 0) {
177 			ret = pthread_cond_wait(&pp->pipe_cond, &pp->pipe_lock);
178 			if (ret) {
179 				/* errno set by pthread_cond_wait() */
180 				pthread_mutex_unlock(&pp->pipe_lock);
181 				return -1;
182 			}
183 		}
184 	}
185 
186 	to_read = MIN(len, pp->buf_len);
187 	if (to_read <= 0) {
188 		pthread_mutex_unlock(&pp->pipe_lock);
189 		if (pp->state == PIPE_ACTIVE) {
190 			errno = EAGAIN;
191 			return -1;
192 		} else if (pp->state == PIPE_CLOSE_WAIT) {
193 			return 0;
194 		}
195 	}
196 
197 	/* if the buffer was full, notify the write event to the pair socket */
198 	to_notify = FALSE;
199 	if (pp->buf_len == pp->buf_size)
200 		to_notify = TRUE;
201 
202 	if (pp->buf_off + to_read < pp->buf_size) {
203 		memcpy(buf, pp->buf + pp->buf_off, to_read);
204 		pp->buf_off += to_read;
205 	} else {
206 		int temp_read = pp->buf_size - pp->buf_off;
207 		memcpy(buf, pp->buf + pp->buf_off, temp_read);
208 		memcpy(buf + temp_read, pp->buf, to_read - temp_read);
209 		pp->buf_off = to_read - temp_read;
210 	}
211 	pp->buf_len -= to_read;
212 
213 	/* notify to the pair socket for new buffer space */
214 	if (to_notify) {
215 		RaiseEventToPair(mtcp, socket, MOS_EPOLLOUT);
216 	}
217 
218 	pthread_mutex_unlock(&pp->pipe_lock);
219 
220 	/* if level triggered, raise event for remainig buffer */
221 	if (pp->buf_len > 0) {
222 		if ((socket->epoll & MOS_EPOLLIN) && !(socket->epoll & MOS_EPOLLET)) {
223 			AddEpollEvent(mtcp->ep,
224 					USR_SHADOW_EVENT_QUEUE, socket, MOS_EPOLLIN);
225 		}
226 	} else if (pp->state == PIPE_CLOSE_WAIT && pp->buf_len == 0) {
227 		AddEpollEvent(mtcp->ep, USR_SHADOW_EVENT_QUEUE, socket, MOS_EPOLLIN);
228 	}
229 
230 	return to_read;
231 }
232 /*---------------------------------------------------------------------------*/
233 int
234 PipeWrite(mctx_t mctx, int pipeid, const char *buf, int len)
235 {
236 	mtcp_manager_t mtcp;
237 	socket_map_t socket;
238 	struct pipe *pp;
239 	int to_write;
240 	int to_notify;
241 	int ret;
242 
243 	mtcp = GetMTCPManager(mctx);
244 	if (!mtcp) {
245 		return -1;
246 	}
247 	socket = GetSocket(mctx, pipeid);
248 	if (!socket) {
249 		return -1;
250 	}
251 	if (socket->socktype != MOS_SOCK_PIPE) {
252 		errno = EBADF;
253 		return -1;
254 	}
255 	pp = socket->pp;
256 	if (!pp) {
257 		errno = EBADF;
258 		return -1;
259 	}
260 	if (pp->state == PIPE_CLOSED) {
261 		errno = EINVAL;
262 		return -1;
263 	}
264 	if (pp->state == PIPE_CLOSE_WAIT) {
265 		errno = EPIPE;
266 		return -1;
267 	}
268 
269 	if (len <= 0) {
270 		if (socket->opts & MTCP_NONBLOCK) {
271 			errno = EAGAIN;
272 			return -1;
273 		} else {
274 			return 0;
275 		}
276 	}
277 
278 	pthread_mutex_lock(&pp->pipe_lock);
279 	if (!(socket->opts & MTCP_NONBLOCK)) {
280 		while (pp->buf_len == pp->buf_size) {
281 			ret = pthread_cond_wait(&pp->pipe_cond, &pp->pipe_lock);
282 			if (ret) {
283 				/* errno set by pthread_cond_wait() */
284 				pthread_mutex_unlock(&pp->pipe_lock);
285 				return -1;
286 			}
287 		}
288 	}
289 
290 	to_write = MIN(len, pp->buf_size - pp->buf_len);
291 	if (to_write <= 0) {
292 		pthread_mutex_unlock(&pp->pipe_lock);
293 		errno = EAGAIN;
294 		return -1;
295 	}
296 
297 	/* if the buffer was empty, notify read event to the pair socket */
298 	to_notify = FALSE;
299 	if (pp->buf_len == 0)
300 		to_notify = TRUE;
301 
302 	if (pp->buf_tail + to_write < pp->buf_size) {
303 		/* if the data fit into the buffer, copy it */
304 		memcpy(pp->buf + pp->buf_tail, buf, to_write);
305 		pp->buf_tail += to_write;
306 	} else {
307 		/* if the data overflow the buffer, wrap around the buffer */
308 		int temp_write = pp->buf_size - pp->buf_tail;
309 		memcpy(pp->buf + pp->buf_tail, buf, temp_write);
310 		memcpy(pp->buf, buf + temp_write, to_write - temp_write);
311 		pp->buf_tail = to_write - temp_write;
312 	}
313 	pp->buf_len += to_write;
314 
315 	/* notify to the pair socket for the new buffers */
316 	if (to_notify) {
317 		RaiseEventToPair(mtcp, socket, MOS_EPOLLIN);
318 	}
319 
320 	pthread_mutex_unlock(&pp->pipe_lock);
321 
322 	/* if level triggered, raise event for remainig buffer */
323 	if (pp->buf_len < pp->buf_size) {
324 		if ((socket->epoll & MOS_EPOLLOUT) && !(socket->epoll & MOS_EPOLLET)) {
325 			AddEpollEvent(mtcp->ep,
326 					USR_SHADOW_EVENT_QUEUE, socket, MOS_EPOLLOUT);
327 		}
328 	}
329 
330 	return to_write;
331 }
332 /*----------------------------------------------------------------------------*/
333 int
334 RaisePendingPipeEvents(mctx_t mctx, int epid, int pipeid)
335 {
336 	struct mtcp_epoll *ep = GetSocket(mctx, epid)->ep;
337 	socket_map_t socket = GetSocket(mctx, pipeid);
338 	struct pipe *pp = socket->pp;
339 
340 	if (!pp)
341 		return -1;
342 	if (pp->state < PIPE_ACTIVE)
343 		return -1;
344 
345 	/* if there are payloads already read before epoll registration */
346 	/* generate read event */
347 	if (socket->epoll & MOS_EPOLLIN) {
348 		if (pp->buf_len > 0) {
349 			AddEpollEvent(ep, USR_SHADOW_EVENT_QUEUE, socket, MOS_EPOLLIN);
350 		} else if (pp->state == PIPE_CLOSE_WAIT) {
351 			AddEpollEvent(ep, USR_SHADOW_EVENT_QUEUE, socket, MOS_EPOLLIN);
352 		}
353 	}
354 
355 	/* same thing to the write event */
356 	if (socket->epoll & MOS_EPOLLOUT) {
357 		if (pp->buf_len < pp->buf_size) {
358 			AddEpollEvent(ep, USR_SHADOW_EVENT_QUEUE, socket, MOS_EPOLLOUT);
359 		}
360 	}
361 
362 	return 0;
363 }
364 /*---------------------------------------------------------------------------*/
365 int
366 PipeClose(mctx_t mctx, int pipeid)
367 {
368 	mtcp_manager_t mtcp;
369 	socket_map_t socket;
370 	struct pipe *pp;
371 
372 	mtcp = GetMTCPManager(mctx);
373 	if (!mtcp) {
374 		return -1;
375 	}
376 	socket = GetSocket(mctx, pipeid);
377 	if (!socket) {
378 		return -1;
379 	}
380 	if (socket->socktype != MOS_SOCK_PIPE) {
381 		errno = EINVAL;
382 		return -1;
383 	}
384 	pp = socket->pp;
385 	if (!pp) {
386 		return 0;
387 	}
388 
389 	if (pp->state == PIPE_CLOSED) {
390 		return 0;
391 	}
392 
393 	pthread_mutex_lock(&pp->pipe_lock);
394 	if (pp->state == PIPE_ACTIVE) {
395 		pp->state = PIPE_CLOSE_WAIT;
396 		RaiseEventToPair(mtcp, socket, MOS_EPOLLIN);
397 		pthread_mutex_unlock(&pp->pipe_lock);
398 		return 0;
399 	}
400 
401 	/* control reaches here only when PIPE_CLOSE_WAIT */
402 
403 	if (pp->socket[0])
404 		pp->socket[0]->pp = NULL;
405 	if (pp->socket[1])
406 		pp->socket[1]->pp = NULL;
407 
408 	pthread_mutex_unlock(&pp->pipe_lock);
409 
410 	pthread_mutex_destroy(&pp->pipe_lock);
411 	pthread_cond_destroy(&pp->pipe_cond);
412 
413 	free(pp->buf);
414 
415 	free(pp);
416 
417 	return 0;
418 }
419 /*---------------------------------------------------------------------------*/
420