diff --git a/src/backend/replication/logical/sequencesync.c b/src/backend/replication/logical/sequencesync.c index e2ff8d77b16..f47f962c7db 100644 --- a/src/backend/replication/logical/sequencesync.c +++ b/src/backend/replication/logical/sequencesync.c @@ -72,13 +72,14 @@ #include "utils/syscache.h" #include "utils/usercontext.h" -#define REMOTE_SEQ_COL_COUNT 10 +#define REMOTE_SEQ_COL_COUNT 11 typedef enum CopySeqResult { COPYSEQ_SUCCESS, COPYSEQ_MISMATCH, - COPYSEQ_INSUFFICIENT_PERM, + COPYSEQ_SUBSCRIBER_INSUFFICIENT_PERM, + COPYSEQ_PUBLISHER_INSUFFICIENT_PERM, COPYSEQ_SKIPPED } CopySeqResult; @@ -166,18 +167,22 @@ get_sequences_string(List *seqindexes, StringInfo buf) * Report discrepancies found during sequence synchronization between * the publisher and subscriber. Emits warnings for: * a) mismatched definitions or concurrent rename - * b) insufficient privileges - * c) missing sequences on the subscriber + * b) insufficient privileges on the subscriber + * c) insufficient privileges on the publisher + * d) missing sequences on the publisher * Then raises an ERROR to indicate synchronization failure. */ static void -report_sequence_errors(List *mismatched_seqs_idx, List *insuffperm_seqs_idx, +report_sequence_errors(List *mismatched_seqs_idx, + List *sub_insuffperm_seqs_idx, + List *pub_insuffperm_seqs_idx, List *missing_seqs_idx) { StringInfoData seqstr; /* Quick exit if there are no errors to report */ - if (!mismatched_seqs_idx && !insuffperm_seqs_idx && !missing_seqs_idx) + if (!mismatched_seqs_idx && !sub_insuffperm_seqs_idx && + !pub_insuffperm_seqs_idx && !missing_seqs_idx) return; initStringInfo(&seqstr); @@ -193,14 +198,25 @@ report_sequence_errors(List *mismatched_seqs_idx, List *insuffperm_seqs_idx, seqstr.data)); } - if (insuffperm_seqs_idx) + if (sub_insuffperm_seqs_idx) { - get_sequences_string(insuffperm_seqs_idx, &seqstr); + get_sequences_string(sub_insuffperm_seqs_idx, &seqstr); ereport(WARNING, errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), - errmsg_plural("insufficient privileges on sequence (%s)", - "insufficient privileges on sequences (%s)", - list_length(insuffperm_seqs_idx), + errmsg_plural("insufficient privileges on subscriber sequence (%s)", + "insufficient privileges on subscriber sequences (%s)", + list_length(sub_insuffperm_seqs_idx), + seqstr.data)); + } + + if (pub_insuffperm_seqs_idx) + { + get_sequences_string(pub_insuffperm_seqs_idx, &seqstr); + ereport(WARNING, + errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg_plural("insufficient privileges on publisher sequence (%s)", + "insufficient privileges on publisher sequences (%s)", + list_length(pub_insuffperm_seqs_idx), seqstr.data)); } @@ -235,6 +251,7 @@ get_and_validate_seq_info(TupleTableSlot *slot, Relation *sequence_rel, bool isnull; int col = 0; Datum datum; + bool remote_has_select_priv; Oid remote_typid; int64 remote_start; int64 remote_increment; @@ -254,12 +271,18 @@ get_and_validate_seq_info(TupleTableSlot *slot, Relation *sequence_rel, (LogicalRepSequenceInfo *) list_nth(seqinfos, *seqidx); /* - * The sequence data can be NULL due to insufficient privileges or if the - * sequence was dropped concurrently (see pg_get_sequence_data()). + * The remote sequence state can be NULL if the publisher lacks the + * required privileges or if the sequence was dropped concurrently after + * it was identified in the catalog snapshot (see pg_get_sequence_data()). */ + remote_has_select_priv = DatumGetBool(slot_getattr(slot, ++col, &isnull)); + Assert(!isnull); + datum = slot_getattr(slot, ++col, &isnull); if (isnull) - return COPYSEQ_SKIPPED; + return remote_has_select_priv ? COPYSEQ_SKIPPED : + COPYSEQ_PUBLISHER_INSUFFICIENT_PERM; + seqinfo_local->last_value = DatumGetInt64(datum); seqinfo_local->is_called = DatumGetBool(slot_getattr(slot, ++col, &isnull)); @@ -351,7 +374,7 @@ copy_sequence(LogicalRepSequenceInfo *seqinfo, Oid seqowner) if (!run_as_owner) RestoreUserContext(&ucxt); - return COPYSEQ_INSUFFICIENT_PERM; + return COPYSEQ_SUBSCRIBER_INSUFFICIENT_PERM; } /* @@ -387,7 +410,8 @@ copy_sequences(WalReceiverConn *conn) int n_seqinfos = list_length(seqinfos); List *mismatched_seqs_idx = NIL; List *missing_seqs_idx = NIL; - List *insuffperm_seqs_idx = NIL; + List *sub_insuffperm_seqs_idx = NIL; + List *pub_insuffperm_seqs_idx = NIL; StringInfoData seqstr; StringInfoData cmd; MemoryContext oldctx; @@ -403,13 +427,14 @@ copy_sequences(WalReceiverConn *conn) while (cur_batch_base_index < n_seqinfos) { - Oid seqRow[REMOTE_SEQ_COL_COUNT] = {INT8OID, INT8OID, + Oid seqRow[REMOTE_SEQ_COL_COUNT] = {INT8OID, BOOLOID, INT8OID, BOOLOID, LSNOID, OIDOID, INT8OID, INT8OID, INT8OID, INT8OID, BOOLOID}; int batch_size = 0; int batch_succeeded_count = 0; int batch_mismatched_count = 0; int batch_skipped_count = 0; - int batch_insuffperm_count = 0; + int batch_sub_insuffperm_count = 0; + int batch_pub_insuffperm_count = 0; int batch_missing_count; WalRcvExecResult *res; @@ -471,7 +496,8 @@ copy_sequences(WalReceiverConn *conn) * matching. */ appendStringInfo(&cmd, - "SELECT s.seqidx, ps.*, seq.seqtypid,\n" + "SELECT s.seqidx, has_sequence_privilege(c.oid, 'SELECT'),\n" + " ps.*, seq.seqtypid,\n" " seq.seqstart, seq.seqincrement, seq.seqmin,\n" " seq.seqmax, seq.seqcycle\n" "FROM ( VALUES %s ) AS s (schname, seqname, seqidx)\n" @@ -532,7 +558,7 @@ copy_sequences(WalReceiverConn *conn) MemoryContextSwitchTo(oldctx); batch_mismatched_count++; break; - case COPYSEQ_INSUFFICIENT_PERM: + case COPYSEQ_SUBSCRIBER_INSUFFICIENT_PERM: /* * Remember sequences with insufficient privileges in a @@ -540,10 +566,22 @@ copy_sequences(WalReceiverConn *conn) * after the transaction is committed. */ oldctx = MemoryContextSwitchTo(ApplyContext); - insuffperm_seqs_idx = lappend_int(insuffperm_seqs_idx, - seqidx); + sub_insuffperm_seqs_idx = lappend_int(sub_insuffperm_seqs_idx, + seqidx); MemoryContextSwitchTo(oldctx); - batch_insuffperm_count++; + batch_sub_insuffperm_count++; + break; + case COPYSEQ_PUBLISHER_INSUFFICIENT_PERM: + + /* + * Remember sequences for which the publisher lacks the + * privileges required by pg_get_sequence_data(). + */ + oldctx = MemoryContextSwitchTo(ApplyContext); + pub_insuffperm_seqs_idx = lappend_int(pub_insuffperm_seqs_idx, + seqidx); + MemoryContextSwitchTo(oldctx); + batch_pub_insuffperm_count++; break; case COPYSEQ_SKIPPED: @@ -575,15 +613,16 @@ copy_sequences(WalReceiverConn *conn) batch_missing_count = batch_size - (batch_succeeded_count + batch_mismatched_count + - batch_insuffperm_count + + batch_sub_insuffperm_count + + batch_pub_insuffperm_count + batch_skipped_count); elog(DEBUG1, - "logical replication sequence synchronization for subscription \"%s\" - batch #%d = %d attempted, %d succeeded, %d mismatched, %d insufficient permission, %d missing from publisher, %d skipped", + "logical replication sequence synchronization for subscription \"%s\" - batch #%d = %d attempted, %d succeeded, %d mismatched, %d subscriber insufficient permission, %d publisher insufficient permission, %d missing from publisher, %d skipped", MySubscription->name, (cur_batch_base_index / MAX_SEQUENCES_SYNC_PER_BATCH) + 1, batch_size, batch_succeeded_count, batch_mismatched_count, - batch_insuffperm_count, batch_missing_count, batch_skipped_count); + batch_sub_insuffperm_count, batch_pub_insuffperm_count, batch_missing_count, batch_skipped_count); /* Commit this batch, and prepare for next batch */ CommitTransactionCommand(); @@ -610,8 +649,8 @@ copy_sequences(WalReceiverConn *conn) } /* Report mismatches, permission issues, or missing sequences */ - report_sequence_errors(mismatched_seqs_idx, insuffperm_seqs_idx, - missing_seqs_idx); + report_sequence_errors(mismatched_seqs_idx, sub_insuffperm_seqs_idx, + pub_insuffperm_seqs_idx, missing_seqs_idx); } /* diff --git a/src/test/subscription/t/036_sequences.pl b/src/test/subscription/t/036_sequences.pl index 8e293871efb..2a0819aaf01 100644 --- a/src/test/subscription/t/036_sequences.pl +++ b/src/test/subscription/t/036_sequences.pl @@ -233,9 +233,9 @@ $node_publisher->safe_psql( )); ########## -# Ensure that insufficient privileges on the publisher for a sequence do not -# disrupt the subscriber. The subscriber should log a warning and continue -# retrying. +# Ensure that insufficient privileges on the publisher for a sequence +# are reported correctly as a permission issue, not as a missing sequence. +# The subscriber should log a warning and continue retrying. ########## $node_publisher->safe_psql( @@ -254,6 +254,22 @@ $node_subscriber->safe_psql('postgres', "ALTER SUBSCRIPTION regress_seq_sub CONNECTION '$publisher_limited_connstr'" ); +$node_subscriber->safe_psql('postgres', + "ALTER SUBSCRIPTION regress_seq_sub REFRESH SEQUENCES"); + +$node_subscriber->wait_for_log( + qr/WARNING: ( [A-Z0-9]+:)? insufficient privileges on publisher sequence \("public.regress_s2"\)/, + $log_offset); + +########## +# Ensure that a sequence that is actually removed on the publisher is still +# reported as missing. +########## + +$node_publisher->safe_psql('postgres', qq(DROP SEQUENCE regress_s2;)); + +$log_offset = -s $node_subscriber->logfile; + $node_subscriber->safe_psql('postgres', "ALTER SUBSCRIPTION regress_seq_sub REFRESH SEQUENCES");