Consolidate replication origin session globals into a single struct.

This commit moves the separate global variables for replication origin
state into a single ReplOriginXactState struct. This groups logically
related variables, which improves code readability and simplifies
state management (e.g., resetting the state) by handling them as a
unit.

Author: Chao Li <lic@highgo.com>
Suggested-by: Ashutosh Bapat <ashutosh.bapat.oss@gmail.com>
Reviewed-by: Masahiko Sawada <sawada.mshk@gmail.com>
Reviewed-by: Ashutosh Bapat <ashutosh.bapat.oss@gmail.com>
Discussion: https://postgr.es/m/CAEoWx2=pYvfRthXHTzSrOsf5_FfyY4zJyK4zV2v4W=yjUij1cA@mail.gmail.com
This commit is contained in:
Masahiko Sawada 2026-01-28 12:26:22 -08:00
parent 227eb4eea2
commit 8f1e2dfe03
9 changed files with 79 additions and 71 deletions

View file

@ -1157,13 +1157,13 @@ EndPrepare(GlobalTransaction gxact)
Assert(hdr->magic == TWOPHASE_MAGIC);
hdr->total_len = records.total_len + sizeof(pg_crc32c);
replorigin = (replorigin_session_origin != InvalidReplOriginId &&
replorigin_session_origin != DoNotReplicateId);
replorigin = (replorigin_xact_state.origin != InvalidReplOriginId &&
replorigin_xact_state.origin != DoNotReplicateId);
if (replorigin)
{
hdr->origin_lsn = replorigin_session_origin_lsn;
hdr->origin_timestamp = replorigin_session_origin_timestamp;
hdr->origin_lsn = replorigin_xact_state.origin_lsn;
hdr->origin_timestamp = replorigin_xact_state.origin_timestamp;
}
/*
@ -1211,7 +1211,7 @@ EndPrepare(GlobalTransaction gxact)
if (replorigin)
{
/* Move LSNs forward for this replication origin */
replorigin_session_advance(replorigin_session_origin_lsn,
replorigin_session_advance(replorigin_xact_state.origin_lsn,
gxact->prepare_end_lsn);
}
@ -2330,8 +2330,8 @@ RecordTransactionCommitPrepared(TransactionId xid,
* Are we using the replication origins feature? Or, in other words, are
* we replaying remote actions?
*/
replorigin = (replorigin_session_origin != InvalidReplOriginId &&
replorigin_session_origin != DoNotReplicateId);
replorigin = (replorigin_xact_state.origin != InvalidReplOriginId &&
replorigin_xact_state.origin != DoNotReplicateId);
/* Load the injection point before entering the critical section */
INJECTION_POINT_LOAD("commit-after-delay-checkpoint");
@ -2376,23 +2376,23 @@ RecordTransactionCommitPrepared(TransactionId xid,
if (replorigin)
/* Move LSNs forward for this replication origin */
replorigin_session_advance(replorigin_session_origin_lsn,
replorigin_session_advance(replorigin_xact_state.origin_lsn,
XactLastRecEnd);
/*
* Record commit timestamp. The value comes from plain commit timestamp
* if replorigin is not enabled, or replorigin already set a value for us
* in replorigin_session_origin_timestamp otherwise.
* in replorigin_xact_state.origin_timestamp otherwise.
*
* We don't need to WAL-log anything here, as the commit record written
* above already contains the data.
*/
if (!replorigin || replorigin_session_origin_timestamp == 0)
replorigin_session_origin_timestamp = committs;
if (!replorigin || replorigin_xact_state.origin_timestamp == 0)
replorigin_xact_state.origin_timestamp = committs;
TransactionTreeSetCommitTsData(xid, nchildren, children,
replorigin_session_origin_timestamp,
replorigin_session_origin);
replorigin_xact_state.origin_timestamp,
replorigin_xact_state.origin);
/*
* We don't currently try to sleep before flush here ... nor is there any
@ -2445,8 +2445,8 @@ RecordTransactionAbortPrepared(TransactionId xid,
* Are we using the replication origins feature? Or, in other words, are
* we replaying remote actions?
*/
replorigin = (replorigin_session_origin != InvalidReplOriginId &&
replorigin_session_origin != DoNotReplicateId);
replorigin = (replorigin_xact_state.origin != InvalidReplOriginId &&
replorigin_xact_state.origin != DoNotReplicateId);
/*
* Catch the scenario where we aborted partway through
@ -2472,7 +2472,7 @@ RecordTransactionAbortPrepared(TransactionId xid,
if (replorigin)
/* Move LSNs forward for this replication origin */
replorigin_session_advance(replorigin_session_origin_lsn,
replorigin_session_advance(replorigin_xact_state.origin_lsn,
XactLastRecEnd);
/* Always flush, since we're about to remove the 2PC state file */

View file

@ -1413,8 +1413,8 @@ RecordTransactionCommit(void)
* Are we using the replication origins feature? Or, in other words,
* are we replaying remote actions?
*/
replorigin = (replorigin_session_origin != InvalidReplOriginId &&
replorigin_session_origin != DoNotReplicateId);
replorigin = (replorigin_xact_state.origin != InvalidReplOriginId &&
replorigin_xact_state.origin != DoNotReplicateId);
/*
* Mark ourselves as within our "commit critical section". This
@ -1462,25 +1462,25 @@ RecordTransactionCommit(void)
if (replorigin)
/* Move LSNs forward for this replication origin */
replorigin_session_advance(replorigin_session_origin_lsn,
replorigin_session_advance(replorigin_xact_state.origin_lsn,
XactLastRecEnd);
/*
* Record commit timestamp. The value comes from plain commit
* timestamp if there's no replication origin; otherwise, the
* timestamp was already set in replorigin_session_origin_timestamp by
* replication.
* timestamp was already set in replorigin_xact_state.origin_timestamp
* by replication.
*
* We don't need to WAL-log anything here, as the commit record
* written above already contains the data.
*/
if (!replorigin || replorigin_session_origin_timestamp == 0)
replorigin_session_origin_timestamp = GetCurrentTransactionStopTimestamp();
if (!replorigin || replorigin_xact_state.origin_timestamp == 0)
replorigin_xact_state.origin_timestamp = GetCurrentTransactionStopTimestamp();
TransactionTreeSetCommitTsData(xid, nchildren, children,
replorigin_session_origin_timestamp,
replorigin_session_origin);
replorigin_xact_state.origin_timestamp,
replorigin_xact_state.origin);
}
/*
@ -1810,8 +1810,8 @@ RecordTransactionAbort(bool isSubXact)
* Are we using the replication origins feature? Or, in other words, are
* we replaying remote actions?
*/
replorigin = (replorigin_session_origin != InvalidReplOriginId &&
replorigin_session_origin != DoNotReplicateId);
replorigin = (replorigin_xact_state.origin != InvalidReplOriginId &&
replorigin_xact_state.origin != DoNotReplicateId);
/* Fetch the data we need for the abort record */
nrels = smgrGetPendingDeletes(false, &rels);
@ -1838,7 +1838,7 @@ RecordTransactionAbort(bool isSubXact)
if (replorigin)
/* Move LSNs forward for this replication origin */
replorigin_session_advance(replorigin_session_origin_lsn,
replorigin_session_advance(replorigin_xact_state.origin_lsn,
XactLastRecEnd);
/*
@ -5928,12 +5928,12 @@ XactLogCommitRecord(TimestampTz commit_time,
}
/* dump transaction origin information */
if (replorigin_session_origin != InvalidReplOriginId)
if (replorigin_xact_state.origin != InvalidReplOriginId)
{
xl_xinfo.xinfo |= XACT_XINFO_HAS_ORIGIN;
xl_origin.origin_lsn = replorigin_session_origin_lsn;
xl_origin.origin_timestamp = replorigin_session_origin_timestamp;
xl_origin.origin_lsn = replorigin_xact_state.origin_lsn;
xl_origin.origin_timestamp = replorigin_xact_state.origin_timestamp;
}
if (xl_xinfo.xinfo != 0)
@ -6081,12 +6081,12 @@ XactLogAbortRecord(TimestampTz abort_time,
* Dump transaction origin information. We need this during recovery to
* update the replication origin progress.
*/
if (replorigin_session_origin != InvalidReplOriginId)
if (replorigin_xact_state.origin != InvalidReplOriginId)
{
xl_xinfo.xinfo |= XACT_XINFO_HAS_ORIGIN;
xl_origin.origin_lsn = replorigin_session_origin_lsn;
xl_origin.origin_timestamp = replorigin_session_origin_timestamp;
xl_origin.origin_lsn = replorigin_xact_state.origin_lsn;
xl_origin.origin_timestamp = replorigin_xact_state.origin_timestamp;
}
if (xl_xinfo.xinfo != 0)

View file

@ -861,11 +861,11 @@ XLogRecordAssemble(RmgrId rmid, uint8 info,
/* followed by the record's origin, if any */
if ((curinsert_flags & XLOG_INCLUDE_ORIGIN) &&
replorigin_session_origin != InvalidReplOriginId)
replorigin_xact_state.origin != InvalidReplOriginId)
{
*(scratch++) = (char) XLR_BLOCK_ID_ORIGIN;
memcpy(scratch, &replorigin_session_origin, sizeof(replorigin_session_origin));
scratch += sizeof(replorigin_session_origin);
memcpy(scratch, &replorigin_xact_state.origin, sizeof(replorigin_xact_state.origin));
scratch += sizeof(replorigin_xact_state.origin);
}
/* followed by toplevel XID, if not already included in previous record */

View file

@ -962,7 +962,7 @@ ParallelApplyWorkerMain(Datum main_arg)
* origin which was already acquired by its leader process.
*/
replorigin_session_setup(originid, MyLogicalRepWorker->leader_pid);
replorigin_session_origin = originid;
replorigin_xact_state.origin = originid;
CommitTransactionCommand();
/*
@ -1430,8 +1430,8 @@ pa_stream_abort(LogicalRepStreamAbortData *abort_data)
* Update origin state so we can restart streaming from correct position
* in case of crash.
*/
replorigin_session_origin_lsn = abort_data->abort_lsn;
replorigin_session_origin_timestamp = abort_data->abort_time;
replorigin_xact_state.origin_lsn = abort_data->abort_lsn;
replorigin_xact_state.origin_timestamp = abort_data->abort_time;
/*
* If the two XIDs are the same, it's in fact abort of toplevel xact, so

View file

@ -162,10 +162,12 @@ typedef struct ReplicationStateCtl
ReplicationState states[FLEXIBLE_ARRAY_MEMBER];
} ReplicationStateCtl;
/* external variables */
ReplOriginId replorigin_session_origin = InvalidReplOriginId; /* assumed identity */
XLogRecPtr replorigin_session_origin_lsn = InvalidXLogRecPtr;
TimestampTz replorigin_session_origin_timestamp = 0;
/* Global variable for per-transaction replication origin state */
ReplOriginXactState replorigin_xact_state = {
.origin = InvalidReplOriginId, /* assumed identity */
.origin_lsn = InvalidXLogRecPtr,
.origin_timestamp = 0
};
/*
* Base address into a shared memory array of replication states of size
@ -902,7 +904,7 @@ replorigin_redo(XLogReaderState *record)
* Tell the replication origin progress machinery that a commit from 'node'
* that originated at the LSN remote_commit on the remote node was replayed
* successfully and that we don't need to do so again. In combination with
* setting up replorigin_session_origin_lsn and replorigin_session_origin
* setting up replorigin_xact_state {.origin_lsn, .origin_timestamp}
* that ensures we won't lose knowledge about that after a crash if the
* transaction had a persistent effect (think of asynchronous commits).
*
@ -1349,10 +1351,10 @@ replorigin_session_get_progress(bool flush)
void
replorigin_xact_clear(bool clear_origin)
{
replorigin_session_origin_lsn = InvalidXLogRecPtr;
replorigin_session_origin_timestamp = 0;
replorigin_xact_state.origin_lsn = InvalidXLogRecPtr;
replorigin_xact_state.origin_timestamp = 0;
if (clear_origin)
replorigin_session_origin = InvalidReplOriginId;
replorigin_xact_state.origin = InvalidReplOriginId;
}
@ -1462,7 +1464,7 @@ pg_replication_origin_session_setup(PG_FUNCTION_ARGS)
pid = PG_GETARG_INT32(1);
replorigin_session_setup(origin, pid);
replorigin_session_origin = origin;
replorigin_xact_state.origin = origin;
pfree(name);
@ -1492,7 +1494,7 @@ pg_replication_origin_session_is_setup(PG_FUNCTION_ARGS)
{
replorigin_check_prerequisites(false, false);
PG_RETURN_BOOL(replorigin_session_origin != InvalidReplOriginId);
PG_RETURN_BOOL(replorigin_xact_state.origin != InvalidReplOriginId);
}
@ -1536,8 +1538,8 @@ pg_replication_origin_xact_setup(PG_FUNCTION_ARGS)
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
errmsg("no replication origin is configured")));
replorigin_session_origin_lsn = location;
replorigin_session_origin_timestamp = PG_GETARG_TIMESTAMPTZ(1);
replorigin_xact_state.origin_lsn = location;
replorigin_xact_state.origin_timestamp = PG_GETARG_TIMESTAMPTZ(1);
PG_RETURN_VOID();
}

View file

@ -1318,7 +1318,7 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
*/
originid = replorigin_by_name(originname, false);
replorigin_session_setup(originid, 0);
replorigin_session_origin = originid;
replorigin_xact_state.origin = originid;
*origin_startpos = replorigin_session_get_progress(false);
CommitTransactionCommand();
@ -1405,7 +1405,7 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
UnlockRelationOid(ReplicationOriginRelationId, RowExclusiveLock);
replorigin_session_setup(originid, 0);
replorigin_session_origin = originid;
replorigin_xact_state.origin = originid;
/*
* If the user did not opt to run as the owner of the subscription

View file

@ -1318,8 +1318,8 @@ apply_handle_prepare_internal(LogicalRepPreparedTxnData *prepare_data)
* Update origin state so we can restart streaming from correct position
* in case of crash.
*/
replorigin_session_origin_lsn = prepare_data->end_lsn;
replorigin_session_origin_timestamp = prepare_data->prepare_time;
replorigin_xact_state.origin_lsn = prepare_data->end_lsn;
replorigin_xact_state.origin_timestamp = prepare_data->prepare_time;
PrepareTransactionBlock(gid);
}
@ -1421,8 +1421,8 @@ apply_handle_commit_prepared(StringInfo s)
* Update origin state so we can restart streaming from correct position
* in case of crash.
*/
replorigin_session_origin_lsn = prepare_data.end_lsn;
replorigin_session_origin_timestamp = prepare_data.commit_time;
replorigin_xact_state.origin_lsn = prepare_data.end_lsn;
replorigin_xact_state.origin_timestamp = prepare_data.commit_time;
FinishPreparedTransaction(gid, true);
end_replication_step();
@ -1479,8 +1479,8 @@ apply_handle_rollback_prepared(StringInfo s)
* Update origin state so we can restart streaming from correct
* position in case of crash.
*/
replorigin_session_origin_lsn = rollback_data.rollback_end_lsn;
replorigin_session_origin_timestamp = rollback_data.rollback_time;
replorigin_xact_state.origin_lsn = rollback_data.rollback_end_lsn;
replorigin_xact_state.origin_timestamp = rollback_data.rollback_time;
/* There is no transaction when ABORT/ROLLBACK PREPARED is called */
begin_replication_step();
@ -2526,8 +2526,8 @@ apply_handle_commit_internal(LogicalRepCommitData *commit_data)
* Update origin state so we can restart streaming from correct
* position in case of crash.
*/
replorigin_session_origin_lsn = commit_data->end_lsn;
replorigin_session_origin_timestamp = commit_data->committime;
replorigin_xact_state.origin_lsn = commit_data->end_lsn;
replorigin_xact_state.origin_timestamp = commit_data->committime;
CommitTransactionCommand();
@ -2940,7 +2940,7 @@ apply_handle_update_internal(ApplyExecutionData *edata,
*/
if (GetTupleTransactionInfo(localslot, &conflicttuple.xmin,
&conflicttuple.origin, &conflicttuple.ts) &&
conflicttuple.origin != replorigin_session_origin)
conflicttuple.origin != replorigin_xact_state.origin)
{
TupleTableSlot *newslot;
@ -2982,7 +2982,7 @@ apply_handle_update_internal(ApplyExecutionData *edata,
&conflicttuple.xmin,
&conflicttuple.origin,
&conflicttuple.ts) &&
conflicttuple.origin != replorigin_session_origin)
conflicttuple.origin != replorigin_xact_state.origin)
type = CT_UPDATE_DELETED;
else
type = CT_UPDATE_MISSING;
@ -3135,7 +3135,7 @@ apply_handle_delete_internal(ApplyExecutionData *edata,
*/
if (GetTupleTransactionInfo(localslot, &conflicttuple.xmin,
&conflicttuple.origin, &conflicttuple.ts) &&
conflicttuple.origin != replorigin_session_origin)
conflicttuple.origin != replorigin_xact_state.origin)
{
conflicttuple.slot = localslot;
ReportApplyConflict(estate, relinfo, LOG, CT_DELETE_ORIGIN_DIFFERS,
@ -3477,7 +3477,7 @@ apply_handle_tuple_routing(ApplyExecutionData *edata,
&conflicttuple.xmin,
&conflicttuple.origin,
&conflicttuple.ts) &&
conflicttuple.origin != replorigin_session_origin)
conflicttuple.origin != replorigin_xact_state.origin)
type = CT_UPDATE_DELETED;
else
type = CT_UPDATE_MISSING;
@ -3503,7 +3503,7 @@ apply_handle_tuple_routing(ApplyExecutionData *edata,
if (GetTupleTransactionInfo(localslot, &conflicttuple.xmin,
&conflicttuple.origin,
&conflicttuple.ts) &&
conflicttuple.origin != replorigin_session_origin)
conflicttuple.origin != replorigin_xact_state.origin)
{
TupleTableSlot *newslot;
@ -5652,7 +5652,7 @@ run_apply_worker(void)
if (!OidIsValid(originid))
originid = replorigin_create(originname);
replorigin_session_setup(originid, 0);
replorigin_session_origin = originid;
replorigin_xact_state.origin = originid;
origin_startpos = replorigin_session_get_progress(false);
CommitTransactionCommand();

View file

@ -40,9 +40,14 @@ typedef struct xl_replorigin_drop
*/
#define MAX_RONAME_LEN 512
extern PGDLLIMPORT ReplOriginId replorigin_session_origin;
extern PGDLLIMPORT XLogRecPtr replorigin_session_origin_lsn;
extern PGDLLIMPORT TimestampTz replorigin_session_origin_timestamp;
typedef struct ReplOriginXactState
{
ReplOriginId origin;
XLogRecPtr origin_lsn;
TimestampTz origin_timestamp;
} ReplOriginXactState;
extern PGDLLIMPORT ReplOriginXactState replorigin_xact_state;
/* GUCs */
extern PGDLLIMPORT int max_active_replication_origins;

View file

@ -2577,6 +2577,7 @@ ReorderBufferUpdateProgressTxnCB
ReorderTuple
ReparameterizeForeignPathByChild_function
ReplOriginId
ReplOriginXactState
ReplaceVarsFromTargetList_context
ReplaceVarsNoMatchOption
ReplaceWrapOption