1
2 /*
3 * Copyright (C) Igor Sysoev
4 * Copyright (C) Nginx, Inc.
5 */
6
7
8 #include <ngx_config.h>
9 #include <ngx_core.h>
10 #include <ngx_event.h>
11 #include <ngx_event_pipe.h>
12
13
/* the two halves of the pipe, called in turn by ngx_event_pipe() */
static ngx_int_t ngx_event_pipe_read_upstream(ngx_event_pipe_t *p);
static ngx_int_t ngx_event_pipe_write_to_downstream(ngx_event_pipe_t *p);

/* helpers shared by the reading and the writing halves */
static ngx_int_t ngx_event_pipe_write_chain_to_temp_file(ngx_event_pipe_t *p);
static ngx_inline void ngx_event_pipe_remove_shadow_links(ngx_buf_t *buf);
static ngx_int_t ngx_event_pipe_drain_chains(ngx_event_pipe_t *p);
20
21
22 ngx_int_t
ngx_event_pipe(ngx_event_pipe_t * p,ngx_int_t do_write)23 ngx_event_pipe(ngx_event_pipe_t *p, ngx_int_t do_write)
24 {
25 ngx_int_t rc;
26 ngx_uint_t flags;
27 ngx_event_t *rev, *wev;
28
29 for ( ;; ) {
30 if (do_write) {
31 p->log->action = "sending to client";
32
33 rc = ngx_event_pipe_write_to_downstream(p);
34
35 if (rc == NGX_ABORT) {
36 return NGX_ABORT;
37 }
38
39 if (rc == NGX_BUSY) {
40 return NGX_OK;
41 }
42 }
43
44 p->read = 0;
45 p->upstream_blocked = 0;
46
47 p->log->action = "reading upstream";
48
49 if (ngx_event_pipe_read_upstream(p) == NGX_ABORT) {
50 return NGX_ABORT;
51 }
52
53 if (!p->read && !p->upstream_blocked) {
54 break;
55 }
56
57 do_write = 1;
58 }
59
60 if (p->upstream->fd != (ngx_socket_t) -1) {
61 rev = p->upstream->read;
62
63 flags = (rev->eof || rev->error) ? NGX_CLOSE_EVENT : 0;
64
65 if (ngx_handle_read_event(rev, flags) != NGX_OK) {
66 return NGX_ABORT;
67 }
68
69 if (!rev->delayed) {
70 if (rev->active && !rev->ready) {
71 ngx_add_timer(rev, p->read_timeout);
72
73 } else if (rev->timer_set) {
74 ngx_del_timer(rev);
75 }
76 }
77 }
78
79 if (p->downstream->fd != (ngx_socket_t) -1
80 && p->downstream->data == p->output_ctx)
81 {
82 wev = p->downstream->write;
83 if (ngx_handle_write_event(wev, p->send_lowat) != NGX_OK) {
84 return NGX_ABORT;
85 }
86
87 if (!wev->delayed) {
88 if (wev->active && !wev->ready) {
89 ngx_add_timer(wev, p->send_timeout);
90
91 } else if (wev->timer_set) {
92 ngx_del_timer(wev);
93 }
94 }
95 }
96
97 return NGX_OK;
98 }
99
100
/*
 * The reading half of the pipe.
 *
 * Takes the pre-read buffers or receives data from the upstream with
 * p->upstream->recv_chain() into raw buffers, and hands every filled raw
 * buffer to p->input_filter().  Raw buffers come, in this order, from
 * p->free_raw_bufs, from a fresh allocation (while p->allocated is below
 * p->bufs.num), or from flushing p->in to the temporary file.
 *
 * Sets p->read when some progress has been made and p->upstream_blocked
 * when reading stopped only because the downstream should be served
 * first; ngx_event_pipe() uses both flags to decide whether to loop again.
 *
 * Returns NGX_OK, NGX_ABORT on an allocation or input filter failure, or
 * the non-NGX_OK result of ngx_event_pipe_write_chain_to_temp_file()
 * (NGX_BUSY inside the reading loop only stops the loop).  In a threads
 * build NGX_AGAIN is returned while p->aio is set.
 */
