Fix misreporting of publisher sequence permissions during sync

When synchronizing sequences for logical replication, a
publisher-side permission failure could be reported as if the sequence
were missing on the publisher, making the real cause harder to
identify.

This happened because pg_get_sequence_data() returns a row of NULL
values when the replication connection lacks permission to read a
sequence. Sequence synchronization treated that the same as a missing
sequence, causing it to emit a misleading "missing sequence on
publisher" warning.

Fix this by distinguishing permission failures from genuinely missing
sequences. The synchronization query now checks whether the
replication connection has the required privilege for each published
sequence, allowing the worker to report permission failures
separately.

Author: Fujii Masao <masao.fujii@gmail.com>
Reviewed-by: Tristan Partin <tristan@partin.io>
Discussion: https://postgr.es/m/CAHGQGwGNTaXnBKUV510_P1KwhdbHT+kgZ4zU5njBHy7nCqdhzg@mail.gmail.com
This commit is contained in:
Fujii Masao 2026-06-20 18:19:23 +09:00
parent 73dab12719
commit d4a657b0a4
2 changed files with 86 additions and 31 deletions

View file

@ -72,13 +72,14 @@
#include "utils/syscache.h"
#include "utils/usercontext.h"
#define REMOTE_SEQ_COL_COUNT 10
#define REMOTE_SEQ_COL_COUNT 11
typedef enum CopySeqResult
{
COPYSEQ_SUCCESS,
COPYSEQ_MISMATCH,
COPYSEQ_INSUFFICIENT_PERM,
COPYSEQ_SUBSCRIBER_INSUFFICIENT_PERM,
COPYSEQ_PUBLISHER_INSUFFICIENT_PERM,
COPYSEQ_SKIPPED
} CopySeqResult;
@ -166,18 +167,22 @@ get_sequences_string(List *seqindexes, StringInfo buf)
* Report discrepancies found during sequence synchronization between
* the publisher and subscriber. Emits warnings for:
* a) mismatched definitions or concurrent rename
* b) insufficient privileges
* c) missing sequences on the subscriber
* b) insufficient privileges on the subscriber
* c) insufficient privileges on the publisher
* d) missing sequences on the publisher
* Then raises an ERROR to indicate synchronization failure.
*/
static void
report_sequence_errors(List *mismatched_seqs_idx, List *insuffperm_seqs_idx,
report_sequence_errors(List *mismatched_seqs_idx,
List *sub_insuffperm_seqs_idx,
List *pub_insuffperm_seqs_idx,
List *missing_seqs_idx)
{
StringInfoData seqstr;
/* Quick exit if there are no errors to report */
if (!mismatched_seqs_idx && !insuffperm_seqs_idx && !missing_seqs_idx)
if (!mismatched_seqs_idx && !sub_insuffperm_seqs_idx &&
!pub_insuffperm_seqs_idx && !missing_seqs_idx)
return;
initStringInfo(&seqstr);
@ -193,14 +198,25 @@ report_sequence_errors(List *mismatched_seqs_idx, List *insuffperm_seqs_idx,
seqstr.data));
}
if (insuffperm_seqs_idx)
if (sub_insuffperm_seqs_idx)
{
get_sequences_string(insuffperm_seqs_idx, &seqstr);
get_sequences_string(sub_insuffperm_seqs_idx, &seqstr);
ereport(WARNING,
errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
errmsg_plural("insufficient privileges on sequence (%s)",
"insufficient privileges on sequences (%s)",
list_length(insuffperm_seqs_idx),
errmsg_plural("insufficient privileges on subscriber sequence (%s)",
"insufficient privileges on subscriber sequences (%s)",
list_length(sub_insuffperm_seqs_idx),
seqstr.data));
}
if (pub_insuffperm_seqs_idx)
{
get_sequences_string(pub_insuffperm_seqs_idx, &seqstr);
ereport(WARNING,
errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
errmsg_plural("insufficient privileges on publisher sequence (%s)",
"insufficient privileges on publisher sequences (%s)",
list_length(pub_insuffperm_seqs_idx),
seqstr.data));
}
@ -235,6 +251,7 @@ get_and_validate_seq_info(TupleTableSlot *slot, Relation *sequence_rel,
bool isnull;
int col = 0;
Datum datum;
bool remote_has_select_priv;
Oid remote_typid;
int64 remote_start;
int64 remote_increment;
@ -254,12 +271,18 @@ get_and_validate_seq_info(TupleTableSlot *slot, Relation *sequence_rel,
(LogicalRepSequenceInfo *) list_nth(seqinfos, *seqidx);
/*
* The sequence data can be NULL due to insufficient privileges or if the
* sequence was dropped concurrently (see pg_get_sequence_data()).
* The remote sequence state can be NULL if the publisher lacks the
* required privileges or if the sequence was dropped concurrently after
* it was identified in the catalog snapshot (see pg_get_sequence_data()).
*/
remote_has_select_priv = DatumGetBool(slot_getattr(slot, ++col, &isnull));
Assert(!isnull);
datum = slot_getattr(slot, ++col, &isnull);
if (isnull)
return COPYSEQ_SKIPPED;
return remote_has_select_priv ? COPYSEQ_SKIPPED :
COPYSEQ_PUBLISHER_INSUFFICIENT_PERM;
seqinfo_local->last_value = DatumGetInt64(datum);
seqinfo_local->is_called = DatumGetBool(slot_getattr(slot, ++col, &isnull));
@ -351,7 +374,7 @@ copy_sequence(LogicalRepSequenceInfo *seqinfo, Oid seqowner)
if (!run_as_owner)
RestoreUserContext(&ucxt);
return COPYSEQ_INSUFFICIENT_PERM;
return COPYSEQ_SUBSCRIBER_INSUFFICIENT_PERM;
}
/*
@ -387,7 +410,8 @@ copy_sequences(WalReceiverConn *conn)
int n_seqinfos = list_length(seqinfos);
List *mismatched_seqs_idx = NIL;
List *missing_seqs_idx = NIL;
List *insuffperm_seqs_idx = NIL;
List *sub_insuffperm_seqs_idx = NIL;
List *pub_insuffperm_seqs_idx = NIL;
StringInfoData seqstr;
StringInfoData cmd;
MemoryContext oldctx;
@ -403,13 +427,14 @@ copy_sequences(WalReceiverConn *conn)
while (cur_batch_base_index < n_seqinfos)
{
Oid seqRow[REMOTE_SEQ_COL_COUNT] = {INT8OID, INT8OID,
Oid seqRow[REMOTE_SEQ_COL_COUNT] = {INT8OID, BOOLOID, INT8OID,
BOOLOID, LSNOID, OIDOID, INT8OID, INT8OID, INT8OID, INT8OID, BOOLOID};
int batch_size = 0;
int batch_succeeded_count = 0;
int batch_mismatched_count = 0;
int batch_skipped_count = 0;
int batch_insuffperm_count = 0;
int batch_sub_insuffperm_count = 0;
int batch_pub_insuffperm_count = 0;
int batch_missing_count;
WalRcvExecResult *res;
@ -471,7 +496,8 @@ copy_sequences(WalReceiverConn *conn)
* matching.
*/
appendStringInfo(&cmd,
"SELECT s.seqidx, ps.*, seq.seqtypid,\n"
"SELECT s.seqidx, has_sequence_privilege(c.oid, 'SELECT'),\n"
" ps.*, seq.seqtypid,\n"
" seq.seqstart, seq.seqincrement, seq.seqmin,\n"
" seq.seqmax, seq.seqcycle\n"
"FROM ( VALUES %s ) AS s (schname, seqname, seqidx)\n"
@ -532,7 +558,7 @@ copy_sequences(WalReceiverConn *conn)
MemoryContextSwitchTo(oldctx);
batch_mismatched_count++;
break;
case COPYSEQ_INSUFFICIENT_PERM:
case COPYSEQ_SUBSCRIBER_INSUFFICIENT_PERM:
/*
* Remember sequences with insufficient privileges in a
@ -540,10 +566,22 @@ copy_sequences(WalReceiverConn *conn)
* after the transaction is committed.
*/
oldctx = MemoryContextSwitchTo(ApplyContext);
insuffperm_seqs_idx = lappend_int(insuffperm_seqs_idx,
seqidx);
sub_insuffperm_seqs_idx = lappend_int(sub_insuffperm_seqs_idx,
seqidx);
MemoryContextSwitchTo(oldctx);
batch_insuffperm_count++;
batch_sub_insuffperm_count++;
break;
case COPYSEQ_PUBLISHER_INSUFFICIENT_PERM:
/*
* Remember sequences for which the publisher lacks the
* privileges required by pg_get_sequence_data().
*/
oldctx = MemoryContextSwitchTo(ApplyContext);
pub_insuffperm_seqs_idx = lappend_int(pub_insuffperm_seqs_idx,
seqidx);
MemoryContextSwitchTo(oldctx);
batch_pub_insuffperm_count++;
break;
case COPYSEQ_SKIPPED:
@ -575,15 +613,16 @@ copy_sequences(WalReceiverConn *conn)
batch_missing_count = batch_size - (batch_succeeded_count +
batch_mismatched_count +
batch_insuffperm_count +
batch_sub_insuffperm_count +
batch_pub_insuffperm_count +
batch_skipped_count);
elog(DEBUG1,
"logical replication sequence synchronization for subscription \"%s\" - batch #%d = %d attempted, %d succeeded, %d mismatched, %d insufficient permission, %d missing from publisher, %d skipped",
"logical replication sequence synchronization for subscription \"%s\" - batch #%d = %d attempted, %d succeeded, %d mismatched, %d subscriber insufficient permission, %d publisher insufficient permission, %d missing from publisher, %d skipped",
MySubscription->name,
(cur_batch_base_index / MAX_SEQUENCES_SYNC_PER_BATCH) + 1,
batch_size, batch_succeeded_count, batch_mismatched_count,
batch_insuffperm_count, batch_missing_count, batch_skipped_count);
batch_sub_insuffperm_count, batch_pub_insuffperm_count, batch_missing_count, batch_skipped_count);
/* Commit this batch, and prepare for next batch */
CommitTransactionCommand();
@ -610,8 +649,8 @@ copy_sequences(WalReceiverConn *conn)
}
/* Report mismatches, permission issues, or missing sequences */
report_sequence_errors(mismatched_seqs_idx, insuffperm_seqs_idx,
missing_seqs_idx);
report_sequence_errors(mismatched_seqs_idx, sub_insuffperm_seqs_idx,
pub_insuffperm_seqs_idx, missing_seqs_idx);
}
/*

View file

@ -233,9 +233,9 @@ $node_publisher->safe_psql(
));
##########
# Ensure that insufficient privileges on the publisher for a sequence do not
# disrupt the subscriber. The subscriber should log a warning and continue
# retrying.
# Ensure that insufficient privileges on the publisher for a sequence
# are reported correctly as a permission issue, not as a missing sequence.
# The subscriber should log a warning and continue retrying.
##########
$node_publisher->safe_psql(
@ -254,6 +254,22 @@ $node_subscriber->safe_psql('postgres',
"ALTER SUBSCRIPTION regress_seq_sub CONNECTION '$publisher_limited_connstr'"
);
$node_subscriber->safe_psql('postgres',
"ALTER SUBSCRIPTION regress_seq_sub REFRESH SEQUENCES");
$node_subscriber->wait_for_log(
qr/WARNING: ( [A-Z0-9]+:)? insufficient privileges on publisher sequence \("public.regress_s2"\)/,
$log_offset);
##########
# Ensure that a sequence that is actually removed on the publisher is still
# reported as missing.
##########
$node_publisher->safe_psql('postgres', qq(DROP SEQUENCE regress_s2;));
$log_offset = -s $node_subscriber->logfile;
$node_subscriber->safe_psql('postgres',
"ALTER SUBSCRIPTION regress_seq_sub REFRESH SEQUENCES");