diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c index c09ddbc7a22..bbfd860c401 100644 --- a/src/backend/commands/subscriptioncmds.c +++ b/src/backend/commands/subscriptioncmds.c @@ -694,7 +694,7 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data) * * It is possible that the origin is not yet created for * tablesync worker, this can happen for the states before - * SUBREL_STATE_FINISHEDCOPY. The apply worker can also + * SUBREL_STATE_DATASYNC. The apply worker can also * concurrently try to drop the origin and by this time * the origin might be already removed. For these reasons, * passing missing_ok = true. @@ -1206,7 +1206,7 @@ DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel) * * It is possible that the origin is not yet created for tablesync * worker so passing missing_ok = true. This can happen for the states - * before SUBREL_STATE_FINISHEDCOPY. + * before SUBREL_STATE_DATASYNC. */ ReplicationOriginNameForTablesync(subid, relid, originname, sizeof(originname)); diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c index 3e9de50ff12..c02d2b7b5f7 100644 --- a/src/backend/replication/logical/tablesync.c +++ b/src/backend/replication/logical/tablesync.c @@ -1057,12 +1057,26 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos) MyLogicalRepWorker->relstate_lsn = InvalidXLogRecPtr; SpinLockRelease(&MyLogicalRepWorker->relmutex); - /* Update the state and make it visible to others. */ + /* + * Update the state, create the replication origin, and make them visible + * to others. + */ StartTransactionCommand(); UpdateSubscriptionRelState(MyLogicalRepWorker->subid, MyLogicalRepWorker->relid, MyLogicalRepWorker->relstate, MyLogicalRepWorker->relstate_lsn); + + /* + * Create the replication origin in a separate transaction from the one + * that sets up the origin in shared memory. This prevents the risk that + * changes to the origin in shared memory cannot be rolled back if the + * transaction aborts. + */ + originid = replorigin_by_name(originname, true); + if (!OidIsValid(originid)) + originid = replorigin_create(originname); + CommitTransactionCommand(); pgstat_report_stat(false); @@ -1100,37 +1114,21 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos) CRS_USE_SNAPSHOT, origin_startpos); /* - * Setup replication origin tracking. The purpose of doing this before the - * copy is to avoid doing the copy again due to any error in setting up - * origin tracking. + * Advance the origin to the LSN got from walrcv_create_slot and then set + * up the origin. The advancement is WAL logged for the purpose of + * recovery. Locks are to prevent the replication origin from vanishing + * while advancing. + * + * The purpose of doing these before the copy is to avoid doing the copy + * again due to any error in advancing or setting up origin tracking. */ - originid = replorigin_by_name(originname, true); - if (!OidIsValid(originid)) - { - /* - * Origin tracking does not exist, so create it now. - * - * Then advance to the LSN got from walrcv_create_slot. This is WAL - * logged for the purpose of recovery. Locks are to prevent the - * replication origin from vanishing while advancing. - */ - originid = replorigin_create(originname); + LockRelationOid(ReplicationOriginRelationId, RowExclusiveLock); + replorigin_advance(originid, *origin_startpos, InvalidXLogRecPtr, + true /* go backward */ , true /* WAL log */ ); + UnlockRelationOid(ReplicationOriginRelationId, RowExclusiveLock); - LockRelationOid(ReplicationOriginRelationId, RowExclusiveLock); - replorigin_advance(originid, *origin_startpos, InvalidXLogRecPtr, - true /* go backward */ , true /* WAL log */ ); - UnlockRelationOid(ReplicationOriginRelationId, RowExclusiveLock); - - replorigin_session_setup(originid); - replorigin_session_origin = originid; - } - else - { - ereport(ERROR, - (errcode(ERRCODE_DUPLICATE_OBJECT), - errmsg("replication origin \"%s\" already exists", - originname))); - } + replorigin_session_setup(originid); + replorigin_session_origin = originid; /* Now do the initial data copy */ PushActiveSnapshot(GetTransactionSnapshot()); diff --git a/src/test/subscription/t/004_sync.pl b/src/test/subscription/t/004_sync.pl index 545599b8f8d..cd5acf11a0f 100644 --- a/src/test/subscription/t/004_sync.pl +++ b/src/test/subscription/t/004_sync.pl @@ -6,7 +6,7 @@ use strict; use warnings; use PostgresNode; use TestLib; -use Test::More tests => 8; +use Test::More tests => 9; # Initialize publisher node my $node_publisher = get_new_node('publisher'); @@ -172,5 +172,11 @@ ok( $node_publisher->poll_query_until( 'postgres', 'SELECT count(*) = 0 FROM pg_replication_slots'), 'DROP SUBSCRIPTION during error can clean up the slots on the publisher'); +# After dropping the subscription, all replication origins, whether created by +# an apply worker or table sync worker, should have been cleaned up. +$result = $node_subscriber->safe_psql('postgres', + "SELECT count(*) FROM pg_replication_origin_status"); +is($result, qq(0), 'all replication origins have been cleaned up'); + $node_subscriber->stop('fast'); $node_publisher->stop('fast');