static ngx_int_t
ngx_event_pipe_read_upstream(ngx_event_pipe_t *p)
{
    off_t         limit;
    ssize_t       n, size;
    ngx_int_t     rc;
    ngx_buf_t    *b;
    ngx_msec_t    delay;
    ngx_chain_t  *chain, *cl, *ln;

    /* nothing more to read once the upstream is finished or has failed */

    if (p->upstream_eof || p->upstream_error || p->upstream_done) {
        return NGX_OK;
    }

#if (NGX_THREADS)

    if (p->aio) {
        ngx_log_debug0(NGX_LOG_DEBUG_EVENT, p->log, 0,
                       "pipe read upstream: aio");
        return NGX_AGAIN;
    }

    /* a temp file write was left pending: finish it before reading more */

    if (p->writing) {
        ngx_log_debug0(NGX_LOG_DEBUG_EVENT, p->log, 0,
                       "pipe read upstream: writing");

        rc = ngx_event_pipe_write_chain_to_temp_file(p);

        if (rc != NGX_OK) {
            return rc;
        }
    }

#endif

    ngx_log_debug1(NGX_LOG_DEBUG_EVENT, p->log, 0,
                   "pipe read upstream: %d", p->upstream->read->ready);

    for ( ;; ) {

        if (p->upstream_eof || p->upstream_error || p->upstream_done) {
            break;
        }

        /* no buffered data and the socket has nothing to offer */

        if (p->preread_bufs == NULL && !p->upstream->read->ready) {
            break;
        }

        if (p->preread_bufs) {

            /* use the pre-read bufs if they exist */

            chain = p->preread_bufs;
            p->preread_bufs = NULL;
            n = p->preread_size;

            ngx_log_debug1(NGX_LOG_DEBUG_EVENT, p->log, 0,
                           "pipe preread: %z", n);

            if (n) {
                p->read = 1;
            }

        } else {

#if (NGX_HAVE_KQUEUE) || (NGX_HAVE_FSTACK)

            /*
             * kqueue notifies about the end of file or a pending error.
             * This test allows not to allocate a buf on these conditions
             * and not to call c->recv_chain().
             */

            if (p->upstream->read->available == 0
                && p->upstream->read->pending_eof)
            {
                p->upstream->read->ready = 0;
                p->upstream->read->eof = 1;
                p->upstream_eof = 1;
                p->read = 1;

                /* a pending error turns the end of file into an error */

                if (p->upstream->read->kq_errno) {
                    p->upstream->read->error = 1;
                    p->upstream_error = 1;
                    p->upstream_eof = 0;

                    ngx_log_error(NGX_LOG_ERR, p->log,
                                  p->upstream->read->kq_errno,
                                  "kevent() reported that upstream "
                                  "closed connection");
                }

                break;
            }
#endif

            if (p->limit_rate) {
                if (p->upstream->read->delayed) {
                    break;
                }

                /*
                 * the number of bytes that may still be read: the rate
                 * multiplied by the seconds elapsed since p->start_sec
                 * (plus one) minus the bytes already read
                 */

                limit = (off_t) p->limit_rate * (ngx_time() - p->start_sec + 1)
                        - p->read_length;

                if (limit <= 0) {

                    /* over the budget: delay reading until it is repaid */

                    p->upstream->read->delayed = 1;
                    delay = (ngx_msec_t) (- limit * 1000 / p->limit_rate + 1);
                    ngx_add_timer(p->upstream->read, delay);
                    break;
                }

            } else {
                limit = 0;
            }

            if (p->free_raw_bufs) {

                /* use the free bufs if they exist */

                chain = p->free_raw_bufs;
                if (p->single_buf) {

                    /* read into the first free buf only */

                    p->free_raw_bufs = p->free_raw_bufs->next;
                    chain->next = NULL;
                } else {
                    p->free_raw_bufs = NULL;
                }

            } else if (p->allocated < p->bufs.num) {

                /* allocate a new buf if it's still allowed */

                b = ngx_create_temp_buf(p->pool, p->bufs.size);
                if (b == NULL) {
                    return NGX_ABORT;
                }

                p->allocated++;

                chain = ngx_alloc_chain_link(p->pool);
                if (chain == NULL) {
                    return NGX_ABORT;
                }

                chain->buf = b;
                chain->next = NULL;

            } else if (!p->cacheable
                       && p->downstream->data == p->output_ctx
                       && p->downstream->write->ready
                       && !p->downstream->write->delayed)
            {
                /*
                 * if the bufs are not needed to be saved in a cache and
                 * a downstream is ready then write the bufs to a downstream
                 */

                p->upstream_blocked = 1;

                ngx_log_debug0(NGX_LOG_DEBUG_EVENT, p->log, 0,
                               "pipe downstream ready");

                break;

            } else if (p->cacheable
                       || p->temp_file->offset < p->max_temp_file_size)
            {

                /*
                 * if it is allowed, then save some bufs from p->in
                 * to a temporary file, and add them to a p->out chain
                 */

                rc = ngx_event_pipe_write_chain_to_temp_file(p);

                ngx_log_debug1(NGX_LOG_DEBUG_EVENT, p->log, 0,
                               "pipe temp offset: %O", p->temp_file->offset);

                if (rc == NGX_BUSY) {
                    break;
                }

                if (rc != NGX_OK) {
                    return rc;
                }

                /* the raw bufs released by the write are used for reading */

                chain = p->free_raw_bufs;
                if (p->single_buf) {
                    p->free_raw_bufs = p->free_raw_bufs->next;
                    chain->next = NULL;
                } else {
                    p->free_raw_bufs = NULL;
                }

            } else {

                /* there are no bufs to read in */

                ngx_log_debug0(NGX_LOG_DEBUG_EVENT, p->log, 0,
                               "no pipe bufs to read in");

                break;
            }

            n = p->upstream->recv_chain(p->upstream, chain, limit);

            ngx_log_debug1(NGX_LOG_DEBUG_EVENT, p->log, 0,
                           "pipe recv chain: %z", n);

            /*
             * put the chain back at the head of p->free_raw_bufs so that
             * the bufs stay on the list if the loop is left below
             */

            if (p->free_raw_bufs) {
                chain->next = p->free_raw_bufs;
            }
            p->free_raw_bufs = chain;

            if (n == NGX_ERROR) {
                p->upstream_error = 1;
                break;
            }

            if (n == NGX_AGAIN) {
                if (p->single_buf) {
                    ngx_event_pipe_remove_shadow_links(chain->buf);
                }

                break;
            }

            p->read = 1;

            if (n == 0) {
                p->upstream_eof = 1;
                break;
            }
        }

        /* the time the just read bytes take at the configured rate */

        delay = p->limit_rate ? (ngx_msec_t) n * 1000 / p->limit_rate : 0;

        p->read_length += n;
        cl = chain;
        p->free_raw_bufs = NULL;

        /*
         * spread the n bytes over the bufs of the chain: every completely
         * filled buf is passed to the input filter and its chain link is
         * freed, a partially filled buf just gets its "last" advanced
         */

        while (cl && n > 0) {

            ngx_event_pipe_remove_shadow_links(cl->buf);

            size = cl->buf->end - cl->buf->last;

            if (n >= size) {
                cl->buf->last = cl->buf->end;

                /* STUB */ cl->buf->num = p->num++;

                if (p->input_filter(p, cl->buf) == NGX_ERROR) {
                    return NGX_ABORT;
                }

                n -= size;
                ln = cl;
                cl = cl->next;
                ngx_free_chain(p->pool, ln);

            } else {
                cl->buf->last += n;
                n = 0;
            }
        }

        /* the rest of the chain goes to the head of p->free_raw_bufs */

        if (cl) {
            for (ln = cl; ln->next; ln = ln->next) { /* void */ }

            ln->next = p->free_raw_bufs;
            p->free_raw_bufs = cl;
        }

        if (delay > 0) {
            p->upstream->read->delayed = 1;
            ngx_add_timer(p->upstream->read, delay);
            break;
        }
    }

#if (NGX_DEBUG)

    /* dump the busy, out, in and free raw chains */

    for (cl = p->busy; cl; cl = cl->next) {
        ngx_log_debug8(NGX_LOG_DEBUG_EVENT, p->log, 0,
                       "pipe buf busy s:%d t:%d f:%d "
                       "%p, pos %p, size: %z "
                       "file: %O, size: %O",
                       (cl->buf->shadow ? 1 : 0),
                       cl->buf->temporary, cl->buf->in_file,
                       cl->buf->start, cl->buf->pos,
                       cl->buf->last - cl->buf->pos,
                       cl->buf->file_pos,
                       cl->buf->file_last - cl->buf->file_pos);
    }

    for (cl = p->out; cl; cl = cl->next) {
        ngx_log_debug8(NGX_LOG_DEBUG_EVENT, p->log, 0,
                       "pipe buf out  s:%d t:%d f:%d "
                       "%p, pos %p, size: %z "
                       "file: %O, size: %O",
                       (cl->buf->shadow ? 1 : 0),
                       cl->buf->temporary, cl->buf->in_file,
                       cl->buf->start, cl->buf->pos,
                       cl->buf->last - cl->buf->pos,
                       cl->buf->file_pos,
                       cl->buf->file_last - cl->buf->file_pos);
    }

    for (cl = p->in; cl; cl = cl->next) {
        ngx_log_debug8(NGX_LOG_DEBUG_EVENT, p->log, 0,
                       "pipe buf in   s:%d t:%d f:%d "
                       "%p, pos %p, size: %z "
                       "file: %O, size: %O",
                       (cl->buf->shadow ? 1 : 0),
                       cl->buf->temporary, cl->buf->in_file,
                       cl->buf->start, cl->buf->pos,
                       cl->buf->last - cl->buf->pos,
                       cl->buf->file_pos,
                       cl->buf->file_last - cl->buf->file_pos);
    }

    for (cl = p->free_raw_bufs; cl; cl = cl->next) {
        ngx_log_debug8(NGX_LOG_DEBUG_EVENT, p->log, 0,
                       "pipe buf free s:%d t:%d f:%d "
                       "%p, pos %p, size: %z "
                       "file: %O, size: %O",
                       (cl->buf->shadow ? 1 : 0),
                       cl->buf->temporary, cl->buf->in_file,
                       cl->buf->start, cl->buf->pos,
                       cl->buf->last - cl->buf->pos,
                       cl->buf->file_pos,
                       cl->buf->file_last - cl->buf->file_pos);
    }

    ngx_log_debug1(NGX_LOG_DEBUG_EVENT, p->log, 0,
                   "pipe length: %O", p->length);

#endif

    /*
     * p->length of -1 means that no length is tracked; otherwise, if the
     * first free raw buf already holds at least p->length bytes, pass it
     * to the input filter without waiting for it to fill up
     */

    if (p->free_raw_bufs && p->length != -1) {
        cl = p->free_raw_bufs;

        if (cl->buf->last - cl->buf->pos >= p->length) {

            p->free_raw_bufs = cl->next;

            /* STUB */ cl->buf->num = p->num++;

            if (p->input_filter(p, cl->buf) == NGX_ERROR) {
                return NGX_ABORT;
            }

            ngx_free_chain(p->pool, cl);
        }
    }

    if (p->length == 0) {
        p->upstream_done = 1;
        p->read = 1;
    }

    /* the upstream is gone: pass the first free raw buf to the filter */

    if ((p->upstream_eof || p->upstream_error) && p->free_raw_bufs) {

        /* STUB */ p->free_raw_bufs->buf->num = p->num++;

        if (p->input_filter(p, p->free_raw_bufs->buf) == NGX_ERROR) {
            return NGX_ABORT;
        }

        p->free_raw_bufs = p->free_raw_bufs->next;

        /* release the memory of the remaining bufs that have no shadows */

        if (p->free_bufs && p->buf_to_file == NULL) {
            for (cl = p->free_raw_bufs; cl; cl = cl->next) {
                if (cl->buf->shadow == NULL) {
                    ngx_pfree(p->pool, cl->buf->start);
                }
            }
        }
    }

    /* a cacheable response is always written to the temporary file */

    if (p->cacheable && (p->in || p->buf_to_file)) {

        ngx_log_debug0(NGX_LOG_DEBUG_EVENT, p->log, 0,
                       "pipe write chain");

        rc = ngx_event_pipe_write_chain_to_temp_file(p);

        if (rc != NGX_OK) {
            return rc;
        }
    }

    return NGX_OK;
}
495
496
/*
 * The writing half of the pipe.
 *
 * Passes the buffered data to p->output_filter(): the p->out chain first
 * and then, unless the response is cacheable or a temp file write is
 * pending, the p->in chain.  While the upstream is still active the
 * amount of data in flight is bounded by p->busy_size; once the upstream
 * is finished everything is flushed and p->downstream_done is set.
 *
 * Returns NGX_OK, NGX_ABORT on failure, NGX_BUSY when more than ten
 * flush-only filter calls have been made in a row, or the result of
 * ngx_event_pipe_drain_chains() after a downstream error.
 */
