diff --git a/include/haproxy/mux_fcgi-t.h b/include/haproxy/mux_fcgi-t.h index 27973dbf4..a1c66205b 100644 --- a/include/haproxy/mux_fcgi-t.h +++ b/include/haproxy/mux_fcgi-t.h @@ -58,6 +58,9 @@ #define FCGI_CF_ERR_PENDING 0x00008000 /* A write error was detected (block sends but not reads) */ #define FCGI_CF_ERROR 0x00010000 /* A read error was detected (handled has an abort) */ +#define FCGI_CF_DEM_SHORT_READ 0x00020000 /* demux blocked on incomplete frame */ +#define FCGI_CF_END_REACHED 0x00040000 /* pending data too short with FCGI_CF_EOS present */ + /* This function is used to report flags in debugging tools. Please reflect * below any single-bit flag addition above in the same order via the diff --git a/src/mux_fcgi.c b/src/mux_fcgi.c index 4adf658b5..85d30efc7 100644 --- a/src/mux_fcgi.c +++ b/src/mux_fcgi.c @@ -437,41 +437,57 @@ static void fcgi_trace(enum trace_level level, uint64_t mask, const struct trace /* Indicates whether or not the we may call the fcgi_recv() function to attempt * to receive data into the buffer and/or demux pending data. The condition is * a bit complex due to some API limits for now. The rules are the following : - * - if an error or a shutdown was detected on the connection and the buffer - * is empty, we must not attempt to receive + * - if an error or a shutdown was detected on the connection, + * we must not attempt to receive + * - if we're subscribed for receiving, no need to try again * - if the demux buf failed to be allocated, we must not try to receive and - * we know there is nothing pending - * - if no flag indicates a blocking condition, we may attempt to receive, - * regardless of whether the demux buffer is full or not, so that only - * de demux part decides whether or not to block. 
This is needed because - * the connection API indeed prevents us from re-enabling receipt that is - * already enabled in a polled state, so we must always immediately stop - * as soon as the demux can't proceed so as never to hit an end of read - * with data pending in the buffers. - * - otherwise must may not attempt + * we know there is nothing pending (we'll be woken up once allocated) + * - if the demux buf is full, we will not be able to receive. + * - otherwise we may attempt to receive */ static inline int fcgi_recv_allowed(const struct fcgi_conn *fconn) { - if (fconn->flags & (FCGI_CF_EOS|FCGI_CF_ERROR)) + if (fconn->flags & (FCGI_CF_EOS|FCGI_CF_ERROR) || fconn->state == FCGI_CS_CLOSED) return 0; - if (b_data(&fconn->dbuf) == 0 && fconn->state == FCGI_CS_CLOSED) + if ((fconn->wait_event.events & SUB_RETRY_RECV)) return 0; - if (!(fconn->flags & FCGI_CF_DEM_DALLOC) && - !(fconn->flags & FCGI_CF_DEM_BLOCK_ANY)) - return 1; + if (!(fconn->flags & (FCGI_CF_DEM_DALLOC | FCGI_CF_DEM_DFULL))) + return 1; return 0; } -/* Restarts reading on the connection if it was not enabled */ +/* Indicates whether it's worth waking up the I/O handler to restart demuxing. + * Its conditions are the following: + * - if the buffer is empty and the connection is closed, there's nothing + * to demux + * - if a short read was reported, no need to try demuxing again + * - if some blocking conditions remain, no need to try again + * - otherwise it's safe to try demuxing again + */ +static inline int fcgi_may_demux(const struct fcgi_conn *fconn) +{ + if (fconn->state == FCGI_CS_CLOSED && !b_data(&fconn->dbuf)) + return 0; + + if (fconn->flags & FCGI_CF_DEM_SHORT_READ) + return 0; + + if (fconn->flags & FCGI_CF_DEM_BLOCK_ANY) + return 0; + + return 1; +} + + +/* restarts reading/processing on the connection if we can receive or demux + * (both are called from the same tasklet). 
+ */ static inline void fcgi_conn_restart_reading(const struct fcgi_conn *fconn, int consider_buffer) { - if (!fcgi_recv_allowed(fconn)) - return; - if ((!consider_buffer || !b_data(&fconn->dbuf)) && - (fconn->wait_event.events & SUB_RETRY_RECV)) + if (!fcgi_recv_allowed(fconn) && !fcgi_may_demux(fconn)) return; tasklet_wakeup(fconn->wait_event.tasklet); } @@ -782,15 +798,15 @@ static void fcgi_release(struct fcgi_conn *fconn) } } -/* Detect a pending read0 for a FCGI connection. It happens if a read0 is - * pending on the connection AND if there is no more data in the demux - * buffer. The function returns 1 to report a read0 or 0 otherwise. +/* Detect a pending read0 for a FCGI connection. It happens if a read0 was + * already reported on a previous xprt->rcvbuf() AND a record parser failed + * to parse pending data, confirming no more progress is possible because + * we're facing a truncated frame. The function returns 1 to report a read0 + * or 0 otherwise. */ static int fcgi_conn_read0_pending(struct fcgi_conn *fconn) { - if ((fconn->flags & FCGI_CF_EOS) && !b_data(&fconn->dbuf)) - return 1; - return 0; + return !!(fconn->flags & FCGI_CF_END_REACHED); } @@ -1531,6 +1547,7 @@ static int fcgi_conn_handle_values_result(struct fcgi_conn *fconn) /* process full record only */ if (b_data(dbuf) < (fconn->drl + fconn->drp)) { + fconn->flags |= FCGI_CF_DEM_SHORT_READ; TRACE_DEVEL("leaving on missing data", FCGI_EV_RX_RECORD|FCGI_EV_RX_GETVAL, fconn->conn); return 0; } @@ -2256,10 +2273,10 @@ static int fcgi_strm_handle_stdout(struct fcgi_conn *fconn, struct fcgi_strm *fs if (fconn->state == FCGI_CS_RECORD_P) goto end_transfer; - if (b_data(dbuf) < (fconn->drl + fconn->drp) && - b_size(dbuf) > (fconn->drl + fconn->drp) && - buf_room_for_htx_data(dbuf)) + if (b_data(dbuf) < (fconn->drl + fconn->drp) && !b_full(dbuf)) { + fconn->flags |= FCGI_CF_DEM_SHORT_READ; goto fail; // incomplete record + } if (!fcgi_get_buf(fconn, &fstrm->rxbuf)) { fconn->flags |= 
FCGI_CF_DEM_SALLOC; @@ -2319,23 +2336,31 @@ static int fcgi_strm_handle_empty_stdout(struct fcgi_conn *fconn, struct fcgi_st TRACE_ENTER(FCGI_EV_RX_RECORD|FCGI_EV_RX_STDOUT, fconn->conn, fstrm); + if (b_data(&fconn->dbuf) < (fconn->drl + fconn->drp) && !b_full(&fconn->dbuf)) { + fconn->flags |= FCGI_CF_DEM_SHORT_READ; + goto fail; // incomplete record + } + fconn->state = FCGI_CS_RECORD_P; TRACE_STATE("switching to RECORD_P", FCGI_EV_RX_RECORD|FCGI_EV_RX_STDOUT, fconn->conn, fstrm); + fconn->drl += fconn->drp; fconn->drp = 0; ret = MIN(b_data(&fconn->dbuf), fconn->drl); b_del(&fconn->dbuf, ret); fconn->drl -= ret; - if (fconn->drl) { - TRACE_DEVEL("leaving on missing data or error", FCGI_EV_RX_RECORD|FCGI_EV_RX_STDOUT, fconn->conn, fstrm); - return 0; - } + if (fconn->drl) + goto fail; fconn->state = FCGI_CS_RECORD_H; fstrm->flags |= FCGI_SF_ES_RCVD; TRACE_PROTO("FCGI STDOUT record rcvd", FCGI_EV_RX_RECORD|FCGI_EV_RX_STDOUT, fconn->conn, fstrm, 0, (size_t[]){0}); TRACE_STATE("stdout data fully send, switching to RECORD_H", FCGI_EV_RX_RECORD|FCGI_EV_RX_FHDR|FCGI_EV_RX_EOI, fconn->conn, fstrm); TRACE_LEAVE(FCGI_EV_RX_RECORD|FCGI_EV_RX_STDOUT, fconn->conn, fstrm); return 1; + + fail: + TRACE_DEVEL("leaving on missing data or error", FCGI_EV_RX_RECORD|FCGI_EV_RX_STDOUT, fconn->conn, fstrm); + return 0; } /* Processes a STDERR record. 
Returns > 0 on success, 0 if it couldn't do @@ -2354,13 +2379,13 @@ static int fcgi_strm_handle_stderr(struct fcgi_conn *fconn, struct fcgi_strm *fs if (fconn->state == FCGI_CS_RECORD_P || !fconn->drl) goto end_transfer; - if (b_data(dbuf) < (fconn->drl + fconn->drp) && - b_size(dbuf) > (fconn->drl + fconn->drp) && - buf_room_for_htx_data(dbuf)) + if (b_data(dbuf) < (fconn->drl + fconn->drp) && !b_full(dbuf)) { + fconn->flags |= FCGI_CF_DEM_SHORT_READ; goto fail; // incomplete record + } chunk_reset(&trash); - ret = b_force_xfer(&trash, dbuf, MIN(b_room(&trash), fconn->drl)); + ret = b_force_xfer(&trash, dbuf, MIN(b_room(&trash) - 2, fconn->drl)); if (!ret) goto fail; fconn->drl -= ret; @@ -2415,6 +2440,7 @@ static int fcgi_strm_handle_end_request(struct fcgi_conn *fconn, struct fcgi_str /* process full record only */ if (b_data(dbuf) < (fconn->drl + fconn->drp)) { + fconn->flags |= FCGI_CF_DEM_SHORT_READ; TRACE_DEVEL("leaving on missing data", FCGI_EV_RX_RECORD|FCGI_EV_RX_ENDREQ, fconn->conn); return 0; } @@ -2476,6 +2502,7 @@ static void fcgi_process_demux(struct fcgi_conn *fconn) TRACE_STATE("receiving FCGI record header", FCGI_EV_RX_RECORD|FCGI_EV_RX_FHDR, fconn->conn); ret = fcgi_decode_record_hdr(&fconn->dbuf, 0, &hdr); if (!ret) { + fconn->flags |= FCGI_CF_DEM_SHORT_READ; TRACE_ERROR("header record decoding failure", FCGI_EV_RX_RECORD|FCGI_EV_RX_ENDREQ|FCGI_EV_FSTRM_ERR, fconn->conn, fstrm); goto fail; } @@ -2493,8 +2520,13 @@ static void fcgi_process_demux(struct fcgi_conn *fconn) /* process as many incoming records as possible below */ while (1) { + /* Make sure to clear DFULL if contents were deleted */ + if (!b_full(&fconn->dbuf)) + fconn->flags &= ~FCGI_CF_DEM_DFULL; + if (!b_data(&fconn->dbuf)) { TRACE_DEVEL("no more Rx data", FCGI_EV_RX_RECORD, fconn->conn); + fconn->flags |= FCGI_CF_DEM_SHORT_READ; break; } @@ -2506,8 +2538,10 @@ static void fcgi_process_demux(struct fcgi_conn *fconn) if (fconn->state == FCGI_CS_RECORD_H) { TRACE_PROTO("receiving 
FCGI record header", FCGI_EV_RX_RECORD|FCGI_EV_RX_FHDR, fconn->conn); ret = fcgi_decode_record_hdr(&fconn->dbuf, 0, &hdr); - if (!ret) + if (!ret) { + fconn->flags |= FCGI_CF_DEM_SHORT_READ; break; + } b_del(&fconn->dbuf, ret); new_record: @@ -2519,6 +2553,10 @@ static void fcgi_process_demux(struct fcgi_conn *fconn) TRACE_STATE("FCGI record header rcvd, switching to RECORD_D", FCGI_EV_RX_RECORD|FCGI_EV_RX_FHDR, fconn->conn); } + /* Make sure to clear DFULL if contents were deleted */ + if (!b_full(&fconn->dbuf)) + fconn->flags &= ~FCGI_CF_DEM_DFULL; + /* Only FCGI_CS_RECORD_D or FCGI_CS_RECORD_P */ tmp_fstrm = fcgi_conn_st_by_id(fconn, fconn->dsi); @@ -2578,6 +2616,12 @@ static void fcgi_process_demux(struct fcgi_conn *fconn) * larger than the buffer so we drain all of * their contents until we reach the end. */ + if (b_data(&fconn->dbuf) < (fconn->drl + fconn->drp) && !b_full(&fconn->dbuf)) { + fconn->flags |= FCGI_CF_DEM_SHORT_READ; + ret = 0; + break; + } + fconn->state = FCGI_CS_RECORD_P; fconn->drl += fconn->drp; fconn->drp = 0; @@ -2602,6 +2646,15 @@ static void fcgi_process_demux(struct fcgi_conn *fconn) } fail: + if (fconn->state == FCGI_CS_CLOSED || (fconn->flags & FCGI_CF_DEM_SHORT_READ)) { + if (fconn->flags & FCGI_CF_EOS) + fconn->flags |= FCGI_CF_END_REACHED; + } + + /* Make sure to clear DFULL if contents were deleted */ + if (!b_full(&fconn->dbuf)) + fconn->flags &= ~FCGI_CF_DEM_DFULL; + /* we can go here on missing data, blocked response or error */ if (fstrm && fcgi_strm_sc(fstrm) && (b_data(&fstrm->rxbuf) || @@ -2745,14 +2798,17 @@ static int fcgi_recv(struct fcgi_conn *fconn) TRACE_DATA("failed to receive data, subscribing", FCGI_EV_FCONN_RECV, conn); conn->xprt->subscribe(conn, conn->xprt_ctx, SUB_RETRY_RECV, &fconn->wait_event); } - else + else if (ret) { + fconn->flags &= ~FCGI_CF_DEM_SHORT_READ; TRACE_DATA("recv data", FCGI_EV_FCONN_RECV, conn, 0, 0, (size_t[]){ret}); + } if (conn_xprt_read0_pending(conn)) { TRACE_DATA("received read0", 
FCGI_EV_FCONN_RECV, conn); fconn->flags |= FCGI_CF_EOS; } - if (conn->flags & CO_FL_ERROR) { + if (conn->flags & CO_FL_ERROR && + (!b_data(&fconn->dbuf) || (fconn->flags & FCGI_CF_DEM_SHORT_READ))) { TRACE_DATA("connection error", FCGI_EV_FCONN_RECV, conn); fconn->flags |= FCGI_CF_ERROR; } @@ -2999,14 +3055,30 @@ static int fcgi_process(struct fcgi_conn *fconn) TRACE_POINT(FCGI_EV_FCONN_WAKE, conn); - if (b_data(&fconn->dbuf) && !(fconn->flags & FCGI_CF_DEM_BLOCK_ANY)) { - fcgi_process_demux(fconn); + if (!(fconn->flags & FCGI_CF_DEM_BLOCK_ANY) && + (b_data(&fconn->dbuf) || (fconn->flags & FCGI_CF_EOS))) { + do { + fcgi_process_demux(fconn); + + /* hint: if we ended up aligned on a record, we've very + * likely reached the end, no point trying again. + */ + if (fconn->state == FCGI_CS_RECORD_H) + break; + + if (!fcgi_recv_allowed(fconn)) + break; + + /* OK, it's worth trying to grab a few more records */ + fcgi_recv(fconn); + + } while ((b_data(&fconn->dbuf) && fcgi_may_demux(fconn)) || (fconn->flags & FCGI_CF_EOS)); + + /* now's time to wake the task up */ + fcgi_conn_restart_reading(fconn, 0); if (fconn->state == FCGI_CS_CLOSED || (fconn->flags & FCGI_CF_ERROR)) b_reset(&fconn->dbuf); - - if (buf_room_for_htx_data(&fconn->dbuf)) - fconn->flags &= ~FCGI_CF_DEM_DFULL; } fcgi_send(fconn);