diff --git a/include/types/global.h b/include/types/global.h index a7f22ac7c..94e21a28b 100644 --- a/include/types/global.h +++ b/include/types/global.h @@ -124,6 +124,7 @@ extern int stopping; /* non zero means stopping in progress */ extern char hostname[MAX_HOSTNAME_LEN]; extern char localpeer[MAX_HOSTNAME_LEN]; extern struct list global_listener_queue; /* list of the temporarily limited listeners */ +extern struct task *global_listener_queue_task; #endif /* _TYPES_GLOBAL_H */ diff --git a/src/haproxy.c b/src/haproxy.c index 26fd021ef..48c5741c8 100644 --- a/src/haproxy.c +++ b/src/haproxy.c @@ -161,6 +161,8 @@ char localpeer[MAX_HOSTNAME_LEN]; /* list of the temporarily limited listeners because of lack of resource */ struct list global_listener_queue = LIST_HEAD_INIT(global_listener_queue); +struct task *global_listener_queue_task; +static struct task *manage_global_listener_queue(struct task *t); /*********************************************************************/ /* general purpose functions ***************************************/ @@ -546,6 +548,16 @@ void init(int argc, char **argv) exit(0); } + global_listener_queue_task = task_new(); + if (!global_listener_queue_task) { + Alert("Out of memory when initializing global task\n"); + exit(1); + } + /* very simple initialization, users will queue the task if needed */ + global_listener_queue_task->context = NULL; /* not even a context! */ + global_listener_queue_task->process = manage_global_listener_queue; + global_listener_queue_task->expire = TICK_ETERNITY; + /* now we know the buffer size, we can initialize the buffers */ init_buffer(); @@ -957,6 +969,7 @@ void deinit(void) free(global.desc); global.desc = NULL; free(fdtab); fdtab = NULL; free(oldpids); oldpids = NULL; + free(global_listener_queue_task); global_listener_queue_task = NULL; list_for_each_entry_safe(wl, wlb, &cfg_cfgfiles, list) { LIST_DEL(&wl->list); @@ -1018,6 +1031,39 @@ void run_poll_loop() } } +/* This is the global management task for listeners. It enables listeners waiting + * for global resources when there are enough free resource, or at least once in + * a while. It is designed to be called as a task. + */ +static struct task *manage_global_listener_queue(struct task *t) +{ + int next = TICK_ETERNITY; + fprintf(stderr, "coucou!\n"); + /* queue is empty, nothing to do */ + if (LIST_ISEMPTY(&global_listener_queue)) + goto out; + + /* If there are still too many concurrent connections, let's wait for + * some of them to go away. We don't need to re-arm the timer because + * each of them will scan the queue anyway. + */ + if (unlikely(actconn >= global.maxconn)) + goto out; + + /* We should periodically try to enable listeners waiting for a global + * resource here, because it is possible, though very unlikely, that + * they have been blocked by a temporary lack of global resource such + * as a file descriptor or memory and that the temporary condition has + * disappeared. + */ + if (!LIST_ISEMPTY(&global_listener_queue)) + dequeue_all_listeners(&global_listener_queue); + + out: + t->expire = next; + task_queue(t); + return t; +} int main(int argc, char **argv) { diff --git a/src/stream_sock.c b/src/stream_sock.c index d69c15395..61947ebfa 100644 --- a/src/stream_sock.c +++ b/src/stream_sock.c @@ -1212,12 +1212,18 @@ int stream_sock_accept(int fd) max_accept = max; } + /* Note: if we fail to allocate a connection because of configured + * limits, we'll schedule a new attempt worst 1 second later in the + * worst case. If we fail due to system limits or temporary resource + * shortage, we try again 100ms later in the worst case. + */ while (max_accept--) { struct sockaddr_storage addr; socklen_t laddr = sizeof(addr); if (unlikely(actconn >= global.maxconn)) { limit_listener(l, &global_listener_queue); + task_schedule(global_listener_queue_task, tick_add(now_ms, 1000)); /* try again in 1 second */ return 0; } @@ -1239,6 +1245,7 @@ int stream_sock_accept(int fd) "Proxy %s reached system FD limit at %d. Please check system tunables.\n", p->id, maxfd); limit_listener(l, &global_listener_queue); + task_schedule(global_listener_queue_task, tick_add(now_ms, 100)); /* try again in 100 ms */ return 0; case EMFILE: if (p) @@ -1246,6 +1253,7 @@ int stream_sock_accept(int fd) "Proxy %s reached process FD limit at %d. Please check 'ulimit-n' and restart.\n", p->id, maxfd); limit_listener(l, &global_listener_queue); + task_schedule(global_listener_queue_task, tick_add(now_ms, 100)); /* try again in 100 ms */ return 0; case ENOBUFS: case ENOMEM: @@ -1254,6 +1262,7 @@ int stream_sock_accept(int fd) "Proxy %s reached system memory limit at %d sockets. Please check system tunables.\n", p->id, maxfd); limit_listener(l, &global_listener_queue); + task_schedule(global_listener_queue_task, tick_add(now_ms, 100)); /* try again in 100 ms */ return 0; default: return 0; @@ -1266,6 +1275,7 @@ int stream_sock_accept(int fd) p->id); close(cfd); limit_listener(l, &global_listener_queue); + task_schedule(global_listener_queue_task, tick_add(now_ms, 1000)); /* try again in 1 second */ return 0; } @@ -1293,6 +1303,7 @@ int stream_sock_accept(int fd) continue; limit_listener(l, &global_listener_queue); + task_schedule(global_listener_queue_task, tick_add(now_ms, 100)); /* try again in 100 ms */ return 0; }