1 
2 /*
3  * Copyright (C) Igor Sysoev
4  * Copyright (C) Nginx, Inc.
5  */
6 
7 
8 #include <ngx_config.h>
9 #include <ngx_core.h>
10 #include <ngx_event.h>
11 #include <ngx_event_pipe.h>
12 
13 
14 static ngx_int_t ngx_event_pipe_read_upstream(ngx_event_pipe_t *p);
15 static ngx_int_t ngx_event_pipe_write_to_downstream(ngx_event_pipe_t *p);
16 
17 static ngx_int_t ngx_event_pipe_write_chain_to_temp_file(ngx_event_pipe_t *p);
18 static ngx_inline void ngx_event_pipe_remove_shadow_links(ngx_buf_t *buf);
19 static ngx_int_t ngx_event_pipe_drain_chains(ngx_event_pipe_t *p);
20 
21 
22 ngx_int_t
ngx_event_pipe(ngx_event_pipe_t * p,ngx_int_t do_write)23 ngx_event_pipe(ngx_event_pipe_t *p, ngx_int_t do_write)
24 {
25     ngx_int_t     rc;
26     ngx_uint_t    flags;
27     ngx_event_t  *rev, *wev;
28 
29     for ( ;; ) {
30         if (do_write) {
31             p->log->action = "sending to client";
32 
33             rc = ngx_event_pipe_write_to_downstream(p);
34 
35             if (rc == NGX_ABORT) {
36                 return NGX_ABORT;
37             }
38 
39             if (rc == NGX_BUSY) {
40                 return NGX_OK;
41             }
42         }
43 
44         p->read = 0;
45         p->upstream_blocked = 0;
46 
47         p->log->action = "reading upstream";
48 
49         if (ngx_event_pipe_read_upstream(p) == NGX_ABORT) {
50             return NGX_ABORT;
51         }
52 
53         if (!p->read && !p->upstream_blocked) {
54             break;
55         }
56 
57         do_write = 1;
58     }
59 
60     if (p->upstream->fd != (ngx_socket_t) -1) {
61         rev = p->upstream->read;
62 
63         flags = (rev->eof || rev->error) ? NGX_CLOSE_EVENT : 0;
64 
65         if (ngx_handle_read_event(rev, flags) != NGX_OK) {
66             return NGX_ABORT;
67         }
68 
69         if (!rev->delayed) {
70             if (rev->active && !rev->ready) {
71                 ngx_add_timer(rev, p->read_timeout);
72 
73             } else if (rev->timer_set) {
74                 ngx_del_timer(rev);
75             }
76         }
77     }
78 
79     if (p->downstream->fd != (ngx_socket_t) -1
80         && p->downstream->data == p->output_ctx)
81     {
82         wev = p->downstream->write;
83         if (ngx_handle_write_event(wev, p->send_lowat) != NGX_OK) {
84             return NGX_ABORT;
85         }
86 
87         if (!wev->delayed) {
88             if (wev->active && !wev->ready) {
89                 ngx_add_timer(wev, p->send_timeout);
90 
91             } else if (wev->timer_set) {
92                 ngx_del_timer(wev);
93             }
94         }
95     }
96 
97     return NGX_OK;
98 }
99 
100 
101 static ngx_int_t
ngx_event_pipe_read_upstream(ngx_event_pipe_t * p)102 ngx_event_pipe_read_upstream(ngx_event_pipe_t *p)
103 {
104     off_t         limit;
105     ssize_t       n, size;
106     ngx_int_t     rc;
107     ngx_buf_t    *b;
108     ngx_msec_t    delay;
109     ngx_chain_t  *chain, *cl, *ln;
110 
111     if (p->upstream_eof || p->upstream_error || p->upstream_done) {
112         return NGX_OK;
113     }
114 
115 #if (NGX_THREADS)
116 
117     if (p->aio) {
118         ngx_log_debug0(NGX_LOG_DEBUG_EVENT, p->log, 0,
119                        "pipe read upstream: aio");
120         return NGX_AGAIN;
121     }
122 
123     if (p->writing) {
124         ngx_log_debug0(NGX_LOG_DEBUG_EVENT, p->log, 0,
125                        "pipe read upstream: writing");
126 
127         rc = ngx_event_pipe_write_chain_to_temp_file(p);
128 
129         if (rc != NGX_OK) {
130             return rc;
131         }
132     }
133 
134 #endif
135 
136     ngx_log_debug1(NGX_LOG_DEBUG_EVENT, p->log, 0,
137                    "pipe read upstream: %d", p->upstream->read->ready);
138 
139     for ( ;; ) {
140 
141         if (p->upstream_eof || p->upstream_error || p->upstream_done) {
142             break;
143         }
144 
145         if (p->preread_bufs == NULL && !p->upstream->read->ready) {
146             break;
147         }
148 
149         if (p->preread_bufs) {
150 
151             /* use the pre-read bufs if they exist */
152 
153             chain = p->preread_bufs;
154             p->preread_bufs = NULL;
155             n = p->preread_size;
156 
157             ngx_log_debug1(NGX_LOG_DEBUG_EVENT, p->log, 0,
158                            "pipe preread: %z", n);
159 
160             if (n) {
161                 p->read = 1;
162             }
163 
164         } else {
165 
166 #if (NGX_HAVE_KQUEUE) || (NGX_HAVE_FSTACK)
167 
168             /*
169              * kqueue notifies about the end of file or a pending error.
170              * This test allows not to allocate a buf on these conditions
171              * and not to call c->recv_chain().
172              */
173 
174             if (p->upstream->read->available == 0
175                 && p->upstream->read->pending_eof)
176             {
177                 p->upstream->read->ready = 0;
178                 p->upstream->read->eof = 1;
179                 p->upstream_eof = 1;
180                 p->read = 1;
181 
182                 if (p->upstream->read->kq_errno) {
183                     p->upstream->read->error = 1;
184                     p->upstream_error = 1;
185                     p->upstream_eof = 0;
186 
187                     ngx_log_error(NGX_LOG_ERR, p->log,
188                                   p->upstream->read->kq_errno,
189                                   "kevent() reported that upstream "
190                                   "closed connection");
191                 }
192 
193                 break;
194             }
195 #endif
196 
197             if (p->limit_rate) {
198                 if (p->upstream->read->delayed) {
199                     break;
200                 }
201 
202                 limit = (off_t) p->limit_rate * (ngx_time() - p->start_sec + 1)
203                         - p->read_length;
204 
205                 if (limit <= 0) {
206                     p->upstream->read->delayed = 1;
207                     delay = (ngx_msec_t) (- limit * 1000 / p->limit_rate + 1);
208                     ngx_add_timer(p->upstream->read, delay);
209                     break;
210                 }
211 
212             } else {
213                 limit = 0;
214             }
215 
216             if (p->free_raw_bufs) {
217 
218                 /* use the free bufs if they exist */
219 
220                 chain = p->free_raw_bufs;
221                 if (p->single_buf) {
222                     p->free_raw_bufs = p->free_raw_bufs->next;
223                     chain->next = NULL;
224                 } else {
225                     p->free_raw_bufs = NULL;
226                 }
227 
228             } else if (p->allocated < p->bufs.num) {
229 
230                 /* allocate a new buf if it's still allowed */
231 
232                 b = ngx_create_temp_buf(p->pool, p->bufs.size);
233                 if (b == NULL) {
234                     return NGX_ABORT;
235                 }
236 
237                 p->allocated++;
238 
239                 chain = ngx_alloc_chain_link(p->pool);
240                 if (chain == NULL) {
241                     return NGX_ABORT;
242                 }
243 
244                 chain->buf = b;
245                 chain->next = NULL;
246 
247             } else if (!p->cacheable
248                        && p->downstream->data == p->output_ctx
249                        && p->downstream->write->ready
250                        && !p->downstream->write->delayed)
251             {
252                 /*
253                  * if the bufs are not needed to be saved in a cache and
254                  * a downstream is ready then write the bufs to a downstream
255                  */
256 
257                 p->upstream_blocked = 1;
258 
259                 ngx_log_debug0(NGX_LOG_DEBUG_EVENT, p->log, 0,
260                                "pipe downstream ready");
261 
262                 break;
263 
264             } else if (p->cacheable
265                        || p->temp_file->offset < p->max_temp_file_size)
266             {
267 
268                 /*
269                  * if it is allowed, then save some bufs from p->in
270                  * to a temporary file, and add them to a p->out chain
271                  */
272 
273                 rc = ngx_event_pipe_write_chain_to_temp_file(p);
274 
275                 ngx_log_debug1(NGX_LOG_DEBUG_EVENT, p->log, 0,
276                                "pipe temp offset: %O", p->temp_file->offset);
277 
278                 if (rc == NGX_BUSY) {
279                     break;
280                 }
281 
282                 if (rc != NGX_OK) {
283                     return rc;
284                 }
285 
286                 chain = p->free_raw_bufs;
287                 if (p->single_buf) {
288                     p->free_raw_bufs = p->free_raw_bufs->next;
289                     chain->next = NULL;
290                 } else {
291                     p->free_raw_bufs = NULL;
292                 }
293 
294             } else {
295 
296                 /* there are no bufs to read in */
297 
298                 ngx_log_debug0(NGX_LOG_DEBUG_EVENT, p->log, 0,
299                                "no pipe bufs to read in");
300 
301                 break;
302             }
303 
304             n = p->upstream->recv_chain(p->upstream, chain, limit);
305 
306             ngx_log_debug1(NGX_LOG_DEBUG_EVENT, p->log, 0,
307                            "pipe recv chain: %z", n);
308 
309             if (p->free_raw_bufs) {
310                 chain->next = p->free_raw_bufs;
311             }
312             p->free_raw_bufs = chain;
313 
314             if (n == NGX_ERROR) {
315                 p->upstream_error = 1;
316                 break;
317             }
318 
319             if (n == NGX_AGAIN) {
320                 if (p->single_buf) {
321                     ngx_event_pipe_remove_shadow_links(chain->buf);
322                 }
323 
324                 break;
325             }
326 
327             p->read = 1;
328 
329             if (n == 0) {
330                 p->upstream_eof = 1;
331                 break;
332             }
333         }
334 
335         delay = p->limit_rate ? (ngx_msec_t) n * 1000 / p->limit_rate : 0;
336 
337         p->read_length += n;
338         cl = chain;
339         p->free_raw_bufs = NULL;
340 
341         while (cl && n > 0) {
342 
343             ngx_event_pipe_remove_shadow_links(cl->buf);
344 
345             size = cl->buf->end - cl->buf->last;
346 
347             if (n >= size) {
348                 cl->buf->last = cl->buf->end;
349 
350                 /* STUB */ cl->buf->num = p->num++;
351 
352                 if (p->input_filter(p, cl->buf) == NGX_ERROR) {
353                     return NGX_ABORT;
354                 }
355 
356                 n -= size;
357                 ln = cl;
358                 cl = cl->next;
359                 ngx_free_chain(p->pool, ln);
360 
361             } else {
362                 cl->buf->last += n;
363                 n = 0;
364             }
365         }
366 
367         if (cl) {
368             for (ln = cl; ln->next; ln = ln->next) { /* void */ }
369 
370             ln->next = p->free_raw_bufs;
371             p->free_raw_bufs = cl;
372         }
373 
374         if (delay > 0) {
375             p->upstream->read->delayed = 1;
376             ngx_add_timer(p->upstream->read, delay);
377             break;
378         }
379     }
380 
381 #if (NGX_DEBUG)
382 
383     for (cl = p->busy; cl; cl = cl->next) {
384         ngx_log_debug8(NGX_LOG_DEBUG_EVENT, p->log, 0,
385                        "pipe buf busy s:%d t:%d f:%d "
386                        "%p, pos %p, size: %z "
387                        "file: %O, size: %O",
388                        (cl->buf->shadow ? 1 : 0),
389                        cl->buf->temporary, cl->buf->in_file,
390                        cl->buf->start, cl->buf->pos,
391                        cl->buf->last - cl->buf->pos,
392                        cl->buf->file_pos,
393                        cl->buf->file_last - cl->buf->file_pos);
394     }
395 
396     for (cl = p->out; cl; cl = cl->next) {
397         ngx_log_debug8(NGX_LOG_DEBUG_EVENT, p->log, 0,
398                        "pipe buf out  s:%d t:%d f:%d "
399                        "%p, pos %p, size: %z "
400                        "file: %O, size: %O",
401                        (cl->buf->shadow ? 1 : 0),
402                        cl->buf->temporary, cl->buf->in_file,
403                        cl->buf->start, cl->buf->pos,
404                        cl->buf->last - cl->buf->pos,
405                        cl->buf->file_pos,
406                        cl->buf->file_last - cl->buf->file_pos);
407     }
408 
409     for (cl = p->in; cl; cl = cl->next) {
410         ngx_log_debug8(NGX_LOG_DEBUG_EVENT, p->log, 0,
411                        "pipe buf in   s:%d t:%d f:%d "
412                        "%p, pos %p, size: %z "
413                        "file: %O, size: %O",
414                        (cl->buf->shadow ? 1 : 0),
415                        cl->buf->temporary, cl->buf->in_file,
416                        cl->buf->start, cl->buf->pos,
417                        cl->buf->last - cl->buf->pos,
418                        cl->buf->file_pos,
419                        cl->buf->file_last - cl->buf->file_pos);
420     }
421 
422     for (cl = p->free_raw_bufs; cl; cl = cl->next) {
423         ngx_log_debug8(NGX_LOG_DEBUG_EVENT, p->log, 0,
424                        "pipe buf free s:%d t:%d f:%d "
425                        "%p, pos %p, size: %z "
426                        "file: %O, size: %O",
427                        (cl->buf->shadow ? 1 : 0),
428                        cl->buf->temporary, cl->buf->in_file,
429                        cl->buf->start, cl->buf->pos,
430                        cl->buf->last - cl->buf->pos,
431                        cl->buf->file_pos,
432                        cl->buf->file_last - cl->buf->file_pos);
433     }
434 
435     ngx_log_debug1(NGX_LOG_DEBUG_EVENT, p->log, 0,
436                    "pipe length: %O", p->length);
437 
438 #endif
439 
440     if (p->free_raw_bufs && p->length != -1) {
441         cl = p->free_raw_bufs;
442 
443         if (cl->buf->last - cl->buf->pos >= p->length) {
444 
445             p->free_raw_bufs = cl->next;
446 
447             /* STUB */ cl->buf->num = p->num++;
448 
449             if (p->input_filter(p, cl->buf) == NGX_ERROR) {
450                 return NGX_ABORT;
451             }
452 
453             ngx_free_chain(p->pool, cl);
454         }
455     }
456 
457     if (p->length == 0) {
458         p->upstream_done = 1;
459         p->read = 1;
460     }
461 
462     if ((p->upstream_eof || p->upstream_error) && p->free_raw_bufs) {
463 
464         /* STUB */ p->free_raw_bufs->buf->num = p->num++;
465 
466         if (p->input_filter(p, p->free_raw_bufs->buf) == NGX_ERROR) {
467             return NGX_ABORT;
468         }
469 
470         p->free_raw_bufs = p->free_raw_bufs->next;
471 
472         if (p->free_bufs && p->buf_to_file == NULL) {
473             for (cl = p->free_raw_bufs; cl; cl = cl->next) {
474                 if (cl->buf->shadow == NULL) {
475                     ngx_pfree(p->pool, cl->buf->start);
476                 }
477             }
478         }
479     }
480 
481     if (p->cacheable && (p->in || p->buf_to_file)) {
482 
483         ngx_log_debug0(NGX_LOG_DEBUG_EVENT, p->log, 0,
484                        "pipe write chain");
485 
486         rc = ngx_event_pipe_write_chain_to_temp_file(p);
487 
488         if (rc != NGX_OK) {
489             return rc;
490         }
491     }
492 
493     return NGX_OK;
494 }
495 
496 
497 static ngx_int_t
ngx_event_pipe_write_to_downstream(ngx_event_pipe_t * p)498 ngx_event_pipe_write_to_downstream(ngx_event_pipe_t *p)
499 {
500     u_char            *prev;
501     size_t             bsize;
502     ngx_int_t          rc;
503     ngx_uint_t         flush, flushed, prev_last_shadow;
504     ngx_chain_t       *out, **ll, *cl;
505     ngx_connection_t  *downstream;
506 
507     downstream = p->downstream;
508 
509     ngx_log_debug1(NGX_LOG_DEBUG_EVENT, p->log, 0,
510                    "pipe write downstream: %d", downstream->write->ready);
511 
512 #if (NGX_THREADS)
513 
514     if (p->writing) {
515         rc = ngx_event_pipe_write_chain_to_temp_file(p);
516 
517         if (rc == NGX_ABORT) {
518             return NGX_ABORT;
519         }
520     }
521 
522 #endif
523 
524     flushed = 0;
525 
526     for ( ;; ) {
527         if (p->downstream_error) {
528             return ngx_event_pipe_drain_chains(p);
529         }
530 
531         if (p->upstream_eof || p->upstream_error || p->upstream_done) {
532 
533             /* pass the p->out and p->in chains to the output filter */
534 
535             for (cl = p->busy; cl; cl = cl->next) {
536                 cl->buf->recycled = 0;
537             }
538 
539             if (p->out) {
540                 ngx_log_debug0(NGX_LOG_DEBUG_EVENT, p->log, 0,
541                                "pipe write downstream flush out");
542 
543                 for (cl = p->out; cl; cl = cl->next) {
544                     cl->buf->recycled = 0;
545                 }
546 
547                 rc = p->output_filter(p->output_ctx, p->out);
548 
549                 if (rc == NGX_ERROR) {
550                     p->downstream_error = 1;
551                     return ngx_event_pipe_drain_chains(p);
552                 }
553 
554                 p->out = NULL;
555             }
556 
557             if (p->writing) {
558                 break;
559             }
560 
561             if (p->in) {
562                 ngx_log_debug0(NGX_LOG_DEBUG_EVENT, p->log, 0,
563                                "pipe write downstream flush in");
564 
565                 for (cl = p->in; cl; cl = cl->next) {
566                     cl->buf->recycled = 0;
567                 }
568 
569                 rc = p->output_filter(p->output_ctx, p->in);
570 
571                 if (rc == NGX_ERROR) {
572                     p->downstream_error = 1;
573                     return ngx_event_pipe_drain_chains(p);
574                 }
575 
576                 p->in = NULL;
577             }
578 
579             ngx_log_debug0(NGX_LOG_DEBUG_EVENT, p->log, 0,
580                            "pipe write downstream done");
581 
582             /* TODO: free unused bufs */
583 
584             p->downstream_done = 1;
585             break;
586         }
587 
588         if (downstream->data != p->output_ctx
589             || !downstream->write->ready
590             || downstream->write->delayed)
591         {
592             break;
593         }
594 
595         /* bsize is the size of the busy recycled bufs */
596 
597         prev = NULL;
598         bsize = 0;
599 
600         for (cl = p->busy; cl; cl = cl->next) {
601 
602             if (cl->buf->recycled) {
603                 if (prev == cl->buf->start) {
604                     continue;
605                 }
606 
607                 bsize += cl->buf->end - cl->buf->start;
608                 prev = cl->buf->start;
609             }
610         }
611 
612         ngx_log_debug1(NGX_LOG_DEBUG_EVENT, p->log, 0,
613                        "pipe write busy: %uz", bsize);
614 
615         out = NULL;
616 
617         if (bsize >= (size_t) p->busy_size) {
618             flush = 1;
619             goto flush;
620         }
621 
622         flush = 0;
623         ll = NULL;
624         prev_last_shadow = 1;
625 
626         for ( ;; ) {
627             if (p->out) {
628                 cl = p->out;
629 
630                 if (cl->buf->recycled) {
631                     ngx_log_error(NGX_LOG_ALERT, p->log, 0,
632                                   "recycled buffer in pipe out chain");
633                 }
634 
635                 p->out = p->out->next;
636 
637             } else if (!p->cacheable && !p->writing && p->in) {
638                 cl = p->in;
639 
640                 ngx_log_debug3(NGX_LOG_DEBUG_EVENT, p->log, 0,
641                                "pipe write buf ls:%d %p %z",
642                                cl->buf->last_shadow,
643                                cl->buf->pos,
644                                cl->buf->last - cl->buf->pos);
645 
646                 if (cl->buf->recycled && prev_last_shadow) {
647                     if (bsize + cl->buf->end - cl->buf->start > p->busy_size) {
648                         flush = 1;
649                         break;
650                     }
651 
652                     bsize += cl->buf->end - cl->buf->start;
653                 }
654 
655                 prev_last_shadow = cl->buf->last_shadow;
656 
657                 p->in = p->in->next;
658 
659             } else {
660                 break;
661             }
662 
663             cl->next = NULL;
664 
665             if (out) {
666                 *ll = cl;
667             } else {
668                 out = cl;
669             }
670             ll = &cl->next;
671         }
672 
673     flush:
674 
675         ngx_log_debug2(NGX_LOG_DEBUG_EVENT, p->log, 0,
676                        "pipe write: out:%p, f:%ui", out, flush);
677 
678         if (out == NULL) {
679 
680             if (!flush) {
681                 break;
682             }
683 
684             /* a workaround for AIO */
685             if (flushed++ > 10) {
686                 return NGX_BUSY;
687             }
688         }
689 
690         rc = p->output_filter(p->output_ctx, out);
691 
692         ngx_chain_update_chains(p->pool, &p->free, &p->busy, &out, p->tag);
693 
694         if (rc == NGX_ERROR) {
695             p->downstream_error = 1;
696             return ngx_event_pipe_drain_chains(p);
697         }
698 
699         for (cl = p->free; cl; cl = cl->next) {
700 
701             if (cl->buf->temp_file) {
702                 if (p->cacheable || !p->cyclic_temp_file) {
703                     continue;
704                 }
705 
706                 /* reset p->temp_offset if all bufs had been sent */
707 
708                 if (cl->buf->file_last == p->temp_file->offset) {
709                     p->temp_file->offset = 0;
710                 }
711             }
712 
713             /* TODO: free buf if p->free_bufs && upstream done */
714 
715             /* add the free shadow raw buf to p->free_raw_bufs */
716 
717             if (cl->buf->last_shadow) {
718                 if (ngx_event_pipe_add_free_buf(p, cl->buf->shadow) != NGX_OK) {
719                     return NGX_ABORT;
720                 }
721 
722                 cl->buf->last_shadow = 0;
723             }
724 
725             cl->buf->shadow = NULL;
726         }
727     }
728 
729     return NGX_OK;
730 }
731 
732 
733 static ngx_int_t
ngx_event_pipe_write_chain_to_temp_file(ngx_event_pipe_t * p)734 ngx_event_pipe_write_chain_to_temp_file(ngx_event_pipe_t *p)
735 {
736     ssize_t       size, bsize, n;
737     ngx_buf_t    *b;
738     ngx_uint_t    prev_last_shadow;
739     ngx_chain_t  *cl, *tl, *next, *out, **ll, **last_out, **last_free;
740 
741 #if (NGX_THREADS)
742 
743     if (p->writing) {
744 
745         if (p->aio) {
746             return NGX_AGAIN;
747         }
748 
749         out = p->writing;
750         p->writing = NULL;
751 
752         n = ngx_write_chain_to_temp_file(p->temp_file, NULL);
753 
754         if (n == NGX_ERROR) {
755             return NGX_ABORT;
756         }
757 
758         goto done;
759     }
760 
761 #endif
762 
763     if (p->buf_to_file) {
764         out = ngx_alloc_chain_link(p->pool);
765         if (out == NULL) {
766             return NGX_ABORT;
767         }
768 
769         out->buf = p->buf_to_file;
770         out->next = p->in;
771 
772     } else {
773         out = p->in;
774     }
775 
776     if (!p->cacheable) {
777 
778         size = 0;
779         cl = out;
780         ll = NULL;
781         prev_last_shadow = 1;
782 
783         ngx_log_debug1(NGX_LOG_DEBUG_EVENT, p->log, 0,
784                        "pipe offset: %O", p->temp_file->offset);
785 
786         do {
787             bsize = cl->buf->last - cl->buf->pos;
788 
789             ngx_log_debug4(NGX_LOG_DEBUG_EVENT, p->log, 0,
790                            "pipe buf ls:%d %p, pos %p, size: %z",
791                            cl->buf->last_shadow, cl->buf->start,
792                            cl->buf->pos, bsize);
793 
794             if (prev_last_shadow
795                 && ((size + bsize > p->temp_file_write_size)
796                     || (p->temp_file->offset + size + bsize
797                         > p->max_temp_file_size)))
798             {
799                 break;
800             }
801 
802             prev_last_shadow = cl->buf->last_shadow;
803 
804             size += bsize;
805             ll = &cl->next;
806             cl = cl->next;
807 
808         } while (cl);
809 
810         ngx_log_debug1(NGX_LOG_DEBUG_EVENT, p->log, 0, "size: %z", size);
811 
812         if (ll == NULL) {
813             return NGX_BUSY;
814         }
815 
816         if (cl) {
817             p->in = cl;
818             *ll = NULL;
819 
820         } else {
821             p->in = NULL;
822             p->last_in = &p->in;
823         }
824 
825     } else {
826         p->in = NULL;
827         p->last_in = &p->in;
828     }
829 
830 #if (NGX_THREADS)
831     if (p->thread_handler) {
832         p->temp_file->thread_write = 1;
833         p->temp_file->file.thread_task = p->thread_task;
834         p->temp_file->file.thread_handler = p->thread_handler;
835         p->temp_file->file.thread_ctx = p->thread_ctx;
836     }
837 #endif
838 
839     n = ngx_write_chain_to_temp_file(p->temp_file, out);
840 
841     if (n == NGX_ERROR) {
842         return NGX_ABORT;
843     }
844 
845 #if (NGX_THREADS)
846 
847     if (n == NGX_AGAIN) {
848         p->writing = out;
849         p->thread_task = p->temp_file->file.thread_task;
850         return NGX_AGAIN;
851     }
852 
853 done:
854 
855 #endif
856 
857     if (p->buf_to_file) {
858         p->temp_file->offset = p->buf_to_file->last - p->buf_to_file->pos;
859         n -= p->buf_to_file->last - p->buf_to_file->pos;
860         p->buf_to_file = NULL;
861         out = out->next;
862     }
863 
864     if (n > 0) {
865         /* update previous buffer or add new buffer */
866 
867         if (p->out) {
868             for (cl = p->out; cl->next; cl = cl->next) { /* void */ }
869 
870             b = cl->buf;
871 
872             if (b->file_last == p->temp_file->offset) {
873                 p->temp_file->offset += n;
874                 b->file_last = p->temp_file->offset;
875                 goto free;
876             }
877 
878             last_out = &cl->next;
879 
880         } else {
881             last_out = &p->out;
882         }
883 
884         cl = ngx_chain_get_free_buf(p->pool, &p->free);
885         if (cl == NULL) {
886             return NGX_ABORT;
887         }
888 
889         b = cl->buf;
890 
891         ngx_memzero(b, sizeof(ngx_buf_t));
892 
893         b->tag = p->tag;
894 
895         b->file = &p->temp_file->file;
896         b->file_pos = p->temp_file->offset;
897         p->temp_file->offset += n;
898         b->file_last = p->temp_file->offset;
899 
900         b->in_file = 1;
901         b->temp_file = 1;
902 
903         *last_out = cl;
904     }
905 
906 free:
907 
908     for (last_free = &p->free_raw_bufs;
909          *last_free != NULL;
910          last_free = &(*last_free)->next)
911     {
912         /* void */
913     }
914 
915     for (cl = out; cl; cl = next) {
916         next = cl->next;
917 
918         cl->next = p->free;
919         p->free = cl;
920 
921         b = cl->buf;
922 
923         if (b->last_shadow) {
924 
925             tl = ngx_alloc_chain_link(p->pool);
926             if (tl == NULL) {
927                 return NGX_ABORT;
928             }
929 
930             tl->buf = b->shadow;
931             tl->next = NULL;
932 
933             *last_free = tl;
934             last_free = &tl->next;
935 
936             b->shadow->pos = b->shadow->start;
937             b->shadow->last = b->shadow->start;
938 
939             ngx_event_pipe_remove_shadow_links(b->shadow);
940         }
941     }
942 
943     return NGX_OK;
944 }
945 
946 
947 /* the copy input filter */
948 
949 ngx_int_t
ngx_event_pipe_copy_input_filter(ngx_event_pipe_t * p,ngx_buf_t * buf)950 ngx_event_pipe_copy_input_filter(ngx_event_pipe_t *p, ngx_buf_t *buf)
951 {
952     ngx_buf_t    *b;
953     ngx_chain_t  *cl;
954 
955     if (buf->pos == buf->last) {
956         return NGX_OK;
957     }
958 
959     cl = ngx_chain_get_free_buf(p->pool, &p->free);
960     if (cl == NULL) {
961         return NGX_ERROR;
962     }
963 
964     b = cl->buf;
965 
966     ngx_memcpy(b, buf, sizeof(ngx_buf_t));
967     b->shadow = buf;
968     b->tag = p->tag;
969     b->last_shadow = 1;
970     b->recycled = 1;
971     buf->shadow = b;
972 
973     ngx_log_debug1(NGX_LOG_DEBUG_EVENT, p->log, 0, "input buf #%d", b->num);
974 
975     if (p->in) {
976         *p->last_in = cl;
977     } else {
978         p->in = cl;
979     }
980     p->last_in = &cl->next;
981 
982     if (p->length == -1) {
983         return NGX_OK;
984     }
985 
986     p->length -= b->last - b->pos;
987 
988     return NGX_OK;
989 }
990 
991 
992 static ngx_inline void
ngx_event_pipe_remove_shadow_links(ngx_buf_t * buf)993 ngx_event_pipe_remove_shadow_links(ngx_buf_t *buf)
994 {
995     ngx_buf_t  *b, *next;
996 
997     b = buf->shadow;
998 
999     if (b == NULL) {
1000         return;
1001     }
1002 
1003     while (!b->last_shadow) {
1004         next = b->shadow;
1005 
1006         b->temporary = 0;
1007         b->recycled = 0;
1008 
1009         b->shadow = NULL;
1010         b = next;
1011     }
1012 
1013     b->temporary = 0;
1014     b->recycled = 0;
1015     b->last_shadow = 0;
1016 
1017     b->shadow = NULL;
1018 
1019     buf->shadow = NULL;
1020 }
1021 
1022 
1023 ngx_int_t
ngx_event_pipe_add_free_buf(ngx_event_pipe_t * p,ngx_buf_t * b)1024 ngx_event_pipe_add_free_buf(ngx_event_pipe_t *p, ngx_buf_t *b)
1025 {
1026     ngx_chain_t  *cl;
1027 
1028     cl = ngx_alloc_chain_link(p->pool);
1029     if (cl == NULL) {
1030         return NGX_ERROR;
1031     }
1032 
1033     if (p->buf_to_file && b->start == p->buf_to_file->start) {
1034         b->pos = p->buf_to_file->last;
1035         b->last = p->buf_to_file->last;
1036 
1037     } else {
1038         b->pos = b->start;
1039         b->last = b->start;
1040     }
1041 
1042     b->shadow = NULL;
1043 
1044     cl->buf = b;
1045 
1046     if (p->free_raw_bufs == NULL) {
1047         p->free_raw_bufs = cl;
1048         cl->next = NULL;
1049 
1050         return NGX_OK;
1051     }
1052 
1053     if (p->free_raw_bufs->buf->pos == p->free_raw_bufs->buf->last) {
1054 
1055         /* add the free buf to the list start */
1056 
1057         cl->next = p->free_raw_bufs;
1058         p->free_raw_bufs = cl;
1059 
1060         return NGX_OK;
1061     }
1062 
1063     /* the first free buf is partially filled, thus add the free buf after it */
1064 
1065     cl->next = p->free_raw_bufs->next;
1066     p->free_raw_bufs->next = cl;
1067 
1068     return NGX_OK;
1069 }
1070 
1071 
1072 static ngx_int_t
ngx_event_pipe_drain_chains(ngx_event_pipe_t * p)1073 ngx_event_pipe_drain_chains(ngx_event_pipe_t *p)
1074 {
1075     ngx_chain_t  *cl, *tl;
1076 
1077     for ( ;; ) {
1078         if (p->busy) {
1079             cl = p->busy;
1080             p->busy = NULL;
1081 
1082         } else if (p->out) {
1083             cl = p->out;
1084             p->out = NULL;
1085 
1086         } else if (p->in) {
1087             cl = p->in;
1088             p->in = NULL;
1089 
1090         } else {
1091             return NGX_OK;
1092         }
1093 
1094         while (cl) {
1095             if (cl->buf->last_shadow) {
1096                 if (ngx_event_pipe_add_free_buf(p, cl->buf->shadow) != NGX_OK) {
1097                     return NGX_ABORT;
1098                 }
1099 
1100                 cl->buf->last_shadow = 0;
1101             }
1102 
1103             cl->buf->shadow = NULL;
1104             tl = cl->next;
1105             cl->next = p->free;
1106             p->free = cl;
1107             cl = tl;
1108         }
1109     }
1110 }
1111