diff --git a/include/proto/connection.h b/include/proto/connection.h index a2bcc97e6..e55ec8bef 100644 --- a/include/proto/connection.h +++ b/include/proto/connection.h @@ -315,25 +315,25 @@ static inline void __conn_xprt_stop_recv(struct connection *c) c->flags &= ~CO_FL_XPRT_RD_ENA; } -static inline void __cs_data_want_recv(struct conn_stream *cs) +static inline void __cs_want_recv(struct conn_stream *cs) { cs->flags |= CS_FL_DATA_RD_ENA; } -static inline void __cs_data_stop_recv(struct conn_stream *cs) +static inline void __cs_stop_recv(struct conn_stream *cs) { cs->flags &= ~CS_FL_DATA_RD_ENA; } -static inline void cs_data_want_recv(struct conn_stream *cs) +static inline void cs_want_recv(struct conn_stream *cs) { - __cs_data_want_recv(cs); + __cs_want_recv(cs); cs_update_mux_polling(cs); } -static inline void cs_data_stop_recv(struct conn_stream *cs) +static inline void cs_stop_recv(struct conn_stream *cs) { - __cs_data_stop_recv(cs); + __cs_stop_recv(cs); cs_update_mux_polling(cs); } @@ -366,36 +366,36 @@ static inline void __conn_xprt_stop_both(struct connection *c) c->flags &= ~(CO_FL_XPRT_WR_ENA | CO_FL_XPRT_RD_ENA); } -static inline void __cs_data_want_send(struct conn_stream *cs) +static inline void __cs_want_send(struct conn_stream *cs) { cs->flags |= CS_FL_DATA_WR_ENA; } -static inline void __cs_data_stop_send(struct conn_stream *cs) +static inline void __cs_stop_send(struct conn_stream *cs) { cs->flags &= ~CS_FL_DATA_WR_ENA; } -static inline void cs_data_stop_send(struct conn_stream *cs) +static inline void cs_stop_send(struct conn_stream *cs) { - __cs_data_stop_send(cs); + __cs_stop_send(cs); cs_update_mux_polling(cs); } -static inline void cs_data_want_send(struct conn_stream *cs) +static inline void cs_want_send(struct conn_stream *cs) { - __cs_data_want_send(cs); + __cs_want_send(cs); cs_update_mux_polling(cs); } -static inline void __cs_data_stop_both(struct conn_stream *cs) +static inline void __cs_stop_both(struct conn_stream *cs) { cs->flags &= ~(CS_FL_DATA_WR_ENA | CS_FL_DATA_RD_ENA); } -static inline void cs_data_stop_both(struct conn_stream *cs) +static inline void cs_stop_both(struct conn_stream *cs) { - __cs_data_stop_both(cs); + __cs_stop_both(cs); cs_update_mux_polling(cs); } @@ -537,6 +537,45 @@ static inline void conn_xprt_shutw_hard(struct connection *c) c->xprt->shutw(c, 0); } +/* shut read after draining possibly pending data */ +static inline void cs_shutr(struct conn_stream *cs) +{ + __cs_stop_recv(cs); + + /* clean data-layer shutdown */ + if (cs->conn->mux && cs->conn->mux->shutr) + cs->conn->mux->shutr(cs, 1); +} + +/* shut read after disabling lingering */ +static inline void cs_shutr_hard(struct conn_stream *cs) +{ + __cs_stop_recv(cs); + + /* clean data-layer shutdown */ + if (cs->conn->mux && cs->conn->mux->shutr) + cs->conn->mux->shutr(cs, 0); +} + +static inline void cs_shutw(struct conn_stream *cs) +{ + __cs_stop_send(cs); + + /* clean data-layer shutdown */ + if (cs->conn->mux && cs->conn->mux->shutw) + cs->conn->mux->shutw(cs, 1); +} + +static inline void cs_shutw_hard(struct conn_stream *cs) +{ + __cs_stop_send(cs); + + /* unclean data-layer shutdown */ + if (cs->conn->mux && cs->conn->mux->shutw) + cs->conn->mux->shutw(cs, 0); +} + + /* detect sock->data read0 transition */ static inline int conn_xprt_read0_pending(struct connection *c) { @@ -576,7 +615,6 @@ static inline void conn_init(struct connection *conn) { conn->obj_type = OBJ_TYPE_CONN; conn->flags = CO_FL_NONE; - conn->data = NULL; conn->tmp_early_data = -1; conn->mux = NULL; conn->mux_ctx = NULL; @@ -622,31 +660,43 @@ static inline struct connection *conn_new() return conn; } -/* Tries to allocate a new conn_stream and initialize its main fields. The - * connection is returned on success, NULL on failure. The connection must - * be released using pool_free2() or conn_free(). - */ -static inline struct conn_stream *cs_new(struct connection *conn) -{ - struct conn_stream *cs; - - cs = pool_alloc2(pool2_connstream); - if (likely(cs != NULL)) - cs_init(cs, conn); - return cs; -} - /* Releases a conn_stream previously allocated by cs_new() */ static inline void cs_free(struct conn_stream *cs) { pool_free2(pool2_connstream, cs); } +/* Tries to allocate a new conn_stream and initialize its main fields. If + * is NULL, then a new connection is allocated on the fly, initialized, + * and assigned to cs->conn ; this connection will then have to be released + * using pool_free2() or conn_free(). The conn_stream is initialized and added + * to the mux's stream list on success, then returned. On failure, nothing is + * allocated and NULL is returned. + */ +static inline struct conn_stream *cs_new(struct connection *conn) +{ + struct conn_stream *cs; + + cs = pool_alloc2(pool2_connstream); + if (!likely(cs)) + return NULL; + + if (!conn) { + conn = conn_new(); + if (!likely(conn)) { + cs_free(cs); + return NULL; + } + conn_init(conn); + } + + cs_init(cs, conn); + return cs; +} + /* Releases a connection previously allocated by conn_new() */ static inline void conn_free(struct connection *conn) { - if (conn->mux && conn->mux->release) - conn->mux->release(conn); pool_free2(pool2_connection, conn); } @@ -700,11 +750,11 @@ static inline void conn_get_to_addr(struct connection *conn) conn->flags |= CO_FL_ADDR_TO_SET; } -/* Attaches a connection to an owner and assigns a data layer */ -static inline void conn_attach(struct connection *conn, void *owner, const struct data_cb *data) +/* Attaches a conn_stream to a data layer and sets the relevant callbacks */ +static inline void cs_attach(struct conn_stream *cs, void *data, const struct data_cb *data_cb) { - conn->data = data; - conn->owner = owner; + cs->data_cb = data_cb; + cs->data = data; } /* Installs the connection's mux layer for upper context . @@ -789,11 +839,11 @@ static inline const char *conn_get_mux_name(const struct connection *conn) return conn->mux->name; } -static inline const char *conn_get_data_name(const struct connection *conn) +static inline const char *cs_get_data_name(const struct conn_stream *cs) { - if (!conn->data) + if (!cs->data_cb) return "NONE"; - return conn->data->name; + return cs->data_cb->name; } /* registers pointer to transport layer (XPRT_*) */ diff --git a/include/proto/stream.h b/include/proto/stream.h index 3efb42bac..f0edc2ef4 100644 --- a/include/proto/stream.h +++ b/include/proto/stream.h @@ -36,7 +36,7 @@ extern struct list streams; extern struct data_cb sess_conn_cb; struct stream *stream_new(struct session *sess, enum obj_type *origin); -int stream_create_from_conn(struct connection *conn); +int stream_create_from_cs(struct conn_stream *cs); /* perform minimal intializations, report 0 in case of error, 1 if OK. */ int init_stream(); diff --git a/include/proto/stream_interface.h b/include/proto/stream_interface.h index c6578ef07..ee1fa124a 100644 --- a/include/proto/stream_interface.h +++ b/include/proto/stream_interface.h @@ -152,18 +152,14 @@ static inline enum obj_type *si_detach_endpoint(struct stream_interface *si) */ static inline void si_release_endpoint(struct stream_interface *si) { - struct connection *conn; + struct conn_stream *cs; struct appctx *appctx; if (!si->end) return; - if ((conn = objt_conn(si->end))) { - LIST_DEL(&conn->list); - conn_stop_tracking(conn); - conn_full_close(conn); - conn_free(conn); - } + if ((cs = objt_cs(si->end))) + cs_destroy(cs); else if ((appctx = objt_appctx(si->end))) { if (appctx->applet->release && si->state < SI_ST_DIS) appctx->applet->release(appctx); @@ -178,26 +174,27 @@ static inline void si_release_endpoint(struct stream_interface *si) * connection will also be added at the head of this list. This connection * remains assigned to the stream interface it is currently attached to. */ -static inline void si_idle_conn(struct stream_interface *si, struct list *pool) +static inline void si_idle_cs(struct stream_interface *si, struct list *pool) { - struct connection *conn = __objt_conn(si->end); + struct conn_stream *cs = __objt_cs(si->end); + struct connection *conn = cs->conn; if (pool) LIST_ADD(pool, &conn->list); - conn_attach(conn, si, &si_idle_conn_cb); - conn_xprt_want_recv(conn); + cs_attach(cs, si, &si_idle_conn_cb); + cs_want_recv(cs); } -/* Attach connection to the stream interface . The stream interface +/* Attach conn_stream to the stream interface . The stream interface * is configured to work with a connection and the connection it configured * with a stream interface data layer. */ -static inline void si_attach_conn(struct stream_interface *si, struct connection *conn) +static inline void si_attach_cs(struct stream_interface *si, struct conn_stream *cs) { si->ops = &si_conn_ops; - si->end = &conn->obj_type; - conn_attach(conn, si, &si_conn_cb); + si->end = &cs->obj_type; + cs_attach(cs, si, &si_conn_cb); } /* Returns true if a connection is attached to the stream interface and @@ -205,7 +202,7 @@ static inline void si_attach_conn(struct stream_interface *si, struct connection */ static inline int si_conn_ready(struct stream_interface *si) { - struct connection *conn = objt_conn(si->end); + struct connection *conn = cs_conn(objt_cs(si->end)); return conn && conn_ctrl_ready(conn) && conn_xprt_ready(conn); } @@ -276,22 +273,22 @@ static inline void si_applet_stop_get(struct stream_interface *si) si->flags &= ~SI_FL_WANT_GET; } -/* Try to allocate a new connection and assign it to the interface. If +/* Try to allocate a new conn_stream and assign it to the interface. If * an endpoint was previously allocated, it is released first. The newly - * allocated connection is initialized, assigned to the stream interface, + * allocated conn_stream is initialized, assigned to the stream interface, * and returned. */ -static inline struct connection *si_alloc_conn(struct stream_interface *si) +static inline struct conn_stream *si_alloc_cs(struct stream_interface *si, struct connection *conn) { - struct connection *conn; + struct conn_stream *cs; si_release_endpoint(si); - conn = conn_new(); - if (conn) - si_attach_conn(si, conn); + cs = cs_new(conn); + if (cs) + si_attach_cs(si, cs); - return conn; + return cs; } /* Release the interface's existing endpoint (connection or appctx) and @@ -346,7 +343,8 @@ static inline void si_chk_snd(struct stream_interface *si) /* Calls chk_snd on the connection using the ctrl layer */ static inline int si_connect(struct stream_interface *si) { - struct connection *conn = objt_conn(si->end); + struct conn_stream *cs = objt_cs(si->end); + struct connection *conn = cs_conn(cs); int ret = SF_ERR_NONE; if (unlikely(!conn || !conn->ctrl || !conn->ctrl->connect)) @@ -364,7 +362,7 @@ static inline int si_connect(struct stream_interface *si) /* reuse the existing connection */ if (!channel_is_empty(si_oc(si))) { /* we'll have to send a request there. */ - conn_xprt_want_send(conn); + cs_want_send(cs); } /* the connection is established */ diff --git a/include/types/checks.h b/include/types/checks.h index 3559f2d52..ac3e7b644 100644 --- a/include/types/checks.h +++ b/include/types/checks.h @@ -157,7 +157,7 @@ enum { struct check { struct xprt_ops *xprt; /* transport layer operations for health checks */ - struct connection *conn; /* connection state for health checks */ + struct conn_stream *cs; /* conn_stream state for health checks */ unsigned short port; /* the port to use for the health checks */ struct buffer *bi, *bo; /* input and output buffers to send/recv check */ struct task *task; /* the task associated to the health check processing, NULL if disabled */ diff --git a/include/types/connection.h b/include/types/connection.h index 66ec1b6b1..0d62efc99 100644 --- a/include/types/connection.h +++ b/include/types/connection.h @@ -288,9 +288,9 @@ struct mux_ops { * data movement. It may abort a connection by returning < 0. */ struct data_cb { - void (*recv)(struct connection *conn); /* data-layer recv callback */ - void (*send)(struct connection *conn); /* data-layer send callback */ - int (*wake)(struct connection *conn); /* data-layer callback to report activity */ + void (*recv)(struct conn_stream *cs); /* data-layer recv callback */ + void (*send)(struct conn_stream *cs); /* data-layer send callback */ + int (*wake)(struct conn_stream *cs); /* data-layer callback to report activity */ char name[8]; /* data layer name, zero-terminated */ }; @@ -347,10 +347,9 @@ struct connection { const struct protocol *ctrl; /* operations at the socket layer */ const struct xprt_ops *xprt; /* operations at the transport layer */ const struct mux_ops *mux; /* mux layer opreations. Must be set before xprt->init() */ - const struct data_cb *data; /* data layer callbacks. Must be set before xprt->init() */ void *xprt_ctx; /* general purpose pointer, initialized to NULL */ void *mux_ctx; /* mux-specific context, initialized to NULL */ - void *owner; /* pointer to upper layer's entity (eg: session, stream interface) */ + void *owner; /* pointer to the owner session for incoming connections, or NULL */ int xprt_st; /* transport layer state, initialized to zero */ int tmp_early_data; /* 1st byte of early data, if any */ union conn_handle handle; /* connection handle at the socket layer */ diff --git a/src/backend.c b/src/backend.c index 4d44e5499..9dbbd9198 100644 --- a/src/backend.c +++ b/src/backend.c @@ -567,7 +567,7 @@ int assign_server(struct stream *s) srv = NULL; s->target = NULL; - conn = objt_conn(s->si[1].end); + conn = cs_conn(objt_cs(s->si[1].end)); if (conn && (conn->flags & CO_FL_CONNECTED) && @@ -720,8 +720,7 @@ int assign_server(struct stream *s) s->target = &s->be->obj_type; } else if ((s->be->options & PR_O_HTTP_PROXY) && - (conn = objt_conn(s->si[1].end)) && - is_addr(&conn->addr.to)) { + conn && is_addr(&conn->addr.to)) { /* in proxy mode, we need a valid destination address */ s->target = &s->be->obj_type; } @@ -769,7 +768,7 @@ int assign_server(struct stream *s) int assign_server_address(struct stream *s) { struct connection *cli_conn = objt_conn(strm_orig(s)); - struct connection *srv_conn = objt_conn(s->si[1].end); + struct connection *srv_conn = cs_conn(objt_cs(s->si[1].end)); #ifdef DEBUG_FULL fprintf(stderr,"assign_server_address : s=%p\n",s); @@ -973,7 +972,7 @@ static void assign_tproxy_address(struct stream *s) struct server *srv = objt_server(s->target); struct conn_src *src; struct connection *cli_conn; - struct connection *srv_conn = objt_conn(s->si[1].end); + struct connection *srv_conn = cs_conn(objt_cs(s->si[1].end)); if (srv && srv->conn_src.opts & CO_SRC_BIND) src = &srv->conn_src; @@ -1041,21 +1040,23 @@ int connect_server(struct stream *s) { struct connection *cli_conn; struct connection *srv_conn; - struct connection *old_conn; + struct conn_stream *srv_cs; + struct conn_stream *old_cs; struct server *srv; int reuse = 0; int err; srv = objt_server(s->target); - srv_conn = objt_conn(s->si[1].end); + srv_cs = objt_cs(s->si[1].end); + srv_conn = cs_conn(srv_cs); if (srv_conn) reuse = s->target == srv_conn->target; if (srv && !reuse) { - old_conn = srv_conn; - if (old_conn) { + old_cs = srv_cs; + if (old_cs) { srv_conn = NULL; - old_conn->owner = NULL; + srv_cs->data = NULL; si_detach_endpoint(&s->si[1]); /* note: if the connection was in a server's idle * queue, it doesn't get dequeued. @@ -1101,23 +1102,25 @@ int connect_server(struct stream *s) LIST_DEL(&srv_conn->list); LIST_INIT(&srv_conn->list); - if (srv_conn->owner) { - si_detach_endpoint(srv_conn->owner); - if (old_conn && !(old_conn->flags & CO_FL_PRIVATE)) { - si_attach_conn(srv_conn->owner, old_conn); - si_idle_conn(srv_conn->owner, NULL); + /* XXX cognet: this assumes only 1 conn_stream per + * connection, has to be revisited later + */ + srv_cs = srv_conn->mux_ctx; + + if (srv_conn->mux == &mux_pt_ops && srv_cs->data) { + si_detach_endpoint(srv_cs->data); + if (old_cs && !(old_cs->conn->flags & CO_FL_PRIVATE)) { + si_attach_cs(srv_cs->data, old_cs); + si_idle_cs(srv_cs->data, NULL); } } - si_attach_conn(&s->si[1], srv_conn); + si_attach_cs(&s->si[1], srv_cs); reuse = 1; } /* we may have to release our connection if we couldn't swap it */ - if (old_conn && !old_conn->owner) { - LIST_DEL(&old_conn->list); - conn_full_close(old_conn); - conn_free(old_conn); - } + if (old_cs && !old_cs->data) + cs_destroy(old_cs); } if (reuse) { @@ -1136,15 +1139,16 @@ int connect_server(struct stream *s) } } - if (!reuse) - srv_conn = si_alloc_conn(&s->si[1]); - else { + if (!reuse) { + srv_cs = si_alloc_cs(&s->si[1], NULL); + srv_conn = cs_conn(srv_cs); + } else { /* reusing our connection, take it out of the idle list */ LIST_DEL(&srv_conn->list); LIST_INIT(&srv_conn->list); } - if (!srv_conn) + if (!srv_cs) return SF_ERR_RESOURCE; if (!(s->flags & SF_ADDR_SET)) { @@ -1160,14 +1164,16 @@ int connect_server(struct stream *s) /* set the correct protocol on the output stream interface */ if (srv) { conn_prepare(srv_conn, protocol_by_family(srv_conn->addr.to.ss_family), srv->xprt); - conn_install_mux(srv_conn, &mux_pt_ops, srv_conn); + /* XXX: Pick the right mux, when we finally have one */ + conn_install_mux(srv_conn, &mux_pt_ops, srv_cs); } else if (obj_type(s->target) == OBJ_TYPE_PROXY) { /* proxies exclusively run on raw_sock right now */ conn_prepare(srv_conn, protocol_by_family(srv_conn->addr.to.ss_family), xprt_get(XPRT_RAW)); - if (!objt_conn(s->si[1].end) || !objt_conn(s->si[1].end)->ctrl) + if (!objt_cs(s->si[1].end) || !objt_cs(s->si[1].end)->conn->ctrl) return SF_ERR_INTERNAL; - conn_install_mux(srv_conn, &mux_pt_ops, srv_conn); + /* XXX: Pick the right mux, when we finally have one */ + conn_install_mux(srv_conn, &mux_pt_ops, srv_cs); } else return SF_ERR_INTERNAL; /* how did we get there ? */ @@ -1182,13 +1188,13 @@ int connect_server(struct stream *s) conn_get_to_addr(cli_conn); } - si_attach_conn(&s->si[1], srv_conn); + si_attach_cs(&s->si[1], srv_cs); assign_tproxy_address(s); } else { /* the connection is being reused, just re-attach it */ - si_attach_conn(&s->si[1], srv_conn); + si_attach_cs(&s->si[1], srv_cs); s->flags |= SF_SRV_REUSED; } diff --git a/src/checks.c b/src/checks.c index b717d3870..aff5ff3ea 100644 --- a/src/checks.c +++ b/src/checks.c @@ -582,7 +582,8 @@ static int retrieve_errno_from_socket(struct connection *conn) */ static void chk_report_conn_err(struct check *check, int errno_bck, int expired) { - struct connection *conn = check->conn; + struct conn_stream *cs = check->cs; + struct connection *conn = cs_conn(cs); const char *err_msg; struct chunk *chk; int step; @@ -705,9 +706,10 @@ static void chk_report_conn_err(struct check *check, int errno_bck, int expired) * it sends the request. In other cases, it calls set_server_check_status() * to set check->status, check->duration and check->result. */ -static void event_srv_chk_w(struct connection *conn) +static void event_srv_chk_w(struct conn_stream *cs) { - struct check *check = conn->owner; + struct connection *conn = cs->conn; + struct check *check = cs->data; struct server *s = check->server; struct task *t = check->task; @@ -719,7 +721,7 @@ static void event_srv_chk_w(struct connection *conn) if (retrieve_errno_from_socket(conn)) { chk_report_conn_err(check, errno, 0); - __conn_xprt_stop_both(conn); + __cs_stop_both(cs); goto out_wakeup; } @@ -741,10 +743,10 @@ static void event_srv_chk_w(struct connection *conn) return; if (check->bo->o) { - conn->xprt->snd_buf(conn, check->bo, 0); + conn->mux->snd_buf(cs, check->bo, 0); if (conn->flags & CO_FL_ERROR) { chk_report_conn_err(check, errno, 0); - __conn_xprt_stop_both(conn); + __cs_stop_both(cs); goto out_wakeup; } if (check->bo->o) @@ -761,7 +763,7 @@ static void event_srv_chk_w(struct connection *conn) out_wakeup: task_wakeup(t, TASK_WOKEN_IO); out_nowake: - __conn_xprt_stop_send(conn); /* nothing more to write */ + __cs_stop_send(cs); /* nothing more to write */ } /* @@ -778,9 +780,10 @@ static void event_srv_chk_w(struct connection *conn) * call it with a proper error status like HCHK_STATUS_L7STS, HCHK_STATUS_L6RSP, * etc. */ -static void event_srv_chk_r(struct connection *conn) +static void event_srv_chk_r(struct conn_stream *cs) { - struct check *check = conn->owner; + struct connection *conn = cs->conn; + struct check *check = cs->data; struct server *s = check->server; struct task *t = check->task; char *desc; @@ -815,7 +818,7 @@ static void event_srv_chk_r(struct connection *conn) done = 0; - conn->xprt->rcv_buf(conn, check->bi, check->bi->size); + conn->mux->rcv_buf(cs, check->bi, check->bi->size); if (conn->flags & (CO_FL_ERROR | CO_FL_SOCK_RD_SH)) { done = 1; if ((conn->flags & CO_FL_ERROR) && !check->bi->i) { @@ -1339,8 +1342,8 @@ static void event_srv_chk_r(struct connection *conn) * range quickly. To avoid sending RSTs all the time, we first try to * drain pending data. */ - __conn_xprt_stop_both(conn); - conn_xprt_shutw(conn); + __cs_stop_both(cs); + cs_shutw(cs); /* OK, let's not stay here forever */ if (check->result == CHK_RES_FAILED) @@ -1350,7 +1353,7 @@ static void event_srv_chk_r(struct connection *conn) return; wait_more_data: - __conn_xprt_want_recv(conn); + __cs_want_recv(cs); } /* @@ -1359,15 +1362,17 @@ static void event_srv_chk_r(struct connection *conn) * It returns 0 on normal cases, <0 if at least one close() has happened on the * connection (eg: reconnect). */ -static int wake_srv_chk(struct connection *conn) +static int wake_srv_chk(struct conn_stream *cs) { - struct check *check = conn->owner; + struct connection *conn = cs->conn; + struct check *check = cs->data; int ret = 0; /* we may have to make progress on the TCP checks */ if (check->type == PR_O2_TCPCHK_CHK) { ret = tcpcheck_main(check); - conn = check->conn; + cs = check->cs; + conn = cs_conn(cs); } if (unlikely(conn->flags & CO_FL_ERROR)) { @@ -1378,8 +1383,7 @@ static int wake_srv_chk(struct connection *conn) * we expect errno to still be valid. */ chk_report_conn_err(check, errno, 0); - - __conn_xprt_stop_both(conn); + __cs_stop_both(cs); task_wakeup(check->task, TASK_WOKEN_IO); } else if (!(conn->flags & (CO_FL_XPRT_RD_ENA|CO_FL_XPRT_WR_ENA|CO_FL_HANDSHAKE))) { @@ -1478,7 +1482,8 @@ static int connect_conn_chk(struct task *t) { struct check *check = t->context; struct server *s = check->server; - struct connection *conn = check->conn; + struct conn_stream *cs = check->cs; + struct connection *conn = cs_conn(cs); struct protocol *proto; struct tcpcheck_rule *tcp_rule = NULL; int ret; @@ -1535,9 +1540,10 @@ static int connect_conn_chk(struct task *t) } /* prepare a new connection */ - conn = check->conn = conn_new(); - if (!check->conn) + cs = check->cs = cs_new(NULL); + if (!check->cs) return SF_ERR_RESOURCE; + conn = cs->conn; if (is_addr(&check->addr)) { /* we'll connect to the check addr specified on the server */ @@ -1553,7 +1559,7 @@ static int connect_conn_chk(struct task *t) i = srv_check_healthcheck_port(check); if (i == 0) { - conn->owner = check; + cs->data = check; return SF_ERR_CHK_PORT; } @@ -1563,8 +1569,8 @@ static int connect_conn_chk(struct task *t) proto = protocol_by_family(conn->addr.to.ss_family); conn_prepare(conn, proto, check->xprt); - conn_install_mux(conn, &mux_pt_ops, conn); - conn_attach(conn, check, &check_conn_cb); + conn_install_mux(conn, &mux_pt_ops, cs); + cs_attach(cs, check, &check_conn_cb); conn->target = &s->obj_type; /* no client address */ @@ -2077,7 +2083,8 @@ static struct task *process_chk_conn(struct task *t) { struct check *check = t->context; struct server *s = check->server; - struct connection *conn = check->conn; + struct conn_stream *cs = check->cs; + struct connection *conn = cs_conn(cs); int rv; int ret; int expired = tick_is_expired(t->expire, now_ms); @@ -2105,7 +2112,8 @@ static struct task *process_chk_conn(struct task *t) check->bo->o = 0; ret = connect_conn_chk(t); - conn = check->conn; + cs = check->cs; + conn = cs_conn(cs); switch (ret) { case SF_ERR_UP: @@ -2123,7 +2131,7 @@ static struct task *process_chk_conn(struct task *t) } if (check->type) - conn_xprt_want_recv(conn); /* prepare for reading a possible reply */ + cs_want_recv(cs); /* prepare for reading a possible reply */ task_set_affinity(t, tid_bit); goto reschedule; @@ -2147,9 +2155,10 @@ static struct task *process_chk_conn(struct task *t) } /* here, we have seen a synchronous error, no fd was allocated */ - if (conn) { - conn_free(conn); - check->conn = conn = NULL; + if (cs) { + cs_destroy(cs); + cs = check->cs = NULL; + conn = NULL; } check->state &= ~CHK_ST_INPROGRESS; @@ -2201,8 +2210,9 @@ static struct task *process_chk_conn(struct task *t) } if (conn) { - conn_free(conn); - check->conn = conn = NULL; + cs_destroy(cs); + cs = check->cs = NULL; + conn = NULL; } if (check->result == CHK_RES_FAILED) { @@ -2550,7 +2560,8 @@ static int tcpcheck_main(struct check *check) char *contentptr, *comment; struct tcpcheck_rule *next; int done = 0, ret = 0, step = 0; - struct connection *conn = check->conn; + struct conn_stream *cs = check->cs; + struct connection *conn = cs_conn(cs); struct server *s = check->server; struct task *t = check->task; struct list *head = check->tcpcheck_rules; @@ -2619,8 +2630,8 @@ static int tcpcheck_main(struct check *check) } /* It's only the rules which will enable send/recv */ - if (conn) - __conn_xprt_stop_both(conn); + if (cs) + cs_stop_both(cs); while (1) { /* We have to try to flush the output buffer before reading, at @@ -2633,11 +2644,11 @@ static int tcpcheck_main(struct check *check) check->current_step->action != TCPCHK_ACT_SEND || check->current_step->string_len >= buffer_total_space(check->bo))) { - __conn_xprt_want_send(conn); - if (conn->xprt->snd_buf(conn, check->bo, 0) <= 0) { + __cs_want_send(cs); + if (conn->mux->snd_buf(cs, check->bo, 0) <= 0) { if (conn->flags & CO_FL_ERROR) { chk_report_conn_err(check, errno, 0); - __conn_xprt_stop_both(conn); + __cs_stop_both(cs); goto out_end_tcpcheck; } break; @@ -2673,8 +2684,9 @@ static int tcpcheck_main(struct check *check) * 2: try to get a new connection * 3: release and replace the old one on success */ - if (check->conn) { - conn_full_close(check->conn); + if (check->cs) { + /* XXX: need to kill all CS here as well but not to free them yet */ + conn_full_close(check->cs->conn); retcode = -1; /* do not reuse the fd! */ } @@ -2682,8 +2694,8 @@ static int tcpcheck_main(struct check *check) check->last_started_step = check->current_step; /* prepare new connection */ - conn = conn_new(); - if (!conn) { + cs = cs_new(NULL); + if (!cs) { step = tcpcheck_get_step_id(check); chunk_printf(&trash, "TCPCHK error allocating connection at step %d", step); comment = tcpcheck_get_step_comment(check, step); @@ -2694,11 +2706,15 @@ static int tcpcheck_main(struct check *check) return retcode; } - if (check->conn) - conn_free(check->conn); - check->conn = conn; + if (check->cs) { + if (check->cs->conn) + conn_free(check->cs->conn); + cs_free(check->cs); + } - conn_attach(conn, check, &check_conn_cb); + check->cs = cs; + conn = cs->conn; + cs_attach(cs, check, &check_conn_cb); conn->target = &s->obj_type; /* no client address */ @@ -2727,7 +2743,7 @@ static int tcpcheck_main(struct check *check) xprt = xprt_get(XPRT_RAW); } conn_prepare(conn, proto, xprt); - conn_install_mux(conn, &mux_pt_ops, conn); + conn_install_mux(conn, &mux_pt_ops, cs); ret = SF_ERR_INTERNAL; if (proto->connect) @@ -2860,8 +2876,8 @@ static int tcpcheck_main(struct check *check) if (unlikely(check->result == CHK_RES_FAILED)) goto out_end_tcpcheck; - __conn_xprt_want_recv(conn); - if (conn->xprt->rcv_buf(conn, check->bi, check->bi->size) <= 0) { + __cs_want_recv(cs); + if (conn->mux->rcv_buf(cs, check->bi, check->bi->size) <= 0) { if (conn->flags & (CO_FL_ERROR | CO_FL_SOCK_RD_SH)) { done = 1; if ((conn->flags & CO_FL_ERROR) && !check->bi->i) { @@ -2958,7 +2974,7 @@ static int tcpcheck_main(struct check *check) if (check->current_step->action == TCPCHK_ACT_EXPECT) goto tcpcheck_expect; - __conn_xprt_stop_recv(conn); + __cs_stop_recv(cs); } } else { @@ -2978,7 +2994,7 @@ static int tcpcheck_main(struct check *check) if (check->current_step->action == TCPCHK_ACT_EXPECT) goto tcpcheck_expect; - __conn_xprt_stop_recv(conn); + __cs_stop_recv(cs); } /* not matched but was supposed to => ERROR */ else { @@ -3012,11 +3028,11 @@ static int tcpcheck_main(struct check *check) /* warning, current_step may now point to the head */ if (check->bo->o) - __conn_xprt_want_send(conn); + __cs_want_send(cs); if (&check->current_step->list != head && check->current_step->action == TCPCHK_ACT_EXPECT) - __conn_xprt_want_recv(conn); + __cs_want_recv(cs); return retcode; out_end_tcpcheck: @@ -3030,7 +3046,7 @@ static int tcpcheck_main(struct check *check) if (check->result == CHK_RES_FAILED) conn->flags |= CO_FL_ERROR; - __conn_xprt_stop_both(conn); + __cs_stop_both(cs); return retcode; } @@ -3049,7 +3065,6 @@ const char *init_check(struct check *check, int type) return "out of memory while allocating check buffer"; } check->bo->size = global.tune.chksize; - return NULL; } @@ -3059,8 +3074,10 @@ void free_check(struct check *check) check->bi = NULL; free(check->bo); check->bo = NULL; - free(check->conn); - check->conn = NULL; + free(check->cs->conn); + check->cs->conn = NULL; + cs_free(check->cs); + check->cs = NULL; } void email_alert_free(struct email_alert *alert) diff --git a/src/cli.c b/src/cli.c index 9ca8e1fcf..b546fd02b 100644 --- a/src/cli.c +++ b/src/cli.c @@ -1268,7 +1268,7 @@ static int _getsocks(char **args, struct appctx *appctx, void *private) struct cmsghdr *cmsg; struct stream_interface *si = appctx->owner; struct stream *s = si_strm(si); - struct connection *remote = objt_conn(si_opposite(si)->end); + struct connection *remote = cs_conn(objt_cs(si_opposite(si)->end)); struct msghdr msghdr; struct iovec iov; struct timeval tv = { .tv_sec = 1, .tv_usec = 0 }; diff --git a/src/frontend.c b/src/frontend.c index e03e0995e..24fc0c147 100644 --- a/src/frontend.c +++ b/src/frontend.c @@ -101,7 +101,7 @@ int frontend_accept(struct stream *s) /* try to report the ALPN value when available (also works for NPN) */ - if (conn && conn->owner == &s->si[0]) { + if (conn && conn == cs_conn(objt_cs(s->si[0].end))) { if (conn_get_alpn(conn, &alpn_str, &alpn_len) && alpn_str) { int len = MIN(alpn_len, sizeof(alpn) - 1); memcpy(alpn, alpn_str, len); diff --git a/src/hlua.c b/src/hlua.c index 4911204a5..761fa7f3e 100644 --- a/src/hlua.c +++ b/src/hlua.c @@ -1521,7 +1521,7 @@ __LJMP static struct hlua_socket *hlua_checksocket(lua_State *L, int ud) static void hlua_socket_handler(struct appctx *appctx) { struct stream_interface *si = appctx->owner; - struct connection *c = objt_conn(si_opposite(si)->end); + struct connection *c = cs_conn(objt_cs(si_opposite(si)->end)); if (appctx->ctx.hlua_cosocket.die) { si_shutw(si); @@ -2167,7 +2167,7 @@ __LJMP static int hlua_socket_getpeername(struct lua_State *L) si = appctx->owner; s = si_strm(si); - conn = objt_conn(s->si[1].end); + conn = cs_conn(objt_cs(s->si[1].end)); if (!conn) { xref_unlock(&socket->xref, peer); lua_pushnil(L); @@ -2217,7 +2217,7 @@ static int hlua_socket_getsockname(struct lua_State *L) si = appctx->owner; s = si_strm(si); - conn = objt_conn(s->si[1].end); + conn = cs_conn(objt_cs(s->si[1].end)); if (!conn) { xref_unlock(&socket->xref, peer); lua_pushnil(L); @@ -2346,7 +2346,7 @@ __LJMP static int hlua_socket_connect(struct lua_State *L) s = si_strm(si); /* Initialise connection. */ - conn = si_alloc_conn(&s->si[1]); + conn = cs_conn(si_alloc_cs(&s->si[1], NULL)); if (!conn) { xref_unlock(&socket->xref, peer); WILL_LJMP(luaL_error(L, "connect: internal error")); diff --git a/src/log.c b/src/log.c index 773662b2b..88e0d07a2 100644 --- a/src/log.c +++ b/src/log.c @@ -1525,7 +1525,7 @@ int build_logline(struct stream *s, char *dst, size_t maxsize, struct list *list break; case LOG_FMT_BACKENDIP: // %bi - conn = objt_conn(s->si[1].end); + conn = cs_conn(objt_cs(s->si[1].end)); if (conn) ret = lf_ip(tmplog, (struct sockaddr *)&conn->addr.from, dst + maxsize - tmplog, tmp); else @@ -1538,7 +1538,7 @@ int build_logline(struct stream *s, char *dst, size_t maxsize, struct list *list break; case LOG_FMT_BACKENDPORT: // %bp - conn = objt_conn(s->si[1].end); + conn = cs_conn(objt_cs(s->si[1].end)); if (conn) ret = lf_port(tmplog, (struct sockaddr *)&conn->addr.from, dst + maxsize - tmplog, tmp); else @@ -1551,7 +1551,7 @@ int build_logline(struct stream *s, char *dst, size_t maxsize, struct list *list break; case LOG_FMT_SERVERIP: // %si - conn = objt_conn(s->si[1].end); + conn = cs_conn(objt_cs(s->si[1].end)); if (conn) ret = lf_ip(tmplog, (struct sockaddr *)&conn->addr.to, dst + maxsize - tmplog, tmp); else @@ -1564,7 +1564,7 @@ int build_logline(struct stream *s, char *dst, size_t maxsize, struct list *list break; case LOG_FMT_SERVERPORT: // %sp - conn = objt_conn(s->si[1].end); + conn = cs_conn(objt_cs(s->si[1].end)); if (conn) ret = lf_port(tmplog, (struct sockaddr *)&conn->addr.to, dst + maxsize - tmplog, tmp); else diff --git a/src/mux_pt.c b/src/mux_pt.c index 48a676f8b..2a83eb4c7 100644 --- a/src/mux_pt.c +++ b/src/mux_pt.c @@ -14,17 +14,31 @@ #include #include -/* Initialize the mux once it's attached. If conn->mux_ctx is NULL, it is - * assumed that no data layer has yet been instanciated so the mux is - * attached to an incoming connection and will instanciate a new stream. If - * conn->mux_ctx exists, it is assumed that it is an outgoing connection - * requested for this context. Returns < 0 on error. +/* Initialize the mux once it's attached. It is expected that conn->mux_ctx + * points to the existing conn_stream (for outgoing connections) or NULL (for + * incoming ones, in which case one will be allocated and a new stream will be + * instanciated). Returns < 0 on error. */ static int mux_pt_init(struct connection *conn) { - if (!conn->mux_ctx) - return stream_create_from_conn(conn); + struct conn_stream *cs = conn->mux_ctx; + + if (!cs) { + cs = cs_new(conn); + if (!cs) + goto fail; + + if (stream_create_from_cs(cs) < 0) + goto fail_free; + + conn->mux_ctx = cs; + } return 0; + + fail_free: + cs_free(cs); + fail: + return -1; } /* callback to be used by default for the pass-through mux. It calls the data @@ -32,7 +46,13 @@ static int mux_pt_init(struct connection *conn) */ static int mux_pt_wake(struct connection *conn) { - return conn->data->wake ? conn->data->wake(conn) : 0; + struct conn_stream *cs = conn->mux_ctx; + int ret; + + ret = cs->data_cb->wake ? cs->data_cb->wake(cs) : 0; + + cs_update_mux_polling(cs); + return (ret); } /* callback used to update the mux's polling flags after changing a cs' status. @@ -60,7 +80,12 @@ static void mux_pt_update_poll(struct conn_stream *cs) */ static void mux_pt_recv(struct connection *conn) { - conn->data->recv(conn); + struct conn_stream *cs = conn->mux_ctx; + + if (conn_xprt_read0_pending(conn)) + cs->flags |= CS_FL_EOS; + cs->data_cb->recv(cs); + cs_update_mux_polling(cs); } /* callback to be used by default for the pass-through mux. It simply calls the @@ -68,7 +93,10 @@ static void mux_pt_recv(struct connection *conn) */ static void mux_pt_send(struct connection *conn) { - conn->data->send(conn); + struct conn_stream *cs = conn->mux_ctx; + + cs->data_cb->send(cs); + cs_update_mux_polling(cs); } /* diff --git a/src/peers.c b/src/peers.c index d7705ea76..9419afe4e 100644 --- a/src/peers.c +++ b/src/peers.c @@ -1871,6 +1871,7 @@ static struct appctx *peer_session_create(struct peers *peers, struct peer *peer struct session *sess; struct stream *s; struct connection *conn; + struct conn_stream *cs; peer->reconnect = tick_add(now_ms, MS_TO_TICKS(5000)); peer->statuscode = PEER_SESS_SC_CONNECTCODE; @@ -1912,9 +1913,12 @@ static struct appctx *peer_session_create(struct peers *peers, struct peer *peer if (unlikely((conn = conn_new()) == NULL)) goto out_free_strm; + if (unlikely((cs = cs_new(conn)) == NULL)) + goto out_free_conn; + conn_prepare(conn, peer->proto, peer->xprt); - conn_install_mux(conn, &mux_pt_ops, conn); - si_attach_conn(&s->si[1], conn); + conn_install_mux(conn, &mux_pt_ops, cs); + si_attach_cs(&s->si[1], cs); conn->target = s->target = &s->be->obj_type; memcpy(&conn->addr.to, &peer->addr, sizeof(conn->addr.to)); @@ -1928,6 +1932,8 @@ static struct appctx *peer_session_create(struct peers *peers, struct peer *peer return appctx; /* Error unrolling */ + out_free_conn: + conn_free(conn); out_free_strm: LIST_DEL(&s->list); pool_free2(pool2_stream, s); diff --git a/src/proto_http.c b/src/proto_http.c index ca9918896..8d813d32a 100644 --- a/src/proto_http.c +++ b/src/proto_http.c @@ -3662,7 +3662,7 @@ int http_process_request(struct stream *s, struct channel *req, int an_bit) char *path; /* Note that for now we don't reuse existing proxy connections */ - if (unlikely((conn = si_alloc_conn(&s->si[1])) == NULL)) { + if (unlikely((conn = cs_conn(si_alloc_cs(&s->si[1], NULL))) == NULL)) { txn->req.err_state = txn->req.msg_state; txn->req.msg_state = HTTP_MSG_ERROR; txn->status = 500; @@ -4212,6 +4212,7 @@ void http_end_txn_clean_session(struct stream *s) int prev_status = s->txn->status; struct proxy *fe = strm_fe(s); struct proxy *be = s->be; + struct conn_stream *cs; struct connection *srv_conn; struct server *srv; unsigned int prev_flags = s->txn->flags; @@ -4221,7 +4222,14 @@ void http_end_txn_clean_session(struct stream *s) * flags. We also need a more accurate method for computing per-request * data. */ - srv_conn = objt_conn(s->si[1].end); + /* + * XXX cognet: This is probably wrong, this is killing a whole + * connection, in the new world order, we probably want to just kill + * the stream, this is to be revisited the day we handle multiple + * streams in one server connection. + */ + cs = objt_cs(s->si[1].end); + srv_conn = cs_conn(cs); /* unless we're doing keep-alive, we want to quickly close the connection * to the server. @@ -4364,17 +4372,17 @@ void http_end_txn_clean_session(struct stream *s) if (srv_conn && LIST_ISEMPTY(&srv_conn->list)) { srv = objt_server(srv_conn->target); if (!srv) - si_idle_conn(&s->si[1], NULL); + si_idle_cs(&s->si[1], NULL); else if (srv_conn->flags & CO_FL_PRIVATE) - si_idle_conn(&s->si[1], (srv->priv_conns ? &srv->priv_conns[tid] : NULL)); + si_idle_cs(&s->si[1], (srv->priv_conns ? &srv->priv_conns[tid] : NULL)); else if (prev_flags & TX_NOT_FIRST) /* note: we check the request, not the connection, but * this is valid for strategies SAFE and AGGR, and in * case of ALWS, we don't care anyway. */ - si_idle_conn(&s->si[1], (srv->safe_conns ? &srv->safe_conns[tid] : NULL)); + si_idle_cs(&s->si[1], (srv->safe_conns ? &srv->safe_conns[tid] : NULL)); else - si_idle_conn(&s->si[1], (srv->idle_conns ? &srv->idle_conns[tid] : NULL)); + si_idle_cs(&s->si[1], (srv->idle_conns ? &srv->idle_conns[tid] : NULL)); } s->req.analysers = strm_li(s) ? strm_li(s)->analysers : 0; s->res.analysers = 0; @@ -7936,7 +7944,7 @@ void debug_hdr(const char *dir, struct stream *s, const char *start, const char chunk_printf(&trash, "%08x:%s.%s[%04x:%04x]: ", s->uniq_id, s->be->id, dir, objt_conn(sess->origin) ? (unsigned short)objt_conn(sess->origin)->handle.fd : -1, - objt_conn(s->si[1].end) ? (unsigned short)objt_conn(s->si[1].end)->handle.fd : -1); + objt_cs(s->si[1].end) ? (unsigned short)objt_cs(s->si[1].end)->conn->handle.fd : -1); for (max = 0; start + max < end; max++) if (start[max] == '\r' || start[max] == '\n') diff --git a/src/proto_tcp.c b/src/proto_tcp.c index d5345ee3c..5badda7e0 100644 --- a/src/proto_tcp.c +++ b/src/proto_tcp.c @@ -1598,11 +1598,11 @@ static inline int get_tcp_info(const struct arg *args, struct sample *smp, /* get the object associated with the stream interface.The * object can be other thing than a connection. For example, * it be a appctx. */ - conn = objt_conn(smp->strm->si[dir].end); + conn = cs_conn(objt_cs(smp->strm->si[dir].end)); if (!conn) return 0; - /* The fd may not be avalaible for the tcp_info struct, and the + /* The fd may not be available for the tcp_info struct, and the syscal can fail. */ optlen = sizeof(info); if (getsockopt(conn->handle.fd, SOL_TCP, TCP_INFO, &info, &optlen) == -1) diff --git a/src/stream.c b/src/stream.c index 4808cfa0d..722d2f4fd 100644 --- a/src/stream.c +++ b/src/stream.c @@ -74,11 +74,11 @@ static struct list service_keywords = LIST_HEAD_INIT(service_keywords); * valid right after the handshake, before the connection's data layer is * initialized, because it relies on the session to be in conn->owner. */ -int stream_create_from_conn(struct connection *conn) +int stream_create_from_cs(struct conn_stream *cs) { struct stream *strm; - strm = stream_new(conn->owner, &conn->obj_type); + strm = stream_new(cs->conn->owner, &cs->obj_type); if (strm == NULL) return -1; @@ -99,7 +99,7 @@ struct stream *stream_new(struct session *sess, enum obj_type *origin) { struct stream *s; struct task *t; - struct connection *conn = objt_conn(origin); + struct conn_stream *cs = objt_cs(origin); struct appctx *appctx = objt_appctx(origin); if (unlikely((s = pool_alloc2(pool2_stream)) == NULL)) @@ -198,8 +198,8 @@ struct stream *stream_new(struct session *sess, enum obj_type *origin) s->si[0].hcto = sess->fe->timeout.clientfin; /* attach the incoming connection to the stream interface now. */ - if (conn) - si_attach_conn(&s->si[0], conn); + if (cs) + si_attach_cs(&s->si[0], cs); else if (appctx) si_attach_appctx(&s->si[0], appctx); @@ -261,8 +261,8 @@ struct stream *stream_new(struct session *sess, enum obj_type *origin) goto out_fail_accept; /* finish initialization of the accepted file descriptor */ - if (conn) - conn_xprt_want_recv(conn); + if (cs) + cs_want_recv(cs); else if (appctx) si_applet_want_get(&s->si[0]); @@ -295,7 +295,8 @@ static void stream_free(struct stream *s) struct session *sess = strm_sess(s); struct proxy *fe = sess->fe; struct bref *bref, *back; - struct connection *cli_conn = objt_conn(s->si[0].end); + struct conn_stream *cli_cs = objt_cs(s->si[0].end); + struct connection *cli_conn = cs_conn(cli_cs); int i; if (s->pend_pos) @@ -343,6 +344,7 @@ static void stream_free(struct stream *s) http_end_txn(s); /* ensure the client-side transport layer is destroyed */ + /* XXX cognet: wrong for multiple streams in one connection */ if (cli_conn) { conn_stop_tracking(cli_conn); conn_full_close(cli_conn); @@ -577,7 +579,7 @@ static int sess_update_st_con_tcp(struct stream *s) struct stream_interface *si = &s->si[1]; struct channel *req = &s->req; struct channel *rep = &s->res; - struct connection *srv_conn = __objt_conn(si->end); + struct connection *srv_conn = __objt_cs(si->end)->conn; /* If we got an error, or if nothing happened and the connection timed * out, we must give up. The CER state handler will take care of retry @@ -597,6 +599,9 @@ static int sess_update_st_con_tcp(struct stream *s) si->exp = TICK_ETERNITY; si->state = SI_ST_CER; + /* XXX cognet: do we really want to kill the connection here ? + * Probably not for multiple streams. + */ conn_full_close(srv_conn); if (si->err_type) @@ -647,7 +652,8 @@ static int sess_update_st_con_tcp(struct stream *s) static int sess_update_st_cer(struct stream *s) { struct stream_interface *si = &s->si[1]; - struct connection *conn = objt_conn(si->end); + struct conn_stream *cs = objt_cs(si->end); + struct connection *conn = cs_conn(cs); /* we probably have to release last stream from the server */ if (objt_server(s->target)) { @@ -812,7 +818,7 @@ static void sess_establish(struct stream *s) req->flags |= CF_WAKE_ONCE; req->flags &= ~CF_WAKE_CONNECT; } - if (objt_conn(si->end)) { + if (objt_cs(si->end)) { /* real connections have timeouts */ req->wto = s->be->timeout.server; rep->rto = s->be->timeout.server; @@ -2111,8 +2117,8 @@ struct task *process_stream(struct task *t) if (!(req->flags & (CF_KERN_SPLICING|CF_SHUTR)) && req->to_forward && (global.tune.options & GTUNE_USE_SPLICE) && - (objt_conn(si_f->end) && __objt_conn(si_f->end)->xprt && __objt_conn(si_f->end)->xprt->rcv_pipe) && - (objt_conn(si_b->end) && __objt_conn(si_b->end)->xprt && __objt_conn(si_b->end)->xprt->snd_pipe) && + (objt_cs(si_f->end) && __objt_cs(si_f->end)->conn->xprt && __objt_cs(si_f->end)->conn->xprt->rcv_pipe) && + (objt_cs(si_b->end) && __objt_cs(si_b->end)->conn->xprt && __objt_cs(si_b->end)->conn->xprt->snd_pipe) && (pipes_used < global.maxpipes) && (((sess->fe->options2|s->be->options2) & PR_O2_SPLIC_REQ) || (((sess->fe->options2|s->be->options2) & PR_O2_SPLIC_AUT) && @@ -2292,8 +2298,8 @@ struct task *process_stream(struct task *t) if (!(res->flags & (CF_KERN_SPLICING|CF_SHUTR)) && res->to_forward && (global.tune.options & GTUNE_USE_SPLICE) && - (objt_conn(si_f->end) && __objt_conn(si_f->end)->xprt && __objt_conn(si_f->end)->xprt->snd_pipe) && - (objt_conn(si_b->end) && __objt_conn(si_b->end)->xprt && __objt_conn(si_b->end)->xprt->rcv_pipe) && + (objt_cs(si_f->end) && __objt_cs(si_f->end)->conn->xprt && __objt_cs(si_f->end)->conn->xprt->snd_pipe) && + (objt_cs(si_b->end) && __objt_cs(si_b->end)->conn->xprt && __objt_cs(si_b->end)->conn->xprt->rcv_pipe) && (pipes_used < global.maxpipes) && (((sess->fe->options2|s->be->options2) & PR_O2_SPLIC_RTR) || (((sess->fe->options2|s->be->options2) & PR_O2_SPLIC_AUT) && @@ -2361,8 +2367,8 @@ struct task *process_stream(struct task *t) si_b->prev_state == SI_ST_EST) { chunk_printf(&trash, "%08x:%s.srvcls[%04x:%04x]\n", s->uniq_id, s->be->id, - objt_conn(si_f->end) ? (unsigned short)objt_conn(si_f->end)->handle.fd : -1, - objt_conn(si_b->end) ? (unsigned short)objt_conn(si_b->end)->handle.fd : -1); + objt_cs(si_f->end) ? (unsigned short)objt_cs(si_f->end)->conn->handle.fd : -1, + objt_cs(si_b->end) ? (unsigned short)objt_cs(si_b->end)->conn->handle.fd : -1); shut_your_big_mouth_gcc(write(1, trash.str, trash.len)); } @@ -2370,8 +2376,8 @@ struct task *process_stream(struct task *t) si_f->prev_state == SI_ST_EST) { chunk_printf(&trash, "%08x:%s.clicls[%04x:%04x]\n", s->uniq_id, s->be->id, - objt_conn(si_f->end) ? (unsigned short)objt_conn(si_f->end)->handle.fd : -1, - objt_conn(si_b->end) ? (unsigned short)objt_conn(si_b->end)->handle.fd : -1); + objt_cs(si_f->end) ? (unsigned short)objt_cs(si_f->end)->conn->handle.fd : -1, + objt_cs(si_b->end) ? (unsigned short)objt_cs(si_b->end)->conn->handle.fd : -1); shut_your_big_mouth_gcc(write(1, trash.str, trash.len)); } } @@ -2460,8 +2466,8 @@ struct task *process_stream(struct task *t) (!(global.mode & MODE_QUIET) || (global.mode & MODE_VERBOSE)))) { chunk_printf(&trash, "%08x:%s.closed[%04x:%04x]\n", s->uniq_id, s->be->id, - objt_conn(si_f->end) ? (unsigned short)objt_conn(si_f->end)->handle.fd : -1, - objt_conn(si_b->end) ? (unsigned short)objt_conn(si_b->end)->handle.fd : -1); + objt_cs(si_f->end) ? (unsigned short)objt_cs(si_f->end)->conn->handle.fd : -1, + objt_cs(si_b->end) ? (unsigned short)objt_cs(si_b->end)->conn->handle.fd : -1); shut_your_big_mouth_gcc(write(1, trash.str, trash.len)); } @@ -2692,6 +2698,7 @@ static int stats_dump_full_strm_to_buffer(struct stream_interface *si, struct st struct tm tm; extern const char *monthname[12]; char pn[INET6_ADDRSTRLEN]; + struct conn_stream *cs; struct connection *conn; struct appctx *tmpctx; @@ -2777,7 +2784,9 @@ static int stats_dump_full_strm_to_buffer(struct stream_interface *si, struct st else chunk_appendf(&trash, " backend= (id=-1 mode=-)"); - conn = objt_conn(strm->si[1].end); + cs = objt_cs(strm->si[1].end); + conn = cs_conn(cs); + if (conn) conn_get_from_addr(conn); @@ -2869,14 +2878,16 @@ static int stats_dump_full_strm_to_buffer(struct stream_interface *si, struct st TICKS_TO_MS(1000)) : "", strm->si[1].err_type); - if ((conn = objt_conn(strm->si[0].end)) != NULL) { + if ((cs = objt_cs(strm->si[0].end)) != NULL) { + conn = cs->conn; + chunk_appendf(&trash, " co0=%p ctrl=%s xprt=%s mux=%s data=%s target=%s:%p\n", conn, conn_get_ctrl_name(conn), conn_get_xprt_name(conn), conn_get_mux_name(conn), - conn_get_data_name(conn), + cs_get_data_name(cs), obj_type_name(conn->target), obj_base_ptr(conn->target)); @@ -2898,14 +2909,16 @@ static int stats_dump_full_strm_to_buffer(struct stream_interface *si, struct st tmpctx->applet->name); } - if ((conn = objt_conn(strm->si[1].end)) != NULL) { + if ((cs = objt_cs(strm->si[1].end)) != NULL) { + conn = cs->conn; + chunk_appendf(&trash, " co1=%p ctrl=%s xprt=%s mux=%s data=%s target=%s:%p\n", conn, conn_get_ctrl_name(conn), conn_get_xprt_name(conn), conn_get_mux_name(conn), - conn_get_data_name(conn), + cs_get_data_name(cs), obj_type_name(conn->target), obj_base_ptr(conn->target)); @@ -3171,7 +3184,7 @@ static int cli_io_handler_dump_sess(struct appctx *appctx) human_time(TICKS_TO_MS(curr_strm->res.analyse_exp - now_ms), TICKS_TO_MS(1000)) : ""); - conn = objt_conn(curr_strm->si[0].end); + conn = cs_conn(objt_cs(curr_strm->si[0].end)); chunk_appendf(&trash, " s0=[%d,%1xh,fd=%d,ex=%s]", curr_strm->si[0].state, @@ -3181,7 +3194,7 @@ static int cli_io_handler_dump_sess(struct appctx *appctx) human_time(TICKS_TO_MS(curr_strm->si[0].exp - now_ms), TICKS_TO_MS(1000)) : ""); - conn = objt_conn(curr_strm->si[1].end); + conn = cs_conn(objt_cs(curr_strm->si[1].end)); chunk_appendf(&trash, " s1=[%d,%1xh,fd=%d,ex=%s]", curr_strm->si[1].state, diff --git a/src/stream_interface.c b/src/stream_interface.c index a5463b751..e9bc83157 100644 --- a/src/stream_interface.c +++ b/src/stream_interface.c @@ -30,6 +30,7 @@ #include #include #include +#include #include #include #include @@ -50,11 +51,11 @@ static void stream_int_shutr_applet(struct stream_interface *si); static void stream_int_shutw_applet(struct stream_interface *si); static void stream_int_chk_rcv_applet(struct stream_interface *si); static void stream_int_chk_snd_applet(struct stream_interface *si); -static void si_conn_recv_cb(struct connection *conn); -static void si_conn_send_cb(struct connection *conn); -static int si_conn_wake_cb(struct connection *conn); -static int si_idle_conn_wake_cb(struct connection *conn); -static void si_idle_conn_null_cb(struct connection *conn); +static void si_cs_recv_cb(struct conn_stream *cs); +static void si_cs_send_cb(struct conn_stream *cs); +static int si_cs_wake_cb(struct conn_stream *cs); +static int si_idle_conn_wake_cb(struct conn_stream *cs); +static void si_idle_conn_null_cb(struct conn_stream *cs); /* stream-interface operations for embedded tasks */ struct si_ops si_embedded_ops = { @@ -83,9 +84,9 @@ struct si_ops si_applet_ops = { }; struct data_cb si_conn_cb = { - .recv = si_conn_recv_cb, - .send = si_conn_send_cb, - .wake = si_conn_wake_cb, + .recv = si_cs_recv_cb, + .send = si_cs_send_cb, + .wake = si_cs_wake_cb, .name = "STRM", }; @@ -337,8 +338,10 @@ int conn_si_send_proxy(struct connection *conn, unsigned int flag) * we've sent the whole proxy line. Otherwise we use connect(). */ while (conn->send_proxy_ofs) { + struct conn_stream *cs; int ret; + cs = conn->mux_ctx; /* The target server expects a PROXY line to be sent first. * If the send_proxy_ofs is negative, it corresponds to the * offset to start sending from then end of the proxy string @@ -348,10 +351,10 @@ int conn_si_send_proxy(struct connection *conn, unsigned int flag) * is attached to a stream interface. Otherwise we can only * send a LOCAL line (eg: for use with health checks). */ - if (conn->data == &si_conn_cb) { - struct stream_interface *si = conn->owner; - struct connection *remote = objt_conn(si_opposite(si)->end); - ret = make_proxy_line(trash.str, trash.size, objt_server(conn->target), remote); + if (conn->mux == &mux_pt_ops && cs->data_cb == &si_conn_cb) { + struct stream_interface *si = cs->data; + struct conn_stream *remote_cs = objt_cs(si_opposite(si)->end); + ret = make_proxy_line(trash.str, trash.size, objt_server(conn->target), remote_cs ? remote_cs->conn : NULL); } else { /* The target server expects a LOCAL line to be sent first. Retrieving @@ -414,9 +417,9 @@ int conn_si_send_proxy(struct connection *conn, unsigned int flag) * It simply sets the CO_FL_SOCK_RD_SH flag so that si_idle_conn_wake_cb() * is notified and can kill the connection. */ -static void si_idle_conn_null_cb(struct connection *conn) +static void si_idle_conn_null_cb(struct conn_stream *cs) { - conn_sock_drain(conn); + conn_sock_drain(cs->conn); } /* Callback to be used by connection I/O handlers when some activity is detected @@ -424,9 +427,10 @@ static void si_idle_conn_null_cb(struct connection *conn) * a close was detected on it. It returns 0 if it did nothing serious, or -1 if * it killed the connection. */ -static int si_idle_conn_wake_cb(struct connection *conn) +static int si_idle_conn_wake_cb(struct conn_stream *cs) { - struct stream_interface *si = conn->owner; + struct connection *conn = cs->conn; + struct stream_interface *si = cs->data; if (!conn_ctrl_ready(conn)) return 0; @@ -560,9 +564,10 @@ void stream_int_notify(struct stream_interface *si) * connection's polling based on the channels and stream interface's final * states. The function always returns 0. */ -static int si_conn_wake_cb(struct connection *conn) +static int si_cs_wake_cb(struct conn_stream *cs) { - struct stream_interface *si = conn->owner; + struct connection *conn = cs->conn; + struct stream_interface *si = cs->data; struct channel *ic = si_ic(si); struct channel *oc = si_oc(si); @@ -599,36 +604,37 @@ static int si_conn_wake_cb(struct connection *conn) * was done above (eg: maybe some buffers got emptied). */ if (channel_is_empty(oc)) - __conn_xprt_stop_send(conn); + __cs_stop_send(cs); if (si->flags & SI_FL_WAIT_ROOM) { - __conn_xprt_stop_recv(conn); + __cs_stop_recv(cs); } else if ((ic->flags & (CF_SHUTR|CF_READ_PARTIAL|CF_DONT_READ)) == CF_READ_PARTIAL && channel_may_recv(ic)) { - __conn_xprt_want_recv(conn); + __cs_want_recv(cs); } return 0; } /* * This function is called to send buffer data to a stream socket. - * It calls the transport layer's snd_buf function. It relies on the + * It calls the mux layer's snd_buf function. It relies on the * caller to commit polling changes. The caller should check conn->flags * for errors. */ -static void si_conn_send(struct connection *conn) +static void si_cs_send(struct conn_stream *cs) { - struct stream_interface *si = conn->owner; + struct connection *conn = cs->conn; + struct stream_interface *si = cs->data; struct channel *oc = si_oc(si); int ret; /* ensure it's only set if a write attempt has succeeded */ oc->flags &= ~CF_WRITE_PARTIAL; - if (oc->pipe && conn->xprt->snd_pipe) { - ret = conn->xprt->snd_pipe(conn, oc->pipe); + if (oc->pipe && conn->xprt->snd_pipe && conn->mux->snd_pipe) { + ret = conn->mux->snd_pipe(cs, oc->pipe); if (ret > 0) oc->flags |= CF_WRITE_PARTIAL | CF_WROTE_DATA; @@ -672,7 +678,7 @@ static void si_conn_send(struct connection *conn) if (oc->flags & CF_STREAMER) send_flag |= CO_SFL_STREAMER; - ret = conn->xprt->snd_buf(conn, oc->buf, send_flag); + ret = conn->mux->snd_buf(cs, oc->buf, send_flag); if (ret > 0) { oc->flags |= CF_WRITE_PARTIAL | CF_WROTE_DATA; @@ -766,25 +772,25 @@ void stream_int_update_conn(struct stream_interface *si) { struct channel *ic = si_ic(si); struct channel *oc = si_oc(si); - struct connection *conn = __objt_conn(si->end); + struct conn_stream *cs = __objt_cs(si->end); if (!(ic->flags & CF_SHUTR)) { /* Read not closed */ if ((ic->flags & CF_DONT_READ) || !channel_may_recv(ic)) - __conn_xprt_stop_recv(conn); + __cs_stop_recv(cs); else - __conn_xprt_want_recv(conn); + __cs_want_recv(cs); } if (!(oc->flags & CF_SHUTW)) { /* Write not closed */ if (channel_is_empty(oc)) - __conn_xprt_stop_send(conn); + __cs_stop_send(cs); else - __conn_xprt_want_send(conn); + __cs_want_send(cs); } - conn_cond_update_xprt_polling(conn); + cs_update_mux_polling(cs); } /* @@ -799,7 +805,8 @@ void stream_int_update_conn(struct stream_interface *si) */ static void stream_int_shutr_conn(struct stream_interface *si) { - struct connection *conn = __objt_conn(si->end); + struct conn_stream *cs = __objt_cs(si->end); + struct connection *conn = cs->conn; struct channel *ic = si_ic(si); ic->flags &= ~CF_SHUTR_NOW; @@ -813,6 +820,7 @@ static void stream_int_shutr_conn(struct stream_interface *si) return; if (si_oc(si)->flags & CF_SHUTW) { + /* XXX: should just close cs ? */ conn_full_close(conn); si->state = SI_ST_DIS; si->exp = TICK_ETERNITY; @@ -823,7 +831,7 @@ static void stream_int_shutr_conn(struct stream_interface *si) } else if (conn->ctrl) { /* we want the caller to disable polling on this FD */ - conn_xprt_stop_recv(conn); + cs_stop_recv(cs); } } @@ -837,7 +845,8 @@ static void stream_int_shutr_conn(struct stream_interface *si) */ static void stream_int_shutw_conn(struct stream_interface *si) { - struct connection *conn = __objt_conn(si->end); + struct conn_stream *cs = __objt_cs(si->end); + struct connection *conn = cs->conn; struct channel *ic = si_ic(si); struct channel *oc = si_oc(si); @@ -865,13 +874,21 @@ static void stream_int_shutw_conn(struct stream_interface *si) /* quick close, the socket is alredy shut anyway */ } else if (si->flags & SI_FL_NOLINGER) { - /* unclean data-layer shutdown */ - conn_xprt_shutw_hard(conn); + /* unclean data-layer shutdown, typically an aborted request + * or a forwarded shutdown from a client to a server due to + * option abortonclose. No need for the TLS layer to try to + * emit a shutdown message. + */ + cs_shutw_hard(cs); } else { - /* clean data-layer shutdown */ - conn_xprt_shutw(conn); - conn_sock_shutw(conn); + /* clean data-layer shutdown. This only happens on the + * frontend side, or on the backend side when forwarding + * a client close in TCP mode or in HTTP TUNNEL mode + * while option abortonclose is set. We want the TLS + * layer to try to signal it to the peer before we close. + */ + cs_shutw(cs); /* If the stream interface is configured to disable half-open * connections, we'll skip the shutdown(), but only if the @@ -920,25 +937,23 @@ static void stream_int_shutw_conn(struct stream_interface *si) static void stream_int_chk_rcv_conn(struct stream_interface *si) { struct channel *ic = si_ic(si); - struct connection *conn = __objt_conn(si->end); + struct conn_stream *cs = __objt_cs(si->end); if (unlikely(si->state > SI_ST_EST || (ic->flags & CF_SHUTR))) return; - conn_refresh_polling_flags(conn); - if ((ic->flags & CF_DONT_READ) || !channel_may_recv(ic)) { /* stop reading */ if (!(ic->flags & CF_DONT_READ)) /* full */ si->flags |= SI_FL_WAIT_ROOM; - __conn_xprt_stop_recv(conn); + __cs_stop_recv(cs); } else { /* (re)start reading */ si->flags &= ~SI_FL_WAIT_ROOM; - __conn_xprt_want_recv(conn); + __cs_want_recv(cs); } - conn_cond_update_xprt_polling(conn); + cs_update_mux_polling(cs); } @@ -950,7 +965,7 @@ static void stream_int_chk_rcv_conn(struct stream_interface *si) static void stream_int_chk_snd_conn(struct stream_interface *si) { struct channel *oc = si_oc(si); - struct connection *conn = __objt_conn(si->end); + struct conn_stream *cs = __objt_cs(si->end); /* ensure it's only set if a write attempt has succeeded */ oc->flags &= ~CF_WRITE_PARTIAL; @@ -965,7 +980,7 @@ static void stream_int_chk_snd_conn(struct stream_interface *si) !(si->flags & SI_FL_WAIT_DATA)) /* not waiting for data */ return; - if (conn->flags & CO_FL_XPRT_WR_ENA) { + if (cs->flags & CS_FL_DATA_WR_ENA) { /* already subscribed to write notifications, will be called * anyway, so let's avoid calling it especially if the reader * is not ready. @@ -973,16 +988,12 @@ static void stream_int_chk_snd_conn(struct stream_interface *si) return; } - /* Before calling the data-level operations, we have to prepare - * the polling flags to ensure we properly detect changes. - */ - conn_refresh_polling_flags(conn); - __conn_xprt_want_send(conn); + __cs_want_send(cs); - si_conn_send(conn); - if (conn->flags & CO_FL_ERROR) { + si_cs_send(cs); + if (cs->conn->flags & CO_FL_ERROR) { /* Write error on the file descriptor */ - __conn_xprt_stop_both(conn); + __cs_stop_both(cs); si->flags |= SI_FL_ERR; goto out_wakeup; } @@ -996,7 +1007,7 @@ static void stream_int_chk_snd_conn(struct stream_interface *si) * ->o limit was reached. Maybe we just wrote the last * chunk and need to close. */ - __conn_xprt_stop_send(conn); + __cs_stop_send(cs); if (((oc->flags & (CF_SHUTW|CF_AUTO_CLOSE|CF_SHUTW_NOW)) == (CF_AUTO_CLOSE|CF_SHUTW_NOW)) && (si->state == SI_ST_EST)) { @@ -1012,7 +1023,7 @@ static void stream_int_chk_snd_conn(struct stream_interface *si) /* Otherwise there are remaining data to be sent in the buffer, * which means we have to poll before doing so. */ - __conn_xprt_want_send(conn); + __cs_want_send(cs); si->flags &= ~SI_FL_WAIT_DATA; if (!tick_isset(oc->wex)) oc->wex = tick_add_ifset(now_ms, oc->wto); @@ -1052,17 +1063,18 @@ static void stream_int_chk_snd_conn(struct stream_interface *si) } /* commit possible polling changes */ - conn_cond_update_polling(conn); + cs_update_mux_polling(cs); } /* * This is the callback which is called by the connection layer to receive data - * into the buffer from the connection. It iterates over the transport layer's + * into the buffer from the connection. It iterates over the mux layer's * rcv_buf function. */ -static void si_conn_recv_cb(struct connection *conn) +static void si_cs_recv_cb(struct conn_stream *cs) { - struct stream_interface *si = conn->owner; + struct connection *conn = cs->conn; + struct stream_interface *si = cs->data; struct channel *ic = si_ic(si); int ret, max, cur_read; int read_poll = MAX_READ_POLL_LOOPS; @@ -1081,7 +1093,7 @@ static void si_conn_recv_cb(struct connection *conn) return; /* stop here if we reached the end of data */ - if (conn_xprt_read0_pending(conn)) + if (cs->flags & CS_FL_EOS) goto out_shutdown_r; cur_read = 0; @@ -1101,7 +1113,7 @@ static void si_conn_recv_cb(struct connection *conn) /* First, let's see if we may splice data across the channel without * using a buffer. */ - if (conn->xprt->rcv_pipe && + if (conn->xprt->rcv_pipe && conn->mux->rcv_pipe && (ic->pipe || ic->to_forward >= MIN_SPLICE_FORWARD) && ic->flags & CF_KERN_SPLICING) { if (buffer_not_empty(ic->buf)) { @@ -1120,7 +1132,7 @@ static void si_conn_recv_cb(struct connection *conn) } } - ret = conn->xprt->rcv_pipe(conn, ic->pipe, ic->to_forward); + ret = conn->mux->rcv_pipe(cs, ic->pipe, ic->to_forward); if (ret < 0) { /* splice not supported on this end, let's disable it */ ic->flags &= ~CF_KERN_SPLICING; @@ -1135,7 +1147,7 @@ static void si_conn_recv_cb(struct connection *conn) ic->flags |= CF_READ_PARTIAL; } - if (conn_xprt_read0_pending(conn)) + if (cs->flags & CS_FL_EOS) goto out_shutdown_r; if (conn->flags & CO_FL_ERROR) @@ -1146,7 +1158,7 @@ static void si_conn_recv_cb(struct connection *conn) * could soon be full. Let's stop before needing to poll. */ si->flags |= SI_FL_WAIT_ROOM; - __conn_xprt_stop_recv(conn); + __cs_stop_recv(cs); } /* splice not possible (anymore), let's go on on standard copy */ @@ -1177,7 +1189,7 @@ static void si_conn_recv_cb(struct connection *conn) break; } - ret = conn->xprt->rcv_buf(conn, ic->buf, max); + ret = conn->mux->rcv_buf(cs, ic->buf, max); if (ret <= 0) break; @@ -1203,9 +1215,12 @@ static void si_conn_recv_cb(struct connection *conn) } if ((ic->flags & CF_READ_DONTWAIT) || --read_poll <= 0) { - if (__conn_xprt_done_recv(conn)) - si->flags |= SI_FL_WAIT_ROOM; - break; + /* + * This used to be __conn_xprt_done_recv() + * This was changed to accomodate with the mux code, + * but we may have lost a worthwhile optimization. + */ + __cs_stop_recv(cs); } /* if too many bytes were missing from last read, it means that @@ -1271,7 +1286,7 @@ static void si_conn_recv_cb(struct connection *conn) if (conn->flags & CO_FL_ERROR) return; - if (conn_xprt_read0_pending(conn)) + if (cs->flags & CS_FL_EOS) /* connection closed */ goto out_shutdown_r; @@ -1291,9 +1306,10 @@ static void si_conn_recv_cb(struct connection *conn) * from the buffer to the connection. It iterates over the transport layer's * snd_buf function. */ -static void si_conn_send_cb(struct connection *conn) +static void si_cs_send_cb(struct conn_stream *cs) { - struct stream_interface *si = conn->owner; + struct connection *conn = cs->conn; + struct stream_interface *si = cs->data; if (conn->flags & CO_FL_ERROR) return; @@ -1307,7 +1323,7 @@ static void si_conn_send_cb(struct connection *conn) return; /* OK there are data waiting to be sent */ - si_conn_send(conn); + si_cs_send(cs); /* OK all done */ return; @@ -1320,7 +1336,7 @@ static void si_conn_send_cb(struct connection *conn) */ void stream_sock_read0(struct stream_interface *si) { - struct connection *conn = __objt_conn(si->end); + struct conn_stream *cs = __objt_cs(si->end); struct channel *ic = si_ic(si); struct channel *oc = si_oc(si); @@ -1340,17 +1356,17 @@ void stream_sock_read0(struct stream_interface *si) if (si->flags & SI_FL_NOHALF) { /* we want to immediately forward this close to the write side */ /* force flag on ssl to keep stream in cache */ - conn_xprt_shutw_hard(conn); + cs_shutw_hard(cs); goto do_close; } /* otherwise that's just a normal read shutdown */ - __conn_xprt_stop_recv(conn); + __cs_stop_recv(cs); return; do_close: /* OK we completely close the socket here just as if we went through si_shut[rw]() */ - conn_full_close(conn); + conn_full_close(cs->conn); ic->flags &= ~CF_SHUTR_NOW; ic->flags |= CF_SHUTR;