From 64d997ebfc58fe02fe540979c2c7907dc0d35944 Mon Sep 17 00:00:00 2001 From: Christopher Faulet Date: Thu, 5 Mar 2026 17:17:49 +0100 Subject: [PATCH] MAJOR: muxes: No longer use app_ops .wake() callback function from muxes Thanks to previous commits, it is now possible to wake the data layer up, via a tasklet_wakeup, instead of using the app_ops .wake() callback function. When a data layer must be notified of a mux event (an error for instance), we now always perform a tasklet_wakeup(). TASK_WOKEN_MSG state is used by default. TASK_WOKEN_IO is eventually added if the data layer was subscribed to receives or sends. Changes are not trivial at all. We replaced a synchronous call to the sc_conn_process() function by a tasklet_wakeup(). --- src/mux_fcgi.c | 34 ++++++++++++++++++---------------- src/mux_h1.c | 23 +++++++++++++---------- src/mux_h2.c | 31 ++++++++++++++++--------------- src/mux_pt.c | 24 ++++++++++-------------- src/mux_quic.c | 20 ++++++++++++-------- src/mux_spop.c | 28 ++++++++++++++-------------- 6 files changed, 83 insertions(+), 77 deletions(-) diff --git a/src/mux_fcgi.c b/src/mux_fcgi.c index c34514ae1..2ca2c5ee9 100644 --- a/src/mux_fcgi.c +++ b/src/mux_fcgi.c @@ -886,26 +886,28 @@ static void fcgi_strm_notify_send(struct fcgi_strm *fstrm) } } -/* Alerts the data layer, trying to wake it up by all means, following - * this sequence : - * - if the fcgi stream' data layer is subscribed to recv, then it's woken up - * for recv - * - if its subscribed to send, then it's woken up for send - * - if it was subscribed to neither, its ->wake() callback is called - * It is safe to call this function with a closed stream which doesn't have a - * stream connector anymore. + +/* Alerts the data layer by waking it up. TASK_WOKEN_MSG state is used by + * default and if the data layer is also subscribed to recv or send, + * TASK_WOKEN_IO is added. But first of all, we check if the shut tasklet must + * be woken up or not instead. */ static void fcgi_strm_alert(struct fcgi_strm *fstrm) { TRACE_POINT(FCGI_EV_STRM_WAKE, fstrm->fconn->conn, fstrm); - if (fstrm->subs || - (fstrm->flags & (FCGI_SF_WANT_SHUTR|FCGI_SF_WANT_SHUTW))) { - fcgi_strm_notify_recv(fstrm); - fcgi_strm_notify_send(fstrm); - } - else if (fcgi_strm_sc(fstrm) && fcgi_strm_sc(fstrm)->app_ops->wake != NULL) { - TRACE_POINT(FCGI_EV_STRM_WAKE, fstrm->fconn->conn, fstrm); - fcgi_strm_sc(fstrm)->app_ops->wake(fcgi_strm_sc(fstrm)); + if (!fstrm->subs && (fstrm->flags & (FCGI_SF_WANT_SHUTR|FCGI_SF_WANT_SHUTW))) + tasklet_wakeup(fstrm->shut_tl); + else if (fcgi_strm_sc(fstrm)) { + unsigned int state = TASK_WOKEN_MSG; + + if (fstrm->subs) { + if (fstrm->subs->events & SUB_RETRY_SEND) + fstrm->flags |= FCGI_SF_NOTIFIED; + fstrm->subs->events = 0; + fstrm->subs = NULL; + state |= TASK_WOKEN_IO; + } + tasklet_wakeup(fcgi_strm_sc(fstrm)->wait_event.tasklet, state); } } diff --git a/src/mux_h1.c b/src/mux_h1.c index fa9c20119..4be01c8f7 100644 --- a/src/mux_h1.c +++ b/src/mux_h1.c @@ -3733,21 +3733,24 @@ static void h1_wake_stream_for_send(struct h1s *h1s) } } -/* alerts the data layer following this sequence : - * - if the h1s' data layer is subscribed to recv, then it's woken up for recv - * - if its subscribed to send, then it's woken up for send - * - if it was subscribed to neither, its ->wake() callback is called +/* Alerts the data layer by waking it up. TASK_WOKEN_MSG state is used by + * default and if the data layer is also subscribed to recv or send, + * TASK_WOKEN_IO is added. */ static void h1_alert(struct h1s *h1s) { + unsigned int state = TASK_WOKEN_MSG; + + TRACE_POINT(H1_EV_STRM_WAKE, h1s->h1c->conn, h1s); + if (!h1s_sc(h1s)) + return; + if (h1s->subs) { - h1_wake_stream_for_recv(h1s); - h1_wake_stream_for_send(h1s); - } - else if (h1s_sc(h1s) && h1s_sc(h1s)->app_ops->wake != NULL) { - TRACE_POINT(H1_EV_STRM_WAKE, h1s->h1c->conn, h1s); - h1s_sc(h1s)->app_ops->wake(h1s_sc(h1s)); + h1s->subs->events = 0; + h1s->subs = NULL; + state |= TASK_WOKEN_IO; } + tasklet_wakeup(h1s_sc(h1s)->wait_event.tasklet, state); } /* Try to send an HTTP error with h1c->errcode status code. It returns 1 on success diff --git a/src/mux_h2.c b/src/mux_h2.c index d09eb7110..89439a129 100644 --- a/src/mux_h2.c +++ b/src/mux_h2.c @@ -1695,26 +1695,27 @@ static void __maybe_unused h2s_notify_send(struct h2s *h2s) } } -/* alerts the data layer, trying to wake it up by all means, following - * this sequence : - * - if the h2s' data layer is subscribed to recv, then it's woken up for recv - * - if its subscribed to send, then it's woken up for send - * - if it was subscribed to neither, its ->wake() callback is called - * It is safe to call this function with a closed stream which doesn't have a - * stream connector anymore. +/* alerts the data layer by waking it up. TASK_WOKEN_MSG state is used by + * default and if the data layer is also subscribed to recv or send, + * TASK_WOKEN_IO is added. But first of all, we check if the shut tasklet must + * be woken up or not instead. */ static void __maybe_unused h2s_alert(struct h2s *h2s) { TRACE_ENTER(H2_EV_H2S_WAKE, h2s->h2c->conn, h2s); + if (!h2s->subs && (h2s->flags & (H2_SF_WANT_SHUTR | H2_SF_WANT_SHUTW))) + tasklet_wakeup(h2s->shut_tl); + else if (h2s_sc(h2s)) { + unsigned int state = TASK_WOKEN_MSG; - if (h2s->subs || - (h2s->flags & (H2_SF_WANT_SHUTR | H2_SF_WANT_SHUTW))) { - h2s_notify_recv(h2s); - h2s_notify_send(h2s); - } - else if (h2s_sc(h2s) && h2s_sc(h2s)->app_ops->wake != NULL) { - TRACE_POINT(H2_EV_STRM_WAKE, h2s->h2c->conn, h2s); - h2s_sc(h2s)->app_ops->wake(h2s_sc(h2s)); + if (h2s->subs) { + if (h2s->subs->events & SUB_RETRY_SEND) + h2s->flags |= H2_SF_NOTIFIED; + h2s->subs->events = 0; + h2s->subs = NULL; + state |= TASK_WOKEN_IO; + } + tasklet_wakeup(h2s_sc(h2s)->wait_event.tasklet, state); } TRACE_LEAVE(H2_EV_H2S_WAKE, h2s->h2c->conn, h2s); diff --git a/src/mux_pt.c b/src/mux_pt.c index 0b4c9ce78..0c6a1f972 100644 --- a/src/mux_pt.c +++ b/src/mux_pt.c @@ -252,20 +252,21 @@ struct task *mux_pt_io_cb(struct task *t, void *tctx, unsigned int status) TRACE_ENTER(PT_EV_CONN_WAKE, ctx->conn); if (!se_fl_test(ctx->sd, SE_FL_ORPHAN)) { + unsigned int state = TASK_WOKEN_MSG; + /* There's a small race condition. * mux_pt_io_cb() is only supposed to be called if we have no * stream attached. However, maybe the tasklet got woken up, * and this connection was then attached to a new stream. - * If this happened, just wake the tasklet up if anybody - * subscribed to receive events, and otherwise call the wake - * method, to make sure the event is noticed. + * If this happened, just wake the tasklet up. */ if (ctx->conn->subs) { ctx->conn->subs->events = 0; - tasklet_wakeup(ctx->conn->subs->tasklet); ctx->conn->subs = NULL; - } else if (pt_sc(ctx)->app_ops->wake) - pt_sc(ctx)->app_ops->wake(pt_sc(ctx)); + state |= TASK_WOKEN_IO; + } + tasklet_wakeup(pt_sc(ctx)->wait_event.tasklet, state); + TRACE_DEVEL("leaving waking up SC", PT_EV_CONN_WAKE, ctx->conn); return t; } @@ -360,14 +361,9 @@ static int mux_pt_wake(struct connection *conn) int ret = 0; TRACE_ENTER(PT_EV_CONN_WAKE, ctx->conn); - if (!se_fl_test(ctx->sd, SE_FL_ORPHAN)) { - ret = pt_sc(ctx)->app_ops->wake ? pt_sc(ctx)->app_ops->wake(pt_sc(ctx)) : 0; - - if (ret < 0) { - TRACE_DEVEL("leaving waking up SC", PT_EV_CONN_WAKE, ctx->conn); - return ret; - } - } else { + if (!se_fl_test(ctx->sd, SE_FL_ORPHAN)) + tasklet_wakeup(pt_sc(ctx)->wait_event.tasklet, TASK_WOKEN_MSG); + else { conn_ctrl_drain(conn); if (conn->flags & (CO_FL_ERROR | CO_FL_SOCK_RD_SH)) { TRACE_DEVEL("leaving destroying PT context", PT_EV_CONN_WAKE, ctx->conn); diff --git a/src/mux_quic.c b/src/mux_quic.c index 248c787bb..3fd283d9f 100644 --- a/src/mux_quic.c +++ b/src/mux_quic.c @@ -506,19 +506,23 @@ static struct ncbuf *qcs_get_ncbuf(struct qcs *qcs, struct ncbuf *ncbuf) return ncbuf; } -/* Notify an eventual subscriber on or else wakeup up the stconn layer if - * initialized. +/* Notify the stconn layer if initialized with TASK_WOKEN_MSG state and + * eventually TASK_WOKEN_IO. */ static void qcs_alert(struct qcs *qcs) { + unsigned int state = TASK_WOKEN_MSG; + + TRACE_POINT(QMUX_EV_STRM_WAKE, qcs->qcc->conn, qcs); + if (!qcs_sc(qcs)) + return; + if (qcs->subs) { - qcs_notify_recv(qcs); - qcs_notify_send(qcs); - } - else if (qcs_sc(qcs) && qcs->sd->sc->app_ops->wake) { - TRACE_POINT(QMUX_EV_STRM_WAKE, qcs->qcc->conn, qcs); - qcs->sd->sc->app_ops->wake(qcs->sd->sc); + qcs->subs->events = 0; + qcs->subs = NULL; + state |= TASK_WOKEN_IO; } + tasklet_wakeup(qcs_sc(qcs)->wait_event.tasklet, state); } int qcs_subscribe(struct qcs *qcs, int event_type, struct wait_event *es) diff --git a/src/mux_spop.c b/src/mux_spop.c index 1fa07e7fb..162842ab2 100644 --- a/src/mux_spop.c +++ b/src/mux_spop.c @@ -972,26 +972,26 @@ static void spop_strm_notify_send(struct spop_strm *spop_strm) } } -/* Alerts the data layer, trying to wake it up by all means, following - * this sequence : - * - if the spop stream' data layer is subscribed to recv, then it's woken up - * for recv - * - if its subscribed to send, then it's woken up for send - * - if it was subscribed to neither, its ->wake() callback is called - * It is safe to call this function with a closed stream which doesn't have a - * stream connector anymore. +/* Alerts the data layer by waking it up. TASK_WOKEN_MSG state is used by + * default and if the data layer is also subscribed to recv or send, + * TASK_WOKEN_IO is added. */ static void spop_strm_alert(struct spop_strm *spop_strm) { + unsigned int state = TASK_WOKEN_MSG; + TRACE_POINT(SPOP_EV_STRM_WAKE, spop_strm->spop_conn->conn, spop_strm); + if (!spop_strm_sc(spop_strm)) + return; + if (spop_strm->subs) { - spop_strm_notify_recv(spop_strm); - spop_strm_notify_send(spop_strm); - } - else if (spop_strm_sc(spop_strm) && spop_strm_sc(spop_strm)->app_ops->wake != NULL) { - TRACE_POINT(SPOP_EV_STRM_WAKE, spop_strm->spop_conn->conn, spop_strm); - spop_strm_sc(spop_strm)->app_ops->wake(spop_strm_sc(spop_strm)); + if (spop_strm->subs->events & SUB_RETRY_SEND) + spop_strm->flags |= SPOP_SF_NOTIFIED; + spop_strm->subs->events = 0; + spop_strm->subs = NULL; + state |= TASK_WOKEN_IO; } + tasklet_wakeup(spop_strm_sc(spop_strm)->wait_event.tasklet, state); } /* Writes the 32-bit frame size at address */