From 8f1e2dfe033e9a3236265c3b9f61bd226f4a8f54 Mon Sep 17 00:00:00 2001 From: Masahiko Sawada Date: Wed, 28 Jan 2026 12:26:22 -0800 Subject: [PATCH] Consolidate replication origin session globals into a single struct. This commit moves the separate global variables for replication origin state into a single ReplOriginXactState struct. This groups logically related variables, which improves code readability and simplifies state management (e.g., resetting the state) by handling them as a unit. Author: Chao Li Suggested-by: Ashutosh Bapat Reviewed-by: Masahiko Sawada Reviewed-by: Ashutosh Bapat Discussion: https://postgr.es/m/CAEoWx2=pYvfRthXHTzSrOsf5_FfyY4zJyK4zV2v4W=yjUij1cA@mail.gmail.com --- src/backend/access/transam/twophase.c | 32 ++++++++--------- src/backend/access/transam/xact.c | 36 +++++++++---------- src/backend/access/transam/xloginsert.c | 6 ++-- .../replication/logical/applyparallelworker.c | 6 ++-- src/backend/replication/logical/origin.c | 26 +++++++------- src/backend/replication/logical/tablesync.c | 4 +-- src/backend/replication/logical/worker.c | 28 +++++++-------- src/include/replication/origin.h | 11 ++++-- src/tools/pgindent/typedefs.list | 1 + 9 files changed, 79 insertions(+), 71 deletions(-) diff --git a/src/backend/access/transam/twophase.c b/src/backend/access/transam/twophase.c index 71dd3a9a7ef..601ce3faa64 100644 --- a/src/backend/access/transam/twophase.c +++ b/src/backend/access/transam/twophase.c @@ -1157,13 +1157,13 @@ EndPrepare(GlobalTransaction gxact) Assert(hdr->magic == TWOPHASE_MAGIC); hdr->total_len = records.total_len + sizeof(pg_crc32c); - replorigin = (replorigin_session_origin != InvalidReplOriginId && - replorigin_session_origin != DoNotReplicateId); + replorigin = (replorigin_xact_state.origin != InvalidReplOriginId && + replorigin_xact_state.origin != DoNotReplicateId); if (replorigin) { - hdr->origin_lsn = replorigin_session_origin_lsn; - hdr->origin_timestamp = replorigin_session_origin_timestamp; + hdr->origin_lsn = replorigin_xact_state.origin_lsn; + hdr->origin_timestamp = replorigin_xact_state.origin_timestamp; } /* @@ -1211,7 +1211,7 @@ EndPrepare(GlobalTransaction gxact) if (replorigin) { /* Move LSNs forward for this replication origin */ - replorigin_session_advance(replorigin_session_origin_lsn, + replorigin_session_advance(replorigin_xact_state.origin_lsn, gxact->prepare_end_lsn); } @@ -2330,8 +2330,8 @@ RecordTransactionCommitPrepared(TransactionId xid, * Are we using the replication origins feature? Or, in other words, are * we replaying remote actions? */ - replorigin = (replorigin_session_origin != InvalidReplOriginId && - replorigin_session_origin != DoNotReplicateId); + replorigin = (replorigin_xact_state.origin != InvalidReplOriginId && + replorigin_xact_state.origin != DoNotReplicateId); /* Load the injection point before entering the critical section */ INJECTION_POINT_LOAD("commit-after-delay-checkpoint"); @@ -2376,23 +2376,23 @@ RecordTransactionCommitPrepared(TransactionId xid, if (replorigin) /* Move LSNs forward for this replication origin */ - replorigin_session_advance(replorigin_session_origin_lsn, + replorigin_session_advance(replorigin_xact_state.origin_lsn, XactLastRecEnd); /* * Record commit timestamp. The value comes from plain commit timestamp * if replorigin is not enabled, or replorigin already set a value for us - * in replorigin_session_origin_timestamp otherwise. + * in replorigin_xact_state.origin_timestamp otherwise. * * We don't need to WAL-log anything here, as the commit record written * above already contains the data. */ - if (!replorigin || replorigin_session_origin_timestamp == 0) - replorigin_session_origin_timestamp = committs; + if (!replorigin || replorigin_xact_state.origin_timestamp == 0) + replorigin_xact_state.origin_timestamp = committs; TransactionTreeSetCommitTsData(xid, nchildren, children, - replorigin_session_origin_timestamp, - replorigin_session_origin); + replorigin_xact_state.origin_timestamp, + replorigin_xact_state.origin); /* * We don't currently try to sleep before flush here ... nor is there any @@ -2445,8 +2445,8 @@ RecordTransactionAbortPrepared(TransactionId xid, * Are we using the replication origins feature? Or, in other words, are * we replaying remote actions? */ - replorigin = (replorigin_session_origin != InvalidReplOriginId && - replorigin_session_origin != DoNotReplicateId); + replorigin = (replorigin_xact_state.origin != InvalidReplOriginId && + replorigin_xact_state.origin != DoNotReplicateId); /* * Catch the scenario where we aborted partway through @@ -2472,7 +2472,7 @@ RecordTransactionAbortPrepared(TransactionId xid, if (replorigin) /* Move LSNs forward for this replication origin */ - replorigin_session_advance(replorigin_session_origin_lsn, + replorigin_session_advance(replorigin_xact_state.origin_lsn, XactLastRecEnd); /* Always flush, since we're about to remove the 2PC state file */ diff --git a/src/backend/access/transam/xact.c b/src/backend/access/transam/xact.c index 12a1505b539..eba4f063168 100644 --- a/src/backend/access/transam/xact.c +++ b/src/backend/access/transam/xact.c @@ -1413,8 +1413,8 @@ RecordTransactionCommit(void) * Are we using the replication origins feature? Or, in other words, * are we replaying remote actions? */ - replorigin = (replorigin_session_origin != InvalidReplOriginId && - replorigin_session_origin != DoNotReplicateId); + replorigin = (replorigin_xact_state.origin != InvalidReplOriginId && + replorigin_xact_state.origin != DoNotReplicateId); /* * Mark ourselves as within our "commit critical section". This @@ -1462,25 +1462,25 @@ RecordTransactionCommit(void) if (replorigin) /* Move LSNs forward for this replication origin */ - replorigin_session_advance(replorigin_session_origin_lsn, + replorigin_session_advance(replorigin_xact_state.origin_lsn, XactLastRecEnd); /* * Record commit timestamp. The value comes from plain commit * timestamp if there's no replication origin; otherwise, the - * timestamp was already set in replorigin_session_origin_timestamp by - * replication. + * timestamp was already set in replorigin_xact_state.origin_timestamp + * by replication. * * We don't need to WAL-log anything here, as the commit record * written above already contains the data. */ - if (!replorigin || replorigin_session_origin_timestamp == 0) - replorigin_session_origin_timestamp = GetCurrentTransactionStopTimestamp(); + if (!replorigin || replorigin_xact_state.origin_timestamp == 0) + replorigin_xact_state.origin_timestamp = GetCurrentTransactionStopTimestamp(); TransactionTreeSetCommitTsData(xid, nchildren, children, - replorigin_session_origin_timestamp, - replorigin_session_origin); + replorigin_xact_state.origin_timestamp, + replorigin_xact_state.origin); } /* @@ -1810,8 +1810,8 @@ RecordTransactionAbort(bool isSubXact) * Are we using the replication origins feature? Or, in other words, are * we replaying remote actions? */ - replorigin = (replorigin_session_origin != InvalidReplOriginId && - replorigin_session_origin != DoNotReplicateId); + replorigin = (replorigin_xact_state.origin != InvalidReplOriginId && + replorigin_xact_state.origin != DoNotReplicateId); /* Fetch the data we need for the abort record */ nrels = smgrGetPendingDeletes(false, &rels); @@ -1838,7 +1838,7 @@ RecordTransactionAbort(bool isSubXact) if (replorigin) /* Move LSNs forward for this replication origin */ - replorigin_session_advance(replorigin_session_origin_lsn, + replorigin_session_advance(replorigin_xact_state.origin_lsn, XactLastRecEnd); /* @@ -5928,12 +5928,12 @@ XactLogCommitRecord(TimestampTz commit_time, } /* dump transaction origin information */ - if (replorigin_session_origin != InvalidReplOriginId) + if (replorigin_xact_state.origin != InvalidReplOriginId) { xl_xinfo.xinfo |= XACT_XINFO_HAS_ORIGIN; - xl_origin.origin_lsn = replorigin_session_origin_lsn; - xl_origin.origin_timestamp = replorigin_session_origin_timestamp; + xl_origin.origin_lsn = replorigin_xact_state.origin_lsn; + xl_origin.origin_timestamp = replorigin_xact_state.origin_timestamp; } if (xl_xinfo.xinfo != 0) @@ -6081,12 +6081,12 @@ XactLogAbortRecord(TimestampTz abort_time, * Dump transaction origin information. We need this during recovery to * update the replication origin progress. */ - if (replorigin_session_origin != InvalidReplOriginId) + if (replorigin_xact_state.origin != InvalidReplOriginId) { xl_xinfo.xinfo |= XACT_XINFO_HAS_ORIGIN; - xl_origin.origin_lsn = replorigin_session_origin_lsn; - xl_origin.origin_timestamp = replorigin_session_origin_timestamp; + xl_origin.origin_lsn = replorigin_xact_state.origin_lsn; + xl_origin.origin_timestamp = replorigin_xact_state.origin_timestamp; } if (xl_xinfo.xinfo != 0) diff --git a/src/backend/access/transam/xloginsert.c b/src/backend/access/transam/xloginsert.c index e2db6adc382..d3acaa636c3 100644 --- a/src/backend/access/transam/xloginsert.c +++ b/src/backend/access/transam/xloginsert.c @@ -861,11 +861,11 @@ XLogRecordAssemble(RmgrId rmid, uint8 info, /* followed by the record's origin, if any */ if ((curinsert_flags & XLOG_INCLUDE_ORIGIN) && - replorigin_session_origin != InvalidReplOriginId) + replorigin_xact_state.origin != InvalidReplOriginId) { *(scratch++) = (char) XLR_BLOCK_ID_ORIGIN; - memcpy(scratch, &replorigin_session_origin, sizeof(replorigin_session_origin)); - scratch += sizeof(replorigin_session_origin); + memcpy(scratch, &replorigin_xact_state.origin, sizeof(replorigin_xact_state.origin)); + scratch += sizeof(replorigin_xact_state.origin); } /* followed by toplevel XID, if not already included in previous record */ diff --git a/src/backend/replication/logical/applyparallelworker.c b/src/backend/replication/logical/applyparallelworker.c index 2f5d74180dc..8a01f16a2ca 100644 --- a/src/backend/replication/logical/applyparallelworker.c +++ b/src/backend/replication/logical/applyparallelworker.c @@ -962,7 +962,7 @@ ParallelApplyWorkerMain(Datum main_arg) * origin which was already acquired by its leader process. */ replorigin_session_setup(originid, MyLogicalRepWorker->leader_pid); - replorigin_session_origin = originid; + replorigin_xact_state.origin = originid; CommitTransactionCommand(); /* @@ -1430,8 +1430,8 @@ pa_stream_abort(LogicalRepStreamAbortData *abort_data) * Update origin state so we can restart streaming from correct position * in case of crash. */ - replorigin_session_origin_lsn = abort_data->abort_lsn; - replorigin_session_origin_timestamp = abort_data->abort_time; + replorigin_xact_state.origin_lsn = abort_data->abort_lsn; + replorigin_xact_state.origin_timestamp = abort_data->abort_time; /* * If the two XIDs are the same, it's in fact abort of toplevel xact, so diff --git a/src/backend/replication/logical/origin.c b/src/backend/replication/logical/origin.c index 7704cc5ff1b..c3271a6fd0e 100644 --- a/src/backend/replication/logical/origin.c +++ b/src/backend/replication/logical/origin.c @@ -162,10 +162,12 @@ typedef struct ReplicationStateCtl ReplicationState states[FLEXIBLE_ARRAY_MEMBER]; } ReplicationStateCtl; -/* external variables */ -ReplOriginId replorigin_session_origin = InvalidReplOriginId; /* assumed identity */ -XLogRecPtr replorigin_session_origin_lsn = InvalidXLogRecPtr; -TimestampTz replorigin_session_origin_timestamp = 0; +/* Global variable for per-transaction replication origin state */ +ReplOriginXactState replorigin_xact_state = { + .origin = InvalidReplOriginId, /* assumed identity */ + .origin_lsn = InvalidXLogRecPtr, + .origin_timestamp = 0 +}; /* * Base address into a shared memory array of replication states of size @@ -902,7 +904,7 @@ replorigin_redo(XLogReaderState *record) * Tell the replication origin progress machinery that a commit from 'node' * that originated at the LSN remote_commit on the remote node was replayed * successfully and that we don't need to do so again. In combination with - * setting up replorigin_session_origin_lsn and replorigin_session_origin + * setting up replorigin_xact_state {.origin_lsn, .origin_timestamp} * that ensures we won't lose knowledge about that after a crash if the * transaction had a persistent effect (think of asynchronous commits). * @@ -1349,10 +1351,10 @@ replorigin_session_get_progress(bool flush) void replorigin_xact_clear(bool clear_origin) { - replorigin_session_origin_lsn = InvalidXLogRecPtr; - replorigin_session_origin_timestamp = 0; + replorigin_xact_state.origin_lsn = InvalidXLogRecPtr; + replorigin_xact_state.origin_timestamp = 0; if (clear_origin) - replorigin_session_origin = InvalidReplOriginId; + replorigin_xact_state.origin = InvalidReplOriginId; } @@ -1462,7 +1464,7 @@ pg_replication_origin_session_setup(PG_FUNCTION_ARGS) pid = PG_GETARG_INT32(1); replorigin_session_setup(origin, pid); - replorigin_session_origin = origin; + replorigin_xact_state.origin = origin; pfree(name); @@ -1492,7 +1494,7 @@ pg_replication_origin_session_is_setup(PG_FUNCTION_ARGS) { replorigin_check_prerequisites(false, false); - PG_RETURN_BOOL(replorigin_session_origin != InvalidReplOriginId); + PG_RETURN_BOOL(replorigin_xact_state.origin != InvalidReplOriginId); } @@ -1536,8 +1538,8 @@ pg_replication_origin_xact_setup(PG_FUNCTION_ARGS) (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), errmsg("no replication origin is configured"))); - replorigin_session_origin_lsn = location; - replorigin_session_origin_timestamp = PG_GETARG_TIMESTAMPTZ(1); + replorigin_xact_state.origin_lsn = location; + replorigin_xact_state.origin_timestamp = PG_GETARG_TIMESTAMPTZ(1); PG_RETURN_VOID(); } diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c index 15a59759645..19a3c21a863 100644 --- a/src/backend/replication/logical/tablesync.c +++ b/src/backend/replication/logical/tablesync.c @@ -1318,7 +1318,7 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos) */ originid = replorigin_by_name(originname, false); replorigin_session_setup(originid, 0); - replorigin_session_origin = originid; + replorigin_xact_state.origin = originid; *origin_startpos = replorigin_session_get_progress(false); CommitTransactionCommand(); @@ -1405,7 +1405,7 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos) UnlockRelationOid(ReplicationOriginRelationId, RowExclusiveLock); replorigin_session_setup(originid, 0); - replorigin_session_origin = originid; + replorigin_xact_state.origin = originid; /* * If the user did not opt to run as the owner of the subscription diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c index 23996a83353..32725c48623 100644 --- a/src/backend/replication/logical/worker.c +++ b/src/backend/replication/logical/worker.c @@ -1318,8 +1318,8 @@ apply_handle_prepare_internal(LogicalRepPreparedTxnData *prepare_data) * Update origin state so we can restart streaming from correct position * in case of crash. */ - replorigin_session_origin_lsn = prepare_data->end_lsn; - replorigin_session_origin_timestamp = prepare_data->prepare_time; + replorigin_xact_state.origin_lsn = prepare_data->end_lsn; + replorigin_xact_state.origin_timestamp = prepare_data->prepare_time; PrepareTransactionBlock(gid); } @@ -1421,8 +1421,8 @@ apply_handle_commit_prepared(StringInfo s) * Update origin state so we can restart streaming from correct position * in case of crash. */ - replorigin_session_origin_lsn = prepare_data.end_lsn; - replorigin_session_origin_timestamp = prepare_data.commit_time; + replorigin_xact_state.origin_lsn = prepare_data.end_lsn; + replorigin_xact_state.origin_timestamp = prepare_data.commit_time; FinishPreparedTransaction(gid, true); end_replication_step(); @@ -1479,8 +1479,8 @@ apply_handle_rollback_prepared(StringInfo s) * Update origin state so we can restart streaming from correct * position in case of crash. */ - replorigin_session_origin_lsn = rollback_data.rollback_end_lsn; - replorigin_session_origin_timestamp = rollback_data.rollback_time; + replorigin_xact_state.origin_lsn = rollback_data.rollback_end_lsn; + replorigin_xact_state.origin_timestamp = rollback_data.rollback_time; /* There is no transaction when ABORT/ROLLBACK PREPARED is called */ begin_replication_step(); @@ -2526,8 +2526,8 @@ apply_handle_commit_internal(LogicalRepCommitData *commit_data) * Update origin state so we can restart streaming from correct * position in case of crash. */ - replorigin_session_origin_lsn = commit_data->end_lsn; - replorigin_session_origin_timestamp = commit_data->committime; + replorigin_xact_state.origin_lsn = commit_data->end_lsn; + replorigin_xact_state.origin_timestamp = commit_data->committime; CommitTransactionCommand(); @@ -2940,7 +2940,7 @@ apply_handle_update_internal(ApplyExecutionData *edata, */ if (GetTupleTransactionInfo(localslot, &conflicttuple.xmin, &conflicttuple.origin, &conflicttuple.ts) && - conflicttuple.origin != replorigin_session_origin) + conflicttuple.origin != replorigin_xact_state.origin) { TupleTableSlot *newslot; @@ -2982,7 +2982,7 @@ apply_handle_update_internal(ApplyExecutionData *edata, &conflicttuple.xmin, &conflicttuple.origin, &conflicttuple.ts) && - conflicttuple.origin != replorigin_session_origin) + conflicttuple.origin != replorigin_xact_state.origin) type = CT_UPDATE_DELETED; else type = CT_UPDATE_MISSING; @@ -3135,7 +3135,7 @@ apply_handle_delete_internal(ApplyExecutionData *edata, */ if (GetTupleTransactionInfo(localslot, &conflicttuple.xmin, &conflicttuple.origin, &conflicttuple.ts) && - conflicttuple.origin != replorigin_session_origin) + conflicttuple.origin != replorigin_xact_state.origin) { conflicttuple.slot = localslot; ReportApplyConflict(estate, relinfo, LOG, CT_DELETE_ORIGIN_DIFFERS, @@ -3477,7 +3477,7 @@ apply_handle_tuple_routing(ApplyExecutionData *edata, &conflicttuple.xmin, &conflicttuple.origin, &conflicttuple.ts) && - conflicttuple.origin != replorigin_session_origin) + conflicttuple.origin != replorigin_xact_state.origin) type = CT_UPDATE_DELETED; else type = CT_UPDATE_MISSING; @@ -3503,7 +3503,7 @@ apply_handle_tuple_routing(ApplyExecutionData *edata, if (GetTupleTransactionInfo(localslot, &conflicttuple.xmin, &conflicttuple.origin, &conflicttuple.ts) && - conflicttuple.origin != replorigin_session_origin) + conflicttuple.origin != replorigin_xact_state.origin) { TupleTableSlot *newslot; @@ -5652,7 +5652,7 @@ run_apply_worker(void) if (!OidIsValid(originid)) originid = replorigin_create(originname); replorigin_session_setup(originid, 0); - replorigin_session_origin = originid; + replorigin_xact_state.origin = originid; origin_startpos = replorigin_session_get_progress(false); CommitTransactionCommand(); diff --git a/src/include/replication/origin.h b/src/include/replication/origin.h index bc0a16ecc7e..eb46b41b4b7 100644 --- a/src/include/replication/origin.h +++ b/src/include/replication/origin.h @@ -40,9 +40,14 @@ typedef struct xl_replorigin_drop */ #define MAX_RONAME_LEN 512 -extern PGDLLIMPORT ReplOriginId replorigin_session_origin; -extern PGDLLIMPORT XLogRecPtr replorigin_session_origin_lsn; -extern PGDLLIMPORT TimestampTz replorigin_session_origin_timestamp; +typedef struct ReplOriginXactState +{ + ReplOriginId origin; + XLogRecPtr origin_lsn; + TimestampTz origin_timestamp; +} ReplOriginXactState; + +extern PGDLLIMPORT ReplOriginXactState replorigin_xact_state; /* GUCs */ extern PGDLLIMPORT int max_active_replication_origins; diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list index 7ad16c8ad23..34374df0d67 100644 --- a/src/tools/pgindent/typedefs.list +++ b/src/tools/pgindent/typedefs.list @@ -2577,6 +2577,7 @@ ReorderBufferUpdateProgressTxnCB ReorderTuple ReparameterizeForeignPathByChild_function ReplOriginId +ReplOriginXactState ReplaceVarsFromTargetList_context ReplaceVarsNoMatchOption ReplaceWrapOption