From 29a0fb215779d10fae0cbeb8ce57805f244bad9b Mon Sep 17 00:00:00 2001
From: Tomas Vondra
Date: Wed, 11 Mar 2026 12:11:04 +0100
Subject: [PATCH] Conditional locking in pgaio_worker_submit_internal

With io_method=worker, there's a single I/O submission queue. With
enough workers, the backends and workers may end up spending a lot of
time competing for AioWorkerSubmissionQueueLock. This can happen with
workloads that keep the queue full, in which case it's impossible to
add requests to the queue. Increasing the number of I/O workers
increases the pressure on the lock, worsening the issue.

This change improves the situation in two ways:

* If AioWorkerSubmissionQueueLock can't be acquired without waiting,
  the I/O is performed synchronously (as if the queue were full).

* When an entry can't be added to a full queue, stop trying to add
  more entries. All remaining entries are handled as synchronous I/O.

The regression was reported by Alexandre Felipe. Investigation and
patch by me, based on an idea by Andres Freund.

Reported-by: Alexandre Felipe
Author: Tomas Vondra
Reviewed-by: Andres Freund
Discussion: https://postgr.es/m/CAE8JnxOn4+xUAnce+M7LfZWOqfrMMxasMaEmSKwiKbQtZr65uA@mail.gmail.com
---
 src/backend/storage/aio/method_worker.c | 55 +++++++++++++++----------
 1 file changed, 33 insertions(+), 22 deletions(-)

diff --git a/src/backend/storage/aio/method_worker.c b/src/backend/storage/aio/method_worker.c
index d9617c20e76..c1f99cdc25b 100644
--- a/src/backend/storage/aio/method_worker.c
+++ b/src/backend/storage/aio/method_worker.c
@@ -242,40 +242,51 @@ pgaio_worker_needs_synchronous_execution(PgAioHandle *ioh)
 static void
 pgaio_worker_submit_internal(int num_staged_ios, PgAioHandle **staged_ios)
 {
-	PgAioHandle *synchronous_ios[PGAIO_SUBMIT_BATCH_SIZE];
+	PgAioHandle **synchronous_ios = NULL;
 	int			nsync = 0;
 	Latch	   *wakeup = NULL;
 	int			worker;
 
 	Assert(num_staged_ios <= PGAIO_SUBMIT_BATCH_SIZE);
 
-	LWLockAcquire(AioWorkerSubmissionQueueLock, LW_EXCLUSIVE);
-	for (int i = 0; i < num_staged_ios; ++i)
+	if (LWLockConditionalAcquire(AioWorkerSubmissionQueueLock, LW_EXCLUSIVE))
 	{
-		Assert(!pgaio_worker_needs_synchronous_execution(staged_ios[i]));
-		if (!pgaio_worker_submission_queue_insert(staged_ios[i]))
+		for (int i = 0; i < num_staged_ios; ++i)
 		{
-			/*
-			 * We'll do it synchronously, but only after we've sent as many as
-			 * we can to workers, to maximize concurrency.
-			 */
-			synchronous_ios[nsync++] = staged_ios[i];
-			continue;
-		}
+			Assert(!pgaio_worker_needs_synchronous_execution(staged_ios[i]));
+			if (!pgaio_worker_submission_queue_insert(staged_ios[i]))
+			{
+				/*
+				 * The queue is full, so give up and execute the remaining
+				 * I/Os synchronously. We're holding an exclusive lock on
+				 * the queue, so nothing can consume entries in the meantime.
+				 */
+				synchronous_ios = &staged_ios[i];
+				nsync = (num_staged_ios - i);
 
-		if (wakeup == NULL)
-		{
-			/* Choose an idle worker to wake up if we haven't already. */
-			worker = pgaio_worker_choose_idle();
-			if (worker >= 0)
-				wakeup = io_worker_control->workers[worker].latch;
+				break;
+			}
 
-			pgaio_debug_io(DEBUG4, staged_ios[i],
-						   "choosing worker %d",
-						   worker);
+			if (wakeup == NULL)
+			{
+				/* Choose an idle worker to wake up if we haven't already. */
+				worker = pgaio_worker_choose_idle();
+				if (worker >= 0)
+					wakeup = io_worker_control->workers[worker].latch;
+
+				pgaio_debug_io(DEBUG4, staged_ios[i],
+							   "choosing worker %d",
+							   worker);
+			}
 		}
+		LWLockRelease(AioWorkerSubmissionQueueLock);
+	}
+	else
+	{
+		/* do everything synchronously, no wakeup needed */
+		synchronous_ios = staged_ios;
+		nsync = num_staged_ios;
 	}
-	LWLockRelease(AioWorkerSubmissionQueueLock);
 
 	if (wakeup)
 		SetLatch(wakeup);
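
For context: the hunk ends just after the SetLatch() call, so the
synchronous fallback itself is not visible above. It is the pre-existing
tail of pgaio_worker_submit_internal(), which this patch leaves untouched
and which now also absorbs the lock-contended and queue-full cases. A
minimal sketch of that tail, assuming the current names in
src/backend/storage/aio/method_worker.c (not part of this diff):

	/* Run whatever is left synchronously. */
	if (nsync > 0)
	{
		for (int i = 0; i < nsync; ++i)
		{
			pgaio_io_perform_synchronously(synchronous_ios[i]);
		}
	}

Since synchronous_ios now points into staged_ios[] (or at its first
unsubmitted entry) rather than into a local copy, this loop executes the
leftover handles in staging order, exactly as before.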