static ngx_int_t
ngx_event_pipe_write_to_downstream(ngx_event_pipe_t *p)
{
    u_char            *prev;
    size_t             bsize;
    ngx_int_t          rc;
    ngx_uint_t         flush, flushed, prev_last_shadow;
    ngx_chain_t       *out, **ll, *cl;
    ngx_connection_t  *downstream;

    downstream = p->downstream;

    ngx_log_debug1(NGX_LOG_DEBUG_EVENT, p->log, 0,
                   "pipe write downstream: %d", downstream->write->ready);

#if (NGX_THREADS)

    /* try to finish the pending temp file write; only an abort is fatal */

    if (p->writing) {
        rc = ngx_event_pipe_write_chain_to_temp_file(p);

        if (rc == NGX_ABORT) {
            return NGX_ABORT;
        }
    }

#endif

    /* counts the output filter calls made with no data, see below */

    flushed = 0;

    for ( ;; ) {
        if (p->downstream_error) {
            return ngx_event_pipe_drain_chains(p);
        }

        if (p->upstream_eof || p->upstream_error || p->upstream_done) {

            /* pass the p->out and p->in chains to the output filter */

            /* no more reading: the bufs need not be recycled any longer */

            for (cl = p->busy; cl; cl = cl->next) {
                cl->buf->recycled = 0;
            }

            if (p->out) {
                ngx_log_debug0(NGX_LOG_DEBUG_EVENT, p->log, 0,
                               "pipe write downstream flush out");

                for (cl = p->out; cl; cl = cl->next) {
                    cl->buf->recycled = 0;
                }

                rc = p->output_filter(p->output_ctx, p->out);

                if (rc == NGX_ERROR) {
                    p->downstream_error = 1;
                    return ngx_event_pipe_drain_chains(p);
                }

                p->out = NULL;
            }

            /* p->in is not sent while a temp file write is pending */

            if (p->writing) {
                break;
            }

            if (p->in) {
                ngx_log_debug0(NGX_LOG_DEBUG_EVENT, p->log, 0,
                               "pipe write downstream flush in");

                for (cl = p->in; cl; cl = cl->next) {
                    cl->buf->recycled = 0;
                }

                rc = p->output_filter(p->output_ctx, p->in);

                if (rc == NGX_ERROR) {
                    p->downstream_error = 1;
                    return ngx_event_pipe_drain_chains(p);
                }

                p->in = NULL;
            }

            ngx_log_debug0(NGX_LOG_DEBUG_EVENT, p->log, 0,
                           "pipe write downstream done");

            /* TODO: free unused bufs */

            p->downstream_done = 1;
            break;
        }

        /* the downstream cannot take any data right now */

        if (downstream->data != p->output_ctx
            || !downstream->write->ready
            || downstream->write->delayed)
        {
            break;
        }

        /* bsize is the size of the busy recycled bufs */

        prev = NULL;
        bsize = 0;

        for (cl = p->busy; cl; cl = cl->next) {

            if (cl->buf->recycled) {

                /* consecutive bufs with the same start are counted once */

                if (prev == cl->buf->start) {
                    continue;
                }

                bsize += cl->buf->end - cl->buf->start;
                prev = cl->buf->start;
            }
        }

        ngx_log_debug1(NGX_LOG_DEBUG_EVENT, p->log, 0,
                       "pipe write busy: %uz", bsize);

        out = NULL;

        /* too much is busy already: only flush, add no new data */

        if (bsize >= (size_t) p->busy_size) {
            flush = 1;
            goto flush;
        }

        flush = 0;
        ll = NULL;
        prev_last_shadow = 1;

        /* move bufs from p->out and then from p->in to the "out" chain */

        for ( ;; ) {
            if (p->out) {
                cl = p->out;

                if (cl->buf->recycled) {
                    ngx_log_error(NGX_LOG_ALERT, p->log, 0,
                                  "recycled buffer in pipe out chain");
                }

                p->out = p->out->next;

            } else if (!p->cacheable && !p->writing && p->in) {
                cl = p->in;

                ngx_log_debug3(NGX_LOG_DEBUG_EVENT, p->log, 0,
                               "pipe write buf ls:%d %p %z",
                               cl->buf->last_shadow,
                               cl->buf->pos,
                               cl->buf->last - cl->buf->pos);

                /*
                 * the busy size limit is tested only when the previous
                 * buf had last_shadow set
                 */

                if (cl->buf->recycled && prev_last_shadow) {
                    if (bsize + cl->buf->end - cl->buf->start > p->busy_size) {
                        flush = 1;
                        break;
                    }

                    bsize += cl->buf->end - cl->buf->start;
                }

                prev_last_shadow = cl->buf->last_shadow;

                p->in = p->in->next;

            } else {
                break;
            }

            cl->next = NULL;

            if (out) {
                *ll = cl;
            } else {
                out = cl;
            }
            ll = &cl->next;
        }

    flush:

        ngx_log_debug2(NGX_LOG_DEBUG_EVENT, p->log, 0,
                       "pipe write: out:%p, f:%ui", out, flush);

        if (out == NULL) {

            /* nothing to send and nothing to flush */

            if (!flush) {
                break;
            }

            /* a workaround for AIO */
            if (flushed++ > 10) {
                return NGX_BUSY;
            }
        }

        rc = p->output_filter(p->output_ctx, out);

        ngx_chain_update_chains(p->pool, &p->free, &p->busy, &out, p->tag);

        if (rc == NGX_ERROR) {
            p->downstream_error = 1;
            return ngx_event_pipe_drain_chains(p);
        }

        for (cl = p->free; cl; cl = cl->next) {

            if (cl->buf->temp_file) {
                if (p->cacheable || !p->cyclic_temp_file) {
                    continue;
                }

                /* reset p->temp_offset if all bufs had been sent */

                if (cl->buf->file_last == p->temp_file->offset) {
                    p->temp_file->offset = 0;
                }
            }

            /* TODO: free buf if p->free_bufs && upstream done */

            /* add the free shadow raw buf to p->free_raw_bufs */

            if (cl->buf->last_shadow) {
                if (ngx_event_pipe_add_free_buf(p, cl->buf->shadow) != NGX_OK) {
                    return NGX_ABORT;
                }

                cl->buf->last_shadow = 0;
            }

            cl->buf->shadow = NULL;
        }
    }

    return NGX_OK;
}
731
732
/*
 * Writes the p->buf_to_file buf, if any, and the p->in chain (or a
 * leading part of it when the response is not cacheable) to the temporary
 * file.  The written range is then appended to p->out as a file buf, the
 * chain links are moved to p->free and the raw bufs they shadowed are
 * appended to p->free_raw_bufs.
 *
 * Returns NGX_OK on success, NGX_ABORT on an allocation or write failure,
 * NGX_BUSY when not even the first buf fits into the size limits, and, in
 * a threads build, NGX_AGAIN while the write is still in progress.
 */
