diff --git a/include/common/hathreads.h b/include/common/hathreads.h index 4cf3db9be..574547f36 100644 --- a/include/common/hathreads.h +++ b/include/common/hathreads.h @@ -135,6 +135,27 @@ static inline void __ha_barrier_full(void) { } +static inline void thread_harmless_now() +{ +} + +static inline void thread_harmless_end() +{ +} + +static inline void thread_isolate() +{ +} + +static inline void thread_release() +{ +} + +static inline unsigned long thread_isolated() +{ + return 1; +} + #else /* USE_THREAD */ #include @@ -272,10 +293,34 @@ void thread_enter_sync(void); void thread_exit_sync(void); int thread_no_sync(void); int thread_need_sync(void); +void thread_harmless_till_end(); +void thread_isolate(); +void thread_release(); extern THREAD_LOCAL unsigned int tid; /* The thread id */ extern THREAD_LOCAL unsigned long tid_bit; /* The bit corresponding to the thread id */ extern volatile unsigned long all_threads_mask; +extern volatile unsigned long threads_want_rdv_mask; +extern volatile unsigned long threads_harmless_mask; + +/* explanation for threads_want_rdv_mask and threads_harmless_mask : + * - threads_want_rdv_mask is a bit field indicating all threads that have + * requested a rendez-vous of other threads using thread_isolate(). + * - threads_harmless_mask is a bit field indicating all threads that are + * currently harmless in that they promise not to access a shared resource. + * + * For a given thread, its bits in want_rdv and harmless can be translated like + * this : + * + * ----------+----------+---------------------------------------------------- + * want_rdv | harmless | description + * ----------+----------+---------------------------------------------------- + * 0 | 0 | thread not interested in RDV, possibly harmful + * 0 | 1 | thread not interested in RDV but harmless + * 1 | 1 | thread interested in RDV and waiting for its turn + * 1 | 0 | thread currently working isolated from others + * ----------+----------+---------------------------------------------------- + */ #define ha_sigmask(how, set, oldset) pthread_sigmask(how, set, oldset) @@ -286,6 +331,38 @@ static inline void ha_set_tid(unsigned int data) tid_bit = (1UL << tid); } +/* Marks the thread as harmless. Note: this must be true, i.e. the thread must + * not be touching any unprotected shared resource during this period. Usually + * this is called before poll(), but it may also be placed around very slow + * calls (eg: some crypto operations). Needs to be terminated using + * thread_harmless_end(). + */ +static inline void thread_harmless_now() +{ + HA_ATOMIC_OR(&threads_harmless_mask, tid_bit); +} + +/* Ends the harmless period started by thread_harmless_now(). Usually this is + * placed after the poll() call. If it is discovered that a job was running and + * is relying on the thread still being harmless, the thread waits for the + * other one to finish. + */ +static inline void thread_harmless_end() +{ + while (1) { + HA_ATOMIC_AND(&threads_harmless_mask, ~tid_bit); + if (likely((threads_want_rdv_mask & all_threads_mask) == 0)) + break; + thread_harmless_till_end(); + } +} + +/* an isolated thread has harmless cleared and want_rdv set */ +static inline unsigned long thread_isolated() +{ + return threads_want_rdv_mask & ~threads_harmless_mask & tid_bit; +} + #if defined(DEBUG_THREAD) || defined(DEBUG_FULL) diff --git a/src/ev_epoll.c b/src/ev_epoll.c index abc22ba76..f672c66cb 100644 --- a/src/ev_epoll.c +++ b/src/ev_epoll.c @@ -17,6 +17,7 @@ #include #include #include +#include #include #include #include @@ -141,6 +142,8 @@ REGPRM2 static void _do_poll(struct poller *p, int exp) _update_fd(fd); } + thread_harmless_now(); + /* compute the epoll_wait() timeout */ if (!exp) wait_time = MAX_DELAY_MS; @@ -161,6 +164,8 @@ REGPRM2 static void _do_poll(struct poller *p, int exp) tv_update_date(wait_time, status); measure_idle(); + thread_harmless_end(); + /* process polled events */ for (count = 0; count < status; count++) { diff --git a/src/ev_kqueue.c b/src/ev_kqueue.c index bf7f666dc..087a07e78 100644 --- a/src/ev_kqueue.c +++ b/src/ev_kqueue.c @@ -19,6 +19,7 @@ #include #include +#include #include #include #include @@ -112,6 +113,8 @@ REGPRM2 static void _do_poll(struct poller *p, int exp) changes = _update_fd(fd, changes); } + thread_harmless_now(); + if (changes) { #ifdef EV_RECEIPT kev[0].flags |= EV_RECEIPT; @@ -154,6 +157,8 @@ REGPRM2 static void _do_poll(struct poller *p, int exp) tv_update_date(delta_ms, status); measure_idle(); + thread_harmless_end(); + for (count = 0; count < status; count++) { unsigned int n = 0; fd = kev[count].ident; diff --git a/src/ev_poll.c b/src/ev_poll.c index a2e8798e4..712bdf991 100644 --- a/src/ev_poll.c +++ b/src/ev_poll.c @@ -19,6 +19,7 @@ #include #include +#include #include #include @@ -156,6 +157,8 @@ REGPRM2 static void _do_poll(struct poller *p, int exp) break; } while (!HA_ATOMIC_CAS(&maxfd, &old_maxfd, new_maxfd)); + thread_harmless_now(); + fd_nbupdt = 0; nbfd = 0; @@ -207,6 +210,8 @@ REGPRM2 static void _do_poll(struct poller *p, int exp) tv_update_date(wait_time, status); measure_idle(); + thread_harmless_end(); + for (count = 0; status > 0 && count < nbfd; count++) { unsigned int n; int e = poll_events[count].revents; diff --git a/src/ev_select.c b/src/ev_select.c index 4890c49de..d248d6d44 100644 --- a/src/ev_select.c +++ b/src/ev_select.c @@ -16,6 +16,7 @@ #include #include +#include #include #include @@ -148,6 +149,8 @@ REGPRM2 static void _do_poll(struct poller *p, int exp) break; } while (!HA_ATOMIC_CAS(&maxfd, &old_maxfd, new_maxfd)); + thread_harmless_now(); + fd_nbupdt = 0; /* let's restore fdset state */ @@ -186,6 +189,8 @@ REGPRM2 static void _do_poll(struct poller *p, int exp) tv_update_date(delta_ms, status); measure_idle(); + thread_harmless_end(); + if (status <= 0) return; diff --git a/src/hathreads.c b/src/hathreads.c index 595a71717..69dcab9dc 100644 --- a/src/hathreads.c +++ b/src/hathreads.c @@ -30,6 +30,8 @@ void thread_sync_io_handler(int fd) static HA_SPINLOCK_T sync_lock; static int threads_sync_pipe[2]; static unsigned long threads_want_sync = 0; +volatile unsigned long threads_want_rdv_mask = 0; +volatile unsigned long threads_harmless_mask = 0; volatile unsigned long all_threads_mask = 1; // nbthread 1 assumed by default THREAD_LOCAL unsigned int tid = 0; THREAD_LOCAL unsigned long tid_bit = (1UL << 0); @@ -160,6 +162,68 @@ void thread_exit_sync() thread_sync_barrier(&barrier); } +/* Marks the thread as harmless until the last thread using the rendez-vous + * point quits. Given that we can wait for a long time, sched_yield() is used + * when available to offer the CPU resources to competing threads if needed. + */ +void thread_harmless_till_end() +{ + HA_ATOMIC_OR(&threads_harmless_mask, tid_bit); + while (threads_want_rdv_mask & all_threads_mask) { +#if _POSIX_PRIORITY_SCHEDULING + sched_yield(); +#else + pl_cpu_relax(); +#endif + } +} + +/* Isolates the current thread : request the ability to work while all other + * threads are harmless. Only returns once all of them are harmless, with the + * current thread's bit in threads_harmless_mask cleared. Needs to be completed + * using thread_release(). + */ +void thread_isolate() +{ + unsigned long old; + + HA_ATOMIC_OR(&threads_harmless_mask, tid_bit); + __ha_barrier_store(); + HA_ATOMIC_OR(&threads_want_rdv_mask, tid_bit); + + /* wait for all threads to become harmless */ + old = threads_harmless_mask; + while (1) { + if (unlikely((old & all_threads_mask) != all_threads_mask)) + old = threads_harmless_mask; + else if (HA_ATOMIC_CAS(&threads_harmless_mask, &old, old & ~tid_bit)) + break; + +#if _POSIX_PRIORITY_SCHEDULING + sched_yield(); +#else + pl_cpu_relax(); +#endif + } + /* one thread gets released at a time here, with its harmess bit off. + * The loss of this bit makes the other one continue to spin while the + * thread is working alone. + */ +} + +/* Cancels the effect of thread_isolate() by releasing the current thread's bit + * in threads_want_rdv_mask and by marking this thread as harmless until the + * last worker finishes. + */ +void thread_release() +{ + while (1) { + HA_ATOMIC_AND(&threads_want_rdv_mask, ~tid_bit); + if (!(threads_want_rdv_mask & all_threads_mask)) + break; + thread_harmless_till_end(); + } +} __attribute__((constructor)) static void __hathreads_init(void)