diff --git a/include/proto/applet.h b/include/proto/applet.h index 7cc2c0ad1..00ad63b6a 100644 --- a/include/proto/applet.h +++ b/include/proto/applet.h @@ -28,14 +28,11 @@ #include #include #include +#include extern unsigned int nb_applets; -extern unsigned long active_applets_mask; -extern unsigned int applets_active_queue; -__decl_hathreads(extern HA_SPINLOCK_T applet_active_lock); -extern struct list applet_active_queue; -void applet_run_active(); +struct task *task_run_applet(struct task *t, void *context, unsigned short state); static int inline appctx_res_wakeup(struct appctx *appctx); @@ -52,7 +49,7 @@ static inline void appctx_init(struct appctx *appctx, unsigned long thread_mask) appctx->chunk = NULL; appctx->io_release = NULL; appctx->thread_mask = thread_mask; - appctx->state = APPLET_SLEEPING; + appctx->state = 0; } /* Tries to allocate a new appctx and initialize its main fields. The appctx @@ -69,7 +66,13 @@ static inline struct appctx *appctx_new(struct applet *applet, unsigned long thr appctx->obj_type = OBJ_TYPE_APPCTX; appctx->applet = applet; appctx_init(appctx, thread_mask); - LIST_INIT(&appctx->runq); + appctx->t = task_new(thread_mask); + if (unlikely(appctx->t == NULL)) { + pool_free(pool_head_connection, appctx); + return NULL; + } + appctx->t->process = task_run_applet; + appctx->t->context = appctx; LIST_INIT(&appctx->buffer_wait.list); appctx->buffer_wait.target = appctx; appctx->buffer_wait.wakeup_cb = (int (*)(void *))appctx_res_wakeup; @@ -83,11 +86,8 @@ static inline struct appctx *appctx_new(struct applet *applet, unsigned long thr */ static inline void __appctx_free(struct appctx *appctx) { - if (!LIST_ISEMPTY(&appctx->runq)) { - LIST_DEL(&appctx->runq); - applets_active_queue--; - } - + task_delete(appctx->t); + task_free(appctx->t); if (!LIST_ISEMPTY(&appctx->buffer_wait.list)) { HA_SPIN_LOCK(BUF_WQ_LOCK, &buffer_wq_lock); LIST_DEL(&appctx->buffer_wait.list); @@ -98,38 +98,27 @@ static inline void __appctx_free(struct appctx *appctx) pool_free(pool_head_connection, appctx); HA_ATOMIC_SUB(&nb_applets, 1); } + static inline void appctx_free(struct appctx *appctx) { - HA_SPIN_LOCK(APPLETS_LOCK, &applet_active_lock); - if (appctx->state & APPLET_RUNNING) { + /* The task is supposed to be run on this thread, so we can just + * check if it's running already (or about to run) or not + */ + if (!(appctx->t->state & TASK_RUNNING)) + __appctx_free(appctx); + else { + /* if it's running, or about to run, defer the freeing + * until the callback is called. + */ appctx->state |= APPLET_WANT_DIE; - HA_SPIN_UNLOCK(APPLETS_LOCK, &applet_active_lock); - return; + task_wakeup(appctx->t, TASK_WOKEN_OTHER); } - __appctx_free(appctx); - HA_SPIN_UNLOCK(APPLETS_LOCK, &applet_active_lock); } /* wakes up an applet when conditions have changed */ -static inline void __appctx_wakeup(struct appctx *appctx) -{ - if (LIST_ISEMPTY(&appctx->runq)) { - LIST_ADDQ(&applet_active_queue, &appctx->runq); - applets_active_queue++; - active_applets_mask |= appctx->thread_mask; - } -} - static inline void appctx_wakeup(struct appctx *appctx) { - HA_SPIN_LOCK(APPLETS_LOCK, &applet_active_lock); - if (appctx->state & APPLET_RUNNING) { - appctx->state |= APPLET_WOKEN_UP; - HA_SPIN_UNLOCK(APPLETS_LOCK, &applet_active_lock); - return; - } - __appctx_wakeup(appctx); - HA_SPIN_UNLOCK(APPLETS_LOCK, &applet_active_lock); + task_wakeup(appctx->t, TASK_WOKEN_OTHER); } /* Callback used to wake up an applet when a buffer is available. The applet @@ -139,19 +128,17 @@ static inline void appctx_wakeup(struct appctx *appctx) * requested */ static inline int appctx_res_wakeup(struct appctx *appctx) { - HA_SPIN_LOCK(APPLETS_LOCK, &applet_active_lock); - if (appctx->state & APPLET_RUNNING) { - if (appctx->state & APPLET_WOKEN_UP) { - HA_SPIN_UNLOCK(APPLETS_LOCK, &applet_active_lock); - return 0; - } - appctx->state |= APPLET_WOKEN_UP; - HA_SPIN_UNLOCK(APPLETS_LOCK, &applet_active_lock); - return 1; - } - __appctx_wakeup(appctx); - HA_SPIN_UNLOCK(APPLETS_LOCK, &applet_active_lock); - return 1; + int ret; + + /* To detect if we have already been waken or not, we now that + * if the state contains TASK_RUNNING, but not just TASK_RUNNING. + * This is racy, but that's OK. At worst we will wake a little more + * tasks than necessary when a buffer is available. + */ + ret = ((appctx->state & TASK_RUNNING) != 0) && + ((appctx->state != TASK_RUNNING)); + task_wakeup(appctx->t, TASK_WOKEN_OTHER); + return ret; } diff --git a/include/proto/channel.h b/include/proto/channel.h index 274495f28..d66fc911b 100644 --- a/include/proto/channel.h +++ b/include/proto/channel.h @@ -36,7 +36,6 @@ #include #include -#include #include /* perform minimal intializations, report 0 in case of error, 1 if OK. */ @@ -456,7 +455,7 @@ static inline void channel_release_buffer(struct channel *chn, struct buffer_wai { if (chn->buf->size && buffer_empty(chn->buf)) { b_free(&chn->buf); - offer_buffers(wait->target, tasks_run_queue + applets_active_queue); + offer_buffers(wait->target, tasks_run_queue); } } diff --git a/include/types/applet.h b/include/types/applet.h index b0715866e..8b7c28fc0 100644 --- a/include/types/applet.h +++ b/include/types/applet.h @@ -45,17 +45,13 @@ struct applet { unsigned int timeout; /* execution timeout. */ }; -#define APPLET_SLEEPING 0x00 /* applet is currently sleeping or pending in active queue */ -#define APPLET_RUNNING 0x01 /* applet is currently running */ -#define APPLET_WOKEN_UP 0x02 /* applet was running and requested to woken up again */ -#define APPLET_WANT_DIE 0x04 /* applet was running and requested to die */ +#define APPLET_WANT_DIE 0x01 /* applet was running and requested to die */ #define APPCTX_CLI_ST1_PROMPT (1 << 0) #define APPCTX_CLI_ST1_PAYLOAD (1 << 1) /* Context of a running applet. */ struct appctx { - struct list runq; /* chaining in the applet run queue */ enum obj_type obj_type; /* OBJ_TYPE_APPCTX */ /* 3 unused bytes here */ unsigned short state; /* Internal appctx state */ @@ -72,6 +68,7 @@ struct appctx { int cli_severity_output; /* used within the cli_io_handler to format severity output of informational feedback */ struct buffer_wait buffer_wait; /* position in the list of objects waiting for a buffer */ unsigned long thread_mask; /* mask of thread IDs authorized to process the applet */ + struct task *t; /* task associated to the applet */ union { struct { diff --git a/include/types/global.h b/include/types/global.h index d603c4261..2ab124b62 100644 --- a/include/types/global.h +++ b/include/types/global.h @@ -183,7 +183,6 @@ struct activity { unsigned int loops; // complete loops in run_poll_loop() unsigned int wake_cache; // active fd_cache prevented poll() from sleeping unsigned int wake_tasks; // active tasks prevented poll() from sleeping - unsigned int wake_applets; // active applets prevented poll() from sleeping unsigned int wake_signal; // pending signal prevented poll() from sleeping unsigned int poll_exp; // number of times poll() sees an expired timeout (includes wake_*) unsigned int poll_drop; // poller dropped a dead FD from the update list diff --git a/src/applet.c b/src/applet.c index 77f984d91..d7fbb53eb 100644 --- a/src/applet.c +++ b/src/applet.c @@ -19,100 +19,36 @@ #include #include #include +#include unsigned int nb_applets = 0; -unsigned long active_applets_mask = 0; -unsigned int applets_active_queue = 0; -__decl_hathreads(HA_SPINLOCK_T applet_active_lock); /* spin lock related to applet active queue */ -struct list applet_active_queue = LIST_HEAD_INIT(applet_active_queue); - -void applet_run_active() +struct task *task_run_applet(struct task *t, void *context, unsigned short state) { - struct appctx *curr, *next; - struct stream_interface *si; - struct list applet_cur_queue = LIST_HEAD_INIT(applet_cur_queue); - int max_processed; + struct appctx *app = context; + struct stream_interface *si = app->owner; - max_processed = applets_active_queue; - if (max_processed > 200) - max_processed = 200; - - HA_SPIN_LOCK(APPLETS_LOCK, &applet_active_lock); - if (!(active_applets_mask & tid_bit)) { - HA_SPIN_UNLOCK(APPLETS_LOCK, &applet_active_lock); - return; + if (app->state & APPLET_WANT_DIE) { + __appctx_free(app); + return NULL; } - active_applets_mask &= ~tid_bit; - curr = LIST_NEXT(&applet_active_queue, typeof(curr), runq); - while (&curr->runq != &applet_active_queue) { - next = LIST_NEXT(&curr->runq, typeof(next), runq); - if (curr->thread_mask & tid_bit) { - LIST_DEL(&curr->runq); - curr->state = APPLET_RUNNING; - LIST_ADDQ(&applet_cur_queue, &curr->runq); - applets_active_queue--; - max_processed--; - } - curr = next; - if (max_processed <= 0) { - active_applets_mask |= tid_bit; - break; - } - } - HA_SPIN_UNLOCK(APPLETS_LOCK, &applet_active_lock); + /* Now we'll try to allocate the input buffer. We wake up the + * applet in all cases. So this is the applet responsibility to + * check if this buffer was allocated or not. This let a chance + * for applets to do some other processing if needed. */ + if (!channel_alloc_buffer(si_ic(si), &app->buffer_wait)) + si_applet_cant_put(si); - /* The list is only scanned from the head. This guarantees that if any - * applet removes another one, there is no side effect while walking - * through the list. + /* We always pretend the applet can't get and doesn't want to + * put, it's up to it to change this if needed. This ensures + * that one applet which ignores any event will not spin. */ - while (!LIST_ISEMPTY(&applet_cur_queue)) { - curr = LIST_ELEM(applet_cur_queue.n, typeof(curr), runq); - si = curr->owner; + si_applet_cant_get(si); + si_applet_stop_put(si); - /* Now we'll try to allocate the input buffer. We wake up the - * applet in all cases. So this is the applet responsibility to - * check if this buffer was allocated or not. This let a chance - * for applets to do some other processing if needed. */ - if (!channel_alloc_buffer(si_ic(si), &curr->buffer_wait)) - si_applet_cant_put(si); - - /* We always pretend the applet can't get and doesn't want to - * put, it's up to it to change this if needed. This ensures - * that one applet which ignores any event will not spin. - */ - si_applet_cant_get(si); - si_applet_stop_put(si); - - curr->applet->fct(curr); - si_applet_wake_cb(si); - channel_release_buffer(si_ic(si), &curr->buffer_wait); - - if (applet_cur_queue.n == &curr->runq) { - /* curr was left in the list, move it back to the active list */ - LIST_DEL(&curr->runq); - LIST_INIT(&curr->runq); - HA_SPIN_LOCK(APPLETS_LOCK, &applet_active_lock); - if (curr->state & APPLET_WANT_DIE) { - curr->state = APPLET_SLEEPING; - __appctx_free(curr); - } - else { - if (curr->state & APPLET_WOKEN_UP) { - curr->state = APPLET_SLEEPING; - __appctx_wakeup(curr); - } - else { - curr->state = APPLET_SLEEPING; - } - } - HA_SPIN_UNLOCK(APPLETS_LOCK, &applet_active_lock); - } - } + app->applet->fct(app); + si_applet_wake_cb(si); + channel_release_buffer(si_ic(si), &app->buffer_wait); + return t; } -__attribute__((constructor)) -static void __applet_init(void) -{ - HA_SPIN_INIT(&applet_active_lock); -} diff --git a/src/cfgparse.c b/src/cfgparse.c index 023973f40..3d224b9be 100644 --- a/src/cfgparse.c +++ b/src/cfgparse.c @@ -84,6 +84,7 @@ #include #include #include +#include /* This is the SSLv3 CLIENT HELLO packet used in conjunction with the diff --git a/src/cli.c b/src/cli.c index 4adf68401..c34078f98 100644 --- a/src/cli.c +++ b/src/cli.c @@ -953,7 +953,6 @@ static int cli_io_handler_show_activity(struct appctx *appctx) chunk_appendf(&trash, "\nloops:"); for (thr = 0; thr < global.nbthread; thr++) chunk_appendf(&trash, " %u", activity[thr].loops); chunk_appendf(&trash, "\nwake_cache:"); for (thr = 0; thr < global.nbthread; thr++) chunk_appendf(&trash, " %u", activity[thr].wake_cache); chunk_appendf(&trash, "\nwake_tasks:"); for (thr = 0; thr < global.nbthread; thr++) chunk_appendf(&trash, " %u", activity[thr].wake_tasks); - chunk_appendf(&trash, "\nwake_applets:"); for (thr = 0; thr < global.nbthread; thr++) chunk_appendf(&trash, " %u", activity[thr].wake_applets); chunk_appendf(&trash, "\nwake_signal:"); for (thr = 0; thr < global.nbthread; thr++) chunk_appendf(&trash, " %u", activity[thr].wake_signal); chunk_appendf(&trash, "\npoll_exp:"); for (thr = 0; thr < global.nbthread; thr++) chunk_appendf(&trash, " %u", activity[thr].poll_exp); chunk_appendf(&trash, "\npoll_drop:"); for (thr = 0; thr < global.nbthread; thr++) chunk_appendf(&trash, " %u", activity[thr].poll_drop); diff --git a/src/flt_spoe.c b/src/flt_spoe.c index 856050d07..c25630301 100644 --- a/src/flt_spoe.c +++ b/src/flt_spoe.c @@ -2840,8 +2840,7 @@ spoe_release_buffer(struct buffer **buf, struct buffer_wait *buffer_wait) /* Release the buffer if needed */ if ((*buf)->size) { b_free(buf); - offer_buffers(buffer_wait->target, - tasks_run_queue + applets_active_queue); + offer_buffers(buffer_wait->target, tasks_run_queue); } } diff --git a/src/haproxy.c b/src/haproxy.c index 4e61e4056..766758f62 100644 --- a/src/haproxy.c +++ b/src/haproxy.c @@ -90,7 +90,6 @@ #include #include -#include #include #include #include @@ -2419,8 +2418,6 @@ static void run_poll_loop() activity[tid].wake_cache++; else if (active_tasks_mask & tid_bit) activity[tid].wake_tasks++; - else if (active_applets_mask & tid_bit) - activity[tid].wake_applets++; else if (signal_queue_len) activity[tid].wake_signal++; else @@ -2429,7 +2426,6 @@ static void run_poll_loop() /* The poller will ensure it returns around */ cur_poller.poll(&cur_poller, exp); fd_process_cached_events(); - applet_run_active(); /* Synchronize all polling loops */ diff --git a/src/mux_h2.c b/src/mux_h2.c index 9c7b8284c..57b172250 100644 --- a/src/mux_h2.c +++ b/src/mux_h2.c @@ -17,7 +17,6 @@ #include #include #include -#include #include #include #include @@ -303,8 +302,7 @@ static inline void h2_release_buf(struct h2c *h2c, struct buffer **bptr) { if ((*bptr)->size) { b_free(bptr); - offer_buffers(h2c->buf_wait.target, - tasks_run_queue + applets_active_queue); + offer_buffers(h2c->buf_wait.target, tasks_run_queue); } } diff --git a/src/stream.c b/src/stream.c index 3ea89538c..7ad84e993 100644 --- a/src/stream.c +++ b/src/stream.c @@ -339,7 +339,7 @@ static void stream_free(struct stream *s) if (s->req.buf->size || s->res.buf->size) { b_drop(&s->req.buf); b_drop(&s->res.buf); - offer_buffers(NULL, tasks_run_queue + applets_active_queue); + offer_buffers(NULL, tasks_run_queue); } hlua_ctx_destroy(s->hlua); @@ -469,7 +469,7 @@ void stream_release_buffers(struct stream *s) * someone waiting, we can wake up a waiter and offer them. */ if (offer) - offer_buffers(s, tasks_run_queue + applets_active_queue); + offer_buffers(s, tasks_run_queue); } /* perform minimal intializations, report 0 in case of error, 1 if OK. */