MAJOR: muxes: No longer use app_ops .wake() callback function from muxes

Thanks to previous commits, it is now possible to wake the data layer up,
via a tasklet_wakeup, instead of using the app_ops .wake() callback
function.

When a data layer must be notified of a mux event (an error for instance),
we now always perform a tasklet_wakeup(). TASK_WOKEN_MSG state is used by
default. TASK_WOKEN_IO is eventually added if the data layer was subscribed
to receives or sends.

Changes are not trivial at all. We replaced a synchronous call to the
sc_conn_process() function by a tasklet_wakeup().
This commit is contained in:
Christopher Faulet 2026-03-05 17:17:49 +01:00
parent 26a0817c1a
commit 64d997ebfc
6 changed files with 83 additions and 77 deletions

View file

@ -886,26 +886,28 @@ static void fcgi_strm_notify_send(struct fcgi_strm *fstrm)
}
}
/* Alerts the data layer, trying to wake it up by all means, following
* this sequence :
* - if the fcgi stream' data layer is subscribed to recv, then it's woken up
* for recv
* - if its subscribed to send, then it's woken up for send
* - if it was subscribed to neither, its ->wake() callback is called
* It is safe to call this function with a closed stream which doesn't have a
* stream connector anymore.
/* Alerts the data layer by waking it up. TASK_WOKEN_MSG state is used by
* default and if the data layer is also subscribed to recv or send,
* TASK_WOKEN_IO is added. But first of all, we check if the shut tasklet must
* be woken up or not instead.
*/
static void fcgi_strm_alert(struct fcgi_strm *fstrm)
{
TRACE_POINT(FCGI_EV_STRM_WAKE, fstrm->fconn->conn, fstrm);
if (fstrm->subs ||
(fstrm->flags & (FCGI_SF_WANT_SHUTR|FCGI_SF_WANT_SHUTW))) {
fcgi_strm_notify_recv(fstrm);
fcgi_strm_notify_send(fstrm);
}
else if (fcgi_strm_sc(fstrm) && fcgi_strm_sc(fstrm)->app_ops->wake != NULL) {
TRACE_POINT(FCGI_EV_STRM_WAKE, fstrm->fconn->conn, fstrm);
fcgi_strm_sc(fstrm)->app_ops->wake(fcgi_strm_sc(fstrm));
if (!fstrm->subs && (fstrm->flags & (FCGI_SF_WANT_SHUTR|FCGI_SF_WANT_SHUTW)))
tasklet_wakeup(fstrm->shut_tl);
else if (fcgi_strm_sc(fstrm)) {
unsigned int state = TASK_WOKEN_MSG;
if (fstrm->subs) {
if (fstrm->subs->events & SUB_RETRY_SEND)
fstrm->flags |= FCGI_SF_NOTIFIED;
fstrm->subs->events = 0;
fstrm->subs = NULL;
state |= TASK_WOKEN_IO;
}
tasklet_wakeup(fcgi_strm_sc(fstrm)->wait_event.tasklet, state);
}
}

View file

@ -3733,21 +3733,24 @@ static void h1_wake_stream_for_send(struct h1s *h1s)
}
}
/* alerts the data layer following this sequence :
* - if the h1s' data layer is subscribed to recv, then it's woken up for recv
* - if its subscribed to send, then it's woken up for send
* - if it was subscribed to neither, its ->wake() callback is called
/* Alerts the data layer by waking it up. TASK_WOKEN_MSG state is used by
* default and if the data layer is also subscribed to recv or send,
* TASK_WOKEN_IO is added.
*/
static void h1_alert(struct h1s *h1s)
{
unsigned int state = TASK_WOKEN_MSG;
TRACE_POINT(H1_EV_STRM_WAKE, h1s->h1c->conn, h1s);
if (!h1s_sc(h1s))
return;
if (h1s->subs) {
h1_wake_stream_for_recv(h1s);
h1_wake_stream_for_send(h1s);
}
else if (h1s_sc(h1s) && h1s_sc(h1s)->app_ops->wake != NULL) {
TRACE_POINT(H1_EV_STRM_WAKE, h1s->h1c->conn, h1s);
h1s_sc(h1s)->app_ops->wake(h1s_sc(h1s));
h1s->subs->events = 0;
h1s->subs = NULL;
state |= TASK_WOKEN_IO;
}
tasklet_wakeup(h1s_sc(h1s)->wait_event.tasklet, state);
}
/* Try to send an HTTP error with h1c->errcode status code. It returns 1 on success

View file

@ -1695,26 +1695,27 @@ static void __maybe_unused h2s_notify_send(struct h2s *h2s)
}
}
/* alerts the data layer, trying to wake it up by all means, following
* this sequence :
* - if the h2s' data layer is subscribed to recv, then it's woken up for recv
* - if its subscribed to send, then it's woken up for send
* - if it was subscribed to neither, its ->wake() callback is called
* It is safe to call this function with a closed stream which doesn't have a
* stream connector anymore.
/* alerts the data layer by waking it up. TASK_WOKEN_MSG state is used by
* default and if the data layer is also subscribed to recv or send,
* TASK_WOKEN_IO is added. But first of all, we check if the shut tasklet must
* be woken up or not instead.
*/
static void __maybe_unused h2s_alert(struct h2s *h2s)
{
TRACE_ENTER(H2_EV_H2S_WAKE, h2s->h2c->conn, h2s);
if (!h2s->subs && (h2s->flags & (H2_SF_WANT_SHUTR | H2_SF_WANT_SHUTW)))
tasklet_wakeup(h2s->shut_tl);
else if (h2s_sc(h2s)) {
unsigned int state = TASK_WOKEN_MSG;
if (h2s->subs ||
(h2s->flags & (H2_SF_WANT_SHUTR | H2_SF_WANT_SHUTW))) {
h2s_notify_recv(h2s);
h2s_notify_send(h2s);
}
else if (h2s_sc(h2s) && h2s_sc(h2s)->app_ops->wake != NULL) {
TRACE_POINT(H2_EV_STRM_WAKE, h2s->h2c->conn, h2s);
h2s_sc(h2s)->app_ops->wake(h2s_sc(h2s));
if (h2s->subs) {
if (h2s->subs->events & SUB_RETRY_SEND)
h2s->flags |= H2_SF_NOTIFIED;
h2s->subs->events = 0;
h2s->subs = NULL;
state |= TASK_WOKEN_IO;
}
tasklet_wakeup(h2s_sc(h2s)->wait_event.tasklet, state);
}
TRACE_LEAVE(H2_EV_H2S_WAKE, h2s->h2c->conn, h2s);

View file

@ -252,20 +252,21 @@ struct task *mux_pt_io_cb(struct task *t, void *tctx, unsigned int status)
TRACE_ENTER(PT_EV_CONN_WAKE, ctx->conn);
if (!se_fl_test(ctx->sd, SE_FL_ORPHAN)) {
unsigned int state = TASK_WOKEN_MSG;
/* There's a small race condition.
* mux_pt_io_cb() is only supposed to be called if we have no
* stream attached. However, maybe the tasklet got woken up,
* and this connection was then attached to a new stream.
* If this happened, just wake the tasklet up if anybody
* subscribed to receive events, and otherwise call the wake
* method, to make sure the event is noticed.
* If this happened, just wake the tasklet up.
*/
if (ctx->conn->subs) {
ctx->conn->subs->events = 0;
tasklet_wakeup(ctx->conn->subs->tasklet);
ctx->conn->subs = NULL;
} else if (pt_sc(ctx)->app_ops->wake)
pt_sc(ctx)->app_ops->wake(pt_sc(ctx));
state |= TASK_WOKEN_IO;
}
tasklet_wakeup(pt_sc(ctx)->wait_event.tasklet, state);
TRACE_DEVEL("leaving waking up SC", PT_EV_CONN_WAKE, ctx->conn);
return t;
}
@ -360,14 +361,9 @@ static int mux_pt_wake(struct connection *conn)
int ret = 0;
TRACE_ENTER(PT_EV_CONN_WAKE, ctx->conn);
if (!se_fl_test(ctx->sd, SE_FL_ORPHAN)) {
ret = pt_sc(ctx)->app_ops->wake ? pt_sc(ctx)->app_ops->wake(pt_sc(ctx)) : 0;
if (ret < 0) {
TRACE_DEVEL("leaving waking up SC", PT_EV_CONN_WAKE, ctx->conn);
return ret;
}
} else {
if (!se_fl_test(ctx->sd, SE_FL_ORPHAN))
tasklet_wakeup(pt_sc(ctx)->wait_event.tasklet, TASK_WOKEN_MSG);
else {
conn_ctrl_drain(conn);
if (conn->flags & (CO_FL_ERROR | CO_FL_SOCK_RD_SH)) {
TRACE_DEVEL("leaving destroying PT context", PT_EV_CONN_WAKE, ctx->conn);

View file

@ -506,19 +506,23 @@ static struct ncbuf *qcs_get_ncbuf(struct qcs *qcs, struct ncbuf *ncbuf)
return ncbuf;
}
/* Notify an eventual subscriber on <qcs> or else wakeup up the stconn layer if
* initialized.
/* Notify the stconn layer if initialized with TASK_WOKEN_MSG state and
* eventually TASK_WOKEN_IO.
*/
static void qcs_alert(struct qcs *qcs)
{
unsigned int state = TASK_WOKEN_MSG;
TRACE_POINT(QMUX_EV_STRM_WAKE, qcs->qcc->conn, qcs);
if (!qcs_sc(qcs))
return;
if (qcs->subs) {
qcs_notify_recv(qcs);
qcs_notify_send(qcs);
}
else if (qcs_sc(qcs) && qcs->sd->sc->app_ops->wake) {
TRACE_POINT(QMUX_EV_STRM_WAKE, qcs->qcc->conn, qcs);
qcs->sd->sc->app_ops->wake(qcs->sd->sc);
qcs->subs->events = 0;
qcs->subs = NULL;
state |= TASK_WOKEN_IO;
}
tasklet_wakeup(qcs_sc(qcs)->wait_event.tasklet, state);
}
int qcs_subscribe(struct qcs *qcs, int event_type, struct wait_event *es)

View file

@ -972,26 +972,26 @@ static void spop_strm_notify_send(struct spop_strm *spop_strm)
}
}
/* Alerts the data layer, trying to wake it up by all means, following
* this sequence :
* - if the spop stream' data layer is subscribed to recv, then it's woken up
* for recv
* - if its subscribed to send, then it's woken up for send
* - if it was subscribed to neither, its ->wake() callback is called
* It is safe to call this function with a closed stream which doesn't have a
* stream connector anymore.
/* Alerts the data layer by waking it up. TASK_WOKEN_MSG state is used by
* default and if the data layer is also subscribed to recv or send,
* TASK_WOKEN_IO is added.
*/
static void spop_strm_alert(struct spop_strm *spop_strm)
{
unsigned int state = TASK_WOKEN_MSG;
TRACE_POINT(SPOP_EV_STRM_WAKE, spop_strm->spop_conn->conn, spop_strm);
if (!spop_strm_sc(spop_strm))
return;
if (spop_strm->subs) {
spop_strm_notify_recv(spop_strm);
spop_strm_notify_send(spop_strm);
}
else if (spop_strm_sc(spop_strm) && spop_strm_sc(spop_strm)->app_ops->wake != NULL) {
TRACE_POINT(SPOP_EV_STRM_WAKE, spop_strm->spop_conn->conn, spop_strm);
spop_strm_sc(spop_strm)->app_ops->wake(spop_strm_sc(spop_strm));
if (spop_strm->subs->events & SUB_RETRY_SEND)
spop_strm->flags |= SPOP_SF_NOTIFIED;
spop_strm->subs->events = 0;
spop_strm->subs = NULL;
state |= TASK_WOKEN_IO;
}
tasklet_wakeup(spop_strm_sc(spop_strm)->wait_event.tasklet, state);
}
/* Writes the 32-bit frame size <len> at address <frame> */