haproxy/src/stream_interface.c

/*
* Functions managing stream_interface structures
*
* Copyright 2000-2012 Willy Tarreau <w@1wt.eu>
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version
* 2 of the License, or (at your option) any later version.
*
*/
#include <errno.h>
#include <fcntl.h>
#include <stdio.h>
#include <stdlib.h>
#include <sys/socket.h>
#include <sys/stat.h>
#include <sys/types.h>
#include <common/compat.h>
#include <common/config.h>
#include <common/debug.h>
#include <common/standard.h>
#include <common/ticks.h>
#include <common/time.h>
#include <proto/channel.h>
#include <proto/connection.h>
#include <proto/fd.h>
#include <proto/pipe.h>
#include <proto/stream_interface.h>
#include <proto/task.h>
#include <types/pipe.h>
/* socket functions used when running a stream interface as a task */
static void stream_int_update_embedded(struct stream_interface *si);
static void stream_int_shutr(struct stream_interface *si);
static void stream_int_shutw(struct stream_interface *si);
static void stream_int_chk_rcv(struct stream_interface *si);
static void stream_int_chk_snd(struct stream_interface *si);
static void stream_int_update_conn(struct stream_interface *si);
static void stream_int_shutr_conn(struct stream_interface *si);
static void stream_int_shutw_conn(struct stream_interface *si);
static void stream_int_chk_rcv_conn(struct stream_interface *si);
static void stream_int_chk_snd_conn(struct stream_interface *si);
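/* I/O and wake callbacks used when the stream interface is attached to a
 * connection, plus the variants used to monitor idle server connections
 */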
static void si_conn_recv_cb(struct connection *conn);
static void si_conn_send_cb(struct connection *conn);
static int si_conn_wake_cb(struct connection *conn);
static int si_idle_conn_wake_cb(struct connection *conn);
static void si_idle_conn_null_cb(struct connection *conn);
/* stream-interface operations for embedded tasks */
struct si_ops si_embedded_ops = {
.update = stream_int_update_embedded,
.chk_rcv = stream_int_chk_rcv,
.chk_snd = stream_int_chk_snd,
.shutr = stream_int_shutr,
.shutw = stream_int_shutw,
};
/* stream-interface operations for connections */
struct si_ops si_conn_ops = {
.update = stream_int_update_conn,
.chk_rcv = stream_int_chk_rcv_conn,
.chk_snd = stream_int_chk_snd_conn,
.shutr = stream_int_shutr_conn,
.shutw = stream_int_shutw_conn,
};
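/* data-layer callbacks for a stream interface attached to a connection */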
struct data_cb si_conn_cb = {
.recv = si_conn_recv_cb,
.send = si_conn_send_cb,
.wake = si_conn_wake_cb,
};
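/* data-layer callbacks used to watch over idle server connections */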
struct data_cb si_idle_conn_cb = {
.recv = si_idle_conn_null_cb,
.send = si_idle_conn_null_cb,
.wake = si_idle_conn_wake_cb,
};
/*
* This function only has to be called once after a wakeup event in case of
* suspected timeout. It controls the stream interface timeouts and sets
* si->flags accordingly. It does NOT close anything, as this timeout may
* be used for any purpose. It returns 1 if the timeout fired, otherwise
* zero.
*/
int stream_int_check_timeouts(struct stream_interface *si)
{
if (tick_is_expired(si->exp, now_ms)) {
si->flags |= SI_FL_EXP;
return 1;
}
return 0;
}
/* to be called only when in SI_ST_DIS with SI_FL_ERR */
void stream_int_report_error(struct stream_interface *si)
{
if (!si->err_type)
si->err_type = SI_ET_DATA_ERR;
si->ob->flags |= CF_WRITE_ERROR;
si->ib->flags |= CF_READ_ERROR;
}
/*
* Returns a message to the client ; the connection is shut down for read,
* and the request is cleared so that no server connection can be initiated.
* The buffer is marked for read shutdown on the other side to protect the
* message, and the buffer write is enabled. The message is contained in a
* "chunk". If it is null, then an empty message is used. The reply buffer does
* not need to be empty before this, and its contents will not be overwritten.
* The primary goal of this function is to return error messages to a client.
*/
void stream_int_retnclose(struct stream_interface *si, const struct chunk *msg)
{
channel_auto_read(si->ib);
channel_abort(si->ib);
channel_auto_close(si->ib);
channel_erase(si->ib);
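/* wipe only the not-yet-forwarded input part of the response channel so
 * that data already scheduled for output is preserved
 */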
bi_erase(si->ob);
if (likely(msg && msg->len))
bo_inject(si->ob, msg->str, msg->len);
si->ob->wex = tick_add_ifset(now_ms, si->ob->wto);
channel_auto_read(si->ob);
channel_auto_close(si->ob);
channel_shutr_now(si->ob);
}
/* default update function for embedded tasks, to be used at the end of the i/o handler */
static void stream_int_update_embedded(struct stream_interface *si)
{
int old_flags = si->flags;
DPRINTF(stderr, "%s: si=%p, si->state=%d ib->flags=%08x ob->flags=%08x\n",
__FUNCTION__,
si, si->state, si->ib->flags, si->ob->flags);
if (si->state != SI_ST_EST)
return;
if ((si->ob->flags & (CF_SHUTW|CF_SHUTW_NOW)) == CF_SHUTW_NOW &&
channel_is_empty(si->ob))
si_shutw(si);
if ((si->ob->flags & (CF_SHUTW|CF_SHUTW_NOW)) == 0 && !channel_full(si->ob))
si->flags |= SI_FL_WAIT_DATA;
/* we're almost sure that we need some space if the buffer is not
* empty, even if it's not full, because the applets can't fill it.
*/
if ((si->ib->flags & (CF_SHUTR|CF_DONT_READ)) == 0 && !channel_is_empty(si->ib))
si->flags |= SI_FL_WAIT_ROOM;
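/* refresh the write (and, unless independent streams are used, the read)
 * timeouts when some I/O activity was reported on the channels
 */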
if (si->ob->flags & CF_WRITE_ACTIVITY) {
if (tick_isset(si->ob->wex))
si->ob->wex = tick_add_ifset(now_ms, si->ob->wto);
}
if (si->ib->flags & CF_READ_ACTIVITY ||
(si->ob->flags & CF_WRITE_ACTIVITY && !(si->flags & SI_FL_INDEP_STR))) {
if (tick_isset(si->ib->rex))
si->ib->rex = tick_add_ifset(now_ms, si->ib->rto);
}
/* save flags to detect changes */
old_flags = si->flags;
if (likely((si->ob->flags & (CF_SHUTW|CF_WRITE_PARTIAL|CF_DONT_READ)) == CF_WRITE_PARTIAL &&
!channel_full(si->ob) &&
(si->ob->prod->flags & SI_FL_WAIT_ROOM)))
si_chk_rcv(si->ob->prod);
if (((si->ib->flags & CF_READ_PARTIAL) && !channel_is_empty(si->ib)) &&
(si->ib->cons->flags & SI_FL_WAIT_DATA)) {
si_chk_snd(si->ib->cons);
/* check if the consumer has freed some space */
if (!channel_full(si->ib))
si->flags &= ~SI_FL_WAIT_ROOM;
}
/* Note that we're trying to wake up in two conditions here :
* - special event, which needs the holder task attention
* - status indicating that the applet can go on working. This
* is rather hard because we might be blocking on output and
* don't want to wake up on input and vice-versa. The idea is
* to only rely on the changes the chk_* might have performed.
*/
if (/* check stream interface changes */
((old_flags & ~si->flags) & (SI_FL_WAIT_ROOM|SI_FL_WAIT_DATA)) ||
/* changes on the production side */
(si->ib->flags & (CF_READ_NULL|CF_READ_ERROR)) ||
si->state != SI_ST_EST ||
(si->flags & SI_FL_ERR) ||
((si->ib->flags & CF_READ_PARTIAL) &&
(!si->ib->to_forward || si->ib->cons->state != SI_ST_EST)) ||
/* changes on the consumption side */
(si->ob->flags & (CF_WRITE_NULL|CF_WRITE_ERROR)) ||
((si->ob->flags & CF_WRITE_ACTIVITY) &&
((si->ob->flags & CF_SHUTW) ||
((si->ob->flags & CF_WAKE_WRITE) &&
(si->ob->prod->state != SI_ST_EST ||
(channel_is_empty(si->ob) && !si->ob->to_forward)))))) {
if (!(si->flags & SI_FL_DONT_WAKE) && si->owner)
task_wakeup(si->owner, TASK_WOKEN_IO);
}
if (si->ib->flags & CF_READ_ACTIVITY)
si->ib->flags &= ~CF_READ_DONTWAIT;
}
/*
* This function performs a shutdown-read on a stream interface attached to an
* applet in a connected or init state (it does nothing for other states). It
* either shuts the read side or marks itself as closed. The buffer flags are
* updated to reflect the new state. If the stream interface has SI_FL_NOHALF,
* we also forward the close to the write side. The owner task is woken up if
* it exists.
*/
static void stream_int_shutr(struct stream_interface *si)
{
si->ib->flags &= ~CF_SHUTR_NOW;
if (si->ib->flags & CF_SHUTR)
return;
si->ib->flags |= CF_SHUTR;
si->ib->rex = TICK_ETERNITY;
si->flags &= ~SI_FL_WAIT_ROOM;
if (si->state != SI_ST_EST && si->state != SI_ST_CON)
return;
if (si->ob->flags & CF_SHUTW) {
si->state = SI_ST_DIS;
si->exp = TICK_ETERNITY;
si_applet_release(si);
}
else if (si->flags & SI_FL_NOHALF) {
/* we want to immediately forward this close to the write side */
return stream_int_shutw(si);
}
/* note that if the task exists, it must unregister itself once it runs */
if (!(si->flags & SI_FL_DONT_WAKE) && si->owner)
task_wakeup(si->owner, TASK_WOKEN_IO);
}
/*
* This function performs a shutdown-write on a stream interface attached to an
* applet in a connected or init state (it does nothing for other states). It
* either shuts the write side or marks itself as closed. The buffer flags are
* updated to reflect the new state. It does also close everything if the SI
* was marked as being in error state. The owner task is woken up if it exists.
*/
static void stream_int_shutw(struct stream_interface *si)
{
si->ob->flags &= ~CF_SHUTW_NOW;
if (si->ob->flags & CF_SHUTW)
return;
si->ob->flags |= CF_SHUTW;
si->ob->wex = TICK_ETERNITY;
si->flags &= ~SI_FL_WAIT_DATA;
switch (si->state) {
case SI_ST_EST:
/* we have to shut before closing, otherwise some short messages
* may never leave the system, especially when there are remaining
* unread data in the socket input buffer, or when nolinger is set.
* However, if SI_FL_NOLINGER is explicitly set, we know there is
* no risk so we close both sides immediately.
*/
if (!(si->flags & (SI_FL_ERR | SI_FL_NOLINGER)) &&
!(si->ib->flags & (CF_SHUTR|CF_DONT_READ)))
return;
/* fall through */
case SI_ST_CON:
case SI_ST_CER:
case SI_ST_QUE:
case SI_ST_TAR:
/* Note that none of these states may happen with applets */
si->state = SI_ST_DIS;
si_applet_release(si);
default:
si->flags &= ~(SI_FL_WAIT_ROOM | SI_FL_NOLINGER);
si->ib->flags &= ~CF_SHUTR_NOW;
si->ib->flags |= CF_SHUTR;
si->ib->rex = TICK_ETERNITY;
si->exp = TICK_ETERNITY;
}
/* note that if the task exists, it must unregister itself once it runs */
if (!(si->flags & SI_FL_DONT_WAKE) && si->owner)
task_wakeup(si->owner, TASK_WOKEN_IO);
}
/* default chk_rcv function for scheduled tasks */
static void stream_int_chk_rcv(struct stream_interface *si)
{
struct channel *ib = si->ib;
DPRINTF(stderr, "%s: si=%p, si->state=%d ib->flags=%08x ob->flags=%08x\n",
__FUNCTION__,
si, si->state, si->ib->flags, si->ob->flags);
if (unlikely(si->state != SI_ST_EST || (ib->flags & (CF_SHUTR|CF_DONT_READ))))
return;
if (channel_full(ib)) {
/* stop reading */
si->flags |= SI_FL_WAIT_ROOM;
}
else {
/* (re)start reading */
si->flags &= ~SI_FL_WAIT_ROOM;
if (!(si->flags & SI_FL_DONT_WAKE) && si->owner)
task_wakeup(si->owner, TASK_WOKEN_IO);
}
}
/* default chk_snd function for scheduled tasks */
static void stream_int_chk_snd(struct stream_interface *si)
{
struct channel *ob = si->ob;
DPRINTF(stderr, "%s: si=%p, si->state=%d ib->flags=%08x ob->flags=%08x\n",
__FUNCTION__,
si, si->state, si->ib->flags, si->ob->flags);
if (unlikely(si->state != SI_ST_EST || (si->ob->flags & CF_SHUTW)))
return;
if (!(si->flags & SI_FL_WAIT_DATA) || /* not waiting for data */
channel_is_empty(ob)) /* called with nothing to send ! */
return;
/* Otherwise there are remaining data to be sent in the buffer,
* so we tell the handler.
*/
si->flags &= ~SI_FL_WAIT_DATA;
if (!tick_isset(ob->wex))
ob->wex = tick_add_ifset(now_ms, ob->wto);
if (!(si->flags & SI_FL_DONT_WAKE) && si->owner)
task_wakeup(si->owner, TASK_WOKEN_IO);
}
/* Register an applet to handle a stream_interface as part of the
* stream interface's owner task. The SI will wake it up every time it
* is solicited. The task's processing function must call the applet's
* function before returning. It must be deleted by the task handler
* using stream_int_unregister_handler(), possibly from within the
* function itself. It also pre-initializes the applet's context and
* returns it (or NULL in case it could not be allocated).
*/
struct appctx *stream_int_register_handler(struct stream_interface *si, struct si_applet *app)
{
struct appctx *appctx;
DPRINTF(stderr, "registering handler %p for si %p (was %p)\n", app, si, si->owner);
appctx = si_alloc_appctx(si);
if (!appctx)
return NULL;
appctx_set_applet(appctx, app);
si->flags |= SI_FL_WAIT_DATA;
return si_appctx(si);
}
/* Unregister a stream interface handler. This must be called by the handler task
* itself when it detects that it is in the SI_ST_DIS state.
*/
void stream_int_unregister_handler(struct stream_interface *si)
{
si_detach(si);
}
/* This callback is used to send a valid PROXY protocol line to a socket being
* established. It returns 0 if it fails in a fatal way or needs to poll to go
* further, otherwise it returns non-zero and removes itself from the connection's
* flags (the bit is provided in <flag> by the caller). It is designed to be
* called by the connection handler and relies on it to commit polling changes.
* Note that it can emit a PROXY line by relying on the other end's address
* when the connection is attached to a stream interface, or by resolving the
* local address otherwise (also called a LOCAL line).
*/
int conn_si_send_proxy(struct connection *conn, unsigned int flag)
{
/* we might have been called just after an asynchronous shutw */
if (conn->flags & CO_FL_SOCK_WR_SH)
goto out_error;
if (!conn_ctrl_ready(conn))
goto out_error;
if (!fd_send_ready(conn->t.sock.fd))
goto out_wait;
/* If we have a PROXY line to send, we'll use this to validate the
* connection, in which case the connection is validated only once
* we've sent the whole proxy line. Otherwise we use connect().
*/
while (conn->send_proxy_ofs) {
int ret;
/* The target server expects a PROXY line to be sent first.
* If the send_proxy_ofs is negative, it corresponds to the
* offset to start sending from the end of the proxy string
* (which is recomputed every time since it's constant). If
* it is positive, it means we have to send from the start.
* We can only send a "normal" PROXY line when the connection
* is attached to a stream interface. Otherwise we can only
* send a LOCAL line (eg: for use with health checks).
*/
if (conn->data == &si_conn_cb) {
struct stream_interface *si = conn->owner;
struct connection *remote = objt_conn(si->ob->prod->end);
if (remote)
ret = make_proxy_line(trash.str, trash.size, &remote->addr.from, &remote->addr.to);
else
ret = make_proxy_line(trash.str, trash.size, NULL, NULL);
}
else {
/* The target server expects a LOCAL line to be sent first. Retrieving
* local or remote addresses may fail until the connection is established.
*/
conn_get_from_addr(conn);
if (!(conn->flags & CO_FL_ADDR_FROM_SET))
goto out_wait;
conn_get_to_addr(conn);
if (!(conn->flags & CO_FL_ADDR_TO_SET))
goto out_wait;
ret = make_proxy_line(trash.str, trash.size, &conn->addr.from, &conn->addr.to);
}
if (!ret)
goto out_error;
if (conn->send_proxy_ofs > 0)
conn->send_proxy_ofs = -ret; /* first call */
/* we have to send trash from (ret+sp for -sp bytes). If the
* data layer has a pending write, we'll also set MSG_MORE.
*/
ret = send(conn->t.sock.fd, trash.str + ret + conn->send_proxy_ofs, -conn->send_proxy_ofs,
(conn->flags & CO_FL_DATA_WR_ENA) ? MSG_MORE : 0);
if (ret == 0)
goto out_wait;
if (ret < 0) {
if (errno == EAGAIN || errno == ENOTCONN)
goto out_wait;
if (errno == EINTR)
continue;
conn->flags |= CO_FL_SOCK_RD_SH | CO_FL_SOCK_WR_SH;
goto out_error;
}
conn->send_proxy_ofs += ret; /* becomes zero once complete */
if (conn->send_proxy_ofs != 0)
goto out_wait;
/* OK we've sent the whole line, we're connected */
break;
}
/* The connection is ready now, simply return and let the connection
* handler notify upper layers if needed.
*/
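/* successfully sending the PROXY line over the socket also proves that
 * the L4 connection is established, so the wait flag can be dropped
 */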
if (conn->flags & CO_FL_WAIT_L4_CONN)
conn->flags &= ~CO_FL_WAIT_L4_CONN;
conn->flags &= ~flag;
return 1;
out_error:
/* Write error on the file descriptor */
conn->flags |= CO_FL_ERROR;
return 0;
out_wait:
__conn_sock_stop_recv(conn);
fd_cant_send(conn->t.sock.fd);
return 0;
}
/* Tiny I/O callback called on recv/send I/O events on idle connections.
* It simply sets the CO_FL_SOCK_RD_SH flag so that si_idle_conn_wake_cb()
* is notified and can kill the connection.
*/
static void si_idle_conn_null_cb(struct connection *conn)
{
if (conn->flags & (CO_FL_ERROR | CO_FL_SOCK_RD_SH))
return;
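/* a poller-reported error or hangup means the other end has closed : flag
 * the socket as shut for read so that the wake callback can kill the
 * connection ; otherwise try to drain any pending data
 */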
if (fdtab[conn->t.sock.fd].ev & (FD_POLL_ERR|FD_POLL_HUP)) {
fdtab[conn->t.sock.fd].linger_risk = 0;
conn->flags |= CO_FL_SOCK_RD_SH;
}
else {
conn_drain(conn);
}
/* disable draining if we were called and have no drain function */
if (!conn->ctrl->drain)
__conn_data_stop_recv(conn);
}
/* Callback to be used by connection I/O handlers when some activity is detected
* on an idle server connection. Its main purpose is to kill the connection once
* a close was detected on it. It returns 0 if it did nothing serious, or -1 if
* it killed the connection.
*/
static int si_idle_conn_wake_cb(struct connection *conn)
{
struct stream_interface *si = conn->owner;
if (!conn_ctrl_ready(conn))
return 0;
if (conn->flags & (CO_FL_ERROR | CO_FL_SOCK_RD_SH)) {
/* warning, we can't do anything on <conn> after this call ! */
conn_force_close(conn);
conn_free(conn);
si->end = NULL;
return -1;
}
return 0;
}
/* Callback to be used by connection I/O handlers upon completion. It differs from
* the update function in that it is designed to be called by lower layers after I/O
* events have been completed. It will also try to wake the associated task up if
* an important event requires special handling. It relies on the connection handler
* to commit any polling updates. The function always returns 0.
*/
static int si_conn_wake_cb(struct connection *conn)
{
struct stream_interface *si = conn->owner;
DPRINTF(stderr, "%s: si=%p, si->state=%d ib->flags=%08x ob->flags=%08x\n",
__FUNCTION__,
si, si->state, si->ib->flags, si->ob->flags);
if (conn->flags & CO_FL_ERROR)
si->flags |= SI_FL_ERR;
/* check for recent connection establishment */
if (unlikely(!(conn->flags & (CO_FL_WAIT_L4_CONN | CO_FL_WAIT_L6_CONN | CO_FL_CONNECTED)))) {
si->exp = TICK_ETERNITY;
si->ob->flags |= CF_WRITE_NULL;
}
/* process consumer side */
if (channel_is_empty(si->ob)) {
if (((si->ob->flags & (CF_SHUTW|CF_SHUTW_NOW)) == CF_SHUTW_NOW) &&
(si->state == SI_ST_EST))
stream_int_shutw_conn(si);
__conn_data_stop_send(conn);
si->ob->wex = TICK_ETERNITY;
}
if ((si->ob->flags & (CF_SHUTW|CF_SHUTW_NOW)) == 0 && !channel_full(si->ob))
si->flags |= SI_FL_WAIT_DATA;
if (si->ob->flags & CF_WRITE_ACTIVITY) {
/* update timeouts if we have written something */
if ((si->ob->flags & (CF_SHUTW|CF_WRITE_PARTIAL)) == CF_WRITE_PARTIAL &&
!channel_is_empty(si->ob))
if (tick_isset(si->ob->wex))
si->ob->wex = tick_add_ifset(now_ms, si->ob->wto);
if (!(si->flags & SI_FL_INDEP_STR))
if (tick_isset(si->ib->rex))
si->ib->rex = tick_add_ifset(now_ms, si->ib->rto);
if (likely((si->ob->flags & (CF_SHUTW|CF_WRITE_PARTIAL|CF_DONT_READ)) == CF_WRITE_PARTIAL &&
!channel_full(si->ob) &&
(si->ob->prod->flags & SI_FL_WAIT_ROOM)))
si_chk_rcv(si->ob->prod);
}
/* process producer side.
* We might have some data the consumer is waiting for.
* We can do fast-forwarding, but we avoid doing this for partial
* buffers, because it is very likely that it will be done again
* immediately afterwards once the following data is parsed (eg:
* HTTP chunking).
*/
if (((si->ib->flags & CF_READ_PARTIAL) && !channel_is_empty(si->ib)) &&
(si->ib->pipe /* always try to send spliced data */ ||
(si->ib->buf->i == 0 && (si->ib->cons->flags & SI_FL_WAIT_DATA)))) {
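/* remember how much spliced data was pending so that we can detect below
 * whether the consumer has freed some room in the pipe
 */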
int last_len = si->ib->pipe ? si->ib->pipe->data : 0;
si_chk_snd(si->ib->cons);
/* check if the consumer has freed some space either in the
* buffer or in the pipe.
*/
if (!channel_full(si->ib) &&
(!last_len || !si->ib->pipe || si->ib->pipe->data < last_len))
si->flags &= ~SI_FL_WAIT_ROOM;
}
if (si->flags & SI_FL_WAIT_ROOM) {
__conn_data_stop_recv(conn);
si->ib->rex = TICK_ETERNITY;
}
else if ((si->ib->flags & (CF_SHUTR|CF_READ_PARTIAL|CF_DONT_READ)) == CF_READ_PARTIAL &&
!channel_full(si->ib)) {
/* we must re-enable reading if si_chk_snd() has freed some space */
__conn_data_want_recv(conn);
if (!(si->ib->flags & CF_READ_NOEXP) && tick_isset(si->ib->rex))
si->ib->rex = tick_add_ifset(now_ms, si->ib->rto);
}
/* wake the task up only when needed */
if (/* changes on the production side */
(si->ib->flags & (CF_READ_NULL|CF_READ_ERROR)) ||
si->state != SI_ST_EST ||
(si->flags & SI_FL_ERR) ||
((si->ib->flags & CF_READ_PARTIAL) &&
(!si->ib->to_forward || si->ib->cons->state != SI_ST_EST)) ||
/* changes on the consumption side */
(si->ob->flags & (CF_WRITE_NULL|CF_WRITE_ERROR)) ||
((si->ob->flags & CF_WRITE_ACTIVITY) &&
((si->ob->flags & CF_SHUTW) ||
((si->ob->flags & CF_WAKE_WRITE) &&
(si->ob->prod->state != SI_ST_EST ||
(channel_is_empty(si->ob) && !si->ob->to_forward)))))) {
task_wakeup(si->owner, TASK_WOKEN_IO);
}
if (si->ib->flags & CF_READ_ACTIVITY)
si->ib->flags &= ~CF_READ_DONTWAIT;
return 0;
}
/*
* This function is called to send buffer data to a stream socket.
* It calls the transport layer's snd_buf function. It relies on the
* caller to commit polling changes. The caller should check conn->flags
* for errors.
*/
static void si_conn_send(struct connection *conn)
{
struct stream_interface *si = conn->owner;
struct channel *chn = si->ob;
int ret;
if (chn->pipe && conn->xprt->snd_pipe) {
ret = conn->xprt->snd_pipe(conn, chn->pipe);
if (ret > 0)
chn->flags |= CF_WRITE_PARTIAL;
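/* release the pipe back to the pool once it has been completely flushed */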
if (!chn->pipe->data) {
put_pipe(chn->pipe);
chn->pipe = NULL;
}
if (conn->flags & CO_FL_ERROR)
return;
}
/* At this point, the pipe is empty, but we may still have data pending
* in the normal buffer.
*/
if (!chn->buf->o)
return;
/* when we're here, we already know that there is no spliced
* data left, and that there are sendable buffered data.
*/
if (!(conn->flags & (CO_FL_ERROR | CO_FL_SOCK_WR_SH | CO_FL_DATA_WR_SH | CO_FL_WAIT_DATA | CO_FL_HANDSHAKE))) {
/* check if we want to inform the kernel that we're interested in
* sending more data after this call. We want this if :
* - we're about to close after this last send and want to merge
* the ongoing FIN with the last segment.
* - we know we can't send everything at once and must get back
* here because of unaligned data
* - there is still a finite amount of data to forward
* The test is arranged so that the most common case does only 2
* tests.
*/
unsigned int send_flag = MSG_DONTWAIT | MSG_NOSIGNAL;
if ((!(chn->flags & (CF_NEVER_WAIT|CF_SEND_DONTWAIT)) &&
((chn->to_forward && chn->to_forward != CHN_INFINITE_FORWARD) ||
(chn->flags & CF_EXPECT_MORE))) ||
((chn->flags & (CF_SHUTW|CF_SHUTW_NOW)) == CF_SHUTW_NOW))
send_flag |= MSG_MORE;
ret = conn->xprt->snd_buf(conn, chn->buf, send_flag);
if (ret > 0) {
chn->flags |= CF_WRITE_PARTIAL;
if (!chn->buf->o) {
/* Always clear both flags once everything has been sent, they're one-shot */
chn->flags &= ~(CF_EXPECT_MORE | CF_SEND_DONTWAIT);
}
/* if some data remain in the buffer, it's only because the
* system buffers are full, we will try next time.
*/
}
}
return;
}
/* Updates the timers and flags of a stream interface attached to a connection,
* depending on the buffers' flags. It should only be called once after the
* buffer flags have settled down, and before they are cleared. It does no
* harm to call it as often as desired (it just slightly hurts performance).
* It is only meant to be called by upper layers after buffer flags have been
* manipulated by analysers.
*/
void stream_int_update_conn(struct stream_interface *si)
{
struct channel *ib = si->ib;
struct channel *ob = si->ob;
struct connection *conn = __objt_conn(si->end);
/* Check if we need to close the read side */
if (!(ib->flags & CF_SHUTR)) {
/* Read not closed, update FD status and timeout for reads */
if ((ib->flags & CF_DONT_READ) || channel_full(ib)) {
/* stop reading */
if (!(si->flags & SI_FL_WAIT_ROOM)) {
if (!(ib->flags & CF_DONT_READ)) /* full */
si->flags |= SI_FL_WAIT_ROOM;
conn_data_stop_recv(conn);
ib->rex = TICK_ETERNITY;
}
}
else {
/* (re)start reading and update timeout. Note: we don't recompute the timeout
* every time we get here, otherwise it would risk never expiring. We only
* update it if it was not yet set. The stream socket handler will already
* have updated it if there has been a completed I/O.
*/
si->flags &= ~SI_FL_WAIT_ROOM;
conn_data_want_recv(conn);
if (!(ib->flags & (CF_READ_NOEXP|CF_DONT_READ)) && !tick_isset(ib->rex))
ib->rex = tick_add_ifset(now_ms, ib->rto);
}
}
/* Check if we need to close the write side */
if (!(ob->flags & CF_SHUTW)) {
/* Write not closed, update FD status and timeout for writes */
if (channel_is_empty(ob)) {
/* stop writing */
if (!(si->flags & SI_FL_WAIT_DATA)) {
if ((ob->flags & CF_SHUTW_NOW) == 0)
si->flags |= SI_FL_WAIT_DATA;
conn_data_stop_send(conn);
ob->wex = TICK_ETERNITY;
}
}
else {
/* (re)start writing and update timeout. Note: we don't recompute the timeout
* every time we get here, otherwise it would risk never expiring. We only
* update it if it was not yet set. The stream socket handler will already
* have updated it if there has been a completed I/O.
*/
si->flags &= ~SI_FL_WAIT_DATA;
conn_data_want_send(conn);
if (!tick_isset(ob->wex)) {
ob->wex = tick_add_ifset(now_ms, ob->wto);
if (tick_isset(ib->rex) && !(si->flags & SI_FL_INDEP_STR)) {
/* Note: depending on the protocol, we don't know if we're waiting
* for incoming data or not. So in order to prevent the socket from
* expiring read timeouts during writes, we refresh the read timeout,
* except if it was already infinite or if we have explicitly setup
* independent streams.
*/
ib->rex = tick_add_ifset(now_ms, ib->rto);
}
}
}
}
}
/*
* This function performs a shutdown-read on a stream interface attached to
* a connection in a connected or init state (it does nothing for other
* states). It either shuts the read side or marks itself as closed. The buffer
* flags are updated to reflect the new state. If the stream interface has
* SI_FL_NOHALF, we also forward the close to the write side. If a control
* layer is defined, then it is supposed to be a socket layer and file
* descriptors are then shutdown or closed accordingly. The function
* automatically disables polling if needed.
*/
static void stream_int_shutr_conn(struct stream_interface *si)
{
struct connection *conn = __objt_conn(si->end);
si->ib->flags &= ~CF_SHUTR_NOW;
if (si->ib->flags & CF_SHUTR)
return;
si->ib->flags |= CF_SHUTR;
si->ib->rex = TICK_ETERNITY;
si->flags &= ~SI_FL_WAIT_ROOM;
if (si->state != SI_ST_EST && si->state != SI_ST_CON)
return;
if (si->ob->flags & CF_SHUTW) {
conn_full_close(conn);
si->state = SI_ST_DIS;
si->exp = TICK_ETERNITY;
}
else if (si->flags & SI_FL_NOHALF) {
/* we want to immediately forward this close to the write side */
return stream_int_shutw_conn(si);
}
else if (conn->ctrl) {
/* we want the caller to disable polling on this FD */
conn_data_stop_recv(conn);
}
}
/*
* This function performs a shutdown-write on a stream interface attached to
* a connection in a connected or init state (it does nothing for other
* states). It either shuts the write side or marks itself as closed. The
* buffer flags are updated to reflect the new state. It does also close
* everything if the SI was marked as being in error state. If there is a
* data-layer shutdown, it is called. If a control layer is defined, then it is
* supposed to be a socket layer and file descriptors are then shutdown or
* closed accordingly. The function automatically disables polling if needed.
* Note: at the moment, we continue to check conn->ctrl even though we *know* it
* is valid. This will help selecting the proper shutdown() and setsockopt()
* calls if/when we implement remote sockets later.
*/
static void stream_int_shutw_conn(struct stream_interface *si)
{
struct connection *conn = __objt_conn(si->end);
si->ob->flags &= ~CF_SHUTW_NOW;
if (si->ob->flags & CF_SHUTW)
return;
si->ob->flags |= CF_SHUTW;
si->ob->wex = TICK_ETERNITY;
si->flags &= ~SI_FL_WAIT_DATA;
switch (si->state) {
case SI_ST_EST:
/* we have to shut before closing, otherwise some short messages
* may never leave the system, especially when there are remaining
* unread data in the socket input buffer, or when nolinger is set.
* However, if SI_FL_NOLINGER is explicitly set, we know there is
* no risk so we close both sides immediately.
*/
if (si->flags & SI_FL_ERR) {
/* quick close, the socket is already shut anyway */
}
else if (si->flags & SI_FL_NOLINGER) {
/* unclean data-layer shutdown */
if (conn->xprt && conn->xprt->shutw)
conn->xprt->shutw(conn, 0);
}
else {
/* clean data-layer shutdown */
if (conn->xprt && conn->xprt->shutw)
conn->xprt->shutw(conn, 1);
/* If the stream interface is configured to disable half-open
* connections, we'll skip the shutdown(), but only if the
* read side is already closed. Otherwise we can't support
* closed write with pending read (eg: abortonclose while
* waiting for the server).
*/
if (!(si->flags & SI_FL_NOHALF) || !(si->ib->flags & (CF_SHUTR|CF_DONT_READ))) {
/* We shutdown transport layer */
if (conn_ctrl_ready(conn))
shutdown(conn->t.sock.fd, SHUT_WR);
if (!(si->ib->flags & (CF_SHUTR|CF_DONT_READ))) {
/* OK just a shutw, but we want the caller
* to disable polling on this FD if it exists.
*/
if (conn->ctrl)
conn_data_stop_send(conn);
return;
}
}
}
/* fall through */
case SI_ST_CON:
/* we may have to close a pending connection, and mark the
* response buffer as shutr
*/
conn_full_close(conn);
/* fall through */
case SI_ST_CER:
case SI_ST_QUE:
case SI_ST_TAR:
si->state = SI_ST_DIS;
/* fall through */
default:
si->flags &= ~(SI_FL_WAIT_ROOM | SI_FL_NOLINGER);
si->ib->flags &= ~CF_SHUTR_NOW;
si->ib->flags |= CF_SHUTR;
si->ib->rex = TICK_ETERNITY;
si->exp = TICK_ETERNITY;
}
}
/* This function is used for inter-stream-interface calls. It is called by the
* consumer to inform the producer side that it may be interested in checking
* for free space in the buffer. Note that it intentionally does not update
* timeouts, so that we can still check them later at wake-up. This function is
* dedicated to connection-based stream interfaces.
*/
static void stream_int_chk_rcv_conn(struct stream_interface *si)
{
struct channel *ib = si->ib;
struct connection *conn = __objt_conn(si->end);
if (unlikely(si->state > SI_ST_EST || (ib->flags & CF_SHUTR)))
return;
conn_refresh_polling_flags(conn);
if ((ib->flags & CF_DONT_READ) || channel_full(ib)) {
/* stop reading */
if (!(ib->flags & CF_DONT_READ)) /* full */
si->flags |= SI_FL_WAIT_ROOM;
__conn_data_stop_recv(conn);
}
else {
/* (re)start reading */
si->flags &= ~SI_FL_WAIT_ROOM;
__conn_data_want_recv(conn);
}
conn_cond_update_data_polling(conn);
}
/* This function is used for inter-stream-interface calls. It is called by the
* producer to inform the consumer side that it may be interested in checking
* for data in the buffer. Note that it intentionally does not update timeouts,
* so that we can still check them later at wake-up.
*/
static void stream_int_chk_snd_conn(struct stream_interface *si)
{
struct channel *ob = si->ob;
struct connection *conn = __objt_conn(si->end);
if (unlikely(si->state > SI_ST_EST || (ob->flags & CF_SHUTW)))
return;
if (unlikely(channel_is_empty(ob))) /* called with nothing to send ! */
return;
if (!ob->pipe && /* spliced data wants to be forwarded ASAP */
!(si->flags & SI_FL_WAIT_DATA)) /* not waiting for data */
return;
if (conn->flags & (CO_FL_DATA_WR_ENA|CO_FL_CURR_WR_ENA)) {
/* already subscribed to write notifications, will be called
* anyway, so let's avoid calling it especially if the reader
* is not ready.
*/
return;
}
if (!(conn->flags & (CO_FL_HANDSHAKE|CO_FL_WAIT_L4_CONN|CO_FL_WAIT_L6_CONN))) {
/* Before calling the data-level operations, we have to prepare
* the polling flags to ensure we properly detect changes.
*/
if (conn_ctrl_ready(conn))
fd_want_send(conn->t.sock.fd);
conn_refresh_polling_flags(conn);
si_conn_send(conn);
if (conn_ctrl_ready(conn) && (conn->flags & CO_FL_ERROR)) {
/* Write error on the file descriptor */
fd_stop_both(conn->t.sock.fd);
__conn_data_stop_both(conn);
si->flags |= SI_FL_ERR;
goto out_wakeup;
}
}
/* OK, so now we know that some data might have been sent, and that we may
* have to poll first. We have to do that too if the buffer is not empty.
*/
if (channel_is_empty(ob)) {
/* the connection is established but we can't write. Either the
* buffer is empty, or we just refrain from sending because the
* ->o limit was reached. Maybe we just wrote the last
* chunk and need to close.
*/
__conn_data_stop_send(conn);
if (((ob->flags & (CF_SHUTW|CF_AUTO_CLOSE|CF_SHUTW_NOW)) ==
(CF_AUTO_CLOSE|CF_SHUTW_NOW)) &&
(si->state == SI_ST_EST)) {
si_shutw(si);
goto out_wakeup;
}
if ((ob->flags & (CF_SHUTW|CF_SHUTW_NOW)) == 0)
si->flags |= SI_FL_WAIT_DATA;
ob->wex = TICK_ETERNITY;
}
else {
/* Otherwise there are remaining data to be sent in the buffer,
* which means we have to poll before doing so.
*/
__conn_data_want_send(conn);
si->flags &= ~SI_FL_WAIT_DATA;
if (!tick_isset(ob->wex))
ob->wex = tick_add_ifset(now_ms, ob->wto);
}
if (likely(ob->flags & CF_WRITE_ACTIVITY)) {
/* update timeout if we have written something */
if ((ob->flags & (CF_SHUTW|CF_WRITE_PARTIAL)) == CF_WRITE_PARTIAL &&
!channel_is_empty(ob))
ob->wex = tick_add_ifset(now_ms, ob->wto);
if (tick_isset(si->ib->rex) && !(si->flags & SI_FL_INDEP_STR)) {
			/* Note: to prevent the client from expiring read timeouts
			 * during writes, we refresh the read timeout here. We only
			 * do this if the interface is not configured for
			 * "independent streams", because for some applications it is
			 * better not to do this, for instance when continuously
			 * exchanging small amounts of data which can fill the socket
			 * buffers long before a write timeout is detected.
			 */
si->ib->rex = tick_add_ifset(now_ms, si->ib->rto);
}
}
/* in case of special condition (error, shutdown, end of write...), we
* have to notify the task.
*/
if (likely((ob->flags & (CF_WRITE_NULL|CF_WRITE_ERROR|CF_SHUTW)) ||
((ob->flags & CF_WAKE_WRITE) &&
((channel_is_empty(si->ob) && !ob->to_forward) ||
si->state != SI_ST_EST)))) {
out_wakeup:
if (!(si->flags & SI_FL_DONT_WAKE) && si->owner)
task_wakeup(si->owner, TASK_WOKEN_IO);
}
/* commit possible polling changes */
conn_cond_update_polling(conn);
}
/*
* This is the callback which is called by the connection layer to receive data
* into the buffer from the connection. It iterates over the transport layer's
* rcv_buf function.
*/
static void si_conn_recv_cb(struct connection *conn)
{
struct stream_interface *si = conn->owner;
struct channel *chn = si->ib;
int ret, max, cur_read;
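	/* read_poll bounds the number of iterations of the receive loop below
	 * so that we don't spend too long reading from a single connection in
	 * one call.
	 */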
int read_poll = MAX_READ_POLL_LOOPS;
/* stop immediately on errors. Note that we DON'T want to stop on
* POLL_ERR, as the poller might report a write error while there
* are still data available in the recv buffer. This typically
* happens when we send too large a request to a backend server
* which rejects it before reading it all.
*/
if (conn->flags & CO_FL_ERROR)
return;
/* stop here if we reached the end of data */
if (conn_data_read0_pending(conn))
goto out_shutdown_r;
/* maybe we were called immediately after an asynchronous shutr */
if (chn->flags & CF_SHUTR)
return;
cur_read = 0;
/* First, let's see if we may splice data across the channel without
* using a buffer.
*/
if (conn->xprt->rcv_pipe &&
(chn->pipe || chn->to_forward >= MIN_SPLICE_FORWARD) &&
chn->flags & CF_KERN_SPLICING) {
if (buffer_not_empty(chn->buf)) {
/* We're embarrassed, there are already data pending in
* the buffer and we don't want to have them at two
* locations at a time. Let's indicate we need some
* place and ask the consumer to hurry.
*/
goto abort_splice;
}
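		/* we don't have a pipe yet, try to allocate one. If the global
		 * limit on pipes is reached or allocation fails, disable
		 * splicing on this channel and fall back to the buffer copy.
		 */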
if (unlikely(chn->pipe == NULL)) {
if (pipes_used >= global.maxpipes || !(chn->pipe = get_pipe())) {
chn->flags &= ~CF_KERN_SPLICING;
goto abort_splice;
}
}
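		/* splice as many bytes as the channel is allowed to forward
		 * directly from the socket into the pipe.
		 */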
ret = conn->xprt->rcv_pipe(conn, chn->pipe, chn->to_forward);
if (ret < 0) {
/* splice not supported on this end, let's disable it */
chn->flags &= ~CF_KERN_SPLICING;
goto abort_splice;
}
if (ret > 0) {
if (chn->to_forward != CHN_INFINITE_FORWARD)
chn->to_forward -= ret;
chn->total += ret;
cur_read += ret;
chn->flags |= CF_READ_PARTIAL;
}
if (conn_data_read0_pending(conn))
goto out_shutdown_r;
if (conn->flags & CO_FL_ERROR)
return;
if (conn->flags & CO_FL_WAIT_ROOM) {
/* the pipe is full or we have read enough data that it
* could soon be full. Let's stop before needing to poll.
*/
si->flags |= SI_FL_WAIT_ROOM;
__conn_data_stop_recv(conn);
}
		/* splice not possible (anymore), let's go on with the standard copy */
}
abort_splice:
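	/* release the pipe if it no longer holds any data, so that it can be
	 * reused by other connections.
	 */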
if (chn->pipe && unlikely(!chn->pipe->data)) {
put_pipe(chn->pipe);
chn->pipe = NULL;
}
/* Important note : if we're called with POLL_IN|POLL_HUP, it means the read polling
* was enabled, which implies that the recv buffer was not full. So we have a guarantee
* that if such an event is not handled above in splice, it will be handled here by
* recv().
*/
while (!(conn->flags & (CO_FL_ERROR | CO_FL_SOCK_RD_SH | CO_FL_DATA_RD_SH | CO_FL_WAIT_ROOM | CO_FL_HANDSHAKE))) {
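		/* see how many bytes the channel buffer may still accept */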
max = bi_avail(chn);
if (!max) {
si->flags |= SI_FL_WAIT_ROOM;
break;
}
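		/* receive at most <max> bytes from the transport layer into the
		 * channel buffer.
		 */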
ret = conn->xprt->rcv_buf(conn, chn->buf, max);
if (ret <= 0)
break;
cur_read += ret;
/* if we're allowed to directly forward data, we must update ->o */
if (chn->to_forward && !(chn->flags & (CF_SHUTW|CF_SHUTW_NOW))) {
unsigned long fwd = ret;
if (chn->to_forward != CHN_INFINITE_FORWARD) {
if (fwd > chn->to_forward)
fwd = chn->to_forward;
chn->to_forward -= fwd;
}
b_adv(chn->buf, fwd);
}
chn->flags |= CF_READ_PARTIAL;
chn->total += ret;
if (channel_full(chn)) {
/* The buffer is now full, there's no point in going through
* the loop again.
*/
if (!(chn->flags & CF_STREAMER_FAST) && (cur_read == buffer_len(chn->buf))) {
chn->xfer_small = 0;
chn->xfer_large++;
if (chn->xfer_large >= 3) {
/* we call this buffer a fast streamer if it manages
* to be filled in one call 3 consecutive times.
*/
chn->flags |= (CF_STREAMER | CF_STREAMER_FAST);
//fputc('+', stderr);
}
}
else if ((chn->flags & (CF_STREAMER | CF_STREAMER_FAST)) &&
(cur_read <= chn->buf->size / 2)) {
chn->xfer_large = 0;
chn->xfer_small++;
if (chn->xfer_small >= 2) {
/* if the buffer has been at least half full twice,
* we receive faster than we send, so at least it
* is not a "fast streamer".
*/
chn->flags &= ~CF_STREAMER_FAST;
//fputc('-', stderr);
}
}
else {
chn->xfer_small = 0;
chn->xfer_large = 0;
}
si->flags |= SI_FL_WAIT_ROOM;
break;
}
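		/* stop reading for now, either because the reader asked not to
		 * wait for more data or because the read budget for this call
		 * is exhausted.
		 */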
if ((chn->flags & CF_READ_DONTWAIT) || --read_poll <= 0) {
si->flags |= SI_FL_WAIT_ROOM;
__conn_data_stop_recv(conn);
break;
}
/* if too many bytes were missing from last read, it means that
* it's pointless trying to read again because the system does
* not have them in buffers.
*/
if (ret < max) {
if ((chn->flags & (CF_STREAMER | CF_STREAMER_FAST)) &&
(cur_read <= chn->buf->size / 2)) {
chn->xfer_large = 0;
chn->xfer_small++;
if (chn->xfer_small >= 3) {
/* we have read less than half of the buffer in
* one pass, and this happened at least 3 times.
* This is definitely not a streamer.
*/
chn->flags &= ~(CF_STREAMER | CF_STREAMER_FAST);
//fputc('!', stderr);
}
}
			/* if a streamer has read only little data, it may be because
			 * we have exhausted system buffers. It's not worth trying
			 * again.
			 */
if (chn->flags & CF_STREAMER)
break;
/* if we read a large block smaller than what we requested,
* it's almost certain we'll never get anything more.
*/
if (ret >= global.tune.recv_enough)
break;
}
} /* while !flags */
if (conn->flags & CO_FL_ERROR)
return;
if (conn_data_read0_pending(conn))
/* connection closed */
goto out_shutdown_r;
return;
out_shutdown_r:
/* we received a shutdown */
chn->flags |= CF_READ_NULL;
if (chn->flags & CF_AUTO_CLOSE)
channel_shutw_now(chn);
stream_sock_read0(si);
conn_data_read0(conn);
return;
}
/*
* This is the callback which is called by the connection layer to send data
* from the buffer to the connection. It iterates over the transport layer's
* snd_buf function.
*/
static void si_conn_send_cb(struct connection *conn)
{
struct stream_interface *si = conn->owner;
struct channel *chn = si->ob;
if (conn->flags & CO_FL_ERROR)
return;
if (conn->flags & CO_FL_HANDSHAKE)
/* a handshake was requested */
return;
/* we might have been called just after an asynchronous shutw */
if (chn->flags & CF_SHUTW)
return;
/* OK there are data waiting to be sent */
si_conn_send(conn);
/* OK all done */
return;
}
/*
* This function propagates a null read received on a socket-based connection.
* It updates the stream interface. If the stream interface has SI_FL_NOHALF,
* the close is also forwarded to the write side as an abort. This function is
* still socket-specific as it handles a setsockopt() call to set the SO_LINGER
* state on the socket.
*/
void stream_sock_read0(struct stream_interface *si)
{
struct connection *conn = __objt_conn(si->end);
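	/* clear any pending shutr request since the shutdown has now been
	 * received; if the input channel was already shut for reads, there is
	 * nothing more to do.
	 */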
si->ib->flags &= ~CF_SHUTR_NOW;
if (si->ib->flags & CF_SHUTR)
return;
si->ib->flags |= CF_SHUTR;
si->ib->rex = TICK_ETERNITY;
si->flags &= ~SI_FL_WAIT_ROOM;
if (si->state != SI_ST_EST && si->state != SI_ST_CON)
return;
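	/* if the write side is already shut, fully close the connection */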
if (si->ob->flags & CF_SHUTW)
goto do_close;
if (si->flags & SI_FL_NOHALF) {
/* we want to immediately forward this close to the write side */
/* force flag on ssl to keep session in cache */
if (conn->xprt->shutw)
conn->xprt->shutw(conn, 0);
goto do_close;
}
/* otherwise that's just a normal read shutdown */
if (conn_ctrl_ready(conn))
fdtab[conn->t.sock.fd].linger_risk = 0;
__conn_data_stop_recv(conn);
return;
do_close:
/* OK we completely close the socket here just as if we went through si_shut[rw]() */
conn_full_close(conn);
si->ib->flags &= ~CF_SHUTR_NOW;
si->ib->flags |= CF_SHUTR;
si->ib->rex = TICK_ETERNITY;
si->ob->flags &= ~CF_SHUTW_NOW;
si->ob->flags |= CF_SHUTW;
si->ob->wex = TICK_ETERNITY;
si->flags &= ~(SI_FL_WAIT_DATA | SI_FL_WAIT_ROOM);
si->state = SI_ST_DIS;
si->exp = TICK_ETERNITY;
return;
}
/*
* Local variables:
* c-indent-level: 8
* c-basic-offset: 8
* End:
*/