diff --git a/include/proto/fd.h b/include/proto/fd.h
index 444d1b011..673eaae0f 100644
--- a/include/proto/fd.h
+++ b/include/proto/fd.h
@@ -35,15 +35,10 @@
 
 /* public variables */
 
-extern volatile struct fdlist fd_cache;
-extern volatile struct fdlist fd_cache_local[MAX_THREADS];
-
 extern volatile struct fdlist update_list;
 
 extern unsigned long *polled_mask;
 
-extern unsigned long fd_cache_mask; // Mask of threads with events in the cache
-
 extern THREAD_LOCAL int *fd_updt;  // FD updates list
 extern THREAD_LOCAL int fd_nbupdt; // number of updates in the list
 
@@ -51,8 +46,6 @@ extern int poller_wr_pipe[MAX_THREADS];
 
 extern volatile int ha_used_fds; // Number of FDs we're currently using
 
-__decl_hathreads(extern HA_RWLOCK_T __attribute__((aligned(64))) fdcache_lock); /* global lock to protect fd_cache array */
-
 /* Deletes an FD from the fdsets.
  * The file descriptor is also closed.
  */
@@ -103,11 +96,6 @@ int list_pollers(FILE *out);
  */
 void run_poller();
 
-/* Scan and process the cached events. This should be called right after
- * the poller.
- */
-void fd_process_cached_events();
-
 void fd_add_to_fd_list(volatile struct fdlist *list, int fd, int off);
 void fd_rm_from_fd_list(volatile struct fdlist *list, int fd, int off);
 
@@ -165,45 +153,6 @@ static inline void done_update_polling(int fd)
 	}
 }
 
-/* Allocates a cache entry for a file descriptor if it does not yet have one.
- * This can be done at any time.
- */
-static inline void fd_alloc_cache_entry(const int fd)
-{
-	_HA_ATOMIC_OR(&fd_cache_mask, fdtab[fd].thread_mask);
-	if (!(fdtab[fd].thread_mask & (fdtab[fd].thread_mask - 1)))
-		fd_add_to_fd_list(&fd_cache_local[my_ffsl(fdtab[fd].thread_mask) - 1], fd, offsetof(struct fdtab, cache));
-	else
-		fd_add_to_fd_list(&fd_cache, fd, offsetof(struct fdtab, cache));
-}
-
-/* Removes entry used by fd from the FD cache and replaces it with the
- * last one.
- * If the fd has no entry assigned, return immediately.
- */
-static inline void fd_release_cache_entry(const int fd)
-{
-	if (!(fdtab[fd].thread_mask & (fdtab[fd].thread_mask - 1)))
-		fd_rm_from_fd_list(&fd_cache_local[my_ffsl(fdtab[fd].thread_mask) - 1], fd, offsetof(struct fdtab, cache));
-	else
-		fd_rm_from_fd_list(&fd_cache, fd, offsetof(struct fdtab, cache));
-}
-
-/* This function automatically enables/disables caching for an entry depending
- * on its state. It is only called on state changes.
- */
-static inline void fd_update_cache(int fd)
-{
-	/* only READY and ACTIVE states (the two with both flags set) require a cache entry */
-	if (((fdtab[fd].state & (FD_EV_READY_R | FD_EV_ACTIVE_R)) == (FD_EV_READY_R | FD_EV_ACTIVE_R)) ||
-	    ((fdtab[fd].state & (FD_EV_READY_W | FD_EV_ACTIVE_W)) == (FD_EV_READY_W | FD_EV_ACTIVE_W))) {
-		fd_alloc_cache_entry(fd);
-	}
-	else {
-		fd_release_cache_entry(fd);
-	}
-}
-
 /*
  * returns the FD's recv state (FD_EV_*)
  */
@@ -280,7 +229,6 @@ static inline int fd_active(const int fd)
 static inline void fd_stop_recv(int fd)
 {
 	unsigned char old, new;
-	unsigned long locked;
 
 	old = fdtab[fd].state;
 	do {
@@ -292,20 +240,12 @@ static inline void fd_stop_recv(int fd)
 
 	if ((old ^ new) & FD_EV_POLLED_R)
 		updt_fd_polling(fd);
-
-	locked = atleast2(fdtab[fd].thread_mask);
-	if (locked)
-		HA_SPIN_LOCK(FD_LOCK, &fdtab[fd].lock);
-	fd_update_cache(fd); /* need an update entry to change the state */
-	if (locked)
-		HA_SPIN_UNLOCK(FD_LOCK, &fdtab[fd].lock);
 }
 
 /* Disable processing send events on fd */
 static inline void fd_stop_send(int fd)
 {
 	unsigned char old, new;
-	unsigned long locked;
 
 	old = fdtab[fd].state;
 	do {
@@ -317,20 +257,12 @@ static inline void fd_stop_send(int fd)
 
 	if ((old ^ new) & FD_EV_POLLED_W)
 		updt_fd_polling(fd);
-
-	locked = atleast2(fdtab[fd].thread_mask);
-	if (locked)
-		HA_SPIN_LOCK(FD_LOCK, &fdtab[fd].lock);
-	fd_update_cache(fd); /* need an update entry to change the state */
-	if (locked)
-		HA_SPIN_UNLOCK(FD_LOCK, &fdtab[fd].lock);
 }
 
 /* Disable processing of events on fd for both directions. */
 static inline void fd_stop_both(int fd)
 {
 	unsigned char old, new;
-	unsigned long locked;
 
 	old = fdtab[fd].state;
 	do {
@@ -342,20 +274,12 @@ static inline void fd_stop_both(int fd)
 
 	if ((old ^ new) & FD_EV_POLLED_RW)
 		updt_fd_polling(fd);
-
-	locked = atleast2(fdtab[fd].thread_mask);
-	if (locked)
-		HA_SPIN_LOCK(FD_LOCK, &fdtab[fd].lock);
-	fd_update_cache(fd); /* need an update entry to change the state */
-	if (locked)
-		HA_SPIN_UNLOCK(FD_LOCK, &fdtab[fd].lock);
 }
 
 /* Report that FD cannot receive anymore without polling (EAGAIN detected). */
 static inline void fd_cant_recv(const int fd)
 {
 	unsigned char old, new;
-	unsigned long locked;
 
 	old = fdtab[fd].state;
 	do {
@@ -368,31 +292,15 @@ static inline void fd_cant_recv(const int fd)
 
 	if ((old ^ new) & FD_EV_POLLED_R)
 		updt_fd_polling(fd);
-
-	locked = atleast2(fdtab[fd].thread_mask);
-	if (locked)
-		HA_SPIN_LOCK(FD_LOCK, &fdtab[fd].lock);
-	fd_update_cache(fd); /* need an update entry to change the state */
-	if (locked)
-		HA_SPIN_UNLOCK(FD_LOCK, &fdtab[fd].lock);
 }
 
 /* Report that FD may receive again without polling. */
 static inline void fd_may_recv(const int fd)
 {
-	unsigned long locked;
-
 	/* marking ready never changes polled status */
 	if ((fdtab[fd].state & FD_EV_READY_R) ||
 	    HA_ATOMIC_BTS(&fdtab[fd].state, FD_EV_READY_R_BIT))
 		return;
-
-	locked = atleast2(fdtab[fd].thread_mask);
-	if (locked)
-		HA_SPIN_LOCK(FD_LOCK, &fdtab[fd].lock);
-	fd_update_cache(fd); /* need an update entry to change the state */
-	if (locked)
-		HA_SPIN_UNLOCK(FD_LOCK, &fdtab[fd].lock);
 }
 
 /* Disable readiness when polled. This is useful to interrupt reading when it
@@ -403,7 +311,6 @@ static inline void fd_may_recv(const int fd)
 static inline void fd_done_recv(const int fd)
 {
 	unsigned char old, new;
-	unsigned long locked;
 
 	old = fdtab[fd].state;
 	do {
@@ -416,20 +323,12 @@ static inline void fd_done_recv(const int fd)
 
 	if ((old ^ new) & FD_EV_POLLED_R)
 		updt_fd_polling(fd);
-
-	locked = atleast2(fdtab[fd].thread_mask);
-	if (locked)
-		HA_SPIN_LOCK(FD_LOCK, &fdtab[fd].lock);
-	fd_update_cache(fd); /* need an update entry to change the state */
-	if (locked)
-		HA_SPIN_UNLOCK(FD_LOCK, &fdtab[fd].lock);
 }
 
 /* Report that FD cannot send anymore without polling (EAGAIN detected). */
 static inline void fd_cant_send(const int fd)
 {
 	unsigned char old, new;
-	unsigned long locked;
 
 	old = fdtab[fd].state;
 	do {
@@ -442,83 +341,47 @@ static inline void fd_cant_send(const int fd)
 
 	if ((old ^ new) & FD_EV_POLLED_W)
 		updt_fd_polling(fd);
-
-	locked = atleast2(fdtab[fd].thread_mask);
-	if (locked)
-		HA_SPIN_LOCK(FD_LOCK, &fdtab[fd].lock);
-	fd_update_cache(fd); /* need an update entry to change the state */
-	if (locked)
-		HA_SPIN_UNLOCK(FD_LOCK, &fdtab[fd].lock);
 }
 
 /* Report that FD may send again without polling (EAGAIN not detected). */
 static inline void fd_may_send(const int fd)
 {
-	unsigned long locked;
-
 	/* marking ready never changes polled status */
 	if ((fdtab[fd].state & FD_EV_READY_W) ||
 	    HA_ATOMIC_BTS(&fdtab[fd].state, FD_EV_READY_W_BIT))
 		return;
-
-	locked = atleast2(fdtab[fd].thread_mask);
-	if (locked)
-		HA_SPIN_LOCK(FD_LOCK, &fdtab[fd].lock);
-	fd_update_cache(fd); /* need an update entry to change the state */
-	if (locked)
-		HA_SPIN_UNLOCK(FD_LOCK, &fdtab[fd].lock);
 }
 
 /* Prepare FD to try to receive */
 static inline void fd_want_recv(int fd)
 {
 	unsigned char old, new;
-	unsigned long locked;
 
 	old = fdtab[fd].state;
 	do {
 		if (old & FD_EV_ACTIVE_R)
 			return;
-		new = old | FD_EV_ACTIVE_R;
-		if (!(new & FD_EV_READY_R))
-			new |= FD_EV_POLLED_R;
+		new = old | FD_EV_ACTIVE_R | FD_EV_POLLED_R;
 	} while (unlikely(!_HA_ATOMIC_CAS(&fdtab[fd].state, &old, new)));
 
 	if ((old ^ new) & FD_EV_POLLED_R)
 		updt_fd_polling(fd);
-
-	locked = atleast2(fdtab[fd].thread_mask);
-	if (locked)
-		HA_SPIN_LOCK(FD_LOCK, &fdtab[fd].lock);
-	fd_update_cache(fd); /* need an update entry to change the state */
-	if (locked)
-		HA_SPIN_UNLOCK(FD_LOCK, &fdtab[fd].lock);
 }
 
 /* Prepare FD to try to send */
 static inline void fd_want_send(int fd)
 {
 	unsigned char old, new;
-	unsigned long locked;
 
 	old = fdtab[fd].state;
 	do {
 		if (old & FD_EV_ACTIVE_W)
 			return;
-		new = old | FD_EV_ACTIVE_W;
-		if (!(new & FD_EV_READY_W))
-			new |= FD_EV_POLLED_W;
+		new = old | FD_EV_ACTIVE_W | FD_EV_POLLED_W;
 	} while (unlikely(!_HA_ATOMIC_CAS(&fdtab[fd].state, &old, new)));
 
 	if ((old ^ new) & FD_EV_POLLED_W)
 		updt_fd_polling(fd);
-
-	locked = atleast2(fdtab[fd].thread_mask);
-	if (locked)
-		HA_SPIN_LOCK(FD_LOCK, &fdtab[fd].lock);
-	fd_update_cache(fd); /* need an update entry to change the state */
-	if (locked)
-		HA_SPIN_UNLOCK(FD_LOCK, &fdtab[fd].lock);
 }
 
 /* Update events seen for FD and its state if needed. This should be called
@@ -545,6 +408,9 @@ static inline void fd_update_events(int fd, int evts)
 
 	if (fdtab[fd].ev & (FD_POLL_OUT | FD_POLL_ERR))
 		fd_may_send(fd);
+
+	if (fdtab[fd].iocb)
+		fdtab[fd].iocb(fd);
 }
 
 /* Prepares for being polled */
diff --git a/include/types/activity.h b/include/types/activity.h
index 37251502e..01e484fac 100644
--- a/include/types/activity.h
+++ b/include/types/activity.h
@@ -32,7 +32,6 @@
  */
 struct activity {
 	unsigned int loops;        // complete loops in run_poll_loop()
-	unsigned int wake_cache;   // active fd_cache prevented poll() from sleeping
 	unsigned int wake_tasks;   // active tasks prevented poll() from sleeping
 	unsigned int wake_signal;  // pending signal prevented poll() from sleeping
 	unsigned int poll_exp;     // number of times poll() sees an expired timeout (includes wake_*)
diff --git a/src/cli.c b/src/cli.c
index ab4d3690a..16f6243a7 100644
--- a/src/cli.c
+++ b/src/cli.c
@@ -984,7 +984,7 @@ static int cli_io_handler_show_fd(struct appctx *appctx)
 			li = fdt.owner;
 
 		chunk_printf(&trash,
-			     "  %5d : st=0x%02x(R:%c%c%c W:%c%c%c) ev=0x%02x(%c%c%c%c%c) [%c%c] cnext=%d cprev=%d tmask=0x%lx umask=0x%lx owner=%p iocb=%p(%s)",
+			     "  %5d : st=0x%02x(R:%c%c%c W:%c%c%c) ev=0x%02x(%c%c%c%c%c) [%c%c] tmask=0x%lx umask=0x%lx owner=%p iocb=%p(%s)",
 			     fd,
 			     fdt.state,
 			     (fdt.state & FD_EV_POLLED_R) ? 'P' : 'p',
@@ -1001,8 +1001,6 @@ static int cli_io_handler_show_fd(struct appctx *appctx)
 			     (fdt.ev & FD_POLL_IN)  ? 'I' : 'i',
 			     fdt.linger_risk ? 'L' : 'l',
 			     fdt.cloned ? 'C' : 'c',
-			     fdt.cache.next,
-			     fdt.cache.prev,
 			     fdt.thread_mask, fdt.update_mask,
 			     fdt.owner,
 			     fdt.iocb,
@@ -1119,7 +1117,6 @@ static int cli_io_handler_show_activity(struct appctx *appctx)
 	chunk_appendf(&trash, "thread_id: %u (%u..%u)\n", tid + 1, 1, global.nbthread);
 	chunk_appendf(&trash, "date_now: %lu.%06lu\n", (long)now.tv_sec, (long)now.tv_usec);
 	chunk_appendf(&trash, "loops:");        SHOW_TOT(thr, activity[thr].loops);
-	chunk_appendf(&trash, "wake_cache:");   SHOW_TOT(thr, activity[thr].wake_cache);
 	chunk_appendf(&trash, "wake_tasks:");   SHOW_TOT(thr, activity[thr].wake_tasks);
 	chunk_appendf(&trash, "wake_signal:");  SHOW_TOT(thr, activity[thr].wake_signal);
 	chunk_appendf(&trash, "poll_exp:");     SHOW_TOT(thr, activity[thr].poll_exp);
diff --git a/src/debug.c b/src/debug.c
index 059bc6b97..6b9d14988 100644
--- a/src/debug.c
+++ b/src/debug.c
@@ -45,7 +45,7 @@ void ha_thread_dump(struct buffer *buf, int thr, int calling_tid)
 	chunk_appendf(buf,
 	              "%c%cThread %-2u: act=%d glob=%d wq=%d rq=%d tl=%d tlsz=%d rqsz=%d\n"
-	              "             stuck=%d fdcache=%d prof=%d",
+	              "             stuck=%d prof=%d",
 	              (thr == calling_tid) ? '*' : ' ', stuck ? '>' : ' ',
 	              thr + 1,
 	              thread_has_tasks(),
 	              !!(global_tasks_mask & thr_bit),
@@ -55,7 +55,6 @@ void ha_thread_dump(struct buffer *buf, int thr, int calling_tid)
 	              task_per_thread[thr].task_list_size,
 	              task_per_thread[thr].rqueue_size,
 	              stuck,
-	              !!(fd_cache_mask & thr_bit),
 	              !!(task_profiling_mask & thr_bit));
 
 	chunk_appendf(buf,
diff --git a/src/ev_epoll.c b/src/ev_epoll.c
index 6c09c0498..bd2d616cd 100644
--- a/src/ev_epoll.c
+++ b/src/ev_epoll.c
@@ -169,6 +169,8 @@ REGPRM3 static void _do_poll(struct poller *p, int exp, int wake)
 	tv_leaving_poll(wait_time, status);
 
 	thread_harmless_end();
+	if (sleeping_thread_mask & tid_bit)
+		_HA_ATOMIC_AND(&sleeping_thread_mask, ~tid_bit);
 
 	/* process polled events */
 
diff --git a/src/ev_evports.c b/src/ev_evports.c
index eae72d099..7842bf242 100644
--- a/src/ev_evports.c
+++ b/src/ev_evports.c
@@ -140,6 +140,8 @@ REGPRM3 static void _do_poll(struct poller *p, int exp, int wake)
 	}
 
 	thread_harmless_now();
+	if (sleeping_thread_mask & tid_bit)
+		_HA_ATOMIC_AND(&sleeping_thread_mask, ~tid_bit);
 
 	/*
 	 * Determine how long to wait for events to materialise on the port.
diff --git a/src/ev_kqueue.c b/src/ev_kqueue.c
index aea2ab738..692437731 100644
--- a/src/ev_kqueue.c
+++ b/src/ev_kqueue.c
@@ -164,6 +164,8 @@ REGPRM3 static void _do_poll(struct poller *p, int exp, int wake)
 	tv_leaving_poll(wait_time, status);
 
 	thread_harmless_end();
+	if (sleeping_thread_mask & tid_bit)
+		_HA_ATOMIC_AND(&sleeping_thread_mask, ~tid_bit);
 
 	for (count = 0; count < status; count++) {
 		unsigned int n = 0;
diff --git a/src/ev_poll.c b/src/ev_poll.c
index 54812f53b..b349c555f 100644
--- a/src/ev_poll.c
+++ b/src/ev_poll.c
@@ -159,6 +159,8 @@ REGPRM3 static void _do_poll(struct poller *p, int exp, int wake)
 	} while (!_HA_ATOMIC_CAS(&maxfd, &old_maxfd, new_maxfd));
 
 	thread_harmless_now();
+	if (sleeping_thread_mask & tid_bit)
+		_HA_ATOMIC_AND(&sleeping_thread_mask, ~tid_bit);
 
 	fd_nbupdt = 0;
 
diff --git a/src/ev_select.c b/src/ev_select.c
index 0ccf2f150..be88bc2bf 100644
--- a/src/ev_select.c
+++ b/src/ev_select.c
@@ -176,6 +176,8 @@ REGPRM3 static void _do_poll(struct poller *p, int exp, int wake)
 	tv_leaving_poll(delta_ms, status);
 
 	thread_harmless_end();
+	if (sleeping_thread_mask & tid_bit)
+		_HA_ATOMIC_AND(&sleeping_thread_mask, ~tid_bit);
 
 	if (status <= 0)
 		return;
diff --git a/src/fd.c b/src/fd.c
index e71c2ee8d..71df46e05 100644
--- a/src/fd.c
+++ b/src/fd.c
@@ -8,40 +8,6 @@
  * as published by the Free Software Foundation; either version
  * 2 of the License, or (at your option) any later version.
  *
- * This code implements an events cache for file descriptors. It remembers the
- * readiness of a file descriptor after a return from poll() and the fact that
- * an I/O attempt failed on EAGAIN. Events in the cache which are still marked
- * ready and active are processed just as if they were reported by poll().
- *
- * This serves multiple purposes. First, it significantly improves performance
- * by avoiding to subscribe to polling unless absolutely necessary, so most
- * events are processed without polling at all, especially send() which
- * benefits from the socket buffers. Second, it is the only way to support
- * edge-triggered pollers (eg: EPOLL_ET). And third, it enables I/O operations
- * that are backed by invisible buffers. For example, SSL is able to read a
- * whole socket buffer and not deliver it to the application buffer because
- * it's full. Unfortunately, it won't be reported by a poller anymore until
- * some new activity happens. The only way to call it again thus is to keep
- * this readiness information in the cache and to access it without polling
- * once the FD is enabled again.
- *
- * One interesting feature of the cache is that it maintains the principle
- * of speculative I/O introduced in haproxy 1.3 : the first time an event is
- * enabled, the FD is considered as ready so that the I/O attempt is performed
- * via the cache without polling. And the polling happens only when EAGAIN is
- * first met. This avoids polling for HTTP requests, especially when the
- * defer-accept mode is used. It also avoids polling for sending short data
- * such as requests to servers or short responses to clients.
- *
- * The cache consists in a list of active events and a list of updates.
- * Active events are events that are expected to come and that we must report
- * to the application until it asks to stop or asks to poll. Updates are new
- * requests for changing an FD state. Updates are the only way to create new
- * events. This is important because it means that the number of cached events
- * cannot increase between updates and will only grow one at a time while
- * processing updates. All updates must always be processed, though events
- * might be processed by small batches if required.
- *
  * There is no direct link between the FD and the updates list. There is only a
  * bit in the fdtab[] to indicate than a file descriptor is already present in
  * the updates list. Once an fd is present in the updates list, it will have to
@@ -55,16 +21,6 @@
  * is that unhandled events will still wake the poller up. Using an edge-
  * triggered poller such as EPOLL_ET will solve this issue though.
  *
- * Since we do not want to scan all the FD list to find cached I/O events,
- * we store them in a list consisting in a linear array holding only the FD
- * indexes right now. Note that a closed FD cannot exist in the cache, because
- * it is closed by fd_delete() which in turn calls fd_release_cache_entry()
- * which always removes it from the list.
- *
- * The FD array has to hold a back reference to the cache. This reference is
- * always valid unless the FD is not in the cache and is not updated, in which
- * case the reference points to index 0.
- *
  * The event state for an FD, as found in fdtab[].state, is maintained for each
  * direction. The state field is built this way, with R bits in the low nibble
  * and W bits in the high nibble for ease of access and debugging :
@@ -175,12 +131,8 @@ struct poller pollers[MAX_POLLERS];
 struct poller cur_poller;
 int nbpollers = 0;
 
-volatile struct fdlist fd_cache ; // FD events cache
-volatile struct fdlist fd_cache_local[MAX_THREADS]; // FD events local for each thread
 
 volatile struct fdlist update_list; // Global update list
 
-unsigned long fd_cache_mask = 0; // Mask of threads with events in the cache
-
 THREAD_LOCAL int *fd_updt = NULL;  // FD updates list
 THREAD_LOCAL int fd_nbupdt = 0;    // number of updates in the list
 THREAD_LOCAL int poller_rd_pipe = -1; // Pipe to wake the thread
@@ -379,7 +331,6 @@ static void fd_dodelete(int fd, int do_close)
 	if (cur_poller.clo)
 		cur_poller.clo(fd);
 
-	fd_release_cache_entry(fd);
 	fdtab[fd].state = 0;
 
 	port_range_release_port(fdinfo[fd].port_range, fdinfo[fd].local_port);
@@ -411,66 +362,6 @@ void fd_remove(int fd)
 	fd_dodelete(fd, 0);
 }
 
-static inline void fdlist_process_cached_events(volatile struct fdlist *fdlist)
-{
-	int fd, old_fd, e;
-	unsigned long locked;
-
-	for (old_fd = fd = fdlist->first; fd != -1; fd = fdtab[fd].cache.next) {
-		if (fd == -2) {
-			fd = old_fd;
-			continue;
-		} else if (fd <= -3)
-			fd = -fd - 4;
-		if (fd == -1)
-			break;
-		old_fd = fd;
-		if (!(fdtab[fd].thread_mask & tid_bit))
-			continue;
-		if (fdtab[fd].cache.next < -3)
-			continue;
-
-		_HA_ATOMIC_OR(&fd_cache_mask, tid_bit);
-		locked = atleast2(fdtab[fd].thread_mask);
-		if (locked && HA_SPIN_TRYLOCK(FD_LOCK, &fdtab[fd].lock)) {
-			activity[tid].fd_lock++;
-			continue;
-		}
-
-		e = fdtab[fd].state;
-		fdtab[fd].ev &= FD_POLL_STICKY;
-
-		if ((e & (FD_EV_READY_R | FD_EV_ACTIVE_R)) == (FD_EV_READY_R | FD_EV_ACTIVE_R))
-			fdtab[fd].ev |= FD_POLL_IN;
-
-		if ((e & (FD_EV_READY_W | FD_EV_ACTIVE_W)) == (FD_EV_READY_W | FD_EV_ACTIVE_W))
-			fdtab[fd].ev |= FD_POLL_OUT;
-
-		if (fdtab[fd].iocb && fdtab[fd].owner && fdtab[fd].ev) {
-			if (locked)
-				HA_SPIN_UNLOCK(FD_LOCK, &fdtab[fd].lock);
-			fdtab[fd].iocb(fd);
-		}
-		else {
-			fd_release_cache_entry(fd);
-			if (locked)
-				HA_SPIN_UNLOCK(FD_LOCK, &fdtab[fd].lock);
-		}
-	}
-}
-
-/* Scan and process the cached events. This should be called right after
- * the poller. The loop may cause new entries to be created, for example
- * if a listener causes an accept() to initiate a new incoming connection
- * wanting to attempt an recv().
- */
-void fd_process_cached_events()
-{
-	_HA_ATOMIC_AND(&fd_cache_mask, ~tid_bit);
-	fdlist_process_cached_events(&fd_cache_local[tid]);
-	fdlist_process_cached_events(&fd_cache);
-}
-
 #if defined(USE_CLOSEFROM)
 void my_closefrom(int start)
 {
@@ -640,7 +531,6 @@ int init_pollers()
 	if ((fdinfo = calloc(global.maxsock, sizeof(struct fdinfo))) == NULL)
 		goto fail_info;
 
-	fd_cache.first = fd_cache.last = -1;
 	update_list.first = update_list.last = -1;
 
 	for (p = 0; p < global.maxsock; p++) {
@@ -649,8 +539,6 @@ int init_pollers()
 		fdtab[p].cache.next = -3;
 		fdtab[p].update.next = -3;
 	}
-	for (p = 0; p < global.nbthread; p++)
-		fd_cache_local[p].first = fd_cache_local[p].last = -1;
 
 	do {
 		bp = NULL;
diff --git a/src/haproxy.c b/src/haproxy.c
index dfd2819e1..4d58c5321 100644
--- a/src/haproxy.c
+++ b/src/haproxy.c
@@ -2496,9 +2496,7 @@ static void run_poll_loop()
 
 			/* expire immediately if events are pending */
 			wake = 1;
-			if (fd_cache_mask & tid_bit)
-				activity[tid].wake_cache++;
-			else if (thread_has_tasks())
+			if (thread_has_tasks())
 				activity[tid].wake_tasks++;
 			else if (signal_queue_len && tid == 0)
 				activity[tid].wake_signal++;
@@ -2514,9 +2512,6 @@ static void run_poll_loop()
 
 		/* The poller will ensure it returns around <next> */
 		cur_poller.poll(&cur_poller, next, wake);
-		if (sleeping_thread_mask & tid_bit)
-			_HA_ATOMIC_AND(&sleeping_thread_mask, ~tid_bit);
-		fd_process_cached_events();
 
 		activity[tid].loops++;
 	}
diff --git a/src/mux_h1.c b/src/mux_h1.c
index 8de76d736..b8beaef24 100644
--- a/src/mux_h1.c
+++ b/src/mux_h1.c
@@ -48,7 +48,6 @@
 #define H1C_F_CS_ERROR       0x00001000 /* connection must be closed ASAP because an error occurred */
 #define H1C_F_CS_SHUTW_NOW   0x00002000 /* connection must be shut down for writes ASAP */
 #define H1C_F_CS_SHUTDOWN    0x00004000 /* connection is shut down for read and writes */
-#define H1C_F_CS_WAIT_CONN   0x00008000 /* waiting for the connection establishment */
 
 #define H1C_F_WAIT_NEXT_REQ  0x00010000 /* waiting for the next request to start, use keep-alive timeout */
 #define H1C_F_UPG_H2C        0x00020000 /* set if an upgrade to h2 should be done */
@@ -451,9 +450,6 @@ static int h1_init(struct connection *conn, struct proxy *proxy, struct session
 		t->expire = tick_add(now_ms, h1c->timeout);
 	}
 
-	if (!(conn->flags & CO_FL_CONNECTED) || (conn->flags & CO_FL_HANDSHAKE))
-		h1c->flags |= H1C_F_CS_WAIT_CONN;
-
 	/* Always Create a new H1S */
 	if (!h1s_create(h1c, conn->ctx, sess))
 		goto fail;
@@ -1894,11 +1890,6 @@ static int h1_recv(struct h1c *h1c)
 	if (h1c->wait_event.events & SUB_RETRY_RECV)
 		return (b_data(&h1c->ibuf));
 
-	if (!(conn->flags & CO_FL_ERROR) && h1c->flags & H1C_F_CS_WAIT_CONN) {
-		conn->xprt->subscribe(conn, conn->xprt_ctx, SUB_RETRY_RECV, &h1c->wait_event);
-		return 0;
-	}
-
 	if (!h1_recv_allowed(h1c)) {
 		rcvd = 1;
 		goto end;
@@ -1982,12 +1973,6 @@ static int h1_send(struct h1c *h1c)
 	if (conn->flags & CO_FL_ERROR)
 		return 0;
 
-	if (h1c->flags & H1C_F_CS_WAIT_CONN) {
-		if (!(h1c->wait_event.events & SUB_RETRY_SEND))
-			conn->xprt->subscribe(conn, conn->xprt_ctx, SUB_RETRY_SEND, &h1c->wait_event);
-		return 0;
-	}
-
 	if (!b_data(&h1c->obuf))
 		goto end;
 
@@ -2035,14 +2020,6 @@ static int h1_process(struct h1c * h1c)
 	if (!conn->ctx)
 		return -1;
 
-	if (h1c->flags & H1C_F_CS_WAIT_CONN) {
-		if (!(conn->flags & (CO_FL_CONNECTED|CO_FL_ERROR)) ||
-		    (!(conn->flags & CO_FL_ERROR) && (conn->flags & CO_FL_HANDSHAKE)))
-			goto end;
-		h1c->flags &= ~H1C_F_CS_WAIT_CONN;
-		h1_wake_stream_for_send(h1s);
-	}
-
 	if (!h1s) {
 		if (h1c->flags & (H1C_F_CS_ERROR|H1C_F_CS_SHUTDOWN) ||
 		    conn->flags & (CO_FL_ERROR | CO_FL_SOCK_WR_SH) ||
@@ -2103,10 +2080,7 @@ static struct task *h1_io_cb(struct task *t, void *ctx, unsigned short status)
 
 static void h1_reset(struct connection *conn)
 {
-	struct h1c *h1c = conn->ctx;
 
-	/* Reset the flags, and let the mux know we're waiting for a connection */
-	h1c->flags = H1C_F_CS_WAIT_CONN;
 }
 
 static int h1_wake(struct connection *conn)
@@ -2401,6 +2375,7 @@ static int h1_subscribe(struct conn_stream *cs, int event_type, void *param)
 {
 	struct wait_event *sw;
 	struct h1s *h1s = cs->ctx;
+	struct h1c *h1c = h1s->h1c;
 
 	if (!h1s)
 		return -1;
@@ -2417,6 +2392,17 @@ static int h1_subscribe(struct conn_stream *cs, int event_type, void *param)
 			BUG_ON(h1s->send_wait != NULL || (sw->events & SUB_RETRY_SEND));
 			sw->events |= SUB_RETRY_SEND;
 			h1s->send_wait = sw;
+			/*
+			 * If the conn_stream attempts to subscribe, and the
+			 * mux isn't subscribed to the connection, then it
+			 * probably means the connection wasn't established
+			 * yet, so we have to subscribe.
+			 */
+			if (!(h1c->wait_event.events & SUB_RETRY_SEND))
+				h1c->conn->xprt->subscribe(h1c->conn,
+				                           h1c->conn->xprt_ctx,
+				                           SUB_RETRY_SEND,
+				                           &h1c->wait_event);
 			return 0;
 		default:
 			break;
@@ -2461,7 +2447,13 @@ static size_t h1_snd_buf(struct conn_stream *cs, struct buffer *buf, size_t coun
 		return 0;
 	h1c = h1s->h1c;
-	if (h1c->flags & H1C_F_CS_WAIT_CONN)
+
+	/* If we're not connected yet, or we're waiting for a handshake, stop
+	 * now, as we don't want to remove everything from the channel buffer
+	 * before we're sure we can send it.
+	 */
+	if (!(h1c->conn->flags & CO_FL_CONNECTED) ||
+	    (h1c->conn->flags & CO_FL_HANDSHAKE))
 		return 0;
 
 	while (count) {
diff --git a/src/session.c b/src/session.c
index 7def38734..7b2564e8c 100644
--- a/src/session.c
+++ b/src/session.c
@@ -175,7 +175,6 @@ int session_accept_fd(struct listener *l, int cfd, struct sockaddr_storage *addr
 	if (l->options & LI_O_ACC_CIP)
 		cli_conn->flags |= CO_FL_ACCEPT_CIP;
 
-	conn_xprt_want_recv(cli_conn);
 	if (conn_xprt_init(cli_conn) < 0)
 		goto out_free_conn;
 
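
Note on the resulting design: with the event cache gone, an FD's readiness lives only in fdtab[].state, fd_update_events() invokes fdtab[fd].iocb(fd) directly once the poller reports activity, and fd_want_recv()/fd_want_send() always subscribe to polling instead of speculatively marking the FD ready first. The standalone sketch below models the receive-side transitions after this patch; the EV_* values and helper names are illustrative assumptions, not HAProxy's real FD_EV_* definitions.

#include <stdio.h>

/* Illustrative bit values; HAProxy's real FD_EV_* constants differ. */
#define EV_ACTIVE_R 0x01 /* the application wants to receive */
#define EV_POLLED_R 0x02 /* the FD is subscribed to the poller */
#define EV_READY_R  0x04 /* input reported and no EAGAIN seen since */

/* post-patch fd_want_recv(): enabling reception always subscribes to
 * polling, since there is no cache left to simulate readiness */
static unsigned char want_recv(unsigned char st)
{
	if (st & EV_ACTIVE_R)
		return st; /* already enabled, nothing to do */
	return st | EV_ACTIVE_R | EV_POLLED_R;
}

/* fd_may_recv(): the poller reported input; marking ready never changes
 * the polled status */
static unsigned char may_recv(unsigned char st)
{
	return st | EV_READY_R;
}

/* fd_cant_recv(): recv() hit EAGAIN, so drop readiness and let the still
 * subscribed poller report it again later */
static unsigned char cant_recv(unsigned char st)
{
	return st & ~EV_READY_R;
}

int main(void)
{
	unsigned char st = 0;

	st = want_recv(st); /* 0x03: ACTIVE|POLLED, waiting for the poller */
	st = may_recv(st);  /* 0x07: input reported, the iocb would run now */
	st = cant_recv(st); /* 0x03: EAGAIN, back to waiting for the poller */
	printf("final state: 0x%02x\n", st);
	return 0;
}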
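
The sleeping_thread_mask bookkeeping moves the same way: each poller clears its own bit immediately after the polling syscall returns, rather than run_poll_loop() doing it one step later, so peers deciding whether a wake-up is needed observe the thread as awake as early as possible. A minimal sketch of that pattern, using C11 atomics as a hypothetical stand-in for the _HA_ATOMIC_AND macro:

#include <stdatomic.h>

/* hypothetical stand-in for HAProxy's global sleeping_thread_mask */
static _Atomic unsigned long sleeping_thread_mask;

/* called right after poll()/epoll_wait()/kevent() returns */
static void mark_thread_awake(unsigned long tid_bit)
{
	/* test first to avoid a useless atomic write on the hot path */
	if (atomic_load(&sleeping_thread_mask) & tid_bit)
		atomic_fetch_and(&sleeping_thread_mask, ~tid_bit);
}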