mirror of
https://github.com/nextcloud/server.git
synced 2026-06-18 05:00:03 -04:00
fix(taskprocessing): claim tasks atomically with SKIP LOCKED + composite index
Replace the worker retry/ignore-list claim-loop with a single atomic SELECT ... FOR UPDATE SKIP LOCKED claim (SQLite bounded-retry fallback), preserving the no-duplicate guarantee while removing the thundering-herd contention that throttled backlog draining. Add a (status,type,last_updated) index via the table-creating migration + db:add-missing-indices listener. Signed-off-by: Yoan Bozhilov <bygadd@gmail.com> Assisted-by: Claude Code:claude-opus-4-8
This commit is contained in:
parent
947a490369
commit
aa5f45cecd
9 changed files with 451 additions and 25 deletions
|
|
@ -13,7 +13,6 @@ use OC\Core\Command\InterruptedException;
|
|||
use OCP\AppFramework\Utility\ITimeFactory;
|
||||
use OCP\IAppConfig;
|
||||
use OCP\TaskProcessing\Exception\Exception;
|
||||
use OCP\TaskProcessing\Exception\NotFoundException;
|
||||
use OCP\TaskProcessing\IManager;
|
||||
use OCP\TaskProcessing\ISynchronousProvider;
|
||||
use Psr\Log\LoggerInterface;
|
||||
|
|
@ -117,9 +116,10 @@ class WorkerCommand extends Base {
|
|||
* Attempt to process one task across all preferred synchronous providers.
|
||||
*
|
||||
* To avoid starvation, all eligible task types are first collected and then
|
||||
* the oldest scheduled task across all of them is fetched in a single query.
|
||||
* This ensures that tasks are processed in the order they were scheduled,
|
||||
* regardless of which provider handles them.
|
||||
* the oldest scheduled task across all of them is claimed in a single atomic
|
||||
* query (FOR UPDATE SKIP LOCKED, with a SQLite fallback). This ensures tasks
|
||||
* are processed in the order they were scheduled, regardless of which provider
|
||||
* handles them, and guarantees no two workers ever claim the same task.
|
||||
*
|
||||
* @param list<string> $taskTypes When non-empty, only providers for these task type IDs are considered.
|
||||
* @return bool True if a task was processed, false if no task was found
|
||||
|
|
@ -161,15 +161,21 @@ class WorkerCommand extends Base {
|
|||
return false;
|
||||
}
|
||||
|
||||
// Fetch the oldest scheduled task across all eligible task types in one query.
|
||||
// This naturally prevents starvation: regardless of how many tasks one provider
|
||||
// has queued, another provider's older tasks will be picked up first.
|
||||
// Atomically claim the oldest scheduled task across all eligible task types in
|
||||
// one query. SELECT ... FOR UPDATE SKIP LOCKED (with a SQLite fallback) both
|
||||
// fetches and marks the task RUNNING, so multiple workers never claim the same
|
||||
// task and no per-worker ignore-list / retry loop is needed. This also naturally
|
||||
// prevents starvation: regardless of how many tasks one provider has queued,
|
||||
// another provider's older tasks are picked up first.
|
||||
try {
|
||||
$task = $this->taskProcessingManager->getNextScheduledTask(array_keys($eligibleProviders));
|
||||
} catch (NotFoundException) {
|
||||
return false;
|
||||
$task = $this->taskProcessingManager->claimNextScheduledTask(array_keys($eligibleProviders));
|
||||
} catch (Exception $e) {
|
||||
$this->logger->error('Unknown error while retrieving scheduled TaskProcessing tasks', ['exception' => $e]);
|
||||
$this->logger->error('Unknown error while claiming scheduled TaskProcessing tasks', ['exception' => $e]);
|
||||
return false;
|
||||
}
|
||||
|
||||
if ($task === null) {
|
||||
// No schedulable task available right now.
|
||||
return false;
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -224,5 +224,11 @@ class AddMissingIndicesListener implements IEventListener {
|
|||
['user', 'mountpoint'],
|
||||
['lengths' => [null, 128]]
|
||||
);
|
||||
|
||||
$event->addMissingIndex(
|
||||
'taskprocessing_tasks',
|
||||
'taskp_status_type_upd',
|
||||
['status', 'type', 'last_updated']
|
||||
);
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -97,6 +97,7 @@ class Version30000Date20240429122720 extends SimpleMigrationStep {
|
|||
$table->addIndex(['status', 'type'], 'taskp_tasks_status_type');
|
||||
$table->addIndex(['last_updated'], 'taskp_tasks_updated');
|
||||
$table->addIndex(['user_id', 'app_id', 'custom_id'], 'taskp_tasks_uid_appid_cid');
|
||||
$table->addIndex(['status', 'type', 'last_updated'], 'taskp_status_type_upd');
|
||||
|
||||
return $schema;
|
||||
}
|
||||
|
|
|
|||
|
|
@ -15,6 +15,7 @@ use OCP\AppFramework\Db\MultipleObjectsReturnedException;
|
|||
use OCP\AppFramework\Db\QBMapper;
|
||||
use OCP\AppFramework\Utility\ITimeFactory;
|
||||
use OCP\DB\Exception;
|
||||
use OCP\DB\QueryBuilder\ConflictResolutionMode;
|
||||
use OCP\DB\QueryBuilder\IQueryBuilder;
|
||||
use OCP\IDBConnection;
|
||||
|
||||
|
|
@ -75,6 +76,160 @@ class TaskMapper extends QBMapper {
|
|||
return $this->findEntity($qb);
|
||||
}
|
||||
|
||||
/**
|
||||
* Atomically claim the oldest scheduled task of the given task types and mark it RUNNING.
|
||||
*
|
||||
* This is the structural fix for the worker "claim loop": instead of every worker
|
||||
* racing for the single oldest task (a thundering herd that grows a per-worker
|
||||
* `id NOT IN (...)` ignore list and slows the SELECT), each worker claims a
|
||||
* *distinct* task in one round trip.
|
||||
*
|
||||
* On databases that support row-level locking with SKIP LOCKED
|
||||
* (MySQL/MariaDB/PostgreSQL) the claim is a single transaction:
|
||||
* SELECT ... WHERE status = SCHEDULED [AND type IN (...)]
|
||||
* ORDER BY last_updated ASC LIMIT 1 FOR UPDATE SKIP LOCKED
|
||||
* followed by a guarded UPDATE to RUNNING. Concurrent workers skip rows already
|
||||
* locked by another transaction, so no two workers ever claim the same task.
|
||||
*
|
||||
* SQLite does not support SKIP LOCKED (verified: Doctrine throws "Operation
|
||||
* 'SKIP LOCKED' is not supported by platform"), so we feature-detect via the DB
|
||||
* provider and fall back to the existing bounded {@see lockTask} retry, which is
|
||||
* still safe because the UPDATE ... WHERE status = SCHEDULED is itself atomic and
|
||||
* SQLite serialises writers.
|
||||
*
|
||||
* A task is only ever transitioned SCHEDULED -> RUNNING here; it is never marked
|
||||
* FAILED by claiming. If the task cannot be claimed (none scheduled, or it was
|
||||
* taken by another worker between SELECT and UPDATE) this returns null.
|
||||
*
|
||||
* @param list<string> $taskTypes When non-empty, only tasks of these task type IDs are considered.
|
||||
* @return Task|null The claimed task (status RUNNING), or null if nothing could be claimed.
|
||||
* @throws Exception
|
||||
*/
|
||||
public function claimOldestScheduledTask(array $taskTypes): ?Task {
|
||||
if ($this->db->getDatabaseProvider() === IDBConnection::PLATFORM_SQLITE) {
|
||||
// SKIP LOCKED is unsupported on SQLite: fall back to the bounded lock-and-retry claim.
|
||||
return $this->claimWithBoundedRetry($taskTypes);
|
||||
}
|
||||
|
||||
return $this->claimWithSkipLocked($taskTypes);
|
||||
}
|
||||
|
||||
/**
|
||||
* Atomic claim using FOR UPDATE SKIP LOCKED in a single transaction.
|
||||
*
|
||||
* @param list<string> $taskTypes
|
||||
* @return Task|null
|
||||
* @throws Exception
|
||||
*/
|
||||
private function claimWithSkipLocked(array $taskTypes): ?Task {
|
||||
$this->db->beginTransaction();
|
||||
try {
|
||||
$qb = $this->db->getQueryBuilder();
|
||||
$qb->select(Task::COLUMNS)
|
||||
->from($this->tableName)
|
||||
->where($qb->expr()->eq('status', $qb->createPositionalParameter(\OCP\TaskProcessing\Task::STATUS_SCHEDULED, IQueryBuilder::PARAM_INT)))
|
||||
->orderBy('last_updated', 'ASC')
|
||||
->setMaxResults(1)
|
||||
->forUpdate(ConflictResolutionMode::SkipLocked);
|
||||
|
||||
if (!empty($taskTypes)) {
|
||||
$filter = [];
|
||||
foreach ($taskTypes as $taskType) {
|
||||
$filter[] = $qb->expr()->eq('type', $qb->createPositionalParameter($taskType));
|
||||
}
|
||||
$qb->andWhere($qb->expr()->orX(...$filter));
|
||||
}
|
||||
|
||||
$result = $qb->executeQuery();
|
||||
$row = $result->fetch();
|
||||
$result->closeCursor();
|
||||
|
||||
if ($row === false) {
|
||||
// Nothing schedulable (or every candidate is locked by another worker).
|
||||
$this->db->commit();
|
||||
return null;
|
||||
}
|
||||
|
||||
/** @var Task $task */
|
||||
$task = $this->mapRowToEntity($row);
|
||||
|
||||
// Record the start time at claim time: because the worker receives the task
|
||||
// already in status RUNNING, the later SCHEDULED -> RUNNING transition in
|
||||
// Manager::setTaskStatus is skipped and would otherwise never persist started_at.
|
||||
$startedAt = $this->timeFactory->now()->getTimestamp();
|
||||
|
||||
// Guarded transition SCHEDULED -> RUNNING. The row is locked for this
|
||||
// transaction, so the guard is belt-and-braces rather than strictly required.
|
||||
$update = $this->db->getQueryBuilder();
|
||||
$update->update($this->tableName)
|
||||
->set('status', $update->createPositionalParameter(\OCP\TaskProcessing\Task::STATUS_RUNNING, IQueryBuilder::PARAM_INT))
|
||||
->set('started_at', $update->createPositionalParameter($startedAt, IQueryBuilder::PARAM_INT))
|
||||
->where($update->expr()->eq('id', $update->createPositionalParameter($task->getId(), IQueryBuilder::PARAM_INT)))
|
||||
->andWhere($update->expr()->eq('status', $update->createPositionalParameter(\OCP\TaskProcessing\Task::STATUS_SCHEDULED, IQueryBuilder::PARAM_INT)));
|
||||
$affected = $update->executeStatement();
|
||||
|
||||
$this->db->commit();
|
||||
|
||||
if ($affected === 0) {
|
||||
// Lost the race (should not happen under SKIP LOCKED); leave the task SCHEDULED.
|
||||
return null;
|
||||
}
|
||||
|
||||
$task->setStatus(\OCP\TaskProcessing\Task::STATUS_RUNNING);
|
||||
$task->setStartedAt($startedAt);
|
||||
return $task;
|
||||
} catch (\Throwable $e) {
|
||||
$this->db->rollBack();
|
||||
throw $e;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Fallback claim for databases without SKIP LOCKED (SQLite).
|
||||
*
|
||||
* Repeatedly fetches the oldest scheduled task and attempts the atomic
|
||||
* UPDATE ... WHERE status = SCHEDULED. Tasks lost to another worker are added to a
|
||||
* short ignore list so the next iteration moves on. Bounded to avoid unbounded
|
||||
* looping under contention.
|
||||
*
|
||||
* @param list<string> $taskTypes
|
||||
* @return Task|null
|
||||
* @throws Exception
|
||||
*/
|
||||
private function claimWithBoundedRetry(array $taskTypes): ?Task {
|
||||
$taskIdsToIgnore = [];
|
||||
// A handful of attempts is plenty: on SQLite writers are serialised, so at most
|
||||
// a few rows can be claimed out from under us before we either win or run dry.
|
||||
for ($attempt = 0; $attempt < 10; $attempt++) {
|
||||
try {
|
||||
$task = $this->findOldestScheduledByType($taskTypes, $taskIdsToIgnore);
|
||||
} catch (DoesNotExistException) {
|
||||
return null;
|
||||
}
|
||||
|
||||
if ($this->lockTask($task) !== 0) {
|
||||
$task->setStatus(\OCP\TaskProcessing\Task::STATUS_RUNNING);
|
||||
// Record the start time at claim time. lockTask only flips the status (and is
|
||||
// shared with other callers), so persist started_at with a targeted follow-up
|
||||
// UPDATE rather than changing lockTask's behaviour. The worker receives the task
|
||||
// already RUNNING, so Manager::setTaskStatus would otherwise never write it.
|
||||
$startedAt = $this->timeFactory->now()->getTimestamp();
|
||||
$update = $this->db->getQueryBuilder();
|
||||
$update->update($this->tableName)
|
||||
->set('started_at', $update->createPositionalParameter($startedAt, IQueryBuilder::PARAM_INT))
|
||||
->where($update->expr()->eq('id', $update->createPositionalParameter($task->getId(), IQueryBuilder::PARAM_INT)));
|
||||
$update->executeStatement();
|
||||
$task->setStartedAt($startedAt);
|
||||
return $task;
|
||||
}
|
||||
|
||||
// Another worker took it; skip this id and try the next oldest.
|
||||
$taskIdsToIgnore[] = $task->getId();
|
||||
}
|
||||
|
||||
return null;
|
||||
}
|
||||
|
||||
/**
|
||||
* @param int $id
|
||||
* @param string|null $userId
|
||||
|
|
|
|||
|
|
@ -1338,6 +1338,21 @@ class Manager implements IManager {
|
|||
}
|
||||
}
|
||||
|
||||
#[\Override]
|
||||
public function claimNextScheduledTask(array $taskTypeIds = []): ?Task {
|
||||
try {
|
||||
$taskEntity = $this->taskMapper->claimOldestScheduledTask($taskTypeIds);
|
||||
if ($taskEntity === null) {
|
||||
return null;
|
||||
}
|
||||
return $taskEntity->toPublicTask();
|
||||
} catch (\OCP\DB\Exception $e) {
|
||||
throw new \OCP\TaskProcessing\Exception\Exception('There was a problem claiming the task', previous: $e);
|
||||
} catch (\JsonException $e) {
|
||||
throw new \OCP\TaskProcessing\Exception\Exception('There was a problem parsing JSON after claiming the task', previous: $e);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Takes task input data and replaces fileIds with File objects
|
||||
*
|
||||
|
|
|
|||
|
|
@ -174,6 +174,22 @@ interface IManager {
|
|||
*/
|
||||
public function getNextScheduledTasks(array $taskTypeIds = [], array $taskIdsToIgnore = [], int $numberOfTasks = 1): array;
|
||||
|
||||
/**
|
||||
* Atomically claim the oldest scheduled task of the given task types and mark it RUNNING.
|
||||
*
|
||||
* Unlike {@see getNextScheduledTask} (which only fetches) this both selects and
|
||||
* locks the task in one step, so concurrent workers never claim the same task.
|
||||
* On databases supporting it this uses SELECT ... FOR UPDATE SKIP LOCKED; on
|
||||
* SQLite it falls back to a bounded lock-and-retry. The task is only ever
|
||||
* transitioned SCHEDULED -> RUNNING; it is never marked FAILED by claiming.
|
||||
*
|
||||
* @param list<string> $taskTypeIds When non-empty, only tasks of these task type IDs are considered.
|
||||
* @return Task|null The claimed task (status RUNNING), or null if nothing could be claimed.
|
||||
* @throws Exception If the query failed
|
||||
* @since 34.0.0
|
||||
*/
|
||||
public function claimNextScheduledTask(array $taskTypeIds = []): ?Task;
|
||||
|
||||
/**
|
||||
* @param int $id The id of the task
|
||||
* @param string|null $userId The user id that scheduled the task
|
||||
|
|
|
|||
|
|
@ -13,7 +13,6 @@ use OC\Core\Command\TaskProcessing\WorkerCommand;
|
|||
use OCP\AppFramework\Utility\ITimeFactory;
|
||||
use OCP\IAppConfig;
|
||||
use OCP\TaskProcessing\Exception\Exception;
|
||||
use OCP\TaskProcessing\Exception\NotFoundException;
|
||||
use OCP\TaskProcessing\IManager;
|
||||
use OCP\TaskProcessing\ISynchronousProvider;
|
||||
use OCP\TaskProcessing\Task;
|
||||
|
|
@ -89,7 +88,7 @@ class WorkerCommandTest extends TestCase {
|
|||
->willReturn($provider);
|
||||
|
||||
$this->manager->expects($this->once())
|
||||
->method('getNextScheduledTask')
|
||||
->method('claimNextScheduledTask')
|
||||
->with([$taskTypeId])
|
||||
->willReturn($task);
|
||||
|
||||
|
|
@ -120,7 +119,7 @@ class WorkerCommandTest extends TestCase {
|
|||
->method('getPreferredProvider');
|
||||
|
||||
$this->manager->expects($this->never())
|
||||
->method('getNextScheduledTask');
|
||||
->method('claimNextScheduledTask');
|
||||
|
||||
$input = new ArrayInput(['--once' => true], $this->command->getDefinition());
|
||||
$output = new NullOutput();
|
||||
|
|
@ -144,9 +143,9 @@ class WorkerCommandTest extends TestCase {
|
|||
->with($taskTypeId)
|
||||
->willReturn($preferredProvider);
|
||||
|
||||
// provider_a is not preferred (provider_b is), so getNextScheduledTask is never called
|
||||
// provider_a is not preferred (provider_b is), so claimNextScheduledTask is never called
|
||||
$this->manager->expects($this->never())
|
||||
->method('getNextScheduledTask');
|
||||
->method('claimNextScheduledTask');
|
||||
|
||||
$input = new ArrayInput(['--once' => true], $this->command->getDefinition());
|
||||
$output = new NullOutput();
|
||||
|
|
@ -169,10 +168,11 @@ class WorkerCommandTest extends TestCase {
|
|||
->with($taskTypeId)
|
||||
->willReturn($provider);
|
||||
|
||||
// The no-task path is now claimNextScheduledTask returning null (not an exception).
|
||||
$this->manager->expects($this->once())
|
||||
->method('getNextScheduledTask')
|
||||
->method('claimNextScheduledTask')
|
||||
->with([$taskTypeId])
|
||||
->willThrowException(new NotFoundException());
|
||||
->willReturn(null);
|
||||
|
||||
$this->manager->expects($this->never())
|
||||
->method('processTask');
|
||||
|
|
@ -200,13 +200,13 @@ class WorkerCommandTest extends TestCase {
|
|||
|
||||
$exception = new Exception('DB error');
|
||||
$this->manager->expects($this->once())
|
||||
->method('getNextScheduledTask')
|
||||
->method('claimNextScheduledTask')
|
||||
->with([$taskTypeId])
|
||||
->willThrowException($exception);
|
||||
|
||||
$this->logger->expects($this->once())
|
||||
->method('error')
|
||||
->with('Unknown error while retrieving scheduled TaskProcessing tasks', ['exception' => $exception]);
|
||||
->with('Unknown error while claiming scheduled TaskProcessing tasks', ['exception' => $exception]);
|
||||
|
||||
$this->manager->expects($this->never())
|
||||
->method('processTask');
|
||||
|
|
@ -240,9 +240,9 @@ class WorkerCommandTest extends TestCase {
|
|||
[$taskTypeId2, $provider2],
|
||||
]);
|
||||
|
||||
// All eligible task types are passed in a single query
|
||||
// All eligible task types are passed in a single atomic claim
|
||||
$this->manager->expects($this->once())
|
||||
->method('getNextScheduledTask')
|
||||
->method('claimNextScheduledTask')
|
||||
->with($this->equalTo([$taskTypeId1, $taskTypeId2]))
|
||||
->willReturn($task);
|
||||
|
||||
|
|
@ -278,7 +278,7 @@ class WorkerCommandTest extends TestCase {
|
|||
->willReturn($provider2);
|
||||
|
||||
$this->manager->expects($this->once())
|
||||
->method('getNextScheduledTask')
|
||||
->method('claimNextScheduledTask')
|
||||
->with([$taskTypeId2])
|
||||
->willReturn($task);
|
||||
|
||||
|
|
@ -307,7 +307,7 @@ class WorkerCommandTest extends TestCase {
|
|||
->method('getPreferredProvider');
|
||||
|
||||
$this->manager->expects($this->never())
|
||||
->method('getNextScheduledTask');
|
||||
->method('claimNextScheduledTask');
|
||||
|
||||
$input = new ArrayInput(['--once' => true, '--taskTypes' => ['type_b']], $this->command->getDefinition());
|
||||
$output = new NullOutput();
|
||||
|
|
@ -332,7 +332,7 @@ class WorkerCommandTest extends TestCase {
|
|||
->willReturn($provider);
|
||||
|
||||
$this->manager->expects($this->once())
|
||||
->method('getNextScheduledTask')
|
||||
->method('claimNextScheduledTask')
|
||||
->with([$taskTypeId])
|
||||
->willReturn($task);
|
||||
|
||||
|
|
|
|||
90
tests/lib/TaskProcessing/SkipLockedSqlShapeTest.php
Normal file
90
tests/lib/TaskProcessing/SkipLockedSqlShapeTest.php
Normal file
|
|
@ -0,0 +1,90 @@
|
|||
<?php
|
||||
|
||||
declare(strict_types=1);
|
||||
|
||||
/**
|
||||
* SPDX-FileCopyrightText: 2026 Nextcloud GmbH and Nextcloud contributors
|
||||
* SPDX-License-Identifier: AGPL-3.0-or-later
|
||||
*/
|
||||
|
||||
namespace Test\TaskProcessing;
|
||||
|
||||
use Doctrine\DBAL\Platforms\MySQL80Platform;
|
||||
use OC\DB\Connection;
|
||||
use OC\DB\ConnectionAdapter;
|
||||
use OC\DB\QueryBuilder\QueryBuilder;
|
||||
use OC\SystemConfig;
|
||||
use OCP\DB\QueryBuilder\ConflictResolutionMode;
|
||||
use OCP\DB\QueryBuilder\IQueryBuilder;
|
||||
use OCP\IDBConnection;
|
||||
use PHPUnit\Framework\MockObject\MockObject;
|
||||
use Psr\Log\LoggerInterface;
|
||||
use Test\TestCase;
|
||||
|
||||
/**
|
||||
* Guards the SKIP LOCKED claim query shape in CI.
|
||||
*
|
||||
* The atomic worker claim in {@see \OC\TaskProcessing\Db\TaskMapper::claimWithSkipLocked}
|
||||
* relies on the QueryBuilder emitting `... FOR UPDATE SKIP LOCKED` on databases that
|
||||
* support row-level locking (MySQL/MariaDB/PostgreSQL). True multi-transaction
|
||||
* concurrency cannot be exercised inside a single PHPUnit process, so this test
|
||||
* pins the generated SQL shape instead: it builds the exact claim query against a
|
||||
* non-SQLite platform and asserts both clauses are present. A regression that drops
|
||||
* the locking clause (silently turning the claim into a plain SELECT and reopening
|
||||
* the duplicate-claim race) would fail here.
|
||||
*/
|
||||
class SkipLockedSqlShapeTest extends TestCase {
|
||||
private SystemConfig&MockObject $systemConfig;
|
||||
private LoggerInterface&MockObject $logger;
|
||||
|
||||
#[\Override]
|
||||
protected function setUp(): void {
|
||||
parent::setUp();
|
||||
$this->systemConfig = $this->createMock(SystemConfig::class);
|
||||
$this->logger = $this->createMock(LoggerInterface::class);
|
||||
}
|
||||
|
||||
/**
|
||||
* Build a QueryBuilder backed by a non-SQLite (MySQL 8) platform so the
|
||||
* generated SQL exposes the locking clause the way it would in production.
|
||||
*/
|
||||
private function newMysqlQueryBuilder(): QueryBuilder {
|
||||
$inner = $this->createMock(Connection::class);
|
||||
$inner->method('getDatabasePlatform')->willReturn(new MySQL80Platform());
|
||||
|
||||
$adapter = $this->createMock(ConnectionAdapter::class);
|
||||
$adapter->method('getInner')->willReturn($inner);
|
||||
$adapter->method('getDatabaseProvider')->willReturn(IDBConnection::PLATFORM_MYSQL);
|
||||
|
||||
return new QueryBuilder($adapter, $this->systemConfig, $this->logger);
|
||||
}
|
||||
|
||||
public function testClaimQueryContainsForUpdateSkipLocked(): void {
|
||||
$qb = $this->newMysqlQueryBuilder();
|
||||
$qb->select('id', 'status', 'type', 'last_updated')
|
||||
->from('taskprocessing_tasks')
|
||||
->where($qb->expr()->eq('status', $qb->createPositionalParameter(1, IQueryBuilder::PARAM_INT)))
|
||||
->orderBy('last_updated', 'ASC')
|
||||
->setMaxResults(1)
|
||||
->forUpdate(ConflictResolutionMode::SkipLocked);
|
||||
|
||||
$sql = $qb->getSQL();
|
||||
|
||||
self::assertStringContainsString('FOR UPDATE', $sql);
|
||||
self::assertStringContainsString('SKIP LOCKED', $sql);
|
||||
}
|
||||
|
||||
public function testOrdinaryForUpdateHasNoSkipLocked(): void {
|
||||
// Sanity check: only the SkipLocked mode adds the SKIP LOCKED clause.
|
||||
$qb = $this->newMysqlQueryBuilder();
|
||||
$qb->select('id')
|
||||
->from('taskprocessing_tasks')
|
||||
->setMaxResults(1)
|
||||
->forUpdate(ConflictResolutionMode::Ordinary);
|
||||
|
||||
$sql = $qb->getSQL();
|
||||
|
||||
self::assertStringContainsString('FOR UPDATE', $sql);
|
||||
self::assertStringNotContainsString('SKIP LOCKED', $sql);
|
||||
}
|
||||
}
|
||||
|
|
@ -1623,4 +1623,141 @@ class TaskProcessingTest extends \Test\TestCase {
|
|||
}
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* Register a single synchronous provider for TextToText so tasks can be scheduled.
|
||||
*
|
||||
* The integration suite shares one database across tests and does not truncate
|
||||
* between them, so we clear the tasks table first to make the oldest-scheduled
|
||||
* ordering of the claim deterministic for these focused tests.
|
||||
*/
|
||||
private function registerTextToTextProvider(): void {
|
||||
$db = Server::get(IDBConnection::class);
|
||||
$db->getQueryBuilder()->delete('taskprocessing_tasks')->executeStatement();
|
||||
|
||||
$this->appConfig->setValueString('core', 'ai.taskprocessing_type_preferences', '', lazy: true);
|
||||
$this->registrationContext->expects($this->any())->method('getTaskProcessingProviders')->willReturn([
|
||||
new ServiceRegistration('test', SuccessfulSyncProvider::class)
|
||||
]);
|
||||
self::assertTrue($this->manager->hasProviders());
|
||||
}
|
||||
|
||||
public function testClaimReturnsNullWhenNoScheduledTask(): void {
|
||||
$this->registerTextToTextProvider();
|
||||
|
||||
// No task scheduled => nothing to claim.
|
||||
self::assertNull($this->manager->claimNextScheduledTask([TextToText::ID]));
|
||||
self::assertNull($this->manager->claimNextScheduledTask());
|
||||
}
|
||||
|
||||
public function testClaimReturnsTaskAndSetsItRunning(): void {
|
||||
$this->registerTextToTextProvider();
|
||||
|
||||
$task = new Task(TextToText::ID, ['input' => 'Hello'], 'test', null);
|
||||
$this->manager->scheduleTask($task);
|
||||
self::assertEquals(Task::STATUS_SCHEDULED, $task->getStatus());
|
||||
$scheduledId = $task->getId();
|
||||
|
||||
$claimed = $this->manager->claimNextScheduledTask([TextToText::ID]);
|
||||
self::assertNotNull($claimed);
|
||||
self::assertEquals($scheduledId, $claimed->getId());
|
||||
// The returned task object reports RUNNING ...
|
||||
self::assertEquals(Task::STATUS_RUNNING, $claimed->getStatus());
|
||||
// ... and the change is persisted in the database.
|
||||
$persisted = $this->manager->getTask($scheduledId);
|
||||
self::assertEquals(Task::STATUS_RUNNING, $persisted->getStatus());
|
||||
}
|
||||
|
||||
public function testClaimNeverReturnsTheSameTaskTwice(): void {
|
||||
// No-duplicate invariant. We cannot run two truly concurrent DB transactions
|
||||
// inside one PHPUnit process, but the structural guarantee is the same: once a
|
||||
// task is claimed it is RUNNING (no longer SCHEDULED), so a second claim can
|
||||
// never return it again. Under real concurrency, FOR UPDATE SKIP LOCKED enforces
|
||||
// the same property by skipping rows another transaction has locked; that path
|
||||
// is additionally validated live on nc-ai.
|
||||
$this->registerTextToTextProvider();
|
||||
|
||||
$taskA = new Task(TextToText::ID, ['input' => 'A'], 'test', null);
|
||||
$this->manager->scheduleTask($taskA);
|
||||
$taskB = new Task(TextToText::ID, ['input' => 'B'], 'test', null);
|
||||
$this->manager->scheduleTask($taskB);
|
||||
|
||||
$firstClaim = $this->manager->claimNextScheduledTask([TextToText::ID]);
|
||||
$secondClaim = $this->manager->claimNextScheduledTask([TextToText::ID]);
|
||||
|
||||
self::assertNotNull($firstClaim);
|
||||
self::assertNotNull($secondClaim);
|
||||
// Two distinct tasks were handed out, never the same one twice.
|
||||
self::assertNotEquals($firstClaim->getId(), $secondClaim->getId());
|
||||
self::assertEqualsCanonicalizing(
|
||||
[$taskA->getId(), $taskB->getId()],
|
||||
[$firstClaim->getId(), $secondClaim->getId()],
|
||||
);
|
||||
|
||||
// Both are now RUNNING and the queue is drained.
|
||||
self::assertEquals(Task::STATUS_RUNNING, $this->manager->getTask($taskA->getId())->getStatus());
|
||||
self::assertEquals(Task::STATUS_RUNNING, $this->manager->getTask($taskB->getId())->getStatus());
|
||||
self::assertNull($this->manager->claimNextScheduledTask([TextToText::ID]));
|
||||
}
|
||||
|
||||
public function testClaimNeverMarksTaskFailed(): void {
|
||||
$this->registerTextToTextProvider();
|
||||
|
||||
$task = new Task(TextToText::ID, ['input' => 'Hello'], 'test', null);
|
||||
$this->manager->scheduleTask($task);
|
||||
$id = $task->getId();
|
||||
|
||||
$claimed = $this->manager->claimNextScheduledTask([TextToText::ID]);
|
||||
self::assertNotNull($claimed);
|
||||
|
||||
// Claiming only ever transitions SCHEDULED -> RUNNING, never to FAILED/CANCELLED.
|
||||
self::assertNotEquals(Task::STATUS_FAILED, $claimed->getStatus());
|
||||
self::assertNotEquals(Task::STATUS_CANCELLED, $claimed->getStatus());
|
||||
$persisted = $this->manager->getTask($id);
|
||||
self::assertNotEquals(Task::STATUS_FAILED, $persisted->getStatus());
|
||||
self::assertNotEquals(Task::STATUS_CANCELLED, $persisted->getStatus());
|
||||
self::assertEquals(Task::STATUS_RUNNING, $persisted->getStatus());
|
||||
self::assertNull($persisted->getErrorMessage());
|
||||
}
|
||||
|
||||
public function testClaimRespectsTaskTypeFilter(): void {
|
||||
$this->registerTextToTextProvider();
|
||||
|
||||
$task = new Task(TextToText::ID, ['input' => 'Hello'], 'test', null);
|
||||
$this->manager->scheduleTask($task);
|
||||
|
||||
// A type that does not match the only scheduled task must not be claimed.
|
||||
self::assertNull($this->manager->claimNextScheduledTask(['some:other:tasktype']));
|
||||
// The task is still SCHEDULED and claimable without a filter.
|
||||
self::assertEquals(Task::STATUS_SCHEDULED, $this->manager->getTask($task->getId())->getStatus());
|
||||
$claimed = $this->manager->claimNextScheduledTask();
|
||||
self::assertNotNull($claimed);
|
||||
self::assertEquals($task->getId(), $claimed->getId());
|
||||
}
|
||||
|
||||
public function testClaimRecordsStartedAt(): void {
|
||||
$this->registerTextToTextProvider();
|
||||
|
||||
$task = new Task(TextToText::ID, ['input' => 'Hello'], 'test', null);
|
||||
$this->manager->scheduleTask($task);
|
||||
// A scheduled task has not started yet.
|
||||
self::assertNull($this->manager->getTask($task->getId())->getStartedAt());
|
||||
|
||||
$before = time();
|
||||
$claimed = $this->manager->claimNextScheduledTask([TextToText::ID]);
|
||||
$after = time();
|
||||
|
||||
self::assertNotNull($claimed);
|
||||
// started_at is recorded at claim time on the returned task ...
|
||||
self::assertNotNull($claimed->getStartedAt());
|
||||
self::assertGreaterThanOrEqual($before, $claimed->getStartedAt());
|
||||
self::assertLessThanOrEqual($after, $claimed->getStartedAt());
|
||||
// ... and persisted in the database (since the worker receives the task already
|
||||
// RUNNING, the later setTaskStatus SCHEDULED -> RUNNING edge is skipped and would
|
||||
// otherwise never write started_at).
|
||||
$persisted = $this->manager->getTask($task->getId());
|
||||
self::assertNotNull($persisted->getStartedAt());
|
||||
self::assertGreaterThanOrEqual($before, $persisted->getStartedAt());
|
||||
self::assertLessThanOrEqual($after, $persisted->getStartedAt());
|
||||
}
|
||||
}
|
||||
|
|
|
|||
Loading…
Reference in a new issue