diff --git a/src/queue.c b/src/queue.c index aa40fba72..7cad813bf 100644 --- a/src/queue.c +++ b/src/queue.c @@ -81,24 +81,28 @@ static void pendconn_unlink(struct pendconn *p) } /* Process the next pending connection from either a server or a proxy, and - * returns 0 on success. If no pending connection is found, 1 is returned. - * Note that neither nor may be NULL. Priority is given to the - * oldest request in the queue if both and have pending - * requests. This ensures that no request will be left unserved. The queue - * is not considered if the server (or a tracked server) is not RUNNING, is - * disabled, or has a null weight (server going down). The queue is still - * considered in this case, because if some connections remain there, it means - * that some requests have been forced there after it was seen down (eg: due to - * option persist). The stream is immediately marked as "assigned", and both - * its and are set to . + * returns a strictly positive value on success (see below). If no pending + * connection is found, 0 is returned. Note that neither nor may be + * NULL. Priority is given to the oldest request in the queue if both and + * have pending requests. This ensures that no request will be left + * unserved. The queue is not considered if the server (or a tracked + * server) is not RUNNING, is disabled, or has a null weight (server going + * down). The queue is still considered in this case, because if some + * connections remain there, it means that some requests have been forced there + * after it was seen down (eg: due to option persist). The stream is + * immediately marked as "assigned", and both its and are set + * to . * * This function must only be called if the server queue _AND_ the proxy queue - * are locked. Today it is only called by process_srv_queue. + * are locked. Today it is only called by process_srv_queue. When a pending + * connection is dequeued, this function returns 1 if the pending connection can + * be handled by the current thread, else it returns 2. */ static int pendconn_process_next_strm(struct server *srv, struct proxy *px) { struct pendconn *p = NULL; struct server *rsrv; + int remote; rsrv = srv->track; if (!rsrv) @@ -135,7 +139,7 @@ static int pendconn_process_next_strm(struct server *srv, struct proxy *px) } if (!p) - return 1; + return 0; pendconn_found: pendconn_unlink(p); @@ -148,9 +152,12 @@ static int pendconn_process_next_strm(struct server *srv, struct proxy *px) px->lbprm.server_take_conn(srv); __stream_add_srv_conn(p->strm, srv); + remote = !(p->strm->task->thread_mask & tid_bit); task_wakeup(p->strm->task, TASK_WOKEN_RES); HA_SPIN_UNLOCK(PENDCONN_LOCK, &p->lock); - return 0; + + /* Returns 1 if the current thread can process the stream, otherwise returns 2. */ + return remote ? 2 : 1; } /* Manages a server's connection queue. This function will try to dequeue as @@ -159,17 +166,22 @@ static int pendconn_process_next_strm(struct server *srv, struct proxy *px) void process_srv_queue(struct server *s) { struct proxy *p = s->proxy; - int maxconn; + int maxconn, remote = 0; HA_SPIN_LOCK(PROXY_LOCK, &p->lock); HA_SPIN_LOCK(SERVER_LOCK, &s->lock); maxconn = srv_dynamic_maxconn(s); while (s->served < maxconn) { - if (pendconn_process_next_strm(s, p)) + int ret = pendconn_process_next_strm(s, p); + if (!ret) break; + remote |= (ret == 2); } HA_SPIN_UNLOCK(SERVER_LOCK, &s->lock); HA_SPIN_UNLOCK(PROXY_LOCK, &p->lock); + + if (remote) + thread_want_sync(); } /* Adds the stream to the pending connection list of server ->srv @@ -231,6 +243,7 @@ int pendconn_redistribute(struct server *s) { struct pendconn *p, *pback; int xferred = 0; + int remote = 0; /* The REDISP option was specified. We will ignore cookie and force to * balance or use the dispatcher. */ @@ -249,10 +262,15 @@ int pendconn_redistribute(struct server *s) pendconn_unlink(p); p->strm_flags &= ~(SF_DIRECT | SF_ASSIGNED | SF_ADDR_SET); + remote |= !(p->strm->task->thread_mask & tid_bit); task_wakeup(p->strm->task, TASK_WOKEN_RES); HA_SPIN_UNLOCK(PENDCONN_LOCK, &p->lock); } HA_SPIN_UNLOCK(SERVER_LOCK, &s->lock); + + if (remote) + thread_want_sync(); + return xferred; } @@ -265,6 +283,7 @@ int pendconn_grab_from_px(struct server *s) { struct pendconn *p, *pback; int maxconn, xferred = 0; + int remote = 0; if (!srv_currently_usable(s)) return 0; @@ -281,11 +300,16 @@ int pendconn_grab_from_px(struct server *s) pendconn_unlink(p); p->srv = s; + remote |= !(p->strm->task->thread_mask & tid_bit); task_wakeup(p->strm->task, TASK_WOKEN_RES); HA_SPIN_UNLOCK(PENDCONN_LOCK, &p->lock); xferred++; } HA_SPIN_UNLOCK(PROXY_LOCK, &s->proxy->lock); + + if (remote) + thread_want_sync(); + return xferred; }