diff --git a/src/backend/storage/aio/method_worker.c b/src/backend/storage/aio/method_worker.c
index d9617c20e76..c1f99cdc25b 100644
--- a/src/backend/storage/aio/method_worker.c
+++ b/src/backend/storage/aio/method_worker.c
@@ -242,40 +242,51 @@ pgaio_worker_needs_synchronous_execution(PgAioHandle *ioh)
 static void
 pgaio_worker_submit_internal(int num_staged_ios, PgAioHandle **staged_ios)
 {
-	PgAioHandle *synchronous_ios[PGAIO_SUBMIT_BATCH_SIZE];
+	PgAioHandle **synchronous_ios = NULL;
 	int			nsync = 0;
 	Latch	   *wakeup = NULL;
 	int			worker;
 
 	Assert(num_staged_ios <= PGAIO_SUBMIT_BATCH_SIZE);
 
-	LWLockAcquire(AioWorkerSubmissionQueueLock, LW_EXCLUSIVE);
-	for (int i = 0; i < num_staged_ios; ++i)
+	if (LWLockConditionalAcquire(AioWorkerSubmissionQueueLock, LW_EXCLUSIVE))
 	{
-		Assert(!pgaio_worker_needs_synchronous_execution(staged_ios[i]));
-		if (!pgaio_worker_submission_queue_insert(staged_ios[i]))
+		for (int i = 0; i < num_staged_ios; ++i)
 		{
-			/*
-			 * We'll do it synchronously, but only after we've sent as many as
-			 * we can to workers, to maximize concurrency.
-			 */
-			synchronous_ios[nsync++] = staged_ios[i];
-			continue;
-		}
+			Assert(!pgaio_worker_needs_synchronous_execution(staged_ios[i]));
+			if (!pgaio_worker_submission_queue_insert(staged_ios[i]))
+			{
+				/*
+				 * The queue is full, so give up and do the rest
+				 * synchronously. We're holding an exclusive lock on the
+				 * queue so nothing can consume entries.
+				 */
+				synchronous_ios = &staged_ios[i];
+				nsync = (num_staged_ios - i);
 
-		if (wakeup == NULL)
-		{
-			/* Choose an idle worker to wake up if we haven't already. */
-			worker = pgaio_worker_choose_idle();
-			if (worker >= 0)
-				wakeup = io_worker_control->workers[worker].latch;
+				break;
+			}
 
-			pgaio_debug_io(DEBUG4, staged_ios[i],
-						   "choosing worker %d",
-						   worker);
+			if (wakeup == NULL)
+			{
+				/* Choose an idle worker to wake up if we haven't already. */
+				worker = pgaio_worker_choose_idle();
+				if (worker >= 0)
+					wakeup = io_worker_control->workers[worker].latch;
+
+				pgaio_debug_io(DEBUG4, staged_ios[i],
+							   "choosing worker %d",
+							   worker);
+			}
 		}
+		LWLockRelease(AioWorkerSubmissionQueueLock);
+	}
+	else
+	{
+		/* do everything synchronously, no wakeup needed */
+		synchronous_ios = staged_ios;
+		nsync = num_staged_ios;
 	}
-	LWLockRelease(AioWorkerSubmissionQueueLock);
 
 	if (wakeup)
 		SetLatch(wakeup);