diff --git a/src/queue.c b/src/queue.c index 2c3b7215f..fe082def4 100644 --- a/src/queue.c +++ b/src/queue.c @@ -254,11 +254,14 @@ static struct pendconn *pendconn_first(struct eb_root *pendconns) * to . * * This function must only be called if the server queue _AND_ the proxy queue - * are locked. Today it is only called by process_srv_queue. When a pending - * connection is dequeued, this function returns 1 if the pending connection can - * be handled by the current thread, else it returns 2. + * are locked. Today it is only called by process_srv_queue. + * + * The function returns the dequeued pendconn on success or NULL if none is + * available. It's up to the caller to add the corresponding stream to the + * server's list, to update the LB algo, update ->served, and to wake up the + * stream's task. */ -static int pendconn_process_next_strm(struct server *srv, struct proxy *px) +static struct pendconn *pendconn_process_next_strm(struct server *srv, struct proxy *px) { struct pendconn *p = NULL; struct pendconn *pp = NULL; @@ -281,7 +284,7 @@ static int pendconn_process_next_strm(struct server *srv, struct proxy *px) pp = pendconn_first(&px->pendconns); if (!p && !pp) - return 0; + return NULL; else if (!pp) goto use_p; /* p != NULL */ else if (!p) @@ -323,17 +326,8 @@ static int pendconn_process_next_strm(struct server *srv, struct proxy *px) unlinked: p->strm_flags |= SF_ASSIGNED; p->target = srv; - - _HA_ATOMIC_INC(&srv->served); - _HA_ATOMIC_INC(&srv->proxy->served); __ha_barrier_atomic_store(); - if (px->lbprm.server_take_conn) - px->lbprm.server_take_conn(srv, 1); - stream_add_srv_conn(p->strm, srv); - - task_wakeup(p->strm->task, TASK_WOKEN_RES); - - return 1; + return p; } /* Manages a server's connection queue. This function will try to dequeue as @@ -343,6 +337,7 @@ static int pendconn_process_next_strm(struct server *srv, struct proxy *px) void process_srv_queue(struct server *s, int server_locked) { struct proxy *p = s->proxy; + int done = 0; int maxconn; if (!server_locked) @@ -350,13 +345,26 @@ void process_srv_queue(struct server *s, int server_locked) HA_RWLOCK_WRLOCK(PROXY_LOCK, &p->lock); maxconn = srv_dynamic_maxconn(s); while (s->served < maxconn) { - int ret = pendconn_process_next_strm(s, p); - if (!ret) + struct pendconn *pc; + + pc = pendconn_process_next_strm(s, p); + if (!pc) break; + + done = 1; + + _HA_ATOMIC_INC(&s->served); + _HA_ATOMIC_INC(&p->served); + + stream_add_srv_conn(pc->strm, s); + task_wakeup(pc->strm->task, TASK_WOKEN_RES); } HA_RWLOCK_WRUNLOCK(PROXY_LOCK, &p->lock); if (!server_locked) HA_SPIN_UNLOCK(SERVER_LOCK, &s->lock); + + if (done && p->lbprm.server_take_conn) + p->lbprm.server_take_conn(s, server_locked); } /* Adds the stream to the pending connection queue of server ->srv