diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c index 81dc86847c0..16614e152dd 100644 --- a/src/backend/access/transam/xlog.c +++ b/src/backend/access/transam/xlog.c @@ -669,7 +669,6 @@ static XLogRecPtr CreateOverwriteContrecordRecord(XLogRecPtr aborted_lsn, TimeLineID newTLI); static void CheckPointGuts(XLogRecPtr checkPointRedo, int flags); static void KeepLogSeg(XLogRecPtr recptr, XLogSegNo *logSegNo); -static XLogRecPtr XLogGetReplicationSlotMinimumLSN(void); static void AdvanceXLInsertBuffer(XLogRecPtr upto, TimeLineID tli, bool opportunistic); @@ -2679,7 +2678,7 @@ XLogSetReplicationSlotMinimumLSN(XLogRecPtr lsn) * Return the oldest LSN we must retain to satisfy the needs of some * replication slot. */ -static XLogRecPtr +XLogRecPtr XLogGetReplicationSlotMinimumLSN(void) { XLogRecPtr retval; @@ -7902,6 +7901,9 @@ CreateRestartPoint(int flags) replayPtr = GetXLogReplayRecPtr(&replayTLI); endptr = (receivePtr < replayPtr) ? replayPtr : receivePtr; KeepLogSeg(endptr, &_logSegNo); + + INJECTION_POINT("restartpoint-before-slot-invalidation", NULL); + if (InvalidateObsoleteReplicationSlots(RS_INVAL_WAL_REMOVED | RS_INVAL_IDLE_TIMEOUT, _logSegNo, InvalidOid, InvalidTransactionId)) diff --git a/src/backend/replication/logical/slotsync.c b/src/backend/replication/logical/slotsync.c index 73fc51ea53e..1c343d03d21 100644 --- a/src/backend/replication/logical/slotsync.c +++ b/src/backend/replication/logical/slotsync.c @@ -536,70 +536,71 @@ drop_local_obsolete_slots(List *remote_slot_list) * Reserve WAL for the currently active local slot using the specified WAL * location (restart_lsn). * - * If the given WAL location has been removed, reserve WAL using the oldest - * existing WAL segment. + * If the given WAL location has been removed or is at risk of removal, + * reserve WAL using the oldest segment that is non-removable. */ static void reserve_wal_for_local_slot(XLogRecPtr restart_lsn) { - XLogSegNo oldest_segno; + XLogRecPtr slot_min_lsn; + XLogRecPtr min_safe_lsn; XLogSegNo segno; ReplicationSlot *slot = MyReplicationSlot; Assert(slot != NULL); Assert(!XLogRecPtrIsValid(slot->data.restart_lsn)); - while (true) - { - SpinLockAcquire(&slot->mutex); - slot->data.restart_lsn = restart_lsn; - SpinLockRelease(&slot->mutex); + /* + * Acquire an exclusive lock to prevent the checkpoint process from + * concurrently calculating the minimum slot LSN (see + * CheckPointReplicationSlots), ensuring that if WAL reservation occurs + * first, the checkpoint must wait for the restart_lsn update before + * calculating the minimum LSN. + * + * Note: Unlike ReplicationSlotReserveWal(), this lock does not protect a + * newly synced slot from being invalidated if a concurrent checkpoint has + * invoked CheckPointReplicationSlots() before the WAL reservation here. + * This can happen because the initial restart_lsn received from the + * remote server can precede the redo pointer. Therefore, when selecting + * the initial restart_lsn, we consider using the redo pointer or the + * minimum slot LSN (if those values are greater than the remote + * restart_lsn) instead of relying solely on the remote value. + */ + LWLockAcquire(ReplicationSlotAllocationLock, LW_EXCLUSIVE); - /* Prevent WAL removal as fast as possible */ - ReplicationSlotsComputeRequiredLSN(); + /* + * Determine the minimum non-removable LSN by comparing the redo pointer + * with the minimum slot LSN. + * + * The minimum slot LSN is considered because the redo pointer advances at + * every checkpoint, even when replication slots are present on the + * standby. In such scenarios, the redo pointer can exceed the remote + * restart_lsn, while WALs preceding the remote restart_lsn remain + * protected by a local replication slot. + */ + min_safe_lsn = GetRedoRecPtr(); + slot_min_lsn = XLogGetReplicationSlotMinimumLSN(); - XLByteToSeg(slot->data.restart_lsn, segno, wal_segment_size); + if (XLogRecPtrIsValid(slot_min_lsn) && min_safe_lsn > slot_min_lsn) + min_safe_lsn = slot_min_lsn; - /* - * Find the oldest existing WAL segment file. - * - * Normally, we can determine it by using the last removed segment - * number. However, if no WAL segment files have been removed by a - * checkpoint since startup, we need to search for the oldest segment - * file from the current timeline existing in XLOGDIR. - * - * XXX: Currently, we are searching for the oldest segment in the - * current timeline as there is less chance of the slot's restart_lsn - * from being some prior timeline, and even if it happens, in the - * worst case, we will wait to sync till the slot's restart_lsn moved - * to the current timeline. - */ - oldest_segno = XLogGetLastRemovedSegno() + 1; + /* + * If the minimum safe LSN is greater than the given restart_lsn, use it + * as the initial restart_lsn for the newly synced slot. Otherwise, use + * the given remote restart_lsn. + */ + SpinLockAcquire(&slot->mutex); + slot->data.restart_lsn = Max(restart_lsn, min_safe_lsn); + SpinLockRelease(&slot->mutex); - if (oldest_segno == 1) - { - TimeLineID cur_timeline; + ReplicationSlotsComputeRequiredLSN(); - GetWalRcvFlushRecPtr(NULL, &cur_timeline); - oldest_segno = XLogGetOldestSegno(cur_timeline); - } + XLByteToSeg(slot->data.restart_lsn, segno, wal_segment_size); + if (XLogGetLastRemovedSegno() >= segno) + elog(ERROR, "WAL required by replication slot %s has been removed concurrently", + NameStr(slot->data.name)); - elog(DEBUG1, "segno: " UINT64_FORMAT " of purposed restart_lsn for the synced slot, oldest_segno: " UINT64_FORMAT " available", - segno, oldest_segno); - - /* - * If all required WAL is still there, great, otherwise retry. The - * slot should prevent further removal of WAL, unless there's a - * concurrent ReplicationSlotsComputeRequiredLSN() after we've written - * the new restart_lsn above, so normally we should never need to loop - * more than twice. - */ - if (segno >= oldest_segno) - break; - - /* Retry using the location of the oldest wal segment */ - XLogSegNoOffsetToRecPtr(oldest_segno, 0, wal_segment_size, restart_lsn); - } + LWLockRelease(ReplicationSlotAllocationLock); } /* diff --git a/src/include/access/xlog.h b/src/include/access/xlog.h index 0591a885dd1..fdfb572467b 100644 --- a/src/include/access/xlog.h +++ b/src/include/access/xlog.h @@ -226,6 +226,7 @@ extern XLogSegNo XLogGetLastRemovedSegno(void); extern XLogSegNo XLogGetOldestSegno(TimeLineID tli); extern void XLogSetAsyncXactLSN(XLogRecPtr asyncXactLSN); extern void XLogSetReplicationSlotMinimumLSN(XLogRecPtr lsn); +extern XLogRecPtr XLogGetReplicationSlotMinimumLSN(void); extern void xlog_redo(struct XLogReaderState *record); extern void xlog_desc(StringInfo buf, struct XLogReaderState *record); diff --git a/src/test/recovery/t/046_checkpoint_logical_slot.pl b/src/test/recovery/t/046_checkpoint_logical_slot.pl index 0bf7f024177..65f0205bb77 100644 --- a/src/test/recovery/t/046_checkpoint_logical_slot.pl +++ b/src/test/recovery/t/046_checkpoint_logical_slot.pl @@ -20,8 +20,7 @@ if ($ENV{enable_injection_points} ne 'yes') my ($node, $result); $node = PostgreSQL::Test::Cluster->new('mike'); -$node->init; -$node->append_conf('postgresql.conf', "wal_level = 'logical'"); +$node->init(allows_streaming => 'logical'); $node->start; # Check if the extension injection_points is available, as it may be @@ -139,4 +138,85 @@ eval { }; is($@, '', "Logical slot still valid"); +# Verify that the synchronized slots won't be invalidated immediately after +# synchronization in the presence of a concurrent checkpoint. +my $primary = $node; + +$primary->append_conf('postgresql.conf', "autovacuum = off"); +$primary->reload; + +my $backup_name = 'backup'; + +$primary->backup($backup_name); + +# Create a standby +my $standby = PostgreSQL::Test::Cluster->new('standby'); +$standby->init_from_backup( + $primary, $backup_name, + has_streaming => 1, + has_restoring => 1); + +my $connstr_1 = $primary->connstr; +$standby->append_conf( + 'postgresql.conf', qq( +hot_standby_feedback = on +primary_slot_name = 'phys_slot' +primary_conninfo = '$connstr_1 dbname=postgres' +)); + +$primary->safe_psql('postgres', + q{SELECT pg_create_logical_replication_slot('failover_slot', 'test_decoding', false, false, true); + SELECT pg_create_physical_replication_slot('phys_slot');} +); + +$standby->start; + +# Generate some activity and switch WAL file on the primary +$primary->advance_wal(1); +$primary->safe_psql('postgres', "CHECKPOINT"); +$primary->wait_for_replay_catchup($standby); + +# checkpoint on the standby and make it wait on the injection point so that the +# checkpoint stops right before invalidating replication slots. +note('starting checkpoint'); + +$checkpoint = $standby->background_psql('postgres'); +$checkpoint->query_safe( + q(select injection_points_attach('restartpoint-before-slot-invalidation','wait')) +); +$checkpoint->query_until( + qr/starting_checkpoint/, + q(\echo starting_checkpoint +checkpoint; +)); + +# Wait until the checkpoint stops right before invalidating slots +note('waiting for injection_point'); +$standby->wait_for_event('checkpointer', 'restartpoint-before-slot-invalidation'); +note('injection_point is reached'); + +# Enable slot sync worker to synchronize the failover slot to the standby +$standby->append_conf('postgresql.conf', qq(sync_replication_slots = on)); +$standby->reload; + +# Wait for the slot to be synced +$standby->poll_query_until( + 'postgres', + "SELECT COUNT(*) > 0 FROM pg_replication_slots WHERE slot_name = 'failover_slot'"); + +# Release the checkpointer +$standby->safe_psql('postgres', + q{select injection_points_wakeup('restartpoint-before-slot-invalidation'); + select injection_points_detach('restartpoint-before-slot-invalidation')}); + +$checkpoint->quit; + +# Confirm that the slot is not invalidated +is( $standby->safe_psql( + 'postgres', + q{SELECT invalidation_reason IS NULL AND synced FROM pg_replication_slots WHERE slot_name = 'failover_slot';} + ), + "t", + 'logical slot is not invalidated'); + done_testing();