diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c index 17b88ff6453..0a8617ea761 100644 --- a/src/backend/access/transam/xlog.c +++ b/src/backend/access/transam/xlog.c @@ -7108,8 +7108,18 @@ CreateCheckPoint(int flags) * according to synchronized LSNs of replication slots. The slot's LSN * might be advanced concurrently, so we call this before * CheckPointReplicationSlots() synchronizes replication slots. + * + * We acquire the Allocation lock to serialize the minimum LSN calculation + * with concurrent slot WAL reservation. This ensures that the WAL + * position being reserved is either included in the minimum LSN or is + * beyond or equal to the redo pointer of the current checkpoint (See + * ReplicationSlotReserveWal for details), thus preventing its removal by + * checkpoints. Note that this lock is required only during checkpoints + * where WAL removal is dictated by the slot's minimum LSN. */ + LWLockAcquire(ReplicationSlotAllocationLock, LW_SHARED); slotsMinReqLSN = XLogGetReplicationSlotMinimumLSN(); + LWLockRelease(ReplicationSlotAllocationLock); /* * In some cases there are groups of actions that must all occur on one @@ -7309,7 +7319,10 @@ CreateCheckPoint(int flags) /* * Recalculate the current minimum LSN to be used in the WAL segment * cleanup. Then, we must synchronize the replication slots again in - * order to make this LSN safe to use. + * order to make this LSN safe to use. Here, we don't need to acquire + * the ReplicationSlotAllocationLock to serialize the minimum LSN + * computation with slot reservation as the RedoRecPtr is not updated + * after the previous computation of minimum LSN. */ slotsMinReqLSN = XLogGetReplicationSlotMinimumLSN(); CheckPointReplicationSlots(shutdown); @@ -7681,8 +7694,16 @@ CreateRestartPoint(int flags) * according to synchronized LSNs of replication slots. 
The slot's LSN * might be advanced concurrently, so we call this before * CheckPointReplicationSlots() synchronizes replication slots. + * + * We acquire the Allocation lock to serialize the minimum LSN calculation + * with concurrent slot WAL reservation. This ensures that the WAL + * position being reserved is either included in the minimum LSN or is + * beyond or equal to the redo pointer of the current checkpoint (See + * ReplicationSlotReserveWal for details). */ + LWLockAcquire(ReplicationSlotAllocationLock, LW_SHARED); slotsMinReqLSN = XLogGetReplicationSlotMinimumLSN(); + LWLockRelease(ReplicationSlotAllocationLock); if (log_checkpoints) LogCheckpointStart(flags, true); @@ -7780,7 +7801,10 @@ CreateRestartPoint(int flags) /* * Recalculate the current minimum LSN to be used in the WAL segment * cleanup. Then, we must synchronize the replication slots again in - * order to make this LSN safe to use. + * order to make this LSN safe to use. Here, we don't need to acquire + * the ReplicationSlotAllocationLock to serialize the minimum LSN + * computation with slot reservation as the RedoRecPtr is not updated + * after the previous computation of minimum LSN. */ slotsMinReqLSN = XLogGetReplicationSlotMinimumLSN(); CheckPointReplicationSlots(flags & CHECKPOINT_IS_SHUTDOWN); diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c index 4efbe85e5b6..4e54a8d0f14 100644 --- a/src/backend/replication/slot.c +++ b/src/backend/replication/slot.c @@ -1459,62 +1459,65 @@ void ReplicationSlotReserveWal(void) { ReplicationSlot *slot = MyReplicationSlot; + XLogSegNo segno; + XLogRecPtr restart_lsn; Assert(slot != NULL); Assert(slot->data.restart_lsn == InvalidXLogRecPtr); /* - * The replication slot mechanism is used to prevent removal of required - * WAL. As there is no interlock between this routine and checkpoints, WAL - * segments could concurrently be removed when a now stale return value of - * ReplicationSlotsComputeRequiredLSN() is used. 
In the unlikely case that - * this happens we'll just retry. + * The replication slot mechanism is used to prevent the removal of + * required WAL. + * + * Acquire an exclusive lock to prevent the checkpoint process from + * concurrently computing the minimum slot LSN (see the call to + * XLogGetReplicationSlotMinimumLSN in CreateCheckPoint). This ensures + * that the WAL reserved for replication cannot be removed during a + * checkpoint. + * + * The mechanism is reliable because if WAL reservation occurs first, the + * checkpoint must wait for the restart_lsn update before determining the + * minimum non-removable LSN. On the other hand, if the checkpoint happens + * first, subsequent WAL reservations will select positions at or beyond + * the redo pointer of that checkpoint. */ - while (true) - { - XLogSegNo segno; - XLogRecPtr restart_lsn; + LWLockAcquire(ReplicationSlotAllocationLock, LW_EXCLUSIVE); - /* - * For logical slots log a standby snapshot and start logical decoding - * at exactly that position. That allows the slot to start up more - * quickly. But on a standby we cannot do WAL writes, so just use the - * replay pointer; effectively, an attempt to create a logical slot on - * standby will cause it to wait for an xl_running_xact record to be - * logged independently on the primary, so that a snapshot can be - * built using the record. - * - * None of this is needed (or indeed helpful) for physical slots as - * they'll start replay at the last logged checkpoint anyway. Instead - * return the location of the last redo LSN. While that slightly - * increases the chance that we have to retry, it's where a base - * backup has to start replay at. - */ - if (SlotIsPhysical(slot)) - restart_lsn = GetRedoRecPtr(); - else if (RecoveryInProgress()) - restart_lsn = GetXLogReplayRecPtr(NULL); - else - restart_lsn = GetXLogInsertRecPtr(); + /* + * For logical slots log a standby snapshot and start logical decoding at + * exactly that position. 
That allows the slot to start up more quickly. + * But on a standby we cannot do WAL writes, so just use the replay + * pointer; effectively, an attempt to create a logical slot on standby + * will cause it to wait for an xl_running_xact record to be logged + * independently on the primary, so that a snapshot can be built using the + * record. + * + * None of this is needed (or indeed helpful) for physical slots as + * they'll start replay at the last logged checkpoint anyway. Instead, + * return the location of the last redo LSN, where a base backup has to + * start replay at. + */ + if (SlotIsPhysical(slot)) + restart_lsn = GetRedoRecPtr(); + else if (RecoveryInProgress()) + restart_lsn = GetXLogReplayRecPtr(NULL); + else + restart_lsn = GetXLogInsertRecPtr(); - SpinLockAcquire(&slot->mutex); - slot->data.restart_lsn = restart_lsn; - SpinLockRelease(&slot->mutex); + SpinLockAcquire(&slot->mutex); + slot->data.restart_lsn = restart_lsn; + SpinLockRelease(&slot->mutex); - /* prevent WAL removal as fast as possible */ - ReplicationSlotsComputeRequiredLSN(); + /* prevent WAL removal as fast as possible */ + ReplicationSlotsComputeRequiredLSN(); - /* - * If all required WAL is still there, great, otherwise retry. The - * slot should prevent further removal of WAL, unless there's a - * concurrent ReplicationSlotsComputeRequiredLSN() after we've written - * the new restart_lsn above, so normally we should never need to loop - * more than twice. - */ - XLByteToSeg(slot->data.restart_lsn, segno, wal_segment_size); - if (XLogGetLastRemovedSegno() < segno) - break; - } + /* Checkpoint shouldn't remove the required WAL. */ + XLByteToSeg(slot->data.restart_lsn, segno, wal_segment_size); + if (XLogGetLastRemovedSegno() >= segno) + elog(ERROR, "WAL required by replication slot %s has been removed concurrently", + NameStr(slot->data.name)); + + LWLockRelease(ReplicationSlotAllocationLock); if (!RecoveryInProgress() && SlotIsLogical(slot)) {