1 #include <pthread.h>
2 #include <errno.h>
3 #include <string.h>
4
5 #include "pipe.h"
6 #include "eventpoll.h"
7 #include "tcp_stream.h"
8 #include "mtcp.h"
9 #include "debug.h"
10
11 #define PIPE_BUF_SIZE 10240
12
13 #define MAX(a, b) ((a)>(b)?(a):(b))
14 #define MIN(a, b) ((a)<(b)?(a):(b))
15
16 /*---------------------------------------------------------------------------*/
17 enum pipe_state
18 {
19 PIPE_CLOSED,
20 PIPE_ACTIVE,
21 PIPE_CLOSE_WAIT,
22 };
23 /*---------------------------------------------------------------------------*/
24 struct pipe
25 {
26 int state;
27 socket_map_t socket[2];
28
29 char *buf;
30 int buf_off;
31 int buf_tail;
32 int buf_len;
33 int buf_size;
34
35 pthread_mutex_t pipe_lock;
36 pthread_cond_t pipe_cond;
37 };
38 /*---------------------------------------------------------------------------*/
39 int
mtcp_pipe(mctx_t mctx,int pipeid[2])40 mtcp_pipe(mctx_t mctx, int pipeid[2])
41 {
42 socket_map_t socket[2];
43 struct pipe *pp;
44 int ret;
45
46 socket[0] = AllocateSocket(mctx, MOS_SOCK_PIPE);
47 if (!socket[0]) {
48 errno = ENFILE;
49 return -1;
50 }
51 socket[1] = AllocateSocket(mctx, MOS_SOCK_PIPE);
52 if (!socket[1]) {
53 FreeSocket(mctx, socket[0]->id, MOS_SOCK_PIPE);
54 errno = ENFILE;
55 return -1;
56 }
57
58 pp = (struct pipe *)calloc(1, sizeof(struct pipe));
59 if (!pp) {
60 /* errno set by calloc() */
61 FreeSocket(mctx, socket[0]->id, MOS_SOCK_PIPE);
62 FreeSocket(mctx, socket[1]->id, MOS_SOCK_PIPE);
63 return -1;
64 }
65
66 pp->buf_size = PIPE_BUF_SIZE;
67 pp->buf = (char *)malloc(pp->buf_size);
68 if (!pp->buf) {
69 /* errno set by malloc() */
70 FreeSocket(mctx, socket[0]->id, MOS_SOCK_PIPE);
71 FreeSocket(mctx, socket[1]->id, MOS_SOCK_PIPE);
72 free(pp);
73 return -1;
74 }
75
76 ret = pthread_mutex_init(&pp->pipe_lock, NULL);
77 if (ret) {
78 /* errno set by pthread_mutex_init() */
79 FreeSocket(mctx, socket[0]->id, MOS_SOCK_PIPE);
80 FreeSocket(mctx, socket[1]->id, MOS_SOCK_PIPE);
81 free(pp->buf);
82 free(pp);
83 return -1;
84
85 }
86 ret = pthread_cond_init(&pp->pipe_cond, NULL);
87 if (ret) {
88 /* errno set by pthread_cond_init() */
89 FreeSocket(mctx, socket[0]->id, MOS_SOCK_PIPE);
90 FreeSocket(mctx, socket[1]->id, MOS_SOCK_PIPE);
91 free(pp->buf);
92 pthread_mutex_destroy(&pp->pipe_lock);
93 free(pp);
94 return -1;
95 }
96
97 pp->state = PIPE_ACTIVE;
98 pp->socket[0] = socket[0];
99 pp->socket[1] = socket[1];
100 socket[0]->pp = pp;
101 socket[1]->pp = pp;
102
103 pipeid[0] = socket[0]->id;
104 pipeid[1] = socket[1]->id;
105
106 return 0;
107
108 }
109 /*---------------------------------------------------------------------------*/
110 static void
RaiseEventToPair(mtcp_manager_t mtcp,socket_map_t socket,uint32_t event)111 RaiseEventToPair(mtcp_manager_t mtcp, socket_map_t socket, uint32_t event)
112 {
113 struct pipe *pp = socket->pp;
114 socket_map_t pair_socket;
115
116 if (pp->socket[0] == socket)
117 pair_socket = pp->socket[1];
118 else
119 pair_socket = pp->socket[0];
120
121 if (pair_socket->opts & MTCP_NONBLOCK) {
122 if (pair_socket->epoll) {
123 AddEpollEvent(mtcp->ep, USR_EVENT_QUEUE, pair_socket, event);
124 }
125 } else {
126 pthread_cond_signal(&pp->pipe_cond);
127 }
128 }
129 /*---------------------------------------------------------------------------*/
130 int
PipeRead(mctx_t mctx,int pipeid,char * buf,int len)131 PipeRead(mctx_t mctx, int pipeid, char *buf, int len)
132 {
133 mtcp_manager_t mtcp;
134 socket_map_t socket;
135 struct pipe *pp;
136 int to_read;
137 int to_notify;
138 int ret;
139
140 mtcp = GetMTCPManager(mctx);
141 if (!mtcp) {
142 return -1;
143 }
144 socket = GetSocket(mctx, pipeid);
145 if (!socket) {
146 return -1;
147 }
148 if (socket->socktype != MOS_SOCK_PIPE) {
149 errno = EBADF;
150 return -1;
151 }
152 pp = socket->pp;
153 if (!pp) {
154 errno = EBADF;
155 return -1;
156 }
157 if (pp->state == PIPE_CLOSED) {
158 errno = EINVAL;
159 return -1;
160 }
161 if (pp->state == PIPE_CLOSE_WAIT && pp->buf_len == 0) {
162 return 0;
163 }
164
165 if (len <= 0) {
166 if (socket->opts & MTCP_NONBLOCK) {
167 errno = EAGAIN;
168 return -1;
169 } else {
170 return 0;
171 }
172 }
173
174 pthread_mutex_lock(&pp->pipe_lock);
175 if (!(socket->opts & MTCP_NONBLOCK)) {
176 while (pp->buf_len == 0) {
177 ret = pthread_cond_wait(&pp->pipe_cond, &pp->pipe_lock);
178 if (ret) {
179 /* errno set by pthread_cond_wait() */
180 pthread_mutex_unlock(&pp->pipe_lock);
181 return -1;
182 }
183 }
184 }
185
186 to_read = MIN(len, pp->buf_len);
187 if (to_read <= 0) {
188 pthread_mutex_unlock(&pp->pipe_lock);
189 if (pp->state == PIPE_ACTIVE) {
190 errno = EAGAIN;
191 return -1;
192 } else if (pp->state == PIPE_CLOSE_WAIT) {
193 return 0;
194 }
195 }
196
197 /* if the buffer was full, notify the write event to the pair socket */
198 to_notify = FALSE;
199 if (pp->buf_len == pp->buf_size)
200 to_notify = TRUE;
201
202 if (pp->buf_off + to_read < pp->buf_size) {
203 memcpy(buf, pp->buf + pp->buf_off, to_read);
204 pp->buf_off += to_read;
205 } else {
206 int temp_read = pp->buf_size - pp->buf_off;
207 memcpy(buf, pp->buf + pp->buf_off, temp_read);
208 memcpy(buf + temp_read, pp->buf, to_read - temp_read);
209 pp->buf_off = to_read - temp_read;
210 }
211 pp->buf_len -= to_read;
212
213 /* notify to the pair socket for new buffer space */
214 if (to_notify) {
215 RaiseEventToPair(mtcp, socket, MOS_EPOLLOUT);
216 }
217
218 pthread_mutex_unlock(&pp->pipe_lock);
219
220 /* if level triggered, raise event for remainig buffer */
221 if (pp->buf_len > 0) {
222 if ((socket->epoll & MOS_EPOLLIN) && !(socket->epoll & MOS_EPOLLET)) {
223 AddEpollEvent(mtcp->ep,
224 USR_SHADOW_EVENT_QUEUE, socket, MOS_EPOLLIN);
225 }
226 } else if (pp->state == PIPE_CLOSE_WAIT && pp->buf_len == 0) {
227 AddEpollEvent(mtcp->ep, USR_SHADOW_EVENT_QUEUE, socket, MOS_EPOLLIN);
228 }
229
230 return to_read;
231 }
232 /*---------------------------------------------------------------------------*/
233 int
PipeWrite(mctx_t mctx,int pipeid,const char * buf,int len)234 PipeWrite(mctx_t mctx, int pipeid, const char *buf, int len)
235 {
236 mtcp_manager_t mtcp;
237 socket_map_t socket;
238 struct pipe *pp;
239 int to_write;
240 int to_notify;
241 int ret;
242
243 mtcp = GetMTCPManager(mctx);
244 if (!mtcp) {
245 return -1;
246 }
247 socket = GetSocket(mctx, pipeid);
248 if (!socket) {
249 return -1;
250 }
251 if (socket->socktype != MOS_SOCK_PIPE) {
252 errno = EBADF;
253 return -1;
254 }
255 pp = socket->pp;
256 if (!pp) {
257 errno = EBADF;
258 return -1;
259 }
260 if (pp->state == PIPE_CLOSED) {
261 errno = EINVAL;
262 return -1;
263 }
264 if (pp->state == PIPE_CLOSE_WAIT) {
265 errno = EPIPE;
266 return -1;
267 }
268
269 if (len <= 0) {
270 if (socket->opts & MTCP_NONBLOCK) {
271 errno = EAGAIN;
272 return -1;
273 } else {
274 return 0;
275 }
276 }
277
278 pthread_mutex_lock(&pp->pipe_lock);
279 if (!(socket->opts & MTCP_NONBLOCK)) {
280 while (pp->buf_len == pp->buf_size) {
281 ret = pthread_cond_wait(&pp->pipe_cond, &pp->pipe_lock);
282 if (ret) {
283 /* errno set by pthread_cond_wait() */
284 pthread_mutex_unlock(&pp->pipe_lock);
285 return -1;
286 }
287 }
288 }
289
290 to_write = MIN(len, pp->buf_size - pp->buf_len);
291 if (to_write <= 0) {
292 pthread_mutex_unlock(&pp->pipe_lock);
293 errno = EAGAIN;
294 return -1;
295 }
296
297 /* if the buffer was empty, notify read event to the pair socket */
298 to_notify = FALSE;
299 if (pp->buf_len == 0)
300 to_notify = TRUE;
301
302 if (pp->buf_tail + to_write < pp->buf_size) {
303 /* if the data fit into the buffer, copy it */
304 memcpy(pp->buf + pp->buf_tail, buf, to_write);
305 pp->buf_tail += to_write;
306 } else {
307 /* if the data overflow the buffer, wrap around the buffer */
308 int temp_write = pp->buf_size - pp->buf_tail;
309 memcpy(pp->buf + pp->buf_tail, buf, temp_write);
310 memcpy(pp->buf, buf + temp_write, to_write - temp_write);
311 pp->buf_tail = to_write - temp_write;
312 }
313 pp->buf_len += to_write;
314
315 /* notify to the pair socket for the new buffers */
316 if (to_notify) {
317 RaiseEventToPair(mtcp, socket, MOS_EPOLLIN);
318 }
319
320 pthread_mutex_unlock(&pp->pipe_lock);
321
322 /* if level triggered, raise event for remainig buffer */
323 if (pp->buf_len < pp->buf_size) {
324 if ((socket->epoll & MOS_EPOLLOUT) && !(socket->epoll & MOS_EPOLLET)) {
325 AddEpollEvent(mtcp->ep,
326 USR_SHADOW_EVENT_QUEUE, socket, MOS_EPOLLOUT);
327 }
328 }
329
330 return to_write;
331 }
332 /*----------------------------------------------------------------------------*/
333 int
RaisePendingPipeEvents(mctx_t mctx,int epid,int pipeid)334 RaisePendingPipeEvents(mctx_t mctx, int epid, int pipeid)
335 {
336 struct mtcp_epoll *ep = GetSocket(mctx, epid)->ep;
337 socket_map_t socket = GetSocket(mctx, pipeid);
338 struct pipe *pp = socket->pp;
339
340 if (!pp)
341 return -1;
342 if (pp->state < PIPE_ACTIVE)
343 return -1;
344
345 /* if there are payloads already read before epoll registration */
346 /* generate read event */
347 if (socket->epoll & MOS_EPOLLIN) {
348 if (pp->buf_len > 0) {
349 AddEpollEvent(ep, USR_SHADOW_EVENT_QUEUE, socket, MOS_EPOLLIN);
350 } else if (pp->state == PIPE_CLOSE_WAIT) {
351 AddEpollEvent(ep, USR_SHADOW_EVENT_QUEUE, socket, MOS_EPOLLIN);
352 }
353 }
354
355 /* same thing to the write event */
356 if (socket->epoll & MOS_EPOLLOUT) {
357 if (pp->buf_len < pp->buf_size) {
358 AddEpollEvent(ep, USR_SHADOW_EVENT_QUEUE, socket, MOS_EPOLLOUT);
359 }
360 }
361
362 return 0;
363 }
364 /*---------------------------------------------------------------------------*/
365 int
PipeClose(mctx_t mctx,int pipeid)366 PipeClose(mctx_t mctx, int pipeid)
367 {
368 mtcp_manager_t mtcp;
369 socket_map_t socket;
370 struct pipe *pp;
371
372 mtcp = GetMTCPManager(mctx);
373 if (!mtcp) {
374 return -1;
375 }
376 socket = GetSocket(mctx, pipeid);
377 if (!socket) {
378 return -1;
379 }
380 if (socket->socktype != MOS_SOCK_PIPE) {
381 errno = EINVAL;
382 return -1;
383 }
384 pp = socket->pp;
385 if (!pp) {
386 return 0;
387 }
388
389 if (pp->state == PIPE_CLOSED) {
390 return 0;
391 }
392
393 pthread_mutex_lock(&pp->pipe_lock);
394 if (pp->state == PIPE_ACTIVE) {
395 pp->state = PIPE_CLOSE_WAIT;
396 RaiseEventToPair(mtcp, socket, MOS_EPOLLIN);
397 pthread_mutex_unlock(&pp->pipe_lock);
398 return 0;
399 }
400
401 /* control reaches here only when PIPE_CLOSE_WAIT */
402
403 if (pp->socket[0])
404 pp->socket[0]->pp = NULL;
405 if (pp->socket[1])
406 pp->socket[1]->pp = NULL;
407
408 pthread_mutex_unlock(&pp->pipe_lock);
409
410 pthread_mutex_destroy(&pp->pipe_lock);
411 pthread_cond_destroy(&pp->pipe_cond);
412
413 free(pp->buf);
414
415 free(pp);
416
417 return 0;
418 }
419 /*---------------------------------------------------------------------------*/
420