Conditional locking in pgaio_worker_submit_internal

With io_method=worker, there's a single I/O submission queue, protected
by AioWorkerSubmissionQueueLock. With enough workers, the backends and
workers may end up spending a lot of time contending for that lock. This
is especially likely with workloads that keep the queue full, in which
case it's impossible to add further requests. Increasing the number of
I/O workers only increases the pressure on the lock, worsening the issue.

This change improves the situation in two ways:

* If AioWorkerSubmissionQueueLock can't be acquired without waiting,
  the I/O is performed synchronously (as if the queue were full).

* When an entry can't be added to a full queue, stop trying to add more
  entries. All remaining entries are handled as synchronous I/O.

The regression was reported by Alexandre Felipe. Investigation and
patch by me, based on an idea by Andres Freund.

Reported-by: Alexandre Felipe <o.alexandre.felipe@gmail.com>
Author: Tomas Vondra <tomas@vondra.me>
Reviewed-by: Andres Freund <andres@anarazel.de>
Discussion: https://postgr.es/m/CAE8JnxOn4+xUAnce+M7LfZWOqfrMMxasMaEmSKwiKbQtZr65uA@mail.gmail.com
Tomas Vondra 2026-03-11 12:11:04 +01:00
parent 9c05f152b5
commit 29a0fb2157

@@ -242,40 +242,51 @@ pgaio_worker_needs_synchronous_execution(PgAioHandle *ioh)
 static void
 pgaio_worker_submit_internal(int num_staged_ios, PgAioHandle **staged_ios)
 {
-	PgAioHandle *synchronous_ios[PGAIO_SUBMIT_BATCH_SIZE];
+	PgAioHandle **synchronous_ios = NULL;
 	int			nsync = 0;
 	Latch	   *wakeup = NULL;
 	int			worker;
 
 	Assert(num_staged_ios <= PGAIO_SUBMIT_BATCH_SIZE);
 
-	LWLockAcquire(AioWorkerSubmissionQueueLock, LW_EXCLUSIVE);
-	for (int i = 0; i < num_staged_ios; ++i)
+	if (LWLockConditionalAcquire(AioWorkerSubmissionQueueLock, LW_EXCLUSIVE))
 	{
-		Assert(!pgaio_worker_needs_synchronous_execution(staged_ios[i]));
-		if (!pgaio_worker_submission_queue_insert(staged_ios[i]))
+		for (int i = 0; i < num_staged_ios; ++i)
 		{
-			/*
-			 * We'll do it synchronously, but only after we've sent as many as
-			 * we can to workers, to maximize concurrency.
-			 */
-			synchronous_ios[nsync++] = staged_ios[i];
-			continue;
-		}
+			Assert(!pgaio_worker_needs_synchronous_execution(staged_ios[i]));
+			if (!pgaio_worker_submission_queue_insert(staged_ios[i]))
+			{
+				/*
+				 * If the queue is full, give up and do the rest
+				 * synchronously. We're holding an exclusive lock on the
+				 * queue so nothing can consume entries.
+				 */
+				synchronous_ios = &staged_ios[i];
+				nsync = (num_staged_ios - i);
+				break;
+			}
 
-		if (wakeup == NULL)
-		{
-			/* Choose an idle worker to wake up if we haven't already. */
-			worker = pgaio_worker_choose_idle();
-			if (worker >= 0)
-				wakeup = io_worker_control->workers[worker].latch;
+			if (wakeup == NULL)
+			{
+				/* Choose an idle worker to wake up if we haven't already. */
+				worker = pgaio_worker_choose_idle();
+				if (worker >= 0)
+					wakeup = io_worker_control->workers[worker].latch;
+
+				pgaio_debug_io(DEBUG4, staged_ios[i],
+							   "choosing worker %d",
+							   worker);
+			}
 		}
-		pgaio_debug_io(DEBUG4, staged_ios[i],
-					   "choosing worker %d",
-					   worker);
+		LWLockRelease(AioWorkerSubmissionQueueLock);
+	}
+	else
+	{
+		/* do everything synchronously, no wakeup needed */
+		synchronous_ios = staged_ios;
+		nsync = num_staged_ios;
 	}
-	LWLockRelease(AioWorkerSubmissionQueueLock);
 
 	if (wakeup)
 		SetLatch(wakeup);