static ngx_int_t
ngx_event_pipe_write_chain_to_temp_file(ngx_event_pipe_t *p)
{
    ssize_t       size, bsize, n;
    ngx_buf_t    *b;
    ngx_uint_t    prev_last_shadow;
    ngx_chain_t  *cl, *tl, *next, *out, **ll, **last_out, **last_free;

#if (NGX_THREADS)

    if (p->writing) {

        if (p->aio) {
            return NGX_AGAIN;
        }

        /*
         * a write started earlier is pending: p->writing holds its chain.
         * NOTE(review): the call with a NULL chain presumably fetches the
         * result of the completed threaded write - confirm against
         * ngx_write_chain_to_temp_file()
         */

        out = p->writing;
        p->writing = NULL;

        n = ngx_write_chain_to_temp_file(p->temp_file, NULL);

        if (n == NGX_ERROR) {
            return NGX_ABORT;
        }

        goto done;
    }

#endif

    /* p->buf_to_file, if set, is written in front of the p->in chain */

    if (p->buf_to_file) {
        out = ngx_alloc_chain_link(p->pool);
        if (out == NULL) {
            return NGX_ABORT;
        }

        out->buf = p->buf_to_file;
        out->next = p->in;

    } else {
        out = p->in;
    }

    if (!p->cacheable) {

        /*
         * find how many bufs may be written at once: stop before the buf
         * that would exceed p->temp_file_write_size or
         * p->max_temp_file_size, but only if the previous buf had
         * last_shadow set
         */

        size = 0;
        cl = out;
        ll = NULL;
        prev_last_shadow = 1;

        ngx_log_debug1(NGX_LOG_DEBUG_EVENT, p->log, 0,
                       "pipe offset: %O", p->temp_file->offset);

        do {
            bsize = cl->buf->last - cl->buf->pos;

            ngx_log_debug4(NGX_LOG_DEBUG_EVENT, p->log, 0,
                           "pipe buf ls:%d %p, pos %p, size: %z",
                           cl->buf->last_shadow, cl->buf->start,
                           cl->buf->pos, bsize);

            if (prev_last_shadow
                && ((size + bsize > p->temp_file_write_size)
                    || (p->temp_file->offset + size + bsize
                        > p->max_temp_file_size)))
            {
                break;
            }

            prev_last_shadow = cl->buf->last_shadow;

            size += bsize;
            ll = &cl->next;
            cl = cl->next;

        } while (cl);

        ngx_log_debug1(NGX_LOG_DEBUG_EVENT, p->log, 0, "size: %z", size);

        /* even the first buf does not fit */

        if (ll == NULL) {
            return NGX_BUSY;
        }

        if (cl) {

            /* cut the chain: the rest stays in p->in */

            p->in = cl;
            *ll = NULL;

        } else {
            p->in = NULL;
            p->last_in = &p->in;
        }

    } else {

        /* a cacheable response is written as a whole */

        p->in = NULL;
        p->last_in = &p->in;
    }

#if (NGX_THREADS)
    if (p->thread_handler) {
        p->temp_file->thread_write = 1;
        p->temp_file->file.thread_task = p->thread_task;
        p->temp_file->file.thread_handler = p->thread_handler;
        p->temp_file->file.thread_ctx = p->thread_ctx;
    }
#endif

    n = ngx_write_chain_to_temp_file(p->temp_file, out);

    if (n == NGX_ERROR) {
        return NGX_ABORT;
    }

#if (NGX_THREADS)

    if (n == NGX_AGAIN) {

        /* keep the chain and the task until the write completes */

        p->writing = out;
        p->thread_task = p->temp_file->file.thread_task;
        return NGX_AGAIN;
    }

done:

#endif

    if (p->buf_to_file) {

        /*
         * the temp file offset is set to the size of p->buf_to_file, and
         * that size is excluded from the count of the written bytes;
         * the chain link of p->buf_to_file is skipped
         */

        p->temp_file->offset = p->buf_to_file->last - p->buf_to_file->pos;
        n -= p->buf_to_file->last - p->buf_to_file->pos;
        p->buf_to_file = NULL;
        out = out->next;
    }

    if (n > 0) {
        /* update previous buffer or add new buffer */

        if (p->out) {
            for (cl = p->out; cl->next; cl = cl->next) { /* void */ }

            b = cl->buf;

            /* the last buf of p->out ends where the new data starts */

            if (b->file_last == p->temp_file->offset) {
                p->temp_file->offset += n;
                b->file_last = p->temp_file->offset;
                goto free;
            }

            last_out = &cl->next;

        } else {
            last_out = &p->out;
        }

        cl = ngx_chain_get_free_buf(p->pool, &p->free);
        if (cl == NULL) {
            return NGX_ABORT;
        }

        b = cl->buf;

        ngx_memzero(b, sizeof(ngx_buf_t));

        b->tag = p->tag;

        /* a file buf covering the n bytes just written */

        b->file = &p->temp_file->file;
        b->file_pos = p->temp_file->offset;
        p->temp_file->offset += n;
        b->file_last = p->temp_file->offset;

        b->in_file = 1;
        b->temp_file = 1;

        *last_out = cl;
    }

free:

    /* find the end of the p->free_raw_bufs list */

    for (last_free = &p->free_raw_bufs;
         *last_free != NULL;
         last_free = &(*last_free)->next)
    {
        /* void */
    }

    /*
     * move the written chain links to p->free; for every buf with
     * last_shadow set the raw buf it shadows is reset to empty and
     * appended to p->free_raw_bufs
     */

    for (cl = out; cl; cl = next) {
        next = cl->next;

        cl->next = p->free;
        p->free = cl;

        b = cl->buf;

        if (b->last_shadow) {

            tl = ngx_alloc_chain_link(p->pool);
            if (tl == NULL) {
                return NGX_ABORT;
            }

            tl->buf = b->shadow;
            tl->next = NULL;

            *last_free = tl;
            last_free = &tl->next;

            b->shadow->pos = b->shadow->start;
            b->shadow->last = b->shadow->start;

            ngx_event_pipe_remove_shadow_links(b->shadow);
        }
    }

    return NGX_OK;
}
945
946
947 /* the copy input filter */
948
949 ngx_int_t
ngx_event_pipe_copy_input_filter(ngx_event_pipe_t * p,ngx_buf_t * buf)950 ngx_event_pipe_copy_input_filter(ngx_event_pipe_t *p, ngx_buf_t *buf)
951 {
952 ngx_buf_t *b;
953 ngx_chain_t *cl;
954
955 if (buf->pos == buf->last) {
956 return NGX_OK;
957 }
958
959 cl = ngx_chain_get_free_buf(p->pool, &p->free);
960 if (cl == NULL) {
961 return NGX_ERROR;
962 }
963
964 b = cl->buf;
965
966 ngx_memcpy(b, buf, sizeof(ngx_buf_t));
967 b->shadow = buf;
968 b->tag = p->tag;
969 b->last_shadow = 1;
970 b->recycled = 1;
971 buf->shadow = b;
972
973 ngx_log_debug1(NGX_LOG_DEBUG_EVENT, p->log, 0, "input buf #%d", b->num);
974
975 if (p->in) {
976 *p->last_in = cl;
977 } else {
978 p->in = cl;
979 }
980 p->last_in = &cl->next;
981
982 if (p->length == -1) {
983 return NGX_OK;
984 }
985
986 p->length -= b->last - b->pos;
987
988 return NGX_OK;
989 }
990
991
992 static ngx_inline void
ngx_event_pipe_remove_shadow_links(ngx_buf_t * buf)993 ngx_event_pipe_remove_shadow_links(ngx_buf_t *buf)
994 {
995 ngx_buf_t *b, *next;
996
997 b = buf->shadow;
998
999 if (b == NULL) {
1000 return;
1001 }
1002
1003 while (!b->last_shadow) {
1004 next = b->shadow;
1005
1006 b->temporary = 0;
1007 b->recycled = 0;
1008
1009 b->shadow = NULL;
1010 b = next;
1011 }
1012
1013 b->temporary = 0;
1014 b->recycled = 0;
1015 b->last_shadow = 0;
1016
1017 b->shadow = NULL;
1018
1019 buf->shadow = NULL;
1020 }
1021
1022
1023 ngx_int_t
ngx_event_pipe_add_free_buf(ngx_event_pipe_t * p,ngx_buf_t * b)1024 ngx_event_pipe_add_free_buf(ngx_event_pipe_t *p, ngx_buf_t *b)
1025 {
1026 ngx_chain_t *cl;
1027
1028 cl = ngx_alloc_chain_link(p->pool);
1029 if (cl == NULL) {
1030 return NGX_ERROR;
1031 }
1032
1033 if (p->buf_to_file && b->start == p->buf_to_file->start) {
1034 b->pos = p->buf_to_file->last;
1035 b->last = p->buf_to_file->last;
1036
1037 } else {
1038 b->pos = b->start;
1039 b->last = b->start;
1040 }
1041
1042 b->shadow = NULL;
1043
1044 cl->buf = b;
1045
1046 if (p->free_raw_bufs == NULL) {
1047 p->free_raw_bufs = cl;
1048 cl->next = NULL;
1049
1050 return NGX_OK;
1051 }
1052
1053 if (p->free_raw_bufs->buf->pos == p->free_raw_bufs->buf->last) {
1054
1055 /* add the free buf to the list start */
1056
1057 cl->next = p->free_raw_bufs;
1058 p->free_raw_bufs = cl;
1059
1060 return NGX_OK;
1061 }
1062
1063 /* the first free buf is partially filled, thus add the free buf after it */
1064
1065 cl->next = p->free_raw_bufs->next;
1066 p->free_raw_bufs->next = cl;
1067
1068 return NGX_OK;
1069 }
1070
1071
1072 static ngx_int_t
ngx_event_pipe_drain_chains(ngx_event_pipe_t * p)1073 ngx_event_pipe_drain_chains(ngx_event_pipe_t *p)
1074 {
1075 ngx_chain_t *cl, *tl;
1076
1077 for ( ;; ) {
1078 if (p->busy) {
1079 cl = p->busy;
1080 p->busy = NULL;
1081
1082 } else if (p->out) {
1083 cl = p->out;
1084 p->out = NULL;
1085
1086 } else if (p->in) {
1087 cl = p->in;
1088 p->in = NULL;
1089
1090 } else {
1091 return NGX_OK;
1092 }
1093
1094 while (cl) {
1095 if (cl->buf->last_shadow) {
1096 if (ngx_event_pipe_add_free_buf(p, cl->buf->shadow) != NGX_OK) {
1097 return NGX_ABORT;
1098 }
1099
1100 cl->buf->last_shadow = 0;
1101 }
1102
1103 cl->buf->shadow = NULL;
1104 tl = cl->next;
1105 cl->next = p->free;
1106 p->free = cl;
1107 cl = tl;
1108 }
1109 }
1110 }
1111