diff --git a/src/backend/access/transam/xlogrecovery.c b/src/backend/access/transam/xlogrecovery.c index 6d2c4a86b96..fd1c36d061d 100644 --- a/src/backend/access/transam/xlogrecovery.c +++ b/src/backend/access/transam/xlogrecovery.c @@ -2015,7 +2015,7 @@ ApplyWalRecord(XLogReaderState *xlogreader, XLogRecord *record, TimeLineID *repl if (doRequestWalReceiverReply) { doRequestWalReceiverReply = false; - WalRcvForceReply(); + WalRcvRequestApplyReply(); } /* Allow read-only connections if we're consistent now */ @@ -3970,7 +3970,7 @@ WaitForWALToBecomeAvailable(XLogRecPtr RecPtr, bool randAccess, */ if (!streaming_reply_sent) { - WalRcvForceReply(); + WalRcvRequestApplyReply(); streaming_reply_sent = true; } diff --git a/src/backend/replication/walreceiver.c b/src/backend/replication/walreceiver.c index fabe3c73034..a437273cf9a 100644 --- a/src/backend/replication/walreceiver.c +++ b/src/backend/replication/walreceiver.c @@ -143,7 +143,7 @@ static void XLogWalRcvWrite(char *buf, Size nbytes, XLogRecPtr recptr, TimeLineID tli); static void XLogWalRcvFlush(bool dying, TimeLineID tli); static void XLogWalRcvClose(XLogRecPtr recptr, TimeLineID tli); -static void XLogWalRcvSendReply(bool force, bool requestReply); +static void XLogWalRcvSendReply(bool force, bool requestReply, bool checkApply); static void XLogWalRcvSendHSFeedback(bool immed); static void ProcessWalSndrMessage(XLogRecPtr walEnd, TimestampTz sendTime); static void WalRcvComputeNextWakeup(WalRcvWakeupReason reason, TimestampTz now); @@ -417,7 +417,7 @@ WalReceiverMain(const void *startup_data, size_t startup_data_len) WalRcvComputeNextWakeup(i, now); /* Send initial reply/feedback messages. */ - XLogWalRcvSendReply(true, false); + XLogWalRcvSendReply(true, false, false); XLogWalRcvSendHSFeedback(true); /* Loop until end-of-streaming or error */ @@ -493,7 +493,7 @@ WalReceiverMain(const void *startup_data, size_t startup_data_len) } /* Let the primary know that we received some data. 
*/ - XLogWalRcvSendReply(false, false); + XLogWalRcvSendReply(false, false, false); /* * If we've written some records, flush them to disk and @@ -539,7 +539,7 @@ WalReceiverMain(const void *startup_data, size_t startup_data_len) ResetLatch(MyLatch); CHECK_FOR_INTERRUPTS(); - if (walrcv->force_reply) + if (walrcv->apply_reply_requested) { /* * The recovery process has asked us to send apply @@ -547,9 +547,9 @@ WalReceiverMain(const void *startup_data, size_t startup_data_len) * false in shared memory before sending the reply, so * we don't miss a new request for a reply. */ - walrcv->force_reply = false; + walrcv->apply_reply_requested = false; pg_memory_barrier(); - XLogWalRcvSendReply(true, false); + XLogWalRcvSendReply(false, false, true); } } if (rc & WL_TIMEOUT) @@ -595,7 +595,7 @@ WalReceiverMain(const void *startup_data, size_t startup_data_len) wakeup[WALRCV_WAKEUP_PING] = TIMESTAMP_INFINITY; } - XLogWalRcvSendReply(requestReply, requestReply); + XLogWalRcvSendReply(requestReply, requestReply, false); XLogWalRcvSendHSFeedback(false); } } @@ -886,7 +886,7 @@ XLogWalRcvProcessMsg(unsigned char type, char *buf, Size len, TimeLineID tli) /* If the primary requested a reply, send one immediately */ if (replyRequested) - XLogWalRcvSendReply(true, false); + XLogWalRcvSendReply(true, false, false); break; } default: @@ -1053,7 +1053,7 @@ XLogWalRcvFlush(bool dying, TimeLineID tli) /* Also let the primary know that we made some progress */ if (!dying) { - XLogWalRcvSendReply(false, false); + XLogWalRcvSendReply(false, false, false); XLogWalRcvSendHSFeedback(false); } } @@ -1107,24 +1107,35 @@ XLogWalRcvClose(XLogRecPtr recptr, TimeLineID tli) } /* - * Send reply message to primary, indicating our current WAL locations, oldest - * xmin and the current time. + * Send reply message to primary, indicating our current WAL locations and + * time. 
* - * If 'force' is not set, the message is only sent if enough time has - * passed since last status update to reach wal_receiver_status_interval. - * If wal_receiver_status_interval is disabled altogether and 'force' is - * false, this is a no-op. + * The message is sent if 'force' is set, if enough time has passed since the + * last update to reach wal_receiver_status_interval, or if WAL locations have + * advanced since the previous status update. If wal_receiver_status_interval + * is disabled and 'force' is false, this function does nothing. Set 'force' to + * send the message unconditionally. + * + * Whether WAL locations are considered "advanced" depends on 'checkApply'. + * If 'checkApply' is false, only the write and flush locations are checked. + * This should be used when the call is triggered by write/flush activity + * (e.g., after walreceiver writes or flushes WAL), and avoids the + * apply-location check, which requires a spinlock. If 'checkApply' is true, + * the apply location is also considered. This should be used when the apply + * location is expected to have advanced (e.g., when the startup process + * requests an apply notification). * * If 'requestReply' is true, requests the server to reply immediately upon * receiving this message. This is used for heartbeats, when approaching * wal_receiver_timeout. 
*/ static void -XLogWalRcvSendReply(bool force, bool requestReply) +XLogWalRcvSendReply(bool force, bool requestReply, bool checkApply) { static XLogRecPtr writePtr = InvalidXLogRecPtr; static XLogRecPtr flushPtr = InvalidXLogRecPtr; - XLogRecPtr applyPtr; + static XLogRecPtr applyPtr = InvalidXLogRecPtr; + XLogRecPtr latestApplyPtr = InvalidXLogRecPtr; TimestampTz now; /* @@ -1140,17 +1151,19 @@ XLogWalRcvSendReply(bool force, bool requestReply) /* * We can compare the write and flush positions to the last message we * sent without taking any lock, but the apply position requires a spin - * lock, so we don't check that unless something else has changed or 10 - * seconds have passed. This means that the apply WAL location will - * appear, from the primary's point of view, to lag slightly, but since - * this is only for reporting purposes and only on idle systems, that's - * probably OK. + * lock, so we don't check that unless it is expected to advance since the + * previous update, i.e., when 'checkApply' is true. */ - if (!force - && writePtr == LogstreamResult.Write - && flushPtr == LogstreamResult.Flush - && now < wakeup[WALRCV_WAKEUP_REPLY]) - return; + if (!force && now < wakeup[WALRCV_WAKEUP_REPLY]) + { + if (checkApply) + latestApplyPtr = GetXLogReplayRecPtr(NULL); + + if (writePtr == LogstreamResult.Write + && flushPtr == LogstreamResult.Flush + && (!checkApply || applyPtr == latestApplyPtr)) + return; + } /* Make sure we wake up when it's time to send another reply. */ WalRcvComputeNextWakeup(WALRCV_WAKEUP_REPLY, now); @@ -1158,7 +1171,8 @@ XLogWalRcvSendReply(bool force, bool requestReply) /* Construct a new message */ writePtr = LogstreamResult.Write; flushPtr = LogstreamResult.Flush; - applyPtr = GetXLogReplayRecPtr(NULL); + applyPtr = (latestApplyPtr == InvalidXLogRecPtr) ? 
+ GetXLogReplayRecPtr(NULL) : latestApplyPtr; resetStringInfo(&reply_message); pq_sendbyte(&reply_message, PqReplMsg_StandbyStatusUpdate); @@ -1378,11 +1392,11 @@ WalRcvComputeNextWakeup(WalRcvWakeupReason reason, TimestampTz now) * synchronous_commit = remote_apply. */ void -WalRcvForceReply(void) +WalRcvRequestApplyReply(void) { ProcNumber procno; - WalRcv->force_reply = true; + WalRcv->apply_reply_requested = true; /* fetching the proc number is probably atomic, but don't rely on it */ SpinLockAcquire(&WalRcv->mutex); procno = WalRcv->procno; diff --git a/src/include/replication/walreceiver.h b/src/include/replication/walreceiver.h index 9b9bd916314..85d24c87298 100644 --- a/src/include/replication/walreceiver.h +++ b/src/include/replication/walreceiver.h @@ -156,11 +156,11 @@ typedef struct pg_atomic_uint64 writtenUpto; /* - * force walreceiver reply? This doesn't need to be locked; memory + * request walreceiver reply? This doesn't need to be locked; memory * barriers for ordering are sufficient. But we do need atomic fetch and * store semantics, so use sig_atomic_t. */ - sig_atomic_t force_reply; /* used as a bool */ + sig_atomic_t apply_reply_requested; /* used as a bool */ } WalRcvData; extern PGDLLIMPORT WalRcvData *WalRcv; @@ -488,7 +488,7 @@ walrcv_clear_result(WalRcvExecResult *walres) /* prototypes for functions in walreceiver.c */ pg_noreturn extern void WalReceiverMain(const void *startup_data, size_t startup_data_len); -extern void WalRcvForceReply(void); +extern void WalRcvRequestApplyReply(void); /* prototypes for functions in walreceiverfuncs.c */ extern Size WalRcvShmemSize(void);