haproxy/src/stream_interface.c

/*
* Functions managing stream_interface structures
*
* Copyright 2000-2012 Willy Tarreau <w@1wt.eu>
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version
* 2 of the License, or (at your option) any later version.
*
*/
#include <errno.h>
#include <fcntl.h>
#include <stdio.h>
#include <stdlib.h>
#include <sys/socket.h>
#include <sys/stat.h>
#include <sys/types.h>
#include <common/buffer.h>
#include <common/compat.h>
#include <common/config.h>
#include <common/debug.h>
#include <common/standard.h>
#include <common/ticks.h>
#include <common/time.h>
#include <proto/applet.h>
#include <proto/channel.h>
#include <proto/connection.h>
#include <proto/mux_pt.h>
#include <proto/pipe.h>
#include <proto/stream.h>
#include <proto/stream_interface.h>
#include <proto/task.h>
#include <types/pipe.h>
/* socket functions used when running a stream interface as a task */
static void stream_int_shutr(struct stream_interface *si);
static void stream_int_shutw(struct stream_interface *si);
static void stream_int_chk_rcv(struct stream_interface *si);
static void stream_int_chk_snd(struct stream_interface *si);
static void stream_int_shutr_conn(struct stream_interface *si);
static void stream_int_shutw_conn(struct stream_interface *si);
static void stream_int_chk_rcv_conn(struct stream_interface *si);
static void stream_int_chk_snd_conn(struct stream_interface *si);
static void stream_int_shutr_applet(struct stream_interface *si);
static void stream_int_shutw_applet(struct stream_interface *si);
static void stream_int_chk_rcv_applet(struct stream_interface *si);
static void stream_int_chk_snd_applet(struct stream_interface *si);
static int si_cs_recv(struct conn_stream *cs);
static int si_cs_process(struct conn_stream *cs);
static int si_idle_conn_wake_cb(struct conn_stream *cs);
static int si_cs_send(struct conn_stream *cs);
/* stream-interface operations for embedded tasks */
struct si_ops si_embedded_ops = {
.chk_rcv = stream_int_chk_rcv,
.chk_snd = stream_int_chk_snd,
.shutr = stream_int_shutr,
.shutw = stream_int_shutw,
};
/* stream-interface operations for connections */
struct si_ops si_conn_ops = {
.update = stream_int_update_conn,
.chk_rcv = stream_int_chk_rcv_conn,
.chk_snd = stream_int_chk_snd_conn,
.shutr = stream_int_shutr_conn,
.shutw = stream_int_shutw_conn,
};
/* stream-interface operations for applets */
struct si_ops si_applet_ops = {
.update = stream_int_update_applet,
.chk_rcv = stream_int_chk_rcv_applet,
.chk_snd = stream_int_chk_snd_applet,
.shutr = stream_int_shutr_applet,
.shutw = stream_int_shutw_applet,
};
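/* data layer callbacks for conn_streams attached to an active stream interface */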
struct data_cb si_conn_cb = {
.name = "STRM",
};
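/* data layer callbacks for idle server connections: only a wake callback is
* needed, whose sole purpose is to detect and kill a closed idle connection
*/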
struct data_cb si_idle_conn_cb = {
.wake = si_idle_conn_wake_cb,
.name = "IDLE",
};
/*
* This function only has to be called once after a wakeup event in case of
* suspected timeout. It controls the stream interface timeouts and sets
* si->flags accordingly. It does NOT close anything, as this timeout may
* be used for any purpose. It returns 1 if the timeout fired, otherwise
* zero.
*/
int stream_int_check_timeouts(struct stream_interface *si)
{
if (tick_is_expired(si->exp, now_ms)) {
si->flags |= SI_FL_EXP;
return 1;
}
return 0;
}
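/* Illustrative sketch (hypothetical caller, not part of this file): a stream
* handler would typically call the function above once per wakeup for each
* side, then act on the flag itself, e.g.:
*
*	if (stream_int_check_timeouts(si)) {
*		// SI_FL_EXP is now set; the handler decides what to abort or close
*	}
*/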
/* to be called only when in SI_ST_DIS with SI_FL_ERR */
void stream_int_report_error(struct stream_interface *si)
{
if (!si->err_type)
si->err_type = SI_ET_DATA_ERR;
si_oc(si)->flags |= CF_WRITE_ERROR;
si_ic(si)->flags |= CF_READ_ERROR;
}
/*
* Returns a message to the client ; the connection is shut down for read,
* and the request is cleared so that no server connection can be initiated.
* The buffer is marked for read shutdown on the other side to protect the
* message, and the buffer write is enabled. The message is contained in a
* "chunk". If it is null, then an empty message is used. The reply buffer does
* not need to be empty before this, and its contents will not be overwritten.
* The primary goal of this function is to return error messages to a client.
*/
void stream_int_retnclose(struct stream_interface *si,
const struct buffer *msg)
{
struct channel *ic = si_ic(si);
struct channel *oc = si_oc(si);
channel_auto_read(ic);
channel_abort(ic);
channel_auto_close(ic);
channel_erase(ic);
channel_truncate(oc);
if (likely(msg && msg->data))
co_inject(oc, msg->area, msg->data);
oc->wex = tick_add_ifset(now_ms, oc->wto);
channel_auto_read(oc);
channel_auto_close(oc);
channel_shutr_now(oc);
}
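/* Illustrative usage (hypothetical, not from this file): an analyser that
* wants to return a preformatted error to the client and prevent any server
* connection would typically call, with <s> the stream and <msg> a static
* error chunk:
*
*	stream_int_retnclose(&s->si[0], msg);
*/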
/*
* This function performs a shutdown-read on a detached stream interface in a
* connected or init state (it does nothing for other states). It either shuts
* the read side or marks itself as closed. The buffer flags are updated to
* reflect the new state. If the stream interface has SI_FL_NOHALF, we also
* forward the close to the write side. The owner task is woken up if it exists.
*/
static void stream_int_shutr(struct stream_interface *si)
{
struct channel *ic = si_ic(si);
ic->flags &= ~CF_SHUTR_NOW;
if (ic->flags & CF_SHUTR)
return;
ic->flags |= CF_SHUTR;
ic->rex = TICK_ETERNITY;
si->flags &= ~SI_FL_WAIT_ROOM;
if (si->state != SI_ST_EST && si->state != SI_ST_CON)
return;
if (si_oc(si)->flags & CF_SHUTW) {
si->state = SI_ST_DIS;
si->exp = TICK_ETERNITY;
}
else if (si->flags & SI_FL_NOHALF) {
/* we want to immediately forward this close to the write side */
return stream_int_shutw(si);
}
/* note that if the task exists, it must unregister itself once it runs */
if (!(si->flags & SI_FL_DONT_WAKE))
task_wakeup(si_task(si), TASK_WOKEN_IO);
}
/*
* This function performs a shutdown-write on a detached stream interface in a
* connected or init state (it does nothing for other states). It either shuts
* the write side or marks itself as closed. The buffer flags are updated to
* reflect the new state. It also closes everything if the SI was marked as
* being in error state. The owner task is woken up if it exists.
*/
static void stream_int_shutw(struct stream_interface *si)
{
struct channel *ic = si_ic(si);
struct channel *oc = si_oc(si);
oc->flags &= ~CF_SHUTW_NOW;
if (oc->flags & CF_SHUTW)
return;
oc->flags |= CF_SHUTW;
oc->wex = TICK_ETERNITY;
si->flags &= ~SI_FL_WAIT_DATA;
if (tick_isset(si->hcto)) {
ic->rto = si->hcto;
ic->rex = tick_add(now_ms, ic->rto);
}
switch (si->state) {
case SI_ST_EST:
/* we have to shut before closing, otherwise some short messages
* may never leave the system, especially when there are remaining
* unread data in the socket input buffer, or when nolinger is set.
* However, if SI_FL_NOLINGER is explicitly set, we know there is
* no risk so we close both sides immediately.
*/
if (!(si->flags & (SI_FL_ERR | SI_FL_NOLINGER)) &&
!(ic->flags & (CF_SHUTR|CF_DONT_READ)))
return;
/* fall through */
case SI_ST_CON:
case SI_ST_CER:
case SI_ST_QUE:
case SI_ST_TAR:
/* Note that none of these states may happen with applets */
si->state = SI_ST_DIS;
default:
si->flags &= ~(SI_FL_WAIT_ROOM | SI_FL_NOLINGER);
ic->flags &= ~CF_SHUTR_NOW;
ic->flags |= CF_SHUTR;
ic->rex = TICK_ETERNITY;
si->exp = TICK_ETERNITY;
}
/* note that if the task exists, it must unregister itself once it runs */
if (!(si->flags & SI_FL_DONT_WAKE))
task_wakeup(si_task(si), TASK_WOKEN_IO);
}
/* default chk_rcv function for scheduled tasks */
static void stream_int_chk_rcv(struct stream_interface *si)
{
struct channel *ic = si_ic(si);
DPRINTF(stderr, "%s: si=%p, si->state=%d ic->flags=%08x oc->flags=%08x\n",
__FUNCTION__,
si, si->state, ic->flags, si_oc(si)->flags);
if (unlikely(si->state != SI_ST_EST || (ic->flags & (CF_SHUTR|CF_DONT_READ))))
return;
if (!channel_may_recv(ic) || ic->pipe) {
/* stop reading */
si->flags |= SI_FL_WAIT_ROOM;
}
else {
/* (re)start reading */
si->flags &= ~SI_FL_WAIT_ROOM;
if (!(si->flags & SI_FL_DONT_WAKE))
task_wakeup(si_task(si), TASK_WOKEN_IO);
}
}
/* default chk_snd function for scheduled tasks */
static void stream_int_chk_snd(struct stream_interface *si)
{
struct channel *oc = si_oc(si);
DPRINTF(stderr, "%s: si=%p, si->state=%d ic->flags=%08x oc->flags=%08x\n",
__FUNCTION__,
si, si->state, si_ic(si)->flags, oc->flags);
if (unlikely(si->state != SI_ST_EST || (oc->flags & CF_SHUTW)))
return;
if (!(si->flags & SI_FL_WAIT_DATA) || /* not waiting for data */
channel_is_empty(oc)) /* called with nothing to send ! */
return;
/* Otherwise there are remaining data to be sent in the buffer,
* so we tell the handler.
*/
si->flags &= ~SI_FL_WAIT_DATA;
if (!tick_isset(oc->wex))
oc->wex = tick_add_ifset(now_ms, oc->wto);
if (!(si->flags & SI_FL_DONT_WAKE))
task_wakeup(si_task(si), TASK_WOKEN_IO);
}
/* Register an applet to handle a stream_interface as a new appctx. The SI will
* wake it up every time it is solicited. The appctx must be deleted by the task
* handler using si_release_endpoint(), possibly from within the function itself.
* It also pre-initializes the applet's context and returns it (or NULL in case
* it could not be allocated).
*/
struct appctx *stream_int_register_handler(struct stream_interface *si, struct applet *app)
{
struct appctx *appctx;
DPRINTF(stderr, "registering handler %p for si %p (was %p)\n", app, si, si_task(si));
appctx = si_alloc_appctx(si, app);
if (!appctx)
return NULL;
si_applet_cant_get(si);
appctx_wakeup(appctx);
return si_appctx(si);
}
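/* Illustrative usage (hypothetical applet name, not from this file):
* installing an applet on a stream interface and initializing its context:
*
*	appctx = stream_int_register_handler(si, &some_applet);
*	if (!appctx)
*		return -1;       // allocation failed, the caller must abort
*	appctx->st0 = 0;         // applet-specific state initialization
*/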
/* This callback is used to send a valid PROXY protocol line to a socket being
* established. It returns 0 if it fails in a fatal way or needs to poll to go
* further, otherwise it returns non-zero and removes itself from the connection's
* flags (the bit is provided in <flag> by the caller). It is designed to be
* called by the connection handler and relies on it to commit polling changes.
* Note that it can emit a PROXY line by relying on the other end's address
* when the connection is attached to a stream interface, or by resolving the
* local address otherwise (also called a LOCAL line).
*/
int conn_si_send_proxy(struct connection *conn, unsigned int flag)
{
/* we might have been called just after an asynchronous shutw */
if (conn->flags & CO_FL_SOCK_WR_SH)
goto out_error;
if (!conn_ctrl_ready(conn))
goto out_error;
/* If we have a PROXY line to send, we'll use this to validate the
* connection, in which case the connection is validated only once
* we've sent the whole proxy line. Otherwise we use connect().
*/
while (conn->send_proxy_ofs) {
struct conn_stream *cs;
int ret;
cs = conn->mux_ctx;
/* The target server expects a PROXY line to be sent first.
* If the send_proxy_ofs is negative, it corresponds to the
* offset to start sending from the end of the proxy string
* (which is recomputed every time since it's constant). If
* it is positive, it means we have to send from the start.
* We can only send a "normal" PROXY line when the connection
* is attached to a stream interface. Otherwise we can only
* send a LOCAL line (eg: for use with health checks).
*/
if (conn->mux == &mux_pt_ops && cs->data_cb == &si_conn_cb) {
struct stream_interface *si = cs->data;
struct conn_stream *remote_cs = objt_cs(si_opposite(si)->end);
ret = make_proxy_line(trash.area, trash.size,
objt_server(conn->target),
remote_cs ? remote_cs->conn : NULL);
}
else {
/* The target server expects a LOCAL line to be sent first. Retrieving
* local or remote addresses may fail until the connection is established.
*/
conn_get_from_addr(conn);
if (!(conn->flags & CO_FL_ADDR_FROM_SET))
goto out_wait;
conn_get_to_addr(conn);
if (!(conn->flags & CO_FL_ADDR_TO_SET))
goto out_wait;
ret = make_proxy_line(trash.area, trash.size,
objt_server(conn->target), conn);
}
if (!ret)
goto out_error;
if (conn->send_proxy_ofs > 0)
conn->send_proxy_ofs = -ret; /* first call */
/* we have to send trash from (ret+sp for -sp bytes). If the
* data layer has a pending write, we'll also set MSG_MORE.
*/
ret = conn_sock_send(conn,
trash.area + ret + conn->send_proxy_ofs,
-conn->send_proxy_ofs,
(conn->flags & CO_FL_XPRT_WR_ENA) ? MSG_MORE : 0);
if (ret < 0)
goto out_error;
conn->send_proxy_ofs += ret; /* becomes zero once complete */
if (conn->send_proxy_ofs != 0)
goto out_wait;
/* OK we've sent the whole line, we're connected */
break;
}
/* The connection is ready now, simply return and let the connection
* handler notify upper layers if needed.
*/
if (conn->flags & CO_FL_WAIT_L4_CONN)
conn->flags &= ~CO_FL_WAIT_L4_CONN;
conn->flags &= ~flag;
return 1;
out_error:
/* Write error on the file descriptor */
conn->flags |= CO_FL_ERROR;
return 0;
out_wait:
__conn_sock_stop_recv(conn);
return 0;
}
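/* Note on wiring (an assumption following the <flag> convention described in
* the comment above, not shown in this file): the connection handler typically
* calls this while CO_FL_SEND_PROXY is set, e.g.
*
*	conn_si_send_proxy(conn, CO_FL_SEND_PROXY);
*
* and the flag is cleared by the function itself once the whole PROXY line
* has been sent.
*/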
/* Callback to be used by connection I/O handlers when some activity is detected
* on an idle server connection. Its main purpose is to kill the connection once
* a close was detected on it. It returns 0 if it did nothing serious, or -1 if
* it killed the connection.
*/
static int si_idle_conn_wake_cb(struct conn_stream *cs)
{
struct connection *conn = cs->conn;
struct stream_interface *si = cs->data;
if (!conn_ctrl_ready(conn))
return 0;
conn_sock_drain(conn);
if (conn->flags & (CO_FL_ERROR | CO_FL_SOCK_RD_SH) || cs->flags & CS_FL_ERROR) {
/* warning, we can't do anything on <conn> after this call ! */
si_release_endpoint(si);
return -1;
}
return 0;
}
/* This function is the equivalent to stream_int_update() except that it's
* designed to be called from outside the stream handlers, typically the lower
* layers (applets, connections) after I/O completion. After updating the stream
* interface and timeouts, it will try to forward what can be forwarded, then to
* wake the associated task up if an important event requires special handling.
* It should not be called from within the stream itself, stream_int_update()
* is designed for this.
*/
void stream_int_notify(struct stream_interface *si)
{
struct channel *ic = si_ic(si);
struct channel *oc = si_oc(si);
/* process consumer side */
if (channel_is_empty(oc)) {
struct connection *conn = objt_cs(si->end) ? objt_cs(si->end)->conn : NULL;
if (((oc->flags & (CF_SHUTW|CF_SHUTW_NOW)) == CF_SHUTW_NOW) &&
(si->state == SI_ST_EST) && (!conn || !(conn->flags & (CO_FL_HANDSHAKE | CO_FL_EARLY_SSL_HS))))
si_shutw(si);
oc->wex = TICK_ETERNITY;
}
/* indicate that we may be waiting for data from the output channel or
* we're about to close and can't expect more data if SHUTW_NOW is there.
*/
if ((oc->flags & (CF_SHUTW|CF_SHUTW_NOW)) == 0 && channel_may_recv(oc))
si->flags |= SI_FL_WAIT_DATA;
else if ((oc->flags & (CF_SHUTW|CF_SHUTW_NOW)) == CF_SHUTW_NOW)
si->flags &= ~SI_FL_WAIT_DATA;
/* update OC timeouts and wake the other side up if it's waiting for room */
if (oc->flags & CF_WRITE_ACTIVITY) {
if ((oc->flags & (CF_SHUTW|CF_WRITE_PARTIAL)) == CF_WRITE_PARTIAL &&
!channel_is_empty(oc))
if (tick_isset(oc->wex))
oc->wex = tick_add_ifset(now_ms, oc->wto);
if (!(si->flags & SI_FL_INDEP_STR))
if (tick_isset(ic->rex))
ic->rex = tick_add_ifset(now_ms, ic->rto);
if (likely((oc->flags & (CF_SHUTW|CF_WRITE_PARTIAL|CF_DONT_READ)) == CF_WRITE_PARTIAL &&
channel_may_recv(oc) &&
(si_opposite(si)->flags & SI_FL_WAIT_ROOM)))
si_chk_rcv(si_opposite(si));
}
/* Notify the other side when we've injected data into the IC that
* needs to be forwarded. We can do fast-forwarding as soon as there
* are output data, but we avoid doing this if some of the data are
* not yet scheduled for being forwarded, because it is very likely
* that it will be done again immediately afterwards once the following
* data are parsed (eg: HTTP chunking). We only clear SI_FL_WAIT_ROOM once
* we've emptied *some* of the output buffer, and not just when there
* is available room, because applets are often forced to stop before
* the buffer is full. We must not stop based on input data alone because
* an HTTP parser might need more data to complete the parsing.
*/
/* ensure it's only set if a write attempt has succeeded */
ic->flags &= ~CF_WRITE_PARTIAL;
if (!channel_is_empty(ic) &&
(si_opposite(si)->flags & SI_FL_WAIT_DATA) &&
(ci_data(ic) == 0 || ic->pipe)) {
int new_len, last_len;
last_len = co_data(ic);
if (ic->pipe)
last_len += ic->pipe->data;
si_chk_snd(si_opposite(si));
new_len = co_data(ic);
if (ic->pipe)
new_len += ic->pipe->data;
/* check if the consumer has freed some space either in the
* buffer or in the pipe.
*/
if (channel_may_recv(ic) && new_len < last_len)
si->flags &= ~SI_FL_WAIT_ROOM;
}
if (si->flags & SI_FL_WAIT_ROOM) {
ic->rex = TICK_ETERNITY;
}
else if ((ic->flags & (CF_SHUTR|CF_READ_PARTIAL|CF_DONT_READ)) == CF_READ_PARTIAL &&
channel_may_recv(ic)) {
/* we must re-enable reading if si_chk_snd() has freed some space */
if (!(ic->flags & CF_READ_NOEXP) && tick_isset(ic->rex))
ic->rex = tick_add_ifset(now_ms, ic->rto);
}
/* wake the task up only when needed */
if (/* changes on the production side */
(ic->flags & (CF_READ_NULL|CF_READ_ERROR)) ||
si->state != SI_ST_EST ||
(si->flags & SI_FL_ERR) ||
((ic->flags & CF_READ_PARTIAL) &&
(!ic->to_forward || si_opposite(si)->state != SI_ST_EST)) ||
/* changes on the consumption side */
(oc->flags & (CF_WRITE_NULL|CF_WRITE_ERROR)) ||
((oc->flags & (CF_WRITE_ACTIVITY|CF_WRITE_EVENT)) &&
((oc->flags & CF_SHUTW) ||
((oc->flags & CF_WAKE_WRITE) &&
(si_opposite(si)->state != SI_ST_EST ||
(channel_is_empty(oc) && !oc->to_forward)))))) {
task_wakeup(si_task(si), TASK_WOKEN_IO);
}
if (ic->flags & CF_READ_ACTIVITY)
ic->flags &= ~CF_READ_DONTWAIT;
}
/* Called by I/O handlers after completion. It propagates
* connection flags to the stream interface, updates the stream (which may or
* may not take this opportunity to try to forward data), then updates the
* connection's polling based on the channels and stream interface's final
* states. The function always returns 0.
*/
static int si_cs_process(struct conn_stream *cs)
{
struct connection *conn = cs->conn;
struct stream_interface *si = cs->data;
struct channel *ic = si_ic(si);
struct channel *oc = si_oc(si);
/* If we have data to send, try it now */
if (!channel_is_empty(oc) && objt_cs(si->end))
si_cs_send(objt_cs(si->end));
/* First step, report to the stream-int what was detected at the
* connection layer : errors and connection establishment.
*/
if (conn->flags & CO_FL_ERROR || cs->flags & CS_FL_ERROR)
si->flags |= SI_FL_ERR;
/* If we had early data, and the handshake ended, then
* we can remove the flag, and attempt to wake the task up,
* in the event there's an analyser waiting for the end of
* the handshake.
*/
if (!(conn->flags & (CO_FL_HANDSHAKE | CO_FL_EARLY_SSL_HS)) &&
(cs->flags & CS_FL_WAIT_FOR_HS)) {
cs->flags &= ~CS_FL_WAIT_FOR_HS;
task_wakeup(si_task(si), TASK_WOKEN_MSG);
}
if ((si->state < SI_ST_EST) &&
(conn->flags & (CO_FL_CONNECTED | CO_FL_HANDSHAKE)) == CO_FL_CONNECTED) {
si->exp = TICK_ETERNITY;
oc->flags |= CF_WRITE_NULL;
}
/* Second step : update the stream-int and channels, try to forward any
* pending data, then possibly wake the stream up based on the new
* stream-int status.
*/
stream_int_notify(si);
channel_release_buffer(ic, &(si_strm(si)->buffer_wait));
/* Third step : update the connection's polling status based on what
* was done above (eg: maybe some buffers got emptied).
*/
if (channel_is_empty(oc))
__cs_stop_send(cs);
if (si->flags & SI_FL_WAIT_ROOM) {
__cs_stop_recv(cs);
}
else if ((ic->flags & (CF_SHUTR|CF_READ_PARTIAL|CF_DONT_READ)) == CF_READ_PARTIAL &&
channel_may_recv(ic)) {
__cs_want_recv(cs);
}
return 0;
}
/*
* This function is called to send buffer data to a stream socket.
* It calls the mux layer's snd_buf function. It relies on the
* caller to commit polling changes. The caller should check conn->flags
* for errors.
*/
static int si_cs_send(struct conn_stream *cs)
{
struct connection *conn = cs->conn;
struct stream_interface *si = cs->data;
struct channel *oc = si_oc(si);
int ret;
int did_send = 0;
/* We're already waiting to be able to send, give up */
if (si->wait_list.wait_reason & SUB_CAN_SEND)
return 0;
if (conn->flags & CO_FL_ERROR || cs->flags & CS_FL_ERROR)
return 1;
if (conn->flags & CO_FL_HANDSHAKE) {
/* a handshake was requested */
/* Schedule ourself to be woken up once the handshake is done */
conn->xprt->subscribe(conn, SUB_CAN_SEND, &si->wait_list);
return 0;
}
/* we might have been called just after an asynchronous shutw */
if (si_oc(si)->flags & CF_SHUTW)
return 1;
/* ensure it's only set if a write attempt has succeeded */
oc->flags &= ~CF_WRITE_PARTIAL;
if (oc->pipe && conn->xprt->snd_pipe && conn->mux->snd_pipe) {
ret = conn->mux->snd_pipe(cs, oc->pipe);
if (ret > 0) {
oc->flags |= CF_WRITE_PARTIAL | CF_WROTE_DATA | CF_WRITE_EVENT;
did_send = 1;
}
if (!oc->pipe->data) {
put_pipe(oc->pipe);
oc->pipe = NULL;
}
if (conn->flags & CO_FL_ERROR || cs->flags & CS_FL_ERROR)
return 0;
}
/* At this point, the pipe is empty, but we may still have data pending
* in the normal buffer.
*/
if (!co_data(oc))
goto wake_others;
/* when we're here, we already know that there is no spliced
* data left, and that there are sendable buffered data.
*/
if (!(conn->flags & (CO_FL_ERROR | CO_FL_SOCK_WR_SH | CO_FL_HANDSHAKE)) &&
!(cs->flags & CS_FL_ERROR) && !(oc->flags & CF_SHUTW)) {
/* check if we want to inform the kernel that we're interested in
* sending more data after this call. We want this if :
* - we're about to close after this last send and want to merge
* the ongoing FIN with the last segment.
* - we know we can't send everything at once and must get back
* here because of unaligned data
* - there is still a finite amount of data to forward
* The test is arranged so that the most common case does only 2
* tests.
*/
unsigned int send_flag = 0;
if ((!(oc->flags & (CF_NEVER_WAIT|CF_SEND_DONTWAIT)) &&
((oc->to_forward && oc->to_forward != CHN_INFINITE_FORWARD) ||
(oc->flags & CF_EXPECT_MORE))) ||
((oc->flags & CF_ISRESP) &&
((oc->flags & (CF_AUTO_CLOSE|CF_SHUTW_NOW)) == (CF_AUTO_CLOSE|CF_SHUTW_NOW))))
send_flag |= CO_SFL_MSG_MORE;
if (oc->flags & CF_STREAMER)
send_flag |= CO_SFL_STREAMER;
ret = cs->conn->mux->snd_buf(cs, &oc->buf, co_data(oc), send_flag);
if (ret > 0) {
did_send = 1;
oc->flags |= CF_WRITE_PARTIAL | CF_WROTE_DATA | CF_WRITE_EVENT;
co_set_data(oc, co_data(oc) - ret);
c_realign_if_empty(oc);
if (!co_data(oc)) {
/* Always clear both flags once everything has been sent, they're one-shot */
oc->flags &= ~(CF_EXPECT_MORE | CF_SEND_DONTWAIT);
}
/* if some data remain in the buffer, it's only because the
* system buffers are full, we will try next time.
*/
}
}
/* We couldn't send all of our data, let the mux know we'd like to send more */
if (co_data(oc)) {
cs_want_send(cs);
conn->mux->subscribe(cs, SUB_CAN_SEND, &si->wait_list);
}
wake_others:
/* Maybe somebody was waiting for this conn_stream, wake them */
if (did_send) {
while (!LIST_ISEMPTY(&cs->send_wait_list)) {
struct wait_list *sw = LIST_ELEM(cs->send_wait_list.n,
struct wait_list *, list);
LIST_DEL(&sw->list);
LIST_INIT(&sw->list);
sw->wait_reason &= ~SUB_CAN_SEND;
tasklet_wakeup(sw->task);
}
while (!(LIST_ISEMPTY(&cs->sendrecv_wait_list))) {
struct wait_list *sw = LIST_ELEM(cs->sendrecv_wait_list.n,
struct wait_list *, list);
LIST_DEL(&sw->list);
LIST_INIT(&sw->list);
LIST_ADDQ(&cs->recv_wait_list, &sw->list);
sw->wait_reason &= ~SUB_CAN_SEND;
tasklet_wakeup(sw->task);
}
}
return did_send;
}
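/* Callback used to resume I/O processing on a stream interface's conn_stream,
 * typically run as a tasklet once sending or receiving becomes possible again.
 * It attempts to send, then to receive, unless a subscription is still pending
 * in that direction, and calls si_cs_process() if any progress was made.
 */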
struct task *si_cs_io_cb(struct task *t, void *ctx, unsigned short state)
{
struct stream_interface *si = ctx;
struct conn_stream *cs = objt_cs(si->end);
int ret = 0;
if (!cs)
return NULL;
redo:
if (!(si->wait_list.wait_reason & SUB_CAN_SEND))
ret = si_cs_send(cs);
if (!(si->wait_list.wait_reason & SUB_CAN_RECV))
ret |= si_cs_recv(cs);
if (ret != 0)
si_cs_process(cs);
return (NULL);
}
/* This function is designed to be called from within the stream handler to
* update the channels' expiration timers and the stream interface's flags
* based on the channels' flags. It needs to be called only once after the
* channels' flags have settled down, and before they are cleared, though it
* does no harm to call it as often as desired (it just slightly hurts
* performance). It must not be called from outside of the stream handler,
* as what it does will be used to compute the stream task's expiration.
*/
void stream_int_update(struct stream_interface *si)
{
struct channel *ic = si_ic(si);
struct channel *oc = si_oc(si);
if (!(ic->flags & CF_SHUTR)) {
/* Read not closed, update FD status and timeout for reads */
if ((ic->flags & CF_DONT_READ) || !channel_may_recv(ic)) {
/* stop reading */
if (!(si->flags & SI_FL_WAIT_ROOM)) {
if (!(ic->flags & CF_DONT_READ)) /* full */
si->flags |= SI_FL_WAIT_ROOM;
ic->rex = TICK_ETERNITY;
}
}
else if (!(si->flags & SI_FL_WAIT_ROOM) || !co_data(ic)) {
/* (re)start reading and update timeout. Note: we don't recompute the timeout
* every time we get here, otherwise it would risk never expiring. We only
* update it if it was not yet set. The stream socket handler will already
* have updated it if there has been a completed I/O.
*/
si->flags &= ~SI_FL_WAIT_ROOM;
if (!(ic->flags & (CF_READ_NOEXP|CF_DONT_READ)) && !tick_isset(ic->rex))
ic->rex = tick_add_ifset(now_ms, ic->rto);
}
}
if (!(oc->flags & CF_SHUTW)) {
/* Write not closed, update FD status and timeout for writes */
if (channel_is_empty(oc)) {
/* stop writing */
if (!(si->flags & SI_FL_WAIT_DATA)) {
if ((oc->flags & CF_SHUTW_NOW) == 0)
si->flags |= SI_FL_WAIT_DATA;
oc->wex = TICK_ETERNITY;
}
}
else {
/* (re)start writing and update timeout. Note: we don't recompute the timeout
* every time we get here, otherwise it would risk never expiring. We only
* update it if it was not yet set. The stream socket handler will already
* have updated it if there has been a completed I/O.
*/
si->flags &= ~SI_FL_WAIT_DATA;
if (!tick_isset(oc->wex)) {
oc->wex = tick_add_ifset(now_ms, oc->wto);
if (tick_isset(ic->rex) && !(si->flags & SI_FL_INDEP_STR)) {
/* Note: depending on the protocol, we don't know if we're waiting
* for incoming data or not. So in order to prevent the socket from
* expiring read timeouts during writes, we refresh the read timeout,
* except if it was already infinite or if we have explicitly set up
* independent streams.
*/
ic->rex = tick_add_ifset(now_ms, ic->rto);
}
}
}
}
}
/* Updates the polling status of a connection outside of the connection handler
* based on the channel's flags and the stream interface's flags. It needs to be
* called once after the channels' flags have settled down and the stream has
* been updated. It is not designed to be called from within the connection
* handler itself.
*/
void stream_int_update_conn(struct stream_interface *si)
{
struct channel *ic = si_ic(si);
struct channel *oc = si_oc(si);
struct conn_stream *cs = __objt_cs(si->end);
if (!(ic->flags & CF_SHUTR)) {
/* Read not closed */
if ((ic->flags & CF_DONT_READ) || !channel_may_recv(ic))
__cs_stop_recv(cs);
else
__cs_want_recv(cs);
}
if (!(oc->flags & CF_SHUTW)) {
/* Write not closed */
if (channel_is_empty(oc))
__cs_stop_send(cs);
else
__cs_want_send(cs);
}
cs_update_mux_polling(cs);
}
/*
* This function performs a shutdown-read on a stream interface attached to
* a connection in a connected or init state (it does nothing for other
* states). It either shuts the read side or marks itself as closed. The buffer
* flags are updated to reflect the new state. If the stream interface has
* SI_FL_NOHALF, we also forward the close to the write side. If a control
* layer is defined, then it is supposed to be a socket layer and file
* descriptors are then shutdown or closed accordingly. The function
* automatically disables polling if needed.
*/
static void stream_int_shutr_conn(struct stream_interface *si)
{
struct conn_stream *cs = __objt_cs(si->end);
struct connection *conn = cs->conn;
struct channel *ic = si_ic(si);
ic->flags &= ~CF_SHUTR_NOW;
if (ic->flags & CF_SHUTR)
return;
ic->flags |= CF_SHUTR;
ic->rex = TICK_ETERNITY;
si->flags &= ~SI_FL_WAIT_ROOM;
if (si->state != SI_ST_EST && si->state != SI_ST_CON)
return;
if (si_oc(si)->flags & CF_SHUTW) {
cs_close(cs);
si->state = SI_ST_DIS;
si->exp = TICK_ETERNITY;
}
else if (si->flags & SI_FL_NOHALF) {
/* we want to immediately forward this close to the write side */
return stream_int_shutw_conn(si);
}
else if (conn->ctrl) {
/* we want the caller to disable polling on this FD */
cs_stop_recv(cs);
}
}
/*
* This function performs a shutdown-write on a stream interface attached to
* a connection in a connected or init state (it does nothing for other
* states). It either shuts the write side or marks itself as closed. The
* buffer flags are updated to reflect the new state. It does also close
* everything if the SI was marked as being in error state. If there is a
* data-layer shutdown, it is called.
*/
static void stream_int_shutw_conn(struct stream_interface *si)
{
struct conn_stream *cs = __objt_cs(si->end);
struct connection *conn = cs->conn;
struct channel *ic = si_ic(si);
struct channel *oc = si_oc(si);
oc->flags &= ~CF_SHUTW_NOW;
if (oc->flags & CF_SHUTW)
return;
oc->flags |= CF_SHUTW;
oc->wex = TICK_ETERNITY;
si->flags &= ~SI_FL_WAIT_DATA;
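/* if a half-closed timeout was configured on the stream interface, apply it
 * to the read side now that the write side is being shut.
 */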
if (tick_isset(si->hcto)) {
ic->rto = si->hcto;
ic->rex = tick_add(now_ms, ic->rto);
}
switch (si->state) {
case SI_ST_EST:
/* we have to shut before closing, otherwise some short messages
* may never leave the system, especially when there are remaining
* unread data in the socket input buffer, or when nolinger is set.
* However, if SI_FL_NOLINGER is explicitly set, we know there is
* no risk so we close both sides immediately.
*/
if (si->flags & SI_FL_ERR) {
/* quick close, the socket is already shut anyway */
}
else if (si->flags & SI_FL_NOLINGER) {
/* unclean data-layer shutdown, typically an aborted request
* or a forwarded shutdown from a client to a server due to
* option abortonclose. No need for the TLS layer to try to
* emit a shutdown message.
*/
cs_shutw(cs, CS_SHW_SILENT);
}
else {
/* clean data-layer shutdown. This only happens on the
* frontend side, or on the backend side when forwarding
* a client close in TCP mode or in HTTP TUNNEL mode
* while option abortonclose is set. We want the TLS
* layer to try to signal it to the peer before we close.
*/
cs_shutw(cs, CS_SHW_NORMAL);
if (!(ic->flags & (CF_SHUTR|CF_DONT_READ))) {
/* OK just a shutw, but we want the caller
* to disable polling on this FD if it still exists.
*/
conn_cond_update_polling(conn);
return;
}
}
/* fall through */
case SI_ST_CON:
/* we may have to close a pending connection, and mark the
* response buffer as shutr
*/
cs_close(cs);
/* fall through */
case SI_ST_CER:
case SI_ST_QUE:
case SI_ST_TAR:
si->state = SI_ST_DIS;
/* fall through */
default:
si->flags &= ~(SI_FL_WAIT_ROOM | SI_FL_NOLINGER);
ic->flags &= ~CF_SHUTR_NOW;
ic->flags |= CF_SHUTR;
ic->rex = TICK_ETERNITY;
si->exp = TICK_ETERNITY;
}
}
/* This function is used for inter-stream-interface calls. It is called by the
* consumer to inform the producer side that it may be interested in checking
* for free space in the buffer. Note that it intentionally does not update
* timeouts, so that we can still check them later at wake-up. This function is
* dedicated to connection-based stream interfaces.
*/
static void stream_int_chk_rcv_conn(struct stream_interface *si)
{
struct channel *ic = si_ic(si);
struct conn_stream *cs = __objt_cs(si->end);
if (unlikely(si->state > SI_ST_EST || (ic->flags & CF_SHUTR)))
return;
if ((ic->flags & CF_DONT_READ) || !channel_may_recv(ic)) {
/* stop reading */
if (!(ic->flags & CF_DONT_READ)) /* full */ {
si->flags |= SI_FL_WAIT_ROOM;
}
__cs_stop_recv(cs);
}
else {
/* (re)start reading */
si->flags &= ~SI_FL_WAIT_ROOM;
__cs_want_recv(cs);
}
cs_update_mux_polling(cs);
}
/* This function is used for inter-stream-interface calls. It is called by the
* producer to inform the consumer side that it may be interested in checking
* for data in the buffer. Note that it intentionally does not update timeouts,
* so that we can still check them later at wake-up.
*/
static void stream_int_chk_snd_conn(struct stream_interface *si)
{
struct channel *oc = si_oc(si);
struct conn_stream *cs = __objt_cs(si->end);
/* ensure it's only set if a write attempt has succeeded */
oc->flags &= ~CF_WRITE_PARTIAL;
if (unlikely(si->state > SI_ST_EST || (oc->flags & CF_SHUTW)))
return;
if (unlikely(channel_is_empty(oc))) /* called with nothing to send ! */
return;
if (!oc->pipe && /* spliced data wants to be forwarded ASAP */
!(si->flags & SI_FL_WAIT_DATA)) /* not waiting for data */
return;
if (cs->flags & CS_FL_DATA_WR_ENA) {
/* already subscribed to write notifications, will be called
* anyway, so let's avoid calling it especially if the reader
* is not ready.
*/
return;
}
__cs_want_send(cs);
si_cs_send(cs);
if (cs->flags & CS_FL_ERROR || cs->conn->flags & CO_FL_ERROR) {
/* Write error on the file descriptor */
__cs_stop_both(cs);
si->flags |= SI_FL_ERR;
goto out_wakeup;
}
/* OK, so now we know that some data might have been sent, and that we may
* have to poll first. We have to do that too if the buffer is not empty.
*/
if (channel_is_empty(oc)) {
/* the connection is established but we can't write. Either the
* buffer is empty, or we just refrain from sending because the
* ->o limit was reached. Maybe we just wrote the last
* chunk and need to close.
*/
__cs_stop_send(cs);
if (((oc->flags & (CF_SHUTW|CF_AUTO_CLOSE|CF_SHUTW_NOW)) ==
(CF_AUTO_CLOSE|CF_SHUTW_NOW)) &&
(si->state == SI_ST_EST)) {
si_shutw(si);
goto out_wakeup;
}
if ((oc->flags & (CF_SHUTW|CF_SHUTW_NOW)) == 0)
si->flags |= SI_FL_WAIT_DATA;
oc->wex = TICK_ETERNITY;
}
else {
/* Otherwise there are remaining data to be sent in the buffer,
* which means we have to poll before doing so.
*/
__cs_want_send(cs);
si->flags &= ~SI_FL_WAIT_DATA;
if (!tick_isset(oc->wex))
oc->wex = tick_add_ifset(now_ms, oc->wto);
}
if (likely(oc->flags & CF_WRITE_ACTIVITY)) {
struct channel *ic = si_ic(si);
/* update timeout if we have written something */
if ((oc->flags & (CF_SHUTW|CF_WRITE_PARTIAL)) == CF_WRITE_PARTIAL &&
!channel_is_empty(oc))
oc->wex = tick_add_ifset(now_ms, oc->wto);
if (tick_isset(ic->rex) && !(si->flags & SI_FL_INDEP_STR)) {
/* Note: to prevent the client from expiring read timeouts
* during writes, we refresh it. We only do this if the
* interface is not configured for "independent streams",
* because for some applications it's better not to do this,
* for instance when continuously exchanging small amounts
* of data which can fill the socket buffers long before a
* write timeout is detected.
*/
ic->rex = tick_add_ifset(now_ms, ic->rto);
}
}
/* in case of special condition (error, shutdown, end of write...), we
* have to notify the task.
*/
if (likely((oc->flags & (CF_WRITE_NULL|CF_WRITE_ERROR|CF_SHUTW)) ||
((oc->flags & CF_WAKE_WRITE) &&
((channel_is_empty(oc) && !oc->to_forward) ||
si->state != SI_ST_EST)))) {
out_wakeup:
if (!(si->flags & SI_FL_DONT_WAKE))
task_wakeup(si_task(si), TASK_WOKEN_IO);
}
/* commit possible polling changes */
cs_update_mux_polling(cs);
}
/*
* This is the callback which is called by the connection layer to receive data
* into the buffer from the connection. It iterates over the mux layer's
* rcv_buf function.
*/
static int si_cs_recv(struct conn_stream *cs)
{
struct connection *conn = cs->conn;
struct stream_interface *si = cs->data;
struct channel *ic = si_ic(si);
int ret, max, cur_read = 0;
int read_poll = MAX_READ_POLL_LOOPS;
/* stop immediately on errors. Note that we DON'T want to stop on
* POLL_ERR, as the poller might report a write error while there
* are still data available in the recv buffer. This typically
* happens when we send too large a request to a backend server
* which rejects it before reading it all.
*/
if (conn->flags & CO_FL_ERROR || cs->flags & CS_FL_ERROR)
return 1; // we want to make sure si_cs_wake() is called on failure, so that process_stream is woken up
/* If another call to si_cs_recv() failed, and we subscribed to
* recv events already, give up now.
*/
if (si->wait_list.wait_reason & SUB_CAN_RECV)
return 0;
/* maybe we were called immediately after an asynchronous shutr */
if (ic->flags & CF_SHUTR)
return 1;
/* stop here if we reached the end of data */
if (cs->flags & CS_FL_EOS)
goto out_shutdown_r;
if ((ic->flags & (CF_STREAMER | CF_STREAMER_FAST)) && !co_data(ic) &&
global.tune.idle_timer &&
(unsigned short)(now_ms - ic->last_read) >= global.tune.idle_timer) {
/* The buffer was empty and nothing was transferred for more
* than one second. This was caused by a pause and not by
* congestion. Reset any streaming mode to reduce latency.
*/
ic->xfer_small = 0;
ic->xfer_large = 0;
ic->flags &= ~(CF_STREAMER | CF_STREAMER_FAST);
}
/* First, let's see if we may splice data across the channel without
* using a buffer.
*/
if (conn->xprt->rcv_pipe && conn->mux->rcv_pipe &&
(ic->pipe || ic->to_forward >= MIN_SPLICE_FORWARD) &&
ic->flags & CF_KERN_SPLICING) {
if (c_data(ic)) {
/* We're embarrassed, there are already data pending in
* the buffer and we don't want to have them at two
* locations at a time. Let's indicate we need some
* place and ask the consumer to hurry.
*/
goto abort_splice;
}
if (unlikely(ic->pipe == NULL)) {
if (pipes_used >= global.maxpipes || !(ic->pipe = get_pipe())) {
ic->flags &= ~CF_KERN_SPLICING;
goto abort_splice;
}
}
ret = conn->mux->rcv_pipe(cs, ic->pipe, ic->to_forward);
if (ret < 0) {
/* splice not supported on this end, let's disable it */
ic->flags &= ~CF_KERN_SPLICING;
goto abort_splice;
}
if (ret > 0) {
if (ic->to_forward != CHN_INFINITE_FORWARD)
ic->to_forward -= ret;
ic->total += ret;
cur_read += ret;
ic->flags |= CF_READ_PARTIAL;
}
if (cs->flags & CS_FL_EOS)
goto out_shutdown_r;
if (conn->flags & CO_FL_ERROR || cs->flags & CS_FL_ERROR)
return 1;
if (conn->flags & CO_FL_WAIT_ROOM) {
/* the pipe is full or we have read enough data that it
* could soon be full. Let's stop before needing to poll.
*/
si->flags |= SI_FL_WAIT_ROOM;
__cs_stop_recv(cs);
}
/* splice not possible (anymore), let's go on on standard copy */
}
abort_splice:
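/* release the pipe if it does not carry any data anymore so it can be reused */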
if (ic->pipe && unlikely(!ic->pipe->data)) {
put_pipe(ic->pipe);
ic->pipe = NULL;
}
/* now we'll need an input buffer for the stream */
if (!channel_alloc_buffer(ic, &(si_strm(si)->buffer_wait))) {
si->flags |= SI_FL_WAIT_ROOM;
goto end_recv;
}
/* Important note : if we're called with POLL_IN|POLL_HUP, it means the read polling
* was enabled, which implies that the recv buffer was not full. So we have a guarantee
* that if such an event is not handled above in splice, it will be handled here by
* recv().
*/
while (!(conn->flags & (CO_FL_ERROR | CO_FL_WAIT_ROOM | CO_FL_HANDSHAKE)) &&
!(cs->flags & (CS_FL_ERROR|CS_FL_EOS)) && !(ic->flags & CF_SHUTR)) {
max = channel_recv_max(ic);
if (!max) {
si->flags |= SI_FL_WAIT_ROOM;
break;
}
ret = cs->conn->mux->rcv_buf(cs, &ic->buf, max, co_data(ic) ? CO_RFL_BUF_WET : 0);
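/* the mux still holds data it could not fit into our buffer : note that we
 * lack room so that another attempt is made once some space is freed.
 */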
if (cs->flags & CS_FL_RCV_MORE)
si->flags |= SI_FL_WAIT_ROOM;
if (ret <= 0)
break;
cur_read += ret;
/* if we're allowed to directly forward data, we must update ->o */
if (ic->to_forward && !(ic->flags & (CF_SHUTW|CF_SHUTW_NOW))) {
unsigned long fwd = ret;
if (ic->to_forward != CHN_INFINITE_FORWARD) {
if (fwd > ic->to_forward)
fwd = ic->to_forward;
ic->to_forward -= fwd;
}
c_adv(ic, fwd);
}
ic->flags |= CF_READ_PARTIAL;
ic->total += ret;
if (!channel_may_recv(ic)) {
si->flags |= SI_FL_WAIT_ROOM;
break;
}
if ((ic->flags & CF_READ_DONTWAIT) || --read_poll <= 0) {
/*
* This used to be __conn_xprt_done_recv().
* This was changed to accommodate the mux code,
* but we may have lost a worthwhile optimization.
*/
__cs_stop_recv(cs);
si->flags |= SI_FL_WAIT_ROOM;
break;
}
/* if too many bytes were missing from last read, it means that
* it's pointless trying to read again because the system does
* not have them in buffers.
*/
if (ret < max) {
/* if a streamer has read little data, it may be because we
* have exhausted system buffers. It's not worth trying
* again.
*/
if (ic->flags & CF_STREAMER)
break;
/* if we read a large block smaller than what we requested,
* it's almost certain we'll never get anything more.
*/
if (ret >= global.tune.recv_enough)
break;
}
} /* while !flags */
if (cur_read) {
if ((ic->flags & (CF_STREAMER | CF_STREAMER_FAST)) &&
(cur_read <= ic->buf.size / 2)) {
ic->xfer_large = 0;
ic->xfer_small++;
if (ic->xfer_small >= 3) {
/* we have read less than half of the buffer in
* one pass, and this happened at least 3 times.
* This is definitely not a streamer.
*/
ic->flags &= ~(CF_STREAMER | CF_STREAMER_FAST);
}
else if (ic->xfer_small >= 2) {
/* if the buffer has been at least half full twice,
* we receive faster than we send, so at least it
* is not a "fast streamer".
*/
ic->flags &= ~CF_STREAMER_FAST;
}
}
else if (!(ic->flags & CF_STREAMER_FAST) &&
(cur_read >= ic->buf.size - global.tune.maxrewrite)) {
/* we read a full buffer at once */
ic->xfer_small = 0;
ic->xfer_large++;
if (ic->xfer_large >= 3) {
/* we call this buffer a fast streamer if it manages
* to be filled in one call 3 consecutive times.
*/
ic->flags |= (CF_STREAMER | CF_STREAMER_FAST);
}
}
else {
ic->xfer_small = 0;
ic->xfer_large = 0;
}
ic->last_read = now_ms;
}
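/* some data were read : wake up the tasks subscribed to recv events, and
 * requeue those subscribed to both directions onto the send list since
 * their recv part is now satisfied.
 */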
if (cur_read > 0) {
while (!LIST_ISEMPTY(&cs->recv_wait_list)) {
struct wait_list *sw = LIST_ELEM(cs->recv_wait_list.n,
struct wait_list *, list);
LIST_DEL(&sw->list);
LIST_INIT(&sw->list);
sw->wait_reason &= ~SUB_CAN_RECV;
tasklet_wakeup(sw->task);
}
while (!(LIST_ISEMPTY(&cs->sendrecv_wait_list))) {
struct wait_list *sw = LIST_ELEM(cs->sendrecv_wait_list.n,
struct wait_list *, list);
LIST_DEL(&sw->list);
LIST_INIT(&sw->list);
LIST_ADDQ(&cs->send_wait_list, &sw->list);
sw->wait_reason &= ~SUB_CAN_RECV;
tasklet_wakeup(sw->task);
}
}
end_recv:
if (conn->flags & CO_FL_ERROR || cs->flags & CS_FL_ERROR)
return 1;
if (cs->flags & CS_FL_EOS)
/* connection closed */
goto out_shutdown_r;
/* Subscribe to receive events */
conn->mux->subscribe(cs, SUB_CAN_RECV, &si->wait_list);
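/* report whether any progress was made so the caller knows whether the
 * stream must be notified.
 */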
return cur_read != 0;
out_shutdown_r:
/* we received a shutdown */
ic->flags |= CF_READ_NULL;
if (ic->flags & CF_AUTO_CLOSE)
channel_shutw_now(ic);
stream_sock_read0(si);
return 1;
}
/*
* This function propagates a null read received on a socket-based connection.
* It updates the stream interface. If the stream interface has SI_FL_NOHALF,
* the close is also forwarded to the write side as an abort.
*/
void stream_sock_read0(struct stream_interface *si)
{
struct conn_stream *cs = __objt_cs(si->end);
struct channel *ic = si_ic(si);
struct channel *oc = si_oc(si);
ic->flags &= ~CF_SHUTR_NOW;
if (ic->flags & CF_SHUTR)
return;
ic->flags |= CF_SHUTR;
ic->rex = TICK_ETERNITY;
si->flags &= ~SI_FL_WAIT_ROOM;
if (si->state != SI_ST_EST && si->state != SI_ST_CON)
return;
if (oc->flags & CF_SHUTW)
goto do_close;
if (si->flags & SI_FL_NOHALF) {
/* we want to immediately forward this close to the write side */
/* force flag on ssl to keep stream in cache */
cs_shutw(cs, CS_SHW_SILENT);
goto do_close;
}
/* otherwise that's just a normal read shutdown */
__cs_stop_recv(cs);
return;
do_close:
/* OK we completely close the socket here just as if we went through si_shut[rw]() */
cs_close(cs);
oc->flags &= ~CF_SHUTW_NOW;
oc->flags |= CF_SHUTW;
oc->wex = TICK_ETERNITY;
si->flags &= ~(SI_FL_WAIT_DATA | SI_FL_WAIT_ROOM);
si->state = SI_ST_DIS;
si->exp = TICK_ETERNITY;
return;
}
/* Callback to be used by applet handlers upon completion. It updates the stream
* (which may or may not take this opportunity to try to forward data), then
* may re-enable the applet based on the channels' and stream interface's final
* states.
*/
void si_applet_wake_cb(struct stream_interface *si)
{
struct channel *ic = si_ic(si);
/* If the applet wants to write and the channel is closed, it's a
* broken pipe and it must be reported.
*/
if ((si->flags & SI_FL_WANT_PUT) && (ic->flags & CF_SHUTR))
si->flags |= SI_FL_ERR;
/* update the stream-int, channels, and possibly wake the stream up */
stream_int_notify(si);
/* stream_int_notify may pass through the chk_snd path and release some
 * WAIT_ROOM flags. process_stream will consider those flags to wake up
 * the appctx, but if the task is not in the run queue we may have to
 * wake up the appctx immediately.
*/
if (!task_in_rq(si_task(si)))
stream_int_update_applet(si);
}
/* Updates the activity status of an applet outside of the applet handler based
* on the channel's flags and the stream interface's flags. It needs to be
* called once after the channels' flags have settled down and the stream has
* been updated. It is not designed to be called from within the applet handler
* itself.
*/
void stream_int_update_applet(struct stream_interface *si)
{
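/* wake the applet up only if it may make progress : it wants to put data
 * and is not blocked on buffer room, or it wants to get data and is not
 * waiting for data to arrive.
 */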
if (((si->flags & (SI_FL_WANT_PUT|SI_FL_WAIT_ROOM)) == SI_FL_WANT_PUT) ||
((si->flags & (SI_FL_WANT_GET|SI_FL_WAIT_DATA)) == SI_FL_WANT_GET))
appctx_wakeup(si_appctx(si));
}
/*
* This function performs a shutdown-read on a stream interface attached to an
* applet in a connected or init state (it does nothing for other states). It
* either shuts the read side or marks itself as closed. The buffer flags are
* updated to reflect the new state. If the stream interface has SI_FL_NOHALF,
* we also forward the close to the write side. The owner task is woken up if
* it exists.
*/
static void stream_int_shutr_applet(struct stream_interface *si)
{
struct channel *ic = si_ic(si);
ic->flags &= ~CF_SHUTR_NOW;
if (ic->flags & CF_SHUTR)
return;
ic->flags |= CF_SHUTR;
ic->rex = TICK_ETERNITY;
si->flags &= ~SI_FL_WAIT_ROOM;
/* Note: on shutr, we don't call the applet */
if (si->state != SI_ST_EST && si->state != SI_ST_CON)
return;
if (si_oc(si)->flags & CF_SHUTW) {
si_applet_release(si);
si->state = SI_ST_DIS;
si->exp = TICK_ETERNITY;
}
else if (si->flags & SI_FL_NOHALF) {
/* we want to immediately forward this close to the write side */
return stream_int_shutw_applet(si);
}
}
/*
* This function performs a shutdown-write on a stream interface attached to an
* applet in a connected or init state (it does nothing for other states). It
* either shuts the write side or marks itself as closed. The buffer flags are
* updated to reflect the new state. It does also close everything if the SI
* was marked as being in error state. The owner task is woken up if it exists.
*/
static void stream_int_shutw_applet(struct stream_interface *si)
{
struct channel *ic = si_ic(si);
struct channel *oc = si_oc(si);
oc->flags &= ~CF_SHUTW_NOW;
if (oc->flags & CF_SHUTW)
return;
oc->flags |= CF_SHUTW;
oc->wex = TICK_ETERNITY;
si->flags &= ~SI_FL_WAIT_DATA;
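/* if a half-closed timeout was configured, apply it to the read side now
 * that the write side is being shut.
 */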
if (tick_isset(si->hcto)) {
ic->rto = si->hcto;
ic->rex = tick_add(now_ms, ic->rto);
}
/* on shutw we always wake the applet up */
appctx_wakeup(si_appctx(si));
switch (si->state) {
case SI_ST_EST:
/* we have to shut before closing, otherwise some short messages
* may never leave the system, especially when there are remaining
* unread data in the socket input buffer, or when nolinger is set.
* However, if SI_FL_NOLINGER is explicitly set, we know there is
* no risk so we close both sides immediately.
*/
if (!(si->flags & (SI_FL_ERR | SI_FL_NOLINGER)) &&
!(ic->flags & (CF_SHUTR|CF_DONT_READ)))
return;
/* fall through */
case SI_ST_CON:
case SI_ST_CER:
case SI_ST_QUE:
case SI_ST_TAR:
/* Note that none of these states may happen with applets */
si_applet_release(si);
si->state = SI_ST_DIS;
default:
si->flags &= ~(SI_FL_WAIT_ROOM | SI_FL_NOLINGER);
ic->flags &= ~CF_SHUTR_NOW;
ic->flags |= CF_SHUTR;
ic->rex = TICK_ETERNITY;
si->exp = TICK_ETERNITY;
}
}
/* chk_rcv function for applets */
static void stream_int_chk_rcv_applet(struct stream_interface *si)
{
struct channel *ic = si_ic(si);
DPRINTF(stderr, "%s: si=%p, si->state=%d ic->flags=%08x oc->flags=%08x\n",
__FUNCTION__,
si, si->state, ic->flags, si_oc(si)->flags);
if (unlikely(si->state != SI_ST_EST || (ic->flags & (CF_SHUTR|CF_DONT_READ))))
return;
/* here we only wake the applet up if it was waiting for some room */
if (!(si->flags & SI_FL_WAIT_ROOM))
return;
if (channel_may_recv(ic) && !ic->pipe) {
/* (re)start reading */
appctx_wakeup(si_appctx(si));
}
}
/* chk_snd function for applets */
static void stream_int_chk_snd_applet(struct stream_interface *si)
{
struct channel *oc = si_oc(si);
DPRINTF(stderr, "%s: si=%p, si->state=%d ic->flags=%08x oc->flags=%08x\n",
__FUNCTION__,
si, si->state, si_ic(si)->flags, oc->flags);
if (unlikely(si->state != SI_ST_EST || (oc->flags & CF_SHUTW)))
return;
/* we only wake the applet up if it was waiting for some data */
if (!(si->flags & SI_FL_WAIT_DATA))
return;
if (!tick_isset(oc->wex))
oc->wex = tick_add_ifset(now_ms, oc->wto);
if (!channel_is_empty(oc)) {
/* (re)start sending */
appctx_wakeup(si_appctx(si));
}
}
/*
* Local variables:
* c-indent-level: 8
* c-basic-offset: 8
* End:
*/