3 * Copyright (C) Igor Sysoev
7 #include <ngx_config.h>
10 #include <ngx_event_pipe.h>
13 static ngx_int_t ngx_event_pipe_read_upstream(ngx_event_pipe_t *p);
14 static ngx_int_t ngx_event_pipe_write_to_downstream(ngx_event_pipe_t *p);
16 static ngx_int_t ngx_event_pipe_write_chain_to_temp_file(ngx_event_pipe_t *p);
17 static ngx_inline void ngx_event_pipe_remove_shadow_links(ngx_buf_t *buf);
18 static ngx_inline void ngx_event_pipe_free_shadow_raw_buf(ngx_chain_t **free,
20 static ngx_int_t ngx_event_pipe_drain_chains(ngx_event_pipe_t *p);
24 ngx_event_pipe(ngx_event_pipe_t *p, ngx_int_t do_write)
27 ngx_event_t *rev, *wev;
31 p->log->action = "sending to client";
33 if (ngx_event_pipe_write_to_downstream(p) == NGX_ABORT) {
39 p->upstream_blocked = 0;
41 p->log->action = "reading upstream";
43 if (ngx_event_pipe_read_upstream(p) == NGX_ABORT) {
47 if (!p->read && !p->upstream_blocked) {
54 if (p->upstream->fd != -1) {
55 rev = p->upstream->read;
57 flags = (rev->eof || rev->error) ? NGX_CLOSE_EVENT : 0;
59 if (ngx_handle_read_event(rev, flags) != NGX_OK) {
63 if (rev->active && !rev->ready) {
64 ngx_add_timer(rev, p->read_timeout);
66 } else if (rev->timer_set) {
71 if (p->downstream->fd != -1 && p->downstream->data == p->output_ctx) {
72 wev = p->downstream->write;
73 if (ngx_handle_write_event(wev, p->send_lowat) != NGX_OK) {
78 if (wev->active && !wev->ready) {
79 ngx_add_timer(wev, p->send_timeout);
81 } else if (wev->timer_set) {
92 ngx_event_pipe_read_upstream(ngx_event_pipe_t *p)
97 ngx_chain_t *chain, *cl, *ln;
99 if (p->upstream_eof || p->upstream_error || p->upstream_done) {
103 ngx_log_debug1(NGX_LOG_DEBUG_EVENT, p->log, 0,
104 "pipe read upstream: %d", p->upstream->read->ready);
108 if (p->upstream_eof || p->upstream_error || p->upstream_done) {
112 if (p->preread_bufs == NULL && !p->upstream->read->ready) {
116 if (p->preread_bufs) {
118 /* use the pre-read bufs if they exist */
120 chain = p->preread_bufs;
121 p->preread_bufs = NULL;
124 ngx_log_debug1(NGX_LOG_DEBUG_EVENT, p->log, 0,
125 "pipe preread: %z", n);
133 #if (NGX_HAVE_KQUEUE)
136 * kqueue notifies about the end of file or a pending error.
137 * This test avoids allocating a buf and calling c->recv_chain()
138 * under these conditions.
141 if (p->upstream->read->available == 0
142 && p->upstream->read->pending_eof)
144 p->upstream->read->ready = 0;
145 p->upstream->read->eof = 0;
149 if (p->upstream->read->kq_errno) {
150 p->upstream->read->error = 1;
151 p->upstream_error = 1;
154 ngx_log_error(NGX_LOG_ERR, p->log,
155 p->upstream->read->kq_errno,
156 "kevent() reported that upstream "
157 "closed connection");
164 if (p->free_raw_bufs) {
166 /* use the free bufs if they exist */
168 chain = p->free_raw_bufs;
170 p->free_raw_bufs = p->free_raw_bufs->next;
173 p->free_raw_bufs = NULL;
176 } else if (p->allocated < p->bufs.num) {
178 /* allocate a new buf if it's still allowed */
180 b = ngx_create_temp_buf(p->pool, p->bufs.size);
187 chain = ngx_alloc_chain_link(p->pool);
195 } else if (!p->cacheable
196 && p->downstream->data == p->output_ctx
197 && p->downstream->write->ready
198 && !p->downstream->write->delayed)
201 * if the bufs do not need to be saved in a cache and
202 * the downstream is ready, then write the bufs to the downstream
205 p->upstream_blocked = 1;
207 ngx_log_debug0(NGX_LOG_DEBUG_EVENT, p->log, 0,
208 "pipe downstream ready");
212 } else if (p->cacheable
213 || p->temp_file->offset < p->max_temp_file_size)
217 * if it is allowed, then save some bufs from p->in
218 * to a temporary file, and add them to the p->out chain
221 rc = ngx_event_pipe_write_chain_to_temp_file(p);
223 ngx_log_debug1(NGX_LOG_DEBUG_EVENT, p->log, 0,
224 "pipe temp offset: %O", p->temp_file->offset);
226 if (rc == NGX_BUSY) {
230 if (rc == NGX_AGAIN) {
231 if (ngx_event_flags & NGX_USE_LEVEL_EVENT
232 && p->upstream->read->active
233 && p->upstream->read->ready)
235 if (ngx_del_event(p->upstream->read, NGX_READ_EVENT, 0)
247 chain = p->free_raw_bufs;
249 p->free_raw_bufs = p->free_raw_bufs->next;
252 p->free_raw_bufs = NULL;
257 /* there are no bufs to read in */
259 ngx_log_debug0(NGX_LOG_DEBUG_EVENT, p->log, 0,
260 "no pipe bufs to read in");
265 n = p->upstream->recv_chain(p->upstream, chain);
267 ngx_log_debug1(NGX_LOG_DEBUG_EVENT, p->log, 0,
268 "pipe recv chain: %z", n);
270 if (p->free_raw_bufs) {
271 chain->next = p->free_raw_bufs;
273 p->free_raw_bufs = chain;
275 if (n == NGX_ERROR) {
276 p->upstream_error = 1;
280 if (n == NGX_AGAIN) {
282 ngx_event_pipe_remove_shadow_links(chain->buf);
298 p->free_raw_bufs = NULL;
300 while (cl && n > 0) {
302 ngx_event_pipe_remove_shadow_links(cl->buf);
304 size = cl->buf->end - cl->buf->last;
307 cl->buf->last = cl->buf->end;
309 /* STUB */ cl->buf->num = p->num++;
311 if (p->input_filter(p, cl->buf) == NGX_ERROR) {
318 ngx_free_chain(p->pool, ln);
327 for (ln = cl; ln->next; ln = ln->next) { /* void */ }
329 ln->next = p->free_raw_bufs;
330 p->free_raw_bufs = cl;
336 for (cl = p->busy; cl; cl = cl->next) {
337 ngx_log_debug8(NGX_LOG_DEBUG_EVENT, p->log, 0,
338 "pipe buf busy s:%d t:%d f:%d "
339 "%p, pos %p, size: %z "
340 "file: %O, size: %z",
341 (cl->buf->shadow ? 1 : 0),
342 cl->buf->temporary, cl->buf->in_file,
343 cl->buf->start, cl->buf->pos,
344 cl->buf->last - cl->buf->pos,
346 cl->buf->file_last - cl->buf->file_pos);
349 for (cl = p->out; cl; cl = cl->next) {
350 ngx_log_debug8(NGX_LOG_DEBUG_EVENT, p->log, 0,
351 "pipe buf out s:%d t:%d f:%d "
352 "%p, pos %p, size: %z "
353 "file: %O, size: %z",
354 (cl->buf->shadow ? 1 : 0),
355 cl->buf->temporary, cl->buf->in_file,
356 cl->buf->start, cl->buf->pos,
357 cl->buf->last - cl->buf->pos,
359 cl->buf->file_last - cl->buf->file_pos);
362 for (cl = p->in; cl; cl = cl->next) {
363 ngx_log_debug8(NGX_LOG_DEBUG_EVENT, p->log, 0,
364 "pipe buf in s:%d t:%d f:%d "
365 "%p, pos %p, size: %z "
366 "file: %O, size: %z",
367 (cl->buf->shadow ? 1 : 0),
368 cl->buf->temporary, cl->buf->in_file,
369 cl->buf->start, cl->buf->pos,
370 cl->buf->last - cl->buf->pos,
372 cl->buf->file_last - cl->buf->file_pos);
375 for (cl = p->free_raw_bufs; cl; cl = cl->next) {
376 ngx_log_debug8(NGX_LOG_DEBUG_EVENT, p->log, 0,
377 "pipe buf free s:%d t:%d f:%d "
378 "%p, pos %p, size: %z "
379 "file: %O, size: %z",
380 (cl->buf->shadow ? 1 : 0),
381 cl->buf->temporary, cl->buf->in_file,
382 cl->buf->start, cl->buf->pos,
383 cl->buf->last - cl->buf->pos,
385 cl->buf->file_last - cl->buf->file_pos);
390 if ((p->upstream_eof || p->upstream_error) && p->free_raw_bufs) {
392 /* STUB */ p->free_raw_bufs->buf->num = p->num++;
394 if (p->input_filter(p, p->free_raw_bufs->buf) == NGX_ERROR) {
398 p->free_raw_bufs = p->free_raw_bufs->next;
401 for (cl = p->free_raw_bufs; cl; cl = cl->next) {
402 if (cl->buf->shadow == NULL) {
403 ngx_pfree(p->pool, cl->buf->start);
409 if (p->cacheable && p->in) {
410 if (ngx_event_pipe_write_chain_to_temp_file(p) == NGX_ABORT) {
420 ngx_event_pipe_write_to_downstream(ngx_event_pipe_t *p)
425 ngx_uint_t flush, prev_last_shadow;
426 ngx_chain_t *out, **ll, *cl;
427 ngx_connection_t *downstream;
429 downstream = p->downstream;
431 ngx_log_debug1(NGX_LOG_DEBUG_EVENT, p->log, 0,
432 "pipe write downstream: %d", downstream->write->ready);
435 if (p->downstream_error) {
436 return ngx_event_pipe_drain_chains(p);
439 if (p->upstream_eof || p->upstream_error || p->upstream_done) {
441 /* pass the p->out and p->in chains to the output filter */
443 for (cl = p->busy; cl; cl = cl->next) {
444 cl->buf->recycled = 0;
448 ngx_log_debug0(NGX_LOG_DEBUG_EVENT, p->log, 0,
449 "pipe write downstream flush out");
451 for (cl = p->out; cl; cl = cl->next) {
452 cl->buf->recycled = 0;
455 rc = p->output_filter(p->output_ctx, p->out);
457 if (downstream->destroyed) {
461 if (rc == NGX_ERROR) {
462 p->downstream_error = 1;
463 return ngx_event_pipe_drain_chains(p);
470 ngx_log_debug0(NGX_LOG_DEBUG_EVENT, p->log, 0,
471 "pipe write downstream flush in");
473 for (cl = p->in; cl; cl = cl->next) {
474 cl->buf->recycled = 0;
477 rc = p->output_filter(p->output_ctx, p->in);
479 if (downstream->destroyed) {
483 if (rc == NGX_ERROR) {
484 p->downstream_error = 1;
485 return ngx_event_pipe_drain_chains(p);
491 ngx_log_debug0(NGX_LOG_DEBUG_EVENT, p->log, 0,
492 "pipe write downstream done");
494 /* TODO: free unused bufs */
496 p->downstream_done = 1;
500 if (downstream->data != p->output_ctx
501 || !downstream->write->ready
502 || downstream->write->delayed)
507 /* bsize is the size of the busy recycled bufs */
512 for (cl = p->busy; cl; cl = cl->next) {
514 if (cl->buf->recycled) {
515 if (prev == cl->buf->start) {
519 bsize += cl->buf->end - cl->buf->start;
520 prev = cl->buf->start;
524 ngx_log_debug1(NGX_LOG_DEBUG_EVENT, p->log, 0,
525 "pipe write busy: %uz", bsize);
529 if (bsize >= (size_t) p->busy_size) {
536 prev_last_shadow = 1;
542 if (cl->buf->recycled
543 && bsize + cl->buf->last - cl->buf->pos > p->busy_size)
549 p->out = p->out->next;
551 ngx_event_pipe_free_shadow_raw_buf(&p->free_raw_bufs, cl->buf);
553 } else if (!p->cacheable && p->in) {
556 ngx_log_debug3(NGX_LOG_DEBUG_EVENT, p->log, 0,
557 "pipe write buf ls:%d %p %z",
558 cl->buf->last_shadow,
560 cl->buf->last - cl->buf->pos);
562 if (cl->buf->recycled
563 && cl->buf->last_shadow
564 && bsize + cl->buf->last - cl->buf->pos > p->busy_size)
566 if (!prev_last_shadow) {
582 prev_last_shadow = cl->buf->last_shadow;
590 if (cl->buf->recycled) {
591 bsize += cl->buf->last - cl->buf->pos;
606 ngx_log_debug2(NGX_LOG_DEBUG_EVENT, p->log, 0,
607 "pipe write: out:%p, f:%d", out, flush);
609 if (out == NULL && !flush) {
613 rc = p->output_filter(p->output_ctx, out);
615 if (downstream->destroyed) {
619 if (rc == NGX_ERROR) {
620 p->downstream_error = 1;
621 return ngx_event_pipe_drain_chains(p);
624 ngx_chain_update_chains(&p->free, &p->busy, &out, p->tag);
626 for (cl = p->free; cl; cl = cl->next) {
628 if (cl->buf->temp_file) {
629 if (p->cacheable || !p->cyclic_temp_file) {
633 /* reset p->temp_file->offset if all bufs have been sent */
635 if (cl->buf->file_last == p->temp_file->offset) {
636 p->temp_file->offset = 0;
640 /* TODO: free buf if p->free_bufs && upstream done */
642 /* add the free shadow raw buf to p->free_raw_bufs */
644 if (cl->buf->last_shadow) {
645 if (ngx_event_pipe_add_free_buf(p, cl->buf->shadow) != NGX_OK) {
649 cl->buf->last_shadow = 0;
652 cl->buf->shadow = NULL;
661 ngx_event_pipe_write_chain_to_temp_file(ngx_event_pipe_t *p)
665 ngx_chain_t *cl, *tl, *next, *out, **ll, **last_free, fl;
667 if (p->buf_to_file) {
668 fl.buf = p->buf_to_file;
682 ngx_log_debug1(NGX_LOG_DEBUG_EVENT, p->log, 0,
683 "pipe offset: %O", p->temp_file->offset);
686 bsize = cl->buf->last - cl->buf->pos;
688 ngx_log_debug3(NGX_LOG_DEBUG_EVENT, p->log, 0,
689 "pipe buf %p, pos %p, size: %z",
690 cl->buf->start, cl->buf->pos, bsize);
692 if ((size + bsize > p->temp_file_write_size)
693 || (p->temp_file->offset + size + bsize > p->max_temp_file_size))
704 ngx_log_debug1(NGX_LOG_DEBUG_EVENT, p->log, 0, "size: %z", size);
724 if (ngx_write_chain_to_temp_file(p->temp_file, out) == NGX_ERROR) {
728 for (last_free = &p->free_raw_bufs;
730 last_free = &(*last_free)->next)
735 if (p->buf_to_file) {
736 p->temp_file->offset = p->buf_to_file->last - p->buf_to_file->pos;
737 p->buf_to_file = NULL;
741 for (cl = out; cl; cl = next) {
746 b->file = &p->temp_file->file;
747 b->file_pos = p->temp_file->offset;
748 p->temp_file->offset += b->last - b->pos;
749 b->file_last = p->temp_file->offset;
759 p->last_out = &cl->next;
761 if (b->last_shadow) {
763 tl = ngx_alloc_chain_link(p->pool);
772 last_free = &tl->next;
774 b->shadow->pos = b->shadow->start;
775 b->shadow->last = b->shadow->start;
777 ngx_event_pipe_remove_shadow_links(b->shadow);
785 /* the copy input filter */
788 ngx_event_pipe_copy_input_filter(ngx_event_pipe_t *p, ngx_buf_t *buf)
793 if (buf->pos == buf->last) {
801 ngx_free_chain(p->pool, cl);
804 b = ngx_alloc_buf(p->pool);
810 ngx_memcpy(b, buf, sizeof(ngx_buf_t));
817 cl = ngx_alloc_chain_link(p->pool);
825 ngx_log_debug1(NGX_LOG_DEBUG_EVENT, p->log, 0, "input buf #%d", b->num);
832 p->last_in = &cl->next;
838 static ngx_inline void
839 ngx_event_pipe_remove_shadow_links(ngx_buf_t *buf)
849 while (!b->last_shadow) {
869 static ngx_inline void
870 ngx_event_pipe_free_shadow_raw_buf(ngx_chain_t **free, ngx_buf_t *buf)
873 ngx_chain_t *cl, **ll;
875 if (buf->shadow == NULL) {
879 for (s = buf->shadow; !s->last_shadow; s = s->shadow) { /* void */ }
883 for (cl = *free; cl; cl = cl->next) {
889 if (cl->buf->shadow) {
899 ngx_event_pipe_add_free_buf(ngx_event_pipe_t *p, ngx_buf_t *b)
903 cl = ngx_alloc_chain_link(p->pool);
914 if (p->free_raw_bufs == NULL) {
915 p->free_raw_bufs = cl;
921 if (p->free_raw_bufs->buf->pos == p->free_raw_bufs->buf->last) {
923 /* add the free buf to the list start */
925 cl->next = p->free_raw_bufs;
926 p->free_raw_bufs = cl;
931 /* the first free buf is partially filled, thus add the free buf after it */
933 cl->next = p->free_raw_bufs->next;
934 p->free_raw_bufs->next = cl;
941 ngx_event_pipe_drain_chains(ngx_event_pipe_t *p)
943 ngx_chain_t *cl, *tl;
963 if (cl->buf->last_shadow) {
964 if (ngx_event_pipe_add_free_buf(p, cl->buf->shadow) != NGX_OK) {
968 cl->buf->last_shadow = 0;
971 cl->buf->shadow = NULL;