diff --git a/include/common/hathreads.h b/include/common/hathreads.h
index 0f10b48ca..e27ecc63f 100644
--- a/include/common/hathreads.h
+++ b/include/common/hathreads.h
@@ -256,6 +256,8 @@ void thread_exit_sync(void);
 int thread_no_sync(void);
 int thread_need_sync(void);
 
+extern unsigned long all_threads_mask;
+
 #if defined(DEBUG_THREAD) || defined(DEBUG_FULL)
 
 /* WARNING!!! if you update this enum, please also keep lock_label() up to date below */
diff --git a/include/proto/fd.h b/include/proto/fd.h
index 543a42007..da09731d4 100644
--- a/include/proto/fd.h
+++ b/include/proto/fd.h
@@ -36,6 +36,8 @@
 extern volatile struct fdlist fd_cache;
 extern volatile struct fdlist fd_cache_local[MAX_THREADS];
 
+extern volatile struct fdlist update_list;
+
 extern unsigned long fd_cache_mask; // Mask of threads with events in the cache
 
 extern THREAD_LOCAL int *fd_updt;  // FD updates list
@@ -101,15 +103,57 @@ void fd_rm_from_fd_list(volatile struct fdlist *list, int fd, int off);
  */
 static inline void updt_fd_polling(const int fd)
 {
-	unsigned int oldupdt;
+	if (fdtab[fd].thread_mask == tid_bit) {
+		unsigned int oldupdt;
 
-	/* note: we don't have a test-and-set yet in hathreads */
+		/* note: we don't have a test-and-set yet in hathreads */
 
-	if (HA_ATOMIC_BTS(&fdtab[fd].update_mask, tid))
-		return;
+		if (HA_ATOMIC_BTS(&fdtab[fd].update_mask, tid))
+			return;
 
-	oldupdt = HA_ATOMIC_ADD(&fd_nbupdt, 1) - 1;
-	fd_updt[oldupdt] = fd;
+		oldupdt = HA_ATOMIC_ADD(&fd_nbupdt, 1) - 1;
+		fd_updt[oldupdt] = fd;
+	} else {
+		unsigned long update_mask = fdtab[fd].update_mask;
+		do {
+			if (update_mask == fdtab[fd].thread_mask)
+				return;
+		} while (!HA_ATOMIC_CAS(&fdtab[fd].update_mask, &update_mask,
+			 fdtab[fd].thread_mask));
+		fd_add_to_fd_list(&update_list, fd, offsetof(struct fdtab, update));
+	}
+
+}
+
+/* Called from the poller to acknowledge we read an entry from the global
+ * update list, to remove our bit from the update_mask, and remove it from
+ * the list if we were the last one.
+ */
+static inline void done_update_polling(int fd)
+{
+	unsigned long update_mask;
+
+	update_mask = HA_ATOMIC_AND(&fdtab[fd].update_mask, ~tid_bit);
+	while ((update_mask & all_threads_mask) == 0) {
+		/* If we were the last one that had to update that entry, remove it from the list */
+		fd_rm_from_fd_list(&update_list, fd, offsetof(struct fdtab, update));
+		if (update_list.first == fd)
+			abort();
+		update_mask = (volatile unsigned long)fdtab[fd].update_mask;
+		if ((update_mask & all_threads_mask) != 0) {
+			/* Maybe it's been re-updated in the meanwhile, and we
+			 * wrongly removed it from the list, if so, re-add it
+			 */
+			fd_add_to_fd_list(&update_list, fd, offsetof(struct fdtab, update));
+			update_mask = (volatile unsigned long)(fdtab[fd].update_mask);
+			/* And then check again, just in case after all it
+			 * should be removed, even if it's very unlikely, given
+			 * the current thread wouldn't have been able to take
+			 * care of it yet */
+		} else
+			break;
+
+	}
 }
 
 /* Allocates a cache entry for a file descriptor if it does not yet have one.
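
Review note: the else branch above is the heart of the new scheme. A thread that is not the fd's sole owner tries to set every owning thread's bit in update_mask in a single CAS, and only the thread that actually performs that transition links the fd into the shared update_list; done_update_polling() is the matching release path. Below is a minimal standalone sketch of the same claim/release idea, using C11 atomics rather than HAProxy's HA_ATOMIC_* macros; the helper names are illustrative, not from the patch.

	#include <stdatomic.h>
	#include <stdbool.h>

	/* Simplified model: one update_mask per fd, one bit per thread. */
	static _Atomic unsigned long update_mask;

	/* Try to claim the update for all owners at once; returns true when
	 * the caller made the transition and must enqueue the fd.
	 */
	static bool claim_update(unsigned long thread_mask)
	{
		unsigned long cur = atomic_load(&update_mask);

		do {
			if (cur == thread_mask)
				return false; /* already claimed and enqueued */
		} while (!atomic_compare_exchange_weak(&update_mask, &cur,
		                                       thread_mask));
		return true;
	}

	/* Drop our bit; returns true when no claimant is left, i.e. the
	 * caller must unlink the fd from the shared list.
	 */
	static bool release_update(unsigned long tid_bit,
	                           unsigned long all_threads_mask)
	{
		unsigned long prev = atomic_fetch_and(&update_mask, ~tid_bit);

		return ((prev & ~tid_bit) & all_threads_mask) == 0;
	}

The real done_update_polling() additionally has to cope with the fd being re-queued while it is being unlinked, hence its re-check/re-add loop.
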
diff --git a/include/types/fd.h b/include/types/fd.h
index 0902e7fc4..aa18ebefc 100644
--- a/include/types/fd.h
+++ b/include/types/fd.h
@@ -117,6 +117,7 @@ struct fdtab {
 	unsigned long polled_mask;           /* mask of thread IDs currently polling this fd */
 	unsigned long update_mask;           /* mask of thread IDs having an update for fd */
 	struct fdlist_entry cache;           /* Entry in the fdcache */
+	struct fdlist_entry update;          /* Entry in the global update list */
 	void (*iocb)(int fd);                /* I/O handler */
 	void *owner;                         /* the connection or listener associated with this fd, NULL if closed */
 	unsigned char state;                 /* FD state for read and write directions (2*3 bits) */
diff --git a/src/ev_epoll.c b/src/ev_epoll.c
index a8e57973f..584bf64c9 100644
--- a/src/ev_epoll.c
+++ b/src/ev_epoll.c
@@ -59,16 +59,55 @@ REGPRM1 static void __fd_clo(int fd)
 	}
 }
 
+static void _update_fd(int fd)
+{
+	int en, opcode;
+
+	en = fdtab[fd].state;
+
+	if (fdtab[fd].polled_mask & tid_bit) {
+		if (!(fdtab[fd].thread_mask & tid_bit) || !(en & FD_EV_POLLED_RW)) {
+			/* fd removed from poll list */
+			opcode = EPOLL_CTL_DEL;
+			HA_ATOMIC_AND(&fdtab[fd].polled_mask, ~tid_bit);
+		}
+		else {
+			/* fd status changed */
+			opcode = EPOLL_CTL_MOD;
+		}
+	}
+	else if ((fdtab[fd].thread_mask & tid_bit) && (en & FD_EV_POLLED_RW)) {
+		/* new fd in the poll list */
+		opcode = EPOLL_CTL_ADD;
+		HA_ATOMIC_OR(&fdtab[fd].polled_mask, tid_bit);
+	}
+	else {
+		return;
+	}
+
+	/* construct the epoll events based on new state */
+	ev.events = 0;
+	if (en & FD_EV_POLLED_R)
+		ev.events |= EPOLLIN | EPOLLRDHUP;
+
+	if (en & FD_EV_POLLED_W)
+		ev.events |= EPOLLOUT;
+
+	ev.data.fd = fd;
+	epoll_ctl(epoll_fd[tid], opcode, fd, &ev);
+}
+
 /*
  * Linux epoll() poller
  */
 REGPRM2 static void _do_poll(struct poller *p, int exp)
 {
-	int status, en;
-	int fd, opcode;
+	int status;
+	int fd;
 	int count;
 	int updt_idx;
 	int wait_time;
+	int old_fd;
 
 	/* first, scan the update list to find polling changes */
 	for (updt_idx = 0; updt_idx < fd_nbupdt; updt_idx++) {
@@ -80,40 +119,27 @@ REGPRM2 static void _do_poll(struct poller *p, int exp)
 			continue;
 		}
 
-		en = fdtab[fd].state;
-
-		if (fdtab[fd].polled_mask & tid_bit) {
-			if (!(fdtab[fd].thread_mask & tid_bit) || !(en & FD_EV_POLLED_RW)) {
-				/* fd removed from poll list */
-				opcode = EPOLL_CTL_DEL;
-				HA_ATOMIC_AND(&fdtab[fd].polled_mask, ~tid_bit);
-			}
-			else {
-				/* fd status changed */
-				opcode = EPOLL_CTL_MOD;
-			}
-		}
-		else if ((fdtab[fd].thread_mask & tid_bit) && (en & FD_EV_POLLED_RW)) {
-			/* new fd in the poll list */
-			opcode = EPOLL_CTL_ADD;
-			HA_ATOMIC_OR(&fdtab[fd].polled_mask, tid_bit);
-		}
-		else {
-			continue;
-		}
-
-		/* construct the epoll events based on new state */
-		ev.events = 0;
-		if (en & FD_EV_POLLED_R)
-			ev.events |= EPOLLIN | EPOLLRDHUP;
-
-		if (en & FD_EV_POLLED_W)
-			ev.events |= EPOLLOUT;
-
-		ev.data.fd = fd;
-		epoll_ctl(epoll_fd[tid], opcode, fd, &ev);
+		_update_fd(fd);
 	}
 	fd_nbupdt = 0;
+	/* Scan the global update list */
+	for (old_fd = fd = update_list.first; fd != -1; fd = fdtab[fd].update.next) {
+		if (fd == -2) {
+			fd = old_fd;
+			continue;
+		}
+		else if (fd <= -3)
+			fd = -fd -4;
+		if (fd == -1)
+			break;
+		if (fdtab[fd].update_mask & tid_bit)
+			done_update_polling(fd);
+		else
+			continue;
+		if (!fdtab[fd].owner)
+			continue;
+		_update_fd(fd);
+	}
 
 	/* compute the epoll_wait() timeout */
 	if (!exp)
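
Review note: the new scan loop in _do_poll() walks update_list without taking a lock, so it must interpret the same special next values that fd_add_to_fd_list()/fd_rm_from_fd_list() leave behind: -1 terminates the list, -2 marks a link a writer is currently changing (the reader restarts from old_fd), and values <= -3 belong to a detached entry and encode the following fd as -next - 4 (so -3 decodes to -1, end of list). A sketch of just that decoding step, with a hypothetical helper name:

	/* Decode one raw 'next' link read from the lock-free list. Returns
	 * the next fd to visit, -1 at end of list, or -2 when the link is
	 * locked and the caller should retry from its last known position.
	 */
	static int decode_next(int next)
	{
		if (next == -2)
			return -2;        /* writer busy on this link: retry */
		if (next <= -3)
			next = -next - 4; /* detached entry: successor encoded */
		return next;              /* may be -1: end of list */
	}
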
diff --git a/src/ev_kqueue.c b/src/ev_kqueue.c
index ebfd5d210..926f77c74 100644
--- a/src/ev_kqueue.c
+++ b/src/ev_kqueue.c
@@ -33,6 +33,41 @@ static int kqueue_fd[MAX_THREADS]; // per-thread kqueue_fd
 static THREAD_LOCAL struct kevent *kev = NULL;
 static struct kevent *kev_out = NULL; // Trash buffer for kevent() to write the eventlist in
 
+static int _update_fd(int fd)
+{
+	int en;
+	int changes = 0;
+
+	en = fdtab[fd].state;
+
+	if (!(fdtab[fd].thread_mask & tid_bit) || !(en & FD_EV_POLLED_RW)) {
+		if (!(fdtab[fd].polled_mask & tid_bit)) {
+			/* fd was not watched, it's still not */
+			return 0;
+		}
+		/* fd totally removed from poll list */
+		EV_SET(&kev[changes++], fd, EVFILT_READ, EV_DELETE, 0, 0, NULL);
+		EV_SET(&kev[changes++], fd, EVFILT_WRITE, EV_DELETE, 0, 0, NULL);
+		HA_ATOMIC_AND(&fdtab[fd].polled_mask, ~tid_bit);
+	}
+	else {
+		/* OK fd has to be monitored, it was either added or changed */
+
+		if (en & FD_EV_POLLED_R)
+			EV_SET(&kev[changes++], fd, EVFILT_READ, EV_ADD, 0, 0, NULL);
+		else if (fdtab[fd].polled_mask & tid_bit)
+			EV_SET(&kev[changes++], fd, EVFILT_READ, EV_DELETE, 0, 0, NULL);
+
+		if (en & FD_EV_POLLED_W)
+			EV_SET(&kev[changes++], fd, EVFILT_WRITE, EV_ADD, 0, 0, NULL);
+		else if (fdtab[fd].polled_mask & tid_bit)
+			EV_SET(&kev[changes++], fd, EVFILT_WRITE, EV_DELETE, 0, 0, NULL);
+
+		HA_ATOMIC_OR(&fdtab[fd].polled_mask, tid_bit);
+	}
+	return changes;
+}
+
 /*
  * kqueue() poller
  */
@@ -41,8 +76,9 @@ REGPRM2 static void _do_poll(struct poller *p, int exp)
 	int status;
 	int count, fd, delta_ms;
 	struct timespec timeout;
-	int updt_idx, en;
+	int updt_idx;
 	int changes = 0;
+	int old_fd;
 
 	timeout.tv_sec = 0;
 	timeout.tv_nsec = 0;
@@ -55,35 +91,27 @@ REGPRM2 static void _do_poll(struct poller *p, int exp)
 			activity[tid].poll_drop++;
 			continue;
 		}
-
-		en = fdtab[fd].state;
-
-		if (!(fdtab[fd].thread_mask & tid_bit) || !(en & FD_EV_POLLED_RW)) {
-			if (!(fdtab[fd].polled_mask & tid_bit)) {
-				/* fd was not watched, it's still not */
-				continue;
-			}
-			/* fd totally removed from poll list */
-			EV_SET(&kev[changes++], fd, EVFILT_READ, EV_DELETE, 0, 0, NULL);
-			EV_SET(&kev[changes++], fd, EVFILT_WRITE, EV_DELETE, 0, 0, NULL);
-			HA_ATOMIC_AND(&fdtab[fd].polled_mask, ~tid_bit);
-		}
-		else {
-			/* OK fd has to be monitored, it was either added or changed */
-
-			if (en & FD_EV_POLLED_R)
-				EV_SET(&kev[changes++], fd, EVFILT_READ, EV_ADD, 0, 0, NULL);
-			else if (fdtab[fd].polled_mask & tid_bit)
-				EV_SET(&kev[changes++], fd, EVFILT_READ, EV_DELETE, 0, 0, NULL);
-
-			if (en & FD_EV_POLLED_W)
-				EV_SET(&kev[changes++], fd, EVFILT_WRITE, EV_ADD, 0, 0, NULL);
-			else if (fdtab[fd].polled_mask & tid_bit)
-				EV_SET(&kev[changes++], fd, EVFILT_WRITE, EV_DELETE, 0, 0, NULL);
-
-			HA_ATOMIC_OR(&fdtab[fd].polled_mask, tid_bit);
-		}
+		changes += _update_fd(fd);
 	}
+	/* Scan the global update list */
+	for (old_fd = fd = update_list.first; fd != -1; fd = fdtab[fd].update.next) {
+		if (fd == -2) {
+			fd = old_fd;
+			continue;
+		}
+		else if (fd <= -3)
+			fd = -fd -4;
+		if (fd == -1)
+			break;
+		if (fdtab[fd].update_mask & tid_bit)
+			done_update_polling(fd);
+		else
+			continue;
+		if (!fdtab[fd].owner)
+			continue;
+		changes += _update_fd(fd);
+	}
+
 	if (changes) {
 #ifdef EV_RECEIPT
 		kev[0].flags |= EV_RECEIPT;
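
Review note: unlike epoll, the kqueue backend does not apply each change immediately; _update_fd() only appends EV_SET() entries to the per-thread kev[] array and returns how many it queued, and the poller then submits the whole batch through a single kevent() call. A minimal sketch of that flush step; the helper name is hypothetical, and it assumes kev[] was sized for the worst case of two changes per fd:

	#include <sys/types.h>
	#include <sys/event.h>
	#include <sys/time.h>

	/* Apply a batch of accumulated filter changes in one syscall. With a
	 * zero-sized event list, kevent() only processes the changelist and
	 * returns immediately.
	 */
	static int flush_changes(int kq, struct kevent *kev, int nchanges)
	{
		struct timespec ts = { 0, 0 };

		if (!nchanges)
			return 0;
		return kevent(kq, kev, nchanges, NULL, 0, &ts);
	}
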
diff --git a/src/ev_poll.c b/src/ev_poll.c
index 6093b652b..155ac821d 100644
--- a/src/ev_poll.c
+++ b/src/ev_poll.c
@@ -45,6 +45,44 @@ REGPRM1 static void __fd_clo(int fd)
 	hap_fd_clr(fd, fd_evts[DIR_WR]);
 }
 
+static void _update_fd(int fd, int *max_add_fd)
+{
+	int en;
+
+	en = fdtab[fd].state;
+
+	/* we have a single state for all threads, which is why we
+	 * don't check the tid_bit. First thread to see the update
+	 * takes it for every other one.
+	 */
+	if (!(en & FD_EV_POLLED_RW)) {
+		if (!fdtab[fd].polled_mask) {
+			/* fd was not watched, it's still not */
+			return;
+		}
+		/* fd totally removed from poll list */
+		hap_fd_clr(fd, fd_evts[DIR_RD]);
+		hap_fd_clr(fd, fd_evts[DIR_WR]);
+		HA_ATOMIC_AND(&fdtab[fd].polled_mask, 0);
+	}
+	else {
+		/* OK fd has to be monitored, it was either added or changed */
+		if (!(en & FD_EV_POLLED_R))
+			hap_fd_clr(fd, fd_evts[DIR_RD]);
+		else
+			hap_fd_set(fd, fd_evts[DIR_RD]);
+
+		if (!(en & FD_EV_POLLED_W))
+			hap_fd_clr(fd, fd_evts[DIR_WR]);
+		else
+			hap_fd_set(fd, fd_evts[DIR_WR]);
+
+		HA_ATOMIC_OR(&fdtab[fd].polled_mask, tid_bit);
+		if (fd > *max_add_fd)
+			*max_add_fd = fd;
+	}
+}
+
 /*
  * Poll() poller
  */
@@ -53,11 +91,12 @@ REGPRM2 static void _do_poll(struct poller *p, int exp)
 	int status;
 	int fd;
 	int wait_time;
-	int updt_idx, en;
+	int updt_idx;
 	int fds, count;
 	int sr, sw;
 	int old_maxfd, new_maxfd, max_add_fd;
 	unsigned rn, wn; /* read new, write new */
+	int old_fd;
 
 	max_add_fd = -1;
 
@@ -70,39 +109,31 @@ REGPRM2 static void _do_poll(struct poller *p, int exp)
 			activity[tid].poll_drop++;
 			continue;
 		}
+		_update_fd(fd, &max_add_fd);
+	}
 
-		en = fdtab[fd].state;
-
-		/* we have a single state for all threads, which is why we
-		 * don't check the tid_bit. First thread to see the update
-		 * takes it for every other one.
-		 */
-		if (!(en & FD_EV_POLLED_RW)) {
-			if (!fdtab[fd].polled_mask) {
-				/* fd was not watched, it's still not */
-				continue;
-			}
-			/* fd totally removed from poll list */
-			hap_fd_clr(fd, fd_evts[DIR_RD]);
-			hap_fd_clr(fd, fd_evts[DIR_WR]);
-			HA_ATOMIC_AND(&fdtab[fd].polled_mask, 0);
-		}
-		else {
-			/* OK fd has to be monitored, it was either added or changed */
-			if (!(en & FD_EV_POLLED_R))
-				hap_fd_clr(fd, fd_evts[DIR_RD]);
-			else
-				hap_fd_set(fd, fd_evts[DIR_RD]);
-
-			if (!(en & FD_EV_POLLED_W))
-				hap_fd_clr(fd, fd_evts[DIR_WR]);
-			else
-				hap_fd_set(fd, fd_evts[DIR_WR]);
-
-			HA_ATOMIC_OR(&fdtab[fd].polled_mask, tid_bit);
-			if (fd > max_add_fd)
-				max_add_fd = fd;
+	/* Now scan the global update list */
+	for (old_fd = fd = update_list.first; fd != -1; fd = fdtab[fd].update.next) {
+		if (fd == -2) {
+			fd = old_fd;
+			continue;
 		}
+		else if (fd <= -3)
+			fd = -fd -4;
+		if (fd == -1)
+			break;
+		if (fdtab[fd].update_mask & tid_bit) {
+			/* Cheat a bit, as the state is global to all pollers
+			 * we don't need every thread to take care of the
+			 * update.
+			 */
+			HA_ATOMIC_AND(&fdtab[fd].update_mask, ~all_threads_mask);
+			done_update_polling(fd);
+		} else
+			continue;
+		if (!fdtab[fd].owner)
+			continue;
+		_update_fd(fd, &max_add_fd);
 	}
 
 	/* maybe we added at least one fd larger than maxfd */
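
Review note: poll() keeps one fd_evts[] state shared by every thread, so the first thread that sees a queued update can take it on behalf of all of them: it clears all_threads_mask from update_mask before calling done_update_polling(), whose own AND then finds no claimant left and unlinks the fd. A small runnable trace of the mask arithmetic (the mask values are illustrative):

	#include <stdio.h>

	int main(void)
	{
		unsigned long all_threads_mask = 0x3; /* two threads */
		unsigned long tid_bit = 0x1;          /* we are thread 1 */
		unsigned long update_mask = 0x3;      /* both threads notified */

		update_mask &= ~all_threads_mask; /* the "cheat": claim for all */
		update_mask &= ~tid_bit;          /* done_update_polling()'s AND */

		if ((update_mask & all_threads_mask) == 0)
			printf("last claimant: fd gets unlinked from update_list\n");
		return 0;
	}

Thread 2 then either no longer finds the fd in the list, or finds it with its bit already cleared and skips it. epoll and kqueue cannot take this shortcut because each thread owns a separate event set.
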
diff --git a/src/ev_select.c b/src/ev_select.c
index 163a45839..ac4a36064 100644
--- a/src/ev_select.c
+++ b/src/ev_select.c
@@ -36,6 +36,44 @@ REGPRM1 static void __fd_clo(int fd)
 	hap_fd_clr(fd, fd_evts[DIR_WR]);
 }
 
+static void _update_fd(int fd, int *max_add_fd)
+{
+	int en;
+
+	en = fdtab[fd].state;
+
+	/* we have a single state for all threads, which is why we
+	 * don't check the tid_bit. First thread to see the update
+	 * takes it for every other one.
+	 */
+	if (!(en & FD_EV_POLLED_RW)) {
+		if (!fdtab[fd].polled_mask) {
+			/* fd was not watched, it's still not */
+			return;
+		}
+		/* fd totally removed from poll list */
+		hap_fd_clr(fd, fd_evts[DIR_RD]);
+		hap_fd_clr(fd, fd_evts[DIR_WR]);
+		HA_ATOMIC_AND(&fdtab[fd].polled_mask, 0);
+	}
+	else {
+		/* OK fd has to be monitored, it was either added or changed */
+		if (!(en & FD_EV_POLLED_R))
+			hap_fd_clr(fd, fd_evts[DIR_RD]);
+		else
+			hap_fd_set(fd, fd_evts[DIR_RD]);
+
+		if (!(en & FD_EV_POLLED_W))
+			hap_fd_clr(fd, fd_evts[DIR_WR]);
+		else
+			hap_fd_set(fd, fd_evts[DIR_WR]);
+
+		HA_ATOMIC_OR(&fdtab[fd].polled_mask, tid_bit);
+		if (fd > *max_add_fd)
+			*max_add_fd = fd;
+	}
+}
+
 /*
  * Select() poller
  */
@@ -46,10 +84,11 @@ REGPRM2 static void _do_poll(struct poller *p, int exp)
 	struct timeval delta;
 	int delta_ms;
 	int fds;
-	int updt_idx, en;
+	int updt_idx;
 	char count;
 	int readnotnull, writenotnull;
 	int old_maxfd, new_maxfd, max_add_fd;
+	int old_fd;
 
 	max_add_fd = -1;
 
@@ -62,40 +101,32 @@ REGPRM2 static void _do_poll(struct poller *p, int exp)
 			activity[tid].poll_drop++;
 			continue;
 		}
-
-		en = fdtab[fd].state;
-
-		/* we have a single state for all threads, which is why we
-		 * don't check the tid_bit. First thread to see the update
-		 * takes it for every other one.
-		 */
-		if (!(en & FD_EV_POLLED_RW)) {
-			if (!fdtab[fd].polled_mask) {
-				/* fd was not watched, it's still not */
-				continue;
-			}
-			/* fd totally removed from poll list */
-			hap_fd_clr(fd, fd_evts[DIR_RD]);
-			hap_fd_clr(fd, fd_evts[DIR_WR]);
-			HA_ATOMIC_AND(&fdtab[fd].polled_mask, 0);
-		}
-		else {
-			/* OK fd has to be monitored, it was either added or changed */
-			if (!(en & FD_EV_POLLED_R))
-				hap_fd_clr(fd, fd_evts[DIR_RD]);
-			else
-				hap_fd_set(fd, fd_evts[DIR_RD]);
-
-			if (!(en & FD_EV_POLLED_W))
-				hap_fd_clr(fd, fd_evts[DIR_WR]);
-			else
-				hap_fd_set(fd, fd_evts[DIR_WR]);
-
-			HA_ATOMIC_OR(&fdtab[fd].polled_mask, tid_bit);
-			if (fd > max_add_fd)
-				max_add_fd = fd;
-		}
+		_update_fd(fd, &max_add_fd);
 	}
 
+	/* Now scan the global update list */
+	for (old_fd = fd = update_list.first; fd != -1; fd = fdtab[fd].update.next) {
+		if (fd == -2) {
+			fd = old_fd;
+			continue;
+		}
+		else if (fd <= -3)
+			fd = -fd -4;
+		if (fd == -1)
+			break;
+		if (fdtab[fd].update_mask & tid_bit) {
+			/* Cheat a bit, as the state is global to all pollers
+			 * we don't need every thread to take care of the
+			 * update.
+			 */
+			HA_ATOMIC_AND(&fdtab[fd].update_mask, ~all_threads_mask);
+			done_update_polling(fd);
+		} else
+			continue;
+		if (!fdtab[fd].owner)
+			continue;
+		_update_fd(fd, &max_add_fd);
+	}
+
 	/* maybe we added at least one fd larger than maxfd */
 	for (old_maxfd = maxfd; old_maxfd <= max_add_fd; ) {
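
Review note: select() shares the same shortcut and the same _update_fd() shape as poll(); both maintain per-direction bit arrays (fd_evts[DIR_RD] / fd_evts[DIR_WR]) that hap_fd_set()/hap_fd_clr() update one bit per fd. For readers unfamiliar with those helpers, the sketch below shows the kind of operation they perform; it is conceptual only, the real helpers live elsewhere in HAProxy and may differ in detail:

	#include <limits.h>

	#define BITS_PER_WORD (sizeof(unsigned int) * CHAR_BIT)

	/* Set or clear the bit for 'fd' in a flat array of unsigned ints. */
	static void bit_set(unsigned int *evts, int fd)
	{
		evts[fd / BITS_PER_WORD] |= 1U << (fd % BITS_PER_WORD);
	}

	static void bit_clr(unsigned int *evts, int fd)
	{
		evts[fd / BITS_PER_WORD] &= ~(1U << (fd % BITS_PER_WORD));
	}
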
diff --git a/src/fd.c b/src/fd.c
index 01de0e1ff..4e88d308f 100644
--- a/src/fd.c
+++ b/src/fd.c
@@ -169,6 +169,7 @@ int nbpollers = 0;
 
 volatile struct fdlist fd_cache ; // FD events cache
 volatile struct fdlist fd_cache_local[MAX_THREADS]; // FD events local for each thread
+volatile struct fdlist update_list; // Global update list
 
 unsigned long fd_cache_mask = 0; // Mask of threads with events in the cache
 
@@ -244,7 +245,6 @@ void fd_rm_from_fd_list(volatile struct fdlist *list, int fd, int off)
 	int prev;
 	int next;
 	int last;
-
 lock_self:
 #if (defined(HA_CAS_IS_8B) || defined(HA_HAVE_CAS_DW))
 	next_list.next = next_list.prev = -2;
@@ -492,6 +492,7 @@ int init_pollers()
 		goto fail_info;
 
 	fd_cache.first = fd_cache.last = -1;
+	update_list.first = update_list.last = -1;
 
 	hap_register_per_thread_init(init_pollers_per_thread);
 	hap_register_per_thread_deinit(deinit_pollers_per_thread);
@@ -499,7 +500,7 @@ int init_pollers()
 		HA_SPIN_INIT(&fdtab[p].lock);
 		/* Mark the fd as out of the fd cache */
 		fdtab[p].cache.next = -3;
-		fdtab[p].cache.next = -3;
+		fdtab[p].update.next = -3;
 	}
 	for (p = 0; p < global.nbthread; p++)
 		fd_cache_local[p].first = fd_cache_local[p].last = -1;
diff --git a/src/hathreads.c b/src/hathreads.c
index 0d690f383..5db3c2197 100644
--- a/src/hathreads.c
+++ b/src/hathreads.c
@@ -31,7 +31,7 @@ void thread_sync_io_handler(int fd)
 static HA_SPINLOCK_T sync_lock;
 static int threads_sync_pipe[2];
 static unsigned long threads_want_sync = 0;
-static unsigned long all_threads_mask = 0;
+unsigned long all_threads_mask = 0;
 
 #if defined(DEBUG_THREAD) || defined(DEBUG_FULL)
 struct lock_stat lock_stats[LOCK_LABELS];
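
Review note: init_pollers() now seeds update_list empty (first = last = -1) and marks every fd's update entry as detached (next = -3), matching the sentinel conventions used by the list walks above; the last fd.c hunk also fixes a copy-paste bug where cache.next was assigned twice and update.next never initialized. The hathreads.c change merely exports all_threads_mask, which done_update_polling() needs. A tiny self-contained check of those invariants, using hypothetical types that mirror struct fdlist/fdlist_entry:

	#include <assert.h>

	struct fdlist_entry { int next; int prev; };
	struct fdlist { int first; int last; };

	/* An entry is detached when next <= -3: exactly -3 right after init,
	 * lower values after removal (they encode the old successor as
	 * -next - 4).
	 */
	static int fd_is_detached(const struct fdlist_entry *e)
	{
		return e->next <= -3;
	}

	int main(void)
	{
		struct fdlist update_list = { -1, -1 }; /* empty list */
		struct fdlist_entry ent = { -3, -3 };   /* freshly initialized */

		assert(update_list.first == -1 && update_list.last == -1);
		assert(fd_is_detached(&ent));
		return 0;
	}
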